Fastai v2 transforms / pipeline / data blocks

Alternatively, it may be possible to create an alternating loading mechanism using Nvidia DALI, although we haven’t really done much with that yet.

This got me curious and I played with it a bit. I took a simple image processing pipeline like this:

dsrc = DataSource(items, tfms)
tdl = TfmdDL(dsrc, bs=10, shuffle=True,
             after_item=[ Resize(224, method = ResizeMethod.Crop), ToTensor],
             after_batch=[Cuda, ByteToFloatTensor, Normalize(tmeans,tstds)],
             num_workers=8)

And created a DALI pipeline that does the same thing:

class DALIPipeline(Pipeline):
    def __init__(self, batch_size, num_threads, device_id):
        super(DALIPipeline, self).__init__(batch_size, num_threads, device_id)
        self.input = ops.FileReader(file_root = image_dir,random_shuffle=True)        
        self.tfms = compose(
            ops.ImageDecoder(device = "mixed"),
            ops.Resize(device = "gpu", resize_shorter = 224),
            ops.CropMirrorNormalize(device = "gpu", crop = (224, 224), mean = means, std = stds)
        )
    def define_graph(self):
        images, labels = self.input(name="Reader")
        return self.tfms(images), labels
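
The `compose` helper used here isn't a DALI built-in; I'm assuming it just chains single-argument callables left to right, something like this minimal sketch:

```python
def compose(*fns):
    # chain single-argument callables left to right:
    # compose(f, g)(x) == g(f(x))
    def _inner(x):
        for f in fns:
            x = f(x)
        return x
    return _inner

# e.g. compose(lambda x: x + 1, lambda x: x * 2)(3) -> 8
```

With DALI operators, each step is called on the previous step's output node, so listing them in pipeline order reads naturally.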

The results are encouraging performance-wise: I got a 5x speed improvement (4 s vs 20 s for 20k images). Several things contribute to that:

  • Transforms implemented in C++, including their custom JPEG decoder
  • More work is done on the GPU, in this case resizing and cropping the images.
  • The pipeline prefetches batches, which lets the CPU and GPU work overlap.

The downside is flexibility: if we want to do something not covered by the built-in operators, we have to either implement it in C++, build it, and link it as a plugin, or use the [Torch]PythonFunction operator. However, these are limited at the moment: they can't be used in exec_pipelined mode and, as far as I can tell, can't operate on the GPU at all.

This is again the same pipeline, but using Python to load files and assign labels, and DALI operators to decode, resize and normalize the images.

class MixedPipeline(Pipeline):
    def __init__(self, np_items, batch_size, num_threads, device_id):
        super(MixedPipeline, self).__init__(batch_size, num_threads, device_id, exec_async=False, exec_pipelined=False)
        self.input_iter = iter(DataLoader(np_items, bs=batch_size, create_batch=noop, shuffle=True, num_workers = num_threads))
        self.path_input = ops.ExternalSource()
        self.y_tfms = compose(
            ops.PythonFunction(extract_label), 
            ops.PythonFunction(categorize)
        )
        self.x_tfms = compose(
            ops.PythonFunction(read_file),
            ops.ImageDecoder(device = "mixed", output_type = types.RGB),
            ops.Resize(device = "gpu", resize_shorter = 224),
            ops.CropMirrorNormalize(device = "gpu", crop = (224, 224), mean = means, std = stds)
        )
    def define_graph(self):
        self.paths = self.path_input()
        return (self.x_tfms(self.paths), self.y_tfms(self.paths))

    def iter_setup(self):
        self.feed_input(self.paths, next(self.input_iter))

This gave me a smaller, but still respectable 2x speed increase.
The whole notebook is here:


I think the trick is to feed a fastai dataloader with the output of the DALI pipeline. fastai v2 has lots of GPU accelerated transforms already. If you give it a go, let me know how it works out! (And feel free to ask if you have any questions or hit any obstructions.)
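
One hypothetical way to wire that up (a sketch, not the actual fastai or DALI API; `BatchSourceWrapper` and the `run()` contract are my assumptions) is a thin adapter that exposes any pipeline object with a `run()` method as a plain batch iterator:

```python
class BatchSourceWrapper:
    """Hypothetical adapter: expose any pipeline with a run() method
    (e.g. a DALI pipeline returning device tensors) as a plain
    batch iterator that downstream code can consume."""
    def __init__(self, pipe, n_batches):
        self.pipe, self.n = pipe, n_batches
    def __iter__(self):
        # pull one batch per step from the underlying pipeline
        for _ in range(self.n):
            yield self.pipe.run()
    def __len__(self):
        return self.n
```

Since DALI already leaves the data on the device, fastai's GPU-side `after_batch` transforms could then be applied to each yielded batch.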

Sure, you can do that. My thinking was to put as much work as possible inside DALI to take advantage of their optimizations, but then again maybe once the data is on the GPU it doesn’t matter that much.
I see there is a PR that seems to add GPU operators so I can try to compare both approaches once that works.


I’m exploring the medium-level API – DataSource, DataBunch.
I'm using the Imagenette dataset.
Please advise: how can I grab a part of the training set for faster training?

Not directly answering your question, but Imagenette shouldn't take very long to train. A single epoch takes 1-2 min with fastai v1, and is probably faster with v2.

In case someone else needs it, here is my own solution:

items = get_image_files(source)        
items_idx = torch.randperm(len(items)) # randomly sort indexes
items = items[items_idx[:1000]]        # grab part of the items

A more elegant way, which I have yet to implement, would be to use a callback to stop after a few batches.
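
That callback idea can be sketched without fastai. In fastai v2 the analogous mechanism would be a Callback raising CancelEpochException; here is a standalone toy version (all names hypothetical) with a minimal loop standing in for the training loop:

```python
class CancelEpochException(Exception):
    """Signal that the current epoch should stop early."""
    pass

class StopAfterNBatches:
    # hypothetical callback: cancel the epoch once n batches have run
    def __init__(self, n):
        self.n, self.seen = n, 0
    def after_batch(self):
        self.seen += 1
        if self.seen >= self.n:
            raise CancelEpochException()

def run_epoch(batches, cb):
    # stand-in for a training loop that honors the cancel exception
    processed = []
    try:
        for b in batches:
            processed.append(b)  # stand-in for the actual training step
            cb.after_batch()
    except CancelEpochException:
        pass
    return processed

print(len(run_epoch(range(100), StopAfterNBatches(5))))  # 5
```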

I’d like to load an image and use one channel as input and the others as output.
I use the DataBlock API but see a few issues:

  • If I use get_x and get_y, I will have to load each image twice and will lose time (even more so since I also want to convert the image to another space)
  • If I load the image with get_items, I’m going to use too much memory (large dataset so need to be loaded on the fly)

It seems that I could achieve it with a Transform similar to this example but there are no details on how to turn this specific example into a DataSource.

Here is what I tried:

class PetTfm(Transform):
    def __init__(self, vocab, o2i, lblr): self.vocab,self.o2i,self.lblr = vocab,o2i,lblr
    def encodes(self, o): return resized_image(o), self.o2i[self.lblr(o)]
    def decodes(self, x): return TitledImage(x[0],self.vocab[x[1]])

labeller = RegexLabeller(pat = r'/([^/]+)_\d+.jpg$')
vals = list(map(labeller, items[split_idx[0]]))
vocab,o2i = uniqueify(vals, sort=True, bidir=True)
pets = PetTfm(vocab,o2i,labeller)

dsrc = DataSource(items, tfms=pets)
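
For reference, `uniqueify` builds the vocabulary and the reverse index `o2i`. A minimal stand-in (my assumption of its behavior, not fastai's actual implementation) would be:

```python
def uniqueify(vals, sort=False, bidir=False):
    # unique values in first-seen order, optionally sorted
    vocab = list(dict.fromkeys(vals))
    if sort:
        vocab.sort()
    if not bidir:
        return vocab
    # also return the reverse mapping: value -> index
    return vocab, {v: i for i, v in enumerate(vocab)}
```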

The issue is that I don’t get a tuple at this point so I am not sure how to compose it into a databunch and add more transforms. Is there any other way?

You should use a TfmdList, not a DataSource, as your transform already returns a tuple. A TfmdList can be converted to a DataBunch.


Is it possible that FilteredBase.databunch should also pass after_item?

Based on my last example.

dsrc = TfmdList(items, tfms=pets)

dsrc.tfms.show(dsrc[0])
> displays correctly an image

dsrc.decode(dsrc[0]).show()
> displays correctly an image

db = dsrc.databunch()
batch = db.show_batch()
> AttributeError: 'Tensor' object has no attribute 'show'

db.after_item.decode
> <bound method Pipeline.decode of Pipeline: (#1) [Transform: False (object,object) -> noop ]>

I feel like the method data.core._decode_batch is supposed to decode the input (probably from my original Transform PetTfm) except that TfmdDL.after_item.decode is a noop.

Would you have a similar example of going from a custom TfmdList (based on one transform returning both inputs and outputs) to a Databunch?

You can pass after_item, after_batch and before_batch to your call to .databunch.

The issue is that the decoder needs to be called only for displaying the data.
It is called properly with TfmdList but not when I create a DataBunch.

I documented how I tested it in this notebook.

If I can make it work I’ll be happy to add it in the Pet Tutorial for future reference.

Hi

So I have been wondering how to run fastai2 on video data, or data with a variable number of 2D slices. That is, each x is a set of 2D slices composing a 3D volume, and the number of 2D slices may vary between two distinct x's (i.e. one video may have more frames than another since it's a longer shot).

It seemed that the middle-level API is the right place to start. I successfully got a pipeline working, but I'm having issues creating a dataset. It's my first time working with the API, so it might be something obvious I'm missing.

As a toy example, I artificially aggregated paths into bags (comparable to video-frame paths saved on disk), with a binary label that is True if the bag contains more 3s than 7s.

Dynamic image bags: [image omitted]

When I run the pipe, the indexing works; however, when attempting to create the dataset, the i variable is for some reason a path. You can see this from the prints of i.

Any help would be much appreciated.

How can I get a Dataset from the SiamesePair pipeline example in https://github.com/fastai/fastai2/blob/master/nbs/10_tutorial.pets.ipynb?

I tried:

OpenAndResize = TupleTransform(resized_image)
labeller = RegexLabeller(pat = r'/([^/]+)_\d+.jpg$')
sp = SiamesePair(items, items.map(labeller))
pipe = Pipeline([sp, OpenAndResize], as_item=True)
dsets = Datasets(items, pipe)
t = dsets[0]
type(t[0]),type(t[1])

getting error:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-66-cab6cdc85da8> in <module>
      4 pipe = Pipeline([sp, OpenAndResize], as_item=True)
      5 dsets = Datasets(items, pipe)
----> 6 t = dsets[0]
      7 type(t[0]),type(t[1])

~/Dev/fastai2/fastai2/data/core.py in __getitem__(self, it)
    255 
    256     def __getitem__(self, it):
--> 257         res = tuple([tl[it] for tl in self.tls])
    258         return res if is_indexer(it) else list(zip(*res))
    259 

~/Dev/fastai2/fastai2/data/core.py in <listcomp>(.0)
    255 
    256     def __getitem__(self, it):
--> 257         res = tuple([tl[it] for tl in self.tls])
    258         return res if is_indexer(it) else list(zip(*res))
    259 

~/Dev/fastai2/fastai2/data/core.py in __getitem__(self, idx)
    232         res = super().__getitem__(idx)
    233         if self._after_item is None: return res
--> 234         return self._after_item(res) if is_indexer(idx) else res.map(self._after_item)
    235 
    236 # Cell

~/Dev/fastai2/fastai2/data/core.py in _after_item(self, o)
    196     def _new(self, items, **kwargs): return super()._new(items, tfms=self.tfms, do_setup=False, types=self.types, **kwargs)
    197     def subset(self, i): return self._new(self._get(self.splits[i]), split_idx=i)
--> 198     def _after_item(self, o): return self.tfms(o)
    199     def __repr__(self): return f"{self.__class__.__name__}: {self.items}\ntfms - {self.tfms.fs}"
    200     def __iter__(self): return (self[i] for i in range(len(self)))

~/Dev/fastcore/fastcore/transform.py in __call__(self, o)
    186         self.fs.append(t)
    187 
--> 188     def __call__(self, o): return compose_tfms(o, tfms=self.fs, split_idx=self.split_idx)
    189     def __repr__(self): return f"Pipeline: {' -> '.join([f.name for f in self.fs if f.name != 'noop'])}"
    190     def __getitem__(self,i): return self.fs[i]

~/Dev/fastcore/fastcore/transform.py in compose_tfms(x, tfms, is_enc, reverse, **kwargs)
    134     for f in tfms:
    135         if not is_enc: f = f.decode
--> 136         x = f(x, **kwargs)
    137     return x
    138 

~/Dev/fastcore/fastcore/transform.py in __call__(self, x, **kwargs)
     69     @property
     70     def name(self): return getattr(self, '_name', _get_name(self))
---> 71     def __call__(self, x, **kwargs): return self._call('encodes', x, **kwargs)
     72     def decode  (self, x, **kwargs): return self._call('decodes', x, **kwargs)
     73     def __repr__(self): return f'{self.name}: {self.use_as_item} {self.encodes} {self.decodes}'

~/Dev/fastcore/fastcore/transform.py in _call(self, fn, x, split_idx, **kwargs)
     80         if split_idx!=self.split_idx and self.split_idx is not None: return x
     81         f = getattr(self, fn)
---> 82         if self.use_as_item or not is_listy(x): return self._do_call(f, x, **kwargs)
     83         res = tuple(self._do_call(f, x_, **kwargs) for x_ in x)
     84         return retain_type(res, x)

~/Dev/fastcore/fastcore/transform.py in _do_call(self, f, x, **kwargs)
     85 
     86     def _do_call(self, f, x, **kwargs):
---> 87         return x if f is None else retain_type(f(x, **kwargs), x, f.returns_none(x))
     88 
     89 add_docs(Transform, decode="Delegate to `decodes` to undo transform", setup="Delegate to `setups` to set up transform")

~/Dev/fastcore/fastcore/dispatch.py in __call__(self, *args, **kwargs)
     96         if not f: return args[0]
     97         if self.inst is not None: f = MethodType(f, self.inst)
---> 98         return f(*args, **kwargs)
     99 
    100     def __get__(self, inst, owner):

<ipython-input-63-605ff57d4e17> in encodes(self, i)
     11         othercls = self.clsmap[self.labels[i]] if random.random()>0.5 else self.idxs
     12         otherit = random.choice(othercls)
---> 13         return SiameseImage(self.items[i], self.items[otherit], self.labels[otherit]==self.labels[i])

~/Dev/fastcore/fastcore/foundation.py in __getitem__(self, idx)
    314     def _xtra(self): return None
    315     def _new(self, items, *args, **kwargs): return type(self)(items, *args, use_list=None, **kwargs)
--> 316     def __getitem__(self, idx): return self._get(idx) if is_indexer(idx) else L(self._get(idx), use_list=None)
    317     def copy(self): return self._new(self.items.copy())
    318 

~/Dev/fastcore/fastcore/foundation.py in _get(self, i)
    319     def _get(self, i):
    320         if is_indexer(i) or isinstance(i,slice): return getattr(self.items,'iloc',self.items)[i]
--> 321         i = mask2idxs(i)
    322         return (self.items.iloc[list(i)] if hasattr(self.items,'iloc')
    323                 else self.items.__array__()[(i,)] if hasattr(self.items,'__array__')

~/Dev/fastcore/fastcore/foundation.py in mask2idxs(mask)
    253     "Convert bool mask or index list to index `L`"
    254     if isinstance(mask,slice): return mask
--> 255     mask = list(mask)
    256     if len(mask)==0: return []
    257     it = mask[0]

TypeError: 'PosixPath' object is not iterable

Just tried again; I realized I might not have initialized the tfms correctly, but I'm still getting an error:

tfms = [[sp, OpenAndResize], [labeller, Categorize]]
dsets = Datasets(items, tfms, verbose=True)
t = dsets[0]
print(type(t[0]),type(t[1]))
x,y = dsets.decode(t)
print(x.shape,y)
dsets.show(t);

What's the right way to get a Siamese dataset following the tutorial notebook on pets?

I’m trying to replicate some code I have in Fastai V1, in which images are composed of 4 channels (R,G,B & Y). These images come from Kaggle’s Protein Atlas challenge. In the data directory there are 4 PNG images, one for each channel. Given the name of the image, I want to load each of these and form a single 4-channel image.

I’m just getting started with V2 and am struggling to get a Dataset working for this. What I have so far is shown below. Here, ‘open_4_channel’ takes a data record and gets the image name as its first item; it then forms paths for each of the 4 possible images and loads them, before finally returning a TensorImage, which has shape [4,512,512].

‘protein_labels’ takes the second item of the data record, which contains a list of space-separated numbers representing the multi-label categories.

def open_4_channel(x):                
    fname = data_path/'train'/f'{x[0]}'
    fname = str(fname)
    colors = ['red','green','blue','yellow']
    flags = cv2.IMREAD_GRAYSCALE          
    img = [cv2.imread(fname+'_'+color+'.png', flags).astype(np.float32)/255 for color in colors]    
    x = np.stack(img, axis=-1)           
    return TensorImage(pil2tensor(x, np.float32).float())

def protein_labels(x):
    y = x[1].split(' ')

I then use these to form the transforms and create a data set from these, supplying the DataFrame ‘train_df’:

tfms = [[open_4_channel],[protein_labels]]
dsets = Datasets(train_df, tfms)
show_at(dsets.train, 0)

When I call ‘show_at’, as shown above, everything works fine and the first image from the dataset is displayed. However, if I then try to create a data loader from this I get an error:

dls = dsets.dataloaders(bs=4)

TypeError: default_collate: batch must contain tensors, numpy arrays, numbers, dicts or lists; found <class ‘NoneType’>

I presume I’m doing something basic wrong (for example, is it OK just to use functions like this in the transforms list?), but I haven’t found a way to load these 4-channel images, either with Datasets or with DataBlocks. So if anyone could point me in the right direction it would be much appreciated.

I’m confused on applying Transform on tuples (which I use in after_batch for dataloaders).

Sometimes I just add as_item=False, sometimes I use TupleTransform and sometimes I need both.

Here is a confusing example with IntToFloatTensor:

x = (TensorImage(1),TensorImage(2))

with call

Just use as_item=False

IntToFloatTensor()(x)
>> (TensorImage(1), TensorImage(2))

IntToFloatTensor(as_item=False)(x)
>> (TensorImage(0.0039), TensorImage(0.0078))

with encodes

Use TupleTransform(IntToFloatTensor(as_item=False))

IntToFloatTensor().encodes(x)
>> (TensorImage(1), TensorImage(2))

IntToFloatTensor(as_item=False).encodes(x)
>> (TensorImage(1), TensorImage(2))

TupleTransform(IntToFloatTensor()).encodes(x)
>> (TensorImage(1), TensorImage(2))

TupleTransform(IntToFloatTensor(as_item=False)).encodes(x)
>>  (TensorImage(0.0039), TensorImage(0.0078))
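
The pattern becomes less surprising with a plain-Python sketch of the dispatch logic (a simplified stand-in mirroring the fastcore code visible in the traceback posted earlier in this thread, not the real Transform class):

```python
def is_listy(x):
    return isinstance(x, (list, tuple))

class MiniTransform:
    """Simplified stand-in for fastcore's Transform dispatch."""
    def __init__(self, as_item=True):
        self.use_as_item = as_item
    def encodes(self, x):
        # stand-in for TypeDispatch: only ints match; anything else
        # falls through unchanged (like `if not f: return args[0]`)
        return x / 255 if isinstance(x, int) else x
    def __call__(self, x):
        # mirrors Transform._call: map over a tuple only when
        # use_as_item is False
        if self.use_as_item or not is_listy(x):
            return self.encodes(x)
        return tuple(self.encodes(x_) for x_ in x)

print(MiniTransform()((1, 2)))               # tuple falls through unchanged
print(MiniTransform(as_item=False)((1, 2)))  # mapped: each int is scaled
```

Calling `encodes` directly always skips the tuple-mapping step, which is why only `__call__` (or wrapping in a tuple-aware transform) produces the scaled values.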

Note that encodes is not supposed to be called by the user, so the inconsistent behavior there is not something we will fix. You’re supposed to call __call__ or encode.

I don’t actually call these methods directly. I was just using them to debug my dataloader, which works only when I pass after_batch=[TupleTransform(IntToFloatTensor(as_item=False))].

@sgugger I created a minimal example to explain better my difficulty.

# 2 items with 2 tensors each
items = (TensorImage(1),TensorImage(2)), (TensorImage(3),TensorImage(4))

# create a dataset
dsrc = Datasets(items, tfms=[[None], [lambda x:TensorCategory(0)]])

# create a dataloader
dls = dsrc.dataloaders(bs=1)

The first issue is that this returns plain tensors instead of TensorImage, since internally retain_type only preserves the type of the tuple itself (not the types of its contents).

This is solved with the following “hack”:

class myTuple(Tuple):
    def __new__(cls, x=None, *rest):
        x = TensorImage(x[0]), TensorImage(x[1])
        return super().__new__(cls, x)
    
class keepType(Transform):
    def encodes(self, x): return myTuple(x)

Then I can use my Transform to preserve the correct types.

# use myTransform to retain tuple content type
dsrc = Datasets(items, tfms=[[keepType], [lambda x:TensorCategory(0)]])

# create a dataloader
dls = dsrc.dataloaders(bs=1, after_batch=[TupleTransform(IntToFloatTensor(as_item=False))])

My main confusion is on the second issue and the fact that I have to do TupleTransform(IntToFloatTensor(as_item=False)) to make the transform work.

Here is an alternative method. I can add a method for myTuple

@IntToFloatTensor
def encodes(self, o:myTuple):
    return [self.encodes(t) for t in o]

Both methods work, but both look very “hacky” to me, so I’m concerned they could become unsupported. The first one looks cleaner, but I don’t understand why I have to use both TupleTransform and as_item=False.