Custom ItemList, getting ForkingPickler broken pipe

thanks.

I just updated fastai from git because this command did not include the ‘test_df’ parameter:

data3 = TabularDataBunch.from_df(CUR_DIR, train_df, dep_var, test_df=test_df, valid_idx=valid_idx, procs=procs, cat_names=cat_names, cont_names=cont_names)

Now the test_df does get added as I can see using:

data3.test_ds

which produces:

LabelList (200000 items)
x: TabularList
var_0 11.0625; var_1 7.7812; ,var_0 8.5312; var_1 1.2539; ,var_0 5.4844; var_1 -10.3594; ,var_0 8.5391; var_1 -1.3223; ,var_0 11.7031; var_1 -0.1327; 
y: EmptyLabelList
,,,,
Path: c:\data\code\jupyter\kaggle_santander

However, when I run:

data3.show_batch(rows=10,ds_type=DatasetType.Train)

or try to do an lr_find I get a BrokenPipe like:

---------------------------------------------------------------------------
BrokenPipeError                           Traceback (most recent call last)
<ipython-input-9-8982f998d792> in <module>
----> 1 data3.show_batch(rows=10,ds_type=DatasetType.Train)

C:\Anaconda3\envs\fastai-latest\lib\site-packages\fastai\basic_data.py in show_batch(self, rows, ds_type, reverse, **kwargs)
    183     def show_batch(self, rows:int=5, ds_type:DatasetType=DatasetType.Train, reverse:bool=False, **kwargs)->None:
    184         "Show a batch of data in `ds_type` on a few `rows`."
--> 185         x,y = self.one_batch(ds_type, True, True)
    186         if reverse: x,y = x.flip(0),y.flip(0)
    187         n_items = rows **2 if self.train_ds.x._square_show else rows

C:\Anaconda3\envs\fastai-latest\lib\site-packages\fastai\basic_data.py in one_batch(self, ds_type, detach, denorm, cpu)
    166         w = self.num_workers
    167         self.num_workers = 0
--> 168         try:     x,y = next(iter(dl))
    169         finally: self.num_workers = w
    170         if detach: x,y = to_detach(x,cpu=cpu),to_detach(y,cpu=cpu)

C:\Anaconda3\envs\fastai-latest\lib\site-packages\fastai\basic_data.py in __iter__(self)
     73     def __iter__(self):
     74         "Process and returns items from `DataLoader`."
---> 75         for b in self.dl: yield self.proc_batch(b)
     76 
     77     @classmethod

C:\Anaconda3\envs\fastai-latest\lib\site-packages\torch\utils\data\dataloader.py in __iter__(self)
    817 
    818     def __iter__(self):
--> 819         return _DataLoaderIter(self)
    820 
    821     def __len__(self):

C:\Anaconda3\envs\fastai-latest\lib\site-packages\torch\utils\data\dataloader.py in __init__(self, loader)
    558                 #     before it starts, and __del__ tries to join but will get:
    559                 #     AssertionError: can only join a started process.
--> 560                 w.start()
    561                 self.index_queues.append(index_queue)
    562                 self.workers.append(w)

C:\Anaconda3\envs\fastai-latest\lib\multiprocessing\process.py in start(self)
    103                'daemonic processes are not allowed to have children'
    104         _cleanup()
--> 105         self._popen = self._Popen(self)
    106         self._sentinel = self._popen.sentinel
    107         # Avoid a refcycle if the target function holds an indirect

C:\Anaconda3\envs\fastai-latest\lib\multiprocessing\context.py in _Popen(process_obj)
    221     @staticmethod
    222     def _Popen(process_obj):
--> 223         return _default_context.get_context().Process._Popen(process_obj)
    224 
    225 class DefaultContext(BaseContext):

C:\Anaconda3\envs\fastai-latest\lib\multiprocessing\context.py in _Popen(process_obj)
    320         def _Popen(process_obj):
    321             from .popen_spawn_win32 import Popen
--> 322             return Popen(process_obj)
    323 
    324     class SpawnContext(BaseContext):

C:\Anaconda3\envs\fastai-latest\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
     63             try:
     64                 reduction.dump(prep_data, to_child)
---> 65                 reduction.dump(process_obj, to_child)
     66             finally:
     67                 set_spawning_popen(None)

C:\Anaconda3\envs\fastai-latest\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
     58 def dump(obj, file, protocol=None):
     59     '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60     ForkingPickler(file, protocol).dump(obj)
     61 
     62 #

BrokenPipeError: [Errno 32] Broken pipe

I have updated all the packages, tried Pytorch 1.0.0 and 1.0.1 and still get this problem. Obviously I’m running on Windows. Any help would be appreciated.

1 Like

That’s generally caused by multiprocessing on Windows with PyTorch, so you should try setting num_workers=0.

1 Like

Thanks, I was just figuring that out myself. :slight_smile: Using:

data3 = TabularDataBunch.from_df(CUR_DIR, train_df, dep_var, test_df=test_df, valid_idx=valid_idx, procs=procs, cat_names=cat_names, cont_names=cont_names, num_workers=0)

makes the error go away. Yay. Will that affect performance at all?

1 Like

Hi, all of this pain is a combination of:

  • windows
  • multiprocessing
  • jupyter

If you skip one of these points, it’s quite easy:

  • If running without multiprocessing is fast enough, set num_workers=0.
  • If you don’t use Jupyter, protect the multiprocessing parts with an
if __name__ == '__main__':
    # create your dataloader here

block.

If you want all three, you also have to move your functions into a separate file (a module that you import). See https://stackoverflow.com/questions/47313732/jupyter-notebook-never-finishes-processing-using-multiprocessing-python-3

There is also a dynamic solution, but I haven’t tried it.

2 Likes

Thanks @stas

I have updated to fastai 1.0.49. Now it starts building the model and then gives: [Errno 32] Broken pipe

Then I followed the recommendation from this forum to put my code inside a function and call it via
```
if __name__ == '__main__':
    train()
```

But I still get the exact same error. Here is my environment and the error I get:

=== Software === 
python        : 3.7.1
fastai        : 1.0.49
fastprogress  : 0.1.20
torch         : 1.0.1
torch cuda    : 10.0 / is available
torch cudnn   : 7401 / is enabled

=== Hardware === 
torch devices : 1
  - gpu0      : GeForce GTX 1080 with Max-Q Design

=== Environment === 
platform      : Windows-10-10.0.16299-SP0
conda env     : base
python        : C:\ProgramData\Anaconda3\python.exe
sys.path      : C:\Users\sshahinf\Desktop\FastAI_2\venvpy37\Scripts
C:\ProgramData\Anaconda3\python37.zip
C:\ProgramData\Anaconda3\DLLs
C:\ProgramData\Anaconda3\lib
C:\ProgramData\Anaconda3

C:\ProgramData\Anaconda3\lib\site-packages
C:\ProgramData\Anaconda3\lib\site-packages\win32
C:\ProgramData\Anaconda3\lib\site-packages\win32\lib
C:\ProgramData\Anaconda3\lib\site-packages\Pythonwin
C:\ProgramData\Anaconda3\lib\site-packages\IPython\extensions
C:\Users\sshahinf\.ipython
no nvidia-smi is found

```text

0.00% [0/2 00:00<00:00]

epoch

train_loss

valid_loss

accuracy

error_rate

time

Interrupted


BrokenPipeError Traceback (most recent call last)
in
1 if __name__ == '__main__':
----> 2 train()

in train()
16 )
17
---> 18 learnS.fit_one_cycle(epochS, max_lr=maxLR, moms =[0.95, 0.85], div_factor = 25.0)
19 S= mName_S

C:\ProgramData\Anaconda3\lib\site-packages\fastai\train.py in fit_one_cycle(learn, cyc_len, max_lr, moms, div_factor, pct_start, final_div, wd, callbacks, tot_epochs, start_epoch)
20 callbacks.append(OneCycleScheduler(learn, max_lr, moms=moms, div_factor=div_factor, pct_start=pct_start,
21 final_div=final_div, tot_epochs=tot_epochs, start_epoch=start_epoch))
---> 22 learn.fit(cyc_len, max_lr, wd=wd, callbacks=callbacks)
23
24 def lr_find(learn:Learner, start_lr:Floats=1e-7, end_lr:Floats=10, num_it:int=100, stop_div:bool=True, wd:float=None):

C:\ProgramData\Anaconda3\lib\site-packages\fastai\basic_train.py in fit(self, epochs, lr, wd, callbacks)
194 callbacks = [cb(self) for cb in self.callback_fns] + listify(callbacks)
195 if defaults.extra_callbacks is not None: callbacks += defaults.extra_callbacks
--> 196 fit(epochs, self, metrics=self.metrics, callbacks=self.callbacks+callbacks)
197
198 def create_opt(self, lr:Floats, wd:Floats=0.)->None:

C:\ProgramData\Anaconda3\lib\site-packages\fastai\basic_train.py in fit(epochs, learn, callbacks, metrics)
96 cb_handler.set_dl(learn.data.train_dl)
97 cb_handler.on_epoch_begin()
---> 98 for xb,yb in progress_bar(learn.data.train_dl, parent=pbar):
99 xb, yb = cb_handler.on_batch_begin(xb, yb)
100 loss = loss_batch(learn.model, xb, yb, learn.loss_func, learn.opt, cb_handler)

C:\ProgramData\Anaconda3\lib\site-packages\fastprogress\fastprogress.py in __iter__(self)
64 self.update(0)
65 try:
---> 66 for i,o in enumerate(self._gen):
67 yield o
68 if self.auto_update: self.update(i+1)

C:\ProgramData\Anaconda3\lib\site-packages\fastai\basic_data.py in __iter__(self)
73 def __iter__(self):
74 "Process and returns items from `DataLoader`."
---> 75 for b in self.dl: yield self.proc_batch(b)
76
77 @classmethod

C:\ProgramData\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __iter__(self)
817
818 def __iter__(self):
--> 819 return _DataLoaderIter(self)
820
821 def __len__(self):

C:\ProgramData\Anaconda3\lib\site-packages\torch\utils\data\dataloader.py in __init__(self, loader)
558 # before it starts, and __del__ tries to join but will get:
559 # AssertionError: can only join a started process.
--> 560 w.start()
561 self.index_queues.append(index_queue)
562 self.workers.append(w)

C:\ProgramData\Anaconda3\lib\multiprocessing\process.py in start(self)
110 'daemonic processes are not allowed to have children'
111 _cleanup()
--> 112 self._popen = self._Popen(self)
113 self._sentinel = self._popen.sentinel
114 # Avoid a refcycle if the target function holds an indirect

C:\ProgramData\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
221 @staticmethod
222 def _Popen(process_obj):
--> 223 return _default_context.get_context().Process._Popen(process_obj)
224
225 class DefaultContext(BaseContext):

C:\ProgramData\Anaconda3\lib\multiprocessing\context.py in _Popen(process_obj)
320 def _Popen(process_obj):
321 from .popen_spawn_win32 import Popen
--> 322 return Popen(process_obj)
323
324 class SpawnContext(BaseContext):

C:\ProgramData\Anaconda3\lib\multiprocessing\popen_spawn_win32.py in __init__(self, process_obj)
63 try:
64 reduction.dump(prep_data, to_child)
---> 65 reduction.dump(process_obj, to_child)
66 finally:
67 set_spawning_popen(None)

C:\ProgramData\Anaconda3\lib\multiprocessing\reduction.py in dump(obj, file, protocol)
58 def dump(obj, file, protocol=None):
59 '''Replacement for pickle.dump() using ForkingPickler.'''
---> 60 ForkingPickler(file, protocol).dump(obj)
61
62 #

BrokenPipeError: [Errno 32] Broken pipe

```

Sorry, I don’t know anything about Windows. I trust @sgugger will have some ideas.

In the future please file those directly as Issues on github so that they are easy to track and for other people to find solutions to their similar problems.

No need to file an issue in fastai for BrokenPipeError; these errors come from the combination of

  • multiprocess
  • jupyter notebook
  • windows

Remove one of those and they’ll disappear. There’s nothing in the library that causes them.

thank you!
By **multiprocess**, do you mean multiple GPUs?

No, it means using multiple processes (which is done by default to speed things up). You should set num_workers=0 when creating a DataBunch to disable it. More details here.

2 Likes

Thank you @sgugger !
Problem solved after adding num_workers=0 to the DataBunch constructor.

Putting my code creating the ItemList and databunch inside a function inside a new python file and importing that file in my jupyter notebook seemed to remove the need to put num_workers=0.

Just curious… do you notice any performance difference using this workaround? I found it also worked for me but had zero effect on training:

Both loss, error_rate, and training time were identical.

Edit: I had an error in my import file and my learner was using the old (identical) databunch. After fixing the error and confirming the import file was creating the new databunch correctly, I kept getting the same BrokenPipeError.

My stuff used images, and I think image transformations are what take a lot of time with num_workers=0, because you don’t take advantage of multiple CPU worker processes. I have not done any empirical tests, but it felt a lot slower with num_workers=0.

But if you want to be 100% sure, just surround the code using the data block api inside this if (inside your jupyter notebook):

if __name__ == '__main__':
    # your dataloading stuff here

It's crazy.
On windows 7 & 10, pytorch 1.0.1 & fastai 1.0.47 throws no errors. Uses 100% of CPU, resulting in very fast training.

On windows 8, pytorch 1.0.0 & fastai 1.0.47 works having the same effect.

For installing on Windows 8 (CUDA 9),

conda create -n fastai python=3.6
activate fastai
pip install https://download.pytorch.org/whl/cu90/torch-1.0.0-cp36-cp36m-win_amd64.whl
pip install torchvision
pip install fastai==1.0.47

For installing on Windows 7, 10 (CUDA 9),

conda create -n fastai python=3.6
activate fastai
pip install https://download.pytorch.org/whl/cu90/torch-1.0.1-cp36-cp36m-win_amd64.whl
pip install torchvision
pip install fastai==1.0.47

Hope this helps.

@ashwinakannan I’m running on Win10, but PyTorch 1.0.1 with fastai 1.0.47 did not resolve the issue for me. I’m still relying on num_workers=0 to work around the ForkingPickler broken pipe error.

Have you tried the latest version from master? I found an issue related to that yesterday and committed a fix, but it seems it needed some adjustments to work on Google Colab:

Thank you, @etremblay. That fixed my issue!

1 Like

@sgugger I am almost finished with a tutorial on how to use MixedItemList with the PetFinder dataset. I want to release my notebook as a potential tutorial for fastai eventually. The only part left is predicting on the test dataset to make a submission to the Kaggle competition, but I am having problems with inference for the test set.

learn.export('mixed.pkl')

imgList = ImageList.from_df(petsTest, path=path, cols='PicturePath')
tabList = TabularList.from_df(petsTest, cat_names=cat_names, cont_names=cont_names, procs=procs, path=path)
textList = TextList.from_df(petsTest, cols='Description', path=path, vocab=vocab)

norm, denorm = normalize_custom_funcs(*imagenet_stats)

mixedTest = (MixedItemList([imgList, tabList, textList], path, inner_df=tabList.inner_df))

learn = load_learner(path, 'mixed.pkl', text=mixedTest)

But then I get this exception… Not too sure what I am doing wrong; I followed the instructions from https://docs.fast.ai/tutorial.inference.html, but maybe there’s something I am missing…

---------------------------------------------------------------------------
IndexError                                Traceback (most recent call last)
<ipython-input-92-2ffbca749f38> in <module>
----> 1 learn = load_learner(path, 'mixed.pkl', text=mixedTest)

c:\work\ml\fastai-dev\fastai\fastai\basic_train.py in load_learner(path, file, test, **db_kwargs)
    595     state = torch.load(source, map_location='cpu') if defaults.device == torch.device('cpu') else torch.load(source)
    596     model = state.pop('model')
--> 597     src = LabelLists.load_state(path, state.pop('data'))
    598     if test is not None: src.add_test(test)
    599     data = src.databunch(**db_kwargs)

c:\work\ml\fastai-dev\fastai\fastai\data_block.py in load_state(cls, path, state)
    563         "Create a `LabelLists` with empty sets from the serialized `state`."
    564         path = Path(path)
--> 565         train_ds = LabelList.load_state(path, state)
    566         valid_ds = LabelList.load_state(path, state)
    567         return LabelLists(path, train=train_ds, valid=valid_ds)

c:\work\ml\fastai-dev\fastai\fastai\data_block.py in load_state(cls, path, state)
    673     def load_state(cls, path:PathOrStr, state:dict) -> 'LabelList':
    674         "Create a `LabelList` from `state`."
--> 675         x = state['x_cls']([], path=path, processor=state['x_proc'], ignore_empty=True)
    676         y = state['y_cls']([], path=path, processor=state['y_proc'], ignore_empty=True)
    677         res = cls(x, y, tfms=state['tfms'], tfm_y=state['tfm_y'], **state['tfmargs']).process()

c:\work\ml\fastai-dev\fastai\fastai\data_block.py in __init__(self, item_lists, path, label_cls, inner_df, x, ignore_empty, processor)
    769         if processor is None:
    770             processor = MixedProcessor([ifnone(il.processor, dp) for il,dp in zip(item_lists, default_procs)])
--> 771         super().__init__(range_of(item_lists[0]), processor=processor, path=ifnone(path, item_lists[0].path), 
    772                          label_cls=label_cls, inner_df=inner_df, x=x, ignore_empty=ignore_empty)
    773 

IndexError: list index out of range

Any pointers would be helpful!

Thanks!

Debugging the code, it seems like item_lists is empty in the MixedItemList constructor. The code uses item_lists[0], which makes it crash.