Skip to content

Commit

Permalink
Added generic deserialization using pack-str format
Browse files Browse the repository at this point in the history
  • Loading branch information
edenhill committed Jul 8, 2019
1 parent 3ba4ebc commit edc5d3e
Show file tree
Hide file tree
Showing 6 changed files with 539 additions and 145 deletions.
30 changes: 24 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ Otherwise follow directions below.

* librdkafka - https://github.com/edenhill/librdkafka
* libyajl (for JSON support, optional)
* libavro and libserdes (for Avro support, optional. See https://github.com/confluentinc/libserdes)
* libavro-c and libserdes (for Avro support, optional. See https://github.com/confluentinc/libserdes)

On Ubuntu or Debian: `sudo apt-get install librdkafka-dev libyajl-dev`

Expand Down Expand Up @@ -115,18 +115,30 @@ Output consumed messages in JSON envelope:

$ kafkacat -b mybroker -t syslog -J

Decode Avro key (`-a key`), value (`-a value`) or both (`-a .`) to JSON:

$ kafkacat -b mybroker -t ledger -a . -s http://schema-registry-url:8080
Decode Avro key (`-s key=avro`), value (`-s value=avro`) or both (`-s avro`) to JSON using schema from the Schema-Registry:

$ kafkacat -b mybroker -t ledger -s avro -r http://schema-registry-url:8080


Decode Avro message value and extract Avro record's "age" field:

$ kafkacat -b mybroker -t ledger -a value -s http://schema-registry-url:8080 | jq .payload.age
$ kafkacat -b mybroker -t ledger -s value=avro -r http://schema-registry-url:8080 | jq .payload.age


Decode key as 32-bit signed integer and value as 16-bit signed integer followed by an unsigned byte followed by string:

$ kafkacat -b mybroker -t mytopic -s key='i$' -s value='hB s'


*Hint: see `./kafkacat -h` for all available deserializer options.*


Output consumed messages according to format string:

$ kafkacat -b mybroker -t syslog -f 'Topic %t[%p], offset: %o, key: %k, payload: %S bytes: %s\n'


Read the last 100 messages from topic 'syslog' with librdkafka configuration parameter 'broker.version.fallback' set to '0.8.2.1' :

$ kafkacat -C -b mybroker -X broker.version.fallback=0.8.2.1 -t syslog -p 0 -o -100 -e
Expand All @@ -136,19 +148,23 @@ Produce a tombstone (a "delete" for compacted topics) for key "abc" by providing

$ echo "abc:" | kafkacat -b mybroker -t mytopic -Z -K:


Produce with headers:

$ echo "hello there" | kafkacat -b mybroker -H "header1=header value" -H "nullheader" -H "emptyheader=" -H "header1=duplicateIsOk"


Print headers in consumer:

$ kafkacat -b mybroker -C -t mytopic -f 'Headers: %h: Message value: %s\n'


Enable the idempotent producer, providing exactly-once and strict-ordering
**producer** guarantees:

$ kafkacat -b mybroker -X enable.idempotence=true -P -t mytopic ....


Metadata listing:

````
Expand All @@ -172,10 +188,12 @@ Metadata for all topics (from broker 1: mybroker:9092/1):
....
````


JSON metadata listing

$ kafkacat -b mybroker -L -J


Pretty-printed JSON metadata listing

$ kafkacat -b mybroker -L -J | jq .
Expand Down Expand Up @@ -209,12 +227,12 @@ Here are two short examples of using kafkacat from Docker. See the [Docker Hub l
EOF
```
* Consume messages:
* Consume messages:
```
docker run --tty --interactive --rm \
confluentinc/cp-kafkacat \
kafkacat -b kafka-broker:9092 \
kafkacat -b kafka-broker:9092 \
-C \
-f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n' \
-t test
Expand Down
11 changes: 7 additions & 4 deletions bootstrap.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,17 @@ pushd tmp-bootstrap > /dev/null

export DEST="$PWD/usr"
export CFLAGS="-I$DEST/include"
export LDFLAGS="-L$DEST/lib -Wl,-rpath-link=$DEST/lib"
if [[ $(uname -s) == Linux ]]; then
export LDFLAGS="-L$DEST/lib -Wl,-rpath-link=$DEST/lib"
else
export LDFLAGS="-L$DEST/lib"
fi
export PKG_CONFIG_PATH="$DEST/lib/pkgconfig"

github_download "edenhill/librdkafka" "$LIBRDKAFKA_VERSION" "librdkafka"
github_download "edenhill/yajl" "edenhill" "libyajl"

build librdkafka "([ -f config.h ] || ./configure --prefix=$DEST) && make && make install" || (echo "Failed to build librdkafka: bootstrap failed" ; false)
build librdkafka "([ -f config.h ] || ./configure --prefix=$DEST --enable-static --install-deps --disable-lz4-ext) && make -j && make install" || (echo "Failed to build librdkafka: bootstrap failed" ; false)

github_download "edenhill/yajl" "edenhill" "libyajl"
build libyajl "([ -d build ] || ./configure --prefix $DEST) && make distro && make install" || (echo "Failed to build libyajl: JSON support will probably be disabled" ; true)

download http://www.digip.org/jansson/releases/jansson-2.12.tar.gz libjansson
Expand Down
Loading

0 comments on commit edc5d3e

Please sign in to comment.