Skip to content

Commit 526512e

Browse files
authoredOct 18, 2024
Remove entity related fields on top-level message pack map (fluent#24)
1 parent dc025f8 commit 526512e

File tree

1 file changed

+25
-32
lines changed

1 file changed

+25
-32
lines changed
 

‎plugins/out_cloudwatch_logs/cloudwatch_api.c

+25-32
Original file line numberDiff line numberDiff line change
@@ -551,13 +551,14 @@ void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key,
551551
msgpack_object_map root = root_map->via.map;
552552

553553
// Prepare to pack the modified root map (size may be unchanged or reduced)
554-
msgpack_pack_map(pk, root.size-root_filtered_fields); // Assume no top-level key is removed
554+
msgpack_pack_map(pk, root.size-root_filtered_fields);
555555

556556
for (uint32_t i = 0; i < root.size; i++) {
557557
msgpack_object_kv root_kv = root.ptr[i];
558558

559559
// Check if this key matches the nested map key (e.g., "kubernetes")
560-
if (root_kv.key.type == MSGPACK_OBJECT_STR &&
560+
if (filtered_fields > 0 &&
561+
root_kv.key.type == MSGPACK_OBJECT_STR &&
561562
strncmp(root_kv.key.via.str.ptr, nested_map_key, root_kv.key.via.str.size) == 0 &&
562563
root_kv.val.type == MSGPACK_OBJECT_MAP) {
563564

@@ -566,7 +567,7 @@ void remove_unneeded_field(msgpack_object *root_map, const char *nested_map_key,
566567

567568
// Remove the unneeded key from the nested map
568569
remove_key_from_nested_map(&root_kv.val.via.map, pk,filtered_fields);
569-
} else if (root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) {
570+
} else if (root_filtered_fields > 0 && root_kv.key.type == MSGPACK_OBJECT_STR && root_kv.key.via.str.size > AWS_ENTITY_PREFIX_LEN && strncmp(root_kv.key.via.str.ptr, AWS_ENTITY_PREFIX, AWS_ENTITY_PREFIX_LEN) == 0) {
570571
} else {
571572
// Pack other key-value pairs unchanged
572573
msgpack_pack_object(pk, root_kv.key);
@@ -598,35 +599,9 @@ int process_event(struct flb_cloudwatch *ctx, struct cw_flush *buf,
598599
char *tmp_buf_ptr;
599600

600601
tmp_buf_ptr = buf->tmp_buf + buf->tmp_buf_offset;
601-
602-
if (ctx->add_entity && buf->current_stream->entity && buf->current_stream->entity->filter_count > 0) {
603-
// Prepare a buffer to pack the modified map
604-
msgpack_sbuffer sbuf;
605-
msgpack_sbuffer_init(&sbuf);
606-
msgpack_packer pk;
607-
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
608-
remove_unneeded_field(obj, "kubernetes",&pk,buf->current_stream->entity->root_filter_count, buf->current_stream->entity->filter_count);
609-
610-
// Now, unpack the modified data into a new msgpack_object
611-
msgpack_unpacked modified_unpacked;
612-
msgpack_unpacked_init(&modified_unpacked);
613-
msgpack_object modified_obj;
614-
size_t modified_offset = 0;
615-
if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) {
616-
modified_obj = modified_unpacked.data;
617-
}
618-
619-
ret = flb_msgpack_to_json(tmp_buf_ptr,
620-
buf->tmp_buf_size - buf->tmp_buf_offset,
621-
&modified_obj);
622-
623-
msgpack_sbuffer_destroy(&sbuf);
624-
msgpack_unpacked_destroy(&modified_unpacked);
625-
} else {
626-
ret = flb_msgpack_to_json(tmp_buf_ptr,
627-
buf->tmp_buf_size - buf->tmp_buf_offset,
628-
obj);
629-
}
602+
ret = flb_msgpack_to_json(tmp_buf_ptr,
603+
buf->tmp_buf_size - buf->tmp_buf_offset,
604+
obj);
630605
if (ret <= 0) {
631606
/*
632607
* failure to write to buffer,
@@ -1239,6 +1214,24 @@ int process_and_send(struct flb_cloudwatch *ctx, const char *input_plugin,
12391214
}
12401215
if(ctx->kubernete_metadata_enabled && ctx->add_entity) {
12411216
update_or_create_entity(ctx,stream,map);
1217+
// Prepare a buffer to pack the modified map
1218+
if(stream->entity != NULL && (stream->entity->root_filter_count > 0 || stream->entity->filter_count > 0)) {
1219+
msgpack_sbuffer sbuf;
1220+
msgpack_sbuffer_init(&sbuf);
1221+
msgpack_packer pk;
1222+
msgpack_packer_init(&pk, &sbuf, msgpack_sbuffer_write);
1223+
remove_unneeded_field(&map, "kubernetes",&pk,stream->entity->root_filter_count, stream->entity->filter_count);
1224+
1225+
// Now, unpack the modified data into a new msgpack_object
1226+
msgpack_unpacked modified_unpacked;
1227+
msgpack_unpacked_init(&modified_unpacked);
1228+
size_t modified_offset = 0;
1229+
if (msgpack_unpack_next(&modified_unpacked, sbuf.data, sbuf.size, &modified_offset)) {
1230+
map = modified_unpacked.data;
1231+
}
1232+
msgpack_sbuffer_destroy(&sbuf);
1233+
msgpack_unpacked_destroy(&modified_unpacked);
1234+
}
12421235
}
12431236

12441237
if (ctx->log_key) {

0 commit comments

Comments
 (0)