When training with a lot of data, being able to partition the train set into epochs of arbitrary size can be quite useful (for instance, for finer-grained training monitoring, for saving models more often, or when using reduce-LR / early-stopping callbacks).
I have a manual workaround that works well, but I'm not sure how (or whether) it should be integrated into the library. Here is the code I use:
```python
# Assumes the usual fastai v1 names (DeviceDataLoader, data_collate, defaults,
# listify, PathOrStr, ...) are already in scope, since this is a modified copy
# of the library's DataBunch code.
import numpy as np
from torch.utils.data import DataLoader, Sampler, BatchSampler

# https://stackoverflow.com/a/312464/1105837
def chunks(l, n):
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

class RandomSamplerWithEpochSize(Sampler):
    """Yields epochs of a specified size. Iterates over all examples in a data_source in random
    order. Ensures (nearly) all examples have been trained on before beginning the next pass
    over the data_source - drops the last chunk, which would likely be smaller than epoch_size.
    """
    def __init__(self, data_source, epoch_size):
        self.n = len(data_source)
        self.epoch_size = epoch_size
        self._epochs = []

    def __iter__(self):
        return iter(self.next_epoch)

    @property
    def next_epoch(self):
        if len(self._epochs) == 0: self.generate_epochs()
        return self._epochs.pop()

    def generate_epochs(self):
        # Shuffle all indices once, then split them into epoch_size-sized chunks,
        # dropping the final (smaller) chunk.
        idxs = [i for i in range(self.n)]
        np.random.shuffle(idxs)
        self._epochs = list(chunks(idxs, self.epoch_size))[:-1]

    def __len__(self):
        return self.epoch_size
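# Worked example (hypothetical numbers): with len(data_source) == 25_000 and
# epoch_size == 10_000, generate_epochs shuffles the 25_000 indices and chunks them
# into pieces of sizes [10_000, 10_000, 5_000]; the trailing 5_000-index chunk is
# dropped, so the sampler serves two fixed-size "epochs" before reshuffling.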

class DataBunch():
    "Bind `train_dl`, `valid_dl` and `test_dl` to `device`. tfms are DL tfms (normalize). `path` is for models."
    def __init__(self, train_dl:DataLoader, valid_dl:DataLoader, test_dl:Optional[DataLoader]=None,
                 device:torch.device=None, tfms:Optional[Collection[Callable]]=None, path:PathOrStr='.',
                 collate_fn:Callable=data_collate):
        "Bind `train_dl`, `valid_dl` and `test_dl` to `device`. tfms are DL tfms (normalize). `path` is for models."
        self.tfms = listify(tfms)
        self.device = defaults.device if device is None else device
        self.train_dl = DeviceDataLoader(train_dl, self.device, self.tfms, collate_fn)
        self.valid_dl = DeviceDataLoader(valid_dl, self.device, self.tfms, collate_fn)
        self.test_dl  = DeviceDataLoader(test_dl,  self.device, self.tfms, collate_fn) if test_dl else None
        self.path = Path(path)
    @classmethod
    def create(cls, train_ds:Dataset, valid_ds:Dataset, test_ds:Dataset=None, path:PathOrStr='.', bs:int=64,
               num_workers:int=defaults.cpus, tfms:Optional[Collection[Callable]]=None, device:torch.device=None,
               collate_fn:Callable=data_collate, epoch_size:int=10_000)->'DataBunch':
        "`DataBunch` factory. `bs` batch size, `ds_tfms` for `Dataset`, `tfms` for `DataLoader`."
        datasets = [train_ds, valid_ds]
        if test_ds is not None: datasets.append(test_ds)
        dls = [DataLoader(*o, num_workers=num_workers) for o in
               zip(datasets, (bs, bs*2, bs*2), (True, False, False))]
        # Replace the default (shuffled) train DataLoader with one driven by the fixed-epoch-size
        # sampler; batch_sampler is mutually exclusive with batch_size/shuffle, hence the rebuild.
        dls[0] = DataLoader(train_ds, num_workers=num_workers,
                            batch_sampler=BatchSampler(RandomSamplerWithEpochSize(train_ds, epoch_size), bs, False))
        return cls(*dls, path=path, device=device, tfms=tfms, collate_fn=collate_fn)
    def __getattr__(self, k:str)->Any: return getattr(self.train_ds, k)

    def holdout(self, is_test:bool=False)->DeviceDataLoader:
        "Returns correct holdout `DataLoader` for test vs validation (`is_test`)."
        return self.test_dl if is_test else self.valid_dl

    def add_tfm(self, tfm:Callable)->None:
        self.train_dl.add_tfm(tfm)
        self.valid_dl.add_tfm(tfm)
        if self.test_dl: self.test_dl.add_tfm(tfm)

    @property
    def train_ds(self)->Dataset: return self.train_dl.dl.dataset

    @property
    def valid_ds(self)->Dataset: return self.valid_dl.dl.dataset

    @property
    def loss_func(self)->Callable: return self.train_ds.loss_func
```
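For context, this is roughly how it gets used (a sketch: `train_ds`, `valid_ds` and `model` stand in for whatever datasets and module you already have, and `Learner`/`fit` are the standard fastai ones):

```python
# Hypothetical usage: fix an "epoch" at 10_000 examples regardless of dataset size.
data = DataBunch.create(train_ds, valid_ds, bs=64, epoch_size=10_000)
learn = Learner(data, model)
# Each "epoch" now covers 10_000 examples (ceil(10_000 / 64) batches), so model
# saving, LR reduction and early stopping all react at that granularity.
learn.fit(10)
```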
This is a bit rough around the edges (it drops the last chunk so that I don't have to deal with a smaller final epoch). The same functionality could be arrived at by passing a custom BatchSampler into DataBunch.create, but I'm not sure whether adding that is a good idea. It would also go against the nice zipping mechanism we have for creating the dataloaders.
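To make that alternative concrete, here is a sketch of what it could look like, written as a standalone factory rather than a change to `create` (`train_batch_sampler` is a hypothetical parameter, nothing in the library currently accepts it):

```python
# Sketch: accept an optional BatchSampler for the training set and fall back to the
# current shuffled DataLoader when it isn't given.
def create_with_sampler(train_ds:Dataset, valid_ds:Dataset, bs:int=64,
                        num_workers:int=defaults.cpus,
                        train_batch_sampler:Optional[BatchSampler]=None,
                        **kwargs)->'DataBunch':
    if train_batch_sampler is None:
        train_dl = DataLoader(train_ds, bs, shuffle=True, num_workers=num_workers)
    else:
        # batch_sampler is mutually exclusive with batch_size/shuffle/drop_last in
        # PyTorch, which is why it can't slot into the zip over (bs, bs*2, bs*2).
        train_dl = DataLoader(train_ds, num_workers=num_workers,
                              batch_sampler=train_batch_sampler)
    valid_dl = DataLoader(valid_ds, bs * 2, shuffle=False, num_workers=num_workers)
    return DataBunch(train_dl, valid_dl, **kwargs)
```

Usage would then be something like `create_with_sampler(train_ds, valid_ds, train_batch_sampler=BatchSampler(RandomSamplerWithEpochSize(train_ds, 10_000), 64, False))`, keeping the epoch-size logic entirely outside the library.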