Connection refused for MQTT Kafka broker after setting up TLS on the Kafka client #1389

Closed
AlexisSouquiere opened this issue Jan 30, 2025 · 6 comments
Labels
bug Something isn't working

@AlexisSouquiere

Describe the bug
I'm unable to publish messages to an MQTT Kafka broker when using TLS and SASL for the Kafka client connection. The connection to the Kafka cluster seems to work, because messages are produced to the sessions topic and Zilla can read messages from the topics.

Tests I did:

  • Full docker setup using the docker-compose.yaml file from your doc: OK
  • Existing Kafka cluster without TLS and no authentication + the Docker MQTT Kafka broker: OK
  • Existing Kafka cluster with TLS and SASL/SCRAM-SHA-512 + the Docker MQTT Kafka broker: partially failing (KO)

In the last setup:

  • mosquitto_pub fails with the following output:
docker run -it --rm eclipse-mosquitto mosquitto_pub --url mqtt://host.docker.internal:7183/zilla --message 'Hello, world' --debug --id "client"
Client test sending CONNECT
Client test received CONNACK (142)
Connection error: Connection Refused: unknown reason.
Error: A network protocol error occurred when communicating with the broker.
  • BUT I can see that messages are produced in the mqtt-sessions topic
  • AND Zilla can read messages from the mqtt-messages topics when I produce some messages manually
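A note on the `CONNACK (142)` value above: 142 is 0x8E, which the MQTT 5.0 specification names "Session taken over", a code it lists for DISCONNECT rather than CONNACK. That may be why mosquitto_pub prints "unknown reason". Whether Zilla intends that meaning here is an assumption; the thread does not confirm it. The sketch below decodes a reason code against a small, hand-picked subset of the MQTT 5.0 table. The names `MQTT5_REASON_CODES` and `describe_reason_code` are hypothetical helpers, not part of Zilla or Mosquitto.

```python
# Hand-picked subset of MQTT 5.0 reason codes (values from the MQTT 5.0 spec).
# This table is illustrative and deliberately incomplete.
MQTT5_REASON_CODES = {
    0x00: "Success",
    0x80: "Unspecified error",
    0x81: "Malformed Packet",
    0x82: "Protocol Error",
    0x83: "Implementation specific error",
    0x86: "Bad User Name or Password",
    0x87: "Not authorized",
    0x88: "Server unavailable",
    0x89: "Server busy",
    0x8E: "Session taken over",
}


def describe_reason_code(code: int) -> str:
    """Return the decimal value, hex value and name of a reason code."""
    name = MQTT5_REASON_CODES.get(code, "not in this subset")
    return f"{code} (0x{code:02X}): {name}"


print(describe_reason_code(142))  # prints: 142 (0x8E): Session taken over
```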

Here is an extract of the logs that shows the successful authentication and the session message being produced:

[0x01010000000000c0] SASL HANDSHAKE scram-sha-512
[0x01010000000000c0] SASL AUTHENTICATE admin
[1738250115049] [22] [72339069014638783] kafka client [mqtt-sessions[0] 0 + 65024 => 65024
[1738250115049] [22] [72339069014638783] kafka cache server fan [0 mqtt-sessions] 0 + 65024 => 65024
[1738250115050] [22] [72339069014638781] kafka cache server [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 0 + 0 => 0
[1738250115050] [22] [72339069014638777] kafka cache client [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 0 + 65536 => 65536
zilla:MQTT-intro-north_mqtt_kafka_mapping-test-session GroupId connect
zilla:MQTT-intro-north_mqtt_kafka_mapping-test-session GroupId connect
[1738250115051] [22] [72339069014638777] kafka cache client [[MQTT-intro.south_kafka_cache_server] mqtt-sessions[0]] 65536 - 0 => 65536
[1738250115051] [22] [72339069014638783] kafka cache server fan [0 mqtt-sessions] 65024 - 512 => 64512
[1738250115052] [22] [72339069014638783] kafka client [mqtt-sessions[0] 65024 - 512 => 64512
[client] mqtt-sessions[0] PRODUCE
[1738250115056] [22] [72339069014638785] kafka client [mqtt-sessions[0] flushableRequestBytes 0
[client] 102 DESCRIBE
[client] [0x010100000000004a] mqtt-sessions[0] FETCH RecordSet 134
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 134
[client] [0x010100000000004a] mqtt-sessions[0] FETCH RecordBatch 152 0 122
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 73
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record length 71
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record 152
[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record Set Bytes 0

....

[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordSet 80
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 80
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 5 0 68
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 19
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 18
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 5
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 0
[0x0101000000000042] mqtt-messages[0] FETCH 6
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordSet 157
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 157
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 6 0 67
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 96
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 17
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 6
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 78
[client] [0x0101000000000042] mqtt-messages[0] FETCH RecordBatch 7 0 66
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 17
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record length 16
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 7
[client] [0x0101000000000042] mqtt-messages[0] FETCH Record Set Bytes 0
[0x0101000000000042] mqtt-messages[0] FETCH 8
[client] [0x0101000000000052] mqtt-retained[0] FETCH RecordSet 0
[client] [0x0101000000000052] mqtt-retained[0] FETCH Record Set Bytes 0
[client] [0x0101000000000052] mqtt-retained[0] FETCH Record Set Bytes 0
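When the debug output gets long, it can help to tally which Kafka operations were logged per topic. The sketch below is a rough illustration that assumes the line shapes seen in the extract above (`<topic>[<partition>] FETCH ...` or `<topic>[<partition>] PRODUCE`); the regular expression and the `tally_operations` helper are hypothetical and not part of Zilla.

```python
import re
from collections import Counter

# Topic name, partition in brackets, then the operation keyword.
# Window-accounting lines such as "mqtt-sessions[0] 0 + 65024 => 65024"
# do not match because no operation keyword follows the partition.
LINE_RE = re.compile(r"(?P<topic>[\w.-]+)\[(?P<partition>\d+)\]\s+(?P<op>FETCH|PRODUCE)\b")


def tally_operations(lines):
    """Count logged FETCH/PRODUCE lines per (topic, operation) pair."""
    counts = Counter()
    for line in lines:
        match = LINE_RE.search(line)
        if match:
            counts[(match["topic"], match["op"])] += 1
    return counts


# Lines copied from the log extract above.
sample = [
    "[client] mqtt-sessions[0] PRODUCE",
    "[client] [0x010100000000004a] mqtt-sessions[0] FETCH Record 152",
    "[client] [0x0101000000000042] mqtt-messages[0] FETCH Record 5",
    "[0x0101000000000042] mqtt-messages[0] FETCH 6",
    "[1738250115049] [22] [72339069014638783] kafka client [mqtt-sessions[0] 0 + 65024 => 65024",
]

for (topic, op), count in sorted(tally_operations(sample).items()):
    print(f"{topic} {op}: {count}")
```

A tally like this counts log lines, not Kafka records, so it only indicates which topics saw activity.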

To Reproduce
Here is the zilla.yaml file I used:

name: MQTT-intro
vaults:
  client_vault:
    type: filesystem
    options:
      trust:
        store: /tmp/truststore.p12
        type: pkcs12
bindings:
  # Proxy service entrypoint
  north_tcp_server:
    type: tcp
    kind: server
    options:
      host: 0.0.0.0
      port: 7183
    exit: north_mqtt_server

  # MQTT Broker With an exit to Kafka
  north_mqtt_server:
    type: mqtt
    kind: server
    exit: north_mqtt_kafka_mapping

  # Proxy MQTT messages to Kafka
  north_mqtt_kafka_mapping:
    type: mqtt-kafka
    kind: proxy
    options:
      topics:
        sessions: mqtt-sessions
        messages: mqtt-messages
        retained: mqtt-retained
    exit: north_kafka_cache_client

  # Kafka sync layer
  north_kafka_cache_client:
    type: kafka
    kind: cache_client
    exit: south_kafka_cache_server
  south_kafka_cache_server:
    type: kafka
    kind: cache_server
    options:
      bootstrap:
        - mqtt-messages
        - mqtt-sessions
        - mqtt-retained
    exit: south_kafka_client

  # Connect to Kafka
  south_kafka_client:
    type: kafka
    kind: client
    options:
      servers:
        - my_kakfa_broker1:9092
        - my_kakfa_broker2:9092
        - my_kakfa_broker3:9092
      sasl:
        mechanism: scram-sha-512
        username: "user"
        password: "password"
    exit: south_tls_client
  south_tls_client:
    type: tls
    kind: client
    vault: client_vault
    options:
      trust:
        - cert_alias
    exit: south_tcp_client
  south_tcp_client:
    type: tcp
    kind: client

telemetry:
  exporters:
    stdout_logs_exporter:
      type: stdout
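The bindings above form a single pipeline in which each binding hands off to the next through its `exit`. As a sanity check on such a chain, the sketch below walks the exits from the TCP server to the TCP client. It assumes the binding names, types and kinds were copied by hand from the zilla.yaml above into a plain dictionary; `BINDINGS` and `walk_exits` are hypothetical helpers, and the script does not parse YAML or call Zilla.

```python
# Hand-copied from the zilla.yaml above: name -> (type, kind, exit).
BINDINGS = {
    "north_tcp_server": ("tcp", "server", "north_mqtt_server"),
    "north_mqtt_server": ("mqtt", "server", "north_mqtt_kafka_mapping"),
    "north_mqtt_kafka_mapping": ("mqtt-kafka", "proxy", "north_kafka_cache_client"),
    "north_kafka_cache_client": ("kafka", "cache_client", "south_kafka_cache_server"),
    "south_kafka_cache_server": ("kafka", "cache_server", "south_kafka_client"),
    "south_kafka_client": ("kafka", "client", "south_tls_client"),
    "south_tls_client": ("tls", "client", "south_tcp_client"),
    "south_tcp_client": ("tcp", "client", None),
}


def walk_exits(bindings, start):
    """Follow `exit` references from `start`; fail on a dangling exit or a cycle."""
    path, seen, name = [], set(), start
    while name is not None:
        if name not in bindings:
            raise KeyError(f"exit points to an undefined binding: {name}")
        if name in seen:
            raise ValueError(f"exit cycle detected at: {name}")
        seen.add(name)
        path.append(name)
        name = bindings[name][2]
    return path


for name in walk_exits(BINDINGS, "north_tcp_server"):
    binding_type, kind, _ = BINDINGS[name]
    print(f"{name}: {binding_type} {kind}")
```

In this configuration the Kafka client exits through the TLS client before reaching the TCP client, which is where the truststore from `client_vault` comes into play.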

Expected behavior
As with the docker-compose setup, I expect to be able to produce a message with mosquitto_pub and see it in the mqtt-messages topic.

Zilla Environment:
Zilla 0.9.122
Start command: start -v -e

Kafka Environment:

  • Provider: Confluent (Community) on-premise
  • Version: 7.6 (Kafka 3.6)
  • Config: 3 topics (mqtt-messages: 1 partition, delete - mqtt-retained: 1 partition, compacted - mqtt-sessions: 1 partition, compacted)

Client Environment:
Mosquitto 2.0.20

Additional context
We were looking for a way to get more logs to make debugging easier. I found the -Dzilla.binding.kafka.debug=true argument, which is helpful, but is there anything else that we could use?

AlexisSouquiere added the bug label Jan 30, 2025
@jfallows
Contributor

jfallows commented Jan 30, 2025

@AlexisSouquiere thanks for filing this issue, we are here to help.

Detailed trace level logs can be obtained by using the zilla dump command to produce a packet capture of the zilla internal streams for analysis in Wireshark using our Lua dissector.

Please follow the steps in our documentation to capture a dump and then attach the .pcap to this issue so we can help analyze what is happening in your environment with the existing Kafka cluster.

@AlexisSouquiere
Author

AlexisSouquiere commented Jan 31, 2025

Thank you for the quick answer. Here is the pcap file (I changed the extension so that the upload would be accepted).

traces.txt

@bmaidics
Contributor

Hi @AlexisSouquiere,
I managed to configure Zilla with a Kafka cluster that uses SASL/SSL. Please take a look at my repository: https://github.com/bmaidics/zilla-examples/tree/mqtt_sasl_ssl/mqtt.kafka.broker.sasl
You can test it as follows:

  • run setup.sh
  • run mosquitto_sub -V '5' --url mqtt://localhost/sensors/# -p 7183 -d
  • run mosquitto_pub --url mqtt://localhost:7183/sensors/1 --message 'Hello, world'
  • the message should arrive at the subscriber

In the meantime, I'm analyzing your PCAP to see why it's behaving differently than in my setup.

@AlexisSouquiere
Author

I will check your setup and see if I can make it work. I will give you an update soon. Thank you!

@AlexisSouquiere
Author

I managed to make it work after installing Zilla on a VM close to the Kafka cluster (same VLAN). I don't know yet what causes the issue when I install it on my laptop, but I can confirm that Zilla works fine with a Kafka cluster using SASL/SSL.
Thank you for the quick support, I can continue testing Zilla!
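The thread does not establish why the laptop setup failed. One possible factor, stated here only as an assumption, is network reachability: a Kafka client has to reach every broker address the cluster advertises, not just the first one it bootstraps from. The sketch below checks plain TCP reachability for a list of broker addresses so the result from a laptop can be compared with the result from a VM in the same VLAN. `can_connect` and `check_brokers` are hypothetical helpers; the check does not perform a TLS handshake or SASL authentication.

```python
import socket


def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port opens within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures.
        return False


def check_brokers(addresses, timeout: float = 3.0):
    """Map each "host:port" string to its TCP reachability result."""
    return {
        f"{host}:{port}": can_connect(host, port, timeout)
        for host, port in addresses
    }
```

For example, `check_brokers([("my_kakfa_broker1", 9092), ("my_kakfa_broker2", 9092), ("my_kakfa_broker3", 9092)])` uses the placeholder hostnames from the zilla.yaml above. A broker that is reachable from the VM but not from the laptop would point to routing, DNS or firewall differences rather than to Zilla's configuration.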

@bmaidics
Contributor

bmaidics commented Feb 4, 2025

Sure thing @AlexisSouquiere,
Let us know if you face any more issues.
