Tag: mutex

From Sleep to Speed: Making Rdkafka Sync Operations 16 Times Faster

As an open-source developer, I constantly seek performance gains in the code I maintain. Since I took over rdkafka from AppSignal in November 2023, I promised not only to maintain the gem but to provide a stream of feature improvements and performance enhancements. One key area where performance can often be improved is how synchronization is handled in synchronous operations. This article discusses our significant improvement in rdkafka by replacing sleep with condition variables and mutexes.

rdkafka-ruby (rdkafka for short) is a low-level driver used within the Karafka ecosystem to communicate with Kafka.

It is worth pointing out that while I did the POC, Tomasz Pajor completed the final implementation, and I'm describing it here because Tomasz does not run a blog.

The Problem with Sleep in Synchronous Operations

In synchronous operations, especially those involving waiting for a condition to be met, the use of sleep to periodically check the status is common but problematic. While simple to implement, this approach introduces inefficiencies and can significantly degrade performance.

How Sleep Was Used in rdkafka

Such an approach was taken in rdkafka-ruby when dealing with callbacks for many operations. Whether dispatching messages, creating new topics, or getting configuration details, any wait request would sleep for a certain period, periodically rechecking whether the given operation was done. This meant that operations that could be completed in a few milliseconds were delayed by the fixed sleep duration.

Additionally, librdkafka, the underlying library used by rdkafka, is inherently asynchronous. Operations are dispatched to an internal thread that triggers a callback upon completion or error. This asynchronous nature requires some form of synchronization to ensure the main thread can handle these callbacks correctly. The sleep-based approach for synchronization added unnecessary delays and inefficiencies.

Below is a simplified diagram of synchronous message production before the change.

Why Sleep is Inefficient

Using sleep for status checking in synchronous operations has several significant drawbacks:

  • Latency: A fixed sleep interval means the actual waiting time is at least as long as the sleep duration, even if the condition is met sooner.
  • Resource Wastage: CPU cycles are wasted during sleep since the thread is inactive and does not contribute to the task's progress.
  • Imprecise Timing: Threads might wake up slightly later than the specified interval, leading to additional delays.

In rdkafka, the default sleep duration was set to 100ms. This means that any #wait operation would take at least 100ms, even if the task only required a few milliseconds. This unnecessary wait time accumulates, leading to significant performance degradation.

While such a lag was insignificant in the case of one-time operations like topic creation, it was problematic for anyone using sync messages dispatch. The overhead of sleeping for an extensive period was significant. The faster the cluster worked, the bigger the loss would be.

I asked Tomasz Pajor, who wanted to do something interesting in the Karafka ecosystem, to replace it with a condition-variable-based setup.

Validating Assumptions

To ensure that the new approach would yield gains, I measured the difference in time when librdkafka announced successful delivery against when this information was available in Ruby. This validation confirmed that the new synchronization method could provide a major performance boost.

*Time from the message dispatch to the moment the given component is aware of its successful delivery, plus waste time (pointless wait). Less is better.

Ruby would "wait" an additional 94 milliseconds on average before acknowledging a given message delivery! This meant there was a theoretical potential to improve this by over 93% per dispatch, ideally getting as close to librdkafka delivery awareness as possible.

The Role of Condition Variables and Mutexes

Before we explore the implementation's roots and some performance benchmarks, let's establish the knowledge baseline. While most of you may be familiar with mutexes, condition variables are only occasionally used in Ruby daily.

Mutexes

A mutex (short for mutual exclusion) is a synchronization primitive that controls access to a shared resource. It ensures that only one thread can access the resource at a time, preventing race conditions.

Condition Variables

A condition variable is a synchronization primitive that allows threads to wait until a particular condition is true. It works with a mutex to avoid the "busy-wait" problem seen with sleep.

Below is a simple example demonstrating the use of condition variables in Ruby. One thread waits for a condition to be met, while another thread simulates some work, sets the condition to true, and signals the waiting thread to proceed.

mutex = Mutex.new
condition = ConditionVariable.new
ready = false

# Thread that waits for the condition to be true
waiting_thread = Thread.new do
  mutex.synchronize do
    puts "Waiting for the condition..."
    condition.wait(mutex) until ready
    puts "Condition met! Proceeding..."
  end
end

# Thread that sets the condition to true
signaling_thread = Thread.new do
  sleep(1) # Simulate some work
  mutex.synchronize do
    puts "Signaling the condition..."
    ready = true
    condition.signal
  end
end

waiting_thread.join
signaling_thread.join

Spurious Wakeups

When using condition variables, it is essential to handle spurious wakeups. A spurious wakeup is when a thread waiting on a condition variable is awakened without being explicitly notified. This can happen for various reasons, such as system-level interruptions or other factors beyond the application's control.

The condition should always be checked in a loop to handle spurious wakeups. This ensures that even if the thread wakes up unexpectedly, it will recheck the condition and go back to waiting if it is not met. Here's an example:

@mutex.synchronize do
  loop do
    if condition_met?
      # Proceed with the task
      break
    else
      @resource.wait(@mutex)
    end
  end
end

This loop ensures that the thread only proceeds when the actual condition is met, thus safeguarding against spurious wakeups.

Implementing Condition Variables in rdkafka

To address the inefficiencies caused by sleep, Tomasz replaced it with a combination of condition variables and mutexes. This change allows threads to wait more efficiently for conditions to be met.

Code Implementation

The PR with this change can be found here.

Here's a simplified version of the wait code that replaced sleep with condition variables and mutexes:

def wait(max_wait_timeout: 60, raise_response_error: true)
  timeout = max_wait_timeout ? monotonic_now + max_wait_timeout : MAX_WAIT_TIMEOUT_FOREVER

  @mutex.synchronize do
    loop do
      if pending?
        to_wait = (timeout - monotonic_now)

        if to_wait.positive?
          @resource.wait(@mutex, to_wait)
        else
          raise WaitTimeoutError.new(
            "Waiting for #{operation_name} timed out after #{max_wait_timeout} seconds"
          )
        end
      elsif self[:response] != 0 && raise_response_error
        raise_error
      else
        return create_result
      end
    end
  end
end

def unlock
  @mutex.synchronize do
    self[:pending] = false
    @resource.broadcast
  end
end

The moment librdkafka would trigger delivery callback, condition variable #broadcast would unlock the wait, effectively reducing the wait waste from around 93ms down to 0,07ms! That's a whooping 1328 times less!

Performance and Efficiency Gains

By using condition variables and mutexes, we observed a significant improvement in performance and efficiency:

  • Reduced Latency: Threads wake up as soon as the condition is met, eliminating the unnecessary wait time introduced by sleep.
  • Better Resource Utilization: The CPU is not idling during the waits, allowing for more efficient use of processing power.
  • More Predictable Timing: The precise control over thread waking improves the predictability and reliability of synchronous operations.

The performance gains are substantial on a fast cluster. For instance, with queue.buffering.max.ms set to 5ms (default) and an acknowledgment (ack) of 1 or 2, Kafka can dispatch messages in 6-7ms. Using a 100ms sleep means waiting an additional 94ms, leading to a total wait time of 100ms for operations that could have been completed in 5-6ms.

The improvement is also significant in the case of WaterDrop, which had the sleep value set to 10ms. On a fast cluster with the same settings, a 10ms sleep would still cause a delay for operations that could be completed in 5ms, effectively doubling the wait time.

*Time from the message dispatch to the moment the given component is aware of its successful delivery, plus waste time (pointless wait). Less is better.

The change is so significant that putting it on a chart is hard. The time needed to dispatch 1000 messages synchronously is now over 16 times shorter!

*Time needed to dispatch 1000 messages synchronously before and after the change. Less is better.

Implications for Ruby's Scheduler

Ruby's scheduler also benefits from the removal of short sleep intervals. The Ruby scheduler typically schedules thread work in 100ms increments. Short sleeps disrupt this scheduling, leading to inefficient thread management and potential context-switching overhead. The scheduler can manage threads more effectively using condition variables and mutexes, reducing the need for frequent context switches and improving overall application performance.

Rdkafka and WaterDrop Synchronicity Remarks

Karafka components and the design of librdkafka heavily emphasize asynchronous operations. These operations are the recommended approach for most tasks, offering superior performance and resource utilization. However, it's important to address synchronous operations. Their efficient handling is crucial, particularly for specific use cases like transactions.

This improvement in rdkafka enhances the performance of synchronous operations, making them more efficient and reliable. It is important to recognize users' diverse use cases, including those who prefer synchronous operations for their specific needs.

Conclusions

Replacing sleep with condition variables and mutexes in rdkafka significantly enhanced its performance. This approach eliminates unnecessary wait times, optimizes resource usage, and aligns better with Ruby's scheduling model. These improvements translate to a more efficient and responsive application, especially in high-performance environments where every millisecond counts.

By adopting this change, rdkafka can better handle high-volume synchronous operations, ensuring that threads wait only as long as necessary and wake up immediately when the required condition is met. This not only improves performance but also enhances the overall robustness of the system.

Ruby concurrency is hard: how I became a Ruby on Rails contributor

For the past several weeks, I've been trying to fix a cranky spec in Karafka integrations suite, which in the end, lead me to become a Ruby on Rails micro-contributor and submitting similar fix to several other high-popularity projects from the Ruby ecosystem. Here's my story of trying to make sense of my specs and Ruby concurrency.

Ephemeral bug from a test-suite

Karafka is a Ruby and Rails multi-threaded efficient Kafka processing framework. To provide reliable OSS that is multi-threaded, I had to have the option to run my test suite concurrently to simulate how Karafka operates. Since it was a specific use case, I created my micro-framework.

Long story short: It runs end-to-end integration specs by running them in separate Ruby processes. Each starts Karafka, runs all the code in various configurations, connects to Kafka, checks assertions, and at the end, shuts down.

Such an approach allowed me to ensure that the process's whole lifecycle and its components work as expected. Specs are started with supervision, so in case of any hang, it will be killed after 5 minutes.

Karafka itself also has an internal shutdown supervisor. In case of a user shutdown request, if the shutdown takes longer than the defined expected time, Karafka will stop despite having things running. And this is what was happening with this single spec:

E, [2022-11-19T16:47:49.602718 #14843] ERROR -- : Forceful Karafka server stop
F, [2022-11-19T16:47:49.602825 #14843] FATAL -- : #<Karafka::Core::Monitoring::Event:0x0000562932d752b0 @id="error.occurred", @payload={:caller=>Karafka::Server, :error=>#<Karafka::Errors::ForcefulShutdownError: Karafka::Errors::ForcefulShutdownError>, :type=>"app.stopping.error"}>

This damn spec did not want to stop!

Many things are working under the hood:

  • workers that process jobs that could hang and force the process to wait
  • jobs queue that is also connected to the polling thread (to poll more data when no work is to be done)
  • listeners that poll data from Kafka that could hang
  • consumer groups with several threads polling Kafka data that might get stuck because of some underlying error
  • Other bugs in the coordination of work and states.

One thing that certainly worked was the process supervision that would forcefully kill it after 30 seconds.

Process shutdown coordination

The graceful shutdown of such a process takes work. When you have many connections to Kafka, upon a poorly organized shutdown, you may trigger several rebalances that may cause short-lived topics assignments causing nothing except friction and potentially blocking the whole process.

To mitigate this, Karafka shuts down actively and gracefully. That is, until the absolute end, it claims the ownership of given topics and partitions, actively waiting for all the current work to be finished. This looks more or less like so:

Note: Consumer groups internally in Karafka are a bit different than Kafka consumer groups. Here we focus on internal Karafka concepts.

Pinpointing the issue

After several failed attempts and fixing other bugs, I added a lot of extra instrumentation to check what Karafka hangs on. It was hanging because there were hanging listener threads!

As stated above, to close Karafka gracefully, all work from the jobs queue needs to be finished, and listeners that poll data from Kafka need to be able to exit the polling loops. It's all coordinated using a job queue. The job queue we're using is pretty complex with some blocking capabilities, and you can read about it here, but the interesting part of the code can be reduced to this:

@semaphores = Concurrent::Map.new { |h, k| h[k] = Queue.new }

Those queues are used as semaphores in the polling loops until all the current work is done. Since each Queue is assigned to a different subscription group within its thread and hidden behind a concurrent map, there should be no problem. Right?

Reproduction

Once I had my crazy suspicion, I decided to reduce it down to a proof of concept:

require 'concurrent-ruby'

100.times do
  ids = Set.new
  semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

  100.times.map do
    Thread.new do
      ids << semaphores['test'].object_id
    end
  end.each(&:join)

  raise "I expected 1 semaphore but got #{ids.size}" if ids.size != 1
end

once executed, boom:

poc.rb:13:in `<main>': I expected 1 semaphore but got 2 (RuntimeError)

There is more than one semaphore for one listener! This caused Karafka to wait until forced to stop because the worker thread would use a different semaphore than the listener thread.

But how is that even possible?

Well, Concurrent::Hash and Concurrent::Map initialization is indeed thread-safe but not precisely as you would expect them to be. The docs state that:

This version locks against the object itself for every method call, ensuring only one thread can be reading or writing at a time. This includes iteration methods like #each, which takes the lock repeatedly when reading an item.

"only one thread can be reading or writing at a time". However, we are doing both at different times. Our code:

semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

is actually equivalent to:

semaphores = Concurrent::Hash.new do |h, k|
  queue = Queue.new
  h[k] = queue
end

and the block content is not locked fully. One threads queue can overwrite the other if the Ruby scheduler stops the execution in the middle. Here's the flow of things happening in the form of a diagram:

Once in a while listener would receive a dangling queue object, effectively blocking the polling process.

Fixing the issue

This can be fixed either by replacing the Concurrent::Hash with Concurrent::Map and using the #compute_if_absent method or by introducing a lock inside of the Concurrent::Hash initialization block:

Concurrent::Map.new do |k, v|
  k.compute_if_absent(v) { [] }
end

mutex = Mutex.new

Concurrent::Hash.new do |k, v|
  mutex.synchronize do
    break k[v] if k.key?(v)
    k[v] = []
  end
end

Okay, but what does Ruby on Rails and other projects do with all of this?

Fixing the world

I figured out that if I made this mistake, maybe others did. I decided to check my local gems to find occurrences quickly. Inside my local gem cache, I executed the following code:

fgrep -R 'Concurrent::Hash.new {' ./
fgrep -R 'Concurrent::Hash.new do' ./
fgrep -R 'Concurrent::Map.new {' ./
fgrep -R 'Concurrent::Map.new do' ./

and validated that I'm not an isolated case. I wasn't alone!

Then using Sourcegraph I pinpointed a few projects that had the potential for fixes:

  • rails (activesupport and actionview)
  • i18n
  • dry-schema
  • finite_machine
  • graphql-ruby
  • rom-factory
  • apache whimsy
  • krane
  • puppet

I am not a domain expert in any of those, and understanding the severity of each was beyond my time constraints, but I decided to give it a shot.

Rails (ActiveSupport and ActionView)

Within Rails, this "pattern" was used twice: in ActiveSupport and ActionView.

In ActionView, it was used within a cache:

PREFIXED_PARTIAL_NAMES = Concurrent::Map.new do |h, k|
  h[k] = Concurrent::Map.new
end

and assuming that the cached result is stateless (same result each time for the same key), the issue could only cause an extra computation upon first parallel requests to this cache.

In the case of ActiveSupport, none of the concurrency code was needed, so I just replaced it with a simple Hash.

Both, luckily, were not that severe, though worth fixing nonetheless.

PR: https://github.com/rails/rails/pull/46536
PR: https://github.com/rails/rails/pull/46534

Both were merged, and this is how I became a Ruby on Rails contributor :)

i18n

This case was slightly more interesting because the concurrent cache stores all translations. In theory, this could cause similar leakage as in Karafka, effectively losing a language by loading it to a different Concurrent::Hash:

100.times.map do
  Thread.new do
    I18n.backend.store_translations(rand.to_s, :foo => { :bar => 'bar', :baz => 'baz' })
  end
end.each(&:join)

I18n.available_locales.count #=> 1

This could lead to hard-to-debug problems. Once in a while, your system could raise something like this:

:en is not a valid locale (I18n::InvalidLocale)

without an apparent reason, and this problem would go away after a restart.

PR: https://github.com/ruby-i18n/i18n/pull/644

dry-schema

Another cache case where the risk would revolve around double-computing.

PR: https://github.com/dry-rb/dry-schema/pull/440

rom-factory

This one is interesting! Let's reduce the code to a smaller POC first and see what will happen under heavy threading:

require 'singleton'
require 'concurrent-ruby'

class Sequences
  include Singleton

  attr_reader :registry

  def initialize
    reset
  end

  def next(key)
    registry[key] += 1
  end

  def reset
    @registry = Concurrent::Map.new { |h, k| h[k] = 0 }
    self
  end
end

seq = Sequences.instance

loop do
  100.times.map do
    Thread.new { seq.next('boom') }
  end.each(&:join)

  size = seq.registry['boom']

  raise "Wanted 100 but got #{size}" unless size == 100

  seq.reset
end
poc.rb:37:in `block in <main>': Wanted 100 but got 1 (RuntimeError)

The counter value gets biased. What is even more interesting is that making the map safe won't be enough:

@registry = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { 0 } }
poc.rb:36:in `block in <main>': Wanted 100 but got 55 (RuntimeError)

there is one more "unsafe" method:

def next(key)
  registry[key] += 1
end

this operation also is not atomic, thus needs to be wrapped with a mutex:

def initialize
  @mutex = Mutex.new
  reset
end

def next(key)
  @mutex.synchronize do
    registry[key] += 1
  end
end

Only then is this code safe to be used.

https://github.com/rom-rb/rom-factory/pull/80

Other repositories

Summary

In my opinion, there are a few outcomes of this story:

  • Karafka has a solid test-suite!
  • If you are doing concurrency-related work, you better test it in a multi-threaded environment and test it well.
  • Concurrency is hard to many of us (maybe that's because we are special ;) ).
  • RTFM and read it well :)
  • Do not be afraid to help others by submitting pull requests!

On the other hand, looking at the frequency of this issue, it may be worth opening a discussion about changing this behavior and making the initialization fully locked.

Afterwords

Concurrent::Hash under cRuby is just a Hash. You can check it out here.


Cover photo by James Broad on Attribution-NonCommercial-ShareAlike 2.0 Generic (CC BY-NC-SA 2.0). Image has been cropped.

Copyright © 2024 Closer to Code

Theme by Anders NorenUp ↑