Closer to Code

The hidden cost of Ruby thread leakage

Bug hunting

Recently, I’ve been working on a small application that gradually became slower and slower. There were several reasons for this, but I found one of them particularly interesting.

To give you a bit of context: the application was a simple, single-topic legacy Kafka consumer. I rewrote it to use Karafka, and all of the logic looks like this:

class EventsConsumer < Karafka::BaseConsumer
  def initialize(...)
    super
    @processor = Processor.new
  end

  def consume
    @processor.call(params_batch.payloads)
  end
end

And the processor looks like so (I removed all the irrelevant code):

class Processor
  def initialize
    @queue = Queue.new
    @worker = Thread.new { work }
  end

  def call(events)
    # do something with events
    results = complex_inline_computation(events)
    @queue << results
  end

  private

  def work
    loop do
      result = @queue.pop
      # some sort of async storing operation should go here
      p result
    end
  end

  def complex_inline_computation(events)
    events.join('-')
  end
end

So, we have a Karafka consumer with a processor that owns one background thread, which is supposed to flush the data asynchronously. Nothing special, and putting aside potential thread crashes, it all looks good.

However, there is a hidden problem in this code that reveals itself by slowly degrading the consumer’s performance over time.

Karafka uses a single persistent consumer instance per topic partition. When we start processing a given partition of a given topic for the first time, a new consumer instance is created. This by itself means that the number of threads we end up with is directly defined by the number of topics and their partitions we’re processing with a single Karafka process.

If that were all, I would say it’s not that bad. For a process consuming a single topic with 20 partitions, we do end up with 20 additional threads, but once we reach that number, the degradation should stop.

It did not.

There is one more case where both our legacy consumer and Karafka would spin up additional threads because of processor re-creation: a Kafka rebalance. When a rebalance happens, new consumer instances are initialized. That means that each time scaling occurred, whether it added or removed instances, a new processor (and with it a new thread) would be created, while the threads of the previous processors were never stopped.
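
The effect can be reproduced without Kafka at all. Below is a minimal sketch, assuming a hypothetical LeakyProcessor class that stands in for the Processor above. Each "rebalance" is simulated by simply building a new instance and dropping it:

```ruby
# Hypothetical stand-in for the Processor above: one background thread per instance.
class LeakyProcessor
  def initialize
    @queue = Queue.new
    # The thread blocks on the queue forever; nothing ever stops it.
    @worker = Thread.new { loop { @queue.pop } }
  end
end

threads_before = Thread.list.size

# Simulate five rebalances, each one building a fresh processor.
# The instances are dropped right away, but their threads stay alive.
5.times { LeakyProcessor.new }

# Give the new threads a moment to start and block on their queues.
sleep 0.1

leaked = Thread.list.size - threads_before
puts "Threads leaked: #{leaked}"
```

Dropping the reference to a processor does not stop its thread, so the thread count only ever grows.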

Fixing the issue

Fixing this issue without a re-design is rather simple. As long as we can live with a singleton and we know that our code won’t be executed by several threads in parallel, we can just make the processor into a singleton:

require 'singleton'

class Processor
  include Singleton

  # Remaining code
end

class EventsConsumer < Karafka::BaseConsumer
  def initialize(...)
    super
    @processor = Processor.instance
  end

  # Remaining code
end

While it is not the optimal solution, in my case it was sufficient.
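
To illustrate why this helps, here is a small sketch, assuming a hypothetical SharedProcessor class that mirrors the singleton version above. No matter how many times the instance is requested, only one background thread is ever created:

```ruby
require 'singleton'

# Hypothetical stand-in for the singleton Processor above.
class SharedProcessor
  include Singleton

  def initialize
    @queue = Queue.new
    @worker = Thread.new { loop { @queue.pop } }
  end
end

threads_before = Thread.list.size

# Simulate five consumer re-creations; each one asks for the processor.
processors = Array.new(5) { SharedProcessor.instance }

added = Thread.list.size - threads_before
puts "Threads added: #{added}"
puts "Same instance every time: #{processors.uniq.size == 1}"
```

The trade-off is that all consumers now share one queue and one worker, which is why this only fits when that sharing is acceptable.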

Performance impact

One question remains: what was the performance impact of having stale threads that were doing nothing?

I’ll try to answer that with a more straightforward case than mine:

require 'benchmark'

MAX_THREADS = 100
STEP = 10
ITERS = 50_000_000

(0..MAX_THREADS).step(STEP).each do |el|
  STEP.times do
    Thread.new do
      q = Queue.new
      q.pop
    end
  end unless el.zero?

  # Give it a bit of time to initialize the threads
  sleep 5

  # warmup for jruby - thanks Charles!
  5.times do
    ITERS.times do ; a = "1"; end
  end

  Benchmark.bm do |x|
    x.report { ITERS.times do ; a = "1"; end }
  end
end

I’ve run this code 100 times and used the average time to minimize the random impact of other tasks running on the same machine.

Here are the results for Ruby 2.7.2, 3.0.0-preview2 (with and without JIT) and JRuby 9.2.13, all run with time taskset -c 1, which pins the process to a single core so that JRuby runs under the same conditions as CRuby:

CRuby performance degradation is more or less linear. The more idle threads you have, the slower the overall processing gets. This does not affect JRuby, as JVM thread support is entirely different from CRuby’s.

What worries me more, though, is that Ruby 3.0 seems to degrade more than 2.7.2. My wild guess is that this is caused by the overhead of the Ractor code and other changes that affect the thread scheduler.

Below you can find the time comparison for all the variants of CRuby:

It is fascinating that 3.0 is slower than 2.7.2 in this case, and I will try to look into the reasons behind it in the upcoming months.

Note: I do not believe this is the best use case for JIT, so please do not make strong claims about its performance based on the charts above.

Summary

The more complex the applications you build, the higher the chance that you will need threads at some point. If that happens, please be mindful of their impact on your application’s overall performance.

Also, keep in mind that the moment you introduce background threads is the moment you should introduce proper instrumentation around them.
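
As a starting point, even a very small piece of instrumentation would have caught the leak described above. Here is a minimal sketch, assuming a hypothetical thread_stats helper whose result you would report to your metrics or logging system of choice:

```ruby
# Hypothetical helper: counts live threads grouped by their status
# ("run", "sleep", ...). A steadily growing "sleep" count is a hint of a leak.
def thread_stats
  Thread.list.group_by(&:status).transform_values(&:size)
end

# Three idle threads, each blocked on its own empty queue.
workers = Array.new(3) { Thread.new { Queue.new.pop } }

# Wait until all of them are actually blocked.
sleep 0.01 until workers.all? { |thread| thread.status == 'sleep' }

stats = thread_stats
puts stats.inspect
```

Publishing these numbers periodically makes a growing thread count visible long before it shows up as a slowdown.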


Cover photo by Chad Cooper, used under the Attribution 2.0 Generic (CC BY 2.0) license.

Karafka framework 1.4.0 Release Notes (Ruby + Kafka)

This release mostly solves problems related to message deserialization and normalizes some of the naming conventions to ease the upgrade to the upcoming 2.0 version.

Note: This release is the last release with ruby-kafka under the hood. We’ve already started the process of moving to rdkafka-ruby.

Note: If you are using Sidekiq-Backend plugin, please make sure that you’ve processed all the jobs from your Sidekiq queue before upgrading Karafka gems.

Changes (features, incompatibilities, etc)

consumer#metadata is now consumer#batch_metadata

This change is trivial: if you use batch consuming mode and you use the Consumer#metadata method, replace it with Consumer#batch_metadata.

# Karafka 1.3
class UsersConsumer < ApplicationConsumer
  def consume
    puts metadata
  end
end

# Karafka 1.4
class UsersConsumer < ApplicationConsumer
  def consume
    puts batch_metadata
  end
end

Message metadata available under #metadata method

Up to version 1.3, all the message metadata was available directly in the root scope of the params object, both through direct method calls and through the #[] accessor.

While it felt like “The Rails way”, it had several side effects. The biggest were the need for a hash-like API, issues with accessing metadata without payload deserialization, and the lack of a clear separation between the payload and the metadata.

From now on, you can use the params.metadata object to fetch all the metadata.

Note: we’ve kept direct method access to metadata values on the params object for backwards compatibility. The #[] accessor on params is no longer available.

# 1.3
params['partition'] #=> 0
params.partition #=> 0

# 1.4
params['partition'] #=> NoMethodError (undefined method '[]')

# This will work due to backward compatibility
params.partition #=> 0

# This is the recommended way of accessing metadata
params.metadata.partition #=> 0

# This will also work as metadata is a struct now
params.metadata[:partition] #=> 0
params.metadata['partition'] #=> 0

Message metadata access allowed without message deserialization

Accessing metadata no longer triggers payload deserialization. The payload is deserialized only when the #payload method is called.
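
The idea can be sketched as follows. This is a simplified illustration and not Karafka’s actual implementation: the LazyMessage class and its deserialized? method are hypothetical names used only for this example.

```ruby
require 'json'

# Hypothetical, simplified message wrapper; not Karafka's actual Params class.
class LazyMessage
  attr_reader :metadata, :raw_payload

  def initialize(raw_payload, metadata)
    @raw_payload = raw_payload
    @metadata = metadata
    @deserialized = false
  end

  # Parses the raw payload on first access and caches the result.
  def payload
    return @payload if @deserialized

    @payload = JSON.parse(@raw_payload)
    @deserialized = true
    @payload
  end

  def deserialized?
    @deserialized
  end
end

message = LazyMessage.new('{"id": 1}', { partition: 0 })

partition = message.metadata[:partition]        # no parsing happens here
parsed_before_payload = message.deserialized?   # payload not touched yet
payload = message.payload                       # parsing happens now
```

This matters when you filter or route messages based on metadata only, since the parsing cost is then paid just for the messages you actually read.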

null message support in the default JSON deserializer

When the Kafka message payload is null / nil, deserialization won’t fail. Support for it was added because some Karafka users rely on log compaction with nil payloads. In a case like that, #payload will return nil.
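
If you maintain a custom deserializer, you may want to handle this case in a similar way. Below is a minimal sketch, assuming a hypothetical NilSafeJsonDeserializer class that receives the raw payload string directly (the real Karafka deserializer interface may differ):

```ruby
require 'json'

# Hypothetical deserializer; the class name and the #call signature
# are assumptions made for this example.
class NilSafeJsonDeserializer
  # Returns nil for a nil payload (for example a tombstone message),
  # otherwise the parsed JSON.
  def call(raw_payload)
    return nil if raw_payload.nil?

    JSON.parse(raw_payload)
  end
end

deserializer = NilSafeJsonDeserializer.new
tombstone = deserializer.call(nil)
regular = deserializer.call('{"id": 1}')
```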

Karafka::Params::Params no longer inherits from a Hash

Karafka::Params::Params is now just a struct. This change normalizes the setup, limits the corner cases, and reduces the interface to only the methods that are really needed.

Documentation

Our Wiki has been updated to reflect the 1.4 changes. Please notify us if you find any incompatibilities.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/example-app ./example_app

Then, install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.
