Designing Key/Value Repository API with Java Optional

I spent some time last month defining our repository API. A repository is a component commonly used by the service layer of an application to persist data. In this time of polyglot persistence, we use the repository design discussed in this article to persist our business domain model, which is designed according to (our experience with) domain-driven design.

Lessons Learned

We have plenty of experience here, since we used NHibernate as the persistence framework in an earlier product version. The first, naive, idea was to let programmers write queries against the database on their own. Unfortunately, that idea failed quickly. The scenario relied heavily on the belief that every programmer knows how persistence/databases work and wants to write those queries efficiently. It inevitably produced error-prone and inefficient queries. Essentially, nobody was responsible for the repositories because everyone contributed to them. The persistence component was just a framework.

The lesson from that experience is to design a strong, highly reviewable API, created by technology-aware engineers, usually with a strong commitment to all dependent layers.

Technical Implications Affect the API

The API must obviously reflect functional requirements; they describe what we want the repository to do. According to our experience, the API must also reflect technical and implementation constraints. Designing it without knowing whether the implementation will use a SQL database or a NoSQL key/value store, or what the boundaries of the domain aggregates are, results in an inefficient implementation.

To give a more realistic example, let's talk about an address, and consider it an aggregate. The repository usually provides CRUD methods for the address. But what if there is a functional requirement to return only the address's street? Should the API contain such a method, e.g. get street by address id?

It depends on the technical implementation:

  1. What is the typical maximum size of a serialized address, e.g. the resulting JSON? Does it fit into one TCP packet traveling over the network, or into one read operation from the hard drive on the storage node? In other words: does it even make sense to fetch a partial entity instead of the full one?
  2. How often is the street read and/or written? What is the read/write ratio?
    1. Is it better to duplicate the data – to store the street separately as well as inside the full JSON – because it is read so often?
    2. Is it better to store the whole address together because updates outnumber reads?

Let's say you ignore these questions and provide all the methods required from the user's point of view: you allow fetching the street and the address via two different methods. Let's say there is also a functional requirement to fetch the zip code of an address. Developers who are not familiar with the repository internals will typically call the method that fetches the street, followed by the fetch of the zip code on the next line. That is natural thinking: composing methods of an API. However, it is obviously inefficient because it makes two remote calls to the storage.

If you answer questions like these, you can easily reach the decision that the only reasonable implementation is to provide getAddress only – returning the whole address aggregate. Developers then have no choice but to use this method and work with the address as a whole.
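To make this concrete, here is a minimal sketch of the two designs; the interface and type names (NaiveAddressRepository, AddressRepository, Address, Id) are illustrative only, not our real API.

import java.util.Optional;

// The tempting, map-like design: callers end up making two remote calls per address.
interface NaiveAddressRepository {
    Optional<String> getStreet(Id addressId);
    Optional<String> getZipCode(Id addressId);
}

// The aggregate-oriented design: one remote call returns the whole address,
// and callers read street and zip code from the in-memory object.
interface AddressRepository {
    Optional<Address> getAddress(Id addressId);
}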

By defining the repository API in the most efficient way, you are effectively telling developers how to use the underlying persistence.

Implementation

Once we know what kind of methods to place on the repository API, there are some implementation constraints worth mentioning.

Repository is not a Map

… so do not try to express its CRUD methods as if it were some remote (hash)map

Every programmer – and probably every human – craves patterns and solves problems according to his or her past and current experience. CRUD on top of a key/value store sounds like an application of a map. This idea almost implies that the repository interface can mirror the map interface in both method arguments and return types.

However, there are certain circumstances you need to keep in mind.

1. Error States

An in-memory map either CRUDs or it doesn't. In the case of GET, the key is either present or not. A repository, on the other hand, makes remote calls over an unreliable network to an unreliable (set of) node(s). There is therefore a broad range of potential issues you can run into.

2. Degraded Access

Look at a map's DELETE. The remove method returns the entity being removed. In the case of a map, that is fine. For a repository, however, it looks like pure overhead considering the slow network access, not to mention things like consensus or QUORUM evaluation – that is not cheap. I also doubt anyone would use the returned value; the caller just needs to remove an entity by its identifier.

Excluding simple in-memory implementations, repository methods usually perform one or more remote calls. Contrary to local in-memory calls, those remote calls use a slow network under the hood. What is the implication? Considering the GET method, there are more states than just key exists/does not exist. And returning the current value when REMOVING a key can take time.

Optimistic Locking

Basically, every one of our entities contains a long version used for CAS-like operations – optimistic locking. Contention is thus resolved on the storage system itself; it is up to the system, or the query, to decide what to do. Especially in a distributed system, this is the kind of problem you do not want to solve yourself.

Most NoSQL storages use a lightweight approach usually called compare-and-set. Redis supports non-blocking transactions via the MULTI, EXEC and WATCH primitives. Cassandra takes a different approach, built into its query language.
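As an illustration, here is a minimal compare-and-set sketch on top of Redis using the Jedis client; the key layout (a value key plus a ":version" key) and the class name are made up for this example.

import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Transaction;

public class OptimisticLockingSketch {

    public static boolean updateWithVersion(Jedis jedis, String key, String newValue, long expectedVersion) {
        jedis.watch(key + ":version");                       // EXEC aborts if this key changes meanwhile
        String current = jedis.get(key + ":version");
        if (current == null || Long.parseLong(current) != expectedVersion) {
            jedis.unwatch();                                 // stale entity – the caller has to re-read
            return false;
        }
        Transaction tx = jedis.multi();
        tx.set(key, newValue);
        tx.set(key + ":version", String.valueOf(expectedVersion + 1));
        List<Object> result = tx.exec();                     // null/empty result means the CAS lost the race
        return result != null && !result.isEmpty();
    }
}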

Java Optional as Returning Type

We eventually decided to use Java's Optional so that our API never returns null. There is one exception, however: a method with a three-state result type. There is a nice discussion on Stack Overflow about where to use this construct and where not to.

The implementation later proved this to be a good approach. The point is that everyone who calls a method with an Optional return type is much more aware of the empty state – Optional.empty(), for the record. During the refactoring I found out that 40% of the code which used the previous (in-memory) repository version did not handle null as a valid return value.
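For the record, a short hypothetical example of what this looks like at the call site (Address, repository and addressId are illustrative names):

Optional<Address> maybeAddress = repository.get(addressId);

String street = maybeAddress
        .map(Address::getStreet)                        // safe access when the entity is present
        .orElseThrow(() -> new IllegalStateException(
                "Address " + addressId + " not found")); // the caller must decide what "empty" means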

Generic Repository API Interface

We eventually ended up with the following API.

/**
 * @throws timeout exception
 * @throws generic persistence exception
 */
interface Repository<T> {

    /**
     * @throws if the entity already exists
     */
    void add(T entity);

    /**
     * @return {@link java.util.Optional} with the entity when found, {@link java.util.Optional#empty()} otherwise
     */
    Optional<T> get(Id id);

    /**
     * @return {@link java.util.Optional} with the entity when found and the persisted version differs from the given version,
     * {@link java.util.Optional#empty()} when there is no persisted entity for the given id,
     * null when found and the persisted version is the same as the provided one
     */
    Optional<T> getIfNotMatched(Id id, long version);

    boolean exist(Id id);

    /**
     * Persists the given entity and increments its version when it succeeds
     * @throws stale entity exception when the given entity's version differs from the persisted one
     * @throws if the entity does not exist
     */
    void update(T entity);

    /**
     * Deletes the whole entity hierarchy including all children
     * @throws stale entity exception when the given entity's version differs from the persisted one
     * @throws if the entity does not exist
     */
    void delete(Id id, long version);
}
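For illustration, this is roughly how a caller can use the three-state getIfNotMatched method to refresh a cached entity; Account, accountId, cachedVersion and cachedAccount are made-up names.

Optional<Account> changed = repository.getIfNotMatched(accountId, cachedVersion);
if (changed == null) {
    return cachedAccount;               // persisted version matches – the cached copy is still valid
}
return changed.orElseThrow(             // the entity changed, or it no longer exists
        () -> new IllegalStateException("Account " + accountId + " was deleted"));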

Java, Docker, Spring Boot … and signals

I spent the last couple of weeks working on Java apps running inside Docker containers deployed on clustered CoreOS machines. It's pretty simple to run a Java app within a Docker container. You just have to choose a base image for your app and write a Dockerfile.

Note that the Docker registry contains many Java distributions, usually based on OpenJDK. We use our internal image for Oracle's Java 8, built on top of something like this Dockerfile. Once you make the decision between Oracle and OpenJDK, you can start writing your own Dockerfile.

FROM dockerfile/java:oracle-java8
ADD your.jar /opt/your-app
ADD /dependencies /opt/your-app/dependency
WORKDIR /opt/your-app
CMD ["java", "-jar", "/opt/your-app/your.jar"]

However, your app will probably require some parameters. Therefore, the last line usually calls your shell script. Such a script then validates the number and format of those parameters, among other things. This is also useful during the development phase, because none of us wants to rebuild and restart an image every time something changes and needs testing. So the last line of your Dockerfile is usually similar to the next snippet:

CMD ["/opt/your-app/start.sh"]

So far so good. Let's say I've just developed my code and made the integration tests pass. Once you are satisfied with your solution, you try something like this:

sudo docker run --name your-app-name -e ENVIRONMENT_PARAM=VALUE your-image …

sudo docker stop your-app-name

It's pretty reasonable to use Spring Boot when your app is based on Spring contexts. Thanks, Dagi. You just put the Maven dependency into the root pom:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Spring Boot removes boilerplate code and does a lot of the things you would eventually have to do anyway, see the documentation. I based my Main class on CommandLineRunner, and I have to admit the main method itself is one of the shortest I have ever written.
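A minimal sketch of such a main class (the real application wiring is of course more involved):

import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        // start the application-specific work here
    }
}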

A new problem appeared once I tried to stop the application. According to the documentation, my process should receive SIGTERM and then SIGKILL. A nice thing about Spring Boot is the shutdown hook it automatically registers within your application. Unfortunately, the application log obtained via docker logs showed nothing like …

will receive SIGTERM, and after a grace period, SIGKILL.

… the Docker daemon just killed my app. A short research gave me a clue: signals are not propagated to child bash scripts. I finally found an interesting article describing the issues with Java, shell scripts and signals in full detail. As I was a lucky developer, I just slightly changed the last line of the shell script to something like:

exec java -jar …

This simple change allows a graceful shutdown, and my Java app can now close the running Spring context. That transitively means that all registered auto-closeable beans are now terminated as needed.
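For example, a bean like the following hypothetical one now gets its cleanup callback invoked when SIGTERM reaches the JVM and Spring closes the context:

import javax.annotation.PreDestroy;
import org.springframework.stereotype.Component;

@Component
public class RecordingSession {

    @PreDestroy
    public void shutdown() {
        // flush buffers, close connections, stop worker threads, ...
    }
}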

ETCD: POST vs. PUT understanding

ETCD is the distributed key/value store used as a core component of CoreOS. I already published a post about it earlier this week. Here is a page describing how to use the basic ETCD commands – the ETCD API. The code snippets on that page mostly use PUT, but ETCD allows POST as well. Most of us understand the difference between those two verbs in the context of a REST(ful) service, but how do they work in a key/value store?

POST

An example instead of many words.

curl -v http://127.0.0.1:2379/v2/keys/test -XPOST -d value="some value"

curl -v http://127.0.0.1:2379/v2/keys/test -XPOST -d value="some value"

Running the same command twice results in the following content:

{
  "action": "get",
  "node": {
    "key": "/test",
    "dir": true,
    "nodes": [
      {
        "key": "/test/194",
        "value": "",
        "modifiedIndex": 194,
        "createdIndex": 194
      },
      {
        "key": "/test/195",
        "value": "",
        "modifiedIndex": 195,
        "createdIndex": 195
      }
    ],
    "modifiedIndex": 194,
    "createdIndex": 194
  }
}
So ETCD generates an index and puts it into the resulting key – which is also the path to the value. For instance:

curl -v http://127.0.0.1:2379/v2/keys/test/194 -XGET

This allows you to get that specific key; the index is explicitly part of the URL.

PUT

The PUT command just adds or updates the given key. Let's say I use the following example:

curl -v http://127.0.0.1:2379/v2/keys/test -XPUT -d value="some value"

The resulting content of the test key is as expected:

{
  "action": "get",
  "node": {
    "key": "/test",
    "value": "",
    "modifiedIndex": 198,
    "createdIndex": 198
  }
}

How to Model Add and Update Method?

My current task is to model and implement a repository using ETCD under the hood. The usual repository contains CRUD methods for a particular set of entities. A reasonable approach is to separate add from update so that an existing object is not silently replaced, e.g. when using optimistic locking.

I don't want to see revision – index – numbers within keys, so the POST command is not useful here. ETCD provides the prevExist parameter for this use case.

I want the add method to expect that there is no content under the given key. I'll use the following statement:

curl -v http://127.0.0.1:2379/v2/keys/test?prevExist=false -XPUT -d value="some value"

If you have not deleted the key beforehand – as I had not – you get the following error:

{
  "errorCode": 105,
  "message": "Key already exists",
  "cause": "/test",
  "index": 198
}

On the other hand, use prevExist=true to express updating an existing entity.

curl -v http://127.0.0.1:2379/v2/keys/test?prevExist=true -XPUT -d value="some value"

This command results in a positive response:
< HTTP/1.1 200 OK

The repository uses PUT for both the add and the update method; the value of prevExist is the only difference.
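Just as a rough sketch of how this maps to code, the following example drives both cases over plain HTTP with nothing but the JDK; the class and method names are made up, and error handling plus JSON parsing of the response are omitted.

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;

public class EtcdPutSketch {

    static int put(String key, String value, boolean mustExist) throws Exception {
        URL url = new URL("http://127.0.0.1:2379/v2/keys/" + key + "?prevExist=" + mustExist);
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("PUT");
        connection.setDoOutput(true);
        byte[] body = ("value=" + URLEncoder.encode(value, "UTF-8")).getBytes("UTF-8");
        try (OutputStream out = connection.getOutputStream()) {
            out.write(body);                         // form-encoded body, same as curl -d
        }
        return connection.getResponseCode();         // 2xx on success, an error status when prevExist fails
    }

    public static void main(String[] args) throws Exception {
        System.out.println("add:    " + put("test", "some value", false));  // fails if the key exists
        System.out.println("update: " + put("test", "some value", true));   // fails if the key is missing
    }
}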

Playing with ETCD cluster in Docker on Local

I started writing a management component last week. We would like to build on the whole CoreOS stack as much as possible, at least in this early phase of our project.

The core component of our solution is ETCD – a distributed key/value store, something like my favorite piece of software, Redis. The word 'distributed' means that the core of everything in your solution needs to be synchronized, or rather agreed upon by consensus. ETCD uses Raft. I'd love to know how my desired component behaves in a real environment where everything can die.

In the age of Docker – where every piece of software is docker-ized – it's pretty simple to start an ETCD cluster locally in a second. The following piece of code starts three etcd instances linked together in one cluster.

docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --net=host --name etcd0 quay.io/coreos/etcd:v2.0.3 \
 -name etcd0 \
 -advertise-client-urls http://localhost:2379,http://localhost:4001 \
 -listen-client-urls http://localhost:2379,http://localhost:4001 \
 -initial-advertise-peer-urls http://localhost:2380 \
 -listen-peer-urls http://localhost:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4101:4101 -p 2480:2480 -p 2479:2479 --net=host --name etcd1 quay.io/coreos/etcd:v2.0.3 \
 -name etcd1 \
 -advertise-client-urls http://localhost:2479,http://localhost:4101 \
 -listen-client-urls http://localhost:2479,http://localhost:4101 \
 -initial-advertise-peer-urls http://localhost:2480 \
 -listen-peer-urls http://localhost:2480 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4201:4201 -p 2580:2580 -p 2579:2579 --net=host --name etcd2 quay.io/coreos/etcd:v2.0.3 \
 -name etcd2 \
 -advertise-client-urls http://localhost:2579,http://localhost:4201 \
 -listen-client-urls http://localhost:2579,http://localhost:4201 \
 -initial-advertise-peer-urls http://localhost:2580 \
 -listen-peer-urls http://localhost:2580 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

The inspiration is obvious, but this setup simply runs everything on your computer. The --net=host parameter provides full transparency from the port and network point of view.

You can now use the following URL in a browser:

http://localhost:4101/v2/keys/?recursive=true

It is also a good idea to check all members of your cluster – you will be killing them later.

http://localhost:2379/v2/members

Once your tests are done, you can easily delete all keys in the XYZ namespace using curl. Note that you can only delete your own keys, so you can't run the following command on the root namespace.

curl http://127.0.0.1:2379/v2/keys/XYZ?recursive=true -XDELETE

I also prefer to see the HTTP status code, as ETCD makes use of HTTP status codes.

curl -v http://127.0.0.1:2379/v2/keys/XYZ

In addition to status codes, it always returns a JSON document with its own error codes; see the snippet at the end of the following listing. You can get something similar to:

* Hostname was NOT found in DNS cache
*   Trying 127.0.0.1…
* Connected to localhost (127.0.0.1) port 2379 (#0)
> GET /v2/keys/XYZ HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:2379
> Accept: */*

< HTTP/1.1 404 Not Found
< Content-Type: application/json
< X-Etcd-Cluster-Id: 65a1e86cb62588c5
< X-Etcd-Index: 6
< Date: Sun, 01 Mar 2015 22:55:14 GMT
< Content-Length: 69

{"errorCode":100,"message":"Key not found","cause":"/XYZ","index":6}
* Connection #0 to host localhost left intact

When you are done playing with the ETCD cluster, you will probably want to remove all the etcd containers. I use a simple script which removes every Docker container, but you can improve it with grep to remove only those hosting ETCD.

sudo docker rm -f `docker ps --no-trunc -aq`

The last interesting thing is performance. I was reminded of Redis, which can handle one million transactions per second using one thread. I was surprised when ETCD usually responded in 20-30 ms. Worse, I also encountered client timeouts caused by 400-500 ms response times per request. Raft is obviously not free. But the purpose of ETCD is massive read scalability. Well, good to know.

Performance Battle of NoSQL blob storages #3: Redis

We have already measured performance stats for Apache Cassandra and Apache Kafka. Including Redis in a comparison of persistent storages may look like a misunderstanding at first sight. On the other hand, there are certain use cases that allow us to consider storing data in main memory, especially in private data centers – primarily once your cluster includes machines with almost as much RAM as hard drive 🙂

Redis is an advanced key/value store with optional persistence. There are a couple of reasons why everyone loves Redis. Why do I?

1. It's pretty simple. The following command installs a Redis server on Ubuntu. That's all.

apt-get install redis-server

2. It's incredibly fast. Look at the tables below: one million remote operations per second.
3. It supports a large set of commands. More than some kind of database, it is rather an enterprise remote hash-map, hash-set, sorted-list and pub/sub channel solution, supporting TTL and a Lua scripting environment, among other things. See all commands.
4. It's optimized to use few machine resources, CPU and RAM. Even though the Redis server is a single-threaded app, it achieves this great performance.
I already touched on the purpose at the beginning. We primarily targeted two points. First, we can use Redis for a super-fast deployment of our app when latency matters.
Secondly, I wanted to compare in-memory and persistent stores. Is such an in-memory solution really worth considering?

Setup

I used the following setup:
  • Redis server 2.2.6: HP ProLiant BL460c Gen8, 32 cores at 2.6 GHz, 192 GB RAM, Ubuntu 12 server
  • Test executor: Xeon, 16 cores, 32 GB RAM, w8k server
  • 10 Gbps network
  • Jedis Java client
  • Kryo binary serialization

As the Redis cluster feature was still in development at the time these numbers were measured, I used only one machine. 32 cores were a real overkill, as Redis effectively used one plus one core.

Performance Measurement of Redis

Batch Size

Appending with LPUSH to eight different keys.

Blob size \ Batch size [TPS] | 128 | 256 | 1024 | 32768
100 b | 570k | 570k | 557k | 600k
20 kb | 38k | 40k | 35k | 33k
As only main memory is touched, it's all about network transmission. The results are almost the same for all batch sizes.
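To give an idea of what such a test client looks like, here is a simplified sketch using the Jedis and Kryo libraries from the setup above (assuming a Jedis version that implements Closeable); the key names, message payload and batch size are placeholders, not the exact benchmark code.

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Output;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

public class RedisBatchWriter {

    public static void main(String[] args) {
        Kryo kryo = new Kryo();
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            Pipeline pipeline = jedis.pipelined();                 // batch the commands client-side
            for (int i = 0; i < 256; i++) {                        // one batch of 256 appends
                byte[] key = ("recording:" + (i % 8)).getBytes();  // eight different keys
                pipeline.lpush(key, serialize(kryo, "message-" + i));
            }
            pipeline.sync();                                       // flush the whole batch at once
        }
    }

    private static byte[] serialize(Kryo kryo, Object message) {
        Output output = new Output(256, -1);                       // growable buffer
        kryo.writeClassAndObject(output, message);
        output.close();
        return output.toBytes();
    }
}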

Variable Connections

Using LPUSH again to append to a varying number of keys. Every key is accessed through its own connection.

Blob size \ Connections [TPS] | 1 | 2 | 4 | 8 | 32 | 128
100 b | 446 | 750k | 646k | 560k | 960k | 998k
20 kb | 9.2k | 16.8k | 20.8k | 34k | 35k | 52k
Ohhh. One million messages per second inserted into one Redis instance. Incredible. The Java client uses NIO, which explains why it scales with many more TCP connections: increasing the number of network pipes through which a client can push data enables better throughput.

Occupied Memory

Blob within a list.
Blob size | Bytes per message
100 b | 152
20 kb | 19 kb
There is some built-in compression, which showed up in the large-message test.

Long Running

The goal of this test is to fill the main memory (192 GB) of one Redis instance to find out whether there are any scalability limitations.
Blob size | TPS
100 b | 842k
20 kb | 18.2k
Redis filled the main memory with blob messages until OOM. The throughput curve over time is almost flat.

Persistence

Even though Redis stores the data in main memory, there is a way to persist it.

Blob size [TPS] | AOF (every second) | AOF (always) | Without persistence
100 b | 800k | 330k | 960k

Redis performs the persistence I/O in the background. The numbers are almost the same when the data is persisted in one-second frames. TPS drops significantly when Redis writes every update to the drive, but this mode ensures the best durability.

Large Messages

How is performance affected when the message size increases to tens of megabytes?

Blob size | TPS
500 kb | 1418
5 mb | 85
100 mb | 1.2

Conclusion

The numbers are impressive. A single-threaded app successfully processes almost one million small messages per second in memory.

Redis performance is incredible. We might expect certain limitations because of the single-threaded design, which causes "serializable" behavior. Maybe this lockless implementation is exactly the reason why the Redis server can handle such throughput.

On the other hand, a fair comparison against the other competitors has to use the Redis persistence feature. The performance is much worse then – three times. Well, the persistence requirement can be the decision maker.

I've already mentioned the great command set. You can model almost any behavior with it. Even though Redis primarily targets caches, there are commands for calculating various stats, holding unique sets, etc. A script built from these commands is powerful and very fast.

Everything is always about performance and features. Redis has both of them 🙂

Performance Battle of NoSQL blob storages #2: Apache Kafka

The first article of this series brought scaling factors for blob-based content on Apache Cassandra. It's a well-known piece of software requiring a full installation on the nodes, a management application and so on. You also need to tune various configs to achieve the best performance. I spent a nice time playing with YAMLs on Ubuntu 🙂

The configuration is sometimes tricky. I was a little bit confused once or twice, so I planned to hire a Cassandra guru for our team, as the issues I encountered seemed really complicated 🙂

Well, we do not need much functionality in HP Service Virtualization. The core need is to replicate messages to achieve reliability. The next step is to process them, and the last part is to aggregate the results. Sounds exactly like map and reduce à la Hadoop. The evaluation of Apache Storm and Apache Samza are different stories, but they led me to Apache Kafka.

Kafka is a pretty nice piece of software, much simpler than Cassandra as described above. The only thing you need to do to use it is to depend on the Kafka jars in your pom files. That's it! Maven downloads a couple of dependent jars and your environment is (almost) ready.

As you can see below, Kafka is incredibly fast, much faster than Cassandra. Last year I read an article whose author labeled Redis as the most incredible software he had met so far. I agreed. Now there are two candidates for this label 🙂

For me as an engineer it was also very beneficial to read their technical documentation. The guys did performance research on how to store data on hard drives and employed the findings in the Kafka implementation. Their documentation contains interesting technical material, much like the LMAX PDF does for the Disruptor.

Setup

The setup was identical to the one described in the first article.

Performance Measurement of Kafka Cluster

Batch Size

  • 8 topics
  • two partitions per topic
Blob size \ Batch size [TPS] | 128 | 256 | 512 | 1024 | 32768
100 b | 392k | 432k | 433k | 495k | 396k
20 kb | 9k | | | |

There is little difference between the various batch sizes, so you can tune the value according to your needs. Note that the overall throughput is incredible: raw 400 Mbit/s.
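For orientation, this is roughly where batching is configured on the producer side. Note that the measurements in this article used the 0.8-era client, while this sketch uses the newer KafkaProducer API, so the property names differ from what I actually ran; topic name and payload are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KafkaBatchProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.put("batch.size", "16384");      // bytes collected per partition before a send
        props.put("linger.ms", "5");           // small wait so batches can fill up

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            byte[] payload = new byte[100];    // 100-byte blob, as in the smallest test case
            for (int i = 0; i < 1_000_000; i++) {
                producer.send(new ProducerRecord<>("recording", payload));
            }
        }
    }
}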

Number of Connections

  • batch size is 512

Blob size \ Connections [TPS per connection] | 1 | 2 | 4 | 8 | 16 | 32
100 b | 84k | 74k | 62k | 58k | 37k | 17k
20 kb | 3.2k | 2.5k | 1.7k | | |

You can see that the number of connections significantly increases Kafka's write throughput. Two connections handle about 150k messages in total, while 8 connections allow 464k messages across all connections.

Number of Partitions

  • 8 connections

Blob size \ Partitions [TPS] | 1 | 2 | 4 | 8 | 16 | 64
100 b | 535k | 457k | 493k | 284k | |

Two partitions brought little difference in the overall score. It follows the same pattern as hyper-threading.

Long Running

The goal of this test is to verify the behavior when the cluster is under continual heavy load for a couple of minutes – until the underlying storage runs out of space (150 GB). As Kafka is really fast, that space lasts only a couple of minutes.

Blob size | TPS
100 b | 490k

The test generated 150 GB of data successfully stored by Kafka. The throughput is almost the same as in the short test.

Replication

This is a crucial verification of how Kafka is affected by replication.

Blob size \ Replicas [TPS] | 1 | 2
100 b | 577k | 536k

The difference is less than 10% when two nodes are involved in the replication. Great! Both nodes handle 400 Mbit/s of throughput.

Large Messages

The previous tests used relatively small messages; how does Kafka behave with larger ones?

Blob size | TPS
100 b | 640k
20 kb | 6k
500 kb | 314

The best throughput was achieved by the last – largest – message. Even for such a large entity, Kafka handles an unbelievable 1.2 Gbit/s. Note that all of this is remote communication over our 10 Gbps network.

Occupied Space

As Kafka stores byte arrays, the occupied space depends on the serialization framework. I used the famous Kryo.

Blob size | Occupied space per message
100 b | 183 bytes

Here is the structure of the serialized entity:
class Message {

    private UUID messageId;
    private UUID virtualServiceId;
    private String targetUrlSuffix;
    private int responseStatusCode;
    private long time;
    private byte[] data;
}

Conclusion

Kafka surprised me a lot. Its performance is incredible. The installation is just a dependency on a jar file and the configuration is very easy. The API is really simple; it's up to you what kind and form of content you prefer.

The only drawback is that this piece of software is pretty young. At the time this article was being written, beta version 0.8 was out. For example, the async API was only a proposal.

On the other hand, there is a large set of articles, videos and other materials describing how people use it in their projects, especially along with Apache Storm.

Well, if you want to use messaging in your new solution, you should definitely look at Apache Kafka.

Performance Battle of NoSQL blob storages #1: Cassandra

Preface

We spent the last five years on HP Service Virtualization using an MSSQL database on a non-clustered server. Our app uses this system for all kinds of persistence; no polyglot persistence so far. As we tuned the response time – we started at 700 ms per call and ended up with a couple of milliseconds per call with the DB involved – we had to learn a lot of stuff.

Transactions, lock escalation, isolation levels, clustered and non-clustered indexes, buffered reading, index structure and its persistence, GUID ids in clustered indexes, bulk importing, avoiding slow joins, sparse indexes, and so on. We also rewrote part of NHibernate to support multiple tables for one entity type, which allowed us to scale up without lock escalation. It was a good time. The end also showed us that the famous Oracle has only half of our favorite features, once we decided to support that database as well.

Well, thinking about all the issues we encountered during development, unpredictable behavior is the feeling that remains after doing all of this with my DB specialist colleague. When we started to care about performance more and more, we had to understand those features, and we usually replaced the default behavior with some specialized one – we could do that because we understood the essentials of the system. This is the way.

A lot of the features described above can be built on top of NoSQL storages on your own. You can easily provide an inverted index in a key/value store even if the database itself does not support such functionality. What you need to do is write error-free code and maintain the indexes yourself. Look at the Twitter example for the famous key/value store Redis. It is not as simple as three or four tables in MySQL with built-in indexing, is it?

There is nothing wrong with SQL databases. I still believe they fit the purpose of 80% of programs, as they are best for difficult querying and/or less-than-massive scaling. But who needs real massive scaling? Everyone?

Why Are We Going to Use NoSQL?

In this article and the whole series, I'm going to evaluate certain attributes of storages for the persistence of binary data of variable length. The goal is to persist network traffic in a binary format.
No relational decomposition, no querying of particular attributes of particular messages, no implicit ACID transactions, no indexes and so on. You can find the reasons in the preface; we just hope we know what we are doing 🙂

Cassandra

The first evaluated storage is Cassandra. It's a famous NoSQL database, and its commercial support provides a certain confidence of fast help and so on.
Cassandra sits somewhere in the middle between schemaless and schema-aware (SQL-like) databases, as it supports a schema and the CQL query language. I was pretty surprised when I saw features like time series with dynamic columns and built-in lists and sets. Another nice thing is the OpsCenter monitoring tool; I always show some screenshots from it when I want to demonstrate an easy-to-use monitor for a distributed app. Pretty nice.

Hardware

Hardware used for the tests:
  • DB cluster:
    • HP ProLiant BL460c Gen8, 32 cores at 2.6 GHz, 32 GB RAM, W12k server
    • HP ProLiant BL460c Gen8, 32 cores at 2.6 GHz, 192 GB RAM, Ubuntu 12 server
  • Test executor:
    • Xeon, 16 cores, 32 GB RAM, w8k server
  • 10 Gbps network
  • Java 7

Performance Measurement of Cassandra Cluster

The setup used one seed and one replica; the Java tests use the DataStax driver. The replication factor is two when not explicitly defined. After some issues:


… my final CQL create script was:


CREATE COLUMNFAMILY recording (
    PARTKEY INT,
    ID uuid,
    VS_ID uuid,
    URL ascii,
    MESSAGE_TIME BIGINT,
    DATA BLOB,
    PRIMARY KEY((PARTKEY, VS_ID), ID)
);



Batch Size

The DataStax driver allows executing queries in batches; how does it scale?

Blob size \ Batch size [TPS] | 128 | 256 | 512 | 1024 | 8196 | 32768
100 bytes (8 clients) | 67 100 | 63 900 | 55 400 | 55 800 | 23 500 | Timeouted
1 kb (4 clients) | 15 600 | 20 600 | 20 300 | 19 400 | 5 500 | N/A
5 kb (1 client) | N/A | 8 300 | 9 200 | 10 000 | 3 000 |

A batch of 256 rows seems to be the best size.
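For context, a batched insert with the DataStax Java driver looks roughly like this; the contact point, keyspace name and generated values are illustrative, not the benchmark code itself.

import java.nio.ByteBuffer;
import java.util.UUID;
import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;

public class CassandraBatchWriter {

    public static void main(String[] args) {
        Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
        Session session = cluster.connect("perf_test");       // keyspace name is illustrative

        PreparedStatement insert = session.prepare(
                "INSERT INTO recording (PARTKEY, VS_ID, ID, URL, MESSAGE_TIME, DATA) VALUES (?, ?, ?, ?, ?, ?)");

        BatchStatement batch = new BatchStatement();
        UUID virtualServiceId = UUID.randomUUID();
        for (int i = 0; i < 256; i++) {                        // the batch size that scored best
            batch.add(insert.bind(
                    i % 8,                                     // partition key spreads rows across nodes
                    virtualServiceId,
                    UUID.randomUUID(),
                    "/some/url",
                    System.currentTimeMillis(),
                    ByteBuffer.wrap(new byte[100])));          // 100-byte blob payload
        }
        session.execute(batch);
        cluster.close();
    }
}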

Number of Connections

The tests use a variable number of connections to the Cassandra cluster. Note that the batch size was 256 and the consistency level was set to ONE.

Blob size \ Connections [TPS] | 1 | 2 | 4 | 8 | 24
100 b | 27 000 | 53 000 | 75 000 | 62 000 | 62 000
1 kb | 17 900 | 31 000 | 42 000 | 40 000 |
5 kb | 9 000 | 14 600 | 16 000 | 10 300 |
20 kb | 3 300 | 4 500 | 3 800 | |

You can see that 2 or 4 connections per client are enough, whilst a single connection slows down the overall performance.

Number of Partitions

The table contains a partition key, see the CQL snippet above. The test generates a variable number of values for the partition key. The batch size was again 256 and the consistency level ONE.

Blob size \ Partitions [TPS] | 1 | 8 | 64
100 b | 58 000 | 71 000 | 70 000
10 kb | 13 200 | 9 100 | 7 300
The partition key determines where the data will reside; it defines the target machine. As two nodes are involved in my tests, it's better to use more than one partition.

Variable Consistency

Consistency tuning is the most beautiful attribute of Cassandra. You can define the consistency level for every single interaction with the Cassandra driver, in any language. There are various levels, see the documentation.

The test used 8 partitions, 8 connections and a batch size of 256.

Blob size \ Consistency [TPS] | ANY | ONE | TWO
100 b | 67 000 | 66 000 | 66 000

Well, I'm surprised. There is almost no difference between the consistency levels for this blob size, despite their totally different purposes and implementations. This could be due to the almost ideal environment: a very fast network and fast machines.

Variable Replicas

I used two replicas by default, but this test tries to reveal the price of the second copy of the data.

Blob size \ Replicas [TPS] | 1 | 2
100 b | 121 000 | 72 000
10 kb | 19 200 | 8 200

There is an apparent penalty for the second replica.

Long Running

The previous tests used a few million inserts/rows, but this one uses hundreds of millions. This is a way to definitely rule out the advantage of various caches and clever algorithms; it simulates a heavyweight load test of the server.

8 partitions, 8 connections, 256 batch size.

Blob size \ Replicas+Consistency [TPS] | 1+ONE | 2+ONE | 2+TWO
100 b (1 billion) | 121 000 | 63 000 | 62 00
10 kb (30 m) | 8 200 | |

Large Messages

The largest message in these tests so far was 20 kb, but what is the behavior when our app encounters a large file as an attachment?
Consistency ONE, 2 partitions, 2 connections, batch size 2.

Blob size | TPS
500 kb | 90
5 mb | 9.5

Occupied Space

Apache Cassandra enables compression automatically since version 1.1.

Blob size | Occupied space per message
100 b | 89 bytes

Here is the structure of the serialized entity:

CREATE COLUMNFAMILY recording (
    PARTKEY INT,
    ID uuid,
    VS_ID uuid,
    URL ascii,
    MESSAGE_TIME bigint,
    DATA blob,
    PRIMARY KEY((PARTKEY, VS_ID), ID)
);

Conclusion

This kind of testing is almost the opposite of the Netflix tests. They analyzed scalability, while we tested a simple cluster with only two machines. We needed to figure out the performance of one node (obviously in a cluster deployment).
I was impressed, as the numbers above are good; the overall network throughput reached a hundred Mbit/s coming out of one client.
What next? The same kinds of tests against Kafka, Redis, Couchbase and Aerospike.