-
Notifications
You must be signed in to change notification settings - Fork 43
/
Copy pathbasic_cancel_spec.rb
124 lines (99 loc) · 2.89 KB
/
basic_cancel_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
RSpec.describe 'A consumer' do
let(:connection) { MarchHare.connect }
after :each do
connection.close
end
context "that does not block the caller" do
it 'receives messages until cancelled' do
x = connection.create_channel.default_exchange
q = connection.create_channel.queue("", :exclusive => true)
messages = []
consumer_exited = false
consumer = nil
consumer_thread = Thread.new do
consumer = q.subscribe do |headers, message|
messages << message
sleep 0.1
end
consumer_exited = true
end
publisher_thread = Thread.new do
20.times do
x.publish('hello world', :routing_key => q.name)
end
end
sleep 0.2
consumer.cancel
consumer_thread.join
publisher_thread.join
expect(messages).not_to be_empty
expect(consumer_exited).to eq(true)
end
end
context "that DOES block the caller" do
it 'receives messages until cancelled' do
x = connection.create_channel.default_exchange
q = connection.create_channel.queue("", :exclusive => true)
messages = []
consumer_exited = false
consumer = nil
consumer_thread = Thread.new do
consumer = q.build_consumer do |headers, message|
messages << message
sleep 0.1
end
q.subscribe_with(consumer, :block => true)
consumer_exited = true
end
publisher_thread = Thread.new do
20.times do
x.publish('hello world', :routing_key => q.name)
end
end
sleep 0.5
consumer.cancel
consumer_thread.join
publisher_thread.join
expect(messages).not_to be_empty
expect(consumer_exited).to eq(true)
end
end
context "that DOES block the caller and never receives any messages" do
it 'can be cancelled' do
q = connection.create_channel.queue("", :exclusive => true)
consumer_exited = false
consumer = nil
consumer_thread = Thread.new do
co = q.build_consumer(:block => true) do |headers, message|
messages << message
sleep 0.1
end
consumer = co
q.subscribe_with(co, :block => true)
consumer_exited = true
end
sleep 1.0
consumer.cancel
consumer_thread.join
expect(consumer_exited).to eq(true)
end
end
context 'that is cancelled' do
it 'will not raise errors when cancelled again' do
queue = connection.create_channel.queue('')
consumer = queue.build_consumer(:block => true) { |headers, message| }
thread = Thread.new do
queue.subscribe_with(consumer, :block => true)
end
sleep 1
begin
consumer.cancel
consumer.cancel
rescue NativeException => e
raise e
ensure
thread.join
end
end
end
end