Running with Ruby


Karafka framework 1.4.0 Release Notes (Ruby + Kafka)

This release mostly solves problems related to message deserialization and normalizes some of the naming conventions to ease the upgrade to the upcoming 2.0 version.

Note: This release is the last release with ruby-kafka under the hood. We’ve already started the process of moving to rdkafka-ruby.

Note: If you are using Sidekiq-Backend plugin, please make sure that you’ve processed all the jobs from your Sidekiq queue before upgrading Karafka gems.

Changes (features, incompatibilities, etc)

Consumer#metadata is now Consumer#batch_metadata

This change is trivial: if you use batch consuming mode and you use the Consumer#metadata method, replace it with Consumer#batch_metadata.

# Karafka 1.3
class UsersConsumer < ApplicationConsumer
  def consume
    puts metadata
  end
end

# Karafka 1.4
class UsersConsumer < ApplicationConsumer
  def consume
    puts batch_metadata
  end
end

Message metadata available under #metadata method

Up to version 1.3, all the message metadata was directly available in the root scope of the params object, both through direct method calls and through the #[] accessor.

While it felt like “The Rails way”, it had several side effects. The biggest were the need to maintain a hash-like API, issues with accessing metadata without deserializing the payload, and a lack of clear separation between the payload and the metadata.

From now on, you can use the params.metadata object to fetch all the metadata.

Note: to preserve backwards compatibility, metadata values can still be fetched with direct method calls on the params object. The #[] accessor on params is no longer available.

# 1.3
params['partition'] #=> 0
params.partition #=> 0

# 1.4
params['partition'] #=> NoMethodError (undefined method '[]')

# This will work due to backward compatibility
params.partition #=> 0

# This is the recommended way of accessing metadata
params.metadata.partition #=> 0

# This will also work as metadata is a struct now
params.metadata[:partition] #=> 0
params.metadata['partition'] #=> 0
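The reason both the Symbol and the String form work is plain Ruby Struct behavior. Here is a minimal sketch using a made-up DemoMetadata struct (an assumption for illustration, not Karafka's actual metadata class):

```ruby
# Hypothetical stand-in for a metadata struct; not Karafka's actual class
DemoMetadata = Struct.new(:topic, :partition, :offset, keyword_init: true)

metadata = DemoMetadata.new(topic: 'users', partition: 0, offset: 42)

puts metadata.partition    # reader method generated by Struct
puts metadata[:partition]  # Struct#[] with a Symbol
puts metadata['partition'] # Struct#[] with a String
```

All three calls read the same member, so existing code that relied on hash-style access to metadata only needs to go through the metadata object.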

Message metadata access allowed without message deserialization

When you access metadata, the payload is not deserialized until the #payload method is called.
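To illustrate the idea, here is a minimal sketch of lazy payload deserialization. LazyMessage and its deserialized? helper are hypothetical names invented for this example; Karafka's internals may look different:

```ruby
require 'json'

# Hypothetical illustration only; these are not Karafka's real classes
class LazyMessage
  attr_reader :metadata

  def initialize(raw_payload, metadata, deserializer)
    @raw_payload = raw_payload
    @metadata = metadata
    @deserializer = deserializer
    @deserialized = false
  end

  # Deserializes on first access and caches the result (even when it is nil)
  def payload
    return @payload if @deserialized

    @payload = @deserializer.call(@raw_payload)
    @deserialized = true
    @payload
  end

  def deserialized?
    @deserialized
  end
end

message = LazyMessage.new('{"id":1}', { partition: 0 }, JSON.method(:parse))
message.metadata[:partition] # metadata access leaves the payload untouched
puts message.deserialized?   # nothing has been parsed yet
message.payload              # the first call triggers JSON parsing
puts message.deserialized?
```

With this shape, code that only routes or filters by metadata never pays the cost of parsing the payload.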

null message support in the default JSON deserializer

When the Kafka message payload is null / nil, deserialization won’t fail. Support for it was added because some Karafka users rely on log compaction with a nil payload. In a case like that, #payload will return nil.
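A nil-tolerant deserializer can be sketched in a few lines. NilSafeJsonDeserializer is a hypothetical name used only here; it shows the behavior described above, not Karafka's own implementation:

```ruby
require 'json'

# Hypothetical deserializer; Karafka's own implementation may differ
class NilSafeJsonDeserializer
  def call(raw_payload)
    # Messages with a nil payload are passed through instead of being parsed
    return nil if raw_payload.nil?

    JSON.parse(raw_payload)
  end
end

deserializer = NilSafeJsonDeserializer.new
p deserializer.call(nil)
p deserializer.call('{"id":1}')
```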

Karafka::Params::Params no longer inherits from a Hash

Karafka::Params::Params is now just a struct. This change is introduced to normalize the setup, limit the corner cases and simplify the interface only to methods that are really needed.

Documentation

Our Wiki has been updated to reflect the 1.4 status. Please notify us if you find any incompatibilities.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.

The hidden cost of the Ruby 2.7 dot-colon method reference usage

Note: This case also applies to the “old” #method method. I mention it in the “dot-colon” context because, with the syntax sugar in place, this style of coding will surely be used more often.

Note: This feature has been reverted. See details here: bugs.ruby-lang.org/issues/16275.

Note: The benchmarks and the optimization approach still apply to the #method method usage.


For me, one of the most interesting features of the upcoming Ruby 2.7 is the syntax sugar for method references. I like using the #method method together with #then (#yield_self) to compose several functions in a pipeline-like fashion. It’s particularly useful when you process data streams or build ETL pipelines.

Here’s an example of how you could use it:

class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Simulate a long-running data producing source with batches
# Builds a lot of stringified floats (unique within each batch)
stream = Array.new(10_000) do
  Array.new(100) { |i| "#{i}.#{i}" }
end

stream.each do |batch|
  batch
    .map(&Parse.:call)
    .map(&Normalize.:call)
    .map(&Transform.:call)
end
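Since the .: syntax never made it into a released Ruby, the same pipeline can be written with Object#method. The sketch below is an equivalent that runs on current Rubies; run_pipeline is a helper name made up for this example, and it returns the mapped batches so the result is visible:

```ruby
class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Hypothetical helper: runs every batch through the three steps,
# using Object#method instead of the reverted .: syntax
def run_pipeline(stream)
  stream.map do |batch|
    batch
      .map(&Parse.method(:call))
      .map(&Normalize.method(:call))
      .map(&Transform.method(:call))
  end
end

p run_pipeline([%w[0.0 1.1], %w[2.2 7.7]]) # => [[0, 2], [4, 16]]
```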

It’s nice, it’s clear, it’s short. So what is wrong with it?

Well, what is wrong is Ruby itself. Each time you reference a method using the #method method, Ruby gives you a new instance of the Method class, even when you’re fetching the same method of the same object. That’s not all! Since we’re using the & operator, each of the fetched method references is then converted into a Proc object using the #to_proc method.

nil.:nil?.object_id #=> 47141222633640
nil.:nil?.object_id #=> 47141222626280
nil.:nil?.object_id #=> 47141222541360

# In general
nil.:nil?.object_id == nil.:nil?.object_id #=> false
nil.:nil?.to_proc == nil.:nil?.to_proc #=> false
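The same behavior can be checked on any released Ruby with the #method method. A small sketch, where same_method_object? is a helper name invented for this example:

```ruby
# Hypothetical helper: true only if both lookups return the very same object
def same_method_object?(receiver, name)
  receiver.method(name).equal?(receiver.method(name))
end

first = nil.method(:nil?)
second = nil.method(:nil?)

p first == second                  # Method#== compares receiver and definition
p same_method_object?(nil, :nil?)  # identity check on two separate lookups
p first.to_proc.equal?(first.to_proc) # identity check on two conversions
```

The first line reports equality because both objects describe the same method on the same receiver, while the identity check shows that each lookup still hands back a separate Method instance.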

It means that when you process a lot of data samples, you may spin up a lot of objects and pay a huge performance penalty, especially when you operate on a per-entity basis:

stream.each do |batch|
  batch.each do |message|
    message
      .then(&Parse.:call)
      .then(&Normalize.:call)
      .then(&Transform.:call)
  end
end

If you run the same code as above, but in a way like this:

stream.each do |batch|
  batch.each do |message|
    Transform.call(
      Normalize.call(
        Parse.call(message)
      )
    )
  end
end

you end up having 12 million fewer objects and you will be able to run your code almost 10 times faster!
See for yourself:

require 'benchmark/ips'

GC.disable

class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Builds a lot of stringified floats (unique within each batch)
stream = Array.new(10_000) do
  Array.new(100) { |i| "#{i}.#{i}" }
end

Benchmark.ips do |x|
  x.config(time: 5, warmup: 1)

  x.report('std') do
    stream.each do |batch|
      batch.each do |message|
        Transform.call(
          Normalize.call(
            Parse.call(message)
          )
        )
      end
    end
  end

  # This case was pointed out by Vladimir Dementyev
  # See the comments for more details
  x.report('std-then') do
    stream.each do |batch|
      batch.each do |message|
        message.then do |message|
          Parse.call(message)
        end.then do |message|
          Normalize.call(message)
        end.then do |message|
          Transform.call(message)
        end
      end
    end
  end

  x.report('dot-colon') do
    stream.each do |batch|
      batch.each do |message|
        message
          .then(&Parse.:call)
          .then(&Normalize.:call)
          .then(&Transform.:call)
      end
    end
  end

  x.compare!
end

Results:

Warming up --------------------------------------
         std 1.000 i/100ms
    std-then 1.000 i/100ms
   dot-colon 1.000 i/100ms
Calculating -------------------------------------
         std 6.719 (± 0.0%) i/s - 34.000 in 5.060580s
    std-then 3.085 (± 0.0%) i/s - 16.000 in 5.187639s
   dot-colon 0.692 (± 0.0%) i/s -  4.000 in 5.824453s

Comparison:
         std: 6.7 i/s
    std-then: 3.1 i/s - 2.18x  slower
   dot-colon: 0.7 i/s - 9.70x  slower

Same for the allocation of the objects:

tao1 = GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    Transform.call(
      Normalize.call(
        Parse.call(message)
      )
    )
  end
end

tao2 = GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    message.then do |message|
      Parse.call(message)
    end.then do |message|
      Normalize.call(message)
    end.then do |message|
      Transform.call(message)
    end
  end
end

tao3 = GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    message
      .then(&Parse.:call)
      .then(&Normalize.:call)
      .then(&Transform.:call)
  end
end

tao4 = GC.stat[:total_allocated_objects]

puts "Std allocated: #{tao2 - tao1}"
puts "Std-then allocated: #{tao3 - tao2}"
puts "Dot-colon allocated: #{tao4 - tao3}"

Results:

Std allocated: 1
Std-then allocated: 2
Dot-colon allocated: 12000002

So, shouldn’t we use the new feature (and method reference in general) at all? Not exactly. There are two things you need to do if you want to use it and not slow down your application that much.

Memoize your method references

Instead of fetching the method reference for each of the objects (or batches), fetch it once and re-use:

parse = Parse.:call
normalize = Normalize.:call
transform = Transform.:call

stream.each do |batch|
  batch.each do |message|
    message
      .then(&parse)
      .then(&normalize)
      .then(&transform)
  end
end

This will save you from creating 3 million objects, although your code will still be about 7 times slower than the base one.

Convert the memoized methods into procs

Since Ruby will do that for you anyhow (on every iteration of the loop), why not be smarter and do it yourself, just once:

parse = Parse.:call.to_proc
normalize = Normalize.:call.to_proc
transform = Transform.:call.to_proc

stream.each do |batch|
  batch.each do |message|
    message
      .then(&parse)
      .then(&normalize)
      .then(&transform)
  end
end

This will make the code above only 2.5 times slower than the base one (usually it’s fine), and at the same time, it will save you almost all of the 12 million additional objects!
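Both optimizations can be tried on a released Ruby by swapping .: for the #method method. The sketch below counts allocations for the three variants. The allocations and compare_allocations helpers are names made up for this example, and the exact numbers will vary between Ruby versions, so treat the output as indicative only:

```ruby
class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Hypothetical helper: number of objects allocated while the block runs
def allocations
  before = GC.stat(:total_allocated_objects)
  yield
  GC.stat(:total_allocated_objects) - before
end

# Hypothetical helper: allocation counts for the three pipeline styles
def compare_allocations(messages)
  parse_m = Parse.method(:call)
  normalize_m = Normalize.method(:call)
  transform_m = Transform.method(:call)

  parse_p = parse_m.to_proc
  normalize_p = normalize_m.to_proc
  transform_p = transform_m.to_proc

  per_call = allocations do
    messages.each do |m|
      m.then(&Parse.method(:call))
       .then(&Normalize.method(:call))
       .then(&Transform.method(:call))
    end
  end

  memoized_method = allocations do
    messages.each { |m| m.then(&parse_m).then(&normalize_m).then(&transform_m) }
  end

  memoized_proc = allocations do
    messages.each { |m| m.then(&parse_p).then(&normalize_p).then(&transform_p) }
  end

  { per_call: per_call, memoized_method: memoized_method, memoized_proc: memoized_proc }
end

messages = Array.new(1_000) { |i| "#{i}.#{i}" }
p compare_allocations(messages)
```

The counts should drop at each step: fetching the method on every call is the most expensive, memoized Method objects still get converted to a Proc on every call, and memoized procs are passed through as they are.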

Dot-colon and the method reference further development

Some of you might know that I’ve been involved a bit in this feature. I proposed and submitted a patch that makes the Method object returned by .: frozen. It may not seem like much, but because a frozen method object is immutable, freezing keeps the door open for introducing method reference caching should it be needed.

This proposal came out of my discussion with Ko1 and Matz this summer in Bristol. When using the #method method (not the syntax sugar), the Method instance is not frozen due to backwards compatibility (which I hope will be broken in this case). However, the one returned by .: will be. It’s a rare corner case (why would you even want to mutate an object like that?), but it does create a funny “glitch”:

nil.:nil? == nil.method(:nil?) #=> true
nil.:nil?.frozen? #=> true
nil.method(:nil?).frozen? #=> false

Note: I’m planning to work on adding the last-method cache after 2.7 is released and once I’m able to statistically justify that the majority of cases look like those presented above.


Cover photo by Rahel Samanyi, used under the Attribution 2.0 Generic (CC BY 2.0) license.


Copyright © 2020 Running with Ruby
