Experiments on using Redis as DataSet for fast.ai for huge datasets


(Vitaliy Bondarenko) #1

Hi *,

I would like to share some experiments I did while taking part in the Kaggle Quick, Draw! competition (by the way, fast.ai gives 82% accuracy with the default approach, just after building a model on 1% of the data, awesome!).
I’ve tried to keep this short, so please don’t hesitate to ask about details if I omitted too much.

The main challenge is the amount of data. Just to summarize, the train data contains about 50M images (given as vector drawings). I was thinking about converting all of them to regular images and then applying the fast.ai learner for image files.

The first question: How long will it take just to convert all vector files to png files?

Just for reference, the function I’ve used for conversion looks like this:

import cv2
import numpy as np
import matplotlib.pyplot as plt

def drawing_to_np_prepare_data_raw(drawing):
    # The drawing arrives as a string representation of a list of strokes
    drawing = eval(drawing)
    fig, ax = plt.subplots()  # figsize=(6.,4.), dpi=72)
    # Close the figure so it won't get displayed while transforming the set
    plt.close(fig)
    for x, y in drawing:
        ax.plot(x, y, marker='.')
        ax.axis('off')
    fig.canvas.draw()
    # Convert the rendered figure to a numpy array
    np_drawing = np.array(fig.canvas.renderer._renderer)
    return cv2.cvtColor(np_drawing.astype(np.uint8), cv2.COLOR_BGR2RGB)

To measure time, I picked about 1% of the data randomly (372,576 keys) and generated the PNG files.

Timing results (on p2.xlarge AWS instance):
Converting files took 8603.43 seconds.
Saving files took 3083.26 seconds.

Summary: 0.031367 seconds per file. Converting all the data would take an astonishing 325 hours (this could be sped up using compute-optimized instances). But the worst thing is that I could only use 4 concurrent threads to parallelize it because of the compute/save ratio [I/O operations don’t parallelize well].

One straightforward solution is to take a large compute-optimized EC2 instance, attach multiple drives, convert the files using multiple concurrent processes, then re-attach those drives to a p2 instance and glue all class folders into the same ‘train’ folder using mhddfs. But I became curious and tried other alternatives.

Alternative 1: Redis (Elasticache)

Just to recap, Redis is an in-memory data store, which makes it very fast (read access measured in 1–3 milliseconds) and parallelizes very well.

Pros:

  • should be very fast!
  • converting files is easily parallelizable
  • allows performing multiple experiments without introducing additional latency.

Cons:

  • price (I am leaving this out of scope here)
  • strictly speaking, it is not a persistent database; it could crash and lose the data, although backups would help in our case.

How do we let fast.ai use a Redis database as a source?

After digging into the fast.ai code it became quite clear to me: I needed to introduce new dataset classes.

import json
import redis
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

def resize_imgs_redis(redis_conn, targ, key_prefix, resume=True, fn=None):
    """
    Enlarge or shrink a set of images stored under key_prefix to scale,
    such that the smaller of the height or width dimension is equal to targ.
    Note:
    -- This function is multithreaded for efficiency.
    -- When the destination key already exists, the function returns
       without raising an error.
    """
    s = key_prefix.split(':')
    new_key_prefix = ':'.join([s[0], 'sz' + str(targ)])

    # todo - change to SCAN (KEYS blocks the server on large databases)
    keys = redis_conn.keys(key_prefix)
    keys = [k.decode('utf-8') for k in keys]
    new_keys = [new_key(new_key_prefix, k) for k in keys]

    errors = {}
    def safely_process(key, nk):
        try:
            if resume and redis_conn.get(nk) is not None: return
            resize_img(redis_conn, targ, key, nk, fn=fn)
        except Exception as ex:
            errors[key] = str(ex)

    if len(keys) > 0:
        # a thread pool: closures can't be pickled for a process pool,
        # and the work here is I/O-bound anyway
        with ThreadPoolExecutor(num_cpus()) as e:
            ims = e.map(safely_process, keys, new_keys)
            for _ in tqdm(ims, total=len(keys), leave=False): pass
    if errors:
        print('Some images failed to process:')
        print(json.dumps(errors, indent=2))
    return new_key_prefix
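The `# todo - change to SCAN` note above matters at 50M keys: `KEYS` blocks the Redis server while it walks the entire keyspace. A non-blocking sketch using `SCAN` (the helper name `iter_keys` is mine, not from the original code) might look like:

```python
def iter_keys(redis_conn, pattern, count=1000):
    # SCAN walks the keyspace incrementally instead of blocking like KEYS;
    # the server-side cursor returns to 0 when iteration is complete
    cursor = 0
    while True:
        cursor, batch = redis_conn.scan(cursor=cursor, match=pattern, count=count)
        for k in batch:
            yield k.decode('utf-8')
        if cursor == 0:
            break
```

redis-py also ships `scan_iter`, which does essentially the same thing.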

class RedisImageDataset(BaseDataset):

    def __init__(self, redis_host, redis_port, key_prefix, keys_count, transform):
        self.redis_conn = redis.Redis(redis_host, redis_port)
        self.keys_count = keys_count
        self.key_prefix = key_prefix
        self.host, self.port = redis_host, redis_port
        super().__init__(transform)

    def get_sz(self): return self.transform.sz

    def get_x(self, i):
        value = self.redis_conn.get(':'.join([self.key_prefix, str(i)]))
        value = norm(decompress_array(value))
        return value

    def get_n(self): return self.keys_count

    def resize_imgs(self, targ, new_path, resume=True, fn=None):
        new_key_prefix = resize_imgs_redis(self.redis_conn, targ, self.key_prefix, resume, fn)
        return self.__class__(self.host, self.port, new_key_prefix, self.keys_count, self.transform)

    def denorm(self, arr):
        if type(arr) is not np.ndarray: arr = to_np(arr)
        if len(arr.shape) == 3: arr = arr[None]
        return self.transform.denorm(np.rollaxis(arr, 1, 4))

class RedisNumpy(object):

    def __init__(self, redis_host, redis_port, key_prefix, length):
        self.redis_conn = redis.Redis(redis_host, redis_port)
        self.key_prefix = key_prefix
        self.length = length

    def __getitem__(self, item):
        return int(self.redis_conn.get(self.key_prefix + ':' + str(item)))

    def __len__(self):
        return self.length

    def max(self):
        # hard-coded: Quick, Draw! has 340 classes, so labels run 0..339
        return 339

    @property
    def shape(self):
        return (self.length,)

class RedisImageArrayDataset(RedisImageDataset):

    def __init__(self, redis_host, redis_port, key_prefix, keys_count, pred_prefix, classes_count, transform):
        self.pred_prefix = pred_prefix
        self.classes_count = classes_count
        self.keys_count = keys_count
        self.y = RedisNumpy(redis_host, redis_port, pred_prefix, keys_count)
        super().__init__(redis_host, redis_port, key_prefix, keys_count, transform)

    def get_y(self, i):
        val = self.redis_conn.get(':'.join([self.pred_prefix, str(i)]))
        return int(val) if val is not None else 0

    def get_c(self):
        return 0  # todo: enhance to have multiple columns for predictions

    def get_index(self, i):
        a = self.key_prefix.split(':')
        return self.redis_conn.get(':'.join([a[0], 'index', str(i)]))

class RedisImageIndexArrayDataset(RedisImageArrayDataset):

    def get_c(self):
        return self.classes_count
  • The resize_imgs_redis function above is just an adaptation of resize_imgs.
  • The RedisNumpy class above is a workaround for the requirement that the ‘y’ (prediction) vector be loaded into memory. There is an agreed key-naming convention, basically train:orig:number and test:orig:number, where train and test are configurable.
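For completeness, here is a sketch of how an image might be written under that naming convention (the helper name `save_compressed` is mine, not from the post):

```python
import io
import numpy as np

def save_compressed(redis_conn, key_prefix, i, arr):
    # compress a numpy array and store it under e.g. 'train:orig:<i>'
    buf = io.BytesIO()
    np.savez_compressed(buf, arr)
    redis_conn.set(':'.join([key_prefix, str(i)]), buf.getvalue())
```

Workers can call this concurrently, since a Redis `SET` is atomic.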

After adding a from_redis class method to the ImageClassifierData class, I could use any fast.ai code by calling this function instead of from_paths:

@classmethod
def from_redis(cls, path, redis_host, redis_port, train_key_prefix, valid_key_prefix, train_pred_prefix, valid_pred_prefix,
               train_keys_count, valid_keys_count, classes, test_key_prefix=None, test_pred_prefix=None, test_keys_count=0,
               bs=64, tfms=(None, None), num_workers=8):
    assert not(tfms[0] is None or tfms[1] is None), "please provide transformations for your train and validation sets"
    # todo: change to kwargs
    datasets = [
        RedisImageIndexArrayDataset(redis_host, redis_port, train_key_prefix, train_keys_count, train_pred_prefix, len(classes), tfms[0]), # train
        RedisImageIndexArrayDataset(redis_host, redis_port, valid_key_prefix, valid_keys_count, valid_pred_prefix, len(classes), tfms[1]), # val
        RedisImageIndexArrayDataset(redis_host, redis_port, train_key_prefix, train_keys_count, train_pred_prefix, len(classes), tfms[1]), # fix
        RedisImageIndexArrayDataset(redis_host, redis_port, valid_key_prefix, valid_keys_count, valid_pred_prefix, len(classes), tfms[0]), # aux
    ]
    if test_key_prefix is not None:
        datasets += [
            RedisImageIndexArrayDataset(redis_host, redis_port, test_key_prefix, test_keys_count, test_pred_prefix, len(classes), tfms[1]), # test
            RedisImageIndexArrayDataset(redis_host, redis_port, test_key_prefix, test_keys_count, test_pred_prefix, len(classes), tfms[0]), # test_aux
        ]
    else: datasets += [None,None]
    return cls(path, datasets, bs, num_workers, classes=classes)

And what about timing?

Converting files took 9796 seconds.
Saving files to Redis took an astonishing 9.5 seconds(!), which allows almost unlimited parallelism.
An additional step is required after concurrent processing to renumber the keys and make them consecutive: 0.00085 seconds per image, which is negligible.
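The renumbering step itself isn’t shown in the post; a sketch of what it might look like (the helper name and exact scheme are my assumptions) is:

```python
def renumber_keys(redis_conn, prefix):
    # After parallel workers leave gaps in the indices, rename keys so they
    # run 0..n-1, since get_x/get_y look items up by position.
    keys = [k.decode('utf-8') for k in redis_conn.keys(prefix + ':*')]
    keys.sort(key=lambda k: int(k.rsplit(':', 1)[1]))  # numeric, not lexicographic
    for new_i, old_key in enumerate(keys):
        new_key = ':'.join([prefix, str(new_i)])
        if new_key != old_key:
            # safe in place: new_i never exceeds the old index, so the target
            # slot was already vacated by an earlier rename
            redis_conn.rename(old_key, new_key)
```

Sorting by the numeric suffix matters: a plain string sort would put `train:orig:10` before `train:orig:2`.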

So, if we spin up a 64-core instance, we could potentially convert all the data in about 5 hours.

How about read timing?

To measure time I just modified the get_x functions in both the Redis and the file-based classes and got the following results:
File: 0.020 seconds per image
Redis (with decompression): 0.013 seconds per image
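For reference, per-item timings like these can be collected with a small wrapper around get_x (this helper is illustrative, not from the original post):

```python
import time

def timed(fn):
    # accumulate call count and total wall time for fn (e.g. a get_x method)
    stats = {'n': 0, 'total': 0.0}
    def wrapper(*args, **kwargs):
        t0 = time.perf_counter()
        out = fn(*args, **kwargs)
        stats['total'] += time.perf_counter() - t0
        stats['n'] += 1
        return out
    wrapper.stats = stats
    return wrapper
```

Dividing `stats['total']` by `stats['n']` after an epoch gives the per-image figures above.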

If you wonder what decompression means and why converting takes longer for Redis: in order to save space, I had to compress the numpy arrays after applying drawing_to_np_prepare_data_raw:

compressed_image = io.BytesIO()
np.savez_compressed(compressed_image, drawing_to_np_prepare_data_raw(drawing))
compressed_image.seek(0)

which made the numpy arrays 109 times(!) smaller. However, it comes at the price of decompression each time get_x is called:

def decompress_array(array):
    result = io.BytesIO(array)
    result.seek(0)
    return np.load(result)['arr_0']

This compression allows fitting all 50M images in about 110 GB of space in Redis; without it, the approach would not be practical.
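A self-contained round trip of the compression scheme (`compress_array` is my name for the inverse of `decompress_array` above):

```python
import io
import numpy as np

def compress_array(arr):
    buf = io.BytesIO()
    np.savez_compressed(buf, arr)
    return buf.getvalue()

def decompress_array(blob):
    return np.load(io.BytesIO(blob))['arr_0']

# rendered drawings are mostly white, which is why they compress so well
img = np.full((256, 256, 3), 255, dtype=np.uint8)
blob = compress_array(img)
assert np.array_equal(decompress_array(blob), img)
assert len(blob) < img.nbytes  # far smaller than the raw 196608 bytes
```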

However, as of the old fast.ai version, the data is transformed into an ArrayDataset in learn.precompute(…) and resides in memory afterwards. I didn’t find a way to avoid this and use the original data, but it would not be hard to implement.

A few other thoughts:

  1. Reading data from Redis (with decompression) takes 4.4 times longer than reading from memory (this could be optimized to 3.8 times by fetching minibatches instead of single images). However, Redis gives us almost unlimited, easily scalable memory storage.
  2. If you wonder what Alternative 2 is: using pandas dataframes directly, which gives a couple of interesting insights as well, but would make this post too long in my opinion.

I would appreciate any thoughts on this, and in particular:

  1. Could it be useful in general?
  2. Which use cases/benefits do you see in the Redis approach for you?
  3. Do you think it makes sense to add this functionality to the fast.ai mainline?

Thanks for reading!


(Marc Rostock) #2

Hi Vitaliy,
I think your approach is very interesting. Another use case for redis would be storing the reference lists / labels in redis as well, thereby maybe solving the problems discussed in this thread:
https://forums.fast.ai/t/runtimeerror-dataloader-worker-is-killed-by-signal/31277?u=marcmuc

The huge advantage of using redis vs. pandas dfs (your Alternative 2) is that redis stores the data in an outside “central” process and manages the memory (even if a little slower), whereas with pandas dfs all the data would be “copied” into the worker processes, leading to the problems discussed in said thread.


(Ilia) #3

Definitely sounds like an interesting idea.

Just a small remark: I generated 50K images per class (~17M PNG images, 256x256, RGB) in a few hours, as I recall, on my machine. At least, much faster than 325/3 hours :smile: Not sure why it takes so long on AWS? I believe converting the whole dataset shouldn’t take too much time if you have enough cores and enough space on your SSD. Also, for that specific competition, I was generating images on the fly with a custom Dataset implementation.

Though you probably used the “full” dataset instead of the simplified one?

I am not sure about Alternative 2 though, because of this thing. I am doing exactly this with my custom dataset and getting a huge memory leak. (Though maybe I am doing something wrong.) So in my case, the worst problem is that old batches are kept in RAM during a training epoch.

@marcmuc What do you think, can we get the same leakage even in the case of Redis? At the end of the day, we still need to put the data into RAM, and if the DataLoader is the one leaking memory due to Python’s multiprocessing implementation, then Redis will not help much.


(Marc Rostock) #4

Well, in theory, bottom-up from pytorch, the only thing to do is to create a dataset with a __getitem__ method that retrieves items from redis and a __len__ method that returns a “fixed” number stored there at the same time the data was put into redis (so we know how many items there are). This means the actual python object of the dataset does not contain any data, just references to redis. There are no lists to change or reference counters to count up in the python objects, so in theory this should be enough.
The thing is, though, that this may work in regular pytorch, but I am not sure about the current state of things with fastai, because basically the “Dataset” has been hidden away and all we deal with are ItemList objects (which is also why Vitaliy’s implementation above will no longer work out of the box, given all the changes, the data_block api etc., in the last 2 months).
And those ItemLists obviously store these huge lists of filenames and also the labels in dicts/lists/arrays, and that leads to the problem when “copying” them to workers, I think. So one would have to change how an ItemList works and change its methods to also store all those lists in a redis database and query from there. I think that is totally possible, but I am not very knowledgeable about the fastai source code.
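The minimal dataset described above could be sketched like this (a hypothetical class: the key scheme and the lazy connection are my assumptions, and in practice it would subclass torch.utils.data.Dataset):

```python
class RedisBackedDataset:
    # Holds only connection parameters and a length; each item is fetched
    # from Redis on demand, so nothing large gets copied into worker processes.
    def __init__(self, redis_host, redis_port, key_prefix, length):
        self.host, self.port = redis_host, redis_port
        self.key_prefix = key_prefix
        self.length = length  # the "fixed" number stored alongside the data
        self._conn = None     # created lazily, once per worker process

    @property
    def conn(self):
        if self._conn is None:
            import redis
            self._conn = redis.Redis(self.host, self.port)
        return self._conn

    def __len__(self):
        return self.length

    def __getitem__(self, i):
        x = self.conn.get(':'.join([self.key_prefix, 'x', str(i)]))
        y = self.conn.get(':'.join([self.key_prefix, 'y', str(i)]))
        return x, int(y)
```

Creating the connection lazily matters: a live socket cannot be pickled into DataLoader workers, but host/port strings can.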


(Vitaliy Bondarenko) #5

Hi Marc,

thank you for your message. I think your analysis of the alternatives is very interesting, and I am happy that the Redis approach makes sense. I’ve also found recently that it is possible to speed things up a bit by loading a minibatch at once: https://discuss.pytorch.org/t/torch-dataloader-gives-torch-tensor-as-ouput/31084/6?u=ptrblck


(Vitaliy Bondarenko) #6

I am curious whether pytorch v1 has a fix for the memory leak.

As for your question about the long conversion: I used a quick, non-parallelized piece of code on a non-optimized instance :). I thought that was OK for a benchmark.