Tag: Performance

Ruby: Stream processing of shell command results

There are various methods for calling shell commands in Ruby. Most of them either don't wait for the results at all, or wait until the command has finished executing.
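A minimal sketch of that difference, assuming a POSIX shell is available; `echo hello` is just a placeholder command and not part of the original example:

```ruby
# Backticks block until the command exits and return its whole stdout.
output = `echo hello`

# system also blocks, but returns only a success flag instead of the output.
ok = system('echo hello > /dev/null')

# Process.spawn returns a pid immediately; waiting is up to the caller.
pid = Process.spawn('echo hello > /dev/null')
Process.wait(pid)
status = $?

puts output
puts "system succeeded: #{ok}, spawned pid #{pid} exited with #{status.exitstatus}"
```

None of these calls hands you the output while the command is still running, which is the gap the rest of this post is about.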

In many cases that's OK, because programmers want the shell command results for further processing. Unfortunately, it means that while a shell command runs, there's no way to get partial results and process them (multitasking FTW). It also means that all the results have to be buffered. For long-running, output-heavy commands, this can be a source of memory bloat in your applications.

Luckily there's a great way to process shell command data in a stream (row after row).

The task

Let's assume that we want to find the first 10 files in our file system that match a given pattern (note that this could be achieved far better with shell commands alone, but that's not the point here).

The bad way

Here's typical code to achieve that (and believe me - I've encountered solutions like that in production systems):

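require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  # Backticks block until find exits and return its entire output as one String
  results = `find / 2> /dev/null`.split("\n")
  selection = results.select { |file| file =~ pattern }
  # first(10) returns at most 10 entries (the range 0..10 would return 11)
  selection.first(10)
end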

report.pretty_print

This might seem elegant and it definitely works, but let's check Ruby's process memory usage:

[Figure: memory usage in MB (before and after find)]

Total allocated: 661999052 bytes (2925598 objects)
Total retained:  40 bytes (1 objects)

allocated memory by gem
-----------------------------------
 661999052  other

allocated memory by class
-----------------------------------
 632408588  String
  26181752  Array
   3408440  MatchData
       232  Hash
        40  Process::Status

We allocated roughly 660 MB of memory, and it took 4.7 seconds just to find a few matching files. The time wouldn't be that bad, but memory usage like this is overkill. It is bad mostly because find / lists all the files, so the more files we have, the bigger the output we get. This also means that our code will behave differently depending on the machine it runs on. On some we might not have memory problems, but on others usage might grow over 1 GB.

Now imagine what would happen if we executed this code in 25 concurrent Sidekiq workers...

Of course with GC running we might not kill our machine, but memory spikes will look kinda weird and suspicious.

The good way - hello IO.popen

Instead of waiting for all the files from the find / command, let's process each line separately. To do so, we can use the IO.popen method.

IO.popen runs the specified command as a subprocess; the subprocess’s standard input and output will be connected to the returned IO object. (source)

It means that we can execute the find command and feed our main process with every line of the output separately.

Note: IO.popen executed without a block will not wait for the subprocess to finish!

require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  selection = []

  IO.popen('find / 2> /dev/null') do |io|
    while (line = io.gets) do
      # Note - here you could use break instead of next to leave the loop early.
      # The pipe then gets closed and the subprocess receives SIGPIPE on its
      # next write, so it stops running early. You need to test that your
      # subprocess can be stopped that way without causing any trouble.
      next if selection.size >= 10
      selection << line if line =~ pattern
    end
  end

  selection.first(10)
end

report.pretty_print

Results:

Total allocated: 362723119 bytes (2564923 objects)
Total retained:  394 bytes (3 objects)

allocated memory by gem
-----------------------------------
 362723119  other

allocated memory by class
-----------------------------------
 362713215  String
      8432  IO
      1120  MatchData
       232  Hash
        80  Array
        40  Process::Status

45% less memory required.
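To illustrate the earlier note about calling IO.popen without a block, here is a small sketch of both forms. The child command is a hypothetical stand-in (the current Ruby binary printing two lines), chosen only so the example does not depend on any particular shell tool:

```ruby
require 'rbconfig'

# Hypothetical child process: prints two lines and exits.
command = [RbConfig.ruby, '-e', 'puts "one"; puts "two"']

# Block form: the pipe is closed and the child is waited for when the block
# returns, so $? is already set right after the call.
lines = IO.popen(command) { |io| io.readlines(chomp: true) }
block_status = $?

# Block-less form: popen returns right away with an open IO object.
# Closing it (which also waits for the child) is our responsibility.
io = IO.popen(command)
begin
  first_line = io.gets
ensure
  io.close
end

puts lines.inspect
puts first_line
```

The ensure clause matters in the block-less form: without it, an exception raised while reading would leave the pipe open and the child unreaped.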

The best way (for some cases)

There's also one more way to do the same with the same #popen but in a slightly different style. If you:

  • Don't need to process all the lines from the executed command
  • Can terminate subprocess early
  • Are aware of how to manage subprocesses

you can then stream data into Ruby for as long as you need and terminate the subprocess once you're done. That way Ruby won't fetch new lines and won't have to GC them later on.

require 'memory_profiler'
report = MemoryProfiler.report do
  pattern = /test/
  selection = []
  run = true

  io = IO.popen('find / 2> /dev/null')

  while (run && line = io.gets) do
    selection << line if line =~ pattern

    # Stop the subprocess as soon as we have the 10 matches we need
    if selection.size >= 10
      Process.kill('TERM', io.pid)
      io.close
      run = false
    end
  end

  selection
end

report.pretty_print

Since we don't wait for the subprocess to finish, it will definitely be faster, but what about memory consumption?

Total allocated: 509989 bytes (5613 objects)
Total retained:  448 bytes (4 objects)

allocated memory by gem
-----------------------------------
    509989  other

allocated memory by class
-----------------------------------
    499965  String
      8432  IO
      1120  MatchData
       232  Hash
       200  Array
        40  Process::Status

Over 99.9% less than the original solution!

Note: This solution is not always applicable.
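A gentler variant of early termination, in the spirit of the break comment in the earlier IO.popen example, is to simply stop reading inside the block form. This is only a sketch under assumptions: the child below is a hypothetical stand-in for find (the current Ruby binary printing 100 000 lines), and it relies on the subprocess exiting once its output pipe is closed.

```ruby
require 'rbconfig'

# Hypothetical stand-in for a long-running command: every 7th line
# contains "test".
script = '100_000.times { |i| puts(i % 7 == 0 ? "test-#{i}" : "file-#{i}") }'
command = [RbConfig.ruby, '-e', script]

first_matches = IO.popen(command, err: File::NULL) do |io|
  # each_line without a block returns an Enumerator; lazy + first(10) stops
  # pulling lines from the pipe as soon as ten matches are collected.
  io.each_line(chomp: true).lazy.grep(/test/).first(10)
end
# Leaving the block closes the pipe and waits for the child, which fails on
# its next write to the closed pipe and exits.

puts first_matches.inspect
```

Passing the command as an array also bypasses the shell, so io.pid would refer to the command itself rather than to an intermediate shell process. Whether a given command copes well with being cut off like this is something you still need to test.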

Summary

The way you execute shell commands really depends on a few factors:

  • Do you need the output at all?
  • Do you need all the lines from the output at the same time?
  • Can you do other stuff and return once the data is ready?
  • Can you process partial data?
  • Can you terminate the subprocess early?

When your code steps outside of Ruby and executes shell commands, it is always good to ask yourself those questions. Sometimes stream processing can be introduced only while the system is being built, so it is really good to think about it before the implementation. In general, I would recommend considering streaming wherever we cannot reliably estimate the size of an external command's output. That way you won't be surprised when there is a lot more data than initially anticipated.

Note: Attentive readers will notice that I didn't benchmark the memory used by the subprocess. That is true; however, it was irrelevant to our case, as the shell command was exactly the same in all the cases.

Cover photo by: heiwa4126 on Creative Commons license.

Benchmarking Karafka – how does it handle multiple TCP connections

Recently I've released a Ruby Apache Kafka microframework; however, I don't expect anyone to use it without at least a bit of information on what it can do. Here are some measurements that I took.

How Karafka handles multiple TCP connections

Since listening to multiple topics requires multiple TCP connections, it is pretty obvious that in order to obtain decent performance, we are using threads (a process clustering feature is in progress). Each controller that you create could theoretically have its own thread and listen all the time. However, in a bigger application that could slow things down. That's why we introduced topic clustering. When you configure your Karafka application, you should specify the concurrency parameter:

class App < Karafka::App
  setup do |config|
    # Other config options
    config.max_concurrency = 10 # 10 threads max
  end
end

This is the maximum number of threads that will be used to listen for incoming messages. It is pretty simple when you have fewer controllers (topics) than threads - it will just use a single thread per topic. However, if you have more controllers than threads, several connections will be packed into a single thread (wrapped with Karafka::Connection::ThreadCluster). This is how it works when you have 2 threads and 4 controllers:

[Figure: clusters]

In general, it will distribute TCP connections across threads evenly. So, if you have 20 controllers and 5 threads, each thread will be responsible for checking 4 sockets, one after another. Since it won't do this simultaneously, Karafka will slow down. How much? It depends - if there's something on each of the topics - you will get around 24% (per controller) of the base performance out of each connection.
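The even distribution described above can be sketched as a simple round-robin assignment. This is only an illustration: the distribute helper is hypothetical and is not Karafka's actual Karafka::Connection::ThreadCluster code.

```ruby
# Hypothetical helper: assigns controllers to at most max_concurrency
# clusters in round-robin order, one cluster per thread.
def distribute(controllers, max_concurrency)
  # Never create more clusters than there are controllers.
  clusters_count = [max_concurrency, controllers.size].min
  clusters = Array.new(clusters_count) { [] }

  controllers.each_with_index do |controller, index|
    clusters[index % clusters_count] << controller
  end

  clusters
end

controllers = (1..20).map { |i| "controller_#{i}" }
clusters = distribute(controllers, 5)
puts clusters.map(&:size).inspect
```

With 20 controllers and 5 threads, each cluster ends up with 4 controllers, which matches the example in the paragraph above; with fewer controllers than threads, each controller gets a cluster of its own.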

Other things that have impact on the performance

When considering this framework's performance, you need to keep in mind that:

  • It is strongly dependent on what you do in your code
  • It depends also on Apache Kafka performance
  • Connection between Karafka and Redis (for Sidekiq) is a factor as well
  • All the benchmarks show the performance without any business logic
  • All the benchmarks show the performance without enqueuing to Sidekiq
  • It also depends on the type of infrastructure you run the benchmarks on
  • Message size is a factor as well (since messages get parsed as JSON by default)
  • Ruby version - I've been testing it on MRI (CRuby) 2.2.3 - Karafka does not yet work with other Ruby distributions (JRuby or Rubinius), but that should change when some of the dependencies stop using refinements

Benchmarking

Methodology

For each of the benchmarks, I measured the time taken to consume all the messages that were stored in Kafka. There was no business logic involved (just message processing by the framework). My local Kafka setup was a default one (no settings were changed), provided by these Docker containers.

I've tested up to 5 topics, each loaded with 1 000 000 messages. Since Karafka loads params lazily, the benchmark does not include the time needed to parse the messages. Parsing performance strongly depends on the parser you pick (JSON by default) and on message size. These benchmarks measure the maximum throughput that we can get while receiving messages.
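To make the measurement idea concrete, here is a rough sketch of how such a throughput number can be derived. It is not the harness used for these benchmarks: the in-memory messages array is a hypothetical stand-in for a Kafka topic, and the payloads are deliberately never parsed.

```ruby
require 'benchmark'

# Hypothetical stand-in for messages already stored in a topic.
messages = Array.new(100_000) { |i| %({"id":#{i}}) }
consumed = 0

# Time only the receive loop; the raw payload is never parsed, mirroring
# the lazy params behaviour described above.
elapsed = Benchmark.realtime do
  messages.each { |_raw| consumed += 1 }
end

throughput = consumed / elapsed
puts format('%d messages in %.4fs (%.0f msg/s)', consumed, elapsed, throughput)
```

The absolute number printed here says nothing about Karafka itself; the point is only that throughput is the message count divided by the wall-clock time of the receive loop.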

Note: all the benchmarking was performed on my Linux laptop with 16GB of RAM and a 4-core i7 processor. During the benchmarking I was performing other tasks that might have had a small impact on the overall results (although no heavy stuff).

1 thread

With a single thread it is pretty straightforward - the more controllers we have, the less we can process per controller. There's also a context switching overhead between controllers that consumes some of the power, allowing us to consume less and less. Switching between controllers seems to consume around 11% of a single controller's performance when we use more than 1 controller in a single-threaded application.

[Screenshot from 2015-11-02 17:50:46]

Context switching between controllers in a single thread will cost us around 1% of the general performance per additional controller (if you're eager to know what we're planning to do about it, scroll down to the summary). On one hand that is a lot; on the other, with a bigger application you should probably run Karafka in multithreaded mode anyway. That way context switching won't be as painful.

2 threads

[Screenshot from 2015-11-02 18:12:37]

General performance with 2 threads and 2 controllers shows that we're able to lower the impact of switching on the overall performance, gaining around 1.5-2k requests per second (overall).

3 threads

[Screenshot from 2015-11-02 18:23:13]

5 controllers with 3 threads vs 5 controllers with 1 thread: 7% better performance.

4 threads

[Screenshot from 2015-11-02 18:32:40]

5 threads

[Screenshot from 2015-11-02 18:33:33]

Benchmarks results

Summary

The overall performance of a single Karafka process is highly dependent on the way it is being used. Because of the GIL, when we receive data from sockets, we can only process incoming messages from a single socket at a time. So in general we're limited to around 30-33k requests per second per process. It means that the bigger the application gets, the slower it works (when we consider the performance per single controller). However, this is only valid when we assume that all the topics are always full of messages. Otherwise we don't process; we wait on IO, and while one thread waits, Ruby can process incoming messages in other threads. That's why it is worth starting Karafka with a decent concurrency level.
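The effect of waiting on IO can be seen in a small sketch. It uses sleep as a stand-in for a thread blocked on an idle socket (an assumption made for illustration; like blocking IO, sleep lets other Ruby threads run in the meantime) and has nothing Karafka-specific in it:

```ruby
require 'benchmark'

# Simulated wait on an idle socket, in seconds.
wait = 0.2

# One thread checking four idle sockets, one after another.
sequential = Benchmark.realtime { 4.times { sleep(wait) } }

# Four threads, each waiting on its own idle socket at the same time.
threaded = Benchmark.realtime do
  Array.new(4) { Thread.new { sleep(wait) } }.each(&:join)
end

puts format('sequential: %.2fs, threaded: %.2fs', sequential, threaded)
```

The threaded run should take roughly as long as a single wait, while the sequential one adds the waits up. Once every thread has real work to do instead of waiting, the GIL brings the two much closer together.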

How can we increase throughput for Karafka applications? Well, for now we can create multiple partitions for a single topic and spin up multiple Karafka processes. They will then load balance between partitions automatically. This solution has one downside: if we have only a few topics with multiple partitions and the rest with a single one, then some of the threads in Karafka won't perform any work. This will be fixed soon (we're already working on it), when we introduce Karafka process clustering. It will allow spinning up multiple Karafka processes (in a single cluster), each listening only to a given subset of controllers. That way the overall performance will increase significantly. But still, being able to perform 30k rq/s is not that bad, right? ;)

Copyright © 2023 Closer to Code
