Tag: Ruby on Rails

Karafka (Ruby + Kafka) framework 1.1.0 Release Notes

Note: These release notes cover only the major changes. To learn about various bug fixes and changes, please refer to the change logs or check out the list of commits in the main Karafka repository on GitHub.

Time passes by, Kafka is already 1.0 and Karafka is already 1.1.

Code quality

I will start from the same thing as with 1.0. We're constantly working on having a better and easier code base. Apart from adding several new monitoring tools to our code quality stack, we were able to maintain a pretty decent offenses distribution and trends.

It's also worth noting, that our reaction time towards gem updates and other upgrades have significantly improved, which means that we're almost always up to date with all the dependencies.

Features

There are several new features included in this release. Most of them focus on advanced users, that needed a better control over processing flow. However, this does not mean, that regular "consumers" won't benefit from them. Features from this release give you space to expand your applications beyond simple consumption and allow you to process more and faster.

Manual offset management

Most of the time you will be happy with the automatic offset management, however there are some cases in which you might be interested in taking control over this process. It can be helpful i.a.:

  • In memory DDD sagas realization,
  • Buffering,
  • Simulating transactions.

In a real shortcut, this is how you can use it:

Disable automatic offset management either on the app or the consumer group level:

class App < Karafka::App
  consumer_groups.draw do
    consumer_group :events do
      automatically_mark_as_consumed false

      topic :user_events do
        controller EventsController
      end
    end
  end
end

and just commit your offsets manually:

def consume
  # Do something with messages
  EventStore.store(params_batch.parsed)
  # And now mark last message as consumed,
  # so we won't consume any of already processed messages again
  consumer.mark_as_consumed params_batch.to_a.last
end

Note: You can read more about this feature in the Manual offset management (checkpointing) Wiki section.

WaterDrop 1.0 with sync and async support

WaterDrop is a standalone messages producer that is integrated with Karafka out of the box.

We've recently redesigned both its internals and the API, to make it better, less memory consuming, easier to use and more bullet-proof.

Karafka 1.1 comes with full WaterDrop 1.0 support, including both synchronous and asynchronous producers. It also integrates automatically with it, populating all the options related to Kafka that were set during the Karafka framework configuration.

In case you want to change WaterDrop configuration settings, you can do this after you setup and boot Karafka framework in the karafka.rb file:

class App < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = ::Settings.kafka.seed_brokers
    config.kafka.offset_commit_threshold = 30
    config.client_id = ::Settings.name
  end

  consumer_groups.draw do
    # consumer groups definitions go here
  end
end

App.boot!

# Overwrite default delivery setting and don't send in the test env
WaterDrop.setup do |water_config|
  water_config.deliver = !Karafka.env.test?
end

Responders are still the recommended way to generate Kafka messages, however if you want, you can use WaterDrop directly:

# For sync
WaterDrop::SyncProducer.call('message', topic: 'my-topic')
# or for async
WaterDrop::AsyncProducer.call('message', topic: 'my-topic')

Async support per topic for responders

As mentioned above, WaterDrop now supports both synchronous and asynchronous way of producing messages. If wouldn't make any sense, if the same would not be available for responders. From now on, you can decide on a delivery method per topic with which you decide to work:

class ExampleResponder < ApplicationResponder
  topic :regular_topic
  topic :async_topic, async: true

  def respond(user, profile)
    respond_to :regular_topic, user
    # This will be sent async
    respond_to :async_topic, user
  end
end

New set of callbacks for better flow control

Callbacks can be used to trigger some actions on certain moments of Karafka messages receiving flow. You can use them for additional actions that need to take place at certain moments. They are not available by default, as we don't want to provide functionalities that are not required by users by default.

In order to be able to use them, you need to include Karafka::Controllers::Callbacks module into your controller class:

class ExamplesController < Karafka::BaseController
  include Karafka::Controllers::Callbacks

  after_fetched do
    # Some logic here
  end

  def consume
    # Some logic here
  end
end

Currently there are four callbacks available:

  • after_fetched - executed right after we fetch messages from Kafka but before the main logic kicks in.
  • before_stop - executed before the shutdown process kicks in. Really useful if you use manual offset management.
  • after_poll - executed after each attempt to fetch messages from Kafka (even when there is no data).
  • before_poll - executed before each attempt to fetch messages from Kafka (even when there is no data).

Please visit the Callbacks Wiki section for more details.

Incompatibilities and breaking changes

after_received callback renamed to after_fetched

IF you use the after_received callback, you will have to do two things to make it work with 1.1:

  • Rename it from after_received to after_fetched
  • include Karafka::Controllers::Callbacks module inside of your controller
class ExamplesController < Karafka::BaseController
  include Karafka::Controllers::Callbacks

  after_fetched do
    # Some logic here
  end
end

connection_pool config options are no longer needed

WaterDrop 1.0 uses in-thread consumer pool, so connection pool is no longer required. You need to remove all connection_pool related settings.

Celluloid config options are no longer needed

Karafka no longer uses Celluloid, so all the Celluloid options are no longer needed.

#perform is now renamed to #consume

#perform has been renamed to #consume. Please update all your controllers to match this change.

class ExamplesController < Karafka::BaseController
  include Karafka::Controllers::Callbacks

  # Old name
  def perform
    # Some logic here
  end

  # New name
  def consume
    # Some logic here
  end
end

Renamed batch_consuming option to batch_fetching and batch_processing to batch_consuming

We're constantly trying to unify naming conventions. Due to some misunderstanding on what is consuming and what is processing, we've decided to rename them. So just to clarify:

  • fetching is a process of receiving messages from Kafka cluster (no user business logic involved)
  • consuming is a process of applying your business logic na processing the data

So now:

  • if you want to fetch messages from Kafka in batches, you need to have batch_fetching set to true
  • if you also want to work with messages in batches, you need to have batch_consuming set to true

Other improvements

Wiki updates

Apart from code changes, we also updated Wiki pages accordingly and added the FAQ section.

Celluloid dropped in favor of native thread pool management

Mike has a great explanation related to Sidekiq on that one. Our reasons were quite similar:

  • Memory consumption,
  • Way more dependencies needed,
  • Abstraction overhead.

Also, with a bit of Ruby-Kafka patching, we don't need an extra layer to handle shutting down and other edge cases.

Unused dependencies cleanup

We're constantly working on lowering the memory footprint of Karafka. It turned out, that we would load some of the parts of ActiveSupport that weren't required. This is now cleaned up. Also we're planning to completely drop ActiveSupport requirement as we're not the biggest fans of this gem.

Getting started with Karafka

If you want to get started with Kafka and Karafka as fast as possible, then the best idea is to just clone our example repository:

git clone https://github.com/karafka/karafka-example-app ./example_app

then, just bundle install all the dependencies:

cd ./example_app
bundle install

and follow the instructions from the example app Wiki.

Dry-Validation as a schema validation layer for Ruby on Rails API

Legacy code is never the way we would like it to be

There are days, when you don't get to write your new shiny application using Grape and JSON-API. Life would be much easier, if we could always start from the beginning. Unfortunately we can't and one of the things that make some developers more valuable that others is their ability to adapt. The app might be outdated, it might have abandoned gems, but if you're able to introduce new concepts to it (without breaking it and having to rewrite everything), you should be able to overcome any other difficulties.

ActiveRecord::Validations is not the way to go

If you use Rails, then probably you use ActiveRecord::Validations as the main validation layer. Model validations weren't designed as a protection layer for ensuring incoming data schema consistency (and there are many more reasons why you should stop using them).

External world and its data won't always resemble your internal application structure. Instead of trying to adapt the code-base and fit it into the world, try making the world as much constant and predictable as it can be.

One of the best things in that matter is to ensure that any endpoint input data is as strict as it can be. You can do quite a lot, before it gets to your ActiveRecord models or hits any business logic.

Dry-Validation - different approach to solve different problems

Unlike other, well known, validation solutions in Ruby, dry-validation takes a different approach and focuses a lot on explicitness, clarity and precision of validation logic. It is designed to work with any data input, whether it’s a simple hash, an array or a complex object with deeply nested data.

Dry-validation is a library that can help you protect your typical Rails API endpoints better. You will never control what you will receive, but you can always make sure that it won't reach your business.

Here's a basic example of how it works (assuming we validate a single hash):

BeaconSchema = Dry::Validation.Schema do
  UUID_REGEXP = /\A([a-z|0-9]){32}\z/
  PROXIMITY_ZONES = %w(
    immediate
    near
    far
    unknown
  )

  required(:uuid).filled(:str?, size?: 32, format?: UUID_REGEXP)
  required(:rssi).filled(:int?, lteq?: -26, gteq?: -100)
  required(:major).filled(:int?, gteq?: 0, lteq?: 65535)
  required(:minor).filled(:int?, gteq?: 0, lteq?: 65535)
  required(:proximity_zone).filled(:str?, included_in?: PROXIMITY_ZONES)
  required(:distance) { filled? & ( int? | float? ) & gteq?(0) & lteq?(100)  }
end

data = {}
validation_result = BeaconSchema.call(data)
validation_result.errors #=> {:uuid=>["is missing"], :rssi=>["is missing"], ... }

The validation result object is really similar to the validation result of ActiveRecord::Validation in terms of usability.

Dry Ruby on Rails API

Integrating dry-validation with your Ruby on Rails API endpoints can give you multiple benefits, including:

  • No more strong parameters (they will become useless when you cover all the endpoints with dry-validation)
  • Schemas testing in isolation (not in the controller request - response flow)
  • Model validations that focus only on the core details (without the "this data is from form and this from the api" dumb cases)
  • Less coupling in your business logic
  • Safer extending and replacing of endpoints
  • More DRY
  • Nested structures validation

Non-obstructive data schema for a controller layer

There are several approaches you can take when implementing schema validation for your API endpoints. One that I find useful especially in legacy projects is the #before_action approach. Regardless whether #before_action is (or is not) an anti-pattern, in this case, it can be used to provide non-obstructive schema and data validation that does not interact with other layers (like services, models, etc.).

To implement such protection, you only need a couple lines of code):

class Api::BaseController < ApplicationController
  # Make sure that request data meets schema requirements
  before_action :ensure_schema!

  # Accessor for validation results hash
  attr_reader :validation_result
  # Schema assigned on a class level
  class_attribute :schema

  private

  def ensure_schema!
    @validation_result = self.class.schema.call(params)
    return if @validation_result.success?

    render json: @validation_result.errors.to_json, status: :unprocessable_entity
  end
end

and in your final API endpoint that inherits from the Api::BaseController:

class Api::BeaconsController < Api::BaseController
  self.schema = BeaconSchema

  def create
    # Of course you can still have validations on a beacon level
    Beacon.create!(validation_result.to_h)
    head :no_content
  end
end

Summary

Dry-validation is one of my "must use" gems when working with any type of incoming data (even when it comes from my other systems). As presented above, it can be useful in many situations and can help reduce the number of responsibilities imposed on models.

This type of validation layer can also serve as a great starting point for any bigger refactoring process. Ensuring incoming data and schema consistency without having to refer to any business models allows to block the API without blocking the business and models behind it.

Cover photo by: brlnpics123 on Creative Commons 2.0 license. Changes made: added an overlay layer with an article title on top of the original picture.

Copyright © 2024 Closer to Code

Theme by Anders NorenUp ↑