Splitting Data for Distributed Processing
As described in the distributed computing with Pachyderm docs, Pachyderm allows you to parallelize computations over data as long as that data can be split up into multiple “datums.” However, in many cases, you might have a data set that you want or need to commit into Pachyderm as a single file, rather than a bunch of smaller files (e.g., one per record) that are easily mapped to datums. In these cases, Pachyderm provides an easy way to automatically split your data set for subsequent distributed computing.
Let’s say that we have a data set consisting of information about our users. This data is in CSV format in a single file,
user_data.csv, with one record per line:
$ head user_data.csv
1,email@example.com,22.214.171.124
2,firstname.lastname@example.org,126.96.36.199
3,email@example.com,188.8.131.52
4,firstname.lastname@example.org,184.108.40.206
5,email@example.com,220.127.116.11
6,firstname.lastname@example.org,18.104.22.168
7,email@example.com,22.214.171.124
8,firstname.lastname@example.org,126.96.36.199
9,email@example.com,188.8.131.52
10,firstname.lastname@example.org,184.108.40.206
If we just put this into Pachyderm as a single file, we could not subsequently process each of these user records in parallel as separate “datums” (see this guide for more information on datums and distributed computing). Of course, you could manually split these user records into separate files before committing them into the users repo, or do the splitting in a pipeline stage dedicated to that task. This would work, but Pachyderm makes it much easier for you.
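For comparison, the manual route mentioned above can be sketched with standard tools. This is a minimal sketch, not part of Pachyderm: it assumes GNU coreutils split (for the -d option), and the split_demo directory and the three sample records are hypothetical stand-ins for real data.

```shell
#!/bin/sh
# Manual splitting: write one file per CSV record using GNU split.
# Assumption: GNU coreutils `split`; all paths and records are hypothetical.
set -eu

rm -rf split_demo
mkdir -p split_demo/user_data.csv

# Stand-in for the real user_data.csv (three made-up records).
printf '%s\n' \
  '1,alice@example.com,10.0.0.1' \
  '2,bob@example.com,10.0.0.2' \
  '3,carol@example.com,10.0.0.3' > split_demo/input.csv

# -l 1: one line per output file
# -d:   numeric suffixes starting at 0
# -a 16: zero-padded suffix width
split -l 1 -d -a 16 split_demo/input.csv split_demo/user_data.csv/

# List the per-record files that were produced.
ls split_demo/user_data.csv
```

Each resulting file would then still have to be committed into the repo, which is the extra work that automatic splitting avoids.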
The put file API includes an option for splitting a file into separate datums automatically. You can do this with the pachctl CLI tool via the --split flag on put file. For example, to automatically split the user_data.csv file into a separate datum for each line, you could execute the following:
$ pachctl put file users@master -f user_data.csv --split line --target-file-datums 1
The --split line argument specifies that Pachyderm should split this file on lines, and the --target-file-datums 1 argument specifies that each resulting file should include at most one “datum” (or one line). Note that Pachyderm will still show user_data.csv to you as a single entity in the repo:
$ pachctl list file users@master
NAME          TYPE SIZE
user_data.csv dir  5.346 KiB
But, this entity is now a directory containing all of the split records:
$ pachctl list file users@master:user_data.csv
NAME                           TYPE SIZE
user_data.csv/0000000000000000 file 43 B
user_data.csv/0000000000000001 file 39 B
user_data.csv/0000000000000002 file 37 B
user_data.csv/0000000000000003 file 34 B
user_data.csv/0000000000000004 file 35 B
user_data.csv/0000000000000005 file 41 B
user_data.csv/0000000000000006 file 32 B
etc...
A pipeline that then takes the repo
users as input with a glob pattern of
/user_data.csv/* would process each user record (i.e., each line of the CSV) in parallel.
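A pipeline like the one just described might be specified as follows. This is a hedged sketch: the pipeline name process-users, the ubuntu:18.04 image, the copy command, and the pipeline_demo directory are all hypothetical, and the layout (pipeline, transform, input.pfs) assumes the 1.x pipeline specification, so verify the field names against the reference for your Pachyderm version.

```shell
#!/bin/sh
# Write a pipeline spec whose glob pattern yields one datum per split record.
# Assumptions: 1.x-style spec fields; name, image, and cmd are hypothetical.
set -eu

rm -rf pipeline_demo
mkdir -p pipeline_demo

# Quoted delimiter so the shell does not expand anything inside the JSON.
cat > pipeline_demo/process_users.json <<'EOF'
{
  "pipeline": { "name": "process-users" },
  "transform": {
    "image": "ubuntu:18.04",
    "cmd": ["sh", "-c", "cp /pfs/users/user_data.csv/* /pfs/out/"]
  },
  "input": {
    "pfs": { "repo": "users", "glob": "/user_data.csv/*" }
  }
}
EOF

cat pipeline_demo/process_users.json

# With a running cluster, the spec could then be submitted with:
#   pachctl create pipeline -f pipeline_demo/process_users.json
```

Because the glob matches each file under /user_data.csv/ separately, each run of the command would see a single record under /pfs/users/user_data.csv/.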
This is, of course, just one example. Right now, Pachyderm supports this type of splitting on lines or on JSON blobs. Here are a few more examples:
# Split a JSON file on JSON blobs, putting
# each JSON blob into its own file.
$ pachctl put file users@master -f user_data.json --split json --target-file-datums 1

# Split a JSON file on JSON blobs, putting
# 3 JSON blobs into each split file.
$ pachctl put file users@master -f user_data.json --split json --target-file-datums 3

# Split a file on lines, putting each 100-byte
# chunk into the split files.
$ pachctl put file users@master -f user_data.txt --split line --target-file-bytes 100
PG Dump / SQL Support
You can also ingest data from postgres using split file.
- Generate your PG Dump file
$ pg_dump -t users -f users.pgdump
$ cat users.pgdump
--
-- PostgreSQL database dump
--

-- Dumped from database version 9.5.12
-- Dumped by pg_dump version 9.5.12

SET statement_timeout = 0;
SET lock_timeout = 0;
SET client_encoding = 'UTF8';
SET standard_conforming_strings = on;
SELECT pg_catalog.set_config('search_path', '', false);
SET check_function_bodies = false;
SET client_min_messages = warning;
SET row_security = off;

SET default_tablespace = '';

SET default_with_oids = false;

--
-- Name: users; Type: TABLE; Schema: public; Owner: postgres
--

CREATE TABLE public.users (
    id integer NOT NULL,
    name text NOT NULL,
    saying text NOT NULL
);

ALTER TABLE public.users OWNER TO postgres;

--
-- Data for Name: users; Type: TABLE DATA; Schema: public; Owner: postgres
--

COPY public.users (id, name, saying) FROM stdin;
0 wile E Coyote ...
1 road runner \\.
\.

--
-- PostgreSQL database dump complete
--
- Ingest SQL data using split file
When you use pachctl put file --split sql ..., your pg dump file is split into
three parts: the header, the rows, and the footer. The header contains all the SQL
statements in the pg dump that set up the schema and tables. The rows are split
into individual files (or, if you specify --target-file-bytes, into files that
each hold multiple rows). The footer contains the remaining SQL statements for
setting up the tables.
The header and footer are stored on the directory containing the rows. This way,
if you request a
get file on the directory, you’ll get just the header and
footer. If you request an individual file, you’ll see the header plus the row(s)
plus the footer. If you request all the files with a glob pattern, e.g.
/directoryname/*, you’ll receive the header plus all the rows plus the footer,
recreating the full pg dump. In this way, you can construct full or partial
pg dump files so that you can load full or partial data sets.
$ pachctl put file data@master -f users.pgdump --split sql
$ pachctl put file data@master:users --split sql -f users.pgdump
$ pachctl list file data@master
NAME  TYPE SIZE
users dir  914B
$ pachctl list file data@master:users
NAME                    TYPE SIZE
/users/0000000000000000 file 20B
/users/0000000000000001 file 18B
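The way header, rows, and footer recombine can be simulated locally with ordinary files. This is only an illustration, not Pachyderm's implementation: it assumes the header ends with the COPY ... FROM stdin; line and the footer begins at the \. terminator, and the sql_split_demo directory and the two-column sample dump are hypothetical.

```shell
#!/bin/sh
# Local simulation of the header / rows / footer layout for a pg dump.
# Assumptions: header ends at the COPY line, footer starts at the "\." line.
# All paths and data are hypothetical.
set -eu

d=sql_split_demo
rm -rf "$d"
mkdir -p "$d/users"

# A tiny stand-in for a pg_dump file (COPY rows are tab-separated).
{
  printf '%s\n' 'CREATE TABLE public.users (id integer NOT NULL, name text NOT NULL);'
  printf '%s\n' 'COPY public.users (id, name) FROM stdin;'
  printf '0\twile e coyote\n'
  printf '1\troad runner\n'
  printf '%s\n' '\.'
  printf '%s\n' '-- PostgreSQL database dump complete'
} > "$d/users.pgdump"

awk -v out="$d" '
  part == 0 {                      # header: everything through the COPY line
    print > (out "/header")
    if ($0 ~ /^COPY .* FROM stdin;$/) part = 1
    next
  }
  part == 1 && $0 != "\\." {       # rows: one zero-padded file per row
    file = sprintf("%s/users/%016d", out, n++)
    print > file
    close(file)
    next
  }
  { part = 2; print > (out "/footer") }   # footer: terminator and the rest
' "$d/users.pgdump"

# Partial dump: header + a single row + footer.
cat "$d/header" "$d/users/0000000000000001" "$d/footer" > "$d/partial.pgdump"

# Full dump: header + all rows + footer reproduces the original file.
cat "$d/header" "$d"/users/* "$d/footer" > "$d/full.pgdump"
cmp "$d/users.pgdump" "$d/full.pgdump"
```

The partial file is still a loadable dump with a schema, one row, and the closing statements, which is what makes partial loads possible.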
Then in your pipeline (where you’ve started and forked postgres), you can load the data by doing something like:
$ cat /pfs/data/users/* | sudo -u postgres psql
With a glob pattern of /*, this code would load each raw postgres chunk
into your postgres instance for processing by your pipeline.
For this use case, you’ll probably want to use --target-file-bytes, since
you’ll usually want to run your queries against many rows at a time.