
Inside Kafka: Enhancing Data Reliability Through Transactional Offsets with Karafka

Karafka is a Ruby and Rails framework that simplifies the development of Apache Kafka-based applications. Among its varied features, the Filtering API provides enhanced control over the data flow.

The crux of this article is managing offsets - unique identifiers for messages within Kafka's partitions. Often, there's a need to manage offsets alongside database operations within a transaction, especially when handling potential process crashes and anomalies, to minimize the risk of double processing.

For instance, if a SQL operation concludes successfully but the offset commit fails due to a crash, data could be processed again, leading to potential duplications or data integrity issues. Integrating offset management with database transactions using Karafka's Filtering API can help tackle this problem. It ensures the offset progress is tracked within the database transaction, maintaining data integrity even in crashes.

We'll explore this concept further in the coming sections, highlighting its practical implications and benefits.

The Importance of Offset Management in Kafka

In a world of streaming data, Kafka has cemented its role as an industry-standard platform for handling high-volume, real-time data feeds. At the heart of Kafka's functionality lies the concept of offsets, which are crucial in ensuring data consistency and reliability.

Offsets are unique identifiers assigned to each message within a Kafka partition. They serve as checkpoints that allow Kafka to track which messages have been consumed and which haven't. In other words, they are the mechanism by which Kafka maintains the state across distributed data streams, marking the position of every consumer in the stream. With them, it is possible to keep track of the data flowing through Kafka at any given time.

However, Kafka offset management has its challenges. Because it is entirely independent of database operations, there may be cases where a SQL operation finishes successfully, but the offset commit fails due to a process crash or an involuntary rebalance. This can lead to issues like data duplication, as when the system recovers, the data already processed by the SQL operation may be consumed again.

Below you can find example code and a graph that illustrate this problem:

def consume
  Event.transaction do
    messages.each do |message|
      Event.insert(message.payload)
    end
  end

  # Karafka does that automatically after batch is successfully processed
  # however we do it here as well to better illustrate this scenario
  mark_as_consumed(messages.last)
end

When using #mark_as_consumed, Karafka stores the offset locally and commits it periodically. This means there may be cases where the partition is lost, but the process is not yet aware of it. If that happens, the database operation will finish, but the offset won't be committed, and a different process may already be working with the same messages. This will result in inserting some of the events multiple times.

One way to partially mitigate this would be to use #mark_as_consumed! at the end of the transaction as follows:

def consume
  Event.transaction do
    messages.each do |message|
      Event.insert(message.payload)
    end

    # Stop the transaction if we no longer own the partition
    raise(ActiveRecord::Rollback) unless mark_as_consumed!(messages.last)
  end
end

This, however, creates a new problem: what if the offset is committed, but the transaction fails?

Wouldn't it be amazing if we could store the offsets of processed messages or batches within the same DB transactions, ensuring that both always succeed or fail together?

Note 1: By default, Karafka will wait for the consumer to finish work and commit the offsets during rebalances unless the process is forcefully evicted from the consumer group.

Note 2: Yes, this could also be solved by using unique keys for the events, but that is not always possible. The example was reduced in complexity to focus on transactional offset management rather than on a sophisticated SQL operations case.
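
For completeness, below is a minimal sketch of that unique-key alternative. It assumes a hypothetical unique event_id column backed by a unique index and a database that supports unique_by (for example PostgreSQL); duplicate deliveries are then skipped by the database instead of being inserted twice.

def consume
  Event.transaction do
    messages.each do |message|
      # Skip rows that would violate the (hypothetical) unique index on event_id
      Event.insert(message.payload, unique_by: :event_id)
    end
  end

  mark_as_consumed(messages.last)
end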

Transactional Offset Management with the Filtering API

With Karafka's Filtering API, you can achieve exactly that!

By integrating offset management with the transactional integrity of the database using Karafka's Filtering API, we ensure that the offset progress is tracked within the database transaction itself. This approach helps maintain data integrity, even when crashes occur, by providing atomicity to the operation - meaning that all parts of the operation must succeed for the transaction to be committed. If any part fails, the entire transaction is rolled back, avoiding inconsistencies.

Karafka Filtering API is a powerful tool that allows developers to perform various actions around the consumption process. With the Filtering API, users can register multiple filters to validate, filter out messages as they arrive, and alter the polling process by pausing or starting from a different offset.

This time, we will leverage the ability exposed by the Filtering API to inject an offset taken from the database whenever it does not match the one stored in Kafka.

Defining the flow expectations

There are a few things we need to take into consideration to build a transactional offset management filter for Karafka:

  • All SQL operations should have a timeout shorter than max.poll.interval.ms to ensure we do not end up with an endless cycle of forced rebalances.
  • Upon a conflict between the offset present in the database and the one in Kafka, the database offset should have the higher priority.
  • The number of partitions is known (to simplify our code).
  • Each topic partition has a pre-existing row in an appropriate table (see the seeding sketch after this list).
  • Our per-partition rows are always accessed with the FOR UPDATE lock, since they should only be used by the consumers that claim partition ownership. Those rows should not be used for anything else.
  • Our per-partition row is used as a lock around the transaction happening during the consumption, ensuring that in case of a reassignment, the other process is blocked on the initial offset selection until the transaction is finalized.
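
Since each topic partition needs a pre-existing row, those rows have to be seeded up front. Here is a minimal, illustrative sketch (the topic name and partition count are assumptions) that uses the Partition model defined later in the Implementation section:

topic = 'events'
partitions_count = 2

partitions_count.times do |partition_id|
  # One row per topic partition, keyed as "<topic>-<partition>"
  Partition.find_or_create_by!(topic_with_partition: "#{topic}-#{partition_id}")
end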

Keeping all of the above in mind, we can draw the expected flow of the initial offset selection:



We still have to remember that consumption may happen with a delay and that the partition may be lost between the messages' delivery and their consumption. However, this is a separate issue we will tackle soon.

Because of the DB lock, we now know that:

  • no one else owns the lock, which means no other process is currently running operations on the same topic partition (it does not mean there won't be any before the consumption in our process, but as mentioned, we will tackle this as well).
  • we have the current Kafka offset and the DB one, and we can ensure that we start from the transactional one in case of a conflict.

What about the consumption itself? Can we just run it as before? Well, almost. We need a way to ensure that, at the moment of locking the row, we own the partition. Yes, we may lose it during the processing, but as long as we hold the lock, any other process attempting to establish its starting offset will have to wait.

While the processing may end when we no longer own the partition, it was started with ownership confirmed. Hence, as long as we hold the lock, no other process can fetch the DB offset. This means that we can safely finish our DB operations and ignore potential Kafka offset commit failure.

Implementation

Partitions table

There's not much to our table design. We need to make sure we have a row for each topic partition and a way to store the offset.

class CreatePartitions < ActiveRecord::Migration[6.1]
  def change
    create_table :kafka_partitions do |t|
      t.string :topic_with_partition, null: false, index: { unique: true }
      t.integer :offset, limit: 8, default: 0, null: false

      t.timestamps
    end
  end
end

Locking code

The code that ensures we can work with a given partition fully locked looks as follows:

class Partition < ApplicationRecord
  self.table_name = :kafka_partitions

  class << self
    # Finds the row matching the given topic partition and yields it
    # within a SELECT ... FOR UPDATE row-level lock
    def locked(topic, partition)
      record = find_by!(topic_with_partition: "#{topic}-#{partition}")

      record.with_lock('FOR UPDATE') do
        yield(record)
      end
    end
  end

  # Stores the offset of the next message to process in the DB
  def mark_as_consumed(message)
    update!(offset: message.offset + 1)
  end
end

Filter for offset management

The most complex code resides in the filter. For the sake of simplicity, I left the lock timeout handling out:

class OffsetManager < Karafka::Pro::Processing::Filters::Base
  def initialize(topic, partition)
    @topic = topic
    @partition = partition
    @executed = false
    @analyze = false
  end

  def apply!(messages)
    # This filter should resolve states only on the first run because it's the first
    # one after the partition assignment
    # Every Karafka filter instance is reinitialized after a rebalance
    if @executed
      @analyze = false
      return
    end

    # Care only on first run
    @executed = true
    @analyze = true

    ::Partition.locked(@topic, @partition) do |partition|
      kafka_offset = messages.first.offset

      # Selecting max will ensure that we always prioritize the DB one and since
      # we always commit the transactional offset first, no risk in max
      @start_offset = [partition.offset, kafka_offset].max
      @mismatch = partition.offset != kafka_offset
    end

    # This will ensure that we do not pass any messages for consumption when the seek runs
    messages.clear if @mismatch
  end

  def applied?
    true
  end

  def action
    @analyze && @mismatch ? :seek : :skip
  end

  def cursor
    ::Karafka::Messages::Seek.new(
      @topic,
      @partition,
      @start_offset
    )
  end
end

You can register this filter as follows:

topic :my_topic do
  consumer Consumer
  filter ->(topic, partition) { OffsetManager.new(topic, partition) }
end

Note that it is crucial to make sure this is the first filter that runs, as it needs to be aware of the initial offset received alongside the first message from Kafka.

Consumption alignment

The last remaining thing is the alignment of our consumption process. Similarly to our initial code, we still need to run in a transaction; however, this is now taken care of by our Partition.locked wrapper.

We use the synchronous #revoked? method, which will return true in case our consumer has lost the assignment it was working with.

def consume
  successful = false

  Partition.locked(
    messages.metadata.topic,
    messages.metadata.partition
  ) do |partition|
    # Do not proceed if we have lost the assignment
    raise(ActiveRecord::Rollback) if revoked?

    # Do the work
    messages.each do |message|
      Event.insert(message.payload)
    end

    # Store the DB offset
    partition.mark_as_consumed(messages.last)
    successful = true
  end

  return unless successful

  # Store Kafka offset
  mark_as_consumed(messages.last)
end

Conclusion

My focus in this article was the careful and efficient management of Kafka's offsets, which are crucial for maintaining data integrity and consistency.

We explored how to integrate offset management with database transactions for handling scenarios involving process crashes. By doing so, the offset progress is meticulously tracked within the database transaction, significantly reducing the risk of data duplication or loss.

However, it's important to note that the examples and strategies discussed in this article have been simplified for clarity and understanding. In a real-world, production-grade environment, some extra development and adjustments may be required.

Karafka framework 2.1 announcement

I'm happy to announce that Karafka 2.1 has just been released.

For those who wonder what Karafka is, Karafka is a Ruby and Rails multi-threaded efficient Kafka processing framework.

The Karafka 2.1 release builds upon the foundation set by its predecessor, 2.0, making it a seamless continuation rather than a major rewrite. This means that upgrading from version 2.0 to 2.1 can be done without extensive work or significant modifications to existing codebases. With Karafka 2.1, you can expect improved features and enhancements while maintaining the stability and compatibility you have come to rely on.

Note: There are no extensive upgrade notes, and you only need to follow those guidelines.

Noticeable features and improvements

Virtual Offset Management for Virtual Partitions

Virtual Partitions allow you to parallelize the processing of data from a single partition. This can drastically increase throughput when IO operations are involved.

While the default scaling strategy for Kafka consumers is to increase the partition count and the number of consumers, in many cases, this will not provide you with the desired effects. In the end, you cannot go beyond assigning one process per single topic partition with this strategy. That means that without a way to parallelize the work further, IO may become your biggest bottleneck.

Virtual Partitions solve this problem by providing you with the means to further parallelize work by creating "virtual" partitions that will operate independently but will, as a collective processing unit, obey all the Kafka guarantees.
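
For context, enabling Virtual Partitions is a routing-level declaration. Below is a sketch of such a declaration; the topic name, consumer class, and the header used by the partitioner are illustrative:

topic :orders_states do
  consumer OrdersStatesConsumer

  virtual_partitions(
    # Distribute messages across virtual partitions; pick a key with
    # a good distribution (an illustrative header is used here)
    partitioner: ->(message) { message.headers['order_id'] }
  )
end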

Up until now, when utilizing the Virtual Partitions feature, the offset management was entirely collective. This meant that if any error occurred within any virtual partition during message processing, the entire set of virtual partitions from the starting offset would need to be processed again.

However, Karafka 2.1 introduces the concept of Virtual Offset Management, which enhances the previous offset management mechanism in several ways. When Karafka consumes messages using Virtual Partitions, it leverages Virtual Offset Management, which is built on top of the existing offset management mechanism. This feature allows for more granular and precise handling of offsets within each virtual partition.

While each of the Virtual Partitions operates independently, they are bound together to a single Kafka Partition. Karafka transforms the knowledge of messages marked as consumed in each virtual partition into a Kafka offset that can be committed. This process involves computing the highest possible offset by considering all the messages marked as consumed from all the virtual partitions. By analyzing the offsets across virtual partitions, Karafka can determine the maximum offset reached, allowing for an accurate and reliable offset commit to Kafka. This ensures that the state of consumption is properly synchronized and maintained.

Whenever you call #mark_as_consumed when using Virtual Partitions, Karafka will ensure that Kafka receives the highest possible continuous offset matching the underlying partition.
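
To illustrate the idea, here is a simplified conceptual sketch (not Karafka's internal code) of how the highest continuous offset can be derived from the offsets marked as consumed across virtual partitions:

# marked    - offsets marked as consumed across all virtual partitions
# committed - the offset already committed to Kafka (-1 when none)
def highest_continuous_offset(marked, committed)
  candidate = committed

  marked.sort.each do |offset|
    # Stop at the first gap - anything beyond it is not yet safe to commit
    break unless offset == candidate + 1

    candidate = offset
  end

  candidate
end

highest_continuous_offset([0, 1, 2, 4, 5], -1) # => 2 (offset 3 is still pending)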

Below you can find a few examples of how Karafka transforms messages marked as consumed in virtual partitions into an appropriate offset that can be committed to Kafka.

With Virtual Offset Management, Karafka keeps track of each virtual partition's offset separately. In case of any error occurring within a specific virtual partition, only that particular partition will be processed again from the point of the error.

This improvement significantly enhances the efficiency and reliability of message processing when working with Virtual Partitions. It minimizes redundant processing by isolating errors to the affected virtual partition, thereby reducing the overall processing time and resource consumption.

Below you can find a visualization of data re-processing from a single topic partition distributed across three virtual partitions. Karafka knows which of the messages were not processed successfully and will re-process only those when retrying.

CurrentAttributes support in ActiveJob

The Karafka ActiveJob adapter has been updated to support the Ruby on Rails CurrentAttributes feature. If you want to use it, you need to put this in your karafka.rb config file (or initializer):

require 'karafka/active_job/current_attributes'
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass')
# or multiple current attributes
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass', 'AnotherCurrentAttributesClass')

When you set your current attributes and create a background job, it will execute with them set.

class Current < ActiveSupport::CurrentAttributes
  attribute :user_id
end

class Job < ActiveJob::Base
  def perform
    puts "user_id: #{Current.user_id}"
  end
end

Karafka::ActiveJob::CurrentAttributes.persist('Current')
Current.user_id = 1
Job.perform_later # the job will output "user_id: 1"

Karafka handles CurrentAttributes by including them as part of the job serialization process before pushing them to Kafka. These attributes are then deserialized by the ActiveJob consumer and set back in your CurrentAttributes classes before executing the job.
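
The general pattern can be sketched with plain ActiveJob hooks (a simplified illustration, not Karafka's internal implementation): capture the attributes when the job is serialized and restore them right before execution.

class JobWithContext < ActiveJob::Base
  # Attach the current attributes to the serialized job payload
  def serialize
    super.merge('current_user_id' => Current.user_id)
  end

  # Restore them on the consuming side before #perform runs
  def deserialize(job_data)
    super
    Current.user_id = job_data['current_user_id']
  end
end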

This approach is based on Sidekiq's approach to persisting current attributes: Sidekiq and Request-Specific Context.

Kubernetes Liveness support

I'm excited to share that Karafka 2.1 has introduced a new feature that will significantly enhance the reliability and stability of your Karafka server processes. With the addition of an out-of-the-box Kubernetes Liveness Listener, Karafka now allows for seamless implementation of liveness checks within your Kubernetes environment. But why is checking process liveness so important?

Liveness checks are critical for ensuring that a process runs as expected and actively consumes data. By enabling the Kubernetes Liveness Listener in Karafka 2.1, you can easily configure liveness checks without extra effort. This means that Kubernetes will automatically monitor the health of your Karafka server process, periodically sending requests to verify its liveness.

However, sometimes a process may appear active, yet it can actually be stuck on user logic. This situation can be challenging to detect without proper instrumentation. While the process might respond to system-level signals, it could be unresponsive within its user logic or certain parts of the codebase. These issues, often called "liveness bugs," can lead to degraded performance, data inconsistencies, or even complete service disruptions.

With the Kubernetes Liveness Listener in Karafka 2.1, you can proactively detect such liveness bugs. By regularly checking the health of your Karafka server process, Kubernetes will be able to identify situations where the process is unresponsive, even if it appears active from a system-level perspective. This enables you to take timely actions, such as restarting the process or triggering alerts for investigation, ensuring the overall stability and reliability of your Karafka applications.

Subscribe the Kubernetes listener within your Ruby code:

require 'karafka/instrumentation/vendors/kubernetes/liveness_listener'

listener = ::Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener.new(
  port: 3000,
  # Make sure polling happens at least once every 5 minutes
  polling_ttl: 300_000,
  # Make sure that consuming does not hang and does not take more than 1 minute
  consuming_ttl: 60_000
)

Karafka.monitor.subscribe(listener)

And add a liveness probe to your Karafka deployment spec:

livenessProbe:
  httpGet:
    path: /
    port: 3000
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5

Upgrade Notes

No significant changes are needed. Just follow the changelog-based upgrade notes.

Karafka Pro

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support, as due to the sheer number of questions and requests, I need a way to prioritize them.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Stay tuned and don't forget to join our Slack channel.
