When a zk session timeout occurs, an instance may end up with multiple consumers, causing out-of-order messages and data loss #5270
Problem symptoms:
Kafka has already been configured with max.in.flight.requests.per.connection = 1, yet messages sent from the canal deployer to Kafka are still occasionally out of order. When the disorder occurs, the following key log entries appear (a short producer-configuration sketch follows below):

Instance log:
zk log:
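For context, max.in.flight.requests.per.connection = 1 only keeps ordering within a single producer instance's connection to each broker; it cannot prevent interleaving between two independent producer threads. A minimal, self-contained illustration of the setting (broker address, topic, and the other property values are placeholders, not canal's actual configuration):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Keeps ordering only for this one producer's connection to each broker;
        // two producer instances/threads writing the same partition can still interleave.
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1");
        props.put(ProducerConfig.RETRIES_CONFIG, "3");   // illustrative values
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
        }
    }
}
```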

Problem analysis:
Each instance implements its HA mechanism by watching its own ephemeral data node, runningData, in zk. Under extreme network jitter or a stalled process (for example, frequent full GCs), the session between the zk client and the server cannot be renewed in time, so the session times out and the ephemeral node is deleted. When the fault clears, the client receives the event that this data node no longer exists, and that event triggers ServerRunningMonitor.initRunning.
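For readers unfamiliar with the pattern, below is a generic sketch of the ephemeral-node claim plus NodeDeleted watch described above, written against the plain ZooKeeper client. It is not canal's actual code; the path, data, and method names are illustrative. Only the shape of the mechanism matters: a session timeout deletes the ephemeral node, the watch fires, and the recovery path re-enters the initRunning-style claim.

```java
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/** Generic sketch of the ephemeral-node HA pattern; not canal's code. */
public class RunningNodeWatchSketch {

    private static final String RUNNING_PATH = "/example/destinations/demo/running"; // illustrative

    private final ZooKeeper zk;

    public RunningNodeWatchSketch(ZooKeeper zk) {
        this.zk = zk;
    }

    /** Try to claim the running node and keep watching it. */
    public void initRunning() throws KeeperException, InterruptedException {
        try {
            // Ephemeral: the node disappears automatically if our session expires.
            zk.create(RUNNING_PATH, "me".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            // Claim succeeded -> this is where the MQ worker would be started.
        } catch (KeeperException.NodeExistsException e) {
            // Someone else holds it; fall through and just watch the node.
        }
        // Re-register a one-shot watch so a later NodeDeleted triggers another claim attempt.
        zk.exists(RUNNING_PATH, (WatchedEvent event) -> {
            if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                try {
                    initRunning(); // the re-entry point the analysis above refers to
                } catch (Exception ignored) {
                    // A real implementation would retry or log here.
                }
            }
        });
    }
}
```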


If initRunning succeeds in claiming runningData, it calls CanalMQStarter.startDestination. The logic of startDestination is to stop first and then start. Because stop only sets a flag, and the worker thread only exits at its next polling iteration, immediately starting a new thread can leave multiple threads performing get, commit, and rollback concurrently, which can reorder messages and lose data (see the sketch below).
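A minimal sketch of the stop/start race described above (class and method names are illustrative, not the actual CanalMQStarter code): stop only flips a flag, the old worker keeps running until it next checks that flag, and an immediate start spawns a second worker, so for a short window two threads drive get/ack/rollback for the same destination.

```java
/** Illustrative model of the restart race; not canal's actual implementation. */
public class WorkerRestartRaceSketch {

    /** One worker per destination; each has its own stop flag. */
    static final class Worker implements Runnable {
        private volatile boolean running = true;

        void shutdown() {
            // Only flips the flag; the thread exits at its next loop check.
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                // getWithoutAck / send to MQ / ack / rollback for this destination.
                // A long batch here means the flag is not re-checked for a while.
            }
        }
    }

    private volatile Worker current;

    public void stopDestination() {
        Worker w = current;
        if (w != null) {
            w.shutdown(); // no join(): the caller does not wait for the old thread to exit
        }
    }

    public void startDestination() {
        Worker w = new Worker();
        current = w;
        new Thread(w, "canal-mq-worker").start();
    }

    /** Sequence triggered after re-claiming the zk running node. */
    public void restart() {
        stopDestination();   // old worker may still be mid-batch
        startDestination();  // new worker starts immediately -> two threads briefly
                             // interleave get/ack/rollback on the same store
    }
}
```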
Example cases of the multi-threaded producer problem within a single instance:
Out-of-order scenario:
Sends happen in parallel while acks are still performed in order; no error is raised at this point, and acks still proceed by the smallest batchId.
Scenario where commit and rollback cannot proceed normally:
The "is not the firstly" error can prevent the ack from succeeding, which makes the event buffer back up until consumption stalls; rollback also fails, so data is lost (a simplified sketch of this ordered-ack rule follows).
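To make the failure mode concrete, here is a simplified model of the ordered-ack rule (not canal's actual meta-manager code, only the constraint it enforces): batches must be acknowledged starting from the smallest outstanding batchId, so once two worker threads interleave, one of them eventually acks a non-first batch, the call fails, and the buffer can no longer drain.

```java
import java.util.TreeMap;

/**
 * Simplified illustration of the ordered-ack constraint; not canal's code.
 * Batches are tracked in batchId order and may only be removed from the front.
 */
public class OrderedBatchTracker {

    private final TreeMap<Long, String> pendingBatches = new TreeMap<>();
    private long nextBatchId = 1;

    /** get(): hand out a new batch and remember it as outstanding. */
    public synchronized long addBatch(String payload) {
        long id = nextBatchId++;
        pendingBatches.put(id, payload);
        return id;
    }

    /** ack(): only the oldest outstanding batch may be acknowledged. */
    public synchronized void ack(long batchId) {
        Long first = pendingBatches.isEmpty() ? null : pendingBatches.firstKey();
        if (first == null || batchId != first) {
            // Mirrors the "is not the firstly" failure: with two worker threads
            // interleaving get/ack, a non-first batchId is eventually acked,
            // the exception is thrown, nothing is removed, and the buffer keeps
            // filling until consumption stalls.
            throw new IllegalStateException("batchId:" + batchId + " is not the firstly:" + first);
        }
        pendingBatches.remove(batchId);
    }
}
```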
Proposed fix:
After the claim succeeds, if the MQ producer has already started there is no need to stop and start it again; just skip it. In my view, restarting here serves no particular purpose. A sketch of the idea follows.
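A minimal sketch of the proposed skip-if-already-running guard (field and method names are illustrative; the actual patch in this PR may differ):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Illustrative sketch of the proposed guard; not the actual patch. */
public class MqStarterSketch {

    /** Tracks which destinations already have a live worker. */
    private final Map<String, Boolean> runningDestinations = new ConcurrentHashMap<>();

    public void startDestination(String destination) {
        // Proposed change: if a worker for this destination is already running,
        // skip the stop+start cycle triggered by re-claiming the zk node, so
        // there is never a window with two worker threads for one destination.
        if (runningDestinations.putIfAbsent(destination, Boolean.TRUE) != null) {
            return; // already running: nothing to do
        }
        // ... actually create and start the worker thread here ...
    }

    public void stopDestination(String destination) {
        if (runningDestinations.remove(destination) != null) {
            // ... signal the worker to stop and let it exit on its next poll ...
        }
    }
}
```

With a guard like this, re-claiming the zk node after a session timeout becomes a no-op for an instance whose worker never actually stopped, which removes the overlap window described in the analysis.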