kubernetes – Kafka logs get reset when all brokers restart at the same time

I have a Kafka cluster running on an integration Kubernetes cluster, with 3 Kafka brokers and 3 ZooKeeper nodes, each component running in a StatefulSet.
The Helm chart I use to deploy the cluster is a custom one, as I needed external access to the cluster through a BigIP F5 and did not find an existing chart that supported this.

The Kafka image is confluentinc/cp-kafka:5.4.0 and the ZooKeeper one is confluentinc/cp-zookeeper:5.4.0.

/var/lib/zookeeper/data and /var/lib/zookeeper/log are mapped to persistent volumes for ZooKeeper, and /var/lib/kafka is mapped the same way for Kafka.
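
To double-check where the broker actually writes its log segments, I look at the rendered configuration inside the pod (the cp-kafka image renders it into /etc/kafka/kafka.properties, as far as I can tell) and at the mounted directory itself:

kubectl exec kafka-controller-1-0 -- grep "log.dirs" /etc/kafka/kafka.properties
kubectl exec kafka-controller-1-0 -- ls -l /var/lib/kafka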

I use hlebalbau/kafka-manager:stable to watch the state of the cluster.

I set up three partitions per topic, and the replication factor is also equal to three.
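
For example, this is how I check a topic's layout from inside a broker pod (my-topic is a placeholder; JMX_PORT has to be unset first, for the same reason as in the liveness probe further down):

kubectl exec kafka-controller-1-0 -- bash -c \
  "unset JMX_PORT && kafka-topics --describe --bootstrap-server localhost:9092 --topic my-topic"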

Recently I realized that if I restart all three Kafka brokers at the same time (with kubectl delete pod), all the topics' contents are lost:

  • the log size drops to zero for each topic
  • the list of topics stays the same
  • the consumer list stays the same, but each consumer's current offset drops to a negative value (if a consumer was at offset 10000 on a 10000-message topic, the topic size drops to zero and the consumer offset to -10000); the commands I use to check this are shown just below
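
I see those numbers in kafka-manager, but the command-line tools show the same thing (my-group and my-topic are placeholders):

# Consumer group positions: CURRENT-OFFSET vs LOG-END-OFFSET and the resulting lag
kubectl exec kafka-controller-1-0 -- bash -c \
  "unset JMX_PORT && kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group my-group"

# End offset of each partition of the topic (drops back to 0 after the simultaneous restart)
kubectl exec kafka-controller-1-0 -- bash -c \
  "unset JMX_PORT && kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic my-topic"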

I never encountered any issue when restarting one Kafka broker at a time and waiting for it to be fully started before restarting the next one; the exact procedure I follow is shown below.
I know that a Kafka cluster is not meant to be stopped or restarted like this, but I did not expect this kind of behaviour.
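
For reference, the one-at-a-time restart that works fine is essentially this (each broker lives in its own single-replica StatefulSet, so the pods are kafka-controller-1-0, kafka-controller-2-0 and kafka-controller-3-0):

for i in 1 2 3; do
  kubectl delete pod "kafka-controller-$i-0"
  sleep 10  # give the StatefulSet controller time to recreate the pod
  kubectl wait --for=condition=ready "pod/kafka-controller-$i-0" --timeout=300s
done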

Is this expected behaviour? Or did I miss something obvious?

As indicated by its name, the wait-zookeeper.sh script used in the init container “just” waits for the ZooKeepers to be started. I am not pasting the real script, but it boils down to something like this (a sketch using bash's built-in /dev/tcp, so no extra tools are needed in the bash:latest init image):
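
#!/usr/local/bin/bash
# Block until every ZooKeeper node accepts TCP connections on its client port.
for host in zookeeper-controller-1-0.zoo1-headless \
            zookeeper-controller-2-0.zoo2-headless \
            zookeeper-controller-3-0.zoo3-headless; do
  until (echo > "/dev/tcp/$host/2181") 2>/dev/null; do
    echo "waiting for $host:2181..."
    sleep 5
  done
done

Here is my YAML template for a Kafka broker: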

---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-controller-1
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-1
  serviceName: kafka1-headless
  template:
    metadata:
      labels:
        app: kafka-1
        cluster: kafka
    spec:
      initContainers:
        - name: init-wait-zookeeper
          image: bash:latest
          command: ("/usr/local/bin/bash","-c","cp /wait-zookeeper-configmap/wait-zookeeper.sh /wait-zookeeper-emptydir/ && chmod 755 /wait-zookeeper-emptydir/wait-zookeeper.sh && /wait-zookeeper-emptydir/wait-zookeeper.sh")
          volumeMounts:
            - name: wait-zookeeper-configmap
              mountPath: /wait-zookeeper-configmap
            - name: wait-zookeeper-emptydir
              mountPath: /wait-zookeeper-emptydir
      affinity:
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
          - weight: 100
            podAffinityTerm:
              labelSelector:
                matchExpressions:
                - key: app
                  operator: In
                  values:
                  - kafka-2
                  - kafka-3
                  {{- if gt .Values.workerNodesNumber 5.0 }}
                  - zookeeper-1
                  - zookeeper-2
                  - zookeeper-3
                  {{- end }}
              topologyKey: "kubernetes.io/hostname"
      containers:
        - name: kafka1
          image: confluentinc/cp-kafka:5.4.0
          resources:
            requests:
              memory: "512Mi"
            limits:
              memory: "{{.Values.jvmMaxHeapSizeGb}}Gi"
          ports:
          - containerPort: 9092
          env:
          - name: HOST_IP
            valueFrom:
              fieldRef:
                fieldPath: status.hostIP
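          # Listen on all interfaces; the advertised listener below points at the node IP
          # and a NodePort so external clients (through the BigIP F5) can reach the broker.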
          - name: KAFKA_LISTENERS
            value: "PLAINTEXT://0.0.0.0:9092"
          - name: KAFKA_ADVERTISED_LISTENERS
            value: "PLAINTEXT://$(HOST_IP):{{ add .Values.startingNodePort 0 }}"
          - name: KAFKA_BROKER_ID
            value: "10"
          - name: KAFKA_ZOOKEEPER_CONNECT
            value: "zookeeper-controller-1-0.zoo1-headless:2181,zookeeper-controller-2-0.zoo2-headless:2181,zookeeper-controller-3-0.zoo3-headless:2181"
          - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
            value: "3"
          - name: KAFKA_DELETE_TOPIC_ENABLE
            value: "true"
          - name: KAFKA_DEFAULT_REPLICATION_FACTOR
            value: "3"
          - name: KAFKA_NUM_PARTITIONS
            value: "{{.Values.defaultPartitionsNumber}}"
          - name: KAFKA_LOG_RETENTION_HOURS
            value: "{{.Values.retentionTimeHours}}"
          - name: KAFKA_OFFSETS_RETENTION_MINUTES
            value: "{{.Values.retentionTimeHours | mul 60 }}"
          - name: JMX_PORT
            value: "{{ add .Values.startingNodePort 3 }}"
          - name: KAFKA_JMX_HOSTNAME
            value: "kafka-service-1"
          - name: KAFKA_HEAP_OPTS
            value: "-Xms512m -Xmx{{.Values.jvmMaxHeapSizeGb}}G"
          livenessProbe:
            exec:
              command:
              - /bin/bash
              - -c
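              # JMX_PORT must be unset first, otherwise the CLI tool tries to bind the broker's JMX port and fails.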
              - "unset JMX_PORT && kafka-broker-api-versions --bootstrap-server=localhost:9092"
            initialDelaySeconds: 60
            periodSeconds: 20
          volumeMounts:
            - name: "kafka-logs"
              mountPath: "/var/lib/kafka"
      volumes:
        - name: "wait-zookeeper-configmap"
          configMap:
            name: "kafka-initializer"
            items:
              - key: "wait-zookeeper.sh"
                path: "wait-zookeeper.sh"
        - name: "wait-zookeeper-emptydir"
          emptyDir: {}
  volumeClaimTemplates:
  - metadata:
      name: kafka-logs
    spec:
      storageClassName: {{.Values.storageClassName}}
      accessModes: ("ReadWriteOnce")
      resources:
        requests:
          storage: {{.Values.storageSizeGb}}Gi
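
The persistent volume claims themselves survive the pod deletions (deleting a pod does not delete a claim created from volumeClaimTemplates); this is what I check after a restart (kafka-logs-kafka-controller-1-0 is the name the StatefulSet generates from the template above):

kubectl get pvc kafka-logs-kafka-controller-1-0
kubectl get pv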