@@ -46,6 +46,17 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
46
46
return cio_chunk_get_content_size (ic -> chunk );
47
47
}
48
48
49
+ /*
50
+ * When chunk is set to DOWN from memory, data_size is set to 0 and
51
+ * cio_chunk_get_content_size(1) returns the data_size. fs_chunks_size
52
+ * is used to track the size of chunks in filesystem so we need to call
53
+ * cio_chunk_get_real_size to return the original size in the file system
54
+ */
55
+ ssize_t flb_input_chunk_get_real_size (struct flb_input_chunk * ic )
56
+ {
57
+ return cio_chunk_get_real_size (ic -> chunk );
58
+ }
59
+
49
60
int flb_input_chunk_write (void * data , const char * buf , size_t len )
50
61
{
51
62
int ret ;
@@ -137,7 +148,7 @@ int flb_intput_chunk_count_dropped_chunks(struct flb_input_chunk *ic,
137
148
continue ;
138
149
}
139
150
140
- bytes_remained += flb_input_chunk_get_size (old_ic );
151
+ bytes_remained += flb_input_chunk_get_real_size (old_ic );
141
152
count ++ ;
142
153
if (bytes_remained >= chunk_size ) {
143
154
enough_space = FLB_TRUE ;
@@ -217,7 +228,8 @@ int flb_input_chunk_find_space_new_data(struct flb_input_chunk *ic,
217
228
continue ;
218
229
}
219
230
220
- old_ic_bytes = flb_input_chunk_get_size (old_ic );
231
+ old_ic_bytes = flb_input_chunk_get_real_size (old_ic );
232
+
221
233
/* drop chunk by adjusting the routes_mask */
222
234
flb_routes_mask_clear_bit (old_ic -> routes_mask , o_ins -> id );
223
235
o_ins -> fs_chunks_size -= old_ic_bytes ;
@@ -279,7 +291,7 @@ int flb_input_chunk_has_overlimit_routes(struct flb_input_chunk *ic,
279
291
flb_input_chunk_get_name (ic ), chunk_size ,
280
292
o_ins -> total_limit_size - o_ins -> fs_chunks_size ,
281
293
o_ins -> name );
282
-
294
+
283
295
if (o_ins -> fs_chunks_size + chunk_size > o_ins -> total_limit_size ) {
284
296
overlimit = 1 ;
285
297
}
@@ -360,7 +372,7 @@ struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
360
372
flb_input_chunk_get_name (ic ));
361
373
}
362
374
363
- bytes = flb_input_chunk_get_size (ic );
375
+ bytes = flb_input_chunk_get_real_size (ic );
364
376
flb_input_chunk_update_output_instances (ic , bytes );
365
377
366
378
return ic ;
@@ -474,7 +486,7 @@ int flb_input_chunk_destroy(struct flb_input_chunk *ic, int del)
474
486
continue ;
475
487
}
476
488
477
- bytes = flb_input_chunk_get_size (ic );
489
+ bytes = flb_input_chunk_get_real_size (ic );
478
490
if (flb_routes_mask_get_bit (ic -> routes_mask , o_ins -> id ) != 0 ) {
479
491
o_ins -> fs_chunks_size -= bytes ;
480
492
}
@@ -607,7 +619,6 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
607
619
608
620
/* Gather total number of enqueued bytes */
609
621
total = flb_input_chunk_total_size (in );
610
-
611
622
/* Register the total into the context variable */
612
623
in -> mem_chunks_size = total ;
613
624
0 commit comments