We've finally released a new version of the Karafka framework. Apart from many tweaks and bug fixes, it contains three huge improvements:
- ApplicationWorker and ApplicationController
- Routing engine
- Offset commit after data processing (not only after fetching)
In this article I will focus on the routing engine.
The first version of the Karafka framework routed incoming messages based on controller names. For example:
- videos topic => VideosController
- movies_subtitles => Movies::SubtitlesController
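To make the old naming convention concrete, here is a minimal sketch of how a controller name could be turned into a topic name. Note that `topic_for` is a hypothetical helper and not part of Karafka; the conversion rules are my assumption, inferred only from the two examples above.

```ruby
# Hypothetical helper (not Karafka's code): derives a topic name from a
# controller class name, following the two examples in this article.
def topic_for(controller_name)
  controller_name
    .sub(/Controller\z/, '')   # drop the "Controller" suffix
    .split('::')               # namespaces become separate segments
    .map { |part| part.gsub(/([a-z\d])([A-Z])/, '\1_\2').downcase }
    .join('_')                 # segments are joined into a flat topic name
end

topic_for('VideosController')            # => "videos"
topic_for('Movies::SubtitlesController') # => "movies_subtitles"
```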
There was also a possibility to change that by setting the topic property of a controller:
class SuperController < Karafka::BaseController
  self.topic = :my_data
end
Unfortunately it turned out that this approach had way more disadvantages than advantages:
- Fat, non-isolated routing logic bound tightly to controllers
- No possibility to use inheritance
- No possibility to use same controller logic for multiple routes (at least not directly)
- A lot of non-controller-related logic that had to be there for the sake of consistency of the framework DSL (interchanger, group, etc.)
Luckily those times are gone and now we have a nice shiny routing engine.
Note: If you're not familiar with terms like topic, producer, etc - please refer to an excellent post about that: Kafka for Rubyists - Part 1
Karafka routing basics
Karafka has "Rails-like" routing that allows you to describe how messages from each topic should be handled. Such a separation layer (between topics, controllers and workers) gives you better control over the message "flow".
App.routes.draw do
  topic :example do
    controller ExampleController
  end
end
By default, Karafka requires a topic and a controller. Note that controllers should be provided as classes, not as strings/symbols (like in Rails). In most cases this will be all you need. Everything else will be built automatically under the hood.
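If you're curious how a block-based routing DSL like this can work, here is a toy sketch. It is not Karafka's implementation: `ToyRoutes`, `ToyRouteBuilder` and `ToyRoute` are names I made up for illustration, and the only rule taken from this article is that a controller is required while the other options are optional.

```ruby
# Toy routing DSL for illustration only; all class names are hypothetical.
ToyRoute = Struct.new(:topic, :controller, :group, :worker, :parser, :interchanger)

class ToyRouteBuilder
  def initialize(topic)
    @route = ToyRoute.new(topic)
  end

  # One DSL method per option that can appear inside a topic block.
  %i[controller group worker parser interchanger].each do |option|
    define_method(option) { |value| @route[option] = value }
  end

  def build(&block)
    instance_eval(&block)
    # The controller is the only required option; the rest stay nil here,
    # whereas the real framework fills them in on its own.
    raise ArgumentError, "topic #{@route.topic} needs a controller" unless @route.controller

    @route
  end
end

class ToyRoutes
  attr_reader :routes

  def initialize
    @routes = []
  end

  def draw(&block)
    instance_eval(&block)
    self
  end

  def topic(name, &block)
    @routes << ToyRouteBuilder.new(name).build(&block)
  end
end

class ExampleController; end

toy_routes = ToyRoutes.new.draw do
  topic :example do
    controller ExampleController
  end
end
```

Passing the controller as a class (and not as a string) means a typo fails right when the routes are drawn, not later when the first message arrives.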
Kafka has a flat topic structure. There are no namespaces or nested topics (however, that might change in the future, since there's a discussion about it). It means that your Karafka routes will be flat as well: a single topic block and, within it, all the details on how to handle messages from that topic.
The following options can be used inside a topic block:
- Controller (required)
- Group (optional)
- Worker (optional)
- Parser (optional)
- Interchanger (optional)
Here's an example of how to use all the options together:
App.routes.draw do
  topic :binary_video_details do
    group :composed_application
    controller Videos::DetailsController
    worker Workers::DetailsWorker
    parser Parsers::BinaryToJson
    interchanger Interchangers::Binary
  end
end
A Karafka controller is similar to a Rails controller. It is responsible for handling each incoming message.
The group option corresponds to Kafka's consumer groups. Taken from the Apache Kafka documentation:
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If you're not planning to build multiple applications (not processes) of which only one needs to consume a given message, don't worry about this option at all. Karafka will take care of it for you.
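The delivery rule from that quote can be illustrated with a small toy model. This is only a sketch of the semantics: `ToyTopic` is a made-up class, and picking consumers round-robin per message is my simplification (real Kafka assigns partitions to the members of a group).

```ruby
# Toy model of consumer groups: every group sees every message, but inside
# a group only one consumer instance receives it. Names are hypothetical.
class ToyTopic
  def initialize
    @groups = Hash.new { |hash, name| hash[name] = [] }
    @cursor = Hash.new(0)
  end

  def subscribe(group, consumer)
    @groups[group] << consumer
  end

  def publish(message)
    @groups.each do |group, consumers|
      # Simplification: rotate through the group's consumers per message.
      consumer = consumers[@cursor[group] % consumers.size]
      @cursor[group] += 1
      consumer.call(message)
    end
  end
end

received = Hash.new { |hash, name| hash[name] = [] }
videos = ToyTopic.new
videos.subscribe(:videos_app, ->(message) { received[:videos_app_1] << message })
videos.subscribe(:videos_app, ->(message) { received[:videos_app_2] << message })
videos.subscribe(:stats_app,  ->(message) { received[:stats_app_1] << message })

videos.publish('first')
videos.publish('second')
```

Here both applications see both messages, but the two `:videos_app` instances split them between each other.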
By default, Karafka will build a worker for each of your controllers (so you will have a pair: a controller and a worker). All of these workers will inherit from ApplicationWorker and will share all its settings.
You can override this by providing your own worker. Just keep in mind that if it does not inherit from ApplicationWorker, it won't have the interchanging and unparsing logic and it won't execute your controller #perform code. In cases like that you'll have to implement your business logic inside the worker.
Kafka is a cross-platform tool, so when integrating with other applications you might not receive JSON data (this depends on the producer). Custom parsers allow you to handle any data format you specify. By default, Karafka parses messages with a JSON parser. A custom parser needs to have a #parse method and must raise an error that is a ::Karafka::Errors::ParserError descendant when a problem appears during parsing.
A parsing failure won't stop the application flow. Instead, Karafka will assign the raw message to the :message key of params. That way you can handle the raw message inside the Sidekiq worker (you can implement error detection there, etc.; any "heavy" parsing logic can and should be implemented there).
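Here is a minimal sketch of what such a parser could look like. A few assumptions: I define `parse` as a class-level method because routes receive the parser class itself, `Parsers::KeyValue` and its "key=value;key=value" format are made up for the example, and the `Karafka::Errors::ParserError` stub exists only so the snippet runs without the gem. The `parse_or_keep_raw` helper is also hypothetical; it just mimics the fallback behaviour described above.

```ruby
# Stand-in so the sketch runs without Karafka loaded; in a real application
# the framework provides this error class.
module Karafka
  module Errors
    ParserError = Class.new(StandardError) unless const_defined?(:ParserError, false)
  end
end

module Parsers
  # Hypothetical parser for messages like "id=1;title=Intro".
  class KeyValue
    class ParserError < ::Karafka::Errors::ParserError; end

    def self.parse(message)
      message.split(';').to_h do |pair|
        key, value = pair.split('=', 2)
        raise ParserError, "malformed pair: #{pair.inspect}" if key.to_s.empty? || value.nil?

        [key, value]
      end
    end
  end
end

# Hypothetical helper imitating the described fallback: on a parsing
# failure the raw message ends up under the :message key.
def parse_or_keep_raw(parser, raw)
  parser.parse(raw)
rescue ::Karafka::Errors::ParserError
  { message: raw }
end

parse_or_keep_raw(Parsers::KeyValue, 'id=1;title=Intro')
parse_or_keep_raw(Parsers::KeyValue, 'not a valid payload')
```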
Custom interchangers target issues with non-default (binary, etc.) data that we want to store when we do #perform_async. This data might be corrupted when fetched in a worker (see this issue). With custom interchangers, you can encode/compress data before it is passed to scheduling and decode/decompress it when it gets into the worker.
Note: Interchangers are not the same as parsers. Parsers parse incoming data and convert it to a "Ruby friendly" format. Interchangers allow you to add an additional layer of encoding, so for example your binary data won't be escaped by Sidekiq.
Warning: if you decide to use slow interchangers, they might significantly slow down Karafka.
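As a rough sketch, an interchanger could wrap the params in a text-safe encoding before scheduling and unwrap them in the worker. The method names `load` (before scheduling) and `parse` (inside the worker) are my assumption here, so check them against the Karafka version you use; `Interchangers::Binary` simply reuses the name from the routing example above. The encoding (Marshal plus Base64 via `pack`) is just one cheap option.

```ruby
module Interchangers
  # Sketch only: method names are assumed, not confirmed by this article.
  class Binary
    class << self
      # Called before scheduling: turn params into an ASCII-only string,
      # so binary bytes survive serialization in the job queue.
      def load(params)
        [Marshal.dump(params)].pack('m0')
      end

      # Called inside the worker: restore the original params.
      def parse(encoded)
        Marshal.load(encoded.unpack1('m0'))
      end
    end
  end
end

params  = { 'payload' => "\xFF\x00\xAB".b }
encoded = Interchangers::Binary.load(params)
decoded = Interchangers::Binary.parse(encoded)
```

Keep in mind that Marshal should only be used with data you trust, and that every extra encoding step runs for each message, which is exactly why the warning about slow interchangers applies.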
Karafka's routing engine might seem simple, but it stays simple only until you decide to use all the non-required features.
Thanks to this approach you can prototype and design your application flow really fast, and when needed you can still handle any non-standard case.
Is the current routing engine a final one? To be honest it is strongly dependent on Apache Kafka's development. We will see in the future :-)