Skip to content

Commit f49c9d4

Browse files
authoredFeb 17, 2025··
fix: MQTT subscribe routing (#1403)
1 parent 2ecc506 commit f49c9d4

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed
 

‎runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSubscribeFactory.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -337,12 +337,12 @@ private MqttSubscribeProxy(
337337
this.messages = new Long2ObjectHashMap<>();
338338
routes.forEach(r ->
339339
{
340-
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
340+
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(routedId, r, this);
341341
messages.put(r.order, messagesProxy);
342342
messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
343343
});
344344
final MqttKafkaRouteConfig retainedRoute = routes.get(0);
345-
this.retained = new KafkaRetainedProxy(originId, retainedRoute.id, retainedRoute.retained, this);
345+
this.retained = new KafkaRetainedProxy(routedId, retainedRoute.id, retainedRoute.retained, this);
346346
}
347347

348348
private void onMqttMessage(
@@ -533,7 +533,7 @@ private void onFiltersChanged(
533533
final long routeOrder = r.order;
534534
if (!messages.containsKey(routeOrder))
535535
{
536-
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(originId, r, this);
536+
KafkaMessagesProxy messagesProxy = new KafkaMessagesProxy(routedId, r, this);
537537
messages.put(routeOrder, messagesProxy);
538538
messagesPerTopicKey.put(messagesProxy.topicKey, r.order);
539539
messagesProxy.doKafkaBegin(traceId, authorization, 0, filters);

0 commit comments

Comments
 (0)
Please sign in to comment.