Munish Goyal

Summary

The provided content explains the MapReduce programming model, its implementation in Hadoop, and the process of configuring and executing MapReduce jobs for large-scale data processing tasks.

Abstract

MapReduce is a programming paradigm designed for processing large data sets with a parallel, distributed algorithm on a cluster. The Hadoop MapReduce framework is a specific implementation that allows for the handling of multi-terabyte data sets across thousands of nodes in a fault-tolerant manner. The process involves splitting input data into independent chunks, which are then processed by map tasks in parallel. The outputs of these maps are sorted, combined, and then sent to reduce tasks to aggregate the results. The framework manages task scheduling, monitoring, and failure recovery. Configuration of a MapReduce job requires specifying the input source, output destination, and the map and reduce functions. Hadoop's ecosystem, including YARN for resource management and HDFS for storage, supports the efficient execution of these jobs. Additionally, the content discusses the use of combiners for local data aggregation, the importance of partitioners and comparators for data distribution and sorting, and the use of the DistributedCache for sharing read-only files across the cluster.

Opinions

  • The author suggests that MapReduce is essential for manipulating large data sets, highlighting its effectiveness in parallel processing.
  • Hadoop's MapReduce is praised for its ability to process enormous amounts of data reliably and fault-tolerantly on large clusters of commodity hardware.
  • The use of combiners is recommended to reduce the amount of data transferred from mappers to reducers, which can lead to significant performance improvements.
  • The default HashPartitioner is mentioned as sufficient for most use cases, but the possibility of using custom partitioners is highlighted for more complex scenarios.
  • The content implies that while Java is the implementation language for Hadoop, MapReduce applications can be written in other languages, such as Python, using packages like mrjob.
  • The author emphasizes the importance of understanding the MapReduce framework's configuration parameters and environment variables for efficient job execution and troubleshooting.
  • It is noted that Hadoop comes with a library of mappers, reducers, and partitioners that can be leveraged for common use cases, suggesting a robust and well-equipped ecosystem for distributed data processing.

The Why and How of MapReduce

When do I need to use MapReduce? How can I translate my jobs to Map, Combiner, and Reducer?


MapReduce is a programming technique for manipulating large data sets, whereas Hadoop MapReduce is a specific implementation of this programming technique.

Here is how the process looks in general:

Map(s) (one per chunk of input) ->
     - sorting each individual map output ->
Combiner(s) (optional, on each individual map output) ->
     - shuffle and partition for distribution to reducers ->
     - sorting each reducer's input ->
Reducer(s) (one per partition, on its sorted data)
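To make the flow concrete, the following toy Python model runs every stage in a single process. It is only an illustration: `run_job`, `wc_mapper`, and `wc_reducer` are hypothetical names, and `zlib.crc32` stands in for Hadoop's key hashing. Real jobs run each stage as separate tasks on different machines.

```python
import zlib
from collections import defaultdict
from itertools import groupby
from operator import itemgetter


def run_job(chunks, mapper, reducer, combiner=None, num_reducers=2):
    """Toy, single-process model of the MapReduce data flow (not Hadoop itself)."""
    partitions = defaultdict(list)  # partition index -> list of (key, value)
    for chunk in chunks:
        # Map: each chunk is processed independently, like one map task.
        pairs = [kv for record in chunk for kv in mapper(record)]
        # Sort this map task's output by key.
        pairs.sort(key=itemgetter(0))
        # Combine (optional): local aggregation of this map task's output only.
        if combiner is not None:
            pairs = [kv for key, group in groupby(pairs, key=itemgetter(0))
                     for kv in combiner(key, [v for _, v in group])]
        # Partition: a stable hash of the key picks the reducer.
        for key, value in pairs:
            p = zlib.crc32(key.encode("utf-8")) % num_reducers
            partitions[p].append((key, value))
    output = {}
    for p in range(num_reducers):
        # Sort each reducer's input, then call reduce once per distinct key.
        data = sorted(partitions[p], key=itemgetter(0))
        for key, group in groupby(data, key=itemgetter(0)):
            for out_key, out_value in reducer(key, [v for _, v in group]):
                output[out_key] = out_value
    return output


def wc_mapper(line):
    for word in line.split():
        yield word, 1


def wc_reducer(word, counts):
    yield word, sum(counts)


chunks = [["the cat sat", "the dog"], ["the end"]]
print(run_job(chunks, wc_mapper, wc_reducer, combiner=wc_reducer))
```

Passing `combiner=None` gives the same final counts; the combiner only shrinks what each map task hands to the shuffle.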

Hadoop’s MapReduce In General

Hadoop MapReduce is a framework to write applications that process enormous amounts of data (multi-terabyte) in-parallel on large clusters (thousands of nodes) of commodity hardware in a reliable, fault-tolerant manner.

A typical MapReduce job:

  • splits the input data set into independent chunks,
  • processes each chunk in parallel with the map tasks,
  • sorts the outputs of the maps, and
  • feeds the sorted output to the reduce tasks as their input.

In general, both the input and the output of the job are stored in a file-system.

The Hadoop MapReduce framework takes care of scheduling tasks, monitoring them, and re-executing failed tasks.

Generally, Hadoop's MapReduce framework and the Hadoop Distributed File System (HDFS) run on the same nodes, which means that each node is used for both computing and storage. The benefit of such a configuration is that tasks can be scheduled on the nodes where the data resides, which results in high aggregate bandwidth across the cluster.

The MapReduce framework consists of:

  • a single master ResourceManager (Hadoop YARN),
  • one worker NodeManager per cluster-node, and
  • one MRAppMaster per application

The resource manager keeps track of compute resources, assigns them to specific tasks, and schedules jobs across the cluster.

In order to configure a MapReduce job, at minimum, an application specifies:

  • input source and output destination
  • map and reduce functions

A job and its configuration are then submitted by Hadoop's job client to YARN, which distributes them across the cluster, schedules the tasks, monitors them, and reports their status back to the job client.

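Although the Hadoop framework is implemented in Java, MapReduce applications need not be written in Java. Hadoop Streaming lets any executable that reads from stdin and writes to stdout act as the mapper or reducer, and the mrjob Python package builds on it so that we can write MapReduce jobs in Python and run them on Hadoop or on AWS (Amazon EMR).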

Inputs And Outputs Of MapReduce Jobs

For both input and output, the data is stored as key-value pairs. Each key and value class has to be serializable by the MapReduce framework and thus should implement the Writable interface. In addition, the key class needs to implement the WritableComparable interface, which is required by the sort mechanism.

The Mapper

A Mapper is a task that maps input key/value pairs to a set of output (intermediate) key/value pairs, which are then used by the later steps. The output records do not need to be of the same type as the input records, and an input pair may be mapped to zero or more output pairs.
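A mapper can be pictured as a function from one input pair to zero or more output pairs. In this hypothetical `tag_mapper`, the input is a (byte offset, line) pair, mirroring what Hadoop's default TextInputFormat hands to a mapper, while the output is (tag, 1) pairs of different types; a blank line produces no output at all.

```python
def tag_mapper(offset, line):
    """Hypothetical mapper: (int byte offset, str line) -> zero or more (str tag, int 1) pairs."""
    # The offset key is ignored here; only the line's content matters.
    for tag in line.split(","):
        tag = tag.strip().lower()
        if tag:  # blank entries produce no output pair
            yield tag, 1


print(list(tag_mapper(0, "Hadoop, YARN, hdfs")))  # [('hadoop', 1), ('yarn', 1), ('hdfs', 1)]
print(list(tag_mapper(42, "   ")))                # []
```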

All values associated with a given output key are subsequently grouped by the framework and passed to the Reducer(s) to determine the final output. The Mapper outputs are sorted and then partitioned per Reducer. The total number of partitions is the same as the number of reduce tasks for the job.

Shuffle & Sort Phases

The output of each individual mapper is sorted by the framework.

Before the data is fed to the reducers, the output of all mappers is partitioned by key. Each partition can contain data for one or more keys, and all values for a given key end up in the same partition. The data in each partition is sorted by key, and the partitions are then distributed to the reducers. Since the number of partitions equals the number of reduce tasks, each reducer receives exactly one partition.

The Reducer

A Reducer reduces a set of intermediate values (the output of the shuffle and sort phase) which share a key to a smaller set of values.

In the reducer phase, the reduce method is called for each <key, (list of values)> pair in the grouped inputs. Note that the output of the Reducer is not sorted.

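The right number of reducers is generally 0.95 or 1.75 multiplied by (<no. of nodes> * <no. of maximum containers per node>). With 0.95, all reducers can launch immediately and start fetching map outputs as the maps finish. With 1.75, the faster nodes finish a first round of reducers and then launch a second wave, which gives better load balancing.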
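As a worked example of this rule of thumb, a hypothetical cluster of 10 nodes with at most 8 containers each has 80 container slots, giving 0.95 × 80 = 76 reducers for a single wave or 1.75 × 80 = 140 for two waves. The helper below (a sketch; the name is made up) does the same arithmetic with integers to avoid floating-point rounding:

```python
def suggested_reducers(nodes, max_containers_per_node):
    """Return (one-wave, two-wave) reducer counts from the 0.95 / 1.75 rule of thumb."""
    slots = nodes * max_containers_per_node
    one_wave = slots * 95 // 100    # 0.95 x slots, rounded down
    two_waves = slots * 175 // 100  # 1.75 x slots, rounded down
    return one_wave, two_waves


print(suggested_reducers(10, 8))  # (76, 140)
```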

The Combiner

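We can optionally specify a Combiner (also known as a local reducer) to perform local aggregation of the intermediate outputs, which helps to cut down the amount of data transferred from the Mapper to the Reducer. In many cases, the same reducer code can be used as a combiner as well, provided the operation is commutative and associative (as summing is) and the combiner emits the same key/value types as the mapper. The framework may run a combiner zero, one, or several times, so the job's result must not depend on it running.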
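Why the operation must be commutative and associative can be seen with a few numbers. The values below are hypothetical outputs for one key from two map tasks: pre-summing per mapper is harmless, but pre-averaging is not, and a common workaround is to let the combiner emit partial (sum, count) pairs.

```python
def avg(xs):
    return sum(xs) / len(xs)


map_outputs = [[3, 5, 7], [10]]  # values for one key from two map tasks (hypothetical)
all_values = [v for vals in map_outputs for v in vals]

# Sum is commutative and associative, so pre-summing per mapper is safe.
assert sum(sum(vals) for vals in map_outputs) == sum(all_values) == 25

# Averaging the per-mapper averages gives the wrong answer.
print(avg([avg(vals) for vals in map_outputs]))  # 7.5
print(avg(all_values))                           # 6.25

# Fix: the combiner emits partial (sum, count) pairs; only the reducer divides.
partials = [(sum(vals), len(vals)) for vals in map_outputs]  # [(15, 3), (10, 1)]
total, count = map(sum, zip(*partials))
print(total / count)                             # 6.25
```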

Let’s look at a simple example

Let's count word frequencies. Consider the following mapper.py file:
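A minimal sketch of such a mapper.py, assuming Hadoop Streaming conventions (input lines arrive on stdin and each output line is `key<TAB>value`) and that words are simply whitespace-separated. It keeps a dictionary of counts for its whole input chunk, which is the in-mapper aggregation discussed below:

```python
#!/usr/bin/env python3
"""mapper.py: count words in this map task's input chunk (Hadoop Streaming style)."""
import sys
from collections import Counter


def count_words(lines):
    """Aggregate counts for the whole chunk in memory (one dict entry per unique word)."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return counts


def main():
    # Emit tab-separated "word<TAB>count" lines: the key is the text before the first tab.
    for word, count in count_words(sys.stdin).items():
        print(f"{word}\t{count}")


if __name__ == "__main__":
    main()
```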

and the following reducer.py file:
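A matching reducer.py sketch, under the same streaming assumptions. It relies on the framework (or `sort`, when testing locally) having placed all lines for a word next to each other, so it only needs to remember the current word and its running total:

```python
#!/usr/bin/env python3
"""reducer.py: sum counts per word from key-sorted "word<TAB>count" lines."""
import sys


def reduce_sorted(lines):
    """Yield (word, total); assumes all lines for a word are adjacent (sorted input)."""
    current_word, total = None, 0
    for line in lines:
        line = line.rstrip("\n")
        if not line:
            continue  # ignore blank lines
        word, _, count = line.partition("\t")
        if word != current_word:
            if current_word is not None:
                yield current_word, total  # previous word is complete
            current_word, total = word, 0
        total += int(count)
    if current_word is not None:
        yield current_word, total  # flush the last word


def main():
    for word, total in reduce_sorted(sys.stdin):
        print(f"{word}\t{total}")


if __name__ == "__main__":
    main()
```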

We can locally test them as:
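Locally, `sort` plays the role of the shuffle-and-sort phase, so the usual check is a shell pipeline along the lines of `cat input.txt | python3 mapper.py | sort | python3 reducer.py` (`input.txt` being any sample file). The same check can be scripted from Python; `run_local` below is a hypothetical helper that assumes both scripts are in the current directory:

```python
import subprocess
import sys


def run_local(text, mapper="mapper.py", reducer="reducer.py"):
    """Emulate `mapper | sort | reducer` on one machine and return the reducer's stdout."""
    mapped = subprocess.run([sys.executable, mapper], input=text,
                            capture_output=True, text=True, check=True).stdout
    # Sorting whole lines stands in for shuffle & sort; Python orders by code point,
    # which matches `LC_ALL=C sort` rather than a locale-aware sort.
    shuffled = "".join(line + "\n" for line in sorted(mapped.splitlines()))
    return subprocess.run([sys.executable, reducer], input=shuffled,
                          capture_output=True, text=True, check=True).stdout
```

Calling `run_local` with the contents of a small sample file returns the reducer's output as a single string.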

But Wait! The previous implementation has a problem…

The previous mapper code is memory intensive, as it has to maintain a dictionary with the frequency of every unique word present in the chunk of input given to each individual mapper. If even a per-chunk dictionary is a memory concern, a better approach is to let the mapper print each word as it encounters it (with a frequency of 1). For example:
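A sketch of that streaming variant of mapper.py, under the same stdin/stdout assumptions. It holds only the current line in memory and leaves all counting to the reducer, which stays unchanged:

```python
#!/usr/bin/env python3
"""mapper.py (streaming variant): emit "word<TAB>1" for every word, with no per-chunk dict."""
import sys


def emit_pairs(lines):
    """Yield one "word<TAB>1" line per word; only the current line is held in memory."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


if __name__ == "__main__":
    for pair in emit_pairs(sys.stdin):
        print(pair)
```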

Now, let's use the new mapper with the same input data:

If it helps, we can also use a combiner (same as reducer code) for local aggregation from each mapper output.
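In Hadoop Streaming, a combiner command is supplied with the `-combiner` option. Its effect on one map task's output can be sketched locally; the input sentence is made up, and the hypothetical `combine` simply reuses the reducer's summing logic on that one mapper's sorted output:

```python
from itertools import groupby


def combine(sorted_pairs):
    """Reducer logic applied locally: sum counts of adjacent equal words."""
    for word, group in groupby(sorted_pairs, key=lambda kv: kv[0]):
        yield word, sum(count for _, count in group)


# Output of one map task running the "emit word, 1" mapper.
map_output = [(w, 1) for w in "to be or not to be".split()]
combined = list(combine(sorted(map_output)))
print(len(map_output), "->", len(combined))  # 6 -> 4
print(combined)  # [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```

Fewer records leave the map task, while the reducer still produces the same totals.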

Details on Partitioner And Comparator

A Partitioner partitions the data; essentially, it partitions the “key space”.

The Partitioner controls the partitioning of the keys of the intermediate map outputs. The key (or a subset of the key) is used to derive the partition, typically by a hash function. The total number of partitions is the same as the number of reduce tasks for the job. Hence, the Partitioner controls which of the m reduce tasks (m being the number of reducers) the intermediate key, and hence the record, is sent to for reduction.

HashPartitioner is the default Partitioner. You would need a different partitioner (or a custom one) if records have to be routed by only part of a composite key, for example by its first fields rather than the whole key.

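Hadoop has a library class, KeyFieldBasedPartitioner, which lets the MapReduce framework partition the map outputs based on certain key fields rather than the whole key. For example, in a streaming job, -D mapreduce.partition.keypartitioner.options=-k1,2 together with -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner partitions on the first two fields of the key.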
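The difference between whole-key and key-field partitioning can be sketched in Python. HashPartitioner computes `(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks`; here `zlib.crc32` stands in for Java's `hashCode()` (Python's built-in `hash()` of a string is randomized per process), so the partition numbers will not match Hadoop's. The sample keys and function names are hypothetical; the point is that keys sharing their first two fields always land in the same partition under `-k1,2`:

```python
import zlib


def stable_hash(text):
    # Stand-in for Java's key.hashCode(); repeatable across Python processes.
    return zlib.crc32(text.encode("utf-8"))


def hash_partition(key, num_reducers):
    """Whole-key partitioning in the spirit of HashPartitioner."""
    return (stable_hash(key) & 0x7FFFFFFF) % num_reducers


def key_field_partition(key, num_reducers, first=1, last=2, sep="\t"):
    """Partition on key fields first..last only (1-based), like -k1,2."""
    fields = key.split(sep)[first - 1:last]
    return hash_partition(sep.join(fields), num_reducers)


keys = ["alice\t2024-01-01\tbook", "alice\t2024-01-01\tpen", "bob\t2024-01-02\tbook"]
for k in keys:
    # The first two keys may differ under whole-key hashing but always match under -k1,2.
    print(repr(k), hash_partition(k, 4), key_field_partition(k, 4))
```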

We can control the grouping (and the sort order) of keys by specifying a Comparator.

Hadoop has a library class, KeyFieldBasedComparator, that provides a subset of the features of the Unix/GNU sort command. For example, -D mapreduce.partition.keycomparator.options=-k2,2nr sorts on the second key field only, numerically (n), in reverse order (r).
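The effect of `-k2,2nr` resembles this Python sort, assuming a tab-separated key whose second field is numeric (the sample keys and the function name are made up). In the Hadoop Streaming documentation the option is paired with `-D mapreduce.job.output.key.comparator.class=org.apache.hadoop.mapreduce.lib.partition.KeyFieldBasedComparator`, which tells the job to sort with this comparator:

```python
def field2_numeric_desc(keys, sep="\t"):
    """Order keys by field 2 only, numerically (n), in reverse (r), like -k2,2nr."""
    return sorted(keys, key=lambda k: float(k.split(sep)[1]), reverse=True)


keys = ["apple\t3", "pear\t12", "fig\t7"]
print(field2_numeric_desc(keys))
# ['pear\t12', 'fig\t7', 'apple\t3']

# Without "n", the fields compare as text, so "12" sorts below "7" and "3":
print(sorted(keys, key=lambda k: k.split("\t")[1], reverse=True))
# ['fig\t7', 'apple\t3', 'pear\t12']
```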

The DistributedCache

The DistributedCache, provided by the MapReduce framework, efficiently distributes large, read-only, application-specific files (such as lookup tables, archives, or jars) to the nodes that run the job's tasks.
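A common use is a map-side lookup against a small reference table. In a streaming job such a file can be shipped with the generic `-files` option, after which each task finds it in its working directory. The sketch below assumes a hypothetical tab-separated `countries.tsv` of `code<TAB>name` lines and input records of the form `user<TAB>country_code`; a mapper would call `load_lookup` once at start-up and then stream its input through `enrich`:

```python
LOOKUP_PATH = "countries.tsv"  # hypothetical file name, e.g. shipped with: -files countries.tsv


def load_lookup(path=LOOKUP_PATH):
    """Read 'code<TAB>name' lines into a dict; call once per task, not once per record."""
    with open(path, encoding="utf-8") as f:
        return dict(line.rstrip("\n").split("\t", 1) for line in f if line.strip())


def enrich(lines, lookup):
    """Turn 'user<TAB>code' records into 'user<TAB>country name'; unknown codes pass through."""
    for line in lines:
        user, code = line.rstrip("\n").split("\t")[:2]
        yield f"{user}\t{lookup.get(code, code)}"
```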

The Configuration Parameters

We can get job configuration options through environment variables. When we launch a MapReduce application, the framework assigns data to the available workers, and our scripts can access information about that assignment. For instance, a mapper can find out which file and split it is working on. We can also find out whether a script is running as a mapper or a reducer, which is important if the same script is used in both the map and the reduce phase. The task ID and partition are available within the map or reduce phase through the environment variables mapreduce_task_id and mapreduce_task_partition.

The configured parameters are localized in the job configuration for each task's execution. During the execution of a streaming job, the names of the “mapred” parameters are transformed: dots (.) become underscores (_). For example, mapreduce.job.id becomes mapreduce_job_id. In your code, use the parameter names with underscores.
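A sketch of reading these values from a streaming script. The variable names follow the dots-to-underscores rule; `mapreduce_map_input_file` (from `mapreduce.map.input.file`) is only meaningful in map tasks, and telling map from reduce tasks by the `_m_`/`_r_` marker in the task ID is an assumption based on Hadoop's ID format. `task_info` is a hypothetical helper that takes an optional dict so it can be tried outside Hadoop:

```python
import os


def task_info(env=None):
    """Read a few job parameters that Hadoop Streaming exports as environment variables."""
    env = os.environ if env is None else env
    task_id = env.get("mapreduce_task_id", "")
    return {
        "job_id": env.get("mapreduce_job_id"),
        "task_id": task_id,
        "partition": env.get("mapreduce_task_partition"),
        # Assumption: task IDs look like task_<cluster>_<job>_m_<n> or ..._r_<n>.
        "is_map": "_m_" in task_id,
        # Only meaningful in a map task: the file this task's split comes from.
        "input_file": env.get("mapreduce_map_input_file"),
    }


print(task_info())  # outside Hadoop the values are None or empty
```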

Updating status in streaming applications:

A streaming process can use stderr to emit status information. To set a status, reporter:status:<message> should be sent to stderr.
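A small sketch of that protocol in Python; `status_line` and `with_progress` are hypothetical helper names, and the 100,000-line interval is arbitrary. Writing to stderr keeps status messages out of the task's real output on stdout:

```python
import sys


def status_line(message):
    """Format a status update in the form Hadoop Streaming looks for on stderr."""
    return f"reporter:status:{message}\n"


def with_progress(lines, every=100_000):
    """Yield lines unchanged, sending a status update to stderr every `every` lines."""
    for n, line in enumerate(lines, start=1):
        if n % every == 0:
            sys.stderr.write(status_line(f"processed {n} lines"))
            sys.stderr.flush()
        yield line
```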

Bundled Mappers, Reducers, and Partitioners

Hadoop MapReduce comes bundled with a library of generally useful mappers, reducers, and partitioners.
