
Reduce your method calls by 99.9% by replacing Thread#pass with Queue#pop

When doing multi-threaded work in Ruby, there are a couple of ways to control the execution flow within a given thread. In this article, I will be looking at Thread#pass and Queue#pop and how understanding each of them can help you drastically optimize your applications.

Thread#pass - what it is and how it works

One of the ways you can ask the scheduler to "do something else" is by using the Thread#pass method.

Where can you find it? Aside from Karafka, one example is #load_async, one of the most recent additions to ActiveRecord (pull request).

Let's see how it works and why it may or may not be what you are looking for when building multi-threaded applications.

The Ruby docs are rather minimalistic in their description:

Give the thread scheduler a hint to pass execution to another thread. A running thread may or may not switch, it depends on OS and processor.

That means that when dealing with threads, you can tell Ruby that it would not be a bad idea to switch away from the current thread and focus on the others.

By default, all the threads you create have the same priority and are treated the same way. An excellent illustration of this is the code below:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    # Make threads wait for a bit so all threads are created
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

# for i in {1..1000}; do ruby threads.rb; done > results.txt

Run 1,000 times (as in the shell comment above), the computation in each thread took, on average, a similar amount of time:

The difference between the fastest and the slowest thread was less than 8%.

However, when one of the threads "passes," things change drastically:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      Thread.pass if i.zero?

      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

Now, thread zero takes twice as much time as other threads doing the same job.

It is worth pointing out that this method does not stop the execution flow by itself; it only suggests to Ruby that there may be other, more important things to do.
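A minimal sketch of that point, using only core Ruby: with no other thread competing for execution, Thread.pass has nothing to switch to, so it returns nil and the calling thread simply carries on. The variable names are illustrative.

```ruby
# Thread.pass is only a hint: it returns nil and never blocks the caller
return_values = []
iterations = 0

10_000.times do
  iterations += 1
  # With no other runnable threads there is nothing to switch to,
  # so execution continues with the next iteration
  return_values << Thread.pass
end

puts iterations
puts return_values.uniq.inspect
```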

Jean Boussier relied on exactly this behaviour in ActiveRecord:

def schedule_query(future_result) # :nodoc:
  @async_executor.post { future_result.execute_or_skip }
  Thread.pass
end

This code schedules a background job and then hints to the scheduler that now may be a good moment to run that job, or other work, in another thread.
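The same "post, then pass" pattern can be sketched with core Ruby alone. Here the executor is assumed to be a single plain thread draining a Queue, standing in for the thread-pool executor behind @async_executor; schedule, jobs and executor are hypothetical names, not ActiveRecord APIs.

```ruby
# Hypothetical stand-in for @async_executor: one thread draining a queue
jobs = Queue.new
executor = Thread.new do
  # A closed and drained queue returns nil, which ends the loop
  while (job = jobs.pop)
    job.call
  end
end

# Enqueue the work, then hint that another thread (such as the executor)
# could run now. Thread.pass does not wait for the job to finish.
def schedule(jobs, &job)
  jobs << job
  Thread.pass
end

results = Queue.new
5.times { |i| schedule(jobs) { results << i * i } }

# Closing the queue lets the executor finish the remaining jobs and exit
jobs.close
executor.join
```

With a single executor thread, the jobs run in FIFO order, so the results come out in the order they were scheduled.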

It is worth mentioning that when all the threads use Thread#pass, it becomes a colossal burden to the Ruby VM. Ruby goes crazy because none of the threads wants to do any work, and the execution time increases by over 100 times.
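A rough way to observe this on your own machine is to time the same threaded computation with and without every thread calling Thread.pass on each iteration. This is a sketch only: the measure helper, thread count and iteration count are arbitrary assumptions, and the resulting ratio will vary between machines and Ruby versions.

```ruby
# Times +threads+ threads doing the same work, optionally passing each step
def measure(threads:, iterations:, pass:)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

  workers = Array.new(threads) do
    Thread.new do
      iterations.times do
        Thread.pass if pass
        Math.sqrt(rand)
      end
    end
  end
  workers.each(&:join)

  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

baseline = measure(threads: 4, iterations: 20_000, pass: false)
everyone_passes = measure(threads: 4, iterations: 20_000, pass: true)

puts format('baseline: %.3fs, all threads passing: %.3fs (%.1fx)',
            baseline, everyone_passes, everyone_passes / baseline)
```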

Queue#pop - what it is and how it works

Queue is a well-known class, and #pop is one of its most important methods.

Here is what Ruby docs say about the Queue class and the #pop method:

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

#pop: If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.
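The behaviours described in the docs can be checked with a short sketch that relies only on the standard Queue API (pop with non_block, num_waiting and close):

```ruby
queue = Queue.new

# With non_block = true, popping an empty queue raises instead of suspending
non_block_error = nil
begin
  queue.pop(true)
rescue ThreadError => e
  non_block_error = e
end

# A plain #pop suspends the consumer thread until a producer pushes data
consumer = Thread.new { queue.pop }
sleep(0.001) until queue.num_waiting == 1
queue << :job
popped = consumer.value

# Once the queue is closed and drained, #pop returns nil instead of blocking
queue.close
after_close = queue.pop

puts non_block_error.class
puts popped.inspect
puts after_close.inspect
```

The last behaviour is what lets a worker loop such as `while number = numbers.pop` end cleanly once the queue is closed.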

When asked about queues, most programmers think about workers consuming jobs from a queue:

numbers = Queue.new

threads = 10.times.map do |i|
  Thread.new do
    while number = numbers.pop
      result = Time.now.to_f / number

      # a bit of randomness
      sleep(rand / 1_000)

      puts "Thread #{i},#{result}"
    end
  end
end

10_000.times { numbers << rand }

# see what I did here? ;)
Thread.pass until numbers.empty?

numbers.close

threads.each(&:join)

What is worth keeping in mind about Queue#pop is that it blocks the execution of a given thread until there is something to do. This means that a blocked thread becomes almost "invisible" from a performance perspective. Here's the code used to run computations with 0, 4, 9, and 99 blocked threads (with THREADS set accordingly):

queue = Queue.new

THREADS = 4

THREADS.times do
  Thread.new { queue.pop }
end

# Wait until all the threads are initialized
Thread.pass until queue.num_waiting == THREADS

start = Time.now.to_f

10_000_000.times do
  start / rand
end

puts Time.now.to_f - start

As you can see, inactive threads do not have a big impact on the overall performance of this code. Even with 99 extra threads, the end result is not far from the baseline.
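One way to see why blocked threads are so cheap is to inspect their status. In this sketch (the thread count of 9 is arbitrary), every thread parked in Queue#pop reports "sleep", meaning it is suspended rather than competing for execution, and closing the queue releases all of them at once.

```ruby
queue = Queue.new
threads = Array.new(9) { Thread.new { queue.pop } }

# Wait until every thread is actually suspended inside #pop
sleep(0.001) until queue.num_waiting == threads.size
statuses = threads.map(&:status)

# Closing the queue makes every pending #pop return nil, so the threads end
queue.close
threads.each(&:join)

# Thread#status returns false for threads that finished normally
finished = threads.map(&:status)

puts statuses.inspect
puts finished.inspect
```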

Reducing method calls in a multi-threaded environment

Now that you know what Thread#pass and Queue#pop do, let's put them to work in a real use case. To do that, we will look into the Karafka framework.

Karafka is a framework I built to simplify the development of Apache Kafka-based Ruby applications. Version 2.0 supports work distribution across multiple threads. From a data processing perspective, the way it works is quite simple:

1. Take some data from Kafka
2. Divide it into processing units (jobs)
3. Put all the jobs into a queue
4. Wait for all the workers to pick the jobs and finish all the work
5. Repeat endlessly

Assuming an endless stream of data is available, this can be modelled roughly as follows:

queue = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
    end
  end
end

def wait_for_jobs_to_finish(queue)
  Thread.pass while queue.num_waiting < THREADS || !queue.empty?
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(queue)
end

This is how I initially implemented the listener loop together with job distribution.

When I benchmarked the number of times Thread#pass was executed in a pass-through benchmark (where we measure maximum throughput), things looked solid.

Despite the increased number of iterations, we did not wait more often per iteration. This means that our jobs were short enough to finish before Ruby returned to the wait loop.

Things become much more interesting if we assume that our jobs take longer than the time Ruby gives them before thread execution is interrupted. Then things start to look different:

# Same code as before but the job has a bit of sleep simulating IO
task = ->(data) { sleep(rand(9..11) / 10000.0) }

Assuming we burn around 1ms per job, the number of passes skyrockets:

That's over 1000 times more invocations of the same method!

In a case where we run heavy queries of around 100ms (+/- 10%) per job, we end up with the following results per iteration:

This means that Ruby had to run Thread#pass over 180,000 times on average for nothing!
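Counts like these can be gathered by instrumenting the busy-wait from the model above with a counter. This is a sketch under assumptions: wait_counting_passes is a hypothetical helper, and the 50 iterations and roughly 1ms sleep-based jobs mirror the sleep variant rather than any real Karafka workload.

```ruby
THREADS = 10
queue = Queue.new

THREADS.times do
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
    end
  end
end

# The same busy-wait as in the model above, instrumented with a counter
def wait_counting_passes(queue)
  passes = 0

  while queue.num_waiting < THREADS || !queue.empty?
    Thread.pass
    passes += 1
  end

  passes
end

# Let every worker reach its first #pop before measuring
Thread.pass until queue.num_waiting == THREADS

# Roughly 1ms of simulated IO per job, as in the sleep-based variant
task = ->(_data) { sleep(rand(9..11) / 10_000.0) }

passes_per_iteration = Array.new(50) do
  Array.new(10) { rand }.each { |number| queue << [number, task] }
  wait_counting_passes(queue)
end

puts passes_per_iteration.sum / passes_per_iteration.size.to_f
```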

When optimizing any code, it is good to establish its primary use case. In the case of Karafka, raw throughput is important, but what matters more is that complex jobs can benefit from the GVL being released during IO, which allows work to run in parallel.

So, is there a better way to make Ruby wait patiently for all the jobs to be done? There is: Queue#pop. Since it is thread-safe, we can use it to notify the main thread that a given job has finished. It won't eliminate useless runs, but it will reduce them so much that they become insignificant. Since we know how many jobs we've enqueued, we know how many times we need to #pop:

queue = Queue.new
lock = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
      lock << true
    end
  end
end

def wait_for_jobs_to_finish(dispatched, lock)
  dispatched.times { lock.pop }
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  # Generate the batch once so the wait uses the size of what was enqueued
  batch = data
  batch.each { queue << [_1, task] }

  wait_for_jobs_to_finish(batch.size, lock)
end

The lock.pop call stops the execution of the main thread until a job is done, and we call it once per dispatched job. This means that the number of stops grows with the number of jobs (and, in this setup, threads). However, this correlation is linear, and the end result is orders of magnitude smaller than when using Thread#pass.
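The lock queue can also be wrapped in a small helper so that the counting is explicit. CompletionTracker is a hypothetical class for illustration, not a Karafka API; the sketch assumes 10 workers and 100 batches of 10 jobs, so exactly one #pop per job is expected.

```ruby
# Hypothetical helper (not a Karafka API) wrapping the "lock" queue pattern
class CompletionTracker
  attr_reader :pops

  def initialize
    @done = Queue.new
    @pops = 0
  end

  # Called by a worker thread after it finishes a job
  def job_finished
    @done << true
  end

  # Blocks the calling thread until +count+ jobs have reported back.
  # Each #pop suspends the caller while no completion is available,
  # instead of spinning like a Thread.pass loop.
  def wait_for(count)
    count.times do
      @done.pop
      @pops += 1
    end
  end
end

THREADS = 10
jobs = Queue.new
tracker = CompletionTracker.new
results = Queue.new

workers = Array.new(THREADS) do
  Thread.new do
    # A closed and drained queue returns nil, which ends the loop
    while (job = jobs.pop)
      data, task = job
      results << task.call(data)
      tracker.job_finished
    end
  end
end

task = ->(data) { data * 2 }

100.times do
  batch = Array.new(10) { rand }
  batch.each { |number| jobs << [number, task] }
  tracker.wait_for(batch.size)
end

jobs.close
workers.each(&:join)

puts tracker.pops
```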

Here's the same benchmark showing the number of Queue#pop calls that replaced Thread#pass in the non-sleep case:

The number of Queue#pop invocations equals the number of dispatched jobs, which in this benchmark is the same as the number of threads. It is independent of job types or any other circumstances, so the longer the jobs are, the bigger the improvement:

This change not only reduced the number of calls by over 99.994% but also drastically lowered CPU utilization, which is especially visible in cases with extensive IO (here simulated with sleep):

Summary

So, is one better than the other? No. They should be used in different cases and to achieve different goals.

Thread#pass should not be used to defer work but rather to hint to Ruby that there may be more important things it could focus on.

Queue#pop, on the other hand, can act not only as a component of a queue but also as part of a multi-threaded application's flow control.

Concurrency is not easy. Thread management and selection of proper methods are as crucial as understanding your primary use-cases and building correct benchmarks. Sometimes minor tweaks can provide tremendous benefits.


Note: this post would not be possible without extensive help from Samuel Williams. Thank you!


Cover photo by Chris-Håvard Berge on Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0). Image has been cropped.

The hidden cost of a Ruby threads leakage

Bug hunting

Recently, I’ve been working on a small application that gradually became slower and slower. While there were many reasons for this, I found one of them interesting.

To give you a bit of context: the application was a simple, single-topic legacy Kafka consumer. I rewrote it using Karafka, and all of its logic looks like this:

class EventsConsumer < Karafka::BaseConsumer
  def initialize(...)
    super
    @processor = Processor.new
  end

  def consume
    @processor.call(params_batch.payloads)
  end
end

And the processor looks like so (I removed all the irrelevant code):

class Processor
  def initialize
    @queue = Queue.new
    @worker = Thread.new { work }
  end

  def call(events)
    # do something with events
    results = complex_inline_computation(events)
    @queue << results
  end

  private

  def work
    while true
      result = @queue.pop
      # some sort of async storing operation should go here
      p result
    end
  end

  def complex_inline_computation(events)
    events.join('-')
  end
end

So, we have a Karafka consumer with a processor that has one background thread meant to flush the data asynchronously. Nothing special, and, putting potential thread crashes aside, all looks good.

However, there is a hidden problem in this code that can reveal itself by slowly degrading this consumer's performance over time.

Karafka uses a single persistent consumer instance per topic partition. When we start processing a given partition of a given topic for the first time, a new consumer instance is created. This alone means that the number of threads we end up with is directly defined by the number of topics and partitions a single Karafka process consumes.

If that were all, I would say it's not that bad. A process consuming a single topic with 20 partitions does end up with 20 additional threads, but upon reaching this number, the degradation should stop.

It did not.

There is one more case where our legacy consumer and Karafka would spin up additional threads due to processor re-creation: a Kafka rebalance. When rebalancing happens, new consumer instances are initialized. That means that each time scaling occurred, whether it added or removed instances, a new processor thread would be created for each new consumer instance.
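The leak can be reproduced without Kafka at all. In this sketch, LeakyProcessor is a hypothetical stand-in for the Processor above, and the three rounds of 20 instances are an assumed stand-in for three partition assignments (for example, the initial one plus two rebalances). Dropping the references does not help, because a live thread is never garbage collected and it keeps its processor reachable.

```ruby
# Hypothetical stand-in for the Processor above: every instance starts
# its own background thread that blocks on a queue forever
class LeakyProcessor
  def initialize
    @queue = Queue.new
    @worker = Thread.new { loop { @queue.pop } }
  end
end

threads_before = Thread.list.size

# Three assignments of a 20-partition topic: each one builds fresh
# consumers (and therefore fresh processors), and the old ones are dropped
3.times do
  Array.new(20) { LeakyProcessor.new }
end

# Live threads stay reachable through Thread.list, so GC cannot reclaim them
GC.start
leaked = Thread.list.size - threads_before

puts leaked
```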

Fixing the issue

Fixing this issue without a redesign is rather simple. As long as we can live with a singleton and we know that our code won't be executed by several threads in parallel, we can just turn the processor into a singleton:

# Singleton is part of the standard library and has to be required
require 'singleton'

class Processor
  include Singleton

  # Remaining code
end

class EventsConsumer < Karafka::BaseConsumer
  def initialize(...)
    super
    @processor = Processor.instance
  end

  # Remaining code
end

While it is not the optimal solution, in my case it was sufficient.
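A quick way to confirm the fix is to count threads again. In this sketch, SharedProcessor and FakeConsumer are hypothetical stand-ins (so it runs without Karafka): 60 consumers are created, but only the first SharedProcessor.instance call runs initialize and starts a background thread.

```ruby
require 'singleton'

# Hypothetical minimal Processor turned into a singleton, as in the fix above
class SharedProcessor
  include Singleton

  def initialize
    @queue = Queue.new
    @worker = Thread.new { loop { @queue.pop } }
  end
end

# Hypothetical stand-in for EventsConsumer
class FakeConsumer
  attr_reader :processor

  def initialize
    @processor = SharedProcessor.instance
  end
end

threads_before = Thread.list.size

# Three assignments of 20 partitions build 60 consumers...
consumers = Array.new(60) { FakeConsumer.new }

# ...but only one background thread is ever started
created = Thread.list.size - threads_before

puts created
```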

Performance impact

One question remains: what was the performance impact of having stale threads that were doing nothing?

I'll try to answer that with a more straightforward case than mine:

require 'benchmark'

MAX_THREADS = 100
STEP = 10
ITERS = 50_000_000

(0..MAX_THREADS).step(STEP).each do |el|
  STEP.times do
    Thread.new do
      q = Queue.new
      q.pop
    end
  end unless el.zero?

  # Give it a bit of time to initialize the threads
  sleep 5

  # warmup for jruby - thanks Charles!
  5.times do
    ITERS.times do ; a = "1"; end
  end

  Benchmark.bm do |x|
    x.report { ITERS.times do ; a = "1"; end }
  end
end

I've run this code 100 times and used the average time to minimize the impact of other tasks running on this machine.

Here are the results for Ruby 2.7.2, 3.0.0-preview2 (with and without JIT), and JRuby 9.2.13, all run with time taskset -c 1 to make sure that JRuby runs under the same conditions (a single core):

CRuby performance degradation is more or less linear: the more threads you have that do nothing, the slower the overall processing becomes. This does not affect JRuby, as JVM thread support is entirely different from CRuby's.

What worries me more, though, is that Ruby 3.0 seems to degrade more than 2.7.2. My wild guess is that this is due to the overhead of the Ractor code and other changes that impact the thread scheduler.

Below you can find the time comparison for all the variants of CRuby:

It is fascinating that 3.0 is slower than 2.7.2 in this case, and I will try to look into the reasons behind it in the upcoming months.

Note: I do not believe it's the best use case for JIT, so please do not make strong claims about its performance based on the charts above.

Summary

The more complex the applications you build, the bigger the chance that you will need threads at some point. If that happens, please be mindful of their impact on your applications' overall performance.

Also, keep in mind that the moment you introduce background threads is the moment you should introduce proper instrumentation around them.
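As a starting point, instrumentation can be as simple as naming every background thread and periodically reporting how many live threads exist per name, which would have made the leak described above visible early. Both spawn_background and background_thread_counts are hypothetical helpers sketched here with core Thread APIs only; how the counts are exported is left open.

```ruby
# Hypothetical helper: start a named background thread whose crashes
# are reported on stderr instead of passing silently
def spawn_background(name, &block)
  thread = Thread.new(&block)
  thread.name = name
  thread.report_on_exception = true
  thread
end

# Count live threads per name, e.g. to log or export as a metric periodically
def background_thread_counts
  Thread.list
        .reject { |thread| thread.equal?(Thread.main) }
        .group_by { |thread| thread.name || 'unnamed' }
        .transform_values(&:size)
end

queue = Queue.new
workers = Array.new(3) { spawn_background('processor') { queue.pop } }
flusher = spawn_background('flusher') { queue.pop }

counts = background_thread_counts
puts counts.inspect

queue.close
(workers + [flusher]).each(&:join)
```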


Cover photo by Chad Cooper on Attribution 2.0 Generic (CC BY 2.0) license.

Copyright © 2022 Closer to Code
