Avisi Blog

Tools for building a real time analytics platform

Written by Robin van Breukelen | 01 July 2013

Recently I did a bit of research on tools that are often mentioned in articles about 'big data' and real time analytics. Through this article I hope to provide some insight into how some of those tools might be used together to build a real time analytics platform. The tools I used in this particular case are Storm and Apache Camel.

 

 

When you're interested in real time analytics, the main challenge is (perhaps quite obviously) the real-time part. As the phrase implies, real time analytics means analyzing or acting upon things as they happen. Every part of the chain, from reading the data to processing it and presenting the results, has to keep up before you can fully benefit from real time processing.

 

A good starting point for this is Storm. Storm is a “distributed real time computation system”. As its homepage says, it does for real time processing what Hadoop did for batch processing. The basic concepts of Storm are relatively simple:

  • Spout: a source of streams of tuples. For example, a spout may read tuples from a queue and emit them.
  • Bolt: a consumer of streams. For example, a bolt may read the incoming tuples and check whether a certain threshold has been breached.

 

Spouts and bolts can be wired together. A combination of spouts and bolts is called a topology.

 


Storm topology

Storm is distributed, so a Storm cluster consists of two types of nodes: the master node and the worker nodes. The master node runs a daemon process called 'Nimbus'. Nimbus takes care of distributing code, assigning tasks and monitoring for failures. Each worker node runs a daemon process called the 'Supervisor', which listens for work assigned to it and starts and stops worker processes as required by Nimbus. Coordination between Nimbus and the Supervisors is done through a ZooKeeper cluster. For more information about the fail-over mechanism, check out the Storm wiki: https://github.com/nathanmarz/storm/wiki/

 

 

While Storm can take care of the processing and distribution of the work, at some point you will want to communicate your results to another system, for instance a front-end. In the case of a web application the challenge is to keep it as real-time as possible. A good technology for this is WebSockets: once the connection is open, you can push a stream of messages over it without setting up a new TCP connection or HTTP request for each message. This way you can easily update the user interface as the data comes in.

 

Example

Combining all these tools, I created a small example that submits a Storm topology with one spout and two bolts to a Storm cluster (a local, in-process cluster in this case):

  • Word generator spout: picks random words from a fixed list and emits them.
  • Word counter bolt: for each word emitted by the spout, increments the total count for that word.
  • JMS bolt: whenever the count of a word is updated, writes the new count to ActiveMQ.

 

The implementation of these components is quite straightforward. First, the spout:


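import java.util.Map;
import java.util.Random;

// Storm 0.8.x package names; newer Apache Storm releases use org.apache.storm instead.
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

public class RandomWordFeeder extends BaseRichSpout {

    // Fixed vocabulary the spout picks from.
    private static final String[] WORDS = {
        "accompts", "active", "altiloquent", "bicuspid", "biweekly", "buffo", "chattels", "detached", "gaoler", "heeltap", "milksop",
        "paralyzed", "passado", "reciminate", "repetend", "supertonic", "swashbuckler", "vaporarium", "wenching", "withers"
    };

    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Every emitted tuple has a single field named "word".
        declarer.declare(new Fields("word"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Called repeatedly by Storm; emit one random word per call.
        collector.emit(new Values(WORDS[random.nextInt(WORDS.length)]));
    }
}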

 

Whenever Storm asks the spout for a new tuple (by calling the nextTuple method), the spout selects a word at random and emits it. The word counter bolt:

 

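import java.util.HashMap;
import java.util.Map;

// Storm 0.8.x package names; newer Apache Storm releases use org.apache.storm instead.
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCounterBolt extends BaseBasicBolt {

    // Running total per word, kept in memory by this bolt instance.
    private Map<String, Integer> counts = new HashMap<String, Integer>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        // The spout emits a single field, "word", at index 0.
        String word = tuple.getString(0);

        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);

        // Emit the updated total so that downstream bolts can publish it.
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}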

 

This bolt has a very naive approach to reading its data: it doesn't check if the Tuple is in the correct format or anything, but for testing purposes this will do.
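
A slightly more defensive version of the counting logic could check the value before counting it. The sketch below is plain Java without any Storm dependency, so it can be tried in isolation. The class name SafeWordCounter, the Object parameter that stands in for a tuple field, and the choice to return -1 for rejected values are all assumptions made for illustration; they are not part of the original example.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: counts words and rejects values that are not usable words. */
final class SafeWordCounter {

    private final Map<String, Integer> counts = new HashMap<>();

    /**
     * Increments the count for the given field value.
     * Returns the new count, or -1 if the value is not a non-blank String.
     */
    int count(Object field) {
        if (!(field instanceof String)) {
            return -1; // wrong type, e.g. a number or null
        }
        String word = ((String) field).trim();
        if (word.isEmpty()) {
            return -1; // nothing to count
        }
        Integer current = counts.get(word);
        int updated = (current == null) ? 1 : current + 1;
        counts.put(word, updated);
        return updated;
    }
}
```

Inside the bolt, the same check could be applied to tuple.getValue(0) before emitting, skipping the emit when the value is rejected.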

 

The bolt that handles the JMS part is provided by a contribution library of Storm, called storm-jms, available at: https://github.com/ptgoetz/storm-jms.

 

These components are wired together in a topology and submitted by invoking:

 

TopologyBuilder builder = new TopologyBuilder();

// jmsQueueProvider is a storm-jms JmsProvider for the ActiveMQ queue; its setup is not shown here.
JmsBolt jmsBolt = new JmsBolt();
jmsBolt.setJmsProvider(jmsQueueProvider);
jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {

    @Override
    public Message toMessage(Session session, Tuple input) throws JMSException {
        // Field 0 is the word, field 1 is its count (see WordCounterBolt).
        String json = "{\"word\":\"" + input.getString(0) + "\", \"count\":" + String.valueOf(input.getInteger(1)) + "}";
        return session.createTextMessage(json);
    }
});

// Wire the components: wordGenerator -> counter -> jmsBolt.
builder.setSpout("wordGenerator", new RandomWordFeeder());
builder.setBolt("counter", new WordCounterBolt()).shuffleGrouping("wordGenerator");
builder.setBolt("jmsBolt", jmsBolt).shuffleGrouping("counter");

Config conf = new Config();
conf.setDebug(true);

// LocalCluster runs the topology inside the current JVM, which is convenient for testing.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
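
One thing to keep in mind about the wiring above: shuffleGrouping distributes tuples randomly over the tasks of a bolt, and every task of the word counter bolt keeps its own map of counts. With a single counter task, as in this example, that is fine. If the counter bolt were run with more than one task, the count for a word would be split over several maps. Storm's fields grouping avoids this by always sending tuples with the same field value to the same task. The sketch below models that idea in plain Java; the class name, the method names and the hash-modulo routing are assumptions for illustration and are not Storm's actual implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of fields grouping; not Storm's actual implementation. */
final class FieldsGroupingSketch {

    /** Picks a task index that depends only on the word, so equal words share a task. */
    static int taskFor(String word, int numTasks) {
        return Math.floorMod(word.hashCode(), numTasks);
    }

    /** Routes each word to a task and lets every task keep its own private counts. */
    static List<Map<String, Integer>> countPerTask(List<String> words, int numTasks) {
        List<Map<String, Integer>> perTask = new ArrayList<>();
        for (int i = 0; i < numTasks; i++) {
            perTask.add(new HashMap<>());
        }
        for (String word : words) {
            perTask.get(taskFor(word, numTasks)).merge(word, 1, Integer::sum);
        }
        return perTask;
    }
}
```

In the topology itself this would correspond to wiring the counter bolt with fieldsGrouping("wordGenerator", new Fields("word")) instead of shuffleGrouping("wordGenerator").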

 

To integrate Storm with the front-end (through WebSockets) I chose Apache Camel to do the heavy lifting for me. By having the bolts in the Storm topology write their output to an ActiveMQ queue, I could create a Camel route that subscribes to this queue and pushes the messages to WebSockets, like so:


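import org.apache.camel.builder.RouteBuilder;

public class StreamingRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Forward every message on the queue to all connected WebSocket clients.
        from("activemq:storm.queue")
            .to("websocket://storm?sendToAll=true");
    }
}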

 

Lastly, I created a small web application that uses Highcharts to display a graph. The interesting part of this web application is how it reads data from WebSockets:

var socket = {
    start : function() {
        var location = "ws://localhost:9292/storm";
        this._ws = new WebSocket(location);
        // Bind the handlers so that 'this' refers to the socket object, not to the WebSocket.
        this._ws.onmessage = this._onmessage.bind(this);
        this._ws.onclose = this._onclose.bind(this);
    },

    _onmessage : function(m) {
        if (m.data) {
            newDataReceived(m.data);
        }
    },

    _onclose : function(m) {
        if (this._ws) {
            this._ws.close();
        }
    }
};

 

The function newDataReceived handles updating the graph. The end result is a graph that updates the count for a word whenever data comes in.
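
For the front-end to read each message reliably, the payload has to be valid JSON. The toMessage method shown earlier builds it by plain string concatenation, which works for the fixed word list used here but would produce broken JSON for a word that contains a quote or a backslash. A minimal sketch of an escaping helper, in plain Java, could look like this. The class WordCountJson and its methods are hypothetical and are not part of storm-jms; in a real project a JSON library would be the more usual choice.

```java
/** Hypothetical helper for building the word-count message; not part of storm-jms. */
final class WordCountJson {

    /** Escapes the characters that may not appear unescaped inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters are written as a \\uXXXX escape.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Builds the same payload shape as the example: {"word":"...", "count":N}. */
    static String toJson(String word, int count) {
        return "{\"word\":\"" + escape(word) + "\", \"count\":" + count + "}";
    }
}
```

Within toMessage, the concatenation could then be replaced by a call such as WordCountJson.toJson(input.getString(0), input.getInteger(1)).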

 

 

Hopefully you will be able to use these tools to get started with your own projects. Should you have any questions or need any help, don't hesitate to contact us. We're happy to help!

 
