Skip to content

Commit 76c5b2f

Browse files
author
lwoodson
committed
Refactored event publishers to have single publish method w/type arg.
1 parent 554dfd3 commit 76c5b2f

7 files changed

+53
-45
lines changed

README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ You can implement your own publishers as long as they implement event handling
6262
methods as follows:
6363

6464
```ruby
65-
def event_type(timestamp, queue, metadata, klass, *args)
65+
def publish(event_type, timestamp, queue, metadata, klass, *args)
6666
...
6767
end
6868
```

TODO

+4-3
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1-
* Do not symbolize the item hash; Use string keys
2-
* Make hostname/process to be memoized.
3-
* Check return behavior of block.
1+
* Modify the publish signature to accept a hash instead of all the params.
2+
* Add file-like streams that wrap TCP/IP and UDP sockets? (not needed for our
3+
case). Would allow use of StreamPublisher to communicate over those
4+
protocols

lib/resque/plugins/clues/event_publisher.rb

+15-17
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,10 @@ def initialize(stream)
2424
@stream = stream
2525
end
2626

27-
EVENT_TYPES.each do |event_type|
28-
define_method(event_type) do |timestamp, queue, metadata, klass, *args|
29-
event = CLUES.event_marshaller.call(event_type, timestamp, queue, metadata, klass, args)
30-
stream.write(event)
31-
end
27+
# Publishes an event to the stream.
28+
def publish(event_type, timestamp, queue, metadata, klass, *args)
29+
event = CLUES.event_marshaller.call(event_type, timestamp, queue, metadata, klass, args)
30+
stream.write(event)
3231
end
3332
end
3433

@@ -58,11 +57,10 @@ def initialize(log_path, formatter=nil)
5857
@logger.formatter = formatter || lambda {|severity, time, program, msg| msg}
5958
end
6059

61-
EVENT_TYPES.each do |event_type|
62-
define_method(event_type) do |timestamp, queue, metadata, klass, *args|
63-
logger.info(CLUES.event_marshaller.call(
64-
event_type, timestamp, queue, metadata, klass, args))
65-
end
60+
# Publishes an event to the log.
61+
def publish(event_type, timestamp, queue, metadata, klass, *args)
62+
logger.info(CLUES.event_marshaller.call(
63+
event_type, timestamp, queue, metadata, klass, args))
6664
end
6765
end
6866

@@ -74,13 +72,13 @@ def initialize
7472
super([])
7573
end
7674

77-
EVENT_TYPES.each do |event_type|
78-
define_method(event_type) do |timestamp, queue, metadata, klass, *args|
79-
each do |child|
80-
child.send(
81-
event_type, timestamp, queue, metadata, klass, *args
82-
) rescue error(event_type, child)
83-
end
75+
# Invokes publish on each child publisher for them to publish the event
76+
# in their own way.
77+
def publish(event_type, timestamp, queue, metadata, klass, *args)
78+
each do |child|
79+
child.publish(
80+
event_type, timestamp, queue, metadata, klass, *args
81+
) rescue error(event_type, child)
8482
end
8583
end
8684

lib/resque/plugins/clues/job_extension.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ def self.define_perform(klass) # :doc:
2929
klass.send(:define_method, :perform) do
3030
return _base_perform unless CLUES.configured?
3131
item = CLUES.prepare(payload)
32-
CLUES.event_publisher.perform_started(CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
32+
CLUES.event_publisher.publish(:perform_started, CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
3333
@perform_started = Time.now
3434
_base_perform.tap do
3535
item['clues_metadata']['time_to_perform'] = CLUES.time_delta_since(@perform_started)
36-
CLUES.event_publisher.perform_finished(CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
36+
CLUES.event_publisher.publish(:perform_finished, CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
3737
end
3838
end
3939
end
@@ -53,7 +53,7 @@ def self.define_failed(klass) # :doc:
5353
item['clues_metadata']['exception'] = exception.class
5454
item['clues_metadata']['message'] = exception.message
5555
item['clues_metadata']['backtrace'] = exception.backtrace
56-
CLUES.event_publisher.failed(CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
56+
CLUES.event_publisher.publish(:failed, CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
5757
end
5858
end
5959
end

lib/resque/plugins/clues/queue_extension.rb

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def push(queue, orig)
4343
if Resque::Plugins::Clues.item_preprocessor
4444
Resque::Plugins::Clues.item_preprocessor.call(queue, item)
4545
end
46-
CLUES.event_publisher.enqueued(CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
46+
CLUES.event_publisher.publish(:enqueued, CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
4747
_base_push(queue, item)
4848
end
4949

@@ -58,7 +58,7 @@ def pop(queue)
5858
return orig unless CLUES.clues_configured?
5959
item = CLUES.prepare(orig) do |item|
6060
item['clues_metadata']['time_in_queue'] = CLUES.time_delta_since(item['clues_metadata']['enqueued_time'])
61-
CLUES.event_publisher.dequeued(CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
61+
CLUES.event_publisher.publish(:dequeued, CLUES.now, queue, item['clues_metadata'], item['class'], *item['args'])
6262
end
6363
end
6464
end

spec/event_publisher_spec.rb

+3-3
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
end
1313

1414
def publish_event_type(type)
15-
@publisher.send(type, @current_time, :test_queue, {}, "FooBar", "a", "b")
15+
@publisher.publish(type, @current_time, :test_queue, {}, "FooBar", "a", "b")
1616
end
1717

1818
describe Resque::Plugins::Clues::StreamPublisher do
@@ -145,8 +145,8 @@ def last_event
145145

146146
def verify_event_delegated_to_children(event_type)
147147
@publisher.each do |child|
148-
child.should_receive(event_type).with(
149-
@current_time, :test_queue, {}, "FooBar", "a", "b")
148+
child.should_receive(:publish).with(
149+
event_type, @current_time, :test_queue, {}, "FooBar", "a", "b")
150150
end
151151
end
152152

spec/job_extension_spec.rb

+25-16
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
require 'spec_helper'
2+
require 'set'
23

34
describe Resque::Plugins::Clues::JobExtension do
45
def base_item(overrides={})
@@ -31,40 +32,48 @@ def base_item(overrides={})
3132
end
3233

3334
context "with clues configured" do
34-
def publishes(evt_type)
35-
Resque::Plugins::Clues.event_publisher.should_receive(evt_type)
36-
Resque::Plugins::Clues.event_publisher.stub(evt_type) do |time, queue, metadata, klass, *args|
35+
def publishes(opts={})
36+
opts.keys.each{|key| @events_not_received << key}
37+
@events_not_received.size.should_not == 0
38+
Resque::Plugins::Clues.event_publisher.should_receive(:publish).at_least(:once)
39+
Resque::Plugins::Clues.event_publisher.stub(:publish) do |type, time, queue, metadata, klass, *args|
40+
@events_not_received.delete(type)
3741
time.nil?.should == false
3842
queue.should == :test_queue
3943
klass.should == 'TestWorker'
4044
args.should == [1,2]
4145
metadata['hostname'].should == `hostname`.strip
4246
metadata['process'].should == $$
43-
yield(metadata) if block_given?
47+
opts[type].call(metadata) if opts[type]
4448
end
4549
end
4650

4751
before do
4852
Resque::Plugins::Clues.event_publisher = Resque::Plugins::Clues::StandardOutPublisher.new
53+
@events_not_received = Set.new([])
54+
end
55+
56+
after do
57+
@events_not_received.size.should == 0
4958
end
5059

5160
describe "#perform" do
5261
it "should publish a perform_started event" do
53-
publishes(:perform_started)
62+
publishes perform_started: nil
5463
@job.perform
5564
end
5665

5766
it "should publish a perform_finished event that includes the time_to_perform" do
58-
publishes(:perform_finished) do |metadata|
67+
publishes(perform_finished: lambda do |metadata|
5968
metadata['time_to_perform'].nil?.should == false
60-
end
69+
end)
6170
@job.perform
6271
end
6372
end
6473

6574
describe "#fail" do
6675
it "should publish a perform_failed event" do
67-
publishes(:failed)
76+
publishes failed: nil
6877
@job.fail(Exception.new)
6978
end
7079

@@ -75,33 +84,33 @@ def publishes(evt_type)
7584

7685
context "includes metadata in the perform_failed event that should" do
7786
it "should include the time_to_perform" do
78-
publishes(:failed) do |metadata|
87+
publishes(failed: lambda do |metadata|
7988
metadata['time_to_perform'].nil?.should == false
80-
end
89+
end)
8190
@job.fail(Exception.new)
8291
end
8392

8493
it "should include the exception class" do
85-
publishes(:failed) do |metadata|
94+
publishes(failed: lambda do |metadata|
8695
metadata['exception'].should == Exception
87-
end
96+
end)
8897
@job.fail(Exception.new)
8998
end
9099

91100
it "should include the exception message" do
92-
publishes(:failed) do |metadata|
101+
publishes(failed: lambda do |metadata|
93102
metadata['message'].should == 'test'
94-
end
103+
end)
95104
@job.fail(Exception.new('test'))
96105
end
97106

98107
it "should include the exception backtrace" do
99108
begin
100109
raise 'test'
101110
rescue => e
102-
publishes(:failed) do |metadata|
111+
publishes(failed: lambda do |metadata|
103112
metadata['backtrace'].nil?.should == false
104-
end
113+
end)
105114
@job.fail(e)
106115
end
107116
end

0 commit comments

Comments
 (0)