
Inside Kafka: Enhancing Data Reliability Through Transactional Offsets with Karafka

Karafka is a Ruby and Rails framework that simplifies the development of Apache Kafka-based applications. Among its varied features, the Filtering API provides enhanced control over the data flow.

This article focuses on managing offsets: unique identifiers for messages within Kafka's partitions. Offsets often need to be managed alongside database operations within a single transaction. This matters most when handling process crashes and other anomalies, and it minimizes the risk of double processing.

For instance, if a SQL operation completes successfully but the offset commit fails due to a crash, the data could be processed again, leading to duplicates or data integrity issues. Integrating offset management with database transactions using Karafka's Filtering API can help tackle this problem. It ensures that offset progress is tracked within the database transaction, which maintains data integrity even when crashes occur.

We'll explore this concept further in the coming sections, highlighting its practical implications and benefits.

The Importance of Offset Management in Kafka

In a world of streaming data, Kafka has cemented its role as an industry-standard platform for handling high-volume, real-time data feeds. At the heart of Kafka's functionality lies the concept of offsets, which are crucial in ensuring data consistency and reliability.

Offsets are unique identifiers assigned to each message within a Kafka partition. They serve as checkpoints that allow Kafka to track which messages have been consumed and which haven't. In other words, they are the mechanism by which Kafka maintains the state across distributed data streams, marking the position of every consumer in the stream. With them, it is possible to keep track of the data flowing through Kafka at any given time.

However, Kafka offset management has its challenges. Because it is entirely independent of database operations, a SQL operation may finish successfully while the offset commit fails due to a process crash or an involuntary rebalance. This can lead to data duplication: when the system recovers, messages whose data the SQL operation already persisted may be consumed again.
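The following minimal, self-contained Ruby simulation makes this failure mode concrete. It involves neither Kafka nor a database, and `FakeStore` and `run_batch` are hypothetical names. The stored events and the committed offset are two independent pieces of state, so a crash between updating them replays the batch on restart.

```ruby
# Hypothetical in-memory stand-ins: a list of stored events and a separately
# committed "Kafka" offset (the offset of the next message to consume)
FakeStore = Struct.new(:events, :committed_offset)

# Processes messages from the committed offset onward. When crash_before_commit
# is true, the events are stored but the offset commit never happens.
def run_batch(store, messages, crash_before_commit:)
  batch = messages.drop(store.committed_offset)
  return if batch.empty?

  batch.each { |offset| store.events << "event-#{offset}" }
  return if crash_before_commit # simulated crash: offset not committed

  store.committed_offset = batch.last + 1
end

messages = (0..4).to_a
store = FakeStore.new([], 0)

run_batch(store, messages, crash_before_commit: true)  # DB write OK, commit lost
run_batch(store, messages, crash_before_commit: false) # restart replays 0..4

puts store.events.size # => 10 (each event stored twice)
```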

Below, you can find example code and a graph that illustrate this problem:

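def consume
  Event.transaction do
    messages.each do |message|
      # Persist each event (illustrative: payload holds the Event attributes)
      Event.create!(message.payload)
    end
  end

  # Karafka does this automatically after a batch is successfully processed;
  # however, we do it here as well to better illustrate this scenario
  mark_as_consumed(messages.last)
end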

When using #mark_as_consumed, Karafka stores the offset locally and commits it periodically. This means the partition may be lost before the process becomes aware of it. If that happens, the database operation still completes, but the offset is not committed, and a different process may already be working on the same messages. As a result, some of the events will be inserted multiple times.

One way to partially mitigate this is to use the blocking #mark_as_consumed! at the end of the transaction. It commits the offset synchronously and returns false if we no longer own the partition:

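def consume
  Event.transaction do
    messages.each do |message|
      # Persist each event (illustrative: payload holds the Event attributes)
      Event.create!(message.payload)
    end

    # Stop the transaction if we no longer own the partition
    raise(ActiveRecord::Rollback) unless mark_as_consumed!(messages.last)
  end
end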

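This, however, creates a new problem: what if the offset is committed, but the transaction fails? In that case, the messages are marked as consumed even though their events were never stored, so they are effectively lost.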
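The opposite ordering can be simulated the same way. This hypothetical sketch also uses neither Kafka nor a database, and `FakeStore` and `run_batch` are illustrative names. Here the offset is committed inside the transaction, but the transaction commit itself then fails, so the staged writes are discarded.

```ruby
# Hypothetical in-memory model: stored events plus a committed "Kafka" offset
FakeStore = Struct.new(:events, :committed_offset)

def run_batch(store, messages, tx_commit_fails:)
  batch = messages.drop(store.committed_offset)
  return if batch.empty?

  # SQL writes staged inside the (simulated) transaction
  staged = batch.map { |offset| "event-#{offset}" }
  # The offset is committed to Kafka before the transaction commits
  store.committed_offset = batch.last + 1
  # The transaction commit fails, so the staged writes are rolled back
  return if tx_commit_fails

  store.events.concat(staged)
end

messages = (0..4).to_a
store = FakeStore.new([], 0)

run_batch(store, messages, tx_commit_fails: true)
run_batch(store, messages, tx_commit_fails: false) # nothing left to consume

puts store.events.size # => 0 (messages 0..4 are never stored)
```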

Wouldn't it be amazing if we could store the offsets of processed messages or batches within the same DB transactions, ensuring that both always succeed or fail together?

Note 1: By default, Karafka will wait for the consumer to finish work and commit the offsets during rebalances unless the process is forcefully evicted from the consumer group.

Note 2: Yes, this could also be solved by using unique keys for events, but that is not always possible. The example was kept deliberately simple to focus on transactional offset management rather than on sophisticated SQL operations.

Transactional Offset Management with the Filtering API

With Karafka's Filtering API, you can achieve exactly that!

By integrating offset management with the transactional integrity of the database using Karafka's Filtering API, we ensure that the offset progress is tracked within the database transaction itself. This approach makes the operation atomic, which helps maintain data integrity even when crashes occur: all parts of the operation must succeed for the transaction to be committed. If any part fails, the entire transaction is rolled back, avoiding inconsistencies.
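The all-or-nothing idea can be sketched in plain Ruby. The sketch assumes a hypothetical `FakeDatabase` whose `transaction` method applies a block's changes only if the block completes. Because the events and the offset live in the same store, a simulated crash (modeled here as an exception) rolls back both, and the retry starts from the right place.

```ruby
# Hypothetical in-memory "database" in which the events and the consumed
# offset live side by side, so one transaction can update both
class FakeDatabase
  attr_reader :state

  def initialize
    @state = { events: [], offset: 0 }
  end

  # All-or-nothing: the block works on a copy that replaces the current state
  # only if the block finishes without raising
  def transaction
    draft = { events: @state[:events].dup, offset: @state[:offset] }
    yield(draft)
    @state = draft
  rescue StandardError
    nil # rolled back: @state is left untouched
  end
end

# Stores every event from the stored offset onward, plus the new offset, in a
# single transaction; crash: true models a failure before the commit
def consume(db, messages, crash: false)
  db.transaction do |draft|
    batch = messages.drop(draft[:offset])
    batch.each { |offset| draft[:events] << "event-#{offset}" }
    raise "simulated crash" if crash

    draft[:offset] = batch.last + 1 unless batch.empty?
  end
end

messages = (0..4).to_a
db = FakeDatabase.new

consume(db, messages, crash: true) # rolled back: no events and no offset saved
consume(db, messages)              # retried from offset 0
consume(db, messages)              # nothing new to process

puts db.state[:events].size # => 5
puts db.state[:offset]      # => 5
```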

Karafka's Filtering API is a powerful tool that allows developers to perform various actions around the consumption process. With it, users can register multiple filters that validate and filter out messages as they arrive. Filters can also alter the polling process by pausing or by starting from a different offset.

This time, we will leverage the Filtering API's ability to inject an offset taken from the database whenever it does not match the one stored in Kafka.

Defining the flow expectations

There are a few things we need to take into consideration to build a transactional offset management filter for Karafka:

  • All SQL operations should have a timeout shorter than max.poll.interval.ms to ensure we do not end up in an endless cycle of forced rebalances.
  • Upon a conflict between the offset stored in the database and the one in Kafka, the database offset takes priority.
  • The number of partitions is known up front (to simplify our code).
  • Each topic partition has a pre-existing row in a dedicated table.
  • Our per-partition rows are always accessed with the FOR UPDATE lock, since they should only be used by the consumers that claim partition ownership. Those rows should not be used for anything else.
  • Our per-partition row is used as a lock around the transaction happening during consumption. In case of reassignment, this ensures the other process is blocked on the initial offset selection until the transaction is finalized.
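One way to enforce the first requirement is to derive the lock timeout from max.poll.interval.ms. The sketch below assumes PostgreSQL, whose `SET LOCAL lock_timeout` setting lasts only until the end of the current transaction. The helper names and the 20% safety margin are illustrative choices. The 300000 ms default is librdkafka's default for max.poll.interval.ms.

```ruby
# librdkafka's default max.poll.interval.ms (5 minutes)
DEFAULT_MAX_POLL_INTERVAL_MS = 300_000

# Hypothetical helper: a lock timeout that stays safely below
# max.poll.interval.ms, so waiting for the row lock cannot outlast the poll
# interval and trigger a forced rebalance
def lock_timeout_ms(max_poll_interval_ms = DEFAULT_MAX_POLL_INTERVAL_MS, margin_percent: 20)
  unless margin_percent.between?(1, 99)
    raise ArgumentError, "margin_percent must be between 1 and 99"
  end

  max_poll_interval_ms * (100 - margin_percent) / 100
end

# PostgreSQL-specific: SET LOCAL lasts only until the current transaction ends
def lock_timeout_sql(timeout_ms)
  "SET LOCAL lock_timeout = '#{Integer(timeout_ms)}ms'"
end

timeout = lock_timeout_ms
puts timeout                   # => 240000
puts lock_timeout_sql(timeout) # => SET LOCAL lock_timeout = '240000ms'
```

In a Rails app, one could open a transaction, execute this statement with `ActiveRecord::Base.connection.execute`, and only then call `with_lock`, which joins the already open transaction. PostgreSQL also offers `statement_timeout`, which bounds whole statements rather than lock waits.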

Keeping all of the above in mind, we can draw the expected flow of the initial offset selection:

We still have to remember that consumption may happen with a delay and that the partition may be lost between the messages' delivery and their consumption. However, this is a separate issue we will tackle soon.

Because of the DB lock, we now know that:

  • no other process holds the lock, which means no other process is currently running operations on the same topic partition (this does not mean there won't be any before the consumption in our process, but as mentioned, we will tackle this as well).
  • we have the current Kafka offset and the DB one, and we can ensure that we start from the transactional one in case of a conflict.

What about the consumption itself? Can we just run it as before? Well, almost. We need a way to ensure that we own the partition at the moment of locking the row. Yes, we may lose it during processing, but as long as we hold the lock, any other process attempting to establish its starting offset will have to wait.

While the processing may end when we no longer own the partition, it was started with ownership confirmed. Hence, as long as we hold the lock, no other process can fetch the DB offset. This means that we can safely finish our DB operations and ignore potential Kafka offset commit failure.
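This lock-based reasoning can be checked with a small in-memory simulation. It assumes a hypothetical `FakePartitionRow` in which a Ruby `Mutex` stands in for the `FOR UPDATE` row lock. Process A takes the lock while it still owns the partition. Process B, the new owner, blocks on its initial offset selection until A has stored the new offset.

```ruby
# Hypothetical stand-in for a per-partition row: a Mutex plays the role of the
# FOR UPDATE lock and offset the role of the stored DB offset
class FakePartitionRow
  attr_accessor :offset

  def initialize(offset)
    @lock = Mutex.new
    @offset = offset
  end

  # Runs the block while holding the lock, similar to Partition.locked
  def locked
    @lock.synchronize { yield(self) }
  end
end

row = FakePartitionRow.new(0)
lock_taken = Queue.new
log = Queue.new

# Process A confirmed ownership when it took the lock and keeps it until its
# work and the DB offset update are done, even if the partition is revoked
process_a = Thread.new do
  row.locked do |locked_row|
    lock_taken << true
    sleep 0.1 # simulated slow DB work
    locked_row.offset = 5
    log << :a_committed
  end
end

# Process B, the new owner, selects its starting offset only once A is done
lock_taken.pop
process_b = Thread.new do
  row.locked do |locked_row|
    log << :b_selected
    locked_row.offset
  end
end

[process_a, process_b].each(&:join)
puts process_b.value # => 5 (B starts after A's work, so nothing is reprocessed)
```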


Partitions table

There's not much to our table design. We need to make sure we have a row for each topic partition and a way to store the offset.

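class CreatePartitions < ActiveRecord::Migration[6.1]
  def change
    create_table :kafka_partitions do |t|
      # One row per topic partition, for example "my_topic-0"
      t.string :topic_with_partition, null: false, index: { unique: true }
      # Next offset to consume; limit: 8 makes it a 64-bit integer like Kafka offsets
      t.integer :offset, limit: 8, default: 0, null: false
    end
  end
end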
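Since the number of partitions is known up front, the rows can be seeded ahead of time. The helper below is hypothetical and not part of Karafka. It builds the `topic_with_partition` keys in the same topic-dash-partition format that the `Partition.locked` method below looks up.

```ruby
# Hypothetical helper: one topic_with_partition key per partition of a topic
# whose partition count is known up front
def partition_keys(topic, partitions_count)
  raise ArgumentError, "partitions_count must be positive" unless partitions_count.positive?

  Array.new(partitions_count) { |partition| "#{topic}-#{partition}" }
end

puts partition_keys("my_topic", 3).inspect # => ["my_topic-0", "my_topic-1", "my_topic-2"]
```

In a Rails app, each key could then be passed to `Partition.find_or_create_by!(topic_with_partition: key)`, for example from a seed file or a data migration.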


Locking code

The code that lets us work with a given partition while it is fully locked looks as follows:

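class Partition < ApplicationRecord
  self.table_name = :kafka_partitions

  class << self
    # Yields the row of a given topic partition while holding a FOR UPDATE
    # row lock; with_lock wraps the block in a database transaction
    def locked(topic, partition, &block)
      partition = find_by!(topic_with_partition: "#{topic}-#{partition}")

      partition.with_lock('FOR UPDATE') do
        block.call(partition)
      end
    end
  end

  # Stores the offset of the next message to consume (hence the + 1)
  def mark_as_consumed(message)
    update!(offset: message.offset + 1)
  end
end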

Filter for offset management

The most complex code resides in the filter. For the sake of simplicity, I left the lock timeout handling out:

class OffsetManager < Karafka::Pro::Processing::Filters::Base
  def initialize(topic, partition)
    @topic = topic
    @partition = partition
    @executed = false
    @analyze = false
    @mismatch = false
  end

  def apply!(messages)
    # This filter should resolve states only on the first run because it's the first
    # one after the partition assignment
    # Every Karafka filter instance is reinitialized after a rebalance
    if @executed
      @analyze = false
      return
    end

    # Care only on first run
    @executed = true
    @analyze = true

    # topic is the routing topic object, so we use its name for the lookup
    ::Partition.locked(@topic.name, @partition) do |partition|
      kafka_offset = messages.first.offset

      # Selecting max will ensure that we always prioritize the DB one and since
      # we always commit the transactional offset first, no risk in max
      @start_offset = [partition.offset, kafka_offset].max
      @mismatch = partition.offset != kafka_offset
    end

    # This will ensure that we do not pass any messages for consumption when seek will run
    messages.clear if @mismatch
  end

  # The filter only takes effect when the DB offset differs from the Kafka one
  def applied?
    @analyze && @mismatch
  end

  def action
    @analyze && @mismatch ? :seek : :skip
  end

  # Where to seek to when the action is :seek
  def cursor
    ::Karafka::Messages::Seek.new(@topic.name, @partition, @start_offset)
  end
end
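To see the filter's decisions in isolation, here is a plain-Ruby replica that does not depend on Karafka. `OffsetDecision` is a hypothetical class. The lambda passed to it stands in for reading `Partition#offset` under the row lock. The replica mirrors the first-run-only logic and the `max` rule of `OffsetManager`.

```ruby
# Hypothetical replica of the OffsetManager decisions, without Karafka
class OffsetDecision
  attr_reader :start_offset

  def initialize(db_offset_source)
    @db_offset_source = db_offset_source
    @executed = false
    @analyze = false
    @mismatch = false
  end

  # first_offset is the offset of the first message Kafka delivered
  def apply!(first_offset)
    # Only the first batch after an assignment is analyzed
    if @executed
      @analyze = false
      return
    end

    @executed = true
    @analyze = true
    db_offset = @db_offset_source.call
    # The DB offset wins whenever it is ahead of the Kafka one
    @start_offset = [db_offset, first_offset].max
    @mismatch = db_offset != first_offset
  end

  def action
    @analyze && @mismatch ? :seek : :skip
  end
end

decision = OffsetDecision.new(-> { 15 })
decision.apply!(10)        # Kafka redelivers from 10, the DB says 15
puts decision.action       # => seek
puts decision.start_offset # => 15
decision.apply!(15)        # later batches are never re-analyzed
puts decision.action       # => skip
```

If the DB offset were ever behind the Kafka one, `max` would pick the Kafka offset, and the seek would simply target the offset Kafka already delivered. Under the assumptions listed earlier, this should not happen because the transactional offset is always stored first.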

You can register this filter as follows:

topic :my_topic do
  consumer Consumer
  filter ->(topic, partition) { OffsetManager.new(topic, partition) }
end

Note that it is crucial to make sure this is the first filter that runs, as it needs to be aware of the initial offset received alongside the first message from Kafka.

Consumption alignment

The last remaining piece is aligning our consumption process. As in our initial code, we need to run in a transaction. This is now taken care of by our Partition.locked wrapper.

We use the synchronous #revoked? method, which returns true if our consumer has lost the assignment it was working with.

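def consume
  successful = false

  Partition.locked(
    topic.name,
    partition
  ) do |partition|
    # Do not proceed if we have lost the assignment
    raise(ActiveRecord::Rollback) if revoked?

    # Do the work
    messages.each do |message|
      # Persist each event (illustrative: payload holds the Event attributes)
      Event.create!(message.payload)
    end

    # Store the DB offset
    partition.mark_as_consumed(messages.last)
    successful = true
  end

  # The transaction was rolled back, so the Kafka offset must not move either
  return unless successful

  # Store Kafka offset
  mark_as_consumed(messages.last)
end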


My focus in this article was the careful and efficient management of Kafka's offsets, which are crucial for maintaining data integrity and consistency.

We explored how to integrate offset management with database transactions for handling scenarios involving process crashes. By doing so, the offset progress is meticulously tracked within the database transaction, significantly reducing the risk of data duplication or loss.

However, it's important to note that the examples and strategies discussed in this article have been simplified for clarity and understanding. In a real-world, production-grade environment, some extra development and adjustments may be required.

Delaying Kafka Messages Processing with Karafka: A Deep Dive into Delayed Topics

Kafka is a popular distributed streaming platform that is commonly used for building real-time data pipelines and streaming applications. One of the core features of Kafka is its ability to handle high-volume, real-time data streams and reliably process and distribute them to multiple consumers. However, in some cases, it may be necessary to postpone the processing of certain messages for various reasons.

This is where Karafka's Delayed Topics feature comes in. It is a powerful mechanism that lets you delay the consumption of messages until a later time, giving you greater control over your message processing and the ability to optimize your application's performance. In this article, we will take a deep dive into Karafka's Delayed Topics feature and explore how it can be used to delay message processing in Kafka-based applications.

The Power of Patience: Reasons to Postpone Your Kafka Messages Processing

Usually, you want to get the data as quickly as possible to ensure that your application can respond in real time. However, there are situations where postponing message processing can benefit your system. For example:

  • If you are implementing retry logic, a processing backoff can increase the chances of successful processing and reduce the number of failed messages that are sent to a dead letter queue. In this case, delaying processing can reduce the number of retries and minimize the impact of failed messages on your system.

  • Delaying the processing of data sent to a Kafka dead letter queue can reduce the number of retries and minimize the impact of failed messages on your system. By introducing a processing lag, you give your system time to recover and to address any issues that may have caused the message to fail. This can be particularly useful if you are experiencing network latency, hardware issues, or other transient errors.

  • Delaying processing can give you more time to analyze the failed messages and take corrective actions. Collecting the failed messages in a dead-letter queue allows you to examine them in more detail to identify patterns or common issues. You can also use this information to improve your system and reduce the likelihood of similar failures in the future.

  • Processing lag can also be helpful in data crawling applications, where freshly published data may not be available right away due to HTTP caches. In such cases, it may be beneficial to always delay message processing by a fixed period to ensure that all the caches have expired and the data is fully available. By delaying the processing, you can avoid working with incomplete or stale data and ensure that your application works with the latest information.

Sleeping on the Job: A Problem with Kafka Delayed Processing

One common way developers delay the processing of Kafka messages is by using a sleep function to pause processing for a fixed period. While this approach is simple and easy to implement, it is inefficient for a few reasons.

def consume
  messages.each do |message|
    time_diff = Time.now - message.timestamp

    # Block the whole consumer until the message is at least 5 minutes old
    if time_diff < 5.minutes
      sleep(5.minutes - time_diff)
    end

    puts "Message key: #{message.key}, payload: #{message.payload}"
  end
end

Using the sleep function can lead to unnecessarily long execution times, resulting in delays that impact the overall performance of your application. On top of that, if the sleep outlasts max.poll.interval.ms, Kafka may decide that your process has hung and remove it from the consumer group, causing unnecessary rebalances.

Using the sleep function can also result in idle threads, which can waste system resources and reduce the overall efficiency of your application. Idle threads can also prevent other messages from being processed, which can cause delays and bottlenecks in your system.

Below, you can find an example of how sleeping on partition 0 for ten seconds can impact Karafka's ability to process data from other topics/partitions*. In this scenario, we are producing and consuming messages simultaneously: 1 record per millisecond and using five partitions.

*Karafka can be configured to mitigate this. We did not do so here, so that the example shows the challenges related to sleeping.

As you can see, we're immediately experiencing a lag on all of the partitions, and it is equal to the number of messages we produced per partition (10 000 messages for 10 seconds distributed to 5 partitions => 2000 messages per partition).

Below, you can also see that the age of messages aligns with the lag, effectively delaying all of our processing:

By default, Karafka and other Kafka consumers poll data in batches, process the data and then poll more. Only when all the work is finished for all the topics/partitions is more data polled. This is why you are seeing an increased lag for all the partitions, despite most of them not doing any work.
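A tiny model reproduces the arithmetic behind the lag figures above. The functions below are hypothetical. They assume that a poll-process cycle lasts as long as its slowest partition, and that each partition accumulates messages at its share of the production rate while the cycle is blocked (1 record per millisecond is 1,000 per second).

```ruby
# Hypothetical model: one poll-process cycle lasts as long as its slowest
# partition, because more data is polled only after all work is finished
def cycle_duration(processing_seconds_by_partition)
  processing_seconds_by_partition.values.max
end

# Messages that accumulate on each partition while the cycle is blocked,
# assuming the production rate is spread evenly across partitions
def lag_per_partition(total_messages_per_second, partitions_count, blocked_seconds)
  (total_messages_per_second / partitions_count) * blocked_seconds
end

# Partition 0 sleeps for 10 seconds, the other four finish almost instantly
times = { 0 => 10.0, 1 => 0.01, 2 => 0.01, 3 => 0.01, 4 => 0.01 }

puts cycle_duration(times)           # => 10.0
puts lag_per_partition(1_000, 5, 10) # => 2000
```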

In general, it is more efficient and optimal to use a dedicated delay mechanism that is designed for Kafka message processing, such as the one built into Karafka. This approach can help you to optimize resource utilization, reduce processing delays, and ensure that your application remains performant and responsive.

Waking Up to a Better Solution for Kafka Delayed Processing

Rather than using sleep to delay consumption, a more effective approach is to pause partitions until the expected time has passed. This can be more complex than sleeping, but it is a more efficient way of controlling message processing delays. By pausing the partition, you ensure that your application does not waste system resources.
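The following minimal sketch shows the decision at the heart of pausing instead of sleeping. It assumes that messages carry a `timestamp` and arrive in order. The idea is to find the first message that is still too young, process everything before it, and pause the partition at its offset for the remaining time. `FakeMessage` and `split_by_delay` are hypothetical names, and this is not Karafka's internal implementation.

```ruby
# Hypothetical message shape: only the fields this sketch needs
FakeMessage = Struct.new(:offset, :timestamp)

# Splits a batch into the messages that are old enough and the point at which
# the partition should be paused. Returns [ready, pause_offset, pause_seconds],
# where pause_offset and pause_seconds are nil if every message is old enough.
def split_by_delay(messages, delay_seconds, now)
  index = messages.find_index { |message| now - message.timestamp < delay_seconds }
  return [messages, nil, nil] if index.nil?

  # Messages after the first too-young one are held back too, to keep ordering
  too_young = messages[index]
  remaining = delay_seconds - (now - too_young.timestamp)
  [messages.take(index), too_young.offset, remaining]
end

now = Time.at(1_000)
batch = [
  FakeMessage.new(0, Time.at(985)), # 15 seconds old
  FakeMessage.new(1, Time.at(990)), # 10 seconds old
  FakeMessage.new(2, Time.at(996))  # 4 seconds old
]

ready, pause_offset, pause_seconds = split_by_delay(batch, 10, now)
puts ready.map(&:offset).inspect # => [0, 1]
puts pause_offset                # => 2
puts pause_seconds               # => 6.0
```

A Karafka consumer could then, for example, call `pause(pause_offset, (pause_seconds * 1000).ceil)` to resume from that offset once the remaining time has passed. Polling continues for all other partitions in the meantime.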

However, implementing partition pausing correctly can be challenging, as it requires handling events like rebalances and revocations. These events can cause the partition to be reassigned to a different consumer, impacting the timing of your processing. Additionally, if you pause for too long, you risk introducing delays that affect the performance of your application.

Fortunately, with Karafka, you can quickly implement this mechanism without worrying about these complexities. The Delayed Topics feature works seamlessly with other Karafka components, such as the Virtual Partitions and Long-Running Jobs, to provide a robust and efficient way of managing the processing delays. By leveraging this built-in mechanism, you can ensure that your application remains performant and responsive.

All that is needed for it to work is the delay_by routing definition for a given topic:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      # Always delay processing messages from the orders by 10 seconds
      # Note: the code for delaying single partition is slightly different, but the outcome is the same
      # delay_by takes the delay in milliseconds
      delay_by(10_000)
    end
  end
end

When you apply this logic to the code and replace sleep with Karafka's automatic delay, things look drastically different:

Not only is the lag close to zero for non-paused partitions, but the processing is within the polling frequency (note the logarithmic scale):

Only the topic/partition you want is delayed, without impacting the rest of the data you are working with! That is because polling is not blocked. Instead, once the work on the remaining four partitions is done, Karafka immediately requests more data to work with.

Limitations of pausing and resuming

While this feature provides a valuable way to postpone message processing, it has some limitations to look out for.

One significant limitation is precision: the delay is not always millisecond-exact. This is because the Delayed Topics feature works by pausing a given partition for a specified amount of time and then unpausing it after that time has elapsed. Unpausing happens right before polling, and the delayed messages are processed only once that poll returns. As a result, there can be a slight gap between the moment the partition is unpaused and the moment the delayed message is processed.

This limitation also means that the age of the messages may be slightly higher than the required minimum but will never be less than expected.

Below, you can see the "ten seconds + polling" histogram. While the theoretical max is equal to 10 seconds + max wait time, most of the time, we're close to 10 seconds + 10% of max wait time.


Karafka's Delayed Topics is a powerful feature that allows for arbitrary delays when working with messages from specific topics. It can be used in various use cases, such as e-commerce, social media moderation, and finance. By delaying message processing, you can perform additional processing or validation, moderate user-generated content, and introduce a retry mechanism for failed messages.

Building complex and reliable open-source software is neither easy nor fast, which is why Karafka Pro exists. It has many features, including the one described in this blog post, that can help you build robust and performant event-driven applications.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Copyright © 2023 Closer to Code
