The Art of Forking: Unlocking Scalability in Ruby

Introduction

The journey towards efficient parallelization in library development has often been based on threads. As Karafka celebrates its eighth anniversary, it's become clear that while threads have served us well for many tasks, there's room to explore further. That's why I've decided to introduce forking capabilities into Karafka, aiming to offer another dimension of parallelization to its users. This move isn't about replacing threads but about providing options to cover a broader spectrum of use cases than before.

For those who wonder what Karafka is: Karafka is a multi-threaded, efficient Kafka processing framework for Ruby and Rails, designed for building scalable message processing applications.

Objectives and Scope

This article isn't a deep dive into every aspect of Ruby's parallelism and concurrency. Instead, it's focused on illustrating how forking, as a specific capability, can be woven into the fabric of Ruby applications, with Karafka as our case study. The goal is to outline what it takes to integrate forking effectively - ensuring it's stable, robust, and ready for production environments.

While forking offers potent possibilities for the performance and scalability of Ruby applications, it comes with its challenges. This topic could easily fill a whole chapter in a book about Ruby; hence, I picked only the most relevant aspects for this article to paint a general picture of the subject.

Acknowledgements

A special thank you goes out to KJ Tsanaktsidis, a member of the Ruby core team. His deep knowledge, insights, and willingness to help have been invaluable as I navigated the complexities of adding forking capabilities to Karafka. His help is living proof of the spirit of MINASWAN.

Concurrency and Parallelism in Ruby

Before we dive deeper into the Karafka Swarm details and codebase, here is a short introduction to Ruby concurrency for readers not deeply involved in these matters.

Ruby's model for handling parallelism and concurrency is robust, offering developers multiple ways to execute tasks simultaneously or concurrently. It can, however, also be challenging. This flexibility is critical to optimizing application performance and efficiency. Among the tools Ruby provides are processes, threads, and fibers, each with distinct characteristics and use cases. Additionally, Ruby has introduced more advanced features like auto-fibers and a fiber scheduler to enhance concurrency management further.

Note: Ractors were skipped as they are not entirely usable at the moment.

Processes

Processes in Ruby are separate instances of running programs, each with its own allocated memory space. This isolation guarantees that processes do not interfere with each other, making them a reliable choice for parallel tasks. However, this comes at a higher cost of resource usage than threads and fibers.

# Fork a new process
child_pid = fork do
  # This block is executed in the child process
  puts "Child Process: PID=#{Process.pid}"
  # Child process does some work
  sleep 1 # Simulate some work by sleeping for 1 second
end

# This code is executed only in the parent process
puts "Parent Process: PID=#{Process.pid}, Child PID=#{child_pid}"

# The parent process waits for the child process to exit
Process.wait(child_pid)

puts "Child process #{child_pid} has finished."

Threads

Threads offer a way to perform concurrent operations within the same application instance, sharing the same memory space. While this makes data exchange between threads straightforward, it also introduces the need for careful synchronization to prevent issues like race conditions. Threads in Ruby are subject to the Global Interpreter Lock (GIL), which we'll discuss next.

# An array to hold the threads
threads = []

# Create 5 threads
5.times do |i|
  threads << Thread.new do
    sleep_time = rand(1..3)
    puts "Thread #{i+1}: Sleeping for #{sleep_time} seconds..."
    sleep(sleep_time)
    puts "Thread #{i+1}: Woke up!"
  end
end

# Wait for all threads to complete
threads.each(&:join)

puts "All threads have completed."

Fibers, Auto-Fibers, and Fiber Scheduler

Fibers are lightweight programming constructs that allow for more granular control over program execution. They enable cooperative multitasking within a single thread, where the developer manually controls when a fiber is paused or resumed. This provides a flexible way to handle tasks that can be interrupted or need to yield control frequently.

# Define a fiber to print numbers
numbers_fiber = Fiber.new do
  (1..3).each do |number|
    puts "Number: #{number}"
    Fiber.yield
  end
end

# Define a fiber to print letters
letters_fiber = Fiber.new do
  ('A'..'C').each do |letter|
    puts "Letter: #{letter}"
    Fiber.yield
  end
end

# Alternate between the two fibers, resuming only those still alive
# to avoid a FiberError from resuming a finished fiber
while numbers_fiber.alive? || letters_fiber.alive?
  numbers_fiber.resume if numbers_fiber.alive?
  letters_fiber.resume if letters_fiber.alive?
end

puts "Both fibers have finished their execution."

Ruby has introduced auto-fibers and the fiber scheduler, building on the concept of fibers. Auto-fibers automate the management of fibers, enabling asynchronous execution patterns that are simpler to implement and reason about. This is particularly useful for non-blocking I/O operations, where the Ruby runtime can automatically switch contexts instead of blocking the current thread, improving overall application throughput.

The fiber scheduler complements auto-fibers by providing a hook into Ruby's event loop, allowing developers to define custom scheduling logic. This is a powerful feature for those who need to integrate with external event loops or optimize their concurrency model for specific performance characteristics. Together, auto-fibers and the fiber scheduler significantly enhance Ruby's concurrency toolkit, offering developers sophisticated mechanisms for writing efficient, non-blocking code.
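
As a taste of what this enables, here is a minimal sketch using the async gem - one of the available fiber scheduler implementations - assuming it is installed. Both sleeps are suspended by the scheduler rather than blocking the thread, so the block completes in about one second instead of two.

require 'async'

Async do |task|
  # Each task runs in its own fiber; the scheduler suspends a fiber
  # during blocking operations like sleep instead of blocking the thread
  task.async do
    sleep(1)
    puts 'First task done'
  end

  task.async do
    sleep(1)
    puts 'Second task done'
  end
end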

GIL (Global Interpreter Lock)

The GIL is a mechanism in Ruby designed to prevent multiple threads from executing Ruby code simultaneously, thereby protecting against concurrent access to Ruby's internal structures. While it simplifies thread safety, the GIL can limit the effectiveness of multi-threaded programs on multi-core processors, particularly for CPU-bound tasks. However, for I/O-bound tasks, Ruby threads can still offer significant performance improvements.

Below, you can find a simple example that attempts to perform CPU-bound operations using threads. The GIL ensures that only one thread can execute Ruby code at a time, which means CPU-bound operations won't see a significant performance improvement when run in parallel threads, unlike I/O-bound operations.

require 'benchmark'

def fib(n)
  n <= 2 ? 1 : fib(n - 1) + fib(n - 2)
end

# Measure the execution time of two threads performing CPU-bound tasks
execution_time = Benchmark.measure do
  thread1 = Thread.new { fib(35) }
  thread2 = Thread.new { fib(35) }

  thread1.join
  thread2.join
end

puts "Execution time with GIL: #{execution_time.real} seconds"

Multi-Process Communication API Selection

To support a swarm of processes, one must figure out how they can be controlled and managed. Managing processes and ensuring their smooth operation in the Linux ecosystem is fundamental to system administration and application development. However, traditional process management relies heavily on process identifiers (PIDs) and has limitations and challenges. One such challenge is PID reuse, where after a process terminates, its PID can be reassigned to a new process. This behavior can lead to issues where actions intended for one process mistakenly affect another. To address these concerns and enhance process management capabilities, Linux introduced the concept of pidfd.

What is pidfd?

pidfd stands for PID file descriptor: a mechanism introduced in Linux 5.3 that provides a more stable and reliable way to reference and manage processes. Unlike traditional PIDs, which the system can reuse, a pidfd is a unique file descriptor for a specific process instance. This means that as long as you hold the pidfd, it uniquely identifies the process, eliminating the risks associated with PID reuse.

The introduction of pidfd was motivated by the need to safely manage long-lived processes and perform operations without the risk of affecting unintended processes due to PID reuse. This is especially important in environments with high process churn, where PIDs can quickly be recycled.

The Problem with PIDs

Before pidfd, processes were managed and signaled using their PIDs. However, due to the finite and recyclable nature of PIDs, two major issues arose:

  1. PID Reuse: Once a process exits, its PID can be reassigned to a new process. A program storing PIDs for later use could mistakenly signal a completely unrelated process.

  2. Race Conditions: Between checking a PID and acting on it (like sending a signal), the original process could terminate and its PID be reassigned, leading to unintended consequences.

These issues necessitated a more stable reference to processes, leading to the development of pidfd.

Below is a theoretical case demonstrating how a Ruby script uses signals to communicate with processes identified by PIDs. This example highlights the risks associated with PID reuse and race conditions, where a signal intended for a specific process might inadvertently affect another process if the original PID has been reassigned.

# Fork a new process
child_pid = fork do
  # Trap SIGUSR1 so its default action (termination) does not kill the child
  Signal.trap('SIGUSR1') {}

  # Child process will sleep for 5 seconds
  sleep 5
end

# Parent process waits for a moment to ensure the child process starts
sleep 1

# Send a "SIGUSR1" signal to the child process
puts "Sending SIGUSR1 to child process #{child_pid}"
Process.kill("SIGUSR1", child_pid)

# Wait for the child process to exit
Process.wait(child_pid)

# Now let's simulate PID reuse by forking another process that might reuse the same PID
another_child_pid = fork do
  # This process does something else
  sleep 5
end

# Assuming the original child PID got reused (simulating PID reuse)
# Here we try to signal the original child process again, not knowing it's a different process now
puts "Attempting to send SIGUSR1 to original child PID (now potentially reused): #{child_pid}"
begin
  Process.kill("SIGUSR1", child_pid)
rescue Errno::ESRCH
  puts "Process with PID #{child_pid} does not exist anymore."
end

Ruby and pidfd

Ruby's process management capabilities, while robust, traditionally revolve around PIDs. Ruby allows sending signals to processes using their PIDs but does not provide built-in APIs for pidfd operations. This gap means that Ruby applications cannot directly leverage the benefits of pidfd without additional mechanisms.

I implemented a pidfd layer using Ruby's Foreign Function Interface (FFI) to bridge this gap. FFI is a way to call C functions and use C data structures from Ruby, enabling direct interaction with the lower-level system APIs that support pidfd. This implementation was an exciting challenge, as I don't often need to dive deep into Linux's signal tables and syscalls.
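
To give a feel for what such a layer involves, below is a rough, hypothetical sketch of binding the two relevant syscalls with the ffi gem. This is not Karafka's actual implementation, and the syscall numbers are specific to x86_64 Linux.

require 'ffi'

module PidfdSyscalls
  extend FFI::Library
  ffi_lib FFI::Library::LIBC

  # long syscall(long number, ...) - a variadic binding
  attach_function :syscall, [:long, :varargs], :long

  PIDFD_OPEN = 434        # __NR_pidfd_open on x86_64
  PIDFD_SEND_SIGNAL = 424 # __NR_pidfd_send_signal on x86_64

  # Returns a file descriptor uniquely referencing this process instance
  def self.pidfd_open(pid)
    syscall(PIDFD_OPEN, :int, pid, :uint, 0)
  end

  # Signals the process behind the pidfd, immune to PID reuse
  def self.pidfd_send_signal(pidfd, signal)
    syscall(PIDFD_SEND_SIGNAL, :int, pidfd, :int, signal, :pointer, nil, :uint, 0)
  end
end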

The full implementation will receive its own blog post; for now, all you need to know is that Karafka ships with a relatively simple API composed of only three methods and an initializer:

pid = fork { sleep }

# Fetch from Linux the pidfd of the child (can be any other process)
pidfd = Karafka::Swarm::Pidfd.new(pid)

# Check if given process is alive
pidfd.alive? #=> true
# Kill it
pidfd.signal('TERM')
# Collect it so there is no zombie process
pidfd.cleanup
# Check again and see that it is dead
pidfd.alive? #=> false

Karafka Swarm: A Perfect Match for Scalability

In my experience with Karafka, it's clear that while most user workloads are I/O-bound, involving operations like DB storage or cache updates, a significant portion - about 20% - are CPU-intensive. These tasks, involving heavy deserialization and computations, didn't fit Karafka's multi-threaded model, which is better suited for I/O-bound work. Users often had to run multiple independent processes for CPU-heavy workloads, leading to unnecessary memory overhead. Recognizing this inefficiency, I decided to do something about it.

Swarm Architecture

When starting a project like this one, it is good to have an initial idea of what you want to achieve. Karafka is a critical component of many businesses, so the solution had to be robust and stable. Here are a few of the things that need to be taken into consideration when deciding on the architecture of such a solution:

  • Supervision Model
  • Zombie Processes
  • Orphaned Processes
  • Shutdown Procedure
  • Process Communication
  • Memory Management
  • Load Balancing
  • Error Handling
  • Signal Handling
  • Resources Cleanup

I've decided to pick an architecture that centers around a supervisor-worker model. At its core, the supervisor acts as the central command, orchestrating the execution of child node workers. These workers are responsible for parallel processing messages from Kafka topics, each operating in its own process space.

This design allows for a scalable and fault-tolerant system where the supervisor monitors and manages worker processes, ensuring that they perform optimally and restarting them as necessary. By isolating work to individual processes, Swarm mitigates the risk of a single point of failure, enhancing the reliability of the application.
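
To illustrate the general shape of this model, here is a toy sketch - not Karafka's actual code - of a supervisor that forks a fixed set of workers and replaces any that die:

WORKERS = 2

def start_worker
  fork do
    # A real worker would run its Kafka consumption loop here
    loop { sleep(1) }
  end
end

pids = Array.new(WORKERS) { start_worker }

loop do
  # Block until any child exits, then replace it with a fresh fork
  dead_pid = Process.wait
  pids.delete(dead_pid)
  pids << start_worker
end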

Challenges with Forking and librdkafka

Karafka relies under the hood on librdkafka - a C library implementation of the Apache Kafka protocol. A significant challenge in implementing the Swarm architecture stems from an inherent limitation of librdkafka: it is not fork-safe. This limitation necessitates careful management of how and when processes are forked and how librdkafka is initialized and used within these processes.

To navigate these challenges, I decided to ensure that librdkafka instances are never present before forking. This involved initializing librdkafka only within the child processes after a fork, ensuring that no librdkafka objects or handles are shared across process boundaries. This approach maintains the integrity of the message processing pipeline, ensuring data consistency and system reliability.

Below, you can see example code and how it behaves when an rdkafka-ruby (the C binding layer that I also maintain) producer is used from within a fork:

producer = Rdkafka::Config.new('bootstrap.servers': 'localhost:9092').producer
producer.produce(topic: 'a', payload: 'b')

fork do
  producer.produce(topic: 'a', payload: 'b')
end

The Ruby VM will crash upon usage, or sometimes even the mere presence, of an initialized librdkafka entity in a fork.
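
In contrast, creating the producer only after the fork, entirely within the child, is safe, since no librdkafka state ever crosses the process boundary. A minimal sketch:

fork do
  # Safe: librdkafka is initialized post-fork, so the child owns all of its state
  producer = Rdkafka::Config.new('bootstrap.servers': 'localhost:9092').producer
  producer.produce(topic: 'a', payload: 'b').wait
  producer.close
end

Process.wait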

Forking Strategies

That is why, when I initially considered adding swarm capabilities to Karafka, I envisioned a relatively simple approach: forking nodes during supervisor startup. This would save me from any resource management risk and allow me to use librdkafka from the supervisor post-fork.

However, I quickly realized this approach would not work in production in the case of child-only incidents like VM crashes or critical errors. I had to develop a strategy that would allow me to control and manage processes for the entire time Karafka runs.

Supervision and Memory Leak Control

One of the challenges in managing a multi-process system is controlling memory leaks. While Karafka does not have known memory leaks, it can also integrate with applications that may have their own issues. Karafka's supervisor monitors the memory usage of child processes to mitigate potential memory leaks, taking corrective action when usage patterns indicate a possible leak.

Here's the simplified code Karafka uses to monitor child memory and report issues to the supervisor. It compares the process RSS with the configured maximum, and if usage goes beyond it, it notifies the supervisor.

class LivenessListener
  # Simplified: in Karafka, this limit comes from the framework configuration
  def initialize(memory_limit)
    # Max allowed RSS in MB before the node reports as unhealthy
    @memory_limit = memory_limit
  end

  # This method is triggered every 5 seconds in each node
  def on_statistics_emitted(_event)
    # Skip if we are not a forked node
    return unless node

    # Fetch current process health status
    current_status = status

    # Report
    current_status.positive? ? node.unhealthy(current_status) : node.healthy
  end

  private

  def status
    return 3 if rss_mb > @memory_limit

    0 # This status means all good
  end

  def rss_mb
    kb_rss = 0

    IO.readlines("/proc/#{node.pid}/status").each do |line|
      next unless line.start_with?('VmRSS:')

      kb_rss = line.split[1].to_i

      break
    end

    (kb_rss / 1_024.to_f).round
  end
end

Process Management

Karafka's Swarm architecture supervisor plays a critical role in managing child processes. It is responsible for monitoring the health of these processes, restarting them as needed, and ensuring that they are performing their tasks efficiently. The supervisor uses signals to communicate with child processes, managing their lifecycle from startup to shutdown.

Health checks are conducted periodically to ensure that each child process is responsive and that messages are processed as expected. These checks are essential for maintaining the system's overall health, allowing the supervisor to take preemptive action to restart or replace workers that are not functioning correctly.

Each node is responsible for reporting its health periodically and indicating if its behavior deviates from the expected one configured by the developer.

The supervisor process uses signals to send control commands to child nodes, which allowed me to keep a unified control API whether running in swarm mode or not. The child nodes use pipes to report their health status to the supervisor. This design choice leverages the strengths of both communication mechanisms appropriately for their respective tasks.

Why Pipes for Health Reporting?

  • Reliability and Order: Data transmitted through pipes is read in the order it was sent, ensuring accurate and consistent health monitoring.
  • Buffering: Pipes can buffer data, allowing child nodes to report health even when the supervisor is temporarily unable to read, preventing data loss and keeping operations non-blocking.
  • Ease of Use: Ruby's abstraction over pipes simplifies their integration and use, allowing for straightforward data transmission without delving into complex IPC mechanisms.
  • Isolation and Safety: Separating control commands (via signals) from health data (via pipes) enhances system robustness by preventing interference between control and data flows.

Working with pipes has many benefits:

  • Pipes support structured and reliable communication, essential for detailed health reporting.
  • The buffering and non-blocking nature of pipes contribute to efficient system performance.
  • The ordered transmission ensures that health data integrity is maintained, aiding in precise system monitoring and decision-making.

This combination of signals for control and pipes for health reporting aligns with Karafka's design philosophy, ensuring efficient, reliable, and clear communication between the supervisor and child nodes.

Below, you can find an example of parent-child pipe-based communication.

# Create a pipe
reader, writer = IO.pipe

if fork
  # Parent process
  writer.close # Close the writing end in the parent, as we'll only read

  puts "Parent is waiting for a message from the child..."
  message_from_child = reader.read
  puts "Parent received a message: #{message_from_child}"

  Process.wait # Wait for the child process to exit
else
  # Child process
  reader.close # Close the reading end in the child, as we'll only write

  sleep 1 # Simulate some work
  puts "Child sending a message to the parent..."
  writer.puts "Hello from your child process!"

  writer.close # Close the writer to signal we're done sending
end

Since the supervisor receives reports, all it has to do is iterate over all the nodes, check them, and take appropriate actions if needed. While the whole code can be found in the Karafka repository, here's the most important part that I find rather self-descriptive:

def control
  @nodes.each do |node|
    if node.alive?
      next if terminate_if_hanging(node)
      next if stop_if_not_healthy(node)
      next if stop_if_not_responding(node)
    else
      next if cleanup_one(node)
      next if restart_after_timeout(node)
    end
  end
end

This code is executed at regular intervals, as well as whenever there is a state change in any of the child nodes. It ensures that whatever happens to any of the child nodes does not go unnoticed.

Gluing Things Together

In this article, I aimed to avoid delving into every nitty-gritty detail or pasting all the code snippets here. Instead, I focused on providing a high-level overview since the complete implementation details are readily available on GitHub for those interested in diving deeper. After integrating and refining all the necessary functionalities, I emerged with the following set of components:

  • Karafka::Swarm::Supervisor - Acts as the orchestrator that initiates and monitors forks through a monitoring system. It's responsible for the orderly shutdown of all processes, including itself. In the event of any node failure, it ensures the node is restarted.

  • Karafka::Swarm::Pidfd - This component encapsulates the Linux pidfd functionality within a Ruby wrapper, facilitating communication within the Swarm. It offers a more stable and resource-efficient alternative to traditional PID and PPID management combined with signal-based communication.

  • Karafka::Swarm::Node - Represents an individual forked process within the swarm, providing an API for managing forks and checking their status. While it serves slightly different purposes in the supervisor and the forked processes, its primary functions include facilitating information exchange with the supervisor and ensuring processes do not turn into zombies or become orphaned.

  • Karafka::Swarm::Manager - Similar to the thread manager but dedicated to managing processing nodes within the swarm. It oversees the initialization of nodes and monitors their behavior. If a node behaves unexpectedly, the manager attempts a graceful restart, escalating to forceful termination if necessary. Designed to operate within the supervisor.

  • Karafka::Swarm::LivenessListener - A monitoring component that periodically signals to the supervisor, ensuring it's aware that the system is responsive and not hanging. It also vigilantly checks if a node has become an orphan, terminating the process if necessary to maintain system integrity.

Overall, I think that the implementation I ended up with is quite compact and elegant, providing all the necessary components for robust and stable operations.

Future Directions

As the one behind Karafka, I often say that the framework is only about 30% complete in terms of my vision for its capabilities. I envision a vast landscape of features and improvements for this ecosystem, especially from a processing and data manipulation standpoint. Two key focus areas are the integration of Ractors and the more innovative use of auto-fibers, each poised to enhance how Karafka handles data streams.

Summary

Ruby, while not the fastest language, offers a rich set of concurrency primitives that, when utilized effectively, can achieve impressive performance for both CPU and I/O-intensive tasks.

The ongoing development of my framework, alongside Ruby's evolving concurrency model, presents a promising landscape for developers aiming to achieve peak application performance. As the Ruby core team pushes the boundaries of what's possible with Ruby, I hope Karafka will be able to incorporate these advancements for the benefit of its users.

Karafka Framework 2.3 + Web UI 0.8 Release Announcement

Introduction

I'm happy to announce that Karafka 2.3 and its Web UI 0.8 have just been released.

For those who wonder what Karafka is: Karafka is a multi-threaded, efficient Kafka processing framework for Ruby and Rails.

The Karafka 2.3 release builds upon the foundation set by its predecessor, 2.2, making it a seamless continuation rather than a major rewrite. This means upgrading from version 2.2 to 2.3 can be done without extensive work or significant modifications to the existing codebases.

These releases introduce many new features. Please note that I have described only the most significant changes and improvements to the ecosystem below.

Note: There are no extensive upgrade notes; you only need to follow those guidelines.

Karafka Noticeable Features And Improvements

Karafka 2.3 marks a significant step in the framework's evolution, driven by the community's valuable input and the collective need for a robust Kafka processing solution.

With each bigger update, the OSS and Pro versions of Karafka receive refinements, ensuring every user benefits from a more streamlined, robust, and adaptable framework.

Transactions And Exactly-Once Semantics

Transactions in Karafka provide a mechanism to ensure that a sequence of actions is treated as a single atomic unit. In the context of distributed systems and message processing, a transaction ensures that a series of produce and consume operations are either all successfully executed or none are, maintaining data integrity even in the face of system failures or crashes.

Starting with this release, Karafka supports Kafka's Exactly-Once Semantics, the gold standard for message processing systems. It ensures that each message is processed exactly once, eliminating data duplication or loss risks. In simpler terms, despite failures, retries, or other anomalies, each message will affect the system state only once.

class EventsConsumer < ApplicationConsumer
  def consume
    # Make sure that messages are produced only together with marking as consumed
    transaction do
      sum = 0

      messages.each do |message|
        sum += message.payload[:count]
      end

      produce_async(topic: :sums, payload: sum.to_s)

      mark_as_consumed(messages.last)
    end
  end
end

Any exception or error raised within a transaction block will automatically result in the transaction being aborted. This ensures that if there are unexpected behaviors or issues during message production or processing, the entire batch of messages within that transaction won't be committed, preserving data consistency.
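
For illustration, here is a sketch of that abort behavior; the raised error is hypothetical:

class EventsConsumer < ApplicationConsumer
  def consume
    transaction do
      produce_async(topic: :sums, payload: '42')

      # Raising here aborts the transaction: the dispatch above is
      # rolled back and the offset position is not committed
      raise StandardError, 'processing failed'
    end
  end
end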

Dead Letter Queue Independent Recovery Mode

In standard operations, Karafka, while processing messages, does not make assumptions about the processing strategy employed by the user. Whether it’s individual message processing or batch operations, Karafka remains agnostic. This neutrality in the processing strategy becomes particularly relevant during the DLQ recovery phases.

Under normal circumstances, Karafka treats a batch of messages as a collective unit during DLQ recovery. This collective approach might not align with specific use cases with independent message processing. In such cases, the failure of one message does not necessarily imply a problem with the entire batch.

Karafka's DLQ independent flag introduces a nuanced approach to DLQ recovery, treating each message as an entity with its own error counter.

To enable it, add independent: true to your DLQ topic definition:

class KarafkaApp < Karafka::App
  routes.draw do
    topic :orders_states do
      consumer OrdersStatesConsumer

      dead_letter_queue(
        topic: 'dead_messages',
        max_retries: 3,
        independent: true
      )
    end
  end
end

The following diagrams compare DLQ flows in Karafka: the first without the independent flag and the second with it enabled, demonstrating the operational differences between these two settings.

*The diagram shows DLQ retry behavior without the independent flag: each error in a batch adds to the error counter until the DLQ dispatch takes place on the last erroneous message.

*The diagram shows DLQ retry behavior with the independent flag active: the error counter resets after each message is successfully processed, avoiding DLQ dispatch if all messages recover.

Connection Multiplexing And Dynamic Scaling

Karafka 2.3 introduces enhanced multiplexing, allowing multiple connections per topic for parallel processing and increased throughput. Additionally, it introduces Dynamic Multiplexing, which optimizes resource utilization by automatically adjusting the number of connections based on partition assignments. This ensures balanced and efficient resource distribution, enhances system scalability, and maintains high performance despite fluctuating data volumes.

With this mode enabled, Karafka can automatically start and stop additional Kafka connections during runtime based on the topics and partition distribution.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    # Establish at most three connections and shut down two if not needed. Start with 2.
    subscription_group 'events' do
      multiplexing(min: 1, max: 3, boot: 2)

      topic :events do
        consumer EventsConsumer
      end
    end
  end
end

Multiplexing increases throughput and significantly enhances processing capabilities in scenarios with multi-partition lags. When a single process subscribes to multiple partitions, it can swiftly address lags in any of them, ensuring more consistent performance across your system. This advantage becomes particularly prominent in IO-intensive workloads where efficient data handling and processing are crucial.

*This example illustrates the performance difference for IO-intensive work, where the IO cost of processing a single message is 1ms and the total lag is 500 000 messages across five partitions.

Access Control Lists (ACLs) Admin Support

Apache Kafka ACLs (Access Control Lists) provide a robust mechanism to control permissions and access rights for Kafka resources. They are crucial for ensuring data security, managing consumer and producer interactions, and maintaining overall cluster integrity. Karafka extends these capabilities with a simplified, Ruby-friendly API:

acl = Karafka::Admin::Acl.new(
  resource_type: :topic,
  resource_name: 'my_topic',
  resource_pattern_type: :literal,
  principal: 'user:Bob',
  host: '*',
  operation: :write,
  permission_type: :allow
)

Karafka::Admin::Acl.create(acl)

Periodic Jobs

Periodic Jobs were designed to allow consumers to perform operations at regular intervals, even without new data. This capability is particularly useful for applications that require consistent action, such as window-based operations or maintaining system readiness.

class KarafkaApp < Karafka::App
  setup do |config|
    # ...
  end

  routes.draw do
    topic :events do
      consumer Consumer

      # Tick at most once every five seconds
      periodic true
    end
  end
end

class Consumer < Karafka::BaseConsumer
  def consume; end

  def tick
    puts "Look, mom, I'm periodic!"
  end
end

Karafka's Periodic Jobs handle tasks based on timing intervals and let you schedule specific actions, like setting alarms or reminders. This means you can set up tasks that run regularly, just like cron jobs. This feature is great for organizing your work and ensuring everything runs smoothly and on time. In a later article, I'll talk more about how to use this for setting up scheduled tasks.

Offset Metadata Storage

Offset Metadata Storage allows adding metadata to offsets. At its core, it enables developers to attach custom metadata to message offsets when committed to the Kafka broker. This metadata, essentially a form of annotation or additional data, can be retrieved and used for many purposes, enhancing message processing systems' capability, traceability, and intelligence.

In traditional Kafka consumption, a message's offset indicates its position within a partition. While this is crucial for ensuring messages are processed in order, and no message is missed or duplicated, the standard offset mechanism doesn't provide context or additional information about the processing state or the nature of the message. Offset Metadata Storage fills this gap by allowing developers to store custom, context-rich data alongside these offsets.

All you need to do is pass a second argument when you mark a message as consumed:

def consume
  messages.each do |message|
    EventsStore.call(message)
    @aggregator.mark(message)

    mark_as_consumed(
      message,
      # Make sure that this argument is a string and, in case of JSON, do not
      # forget to define a custom deserializer
      {
        process_id: Process.pid,
        aggregated_state: @aggregator.to_h
      }.to_json
    )
  end
end

And then, to retrieve the offset metadata, you can use the #offset_metadata method within your consumer. This method fetches the offset metadata and deserializes it using the configured deserializer.

def consume
  # Use offset metadata only on the first run of the consumer
  unless @recovered
    @recovered = true

    metadata = offset_metadata

    # Do nothing if `#offset_metadata` was false. It means we have lost the assignment
    return unless metadata

    # Use the metadata from the previous process to recover the internal state
    @aggregator.recover(
      metadata.fetch('aggregated_state')
    )
  end

  # Rest of the processing here...
end

Web UI Noticeable Features And Improvements

The latest Web UI 0.8 release enhances OSS and the paid Pro version. My goal was always clear: provide a strong OSS foundation that most users can rely on, allowing their projects to grow naturally before considering the jump to Pro. This approach ensures that everyone, regardless of their version, has access to a robust and efficient toolset from the get-go.

This update brings more than just Pro improvements; it significantly enriches the OSS experience. Thanks to the support from our Pro users, features that were once exclusive to Pro have transitioned to OSS. This move elevates the quality of the developer experience within the OSS ecosystem and reinforces the commitment to a user-friendly and inclusive Kafka ecosystem for all.

Full Errors Info Promoted to OSS

Full Error Backtrace visibility is now available in OSS.

Various Graphs Promoted to OSS

Graphs such as Concurrency, Batches, and Utilization have been promoted to OSS.

Time Ranges Promoted to OSS

Data Exploration Time Ranges are now fully available in OSS.

Data Sorting

Data Sorting in Karafka Web UI is super helpful because it lets you quickly organize and find the info you need in tables full of different metrics. With all the data Karafka handles, sorting makes it much easier to spot the important stuff.

Pending Jobs Visibility

Pending Jobs visibility in Karafka Web UI is a feature that is particularly useful when working with the advanced scheduling API that fine-tunes how jobs are managed within a process. This API can hold back job execution when needed, like when you want to balance your system's resources or prioritize tasks from specific topics.

Now, the Web UI lets you see all jobs waiting to run. This means you can easily track what's queued up, helping you understand how your jobs are organized and when they will likely be executed.

Data Transfers Visibility

Visibility of ingress and egress data in Karafka Web UI helps you monitor the data flow to and from Kafka. This feature is excellent for spotting how much data your system is dealing with and catching any unexpected data spikes. It's beneficial if you're working with vendors that charge for data transfer, as it can help you manage costs better and quickly spot any issues that might lead to extra charges.

Raw Payloads and Deserialized Payloads Download

The Web UI now allows the download of Raw and Deserialized Payloads, giving you direct access to your message data in its original or processed form. This feature can be adjusted to fit your company's data access policies, ensuring that sensitive information remains secure while providing the necessary data transparency and accessibility for your team.

Pauses Tracking

Karafka UI has enhanced its visibility features, providing detailed information about paused partitions, including when their processing is expected to resume. This is particularly helpful if a pause is due to an error, as you can see when the partition is paused and when it's set to get back on track.

Upgrade Notes

No significant changes are needed. Just follow this Upgrade Doc.

Karafka Pro

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support, as, due to the sheer number of questions and requests, I need a way to prioritize them.

Help me continue to build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Stay tuned, and don't forget to join our Slack channel.
