
Breaking the Rules: RPC Pattern with Apache Kafka and Karafka

Introduction

Using Kafka for Remote Procedure Calls (RPC) might raise eyebrows among seasoned developers. At its core, RPC is a programming technique that creates the illusion of running a function on a local machine when it executes on a remote server. When you make an RPC call, your application sends a request to a remote service, waits for it to execute some code, and then receives the results - all while making it feel like a regular function call in your code.

Apache Kafka, however, was designed as an event log, optimizing for throughput over latency. Yet, sometimes unconventional approaches yield surprising results. This article explores implementing RPC patterns with Kafka using the Karafka framework. While this approach might seem controversial - and rightfully so - understanding its implementation, performance characteristics, and limitations may provide valuable insights into Kafka's capabilities and distributed system design.

The idea emerged from discussing synchronous communication in event-driven architectures. What started as a theoretical question - "Could we implement RPC with Kafka?" - evolved into a working proof of concept that achieved millisecond response times in local testing.

In modern distributed systems, the default response to new requirements often involves adding another specialized tool to the technology stack. However, this approach comes with its own costs:

  • Increased operational complexity,
  • Additional maintenance overhead,
  • And more potential points of failure.

Sometimes, stretching the capabilities of existing infrastructure - even in unconventional ways - can provide a pragmatic solution that avoids these downsides.

Disclaimer: This implementation serves as a proof-of-concept and learning resource. While functional, it lacks production-ready features like proper timeout handling, resource cleanup after timeouts, error propagation, retries, message validation, security measures, and proper metrics/monitoring. The implementation also doesn't handle edge cases like Kafka cluster failures. Use this as a starting point to build a more robust solution.

Architecture Overview

Building an RPC pattern on top of Kafka requires careful consideration of both synchronous and asynchronous aspects of communication. At its core, we're creating a synchronous-feeling operation by orchestrating asynchronous message flows underneath. From the client's perspective, making an RPC call should feel synchronous - send a request and wait for a response. However, once a command enters Kafka, all the underlying operations are asynchronous.

Core Components

Such an architecture has to rely on several key components working together:

  • Two Kafka topics form the backbone - a command topic for requests and a result topic for responses.
  • A client-side consumer, running without a consumer group, that actively matches correlation IDs and starts from the latest offset to ensure we only process relevant messages.
  • The commands consumer in our RPC server that processes requests and publishes results.
  • A synchronization mechanism using mutexes and condition variables that maintains thread safety and handles concurrent requests.

Implementation Flow

A unique correlation ID is always generated when a client initiates an RPC call. The command is then published to Kafka, where it's processed asynchronously. The client blocks execution using a mutex and condition variable while waiting for the response. Meanwhile, the message flows through several stages:

  • command topic persistence,
  • consumer polling and processing,
  • result publishing,
  • result topic persistence,
  • and finally, the client-side consumer matching of the correlation ID with the response and completion signaling.
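To make the stages above tangible without a running cluster, here is a minimal in-process sketch. It is not the Kafka-based implementation: plain Ruby Queue objects stand in for the two topics, a per-call Queue acts as a one-slot mailbox instead of a mutex and condition variable, and the "server" simply upcases the payload. All names in it are illustrative assumptions.

```ruby
require 'securerandom'

# In-process stand-ins for the two topics (assumption: plain Ruby queues,
# so there is no persistence, partitioning, or network involved).
commands = Queue.new
results  = Queue.new

# Pending calls: correlation ID => one-slot mailbox for the waiting caller.
pending = {}
pending_lock = Mutex.new

# "RPC server": reads commands and publishes results under the same key.
server = Thread.new do
  while (msg = commands.pop)
    results << { key: msg[:key], payload: msg[:payload].upcase }
  end
end

# "Client-side consumer": matches correlation IDs and completes the caller.
listener = Thread.new do
  while (msg = results.pop)
    mailbox = pending_lock.synchronize { pending.delete(msg[:key]) }
    mailbox << msg[:payload] if mailbox
  end
end

# Blocking call: register first, then publish, then wait for the reply.
call = lambda do |payload|
  id = SecureRandom.uuid
  mailbox = Queue.new
  pending_lock.synchronize { pending[id] = mailbox }
  commands << { key: id, payload: payload }
  mailbox.pop
end

reply = call.call('ping')
puts reply # PING
```

The caller sees one blocking call, while everything between the two queues happens on other threads, which is the same shape the Kafka version has.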

Below, you can find a visual representation of the RPC flow over Kafka. The diagram shows the journey of a single request-response cycle:

Design Considerations

This architecture makes several conscious trade-offs. We use single-partition topics to ensure strict ordering, which limits throughput but simplifies correlation - though the partition count and other settings could be adjusted if higher scale becomes necessary. Note that a single partition gives ordering only; it does not by itself provide exactly-once processing. The custom consumer approach avoids consumer group rebalancing delays, while the synchronization mechanism bridges the gap between Kafka's asynchronous nature and our desired synchronous behavior. While this design prioritizes correctness over maximum throughput, it aligns well with typical RPC use cases where reliability and simplicity are key requirements.

Implementation Components

Getting from concept to working code requires several key components to work together. Let's examine the implementation of our RPC pattern with Kafka.

Topic Configuration

First, we need to define our topics. We use a single-partition configuration to maintain message ordering:

topic :commands do
  config(partitions: 1)
  consumer CommandsConsumer
end

topic :commands_results do
  config(partitions: 1)
  active false
end

This configuration defines two essential topics:

  • Command topic that receives and processes RPC requests
  • Results topic marked as inactive since we'll use a custom iterator instead of a standard consumer group consumer

Command Consumer

The consumer handles incoming commands and publishes results back to the results topic:

class CommandsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      Karafka.producer.produce_async(
        topic: 'commands_results',
        # We evaluate whatever Ruby code comes in the payload
        # We return stringified result of evaluation
        payload: eval(message.raw_payload).to_s,
        key: message.key
      )

      mark_as_consumed(message)
    end
  end
end

We're using a simple eval to process commands for demonstration purposes. You'd want to implement proper command validation, deserialization, and secure processing logic in production.
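As a rough idea of what replacing eval could look like, here is a sketch of an allowlist-based dispatcher. The JSON payload shape, the ALLOWED_COMMANDS table, and the execute_command helper are all assumptions made for illustration; none of them are part of the original implementation or of Karafka.

```ruby
require 'json'

# Hypothetical allowlist of remote operations (illustrative names only).
ALLOWED_COMMANDS = {
  'add'    => ->(a, b) { a + b },
  'upcase' => ->(text) { text.upcase }
}.freeze

# Parses a payload such as {"command":"add","args":[1,1]} and runs it only
# when the command is on the allowlist. Always returns a JSON string.
def execute_command(raw_payload)
  request = JSON.parse(raw_payload)
  return JSON.generate(error: 'invalid payload') unless request.is_a?(Hash)

  handler = ALLOWED_COMMANDS[request['command']]
  return JSON.generate(error: 'unknown command') unless handler

  args = Array(request['args'])
  return JSON.generate(error: 'wrong number of arguments') unless args.size == handler.arity

  JSON.generate(result: handler.call(*args))
rescue JSON::ParserError
  JSON.generate(error: 'invalid payload')
end

puts execute_command('{"command":"add","args":[1,1]}')        # {"result":2}
puts execute_command('{"command":"eval","args":["1 + 1"]}')   # {"error":"unknown command"}
```

Inside the consumer, the result of such a helper would take the place of the eval call in the payload. Argument type checks are still missing here, so treat it as a starting point only.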

Synchronization Mechanism

To bridge Kafka's asynchronous nature with synchronous RPC behavior, we implement a synchronization mechanism using Ruby's mutex and condition variables:

require 'singleton'

class Accu
  include Singleton

  def initialize
    @running = {}
    @results = {}
  end

  def register(id)
    @running[id] = [Mutex.new, ConditionVariable.new]
  end

  def unlock(id, result)
    return false unless @running.key?(id)

    @results[id] = result
    mutex, cond = @running.delete(id)
    mutex.synchronize { cond.signal }
  end

  def result(id)
    @results.delete(id)
  end
end

This mechanism maintains a registry of pending requests and coordinates the blocking and unblocking of client threads based on correlation IDs. When a response arrives, it signals the corresponding condition variable to unblock the waiting thread.

The Client

Our client implementation brings everything together with two main components:

  1. A response listener that continuously checks for matching results
  2. A blocking command dispatcher that waits for responses

require 'securerandom'

class Client
  class << self
    def run
      iterator = Karafka::Pro::Iterator.new(
        { 'commands_results' => true },
        settings: {
          'bootstrap.servers': '127.0.0.1:9092',
          'enable.partition.eof': false,
          'auto.offset.reset': 'latest'
        },
        yield_nil: true,
        max_wait_time: 100
      )

      iterator.each do |message|
        next unless message

        Accu.instance.unlock(message.key, message.raw_payload)
      rescue StandardError => e
        puts e
        sleep(rand)
        next
      end
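    end

    def perform(ruby_remote_code)
      cmd_id = SecureRandom.uuid

      # Register before sending so a fast response is never dropped
      mutex, cond = Accu.instance.register(cmd_id)

      # Hold the mutex while sending: the listener needs it to signal,
      # so it cannot signal before we are actually waiting
      mutex.synchronize do
        Karafka.producer.produce_sync(
          topic: 'commands',
          payload: ruby_remote_code,
          key: cmd_id
        )

        cond.wait(mutex)
      end

      Accu.instance.result(cmd_id)
    end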
  end
end

The client uses Karafka's Iterator to consume responses without joining a consumer group, which avoids rebalancing delays and ensures we only process new messages. The perform method handles the synchronous aspects:

  • Generates a unique correlation ID
  • Registers the request with our synchronization mechanism
  • Sends the command
  • Blocks until the response arrives

Using the Implementation

To use this RPC implementation, first start the response listener in a background thread:

# Do this only once per process
Thread.new { Client.run }

Then, you can make synchronous RPC calls from your application:

Client.perform('1 + 1')
#=> Remote result: 2

Each call blocks until the response arrives, making it feel like a regular synchronous method call despite the underlying asynchronous message flow.

Despite its simplicity, this implementation achieves impressive performance in local testing - roundtrip times as low as 3ms. However, remember this assumes ideal conditions and minimal command processing time. Real-world usage would need additional error handling, timeouts, and more robust command processing logic.
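One of the missing pieces, a bounded wait, could be sketched roughly as follows. PendingCall is a hypothetical helper and not part of the code above; it keeps an explicit completion flag so that spurious wakeups and early signals are handled, and it raises Timeout::Error when no response shows up in time.

```ruby
require 'timeout'

# A single pending call that can be completed once and awaited with a deadline.
# Hypothetical helper; the Accu class above has no timeout support.
class PendingCall
  def initialize
    @mutex = Mutex.new
    @cond = ConditionVariable.new
    @done = false
    @value = nil
  end

  # Called by the listener thread when the matching response arrives.
  def complete(value)
    @mutex.synchronize do
      @value = value
      @done = true
      @cond.signal
    end
  end

  # Blocks until completed or until `timeout` seconds have passed.
  # Returns the value or raises Timeout::Error.
  def await(timeout)
    deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

    @mutex.synchronize do
      until @done
        remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
        raise Timeout::Error, 'no response in time' unless remaining.positive?

        @cond.wait(@mutex, remaining)
      end

      @value
    end
  end
end

call = PendingCall.new
Thread.new { sleep(0.01); call.complete('2') }
puts call.await(1) # 2
```

A caller that times out would still need to remove its entry from the registry of pending requests, otherwise late responses pile up; that cleanup is left out of this sketch.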

Performance Considerations

The performance characteristics of this RPC implementation are surprisingly good, but they come with important caveats and considerations that need to be understood for proper usage.

Local Testing Results

In our local testing environment, the implementation showed impressive numbers.

A single roundtrip can be completed in as little as 3ms. Even 100 sequential commands finish in roughly 0.32 seconds of wall-clock time, or about 3.2ms per call:

require 'benchmark'

Benchmark.measure do
  100.times { Client.perform('1 + 1') }
end
#=> 0.035734   0.011570   0.047304 (  0.316631)

However, it's crucial to understand that these numbers represent ideal conditions:

  • Local Kafka cluster
  • Minimal command processing time
  • No network latency
  • No concurrent load
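A total over 100 calls only gives an average (0.316631s / 100 ≈ 3.17ms per call) and hides slow outliers. When repeating the measurement outside of ideal conditions, timing each call separately may be more telling. The sketch below uses a lambda that sleeps for 1ms as a stand-in for Client.perform, so it runs without Kafka; that stand-in is an assumption, not the real client.

```ruby
require 'benchmark'

# Stand-in for Client.perform so the snippet runs without Kafka (assumption).
perform = ->(_code) { sleep(0.001) }

# Time each call individually (in milliseconds) instead of only the total.
samples = Array.new(100) { Benchmark.realtime { perform.call('1 + 1') } * 1000 }
sorted = samples.sort

average = samples.sum / samples.size
p95 = sorted[(sorted.size * 0.95).ceil - 1]

puts format('avg: %.2fms, p95: %.2fms, max: %.2fms', average, p95, sorted.last)
```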

Summary

While Kafka wasn't designed for RPC patterns, this implementation demonstrates that with careful consideration and proper use of Karafka's features, we can build reliable request-response patterns on top of it. The approach shines particularly in environments where Kafka is already a central infrastructure, allowing messaging architecture to be extended without introducing additional technologies.

However, this isn't a silver bullet solution. Success with this pattern requires careful attention to timeouts, error handling, and monitoring. It works best when Kafka is already part of your stack, and your use case can tolerate slightly higher latencies than traditional RPC solutions.

This fascinating pattern challenges our preconceptions about messaging systems and RPC. It demonstrates that understanding your tools deeply often reveals capabilities beyond their primary use cases. While unsuitable for every situation, it provides a pragmatic alternative when adding new infrastructure components isn't desirable.

From Sleep to Speed: Making Rdkafka Sync Operations 16 Times Faster

As an open-source developer, I constantly seek performance gains in the code I maintain. When I took over rdkafka from AppSignal in November 2023, I promised not only to maintain the gem but also to provide a stream of feature improvements and performance enhancements. One key area where performance can often be improved is how synchronization is handled in synchronous operations. This article discusses a significant improvement we made in rdkafka by replacing sleep with condition variables and mutexes.

rdkafka-ruby (rdkafka for short) is a low-level driver used within the Karafka ecosystem to communicate with Kafka.

It is worth pointing out that while I did the POC, Tomasz Pajor completed the final implementation, and I'm describing it here because Tomasz does not run a blog.

The Problem with Sleep in Synchronous Operations

In synchronous operations, especially those involving waiting for a condition to be met, the use of sleep to periodically check the status is common but problematic. While simple to implement, this approach introduces inefficiencies and can significantly degrade performance.

How Sleep Was Used in rdkafka

Such an approach was taken in rdkafka-ruby when dealing with callbacks for many operations. Whether dispatching messages, creating new topics, or getting configuration details, any wait request would sleep for a certain period, periodically rechecking whether the given operation was done. This meant that operations that could be completed in a few milliseconds were delayed by the fixed sleep duration.

Additionally, librdkafka, the underlying library used by rdkafka, is inherently asynchronous. Operations are dispatched to an internal thread that triggers a callback upon completion or error. This asynchronous nature requires some form of synchronization to ensure the main thread can handle these callbacks correctly. The sleep-based approach for synchronization added unnecessary delays and inefficiencies.

Below is a simplified diagram of synchronous message production before the change.

Why Sleep is Inefficient

Using sleep for status checking in synchronous operations has several significant drawbacks:

  • Latency: A fixed sleep interval means the actual waiting time is at least as long as the sleep duration, even if the condition is met sooner.
  • Resource Wastage: Every periodic wake-up to recheck the status costs a context switch and CPU time without contributing to the task's progress.
  • Imprecise Timing: Threads might wake up slightly later than the specified interval, leading to additional delays.

In rdkafka, the default sleep duration was set to 100ms. This means that any #wait operation would take at least 100ms, even if the task only required a few milliseconds. This unnecessary wait time accumulates, leading to significant performance degradation.
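The effect is easy to reproduce in isolation. The sketch below does not use rdkafka at all: a background thread stands in for an operation that finishes after about 5ms, while the caller polls a flag with a 100ms sleep, which is the pattern described above.

```ruby
# Simulated operation that finishes after ~5ms on a background thread.
done = false
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

worker = Thread.new do
  sleep(0.005)
  done = true
end

# Sleep-based polling: the status is only rechecked every 100ms.
sleep(0.1) until done

elapsed_ms = (Process.clock_gettime(Process::CLOCK_MONOTONIC) - started) * 1000
worker.join

puts format('operation took ~5ms, caller waited %.1fms', elapsed_ms)
```

The caller is expected to report a wait of roughly 100ms, because the first recheck only happens after the full sleep interval has elapsed.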

While such a lag was insignificant in the case of one-time operations like topic creation, it was problematic for anyone using synchronous message dispatch. The overhead of sleeping for an extended period was significant. The faster the cluster worked, the bigger the loss would be.

I asked Tomasz Pajor, who wanted to do something interesting in the Karafka ecosystem, to replace it with a condition-variable-based setup.

Validating Assumptions

To ensure that the new approach would yield gains, I measured the difference in time when librdkafka announced successful delivery against when this information was available in Ruby. This validation confirmed that the new synchronization method could provide a major performance boost.

*Time from the message dispatch to the moment the given component is aware of its successful delivery, plus waste time (pointless wait). Less is better.

Ruby would "wait" an additional 94 milliseconds on average before acknowledging a given message delivery! This meant there was a theoretical potential to improve this by over 93% per dispatch, ideally getting as close to librdkafka delivery awareness as possible.

The Role of Condition Variables and Mutexes

Before we explore the implementation's roots and some performance benchmarks, let's establish the knowledge baseline. While most of you may be familiar with mutexes, condition variables rarely appear in day-to-day Ruby code.

Mutexes

A mutex (short for mutual exclusion) is a synchronization primitive that controls access to a shared resource. It ensures that only one thread can access the resource at a time, preventing race conditions.

Condition Variables

A condition variable is a synchronization primitive that allows threads to wait until a particular condition is true. It works with a mutex and lets a thread block until it is signaled, avoiding the repeated polling seen with sleep.

Below is a simple example demonstrating the use of condition variables in Ruby. One thread waits for a condition to be met, while another thread simulates some work, sets the condition to true, and signals the waiting thread to proceed.

mutex = Mutex.new
condition = ConditionVariable.new
ready = false

# Thread that waits for the condition to be true
waiting_thread = Thread.new do
  mutex.synchronize do
    puts "Waiting for the condition..."
    condition.wait(mutex) until ready
    puts "Condition met! Proceeding..."
  end
end

# Thread that sets the condition to true
signaling_thread = Thread.new do
  sleep(1) # Simulate some work
  mutex.synchronize do
    puts "Signaling the condition..."
    ready = true
    condition.signal
  end
end

waiting_thread.join
signaling_thread.join

Spurious Wakeups

When using condition variables, it is essential to handle spurious wakeups. A spurious wakeup is when a thread waiting on a condition variable is awakened without being explicitly notified. This can happen for various reasons, such as system-level interruptions or other factors beyond the application's control.

The condition should always be checked in a loop to handle spurious wakeups. This ensures that even if the thread wakes up unexpectedly, it will recheck the condition and go back to waiting if it is not met. Here's an example:

@mutex.synchronize do
  loop do
    if condition_met?
      # Proceed with the task
      break
    else
      @resource.wait(@mutex)
    end
  end
end

This loop ensures that the thread only proceeds when the actual condition is met, thus safeguarding against spurious wakeups.

Implementing Condition Variables in rdkafka

To address the inefficiencies caused by sleep, Tomasz replaced it with a combination of condition variables and mutexes. This change allows threads to wait more efficiently for conditions to be met.

Code Implementation

The PR with this change can be found here.

Here's a simplified version of the wait code that replaced sleep with condition variables and mutexes:

def wait(max_wait_timeout: 60, raise_response_error: true)
  timeout = max_wait_timeout ? monotonic_now + max_wait_timeout : MAX_WAIT_TIMEOUT_FOREVER

  @mutex.synchronize do
    loop do
      if pending?
        to_wait = (timeout - monotonic_now)

        if to_wait.positive?
          @resource.wait(@mutex, to_wait)
        else
          raise WaitTimeoutError.new(
            "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
          )
        end
      elsif self[:response] != 0 && raise_response_error
        raise_error
      else
        return create_result
      end
    end
  end
end

def unlock
  @mutex.synchronize do
    self[:pending] = false
    @resource.broadcast
  end
end

The moment librdkafka triggers the delivery callback, the condition variable's #broadcast unlocks the wait, effectively reducing the wait waste from around 93ms down to 0.07ms! That's a whopping 1328 times less!
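To get a feel for how small the wake-up delay is, the sketch below measures it with nothing but Ruby's Mutex and ConditionVariable. It is not the rdkafka code: a plain thread stands in for the librdkafka callback, and the measured number will differ between machines.

```ruby
mutex = Mutex.new
resource = ConditionVariable.new
pending = true
signaled_at = nil

now = -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) }

# Stand-in for the delivery callback firing after ~5ms.
callback = Thread.new do
  sleep(0.005)
  mutex.synchronize do
    pending = false
    signaled_at = now.call
    resource.broadcast
  end
end

# Waiter: loops on the flag so spurious wakeups are harmless.
woke_at = mutex.synchronize do
  resource.wait(mutex, 1) while pending
  now.call
end
callback.join

waste_ms = (woke_at - signaled_at) * 1000
puts format('wake-up delay after the signal: %.3fms', waste_ms)
```

The waiter resumes as soon as the broadcasting thread releases the mutex, so the delay depends on thread scheduling rather than on a fixed sleep interval.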

Performance and Efficiency Gains

By using condition variables and mutexes, we observed a significant improvement in performance and efficiency:

  • Reduced Latency: Threads wake up as soon as the condition is met, eliminating the unnecessary wait time introduced by sleep.
  • Better Resource Utilization: Waiting threads no longer wake up periodically just to recheck a status, so no CPU time is spent on polling.
  • More Predictable Timing: The precise control over thread waking improves the predictability and reliability of synchronous operations.

The performance gains are substantial on a fast cluster. For instance, with queue.buffering.max.ms set to 5ms (default) and an acknowledgment (ack) of 1 or 2, Kafka can dispatch messages in 6-7ms. Using a 100ms sleep means waiting an additional 94ms, leading to a total wait time of 100ms for operations that could have been completed in 6-7ms.

The improvement is also significant in the case of WaterDrop, which had the sleep value set to 10ms. On a fast cluster with the same settings, a 10ms sleep would still cause a delay for operations that could be completed in 5ms, effectively doubling the wait time.

*Time from the message dispatch to the moment the given component is aware of its successful delivery, plus waste time (pointless wait). Less is better.

The change is so significant that putting it on a chart is hard. The time needed to dispatch 1000 messages synchronously is now over 16 times shorter!

*Time needed to dispatch 1000 messages synchronously before and after the change. Less is better.

Implications for Ruby's Scheduler

Ruby's scheduler also benefits from the removal of short sleep intervals. The Ruby scheduler typically schedules thread work in 100ms increments. Short sleeps disrupt this scheduling, leading to inefficient thread management and potential context-switching overhead. The scheduler can manage threads more effectively using condition variables and mutexes, reducing the need for frequent context switches and improving overall application performance.

Rdkafka and WaterDrop Synchronicity Remarks

Karafka components and the design of librdkafka heavily emphasize asynchronous operations. These operations are the recommended approach for most tasks, offering superior performance and resource utilization. However, it's important to address synchronous operations. Their efficient handling is crucial, particularly for specific use cases like transactions.

This improvement in rdkafka enhances the performance of synchronous operations, making them more efficient and reliable. It is important to recognize users' diverse use cases, including those who prefer synchronous operations for their specific needs.

Conclusions

Replacing sleep with condition variables and mutexes in rdkafka significantly enhanced its performance. This approach eliminates unnecessary wait times, optimizes resource usage, and aligns better with Ruby's scheduling model. These improvements translate to a more efficient and responsive application, especially in high-performance environments where every millisecond counts.

By adopting this change, rdkafka can better handle high-volume synchronous operations, ensuring that threads wait only as long as necessary and wake up immediately when the required condition is met. This not only improves performance but also enhances the overall robustness of the system.

Copyright © 2025 Closer to Code
