The Why and How of MapReduce
When do I need to use MapReduce? How can I translate my jobs to Map, Combiner, and Reducer?

MapReduce is a programming technique for manipulating large data sets, whereas Hadoop MapReduce is a specific implementation of this programming technique.
In general, the process looks like this:
Map(s) (for individual chunk of input) ->
- sorting individual map outputs ->
Combiner(s) (for each individual map output) ->
- shuffle and partition for distribution to reducers ->
- sorting individual reducer input ->
Reducer(s) (for sorted data of group of partitions)
Hadoop’s MapReduce In General
Hadoop MapReduce is a framework for writing applications that process enormous amounts of data (multi-terabyte data sets) in parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.
A typical MapReduce job:
- splits the input data set into independent chunks
- processes each chunk in parallel with the map tasks
- sorts the outputs of the maps (this is done by the framework)
- uses the sorted output as input to the reduce tasks
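To make these steps concrete, here is a minimal single-process sketch of the same flow. It is only an illustration, not how Hadoop is implemented: the names run_job, word_count_map, and word_count_reduce are hypothetical, and the "parallel" map step simply runs in a loop.

```python
from itertools import groupby
from operator import itemgetter


def run_job(records, map_fn, reduce_fn, num_splits=2):
    """Simulate split -> map -> sort -> reduce in a single process."""
    # 1. Split the input into independent chunks.
    splits = [records[i::num_splits] for i in range(num_splits)]

    # 2. Run the map function on every record of every chunk
    #    (a real cluster would process the chunks in parallel).
    map_outputs = []
    for split in splits:
        for record in split:
            map_outputs.extend(map_fn(record))

    # 3. Sort the map outputs by key so that equal keys become adjacent.
    map_outputs.sort(key=itemgetter(0))

    # 4. Hand each key and its grouped values to the reduce function.
    results = []
    for key, group in groupby(map_outputs, key=itemgetter(0)):
        results.append(reduce_fn(key, [value for _, value in group]))
    return results


def word_count_map(line):
    # Emit a (word, 1) pair for every word in the line.
    return [(word, 1) for word in line.split()]


def word_count_reduce(word, counts):
    # Sum all counts seen for one word.
    return (word, sum(counts))


if __name__ == "__main__":
    lines = ["the quick fox", "the lazy dog", "the fox"]
    print(run_job(lines, word_count_map, word_count_reduce))
```

The sort in step 3 is what allows step 4 to see all values for a key together.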
In general, both the input and the output of the job are stored in a file system.
The Hadoop MapReduce framework takes care of scheduling tasks, monitoring them, and re-executing the failed tasks.
Generally, Hadoop’s MapReduce framework and the Hadoop Distributed File System (HDFS) run on the same nodes, which means that each node is used for both computing and storage. The benefit of such a configuration is that tasks can be scheduled on the nodes where the data resides, which results in high aggregate bandwidth across the cluster.
The MapReduce framework consists of:
- a single master ResourceManager (Hadoop YARN),
- one worker NodeManager per cluster node, and
- one MRAppMaster per application
The resource manager keeps track of compute resources, assigns them to specific tasks, and schedules jobs across the cluster.
In order to configure a MapReduce job, at minimum, an application specifies:
- input source and output destination
- map and reduce function
A job, along with its configuration, is then submitted by Hadoop’s job client to YARN, which is responsible for distributing it across the cluster, scheduling tasks, monitoring them, and providing their status back to the job client.
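Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java. We can use Hadoop Streaming, a utility that lets us create and run jobs with any executable (for example, a Python script) as the mapper and/or the reducer.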
Inputs And Outputs Of MapReduce Jobs
For both input and output, the data is stored in key-value pairs. Each key and value class has to be serializable by the MapReduce framework and thus should implement the Writable interface.
The Mapper
A Mapper maps input key-value pairs to a set of intermediate key-value pairs.
All values associated with a given output key are subsequently grouped by the framework and passed to the Reducer(s) to determine the final output. The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job.
Shuffle & Sort Phases
The output of each individual mapper is sorted by the framework.
Before the data is fed to the reducers, the data from all mappers is partitioned by some grouping of keys. Each partition contains data for one or more keys, and the data within each partition is sorted by key. The partitions are then distributed to the reducers. The input of each reducer is the data from one or more partitions (generally in a 1:1 ratio).
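The partition-then-sort step can be sketched in a few lines. This is a simplified model, not Hadoop’s code: the function names are hypothetical, and zlib.crc32 is used only as a stand-in for the framework’s hash function so that the result is repeatable between runs.

```python
import zlib
from collections import defaultdict


def partition_for(key, num_reducers):
    # Stand-in hash (an assumption): the same key always maps to the same partition.
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def shuffle_and_sort(mapper_outputs, num_reducers):
    """Group (key, value) pairs from all mappers into sorted partitions."""
    partitions = defaultdict(list)
    for output in mapper_outputs:
        for key, value in output:
            partitions[partition_for(key, num_reducers)].append((key, value))
    # Sort every partition by key before it is handed to its reducer.
    return {
        index: sorted(pairs, key=lambda pair: pair[0])
        for index, pairs in sorted(partitions.items())
    }


if __name__ == "__main__":
    mapper_outputs = [
        [("fox", 1), ("the", 1)],  # output of mapper 1
        [("dog", 1), ("the", 1)],  # output of mapper 2
    ]
    for index, pairs in shuffle_and_sort(mapper_outputs, num_reducers=2).items():
        print(index, pairs)
```

Whatever the hash function is, the important property is that every record for a given key ends up in the same partition, and therefore at the same reducer.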
The Reducer
A Reducer reduces a set of intermediate values that share a key to a smaller set of values.
In the reducer phase, the reduce method is called for each <key, (list of values)> pair in the grouped inputs. Note that the output of Reducer is not sorted.
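The right number of reducers is generally 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>). With 0.95, all of the reducers can launch immediately and start transferring map outputs as the maps finish. With 1.75, the faster nodes finish their first round of reducers and launch a second wave, which gives better load balancing.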
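As a quick worked example of this rule of thumb, here is a small helper. The function name and the cluster size (10 nodes with 8 containers each) are made up for illustration.

```python
def suggested_reducers(nodes, max_containers_per_node, factor=0.95):
    """Apply the rule of thumb: factor * (nodes * containers per node)."""
    return round(factor * nodes * max_containers_per_node)


if __name__ == "__main__":
    # Hypothetical cluster: 10 nodes, 8 containers per node.
    print(suggested_reducers(10, 8, factor=0.95))  # single wave of reducers
    print(suggested_reducers(10, 8, factor=1.75))  # second wave for load balancing
```

For this hypothetical cluster the two factors give 76 and 140 reducers respectively.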
The Combiner
We can optionally specify a Combiner (also known as a local reducer) to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer. In many cases, the same reducer code can be used as a combiner as well.
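To see why local aggregation saves bandwidth, here is a minimal sketch of a word-count combiner applied to the output of a single mapper. The function name combine and the sample pairs are hypothetical.

```python
from collections import Counter


def combine(pairs):
    """Locally sum the counts for each word emitted by one mapper."""
    totals = Counter()
    for word, count in pairs:
        totals[word] += count
    return sorted(totals.items())


if __name__ == "__main__":
    mapper_output = [("the", 1), ("fox", 1), ("the", 1), ("the", 1)]
    combined = combine(mapper_output)
    # Four records go in, two come out, so less data has to cross the network.
    print(len(mapper_output), "->", len(combined), combined)
```

This works as a combiner because summation is associative and commutative: the reducer gets the same totals whether it sums the raw pairs or the pre-aggregated ones.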
Let’s look at a simple example
Consider counting word frequencies, with the following mapper.py file:
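A minimal sketch of such a mapper is shown here. It assumes whitespace-separated words on standard input and tab-separated word/count pairs on standard output, and it keeps a dictionary of counts for the whole chunk. The helper name map_words is hypothetical.

```python
#!/usr/bin/env python3
"""mapper.py (sketch): count the words within this mapper's input chunk."""
import sys
from collections import defaultdict


def map_words(lines):
    # Keep a running count for every unique word in the chunk.
    counts = defaultdict(int)
    for line in lines:
        for word in line.strip().split():
            counts[word] += 1
    return counts


def main():
    # Emit one "word<TAB>count" line per unique word.
    for word, count in map_words(sys.stdin).items():
        print(f"{word}\t{count}")


if __name__ == "__main__":
    main()
```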

and the following reducer.py file:
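A minimal sketch of a matching reducer follows. It assumes that its input consists of "word<TAB>count" lines already sorted by word, so all lines for one word are adjacent. The helper names parse and reduce_counts are hypothetical.

```python
#!/usr/bin/env python3
"""reducer.py (sketch): sum the counts of each word from sorted input."""
import sys
from itertools import groupby


def parse(lines):
    # Turn "word<TAB>count" lines into (word, count) pairs, skipping blanks.
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue
        word, count = line.split("\t", 1)
        yield word, int(count)


def reduce_counts(lines):
    # groupby relies on the input being sorted by word.
    for word, group in groupby(parse(lines), key=lambda pair: pair[0]):
        yield word, sum(count for _, count in group)


def main():
    for word, total in reduce_counts(sys.stdin):
        print(f"{word}\t{total}")


if __name__ == "__main__":
    main()
```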

We can locally test them as:
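On a Unix shell, a local check of this kind is usually a pipeline that feeds a text file into the mapper, pipes the result through sort, and then into the reducer. The same idea can be sketched in a single Python process. The mapper and reducer below are compact stand-ins rather than the scripts themselves, and the sample text is made up.

```python
from itertools import groupby


def mapper(lines):
    # Stand-in mapper: emit "word<TAB>1" for every word.
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reducer(sorted_lines):
    # Stand-in reducer: sum the counts of adjacent lines with the same word.
    pairs = (line.split("\t", 1) for line in sorted_lines)
    for word, group in groupby(pairs, key=lambda pair: pair[0]):
        yield f"{word}\t{sum(int(count) for _, count in group)}"


def local_test(text):
    # sorted() plays the role of the sort step between map and reduce.
    return list(reducer(sorted(mapper(text.splitlines()))))


if __name__ == "__main__":
    for line in local_test("the fox\nthe dog\n"):
        print(line)
```

If the sort step is left out, the reducer sees the same word in several separate runs and reports it more than once, which is a quick way to convince ourselves why the framework sorts before reducing.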

But Wait! The previous implementation has a problem…
The previous mapper code is memory intensive, as each mapper has to maintain a dictionary with the frequency of every unique word present in its chunk of input. If memory is a concern even for a dict covering an individual chunk of input, then a better way is to just print each word as we encounter it (with a frequency of 1). For example,
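A minimal sketch of such a streaming-style mapper, assuming whitespace-separated words and tab-separated output (the helper name map_words is hypothetical):

```python
#!/usr/bin/env python3
"""mapper.py (sketch): emit every word with a count of 1, keeping no state."""
import sys


def map_words(lines):
    # Yield each word as soon as it is seen; nothing is held in memory.
    for line in lines:
        for word in line.strip().split():
            yield word, 1


def main():
    for word, count in map_words(sys.stdin):
        print(f"{word}\t{count}")


if __name__ == "__main__":
    main()
```

The trade-off is that this version emits one record per word occurrence, so it produces more intermediate data than a mapper that aggregates first.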

Now, let’s use the new mapper with the same input data:

If it helps, we can also use a combiner (same as reducer code) for local aggregation from each mapper output.

Details on Partitioner And Comparator
A Partitioner controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence this controls which of the m reduce tasks the intermediate key (and hence the record) is sent to for reduction.
HashPartitioner is the default Partitioner. You would need to use a different partitioner (or a custom one) if you need to partition data by multiple keys.
Hadoop has a library class, KeyFieldBasedPartitioner, which allows the MapReduce framework to partition the map outputs based on certain key fields, not the whole keys. For example, -D mapreduce.partition.keypartitioner.options=-k1,2 partitions on the first two fields of the key.
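The idea of partitioning on a subset of key fields can be sketched as follows. This is only a model of the behavior, not the library class: the function name, the tab separator, and the use of zlib.crc32 as the hash are all assumptions.

```python
import zlib


def partition_by_fields(key, num_reducers, first=1, last=2, sep="\t"):
    """Pick a partition from key fields first..last (1-based, inclusive)."""
    fields = key.split(sep)
    sub_key = sep.join(fields[first - 1:last])
    # Only the selected fields influence the partition number.
    return zlib.crc32(sub_key.encode("utf-8")) % num_reducers


if __name__ == "__main__":
    # Both keys share their first two fields, so they land in the same partition.
    print(partition_by_fields("2020\tJan\tfoo", 4))
    print(partition_by_fields("2020\tJan\tbar", 4))
```

Keys that agree on the selected fields go to the same reducer even when the remaining fields differ, which is what makes secondary sorting within a reducer possible.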
We can control the grouping by specifying a Comparator.
Hadoop has a library class, KeyFieldBasedComparator, that provides a subset of features provided by Unix/GNU sort function. For example, -D mapreduce.partition.keycomparator.options=-k2,2nr.
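Read the way Unix sort reads it, -k2,2nr means "sort on the second field only, numerically, in reverse order". A minimal sketch of that ordering is below. The function name, the tab separator, and the sample keys are assumptions, and this is not the library class itself.

```python
def sort_k2_numeric_reverse(keys, sep="\t"):
    """Order keys by their second field, numerically, largest first."""
    return sorted(keys, key=lambda key: float(key.split(sep)[1]), reverse=True)


if __name__ == "__main__":
    keys = ["apple\t3", "pear\t10", "fig\t7"]
    for key in sort_k2_numeric_reverse(keys):
        print(key)
```

A plain string sort would place "10" before "3", which is why the numeric modifier matters here.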
The DistributedCache
The DistributedCache, provided by the MapReduce framework, efficiently distributes large read-only files as cache needed by applications.
The Configuration Parameters
We can get job configuration options through environment variables. When we launch the MapReduce application, the framework assigns data to the available workers, and we can access this information from our scripts. For instance, if we are running a mapper, we can access information about the input file and split we are working on. We can also find out whether we are running as a mapper or a reducer, which can be important if we run the same script in both the map and reduce phases. We can also access the task id within the map or reduce phase through the following environment variables: mapreduce_task_id, mapreduce_task_partition.
The configured parameters are localized in the job configuration for each task’s execution. During the execution of a streaming job, the names of the “mapred” parameters are transformed: the dots (.) become underscores (_). For example, mapreduce.job.id becomes mapreduce_job_id. In your code, use the parameter names with the underscores.
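A minimal sketch of reading such a parameter from a streaming script follows. The helper names env_name and get_job_parameter are hypothetical, and the variables are only set when the script actually runs inside a job, so a default is returned otherwise.

```python
import os


def env_name(parameter):
    # Streaming exposes "mapreduce.job.id" as "mapreduce_job_id".
    return parameter.replace(".", "_")


def get_job_parameter(parameter, default=None, environ=os.environ):
    """Look up a job parameter by its dotted name in the environment."""
    return environ.get(env_name(parameter), default)


if __name__ == "__main__":
    # Prints "unknown" when run outside of a streaming job.
    print(get_job_parameter("mapreduce.job.id", default="unknown"))
    print(get_job_parameter("mapreduce.task.partition", default="unknown"))
```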
Updating status in streaming applications:
A streaming process can use stderr to emit status information. To set a status, reporter:status:<message> should be sent to stderr.
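In a Python streaming script, this could look like the following sketch. The helper names are hypothetical, and the optional stream argument exists only so the function can be tried outside of Hadoop.

```python
import sys


def status_line(message):
    # Format expected on stderr: reporter:status:<message>
    return f"reporter:status:{message}"


def report_status(message, stream=None):
    """Write a status update to stderr (or to the given stream)."""
    stream = sys.stderr if stream is None else stream
    print(status_line(message), file=stream, flush=True)


if __name__ == "__main__":
    report_status("processed 1000 records")
```

Flushing after each message avoids status updates sitting in a buffer while a long-running task is still working.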
Bundled Mappers, Reducers, and Partitioners
Hadoop MapReduce comes bundled with a library of generally useful mappers, reducers, and partitioners.