Lei Luo Machine Learning Engineer

# MapReduce Algorithm Design

2018-08-27

## Introduction

A large part of the power of MapReduce comes from its simplicity: in addition to preparing the input data, the programmer needs only to implement the mapper, the reducer, and optionally, the combiner and the partitioner. All other aspects of execution are handled transparently by the execution framework on clusters ranging from a single node to a few thousand nodes, over datasets ranging from gigabytes to petabytes. In this post, some examples will be provided to illustrate what can be thought of as “design patterns” for MapReduce, which instantiate arrangements of components and specific techniques designed to handle frequently-encountered situations across a variety of problem domains.

Synchronization is perhaps the most tricky aspect of designing MapReduce algorithms. Other than embarrassingly-parallel problems, processes running on separate nodes in a cluster must, at some point in time, come together for example, to distribute partial results from nodes that produced them to the nodes that will consume them. Within a single MapReduce job, there is only one opportunity for cluster-wide synchronization during the shuffle and sort stage where intermediate key-value pairs are copied from the mappers to the reducers and grouped by key. The programmer haves a number of techniques for controlling execution and managing the flow of data in MapReduce:

• The ability to construct complex data structures as keys and values to store and communicate partial results.

• The ability to execute user-specified initialization code at the beginning of a map or reduce task, and the ability to execute user-specified termination code at the end of a map or reduce task.

• The ability to preserve state in both mappers and reducers across multiple input or intermediate keys.

• The ability to control the sort order of intermediate keys, and therefore the order in which a reducer will encounter particular keys.

• The ability to control the partitioning of the key space, and therefore the set of keys that will be encountered by a particular reducer.

This post explains how various techniques to control code execution and data flow can be applied to design algorithms in MapReduce.

## Local Aggregation

In the previous post where a word example was given we have briefly touched on combiners, which provide a general mechanism within the MapReduce framework to reduce the amount of intermediate data generated by the mappers recall that they can be understood as “mini-reducers” that process the output of mappers.

An improvement on the basic algorithm is shown below. An associative array (i.e., Map in Java) is introduced inside the mapper to tally up term counts within a single document: instead of emitting a key-value pair for each term in the document, this version emits a key-value pair for each unique term in the document.

This basic idea can be taken one step further, as illustrated in the variant of the word count algorithm in graph below. In this example, we can continue Map to accumulate partial term counts in the associative array across multiple documents, and emit key-value pairs only when the mapper has processed all documents since it is possible to preserve state across multiple calls of the method (for each input key-value pair). That is, emission of intermediate data is deferred until the Close method in the pseudo-code.

With this technique, we are in essence incorporating combiner functionality di rectly inside the mapper. This is a sufficiently common design pattern in MapReduce that it’s worth giving it a name, “in-mapper combining”.

There is a fundamental scalability bottleneck associated with the in-mapper combining pattern. It critically depends on having sufficient memory to store intermediate results until the mapper has completely processed all key-value pairs in an input split. One common solution to limiting memory usage when using the in-mapper combining technique is to “block” input key-value pairs and flush” in-memory data structures periodically. The idea is simple: instead of emitting intermediate data only after every key-value pair has been processed, emit partial results after processing every $n$ key-value pairs.

## Pairs and Stripes

One common approach for synchronization in MapReduce is to construct complex keys and values in such a way that data necessary for a computation are naturally brought together by the execution framework.

To illustrate we will look at the problem of building word co-occurrence matrices from large corpora, a common task in corpus linguistics and statistical natural language processing. Formally, the co-occurrence matrix of a corpus is a square $n \times n$ matrix where $n$ is the number of unique words in the corpus (i.e., the vocabulary size). A cell $m_{ij}$ contains the number of times word $w_{i}$ co-occurs with word $w_{j}$ within a specific context – natural unit such as a sentence, paragraph, or a document, or a certain window of $m$ words.

The pseudo-code for this problem using the “pairs” approach is shown below. This is straightforwardly accomplished by two nested loops: the outer loop iterates over all words (the left element in the pair), and the inner loop iterates over all neighbors of the first word (the right element in the pair). The neighbors of a word can either be defined in terms of a sliding window or some other contextual unit such as a sentence.

An alternative approach, dubbed the “stripes” approach, is presented in the graph below. the major difference is that instead of emitting intermediate key-value pairs for each co-occurring word pair, co-occurrence information is first stored in an associative array, denoted $H$. The mapper emits key-value pairs with words as keys and corresponding associative arrays as values, where each associative array encodes the co-occurrence counts of the neighbors of a particular word.

It is immediately obvious that the pairs algorithm generates an immense number of key-value pairs compared to the stripes approach. The stripes representation is much more compact, since with pairs the left element is repeated for every co-occurring word pair. The stripes approach also generates fewer and shorter intermediate keys, and therefore the execution framework has less sorting to perform.

The stripes approach makes the assumption that, at any point in time, each associative array is small enough to fit into memory otherwise, memory paging will significantly impact performance. The pairs approach, on the other hand, does not suffer from this limitation, since it does not need to hold intermediate data in memory.

## Secondary Sorting

MapReduce sorts intermediate key-value pairs by the keys during the shuffle and sort phase, but what if in addition to sorting by key, we also need to sort by value? Google’s MapReduce implementation provides built-in functionality for (optional) secondary sorting, which guarantees that values arrive in sorted order. Hadoop, unfortunately, does not have this capability built in.

There is a general purpose solution, which we call the “value-to-key conversion” design pattern. The basic idea is to move part of the value into the intermediate key to form a composite key, and let the MapReduce execution framework handle the sorting.

The basic tradeoff between the two approaches discussed above (buffer and in memory sort vs. value-to-key conversion) is where sorting is performed. With value-to-key conversion, sorting is offloaded to the MapReduce execution framework. Note that this approach can be arbitrarily extended to tertiary, quaternary, etc. sorting. This pattern results in many more keys for the framework to sort, but distributed sorting is a task that the MapReduce runtime excels at since it lies at the heart of the programming model.

## Reduce-side Join

The first approach to relational joins is what’s known as a reduce-side join. The idea is quite simple: we map over both datasets and emit the join key as the intermediate key, and the tuple itself as the intermediate value. Since MapReduce guarantees that all values with the same key are brought together, all tuples will be grouped by the join key which is exactly what we need to perform the join operation. This approach is known as a parallel sort-merge join in the database community.

Assuming we would like to perform relational joins on two datasets (relations), generically named $S$ and $T$. Let us suppose that relation $S$ looks something like the following:

$(k_{1},s_{1},S_{1})$ $(k_{2},s_{2},S_{2})$ $(k_{3},s_{3},S_{3})$ $…$

$T$ looks something like this:

$(k_{1},t_{1},T_{1})$ $(k_{2},t_{2},T_{2})$ $(k_{3},t_{3},T_{3})$ $…$

where $k$ is the join key, $s_{n}$ and $t_{n}$ are unique id for the tuple. $S_{1}$ and $T_{n}$ denote data.

The first and simplest is a one-to-one join, where at most one tuple from $S$ and one tuple from $T$ share the same join key. In this case, the algorithm sketched above will work fine.

Let us now consider the one-to-many join. Assume that tuples in $S$ have unique join keys (i.e., $k$ is the primary key in $S$), so that $S$ is the “one” and $T$ is the “many”. The above algorithm will still work, but when processing each key in the reducer, we have no idea when the value corresponding to the tuple from $S$ will be encountered, since values are arbitrarily ordered. The easiest solution is to buffer all values in memory, pick out the tuple from $S$, and then cross it with every tuple from $T$ to perform the join. However, as we have seen several times already, this creates a scalability bottleneck since we may not have sufficient memory to hold all the tuples with the same join key. This is a problem that requires a secondary sort, and the solution lies in the value-to-key conversion design pattern we just presented.

Finally, let us consider the many-to-many join case. Assuming that S is the smaller dataset, the above algorithm still works as well. The basic idea behind the reduce-side join is to repartition the two datasets by the join key. The approach isn’t particularly efficient since it requires shuffling both datasets across the network. This leads us to the map-side join.

## Map-side Join

Suppose we have two datasets that are both sorted by the join key. We can perform a join by scanning through both datasets simultaneously|this is known as a merge join. We can parallelize this by partitioning and sorting both datasets in the same way. For example, suppose $S$ and $T$ were both divided into ten files, partitioned in the same manner by the join key. Further suppose that in each file, the tuples were sorted by the join key. In this case, we simply need to merge join the first file of $S$ with the first file of $T$, the second file with $S$ with the second file of $T$, etc. This can be accomplished in parallel, in the map phase of a MapReduce job|hence, a map-side join. A map-side join is far more efficient than a reduce-side join since there is no need to shuffle the datasets over the network.

## Memory-backed Join

The simplest version of memory-backed join is applicable when one of the two datasets completely fits in memory on each node. In this situation, we can load the smaller dataset into memory in every mapper, populating an associative array to facilitate random access to tuples based on the join key. Mappers are then applied to the other (larger) dataset, and for each input key-value pair, the mapper probes the in-memory dataset to see if there is a tuple with the same join key. If there is, the join is performed. This is known as a simple hash join by the database community.

What if neither dataset fits in memory? The simplest solution is to divide the smaller dataset into partitions are small enough to fit in memory, and then run memory-backed hash joins.

There is an alternative approach to memory-backed joins for cases where neither datasets fit into memory. A distributed key-value store can be used to hold one dataset in memory across multiple machines while mapping over the other. The mappers would then query this distributed key-value store in parallel and perform joins if the join keys match.

Disclaimer: This post includes my personal reflections and notes on reading Data-Intensive Text Processing with MapReduce by Jimmy Lin and Chris Dyer. Some texts and images are from the book for better educational purposes.

## Similar Posts

Previous post MapReduce Basics