After watching the Lesson 10 video this week, I wanted to apply the parallel processing principles explained by Jeremy to a task I need to do for the State Farm competition from Part 1.
This task is about finding the 10 nearest neighbors of each State Farm test set image, it is based on the solution of the winner described in this conversation : https://www.kaggle.com/c/state-farm-distracted-driver-detection/discussion/22906 (sections K_Nearest_Neighbor Average and Segment average). The computation of nearest neighbors is based on the output of the last maxpooling layer of VGG. We compute nearest neighbors based on the euclidian distance between image features of shape 512,7,7.
The State Farm test set has around 79k photos. Serially computing the 10 nearest neighbors for each photo takes about 70 hours with my AWS p2 instance (I can’t build my own box because I am travelling). I wanted to use parallel processing to speed up the computation.
Playing with ThreadPoolExecutor and ProcessPoolExecutor, it turns out that ProcessPoolExecutor is faster in this case. I experimented with 2, 4 and 16 processes and 2 seems to be optimal, although the p2.xlarge instance is supposed to have 4 virtual cores according to the AWS documentation (https://aws.amazon.com/ec2/instance-types/p2/). Using 4 processes did not speed up the computation compared to the serial implementation (70 hours for both), using 2 processes brought down the computation to 50 hours, which is still a long time. I wanted to ask if others had suggestions on how to improve the parallel processing for this task.
I think that a big difference here compared to Jeremy’s use case is that we are not dealing with image processing here, therefore we can’t leverage the parallelism of simd (nor the vectorization improvements?).
In my implementation, I am constructing a bcolz array of shape 79k,10,2. For each image, I store the 10 nearest neighbors as tuples (neighbor_index, distance). Here is my implementation:
def nearest_neighbors(index, nbr_neighbors=10):
value = test_features[index]
nearest_tups = []
for i in range(n):
dist = np.linalg.norm(value-test_features[i]) #euclidian distance
if (len(nearest_tups) < nbr_neighbors or dist < nearest_tups[-1][1]) and i is not index:
nearest_tups.append(np.array([i, dist]))
if len(nearest_tups) > nbr_neighbors: nearest_tups.pop()
return np.array(nearest_tups)
def process_chunk(start_idx):
return [nearest_neighbors(i) for i in range(start_idx, min(n, start_idx + step//threads))]
test_features = load_array(base_dir + 'models/test_convlayer_features.bc')
n=len(test_features)
arr = bcolz.carray(np.empty((0, 10, 2), 'float32'), expectedlen=n, mode='w', rootdir=base_dir + 'models/10_neighbors.bc')
# Serial implementation
# for i in range(n): arr.append(nearest_neighbors(i))
# Parallel implementation
step=500 # multiple of nbr of threads
threads=2
pool = ProcessPoolExecutor(threads)
iters = ceil(n/step)
for i in tqdm(range(iters)):
results = pool.map(process_chunk, [i*step + j*(step//threads) for j in range(threads)])
for r in results:
if len(r): arr.append(r)
arr.flush()