Hi *,
I would like to share some experiments I did while taking part in the Kaggle Quick, Draw! competition (by the way, fast.ai gives 82% accuracy with the default approach right after building a model on just 1% of the data, awesome!).
I’ve tried to keep this short, so please don’t hesitate to ask about details if I omitted too much.
The main challenge is the amount of data. To summarize, the train data contains about 50M images (given as vector drawings). I was thinking about converting all those drawings to regular images and then applying the fast.ai learner for image files.
The first question: How long will it take just to convert all vector files to png files?
Just for reference, the function I’ve used for conversion looks like this:
import io
import cv2
import numpy as np
import matplotlib.pyplot as plt

def drawing_to_np_prepare_data_raw(drawing):
    # evaluate the stringified stroke array from the CSV
    drawing = eval(drawing)
    fig, ax = plt.subplots()  # figsize=(6.,4.), dpi=72
    # close the figure so it won't get displayed while transforming the set
    plt.close(fig)
    for x, y in drawing:
        ax.plot(x, y, marker='.')
    ax.axis('off')
    fig.canvas.draw()
    # convert the rendered canvas to a numpy array
    np_drawing = np.array(fig.canvas.renderer._renderer)
    return cv2.cvtColor(np_drawing.astype(np.uint8), cv2.COLOR_BGR2RGB)
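For a sense of the input format, here is a hypothetical call (the strokes are made up; in the competition CSV the drawing column stores stringified [[x...], [y...]] stroke lists):

# a made-up two-stroke drawing in the competition's stroke format
drawing = '[[[0, 50, 100], [0, 80, 20]], [[20, 60], [50, 50]]]'
np_drawing = drawing_to_np_prepare_data_raw(drawing)
cv2.imwrite('sample.png', np_drawing)  # the per-file "saving" step timed below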
To measure time, I randomly picked about 1% of the data (372,576 keys) and generated the PNG files.
Timing results (on p2.xlarge AWS instance):
Converting files took 8603.43 seconds.
Saving files took 3083.26 seconds.
Summary: 0.031367 seconds per file. Converting all the data would therefore require an astonishing 325 hours (this could be sped up using compute-optimized instances). Worse, I could only use 4 concurrent threads to parallelize it because of the compute/save ratio (I/O operations do not parallelize well); a rough sketch of the loop follows.
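For context, this is roughly how the conversion loop was parallelized (my reconstruction, not the original code; drawings and out_paths are assumed to be preloaded lists):

from concurrent.futures import ThreadPoolExecutor

def convert_and_save(args):
    drawing, out_path = args
    # the imwrite call is the I/O-bound part that caps useful parallelism
    cv2.imwrite(out_path, drawing_to_np_prepare_data_raw(drawing))

# beyond ~4 workers the disk writes dominate and extra threads stop helping
with ThreadPoolExecutor(4) as ex:
    for _ in ex.map(convert_and_save, zip(drawings, out_paths)):
        pass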
One straightforward solution is to take a large compute-optimized EC2 instance, attach multiple drives, convert the files using multiple concurrent processes, then re-attach those drives to a p2 instance and glue all the class folders into the same ‘train’ folder using mhddfs. But I became curious to try other alternatives.
Alternative 1: Redis (ElastiCache)
Just to recap, Redis is an in-memory data store, which makes it very fast (read access measured in 1-3 milliseconds) and lets many clients read and write in parallel.
Pros:
- should be very fast!
- converting files is easily parallelizable
- allows performing multiple experiments without introducing additional latency.
Cons:
- price (I am leaving this out of the scope here)
- Strictly speaking, it is not a persistent database: it could crash and lose the data. However, a backup would help in our case.
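Before wiring this into fast.ai, here is a minimal redis-py round trip (hypothetical local host/port), just to show the API the classes below are built on:

import redis

r = redis.Redis('localhost', 6379)
r.set('train:orig:0', b'...')  # values are stored as raw bytes
value = r.get('train:orig:0')  # a single read takes ~1-3 ms on ElastiCache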
How to allow fast.ai to use Redis database as a source?
After digging through the fast.ai code it became quite clear to me: I need to introduce new dataset classes.
import redis
import json
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm

def resize_imgs_redis(redis_conn, targ, key_prefix, resume=True, fn=None):
    """
    Enlarge or shrink a set of images stored under the same key prefix, such that
    the smaller of the height or width dimension is equal to targ.
    Note:
    -- This function is multithreaded for efficiency.
    -- When the destination key already exists, the function exits without raising an error.
    """
    s = key_prefix.split(':')
    new_key_prefix = ':'.join([s[0], 'sz' + str(targ)])
    keys = redis_conn.keys(key_prefix)  # todo - change to SCAN
    keys = [k.decode('utf-8') for k in keys]
    new_keys = [new_key(new_key_prefix, k) for k in keys]  # new_key: key-renaming helper
    errors = {}

    def safely_process(key, new_key):
        try:
            # when resuming, skip keys that have already been resized
            if resume and redis_conn.get(new_key) is not None:
                return
            resize_img(redis_conn, targ, key, new_key, fn=fn)  # Redis-adapted resize_img
        except Exception as ex:
            errors[key] = str(ex)

    if len(keys) > 0:
        with ProcessPoolExecutor(num_cpus()) as e:  # num_cpus: fast.ai helper
            ims = e.map(lambda p: safely_process(*p), zip(keys, new_keys))
            for _ in tqdm(ims, total=len(keys), leave=False):
                pass
    if errors:
        print('Some images failed to process:')
        print(json.dumps(errors, indent=2))
    return new_key_prefix
class RedisImageDataset(BaseDataset):
    def __init__(self, redis_host, redis_port, key_prefix, keys_count, transform):
        self.redis_conn = redis.Redis(redis_host, redis_port)
        self.keys_count = keys_count
        self.key_prefix = key_prefix
        self.host, self.port = redis_host, redis_port
        super().__init__(transform)

    def get_sz(self):
        return self.transform.sz

    def get_x(self, i):
        value = self.redis_conn.get(':'.join([self.key_prefix, str(i)]))
        value = norm(decompress_array(value))
        return value

    def get_n(self):
        return self.keys_count

    def resize_imgs(self, targ, new_path, resume=True, fn=None):
        new_key_prefix = resize_imgs_redis(self.redis_conn, targ, self.key_prefix, resume, fn)
        return self.__class__(self.host, self.port, new_key_prefix, self.keys_count, self.transform)

    def denorm(self, arr):
        if type(arr) is not np.ndarray:
            arr = to_np(arr)
        if len(arr.shape) == 3:
            arr = arr[None]
        return self.transform.denorm(np.rollaxis(arr, 1, 4))
class RedisNumpy(object):
    def __init__(self, redis_host, redis_port, key_prefix, length):
        self.redis_conn = redis.Redis(redis_host, redis_port)
        self.key_prefix = key_prefix
        self.length = length

    def __getitem__(self, item):
        return int(self.redis_conn.get(self.key_prefix + ':' + str(item)))

    def __len__(self):
        return self.length

    def max(self):
        # hard-coded for Quick, Draw!'s 340 classes (max label index is 339)
        return 339

    @property
    def shape(self):
        return (self.length,)
class RedisImageArrayDataset(RedisImageDataset):
    def __init__(self, redis_host, redis_port, key_prefix, keys_count, pred_prefix, classes_count, transform):
        self.pred_prefix = pred_prefix
        self.classes_count = classes_count
        self.keys_count = keys_count
        self.y = RedisNumpy(redis_host, redis_port, pred_prefix, keys_count)
        super().__init__(redis_host, redis_port, key_prefix, keys_count, transform)

    def get_y(self, i):
        val = self.redis_conn.get(':'.join([self.pred_prefix, str(i)]))
        return int(val) if val is not None else 0

    def get_c(self):
        return 0  # todo: enhance to have multiple columns for predictions

    def get_index(self, i):
        a = self.key_prefix.split(':')
        return self.redis_conn.get(':'.join([a[0], 'index', str(i)]))
class RedisImageIndexArrayDataset(RedisImageArrayDataset):
    def get_c(self):
        return self.classes_count
- The resize_imgs_redis function above is just an adaptation of fast.ai’s resize_imgs.
- The RedisNumpy class above is a workaround for the requirement that the ‘y’ (label) vector be loaded into memory. There is a key-naming convention implemented, which is basically: train:orig:number and test:orig:number, where train and test are configurable.
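To make the key convention concrete, here is a sketch of how one image/label pair could be written under it (the pred key layout and the helper itself are my assumptions, not code from the experiments):

def store_example(redis_conn, prefix, i, np_image, label):
    buf = io.BytesIO()
    np.savez_compressed(buf, np_image)  # compression is explained further below
    buf.seek(0)
    redis_conn.set(prefix + ':orig:' + str(i), buf.read())  # e.g. train:orig:42
    redis_conn.set(prefix + ':pred:' + str(i), int(label))  # label keys; layout assumed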
After adding a from_redis class method to the ImageClassifierData class, I could use any fast.ai code by calling this method instead of from_paths:
@classmethod
def from_redis(cls, path, redis_host, redis_port, train_key_prefix, valid_key_prefix,
               train_pred_prefix, valid_pred_prefix, train_keys_count, valid_keys_count,
               classes, test_key_prefix=None, test_pred_prefix=None, test_keys_count=0,
               bs=64, tfms=(None, None), num_workers=8):
    assert not (tfms[0] is None or tfms[1] is None), \
        "please provide transformations for your train and validation sets"
    # todo: change to kwargs
    datasets = [
        RedisImageIndexArrayDataset(redis_host, redis_port, train_key_prefix,
                                    train_keys_count, train_pred_prefix, len(classes), tfms[0]),  # train
        RedisImageIndexArrayDataset(redis_host, redis_port, valid_key_prefix,
                                    valid_keys_count, valid_pred_prefix, len(classes), tfms[1]),  # val
        RedisImageIndexArrayDataset(redis_host, redis_port, train_key_prefix,
                                    train_keys_count, train_pred_prefix, len(classes), tfms[1]),  # fix
        RedisImageIndexArrayDataset(redis_host, redis_port, valid_key_prefix,
                                    valid_keys_count, valid_pred_prefix, len(classes), tfms[0]),  # aux
    ]
    if test_key_prefix is not None:
        datasets += [
            RedisImageIndexArrayDataset(redis_host, redis_port, test_key_prefix,
                                        test_keys_count, test_pred_prefix, len(classes), tfms[1]),  # test
            RedisImageIndexArrayDataset(redis_host, redis_port, test_key_prefix,
                                        test_keys_count, test_pred_prefix, len(classes), tfms[0]),  # test_aux
        ]
    else:
        datasets += [None, None]
    return cls(path, datasets, bs, num_workers, classes=classes)
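With all of this in place, a hypothetical training run could look roughly like this (host, key prefixes, counts, and sz are placeholders, not values from the experiments):

sz = 64  # placeholder image size
tfms = tfms_from_model(resnet34, sz)
data = ImageClassifierData.from_redis(
    PATH, 'my-redis-host', 6379,
    'train:orig', 'valid:orig', 'train:pred', 'valid:pred',
    train_keys_count, valid_keys_count, classes, bs=64, tfms=tfms)
learn = ConvLearner.pretrained(resnet34, data)
learn.fit(1e-2, 1)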
And what about timing?
Converting files took 9796 seconds.
Saving files to Redis took an astonishing 9.5 seconds (!), which allows almost infinite parallelism.
An additional step is required after the concurrent processing to renumber the keys so that they are consecutive: 0.00085 seconds per image, which is quite negligible (a sketch follows below).
So, if we spin up a 64-core instance, we could potentially convert all the data in about 5 hours.
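The renumbering step could look roughly like this (my reconstruction, not code from the experiments; the separate source and destination prefixes are an assumption to avoid rename collisions):

def renumber_keys(redis_conn, src_prefix, dst_prefix):
    # concurrent workers leave gaps/offsets in the numbering;
    # rename every key into a dense 0..N-1 range under a fresh prefix
    keys = sorted(redis_conn.keys(src_prefix + ':*'))
    for new_i, old_key in enumerate(keys):
        redis_conn.rename(old_key, dst_prefix + ':' + str(new_i))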
How about read timing?
To measure time, I just modified the get_x functions in both the Redis and file-based classes and got the following results:
File: 0.02 seconds per image
Redis (with decompression): 0.013 seconds per image
If you wonder what decompression means and why converting takes longer for Redis: in order to save space, I had to compress the numpy arrays after applying drawing_to_np_prepare_data_raw:
compressed_image = io.BytesIO()
np.savez_compressed(compressed_image, drawing_to_np_prepare_data_raw(drawing))
compressed_image.seek(0)
This made the numpy arrays 109 times (!) smaller. However, it comes at the price of decompressing each time get_x is called:
def decompress_array(array):
    result = io.BytesIO(array)
    result.seek(0)
    return np.load(result)['arr_0']
This compression allows fitting all 50M images into about 110 GB of space in Redis; without it, the approach would not be practical.
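A quick round-trip sanity check of the compress/decompress pair (hypothetical data; random noise compresses far worse than the sparse line drawings, so don’t expect the 109x ratio here):

arr = np.random.randint(0, 255, (64, 64, 3), dtype=np.uint8)
buf = io.BytesIO()
np.savez_compressed(buf, arr)
buf.seek(0)
assert np.array_equal(arr, decompress_array(buf.read()))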
However, in the old fast.ai version, the data is transformed into an ArrayDataset in learn.precompute(…) and resides in memory afterwards. I didn’t find a way to avoid this and use the original data, but it would not be hard to implement.
A few other thoughts:
- Reading data from Redis (with decompression) takes 4.4 times longer than from memory (this could be optimized to 3.8 times by fetching minibatches instead of single images; see the MGET sketch after this list). However, Redis gives us almost infinite, easily scalable memory storage.
- If you wonder what Alternative 2 is: using pandas dataframes directly, which gives a couple of interesting insights as well, but it would make this post too long in my opinion.
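For the minibatch idea, here is a sketch of fetching a whole batch in one round trip with MGET (my assumption of how it could be done, not code from the experiments):

def get_batch(redis_conn, key_prefix, indices):
    keys = [key_prefix + ':' + str(i) for i in indices]
    values = redis_conn.mget(keys)  # one network round trip instead of len(indices)
    return [decompress_array(v) for v in values]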
I would appreciate any thoughts on this, and in particular:
- Could it be useful in general?
- Which use cases/benefits do you see in the Redis approach for you?
- Do you think it makes sense to add this functionality to the fast.ai mainline?
Thanks for reading!