Before you read this section, make sure you understand the concepts described in Distributed Processing.
In some of your projects, you might need to match datums from multiple data repositories in order to process, join, or aggregate them. For example, you might need to process multiple records that correspond to a certain user, experiment, or device together. In these scenarios, you can use the following approach:
- Create a pipeline that groups all of the records for a specific key and index.
- Create another pipeline that takes that grouped output and performs the merging, joining, or other processing for the group.
You can use these two data-combining pipelines for merging or grouped processing of data from various experiments, devices, and so on. You can also apply the same pattern to perform distributed joins of tabular data or data from database tables. For example, you can join user email records together with user IP records on the key/index of a user ID.
You can parallelize each of the stages across workers to scale with the size of your data and the number of data sources that you want to merge.
Tip: If your data is not split into separate files for each record, you can split it automatically as described in Splitting Data for Distributed Processing to prepare it for this sort of distributed merging.
## Grouping the records that need to be processed together
In this example, you have two repositories, A and B, that contain JSON records. These repositories might correspond to two experiments, two geographic regions, two devices that generate data, or other data sources.

Repository A has the following structure:
```shell
$ pachctl list file A@master
NAME   TYPE SIZE
1.json file 39 B
2.json file 39 B
3.json file 39 B
```
Repository B has the following structure:
```shell
$ pachctl list file B@master
NAME   TYPE SIZE
1.json file 39 B
2.json file 39 B
3.json file 39 B
```
If you want to process A/1.json and B/1.json together to merge their contents or otherwise process them as a unit, you need to group each set of JSON records into respective datums that the pipelines you create in Processing the grouped records can process together.
The grouping pipeline takes a union of A and B as inputs, each with the glob pattern /*. As the pipeline processes each JSON file, it copies the data to a folder in the output that corresponds to the key and index for that record. In this example, that key is just the number in the file name. Pachyderm also renames the files to unique names that identify the source repository:
```shell
/1
  A.json
  B.json
/2
  A.json
  B.json
/3
  A.json
  B.json
```
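The copy-and-rename step described above could be implemented with a small transform script along these lines. This is a minimal sketch, not Pachyderm's own example code: the function name `group_records` is hypothetical, and in a real pipeline the inputs would be mounted at /pfs/A and /pfs/B, with output written to /pfs/out.

```shell
# group_records IN_DIR OUT_DIR
# For every *.json file in IN_DIR/A and IN_DIR/B, create a folder in
# OUT_DIR named after the key (the number in the file name) and link
# the file in under the name of its source repo.
group_records() {
    in_dir="$1"
    out_dir="$2"
    for repo in A B; do
        for f in "$in_dir/$repo"/*.json; do
            [ -e "$f" ] || continue        # skip if the repo has no matches
            key=$(basename "$f" .json)     # e.g. 1, 2, 3
            mkdir -p "$out_dir/$key"
            # Symlinking instead of copying lets Pachyderm skip the
            # re-upload of unchanged data (copy elision).
            ln -sf "$f" "$out_dir/$key/$repo.json"
        done
    done
}

# Inside the pipeline container this would be invoked as:
# group_records /pfs /pfs/out
```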
When you group your data, set the following parameters in the pipeline specification:
- In the “pfs” section, set "empty_files": true to avoid unnecessary downloads of data.
- Use symlinks to avoid unnecessary uploads of data and unnecessary data duplication.
For more information, read about copy elision in Data Management.
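Putting these settings together, the grouping pipeline's specification might look roughly like the following sketch. The pipeline name, image, and transform command are placeholder assumptions; only the union input, the /* glob, and `"empty_files": true` reflect the settings described above.

```json
{
  "pipeline": { "name": "group" },
  "input": {
    "union": [
      { "pfs": { "repo": "A", "glob": "/*", "empty_files": true } },
      { "pfs": { "repo": "B", "glob": "/*", "empty_files": true } }
    ]
  },
  "transform": {
    "image": "alpine",
    "cmd": [ "sh", "/group.sh" ]
  }
}
```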
## Processing the grouped records
After you group the records by using the grouping pipeline, use the pipeline in this section on the group repository. This pipeline takes the group repository as input with a glob pattern of /*. By using that glob pattern, the pipeline can process each grouping of records in parallel.
The second pipeline performs merging, aggregation, or other processing on each grouping of records and outputs each result to the root of the output directory:
```shell
$ pachctl list file merge@master
NAME          TYPE SIZE
result_1.json file 39 B
result_2.json file 39 B
result_3.json file 39 B
```
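As an illustration, the merging transform might be sketched as follows. The function name `merge_group` and the plain concatenation are assumptions; a real transform would apply whatever join or aggregation logic your use case requires, reading groups from /pfs/group and writing to /pfs/out.

```shell
# merge_group GROUP_DIR OUT_DIR
# Combine all records for one key (one subdirectory of the group repo,
# e.g. /pfs/group/1) into a single result file at the root of OUT_DIR.
merge_group() {
    group_dir="$1"
    out_dir="$2"
    key=$(basename "$group_dir")
    # Simple concatenation stands in for real merge/aggregation logic.
    cat "$group_dir"/*.json > "$out_dir/result_$key.json"
}

# With a glob pattern of /* on the group repo, each datum is one key
# directory, so inside the container the transform would run:
# for d in /pfs/group/*; do merge_group "$d" /pfs/out; done
```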