Lei Luo Machine Learning Engineer

MapReduce Basics

2018-08-27

Introduction

The only feasible approach to tackling large-data problems today is to divide and conquer. The basic idea is to partition a large problem into smaller subproblems. To the extent that the sub-problems are independent, they can be tackled in parallel by different workers threads in a processor core, cores in a multi-core processor, multiple processors in a machine, or many machines in a cluster. Intermediate results from each individual worker are then combined to yield the final output. There are several issues that need to be addressed:

  • How do we break up a large problem into smaller tasks?
  • How do we assign tasks to workers distributed across a potentially large number of machines?
  • How do we ensure that the workers get the data they need?
  • How do we coordinate synchronization among the different workers?
  • How do we share partial results from one worker that is needed by another?
  • How do we accomplish all of the above in the face of software errors and hardware faults?

In traditional parallel or distributed programming environments, the developer needs to explicitly address many of the above issues. One of the most signifcant advantages of MapReduce is that it provides an abstraction that hides many system-level details from the programmer. Instead of moving large amounts of data around, it is far more efficient, if possible, to move the code to the data. This is operationally realized by spreading data across the local disks of nodes in a cluster and running processes on nodes that hold the data.

Mappers and Reducers

Keys and values may be primitives such as integers, floating point values, strings, and raw bytes, or they may be arbitrarily complex structures (lists, tuples, associative arrays, etc.). Programmers typically need to define their own custom data types. Part of the design of MapReduce algorithms involves imposing the key-value structure on arbitrary datasets.

In MapReduce, the programmer defines a mapper and a reducer with the following signatures:

map: $(k_{1},v_{1}) \rightarrow [(k_{2},v_{2})]$

reduce:$ (k_{2},[v_{2}]) \rightarrow [(k_{3},v_{3})] $

The convention $[…]$ is used to denote a list. The mapper is applied to every input key-value pair (split across an arbitrary number of files) to generate an arbitrary number of intermediate key-value pairs. The reducer is applied to all values associated with the same intermediate key to generate output key-value pairs.

A simple word count algorithm in MapReduce is shown below. This algorithm counts the number of occurrences of every word in a text collection. Input key-values pairs take the form of (docid, doc) pairs stored on the distributed file system, where the former is a unique identifier for the document, and the latter is the text of the document itself. The mapper takes an input key-value pair, tokenizes the document, and emits an intermediate key-value pair for every word: the word itself serves as the key, and the integer one serves as the value (denoting that we’ve seen the word once).

The MapReduce execution framework guarantees that all values associated with the same key are brought together in the reducer. Therefore, in our word count algorithm, we simply need to sum up all counts (ones) associated with each word. Final output is written to the distributed file system, one file per reducer.

There are some differences between the Hadoop implementation of MapReduce and Google’s implementation. In Hadoop, the reducer is presented with a key and an iterator over all values associated with the particular key. The values are arbitrarily ordered. Google’s implementation allows the programmer to specify a secondary sort key for ordering the values (if desired) in which case values associated with each key would be presented to the developer’s reduce code in sorted order. Another difference: in Google’s implementation the programmer is not allowed to change the key in the reducer. That is, the reducer output key must be exactly the same as the reducer input key. In Hadoop, there is no such restriction, and the reducer can emit an arbitrary number of output key-value pairs (with different keys).

In addition to the “canonical” MapReduce processing flow, other variations are also possible. MapReduce programs can contain no reducers, in which case mapper output is directly written to disk. For embarrassingly parallel problems, e.g., parse a large text collection or independently analyze a large number of images, this would be a common pattern. In some cases it is useful for the reducer to implement the identity function, in which case the program simply sorts and groups mapper output. Finally, running identity mappers and reducers has the effect of regrouping and resorting the input data (which is sometimes useful).

The Execution Framework

One of the most important idea behind MapReduce is separating the what of distributed processing from the how. A MapReduce program, referred to as a job, consists of code for mappers and reducers (as well as combiners and partitioners to be discussed in the next section) packaged together with configuration parameters (such as where the in put lies and where the output should be stored). The developer submits the job to the submission node of a cluster (in Hadoop, this is called the jobtracker) and execution framework (sometimes called the “runtime”) takes care of everything else: it transparently handles all other aspects of distributed code execution, on clusters ranging from a single node to a few thousand nodes. Specific responsibilities include:

  • Scheduling
    Each MapReduce job is divided into smaller units called tasks. Another aspect of scheduling involves coordination among tasks belonging to different jobs (e.g., from different users). Speculative execution is an optimization that is implemented by both Hadoop and Google’s MapReduce implementation (called “backup tasks”). Due to the barrier between the map and reduce tasks, the map phase of a job is only as fast as the slowest map task. Similarly, the completion time of a job is bounded by the running time of the slowest reduce task. As a result, the speed of a MapReduce job is sensitive to what are known as stragglers, or tasks that take an usually long time to complete. One cause of stragglers is flaky hardware. With speculative execution, an identical copy of the same task is executed on a different machine, and the framework simply uses the result of the first task attempt to finish.

  • Data/code co-location
    The phrase data distribution is misleading, since one of the key ideas behind MapReduce is to move the code, not the data. However, the more general point remains|in order for computation to occur, we need to somehow feed data to the code.

  • Synchronization
    In general, synchronization refers to the mechanisms by which multiple concurrently running processes “join up”. In MapReduce, synchronization is accomplished by a barrier between the map and reduce phases of processing. Intermediate key-value pairs must be grouped by key, which is accomplished by a large distributed sort involving all the nodes that executed map tasks and all the nodes that will execute reduce tasks. This necessarily involves copying intermediate data over the network, and therefore the process is commonly known as “shuffle and sort”.

Note that the reduce computation cannot start until all the mappers have finished emitting key-value pairs and all intermediate key-value pairs have been shuffled and sorted, since the execution framework cannot otherwise guarantee that all values associated with the same key have been gathered.

  • Error and Fault Handling
    The MapReduce execution framework must accomplish all the tasks above in an environment where errors and faults are the norm, not the exception. Since MapReduce was explicitly designed around low-end commodity servers, the runtime must be especially resilient. In large clusters, disk failures are common and RAM experiences more errors than one might expect.

And that’s just hardware. No software is bug free exceptions must be appropriately trapped, logged, and recovered from. Large-data problems have a penchant for uncovering obscure corner cases in code that is otherwise thought to be bug-free. The MapReduce execution framework must thrive in this hostile environment.

Partitioners and Combiners

There are two additional elements that complete the programming model: partitioners and combiners. Partitioners are responsible for dividing up the intermediate key space and assigning intermediate key-value pairs to reducers. Combiners are an optimization in MapReduce that allow for local aggregation before the shuffle and sort phase.

In the previous exmaple of word count all the key-value pairs need to be copied across the network, and so the amount of intermediate data will be larger than the input collection itself. This is clearly inefficient. One solution is to perform local aggregation on the output of each mapper, i.e., to compute a local count for a word over all the documents processed by the mapper. With this modification (See the graph below. Assuming the maximum amount of local aggregation possible), the number of intermediate key-value pairs will be at most the number of unique words in the collection times the number of mappers. In many cases, proper use of combiners can spell the difference between an impractical algorithm and an efficient algorithm. A combiner can significantly reduce the amount of data that needs to be copied over the network, resulting in much faster algorithms.



Disclaimer: This post includes my personal reflections and notes on reading Data-Intensive Text Processing with MapReduce by Jimmy Lin and Chris Dyer. Some texts and images are from the book for better educational purposes.


Similar Posts

Comments