Skip to content

Commit 607df33

Browse files
authored
Fix MQTT reconnect during proxy attach (#1078)
1 parent badb23d commit 607df33

File tree

1 file changed

+2
-6
lines changed

1 file changed

+2
-6
lines changed

pubber/src/main/java/udmi/lib/base/MqttPublisher.java

+2-6
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public class MqttPublisher implements Publisher {
9494
private static final Map<String, AtomicInteger> EVENT_SERIAL = new HashMap<>();
9595
private static final String GCP_CLIENT_PREFIX = "projects/";
9696
private static final Integer DEFAULT_MQTT_PORT = 8883;
97-
private static final long ATTACH_DELAY_MS = 1000;
97+
private static final long RETRY_DELAY_MS = 1000;
9898
private static final String LOCAL_MQTT_PREFIX = "/r/";
9999

100100
private final Semaphore connectionLock = new Semaphore(1);
@@ -205,7 +205,7 @@ private void publishCore(String deviceId, String topicSuffix, Object data, Runna
205205
debug(format("Sending message to %s", sendTopic));
206206
if (!sendMessage(deviceId, sendTopic, payload.getBytes())) {
207207
debug(format("Queue message for retry %s %s", topicSuffix, deviceId));
208-
safeSleep(ATTACH_DELAY_MS);
208+
safeSleep(RETRY_DELAY_MS);
209209
if (isActive()) {
210210
publisherExecutor.submit(() -> publishCore(deviceId, topicSuffix, data, callback));
211211
}
@@ -321,20 +321,16 @@ private MqttClient newProxyClient(String deviceId) {
321321
String gatewayId = getGatewayId();
322322
info(format("Connecting device %s through gateway %s", deviceId, gatewayId));
323323
final MqttClient mqttClient = getConnectedClient(gatewayId);
324-
long timeToWait = mqttClient.getTimeToWait();
325324
try {
326325
startupLatchWait(connectionLatch, "gateway startup exchange");
327326
String topic = getMessageTopic(deviceId, MqttDevice.ATTACH_TOPIC);
328327
info(format("Publishing attach message %s", topic));
329328
byte[] mqttMessage = EMPTY_STRING.getBytes(StandardCharsets.UTF_8);
330-
mqttClient.setTimeToWait(ATTACH_DELAY_MS);
331329
mqttClientPublish(mqttClient, topic, mqttMessage);
332330
subscribeToUpdates(mqttClient, deviceId);
333331
return mqttClient;
334332
} catch (Exception e) {
335333
throw new RuntimeException(format("While binding client %s", deviceId), e);
336-
} finally {
337-
mqttClient.setTimeToWait(timeToWait);
338334
}
339335
}
340336

0 commit comments

Comments
 (0)