Time series processing using Hadoop

To me, the intuitive examples of time series data are Apache web logs and stock price histories, and Hadoop is a natural tool for analyzing them at scale. “In statistics, signal processing, econometrics and mathematical finance, a time series is a sequence of data points, measured typically at successive times spaced at uniform time intervals.” [1]

The implementation in [2] has two versions: one uses the MapReduce shuffle with a secondary sort to deliver the data points in time order, and the other sorts all data points in a priority queue inside the reducer. The first approach is clearly more scalable, since it never has to hold an entire series in memory, so I will analyze it in detail.
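For contrast, the priority-queue version can be sketched as below. This is a minimal stand-alone illustration, not the code from [2]: the DataPoint class and method names here are hypothetical stand-ins for TimeseriesDataPoint, and the point is only that the whole series must fit in memory to be heap-sorted.

```java
import java.util.PriorityQueue;

// Hypothetical stand-in for TimeseriesDataPoint: a timestamp plus a price.
class DataPoint {
    final long timestamp;
    final double value;
    DataPoint(long timestamp, double value) { this.timestamp = timestamp; this.value = value; }
}

public class HeapSortDemo {
    // Sort unordered data points by timestamp with a heap, the way the
    // NoShuffleSort_* variant must do inside a single reducer.
    static double[] valuesInTimeOrder(DataPoint[] points) {
        PriorityQueue<DataPoint> heap =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
        for (DataPoint p : points) heap.add(p); // O(n log n) total, all in memory
        double[] out = new double[points.length];
        for (int i = 0; i < out.length; i++) out[i] = heap.poll().value;
        return out;
    }

    public static void main(String[] args) {
        DataPoint[] pts = {
            new DataPoint(3, 30.0), new DataPoint(1, 10.0), new DataPoint(2, 20.0)
        };
        double[] sorted = valuesInTimeOrder(pts);
        System.out.println(sorted[0] + "," + sorted[1] + "," + sorted[2]); // 10.0,20.0,30.0
    }
}
```

The secondary-sort version avoids this in-memory sort entirely by letting the shuffle deliver the points already ordered.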

hadoop@fxu-t60:~/workspace/TimeSeries/src/tv/floe/caduceus/hadoop/movingaverage$ tree
.
├── CompositeKeyComparator.java
├── MovingAverageJob.java
├── MovingAverageMapper.java
├── MovingAverageReducer.java
├── NaturalKeyGroupingComparator.java
├── NaturalKeyPartitioner.java
├── NoShuffleSort_MovingAverageJob.java
├── NoShuffleSort_MovingAverageMapper.java
├── NoShuffleSort_MovingAverageReducer.java
├── SlidingWindow.java
├── tests
│   ├── TestPreSortHeap.java
│   ├── TestSlidingWindow.java
│   └── TestYahooStockDataPoint.java
├── TimeseriesDataPoint.java
├── TimeseriesKey.java
└── YahooStockDataPoint.java

1 directory, 16 files

The Mapper
The input is a plain Yahoo stock record such as “NYSE,AA,2008-02-03,38.85,39.28,38.26,38.37,11279900,8.37”. The output key is of type “TimeseriesKey”, which contains the stock symbol and the time-stamp. The output value is of type “TimeseriesDataPoint”, which contains the time-stamp and the stock price.
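A minimal sketch of the map-side parsing, assuming the Yahoo CSV layout exchange,symbol,date,open,high,low,close,volume,adjClose. The class and field names below are illustrative, not the actual MovingAverageMapper / YahooStockDataPoint code:

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;

public class StockRecordParser {
    static long parseTimestamp(String date) throws ParseException {
        return new SimpleDateFormat("yyyy-MM-dd").parse(date).getTime();
    }

    // Returns {symbol, timestamp(ms), adjClose}, loosely packed for the demo.
    static Object[] parse(String line) throws ParseException {
        String[] f = line.split(",");
        String symbol = f[1];                       // natural key: the stock
        long ts = parseTimestamp(f[2]);             // secondary key: the date
        double adjClose = Double.parseDouble(f[8]); // value carried to the reducer
        return new Object[] { symbol, ts, adjClose };
    }

    public static void main(String[] args) throws ParseException {
        Object[] kv = parse("NYSE,AA,2008-02-03,38.85,39.28,38.26,38.37,11279900,8.37");
        System.out.println(kv[0] + " " + kv[2]); // AA 8.37
    }
}
```

The symbol and the time-stamp go into the composite TimeseriesKey, while the time-stamp and price go into the TimeseriesDataPoint value.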
The Reducer
The reducer combines the secondary sort with a sliding window to produce the moving average. The window covers 30 days and slides forward by one day; all data points inside the window are averaged to smooth the window's last data point.
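The sliding-window logic can be sketched as follows. This is a simplified stand-in for the project's SlidingWindow class, assuming values arrive already sorted by time-stamp (which the shuffle guarantees in the secondary-sort version); the window is measured in data points here rather than calendar days:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SlidingWindowAverage {
    private final int windowSize;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    SlidingWindowAverage(int windowSize) { this.windowSize = windowSize; }

    // Feed the next point in time order; returns the moving average that
    // smooths the newest point once the window is full, else null.
    Double add(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > windowSize) sum -= window.removeFirst(); // slide by one
        return window.size() == windowSize ? sum / windowSize : null;
    }

    public static void main(String[] args) {
        SlidingWindowAverage ma = new SlidingWindowAverage(3);
        System.out.println(ma.add(1.0)); // null: window not yet full
        System.out.println(ma.add(2.0)); // null
        System.out.println(ma.add(3.0)); // 2.0
        System.out.println(ma.add(6.0)); // ~3.667, the average of {2, 3, 6}
    }
}
```

Keeping a running sum makes each slide O(1) instead of re-summing the whole window.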
The Job

......

conf.setMapOutputKeyClass(TimeseriesKey.class);
conf.setMapOutputValueClass(TimeseriesDataPoint.class);

conf.setMapperClass(MovingAverageMapper.class);
conf.setReducerClass(MovingAverageReducer.class);

conf.setPartitionerClass(NaturalKeyPartitioner.class);
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);
conf.setOutputValueGroupingComparator(NaturalKeyGroupingComparator.class);

......

The partitioner: the key contains both the stock symbol and the time-stamp, but it should be partitioned by stock symbol only, so that all points of one stock reach the same reducer!

public class NaturalKeyPartitioner implements
        Partitioner<TimeseriesKey, TimeseriesDataPoint> {

    @Override
    public int getPartition(TimeseriesKey key, TimeseriesDataPoint value,
            int numPartitions) {
        // Partition on the "natural" key (the stock symbol) only, ignoring
        // the time-stamp. Masking with Integer.MAX_VALUE keeps the result
        // non-negative even if the multiplied hash overflows (Math.abs is
        // still negative for Integer.MIN_VALUE).
        return (key.getGroup().hashCode() * 127 & Integer.MAX_VALUE) % numPartitions;
    }

    @Override
    public void configure(JobConf conf) {
        // No configuration needed.
    }
}

JobConf.setOutputKeyComparatorClass(Class): the mapper's output keys are sorted first by stock symbol and then by time-stamp, which guarantees the date order within each time series.

public class CompositeKeyComparator extends WritableComparator {

    protected CompositeKeyComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        TimeseriesKey ip1 = (TimeseriesKey) w1;
        TimeseriesKey ip2 = (TimeseriesKey) w2;

        // Primary order: stock symbol.
        int cmp = ip1.getGroup().compareTo(ip2.getGroup());
        if (cmp != 0) {
            return cmp;
        }
        // Secondary order: time-stamp.
        return ip1.getTimestamp() == ip2.getTimestamp() ? 0
                : (ip1.getTimestamp() < ip2.getTimestamp() ? -1 : 1);
    }
}

JobConf.setOutputValueGroupingComparator(Class): this comparator completes the secondary sort on the reducer side by grouping keys on the stock symbol only, so a single reduce() call receives every data point of one stock, already in time order.

public class NaturalKeyGroupingComparator extends WritableComparator {

    protected NaturalKeyGroupingComparator() {
        super(TimeseriesKey.class, true);
    }

    @Override
    public int compare(WritableComparable o1, WritableComparable o2) {
        TimeseriesKey tsK1 = (TimeseriesKey) o1;
        TimeseriesKey tsK2 = (TimeseriesKey) o2;

        // Group by the stock symbol only, ignoring the time-stamp.
        return tsK1.getGroup().compareTo(tsK2.getGroup());
    }
}

At the end, the project “zohmg” [3] is used to store the time series data. The paper [5] is also very interesting.

Links:

  1. http://en.wikipedia.org/wiki/Time_series
  2. https://github.com/jpatanooga/Caduceus/tree/master/src/tv/floe/caduceus/hadoop/movingaverage
  3. https://github.com/zohmg/zohmg/blob/master/README
  4. http://hadoop.apache.org/common/docs/r0.20.2/mapred_tutorial.html
  5. http://www-scf.usc.edu/~selinach/segmentation-slides.pdf