I have input data stored as a single large file on S3. I want Dask to chop the file up automatically, distribute the chunks to workers, and manage the data flow. Hence the idea of using a distributed collection, e.g. a bag.

On each worker I have a command-line tool (Java) that reads the data from file(s). Therefore I'd like to write a whole chunk of data to a file, call the external CLI/code to process the data, and then read the results from an output file. This looks like processing batches of data instead of record-at-a-time.

What would be the best approach to this problem? Is it possible to write a partition to disk on a worker and process it as a whole?
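Roughly, this is what I have in mind per partition (just a sketch; mytool.jar and its arguments are placeholders for my actual Java tool, and I'm not sure read_text chunks the input the way I want):

import subprocess
import tempfile
import dask.bag as db

def process_partition(records):
    # Write the whole partition to a temporary input file
    # (cleanup of the temp files is omitted for brevity)
    with tempfile.NamedTemporaryFile('w', suffix='.txt', delete=False) as f_in:
        for line in records:
            f_in.write(line if line.endswith('\n') else line + '\n')
        in_path = f_in.name
    out_path = in_path + '.out'

    # Call the external Java tool on the file (placeholder command)
    subprocess.run(['java', '-jar', 'mytool.jar', in_path, out_path], check=True)

    # Read the results back and return them as the new partition
    with open(out_path) as f_out:
        return [line.rstrip('\n') for line in f_out]

bag = db.read_text('s3://bucket/big-input.txt')   # does this chunk the way I want?
result = bag.map_partitions(process_partition)    # lazy; call .compute() or write out later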

PS. It is not necessary, but desirable, to stay within the distributed-collection model, because other operations on the data might be simpler Python functions that process the data record by record.

1 Answer

You probably want the read_bytes function. This breaks the file into many chunks, cleanly split by a delimiter (like a newline). It gives you back a list of dask.delayed objects that point to those blocks of bytes.

There is more information on this documentation page: http://dask.pydata.org/en/latest/bytes.html

Here is an example from the docstring:

>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')  
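Each block is just a delayed chunk of bytes that starts after a delimiter and ends on one, so you can hand it to your external tool however you like. Here is a rough sketch (the java -jar my-tool.jar invocation is just a placeholder for your actual command):

import subprocess
import tempfile
import dask
from dask.bytes import read_bytes

def run_tool(block):
    # block is a bytes object covering whole records (split on b'\n')
    with tempfile.NamedTemporaryFile(delete=False) as f_in:
        f_in.write(block)
        in_path = f_in.name
    out_path = in_path + '.out'
    subprocess.run(['java', '-jar', 'my-tool.jar', in_path, out_path], check=True)
    with open(out_path, 'rb') as f_out:
        return f_out.read()

sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')
# blocks is a list of lists of delayed values (one inner list per file)
tasks = [dask.delayed(run_tool)(b) for per_file in blocks for b in per_file]
results = dask.compute(*tasks)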

4 Comments

read_bytes() was on my list of suspects, but I have a question about record delimiters. I would obviously like to specify an approximate chunk size to be read by a worker (e.g. 20 MB) and at the same time a delimiter, because record lengths vary. How will the framework find the exact delimiter positions? Will the scheduler read the whole file (undesirable)? If the file is just chopped into regular pieces, some records will be cut in half. In that case, does a worker need to know to read from a different ("earlier") index?
The read_bytes function seeks to a location based on the chunk size and then reads forward until it finds a delimiter. Assuming that your delimiters occur reasonably frequently, this will be fairly efficient, will respect your approximate chunk size, and will always end at a delimiter and start after one.
I did some experiments with read_bytes and it works OK; however, looking at the API, I suspect I might get a similar/same effect by calling map_partitions() on a dataframe, running custom processing on the data extracted from each partition, and returning a modified dataframe. This way I might be able to stay within the limits of the dask dataframe API. Is that correct, or am I missing something? (Just to clarify, I assumed here that my input data is a CSV file.)
Yes, if your file is a large CSV file then dd.read_csv is a great way to go.
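For the dataframe route discussed in the last two comments, a minimal sketch might look like the following (the external command, the 20 MB blocksize, and the schema handling are assumptions for illustration, not anything confirmed above):

import subprocess
import tempfile
import pandas as pd
import dask.dataframe as dd

def process_partition(pdf):
    # pdf is a regular pandas DataFrame holding one partition
    with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False) as f_in:
        pdf.to_csv(f_in, index=False)
        in_path = f_in.name
    out_path = in_path + '.out.csv'
    subprocess.run(['java', '-jar', 'my-tool.jar', in_path, out_path], check=True)
    return pd.read_csv(out_path)

df = dd.read_csv('s3://bucket/2015-*-*.csv', blocksize=20 * 1000 * 1000)  # ~20 MB partitions
# If dask cannot infer the output schema from an empty partition,
# pass meta=... to map_partitions explicitly.
result = df.map_partitions(process_partition)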
