Tag: Ruby

Reduce your method calls by 99.9% by replacing Thread#pass with Queue#pop

When doing multi-threaded work in Ruby, there are a couple of ways to control the execution flow within a given thread. In this article, I will be looking at Thread#pass and Queue#pop and how understanding each of them can help you drastically optimize your applications.

Thread#pass - what it is and how does it work

One of the ways you can ask the scheduler to "do something else" is by using the Thread#pass method.

Where can you find it? Well, aside from Karafka, for example in one of the most recent additions to ActiveRecord called #load_async (pull request).

Let's see how it works and why it may or may not be what you are looking for when building multi-threaded applications.

Ruby docs are rather minimalistic with its description:

Give the thread scheduler a hint to pass execution to another thread. A running thread may or may not switch, it depends on OS and processor.

That means that when dealing with threads, you can tell Ruby that it would not be a bad idea to switch from executing the current one and focusing on others.

By default, all the threads you create have the same priority and are treated the same way. An excellent illustration of this is the code below:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    # Make threads wait for a bit so all threads are created
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

# for i in {1..1000}; do ruby threads.rb; done > results.txt

on average, the computation in each of them took a similar amount of time:

The difference in between the fastest and the slowest thread was less than 8%.

However, when one of the threads "passes," things change drastically:

threads = []

threads = 10.times.map do |i|
  Thread.new do
    sleep(0.001) until threads.size == 10

    start = Time.now.to_f

    10_000_000.times do
      Thread.pass if i.zero?

      start / rand
    end

    puts "Thread #{i},#{Time.now.to_f - start}"
  end
end

threads.each(&:join)

Now, thread zero takes twice as much time as other threads doing the same job.

What is worth pointing out is that this method does not stop the execution flow by itself, and it just suggests to Ruby that there may be other more important things to do.

Exactly this behaviour was used by Jean Boussier in ActiveRecord:

def schedule_query(future_result) # :nodoc:
  @async_executor.post { future_result.execute_or_skip }
  Thread.pass
end

This code schedules a background job and suggests to the scheduler that it may be worth doing that or other things somewhere else.

It is worth mentioning, that when all the threads use the Thread#pass, it becomes a colossal burden to the Ruby VM. Ruby goes crazy since none of the threads wants to do any work and the execution time increases over 100 times.

Queue#pop - What it is and how does it work

Queue is a well known class and #popis one of the most important methods it contains.

Here is what Ruby docs say about the Queue class and the #pop method:

The Queue class implements multi-producer, multi-consumer queues. It is especially useful in threaded programming when information must be exchanged safely between multiple threads. The Queue class implements all the required locking semantics.

#pop: If the queue is empty, the calling thread is suspended until data is pushed onto the queue. If non_block is true, the thread isn't suspended, and ThreadError is raised.

When asked about queues, most programmers think about workers consuming jobs from a queue:

numbers = Queue.new

threads = 10.times.map do |i|
  Thread.new do
    while number = numbers.pop
      result = Time.now.to_f / number

      # a bit of randomness
      sleep(rand / 1_000)

      puts "Thread #{i},#{result}"
    end
  end
end

10_000.times { numbers << rand }

# see what I did here? ;)
Thread.pass until numbers.empty?

numbers.close

threads.each(&:join)

What is worth keeping in mind about Queue#pop is that it will block the execution in a given thread until there is something to do. This means, that a blocked thread becomes almost "invisible" from the performance perspective. Here's an example of running computations with 0 , 4, 9 and 99 blocked threads:

queue = Queue.new

THREADS = 4

THREADS.times do
  Thread.new { queue.pop }
end

# Wait until all the threads are initialized
Thread.pass until queue.num_waiting == THREADS

start = Time.now.to_f

10_000_000.times do
  start / rand
end

puts Time.now.to_f - start

As you can see, inactive threads do not have a big impact on the overall performance of this code. Even with 99 extra threads, the end result is not far away from the baseline.

Reducing method calls in a multi-threaded environment

Now that you know what Thread#pass and Queue#pop do, lets put them to work in a real use case. For that to happen we will be looking into the Karafka framework.

Karafka is a framework used to simplify Apache Kafka-based Ruby applications development that I built. The version 2.0 supports work distribution across multiple threads. The way it works from a data processing perspective is quite simple:

1. Take some data from Kafka
2. Divide it into processing units (jobs)
3. Put all the jobs into a queue
4. Wait for all the workers to pick the jobs and finish all the work
5. Repeat endlessly

Assuming an endless stream of data available, this can be pretty much modelled as followed:

queue = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
    end
  end
end

def wait_for_jobs_to_finish(queue)
  Thread.pass while queue.num_waiting < THREADS || !queue.empty?
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(queue)
end

And this is how the listener loop together with jobs distributions was implemented by me initially.

When benchmarked in regards to the number of times Thread#pass was executed on a pass-through benchmark (where we measure max throughput), things looked solid.

Despite increased number of iterations, we would not wait more often per iteration. What that means, is that our jobs were short enough for them to finish prior to Ruby returning to the wait loop.

Things become much more interesting if we assume that our jobs take more time than Ruby gives them before thread execution is interrupted. Then things start to look differently:

# Same code as before but the job has a bit of sleep simulating IO
task = ->(data) { sleep(rand(9..11) / 10000.0) }

Assuming we burn around 1ms per job, the number of passes skyrockets:

That's over 1000 times more invocations of the same method!

In a case, where we would run heavy queries of around 100ms (+/- 10%) per job, we end up with following results per iteration:

That means, that Ruby had to run #Thread#pass over 180 000 times on average for nothing!

When optimizing any code, it is good to establish the primary use case for its usage. In the case of Karafka, while raw throughput is important, it is more about complex jobs being able to use the GVL release strategy to allow parallel work execution upon IO.

So, is there a better way to make Ruby wait patiently on all the jobs to be done? There is: Queue#pop. Since it is thread-safe, we can use it to notify the main thread that the given job has finished. It won't eliminate useless runs, but it will reduce them so much that they, in fact, will become insignificant. Since we know how many jobs we've enqueued, we know how many times we need to #pop:

queue = Queue.new
lock = Queue.new

THREADS = 10

THREADS.times do |i|
  Thread.new do
    loop do
      data, task = queue.pop
      task.call(data)
      lock << true
    end
  end
end

def wait_for_jobs_to_finish(dispatched, lock)
  dispatched.times { lock.pop }
end

def data
  Array.new(10) { rand }
end

task = ->(data) { data * 2 }

100_000.times do
  data.each { queue << [_1, task] }

  wait_for_jobs_to_finish(data.size, lock)
end

The lock.pop will stop the execution of the main thread until each job is done. This means that we increase the number of stops with an increased number of threads. However, this correlation is linear and the end result is orders of magnitude smaller than when using Thread.pass.

Here's the same benchmark with a number of Queue#pop calls that replaced Thread#pass for non-sleep case:

The number of Queue#pop invocations equals the thread number. It is independent of jobs types or any other circumstances. So the longer jobs are, the bigger the improvement:

This change not only reduced the number of calls by over 99.994% but it also drastically lowered CPU utilization, which is visible especially for cases with extensive IO (here simulated with sleep):

Summary

So, is one better than the other? No. They should be used in different cases and to achieve different goals.

Thread#pass should not be used to defer work but rather to provide a hint to Ruby, that there may be more important things that it could focus on.

Queue#pop on the other hand can act not only as a component of a queue but also as a part of multi-threaded applications flow control.

Concurrency is not easy. Thread management and selection of proper methods are as crucial as understanding your primary use-cases and building correct benchmarks. Sometimes minor tweaks can provide tremendous benefits.


Note: this post would not be possible without extensive help from Samuel Williams. Thank you!


Cover photo by Chris-HÃ¥vard Berge on Attribution-NonCommercial 2.0 Generic (CC BY-NC 2.0) . Image has been cropped.

Controlling Elgato Key Light under Ubuntu with Ruby

Recently I've acquired Elgato Key Light. It is a WiFi controllable LED lighting panel.

The panel uses 160 LEDs to provide up to 2800 lumens of brightness and a color range of 2900-7000K. While you can control it from a mobile device, doing it directly from the shell makes the whole experience way more convenient.

Key Light officially does not support Linux, however it uses ESP32, and it does run an unprotected HTTP server that accepts JSON commands. You can not only turn it on and off but also adjust both light temperature and brightness with simple JSON requests.

Here's the code that I use to control my lights written in Ruby:

#!/usr/bin/env ruby

require 'uri'
require 'net/http'
require 'json'
require 'yaml'

# Set your lights IPs
LIGHTS = %w[
  192.168.0.199
]

COMMAND = ARGV[0]
DETAIL1 = ARGV[1].to_i
DETAIL2 = ARGV[2].to_i

# Sends a json request to a given elgato light
def dispatch(ip, payload, endpoint, method)
  uri = URI("http://#{ip}:9123/elgato/#{endpoint}")
  req = method.new(uri)
  req.body = payload.to_json

  res = Net::HTTP.start(uri.hostname, uri.port) do |http|
    http.request(req)
  end

  puts res.body
end

def on(light_ip)
  dispatch(
    light_ip,
    { 'numberOfLights': 1, 'lights': [{ 'on': 1 }] },
    'lights',
    Net::HTTP::Put
  )
end

def off(light_ip)
  dispatch(
    light_ip,
    { 'numberOfLights': 1, 'lights': [{ 'on': 0 }] },
    'lights',
    Net::HTTP::Put
  )
end

def temperature(light_ip, value)
  raise "Needs to be between 143 and 344, was: #{value}" if value < 143 || value > 344

  dispatch(
    light_ip,
    { 'numberOfLights': 1, 'lights': [{ 'on': 1, 'temperature': value }] },
    'lights',
    Net::HTTP::Put
  )
end

def brightness(light_ip, value)
  raise "Needs to be between 3 and 100, was: #{value}"  if value < 3 || value > 100

  dispatch(
    light_ip,
    { 'numberOfLights': 1, 'lights': [{ 'on': 1, 'brightness': value }] },
    'lights',
    Net::HTTP::Put
  )
end

def info(light_ip)
  dispatch(
    light_ip,
    {},
    'accessory-info',
    Net::HTTP::Get
  )
end

def command(command, light_ip)
  case command
  when 'on'
    on(light_ip)
  when 'off'
    off(light_ip)
  when 'temperature'
    temperature(light_ip, DETAIL1)
  when 'brightness'
    brightness(light_ip, DETAIL1)
  when 'theme'
    temperature(light_ip, DETAIL1)
    brightness(light_ip, DETAIL2)
  when 'info'
    info(light_ip)
  else
    raise "Unknown COMMAND #{COMMAND}"
  end
end

LIGHTS.each { |light_ip| puts command(COMMAND, light_ip) }

You can place this code in /usr/local/bin under elgato name with executable permissions and then you can just:

elgato on # turn your lights on
elgato off # turn your lights off
elgato info # get info on your lights
elgato brightness 50 # set brightness to 50%
elgato temperature 280 # make  light temperature quite warm

Copyright © 2022 Closer to Code

Theme by Anders NorenUp ↑