Karafka framework 2.1 announcement

I'm happy to announce that Karafka 2.1 has just been released.

For those who wonder what Karafka is, Karafka is a Ruby and Rails multi-threaded efficient Kafka processing framework.

The Karafka 2.1 release builds upon the foundation set by its predecessor, 2.0, making it a seamless continuation rather than a major rewrite. This means that upgrading from version 2.0 to 2.1 can be done without extensive work or significant modifications to existing codebases. With Karafka 2.1, you can expect improved features and enhancements while maintaining the stability and compatibility you have come to rely on.

Note: There are no extensive upgrade notes; you only need to follow the changelog-based upgrade guidelines.

Noticeable features and improvements

Virtual Offset Management for Virtual Partitions

Virtual Partitions allow you to parallelize the processing of data from a single partition. This can drastically increase throughput when IO operations are involved.

While the default scaling strategy for Kafka consumers is to increase the partition count and the number of consumers, in many cases this will not give you the desired effect. In the end, you cannot scale this way beyond assigning one process per topic partition. That means that without a way to parallelize the work further, IO may become your biggest bottleneck.

Virtual Partitions solve this problem by providing you with the means to further parallelize work by creating "virtual" partitions that will operate independently but will, as a collective processing unit, obey all the Kafka guarantees.

Up until now, when utilizing the Virtual Partitions feature, the offset management was entirely collective. This meant that if any error occurred within any virtual partition during message processing, the entire set of virtual partitions from the starting offset would need to be processed again.

However, Karafka 2.1 introduces the concept of Virtual Offset Management, which enhances the previous offset management mechanism in several ways. When Karafka consumes messages using Virtual Partitions, it leverages Virtual Offset Management, which is built on top of the existing offset management mechanism. This feature allows for more granular and precise handling of offsets within each virtual partition.

While each of the Virtual Partitions operates independently, they are bound together to a single Kafka partition. Karafka transforms the knowledge of messages marked as consumed in each virtual partition into a Kafka offset that can be committed. This process involves considering all the messages marked as consumed across all the virtual partitions. By analyzing these offsets, Karafka determines the highest offset up to which every message has been marked as consumed, allowing for an accurate and reliable offset commit to Kafka. This ensures that the state of consumption is properly synchronized and maintained.

Whenever you mark_as_consumed when using Virtual Partitions, Karafka will ensure that Kafka receives the highest possible continuous offset matching the underlying partition.
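To make the idea of the "highest possible continuous offset" more concrete, here is a minimal sketch in plain Ruby. It is not Karafka's internal implementation: the `committable_offset` helper and the way the batch is split into groups are assumptions made for illustration only. The sketch also assumes that marking a message as consumed implies that all earlier messages of the same virtual partition are done.

```ruby
require 'set'

# Hypothetical helper for illustration only, not part of Karafka's API.
# groups - array of arrays, each holding the offsets given to one virtual partition
# marked - offsets on which mark_as_consumed was called
# Returns the last offset up to which every message is done, or nil if none.
def committable_offset(groups, marked)
  done = Set.new

  groups.each do |offsets|
    # Assumption: marking an offset also covers earlier offsets of the same group
    highest_marked = (offsets & marked).max
    next if highest_marked.nil?

    offsets.each { |offset| done << offset if offset <= highest_marked }
  end

  last = nil
  # Walk the underlying partition in order and stop at the first gap
  groups.flatten.sort.each do |offset|
    break unless done.include?(offset)

    last = offset
  end
  last
end

groups = [[0, 3, 6], [1, 4, 7], [2, 5, 8]]
first_case = committable_offset(groups, [3, 4, 2])
second_case = committable_offset(groups, [6, 7])
```

In the first case offsets 0 to 4 are all done, so 4 is the last continuous offset. In the second case the third group has not marked anything, so offset 2 is a gap and only offsets up to 1 count as done.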

Below you can find a few examples of how Karafka transforms messages marked as consumed in virtual partitions into an appropriate offset that can be committed to Kafka.

With Virtual Offset Management, Karafka keeps track of each virtual partition's offset separately. In case of any error occurring within a specific virtual partition, only that particular partition will be processed again from the point of the error.

This improvement significantly enhances the efficiency and reliability of message processing when working with Virtual Partitions. It minimizes redundant processing by isolating errors to the affected virtual partition, thereby reducing the overall processing time and resource consumption.

Below you can find a visualization of data re-processing from a single topic partition distributed across three virtual partitions. Karafka knows which of the messages were not processed successfully and will re-process only those when retrying.

CurrentAttributes support in ActiveJob

The Karafka ActiveJob adapter has been updated to support the Ruby on Rails CurrentAttributes feature. If you want to use it, you need to put this in your karafka.rb config file (or initializer):

require 'karafka/active_job/current_attributes'
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass')
# or multiple current attributes
Karafka::ActiveJob::CurrentAttributes.persist('YourCurrentAttributesClass', 'AnotherCurrentAttributesClass')

When you set your current attributes and create a background job, it will execute with them set.

class Current < ActiveSupport::CurrentAttributes
  attribute :user_id
end

class Job < ActiveJob::Base
  def perform
    # Double quotes are required for string interpolation in Ruby
    puts "user_id: #{Current.user_id}"
  end
end

Karafka::ActiveJob::CurrentAttributes.persist('Current')
Current.user_id = 1
Job.perform_later # the job will output "user_id: 1"

Karafka handles CurrentAttributes by including them as part of the job serialization process before pushing them to Kafka. These attributes are then deserialized by the ActiveJob consumer and set back in your CurrentAttributes classes before executing the job.
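The serialize-then-restore flow described above can be sketched without Rails or Kafka. Everything in this example is a simplified stand-in: `FakeCurrent`, `serialize_job`, `restore_and_run`, and the payload layout are hypothetical names chosen for illustration and do not reflect the adapter's real internals.

```ruby
require 'json'

# Simplified stand-in for an ActiveSupport::CurrentAttributes class (hypothetical)
class FakeCurrent
  class << self
    attr_accessor :user_id

    def attributes
      { 'user_id' => user_id }
    end

    def reset
      self.user_id = nil
    end
  end
end

# Producer side: attach the current attributes to the serialized job
def serialize_job(job_class_name, current)
  JSON.generate('job_class' => job_class_name, 'current' => current.attributes)
end

# Consumer side: restore the attributes, run the job, then clean up
def restore_and_run(payload, current)
  data = JSON.parse(payload)
  data['current'].each { |name, value| current.public_send("#{name}=", value) }
  yield data['job_class']
ensure
  current.reset
end

FakeCurrent.user_id = 1
payload = serialize_job('Job', FakeCurrent)
FakeCurrent.reset # the consumer process starts with no attributes set

restore_and_run(payload, FakeCurrent) do |job_class|
  puts "#{job_class} runs with user_id: #{FakeCurrent.user_id}"
end
```

Resetting the attributes in `ensure` is a design choice of this sketch: it keeps values from one job from leaking into the next one executed by the same process.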

This approach is based on Sidekiq's approach to persisting current attributes: Sidekiq and Request-Specific Context.

Kubernetes Liveness support

I'm excited to share that Karafka 2.1 has introduced a new feature that will significantly enhance the reliability and stability of your Karafka server processes. With the addition of an out-of-the-box Kubernetes Liveness Listener, Karafka now allows for seamless implementation of liveness checks within your Kubernetes environment. But why is checking process liveness so important?

Liveness checks are critical for ensuring that a process runs as expected and actively consumes data. By enabling the Kubernetes Liveness Listener in Karafka 2.1, you can easily configure liveness checks without extra effort. This means that Kubernetes will automatically monitor the health of your Karafka server process, periodically sending requests to verify its liveness.

However, sometimes a process may appear active, yet it can actually be stuck on user logic. This situation can be challenging to detect without proper instrumentation. While the process might respond to system-level signals, it could be unresponsive within its user logic or certain parts of the codebase. These issues, often called "liveness bugs," can lead to degraded performance, data inconsistencies, or even complete service disruptions.

With the Kubernetes Liveness Listener in Karafka 2.1, you can proactively detect such liveness bugs. By regularly checking the health of your Karafka server process, Kubernetes will be able to identify situations where the process is unresponsive, even if it appears active from a system-level perspective. This enables you to take timely actions, such as restarting the process or triggering alerts for investigation, ensuring the overall stability and reliability of your Karafka applications.

Subscribe the Kubernetes listener within your Ruby code:

require 'karafka/instrumentation/vendors/kubernetes/liveness_listener'

listener = ::Karafka::Instrumentation::Vendors::Kubernetes::LivenessListener.new(
  port: 3000,
  # Make sure polling happens at least once every 5 minutes
  polling_ttl: 300_000,
  # Make sure that consuming does not hang and does not take more than 1 minute
  consuming_ttl: 60_000
)

Karafka.monitor.subscribe(listener)

And add a liveness probe to your Karafka deployment spec:

livenessProbe:
  httpGet:
    path: /
    port: 3000
  initialDelaySeconds: 30
  periodSeconds: 10
  timeoutSeconds: 5
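To illustrate how the two TTL values can be reasoned about, here is a small model in plain Ruby. It is only a sketch under assumptions: `LivenessModel` and its methods are hypothetical and do not mirror the listener's actual code. A process is treated as healthy as long as polling happened recently enough and no consumption has been running longer than allowed.

```ruby
# Hypothetical model of the TTL checks, for illustration only
class LivenessModel
  def initialize(polling_ttl:, consuming_ttl:, started_at_ms:)
    @polling_ttl = polling_ttl
    @consuming_ttl = consuming_ttl
    @last_poll_at = started_at_ms
    @consuming_since = nil
  end

  # Record that the process polled Kafka
  def polled(now_ms)
    @last_poll_at = now_ms
  end

  # Record that user code started processing messages
  def consumption_started(now_ms)
    @consuming_since = now_ms
  end

  # Record that user code finished processing messages
  def consumption_finished
    @consuming_since = nil
  end

  # All times are in milliseconds, like the TTL settings
  def healthy?(now_ms)
    return false if now_ms - @last_poll_at > @polling_ttl
    return false if @consuming_since && now_ms - @consuming_since > @consuming_ttl

    true
  end
end

model = LivenessModel.new(polling_ttl: 300_000, consuming_ttl: 60_000, started_at_ms: 0)
model.consumption_started(1_000)
stuck = !model.healthy?(62_000) # consuming for 61 seconds exceeds the 60 second TTL
```

A probe backed by logic like this fails when user code hangs, even though the process itself is still running and responding to signals.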

Upgrade Notes

No significant changes are needed. Just follow the changelog-based upgrade notes.

Karafka Pro

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support because, due to the sheer number of questions and requests, I need a way to prioritize them.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.


Stay tuned and don't forget to join our Slack channel.

Kafka topics as code – declarative Kafka topics management in Ruby

Kafka topics are a fundamental concept in Apache Kafka. Topics are logical names or labels representing a stream of messages that Kafka clients can produce and consume.

What makes them interesting is the variety of settings that can be applied to them. These settings include, among others:

  • Partition count: The number of partitions that a topic should be split into.
  • Replication factor: The number of replicas that should be maintained for each partition.
  • Retention period: The time that messages should be retained in the topic.
  • Minimum in-sync replicas: The minimum number of replicas that must be in sync before a producer can receive acknowledgment for a write operation.
  • Cleanup policy: The policy used for deleting old messages from the topic.

When looking from a management perspective, topics are similar to database tables. They have names, a set of settings that apply to them, and their constraints. And on top of all of that, they need to be managed.

Declarative topics management

The management approach that I like and support in Karafka is called Declarative Topics Management. It allows for automatic topic creation and configuration based on predefined rules. It is a way to automate the process of managing Kafka topics by defining the desired topic properties and configurations in a declarative manner rather than manually creating and managing topics.

With Declarative Topics Management, you can define a set of rules that specify how topics should be created and configured. These rules can be based on various factors, such as the topic's name, number of partitions, replication factor, retention policy, and more.

Example of declarative repartitioning using the karafka topics migrate command

Keeping Kafka topics configuration as code has several benefits:

  • Version Control: By keeping the topic settings as code, you can track changes over time and easily understand historical changes related to the topics. This is particularly important in a production environment where changes need to be carefully managed.

  • Reproducibility: When you define Kafka topics settings as code, you can easily recreate the same topic with the same settings in multiple environments. This ensures that your development, staging, and production environments are consistent, which can help prevent unexpected issues and bugs.

  • Automation: If you use code to define Kafka topics settings, you can automate the process of creating and updating topics. This can save time and reduce the risk of human error.

  • Collaboration: When you keep Kafka topics settings as code, you can collaborate with other developers on the configuration. You can use tools like Git to manage changes and merge different configurations.

  • Documentation: Code is self-documenting, meaning anyone can look at the configuration and understand what is happening. This can make it easier for new team members to get up to speed and help troubleshoot issues.

In-app topics management

There are many ways to manage Kafka topics declaratively. For complex systems, you may want to look into tools like topicctl.

Often, however, your setup won't be overcomplicated. The primary thing that needs to happen is to ensure that all of your environments and developers use topics with the same configuration.

Partition count is a simple example where a config difference can impact the business logic and create hard-to-track issues. With the default broker settings, topics created automatically by Kafka have one partition. Assume a developer is working on something that requires strong ordering. If their development and test environments operate on only one partition, problems emerging from invalid partition key selection may only occur once the code hits production. Those types of race conditions can be both critical and hard to detect.
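The effect can be shown with a few lines of plain Ruby. The `partition_for` helper below is a hypothetical stand-in for a key-based partitioner; a CRC32 checksum modulo the partition count is an assumption used for illustration, and the partitioner of your client may work differently.

```ruby
require 'zlib'

# Hypothetical key-based partitioner, for illustration only
def partition_for(key, partition_count)
  Zlib.crc32(key) % partition_count
end

keys = %w[user-1 user-2 user-3 user-4]

# With one partition every key lands on partition 0, so a wrong key
# choice cannot break ordering and goes unnoticed
single = keys.map { |key| partition_for(key, 1) }

# With six partitions the same keys may spread over several partitions
many = keys.map { |key| partition_for(key, 6) }

puts "1 partition:  #{single.inspect}"
puts "6 partitions: #{many.inspect}"
```

With a single partition, total ordering holds no matter which key is used. Only with multiple partitions does it matter that related messages share the same key.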

To mitigate risks of that nature, Karafka ships with a Declarative Topics feature. This feature lets you describe your topics' configuration in your routing, ensuring that the set of settings is the same across all the managed environments.

Defining topic configuration

All the configuration for a given topic needs to be defined using the topic scope #config method:

class KarafkaApp < Karafka::App
  routes.draw do
    topic :events do
      config(
        partitions: 6,
        replication_factor: Rails.env.production? ? 3 : 1,
        'retention.ms': 86_400_000, # 1 day in ms
        'cleanup.policy': 'delete'
      )

      consumer EventsConsumer
    end
  end
end

Such a configuration can then be applied by running the following command: bundle exec karafka topics migrate. This command will create the topic if it is missing or repartition it if there are not enough partitions.

Karafka ships with the following topic management commands:

  • karafka topics create - creates topics with appropriate settings.
  • karafka topics delete - deletes all the topics defined in the routes.
  • karafka topics repartition - adds additional partitions to topics with fewer partitions than expected.
  • karafka topics reset - deletes and re-creates all the topics.
  • karafka topics migrate - creates missing topics and repartitions existing ones to match the expected partition count.
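The behavior of migrate, as described in the list above, can be modeled as a comparison between declared and existing topics. The sketch below is plain Ruby and does not call Kafka or Karafka. The `migration_plan` helper and its input format are hypothetical and only illustrate the decision logic.

```ruby
# Hypothetical planner, for illustration only
# declared - hash of topic name => expected partition count (from routing)
# existing - hash of topic name => current partition count (from the cluster)
def migration_plan(declared, existing)
  declared.each_with_object([]) do |(name, partitions), plan|
    current = existing[name]

    if current.nil?
      # Topic is missing, so it needs to be created
      plan << [:create, name, partitions]
    elsif current < partitions
      # Partitions can only be added, never removed
      plan << [:repartition, name, partitions]
    end
  end
end

declared = { 'events' => 6, 'logs' => 2, 'audits' => 1 }
existing = { 'events' => 3, 'logs' => 2 }

plan = migration_plan(declared, existing)
plan.each { |action, name, partitions| puts "#{action} #{name} (#{partitions} partitions)" }
```

Topics that already match are left alone, which is what makes running such a command repeatedly safe.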

The below example illustrates the usage of the migrate command to align the number of partitions and to add one additional topic:

Limitations

This API has a few limitations, which are described in the documentation. There are two primary things you need to keep in mind:

  • Topics management API does not provide any means of concurrency locking when CLI commands are being executed. This means it is up to you to ensure that two topic CLI commands are not running in parallel during the deployments.
  • Karafka currently does not update settings other than the partition count on existing topics. This feature is under development.

Summary

The Karafka declarative topics management API is an excellent solution for low- and medium-complexity systems to ensure consistency of their topics across multiple environments, and it is available out of the box with the framework itself.

Getting started with Kafka and Karafka

If you want to get started with Karafka as fast as possible, then the best idea is to visit our Getting started guides and the example Rails app repository.

Copyright © 2024 Closer to Code
