Should distributed training with bs=x/gpus be equivalent?

I’ve been working on trying to scale up my model and have been making steady progress. But I’ve run into something that doesn’t seem right.

In my understanding, the following two training runs should produce equivalent results.

  1. 1 GPU, Batch Size = 160
  2. 8 GPUs, Batch Size = 20

As I understand it, the gradients are computed on each GPU and then combined across GPUs, so it shouldn’t matter whether the work is done on one GPU or spread across 8. (Is that right?)
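To make my mental model concrete, here is roughly what I picture happening after each backward pass (just a sketch of the idea, not the actual DDP internals):

import torch.distributed as dist

def sync_gradients(model):
    # after loss.backward() on each replica: sum the gradients from every process, then average
    world_size = dist.get_world_size()
    for p in model.parameters():
        if p.grad is not None:
            dist.all_reduce(p.grad, op=dist.ReduceOp.SUM)
            p.grad.div_(world_size)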

Unfortunately, I’m getting worse accuracy with distributed training no matter what batch size I use.

I’ve done several runs on this language model varying (only) the number of GPUs and the batch size. I’m wondering if maybe there’s a bug (possibly different dropout masks on each GPU? How would I check that?).

Number of GPUs    Batch Size    Accuracy after 1 Epoch
             1            64                  0.604424
             1           160                  0.619184

             8            20                  0.589825
             8            40                  0.589547
             8            80                  0.588944
             8           140                  0.588154

It seems weird to me that all of the 8-GPU runs got nearly exactly the same accuracy, that varying the batch size didn’t matter at all, and that the 8x20 run wasn’t close to the 1x160 run. So I’m trying to figure out what’s going wrong.

I did check that the GPU memory usage during the runs passed a sanity check (reducing batch size also reduced GPU memory in use).

Edit: I suppose one way to check the differing dropout hypothesis is to set a random seed. (Trying that now. Will know whether it helped in ~1 hour.)

Edit 2: Setting random seeds (np, torch, random, torch.cuda) didn’t work. Still got 0.584737 with 8 GPUs. Wondering if torch.cuda seed is per-process or global…
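For reference, the seeding I’m doing looks roughly like this (at the top of the script, before anything else runs):

import random
import numpy as np
import torch

seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)  # should seed every GPU visible to this process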

Edit 3: I feel pretty confident I’ve ruled out random seeds being an issue. I tried setting them to the current timestamp (in seconds) on every forward pass as well and that didn’t work either. Got 0.583825. Anyone have ideas of what else to try?

Edit 4: Shooting in the dark, but going to try adjusting drop_mult (dividing it by the number of GPUs). Result: 0.596205.

My new theory is that it has to do with how the data is being split up between the replicas in torch.utils.data.distributed.DistributedSampler. It looks like it’s feeding each of the processes a random permutation of the dataset.

I’m not sure how things work in the default data loader but I think I saw something about sequence order being important because the state needs to be maintained.

I’m going to try modifying this to pass contiguous chunks instead of permutations and see if that helps.
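Roughly what I have in mind: a sketch of a Sampler that hands each process a contiguous slice instead of a shuffled permutation (not necessarily what I’ll end up committing):

from torch.utils.data import Sampler

class DistributedChunkSampler(Sampler):
    "Give each process a contiguous chunk of the dataset instead of a random permutation."
    def __init__(self, dataset, num_replicas, rank):
        self.dataset, self.num_replicas, self.rank = dataset, num_replicas, rank
        self.num_samples = -(-len(dataset) // num_replicas)   # ceil division
        self.total_size = self.num_samples * num_replicas

    def __iter__(self):
        idxs = list(range(len(self.dataset)))
        idxs += idxs[:self.total_size - len(idxs)]             # pad by wrapping so every rank gets the same count
        start = self.rank * self.num_samples
        return iter(idxs[start:start + self.num_samples])      # contiguous slice for this rank

    def __len__(self):
        return self.num_samples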

Making a DistributedSampler that chunks things rather than handing out a random permutation is the first thing that got me back above 0.59, but still not close to the 0.619184 I had on a single V100 (chunks gave 0.591792).

I also found this line, and I don’t understand why it’s needed. Commenting it out didn’t seem to make a difference one way or the other:

self.bs *= num_distrib()

Intuitively, since the batch has already been distributed to each of the processes I’m not sure why they would need to know that there are other workers doing anything in parallel.

Got it! That line I had commented out did make a big difference once I was chunking the data instead of using randperm.

Not sure what that will mean for multiple epochs though… in my head it shouldn’t make a difference because each process should be shuffling its own data anyway… but I’m not positive. Going to test and make sure the unfrozen output looks similar as well.

That line is needed for the language model to know the total batch size and be able to parse the texts properly. Note that you are the first to test distributed training on a text problem (to my knowledge).


Finished training on 8 GPUs through 6 epochs (1 frozen, 5 unfrozen) and got one of the best accuracy levels I’ve seen so far on all my tests (0.7418) and still underfitting a bit. [Caveat: I’ve only trained this far on 1 other test; bs64 on 1 gpu was 0.749853 but that was with 5 more epochs]

One of my biggest takeaways from all of these tests is that awd-lstm seems to not mind (and possibly even performs better with) larger batch sizes. This most recent run was a batch size of 1520 (190x8) and it did just as well as with a batch size of 64 on a single GPU now that I’ve gotten the Distributed kinks worked out.

Gradient accumulation might merit further experimentation.
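For anyone who hasn’t tried it, the basic idea is something like this (a generic PyTorch sketch with a stand-in model, not fastai-specific):

import torch
import torch.nn as nn

model = nn.Linear(10, 2)                     # stand-in model
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)
loss_func = nn.CrossEntropyLoss()
accum_steps = 8                              # effective batch size = bs * accum_steps

optimizer.zero_grad()
for i in range(32):                          # stand-in for iterating over a DataLoader
    xb, yb = torch.randn(20, 10), torch.randint(0, 2, (20,))
    loss = loss_func(model(xb), yb) / accum_steps   # scale so the accumulated grads average out
    loss.backward()                                 # grads accumulate until we step
    if (i + 1) % accum_steps == 0:
        optimizer.step()
        optimizer.zero_grad()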

I’m going to try this Sampler tweak on a different language model and submit a pull request if it seems to generalize well.


Brad, I am trying to tackle something similar where my model can’t fit a large batch size on a single GPU (the model is large, the data is large…). Can you post more about what you needed to change in the distributed setup to get some success? By chance, is it a public dataset that I could help with?

I submitted a PR to fix this a few days ago. It got merged (and subsequently refactored by @sgugger) so if you pull in master you should get the working version!

If I haven’t broken distributed in the meantime, since it seems to change once a day at the moment :-/


Cool. Yes, I am pulling from master and seem to be able to train a text learner, but only kind of: sometimes it works, sometimes it fails. Would examples help to debug anything, @sgugger?

The train loop seems to run on 2 GPUs, but then it hangs on validation. The process is still alive. Not sure how to debug that one. Any ideas? I am happy to track down the problem and try to debug it.

Bash command:

python -m torch.distributed.launch --nproc_per_node=2 text_distributed.py --bs=4

Full code here:

from fastai.text import *
from fastai.distributed import *
import argparse

# parse the arguments passed in by torch.distributed.launch plus our own --bs
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
parser.add_argument("--bs", type=int)
args = parser.parse_args()
batch_size = args.bs

torch.cuda.set_device(args.local_rank)
torch.distributed.init_process_group(backend='nccl', init_method='env://')

path = untar_data(URLs.IMDB_SAMPLE)
data_lm = TextLMDataBunch.from_csv(path, 'texts.csv', bs=batch_size)

learn = language_model_learner(data_lm, AWD_LSTM, drop_mult=0.5).to_distributed(args.local_rank)
learn.fit_one_cycle(1, 3e-3, wd=0.5, div_factor=10, pct_start=0.5)

I have the same issue, @bfarzin. Mine gets stuck either on epoch 2 or on validation, with CPUs and GPUs spinning at 100% and nothing else happening.


The first thing I realized is that the progress bar doesn’t work well in the multiprocess scenario. Only one process shows the progress bar header, but then both processes overwrite each other’s output, so the progress bar is a mess. If 2 GPUs work in total sync you don’t notice it. If one lags behind, you will see the bar extending and shrinking. If the split was unequal, the cnt/total will flicker between the two different values.

I added a custom log file for each process at the beginning of the program, leaving the std streams as they are:

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
gpu = args.local_rank

err = open(f"/tmp/gpu{gpu}-err.log", "a")
#out = open(f"/tmp/gpu{gpu}-out.log", "a")

from sys import stdout, stderr
def stdout_write_flush(args, w=stdout.write): w(args); stdout.flush(); err.write(args); err.flush()
def stderr_write_flush(args, w=stderr.write): w(args); stderr.flush(); err.write(args); err.flush()
stdout.write = stdout_write_flush
stderr.write = stderr_write_flush

Now I can see from /tmp/gpu0-err.log that the first process finishes the training of the first epoch:

epoch     train_loss  valid_loss  accuracy  time
Epoch 1/2 : |--------------------| 0.00% [0/67 00:00<00:00]

whereas the 2nd doesn’t: /tmp/gpu1-err.log

Epoch 1/2 : |███████████████████-| 99.63% [266/267 00:48<00:00 17.3611]
  • 267 is the number of train batches.
  • 67 is the number of validation batches.

If I now use pyrasite to get the trace, the first process is stuck in:

  File "./lm-distr-pt2.py", line 85, in <module>
    learn.fit_one_cycle(2, 1e-2, moms=(0.8,0.7))
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/train.py", line 22, in fit_one_cycle
    learn.fit(cyc_len, max_lr, wd=wd, callbacks=callbacks)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 200, in fit
    fit(epochs, self, metrics=self.metrics, callbacks=self.callbacks+callbacks)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 106, in fit
    cb_handler=cb_handler, pbar=pbar)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 63, in validate
    if cb_handler and cb_handler.on_batch_end(val_losses[-1]): break
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/callback.py", line 308, in on_batch_end
    self('batch_end', call_mets = not self.state_dict['train'])
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/callback.py", line 250, in __call__
    for met in self.metrics: self._call_and_update(met, cb_name, **kwargs)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/callback.py", line 241, in _call_and_update
    new = ifnone(getattr(cb, f'on_{cb_name}')(**self.state_dict, **kwargs), dict())
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/callback.py", line 349, in on_batch_end
    self.val += first_el(last_target).size(0) * val.detach().cpu()

the 2nd in:

  File "./lm-distr-pt2.py", line 85, in <module>
    learn.fit_one_cycle(2, 1e-2, moms=(0.8,0.7))
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/train.py", line 22, in fit_one_cycle
    learn.fit(cyc_len, max_lr, wd=wd, callbacks=callbacks)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 200, in fit
    fit(epochs, self, metrics=self.metrics, callbacks=self.callbacks+callbacks)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 101, in fit
    loss = loss_batch(learn.model, xb, yb, learn.loss_func, learn.opt, cb_handler)
  File "/mnt/nvme1/fast.ai-1/br/fastai/master/fastai/basic_train.py", line 38, in loss_batch
    return loss.detach().cpu()

So the problem has to do with the split: if the number of batches is identical, all works fine. If one GPU has one extra batch, it gets stuck on that last batch. Some kind of sync issue.

So far, with my particular data, I’ve noticed that if bs is small the split is uneven, and if bs is larger it is even.


OK, got it sorted out (we are talking about Language Modeling only):

The problem

  1. LanguageModelPreLoader takes N input text items and transforms them into M items, based on the length of all the text across all items of the dataset. There is some complex code that does it, but all that matters is that when you create a databunch from, say, 4000 items, you end up with ~7500 items in the dataloader (the actual numbers can be totally different; these are just from the sample I worked with).

    The LanguageModelPreLoader code is deterministic, but its output depends on how the train and validation sets were split if the items are not all of the same length, which is almost always the case with text. So sometimes the total length of the text in the training set is longer, sometimes shorter (while the number of input items is the same).

  2. While split_by_rand_pct(0.2) consistently splits the N items into 2 groups of deterministic size (say 3200+800), the total length of the text contained in each group differs from run to run, since each item’s length is different. So we end up with a different number of M items once LanguageModelPreLoader has done its magic, and that leads to a non-deterministic number of batches.

When 2+ processes in a distributed setup have a different number of batches, everything breaks - CPU and GPU just spin at 100% right after the first training or after the first epoch. So this situation must not be allowed.

Solutions

One possible solution: adding a fixed seed in split_by_rand_pct(0.2, seed=42) solves this problem, because now LanguageModelPreLoader gets the same groups of items in each process.

The issue with that solution is that now we lose the randomness, which is usually essential when training (though perhaps not that important in the specific case of training a language model).

Potential workaround: generate a random seed on the command line, pass it to all processes, and call np.random.seed(seed) in each of them. I think this makes sense since it’s really a single training job split over several processes, so sharing the same seed is fine.

I’m not sure how this can be fixed at a systemic level. Perhaps via fastai.launch. But many use torch.distributed.launch, and that’s what fast.ai’s distributed tutorial recommends. So in that case the solution would look something like:

python -m torch.distributed.launch --nproc_per_node=2 \
./my-lm-training-prog.py --seed=`shuf -i 0-999 -n1`

(shuf is in the coreutils apt package, but there are many other ways to generate a random number, e.g. od -A n -t d -N 1 /dev/urandom or tr -cd "[:digit:]" < /dev/urandom | head -c 6.)

and in my-lm-training-prog.py add the option for the new argument in addition to the required --local_rank:

    parser = argparse.ArgumentParser()
    parser.add_argument("--local_rank", type=int)
    parser.add_argument("--seed", type=int)
    args = parser.parse_args()
    seed = args.seed
    # ensure LanguageModelPreLoader generates the same # of batches per process
    np.random.seed(seed) # import numpy as np if needed

I think it’s better to do it explicitly than to delegate it to split_by_rand_pct(0.2, seed=seed), since that does the exact same thing but hides it.

Debugging

Note that if your process gets stuck after the training or validation of the 1st epoch, you most likely have this issue. If this is your case, print out:

print(f"{len(data_lm.train_dl)}/{len(data_lm.valid_dl)}")

in each of the processes and check that the numbers are identical.
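Or, as a sketch, you can assert it programmatically so the run fails fast instead of hanging (this assumes the process group is already initialized and you’re on the nccl backend, hence the .cuda() calls):

import torch
import torch.distributed as dist

def assert_same_num_batches(num_batches):
    # gather every rank's batch count and bail out early if they differ
    world_size = dist.get_world_size()
    counts = [torch.zeros(1, dtype=torch.long).cuda() for _ in range(world_size)]
    mine = torch.tensor([num_batches], dtype=torch.long).cuda()
    dist.all_gather(counts, mine)
    counts = [int(c.item()) for c in counts]
    assert len(set(counts)) == 1, f"uneven batch counts across ranks: {counts}"

# e.g. assert_same_num_batches(len(data_lm.train_dl))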

And if you want to experiment and see how the number of dataloader batches changes (assuming your texts are of different lengths), run the following with your own path and texts csv:

path = "/your/path"
fn = "your-texts.csv"
tl = TextList.from_csv(path, fn, cols='texts')
l = []
for i in range(10):
    data_lm = tl.split_by_rand_pct(0.2).label_for_lm().databunch()
    l.append(f"{len(data_lm.train_dl)}/{len(data_lm.valid_dl)}")
print(l)

Without a fixed random seed I got:

['114/29', '116/27', '114/29', '113/30', '115/28', '113/30', '115/29', '114/29', '114/29', '114/29']

which is what the distributed processes are likely to get, and that will break things.

The randomness essential to training things doesn’t apply to your validation set. Very often it’s fixed in academic datasets and split_by_rand_pct is a mere convenience function to quickly isolate a validation set.

Now, distributed training can’t work in general if you don’t fix the same splits for all processes; it might not error out and might even give some decent stats, but those would be misleading. The way DistributedSampler works (which is the thing used behind the scenes to dispatch the samples into batches), it assumes your dataset is exactly the same on every process, and if it’s shuffled, it uses a seed to shuffle it the same way for each process.

Why? Because that distributed sampler then tells each process to grab some samples, which together should form a batch (each process gets different samples). If your datasets aren’t exactly the same, you might end up with batches that contain the same thing several times, don’t loop over all the data, or mix and match the training and validation sets…
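In plain PyTorch terms, the intended usage is that every process builds the exact same dataset and lets DistributedSampler pick its share, with set_epoch keeping the shuffle identical across processes. A minimal sketch:

import torch
from torch.utils.data import DataLoader, TensorDataset
from torch.utils.data.distributed import DistributedSampler

# every process must build the *same* dataset for the sharding to be valid
ds = TensorDataset(torch.arange(1000).float().unsqueeze(1))
sampler = DistributedSampler(ds)   # picks up rank/world_size from the initialized process group
dl = DataLoader(ds, batch_size=20, sampler=sampler)

for epoch in range(2):
    sampler.set_epoch(epoch)       # same seed + epoch => identical shuffle on every rank
    for (xb,) in dl:
        pass                       # training step would go here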


The trouble is that once you run np.random.seed(seed) you are likely to affect other situations where randomness is essential. That’s why I don’t think having a fixed seed like 42 is a good idea, and why I proposed a randomized seed that is then shared across all processes. Thoughts?

The bottom line - what should be the recipe for users to follow in fast.ai’s distributed doc?


That’s super-useful, @sgugger. Thank you! Let’s make sure these are added to https://docs.fast.ai/distributed.html.

Interesting. I think users should then manually create the random split once in a separate script/notebook, save the outcome in a file/csv, and then use a label_from_file/label_from_csv way of splitting.
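Something along those lines as a sketch (I’m assuming a pandas is_valid column and fastai’s split_from_df here; the exact method names may differ for your setup):

import numpy as np
import pandas as pd
from fastai.text import *

# one-off script: decide the split once and save it
df = pd.read_csv("texts.csv")
df["is_valid"] = np.random.rand(len(df)) < 0.2      # 20% validation, chosen a single time
df.to_csv("texts_with_split.csv", index=False)

# training script: every process reads the identical, pre-made split
df = pd.read_csv("texts_with_split.csv")
data_lm = (TextList.from_df(df, path=".", cols="text")
           .split_from_df(col="is_valid")
           .label_for_lm()
           .databunch(bs=64))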

Note that a seed will be fixed by PyTorch in each process for the splitting if your training set is shuffled (which it should be) so the effort to get rid of that seed call in split_by_rand_pct might be pointless.


My point/question wasn’t about whether to do it or not, or how. I was just proposing to use the same randomized seed at the start of each process, rather than a seed hardcoded in the code. Does this clarify things?


Oh yes, that works. Sorry I misunderstood.


One workaround I’ve used in the past is a seed like Math.round(Date.now()/5000) that is different every run in practice but would be the same between processes so long as they’re started in the same 5-second window.

(Sorry for the JS syntax; I can’t remember the Python call for epoch time off the top of my head.)
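Edit: I think the Python equivalent would be roughly:

import time

seed = round(time.time() / 5)   # same value for processes launched within the same ~5-second window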
