Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.
Table of Contents
- 1 Code quality
- 2 Features
- 2.1 Batch processing
- 2.2 New routing engine and multiple topic consumer groups
- 2.3 #topic reference on a controller level
- 2.4 #params_batch messages with additional Kafka message details
- 2.5 #params_batch and #params lazy evaluation
- 2.6 Long running persistent controllers
- 2.7 Encryption and authentication using SSL and SASL support
- 2.8 Limited consumer groups execution from a single process
- 2.9 Multi process management thanks to Capistrano-Karafka
- 2.10 Processing backends (Inline and Sidekiq)
- 2.11 JRuby support
- 3 Incompatibilities
- 4 Wiki updates
- 5 Other changes
- 6 Getting started with Karafka
It's been over a year since the last major Karafka framework release (0.5). During that time, we've managed to implement plenty of new features and fix so many bugs that I don't know where to start...
Today I'm pleased to announce that we're ready with the final 1.0 release.
Code quality
The quality of our work has always been important to us. A few months ago, we made the transition from polishgeeks-dev-tools to Coditsu. It allowed us to find and fix several new code offenses and to raise the quality of both the code and the documentation. Here are some screenshots showing where we were and where we are now:
There are still some things to be fixed and improved. That said, this is the best release we've made, not only in terms of features but also in terms of code and documentation quality.
For more details about the quality of the whole Karafka ecosystem, feel free to visit our Karafka Coditsu organization page.
Features
There are more and more companies taking advantage of Karafka as their backend async messaging backbone. Many of the new features were either feature requests or pull requests (including some from Shopify and other big players) that covered both performance and functionality issues in Karafka. It's amazing to see all the use cases that people cover with this framework.
Batch processing
Believe it or not, up until now Karafka didn't have batch processing functionality. It had a batch receiving option, but each of the messages had to be processed separately. In the beginning, we wanted to imitate the HTTP world, where (most of the time) a single request equals a single controller instance logic execution.
It wasn't the best idea ever. Or maybe it was at the time, but we've clearly noticed that it took away a huge part of the possibilities that Kafka brings to the table.
Luckily those days are gone! From now on you can not only receive messages in batches (which makes Karafka several times faster), but you can also process them that way. The only thing you need to do is set the `batch_processing` config option to `true`.
You can do this either on an app configuration level:
```ruby
class App < Karafka::App
  setup do |config|
    config.batch_consuming = true
    config.batch_processing = true
    # Other options
  end
end
```
or per each topic route you define:
```ruby
App.routes.draw do
  consumer_group :events_consumer do
    batch_consuming true

    topic :events_created do
      controller EventsCreatedController
      backend :inline
      batch_processing true
    end
  end
end
```
Once you turn this option on, you will have access to a method called `#params_batch` that contains all the messages fetched from Kafka in a single batch.
It's worth pointing out that a single message batch always contains messages from the same topic and the same partition.
```ruby
class EventsController < ApplicationController
  def perform
    # This example uses https://github.com/zdennis/activerecord-import
    Event.import params_batch.map { |param| param[:event] }
  end
end
```
Keep in mind that `params_batch` is not just a simple array. The messages inside are lazily parsed upon first usage, so you shouldn't flush them directly into the DB.
Note: For more details about processing messages, please visit the Processing messages section of the Karafka wiki.
New routing engine and multiple topic consumer groups
The routing engine provides an interface to describe how messages from all the topics should be received and processed.
The Karafka routing engine used to be trivial. The only thing you could really do was define topics and their options. From now on, there are two modes in which the routing API can operate:
- Karafka 0.6+ consumer group namespaced style (recommended)
- Karafka 0.5 compatible consumer group per topic style (old style)
With the 0.6+ mode, you can define consumer groups subscribed to multiple topics. This allows you to group topics based on your use cases and other factors. It also enables overwriting most of the default settings, in case you need a per consumer group specific setup (for example, to receive data from multiple Kafka clusters).
```ruby
App.consumer_groups.draw do
  consumer_group :group_name do
    topic :example do
      controller ExampleController
    end

    topic :example2 do
      controller Example2Controller
    end
  end
end
```
Note: For more details about routing, please visit the Routing section of the Karafka wiki.
#topic reference on a controller level
There are several changes related to the topic itself. The biggest one is its assignment to a controller class instead of a controller instance. This may not seem significant, but it is. It means that you should no longer use the same controller for handling multiple topics. You can still use `#topic` from your controller instance (no need to do `self.class.topic`) - it's just an alias.
The second big change is that a topic now references its owning consumer group. This allows you to discover and programmatically access all the routing details you need just by playing with the topic and consumer group objects:
```ruby
# From the controller instance level
topic.consumer_group.class #=> Karafka::Routing::ConsumerGroup
topic.consumer_group.name #=> 'commit_builds'
topic.name #=> 'commit_builds_scheduled'

# From the console / outside of the controller scope
App.consumer_groups.count #=> 3
App.consumer_groups.first.name #=> 'commit_builds'
App.consumer_groups.first.topics.count #=> 5
```
#params_batch messages with additional Kafka message details
Each Kafka message you receive now contains the following extra attributes received from Kafka:
- partition
- offset
- key
- topic
IMHO the most interesting one is the partition key, which can be used when applying ordered changes to any persistent models (the key can be used to ensure in-order delivery via Kafka's guaranteed partition ordering):
```ruby
def perform
  params_batch.each do |param|
    User.find(param[:key]).update!(param[:user])
  end
end
```
#params_batch and #params lazy evaluation
`params_batch` is not just a simple array. The messages inside are lazily parsed upon first usage, so you shouldn't flush them directly into the DB. To do so, please use the `#parsed` params batch method to parse all the messages:
```ruby
class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.parsed)
  end
end
```
Parsing will also be performed automatically if you decide to map parameters (or use any other Enumerable module method):
```ruby
class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.map { |param| param[:user] })
  end
end
```
For performance reasons, Karafka does not parse all the messages at once. There are cases in which you might be interested only in the last message of a batch, and on such occasions it would be wasteful to parse everything.
You can use this feature to prefilter unparsed data based on the partition, topic or any other non-payload attribute:
```ruby
def perform
  # In this example, we will ignore data of non-existing users
  # without even parsing their details.
  # Casting to an array disables the automatic parsing upon iterating,
  # so when we decide to fetch user data, we need to use the #retrieve! method
  ids_from_partition_key = params_batch.to_a.map { |param| param[:key] }
  existing_users_ids = User.where(id: ids_from_partition_key).pluck(:id)

  params_batch.to_a.each do |param|
    param[:parsed] #=> false
    next unless existing_users_ids.include?(param[:key])

    # Some heavy parsing happens here
    param.retrieve!
    param[:parsed] #=> true
    User.find(param[:key]).update!(param[:user])
  end
end
```
Long running persistent controllers
Karafka used to create a single controller instance for each received message. This was one of the reasons why it had quite a big memory footprint. From now on (unless disabled via the `persistent` config flag), Karafka will create and use a single object per topic partition up until shutdown.
This change not only reduces memory and CPU usage, but also allows cross-batch aggregations. One of the use cases could be normalization of the batch insert process, so the DB flushing is performed only when we reach a certain buffer size:
```ruby
class EventsController < ApplicationController
  FLUSH_SIZE = 1000

  def perform
    # Note: #concat (not #<<) so we buffer events, not arrays of events
    buffer.concat(params_batch.map { |param| param[:event] })

    if buffer.size >= FLUSH_SIZE
      data = buffer.shift(FLUSH_SIZE)
      Event.import(data)
    end
  end

  private

  def buffer
    @buffer ||= []
  end
end
```
Note: the example above is simplified. You'd probably also want to flush the buffer upon process shutdown.
Encryption and authentication using SSL and SASL support
Karafka uses the ruby-kafka driver to talk to Kafka. Now you can embrace all of its encryption and authentication features. All the related configuration options are described here.
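To make this more concrete, here's a minimal sketch of what an SSL + SASL plain setup could look like. The option names below (`ssl_ca_cert`, `sasl_plain_username`, etc.) are assumptions mirroring ruby-kafka's own arguments - please verify the exact names against the linked configuration page:

```ruby
# Sketch only - option names assumed to mirror ruby-kafka's arguments
class App < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka1.example.com:9093]
    # Encryption: CA certificate used to verify the brokers
    config.kafka.ssl_ca_cert = File.read('ca_cert.pem')
    # Authentication: SASL plain credentials
    config.kafka.sasl_plain_username = ENV['KAFKA_USERNAME']
    config.kafka.sasl_plain_password = ENV['KAFKA_PASSWORD']
  end
end
```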
Limited consumer groups execution from a single process
One of the biggest downsides of Karafka 0.5 was its inability to scale on a per consumer group basis. Each server process was spinning up all the consumer groups from the routing. This was OK for smaller applications, but it was not enough for bigger systems. The Karafka 1.0 server allows you to specify which consumer groups you want to run in a given process. This means you can easily scale your infrastructure together with your Kafka traffic.
Given a set of consumer groups like this one:
```ruby
App.consumer_groups.draw do
  consumer_group :events do
    # events related topics definitions
  end

  consumer_group :users do
    # users related topics definitions
  end

  consumer_group :webhooks do
    # webhooks related topics definitions
  end
end
```
you can now run them all together:
```bash
# Equals to: bundle exec karafka server --consumer-groups=events users webhooks
bundle exec karafka server
```
in separate processes:
```bash
bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users --daemon --pid=./pids/karafka1.pid
bundle exec karafka server --consumer-groups=webhooks --daemon --pid=./pids/karafka2.pid
```
or in a mixed mode, where some of the processes run multiple groups:
```bash
bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users webhooks --daemon --pid=./pids/karafka1.pid
```
Multi process management thanks to Capistrano-Karafka
In reference to the previous feature, Capistrano-Karafka has been updated as well. It now supports a multi-process deployment flow in which each process can run a single consumer group or many of them:
```ruby
# Exemplary Capistrano deployment Karafka definitions
set :karafka_role, %i[karafka_small karafka_big]

set :karafka_small_processes, 1
set :karafka_small_consumer_groups, %w[
  group_a
]

set :karafka_big_processes, 4
set :karafka_big_consumer_groups, [
  'group_a group_b',
  'group_c group_d',
  'group_e',
  'group_f'
]

server 'example-small.com', roles: %i[karafka_small]
server 'example-big.com', roles: %i[karafka_big]
```
Processing backends (Inline and Sidekiq)
Karafka is no longer bound to Sidekiq. There are cases in which Sidekiq can be really helpful when processing messages (reentrancy, thread scaling, etc.), however for many others it was just a redundancy (receiving from one queue and pushing back into another one). The default processing mode for Karafka 1.0 is the `:inline` mode. It means that messages are processed right after they are fetched from Kafka.
If you want to process your Kafka messages automatically in Sidekiq (without having to worry about workers or anything else), please visit the Karafka-Sidekiq-Backend README.
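For illustration, reusing the routing DSL shown earlier, per-topic backend selection could look like this (the `:sidekiq` value assumes the karafka-sidekiq-backend gem is installed):

```ruby
App.consumer_groups.draw do
  consumer_group :events do
    topic :events_created do
      controller EventsCreatedController
      backend :inline # processed right after fetching from Kafka
    end

    topic :events_archived do
      controller EventsArchivedController
      backend :sidekiq # offloaded to Sidekiq via karafka-sidekiq-backend
    end
  end
end
```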
JRuby support
Thanks to a few small changes, Karafka can now be executed with JRuby 9000.
Incompatibilities
Moving forward means that, from time to time, you need to introduce incompatibilities. There were some breaking changes, but the upgrading process shouldn't be that hard. We will cover it in a separate article soon. Here are the most important incompatibilities you might encounter during the upgrade:
- Default boot file has been renamed from `app.rb` to `karafka.rb`
- Removed worker-glass as a dependency (it's now an independent gem - if you use it, you need to add it to your Gemfile)
- `kafka.hosts` option renamed to `kafka.seed_brokers` - you don't need to provide all the hosts to work with Kafka
- `start_from_beginning` setting moved into the kafka scope (`kafka.start_from_beginning`)
- Router no longer checks for route uniqueness - now you can define the same routes for multiple Kafka clusters and do a lot of crazy stuff, so it's your responsibility to ensure uniqueness
- Changed the way we identify topics between Karafka and Sidekiq workers. If you upgrade, please make sure all the jobs scheduled in Sidekiq have finished before the upgrade.
- `batch_mode` renamed to `batch_consuming`
- Renamed the `#params` content key to `value` to better resemble ruby-kafka's internal message naming convention
- Renamed `inline_mode` to `inline_processing` to resemble other settings conventions
- Renamed `inline_processing` to `backend`
- A single controller needs to be used for a single topic consumption
- Renamed `before_enqueue` to `after_received` to better resemble the internal logic, since for the inline backend there is no enqueuing
- Due to the level on which topic and controller are related (class level), dynamic worker selection is no longer available
- Renamed params `#retrieve` to params `#retrieve!` to better reflect how it works
- Sidekiq backend needs to be added as a separate gem (Karafka no longer depends on it)
Wiki updates
We've spent long hours ensuring that our wiki is complete and consistent. We've added several new pages, including:
- Getting started
- Configuration
- Integrating with Ruby on Rails and other frameworks
- Consuming messages
- Processing messages
- Responders
- Karafka Sidekiq Backend
Other changes
Lower memory usage
We've managed to reduce the number of newly allocated objects by around 70%. Karafka no longer creates as many objects for each received message and message batch as it used to. It also depends on fewer gems and requires far fewer additional libraries, so the overall memory consumption is significantly lower.
Better settings management between ruby-kafka and karafka
We've reorganized the whole concept of passing settings between Karafka and ruby-kafka, so we can adapt faster if anything changes. The internal API is also much cleaner and easier to understand.
Dry-validation FTW
All internal validations are now powered by dry-validation schemas.
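As a quick illustration (a hypothetical schema, not Karafka's actual internal one), a dry-validation schema lets invalid settings be rejected before the app boots:

```ruby
require 'dry-validation'

# Hypothetical schema for illustration only
ConfigSchema = Dry::Validation.Schema do
  required(:client_id).filled(:str?)
  required(:seed_brokers).filled(:array?)
end

ConfigSchema.call(client_id: 'my_app', seed_brokers: []).errors
#=> {:seed_brokers=>["must be filled"]}
```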
multi_json
In order to support different Ruby implementations, we've decided to use the multi_json gem, so anyone can pick the most suitable JSON parser they need.
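Here's what multi_json's adapter switching looks like in practice (the choice of :oj below is just an example):

```ruby
require 'multi_json'

# Pick whichever adapter suits your Ruby implementation
MultiJson.use :oj # e.g. :json_gem or :yajl on other platforms

MultiJson.load('{"event":"created"}') #=> {"event"=>"created"}
MultiJson.dump('event' => 'created')  #=> "{\"event\":\"created\"}"
```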
Getting started with Karafka
If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:
```bash
git clone https://github.com/karafka/karafka-example-app ./example_app
```
then, just bundle install all the dependencies:
```bash
cd ./example_app
bundle install
```
and follow the instructions from the example app Wiki.