Kafka, among other things, provides a way to subscribe to multiple topics using a single KafkaConsumer instance. I never thought about how this works and just assumed that messages would be read evenly from all subscribed topics and that there is nothing else to it.
Here I will run some experiments that should prove or disprove my initial understanding and hopefully be useful to some random reader with the same question.
- Setup
- Scenario 1 – Writing to topics one after another
- Scenario 2 – Writing to topics one after another using bigger messages
- Scenario 3 – Concurrent writing and reading
- Scenario 4 – Reading data written to topics at different times
- Scenario 5 – Reading data with introduced processing time – big messages
- Scenario 6 – Reading data with introduced processing time – small messages
Setup
The setup includes only a single Kafka broker instance running in Docker (started by docker-compose) and a simple Scala application using the standard Kafka driver to write to and read from Kafka.
I have also created a simple DSL that allows me to change the write/read behavior without spending a lot of time coding. The code should also be reasonably readable to a random viewer.
All topics are created with only one partition.
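The DSL only hides the standard driver calls. For reference, multi-topic consumption with the plain Java client looks roughly like the sketch below (the broker address, group id and deserializers here are assumptions for illustration, not necessarily my exact setup):

```scala
import java.time.Duration
import java.util.Properties
import scala.jdk.CollectionConverters.*
import org.apache.kafka.clients.consumer.KafkaConsumer

// Assumed connection settings; adjust to your docker-compose setup.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("group.id", "multi-topic-test")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("auto.offset.reset", "earliest")

val consumer = new KafkaConsumer[String, String](props)
consumer.subscribe(List("topic-1", "topic-2").asJava) // one consumer, many topics

while true do
  val records = consumer.poll(Duration.ofMillis(500))
  for record <- records.asScala do
    println(s"topic=${record.topic} offset=${record.offset} value=${record.value}")
```

The important part is that subscribe takes a collection of topics while poll returns a single stream of records; which topic each batch comes from is up to the broker and the client's fetcher, which is exactly what the scenarios below poke at.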
With all that out of the way, let’s do some testing!
Scenario 1 – Writing to topics one after another
This test will perform the following steps:
- delete topics
- write 100k messages into “topic-1”
- write 100k messages into “topic-2”
- read messages from both topics with one KafkaConsumer instance
Here is the actual code:
for
  _ <- deleteTopics("topic-1", "topic-2")
  _ <- writeRange("topic-1", 0 until 100000)
  _ <- writeRange("topic-2", 0 until 100000)
  _ <- read(Topics("topic-1", "topic-2"), logMsg)
yield ()
Outcome of the test:
Started reading topic name=topic-1
Switched to topic name=topic-2, afterMsg=13728
Switched to topic name=topic-1, afterMsg=13728
Switched to topic name=topic-2, afterMsg=13443
Switched to topic name=topic-1, afterMsg=13443
Switched to topic name=topic-2, afterMsg=13443
Switched to topic name=topic-1, afterMsg=13443
Switched to topic name=topic-2, afterMsg=13443
Switched to topic name=topic-1, afterMsg=13000
Switched to topic name=topic-2, afterMsg=57
Switched to topic name=topic-1, afterMsg=443
Switched to topic name=topic-2, afterMsg=13386
Switched to topic name=topic-1, afterMsg=13443
Switched to topic name=topic-2, afterMsg=13443
Switched to topic name=topic-1, afterMsg=13443
Switched to topic name=topic-2, afterMsg=13443
Switched to topic name=topic-1, afterMsg=13443
Switched to topic name=topic-2, afterMsg=5614
Here we can see that the reads are spread pretty evenly, and that is exactly what I expected. One thing that is potentially problematic is that Kafka started reading data from “topic-2” only after it had consumed 13728 messages from “topic-1”.
My current assumption is that this happens because the messages are so small that they all fit in the same Kafka segment.
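A complementary back-of-the-envelope check (my assumption, not something I measured): by default the consumer fetches at most max.partition.fetch.bytes = 1 MiB from one partition per fetch, and 13728 tiny records per switch works out to almost exactly that:

```scala
// Assumption: default consumer fetch limit per partition (1 MiB).
val maxPartitionFetchBytes = 1048576
// Observed number of messages read before switching topics.
val messagesPerSwitch = 13728
// Implied size of one record, including key and batch overhead.
println(maxPartitionFetchBytes / messagesPerSwitch) // 76 bytes per record
```

Roughly 76 bytes per record is plausible for a small payload plus key and record overhead, so the switch points look like fetch-size boundaries.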
Scenario 2 – Writing to topics one after another using bigger messages
This test will perform the following steps:
- delete topics
- write 100k messages into “topic-1” of size 10KiB + key
- write 100k messages into “topic-2” of size 10KiB + key
- read messages from both topics with one KafkaConsumer instance
Here is the actual code:
for
  _ <- deleteTopics("topic-1", "topic-2")
  _ <- writeRange("topic-1", 0 until 100000, valueSize = 10.k)
  _ <- writeRange("topic-2", 0 until 100000, valueSize = 10.k)
  _ <- read(Topics("topic-1", "topic-2"), logMsg)
yield ()
Outcome of the test:
Started reading topic name=topic-1
Switched to topic name=topic-2, afterMsg=101
Switched to topic name=topic-1, afterMsg=101
Switched to topic name=topic-2, afterMsg=101
Switched to topic name=topic-1, afterMsg=101
Switched to topic name=topic-2, afterMsg=101
Switched to topic name=topic-1, afterMsg=101
Switched to topic name=topic-2, afterMsg=101
Switched to topic name=topic-1, afterMsg=101
Switched to topic name=topic-2, afterMsg=101
Switched to topic name=topic-1, afterMsg=101
Switched to topic name=topic-2, afterMsg=101
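The suspiciously stable batch size of 101 fits the same fetch-size arithmetic. Assuming the default max.partition.fetch.bytes of 1 MiB (my guess; I did not change the setting), about a hundred 10 KiB records fit into one fetch:

```scala
// Assumption: default consumer fetch limit per partition (1 MiB).
val maxPartitionFetchBytes = 1048576
// 10 KiB payload per message, as written by the test.
val valueSize = 10 * 1024
println(maxPartitionFetchBytes / valueSize) // 102
```

The computed 102 versus the observed 101 is easily explained by per-record overhead (key, headers, batch metadata).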
Scenario 3 – Concurrent writing and reading
This test will perform the following steps:
- delete topics
- create topics
- write 100k messages into “topic-1”
- write 100k messages into “topic-2”
- read messages from both topics with one KafkaConsumer instance
Thing to note here is that the writing and reading steps run concurrently (each on its own thread).
Here is the actual code:
for
  _          <- deleteTopics("topic-1", "topic-2")
  _          <- createTopics("topic-1", "topic-2")
  _          <- writeRange("topic-1", 0 until 100000).newThread
  _          <- writeRange("topic-2", 0 until 100000).newThread
  readThread <- read(Topics("topic-1", "topic-2"), logMsg).newThread
  _          <- readThread.join
yield ()
Outcome of the test:
Started reading topic name=topic-1
Switched to topic name=topic-2, afterMsg=14320
Switched to topic name=topic-1, afterMsg=14320
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=1252
Switched to topic name=topic-1, afterMsg=1258
Switched to topic name=topic-2, afterMsg=274
Switched to topic name=topic-1, afterMsg=47
Switched to topic name=topic-2, afterMsg=6
Switched to topic name=topic-1, afterMsg=4
Switched to topic name=topic-2, afterMsg=2
Switched to topic name=topic-1, afterMsg=3
Switched to topic name=topic-2, afterMsg=4
...
Here we can see that even though the writers started at the same time as the reader, the reader did not start reading right away but waited a bit, then quickly read whole segments until it caught up with the writers, at which point it was reading only a few messages at a time until it had "depleted" all values.
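The initial pause is consistent with the consumer first fetching metadata and joining its consumer group before the first fetch; the batching afterwards is governed by the fetch settings. These are the consumer properties I would look at when tuning this behavior, shown here with their default values:

```properties
# Kafka consumer defaults relevant to the batching seen above
fetch.min.bytes=1                  # return as soon as any data is available...
fetch.max.wait.ms=500              # ...or after this long, whichever comes first
max.partition.fetch.bytes=1048576  # max bytes returned per partition per fetch
fetch.max.bytes=52428800           # max bytes returned per fetch overall
max.poll.records=500               # max records handed to the app per poll()
```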
Scenario 4 – Reading data written to topics at different times
This test will perform the following steps:
- delete topics
- create topics
- write 100k messages into “topic-1”
- sleep 10 seconds
- write 100k messages into “topic-2”
- sleep 10 seconds
- read messages from both topics with one KafkaConsumer instance
Here is the actual code:
for
  _ <- deleteTopics("topic-1", "topic-2")
  _ <- createTopics("topic-1", "topic-2")
  _ <- writeRange("topic-1", 0 until 100000)
  _ <- 10.seconds.sleep
  _ <- writeRange("topic-2", 0 until 100000)
  _ <- 10.seconds.sleep
  _ <- read(Topics("topic-1", "topic-2"), logMsg)
yield ()
Outcome of the test:
Started reading topic name=topic-1
Switched to topic name=topic-2, afterMsg=14320
Switched to topic name=topic-1, afterMsg=14320
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14000
Switched to topic name=topic-2, afterMsg=331
Switched to topic name=topic-1, afterMsg=169
Switched to topic name=topic-2, afterMsg=13838
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=14169
Switched to topic name=topic-1, afterMsg=14169
Switched to topic name=topic-2, afterMsg=666
Here we can see that the order and time of writing did not affect the consumer. The reads are again evenly distributed.
Scenario 5 – Reading data with introduced processing time – big messages
This test will perform the following steps:
- delete topics
- create topics
- write 100k messages into “topic-1” with a size of around 10 KiB
- write 100k messages into “topic-2” with a size of around 10 KiB
- read messages from both topics with one KafkaConsumer instance, spending 10 ms processing each message
Here is the actual code:
for
  _ <- deleteTopics("topic-1", "topic-2")
  _ <- createTopics("topic-1", "topic-2")
  _ <- writeRange("topic-1", 0 until 100000, valueSize = 10.k)
  _ <- writeRange("topic-2", 0 until 100000, valueSize = 10.k)
  _ <- read(Topics("topic-1", "topic-2"), logAndWaitPerMsg(10.millis))
yield ()
Outcome of the test:
16:31:02.150750: Started reading topic name=topic-1
16:31:03.181957: Switched to topic name=topic-2, afterMsg=101
16:31:04.231740: Switched to topic name=topic-1, afterMsg=101
16:31:05.255858: Switched to topic name=topic-2, afterMsg=101
16:31:06.285245: Switched to topic name=topic-1, afterMsg=101
16:31:07.304847: Switched to topic name=topic-2, afterMsg=101
16:31:08.330779: Switched to topic name=topic-1, afterMsg=101
16:31:09.349215: Switched to topic name=topic-2, afterMsg=101
16:31:10.374301: Switched to topic name=topic-1, afterMsg=101
16:31:11.393402: Switched to topic name=topic-2, afterMsg=101
16:31:12.418908: Switched to topic name=topic-1, afterMsg=101
16:31:13.437646: Switched to topic name=topic-2, afterMsg=101
16:31:14.462465: Switched to topic name=topic-1, afterMsg=101
16:31:15.481864: Switched to topic name=topic-2, afterMsg=101
16:31:16.510095: Switched to topic name=topic-1, afterMsg=101
16:31:17.529953: Switched to topic name=topic-2, afterMsg=101
16:31:18.555394: Switched to topic name=topic-1, afterMsg=101
16:31:19.574136: Switched to topic name=topic-2, afterMsg=101
16:31:20.599742: Switched to topic name=topic-1, afterMsg=101
16:31:21.620856: Switched to topic name=topic-2, afterMsg=101
16:31:22.646568: Switched to topic name=topic-1, afterMsg=101
16:31:23.665490: Switched to topic name=topic-2, afterMsg=101
16:31:24.694856: Switched to topic name=topic-1, afterMsg=101
16:31:25.713488: Switched to topic name=topic-2, afterMsg=101
16:31:26.737737: Switched to topic name=topic-1, afterMsg=101
16:31:27.756081: Switched to topic name=topic-2, afterMsg=101
16:31:28.781007: Switched to topic name=topic-1, afterMsg=101
16:31:29.799748: Switched to topic name=topic-2, afterMsg=101
16:31:30.828161: Switched to topic name=topic-1, afterMsg=101
16:31:31.848497: Switched to topic name=topic-2, afterMsg=101
16:31:32.877235: Switched to topic name=topic-1, afterMsg=101
16:31:33.897322: Switched to topic name=topic-2, afterMsg=101
16:31:34.924867: Switched to topic name=topic-1, afterMsg=101
16:31:35.944497: Switched to topic name=topic-2, afterMsg=101
16:31:36.968909: Switched to topic name=topic-1, afterMsg=101
16:31:37.989789: Switched to topic name=topic-2, afterMsg=101
16:31:39.014912: Switched to topic name=topic-1, afterMsg=101
16:31:40.035438: Switched to topic name=topic-2, afterMsg=101
16:31:41.064633: Switched to topic name=topic-1, afterMsg=101
16:31:42.083196: Switched to topic name=topic-2, afterMsg=101
16:31:43.104632: Switched to topic name=topic-1, afterMsg=101
16:31:44.123003: Switched to topic name=topic-2, afterMsg=101
16:31:45.146874: Switched to topic name=topic-1, afterMsg=101
16:31:46.167833: Switched to topic name=topic-2, afterMsg=101
Here we can see that if the messages are big enough, the driver returns small alternating batches for each topic. This means that if we have a "slower" consumer, it is better to split the consuming across two separate KafkaConsumer instances.
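A minimal sketch of that split, assuming the plain Java client (newConsumer and process are hypothetical helpers, not part of my test DSL):

```scala
import java.time.Duration
import scala.jdk.CollectionConverters.*
import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}

// Hypothetical: builds a consumer with the same Properties as before.
def newConsumer(): KafkaConsumer[String, String] = ???
// Hypothetical: the slow, ~10 ms-per-message handler.
def process(record: ConsumerRecord[String, String]): Unit = ???

// One KafkaConsumer per topic, each on its own thread, so a slow handler on
// one topic cannot starve the other. Note that KafkaConsumer is not
// thread-safe, so each instance must stay on the thread that polls it.
def consume(topic: String): Thread =
  val t = new Thread(() =>
    val consumer = newConsumer()
    consumer.subscribe(List(topic).asJava)
    while true do
      consumer.poll(Duration.ofMillis(500)).asScala.foreach(process)
  )
  t.start()
  t

val topic1Reader = consume("topic-1")
val topic2Reader = consume("topic-2")
```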
Scenario 6 – Reading data with introduced processing time – small messages
This test will perform the following steps:
- delete topics
- create topics
- write 100k messages into “topic-1”
- write 100k messages into “topic-2”
- read messages from both topics with one KafkaConsumer instance, spending 10 ms processing each message
Here is the actual code:
for
  _ <- deleteTopics("topic-1", "topic-2")
  _ <- createTopics("topic-1", "topic-2")
  _ <- writeRange("topic-1", 0 until 100000)
  _ <- writeRange("topic-2", 0 until 100000)
  _ <- read(Topics("topic-1", "topic-2"), logAndWaitPerMsg(10.millis))
yield ()
Outcome of the test:
20:01:50.530211: Started reading topic name=topic-1
20:04:15.103736: Switched to topic name=topic-2, afterMsg=14320
20:06:37.972485: Switched to topic name=topic-1, afterMsg=14180
20:06:41.601656: Switched to topic name=topic-2, afterMsg=360
20:06:43.013961: Switched to topic name=topic-1, afterMsg=140
20:09:02.170501: Switched to topic name=topic-2, afterMsg=13809
20:11:20.113336: Switched to topic name=topic-1, afterMsg=13691
20:11:20.335153: Switched to topic name=topic-2, afterMsg=22
20:11:25.154276: Switched to topic name=topic-1, afterMsg=478
20:13:47.693232: Switched to topic name=topic-2, afterMsg=14147
20:16:07.308123: Switched to topic name=topic-1, afterMsg=13853
20:16:09.160389: Switched to topic name=topic-2, afterMsg=184
20:16:12.344271: Switched to topic name=topic-1, afterMsg=316
20:18:33.208330: Switched to topic name=topic-2, afterMsg=13985
Here, with smaller messages, it becomes obvious that if each message takes 10 milliseconds to process and every segment contains many of them, the switching between topics takes a relatively long time. Processing is again evenly distributed globally, but each segment takes more time to process.
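The timestamps back this up: at 10 ms per message, one ~14k-message run takes a couple of minutes before the consumer switches topics, which matches the gaps in the log above:

```scala
val messagesBeforeSwitch = 14320  // first run from the log above
val perMessageMillis = 10         // the artificial processing time
println(messagesBeforeSwitch * perMessageMillis / 1000) // 143 seconds
```

The log shows 20:01:50 to 20:04:15 for that first run, about 145 s; the few extra seconds are the actual fetching and logging.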