Creating an out-of-memory tabular dataset

I’d like to create an out-of-memory version of TabularDataset and TabularDataBunch, probably using Dask DataFrames to hold the data rather than pandas DataFrames. It looks pretty straightforward so far, but I’m a bit unclear about a couple of things:

  1. I’m not sure what interface the eventual Learner requires the DaskDataset to have. I guess it needs a way of loading in batches, but that doesn’t seem to be provided by the TabularDataset; do I just need to override __len__ and __getitem__?

  2. The TabularDataset converts each dataset to tensors in full during its constructor. Since the idea of a DaskDataset is that it’s out-of-memory, this isn’t possible and we’ll have to do it for each batch. I can’t quite tell where this logic should live though! Is it as part of __getitem__?

I can create a PR on GitHub with my work so far if that would help. Thanks!


A dataset needs three things to work with fastai:

  • __len__
  • __getitem__
  • c for the number of classes

Preferably, it should inherit from the fastai DatasetBase but it’s not a must.

If you want to create the dataloaders yourself, you can then use the init method of DataBunch; otherwise DataBunch.create will take your datasets and build the dataloaders for you.
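For example, a minimal dataset satisfying those three requirements might look like the sketch below (names like xs, ys and classes are placeholders; you could inherit from the fastai DatasetBase instead of torch's Dataset):

import torch
from torch.utils.data import Dataset

class MinimalDataset(Dataset):
    "A sketch of a dataset exposing __len__, __getitem__ and c."
    def __init__(self, xs, ys, classes):
        self.xs, self.ys = xs, ys   # anything supporting integer indexing
        self.classes = classes      # list of class labels

    def __len__(self):
        return len(self.xs)

    def __getitem__(self, i):
        # return a single (input, target) pair as tensors
        return torch.as_tensor(self.xs[i]), torch.as_tensor(self.ys[i])

    @property
    def c(self):
        # number of classes, used by fastai when sizing the model head
        return len(self.classes)

Two such datasets can then be passed to DataBunch.create(train_ds, valid_ds) to get the dataloaders built for you.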

Thanks @sgugger! Do you have any thoughts on my second question?

Just to clarify - it’s c, not __c__.

Sorry, edited my first post.

I don’t know exactly what you want to do, but yes, the main thing your custom dataset has to implement is that function.

In case anyone was following this, it’s trickier than it sounds because Dask DataFrames don’t support selecting rows by position (there’s no row-wise .iloc). It’d need some sort of wrapper around a Dask DataFrame to get this working, from what I can tell.
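Roughly, the kind of wrapper I mean would map a global row index to a (partition, offset) pair and compute partitions lazily. An untested sketch (the caching only really helps if rows are accessed mostly sequentially):

import numpy as np
import dask.dataframe as dd

class DaskRowAccessor:
    "Untested sketch: emulate positional row access on a Dask DataFrame."
    def __init__(self, ddf: dd.DataFrame):
        self.ddf = ddf
        # partition lengths let us map a global row index to (partition, offset)
        lengths = ddf.map_partitions(len).compute()
        self.offsets = np.concatenate([[0], np.cumsum(lengths)])
        self._cached_idx, self._cached_part = None, None

    def __len__(self):
        return int(self.offsets[-1])

    def __getitem__(self, i):
        part_idx = int(np.searchsorted(self.offsets, i, side='right') - 1)
        if part_idx != self._cached_idx:
            # compute and cache one partition at a time
            self._cached_idx = part_idx
            self._cached_part = self.ddf.get_partition(part_idx).compute()
        return self._cached_part.iloc[int(i - self.offsets[part_idx])]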

If you’re interested, here is a custom dataset implementation for Quick Draw data. It’s a somewhat “hacky” way to implement a dataset because it doesn’t have all of the attributes the fastai code expects, and it will probably fail when some of the Learner methods are executed.

The code is not directly related to your question, but it might help you figure out how to implement something similar. In your case, you would inherit from the fastai base class mentioned above rather than from torch’s Dataset, and you wouldn’t need the decorator if your dataset is going to be part of the library.

# Assumed imports; read_parallel, to_pil_image, FloatOrInt, DEBUG and log are
# project-specific helpers not shown here.
import numpy as np
import feather
import torch.nn.functional as F
from pathlib import Path
from torch.utils.data import Dataset
from torchvision.transforms.functional import to_tensor
from fastai.vision import Image


def fastai_dataset(loss_func):
    """A class decorator to convert a custom dataset into a fastai-compatible version.

    The decorator attaches the properties fastai expects (`c` and `loss_func`)
    to the dataset class.
    """
    def class_wrapper(dataset_cls):

        def get_n_classes(self):
            return len(self.classes)

        def get_loss_func(self):
            return loss_func

        dataset_cls.c = property(get_n_classes)
        dataset_cls.loss_func = property(get_loss_func)
        return dataset_cls

    return class_wrapper


@fastai_dataset(F.cross_entropy)
class QuickDraw(Dataset):

    img_size = (256, 256)

    def __init__(self, root: Path, train: bool=True, take_subset: bool=True,
                 subset_size: FloatOrInt=1000, bg_color='white',
                 stroke_color='black', lw=2, use_cache: bool=True):

        subfolder = root/('train' if train else 'valid')
        cache_file = subfolder.parent / 'cache' / f'{subfolder.name}.feather'

        if use_cache and cache_file.exists():
            log.info('Reading cached data from %s', cache_file)
            # workaround to deal with pd.read_feather nthreads error
            cats_df = feather.read_dataframe(cache_file)

        else:
            log.info('Parsing CSV files from %s...', subfolder)
            subset_size = subset_size if take_subset else None
            n_jobs = 1 if DEBUG else None
            cats_df = read_parallel(subfolder.glob('*.csv'), subset_size, n_jobs)
            if train:
                cats_df = cats_df.sample(frac=1)
            cats_df.reset_index(drop=True, inplace=True)
            log.info('Done! Parsed files saved into cache file')
            cache_file.parent.mkdir(parents=True, exist_ok=True)
            cats_df.to_feather(cache_file)

        targets = cats_df.word.values
        classes = np.unique(targets)
        class2idx = {v: k for k, v in enumerate(classes)}
        labels = np.array([class2idx[c] for c in targets])

        self.root = root
        self.train = train
        self.bg_color = bg_color
        self.stroke_color = stroke_color
        self.lw = lw
        self.data = cats_df.points.values
        self.classes = classes
        self.class2idx = class2idx
        self.labels = labels
        self._cached_images = {}

    def __len__(self):
        return len(self.data)

    def __getitem__(self, item):
        points, target = self.data[item], self.labels[item]
        image = self.to_image_tensor(points)
        return image, target

    def to_image_tensor(self, points):
        img = to_pil_image(points, self.img_size, self.bg_color, self.stroke_color, self.lw)
        return Image(to_tensor(img))

Then, I was able to use my custom code with the library as usual:

train_ds = QuickDraw(PREPARED, train=True)
valid_ds = QuickDraw(PREPARED, train=False)
bunch = ImageDataBunch.create(
    train_ds, valid_ds, bs=bs, size=sz, ds_tfms=get_transforms())
bunch.normalize(imagenet_stats)
learn = create_cnn(bunch, models.resnet50, path='..')
learn.fit_one_cycle(n)

I ended up converting my data to zarr datasets (http://zarr.readthedocs.io/), after a brief foray with HDF5 and PyTables - these didn’t work because they weren’t thread/process safe. I have some generic-ish code, but it was written before I knew about the data block API so it probably isn’t compatible any more!

If there is a lot of interest in this I can try and adapt it for the latest version, but it may be better to wait until things are more stable.


Things should be stable now.


Being able to use out-of-memory datasets (or really any way to deal with large tabular datasets) is something I have a need for as well. My skills are moderately limited, but I’d be happy to try and help make this happen.


I can try and take a look at this soon - it’s been a couple of months since I looked at it though so it might need a fair bit of work. In case you want to look beforehand I can share some of what I did:

  • first attempted to use PyTables or h5py to read data from HDF5 files. This turned out not to work because they’re not compatible with the way PyTorch handles extra processes/threading (I can’t quite remember the details, but I was getting very hard-to-debug segfaults)
  • then moved to using zarr, which was a breeze in comparison. I had to do some preprocessing of the data from CSV to zarr tables. The final zarr table had two groups: ‘categorical’ and ‘numeric’. Each of those groups had a ‘columns’ array holding the column names, and a ‘data’ array holding the actual data. I created the ‘data’ arrays as empty, then iterated over chunks of the CSV data and appended the data from the relevant columns to each ‘data’ array.
  • when appending the categorical columns it’s important to convert the values to indices and have a mapping table somewhere. E.g. if your data contains a column ‘foo’ with values ‘Yes’ and ‘No’ you’d need to store something like "foo": {0: "Yes", 1: "No"}, perhaps in a separate ‘mappings’ key of the table.
  • I ended up using the very new pyarrow.csv.read_csv function to read the CSV files, and it was incredibly fast!

The hardest part about doing this generically over large data, assuming a CSV input, is that you have to supply the column types and categorical values in advance, or iterate over all of the data at least once to figure them out. After that it’s fairly easy to store all the required metadata in the zarr table and use it in future (i.e. in your DataBunch).

This code sample might be a bit more illustrative - assume that the definitions argument points to a JSONL file where each row contains an object identifying the name, type and (if necessary) codes of the variable.

import glob
import gzip
import logging
from pathlib import Path

import numpy as np
import pyarrow.csv
import ujson
import zarr
from numcodecs import Blosc

log = logging.getLogger(__name__)


def process_tabular(input: Path, definitions: Path):
    csvs = sorted(glob.glob(str(input / '*.csv.gz')))
    with gzip.open(csvs[0]) as infile:
        columns = infile.readline().decode('utf-8').strip().split(',')

    mappings = {}
    cat_cols = []
    cat_cols_set = set()
    cont_cols = []
    with open(definitions) as defsfile:
        log.info("Reading definitions")
        unique_columns = set(columns)
        for defn in defsfile:
            defn = ujson.loads(defn)
            name = defn['name']
            if defn['type'] == 'categorical':
                mappings[name] = {
                    code['value']: np.int64(i)
                    for i, code in enumerate(defn['codes'], 1)
                }
                cat_cols.append(name)
                cat_cols_set.add(name)
            elif defn['type'] == 'numeric':
                cont_cols.append(name)

    outfile = f'{input}.zarr'
    log.info(f'Opening zarr file at {outfile}')
    compressor = Blosc(cname='blosclz', clevel=7)
    t = zarr.open(outfile, 'w')

    cats_group = t.create_group('categorical')
    nums_group = t.create_group('numeric')
    cats_group.array('columns', np.array(cat_cols), compressor=compressor)
    cats_array = cats_group.create(
        'data',
        shape=(0, len(cat_cols)),
        chunks=(2, None),
        compressor=compressor,
    )
    nums_group.array('columns', np.array(cont_cols), compressor=compressor)
    nums_array = nums_group.create(
        'data',
        shape=(0, len(cont_cols)),
        chunks=(2, None),
        compressor=compressor,
    )

    # we should also store mappings here somehow

    for file in csvs:
        log.info(f'Processing {file}')
        df = pyarrow.csv.read_csv(file).to_pandas()
        cats = np.stack([
            df.loc[:, col].map(mappings[col]).fillna(0).astype(np.int64)
            for col in cat_cols
        ], 1)
        nums = df.loc[:, cont_cols].values
        cats_array.append(cats)
        nums_array.append(nums)
    return t
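One possible way to fill in the “we should also store mappings here somehow” comment (a sketch, assuming zarr’s JSON-backed attrs; these lines would sit inside process_tabular where that comment is):

    # zarr attributes are stored as JSON, so numpy scalars need converting
    # to plain Python types before being attached to the store
    t.attrs['mappings'] = {
        col: {str(value): int(index) for value, index in mapping.items()}
        for col, mapping in mappings.items()
    }
    t.attrs['cat_cols'] = cat_cols
    t.attrs['cont_cols'] = cont_cols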

I had a crack at implementing an out of memory dataset using Dask, but I’m not 100% sure how the data block API works and how I should be subclassing the TabularList. See below for what I have so far. I’m getting an error:

TypeError: __init__() got an unexpected keyword argument 'inner_df'

What I’m attempting to do is compute a new dask partition as necessary and then treat the resulting dataframe as the inner_df of the TabularList. I’m assuming that items will be accessed sequentially so no shuffling.

Can anyone weigh in on this?

class DaskTabularList(TabularList):

    @classmethod
    def from_dask(cls, df:DataFrame, cat_names:OptStrList=None, cont_names:OptStrList=None, procs=None, **kwargs)->'ItemList':
        "Get the list of inputs in the `col` of `path/csv_name`."
        cls.inner_dd = df
        cls.part_idx = 0
        current_partition = df.get_partition(cls.part_idx).compute()
        cls.len_part = len(current_partition)
        return cls(items=range(len(df)), cat_names=cat_names, cont_names=cont_names, procs=procs, inner_df=current_partition, **kwargs)

    def get(self, o):

        if o >= (self.part_idx + 1) * self.len_part:
            # Need to load a new partition
            self.part_idx += 1
            if self.part_idx >= self.inner_dd.npartitions: self.part_idx = 0
            self.inner_df = self.inner_dd.get_partition(self.part_idx).compute()

        # Fetch from index in this partition
        o %= self.len_part

        if not self.preprocessed: return self.inner_df.iloc[o] if hasattr(self, 'inner_df') else self.items[o]
        codes = [] if self.codes is None else self.codes[o]
        conts = [] if self.conts is None else self.conts[o]
        return self._item_cls(codes, conts, self.classes, self.col_names)

Can you tell if there is any reason the datasets need to be converted to tensors during the constructor (as opposed to lazily)? I’m working with a fastai image classification dataset and am running out of memory on my 32GB machine. It would be great to harness dask-kubernetes to make my dataset operations fly, but I’m nervous about the level of complexity that updating the fastai library to use Dask might entail…

I think it could be done lazily, provided it’s very fast to do. The datasets need fast random access to each row (or training example), which isn’t really possible using CSV files since you’d need to seek and parse the row each time. Converting the datasets to a binary format like zarr works well because it supports random access as numpy arrays, which are trivial to convert to tensors.

(Sorry for the extremely delayed response, hopefully this is still useful!)
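To make that concrete, reading the zarr store described above back into a dataset could look roughly like this (a sketch only; it assumes the ‘categorical’/‘numeric’ group layout from the earlier post, and that you handle labels and classes yourself):

import torch
import zarr
from torch.utils.data import Dataset

class ZarrTabularDataset(Dataset):
    "Sketch: out-of-memory random access into categorical/numeric zarr arrays."
    def __init__(self, path, labels, classes):
        store = zarr.open(path, 'r')
        self.cats = store['categorical/data']   # int64 category codes
        self.conts = store['numeric/data']      # continuous values
        self.labels = labels                    # e.g. a numpy array of targets
        self.classes = classes

    def __len__(self):
        return self.cats.shape[0]

    def __getitem__(self, i):
        # zarr reads only the chunks covering row i, so memory use stays small
        cats = torch.as_tensor(self.cats[i], dtype=torch.long)
        conts = torch.as_tensor(self.conts[i], dtype=torch.float)
        return (cats, conts), torch.as_tensor(self.labels[i])

    @property
    def c(self):
        return len(self.classes)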

Any update on the performance of the above? I can’t seem to find DaskTabularList in the latest version of fastai


Has anyone tried IterableDataset in PyTorch? It was added in 1.2. I’ve not tried it myself.
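For reference, a minimal sketch of what that might look like for tabular data (assuming pandas chunked CSV reading; the column names, chunk size and batching are placeholders):

import pandas as pd
import torch
from torch.utils.data import IterableDataset, DataLoader

class CsvIterableDataset(IterableDataset):
    "Sketch: stream examples from a large CSV without loading it all into memory."
    def __init__(self, csv_path, cont_names, label_name, chunksize=10_000):
        self.csv_path = csv_path
        self.cont_names = cont_names
        self.label_name = label_name
        self.chunksize = chunksize

    def __iter__(self):
        # read the file in chunks and yield one example at a time
        for chunk in pd.read_csv(self.csv_path, chunksize=self.chunksize):
            xs = torch.as_tensor(chunk[self.cont_names].values, dtype=torch.float)
            ys = torch.as_tensor(chunk[self.label_name].values)
            yield from zip(xs, ys)

# e.g. dl = DataLoader(CsvIterableDataset('data.csv', ['a', 'b'], 'target'), batch_size=64)

Note that with num_workers > 0 each worker gets its own copy of the iterator, so you’d need to shard the file between workers (e.g. using torch.utils.data.get_worker_info) to avoid duplicated examples.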