Most effective ways to merge "big data" on a single machine

Hello,
I’m currently participating in the grocery forecasting Kaggle competition and one of the issues I stumbled upon is “how to effectively merge very big datasets?”.

This question applies not only to this competition but to all kinds of problems where you face “big data” challenges.

Some context:

With 7 csv files (train.csv, test.csv, transactions.csv, stores.csv, oil.csv, items.csv, holidays_events.csv) I want to obtain 2 resulting files (train.csv and test.csv) which contain all the metadata from the other tables. Here is an overview of the size of each table:

# File sizes
train.csv            4997.45MB
test.csv             126.16MB
transactions.csv     1.55MB
stores.csv           0.0MB
oil.csv              0.02MB
items.csv            0.1MB
holidays_events.csv  0.02MB

As you can see, it will be very difficult to keep the whole train.csv file in memory while doing merge operations on it.

1st attempt

So I started my journey by implementing a “home made” solution (big mistake): I loaded every file except train.csv into in-memory pandas DataFrames and merged them together with pandas.merge().
As for train.csv, I opened it with a chunksize and fired up as many processes as I have CPU cores; each train.csv chunk was merged with the other tables, then saved as an individual csv file. Once all the chunks were merged, I concatenated the .csv files into a single merged_train.csv file.
Although this process worked, it was not optimal as it was creating a lot of .csv files:

  • Which were eating all my disk space
  • Whose format is inefficient for I/O operations
  • Whose final concatenation (to create merged_train.csv) was very slow and single threaded
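
For reference, the chunked approach described above can be sketched without the intermediate files by appending every merged chunk to one output csv. This is only a minimal single-process sketch, not the multi-process version from the post: merge_in_chunks is a hypothetical helper, and the tiny tables and column names (store_nbr, city, ...) are stand-ins for the real competition files.

```python
import io
import os
import tempfile

import pandas as pd


def merge_in_chunks(train_source, lookup, key, out_path, chunksize=100_000):
    """Stream the big table in chunks, merge each chunk with a small
    in-memory lookup table, and append the result to a single csv file."""
    rows_written = 0
    reader = pd.read_csv(train_source, chunksize=chunksize)
    for i, chunk in enumerate(reader):
        merged = chunk.merge(lookup, on=key, how="left")
        # Write the header only for the first chunk, then append.
        merged.to_csv(out_path, mode="w" if i == 0 else "a",
                      header=(i == 0), index=False)
        rows_written += len(merged)
    return rows_written


# Tiny stand-in data (assumed columns, not the real files).
train_csv = io.StringIO(
    "store_nbr,item_nbr,unit_sales\n1,10,2.0\n2,11,1.0\n1,11,5.0\n"
)
stores = pd.DataFrame({"store_nbr": [1, 2], "city": ["Quito", "Guayaquil"]})

out_path = os.path.join(tempfile.mkdtemp(), "merged_train.csv")
n_rows = merge_in_chunks(train_csv, stores, "store_nbr", out_path, chunksize=2)
result = pd.read_csv(out_path)
```

Only one chunk plus the small tables is in memory at any time, and there is no final concatenation step; the output is still csv though, so the I/O inefficiency remains.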

2nd attempt

So at that point I told myself: “Why reinvent the wheel? Other people must have run into the same kind of issues.” So I looked around and discovered many tools. Here are my findings:

Data containers

  • HDF5: Looks very neat, it’s basically a filesystem within a file so you can do all kinds of operations without loading all your datasets into memory.
  • Bcolz: Seems to be faster than HDF5 and the gold standard for fast I/O operations. You can think of it as an “on-disk” numpy.
  • Feather: Is fairly new and very few tools support it yet.
  • SQLite: A portable relational database with a very powerful query language (SQL), but it does not generalize to all kinds of data.

High level data libraries

  • Blaze: A frontend for all kinds of data sources (SQL db, NoSQL db, csv, numpy arrays etc…) which gives you a “unified” API to handle any kind of data.
  • PyTables: Built on top of HDF5 to make the code more “python-ish” and object oriented.
  • Dask: A “pandas-like” tool. It lets you do a lot of the operations you would do with pandas (not all of them tho), but in contrast it loads the dataframe in memory only when it needs to (for example if you want to visualize your data or export it). It’s very handy as with this solution I don’t have to load my train.csv in chunks.

And I’m sure there are many more but I tried to stick to these as they seem to be the most used ones.
So now back to my issue: “Effectively merging big tables”.
Today what I’m doing is opening my datasets with Dask, doing the merge operations, and then I’ll try to save my merged_train to the bcolz format so I can load it back very fast later when I have to fit my models.
Thing is, Dask does not have a to_blosc/to_bcolz function to easily do that, only to_hdf5.

I saw @jeremy using this format a lot but I’m not sure how he goes from csv (or multiple csv) to this format. I mean, you can surely open the csv files as text files and fill in the bcolz ctable, but isn’t there a more effective way? Btw I believe you lose your column names by doing this, so it’s harder to keep track of your data transformations.

Do you have any tips or recommendations regarding this issue? Maybe I’m all wrong and I should instead use HDF5 and merge my tables with something like h5py? I would also like to find a pattern I can reuse whenever I run into the same issue with any type of “big data” dataset.
Thanks :slight_smile:


I use pandas and merge. The entire groceries dataset with all tables takes <4GB RAM. Have a look at the kernels for that competition to see how to manage memory for it effectively. I then save them to feather files. Pandas is very efficient indeed at doing merges. If your dataset doesn’t fit in RAM, the best approach is to fire up a big AWS instance, do the merge, save to feather, and then copy that back, I think :slight_smile:


Thanks @jeremy, it’s indeed much easier to do it that way aha. But let’s consider my train.csv is now 120gb instead of 5. Then using pandas is no longer possible. I would like to find a solution which works for every case, not just this one in particular.
But I wanted to ask: I saw you were using both bcolz and feather, in which situations do you prefer one over the other?
Thanks :slight_smile:

I use feather for data frames, and bcolz for arrays.

120GB still fits in a big AWS instance. You need a pretty big dataset to have to go distributed. If you need to go there, then Dask is a good option - but I’ve not ever seen that situation required in practice for building models (I’ve always found just sampling and/or aggregating works fine).


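Spark may be the other way to do it. You get a Spark cluster in AWS to do the data manipulation. I used to aggregate when I worked with way too much data: identical rows get collapsed into one row plus a count. Most machine learning methods accept such weights, so the aggregated table looks like this: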

feature1, feature2,feature3,y,weight
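
As a minimal sketch of that aggregation idea in pandas (the data is made up, and using the row count as the weight is an assumption about what was meant): identical rows are collapsed into one row carrying a weight column, and weighted statistics on the small table match the unweighted ones on the full data.

```python
import numpy as np
import pandas as pd

# Made-up raw data with many repeated rows.
raw = pd.DataFrame({
    "feature1": [0, 0, 1, 1, 1],
    "feature2": [1, 1, 0, 0, 0],
    "feature3": [2, 2, 2, 2, 3],
    "y":        [1, 1, 0, 0, 1],
})

# Collapse identical rows; the group size becomes the row weight.
aggregated = (
    raw.groupby(["feature1", "feature2", "feature3", "y"])
       .size()
       .reset_index(name="weight")
)

# A weighted statistic on the aggregated table equals the plain one on raw.
weighted_mean_y = np.average(aggregated["y"], weights=aggregated["weight"])
```

The weight column can then be handed to a model that supports per-row weights, for example through the sample_weight argument that many scikit-learn estimators accept in fit().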


One other option could be to dump the csv files into tables in a SQL database, Redshift or even Google BigQuery; pandas can read from all of them, so you can sample the data or process it in chunks. Most DL methods use mini-batches, so the big data is not required to be on the machine, it just needs to be accessible.
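
A minimal sketch of this pattern, using an in-memory SQLite database as a stand-in for whichever database you actually use (table and column names are made up): the join runs inside the database and pandas only ever sees one chunk of the result at a time.

```python
import sqlite3

import pandas as pd

# In-memory SQLite stands in for a real database server here.
conn = sqlite3.connect(":memory:")
pd.DataFrame({
    "store_nbr": [1, 2, 1, 2],
    "item_nbr": [10, 10, 11, 11],
    "unit_sales": [2.0, 1.0, 5.0, 3.0],
}).to_sql("train", conn, index=False)
pd.DataFrame({
    "store_nbr": [1, 2],
    "city": ["Quito", "Guayaquil"],
}).to_sql("stores", conn, index=False)

query = """
SELECT t.store_nbr, t.item_nbr, t.unit_sales, s.city
FROM train AS t
LEFT JOIN stores AS s ON s.store_nbr = t.store_nbr
ORDER BY t.item_nbr, t.store_nbr
"""

chunk_sizes = []
total_sales = 0.0
# chunksize turns the result into an iterator of DataFrames.
for chunk in pd.read_sql_query(query, conn, chunksize=3):
    chunk_sizes.append(len(chunk))
    total_sales += chunk["unit_sales"].sum()
conn.close()
```

Each chunk could just as well be fed to a model as a mini-batch instead of being summed.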


As an option you can upload the csv files to S3 and use Amazon Athena and/or AWS Glue to merge and transform datasets with SQL queries. They are essentially managed, serverless Presto/Spark engines under the hood.
The benefits: no need to spin up any clusters, and Glue will infer the data schemas automatically for you.
http://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html


This is the key insight :slight_smile:

I think this is the right place to post this question, since people are discussing storage formats above.

I have been trying to use feather with the grocery forecasting Kaggle competition, and even after optimizing data types, saving the dataframe to feather format and reading it back is very, very slow. It was taking 10-15 minutes for each operation on the joined dataframe.

Is there something I am missing? Everything I read says feather is supposed to be extremely fast, and this is not that big of a dataset to hold in memory.

That’s not related to feather - feather format is simply saving a copy on disk.

Sorry, I meant the df.to_feather() and pd.read_feather() operations are taking that long. It has been faster to read in the original csv files and go through my entire feature engineering process than to just read in the already processed feather files that were saved before.

Oh that’s odd. Maybe check how big those files are? It could just be a slow hard drive.

Admittedly I still need to dig into Feather and bcolz more, but for @mark-hoffmann’s case numpy could address some of these issues.

I make heavy use of numpy record arrays at work and am always surprised they aren’t better known. You get named columns, a binary data format, compression and memory mapping from numpy. Saving with numpy’s savez and loading the resulting .npz is far faster than reading .csv for large data. If you ever find the need to accelerate your code, record arrays, being a core numpy type, play very nicely with numba (where dataframes do not) and cython - this is a big reason I use them.

You can pass them to a dataframe constructor and get them back by df.to_records().

There are caveats as they are basically arrays of structs - they aren’t as nice to work with if you have a lot of text data (I use fixed width byte strings - you can convert back using byte_string.decode(…), but pandas handles as objects if I recall correctly). They also aren’t great for resizing -> I really just use them as an efficient data format.
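
A minimal sketch of the record array workflow described above, with made-up column names and values. It uses np.save and a .npy file rather than savez/.npz, because that is the case where passing mmap_mode to np.load gives a memory-mapped array.

```python
import os
import tempfile

import numpy as np
import pandas as pd

# Structured dtype: named columns, small integer types, fixed-width byte string.
dtype = np.dtype([
    ("store_nbr", "i1"),
    ("item_nbr", "i4"),
    ("unit_sales", "f4"),
    ("family", "S8"),
])
records = np.array(
    [(1, 103665, 7.0, b"BREAD"), (2, 105574, 1.0, b"GROCERY")],
    dtype=dtype,
)

# Binary round trip; mmap_mode="r" maps the file instead of loading it all.
path = os.path.join(tempfile.mkdtemp(), "train.npy")
np.save(path, records)
mapped = np.load(path, mmap_mode="r")
first_family = mapped["family"][0].decode("ascii")

# To a dataframe and back.
df = pd.DataFrame.from_records(records)
back = df.to_records(index=False)
```

Columns are accessed by name (mapped["unit_sales"]), and the byte strings need the decode step mentioned above to become regular Python strings.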

Sorry - this doesn’t talk to the merge issue - curious what others find. Feather and bcolz are both columnar formats which should be much better in that case.

I believe feather should be quite a bit faster than numpy record arrays AFAIK.

Feather looks good - being able to target it in many languages is valuable - I’ll put it back in the test loop as we move away from numba.

As far as read, write and data size performance - I did some tests on a 1.7 GB data set. Timings and sizes are almost identical for numpy record arrays and feather. Standard ndarrays were 2x slower and wound up 20% larger on disk though. Of course this was just a small test (caveat,caveat…)


Thanks for testing!

Why are you moving away from numba? What project is that for? (I’m a fan of numba, so curious to hear about folks finding it not working for them)

I’ll start by saying I really do like numba - it is awesome when it’s the right tool for the job, namely speeding up tight loops. I’ve written numba versions of iterative stats routines that provide 2 to 3 orders of magnitude speedups over the rolling stats in pandas in some cases (note that the speedups are due to the algorithm - the same implementation runs at around the same speed - it’s just so easy to do this with numba).
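
To give an idea of what such an iterative routine looks like, here is a minimal sketch of a rolling mean written as a tight loop that updates a running sum. It is not the code from the back tester, just an illustration; rolling_mean is a hypothetical name, and the fallback decorator is there only so the snippet also runs where numba is not installed.

```python
import numpy as np
import pandas as pd

try:
    from numba import njit  # compiles the loop when numba is available
except ImportError:
    def njit(func):  # fallback: run the same loop as plain Python
        return func


@njit
def rolling_mean(values, window):
    """Rolling mean with an O(1) update per element (running sum)."""
    out = np.full(values.shape[0], np.nan)
    running = 0.0
    for i in range(values.shape[0]):
        running += values[i]
        if i >= window:
            running -= values[i - window]  # drop the value leaving the window
        if i >= window - 1:
            out[i] = running / window
    return out


prices = np.array([1.0, 2.0, 4.0, 7.0, 11.0])
fast = rolling_mean(prices, 3)
reference = pd.Series(prices).rolling(3).mean().to_numpy()
```

The same pattern (carry state from one step to the next instead of recomputing each window) extends to other rolling statistics, which is where a custom algorithm can pay off.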

This stuff is all for trading simulators / back testers. The core of one of our back testers is optimized with numba. In its current state, numba is restrictive - mainly in the limited ability to use objects, strings and python libraries outside of numpy (hence record arrays being so useful to me).

You can use numba jit classes (but cannot inherit from them) and they can have members that are other jit classes (this post shows how - undocumented feature afaik - https://stackoverflow.com/questions/38682260/how-to-nest-numba-jitclass). You can use lists and arrays - but there are some caveats to this, and if you need other data structures or would benefit from more OOP it gets really tough. And be careful with exception handling - your code will compile fine, but it tends to kick you out to object mode (slow mode). I recommend avoiding strings in a jit class (lost a weekend to that) - they are supported in standalone functions though.

My project would now be better served by cython - it is much better suited for optimizing a wider part of the code base and plays well with lots of python libraries. This is a heavier burden up front.

OK that makes sense. Cool to hear some positive stories there. I found I couldn’t get a random forest implementation to run fast with numba - possibly similar reasons. Need cython there too.

To come back to the main topic (big data): I tried many different things to load, transform and preprocess the data from the groceries competition (which has a csv file of ~5gb). I have a machine with 32gb of ram, and even when loading the data with the right types (using int8 instead of the default int64) I quickly ran out of ram, because as soon as you merge and transform the data you create new columns and end up duplicating dataframes to work on them.
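
For readers who haven’t tried the dtype trick mentioned above, here is a minimal sketch of the effect on synthetic data (the column names are stand-ins, not necessarily the real ones):

```python
import numpy as np
import pandas as pd

n = 100_000
df = pd.DataFrame({
    "store_nbr": (np.arange(n) % 54 + 1).astype("int64"),
    "onpromotion": (np.arange(n) % 2).astype("int64"),
    "unit_sales": (np.arange(n) % 100).astype("float64"),
})
before = df.memory_usage(deep=True).sum()

# Downcast each column to the smallest type that still holds its values.
small = df.astype({
    "store_nbr": "int8",
    "onpromotion": "bool",
    "unit_sales": "float32",
})
after = small.memory_usage(deep=True).sum()
```

The same mapping can be passed as the dtype argument of pd.read_csv so the wide types are never allocated in the first place. It shrinks the base tables, but as described above it does not prevent the copies made during merges and transformations.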

Of course you could just work on a subset of the data, but let’s say you are using RNNs and loading your data in batches: you would find yourself doing the preprocessing (merge, transformation etc…) each time you extract a batch from the whole data. This is really inefficient as:

  • You are limited to a few operations (you can forget mean/median operations for instance)
  • You face new challenges (what if the batch you extracted does not have the right indexes to be merged with the other tables?)
  • You are preprocessing each batch separately, which means you lose the optimizations made for preprocessing large amounts of data, and you are also redoing the preprocessing each time you rerun your neural network.

So basically: doing the preprocessing after extracting batches -> Trash
I need to perform all the operations on all the data.
So I ran into Dask and tried to use it as a replacement for pandas. Big mistake… As they say on their website, Dask is not a replacement for pandas as it does not have all the capabilities pandas has. While it was very useful for merging the tables, doing preprocessing with it was a pain in the ass as it lacks some useful functions pandas has, so you find yourself switching between the 2 very often and sometimes well… your memory explodes.

While @jeremy’s method is valid (get an AWS instance with a lot of ram, then copy the result back to your machine), I find it more of a hack than a sustainable solution. The issue with such a hack is that every time you find you transformed your data incorrectly, you have to spawn a new AWS instance again and pay a little fee (in terms of time and $$).

I think next time I face such issues I’ll just use Spark as @yinterian advised. I also found this tool (which runs on Spark) to have a very clear and neat API.
