Http and TCP Load Balancing with Kubernetes

Kubernetes allows to manage large clusters which are composed of docker containers. And where is large computation power there is large amount of data throughput. However, there is still a need to spread the data throughput so it can reach and utilize particular docker container(s). This is called load balancingKubernetes supports two basic forms of load balancing. Internal among docker containers themselves and external for clients which call you application from outside environment – just users.

Internal Load Balancing with Kubernetes

Usual approach during the modeling of an application in kubernetes is to provide domain models for pods, replications controllers and services. Look at the great documentation if you are not aware of all these principles.

For simplification, pod is a set of docker containers which are always located on the same node. Replication controller allows and guarantees scalability. The last but not least is the service which hides all instances of pods – created by replication controller – behind a facade.

The concept of services brings very powerful system of internal load balancing.

Look at the picture above. Service A uses Service B. Common pattern is to provide some service discovery so some service A can look up some instance of service B’ endpoint and use it for later communication. However, service B has it’s own lifecycle and can fail forever or service B can have multiple running instance. Thus, a collection of instances of service B is changing during the time.

You need some boilerplate code to handle the algorithm describe above. Ohh, wait. Kubernetes already contains similar system.

Let say, service B has 3 pods – running docker containers. Kubernetes allows you to introduce one instance of service B to hide these three pods. In advance, it gives you one virtual IP address. Service A than can use this virtual IP address and kubernetes proxying system does the routing to a specific pod. Cool isn’t it?

The routing supports sticky session or round robin algorithm. It depends. The last thing which is up to you is to have clever client for your service, e.g. implemented with cooperation of hystrix, so it does re-trying or handle timeouts correctly.

External Load Balancing using Kubernetes

This type of load balancing is about routing the traffic from source clients to target proper pods. You have multiple options here.

Buy AWS/GCE Load Balancer

The load balancing and the networking itself is difficult system. Sharing public IPs, protocols to resolve hostnames to IPs, to resolve IPs to mac addresses, HA of LB and so on. It makes sense to buy load balancer as a service if you deploy your app in some well known cloud provider.

Thus, the services support load balancing type. Kubernetes then uses cloud provider specific API to configure the load balancer.

This is the simplest way. But what are your other options if you are not so lucky and you can’t do that? You have to deploy your own load balancer. Let’s review existing load balancers which can be deployed on premise.

Kube-proxy as external load balancer

I’ve already described the mechanism how kubernetes supports internal load balancing. Yeah! Is it possible to use it for external load balancing? Yes but there are some limitations.

Kube-proxy does exactly what we want but it’s very simple tcp load balancer – L4. I’ve detected some failures during some performance testing from external clients. The proxy just sometimes refused the connection or did not know target address of particular host and so on. However, there is no way how to configure kube-proxy, e.g. to setup some timeouts. Or there is also no way – as it’s TCP load balancer – how to do re-tries on http layer.

Kube-proxy also suffers by performance issues but Kubernetes 1.1 supports native IP tables so huge progress has been made during the recent time.

These all are reasons why I recommended to have clever service client within the internal load balancing section at the beginning. So I would still recommend to use kube-proxy as an internal load balancer but use some solid external load balancer implementation for external load balancing.

Kubernetes Ingress

New version 1.1 of the system introduced ingress feature. However, it’s still beta.

Ingress is a new version of kubernetes resource. It’s API which allows you to define certain rules where to send a traffic. For example you can re-route a traffic from path http://host/example to a service called example. It’s pretty simple.

Ingress itself is not load balancer. It’s just a domain model and a tool to describe the problem. Thus there must be a service which absorbs this model and which does the load balancing but Ingress just helps you as a abstraction. Kubernetes team already developed ha-proxy implementation for this load balancing service.

Service Load Balancer

There is a project which exist from earlier kubernetes times called service load balancer. It’s under development for a year. It’s probably predecessor of Ingress. The load balancer also uses two levels of abstraction and any existing load balancer can stand as an implementation, ha-proxy is the only current implementation.

Vulcand

Vulcand is a new load balancer. It’s technology is compatible as the base configuration is stored within etcd – this allows to utilize watch command. Note that vulcand is http-only load balancer, tcp belongs among requested features. It’s also not integrated with kubernetes but here is a simple tutorial.

Standalone HAProxy

In fact, I have finally ended with developing of my own load balancer component which is based on existing system. It’s pretty easy.

There is a kubernetes java client from fabric8, you can use it to watch the kubernetes API and it’s then up to you which existing load balancer you want to leverage and which kind of config file you will generate. However, as it’s simple there are already some projects which do the same:

Standalone Nginx

Nginx is similar technology to HAProxy so it’s easy to develop a component to configure Nginx load balancer as well. Nginx plus seems to support directly kubernetes but it’s a service.

Validating nginx config file using docker approach

I try to setup nginx as a load balancer. The configuration is just a file with a lot of constrains so I need a validation. There is no online validation service, as e.g. CoreOS has, and I don’t want to install nginx on my laptop as I work on a distributed app.

Docker is right approach for me. Let say I have following config:

https://gist.github.com/MartinPodval/851b574da259abb33163.js
In short, I’m going to pass nginx config to running nginx instance and look to the logs.

Put you nginx.config to the temp and start the docker image:

sudo docker run –name nginx -v /tmp/nginx.config:/etc/nginx/nginx.conf:ro -d nginx

It uses volume mapping so the command just starts a new docker container and mounts a local /tmp/nginx.config to the given in-container path. You can obviously change the volume path to your personal path. Is it working or not? Look at logs.

sudo docker logs nginx

If there is no entry, your file is fine. In the case of an error, you can see something like this:

2016/01/08 11:37:31 [emerg] 1#1: unexpected “}” in /etc/nginx/nginx.conf:44
nginx: [emerg] unexpected “}” in /etc/nginx/nginx.conf:44

Note that if you want to validate the file again just stop, remove and then start container again, e.g.:

sudo docker stop nginx
sudo docker rm nginx

Apache Kafka Presentation for CZJUG

Apache Kafka is famous technology these days. Being almost traditional messaging system from user point of view, it also supports scalability, high throughput and failover as well. I’ve already wrote an article.

Guys from Czech Java User Group gave me a chance to had a talk about Kafka. Here is a video from the talk in czech language.

Slides are also published on slideshare.

Designing Key/Value Repository API with Java Optional

I spent some time last month by defining our repository API. Repository is commonly component used by service layer in your application to persist the data. In the time of polyglot persistence, we use this repository design discussed in this article to persist business domain model – designed according to (our experience with) domain driven design.

Lessons Learned

We have large experience since we used nhibernate as a persistent framework in earlier product version. First, and naive, idea consist in allowing the programmers to write queries to the database on his own. Unfortunately the idea failed soon. This scenario heavily relied on a belief that every programmer knows how persistence/database work and s/he wants to write those queries effectively. It inevitably inflicted error-prone and inefficient queries. Essentially, nobody was responsible for the repositories because everyone contributed to them. Persistence components was just a framework.

The whole experience implies to design very strong and highly review-able API designed by technology-aware engineers. Usually with strong commitment to all dependent layers.

Technical Implications Affects the API

The API must obviously reflect functional requirements. They are what we want the repository to do. According to our experience, such API must also reflect technical and implementation implications. Basically, the design without knowing if the implementation will use SQL database or NoSQL Key/Value store or what are boundaries of domain aggregates will result to not efficient implementation.

To provide more realistic example, lets talk about address, consider it as an aggregate. The repository usually provides CRUD methods for the address. But what if there is a functional requirement to return only address’ street? Should the API contain such method, e.g. get street by address id?

It depends on technical implementation:

  1. What is typical maximal size of serialized address, e.g. resulting json? Does it fit to one tcp packet traveling through network or does it fit to one read operation from hard drive on the storage node? I mean: does even make any sense to fetch partial entity contrary to full entity?
  2. How often is the street read and/or write? Read/write ratio.
    1. Is it better to duplicate the data – to store street separately and within the full json – as it’s often read?
    2. Is it better to store the whole address together because of often updates outnumbering the reading?

Let say you will ignore these questions and provide all methods required from user points of view. You just allow to fetch street and address in two different methods. Let say there is also functional requirement to fetch zip code from the address. Developers who are not familiar with repository internals will typically use the method to fetch street followed by the fetch of zip code on the next line. That’s because it’s natural thinking: to compose methods on API. However, this is obviously inefficient because of two remote calls to the storage.

If you answer similar questions you can easily make the decision that the only reasonable implementation is to provide getAddress only – to return the whole address aggregate. All developers now have no other chance that to use this method and use address as a whole.

You just define the repository API in most efficient way, you just tell developers how to use underlying persistence.

Implemenation

Once we know what kind of methods to place on repository API, there are some implementation constraints it worth to mention.

Repository is not a Map

… so do not try to express CRUD methods like some remote (hash)map

Every programmer, and probably man himself, starves for patterns and solves problems according to his or her past/current experience. CRUD using key/value store sounds like an application of a map. This idea almost implies the repository interface can probably reflect map interface for both method arguments and returning types.

However, there are certain circumstances, you need keep in mind.

1. Error States

In-memory map just CRUD’s or not. In case of GET, there is a key or not. Repository on the other hand does remote calls using unreliable network to unreliable (set of) node(s). Therefore there is a broad range of potential issues you can meet.

2. Degraded Access

Look at Maps’ DELETE. The remove method returns an entity being removed. Well, in case of map, it’s just fine. On the other hand, it seems like overhead in case of repository considering slow network access. I’m not saying anything about stuff like consensus or QUORUM evaluation. It’s not cheap. I’ve also doubts whether someone would use this returning value. He just needs to remove an entity via identifier.

Excluding simple in-memory implementations, the repository methods usually perform one or more remote calls. Contrary to local in-memory calls, those remotes use slow network system under the hood. What is the implication? Considering GET method, there are other states than a key does exist/not-exist. Or, returning current value in the case of REMOVE a key can take a time.

Optimistic Locking

Basically, every our entity contains long version used for CAS-like operation – optimistic locking. Contention thus happens on storage system itself. It’s up to the system or up to the query how and what to do. Especially in distributed system this is kind of problem you do not want to solve.

Most of NoSQL storages use light approach usually called compare-and-set. Redis itself supports non-blocking transactions via MULTI, EXEC and WATCH primitives. Cassandra uses different approach built in query language support.

Java Optional as Returning Type

We have eventually decided to use use java’s Optional so our API does not return any null. However, there is one exception in method with tree-state resulting type. Here is nice discussion on stackoverflow regarding where to use and where do not to use this syntax.

However, the implementation later approved this idea as a good approach. The point here is that everyone who use a method with Optional returning type is much more aware of null state, or Optional.Empty for record. I found out during the refactoring that 40% of code which used previous repository version (in memory) did not handle null as valid returning type.

Generic Repository API Interface

We eventually ended up with following API.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
/**
* @throws timeout exception
* @throws generic persistence exception
*/
interface Repository<T> {

/**
* @throws if entity already exists
*/
 void add(T entity);
    /**
* @return {@link java.util.Optional} when found, {@link java.util.Optional#EMPTY} otherwise
*/
Optional<T> get(Id id);

/**
* @return {@link java.util.Optional} when found and persisted version is different given version,
* {@link java.util.Optional#EMPTY} otherwise if there is no persisted entity for given version,
* null when found and persisted version is same as provided one
*/
Optional<T> getIfNotMatched(Id id, long version);

boolean exist(Id id);

/**
* Persist given entity and increment it's version when succeeded
* @throws stale entity exception when given entity' version is different than persisted one
* @throws if entity not exist
*/
void update(T entity);

/**
* Deletes the whole entity hierarchy including all children
* @throws stale entity exception when given entity' version is different than persisted one
* @throws if entity not exist
*/
void delete(Id id, long version);
}

ETCD: POST vs. PUT understanding

ETCD is distributed key value store used as a core component in CoreOS. I’ve already send a post earlier this week. Here is a page describing how to use ETCD basic commands = ETCD API. Code snippets placed in a page mostly use put, but ETCD allows to use post as well. Most of us understand differences between those two commands in a notion of a REST(ful) service, but how does it work in key value store?

POST

Example over many words.

curl -v http://127.0.0.1:2379/v2/keys/test -XPOST -D value=”some value”

curl -v http://127.0.0.1:2379/v2/keys/test -XPOST -D value=”some value”
Two same command result into following content:

1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
{
  "action": "get",
  "node": {
    "key": "/test",
    "dir": true,
    "nodes": [
      {
        "key": "/test/194",
        "value": "",
        "modifiedIndex": 194,
        "createdIndex": 194
      },
      {
        "key": "/test/195",
        "value": "",
        "modifiedIndex": 195,
        "createdIndex": 195
      }
    ],
    "modifiedIndex": 194,
    "createdIndex": 194
  }
}
So ETCD adds an index value and put it into resulting key – which is also path to the value. For instance:

curl -v http://127.0.0.1:2379/v2/keys/test/194 -XGET

Allows you to get the specific key. The index is explicitly expressed in the url.

PUT

Put command just add or update given key. Let say I would use following example:
curl -v http://127.0.0.1:2379/v2/keys/test -XPUT -D value=”some value”
Resulting content on test key is expected.

1
2
3
4
5
6
7
8
9
{
  "action": "get",
  "node": {
    "key": "/test",
    "value": "",
    "modifiedIndex": 198,
    "createdIndex": 198
  }
}

How to Model Add and Update Method?

My current task is to model and implement repository using ETCD under the hood. Usual repository contains CRUD methods for particular set of entities. Reasonable approach is to separate add from update to do not replace existing object, e.g. when using optimistic locking.

I don’t want to see revision – index – numbers within keys so post command is not useful here. ETCD brings prevExist parameter for this use cases.

I want to perform add method which expect that there is no content on given key. I’ll use following statement:

curl -v http://127.0.0.1:2379/v2/keys/test?prevExist=false -XPUT -D value=”some value”

When you did not delete the key, as I did not, you can get following error:

1
2
3
4
5
6
{
  "errorCode": 105,
  "message": "Key already exists",
  "cause": "/test",
  "index": 198
}

On the other hand, use false to express update existing entity.

curl -v http://127.0.0.1:2379/v2/keys/test?prevExist=true -XPUT -D value=”some value”

This command results into positive response.
< HTTP/1.1 200 OK

The repository uses put for both add and update methods but value for prevExist is the difference.

Playing with ETCD cluster in Docker on Local

I’ve started to write some management component last week. We would like to utilize CoreOs with the whole stack, as much as possible, at least within such early phase of our project.

The core component of our solution is ETCD – distributed key value store. Something like my favorite piece of software – Redis. Word ‘distributed’ means that the core of all things within your solution needs to be synchronized or ‘consensused’. ETCD uses Raft. I’d love to know how my desired component works in real environment where everything can die.

In the age of docker – where every piece of software is docker-ized, it’s pretty simple to start ETCD cluster on local in a second. Following piece of code starts three etcd instances linked together in one cluster.

docker run -d -p 4001:4001 -p 2380:2380 -p 2379:2379 –net=host –name etcd0 quay.io/coreos/etcd:v2.0.3 \
 -name etcd0 \
 -advertise-client-urls http://localhost:2379,http://localhost:4001 \
 -listen-client-urls http://localhost:2379,http://localhost:4001 \
 -initial-advertise-peer-urls http://localhost:2380 \
 -listen-peer-urls http://localhost:2380 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4101:4101 -p 2480:2480 -p 2479:2479 –net=host –name etcd1 quay.io/coreos/etcd:v2.0.3 \
 -name etcd1 \
 -advertise-client-urls http://localhost:2479,http://localhost:4101 \
 -listen-client-urls http://localhost:2479,http://localhost:4101 \
 -initial-advertise-peer-urls http://localhost:2480 \
 -listen-peer-urls http://localhost:2480 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

docker run -d -p 4201:4201 -p 2580:2580 -p 2579:2579 –net=host –name etcd2 quay.io/coreos/etcd:v2.0.3 \
 -name etcd2 \
 -advertise-client-urls http://localhost:2579,http://localhost:4201 \
 -listen-client-urls http://localhost:2579,http://localhost:4201 \
 -initial-advertise-peer-urls http://localhost:2580 \
 -listen-peer-urls http://localhost:2580 \
 -initial-cluster-token etcd-cluster-1 \
 -initial-cluster etcd0=http://localhost:2380,etcd1=http://localhost:2480,etcd2=http://localhost:2580

The inspiration is obvious, but this stuff simply runs everything on your computer.  Parameter –net=host provides full transparency from port&network point of view.

You can now use following URL in a browser:

http://localhost:4101/v2/keys/?recursive=true

Good thing is also to check all members of your cluster. You will kill them later.

http://localhost:2379/v2/members

You can easily delete all keys in XYZ namespace using curl once you did you tests. Note that you can delete only one of your keys so you can’t perform following command on your root namespace.

curl http://127.0.0.1:2379/v2/keys/XYZ?recursive=true -XDELETE

I also prefer to see http status code as ETCD uses http status codes.

curl -v http://127.0.0.1:2379/v2/keys/XYZ

In advance to status codes, it always returns a json with their own errors codes. See a snippet at the end of the following listing. You can get something similar to:

* Hostname was NOT found in DNS cache
*   Trying 127.0.0.1…
* Connected to localhost (127.0.0.1) port 2379 (#0)
> GET /v2/keys/XYZ HTTP/1.1
> User-Agent: curl/7.35.0
> Host: localhost:2379
> Accept: */*

< HTTP/1.1 404 Not Found
< Content-Type: application/json
< X-Etcd-Cluster-Id: 65a1e86cb62588c5
< X-Etcd-Index: 6
< Date: Sun, 01 Mar 2015 22:55:14 GMT
< Content-Length: 69

{“errorCode”:100,”message”:”Key not found”,”cause”:”/XYZ”,”index”:6}
* Connection #0 to host localhost left intact

At the end of playing with ETCD cluster, you will probably want to remove all etcd’s containers. I use simple script which removes every docker container, but you can improve it using grep to remove only those hosting ETCD.

sudo docker rm -f `docker ps –no-trunc -aq`

The last interesting thing is the performance. I’ve reminded Redis which can handle one million of transactions per second using one thread. I was surprised when ETCD responded usually in 20-30ms. Much worse fact is that I’ve also encountered client timeouts because of 400-500ms RT per request. Raft is obviously not for free. But the purpose of ETCD is massive reading scalability. Well, good to know.

Performance Battle of NoSQL blob storages #3: Redis

We have already measured performance stats for Apache Cassandra and Apache Kafka as well. To include Redis within a comparison of persistent storages could see like some misunderstanding at first sight. On the other hand, there are certain use-cases allowing us to think about to store data in main memory, especially in private data centers. Primarily once your cluster includes a machine having almost equal size of hard drive and RAM 🙂

Redis is enterprise, or advanced, key-value store with optional persistence. There are couple of reasons why everyone loves Redis. Why I do?

1. It’s pretty simple.

Following command can install redis server on ubuntu. That’s all.

apt-get install redis-server
2. It’s incredible fast. Look at following tables. One million remote operations per second.
3. It supports large set of commands. More than some kind of database, it’s rather enterprise remote-aware hash-map, hash-set, sorted-list or pub/sub channel solution supporting TTL or Lua script environment among others. See all commands.
4. It’s optimized to use low computer resources, cpu and ram. Despite the redis server is single thread app it can achieve such great performance.
I’ve already started to talk about purposes at the beginning. We have primarily targeted two points. First, we can use Redis within super-fast deployment of our app when latency matters.
Secondly, I wanted to compare in-memory and persistent stores. Does it really worth to think about such in-memory solution?

Setup

I used following setup:
  • Redis server 2.2.6: HP Proliant BL460c gen 8, 32core 2.6 GHZ, 192GB RAM, ubuntu 12 server
  • Tests executor: xeon, 16 cores, 32gb ram, w8k server
  • 10Gbps network
  • jedis java client
  • kryo binary serialization
As Redis cluster feature was in development in the time of measuring these numbers, I used only one machine. 32 cores were really overestimated as Redis used one plus one core indeed.

Performance Measurement of Redis

Batch Size

Appending using LPUSH to some eight different keys.

Blob size \ Batch Size [TPS] 128 256 1024 32768
100b 570k 570k 557k 600k
20kb 38k 40k 35k 33k
As main memory is touched only, it’s all about network transmission. It’s almost same for all sizes of batches.

Variable Connections

Utilizing LPUSH again to append to different number of keys. Every keys is accessed using different connection.

Blob size \ Connections [TPS] 1 2 4 8 32 128
100b 446 750k 646k 560k 960k 998k
20kb 9.2k 16.8k 20.8k 34k 35k 52k
Ohhh. One million inserted messages per second to one Redis instance. Incredible. Java client uses NIO so this is the answer why it somehow scales with much more tcp connections. Increasing number of network pipes where a client can push the data enables better throughput.

Occupied Memory

Blob within a list.
Blob size Bytes per Message
100b 152
20kb 19kb
There is some build-in compression which appeared within large message test.

Long Running

The goal of this test is to fill the main memory (192GB) with one redis instance to find out if there are some scalability limitations.
Blob size TPS
100b 842k
20kb 18.2k
Redis fill the main memory with blob messages till OOM. The shape of this progress within the time is almost flat.

Persistence

Even if Redis stores the data in the main memory, there is a way how to persist the data.

Blob size \ [TPS] With EOF (every second) With AOF (always) Without
100b 800k 330k 960k

Redis forks a new thread for I/O. Numbers are almost same if the data are persisted within one second frames. TPS goes significantly down when Redis writes every updated key to the drive but this mode ensures best durability.

Large Messages

How the performance is affected when message size increases to tens of mbytes?

Blob size TPS
500 kb 1418
5 mb 85
100 mb 1,2

Conclusion

The numbers are impressive. One-thread app successfully process almost one million of small messages in memory.

Redis performance is incredible. We could expect certain limitations because of one-threaded design which causes “serializable” behavior. Maybe this lockless implementation is the reason why redis server can handle such great throughput.

On the other hand, the right comparison against other competitors uses redis persistence feature. The performance is much more worse. Three times. Well, the persistence requirement can be the decision maker.

I’ve already mentioned great command set. You can probably model almost any behavior. Even if Redis primarily targets caches, there are commands allowing to calculate various stats, hold unique sets etc. The script made from these commands is powerful and very fast.

Everything is always about performance and features. Redis has both of them 🙂