Skip to content

Commit 5f0654b

Browse files
authored
Set destination ID for DELAYED_DELIVERY producer
1 parent 9a16514 commit 5f0654b

File tree

3 files changed

+30
-21
lines changed

3 files changed

+30
-21
lines changed

broker-core/src/main/java/org/apache/qpid/server/model/Producer.java

+25-1
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,37 @@
2323
import java.util.UUID;
2424

2525
import com.google.common.util.concurrent.ListenableFuture;
26+
import org.apache.qpid.server.message.MessageDestination;
2627

2728
@ManagedObject(creatable = false, amqpName = "org.apache.qpid.Producer")
2829
public interface Producer<X extends Producer<X>> extends ConfiguredObject<X>
2930
{
3031
enum DeliveryType { DELAYED_DELIVERY, STANDARD_DELIVERY }
3132

32-
enum DestinationType { EXCHANGE, QUEUE }
33+
enum DestinationType
34+
{
35+
EXCHANGE,
36+
QUEUE;
37+
38+
public static DestinationType from(MessageDestination messageDestination)
39+
{
40+
if (messageDestination instanceof Exchange)
41+
{
42+
return EXCHANGE;
43+
}
44+
else if (messageDestination instanceof Queue)
45+
{
46+
return QUEUE;
47+
}
48+
return null;
49+
}
50+
51+
public static UUID getId(MessageDestination messageDestination)
52+
{
53+
final DestinationType destinationType = from(messageDestination);
54+
return destinationType == null ? null : ((ConfiguredObject<?>) messageDestination).getId();
55+
}
56+
}
3357

3458
void registerMessageDelivered(long messageSize);
3559

broker-core/src/main/java/org/apache/qpid/server/model/ProducerImpl.java

+3-12
Original file line numberDiff line numberDiff line change
@@ -67,25 +67,16 @@ public ProducerImpl(final AbstractAMQPSession<?, ?> session,
6767
_sessionName = session.getName();
6868
_principal = session.getAMQPConnection().getPrincipal();
6969
_remoteAddress = session.getAMQPConnection().getRemoteAddress();
70-
_destination = messageDestination == null ? null : messageDestination.getName();
7170
if (messageDestination == null)
7271
{
7372
_deliveryType = DeliveryType.DELAYED_DELIVERY;
74-
_destinationType = null;
7573
}
7674
else
7775
{
7876
_deliveryType = DeliveryType.STANDARD_DELIVERY;
79-
if (messageDestination instanceof Exchange)
80-
{
81-
_destinationId = ((Exchange<?>) messageDestination).getId();
82-
_destinationType = DestinationType.EXCHANGE;
83-
}
84-
else if (messageDestination instanceof Queue)
85-
{
86-
_destinationId = ((Queue<?>) messageDestination).getId();
87-
_destinationType = DestinationType.QUEUE;
88-
}
77+
_destination = messageDestination.getName();
78+
_destinationType = DestinationType.from(messageDestination);
79+
_destinationId = DestinationType.getId(messageDestination);
8980
}
9081

9182
registerWithParents();

broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/StandardReceivingLinkEndpoint.java

+2-8
Original file line numberDiff line numberDiff line change
@@ -296,14 +296,8 @@ public void onRollback()
296296
{
297297
final MessageDestination messageDestination = getAddressSpace()
298298
.getAttainedMessageDestination(serverMessage.getTo(), false);
299-
if (messageDestination != null)
300-
{
301-
final Producer.DestinationType destinationType =
302-
messageDestination instanceof Exchange
303-
? Producer.DestinationType.EXCHANGE
304-
: Producer.DestinationType.QUEUE;
305-
_producer.setDestinationType(destinationType);
306-
}
299+
_producer.setDestinationType(Producer.DestinationType.from(messageDestination));
300+
_producer.setDestinationId(Producer.DestinationType.getId(messageDestination));
307301
_producer.setDestination(to);
308302
}
309303
}

0 commit comments

Comments
 (0)