Apache Kafka Presentation for CZJUG

Apache Kafka is a famous technology these days. Although it is almost a traditional messaging system from the user's point of view, it also supports scalability, high throughput, and failover. I've already written an article about it.

The guys from the Czech Java User Group gave me a chance to give a talk about Kafka. Here is a video of the talk (in Czech).

The slides are also published on SlideShare.

Java, Docker, Spring Boot … and signals

I spent the last couple of weeks working on Java apps running in Docker containers deployed on clustered CoreOS machines. Running a Java app in a Docker container is pretty simple: you just choose a base image for your app and write a Dockerfile.

Note that the Docker registry contains many Java distributions, usually based on OpenJDK. We use our internal image for Oracle's Java 8, built on top of something like this Dockerfile. Once you have decided between Oracle and OpenJDK, you can start writing your own Dockerfile.

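FROM dockerfile/java:oracle-java8
# The trailing slash makes Docker copy the jar into the directory
ADD your.jar /opt/your-app/
ADD /dependencies /opt/your-app/dependency
WORKDIR /opt/your-app
# Exec form: every argument is a separate array element
CMD ["java", "-jar", "/opt/your-app/your.jar"]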

However, your app will probably require some parameters, so the last line usually calls a shell script instead. Such a script then validates the number and format of those parameters, among other things. This is also useful during development, because none of us wants to rebuild and restart an image every time something changes and needs testing. So the last line of your Dockerfile usually looks similar to the following snippet:

CMD ["/opt/your-app/start.sh"]

So far so good. Let's say that I've just developed my code and got the integration tests working. Once you are satisfied with your solution, you try something like this:

sudo docker run --name your-app-name -e ENVIRONMENT_PARAM=VALUE … your-image

sudo docker stop your-app-name

It's pretty reasonable to use Spring Boot when your app is based on Spring contexts (thanks, Dagi). You just add a Maven dependency to the root pom:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

Spring Boot eliminates boilerplate code and does a lot of things you would eventually have to do yourself; see the documentation. I based my Main class on CommandLineRunner, and I have to admit that the resulting main method is one of the shortest I have ever written.

A new problem appeared once I tried to stop the application. According to the documentation, my process should receive SIGTERM and then SIGKILL. A nice thing about Spring Boot is the shutdown hook it automatically registers within your application. Unfortunately, the application log obtained via docker logs showed nothing like the documented sequence, in which the process "will receive SIGTERM, and after a grace period, SIGKILL."

Instead, the Docker daemon just killed my app. A short investigation gave me a clue: signals are not propagated to the child processes of a bash script, so the JVM started by the script never saw the SIGTERM. I finally found an interesting article describing the issues with Java, shell scripts, and signals in full detail. Luckily, I only had to slightly change the last line of the shell script to something like:

exec java -jar …

This simple change enables a graceful shutdown: exec replaces the shell process with the JVM, so java receives the SIGTERM directly and can close the running Spring context. This in turn means that all registered AutoCloseable beans are now closed as they should be.
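To see why the SIGTERM matters, here is a small pure-JDK sketch of the mechanism Spring Boot relies on: a JVM shutdown hook that closes AutoCloseable resources. GracefulShutdown is a hypothetical helper, not a Spring class. The JVM runs shutdown hooks on a normal exit and on SIGTERM or SIGINT, but never on SIGKILL, so the hook only fires if the JVM itself receives the SIGTERM, which is exactly what exec makes possible.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper (not part of Spring): resources registered here are closed
// by a JVM shutdown hook. Hooks run on normal exit and on SIGTERM/SIGINT, never on SIGKILL.
class GracefulShutdown {
    private final Deque<AutoCloseable> resources = new ArrayDeque<>();

    // Later registrations are closed first (LIFO).
    synchronized void register(AutoCloseable resource) {
        resources.push(resource);
    }

    // Closes everything registered so far and returns how many closed cleanly;
    // a failing resource is logged and does not stop the others from closing.
    synchronized int closeAll() {
        int closed = 0;
        while (!resources.isEmpty()) {
            AutoCloseable resource = resources.pop();
            try {
                resource.close();
                closed++;
            } catch (Exception e) {
                System.err.println("Failed to close " + resource + ": " + e);
            }
        }
        return closed;
    }

    // docker stop -> SIGTERM delivered to the JVM -> this hook runs closeAll().
    void installHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(this::closeAll, "graceful-shutdown"));
    }
}
```

In an application you would call installHook() once at startup and register() each resource; with Spring Boot, the context's own shutdown hook already does this work for you.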

Playing with an ETCD Cluster in Docker Locally

Last week I started writing a management component. We would like to use CoreOS and its whole stack as much as possible, at least in this early phase of our project.

The core component of our solution is ETCD, a distributed key-value store, somewhat like my favorite piece of software, Redis. The word 'distributed' means that the core state of your whole solution needs to be synchronized, or agreed on by consensus; ETCD uses Raft for that. I'd love to know how my component behaves in a real environment where anything can die.

In the age of Docker, where every piece of software is dockerized, it's pretty simple to start an ETCD cluster locally in a second. The following commands start three etcd instances linked together in one cluster.

docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 --net=host --name etcd0 quay.io/coreos/etcd:v2.0.3 \
 -name etcd0 \
 -advertise-client-urls http://localhost:2379,http://localhost:4001 \
 -listen-client-urls http://localhost:2379,http://localhost:4001 \
 -initial-advertise-peer-urls http://localhost:2380 \
 -listen-peer-urls http://localhost:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4101:4101 -p 2480:2480 -p 2479:2479 --net=host --name etcd1 quay.io/coreos/etcd:v2.0.3 \
 -name etcd1 \
 -advertise-client-urls http://localhost:2479,http://localhost:4101 \
 -listen-client-urls http://localhost:2479,http://localhost:4101 \
 -initial-advertise-peer-urls http://localhost:2480 \
 -listen-peer-urls http://localhost:2480 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4201:4201 -p 2580:2580 -p 2579:2579 --net=host --name etcd2 quay.io/coreos/etcd:v2.0.3 \
 -name etcd2 \
 -advertise-client-urls http://localhost:2579,http://localhost:4201 \
 -listen-client-urls http://localhost:2579,http://localhost:4201 \
 -initial-advertise-peer-urls http://localhost:2580 \
 -listen-peer-urls http://localhost:2580 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

The inspiration is obvious, but this setup simply runs everything on your own computer. The --net=host parameter makes every container share the host's network stack, which provides full transparency from the port and network point of view.
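The three commands differ only in the node name and a port offset of 100 per node, while -initial-cluster must be identical on every node. A hypothetical Java helper (EtcdClusterConfig is not part of any etcd library) can generate the arguments so the three nodes cannot drift apart. It assumes the port convention of the commands above: client ports 2379 and 4001, peer port 2380, each shifted by 100 for the next node.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper generating the etcd arguments used in the docker run commands above.
// Convention taken from those commands: node i shifts every port by 100 * i.
class EtcdClusterConfig {
    static final String HOST = "localhost";
    static final String CLUSTER_TOKEN = "etcd-cluster-1";

    // Node name -> peer port, in insertion order (etcd0, etcd1, ...).
    static Map<String, Integer> peerPorts(int nodes) {
        Map<String, Integer> ports = new LinkedHashMap<>();
        for (int i = 0; i < nodes; i++) {
            ports.put("etcd" + i, 2380 + 100 * i);
        }
        return ports;
    }

    // Value of -initial-cluster; it must be identical on every node.
    static String initialCluster(int nodes) {
        return peerPorts(nodes).entrySet().stream()
                .map(e -> e.getKey() + "=http://" + HOST + ":" + e.getValue())
                .collect(Collectors.joining(","));
    }

    // All etcd arguments for node i (everything after the image name).
    static List<String> flagsFor(int i, int nodes) {
        int clientPort = 2379 + 100 * i;
        int secondClientPort = 4001 + 100 * i;
        int peerPort = 2380 + 100 * i;
        String clientUrls = "http://" + HOST + ":" + clientPort + ",http://" + HOST + ":" + secondClientPort;
        String peerUrl = "http://" + HOST + ":" + peerPort;
        return List.of(
                "-name", "etcd" + i,
                "-advertise-client-urls", clientUrls,
                "-listen-client-urls", clientUrls,
                "-initial-advertise-peer-urls", peerUrl,
                "-listen-peer-urls", peerUrl,
                "-initial-cluster-token", CLUSTER_TOKEN,
                "-initial-cluster", initialCluster(nodes));
    }
}
```

Joining flagsFor(1, 3) with spaces reproduces the arguments of the etcd1 command; adding a fourth node then only means calling it with nodes = 4 for every member.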

You can now open the following URL in a browser:

http://localhost:4101/v2/keys/?recursive=true

It's also a good idea to check all the members of your cluster, since you will be killing them later.

http://localhost:2379/v2/members

Once you have finished your tests, you can easily delete all keys in the XYZ namespace using curl. Note that this works only for your own keys and directories; you can't run the following command against the root namespace itself.

curl -X DELETE 'http://127.0.0.1:2379/v2/keys/XYZ?recursive=true'

I also prefer to see the HTTP status code, since ETCD reports results through HTTP status codes.

curl -v http://127.0.0.1:2379/v2/keys/XYZ

In addition to the status code, an error response carries a JSON body with ETCD's own error codes; see the snippet at the end of the following listing. You can get something similar to:

* Hostname was NOT found in DNS cache
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 2379 (#0)
> GET /v2/keys/XYZ HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:2379
> Accept: */*

< HTTP/1.1 404 Not Found
< Content-Type: application/json
< X-Etcd-Cluster-Id: 65a1e86cb62588c5
< X-Etcd-Index: 6
< Date: Sun, 01 Mar 2015 22:55:14 GMT
< Content-Length: 69

{"errorCode":100,"message":"Key not found","cause":"/XYZ","index":6}
* Connection #0 to host localhost left intact
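The same check can be done from Java. Here is a minimal sketch using the JDK's own java.net.http client (Java 11+), not an etcd client library: it reads a key through the v2 HTTP API and keeps the HTTP status, the X-Etcd-Index header seen in the listing, and the raw JSON body carrying ETCD's errorCode. EtcdKeyProbe and Result are hypothetical names.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Minimal sketch (Java 11+ java.net.http): GET a key via ETCD's v2 HTTP API and keep
// the HTTP status, the X-Etcd-Index header and the raw JSON body with ETCD's errorCode.
class EtcdKeyProbe {
    static final class Result {
        final int status;
        final String etcdIndex; // null if the header is missing
        final String body;

        Result(int status, String etcdIndex, String body) {
            this.status = status;
            this.etcdIndex = etcdIndex;
            this.body = body;
        }
    }

    // HTTP/1.1 is pinned so the client does not attempt an h2c upgrade.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
            .version(HttpClient.Version.HTTP_1_1)
            .build();

    static Result get(String baseUrl, String key) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/v2/keys/" + key))
                .GET()
                .build();
        HttpResponse<String> response = CLIENT.send(request, HttpResponse.BodyHandlers.ofString());
        String index = response.headers().firstValue("X-Etcd-Index").orElse(null);
        return new Result(response.statusCode(), index, response.body());
    }
}
```

Against the local cluster, EtcdKeyProbe.get("http://127.0.0.1:2379", "XYZ") for a missing key should yield status 404 and a body like the one in the listing, so the program can branch on the status code first and only parse the JSON when it needs ETCD's specific error code.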

Once you are done playing with the ETCD cluster, you will probably want to remove all the etcd containers. I use a simple script that removes every Docker container, but you can improve it with grep to remove only those hosting ETCD.

sudo docker rm -f $(sudo docker ps --no-trunc -aq)

The last interesting thing is performance. I was reminded of Redis, which can handle a million transactions per second on a single thread. I was surprised that ETCD usually responded in 20-30 ms. Even worse, I also encountered client timeouts caused by response times of 400-500 ms per request. Raft obviously does not come for free. But the purpose of ETCD is massive read scalability. Well, good to know.

Kryonet: simple but super-fast TCP communication in Java + performance benchmarks

We have been designing a cluster-aware, cloud-based service virtualization solution for some time. It's incredible work, as a distributed system is a whole new dimension of thinking, and of learning as well 🙂

We were deciding whether to split an atomic piece of our domain work across multiple nodes. Such a step would hopefully allow us to route certain messages to certain nodes. We could then keep data in main memory and avoid distributed locking on top of some remote store, probably Redis. This is the usual way to improve the response time of the whole solution, since it avoids a database lookup on every call. Obviously, it requires well-designed replication.

I was really worried about latency. We need a really low response time, a couple of milliseconds at most, under heavy load. I'm talking about the whole solution, which consumes a lot of CPU computing the simulated responses under the hood, so there is no room for any waste.

What is the best achievable latency for such a transmission between two nodes? What is the throughput of one or multiple connections? Besides the theoretical analysis, I also built our own proof of concept to verify our assumptions in reality.

I like the Kryo serialization framework. It's very fast, very simple, and anybody can get started with it in minutes. Fortunately, I discovered that the guys from Esoteric Software had also thought about remote communication and took a very logical step: they developed Kryonet, a framework for TCP communication using Kryo serialization.

I believe nobody will be surprised that it is very fast and super easy to use, just like Kryo itself.

There are benchmarks showing how fast TCP communication can be, on the order of microseconds. But what is the real response time when I need to transfer a domain object to another node over a gigabit network in a real environment? I mean not a laboratory setup, but the real one where our app will probably run in a couple of months.

In short: hundreds of microseconds.

Kryonet via Kryo

It is built on top of Java NIO and Kryo serialization, and responses are received asynchronously.
It's best to read the manual on GitHub, as the code for sending and receiving is very simple.

Kryo keeps serializers as separate classes that are registered with it later, so you do not have to modify your domain classes. Obviously, though, you have to open those classes up so their fields can be set from outside, or you can use reflection instead.

The response is received asynchronously with respect to the processing of the original request. Kryonet exposes a TCP port on the server side and uses TCP under the hood, but don't worry: you hardly ever touch the complexity of TCP messaging.

What the Numbers Mean and the Setup

What did my message look like? It contains an identifier, a UUID, and a map of elements. The elements carry the data, but what matters here is the overall size of the message.

Response time is the whole round-trip:

  1. Serialization on the client side
  2. Sending the request to the server
  3. Deserialization of the message on the server side
  4. Sending the message identifier back to the client
  5. Deserialization of the received message on the client side

In other words, the response time is exactly the number you can expect when you push a message to a remote component and wait for the acknowledgment (ACK).
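Because the ACK arrives asynchronously, often on a different thread than the sender, measuring this round trip means correlating each ACK with its request. Here is a minimal sketch, assuming the ACK echoes the message's UUID; RoundTripTracker is a hypothetical helper, not Kryonet API, and would be called from your send code and from the receive listener.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: correlates asynchronous ACKs with their requests to measure
// the full round trip (steps 1-5 above). Assumes the ACK echoes the message UUID.
class RoundTripTracker {
    // Send timestamps of messages that have not been acknowledged yet.
    private final Map<UUID, Long> sentAtNanos = new ConcurrentHashMap<>();

    // Call just before the message is serialized and sent (step 1).
    void onSend(UUID messageId) {
        sentAtNanos.put(messageId, System.nanoTime());
    }

    // Call from the receive callback once the ACK is deserialized (after step 5).
    // Returns the round-trip time in microseconds, or -1 for an unknown or duplicate ACK.
    long onAck(UUID messageId) {
        Long sentAt = sentAtNanos.remove(messageId);
        if (sentAt == null) {
            return -1;
        }
        return TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - sentAt);
    }

    // Number of messages still waiting for an ACK.
    int inFlight() {
        return sentAtNanos.size();
    }
}
```

System.nanoTime() is used rather than System.currentTimeMillis() because it is monotonic and fine-grained enough for microsecond-level round trips; a ConcurrentHashMap keeps the send and receive threads from stepping on each other.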

Both client and server are HP ProLiant servers (24 and 32 cores) connected by a >1 Gbit/s network.

Performance Benchmarks

One TCP Connection

First of all, I used one TCP connection.

Message size [bytes] / elements | Average RT [µs] | Median RT [µs] | Buffer size | Throughput [msgs/s] | Rough load [msgs/s]
48 / 2 | 190 | 160 | 256 B | 5400 | 100k
48 / 2 | 202 | 170 | 16 kB | 5400 | 100k
48 / 2 | 672 | 157 | 1 kB | 18300 | 1M
48 / 2 | 418 | 130 | 16 kB | 19300 | 4M
3216 / 200 | 316 | 313 | 16 kB | 650 | 1k
3216 / 200 | 332 | 331 | 256 B | - | 1k

Message size: the size of the message serialized into a raw byte[].

Average vs. median: since my test kept every response time value, analyzing them was very interesting. The average reflects all edge cases, mostly the spikes caused by buffer flushing and the like, but the median does not. What does that mean in practice? A growing difference between the average and the median indicates a growing number of slow responses.

Buffer: Kryonet uses a serialization buffer and an object buffer; see the documentation.

Rough load: all calls are asynchronous, which means I had to limit the load produced by my generator. I chose to call the standard Java Thread.sleep after a certain number of messages. Millisecond-level sleeping is not very accurate, but it was fine for roughly limiting the input load. 4M means that the generator thread sleeps for 1 millisecond after every four thousand messages, i.e. at most 4,000 messages per millisecond, or a nominal 4M messages per second.
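The limiter described above boils down to a loop like the following sketch. RoughLoadGenerator and its sender callback are hypothetical stand-ins for the original generator, which may differ in detail; the sender would wrap the actual Kryonet send call.

```java
import java.util.function.LongConsumer;

// Sketch of the "rough load" limiter: the generator thread sleeps 1 ms after every
// batchSize messages. batchSize = 4000 gives a nominal ceiling of 4000 msgs/ms = 4M msgs/s;
// actual rates are lower because sleep(1) often overshoots and sending takes time too.
class RoughLoadGenerator {
    private final int batchSize;

    RoughLoadGenerator(int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
    }

    // Nominal upper bound in messages per second (batchSize messages per millisecond).
    long nominalRatePerSecond() {
        return batchSize * 1000L;
    }

    // Hands `total` message numbers to `sender` and returns how many times it slept.
    int run(long total, LongConsumer sender) throws InterruptedException {
        int sleeps = 0;
        for (long i = 1; i <= total; i++) {
            sender.accept(i);
            if (i % batchSize == 0) {
                Thread.sleep(1); // coarse: millisecond sleeps are not precise
                sleeps++;
            }
        }
        return sleeps;
    }
}
```

By the same arithmetic, a rough load of 100k corresponds to a batch of 100 messages per millisecond and 1M to a batch of 1000.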

The discussion:

  • a message with 100 times more elements (3216 vs. 48 bytes) has a response time only about two times slower
  • buffer size makes almost no difference to the response time
  • increasing the load keeps the median almost the same but greatly increases the average response time; the response time is not stable at all, so you can't rely on it under massive load
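The median-versus-average effect is easy to reproduce. Here is a minimal sketch of how the two numbers in the tables can be computed from the recorded response times; RtStats is a hypothetical name, not taken from the original test code.

```java
import java.util.Arrays;

// Summaries used in the tables: the mean is dragged up by a few slow outliers
// (e.g. buffer flushes), while the median is not.
class RtStats {
    static double average(long[] rtsMicros) {
        return Arrays.stream(rtsMicros).average().orElse(Double.NaN);
    }

    static double median(long[] rtsMicros) {
        if (rtsMicros.length == 0) {
            return Double.NaN;
        }
        long[] sorted = rtsMicros.clone(); // keep the caller's array untouched
        Arrays.sort(sorted);
        int mid = sorted.length / 2;
        return sorted.length % 2 == 1
                ? sorted[mid]
                : (sorted[mid - 1] + sorted[mid]) / 2.0;
    }
}
```

With made-up values {150, 160, 160, 170, 5000} µs, average() returns 1128.0 while median() returns 160.0: a single slow response is enough to pull the average far away from the median.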

Multiple Client Connections to One TCP Server Port

Having multiple clients share one TCP port usually scales well, but not with Kryonet, as it uses a limited buffer size; see the following table. I used 10 concurrent clients for the measurement.

Message size [bytes] / elements | Average RT [µs] | Median RT [µs] | Buffer size (client/server) | Throughput [msgs/s] | Rough load [msgs/s]
48 / 2 | 256 | 254 | 16k/16k | 9000 | 1k
48 / 2 | 580 | 535 | 16k/160k | 42000 | 5k
48 / 2 | 33000 | 23000 | 16k/300k | 80000 | 10k

The discussion:

  • Kryo itself is very fast, but single-threaded deserialization imposes certain limits, so the huge throughput was achieved at the cost of really high latency
  • increasing the server's buffer size allows it to serve many more requests, as they simply land in the buffer instead of being rejected as they were with smaller buffers

Multiple Connections to Multiple Ports

The last setup uses multiple clients connected to multiple server ports. As expected, it brought the best performance.

Message size [bytes] / elements | Connections | Average RT [µs] | Median RT [µs] | Buffer size (client/server) | Throughput [msgs/s] | Rough load [msgs/s]
48 / 2 | 10 | 542 | 486 | 256/256 | 54000 | 100k
48 / 2 | 10 | 5000 | 518 | 16k/16k | 155000 | 500k
48 / 2 | 20 | 2008 | 1423 | 16k/32k | 210000 | 500k
48 / 2 | 50 | 1460 | 404 | 16k/32k | 182000 | 100k
48 / 2 | 50 | 2024 | 378 | 256k/256k | 220000 | 100m
48 / 2 | 100 | 3120 | 404 | 256k/256k | 231000 | 100m
3216 / 200 | 10 | 405 | 368 | 8k/32k | 21000 | 3k
3216 / 200 | 10 | 415 | 353 | 16k/32k | 21000 | 3k
3216 / 200 | 20 | 399 | 350 | 16k/32k | 32300 | 2k
3216 / 200 | 50 | 380 | 327 | 16k/32k | 43700 | 1k
3216 / 200 | 50 | 410 | 344 | 64k/128k | 78000 | 2k

The discussion:

  • the throughput with 50 and 100 clients is incredible, and the response times are not as bad as I originally expected
  • as discussed above, there is again a huge difference between median and average RT
  • there is no big difference in throughput between 20 and 100 clients, only in average and median RT

Resources

Usually, transporting a message from one node to another is not your business case. What matters is that both client and server are still able to run their core logic; processing the received messages is always the core.
One client barely burdens the server; see the htop screenshot for the single-connection run with the best throughput:
On the other hand, 10 concurrent clients on one server-side port produce almost the same load, even though the number of transported messages is 5 times higher:
As you can see, 50 concurrent clients and ports is more of a theoretical setup for this measurement than a usable solution, as the server does nothing other than deserialize the incoming messages 🙂

Conclusion

It's fast and easy. Almost 20k small messages transmitted per second over one client connection is an incredible number for me. On the other hand, a response time of 150-200 microseconds surprised me as well.
The only thing to watch out for is sizing all the buffers carefully. There is no queuing; a message can simply be rejected when the connection is under heavy load.