Example 1 - Count Last Names

The canonical map/reduce example: count the occurrences of words in a document. In this case, we’ll count the occurrences of last names in a data file containing lines of JSON.

Input

Here’s the input data:

diana@ubuntu:~$ cat data.txt
{"first":"Homer", "last":"Simpson"}
{"first":"Manjula", "last":"Nahasapeemapetilon"}
{"first":"Herbert", "last":"Powell"}
{"first":"Ruth", "last":"Powell"}
{"first":"Bart", "last":"Simpson"}
{"first":"Apu", "last":"Nahasapeemapetilon"}
{"first":"Marge", "last":"Simpson"}
{"first":"Janey", "last":"Powell"}
{"first":"Maggie", "last":"Simpson"}
{"first":"Sanjay", "last":"Nahasapeemapetilon"}
{"first":"Lisa", "last":"Simpson"}
{"first":"Maggie", "last":"Términos"}

Inferno Rule

And here’s the Inferno rule (inferno/example_rules/names.py):

# imports as used by Inferno's bundled example rules
from inferno.lib.rule import chunk_json_keyset_stream
from inferno.lib.rule import InfernoRule


def count(parts, params):
    # parts_preprocess step: attach a count of 1 to every incoming record
    parts['count'] = 1
    yield parts


InfernoRule(
    name='last_names_json',
    source_tags=['example:chunk:users'],
    map_input_stream=chunk_json_keyset_stream,
    parts_preprocess=[count],
    key_parts=['last'],
    value_parts=['count'],
)
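
The count function is a parts_preprocess step: it runs on every parsed record before the map step and simply attaches a count of 1. Here’s a minimal, standalone sketch of what it does to a single record (the record literal is sample data from data.txt; params is unused here, so an empty dict will do):

def count(parts, params):
    parts['count'] = 1
    yield parts

record = {'first': 'Homer', 'last': 'Simpson'}
print(list(count(record, {})))
# [{'first': 'Homer', 'last': 'Simpson', 'count': 1}]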

DDFS

The first step is to place the data file in Disco’s Distributed Filesystem (DDFS). Once placed in DDFS, a file is referred to by Disco as a blob.

DDFS is a tag-based filesystem. Instead of organizing files into directories, you tag a collection of blobs with a tag_name for lookup later.

In this case, we’ll be tagging our data file as example:chunk:users.

tag_name -> [blob1, blob2, blob3]

Make sure Disco is running:

diana@ubuntu:~$ disco start
Master ubuntu:8989 started

Place the input data in DDFS:

diana@ubuntu:~$ ddfs chunk example:chunk:users ./data.txt
created: disco://localhost/ddfs/vol0/blob/99/data_txt-0$533-406a9-e50

Verify that the data is in DDFS:

diana@ubuntu:~$ ddfs xcat example:chunk:users | head -2
{"first":"Homer", "last":"Simpson"}
{"first":"Manjula", "last":"Nahasapeemapetilon"}

Map Reduce

For the purpose of this introductory example, think of an Inferno map/reduce job as a series of four steps, where the output of each step is used as the input to the next.

input -> map -> reduce -> output
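
Purely as a mental model, the whole job is four functions chained together, each consuming the previous step’s output. This is only a sketch of the data flow, not Inferno’s actual implementation; the sections below sketch what each step does to the example data.

def run_job(raw_lines, input_step, map_step, reduce_step, output_step):
    # Each step is a function whose output feeds the next step.
    parsed = input_step(raw_lines)     # lines of JSON -> Python dicts
    mapped = map_step(parsed)          # dicts -> (key, value) pairs
    reduced = reduce_step(mapped)      # pairs summed per key
    return output_step(reduced)        # summed results -> CSV output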

Input

The input step of an Inferno map/reduce job is responsible for parsing and readying the input data for the map step.

If you’re using Inferno’s built-in keyset map/reduce functionality, this step mostly amounts to transforming your CSV or JSON input into Python dictionaries.

The default Inferno input reader is chunk_csv_keyset_stream, which is intended for CSV data that was placed in DDFS using the ddfs chunk command.

If the input data is lines of JSON, you would instead set map_input_stream to the chunk_json_keyset_stream reader in your Inferno rule.

The input reader will process all DDFS tags that are prefixed with the tag names defined in source_tags of your Inferno rule.

 InfernoRule(
     name='last_names_json',
     source_tags=['example:chunk:users'],
     map_input_stream=chunk_json_keyset_stream,
     parts_preprocess=[count],
     key_parts=['last'],
     value_parts=['count'],
 )

Example data transition during the input step:

data -> input
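
Concretely, the input step turns each line of JSON into a Python dictionary. A minimal sketch of that transformation using the standard json module (Inferno’s chunk_json_keyset_stream does the real work inside the job):

import json

lines = [
    '{"first":"Homer", "last":"Simpson"}',
    '{"first":"Manjula", "last":"Nahasapeemapetilon"}',
]

# One Python dict per input line -- roughly what the later steps receive.
for line in lines:
    print(json.loads(line))
# {'first': 'Homer', 'last': 'Simpson'}
# {'first': 'Manjula', 'last': 'Nahasapeemapetilon'}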

Map

The map step of an Inferno map/reduce job is responsible for extracting the relevant key and value parts from the incoming Python dictionaries and yielding one, none, or many key/value pairs for further processing in the reduce step.

Inferno’s default map_function is the keyset_map. You define the relevant key and value parts by declaring key_parts and value_parts in your Inferno rule.

 InfernoRule(
     name='last_names_json',
     source_tags=['example:chunk:users'],
     map_input_stream=chunk_json_keyset_stream,
     parts_preprocess=[count],
     key_parts=['last'],
     value_parts=['count'],
 )

Example data transition during the map step:

input -> map
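
Concretely, the map step (after the count preprocess has run) pulls the configured key_parts and value_parts out of each dictionary and yields them as a key/value pair. A minimal sketch of the idea; the exact pair format Inferno’s keyset_map emits internally may differ:

def keyset_map_sketch(parts, key_parts, value_parts):
    # Extract the configured key and value parts from one record.
    key = tuple(parts[k] for k in key_parts)
    value = [parts[v] for v in value_parts]
    yield key, value

record = {'first': 'Homer', 'last': 'Simpson', 'count': 1}
print(list(keyset_map_sketch(record, ['last'], ['count'])))
# [(('Simpson',), [1])]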

Reduce

The reduce step of an Inferno map/reduce job is responsible for summarizing the results of your map/reduce query.

Inferno’s default reduce_function is the keyset_reduce. It will sum the value parts yielded by the map step, grouped by the key parts.

In this example, we’re only summing one value (the count). You can define and sum many value parts, as you’ll see in the next example.

Example data transition during the reduce step:

map -> reduce
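
Concretely, the reduce step groups the mapped pairs by key and sums their value parts. A minimal sketch of that grouping and summing (not Inferno’s actual keyset_reduce, which can also sum several value parts at once, as noted above):

from collections import defaultdict

mapped = [
    (('Simpson',), [1]),
    (('Powell',), [1]),
    (('Simpson',), [1]),
]

# Sum the single value part (the count) for each distinct key.
totals = defaultdict(int)
for key, values in mapped:
    totals[key] += values[0]

print(dict(totals))
# {('Simpson',): 2, ('Powell',): 1}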

Output

Unless you create and specify your own result_processor, Inferno defaults to the keyset_result processor, which simply uses a CSV writer to print the results of the reduce step to standard output.

Other common result processor use cases include populating a cache, persisting to a database, and writing back to DDFS or DiscoDB.

Example data transition during the output step:

reduce -> output
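
Concretely, the default output step writes the reduced results to standard output as CSV: one header row followed by one row per key. A minimal sketch using Python’s csv module, with the counts taken from the run shown under Execution below (the real keyset_result processor lives inside Inferno):

import csv
import sys

reduced = [
    ('Nahasapeemapetilon', 3),
    ('Powell', 3),
    ('Simpson', 5),
    ('Términos', 1),
]

writer = csv.writer(sys.stdout)
writer.writerow(['last', 'count'])       # header row
for last, count in reduced:
    writer.writerow([last, count])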

Execution

Run the last name counting job:

diana@ubuntu:~$ inferno -i names.last_names_json
2012-03-09 Processing tags: ['example:chunk:users']
2012-03-09 Started job last_names_json@533:40914:c355f processing 1 blobs
2012-03-09 Done waiting for job last_names_json@533:40914:c355f
2012-03-09 Finished job last_names_json@533:40914:c355f

The output:

last,count
Nahasapeemapetilon,3
Powell,3
Simpson,5
Términos,1