Saturday, October 25, 2014

Performance Battle of NoSQL blob storages #2: Apache Kafka

The first article of this series brought scaling factors for blob-based content on Apache Cassandra. It's well know piece of software, requiring full installation on nodes, management application and so on. You also need to tune various configs to achieve best performance results. I've spend nice time playing with yamls on ubuntu :-)

The configuration is sometimes tricky. I was little bit confused once or twice so I planned to hire Cassandra guru to our team as issues I encountered seems really complicated :-)

Well, we do not need much functionality in HP Service Virtualization. The core is to replicate messages to achieve reliability. The next steps is to process them. Yes, the last part is to aggregate the results. Sounds exactly like map and reduce ala hadoop. Evaluation of Apache Storm or Apache Samza are different stories, but they allow me to find Apache Kafka.

Kafka is pretty nice software, much more simpler comparing to Cassandra how I've described it above. The only operation to use it is to depend on kafka jars in your pom files. That's it! Maven downloads couple of dependent jar files and your environment is (almost) ready.

As you can see below, Kafka is incredibly fast, much more faster than Cassandra. Last year I read some article. An author labels Redis as most incredible software he had met so far. I agreed. Now there are two candidates for this label :-)

It was also very beneficial to read their technical documentation for me as for technical engineer. The guys did performance research how to store data on hard drives and they also employed founded approaches in Kafka implementation. Their documentation contains interesting technical papers, like e.g. lmax pdf does with disruptor.

Setup

The setup was identical to the described in first article.

Performance Measurement of Kafka Cluster

Batch Size

  • 8 topics
  • two partitions per one topic
Blob size \ Batch Size [TPS]128256512102432768
100 b392k432k433k495k396k
20 kb9k--------

There is little difference between various batch sizes so you can tune the value according your needs. Note that overall throughput is incredible: raw 400 mbits/s.



Number of Connections

  • batch size is 512

Blob size \ Connections [TPS per connection]12481632
100 b84k74k62k58k37k17k
20 kb3.2k2.5k1.7k------

You can see that number of connections significantly increases the throughput of Kafka during writes. Two connections handles 150k but, 8 ones allows 464k messages per all connections.

Number of Partitions

  • 8 connections

Blob size \ Partitions [TPS]12481664
100 b535k457k493k284k--

Two partitions brought little difference in the overall score. There is same approach or pattern like in hyper threading.

Long Running


The goal of this test is to verify the behavior once the cluster is under the continual heavy load for couple of minutes - as the underlying storage goes out of space (150GB). As Kafka is really fast, such space lasts couple of minutes.

Blob size \ [TPS]
100b490k

The test generates 150GB of data successfully stored by Kafka. The throughput result is almost the same as in short test.

Replication


This is crucial verification how is Kafka affected by the replication.

Blob size \ Replicas [TPS]12
100 b577k536k

There is difference less than 10% when two nodes are involved in the replication. Great! Both nodes handles 400mbits throughput.

Large Messages


Previous tests use relatively small messages, how does it behave with larger message?

Blob size \ [TPS]
100b640k
20 kb6k
500kb314

The best throughput achieved last - large - message. Even for so large entity, it handles unbelievable 1.2Gbits/s. Note that all of this is a remote communication, we have 10gbps network.

Occupied Space


As the kafka stores byte array, the occupied space for this system depends on the serialization framework. I used famous kryo.

Blob size \ [Occupied space in bytes per message]
100b183b

Here is the structure of serialized entity.
class Message {
    private UUID messageId;
    private UUID virtualServiceId;
    private String targetUrlSuffix;
    private int responseStatusCode;
    private long time;
    private byte[] data;
}


Conclusion


Kafka surprised me a lot. It's performance is incredible. The installation is just the dependency on a jar file, the configuration is very easy. The API is really simple, It's up to you what kind and form of the content you prefer.

The only drawback is that this piece of software is pretty young. In the time this article was being written, beta version 0.8 is out. For example, async API is now in a proposal only.

On the other hand, there is large set of various articles, videos and other materials, how one used it in his project, especially along with Apache Storm.

Well,  if you want to use messaging within your new solution you should definitely look at Apache Kafka.

Wednesday, July 30, 2014

Performance Battle of NoSQL blob storages #1: Cassandra

Preface

We spend last five years on HP Service Virtualization using MsSQL database. Non-clustered server. Our app utilizes this system for all kinds of persistence. No polyglot so far. As we tuned the performance of the response time - we started at 700ms/call and we achieved couple milliseconds per call at the end when DB involved - we had to learn a lot of stuff.

Transactions, lock escalation, isolation levels, clustered and non clustered indexes, buffered reading, index structure and it's persistence, GUID ids in clustered indexes, bulk importing, omit slow joins, sparse indexes, and so on. We also rewrite part of NHibernate to support multiple tables for one entity type which allows use scaling up without lock escalation. It was good time. The end also showed us that famous Oracle has half of our favorite features once we decided to support this database.

Well, as I'm thinking about all issues which we encountered during the development, unpredictive behavior is the feeling I have once I did all this with my DB specialist colleague. When we started to care about the performance more and more we must have understood those features and we usually took out default behavior and plug-in some special one - we could have done that as we understood essentials of the system. This is the way.

As I've described those features above, a lot of NoSQL storages allow to use them on your own. You can easily provide inverted index in key-value store even if the database itself does not support such functionality. What you need to do is to write error-less code and maintain indexes by your own code. Look at twitter example for famous key-value store Redis. It is not so simple as three/four tables in MySql with build-in indexing, isn't it?

There is nothing bad about SQL databases. I still believe that they fit the purpose of 80% of programs as they are best for difficult querying and/or less than massive scaling. But who need real massive scaling? Everyone?

Why we are going to use NoSQL?

Here in this article and the whole series, I'm going to evaluate certain attributes of storages for persistence of binary data of variable length. The goal is to persist network traffic in binary data format.

No relation decomposition, no querying of particular attributes of particular messages. No implicit transactions complying with ACID. No indexes and so on. You can find the reasons in the preface, we just hope that we know what we do :-)

Cassandra

The first evaluated storage is Cassandra. It's famous NoSQL database, commercial support provides virtual confidence for fast support and so on.

Cassandra is somewhere in the middle between schemaless and schemaware (SQL like) databases as it supports schema and CQL query language. I was pretty surprised when I have saw features like time series with dynamic columns, build-in lists and sets. Next good stuff is OpsCenter monitoring tool, I always show some screenshots from the application when I want to demonstrate easy-of-use monitor for some distributed app. Pretty nice.

Hardware

Hardware used within the tests:
  • DB cluster:
    • HP Proliant BL460c gen 8, 32core 2.6 GHZ, 32GB RAM, W12k server
    • HP Proliant BL460c gen 8, 32core 2.6 GHZ, 192GB RAM, ubuntu 12 server
  • Tests executor:
    • xeon, 16 cores, 32gb ram, w8k server
  • 10Gbps network
  • Java 7

Performance Measurement of Cassandra Cluster

The setup used one seed and one replica, java tests use datastax driver. Replication factor two when not explicitly defined. After some issues:


... my final CQL create script was:
  1. CREATE COLUMNFAMILY recording (
  2. PARTKEY INT,
  3. ID uuid,
  4. VS_ID uuid,
  5. URL ascii,
  6. MESSAGE_TIME BIGINT,
  7. DATA BLOB,
  8. PRIMARY KEY((PARTKEY, VS_ID), ID)
  9. );

Batch Size


Datastax driver allows to use queries in batches, how does it scale?

Blob size \ Batch Size [TPS]1282565121024819632768
100 bytes (8 clients)67 10063 90055 40055 80023 500Timeouted
1 kb (4 clients)15 60020 60020 30019 4005 500N/A
5 kb (1 client)N/A8 3009 20010 0003 000

256 rows seems like best size of the batch.

Number of Connections


The tests utilizes variable count of connections to the Cassandra cluster. Note that the batch size was 256 and consistency was set to ONE.


Blob size \ Connections [TPS]124824
100 b27 00053 00075 00062 00062 000
1 kb17 90031 00042 00040 000--
5 kb9 00014 60016 00010 300--
20 kb3 3004 5003 800

You can see that 2 or 4 connections per one client are enough whilst one connection slows down the overall performance.

Number of Partitions


The table contains partition key, see the CQL code snippet. The test generates variable values for the partition key. Batch size was also 256 and consistency one.

Blob size \ Partitions [TPS]1864
100 b58 00071 00070 000
10 kb13 20091007 300

Partition key says where the data will reside, it defines target machine. As two nodes are involved in my tests, its better to use more than one partition.

Variable Consistency


Tuning of consistency is the most beautiful attribute in Cassandra. You can define consistency level in every aspect of the communication with cassandra driver in any language. There are different levels, see the documentation.

The test used 8 partitions, 8 connections and 256 batch size.

Blob size \ Consisency [TPS]AnyOneTwo
100 b67 00066 00066 000

Well, I'm surprised. There is almost no difference between consistency levels for such blob size despite totally different purpose or implementation. This could happen due to almost ideal environment, very fast network and machines as well.

Variable Replicas


I used two replicas by default but this tests tries to reveal the price for second copy of the data.

Blob size \ Replicas [TPS]12
100 b121 00072 000
10 kb19 2008 200

There is apparent penalty for second replica.

Long Running


Previous tests used a few millions of inserts/rows but the current one uses hundred of millions. This is the way how to definitely omit the advantage of various caches and clever algorithms. This simulates heavy weight load test to the server.

8 partitions, 8 connections, 256 batch size.

Blob size \ Replicas+Consistency [TPS]1+ONE2+ONE2+TWO
100 b (1 billion)121 00063 00062 00
10 kb (30 m)--8 200--

Large Messages


The largest message in these tests has 20 kb so far, but what is the behavior when our app would meet some large file as a attachment?

Consistency One, 2 partitions, 2 connections, 2 batch size.

Blob size  [TPS]
500 kb90
5 mb9.5

Occupied Space


Apache Cassandra automatically enables compression since 1.1 version.


Blob size \ [Occupied space in bytes per message]
100b89b

Here is the structure of serialized entity.

CREATE COLUMNFAMILY recording (
    PARTKEY INT,
    ID uuid,
    VS_ID uuid,
    URL ascii,
    MESSAGE_TIME bigint,
    DATA blob,
    PRIMARY KEY((PARTKEY, VS_ID), ID)
);

Conclusion

This kind of testing is almost opposite in comparison to neflix tests. They analyzed scalability but we have done testing of simple cluster having two machines only. We need to figure out the performance of one node (obviously in cluster deployment).

I was impressed as the numbers above are good, the overall throughput on the network reached hundred mbps coming out from one client. 

What next. Same kinds of tests agains Kafka, Redis, Couchbase, Aerospike, Couchbase.

Friday, June 13, 2014

Kryonet: simple but super-fast TCP communication in Java + performance benchmarks

We have designed cluster-aware and cloud-based solution of service virtualization for some time. Incredible work as distributed system is next dimension of thinking - and learning as well :-)

We was deciding to split atomic piece of our domain work to multiple nodes. Such step would hopefully provide us to point certain messages to certain nodes. We can than store data in main memory and avoid distributed locking above some remote store, probably Redis. This is usual approach how to achieve better response time of the whole solution to avoid database search for all calls. Obviously with well designed replication.

I was really worried about latency. We need really low response time in couple of milliseconds at most under heavy load. I'm talking about the whole solution which consumes CPU a lot because of the calculations of simulated response under the hood, so no space for any wasting.

What is the best latency of such transmission between two nodes? What is the throughput of one or multiple connections? Except theoretical analysis I also did our own proof of concept to verify our thoughts in the reality.

I like kryo serialization framework. It's very fast, very simple and the boot of anybody is in level of minutes. Hopefully I discovered that guys from Esoteric Software think about remote communication and they did very logic step. They developed framework allowing TCP communication using kryo serialization - kryonet.

Well, I believe that nobody is surprised that the solution is very fast and super easy as well. Like kryo itself.

There are some benchmarks how TCP communication can be fast, in level o microseconds, but what is the real response time, performance, when I need to transfer some domain object to another node in real environment using gigabit network. I mean no laboratory environment but just real one where our app will probably run in couple of months?

Well, in short it's hundred of microseconds.

Kryonet via Kryo

It is built above java nio, kryo serialization and asynchronous approach when the response is being received.
It's better to read the manual on github as coding around the sending and the receiving is very easy.

Kryo preserves the way that the serializators are separated classes, later registered to the system, so you do no have to affect your domain classes. But obviously, you have to open them to support injections of fields from outside. Or you can use the reflection too.

The response is received asynchronously to the processing of the original request. Kryonet exposes TCP port on the server side and uses TCP under the hood. But don't worry, you almost can't touch complexity of TCP messaging.

Number meaning and setup

What was my message? It contains identifier, uuid, and map of elements. The element contains the data but the important thing is the overall size of the message.

Response time is the whole round-trip:
  1. Serialization on the client side
  2. To send the request to the server
  3. Deserialization of the message on server side
  4. To respond the message identifier back to the client
  5. The deserialization of received message on the client size
Well, the response time is just the number you can expect when you want to push some message to remote component and wait for the acknowledgment - ACK.

Both client and server are HP Proliant, 24 and 32 cores, >1 gigabit network.

Performance Benchmarks

One TCP Connection

First of all, I used one TCP connection.

Message Size [bytes]/Number of elementsAverage RT [micro-seconds]Median RT [micro-seconds]Buffer sizeThroughput [msgs/s]Rough Load [msgs/s]
48 / 2190160256 B5400100k
48 / 220217016 kB5400100k
48 / 26721571kB183001M
48 / 241813016kB193004M
3216 / 20031631316 kB6501k
3216 / 200332331256 B-1k

Message size: size of serialized message into raw byte[].

Average vs. median: as my test keeps all response time value, it was very interesting to analyze them. Average value reflects all edge cases, mostly the divergence during the flushing of buffers etc., but median value does not. What does it mean in real? The difference between average and median value indicates growing number of slower responses.

Buffer: kryo uses serialization and object buffer, see the documentation.

Rough load: all calls are asynchronous which means that I had to limit incoming load generated by my generator. The approach I chose was to call standard java thread sleep after certain messages. The operation in milliseconds is not quite accurate but it was fine for rough limitation of input load. 4M means that the generator thread sleeps for 1 milliseconds once it generates four thousand of messages.

The discussion:
  • there is only two times slower response time for 100 times larger message
  • there is almost no different response time for various buffer sizes
  • increasing load means almost the same median value but highly increased average response time. Well, the response time is not stable at all which indicates that you can't rely on response time in the case of massive load

Multiple Client Connections to One TCP Server Port

The approach having multiple clients using one TCP port usually scales well, but not for kryonet as it uses limited buffer size, see following table. I used 10 concurrent clients for the measure.

Message Size [bytes]/Number of elementsAverage RT [micro-seconds]Median RT [micro-seconds]Buffer size (client/server)Throughput [msgs/s]Rough Load [msgs/s]
48 / 225625416k/16k90001k
48 / 258053516k/160k420005k
48 / 2330002300016k/300k8000010k

The discussion:
  • kryo itself is very fast but single threaded deserialization brings certains limits so huge throughput was achieved for the cost of really high latency
  • the increasing of the buffer size within the server allows to serve much more requests as they just fall into buffer and were not rejected as before for lower buffer size

Multiple Connections to Multiple Ports

The last includes multiple clients connected to the multiple server ports. It brought the best performance as expected.

Message Size [bytes]/Number of elementsConnectionsAverage RT [micro-seconds]Median RT [micro-seconds]Buffer size (client/server)Throughput [msgs/s]Rough Load [msgs/s]
48 / 210542486256/25654000100k
48 / 210500051816k/16k155000500k
48 / 2202008142316k/32k210000500k
48 / 250146040416k/32k182000100k
48 / 2502024378256k/256k220000100m
48 / 21003120404256k/256k231000100m
3216 / 200104053688k/32k210003k
3216 / 2001041535316k/32k210003k
3216 / 2002039935016k/32k323002k
3216 / 2005038032716k/32k437001k
3216 / 2005041034464k/128k780002k

The discussion:

  • incredible load for 100/50 clients per second in no bad time as I would originally expected
  • as I've already discuss the stuff above, there is huge different between median and average RT as well
  • there is no such difference between 20 and 100 clients in throughtput but in average/median RT

Resources

Well, usually the transport of a message from one to another node is not your business case. It's important that both client and server is able to do it's core logic - message processing of received throughput is always the core.

One client does not almost burden the server, see htop screenshot for the most approach having the best throughput:


On the other hand, 10 concurrent clients to one server side port is almost the same even if the number of transported messages is 5x time higher:


As you can see, 50 concurrent clients and ports is more likely theoretical stuff for this measure than usable solution as the server do not any other thing then deserialize incoming throughtput :-)


Conclusion

It's fast and easy. Almost 20k transmitted small messages for one client per second is incredible number for me. On the other hand, 150-200 micro seconds response time is surprise for me as well.

The only thing to consider is to carefully define the size of all buffers. There is no queuing and just the rejection of a message can happen if the connection is under heavy load.

Sunday, May 4, 2014

Jasně cílených 25 + 5 minut v pomodoro

Když bude chtít netrénovaný jedinec uběhnout pět kilometrů, velmi pravděpodobně zažije lepší požitek ze sportu, když si místo celého běhu najednou rozkrájí trať na 10 pětisemetrových částí a mezi nimi udělá určitý odpočinek.

Proč? Doběhnout k další lampě na cyklostezce je pro psychiku - a také fyzičku - méně náročný úkol než doběhnout do další vesnice a pak zpět. Psychiku se stejně jako fyzičku můžete vyčerpat.

Takhle nějak funguje pomodoro v práci.

O co jde?

Pomodoro je celkem starý a jednoduchý postup, který velmi napomáhá mojí efektivitě. Je to vlastně jediná věc, kterou jsem zkusil a používal i po pár dnech dál. Respektive pomodoro pomáhá mému soustředění a schopnosti řešit smysluplně problémy.

Pomodoro říká, že byste měli dvacetpět minut pracovat a pět minut dělat něco jiného - odpočinout si od té hlavní činnosti - a takto dokola. Po třech opakováních si dát větší patnáctiminutovou pauzu. Takhle lze rozkrájet celý den.

Jak to funguje?

Přijdete do práce, pustíte čas a pracujete dvacetpět minut. Tento časový úsek je dedikován vaší přímé práci, žádné rozptylování, žádné odbíhání, čistý focus na prácí. Po pětadvaceti minutách je čas pět minut na pauzu. Její výplň záleží na každém z nás. Důležité je, aby tento úsek trávil člověk něčím jiným než tou konkrétní prací.

Časy lze při troše dobré vůle poměnit. Přece jen, když mám nosou myšlenku a bije mi 24:59, neodejdu, abych ji zapomněl :-) Na druhou stranu vypustit ale pauzu je nežádoucí, protože v ní je kouzlo pomodora.

Proč mi to funguje?

Určitě každý znáte moment, kdy se rozjedete v práci, hloubáte a hloubáte nad nevyřešitelným problémem a najednou je konec dne a jdete domů. Práce vás pohltila, ale ten problém jste nerozlouskli. Pravděpodobně se vám to povede druhý den ráno. Krom psychické únavy to má ještě jeden faktor. Přes stromy nevidíme les.

Když člověk kontinuálně přemýšlí, prochází slepé cestičky problému, mozek je v určitém kontextu. Jeho granularita je na úrovni těch dílčích řešených problémů - nikoli celku. Během pauzy z toho kontextu logicky vypadne. Když uplyně pět minut a člověk začne hloubat znovu, musí si kontext obnovit. To se mu povede tím, že začne nad problémem znovu přemýšlet, sestupovat hlouběji a hlouběji. A tohle je právě ono.

Jakmile znovu sestupujete do toho kontextu, procházíte věci znova a dost často vás napadne něco jiného, jiné řešení, jiné problémy nebo postupy. To mi většinou vede k rozuzlení celé úlohy.

Efektivita přemýšlení

Podstatnou věcí je efektivita sama o sobě. Mozek má jisté limity. Nedokáže se plně soustředit a přemýšlet hodiny po sobě, tak jako fyzická práce potřebuje odpočinek, tak i psychická. Všechno je to logicky vysvětlené v Pragmatic Thinking and Learning.

Plánovat dvou, tří nebo dokonce čtyřhodinový meeting je mrháním času. Nejenže nevznikne žádný tlak na řešení daných problémů jednoduše proto, že tolik času svádí k řešení všeho okolo, taktéž každý na daném meetingu se bude efektivně soustředit maximálně minut čtyřicet.

A pomodoro řeší kousknováním i tohle.

Jak to používám?

25 minut práce

Na barcampu někdo říkal, že jeho den začíná kafem vedle notebooku. Můj den začíná tím, že kliknu na rajské jablko v chromu a začne mi odpočítávat čas. Jelikož mám vidinu dvacetipěti minut práce před sebou a ne osmi, deseti či dvanácti hodin, nenechám se snadno rozptýlit.

Nejdu číst weby, sociální sítě, nejdu vařit čaj, nejdu jíst ani nejdu přemýšlet na víkendem. Jednak mám v hlavě focus na těch krátkých dvacet pět minut, které chci strávit efektivně, druhak se mi odpočítávaný čas krátí.

Proč používám chromový doplněk? Má to jednu super výhodu. Blokování stránek. Když už mám slabší chvilku a chtěl bych se podívat na facebook, pomodoro doplněk mi to neumožní :-) Prostě stránku nezobrazí, resp. zobrazí přes ní logo rajčete. Tohle funguje ikdyž jste měli tu stránku otevřenou předem, prostě se překryje šedým podkladem a logem. Má to nedozírné nasledky :-) Například vám to neumožní pokračovat v rozjetém dialogu na sociální síti. Máte na dvacetpět minut smůlu :-)

5 minut pauza

Tohle je asi nejlepší část. Přišlo mi škoda koukat z okna a relaxovat. Proto těchto pět minut trávím prací, ke které bych se normálně nedostal. Má jiný kontext, takže mi to funguje!

Moje typická normální praxe je taková, že člověk začne pracovat na nové věci a za tři měsíce zjistí, že neodpovídal celou dobu na emaily, které se netýkaly té nove featury. Těch pět pauzovních minut mi tak slouži na dělání dílčích úkolů. Odpovědět na dotaz kolegovi, naplánovat schůzku, zavolat klempířům na parapety nebo si vložit do getpocket články.

Pokud totiž pracujete 8 hodin denně, máte takových pauz celkem 16. Za to lze už stihnout hodně dalších věcí. Vzhledem ke zefektivnění práce byste tenhle čas stejně pravděpodobně nevyužili moc dobře.

Pokud nejsem v dohledu chromu, používám clockwork tomato pro android. Úplně se nabízí integrace s nějakou toolou na management tásků, třeba kanbanflow tohle pěkně umí.

Shrnutí

Dost mi to pomáhá, řeší mi to totiž hodně problémů:
  1. Efektivita, rozptylování, odcházení od problému
  2. Dokážu zase vidět les přes stromy, nezapadnu nesmyslně do problému na celé hodiny
  3. Dokážu dělat věci, ke kterým bych se nedostal a stále je odsouval na budoucno
  4. Jelikož ten čas dělím na kratší úseky, nemám pocit, že budu zase celý den v práci
Pomodoro má jen jedno pravidlo 25+5, tolikrát vzpomínané v tomhle článku. Tedy žádný overmanagement - to je také velká výhoda.