Monday, December 9, 2013

Use R in Hadoop by streaming

The combination of R and Hadoop looks like a must-have toolkit for people who work with both statistics and large data sets. Hadoop streaming is one simple way to put the two together.

An aggregation example

The Hadoop version used here is Cloudera’s CDH4, and the underlying Linux OS is CentOS 6. The data is a simulated sales data set from a training course by Udacity. Each line of the data set has six tab-separated fields: date, time, store name, item description, cost and method of payment. Only two of them, store name and cost, are needed to aggregate the cost by store.
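
To make the record layout concrete, here is a minimal shell sketch. The three sample records are invented for illustration and are not taken from the Udacity data set; cut simply keeps the store name (field 3) and the cost (field 5), which is the same selection the mapper performs.

```shell
# Hypothetical sample records in the six-field, tab-separated layout
# (date, time, store, item, cost, payment). The values are made up.
sample=$(mktemp)
printf '2012-01-01\t09:00\tSan Jose\tMusic\t12.50\tVisa\n' > "$sample"
printf '2012-01-01\t09:05\tFort Worth\tToys\t7.25\tCash\n' >> "$sample"
printf '2012-01-01\t09:10\tSan Jose\tBooks\t2.50\tAmex\n' >> "$sample"

# Keep only the store name (field 3) and the cost (field 5).
# cut uses the tab character as its default delimiter.
store_cost=$(cut -f3,5 "$sample")
echo "$store_cost"
rm -f "$sample"
```
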
A typical streaming MapReduce job needs two R scripts: mapper.R and reducer.R.
mapper.R
#!/usr/bin/env Rscript
# The shebang must be the first line of the script.
# Run in batch mode through Rscript (don't use a path like /usr/bin/R)

options(warn=-1)

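# Read tab-separated lines from stdin and write tab-separated key-value pairs to stdout

input <- file("stdin", "r")
# Use <- here: inside a function call, = would name an argument instead of assigning
while (length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  fields <- unlist(strsplit(currentLine, "\t"))
  # Make sure the line has six fields
  if (length(fields) == 6) {
    # Emit store name (key) and cost (value), without a trailing tab
    cat(fields[3], "\t", fields[5], "\n", sep="")
  }
}
close(input)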
reducer.R
#!/usr/bin/env Rscript

options(warn=-1)
salesTotal = 0
oldKey = ""

# Loop over the key-value pairs, which arrive sorted by key
input <- file("stdin", "r")
while (length(currentLine <- readLines(input, n=1, warn=FALSE)) > 0) {
  data_mapped <- unlist(strsplit(currentLine, "\t"))
  if (length(data_mapped) != 2) {
    # Malformed line: nothing can be done with it, so skip it
    next
  }

  thisKey <- data_mapped[1]
  thisSale <- as.double(data_mapped[2])

  # The key has changed: emit the total for the previous store and reset
  if (!identical(oldKey, "") && !identical(oldKey, thisKey)) {
    cat(oldKey, "\t", salesTotal, "\n", sep="")
    salesTotal <- 0
  }

  oldKey <- thisKey
  salesTotal <- salesTotal + thisSale
}

# Emit the total for the last store
if (!identical(oldKey, "")) {
  cat(oldKey, "\t", salesTotal, "\n", sep="")
}

close(input)
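
The reducer only works if all lines for one key arrive together, which Hadoop streaming guarantees by sorting the mapper output by key before the reduce step. The sketch below illustrates this with an awk stand-in for the key-change logic (it is not the R script itself); the function name reduce_by_key and the three input pairs are hypothetical.

```shell
# awk stand-in for the reducer's key-change logic (not reducer.R itself):
# emit the running total whenever the key differs from the previous one.
reduce_by_key() {
  awk -F'\t' '
    NF != 2 { next }                       # skip malformed lines
    key != "" && key != $1 { printf "%s\t%s\n", key, total; total = 0 }
    { key = $1; total += $2 }
    END { if (key != "") printf "%s\t%s\n", key, total }
  '
}

# Hypothetical mapper output in which key A is not contiguous.
pairs=$(printf 'A\t1\nB\t2\nA\t3')

# Without sorting, key A is emitted twice with partial totals.
unsorted=$(printf '%s\n' "$pairs" | reduce_by_key)
# After sorting, each key is emitted once with its full total.
sorted=$(printf '%s\n' "$pairs" | sort | reduce_by_key)

echo "$unsorted"
echo "$sorted"
```

This is why the local test pipeline places sort between the mapper and the reducer.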

Testing

Before running MapReduce, it is better to test the scripts locally with a few Linux commands.
# Make the R scripts executable
chmod +x mapper.R
chmod +x reducer.R
ls -l

# Extract a small sample file for testing
head -n 500 purchases.txt > test1.txt
cat test1.txt | ./mapper.R | sort | ./reducer.R
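
As an independent check on the R pipeline, the same per-store totals can be computed directly with awk and compared with the pipeline's output. This is only a sketch: the records below are made up and stand in for test1.txt, and the two-decimal output format is a choice made here, not something the R scripts produce.

```shell
# Made-up records in the same six-field layout, standing in for test1.txt.
data=$(mktemp)
printf '2012-01-01\t09:00\tSan Jose\tMusic\t12.50\tVisa\n' > "$data"
printf '2012-01-01\t09:05\tFort Worth\tToys\t7.25\tCash\n' >> "$data"
printf '2012-01-01\t09:10\tSan Jose\tBooks\t2.50\tAmex\n' >> "$data"
printf 'malformed line without six fields\n' >> "$data"

# Sum the cost (field 5) per store (field 3), skipping lines
# that do not have exactly six tab-separated fields.
totals=$(awk -F'\t' '
  NF == 6 { sum[$3] += $5 }
  END { for (s in sum) printf "%s\t%.2f\n", s, sum[s] }
' "$data" | sort)

echo "$totals"
rm -f "$data"
```

R's cat prints numbers in its default format, so compare the values rather than the exact text when checking against the output of reducer.R.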

Execution

One way is to specify all the paths explicitly when launching the MapReduce job.
hadoop jar /usr/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming-2.0.0-mr1-cdh4.1.1.jar \
  -mapper mapper.R -reducer reducer.R \
  -file mapper.R -file reducer.R \
  -input myinput \
  -output joboutput
Or we can use the hs alias, if it is defined in the environment (it is a shell shortcut, not a Hadoop command), which saves a lot of typing.
hs mapper.R reducer.R myinput joboutput
Overall, the MapReduce job driven by R runs smoothly. The Hadoop JobTracker can be used to monitor or diagnose the whole process.

RHadoop or streaming?

RHadoop is a collection of R packages developed by Revolution Analytics that lets users write and run MapReduce jobs directly in R, and it is probably the more popular way to integrate R and Hadoop. However, it is still evolving quickly and has complicated dependencies. As an alternative, streaming ships with Hadoop and works with any language that can read from standard input and write to standard output, including R. If installing RHadoop properly poses a challenge, streaming is a good starting point.
