Managing large datasets in memory and on disk

That was just an example; you can replace the load_data function with whatever you are using for loading and preprocessing your data. I'm working with saved .npy array files (generated from 3D medical imaging data).

If you are working with normal images arranged in folders by class, you can simply use the Keras ImageDataGenerator (although you could easily create a custom load_data function using Dask too).

The important part is the @delayed decorator (which lets Dask schedule the task later) and that your preprocessing function has a fixed-size output.

E.g.


from dask.delayed import delayed
from scipy.misc import imread, imresize

@delayed
def load_data(fp, shape=(256, 256)):
    # convert fp to str to avoid errors with pathlib
    img = imread(str(fp))
    return imresize(img, shape)

You then need to make sure the shape you pass to da.from_delayed matches the shape of your function's output.
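
For example (a rough sketch; filepaths is just a placeholder list of image paths, and I'm assuming grayscale images so the output of load_data is (256, 256)):

import numpy as np
import dask.array as da

lazy_imgs = [load_data(fp, shape=(256, 256)) for fp in filepaths]
# shape/dtype here must match what load_data actually returns
# (e.g. (256, 256, 3) if your images are RGB)
arrays = [da.from_delayed(x, shape=(256, 256), dtype=np.uint8) for x in lazy_imgs]
data = da.stack(arrays)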


Dave, thanks again! I played around with this some more, and I got it to work. It is FAST! It is great to not have to wait a long time for big datasets of large images to load into memory.

This is really an awesome group and I am learning so much here! :heart_eyes:

I'm running into problems using Dask. Whenever I call gen.flow() or start training using the regular fit command, my computer freezes and I have to restart Jupyter. This is the x_train info before I start training:

dask.array<array, shape=(52800, 192, 192, 1), dtype=float64, chunksize=(1000, 192, 192, 1)>

I can't see anything wrong with this. Any ideas? I didn't use the delayed feature, but simply loaded an HDF5 file as a Dask array.

Try indexing a piece of the array and computing that.

e.g.

print(x_train[0,0,0,0].compute())

I haven’t worked much with hdf5, but I’m guessing the code you’re using to construct the Dask array is trying to pull the whole dataset into memory first. (If the code above works, it isn’t that.)

Can you share more of your script? And have you tried running it in a normal terminal while watching top on another screen? (That should show whether it's eating all your memory.)

Thanks David for getting back to me. Much appreciate the support. I’m on the road right now and I will continue my work tomorrow. I’ll report back.

What I can say for now is that I loaded the entire dataset from an HDF5 file as a Dask array. Then I split it into train, test and valid sets using a permutation, creating three new Dask arrays with chunks of 1000. These three seem to be the troublemakers. I also noticed that I cannot save them back to HDF5.

My guess is that somewhere in there you're triggering dask.compute, which, if you run it on the full array, will cause your program to run out of memory (it will return a NumPy array of the whole dataset).

I’d assume you’re loading the HDF file into a dask array appropriately,
but here’s an example from their docs just in case:

http://dask.pydata.org/en/latest/examples/array-hdf5.html
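
Roughly, the pattern from that page is (file and dataset names are placeholders):

import h5py
import dask.array as da

f = h5py.File('myfile.hdf5', 'r')  # keep the handle open while you use the array
x_train = da.from_array(f['/x_train'], chunks=(1000, 192, 192, 1))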

My guess is there's an issue with your permutation code triggering computation of the entire array in memory. You only need the indices for your shuffle/split, so try running it again with just train_idx, valid_idx, test_idx = my_permute_fxn(range(52800)).

Your validation dataset would then just be a[valid_idx], etc. (a view of the original Dask array).

There are also some nice pre-made functions in Scikit Learn for doing this sort of thing (see model selection).
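
Something like this should work (just a sketch; the split fractions are arbitrary, and a is your full Dask array):

import numpy as np
from sklearn.model_selection import train_test_split

idx = np.arange(52800)
train_idx, rest_idx = train_test_split(idx, test_size=0.3, shuffle=True)
valid_idx, test_idx = train_test_split(rest_idx, test_size=0.5)

x_train = a[train_idx]   # still lazy - nothing is computed here
x_valid = a[valid_idx]
x_test = a[test_idx]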

Yeah, I agree. Let me ask the other way around:

I have a large dataset of images, and I want to get rid of managing tens of thousands of files. So I wanted to read the images, shuffle them, split them (train, valid, test) and save each as a new HDF5 file. Then I could just use the Dask arrays for training.

How would you do that? Do you think this will also be faster than working on the images directly with something like data = da.image.imread()?

thanks

EDIT: OK, I'm moving forward. The next problem is that the ImageDataGenerator and Dask arrays don't seem to work together nicely. I'm hung on the gen.flow() function, which usually finishes running within a second.

I would wrap my image preprocessing scripts in the @delayed decorator (make sure you know the output shape of your preprocessing function).

I would create the overall dataset in this manner:



import h5py
import dask.array as da
from dask.delayed import delayed
from scipy.misc import imread, imresize
from sklearn.model_selection import KFold

shape = (256, 256) # etc
dtype = 'float32' # etc
h5_output = '/path/to/output.h5'
filelist = ['f1','f2','f3',...]


@delayed
def my_preproc_func(fp):
    # do something that outputs
    # a constant shape
    return imresize(imread(fp), shape).astype(dtype)



data = da.stack([da.from_delayed(my_preproc_func(fp), shape=shape, dtype=dtype) for fp in filelist])
kfold = KFold(n_splits=10, shuffle=True)
cv = kfold.split(data)

with h5py.File(h5_output, 'w') as h5:
    for i, (train, valid) in enumerate(cv):
        # train is everything except the fold
        fold = data[valid]
        dset = h5.create_dataset('cv_%d' % i, shape=fold.shape, dtype=fold.dtype)
        # no images should be loaded until this point...
        da.store(fold, dset)

You can now use any 9 of the datasets as your training data and the 10th as your validation (or, if you have GPUs to spare, train 10 models this way and stack them).
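
Reading the folds back in later would look roughly like this (paths and chunk size are just examples):

import h5py
import dask.array as da

h5 = h5py.File('/path/to/output.h5', 'r')  # keep the handle open while training
folds = [da.from_array(h5['cv_%d' % i], chunks=1000) for i in range(10)]
x_valid = folds[0]                   # hold out one fold
x_train = da.concatenate(folds[1:])  # train on the other nine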

Hi Pietz, sorry for the slow response, I have been away from this forum for the last few days.

I used Dask for my ImageDataGenerator, and you’re right, they don’t really play that well together. I got it to work somewhat, but not elegantly. Here are a few code snippets:

trn = da.stack([da.from_delayed(load_data(fp, shape=input_shape_full), shape=input_shape_full, dtype=np.int32) for fp in train_p.rglob('*.jpg')])
test = da.stack([da.from_delayed(load_data(fp, shape=input_shape_full), shape=input_shape_full, dtype=np.int32) for fp in test_p.rglob('*.jpg')])

gen = ImageDataGenerator(horizontal_flip=True)

This is where I use gen.flow()

batches = 0

for augmented_trn, augmented_trn_labels in gen.flow(X=trn[trn_index], y=trn_labels[trn_index], batch_size = batch_size, shuffle=False):
    model.fit(augmented_trn, augmented_trn_labels, batch_size=batch_size, nb_epoch=nb_epoch, validation_data=(trn[val_index], trn_labels[val_index]))
    batches += 1
    if batches >= len(trn_index) / batch_size:
        # we need to break the loop by hand because
        # the generator loops indefinitely
        break

I hope this helps…

You both are great, thank you!

Although I know the concept of lazy execution, isn't everything I do to a Dask array lazy by default? Do I understand correctly that I don't need the delayed functionality for my example? I simply load the images, process the data however I like, and then save it back to HDF5. The processing will start when the data is read in order to save it to file. What would be the benefit of the delayed flag? I can't come up with an example, because Dask always seems to work lazily.

Thanks for the ideas, Christina. You mentioned how surprised you were about the performance. I can't quite replicate that. Dask is around 60% slower than the ImageDataGenerator iterator on raw images. I tried chunk sizes of 100, 1000 and 5000.

When I switch to shuffle=False the performance is similar to the ImageDataGenerator, but only if I don't do any further processing of the array. Rescaling like x_train /= 255 on the Dask array before training doubles the runtime of each epoch.

EDIT: I'm done with my project and the code is available on GitHub (https://github.com/pietz/language-recognition).

Hi @pietz, I am also unable to get fast performance with a dask.array. I am following the code snippet that @davecg posted, but it is actually really slow for me.

What have you done to improve the speed?

Here you can find how I use dask to create the array.

Thanks

Hey @jlda,

If you want to use Dask arrays for ML (which you totally should), you need to take care of shuffling yourself. Keras's fit() function uses shuffle=True by default, which destroys most of the benefits that Dask delivers with chunking.

What I do for now is shuffle and rescale the data myself before saving it to HDF5, and turn off shuffle when fitting.
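
Roughly like this (just a sketch; x, y_train, model and the file names are placeholders, and the chunk size depends on your RAM):

import h5py
import numpy as np
import dask.array as da

idx = np.random.permutation(x.shape[0])          # shuffle once, up front
x_shuffled = (x[idx] / 255.).astype('float32')   # rescale lazily
da.to_hdf5('train.h5', '/x_train', x_shuffled)   # computation happens here

with h5py.File('train.h5', 'r') as f:
    x_train = da.from_array(f['/x_train'], chunks=1000)
    model.fit(x_train, y_train, shuffle=False, batch_size=64)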

Hi @pietz, thanks for your quick reply.
I have tried to turn off shuffle and the speed is similar. I believe I am doing something wrong with the chunksize.

What I am doing is the following:

  • I have a pandas DataFrame, df, that contains the paths to several wav files and their labels.
  • The DataFrame is read and shuffled.
  • I iterate over the paths column and feed each element to a function that loads the waveform and processes it:
x = da.stack(
    [da.from_delayed(
        delayed(get_item_data)(fp, sr, mono, post_processing, data_shape),
        shape=data_shape, dtype=np.float32) for fp in df['path']]
)
  • x has the shape:

dask.array<reshape, shape=(6764, 96, 1366, 1), dtype=float32, chunksize=(1, 96, 1366, 1)>

As you can see, there are 6764 wav files that I want to train with. But, if I understand correctly, since the chunksize is just 1, the audio files are loaded one by one when keras.fit uses them. I believe the chunksize should be bigger than 1, so that more than one file is loaded when keras.fit needs them.

I have tried to rechunk to a bigger number, but my computer freezes when keras.fit is executed.

Do you think the problem could be caused by the chunksize being 1? Would you recommend any other tactic?

Thanks

That could absolutely be the problem. As you mentioned yourself, the overhead will be similar to loading each file separately, and in that case it doesn't matter whether shuffle is activated or not.

From my experience with Dask, and I’m far from being a power user, what works best is this:

  1. Load raw data
  2. Preprocess it
  3. Shuffle it
  4. Save it to hdf5
  5. Load the file as a Dask array using a chunk size your RAM can comfortably handle
  6. Feed it straight into the fit() function

Even one simple calculation usually brings down the processing speed by an enormous amount in combination with Keras or TensorFlow; I'm looking for a better solution to this as well. Looking at your code and data samples, a chunk size of 100 should be easily possible. I'd be surprised if even 1000 gave you any problems. Also, I think there's no need in machine learning to specify a multidimensional chunksize; just give it chunks=1000.
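
For example (the chunk size is a guess, tune it to your RAM; model and y are placeholders):

x = x.rechunk({0: 1000})  # rechunk along the sample axis only
model.fit(x, y, shuffle=False, batch_size=32)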

If this doesn't solve your problem, I'll have a detailed look at your code.

Best of luck

I was curious about how much better bcolz is at saving space, so I tried to save a folder of 200 dogscats images into a large array using numpy, pickle, pytorch, and kur.utils.idx. Here is the result:

image folder size (200 images): 5mb (total)
folder generated by bcolz: 34mb
npy file: 120mb
pickle file: 120mb
torch file: 132mb
idx file: 134mb
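
For anyone who wants to reproduce the bcolz part, a typical save/load looks roughly like this (generic sketch, not necessarily the exact code I used; arr is the stacked image array):

import bcolz

c = bcolz.carray(arr, rootdir='dogscats.bc', mode='w')  # compressed on-disk array
c.flush()

arr2 = bcolz.open('dogscats.bc')[:]  # read it back into memory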

If the issue is on disk storage, you can always just gzip everything.
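
If you're on HDF5, h5py can also apply gzip compression per dataset, something like this (sketch; names are placeholders):

import h5py
import numpy as np

arr = np.random.rand(1000, 192, 192, 1).astype('float32')  # placeholder data

with h5py.File('images.h5', 'w') as f:
    f.create_dataset('x', data=arr, compression='gzip', compression_opts=4)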

I’d be more interested in time to load files from disk to memory for training.


Thanks for the tip on gzip.

You are right, the loading time is an issue on my Mac. I wonder, if the data is stored on Floyd or AWS, will the loading time be much quicker?

I usually work on much smaller sample data locally; the loading time is an issue, but it is tolerable. With Floyd and AWS, will the loading time for the full dataset still be an issue? What is your experience?

Thanks

Is it possible to save and load a DirectoryIterator object of tf.contrib.keras to and from a file efficiently?

It takes a while for DirectoryIterator to convert a folder of images into batches for iteration, so I wonder whether there is a more efficient way to access this DirectoryIterator.

pickle can't save an iterator; I tried. I have no idea what else can be used to save this Python iterator object.

Thanks

There’s an option for saving preprocessed files in the flow_from_directory method IIRC.
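
If I remember right it's the save_to_dir argument, something along these lines (paths and sizes are placeholders):

from keras.preprocessing.image import ImageDataGenerator

gen = ImageDataGenerator(rescale=1. / 255)
batches = gen.flow_from_directory('data/train',
                                  target_size=(224, 224),
                                  batch_size=64,
                                  save_to_dir='data/preprocessed',
                                  save_format='jpeg')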