Monday, August 5, 2013
Reservoir Sampling in MapReduce
By had00b
12:36 PM
hadoop,
hadoop streaming,
mapreduce,
python,
random subset,
reservoir sampling,
sampling
Saturday, August 3, 2013
Setup Apache Hadoop on your machine (single-node cluster)
Let's get your machine ready for some big data crunching! Installing Apache Hadoop on a single machine is very simple. Of course, the purpose of installing Hadoop on your machine is mainly for learning, developing and debugging. For production, you will want to deploy Hadoop in fully distributed mode on a cluster of machines. The fully distributed mode is not in the scope of this post.
Thursday, August 1, 2013
MapReduce: a gentle introduction with examples
By had00b
11:49 AM
distributed file system,
hadoop,
hdfs,
mapreduce,
matrix transpose,
word count
A brief history of MapReduce and Hadoop.
MapReduce is a programming framework for distributed processing originally developed at Google in 2004. The original paper by Jeffrey Dean and Sanjay Ghemawat describes the programming model and underlying system. MapReduce was developed because engineers at Google found themselves repeatedly solving the same problems (such as building inverted indices, representing graph structures, or finding the most frequent queries) with ad hoc distributed computations running on several machines. MapReduce was designed as a framework to express such computations while automatically taking care of parallelization, fault tolerance, data distribution and load balancing.
In recent years, the MapReduce framework has gained wide adoption and has become the de facto standard for large-scale data analysis. A major factor in this quick adoption was the development of Hadoop, an open-source implementation of the MapReduce framework (the MapReduce implementation at Google was, and still is, a proprietary technology). Hadoop was created in 2004-05 by Doug Cutting, who saw the potential of the MapReduce framework to aid the development of Apache Nutch, an open-source web search engine he and his team had started building in 2002. In 2006, Doug Cutting was hired by Yahoo! and Hadoop was moved out of Nutch to form an independent project. Since 2008, many companies have adopted Hadoop and several Hadoop distributions have appeared (Cloudera, MapR, Hortonworks and EMC Pivotal HD, to name a few). Amazon's EC2 compute cloud and similar services let users rent a cluster and run MapReduce computations.
MapReduce as we know it today is composed of the MapReduce programming framework along with a distributed file system. HDFS (the Hadoop Distributed File System) is the Hadoop implementation of such a file system. Just as Hadoop provided an open-source implementation of MapReduce, HDFS (initially known as NDFS, the Nutch Distributed File System) provided an open-source implementation of GFS, the distributed file system used in production at Google, whose architecture was described in the 2003 paper The Google File System. Other distributed file systems compatible with MapReduce exist, such as Quantcast FS (QFS, previously CloudStore and KosmosFS) and MapR-FS.
MapReduce concepts
We now briefly discuss concepts that are at the core of MapReduce: the underlying hardware (i.e. the cluster of machines), the distributed file system, and the actual MapReduce programming framework.
Physical organization of machines
Our usual view of computation has been that of a single (possibly multi-processor) machine. The recent explosion in the amount of data available to analyze calls for a lot more computational power. Cluster computing is a horizontal solution to this need: more computational capacity is obtained by networking a large number of low-end commodity servers (as opposed to the vertical solution of purchasing more expensive, special-purpose machines with many processors). In a typical configuration, the machines (usually called nodes) of a cluster are mounted on racks, each rack holding 8-64 machines. The nodes on a rack are usually networked via gigabit Ethernet and, for large clusters with multiple racks, the racks are connected by a switch or some other level of network.
Failures of a single node are quite common in clusters; in large clusters, even the loss of an entire rack can occur. Since the goal of a cluster is to run intensive computations that read data from several machines, the failure of a node must not force the whole computation to abort and restart. To guarantee this, it must be the case that:
- files are stored redundantly -- this justifies the need of a distributed file system
- computation is split into parallel, independent tasks (if a task dies, we only want to re-execute that task) -- the MapReduce programming model and framework take care of exactly this aspect
The Distributed File System
Distributed file systems manage the storage across a network (cluster) of machines. The network aspect creates several complications: tolerating node failures without data loss, as mentioned above, calls for replicating data across different nodes. Data redundancy in turn creates synchronization problems when we want to update a file.
Distributed file systems such as HDFS and GFS are designed for a scenario where files are huge (on the order of gigabytes and terabytes) and pretty much static. The data is read many times, and additional data is possibly written into new large files or appended to existing files. In this scenario we don't usually expect random access to a file; rather, we expect reads of entire files.
The distributed file system stores the files divided into large chunks, which are typically 64 or 128 megabytes, to amortize seek time with large reads. Chunks are replicated, typically on three different machines, with two of them belonging to different racks in the case of large clusters. In order to find the chunks of a file, the distributed file system adopts a master–slave architecture: the master (called namenode in HDFS) maintains a namespace file containing the filesystem tree and the metadata (the locations of the chunks) of each file. This information is replicated as well, typically to a remote NFS mount. The slaves (called datanodes in HDFS) are distributed among the machines and manage the actual data chunks.
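As a rough, back-of-the-envelope illustration of the chunking and replication figures above, the sketch below computes how many chunks a file is split into and how much raw storage its replicas occupy. The helper name `chunk_stats`, the 128 MB chunk size, the replication factor of 3 and the 10 GiB example file are assumptions chosen for illustration, not values read from any real cluster. The sketch also assumes that a partially filled last chunk only occupies its actual size.

```python
def chunk_stats(file_size_bytes, chunk_size_bytes=128 * 1024**2, replication=3):
    """Return (number of chunks, raw bytes stored across the cluster).

    Hypothetical helper: the defaults mirror the typical figures quoted
    above, not the configuration of any particular deployment.
    """
    # Ceiling division: a partially filled last chunk still counts as a chunk.
    num_chunks = -(-file_size_bytes // chunk_size_bytes)
    # Assumption: a partial chunk only occupies its actual size, so raw
    # usage is simply the file size times the replication factor.
    raw_bytes = file_size_bytes * replication
    return num_chunks, raw_bytes


chunks, raw = chunk_stats(10 * 1024**3)  # a 10 GiB file
print(chunks, "chunks,", raw // 1024**3, "GiB of raw storage")
# prints: 80 chunks, 30 GiB of raw storage
```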
The MapReduce programming model
In the MapReduce programming paradigm, the basic unit of information is a (key, value) pair. The computation on an input (i.e. on a set of pairs) occurs in three stages: the map stage, the shuffle stage and the reduce stage. Semantically, the map and shuffle phases distribute the data, and the reduce phase performs the computation.
Map
In the map stage, the mapper takes as input a single (key, value) pair and produces as output any number of (key, value) pairs. It is important to think of the map operation as stateless, that is, its logic operates on a single pair at a time (even if in practice several input pairs are delivered to the same mapper). Combining several input pairs to compute the output pairs is discouraged because the input and the computation are distributed, so we don't know a priori which and how many input pairs a mapper receives. To summarize, for the map phase, the user simply designs a map function that maps an input (key, value) pair to any number (even none) of output pairs. Most of the time, the map phase is simply used to specify the desired location of the input value by changing its key.
Shuffle
The shuffle stage is automatically handled by the MapReduce framework, i.e. the engineer has nothing to do for this stage. The underlying system implementing MapReduce routes all of the values that are associated with an individual key to the same reducer.
Reduce
In the reduce stage, the reducer takes all of the values associated with a single key k and outputs any number of (key, value) pairs. This highlights one of the sequential aspects of MapReduce computation: all of the maps need to finish before the reduce stage can begin. Since the reducer has access to all the values with the same key, it can perform sequential computations on these values. In the reduce step, parallelism is exploited by observing that reducers operating on different keys can be executed simultaneously. To summarize, for the reduce phase, the user designs a function that takes as input a list of values associated with a single key and outputs any number of pairs. Often the output keys of a reducer equal the input key (in fact, in the original MapReduce paper the output key must equal the input key, but Hadoop relaxed this constraint).
Overall, a program in the MapReduce paradigm can consist of many rounds (usually called jobs) of different map and reduce functions, performed sequentially one after another.
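To make the three stages concrete, here is a minimal single-process sketch of one MapReduce round in Python. It is a toy simulation, not how Hadoop is implemented: the names `run_job`, `max_mapper` and `max_reducer`, as well as the made-up temperature readings, are assumptions introduced purely for illustration.

```python
from collections import defaultdict


def run_job(inputs, mapper, reducer):
    """Run one map-shuffle-reduce round over (key, value) input pairs."""
    # Map stage: each input pair is handled on its own (stateless mapper).
    intermediate = []
    for key, value in inputs:
        intermediate.extend(mapper(key, value))

    # Shuffle stage: group together all values that share a key.
    grouped = defaultdict(list)
    for key, value in intermediate:
        grouped[key].append(value)

    # Reduce stage: one reducer call per key. The calls are independent
    # of each other, which is what a real cluster would run in parallel.
    output = []
    for key, values in grouped.items():
        output.extend(reducer(key, values))
    return output


def max_mapper(_key, line):
    # The input value looks like "city temperature"; re-key it by city.
    city, temperature = line.split()
    return [(city, int(temperature))]


def max_reducer(city, temperatures):
    # All readings for one city arrive together, so max() sees them all.
    return [(city, max(temperatures))]


# Invented sample input; the input keys are irrelevant here.
readings = [(None, "rome 31"), (None, "oslo 12"), (None, "rome 35"), (None, "oslo 17")]
result = dict(run_job(readings, max_mapper, max_reducer))
print(result)  # {'rome': 35, 'oslo': 17}
```

A multi-round program would simply feed the output pairs of one `run_job` call as the input of the next.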
Examples
We will go through a series of simple examples to help understand how to solve problems in MapReduce. It is helpful to think of the input data as lying unordered in a cloud, with the output of the reduce being stored, again unordered, back in that cloud. Also, it is useful to think of the map phase as a way to distribute the data (where to "map" the input), and of the reduce phase as a way to perform computation on (its portion of) the data (how to "reduce" the data). There are cases in which it is convenient to depart from these guidelines, but in general they provide a good starting point.
In the following examples, the key of the map input pairs is irrelevant and the value represents the actual input. Think of the input as being stored in one or multiple files distributed across the machines in the cluster, with each line of these files representing (the value of) an input pair. Each input pair is read by a mapper running on some machine of the cluster (usually, on a machine where that line physically resides).
Word count
Word count is the prototypical example of MapReduce: it's the "Hello World" of MapReduce. Imagine you have a bunch of text files in your cluster and would like to count the overall number of occurrences of each word. We assume each input value is a line of one of the files.
The standard solution is the following: the map tokenizes each input value (i.e. each line) into words, outputting a pair (word, 1) for each word; the reducer associated with a word receives all the occurrences of that word, counts them and returns the total.
- Map: For each input value v: for each word w obtained by tokenizing v, output the key-value pair (w, 1)
- Shuffle: For each key (word) w produced by any of the Map tasks, there will be one or more key-value pairs (w,1). Then, for key w, the shuffle will produce (w, [1, 1,..., 1]) and present this as input to a reducer.
- Reduce: the reducer associated with key (word) w turns (w, [1, 1, ..., 1]) into (w, n), where n is the sum of the values. Note that the reduce phase produces exactly one output pair for each key w.
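The three steps above can be sketched in Python in the style of Hadoop Streaming, where mappers and reducers exchange tab-separated `key<TAB>value` lines and the reducer sees its input sorted by key. This is only a sketch under assumptions: the function names `map_lines` and `reduce_lines` and the sample sentences are invented for illustration, and an in-process `sorted` call stands in for the shuffle that the framework would perform.

```python
from itertools import groupby


def map_lines(lines):
    """Map: emit 'word<TAB>1' for every word of every input line."""
    for line in lines:
        for word in line.split():
            yield f"{word}\t1"


def reduce_lines(sorted_lines):
    """Reduce: sum the counts of consecutive lines that share a word.

    Relies on the input being sorted by key, so that all lines for one
    word are adjacent.
    """
    parsed = (line.split("\t") for line in sorted_lines)
    for word, group in groupby(parsed, key=lambda pair: pair[0]):
        total = sum(int(count) for _, count in group)
        yield f"{word}\t{total}"


lines = ["the quick brown fox", "the lazy dog", "the fox"]
mapped = sorted(map_lines(lines))  # sorting stands in for the shuffle
counts = list(reduce_lines(mapped))
print(counts)
# ['brown\t1', 'dog\t1', 'fox\t2', 'lazy\t1', 'quick\t1', 'the\t3']
```

In a real streaming job the two functions would live in separate scripts that read from standard input and write to standard output.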
Removing duplicates
We would like to implement the distinct operator: the desired output should contain all of the input values, but values that appear multiple times in the input should appear only once in the output.
The solution is simpler than that for word count: we use the value itself as the map output key; the reducer associated with a specific key can then return a single output.
- Map: For each input value v, output the key-value pair (v, v)
- Shuffle: For each key v produced by any of the Map tasks, there will be one or more key-value pairs (v,v). Then, for key v, the shuffle will produce (v, [v, v, ..., v]) and present this as input to a reducer.
- Reduce: the reducer for key v turns (v, [v, v, ..., v]) into v, so it produces exactly one output v for this key v.
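A minimal Python sketch of these steps follows. The names `map_distinct`, `shuffle`, `reduce_distinct` and `distinct` are hypothetical, and the whole pipeline runs in a single process, so it only illustrates the logic, not the distribution.

```python
from collections import defaultdict


def map_distinct(value):
    # Use the value itself as the key, so equal values meet at one reducer.
    return [(value, value)]


def shuffle(pairs):
    # Stand-in for the framework's shuffle: group values by key.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reduce_distinct(key, values):
    # However many copies arrived, emit the value exactly once.
    return [key]


def distinct(values):
    pairs = [pair for value in values for pair in map_distinct(value)]
    grouped = shuffle(pairs)
    return [out for key, vals in grouped.items() for out in reduce_distinct(key, vals)]


# MapReduce output is unordered; sorting here is only for display.
print(sorted(distinct(["a", "b", "a", "c", "b"])))  # ['a', 'b', 'c']
```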
Matrix transpose
Given a sparse matrix in row-major order, we would like to output the same matrix in column-major order. In other words, each input value encodes a single row of the matrix and we would like the output to be composed of the columns of the matrix. If we view the input matrix as the adjacency list of a directed graph, this is equivalent to computing the graph obtained by reversing the edges.
Assume each input value has the form $(i, [(j_1,v_{i,j_1}), (j_2,v_{i,j_2}), \ldots, (j_t,v_{i,j_t})])$, meaning that row $i$ of the matrix has value $v_{i,j_1}$ on column $j_1$, value $v_{i,j_2}$ on column $j_2$, and so on (with value zero on all missing columns). We will solve the problem as follows: the map phase distributes the values by column and the reducer associated with a column outputs the whole column.
- Map: For each input $(i, [(j_1,v_{i,j_1}), (j_2,v_{i,j_2}), \ldots, (j_t,v_{i,j_t})])$, output the key-value pairs $(j_1, (i, v_{i,j_1})), (j_2, (i, v_{i,j_2})), \ldots, (j_t, (i, v_{i,j_t}))$
- Shuffle: For each key (column) $j$, the shuffle will produce $(j, [(i_1, v_{i_1,j}), (i_2, v_{i_2,j}), \ldots, (i_{\ell}, v_{i_{\ell},j})])$ and present it as input to a reducer.
- Reduce: the reducer for key (column) $j$ returns its whole input as a single output $(j, [(i_1, v_{i_1,j}), (i_2, v_{i_2,j}), \ldots, (i_{\ell}, v_{i_{\ell},j})])$. This exactly corresponds to column $j$ of the matrix.
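The following Python sketch mirrors these steps on a small, made-up 3x3 sparse matrix. The names `map_row`, `reduce_column` and `transpose` are assumptions for illustration. Sorting the entries by row index in the reducer is an addition for deterministic output; the description above does not require any particular order.

```python
from collections import defaultdict


def map_row(row):
    """Map: re-key every non-zero entry of row i by its column j."""
    i, entries = row
    return [(j, (i, value)) for j, value in entries]


def reduce_column(j, entries):
    """Reduce: return the whole column j as a single output."""
    # Sorting by row index is an illustrative extra, not part of the
    # description above; the shuffle gives no ordering guarantee.
    return (j, sorted(entries))


def transpose(rows):
    # Stand-in for the shuffle: group the mapped entries by column.
    grouped = defaultdict(list)
    for row in rows:
        for j, entry in map_row(row):
            grouped[j].append(entry)
    return [reduce_column(j, grouped[j]) for j in sorted(grouped)]


# Row 0 has value 1 in column 0 and value 5 in column 2, and so on.
rows = [(0, [(0, 1), (2, 5)]), (1, [(1, 3)]), (2, [(0, 7), (1, 2)])]
columns = transpose(rows)
print(columns)
# [(0, [(0, 1), (2, 7)]), (1, [(1, 3), (2, 2)]), (2, [(0, 5)])]
```

Applying `transpose` to `columns` gives back the original `rows`, which is a quick sanity check on the logic.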
Followers of Followers
We are given a directed graph where an edge $(u,v)$ indicates that $u$ follows $v$. The graph is stored as an unordered list of edges, i.e. each input corresponds to a single edge. We want to find the second hub of each user $u$, that is, the set of users that follow a follower of $u$.
The key insight for solving this problem is that the in-neighbors of a node $u$ are in the second hub of each of $u$'s out-neighbors. We will distribute the edges in such a way that the reducer associated with a node $u$ receives all the in-neighbors and out-neighbors of $u$ and can distinguish between the two.
- Map: For each input edge $(u,v)$, output the two key-value pairs $(u,(v, OUT))$ and $(v, (u,IN))$. This "says" to $u$ that $v$ is one of its out-neighbors and to $v$ that $u$ is one of its in-neighbors.
- Shuffle: For each key (node) $u$, the shuffle will produce $(u, [(v_1, a_1), (v_2, a_2), \ldots, (v_t, a_t)])$, where $a_i$ is either $IN$ or $OUT$; $v_i$ is an in-neighbor of $u$ if $a_i=IN$ and is an out-neighbor of $u$ if $a_i=OUT$. Note that the shuffle basically compiles the neighborhood of each node.
- Reduce: the reducer for key (node) $u$ takes as input the whole neighborhood of $u$ and therefore knows all in-neighbors and out-neighbors of $u$. For every in-neighbor $w$ of $u$ and every out-neighbor $v$ of $u$, we generate the output pair $(v,w)$, to indicate that $w$ is in the second hub of $v$.
[If you're interested in follower-related problems, you can find a 2009 snapshot of follow relationships of Twitter here.]
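The map, shuffle and reduce steps for this problem can be sketched in Python as follows. The user names and the helper names `map_edge`, `reduce_node` and `second_hubs` are invented for illustration. Collecting the reducer output pairs into one set per user is an extra step added here for readability; in an actual MapReduce program it would take a further round.

```python
from collections import defaultdict

IN, OUT = "IN", "OUT"


def map_edge(edge):
    """Map: tell u that v is an out-neighbor, and v that u is an in-neighbor."""
    u, v = edge  # u follows v
    return [(u, (v, OUT)), (v, (u, IN))]


def reduce_node(u, neighbors):
    """Reduce: pair every in-neighbor w of u with every out-neighbor v of u."""
    in_neighbors = [w for w, tag in neighbors if tag == IN]
    out_neighbors = [v for v, tag in neighbors if tag == OUT]
    # w follows u and u follows v, so w is in the second hub of v.
    return [(v, w) for w in in_neighbors for v in out_neighbors]


def second_hubs(edges):
    # Stand-in for the shuffle: compile the neighborhood of each node.
    grouped = defaultdict(list)
    for edge in edges:
        for node, tagged_neighbor in map_edge(edge):
            grouped[node].append(tagged_neighbor)
    # Illustrative extra step: gather the (v, w) outputs per user v.
    hubs = defaultdict(set)
    for u, neighbors in grouped.items():
        for v, w in reduce_node(u, neighbors):
            hubs[v].add(w)
    return dict(hubs)


edges = [("alice", "bob"), ("carol", "alice"), ("dave", "alice"), ("bob", "erin")]
hubs = second_hubs(edges)
print(hubs == {"bob": {"carol", "dave"}, "erin": {"alice"}})  # True
```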
Key ideas in MapReduce
Below are some of MapReduce's key features.
- Horizontal scaling: choosing a large number of commodity low-end machines over a small number of high-end machines.
- Move code to data, rather than data to code: in a MapReduce cluster, storage and processors are co-located. MapReduce exploits data locality by running the code on the machine storing the block of data needed to be read.
- Sequential reads, no random access: MapReduce is designed for batch processing, exploiting the fact that sequential reads of a file are orders of magnitude faster than random access. The metric being optimized by MapReduce is throughput rather than latency.
- Developer only Map and Reduce aware: you don't need to know all of MapReduce's intricacies and system-level details to run a program in MapReduce. The MapReduce abstraction is simple to understand and provides a clean interface for algorithm designers and developers.
References
- Hadoop: the definitive guide, by Tom White
- Data-Intensive Text Processing with MapReduce, by Jimmy Lin and Chris Dyer
- Mining of Massive Datasets, by Anand Rajaraman, Jure Leskovec, and Jeff Ullman
- To install Hadoop, follow the instructions in our tutorial