Skip to content

Commit 1f0d47f

Browse files
cawright-rhCameron WrightHa Van
authored
Allow property security-inter-broker-protocol (#85)
* adding the ability to use security-inter-broker-protocol in koperator * updating util.go to remove _ for generated names * adding replace all for external listener port name * fixing other places where externallistener name is used to not have _ * adding an alternative way to identify which port to use for kafka administration and cc connection * taking out comments for pr push * fixing kafka crd * setting omitempty so it will not be required * adding generated crds * adding comments with context for new flag UsedForKafkaAdminCommunication * Use getBrokerReadOnlyConfig function to get properties and update unit test - security_inter_broker_protocol_Set * Update crds to match generated manifest --------- Co-authored-by: Cameron Wright <red82277@adobe.com> Co-authored-by: Ha Van <red83362@adobe.com>
1 parent b1e9544 commit 1f0d47f

File tree

11 files changed

+172
-14
lines changed

11 files changed

+172
-14
lines changed

api/v1beta1/kafkacluster_types.go

+3
Original file line numberDiff line numberDiff line change
@@ -748,6 +748,9 @@ type CommonListenerSpec struct {
748748
// At least one of the listeners should have this flag enabled
749749
// +optional
750750
UsedForInnerBrokerCommunication bool `json:"usedForInnerBrokerCommunication"`
751+
// UsedForKafkaAdminCommunication allows for a different port to be returned when the koperator is checking for the port to use to check if kafka is operating.
752+
// +optional
753+
UsedForKafkaAdminCommunication bool `json:"usedForKafkaAdminCommunication,omitempty"`
751754
}
752755

753756
func (c *CommonListenerSpec) GetServerSSLCertSecretName() string {

charts/kafka-operator/crds/kafkaclusters.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -21677,6 +21677,11 @@ spec:
2167721677
description: At least one of the listeners should have this
2167821678
flag enabled
2167921679
type: boolean
21680+
usedForKafkaAdminCommunication:
21681+
description: UsedForKafkaAdminCommunication allows for a
21682+
different port to be returned when the koperator is checking
21683+
for the port to use to check if kafka is operating.
21684+
type: boolean
2168021685
required:
2168121686
- containerPort
2168221687
- externalStartingPort
@@ -21753,6 +21758,11 @@ spec:
2175321758
description: At least one of the listeners should have this
2175421759
flag enabled
2175521760
type: boolean
21761+
usedForKafkaAdminCommunication:
21762+
description: UsedForKafkaAdminCommunication allows for a
21763+
different port to be returned when the koperator is checking
21764+
for the port to use to check if kafka is operating.
21765+
type: boolean
2175621766
required:
2175721767
- containerPort
2175821768
- name

config/base/crds/kafka.banzaicloud.io_kafkaclusters.yaml

+10
Original file line numberDiff line numberDiff line change
@@ -21677,6 +21677,11 @@ spec:
2167721677
description: At least one of the listeners should have this
2167821678
flag enabled
2167921679
type: boolean
21680+
usedForKafkaAdminCommunication:
21681+
description: UsedForKafkaAdminCommunication allows for a
21682+
different port to be returned when the koperator is checking
21683+
for the port to use to check if kafka is operating.
21684+
type: boolean
2168021685
required:
2168121686
- containerPort
2168221687
- externalStartingPort
@@ -21753,6 +21758,11 @@ spec:
2175321758
description: At least one of the listeners should have this
2175421759
flag enabled
2175521760
type: boolean
21761+
usedForKafkaAdminCommunication:
21762+
description: UsedForKafkaAdminCommunication allows for a
21763+
different port to be returned when the koperator is checking
21764+
for the port to use to check if kafka is operating.
21765+
type: boolean
2175621766
required:
2175721767
- containerPort
2175821768
- name

pkg/resources/kafka/configmap.go

+16-7
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
4040
config := properties.NewProperties()
4141

4242
// Add listener configuration
43-
listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec.ListenersConfig, serverPasses, log)
43+
listenerConf := generateListenerSpecificConfig(&r.KafkaCluster.Spec, serverPasses, log)
4444
config.Merge(listenerConf)
4545

4646
// Add listener configuration
@@ -87,9 +87,13 @@ func (r *Reconciler) getConfigProperties(bConfig *v1beta1.BrokerConfig, id int32
8787
}
8888
}
8989

90-
// Add Cruise Control Metrics Reporter configuration
91-
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
92-
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
90+
// Add Cruise Control Metrics Reporter configuration.
91+
// When "security.inter.broker.protocol" (e.g. inter broker communication is secure) is configured, the operator disables the reporter.
92+
_, isSecurityInterBrokerProtocolConfigured := getBrokerReadOnlyConfig(id, r.KafkaCluster, log).Get(kafkautils.KafkaConfigSecurityInterBrokerProtocol)
93+
if !isSecurityInterBrokerProtocolConfigured {
94+
if err := config.Set(kafkautils.CruiseControlConfigMetricsReporters, "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"); err != nil {
95+
log.Error(err, fmt.Sprintf("setting '%s' in broker configuration resulted an error", kafkautils.CruiseControlConfigMetricsReporters))
96+
}
9397
}
9498
bootstrapServers, err := kafkautils.GetBootstrapServersService(r.KafkaCluster)
9599
if err != nil {
@@ -244,12 +248,14 @@ func generateControlPlaneListener(iListeners []v1beta1.InternalListenerConfig) s
244248
return controlPlaneListener
245249
}
246250

247-
func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map[string]string, log logr.Logger) *properties.Properties {
251+
func generateListenerSpecificConfig(kcs *v1beta1.KafkaClusterSpec, serverPasses map[string]string, log logr.Logger) *properties.Properties {
248252
var (
249253
interBrokerListenerName string
250254
securityProtocolMapConfig []string
251255
listenerConfig []string
252256
)
257+
l := kcs.ListenersConfig
258+
r := kcs.ReadOnlyConfig
253259

254260
config := properties.NewProperties()
255261

@@ -292,9 +298,12 @@ func generateListenerSpecificConfig(l *v1beta1.ListenersConfig, serverPasses map
292298
if err := config.Set(kafkautils.KafkaConfigListenerSecurityProtocolMap, securityProtocolMapConfig); err != nil {
293299
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListenerSecurityProtocolMap))
294300
}
295-
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
296-
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
301+
if !strings.Contains(r, kafkautils.KafkaConfigSecurityInterBrokerProtocol+"=") {
302+
if err := config.Set(kafkautils.KafkaConfigInterBrokerListenerName, interBrokerListenerName); err != nil {
303+
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigInterBrokerListenerName))
304+
}
297305
}
306+
298307
if err := config.Set(kafkautils.KafkaConfigListeners, listenerConfig); err != nil {
299308
log.Error(err, fmt.Sprintf("setting '%s' parameter in broker configuration resulted an error", kafkautils.KafkaConfigListeners))
300309
}

pkg/resources/kafka/configmap_test.go

+20
Original file line numberDiff line numberDiff line change
@@ -605,6 +605,26 @@ metric.reporters=com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlM
605605
super.users=User:CN=kafka-headless.kafka.svc.cluster.local
606606
zookeeper.connect=example.zk:2181/`,
607607
},
608+
{
609+
testName: "security_inter_broker_protocol_Set",
610+
readOnlyConfig: `security.inter.broker.protocol=SASL_SSL`,
611+
zkAddresses: []string{"example.zk:2181"},
612+
zkPath: ``,
613+
kubernetesClusterDomain: ``,
614+
clusterWideConfig: ``,
615+
perBrokerConfig: ``,
616+
perBrokerReadOnlyConfig: ``,
617+
advertisedListenerAddress: `kafka-0.kafka.svc.cluster.local:9092`,
618+
listenerType: "plaintext",
619+
expectedConfig: `advertised.listeners=INTERNAL://kafka-0.kafka.svc.cluster.local:9092
620+
broker.id=0
621+
cruise.control.metrics.reporter.bootstrap.servers=kafka-all-broker.kafka.svc.cluster.local:9092
622+
cruise.control.metrics.reporter.kubernetes.mode=true
623+
listener.security.protocol.map=INTERNAL:PLAINTEXT
624+
listeners=INTERNAL://:9092
625+
zookeeper.connect=example.zk:2181/
626+
security.inter.broker.protocol=SASL_SSL`,
627+
},
608628
}
609629

610630
t.Parallel()

pkg/resources/kafka/kafka.go

+8-2
Original file line numberDiff line numberDiff line change
@@ -1513,20 +1513,26 @@ func getServiceFromExternalListener(client client.Client, cluster *v1beta1.Kafka
15131513
case istioingressutils.IngressControllerName:
15141514
if ingressConfigName == util.IngressConfigGlobalName {
15151515
iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplate, eListenerName, cluster.GetName())
1516+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15161517
} else {
15171518
iControllerServiceName = fmt.Sprintf(istioingressutils.MeshGatewayNameTemplateWithScope, eListenerName, ingressConfigName, cluster.GetName())
1519+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15181520
}
15191521
case envoyutils.IngressControllerName:
15201522
if ingressConfigName == util.IngressConfigGlobalName {
15211523
iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceName, eListenerName, cluster.GetName())
1524+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15221525
} else {
15231526
iControllerServiceName = fmt.Sprintf(envoyutils.EnvoyServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName())
1527+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15241528
}
15251529
case contourutils.IngressControllerName:
15261530
if ingressConfigName == util.IngressConfigGlobalName {
15271531
iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceName, eListenerName, cluster.GetName())
1532+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15281533
} else {
15291534
iControllerServiceName = fmt.Sprintf(contourutils.ContourServiceNameWithScope, eListenerName, ingressConfigName, cluster.GetName())
1535+
iControllerServiceName = strings.ReplaceAll(iControllerServiceName, "_", "-")
15301536
}
15311537
}
15321538

@@ -1602,7 +1608,7 @@ func generateServicePortForIListeners(listeners []v1beta1.InternalListenerConfig
16021608
var usedPorts []corev1.ServicePort
16031609
for _, iListener := range listeners {
16041610
usedPorts = append(usedPorts, corev1.ServicePort{
1605-
Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", ""),
1611+
Name: strings.ReplaceAll(iListener.GetListenerServiceName(), "_", "-"),
16061612
Port: iListener.ContainerPort,
16071613
TargetPort: intstr.FromInt(int(iListener.ContainerPort)),
16081614
Protocol: corev1.ProtocolTCP,
@@ -1615,7 +1621,7 @@ func generateServicePortForEListeners(listeners []v1beta1.ExternalListenerConfig
16151621
var usedPorts []corev1.ServicePort
16161622
for _, eListener := range listeners {
16171623
usedPorts = append(usedPorts, corev1.ServicePort{
1618-
Name: eListener.GetListenerServiceName(),
1624+
Name: strings.ReplaceAll(eListener.GetListenerServiceName(), "_", "-"),
16191625
Protocol: corev1.ProtocolTCP,
16201626
Port: eListener.ContainerPort,
16211627
TargetPort: intstr.FromInt(int(eListener.ContainerPort)),

pkg/util/client/common.go

+7
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,18 @@ func UseSSL(cluster *v1beta1.KafkaCluster) bool {
3737

3838
func getContainerPortForInnerCom(internalListeners []v1beta1.InternalListenerConfig, extListeners []v1beta1.ExternalListenerConfig) int32 {
3939
for _, val := range internalListeners {
40+
if val.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port.
41+
return val.ContainerPort
42+
}
4043
if val.UsedForInnerBrokerCommunication {
4144
return val.ContainerPort
4245
}
4346
}
47+
4448
for _, val := range extListeners {
49+
if val.UsedForKafkaAdminCommunication {
50+
return val.ContainerPort
51+
}
4552
if val.UsedForInnerBrokerCommunication {
4653
return val.ContainerPort
4754
}

pkg/util/kafka/common.go

+8-1
Original file line numberDiff line numberDiff line change
@@ -192,15 +192,22 @@ func GetBootstrapServersService(cluster *v1beta1.KafkaCluster) (string, error) {
192192
// GetBrokerContainerPort return broker container port
193193
func GetBrokerContainerPort(cluster *v1beta1.KafkaCluster) (int32, error) {
194194
containerPort := int32(0)
195-
196195
for _, lc := range cluster.Spec.ListenersConfig.InternalListeners {
196+
if lc.UsedForKafkaAdminCommunication { // Optional override to return a port from a different listener. Needed if b2b communication is on an external listener and and you want the koperator to interact with kafka over a different port.
197+
containerPort = lc.ContainerPort
198+
break
199+
}
197200
if lc.UsedForInnerBrokerCommunication && !lc.UsedForControllerCommunication {
198201
containerPort = lc.ContainerPort
199202
break
200203
}
201204
}
202205

203206
for _, lc := range cluster.Spec.ListenersConfig.ExternalListeners {
207+
if lc.UsedForKafkaAdminCommunication {
208+
containerPort = lc.ContainerPort
209+
break
210+
}
204211
if lc.UsedForInnerBrokerCommunication {
205212
containerPort = lc.ContainerPort
206213
break

pkg/util/kafka/const.go

+1
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ const (
3030
KafkaConfigListenerName = "listener.name"
3131
KafkaConfigListenerSecurityProtocolMap = "listener.security.protocol.map"
3232
KafkaConfigInterBrokerListenerName = "inter.broker.listener.name"
33+
KafkaConfigSecurityInterBrokerProtocol = "security.inter.broker.protocol"
3334
KafkaConfigAdvertisedListeners = "advertised.listeners"
3435
KafkaConfigControlPlaneListener = "control.plane.listener.name"
3536

pkg/util/util.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -236,11 +236,12 @@ func IsIngressConfigInUse(iConfigName, defaultConfigName string, cluster *v1beta
236236

237237
// ConstructEListenerLabelName construct an eListener label name based on ingress config name and listener name
238238
func ConstructEListenerLabelName(ingressConfigName, eListenerName string) string {
239+
externalListenerName := strings.ReplaceAll(eListenerName, "_", "-")
239240
if ingressConfigName == IngressConfigGlobalName {
240-
return eListenerName
241+
return externalListenerName
241242
}
242243

243-
return fmt.Sprintf(ExternalListenerLabelNameTemplate, eListenerName, ingressConfigName)
244+
return fmt.Sprintf(ExternalListenerLabelNameTemplate, externalListenerName, ingressConfigName)
244245
}
245246

246247
// ShouldIncludeBroker returns true if the broker should be included as a resource on external listener resources
@@ -437,10 +438,11 @@ func ConvertConfigEntryListToProperties(config []sarama.ConfigEntry) (*propertie
437438
func GenerateEnvoyResourceName(resourceNameFormat string, resourceNameWithScopeFormat string, extListener v1beta1.ExternalListenerConfig, ingressConfig v1beta1.IngressConfig,
438439
ingressConfigName, clusterName string) string {
439440
var resourceName string
441+
externalListenerName := strings.ReplaceAll(extListener.Name, "_", "-")
440442
if ingressConfigName == IngressConfigGlobalName {
441-
resourceName = fmt.Sprintf(resourceNameFormat, extListener.Name, clusterName)
443+
resourceName = fmt.Sprintf(resourceNameFormat, externalListenerName, clusterName)
442444
} else {
443-
resourceName = fmt.Sprintf(resourceNameWithScopeFormat, extListener.Name, ingressConfigName, clusterName)
445+
resourceName = fmt.Sprintf(resourceNameWithScopeFormat, externalListenerName, ingressConfigName, clusterName)
444446
}
445447

446448
return resourceName

pkg/util/util_test.go

+83
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,89 @@ cruise.control.metrics.reporter.kubernetes.mode=true`,
691691
}
692692
}
693693

694+
func TestConstructEListenerLabelName(t *testing.T) {
695+
tests := []struct {
696+
ingressConfigName string
697+
eListenerName string
698+
expected string
699+
}{
700+
{"globalConfig", "example_listener_name", "example-listener-name"},
701+
{"globalConfig", "no_underscores", "no-underscores"},
702+
{"globalConfig", "multiple___underscores", "multiple---underscores"},
703+
{"globalConfig", "noUnderscoresHere", "noUnderscoresHere"},
704+
{"nonGlobalConfig", "example_listener_name", "example-listener-name-nonGlobalConfig"},
705+
}
706+
707+
for _, test := range tests {
708+
result := ConstructEListenerLabelName(test.ingressConfigName, test.eListenerName)
709+
if result != test.expected {
710+
t.Errorf("ConstructEListenerLabelName(%q, %q) = %q; want %q", test.ingressConfigName, test.eListenerName, result, test.expected)
711+
}
712+
}
713+
}
714+
715+
func TestGenerateEnvoyResourceName(t *testing.T) {
716+
testCases := []struct {
717+
resourceNameFormat string
718+
resourceNameWithScopeFormat string
719+
extListener v1beta1.ExternalListenerConfig
720+
ingressConfig v1beta1.IngressConfig
721+
ingressConfigName, clusterName, expected string
722+
}{
723+
{
724+
resourceNameFormat: "%s-%s",
725+
resourceNameWithScopeFormat: "%s-%s-%s",
726+
extListener: v1beta1.ExternalListenerConfig{
727+
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"},
728+
},
729+
ingressConfig: v1beta1.IngressConfig{},
730+
ingressConfigName: "globalConfig",
731+
clusterName: "clusterName",
732+
expected: "noUnderscores-clusterName",
733+
},
734+
{
735+
resourceNameFormat: "%s-%s",
736+
resourceNameWithScopeFormat: "%s-%s-%s",
737+
extListener: v1beta1.ExternalListenerConfig{
738+
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"},
739+
},
740+
ingressConfig: v1beta1.IngressConfig{},
741+
ingressConfigName: "globalConfig",
742+
clusterName: "clusterName",
743+
expected: "under-scores-clusterName",
744+
},
745+
{
746+
resourceNameFormat: "%s-%s",
747+
resourceNameWithScopeFormat: "%s-%s-%s",
748+
extListener: v1beta1.ExternalListenerConfig{
749+
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "noUnderscores"},
750+
},
751+
ingressConfig: v1beta1.IngressConfig{},
752+
ingressConfigName: "nonGlobalConfig",
753+
clusterName: "clusterName",
754+
expected: "noUnderscores-nonGlobalConfig-clusterName",
755+
},
756+
{
757+
resourceNameFormat: "%s-%s",
758+
resourceNameWithScopeFormat: "%s-%s-%s",
759+
extListener: v1beta1.ExternalListenerConfig{
760+
CommonListenerSpec: v1beta1.CommonListenerSpec{Name: "under_scores"},
761+
},
762+
ingressConfig: v1beta1.IngressConfig{},
763+
ingressConfigName: "nonGlobalConfig",
764+
clusterName: "clusterName",
765+
expected: "under-scores-nonGlobalConfig-clusterName",
766+
},
767+
}
768+
for _, test := range testCases {
769+
hash := GenerateEnvoyResourceName(test.resourceNameFormat, test.resourceNameWithScopeFormat, test.extListener, test.ingressConfig,
770+
test.ingressConfigName, test.clusterName)
771+
if hash != test.expected {
772+
t.Errorf("Expected: %s Got: %s", test.expected, hash)
773+
}
774+
}
775+
}
776+
694777
func TestGetMD5Hash(t *testing.T) {
695778
testCases := []struct {
696779
testName string

0 commit comments

Comments
 (0)