From ee4e03f38d574908b060b615616d4cbe862d02a7 Mon Sep 17 00:00:00 2001 From: aeris Date: Mon, 23 Aug 2021 16:24:07 +0200 Subject: [PATCH] Job abort --- lib/sidekiq/workflow/job.rb | 10 ++ lib/sidekiq/workflow/worker.rb | 44 +++++-- spec/job_spec.rb | 224 +++++++++++++++++++-------------- spec/workflow_spec.rb | 204 +++++++++++++++--------------- 4 files changed, 273 insertions(+), 209 deletions(-) diff --git a/lib/sidekiq/workflow/job.rb b/lib/sidekiq/workflow/job.rb index 99a56cb..aa2fb4c 100644 --- a/lib/sidekiq/workflow/job.rb +++ b/lib/sidekiq/workflow/job.rb @@ -57,6 +57,16 @@ class Sidekiq::Workflow::Job self.new workflow.id, SecureRandom.uuid, klass, *args end + def error!(exception) + now = Time.now + self.errors << { date: now, error: exception.to_s } + self.error_at = now + end + + def fail! + self.failed_at = Time.now + end + def persist! Sidekiq::Workflow::Client.instance.persist_job self end diff --git a/lib/sidekiq/workflow/worker.rb b/lib/sidekiq/workflow/worker.rb index 0f79667..6f1bd24 100644 --- a/lib/sidekiq/workflow/worker.rb +++ b/lib/sidekiq/workflow/worker.rb @@ -1,27 +1,47 @@ module Sidekiq::Workflow::Worker module InstanceMethods def perform(id, *args, **kwargs) - @job = Sidekiq::Workflow::Job.find id - @workflow = @job.workflow - @job.started_at ||= Time.now - result, exception = nil, nil + @job = Sidekiq::Workflow::Job.find id + @workflow = @job.workflow + @job.started_at ||= Time.now + result, exception, abort = nil, nil, false begin result = super *args, **kwargs @job.finished_at = Time.now - rescue => e + rescue AbortException => e + abort = true + exception = e.exception + @job.fail! + rescue Exception => e exception = e - now = Time.now - @job.errors << { date: now, error: e.to_s } - @job.error_at = now end + @job.error! exception if exception @job.persist! self.perform_ready_jobs! unless exception - raise exception if exception + raise exception if exception && !abort result end + class AbortException < Exception + def initialize(args) + @args = args + end + + def exception + exception = @args.first + return exception if exception.is_a? Exception + clazz, *args = @args + return clazz.new *args if clazz.is_a? Class + RuntimeError.new *@args + end + end + + def abort!(*args) + raise AbortException, args + end + private def perform_ready_jobs! @@ -59,9 +79,9 @@ module Sidekiq::Workflow::Worker base.prepend InstanceMethods base.singleton_class.prepend ClassMethods base.sidekiq_retries_exhausted do |msg, _| - id = msg['args'].first - @job = Sidekiq::Workflow::Job.find id - @job.failed_at = Time.now + id = msg['args'].first + @job = Sidekiq::Workflow::Job.find id + @job.fail! @job.persist! end end diff --git a/spec/job_spec.rb b/spec/job_spec.rb index 029ecc6..f136b3d 100644 --- a/spec/job_spec.rb +++ b/spec/job_spec.rb @@ -1,10 +1,10 @@ -RSpec.describe Sidekiq::Workflow::Job do - before(:each) do - Sidekiq::Worker.clear_all - end +module JobSpec + RSpec.describe Sidekiq::Workflow::Job do + before(:each) do + Sidekiq::Worker.clear_all + end - it 'must allow sidekiq configuration per job' do - class TestJob1 + TestJob1 = Class.new do include Sidekiq::Workflow::Worker sidekiq_options queue: :test_1, retry: 1 sidekiq_retry_in { 10 } @@ -12,7 +12,7 @@ RSpec.describe Sidekiq::Workflow::Job do def perform(...) end end - class TestJob2 + TestJob2 = Class.new do include Sidekiq::Workflow::Worker sidekiq_options queue: :test_2, retry: 2 sidekiq_retry_in { 20 } @@ -20,111 +20,145 @@ RSpec.describe Sidekiq::Workflow::Job do def perform(...) end end - class TestWorkflow < Sidekiq::Workflow - def configure - job TestJob1 - job TestJob2 + TestAbortingJob = Class.new do + include Sidekiq::Workflow::Worker + + def perform + self.abort! 'some message' end end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - job1 = TestJob1.jobs.first - expect(job1).to include 'retry' => 1, 'queue' => 'test_1' - queue1 = Sidekiq::Queues['test_1'] - expect(queue1.size).to eq 1 - job1 = queue1.first - expect(job1).to include 'class' => 'TestJob1' - - expect(TestJob2.jobs.size).to eq 1 - job2 = TestJob2.jobs.first - expect(job2).to include 'retry' => 2, 'queue' => 'test_2' - queue2 = Sidekiq::Queues['test_2'] - expect(queue2.size).to eq 1 - job2 = queue2.first - expect(job2).to include 'class' => 'TestJob2' - end + it 'must allow sidekiq configuration per job' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job TestJob1 + job TestJob2 + end + end - describe '#status' do - it 'must have the correct state given timestamp' do - job = described_class.create OpenStruct.new(id: ''), Class - expect(job.status).to eq :pending - job.enqueued_at = Time.now - expect(job.status).to eq :enqueued - job.started_at = Time.now - expect(job.status).to eq :started - job.error_at = Time.now - expect(job.status).to eq :error - job.finished_at = Time.now - expect(job.status).to eq :finished - job.failed_at = Time.now - expect(job.status).to eq :failed + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + job1 = TestJob1.jobs.first + expect(job1).to include 'retry' => 1, 'queue' => 'test_1' + queue1 = Sidekiq::Queues['test_1'] + expect(queue1.size).to eq 1 + job1 = queue1.first + expect(job1).to include 'class' => 'JobSpec::TestJob1' + + expect(TestJob2.jobs.size).to eq 1 + job2 = TestJob2.jobs.first + expect(job2).to include 'retry' => 2, 'queue' => 'test_2' + queue2 = Sidekiq::Queues['test_2'] + expect(queue2.size).to eq 1 + job2 = queue2.first + expect(job2).to include 'class' => 'JobSpec::TestJob2' end - end - describe '#state' do - it 'must flag the job as finished in case of success' do - class TestJob - include Sidekiq::Workflow::Worker + describe '#status' do + it 'must have the correct state given timestamp' do + job = described_class.create OpenStruct.new(id: ''), Class + expect(job.status).to eq :pending + job.enqueued_at = Time.now + expect(job.status).to eq :enqueued + job.started_at = Time.now + expect(job.status).to eq :started + job.error_at = Time.now + expect(job.status).to eq :error + job.finished_at = Time.now + expect(job.status).to eq :finished + job.failed_at = Time.now + expect(job.status).to eq :failed + end + end + + describe '#state' do + it 'must flag the job as finished in case of success' do + class TestJob + include Sidekiq::Workflow::Worker + + def perform; end + end - def perform; end + workflow = Sidekiq::Workflow.new Class, 'workflow_1' + job = Sidekiq::Workflow::Job.create workflow, TestJob.name + allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job } + expect(job.enqueued_at).to be_nil + expect(job.finished_at).to be_nil + expect(job.error_at).to be_nil + expect(job.failed_at).to be_nil + + job.perform + expect(job.enqueued_at).to_not be_nil + expect(job.finished_at).to be_nil + expect(job.error_at).to be_nil + expect(job.failed_at).to be_nil + + TestJob.perform_one + expect(job.enqueued_at).to_not be_nil + expect(job.finished_at).to_not be_nil + expect(job.error_at).to be_nil + expect(job.failed_at).to be_nil end - workflow = Sidekiq::Workflow.new Class, 'workflow_1' - job = Sidekiq::Workflow::Job.create workflow, TestJob.name - allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job } - expect(job.enqueued_at).to be_nil - expect(job.finished_at).to be_nil - expect(job.error_at).to be_nil - expect(job.failed_at).to be_nil - - job.perform - expect(job.enqueued_at).to_not be_nil - expect(job.finished_at).to be_nil - expect(job.error_at).to be_nil - expect(job.failed_at).to be_nil - - TestJob.perform_one - expect(job.enqueued_at).to_not be_nil - expect(job.finished_at).to_not be_nil - expect(job.error_at).to be_nil - expect(job.failed_at).to be_nil - end + it 'must flag the job as errored in case of error' do + class TestJob + include Sidekiq::Workflow::Worker - it 'must flag the job as errored in case of error' do - class TestJob - include Sidekiq::Workflow::Worker + def self.message=(message) + @@message = message + end - def self.message=(message) - @@message = message + def perform + raise @@message if @@message + end end - def perform - raise @@message if @@message + workflow = Sidekiq::Workflow.new Class, 'workflow_1' + job = Sidekiq::Workflow::Job.create workflow, TestJob.name + allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job } + + job.perform + expect(job.error_at).to be_nil + expect(job.errors).to be_empty + expect(job.finished_at).to be_nil + TestJob.message = 'perform_error' + Timecop.freeze do |now| + expect { TestJob.perform_one }.to raise_error 'perform_error' + expect(job.error_at).to eq now + expect(job.errors).to eq([{ date: now, error: 'perform_error' }]) end - end + expect(job.finished_at).to be_nil - workflow = Sidekiq::Workflow.new Class, 'workflow_1' - job = Sidekiq::Workflow::Job.create workflow, TestJob.name - allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job } - - job.perform - expect(job.error_at).to be_nil - expect(job.errors).to be_empty - expect(job.finished_at).to be_nil - TestJob.message = 'perform_error' - Timecop.freeze do |now| - expect { TestJob.perform_one }.to raise_error 'perform_error' - expect(job.error_at).to eq now - expect(job.errors).to eq([{ date: now, error: 'perform_error' }]) + job.perform + TestJob.message = nil + TestJob.perform_one + expect(job.error_at).to_not be_nil + expect(job.finished_at).to_not be_nil end - expect(job.finished_at).to be_nil + end - job.perform - TestJob.message = nil - TestJob.perform_one - expect(job.error_at).to_not be_nil - expect(job.finished_at).to_not be_nil + describe '#abort!' do + it 'must cancel the job' do + workflow = Sidekiq::Workflow.new Class, 'workflow_1' + job = Sidekiq::Workflow::Job.create workflow, TestAbortingJob.name + allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job } + expect(job.enqueued_at).to be_nil + expect(job.finished_at).to be_nil + expect(job.error_at).to be_nil + expect(job.failed_at).to be_nil + + job.perform + expect(job.enqueued_at).to_not be_nil + expect(job.finished_at).to be_nil + expect(job.error_at).to be_nil + expect(job.failed_at).to be_nil + + TestAbortingJob.perform_one + expect(job.enqueued_at).to_not be_nil + expect(job.finished_at).to be_nil + expect(job.error_at).to_not be_nil + expect(job.failed_at).to_not be_nil + end end end end diff --git a/spec/workflow_spec.rb b/spec/workflow_spec.rb index 7b3a47b..89d5ff8 100644 --- a/spec/workflow_spec.rb +++ b/spec/workflow_spec.rb @@ -1,131 +1,131 @@ -RSpec.describe Sidekiq::Workflow do - describe '::start!' do - class TestJob - include Sidekiq::Workflow::Worker - - def perform(...) end +module WorkflowSpec # Isolate class to avoid polluting global scope + RSpec.describe Sidekiq::Workflow do + before(:each) do + Sidekiq::Worker.clear_all end - class TestJob1 < TestJob; end + describe '::start!' do + TestJob = Class.new do + include Sidekiq::Workflow::Worker - class TestJob2 < TestJob; end - - class TestJob3 < TestJob; end + def perform(...) end + end - before(:each) do - Sidekiq::Worker.clear_all - end + TestJob1 = Class.new TestJob + TestJob2 = Class.new TestJob + TestJob3 = Class.new TestJob - it 'must handle after job' do - class TestWorkflow < Sidekiq::Workflow - def configure - job1 = job TestJob1 - job TestJob2, after: job1 + it 'must handle after job' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job1 = job TestJob1 + job TestJob2, after: job1 + end end - end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - expect(TestJob2.jobs).to be_empty + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + expect(TestJob2.jobs).to be_empty - TestJob1.perform_one - expect(TestJob1.jobs).to be_empty - expect(TestJob2.jobs.size).to eq 1 - end + TestJob1.perform_one + expect(TestJob1.jobs).to be_empty + expect(TestJob2.jobs.size).to eq 1 + end - it 'must handle before job' do - class TestWorkflow < Sidekiq::Workflow - def configure - job2 = job TestJob2 - job TestJob1, before: job2 + it 'must handle before job' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job2 = job TestJob2 + job TestJob1, before: job2 + end end - end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - expect(TestJob2.jobs).to be_empty + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + expect(TestJob2.jobs).to be_empty - TestJob1.perform_one - expect(TestJob1.jobs).to be_empty - expect(TestJob2.jobs.size).to eq 1 - end + TestJob1.perform_one + expect(TestJob1.jobs).to be_empty + expect(TestJob2.jobs.size).to eq 1 + end - it 'must starts all initial jobs if any' do - class TestWorkflow < Sidekiq::Workflow - def configure - job TestJob1 - job TestJob2 + it 'must starts all initial jobs if any' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job TestJob1 + job TestJob2 + end end - end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - expect(TestJob2.jobs.size).to eq 1 - end + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + expect(TestJob2.jobs.size).to eq 1 + end - it 'must starts all subsequent jobs if any' do - class TestWorkflow < Sidekiq::Workflow - def configure - job1 = job TestJob1 - job TestJob2, after: job1 - job TestJob3, after: job1 + it 'must starts all subsequent jobs if any' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job1 = job TestJob1 + job TestJob2, after: job1 + job TestJob3, after: job1 + end end - end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - expect(TestJob2.jobs).to be_empty - expect(TestJob3.jobs).to be_empty + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + expect(TestJob2.jobs).to be_empty + expect(TestJob3.jobs).to be_empty - TestJob1.perform_one - expect(TestJob1.jobs).to be_empty - expect(TestJob2.jobs.size).to eq 1 - expect(TestJob3.jobs.size).to eq 1 - end + TestJob1.perform_one + expect(TestJob1.jobs).to be_empty + expect(TestJob2.jobs.size).to eq 1 + expect(TestJob3.jobs.size).to eq 1 + end - it 'must not start subsequent jobs if all not done' do - class TestWorkflow < Sidekiq::Workflow - def configure - job1 = job TestJob1 - job2 = job TestJob2 - job TestJob3, after: [job1, job2] + it 'must not start subsequent jobs if all not done' do + testWorkflow = Class.new Sidekiq::Workflow do + def configure + job1 = job TestJob1 + job2 = job TestJob2 + job TestJob3, after: [job1, job2] + end end - end - TestWorkflow.start! - expect(TestJob1.jobs.size).to eq 1 - expect(TestJob2.jobs.size).to eq 1 - expect(TestJob3.jobs).to be_empty + testWorkflow.start! + expect(TestJob1.jobs.size).to eq 1 + expect(TestJob2.jobs.size).to eq 1 + expect(TestJob3.jobs).to be_empty - TestJob1.perform_one - expect(TestJob1.jobs).to be_empty - expect(TestJob2.jobs.size).to eq 1 - expect(TestJob3.jobs).to be_empty + TestJob1.perform_one + expect(TestJob1.jobs).to be_empty + expect(TestJob2.jobs.size).to eq 1 + expect(TestJob3.jobs).to be_empty - TestJob2.perform_one - expect(TestJob1.jobs).to be_empty - expect(TestJob2.jobs).to be_empty - expect(TestJob3.jobs.size).to eq 1 + TestJob2.perform_one + expect(TestJob1.jobs).to be_empty + expect(TestJob2.jobs).to be_empty + expect(TestJob3.jobs.size).to eq 1 + end end - end - describe '#status' do - it 'must return correct status' do - job1 = Sidekiq::Workflow::Job.new 'workflow', 'job_1', 'class' - job2 = Sidekiq::Workflow::Job.new 'workflow', 'job_2', 'class' - workflow = Sidekiq::Workflow.new 'class', 'workflow_1', - { job1.id => job1, job2.id => job2 } - expect(workflow.status).to eq :pending - job1.started_at = Time.now - expect(workflow.status).to eq :started - job1.error_at = Time.now - expect(workflow.status).to eq :error - job1.finished_at = Time.now - expect(workflow.status).to eq :error - job2.finished_at = Time.now - expect(workflow.status).to eq :finished - job1.failed_at = Time.now - expect(workflow.status).to eq :failed + describe '#status' do + it 'must return correct status' do + job1 = Sidekiq::Workflow::Job.new 'workflow', 'job_1', 'class' + job2 = Sidekiq::Workflow::Job.new 'workflow', 'job_2', 'class' + workflow = Sidekiq::Workflow.new 'class', 'workflow_1', + { job1.id => job1, job2.id => job2 } + expect(workflow.status).to eq :pending + job1.started_at = Time.now + expect(workflow.status).to eq :started + job1.error_at = Time.now + expect(workflow.status).to eq :error + job1.finished_at = Time.now + expect(workflow.status).to eq :error + job2.finished_at = Time.now + expect(workflow.status).to eq :finished + job1.failed_at = Time.now + expect(workflow.status).to eq :failed + end end end end