master
aeris 2 years ago
parent 434da61c5f
commit ee4e03f38d
  1. 10
      lib/sidekiq/workflow/job.rb
  2. 44
      lib/sidekiq/workflow/worker.rb
  3. 224
      spec/job_spec.rb
  4. 204
      spec/workflow_spec.rb

@ -57,6 +57,16 @@ class Sidekiq::Workflow::Job
self.new workflow.id, SecureRandom.uuid, klass, *args
end
def error!(exception)
now = Time.now
self.errors << { date: now, error: exception.to_s }
self.error_at = now
end
def fail!
self.failed_at = Time.now
end
def persist!
Sidekiq::Workflow::Client.instance.persist_job self
end

@ -1,27 +1,47 @@
module Sidekiq::Workflow::Worker
module InstanceMethods
def perform(id, *args, **kwargs)
@job = Sidekiq::Workflow::Job.find id
@workflow = @job.workflow
@job.started_at ||= Time.now
result, exception = nil, nil
@job = Sidekiq::Workflow::Job.find id
@workflow = @job.workflow
@job.started_at ||= Time.now
result, exception, abort = nil, nil, false
begin
result = super *args, **kwargs
@job.finished_at = Time.now
rescue => e
rescue AbortException => e
abort = true
exception = e.exception
@job.fail!
rescue Exception => e
exception = e
now = Time.now
@job.errors << { date: now, error: e.to_s }
@job.error_at = now
end
@job.error! exception if exception
@job.persist!
self.perform_ready_jobs! unless exception
raise exception if exception
raise exception if exception && !abort
result
end
class AbortException < Exception
def initialize(args)
@args = args
end
def exception
exception = @args.first
return exception if exception.is_a? Exception
clazz, *args = @args
return clazz.new *args if clazz.is_a? Class
RuntimeError.new *@args
end
end
def abort!(*args)
raise AbortException, args
end
private
def perform_ready_jobs!
@ -59,9 +79,9 @@ module Sidekiq::Workflow::Worker
base.prepend InstanceMethods
base.singleton_class.prepend ClassMethods
base.sidekiq_retries_exhausted do |msg, _|
id = msg['args'].first
@job = Sidekiq::Workflow::Job.find id
@job.failed_at = Time.now
id = msg['args'].first
@job = Sidekiq::Workflow::Job.find id
@job.fail!
@job.persist!
end
end

@ -1,10 +1,10 @@
RSpec.describe Sidekiq::Workflow::Job do
before(:each) do
Sidekiq::Worker.clear_all
end
module JobSpec
RSpec.describe Sidekiq::Workflow::Job do
before(:each) do
Sidekiq::Worker.clear_all
end
it 'must allow sidekiq configuration per job' do
class TestJob1
TestJob1 = Class.new do
include Sidekiq::Workflow::Worker
sidekiq_options queue: :test_1, retry: 1
sidekiq_retry_in { 10 }
@ -12,7 +12,7 @@ RSpec.describe Sidekiq::Workflow::Job do
def perform(...) end
end
class TestJob2
TestJob2 = Class.new do
include Sidekiq::Workflow::Worker
sidekiq_options queue: :test_2, retry: 2
sidekiq_retry_in { 20 }
@ -20,111 +20,145 @@ RSpec.describe Sidekiq::Workflow::Job do
def perform(...) end
end
class TestWorkflow < Sidekiq::Workflow
def configure
job TestJob1
job TestJob2
TestAbortingJob = Class.new do
include Sidekiq::Workflow::Worker
def perform
self.abort! 'some message'
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
job1 = TestJob1.jobs.first
expect(job1).to include 'retry' => 1, 'queue' => 'test_1'
queue1 = Sidekiq::Queues['test_1']
expect(queue1.size).to eq 1
job1 = queue1.first
expect(job1).to include 'class' => 'TestJob1'
expect(TestJob2.jobs.size).to eq 1
job2 = TestJob2.jobs.first
expect(job2).to include 'retry' => 2, 'queue' => 'test_2'
queue2 = Sidekiq::Queues['test_2']
expect(queue2.size).to eq 1
job2 = queue2.first
expect(job2).to include 'class' => 'TestJob2'
end
it 'must allow sidekiq configuration per job' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job TestJob1
job TestJob2
end
end
describe '#status' do
it 'must have the correct state given timestamp' do
job = described_class.create OpenStruct.new(id: ''), Class
expect(job.status).to eq :pending
job.enqueued_at = Time.now
expect(job.status).to eq :enqueued
job.started_at = Time.now
expect(job.status).to eq :started
job.error_at = Time.now
expect(job.status).to eq :error
job.finished_at = Time.now
expect(job.status).to eq :finished
job.failed_at = Time.now
expect(job.status).to eq :failed
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
job1 = TestJob1.jobs.first
expect(job1).to include 'retry' => 1, 'queue' => 'test_1'
queue1 = Sidekiq::Queues['test_1']
expect(queue1.size).to eq 1
job1 = queue1.first
expect(job1).to include 'class' => 'JobSpec::TestJob1'
expect(TestJob2.jobs.size).to eq 1
job2 = TestJob2.jobs.first
expect(job2).to include 'retry' => 2, 'queue' => 'test_2'
queue2 = Sidekiq::Queues['test_2']
expect(queue2.size).to eq 1
job2 = queue2.first
expect(job2).to include 'class' => 'JobSpec::TestJob2'
end
end
describe '#state' do
it 'must flag the job as finished in case of success' do
class TestJob
include Sidekiq::Workflow::Worker
describe '#status' do
it 'must have the correct state given timestamp' do
job = described_class.create OpenStruct.new(id: ''), Class
expect(job.status).to eq :pending
job.enqueued_at = Time.now
expect(job.status).to eq :enqueued
job.started_at = Time.now
expect(job.status).to eq :started
job.error_at = Time.now
expect(job.status).to eq :error
job.finished_at = Time.now
expect(job.status).to eq :finished
job.failed_at = Time.now
expect(job.status).to eq :failed
end
end
describe '#state' do
it 'must flag the job as finished in case of success' do
class TestJob
include Sidekiq::Workflow::Worker
def perform; end
end
def perform; end
workflow = Sidekiq::Workflow.new Class, 'workflow_1'
job = Sidekiq::Workflow::Job.create workflow, TestJob.name
allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job }
expect(job.enqueued_at).to be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
job.perform
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
TestJob.perform_one
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to_not be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
end
workflow = Sidekiq::Workflow.new Class, 'workflow_1'
job = Sidekiq::Workflow::Job.create workflow, TestJob.name
allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job }
expect(job.enqueued_at).to be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
job.perform
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
TestJob.perform_one
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to_not be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
end
it 'must flag the job as errored in case of error' do
class TestJob
include Sidekiq::Workflow::Worker
it 'must flag the job as errored in case of error' do
class TestJob
include Sidekiq::Workflow::Worker
def self.message=(message)
@@message = message
end
def self.message=(message)
@@message = message
def perform
raise @@message if @@message
end
end
def perform
raise @@message if @@message
workflow = Sidekiq::Workflow.new Class, 'workflow_1'
job = Sidekiq::Workflow::Job.create workflow, TestJob.name
allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job }
job.perform
expect(job.error_at).to be_nil
expect(job.errors).to be_empty
expect(job.finished_at).to be_nil
TestJob.message = 'perform_error'
Timecop.freeze do |now|
expect { TestJob.perform_one }.to raise_error 'perform_error'
expect(job.error_at).to eq now
expect(job.errors).to eq([{ date: now, error: 'perform_error' }])
end
end
expect(job.finished_at).to be_nil
workflow = Sidekiq::Workflow.new Class, 'workflow_1'
job = Sidekiq::Workflow::Job.create workflow, TestJob.name
allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job }
job.perform
expect(job.error_at).to be_nil
expect(job.errors).to be_empty
expect(job.finished_at).to be_nil
TestJob.message = 'perform_error'
Timecop.freeze do |now|
expect { TestJob.perform_one }.to raise_error 'perform_error'
expect(job.error_at).to eq now
expect(job.errors).to eq([{ date: now, error: 'perform_error' }])
job.perform
TestJob.message = nil
TestJob.perform_one
expect(job.error_at).to_not be_nil
expect(job.finished_at).to_not be_nil
end
expect(job.finished_at).to be_nil
end
job.perform
TestJob.message = nil
TestJob.perform_one
expect(job.error_at).to_not be_nil
expect(job.finished_at).to_not be_nil
describe '#abort!' do
it 'must cancel the job' do
workflow = Sidekiq::Workflow.new Class, 'workflow_1'
job = Sidekiq::Workflow::Job.create workflow, TestAbortingJob.name
allow(Sidekiq::Workflow::Job).to receive(:find).with(job.id) { job }
expect(job.enqueued_at).to be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
job.perform
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to be_nil
expect(job.failed_at).to be_nil
TestAbortingJob.perform_one
expect(job.enqueued_at).to_not be_nil
expect(job.finished_at).to be_nil
expect(job.error_at).to_not be_nil
expect(job.failed_at).to_not be_nil
end
end
end
end

@ -1,131 +1,131 @@
RSpec.describe Sidekiq::Workflow do
describe '::start!' do
class TestJob
include Sidekiq::Workflow::Worker
def perform(...) end
module WorkflowSpec # Isolate class to avoid polluting global scope
RSpec.describe Sidekiq::Workflow do
before(:each) do
Sidekiq::Worker.clear_all
end
class TestJob1 < TestJob; end
describe '::start!' do
TestJob = Class.new do
include Sidekiq::Workflow::Worker
class TestJob2 < TestJob; end
class TestJob3 < TestJob; end
def perform(...) end
end
before(:each) do
Sidekiq::Worker.clear_all
end
TestJob1 = Class.new TestJob
TestJob2 = Class.new TestJob
TestJob3 = Class.new TestJob
it 'must handle after job' do
class TestWorkflow < Sidekiq::Workflow
def configure
job1 = job TestJob1
job TestJob2, after: job1
it 'must handle after job' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job1 = job TestJob1
job TestJob2, after: job1
end
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
end
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
end
it 'must handle before job' do
class TestWorkflow < Sidekiq::Workflow
def configure
job2 = job TestJob2
job TestJob1, before: job2
it 'must handle before job' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job2 = job TestJob2
job TestJob1, before: job2
end
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
end
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
end
it 'must starts all initial jobs if any' do
class TestWorkflow < Sidekiq::Workflow
def configure
job TestJob1
job TestJob2
it 'must starts all initial jobs if any' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job TestJob1
job TestJob2
end
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs.size).to eq 1
end
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs.size).to eq 1
end
it 'must starts all subsequent jobs if any' do
class TestWorkflow < Sidekiq::Workflow
def configure
job1 = job TestJob1
job TestJob2, after: job1
job TestJob3, after: job1
it 'must starts all subsequent jobs if any' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job1 = job TestJob1
job TestJob2, after: job1
job TestJob3, after: job1
end
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
expect(TestJob3.jobs).to be_empty
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs).to be_empty
expect(TestJob3.jobs).to be_empty
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs.size).to eq 1
end
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs.size).to eq 1
end
it 'must not start subsequent jobs if all not done' do
class TestWorkflow < Sidekiq::Workflow
def configure
job1 = job TestJob1
job2 = job TestJob2
job TestJob3, after: [job1, job2]
it 'must not start subsequent jobs if all not done' do
testWorkflow = Class.new Sidekiq::Workflow do
def configure
job1 = job TestJob1
job2 = job TestJob2
job TestJob3, after: [job1, job2]
end
end
end
TestWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs).to be_empty
testWorkflow.start!
expect(TestJob1.jobs.size).to eq 1
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs).to be_empty
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs).to be_empty
TestJob1.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs.size).to eq 1
expect(TestJob3.jobs).to be_empty
TestJob2.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs).to be_empty
expect(TestJob3.jobs.size).to eq 1
TestJob2.perform_one
expect(TestJob1.jobs).to be_empty
expect(TestJob2.jobs).to be_empty
expect(TestJob3.jobs.size).to eq 1
end
end
end
describe '#status' do
it 'must return correct status' do
job1 = Sidekiq::Workflow::Job.new 'workflow', 'job_1', 'class'
job2 = Sidekiq::Workflow::Job.new 'workflow', 'job_2', 'class'
workflow = Sidekiq::Workflow.new 'class', 'workflow_1',
{ job1.id => job1, job2.id => job2 }
expect(workflow.status).to eq :pending
job1.started_at = Time.now
expect(workflow.status).to eq :started
job1.error_at = Time.now
expect(workflow.status).to eq :error
job1.finished_at = Time.now
expect(workflow.status).to eq :error
job2.finished_at = Time.now
expect(workflow.status).to eq :finished
job1.failed_at = Time.now
expect(workflow.status).to eq :failed
describe '#status' do
it 'must return correct status' do
job1 = Sidekiq::Workflow::Job.new 'workflow', 'job_1', 'class'
job2 = Sidekiq::Workflow::Job.new 'workflow', 'job_2', 'class'
workflow = Sidekiq::Workflow.new 'class', 'workflow_1',
{ job1.id => job1, job2.id => job2 }
expect(workflow.status).to eq :pending
job1.started_at = Time.now
expect(workflow.status).to eq :started
job1.error_at = Time.now
expect(workflow.status).to eq :error
job1.finished_at = Time.now
expect(workflow.status).to eq :error
job2.finished_at = Time.now
expect(workflow.status).to eq :finished
job1.failed_at = Time.now
expect(workflow.status).to eq :failed
end
end
end
end

Loading…
Cancel
Save