
Ruby: Karafka framework 0.4 – Routing engine


We've finally released a new version of the Karafka framework. Apart from many tweaks and bug fixes, it contains three huge improvements:

  • ApplicationWorker and ApplicationController
  • Routing engine
  • Offset commit after data processing (not only after fetching)

In this article I will focus on the routing engine.

Pre-routing times

The first version of the Karafka framework routed incoming messages based on controller names. For example:

  • videos topic => VideosController
  • movies_subtitles => Movies::SubtitlesController
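
The naming convention above can be sketched in a few lines of plain Ruby. This is only an illustration of the mapping, not the code Karafka actually used; the controller_name_for helper is hypothetical.

```ruby
# Hypothetical helper illustrating the old naming convention:
# each underscore-separated part of the topic becomes a namespace level,
# and the last part gets the "Controller" suffix.
def controller_name_for(topic)
  parts = topic.to_s.split('_').map(&:capitalize)
  "#{parts.join('::')}Controller"
end

puts controller_name_for(:videos)           # VideosController
puts controller_name_for(:movies_subtitles) # Movies::SubtitlesController
```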

It was also possible to change that by setting the topic property of a controller:

class SuperController < Karafka::BaseController
  self.topic = :my_data
end

Unfortunately it turned out that this approach had way more disadvantages than advantages:

  • Fat, non-isolated routing logic bound tightly to controllers
  • No possibility to use inheritance
  • No possibility to use same controller logic for multiple routes (at least not directly)
  • A lot of non-controller-related logic that had to be there for the sake of consistency of the framework DSL (interchanger, group, etc.)

Luckily those times are gone and now we have a nice shiny routing engine.

Note: If you're not familiar with terms like topic, producer, etc - please refer to an excellent post about that: Kafka for Rubyists - Part 1

Karafka routing basics

Karafka has "Rails-like" routing that allows you to describe how messages from each topic should be handled. Such a separation layer (between topics, controllers and workers) gives you better control over the message "flow".

App.routes.draw do
  topic :example do
    controller ExampleController
  end
end

At a minimum, Karafka requires a topic and a controller. Note that controllers should be provided as classes (constants), not as strings or symbols (like in Rails). In most cases this will be all you need. Everything else will be built automatically under the hood.

Kafka has a flat topic structure. There are no namespaces or nested topics (however, this might change in the future, since there's a discussion about it). It means that your Karafka routes will be flat as well: a single topic block and, within it, all the details on how to handle messages from that topic.
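
To make the DSL a bit less magical, here is a minimal sketch of how a flat "draw" block like the one above could be evaluated in plain Ruby. It is not Karafka's implementation: Route, RouteBuilder and Mapper are hypothetical names, and only the option names come from this article.

```ruby
# Hypothetical, simplified re-implementation of a flat routing DSL.
Route = Struct.new(:topic, :controller, :group, :worker, :parser, :interchanger)

class RouteBuilder
  def initialize(topic)
    @route = Route.new(topic)
  end

  # One DSL method per option; each just stores the given value.
  %i[controller group worker parser interchanger].each do |option|
    define_method(option) { |value| @route[option] = value }
  end

  def build(&block)
    instance_eval(&block)
    raise ArgumentError, "controller is required for #{@route.topic}" unless @route.controller

    @route
  end
end

class Mapper
  attr_reader :routes

  def initialize
    @routes = []
  end

  # Evaluates the block so that "topic" calls land on this mapper.
  def draw(&block)
    instance_eval(&block)
    self
  end

  def topic(name, &block)
    @routes << RouteBuilder.new(name).build(&block)
  end
end

class ExampleController; end

mapper = Mapper.new.draw do
  topic :example do
    controller ExampleController
  end
end

puts mapper.routes.first.controller # ExampleController
```

Because each topic block builds exactly one route, the resulting structure stays as flat as Kafka's topics.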

The following options can be used inside a topic block:

  • Controller (required)
  • Group (optional)
  • Worker (optional)
  • Parser (optional)
  • Interchanger (optional)

Here's an example of how to use all the options together:

App.routes.draw do
  topic :binary_video_details do
    group :composed_application
    controller Videos::DetailsController
    worker Workers::DetailsWorker
    parser Parsers::BinaryToJson
    interchanger Interchangers::Binary
  end
end

Karafka controller

A Karafka controller is similar to a Rails controller. It is responsible for handling each incoming message.

Karafka group

Taken from Apache Kafka documentation:

Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If you're not planning to build multiple applications (not processes) from which only one needs to consume a given message, don't worry about that option at all. Karafka will take care of that for you.

Karafka worker

By default, Karafka will build a worker for each of your controllers (so you will have a pair: a controller and a worker). All of these workers will inherit from ApplicationWorker and share its settings.

You can override this by providing your own worker. Just keep in mind that if it does not inherit from ApplicationWorker, it won't have the interchanging and unparsing logic, and it won't execute your controller's #perform code. In cases like that, you'll have to implement your business logic inside the worker.

Karafka parser

Kafka is a cross-platform tool. This means that when integrating with other applications, you might not receive JSON data (it depends on the producer). Custom parsers allow you to handle any data format you need. By default, Karafka parses messages with a JSON parser.

A custom parser needs to have a #parse method and must raise an error that is a ::Karafka::Errors::ParserError descendant when a problem appears during parsing.

A parsing failure won't stop the application flow. Instead, Karafka will assign the raw message to the :message key of params. That way you can handle the raw message inside the Sidekiq worker (you can implement error detection, etc.; any "heavy" parsing logic can and should be implemented there).
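
As a rough sketch of the contract described above, here is a custom parser for a made-up "key=value;key=value" format. Several things are assumptions: that #parse is called on the parser class itself, the KeyValueParser and parse_or_keep_raw names, and the stand-in definition of Karafka::Errors::ParserError (included only so the snippet runs without the gem).

```ruby
# Stand-in for the framework's error class so this sketch runs without the gem.
module Karafka
  module Errors
    class ParserError < StandardError; end
  end
end

# Hypothetical parser for "key=value;key=value" payloads.
class KeyValueParser
  class InvalidPayload < ::Karafka::Errors::ParserError; end

  def self.parse(message)
    message.split(';').each_with_object({}) do |pair, result|
      key, value = pair.split('=', 2)
      raise InvalidPayload, "malformed pair: #{pair.inspect}" if key.nil? || key.empty? || value.nil?

      result[key] = value
    end
  end
end

# Illustrates the fallback described above: on a parsing failure,
# the raw message is kept under the :message key instead of crashing.
def parse_or_keep_raw(parser, raw)
  parser.parse(raw)
rescue ::Karafka::Errors::ParserError
  { message: raw }
end

p parse_or_keep_raw(KeyValueParser, 'id=1;title=Intro')
p parse_or_keep_raw(KeyValueParser, 'not a pair')
```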

Karafka interchanger

Custom interchangers target issues with non-default (binary, etc.) data that we want to store when we do #perform_async. This data might be corrupted when fetched in a worker (see this issue). With custom interchangers, you can encode/compress data before it is passed to scheduling and decode/decompress it when it gets into the worker.

Note: Interchangers are not the same as parsers. Parsers take incoming data and convert it to a "Ruby friendly" format. Interchangers allow you to add an additional layer of encoding so that, for example, your binary data won't be escaped by Sidekiq.

Warning: if you decide to use slow interchangers, they might significantly slow down Karafka.
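
A minimal sketch of the idea: encode params into an ASCII-only string before scheduling and restore them inside the worker. The load/parse method names and the Base64Interchanger class are assumptions made for this illustration, so check the framework's documentation for the exact interface.

```ruby
# Hypothetical interchanger; the method names (load/parse) are assumptions.
class Base64Interchanger
  class << self
    # Before scheduling: turn params into an ASCII-only Base64 string.
    def load(params)
      [Marshal.dump(params)].pack('m0')
    end

    # Inside the worker: restore the original params.
    # Marshal.load must only be used on data you produced yourself.
    def parse(encoded)
      Marshal.load(encoded.unpack1('m0'))
    end
  end
end

binary = { 'frame' => "\xFF\x00\xAB".b }
encoded = Base64Interchanger.load(binary)

puts encoded.ascii_only?                         # true
puts Base64Interchanger.parse(encoded) == binary # true
```

Base64 adds roughly a third to the payload size, which is one example of the cost the warning above refers to.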

Summary

Karafka's routing engine might seem simple; however, it is simple only until you decide to use all the optional features.

Thanks to this approach you can prototype and design your application flow really fast, while still being able to handle any non-standard case on demand.

Is the current routing engine the final one? To be honest, that strongly depends on Apache Kafka's development. We will see in the future :-)

Benchmarking Karafka – how does it handle multiple TCP connections

Recently I've released a Ruby Apache Kafka microframework; however, I don't expect anyone to use it without at least a bit of information on what it can do. Here are some measurements that I took.

How Karafka handles multiple TCP connections

Since listening to multiple topics requires multiple TCP connections, we use threads to obtain decent performance (a process clustering feature is in progress). Theoretically, each controller that you create could have its own thread and listen all the time. However, in a bigger application, that could slow everything down. That's why we introduced topic clustering. When you configure your Karafka application, you should specify the max_concurrency parameter:

class App < Karafka::App
  setup do |config|
    # Other config options
    config.max_concurrency = 10 # 10 threads max
  end
end

This is the maximum number of threads that will be used to listen for incoming messages. It is pretty simple when you have fewer controllers (topics) than threads: Karafka will just use a single thread per topic. However, if you have more controllers than threads, a few connections will be packed into a single thread (wrapped with Karafka::Connection::ThreadCluster). This is how it works when you have 2 threads and 4 controllers:

[Figure: clusters]

In general, Karafka will distribute TCP connections across threads evenly. So, if you have 20 controllers and 5 threads, each thread will be responsible for checking 4 sockets, one after another. Since it won't do this simultaneously, Karafka will slow down. How much? It depends. If there's always something on each of the topics, you will get around 24% of the base per-controller performance out of each connection.
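
The even distribution can be sketched as a simple round-robin split. This is not the actual Karafka::Connection::ThreadCluster code; build_clusters is a hypothetical helper. It also shows where the number comes from: with 4 connections sharing one thread, each can get at most 25% of that thread's time, and the measured 24% is that share minus switching overhead.

```ruby
# Hypothetical helper: spreads controllers over at most max_concurrency
# clusters, round-robin, so cluster sizes differ by at most one.
def build_clusters(controllers, max_concurrency)
  controllers
    .each_with_index
    .group_by { |_controller, index| index % max_concurrency }
    .values
    .map { |pairs| pairs.map(&:first) }
end

controllers = (1..20).map { |i| "Controller#{i}" }
clusters = build_clusters(controllers, 5)

puts clusters.map(&:size).inspect # [4, 4, 4, 4, 4]
puts 100.0 / clusters.first.size  # 25.0 (ideal % share per connection)
```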

Other things that have impact on the performance

When considering this framework's performance, you need to keep in mind that:

  • It is strongly dependent on what you do in your code
  • It depends also on Apache Kafka performance
  • Connection between Karafka and Redis (for Sidekiq) is a factor as well
  • All the benchmarks show the performance without any business logic
  • All the benchmarks show the performance without enqueuing to Sidekiq
  • It also depends on the type of infrastructure you run the benchmarks on
  • Message size is a factor as well (since messages get parsed as JSON by default)
  • Ruby version: I've been testing it on MRI (CRuby) 2.2.3. Karafka does not yet work with other Ruby implementations (JRuby or Rubinius), but this should change when some of the dependencies stop using refinements

Benchmarking

Methodology

For each of the benchmarks, I measured the time taken to consume all the messages that were stored in Kafka. There was no business logic involved (just message processing by the framework). My local Kafka setup was a default one (no settings were changed), provided by these Docker containers.

I've tested up to 5 topics, each loaded with 1 000 000 messages. Since Karafka lazy-loads params, the benchmark does not include the time needed to unparse the messages. Unparsing performance strongly depends on the parser you pick (JSON by default) and on message size. These benchmarks measure the maximum throughput that we can get while receiving messages.
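
To illustrate why lazy loading keeps parsing time out of the measurements, here is a minimal sketch of params that are parsed only on first access. LazyParams is a hypothetical class written for this example, not Karafka's params implementation.

```ruby
require 'json'

# Hypothetical illustration of lazy params: the raw payload is kept as-is
# and parsed only when someone actually reads from it.
class LazyParams
  def initialize(raw_message)
    @raw_message = raw_message
    @parsed = nil
  end

  def parsed?
    !@parsed.nil?
  end

  def [](key)
    to_h[key]
  end

  def to_h
    @parsed ||= JSON.parse(@raw_message)
  end
end

params = LazyParams.new('{"id": 1}')
puts params.parsed? # false
puts params['id']   # 1
puts params.parsed? # true
```

A benchmark that only receives messages never touches the params, so the JSON parsing cost is never paid.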

Note: all the benchmarking was performed on my Linux laptop with 16GB of RAM and a 4-core i7 processor. During the benchmarking I was performing other tasks that might have had a small impact on the overall results (although nothing heavy).

1 thread

With a single thread it is pretty straightforward: the more controllers we have, the less we can process per controller. There's also a controller context-switching overhead that consumes some of the processing power, allowing us to consume less and less. Switching between controllers seems to consume around 11% of a single controller's performance when we use more than 1 controller in a single-threaded application.

[Screenshot from 2015-11-02 17:50:46]
Context switching between controllers in a single thread will cost us around 1% of the overall performance per additional controller (if you're eager to know what we're planning to do about it, scroll down to the summary). On the one hand, that is a lot; on the other, with a bigger application you should probably run Karafka in multithreaded mode. That way context switching won't be as painful.

2 threads

[Screenshot from 2015-11-02 18:12:37]
Overall performance with 2 threads and 2 controllers shows that we're able to lower the impact of switching, gaining around 1.5-2k requests per second (overall).

3 threads

[Screenshot from 2015-11-02 18:23:13]
5 controllers with 3 threads vs 5 controllers with 1 thread: 7% better performance.

4 threads

[Screenshot from 2015-11-02 18:32:40]

5 threads

[Screenshot from 2015-11-02 18:33:33]

Benchmark results

Summary

The overall performance of a single Karafka process is highly dependent on the way it is used. Because of the GIL, when we receive data from sockets, we can only process incoming messages from a single socket at a time. So in general we're limited to around 30-33k requests per second per process. This means that the bigger the application gets, the slower it works (when we consider total performance per single controller). However, this is only true when we assume that all the topics are always full of messages. Otherwise we don't process, we wait on IO, and Ruby can handle incoming messages from multiple threads. That's why it is worth starting Karafka with a decent concurrency level.
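
The difference between processing and waiting can be seen in a small experiment. In this sketch sleep stands in for a blocking socket read (both release the GIL on MRI while waiting); wait_for_message and timed are hypothetical helpers, and no Kafka connection is involved.

```ruby
# Simulates waiting on a socket; like blocking IO, sleep releases the GIL.
def wait_for_message(seconds)
  sleep(seconds)
  :message
end

# Returns how many seconds the given block took.
def timed
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Four waits one after another vs. four waits in parallel threads.
sequential = timed { 4.times { wait_for_message(0.2) } }
threaded = timed do
  4.times.map { Thread.new { wait_for_message(0.2) } }.each(&:join)
end

puts format('sequential: %.2fs, threaded: %.2fs', sequential, threaded)
```

The threaded run should take roughly as long as a single wait, which is why extra threads pay off whenever topics are not constantly full.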

How can we increase throughput for Karafka applications? Well, for now, we can create multiple partitions for a single topic and spin up multiple Karafka processes. They will then load balance between partitions automatically. This solution has one downside: if we have only a few topics with multiple partitions and the rest with a single one, then some of the threads in Karafka won't perform any work. This will be fixed soon (we're already working on it), when we introduce Karafka process clustering. It will allow us to spin up multiple Karafka processes (in a single cluster), each listening only to a given subset of controllers. That way the overall performance will increase significantly. But still, being able to perform 30k rq/s is not that bad, right? ;)

Copyright © 2025 Closer to Code
