Skip to content

Commit bbb6295

Browse files
committed
Initial commit
0 parents  commit bbb6295

27 files changed

+1459
-0
lines changed

.DS_Store

6 KB
Binary file not shown.

CR_kafka.pptx

4.43 MB
Binary file not shown.

blogs-producer-deployment.yaml

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: blogs
5+
labels:
6+
app: blogs
7+
spec:
8+
replicas: 10
9+
selector:
10+
matchLabels:
11+
run: kafka-blogs-producer
12+
template:
13+
metadata:
14+
labels:
15+
run: kafka-blogs-producer
16+
spec:
17+
containers:
18+
- name: producer
19+
env:
20+
- name: POD_NAME
21+
valueFrom:
22+
fieldRef:
23+
fieldPath: metadata.name
24+
image: 'strimzi/kafka:0.20.0-kafka-2.6.0'
25+
args:
26+
- bash
27+
- '-c'
28+
- >-
29+
(while true; do echo "$(date) -- $POD_NAME $(uuidgen)"; done)
30+
| bin/kafka-console-producer.sh
31+
--broker-list my-cluster-kafka-bootstrap:9092 --topic blogs

compare_10_messages.sh

+24
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
#!/bin/bash
2+
3+
topic=$1
4+
partition=$2
5+
offset=$3
6+
7+
offset_sup=$((offset+10))
8+
9+
10+
echo "ns:kafka topic:$topic partion:$partition messages:[$offset:$offset_sup]"
11+
echo "==============================================================================================="
12+
oc exec -n kafka kafka-tools -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
13+
--topic $topic \
14+
--partition $partition \
15+
--offset $offset \
16+
--max-messages 10
17+
18+
echo "ns:kafka-mirror topic:my-source-cluster.$topic partion:$partition messages:[$offset:$offset_sup]"
19+
echo "================================================================================================"
20+
oc exec -n kafka-mirror kafka-tools -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
21+
--topic my-source-cluster.$topic \
22+
--partition $partition \
23+
--offset $offset \
24+
--max-messages 10

compare_topic.sh

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
#!/bin/bash
2+
3+
topic=$1
4+
num_partitions=$2
5+
max_messages=$3
6+
offset=$4
7+
offset=${offset:=0}
8+
if [ $offset -eq 0 ];
9+
then
10+
offset="--from-beginning";
11+
else
12+
offset="--offset $offset";
13+
fi
14+
15+
echo "==========================================================="
16+
echo "Comparing topic $topic between kafka and kafka-mirror"
17+
echo "Comparing from $offset a number of $max_messages messages"
18+
echo "On $num_partitions partitions"
19+
echo "==========================================================="
20+
21+
tmp_dir=$(mktemp -d)
22+
23+
for partition in $(seq 0 $num_partitions)
24+
do
25+
cluster=kafka
26+
echo "Checking on $cluster"
27+
partition_file_kafka="$tmp_dir/${cluster}_${partition}"
28+
oc exec -n $cluster kafka-tools -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
29+
--topic $topic \
30+
--partition $partition \
31+
$offset \
32+
--max-messages $max_messages > $partition_file_kafka
33+
md5_kafka=($(md5sum $partition_file_kafka))
34+
35+
36+
cluster=kafka-mirror
37+
echo "Checking on $cluster"
38+
partition_file_kafka_mirror="$tmp_dir/${cluster}_${partition}"
39+
oc exec -n $cluster kafka-tools -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
40+
--topic my-source-cluster.$topic \
41+
--partition $partition \
42+
$offset \
43+
--max-messages $max_messages > $partition_file_kafka_mirror
44+
md5_kafka_mirror=($(md5sum $partition_file_kafka_mirror))
45+
46+
echo "Comparing md5 file between"
47+
echo " $partition_file_kafka"
48+
echo " $partition_file_kafka_mirror"
49+
50+
if [ "$md5_kafka" = "$md5_kafka_mirror" ]; then
51+
echo "topic $topic partition $partition are equals between $offset on $max_messages messages on both clusters"
52+
else
53+
echo "topic $topic partition $partition are **not equal** between $offset on $max_messages messages on both clusters"
54+
exit 1
55+
fi
56+
echo "---------------------------------------------------------------------------------------------------------------------"
57+
done
58+
59+
echo "Topic $topic are completly equals from $offset on $max_messages messages on all the partitions"
60+
echo "cleaning temporary directory $tmp_dir"
61+
rm -rf $tmp_dir

count_message_per_seconds.sh

+39
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
#!/bin/bash
2+
# count the number of messages produced at each second
3+
ns=$1
4+
topic=$2
5+
# create kafka-tools if needed
6+
if ! oc get po -n $ns kafka-tools &>/dev/null
7+
then
8+
echo "installing kafka-tools"
9+
oc run kafka-tools -n $ns --image=strimzi/kafka:0.20.0-kafka-2.6.0 -- tail -f /dev/null
10+
fi
11+
12+
echo "counting the offsets of $topic ..."
13+
14+
first_total=0
15+
second_total=0
16+
17+
offset=0
18+
for item in $(oc exec -n $ns kafka-tools -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list my-cluster-kafka-bootstrap:9092 --topic $topic)
19+
do
20+
echo $item
21+
offset=$(echo $item | cut -d: -f3)
22+
first_total=$((first_total + offset))
23+
done
24+
25+
time_to_recount=5
26+
echo "Waiting $time_to_recount second to recount the offsets of $topic ..."
27+
sleep 5
28+
29+
offset=0
30+
for item in $(oc exec -n $ns kafka-tools -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list my-cluster-kafka-bootstrap:9092 --topic $topic)
31+
do
32+
echo $item
33+
offset=$(echo $item | cut -d: -f3)
34+
second_total=$((second_total + offset))
35+
done
36+
37+
echo "$(((second_total-first_total)/time_to_recount))/s $topic messages"
38+
39+

images/blueprint-kafka-kasten.png

112 KB
Loading

images/blueprint-mirror-maker-bp.png

110 KB
Loading

images/blueprint-mirror-maker.png

84.8 KB
Loading
67.8 KB
Loading
58.6 KB
Loading
56.2 KB
Loading

images/hot-snap.png

129 KB
Loading

items-producer-deployment.yaml

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
apiVersion: apps/v1
2+
kind: Deployment
3+
metadata:
4+
name: items
5+
labels:
6+
app: items
7+
spec:
8+
replicas: 10
9+
selector:
10+
matchLabels:
11+
run: kafka-items-producer
12+
template:
13+
metadata:
14+
labels:
15+
run: kafka-items-producer
16+
spec:
17+
containers:
18+
- name: producer
19+
env:
20+
- name: POD_NAME
21+
valueFrom:
22+
fieldRef:
23+
fieldPath: metadata.name
24+
image: 'strimzi/kafka:0.20.0-kafka-2.6.0'
25+
args:
26+
- bash
27+
- '-c'
28+
- >-
29+
(while true; do echo "$(date) -- $POD_NAME $(uuidgen)"; done)
30+
| bin/kafka-console-producer.sh
31+
--broker-list my-cluster-kafka-bootstrap:9092 --topic items

kafka.yaml

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
apiVersion: kafka.strimzi.io/v1beta2
2+
kind: Kafka
3+
metadata:
4+
name: my-cluster
5+
spec:
6+
kafka:
7+
version: 2.6.0
8+
replicas: 6
9+
listeners:
10+
- name: plain
11+
port: 9092
12+
type: internal
13+
tls: false
14+
- name: tls
15+
port: 9093
16+
type: internal
17+
tls: true
18+
template:
19+
# statefulset:
20+
# metadata:
21+
# annotations:
22+
# kanister.kasten.io/blueprint: scale-down-up-kafka-bp
23+
pod:
24+
restartPolicy: Always
25+
securityContext:
26+
runAsUser: 0
27+
fsGroup: 0
28+
config:
29+
offsets.topic.replication.factor: 1
30+
transaction.state.log.replication.factor: 1
31+
transaction.state.log.min.isr: 1
32+
log.message.format.version: "2.6"
33+
storage:
34+
type: jbod
35+
volumes:
36+
- id: 0
37+
type: persistent-claim
38+
size: 100Gi
39+
deleteClaim: false
40+
zookeeper:
41+
replicas: 3
42+
template:
43+
# statefulset:
44+
# metadata:
45+
# annotations:
46+
# kanister.kasten.io/blueprint: scale-down-up-bp
47+
pod:
48+
restartPolicy: Always
49+
securityContext:
50+
runAsUser: 0
51+
fsGroup: 0
52+
storage:
53+
type: persistent-claim
54+
size: 20Gi
55+
deleteClaim: false
56+
entityOperator:
57+
topicOperator: {}
58+
userOperator: {}

my-mirror-maker-2.yaml

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
apiVersion: kafka.strimzi.io/v1beta2
2+
kind: KafkaMirrorMaker2
3+
metadata:
4+
namespace: kafka-mirror
5+
name: my-mirror-maker
6+
spec:
7+
version: 2.8.0
8+
replicas: 1
9+
connectCluster: "my-target-cluster"
10+
clusters:
11+
- alias: "my-source-cluster"
12+
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc.cluster.local:9092
13+
- alias: "my-target-cluster"
14+
bootstrapServers: my-cluster-kafka-bootstrap.kafka-mirror.svc.cluster.local:9092
15+
config:
16+
# -1 means it will use the default replication factor configured in the broker
17+
config.storage.replication.factor: -1
18+
offset.storage.replication.factor: -1
19+
status.storage.replication.factor: -1
20+
mirrors:
21+
- sourceCluster: "my-source-cluster"
22+
targetCluster: "my-target-cluster"
23+
sourceConnector:
24+
config:
25+
replication.factor: 1
26+
offset-syncs.topic.replication.factor: 1
27+
sync.topic.acls.enabled: "false"
28+
heartbeatConnector:
29+
config:
30+
heartbeats.topic.replication.factor: 1
31+
checkpointConnector:
32+
config:
33+
checkpoints.topic.replication.factor: 1
34+
topicsPattern: "blogs,items"
35+
groupsPattern: ".*"

patch-kafka-with-bp.yaml

+13
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
spec:
2+
kafka:
3+
template:
4+
statefulset:
5+
metadata:
6+
annotations:
7+
kanister.kasten.io/blueprint: scale-down-up-kafka-bp
8+
zookeeper:
9+
template:
10+
statefulset:
11+
metadata:
12+
annotations:
13+
kanister.kasten.io/blueprint: scale-down-up-bp

read_last_10_messages.sh

+32
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
#!/bin/bash
2+
# read the last 10 message of topic
3+
ns=$1
4+
topic=$2
5+
page=$3
6+
page=${page:=0}
7+
if ! oc get po -n $ns kafka-tools &>/dev/null
8+
then
9+
echo "installing kafka-tools"
10+
oc run kafka-tools -n $ns --image=strimzi/kafka:0.20.0-kafka-2.6.0 -- tail -f /dev/null
11+
fi
12+
13+
for item in $(oc exec -n $ns kafka-tools -- bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list my-cluster-kafka-bootstrap:9092 --topic $topic)
14+
do
15+
partition=$(echo $item | cut -d: -f2)
16+
offset=$(echo $item | cut -d: -f3)
17+
offset=$((offset-(page*10)))
18+
offset_minus_10=$((offset-10))
19+
max_messages=10
20+
if [ $offset_minus_10 -le 0 ];
21+
then
22+
offset_minus_10=0;
23+
max_messages=$offset
24+
fi
25+
echo "ns:$ns topic:$topic partion:$partition messages:[$offset_minus_10:$offset]"
26+
echo "=========================================================================="
27+
oc exec -n $ns kafka-tools -- bin/kafka-console-consumer.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 \
28+
--topic $topic \
29+
--partition $partition \
30+
--offset $offset_minus_10 \
31+
--max-messages $max_messages
32+
done

0 commit comments

Comments
 (0)