Skip to content

Commit 971ba67

Browse files
vipwzw33cn
authored andcommitted
[[FIX]] close a closed queue
1 parent f5c88cf commit 971ba67

File tree

2 files changed

+12
-3
lines changed

2 files changed

+12
-3
lines changed

queue/client.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ func (client *client) WaitTimeout(msg *Message, timeout time.Duration) (*Message
145145
if msg.chReply == nil {
146146
return &Message{}, errors.New("empty wait channel")
147147
}
148-
148+
sub := client.q.chanSub(msg.Topic)
149149
var t <-chan time.Time
150150
if timeout > 0 {
151151
timer := time.NewTimer(timeout)
@@ -155,6 +155,8 @@ func (client *client) WaitTimeout(msg *Message, timeout time.Duration) (*Message
155155
select {
156156
case msg = <-msg.chReply:
157157
return msg, msg.Err()
158+
case <-sub.done:
159+
return nil, types.ErrChannelClosed
158160
case <-client.done:
159161
return &Message{}, ErrIsQueueClosed
160162
case <-t:

queue/queue.go

+9-2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ type chanSub struct {
5050
high chan *Message
5151
low chan *Message
5252
isClose int32
53+
done chan struct{}
5354
}
5455

5556
// Queue only one obj in project
@@ -183,6 +184,7 @@ func (q *queue) chanSub(topic string) *chanSub {
183184
high: make(chan *Message, defaultChanBuffer),
184185
low: make(chan *Message, defaultLowChanBuffer),
185186
isClose: 0,
187+
done: make(chan struct{}),
186188
}
187189
}
188190
return q.chanSubs[topic]
@@ -199,6 +201,7 @@ func (q *queue) closeTopic(topic string) {
199201
sub.high <- &Message{}
200202
sub.low <- &Message{}
201203
}
204+
close(sub.done)
202205
q.chanSubs[topic] = &chanSub{isClose: 1}
203206
}
204207

@@ -211,8 +214,12 @@ func (q *queue) send(msg *Message, timeout time.Duration) (err error) {
211214
return types.ErrChannelClosed
212215
}
213216
if timeout == -1 {
214-
sub.high <- msg
215-
return nil
217+
select {
218+
case sub.high <- msg:
219+
return nil
220+
case <-sub.done:
221+
return types.ErrChannelClosed
222+
}
216223
}
217224
defer func() {
218225
res := recover()

0 commit comments

Comments
 (0)