How to create a callback using torch.multiprocessing (TPU)

Yeah, but from what I can see the stuff you need for threading is basically a subset of multiprocessing. Multiprocessing just creates multiple train/test loops in separate processes.

I know. The API is pretty similar. I just wanted to point that out as one potential caveat.

AFAICT the only thing that would need to change for multiprocessing is to put the #Start training cell in a function and pass it to spawn. And the only extra integration with fastai would be to create a wrapper for spawn. But that can’t be on an existing Learner, because you need to create the Learner inside the function you pass to spawn.
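Something along these lines, just as a rough sketch (fit_tpu_distributed and learner_fn are made-up names; this assumes torch_xla’s xmp.spawn API):

import torch_xla.distributed.xla_multiprocessing as xmp

def _train(index, learner_fn, epochs):
  # everything (data, model, Learner) has to be built inside the child process
  learn = learner_fn()
  learn.fit(epochs)

def fit_tpu_distributed(learner_fn, epochs, nprocs=8):   # 8 cores on a v3-8 TPU
  # learner_fn should be a module-level function so it can be passed to spawn
  xmp.spawn(_train, args=(learner_fn, epochs), nprocs=nprocs)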

Isn’t this the same challenge I had mentioned at the beginning of this thread? I need to think about the solution more carefully, but for now I will try Sylvain’s approach: a script with a function that wraps the learn.fit() call, which I then pass to the spawning function.

BTW I will clean up my code and share it with you over here. Maybe you have some insight into this semaphore problem that I don’t?

I was busy today with other stuff and couldn’t work on this much. However, here is the code I am working with right now, which is producing the semaphore warnings:

import torch_xla
import torch_xla.distributed.data_parallel as dp
import torch_xla.utils.utils as xu
import torch_xla.core.xla_model as xm
import torch_xla.distributed.parallel_loader as pl
import torch_xla.distributed.xla_multiprocessing as xmp
import torch

from fastai import *
from fastai.core import *
from fastai.torch_core import *
from fastai.vision import *
from fastai.basic_train import *

def len_parallelloader(self):
  return len(self._loader._loader)
pl.PerDeviceLoader.__len__ = len_parallelloader
  

class TPUDistributed(LearnerCallback):
  def __init__(self, learn:Learner):
    super().__init__(learn)
    self.device = xm.xla_device()
  def on_train_begin(self, **kwargs:Any)->None:
    self.learn.model = self.learn.model.to(self.device)
    self.old_dl = self.learn.data.train_dl
    self.learn.data.train_dl = pl.ParallelLoader(self.old_dl, [self.device]).per_device_loader(self.device)
    self.learn.data.train_dl.dataset = self.old_dl.dataset   
  def on_batch_begin(self, last_input, last_target, train, **kwargs):
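    # the TPU per-device loader yields (idx, (x, y)), so pull the real batch out of last_target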
    return {'last_input': last_target[0], 'last_target': last_target[1]}
  def on_step_end(self, **kwargs:Any)->None:
    xm.optimizer_step(self.learn.opt.opt)

def _to_tpu_distributed(learn:Learner) -> Learner:
  learn.callback_fns.append(TPUDistributed)
  return learn
  

Learner.to_tpu_distributed = _to_tpu_distributed
  

path = untar_data(URLs.MNIST_SAMPLE)
data = ImageDataBunch.from_folder(path)
learn = cnn_learner(data, models.resnet50, metrics=accuracy).to_tpu_distributed()
def train_loop(index):
  print('hello')
  learn.fit(1)

if __name__ == "__main__":
  xmp.spawn(train_loop, args=())

@TomB if you have any insights, please let me know! :slight_smile:

You need to look more at what spawn is doing and how multiprocessing works. You can’t just spawn a function and expect all the state to be there and magically duplicate itself appropriately. That’s why all the examples create everything from scratch in every process.
Notably, when you create the learner before calling spawn, its init then calls self.device = xm.xla_device(), but that can only be called from a child process, as spawn is what assigns devices to the child processes.
This is also likely what’s causing your semaphore issues: semaphores probably won’t be inherited by the forked process, so any code using one won’t transfer properly (even if it manages the implicit binding of learn in train_loop, it may be falling over here before it even gets to using learn).

I’m also pretty sure calling xm.optimizer_step in on_step_end won’t work. The optimizer will already have been stepped by fastai in loss_batch.

Thinking about it, I think it may be better to create a custom learner class. There’s really not much you gain by trying to implement it as a callback; it just means lots of work to get around the limitations there. You’d need to wrap most of the things the learner creates in on_train_begin anyway, so you may as well just create a custom learner. There’s very little logic in Learner; it just calls the base fit and validate functions to do all the real training/validation logic.
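Roughly what I’m picturing, just as a sketch (nothing beyond Learner itself here is existing fastai API, and it reuses the xm/pl imports from the script above):

class TPULearner(Learner):
  def fit(self, epochs, *args, **kwargs):
    device = xm.xla_device()              # must run inside the spawned child process
    self.model = self.model.to(device)
    # wrap self.data.train_dl / valid_dl with pl.ParallelLoader here,
    # then hand everything off to the normal fastai fit/validate logic
    return super().fit(epochs, *args, **kwargs)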

So this should be moved to on_train_begin to solve this issue?

Yep, you are right, I meant to use on_backward_end.
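So something more like this, as a sketch (reusing the imports from the script above; if I remember right, returning 'skip_step' is the fastai v1 way to stop loss_batch from calling opt.step() itself):

class TPUDistributed(LearnerCallback):
  def on_train_begin(self, **kwargs:Any)->None:
    self.device = xm.xla_device()   # now acquired inside the spawned child process
    self.learn.model = self.learn.model.to(self.device)
    self.old_dl = self.learn.data.train_dl
    self.learn.data.train_dl = pl.ParallelLoader(self.old_dl, [self.device]).per_device_loader(self.device)
    self.learn.data.train_dl.dataset = self.old_dl.dataset
  def on_backward_end(self, **kwargs:Any)->None:
    xm.optimizer_step(self.learn.opt.opt)   # all-reduce the gradients and step
    return {'skip_step': True}              # stop fastai from stepping a second time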

This won’t be supported in fastai v1, or probably even v2. Then, if anybody wants to use a TPU with fastai, they would have to use a custom Learner class. I don’t think that’s a desirable solution.

I will look into multiprocessing again this weekend. It’s been a couple of years since I last used multiprocessing, so I have forgotten the details.

@TomB unfortunately, with the above fixes I am still getting the same errors:

/usr/lib/python3.6/multiprocessing/semaphore_tracker.py:143: UserWarning: semaphore_tracker: There appear to be 1 leaked semaphores to clean up at shutdown
  len(cache))

And:

Exception in device=TPU:2: Expected 4-dimensional input for 4-dimensional weight 64 3 7 7, but got 0-dimensional input of size [] instead
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/torch_xla/distributed/xla_multiprocessing.py", line 107, in _start_fn
    fn(gindex, *args)
  File "/content/tpu_distributed_fastai.py", line 47, in train_loop
    learn.fit(1)
  File "/usr/local/lib/python3.6/dist-packages/fastai/basic_train.py", line 200, in fit
    fit(epochs, self, metrics=self.metrics, callbacks=self.callbacks+callbacks)
  File "/usr/local/lib/python3.6/dist-packages/fastai/basic_train.py", line 106, in fit
    cb_handler=cb_handler, pbar=pbar)
  File "/usr/local/lib/python3.6/dist-packages/fastai/basic_train.py", line 59, in validate
    val_loss = loss_batch(model, xb, yb, loss_func, cb_handler=cb_handler)
  File "/usr/local/lib/python3.6/dist-packages/fastai/basic_train.py", line 26, in loss_batch
    out = model(*xb)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/module.py", line 545, in __call__
    result = self.forward(*input, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/container.py", line 92, in forward
    input = module(input)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/module.py", line 545, in __call__
    result = self.forward(*input, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/container.py", line 92, in forward
    input = module(input)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/module.py", line 545, in __call__
    result = self.forward(*input, **kwargs)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/conv.py", line 345, in forward
    return self.conv2d_forward(input, self.weight)
  File "/usr/local/lib/python3.6/dist-packages/torch/nn/modules/conv.py", line 342, in conv2d_forward
    self.padding, self.dilation, self.groups)
RuntimeError: Expected 4-dimensional input for 4-dimensional weight 64 3 7 7, but got 0-dimensional input of size [] instead

I will look into multiprocessing more carefully this weekend.

What do you mean, won’t be supported? Why not? You’re free to create a custom learner or a subclass of Learner; there are already CollabLearner and GANLearner. Of course you’d support the same interface, and may well extend it.

@sgugger what are your thoughts?

@TomB looking into this further, the semaphore issue seems to be due to a bug with tqdm/fastprogress and probably cannot be fixed. The other error occurs during validation, so I might have messed something up there. However, training itself seems to be going OK. I cannot actually see how the training loop is progressing, because all I see is "<IPython.core.display.HTML object>", but if I run the code ignoring warnings and skipping the validation set, it completes. However, it is surprisingly slow: it takes about a minute to train one epoch on MNIST, when it takes at most 20 seconds on a K80 GPU.

At least the code technically runs without error.

Right now, I will prioritize incorporating another callback that logs the training, so I can actually see how long it’s training and whether or not there are any weird errors. I will also look more carefully into torch.multiprocessing.

That’s possibly another reason for doing it at the learner level. You likely don’t want to just run a normal learner on all cores (each core being handled by an individual optimiser/learner instance running in a separate process, or thread if doing it that way). You probably want a master on the first core that does recording etc. and a bunch of slaves that just report to the master. The distributed examples just leave all logging to TensorBoard, so they don’t have this issue.
So then you want a different set of callbacks running on each core. Now, this (like basically anything) is technically possible in a callback (overwrite learn.callbacks in on_train_begin), but if you’re just redoing half the stuff the learner does then you may as well just inherit from it.

The smaller PyTorch datasets are in-memory, so you’ll get a pretty big hit there.
Also, you don’t seem to be doing distributed sampling at all. You have to set up a DistributedSampler, as that’s what ensures each core gets a different set of samples rather than every core running over all of them. I guess this misunderstanding might also be why you monkeypatched PerDeviceLoader.__len__ to be the full length, which is wrong; each core should see just a subset of the data.

Not properly setting up your loaders/samplers is also likely the cause of the above size mismatch error. This is what splits the single batch produced by the base DataLoader (data.train_dl) into separate batches for each core. That error shows you’ve somehow given a core an empty batch (the 3 channels show it’s the input, not an intermediate activation).
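For reference, this is roughly the pattern from the torch_xla MNIST example; train_dataset here is just a stand-in for whatever Dataset sits under data.train_dl:

import torch
import torch_xla.core.xla_model as xm

# each core sees a different 1/world_size slice of the dataset
train_sampler = torch.utils.data.distributed.DistributedSampler(
    train_dataset,
    num_replicas=xm.xrt_world_size(),   # number of TPU cores
    rank=xm.get_ordinal(),              # this process's core index
    shuffle=True)
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=64, sampler=train_sampler, num_workers=4)
# then wrap train_loader with pl.ParallelLoader(...).per_device_loader(device) as before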

I agree, but we would have to get @sgugger’s approval for something like this to be incorporated into the library.

No, the master/children thing should be done in the callbacks. There is no reason to do it in the Learner directly.

But can the progress bar be disabled and then handled separately in a callback? It seems like it can’t, since the progress bar is created in the fit function.

You can silence the Recorder in the children processes, look at how the distributed module does it.
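Something rank-gated along those lines, perhaps (just a sketch, not the actual fastai.distributed code; it assumes Recorder’s silent flag, present in recent v1 releases, and that xm.get_ordinal() == 0 identifies the master core):

class SilenceNonMaster(LearnerCallback):
  # only the master core keeps printing/recording; the other cores train quietly
  def on_train_begin(self, **kwargs:Any)->None:
    if xm.get_ordinal() != 0:
      self.learn.recorder.silent = True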


You shouldn’t need to change the training loop - instead, in your callback, add something like:

self.learn.xb,self.learn.yb = self.learn.yb

…because fastai assumes the dataloader is returning (x,y), but actually the TPU loader is giving us (idx,(x,y)), if I understand correctly.

Thanks! This is what I did in my callback:

Also, after people (including myself) complained, they fixed this upstream. However, Google Colab’s version still needs to be updated (“they are currently pinned to the 0.5 release, so they are fine.”), so I still need to include this in the callback when using Colab.
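For anyone hitting the same thing, this is the kind of version-tolerant check I mean (a sketch only, assuming the old 0.5 loader yields (idx, (x, y)) while newer torch_xla releases yield (x, y) directly):

  def on_batch_begin(self, last_input, last_target, **kwargs):
    if isinstance(last_target, (list, tuple)) and len(last_target) == 2:
      # old behaviour: last_input is the batch index, last_target is (x, y)
      return {'last_input': last_target[0], 'last_target': last_target[1]}
    return None  # newer behaviour: the batch already arrives as (x, y)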