Monday, November 4, 2013

Redis messaging using Spring and Kryo serialization

Redis is famous enterprise key/value data store which provides simple messaging using publisher/subscriber. I've decided to use the system to notify remote nodes (components) about a change of state between those components.

Spring among the others allows to use it's templating system to decouple business logic from messaging system used under the hood. The use of spring data for Redis guarantees a solution which does not utilize any redis command in the code.

Redis contains serialized content, either byte[] or string. So the last thing to reveal is domain model serialization. I've decided to fast binary serialization using Kryo framework as a winner of battle of serializators.

Maven

First of all, it's necessary to define all dependant components. Obviously, usual component like spring itself missing.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
<dependency>
 <groupId>com.esotericsoftware.kryo</groupId>
 <artifactId>kryo</artifactId>
 <version>2.22</version>
</dependency>
<dependency>
 <groupId>org.springframework.data</groupId>
 <artifactId>spring-data-redis</artifactId>
 <version>1.1.0.RELEASE</version>
</dependency>
<dependency>
 <groupId>redis.clients</groupId>
 <artifactId>jedis</artifactId>
 <version>2.1.0</version>
</dependency>

Kryo Serializator

I used domain model's entity which hold identifier of our internal service, this is just UUID. The point is to setup kryo serializator which get the entity and returns byte[]. The entity itself is not important.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
public class MyEntitySerializer extends Serializer<MyEntity> {

    @Override
    public void write(Kryo kryo, Output output, MyEntity object) {
  return ...;
    }

    @Override
    public MyEntity read(Kryo kryo, Input input, Class<MyEntity> type) {
        return ...;
    }
}

Redis Message Handler

The beautiful part is the complete definition of async message handling within xml configuration of redis message container.

Note that I used Jedis client as java redis client.

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<bean id="jedisConnectionFactory" class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory"
  p:host-name="${redisHost}" p:use-pool="true"/>

<bean id="myEntitySerializer" class="MyEntitySerializer" />

<bean id="redisContainer" class="org.springframework.data.redis.listener.RedisMessageListenerContainer">
  <property name="connectionFactory" ref="jedisConnectionFactory"/>
  <property name="messageListeners">
    <!-- map of listeners and their associated topics (channels or/and patterns) -->
    <map>
      <entry>
        <key>
          <bean id="messageListener"
              class="org.springframework.data.redis.listener.adapter.MessageListenerAdapter">
            <constructor-arg>
              <bean class="MyEntityListener"
                  autowire="constructor"/>
            </constructor-arg>
            <property name="serializer" ref="myEntitySerializer"/>
          </bean>
        </key>
        <bean class="org.springframework.data.redis.listener.ChannelTopic">
          <constructor-arg value="${channelName}"/>
        </bean>
      </entry>
    </map>
  </property>
</bean>

What is going on?

Message container references both redis factory and given list of message listeners. It's a class (or set of classes) having registered method as a callback when a new message arrives.

The last property is channel with important channel name. The code uses two variables, first is redis host and already mentioned channel name.

Message Listener

The last thing to do is to define message listener containing callback method, MyEntityListener. The class instance is called always once new message arrive using channel topic.

Crucial point was to discover the signature of callback's method because spring's documentation is little bit sloopy. Quick look into org.springframework.data.redis.listener.adapter.MessageListenerAdapter's onMessage shows the correct way.

1
2
3
4
5
6
public class MyEntityListener {

    public void handleMessage(MyEntity entity, String channel) {
      // provide handling code
    }
}

Incoming message is automatically deserialized so the method accepts entity itself.

Conclusion

Look at previous code. Every redis related code is defined in spring's context and container class. No boilerplate code. Pretty nice.

Architectural note should explicitly show that the entity serves as a notification only. This is very important as such messaging is not reliable. Although the entity holds some information all the persistent data is defined within another place, e.g. as key/value pairs. Approaching message just notifies subscriber that new content is available to refresh and it's supposed to GET key(s).