Introduction
Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerant, and scalable data streaming and processing. It's based on a producer-consumer architecture in which producers send data to Kafka topics and consumers retrieve messages from those topics. Consumer groups allow a set of consumers to share the work of processing a topic. Kafka assigns each partition to exactly one consumer in the group, which ensures that each message gets processed by only one consumer in that group.
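As a rough illustration of how a consumer group divides work, the sketch below spreads partition IDs across group members in round-robin order. The function name assignPartitions and the member names are hypothetical, and Kafka's real assignors (for example range or cooperative-sticky) are more involved. The sketch only shows the invariant that each partition has exactly one owner.

```go
package main

import "fmt"

// assignPartitions spreads partition IDs 0..partitions-1 across the group
// members in round-robin order, so every partition has exactly one owner.
// This is a simplified stand-in for Kafka's real assignment strategies.
func assignPartitions(partitions int, members []string) map[string][]int32 {
	assignment := make(map[string][]int32, len(members))
	if len(members) == 0 {
		return assignment
	}
	for p := 0; p < partitions; p++ {
		owner := members[p%len(members)]
		assignment[owner] = append(assignment[owner], int32(p))
	}
	return assignment
}

func main() {
	// Hypothetical member names, used only for illustration.
	members := []string{"consumer-a", "consumer-b"}
	assignment := assignPartitions(5, members)
	for _, m := range members {
		fmt.Println(m, assignment[m])
	}
}
```

With more members than partitions, the extra members receive no partitions in this sketch, which is why adding consumers beyond the partition count does not increase throughput.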
Kubernetes Event-Driven Autoscaling (KEDA) is a cloud-native event-driven auto-scaler (CNCF project) for container workloads. In KEDA, scalers are extensible components that monitor external systems and produce metrics to drive the scaling process for Kubernetes workloads.
This guide explains how to use the KEDA scaler for Kafka on an RCS Kubernetes Engine (VKE) cluster to drive auto-scaling, so that Kafka consumer application pods scale up and down based on the consumer group lag.
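Consumer group lag is the number of records that have been written to a topic but not yet committed by the group. A minimal sketch of the arithmetic, assuming every partition already has a committed offset (the type and function names here are hypothetical and not part of KEDA or any Kafka client):

```go
package main

import "fmt"

// partitionOffsets holds the two offsets needed to compute lag for one partition.
type partitionOffsets struct {
	LogEnd    int64 // offset that the next produced record will receive
	Committed int64 // next offset the consumer group will read
}

// groupLag sums the per-partition lag (log end offset minus committed offset).
// Negative differences are treated as zero lag.
func groupLag(offsets map[int32]partitionOffsets) int64 {
	var total int64
	for _, o := range offsets {
		if lag := o.LogEnd - o.Committed; lag > 0 {
			total += lag
		}
	}
	return total
}

func main() {
	// Illustrative offsets only, not taken from a real cluster.
	offsets := map[int32]partitionOffsets{
		0: {LogEnd: 100, Committed: 90},
		1: {LogEnd: 50, Committed: 50},
	}
	fmt.Println("total lag:", groupLag(offsets))
}
```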
Prerequisites
Before you begin:
Deploy a fresh RCS Kubernetes Engine (VKE) cluster with at least 2 nodes
Deploy an Ubuntu development server to use for connecting to the cluster
Using SSH, access the server as a non-root sudo user
Install Kubectl to access the VKE cluster
Install Docker
Install the latest stable version of the Go programming language
$ sudo apt install golang
Install the KEDA Operator
Create a directory
$ mkdir kafka-keda-RCS
Switch to the directory
$ cd kafka-keda-RCS
Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster
$ export KUBECONFIG=/path/to/vke/YAML
The above command allows Kubectl to use your VKE YAML file as the default cluster file instead of localhost
Deploy KEDA using its deployment YAML file
$ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.11.2/keda-2.11.2-core.yaml
Verify the KEDA deployment status
$ kubectl get deployment -n keda
Your output should look like the one below:
NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
keda-metrics-apiserver   1/1     1            1           57s
keda-operator            1/1     1            1           57s
Verify that each KEDA deployment shows 1/1 in the READY column
Set Up a Single Node Kafka cluster using the Strimzi Operator
Install the Strimzi operator
$ kubectl create -f 'https://strimzi.io/install/latest?namespace=default'
Using a text editor such as Nano, create a new file kafka-cluster.yaml
$ nano kafka-cluster.yaml
Add the following contents to the file
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.4.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Save and close the file
Deploy the Kafka cluster
$ kubectl apply -f kafka-cluster.yaml
The cluster creation takes a few minutes. Run the following command, which blocks until the cluster is ready or the 600-second timeout expires
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s
When complete, the following output should display:
kafka.kafka.strimzi.io/my-cluster condition met
When the cluster creation is complete, create a topic
$ kubectl run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1
Your output should look like the one below:
Created topic test-topic.
pod "kafka-topics" deleted
Prepare the Kafka Consumer Application
Create a new Go module
$ go mod init kafka-consumer
Create a new file main.go
$ nano main.go
Add the following contents to the file
package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

const consumerGroupName = "my-group"

var kafkaBroker string
var topic string
var client *kgo.Client

// init reads the broker and topic from the environment and creates the Kafka client.
func init() {
	kafkaBroker = os.Getenv("KAFKA_BROKER")
	if kafkaBroker == "" {
		log.Fatal("missing env var KAFKA_BROKER")
	}

	topic = os.Getenv("KAFKA_TOPIC")
	if topic == "" {
		log.Fatal("missing env var KAFKA_TOPIC")
	}

	fmt.Println("KAFKA_BROKER", kafkaBroker)
	fmt.Println("KAFKA_TOPIC", topic)

	opts := []kgo.Opt{
		kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
		kgo.ConsumeTopics(topic),
		kgo.ConsumerGroup(consumerGroupName),
		kgo.OnPartitionsAssigned(partitionsAssigned),
		kgo.OnPartitionsRevoked(partitionsRevoked),
		kgo.OnPartitionsLost(partitionsLost),
	}

	var err error
	client, err = kgo.NewClient(opts...)
	if err != nil {
		log.Fatal(err)
	}
}

func main() {
	// Poll and process records in the background until the client is closed.
	go func() {
		fmt.Println("starting kafka consumer goroutine")
		for {
			err := client.Ping(context.Background())
			if err != nil {
				log.Fatal("ping failed - ", err)
			}

			fmt.Println("fetching records....")
			fetches := client.PollRecords(context.Background(), 0)
			if fetches.IsClientClosed() {
				fmt.Println("kgo kafka client closed")
				return
			}

			fetches.EachError(func(t string, p int32, err error) {
				fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
			})

			fetches.EachRecord(func(r *kgo.Record) {
				fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))

				// Simulate slow processing so that consumer lag builds up.
				time.Sleep(3 * time.Second)

				err = client.CommitRecords(context.Background(), r)
				if err != nil {
					fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition)
				} else {
					fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
				}
			})
		}
	}()

	// Block until SIGINT or SIGTERM arrives, then close the client and exit.
	end := make(chan os.Signal, 1)
	signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
	<-end

	client.Close()
	fmt.Println("kafka consumer exit")
}

func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
}

func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
}

func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
	fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])
}
Save and close the file.
Below is what each part of the above application does:
The init function:
    Fetches the Kafka broker and topic information from the environment variables
    Exits with an error if these environment variables aren't provided
    Initializes the Kafka client with the broker addresses, the topic to consume, and the consumer group name, and registers callbacks that run when partitions are assigned, revoked, or lost
The main function starts a goroutine to poll and process Kafka records, then waits for a termination signal:
    The goroutine polls for Kafka records and logs any fetch errors
    The goroutine processes each record (simulating some processing with a 3-second sleep), prints the record's details, and commits the record to Kafka
    The main function listens for SIGINT or SIGTERM so that the application can shut down gracefully when you press CTRL + C or when it receives a termination signal
    When the program detects a termination signal, the Kafka client closes, and the program exits
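The environment handling described above can also be written as a function that returns an error instead of exiting, which makes it easier to test. The following is a minimal sketch using only the standard library. The config type and loadConfig function are hypothetical names and do not appear in the application above.

```go
package main

import (
	"errors"
	"fmt"
	"os"
	"strings"
)

// config holds the settings that the consumer reads from its environment.
type config struct {
	Brokers []string
	Topic   string
}

// loadConfig reads KAFKA_BROKER and KAFKA_TOPIC through the supplied lookup
// function and reports a descriptive error when either one is missing.
func loadConfig(getenv func(string) string) (config, error) {
	broker := getenv("KAFKA_BROKER")
	if broker == "" {
		return config{}, errors.New("missing env var KAFKA_BROKER")
	}
	topic := getenv("KAFKA_TOPIC")
	if topic == "" {
		return config{}, errors.New("missing env var KAFKA_TOPIC")
	}
	// KAFKA_BROKER may hold a comma-separated list of broker addresses.
	return config{Brokers: strings.Split(broker, ","), Topic: topic}, nil
}

func main() {
	cfg, err := loadConfig(os.Getenv)
	if err != nil {
		// A real service would likely exit with a non-zero status here.
		fmt.Println("config error:", err)
		return
	}
	fmt.Println("brokers:", cfg.Brokers, "topic:", cfg.Topic)
}
```

Passing the lookup function as a parameter is a design choice that lets a test supply its own values without modifying the process environment.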
Deploy the Application to your VKE Cluster
Create a new Dockerfile that defines how to build the application image
$ nano Dockerfile
Add the following contents to the file
FROM golang:1.19-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
RUN go build -o /kafka-go-app

FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /kafka-go-app /kafka-go-app
USER nonroot:nonroot
ENTRYPOINT ["/kafka-go-app"]
Save and close the file
The above Dockerfile builds the Kafka consumer Go application using a multi-stage build
Fetch the program Go module dependencies
$ go mod tidy
Log in to Docker using your active Docker Hub account
$ sudo docker login
Build the Docker image. Replace example-user with your actual Docker Hub ID
$ sudo docker build -t example-user/myapp .
Push the image to Docker Hub
$ sudo docker push example-user/myapp
Verify that the command is successful and a new myapp repository is available on your Docker Hub profile
Create a new file consumer.yaml
$ nano consumer.yaml
Add the following contents to the file. Replace example-user/myapp with your actual Docker repository
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer-app
  labels:
    app: kafka-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer-app
  template:
    metadata:
      labels:
        app: kafka-consumer-app
    spec:
      containers:
        - name: kafka-consumer-app-container
          image: example-user/myapp
          imagePullPolicy: Always
          env:
            - name: KAFKA_BROKER
              value: my-cluster-kafka-bootstrap:9092
            - name: KAFKA_TOPIC
              value: test-topic
Save and close the file
Deploy the application to your cluster
$ kubectl apply -f consumer.yaml
Verify the application deployment status
$ kubectl get pods -l=app=kafka-consumer-app
Your output should look like the one below:
NAME                                 READY   STATUS    RESTARTS   AGE
kafka-consumer-app-c4b67d694-mptlw   1/1     Running   0          2m12s
Verify that the Pod status changes to Running
View the application logs
$ kubectl logs -f -l=app=kafka-consumer-app
Output:
KAFKA_BROKER my-cluster-kafka-bootstrap:9092
KAFKA_TOPIC test-topic
starting kafka consumer goroutine
fetching records....
partitions ASSIGNED for topic test-topic [0 1 2 3 4]
Enable Autoscaling
Create a new file scaled-object.yaml
$ nano scaled-object.yaml
Add the following contents to the file
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
spec:
  scaleTargetRef:
    name: kafka-consumer-app
  minReplicaCount: 1
  maxReplicaCount: 5
  pollingInterval: 30
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
        consumerGroup: my-group
        topic: test-topic
        lagThreshold: "5"
        offsetResetPolicy: latest
Save and close the file
Deploy the KEDA scaled object
$ kubectl apply -f scaled-object.yaml
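The ScaledObject addresses Kafka by its fully qualified service name, while the consumer Deployment uses the short name my-cluster-kafka-bootstrap. A short name resolves only from inside the same namespace, and the KEDA operator runs in the keda namespace, so it needs the full form. The sketch below shows how that name is composed. The bootstrapAddr helper is hypothetical, and cluster.local is the default cluster domain, which a cluster may override.

```go
package main

import "fmt"

// bootstrapAddr builds the fully qualified in-cluster address of a Service.
// It assumes the default cluster domain "cluster.local".
func bootstrapAddr(service, namespace string, port int) string {
	return fmt.Sprintf("%s.%s.svc.cluster.local:%d", service, namespace, port)
}

func main() {
	// Strimzi exposes the cluster named my-cluster through this bootstrap Service.
	fmt.Println(bootstrapAddr("my-cluster-kafka-bootstrap", "default", 9092))
}
```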
Verify the Consumer Application Autoscaling
Monitor the number of consumer application pods
$ kubectl get pods -l=app=kafka-consumer-app -w
In a new terminal window, export the KUBECONFIG variable to activate Kubectl in the session
$ export KUBECONFIG=/path/to/vke/YAML
Run the following command to send data to the Kafka topic
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-producer-perf-test.sh --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props linger.ms=0 batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092
The above command runs the native Kafka producer performance script, which generates load by sending 500 records of 1000 bytes each to test-topic at a target rate of 200 records per second.
When successful, your output should look like the one below:
producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic} : 15.471
producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic} : 500.000
pod "kafka-producer" deleted
Wait for one minute, and verify the Kafka consumer application Deployment status
$ kubectl get deployment/kafka-consumer-app
Your output should look like the one below:
NAME                 READY   UP-TO-DATE   AVAILABLE   AGE
kafka-consumer-app   5/5     5            5           2m48s
Verify that the deployment has scaled up to five pods
Navigate back to the monitoring terminal, and verify that your output looks like the one below:
kafka-consumer-app-6bc79dd94f-jb867   0/1   ContainerCreating   0   0s
kafka-consumer-app-6bc79dd94f-59bg4   1/1   Running             0   3s
kafka-consumer-app-6bc79dd94f-f5ll4   1/1   Running             0   5s
kafka-consumer-app-6bc79dd94f-jb867   1/1   Running             0   7s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   Pending             0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   Pending             0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   0/1   ContainerCreating   0   0s
kafka-consumer-app-6bc79dd94f-wzrkp   1/1   Running             0   3s
Press CTRL + C to exit the watch session
KEDA Autoscaling Results
Auto-scaling happens due to the KEDA ScaledObject you created earlier. KEDA monitors the test-topic Kafka topic for the lag of the my-group consumer group, and when the number of unconsumed messages exceeds the lagThreshold of 5, KEDA scales out the kafka-consumer-app deployment up to a maximum of 5 replicas. KEDA checks the Kafka metrics every 30 seconds to make these scaling decisions.
KEDA also scales back down to 1 replica when the consumer applications finish processing the messages and the consumer lag decreases.
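The scaling decision described above can be approximated with a short calculation: divide the total lag by the lag threshold, round up, and clamp the result between the minimum and maximum replica counts. This is a simplified sketch, not KEDA's or the Horizontal Pod Autoscaler's actual implementation. It ignores tolerances and stabilization windows, and the desiredReplicas function is a hypothetical name.

```go
package main

import "fmt"

// desiredReplicas approximates the replica count for a given total lag:
// ceil(totalLag / lagThreshold), clamped to [minReplicas, maxReplicas].
func desiredReplicas(totalLag, lagThreshold int64, minReplicas, maxReplicas int32) int32 {
	if lagThreshold <= 0 {
		return minReplicas
	}
	want := (totalLag + lagThreshold - 1) / lagThreshold
	if want < int64(minReplicas) {
		return minReplicas
	}
	if want > int64(maxReplicas) {
		return maxReplicas
	}
	return int32(want)
}

func main() {
	// Values taken from scaled-object.yaml: lagThreshold 5, min 1, max 5.
	for _, lag := range []int64{0, 12, 500} {
		fmt.Printf("lag=%d -> replicas=%d\n", lag, desiredReplicas(lag, 5, 1, 5))
	}
}
```

With the 500 records from the load test, the unclamped result is 100, so the deployment goes straight to the maxReplicaCount of 5. When the lag returns to 0, the result falls back to the minReplicaCount of 1.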
Conclusion
In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on an RCS Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, used a KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.
Next Steps
To implement more solutions on your VKE cluster, visit the following resources: