Running with Ruby

Tag: Performance (page 1 of 5)

Sidekiq Unique Jobs: don’t waste your time waiting – reschedule if busy

Half a year ago, I’ve made a post about Enforcing unique jobs in Karafka and Sidekiq for single resources. This approach is great, however, there’s a particular case in which Sidekiq Unique Jobs can block all of your Sidekiq workers except one. This can significantly limit your computing power without you being aware of it.

When using Sidekiq Unique Jobs with a WhileExecuting strategy, only a single worker can start processing Sidekiq job with a given unique key. This is really helpful when you work with resources for which you cannot perform parallel operations (for example when you work with a database for which there are no atomic operations but you need to increment counters), as doing so could overwrite results from an other worker. A WhileExecuting strategy, with a properly defined unique key can help you prevent that from happening. However…

Problem definition

Sidekiq jobs are being consumed out of a FIFO queue. Without any additional modifications, situation is pretty clear: having 4 single threaded processes each allows you to process 4 jobs at the same time.

Everything changes, when you decide to add Sidekiq Unique Jobs to your stack. In a case, when there are multiple jobs in sequence for a given unique key, Sidekiq will get seriously clogged. In the worst scenario, it won’t matter how many workers and threads you have in your Sidekiq infrastructure. You will get a performance of a single Sidekiq thread. It is because you cannot make Sidekiq skip certain tasks because of FIFO queue processing. To bypass this limitation, authors of Sidekiq Unique Jobs introduced a #sleep that will run up until the resource is free again to be processed or until timeout occurs. This approach means, that if you have more tasks in queue than processors, they will have to wait until all the jobs with a given unique key are processed.

All the workers will actively wait (meaning that in Sidekiq console you will see them marked as busy) up until a lock is released.

Solution: reschedule instead of waiting

Warning: if a similar case occurs in your business logic quite often, probably you will be better taking engine different than Sidekiq. I would recommend this solution for non-frequent edge/corner cases.

Bypassing that behavior is pretty easy: if there’s a lock, put the current job at the end of the queue. That said, your jobs will be checked for possibility of execution and rescheduled back instead of waiting. This will mess up your queue counters a bit (as you will have more jobs enqueued and processed that it should) but on the other hand it means that Sidekiq will “actively” seek for resources on which it can work in a certain moment.

To do so, we can create a new Unique Job strategy that we can later on apply. Apart from rescheduling, our strategy won’t differ from the WhileExecuting, so we can use it as a base.

module SidekiqUniqueJobs
  module Lock
    class WhileExecutingReschedule < WhileExecuting
      MUTEX = Mutex.new

      def synchronize
        MUTEX.lock

        if (@locked = locked?)
          yield
        else
          # We use sleep just to prevent from a pointless, extremely fast looping
          # in case all the jobs have the same unique key
          sleep 0.1
          @item['class'].constantize.perform_async(*@item['args'])
        end
      rescue Sidekiq::Shutdown
        logger.fatal { "the unique_key: #{@unique_digest} needs to be unlocked manually" }
        raise
      ensure
        # If we were able to obtain lock, we need to release it after processing
        if @locked
          SidekiqUniqueJobs.connection(@redis_pool) { |conn| conn.del @unique_digest }
        end

        MUTEX.unlock
      end
    end
  end
end

Applying this strategy is super easy. We just need to replace while_executing strategy with while_executing_reschedule:

class ApplicationWorker
  include Sidekiq::Worker

  # sidekiq_options unique: :while_executing
  sidekiq_options unique: :while_executing_reschedule
end

Cover photo by: Alexandre Duret-Lutz on Creative Commons 2.0 license. Changes made: added an overlay layer with an article title on top of the original picture.

Ruby: Stream processing of shell command results

There are various methods for calling shell commands in Ruby. Most of them either not wait at all for the results, or wait until the command execution is finished.

In many cases it is ok, because programmers want shell command results for further processing. Unfortunately this means that while a shell command runs, there’s no way to get partial results and process them (multitasking FTW). It also means that all the results have to be buffered. It might be (for a long running intensive commands) a source of memory leaks in your applications.

Luckily there’s a great way to process shell command data in a stream (row after row).

The task

Lets assume that we want to find first 10 files in our file system that match a given pattern (note that it could be achieved way better with just shell commands but it’s not the point here).

The bad way

Here’s a typical code to achieve that (and believe me – I’ve encountered solutions like that in production systems):

require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  results = `find / 2> /dev/null`.split("\n")
  selection = results.select { |file| file =~ pattern }
  selection[0..10]
end

report.pretty_print

This might seem elegant and it definitely works, but let’s check Ruby’s process memory usage:

Memory usage in MB (before and after find)

Total allocated: 661999052 bytes (2925598 objects)
Total retained:  40 bytes (1 objects)

allocated memory by gem
-----------------------------------
 661999052  other

allocated memory by class
-----------------------------------
 632408588  String
  26181752  Array
   3408440  MatchData
       232  Hash
        40  Process::Status

We had to use nearly 700MB of memory and it took us 4.7 seconds just to find few matching files. The time wouldn’t be that bad,  but memory usage like this is a bit overkill. It is bad mostly because find / lists all the files and the more things we have, the bigger output we get. This also means that our code will behave differently dependent on what machine it will run. For some we might not have memory problems but for others it might grow over 1GB.

Now imagine what would happen if we would execute this code in 25 Sidekiq concurrent workers…

Of course with GC running we might not kill our machine, but memory spikes will look kinda weird and suspicious.

The good way – hello IO.popen

Instead of waiting for all the files from find / command, let’s process each line separately. To do so, we can use the IO.popen method.

IO.popen runs the specified command as a subprocess; the subprocess’s standard input and output will be connected to the returned IO object. (source)

It means that we can execute find command and feed our main process with every line of the output separately.

Note: IO.popen executed without a block will not wait for the subprocess to finish!

require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  selection = []

  IO.popen('find / 2> /dev/null') do |io|
    while (line = io.gets) do
      # Note - here you could use break to get out and sigpipe
      # subprocess to finish it early. It will however mean that your subprocess
      # will stop running early and you need to test if it will stop without
      # causing any trouble
      next if selection.size > 10
      selection << line if line =~ pattern
    end
  end

  selection[0..10]
end

report.pretty_print

Results:

Total allocated: 362723119 bytes (2564923 objects)
Total retained:  394 bytes (3 objects)

allocated memory by gem
-----------------------------------
 362723119  other

allocated memory by class
-----------------------------------
 362713215  String
      8432  IO
      1120  MatchData
       232  Hash
        80  Array
        40  Process::Status

45% less memory required.

The best way (for some cases)

There’s also one more way to do the same with the same #popen but in a slightly different style. If you:

  • Don’t need to process all the lines from the executed command
  • Can terminate subprocess early
  • Are aware of how to manage subprocesses

you can then stream data into Ruby as long as you need and terminate once you’re done. Than way Ruby won’t fetch new lines and won’t have to GC them later on.

require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  selection = []
  run = true

  io = IO.popen('find / 2> /dev/null')

  while (run && line = io.gets) do
    if selection.size > 10
      Process.kill('TERM', io.pid)
      io.close
      run = false
    end

    selection << line if line =~ pattern
  end

  selection
end

report.pretty_print

Since we don’t wait for the subprocess to finish, it definitely will be faster but what about memory consumption?

Total allocated: 509989 bytes (5613 objects)
Total retained:  448 bytes (4 objects)

allocated memory by gem
-----------------------------------
    509989  other

allocated memory by class
-----------------------------------
    499965  String
      8432  IO
      1120  MatchData
       232  Hash
       200  Array
        40  Process::Status

99% less than the original solution!

Note: This solution is not always applicable.

Summary

The way you execute shell commands really depends on few factors:

  • Do you need an output results at all?
  • Do you need all the lines from the output at the same time?
  • Can you do other stuff and return once the data is ready?
  • Can you process partial data?
  • Can you terminate subprocess early?

When you go with your code out of Ruby scope and when you execute shell commands, it is always good to ask yourself those questions. Sometimes achieving stream processing ability can be done only when the system is being built, so it is really good to think about that before the implementation. In general I would recommend to always consider streaming in every place where we cannot exactly estimate the external command result size. That way you won’t be surprised when there will be a lot more data that initially anticipated.

Note: Attentive readers will notice, that I didn’t benchmark memory used in the subprocess. That is true, however it was irrelevant to our case as the shell command for all the cases was exactly the same.

Cover photo by: heiwa4126 on Creative Commons license.

Olderposts

Copyright © 2017 Running with Ruby

Theme by Anders NorenUp ↑