Introduction
Kafka is a distributed event streaming platform designed for high throughput, fault tolerance, and scalable data streaming and processing. It's based on a producer-consumer architecture in which producers send data to Kafka topics and consumers retrieve messages from those topics. Consumer groups allow a set of consumers to work together and share the processing load; this ensures that each message is processed by only one consumer in the group.
Kubernetes Event-Driven Autoscaling (KEDA) is a cloud-native event-driven auto-scaler (CNCF project) for container workloads. In KEDA, scalers are extensible components that monitor external systems and produce metrics to drive the scaling process for Kubernetes workloads.
This guide explains how to use the KEDA scaler for Kafka on a Rcs Kubernetes Engine (VKE) cluster to drive auto-scaling and allow Kafka consumer application pods to scale up and down based on the consumer group lag.
Prerequisites
Before you begin:
Deploy a fresh Rcs Kubernetes Engine (VKE) cluster with at least 2 nodes
Deploy an Ubuntu development server to use for connecting to the cluster
Using SSH, access the server as a non-root sudo user
Install Kubectl to access the VKE cluster
Install Docker
Install the Go programming language
$ sudo apt install golang
Install the KEDA Operator
Create a directory
$ mkdir kafka-keda-vultr
Switch to the directory
$ cd kafka-keda-vultr
Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster
$ export KUBECONFIG=/path/to/vke/YAML
The above command points kubectl at your VKE configuration file instead of the default local configuration
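To confirm that kubectl can reach the cluster, list the nodes:
$ kubectl get nodes
The output should list your VKE nodes with a Ready status.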
Deploy KEDA using its deployment YAML file
$ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.11.2/keda-2.11.2-core.yaml
Verify the KEDA deployment status
$ kubectl get deployment -n keda
Your output should look like the one below:
NAME READY UP-TO-DATE AVAILABLE AGE
keda-metrics-apiserver 1/1 1 1 57s
keda-operator 1/1 1 1 57s
Verify that the KEDA deployment status is READY
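Optionally, confirm that all KEDA pods are running:
$ kubectl get pods -n keda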
Set Up a Single Node Kafka cluster using the Strimzi Operator
Install the Strimzi operator
$ kubectl create -f 'https://strimzi.io/install/latest?namespace=default'
Using a text editor such as Nano, create a new file kafka-cluster.yaml
$ nano kafka-cluster.yaml
Add the following contents to the file
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.4.0
    replicas: 1
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      default.replication.factor: 1
      min.insync.replicas: 1
      inter.broker.protocol.version: "3.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
Save and close the file
Deploy the Kafka cluster
$ kubectl apply -f kafka-cluster.yaml
Cluster creation takes a few minutes. Run the following command to wait until the cluster is ready
$ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s
When complete, the following output should display:
kafka.kafka.strimzi.io/my-cluster condition met
When the cluster creation is complete, create a topic named test-topic with 5 partitions
$ kubectl run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1
Your output should look like the one below:
Created topic test-topic.
pod "kafka-topics" deleted
Prepare the Kafka Consumer Application
Create a new Go module
$ go mod init kafka-consumer
Create a new file main.go
$ nano main.go
Add the following contents to the file
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "strings"
    "syscall"
    "time"

    "github.com/twmb/franz-go/pkg/kgo"
)

const consumerGroupName = "my-group"

var kafkaBroker string
var topic string
var client *kgo.Client

// init reads the broker and topic from the environment and creates the
// consumer group client with partition lifecycle callbacks.
func init() {
    kafkaBroker = os.Getenv("KAFKA_BROKER")
    if kafkaBroker == "" {
        log.Fatal("missing env var KAFKA_BROKER")
    }
    topic = os.Getenv("KAFKA_TOPIC")
    if topic == "" {
        log.Fatal("missing env var KAFKA_TOPIC")
    }
    fmt.Println("KAFKA_BROKER", kafkaBroker)
    fmt.Println("KAFKA_TOPIC", topic)
    opts := []kgo.Opt{
        kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
        kgo.ConsumeTopics(topic),
        kgo.ConsumerGroup(consumerGroupName),
        kgo.OnPartitionsAssigned(partitionsAssigned),
        kgo.OnPartitionsRevoked(partitionsRevoked),
        kgo.OnPartitionsLost(partitionsLost),
    }
    var err error
    client, err = kgo.NewClient(opts...)
    if err != nil {
        log.Fatal(err)
    }
}

func main() {
    // Poll and process records in a background goroutine so that main can
    // block on the shutdown signal below.
    go func() {
        fmt.Println("starting kafka consumer goroutine")
        for {
            err := client.Ping(context.Background())
            if err != nil {
                log.Fatal("ping failed - ", err)
            }
            fmt.Println("fetching records....")
            fetches := client.PollRecords(context.Background(), 0)
            if fetches.IsClientClosed() {
                fmt.Println("kgo kafka client closed")
                return
            }
            fetches.EachError(func(t string, p int32, err error) {
                fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
            })
            fetches.EachRecord(func(r *kgo.Record) {
                fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))
                // Simulate record processing time.
                time.Sleep(3 * time.Second)
                err = client.CommitRecords(context.Background(), r)
                if err != nil {
                    fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition)
                } else {
                    fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
                }
            })
        }
    }()

    // Block until SIGINT or SIGTERM, then close the client for a clean exit.
    end := make(chan os.Signal, 1)
    signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
    <-end
    client.Close()
    fmt.Println("kafka consumer exit")
}

func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
}

func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
}

func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])
}
Save and close the file.
Below is what each part of the application does:
The init function:
Fetches the Kafka broker and topic information from the environment variables
If these environment variables aren't provided, the program exits with an error code
Initializes the Kafka client with the broker addresses, the topic to consume, the consumer group name, and callbacks that run when partitions are assigned, revoked, or lost
The main function starts a goroutine to poll and process Kafka records, then:
The goroutine polls for Kafka records and logs any fetch errors
Processes each record (simulates some processing with a 3-second sleep), prints the record's details, and commits the record to Kafka
Listens for SIGINT or SIGTERM so that the application shuts down gracefully when you press CTRL + C or when it receives a termination signal
When the program detects a termination signal, the Kafka client closes, and the program exits
Deploy the Application to your VKE Cluster
Create a new Dockerfile that defines how to build the application image
$ nano Dockerfile
Add the following contents to the file
FROM golang:1.19-buster AS build
WORKDIR /app
COPY go.mod ./
COPY go.sum ./
RUN go mod download
COPY main.go ./
RUN go build -o /kafka-go-app
FROM gcr.io/distroless/base-debian10
WORKDIR /
COPY --from=build /kafka-go-app /kafka-go-app
USER nonroot:nonroot
ENTRYPOINT ["/kafka-go-app"]
Save and close the file
The above Dockerfile builds the Kafka consumer Go application in a multi-stage build and copies the compiled binary into a minimal distroless base image
Fetch the program Go module dependencies
$ go mod tidy
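Optionally, verify that the module and its dependencies compile locally before building the image:
$ go build ./...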
Log in to Docker using your active Docker Hub account
$ sudo docker login
Build the Docker image. Replace example-user with your actual Docker Hub ID
$ sudo docker build -t example-user/myapp .
Push the image to Docker Hub
$ sudo docker push example-user/myapp
Verify that the command succeeds and that a new myapp repository is available on your Docker Hub profile
Create a new file consumer.yaml
$ nano consumer.yaml
Add the following contents to the file. Replace example-user/myapp with your actual Docker repository
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-consumer-app
  labels:
    app: kafka-consumer-app
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-consumer-app
  template:
    metadata:
      labels:
        app: kafka-consumer-app
    spec:
      containers:
        - name: kafka-consumer-app-container
          image: example-user/myapp
          imagePullPolicy: Always
          env:
            - name: KAFKA_BROKER
              value: my-cluster-kafka-bootstrap:9092
            - name: KAFKA_TOPIC
              value: test-topic
Save and close the file
Deploy the application to your cluster
$ kubectl apply -f consumer.yaml
Verify the application deployment status
$ kubectl get pods -l=app=kafka-consumer-app
Your output should look like the one below:
NAME READY STATUS RESTARTS AGE
kafka-consumer-app-c4b67d694-mptlw 1/1 Running 0 2m12s
Verify that the Pod status changes to Running
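Optionally, wait until the pod reports ready:
$ kubectl wait pod -l app=kafka-consumer-app --for=condition=Ready --timeout=120s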
View the application logs
$ kubectl logs -f -l=app=kafka-consumer-app
Output:
KAFKA_BROKER my-cluster-kafka-bootstrap:9092
KAFKA_TOPIC test-topic
starting kafka consumer goroutine
fetching records....
partitions ASSIGNED for topic test-topic [0 1 2 3 4]
Enable Autoscaling
Create a new file scaled-object.yaml
$ nano scaled-object.yaml
Add the following contents to the file
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
spec:
  scaleTargetRef:
    name: kafka-consumer-app
  minReplicaCount: 1
  maxReplicaCount: 5
  pollingInterval: 30
  triggers:
    - type: kafka
      metadata:
        bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
        consumerGroup: my-group
        topic: test-topic
        lagThreshold: "5"
        offsetResetPolicy: latest
Save and close the file
Deploy the KEDA scaled object
$ kubectl apply -f scaled-object.yaml
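Verify that KEDA accepted the scaled object. The READY and ACTIVE columns report whether the trigger is configured correctly and whether scaling is currently active:
$ kubectl get scaledobject kafka-scaledobject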
Verify the Consumer Application Autoscaling
Monitor the number of consumer application pods
$ kubectl get pods -l=app=kafka-consumer-app -w
In a new terminal session, export the KUBECONFIG variable to grant kubectl access to the cluster
$ export KUBECONFIG=/path/to/vke/YAML
Run the following command to send data to the Kafka topic
$ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-producer-perf-test.sh --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props linger.ms=0 batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092
The above command runs the native Kafka producer performance script, which generates load by sending 500 records of 1000 bytes each to test-topic at up to 200 records per second.
When successful, your output should look like the one below:
producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic} : 15.471
producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic} : 500.000
pod "kafka-producer" deleted
Wait for one minute, and verify the Kafka consumer application Deployment status
$ kubectl get deployment/kafka-consumer-app
Your output should look like the one below:
NAME READY UP-TO-DATE AVAILABLE AGE
kafka-consumer-app 5/5 5 5 2m48s
Verify that the deployment has scaled up to five pods
Navigate back to the monitoring terminal, and verify that your output looks like the one below:
kafka-consumer-app-6bc79dd94f-jb867 0/1 ContainerCreating 0 0s
kafka-consumer-app-6bc79dd94f-59bg4 1/1 Running 0 3s
kafka-consumer-app-6bc79dd94f-f5ll4 1/1 Running 0 5s
kafka-consumer-app-6bc79dd94f-jb867 1/1 Running 0 7s
kafka-consumer-app-6bc79dd94f-wzrkp 0/1 Pending 0 0s
kafka-consumer-app-6bc79dd94f-wzrkp 0/1 Pending 0 0s
kafka-consumer-app-6bc79dd94f-wzrkp 0/1 ContainerCreating 0 0s
kafka-consumer-app-6bc79dd94f-wzrkp 1/1 Running 0 3s
Press CTRL + C to exit the watch session
KEDA Autoscaling Results
Auto-scaling happens because of the KEDA ScaledObject you created earlier. KEDA monitors the consumer group lag on the test-topic Kafka topic; when the lag exceeds the lagThreshold of 5 messages per replica, KEDA scales out the kafka-consumer-app deployment, up to a maximum of 5 replicas. KEDA checks the Kafka metrics every 30 seconds (the pollingInterval) to make these scaling decisions.
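To inspect the lag that KEDA acts on, you can describe the consumer group directly using the Kafka tooling in the Strimzi image. The LAG column shows the number of unprocessed messages per partition:
$ kubectl run kafka-groups -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-consumer-groups.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --describe --group my-group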
KEDA also scales down to 1 replica when the consumer applications complete processing the messages and the consumer lag decreases.
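Under the hood, KEDA creates and manages a Horizontal Pod Autoscaler (HPA) for the target deployment. To observe the metric values and replica counts that drive the scaling, you can watch the HPA; KEDA typically names it keda-hpa- followed by the ScaledObject name, keda-hpa-kafka-scaledobject in this case:
$ kubectl get hpa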
Conclusion
In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on a Rcs Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, created a KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.
Next Steps
To implement more solutions on your VKE cluster, visit the following resources:
Scale Redis Applications on Rcs Kubernetes Engine with KEDA
How to Run a Data Pipeline on Rcs Kubernetes Engine (VKE) Using Kafka Connect
Integrate MySQL and Apache Kafka using Change Data Capture with Debezium
Integrate PostgreSQL and Apache Kafka with Debezium