Page 6 of 165

Delaying Kafka Messages Processing with Karafka: A Deep Dive into Delayed Topics

Kafka is a popular distributed streaming platform that is commonly used for building real-time data pipelines and streaming applications. One of the core features of Kafka is its ability to handle high-volume, real-time data streams and reliably process and distribute them to multiple consumers. However, in some cases, it may be necessary to postpone the processing of certain messages for many reasons.

This is where the Karafka's Delayed Topics feature comes in. This is a powerful mechanism that allows you to delay the consumption of messages for a later time, giving you greater control over your message processing and the ability to optimize your application's performance. In this article, we will take a deep dive into the Karafka's Delayed Topics feature and explore how it can be used to delay message processing in the Kafka-based applications.

The Power of Patience: Reasons to Postpone Your Kafka Messages Processing

Usually, you want to get the data as quickly as possible to ensure that your application can respond in real time. However, there are some situations where postponing messages processing can benefit your system. For example:

  • If you are implementing the retry logic, processing backoff can increase the chances of successful processing and reduce the number of failed messages that are sent to a dead letter queue. In this case, delaying processing can reduce the number of retries and minimize the impact of failed messages on your system.

  • Delaying the processing of data sent to a Kafka dead letter queue can reduce the number of retries and minimize the impact of failed messages on your system. By introducing the processing lag, you can make time for your system to recover and address any issues that may have caused the message to fail. This can be particularly useful if you are experiencing network latency, hardware issues, or other transient errors.

  • Delaying processing can give you more time to analyze the failed messages and take corrective actions. Collecting the failed messages in a dead-letter queue allows you to examine them in more detail to identify patterns or common issues. You can also use this information to improve your system and reduce the likelihood of similar failures in the future.

  • Processing lag can also be helpful in data crawling applications, where the immediately published data may not be immediately available due to HTTP caches. In such cases, it may be beneficial to always wait for the processing of messages for a fixed period to ensure that all the caches have expired and the data is fully available. By delaying the processing, you can avoid working with incomplete or stale data and ensure that your application works with the latest information.

Sleeping on the Job: A Problem with Kafka Delayed Processing

One common way developers delay the processing of Kafka messages is by using a sleep function to pause processing for a fixed period. While this approach is simple and easy to implement, it could be more efficient for a few reasons.

def consume
  messages.each do |message|
    time_diff = Time.now - message.timestamp

    if time_diff < 5.minutes
      sleep(5.minutes - time_diff)
      redo
    end

    puts "Message key: #{message.key}, value: #{message.value}"
  end
end

Using the sleep function can lead to unnecessarily long execution times. This can result in delays that impact the overall performance of your application. On top of that, Kafka may decide, that your process has hanged and may remove you out of the consumer group, creating unnecessery rebalances.

Using the sleep function can also result in idle threads, which can waste system resources and reduce the overall efficiency of your application. Idle threads can also prevent other messages from being processed, which can cause delays and bottlenecks in your system.

Below, you can find an example of how sleeping on partition 0 for ten seconds can impact Karafka's ability to process data from other topics/partitions*. In this scenario, we are producing and consuming messages simultaneously: 1 record per millisecond and using five partitions.

*Karafka can be configured in an optimized way to mitigate this, however, it was not done to illustrate the challenges related to sleeping

As you can see, we're immediately experiencing a lag on all of the partitions, and it is equal to the number of messages we produced per partition (10 000 messages for 10 seconds distributed to 5 partitions => 2000 messages per partition).

Below, you can also see that the age of messages aligns with the lag, effectively delaying all of our processing:

By default, Karafka and other Kafka consumers poll data in batches, process the data and then poll more. Only when all the work is finished for all the topics/partitions is more data polled. This is why you are seeing an increased lag for all the partitions, despite most of them not doing any work.

In general, it is more efficient and optimal to use a dedicated delay mechanism that is designed for Kafka message processing, such as the one built into Karafka. This approach can help you to optimize resource utilization, reduce processing delays, and ensure that your application remains performant and responsive.

Waking Up to a Better Solution for Kafka Delayed Processing

Rather than using sleep to delay consumption, a more effective approach is to pause partitions until the expected time has passed. While this can be more complex than sleeping, it provides a more optimal way of controlling the message processing delays. By pausing the partition, you can ensure that your application does not waste the system resources.

However, implementing the partition pausing correctly can be challenging, requiring considering things like rebalances and revocations. These events can cause the partition to be reassigned to a different consumer, impacting the timing of your processing. Additionally, if you pause for too long, you risk causing the delays that affect the performance of your application.

Fortunately, with Karafka, you can quickly implement this mechanism without worrying about these complexities. The Delayed Topics feature works seamlessly with other Karafka components, such as the Virtual Partitions and Long-Running Jobs, to provide a robust and efficient way of managing the processing delays. By leveraging this built-in mechanism, you can ensure that your application remains performant and responsive.

All that is needed for it to work is the delay_by routing definition for a given topic:

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :orders do
      consumer OrdersConsumer
      # Always delay processing messages from the orders by 10 seconds
      # Note: the code for delaying single partition is slightly different, but the outcome is the same
      delay_by(10_000)
    end
  end
end

When you apply this logic to the code and replace sleep with Karafkas' automatic delay, things look drastically different:

Not only is the lag close to zero for non-paused partitions, but the processing is within the polling frequency (note the logarithmic scale):

Only the topic/partion that you want is being delayed without impacting the rest of the data you are working with! That is because you do not block polling. Instead, after the remaining four partitions work is done, you immediately request more data to work with.

Limitations of pausing and resuming

While this feature provides a valuable way to postpone the messages processing, it does have some limitations to look out for.

One significant limitation is precision. It is not always millisecond-exact. This is because the Delayed Topics feature works by pausing a given partition for a specified amount of time and then unpausing it after that time has elapsed. However, the unpausing happens before the polling happens, so there can be a slight delay between when the partition is unpaused and when the delayed message is processed.

This limitation also means that the age of the messages may be slightly higher than the required minimum but will never be less than expected.

Below, you can see the "ten seconds + polling" histogram. While the theoretical max is equal to 10 seconds + max wait time, most of the time, we're close to 10 seconds + 10% of max wait time.

Summary

Karafka's Delayed Topics is a powerful feature that allows for arbitrary delays when working with messages from specific topics. It can be used in various use cases, such as e-commerce, social media moderation, and finance. By delaying message processing, you can perform additional processing or validation, moderate user-generated content, and introduce a retry mechanism for failed messages.

Building a complex and reliable open-source is neither easy nor fast, so Karafka Pro exists. It has many functionalities, including the one described in this blog post, that can help you build robust and performant event-driven applications.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Kafka topics as code – declarative Kafka topics management in Ruby

Kafka topics are a fundamental concept in Apache Kafka. Topics are logical names or labels representing a stream of messages that Kafka clients can produce and consume.

What makes them interesting is the variety of settings that can be applied to them. These settings, amongst others include:

  • Partition count: The number of partitions that a topic should be split into.
  • Replication factor: The number of replicas that should be maintained for each partition.
  • Retention period: The time that messages should be retained in the topic.
  • Minimum and maximum in-sync replicas: The minimum number of replicas that must be in sync before a producer can receive acknowledgment for a write operation.
  • Cleanup policy: The policy used for deleting old messages from the topic.

When looking from a management perspective, topics are similar to database tables. They have names, a set of settings that apply to them, and their constraints. And on top of all of that, they need to be managed.

Declarative topics management

The management approach that I like and support in Karafka is called Declarative Topics Management. It allows for automatic topic creation and configuration based on predefined rules. It is a way to automate the process of managing Kafka topics by defining the desired topic properties and configurations in a declarative manner rather than manually creating and managing topics.

With Declarative Topics Management, you can define a set of rules that specify how topics should be created and configured. These rules can be based on various factors, such as the topic's name, number of partitions, replication factor, retention policy, and more.

Example of a declarative repartitioning using karafka topics migrate command

Keeping Kafka topics configuration as code has several benefits:

  • Version Control: By keeping the topic settings as code, you can track changes over time and easily understand historical changes related to the topics. This is particularly important in a production environment where changes need to be carefully managed.

  • Reproducibility: When you define Kafka topics settings as code, you can easily recreate the same topic with the same settings in multiple environments. This ensures that your development, staging, and production environments are consistent, which can help prevent unexpected issues and bugs.

  • Automation: If you use code to define Kafka topics settings, you can automate the process of creating and updating topics. This can save time and reduce the risk of human error.

  • Collaboration: When you keep Kafka topics settings as code, you can collaborate with other developers on the configuration. You can use tools like Git to manage changes and merge different configurations.

  • Documentation: Code is self-documenting, meaning anyone can look at the configuration and understand what is happening. This can make it easier for new team members to get up to speed and help troubleshoot issues.

In-app topics management

There are many ways to manage declaratively Kafka topics. For complex systems, you may want to look into tools like topicctl.

Often, however, your setup won't be overcomplicated. The primary thing that needs to happen is to ensure that all of your environments and developers use topics with the same configuration.

Partition count is a simple example where a config difference can impact the business logic and create hard-to-track issues. By default, topics created automatically by Kafka always have one partition. Assume a developer is working on something that requires strong ordering. If his development and test environments operate on only one partition, problems emerging from invalid partition key selection may only occur once the code hits production. Those types of race conditions can be both critical and hard to detect.

To mitigate risks of that nature, Karafka ships with a Declarative Topics feature. This feature lets you describe your topics' configuration in your routing, ensuring that the set of settings is the same across all the managed environments.

Defining topic configuration

All the configuration for a given topic needs to be defined using the topic scope #config method:

class KarafkaApp < Karafka::App
  routes.draw do
    topic :events do
      config(
        partitions: 6,
        replication_factor: Rails.env.production? ? 3 : 1,
        'retention.ms': 86_400_000 # 1 day in ms,
        'cleanup.policy': 'delete'
      )

      consumer EventsConsumer
    end
  end
end

Such a configuration can be then applied by running the following command: bundle exec karafka topics migrate. This command will create the topic if missing or repartition in case there are not enough partitions.

Karafka ships with following topics management related commands:

  • karafka topics create - creates topics with appropriate settings.
  • karafka topics delete - deletes all the topics defined in the routes.
  • karafka topics repartition - adds additional partitions to topics with fewer partitions than expected.
  • karafka topics reset - deletes and re-creates all the topics.
  • karafka topics migrate - creates missing topics and repartitions existing to match expected partitions count.

The below example illustrates the usage of the migrate command to align the number of partitions and to add one additional topic:

Limitations

This API has few limitations about which you can read here. There are two primary things you need to keep in mind:

  • Topics management API does not provide any means of concurrency locking when CLI commands are being executed. This means it is up to you to ensure that two topic CLI commands are not running in parallel during the deployments.
  • Karafka currently does not update settings different than the partition count on existing topics. This feature is under development.

Summary

Karafka declarative topics management API is an excellent solution for low and medium-complexity systems to ensure consistency of their topics across multiple environments, and that is available out-of-the-box with the framework itself.

Getting started with Kafka and Karafka

If you want to get started with Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example Rails app repository.

Copyright © 2024 Closer to Code

Theme by Anders NorenUp ↑