BrokenProcessPool Error for TextLMDataBunch.from_csv() function

data_lm = TextLMDataBunch.from_csv('./','proc_notes',max_vocab=10000,chunksize=512)
data_lm.save()

I am getting the following error even though I am using a 104GB-RAM Google Cloud instance. My proc_notes CSV file is 8GB and contains two columns, "labels" and "Text".

Traceback (most recent call last):                                                      
  File "diag_lm.py", line 9, in <module>
    data_lm = TextLMDataBunch.from_csv('./','proc_notes',max_vocab=10000,chunksize=512)
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/text/data.py", line 221, in from_csv
    label_cols, label_delim, **kwargs)
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/text/data.py", line 205, in from_df
    if cls==TextLMDataBunch: src = src.label_for_lm() 
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py", line 425, in _inner
    self.process()
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py", line 472, in process
    for ds,n in zip(self.lists, ['train','valid','test']): ds.process(xp, yp, name=n)
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py", line 627, in process
    self.x.process(xp)
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py", line 68, in process
    for p in self.processor: p.process(self)
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/text/data.py", line 285, in process
    tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
  File "/opt/anaconda3/lib/python3.7/site-packages/fastai/text/transform.py", line 116, in process_all
    return sum(e.map(self._process_all_1, partition_by_cores(texts, self.n_cpus)), [])
  File "/opt/anaconda3/lib/python3.7/concurrent/futures/process.py", line 476, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

I’m getting the same error message using TextLMDataBunch.from_csv() to load a 4.5GB file, though I have only 62GB of RAM. It uses all the RAM during the attempted load, and only crashes after 1-2 hours, with the progress bar showing over 50% loaded. It also runs all 6 cores of my single Intel CPU at 100% each. The command didn’t fail when I limited my input file to the first 100 lines. Here is my command and traceback:

This is the code that failed

path = datapath4file('/media/DataHD2/Notes_PHI_20190121/notes_dana_hp')
data_lm = TextLMDataBunch.from_csv(path=path, csv_name='notes_hp_half.txt', text_cols='note_text',
                                   header=0)

This is the traceback when it crashed



BrokenProcessPool                         Traceback (most recent call last)
<ipython-input> in <module>
      4 path = datapath4file('/media/DataHD2/Notes_PHI_20190121/notes_dana_hp')
      5 data_lm = TextLMDataBunch.from_csv(path=path, csv_name='notes_hp_half.txt', text_cols='note_text',
----> 6                                    header=0)
      7 # notes_hp_half.txt = 2,600,000 rows; 4,382,461,616 bytes
      8 # expected time: 1 hr 48 min

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/text/data.py in from_csv(cls, path, csv_name, valid_pct, test, tokenizer, vocab, classes, header, text_cols, label_cols, label_delim, **kwargs)
    219         test_df = None if test is None else pd.read_csv(Path(path)/test, header=header)
    220         return cls.from_df(path, train_df, valid_df, test_df, tokenizer, vocab, classes, text_cols,
--> 221                            label_cols, label_delim, **kwargs)
    222 
    223     @classmethod

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/text/data.py in from_df(cls, path, train_df, valid_df, test_df, tokenizer, vocab, classes, text_cols, label_cols, label_delim, **kwargs)
    203         src = ItemLists(path, TextList.from_df(train_df, path, cols=text_cols, processor=processor),
    204                         TextList.from_df(valid_df, path, cols=text_cols, processor=processor))
--> 205         if cls==TextLMDataBunch: src = src.label_for_lm()
    206         else: src = src.label_from_df(cols=label_cols, classes=classes, label_delim=label_delim)
    207         if test_df is not None: src.add_test(TextList.from_df(test_df, path, cols=text_cols))

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/data_block.py in _inner(*args, **kwargs)
    423             self.valid = fv(*args, **kwargs)
    424             self.__class__ = LabelLists
--> 425             self.process()
    426             return self
    427         return _inner

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/data_block.py in process(self)
    470         "Process the inner datasets."
    471         xp,yp = self.get_processors()
--> 472         for ds,n in zip(self.lists, ['train','valid','test']): ds.process(xp, yp, name=n)
    473         #progress_bar clear the outputs so in some case warnings issued during processing disappear.
    474         for ds in self.lists:

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/data_block.py in process(self, xp, yp, name)
    625                     p.warns = []
    626                 self.x,self.y = self.x[~filt],self.y[~filt]
--> 627         self.x.process(xp)
    628         return self
    629 

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/data_block.py in process(self, processor)
     66         if processor is not None: self.processor = processor
     67         self.processor = listify(self.processor)
---> 68         for p in self.processor: p.process(self)
     69         return self
     70 

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/text/data.py in process(self, ds)
    283         tokens = []
    284         for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
--> 285             tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
    286         ds.items = tokens
    287 

~/anaconda3/envs/fastaiv1/lib/python3.6/site-packages/fastai/text/transform.py in process_all(self, texts)
    114         if self.n_cpus <= 1: return self._process_all_1(texts)
    115         with ProcessPoolExecutor(self.n_cpus) as e:
--> 116             return sum(e.map(self._process_all_1, partition_by_cores(texts, self.n_cpus)), [])
    117 
    118 class Vocab():

~/anaconda3/envs/fastaiv1/lib/python3.6/concurrent/futures/process.py in _chain_from_iterable_of_lists(iterable)
    364     careful not to keep references to yielded objects.
    365     """
--> 366     for element in iterable:
    367         element.reverse()
    368         while element:

~/anaconda3/envs/fastaiv1/lib/python3.6/concurrent/futures/_base.py in result_iterator()
    584                     # Careful not to keep a reference to the popped future
    585                     if timeout is None:
--> 586                         yield fs.pop().result()
    587                     else:
    588                         yield fs.pop().result(end_time - time.monotonic())

~/anaconda3/envs/fastaiv1/lib/python3.6/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

~/anaconda3/envs/fastaiv1/lib/python3.6/concurrent/futures/_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

You should try using fewer CPUs in that case, or removing objects you don’t need from RAM before launching this command.
To control the number of CPUs used: `defaults.cpus=…`
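For reference, the dispatch that `defaults.cpus` ultimately controls is visible in the traceback above (`transform.py`, line 116): tokenization runs sequentially when `n_cpus <= 1` and fans out over a `ProcessPoolExecutor` otherwise. A minimal stdlib sketch of that pattern, where `tokenize_batch` is a hypothetical stand-in for fastai’s `_process_all_1`:

```python
from concurrent.futures import ProcessPoolExecutor

def partition_by_cores(items, n):
    # Split items into at most n roughly equal chunks, one per worker
    # (same idea as fastai's partition_by_cores helper).
    chunk = max(1, (len(items) + n - 1) // n)
    return [items[i:i + chunk] for i in range(0, len(items), chunk)]

def tokenize_batch(texts):
    # Hypothetical stand-in for the real per-worker tokenizer:
    # just whitespace-split each text.
    return [t.split() for t in texts]

def process_all(texts, n_cpus=1):
    # Mirrors the logic shown in the traceback: with n_cpus <= 1 no
    # worker processes are created, so none can be killed by the OOM killer.
    if n_cpus <= 1:
        return tokenize_batch(texts)
    with ProcessPoolExecutor(n_cpus) as e:
        return sum(e.map(tokenize_batch, partition_by_cores(texts, n_cpus)), [])
```

Setting `defaults.cpus = 1` before building the databunch routes fastai through the sequential branch, trading tokenization speed for stability.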


I am getting the same error,

---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-14-298f3e65eb89> in <module>
      1 #Data Preparation
      2 # Language model data
----> 3 data_lm = TextLMDataBunch.from_df(train_df = df, valid_df = df, text_cols = "DiscrepancyDescription", label_cols = "ScoreDesc", path = "")

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\text\data.py in from_df(cls, path, train_df, valid_df, test_df, tokenizer, vocab, classes, text_cols, label_cols, label_delim, chunksize, max_vocab, min_freq, mark_fields, include_bos, include_eos, **kwargs)
    200         src = ItemLists(path, TextList.from_df(train_df, path, cols=text_cols, processor=processor),
    201                         TextList.from_df(valid_df, path, cols=text_cols, processor=processor))
--> 202         if cls==TextLMDataBunch: src = src.label_for_lm()
    203         else: src = src.label_from_df(cols=label_cols, classes=classes, label_delim=label_delim)
    204         if test_df is not None: src.add_test(TextList.from_df(test_df, path, cols=text_cols))

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\data_block.py in _inner(*args, **kwargs)
    463             self.valid = fv(*args, from_item_lists=True, **kwargs)
    464             self.__class__ = LabelLists
--> 465             self.process()
    466             return self
    467         return _inner

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\data_block.py in process(self)
    517         "Process the inner datasets."
    518         xp,yp = self.get_processors()
--> 519         for ds,n in zip(self.lists, ['train','valid','test']): ds.process(xp, yp, name=n)
    520         #progress_bar clear the outputs so in some case warnings issued during processing disappear.
    521         for ds in self.lists:

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\data_block.py in process(self, xp, yp, name)
    694                     p.warns = []
    695                 self.x,self.y = self.x[~filt],self.y[~filt]
--> 696         self.x.process(xp)
    697         return self
    698 

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\data_block.py in process(self, processor)
     73         if processor is not None: self.processor = processor
     74         self.processor = listify(self.processor)
---> 75         for p in self.processor: p.process(self)
     76         return self
     77 

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\text\data.py in process(self, ds)
    292         tokens = []
    293         for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
--> 294             tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
    295         ds.items = tokens
    296 

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\site-packages\fastai\text\transform.py in process_all(self, texts)
    118         if self.n_cpus <= 1: return self._process_all_1(texts)
    119         with ProcessPoolExecutor(self.n_cpus) as e:
--> 120             return sum(e.map(self._process_all_1, partition_by_cores(texts, self.n_cpus)), [])
    121 
    122 class Vocab():

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\concurrent\futures\process.py in _chain_from_iterable_of_lists(iterable)
    364     careful not to keep references to yielded objects.
    365     """
--> 366     for element in iterable:
    367         element.reverse()
    368         while element:

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\concurrent\futures\_base.py in result_iterator()
    584                     # Careful not to keep a reference to the popped future
    585                     if timeout is None:
--> 586                         yield fs.pop().result()
    587                     else:
    588                         yield fs.pop().result(end_time - time.monotonic())

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\concurrent\futures\_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433             else:
    434                 raise TimeoutError()

~\AppData\Local\Continuum\anaconda3\envs\py367nlp\lib\concurrent\futures\_base.py in __get_result(self)
    382     def __get_result(self):
    383         if self._exception:
--> 384             raise self._exception
    385         else:
    386             return self._result

BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Not sure if this is due to updating fastai to 1.0.50.post1.
I did set `defaults.cpus = 6`, but that didn’t help.

Please advise

Quick update.
Updated to fastai == 1.0.51 in a new environment.
The BrokenProcessPool error with TextLMDataBunch.from_df is gone.

At least for Windows 10 64-bit users, the bug is still not fixed.
I have fastai version 1.0.58.1, but I still get the BrokenProcessPool error almost any time I use fastai’s text API. And it occurs stochastically: sometimes but not all the time, which is why I am calling it a bug.

On Windows machines, passing `num_workers=1` to the databunch fixes the issue.

Thanks, I’ll try this. But it is a suboptimal solution. Why shouldn’t a Windows user be able to access their GPU in Fastai or PyTorch?

I think you misunderstood: `num_workers=1` tells the DataBunch to load the data using a single process (1 CPU); GPU training still works fine on Windows. As for the BrokenProcessPool error, the call needs to be wrapped in `if __name__ == '__main__':` on Windows, and even then it doesn’t work in an interactive environment. I have been facing this issue for a long time.
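The `if __name__ == '__main__':` requirement is not fastai-specific: on Windows, `ProcessPoolExecutor` uses the "spawn" start method, which re-imports the main module in every worker, so unguarded pool creation at module level recurses. A minimal stdlib sketch, where `square` is just a placeholder task:

```python
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

def main():
    # Pool creation must happen under the __main__ guard on Windows,
    # because each spawned worker re-imports this module from the top.
    with ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(square, range(4)))

if __name__ == '__main__':
    print(main())  # -> [0, 1, 4, 9]
```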


Thanks for the clarification, @vijayabhaskar !

As for fixing the BrokenProcessPool Error:
Is there an equivalent way to wrap the function in `if __name__ == '__main__':` within a Jupyter notebook?

I don’t think so. As far as I’ve read, it’s because of how Windows creates the individual processes: there is no way for each process to know which one of them is the main process, so in interactive environments this gets even more complex.


I checked the memory consumption of XYZDataBunch.from_csv and it causes very short spikes that are perhaps 4-10 times as memory-intensive as the normal memory needed during from_csv. The consequence of the memory shortage seems to be that some of the processes get killed, producing ‘A process in the process pool was terminated abruptly while the future was running or pending’.
So it seems to me that the implementation should be changed so that these spikes are greatly reduced.
What is the best way to request such a change? On ‘https://docs.fast.ai/support.html’ they write that requests should be discussed in the forum, but I’m not sure this conversation will ever be picked up by a developer.
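Until the library changes, the spikes can at least be bounded on the caller’s side by keeping `chunksize` small, since only one chunk of raw rows (plus its tokenized output across the workers) needs to be alive at a time. A stdlib sketch of the chunked-reading idea (`iter_chunks` is hypothetical, not a fastai API):

```python
import csv

def iter_chunks(path, chunksize=512):
    # Yield lists of rows so that at most `chunksize` raw rows are held
    # in memory at once -- the same role the `chunksize` argument plays
    # in TextLMDataBunch.from_csv.
    with open(path, newline='') as f:
        reader = csv.DictReader(f)
        chunk = []
        for row in reader:
            chunk.append(row)
            if len(chunk) >= chunksize:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
```

Lowering `chunksize` shrinks both the raw rows and the per-worker token lists that exist simultaneously, at the cost of more pool round-trips.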