Skip to content

Commit c11c024

Browse files
authored
filter_kubernetes: add workload scraping logics for kubernetes filter (fluent#14)
1 parent 2b400a7 commit c11c024

28 files changed

+926
-4
lines changed

plugins/filter_kubernetes/kube_conf.c

+3
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,9 @@ void flb_kube_conf_destroy(struct flb_kube *ctx)
221221
if (ctx->parser == NULL && ctx->regex) {
222222
flb_regex_destroy(ctx->regex);
223223
}
224+
if (ctx->deploymentRegex) {
225+
flb_regex_destroy(ctx->deploymentRegex);
226+
}
224227

225228
flb_free(ctx->api_host);
226229
flb_free(ctx->token);

plugins/filter_kubernetes/kube_conf.h

+1
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ struct flb_kube {
138138

139139
/* Regex context to parse records */
140140
struct flb_regex *regex;
141+
struct flb_regex *deploymentRegex;
141142
struct flb_parser *parser;
142143

143144
/* TLS CA certificate file */

plugins/filter_kubernetes/kube_meta.c

+109-2
Original file line numberDiff line numberDiff line change
@@ -721,6 +721,102 @@ static void extract_container_hash(struct flb_kube_meta *meta,
721721
}
722722
}
723723

724+
static void cb_results_workload(const char *name, const char *value,
725+
size_t vlen, void *data)
726+
{
727+
if (name == NULL || value == NULL || vlen == 0 || data == NULL) {
728+
return;
729+
}
730+
731+
struct flb_kube_meta *meta = data;
732+
733+
if (meta->workload == NULL && strcmp(name, "deployment") == 0) {
734+
meta->workload = flb_strndup(value, vlen);
735+
meta->workload_len = vlen;
736+
meta->fields++;
737+
}
738+
}
739+
740+
static void search_workload(struct flb_kube_meta *meta,struct flb_kube *ctx,msgpack_object map)
741+
{
742+
int i,j,ownerIndex;
743+
int regex_found;
744+
int replicaset_match;
745+
int podname_match = FLB_FALSE;
746+
msgpack_object k, v;
747+
msgpack_object_map ownerMap;
748+
struct flb_regex_search result;
749+
/* Temporary variable to store the workload value */
750+
msgpack_object workload_val;
751+
752+
for (i = 0; i < map.via.map.size; i++) {
753+
754+
k = map.via.map.ptr[i].key;
755+
v = map.via.map.ptr[i].val;
756+
if (strncmp(k.via.str.ptr, "name", k.via.str.size) == 0) {
757+
758+
if (!strncmp(v.via.str.ptr, meta->podname, v.via.str.size)) {
759+
podname_match = FLB_TRUE;
760+
}
761+
762+
}
763+
/* Example JSON for the below parsing:
764+
* "ownerReferences": [
765+
{
766+
"apiVersion": "apps/v1",
767+
"kind": "ReplicaSet",
768+
"name": "my-replicaset",
769+
"uid": "abcd1234-5678-efgh-ijkl-9876mnopqrst",
770+
"controller": true,
771+
"blockOwnerDeletion": true
772+
}
773+
]*/
774+
if (podname_match && strncmp(k.via.str.ptr, "ownerReferences", k.via.str.size) == 0 && v.type == MSGPACK_OBJECT_ARRAY) {
775+
for (j = 0; j < v.via.array.size; j++) {
776+
if (v.via.array.ptr[j].type == MSGPACK_OBJECT_MAP) {
777+
ownerMap = v.via.array.ptr[j].via.map;
778+
for (ownerIndex = 0; ownerIndex < ownerMap.size; ownerIndex++) {
779+
msgpack_object key = ownerMap.ptr[ownerIndex].key;
780+
msgpack_object val = ownerMap.ptr[ownerIndex].val;
781+
782+
/* Ensure both key and value are strings */
783+
if (key.type == MSGPACK_OBJECT_STR && val.type == MSGPACK_OBJECT_STR) {
784+
if (strncmp(key.via.str.ptr, "kind", key.via.str.size) == 0 && strncmp(val.via.str.ptr, "ReplicaSet", val.via.str.size) == 0) {
785+
replicaset_match = FLB_TRUE;
786+
}
787+
788+
if (strncmp(key.via.str.ptr, "name", key.via.str.size) == 0) {
789+
/* Store the value of 'name' in workload_val so it can be reused by set_workload */
790+
workload_val = val;
791+
if (replicaset_match) {
792+
regex_found = flb_regex_do(ctx->deploymentRegex, val.via.str.ptr, val.via.str.size, &result);
793+
if (regex_found > 0) {
794+
/* Parse regex results */
795+
flb_regex_parse(ctx->deploymentRegex, &result, cb_results_workload, meta);
796+
} else {
797+
/* Set workload if regex does not match */
798+
goto set_workload;
799+
}
800+
} else {
801+
/* Set workload if not a replicaset match */
802+
goto set_workload;
803+
}
804+
}
805+
}
806+
}
807+
}
808+
}
809+
}
810+
}
811+
812+
return;
813+
814+
set_workload:
815+
meta->workload = flb_strndup(workload_val.via.str.ptr, workload_val.via.str.size);
816+
meta->workload_len = workload_val.via.str.size;
817+
meta->fields++;
818+
}
819+
724820
static int search_podname_and_namespace(struct flb_kube_meta *meta,
725821
struct flb_kube *ctx,
726822
msgpack_object map)
@@ -1006,6 +1102,7 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
10061102
k = api_map.via.map.ptr[i].key;
10071103
if (k.via.str.size == 8 && !strncmp(k.via.str.ptr, "metadata", 8)) {
10081104
meta_val = api_map.via.map.ptr[i].val;
1105+
search_workload(meta,ctx,meta_val);
10091106
if (meta_val.type == MSGPACK_OBJECT_MAP) {
10101107
meta_found = FLB_TRUE;
10111108
}
@@ -1123,6 +1220,13 @@ static int merge_meta(struct flb_kube_meta *meta, struct flb_kube *ctx,
11231220
}
11241221
}
11251222

1223+
if (meta->workload != NULL) {
1224+
msgpack_pack_str(&mp_pck, 8);
1225+
msgpack_pack_str_body(&mp_pck, "workload", 8);
1226+
msgpack_pack_str(&mp_pck, meta->workload_len);
1227+
msgpack_pack_str_body(&mp_pck, meta->workload, meta->workload_len);
1228+
}
1229+
11261230
/* Append API Server content */
11271231
if (have_uid >= 0) {
11281232
v = meta_val.via.map.ptr[have_uid].val;
@@ -1693,8 +1797,7 @@ int flb_kube_meta_get(struct flb_kube *ctx,
16931797
return 0;
16941798
}
16951799

1696-
int flb_kube_meta_release(struct flb_kube_meta *meta)
1697-
{
1800+
int flb_kube_meta_release(struct flb_kube_meta *meta) {
16981801
int r = 0;
16991802

17001803
if (meta->namespace) {
@@ -1731,6 +1834,10 @@ int flb_kube_meta_release(struct flb_kube_meta *meta)
17311834
flb_free(meta->cache_key);
17321835
}
17331836

1837+
if (meta->workload) {
1838+
flb_free(meta->workload);
1839+
}
1840+
17341841
if (meta->cluster) {
17351842
flb_free(meta->cluster);
17361843
}

plugins/filter_kubernetes/kube_meta.h

+2
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,15 @@ struct flb_kube_meta {
3535
int docker_id_len;
3636
int container_hash_len;
3737
int container_image_len;
38+
int workload_len;
3839

3940
char *cluster;
4041
char *namespace;
4142
char *podname;
4243
char *container_name;
4344
char *container_image;
4445
char *docker_id;
46+
char *workload;
4547

4648
char *container_hash; /* set only on Systemd mode */
4749

plugins/filter_kubernetes/kube_regex.c

+1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ int flb_kube_regex_init(struct flb_kube *ctx)
3434
ctx->regex = flb_regex_create(KUBE_TAG_TO_REGEX);
3535
}
3636
}
37+
ctx->deploymentRegex = flb_regex_create(DEPLOYMENT_REGEX);
3738

3839
if (!ctx->regex) {
3940
return -1;

plugins/filter_kubernetes/kube_regex.h

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626

2727
#define KUBE_JOURNAL_TO_REGEX "^(?<name_prefix>[^_]+)_(?<container_name>[^\\._]+)(\\.(?<container_hash>[^_]+))?_(?<pod_name>[^_]+)_(?<namespace_name>[^_]+)_[^_]+_[^_]+$"
2828

29+
#define DEPLOYMENT_REGEX "^(?<deployment>.+)-(?<id>[bcdfghjklmnpqrstvwxz2456789]{6,10})$"
30+
2931
int flb_kube_regex_init(struct flb_kube *ctx);
3032

3133
#endif

plugins/out_cloudwatch_logs/cloudwatch_api.c

+12
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,14 @@ static int entity_add_attributes(struct flb_cloudwatch *ctx, struct cw_flush *bu
266266
goto error;
267267
}
268268
}
269+
if(stream->entity->attributes->workload != NULL && strlen(stream->entity->attributes->workload) != 0) {
270+
if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"K8s.Workload\":\"",buf->current_stream->entity->attributes->workload,"\"")) {
271+
goto error;
272+
}
273+
if (!try_to_write(buf->out_buf, offset, buf->out_buf_size,ts,0)) {
274+
goto error;
275+
}
276+
}
269277
if(stream->entity->attributes->instance_id != NULL && strlen(stream->entity->attributes->instance_id) != 0) {
270278
if (!snprintf(ts,ATTRIBUTES_MAX_LEN, ",%s%s%s","\"EC2.InstanceId\":\"",buf->current_stream->entity->attributes->instance_id,"\"")) {
271279
goto error;
@@ -943,6 +951,10 @@ void parse_entity(struct flb_cloudwatch *ctx, entity *entity, msgpack_object map
943951
if(entity->attributes->cluster_name == NULL) {
944952
entity->attributes->cluster_name = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
945953
}
954+
} else if(strncmp(kube_key.via.str.ptr, "workload", kube_key.via.str.size) == 0) {
955+
if(entity->attributes->workload == NULL) {
956+
entity->attributes->workload = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
957+
}
946958
} else if(strncmp(kube_key.via.str.ptr, "name_source", kube_key.via.str.size) == 0) {
947959
if(entity->attributes->name_source == NULL) {
948960
entity->attributes->name_source = flb_strndup(kube_val.via.str.ptr, kube_val.via.str.size);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{"log":"Fluent Bit is logging\n","stream":"stdout","time":"2019-04-01T17:58:33.598656444Z"}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
{
2+
"apiVersion": "v1",
3+
"kind": "Pod",
4+
"metadata": {
5+
"annotations": {
6+
"prometheus.io/path": "/api/v1/metrics/prometheus",
7+
"prometheus.io/port": "2020",
8+
"prometheus.io/scrape": "true"
9+
},
10+
"creationTimestamp": "2019-04-03T09:29:00Z",
11+
"labels": {
12+
"app.kubernetes.io/name": "fluent-bit"
13+
},
14+
"name": "use-kubelet-disabled-daemonset",
15+
"namespace": "options",
16+
"resourceVersion": "74466568",
17+
"selfLink": "/api/v1/namespaces/core/pods/base",
18+
"uid": "e9f2963f-55f2-11e9-84c5-02e422b8a84a",
19+
"ownerReferences": [
20+
{
21+
"apiVersion": "apps/v1",
22+
"kind": "DaemonSet",
23+
"name": "my-daemonset",
24+
"uid": "abcd1234-5678-efgh-ijkl-9876mnopqrst",
25+
"controller": true,
26+
"blockOwnerDeletion": true
27+
}
28+
]
29+
},
30+
"spec": {
31+
"containers": [
32+
{
33+
"image": "fluent/fluent-bit",
34+
"imagePullPolicy": "Always",
35+
"name": "fluent-bit",
36+
"resources": {},
37+
"stdin": true,
38+
"stdinOnce": true,
39+
"terminationMessagePath": "/dev/termination-log",
40+
"terminationMessagePolicy": "File",
41+
"tty": true,
42+
"volumeMounts": [
43+
{
44+
"mountPath": "/var/run/secrets/kubernetes.io/serviceaccount",
45+
"name": "default-token-9ffht",
46+
"readOnly": true
47+
}
48+
]
49+
}
50+
],
51+
"dnsPolicy": "ClusterFirst",
52+
"nodeName": "ip-10-49-18-80.eu-west-1.compute.internal",
53+
"restartPolicy": "Never",
54+
"schedulerName": "default-scheduler",
55+
"securityContext": {},
56+
"serviceAccount": "default",
57+
"serviceAccountName": "default",
58+
"terminationGracePeriodSeconds": 30,
59+
"tolerations": [
60+
{
61+
"effect": "NoExecute",
62+
"key": "node.kubernetes.io/not-ready",
63+
"operator": "Exists",
64+
"tolerationSeconds": 300
65+
},
66+
{
67+
"effect": "NoExecute",
68+
"key": "node.kubernetes.io/unreachable",
69+
"operator": "Exists",
70+
"tolerationSeconds": 300
71+
}
72+
],
73+
"volumes": [
74+
{
75+
"name": "default-token-9ffht",
76+
"secret": {
77+
"defaultMode": 420,
78+
"secretName": "default-token-9ffht"
79+
}
80+
}
81+
]
82+
},
83+
"status": {
84+
"conditions": [
85+
{
86+
"lastProbeTime": null,
87+
"lastTransitionTime": "2019-04-03T09:29:00Z",
88+
"status": "True",
89+
"type": "Initialized"
90+
},
91+
{
92+
"lastProbeTime": null,
93+
"lastTransitionTime": "2019-04-03T09:29:06Z",
94+
"status": "True",
95+
"type": "Ready"
96+
},
97+
{
98+
"lastProbeTime": null,
99+
"lastTransitionTime": "2019-04-03T09:29:00Z",
100+
"status": "True",
101+
"type": "PodScheduled"
102+
}
103+
],
104+
"containerStatuses": [
105+
{
106+
"containerID": "docker://c9898099f6d235126d564ed38a020007ea7a6fac6e25e718de683c9dd0076c16",
107+
"image": "fluent/fluent-bit:latest",
108+
"imageID": "docker-pullable://fluent/fluent-bit@sha256:7ac0fd3569af866e9a6a22eb592744200d2dbe098cf066162453f8d0b06c531f",
109+
"lastState": {},
110+
"name": "fluent-bit",
111+
"ready": true,
112+
"restartCount": 0,
113+
"state": {
114+
"running": {
115+
"startedAt": "2019-04-03T09:29:05Z"
116+
}
117+
}
118+
}
119+
],
120+
"hostIP": "10.49.18.80",
121+
"phase": "Running",
122+
"podIP": "100.116.192.42",
123+
"qosClass": "BestEffort",
124+
"startTime": "2019-04-03T09:29:00Z"
125+
}
126+
}

0 commit comments

Comments
 (0)