Under the Hood: Enhancing Karafka’s CPU and Memory Efficiency

Introduction

Now and then, I like to go on a performance improvement hunt, because life isn't just about adding new features. Recently, I have been focusing on enhancing efficiency, particularly regarding CPU and memory usage in Karafka. Three of my recent pull requests (PR 117, PR 118, PR 123) have made some minor improvements, and this article is about them. These changes reduce memory allocations and improve time tracking in Karafka and WaterDrop.

Most of the time, such improvements are not significant, but when applied in crucial places, they can make things a bit faster.

When doing OSS, I think of myself as a middleman. Karafka runs in tens of thousands of processes, and improvements affecting the consumption or production of messages (especially when applied per message) can make a real difference when multiplied.

Shaving Memory Allocations

PR 117 targets memory savings by optimizing instrumentation data handling. The primary change involves reducing unnecessary array allocations during instrumentation. Previously, every instrumentation event would allocate a new array, leading to excessive memory usage. The updated code minimizes these allocations by inlining the tracking code.

Here's a simple instrumentation layer one could build:

# This is a simplified bus without too many features
#
# It supports simple instrumentation piping details to listeners and returning the results
class Notifications
  def initialize(*listeners)
    @listeners = listeners
  end

  def instrument(name, &block)
    result, time = measure_time_taken(&block)

    details = { name: name, result: result, time: time }

    @listeners.each do |listener|
      listener.call(details)
    end

    result
  end

  private

  # Monotonic clock time in milliseconds
  def monotonic_now
    ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000
  end

  def measure_time_taken
    start = monotonic_now
    result = yield
    [result, monotonic_now - start]
  end
end

It's a fairly simple instrumentation bus that lets you register listeners and wrap your code for tracking:

bus = Notifications.new(
  ->(details) { puts "Event: #{details[:name]} occurred" },
  ->(details) { puts("  and it took #{details[:time]}ms to run") }
)

bus.instrument('my.random.sleep') do
  sleep_time = rand
  sleep(sleep_time)
  sleep_time
end

# Event: my.random.sleep occurred
#   and it took 799.0296444892883ms to run
# => 0.7981784182914173

It works and is actually quite fast:

require 'benchmark'

bus = Notifications.new
EVENT = 'test'
puts Benchmark.measure { 1_000_000.times { bus.instrument(EVENT) {} } }

# 0.538294   0.000000   0.538294 (  0.539663)

However, one thing is fundamentally wrong with this code, and that is the time tracking. It may not be visible at first, but when you start counting object allocations, this is what you end up with:

GC.disable

def count_objects_created
  # Take a snapshot of the current object counts
  initial_counts = ObjectSpace.each_object.count

  yield if block_given?

  # Take another snapshot after running the block
  final_counts = ObjectSpace.each_object.count

  # Calculate the difference
  new_objects_created = final_counts - initial_counts

  puts "Number of objects created: #{new_objects_created}"
end

bus = Notifications.new
EVENT = 'test'

count_objects_created do
  1_000_000.times { bus.instrument(EVENT){} }
end

# Number of objects created: 2000002

There are twice as many objects as instrumentation calls, and all of this is because of the return value of the time measurement:

def measure_time_taken
  start = monotonic_now
  result = yield
  [result, monotonic_now - start]
end

It returns an array with the result and the time measurement. One extra array per call may not be a lot, but nonetheless, let's inline this instead of delegating to a method:

def instrument(name)
  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  @listeners.each do |listener|
    listener.call(details)
  end

  result
end

With this change, we no longer allocate the arrays:

bus = Notifications.new
EVENT = 'test'

count_objects_created do
  1_000_000.times { bus.instrument(EVENT){} }
end

# Number of objects created: 1000002

*Number of objects allocated per one million instrumentation calls.
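
If you prefer not to disable GC for such measurements, GC.stat offers a cross-check: its :total_allocated_objects key counts every allocation the VM has made, including short-lived ones. A minimal sketch:

before = GC.stat(:total_allocated_objects)
1_000_000.times { bus.instrument(EVENT) {} }
puts "Objects allocated: #{GC.stat(:total_allocated_objects) - before}"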

You may question whether this is relevant and whether it provides significant benefits. I would say that it depends. Karafka is heavily instrumented, and under heavy load, this simple change saves 20-30 thousand allocations per second of execution.

If a Tree Falls: Fast Paths for Unmonitored Events

As I mentioned, Karafka and WaterDrop are heavily instrumented. Since different people use different events for different use cases (logging, AppSignal instrumentation, or the Web UI), there is no silver bullet regarding what to instrument and what not to. This means that Karafka emits many events during its execution, and the same goes for WaterDrop. During my optimization session, I wondered whether there is even a point in measuring and publishing instrumentation results when no one listens. That is what PR 123 is about: if no one is listening, there is no point in making any sound.

Below you can find a simplified previous version of the instrument method. Similar code can be found, for example, in dry-monitor and dry-events:

# Before
def instrument(name)
  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  @listeners[name].each do |listener|
    listener.call(details)
  end

  result
end

It's nice and fairly fast. But when publishing many events, it can be optimized, as long as we have a way to check whether there are any listeners:

# After
def instrument(name)
  listeners = @listeners[name]

  # Measure nothing since no one will use it
  return yield unless listeners

  start = monotonic_now
  result = yield
  time = monotonic_now - start

  details = { name: name, result: result, time: time }

  listeners.each do |listener|
    listener.call(details)
  end

  result
end
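
For the fast path to work, the registry has to return nil when no one has subscribed to a given event. A minimal sketch of such a registry, assuming a plain Hash (the subscribe method and its shape are illustrative, not the actual Karafka internals):

class Notifications
  def initialize
    # Plain Hash: @listeners[name] is nil for events nobody subscribed to
    @listeners = {}
  end

  def subscribe(name, &block)
    (@listeners[name] ||= []) << block
  end
end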

For the fast-path instrumentation cases, the changed code is about 2.4x faster:

# Based on the PR change measurements

1.830753   0.005683   1.836436 (  1.843820)
# vs.
0.759046   0.000000   0.759046 (  0.759051)

*Time difference when executing instrumentation in the fast-path scenario 1 million times.

This also saves memory allocations: when no one is going to use the results, the event details hash is not built at all.
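
We can confirm that with the same allocation counter as before; a sketch, assuming the Hash-based registry above and an event with no subscribers:

bus = Notifications.new

count_objects_created do
  # No listener subscribed to 'test', so instrument takes the fast path:
  # no details hash is built and no time is measured
  1_000_000.times { bus.instrument('test') {} }
end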

Timing Time

The last optimization in this batch was about time measurements. Karafka uses two types of time tracking:

  • A monotonic clock is used to measure elapsed time in a way that will not break because of time zone changes, leap second adjustments, or anything else. A monotonic clock is a clock source that provides a monotonically increasing time value, which means it will never go backward.

  • Regular UTC-based time: A normal clock is used to take snapshots when something has happened.

def monotonic_now
  # clock_gettime returns seconds as a float; we want milliseconds
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) * 1_000
end

def float_now
  ::Time.now.utc.to_f
end

The above methods are fairly simple, but they leave room for two improvements.

The first is the elimination of the millisecond multiplication. Ruby supports the :float_millisecond unit, which means we do not have to deal with the multiplication explicitly:

def monotonic_now
  ::Process.clock_gettime(::Process::CLOCK_MONOTONIC, :float_millisecond)
end

The difference is not significant (if any), but this at least simplifies the code and makes it more explicit.
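
For reference, clock_gettime accepts a number of unit arguments, so you can pick the resolution you need without manual conversions:

# A few of the units supported by Process.clock_gettime
Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_second)      # Float seconds (the default)
Process.clock_gettime(Process::CLOCK_MONOTONIC, :float_millisecond) # Float milliseconds
Process.clock_gettime(Process::CLOCK_MONOTONIC, :nanosecond)        # Integer nanoseconds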

While this first change did not yield performance gains, the second one is much more interesting:

def float_now
  ::Time.now.utc.to_f
end

The above code allocates intermediate objects along the way and only then casts the time into a float. This may not seem expensive at first, but keep in mind that this code may run thousands of times per second in Karafka. Since our goal is to get a float time, it can be replaced with a direct system clock invocation:

def float_now
  ::Process.clock_gettime(Process::CLOCK_REALTIME)
end

The difference in terms of performance is big:

6.405367   0.546916   6.952283 (  6.952844)
# vs.
1.887427   0.003451   1.890878 (  1.897118)

*Time difference when fetching the real time in both ways.

Using the process clock is over 3.5x faster!
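
If you want to reproduce the comparison, a benchmark sketch along these lines is enough (the iteration count is illustrative):

require 'benchmark'

puts Benchmark.measure { 10_000_000.times { Time.now.utc.to_f } }
puts Benchmark.measure { 10_000_000.times { Process.clock_gettime(Process::CLOCK_REALTIME) } }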

Conclusion

The recent optimizations in Karafka, shown in the PRs mentioned, reflect my commitment to pushing this project forward. As I refine Karafka, I aim to deliver top-notch data-streaming tools for the Ruby community. Every optimization brings Karafka Core closer to achieving the best possible performance and efficiency.

The librdkafka Supply Chain Breakdown: rdkafka-ruby’s Darkest Hour

Opening Note

We all make mistakes, and fundamentally, the havoc caused by this incident was due to a flaw in the design of rdkafka-ruby. While the disappearance of librdkafka from GitHub was unexpected, this article aims to clarify and explain how rdkafka-ruby should have prevented it and what was poorly designed. By examining this incident, I hope to provide insights into better practices for managing dependencies and ensuring more resilient software builds for the Ruby ecosystem.

Incident Summary

On July 10, 2024, at 15:47 UTC, users of the rdkafka gem faced issues when the librdkafka repository on GitHub unexpectedly went private. This break in the supply chain disrupted installations, causing widespread frustration and, in many cases, completely blocking the ability to deploy rdkafka-based software.

Fetching rdkafka 0.16.0
Installing rdkafka 0.16.0 with native extensions
Gem::Ext::BuildError: ERROR: Failed to build gem native extension.

    current directory: /rdkafka-0.16.0/ext
/usr/local/bin/ruby -rrubygems
/rake-13.2.1/exe/rake
RUBYARCHDIR=/home/circleci/.rubygems/extensions/x86_64-linux/3.3.0/rdkafka-0.16.0
RUBYLIBDIR=/home/circleci/.rubygems/extensions/x86_64-linux/3.3.0/rdkafka-0.16.0
2 retrie(s) left for v2.4.0 (404 Not Found)
1 retrie(s) left for v2.4.0 (404 Not Found)
0 retrie(s) left for v2.4.0 (404 Not Found)
404 Not Found
rake aborted!
Errno::ENOENT: No such file or directory @ rb_sysopen - ports/archives/v2.4.0
(Errno::ENOENT)
/mini_portile2-2.8.7/lib/mini_portile2/mini_portile.rb:496:in
`verify_file'
/mini_portile2-2.8.7/lib/mini_portile2/mini_portile.rb:133:in
`block in download'
/mini_portile2-2.8.7/lib/mini_portile2/mini_portile.rb:131:in
`each'
/mini_portile2-2.8.7/lib/mini_portile2/mini_portile.rb:131:in
`download'
/mini_portile2-2.8.7/lib/mini_portile2/mini_portile.rb:232:in
`cook'
/rdkafka-0.16.0/ext/Rakefile:38:in `block
in <top (required)>'
/rake-13.2.1/exe/rake:27:in `<main>'
Tasks: TOP => default
(See full trace by running task with --trace)

Detailed Explanation

The rdkafka gem used to rely on downloading librdkafka from the Confluent GitHub repository during the installation process. As a huge proponent of immutable builds that do not depend on external resources, I had planned to change this model for a long time. Several months ago, I created a GitHub issue to address this transition. However, the change was delayed due to other priorities within the karafka ecosystem. Unfortunately, this delay resulted in the recent outage.

# Just the relevant code here

recipe.files << {
  :url => "https://codeload.github.com/edenhill/librdkafka/tar.gz/v#{Rdkafka::LIBRDKAFKA_VERSION}",
  :sha256 => Rdkafka::LIBRDKAFKA_SOURCE_SHA256
}

recipe.configure_options = ["--host=#{recipe.host}"]
recipe.cook

This setup meant that during the bundle install process, the required librdkafka source was fetched and compiled on the fly, which inherently relied on the availability of the external GitHub repository.

Upon discovery, it took me 59 minutes to release the first patched version and approximately four hours to prepare fixes and backport them to all relevant versions of the rdkafka gem, including older ones. Luckily, I was in front of my computer when the incident occurred, allowing me to quickly create and release needed fixes.

Future Steps

Going forward, all future releases will depend only on RubyGems, ensuring no reliance on external build sources like GitHub. I decided to ship the librdkafka release archives inside the gem itself, enhancing the reliability and stability of the ecosystem.

releases = File.expand_path(File.join(File.dirname(__FILE__), '../dist'))

recipe.files << {
  :url => "file://#{releases}/librdkafka_#{Rdkafka::LIBRDKAFKA_VERSION}.tar.gz",
  :sha256 => Rdkafka::LIBRDKAFKA_SOURCE_SHA256
}
recipe.configure_options = ["--host=#{recipe.host}"]
recipe.cook
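
For the vendored archive to ship with the gem at all, it also has to be part of the gem's files list. A minimal gemspec sketch (the version, summary, and globs are placeholders, not the actual rdkafka gemspec):

Gem::Specification.new do |spec|
  spec.name    = 'rdkafka'
  spec.version = '0.0.0' # placeholder
  spec.summary = 'Kafka client based on librdkafka'
  spec.authors = ['...']
  # Ship the vendored librdkafka tarball alongside the usual sources
  spec.files = Dir['lib/**/*.rb', 'ext/**/*', 'dist/*.tar.gz']
end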

Fragility of the OSS Supply Chain

This incident highlights our dependence on other OSS projects and repositories. It's essential to remember that mistakes can happen, and we must be prepared. This wasn't the first issue with GitHub downloads: in 2023, a change in GitHub's tar layout broke a lot of software, including ours, that relied on checksums for artifact verification. To be honest, if we had migrated the rdkafka build process at that time, this article would not have had to be written.

Here are my main takeaways from this incident:

  1. Design Flaws Can Amplify Issues: The incident highlighted how design flaws in dependency management can lead to significant disruptions.
  2. Dependency on External Repositories: Relying on external data sources during the build process poses risks, particularly when unexpected changes occur.
  3. Importance of Immutable Builds: Adopting immutable builds without external resources can enhance reliability and stability.
