Karafka framework 2.1 announcement

I'm happy to announce that Karafka 2.1 has just been released.

For those who wonder what Karafka is, Karafka is a Ruby and Rails multi-threaded efficient Kafka processing framework.

The Karafka 2.1 release builds upon the foundation set by its predecessor, 2.0, making it a seamless continuation rather than a major rewrite. This means that upgrading from version 2.0 to 2.1 can be done without extensive work or significant modifications to existing codebases. With Karafka 2.1, you can expect improved features and enhancements while maintaining the stability and compatibility you have come to rely on.

Note: There are no extensive upgrade notes, and you only need to follow those guidelines.

Noticeable features and improvements

Virtual Offset Management for Virtual Partitions

Virtual Partitions allow you to parallelize the processing of data from a single partition. This can drastically increase throughput when IO operations are involved.

While the default scaling strategy for Kafka consumers is to increase partitions count and number of consumers, in many cases, this will not provide you with desired effects. In the end, you cannot go with this strategy beyond assigning one process per single topic partition. That means that without a way to parallelize the work further, IO may become your biggest bottleneck.

Virtual Partitions solve this problem by providing you with the means to further parallelize work by creating "virtual" partitions that will operate independently but will, as a collective processing unit, obey all the Kafka warranties.

Up until now, when utilizing Virtual Partitions feature, the offset management was entirely collective. This meant that if any error occurred within any virtual partition during message processing, the entire set of virtual partitions from the starting offset would need to be processed again.

However, Karafka 2.1 introduces the concept of Virtual Offset Management, which enhances the previous offset management mechanism in several ways. When Karafka consumes messages using Virtual Partitions, it leverages Virtual Offset Management, which is built on top of the existing offset management mechanism. This feature allows for more granular and precise handling of offsets within each virtual partition.

While each of the Virtual Partitions operates independently, they are bound together to a single Kafka Partition. Karafka transforms the knowledge of messages marked as consumed in each virtual partition into a Kafka offset that can be committed. This process involves computing the highest possible offset by considering all the messages marked as consumed from all the virtual partitions. By analyzing the offsets across virtual partitions, Karafka can determine the maximum offset reached, allowing for an accurate and reliable offset commit to Kafka. This ensures that the state of consumption is properly synchronized and maintained.

Whenever you mark_as_consumed when using Virtual Partitions, Karafka will ensure that Kafka receives the highest possible continuous offset matching the underlying partition.

Below you can find a few examples of how Karafka transforms messages marked as consumed in virtual partitions into an appropriate offset that can be committed to Kafka.

With Virtual Offset Management, Karafka keeps track of each virtual partition's offset separately. In case of any error occurring within a specific virtual partition, only that particular partition will be processed again from the point of the error.

This improvement significantly enhances the efficiency and reliability of message processing when working with Virtual Partitions. It minimizes redundant processing by isolating errors to the affected virtual partition, thereby reducing the overall processing time and resource consumption.

Below you can find a visualization of data re-processing from a single topic partition distributed across three virtual partitions. Karafka knows which of the messages were not processed successfully and will re-process only those when retrying.

CurrentAttributes support in ActiveJob

The Karafka ActiveJob adapter has been updated to support the Ruby on Rails CurrentAttributes feature. If you want to use it, you need to put this in your karafka.rb config file (or initializer):

require 'karafka/active_job/current_attributes'
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass')
# or multiple current attributes
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass', 'AnotherCurrentAttributesClass')

When you set your current attributes and create a background job, it will execute with them set.

class Current < ActiveSupport::CurrentAttributes
  attribute :user_id
end

class Job < ActiveJob::Base
  def perform
    puts 'user_id: #{Current.user_id}'
  end
end

Karafka::ActiveJob::CurrentAttributes.persist('Current')
Current.user_id = 1
Job.perform_later # the job will output "user_id: 1"

Karafka handles CurrentAttributes by including them as part of the job serialization process before pushing them to Kafka. These attributes are then deserialized by the ActiveJob consumer and set back in your CurrentAttributes classes before executing the job.

This approach is based on Sidekiq's approach to persisting current attributes: Sidekiq and Request-Specific Context.

Kubernetes Liveness support

I'm excited to share that Karafka 2.1 has introduced a new feature that will significantly enhance the reliability and stability of your Karafka server processes. With the addition of an out-of-the-box Kubernetes Liveness Listener, Karafka now allows for seamless implementation of liveness checks within your Kubernetes environment. But why is checking process liveness so important?

Liveness checks are critical for ensuring that a process runs as expected and actively consumes data. By enabling the Kubernetes Liveness Listener in Karafka 2.1, you can easily configure liveness checks without extra effort. This means that Kubernetes will automatically monitor the health of your Karafka server process, periodically sending requests to verify its liveness.

However, sometimes a process may appear active, yet it can actually be stuck on user logic. This situation can be challenging to detect without proper instrumentation. While the process might respond to system-level signals, it could be unresponsive within its user logic or certain parts of the codebase. These issues, often called "liveness bugs," can lead to degraded performance, data inconsistencies, or even complete service disruptions.

With the Kubernetes Liveness Listener in Karafka 2.1, you can proactively detect such liveness bugs. By regularly checking the health of your Karafka server process, Kubernetes will be able to identify situations where the process is unresponsive, even if it appears active from a system-level perspective. This enables you to take timely actions, such as restarting the process or triggering alerts for investigation, ensuring the overall stability and reliability of your Karafka applications.

Subscribe the Kubernetes listener within your Ruby code:

require 'karafka/instrumentation/vendors/kubernetes/liveness_listener'

listener = ::Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener.new(
  port: 3000,
  # Make sure polling happens at least once every 5 minutes
  polling_ttl: 300_000,
  # Make sure that consuming does not hang and does not take more than 1 minute
  consuming_ttl: 60_000
)

Karafka.monitor.subscribe(listener)

And add a liveness probe to your Karafka deployment spec:

livenessProbe:
  httpGet:
    path: /
    port: 3000
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5

Upgrade Notes

No significant changes are needed. Just follow the changelog-based upgrade notes.

Karafka Pro

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support, as due to a sheer number of questions and requests, I do need to have a way to prioritize those.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

References


Stay tuned and don't forget to join our Slack channel.

Delaying Kafka Messages Processing with Karafka: A Deep Dive into Delayed Topics

Kafka is a popular distributed streaming platform that is commonly used for building real-time data pipelines and streaming applications. One of the core features of Kafka is its ability to handle high-volume, real-time data streams and reliably process and distribute them to multiple consumers. However, in some cases, it may be necessary to postpone the processing of certain messages for many reasons.

This is where the Karafka's Delayed Topics feature comes in. This is a powerful mechanism that allows you to delay the consumption of messages for a later time, giving you greater control over your message processing and the ability to optimize your application's performance. In this article, we will take a deep dive into the Karafka's Delayed Topics feature and explore how it can be used to delay message processing in the Kafka-based applications.

The Power of Patience: Reasons to Postpone Your Kafka Messages Processing

Usually, you want to get the data as quickly as possible to ensure that your application can respond in real time. However, there are some situations where postponing messages processing can benefit your system. For example:

  • If you are implementing the retry logic, processing backoff can increase the chances of successful processing and reduce the number of failed messages that are sent to a dead letter queue. In this case, delaying processing can reduce the number of retries and minimize the impact of failed messages on your system.

  • Delaying the processing of data sent to a Kafka dead letter queue can reduce the number of retries and minimize the impact of failed messages on your system. By introducing the processing lag, you can make time for your system to recover and address any issues that may have caused the message to fail. This can be particularly useful if you are experiencing network latency, hardware issues, or other transient errors.

  • Delaying processing can give you more time to analyze the failed messages and take corrective actions. Collecting the failed messages in a dead-letter queue allows you to examine them in more detail to identify patterns or common issues. You can also use this information to improve your system and reduce the likelihood of similar failures in the future.

  • Processing lag can also be helpful in data crawling applications, where the immediately published data may not be immediately available due to HTTP caches. In such cases, it may be beneficial to always wait for the processing of messages for a fixed period to ensure that all the caches have expired and the data is fully available. By delaying the processing, you can avoid working with incomplete or stale data and ensure that your application works with the latest information.

Sleeping on the Job: A Problem with Kafka Delayed Processing

One common way developers delay the processing of Kafka messages is by using a sleep function to pause processing for a fixed period. While this approach is simple and easy to implement, it could be more efficient for a few reasons.

def consume
  messages.each do |message|
    time_diff = Time.now - message.timestamp

    if time_diff < 5.minutes
      sleep(5.minutes - time_diff)
      redo
    end

    puts "Message key: #{message.key}, value: #{message.value}"
  end
end

Using the sleep function can lead to unnecessarily long execution times. This can result in delays that impact the overall performance of your application. On top of that, Kafka may decide, that your process has hanged and may remove you out of the consumer group, creating unnecessery rebalances.

Using the sleep function can also result in idle threads, which can waste system resources and reduce the overall efficiency of your application. Idle threads can also prevent other messages from being processed, which can cause delays and bottlenecks in your system.

Below, you can find an example of how sleeping on partition 0 for ten seconds can impact Karafka's ability to process data from other topics/partitions*. In this scenario, we are producing and consuming messages simultaneously: 1 record per millisecond and using five partitions.

*Karafka can be configured in an optimized way to mitigate this, however, it was not done to illustrate the challenges related to sleeping

As you can see, we're immediately experiencing a lag on all of the partitions, and it is equal to the number of messages we produced per partition (10 000 messages for 10 seconds distributed to 5 partitions => 2000 messages per partition).

Below, you can also see that the age of messages aligns with the lag, effectively delaying all of our processing:

By default, Karafka and other Kafka consumers poll data in batches, process the data and then poll more. Only when all the work is finished for all the topics/partitions is more data polled. This is why you are seeing an increased lag for all the partitions, despite most of them not doing any work.

In general, it is more efficient and optimal to use a dedicated delay mechanism that is designed for Kafka message processing, such as the one built into Karafka. This approach can help you to optimize resource utilization, reduce processing delays, and ensure that your application remains performant and responsive.

Waking Up to a Better Solution for Kafka Delayed Processing

Rather than using sleep to delay consumption, a more effective approach is to pause partitions until the expected time has passed. While this can be more complex than sleeping, it provides a more optimal way of controlling the message processing delays. By pausing the partition, you can ensure that your application does not waste the system resources.

However, implementing the partition pausing correctly can be challenging, requiring considering things like rebalances and revocations. These events can cause the partition to be reassigned to a different consumer, impacting the timing of your processing. Additionally, if you pause for too long, you risk causing the delays that affect the performance of your application.

Fortunately, with Karafka, you can quickly implement this mechanism without worrying about these complexities. The Delayed Topics feature works seamlessly with other Karafka components, such as the Virtual Partitions and Long-Running Jobs, to provide a robust and efficient way of managing the processing delays. By leveraging this built-in mechanism, you can ensure that your application remains performant and responsive.

All that is needed for it to work is the delay_by routing definition for a given topic:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      # Always delay processing messages from the orders by 10 seconds
      # Note: the code for delaying single partition is slightly different, but the outcome is the same
      delay_by(10_000)
    end
  end
end

When you apply this logic to the code and replace sleep with Karafkas' automatic delay, things look drastically different:

Not only is the lag close to zero for non-paused partitions, but the processing is within the polling frequency (note the logarithmic scale):

Only the topic/partion that you want is being delayed without impacting the rest of the data you are working with! That is because you do not block polling. Instead, after the remaining four partitions work is done, you immediately request more data to work with.

Limitations of pausing and resuming

While this feature provides a valuable way to postpone the messages processing, it does have some limitations to look out for.

One significant limitation is precision. It is not always millisecond-exact. This is because the Delayed Topics feature works by pausing a given partition for a specified amount of time and then unpausing it after that time has elapsed. However, the unpausing happens before the polling happens, so there can be a slight delay between when the partition is unpaused and when the delayed message is processed.

This limitation also means that the age of the messages may be slightly higher than the required minimum but will never be less than expected.

Below, you can see the "ten seconds + polling" histogram. While the theoretical max is equal to 10 seconds + max wait time, most of the time, we're close to 10 seconds + 10% of max wait time.

Summary

Karafka's Delayed Topics is a powerful feature that allows for arbitrary delays when working with messages from specific topics. It can be used in various use cases, such as e-commerce, social media moderation, and finance. By delaying message processing, you can perform additional processing or validation, moderate user-generated content, and introduce a retry mechanism for failed messages.

Building a complex and reliable open-source is neither easy nor fast, so Karafka Pro exists. It has many functionalities, including the one described in this blog post, that can help you build robust and performant event-driven applications.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Copyright © 2023 Closer to Code

Theme by Anders NorenUp ↑