Most effective ways to merge "big data" on a single machine

One other option could be to dump the CSVs into a database table (SQL, Redshift, or even Google BigQuery); pandas can read from all of them. That way you can sample the data or process it in chunks. Most DL methods use mini-batches, so the big data doesn’t need to live on the machine, it just needs to be accessible.
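
In case it helps, a minimal sketch of the chunked-read pattern with pandas. The database file and table name ("data.db", "transactions") are hypothetical; the same pattern works against Redshift or BigQuery through the appropriate connection or SQLAlchemy engine.

```python
# Read a large table in manageable chunks instead of all at once.
import sqlite3

import pandas as pd

conn = sqlite3.connect("data.db")  # hypothetical local database

chunks = pd.read_sql_query(
    "SELECT * FROM transactions",  # hypothetical table name
    conn,
    chunksize=100_000,             # yields DataFrames of up to 100k rows each
)

for chunk in chunks:
    # do per-chunk feature engineering / mini-batch preparation here
    print(len(chunk))
```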


As an option you can upload the CSVs to S3 and use Amazon Athena and/or AWS Glue to merge and transform the datasets with SQL queries. Under the hood these are essentially managed, serverless Presto/Spark engines.
The benefits: no need to spin up any clusters, and Glue will infer the data schemas for you automatically.
http://docs.aws.amazon.com/athena/latest/ug/glue-best-practices.html
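
Roughly, submitting such a merge from Python could look like the sketch below. The bucket, database and table names are made up, and it assumes Glue has already crawled the CSVs into external tables.

```python
# Hedged sketch: submit a join as an Athena SQL query via boto3.
import boto3

athena = boto3.client("athena", region_name="us-east-1")

resp = athena.start_query_execution(
    QueryString="""
        SELECT t.*, s.city
        FROM train t
        JOIN stores s ON t.store_nbr = s.store_nbr
    """,                                                     # hypothetical tables
    QueryExecutionContext={"Database": "grocery"},           # hypothetical database
    ResultConfiguration={"OutputLocation": "s3://my-bucket/athena-results/"},
)
print(resp["QueryExecutionId"])  # results land in S3 as CSV, readable with pandas
```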


This is the key insight :)

I think this is the right place to post this question, since people are discussing storage formats above.

I have been trying to use feather with the grocery forecasting Kaggle competition, and even after I optimize the data types, saving and reading the dataframe in feather format is very, very slow. It was taking 10-15 minutes for each operation on the joined dataframe.

Is there something I am missing? Everything I’ve read says feather is supposed to be extremely fast, and this isn’t that big a dataset to hold in memory.

That’s not related to feather - feather format is simply saving a copy on disk.

Sorry, I meant the df.to_feather() and pd.read_feather() operations are taking that long. It has been faster to read in the original csv files and go through my entire feature engineering process than to just read in the already-processed feather files saved from before.

Oh that’s odd. Maybe check how big those files are? It could just be a slow hard drive.
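
For what it’s worth, a quick way to check is to time the round trip and look at the size on disk directly. This sketch assumes df is the already-joined dataframe:

```python
# Time the feather round trip and check the size on disk.
import os
import time

import pandas as pd

t0 = time.time()
df.to_feather("joined.feather")
print(f"write: {time.time() - t0:.1f}s")

print(f"size on disk: {os.path.getsize('joined.feather') / 1e9:.2f} GB")

t0 = time.time()
df2 = pd.read_feather("joined.feather")
print(f"read: {time.time() - t0:.1f}s")
```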

Admittedly I still need to dig into feather and bcolz more, but for @mark-hoffmann’s case numpy could address some of these issues.

I make heavy use of numpy record arrays at work and am always surprised they aren’t better known. You get named columns, a binary data format, compression and memory mapping from numpy. numpy’s savez and load for .npz files are far faster than reading .csv for large data. If you ever find the need to accelerate your code, record arrays, being a core numpy type, play very nicely with numba (where dataframes do not) and cython - this is a big reason I use them.

You can pass them to a dataframe constructor and get them back with df.to_records().
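
A minimal sketch of that round trip (toy columns, not real competition data):

```python
import numpy as np
import pandas as pd

df = pd.DataFrame({
    "store": np.array([1, 2, 3], dtype=np.int8),
    "sales": np.array([10.5, 3.2, 7.7], dtype=np.float32),
})

rec = df.to_records(index=False)   # numpy record array with named columns
print(rec.dtype)                   # e.g. [('store', 'i1'), ('sales', '<f4')]

np.savez("data.npz", rec=rec)      # binary save; savez_compressed shrinks it further
rec2 = np.load("data.npz")["rec"]

df2 = pd.DataFrame(rec2)           # back to a dataframe
print(rec2["sales"].mean())        # columns accessed by name; numba/cython friendly
```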

There are caveats since they are basically arrays of structs: they aren’t as nice to work with if you have a lot of text data (I use fixed-width byte strings - you can convert back using byte_string.decode(…), but pandas handles them as objects if I recall correctly). They also aren’t great for resizing, so I really just use them as an efficient data format.

Sorry - this doesn’t speak to the merge issue - curious what others find. Feather and bcolz are both columnar formats, which should be much better in that case.

I believe feather should be quite a bit faster than numpy record arrays.

Feather looks good - being able to target it in many languages is valuable - I’ll put it back in the test loop as we move away from numba.

As far as read, write and data size performance goes, I did some tests on a 1.7 GB data set. Timings and sizes are almost identical for numpy record arrays and feather. Standard ndarrays were 2x slower and wound up 20% larger on disk though. Of course this was just a small test (caveat, caveat…)


Thanks for testing!

Why are you moving away from numba? What project is that for? (I’m a fan of numba, so curious to hear about folks finding it not working for them)

I’ll start by saying I really do like numba - it is awesome when it’s the right tool for the job. Numba is great for speeding up tight loops. I’ve written numba versions of iterative stats routines that give two to three orders of magnitude speedups over the rolling stats in pandas in some cases (note that the speedups are due to the algorithm - the same implementations run at around the same speed - it’s just so easy to do this with numba).
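
Not the actual routines, but the flavour is roughly this: a single O(n) pass with a running sum instead of recomputing each window, jitted with numba.

```python
import numpy as np
from numba import njit

@njit
def rolling_mean(x, window):
    out = np.full(x.shape[0], np.nan)
    total = 0.0
    for i in range(x.shape[0]):
        total += x[i]
        if i >= window:
            total -= x[i - window]   # drop the value leaving the window
        if i >= window - 1:
            out[i] = total / window
    return out

x = np.random.randn(10_000_000)
print(rolling_mean(x, 30)[:35])      # first 29 entries are NaN, then window means
```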

This stuff is all for trading simulators / back testers. The core of one of our back testers is optimized with numba. In its current state, numba is restrictive - mainly in the limited ability to use objects, strings and python libraries outside of numpy (hence record arrays being so useful to me).

You can use numba jit classes (but you cannot inherit from them) and they can have members that are other jit classes (this post shows how - an undocumented feature AFAIK - https://stackoverflow.com/questions/38682260/how-to-nest-numba-jitclass). You can use lists and arrays, but there are some caveats, and if you need other data structures or would benefit from more OOP it gets really tough. Also be careful with exception handling: the code will compile fine, but it tends to kick you out into object mode (slow mode). I recommend avoiding strings in a jit class (lost a weekend to that) - they are supported in stand-alone functions though.
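
For anyone curious, a minimal jitclass sketch (a hypothetical class, nothing from my actual code base) showing the spec-based typing. The import path below is for recent numba versions; older releases expose jitclass directly from the numba namespace.

```python
import numpy as np
from numba import float64, int64
from numba.experimental import jitclass  # older numba: from numba import jitclass

spec = [
    ("window", int64),
    ("buf", float64[:]),
    ("count", int64),
]

@jitclass(spec)
class RunningSum:
    def __init__(self, window):
        self.window = window
        self.buf = np.zeros(window)
        self.count = 0

    def update(self, value):
        # overwrite the oldest slot and return the sum over the buffer
        self.buf[self.count % self.window] = value
        self.count += 1
        return self.buf.sum()

rs = RunningSum(3)
for v in (1.0, 2.0, 3.0, 4.0):
    print(rs.update(v))
```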

My project would now be better served by cython - it is much better suited to optimizing a wider part of the code base and plays well with lots of python libraries. It is a heavier burden up front, though.

OK that makes sense. Cool to hear some positive stories there. I found I couldn’t get a random forest implementation to run fast with numba - possibly similar reasons. Need cython there too.

To come back to the main topic (big data): I tried many different things to load, transform and preprocess the data from the groceries competition (which has a csv file of ~5 GB). I have a machine with 32 GB of RAM, and even when loading the data with the right types (int8 instead of the default int64) I quickly ran out of RAM, because as soon as you merge and transform the data you create new columns and eventually duplicate dataframes to work on.
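
For reference, the dtype trick looks roughly like this; the column names are from the grocery competition but treat them (and the exact narrow types) as illustrative:

```python
import pandas as pd

dtypes = {
    "store_nbr": "int8",      # small cardinality, fits easily in int8
    "item_nbr": "int32",
    "unit_sales": "float32",
}

train = pd.read_csv("train.csv", dtype=dtypes, parse_dates=["date"])
print(train.memory_usage(deep=True).sum() / 1e9, "GB in memory")
```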

Of course you could just work on a subset of the data, but let’s say you are using RNNs and loading your data in batches: you would find yourself doing the preprocessing (merges, transformations, etc.) each time you extract a batch from the whole data set. This is really inefficient because:

  • You are limited to a few operations (you can forget mean/median operations, for instance)
  • You face new challenges (what if the batch you extracted does not have the right indexes to be merged with the other tables?)
  • You are doing the preprocessing for each batch, which means you lose the optimizations made for preprocessing large amounts of data, and you also preprocess again every time you rerun your neural network.

So basically: doing the preprocessing after extracting batches -> trash.
I need to perform all the operations on all the data.
So I ran into Dask and tried to use it as a replacement for pandas. Big mistake… As they say on their website, Dask is not a replacement for pandas, as it does not have all the capabilities pandas has. While it was very useful for merging the tables, doing the preprocessing with it was a pain in the ass, as it lacks some useful functions pandas has, so you find yourself switching between the two very often and sometimes, well… your memory explodes.
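
The merge step itself was the part Dask handled well. Roughly the kind of thing I mean (not my exact code, file and column names illustrative):

```python
import dask.dataframe as dd

train = dd.read_csv("train.csv", dtype={"store_nbr": "int8", "unit_sales": "float32"})
stores = dd.read_csv("stores.csv")

merged = train.merge(stores, on="store_nbr", how="left")  # lazy, nothing computed yet

# either pull a pandas dataframe into RAM (if it fits)...
# df = merged.compute()
# ...or stream the result straight back to disk:
merged.to_parquet("merged_parquet/")
```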

While @jeremy’s method is valid (get an AWS instance with a lot of RAM, then get the result back onto your machine), I see it as more of a hack than a sustainable solution. The issue with such a hack is that every time you find you transformed your data incorrectly, you’ll spawn a new AWS instance again and pay a little fee (in terms of time and $$) each time you do that.

I think next time I face such issues I’ll just use Spark, as @yinterian advised. I also found this tool (which runs on Spark) to have a very clear and neat API.


Probably the easiest way (as mentioned earlier in the thread) is to just use SQL.


That’s also a good solution, yes :)

I’d also echo the SQL or Spark based approaches others mentioned - they are great general solutions.

Apologies that I didn’t tie the file formats discussion back to the main topic. One reason we cared about it at my shop is that in some pipelines we generate data really quickly and have hard daily deadlines to finish processing by (you need to finish your data work on yesterday’s market data before the market opens today). There were a couple of merge, sort and search steps in the pipeline, and all the canned solutions (SQL and the like) struggled - I/O can become a bottleneck with some of these. In the end, the team wrote its own merging (and related) routines over carefully structured binary data formats - it gave big gains (at least an order of magnitude in speed and data size).

I definitely don’t recommend doing this most of the time - but I thought it might be worth mentioning as an odd edge case.

Don’t worry about that, that was really great info! :)

Interesting, what kind of custom “tool” did they make?

Couldn’t spawning a large Spark cluster have sped the whole thing up?

Their solution (as much as I can really talk about it) had some analogs to batching, and became relatively specific to the workflow (i.e. not easily generalizable), leveraging knowledge of the entire flow to best ensure relative data locality and to optimize for bandwidth at the various levels (network, disk, even cache).

I believe the cost of getting equivalent performance with Spark in the cloud was the deal breaker there. High-performance general solutions are amazing for 99+% of use cases. In specific cases you do have to drop all the generality to get the best solution (in some cases that solution is massively valuable IP - high-frequency trading is a canonical example).
