Issue with making Dataloaders from image format that uses thread pool

Hi there,

I’ve been debugging this for a day or so and my head is deep in the weeds, so apologies if I’ve left anything out; please ask if something is missing. It’s also my first post here :)

First, let me say that I’m using Ubuntu Linux with PyTorch 1.7.0, fastai 2.1.5, fastcore 1.3.2, OpenImageIO 2.1.17 and OpenEXR 2.5.3 (the last two become relevant later).

I’m writing code to train a model that takes two images as input and produces multiple image outputs (say two for the moment); the images are read from EXR files. I’m finding that using any multiprocessing workers to read this data in the DataLoaders that my DataBlock makes locks up the process until I kill it (or until the timeout fires, if I pass a timeout kwarg to the dataloaders() function). With some print statements added, I can see that my code stops while reading the EXR files, and when I strace the child processes and pull a backtrace, both worker processes are waiting on a lock in the OpenEXR library:

#1  do_futex_wait (sem=sem@entry=0x5582417e03d0, abstime=0x0, clockid=0) at sem_waitcommon.c:112
#2  0x00007f414c6be4e8 in __new_sem_wait_slow (sem=0x5582417e03d0, abstime=0x0, clockid=0) at sem_waitcommon.c:184
#3  0x00007f40c11b8884 in IlmThread_2_5::Semaphore::wait() () from ../anaconda3/envs/xx/lib/python3.8/site-packages/../.././libIlmThread.so.25
#4  0x00007f40c2e68b38 in Imf_2_5::ScanLineInputFile::readPixels(int, int) () from ../anaconda3/envs/xx/lib/python3.8/site-packages/../../libIlmImf.so.25
#5  0x00007f40c2e22d57 in Imf_2_5::InputFile::readPixels(int, int) () from ../anaconda3/envs/xx/lib/python3.8/site-packages/../../libIlmImf.so.25
....

My understanding is that OpenEXR uses a thread pool to read image data, and that the parent process is holding onto a thread pool which ends up shared with the child processes: the workers are forked after the parent has already read some EXRs, which happens when DataBlock.dataloaders() is called to make the DataLoaders. Is there any way I can stop DataBlock.dataloaders() from building the initial batch that it seems to be checking? Or is there any way I could start the worker processes earlier and pass them on to the DataLoaders after they’re created? Are there any other strategies I could use to mitigate this problem?
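A minimal, stdlib-only sketch of the mechanism suspected above. It does not use OpenEXR, fastai or PyTorch; the helper names are made up for illustration, and it assumes a platform where the "fork" start method is available (as on Linux). It shows that threads started in the parent are not running in a forked child, even though the child inherits the parent's memory. If a library's thread pool behaves the same way, a child that inherits the pool's bookkeeping but none of its threads could plausibly wait forever on a semaphore that nothing will ever post.

```python
import multiprocessing as mp
import threading


def _idle_worker(stop_event):
    # Stand-in for a library's pool thread: it just waits until told to stop.
    stop_event.wait()


def _count_threads(conn):
    # Runs in the forked child: report how many threads are alive there.
    conn.send(threading.active_count())
    conn.close()


def threads_seen_by_forked_child(n_pool_threads=2):
    """Start n threads in the parent, fork, and return (parent_count, child_count)."""
    stop = threading.Event()
    pool = [
        threading.Thread(target=_idle_worker, args=(stop,), daemon=True)
        for _ in range(n_pool_threads)
    ]
    for t in pool:
        t.start()
    parent_count = threading.active_count()

    ctx = mp.get_context("fork")  # assumption: fork is available on this platform
    parent_conn, child_conn = ctx.Pipe()
    child = ctx.Process(target=_count_threads, args=(child_conn,))
    child.start()
    child_count = parent_conn.recv()
    child.join()

    # Shut the stand-in pool down again in the parent.
    stop.set()
    for t in pool:
        t.join()
    return parent_count, child_count
```

Calling `threads_seen_by_forked_child(2)` returns the thread count seen in the parent and the count seen in the child; only the thread that called fork exists on the child side.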

I’ve written a small, silly example below to demonstrate the issue. Sidenote: the ‘siamese’-style DataBlock is fairly inconsequential to my problem, but I kept it in because I was originally unsure whether it was part of the problem and, perhaps, someone might have advice if I’m not doing that correctly either…


if __name__ == "__main__":
    # import multiprocessing
    # from torch import multiprocessing

    import OpenImageIO as oiio

    from pathlib import Path
    from functools import partial
    import random

    from fastai.vision.all import *
    from fastai.data.all import *

    import fastcore
    import torch

    dev = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

    resolution = 32
    bs = 4

    get_image_files = partial(get_files, extensions=[".exr", ".jpg"])

    source1_path = Path.home() / "images" 
    source2_path = Path.home() / "images"

    def get_tuple_files(paths):

        source_path, target_path = paths
        source_files = get_image_files(source_path)
        target_files = get_image_files(target_path)
        return [
            [random.choice(source_files), random.choice(target_files)]
            for x in range(max(len(source_files), len(target_files)))
        ]

    class MyImageTuple(fastcore.utils.fastuple):
        @classmethod
        def create(cls, fns):
            exr_data = list()
            print("Reading image data")
            for f in fns:
                i = oiio.ImageInput.open(str(f))
                exr_data.append((255 * i.read_image()[:, :, :3]).astype(np.uint8))
                i.close()
            print("Image shape %s" % str(exr_data[0].shape))
            return cls(tuple(PILImage.create(e) for e in exr_data))

        def show(self, ctx=None, **kwargs):
            t1, t2 = self
            if not isinstance(t1, torch.Tensor) or not isinstance(t2, torch.Tensor) or t1.shape != t2.shape:
                return ctx
            line = t1.new_zeros(t1.shape[0], t1.shape[1], 10)
            return show_image(torch.cat([t1, line, t2], dim=2), ctx=ctx, **kwargs)

    def MyImageTupleBlock():
        return TransformBlock(type_tfms=MyImageTuple.create, batch_tfms=IntToFloatTensor)

    datablock = DataBlock(
        get_items=get_tuple_files,
        blocks=[MyImageTupleBlock, MyImageTupleBlock],
        splitter=RandomSplitter(valid_pct=0.1),
        item_tfms=[Resize(resolution, method="Squish")],
    )
    print("Made data block")

    dls = datablock.dataloaders(
        [source1_path, source2_path], device=dev, bs=bs, pin_memory=False, timeout=5
    )
    print("Made data loader")

    class MyConcat(torch.nn.Module):
        def __init__(self, batchsize):
            super().__init__()
            self._batchsize = batchsize

        def forward(self, input):
            x = torch.cat((input[0], input[1]), 1).view(self._batchsize, -1)
            return x

    model = nn.Sequential(
        MyConcat(bs),
        nn.Linear((resolution) ** 2 * 3 * 2, (resolution) ** 2 * 3 * 2),
        nn.ReLU(),
    )

    model.to(dev)

    class LossFunction(torch.nn.Module):
        def forward(self, input, target):
            batchsize = input.size()[0]
            in_reshaped = torch.reshape(input, [batchsize * 2, 3, resolution, resolution])
            (a, b) = in_reshaped[:batchsize, :, :, :], in_reshaped[batchsize : batchsize * 2, :, :, :]

            (a_hat, b_hat) = target

            l = torch.mean((b - b_hat) ** 2 + (a - a_hat) ** 2)
            return l

    loss = LossFunction()

    learn = Learner(dls, model, loss_func=loss)

    lr = 5e-5

    learn.fit_one_cycle(2, lr_max=lr)

I don’t have detailed knowledge but you may check the docs…

Hi, so I’m not attempting to use distributed or parallel training. This is purely about the multiprocessing that the DataLoaders do when num_workers is set to >0. I get a deadlock when the data being loaded by those DataLoaders goes through a format library (EXR) that seemingly uses a thread pool which all the workers share with the parent process.

From what I understand of Python multiprocessing, the parent process has already used this library to open some EXRs (something DataBlock.dataloaders() seems to do to check that the first batch won’t fail outright). So when the worker processes are forked, at the point where the Learner starts using the DataLoader to generate batches, they all get references to the same EXR thread pool that the parent started, and they deadlock.

The solution I’m trying to pursue is to make sure each child process has its own thread pool after it forks. I feel like I could get there if I could either a) prevent fastai’s DataBlock.dataloaders() from performing the initial check, or b) somehow start the worker processes before this check (or do the check in a new worker?) and hand them to the Learner when it’s time to train.

I think I’ve figured out a high-level way around this problem without having to change anything deeper about how fastai tests the first batch when it makes the DataLoaders. If I create a multiprocessing queue, have my DataBlock make its DataLoaders in a forked process, and return them to the main process through the queue, then I no longer get the deadlock when it comes time to get batches during training. The main process never uses any functionality from the image-reading library, so there is no thread pool for the workers to share.

Essentially something like this would replace my datablock.dataloaders() call from the previous code:

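    import multiprocessing

    def get_dataloaders(queue, dblock, *args, **kwargs):
        # Runs in the child process: build the DataLoaders there, so the parent
        # never touches the EXR-reading library, and send them back.
        queue.put(dblock.dataloaders(*args, **kwargs))

    q = multiprocessing.Queue()
    p = multiprocessing.Process(
        target=get_dataloaders,
        args=(q, datablock, [source1_path, source2_path]),
        kwargs={"device": dev, "bs": bs},
    )
    p.start()
    dls = q.get()  # fetch the result before join() so the child can flush the queue
    p.join()
    q.close()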

A handful of lines of code instead of 1, but it works!
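One caveat with the snippet above: if the function running in the child raises, nothing is ever put on the queue and the parent's `q.get()` waits forever. Below is a hedged, stdlib-only sketch of the same pattern with the error reported back to the parent. `build_in_forked_child` and `_child_main` are hypothetical helper names, not part of fastai or PyTorch. The sketch assumes the "fork" start method is available and that the built object can be pickled, and it does not cover a child that crashes outright.

```python
import multiprocessing as mp
import traceback


def _child_main(queue, builder, args, kwargs):
    # Runs in the forked child: put exactly one message on the queue, so the
    # parent's queue.get() is not left waiting when the build raises.
    try:
        queue.put(("ok", builder(*args, **kwargs)))
    except Exception:
        queue.put(("error", traceback.format_exc()))


def build_in_forked_child(builder, *args, **kwargs):
    """Call builder(*args, **kwargs) in a forked child and return its result."""
    ctx = mp.get_context("fork")  # assumption: fork is available on this platform
    queue = ctx.Queue()
    child = ctx.Process(target=_child_main, args=(queue, builder, args, kwargs))
    child.start()
    status, payload = queue.get()  # read before join() so the child can flush the queue
    child.join()
    queue.close()
    if status == "error":
        raise RuntimeError("builder failed in child process:\n" + payload)
    return payload
```

Applied to the example above, the call would presumably look like `dls = build_in_forked_child(datablock.dataloaders, [source1_path, source2_path], device=dev, bs=bs)`; that line is an untested illustration rather than something verified against fastai.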