Breaking the Rules: RPC Pattern with Apache Kafka and Karafka

Introduction

Using Kafka for Remote Procedure Calls (RPC) might raise eyebrows among seasoned developers. At its core, RPC is a programming technique that creates the illusion of running a function on a local machine while it actually executes on a remote server. When you make an RPC call, your application sends a request to a remote service, waits for it to execute some code, and then receives the results - all while making it feel like a regular function call in your code.

Apache Kafka, however, was designed as an event log, optimizing for throughput over latency. Yet, sometimes unconventional approaches yield surprising results. This article explores implementing RPC patterns with Kafka using the Karafka framework. While this approach might seem controversial - and rightfully so - understanding its implementation, performance characteristics, and limitations may provide valuable insights into Kafka's capabilities and distributed system design.

The idea emerged from discussing synchronous communication in event-driven architectures. What started as a theoretical question - "Could we implement RPC with Kafka?" - evolved into a working proof of concept that achieved millisecond response times in local testing.

In modern distributed systems, the default response to new requirements often involves adding another specialized tool to the technology stack. However, this approach comes with its own costs:

  • Increased operational complexity,
  • Additional maintenance overhead,
  • And more potential points of failure.

Sometimes, stretching the capabilities of existing infrastructure - even in unconventional ways - can provide a pragmatic solution that avoids these downsides.

Disclaimer: This implementation serves as a proof-of-concept and learning resource. While functional, it lacks production-ready features like proper timeout handling, resource cleanup after timeouts, error propagation, retries, message validation, security measures, and proper metrics/monitoring. The implementation also doesn't handle edge cases like Kafka cluster failures. Use this as a starting point to build a more robust solution.

Architecture Overview

Building an RPC pattern on top of Kafka requires careful consideration of both synchronous and asynchronous aspects of communication. At its core, we're creating a synchronous-feeling operation by orchestrating asynchronous message flows underneath. From the client's perspective, making an RPC call should feel synchronous - send a request and wait for a response. However, once a command enters Kafka, all the underlying operations are asynchronous.

Core Components

Such an architecture has to rely on several key components working together:

  • Two Kafka topics form the backbone - a command topic for requests and a result topic for responses.
  • A client-side consumer, running without a consumer group, that actively matches correlation IDs and starts from the latest offset to ensure we only process relevant messages.
  • The commands consumer in our RPC server that processes requests and publishes results.
  • A synchronization mechanism using mutexes and condition variables that maintains thread safety and handles concurrent requests.

Implementation Flow

A unique correlation ID is always generated when a client initiates an RPC call. The command is then published to Kafka, where it's processed asynchronously. The client blocks execution using a mutex and condition variable while waiting for the response. Meanwhile, the message flows through several stages:

  • command topic persistence,
  • consumer polling and processing,
  • result publishing,
  • result topic persistence,
  • and finally, the client-side consumer matching the correlation ID with the response and signaling completion.

Below, you can find a visual representation of the RPC flow over Kafka. The diagram shows the journey of a single request-response cycle:

[Diagram: RPC flow over Kafka - a single request-response cycle]

Design Considerations

This architecture makes several conscious trade-offs. We use single-partition topics to ensure strict ordering, which limits throughput but simplifies correlation and ordering guarantees - though the partition count and related settings could be adjusted if higher scale becomes necessary. The custom consumer approach avoids consumer group rebalancing delays, while the synchronization mechanism bridges the gap between Kafka's asynchronous nature and our desired synchronous behavior. While this design prioritizes correctness over maximum throughput, it aligns well with typical RPC use cases where reliability and simplicity are key requirements.

Implementation Components

Getting from concept to working code requires several key components to work together. Let's examine the implementation of our RPC pattern with Kafka.

Topic Configuration

First, we need to define our topics. We use a single-partition configuration to maintain message ordering:

topic :commands do
  config(partitions: 1)
  consumer CommandsConsumer
end

topic :commands_results do
  config(partitions: 1)
  active false
end

This configuration defines two essential topics:

  • Command topic that receives and processes RPC requests
  • Results topic marked as inactive since we'll use a custom iterator instead of a standard consumer group consumer

Command Consumer

The consumer handles incoming commands and publishes results back to the results topic:

class CommandsConsumer < ApplicationConsumer
  def consume
    messages.each do |message|
      Karafka.producer.produce_async(
        topic: 'commands_results',
        # We evaluate whatever Ruby code comes in the payload
        # We return stringified result of evaluation
        payload: eval(message.raw_payload).to_s,
        key: message.key
      )

      mark_as_consumed(message)
    end
  end
end

We're using a simple eval to process commands for demonstration purposes. You'd want to implement proper command validation, deserialization, and secure processing logic in production.
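
For example, a whitelist-based dispatcher could stand in for eval. Below is a minimal sketch assuming JSON-encoded payloads; the command names and handlers are made up for illustration:

require 'json'

# Hypothetical registry of permitted commands - names and handlers
# are illustrative only
COMMANDS = {
  'ping' => ->(_args) { 'pong' },
  'add'  => ->(args) { args.fetch('numbers').sum }
}.freeze

def process(raw_payload)
  request = JSON.parse(raw_payload)
  command = request.fetch('command')

  # Reject anything outside the whitelist instead of evaluating it
  handler = COMMANDS.fetch(command) do
    raise ArgumentError, "unsupported command: #{command}"
  end

  handler.call(request.fetch('args', {}))
end

process({ command: 'add', args: { numbers: [1, 2] } }.to_json) #=> 3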

Synchronization Mechanism

To bridge Kafka's asynchronous nature with synchronous RPC behavior, we implement a synchronization mechanism using Ruby's mutex and condition variables:

require 'singleton'

# Registry of in-flight requests that coordinates blocking callers
# with incoming results by correlation ID
class Accu
  include Singleton

  def initialize
    @running = {}
    @results = {}
  end

  def register(id)
    @running[id] = [Mutex.new, ConditionVariable.new]
  end

  def unlock(id, result)
    return false unless @running.key?(id)

    @results[id] = result
    mutex, cond = @running.delete(id)
    mutex.synchronize { cond.signal }
  end

  def result(id)
    @results.delete(id)
  end
end

This mechanism maintains a registry of pending requests and coordinates the blocking and unblocking of client threads based on correlation IDs. When a response arrives, it signals the corresponding condition variable to unblock the waiting thread.
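
To see the mechanism in isolation, here is a tiny standalone demo with no Kafka involved; the 'req-1' identifier and '42' payload are made up:

mutex, cond = Accu.instance.register('req-1')

mutex.synchronize do
  # Simulated "remote" worker; unlock blocks on the mutex until the
  # main thread is waiting, so the signal cannot be lost
  Thread.new { Accu.instance.unlock('req-1', '42') }

  cond.wait(mutex)
end

Accu.instance.result('req-1') #=> "42"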

The Client

Our client implementation brings everything together with two main components:

  1. A response listener that continuously checks for matching results
  2. A blocking command dispatcher that waits for responses

class Client
  class << self
    def run
      iterator = Karafka::Pro::Iterator.new(
        { 'commands_results' => true },
        settings: {
          'bootstrap.servers': '127.0.0.1:9092',
          'enable.partition.eof': false,
          'auto.offset.reset': 'latest'
        },
        yield_nil: true,
        max_wait_time: 100
      )

      iterator.each do |message|
        next unless message

        Accu.instance.unlock(message.key, message.raw_payload)
      rescue StandardError => e
        puts e
        sleep(rand)
        next
      end
    end

    def perform(ruby_remote_code)
      cmd_id = SecureRandom.uuid

      # Register before producing so that the response cannot arrive
      # ahead of our readiness to wait for it
      mutex, cond = Accu.instance.register(cmd_id)

      mutex.synchronize do
        Karafka.producer.produce_sync(
          topic: 'commands',
          payload: ruby_remote_code,
          key: cmd_id
        )

        cond.wait(mutex)
      end

      Accu.instance.result(cmd_id)
    end
  end
end

The client uses Karafka's Iterator to consume responses without joining a consumer group, which avoids rebalancing delays and ensures we only process new messages. The perform method handles the synchronous aspects:

  • Generates a unique correlation ID
  • Registers the request with our synchronization mechanism
  • Sends the command
  • Blocks until the response arrives

Using the Implementation

To use this RPC implementation, first start the response listener in a background thread:

# Do this only once per process
Thread.new { Client.run }

Then, you can make synchronous RPC calls from your application:

Client.perform('1 + 1')
#=> "2"

Each call blocks until the response arrives, making it feel like a regular synchronous method call despite the underlying asynchronous message flow.

Despite its simplicity, this implementation achieves impressive performance in local testing - roundtrip times as low as 3ms. However, remember this assumes ideal conditions and minimal command processing time. Real-world usage would need additional error handling, timeouts, and more robust command processing logic.
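
As one example, a timeout could be layered on top of perform. This is a sketch only; the perform_with_timeout name, the 5-second default, and the error handling are assumptions, not part of the implementation above:

def perform_with_timeout(ruby_remote_code, timeout: 5)
  cmd_id = SecureRandom.uuid
  mutex, cond = Accu.instance.register(cmd_id)

  mutex.synchronize do
    Karafka.producer.produce_sync(
      topic: 'commands',
      payload: ruby_remote_code,
      key: cmd_id
    )

    # ConditionVariable#wait accepts a timeout in seconds and returns
    # once signaled or once the time elapses
    cond.wait(mutex, timeout)
  end

  # On timeout no result was stored; a production version would also
  # need to clean up the stale registration inside Accu
  Accu.instance.result(cmd_id) || raise("RPC timed out after #{timeout}s")
end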

Performance Considerations

The performance characteristics of this RPC implementation are surprisingly good, but they come with important caveats and considerations that need to be understood for proper usage.

Local Testing Results

In our local testing environment, the implementation showed impressive numbers.

A single roundtrip can be completed in as little as 3ms, and the average stays close to that even when executing 100 sequential commands:

require 'benchmark'

Benchmark.measure do
  100.times { Client.perform('1 + 1') }
end
#=> 0.035734   0.011570   0.047304 (  0.316631)

However, it's crucial to understand that these numbers represent ideal conditions:

  • Local Kafka cluster
  • Minimal command processing time
  • No network latency
  • No concurrent load
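
For a rough sense of how the numbers shift under parallel callers, one could run something like the following locally (the thread and iteration counts are arbitrary, and the response listener thread from earlier must already be running):

require 'benchmark'

# 10 threads x 10 sequential calls each
took = Benchmark.realtime do
  threads = 10.times.map do
    Thread.new { 10.times { Client.perform('1 + 1') } }
  end

  threads.each(&:join)
end

puts "100 calls across 10 threads took #{took.round(3)}s"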

Summary

While Kafka wasn't designed for RPC patterns, this implementation demonstrates that with careful consideration and proper use of Karafka's features, we can build reliable request-response patterns on top of it. The approach shines particularly in environments where Kafka is already a central infrastructure, allowing messaging architecture to be extended without introducing additional technologies.

However, this isn't a silver bullet solution. Success with this pattern requires careful attention to timeouts, error handling, and monitoring. It works best when Kafka is already part of your stack, and your use case can tolerate slightly higher latencies than traditional RPC solutions.

This fascinating pattern challenges our preconceptions about messaging systems and RPC. It demonstrates that understanding your tools deeply often reveals capabilities beyond their primary use cases. While unsuitable for every situation, it provides a pragmatic alternative when adding new infrastructure components isn't desirable.

Under the Hood: Enhancing Karafka’s CPU and Memory Efficiency

Introduction

Now and then, I like to go on a performance improvement hunt because life isn't just about adding new features. Recently, I have been focusing on enhancing efficiency, particularly regarding CPU and memory usage in Karafka. Three of my recent pull requests (PR 117, PR 118, PR 123) have made some minor improvements, and this article is about them. These changes help reduce memory allocation and improve time tracking management in Karafka and WaterDrop.

Most of the time, such improvements are not significant, but when applied in crucial places, they can make things a bit faster.

When doing OSS, I think of myself as a middleman. Karafka runs in tens of thousands of processes, and improvements affecting the consumption or production of messages (especially when applicable per message) can make a difference when multiplied.

Shaving Memory Allocations

PR 117 targets memory savings by optimizing instrumentation data handling. The primary change involves reducing unnecessary array allocations during instrumentation. Previously, every instrumentation event would allocate a new array, leading to excessive memory usage. The updated code minimizes these allocations by inlining the tracking code.

Here's a simple instrumentation layer one could build:

# This is a simplified bus without too many features
#
# It supports simple instrumentation piping details to listeners and returning the results
class Notifications
  def initialize(*listeners)
    @listeners = listeners
  end

  def instrument(name, &block)
    result, time = measure_time_taken(&block)

    details = { name: name, result: result, time: time }

    @listeners.each do |listener|
      listener.call(details)
    end

    result
  end

  private

  def monotonic_now
    ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000
  end

  def measure_time_taken
    start = monotonic_now
    result = yield
    [result, monotonic_now - start]
  end
end

It's a fairly simple instrumentation that allows you to have listeners and wrap your code with it for tracking:

bus = Notifications.new(
  ->(details) { puts "Event: #{details[:name]} occurred" },
  ->(details) { puts("  and it took #{details[:time]}ms to run") }
)

bus.instrument('my.random.sleep') do
  sleep_time = rand
  sleep(sleep_time)
  sleep_time
end

# Event: my.random.sleep occurred
#   and it took 799.0296444892883ms to run
# => 0.7981784182914173

It works and is actually quite fast:

require 'benchmark'

bus = Notifications.new
EVENT = 'test'
puts Benchmark.measure { 1_000_000.times { bus.instrument(EVENT) {} } }

# 0.538294   0.000000   0.538294 (  0.539663)

However, there is one thing that is fundamentally wrong with this code, and that is time tracking. It may not be visible at first, but when you start counting object allocations, this is what you end up with:

GC.disable

def count_objects_created
  # Take a snapshot of the current object counts
  initial_counts = ObjectSpace.each_object.count

  yield if block_given?

  # Take another snapshot after running the block
  final_counts = ObjectSpace.each_object.count

  # Calculate the difference
  new_objects_created = final_counts - initial_counts

  puts "Number of objects created: #{new_objects_created}"
end

bus = Notifications.new
EVENT = 'test'

count_objects_created do
  1_000_000.times { bus.instrument(EVENT){} }
end

# Number of objects created: 2000002

There are twice as many objects, and all of this is because of the return value of the time measurement:

def measure_time_taken
  start = monotonic_now
  result = yield
  [result, monotonic_now - start]
end

It returns an array of the result and the time measurement. It may not be a lot, but nonetheless, let's inline this instead of delegating to a method:

def instrument(name)
  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  @listeners.each do |listener|
    listener.call(details)
  end

  result
end

With this change we no longer allocate the arrays:

bus = Notifications.new
EVENT = 'test'

count_objects_created do
  1_000_000.times { bus.instrument(EVENT){} }
end

# Number of objects created: 1000002

*Number of objects allocated per one million instrumentation calls.

You may question why it is relevant and whether it provides significant benefits. I would say that it depends. Karafka is heavily instrumented, and under heavy load, this simple change saves 20-30 thousand allocations per second of execution.

If a Tree Falls: Fast Paths for Unmonitored Events

As I mentioned, Karafka and WaterDrop are heavily instrumented. Since different people can use different events for different use cases (logging, AppSignal instrumentation, or Web UI), there is no silver bullet regarding what to instrument and what not to. This means that Karafka emits many events during its execution. Same with WaterDrop. During my optimization session, I wondered if there's even a point in measuring and publishing the instrumentation results when no one listens. And this is what PR 123 is about. If no one is listening, there is no point in making any sound.

Below, you can find a simplified previous version of the instrument method. Similar code can be found, for example, in dry-monitor and dry-events.

# Before
def instrument(name)
  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  @listeners[name].each do |listener|
    listener.call(details)
  end

  result
end

It's nice and fairly fast. But in the case of publishing many events, it may be optimized as long as we have a way to check if there are any listeners:

# After
def instrument(name)
  listeners = @listeners[name]

  # Measure nothing since no one will use it
  return yield unless listeners

  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  listeners.each do |listener|
    listener.call(details)
  end

  result
end

For fast-path instrumentation cases, the changed code is roughly 2.4x faster:

# Based on the PR change measurements

1.830753   0.005683   1.836436 (  1.843820)
# vs.
0.759046   0.000000   0.759046 (  0.759051)

*Time difference when executing instrumentation in the fast-path scenario 1 million times.

This also saves on memory allocations: when no one will use the results, the event details are not built at all.
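
Assuming the count_objects_created helper from earlier and a bus variant where @listeners is a hash keyed by event name (as in the snippet above), the fast path can be checked for allocations as well:

bus = Notifications.new
EVENT = 'test'

count_objects_created do
  # EVENT has no listeners, so no details hash and no timing values
  # are ever created
  1_000_000.times { bus.instrument(EVENT) {} }
end

# Expect the reported count to drop to roughly zero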

Timing Time

The last optimization in this batch was about time measurements. Karafka uses two types of time tracking:

  • A monotonic clock is used to measure elapsed time in a way that would not break because of time zone changes, leap-second adjustments, or anything else. A monotonic clock is a clock source that provides a monotonically increasing time value, which means it will never go backward.

  • A regular UTC-based clock is used to take snapshots of when something happened.

def monotonic_now
  # Returns float with seconds, we wanted in milliseconds
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000
end

def float_now
  ::Time.now.utc.to_f
end

The above methods are fairly simple, but they leave room for two improvements.

The first is the elimination of the millisecond multiplication. Ruby now supports the :float_millisecond unit, which allows us not to deal with the multiplication explicitly:

def monotonic_now
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :float_millisecond)
end

The difference is not significant (if any), but this at least simplifies the code and makes it more explicit.

While the above did not yield performance gains, the latter is much more interesting:

def float_now
  ::Time.now.utc.to_f
end

The above code creates a few objects down the road and then casts the time into a float. This may not seem expensive at first, but keep in mind that this code may run thousands of times per second in Karafka. Since our goal is to get a float time, this can also be replaced with a system clock invocation:

def float_now
  ::Process.clock_gettime(Process::CLOCK_REALTIME)
end
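
The allocation side of this claim can be checked with the count_objects_created helper from earlier:

count_objects_created do
  # Time.now heap-allocates a Time object on every call
  1_000_000.times { ::Time.now.utc.to_f }
end

count_objects_created do
  # clock_gettime returns a Float (a flonum on 64-bit builds), which
  # typically involves no heap allocation at all
  1_000_000.times { ::Process.clock_gettime(Process::CLOCK_REALTIME) }
end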

The difference in terms of performance is big:

6.405367   0.546916   6.952283 (  6.952844)
# vs.
1.887427   0.003451   1.890878 (  1.897118)

*Time difference when fetching the real-time in both ways.

Using the process clock is about 3.4 times faster!
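
For reference, numbers of that shape can be reproduced with a benchmark along these lines (the iteration count is an assumption, not taken from the PR):

require 'benchmark'

ITERATIONS = 10_000_000

puts Benchmark.measure { ITERATIONS.times { ::Time.now.utc.to_f } }
puts Benchmark.measure { ITERATIONS.times { ::Process.clock_gettime(Process::CLOCK_REALTIME) } }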

Conclusion

The recent optimizations in Karafka, shown in the PRs mentioned, reflect my commitment to pushing this project forward. As I refine Karafka, I aim to deliver top-notch data-streaming tools for the Ruby community. Every optimization brings Karafka Core closer to achieving the best possible performance and efficiency.
