Improving parallel processing for a non-image-processing task

After watching the Lesson 10 video this week, I wanted to apply the parallel processing principles explained by Jeremy to a task I need to do for the State Farm competition from Part 1.

This task is about finding the 10 nearest neighbors of each State Farm test set image. It is based on the winner's solution, described in this discussion: https://www.kaggle.com/c/state-farm-distracted-driver-detection/discussion/22906 (sections K_Nearest_Neighbor Average and Segment average). The nearest neighbors are computed from the output of the last max-pooling layer of VGG, using the Euclidean distance between image features of shape (512, 7, 7).

The State Farm test set has around 79k photos. Serially computing the 10 nearest neighbors for each photo takes about 70 hours with my AWS p2 instance (I can’t build my own box because I am travelling). I wanted to use parallel processing to speed up the computation.

After playing with ThreadPoolExecutor and ProcessPoolExecutor, I found that ProcessPoolExecutor is faster in this case. I experimented with 2, 4 and 16 processes, and 2 seems to be optimal, although the p2.xlarge instance is supposed to have 4 virtual cores according to the AWS documentation (https://aws.amazon.com/ec2/instance-types/p2/). Using 4 processes did not speed up the computation compared to the serial implementation (70 hours for both), while using 2 processes brought it down to 50 hours, which is still a long time. Does anyone have suggestions on how to improve the parallel processing for this task?

I think that a big difference compared to Jeremy’s use case is that we are not doing image processing here, so we can’t leverage SIMD parallelism (nor the vectorization improvements?).

In my implementation, I construct a bcolz array of shape (79k, 10, 2). For each image, I store the 10 nearest neighbors as (neighbor_index, distance) pairs. Here is my implementation:

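import bcolz
import numpy as np
from concurrent.futures import ProcessPoolExecutor
from tqdm import tqdm
# load_array and base_dir are assumed to be defined elsewhere (e.g. the course utils)

def nearest_neighbors(index, nbr_neighbors=10):
    value = test_features[index]
    nearest_tups = []  # kept sorted by ascending distance

    for i in range(n):
        if i == index: continue  # skip the query image itself
        dist = np.linalg.norm(value - test_features[i])  # Euclidean distance
        if len(nearest_tups) < nbr_neighbors or dist < nearest_tups[-1][1]:
            nearest_tups.append(np.array([i, dist]))
            nearest_tups.sort(key=lambda t: t[1])  # restore ascending order
            del nearest_tups[nbr_neighbors:]  # drop the farthest entry if over capacity

    return np.array(nearest_tups)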

def process_chunk(start_idx):
    return [nearest_neighbors(i) for i in range(start_idx, min(n, start_idx + step//threads))]

test_features = load_array(base_dir + 'models/test_convlayer_features.bc')
n=len(test_features)
arr = bcolz.carray(np.empty((0, 10, 2), 'float32'), expectedlen=n, mode='w', rootdir=base_dir + 'models/10_neighbors.bc')

# Serial implementation
# for i in range(n): arr.append(nearest_neighbors(i)) 

# Parallel implementation
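step = 500  # must be a multiple of the number of processes
threads = 2  # number of worker processes
pool = ProcessPoolExecutor(threads)
iters = (n + step - 1) // step  # ceiling division, stays an integer
for i in tqdm(range(iters)):
    # each worker handles one slice of step//threads consecutive images
    results = pool.map(process_chunk, [i*step + j*(step//threads) for j in range(threads)])
    for r in results:
        if len(r): arr.append(r)
    arr.flush()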
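
As a side note on the (n, 10, 2) layout described above, here is a minimal sketch of how the stored (neighbor_index, distance) pairs could be split back into separate arrays. The tiny `neighbors` array is a hypothetical stand-in for the real bcolz array, not actual State Farm data.

```python
import numpy as np

# Hypothetical stand-in for the stored array: 3 images, 2 neighbors each,
# every entry being (neighbor_index, distance) as in the post.
neighbors = np.array([
    [[1, 0.5], [2, 0.9]],
    [[0, 0.5], [2, 0.7]],
    [[1, 0.7], [0, 0.9]],
], dtype='float32')

# Split the last axis back into integer indices and float distances.
neighbor_idx = neighbors[:, :, 0].astype(np.int64)
neighbor_dist = neighbors[:, :, 1]

# float32 represents every integer up to 2**24 exactly, so indices for
# ~79k images survive the round trip through the float array.
assert np.float32(2**24 - 1) == 2**24 - 1
```

Storing indices as floats works here only because the index range is small; a separate integer array would be the safer choice for much larger datasets.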

Your code is compute-bound, so ThreadPoolExecutor is unlikely to help. Python has a global interpreter lock (GIL) that prevents threads from executing Python code in parallel, so this class is mainly useful for I/O-bound tasks.

I would also have picked 4 processes; I’m not sure why 2 seems to work better. To speed up your code, you could switch to PyTorch or TensorFlow and do parts of the computation on the GPU. There may also be opportunities to vectorize the operations in nearest_neighbors: I feel like the for loop could be replaced by some vectorized code.
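
To illustrate the vectorization idea, here is a rough sketch of how the inner loop might be replaced with NumPy operations. The function name `nearest_neighbors_vectorized` and the small random `demo_features` array are made up for this example. It also assumes the features fit in memory twice, since the subtraction creates a temporary array as large as the feature matrix.

```python
import numpy as np

def nearest_neighbors_vectorized(features, index, nbr_neighbors=10):
    """Return an array of (neighbor_index, distance) rows, nearest first."""
    # Flatten each feature map to one vector, e.g. (n, 512, 7, 7) -> (n, 25088).
    flat = features.reshape(len(features), -1)
    # One broadcasted subtraction replaces the Python loop over all images.
    dists = np.linalg.norm(flat - flat[index], axis=1)
    dists[index] = np.inf  # exclude the query image itself
    # argpartition finds the k smallest distances without a full sort.
    k = min(nbr_neighbors, len(flat) - 1)
    nearest = np.argpartition(dists, k - 1)[:k]
    nearest = nearest[np.argsort(dists[nearest])]  # order those k by distance
    return np.column_stack((nearest, dists[nearest]))

# Small random stand-in for test_features (hypothetical shapes).
rng = np.random.RandomState(0)
demo_features = rng.rand(50, 4, 3, 3).astype('float32')
demo_result = nearest_neighbors_vectorized(demo_features, index=7, nbr_neighbors=5)
```

This still runs one query at a time, so it only removes the per-image Python overhead; batching many queries into a single matrix operation would likely help further.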

After watching lesson 10 (and running my box for more than 2 days), I discovered the sklearn.neighbors.NearestNeighbors class. Using it reduced the computation time from around 50 hours to 36 minutes…

Interestingly, using multiprocessing by setting the n_jobs parameter slowed down the computation. I timed iterations that find the 10 nearest neighbors of 80 elements with n_jobs = -1, 1, 2, 3, 4, 10 and 16.

We see that using a single job is much faster than using multiple jobs.

Here is the code:

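import numpy as np
from sklearn.neighbors import NearestNeighbors
from tqdm import tqdm

test_features = load_array(base_dir + 'models/test_convlayer_features.bc')
nsamples, nfilters, nx, ny = test_features.shape
test_features = test_features.reshape((nsamples, nfilters*nx*ny))  # one flat vector per image
n = len(test_features)
nb_neighbors = 10

# Fit once, outside the loop. Ask for one extra neighbor because each query
# normally matches itself first. Note that this uses cosine distance.
nn = NearestNeighbors(n_neighbors=nb_neighbors + 1, metric='cosine', algorithm='brute').fit(test_features)

step = 40000
result = []
for i in tqdm(range(0, n, step)):
    start_idx, end_idx = i, min(n, i+step)
    dists, idxs = nn.kneighbors(test_features[start_idx:end_idx])
    # pair (index, distance) per neighbor and drop the first row (the query itself)
    result += [np.vstack((idxs[k], dists[k])).T[1:] for k in range(end_idx - start_idx)]

save_array(base_dir + 'models/10_neighbors.bc', np.array(result))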
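
One thing worth noting is that the code above ranks neighbors by cosine distance, while the first version used Euclidean distance. The two rankings are only guaranteed to agree when the vectors are scaled to unit length, because for unit vectors the squared Euclidean distance equals twice the cosine distance. The sketch below checks this on random data; the `vectors` array is a made-up stand-in for the flattened features, and un-normalized features may well give different neighbors under the two metrics.

```python
import numpy as np

rng = np.random.RandomState(0)
vectors = rng.rand(20, 8)  # hypothetical stand-in for flattened features

# Scale every row to unit length.
unit = vectors / np.linalg.norm(vectors, axis=1, keepdims=True)

query = unit[0]
cosine_dist = 1.0 - unit.dot(query)                 # cosine distance to the query
euclid_dist = np.linalg.norm(unit - query, axis=1)  # Euclidean distance to the query

# For unit vectors: squared Euclidean distance = 2 * cosine distance.
identity_holds = np.allclose(euclid_dist ** 2, 2.0 * cosine_dist)

# So both metrics order the neighbors identically once rows are normalized.
same_ranking = np.array_equal(np.argsort(cosine_dist), np.argsort(euclid_dist))
```

If Euclidean neighbors are what you want, passing metric='euclidean' to NearestNeighbors is the more direct route.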

Hey, awesome to hear you found a solution! I ran into many of the same problems/questions you did after watching lesson 10. After banging my head against the wall for a while, I figured out a few things I was missing and summarized what I learned here. Maybe it can help others in the future.

Awesome and useful post!

Thanks! Your question inspired me to summarize it in a blog post last night. It’s sort of cheating I know, but I cleaned it up a bit :wink:
