Python Parallel Processing - Tips and Applications

After playing around with Jeremy’s fast imagenet process notebook, I wanted to start a thread for all of us to discuss parallel processing in Python: the benefits and drawbacks, applications for deep learning, and anecdotal performance benchmarks and ideas for improving our model training times.

After the lecture, I read up on Python’s concurrent.futures library and wrote some test code to see if I could replicate the benefits. To start, here are some very rough notes on the differences. I also put together a small Jupyter notebook to verify these ideas in practice.

What is parallel processing?
Basically, doing two things at the same time: either running code simultaneously on different CPUs/cores, or running code on the same CPU and achieving speedups by taking advantage of “wasted” CPU cycles while your program waits for external resources (IO, API calls).

(Timing diagrams: serial programming vs. parallel programming.)

In the parallel diagram, two threads achieve a speedup over normal serial programming.
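A minimal runnable sketch of the same idea (fake_io is a made-up stand-in that just sleeps to simulate a blocking call):

import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(i):
    time.sleep(1)  # stand-in for waiting on disk, network, or an API
    return i

start = time.time()
[fake_io(i) for i in range(2)]
print('serial: %.1f s' % (time.time() - start))     # ~2 s

start = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
    list(executor.map(fake_io, range(2)))
print('2 threads: %.1f s' % (time.time() - start))  # ~1 s: the waits overlap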

Processes vs Threads
A process is an instance of a program (e.g. a Jupyter notebook, Google Chrome, the Python interpreter). Processes spawn threads to handle subtasks like reading keystrokes, loading HTML pages, or saving files. Threads live inside processes and share the same memory space (they can read and write the same variables).

Ex: Microsoft Word
When you open Word, you create a process (an instance of the program). When you start typing, the process spawns a number of threads: one to read keystrokes, another to display text on the screen, a thread to autosave your file, and yet another to highlight spelling mistakes. By spawning multiple threads, Microsoft takes advantage of “wasted CPU time” (waiting for your keystrokes or waiting for a file to save) to provide a smoother user interface and make you more productive.

Process

  • Created by the operating system to run applications and programs
  • A process can have multiple threads
  • Processes have more overhead than threads, since opening and closing a process takes more time
  • Sharing information between processes is slower than sharing between threads because processes do not share memory space. In Python they share information by pickling data structures like arrays, which costs serialization and IPC time.
  • Two processes can execute code simultaneously in the same Python program

Thread

  • Threads are like mini-processes
  • They exist in shared memory space and can easily read and write the same variables (see the sketch below)
  • Two threads cannot execute code simultaneously in the same Python program (with exceptions; see the GIL notes below)
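A minimal sketch of the memory-sharing difference (the names here are made up for illustration):

import threading
import multiprocessing as mp

data = {'n': 0}

def bump():
    data['n'] += 1  # not atomic in general; guard with a Lock in real code

if __name__ == '__main__':
    # Threads live in the same memory space, so both increments are visible
    threads = [threading.Thread(target=bump) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(data['n'])  # -> 2

    # A process works on its own copy of data; the parent's dict is untouched
    p = mp.Process(target=bump)
    p.start()
    p.join()
    print(data['n'])  # still 2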

Python’s GIL problem

  • CPython’s GIL (Global Interpreter Lock) prevents two threads from executing Python code simultaneously in the same process
  • Most other languages do not have this problem and can run multiple threads simultaneously on multiple cores/CPUs
  • Libraries like NumPy work around this limitation by doing the heavy lifting in external C code, which can release the GIL

CPU vs Core

  • The difference is slightly confusing to me (please correct me), but a CPU, also known as a “processor”, manages the fundamental computational work that allows computers to run programs.
  • A CPU can have multiple cores, each of which can execute instructions independently, so the CPU can execute code truly simultaneously.
  • With a single core, there is no speedup for CPU-intensive tasks (loops, arithmetic), but the ability to launch multiple threads/processes lets applications “seem to do things simultaneously”, which is important for a buttery user experience, and it does achieve speedups on IO-bound work.
  • With only one core, only one instruction can be run at a time, so the OS switches back and forth between threads/processes, executing each a little at a time. (You can check your own machine’s core count as shown below.)
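A one-liner from the standard library shows how many cores your machine exposes:

import os
print(os.cpu_count())  # number of logical cores visible to the OS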

When to use threads vs processes?

  • Multiprocessing - can speed up Python operations that are CPU intensive, because the work spreads across multiple cores/CPUs and avoids the GIL problem
  • Multithreading - no benefit in Python for CPU-intensive tasks because of the GIL problem (this problem is specific to CPython)
  • Multithreading is often better than multiprocessing for IO and other tasks that wait on external systems, because the threads can combine their work more efficiently (they exist in the same memory space), while multiprocessing has to pickle the results and combine them at the end of the work

Here’s a good SO post on the differences.

Numpy Operations

  • For certain operations (e.g. dot product), NumPy works around Python’s GIL and can execute code in parallel on multiple cores.
  • Dot product uses parallel processing by default; you don’t need to write custom multitasking code.
  • Other NumPy operations, like elementwise multiplication and addition, do NOT run in parallel by default; however, unlike vanilla Python code, you CAN achieve speedups for them with threads/processes.
  • NumPy works around the GIL because its Python code calls down into compiled C code, which can release the GIL while it runs (see below for how to check your install).
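As an aside, np.dot’s built-in parallelism comes from the BLAS library NumPy links against (e.g. MKL or OpenBLAS). A quick way to check which one your install uses (output varies by machine):

import numpy as np
np.show_config()  # prints the BLAS/LAPACK libraries NumPy was built against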

Context Switching and Overhead

  • For small operations (only a few loops), I didn’t see any benefit from multitasking
  • This is likely because of the overhead of launching and maintaining multiple threads/processes (OS context switching)

Some Example code

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def multithreading(func, args, workers):
    # Map func over args using a pool of threads
    with ThreadPoolExecutor(max_workers=workers) as executor:
        res = executor.map(func, args)
    return list(res)

def multiprocessing(func, args, workers):
    # Map func over args using a pool of processes
    # (note: this name shadows the stdlib multiprocessing module)
    with ProcessPoolExecutor(max_workers=workers) as executor:
        res = executor.map(func, args)
    return list(res)

API calls
I found threads work best for API calls, with speedups over both serial processing and multiprocessing.

from urllib.request import urlopen

def download(url):
    # IO bound: most of the time is spent waiting on the network
    try:
        resp = urlopen(url)
    except Exception as e:
        print('ERROR: %s' % e)
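For example, feeding a (hypothetical) list of URLs through the helpers above:

urls = ['http://example.com/img/%d.jpg' % i for i in range(16)]  # hypothetical URLs
multithreading(download, urls, 4)  # fetch with 4 threads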

(Timeline plots: 2 threads, 4 threads, 2 processes, 4 processes.)

IO Heavy Task
I passed in a huge block of text to write to disk to see how performance differed. Threads seemed to win here again, but multiprocessing also beat serial processing.

import time

def io_heavy(text, base):
    start = time.time() - base
    # IO bound: write a large string to disk
    with open('output.txt', 'wt', encoding='utf-8') as f:
        f.write(text)
    stop = time.time() - base
    return start, stop
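One detail: the helpers above map a single-argument function, so io_heavy needs its text argument baked in first, e.g. with functools.partial (TEXT and N are defined in the notebook):

from functools import partial

base = time.time()
multithreading(partial(io_heavy, TEXT), [base] * N, 4)  # each call gets (TEXT, base)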

Serial

%timeit -n 1 [io_heavy(TEXT,1) for i in range(N)]
>> 1 loop, best of 3: 1.37 s per loop

(Timeline plots: multithreading with 4 threads, multiprocessing with 4 processes.)

Numpy Addition
Multiple threads/processes helped here, but only up to a point: if I added too many, I started to see slowdowns, likely due to the overhead of launching processes/threads and context switching.

def addition(i, base):
    # a and b are large NumPy arrays defined elsewhere in the notebook
    start = time.time() - base
    res = a + b
    stop = time.time() - base
    return start, stop

Serial

%timeit -n 1 [addition(i, time.time()) for i in range(N)]
>> 1 loop, best of 3: 14.9 s per loop

(Timeline plots: 2 threads, 4 threads.)

Dot Product
As expected, I saw no benefit from adding threads or processes to this code: np.dot already parallelizes out of the box.

def dot_product(i, base):
    # a and b are large NumPy arrays defined elsewhere in the notebook
    start = time.time() - base
    res = np.dot(a, b)
    stop = time.time() - base
    return start, stop

Serial: 2.8 seconds
2 threads: 3.4 seconds
2 processes: 3.3 seconds

CPU Intensive
Multiprocessing won the day here, as expected: multiple processes avoid Python’s GIL and can execute simultaneously on different CPUs/cores.

def cpu_heavy(n, base):
    # CPU bound: a pure-Python loop that holds the GIL the whole time
    start = time.time() - base
    count = 0
    for i in range(n):
        count += i
    stop = time.time() - base
    return start, stop

Serial: 4.2 seconds
4 threads: 6.5 seconds
4 processes: 1.9 seconds

Here’s the notebook you can use to play around with this code.

Resources
Here are all the great articles I benefited from as I researched this topic. I’m only rehashing what’s been explained by my betters.

40 Likes

This is fantastic! I hope you’ll publish that when you feel ready! :slight_smile: I read through it, and it all looks right to me.

4 Likes

Another great one here @brendan! Thanks for writing this up.

When Jeremy mentioned in class that Python was not thread-safe, I was surprised, and curious whether Python is, in fact, a good choice for deep learning, which is compute-intensive in many cases and can benefit from leveraging multiple cores. Here is my perspective:

Some building blocks before we dive into the Python situation:
1. Cores: Most modern CPUs have multiple cores. Multi-core architecture allows for MIMD (multiple instruction, multiple data): each process or thread can execute different instructions in parallel. Cores share memory, and in some cases the L2 cache too. Threading has been around since long before multi-core architectures; threads traditionally ran on the same core, but with multi-core CPUs, multithreading can span multiple cores as well. In most cases the OS handles this thread-to-core allocation, but there are cases where the language or application controls it.
2. GPUs also have multiple cores, but they follow SIMD (single instruction, multiple data): at any given time, all of the thousands of cores are performing the same operation, just on different data. This is not as general-purpose as CPU multi-cores, but it is quite powerful for numerical computations that are embarrassingly parallel.
3. Thread versus process: As you mentioned, threads share memory whereas processes do not. Also, process startup/teardown overhead is bigger.

So, coming back to the Python situation and leveraging parallelism:
Python is not thread-safe; in other words (oversimplified version), if two threads modify an in-memory object, you can get unexpected results. CPython’s solution for avoiding concurrency problems in multi-threaded applications is the GIL: essentially a mutex that serializes all execution of interpreted code within a process. So under the GIL it is not possible to run interpreted code truly in parallel within a single process, and the default Python interpreter, CPython, uses the GIL. This means:

  • If your code block is I/O bound, multithreading is helpful even with the GIL: other threads can still make progress while one thread waits on IO resources.
  • If your code block is CPU bound, there is little value in multithreading, as all the threads would execute in sequence anyway, and the additional overhead can even degrade performance. Instead, use multiple processes, keeping in mind that processes do not share data, have a bigger overhead, and require dealing with interprocess communication.
  • But there seem to be some caveats here, which I still need to fully understand. From the CPython GIL page: “Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL”. I get the I/O part, but for the image processing and NumPy part, it seems like if a process is not executing native Python interpreted code, the GIL will be released? Does this mean any code coming from outside packages executes outside of the GIL?

Here is an excellent resource you might enjoy reading: http://python-notes.curiousefficiency.org/en/latest/python3/multicore_python.html

6 Likes

I wrote some code to test what I’ve learned about parallel processing but was puzzled by the results.
Here’s my code (the check_paths function is borrowed from here):

import numpy as np
from matplotlib.path import Path
import time
import sys
a = list(zip(np.random.rand(5000,2), np.random.rand(5000,2)))
a = [Path(x) for x in a]
b = list(zip(np.random.rand(300,2), np.random.rand(300,2)))
def check_paths(path):
    '''A time consuming task.'''
    res = 'no cross'
    for other_path in a:
        if other_path.contains_path(path) == 1:
            res = 'cross'
            break
    return res
# single thread
now = time.time()
res = [check_paths(Path(x)) for x in b]
print("Finished in", time.time()-now , "sec")
# 2 threads
from concurrent.futures import ThreadPoolExecutor
now = time.time()
with ThreadPoolExecutor(max_workers=2) as executor:
    result = executor.map(lambda x: check_paths(Path(x)), b)
print("Finished in", time.time()-now , "sec")
# 2 threads, another version
from joblib import Parallel, delayed
now = time.time()
res = Parallel(n_jobs=2)(delayed(check_paths)(Path(x)) for x in b)
print("Finished in", time.time()-now , "sec")

I ran the code on a MacBook Pro (2.7 GHz Intel Core i5, 4 cores). The results are:

Finished in 4.470421075820923 sec
Finished in 4.649466037750244 sec
Finished in 2.7911150455474854 sec

The first parallel version (which is the one used in the imagenet_process notebook) is even slightly slower than the sequential version. The second parallel version seems to work.

Can somebody duplicate my result? Is it inappropriate to use the concurrent.futures package or am I using the wrong implementation?

I’ll be very grateful to hear your advice. ^ ^

It looks like the joblib library uses processes by default, not threads.
https://pythonhosted.org/joblib/parallel.html#using-the-threading-backend

Given your task is CPU intensive, extra threads won’t help due to Python’s GIL.

You could test this theory by switching from multithreading to multiprocessing in concurrent.futures.

from concurrent.futures import ProcessPoolExecutor

def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        res = executor.map(func, args)
    return list(res)
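One wrinkle: ProcessPoolExecutor has to pickle the function it maps, so the lambda from your snippet won’t work with it. A module-level wrapper does (a sketch using your check_paths and b):

def check_paths_from_points(x):
    # Top-level functions pickle cleanly, unlike lambdas.
    # This relies on worker processes inheriting the global a via fork;
    # with the 'spawn' start method you'd need to rebuild a in each worker.
    return check_paths(Path(x))

res = multiprocessing(check_paths_from_points, b, 2)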

Or you can pass in the backend="threading" argument to the Parallel object:

Parallel(n_jobs=2, backend="threading")

Curious to see what you find out!

1 Like

You’re right. I tested this theory and the results are as expected:

Sequential: 4.595440864562988 sec
ThreadPoolExecutor: 4.745290994644165 sec
ProcessPoolExecutor: 2.4498229026794434 sec
joblib with multiprocessing backend: 2.507412910461426 sec
joblib with threading backend: 4.65504002571106 sec

I didn’t fully understand the difference between processes and threads in Python, and now I’ve finally figured it out. Thanks a lot for your help! Hope it will help others, too.

@sravya8 @brendan

But there seem to be some caveats here, which I still need to fully understand. From the CPython GIL page: “Note that potentially blocking or long-running operations, such as I/O, image processing, and NumPy number crunching, happen outside the GIL”. I get the I/O part, but for the image processing and NumPy part, it seems like if a process is not executing native Python interpreted code, the GIL will be released? Does this mean any code coming from outside packages executes outside of the GIL?

From my understanding it has to do with whether code is executed natively (compiled code) or interpreted (pure Python).
The numerical algorithms in NumPy are almost always offloaded to C subroutines, which can run outside of the GIL (the C code can be written to be thread-safe). So while a NumPy call blocks the calling thread, it doesn’t prevent other threads from running while time is spent in the C subroutines.

This means that if pure Python code is only used for carrying around pointers, while the real CPU-intensive work is offloaded to compiled code, you can actually get quite a lot of parallelism, even with threads.
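A quick way to see this in action (array sizes are arbitrary; timings will vary by machine, and a multithreaded BLAS can blur the comparison):

import time
import numpy as np
from concurrent.futures import ThreadPoolExecutor

m = np.random.rand(1500, 1500)

def blas_job(_):
    return np.dot(m, m)  # compiled BLAS code; the GIL is released while it runs

def python_job(_):
    return sum(i * i for i in range(10**6))  # pure interpreted code; holds the GIL

for job in (blas_job, python_job):
    start = time.time()
    with ThreadPoolExecutor(max_workers=4) as executor:
        list(executor.map(job, range(4)))
    print(job.__name__, '%.2f s' % (time.time() - start))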

Does anyone have a tutorial or practical example covering design patterns for concurrent programming?

Check this one…multiprocessing vs threading

Hola!

Thank you very much for making such a good starting guide to parallelism in Python. This is really interesting to me as I have to annotate 2.5 million images and using multiple cores would be ideal.

I’ve been trying to get the following code to run in parallel, but I’m very confused about the results. It seems to me that I’m not able to bypass the Python GIL completely, and everything keeps running on the same CPU core, although I do see a slight improvement. This might be because what I’m trying to do is not really CPU intensive. Does that make sense?

My workflow takes the path to a local folder of images as input, creates an image_list, and runs the analyse_image function on each image. The order is not important. I then append all the results and output a CSV. analyse_image runs a process that queries several HTTP endpoints using the query function below.

My system specs are:

  • Intel® Xeon® Silver 4116 CPU @ 2.10GHz, 2095 MHz with 6 cores
  • 64 GB RAM

Here are my tests so far:

- single threaded / total processing time was: 14.73 seconds with 14 analysed images
- joblib threads / total processing time was: 10.69 seconds with 14 analysed images
- joblib processes / total processing time was: 10.34 seconds with 14 analysed images
- map_async / total processing time was: 8.46 seconds with 14 analysed images
- map / total processing time was: 8.43 seconds with 14 analysed images
- dask.delayed / total processing time was: 8.45 seconds with 14 analysed images
- multiprocessing.Process / total processing time was: 8.28 seconds with 14 analysed images
- ThreadPoolExecutor / total processing time was: 10.49 seconds with 14 analysed images
- imap / total processing time was: 8.15 seconds with 14 analysed images
- Pipe / total processing time was: 8.07 seconds with 14 analysed images

Example using Pipe:

import time
import multiprocessing as mp

def f(conn, path):
    conn.send(analyse_image(path))
    conn.close()

def run_report_using_Pipe(image_list):
    stime = time.time()
    parent_conn, child_conn = mp.Pipe()
    jobs = []
    for path in image_list:
        p = mp.Process(target=f, args=(child_conn, path))
        jobs.append(p)
        p.start()
    res = [parent_conn.recv() for a in jobs]
    return print("total processing time was: {:.2f} seconds with {} analysed images".format(time.time() - stime, len(image_list)))

Example using Joblib:

from joblib import Parallel, delayed, parallel_backend

def run_report_using_joblib_processes(image_list):
    stime = time.time()
    with parallel_backend('multiprocessing', n_jobs=2):
        res = Parallel()(delayed(analyse_image)(image) for image in image_list)
    return print("total processing time was: {:.2f} seconds with {} analysed images".format(time.time() - stime, len(image_list)))

Example using map:

def run_report_using_map(image_list):
    stime = time.time()
    procs = mp.cpu_count()
    pool = mp.Pool(procs)
    results = pool.map(analyse_image, image_list)
    pool.close()
    pool.join()
    return print("total processing time was: {:.2f} seconds with {} analysed images".format(time.time()-stime, len(image_list)))

And here is a simplified version of my analyse_image(path_to_image) function:

import base64

def analyse_image(path_to_image):
    im_str = base64.b64encode(open(path_to_image, "rb").read())
    
    model_01 = query(im_str, url_01)['result']
    model_02 = query(im_str, url_02)['result']
    
    if model_01 and model_02:
        model_03 = query(im_str, url_03)['result']
        model_04 = query(im_str, url_04, model_03)['result']
        output_model_04 = []
        
        for each in model_04['sub-img']:
            try:
                each_im_str = each['img'].encode('ascii')
                model_05 = query(each_im_str, url_05)['result']
                output_model_04.append(model_05)
            except Exception as e:
                print(e)
                continue
    else:
        output_model_04 = []
    return output_model_04

And my query function:

import json
import requests

def query(im_str, url, model_03=None):
    payload = [{'im_str': im_str.decode('ascii'), 'model_03': model_03}]
    headers = {'content-type': 'application/json'}
    r = requests.post(url, data=json.dumps(payload), headers=headers)
    
    return r.json()

Looking forward to reading your ideas and comments. Thank you very much!
Nicolas

Hi, I think multiprocessing might work in your case because you work file-based, thus avoiding the expensive serialization between processes.

Multithreading would not help because reading from the local disk is relatively fast, so I guess you are CPU bound.

You can see an example of multiprocessing in the Tokenizer class in the fastai.text module

1 Like

I’m late to the party. The rule of thumb is:

If you are doing an input/output (IO) job, go for threads.
IO jobs are things like moving a file on disk or uploading/downloading a file over a network; these jobs are serviced by dedicated devices on the chip, so the CPU mostly waits.

If you are doing a CPU job, go for processes.
CPU jobs are things like transforming images, loading files into a DataFrame, or running queries against a database.
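In code, that rule of thumb might look like this sketch, reusing the pattern from the helpers earlier in the thread:

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def parallel_map(func, args, workers=4, io_bound=True):
    # Threads for IO-bound jobs (the GIL is released while waiting);
    # processes for CPU-bound jobs (sidesteps the GIL entirely)
    Executor = ThreadPoolExecutor if io_bound else ProcessPoolExecutor
    with Executor(max_workers=workers) as executor:
        return list(executor.map(func, args))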

I have been trying to multi-process model training on a 32-processor CPU machine. Can anybody share experience / best practices for doing the same?

thanks
Hari

It’s a long shot, but is it possible to use fastai2’s parallel (or something similar) with CUDA? My function reads a file, processes it on the GPU, and writes out a new file.

The error is
RuntimeError: Cannot re-initialize CUDA in forked subprocess. To use CUDA with multiprocessing, you must use the 'spawn' start method

The code that triggers it is:
stats = parallel(processFile, paths, n_workers=4)
and the error starts with
return waveform[0].to(device),sample_rate
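From the error text, it sounds like the 'spawn' start method is the key. Maybe something like this sketch with plain multiprocessing (using my processFile and paths; I haven’t verified that it works with CUDA here):

import multiprocessing as mp

if __name__ == '__main__':
    # 'spawn' starts fresh interpreter processes instead of forking,
    # so each worker can initialize CUDA cleanly
    ctx = mp.get_context('spawn')
    with ctx.Pool(processes=4) as pool:
        stats = pool.map(processFile, paths)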

Thanks for letting me know if it’s easily possible. It would be great to speed things up, but if it would require a day of studying and hacking, I can just let things run overnight. :slightly_smiling_face:

1 Like

@Pomo did you find a workaround? I came across the same issue.

Sorry, no answers. :frowning_face: I just use one process and dream while it works.

2 Likes