
Ruby concurrency is hard: how I became a Ruby on Rails contributor

For the past several weeks, I've been trying to fix a cranky spec in the Karafka integration suite, which in the end led me to becoming a Ruby on Rails micro-contributor and submitting similar fixes to several other popular projects from the Ruby ecosystem. Here's my story of trying to make sense of my specs and of Ruby concurrency.

Ephemeral bug from a test-suite

Karafka is a multi-threaded, efficient Kafka processing framework for Ruby and Rails. To provide reliable multi-threaded OSS, I needed the option to run my test suite concurrently to simulate how Karafka operates. Since it was a specific use case, I created my own micro-framework.

Long story short: it runs end-to-end integration specs in separate Ruby processes. Each starts Karafka, runs all the code in various configurations, connects to Kafka, checks assertions, and at the end, shuts down.

Such an approach allowed me to ensure that the process's whole lifecycle and its components work as expected. Specs are started under supervision, so in case of any hang, a given spec process will be killed after 5 minutes.
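A minimal sketch of that kind of supervision (my illustration, not the actual framework code) could look like this: spawn each spec in its own process and kill it once it exceeds the deadline.

require 'timeout'

# Hypothetical spec path; the deadline mirrors the 5 minutes mentioned above
pid = Process.spawn('bundle', 'exec', 'ruby', 'integrations/consumption_spec.rb')

begin
  # Wait for the spec process, but not longer than 5 minutes
  Timeout.timeout(300) { Process.wait(pid) }
rescue Timeout::Error
  # The spec hanged - kill it forcefully and report the failure
  Process.kill('KILL', pid)
  Process.wait(pid)
  raise "Spec process #{pid} hanged and was forcefully killed"
end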

Karafka itself also has an internal shutdown supervisor. In case of a user shutdown request, if the shutdown takes longer than the defined expected time, Karafka will stop despite things still running. And this is what was happening with this single spec:

E, [2022-11-19T16:47:49.602718 #14843] ERROR -- : Forceful Karafka server stop
F, [2022-11-19T16:47:49.602825 #14843] FATAL -- : #<Karafka::Core::Monitoring::Event:0x0000562932d752b0 @id="error.occurred", @payload={:caller=>Karafka::Server, :error=>#<Karafka::Errors::ForcefulShutdownError: Karafka::Errors::ForcefulShutdownError>, :type=>"app.stopping.error"}>

This damn spec did not want to stop!

Many things work under the hood:

  • workers that process jobs and could hang, forcing the process to wait
  • a jobs queue that is also connected to the polling thread (to poll more data when there is no work to be done)
  • listeners that poll data from Kafka and could hang
  • consumer groups with several threads polling Kafka data that might get stuck because of some underlying error
  • other potential bugs in the coordination of work and states

One thing that certainly worked was the process supervision that would forcefully kill it after 30 seconds.

Process shutdown coordination

The graceful shutdown of such a process takes work. When you have many connections to Kafka, a poorly organized shutdown may trigger several rebalances that cause short-lived topic assignments, creating nothing but friction and potentially blocking the whole process.

To mitigate this, Karafka shuts down actively and gracefully. That is, until the absolute end, it claims ownership of the given topics and partitions, actively waiting for all the current work to be finished. This looks more or less like so:

Note: Consumer groups internally in Karafka are a bit different than Kafka consumer groups. Here we focus on internal Karafka concepts.

Pinpointing the issue

After several failed attempts and fixing a few other bugs, I added a lot of extra instrumentation to check what Karafka was hanging on. It was hanging because there were hanging listener threads!

As stated above, to close Karafka gracefully, all work from the jobs queue needs to be finished, and the listeners that poll data from Kafka need to be able to exit their polling loops. It's all coordinated using a jobs queue. The queue we're using is pretty complex, with some blocking capabilities (you can read about it here), but the interesting part of the code can be reduced to this:

@semaphores = Concurrent::Map.new { |h, k| h[k] = Queue.new }

Those queues are used as semaphores in the polling loops until all the current work is done. Since each Queue is assigned to a different subscription group within its thread and hidden behind a concurrent map, there should be no problem. Right?
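To picture the intended behavior before we get to the reproduction, here is a simplified sketch of the pattern (not the actual Karafka listener code): the listener blocks on a Queue and the worker signals it once the jobs are done.

# Simplified sketch: one Queue acting as a semaphore between two threads
semaphore = Queue.new

# Worker thread: signals once the jobs for a given subscription group are done
worker = Thread.new do
  sleep(0.1) # simulate processing a job
  semaphore << :done
end

# Listener side: blocks on the queue until signaled,
# after which it is safe to poll Kafka for more data
semaphore.pop
puts 'All work done, polling can continue'

worker.join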

Reproduction

Once I had my crazy suspicion, I decided to reduce it down to a proof of concept:

require 'set'
require 'concurrent-ruby'

100.times do
  ids = Set.new
  # Every missing key is lazily initialized with a new Queue
  semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

  # 100 threads all ask for the same key at the same time
  100.times.map do
    Thread.new do
      ids << semaphores['test'].object_id
    end
  end.each(&:join)

  # If the initialization were atomic, every thread would see the same Queue object
  raise "I expected 1 semaphore but got #{ids.size}" if ids.size != 1
end

Once executed, boom:

poc.rb:13:in `<main>': I expected 1 semaphore but got 2 (RuntimeError)

There is more than one semaphore for one listener! This caused Karafka to wait until it was forced to stop, because the worker thread would use a different semaphore than the listener thread.

But how is that even possible?

Well, Concurrent::Hash and Concurrent::Map initialization is indeed thread-safe, but not precisely in the way you would expect. The docs state that:

This version locks against the object itself for every method call, ensuring only one thread can be reading or writing at a time. This includes iteration methods like #each, which takes the lock repeatedly when reading an item.

"only one thread can be reading or writing at a time". However, we are doing both at different times. Our code:

semaphores = Concurrent::Hash.new { |h, k| h[k] = Queue.new }

is actually equivalent to:

semaphores = Concurrent::Hash.new do |h, k|
  queue = Queue.new
  h[k] = queue
end

and the block's content is not locked as a whole. One thread's queue can overwrite another's if the Ruby scheduler pauses execution in the middle of the block. Here's the flow of things happening in the form of a diagram:
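In text form, one possible interleaving looks like this:

# Thread A (listener): reads semaphores['sg1'] -> key missing, default block starts
#   queue_a = Queue.new
#   ...the Ruby scheduler switches threads before the assignment happens...
# Thread B (worker):   reads semaphores['sg1'] -> key still missing, block starts again
#   queue_b = Queue.new
#   h['sg1'] = queue_b      # the worker walks away with queue_b
# Thread A (resumed):
#   h['sg1'] = queue_a      # the listener walks away with queue_a, overwriting queue_b
#
# From now on the two threads hold different Queue objects: one signals a queue
# nobody is waiting on, while the other waits on a queue nobody will ever signal.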

Once in a while, a listener would receive a dangling queue object, effectively blocking the polling process.

Fixing the issue

This can be fixed either by replacing Concurrent::Hash with Concurrent::Map and using the #compute_if_absent method, or by introducing a lock inside the Concurrent::Hash initialization block:

# Option 1: Concurrent::Map with #compute_if_absent (the block runs at most once per key)
Concurrent::Map.new do |map, key|
  map.compute_if_absent(key) { [] }
end

# Option 2: Concurrent::Hash with an explicit lock around the default block
mutex = Mutex.new

Concurrent::Hash.new do |hash, key|
  mutex.synchronize do
    break hash[key] if hash.key?(key)

    hash[key] = []
  end
end
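As a quick sanity check, the earlier proof of concept no longer fails once the default block is gone and #compute_if_absent is used instead:

require 'set'
require 'concurrent-ruby'

100.times do
  ids = Set.new
  semaphores = Concurrent::Map.new

  100.times.map do
    Thread.new do
      # The block runs at most once per key, atomically, so every thread
      # receives exactly the same Queue instance
      ids << semaphores.compute_if_absent('test') { Queue.new }.object_id
    end
  end.each(&:join)

  raise "I expected 1 semaphore but got #{ids.size}" if ids.size != 1
end

puts 'Always exactly one semaphore per key'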

Okay, but what do Ruby on Rails and other projects have to do with all of this?

Fixing the world

I figured that if I made this mistake, maybe others did too. I decided to check my local gems to find occurrences quickly. Inside my local gem cache, I executed the following commands:

fgrep -R 'Concurrent::Hash.new {' ./
fgrep -R 'Concurrent::Hash.new do' ./
fgrep -R 'Concurrent::Map.new {' ./
fgrep -R 'Concurrent::Map.new do' ./

and confirmed that mine was not an isolated case. I wasn't alone!

Then, using Sourcegraph, I pinpointed a few projects with potential for fixes:

  • rails (activesupport and actionview)
  • i18n
  • dry-schema
  • finite_machine
  • graphql-ruby
  • rom-factory
  • apache whimsy
  • krane
  • puppet

I am not a domain expert in any of those, and understanding the severity of each was beyond my time constraints, but I decided to give it a shot.

Rails (ActiveSupport and ActionView)

Within Rails, this "pattern" was used twice: in ActiveSupport and ActionView.

In ActionView, it was used within a cache:

PREFIXED_PARTIAL_NAMES = Concurrent::Map.new do |h, k|
  h[k] = Concurrent::Map.new
end

and assuming that the cached result is stateless (the same result each time for the same key), the issue could only cause an extra computation upon the first parallel requests to this cache.
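The fix follows the same pattern as above. As a rough sketch of the safer shape (my illustration, not necessarily the exact code that landed in Rails), the nested map can be created with #compute_if_absent instead of a default block:

PREFIXED_PARTIAL_NAMES = Concurrent::Map.new

# Hypothetical accessor - #compute_if_absent guarantees a single nested map per prefix
def prefixed_partial_names_for(prefix)
  PREFIXED_PARTIAL_NAMES.compute_if_absent(prefix) { Concurrent::Map.new }
end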

In the case of ActiveSupport, none of the concurrency code was needed, so I just replaced it with a simple Hash.

Both, luckily, were not that severe, though worth fixing nonetheless.

PR: https://github.com/rails/rails/pull/46536
PR: https://github.com/rails/rails/pull/46534

Both were merged, and this is how I became a Ruby on Rails contributor :)

i18n

This case was slightly more interesting because the concurrent cache stores all the translations. In theory, this could cause leakage similar to the Karafka case, effectively losing a locale by loading it into a different Concurrent::Hash:

100.times.map do
  Thread.new do
    I18n.backend.store_translations(rand.to_s, :foo => { :bar => 'bar', :baz => 'baz' })
  end
end.each(&:join)

I18n.available_locales.count #=> 1

This could lead to hard-to-debug problems. Once in a while, your system could raise something like this:

:en is not a valid locale (I18n::InvalidLocale)

without an apparent reason, and this problem would go away after a restart.

PR: https://github.com/ruby-i18n/i18n/pull/644

dry-schema

Another cache case where the risk would revolve around double-computing.

PR: https://github.com/dry-rb/dry-schema/pull/440

rom-factory

This one is interesting! Let's reduce the code to a smaller POC first and see what will happen under heavy threading:

require 'singleton'
require 'concurrent-ruby'

class Sequences
  include Singleton

  attr_reader :registry

  def initialize
    reset
  end

  def next(key)
    registry[key] += 1
  end

  def reset
    @registry = Concurrent::Map.new { |h, k| h[k] = 0 }
    self
  end
end

seq = Sequences.instance

loop do
  100.times.map do
    Thread.new { seq.next('boom') }
  end.each(&:join)

  size = seq.registry['boom']

  raise "Wanted 100 but got #{size}" unless size == 100

  seq.reset
end

poc.rb:37:in `block in <main>': Wanted 100 but got 1 (RuntimeError)

The counter ends up with the wrong value. What is even more interesting is that making the map itself safe won't be enough:

@registry = Concurrent::Map.new { |h, k| h.compute_if_absent(k) { 0 } }

poc.rb:36:in `block in <main>': Wanted 100 but got 55 (RuntimeError)

There is one more "unsafe" method:

def next(key)
  registry[key] += 1
end

This operation is also not atomic: the read and the write are two separate steps, so it needs to be wrapped with a mutex:

def initialize
  @mutex = Mutex.new
  reset
end

def next(key)
  @mutex.synchronize do
    registry[key] += 1
  end
end

Only then is this code safe to be used.

PR: https://github.com/rom-rb/rom-factory/pull/80
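For completeness, another way to make such counters safe (an illustrative sketch on my side, not what the PR above does) is to keep atomic values in the map, so the increment itself does not need a global mutex:

require 'concurrent-ruby'

registry = Concurrent::Map.new

# Hypothetical helper: one AtomicFixnum per key, created atomically,
# and #increment is itself thread-safe
def next_value(registry, key)
  registry.compute_if_absent(key) { Concurrent::AtomicFixnum.new(0) }.increment
end

100.times.map { Thread.new { next_value(registry, 'boom') } }.each(&:join)

registry['boom'].value #=> 100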

Other repositories

Summary

In my opinion, there are a few outcomes of this story:

  • Karafka has a solid test suite!
  • If you are doing concurrency-related work, you had better test it in a multi-threaded environment, and test it well.
  • Concurrency is hard for many of us (maybe that's because we are special ;) ).
  • RTFM, and read it well :)
  • Do not be afraid to help others by submitting pull requests!

On the other hand, looking at how frequent this issue is, it may be worth opening a discussion about changing this behavior and making the initialization block fully locked.

Afterword

Concurrent::Hash under CRuby is just a Hash. You can check it out here.
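You can see it for yourself in a console (on CRuby):

require 'concurrent-ruby'

# On CRuby (MRI), Concurrent::Hash is a plain subclass of the core Hash,
# so it adds no locking of its own there
Concurrent::Hash.ancestors.include?(::Hash) #=> true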


Cover photo by James Broad, licensed under Attribution-NonCommercial-ShareAlike 2.0 Generic (CC BY-NC-SA 2.0). The image has been cropped.

Karafka framework 2.0 announcement

I'm thrilled to announce the new and shiny Karafka 2.0. It is the result of almost four years of work.

For those who wonder what Karafka is: it is a multi-threaded, efficient Kafka processing framework for Ruby and Rails.

Karafka 2.0 is a major rewrite that brings many new things to the table but removes specific concepts that were not as good as I initially thought when I created them.

In this announcement article, I will describe the most noticeable features and improvements that got into this release. If you are interested in a more comprehensive list, you can find it here.

Note: Upgrade notes for migration from Karafka 1.4 to Karafka 2.0 can be found here.

Getting started

If you are new to Karafka and want to play around, follow this demo or visit the Getting Started page.

Noticeable features and improvements

This section includes all the noticeable changes you may be interested in if you already work with Karafka or if you want to understand the journey.

Multi-threading

Most of the engineering work around this release was about performance, scalability, and improvement of the overall engineering experience.

Multi-threading is probably the most significant change in Karafka since it was created. Up until now, Karafka was single-threaded, which means that any concurrency had to be implemented by the end user. The reason is dead simple: concurrency is hard. Synchronization is hard. Guarantees are hard. I do feel (and can back it up with integration specs) that I tackled it pretty well.

Karafka 2.0 uses native Ruby threads to achieve concurrent processing in three scenarios:

  • for concurrent processing of messages from different topic partitions
  • for concurrent processing of messages from a single partition when using the Virtual Partitions feature
  • to handle consumer group management (each defined consumer group is managed by a separate thread)

This can bring big advantages when any IO is involved.

When you start consuming messages, Karafka will fetch and distribute data to utilize multiple threads while preserving all the Kafka ordering guarantees.
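For example, the number of worker threads used for processing is a single configuration setting in karafka.rb (illustrative values; OrdersStatesConsumer is a made-up consumer class):

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka = { 'bootstrap.servers': '127.0.0.1:9092' }
    config.client_id = 'example_app'
    # Number of worker threads processing jobs in parallel
    config.concurrency = 10
  end

  routes.draw do
    topic :orders_states do
      consumer OrdersStatesConsumer
    end
  end
end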

Years ago, I developed a lot of in-app async code to bypass Karafka limitations, and it makes me extremely happy to be able to retire all of it.

But wait, there's more...

Virtual Partitions

Virtual Partitions allow you to parallelize the processing of data from a single partition. This can drastically increase throughput when IO operations are involved.

While the default scaling strategy for Kafka consumers is to increase the partition count and the number of consumers, in many cases this will not give you the desired effect. In the end, you cannot take this strategy beyond assigning one process per single topic partition. That means that without a way to parallelize the work further, IO may become your biggest bottleneck.

Virtual Partitions solve this problem by giving you the means to parallelize the work further by creating "virtual" partitions that operate independently but obey all the Kafka guarantees as a collective processing unit.

topic :orders_states do
  consumer OrdersStatesConsumer
  # Distribute work to virtual partitions based on the user id
  virtual_partitions(
    partitioner: ->(message) { message.payload[:user_id] }
  )
end

With Virtual Partitions, you benefit from both worlds: scaling with Kafka partitions and scaling with Ruby threads.

*This example illustrates the throughput difference for IO-intensive work, where the IO cost of processing a single message is 1ms.

Active Job support

Active Job is a standard interface for interacting with job runners in Ruby on Rails. Active Job can be configured to work with Karafka.

While Kafka is not a message queue, I still decided to create an Active Job adapter for it. Why? Because ordered jobs are something I have always wished for Ruby on Rails to have. On top of that, you may already have Kafka and only a few jobs to run. If so, why not use it and save yourself the hassle of maintaining yet another tool?

class Application < Rails::Application
  # ...
  config.active_job.queue_adapter = :karafka
end
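Once the adapter is configured, jobs are defined and enqueued the standard Active Job way (the job class below is made up for illustration):

class WelcomeEmailJob < ApplicationJob
  queue_as :default

  def perform(user_id)
    # Regular job logic; UserMailer is a hypothetical mailer
    UserMailer.welcome(user_id).deliver_now
  end
end

# Enqueued like any other Active Job - the Karafka adapter dispatches it to Kafka
WelcomeEmailJob.perform_later(42)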

End-to-end integration test suite

Karafka comes with a home-brewed framework for running end-to-end integration specs against Kafka. I did my best to describe every possible case I could imagine to ensure that the framework behaves as expected under any circumstances.

It is also a great place to learn about how Karafka behaves in particular scenarios.

Lower supply chain fingerprint

The number of external dependencies Karafka relies on has been reduced significantly. It was done to ensure that Karafka can be integrated into and upgraded in applications without causing dependency conflicts.

Upgraded documentation

The Karafka and WaterDrop documentation has been fully updated, with several new sections describing use cases and edge cases and providing help and suggestions for both simple and advanced usage.

Out-of-the-box DataDog and StatsD instrumentation

Using DataDog or StatsD? In just a few lines you can enable full instrumentation of both consumption and production of messages:

# initialize the listener with statsd client
dd_listener = ::Karafka::Instrumentation::Vendors::Datadog::Listener.new do |config|
  config.client = Datadog::Statsd.new('localhost', 8125)
  # Publish host as a tag alongside the rest of tags
  config.default_tags = ["host:#{Socket.gethostname}"]
end

# Subscribe with your listener to Karafka and you should be ready to go!
Karafka.monitor.subscribe(dd_listener)

License change

Karafka 2.0 is dual licensed under LGPL and a Commercial License. Depending on your use-case, you should be good with one or the other.

Note: Before the license change, I did obtain the consent of all the contributors for a re-license. I want to say thank you to each of you for allowing me to do so.

Seamless Ruby on Rails integration

Karafka always had good integration with Ruby on Rails. With the 2.0 release, however, this integration is elevated to another level: no more file editing, no more configuration copying. Everything works out of the box.

Karafka Pro

This release is the first release that includes a Pro subscription.

Building complex and reliable open-source software is neither easy nor fast. Many companies rely on Karafka, and following Mike Perham's advice, I have decided to introduce the Pro subscription to be able to support the further development of the ecosystem.

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support, as due to the sheer number of questions and requests, I need a way to prioritize them.

Since it's not only me, 20% of the income will be further distributed down the supply chain pipeline to support the work of the people I rely on.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Karafka 1.4 maintenance

With this release, an official EOL policy has been introduced. Karafka 1.4 will be supported until the end of February 2023.

Karafka 2.0 has a lower dependency fingerprint and is everything that 1.4 was not. I strongly encourage you to upgrade.

What's ahead

Many things. This release is just the beginning. I am already working on a 2.1 release that will include several great additions, including:

  • Management Web-UI similar to the one Resque and Sidekiq have
  • Producer transactions
  • At Rest encryption
  • CurrentAttributes support for ActiveJob
  • Seamless Dead-Letter Queue integration

Upgrade notes

Upgrade notes for migration from Karafka 1.4 to Karafka 2.0 can be found here.


Stay tuned and don't forget to join our Slack channel.
