Time series processing using Hadoop

To me, time series data intuitively means things like Apache web logs or stock price data, and it is obviously interesting to analyze them with Hadoop. “In statistics, signal processing, econometrics and mathematical finance, a time series is a sequence of data points, measured typically at successive times spaced at uniform time intervals.”[1]

The implementation in [2] has two versions: one uses the MapReduce shuffle with a secondary sort to keep each stock's data points in time order, the other uses a priority queue to sort all data points inside the reducer. Obviously, the first approach is more scalable, so I will analyze it in detail.
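
For contrast, here is a tiny standalone sketch (toy data only, not the code from [2]) of what the priority-queue variant boils down to: every point of a stock is buffered in a heap ordered by time-stamp and then drained in order, so memory grows with the number of points per stock.

// Sketch only: buffer every point of one stock in a heap keyed by
// time-stamp, then drain it in time order. Memory grows with the number
// of points per stock, which is why the shuffle-based secondary sort
// scales better.
import java.util.PriorityQueue;

public class HeapSortSketch {
    public static void main(String[] args) {
        long[][] points = { {20080205L, 39}, {20080203L, 38}, {20080204L, 40} };

        PriorityQueue<long[]> heap = new PriorityQueue<long[]>(
                points.length,
                (a, b) -> Long.compare(a[0], b[0]));   // order by time-stamp

        for (long[] p : points) {
            heap.add(p);                               // buffer everything
        }
        while (!heap.isEmpty()) {
            long[] p = heap.poll();                    // drain in time order
            System.out.println(p[0] + " -> " + p[1]);
        }
    }
}

The secondary-sort version consists of the following classes: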

hadoop@fxu-t60:~/workspace/TimeSeries/src/tv/floe/caduceus/hadoop/movingaverage$ tree
.
├── CompositeKeyComparator.java
├── MovingAverageJob.java
├── MovingAverageMapper.java
├── MovingAverageReducer.java
├── NaturalKeyGroupingComparator.java
├── NaturalKeyPartitioner.java
├── NoShuffleSort_MovingAverageJob.java
├── NoShuffleSort_MovingAverageMapper.java
├── NoShuffleSort_MovingAverageReducer.java
├── SlidingWindow.java
├── tests
│   ├── TestPreSortHeap.java
│   ├── TestSlidingWindow.java
│   └── TestYahooStockDataPoint.java
├── TimeseriesDataPoint.java
├── TimeseriesKey.java
└── YahooStockDataPoint.java

1 directory, 16 files

The Mapper
The input is a regular Yahoo stock record: “NYSE,AA,2008-02-03,38.85,39.28,38.26,38.37,11279900,8.37”. The output key is of type “TimeseriesKey”, which contains the stock symbol and the time-stamp. The output value is of type “TimeseriesDataPoint”, which contains the time-stamp and the stock price.
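
The MovingAverageMapper itself is not reproduced here. As a rough standalone sketch (not the project's code), parsing one record and extracting the pieces that go into the composite key and the value could look like this; the field layout (exchange, symbol, date, open, high, low, close, volume, adjusted close) is assumed from the sample record above:

// Sketch only: parse one Yahoo stock CSV record and extract the pieces the
// composite key (symbol, time-stamp) and the value (time-stamp, close price)
// are built from. The field layout is assumed, not taken from the project.
import java.text.SimpleDateFormat;

public class ParseStockRecord {
    public static void main(String[] args) throws Exception {
        String record = "NYSE,AA,2008-02-03,38.85,39.28,38.26,38.37,11279900,8.37";
        String[] fields = record.split(",");

        String symbol = fields[1];                        // natural key: stock symbol
        long timestamp = new SimpleDateFormat("yyyy-MM-dd")
                .parse(fields[2]).getTime();              // secondary key: trade date
        double close = Double.parseDouble(fields[6]);     // value: closing price

        System.out.println(symbol + " " + timestamp + " " + close);
    }
}
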
The Reducer
A secondary sort and a sliding window are used to produce the moving average. The window covers 30 days and slides by one day; all data points inside the window are used to smooth the most recent point.
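
The SlidingWindow class is not reproduced here either. The idea can be sketched in plain standalone Java (a window of 3 instead of 30 and made-up toy prices, purely for illustration):

// Sketch only: a sliding-window moving average. Prices arrive in time order,
// the window keeps the most recent points, and their average smooths the
// newest point. The real job uses a 30-day window that slides by one day.
import java.util.ArrayDeque;
import java.util.Deque;

public class MovingAverageSketch {
    public static void main(String[] args) {
        double[] prices = {38.37, 38.80, 39.10, 38.95, 39.40};  // toy data
        int windowSize = 3;                                     // 30 in the real job

        Deque<Double> window = new ArrayDeque<Double>();
        double sum = 0.0;
        for (double p : prices) {
            window.addLast(p);
            sum += p;
            if (window.size() > windowSize) {
                sum -= window.removeFirst();                    // slide by one point
            }
            if (window.size() == windowSize) {
                System.out.printf("moving average at price %.2f = %.2f%n",
                        p, sum / windowSize);
            }
        }
    }
}
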
The Job

......

conf.setMapOutputKeyClass(TimeseriesKey.class);
conf.setMapOutputValueClass(TimeseriesDataPoint.class);

conf.setMapperClass(MovingAverageMapper.class);
conf.setReducerClass(MovingAverageReducer.class);

conf.setPartitionerClass(NaturalKeyPartitioner.class);
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);

......

The partitioner: the key contains both the stock symbol and the time-stamp, but partitioning should be done by the stock symbol only!

public class NaturalKeyPartitioner implements
        Partitioner<TimeseriesKey, TimeseriesDataPoint> {

    @Override
    public int getPartition(TimeseriesKey key, TimeseriesDataPoint value,
            int numPartitions) {
        return Math.abs(key.getGroup().hashCode() * 127) % numPartitions;
    }

    @Override
    public void configure(JobConf arg0) {
        // nothing to configure
    }
}

JobConf.setOutputKeyComparatorClass(Class): the mapper output keys are sorted by stock symbol first and then by time-stamp, which guarantees the date order within each time series.

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {

        TimeseriesKey ip1 = (TimeseriesKey) w1;
        TimeseriesKey ip2 = (TimeseriesKey) w2;

        // compare by stock symbol first ...
        int cmp = ip1.getGroup().compareTo(ip2.getGroup());
        if (cmp != 0) {
            return cmp;
        }

        // ... then by time-stamp
        return ip1.getTimestamp() == ip2.getTimestamp() ? 0
                : (ip1.getTimestamp() < ip2.getTimestamp() ? -1 : 1);
    }
}

JobConf.setOutputValueGroupingComparator(Class): this is the grouping half of the secondary sort on the reducer side; it compares only the stock symbol, so all data points of one stock end up in a single reduce call even though they were sorted by the full composite key.

public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {

        TimeseriesKey tsK1 = (TimeseriesKey) o1;
        TimeseriesKey tsK2 = (TimeseriesKey) o2;

        // group by the natural key (stock symbol) only
        return tsK1.getGroup().compareTo(tsK2.getGroup());
    }
}

At the end, the project “zohmg” [3] can be used to store time series data. The paper [5] is also very interesting.

Links:

  1. http://en.wikipedia.org/wiki/Time_series
  2. https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage
  3. https://github.com/zohmg/zohmg/blob/master/README
  4. http://hadoop.apache.org/common/docs/r0.20.2/mapred_tutorial.html
  5. www-scf.usc.edu/~selinach/segmentation-slides.pdf

Inner product of two sparse vectors using MapReduce streaming

Given two vectors, X = [x1, x2, …] and Y = [y1, y2, …], their inner product is Z = x1 * y1 + x2 * y2 + … .
The two vectors are usually given in a sparse representation, as (index, value) pairs:
Vector One

1,0.4
3,0.6
7,0.6
4,0.7
10,0.9
12,0.8
20,0.1

Vector Two

1,0.4
3,0.1
6,0.6
4,0.7
10,0.9
12,0.2
21,0.1

The mapper simply emits each (index, value) pair as a key/value record; the shuffle then brings records with the same index together, and the reducer multiplies the two values of every index that appears in both vectors.

Mapper streaming code in Perl:

#!/usr/bin/perl

while ($line = <STDIN>) {
    chomp($line);
    @fields = split(/,/, $line);
    if (defined $fields[0] && defined $fields[1]) {
        # emit the index as the key and the value as the value
        print "$fields[0]\t$fields[1]\n";
    }
}

Reducer streaming code in Perl:

#!/usr/bin/perl

$lastKey = "";
$product = 1;
$count   = 0;

while ($line = <STDIN>) {
    @fields = split(/\t/, $line);
    $key   = $fields[0];
    $value = $fields[1];
    if ($lastKey ne "" && $key ne $lastKey) {
        # the key changed: emit the previous index only if it
        # appeared in both vectors (count == 2)
        if ($count == 2) {
            print "$lastKey\t$product\n";
        }
        $product = $value;
        $lastKey = $key;
        $count   = 1;
    }
    else {
        $product = $product * $value;
        $lastKey = $key;
        $count++;
    }
}
# the last key
if ($count == 2) {
    print "$lastKey\t$product\n";
}

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar  -D mapred.reduce.tasks=1 -input /home/hadoop/vectors -output /home/hadoop/vectors-output -mapper "VectorProduct.pl" -file ~/Desktop/HadoopInAction/VectorProduct.pl -reducer "VectorProductReducer.pl" -file ~/Desktop/HadoopInAction/VectorProductReducer.pl

Output

1    0.16
10    0.81
12    0.16
3    0.06
4    0.49
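
Summing the per-index products above gives the inner product of the two example vectors: 0.16 + 0.81 + 0.16 + 0.06 + 0.49 = 1.68.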

One more post-processing step is needed to sum all the values. Alternatively, use this reducer code to get the sum directly:

#!/usr/bin/perl

$lastKey = "";
$product = 1;
$count   = 0;
$sum     = 0;

while ($line = <STDIN>) {
    @fields = split(/\t/, $line);
    $key   = $fields[0];
    $value = $fields[1];
    if ($lastKey ne "" && $key ne $lastKey) {
        # the key changed: add the previous index's product to the sum
        # only if that index appeared in both vectors (count == 2)
        if ($count == 2) {
            $sum = $sum + $product;
        }
        $product = $value;
        $lastKey = $key;
        $count   = 1;
    }
    else {
        $product = $product * $value;
        $lastKey = $key;
        $count++;
    }
}
# the last key
if ($count == 2) {
    $sum = $sum + $product;
}

print "$sum\n";

This only shows how to compute the product of two vectors. If there are more than two, giving each vector an ID should solve the problem with little modification. The result is actually the numerator of the cosine similarity, X·Y / (||X|| ||Y||).
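
Just for illustration (this is not part of the streaming job), the cosine similarity of the two example vectors can also be computed directly in a few lines of Java; note that the denominator needs the norms of the complete vectors, which the MapReduce job above does not produce:

// Sketch only: cosine similarity of the two example sparse vectors,
// computed directly in memory. The streaming job above produces only
// the numerator (the dot product).
import java.util.HashMap;
import java.util.Map;

public class CosineSketch {
    public static void main(String[] args) {
        Map<Integer, Double> x = new HashMap<Integer, Double>();
        x.put(1, 0.4); x.put(3, 0.6); x.put(7, 0.6); x.put(4, 0.7);
        x.put(10, 0.9); x.put(12, 0.8); x.put(20, 0.1);

        Map<Integer, Double> y = new HashMap<Integer, Double>();
        y.put(1, 0.4); y.put(3, 0.1); y.put(6, 0.6); y.put(4, 0.7);
        y.put(10, 0.9); y.put(12, 0.2); y.put(21, 0.1);

        double dot = 0.0, nx = 0.0, ny = 0.0;
        for (Map.Entry<Integer, Double> e : x.entrySet()) {
            nx += e.getValue() * e.getValue();
            Double yv = y.get(e.getKey());
            if (yv != null) {
                dot += e.getValue() * yv;   // only shared indices contribute
            }
        }
        for (double v : y.values()) {
            ny += v * v;
        }
        System.out.println(dot / (Math.sqrt(nx) * Math.sqrt(ny)));
    }
}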

Web traffic measurement using MapReduce streaming

Question: “Take a web server log file and write a Streaming program with the Aggregate package to find the hourly traffic to that site.”

I will use the “Aggregate package” with both Python and Perl streaming mappers. A mapper output line of the form “LongValueSum:key”, followed by a tab and a value, tells the aggregate reducer to sum the values for that key. An input log line looks like:

fcrawler.looksmart.com - - [26/Apr/2000:00:00:12 -0400] "GET /contacts.html HTTP/1.0" 200 4595 "-" "FAST-WebCrawler/2.1-pre2 (ashen@looksmart.net)"

The Python mapper extracts the hour from the time-stamp and emits one count per request:

#!/usr/bin/python

import sys
import re

# match the year:hour:minute:second part of the time-stamp, e.g. 2000:00:00:12
my_regex = r'[0-9]{4}(?:\:[0-9]{2}){3}'

for line in sys.stdin:
    logtime = re.findall(my_regex, line)
    if len(logtime) == 1:
        fields = logtime[0].split(":")
        # fields[1] is the hour
        print "LongValueSum:" + fields[1] + "\t" + "1"

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input /home/hadoop/weblog -output /home/hadoop/weblog-output -mapper 'WebTraffic.py' -file ~/Desktop/HadoopInAction/WebTraffic.py -reducer aggregate

The equivalent Perl mapper:

#!/usr/bin/perl

while ($line = <STDIN>) {
    # match the year:hour:minute:second part of the time-stamp
    if ($line =~ m/(\d{4}(?:\:\d{2}){3})/) {
        @fields = split(/:/, $1);
        # fields[1] is the hour
        print "LongValueSum:$fields[1]\t1\n";
    }
}

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -input /home/hadoop/weblog -output /home/hadoop/weblog-output-pl -mapper 'WebTraffic.pl' -file ~/Desktop/HadoopInAction/WebTraffic.pl -reducer aggregate

Useful Linux Commands

sort

sort -T . (use the current directory for temporary files instead of the system tmp directory)

sort -t $'\t' -k3,3nr -k4,4n input.txt > output.txt (here the field separator is a tab; sort by the third numeric column in descending order and the fourth numeric column in ascending order)

awk

awk '{s+=$1} END {print s}' mydatafile (sum numbers, one per line)

du

du --si * (show file usage in a human-readable way)

ln

ln -f -s target_directory symbolic_link_directory (this updates an existing link without first removing the old one)

Uniq command

After sorting a file you will often find some duplicate data, or you may be given various lists that need deduplicating. sort and uniq will quickly and easily remove duplicates, list only the duplicates, or list only the unique data.

sort myfile.txt | uniq

List only the unique lines: sort myfile.txt | uniq -u

List only the duplicate lines: sort myfile.txt | uniq -d

Get a count of the number of lines by adding the -c option.

sort myfile.txt | uniq -uc

sort myfile.txt | uniq -dc

Skip fields: uniq -f 3 mylogfile (this can be useful with log files, to skip the time-stamp field)

Skip characters: uniq -s 30 myfile.txt (skip the first 30 characters)

Compare characters: uniq -w 30 myfile.txt (compare only the first 30 characters)

Generate SSH Key

  • ssh-keygen -t rsa -b 4096

Change to lower case

  • $ tr '[:upper:]' '[:lower:]' < input.txt > output.txt

Screen command

  • Ctrl+a c (new window)
  • Ctrl+a n (select the window)
  • Ctrl+a d (detach the session)
  • screen -ls (list session)
  • screen -r session name (attach to a detach session)
  • screen -rD session name (attach to an active session)

Read RPM content

  • rpm -qlp rpm-name

Find a directory or a file

  • find / -name directory-name -type d
  • find / -name file-name -type f

How do I extract a gz file?

  • $ gunzip file.gz
  • $ gzip -d file.gz

How do I extract a tar.gz or .tgz file?

  • $ gunzip < file.tar.gz | tar xvf -
  • $ gunzip < file.tgz | tar xvf -
  • $ tar xvzf file.tar.gz
  • $ tar xvzf file.tgz

Processing the delimited files

  • cut -d: -f1,3  /etc/passwd
  • awk -F':' '{print $1,$3}' /etc/passwd

Copy files from remote machine

  • rcp -rp username@hostname(or IP):/path localPath (r: recursive, p: preserve)
  • scp is similar and based on SSH

Check HDFS system

  • bin/hadoop dfsadmin -report
    Configured Capacity: 603258257408 (561.83 GB)
    Present Capacity: 455640743936 (424.35 GB)
    DFS Remaining: 455623647232 (424.33 GB)
    DFS Used: 17096704 (16.3 MB)
    DFS Used%: 0%
    Under replicated blocks: 0
    Blocks with corrupt replicas: 0
    Missing blocks: 0
    
    -------------------------------------------------
    Datanodes available: 2 (2 total, 0 dead)
    
    Name: 192.168.241.22:50010
    Decommission Status : Normal
    Configured Capacity: 452167245824 (421.11 GB)
    DFS Used: 8548352 (8.15 MB)
    Non DFS Used: 30800699392 (28.69 GB)
    DFS Remaining: 421357998080(392.42 GB)
    DFS Used%: 0%
    DFS Remaining%: 93.19%
    Last contact: Thu Apr 21 09:59:01 PDT 2011
    
    Name: 192.168.241.23:50010
    Decommission Status : Normal
    Configured Capacity: 151091011584 (140.71 GB)
    DFS Used: 8548352 (8.15 MB)
    Non DFS Used: 116816814080 (108.79 GB)
    DFS Remaining: 34265649152(31.91 GB)
    DFS Used%: 0.01%
    DFS Remaining%: 22.68%
    Last contact: Thu Apr 21 09:59:02 PDT 2011
    

Others

DistributedCache: an incomplete guide

“DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications.” Here are some use cases: you can cache a stop-word list when processing text files in information retrieval, or cache the smaller file when doing a semi-join of files.

But things are quite tricky across different versions of Hadoop. I tested 0.18.3, 0.20.2 and 0.21.0 to provide some practical guidance.

Hadoop 0.18.3

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class checker extends Configured implements Tool {
 public static String localFile = "/home/hadoop/Desktop/data/TLD/google-ip-valid.txt";
 public static String hdfsFile = "/home/hadoop/googleBL/google-ip-valid.txt";

 public static class MapClass extends MapReduceBase implements
 Mapper<LongWritable, Text, Text, NullWritable> {

 private HashSet<String> blIP = new HashSet<String>();

 static String ValidIpAddressRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";

public void configure(JobConf job) {
 System.out.println("inside configure()");
 // blIP.add("98.202.86.206");
 try {
 Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);

 if (cacheFiles != null && cacheFiles.length > 0) {
 System.out.println("Inside setup(): "
 + cacheFiles[0].toString());

 String line;

 BufferedReader cacheReader = new BufferedReader(
 new FileReader(cacheFiles[0].toString()));
 try {
 while ((line = cacheReader.readLine()) != null) {
 blIP.add(line.trim());
 }
 } finally {
 cacheReader.close();
 }
 }
 } catch (IOException e) {
 System.err.println("Exception reading DistributedCache: " + e);
 }
 }

 public void map(LongWritable key, Text value,
 OutputCollector<Text, NullWritable> output, Reporter reporter)
 throws IOException {
 String[] fields = value.toString().split(" ");
 if (fields.length == 3) {

 String last = fields[2].trim();
 System.out.println(value + " : " + last);
 if (last.matches(ValidIpAddressRegex) && blIP.contains(last)) {
 output.collect(new Text(value), NullWritable.get());
 }
 }
 }
 }

 public int run(String[] args) throws Exception {
 Configuration conf = getConf();

 JobConf job = new JobConf(conf, checker.class);

 // Path cacheFile = new Path(args[2]);
 // DistributedCache.addCacheFile(cacheFile.toUri(), job);

 this.cacheFile(job);

 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(NullWritable.class);

 job.setMapperClass(MapClass.class);
 job.setNumReduceTasks(1);
 // job.setReducerClass(Reduce.class);

 job.setInputFormat(TextInputFormat.class);
 job.setOutputFormat(TextOutputFormat.class);

 Path in = new Path(args[0]);
 Path out = new Path(args[1]);
 FileInputFormat.addInputPath(job, in);
 FileOutputFormat.setOutputPath(job, out);

 JobClient.runJob(job);

 return 0;
 }

 void cacheFile(JobConf conf) throws IOException {
 FileSystem fs = FileSystem.get(conf);
 Path hdfsPath = new Path(hdfsFile);

 // upload the file to hdfs. Overwrite any existing copy.
 fs.copyFromLocalFile(false, true, new Path(localFile), hdfsPath);

 DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
 }

 /**
 * The main method.
 *
 * @param args
 *            the arguments
 * @throws Exception
 *             the exception
 */
 public static void main(String[] args) throws Exception {
 String in = "/home/hadoop/testTLD";
 String out = "/home/hadoop/tld-ip-bl";

 if (args.length == 0) {
 args = new String[2];
 args[0] = in;
 args[1] = out;
 }

 int res = ToolRunner.run(new Configuration(), new checker(), args);

 System.exit(res);
 }
}

Hadoop 0.20.2

According to Reference 1,  “DistributedCache is not changed to use new api in branch 0.20. The change is done in only from branch 0.21. See MAPREDUCE-898 ( https://issues.apache.org/jira/browse/MAPREDUCE-898).”

But according to my test, even the code that runs successfully on 0.18.3 (the previous listing) still cannot run on 0.20.2. I guess we need to apply the patch.

Hadoop 0.21.0 (not working)

Note: 0.21.0 introduces a new Cluster class, and the cache files are now handled through the Job and context APIs instead of the old DistributedCache class, which is deprecated! I have not figured out why this version did not work!

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNew {

 public static class TokenizerMapper extends
 Mapper<Object, Text, Text, IntWritable> {

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public static HashSet<String> stopWord = new HashSet<String>();

 protected void setup(Context context) throws IOException,
 InterruptedException {
 try {
 URI[] uris = context.getCacheFiles();

 if (uris != null && uris.length > 0) {
 System.out.println("Inside setup(): " + uris[0].toString());

 String line;

 BufferedReader cacheReader = new BufferedReader(
 new FileReader(uris[0].toString()));
 try {
 while ((line = cacheReader.readLine()) != null) {
 stopWord.add(line.trim());
 }
 } finally {
 cacheReader.close();
 }
 }
 } catch (Exception e) {
 e.printStackTrace();
 }

 }

 public void map(Object key, Text value, Context context)
 throws IOException, InterruptedException {
 StringTokenizer itr = new StringTokenizer(value.toString());
 while (itr.hasMoreTokens()) {
 String token = itr.nextToken();

 // check the stop word here!
 if (stopWord.contains(token.trim().toLowerCase()))
 continue;
 else {
 word.set(token);
 context.write(word, one);
 }

 }
 }
 }

 public static class IntSumReducer extends
 Reducer<Text, IntWritable, Text, IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values,
 Context context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable val : values) {
 sum += val.get();
 }
 result.set(sum);
 context.write(key, result);
 }
 }

 public static void main(String[] args) throws Exception {
 Configuration conf = new Configuration();

 Cluster cluster = new Cluster(conf);
 Job job = Job.getInstance(cluster);

 String path = "/user/hadoop/WordCount-cache/words";
 job.addCacheFile(new URI(path));

 job.setJarByClass(WordCountNew.class);
 job.setMapperClass(TokenizerMapper.class);
 job.setCombinerClass(IntSumReducer.class);
 job.setReducerClass(IntSumReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);

 String in = "/user/hadoop/WordCount-input";
 String out = "/user/hadoop/WordCount-output";

 FileInputFormat.addInputPath(job, new Path(in));
 FileOutputFormat.setOutputPath(job, new Path(out));

 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}

Reference:

  1. http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html#a722187

Install Hadoop 0.18.3, 0.20.2 and 0.21.0 on the same Linux box

I am using Ubuntu 10.04 LTS (the Lucid Lynx), and for testing purposes it is necessary to run code under different Hadoop versions. More specifically, 0.18 is the stable version for the old API, 0.20 is the stable version for the new API, and 0.21 is the latest test version (0.22 will be the next stable version). One of my tests is related to DistributedCache, which behaves quite differently across versions (please refer to another post for more details).

Installing the three instances of Hadoop is similar to installing just one, so please read this post first: Install Hadoop

All three instances are located under /usr/local:

/usr/local/
├── hadoop (for 0.20.2)
├── hadoop-0.18.3
├── hadoop-0.21.0

The HDFS directory layout is:

/home/hadoop/datastore/
├── 018
├── 020
└── 021

Note: you can only run one instance at a time, even if each instance listens on different ports.

hadoop@fxu-t60:/usr/local/hadoop-0.18.3$ bin/start-all.sh
starting namenode, logging to /usr/local/hadoop-0.18.3/bin/../logs/hadoop-hadoop-namenode-fxu-t60.out
localhost: starting datanode, logging to /usr/local/hadoop-0.18.3/bin/../logs/hadoop-hadoop-datanode-fxu-t60.out
localhost: starting secondarynamenode, logging to /usr/local/hadoop-0.18.3/bin/../logs/hadoop-hadoop-secondarynamenode-fxu-t60.out
starting jobtracker, logging to /usr/local/hadoop-0.18.3/bin/../logs/hadoop-hadoop-jobtracker-fxu-t60.out
localhost: starting tasktracker, logging to /usr/local/hadoop-0.18.3/bin/../logs/hadoop-hadoop-tasktracker-fxu-t60.out
hadoop@fxu-t60:/usr/local/hadoop-0.18.3$ jps
7873 NameNode
8294 JobTracker
8541 Jps
8472 TaskTracker
4911 org.eclipse.equinox.launcher_1.0.201.R35x_v20090715.jar
8051 DataNode
8222 SecondaryNameNode
5261

hadoop@fxu-t60:/usr/local/hadoop$ bin/start-all.sh
namenode running as process 7873. Stop it first.
localhost: datanode running as process 8051. Stop it first.
localhost: secondarynamenode running as process 8222. Stop it first.
jobtracker running as process 8294. Stop it first.
localhost: tasktracker running as process 8472. Stop it first.

Extract IP address from record using MapReduce Streaming

The goal is to extract IP addresses from the input files. The script reads each line and uses a regular expression to pull out the IP address.

#!/usr/bin/python

import re
import sys

# Use "(?:" to suppress capturing parentheses.
# Use "\." to match a literal dot (suppress the regex meaning of ".").
# "{3}": look for 3 repetitions (a dot followed by a number, 3 times).
my_regex = r'[0-9]+(?:\.[0-9]+){3}'

for line in sys.stdin:
    IPs = re.findall(my_regex, line)
    if len(IPs) == 1:
        print IPs[0]

Run the code:

hadoop@fxu-t60:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-0.20.2-streaming.jar -D mapred.reduce.tasks=1 -input /home/hadoop/google -output /home/hadoop/google-ip -mapper 'googleIP.py' -file ~/Desktop/data/TLD/googleIP.py