Introduction

Karafka is a Ruby and Rails framework designed to simplify processing messages consumed from Apache Kafka.

One of Karafka's components is the Web UI. It provides a convenient way for developers to monitor and manage their Karafka-based applications without using the command line or third-party software.

The interface, amongst others, includes:

  • historical metrics,
  • real-time aggregated metrics,
  • real-time information on resource usage,
  • errors details,
  • performance statistics,
  • stale partitions detection (LSO hangs),
  • routing pattern matched topics subscriptions.


It bridges the technical workings of Karafka and the humans overseeing them, ensuring smoother operations, quicker troubleshooting, and enhanced overall visibility.

karafka web ui

Why Track Job Progress?

Monitoring Karafka's job progress can be crucial, especially for longer tasks. Here's why:

  • Extended Jobs: Some jobs naturally take longer due to the data they handle. Monitoring helps differentiate between a naturally long job and one facing issues,

  • Stuck Jobs: Jobs that hang or get stuck can go unnoticed without monitoring. This wastes resources and can slow down the entire system,

  • Batch Processing: Karafka often works on batches of messages, processing each in sequence. Keeping track ensures no single message causes a hold-up, controlling the flow smoothly.

In short, monitoring Karafka's jobs helps keep things efficient, timely, and problem-free.

Implementing Progress Notifications

Karafka Web UI supports process and consumer tagging. Tags can be used to add additional information about consumers and their execution, and Karafka processes themselves.

Adding progress monitoring with tags is super easy. All you need to do is to tag progress inside the consumer:

class EventsConsumer < ApplicationConsumer
  def consume
    # Start with 0 progress
    tags.add(:progress, 'progress: 0%')
    # Track consumed messages
    consumed = 0
    # Compute the ratio of each message
    rate = (100 / messages.size.to_f).round(2)

    messages.each do |message|
      Event.store!(message.payload)

      mark_as_consumed(message)
      consumed += 1
      # Update progress
      tags.add(:progress, "progress: #{(rate * consumed).ceil}%")
    end
  end
end

In case you want to abstract that away, you can always create a simple custom iterator:

class EventsConsumer < ApplicationConsumer
  def consume
    each do |message|
      Event.store!(message.payload)

      mark_as_consumed(message)
    end
  end

  private

  def each
    tags.add(:progress, 'progress: 0%')

    consumed = 0
    rate = (100 / messages.size.to_f).round(2)

    messages.each do |message|
      yield(message)

      consumed += 1
      tags.add(:progress, "progress: #{(rate * consumed).ceil}%")
    end
  end
end

Once your code is in place, there is nothing more you need to do. Karafka Web UI Tagging API will do the rest.

This is how it will look in the Web UI:

Conclusion

Keeping track of Karafka's job progress is key. Some jobs take longer, but knowing if they are progressing is essential. The Tagging API helps with this by making it easy to see job details in the Web UI. This allows for quick checks on job status and ensures everything runs smoothly. With the Tagging API and Web UI combined, managing and overseeing Karafka jobs becomes more straightforward and efficient.