Running with Ruby


The hidden cost of the Ruby 2.7 dot-colon method reference usage

Note: This case also applies to the “old” #method method usage. I mention it in the “dot-colon” context because the added syntax sugar would surely make this style of coding more common.

Note: This feature has been reverted. See details here: bugs.ruby-lang.org/issues/16275.

Note: The benchmarks and the optimization approach still apply to the #method method usage.


One of the features of the upcoming Ruby 2.7 that I find most interesting is the syntax sugar for the method reference. I like using the #method method together with the #then (#yield_self) method to compose several functions in a “pipeline”-like fashion. It’s particularly useful when you process data streams or build ETL pipelines.

Here’s an example of how you could use it:

class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Simulate a long-running data producing source with batches
# Builds a lot of stringified floats (unique within each batch)
stream = Array.new(10_000) do
  Array.new(100) { |i| "#{i}.#{i}" }
end

stream.each do |batch|
  batch
    .map(&Parse.:call)
    .map(&Normalize.:call)
    .map(&Transform.:call)
end

It’s nice, it’s clear, it’s short. So what is wrong with it?

Well, what is wrong is Ruby itself. Each time you reference a method using the #method method, Ruby gives you a new instance of the Method class, even when you’re fetching the same method of the same object. That’s not all! Since we’re using the & operator, each of the fetched method references is then converted into a Proc object using the #to_proc method.

nil.:nil?.object_id #=> 47141222633640
nil.:nil?.object_id #=> 47141222626280
nil.:nil?.object_id #=> 47141222541360

# In general
nil.:nil?.object_id == nil.:nil?.object_id #=> false
nil.:nil?.to_proc == nil.:nil?.to_proc #=> false
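Since the dot-colon syntax was reverted, the same behavior can be checked with the plain #method method on a released Ruby version. A minimal sketch (the variable names are only illustrative):

```ruby
# Two references to the very same method of the very same object
first  = nil.method(:nil?)
second = nil.method(:nil?)

# They describe the same method, so they compare as equal...
same_method = first == second

# ...but each #method call has built a separate Method instance
same_object = first.equal?(second)

# The & operator calls #to_proc on the Method, which builds a lambda Proc
as_proc = first.to_proc

puts "equal by value:    #{same_method}"
puts "identical objects: #{same_object}"
puts "proc is a lambda:  #{as_proc.lambda?}"
```

Equality by value is what makes the references interchangeable. The separate instances are what cost the allocations.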

It means that when you process a lot of data samples, you may spin up a lot of objects and pay a huge performance penalty, especially when you operate on a per-entity basis:

stream.each do |batch|
  batch.each do |message|
    message
      .then(&Parse.:call)
      .then(&Normalize.:call)
      .then(&Transform.:call)
  end
end

If you run the same logic as above, but written like this:

stream.each do |batch|
  batch.each do |message|
    Transform.call(
      Normalize.call(
        Parse.call(message)
      )
    )
  end
end

you end up having 12 million fewer objects and you will be able to run your code almost 10 times faster!
See for yourself:

require 'benchmark/ips'

GC.disable

class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Builds a lot of stringified floats (unique within each batch)
stream = Array.new(10_000) do
  Array.new(100) { |i| "#{i}.#{i}" }
end

Benchmark.ips do |x|
  x.config(time: 5, warmup: 1)

  x.report('std') do
    stream.each do |batch|
      batch.each do |message|
        Transform.call(
          Normalize.call(
            Parse.call(message)
          )
        )
      end
    end
  end

  # This case was pointed out by Vladimir Dementyev
  # See the comments for more details
  x.report('std-then') do
    stream.each do |batch|
      batch.each do |message|
        message.then do |message|
          Parse.call(message)
        end.then do |message|
          Normalize.call(message)
        end.then do |message|
          Transform.call(message)
        end
      end
    end
  end

  x.report('dot-colon') do
    stream.each do |batch|
      batch.each do |message|
        message
          .then(&Parse.:call)
          .then(&Normalize.:call)
          .then(&Transform.:call)
      end
    end
  end

  x.compare!
end

Results:

Warming up --------------------------------------
         std 1.000 i/100ms
    std-then 1.000 i/100ms
   dot-colon 1.000 i/100ms
Calculating -------------------------------------
         std 6.719 (± 0.0%) i/s - 34.000 in 5.060580s
    std-then 3.085 (± 0.0%) i/s - 16.000 in 5.187639s
   dot-colon 0.692 (± 0.0%) i/s -  4.000 in 5.824453s

Comparison:
         std: 6.7 i/s
    std-then: 3.1 i/s - 2.18x  slower
   dot-colon: 0.7 i/s - 9.70x  slower

The same goes for object allocations:

tao1 =  GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    Transform.call(
      Normalize.call(
        Parse.call(message)
      )
    )
  end
end

tao2 =  GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    message.then do |message|
      Parse.call(message)
    end.then do |message|
      Normalize.call(message)
    end.then do |message|
      Transform.call(message)
    end
  end
end

tao3 =  GC.stat[:total_allocated_objects]

stream.each do |batch|
  batch.each do |message|
    message
      .then(&Parse.:call)
      .then(&Normalize.:call)
      .then(&Transform.:call)
  end
end

tao4 =  GC.stat[:total_allocated_objects]

p "Std allocated: #{tao2 - tao1}"
p "Std-then allocated: #{tao3 - tao2}"
p "Dot-colon allocated: #{tao4 - tao3}"

Std allocated: 1
Std-then allocated: 2
Dot-colon allocated: 12000002
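The same kind of measurement can be reproduced on a released Ruby version by swapping the dot-colon syntax for #method. The sketch below is only an illustration: the Double class and the allocated_objects helper are made up for this example, and the exact numbers will differ between Ruby versions.

```ruby
class Double
  def self.call(number)
    number * 2
  end
end

# Counts objects allocated while the given block runs
def allocated_objects
  before = GC.stat(:total_allocated_objects)
  yield
  GC.stat(:total_allocated_objects) - before
end

iterations = 10_000

# Plain method calls on small integers
direct = allocated_objects do
  iterations.times { |i| Double.call(i) }
end

# A fresh method reference (and its Proc) on every iteration
referenced = allocated_objects do
  iterations.times { |i| i.then(&Double.method(:call)) }
end

puts "Direct calls allocated:      #{direct}"
puts "Method references allocated: #{referenced}"
```

The second number should grow with the number of iterations, while the first one should stay close to zero.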

So, shouldn’t we use the new feature (and method reference in general) at all? Not exactly. There are two things you need to do if you want to use it and not slow down your application that much.

Memoize your method references

Instead of fetching the method reference for each of the objects (or batches), fetch it once and re-use it:

parse = Parse.:call
normalize = Normalize.:call
transform = Transform.:call

stream.each do |batch|
  batch.each do |message|
    message
      .then(&parse)
      .then(&normalize)
      .then(&transform)
  end
end

This saves you from creating 3 million objects, but the code is still about 7 times slower than the base one.

Convert the memoized methods into procs

Since Ruby will do that for you anyhow (in a loop), why not be smarter and do it yourself, just once:

parse = Parse.:call.to_proc
normalize = Normalize.:call.to_proc
transform = Transform.:call.to_proc

stream.each do |batch|
  batch.each do |message|
    message
      .then(&parse)
      .then(&normalize)
      .then(&transform)
  end
end

This makes the code above only 2.5 times slower than the base one (which is usually fine), and at the same time it saves you almost all of the 12 million additional objects!
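With the dot-colon syntax reverted, both optimizations carry over unchanged to the #method method. A small self-contained sketch, using the same three classes as above and a tiny illustrative batch:

```ruby
class Parse
  def self.call(string)
    string.to_f
  end
end

class Normalize
  def self.call(number)
    number.round
  end
end

class Transform
  def self.call(int)
    int * 2
  end
end

# Fetch each reference once and convert it to a Proc once, outside of any loop
parse     = Parse.method(:call).to_proc
normalize = Normalize.method(:call).to_proc
transform = Transform.method(:call).to_proc

batch = %w[1.2 2.6 10.4]

results = batch.map do |message|
  message
    .then(&parse)
    .then(&normalize)
    .then(&transform)
end

p results #=> [2, 6, 20]
```

Because the & operator receives an object that is already a Proc, no new Method or Proc instances should be needed inside the loop.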

Dot-colon and the method reference further development

Some of you might know that I’ve been involved a bit in this feature. I proposed and submitted a patch that makes the .: Method object frozen. It may not seem like much, but because a frozen method object is immutable, it leaves the door open for introducing method reference caching, should it be needed.

This proposal was the outcome of my discussion with Ko1 and Matz this summer in Bristol. When using the #method method (not the syntax sugar), the Method instance is not frozen, due to backwards compatibility (which I hope will be broken in this case). However, the .: one will be. It’s a rare corner case (why would you even want to mutate an object like that?), but it does create a funny “glitch”:

nil.:nil? == nil.method(:nil?) #=> true
nil.:nil?.frozen? #=> true
nil.method(:nil?).frozen? #=> false

Note: I’m planning to work on adding the last-method cache after 2.7 is released and once I’m able to statistically justify that the majority of cases look like those presented above.


Cover photo by Rahel Samanyi, used under the Attribution 2.0 Generic (CC BY 2.0) license.

Karafka framework 1.3.0 Release Notes (Ruby + Kafka)

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

TL;DR

If you would prefer to see the changes in the code, here’s the upgrade PR from the example app.

Note: Changes above don’t include Zeitwerk setup for your non-Rails projects. See this commit for details on how to replace Karafka::Loader with Zeitwerk.

Note: If you use the Sidekiq backend, keep in mind that before an upgrade, you need to consume all of the messages that are already in Redis.

Note: This release is the last release with ruby-kafka under the hood. We’ve already started the process of moving to rdkafka-ruby.

Changes (features, incompatibilities, etc)

Auto-reload of code changes in development

Up until now, in order to see your code changes within the Karafka process, you had to restart it. That was really cumbersome, as for bigger and more complex Kafka clusters a restart with reconnections and rebalancing could take a significant amount of time. Fortunately, those times are gone!

All you need to do is add this part of the code before App.boot in your karafka.rb file:

# For non-Rails app with Zeitwerk loader
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      APP_LOADER
    )
  )
end

# Or for Ruby on Rails
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    )
  )
end

and your code changes will be applied after each message/messages batch fetch.

Keep in mind though, that there are a couple of limitations to it:

  • Changes in the routing are NOT reflected. This would require reconnections and would drastically complicate reloading.
  • Any background work that you run outside of the Karafka framework but still within the same process might not be caught in the reloading.
  • If you use in-memory consumer data buffering that spans across multiple batches (or messages in a single message fetch mode), it WON’T work, as code reload means re-initializing all of the consumer instances. In cases like that, you will be better off not using the reload mode at all.

It is also worth pointing out that if you have code that should be re-initialized in any way during the reload phase, you can pass it as a block to the Karafka::CodeReloader initializer:

if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    ) { Dry::Events::Publisher.registry.clear }
  )
end

Parsers are now Deserializers in the routing and accept the whole Karafka::Params::Params object

Parsers, as a concept responsible for both serialization and deserialization of data, violated SRP (see details here). From now on, serializers and deserializers are separate entities that you can use independently. For the upgrade, just rename parser to deserializer for each topic you’re using in the routes:

App.consumer_groups.draw do
  consumer_group :batched_group do
    batch_fetching true

    topic :xml_data do
      consumer XmlMessagesConsumer
      batch_consuming false
      # parser XmlDeserializer.new
      deserializer XmlDeserializer.new
    end
  end
end

and make sure you extract the payload of the message yourself:

class XmlDeserializer
  # @param params [Karafka::Params::Params] params to de-serialize
  # @return [Hash] deserialized xml
  # @example For params with a '<node>n</node>' payload
  #   XmlDeserializer.new.call(params)
  def call(params)
    ::Hash.from_xml(params.payload)
  end
end

For a justification of this change, please refer to this pull request.
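To show the new contract in isolation, here is a minimal sketch of a JSON deserializer that can be run without Karafka. FakeParams is a hypothetical stand-in for Karafka::Params::Params, made up for this example. The only assumption it encodes is the one described above: the deserializer receives an object that exposes the raw message under #payload.

```ruby
require 'json'

# Hypothetical stand-in for Karafka::Params::Params, used only in this sketch.
# The real object carries much more; here only #payload matters.
FakeParams = Struct.new(:payload)

class JsonDeserializer
  # @param params [#payload] object exposing the raw message under #payload
  # @return [Hash, Array] deserialized JSON
  def call(params)
    ::JSON.parse(params.payload)
  end
end

params = FakeParams.new('{"login":"maciek","id":"1"}')
data = JsonDeserializer.new.call(params)

puts data['login'] #=> maciek
```

Since the deserializer only relies on #payload, it can be unit tested with any object that responds to that method.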

Zeitwerk in favor of Karafka::Loader

Note: You can skip this section if you use Karafka with Ruby on Rails.

We aren’t the best at loading things. Zeitwerk is. That’s why we’ve dropped our custom loader in favor of it.

Just load your app code in your karafka.rb file before configuring the app and you should be ready to go:

APP_LOADER = Zeitwerk::Loader.new

%w[
  lib
  app/consumers
  app/responders
  app/workers
].each(&APP_LOADER.method(:push_dir))

APP_LOADER.setup
APP_LOADER.eager_load

class App < Karafka::App
  # config here...
end

Don’t forget to eager_load the code or some of the Karafka components might not work as expected.

Message payload now available under the ‘payload’ key without root merge

This is probably the biggest change in this release.

Up until now, your data when received was available in the root scope of each params instance in the #params_batch.

It means that when you sent a message as follows:

WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users'
)

you would access it like so:

def consume
  params_batch.each do |params|
    puts "Hello #{params['login']}!\n"
  end
end

Karafka used to merge your data directly into the Karafka::Params::Params object root scope. That was convenient, but not flexible enough. There are some metadata details in the root params scope that could get overwritten. On top of that, if you sent something other than a JSON hash, let’s say an array, you would get an exception and would have to use a custom parser to bypass that (see this FAQ question).

Due to that and in order to better separate your incoming data from the rest of the payload (headers, metadata information, etc), from now on, all of your data will be available under the payload params key:

def consume
  params_batch.each do |params|
    puts "Hello #{params['payload']['login']}!\n"
    # or
    puts "Hello #{params.payload['login']}!\n"
  end
end

The same applies to the case when you want to access unparsed data:

def consume
  params_batch.to_a.each do |params|
    puts "Unparsed details: #{params['payload']}"
  end
end
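The two problems with the old root merge can be illustrated with plain hashes. This is only a sketch of the idea, not Karafka’s internal code: the metadata and payload hashes below are made-up stand-ins for the params object and the incoming message.

```ruby
# Made-up stand-ins for the params metadata and the incoming message
metadata = { 'topic' => 'users', 'partition' => 0 }
payload  = { 'login' => 'maciek', 'topic' => 'admins' }

# Old approach: merging into the root scope lets the payload
# silently overwrite a metadata key with the same name
merged = metadata.merge(payload)
puts merged['topic'] #=> admins

# New approach: the payload lives under its own key, so nothing collides
nested = metadata.merge('payload' => payload)
puts nested['topic']            #=> users
puts nested['payload']['topic'] #=> admins

# A non-hash payload cannot be merged into the root scope at all
array_error =
  begin
    metadata.merge(%w[a b])
    nil
  rescue TypeError => e
    e.class
  end

puts array_error #=> TypeError
```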

Metadata support

When in the batch_fetching mode, additional information is received while fetching data from the Kafka cluster. These details are available using the #metadata consumer method:

class UsersConsumer < ApplicationConsumer
  def consume
    puts metadata
    #=> { batch_size: 200, topic: 'events', partition: 2 }
  end
end

Message headers support

In most message systems (JMS, QPID, etc.), streaming systems and most transport systems (HTTP, TCP), it is typical to have a concept of headers and payload.

The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering, etc. Headers are most typically key=value pairs.

Both WaterDrop and Karafka now support message headers.

WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users',
  headers: { event: 'created' }
)

# Karafka consumer
def consume
  puts params_batch.last.headers #=> { 'event' => 'created' }
end

RSpec helpers for much easier consumers testing

Up until now, in order to test consumers, you would have to know the internal format in which Karafka stores Kafka messages. That is no longer true!

We’ve created a new library called Karafka-Testing that provides you with all the methods you need to spec out your consumers much more easily.

Installation

Add this gem to your Gemfile in the test group:

group :test do
  gem 'karafka-testing'
  gem 'rspec'
end

and then in your spec_helper.rb file:

require 'karafka/testing/rspec/helpers'

RSpec.configure do |config|
  config.include Karafka::Testing::RSpec::Helpers
end

Usage

Once included in your RSpec setup, this library will provide you with two methods that you can use in your specs:

#karafka_consumer_for – this method will create a consumer instance for the desired topic. It needs to be set as the spec subject.
#publish_for_karafka – this method will “send” a message to the consumer instance.

Note: Messages sent using the #publish_for_karafka method won’t be sent to Kafka. They will be “virtually” delegated to the created consumer instance so your specs can run without a Kafka setup.

RSpec.describe InlineBatchConsumer do
  # This will create a consumer instance with all the
  # settings defined for the given topic
  subject(:consumer) do
    karafka_consumer_for(:inline_batch_data)
  end

  let(:nr1_value) { rand }
  let(:nr2_value) { rand }
  let(:sum) { nr1_value + nr2_value }

  before do
    # Sends first message to Karafka consumer
    publish_for_karafka({ 'number' => nr1_value }.to_json)
    # Sends second message to Karafka consumer
    publish_for_karafka({ 'number' => nr2_value }.to_json)
    allow(Karafka.logger).to receive(:info)
  end

  it 'expects to log a proper message' do
    expect(Karafka.logger)
      .to receive(:info).with(
        "Sum of 2 elements equals to: #{sum}"
      )
    consumer.consume
  end
end

Instrumentation unification

We’ve made some small changes to the default listener and to the names of the events that are published during the Karafka runtime flow execution. Please see this commit for more details.

Karafka::Instrumentation::Listener is now Karafka::Instrumentation::StdoutListener.

There has been a rename and a switch to an instantiation version of this listener.

# Old
# Karafka.monitor.subscribe(
#   Karafka::Instrumentation::Listener
# )

# New
Karafka.monitor.subscribe(
  Karafka::Instrumentation::StdoutListener.new
)

Karafka::Instrumentation::ProctitleListener has been added.

A new instrumentation listener called Karafka::Instrumentation::ProctitleListener has been added. Its purpose is to provide you with a nicer proc title with a descriptive value. In order to use it, please put the following line in your karafka.rb boot file:

Karafka.monitor.subscribe(
  Karafka::Instrumentation::ProctitleListener.new
)

mark_as_consumed divided into mark_as_consumed and mark_as_consumed!

The blocking #mark_as_consumed method has been split into two:

  • #mark_as_consumed – for a non-blocking eventual offset commitment.
  • #mark_as_consumed! – for a blocking offset commitment that will stop the processing flow to ensure that the offset has been stored.

#payloads for params_batch to extract only payload of objects from the params_batch

If you are not interested in the additional #params metadata, you can use the #payloads method to access only the deserialized payloads of the Kafka messages:

class EventsConsumer < ApplicationConsumer
  def consume
    EventStore.store params_batch.payloads
  end
end

Single consumer class supports more than one topic

From now on, you are able to use the same consumer class for multiple topics:

App.consumer_groups.draw do
  consumer_group :default do
    topic :users do
      consumer UsersConsumer
    end

    topic :admins do
      consumer UsersConsumer
    end
  end
end

Note: You will still have a separate instance for each topic partition.

Delayed re-connection upon critical failures

If a critical failure occurs (network disconnection or anything similar), Karafka will back off and wait for reconnect_timeout (defaults to 10s) before attempting to reconnect. This should prevent you from being flooded with errors and logs upon serious problems.

Support for Kafka 0.10 dropped in favor of native support for Kafka 0.11

Support for Kafka 0.10 has been dropped. Weird things may happen if you decide to use Kafka 0.10 with Karafka 1.3 so just upgrade.

Reorganized responders – multiple_usage constraint no longer available

multiple_usage has been removed. Responders won’t raise any exception if you decide to send multiple messages to the same topic without declaring that. This feature was a bad idea and was creating a lot of trouble when using responders in long-running, batch-like flows.

The following code would raise a Karafka::Errors::InvalidResponderUsageError error in Karafka 1.2 but will continue to run in Karafka 1.3:

class ExampleResponder < ApplicationResponder
  topic :regular_topic

  def respond(user, profile)
    respond_to :regular_topic, user
    respond_to :regular_topic, user
  end
end

Exceptions names standardization

All the Karafka internal framework exception names now end with an Error suffix. Please see this file for the whole list of exceptions.

Default fetcher_max_queue_size changed from 100 to 10 to lower max memory usage

While Karafka is processing, ruby-kafka prebuffers more data under the hood in a separate thread. If you have a big consumer lag, this can cause your Karafka process to prebuffer hundreds or more megabytes of data upfront. Lowering the queue size makes Karafka more predictable by default.

Documentation

Our Wiki has been updated to reflect the 1.3 state. Please notify us if you find any incompatibilities.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.


Copyright © 2019 Running with Ruby
