Skip to content

Commit 54117b4

Browse files
authored
protobuf validation failure fix (#1292)
1 parent 1f53f03 commit 54117b4

File tree

3 files changed

+13
-4
lines changed

3 files changed

+13
-4
lines changed

runtime/binding-kafka/src/main/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/KafkaCacheClientProduceFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -760,7 +760,7 @@ private void onClientInitialData(
760760
flushClientFanInitialIfNecessary(traceId);
761761
}
762762

763-
if ((flags & FLAGS_INCOMPLETE) != 0x00)
763+
if ((flags & FLAGS_INCOMPLETE) != 0x00 || error == ERROR_INVALID_RECORD)
764764
{
765765
markEntryDirty(traceId, stream.partitionOffset);
766766
}

runtime/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufReadConverterHandler.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,6 @@ private int validate(
188188
try
189189
{
190190
DynamicMessage message = builder.mergeFrom(in).build();
191-
builder.clear();
192191
if (!message.getUnknownFields().asMap().isEmpty())
193192
{
194193
break validate;
@@ -215,6 +214,10 @@ private int validate(
215214
{
216215
event.validationFailure(traceId, bindingId, ex.getMessage());
217216
}
217+
finally
218+
{
219+
builder.clear();
220+
}
218221
}
219222
}
220223
return valLength;

runtime/model-protobuf/src/main/java/io/aklivity/zilla/runtime/model/protobuf/internal/ProtobufWriteConverterHandler.java

+8-2
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,16 @@ private boolean validate(
111111
try
112112
{
113113
DynamicMessage message = builder.mergeFrom(in).build();
114-
builder.clear();
115114
status = message.getUnknownFields().asMap().isEmpty();
116115
}
117116
catch (IOException ex)
118117
{
119118
event.validationFailure(traceId, bindingId, ex.getMessage());
120119
}
120+
finally
121+
{
122+
builder.clear();
123+
}
121124
}
122125
}
123126
return status;
@@ -175,7 +178,6 @@ private int serializeJsonRecord(
175178
{
176179
parser.merge(input, builder);
177180
DynamicMessage message = builder.build();
178-
builder.clear();
179181
if (message.isInitialized() && message.getUnknownFields().asMap().isEmpty())
180182
{
181183
out.wrap(out.buffer());
@@ -187,6 +189,10 @@ private int serializeJsonRecord(
187189
{
188190
event.validationFailure(traceId, bindingId, ex.getMessage());
189191
}
192+
finally
193+
{
194+
builder.clear();
195+
}
190196
}
191197
}
192198
return valLength;

0 commit comments

Comments
 (0)