Karafka framework 1.3.0 Release Notes (Ruby + Kafka)

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

TL;DR

If you would prefer to see the changes in the code, here's the upgrade PR from the example app.

Note: Changes above don't include Zeitwerk setup for your non-Rails projects. See this commit for details on how to replace Karafka::Loader with Zeitwerk.

Note: If you use Sidekiq backend, keep in mind that before an upgrade, you need to consume all of the messages that are already in Redis.

Changes (features, incompatibilities, etc)

Auto-reload of code changes in development

Up until now, in order to see your code changes within the Karafka process, you would have to restart it. That was really cumbersome because, for bigger and more complex Kafka clusters, a restart with reconnections and rebalancing could take a significant amount of time. Fortunately, those times are gone!

All you need to do is enable this part of the code before App.boot in your karafka.rb file:

# For non-Rails app with Zeitwerk loader
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      APP_LOADER
    )
  )
end

# Or for Ruby on Rails
if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    )
  )
end

and your code changes will be applied after each message or message batch fetch.

Keep in mind, though, that there are a couple of limitations:

  • Changes in the routing are NOT reflected. This would require reconnections and would drastically complicate reloading.
  • Any background work that you run outside of the Karafka framework, but still within the same process, might not be caught by the reloading.
  • If you use in-memory consumer data buffering that spans across multiple batches (or messages, in a single message fetch mode), it WON'T work, as a code reload means re-initializing all of the consumer instances. In cases like that, you will be better off not using the reload mode at all.

It is also worth pointing out that if you have code that should be re-initialized in any way during the reload phase, you can pass it to the Karafka::CodeReloader initializer:

if Karafka::App.env.development?
  Karafka.monitor.subscribe(
    Karafka::CodeReloader.new(
      *Rails.application.reloaders
    ) { Dry::Events::Publisher.registry.clear }
  )
end

Parsers are now Deserializers in the routing and accept the whole Karafka::Params::Params object

Parsers, as a concept responsible for both serialization and deserialization of data, violated SRP (see details here). From now on, these are separate entities that you can use independently. For the upgrade, just rename parser to deserializer for each topic you're using in the routes:

App.consumer_groups.draw do
  consumer_group :batched_group do
    batch_fetching true

    topic :xml_data do
      consumer XmlMessagesConsumer
      batch_consuming false
      # parser XmlDeserializer.new
      deserializer XmlDeserializer.new
    end
  end
end

and make sure you extract the payload of the message yourself:

class XmlDeserializer
  # @param params [Karafka::Params::Params] params to de-serialize
  # @return [Hash] deserialized xml
  # @example
  #   XmlDeserializer.new.call(params) #=> { 'node' => 'n' }
  def call(params)
    ::Hash.from_xml(params.payload)
  end
end

For a justification of this change, please refer to this pull request.

Zeitwerk in favor of Karafka::Loader

Note: You can skip this section if you use Karafka with Ruby on Rails.

We aren't the best at loading things. Zeitwerk is. That's why we've dropped our custom loader in favor of it.

Just load your app code in your karafka.rb file before configuring the app and you should be ready to go:

APP_LOADER = Zeitwerk::Loader.new

%w[
  lib
  app/consumers
  app/responders
  app/workers
].each(&APP_LOADER.method(:push_dir))

APP_LOADER.setup
APP_LOADER.eager_load

class App < Karafka::App
  # config here...
end

Don't forget to eager_load the code or some of the Karafka components might not work as expected.

Message payload now available under the 'payload' key without root merge

This is probably the biggest change in this release.

Up until now, your received data was available in the root scope of each params instance in the #params_batch.

It means that when you sent a message as follows:

WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users'
)

you would access it like so:

def consume
  params_batch.each do |params|
    puts "Hello #{params['login']}!\n"
  end
end

Karafka used to merge your data directly into the Karafka::Params::Params object root scope. That was convenient, but not flexible enough. There are some metadata details in the root params scope that could get overwritten, plus if you sent something other than a JSON hash, let's say an array, you would get an exception and would have to use a custom parser to bypass that (see this FAQ question).

Due to that and in order to better separate your incoming data from the rest of the payload (headers, metadata information, etc), from now on, all of your data will be available under the payload params key:

def consume
  params_batch.each do |params|
    puts "Hello #{params['payload']['login']}!\n"
    # or
    puts "Hello #{params.payload['login']}!\n"
  end
end

The same applies to the case when you want to access unparsed data:

def consume
  params_batch.to_a.each do |params|
    puts "Unparsed details: #{params['payload']}"
  end
end

Metadata support

When in the batch_fetching mode, while fetching data from the Kafka cluster, additional information is received. These details are available using the #metadata consumer method:

class UsersConsumer < ApplicationConsumer
  def consume
    puts metadata
    #=> { batch_size: 200, topic: 'events', partition: 2 }
  end
end

Message headers support

In most message systems (JMS, QPID, etc.), streaming systems and most transport systems (HTTP, TCP), it is typical to have a concept of headers and payload.

The payload is traditionally for the business object, and headers are traditionally used for transport routing, filtering etc. Headers are most typically key=value pairs.

Both WaterDrop and Karafka now support message headers:

WaterDrop::SyncProducer.call(
  { login: 'maciek', id: '1' },
  topic: 'users',
  headers: { event: 'created' }
)
# Karafka consumer
def consume
  puts params_batch.last.headers #=> { 'event' => 'created' }
end

RSpec helpers for much easier consumers testing

Up until now, in order to test consumers, you would have to know the internal format in which Karafka stores Kafka messages. That is no longer true!

We've created a new library called Karafka-Testing, that will provide you with all the methods to spec out your consumers much easier.

Installation

Add this gem to your Gemfile in the test group:

group :test do
  gem 'karafka-testing'
  gem 'rspec'
end

and then in your spec_helper.rb file:

require 'karafka/testing/rspec/helpers'

RSpec.configure do |config|
  config.include Karafka::Testing::RSpec::Helpers
end

Usage

Once included in your RSpec setup, this library will provide you with two methods that you can use in your specs:

- #karafka_consumer_for - this method will create a consumer instance for the desired topic. It needs to be set as the spec subject.
- #publish_for_karafka - this method will "send" a message to the consumer instance.

Note: Messages sent using the `#publish_for_karafka` method won't be sent to Kafka. They will be "virtually" delegated to the created consumer instance so your specs can run without Kafka setup.

RSpec.describe InlineBatchConsumer do
  # This will create a consumer instance with all the
  # settings defined for the given topic
  subject(:consumer) do
    karafka_consumer_for(:inline_batch_data)
  end

  let(:nr1_value) { rand }
  let(:nr2_value) { rand }
  let(:sum) { nr1_value + nr2_value }

  before do
    # Sends first message to Karafka consumer
    publish_for_karafka({ 'number' => nr1_value }.to_json)
    # Sends second message to Karafka consumer
    publish_for_karafka({ 'number' => nr2_value }.to_json)
    allow(Karafka.logger).to receive(:info)
  end

  it 'expects to log a proper message' do
    expect(Karafka.logger)
      .to receive(:info).with(
        "Sum of 2 elements equals to: #{sum}"
      )
    consumer.consume
  end
end

Instrumentation unification

We've made some small changes to the default listener and the names of the events that are published during Karafka runtime flow execution. Please see this commit for more details.

Karafka::Instrumentation::Listener is now Karafka::Instrumentation::StdoutListener.

There has been a rename and a switch to an instance-based version of this listener:

Karafka.monitor.subscribe(
  # Old
  Karafka::Instrumentation::Listener
  # New
  Karafka::Instrumentation::StdoutListener.new
)

Karafka::Instrumentation::ProctitleListener has been added.

New instrumentation called Karafka::Instrumentation::ProctitleListener has been added. Its purpose is to provide you with a nicer proc title with a descriptive value. In order to use it, please put the following line in your karafka.rb boot file:

Karafka.monitor.subscribe(
  Karafka::Instrumentation::ProctitleListener.new
)

mark_as_consumed divided into mark_as_consumed and mark_as_consumed!

The blocking #mark_as_consumed method has been split into two:

  • #mark_as_consumed - for a non-blocking, eventual offset commitment.
  • #mark_as_consumed! - for a blocking offset commitment that will stop the processing flow to ensure that the offset has been stored (see the sketch below).
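As a rough illustration (OrdersConsumer and Order are made-up names, not part of the release), the two variants could be used like this:

class OrdersConsumer < ApplicationConsumer
  def consume
    params_batch.each { |params| Order.create!(params.payload) }

    # Non-blocking: schedules the offset commit and continues processing
    mark_as_consumed params_batch.last

    # Blocking variant: halts the processing flow until the offset
    # is confirmed to be stored
    # mark_as_consumed! params_batch.last
  end
end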

#payloads for params_batch to extract only payload of objects from the params_batch

If you are not interested in the additional `#params` metadata, you can use the `#payloads` method to access only the deserialized payloads of the Kafka messages:

class EventsConsumer < ApplicationConsumer
  def consume
    EventStore.store params_batch.payloads
  end
end

Single consumer class supports more than one topic

From now on, you are able to use the same consumer class for multiple topics:

App.consumer_groups.draw do
  consumer_group :default do
    topic :users do
      consumer UsersConsumer
    end

    topic :admins do
      consumer UsersConsumer
    end
  end
end

Note: you will still have separate consumer instances for each topic partition.

Delayed re-connection upon critical failures

If a critical failure occurs (a network disconnection or anything similar), Karafka will back off and wait for reconnect_timeout (defaults to 10s) before attempting to reconnect. This should prevent you from being flooded with errors and logs upon serious problems.
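If the default back-off does not suit you, the timeout is configurable. Below is a minimal sketch, assuming the setting is exposed under the client namespace of the setup config - the exact key path may differ, so please verify it against the configuration reference:

class App < Karafka::App
  setup do |config|
    # Other settings...
    # Assumed key path - verify against the configuration reference
    config.client.reconnect_timeout = 30 # seconds to wait before reconnecting
  end
end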

Support for Kafka 0.10 dropped in favor of native support for Kafka 0.11

Support for Kafka 0.10 has been dropped. Weird things may happen if you decide to use Kafka 0.10 with Karafka 1.3 so just upgrade.

Reorganized responders - multiple_usage constrain no longer available

multiple_usage has been removed. Responders won't raise any exception if you decide to send multiple messages to the same topic without declaring that. This feature was a bad idea and was creating a lot of trouble when using responders in long-running, batch-like flows.

The following code would raise a Karafka::Errors::InvalidResponderUsageError error in Karafka 1.2 but will continue to run in Karafka 1.3:

class ExampleResponder < ApplicationResponder
  topic :regular_topic

  def respond(user, profile)
    respond_to :regular_topic, user
    respond_to :regular_topic, user
  end
end

Exceptions names standardization

All the Karafka internal framework exception names now end with an Error suffix. Please see this file for the whole list of exceptions.

Default fetcher_max_queue_size changed from 100 to 10 to lower max memory usage

While Karafka is processing, ruby-kafka prebuffers more data under the hood in a separate thread. If you have a big consumer lag, this can cause your Karafka process to prebuffer hundreds of megabytes of data (or more) upfront. Lowering the queue size makes Karafka's memory usage more predictable by default.
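If the new default is too conservative for your throughput, you can raise it back. A minimal sketch, assuming the value is exposed under the kafka section of the setup config, like the other ruby-kafka driver options:

class App < Karafka::App
  setup do |config|
    # Other settings...
    # Allow ruby-kafka to prebuffer up to 50 message batches
    # (trade-off: fewer fetch stalls at the cost of higher memory usage)
    config.kafka.fetcher_max_queue_size = 50
  end
end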

Documentation

Our Wiki has been updated to reflect the 1.3 status. Please notify us if you find any incompatibilities.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.

Karafka framework 1.2.0 Release Notes (Ruby + Kafka)

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

Note: 1.2 release is the last release that will require ActiveSupport to work.

Code quality

I will start with the same thing as with 1.1: we're constantly working on having a better and easier-to-maintain code base. Despite many changes to our code-base stack, we were able to maintain a pretty decent offenses distribution and trend.

It's worth pointing out that we're now using many components of the Dry-Rb ecosystem more extensively, and we love it!

Performance

This release brings significant performance improvements allowing you to consume around 40-50k messages per second per single topic. We could do a bit more (around 5-10%) by using symbols as defaults for metadata params key names, but this would bring up a lot of complexity and confusion since JSON parsing returns string keys. It would also introduce some problematic incompatibilities when using additional backend engines that serialize the whole params_batch and deserialize it back.

Karafka is a complex piece of software and benchmarking it can be tricky. There are many use-cases that need to be considered: some of them single-threaded, some multi-threaded, some with non-parsed data rejections, and some requiring multi-thread interactions. That's why it is really hard to design a single benchmark that would compare multiple Kafka + Ruby frameworks in a fair way.

We've decided not to go that way, but rather to compare new releases with the previous ones. Here are the results of running the same logic with 1.1 and 1.2 multiple times (the more the better):

For some edge cases, Karafka 1.2 can be up to 3x faster than 1.1.

If you are looking for some cross-framework benchmark results, they are available here.

Features

Controllers are now Consumers

Initial versions of Karafka were built with the idea that we could ignore the transportation layer when working with data. Regardless of whether it was an HTTP request, a Kafka message or anything else, as long as the data is in a compatible format, we should not have to adapt our business logic to it.

That was the primary reason, why prior to Karafka 1.2 you would put logic in controllers that inherited from ApplicationController or KarafkaController. And this was a mistake.

More and more companies use Karafka within a typical Ruby on Rails stack, in which controllers are meant to be Rails controllers. Less experienced developers that encountered Karafka controllers within the Rails app/controllers namespace would often end up trying to use some Rails-controller-specific API magic without realizing they're within a Karafka controller scope. To eliminate this problem and to match Kafka naming conventions, the processing units responsible for feeding you with Kafka data have been renamed to Consumers, and from now on, there are no controllers in the Karafka ecosystem.

# Within app/consumers
class UsersCreatedConsumer < ApplicationConsumer
  def consume
    params_batch.each { |params| User.create!(params['user']) }
  end
end

New instrumentation engine using Dry-Monitor

Note: Dry-Monitor usage requires a separate article. Here's just a brief summary of what we did with it.

The old Karafka monitor was too magical. It would auto-detect the context in which it was invoked, automatically build notification scopes and do a lot of other things. This was really cool, but it was:

  • Slow
  • Hard to maintain
  • Bug sensitive
  • Code change sensitive
  • Not isolated from the rest of the system
  • Hard to use with custom tools like NewRelic or Airbrake
  • Limited when it comes to instrumenting with multiple tools at the same time
  • Too custom to be easily replaced

We are proud to announce, that from now on, Dry-Monitor is the instrumentation backbone of the whole Karafka ecosystem. Here's a simple example of what you can achieve using it:

Karafka.monitor.subscribe 'params.params.parse.error' do |event|
  puts "Oh no! An error: #{event[:error]} occurred!"
end

and to be honest, possibilities are endless. From simple logging, through in-production performance monitoring up to multi-target complex instrumentation. Please refer to the Monitoring and logging section of Karafka Wiki for more details.

Dynamic Karafka::Params::Params parent class

Karafka is designed to handle a lot of messages. Each incoming message is wrapped with a lazily evaluated hash-like object. Prior to 1.2, each params object was built based on ActiveSupport::HashWithIndifferentAccess. Truth be told, it is not the fastest library ever (benchmark details here), especially when compared to a PORO Hash:

Comparison:
Common Hash#[] access:  8306261.5 i/s
Common Hash#fetch access:  6053147.2 i/s - 1.37x slower
HashWithIndifferentAccess #[] String:  3803546.0 i/s - 2.18x slower
HashWithIndifferentAccess#fetch String:  1993671.6 i/s - 4.17x slower
HashWithIndifferentAccess#fetch Symbol:  1932004.0 i/s - 4.30x slower
HashWithIndifferentAccess #[] Symbol:  1422367.3 i/s - 5.84x slower
Hash#with_indifferent_access #[] String:   470876.8 i/s - 17.64x slower
Hash#with_indifferent_access #fetch String:   414701.6 i/s - 20.03x slower
Hash#with_indifferent_access #fetch Symbol:   410033.7 i/s - 20.26x slower
Hash#with_indifferent_access #[] Symbol:   381347.2 i/s - 21.78x slower

Now imagine that in some cases we create 50 000 objects like that per second. This had to have a serious impact on the framework's performance. As always, there is a trade-off: should we go with a Hash in the name of performance, or should we use HashWithIndifferentAccess for the sake of "simplicity"? We will let you choose whatever you find more suitable.

For that reason, we've provided a config params_base_class setting that you can use to set up the base params class from which Karafka::Params::Params will inherit. By default, it is a plain Hash.

require 'active_support/hash_with_indifferent_access'

class App < Karafka::App
  setup do |config|
    # Other settings...
    # config.params_base_class = Hash
    config.params_base_class = HashWithIndifferentAccess
  end
end

Keep in mind that you can use other base classes, for example a concurrent hash, to your advantage. The only requirement is that it needs to have the same API as a Ruby Hash (see the sketch below).
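For example, here is a hedged sketch using Concurrent::Hash from the concurrent-ruby gem (assuming that gem is in your Gemfile), which keeps the plain Hash API while adding thread safety:

require 'concurrent'

class App < Karafka::App
  setup do |config|
    # Other settings...
    # Concurrent::Hash behaves like a regular Hash but is thread-safe
    config.params_base_class = Concurrent::Hash
  end
end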

System callbacks reorganization with multiple callbacks support

Note: This will be unified with a one set of events that you will be able to hook up to in 1.3 using Dry-Events.

Due to the fact that some things happen in Karafka outside of the consumers' scope, there are two types of callbacks available:

- Lifecycle callbacks - callbacks that are triggered during various moments in the Karafka framework lifecycle. They can be used to configure additional software dependent on Karafka settings or to do one-time stuff that needs to happen before consumers are created.
- Consumer callbacks - callbacks that are triggered during various stages of the message flow.

You can read more about them and how to use them in the Callbacks wiki section.

before_fetch_loop configuration block for early client usage (#seek, etc)

This new callback will be executed once per consumer group per process, before we start receiving messages. This is a great place if you need to use Kafka's #seek functionality to reprocess already fetched messages again.

Note: Keep in mind that this is a per-process configuration (not per consumer), so you need to check if the provided consumer_group (if you use multiple) is the one you want to seek against.

class App < Karafka::App
  # Setup and other things...

  # Moves the offset back to message 100, so we can reprocess messages again
  # @note If you use multiple consumer groups, make sure you execute #seek on a client of
  #   a proper consumer group not on all of them
  before_fetch_loop do |consumer_group, client|
    topic = 'my_topic'
    partition = 0
    offset = 100

    if consumer_group.topics.map(&:name).include?(topic)
      client.seek(topic, partition, offset)
    end
  end
end

Rewritten NewRelic client

Thanks to NewRelic's kindness, we were able to rewrite the whole listener, which can now collect various information about the Karafka data flow. It is super easy to use and extend. You can find it in the Monitoring and Logging wiki section.

Key and/or partition key support for responders

You can now provide key and/or partition_key when using responders:

module Users
  class CreatedResponder < KarafkaResponder
    topic :users_created

    def respond(user)
      respond_to :users_created, user, key: user.id
    end
  end
end

Alias for client#mark_as_consumed on a consumer level

Simple yet powerful. For maximum performance, you may use manual offset commit management. If you do, you can now use #mark_as_consumed directly, without having to refer to the #client object.

class UsersCreatedConsumer < ApplicationConsumer
  def consume
    params_batch.each { |params| User.create!(params['user']) }
    mark_as_consumed params_batch.last
  end
end

Incompatibilities and breaking changes

Controllers are now Consumers

Please refer to the features section with this one. It is both a feature and a breaking change at the same time.

after_fetched renamed to after_fetch to normalize the naming convention

class ExamplesConsumer < Karafka::BaseConsumer
  include Karafka::Consumers::Callbacks

  after_fetched do
    # Some logic here
  end

  def consume
    # some logic here
  end
end

is now:

class ExamplesConsumer < Karafka::BaseConsumer
  include Karafka::Consumers::Callbacks

  after_fetch do
    # Some logic here
  end

  def consume
    # some logic here
  end
end

received_at renamed to receive_time to follow ruby-kafka and WaterDrop conventions

The received_at params key is now receive_time. This means that two timestamp values are available for each params object (see the example after the list):

  • receive_time - the moment message was received by our Karafka process
  • create_time - the moment our message was created in the producer
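For illustration, a minimal consumer reading both timestamps (keys are strings, as explained in the metadata keys section below):

class UsersCreatedConsumer < ApplicationConsumer
  def consume
    params_batch.each do |params|
      params['receive_time'] #=> when our Karafka process received the message
      params['create_time']  #=> when the producer created the message
    end
  end
end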

Hash is now the default params base class in favor of ActiveSupport::HashWithIndifferentAccess

Long story short: performance and fewer dependencies. You can still use it though:

require 'active_support/hash_with_indifferent_access'

class App < Karafka::App
  setup do |config|
    # Other settings...
    config.params_base_class = HashWithIndifferentAccess
  end
end

All metadata keys are strings by default

Since the default params class is now a Hash, we had to pick either symbols or strings as key names for all the metadata attributes. We've decided to go with strings, as they are more serialization friendly and cooperate better with the various backends used with Karafka.

Note: If you use HashWithIndifferentAccess, nothing really changes for you.

def consume
  params_batch.first.keys #=> ["parser", "partition", "offset", "key", "create_time", ...]
end

JSON parsing defaults now to string keys

Since there is no indifferent access by default, lazily parsed JSON Kafka data will default to string keys that are merged into the params object. If you're not planning to use HashWithIndifferentAccess, make sure that your code base is ready for this change.

Karafka 1.1:

class UsersCreatedConsumer < ApplicationConsumer
  def consume
    # Assuming user data is in the 'user' json scope
    params_batch.each do |params|
      params[:user] #=> { name: 'Maciek' }
      params['user'] #=> { name: 'Maciek' }
      params['receive_time'] #=> 2018-02-27 18:53:31 +0100
    end
  end
end

Karafka 1.2:

class UsersCreatedConsumer < ApplicationConsumer
  def consume
    # Assuming user data is in the 'user' json scope
    params_batch.each do |params|
      params[:user] #=> nil
      params['user'] #=> { name: 'Maciek' }
      # Note, that system keys are strings as well
      params['receive_time'] #=> 2018-02-27 18:53:31 +0100
    end
  end
end

Configurators removed in favor of the after_init block configuration

What were configurators? Let me quote 1.1 wiki on that one:

For additional setup and/or configuration tasks you can create custom configurators. Similar to Rails these are added to a config/initializers directory and run after app initialization.

Due to the changed lifecycle of the Karafka process, more things are now built dynamically upon boot. This means that in order to run initializers properly, we would have to control the load order in a more granular way. That's why this functionality has been replaced with an after_init callback declaration:

class App < Karafka::App
  # Setup and other things...

  # Once everything is loaded and done, assign Karafka app logger as a Sidekiq logger
  # @note This example does not use config details, but you can use all the config values
  #   to setup your external components
  after_init do |_config|
    Sidekiq::Logging.logger = Karafka::App.logger
  end
end

Note: you can have as many callbacks of any type as you want. They can also be objects, as long as they respond to a #call method.
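For example, here is a hedged sketch of a callback object (SidekiqLoggerSetup is a made-up name, and this assumes the DSL accepts a #call-responding object in place of a block):

# Any object responding to #call can act as a callback
class SidekiqLoggerSetup
  def call(_config)
    Sidekiq::Logging.logger = Karafka::App.logger
  end
end

class App < Karafka::App
  # Setup and other things...

  # Registered the same way as a block-based callback
  after_init SidekiqLoggerSetup.new
end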

Karafka ecosystem gems versioning convention

Karafka is combined from several independent libraries. The most important are:

  • Karafka - The main gem that is used to build Karafka applications that consume messages
  • WaterDrop - WaterDrop is a standalone Karafka component library for generating Kafka messages
  • Capistrano-Karafka - Integration for deployment using Capistrano
  • Karafka Sidekiq Backend - an optional proxy that will pass messages received from Karafka into Sidekiq jobs

Some Karafka users had problems using mismatched versions of those gems. From now on, they will all be released in sync up to the second version point (major.minor). It means that if you decide to use Karafka 1.2 with other ecosystem libraries, you should match them to 1.2.* as well.

Note: This should be resolved automatically as we locked all the proper versions within gemspec, but still worth mentioning.

Documentation

Our Wiki has been updated to reflect the 1.2 status. You may want to look at the rewritten Monitoring and logging section and the new Testing guide that illustrates how you can test various Karafka ecosystem components.

Upgrade guide

Controllers are now Consumers

The following steps are required to move from controllers to consumers:

  • 1. Create app/consumers directory
  • 2. Rename ApplicationController (or KarafkaController) to ApplicationConsumer / KarafkaConsumer
  • 3. Move the ApplicationController and all Karafka controllers to app/consumers
  • 4. Rename files and classes by replacing "Controller" with "Consumer"
  • 5. If you use callbacks, don't forget about Karafka::Consumers::Callbacks
  • 6. Do exactly the same with your specs/tests
  • 7. Replace controller with consumer in the consumer group definitions in the karafka.rb file
  • 8. Rename all the "Controller" references to "Consumer" in the karafka.rb file (see the sketch after this list)
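To illustrate steps 7 and 8, here is a rough before/after of the routing (assuming your 1.1 routes used the controller keyword; the class and topic names are illustrative):

# Karafka 1.1 - karafka.rb
App.consumer_groups.draw do
  consumer_group :default do
    topic :users_created do
      controller UsersCreatedController
    end
  end
end

# Karafka 1.2 - karafka.rb
App.consumer_groups.draw do
  consumer_group :default do
    topic :users_created do
      consumer UsersCreatedConsumer
    end
  end
end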

Karafka, WaterDrop and friends version match

This should be resolved automatically, but if you prefer, you can always lock all the Karafka ecosystem gems in your Gemfile:

gem 'karafka', '~> 1.2'
gem 'karafka-sidekiq-backend', '~> 1.2'
gem 'capistrano-karafka', '~> 1.2'

Ruby on Rails HashWithIndifferentAccess params compatibility mode

If you still want to use HashWithIndifferentAccess, feel free to:

require 'active_support/hash_with_indifferent_access'
 
class App < Karafka::App
  setup do |config|
    # Other settings...
    # config.params_base_class = Hash
    config.params_base_class = HashWithIndifferentAccess
  end
end

Default monitor and logger update

Please refer to the Monitoring and logging Wiki section for details on how both of those things work now. If you used the default monitoring and logging without any customization, all you need to do is add this to your karafka.rb file after the setup part:

Karafka.monitor.subscribe(Karafka::Instrumentation::Listener)

NewRelic client update

If you use our NewRelic example client, please take a look at the new one and upgrade accordingly.

Callbacks rename

class ExamplesConsumer < Karafka::BaseConsumer
  include Karafka::Consumers::Callbacks
 
  # Rename this
  after_fetched do
    # Some logic here
  end

  # To this
  after_fetch do
    # Some logic here
  end
end

Karafka params received_at renamed to receive_time

Again, just a name change: if you used the 'received_at' params timestamp, you'll now find it under the 'receive_time' key.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/karafka-example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.
