Skip to content

Commit

Permalink
Allow message payload to be NULL.
Browse files Browse the repository at this point in the history
NULL message payloads are used with compaction topics introduced in 0.8.1
which use the NULL payload to indicate the corresponding key should be deleted.

The Kafka protocol guide also notes that the Value (payload) may be NULL.
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets

This commit makes these changes:

- prevents NULL payloads from being copied when RD_KAFKA_MSG_F_COPY is set.
- prevents payload bytes being copied into rkbuf when NULL.
- sets rkm_len to -1 when payload is NULL (just like key).
- Allows NULL payloads to be received in a fetch.
  • Loading branch information
secretmike committed Mar 16, 2015
1 parent ff3fb1e commit 248ca27
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 8 deletions.
27 changes: 19 additions & 8 deletions src/rdkafka_broker.c
Original file line number Diff line number Diff line change
Expand Up @@ -2127,6 +2127,10 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,


/* Value(payload) length */
if (rkm->rkm_payload == NULL) {
/* Set length to -1 to indicate a NULL value. */
rkm->rkm_len = -1;
}
msghdr->part4.Value_len = htonl(rkm->rkm_len);
msghdr->part3.Crc =
crc32(msghdr->part3.Crc,
Expand All @@ -2138,12 +2142,15 @@ static int rd_kafka_broker_produce_toppar (rd_kafka_broker_t *rkb,
rd_kafka_buf_push(rkbuf, &msghdr->part4, sizeof(msghdr->part4));


/* Payload */
msghdr->part3.Crc =
crc32(msghdr->part3.Crc,
rkm->rkm_payload,
rkm->rkm_len);
rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len);

/* Add Payload if not NULL */
if (rkm->rkm_payload != NULL) {
msghdr->part3.Crc =
crc32(msghdr->part3.Crc,
rkm->rkm_payload,
rkm->rkm_len);
rd_kafka_buf_push(rkbuf, rkm->rkm_payload, rkm->rkm_len);
}


/* Finalize Crc */
Expand Down Expand Up @@ -2801,14 +2808,18 @@ static rd_kafka_resp_err_t rd_kafka_messageset_handle (rd_kafka_broker_t *rkb,
/* Create op and push on temporary queue. */
rko = rd_kafka_op_new(RD_KAFKA_OP_FETCH);

rko->rko_rkmessage.key = NULL;
if (!RD_KAFKAP_BYTES_IS_NULL(Key)) {
rko->rko_rkmessage.key = Key->data;
rko->rko_rkmessage.key_len =
RD_KAFKAP_BYTES_LEN(Key);
}

rko->rko_rkmessage.payload = Value->data;
rko->rko_rkmessage.len = Value_len;
rko->rko_rkmessage.payload = NULL;
if (!RD_KAFKAP_BYTES_IS_NULL(Value)) {
rko->rko_rkmessage.payload = Value->data;
rko->rko_rkmessage.len = RD_KAFKAP_BYTES_LEN(Value);
}

rko->rko_rkmessage.offset = hdr->Offset;
rko->rko_rkmessage.rkt = rktp->rktp_rkt;
Expand Down
7 changes: 7 additions & 0 deletions src/rdkafka_msg.c
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ static rd_kafka_msg_t *rd_kafka_msg_new0 (rd_kafka_topic_t *rkt,
rkt->rkt_conf.message_timeout_ms * 1000;
}

/* Check for a NULL payload */
if (payload == NULL) {
rkm->rkm_payload = NULL;
rkm->rkm_len = 0;
return rkm;
}

if (msgflags & RD_KAFKA_MSG_F_COPY) {
/* Copy payload to space following the ..msg_t */
rkm->rkm_payload = (void *)(rkm+1);
Expand Down

0 comments on commit 248ca27

Please sign in to comment.