DistributedCache: an incomplete guide

“DistributedCache is a facility provided by the Map-Reduce framework to cache files (text, archives, jars etc.) needed by applications.” Here are some use cases: you can cache a stopword list when processing text files in information retrieval, or cache the smaller file during a semi-join of two files.

But things get quite tricky across different versions of Hadoop. I tested 0.18.3, 0.20.2 and 0.21.0 to provide some practical guidance.
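
Whatever the version, the usage boils down to two calls: register an HDFS file on the job in the driver, then ask for the node-local copy inside the task. A minimal sketch with the old org.apache.hadoop.mapred API (MyJob and the HDFS path are just placeholders); the full programs below flesh this out:

// Driver side: the file must already sit on HDFS (or be copied there first).
JobConf job = new JobConf(MyJob.class);
DistributedCache.addCacheFile(new Path("/user/hadoop/cache/stopwords.txt").toUri(), job);

// Task side (inside configure()): the framework copies the file onto the
// task tracker's local disk, and getLocalCacheFiles() hands back the local paths.
Path[] localFiles = DistributedCache.getLocalCacheFiles(job);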

Hadoop 0.18.3

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashSet;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class checker extends Configured implements Tool {
 public static String localFile = "/home/hadoop/Desktop/data/TLD/google-ip-valid.txt";
 public static String hdfsFile = "/home/hadoop/googleBL/google-ip-valid.txt";

 public static class MapClass extends MapReduceBase implements
 Mapper<LongWritable, Text, Text, NullWritable> {

 private HashSet<String> blIP = new HashSet<String>();

 static String ValidIpAddressRegex = "^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])$";

public void configure(JobConf job) {
 System.out.println("inside configure()");
 // blIP.add("98.202.86.206");
 try {
 Path[] cacheFiles = DistributedCache.getLocalCacheFiles(job);

 if (cacheFiles != null && cacheFiles.length > 0) {
 System.out.println("Inside setup(): "
 + cacheFiles[0].toString());

 String line;

 BufferedReader cacheReader = new BufferedReader(
 new FileReader(cacheFiles[0].toString()));
 try {
 while ((line = cacheReader.readLine()) != null) {
 blIP.add(line.trim());
 }
 } finally {
 cacheReader.close();
 }
 }
 } catch (IOException e) {
 System.err.println("Exception reading DistribtuedCache: " + e);
 }
 }

 public void map(LongWritable key, Text value,
 OutputCollector<Text, NullWritable> output, Reporter reporter)
 throws IOException {
 String[] fields = value.toString().split(" ");
 if (fields.length == 3) {

 String last = fields[2].trim();
 System.out.println(value + " : " + last);
 if (last.matches(ValidIpAddressRegex) && blIP.contains(last)) {
 output.collect(new Text(value), NullWritable.get());
 }
 }
 }
 }

 public int run(String[] args) throws Exception {
 Configuration conf = getConf();

 JobConf job = new JobConf(conf, checker.class);

 // Path cacheFile = new Path(args[2]);
 // DistributedCache.addCacheFile(cacheFile.toUri(), job);

 this.cacheFile(job);

 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(NullWritable.class);

 job.setMapperClass(MapClass.class);
 job.setNumReduceTasks(1);
 // job.setReducerClass(Reduce.class);

 job.setInputFormat(TextInputFormat.class);
 job.setOutputFormat(TextOutputFormat.class);

 Path in = new Path(args[0]);
 Path out = new Path(args[1]);
 FileInputFormat.addInputPath(job, in);
 FileOutputFormat.setOutputPath(job, out);

 JobClient.runJob(job);

 return 0;
 }

 void cacheFile(JobConf conf) throws IOException {
 FileSystem fs = FileSystem.get(conf);
 Path hdfsPath = new Path(hdfsFile);

 // upload the file to hdfs. Overwrite any existing copy.
 fs.copyFromLocalFile(false, true, new Path(localFile), hdfsPath);

 DistributedCache.addCacheFile(hdfsPath.toUri(), conf);
 }

 /**
 * The main method.
 *
 * @param args
 *            the arguments
 * @throws Exception
 *             the exception
 */
 public static void main(String[] args) throws Exception {
 String in = "/home/hadoop/testTLD";
 String out = "/home/hadoop/tld-ip-bl";

 if (args.length == 0) {
 args = new String[2];
 args[0] = in;
 args[1] = out;
 }

 int res = ToolRunner.run(new Configuration(), new checker(), args);

 System.exit(res);
 }
}

Hadoop 0.20.2

According to Reference 1, “DistributedCache is not changed to use new api in branch 0.20. The change is done in only from branch 0.21. See MAPREDUCE-898 (https://issues.apache.org/jira/browse/MAPREDUCE-898).”

But according to my test, even the code that runs successfully on 0.18.3 (the previous example) still cannot run on 0.20.2. I guess we need to apply the patch.
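
That said, there is a workaround often suggested for 0.20.x that I have not verified myself: even with the new org.apache.hadoop.mapreduce API, the old DistributedCache static methods still accept a plain Configuration, so you can register the file against job.getConfiguration() in the driver and fetch the local copy from context.getConfiguration() in the mapper's setup(). A sketch, with a placeholder HDFS path:

// Driver (0.20.x, new API). Note the file is registered on the Job's own
// Configuration, because new Job(conf) takes a copy of conf.
Configuration conf = new Configuration();
Job job = new Job(conf, "cache demo");
DistributedCache.addCacheFile(new URI("/user/hadoop/cache/stopwords.txt"),
        job.getConfiguration());

// Mapper side, inside setup(Context context): fetch the node-local copies.
Path[] localFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());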

Hadoop 0.21.0 (not working)

Note: 0.21.0 introduces a new Cluster class, and the old DistributedCache calls and Job constructors are deprecated in favor of Job.getInstance() and job.addCacheFile(). I have not figured out why this code did not work! (A guess is sketched after the listing.)

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountNew {

 public static class TokenizerMapper extends
 Mapper<Object, Text, Text, IntWritable> {

 private final static IntWritable one = new IntWritable(1);
 private Text word = new Text();

 public static HashSet<String> stopWord = new HashSet<String>();

 protected void setup(Context context) throws IOException,
 InterruptedException {
 try {
 URI[] uris = context.getCacheFiles();

 if (uris != null && uris.length > 0) {
 System.out.println("Inside setup(): " + uris[0].toString());

 String line;

 // note: this hands the URI string to FileReader as if it were a local path
 BufferedReader cacheReader = new BufferedReader(
 new FileReader(uris[0].toString()));
 try {
 while ((line = cacheReader.readLine()) != null) {
 stopWord.add(line.trim());
 }
 } finally {
 cacheReader.close();
 }
 }
 } catch (Exception e) {
 e.printStackTrace();
 }

 }

 public void map(Object key, Text value, Context context)
 throws IOException, InterruptedException {
 StringTokenizer itr = new StringTokenizer(value.toString());
 while (itr.hasMoreTokens()) {
 String token = itr.nextToken();

 // check the stop word here!
 if (stopWord.contains(token.trim().toLowerCase()))
 continue;
 else {
 word.set(token);
 context.write(word, one);
 }

 }
 }
 }

 public static class IntSumReducer extends
 Reducer<Text, IntWritable, Text, IntWritable> {
 private IntWritable result = new IntWritable();

 public void reduce(Text key, Iterable<IntWritable> values,
 Context context) throws IOException, InterruptedException {
 int sum = 0;
 for (IntWritable val : values) {
 sum += val.get();
 }
 result.set(sum);
 context.write(key, result);
 }
 }

 public static void main(String[] args) throws Exception {
 Configuration conf = new Configuration();

 Cluster cluster = new Cluster(conf);
 Job job = Job.getInstance(cluster);

 String path = "/user/hadoop/WordCount-cache/words";
 job.addCacheFile(new URI(path));

 job.setJarByClass(WordCountNew.class);
 job.setMapperClass(TokenizerMapper.class);
 job.setCombinerClass(IntSumReducer.class);
 job.setReducerClass(IntSumReducer.class);
 job.setOutputKeyClass(Text.class);
 job.setOutputValueClass(IntWritable.class);

 String in = "/user/hadoop/WordCount-input";
 String out = "/user/hadoop/WordCount-output";

 FileInputFormat.addInputPath(job, new Path(in));
 FileOutputFormat.setOutputPath(job, new Path(out));

 System.exit(job.waitForCompletion(true) ? 0 : 1);
 }
}
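
For what it is worth, one thing I would double-check (this is a guess, not something I have confirmed): getCacheFiles() returns the URIs exactly as they were added, i.e. the HDFS location, and the setup() above feeds uris[0].toString() straight into FileReader, which expects a local filesystem path. A sketch of a setup() that opens the cached file through the FileSystem API instead (it additionally needs java.io.InputStreamReader and org.apache.hadoop.fs.FileSystem imported):

 protected void setup(Context context) throws IOException, InterruptedException {
  URI[] uris = context.getCacheFiles();
  if (uris != null && uris.length > 0) {
   // open the file via the job's FileSystem rather than java.io.FileReader
   FileSystem fs = FileSystem.get(context.getConfiguration());
   BufferedReader cacheReader = new BufferedReader(
     new InputStreamReader(fs.open(new Path(uris[0].getPath()))));
   try {
    String line;
    while ((line = cacheReader.readLine()) != null) {
     stopWord.add(line.trim());
    }
   } finally {
    cacheReader.close();
   }
  }
 }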

Reference:

  1. http://lucene.472066.n3.nabble.com/Distributed-Cache-with-New-API-td722187.html#a722187