With the deprecation of Message Router, automatic creation of Kafka topics will no longer be supported.
Topic creation is now managed by Strimzi at the Helm chart level.
Applications should identify up front which topics they require, and the access (read/write) they need to those topics.
It is also recommended that all clients communicate with the Kafka cluster using their preferred Kafka client library.


The examples below refer to the CPS project, which uses spring-kafka to communicate with the cluster.

CPS both produces and consumes on the same topic.

For Kafka clients that require a topic, the topic must be defined as a KafkaTopic custom resource in the relevant Helm chart.

{{- if .Values.config.useStrimziKafka }}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
  name: {{ .Values.config.dataUpdatedTopic.name }}
  labels:
    # Associates the topic with the Strimzi cluster created by OOM
    strimzi.io/cluster: {{ include "common.release" . }}-strimzi
spec:
  partitions: {{ .Values.config.dataUpdatedTopic.partitions }}
  config:
    retention.ms: {{ .Values.config.dataUpdatedTopic.retentionMs }}
    segment.bytes: {{ .Values.config.dataUpdatedTopic.segmentBytes }}
{{- end }}


For Kafka clients that require access (read and/or write) to a topic, a KafkaUser custom resource must be defined in the relevant Helm chart.

{{- if .Values.config.useStrimziKafka }}
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: {{ include "common.release" . }}-{{ .Values.global.cpsKafkaUser }}
  labels:
    strimzi.io/cluster: {{ include "common.release" . }}-strimzi
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
    # Allow joining the consumer group
    - resource:
        type: group
        name: {{ .Values.config.dataUpdatedTopic.consumer.groupId }}
      operation: Read
    # Allow reading from and writing to the topic
    - resource:
        type: topic
        name: {{ .Values.config.dataUpdatedTopic.name }}
      operation: Read
    - resource:
        type: topic
        name: {{ .Values.config.dataUpdatedTopic.name }}
      operation: Write
{{- end }}

The following KafkaUser spec values are mandatory:

spec.authentication.type = scram-sha-512

spec.authorization.type = simple


and the relevant values in values.yaml:

global:
  kafkaBootstrap: strimzi-kafka-bootstrap
  cpsKafkaUser: cps-kafka-user

config:
  useStrimziKafka: true
  dataUpdatedTopic:
    name: cps.data-updated-events
    partitions: 10
    retentionMs: 7200000      # 2 hours
    segmentBytes: 1073741824  # 1 GiB
    consumer:
      groupId: cps-temporal-group

Connecting to the Kafka cluster

On OOM, Strimzi creates a Kubernetes secret with the same name as the KafkaUser (in this case, cps-kafka-user).
An example of the format is:

apiVersion: v1
kind: Secret
metadata:
  name: cps-kafka-user
  labels:
    strimzi.io/kind: KafkaUser
    strimzi.io/cluster: my-cluster
type: Opaque
data:
  password: Z2VuZXJhdGVkcGFzc3dvcmQ=
  sasl.jaas.config: b3JnLmFwYWNoZS5rYWZrYS5jb21tb24uc2VjdXJpdHkuc2NyYW0uU2NyYW1Mb2dpbk1vZHVsZSByZXF1aXJlZCB1c2VybmFtZT0ibXktdXNlciIgcGFzc3dvcmQ9ImdlbmVyYXRlZHBhc3N3b3JkIjsK
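The data values in the secret are base64 encoded. One quick way to inspect them, assuming access to the cluster, is to fetch and decode them with kubectl and base64; the decode step can be checked locally against the example values above:

```shell
# In-cluster, the encoded values can be fetched with kubectl, e.g.:
#   kubectl get secret cps-kafka-user -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d
# Decoding the example password from the secret shown above:
echo 'Z2VuZXJhdGVkcGFzc3dvcmQ=' | base64 -d
# -> generatedpassword
```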

To retrieve this secret, we use the _secret template provided by OOM common.
In the case of cps-temporal, for example, we configure an externalSecret on the pod:

cps-temporal:
  enabled: true
  config:
    jaasConfExternalSecret: '{{ include "common.release" . }}-{{ .Values.global.cpsKafkaUser }}'

This is then set up in the secrets section of the component's values.yaml.
In this example we retrieve sasl.jaas.config, which provides the credentials to connect to the cluster in the form org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="generatedpassword";

secrets:
  - uid: cps-kafka-user
    externalSecret: '{{ tpl (default "" .Values.config.jaasConfExternalSecret) . }}'
    type: genericKV
    envs:
      - name: sasl.jaas.config
        value: '{{ .Values.config.someConfig }}'
        policy: generate

We then pass the secret to the pod via an environment variable:

       ...
        env:
          {{- if .Values.config.useStrimziKafka }}
          - name: JAASLOGIN
            {{- include "common.secret.envFromSecretFast" (dict "global" . "uid" "cps-kafka-user" "key" "sasl.jaas.config") | indent 12 }}
          {{- end }}
       ...

In the case of cps-temporal, it then sets this config as part of its application.yaml:

{{- if .Values.config.useStrimziKafka }}
spring.kafka.bootstrap-servers: {{ include "common.release" . }}-{{ .Values.global.kafkaBootstrap }}:9092
spring.kafka.security.protocol: SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism: SCRAM-SHA-512
spring.kafka.properties.sasl.jaas.config: ${JAASLOGIN}
{{- end }}

For your application:

The security.protocol must be SASL_PLAINTEXT.

The sasl.mechanism must be SCRAM-SHA-512.
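Putting these requirements together, a non-Spring client would use standard Kafka client properties along the following lines. The bootstrap address follows the naming shown in values.yaml above; the username and password must come from the generated secret (the literal values here are illustrative only):

```properties
# Illustrative client configuration for connecting to the Strimzi cluster
bootstrap.servers=<release>-strimzi-kafka-bootstrap:9092
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="my-user" password="generatedpassword";
```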
