Skip to content

Commit 9978dfa

Browse files
authored
Zk kraft migration (#100)
* Allow setting CLUSTER_ID as env var for zk to kraft migration * Add migration properties to control if broker are in kraft mode or zk mode even when kraft is enabled * Refactor functions and additional test cases * Fix linting * Fix unit test * Add zkConnect property when broker is in zk mode * Remove control.plane.listener.name property from broker during migration * Add additional labels to brokers for backward compatibility while performing migration
1 parent 5c5dbe7 commit 9978dfa

9 files changed

+335
-27
lines changed

api/v1beta1/kafkacluster_types.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -1102,13 +1102,18 @@ func (bConfig *BrokerConfig) GetBrokerAnnotations() map[string]string {
11021102

11031103
// GetBrokerLabels returns the labels that are applied to broker pods
11041104
func (bConfig *BrokerConfig) GetBrokerLabels(kafkaClusterName string, brokerId int32, kRaftMode bool) map[string]string {
1105-
kraftLabels := make(map[string]string, 0)
1105+
var kraftLabels map[string]string
11061106
if kRaftMode {
11071107
kraftLabels = map[string]string{
11081108
ProcessRolesKey: strings.Join(bConfig.Roles, "_"),
11091109
IsControllerNodeKey: fmt.Sprintf("%t", bConfig.IsControllerNode()),
11101110
IsBrokerNodeKey: fmt.Sprintf("%t", bConfig.IsBrokerNode()),
11111111
}
1112+
} else { // in ZK mode -> new labels for backward compatibility for the headless service when going from ZK to KRaft
1113+
kraftLabels = map[string]string{
1114+
IsControllerNodeKey: fmt.Sprintf("%t", false),
1115+
IsBrokerNodeKey: fmt.Sprintf("%t", true),
1116+
}
11121117
}
11131118
return util.MergeLabels(
11141119
bConfig.BrokerLabels,

api/v1beta1/kafkacluster_types_test.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -445,10 +445,12 @@ func TestGetBrokerLabels(t *testing.T) {
445445
{
446446
testName: "Labels in zookeeper mode",
447447
expectedLabels: map[string]string{
448-
AppLabelKey: expectedDefaultLabelApp,
449-
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
450-
KafkaCRLabelKey: expectedKafkaCRName,
451-
"test_label_key": "test_label_value",
448+
AppLabelKey: expectedDefaultLabelApp,
449+
BrokerIdLabelKey: strconv.Itoa(expectedBrokerId),
450+
KafkaCRLabelKey: expectedKafkaCRName,
451+
IsBrokerNodeKey: "true",
452+
IsControllerNodeKey: "false",
453+
"test_label_key": "test_label_value",
452454
},
453455
brokerConfig: &BrokerConfig{
454456
Roles: nil,

controllers/tests/kafkacluster_controller_kafka_test.go

+8
Original file line numberDiff line numberDiff line change
@@ -432,6 +432,14 @@ func expectKafkaBrokerPod(ctx context.Context, kafkaCluster *v1beta1.KafkaCluste
432432
Value: "/kafka-logs,/ephemeral-dir1",
433433
},
434434
))
435+
436+
// when CLUSTER_ID is set as an ENV, verify the status is not randomly generated
437+
for _, env := range kafkaCluster.Spec.Envs {
438+
if env.Name == "CLUSTER_ID" {
439+
Expect(kafkaCluster.Status.ClusterID).To(Equal("test-cluster-id"))
440+
break
441+
}
442+
}
435443
} else {
436444
Expect(container.Env).To(ConsistOf(
437445
// the exact value is not interesting

controllers/tests/kafkacluster_controller_test.go

+17
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,23 @@ var _ = Describe("KafkaCluster", func() {
256256
expectCruiseControl(ctx, kafkaClusterKRaft)
257257
})
258258
})
259+
When("configuring Kafka cluster in KRaft mode with CLUSTER_ID env var", func() {
260+
BeforeEach(func() {
261+
loadBalancerServiceName = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaCluster.Name)
262+
externalListenerHostName = "test.host.com"
263+
264+
loadBalancerServiceNameKRaft = fmt.Sprintf("envoy-loadbalancer-test-%s", kafkaClusterKRaft.Name)
265+
externalListenerHostNameKRaft = "test.host.com"
266+
kafkaClusterKRaft.Spec.Envs = append(kafkaClusterKRaft.Spec.Envs, corev1.EnvVar{
267+
Name: "CLUSTER_ID",
268+
Value: "test-cluster-id",
269+
})
270+
})
271+
272+
It("should reconciles objects properly", func(ctx SpecContext) {
273+
expectKafka(ctx, kafkaClusterKRaft, count)
274+
})
275+
})
259276
When("configuring one ingress envoy controller config inside the external listener without bindings", func() {
260277
BeforeEach(func() {
261278
testExternalListener := kafkaCluster.Spec.ListenersConfig.ExternalListeners[0]

pkg/resources/kafka/configmap.go

+53-14
Original file line numberDiff line numberDiff line change
@@ -57,9 +57,11 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, broker v
5757
// Cruise Control metrics reporter configuration
5858
r.configCCMetricsReporter(broker, config, clientPass, log)
5959

60+
brokerReadOnlyConfig := getBrokerReadOnlyConfig(broker, r.KafkaCluster, log)
61+
6062
// Kafka Broker configurations
6163
if r.KafkaCluster.Spec.KRaftMode {
62-
configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log)
64+
configureBrokerKRaftMode(bConfig, broker.Id, r.KafkaCluster, config, quorumVoters, serverPasses, extListenerStatuses, intListenerStatuses, log, brokerReadOnlyConfig)
6365
} else {
6466
configureBrokerZKMode(broker.Id, r.KafkaCluster, config, serverPasses, extListenerStatuses, intListenerStatuses, controllerIntListenerStatuses, log)
6567
}
@@ -148,23 +150,42 @@ func (r *Reconciler) configCCMetricsReporter(broker v1beta1.Broker, config *prop
148150
}
149151

150152
func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
151-
quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
152-
if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil {
153-
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID))
154-
}
153+
quorumVoters []string, serverPasses map[string]string, extListenerStatuses, intListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger,
154+
brokerReadOnlyConfig *properties.Properties) {
155+
controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
155156

156-
if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil {
157-
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles))
158-
}
157+
// when kRaft is enabled for the cluster, brokers can still be configured to use zookeeper for metadata.
158+
// this is to support the zk to kRaft migration where both zookeeper and kRaft controllers are running in parallel.
159+
if shouldUseKRaftModeForBroker(brokerReadOnlyConfig) {
160+
if err := config.Set(kafkautils.KafkaConfigNodeID, brokerID); err != nil {
161+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigNodeID))
162+
}
159163

160-
if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil {
161-
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters))
164+
if err := config.Set(kafkautils.KafkaConfigProcessRoles, bConfig.Roles); err != nil {
165+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigProcessRoles))
166+
}
167+
} else { // use zk mode for broker.
168+
// when in zk mode, "broker.id" and "zookeeper.connect" are configured so it will communicate with zookeeper
169+
// control.plane.listener.name will not be set in zk mode. There for it will default to the interbroker listener.
170+
if err := config.Set(kafkautils.KafkaConfigBrokerID, brokerID); err != nil {
171+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigBrokerID))
172+
}
173+
174+
if err := config.Set(kafkautils.KafkaConfigZooKeeperConnect, zookeeperutils.PrepareConnectionAddress(
175+
kafkaCluster.Spec.ZKAddresses, kafkaCluster.Spec.GetZkPath())); err != nil {
176+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigZooKeeperConnect))
177+
}
162178
}
163179

164-
controllerListenerName := generateControlPlaneListener(kafkaCluster.Spec.ListenersConfig.InternalListeners)
165-
if controllerListenerName != "" {
166-
if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil {
167-
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName))
180+
if shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig) {
181+
if err := config.Set(kafkautils.KafkaConfigControllerQuorumVoters, quorumVoters); err != nil {
182+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerQuorumVoters))
183+
}
184+
185+
if controllerListenerName != "" {
186+
if err := config.Set(kafkautils.KafkaConfigControllerListenerName, controllerListenerName); err != nil {
187+
log.Error(err, fmt.Sprintf(kafkautils.BrokerConfigErrorMsgTemplate, kafkautils.KafkaConfigControllerListenerName))
188+
}
168189
}
169190
}
170191

@@ -214,6 +235,20 @@ func configureBrokerKRaftMode(bConfig *v1beta1.BrokerConfig, brokerID int32, kaf
214235
}
215236
}
216237

238+
// Returns true by default (not in migration configured) OR when MigrationBrokerKRaftMode is set and 'true'.
239+
// this is to support the zk to kRaft migration
240+
func shouldUseKRaftModeForBroker(brokerReadOnlyConfig *properties.Properties) bool {
241+
migrationBrokerKRaftMode, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerKRaftMode)
242+
return !found || migrationBrokerKRaftMode.Value() == "true"
243+
}
244+
245+
// Returns true by default (not in migration) OR when MigrationBrokerControllerQuorumConfigEnabled is set and 'true'.
246+
// this is to support the zk to kRaft migration
247+
func shouldConfigureControllerQuorumForBroker(brokerReadOnlyConfig *properties.Properties) bool {
248+
migrationBrokerControllerQuorumConfigEnabled, found := brokerReadOnlyConfig.Get(kafkautils.MigrationBrokerControllerQuorumConfigEnabled)
249+
return !found || migrationBrokerControllerQuorumConfigEnabled.Value() == "true"
250+
}
251+
217252
func configureBrokerZKMode(brokerID int32, kafkaCluster *v1beta1.KafkaCluster, config *properties.Properties,
218253
serverPasses map[string]string, extListenerStatuses, intListenerStatuses,
219254
controllerIntListenerStatuses map[string]v1beta1.ListenerStatusList, log logr.Logger) {
@@ -547,6 +582,10 @@ func (r Reconciler) generateBrokerConfig(broker v1beta1.Broker, brokerConfig *v1
547582
finalBrokerConfig.Merge(opGenConf)
548583
}
549584

585+
// Remove the migration broker configuration since its only used as flags to derive other configs
586+
finalBrokerConfig.Delete(kafkautils.MigrationBrokerControllerQuorumConfigEnabled)
587+
finalBrokerConfig.Delete(kafkautils.MigrationBrokerKRaftMode)
588+
550589
finalBrokerConfig.Sort()
551590

552591
return finalBrokerConfig.String()

0 commit comments

Comments
 (0)