Introduction
Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data streaming and processing. It's based on a producer-consumer architecture where producers send data to Kafka topics and consumers retrieve messages from those topics. Consumer groups allow a set of consumers to work together and share data processing. Within a group, each partition is assigned to a single consumer, which ensures that each message gets processed by only one consumer in the group.
Kubernetes Event-Driven Autoscaling (KEDA) is a cloud-native event-driven auto-scaler (CNCF project) for container workloads. In KEDA, scalers are extensible components that monitor external systems and produce metrics to drive the scaling process for Kubernetes workloads.
This guide explains how to use the KEDA scaler for Kafka on an RCS Kubernetes Engine (VKE) cluster to drive auto-scaling, so that Kafka consumer application pods scale up and down based on the consumer group lag.
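Consumer group lag is the metric that drives scaling in this guide. As a minimal sketch of how lag can be understood, the following Go program computes it from per-partition offsets. The `partitionOffsets` type and its field names are illustrative assumptions and are not part of any Kafka client API.

```go
package main

import "fmt"

// partitionOffsets holds the two offsets needed to compute lag for one
// partition. The type and field names are hypothetical, for illustration only.
type partitionOffsets struct {
	LogEnd    int64 // offset of the next record the broker will write
	Committed int64 // next offset the consumer group will read
}

// partitionLag returns the number of records not yet consumed on a partition.
// A negative difference is clamped to zero.
func partitionLag(p partitionOffsets) int64 {
	lag := p.LogEnd - p.Committed
	if lag < 0 {
		return 0
	}
	return lag
}

// totalLag sums the lag over every partition of a topic.
func totalLag(parts []partitionOffsets) int64 {
	var sum int64
	for _, p := range parts {
		sum += partitionLag(p)
	}
	return sum
}

func main() {
	parts := []partitionOffsets{
		{LogEnd: 120, Committed: 100},
		{LogEnd: 80, Committed: 80},
		{LogEnd: 45, Committed: 30},
	}
	fmt.Println("total lag:", totalLag(parts)) // 20 + 0 + 15 = 35
}
```

A growing lag means the consumers are falling behind the producers, which is the signal used later in this guide to add consumer pods.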
Prerequisites
Before you begin:
Deploy a fresh RCS Kubernetes Engine (VKE) cluster with at least 2 nodes
Deploy an Ubuntu development server to use for connecting to the cluster
Using SSH, access the server as a non-root sudo user
Install Kubectl to access the VKE cluster
Install Docker
Install the latest stable version of the Go programming language
$ sudo apt install golang
Install the KEDA Operator
Create a directory
$ mkdir kafka-keda-RCS
Switch to the directory
$ cd kafka-keda-RCS
Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster
$ export KUBECONFIG=/path/to/vke/YAML
The above command allows Kubectl to use your VKE YAML file as the default cluster file instead of localhost
Deploy KEDA using its deployment YAML file
$ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.11.2/keda-2.11.2-core.yaml
Verify the KEDA deployment status
$ kubectl get deployment -n keda
Your output should look like the one below:
NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
keda-metrics-apiserver   1/1     1            1           57s
keda-operator            1/1     1            1           57s
Verify that the KEDA deployment status is READY
Set Up a Single Node Kafka cluster using the Strimzi Operator
Install the Strimzi operator
$ kubectl create -f 'https://strimzi.io/install/latest?namespace=default'
Using a text editor such as Nano, create a new file kafka-cluster.yaml
$ nano kafka-cluster.yaml
Add the following contents to the file
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.4.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Save and close the file
Deploy the Kafka cluster
$ kubectl apply -f kafka-cluster.yaml
The cluster creation takes a few minutes. Run the following command to block until the cluster is ready
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s
When complete, the following output should display:
kafka.kafka.strimzi.io/my-cluster condition met
When the cluster creation is complete, create a topic
$ kubectl run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1
Your output should look like the one below:
Created topic test-topic.
pod "kafka-topics" deleted
Prepare the Kafka Consumer Application
Create a new Go module
$ go mod init kafka-consumer
Create a new file main.go
$ nano main.go
Add the following contents to the file
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

const consumerGroupName = "my-group"

var kafkaBroker string
var topic string
var client *kgo.Client

// init reads the configuration from the environment and creates the Kafka client.
func init() {
	kafkaBroker = os.Getenv("KAFKA_BROKER")
	if kafkaBroker == "" {
		log.Fatal("missing env var KAFKA_BROKER")
	}

	topic = os.Getenv("KAFKA_TOPIC")
	if topic == "" {
		log.Fatal("missing env var KAFKA_TOPIC")
	}

	fmt.Println("KAFKA_BROKER", kafkaBroker)
	fmt.Println("KAFKA_TOPIC", topic)

	opts := []kgo.Opt{
		kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
		kgo.ConsumeTopics(topic),
		kgo.ConsumerGroup(consumerGroupName),
		kgo.OnPartitionsAssigned(partitionsAssigned),
		kgo.OnPartitionsRevoked(partitionsRevoked),
		kgo.OnPartitionsLost(partitionsLost),
	}

	var err error
	client, err = kgo.NewClient(opts...)
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// Poll and process records in the background.
	go func() {
		fmt.Println("starting kafka consumer goroutine")
		for {
			err := client.Ping(context.Background())
			if err != nil {
				log.Fatal("ping failed - ", err)
			}

			fmt.Println("fetching records....")
			fetches := client.PollRecords(context.Background(), 0)
			if fetches.IsClientClosed() {
				fmt.Println("kgo kafka client closed")
				return
			}

			fetches.EachError(func(t string, p int32, err error) {
				fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
			})

			fetches.EachRecord(func(r *kgo.Record) {
				fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))

				// Simulate slow processing so that consumer lag builds up.
				time.Sleep(3 * time.Second)

				if err := client.CommitRecords(context.Background(), r); err != nil {
					fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition, "-", err)
				} else {
					fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
				}
			})
		}
	}()

	// Block until the process receives SIGINT or SIGTERM, then shut down.
	end := make(chan os.Signal, 1)
	signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
	<-end

	client.Close()
	fmt.Println("kafka consumer exit")
}

func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
}

func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
}

func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])
}
Save and close the file.
Below is what each part of the application does:
The init function:
Fetches the Kafka broker and topic information from the environment variables
Exits the program with an error code if these environment variables aren't provided
Initializes the Kafka client with the broker addresses, the topic to consume, the consumer group name, and callbacks that run when a partition gets assigned, revoked, or lost
The main function:
Starts a goroutine that polls for Kafka records and logs any fetch errors
Processes each record (simulates some processing with a 3-second sleep), prints the record's details, and commits the record to Kafka
Listens for SIGINT or SIGTERM to ensure that the application can be gracefully shut down using CTRL + C, or when receiving a termination signal. When the program detects a termination signal, the Kafka client closes, and the program exits
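The environment variable check performed in the init function can also be written as a small reusable helper. The following is a sketch under stated assumptions, not the application's actual code: `requiredEnv` is a hypothetical function that takes the lookup function as a parameter, so in a real program you would pass `os.Getenv`, while this example uses an in-memory map to stay self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// requiredEnv reads each named variable through getenv and returns an error
// that lists every variable that is unset or empty. The helper is hypothetical.
func requiredEnv(getenv func(string) string, names ...string) (map[string]string, error) {
	values := make(map[string]string, len(names))
	var missing []string
	for _, name := range names {
		v := getenv(name)
		if v == "" {
			missing = append(missing, name)
			continue
		}
		values[name] = v
	}
	if len(missing) > 0 {
		return nil, errors.New("missing env vars: " + strings.Join(missing, ", "))
	}
	return values, nil
}

func main() {
	// Stand-in for the process environment; a real program would pass os.Getenv.
	env := map[string]string{
		"KAFKA_BROKER": "my-cluster-kafka-bootstrap:9092",
		"KAFKA_TOPIC":  "test-topic",
	}
	getenv := func(key string) string { return env[key] }

	cfg, err := requiredEnv(getenv, "KAFKA_BROKER", "KAFKA_TOPIC")
	if err != nil {
		fmt.Println(err)
		return
	}
	// KAFKA_BROKER may hold a comma-separated list of seed brokers.
	brokers := strings.Split(cfg["KAFKA_BROKER"], ",")
	fmt.Println("brokers:", brokers, "topic:", cfg["KAFKA_TOPIC"])
}
```

Reporting all missing variables at once, rather than failing on the first one, can shorten the debugging loop when a Deployment manifest is incomplete.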
Deploy the Application to your VKE Cluster
Create a new Dockerfile that defines how to build the application image
$ nano Dockerfile
Add the following contents to the file
FROM golang:1.19-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
RUN go build -o /kafka-go-app

FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /kafka-go-app /kafka-go-app
USER nonroot:nonroot
ENTRYPOINT ["/kafka-go-app"]
Save and close the file
The above Dockerfile builds the Kafka consumer Go application using a multi-stage build
Fetch the program Go module dependencies
$ go mod tidy
Log in to Docker using your active Docker Hub account
$ sudo docker login
Build the Docker image. Replace example-user with your actual Docker Hub ID
$ sudo docker build -t example-user/myapp .
Push the image to Docker Hub
$ sudo docker push example-user/myapp
Verify that the command is successful and a new myapp repository is available on your Docker Hub profile
Create a new file consumer.yaml
$ nano consumer.yaml
Add the following contents to the file. Replace example-user/myapp with your actual Docker repository
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer-app
  labels:
    app: kafka-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer-app
  template:
    metadata:
      labels:
        app: kafka-consumer-app
    spec:
      containers:
        - name: kafka-consumer-app-container
          image: example-user/myapp
          imagePullPolicy: Always
          env:
            - name: KAFKA_BROKER
              value: my-cluster-kafka-bootstrap:9092
            - name: KAFKA_TOPIC
              value: test-topic
Save and close the file
Deploy the application to your cluster
$ kubectl apply -f consumer.yaml
Verify the application deployment status
$ kubectl get pods -l=app=kafka-consumer-app
Your output should look like the one below:
NAME                                 READY   STATUS    RESTARTS   AGE
kafka-consumer-app-c4b67d694-mptlw   1/1     Running   0          2m12s
Verify that the Pod status changes to Running
View the application logs
$ kubectl logs -f -l=app=kafka-consumer-app
Output:
KAFKA_BROKER my-cluster-kafka-bootstrap:9092
KAFKA_TOPIC test-topic
starting kafka consumer goroutine
fetching records....
partitions ASSIGNED for topic test-topic [0 1 2 3 4]
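The log output shows the single consumer pod owning all five partitions. To illustrate how ownership could be redistributed as more consumers join the group, here is a simplified sketch that uses a plain round-robin strategy. This is an assumption made for clarity: the balancer used by a real client may differ, and `assignRoundRobin` is a hypothetical helper, not a franz-go function.

```go
package main

import "fmt"

// assignRoundRobin distributes partitions 0..partitions-1 across the given
// number of consumers. Each partition is owned by exactly one consumer.
// This is a simplified model, not the strategy a real Kafka client must use.
func assignRoundRobin(partitions, consumers int) [][]int32 {
	if consumers <= 0 {
		return nil
	}
	owned := make([][]int32, consumers)
	for p := 0; p < partitions; p++ {
		c := p % consumers
		owned[c] = append(owned[c], int32(p))
	}
	return owned
}

func main() {
	// Compare a single consumer, a partial scale-out, a full scale-out,
	// and one consumer more than there are partitions.
	for _, consumers := range []int{1, 2, 5, 6} {
		fmt.Printf("%d consumer(s): %v\n", consumers, assignRoundRobin(5, consumers))
	}
}
```

With six consumers and five partitions, the sixth consumer receives no partitions and stays idle, which is why scaling a consumer group beyond the topic's partition count does not add throughput.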
Enable Autoscaling
Create a new file scaled-object.yaml
$ nano scaled-object.yaml
Add the following contents to the file
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
spec:
  scaleTargetRef:
    name: kafka-consumer-app
  minReplicaCount: 1
  maxReplicaCount: 5
  pollingInterval: 30
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
        consumerGroup: my-group
        topic: test-topic
        lagThreshold: "5"
        offsetResetPolicy: latest
Save and close the file
Deploy the KEDA scaled object
$ kubectl apply -f scaled-object.yaml
Verify the Consumer Application Autoscaling
Monitor the number of consumer application pods
$ kubectl get pods -l=app=kafka-consumer-app -w
In a new terminal window, export the KUBECONFIG variable to activate Kubectl in the session
$ export KUBECONFIG=/path/to/vke/YAML
Run the following command to send data to the Kafka topic
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-producer-perf-test.sh --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props linger.ms=0 batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092
The above native Kafka producer performance script generates the load that sends 500 records of 1000 bytes each to the test-topic at a rate of 200 records per second.
When successful, your output should look like the one below:
producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic} : 15.471
producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic} : 500.000
pod "kafka-producer" deleted
Wait for one minute, and verify the Kafka consumer application Deployment status
$ kubectl get deployment/kafka-consumer-app
Your output should look like the one below:
NAME                 READY   UP-TO-DATE   AVAILABLE   AGE
kafka-consumer-app   5/5     5            5           2m48s
Verify that the deployment has scaled up to five pods
Navigate back to the monitoring terminal, and verify that your output looks like the one below:
kafka-consumer-app-6bc79dd94f-jb867   0/1   ContainerCreating   0   0s
kafka-consumer-app-6bc79dd94f-59bg4   1/1   Running             0   3s
kafka-consumer-app-6bc79dd94f-f5ll4   1/1   Running             0   5s
kafka-consumer-app-6bc79dd94f-jb867   1/1   Running             0   7s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   Pending             0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   Pending             0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   ContainerCreating   0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   1/1   Running             0   3s
Press CTRL + C to exit the watch session
KEDA Autoscaling Results
Auto-scaling happens due to the KEDA ScaledObject you created earlier. KEDA monitors the test-topic Kafka topic for the lag of the my-group consumer group. The lagThreshold value of 5 is the target lag per replica, so when the unread message count exceeds it, KEDA scales out the kafka-consumer-app deployment up to the maximum limit of 5 replicas. KEDA checks the Kafka metrics every 30 seconds, as set by pollingInterval, to make these scaling decisions.
KEDA also scales down to 1 replica when the consumer applications complete processing the messages and the consumer lag decreases.
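The scaling behavior described above can be approximated with a short calculation. The sketch below is a simplified model and not KEDA's actual implementation: it assumes the replica count is the total lag divided by the lag threshold, rounded up, then limited by the minimum and maximum replica counts and by the number of partitions. The `desiredReplicas` function is hypothetical, and the real decision is made together with the Kubernetes Horizontal Pod Autoscaler, which applies additional rules such as stabilization windows.

```go
package main

import "fmt"

// desiredReplicas approximates the replica count for a lag-based trigger:
// ceil(totalLag / lagThreshold), capped at the partition count and clamped to
// [minReplicas, maxReplicas]. This is a simplified, hypothetical model.
func desiredReplicas(totalLag, lagThreshold int64, minReplicas, maxReplicas, partitions int) int {
	if lagThreshold <= 0 {
		return minReplicas
	}
	n := int((totalLag + lagThreshold - 1) / lagThreshold) // ceiling division
	if partitions > 0 && n > partitions {
		n = partitions // extra consumers beyond the partition count would sit idle
	}
	if n > maxReplicas {
		n = maxReplicas
	}
	if n < minReplicas {
		n = minReplicas
	}
	return n
}

func main() {
	// Values from this guide: lagThreshold 5, 1 to 5 replicas, 5 partitions.
	for _, lag := range []int64{0, 4, 12, 500} {
		fmt.Printf("lag=%d -> replicas=%d\n", lag, desiredReplicas(lag, 5, 1, 5, 5))
	}
}
```

With the values used in this guide, a lag of 500 records maps to the maximum of 5 replicas, and a lag of 0 maps back to the minimum of 1 replica.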
Conclusion
In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on an RCS Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, created a KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.
Next Steps
To implement more solutions on your VKE cluster, visit the following resources: