Tuesday, November 24, 2015

Monetate open-sources Koupler: a versatile interface to Kinesis!


I'm happy to announce that Monetate has open-sourced Koupler, a versatile interface for Kinesis.  We took the best practices outlined by Amazon and codified them.

Hopefully, this will be the first of many contributions back to the community.

For the full story, check out the formal announcement.


Wednesday, November 18, 2015

Using Squid as an HTTP Proxy via SSH (to fetch remotely from Amazon yum repos)


We've been playing around with Vagrant for local development.   When combined with Ansible, the pair allows you to recreate complex systems locally with high fidelity to your deployment environment.   Through magic voodoo (kudos to @jjpersch and @kmolendyke), we managed to get an Amazon AMI crammed into a VirtualBox image.  Unfortunately, Amazon's yum repos are only available from within the EC2 network.  Thus, when we fired up our Vagrant machine locally, it couldn't update!

No worries.  We could use a single EC2 instance to proxy the http requests from yum.  Here's how.

First, fire up an EC2 instance and install squid on that instance.

yum install squid

Start squid with:

/etc/init.d/squid start

If it has trouble starting, have a look at:

cat /var/log/squid/squid.out

If you see something like this:

FATAL: Could not determine fully qualified hostname.  Please set 'visible_hostname'

You may need to set visible_hostname in the /etc/squid/squid.conf file.  Add it to the end of that file:

visible_hostname myec2

Then, squid should fire up.  By default squid runs on port 3128.
(But you can check with netstat -plant | grep squid)

Now, you are ready for HTTP proxying.  In this scenario, let's assume your EC2 instance is myec2.foo.com, and you want to proxy all HTTP requests from your laptop through that machine.

On your laptop, you would run:

ssh -L 4444:localhost:3128 myec2.foo.com

This will SSH into your EC2 instance, simultaneously setting up port forwarding from your laptop.  Every packet sent to port 4444 on your laptop will be forwarded over the secure tunnel to port 3128 on your EC2 instance.  And since squid is running on that port, squid will in turn forward the HTTP requests along, but they will now look like they are coming from your EC2 instance!

Shazam.  You can test out the setup using wget.  For example, previously we couldn't fetch a package list from Amazon's yum repos.  The following wget would fail:

wget http://packages.ap-southeast-2.amazonaws.com/2014.09/updates/d1c36cf420e2/x86_64/repodata/repomd.xml

wget pays attention to the http_proxy environment variable.   When it is set, wget routes all requests through that proxy.  Thus, to test out our proxy, set http_proxy with the following:

export http_proxy=http://localhost:4444

Once set, you should be able to re-run the wget and it should succeed!  Finally, if you have a similar situation and want to proxy all yum requests, go into /etc/yum.conf and set the following:

proxy=http://localhost:4444

With that setting, you should be able to yum update all day long from your vagrant machine. =)



Wednesday, October 28, 2015

Diagnosing memory leaks in Java


Every time I suspect a memory leak, I have to go dig up these commands.
So, here they are for posterity's sake:

First, I use the following command to monitor the process over time:

while ( sleep 1 ) ; do ps -p $PID -o %cpu,%mem,rss  ; done

(and/or New Relic if you have it ;)

If you see memory creeping up, it could be your JVM settings. If you haven't explicitly specified memory settings, the JVM will fall back to defaults. To see the defaults, use the following command:

java -XX:+PrintFlagsFinal -version | grep -i HeapSize

If those are out of line with what you want, then you'll need to specify the memory settings for the
JVM. You can set minimum and maximum heap sizes with the following:

java -Xms128m -Xmx256m

Once you have sane memory settings, and you can monitor the process, you might legitimately still see memory increasing over time. To get insight into that, you can start by taking a look at the histogram of object instances using the following:

jmap -histo $PID

If that isn't enough information then you can take heap dumps with the following command:

jmap -dump:format=b,file=/tmp/dump1.hprof $PID


Typically, I'd take two heap dumps and then compare them with jhat, using the following command:


jhat -baseline /tmp/dump1.hprof /tmp/dump2.hprof

That fires up an HTTP server that you can use to explore the delta between those two heap dumps. By default, the HTTP server will start on port 7000, which you can just hit in a browser.

If you are behind a firewall, but have ssh access, you can tunnel over to the port using:

ssh -L 7000:localhost:7000 $HOST

If you scroll down to the bottom of the first page, you will see a couple of useful links for comparing against the baseline.  Those will show you all "new" instances between the two heap dumps, which should give you some idea of where your leak is coming from.




And there you have it, a quick synopsis of the magic command-lines you need when diagnosing memory leaks. (which I always forget)
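
If you want a process to practice those commands against, a deliberately leaky toy program works well.  This is purely an illustrative sketch (the class and its behavior are made up for demonstration); run it with a small -Xmx and watch the byte[] count climb in the jmap histogram:

import java.util.ArrayList;
import java.util.List;

// Toy program that leaks on purpose: it holds a reference to every buffer
// it allocates, so the heap grows until the JVM throws OutOfMemoryError.
public class LeakyApp {
    private static final List<byte[]> LEAK = new ArrayList<>();

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            LEAK.add(new byte[1024 * 1024]); // allocate 1MB and never release it
            Thread.sleep(100);               // slow enough to watch with ps/jmap
        }
    }
}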

Tuesday, October 27, 2015

Using Gradle with AWS and S3 (w/ DefaultAWSCredentialsProviderChain and InstanceProfileCredentialsProvider (FTW!))


We recently switched over to gradle as our build mechanism.  As part of that switchover, we wanted to be able to build w/o external dependencies (e.g. Maven Central).  We tossed around the idea of Artifactory, but in the end we decided to keep it simple and push our jar files into S3.

This turned out to be remarkably easy.   First, we added a task to copy down our dependencies:

task copyRuntimeLibs(type: Copy) {
    from configurations.runtime
    into "$buildDir/libs"
}

With that, we simply sync'd our dependencies up to S3 using the AWS CLI:

aws s3 sync build/libs/ s3://yourbucket/gradle/

That deposits the jar files into your S3 bucket.  For example:

amazon-kinesis-client-1.5.1.jar

For the next trick, you need to get gradle to pull those artifacts from S3.  You can do this by declaring an *ivy* repository in your build.gradle. e.g.

repositories {
    ivy {
         url "s3://yourbucket/gradle/"
         credentials(AwsCredentials) {
            accessKey "YOUR_AWSAccessKeyId"
            secretKey "YOUR_AWSSecretKey"
         }
         layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
         }
     }
}

That's it. HOWEVER....

If you are as paranoid about security as we are, you'll want to use EC2 instance credentials for system users like Jenkins, instead of having keys and secrets floating around that are known to many. Fortunately, the AWS Java SDK can do this out-of-the-box.   It is part of the DefaultAWSCredentialsProviderChain.  With the instance profile credentials, you don't need to specify a key and secret, instead it retrieves the security credentials directly from the host using the Amazon EC2 Instance Metadata Service.
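
For context, here is roughly what the SDK does for you.  This isn't Gradle code, just a plain AWS Java SDK sketch showing that the default chain (which includes the instance profile provider) hands back credentials without any keys in your source:

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;

public class WhoAmI {
    public static void main(String[] args) {
        // The chain checks environment variables, system properties, the
        // credentials profile file, and finally the EC2 instance metadata
        // service (the instance profile).
        AWSCredentials creds = new DefaultAWSCredentialsProviderChain().getCredentials();
        System.out.println("Resolved access key: " + creds.getAWSAccessKeyId());
    }
}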

UNFORTUNATELY, Gradle is hard-coded to use the BasicAwsCredentials!
See this discussion thread.

The gradle crew is working on a larger design to address security, and because of that they are not going to merge this patch.   So, we took matters into our own hands and forked the 2.8 branch of gradle to make the change:
https://github.com/monetate/gradle/tree/2.8.monetate

If you want to use instance profile credentials feel free to pull that code and follow the instructions in this commit.

With that patch, you can simply omit the credentials from your build.gradle. (woohoo!)

repositories {
    ivy {
         url "s3://yourbucket/gradle/"
         layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
         }
     }
}

It will pick up the credentials from your instance metadata, and you'll be on your way to Cape May (S3).

FWIW, hopefully this helps people.

Thursday, October 22, 2015

Reverse Engineering Kinesis Aggregation and Sharding (and the magic "a" partitionKey)


I love open source.  I love being able to take things apart to understand them.  And for that reason, I'd like to thank Amazon for open sourcing the Kinesis Client Library and the Kinesis Producer Library.  It allowed us to get to the bottom of a couple of strange observations and reverse engineer the design for Kinesis record aggregation in the KPL. (at least we think so?)

When we started scale testing Kinesis, we saw a couple strange observations:

(1) The velocity of PUTs (records per second) seemed to scale/increase with the number of shards.
(2) When we attached to the stream using the AWS CLI, we saw records with partition key: "a"

First, let's discuss the PUT record velocity.  One component of Amazon Kinesis's pricing is the number of PUT payload units per month.  Specifically:

PUT Payload Units, per 1,000,000 units: $0.014

So, mo' PUTs is mo' money.  Fortunately, the Kinesis Producer Library (KPL) lets you aggregate multiple records into a single PUT.    If you are optimizing for cost, you want to maximize that records per PUT ratio. (and obviously at a 1:1 ratio you are paying the most)  But, the amount of aggregation you can perform depends on record throughput and your tolerance for latency.

KPL ingests records, accumulating them in a buffer.  The buffering is configured via the kpl.properties file, and the key parameter in the system is the RecordMaxBufferedTime, which specifies the maximum amount of time that the KPL will wait before it sends a PUT.   After this maximum amount of time passes, KPL PUTs the buffered records, regardless of how many it has accumulated/aggregated.  If your system has low throughput, KPL sends partially filled PUTs. (which is more expensive)

For example, if your throughput is 1 record / second and RecordMaxBufferedTime is set to 1 second, you will have an aggregation ratio of 1:1 (not good).  Now, if you increase that duration from 1 second to 4 seconds, you immediately quadruple your aggregation ratio (and cut your PUT costs by 75%!).

When we started scale testing, we saw an order of magnitude more PUT records than we saw during our initial testing (on streams with few shards).  We noticed that the velocity of PUTs increased linearly with the number of shards.  For example, with 2X the number of shards, we saw 2X the number of PUTs.  This led us to hypothesize that aggregation was somehow related to shards.

Our straw man hypothesis looked like this:



If our hypothesis was right, then locally the KPL was keeping one aggregate Kinesis record (aka PUT) per shard.  Then, when the KPL hit RecordMaxBufferedTime, it would flush all of those records, meaning that the minimum velocity of PUTs would be the flush rate (1 / RecordMaxBufferedTime) multiplied by the number of shards.

e.g. 100ms RecordMaxBufferedTime = 10 PUTs / second / shard,
which would be 500 PUTs / second for 50 shards.
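
A quick back-of-the-envelope check of that arithmetic (assuming one aggregate record per shard per flush, which is the hypothesis above):

public class PutVelocity {
    public static void main(String[] args) {
        int recordMaxBufferedTimeMs = 100;
        int shards = 50;

        // One flush per shard every RecordMaxBufferedTime...
        double flushesPerSecondPerShard = 1000.0 / recordMaxBufferedTimeMs; // 10
        double putsPerSecond = flushesPerSecondPerShard * shards;           // 500

        System.out.println(putsPerSecond + " PUTs / second"); // prints 500.0 PUTs / second
    }
}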

This is exactly what we saw.  Even with a trickling of traffic, we saw hundreds of PUT requests.

When you think about it, this makes sense.  Since a partition key determines which shard receives the record, a single PUT can't contain records destined for multiple shards, unless Amazon somehow fanned out the records on the backend.

...

Now, let's look at our second observation: the elusive "a" partition key.  When checking the accuracy of our stream data, we happened to attach using the AWS CLI.  When we attached to the stream and read records directly using `aws kinesis get-records...`, we saw JSON data in the form:

{
    "PartitionKey": "a",
    "Data": "84mawgo...rnz1f7h",
    "SequenceNumber": "49555608841303555555555578136812826047777764134359384066"
}

Some of the JSON elements had the proper partitionKey in them, while others looked like the above. We had no idea where the mysterious "a" partitionKey was coming from.  Immediately, we thought we had a bug somewhere.  I proceeded to go into debug mode, and Base64 decoded the Data element of the JSON.  What I saw was... (and paraphrasing a bit...)

pk1 pk2 pk3 pk4 ?? data1 ?? data2 ?? data3 ?? data4

Very interesting... the beginning of the Data element had all of the partition keys from my records.  They were then followed by the data for each of those records.  Okay, so the data looked good.  And it looks like the KPL just packs all of the partition keys up front, followed by the data.  But that didn't explain the "a" partition key.  And if the partition key was always "a", how was the data getting distributed across shards?!
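
For reference, the "debug mode" decode itself is trivial.  Here is a quick sketch (the default Data string is a stand-in, not a real aggregated record; pass the value from your own get-records output):

import java.util.Base64;

public class PeekAtData {
    public static void main(String[] args) {
        // Stand-in for the "Data" element pulled out of the get-records JSON;
        // pass the real value as the first argument.
        String dataFromJson = args.length > 0 ? args[0] : "aGVsbG8gd29ybGQ=";

        byte[] raw = Base64.getDecoder().decode(dataFromJson);

        // The aggregated payload is partly binary, so print only the printable
        // characters to spot the embedded partition keys.
        StringBuilder printable = new StringBuilder();
        for (byte b : raw) {
            printable.append(b >= 32 && b < 127 ? (char) b : '.');
        }
        System.out.println(printable);
    }
}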

If you recall, the Kinesis Producer Library is a native application, coated in a Java wrapper.   If that "a" was coming from somewhere, it had to be deep in the native application.  I put on my waders, strapped on my spelunking headlamp and dusted off my C++.    After some digging, I eventually found the code(droid) I was looking for:

std::string KinesisRecord::partition_key() const {
  if (items_.empty()) {
    throw std::runtime_error(
        "Cannot compute partition_key for empty container");
  }

  if (items_.size() == 1) {
    return items_.front()->partition_key();
  }

  // We will always set an explicit hash key if we created an aggregated record.
  // We therefore have no need to set a partition key since the records within
  // the container have their own parition keys anyway. We will therefore use a
  // single byte to save space.
  return "a";
}

Aha, Mr. Mustard, that explains everything!   Kinesis typically hashes the partition key to determine the shard.  But if you have a look at the Kinesis API, you'll notice that you can actually specify an ExplicitHashKey.  When an ExplicitHashKey is provided, the partition key is ignored!

If we closely examine the code above, notice that whenever the aggregated KinesisRecord had a single record, it used that record's partition key.  These were the "good" records we were seeing through AWS CLI. However, whenever KPL had more than one record, it set the partition key to the magic "a", and set the ExplicitHashKey for the Kinesis Record (which contained multiple partition keys!).    It all became clear, and a rainbow unicorn galloped in from the street.

There you have it.

The case of the crazy "a", and the accelerated PUTs. Solved.
Until next time.  Long live open source software.

Tuesday, October 20, 2015

BaDaS Arsenal : from grep to graph in under 5 minutes w/ Pyplot

BaDaS = Big and Data and Science

These days, BaDaS is all the rage. And to be successful at it, you need an arsenal of weapons to gain insights into vast amounts of data quickly. And more and more, I believe python is the go-to swiss-army knife used in the field. And if you’ve read my previous blog, I believe iPython Notebook (a.k.a. Jupyter) is a great way to collaborate with other people when doing data analysis.

So…

Recently, when I was analyzing the throughput/scalability of Kinesis and I wanted to graph the results, instead of turning to Excel to churn out the graph, I fired up the old iPython Notebook. (admittedly, I was also inspired by @AustinRochford’s excellent blog on Bayesian inference)

I was blown away. I was able to go from a log file to a visible graph in just minutes. Here is how I did it…

After a couple minutes of googling, I realized that the secret sauce was in matplotlib. More specifically, it was Pyplot FTW!

To get to work, I first grep’d the log files for each of my runs, and created a log file for each. I happened to be playing around with RecordBufferSizes in the Kinesis Producer Library (KPL), and ran an experiment at 10ms, 25ms, 50ms and 100ms buffer times.

I then fired up the trusty Notebook and started hacking away.

First, I imported pyplot:

from matplotlib import pyplot as plt

Then, I imported csv and dateutil to munge the data…

import csv
import dateutil.parser

Finally, all I had to do was write some python to parse each data file and create a representative dataset. Pyplot took care of the rest.

In the code below, I loop through each CSV file and gather up the dataset for that file. The dataset is a dictionary called data. The dictionary contains two lists: one for timestamps, which will become the x axis, and one for the throughput (a.k.a. recordsPerSecond), which will become the y axis.

The first element of each row in the CSV contains the epoch time. But because I wanted a time series graph, I didn’t want to use the absolute timestamp. Instead, inline I just calculate the delta from the first timestamp I see. (neato!)

In the end, I stuff each of the data dictionaries into an overarching dictionary that maps filename to the data dictionary representing the data for that file.

The final few lines tell pyplot to graph each of the datasets. There is some voodoo in there to set the color (hint: ‘r’ = red, ‘y’ = yellow, etc.)

In the end, you hit execute on the cell, and SHAZAM – it’s graphalicious.

%matplotlib inline
files = ['10ms.csv', '25ms.csv', '50ms.csv', '100ms.csv']

datasets = {}

for filename in files:
    data = {
        "timestamp": [],
        "recordsPerSecond": []
    }
    with open(filename, 'rb') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        starttime = None
        for row in reader:
            timestamp = dateutil.parser.parse(row[0])
            if (starttime is None):
                starttime = timestamp
            delta = (timestamp - starttime).total_seconds()
            data["timestamp"].append(delta)
            data["recordsPerSecond"].append(row[1])
        datasets[filename] = data

plt.plot(
    datasets["10ms.csv"]["timestamp"], 
    datasets["10ms.csv"]["recordsPerSecond"], 
    'r', label='10ms')
plt.plot(datasets["25ms.csv"]["timestamp"], 
    datasets["25ms.csv"]["recordsPerSecond"],
    'b', label="25ms")
plt.plot(datasets["50ms.csv"]["timestamp"], 
    datasets["50ms.csv"]["recordsPerSecond"],
    'g', label="50ms")
plt.plot(datasets["100ms.csv"]["timestamp"], 
    datasets["100ms.csv"]["recordsPerSecond"],
    'y', label="100ms")
plt.ylabel('Records / Second')
plt.xlabel('Seconds')
plt.legend()
plt.show()




And FWIW, this entire blog post was written from within Jupyter!

If I get around to it, I’ll hopefully post what the actual results were when we played with buffer sizes with Kinesis. (HINT: Be careful – the buffer sizes control how much aggregation occurs, which not only affects throughput, but also affects cost!)

Sunday, October 11, 2015

A quick proof that Black is not White (to save all of mankind)


In The Hitchhiker's Guide to the Galaxy, there is a proof about the non-existence of God.  The reasoning roughly mimics that of the ontological argument for God.

The passage has stuck with me:
"I refuse to prove that I exist," says God, "for proof denies faith, and without faith I am nothing." "But," says Man, "the Babel fish is a dead giveaway, isn't it? It could not have evolved by chance. It proves that You exist, and so therefore, by Your own arguments, You don't. QED".  "Oh dear," says God, "I hadn't thought of that," and promptly vanishes in a puff of logic. "Oh, that was easy," says Man, and for an encore goes on to prove that black is white and gets himself killed on the next zebra crossing.

I always thought it weird that Man is capitalized in this passage.  Does that in fact mean all of Man?  If so, it would be tragic if we all died during a zebra crossing simply because we did not have proof that black was not white.   Thus, I formally submit this proof for posterity, to avoid a Zebrapocalypse.

Proof that Black is not White:

Axiom (1): A chess board comprises 64 black and white squares.

Now, let us assume for contradiction that black is white.

With black and white indistinguishable from one another, squares on a chess board would be indistinguishable from one another.  

Thus, the chess board would have only one square.

We know from (1) that a chess board has 64 squares, therefore our initial assumption must be incorrect.  Black must not be white.

Q.E.D.

Proof by contradiction, never fails. (or does it always fail? ;)

And there you have it.  All of mankind saved from a zebra crossing.

(oh, the things I think about while waiting in line to pay for pumpkins. =)

Thursday, October 8, 2015

Integrating Syslog w/ Kinesis : Anticipating use of the Firehose

On the heels of the Kinesis Firehose announcement, more people are going to be looking to integrate Kinesis with logging systems (to expedite/simplify the ingestion of logs into S3 and Redshift).  Here is one take on solving that problem that integrates syslog-ng with Kinesis.

First, let’s have a look at the syslog-ng configuration. In the syslog-ng configuration, you wire sources to destinations:

source s_sys {
    udp(ip(127.0.0.1) port(514));
};

destination d_test { file("/var/log/monetate/test.log"
    perm(0644)
    template("$MSGONLY\n")
    template_escape(no)
); };

destination d_fact_kinesis { tcp("localhost"
   port(4242)
   template("$MSGONLY\n")
   template_escape(no)
); };
log { source(s_sys); destination(d_test); destination(d_fact_kinesis); };

In this example, we have one source of events (UDP) and two destinations: one file and one TCP. We wire them together in the last log statement. With this configuration, any packets syslog-ng receives via UDP are written to the TCP port and to the file. The file is our backup strategy: in case something happens to Kinesis, we still persist the events locally.  TCP is our path to Kinesis.

On the TCP side of things, we can easily implement a simple TCP server in Java that receives the messages from syslog-ng and sends them out via the KPL. At Monetate, we’ve done exactly that (and I’ll look to see if we can open source the solution). However, it is really important to understand syslog-ng's flow control, and more specifically how we will handle the back pressure if/when Kinesis starts to back up.
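
Our bridge isn't open source (yet), but a stripped-down sketch of the idea looks something like this.  The stream name and the outstanding-record threshold are made up, and all of the real KPL configuration is elided; the point to notice is that we only read the next line from the socket when the KPL isn't backed up, which is where the back-pressure story below comes in:

import com.amazonaws.services.kinesis.producer.KinesisProducer;
import com.amazonaws.services.kinesis.producer.KinesisProducerConfiguration;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Minimal syslog-ng -> Kinesis "bridge": accept the TCP connection from
// syslog-ng and hand each line to the KPL for aggregation/PUTting.
public class SyslogKinesisBridge {
    public static void main(String[] args) throws Exception {
        String streamName = "events";  // hypothetical stream name
        KinesisProducer producer = new KinesisProducer(new KinesisProducerConfiguration());

        try (ServerSocket server = new ServerSocket(4242)) {
            while (true) {
                try (Socket socket = server.accept();
                     BufferedReader reader = new BufferedReader(
                             new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        // Crude back-pressure: stop draining the socket when too many
                        // records are buffered inside the KPL, instead of growing until we OOM.
                        while (producer.getOutstandingRecordsCount() > 10_000) {
                            Thread.sleep(10);
                        }
                        producer.addUserRecord(streamName,
                                Long.toString(System.currentTimeMillis()),
                                ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
                    }
                }
            }
        }
    }
}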

Let's consider how we back that thang’ up.

Here is a visual that shows the whole flow:



At each point in the sequence, we need to make sure that we are prepared to handle back-pressure. Specifically, we need to validate the behavior of each component when the next downstream component refuses to accept additional data.

For example, if our little TCP server (“bridge”) blindly accepted packets from the TCP stream, we would quickly OOM the JVM if/when Kinesis slowed down. Thus, in the “bridge” we need to apply back-pressure upstream to prevent that from happening. More specifically, we need to stop consuming off of the TCP stream, and hope that syslog-ng reacts appropriately.

Fortunately, syslog-ng has accounted for this case with its “output buffer”, shown above.  The output buffer is used to store messages for delivery to destinations. If the max-size is reached on that buffer, then the destination is “disconnected” so as not to crash syslog-ng. You control the max-size of the buffer through the log_fifo_size setting in syslog-ng.

Different destination types behave differently in this scenario, and we tested out a few of them. First we considered a straight-up pipe between syslog-ng and our bridge process using the “program” destination within syslog-ng. That worked well, but in the event of a backed-up queue, syslog-ng kills the program and respawns a new process. This was less than ideal. We also considered using UDP. That worked as well, and in effect eliminated the back-pressure scenario because UDP is “fire and forget”.  Under heavy load, however, we noticed packet drops, which meant we were losing events.

In the end, we decided to go with TCP.  With TCP we won’t silently lose messages, and syslog-ng won’t continually restart our process.  With that decision, we needed to confirm the behavior of syslog-ng.  And for that, we needed to monitor syslog-ng. This handy dandy command-line does exactly that:

syslog-ng -Fevd >& syslog.log

While monitoring syslog-ng, we tested two failure scenarios.  The first tested a dead bridge (no TCP listener).  In this situation, syslog-ng won't be able to connect, and we needed to make sure it behaves well in that case. To test this, we ran 100K messages through the system over 40 seconds (>2K/sec). When monitoring syslog-ng, we saw it trying to reconnect:

$grep 4242 syslog.log | grep failed
Connection failed; server='AF_INET(127.0.0.1:4242)', error='Connection refused (111)', time_reopen='10'
...
Connection failed; server='AF_INET(127.0.0.1:4242)', error='Connection refused (111)', time_reopen='10'

All the messages made it through to our log file (hallelujah!), which means the downed TCP connection had no effect on the log file destination. Furthermore, syslog-ng continually retried to establish the TCP connection, so when the process did come back up, it reconnected. Nice! One very important thing to note from the above output is the “time_reopen”. It turns out that this is a global configuration option, which tells syslog-ng how long to wait before re-attempting to establish the connection.  So, the behavior was solid – and we can actually configure how noisy things get if/when we lose the bridge process.

For the final test, we needed to see what happens when we apply back-pressure from the Java process itself.  In this case, syslog-ng can connect, but the Java process refuses to read off the stream/socket.  For this, we ran the same 100K test, but paused consuming in the Java process to simulate a Kinesis slow-down. And again, we saw good things...  All the messages made it through to the log file, and this time we saw messages from syslog-ng indicating that the output buffer (fifo queue) was full:

syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'
...
syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'

And the log message tells you everything: after the queue fills up, syslog-ng drops messages on the floor until the destination starts accepting data again.

So, there you have it… to connect syslog to Kinesis, I’d recommend using a TCP output destination with some glue code between that and the Kinesis Producer Library (KPL). (and again, I’ll see if we can open source some of that) Just be careful, and apply the back pressure to syslog-ng.

Per our experiments, syslog-ng can back that thang up!

Tuesday, October 6, 2015

Destinations for syslog-ng : UDP + Program (Java)

This is just a quick mental note because I keep losing them...

Here are two quick gists that show how to configure destinations within syslog-ng.

UDP Destination

This destination will spew UDP datagram packets:
destination udp_spew { udp("localhost"
   port(8052)
   template("$MSGONLY\n")
   template_escape(no)
); };

Process Destination

This destination will spew log events at a java program:
destination program_spew { program("/opt/jdk1.7.0_79/bin/java -jar /mnt/foo.jar arg1 arg2"
    template("$MSGONLY\n")
    template_escape(no)
); };

Connecting the Destination to the Source

For both of these, don't forget to connect it to the source!
log { source(s_sys); ...; destination(program_spew); };

Monday, October 5, 2015

Getting started w/ the Python Kinesis Client Library (KCL) (via IPython Notebook)


We are racing ahead with Kinesis.  With streaming events in hand, we need to get our data scientists tapped in. One problem: the KCL and KPL are heavily focused on Java, but our data scientists (and the rest of our organization) love Python. =)

We use IPython notebook to share python code/samples.  So, in this blog I'm going to get you up and running, consuming events from Kinesis, using Python and IPython notebook.  If you are already familiar with IPython notebook, you can pull the notebook from here:
https://github.com/boneill42/kcl-python-notebook/

If you aren't familiar with iPython Notebook, first go here to get a quick overview.  In short, IPython Notebook is a great little webapp that lets you play with python commands via the browser, creating a wiki/gist-like "recipe" that can be shared with others.  Once familiar, read on.

To get started with KCL-python, let's clone the amazon-kinesis-client-python github repo:

   git clone git@github.com:awslabs/amazon-kinesis-client-python.git

Then, create a new virtualenv.  (and if you don't use virtualenv, get with the program =)
 
   virtualenv venv/kcl 
   source venv/kcl/bin/activate

Next, get IPython Notebook set up.  Install it by running the following commands from the directory containing the cloned repo:

   pip install "ipython[notebook]"
   ipython notebook

That should open a new tab in your browser.  Start a new notebook by clicking "New->Python 2".   A notebook comprises one or more cells.  Each cell has a type; for this exercise, we'll use "Code" cells.   We'll also take advantage of the "magic" functions available in IPython Notebook -- specifically run, which executes a command.  We need it to execute the python setup for the KCL.

The amazon-kinesis-client-python library actually rides on top of a Java process, and uses the MultiLangDaemon for interprocess communication.  Thus, to get the KCL set up you need to download the jars.  You can do that in the notebook by entering and executing the following in the first cell:

   run setup.py download_jars

Next, you need to install those jars with:

   run setup.py install

That command installed a few libraries for python (specifically -- it installed boto, which is the aws library for python).  Because the notebook doesn't dynamically reload, you'll need to restart IPython Notebook and re-open the notebook. (hmmm... Is there a way to reload the virtualenv used by the notebook w/o restarting?)

Once reloaded, you are ready to start emitting events into a Kinesis stream.  For now, we'll use the sample application in the KCL repo.  Create a new cell and run:

   run samples/sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster

From this, you should see:

Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words

Woo hoo! We are emitting events to Kinesis!  Now, we need to consume them. =)

To get that going, you will want to edit sample.properties.  This file is loaded by KCL to configure the consumer.   Most importantly, have a look at the executableName property in that file.  This sets the name of the python code that the KCL will execute when it receives records.  In the sample, this is sample_kclpy_app.py.  

Have a look at that sample Python code.  You'll notice there are four important methods that you will need to implement:  init, process_records, checkpoint, and shutdown.  The purpose of those methods is almost self-evident, but the documentation in the sample is quite good.  Have a read through there.

Also in the sample.properties, notice the AWSCredentialsProvider.  Since it is using Java underneath the covers, you need to set the Java classname of the credentials provider.  If left unchanged, it will use: DefaultAWSCredentialsProviderChain.

It is worth looking at that documentation, but probably the easiest route to get the authentication working is to create a ~/.aws/credentials file that contains the following:

[default]
aws_access_key_id=XXX
aws_secret_access_key=XXX

Once you have your credentials setup, you can crank up the consumer by executing the following command in another notebook cell:

   run samples/amazon_kclpy_helper.py --print_command --java /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/bin/java --properties samples/sample.properties

Note that the --java parameter needs the location of the java executable.  Now, running that in the notebook will give you the command line that you can go ahead and execute to consume the stream.   Go ahead and cut-and-paste that into a command-line and execute it.  At this point you are up and running.

To get started with your own application, simply replace the sample_kclpy_app.py code with your own, and update the properties file, and you should be off and running.

Now, I find this bit hokey.  There is no native python consumer (yet!).  In fact, you actually need to run *java*, which will turn around and call python.  Honestly, with this approach, you aren't getting much from the python library over what you would get from the Java library (except for some pipe manipulation in the MultiLangDaemon).  Sure, it is a micro-services approach... but maybe it would be better to simply run a little python web service (behind a load balancer?).   Then, we'd be able to scale the consumers better without re-sharding the Kinesis stream.  (One shard/KCL worker could fan out work to many python consumers across many boxes.)  You would need to be careful about checkpointing, but it might be a bit more flexible (and would completely decouple the "data science" components from Kinesis).  Definitely food for thought. =)

As always, let me know if you run into trouble.

Friday, September 25, 2015

Druid : Vagrant Up (and Tranquility!)


We've been shown the light.  

After a great meeting with Fangjin and Gian, we are headed down the Tranquility route for real-time ingestion from Kinesis to Druid.  For me, that means getting comfortable with the expanded set of node types in the Druid cluster.

There is no better way to get comfortable with the cluster and the new node types, than to spin up a complete environment and kick the tires a bit.  And there is no better way to do that than "vagrant up"!

So, this morning I set out to create a complete environment suitable for Tranquility, inclusive of middle managers (which are typically omitted from a standalone "simple" cluster).  I started with Quantily's vagrant config for Druid 0.6.160, forked the repo, and went to work.

If you are impatient, you can find my fork here and get going.  If you have the patience...

It's important to understand the anatomy of a Druid cluster.  First off, Druid relies on Zookeeper and MySQL.  The install.sh script installs vanilla versions of these, and creates a druid database and user in MySQL.  (Druid itself populates the schema at startup.)

The following is a list of the server types in a Druid cluster.  On our vagrant server, each of the servers occupies a different port, and has its own configuration. (also detailed below)

Overlord: (port 8080)

The overlord is responsible for task management on the Druid cluster.  Since Tranquility is largely an orchestration engine, doling out tasks to create segments as they form in real-time, the overlord is the entry point into the Druid cluster for Tranquility. 


Coordinator: (port 8081)

The coordinator is responsible for segment management, telling nodes to load/drop segments.


Broker: (port 8090)

The Broker is the query proxy in the system.  It receives the query, knows which nodes have which segments, and proxies the query to the respective nodes.


Historical: (port 8082)

Historical nodes are the beasts that load the segments and respond to queries for "static" data.  Once a segment has been committed to deep storage, it is loaded by a historical node, which responds to queries.


MiddleManager: (port 8100)

MiddleManagers are exactly that. =)  They push tasks to Peons, which they spawn on the same node.  Right now, one Peon works on one task, which produces one segment.  (that may change)


Special Notes about Real-Time nodes:
Per my previous blog, we were debating the use of a real-time (RT) node.   But with Tranquility, you don't need RT nodes.  They are replaced with transient peon instances that run a temporary firehose ingesting the events for that segment directly from the Tranquility client.  

Druid is targeting highly-available, fully-replicated, transactional real-time ingestion.  Since RT nodes share nothing, they are unable to coordinate and unable to replicate data, which means they don't fit well into the strategic vision for the project.  Tranquility, and its coordinated ingestion model, may eventually obsolesce/replace RT nodes entirely. (See #1642 for more information)

Getting Started

OK -- down to business.

To fire up your own cluster, simply clone the repository and "vagrant up".  Once things are up and running, you can:

Hit the Overlord at:

Hit the Coordinator at:

In my next blog, I'll detail the client-side of integrating Tranquility using the direct Finagle API.




Thursday, September 24, 2015

Kinesis -> Druid : Options Analysis (to Push? to Pull? to Firehose? to Nay Nay?)


Druid is one of the best slicers and dicers on the planet (except maybe for the Vegematic-2 ;).  And I know there are people out there that might argue that Elastic Search can do the job (Shay & Isaac, I'm looking at you), but MetaMarkets and Yahoo have proved that Druid can scale to some pretty insane numbers, with query times an order of magnitude better than Spark.  And because of that, we've come to rely on Druid as our main analytics database.

The challenge is figuring out how best to pipe events into Druid.  As I pointed out in my previous post, Kinesis is now a contender.  Alas, Druid has no first-class support for Kinesis.  Thus, we need to figure out how to plumb Kinesis to Druid.  (druidish pun intended)
First, let me send out some Kudos to Cheddar, Fangjin, Gian and the crew...
They've made *tremendous* progress since I last looked at Druid. (0.5.X days)
That progress has given us the additional options below.

Here are the options as I see them:


Option 1: Traditional KinesisFirehose Approach

In the good ole' days of 0.5.X, real-time data flowed via Firehoses.  To introduce a new ingest mechanism, one simply implemented the Firehose interface, fired up a real-time node, and voila: data flowed like spice. (this is what I detailed in the Storm book)

And since 0.5, Druid has made some updates to the Firehose API, to specifically address the issues that we saw before (around check-pointing, and making sure events are processed only once, in a highly-available manner).  For the update on the Firehose API, check out this thread.

After reading that thread, I was excited to crank out a KinesisFirehose.  And we *nearly* have a KinesisFirehose up and running.  As with most Java projects these days, you spend 15 minutes on the code, and then two hours working out dependencies.  Ironically, Amazon's Kinesis Client Library (KCL) uses an *older* version of the aws-java-sdk than Druid, and because the firehose runs in the same JVM as the Druid code, you have to work out the classpath kinks.  When I hit this wall, it got me thinking -- hmmm, maybe some separation might be good here. ;)

To summarize what I did: I took the KCL's push model (I implemented IRecordProcessor), put a simple little BlockingQueue in place, and wired it to Druid's pull model (Firehose.nextRow).  It worked like a charm in unit tests. ;)
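
The glue in question is tiny.  Here is a rough sketch of the shape of it -- the class and method names are mine, not Druid's or the KCL's; the push side would be called from IRecordProcessor.processRecords() and the pull side from the Firehose implementation:

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Bridges the KCL's push model to Druid's pull model with a bounded queue.
public class PushPullBridge {
    private final BlockingQueue<Map<String, Object>> queue = new LinkedBlockingQueue<>(10000);

    // Called from the KCL side (IRecordProcessor.processRecords): blocks when
    // the queue is full, which throttles the consumer instead of dropping data.
    public void push(Map<String, Object> event) throws InterruptedException {
        queue.put(event);
    }

    // Called from the Druid side (e.g. inside Firehose.nextRow): waits briefly
    // for the next event and returns null if nothing showed up.
    public Map<String, Object> pull() throws InterruptedException {
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }
}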

(ping me, if you want to collaborate on the firehose implementation)

Option 2: The EventReceiver Firehose

Of course, the whole time I'm implementing the Firehose, I'm on the consuming end of a push model from the KCL and on the producing end of a pull model from Druid, and forced to dance between the two.  It made me wonder, "Isn't there a way to push data into Druid?"   If there were such a thing, then I could just push data into Druid as I receive it from the KCL. (that sounds nice/simple, doesn't it?)

This ends up being the crux of the issue, and something that many people have been wrestling with.  It turns out that there is a firehose specifically for this!  Boo yah!  If you have a look at the available firehoses, you'll see an Event Receiver firehose.  The Event Receiver firehose creates a REST endpoint to which you can POST events.  (have a look at the addAll() method's javax.ws.rs.POST annotation)

Beautiful.  Now, I could receive messages from the KCL and POST them to Druid.  Simple: I move the checkpoint in Kinesis after I safely receive 200's for all of my events.  But what happens to those events on the Druid side if the node that receives them fails?  Also, how do I scale that ingest mechanism?  Are there multiple REST endpoints behind a load balancer?

I started up a real-time node with a spec file instructing Druid to fire up an EventReceiver.  The startup appeared to complete, but the endpoint URL from the documentation came back with a 404. (another druidish pun?)  Grep'ing the code, I found that the push-events endpoint actually comes from the WorkerResource:

http://<host>:<port>/druid/worker/v1/chat/<id>/push-events/

Well, that WorkerResource was over in the indexing-service... hmmm, what is an IndexingService?

We tried to answer all of the above questions around availability and scalability, and did some further research on the IndexingService.  That led us to Tranquility. (hey, that sounds so nice and pleasant, maybe we want to use that!?)


Option 3: Tranquility

In doing some due-diligence on Tranquility, I discovered that while I was away, Druid implemented their own Task management system!!  (See the Indexing-Service documentation for more information)  Honestly, my first reaction was to run for the hills.  Why would Druid implement their own task management system?  MiddleManagers and Peons... sounds an awful lot like YARN.  I thought the rest of the industry was moving to YARN (spark, samza, storm-yarn, etc.) YARN was appropriately named, Yet Another Resource Negotiator, because everyone was building their own! What happened to the simplicity of Druid, and the real-time node?

Despite that visceral reaction, I decided to give Tranquility a whirl.  I fired up the simple cluster from the documentation, incorporated the Finagle-based API integration, and was able to get data flowing through the system.  I watched the overlord's console as the tasks moved from pending to running to complete.  It worked!  I was happy... sort of.  I was left with a "these are small Hadoop jobs" taste in my mouth.   It didn't feel like events were "streaming".

Now, this might just be me.  In reality, many/all of the streaming frameworks are actually just doing micro-batching.  And with Druid's segment-based architecture, the system really needs to wait for a windowPeriod/segmentGranularity to pass before a segment can be published.  I get it.

I'm just scared.  For me, the more nouns involved, the more brittle the system: tasks, overlords, middle managers, peons, etc.    Maybe I should just get myself some kahonas, and plow ahead.


Next Step:

Regardless,  we plan to meet up with Fangjin Yang and Gian Merlino, who are presently coming out of Stealth mode with a new company.  I have a tremendous amount of faith in these two.   I'm confident they will steer us in the right direction, and when they do -- I'll report back. =)








Monday, September 21, 2015

Amazon Kinesis : 20x cheaper when used with the Kinesis Producer Library (KPL)!

We've been looking at the tradeoffs between Kafka and Kinesis, and just recently.... Kinesis won!

Kinesis pricing is based on two dimensions: an hourly price for each shard (which dictates overall concurrency/throughput), and a price per 1M PUT payload units.  As I alluded to in my previous post, we process a *ton* of events, and if we simply pushed those events onto Kinesis streams 1:1, we would quickly hand over all of our money to Amazon and go out of business.

However, with the recent release of the Kinesis Producer Library (KPL), Amazon exposed a native capability to aggregate multiple messages/events into a single PUT unit.  The maximum size of a PUT unit is 25KB.  If you have a message size of 150 bytes, you can cram about 150 messages into a single PUT unit! (and save some serious dough!)

Here is the math, straight from the Kinesis Pricing Calculator:

Without the KPL,
100K m / s * (150 bytes / m) = 100 shards and 263,520M PUT units = $4,787.28 / month

* Note that each shard can only accept 1K records / second, which is why we end up with 100 shards.

Now with the KPL, let's assume we can only fit 100 messages in each PUT unit.
We would reduce our required PUT throughput down to 1K PUTs / second.
With 100 messages in each PUT unit, each unit would be 15KB in size. (100 * 150 bytes)

This gives us:
1K m / s * (15 Kb / m) = 15 shards and 2,635.2M PUT units = $201.60 / month

That is a savings of ~20x!!
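
To sanity-check those numbers, here is the arithmetic spelled out as code.  The $0.015 per shard-hour and $0.014 per million PUT payload unit prices, and the ~732-hour month, are assumptions pulled from the pricing calculator at the time:

public class KinesisCostEstimate {
    public static void main(String[] args) {
        double shardHour = 0.015;          // $ per shard per hour (assumed)
        double perMillionPuts = 0.014;     // $ per 1M PUT payload units (assumed)
        double hoursPerMonth = 732;        // ~30.5 days

        // Without the KPL: 100K records/s, one PUT unit per record, 100 shards.
        double putsNoKpl = 100_000 * 3600 * hoursPerMonth;          // ~263,520M units
        double costNoKpl = 100 * shardHour * hoursPerMonth
                + putsNoKpl / 1_000_000 * perMillionPuts;           // ~$4,787

        // With the KPL: 100 records aggregated per 15KB PUT, 1K PUTs/s, 15 shards.
        double putsKpl = 1_000 * 3600 * hoursPerMonth;              // ~2,635M units
        double costKpl = 15 * shardHour * hoursPerMonth
                + putsKpl / 1_000_000 * perMillionPuts;             // ~$202

        System.out.printf("without KPL: $%.2f / month, with KPL: $%.2f / month%n",
                costNoKpl, costKpl);
    }
}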

So, let's look at what that means architecturally...

First, you are going to want to look at the KPL recommended usage matrix:
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-integration.html

My naive interpretation of that chart is: Use Java!

Ironically, the producer is actually a natively built micro-service/binary, and the KPL is a Java wrapper around that native binary that uses interprocess communication to delegate work to the micro-service.

In Java, it is straightforward to integrate the KPL.  Here are the key components.

First, you need to configure the producer:

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion(region);
config.setCredentialsProvider(new ProfileCredentialsProvider());
config.setMaxConnections(1);
config.setRequestTimeout(60000);
config.setRecordMaxBufferedTime(15000);
config.setNativeExecutable(
  "/Users/brianoneill/git/amazon-kinesis-producer/bin/darwin-4.2.1/debug/kinesis_producer");
producer = new KinesisProducer(config);


Notice that the KPL actually sends the messages asynchronously, buffering/aggregating those messages internally.  (which makes for challenging guarantees around message delivery -- but we'll defer that problem for another day)

Also notice that I built my own native binary and specified its location.  You don't necessarily need to do that; the binary is actually bundled into the JAR file and extracted into a temporary location at runtime.  (yeah, crazy eh?)

Next, you need to send some data:

This is done with a single statement:

ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName,
  Long.toString(System.currentTimeMillis()), data);


Finally, you need to find out what happened to your message:

For this one, we'll add a callback to the future:

Futures.addCallback(f, callback);

Where callback is a FutureCallback that will be invoked when the record is processed.  Here is an example:


final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
 @Override
 public void onFailure(Throwable t) {
  if (t instanceof UserRecordFailedException) {
   Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
   LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(),
     last.getErrorMessage()));
  }
  LOGGER.error("Exception during put", t);
  System.exit(1);
 }

 @Override
 public void onSuccess(UserRecordResult result) {
  // LOGGER.info("Successfully wrote [" + result.toString() + "]");
  completed.getAndIncrement();
 }
};


And there you have it, KPL in a nutshell.

For us, it was a huge win, and actually made Kinesis a viable alternative.
Now, we just need to figure out how to plumb it into everything else. =)

Kinesis Firehose to the rescue!  (stay tuned)




Friday, September 18, 2015

Gone Monetate : Personalizing Marketing at 100K events/second


Internet advertising has gone the way of junk mail.  There are tons of impersonal advertisements sucking up screen real-estate and mailbox space.   The majority of the advertising industry leverages only inane intelligence that assumes, "because I clicked on a rubber ducky once, I must want to buy a rubber ducky".  In fact, I probably want to see rubber duckies on every site I visit, which is the internet equivalent of a mailbox full of refinance offers the second I have my credit checked.  I'd much rather be shown things I care about.  Even better, show me stuff I didn't know I cared about!

That is the reason I joined Monetate.  Well, it was that combined with the opportunity to join a rock-star team of passionate geniuses, working on an incredibly challenging real-time data and analytics problem, hell-bent on changing the world.

I've only been on the team for a week, and I've been blown away.  Directed by Jeff Persch, the development team has architected a phenomenal platform: scalable and resilient, with simplicity as a core tenet.  With Monetate at the center of many e-commerce interactions, influencing almost 1/3 of the dollars spent on Black Friday, that is no small feat.  We are gearing up for another Black Friday, for which peak traffic may hit 50-100K events / second!

Doing some math on that, the numbers get big really fast:

100K / s = 6M / minute = 360M / hour = ~8B events / day

Yep, that ended in the B word... per day.  And like everyone else, we are pushing to deliver better, faster, and more insightful analytics on those user activity streams... in near real-time.

And because of that, I hope to have many useful/fun blog posts coming.

Right now, we're in the process of rolling out Druid, a favorite technology of mine. (and something to which we dedicated a whole chapter in our Storm book -- (shameless plug ;))  With Druid in place, we'll be able to slice and dice the real-time events.

And we already have the ability to ingest user context in real-time. When you combine that user context with stream processing that allows us to identify behavioral trends, our decision engine can make contextual decisions, adapt on the fly, and deliver relevant/interesting content to the end-user!

Hooray!

Imagine if we could only do that for the real world too.  Instead of coming home to a mailbox filled with junk mail that I immediately throw away,  I'd have updates on the latest gadgetry and fishing paraphernalia.  Hmm, maybe I can get that on the roadmap. ;)

Anyway, I couldn't be happier with my choice to join Monetate.  The people, culture and technology are astounding.

Stay tuned -- likely my next blog post will show how to leverage Amazon's Kinesis for 1/100th the cost.  (by aggregating events into a smaller number of Kinesis records, decreasing the overall message rate, you can save mondo dough =)





Thursday, July 16, 2015

Cloud Formation on AWS for Cassandra + HPCC


If your primary objective is to setup a simple Cassandra cluster, then you probably want to start here:
http://docs.datastax.com/en/cassandra/2.1/cassandra/install/installAMI.html

However, if you have an existing AWS cluster to which you want to add Cassandra, then read on.

In my case, I wanted to add Cassandra to an existing HPCC cluster.  More specifically, I wanted to be able to spin-up an HPCC + Cassandra cluster with a single command.  To accomplish this, I decided to add a bit of python scripting on top of Cloud Formation.

Amazon has a facility called Cloud Formation.  Cloud Formation reads a JSON template file, and creates instances as described in that file. (pretty slick)  Within that JSON, you can execute shell commands that do the heavy lifting.  The JSON file can define parameters that the administrator can then provide via the management console, or via AWS CLI.
(IMHO, I suggest installing AWS CLI)

Running a Cloud Formation

First, I started with Tim Humphries' EasyFastHPCCoAWS.  That cloud formation template is a great basis.  It installs the AWS CLI, and copies the contents of an S3 bucket down into /home/ec2-users.  Have a look at the template file.  To get it up and running, it is a simple matter of creating a PlacementGroup, a KeyPair, and an S3 bucket, into which you copy the contents of the github repo.  For simplicity, I named all of those the same thing: "realtime-hpcc".

Now, with a single command, I can fire up a low-cost HPCC cluster with the following:

aws cloudformation create-stack --capabilities CAPABILITY_IAM --stack-name realtime-hpcc --template-body https://s3.amazonaws.com/realtime-hpcc/MyHPCCCloudFormationTemplate.json --parameters \
   ParameterKey=HPCCPlacementGroup,ParameterValue=realtime-hpcc \
   ParameterKey=HPCCPlatform,ParameterValue=HPCC-Platform-5.2.2-1 \
   ParameterKey=KeyPair,ParameterValue=realtime-hpcc \
   ParameterKey=MasterInstanceType,ParameterValue=c3.2xlarge \
   ParameterKey=NumberOfRoxieNodes,ParameterValue=1 \
   ParameterKey=NumberOfSlaveInstances,ParameterValue=1 \
   ParameterKey=NumberOfSlavesPerNode,ParameterValue=2 \
   ParameterKey=RoxieInstanceType,ParameterValue=c3.2xlarge \
   ParameterKey=ScriptsS3BucketFolder,ParameterValue=s3://realtime-hpcc/ \
   ParameterKey=SlaveInstanceType,ParameterValue=c3.2xlarge \
   ParameterKey=UserNameAndPassword,ParameterValue=riptide/HIDDEN

Note that I specified the template via an HTTPS URL.  I also specified a stack-name, which is what you'll use when querying AWS for status.  You can do that with the following command:

aws cloudformation describe-stacks --stack-name realtime-hpcc

With that you get a nice, clean JSON back that looks something like this:

{
    "Stacks": [
        {
            "StackId": "arn:aws:cloudformation:us-east-1:633162230041:stack/realtime-hpcc/e609e0b0-2595-11e5-97b7-5001b34a4a0a",
            "Description": "Launches instances for fast executing HPCC on AWS. Plus, it sets up and starts HPCC System.",
            "Parameters": [
                {
                    "ParameterValue": "realtime-hpcc",
                    "ParameterKey": "KeyPair"
                }...
            ],
            "Tags": [],
            "CreationTime": "2015-07-08T17:22:24.461Z",
            "Capabilities": [
                "CAPABILITY_IAM"
            ],
            "StackName": "realtime-hpcc",
            "NotificationARNs": [],
            "StackStatus": "CREATE_IN_PROGRESS",
            "DisableRollback": false
        }
    ]
}

The "StackStatus" is the key property.  You'll want to wait until that says, "CREATE_COMPLETE".
Once it completes, you can go into the management console and see your EC2 instances.

If something went wrong, you can go have a look in /var/log/user-data.log.  Tim's template conveniently redirects the output of the shell commands to that log file.

Installing Cassandra 

NOW -- to actually get Cassandra installed on the machines, I simply forked Tim's work and altered the Cloud Formation template to include the datastax repo and a yum install of Cassandra.   And the next time I created my cluster: poof magic voodoo, Cassandra was installed!

Next I needed to configure the Cassandra instances into a cluster.   At first, I tried to do this using a shell script executed as part of the cloud formation, but that proved difficult because I wanted the IP addresses for all the nodes, not just the one on which the script was running.  I shifted gears and decided to orchestrate the configuration from python after the cloud had already formed.

I wrote a quick little python script (configure_local_cassandra.py) that takes four parameters: the location of the cassandra.yaml file, the cluster name, the private IPs of the Cassandra nodes, and the IP of the node itself.   The python script updates the cassandra config, substituting those values into the template file.  I added this to the S3 bucket, and Cloud Formation took care of deploying the template and the python script to the machines.  (thanks to Tim's template)

Configuring Cassandra 

With that script and the template in place on each machine, the final piece is the script that gathers the IP addresses for the nodes and calls the python script via ssh.  For this, we use the aws ec2 cli, and fetch the JSON for all of our instances.  The aws ec2 command looks like this:

aws ec2 describe-instances

I wrote a python script (configure_cassandra_cluster.py) that parses that JSON and runs commands on each of the nodes via ssh.

Convenience Scripts

To keep things simple, I also added a bunch of shell scripts that wrap all the command lines (so I don't need to remember all the parameters).   The shell scripts allow you to create a cluster, get the status of a cluster, and delete a cluster using a single command line:

create_stack.sh, get_status.sh, delete_stack.sh

(respectively)

Putting it all together...

To summarize, the create_stack.sh script uses aws cloudformation to create the cluster.
Then, you can watch the status of the cluster with, get_status.sh.
Once formed, the configure_cassandra_cluster.py script installs, configures and starts Cassandra.

After that, you should be able to run ECL using Cassandra!

Feel free to take these scripts, and apply them to other things.  And kudos to Tim Humphries for the cloud formation template.


Wednesday, June 10, 2015

Amazon Echo : Syntax, Semantics, Intents and Goals: NLP over time.

So I caved.  Even with all my Apple paraphernalia, I bought an Amazon Echo.  I've had it for a little over a week, and I'm hooked.  We use it to play music, check the weather, and set timers -- all of the out-of-the-box functionality.  You may think, "It's just Siri in a room."  And that is one perspective.  From a different perspective, Alexa (the name you use to interact with Echo) will change everything, and bring the Internet of Things (IoT) to the masses.  Regardless, I'm just amazed at how far we've come with NLP.

Along those lines, I had an academic discussion this week around syntax and semantics in machine-to-machine interfaces (APIs), and how that correlates to user/consumer intent -- specifically whether or not you can infer user intent from captured REST calls.  (FWIW -- I believe you can infer intent (at least some notion of it), although it might only be a portion of the user's goal, and may be unintentionally misaligned with the semantics of the call.)

With that conversation fresh in my mind, I found it amusing that the developer API for Echo specifically calls out "intent" and defines it as:

"In the context of Alexa apps, an intent represents a high-level action that fulfills a user’s spoken request. Intents can optionally have arguments called slots. Note that intents for Alexa apps are not related in any way to Android intents." - Echo Developer : Getting Started Guide

That made me nostalgic.  I loved my days in NLP, and it's absolutely phenomenal to see how things have played out over the years...

(NOTE: what follows is almost entirely self-serving, and you may not get anything out of it.  I am not responsible for any of the time you lose in reading it ;)

I think I was eighteen when I started working in the Natural Language Processing (NLP) group within Unisys.  I was one of the many developers building those terrible voice recognition systems on the other end of the phone when you dialed in for customer service and received anything but that.  We frustrated our end-users, but I got to work with some amazing people: Debbie Dahl, Bill Scholz, and Jim Irwin.

And we thought we were smart.  We built all sorts of tools that helped map text into actions ("intents"), and newfangled web servers for voice recognition.  They were good times, but frankly we were only inflicting pain on people.  Voice recognition wasn't there yet.   We spent our time trying to engineer the questions properly, to guide users to answer with certain terms to make it easy on the voice recognition system.  (The year was 1995-1999ish.)

For a time after that, I wanted nothing to do with voice recognition.  I still loved NLP though, and went on to build a system to automate email routing and responses for customer service.  That worked because it was a numbers game.  If the system was confident enough to answer 50% of the customer inquiries, it meant they didn't need humans to respond to that subset, which saved moolah.  We went on to extend that into real-time instant messaging over the web (Kana IQ).  Again, good times and lots of brain work/patents in the process, and we congratulated ourselves for figuring out how to map a Bayesian inference engine onto a grammar. (1999-2001ish)  But there is no way we could have put that system directly in the hands of consumers with no human support.

However -- it was during this time that I started appreciating the difference between Syntax, Semantics, Intents, and Goals.  Here are the straight-up definitions:

Syntax : the arrangement of words and phrases to create well-formed sentences in a language.
Semantics : the branch of linguistics and logic concerned with meaning.
Intent : the reason for which something is done or created or for which something exists.
Goal : the object of a person's ambition or effort; an aim or desired result.

Consider the process of taking the characters in a string as input and converting them into actions that help the user achieve their goal.  First, you need to consider syntax.  In this part of the process, you are converting a useless string of characters into related tokens.   There are lots of ways to do this, and multiple pieces to the puzzle (e.g., part-of-speech tagging).  Back at Brown, I had the pleasure of studying under Eugene Charniak, and his book on Statistical Natural Language Learning became a favorite of mine (after initially hating it for a semester ;).  Ever since that course, I've fallen back on Context-Free Grammars (CFGs) and chart parsing to attempt to relate the word tokens in a sentence to each other.
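
To make that concrete, here is a toy sketch of a CFG and a chart parse over a handful of tokens.  I'm using NLTK purely for illustration -- that choice is an assumption on my part, not the tooling behind any of the systems described here.

# Toy sketch: a tiny CFG plus a chart parse that relates the word tokens.
import nltk

grammar = nltk.CFG.fromstring("""
S   -> NP VP
NP  -> Det N | Det ADJ N
VP  -> V NP
Det -> 'the'
ADJ -> 'red'
N   -> 'dog' | 'ball'
V   -> 'chased'
""")

parser = nltk.ChartParser(grammar)
for tree in parser.parse("the dog chased the red ball".split()):
    # prints the parse tree, e.g. (S (NP (Det the) (N dog)) (VP ...))
    print(tree)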

Once you have related tokens, and you know the parts of speech (ADJ, NOUN, etc), and how they relate (ADJ _modifies_ NOUN) via chart-parsing, you can attempt to assign semantics.  IMHO -- this is the hard part.  

After Kana, I took a job with a company that recorded patient/doctor conversations, transcribed them, and then attempted to perform computational linguistics on them.  If you can imagine, trying to distinguish between a mention of a *symptom* and a *side-effect* is incredibly difficult.  You not only need solid parsing to associate the terms in the sentence, but you need a knowledge base to know what those terms mean, and the context in which they are being used.  In this phase, we are assigning meaning to the terms (i.e. semantics).  Sure, we used Hadoop for the NLP processing, but more horsepower didn't translate into better results... (now 2008-2010ish)

Even assuming you can properly assign semantics, you may still misinterpret the intent of a communique.  Even with human-to-human communication, this happens all the time.  English is a horrible language.  And the same thing can happen with machines.  But more often than not, if you can parse the sentence (i.e. assign syntax) and interpret its meaning (i.e. semantics), you can infer some notion of intent from the textual gibberish.  Otherwise, communication is just fundamentally broken.

With all this in mind -- back to Echo.

Amazon nailed it.  I see 20 years of my NLP frustration solved in a 12" cylinder.  It rarely misses on the voice recognition.  And they have a fantastic engine taking phonemes to "intent", allowing developers to plug in at that highest layer.  Boo yah.  That's empowering.  I might just build an app for Echo.  Maybe an AWS integration --  "Alexa, please expand my AWS cluster by 10 nodes".   It'd be therapeutic. ;)


Thursday, May 14, 2015

Spark SQL against Cassandra Example


Spark SQL is awesome.  It allows you to query any Resilient Distributed Dataset (RDD) using SQL.  (including data stored in Cassandra!)

The first thing to do is to create a SQLContext from your SparkContext.  I'm using Java so...
(sorry -- I'm still not hip enough for Scala)

        JavaSparkContext context = new JavaSparkContext(conf);
        JavaSQLContext sqlContext = new JavaSQLContext(context);


Now you have a SQLContext, but you have no data.  Go ahead and create an RDD, just like you would in regular Spark:


        JavaPairRDD<Integer, Product> productsRDD = 
            javaFunctions(context).cassandraTable("test_keyspace", "products",
                productReader).keyBy(new Function<Product, Integer>() {
            @Override
            public Integer call(Product product) throws Exception {
                return product.getId();
            }
        });

(The example above comes from the spark-on-cassandra-quickstart project, as described in my previous post.)

Now that we have a plain vanilla RDD,  we need to spice it up with a schema, and let the sqlContext know about it.  We can do that with the following lines:

        JavaSchemaRDD schemaRDD =   sqlContext.applySchema(productsRDD.values(), Product.class);        
        sqlContext.registerRDDAsTable(schemaRDD, "products");   

Shazam.  Now your sqlContext is ready for querying.  Notice that it inferred the schema from the Java bean (Product.class).  (Next blog post, I'll show how to do this dynamically.)

You can prime the pump with:

        System.out.println("Total Records = [" + productsRDD.count() + "]");

The count operation forces Spark to load the data into memory, which makes queries like the following lightning fast:

        JavaSchemaRDD result = sqlContext.sql("SELECT id from products WHERE price < 0.50");
        for (Row row : result.collect()){
            System.out.println(row);
        }

That's it.  You're off to the SQL races.


P.S.  If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:

Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'id, tree:
'Project ['id]
 'Filter ('price < 0.5)
  NoRelation$