By Robin van Breukelen / 3 min
Recently I did some research on tools that are often mentioned in articles about 'big data' and real-time analytics. With this article I hope to provide some insight into how some of those tools can be used together to build a real-time analytics platform. The tools I used in this particular case are Storm and Apache Camel.
When you're interested in real-time analytics, the main challenge is (perhaps quite obviously) the real-time part. As the phrase implies, real-time analytics means analyzing or acting upon events as they happen. Every part of the chain, from ingestion to presentation, has to keep up before you can fully benefit from real-time processing.
A good starting point for this is Storm. Storm is a “distributed real time computation system”. As its homepage says, it does for real-time processing what Hadoop did for batch processing. The basic concepts of Storm are relatively simple: a spout is a source of a stream of tuples, and a bolt consumes tuples, processes them, and can emit new tuples of its own.
Spouts and bolts can be wired together. A combination of spouts and bolts is called a topology.
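To make the wiring idea concrete, here is a minimal plain-Java sketch of a spout feeding a chain of bolts. It is a conceptual model only: `MiniTopology` and `run` are hypothetical names, nothing here is part of the Storm API, and a real topology runs distributed and continuously rather than in a loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Conceptual model only: these types are illustrative and not part of Storm.
class MiniTopology {
    // Pulls `rounds` values from the spout and pushes each one through the bolts in order.
    static List<String> run(Supplier<String> spout, List<Function<String, String>> bolts, int rounds) {
        List<String> results = new ArrayList<>();
        for (int i = 0; i < rounds; i++) {
            String value = spout.get();              // spout: the source of the stream
            for (Function<String, String> bolt : bolts) {
                value = bolt.apply(value);           // bolt: processes what it receives
            }
            results.add(value);
        }
        return results;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = new HashMap<>();
        // A counting "bolt" that appends the running count to each word.
        Function<String, String> counter = word -> word + "=" + counts.merge(word, 1, Integer::sum);
        System.out.println(run(() -> "storm", Collections.singletonList(counter), 3));
        // prints [storm=1, storm=2, storm=3]
    }
}
```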
While Storm can take care of the processing and distribution of the work, at some point you will want to communicate your results to another system, for instance a front-end. In the case of a web application the challenge is to keep it as real-time as possible. WebSockets are a good fit for this: once the connection is open, you can write a stream of messages to it without setting up a new TCP connection or sending HTTP headers for each message. This way you can easily update the user interface as the data comes in.
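To put a number on that overhead, the sketch below computes the size of a WebSocket frame header following the base framing rules of RFC 6455 (section 5.2). The class and method names are made up for illustration, and the sample message is only an assumption about what a word-count update might look like.

```java
import java.nio.charset.StandardCharsets;

class WebSocketOverhead {
    // Size in bytes of a WebSocket frame header for a given payload length.
    static int frameHeaderBytes(long payloadLength, boolean masked) {
        int header = 2;            // FIN/opcode byte plus mask-bit/7-bit-length byte
        if (payloadLength > 65535) {
            header += 8;           // 64-bit extended payload length
        } else if (payloadLength > 125) {
            header += 2;           // 16-bit extended payload length
        }
        if (masked) {
            header += 4;           // masking key, required on client-to-server frames
        }
        return header;
    }

    public static void main(String[] args) {
        // Hypothetical server-to-client update; server frames are not masked.
        String json = "{\"word\":\"swashbuckler\", \"count\":42}";
        int payload = json.getBytes(StandardCharsets.UTF_8).length;
        System.out.println(payload + " payload bytes, "
                + frameHeaderBytes(payload, false) + " header bytes");
    }
}
```

For small messages like these, the per-message framing cost is two bytes, compared with the hundreds of bytes of headers that a typical HTTP request and response carry.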
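Combining all these tools, I created a small example that submits a topology to a (local) Storm cluster with one spout and two bolts: a spout that emits random words, a bolt that counts how often each word has been seen, and a bolt that publishes every updated count to a JMS queue.
The implementation of these components is quite straightforward. First, the spout: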
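import java.util.Map;
import java.util.Random;

// Package names as of Storm 1.0; releases before 1.0 use backtype.storm.* instead.
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class RandomWordFeeder extends BaseRichSpout {
    // Fixed vocabulary to pick from; defined once instead of on every call.
    private static final String[] WORDS = {
        "accompts", "active", "altiloquent", "bicuspid", "biweekly", "buffo", "chattels", "detached", "gaoler", "heeltap", "milksop",
        "paralyzed", "passado", "reciminate", "repetend", "supertonic", "swashbuckler", "vaporarium", "wenching", "withers"
    };

    private SpoutOutputCollector collector;
    private Random random;

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector collector) {
        this.collector = collector;
        this.random = new Random();
    }

    @Override
    public void nextTuple() {
        // Emit one randomly chosen word per call.
        collector.emit(new Values(WORDS[random.nextInt(WORDS.length)]));
    }
}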
Whenever Storm asks the spout for a new tuple (by calling the nextTuple method), the spout picks a word at random and emits it. The word counter bolt:
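import java.util.HashMap;
import java.util.Map;

// Package names as of Storm 1.0; releases before 1.0 use backtype.storm.* instead.
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class WordCounterBolt extends BaseBasicBolt {
    // Running count per word, kept in memory by this bolt instance.
    private final Map<String, Integer> counts = new HashMap<>();

    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String word = tuple.getString(0);
        Integer count = counts.get(word);
        if (count == null) {
            count = 0;
        }
        count++;
        counts.put(word, count);
        // Pass the word on together with its updated count.
        collector.emit(new Values(word, count));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word", "count"));
    }
}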
This bolt takes a very naive approach to reading its data: it doesn't check whether the tuple actually contains a string in its first field. For testing purposes this will do.
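If you do want to guard against malformed input, a check along the following lines could run before the counting logic. This is only a sketch under assumptions: `TupleChecks` and `extractWord` are hypothetical names, and a plain `List<Object>` stands in for the tuple's values so the example runs without Storm on the classpath.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper, not part of Storm: a List<Object> stands in for the tuple's values.
class TupleChecks {
    // Returns the word in the first field, or empty if the input is malformed.
    static Optional<String> extractWord(List<Object> values) {
        if (values == null || values.isEmpty()) {
            return Optional.empty();
        }
        Object first = values.get(0);
        if (!(first instanceof String)) {
            return Optional.empty();
        }
        String word = ((String) first).trim();
        return word.isEmpty() ? Optional.empty() : Optional.of(word);
    }

    public static void main(String[] args) {
        System.out.println(extractWord(Arrays.<Object>asList("gaoler"))); // prints Optional[gaoler]
        System.out.println(extractWord(Arrays.<Object>asList(42)));       // prints Optional.empty
    }
}
```

A bolt could then skip or log the tuple when the result is empty instead of failing with a ClassCastException.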
The bolt that handles the JMS part is provided by a contribution library of Storm, called storm-jms, available at: https://github.com/ptgoetz/storm-jms.
These components are wired together in a topology and submitted by invoking:
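TopologyBuilder builder = new TopologyBuilder();

// jmsQueueProvider is a storm-jms JmsProvider for the target queue, assumed to be set up elsewhere.
JmsBolt jmsBolt = new JmsBolt();
jmsBolt.setJmsProvider(jmsQueueProvider);
jmsBolt.setJmsMessageProducer(new JmsMessageProducer() {
    @Override
    public Message toMessage(Session session, Tuple input) throws JMSException {
        // Field 0 is the word, field 1 its count (as declared by WordCounterBolt).
        // Note that the word is not JSON-escaped here.
        String json = "{\"word\":\"" + input.getString(0) + "\", \"count\":" + input.getInteger(1) + "}";
        return session.createTextMessage(json);
    }
});

builder.setSpout("wordGenerator", new RandomWordFeeder());
// shuffleGrouping is fine while the counter runs as a single task; with more tasks,
// fieldsGrouping("wordGenerator", new Fields("word")) keeps each word on the same task.
builder.setBolt("counter", new WordCounterBolt()).shuffleGrouping("wordGenerator");
builder.setBolt("jmsBolt", jmsBolt).shuffleGrouping("counter");

Config conf = new Config();
conf.setDebug(true);

// LocalCluster runs the topology inside the current JVM, which is convenient for testing.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());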
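The message producer above builds its JSON by string concatenation, which works for the fixed word list but would produce invalid JSON for a word containing a quote or a backslash. Below is a minimal sketch of an escaping helper, assuming you want to stay dependency-free; `WordCountJson`, `escape` and `toJson` are hypothetical names, and in a real project a JSON library would usually be the better choice.

```java
// Hypothetical helper for building the word-count message safely.
class WordCountJson {
    // Escapes the characters that may not appear raw inside a JSON string.
    static String escape(String s) {
        StringBuilder out = new StringBuilder();
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  out.append("\\\""); break;
                case '\\': out.append("\\\\"); break;
                case '\n': out.append("\\n"); break;
                case '\r': out.append("\\r"); break;
                case '\t': out.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters become six-character hex escapes.
                        out.append(String.format("\\u%04x", (int) c));
                    } else {
                        out.append(c);
                    }
            }
        }
        return out.toString();
    }

    // Builds the message body with the same two fields as the topology code.
    static String toJson(String word, int count) {
        return "{\"word\":\"" + escape(word) + "\",\"count\":" + count + "}";
    }

    public static void main(String[] args) {
        System.out.println(toJson("gaoler", 1)); // prints {"word":"gaoler","count":1}
    }
}
```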
To integrate Storm with the front-end (through WebSockets) I chose Apache Camel to do the heavy lifting for me. By having the bolts in the Storm topology write their output to an ActiveMQ queue, I could create a Camel route that subscribes to this queue and pushes the messages to WebSockets, like so:
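import org.apache.camel.builder.RouteBuilder;

public class StreamingRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        // Consume from the ActiveMQ queue and broadcast each message to all connected WebSocket clients.
        from("activemq:storm.queue")
            .to("websocket://storm?sendToAll=true");
    }
}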
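What the route amounts to is a fan-out: every message taken from the queue is delivered to every connected client. The plain-Java sketch below models just that behaviour. It is a conceptual illustration with made-up names (`BroadcastHub`, `connect`, `sendToAll`); in the real setup Camel and its WebSocket component do this work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Conceptual model only: not Camel code, just the fan-out idea behind sendToAll=true.
class BroadcastHub {
    // Copy-on-write list so clients can connect while a broadcast is in progress.
    private final List<Consumer<String>> clients = new CopyOnWriteArrayList<>();

    void connect(Consumer<String> client) {
        clients.add(client);
    }

    // Delivers one queue message to every connected client; returns the number of deliveries.
    int sendToAll(String message) {
        int delivered = 0;
        for (Consumer<String> client : clients) {
            client.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        BroadcastHub hub = new BroadcastHub();
        List<String> browserA = new ArrayList<>();
        List<String> browserB = new ArrayList<>();
        hub.connect(browserA::add);
        hub.connect(browserB::add);
        hub.sendToAll("{\"word\":\"buffo\", \"count\":7}");
        System.out.println(browserA.equals(browserB)); // prints true
    }
}
```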
Lastly, I created a small web application that uses Highcharts to display a graph. The interesting part of this web application is how it reads data from the WebSocket:
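var socket = {
    start: function() {
        // 9292 is the default port of the Camel websocket component; "storm" matches the route's endpoint.
        var location = "ws://localhost:9292/storm";
        this._ws = new WebSocket(location);
        // Bind the handlers so that "this" refers to the socket object, not the WebSocket.
        this._ws.onmessage = this._onmessage.bind(this);
        this._ws.onclose = this._onclose.bind(this);
    },
    _onmessage: function(m) {
        if (m.data) {
            newDataReceived(m.data);
        }
    },
    _onclose: function(m) {
        // The connection is already closed at this point, so just drop the reference.
        this._ws = null;
    }
};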
The function newDataReceived handles updating the graph. The end result is a graph that updates the count for a word whenever data comes in.
Hopefully these tools will help you get started with your own projects. Should you have any questions or need any help, don't hesitate to contact us; we're happy to help!
By Robin van Breukelen / Oct 2024