Running with Ruby


Enforcing unique jobs in Karafka and Sidekiq for single resources

Note: For the sake of simplicity, I skipped some corner cases to keep this article short and easier to follow.

When working with multithreaded and multiprocess systems that consume messages in parallel, you sometimes need to enforce limits on the order in which they are processed.

Most of the time, in well-designed systems, work should be split into atomic jobs (atomic in the sense that they don’t depend on any other worker job) that can run whenever they need to. However, in some cases you need to make sure that jobs related to a given resource are never executed at the same time. This can be achieved with Karafka and Sidekiq in three ways:

  • Standalone Karafka mode: a single Karafka process consumes messages one by one and processes them inline (without using Sidekiq at all). Since Kafka guarantees ordering within a partition, the same ordering applies to standalone Karafka mode.
  • Karafka default mode + a single Sidekiq worker with a single thread. Since a single-threaded Sidekiq process handles jobs in FIFO order, this combination also preserves ordering.
  • Karafka + Sidekiq + the SidekiqUniqueJobs gem. This combination lets us build complex multithreaded and multiprocess workers while ensuring that a single resource is modified by at most one worker at a time (using the while-executing uniqueness mode).

In this article I will focus on the last option, as it is the best in terms of performance and flexibility.

A bit about state machines

Having a single resource on which many actions can be performed is always a problem. To ensure (at the object level) that we won’t process things in parallel, we can use a state machine (aasm is a great gem for that) as a lock. However, if that is all we do, we have to implement a bit more logic to handle many jobs doing the same thing at the same time:

  • Detecting and waiting if someone else is already using (whatever that means) a given resource.
  • Retrying in case of state machine failure (invalid transitions).

But hey! We use Sidekiq! That means retrying is already built in, and a failed job will be restarted later on. It also means that at some point the state machine lock will be released and we will be able to finish all the jobs.

That is true, but we should not rely on it: two jobs wanting the same resource is a normal situation, not an exceptional one, so it shouldn’t be handled through exceptions and retries. Instead, we should prevent such situations from happening at all.

Problem definition

Let’s say we have a RemoteResource representation that needs to be refreshed periodically (every 5 seconds), and we need to make sure that we never overwrite its content with older data.

class RemoteResource < ActiveRecord::Base
  include AASM

  aasm do
    state :initial, initial: true
    state :running

    event :run do
      transitions from: :initial, to: :running
    end

    event :finish do
      transitions from: :running, to: :initial
    end
  end

  def refresh!
    # Some external API calls and other business stuff
    # @note I know that this should be in a service but
    # again, let's keep things simple
    update!(content: RemoteData.fetch(id))
  end
end

Happy, single threaded execution example:

resource = RemoteResource.find(params[:id])
resource.run!
resource.refresh!
resource.finish!

This code will run without problems as long as:

  • Our processing (including the external API call) takes no more than 5 seconds
  • Sidekiq workers are up and running (and keep up with the rate of enqueuing)

But let’s kill all the workers for 1 minute and then start them again. We end up with a queue full of messages that will be processed at the same time. Jobs for the same object may now be enqueued one right after another, which means that they will be executed at the same time.

In a case like that, both workers will overlap while processing the same resource. It means that we might end up with outdated data (when the response for an older request is delayed on the network and arrives last) or a state machine exception. In more complex cases it could mean corrupted data.
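
Both failure modes can be reproduced without Rails or Sidekiq. The sketch below uses a hypothetical FakeResource class and a hypothetical InvalidTransition error as plain-Ruby stand-ins for the RemoteResource model above and for the error a state machine raises. The interleaving is written out by hand instead of using real threads, so the outcome is deterministic.

```ruby
# Hypothetical stand-in for the error a state machine raises on a bad transition.
class InvalidTransition < StandardError; end

# Hypothetical plain-Ruby stand-in for RemoteResource (no Rails, no AASM).
class FakeResource
  attr_reader :state, :content

  def initialize
    @state = :initial
    @content = nil
  end

  def run!
    raise InvalidTransition, "cannot run from #{@state}" unless @state == :initial

    @state = :running
  end

  def finish!
    raise InvalidTransition, "cannot finish from #{@state}" unless @state == :running

    @state = :initial
  end

  # Overwrites the content without checking the state at all.
  def write(content)
    @content = content
  end
end

# Failure mode 1: state machine exception.
resource = FakeResource.new
resource.run! # worker A starts and is still waiting for the remote API...

overlap_error =
  begin
    resource.run! # ...when worker B picks up the next job for the same resource
    nil
  rescue InvalidTransition => e
    e
  end

# Failure mode 2: outdated data when nothing guards the write.
unguarded = FakeResource.new
unguarded.write('version 2 (fetched by B)') # B finishes first
unguarded.write('version 1 (fetched by A)') # A's delayed response arrives last

puts overlap_error.class
puts unguarded.content
```

In the first case Sidekiq would retry worker B later; in the second case nobody notices that the older version won.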

Since Karafka passes messages into Sidekiq, this problem can occur there as well. Worse, it gets even harder to manage when we perform many types of tasks on a single resource and none of them should ever overlap.

Solution: SidekiqUniqueJobs

SidekiqUniqueJobs is a great gem that solves exactly this issue. Among other options, it can make sure that for a given set of arguments, only a single job runs at a time. In our case the unique arguments would be:

  • Worker name
  • RemoteResource#id

This is enough to complete our task! Jobs for different resources will still be processed at the same time, but there will be no more parallel processing of the same RemoteResource.
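
To make the "one lock per worker name and resource id" idea concrete, here is a minimal in-process sketch. It is not how SidekiqUniqueJobs is implemented (the gem keeps its locks in Redis so that they work across processes). The KeyedLocks class and the RefreshWorker name are made up for this illustration; the only point is the semantics: the same key runs one job at a time, while different keys run in parallel.

```ruby
# Illustration only: one Mutex per key, held for the duration of a job.
class KeyedLocks
  def initialize
    @guard = Mutex.new # protects the hash of locks itself
    @locks = Hash.new { |hash, key| hash[key] = Mutex.new }
  end

  # Runs the block while holding the lock that belongs to the given key.
  def synchronize(key, &block)
    lock = @guard.synchronize { @locks[key] }
    lock.synchronize(&block)
  end
end

locks = KeyedLocks.new
running = Hash.new(0)     # jobs currently running, per key
max_running = Hash.new(0) # highest concurrency observed, per key
stats_guard = Mutex.new

# Three jobs for resource 1 and two jobs for resource 2, all started at once.
jobs = [1, 1, 1, 2, 2].map do |resource_id|
  Thread.new do
    key = ['RefreshWorker', resource_id] # worker name + RemoteResource#id
    locks.synchronize(key) do
      stats_guard.synchronize do
        running[key] += 1
        max_running[key] = [max_running[key], running[key]].max
      end
      sleep 0.01 # pretend to call the remote API
      stats_guard.synchronize { running[key] -= 1 }
    end
  end
end
jobs.each(&:join)

max_running.each { |key, count| puts "#{key.inspect}: at most #{count} job(s) at once" }
```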


Implementation in Karafka application

To use this gem with a Karafka application, you need to build a uniqueness scope before a job is enqueued. This can be done inside the #before_enqueue block:

class KarafkaController < Karafka::BaseController
  before_enqueue do
    params[:uniqueness_scope] = params.dig(:remote_resource, :id)
  end
end

You also need to configure your Karafka base worker to set the uniqueness key based on this param value:

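require 'securerandom'

class ApplicationWorker < Karafka::BaseWorker
  # Lock a job only while it is executing, not while it waits in the queue
  sidekiq_options unique: :while_executing

  # Builds the arguments that SidekiqUniqueJobs uses as the lock key
  def self.unique_args(args)
    [
      args.first, # This is the worker name
      args.last['uniqueness_scope'] || SecureRandom.uuid
    ]
  end
end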

Note that we use SecureRandom.uuid as a fallback for jobs that we don’t want to run under a uniqueness lock. A random UUID ensures that all the jobs without a uniqueness_scope can run at the same time. If we left nil there, it would be treated like any other value, so all such jobs would share a single lock and none of them would run in parallel.
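
The effect of the fallback is easy to check in isolation. The sketch below copies the logic of ApplicationWorker.unique_args into a plain method so it runs without Karafka or Sidekiq. The shape of the arguments (worker name first, params hash last) follows the comments in the worker above, and the RefreshWorker name is only an example.

```ruby
require 'securerandom'

# Same logic as ApplicationWorker.unique_args, extracted for illustration.
def unique_args(args)
  [args.first, args.last['uniqueness_scope'] || SecureRandom.uuid]
end

# Two jobs for the same resource produce the same key, so they share a lock.
scoped_a = unique_args(['RefreshWorker', { 'uniqueness_scope' => 42 }])
scoped_b = unique_args(['RefreshWorker', { 'uniqueness_scope' => 42 }])

# Two jobs without a scope each get a random key, so they never block each other.
unscoped_a = unique_args(['RefreshWorker', {}])
unscoped_b = unique_args(['RefreshWorker', {}])

puts "scoped keys equal:   #{scoped_a == scoped_b}"
puts "unscoped keys equal: #{unscoped_a == unscoped_b}"
```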

Speeding up RSpec and Cucumber on the CI server with PostgreSQL fsync flag and parallel execution

After we’ve added RSpec and Cucumber (with PhantomJS) to our CI build process, it got really, really slow. Due to the nature of the application, after each scenario (for Cucumber) we truncate and restore the whole database. 45 minutes for a single build is definitely not what we were aiming for. So, how do we speed up test execution?

At first we thought that we could run the RSpec and Cucumber suites in parallel (using the parallel_tests gem). We got a much better machine on AWS to make sure that each process had its own core. Unfortunately everything got… slower. We decided to pick a single RSpec spec and a single Cucumber scenario that would be representative and figure out what was going on. What we discovered first is that all the specs were running faster on the Ruby level. Everything got significantly slower because of the database. Our tests are heavy in terms of DB communication and, as I said before, due to the nature of the application they will probably stay that way.

So, what were our options?

  • We could get much better hardware for our testing DBs: bigger, faster, with SSDs. However, that would definitely make things more expensive.
  • We could compromise data consistency. Since it is a testing cluster, in case of a system failure, crash, or shutdown we can just drop all the databases and repopulate them.

We’ve decided to try out the second approach and use the PostgreSQL fsync setting to tweak the database a little bit.

What is fsync (quote from PostgreSQL documentation)?

If this parameter is on, the PostgreSQL server will try to make sure that updates are physically written to disk, by issuing fsync() system calls or various equivalent methods (see wal_sync_method). This ensures that the database cluster can recover to a consistent state after an operating system or hardware crash.

While turning off fsync is often a performance benefit, this can result in unrecoverable data corruption in the event of a power failure or system crash. Thus it is only advisable to turn off fsync if you can easily recreate your entire database from external data.

Examples of safe circumstances for turning off fsync include the initial loading of a new database cluster from a backup file, using a database cluster for processing a batch of data after which the database will be thrown away and recreated, or for a read-only database clone which gets recreated frequently and is not used for failover. High quality hardware alone is not a sufficient justification for turning off fsync.
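
As a rough sketch, turning the setting off for a throwaway test cluster could look like the fragment below in postgresql.conf. The exact configuration we used is not reproduced here, so treat the fragment as an assumption; only the setting name comes from the documentation quoted above.

```conf
# postgresql.conf on the CI / test database server only (never in production)
fsync = off   # do not force writes to disk; data can be lost on a crash
```

The server has to re-read its configuration before the change takes effect.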

The results were astonishing! Since we no longer depend as much on our HDDs’ performance for each operation, the database layer does not slow us down nearly as much.


Overall, thanks to this tweak and parallel execution, we’ve managed to get from 45 minutes for a whole build down to 12 minutes. That is roughly 73% less time than before, and this build time is acceptable for us.

Research done by: Adam Gwozdowski


Copyright © 2021 Running with Ruby
