Hadoop Ecosystem

Seven years ago, the Hadoop ecosystem was under rapid development. Now many projects are mature enough for production deployment.

[Figure: the Hadoop ecosystem. Credit to Mercy (Ponnupandy) Beckham]

Here are my personal picks to get you started on your Hadoop journey.

“The Apache Hadoop software library is a framework that allows for the distributed processing of large data sets across clusters of computers using simple programming models. It is designed to scale up from single servers to thousands of machines, each offering local computation and storage. Rather than rely on hardware to deliver high-availability, the library itself is designed to detect and handle failures at the application layer, so delivering a highly-available service on top of a cluster of computers, each of which may be prone to failures.”

The project includes these modules:

Hadoop Common: The common utilities that support the other Hadoop modules.
Hadoop Distributed File System (HDFS™): A distributed file system that provides high-throughput access to application data.
Hadoop YARN: A framework for job scheduling and cluster resource management.
Hadoop MapReduce: A YARN-based system for parallel processing of large data sets.

  • YARN (Distributed Resource Management): Part of the core Hadoop project, YARN is the architectural center of Hadoop that allows multiple data processing engines such as interactive SQL, real-time streaming, data science and batch processing to handle data stored in a single platform, unlocking an entirely new approach to analytics. YARN is the foundation of the new generation of Hadoop and is enabling organizations everywhere to realize a modern data architecture.
  • Spark (Distributed Programming): Apache Spark™ is a fast and general engine for large-scale data processing. Spark provides a simple and expressive programming model that supports a wide range of applications, including ETL, machine learning, stream processing, and graph computation. (A short word-count sketch in Java follows this list.)
  • Tez (Distributed Programming): Apache™ Tez is an extensible framework for building high performance batch and interactive data processing applications, coordinated by YARN in Apache Hadoop. Tez improves the MapReduce paradigm by dramatically improving its speed, while maintaining MapReduce’s ability to scale to petabytes of data. Important Hadoop ecosystem projects like Apache Hive and Apache Pig use Apache Tez, as do a growing number of third party data access applications developed for the broader Hadoop ecosystem.
  • Hive (SQL-On-Hadoop): The Apache Hive ™ data warehouse software facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. Structure can be projected onto data already in storage. A command line tool and JDBC driver are provided to connect users to Hive.
  • HBase (Column Data Model NoSQL): Apache HBase™ is the Hadoop database, a distributed, scalable, big data store.
  • Cassandra (Column Data Model NoSQL): The Apache Cassandra database is the right choice when you need scalability and high availability without compromising performance. Linear scalability and proven fault-tolerance on commodity hardware or cloud infrastructure make it the perfect platform for mission-critical data. Cassandra’s support for replicating across multiple datacenters is best-in-class, providing lower latency for your users and the peace of mind of knowing that you can survive regional outages.
  • MongoDB (Document Data Model NoSQL): MongoDB is a document database with the scalability and flexibility that you want with the querying and indexing that you need. MongoDB stores data in flexible, JSON-like documents.
  • Redis (Key-Value Data Model NoSQL): Redis is an open source (BSD licensed), in-memory data structure store, used as a database, cache and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs and geospatial indexes with radius queries. Redis has built-in replication, Lua scripting, LRU eviction, transactions and different levels of on-disk persistence, and provides high availability via Redis Sentinel and automatic partitioning with Redis Cluster.
  • Flume (Data Ingestion): Apache Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.
  • Sqoop (Data Ingestion): Apache Sqoop™ is a tool designed for efficiently transferring bulk data between Apache Hadoop and structured datastores such as relational databases.
  • Kafka (Data Ingestion): Kafka™ is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.
  • Thrift (Service Programming): The Apache Thrift software framework, for scalable cross-language services development, combines a software stack with a code generation engine to build services that work efficiently and seamlessly between C++, Java, Python, PHP, Ruby, Erlang, Perl, Haskell, C#, Cocoa, JavaScript, Node.js, Smalltalk, OCaml and Delphi and other languages.
  • ZooKeeper (Service Programming): A high-performance coordination service for distributed applications.
  • Mahout (Machine Learning): The Apache Mahout™ project’s goal is to build an environment for quickly creating scalable performant machine learning applications.
  • Oozie (Scheduling): Oozie is a workflow scheduler system to manage Apache Hadoop jobs.
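
To get a concrete feel for the programming model mentioned in the Spark entry above, here is a minimal word-count sketch using Spark's Java API. It is only an illustration: it assumes Spark 2.x running with a local master, and the input path "input.txt" is a placeholder (it could just as well be an HDFS URI).

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        // Run locally with all cores; on a real cluster this would be submitted via spark-submit.
        SparkConf conf = new SparkConf().setAppName("WordCount").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // "input.txt" is a placeholder path.
        JavaRDD<String> lines = sc.textFile("input.txt");

        // Split lines into words, map each word to (word, 1), then sum the counts per word.
        JavaPairRDD<String, Integer> counts = lines
                .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                .mapToPair(word -> new Tuple2<>(word, 1))
                .reduceByKey((a, b) -> a + b);

        counts.collect().forEach(t -> System.out.println(t._1() + ": " + t._2()));
        sc.close();
    }
}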

Reference:

  1. http://hadoopecosystemtable.github.io
  2. https://mydataexperiments.com/2017/04/11/hadoop-ecosystem-a-quick-glance/

Enterprise Hadoop: The Ecosystem

I just finished the training “Designing and Building Big Data Applications” from Cloudera. Over the last two years, the Hadoop world has become more mature. What surprises me is how many open source projects there are, which is also confusing. If you plan to adopt Hadoop, what is the best technology stack to use?

Here is Hortonworks’ view. It is a super fan of Apache projects, but I didn’t see a clean enterprise solution. The business model is more like Debian Linux.

[Figure: Hortonworks’ view of the enterprise Hadoop ecosystem]

How about Cloudera? It has CDH, especially the Data Hub edition, and runs a business model similar to Red Hat’s. It builds on Apache projects but adds lots of enhancements. See Ref [5] for what’s in the latest CDH 5.0.3.

The Hadoop world is still growing lightning fast; let’s see what happens in the next two years.

Pay attention to Spark

Reference:

  1. http://cloudera.com/content/cloudera/en/training/courses/big-data-applications-training.html
  2. http://www.cloudera.com/content/cloudera/en/products-and-services/cloudera-enterprise.html
  3. http://www.cloudera.com/content/cloudera/en/products-and-services/product-comparison.html
  4. http://hortonworks.com/hadoop/
  5. http://www.cloudera.com/content/cloudera-content/cloudera-docs/CDH5/latest/CDH-Version-and-Packaging-Information/cdhvd_cdh_package_tarball.html

java.lang.ClassNotFoundException at Map-Reduce Job

The problem: I have packaged all third-party libraries into the lib directory inside the jar file, but I still receive java.lang.ClassNotFoundException. All libraries need to be distributed to every task node. In general, there are several ways to supply the jar libraries:

  1. package them into the job jar file
  2. use the “-libjars” command line option of “hadoop jar”
  3. load the libraries onto the nodes manually

The first method works well for a manually installed Hadoop system, but failed on CDH4. My guess is that CDH has its own wrapper around Hadoop. So the simple solution is the second method: hadoop jar Model.jar hadoop.mr.ElementDriver -libjars lib/your_library.jar /model/input /model/output_element. Separate multiple libraries with “,”.
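
Note that the “-libjars” option only takes effect if the driver goes through Hadoop’s GenericOptionsParser, which is what ToolRunner does. Below is a minimal sketch of such a driver, mirroring the class and path names in the command above; the mapper and reducer classes are placeholders and are not shown.

package hadoop.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ElementDriver extends Configured implements Tool {

    @Override
    public int run(String[] args) throws Exception {
        // getConf() already carries the settings produced by -libjars.
        Configuration conf = getConf();
        Job job = new Job(conf, "element job");
        job.setJarByClass(ElementDriver.class);
        // job.setMapperClass(ElementMapper.class);   // placeholder
        // job.setReducerClass(ElementReducer.class); // placeholder
        FileInputFormat.addInputPath(job, new Path(args[0]));   // e.g. /model/input
        FileOutputFormat.setOutputPath(job, new Path(args[1])); // e.g. /model/output_element
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses the generic options (including -libjars) before calling run().
        System.exit(ToolRunner.run(new Configuration(), new ElementDriver(), args));
    }
}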

Reference:

  1. http://blog.cloudera.com/blog/2011/01/how-to-include-third-party-libraries-in-your-map-reduce-job/

Migrate Java+MySQL to Hadoop

The problem: we have an existing Java project with MySQL as the back-end storage. The data is now too big to process in memory. The solution is to use Hadoop.

How to do it, in a nutshell (a command sketch follows the list):

  • load data from MySQL into Hive using Sqoop
  • perform JOIN on different tables using Hive
  • write Java code to process the join result using Hadoop
  • analyze the output using other tools like R
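
Here is a rough command sketch of the first two steps. The connection string, credentials, and the table and column names (orders, customers, order_id, ...) are made up for illustration; adjust them to your own schema.

# Import two MySQL tables into Hive (hypothetical host, database and table names)
sqoop import --connect jdbc:mysql://dbhost/mydb --username dbuser -P \
  --table orders --hive-import --hive-table orders
sqoop import --connect jdbc:mysql://dbhost/mydb --username dbuser -P \
  --table customers --hive-import --hive-table customers

# Join the two tables in Hive and materialize the result for later processing
hive -e "
CREATE TABLE order_details AS
SELECT o.order_id, c.customer_name, o.amount
FROM orders o JOIN customers c ON (o.customer_id = c.customer_id);
"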

HTablePool example in Java

As the name suggests, the pool lets different HTable instances share resources such as the ZooKeeper connection.
Suppose you have the following table:

hbase(main):002:0> scan 'blogposts'
ROW                                       COLUMN+CELL
post1                                    column=image:bodyimage, timestamp=1333409506149, value=image2.jpg
post1                                    column=image:header, timestamp=1333409504678, value=image1.jpg
post1                                    column=post:author, timestamp=1333409504583, value=The Author
post1                                    column=post:body, timestamp=1333409504642, value=This is a blog post
post1                                    column=post:title, timestamp=1333409504496, value=Hello World
1 row(s) in 7.1920 seconds

Java Example Code:

import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

import org.junit.*;
import static org.junit.Assert.*;

public class HTablePoolTest {

    protected static String TEST_TABLE_NAME = "blogposts";
    protected static String ROW1_STR = "post1";
    protected static String COLFAM1_STR = "image";
    protected static String QUAL1_STR = "bodyimage";

    private final static byte[] ROW1 = Bytes.toBytes(ROW1_STR);
    private final static byte[] COLFAM1 = Bytes.toBytes(COLFAM1_STR);
    private final static byte[] QUAL1 = Bytes.toBytes(QUAL1_STR);

    private final static int MAX = 10;
    private static HTablePool pool;

    @Before
    public void runBeforeClass() throws IOException {
        Configuration conf = HBaseConfiguration.create();
        pool = new HTablePool(conf, MAX);

        HTableInterface[] tables = new HTableInterface[10];
        for (int n = 0; n < MAX; n++) {
            tables[n] = pool.getTable(TEST_TABLE_NAME);
        }
        for (HTableInterface table : tables) {
            table.close();
        }
    }

    @Test
    public void testHTablePool() throws IOException, InterruptedException,
            ExecutionException {

        Callable<Result> callable = new Callable<Result>() {
            public Result call() throws Exception {
                return get();
            }
        };

        FutureTask<Result> task1 = new FutureTask<Result>(callable);

        FutureTask<Result> task2 = new FutureTask<Result>(callable);

        Thread thread1 = new Thread(task1, "THREAD-1");
        thread1.start();
        Thread thread2 = new Thread(task2, "THREAD-2");
        thread2.start();

        Result result1 = task1.get();
        System.out.println("Thread1: "
                + Bytes.toString(result1.getValue(COLFAM1, QUAL1)));
        assertEquals(Bytes.toString(result1.getValue(COLFAM1, QUAL1)),
                "image2.jpg");

        Result result2 = task2.get();
        System.out.println("Thread2: "
                + Bytes.toString(result2.getValue(COLFAM1, QUAL1)));
        assertEquals(Bytes.toString(result2.getValue(COLFAM1, QUAL1)),
                "image2.jpg");
    }

    private Result get() {
        HTableInterface table = pool.getTable(TEST_TABLE_NAME);
        Get get = new Get(ROW1);
        try {
            Result result = table.get(get);
            return result;
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
            return null;
        } finally {
            try {
                table.close();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

}

Reference:

Java Example Code using HBase Data Model Operations

Please refer to the updated version: https://autofei.wordpress.com/2017/05/23/updated-java-example-code-using-hbase-data-model-operations/

The code is based on HBase version 0.92.1

The four primary data model operations are Get, Put, Scan, and Delete. Operations are applied via HTable instances.
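
As a compact summary before the full, commented examples below, the four operations look roughly like this against the 0.92-era HTable API; the table, family, qualifier and row names here are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class DataModelOpsSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "myTable"); // placeholder table name

        // Put: write one cell
        Put put = new Put(Bytes.toBytes("row1"));
        put.add(Bytes.toBytes("fam"), Bytes.toBytes("qual"), Bytes.toBytes("value"));
        table.put(put);

        // Get: read the row back
        Result result = table.get(new Get(Bytes.toBytes("row1")));
        System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("fam"), Bytes.toBytes("qual"))));

        // Scan: iterate over every row in the table
        ResultScanner scanner = table.getScanner(new Scan());
        for (Result r : scanner) {
            System.out.println(r);
        }
        scanner.close();

        // Delete: remove the row
        table.delete(new Delete(Bytes.toBytes("row1")));

        table.close();
    }
}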

First you need to install HBase. For testing, you can install it on a single machine by following this post.

Create a Java project inside Eclipse; the following libraries are needed in the ‘lib’ subdirectory:

hadoop@ubuntu:~/workspace/HBase$ tree lib
lib
├── commons-configuration-1.8.jar
├── commons-lang-2.6.jar
├── commons-logging-1.1.1.jar
├── hadoop-core-1.0.0.jar
├── hbase-0.92.1.jar
├── log4j-1.2.16.jar
├── slf4j-api-1.5.8.jar
├── slf4j-log4j12-1.5.8.jar
└── zookeeper-3.4.3.jar

Library locations

  • copy hbase-0.92.1.jar from the HBase installation directory
  • copy the remaining jar files from the “lib” subdirectory of the HBase installation directory

Then copy your HBase configuration file hbase-site.xml from the “conf” subdirectory of the HBase installation directory into the Java project directory.

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
  <property>
    <name>hbase.rootdir</name>
    <value>file:///home/hduser/hbase</value>
  </property>
</configuration>

The whole directory looks like:

hadoop@ubuntu:~/workspace/HBase$ tree
.
├── bin
│   └── HBaseConnector.class
├── hbase-site.xml
├── lib
│   ├── commons-configuration-1.8.jar
│   ├── commons-lang-2.6.jar
│   ├── commons-logging-1.1.1.jar
│   ├── hadoop-core-1.0.0.jar
│   ├── hbase-0.92.1.jar
│   ├── log4j-1.2.16.jar
│   ├── slf4j-api-1.5.8.jar
│   ├── slf4j-log4j12-1.5.8.jar
│   └── zookeeper-3.4.3.jar
└── src
└── HBaseConnector.java

Open a terminal

  • start HBase in terminal: bin/start-hbase.sh
  • start HBase shell: bin/hbase shell
  • create a table: create 'myLittleHBaseTable', 'myLittleFamily'

Now you can run the code:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseConnector {
    public static void main(String[] args) throws IOException {
        // You need a configuration object to tell the client where to connect.
        // When you create a HBaseConfiguration, it reads in whatever you've set
        // into your hbase-site.xml and in hbase-default.xml, as long as these
        // can be found on the CLASSPATH
        Configuration config = HBaseConfiguration.create();

        // This instantiates an HTable object that connects you to the
        // "myLittleHBaseTable" table.
        HTable table = new HTable(config, "myLittleHBaseTable");

        // To add to a row, use Put. A Put constructor takes the name of the row
        // you want to insert into as a byte array. In HBase, the Bytes class
        // has utility for converting all kinds of java types to byte arrays. In
        // the below, we are converting the String "myLittleRow" into a byte
        // array to use as a row key for our update. Once you have a Put
        // instance, you can adorn it by setting the names of columns you want
        // to update on the row, the timestamp to use in your update, etc.
        // If no timestamp, the server applies current time to the edits.
        Put p = new Put(Bytes.toBytes("myLittleRow"));

        // To set the value you'd like to update in the row 'myLittleRow',
        // specify the column family, column qualifier, and value of the table
        // cell you'd like to update. The column family must already exist
        // in your table schema. The qualifier can be anything.
        // All must be specified as byte arrays as hbase is all about byte
        // arrays. Lets pretend the table 'myLittleHBaseTable' was created
        // with a family 'myLittleFamily'.
        p.add(Bytes.toBytes("myLittleFamily"), Bytes.toBytes("someQualifier"),
                Bytes.toBytes("Some Value"));

        // Once you've adorned your Put instance with all the updates you want
        // to make, to commit it do the following
        // (The HTable#put method takes the Put instance you've been building
        // and pushes the changes you made into hbase)
        table.put(p);

        // Now, to retrieve the data we just wrote. The values that come back
        // are Result instances. Generally, a Result is an object that will
        // package up the hbase return into the form you find most palatable.
        Get g = new Get(Bytes.toBytes("myLittleRow"));
        Result r = table.get(g);
        byte[] value = r.getValue(Bytes.toBytes("myLittleFamily"), Bytes
                .toBytes("someQualifier"));
        // If we convert the value bytes, we should get back 'Some Value', the
        // value we inserted at this location.
        String valueStr = Bytes.toString(value);
        System.out.println("GET: " + valueStr);

        // Sometimes, you won't know the row you're looking for. In this case,
        // you use a Scanner. This will give you cursor-like interface to the
        // contents of the table. To set up a Scanner, do like you did above
        // making a Put and a Get, create a Scan. Adorn it with column names,
        // etc.
        Scan s = new Scan();
        s.addColumn(Bytes.toBytes("myLittleFamily"), Bytes
                .toBytes("someQualifier"));
        ResultScanner scanner = table.getScanner(s);
        try {
            // Scanners return Result instances.
            // Now, for the actual iteration. One way is to use a while loop
            // like so:
            for (Result rr = scanner.next(); rr != null; rr = scanner.next()) {
                // print out the row we found and the columns we were looking
                // for
                System.out.println("Found row: " + rr);
            }

            // The other approach is to use a foreach loop. Scanners are
            // iterable!
            // for (Result rr : scanner) {
            //     System.out.println("Found row: " + rr);
            // }
        } finally {
            // Make sure you close your scanners when you are done!
            // Thats why we have it inside a try/finally clause
            scanner.close();
        }
    }
}

Another great Java example from [4]:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseTest {

	private static Configuration conf = null;
	/**
	 * Initialization
	 */
	static {
		conf = HBaseConfiguration.create();
	}

	/**
	 * Create a table
	 */
	public static void createTable(String tableName, String[] familys)
			throws Exception {
		HBaseAdmin admin = new HBaseAdmin(conf);
		if (admin.tableExists(tableName)) {
			System.out.println("table already exists!");
		} else {
			HTableDescriptor tableDesc = new HTableDescriptor(tableName);
			for (int i = 0; i < familys.length; i++) {
				tableDesc.addFamily(new HColumnDescriptor(familys[i]));
			}
			admin.createTable(tableDesc);
			System.out.println("create table " + tableName + " ok.");
		}
	}

	/**
	 * Delete a table
	 */
	public static void deleteTable(String tableName) throws Exception {
		try {
			HBaseAdmin admin = new HBaseAdmin(conf);
			admin.disableTable(tableName);
			admin.deleteTable(tableName);
			System.out.println("delete table " + tableName + " ok.");
		} catch (MasterNotRunningException e) {
			e.printStackTrace();
		} catch (ZooKeeperConnectionException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Put (or insert) a row
	 */
	public static void addRecord(String tableName, String rowKey,
			String family, String qualifier, String value) throws Exception {
		try {
			HTable table = new HTable(conf, tableName);
			Put put = new Put(Bytes.toBytes(rowKey));
			put.add(Bytes.toBytes(family), Bytes.toBytes(qualifier), Bytes
					.toBytes(value));
			table.put(put);
			System.out.println("insert recored " + rowKey + " to table "
					+ tableName + " ok.");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Delete a row
	 */
	public static void delRecord(String tableName, String rowKey)
			throws IOException {
		HTable table = new HTable(conf, tableName);
		List<Delete> list = new ArrayList<Delete>();
		Delete del = new Delete(rowKey.getBytes());
		list.add(del);
		table.delete(list);
		System.out.println("del recored " + rowKey + " ok.");
	}

	/**
	 * Get a row
	 */
	public static void getOneRecord (String tableName, String rowKey) throws IOException{
        HTable table = new HTable(conf, tableName);
        Get get = new Get(rowKey.getBytes());
        Result rs = table.get(get);
        for(KeyValue kv : rs.raw()){
            System.out.print(new String(kv.getRow()) + " " );
            System.out.print(new String(kv.getFamily()) + ":" );
            System.out.print(new String(kv.getQualifier()) + " " );
            System.out.print(kv.getTimestamp() + " " );
            System.out.println(new String(kv.getValue()));
        }
    }
	/**
	 * Scan (or list) a table
	 */
	public static void getAllRecord (String tableName) {
        try{
             HTable table = new HTable(conf, tableName);
             Scan s = new Scan();
             ResultScanner ss = table.getScanner(s);
             for(Result r:ss){
                 for(KeyValue kv : r.raw()){
                    System.out.print(new String(kv.getRow()) + " ");
                    System.out.print(new String(kv.getFamily()) + ":");
                    System.out.print(new String(kv.getQualifier()) + " ");
                    System.out.print(kv.getTimestamp() + " ");
                    System.out.println(new String(kv.getValue()));
                 }
             }
        } catch (IOException e){
            e.printStackTrace();
        }
    }

	public static void main(String[] args) {
		try {
			String tablename = "scores";
			String[] familys = { "grade", "course" };
			HBaseTest.createTable(tablename, familys);

			// add record zkb
			HBaseTest.addRecord(tablename, "zkb", "grade", "", "5");
			HBaseTest.addRecord(tablename, "zkb", "course", "", "90");
			HBaseTest.addRecord(tablename, "zkb", "course", "math", "97");
			HBaseTest.addRecord(tablename, "zkb", "course", "art", "87");
			// add record baoniu
			HBaseTest.addRecord(tablename, "baoniu", "grade", "", "4");
			HBaseTest.addRecord(tablename, "baoniu", "course", "math", "89");

			System.out.println("===========get one record========");
			HBaseTest.getOneRecord(tablename, "zkb");

			System.out.println("===========show all record========");
			HBaseTest.getAllRecord(tablename);

			System.out.println("===========del one record========");
			HBaseTest.delRecord(tablename, "baoniu");
			HBaseTest.getAllRecord(tablename);

			System.out.println("===========show all record========");
			HBaseTest.getAllRecord(tablename);
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

Reference:

  1. http://hbase.apache.org/docs/current/api/index.html
  2. http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/package-summary.html
  3. http://hbase.apache.org/book/data_model_operations.html
  4. http://lirenjuan.iteye.com/blog/1470645

Install and run Mahout on a single Linux box

This post shows how to install and run Mahout on a stand-alone Linux box.

Prerequisites for Building Mahout

  • Java JDK >=1.6
  • Maven
  • SVN

Steps:

  • svn co http://svn.apache.org/repos/asf/mahout/trunk
  • change directory to the checked out directory
  • mvn install
  • change to the core directory
  • mvn compile
  • mvn install
  • change to the examples directory
  • mvn compile
  • mvn install

Download test data from http://www.grouplens.org/node/73. Please download the “MovieLens 1M” dataset.

Run the test example

Note: replace the test data file path with your own.

  • mvn -e exec:java -Dexec.mainClass="org.apache.mahout.cf.taste.example.grouplens.GroupLensRecommenderEvaluatorRunner" -Dexec.args="-i /home/hduser/trunk/examples/ml-1m/ratings.dat"
    + Error stacktraces are turned on.
    [INFO] Scanning for projects...
    [INFO] Searching repository for plugin with prefix: 'exec'.
    [INFO] ------------------------------------------------------------------------
    [INFO] Building Mahout Examples
    [INFO]    task-segment: [exec:java]
    [INFO] ------------------------------------------------------------------------
    [INFO] Preparing exec:java
    [INFO] No goals needed for project - skipping
    [INFO] [exec:java {execution: default-cli}]
    12/03/28 14:08:33 INFO file.FileDataModel: Creating FileDataModel for file /tmp/ratings.txt
    12/03/28 14:08:33 INFO file.FileDataModel: Reading file info...
    12/03/28 14:08:34 INFO file.FileDataModel: Processed 1000000 lines
    12/03/28 14:08:34 INFO file.FileDataModel: Read lines: 1000209
    12/03/28 14:08:35 INFO model.GenericDataModel: Processed 6040 users
    12/03/28 14:08:35 INFO eval.AbstractDifferenceRecommenderEvaluator: Beginning evaluation using 0.9 of GroupLensDataModel
    12/03/28 14:08:35 INFO model.GenericDataModel: Processed 1753 users
    12/03/28 14:08:36 INFO slopeone.MemoryDiffStorage: Building average diffs...
    12/03/28 14:09:36 INFO eval.AbstractDifferenceRecommenderEvaluator: Beginning evaluation of 1719 users
    12/03/28 14:09:36 INFO eval.AbstractDifferenceRecommenderEvaluator: Starting timing of 1719 tasks in 1 threads
    12/03/28 14:09:36 INFO eval.StatsCallable: Average time per recommendation: 343ms
    12/03/28 14:09:36 INFO eval.StatsCallable: Approximate memory used: 448MB / 798MB
    12/03/28 14:09:36 INFO eval.StatsCallable: Unable to recommend in 0 cases
    12/03/28 14:09:43 INFO eval.StatsCallable: Average time per recommendation: 7ms
    12/03/28 14:09:43 INFO eval.StatsCallable: Approximate memory used: 510MB / 798MB
    12/03/28 14:09:43 INFO eval.StatsCallable: Unable to recommend in 13 cases
    12/03/28 14:09:52 INFO eval.AbstractDifferenceRecommenderEvaluator: Evaluation result: 0.7149488038906546
    12/03/28 14:09:52 INFO grouplens.GroupLensRecommenderEvaluatorRunner: 0.7149488038906546
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESSFUL
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1 minute 26 seconds
    [INFO] Finished at: Wed Mar 28 14:09:53 PDT 2012
    [INFO] Final Memory: 53M/761M
    [INFO] ------------------------------------------------------------------------
    

Creating a simple recommender

Create a Maven project

mvn archetype:create -DarchetypeGroupId=org.apache.maven.archetypes -DgroupId=com.autofei -DartifactId=mahoutrec

This creates an empty project called mahoutrec with the package namespace com.autofei. Now change to the mahoutrec directory. You can try out the new project by running:

mvn compile
mvn exec:java -Dexec.mainClass="com.autofei.App"

Set the project dependencies
Edit pom.xml, remembering to change the Mahout version to match yours (in my case, it is 0.7-SNAPSHOT). An example file:

<?xml version="1.0"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
         xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <modelVersion>4.0.0</modelVersion>
  <parent>
    <artifactId>mahout</artifactId>
    <groupId>org.apache.mahout</groupId>
    <version>0.7-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
  <groupId>com.autofei</groupId>
  <artifactId>mahoutrec</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>mahoutrec</name>
  <url>http://maven.apache.org</url>
  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-core</artifactId>
      <version>0.7-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-math</artifactId>
      <version>0.7-SNAPSHOT</version>
    </dependency>
    <dependency>
      <groupId>org.apache.mahout</groupId>
      <artifactId>mahout-math</artifactId>
      <version>0.7-SNAPSHOT</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
  </dependencies>
  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>
</project>

Test data
Put this data into a file named dummy-bool.csv under the datasets directory:

#userId,itemId
1,3
1,4
2,44
2,46
3,3
3,5
3,6
4,3
4,5
4,11
4,44
5,1
5,2
5,4

Create a Java file under src/main/java/com/autofei/ named UnresystBoolRecommend.java:

package com.autofei;

import java.io.File;
import java.io.FileNotFoundException;
import java.util.List;
import java.io.IOException;

import org.apache.commons.cli2.OptionException;
import org.apache.mahout.cf.taste.common.TasteException;
import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.recommender.CachingRecommender;
import org.apache.mahout.cf.taste.impl.recommender.slopeone.SlopeOneRecommender;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.impl.common.LongPrimitiveIterator;

public class UnresystBoolRecommend {

    public static void main(String... args) throws FileNotFoundException, TasteException, IOException, OptionException {

        // create data source (model) - from the csv file
        File ratingsFile = new File("datasets/dummy-bool.csv");
        DataModel model = new FileDataModel(ratingsFile);

        // create a simple recommender on our data
        CachingRecommender cachingRecommender = new CachingRecommender(new SlopeOneRecommender(model));

        // for all users
        for (LongPrimitiveIterator it = model.getUserIDs(); it.hasNext();) {
            long userId = it.nextLong();

            // get the recommendations for the user
            List<RecommendedItem> recommendations = cachingRecommender.recommend(userId, 10);

            // if empty write something
            if (recommendations.size() == 0) {
                System.out.print("User ");
                System.out.print(userId);
                System.out.println(": no recommendations");
            }

            // print the list of recommendations for each
            for (RecommendedItem recommendedItem : recommendations) {
                System.out.print("User ");
                System.out.print(userId);
                System.out.print(": ");
                System.out.println(recommendedItem);
            }
        }
    }
}

Run the code

  • mvn compile
  • mvn exec:java -Dexec.mainClass="com.autofei.UnresystBoolRecommend"
    
    [INFO] Scanning for projects...
    [INFO] Searching repository for plugin with prefix: 'exec'.
    [INFO] ------------------------------------------------------------------------
    [INFO] Building mahoutrec
    [INFO]    task-segment: [exec:java]
    [INFO] ------------------------------------------------------------------------
    [INFO] Preparing exec:java
    [INFO] No goals needed for project - skipping
    [INFO] [exec:java {execution: default-cli}]
    SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
    SLF4J: Defaulting to no-operation (NOP) logger implementation
    SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
    User 1: RecommendedItem[item:5, value:1.0]
    User 2: RecommendedItem[item:5, value:1.0]
    User 2: RecommendedItem[item:3, value:1.0]
    User 3: no recommendations
    User 4: no recommendations
    User 5: RecommendedItem[item:5, value:1.0]
    User 5: RecommendedItem[item:3, value:1.0]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESSFUL
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 3 seconds
    [INFO] Finished at: Wed Mar 28 16:18:31 PDT 2012
    [INFO] Final Memory: 14M/35M
    [INFO] ------------------------------------------------------------------------

From here on, you can test other algorithms inside Mahout.
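
For example, the SlopeOneRecommender above can be swapped for a user-based recommender, which often suits boolean (userId,itemId) data better. Here is a minimal sketch against the same dummy-bool.csv data model; the neighborhood size of 2 and the choice of user 4 are arbitrary for this tiny dataset.

import java.io.File;

import org.apache.mahout.cf.taste.impl.model.file.FileDataModel;
import org.apache.mahout.cf.taste.impl.neighborhood.NearestNUserNeighborhood;
import org.apache.mahout.cf.taste.impl.recommender.GenericUserBasedRecommender;
import org.apache.mahout.cf.taste.impl.similarity.TanimotoCoefficientSimilarity;
import org.apache.mahout.cf.taste.model.DataModel;
import org.apache.mahout.cf.taste.neighborhood.UserNeighborhood;
import org.apache.mahout.cf.taste.recommender.RecommendedItem;
import org.apache.mahout.cf.taste.recommender.Recommender;
import org.apache.mahout.cf.taste.similarity.UserSimilarity;

public class UserBasedSketch {
    public static void main(String[] args) throws Exception {
        DataModel model = new FileDataModel(new File("datasets/dummy-bool.csv"));

        // Tanimoto similarity only considers which items two users have in common,
        // so it is a reasonable choice for boolean preference data.
        UserSimilarity similarity = new TanimotoCoefficientSimilarity(model);
        UserNeighborhood neighborhood = new NearestNUserNeighborhood(2, similarity, model);
        Recommender recommender = new GenericUserBasedRecommender(model, neighborhood, similarity);

        // Top-3 recommendations for user 4
        for (RecommendedItem item : recommender.recommend(4, 3)) {
            System.out.println(item);
        }
    }
}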

Reference: