Calling Elasticsearch APIs using JAX-RS Client (Jersey Client), Jackson

Motivation:

There are plenty of full-featured client libraries for Elasticsearch out there which provide various facilities to either hide the complexities of the REST API or add some structure and helper methods so you can avoid dealing with big JSON strings. Among the Java-based ones, there is the official Java Elasticsearch Client API, which makes use of the Node or Transport clients, as well as the JEST library, which uses HTTP under the hood and has an API similar to the Elasticsearch Java Client APIs. Although these are great and offer various advantages, there may, in some cases, be pitfalls such as:

  • Having to bundle the elasticsearch jar and its many dependencies (Lucene, Guice, etc.) and having to deal with the mess of classpath problems if there are common libraries that the client application itself also uses
  • Errors and exceptions due to serialization or API changes if the elasticsearch jar version does not match the Elasticsearch server version (there have been issues even when the major version matches)
  • Having to redeploy application code even if the application is not using any changed Elasticsearch features/APIs in an upgraded server
  • Having to context switch between the REST API in the documentation and the client library specific APIs to achieve the same thing

The main issue is that the “official” Java library is the same jar as the Elasticsearch server jar. The other client libraries all use the HTTP APIs under the hood, and while they may sacrifice some features such as cluster knowledge or sniffing out data nodes, they remain decoupled from the Elasticsearch server version, whereas the main Java jar does not. This might sound irrelevant, but it is a subtle and crucial difference which has an impact on deployments (and re-deployments) just as much as on ease of development.

Another viable approach is simply to develop client applications directly against the REST APIs. Some might dread dealing with all the boilerplate and JSON processing code in Java while still wanting to use types for entities and responses, but that is a different problem to solve, and there are some pretty good solutions for it.

JAX-RS is a very common Java API for accessing REST/HTTP based services and has many available implementations (Jersey being one of them) which provide a lot of different options for serialization/deserialization as well as things like SSL, connection pooling, etc. In a polyglot or “microservices” environment, you may already be using it to access other HTTP based services you developed yourself or other databases. Even if you don’t, the library is very easy to use while at the same time being very flexible for your style of programming. For the rest of this article, I will show how to set up and use JAX-RS (with the Jersey implementation) and Jackson (the JSON serialization/deserialization framework) with Elasticsearch.

Dependencies:

I’m only interested in the Jersey Client features (Jersey can also be used to build REST server applications, but we don’t need that here) and the Jackson based serialization/deserialization. So the dependencies are:

<dependency>
    <groupId>org.glassfish.jersey.core</groupId>
    <artifactId>jersey-client</artifactId>
    <version>2.22.1</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.jaxrs</groupId>
    <artifactId>jackson-jaxrs-json-provider</artifactId>
    <version>2.5.4</version>
</dependency>

Code:

Let’s get started with some code. Most people hit the root endpoint http://localhost:9200/ to check that the server is running, so we can start with that too in lieu of a “hello world!”.

import javax.ws.rs.client.Client;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

public class Main {

    public static void main(String[] args) {
        // 1. Set up the JAX-RS Client and register the JacksonJsonProvider so that we can
        // marshall/unmarshall POJO <-> JSON
        Client client = ClientBuilder.newClient().register(JacksonJsonProvider.class);

        // 2. Create a reference to an endpoint (we could also do all the calls using the fluent API)
        WebTarget rootTarget = client.target("http://localhost:9200");

        // 3. Build and call the default endpoint
        Response response = rootTarget
                .request() // Build request
                .get();    // Call GET method

        System.out.println("Default endpoint");
        System.out.println("Response code: " + response.getStatus());
        System.out.println("Response: \n" + response.readEntity(String.class));
    }
}

The above code is quite simple to understand but it is doing quite a lot under the hood:

1. The first step is to create a Client object, which encapsulates all the HTTP calls using a Connector provider implementation. The default provider uses HttpURLConnection, but there are other providers which use other classes/libraries under the hood, such as the Apache HTTP Client. These Connector implementations have many configuration settings which can be tuned, such as Keep-Alive behaviour, connection pooling, the maximum number of concurrent connections, and so on (see the sketch after this list).

2. A JacksonJsonProvider is registered with the client. We will use this later to make use of POJOs in place of JSON Strings in our API calls and responses.

3. We create a target and make a call to it. Although we can just use the Client fluent API to do it all at once, it’s sometimes useful to break it up to get references to the specific endpoints using WebTarget.

4. The returned response object from the GET call is then queried for the HTTP status code, and the body is converted to a String for inspection.
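As a rough illustration of those connector settings (a minimal sketch, not something the rest of this post depends on), connect and read timeouts can be set on the default connector via Jersey’s ClientProperties; the property names come from org.glassfish.jersey.client.ClientProperties:

    import javax.ws.rs.client.Client;
    import javax.ws.rs.client.ClientBuilder;

    import org.glassfish.jersey.client.ClientConfig;
    import org.glassfish.jersey.client.ClientProperties;

    import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;

    public class ConfiguredClientExample {

        public static Client buildClient() {
            // Tune the underlying connector; the values here are arbitrary examples
            ClientConfig config = new ClientConfig();
            config.property(ClientProperties.CONNECT_TIMEOUT, 2000); // ms
            config.property(ClientProperties.READ_TIMEOUT, 5000);    // ms

            return ClientBuilder.newClient(config).register(JacksonJsonProvider.class);
        }
    }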

Ok, so that’s good for warm-up but we want to do some more things.

Let’s say we want to create an index called “articles” using the POST API.

     // let's create an index called articles
     Response createIndexResponse = rootTarget.path("articles")
             .request(MediaType.APPLICATION_JSON) // don't really need to do this, but showing as an example
             .post(null);

    System.out.println("Creating articles index");
    System.out.println("Response code: " + createIndexResponse.getStatus());

So what did we do here? Similar to the first bit of code, we created another target from the root endpoint (now calling /articles) and sent an empty body to it using the POST method. We could have used PUT with a proper settings body as well (see the sketch below).
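As an aside, and assuming the same rootTarget as above, index creation via PUT with a minimal settings body could look like this (the settings values are just an illustration, and the call would fail if the index already exists):

    // Hypothetical alternative: create the index with PUT and a small settings body
    Response createIndexWithPut = rootTarget.path("articles")
            .request()
            .put(Entity.json("{\"settings\":{\"number_of_shards\":1}}"));

    System.out.println("Response code: " + createIndexWithPut.getStatus());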

So how about creating an article? We can concoct a simple JSON string and POST that to the /articles/article endpoint (this creates a document of type “article” in the “articles” index).

    String newArticleJSONString = "{ \"title\":\"New Article\","
            + "\"body\":\"Lorem Ipsum\"," + "\"tags\":[\"some-tag\"]}";

    // let's create an article
    Response createArticleResponse1 = rootTarget.path("articles/article")
            .request(MediaType.APPLICATION_JSON)
            .post(Entity.json(newArticleJSONString));
    
    System.out.println("Creating new article using JSON String");
    System.out.println("Response code: "+ createArticleResponse1.getStatus());
    System.out.println("Response :" + createArticleResponse1.readEntity(String.class));

It works, but the string construction is ugly and error-prone (it took me a few tries to get the escaped double quotes right, for example).

Can we do one better? Suppose we had an Article POJO class that we wanted to use instead?

import java.util.ArrayList;
import java.util.List;

public class Article {

	private String title;
	private String body;
	private List<String> tags = new ArrayList<String>();
	
	public Article() {}
	
	public Article(String title, String body) {
		this.title = title;
		this.body = body;
	}
	
	public void addTag(String tag) {
		tags.add(tag);
	}
	
	public String getTitle() {
		return title;
	}
	public void setTitle(String title) {
		this.title = title;
	}
	public String getBody() {
		return body;
	}
	public void setBody(String body) {
		this.body = body;
	}
	public List<String> getTags() {
		return tags;
	}
	public void setTags(List<String> tags) {
		this.tags = tags;
	}

}

Now, we can do the previous create API call like this:

    Article newArticle = new Article("New Article", "Lorem Ipsum");
    newArticle.addTag("some-tag");

    // let's create an article
    Response createArticleResponse = rootTarget.path("articles/article")
            .request()
            .post(Entity.json(newArticle));

   System.out.println("Creating new article");
   System.out.println("Response code: "+ createArticleResponse.getStatus());
   System.out.println("Response :"+createArticleResponse.readEntity(String.class));

Ok, so that looks a lot cleaner. How did that work? Remember the JacksonJsonProvider class we registered? That’s coming into play here, converting the Article object to its JSON equivalent and POSTing it in the body of the HTTP call.

Next, can we do something about the response object? It’s got a lot of generic methods to query the status and headers and to read the body as a String. How do we read what comes back from Elasticsearch without explicitly parsing the String?

Since we’re using Jackson, we can create a POJO like the following, which represents the bits and pieces of the response when we create an article:

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;

@JsonIgnoreProperties(ignoreUnknown=true)
public class ESDocumentResponse {

	@JsonProperty("_id")
	private String id;
	
	@JsonProperty("_type")
	private String type;
	
	@JsonProperty("_version")
	private String version;
	
	@JsonProperty("_index")
	private String index;
	
	@JsonProperty("created")
	private boolean isCreated;

	@JsonProperty("_source")
	private JsonNode source;
	
	
	public JsonNode getSource() {
		return source;
	}

	public String getId() {
		return id;
	}

	public String getType() {
		return type;
	}

	public String getVersion() {
		return version;
	}

	public String getIndex() {
		return index;
	}

	public boolean isCreated() {
		return isCreated;
	}	
}

So this is making use of Jackson annotations to say we’d like to convert a JSON string which looks like

{
 "_index":"articles", 
 "_type":"article", 
 "_id":"AVHvLWl2ouJtLoXzZ5I3", 
 "_version":1, 
 "_shards":{"total":2,"successful":1,"failed":0}, 
 "created":true
}

into an ESDocumentResponse object. The “_id”, “_type”, “_version”, “_index”, “created” attributes at the top level of the JSON String are mapped directly. The “_source” attribute we will look at later. Anything else (e.g. “_shards”) we are ignoring for now.

So how do we get the response converted to that object? It doesn’t take much:

     ESDocumentResponse createArticleResponse2 = rootTarget
             .path("articles/article").request()
             .post(Entity.json(newArticle), ESDocumentResponse.class);

     System.out.println("Creating new article");
     System.out.println("Created : " + createArticleResponse2.isCreated());
     System.out.println("Article ID: " + createArticleResponse2.getId());

What if we wanted to GET the object we just created? We can make use of the same class:

     // let's get the article we just created and marshall to
     // ESDocumentResponse
     ESDocumentResponse getArticleResponse = rootTarget
             .path("articles/article").path(createArticleResponse2.getId())
             .request().get(ESDocumentResponse.class);

     System.out.println("Getting the article we just created");
     System.out.println("ID: " + getArticleResponse.getId());
     System.out.println("SOURCE TITLE: " + getArticleResponse.getSource().get("title"));

Here the only difference is calling .get(ESDocumentResponse.class). The “_source” field, which contains the full document, has been mapped to a JsonNode, so we can get each individual field using the .get(fieldName) call. We could just as easily have mapped it to the Article class, but that wouldn’t make ESDocumentResponse very generic. It’s quite easy to convert the JsonNode to an Article using Jackson anyway (see the sketch below).
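As a quick illustration of that last point (a sketch, not part of the original code), Jackson’s ObjectMapper can map the _source JsonNode back onto the Article POJO:

    // requires: import com.fasterxml.jackson.databind.ObjectMapper;
    ObjectMapper mapper = new ObjectMapper();
    // convert the "_source" JsonNode from the GET response into an Article instance
    Article fetchedArticle = mapper.convertValue(getArticleResponse.getSource(), Article.class);
    System.out.println("Fetched article title: " + fetchedArticle.getTitle());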

So there we have it. Very simple usage of JAX-RS (with the Jersey implementation) to interact with Elasticsearch. I ran the above queries against Elasticsearch master (3.0) but am pretty sure this would work against any Elasticsearch version. I didn’t have any dependencies on the elasticsearch jar and its dependencies. Also because I only made use of some CRUD operations, I only needed to write code for those bits so even though there were API changes in the Elasticsearch Java Client API, this code was not affected.

In a future post, we could get into:

  • Bulk APIs
  • Search API
  • Aggregations
  • Connection Keep Alive
  • Load balancing across multiple data nodes

Enjoy!

– Sarwar Bhuiyan


Using JAX-RS and Jersey to write RESTful services in OSGI (Apache Felix, Adobe CQ5/AEM)

Although using SlingSafeMethodsServlet or SlingAllMethodsServlet might work for exposing some JSON based HTTP services, it quickly runs into limits when you want to do the equivalent of JAX-RS. These limits mainly have to do with writing resources whose URLs are specified according to patterns, using sub-resources, and not having to manually marshal/unmarshal request payloads and responses between JSON/XML and POJOs. JAX-RS is a lot more feature rich and lets you program at a higher level.

However, how do you get the JAX-RS resources you write to be automatically picked up and start serving requests? In Sling, the SlingMainServlet is the main servlet serving requests, but we are still able to use the Felix Http Service implementation to serve requests outside of Sling.

The wonderful people at https://github.com/hstaudacher/osgi-jax-rs-connector have written a set of bundles which do the work of publishing your JAX-RS resources and providers. You don’t get all the full-fledged features you would get by installing this in a servlet container, but you get enough to get up and running.

The first steps are installing these bundles:

1. com.eclipsesource.jaxrs : jersey-all : 2.10.1
2. com.eclipsesource.jaxrs : publisher : 4.1
3. com.eclipsesource.jaxrs : multipart : 2.0-SR1 (if you’re planning to upload files for consumption by the REST service)
4. com.fasterxml.jackson.jaxrs : jackson-jaxrs-base : 2.4.3 (for marshalling to JSON using jackson)
5. com.fasterxml.jackson.core : jackson-core : 2.4.3
6. com.fasterxml.jackson.core : jackson-databind : 2.4.3
7. com.fasterxml.jackson.core : jackson-annotations : 2.4.3
8. com.fasterxml.jackson.module : jackson-module-jaxb-annotations : 2.4.3
9. com.fasterxml.jackson.jaxrs : jackson-jaxrs-json-provider : 2.4.3
10. com.eclipsesource.jaxrs : provider-security : 2.0-SR1
11. com.fasterxml : classmate : 1.1.0
12. org.jboss.logging : jboss-logging : 3.1.4.GA

Check that all bundles start.

Once you’ve done this, you can write a JAX-RS Resource as follows:

@Service
@Component
@Path("/my-service")
@Produces({ MediaType.APPLICATION_JSON })
public class MyServiceImpl extends MyService {

    @GET
    @Override
    public EntityPOJO getEntity() {
        ....
    }

}

Make sure that it is registered as a service, or the Jersey container will not pick it up. If it is successful, your service will be available at


/services/my-service (default prefix is /services)

Now, suppose you want to secure these services using a security provider. You need to write your own security provider along the lines of the example below.

You can easily write one which uses a CQ5 ResourceResolverFactory to validate a cookie or auth token and return a user principal and user roles, which can then be used to secure the JAX-RS resource methods. Here is an example:

package test;

@Service
@Component
public class TokenBasedSecurityHandler implements AuthenticationHandler, AuthorizationHandler {

	@Reference
	private ResourceResolverFactory resourceResolverFactory;

	@Override
	public boolean isUserInRole(Principal principal, String role) {
		// TODO inspect our Principal implementation and check whether the role matches a group in CQ
		return true;
	}

	@Override
	public Principal authenticate(ContainerRequestContext requestContext) {
		Cookie loginTokenCookie = requestContext.getCookies().get("login-token");
		if (loginTokenCookie != null) {
			String loginTokenCookieStr = loginTokenCookie.getValue();
			TokenCookie tokenCookie = TokenCookie.fromString(loginTokenCookieStr);
			if (tokenCookie != null && !tokenCookie.getInfos().isEmpty()) {
				// take the first token info; you can iterate through all of them depending on your needs
				TokenCookie.Info tokenInfo = tokenCookie.getInfos().values().iterator().next();
				TokenCredentials tokenCredentials = new TokenCredentials(tokenInfo.token);
				AuthenticationInfo authInfo = new AuthenticationInfo("TOKEN");
				authInfo.put("user.jcr.credentials", tokenCredentials);
				ResourceResolver resourceResolver = null;
				Principal principalUser = null;
				try {
					resourceResolver = resourceResolverFactory.getResourceResolver(authInfo);
					if (resourceResolver != null) {
						User user = resourceResolver.adaptTo(User.class); // org.apache.jackrabbit.api.security.user.User
						Iterator groupIterator = user.memberOf();
						// TODO write an implementation of Principal which captures the user and its groups
					}
					return principalUser;
				} catch (Exception e) {
					// TODO handle/log the failed login
				} finally {
					if (resourceResolver != null) {
						resourceResolver.close();
					}
				}
			}
		}
		return null;
	}

}

The code above is not complete, but it’s mostly there. Once you have this implementation, you can use the following annotation on a resource method to secure it.

@RolesAllowed({ "group-name"})
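Placed on the resource from earlier, that could look like the following sketch (MyServiceImpl’s getEntity and the “group-name” role are just the placeholder names used in this post):

    @GET
    @RolesAllowed({ "group-name" })
    @Override
    public EntityPOJO getEntity() {
        // only reached if isUserInRole(principal, "group-name") returns true for the authenticated caller
        return new EntityPOJO();
    }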

Happy hacking.

— Sarwar Bhuiyan


Using JPA to write database applications in an OSGi Container (e.g. Felix or Adobe AEM, CQ5)

Many projects require access to databases, and although you’re free to do it any way you like, many prefer JPA (the Java Persistence API) for its ease of use as a pluggable Object Relational Mapping (ORM) API. In a normal servlet or JEE container this might be taken for granted, with implementations such as Hibernate or EclipseLink already there for the developer to use without having to worry about any further setup or container gotchas. For those running in a container such as Apache Felix (e.g. users of Adobe CQ5/AEM) or another OSGi container, there are some steps to take before one can do this. The following is my experience doing this with EclipseLink and Gemini JPA.

So let’s say we’d like to do our JPA development like this:

1. Have a separate bundle for the Entity classes which includes the persistence unit.

2. Have bundles for OSGi services we write to be able to get access to an EntityManager instance whenever they need to

3. We do not want to put in any hardcoded database settings in the persistence.xml. Ideally we’d like the database settings to be managed by the data source pool.

For those on AEM, the data source pool is a service in the OSGi console (/system/console/configMgr) where you can set up a pool with JDBC connection settings (it’s called the Day Commons JDBC Connection Pool). Provided you have already uploaded the driver as an OSGi bundle, you can put settings here such as the JDBC driver, db string, etc. One thing that isn’t documented (or at least I couldn’t find it) is that if you want that data source to be registered as a JNDI resource, you need to give the data source a name containing at least a “:”, such as

java:comp/env/myDS

If you just name it myDS, it is not going to be available as a JNDI resource, and that is required later on. If you’re not using AEM and need to do this in Felix, you can look into setting up a javax.sql.DataSource and registering it with a JNDI bundle yourself.

Now, once you’ve got this data source pool registered, the second step is to get all the dependent libraries for JPA and the implementation you choose. I had a requirement to use EclipseLink. The bundles I installed that worked (this was tricky) were:

1. org.eclipse.gemini.jpa_1.2.0.M1.jar (not available via maven so I had to install this manually)

2. org.osgi.enterprise 5.0.0

3. org.eclipse.persistence : javax.persistence 2.1.0

4. org.eclipse.persistence : org.eclipse.persistence.jpa 2.5.2

5. org.eclipse.persistence : org.eclipse.persistence.core 2.5.2

6. org.eclipse.persistence : org.eclipse.persistence.asm 2.5.2

7. org.eclipse.persistence : org.eclipse.persistence.antlr 2.5.2

Once you have these installed, you can start preparing a bundle with your Entity classes and your persistence unit. I won’t go into how to write an Entity class here, as I’m assuming you already know how to use JPA. For the persistence.xml file, put it in src/main/resources/META-INF; it goes something like this:

<persistence xmlns="http://xmlns.jcp.org/xml/ns/persistence" version="2.1">

    <persistence-unit name="my-pu" transaction-type="RESOURCE_LOCAL">

        <provider>org.eclipse.persistence.jpa.PersistenceProvider</provider>

        <non-jta-data-source>java:comp/env/myDS</non-jta-data-source> <!-- remember the JNDI data source name? That goes here -->

        <class>my.entity.class.MyEntity</class> <!-- your entity class here -->

    </persistence-unit>

</persistence>

Once you install this bundle as an OSGi bundle, two things should happen. The Gemini JPA container will look for the JNDI data source and try to instantiate a PersistenceProvider and an EntityManagerFactory per persistence unit bundle. So the first check is to go to /system/console/services and check that these services exist:

1. javax.persistence.spi.PersistenceProvider

2. At least one javax.persistence.EntityManagerFactory

If these are active, you know you can now start using them and developing db access code.

In order to instantiate an EntityManager from the EntityManagerFactory, you need to get a hold of it first. In the declarative service, use:

@Reference(target = "(osgi.unit.name=my-pu)")
private EntityManagerFactory entityManagerFactory; // replace my-pu with the persistence unit name you specified in persistence.xml

Once you’ve got this, you can call

EntityManager entityManager = entityManagerFactory.createEntityManager();
try {
    // do your db access
} finally {
    entityManager.close();
    entityManager = null;
}
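To make the “do your db access” part a little more concrete, here is a hedged sketch using the hypothetical MyEntity class from the persistence unit above; with RESOURCE_LOCAL, the transaction has to be begun and committed explicitly:

    EntityManager entityManager = entityManagerFactory.createEntityManager();
    try {
        // write: RESOURCE_LOCAL means we manage the transaction ourselves
        entityManager.getTransaction().begin();
        MyEntity entity = new MyEntity();
        entityManager.persist(entity);
        entityManager.getTransaction().commit();

        // read: a simple JPQL query against the same entity
        List<MyEntity> all = entityManager
                .createQuery("select e from MyEntity e", MyEntity.class)
                .getResultList();
    } finally {
        entityManager.close();
    }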

Once in a while you might see that your database access service classes don’t have their dependencies satisfied. This is most likely a problem with your database connection settings. Go to the Day Commons JDBC Connection Pool and check your db settings and data source name. Once you save, unfortunately, you will have to restart the Gemini bundle so that the container picks up those settings and restarts the persistence unit bundles.

And that’s it. There might be other ways to do this using the Hibernate bundles, but I haven’t looked into that. This seemed the path of least resistance, where the container manages the EntityManagerFactory as a service.

– Sarwar Bhuiyan


Using EsperBolt and Storm to calculate Bollinger Bands

I was trying to hook up Storm and Esper to test their compatibility in calculating Bollinger Bands for stock symbols. I’m fairly new to all of this but I thought it’d be interesting to try to combine them in some way to get a working Storm topology that does this. I won’t go into the full details of the application I developed but just outline the way I managed to get it wired up and spitting out something useful.

The Apache Storm framework is a tool meant to aid distributed processing of a pipeline of computations: it takes one or more data sources (called Spouts), runs them through processing components (called Bolts), and is able to parallelize and distribute them across a cluster.

This allows you to focus on your data source and processing logic rather than worry about how to scale up or out the processing.

Esper, on the other hand, is a Complex Event Processing engine which allows you to run queries or computations directly on streaming data. This allows you to do things such as checking for trends in a certain period or window and performing aggregation calculations such as moving averages or standard deviations, regardless of whether the data is historical or realtime.

My original idea was to write lots of small bolts to preprocess the data and then stream to an Esper based bolt to do the final CEP calculations (in this case, calculating the Bollinger Band).

However, I came across https://github.com/tomdz/storm-esper and found it had already done quite a lot more than I could hope to achieve myself in the short amount of time while digesting all the bits and pieces. So thank you tomdz.

Next came trying to figure out what exactly would be enough to pass from a Spout (the data source) to Esper. It turns out, not much. You get your stock tick data from somewhere, encapsulate it in a POJO (say TickData), and that’s it. The wiring looks like this:

        TopologyBuilder builder = new TopologyBuilder();
        TickDataSpout spout = new TickDataSpout();

        builder.setSpout(SYMBOLS_SPOUT_ID, spout);

        EsperBolt esperBolt = new EsperBolt.Builder()
                .inputs()
                    .aliasComponent(SYMBOLS_SPOUT_ID)
                    .withFields("tickData")
                    .ofType(TickData.class)
                    .toEventType("Stock")
                .outputs()
                    .onStream("aggregationStream").fromEventType("Stock")
                    .emit("pcount", "symbol", "simpleMovingAverage", "standardDeviation", "price", "timestamp", "open")
                .outputs()
                    .onStream("outputStream").fromEventType("BollingerResults")
                    .emit("symbol", "upperBand", "middleBand", "lowerBand", "price", "bandwidth", "percentB", "timestamp", "open")
                .statements()
                    .add("insert into Aggregation " +
                         "select prevcount(tickData.symbol) as pcount, tickData.symbol as symbol, avg(tickData.price) as simpleMovingAverage, stddev(tickData.price) as standardDeviation, " +
                         "last(tickData.price) as price, last(tickData.timestamp) as timestamp, tickData.open as open from Stock.std:groupwin(tickData.symbol).win:length(20)" +
                         " group by tickData.symbol having count(*) >= 20")
                    .add("insert into BollingerResults select open, " +
                         "symbol, " +
                         "simpleMovingAverage + 2*standardDeviation as upperBand," +
                         "simpleMovingAverage as middleBand," +
                         "simpleMovingAverage - 2*standardDeviation as lowerBand," +
                         "price," +
                         "4*standardDeviation/simpleMovingAverage as bandwidth," +
                         "(price - (simpleMovingAverage - (2 * standardDeviation))) / ((simpleMovingAverage + " +
                         "(2 * standardDeviation)) - (simpleMovingAverage - (2 * standardDeviation))) as percentB," +
                         "timestamp as timestamp from Aggregation")
                .build();

        builder.setBolt("esper-bolt", esperBolt).fieldsGrouping(SYMBOLS_SPOUT_ID, new Fields("symbol"));
        // pass to another bolt just to print out what came out of the Esper bolt
        builder.setBolt("print-esper-bolt", new PrintEsperOutputBolt()).fieldsGrouping("esper-bolt", "outputStream", new Fields("symbol"));

        // run locally for testing
        Config config = new Config();
        LocalCluster cluster = new LocalCluster();

        cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
        try {
            Thread.sleep(600 * 1000); // let the topology run for 10 minutes
        } catch (InterruptedException e) {
        }
        cluster.killTopology(TOPOLOGY_NAME);
        cluster.shutdown();

As we can see, we need a TickDataSpout, which is then wired to the EsperBolt. The EsperBolt builder has several parts. Looking at the inputs section, we tell it that we are going to pass something called tickData of type TickData and ask Esper to generate an event of type Stock from it. The aliasComponent is passed the spout id of TickDataSpout, and that wraps up the passing of data from the spout to the EsperBolt. A sketch of what that TickData POJO might look like follows below.
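The post doesn’t show TickData itself, but judging from the tickData.* properties used in the EPL statements (symbol, price, timestamp, open), a minimal version could look like the following; Esper reads the properties through the JavaBean getters:

    import java.io.Serializable;

    public class TickData implements Serializable {

        private String symbol;
        private double price;
        private double open;
        private long timestamp;

        public TickData() {}

        public TickData(String symbol, double price, double open, long timestamp) {
            this.symbol = symbol;
            this.price = price;
            this.open = open;
            this.timestamp = timestamp;
        }

        public String getSymbol() { return symbol; }
        public double getPrice() { return price; }
        public double getOpen() { return open; }
        public long getTimestamp() { return timestamp; }
    }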

The next part is the calls after the first outputs() section, which say we’re going to emit events with the fields “pcount”, “symbol”, “simpleMovingAverage”, “standardDeviation”, “price”, “timestamp”, “open” on a stream called aggregationStream. Where do these come from? If you look at the first add() call after statements(), it contains an EPL (Event Processing Language) statement which shows how they are derived. These events are inserted into something called Aggregation in EPL to pass to the next stage. The next outputs() section says we are going to emit “symbol”, “upperBand”, “middleBand”, “lowerBand”, “price”, “bandwidth”, “percentB”, “timestamp”, “open”, which correspond to the second statement; that one calculates the Bollinger Bands and inserts into something else called BollingerResults.

After this point, I don’t need Esper and can read off the Bollinger Band items individually in PrintEsperOutputBolt by their field names and take an appropriate action.
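PrintEsperOutputBolt isn’t shown here either, so this is only a hedged sketch of what such a terminal bolt could look like, reading the emitted fields by name:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Tuple;

    public class PrintEsperOutputBolt extends BaseBasicBolt {

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // read the fields emitted on "outputStream" by name and just print them
            System.out.println(tuple.getStringByField("symbol")
                    + " upper=" + tuple.getValueByField("upperBand")
                    + " middle=" + tuple.getValueByField("middleBand")
                    + " lower=" + tuple.getValueByField("lowerBand")
                    + " %B=" + tuple.getValueByField("percentB"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // terminal bolt: nothing to emit downstream
        }
    }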

I have to say my first pass at this was rocky; I hit so many errors that I didn’t realize I could run both EPL statements in one single EsperBolt. In my first pass, I had two EsperBolts, where the results of one had to be explicitly encapsulated into a POJO to stream into the second one, and my second EPL statement needlessly had a lot of prefixes like Aggregation. on the fields that were used. It also required pulling all the emitted values from the tuple of the first bolt and casting them to the appropriate types before passing them on to the second bolt. This was wasteful boilerplate which is not required in this final approach.

Next steps: I’d like to try to do this in Clojure, just so that there’s a lot less ceremony in setting up the code for the spouts and bolts.

– Sarwar Bhuiyan


Using cemerick/friend to protect ring/compojure routes and one gotcha

The friend Clojure library is a great library providing a pluggable authentication and authorization “framework” (I know, frameworks are bad words these days) for protecting your Clojure web application written using ring and/or compojure. We started using it to protect REST API calls from various sources (internal and external), and given that different clients may be using different authentication mechanisms (basic auth vs oauth) while still accessing the same endpoints, friend’s support for multiple workflows (think of these as authentication handlers) came to the rescue.

An example of multiple workflows setup is as follows:

(def secured-app
   (friend/authenticate
                 app
                 {:allow-anon? true
                  :unauthenticated-handler #(workflows/http-basic-deny "Friend demo" %)
                  :workflows [(workflows/http-basic
                                 :credential-fn #(creds/bcrypt-credential-fn @users %)
                                 :realm "Friend demo")
                              (workflows/my-auth-workflow :credential-fn #(mycred-fn %))]}))

That being said, there is a slight bug in how the current friend library runs the different workflows. Ideally, a request should go through the workflows in a chain and only continue to the next workflow if the current one fails or returns nil. The relevant code uses the Clojure map function which, because of chunked sequences, is not lazy for fewer than 32 items, so later workflows may be evaluated even after an earlier one has succeeded. A pull request exists to fix this: https://github.com/cemerick/friend/pull/90/files

Once a request has been through the workflows and found the right one, the credential function/workflow returns an identity (e.g. username) and optionally some roles that can be used to authorize particular endpoints. One example is allowing read roles to access GET endpoints but requiring write roles to create or update resources. As the role names are arbitrary, it is easy to use various conventions to generate them and use them to isolate different actions on your REST server. In cases where you don’t need authorization by role, you can use some of the friend helper functions such as friend/authenticated.

One thing I’d like to see an example of would be principal based authorization rather than role based authorization. Given that a custom workflow can be written to emit an arbitrary set of “roles” which can be queried by the protected functions (it’s all injected into the request object), we can easily just change the compojure route to use friend/authenticated, while each route handler then ensures that the correct role is present in the session before allowing the user to go forward.

All in all, I’m pleased by the library and hope to use it more down the line.

– Sarwar Bhuiyan


iPhone 5S, M7 chip, and the Internet of Things to come

Although Apple’s September special event announcing the iPhone 5S may have seemed underwhelming to many compared to previous years’ events, I am of the opinion that it portends great things to come from Apple. Although many prognosticate doom and gloom with regards to the implications for privacy of the Touch ID sensor and the M7 chip, we may as well face it: these things are here to stay and will become even more ubiquitous. There are two areas in which I think these will play a major role, and I believe Apple is underplaying its hand for strategic reasons.

1. Internet of Things – It is embarrassing to admit I hadn’t heard of this specific term until a few months back, but it looks like context-aware computing, the ideas of which had been around for years, now has a brand. Everything we search or visit online is already being tracked and used in predictive analytics, targeting, and life improvements. The quantified mind is another such brand, and the themes only point to a more tightly connected future than ever before. Although the iPhone 5S’s M7 chip may initially push devices such as the Fitbit or the Nike+ band towards obsolescence, it is only another iteration in the process of improving the user experience. Instead of carrying around more separate devices, you carry one powerful general purpose computer (the phone) or wear unobtrusive sensors in apparel or accessories (watches, jewellery, clothes). There will definitely be cooperation required between those in the ecosystem, and Apple may provide an olive branch to them by allowing them the platform(s) to build their applications on or connect their devices to.

2. Merging of computing devices into one. This one could probably go either way with Apple, but my hope is that we will see a merging of all the computing devices one owns into one again. Since the phone is getting more and more powerful, why can’t the phone be my phone, work computer, or TV device? All that’s required is a shift in thinking around how to make a single device change context and provide you all those things. We have already seen initial attempts such as the Ubuntu Edge and the Motorola Atrix, but we need to push the envelope harder. The CPU, memory, etc. improvements will definitely help, but so will pushing processing and storage to the cloud, such that the disparate systems we interact with daily at work, at home, and on the commute become connected via one unified set of services producing a smooth context-sensitive experience. Concerns about privacy and corporate IT backwardness aside, this is a matter of when instead of if. If Apple doesn’t do it, others will. Apple just happens to have the clout to be considered seriously by the masses when they present extant concepts with a polish and a bow. Everybody keeps saying how nothing is new, but if these things become so ingrained in the average person’s psyche, it IS something new. It is only conscious meditation on using a phone that reminds me that only a decade ago I bought my first phone that was able to let me continue my ICQ conversations when I left my desk (it was via SMS, not EDGE or GPRS, no less!)

So I say this again, Apple may have given a rather understated presentation of the iPhone 5S, but the implications are huge.

– Sarwar Bhuiyan


Thoughts on an audit/compliance tool for Apache Sling or CQ5

In many cases we’d like to audit a system like a content management server. Of course, storing it in the system itself has its advantages but for circumstances where data needs to be captured for a long time and be searchable or retrievable long into the future by any forensics team, a more decoupled approach might be appropriate. So here’s what I’ve been thinking:

1. Use something like Apache Servicemix (https://servicemix.apache.org/index.html) or some other ESB to listen for all change events in the server (using either JCR Observation listeners or Remote Event handlers, see http://r-osgi.sourceforge.net/userguide.html); a sketch of such a listener follows after this list
2. Capture all pertinent information of the event in a rendering agnostic format (say JSON) and store it in a Write Once Read Many (WORM) device or database
3. Index the data so it can be searchable via some common criteria (full text search, dates, users, etc)
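As a hedged sketch of items 1 and 2 above (the class name and the store() hook are hypothetical), a JCR observation listener that captures each change event as a small JSON record could look like this:

    import javax.jcr.observation.Event;
    import javax.jcr.observation.EventIterator;
    import javax.jcr.observation.EventListener;

    public class AuditEventListener implements EventListener {

        @Override
        public void onEvent(EventIterator events) {
            while (events.hasNext()) {
                Event event = events.nextEvent();
                try {
                    // capture the event in a rendering-agnostic format (JSON)
                    String json = String.format(
                            "{\"type\":%d,\"path\":\"%s\",\"user\":\"%s\",\"date\":%d}",
                            event.getType(), event.getPath(), event.getUserID(), event.getDate());
                    store(json);
                } catch (Exception e) {
                    // auditing must never break the repository; log and move on
                }
            }
        }

        private void store(String json) {
            // TODO hand off to the ESB / WORM storage (hypothetical)
        }
    }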

Additions to this could include a crawl of the website (if it is indeed a website) and storing an offline mirror or snapshot of it somewhere. This could take many forms, but a simple case would be a zip of the mirrored site which could be retrieved on demand according to date and unzipped into an embedded web server for viewing. Alternatively, a crawler could create a PDF or PDF/A of the entire site and store it on the WORM device.

The idea behind this is not to be dependent on the codebase that renders the content because the code will definitely change over time. As well, even the system itself could be different. How many times have we decommissioned vendor products and lost the audit data?

I know we say we want to keep tape backups of CQ5 or Sling instances nightly or weekly and keep them for years and years, but it could be that nine years after the event being investigated, the specific vendor product’s devs or system admins are long gone from the company.

Anybody else have any ideas on this? Would love to hear of real world implementations of this type of thing.

– Sarwar Bhuiyan
