@@ -18,21 +18,19 @@ public class MessagingService : IMessagingService
18
18
protected IConnection CurrentConnection => _sharedConnection . CurrentConnection ;
19
19
20
20
private readonly SharedConnection _sharedConnection ;
21
- private readonly IOptions < MerQureConfiguration > _merQureConfiguration ;
22
21
23
22
public MessagingService ( IOptions < MerQureConfiguration > merQureConfiguration , SharedConnection sharedConnection )
24
23
{
25
- _merQureConfiguration = merQureConfiguration ;
26
24
_sharedConnection = sharedConnection ;
27
25
28
- if ( _merQureConfiguration == null )
26
+ if ( merQureConfiguration == null )
29
27
{
30
28
return ;
31
29
}
32
- Durable = _merQureConfiguration . Value . Durable ;
33
- AutoDeleteQueue = _merQureConfiguration . Value . AutoDeleteQueue ;
34
- DefaultPrefetchCount = _merQureConfiguration . Value . DefaultPrefetchCount ;
35
- PublisherAcknowledgementsTimeoutInMilliseconds = _merQureConfiguration . Value . PublisherAcknowledgementsTimeoutInMilliseconds ;
30
+ Durable = merQureConfiguration . Value . Durable ;
31
+ AutoDeleteQueue = merQureConfiguration . Value . AutoDeleteQueue ;
32
+ DefaultPrefetchCount = merQureConfiguration . Value . DefaultPrefetchCount ;
33
+ PublisherAcknowledgementsTimeoutInMilliseconds = merQureConfiguration . Value . PublisherAcknowledgementsTimeoutInMilliseconds ;
36
34
}
37
35
38
36
public void DeclareExchange ( string exchangeName )
@@ -50,20 +48,20 @@ public void DeclareExchange(string exchangeName, string exchangeType)
50
48
}
51
49
}
52
50
53
- public void DeclareQueue ( string queueName , byte maxPriority )
51
+ public string DeclareQueue ( string queueName , byte maxPriority , bool isQuorum )
54
52
{
55
53
var queueArgs = new Dictionary < string , object > {
56
54
{ Constants . QueueMaxPriority , maxPriority }
57
55
} ;
58
- DeclareQueue ( queueName , queueArgs ) ;
56
+ return DeclareQueue ( queueName , queueArgs , isQuorum ) ;
59
57
}
60
58
61
- public void DeclareQueue ( string queueName )
59
+ public string DeclareQueue ( string queueName , bool isQuorum )
62
60
{
63
- DeclareQueue ( queueName , new Dictionary < string , object > ( ) ) ;
61
+ return DeclareQueue ( queueName , new Dictionary < string , object > ( ) , isQuorum ) ;
64
62
}
65
63
66
- public void DeclareQueueWithDeadLetterPolicy ( string queueName , string deadLetterExchange , int messageTimeToLive , string deadLetterRoutingKey )
64
+ public string DeclareQueueWithDeadLetterPolicy ( string queueName , string deadLetterExchange , int messageTimeToLive , string deadLetterRoutingKey , bool isQuorum )
67
65
{
68
66
if ( string . IsNullOrWhiteSpace ( deadLetterExchange ) ) throw new ArgumentNullException ( nameof ( deadLetterExchange ) ) ;
69
67
if ( messageTimeToLive <= 0 ) throw new ArgumentOutOfRangeException ( nameof ( messageTimeToLive ) ) ;
@@ -76,18 +74,41 @@ public void DeclareQueueWithDeadLetterPolicy(string queueName, string deadLetter
76
74
{
77
75
queueArgs . Add ( Constants . QueueDeadLetterRoutingKey , deadLetterRoutingKey ) ;
78
76
}
79
- DeclareQueue ( queueName , queueArgs ) ;
77
+ return DeclareQueue ( queueName , queueArgs , isQuorum ) ;
80
78
}
79
+ private const string QuorumQueueNameSuffix = "-q" ;
81
80
82
- public void DeclareQueue ( string queueName , Dictionary < string , object > queueArgs )
81
+ public string DeclareQueue ( string queueName , Dictionary < string , object > queueArgs , bool isQuorum )
83
82
{
84
83
if ( string . IsNullOrWhiteSpace ( queueName ) ) throw new ArgumentNullException ( nameof ( queueName ) ) ;
85
84
if ( queueArgs == null ) throw new ArgumentNullException ( nameof ( queueArgs ) ) ;
86
85
86
+ Dictionary < string , object > effectiveQueueArgs ;
87
+ string effectiveQueueName ;
88
+ if ( isQuorum )
89
+ {
90
+ if ( ! Durable || AutoDeleteQueue )
91
+ {
92
+ throw new ArgumentException ( "Quorum queues must be durable and non-auto-delete" ) ;
93
+ }
94
+ effectiveQueueArgs = new Dictionary < string , object > ( queueArgs )
95
+ {
96
+ { Constants . HeaderQueueType , Constants . HeaderQueueTypeQuorumValue }
97
+ } ;
98
+ effectiveQueueName = $ "{ queueName } { QuorumQueueNameSuffix } ";
99
+ }
100
+ else
101
+ {
102
+ effectiveQueueArgs = queueArgs ;
103
+ effectiveQueueName = queueName ;
104
+ }
105
+
87
106
using ( var channel = CurrentConnection . CreateModel ( ) )
88
107
{
89
- channel . QueueDeclare ( queueName . ToLowerInvariant ( ) , this . Durable , false , this . AutoDeleteQueue , queueArgs ) ;
108
+ channel . QueueDeclare ( effectiveQueueName . ToLowerInvariant ( ) , this . Durable , false , this . AutoDeleteQueue , effectiveQueueArgs ) ;
90
109
}
110
+
111
+ return effectiveQueueName ;
91
112
}
92
113
93
114
public void DeclareBinding ( string exchangeName , string queueName , string routingKey )
0 commit comments