Running with Ruby

Tag: karafka framework (page 1 of 4)

Karafka (Ruby + Kafka) framework 1.0.0 Release Notes

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

It’s been over a year, since the last major Karafka framework release (0.5). During that time, we’ve managed to implement plenty of new features and fix so many bugs, that I don’t know where to start…

Today I’m pleased to announce, that we’re ready with the final 1.0 release.

Code quality

The quality of our work has always been important to us. Few months ago we’ve made a transition from polishgeeks-dev-tools to Coditsu. It allowed us to find and fix several new code offenses and to leverage the quality of the code and documentation. Here are some screens on where we were and where we are now:

There are still some things to be fixed and made better, That said, this is the best release we’ve made not only in terms of features but also in terms of quality of the code and the documentation.

For more details about the quality of the whole Karafka ecosystem, feel free to visit our Karafka Coditsu organization page.

Features

There are more and more companies taking advantage of Karafka as their backend async messaging backbone. Many of the new features were either feature requests or pull requests (including some from Shopify and other big players), that cover both performance and functionality issues existing in Karafka. It’s amazing looking into all the use-cases that people cover with this framework.

Batch processing

Believe it or not, but up until now, Karafka didn’t have batch processing functionality. It had batch messages receiving option, but each of the messages had to be processed separately. At the beginning we wanted to imitate the HTTP world, where (most of the time) a single request would equal a single controller instance logic execution.

It wasn’t the best idea ever. Or maybe it was at the time, but we’ve clearly noticed, that it took away a huge part of possibilities that Kafka brings to the table.

Luckily those days are gone! From now on you can not only receive messages in batches (which makes Karafka several times faster), but you can also process them that way. The only thing you need to do is set the batch_processing config option to true.

You can do this either on an app configuration level:

class App < Karafka::App
  setup do |config|
    config.batch_consuming = true
    config.batch_processing = true
    # Other options
  end
end

or per each topic route you define:

App.routes.draw do
  consumer_group :events_consumer do
    batch_consuming true

    topic :events_created do
      controller EventsCreatedController
      backend :inline
      batch_processing true
    end
  end
end

Once you turn this option on, you will have access to a method called #params_batch that will contain all the messages fetched from Kafka in a single batch.

It’s worth pointing out, that a single messages batch always contains messages from the same topic and the same partition.

class EventsController < ApplicationController
  def perform
    # This example uses https://github.com/zdennis/activerecord-import
    Event.import params_batch.map { |param| param[:event] }
  end
end

Keep in mind, that params_batch is not just a simple array. The messages inside are lazy parsed upon first usage, so you shouldn’t directly flush them into DB.

Note: For more details about processing messages, please visit the Processing messages section of Karafka wiki.

New routing engine and multiple topic consumer groups

Routing engine provides an interface to describe how messages from all the topics should be received and processed.

Karafka routing engine used to be trivial. The only thing you could really do, was defining topics and their options. From now on, there are two modes in which routing API can operate:

  • Karafka 0.6+ consumer group namespaced style (recommended)
  • Karafka 0.5 compatible consumer group per topic style (old style)

With 0.6+ mode, you can define consumer groups subscribed to multiple topics. This will allow you to group topics based on your use-cases and other factors. It also enables overwriting most of the default settings, in case you need to create a per consumer group specific setup (for example to receive data from multiple Kafka clusters).

App.consumer_groups.draw do
  consumer_group :group_name do
    topic :example do
      controller ExampleController
    end

    topic :example2 do
      controller Example2Controller
    end
  end
end

Note: For more details about processing messages, please visit the Routing section of Karafka wiki.

#topic reference on a controller level

There are several changes related to the topic itself. The biggest one, is its assignment to a controller class, not to a controller instance. This may not seem significant, but it is. It means, that you no longer should use same controller for handling multiple topics. You can still use #topic from your controllers instance (no need to do self.class.topic) – it’s just an alias.

The second big change, is the topic owning consumer group that you can reference as well from the topic. This allows you to discover and programmatically access all the routing details you need just by playing with the topic and consumer group objects:

# From the controller instance level
topic.consumer_group.class #=> Karafka::Routing::ConsumerGroup
topic.consumer_group.name #=> 'commit_builds'
topic.name #=> 'commit_builds_scheduled'

# From the console / outside of the controller scope
App.consumer_groups.count #=> 3
App.consumer_groups.first.name #=> 'commit_builds'
App.consumer_groups.first.topics.count #=> 5

#params_batch messages with additional Kafka message details

Each Kafka message you receive, contains now following extra attributes received from Kafka:

  • partition
  • offset
  • key
  • topic

IMHO the most interesting one is the partition key, that can be used when applying ordered changes to any persistent models (key can be used to ensure proper order delivery via Kafka guaranteed partition order feature):

def perform
  params_batch.each do |param|
    User.find(param[:key]).update!(param[:user])
  end
end

#params_batch and #params lazy evaluation

params_batch is not just a simple array. The messages inside are lazy parsed upon first usage, so you shouldn’t directly flush them into DB. To do so, please use the #parsed params batch method to parse all the messages:

class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.parsed)
  end
end

Parsing will be automatically performed as well, if you decide to map parameters (or use any Enumerable module method):

class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.map { |param| param[:user] })
  end
end

Karafka does not parse all the messages at once due to performance reasons. There are cases in which you might be interested only in the last message in a batch. It would be useless on such occasions to parse everything there is.

You can use this feature to prefilter unparsed data based on partition, topic or any other non-data related aspects:

def perform
  # In this example, we will ignore non-existing users data
  # without even unparsing their details.
  # Casting to an array will disable the automatic parsing upon iterating,
  # so when we decide to fetch user data, we need to use the #retrieve method
  ids_from_partition_key = params_batch.to_a.map { |param| param[:key] }
  existing_users_ids = User.where(id: ids_from_partition_key).pluck(:id)

  params_batch.to_a.each do |param|
    param[:parsed] #=> false
    next unless existing_users_ids.include?(param[:key])
    # Some heavy parsing happens here
    param.retrieve!
    param[:parsed] #=> true
    User.where(id: param[:key]).update!(param[:user])
  end
end

Long running persistent controllers

Karafka used to create a single controller instance per each received message. This was one of the reasons why it had a quite big memory fingerprint. From now on (if not disabled by the config persistent flag), Karafka will create and use a single object for each topic partition up until its shutdown.

This change not only reduces memory and CPU usage, but also allows to do cross-batch aggregations. One of the use-cases could be normalization of the batch insert process, so the DB flushing is performed only when we reach a certain buffer size:

class EventsController < ApplicationController
  FLUSH_SIZE = 1000

  def perform
    buffer << params_batch.map { |param| param[:event] }
    if buffer.size >= FLUSH_SIZE
      data = buffer.shift(FLUSH_SIZE)
      Event.import(data)
    end
  end

  private

  def buffer
    @buffer ||= []
  end
end

Note: example above is simplified. You probably want to cover flushing buffer also in a case of process shutdown.

Encryption and authentication using SSL and SASL support

Karafka uses ruby-kafka driver to talk with Kafka. Now you can embrace all its encryption and authentication features. All the related configuration options are described here.

Limited consumer groups execution from a single process

One of the biggest downsides of Karafka 0.5 was its lack of ability to do a per consumer group scaling. Each server process was spinning up all the consumer groups from the routing. This was OK for smaller applications, but it was not enough for bigger systems. Karafka 1.0 server allows you to specify which consumer groups you want to run in a given process. This means you can easily scale your infrastructure together with your Kafka traffic.

Given set of consumer groups like this one:

App.consumer_groups.draw do
  consumer_group :events do
    # events related topics definitions
  end

  consumer_group :users do
    # users related topics definitions
  end

  consumer_group :webhooks do
    # webhooks related topics definitions
  end
end

can now run all together:

# Equals to bundle exec karafka server --consumer-groups=events users webhooks
bundle exec karafka server

in separate processes:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users --daemon --pid=./pids/karafka1.pid
bundle exec karafka server --consumer-groups=webhooks --daemon --pid=./pids/karafka2.pid

or in a mixed mode, where some of the processes run multiple groups:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users webhooks --daemon --pid=./pids/karafka1.pid

Multi process management thanks to Capistrano-Karafka

In reference to the previous feature, Capistrano-Karafka has been updated as well. It now supports multi-process, multi and single group process deployment flow:

# Exemplary Capistrano deployment Karafka definitions
set :karafka_role, %i[karafka_small karafka_big]

set :karafka_small_processes, 1
set :karafka_small_consumer_groups, %w[
  group_a
]

set :karafka_big_processes, 4
set :karafka_small_consumer_groups, [
  'group_a group_b',
  'group_c group_d',
  'group_e',
  'group_f'
]

server 'example-small.com', roles: %i[karafka_small]
server 'example-big.com', roles: %i[karafka_big]

Processing backends (Inline and Sidekiq)

Karafka is no longer bound to Sidekiq. There are cases in which Sidekiq can be really helpful when processing messages (reentrancy, thread scaling, etc), however for many other it was just a redundancy (receiving from one queue and pushing back to another). The default processing mode for Karafka 1.0 is an :inline mode. It means that processing of messages will happen right after they are fetched from Kafka.

If you want to process your Kafka messages automatically in Sidekiq (without having to worry about workers or anything else), please visit the Karafka-Sidekiq-Backend README.

JRuby support

Thanks to few small changes, Karafka can be executed with JRuby 9000.

Incompatibilities

Moving forward means, that from time to time, you need to introduce some incompatibilities. There were some breaking changes, but the upgrading process shouldn’t be that hard. We will cover it in a different article soon. Here are the most important incompatibilities you might encounter during the upgrade:

  • Default boot file has been renamed from app.rb to karafka.rb
  • Removed worker glass as dependency (now and independent gem – if you use it, you need to add it to your gemfile)
  • kafka.hosts option renamed to kafka.seed_brokers – you don’t need to provide all the hosts to work with Kafka
  • start_from_beginning setting moved into kafka scope (kafka.start_from_beginning)
  • Router no longer checks for route uniqueness – now you can define same routes for multiple kafkas and do a lot of crazy stuff, so it’s your responsibility to check uniqueness
  • Change in the way we identify topics in between Karafka and Sidekiq workers. If you upgrade, please make sure, all the jobs scheduled in Sidekiq are finished before the upgrade.
  • batch_mode renamed to batch_consuming
  • Renamed #params content key to value to better resemble ruby-kafka internal messages naming convention
  • Renamed inline_mode to inline_processing to resemble other settings conventions
  • Renamed inline_processing to backend
  • Single controller needs to be used for a single topic consumption
  • Renamed before_enqueue to after_received to better resemble internal logic, since for inline backend, there is no enqueuing.
  • Due to the level on which topic and controller are related (class level), the dynamic worker selection is no longer available.
  • Renamed params #retrieve to params #retrieve! to better reflect how it works.
  • Sidekiq backend needs to be added as a separate gem (Karafka no longer depends on it)

Wiki updates

We’ve spent long hours to ensure, that our wiki is complete and consistent. We’ve added several new pages, including:

Other changes

Lower memory usage

We’ve managed to reduce number of new allocated objects down by around 70%. Karafka no longer creates so many objects for each received message and message batch as it used to. It also depends on less gems and requires much less additional libraries, so the overall memory consumption is significantly lower.

Better settings management between ruby-kafka and karafka

We’ve reorganized the whole concept of passing settings in betwen Karafka and ruby-kafka to be able to faster adapt if anything changes. The internal API is also much cleaner and easier to understand.

Dry-validation FTW

All internal validations are now powered by dry-validation schemas.

multi_json

In order to support different Ruby implementations, we’ve decided to use multi_json gem, so anyone can pick the most suitable JSON parser he needs.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/karafka-example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.

Enforcing unique jobs in Karafka and Sidekiq for single resources

Note: For the case of simplicity I skipped some of the corner cases to make this article less complicated and more understandable.

When working with multi threaded and multi process systems that consume messages in parallel, you need to be able to enforce some limitations on the processing order.

Most of the time, in well designed systems, things should be based on atomic (in terms of not being dependent on any other worker job) jobs that can run whenever they need to. However, in some cases you need to make sure that jobs related to a given resource are not executed at the same time. This can be achieved with Karafka and Sidekiq in 3 ways:

  • Standalone Karafka mode – single Karafka process will consume messages one by one and process them inline (without using Sidekiq at all). Since Kafka gives us a per partition ordering, same applies to standalone Karafka mode.
  • Karafka default mode + single Sidekiq worker with a single thread. Since single threaded Sidekiq process uses FIFO, this can be achieved with this combination.
  • Karafka + Sidekiq + SidekiqUniqueJobs gem. This combination allows us to build complex multi threaded and multi process workers that will ensure that a single resource is being modified by maximum 1 worker at the same time (while executing uniqueness).

In this article I will focus on the last option, as it is the best in terms of performance and flexibility.

A bit about state machines

Having a single resource on which many actions can be performed is always a problem. To ensure (on an object level) that we won’t process things in parallel we can use state machines (aasm is a great gem for that) to achieve locking. However, if we only do that, we will have to implement a bit more logic when considering a possibility of many jobs doing same stuff at the same time:

  • Detecting and waiting if someone else is already using (whatever that means) a given resource.
  • Retrying in case of state machine failure (invalid transitions).

But hey! We use Sidekiq! It means that retrying is already built in and it will restart the job later on. It also means, that at some point, the state machine lock will be removed and we will be able to finish all the jobs.

That is true, however we should not rely on that because a case like that shouldn’t be considered an exception. Instead we should prevent situations like that from happening at all.

Problem definition

Let’s say we have a RemoteResource representation that needs to be refreshed periodically (every 5 seconds) and we need to make sure that we never overwrite it’s content with older data.

class RemoteResource < ActiveRecord::Base
  include AASM aasm do
    state :initial, initial: true
    state :running

    event :run do
      transitions :from => :initial, :to => :running
    end

    event :finish do
      transitions :from => :running, :to => :initial
    end
  end

  def refresh!
    # Some external API calls and other business stuff
    # @note I know that this shoould be in a service but
    # again, lets keep things simple
    update!(content: RemoteData.fetch(id))
  end
end

Happy, single threaded execution example:

resource = RemoteResource.find(params[:id])
resource.run!
resource.refresh!
resource.finish!

This code will run without problems as long as:

  • Our processing (including external API call) does not take more time than 5 seconds
  • Sidekiq workers are up and running (at least matching the speed of enqueuing)

But lets kill all the workers for 1 minute and run them again. We end up with a queue full of messages that will be processed at the same time. And then we might have jobs for the same object enqueued one after another, which means that they will be executed at the same time:

untitled-diagram
In a case like that, both workers will overlap processing the same resource. It means that we might end up with outdated data (network delay for an older resource) or a state machine exception. In more complex cases it could mean corrupted data.

Since Karafka passes messages into Sidekiq, this problem can occur there as well. The worse part is that it can get much worse when we perform many types of tasks on a single resource and they should never overlap.

Solution: SidekiqUniqueJobs

SidekiqUniqueJobs is a great gem that solves exactly this issue. It makes sure (among other options) that for a given set of arguments, only a single job can run at the same time. In our case  unique arguments would be:

  • Worker name
  • RemoteResource#id

This is enough to competeour task! Jobs for different resources will be processed at the same time, but no more parallel processing on the same RemoteResource.

untitled-diagram1

Implementation in Karafka application

To use this gem with Karafka application, you need to build a uniqueness scope before a job is enqueued. This can be done inside the #before_enqueue block:

class KarafkaController < Karafka::BaseController
  before_enqueue do
    params[:uniqueness_scope] = params.dig(:remote_resource, :id)
  end
end

you also need to configure your Karafka base worker to set uniqueness key based on this param value:

class ApplicationWorker < Karafka::BaseWorker
  sidekiq_options unique: :while_executing

  def self.unique_args(args)
    [
      args.first, # This is this worker name
      args.last['uniqueness_scope'] || SecureRandom.uuid
    ]
  end
end

Note, that we use SecureRandom.uuid as a fallback for jobs for which we don’t want to make uniqueness execution. Random uuid will ensure that all the jobs without a uniqueness_scope can perform at the same time. If we would leave nil, it would be treated as any other value so no jobs would run in parallel.

Olderposts

Copyright © 2017 Running with Ruby

Theme by Anders NorenUp ↑