java.io.FileNotFoundException using Distributed Cache with Eclipse plug-in

Sometimes it is very useful to distribute a file across nodes for a task. A classic case is a JOIN with a small metadata file. The file can live on the local file system or on HDFS.

If you use this Java snippet: DistributedCache.addCacheFile(new URI("/model/conf/txn_header"), conf); we assume the file is on HDFS, but the job will actually look at the local file system and throw java.io.FileNotFoundException if you run it from the Eclipse plug-in (there is no problem with the Hadoop command line). To solve this, add the Hadoop configuration files to the job configuration:

conf.addResource(new Path("/usr/local/hadoop/conf/core-site.xml"));
conf.addResource(new Path("/usr/local/hadoop/conf/hdfs-site.xml"));
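
Putting it together, here is a minimal sketch of a driver that loads the cluster configuration and then registers the HDFS file in the distributed cache. The class name CacheDriver and the driver skeleton are mine; the paths are the ones used above and may differ on your setup.

import java.net.URI;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;

public class CacheDriver {
    public static void main(String[] args) throws Exception {
        JobConf conf = new JobConf(CacheDriver.class);

        // Point the job at the cluster configuration so that paths
        // resolve against HDFS instead of the local file system.
        conf.addResource(new Path("/usr/local/hadoop/conf/core-site.xml"));
        conf.addResource(new Path("/usr/local/hadoop/conf/hdfs-site.xml"));

        // Now this is interpreted as an HDFS path, also when launched from Eclipse.
        DistributedCache.addCacheFile(new URI("/model/conf/txn_header"), conf);

        // ... set mapper/reducer, input/output paths, and submit the job ...
    }
}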

Reference: http://blog.rajeevsharma.in/2009/06/using-hdfs-in-java-0200.html

setPartitionerClass, setOutputKeyComparatorClass and setOutputValueGroupingComparator

The Partitioner decides which mapper output goes to which reducer, based on the mapper output key. In general, different keys end up in different groups (one Iterator per group on the reducer side). But sometimes we want different keys to fall into the same group. This is what the output value grouping comparator is for: it is used to group the mapper output. For easy understanding, think of it as the GROUP BY condition in SQL. I will give a detailed example for time series analysis later. The output key comparator is used during the sort stage to order the mapper output keys.

The above looks pretty straightforward. But there is one thing to remember: if you use setOutputValueGroupingComparator, all the keys in the same group on the reducer side will now appear to be the same, even if they were not the same in the mapper output.
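
As a minimal sketch (assuming an IntPair-style key, as in the example linked below, that exposes getFirst() for the year; the class name is mine), a grouping comparator that compares only the first field looks like this:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Hypothetical grouping comparator: groups reducer input by the first field
// (the year) only, so records that differ in the second field still arrive
// in the same reduce() call.
public class YearGroupingComparator extends WritableComparator {

    protected YearGroupingComparator() {
        super(IntPair.class, true);    // true: create key instances for comparison
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        IntPair p1 = (IntPair) a;
        IntPair p2 = (IntPair) b;
        int left = p1.getFirst();      // assumed accessor for the year field
        int right = p2.getFirst();
        return left < right ? -1 : (left == right ? 0 : 1);
    }
}

It would be registered with conf.setOutputValueGroupingComparator(YearGroupingComparator.class), while the full key still drives the sort order via the output key comparator.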

You can download the example from: https://www.assembla.com/spaces/autofei_public/documents

  • record.txt is the input (three fields: year, a random number, place)
  • MaxTemperatureUsingSecondarySort.java is the main Hadoop code
  • IntPair.java is the mapper output key class
  • output.txt is the output

You will notice that the number for the same year is now always the same: the maximum one.

Note: the code is modified from the book "Hadoop: The Definitive Guide".

A typical error in writing reducer code

Suppose the input is:

00403D91436B76D22DDD88ACEAA41FB4
00403D91436B76D22DDD88ACEAA41FB4        s1
00403DA1239B66B92BD91FFF0EC2DC3F        s1|s3
00403DC1F314463D904A0C03C9714743

The reducer should output the first-column values that are not unique (that is, values that appear twice), together with their tag.

#!/usr/bin/perl -w

use strict;

# loop vars
my $key     = "";
my $cur_key = "";

# key and vals
my $uid;
my $tag;
my $count = 0;

sub onBeginKey() {
    $cur_key = $key;
    $count   = 0;
}

sub onSameKey() {
    $count++;
}

sub onEndKey() {
    if ( $count == 2 ) {
        printf STDOUT "%s\t%s\n", $cur_key, $tag;
    }
}

while ( my $line = <STDIN> ) {
    chomp($line);

    my @fields      = split( /\t/, $line );
    my $fields_size = scalar @fields;

    $key = $fields[0];
    if ( $fields_size == 2 && $fields[1] ) {
        $tag = $fields[1];    # BUG: $tag is never cleared for lines without a second field
    }

    if ($cur_key) {
        if ( $key ne $cur_key ) {
            &onEndKey();
            &onBeginKey();
        }
        &onSameKey();
    }
    else {
        &onBeginKey();
        &onSameKey();
    }
}
if ($cur_key) {
    &onEndKey();
}

The problem: $tag is never reset when a line has no second field, so a tag left over from a previous record can be printed with the wrong key. The correct version resets $tag on every line and remembers the tag seen within the current group in $o_tag:

#!/usr/bin/perl -w

use strict;

# loop vars
my $key     = "";
my $cur_key = "";
my $o_tag = "";

# key and vals
my $uid;
my $tag;
my $count = 0;

sub onBeginKey() {
    $cur_key = $key;
    $count   = 0;
    $o_tag   = "";    # also reset the remembered tag for the new group
}

sub onSameKey() {
    $count++;
    if ($tag) { $o_tag = $tag; }    # remember any tag seen within this group
}

sub onEndKey() {
    if ( $count == 2 && $o_tag ) {
        printf STDOUT "%s\t%s\n", $cur_key, $o_tag;
    }
}

while ( my $line = <STDIN> ) {
    chomp($line);

    my @fields      = split( /\t/, $line );
    my $fields_size = scalar @fields;

    $key = $fields[0];
    if ( $fields_size == 2 && $fields[1] ) {
        $tag = $fields[1];
    }
    else {
        $tag = "";    # reset, so a stale tag cannot leak into the next record
    }

    if ($cur_key) {
        if ( $key ne $cur_key ) {
            &onEndKey();
            &onBeginKey();
        }
        &onSameKey();
    }
    else {
        &onBeginKey();
        &onSameKey();
    }
}
if ($cur_key) {
    &onEndKey();
}

Note: the code is Perl, but the logic is the same in Java.
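
For reference, here is a minimal sketch of the same logic as a Java reducer (new API). The class name and the assumption that the mapper emits the uid as key and the tag (possibly an empty string) as value are mine.

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class DuplicateUidReducer extends Reducer<Text, Text, Text, Text> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        String tag = "";
        for (Text value : values) {
            count++;
            String s = value.toString().trim();
            if (!s.isEmpty()) {
                tag = s;    // remember the non-empty tag within this key group
            }
        }
        // emit only keys that appear exactly twice and carry a tag
        if (count == 2 && !tag.isEmpty()) {
            context.write(key, new Text(tag));
        }
    }
}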

Time series processing using Hadoop

Intuitively, time series data to me means things like Apache web logs or stock price data. Obviously, it is very interesting to analyze them using Hadoop. "In statistics, signal processing, econometrics and mathematical finance, a time series is a sequence of data points, measured typically at successive times spaced at uniform time intervals."[1]

In the implementation of [2], there are two versions: one uses the MapReduce sort plus a secondary sort to keep all data points in time order, the other uses a priority queue to sort all data points in memory. Obviously, the first approach is more scalable, so I will analyze it in detail.

hadoop@fxu-t60:~/workspace/TimeSeries/src/tv/floe/caduceus/hadoop/movingaverage$ tree
.
├── CompositeKeyComparator.java
├── MovingAverageJob.java
├── MovingAverageMapper.java
├── MovingAverageReducer.java
├── NaturalKeyGroupingComparator.java
├── NaturalKeyPartitioner.java
├── NoShuffleSort_MovingAverageJob.java
├── NoShuffleSort_MovingAverageMapper.java
├── NoShuffleSort_MovingAverageReducer.java
├── SlidingWindow.java
├── tests
│   ├── TestPreSortHeap.java
│   ├── TestSlidingWindow.java
│   └── TestYahooStockDataPoint.java
├── TimeseriesDataPoint.java
├── TimeseriesKey.java
└── YahooStockDataPoint.java

1 directory, 16 files

The Mapper
The input is a normal Yahoo stock record: "NYSE,AA,2008-02-03,38.85,39.28,38.26,38.37,11279900,8.37". The output key is of type TimeseriesKey, which contains the stock symbol and the time-stamp. The output value is of type TimeseriesDataPoint, which contains the time-stamp and the stock price.
The Reducer
A secondary sort and a sliding window are used to produce a moving average. The window size is 30 days and the window slides forward by one day. All the data points inside the window are used to smooth the last data point; the sketch below illustrates the idea.
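
A self-contained sketch of the sliding-window computation (not the repo's SlidingWindow class; the window size of 3 and the sample closing prices are made up for illustration, the real job uses 30 days):

import java.util.ArrayDeque;
import java.util.Deque;

public class MovingAverageSketch {

    public static void main(String[] args) {
        // prices already sorted by time, as they arrive at the reducer
        // after the secondary sort (sample data, for illustration only)
        double[] closes = { 38.37, 38.80, 39.10, 38.95, 39.40, 40.02, 39.75 };

        final int windowSize = 3;                 // 30 days in the real job
        Deque<Double> window = new ArrayDeque<Double>();
        double sum = 0.0;

        for (double close : closes) {
            // slide the window forward by one data point
            window.addLast(close);
            sum += close;
            if (window.size() > windowSize) {
                sum -= window.removeFirst();
            }
            // once the window is full, emit the smoothed value for the
            // last (most recent) point in the window
            if (window.size() == windowSize) {
                System.out.printf("moving average ending at %.2f = %.2f%n",
                        close, sum / windowSize);
            }
        }
    }
}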
The Job

......

conf.setMapOutputKeyClass(TimeseriesKey.class);
conf.setMapOutputValueClass(TimeseriesDataPoint.class);

conf.setMapperClass(MovingAverageMapper.class);
conf.setReducerClass(MovingAverageReducer.class);

conf.setPartitionerClass(NaturalKeyPartitioner.class);
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);

......

The partitioner: the key contains the stock symbol and the time-stamp, but records must be partitioned by stock symbol only!

public class NaturalKeyPartitioner implements
        Partitioner<TimeseriesKey, TimeseriesDataPoint> {

    @Override
    public int getPartition(TimeseriesKey key, TimeseriesDataPoint value,
            int numPartitions) {
        return Math.abs(key.getGroup().hashCode() * 127) % numPartitions;
    }

    @Override
    public void configure(JobConf arg0) {
        // TODO Auto-generated method stub
    }
}

JobConf.setOutputKeyComparatorClass(Class): the mapper output keys are sorted by stock symbol first and then by time, which guarantees the date order within each time series.

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        TimeseriesKey ip1 = (TimeseriesKey) w1;
        TimeseriesKey ip2 = (TimeseriesKey) w2;

        int cmp = ip1.getGroup().compareTo(ip2.getGroup());
        if (cmp != 0) {
            return cmp;
        }

        return ip1.getTimestamp() == ip2.getTimestamp() ? 0
                : (ip1.getTimestamp() < ip2.getTimestamp() ? -1 : 1);
    }
}

JobConf.setOutputValueGroupingComparator(Class): this performs the grouping for the secondary sort on the reducer side, comparing by stock symbol only, so that one reduce() call receives the whole time-ordered series for a symbol.

public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {

        TimeseriesKey tsK1 = (TimeseriesKey) o1;
        TimeseriesKey tsK2 = (TimeseriesKey) o2;

        return tsK1.getGroup().compareTo(tsK2.getGroup());
    }
}

At the end, the project "zohmg" [3] can be used to store the time series data. The paper [5] is also very interesting.

Links:

  1. http://en.wikipedia.org/wiki/Time_series
  2. https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage
  3. https://github.com/zohmg/zohmg/blob/master/README
  4. http://hadoop.apache.org/common/docs/r0.20.2/mapred_tutorial.html
  5. www-scf.usc.edu/~selinach/segmentation-slides.pdf

Inner product of two sparse vectors using MapReduce streaming

Given two vectors, X = [x1, x2, …] and Y = [y1, y2, …], their inner product is Z = x1 * y1 + x2 * y2 + … .
The two vectors are usually given in a sparse representation, one "index,value" pair per line:
Vector One

1,0.4
3,0.6
7,0.6
4,0.7
10,0.9
12,0.8
20,0.1

Vector Two

1,0.4
3,0.1
6,0.6
4,0.7
10,0.9
12,0.2
21,0.1

Mapper streaming code in Perl:

#!/usr/bin/perl

while ($line = <STDIN>) {
    chomp($line);
    @fields = split(/,/, $line);
    if ($fields[0] && $fields[1]) {
        # emit the index as key and the value as value, tab separated
        print "$fields[0]\t$fields[1]\n";
    }
}

Reducer streaming code in Perl:

#!/usr/bin/perl

$lastKey = "";
$product = 1;
$count   = 0;

while ($line = <STDIN>) {
    chomp($line);
    @fields = split(/\t/, $line);
    $key    = $fields[0];
    $value  = $fields[1];
    if ($lastKey ne "" && $key ne $lastKey) {
        # a new key starts: emit the previous key if it appeared in both vectors
        if ($count == 2) {
            print "$lastKey\t$product\n";
        }
        $product = $value;
        $lastKey = $key;
        $count   = 1;
    }
    else {
        $product = $product * $value;
        $lastKey = $key;
        $count++;
    }
}
# the last key
if ($count == 2) {
    print "$lastKey\t$product\n";
}

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -D mapred.reduce.tasks=1 -input /home/hadoop/vectors -output /home/hadoop/vectors-output -mapper "VectorProduct.pl" -file ~/Desktop/HadoopInAction/VectorProduct.pl -reducer "VectorProductReducer.pl" -file ~/Desktop/HadoopInAction/VectorProductReducer.pl

Output

1    0.16
10    0.81
12    0.16
3    0.06
4    0.49
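
Summing these partial products gives the inner product: 0.16 + 0.81 + 0.16 + 0.06 + 0.49 = 1.68.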

One more post-processing step is needed to sum up all the values. Alternatively, use this reducer code to get the sum directly:

#!/usr/bin/perl

$lastKey = "";
$product = 1;
$count   = 0;
$sum     = 0;

while ($line = <STDIN>) {
    chomp($line);
    @fields = split(/\t/, $line);
    $key    = $fields[0];
    $value  = $fields[1];
    if ($lastKey ne "" && $key ne $lastKey) {
        # a new key starts: add the previous key's product to the sum
        if ($count == 2) {
            $sum = $sum + $product;
        }
        $product = $value;
        $lastKey = $key;
        $count   = 1;
    }
    else {
        $product = $product * $value;
        $lastKey = $key;
        $count++;
    }
}
# the last key
if ($count == 2) {
    $sum = $sum + $product;
}

print "$sum\n";

This only shows how to compute the product of two vectors. If there are more than two, giving each vector an ID should solve the problem with little modification. The result is actually the numerator needed to compute the cosine similarity.
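
For reference, cosine similarity is cos(X, Y) = (X · Y) / (||X|| * ||Y||); the job above computes the numerator X · Y.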

Web traffic measurement using MapReduce streaming

Question: “Take a web server log file and write a Streaming program with the Aggregate package to find the hourly traffic to that site.”

I will use the "Aggregate package" with both Python and Perl streaming: the mapper emits keys of the form "LongValueSum:<key>", and the built-in aggregate reducer sums the values for each key. The input log file looks like:

fcrawler.looksmart.com - - [26/Apr/2000:00:00:12 -0400] "GET /contacts.html HTTP/1.0" 200 4595 "-" "FAST-WebCrawler/2.1-pre2 (ashen@looksmart.net)"

Mapper streaming code in Python:

#!/usr/bin/python

import sys
import re

# match the year:hour:minute:second part of the timestamp, e.g. 2000:00:00:12
my_regex = r'[0-9]{4}(?:\:[0-9]{2}){3}'

for line in sys.stdin:
    logtime = re.findall(my_regex, line)
    if len(logtime) == 1:
        # fields[1] is the hour of the request
        fields = logtime[0].split(":")
        print "LongValueSum:" + fields[1] + "\t" + "1"

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input /home/hadoop/weblog -output /home/hadoop/weblog-output -mapper 'WebTraffic.py' -file ~/Desktop/HadoopInAction/WebTraffic.py -reducer aggregate

The same mapper in Perl:

#!/usr/bin/perl

while ($line = <STDIN>) {
    # match the year:hour:minute:second part of the timestamp
    if ($line =~ m/(\d{4}(?:\:\d{2}){3})/) {
        # $fields[1] is the hour of the request
        @fields = split(/:/, $1);
        print "LongValueSum:$fields[1]\t1\n";
    }
}

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input /home/hadoop/weblog -output /home/hadoop/weblog-output-pl -mapper 'WebTraffic.pl' -file ~/Desktop/HadoopInAction/WebTraffic.pl -reducer aggregate

An incomplete guide to DistributedCache

"DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications." Here are some use cases: you can cache a file such as a stopword list when processing text in information retrieval, or cache the smaller file during a semi-join of two data sets.

But things are quite tricky across different versions of Hadoop. I tested 0.18.3, 0.20.2 and 0.21.0 to provide some practical guidance.

Hadoop 0.18.3

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class checker extends Configured implements Tool {

    public static String localFile = "/home/hadoop/Desktop/data/TLD/google-ip-valid.txt";
    public static String hdfsFile = "/home/hadoop/googleBL/google-ip-valid.txt";

    public static class MapClass extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, NullWritable> {

        private HashSet<String> blIP = new HashSet<String>();

        static String ValidIpAddressRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";

        public void configure(JobConf job) {
            System.out.println("inside configure()");
            // blIP.add("98.202.86.206");
            try {
                // read the blacklist from the localized cache copy
                Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);

                if (cacheFiles != null && cacheFiles.length > 0) {
                    System.out.println("Inside configure(): "
                            + cacheFiles[0].toString());

                    String line;

                    BufferedReader cacheReader = new BufferedReader(
                            new FileReader(cacheFiles[0].toString()));
                    try {
                        while ((line = cacheReader.readLine()) != null) {
                            blIP.add(line.trim());
                        }
                    } finally {
                        cacheReader.close();
                    }
                }
            } catch (IOException e) {
                System.err.println("Exception reading DistributedCache: " + e);
            }
        }

        public void map(LongWritable key, Text value,
                OutputCollector<Text, NullWritable> output, Reporter reporter)
                throws IOException {
            String[] fields = value.toString().split(" ");
            if (fields.length == 3) {

                String last = fields[2].trim();
                System.out.println(value + " : " + last);
                if (last.matches(ValidIpAddressRegex) && blIP.contains(last)) {
                    output.collect(new Text(value), NullWritable.get());
                }
            }
        }
    }

    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        JobConf job = new JobConf(conf, checker.class);

        // Path cacheFile = new Path(args[2]);
        // DistributedCache.addCacheFile(cacheFile.toUri(), job);

        this.cacheFile(job);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        job.setMapperClass(MapClass.class);
        job.setNumReduceTasks(1);
        // job.setReducerClass(Reduce.class);

        job.setInputFormat(TextInputFormat.class);
        job.setOutputFormat(TextOutputFormat.class);

        Path in = new Path(args[0]);
        Path out = new Path(args[1]);
        FileInputFormat.addInputPath(job, in);
        FileOutputFormat.setOutputPath(job, out);

        JobClient.runJob(job);

        return 0;
    }

    void cacheFile(JobConf conf) throws IOException {
        FileSystem fs = FileSystem.get(conf);
        Path hdfsPath = new Path(hdfsFile);

        // upload the file to HDFS, overwriting any existing copy
        fs.copyFromLocalFile(false, true, new Path(localFile), hdfsPath);

        // register the HDFS file in the distributed cache
        DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
    }

    /**
     * The main method.
     *
     * @param args
     *            the arguments
     * @throws Exception
     *             the exception
     */
    public static void main(String[] args) throws Exception {
        String in = "/home/hadoop/testTLD";
        String out = "/home/hadoop/tld-ip-bl";

        if (args.length == 0) {
            args = new String[2];
            args[0] = in;
            args[1] = out;
        }

        int res = ToolRunner.run(new Configuration(), new checker(), args);

        System.exit(res);
    }
}

Hadoop 0.20.2

According to Reference 1,  “DistributedCache is not changed to use new api in branch 0.20. The change is done in only from branch 0.21. See MAPREDUCE-898 ( https://issues.apache.org/jira/browse/MAPREDUCE-898).”

But according to my tests, even the code that runs successfully on 0.18.3 (shown above) still cannot run on 0.20.2. I guess we need to apply the patch.

Hadoop 0.21.0 (not working)

Note: a new Cluster class is introduced, and the old DistributedCache API is deprecated in favor of methods on Job. I have not figured out why it did not work! (One possible culprit: context.getCacheFiles() returns the cache URIs as they were added, not localized file paths, so the FileReader in setup() may not be able to open them.)

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNew {

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        public static HashSet<String> stopWord = new HashSet<String>();

        protected void setup(Context context) throws IOException,
                InterruptedException {
            try {
                URI[] uris = context.getCacheFiles();
                System.out.println("Inside setup(): " + uris[0].toString());

                if (uris != null && uris.length > 0) {
                    String line;

                    BufferedReader cacheReader = new BufferedReader(
                            new FileReader(uris[0].toString()));
                    try {
                        while ((line = cacheReader.readLine()) != null) {
                            stopWord.add(line.trim());
                        }
                    } finally {
                        cacheReader.close();
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                String token = itr.nextToken();

                // check the stop word here!
                if (stopWord.contains(token.trim().toLowerCase()))
                    continue;
                else {
                    word.set(token);
                    context.write(word, one);
                }
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();

        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        Cluster cluster = new Cluster(conf);
        Job job = Job.getInstance(cluster);

        String path = "/user/hadoop/WordCount-cache/words";
        job.addCacheFile(new URI(path));

        job.setJarByClass(WordCountNew.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        String in = "/user/hadoop/WordCount-input";
        String out = "/user/hadoop/WordCount-output";

        FileInputFormat.addInputPath(job, new Path(in));
        FileOutputFormat.setOutputPath(job, new Path(out));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Reference:

  1. http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html#a722187