Using EsperBolt and Storm to calculate Bollinger Bands

I was trying to hook up Storm and Esper to see how well they work together for calculating Bollinger Bands for stock symbols. I'm fairly new to all of this, but I thought it'd be interesting to combine them in some way to get a working Storm topology that does this. I won't go into the full details of the application I developed; I'll just outline how I managed to get it wired up and spitting out something useful.

Apache Storm is a framework for distributed processing of a pipeline of computations. It takes one or more data sources (called Spouts), runs their data through processing components (called Bolts), and can parallelize and distribute those components across a cluster.

This lets you focus on your data source and processing logic rather than worrying about how to scale the processing up or out.

Esper, on the other hand, is a Complex Event Processing (CEP) engine that lets you run queries and computations directly on streaming data. For example, you can check for trends within a certain time period or window, or compute aggregations such as moving averages and standard deviations, regardless of whether the data is historical or real-time.
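To make the windowed aggregation idea concrete: a length window such as the win:length(20) used in the topology in this post keeps only the most recent N events, and aggregates like avg() and stddev() are computed over whatever is currently in the window. The following plain-Java class is a hypothetical illustration of that idea, not how Esper works internally. It uses the sample standard deviation (n - 1 denominator), which may differ from the variant a given engine or charting tool uses.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/**
 * Hypothetical illustration of a length window like Esper's win:length(N):
 * it keeps the last N prices and reports their mean and sample standard
 * deviation. Not Esper code.
 */
public class SlidingWindowStats {
    private final int size;
    private final Deque<Double> window = new ArrayDeque<>();

    public SlidingWindowStats(int size) {
        this.size = size;
    }

    /** Adds a price, evicting the oldest one once the window is full. */
    public void add(double price) {
        if (window.size() == size) {
            window.removeFirst();
        }
        window.addLast(price);
    }

    /** Mirrors "having count(*) >= N": only meaningful once the window is full. */
    public boolean isFull() {
        return window.size() == size;
    }

    /** Simple moving average of the prices in the window. */
    public double mean() {
        double sum = 0;
        for (double p : window) {
            sum += p;
        }
        return sum / window.size();
    }

    /** Sample standard deviation (n - 1 denominator); needs at least two prices. */
    public double stdDev() {
        double m = mean();
        double squares = 0;
        for (double p : window) {
            squares += (p - m) * (p - m);
        }
        return Math.sqrt(squares / (window.size() - 1));
    }

    public static void main(String[] args) {
        SlidingWindowStats stats = new SlidingWindowStats(3);
        for (double p : new double[] {10, 11, 12, 13}) {
            stats.add(p);
        }
        // the window now holds 11, 12, 13 (10 was evicted)
        System.out.println(stats.mean());   // 12.0
        System.out.println(stats.stdDev()); // 1.0
    }
}
```

In the topology, std:groupwin(tickData.symbol) effectively gives each symbol its own window of this kind.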

My original idea was to write lots of small bolts to preprocess the data and then stream to an Esper based bolt to do the final CEP calculations (in this case, calculating the Bollinger Band).

However, I came across https://github.com/tomdz/storm-esper and found that it already did a lot more than I could hope to achieve myself in the short time I had while digesting all the bits and pieces. So thank you, tomdz.

Next came figuring out what exactly needs to be passed from a Spout (the data source) to Esper. It turns out, not much: you get your stock tick data from somewhere, encapsulate it in a POJO (say TickData) and that's it. The wiring looks like this:

    // Fragment from the topology's main method. SYMBOLS_SPOUT_ID, TOPOLOGY_NAME
    // and config are defined elsewhere in the application.
    TopologyBuilder builder = new TopologyBuilder();
    TickDataSpout spout = new TickDataSpout();

    builder.setSpout(SYMBOLS_SPOUT_ID, spout);

    EsperBolt esperBolt = new EsperBolt.Builder()
        .inputs()
            // each spout tuple carries a TickData POJO in its "tickData" field,
            // which Esper sees as an event of type "Stock"
            .aliasComponent(SYMBOLS_SPOUT_ID)
            .withFields("tickData")
            .ofType(TickData.class)
            .toEventType("Stock")
        .outputs()
            // the intermediate aggregates are read from the Aggregation events
            .onStream("aggregationStream").fromEventType("Aggregation")
            .emit("pcount", "symbol", "simpleMovingAverage", "standardDeviation", "price", "timestamp", "open")
        .outputs()
            .onStream("outputStream").fromEventType("BollingerResults")
            .emit("symbol", "upperBand", "middleBand", "lowerBand", "price", "bandwidth", "percentB", "timestamp", "open")
        .statements()
            // 20-tick moving average and standard deviation, per symbol
            .add("insert into Aggregation " +
                 "select prevcount(tickData.symbol) as pcount, tickData.symbol as symbol, " +
                 "avg(tickData.price) as simpleMovingAverage, stddev(tickData.price) as standardDeviation, " +
                 "last(tickData.price) as price, last(tickData.timestamp) as timestamp, tickData.open as open " +
                 "from Stock.std:groupwin(tickData.symbol).win:length(20) " +
                 "group by tickData.symbol having count(*) >= 20")
            // Bollinger Bands at +/- 2 standard deviations around the moving average
            .add("insert into BollingerResults select open, " +
                 "symbol, " +
                 "simpleMovingAverage + 2*standardDeviation as upperBand, " +
                 "simpleMovingAverage as middleBand, " +
                 "simpleMovingAverage - 2*standardDeviation as lowerBand, " +
                 "price, " +
                 "4*standardDeviation/simpleMovingAverage as bandwidth, " +
                 "(price - (simpleMovingAverage - (2 * standardDeviation))) / ((simpleMovingAverage + " +
                 "(2 * standardDeviation)) - (simpleMovingAverage - (2 * standardDeviation))) as percentB, " +
                 "timestamp as timestamp from Aggregation")
        .build();

    // the grouping key must be a declared output field of the spout, so this
    // assumes TickDataSpout declares a "symbol" field next to "tickData"
    builder.setBolt("esper-bolt", esperBolt).fieldsGrouping(SYMBOLS_SPOUT_ID, new Fields("symbol"));
    // pass to another bolt just to print out what came out of the Esper bolt
    builder.setBolt("print-esper-bolt", new PrintEsperOutputBolt()).fieldsGrouping("esper-bolt", "outputStream", new Fields("symbol"));

    // run locally for testing
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
    try {
        Thread.sleep(600 * 1000); // let the topology run for 10 minutes
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
    }
    cluster.killTopology(TOPOLOGY_NAME);
    cluster.shutdown();

As we can see, we need a TickDataSpout, which is wired to the EsperBolt. The EsperBolt builder has several parts. In the inputs section, we tell it that we are going to pass a field called tickData of type TickData and ask Esper to treat it as an event of type Stock. aliasComponent is given the spout id of TickDataSpout, which connects the spout's tuples to the EsperBolt.
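For reference, here is a minimal sketch of what the TickData POJO might look like. The property names (symbol, price, timestamp, open) are the ones the EPL statements reference as tickData.symbol and so on; the Java types and the meaning of each field are assumptions. Esper resolves those nested properties through JavaBean-style getters, and implementing Serializable is an assumed convenience for passing the object through Storm tuples.

```java
import java.io.Serializable;

/**
 * Hypothetical TickData POJO. Property names match the EPL in this post;
 * the field types and their meanings are assumptions.
 */
public class TickData implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String symbol;   // stock symbol, e.g. "ACME"
    private final double price;    // price of this tick
    private final long timestamp;  // tick time, assumed to be epoch milliseconds
    private final double open;     // opening price (assumed meaning)

    public TickData(String symbol, double price, long timestamp, double open) {
        this.symbol = symbol;
        this.price = price;
        this.timestamp = timestamp;
        this.open = open;
    }

    // JavaBean-style getters: Esper maps tickData.price to getPrice(), and so on
    public String getSymbol() { return symbol; }
    public double getPrice() { return price; }
    public long getTimestamp() { return timestamp; }
    public double getOpen() { return open; }

    @Override
    public String toString() {
        return "TickData{symbol=" + symbol + ", price=" + price
            + ", timestamp=" + timestamp + ", open=" + open + "}";
    }
}
```

Inside TickDataSpout, a tick could then be emitted with something like collector.emit(new Values(tick.getSymbol(), tick)) after declaring the output fields "symbol" and "tickData"; that spout layout is an assumption, chosen so the fieldsGrouping on "symbol" has a field to group on.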

The next part, after the first outputs() call, says we're going to emit events with the fields “pcount”, “symbol”, “simpleMovingAverage”, “standardDeviation”, “price”, “timestamp” and “open” on a stream called aggregationStream. Where do these come from? The first add() call after statements() contains an EPL (Event Processing Language) statement that shows how they are derived: it keeps a window of the last 20 ticks per symbol and, once 20 ticks are available, inserts the aggregates into an event type called Aggregation to pass them to the next stage. The second outputs() section says we are going to emit “symbol”, “upperBand”, “middleBand”, “lowerBand”, “price”, “bandwidth”, “percentB”, “timestamp” and “open” on outputStream. These correspond to the second statement, which calculates the Bollinger Bands from Aggregation and inserts them into another event type called BollingerResults.
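The arithmetic in the second EPL statement can be checked outside Esper. The following is a hypothetical plain-Java mirror of it (class and method names are made up for illustration): the bands sit 2 standard deviations either side of the moving average, bandwidth is (upper - lower) / middle, which simplifies to 4 * standardDeviation / simpleMovingAverage, and percentB is (price - lower) / (upper - lower).

```java
/**
 * Hypothetical plain-Java mirror of the BollingerResults EPL statement:
 * turns one Aggregation row into Bollinger Band values.
 */
public class BollingerSketch {

    /** Holder mirroring the BollingerResults fields. */
    public static final class Bands {
        public final double upper;
        public final double middle;
        public final double lower;
        public final double bandwidth;
        public final double percentB;

        Bands(double upper, double middle, double lower, double bandwidth, double percentB) {
            this.upper = upper;
            this.middle = middle;
            this.lower = lower;
            this.bandwidth = bandwidth;
            this.percentB = percentB;
        }
    }

    /** Assumes stdDev > 0 and sma != 0; otherwise the ratios are not finite. */
    public static Bands compute(double sma, double stdDev, double price) {
        double upper = sma + 2 * stdDev;
        double lower = sma - 2 * stdDev;
        // (upper - lower) / middle, written as in the EPL
        double bandwidth = 4 * stdDev / sma;
        // 0 at the lower band, 1 at the upper band
        double percentB = (price - lower) / (upper - lower);
        return new Bands(upper, sma, lower, bandwidth, percentB);
    }

    public static void main(String[] args) {
        // worked example: moving average 100, standard deviation 2.5, last price 103
        Bands b = compute(100.0, 2.5, 103.0);
        // expected values: upper 105, middle 100, lower 95, bandwidth 0.1, percentB 0.8
        System.out.println("upper=" + b.upper + " middle=" + b.middle + " lower=" + b.lower
            + " bandwidth=" + b.bandwidth + " percentB=" + b.percentB);
    }
}
```

A percentB above 1 means the price has closed above the upper band, and below 0 means it is under the lower band.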

After this point, I don’t need Esper and can read off the Bollinger Band items individually in PrintEsperOutputBolt by their field names and take an appropriate action.
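Both setBolt calls in the wiring use fieldsGrouping on "symbol". That matters once the bolts run with more than one task (for example with a parallelism hint): each EsperBolt instance only sees the ticks delivered to it, so all ticks for a symbol have to reach the same instance for its 20-tick window to be complete. Storm's fields grouping guarantees that tuples with the same grouping value go to the same task. The class below is a conceptual, hypothetical illustration of that idea using a hash of the symbol; it is not Storm's implementation, whose exact hashing may differ.

```java
/**
 * Conceptual sketch (not Storm code) of a fields grouping: the grouping
 * value is hashed onto one of the consumer's tasks, so a given symbol is
 * always routed to the same bolt instance.
 */
public class FieldsGroupingSketch {

    /** Maps a symbol to a task index in [0, numTasks). */
    static int chooseTask(String symbol, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative
        return Math.floorMod(symbol.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 4; // hypothetical number of EsperBolt tasks
        String[] ticks = {"AAPL", "MSFT", "AAPL", "GOOG", "MSFT", "AAPL"};
        for (String symbol : ticks) {
            // repeated symbols always print the same task index
            System.out.println(symbol + " -> task " + chooseTask(symbol, numTasks));
        }
    }
}
```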

I have to say my first pass at this was rocky: I got so many errors that I didn't realize I could run both EPL statements in a single EsperBolt. In that first pass I had two EsperBolts, where the results of the first had to be explicitly encapsulated in a POJO to stream into the second, and my second EPL statement needlessly prefixed the fields it used with Aggregation. It also required pulling every emitted value out of the first bolt's tuple and casting it to the appropriate type before passing it on to the second bolt. That was wasteful boilerplate which this final approach doesn't need.

Next steps: I'd like to try doing this in Clojure so that there's a lot less ceremony in setting up the code for the spouts and bolts.

– Sarwar Bhuiyan
