Getting MemoryError when trying to create TextList

The code is most likely getting stuck after the labeling (there are no direct labels for a LM) when the pre-processing begins (which is called here). Then you go into the TokenizeProcessor, which is very likely your culprit.

Is that called from inside of label_for_lm? Really having trouble following the flow. I split it up to make sure it was within there that it's bogged down.

Is there any way to parallelize this? It's pegging only 1 of the 96 cores at 100% CPU and saying it'll take 97 hours for label_for_lm to complete. 1 hour split across all 96 cores would be a lot better (I'm using a pre-emptible instance on Google Cloud which will get terminated after 24 hours, so waiting 97 hours won't work even w/ lots of patience). Happy to try modifying the source but unfortunately need another point in the right direction.

Edit: Maybe I'm reading the output of top wrong. I always thought that'd be >1 if it was using multiple cores. But when I interrupted the processing it had a whole bunch of parallel-related functions in the stack trace. So maybe it's already doing parallelization.

---------------------------------------------------------------------------
BrokenProcessPool                         Traceback (most recent call last)
<ipython-input-16-a969ef4ea37c> in <module>
----> 1 tl.label_for_lm()

/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py in _inner(*args, **kwargs)
    373             self.valid = fv(*args, **kwargs)
    374             self.__class__ = LabelLists
--> 375             self.process()
    376             return self
    377         return _inner

/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py in process(self)
    420         "Process the inner datasets."
    421         xp,yp = self.get_processors()
--> 422         for i,ds in enumerate(self.lists): ds.process(xp, yp, filter_missing_y=i==0)
    423         return self
    424 

/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py in process(self, xp, yp, filter_missing_y)
    519             filt = array([o is None for o in self.y])
    520             if filt.sum()>0: self.x,self.y = self.x[~filt],self.y[~filt]
--> 521         self.x.process(xp)
    522         return self
    523 

/opt/anaconda3/lib/python3.7/site-packages/fastai/data_block.py in process(self, processor)
     65         if processor is not None: self.processor = processor
     66         self.processor = listify(self.processor)
---> 67         for p in self.processor: p.process(self)
     68         return self
     69 

/opt/anaconda3/lib/python3.7/site-packages/fastai/text/data.py in process(self, ds)
    245         tokens = []
    246         for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
--> 247             tokens += self.tokenizer.process_all(ds.items[i:i+self.chunksize])
    248         ds.items = tokens
    249 

/opt/anaconda3/lib/python3.7/site-packages/fastai/text/transform.py in process_all(self, texts)
    115         if self.n_cpus <= 1: return self._process_all_1(texts)
    116         with ProcessPoolExecutor(self.n_cpus) as e:
--> 117             return sum(e.map(self._process_all_1, partition_by_cores(texts, self.n_cpus)), [])
    118 
    119 class Vocab():

/opt/anaconda3/lib/python3.7/concurrent/futures/process.py in map(self, fn, timeout, chunksize, *iterables)
    643         results = super().map(partial(_process_chunk, fn),
    644                               _get_chunks(*iterables, chunksize=chunksize),
--> 645                               timeout=timeout)
    646         return _chain_from_iterable_of_lists(results)
    647 

/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py in map(self, fn, timeout, chunksize, *iterables)
    573             end_time = timeout + time.monotonic()
    574 
--> 575         fs = [self.submit(fn, *args) for args in zip(*iterables)]
    576 
    577         # Yield must be hidden in closure so that the futures are submitted

/opt/anaconda3/lib/python3.7/concurrent/futures/_base.py in <listcomp>(.0)
    573             end_time = timeout + time.monotonic()
    574 
--> 575         fs = [self.submit(fn, *args) for args in zip(*iterables)]
    576 
    577         # Yield must be hidden in closure so that the futures are submitted

/opt/anaconda3/lib/python3.7/concurrent/futures/process.py in submit(self, fn, *args, **kwargs)
    597         with self._shutdown_lock:
    598             if self._broken:
--> 599                 raise BrokenProcessPool(self._broken)
    600             if self._shutdown_thread:
    601                 raise RuntimeError('cannot schedule new futures after shutdown')

BrokenProcessPool: A child process terminated abruptly, the process pool is not usable anymore

Edit 2: Found Tokenizer n_cpus parameter. Experimenting with raising that.

Edit 3: Yeah, looks like it's not distributing between the available CPUs. Any ideas on how to find out why?

Edit 4: Got it down to about a 2 hour estimate. The problem appears to be that it was spending all its time pickling the data and distributing it to the cores and hardly any time on the cores processing it. I dramatically increased the chunksize parameter and that helped. Going to juice it even more and see how quick it can get.
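To make that overhead a bit more concrete, here is a rough back-of-the-envelope sketch. It relies on what the traceback above shows: process_all opens a fresh ProcessPoolExecutor for every chunk and splits that chunk across n_cpus workers. The helper name `dispatch_stats` and the 1,000,000-document corpus size are made up for illustration, and the 10000 chunksize is assumed to be the default.

```python
import math

def dispatch_stats(n_texts, chunksize, n_cpus):
    """Return (pool start-ups, texts handed to each worker per start-up)."""
    # One process_all call (and so one new process pool) per chunk.
    n_chunks = math.ceil(n_texts / chunksize)
    # Each chunk is split roughly evenly across the workers.
    texts_per_worker = math.ceil(min(chunksize, n_texts) / n_cpus)
    return n_chunks, texts_per_worker

# Hypothetical corpus of 1,000,000 documents on 96 cores.
small = dispatch_stats(1_000_000, 10_000, 96)    # many pools, tiny jobs
large = dispatch_stats(1_000_000, 350_000, 96)   # few pools, big jobs
print(small, large)
```

With a small chunksize each worker only gets about a hundred texts before the pool is torn down again, so most of the wall-clock time can go to starting processes and pickling. A bigger chunksize amortizes that, at the cost of holding more data in flight at once.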

Got it to a ~90 minute estimate with a chunksize of 4_000_000, but it's getting a bit close for comfort on memory usage: 508GB used, only 53GB free. Ran out of memory on the 2nd chunk... apparently the size grows as it continues to process. Going to have to play more with chunksize but at least I think I'm on a viable path to getting through tokenization!

Edit 5: still haven't managed to get all the way through to a numerical tokenized output. It keeps exhausting all the memory (which is >10x the size of the dataset). Going to try the subclassing route and see if I can make it actually go in chunks and store to disk then merge them after.
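A minimal sketch of the "go in chunks, store to disk, merge after" idea, using only the standard library. This is not fastai code: `tokenize_to_disk` and `iter_chunks` are hypothetical helpers, and `str.split` stands in for the real tokenizer.

```python
import os
import pickle
import tempfile

def tokenize_to_disk(texts, chunksize, tokenize, out_dir):
    """Tokenize `texts` chunk by chunk, writing each chunk's result to its own file."""
    paths = []
    for start in range(0, len(texts), chunksize):
        chunk = [tokenize(t) for t in texts[start:start + chunksize]]
        path = os.path.join(out_dir, f"chunk_{start:012d}.pkl")
        with open(path, "wb") as f:
            pickle.dump(chunk, f)
        paths.append(path)  # only the path stays in memory, not the tokens
    return paths

def iter_chunks(paths):
    """Yield tokenized items in the original order, loading one chunk at a time."""
    for path in paths:
        with open(path, "rb") as f:
            yield from pickle.load(f)

with tempfile.TemporaryDirectory() as tmp:
    paths = tokenize_to_disk(["a b", "c d e", "f"], chunksize=2,
                             tokenize=str.split, out_dir=tmp)
    merged = list(iter_chunks(paths))
```

Peak memory is then bounded by one chunk of tokens rather than the whole corpus, as long as whatever consumes `iter_chunks` does not collect everything into a list the way the toy example does.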

So based on last night's lecture I have a better understanding of what this __getattr__ method is doing. But how do I trace back where this is getting called from? If I understand correctly, it could be triggered by any undefined attribute on any instance of type ItemLists or its subclasses (which I think is only LabelLists).

What's the best way to trace that back out to follow the flow?
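One generic way to follow the flow is to record the caller from inside __getattr__ itself using the standard traceback module. The sketch below uses a toy class, not fastai's ItemLists; `Traced` and `trigger` are made-up names. On the real class you could temporarily add the same two lines (or a `breakpoint()`) to the installed source.

```python
import traceback

class Traced:
    calls = []  # (attribute name, calling function) pairs

    def __getattr__(self, name):
        # __getattr__ only runs after normal attribute lookup has failed.
        # limit=2 keeps the two innermost frames: [caller, this method].
        caller = traceback.extract_stack(limit=2)[0]
        Traced.calls.append((name, caller.name))
        raise AttributeError(name)

def trigger():
    obj = Traced()
    # getattr with a default swallows the AttributeError and returns None.
    return getattr(obj, "label_for_lm", None)

result = trigger()
print(Traced.calls)
```

Swapping `extract_stack(limit=2)` for `traceback.print_stack()` prints the whole chain of callers instead of just the nearest one.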


Separately, I think I'm going to create a PreProcessor subclass that combines TokenizeProcessor with NumericalizeProcessor so it can condense items all the way down to numbers without having to store all the tokens in memory.

It will create a Vocab on the fly and add to it at the end of each process_all chunk as it Numericalizes the tokens. The token will need to appear in the chunk (or a previous chunk) min_freq times to not get converted to xxunk. Then at the end of all the chunks it will constrain the Vocab down to the desired size.

Does that seem like a good approach to tackle the out of memory problems?
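For comparison, here is a simpler variant of the counting and pruning part, sketched with collections.Counter. It is deliberately not the one-pass scheme described above: it counts chunk by chunk first and only numericalizes once the vocab is final, so it needs a second pass over the tokens. `build_vocab` and `numericalize` are hypothetical names, and putting xxunk at index 0 is an assumption.

```python
from collections import Counter

def build_vocab(token_chunks, max_vocab, min_freq, specials=("xxunk",)):
    """Count tokens one chunk at a time, then keep the most frequent ones."""
    counts = Counter()
    for chunk in token_chunks:       # only the counts persist across chunks
        for tokens in chunk:
            counts.update(tokens)
    itos = list(specials)
    for tok, c in counts.most_common():
        if len(itos) >= max_vocab or c < min_freq:
            break                    # most_common is sorted, so we can stop here
        if tok not in specials:
            itos.append(tok)
    stoi = {t: i for i, t in enumerate(itos)}
    return itos, stoi

def numericalize(tokens, stoi):
    """Map tokens to ids; anything pruned from the vocab becomes 0 (xxunk)."""
    return [stoi.get(t, 0) for t in tokens]

chunks = [[["a", "b", "a"]], [["a", "c", "b"]]]
itos, stoi = build_vocab(chunks, max_vocab=10, min_freq=2)
ids = numericalize(["a", "c", "b"], stoi)
```

The one-pass version avoids the second pass by handing out ids immediately, which is why it then has to remap those ids after pruning.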

Yes, but if you are up to the point you need to do separate tokenization and numericalization because your corpus is that big, it will take a veeeeeeeeeeery long time for your model to train. Wikitext-103 is huge and the whole tokenization process fits in RAM.

For reference, what hardware was WT103 trained on and how long did it take? Should I expect training time to scale linearly with data size?

I was able to train ~1/16 of this data in ~20 hours on my 2080ti, and was thinking that if I used 8x cloud GPUs on 16x the data it should take about 2x as long. Is that estimate wildly inaccurate?

It doesn't always scale linearly when you train on multiple GPUs, so I can't really say. Training WT103 with the standard AWD_LSTM is roughly 1h-1h30 an epoch IIRC, on a p3 (so a V100).
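The arithmetic behind the 2x guess, written out as a small sketch. It assumes time grows linearly with data and shrinks linearly with GPU count, scaled by a made-up `efficiency` factor to account for imperfect multi-GPU scaling. It also ignores any speed difference between a 2080ti and the cloud GPUs.

```python
def estimated_hours(base_hours, data_multiplier, n_gpus, efficiency=1.0):
    """Naive estimate: linear in data, inversely linear in effective GPU count."""
    return base_hours * data_multiplier / (n_gpus * efficiency)

# 20 hours for 1/16 of the data on one GPU, then 16x the data on 8 GPUs.
ideal = estimated_hours(20, 16, 8)            # perfect scaling: 2x the original 20h
realistic = estimated_hours(20, 16, 8, 0.8)   # hypothetical 80% scaling efficiency
print(ideal, realistic)
```

So the 2x figure only holds under perfect scaling; any loss of efficiency pushes the estimate up proportionally.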

Ok so I finally got past the tokenization phase and have a databunch in hand!

It still took quite a bit of memory (peaked at 345GB of the 624GB available on n1-highmem-96) but it was at least 2x less memory than the standard way with the default processors (which exhausted the available memory on this instance and crashed) and didn't seem to take any longer (it didn't seem to get as "stuck" halfway through either).

It's not strictly equivalent to the default processors but it should be pretty close in most cases. There's still a bit more optimization that could be done by parallelizing some other parts of the code, but it did what I needed so I kept it simple. (The tightlyPack_ function in particular should be fairly easy to parallelize, and it did take a fair amount of time to finish processing on such a large dataset.)

Here's what I ended up with for the Processor class:

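import sys
import numpy as np
from fastai.text import *
from fastai.text.data import _join_texts  # private helper, not exported by the star import

class ChunkTokenizeAndNumericalizeProcessor(PreProcessor):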
    def __init__(self,
                 ds:ItemList=None,
                 tokenizer:Tokenizer=None,
                 chunksize:int=10000,
                 mark_fields:bool=False,
                 include_bos:bool=True,
                 include_eos:bool=False,
                 max_vocab:int=60000,
                 min_freq:int=3
            ):
        self.tokenizer,self.chunksize,self.mark_fields = ifnone(tokenizer, Tokenizer()),chunksize,mark_fields
        self.include_bos, self.include_eos = include_bos, include_eos
        
        self.max_vocab,self.min_freq = max_vocab,min_freq

        self.stoi = {}
        self.itos = []
        self.counts = []
        
        self.vocab = None
        self.packed = False
        
        self.addSpecial()
    def addSpecial(self):
        for o in defaults.text_spec_tok: self.addToken(o, sys.maxsize)
    def addToken(self, t, startingCount=0):
        if(self.packed): return
        
        self.stoi[t] = len(self.itos)
        self.itos.append(t)
        self.counts.append(startingCount)
    def process_one(self, item):
        raise Exception('not implemented')
    def numericalize(self, tokens):
        return np.array([self.numericalize_one(token) for token in tokens], dtype=np.int32)
    def numericalize_one(self, token):
        if not token in self.stoi:
            if(self.packed):
                return 0
            else:
                self.addToken(token)
        
        n = self.stoi[token]
        self.counts[n] = self.counts[n] + 1
        return n
    def expunge(self, size):
        toRemove = len(self.stoi) - size
        if(toRemove <= 0): return
        
        for x in np.argsort(self.counts, kind='mergesort')[:toRemove]: # use mergesort so oldest ones are evicted first
            del self.stoi[self.itos[x]]
            self.itos[x] = None
            self.counts[x] = sys.maxsize # so that it doesnt try to remove this again the next time
    def expungeFreq(self):
        for x in np.argsort(self.counts, kind='mergesort'): # use mergesort so oldest ones are evicted first
            if(self.counts[x] >= self.min_freq): break
            
            if(x > len(self.itos)): print('larger', x, len(self.itos))
            del self.stoi[self.itos[x]]
            self.itos[x] = None
            self.counts[x] = sys.maxsize # so that it doesnt try to remove this again the next time
    def tightlyPack_(self, items):
        tight = []
        remapped = np.zeros(len(self.itos), dtype=np.int32) # default to remove by setting to UNK (0) if token removed
        i = 0
        for idx, s in enumerate(self.itos):
            if s is not None:
                tight.append(s)
                remapped[idx] = i
                self.stoi[s] = i
                i = i+1
        
        for tokens in items:
            for idx, token in enumerate(tokens):
                tokens[idx] = remapped[token]
        
        self.itos = tight
        self.packed = True
    def process(self, ds):
        ds.items = _join_texts(ds.items, self.mark_fields, self.include_bos, self.include_eos)
        numeric_tokens = []
        for i in progress_bar(range(0,len(ds),self.chunksize), leave=False):
            print(len(self.stoi))
            tokens = self.tokenizer.process_all(ds.items[i:i+self.chunksize])
            for item in tokens:
                numeric_tokens.append(self.numericalize(item))
            
            self.expunge(self.max_vocab*5)
        
        if(not self.packed): # dont need to do this the second time around because won't grow during validation set
            self.expunge(self.max_vocab)
            self.expungeFreq()
            self.tightlyPack_(numeric_tokens)
        
        if self.vocab is None: self.vocab = Vocab(self.itos)
        
        ds.items = numeric_tokens
        ds.vocab = self.vocab

And I used it like this:

q = [ChunkTokenizeAndNumericalizeProcessor(tokenizer=Tokenizer(tok_func=SpacyTokenizer, n_cpus=96),
                                           chunksize=350000,
                                           max_vocab=100000,
                                           min_freq=2,
                                           mark_fields=False,
                                           include_bos=True,
                                           include_eos=False
     )]

tl = TextList.from_df(df, path=path, processor=q).split_by_rand_pct(0.1).label_for_lm()
data_lm = tl.databunch(bs=80)
data_lm.show_batch()

Onwards to training! :crossed_fingers:

Hi Brad,

Beautiful piece of work! So many questions - it's hard to get my head past the idea that tokenization shouldn't be so slow and so memory-hungry. But with Spacy that seems to be the case. I'm trying to tokenize a 9 gigabyte dataset with 1 core and 64GiB RAM, and it still seems to be eating memory as badly as when I had it using multiple cores (tried 2, 4 and 6 cores). 4 and 6 crash with out of memory.

Dumb questions first:

  • are you completely rewriting the tokenizer or are there some places where you are calling out to spacy?

  • where do you define "SpacyWhitespaceTokenizer"?

  • 345 GB of memory still sounds like a lot, and I'm not authorized to move my data to AWS. You said earlier

"I did end up training a language model with a ~4GB subsample and it works great. But I'm dreaming of how good it would be if it was trained on the other 60GB of data as well."

I have similar dreams (9GB corpus today, 100GB someday). But I shouldn't be storing any version of that corpus in RAM; it seems unnecessary. At most, the vocab should be in RAM, e.g., some python dict. Even if our candidate vocab plus frequency counts gets up to several million strings, I just don't see how that translates to 345 GB of RAM. Where was it that you and/or Spacy ended up needing so much memory?

Thanks for the generous contribution to our dreams! - Dana

are you completely rewriting the tokenizer or are there some places where you are calling out to spacy?

I'm still calling out to spacy; this is the line that does that (because tokenizer is a SpacyTokenizer):
tokens = self.tokenizer.process_all(ds.items[i:i+self.chunksize])

where do you define "SpacyWhitespaceTokenizer"?

Whoops, try SpacyTokenizer; SpacyWhitespaceTokenizer is a subclass of SpacyTokenizer that I customized a little bit because there were certain pieces of domain-specific whitespace I wanted to keep.

Where was it that you and/or Spacy ended up needing so much memory?

I think what takes up all the memory is that in order to parallelize between processes, python pickles the data it's passing around, so the data gets copied into the worker processes and memory usage grows with the number of cores you're using. Also, the default processor stores a huge array of all the tokens in string format until it's done processing the whole dataset; mine above throws those out once it numericalizes them.
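A small stdlib-only illustration of why dropping the string tokens helps. It compares a Python list of token strings with a compact integer array holding the same sequence as ids. The token list is made up, and `array("i")` stands in for the np.int32 arrays used in the class above.

```python
import sys
from array import array

# Made-up token stream; real tokenizer output would be far larger.
tokens = ["xxbos", "the", "cat", "sat", "on", "the", "mat"] * 1000

# Assign ids in order of first appearance.
stoi = {}
for t in tokens:
    stoi.setdefault(t, len(stoi))
ids = array("i", (stoi[t] for t in tokens))

# getsizeof on a list counts only its pointer array (8 bytes per element on
# 64-bit builds); the string objects it points at cost extra on top of that.
list_bytes = sys.getsizeof(tokens)
# The array stores the integers inline, typically 4 bytes each.
ids_bytes = sys.getsizeof(ids)
print(list_bytes, ids_bytes)
```

The gap is wider in practice than this toy shows, because real tokenizer output consists of many separate string objects rather than seven shared ones.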

There's a tradeoff between number of cores (n_cpus), chunksize, speed, and memory. You can play with those parameters to see if you can find a combination that works on your machine and is fast enough for your needs.

My vocab only ended up taking ~5MB in the end. I'm not sure yet how much memory will be used during training but I don't expect it to be anywhere near that 345GB peak reached during tokenization.

Hi folks, slightly off-topic.

Where can I find the documentation for proc_all_mp and partition_by_cores in

tok = Tokenizer().proc_all_mp(partition_by_cores(texts))

Tokenizer() I found in the fastai.text library. Couldn't figure out where the other two functions are.

Looks like proc_all_mp is in the old fastai v0.7

partition_by_cores is in here (in addition to the old code): https://github.com/fastai/fastai/blob/db1054441537d3fd948fc0e1f85b3c2d8ffec4f2/fastai/core.py
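Roughly, the idea is to cut the list into about one slice per core. The sketch below is a from-memory approximation and not a copy of the library code: `split_for_workers` is a made-up name, so check the linked core.py for the actual definition.

```python
def split_for_workers(items, n_cpus):
    """Split `items` into at most `n_cpus` contiguous slices of near-equal size."""
    size = len(items) // n_cpus + 1   # slice length, rounded up
    return [items[i:i + size] for i in range(0, len(items), size)]

parts = split_for_workers(list(range(10)), 4)
print(parts)
```

Each slice is then pickled and sent to one worker process, which is where the per-core memory cost comes from.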

Where'd you see that used? You may be looking at the wrong thing?

Ah. Thanks.

lesson 10, 2018
This code - https://github.com/fastai/fastai/blob/master/courses/dl2/imdb.ipynb

Yep, last year's dl2 uses fastai v0.7; the above code is all for v1.
