Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.
Time passes by, Kafka is already 1.0 and Karafka is already 1.1.
I will start from the same thing as with 1.0. We’re constantly working on having a better and easier code base. Apart from adding several new monitoring tools to our code quality stack, we were able to maintain a pretty decent offenses distribution and trends.
It’s also worth noting, that our reaction time towards gem updates and other upgrades have significantly improved, which means that we’re almost always up to date with all the dependencies.
There are several new features included in this release. Most of them focus on advanced users, that needed a better control over processing flow. However, this does not mean, that regular “consumers” won’t benefit from them. Features from this release give you space to expand your applications beyond simple consumption and allow you to process more and faster.
Manual offset management
Most of the time you will be happy with the automatic offset management, however there are some cases in which you might be interested in taking control over this process. It can be helpful i.a.:
- In memory DDD sagas realization,
- Simulating transactions.
In a real shortcut, this is how you can use it:
Disable automatic offset management either on the app or the consumer group level:
class App < Karafka::App consumer_groups.draw do consumer_group :events do automatically_mark_as_consumed false topic :user_events do controller EventsController end end end end
and just commit your offsets manually:
def consume # Do something with messages EventStore.store(params_batch.parsed) # And now mark last message as consumed, # so we won't consume any of already processed messages again consumer.mark_as_consumed params_batch.to_a.last end
Note: You can read more about this feature in the Manual offset management (checkpointing) Wiki section.
WaterDrop 1.0 with sync and async support
WaterDrop is a standalone messages producer that is integrated with Karafka out of the box.
We’ve recently redesigned both its internals and the API, to make it better, less memory consuming, easier to use and more bullet-proof.
Karafka 1.1 comes with full WaterDrop 1.0 support, including both synchronous and asynchronous producers. It also integrates automatically with it, populating all the options related to Kafka that were set during the Karafka framework configuration.
In case you want to change WaterDrop configuration settings, you can do this after you setup and boot Karafka framework in the karafka.rb file:
class App < Karafka::App setup do |config| config.kafka.seed_brokers = ::Settings.kafka.seed_brokers config.kafka.offset_commit_threshold = 30 config.client_id = ::Settings.name end consumer_groups.draw do # consumer groups definitions go here end end App.boot! # Overwrite default delivery setting and don't send in the test env WaterDrop.setup do |water_config| water_config.deliver = !Karafka.env.test? end
Responders are still the recommended way to generate Kafka messages, however if you want, you can use WaterDrop directly:
# For sync WaterDrop::SyncProducer.call('message', topic: 'my-topic') # or for async WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
Async support per topic for responders
As mentioned above, WaterDrop now supports both synchronous and asynchronous way of producing messages. If wouldn’t make any sense, if the same would not be available for responders. From now on, you can decide on a delivery method per topic with which you decide to work:
class ExampleResponder < ApplicationResponder topic :regular_topic topic :async_topic, async: true def respond(user, profile) respond_to :regular_topic, user # This will be sent async respond_to :async_topic, user end end
New set of callbacks for better flow control
Callbacks can be used to trigger some actions on certain moments of Karafka messages receiving flow. You can use them for additional actions that need to take place at certain moments. They are not available by default, as we don’t want to provide functionalities that are not required by users by default.
In order to be able to use them, you need to include Karafka::Controllers::Callbacks module into your controller class:
class ExamplesController < Karafka::BaseController include Karafka::Controllers::Callbacks after_fetched do # Some logic here end def consume # Some logic here end end
Currently there are four callbacks available:
- after_fetched – executed right after we fetch messages from Kafka but before the main logic kicks in.
- before_stop – executed before the shutdown process kicks in. Really useful if you use manual offset management.
- after_poll – executed after each attempt to fetch messages from Kafka (even when there is no data).
- before_poll – executed before each attempt to fetch messages from Kafka (even when there is no data).
Please visit the Callbacks Wiki section for more details.
Incompatibilities and breaking changes
after_received callback renamed to after_fetched
IF you use the after_received callback, you will have to do two things to make it work with 1.1:
- Rename it from after_received to after_fetched
- include Karafka::Controllers::Callbacks module inside of your controller
class ExamplesController < Karafka::BaseController include Karafka::Controllers::Callbacks after_fetched do # Some logic here end end
connection_pool config options are no longer needed
WaterDrop 1.0 uses in-thread consumer pool, so connection pool is no longer required. You need to remove all connection_pool related settings.
Celluloid config options are no longer needed
Karafka no longer uses Celluloid, so all the Celluloid options are no longer needed.
#perform is now renamed to #consume
#perform has been renamed to #consume. Please update all your controllers to match this change.
class ExamplesController < Karafka::BaseController include Karafka::Controllers::Callbacks # Old name def perform # Some logic here end # New name def consume # Some logic here end end
Renamed batch_consuming option to batch_fetching and batch_processing to batch_consuming
We’re constantly trying to unify naming conventions. Due to some misunderstanding on what is consuming and what is processing, we’ve decided to rename them. So just to clarify:
- fetching is a process of receiving messages from Kafka cluster (no user business logic involved)
- consuming is a process of applying your business logic na processing the data
- if you want to fetch messages from Kafka in batches, you need to have batch_fetching set to true
- if you also want to work with messages in batches, you need to have batch_consuming set to true
Apart from code changes, we also updated Wiki pages accordingly and added the FAQ section.
Celluloid dropped in favor of native thread pool management
Mike has a great explanation related to Sidekiq on that one. Our reasons were quite similar:
- Memory consumption,
- Way more dependencies needed,
- Abstraction overhead.
Also, with a bit of Ruby-Kafka patching, we don’t need an extra layer to handle shutting down and other edge cases.
Unused dependencies cleanup
We’re constantly working on lowering the memory footprint of Karafka. It turned out, that we would load some of the parts of ActiveSupport that weren’t required. This is now cleaned up. Also we’re planning to completely drop ActiveSupport requirement as we’re not the biggest fans of this gem.
Getting started with Karafka
If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:
git clone https://github.com/karafka/karafka-example-app ./example_app
then, just bundle install all the dependencies:
cd ./example_app bundle install
and follow the instructions from the example app Wiki.