
Kafka on Rails: Using Kafka with Ruby on Rails – Part 1 – Kafka basics and its advantages

Introduction

In this series of articles, I will try to explain why you should invest your time in learning Kafka and the Karafka framework, and how they can reshape the way you design and develop your Ruby applications. I will also try to answer some of the most common questions about the two and give you some real usage examples of how you can quickly benefit from adding them to your technological stack.

What is Kafka?

Let me quote Wiki on that one:

Apache Kafka is an open-source stream processing platform developed by the Apache Software Foundation written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds.

Now let’s translate it into some general concepts (copied from here):

  1. It lets you publish and subscribe to streams of records. In this respect, it is similar to a message queue or enterprise messaging system.
  2. It lets you store streams of records in a fault-tolerant way.
  3. It lets you process streams of records as they occur.
  4. It lets you build real-time data pipeline based applications that reliably get data between systems and/or applications.
  5. It lets you build real-time streaming applications that transform and react to a stream of data and/or events.
  6. It allows you to simplify your Domain-Driven Design implementation within both new and existing applications, and lets you do so in a more technology-agnostic way.

Why should I be interested in it?

Because it allows you to expand. And I don’t only mean that you will get much better performance with it and that you will be able to process more data, faster.

What I really mean is that once you understand the concepts behind it, you will get a whole new set of possibilities for working with your data. You will expand your horizons and reshape the way you design your code.

The systems we build are data-driven, and by having more ways of working with that data, we get a totally new set of tools and solutions which we can use to make our work better and more efficient.

I keep saying that the Ruby (and Rails in particular) community lacks architects and good architecture for post-MVP systems. One of the reasons is that we’re too bound to the request-response way of thinking. Once you learn that things can be done differently, it will impact the way you work with any technology you use, including Ruby on Rails.

Basic Kafka terminology

There are many general Kafka introduction articles, including the official one. Here, I will describe the most important parts of the Kafka ecosystem, so you can start working with it as fast as possible.

Note: the descriptions below might not be 100% accurate, but they should be enough for you to grasp the basics and keep you going.

Note: You can find more details about Kafka in a great Kafka in a Nutshell article.

General publish-subscribe messaging system concept

A messaging system lets you send messages between processes, applications, and servers. Applications should be able to connect to a system like that and transfer messages both ways.

Note: A publisher (the one that sends a message) can be a receiver/subscriber at the same time.


Kafka brokers

Kafka is a distributed system that runs in a cluster. Each node in the cluster is called a Kafka broker: a single Kafka process that operates as part of the cluster.

Kafka topics with partitions

A Kafka topic is just a named stream of records. It is a bit similar to the queue concept in Sidekiq or RabbitMQ. In general, it is a namespace where you store messages that are similar to each other in terms of your business logic.

Everything is organized around topics and most Kafka guarantees are either for a topic or a topic partition. You send and receive messages from topics. Topics in Kafka are always multi-subscriber in nature; that is, a topic can have zero, one, or many consumers that subscribe to the data written to it.

Each Kafka topic is always divided into partitions. Even if there is only a single partition, it is still there. Each partition is an ordered, immutable sequence of records that is continually appended to, forming a structured commit log. The records in a partition are each assigned a sequential id number called the offset.

You can fetch data from multiple partitions with a single consumer, but be aware that delivery order is only guaranteed within the data from a single partition. It means that you should not rely on cross-partition message ordering in your business logic.

Kafka producers

A Kafka producer is an application or a process that sends messages to Kafka.
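To make this concrete, here is a minimal sketch of producing messages from plain Ruby using the ruby-kafka gem (the driver Karafka uses under the hood); the broker address, topic name and payload are just placeholders:

require 'json'
require 'kafka'

# Connect to the cluster through one (or more) seed brokers
kafka = Kafka.new(['kafka://localhost:9092'], client_id: 'my-producer')

# Deliver a single message; partition_key makes sure that all messages
# for the same user land in the same partition (and therefore stay ordered)
kafka.deliver_message(
  { user_id: 1, event: 'user_registered' }.to_json,
  topic: 'users_events',
  partition_key: '1'
)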

Kafka consumers and consumer groups

A Kafka consumer is an application that reads messages from Kafka.

A consumer can start reading messages from any offset. It means that you can build systems that start from the beginning of a topic and replay all the events/messages that Kafka contains, or systems that start from the current position and only work with new messages as they come in.

Most of the time, you will pick one of those for the first consumer run, and later on you will always consume from the last offset you worked with before shutting down the consumer. Still, it is good to know that you can always start from any offset you want: this allows consumers to join the cluster at any point in time.
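Again using plain ruby-kafka as an illustration (the topic and group names are made up), a consumer that starts from the beginning of a topic looks roughly like this:

require 'kafka'

kafka = Kafka.new(['kafka://localhost:9092'])

# Consumers sharing the same group_id form a consumer group and split partitions
consumer = kafka.consumer(group_id: 'users_events_group')

# start_from_beginning: true replays the whole topic on the very first run;
# afterwards the group resumes from its last committed offset
consumer.subscribe('users_events', start_from_beginning: true)

consumer.each_message do |message|
  puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
end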

Consumers can be organized in groups. A consumer group includes consumers that subscribe to the same topics. Kafka assigns each consumer in a group a set of partitions to work with. This approach lets you scale greatly, as you can increase the number of partitions and spin up more consumers within the same consumer group. Kafka guarantees that a message is only read by a single consumer in the group.

You can have more consumers than partitions, but the extra ones won’t actively participate in the consumption process. They will start performing work in case of crashes or other failures of the other consumers.

It’s worth pointing out that Kafka never pushes messages to the consumers on its own. It’s the consumer that asks for messages when it is ready to handle them. This approach is super flexible, as it allows you to temporarily shut down a consumer, and once it is back, it will catch up with all the messages that have not yet been processed. A really great property for SOA-based microservices that must not lose any data: in the worst-case scenario, they will just process the messages a bit later.

What can Kafka do for me and my Ruby on Rails applications?

Note: We will explore all those benefits in detail in the next parts of this series. Here’s just a quick summary.

A lot. And it really depends on your perspective and your role in the organization. Having Kafka as the messaging backbone for your Ruby and Rails systems will bring you benefits in many places.

Performance

Most Ruby on Rails systems are developed with one-object-at-a-time processing in mind. This is true both for end-to-end client requests and for Sidekiq background jobs.

Do you have to refresh or recalculate things in the system upon changes that arrive frequently during occasional traffic spikes? Redesigning that part of the system so that it can fetch messages in batches can significantly lower the need for constant recalculation.
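As a rough illustration of that idea (the topic, controller and Product model below are hypothetical), a batch-based Karafka controller can deduplicate the incoming changes before touching the database, instead of recalculating once per message:

class ProductUpdatesController < ApplicationController
  def perform
    # Recalculate once per product that appeared anywhere in the fetched batch,
    # instead of once per message
    product_ids = params_batch.map { |param| param[:product_id] }.uniq

    Product.where(id: product_ids).find_each(&:recalculate_stats!)
  end
end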

Kafka-based systems also scale really, really well, and thanks to the multi-consumer subscription model, you can optimize and scale separate parts of the system independently.

Architecture

This is by far the biggest advantage you will get in your Ruby and Rails systems when you add Kafka to them. You will be able to design, build and test independent components that can do things outside of the typical Rails “HTTP-like” processing scope.

You won’t have to worry about (almost) anything except your bounded context and your business domain. Due to the way Kafka works, you will sometimes even be forced to use tools and solutions that aren’t from the “Rails way”.

Have you ever been able to build a proof-of-concept application that could hook up to staging or production in real time without introducing side effects? Were you able to run it from your local machine and see how things work? With Kafka, it can be super easy to achieve that.

Note: Don’t get me wrong, it’s not Kafka in your stack that will auto-magically change everything. It’s you, having it and understanding what you can achieve with it, who will trigger and lead the change. Kafka will just allow you to do those things easily and fast.

Deployment process

Being able to re-consume and re-process messages allows you to shut down certain parts of the system without affecting others. Since Kafka messages are pulled rather than pushed, they don’t disappear if they are not consumed immediately. With a bit of good architecture, you can deploy, perform maintenance and do other things while the system is running, without users noticing.

Development performance

The bigger a system gets, the more often developers step on each other’s toes. Development costs and developer frustration grow quickly when they:

  • change the same things simultaneously,
  • have to remember edge cases outside of their current business domain scope,
  • have to deal with additional callback actions and/or non-explicit processes.

Kafka allows you to easily use DDD to build event-based systems that can be managed and developed with much smaller overhead than a typical callback-based Ruby on Rails MVC system.
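A tiny sketch of the difference (the event name, controller and mailer below are made up): instead of a model callback firing side effects inline, a separate Karafka controller reacts to a published event within its own bounded context:

# Instead of an after_create callback inside the User model,
# a dedicated component reacts to the "user_registered" event
class UserRegisteredController < ApplicationController
  def perform
    params_batch.each do |param|
      WelcomeMailer.welcome(param[:email]).deliver_later
    end
  end
end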

Freedom of choice

Ruby on Rails can be a burden from time to time. Plain Ruby can do really well. ActiveRecord can be replaced with ROM and Dry-Validation, bringing you many benefits. However, it can be really hard to introduce new concepts in a huge legacy system. If you have Kafka and Karafka, you can spin up new experimental applications that perform some business logic within a bounded context and won’t do any harm to the existing logic and/or data.

Tired of Ruby in general? Replace a single Kafka-based component with a different one written in a technology that better suits your needs.

I already have a message bus (Redis + Sidekiq)

Kafka is not a message bus. It is a distributed streaming platform.

It’s not entirely accurate to compare them, as they are not the same thing. There are many business cases that could be solved with either. However, there are some significant differences, when looking from the Sidekiq perspective, that are good to know and understand:

  1. Kafka does not handle reentrancy – in case of a message processing failure, it is up to you to decide what to do with it. It won’t be pushed back and retried automatically,
  2. Kafka does not support pushing the same message into a queue again (you can push it back but it will be a new message in the partition). Messages are immutable and once placed in Kafka, they cannot be changed,
  3. Sidekiq does not support message broadcasting and is more command-oriented than event-oriented (do-this vs did-this), especially within the Ruby on Rails and Sidekiq scope (see the sketch after this list),
  4. Sidekiq does not support batch consuming,
  5. Kafka can keep events much (configurably) longer due to persistence,
  6. Kafka events can be consumed multiple times by multiple consumer groups,
  7. Kafka can be the only message bus for any publish-subscribe flows,
  8. a Sidekiq message that got consumed is removed from the queue, which means that you cannot re-consume it if needed.
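To illustrate point 3 with a sketch (the worker, topic and order record below are made up, and kafka is a ruby-kafka client like the one shown earlier), the same business fact expressed as a Sidekiq command versus a Kafka event looks roughly like this:

# Command style (do-this): the caller decides what has to happen next
InvoiceGenerationWorker.perform_async(order.id)

# Event style (did-this): the caller only announces the fact;
# any number of consumer groups may react to it independently
kafka.deliver_message(
  { order_id: order.id, event: 'order_placed' }.to_json,
  topic: 'orders_events'
)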

In some situations it is really good to have them work together; that’s why there’s even a Karafka Sidekiq backend for processing Kafka messages inside Sidekiq workers. We will get to that in the next parts of this series.

Summary – Karafka as a Ruby Kafka backbone

All this introduction has had one goal: to make you familiar with the basic concepts and advantages of using Kafka in your new and existing Ruby and Rails based systems.

In the next parts of this series, we will explore Karafka, a framework that simplifies the development of Apache Kafka based Ruby applications.

We will start by building small applications that use Karafka as an internal and external message backbone, and then we’ll move on to integrating Karafka with existing monoliths and using it to decompose and redesign your existing code base.

Somewhere down the road in this series, I will also introduce other “non-Rails” stack tools, including Trailblazer, Dry-Validation, ROM and a few others, to give you a wider perspective on how much you can benefit from combining the proper tools.

Karafka provides you with a lot of possibilities, and you will see for yourself that, when boosted with other great tools, your code quality, architecture, performance and the way you work can jump to a totally different level.

Stay tuned :-)

Karafka (Ruby + Kafka) framework 1.0.0 Release Notes

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

It’s been over a year since the last major Karafka framework release (0.5). During that time, we’ve managed to implement plenty of new features and fix so many bugs that I don’t know where to start…

Today I’m pleased to announce that we’re ready with the final 1.0 release.

Code quality

The quality of our work has always been important to us. A few months ago we made a transition from polishgeeks-dev-tools to Coditsu. It allowed us to find and fix several new code offenses and to raise the quality of the code and documentation.

There are still some things to fix and improve. That said, this is the best release we’ve made, not only in terms of features but also in terms of the quality of the code and the documentation.

For more details about the quality of the whole Karafka ecosystem, feel free to visit our Karafka Coditsu organization page.

Features

There are more and more companies taking advantage of Karafka as their backend async messaging backbone. Many of the new features were either feature requests or pull requests (including some from Shopify and other big players) that cover both performance and functionality issues existing in Karafka. It’s amazing to look at all the use cases that people cover with this framework.

Batch processing

Believe it or not, but up until now, Karafka didn’t have batch processing functionality. It had a batch message receiving option, but each of the messages had to be processed separately. At the beginning we wanted to imitate the HTTP world, where (most of the time) a single request equals a single controller instance logic execution.

It wasn’t the best idea ever. Or maybe it was at the time, but we’ve clearly noticed that it took away a huge part of the possibilities that Kafka brings to the table.

Luckily those days are gone! From now on you can not only receive messages in batches (which makes Karafka several times faster), but you can also process them that way. The only thing you need to do is set the batch_processing config option to true.

You can do this either on an app configuration level:

class App < Karafka::App
  setup do |config|
    config.batch_consuming = true
    config.batch_processing = true
    # Other options
  end
end

or per each topic route you define:

App.routes.draw do
  consumer_group :events_consumer do
    batch_consuming true

    topic :events_created do
      controller EventsCreatedController
      backend :inline
      batch_processing true
    end
  end
end

Once you turn this option on, you will have access to a method called #params_batch that will contain all the messages fetched from Kafka in a single batch.

It’s worth pointing out that a single message batch always contains messages from the same topic and the same partition.

class EventsController < ApplicationController
  def perform
    # This example uses https://github.com/zdennis/activerecord-import
    Event.import params_batch.map { |param| param[:event] }
  end
end

Keep in mind that params_batch is not just a simple array. The messages inside are lazily parsed upon first usage, so you shouldn’t flush them directly into the DB.

Note: For more details about processing messages, please visit the Processing messages section of Karafka wiki.

New routing engine and multiple topic consumer groups

Routing engine provides an interface to describe how messages from all the topics should be received and processed.

The Karafka routing engine used to be trivial. The only thing you could really do was define topics and their options. From now on, there are two modes in which the routing API can operate:

  • Karafka 0.6+ consumer group namespaced style (recommended)
  • Karafka 0.5 compatible consumer group per topic style (old style)

With 0.6+ mode, you can define consumer groups subscribed to multiple topics. This will allow you to group topics based on your use-cases and other factors. It also enables overwriting most of the default settings, in case you need to create a per consumer group specific setup (for example to receive data from multiple Kafka clusters).

App.consumer_groups.draw do
  consumer_group :group_name do
    topic :example do
      controller ExampleController
    end

    topic :example2 do
      controller Example2Controller
    end
  end
end

Note: For more details about routing, please visit the Routing section of the Karafka wiki.

#topic reference on a controller level

There are several changes related to the topic itself. The biggest one is its assignment to a controller class, not to a controller instance. This may not seem significant, but it is. It means that you should no longer use the same controller for handling multiple topics. You can still use #topic from your controller instances (no need to do self.class.topic) – it’s just an alias.

The second big change is that a topic now references its owning consumer group. This allows you to discover and programmatically access all the routing details you need just by playing with the topic and consumer group objects:

# From the controller instance level
topic.consumer_group.class #=> Karafka::Routing::ConsumerGroup
topic.consumer_group.name #=> 'commit_builds'
topic.name #=> 'commit_builds_scheduled'

# From the console / outside of the controller scope
App.consumer_groups.count #=> 3
App.consumer_groups.first.name #=> 'commit_builds'
App.consumer_groups.first.topics.count #=> 5

#params_batch messages with additional Kafka message details

Each Kafka message you receive now contains the following extra attributes received from Kafka:

  • partition
  • offset
  • key
  • topic

IMHO the most interesting one is the key, which can be used when applying ordered changes to persistent models (the key determines the partition, so Kafka’s guaranteed per-partition ordering ensures proper delivery order):

def perform
  params_batch.each do |param|
    User.find(param[:key]).update!(param[:user])
  end
end

#params_batch and #params lazy evaluation

params_batch is not just a simple array. The messages inside are lazily parsed upon first usage, so you shouldn’t flush them directly into the DB. To do so, please use the #parsed method of the params batch, which parses all the messages:

class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.parsed)
  end
end

Parsing will also be performed automatically if you decide to map parameters (or use any other Enumerable module method):

class EventsController < ApplicationController
  def perform
    EventStore.store(params_batch.map { |param| param[:user] })
  end
end

Karafka does not parse all the messages at once for performance reasons. There are cases in which you might only be interested in the last message of a batch. On such occasions it would be wasteful to parse everything.
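For example, a consumer that only cares about the most recent message of each batch (the Dashboard call below is hypothetical) can parse just that single message:

def perform
  # Casting to an array prevents parsing everything while iterating;
  # we explicitly parse only the one message we actually need
  last_message = params_batch.to_a.last
  last_message.retrieve!

  Dashboard.refresh(last_message[:state])
end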

You can use this feature to prefilter unparsed data based on the partition, topic or any other non-payload attributes:

def perform
  # In this example, we will ignore data of non-existing users
  # without even parsing their details.
  # Casting to an array disables the automatic parsing upon iteration,
  # so when we decide to fetch the user data, we need to use the #retrieve! method
  ids_from_partition_key = params_batch.to_a.map { |param| param[:key] }
  existing_users_ids = User.where(id: ids_from_partition_key).pluck(:id)

  params_batch.to_a.each do |param|
    param[:parsed] #=> false
    next unless existing_users_ids.include?(param[:key])
    # Some heavy parsing happens here
    param.retrieve!
    param[:parsed] #=> true
    User.find(param[:key]).update!(param[:user])
  end
end

Long running persistent controllers

Karafka used to create a single controller instance per received message. This was one of the reasons why it had quite a big memory footprint. From now on (unless disabled with the persistent config flag), Karafka will create and use a single object per topic partition, up until its shutdown.

This change not only reduces memory and CPU usage, but also allows you to do cross-batch aggregations. One of the use cases could be buffering the batch insert process, so the DB flush is performed only when we reach a certain buffer size:

class EventsController < ApplicationController
  FLUSH_SIZE = 1000

  def perform
    buffer.concat(params_batch.map { |param| param[:event] })
    if buffer.size >= FLUSH_SIZE
      data = buffer.shift(FLUSH_SIZE)
      Event.import(data)
    end
  end

  private

  def buffer
    @buffer ||= []
  end
end

Note: the example above is simplified. You will probably also want to flush the buffer when the process shuts down.

Encryption and authentication using SSL and SASL support

Karafka uses the ruby-kafka driver to talk to Kafka. Now you can embrace all of its encryption and authentication features. All the related configuration options are described here.
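Here is a hedged configuration sketch; the option names below mirror the corresponding ruby-kafka settings, so please verify them against the linked documentation before relying on them:

class App < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = ['kafka://broker1.example.com:9093']
    # SSL encryption (option names assumed to mirror ruby-kafka's)
    config.kafka.ssl_ca_cert = File.read('ca_cert.pem')
    config.kafka.ssl_client_cert = File.read('client_cert.pem')
    config.kafka.ssl_client_cert_key = File.read('client_cert_key.pem')
    # SASL PLAIN authentication
    config.kafka.sasl_plain_username = 'username'
    config.kafka.sasl_plain_password = 'password'
  end
end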

Limited consumer groups execution from a single process

One of the biggest downsides of Karafka 0.5 was its inability to scale per consumer group. Each server process was spinning up all the consumer groups from the routing. This was OK for smaller applications, but it was not enough for bigger systems. The Karafka 1.0 server allows you to specify which consumer groups you want to run in a given process, which means you can easily scale your infrastructure together with your Kafka traffic.

Given a set of consumer groups like this one:

App.consumer_groups.draw do
  consumer_group :events do
    # events related topics definitions
  end

  consumer_group :users do
    # users related topics definitions
  end

  consumer_group :webhooks do
    # webhooks related topics definitions
  end
end

they can now run all together:

# Equals to bundle exec karafka server --consumer-groups=events users webhooks
bundle exec karafka server

or in separate processes:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users --daemon --pid=./pids/karafka1.pid
bundle exec karafka server --consumer-groups=webhooks --daemon --pid=./pids/karafka2.pid

or in a mixed mode, where some of the processes run multiple groups:

bundle exec karafka server --consumer-groups=events --daemon --pid=./pids/karafka0.pid
bundle exec karafka server --consumer-groups=users webhooks --daemon --pid=./pids/karafka1.pid

Multi process management thanks to Capistrano-Karafka

In reference to the previous feature, Capistrano-Karafka has been updated as well. It now supports a multi-process deployment flow in which each process can run a single consumer group or multiple groups:

# Exemplary Capistrano deployment Karafka definitions
set :karafka_role, %i[karafka_small karafka_big]

set :karafka_small_processes, 1
set :karafka_small_consumer_groups, %w[
  group_a
]

set :karafka_big_processes, 4
set :karafka_big_consumer_groups, [
  'group_a group_b',
  'group_c group_d',
  'group_e',
  'group_f'
]

server 'example-small.com', roles: %i[karafka_small]
server 'example-big.com', roles: %i[karafka_big]

Processing backends (Inline and Sidekiq)

Karafka is no longer bound to Sidekiq. There are cases in which Sidekiq can be really helpful when processing messages (reentrancy, thread scaling, etc.), but for many others it was just redundancy (receiving from one queue and pushing back into another). The default processing mode for Karafka 1.0 is the :inline mode. It means that messages are processed right after they are fetched from Kafka.

If you want to process your Kafka messages automatically in Sidekiq (without having to worry about workers or anything else), please visit the Karafka-Sidekiq-Backend README.
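If you go that route, the only change on the routing level is the backend itself (a sketch, assuming the karafka-sidekiq-backend gem is in your Gemfile):

App.consumer_groups.draw do
  consumer_group :events_consumer do
    topic :events_created do
      controller EventsCreatedController
      # Messages will be scheduled to Sidekiq workers instead of being processed inline
      backend :sidekiq
    end
  end
end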

JRuby support

Thanks to a few small changes, Karafka can be executed with JRuby 9000.

Incompatibilities

Moving forward means that, from time to time, you need to introduce some incompatibilities. There were some breaking changes, but the upgrade process shouldn’t be that hard. We will cover it in a separate article soon. Here are the most important incompatibilities you might encounter during the upgrade:

  • Default boot file has been renamed from app.rb to karafka.rb
  • Removed WorkerGlass as a dependency (it is now an independent gem – if you use it, you need to add it to your Gemfile)
  • kafka.hosts option renamed to kafka.seed_brokers – you don’t need to provide all the hosts to work with Kafka
  • start_from_beginning setting moved into kafka scope (kafka.start_from_beginning)
  • Router no longer checks for route uniqueness – now you can define the same routes for multiple Kafka clusters and do a lot of crazy stuff, so it’s your responsibility to ensure uniqueness
  • Change in the way we identify topics between Karafka and Sidekiq workers. If you upgrade, please make sure all the jobs scheduled in Sidekiq are finished before the upgrade.
  • batch_mode renamed to batch_consuming
  • Renamed #params content key to value to better resemble ruby-kafka internal messages naming convention
  • Renamed inline_mode to inline_processing to resemble other settings conventions
  • Renamed inline_processing to backend
  • Single controller needs to be used for a single topic consumption
  • Renamed before_enqueue to after_received to better resemble internal logic, since for inline backend, there is no enqueuing.
  • Due to the level on which topic and controller are related (class level), the dynamic worker selection is no longer available.
  • Renamed params #retrieve to params #retrieve! to better reflect how it works.
  • Sidekiq backend needs to be added as a separate gem (Karafka no longer depends on it)

Wiki updates

We’ve spent long hours making sure that our wiki is complete and consistent, and we’ve added several new pages.

Other changes

Lower memory usage

We’ve managed to reduce the number of newly allocated objects by around 70%. Karafka no longer creates as many objects for each received message and message batch as it used to. It also depends on fewer gems and requires far fewer additional libraries, so the overall memory consumption is significantly lower.

Better settings management between ruby-kafka and karafka

We’ve reorganized the whole concept of passing settings between Karafka and ruby-kafka so we can adapt faster if anything changes. The internal API is also much cleaner and easier to understand.

Dry-validation FTW

All internal validations are now powered by dry-validation schemas.

multi_json

In order to support different Ruby implementations, we’ve decided to use the multi_json gem, so anyone can pick the most suitable JSON parser for their needs.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, the best idea is to just clone our example repository:

git clone https://github.com/karafka/karafka-example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.
