Monday, December 19, 2011

Programmatically submitting jobs to a remote Hadoop Cluster


I'm adding the ability for Virgil to deploy a Map/Reduce job to a remote Hadoop cluster. With this, Virgil allows users to make a REST POST to schedule a Hadoop job. (pretty handy)

To get this to work properly, Virgil needed to be able to remotely deploy a job. Ordinarily, to run a job against a remote cluster you issue a command from the shell:


hadoop jar $JAR_FILE $CLASS_NAME 

We wanted to do the same thing, but from within the Virgil runtime. It was easy enough to find the class we needed to use: RunJar. RunJar's main() method stages the jar and submits the job. Thus, to achieve the same functionality as the command line, we used the following:


// org.apache.hadoop.util.RunJar stages the jar and invokes its main class.
List<String> args = new ArrayList<String>();
args.add(locationOfJarFile);   // path to the job jar
args.add(className);           // the job's main class, as on the command line
RunJar.main(args.toArray(new String[0]));

That worked just fine, but would result in a local job deployment. To get it to deploy to a remote cluster, we needed Hadoop to load the cluster configuration. For Hadoop, cluster configuration is spread across three files: core-site.xml, hdfs-site.xml, and mapred-site.xml. To get the Hadoop runtime to load the configuration, you need to include these files on your classpath. The key line is found in the Javadoc for Hadoop's Configuration class.

"Unless explicitly turned off, Hadoop by default specifies two resources, loaded in-order from the classpath:"


Once we dropped the cluster configuration onto the classpath, everything worked like a charm.
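
A quick way to sanity-check that the site files are actually being picked up from the classpath is to build a Configuration/JobConf and print a couple of the cluster settings. This is just an illustrative sketch; the hostnames in the comments are placeholders for whatever your cluster uses.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;

// new Configuration() loads core-default.xml and core-site.xml from the classpath;
// JobConf additionally loads mapred-default.xml and mapred-site.xml.
Configuration conf = new Configuration();
System.out.println(conf.get("fs.default.name"));        // should be the remote NameNode, e.g. hdfs://namenode:8020
JobConf jobConf = new JobConf();
System.out.println(jobConf.get("mapred.job.tracker"));  // should be the remote JobTracker, e.g. jobtracker:8021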

Monday, December 12, 2011

Binaries available for download from Virgil


In response to a few requests for a binary distribution, we just posted artifacts for Virgil.

For simplicity, we're keeping the version number aligned with the version of Cassandra (which is important when you are running with an embedded Cassandra ;).

Also, we changed it so you can simply specify the Cassandra instance you want to run against as a command-line parameter.

This makes it easy to point the GUI at different Cassandra instances. 

Now, all you need to do is download the binary distribution, untar/unzip and type:

bin/virgil -h CASSANDRA_HOST

Let me know if anyone has any trouble.

Friday, December 9, 2011

Dependencies and Repositories in SBT 0.11 (vs 0.7) for Kestrel

Last night I was trying to build Kestrel, which is written in Scala.

After installing Scala and sbt on my Mac using Homebrew, Kestrel wouldn't build because it requires an older version of sbt. Instead of installing the old version of sbt, I took it as a good opportunity to learn what sbt was about. In migrating the Kestrel build file to sbt 0.11, I found the documentation somewhat lacking. (especially when compared to 0.7) Thus, here are two tidbits that I had a hard time finding... (I actually had to dig through other GitHub projects to see how they did it)

I learned that sbt uses Maven repositories as one source for dependencies. Just like Maven, you need to declare the dependencies and the available repositories. Here is how you do it.

I forked the Kestrel project and posted my new build.sbt here:
https://github.com/boneill42/kestrel/blob/master/build.sbt  

To add a dependency to your project, add the following line to build.sbt:

  libraryDependencies += "com.twitter" % "util-core" % "1.12.4"

In Maven-speak, the first string is the group identifier, the second string is the artifact identifier, and the third string is the version.

In this case, Kestrel required additional repositories.

To add a repository to your project, add the following line to build.sbt:

resolvers += "twitter.com" at "http://maven.twttr.com/"

The first string is the name of the repository. The second string is the URL for the repo.

Hope that helps some people.

Thursday, December 1, 2011

Hadoop/MapReduce on Cassandra using Ruby and REST!

In an effort to make Hadoop/MapReduce on Cassandra more accessible, we added a REST layer to Virgil that allows you to run map/reduce jobs written in Ruby against column families in Cassandra by simply posting the Ruby script to a URL. This greatly reduces the skill set required to write and deploy the jobs, and allows users to rapidly develop analytics for data stored in Cassandra.

To get started, just write a map/reduce job in Ruby like the example included in Virgil:
http://code.google.com/a/apache-extras.org/p/virgil/source/browse/trunk/server/src/test/resources/wordcount.rb

Then throw that script at Virgil with a curl:

curl -X POST http://localhost:8080/virgil/job?jobName=wordcount\&inputKeyspace=dummy\&inputColumnFamily=book\&outputKeyspace=stats\&outputColumnFamily=word_counts --data-binary @src/test/resources/wordcount.rb

In the POST, you specify the input keyspace and column family and the output keyspace and column family. Each row is fed to the Ruby map function as a Map; each entry in that Map is a column in the row. The map function must return tuples (key/value pairs), which are fed back into Hadoop for sorting.

Then, the reduce method is called with the keys and values from Hadoop. The reduce function must return a map of maps, which represents the rows and columns that need to be written back to Cassandra. (keys are the row keys, sub-maps are the columns)

Presently, the actual job runs inside the Virgil JVM and the HTTP connection is left open until the job completes. Over the next week or two, we'll fix that. We intend to implement the ability to distribute that job across an existing Hadoop cluster. Stay tuned.

For more information see the Virgil wiki:
http://code.google.com/a/apache-extras.org/p/virgil/wiki/mapreduce

Wednesday, November 30, 2011

Concurrent use of embedded Ruby in Java (using JRuby)

Last night I was finishing up the map/reduce capabilities within Virgil. We hope to allow people to post Ruby scripts that will then get executed over a column family in Cassandra using map/reduce. To do that, we needed concurrent use of a ScriptEngine that could evaluate the Ruby script. In the code snippets below, script is a String that contains the contents of a Ruby file with a method definition for foo.

First, I started with JSR 223 and the ScriptEngine with the following code:

import javax.script.*;

public static final ScriptEngine ENGINE = new ScriptEngineManager().getEngineByName("jruby");

ScriptContext context = new SimpleScriptContext();
Bindings bindings = context.getBindings(ScriptContext.ENGINE_SCOPE);
bindings.put("variable", "value");   // expose a Java value to the script
ENGINE.eval(script, context);


That worked fine in unit testing, but when used within map/reduce I encountered a deadlock of sorts. After some googling, I landed in the RedBridge documentation. There I found that JRuby has a lower-level API (beneath JSR 223) that exposes concurrent processing features. I swapped the above code for the following:


// A single container, scoped for concurrent use, shared across threads.
this.rubyContainer = new ScriptingContainer(LocalContextScope.CONCURRENT);
this.rubyReceiver = this.rubyContainer.runScriptlet(script);
this.rubyContainer.callMethod(this.rubyReceiver, "foo", "value");


That let me leverage a single engine for multiple concurrent invocations of the method foo, which is defined in the Ruby script.
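
For reference, here is a minimal, self-contained sketch of the same pattern; the Ruby script contents and the thread setup are just illustrative:

import org.jruby.embed.LocalContextScope;
import org.jruby.embed.ScriptingContainer;

public class ConcurrentRubyExample {
    public static void main(String[] args) throws Exception {
        String script = "def foo(value)\n  \"hello \" + value\nend";
        final ScriptingContainer container = new ScriptingContainer(LocalContextScope.CONCURRENT);
        final Object receiver = container.runScriptlet(script);

        Runnable task = new Runnable() {
            public void run() {
                // Each thread calls the same ruby method through the shared container.
                Object result = container.callMethod(receiver, "foo", Thread.currentThread().getName());
                System.out.println(result);
            }
        };
        Thread t1 = new Thread(task);
        Thread t2 = new Thread(task);
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}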

This worked like a charm.

Monday, November 28, 2011

Introducing run-modes to Virgil: support for embedded or remote Cassandra instances


Since Virgil was originally developed as an embedded REST layer for the Cassandra Server, it ran as a daemon inside the server and performed operations directly against the CassandraServer classes. Running in a single JVM had some performance gains over a separate server that communicated over Thrift (either directly or via Hector), since operations didn't have to take a second hop across the network (with the associated marshalling/unmarshalling).

We had a request come in to add the ability to run Virgil against a remote Cassandra instance.

That seemed reasonable, since there are a lot of existing Cassandra clusters and users may just want to add a REST layer to support webapp/GUI access or SOLR integration.

To support those cases, we added run-modes to the configuration.

Let us know what you think.

Monday, November 21, 2011

Virgil: GUI for Cassandra now included in Virgil

Sure, it's read-only.
Sure, it's focused on Strings.

But it was written in only 100 lines of code using Virgil's REST layer for Cassandra and includes all of ExtJS's goodness. (if you are into that kind of thing)

The entire GUI is contained in a single JavaScript class.

That JavaScript uses two GridPanels: one to display column families grouped by keyspace (in the east region panel), and another to display columns grouped by row key (in the center panel). Each of the GridPanels uses a store backed by an ExtJS model.

To accommodate the GUI, we added fetch capabilities to the REST layer for both schema information and rows using key ranges. I'll detail those capabilities in a follow-up post.

For instructions on how to access the GUI and to see what it looks like check out the wiki page.

Even in its existing state, this is a useful GUI for quickly inspecting the contents of a Cassandra node. It is also a good demonstration of how you might include a JavaScript component for visualization in your own application with very little effort.
...

Virgil now includes an elementary REST interface. (thanks to Dave Strauss @ Pantheon for his help defining the interface) It also includes simple SOLR integration and a GUI. Next up, map/reduce for the masses via REST. Stay tuned.

As always, comments and contributions welcome and appreciated.

Thursday, November 10, 2011

PATCH methods on JAX-RS

We added PATCH semantics for Virgil.

This was fairly straightforward, except that we needed to add support for a @PATCH annotation and a PatchMethod for HttpClient.

To do this, we created a PATCH annotation. Take a look at PATCH.java, the contents of which are shown below:

import java.lang.annotation.*;
import javax.ws.rs.HttpMethod;

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@HttpMethod("PATCH")
public @interface PATCH {
}

This then allows us to use @PATCH as an annotation on a REST service method.

@PATCH
@Path("/data/{keyspace}/{columnFamily}/{key}")
@Produces({ "application/json" })
public void patchRow(@PathParam("keyspace") String keyspace,
                     @PathParam("columnFamily") String columnFamily,
                     @PathParam("key") String key,
                     @QueryParam("index") boolean index,
                     String body) throws Exception

That worked like a charm. Then we needed to call it using HttpClient. To do that, we created a PatchMethod class that extended PostMethod. You can see that here.
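
The class isn't reproduced in this post, but a minimal sketch of that approach for commons-httpclient 3.x looks roughly like this: extend PostMethod so the request-body handling comes for free, and override the method name.

import org.apache.commons.httpclient.methods.PostMethod;

public class PatchMethod extends PostMethod {
    public PatchMethod(String uri) {
        super(uri);
    }

    // Everything else behaves like POST; only the HTTP method name changes.
    @Override
    public String getName() {
        return "PATCH";
    }
}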

Then we could use that just like any other HTTP method.

PatchMethod patch = new PatchMethod(BASE_URL + KEYSPACE + "/" + COLUMN_FAMILY + "/" + KEY);
StringRequestEntity requestEntity = new StringRequestEntity(
        "{\"ADDR1\":\"1235 Fun St.\",\"COUNTY\":\"Montgomery\"}",
        "application/json", "UTF-8");
patch.setRequestEntity(requestEntity);
new HttpClient().executeMethod(patch);   // send the PATCH request

Hope that helps people.

Virgil: PATCH semantics added to REST layer for Cassandra


Virgil now supports PATCH semantics for row updates in Cassandra via REST.

In REST, for the case where a resource is modified rather than fully replaced by an HTTP operation, the IETF has proposed a new HTTP method: PATCH (RFC 5789).

Virgil now allows users to use this HTTP method to add and modify columns in a single post (without reposting the entire row). We've included an example in the Getting Started instructions.

Likewise, PUT operations will now replace the entire row, per HTTP semantics.

(Thanks to David Strauss for suggesting this)

Friday, November 4, 2011

Cassandra integration w/ SOLR using Virgil

Up front, I'd like to say this is still pretty raw. We'd love to get feedback and contributions.

That said, Virgil now has the ability to integrate SOLR and Cassandra. When you add and delete rows and columns via the REST interface, an index is updated in SOLR.

For more information check out:
http://code.google.com/a/apache-extras.org/p/virgil/wiki/solr

Let us know what we can do better.

Monday, October 24, 2011

Virgil gets a command-line interface (virgil-cli)


Tonight I bundled the Cassandra command-line interface (CLI) into Virgil. Since the CLI uses the Thrift-based CassandraDaemon, the main method now starts a Thrift server alongside the REST server.

Now, when you (or your application) issues commands through the REST interface, you can verify that they worked through the command-line interface. For more information, check out the wiki.

Specifically, if you use the curl commands in the Getting Started section, you should see the following in the command-line interface.


bone@zen:~/dev/code.google.com/virgil/trunk-> bin/virgil-cli -h localhost
Connected to: "Test Cluster" on localhost/9160
Welcome to the Cassandra CLI.

Type 'help;' or '?' for help.
Type 'quit;' or 'exit;' to quit.

[default@unknown] use playground;
Authenticated to keyspace: playground
[default@playground] list toys;
Using default limit of 100
-------------------
RowKey: swingset
=> (column=bar, value=33, timestamp=1319508065134)
=> (column=foo, value=1, timestamp=1319508065126)

1 Row Returned.
[default@playground] quit

Thursday, October 20, 2011

Virgil: a GUI and REST layer for Cassandra

Love Cassandra? Love REST?
Wish you could have both at the same time?
Now you can.

After much discussion, I'm happy to announce the birth of a new project, Virgil. The project will provide a GUI and a services layer on top of Cassandra, exposing data and services via REST.

Virgil already has a REST layer for CRUD operations against keyspaces, column families, and data. We hope to quickly add Pig/Hadoop support via REST as well as a thin, JavaScript-based GUI that uses the REST services.

How can you help nurture the baby?
Head over to Apache Extras,
http://code.google.com/a/apache-extras.org/p/virgil/

Star the project, and then get involved.
Grab the source code and give it a try.

Wednesday, October 5, 2011

Pig / Cassandra: binary operator expected

If you are trying to run Pig on Cassandra and you encounter: "binary operator expected"

You are most likely running pig_cassandra against the latest release of Pig, which ships with two jar files: one with Hadoop bundled and one without. Your PIG_HOME is set to the root directory of your Pig installation, which contains those two jar files. The existence of TWO jar files breaks the pig_cassandra shell script.

I've submitted a patch for this:
https://issues.apache.org/jira/browse/CASSANDRA-3320
(Please vote to get it included)

Until that is committed, you can simply remove the jar file you don't want to use:

rm -fr $PIG_HOME/pig-0.9.1-withouthadoop.jar


That should fix you.
Happy pigging.

Monday, October 3, 2011

Cassandra / Hadoop : Getting the row key (when iterating over all rows)


I thought I would save some people some time...

The word count example is fantastic, and is enough to get you going. But it may leave you wondering how to get at the row key, since the "key" passed into the map is the name of the column and not the key of the row. Instead, the row key is available from the context. Take a look at the snippet below.
public void map(ByteBuffer key, SortedMap<ByteBuffer, IColumn> columns, Context context)
        throws IOException, InterruptedException {
    for (ByteBuffer columnKey : columns.keySet()) {
        String name = ByteBufferUtil.string(columns.get(columnKey).name());
        String value = ByteBufferUtil.string(columns.get(columnKey).value());
        logger.info("[" + ByteBufferUtil.string(columnKey) + "]->[" + name + "]:[" + value + "]");
        // The row key comes from the context, not the "key" parameter.
        logger.info("Context [" + ByteBufferUtil.string(context.getCurrentKey()) + "]");
    }
}

Wednesday, September 14, 2011

Cassandra / Hadoop : No local connection available

If you are trying to run the word_count example in Cassandra Hadoop, and you encounter the following exception:

java.lang.RuntimeException: java.lang.UnsupportedOperationException: no local connection available
at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.initialize(ColumnFamilyRecordReader.java:132)
at org.apache.hadoop.mapred.MapTask$NewTrackingRecordReader.initialize(MapTask.java:418)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:620)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:305)
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:177)
Caused by: java.lang.UnsupportedOperationException: no local connection available
at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.getLocation(ColumnFamilyRecordReader.java:176)
at org.apache.cassandra.hadoop.ColumnFamilyRecordReader.initialize(ColumnFamilyRecordReader.java:113)
... 4 more


Then you have hit a problem with local IP resolution in Java.

Cassandra currently uses the following line to resolve IP addresses:
localAddresses = InetAddress.getAllByName(InetAddress.getLocalHost().getHostAddress());

There are better ways to do this using the NetworkInterface class; a sketch of that approach is below.
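
Here is a rough, illustrative sketch of enumerating local addresses via NetworkInterface instead of relying on hostname resolution (the class and method names are mine, not Cassandra's):

import java.net.InetAddress;
import java.net.NetworkInterface;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class LocalAddresses {
    // Collect every address bound to a local interface, with no DNS or /etc/hosts lookup involved.
    public static List<InetAddress> getLocalAddresses() throws Exception {
        List<InetAddress> addresses = new ArrayList<InetAddress>();
        Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
        while (interfaces.hasMoreElements()) {
            NetworkInterface nic = interfaces.nextElement();
            for (Enumeration<InetAddress> addrs = nic.getInetAddresses(); addrs.hasMoreElements();) {
                addresses.add(addrs.nextElement());
            }
        }
        return addresses;
    }
}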

But until Cassandra uses that approach, you'll need to make sure the line above resolves properly by manipulating your /etc/hosts so that localhost resolution matches the configuration in Cassandra, which by default expects localhost bound to 127.0.0.1.

I submitted a patch for this. If you are having issues, please vote to get the patch accepted.
https://issues.apache.org/jira/browse/CASSANDRA-3211



Wednesday, August 31, 2011

Master Data Management : Open Source Solutions???

The movement towards digital records is generating exponential amounts of data, and tremendously valuable data at that. But building a system to manage that data and extract value from it requires acceptance of a paradox: the system needs to be flexible and tolerant, while simultaneously enforcing structure and standards. This is hard.

Many of us have felt the pain of Master Data Management (MDM). In large enough enterprises, even something as simple as keeping addresses current across multiple lines of business is a herculean task. Furthermore, it’s a problem that’s difficult to ignore because poor data management can drive substantial expense and leave invaluable opportunities on the table.

Since I joined Health Market Science (HMS), a company focused on MDM for the healthcare space, that sentiment has been reinforced tenfold. Two things became immediately apparent: the complexity of a complete solution, and the value of the same.

The problem is complex because there is a temporal aspect to the data. A complete MDM solution doesn't just provide a current view of entities, but a historical perspective as well. It provides that perspective across all the entities and all the relationships between those entities. Then, add in the fact that each source system may have a different schema representing each entity, and that the schema itself may evolve over time. Then, pile on all the necessary processing to analyze the data. Entities need to be consolidated based on precedence rules, standardized, and matched using fuzzy matching. What a fantastic recipe for a fun problem to solve.

But given the complexity, it isn't difficult to see why companies shy away from an "in-house" MDM solution. Solving these problems isn't easy. You may think that if you get yourself a good Data Architect and a massive Oracle instance, you could crank something out. You’ll soon find that standard relational structures quickly become unwieldy and you end up in “meta-meta” world building out schemas to manage schemas. This can be extremely painful.

I'm wondering if people have seen any open source solutions capable of tackling this problem. From what I can find, the open source "MDM" solutions only tackle the Extract-Transform-Load (ETL) portion of the problem. Unfortunately, that is the easy part. Does anyone know of any open-source communities focused on delivering a complete solution, from ETL down through storage?