From 029b08f53d94fbdbedf0a207f50e0f54b38671b0 Mon Sep 17 00:00:00 2001 From: piotrek Date: Fri, 22 Dec 2017 20:19:59 +0100 Subject: [PATCH 1/3] Don't serialize jobs into workflow JSON, it's redundant --- lib/gush/workflow.rb | 1 - spec/gush/workflow_spec.rb | 30 +----------------------------- 2 files changed, 1 insertion(+), 30 deletions(-) diff --git a/lib/gush/workflow.rb b/lib/gush/workflow.rb index b76dc7c..287d9b4 100644 --- a/lib/gush/workflow.rb +++ b/lib/gush/workflow.rb @@ -168,7 +168,6 @@ def to_hash total: jobs.count, finished: jobs.count(&:finished?), klass: name, - jobs: jobs.map(&:as_json), status: status, stopped: stopped, started_at: started_at, diff --git a/spec/gush/workflow_spec.rb b/spec/gush/workflow_spec.rb index 4e10a09..4e988e2 100644 --- a/spec/gush/workflow_spec.rb +++ b/spec/gush/workflow_spec.rb @@ -102,35 +102,7 @@ def configure(*args) "started_at" => nil, "finished_at" => nil, "stopped" => false, - "arguments" => ["arg1", "arg2"], - "jobs" => [ - { - "name"=>a_string_starting_with('FetchFirstJob'), - "klass"=>"FetchFirstJob", - "incoming"=>[], - "outgoing"=>[a_string_starting_with('PersistFirstJob')], - "finished_at"=>nil, - "started_at"=>nil, - "enqueued_at"=>nil, - "failed_at"=>nil, - "params" => {}, - "output_payload" => nil, - "workflow_id" => an_instance_of(String) - }, - { - "name"=>a_string_starting_with('PersistFirstJob'), - "klass"=>"PersistFirstJob", - "incoming"=>["FetchFirstJob"], - "outgoing"=>[], - "finished_at"=>nil, - "started_at"=>nil, - "enqueued_at"=>nil, - "failed_at"=>nil, - "params" => {}, - "output_payload" => nil, - "workflow_id" => an_instance_of(String) - } - ] + "arguments" => ["arg1", "arg2"] } expect(result).to match(expected) end From 71a5c38c14b54d5961598c4311875c343cf5d0c0 Mon Sep 17 00:00:00 2001 From: piotrek Date: Fri, 22 Dec 2017 20:56:07 +0100 Subject: [PATCH 2/3] Use SCAN instead of KEYS to not lock redis completely with a single command --- lib/gush/client.rb | 6 +++--- lib/gush/job.rb | 4 ++-- spec/gush/workflow_spec.rb | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/gush/client.rb b/lib/gush/client.rb index 9b70dc3..e950107 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -73,7 +73,7 @@ def next_free_workflow_id def all_workflows connection_pool.with do |redis| - redis.keys("gush.workflows.*").map do |key| + redis.scan_each(match: "gush.workflows.*").map do |key| id = key.sub("gush.workflows.", "") find_workflow(id) end @@ -86,7 +86,7 @@ def find_workflow(id) unless data.nil? hash = Gush::JSON.decode(data, symbolize_keys: true) - keys = redis.keys("gush.jobs.#{id}.*") + keys = redis.scan_each(match: "gush.jobs.#{id}.*") nodes = redis.mget(*keys).map { |json| Gush::JSON.decode(json, symbolize_keys: true) } workflow_from_hash(hash, nodes) else @@ -116,7 +116,7 @@ def find_job(workflow_id, job_id) hypen = '-' if job_name_match.nil? keys = connection_pool.with do |redis| - redis.keys("gush.jobs.#{workflow_id}.#{job_id}#{hypen}*") + redis.scan_each(match: "gush.jobs.#{workflow_id}.#{job_id}#{hypen}*").to_a end return nil if keys.nil? diff --git a/lib/gush/job.rb b/lib/gush/job.rb index 5ec42b8..62b7931 100644 --- a/lib/gush/job.rb +++ b/lib/gush/job.rb @@ -88,8 +88,8 @@ def ready_to_start? end def parents_succeeded? - incoming.all? do |name| - client.find_job(workflow_id, name).succeeded? + !incoming.any? do |name| + !client.find_job(workflow_id, name).succeeded? end end diff --git a/spec/gush/workflow_spec.rb b/spec/gush/workflow_spec.rb index 4e988e2..e44e0c9 100644 --- a/spec/gush/workflow_spec.rb +++ b/spec/gush/workflow_spec.rb @@ -13,8 +13,7 @@ def configure(*args) end expect_any_instance_of(klass).to receive(:configure).with("arg1", "arg2") - flow = klass.new("arg1", "arg2") - + klass.new("arg1", "arg2") end end From 3085a61db0d583078e1d1cb89a613c6ac263f501 Mon Sep 17 00:00:00 2001 From: piotrek Date: Sat, 23 Dec 2017 11:57:37 +0100 Subject: [PATCH 3/3] Map nodes instead of pushing --- lib/gush/client.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/gush/client.rb b/lib/gush/client.rb index e950107..ed7e999 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -153,14 +153,14 @@ def enqueue_job(workflow_id, job) private - def workflow_from_hash(hash, nodes = nil) + def workflow_from_hash(hash, nodes = []) flow = hash[:klass].constantize.new(*hash[:arguments]) flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] - (nodes || hash[:nodes]).each do |node| - flow.jobs << Gush::Job.from_hash(node) + flow.jobs = nodes.map do |node| + Gush::Job.from_hash(node) end flow