
Karafka framework 2.0 announcement

I'm thrilled to announce the new and shiny Karafka 2.0. It is the result of almost four years of my work.

For those who wonder what Karafka is: Karafka is an efficient, multi-threaded Kafka processing framework for Ruby and Rails.

Karafka 2.0 is a major rewrite that brings many new things to the table but removes specific concepts that were not as good as I initially thought when I created them.

In this announcement article, I will describe the most noticeable features and improvements that got into this release. If you are interested in a more comprehensive list, you can find it here.

Note: If you are looking for upgrade notes, they will be provided as a separate article.

Getting started

If you are new to Karafka and want to play around, follow this demo or visit the Getting Started page.

Noticeable features and improvements

This section includes all the noticeable changes you may be interested in if you already work with Karafka or if you want to understand the journey.

Multi-threading

Most of the engineering work around this release was about performance, scalability, and improvement of the overall engineering experience.

Multi-threading is probably the most significant change in Karafka since it was created. Up until now, Karafka was single-threaded. That means that any concurrency had to be implemented by the end user. The reason is dead simple: concurrency is hard. Synchronization is hard. Guarantees are hard. I do feel (and can back it up with integration specs) that I tackled it pretty well.

Karafka 2.0 uses native Ruby threads to achieve concurrent processing in three scenarios:

  • for concurrent processing of messages from different topic partitions,
  • for concurrent processing of messages from a single partition when using the Virtual Partitions feature,
  • for consumer group management (each defined consumer group is managed by a separate thread).

This can bring big advantages when any IO is involved.

When you start consuming messages, Karafka will fetch and distribute data to utilize multiple threads while preserving all the Kafka ordering guarantees.
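To give a feeling for why threads help with IO, here is a minimal sketch in plain Ruby. It is not Karafka code: the `process` method, the 10ms `sleep` that stands in for an IO call, and the slices of five messages (a loose stand-in for partitions) are all assumptions made for this illustration. Each slice is handled by one thread, so the order within a slice stays intact.

```ruby
# Hypothetical stand-in for an IO-bound operation (for example a DB call)
def process(message)
  sleep(0.01)
  message * 2
end

# Runs the block and returns [result, elapsed seconds]
def timed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  [result, Process.clock_gettime(Process::CLOCK_MONOTONIC) - start]
end

messages = (1..20).to_a

# One message after another, in a single thread
sequential, sequential_time = timed { messages.map { |m| process(m) } }

# One thread per slice; messages within a slice are still processed in order
threaded, threaded_time = timed do
  messages
    .each_slice(5)
    .map { |slice| Thread.new { slice.map { |m| process(m) } } }
    .flat_map(&:value)
end

puts "sequential: #{sequential_time.round(3)}s"
puts "threaded:   #{threaded_time.round(3)}s"
```

Since `sleep` releases the GVL just like real IO does, the threaded variant should finish in roughly the time needed for a single slice.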

Years ago, I developed a lot of in-app async code to bypass Karafka limitations, and it makes me extremely happy to be able to retire all of it.

But wait, there's more...

Virtual Partitions

Virtual Partitions allow you to parallelize the processing of data from a single partition. This can drastically increase throughput when IO operations are involved.

While the default scaling strategy for Kafka consumers is to increase the number of partitions and consumers, in many cases this will not give you the desired effect. In the end, you cannot take this strategy beyond assigning one process per topic partition. That means that without a way to parallelize the work further, IO may become your biggest bottleneck.

Virtual Partitions solve this problem by providing you with the means to further parallelize work by creating "virtual" partitions that operate independently but obey all the Kafka guarantees as a collective processing unit.

topic :orders_states do
  consumer OrdersStatesConsumer
  # Distribute work to virtual partitions based on the user id
  virtual_partitioner ->(message) { message.payload[:user_id] }
end

With Virtual Partitions, you benefit from both worlds: scaling with Kafka partitions and scaling with Ruby threads.
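The idea behind a partitioner can be sketched in plain Ruby. This is only an illustration of the concept and not how Karafka implements Virtual Partitions: the `message_struct`, the `concurrency` value, and the CRC32-based grouping are assumptions I picked for the example. Messages sharing a key end up in one group, and each group is processed by one thread, so the order per key is kept.

```ruby
require 'zlib'

# Hypothetical message structure, used only for this illustration
message_struct = Struct.new(:offset, :payload)

# Same shape as the virtual_partitioner lambda above
partitioner = ->(message) { message.payload[:user_id] }

# Assumed number of groups that can be processed in parallel
concurrency = 3

messages = Array.new(12) do |offset|
  message_struct.new(offset, { user_id: offset % 4 })
end

# Messages with the same key always land in the same group and keep
# their original (offset) order inside of it
groups = messages.group_by do |message|
  Zlib.crc32(partitioner.call(message).to_s) % concurrency
end

processed = Queue.new

# One thread per group, each going through its messages in order
groups
  .map do |group_id, batch|
    Thread.new do
      batch.each do |message|
        processed << [group_id, message.payload[:user_id], message.offset]
      end
    end
  end
  .each(&:join)

results = Array.new(processed.size) { processed.pop }

results.each do |group_id, user_id, offset|
  puts "group #{group_id}, user #{user_id}, offset #{offset}"
end
```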

*This example illustrates the throughput difference for IO-intensive work, where the IO cost of processing a single message is 1ms.

Active Job support

Active Job is a standard interface for interacting with job runners in Ruby on Rails. Active Job can be configured to work with Karafka.

While Kafka is not a message queue, I still decided to create an Active Job adapter for it. Why? Because ordered jobs are something I always wished Ruby on Rails had. On top of that, you may already have Kafka and only a few jobs to run. If so, why not use it and save yourself the hassle of yet another tool to maintain?

class Application < Rails::Application
  # ...
  config.active_job.queue_adapter = :karafka
end

End-to-end integration test suite

Karafka comes with a home-brew framework for running end-to-end integration specs against Kafka. I did my best to describe every possible case I could have imagined to ensure that the framework behaves as expected under any circumstances.

It is also a great place to learn about how Karafka behaves in particular scenarios.

Lower supply chain footprint

The number of external dependencies Karafka relies on has been reduced significantly. It was done to ensure that Karafka can be integrated into and upgraded in applications without causing dependency conflicts.

Upgraded documentation

The Karafka and WaterDrop documentation has been fully updated, with several new sections describing use cases and edge cases and providing help and suggestions for both simple and advanced usage.

Out-of-the-box DataDog and StatsD instrumentation

Using DataDog or StatsD? In just a few lines you can enable full instrumentation of both consumption and production of messages:

# initialize the listener with statsd client
dd_listener = ::Karafka::Instrumentation::Vendors::Datadog::Listener.new do |config|
  config.client = Datadog::Statsd.new('localhost', 8125)
  # Publish host as a tag alongside the rest of tags
  config.default_tags = ["host:#{Socket.gethostname}"]
end

# Subscribe with your listener to Karafka and you should be ready to go!
Karafka.monitor.subscribe(dd_listener)

License change

Karafka 2.0 is dual licensed under LGPL and a Commercial License. Depending on your use-case, you should be good with one or the other.

Note: Before the license change, I did obtain the consent of all the contributors for a re-license. I want to say thank you to each of you for allowing me to do so.

Seamless Ruby on Rails integration

Karafka has always had good integration with Ruby on Rails. With the 2.0 release, however, this integration is elevated to another level: no more file editing, no more configuration copying. Everything works out of the box.

Karafka Pro

This release is the first release that includes a Pro subscription.

Building complex and reliable open-source software is neither easy nor fast. Many companies rely on Karafka, and following Mike Perham's advice, I have decided to introduce the Pro subscription to be able to support the further development of the ecosystem.

Karafka Pro has many valuable, well-documented, well-tested functionalities that can significantly improve your day-to-day operations with Kafka in Ruby. It also introduces commercial support: due to the sheer number of questions and requests, I need a way to prioritize them.

Since it's not only me, 20% of the income will be further distributed down the supply chain pipeline to support the work of people I rely on.

Help me build and maintain a high-quality Kafka ecosystem for Ruby and Ruby on Rails.

Buy Karafka Pro.

Karafka 1.4 maintenance

With this release, an official EOL policy has been introduced. Karafka 1.4 will be supported until the end of February 2023.

Karafka 2.0 has a lower dependency footprint and is everything 1.4 was not. I strongly encourage you to upgrade.

What's ahead

Many things. This release is just the beginning. I am already working on a 2.1 release that will include several great additions, including:

  • Management Web-UI similar to the one Resque and Sidekiq have
  • Producer transactions
  • At Rest encryption
  • CurrentAttributes support for ActiveJob
  • Seamless Dead-Letter Queue integration

Stay tuned and don't forget to join our Slack channel.

Reduce your method calls by 99.9% by replacing Thread#pass with Queue#pop

When doing multi-threaded work in Ruby, there are a couple of ways to control the execution flow within a given thread. In this article, I will be looking at Thread#pass and Queue#pop and how understanding each of them can help you drastically optimize your applications.

Thread#pass - what it is and how it works

One of the ways you can ask the scheduler to "do something else" is by using the Thread#pass method.

Where can you find it? Well, aside from Karafka, for example in one of the most recent additions to ActiveRecord called #load_async (pull request).

Let's see how it works and why it may or may not be what you are looking for when building multi-threaded applications.

Ruby docs are rather minimalistic in their description:

Give the thread scheduler a hint to pass execution to another thread. A running thread may or may not switch, it depends on OS and processor.

That means that when dealing with threads, you can tell Ruby that it would not be a bad idea to switch away from the current thread and focus on others.

By default, all the threads you create have the same priority and are treated the same way. An excellent illustration of this is the code below:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    # Make threads wait for a bit so all threads are created
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

# for i in {1..1000}; do ruby threads.rb; done > results.txt

On average, the computation in each of them took a similar amount of time.

The difference between the fastest and the slowest thread was less than 8%.

However, when one of the threads "passes," things change drastically:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      Thread.pass if i.zero?

      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

Now, thread zero takes twice as much time as other threads doing the same job.

What is worth pointing out is that this method does not stop the execution flow by itself. It just suggests to Ruby that there may be other, more important things to do.

Exactly this behaviour was used by Jean Boussier in ActiveRecord:

def schedule_query(future_result) # :nodoc:
  @async_executor.post { future_result.execute_or_skip }
  Thread.pass
end

This code schedules a background job and suggests to the scheduler that it may be worth doing that or other things somewhere else.

It is worth mentioning that when all the threads use Thread#pass, it becomes a colossal burden on the Ruby VM. Ruby goes crazy since none of the threads wants to do any work, and the execution time increases over 100 times.
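If you want to measure this on your own machine, a sketch along these lines can be used. The `slowest_thread_time` helper and the reduced iteration count are my assumptions, chosen to keep the run short. The exact slowdown will differ depending on the Ruby version, OS, and CPU, so treat the numbers as indicative only.

```ruby
# Runs the same computation in 10 threads and returns the time of the
# slowest one. When pass is true, every thread calls Thread.pass on each
# iteration.
def slowest_thread_time(iterations, pass:)
  threads = Array.new(10) do
    Thread.new do
      start = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      iterations.times do
        Thread.pass if pass
        start / rand
      end

      Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
    end
  end

  threads.map(&:value).max
end

# Far fewer iterations than in the examples above, to keep the run short
iterations = 20_000

without_pass = slowest_thread_time(iterations, pass: false)
with_pass = slowest_thread_time(iterations, pass: true)

puts "no passing:  #{without_pass.round(4)}s"
puts "all passing: #{with_pass.round(4)}s"
```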

Queue#pop - what it is and how it works

Queue is a well-known class and #pop is one of the most important methods it contains.

Here is what Ruby docs say about the Queue class and the #pop method:

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

#pop: If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.
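Both behaviours described in the docs can be seen in a short sketch. The variable names are only illustrative:

```ruby
queue = Queue.new

# Non-blocking pop on an empty queue raises ThreadError instead of
# suspending the calling thread
non_block_error =
  begin
    queue.pop(true)
  rescue ThreadError => e
    e
  end

# Blocking pop suspends the consumer until something is pushed
consumer = Thread.new { queue.pop }

# Wait until the consumer is actually suspended on #pop
Thread.pass until queue.num_waiting == 1

queue << :job

# Thread#value joins the thread and returns what the block returned
received = consumer.value

puts non_block_error.class
puts received.inspect
```

This prints `ThreadError` followed by `:job`.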

When asked about queues, most programmers think about workers consuming jobs from a queue:

numbers = Queue.new

threads = 10.times.map do |i|
  Thread.new do
    while number = numbers.pop
      result = Time.now.to_f / number

      # a bit of randomness
      sleep(rand / 1_000)

      puts "Thread #{i},#{result}"
    end
  end
end

10_000.times { numbers << rand }

# see what I did here? ;)
Thread.pass until numbers.empty?

numbers.close

threads.each(&:join)

What is worth keeping in mind about Queue#pop is that it will block the execution in a given thread until there is something to do. This means that a blocked thread becomes almost "invisible" from the performance perspective. Here's an example of running computations with 0, 4, 9, and 99 blocked threads:

queue = Queue.new

THREADS = 4

THREADS.times do
  Thread.new { queue.pop }
end

# Wait until all the threads are initialized
Thread.pass until queue.num_waiting == THREADS

start = Time.now.to_f

10_000_000.times do
  start / rand
end

puts Time.now.to_f - start

As you can see, inactive threads do not have a big impact on the overall performance of this code. Even with 99 extra threads, the end result is not far away from the baseline.

Reducing method calls in a multi-threaded environment

Now that you know what Thread#pass and Queue#pop do, let's put them to work in a real use case. For that to happen, we will be looking into the Karafka framework.

Karafka is a framework I built to simplify the development of Apache Kafka-based Ruby applications. Version 2.0 supports work distribution across multiple threads. The way it works from a data processing perspective is quite simple:

1. Take some data from Kafka
2. Divide it into processing units (jobs)
3. Put all the jobs into a queue
4. Wait for all the workers to pick the jobs and finish all the work
5. Repeat endlessly

Assuming an endless stream of data available, this can be pretty much modelled as follows:

queue = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
    end
  end
end

def wait_for_jobs_to_finish(queue)
  Thread.pass while queue.num_waiting < THREADS || !queue.empty?
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(queue)
end

This is how I initially implemented the listener loop together with the distribution of jobs.

When benchmarked with regard to the number of times Thread#pass was executed in a pass-through benchmark (where we measure max throughput), things looked solid.

Despite the increased number of iterations, we would not wait more often per iteration. What that means is that our jobs were short enough to finish before Ruby returned to the wait loop.

Things become much more interesting if we assume that our jobs take more time than Ruby gives them before thread execution is interrupted. Then things start to look differently:

# Same code as before but the job has a bit of sleep simulating IO
task = ->(data) { sleep(rand(9..11) / 10000.0) }

Assuming we burn around 1ms per job, the number of passes skyrockets:

That's over 1000 times more invocations of the same method!

In a case where we would run heavy queries of around 100ms (+/- 10%) per job, we end up with the following results per iteration:

That means that Ruby had to run Thread#pass over 180,000 times on average for nothing!

When optimizing any code, it is good to establish the primary use case for its usage. In the case of Karafka, while raw throughput is important, it is more about complex jobs being able to use the GVL release strategy to allow parallel work execution upon IO.

So, is there a better way to make Ruby wait patiently on all the jobs to be done? There is: Queue#pop. Since it is thread-safe, we can use it to notify the main thread that the given job has finished. It won't eliminate useless runs, but it will reduce them so much that they, in fact, will become insignificant. Since we know how many jobs we've enqueued, we know how many times we need to #pop:

queue = Queue.new
lock = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
      lock << true
    end
  end
end

def wait_for_jobs_to_finish(dispatched, lock)
  dispatched.times { lock.pop }
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(data.size, lock)
end

The lock.pop will stop the execution of the main thread until each job is done. This means that we increase the number of stops with an increased number of threads. However, this correlation is linear and the end result is orders of magnitude smaller than when using Thread.pass.

Here's the same benchmark with a number of Queue#pop calls that replaced Thread#pass for non-sleep case:

The number of Queue#pop invocations equals the number of threads. It is independent of job types or any other circumstances. So the longer the jobs are, the bigger the improvement:

This change not only reduced the number of calls by over 99.994% but it also drastically lowered CPU utilization, which is visible especially for cases with extensive IO (here simulated with sleep):
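To get a rough feeling for these numbers yourself, both strategies can be instrumented with simple counters. The sketch below is a simplified model of the two loops from this article and not the benchmark I used: the helper names, the small iteration count, and the 1ms `sleep` standing in for IO are assumptions. The Thread.pass count will vary between runs and machines, while the Queue#pop count is always the number of dispatched jobs.

```ruby
# Starts workers that run a ~1ms simulated IO job for every item they pop.
# When `done` is given, each finished job is reported to it.
def start_workers(queue, threads, done = nil)
  Array.new(threads) do
    Thread.new do
      # Queue#pop returns nil once the queue is closed and empty
      while queue.pop
        sleep(0.001)
        done << true if done
      end
    end
  end
end

# Strategy 1: spin on Thread.pass until all workers are idle again
def count_passes(threads: 10, jobs: 10, iterations: 20)
  queue = Queue.new
  workers = start_workers(queue, threads)
  calls = 0

  iterations.times do
    jobs.times { queue << true }

    while queue.num_waiting < threads || !queue.empty?
      Thread.pass
      calls += 1
    end
  end

  queue.close
  workers.each(&:join)
  calls
end

# Strategy 2: block on a second queue, once per dispatched job
def count_pops(threads: 10, jobs: 10, iterations: 20)
  queue = Queue.new
  done = Queue.new
  workers = start_workers(queue, threads, done)
  calls = 0

  iterations.times do
    jobs.times { queue << true }

    jobs.times do
      done.pop
      calls += 1
    end
  end

  queue.close
  workers.each(&:join)
  calls
end

pass_calls = count_passes
pop_calls = count_pops

puts "Thread.pass calls: #{pass_calls}"
puts "Queue#pop calls:   #{pop_calls}"
```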

Summary

So, is one better than the other? No. They should be used in different cases and to achieve different goals.

Thread#pass should not be used to defer work but rather to provide a hint to Ruby that there may be more important things it could focus on.

Queue#pop, on the other hand, can act not only as a component of a queue but also as a flow control mechanism in multi-threaded applications.

Concurrency is not easy. Thread management and selection of proper methods are as crucial as understanding your primary use-cases and building correct benchmarks. Sometimes minor tweaks can provide tremendous benefits.


Note: this post would not be possible without extensive help from Samuel Williams. Thank you!


Cover photo by Chris-Håvard Berge on Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0). Image has been cropped.

Copyright © 2022 Closer to Code