O'Reilly – Hadoop: The Definitive Guide (06-2009)

A note up front: this is a long article covering what I learn from this book. Most of the information comes directly from the book; I claim no copyright over it.


Chapter 1: Meet Hadoop

Figure: RDBMS compared to MapReduce

Hadoop is a collection of related subprojects that fall under the umbrella of infrastructure for distributed computing.

  • Hadoop Common: The common utilities that support the other Hadoop subprojects.
  • Chukwa: A data collection system for managing large distributed systems.
  • HBase: A scalable, distributed database that supports structured data storage for large tables.
  • HDFS: A distributed file system that provides high throughput access to application data.
  • Hive: A data warehouse infrastructure that provides data summarization and ad hoc querying.
  • MapReduce: A software framework for distributed processing of large data sets on compute clusters.
  • Pig: A high-level data-flow language and execution framework for parallel computation.
  • ZooKeeper: A high-performance coordination service for distributed applications.
Figure: Hadoop subprojects

MapReduce is designed to run jobs that last minutes or hours on trusted, dedicated hardware running in a single data center with very high aggregate bandwidth interconnects.

Another related project: Mahout (scalable machine learning libraries)

Chapter 2: MapReduce

MapReduce works by breaking the processing into two phases: the map phase and the
reduce phase. Each phase has key-value pairs as input and output, the types of which
may be chosen by the programmer. The programmer also specifies two functions: the
map function and the reduce function.

Figure: MapReduce logical data flow

Rather than use built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package.
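For reference, the MaxTemperatureMapper and MaxTemperatureReducer referenced by the driver below could look roughly like this with the old (org.apache.hadoop.mapred) API. This is a minimal sketch: the record parsing is a simplified placeholder rather than the book's exact parsing logic, and each public class belongs in its own source file.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class MaxTemperatureMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Placeholder parsing: assume "YYYY temperature" per input line.
    String line = value.toString();
    String year = line.substring(0, 4);
    int airTemperature = Integer.parseInt(line.substring(5).trim());
    output.collect(new Text(year), new IntWritable(airTemperature));
  }
}

public class MaxTemperatureReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Emit the maximum temperature seen for this key (year).
    int maxValue = Integer.MIN_VALUE;
    while (values.hasNext()) {
      maxValue = Math.max(maxValue, values.next().get());
    }
    output.collect(key, new IntWritable(maxValue));
  }
}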

Finally, you need a main class that builds a JobConf object forming the specification of the job. When we run this job on a Hadoop cluster, we package the code into a JAR file (which Hadoop distributes around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the JobConf constructor, which Hadoop uses to locate the relevant JAR file by looking for the JAR file containing this class. (Note: you can install the Eclipse plug-in to test your code directly.)

import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class MaxTemperature {

  public static void main(String[] args) throws IOException {
    if (args.length != 2) {
      System.err.println("Usage: MaxTemperature <input path> <output path>");
      System.exit(-1);
    }

    // Passing a class lets Hadoop locate the JAR containing the job code.
    JobConf conf = new JobConf(MaxTemperature.class);
    conf.setJobName("Max temperature");

    // Input and output paths; the output directory must not already exist.
    FileInputFormat.addInputPath(conf, new Path(args[0]));
    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setReducerClass(MaxTemperatureReducer.class);

    // Output types for both the map and reduce functions.
    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // Submit the job and wait for it to finish.
    JobClient.runJob(conf);
  }
}

Having constructed a JobConf object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case, the input forms all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.

The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reducer functions are written. The directory shouldn't exist before running the job, as Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with another).

The setOutputKeyClass() and setOutputValueClass() methods control the output types for the map and the reduce functions, which are often the same, as they are in our case. If they are different, then the map output types can be set using the methods setMapOutputKeyClass() and setMapOutputValueClass().
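As a fragment of the driver above, declaring different map output types might look like this (the LongWritable map value type here is hypothetical, purely for illustration):

// Hypothetical case: the mapper emits Text/LongWritable while the reducer
// emits Text/IntWritable, so the map output types are declared separately.
conf.setMapOutputKeyClass(Text.class);
conf.setMapOutputValueClass(LongWritable.class);

// Final (reducer) output types.
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(IntWritable.class);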

Setting the HADOOP_CLASSPATH environment variable to include your application classes is useful when running the job in standalone (local) mode.

Release 0.20.0 of Hadoop included a new Java MapReduce API, sometimes referred to as “Context Objects,” designed to make the API easier to evolve in the future. The new API is type-incompatible with the old, however, so applications need to be rewritten to take advantage of it.
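As a rough sketch (not the book's code), a mapper written against the new API replaces OutputCollector and Reporter with a single Context parameter; the parsing below is a placeholder assumption:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class NewApiMaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    // Placeholder parsing: assume "YYYY temperature" per input line.
    String line = value.toString();
    String year = line.substring(0, 4);
    int airTemperature = Integer.parseInt(line.substring(5).trim());
    // Emit through the context object instead of an OutputCollector.
    context.write(new Text(year), new IntWritable(airTemperature));
  }
}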

A job may have zero, one, or many reduce tasks. When there are reducers, the data flow between the map and reduce tasks involves a shuffle.

Figure: MapReduce data flow with a single reduce task
Figure: MapReduce data flow with multiple reduce tasks
Figure: MapReduce data flow with no reduce tasks

Hadoop allows the user to specify a combiner function to be run on the map output; the combiner function's output forms the input to the reduce function. Since the combiner function is an optimization, Hadoop does not provide a guarantee of how many times it will call it for a particular map output record, if at all. In other words, calling the combiner function zero, one, or many times should produce the same output from the reducer.
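In the driver, the combiner is set on the JobConf much like the mapper and reducer. Since finding a maximum is both associative and commutative, the reducer class itself can serve as the combiner in the MaxTemperature example:

// The reduce function doubles as the combiner here because max() is
// both associative and commutative.
conf.setCombinerClass(MaxTemperatureReducer.class);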

Chapter 3: The Hadoop Distributed Filesystem

HDFS, which stands for Hadoop Distributed Filesystem, is a filesystem designed for storing very large files with streaming data access patterns (a write-once, read-many-times pattern), running on clusters of commodity hardware.

These are areas where HDFS is not a good fit today:

  • Low-latency data access: HBase is currently a better choice for low-latency access.
  • Lots of small files
  • Multiple writers, arbitrary file modifications

An HDFS cluster has two types of node operating in a master-worker pattern: a namenode (the master) and a number of datanodes (the workers). A client accesses the filesystem on behalf of the user by communicating with the namenode and the datanodes.

It is important to make the namenode resilient to failure, and Hadoop provides two mechanisms for this:

  • back up the files that make up the persistent state of the filesystem metadata
  • run a secondary namenode
Figure: A client reading data from HDFS
Figure: A client writing data to HDFS
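To make the client-side view concrete, here is a minimal sketch using the FileSystem API to read a file from HDFS and copy it to standard output (the example URI is an assumption for illustration):

import java.io.InputStream;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HdfsCat {
  public static void main(String[] args) throws Exception {
    // e.g. hdfs://namenode/user/someone/input.txt (hypothetical path)
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null;
    try {
      // The namenode is consulted for block locations; the data itself
      // streams from the datanodes.
      in = fs.open(new Path(uri));
      IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
      IOUtils.closeStream(in);
    }
  }
}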

Chapter 4: Hadoop I/O

Hadoop provides checksums for data integrity.

File compression brings two major benefits: it reduces the space needed to store files, and it speeds up data transfer across the network, or to or from disk. For performance, it is preferable to use a native library (in the lib/native directory) for compression and decompression.
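As a small illustration (a sketch, not a tuned implementation), a codec obtained by reflection can wrap any output stream; here it compresses standard input to standard output:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

public class StreamCompressor {
  public static void main(String[] args) throws Exception {
    // Codec class name passed on the command line,
    // e.g. org.apache.hadoop.io.compress.GzipCodec
    Class<?> codecClass = Class.forName(args[0]);
    Configuration conf = new Configuration();
    CompressionCodec codec =
        (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
    CompressionOutputStream out = codec.createOutputStream(System.out);
    IOUtils.copyBytes(System.in, out, 4096, false);
    out.finish(); // flush compressed data without closing System.out
  }
}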

Serialization is the process of turning structured objects into a byte stream for transmission over a network or for writing to persistent storage. Deserialization is the process of turning a byte stream back into a series of structured objects. Hadoop uses its own serialization format, Writables, which is certainly compact and fast (but not so easy to extend, or use from languages other than Java).
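A quick sketch of what serialization with a Writable looks like in practice: an IntWritable writes itself to any DataOutput, producing (in this case) four bytes.

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;

public class WritableSerializationDemo {
  // Serialize a Writable to a byte array by hand.
  public static byte[] serialize(IntWritable writable) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStream dataOut = new DataOutputStream(out);
    writable.write(dataOut); // Writable.write(DataOutput)
    dataOut.close();
    return out.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] bytes = serialize(new IntWritable(163));
    System.out.println(bytes.length); // prints 4
  }
}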

Figure: Writable class hierarchy

Chapter 5: Developing a MapReduce Application

Chapter 6: How MapReduce Works

Figure: How Hadoop runs a MapReduce job

At the highest level, four independent entities are involved in running a job:
  • The client, which submits the MapReduce job.
  • The jobtracker, which coordinates the job run. The jobtracker is a Java application whose main class is JobTracker.
  • The tasktrackers, which run the tasks that the job has been split into. Tasktrackers are Java applications whose main class is TaskTracker.
  • The distributed filesystem (normally HDFS, covered in Chapter 3), which is used for sharing job files between the other entities.

The process by which the system performs the sort—and transfers the map outputs to the reducers as inputs—is known as the shuffle. In many ways, the shuffle is the heart of MapReduce, and is where the “magic” happens.

Figure: Shuffle and sort in MapReduce

Chapter 7: MapReduce Types and Formats

The map and reduce functions in Hadoop MapReduce have the following general form:
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

In general, the map input key and value types (K1 and V1) are different from the map output types (K2 and V2). However, the reduce input must have the same types as the map output, although the reduce output types may be different again (K3 and V3).

If a combine function is used, then it has the same form as the reduce function (and is an implementation of Reducer), except that its output types are the intermediate key and value types (K2 and V2), so they can feed the reduce function:
map: (K1, V1) → list(K2, V2)
combine: (K2, list(V2)) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

Often the combine and reduce functions are the same, in which case, K3 is the same as
K2, and V3 is the same as V2.

The partition function operates on the intermediate key and value types (K2 and V2),
and returns the partition index. In practice, the partition is determined solely by the
key (the value is ignored):
partition: (K2, V2) → integer
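A custom partitioner for the old API implements Partitioner<K2, V2>. This hash-based sketch (with assumed Text/IntWritable types) ignores the value, as described above, and would be registered with conf.setPartitionerClass(TextHashPartitioner.class):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class TextHashPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // No configuration needed for this partitioner.
  }

  public int getPartition(Text key, IntWritable value, int numPartitions) {
    // Only the key determines the partition; the value is ignored.
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}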

Figure: Configuration of MapReduce types
Figure: InputFormat class hierarchy

DBInputFormat is an input format for reading data from a relational database, using
JDBC. Because it doesn’t have any sharding capabilities, you need to be careful not to
overwhelm the database you are reading from by running too many mappers. For this
reason, it is best used for loading relatively small datasets, perhaps for joining with
larger datasets from HDFS, using MultipleInputs. The corresponding output format is
DBOutputFormat, which is useful for dumping job outputs (of modest size) into a
database.

Instructions for how to use these formats are provided in “Database Access with Hadoop,” http://www.cloudera.com/blog/2009/03/06/database-access-with-hadoop/, by Aaron Kimball.

Figure: OutputFormat class hierarchy

Chapter 8: MapReduce Features

Counters are a useful channel for gathering statistics about the job. Counters are global: the MapReduce framework aggregates them across all maps and reduces to produce a grand total at the end of the job.
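With the old API, a user-defined counter is typically an enum incremented through the Reporter. A rough sketch follows; the RecordCounters enum, the validity check, and the parsing are assumptions for illustration only:

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class CountingMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, IntWritable> {

  // Hypothetical counter group for illustration.
  enum RecordCounters { MALFORMED }

  public void map(LongWritable key, Text value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    String line = value.toString();
    if (line.length() < 6) {
      // Count the bad record instead of failing the task.
      reporter.incrCounter(RecordCounters.MALFORMED, 1);
      return;
    }
    // Placeholder parsing: assume "YYYY temperature" per input line.
    output.collect(new Text(line.substring(0, 4)),
        new IntWritable(Integer.parseInt(line.substring(5).trim())));
  }
}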

The ability to sort data is at the heart of MapReduce. Even if your application isn’t concerned with sorting per se, it may be able to use the sorting stage that MapReduce provides to organize its data.

MapReduce can perform joins between large datasets, but writing the code to do joins from scratch is fairly involved. Rather than writing MapReduce programs, you might consider using a higher-level framework such as Pig, Hive, or Cascading, in which join operations are a core part of the implementation.

Side data can be defined as extra read-only data needed by a job to process the main dataset. The challenge is to make side data available to all the map or reduce tasks (which are spread across the cluster) in a convenient and efficient fashion.
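One common mechanism for side data is the distributed cache: the driver registers a file with DistributedCache.addCacheFile(), and each task reads its local copy in configure(). The sketch below assumes a hypothetical lookup file with tab-separated id/name pairs; the path, format, and join logic are illustrations, not the book's code.

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LookupMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {

  private Map<String, String> lookup = new HashMap<String, String>();

  // Driver side (hypothetical path):
  //   DistributedCache.addCacheFile(new URI("/side/lookup.txt"), conf);

  public void configure(JobConf conf) {
    try {
      // Files registered in the driver are available on the local disk
      // of every tasktracker that runs this job's tasks.
      Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
      BufferedReader reader =
          new BufferedReader(new FileReader(localFiles[0].toString()));
      String line;
      while ((line = reader.readLine()) != null) {
        String[] parts = line.split("\t"); // assumed tab-separated id/name
        lookup.put(parts[0], parts[1]);
      }
      reader.close();
    } catch (IOException e) {
      throw new RuntimeException("Failed to read side data", e);
    }
  }

  public void map(LongWritable key, Text value,
      OutputCollector<Text, Text> output, Reporter reporter)
      throws IOException {
    // Join each record against the in-memory lookup table.
    String id = value.toString().split("\t")[0];
    String name = lookup.get(id);
    if (name != null) {
      output.collect(new Text(id), new Text(name));
    }
  }
}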

Hadoop comes with a library of mappers and reducers for commonly used functions.

Figure: MapReduce library classes

Chapter 9: Setting Up a Hadoop Cluster
