Running with Ruby

Tag: apache kafka (page 1 of 3)

Karafka (Ruby + Kafka) framework 1.0.0 Release Notes

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

It’s been over a year, since the last major Karafka framework release (0.5). During that time, we’ve managed to implement plenty of new features and fix so many bugs, that I don’t know where to start…

Today I’m pleased to announce, that we’re ready with the final 1.0 release.

Code quality

The quality of our work has always been important to us. Few months ago we’ve made a transition from polishgeeks-dev-tools to Coditsu. It allowed us to find and fix several new code offenses and to leverage the quality of the code and documentation. Here are some screens on where we were and where we are now:

There are still some things to be fixed and made better, That said, this is the best release we’ve made not only in terms of features but also in terms of quality of the code and the documentation.

For more details about the quality of the whole Karafka ecosystem, feel free to visit our Karafka Coditsu organization page.


There are more and more companies taking advantage of Karafka as their backend async messaging backbone. Many of the new features were either feature requests or pull requests (including some from Shopify and other big players), that cover both performance and functionality issues existing in Karafka. It’s amazing looking into all the use-cases that people cover with this framework.

Batch processing

Believe it or not, but up until now, Karafka didn’t have batch processing functionality. It had batch messages receiving option, but each of the messages had to be processed separately. At the beginning we wanted to imitate the HTTP world, where (most of the time) a single request would equal a single controller instance logic execution.

It wasn’t the best idea ever. Or maybe it was at the time, but we’ve clearly noticed, that it took away a huge part of possibilities that Kafka brings to the table.

Luckily those days are gone! From now on you can not only receive messages in batches (which makes Karafka several times faster), but you can also process them that way. The only thing you need to do is set the batch_processing config option to true.

You can do this either on an app configuration level:

class App < Karafka::App
  setup do |config|
    config.batch_consuming = true
    config.batch_processing = true
    # Other options

or per each topic route you define:

App.routes.draw do
  consumer_group :events_consumer do
    batch_consuming true

    topic :events_created do
      controller EventsCreatedController
      backend :inline
      batch_processing true

Once you turn this option on, you will have access to a method called #params_batch that will contain all the messages fetched from Kafka in a single batch.

It’s worth pointing out, that a single messages batch always contains messages from the same topic and the same partition.

class EventsController < ApplicationController
  def perform
    # This example uses
    Event.import { |param| param[:event] }

Keep in mind, that params_batch is not just a simple array. The messages inside are lazy parsed upon first usage, so you shouldn’t directly flush them into DB.

Note: For more details about processing messages, please visit the Processing messages section of Karafka wiki.

New routing engine and multiple topic consumer groups

Routing engine provides an interface to describe how messages from all the topics should be received and processed.

Karafka routing engine used to be trivial. The only thing you could really do, was defining topics and their options. From now on, there are two modes in which routing API can operate:

  • Karafka 0.6+ consumer group namespaced style (recommended)
  • Karafka 0.5 compatible consumer group per topic style (old style)

With 0.6+ mode, you can define consumer groups subscribed to multiple topics. This will allow you to group topics based on your use-cases and other factors. It also enables overwriting most of the default settings, in case you need to create a per consumer group specific setup (for example to receive data from multiple Kafka clusters).

App.consumer_groups.draw do
  consumer_group :group_name do
    topic :example do
      controller ExampleController

    topic :example2 do
      controller Example2Controller

Note: For more details about processing messages, please visit the Routing section of Karafka wiki.

#topic reference on a controller level

There are several changes related to the topic itself. The biggest one, is its assignment to a controller class, not to a controller instance. This may not seem significant, but it is. It means, that you no longer should use same controller for handling multiple topics. You can still use #topic from your controllers instance (no need to do self.class.topic) – it’s just an alias.

The second big change, is the topic owning consumer group that you can reference as well from the topic. This allows you to discover and programmatically access all the routing details you need just by playing with the topic and consumer group objects:

# From the controller instance level
topic.consumer_group.class #=> Karafka::Routing::ConsumerGroup #=> 'commit_builds' #=> 'commit_builds_scheduled'

# From the console / outside of the controller scope
App.consumer_groups.count #=> 3 #=> 'commit_builds'
App.consumer_groups.first.topics.count #=> 5

#params_batch messages with additional Kafka message details

Each Kafka message you receive, contains now following extra attributes received from Kafka:

  • partition
  • offset
  • key
  • topic

IMHO the most interesting one is the partition key, that can be used when applying ordered changes to any persistent models (key can be used to ensure proper order delivery via Kafka guaranteed partition order feature):

def perform
  params_batch.each do |param|

#params_batch and #params lazy evaluation

params_batch is not just a simple array. The messages inside are lazy parsed upon first usage, so you shouldn’t directly flush them into DB. To do so, please use the #parsed params batch method to parse all the messages:

class EventsController < ApplicationController
  def perform

Parsing will be automatically performed as well, if you decide to map parameters (or use any Enumerable module method):

class EventsController < ApplicationController
  def perform { |param| param[:user] })

Karafka does not parse all the messages at once due to performance reasons. There are cases in which you might be interested only in the last message in a batch. It would be useless on such occasions to parse everything there is.

You can use this feature to prefilter unparsed data based on partition, topic or any other non-data related aspects:

def perform
  # In this example, we will ignore non-existing users data
  # without even unparsing their details.
  # Casting to an array will disable the automatic parsing upon iterating,
  # so when we decide to fetch user data, we need to use the #retrieve method
  ids_from_partition_key = { |param| param[:key] }
  existing_users_ids = User.where(id: ids_from_partition_key).pluck(:id)

  params_batch.to_a.each do |param|
    param[:parsed] #=> false
    next unless existing_users_ids.include?(param[:key])
    # Some heavy parsing happens here
    param[:parsed] #=> true
    User.where(id: param[:key]).update!(param[:user])

Long running persistent controllers

Karafka used to create a single controller instance per each received message. This was one of the reasons why it had a quite big memory fingerprint. From now on (if not disabled by the config persistent flag), Karafka will create and use a single object for each topic partition up until its shutdown.

This change not only reduces memory and CPU usage, but also allows to do cross-batch aggregations. One of the use-cases could be normalization of the batch insert process, so the DB flushing is performed only when we reach a certain buffer size:

class EventsController < ApplicationController
  FLUSH_SIZE = 1000

  def perform
    buffer << { |param| param[:event] }
    if buffer.size >= FLUSH_SIZE
      data = buffer.shift(FLUSH_SIZE)


  def buffer
    @buffer ||= []

Note: example above is simplified. You probably want to cover flushing buffer also in a case of process shutdown.

Encryption and authentication using SSL and SASL support

Karafka uses ruby-kafka driver to talk with Kafka. Now you can embrace all its encryption and authentication features. All the related configuration options are described here.

Limited consumer groups execution from a single process

One of the biggest downsides of Karafka 0.5 was its lack of ability to do a per consumer group scaling. Each server process was spinning up all the consumer groups from the routing. This was OK for smaller applications, but it was not enough for bigger systems. Karafka 1.0 server allows you to specify which consumer groups you want to run in a given process. This means you can easily scale your infrastructure together with your Kafka traffic.

Given set of consumer groups like this one:

App.consumer_groups.draw do
  consumer_group :events do
    # events related topics definitions

  consumer_group :users do
    # users related topics definitions

  consumer_group :webhooks do
    # webhooks related topics definitions

can now run all together:

# Equals to bundle exec karafka server --consumer-groups=events users webhooks
bundle exec karafka server

in separate processes:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/
bundle exec karafka server --consumer-groups=users --daemon --pid=./pids/
bundle exec karafka server --consumer-groups=webhooks --daemon --pid=./pids/

or in a mixed mode, where some of the processes run multiple groups:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/
bundle exec karafka server --consumer-groups=users webhooks --daemon --pid=./pids/

Multi process management thanks to Capistrano-Karafka

In reference to the previous feature, Capistrano-Karafka has been updated as well. It now supports multi-process, multi and single group process deployment flow:

# Exemplary Capistrano deployment Karafka definitions
set :karafka_role, %i[karafka_small karafka_big]

set :karafka_small_processes, 1
set :karafka_small_consumer_groups, %w[

set :karafka_big_processes, 4
set :karafka_small_consumer_groups, [
  'group_a group_b',
  'group_c group_d',

server '', roles: %i[karafka_small]
server '', roles: %i[karafka_big]

Processing backends (Inline and Sidekiq)

Karafka is no longer bound to Sidekiq. There are cases in which Sidekiq can be really helpful when processing messages (reentrancy, thread scaling, etc), however for many other it was just a redundancy (receiving from one queue and pushing back to another). The default processing mode for Karafka 1.0 is an :inline mode. It means that processing of messages will happen right after they are fetched from Kafka.

If you want to process your Kafka messages automatically in Sidekiq (without having to worry about workers or anything else), please visit the Karafka-Sidekiq-Backend README.

JRuby support

Thanks to few small changes, Karafka can be executed with JRuby 9000.


Moving forward means, that from time to time, you need to introduce some incompatibilities. There were some breaking changes, but the upgrading process shouldn’t be that hard. We will cover it in a different article soon. Here are the most important incompatibilities you might encounter during the upgrade:

  • Default boot file has been renamed from app.rb to karafka.rb
  • Removed worker glass as dependency (now and independent gem – if you use it, you need to add it to your gemfile)
  • kafka.hosts option renamed to kafka.seed_brokers – you don’t need to provide all the hosts to work with Kafka
  • start_from_beginning setting moved into kafka scope (kafka.start_from_beginning)
  • Router no longer checks for route uniqueness – now you can define same routes for multiple kafkas and do a lot of crazy stuff, so it’s your responsibility to check uniqueness
  • Change in the way we identify topics in between Karafka and Sidekiq workers. If you upgrade, please make sure, all the jobs scheduled in Sidekiq are finished before the upgrade.
  • batch_mode renamed to batch_consuming
  • Renamed #params content key to value to better resemble ruby-kafka internal messages naming convention
  • Renamed inline_mode to inline_processing to resemble other settings conventions
  • Renamed inline_processing to backend
  • Single controller needs to be used for a single topic consumption
  • Renamed before_enqueue to after_received to better resemble internal logic, since for inline backend, there is no enqueuing.
  • Due to the level on which topic and controller are related (class level), the dynamic worker selection is no longer available.
  • Renamed params #retrieve to params #retrieve! to better reflect how it works.
  • Sidekiq backend needs to be added as a separate gem (Karafka no longer depends on it)

Wiki updates

We’ve spent long hours to ensure, that our wiki is complete and consistent. We’ve added several new pages, including:

Other changes

Lower memory usage

We’ve managed to reduce number of new allocated objects down by around 70%. Karafka no longer creates so many objects for each received message and message batch as it used to. It also depends on less gems and requires much less additional libraries, so the overall memory consumption is significantly lower.

Better settings management between ruby-kafka and karafka

We’ve reorganized the whole concept of passing settings in betwen Karafka and ruby-kafka to be able to faster adapt if anything changes. The internal API is also much cleaner and easier to understand.

Dry-validation FTW

All internal validations are now powered by dry-validation schemas.


In order to support different Ruby implementations, we’ve decided to use multi_json gem, so anyone can pick the most suitable JSON parser he needs.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.

Karafka (Ruby + Kafka framework) 0.5.0 release details

I’m proud to announce that we’ve released a new shiny version of Karafka: Framework used to simplify Apache Kafka based Ruby applications development.

In this article I will cover all the important changes and new features that you will be able to use.

Karafka? But what is it?

Karafka provides a higher-level abstraction than raw Kafka Ruby driver. Instead of focusing on single topic consumption, it provides developers with a set of tools that are dedicated for building multi-topic applications similarly to how Rails applications are being built.

Important changes

Here are the most important changes that were made (whole list available in the changelog):

  • Responders for nicer pipelining and better responses flow description and control
  • Automatic Capistrano integration
  • karafka flow CLI command for printing the application flow
  • Moved from Poseidon into Ruby-Kafka
  • Zookeeper (ZK) no longer as a dependency
  • Automatic thread management (no need for tunning) – each topic is a separate actor/thread
  • Manual consuming is no longer available (no m ore karafka consume command)
  • We’re finally on RubyGems
  • karafka topics no longer available
  • Ruby 2.2.* support dropped

Responders concept for nicer pipelining and better responses flow description and control

Change that we’re most proud of. A brand new, shiny concept that allows you to design applications that not only receive messages but can respond to them after performing your business logic!

Here’s a quick example (before I get into details) on how this concept looks:

class UsersCreateController < KarafkaController
    def perform
      respond_with User.create!(params[:user])

class UsersCreateResponder < ApplicationResponder
  topic :users_created, required: true
  topic :users_premium_created, optional: true

  def process(user)
    respond_to :users_created, user
    respond_to :users_premium_created, user if user.premium?

It’s no longer HTTP. It’s no longer a single response

Most web programmers are used to a nice and predictable flow: single request, single response. It is easy to test, it works and in many cases it is enough. Unfortunately the more complex IT software gets, the more often it is not enough. And that’s exactly why we’ve implemented the Responders concept.

Now you will have a single entry-point (topic’s message) but you can generate response on as many topics as you want in reaction to what you did with the incoming data.

Well I could do that already with Karafka + Producer client

That is true. However without clear boundaries on what should happen where, less experienced programmers will start developing less stable, more complicated apps. When you do that manually, there’s also no validation layer that will ensure that you responses were sent to where you wanted them in the first place.

Where can I use it?

Karafka responders allow you to build more reliable Ruby+Kafka SOA systems, that not only receive messages but also generate messages based on results of the app business logic. They work great in distributed systems that are built using SOA. Having Kafka as a message bus gives you a unique opportunity to treat your whole ecosystem like a huge pipeline. You can accept messages and preprocess them in one app. Then send the results into many other systems. The best thing is that apart from the topic (or topics) to which you send your results, your preprocesing app does not need to know anything about other applications.


Approach like that is great when you have event based systems that need to take many actions upon a single event. Let’s assume for a second that you have a huge monolithic application that upon a new user registration:

  • sends him an email
  • connects to his server (via SSH or anything else)
  • downloads some informations
  • analyzes received data
  • sends a second email with the results
  • sends a text-message to someone else if the analysis results are bad (whatever that might mean)

Wouldn’t it be better to split such an app into couplse sub-apps that would be parts of such a flow?


Here’s an example from one of the apps that I work with, with such a flow (input => output) generated using karafka flow CLI command:

repositories_created =>
  - sources_created:      (always, exactly once)
repositories_deleted =>
  - sources_deleted:      (always, exactly once)
repositories_updated =>
  - sources_updated:      (always, exactly once)
sources_refresh =>
  - sources_refreshed:    (conditionally, exactly once)
  - sources_disconnected: (conditionally, exactly once)

Responders API

The only thing that happens outside of a responder object, is it’s controller invocation using #respond_with method. It accepts any number of arguments. Input of this method is forwarded to a proper responder.

class UsersCreateController < KarafkaController
  def perform
    respond_with User.create!(params[:user])
Registering responder

Karafka needs to be aware of which responder to use for a given topic. This can be achieved in two ways:

Registering responder for a given route:

class App < Karafka::App
  routes.draw do
    topic :repositories_created do
      controller Repositories::CreatedController
      responder Repositories::CreatedResponder

or by naming responder class as your controller class but with “Responder” postfix:

UsersCreatedController => UsersCreatedResponder
UserActionsController => UserActionsResponder

If you follow this naming convention, you won’t have to register responders at all. They will be assigned when a proper controller is being used.

Registering topic

Each topic that is going be to used needs to be register. It happens when responder class is defined using .topic method:

class RefreshResponder < ApplicationResponder
  topic :sources_refreshed, required: true # true by default
  topic :sources_disconnected, required: false

.topic method accepts a second argument that allows to set two flages:

  • required – Should we raise an error when a topic was not used (if required)
  • multiple_usage – Should we raise an error when during a single response flow we sent more than one message to a given topic
Responding on topic

Once you register topics you want to work with, there’s only one more thing to do: implement the #perform method in which you implement your responding logic. This method should accept exactly the same number of arguments as passed during the #respond_with invokation:

class CreatedResponder < ApplicationResponder
  topic :sources_created

  def respond(source)
    respond_to :sources_created, source

In this method (as seen above) you can respond to a single registered topic using the #respond_to method. Note that if you pass something else than a string into it, Karafka will try to run a #to_json method on passed object. It means, that if you want to limit what is being send, you need to do this by yourself:

def respond(source)
  respond_to :sources_created, source.to_json(only: %i( id status ))

For topics defined with multiple_usage flag set to true, you can invoke the #respond_to method multiple times:

class RegisteredTopicsResponder < ApplicationResponder
  topic :new_topics, multiple_usage: true

  def respond(topics)
    topics.each do |topic|
      respond_to :new_topics, topic

Karafka will raise an error when you’ll:

  • Try to use unregistered topic
  • Try to use multiple times topic that was not registered with a multiple_usage flag
  • Forget to use topic registered as a required one

That way you can control your flow much better.

Automatic Capistrano integration

Second most important change from a programmer perspective! Karafka has now a new and shiny Capistrano out-of-the-box integration. In order to make it work, just drop this line into your Capfile:

require 'karafka/capistrano'

Following Capistrano tasks are available:

cap karafka:restart                # Restart Karafka
cap karafka:start                  # Start Karafka
cap karafka:status                 # Status Karafka
cap karafka:stop                   # Stop Karafka

Hooks are added by default, so you really don’t need to do anything.

If you want to personalize the flow, you can always disable default hooks by setting karafka_default_hooks in Capistrano to false:

set :karafka_default_hooks, false

Here are all the Capistrano configuration options with their defaults:

set :karafka_default_hooks, -> { true }
set :karafka_env, -> { fetch(:karafka_env, fetch(:environment)) }
set :karafka_pid, -> { File.join(shared_path, 'tmp', 'pids', '') }

New CLI command for printing the application flow

New low-level Kafka driver – Ruby-Kafka

Poseidon that Karafka used is no longer maintained. It also lacks many features that were introduced in Kafka 0.9, so we had to replace it. With what? Obviously with new and shiny Ruby-Kafka gem. What does it mean for you as a Karafka user? Many things

  • No more Kafka 0.8 support
  • Better balancing for topics and partitions
  • Support of Zendesk :-)
  • Better performance
  • Different thread management setup
  • No need for ZK as a dependency
  • Automatic cluster discovery from a single Kafka node

It’s worth pointing out, that the GIL in MRI is removed for IO operations, so by using separate threads we can be truly parallel.

Bye bye Zookeeper

Zookeeper is no longer a Karafka dependency. It has few drawbacks, but on the other hand it allowed us to remove ZK gem and all our auto-discovery login. Things work now way better.

Threads? I don’t want to care about that!

And you no longer have to. We’ve redesigned thread management completely and now a single connection has a long-running thread in which we constantly listen to a given topic. It means that there is no delay on receiving messages from multiple topics. You just don’t have to worry about that anymore.


Will it work for bigger apps? Well we’re currently running on production one that listens almost 100 topics and it works great!

No more manual consumption

Supporting manual consumption that could be incorporated into a rake task or a Sidekiq worker is no longer in Karafka. We’ve decided that it breaks the one of the concepts that is behind Karafka which is consuming messages as soon as possible.

It also means that following CLI command is no longer available:

bundle exec karafka consume

Hello RubyGems!

Some of you noticed, that Karafka is not being released on RubyGems. That is not true anymore. Karafka 0.5.0 is the first release also sent to RubyGems. Now you can do both:

gem install karafka

and in your Gemfile you can finally:

# replace this
# gem 'karafka', github: 'karafka/karafka'
# with
gem 'karafka'

Goodbye karafka topics command!

We no longer use Zookeeper, so we’re no longer able to list all the topics using it. That means that:

bundle exec karafka topics

Is removed and no longer available.

Goodbye Ruby 2.2.*

Karafka is using some Ruby 2.3 syntactic sugar, which means that you won’t be able to install and use Karafka if you run Ruby 2.2 or older.


It’s over a year, since the first Karafka commit and almost a year since the first working release and I can say that this release is one of the biggest we had. Kafka is getting more and more popular and hopefully Karafka will follow the same path :)


Copyright © 2017 Running with Ruby

Theme by Anders NorenUp ↑