Rails 8.1 brings a feature that's going to change how we handle long-running jobs: Active Job Continuations. Finally, we can interrupt and resume jobs without losing progress.
Imagine you have a job importing 100,000 records. It's processing record 67,543 when you deploy an update that causes the container to restart. Or perhaps the container running the job queue simply restarts due to reaching memory limits.
The result? You're back to square one. Even worse, if the job wasn't designed with idempotency in mind, you could end up with inconsistent data.
With Active Job Continuations, this is no longer an issue. Jobs can stop at a checkpoint and, after a restart, resume from the last completed step and saved cursor instead of starting over.
To enable continuations in your job, include the ActiveJob::Continuable module and use the step method to define stages:
class ProcessImportJob < ApplicationJob
  include ActiveJob::Continuable

  def perform(import_id)
    # This always runs, even if the job is resumed
    @import = Import.find(import_id)

    step :validate do
      @import.validate!
    end

    step :process_records do |step|
      @import.records.find_each(start: step.cursor) do |record|
        record.process
        step.advance!(from: record.id)
      end
    end

    step :reprocess_records
    step :finalize
  end

  def reprocess_records(step)
    @import.records.find_each(start: step.cursor) do |record|
      record.reprocess
      step.advance!(from: record.id)
    end
  end

  def finalize
    @import.finalize!
  end
end
If the job gets interrupted during process_records, when it resumes it skips validate (already completed), continues process_records from the last saved cursor, and then runs the remaining steps (reprocess_records, finalize).

Cursors are the key to everything. They let you mark exactly where you are within a step. Some examples of how to implement cursors:
# Setting the cursor by hand (the cursor is nil until it is first set)
step :iterate_items do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.set!((step.cursor || 0) + 1)
  end
end

# Starting at 0 and letting advance! increment the cursor
step :iterate_items, start: 0 do |step|
  items[step.cursor..].each do |item|
    process(item)
    step.advance!
  end
end

# Advancing from a record ID
step :process_records do |step|
  records.find_each(start: step.cursor) do |record|
    record.process
    step.advance!(from: record.id) # Uses the real ID, not sequential
  end
end
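
To make the resume behavior concrete without a Rails app, here is a minimal plain-Ruby sketch. It is a toy model, not how Active Job implements continuations: TinyContinuation, StepHandle, Interrupted, and run_import are hypothetical names invented for illustration. It only shows the two ideas from above: completed steps are skipped, and an in-progress step restarts from its saved cursor.

```ruby
# Toy model of resumable steps in plain Ruby (no Rails required).
# TinyContinuation, StepHandle and Interrupted are hypothetical names,
# not part of Active Job.
class Interrupted < StandardError; end

class TinyContinuation
  # Handle passed to each step block; reads and writes that step's cursor.
  StepHandle = Struct.new(:cursors, :name) do
    def cursor
      cursors[name]
    end

    def set!(value)
      cursors[name] = value
    end
  end

  attr_reader :completed, :cursors

  def initialize
    @completed = [] # names of steps that already finished
    @cursors = {}   # saved cursor for a step that is still in progress
  end

  # Skips the step if it finished in an earlier run; otherwise runs it,
  # starting from the saved cursor (or from `start` on the first attempt).
  def step(name, start: nil)
    return if @completed.include?(name)

    @cursors[name] = start unless @cursors.key?(name)
    yield StepHandle.new(@cursors, name)
    @cursors.delete(name)
    @completed << name
  end
end

# Simulated job body. `interrupt_at` fakes a restart just before that item.
def run_import(state, items, log, interrupt_at: nil)
  state.step(:validate) { log << :validated }

  state.step(:process_records, start: 0) do |step|
    items[step.cursor..].each do |item|
      raise Interrupted if item == interrupt_at

      log << item
      step.set!(step.cursor + 1)
    end
  end

  state.step(:finalize) { log << :finalized }
end

state = TinyContinuation.new
log = []

begin
  run_import(state, %w[a b c d], log, interrupt_at: "c")
rescue Interrupted
  # The state object survives, standing in for progress saved with the job.
end

run_import(state, %w[a b c d], log) # second run skips :validate, resumes at "c"
p log
```

The second run does not log :validated again and does not reprocess "a" or "b"; the log ends up as [:validated, "a", "b", "c", "d", :finalized].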
For more complex cases, you can use arrays as cursors:
step :process_nested_records, start: [0, 0] do |step|
  Account.find_each(start: step.cursor[0]) do |account|
    account.records.find_each(start: step.cursor[1]) do |record|
      record.process
      step.set!([account.id, record.id + 1])
    end
    step.set!([account.id + 1, 0])
  end
end
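
The two-part cursor is easier to follow with plain data. The sketch below is an in-memory imitation, assuming a hypothetical ACCOUNTS hash and a hypothetical process_nested method in place of Active Record and find_each. It follows the same rule as the step above: after each record the cursor points at the next record ID, and after each account it moves to the next account with the inner position reset to 0.

```ruby
# Hypothetical in-memory data: account id => record ids (both ascending).
ACCOUNTS = { 1 => [10, 11, 12], 2 => [20, 21], 3 => [30] }.freeze

# Processes records starting at the two-part cursor [account_id, record_id].
# Returns the processed pairs and the cursor that would be saved.
# `stop_after` simulates an interruption after that many records.
def process_nested(cursor, stop_after: nil)
  done = []
  ACCOUNTS.each do |account_id, record_ids|
    next if account_id < cursor[0] # like find_each(start: cursor[0])

    record_ids.each do |record_id|
      next if record_id < cursor[1] # like find_each(start: cursor[1])

      done << [account_id, record_id]
      cursor = [account_id, record_id + 1]
      return [done, cursor] if done.size == stop_after
    end
    cursor = [account_id + 1, 0] # next account, inner cursor reset
  end
  [done, cursor]
end

first_run, saved_cursor = process_nested([0, 0], stop_after: 4)
second_run, final_cursor = process_nested(saved_cursor)

p first_run
p saved_cursor
p second_run
p final_cursor
```

Here the first run stops inside account 2 with the cursor [2, 21], and the second run picks up at record 21 without touching account 1 again.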
A checkpoint is where a job can be safely interrupted. Checkpoints are created automatically before each step starts (except the first one in an execution) and whenever a step calls set!, advance!, or checkpoint!:
step :destroy_records do |step|
  records.find_each do |record|
    record.destroy!
    step.checkpoint! # Allow interruptions here
  end
end
When it hits a checkpoint, the job asks the queue adapter if it should stop (stopping?). If so, it saves progress and requeues itself to continue later.
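
The checkpoint handshake can be sketched in plain Ruby. This is only an illustration of the flow described above, assuming a hypothetical FakeAdapter and a hypothetical destroy_records method; the real adapter and job classes in Rails look different.

```ruby
# Plain-Ruby sketch of a checkpoint loop (no Rails). FakeAdapter and
# destroy_records are hypothetical stand-ins, not Active Job classes.
class FakeAdapter
  attr_reader :queue

  def initialize
    @stopping = false
    @queue = []
  end

  # What the job asks at every checkpoint.
  def stopping?
    @stopping
  end

  def begin_shutdown!
    @stopping = true
  end

  def enqueue(payload)
    @queue << payload
  end
end

# Processes records from `cursor`; after each one it reaches a checkpoint,
# and if the adapter is stopping it saves the cursor and requeues.
def destroy_records(records, adapter, cursor: 0)
  records[cursor..].each do |record|
    yield record
    cursor += 1

    next unless adapter.stopping?

    adapter.enqueue({ cursor: cursor })
    return :interrupted
  end
  :finished
end

adapter = FakeAdapter.new
destroyed = []

first = destroy_records(%w[r1 r2 r3 r4], adapter) do |record|
  destroyed << record
  adapter.begin_shutdown! if record == "r2" # simulate a deploy mid-run
end

saved = adapter.queue.shift # the requeued payload carries the cursor
second = destroy_records(%w[r1 r2 r3 r4], FakeAdapter.new, cursor: saved[:cursor]) do |record|
  destroyed << record
end

p first, saved, second, destroyed
```

The first call returns :interrupted after "r2" with the cursor saved as 2; the second call finishes "r3" and "r4" and returns :finished.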
If a job fails after making progress (completed a step or advanced the cursor), it automatically retries while keeping the progress. This prevents losing work when something fails temporarily.
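
As a rough illustration of that rule, the sketch below compares progress before and after a run and only requeues when something advanced. It is an assumption-laden model: run_with_progress_retry and the progress hash are hypothetical, and the actual decision logic inside Active Job may differ.

```ruby
# Hypothetical helper: `progress` is a plain hash such as
# { completed: [:validate], cursor: 0 } and `queue` is an array
# standing in for the job queue.
def run_with_progress_retry(progress, queue)
  before = Marshal.load(Marshal.dump(progress)) # deep copy for comparison
  yield progress
  :finished
rescue StandardError
  raise if progress == before # nothing advanced: let the error propagate

  queue << progress           # progress was made: retry and keep it
  :requeued
end

queue = []
outcome = run_with_progress_retry({ completed: [:validate], cursor: 0 }, queue) do |progress|
  progress[:cursor] = 42 # the step advanced before failing
  raise "temporary network error"
end

p outcome
p queue
```

In this run the cursor moved before the error, so the outcome is :requeued and the queued progress keeps cursor 42. If the block raises without changing the hash, the error is re-raised instead.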
To work, your queue adapter must implement the stopping? method. Some adapters already provide it; others, like Solid Queue, Delayed Job, and Resque, will need to implement it for full support.
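
For a sense of what sits behind stopping?, here is a minimal sketch of a worker-side object that flips a flag when the process receives a shutdown signal. SignalAwareAdapter and listen_for are hypothetical names; real adapters hook into their own worker lifecycle rather than trapping signals like this.

```ruby
# Hypothetical adapter-like object: reports stopping? once the process
# has received the signal it listens for (for example "TERM" on deploy).
class SignalAwareAdapter
  def initialize
    @stopping = false
  end

  # Installs a handler that only sets a flag, which is safe inside a trap.
  def listen_for(signal)
    Signal.trap(signal) { @stopping = true }
  end

  def stopping?
    @stopping
  end
end
```

A worker would call listen_for("TERM") once at boot; jobs then see stopping? return true at their next checkpoint after the signal arrives.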
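Active Job Continuations shine for long-running work that a deploy or restart can interrupt, such as the 100,000-record import described at the start.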
Unlike gems like job-iteration, which drive the work through an enumerator, continuations are defined as steps inside perform.
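To use Active Job Continuations, include ActiveJob::Continuable in your job, split the work into stages with step, and make sure your queue adapter implements stopping?.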
It's a powerful tool but requires careful thought. As the docs say: "Continuations are a sharp knife" - you need to manually handle checkpoints and cursors, but in return you get full flexibility.