Knowledgebase

Scale Apache Kafka Applications on RCS Kubernetes Engine with KEDA Print

  • 0

Introduction

Kafka is a distributed event streaming platform designed for high-throughput, fault-tolerance, and scalable data streaming and processing. It's based on a producer-consumer architecture where producers send data to Kafka topics and consumers retrieve messages from the Kafka topics. Consumer groups allow a set of consumers to work together and share data processing, this ensures that each message gets processed by only one consumer in the group.

Kubernetes Event-Driven Autoscaling (KEDA) is a cloud-native event-driven auto-scaler (CNCF project) for container workloads. In KEDA, scalers are extensible components that monitor external systems and produce metrics to drive the scaling process for Kubernetes workloads.

This guide explains how to use the KEDA scaler for Kafka on a RCS Kubernetes Engine (VKE) cluster to drive auto-scaling and allow Kafka consumer application pods to scale up and down based on the consumer group lag.

Prerequisites

Before you begin:

  • Deploy a fresh RCS Kubernetes Engine (VKE) cluster with at least 2 nodes

  • Deploy a Ubuntu development server to use for connecting to the cluster

  • Using SSH, access the server as a non-root sudo user

  • Install Kubectl to access the VKE cluster

  • Install Docker

  • Install the latest stable version of the Go programming language

    $ sudo apt install golang
    

Install the KEDA Operator

  1. Create a directory

    $ mkdir kafka-keda-RCS
    
  2. Switch to the directory

    $ cd kafka-keda-RCS
    
  3. Set the KUBECONFIG environment variable with the path to your VKE YAML file to grant kubectl access to the cluster

    $ export KUBECONFIG=/path/to/vke/YAML
    

    The above command allows Kubectl to use your VKE YAML file as the default cluster file instead of localhost

  4. Deploy KEDA using its deployment YAML file

    $ kubectl apply --server-side -f https://github.com/kedacore/keda/releases/download/v2.11.2/keda-2.11.2-core.yaml
    
  5. Verify the KEDA deployment status

    $ kubectl get deployment -n keda
    

    Your output should look like the one below:

    NAME                     READY   UP-TO-DATE   AVAILABLE   AGE
    
    keda-metrics-apiserver   1/1     1            1           57s
    
    keda-operator            1/1     1            1           57s
    

    Verify that the KEDA deployment status is READY

Set Up a Single Node Kafka cluster using the Strimzi Operator

  1. Install the Strimzi operator

    $ kubectl create -f 'https://strimzi.io/install/latest?namespace=default'
    
  2. Using a text editor such as Nano, create a new file kafka-cluster.yaml

    $ nano kafka-cluster.yaml
    
  3. Add the following contents to the file

    apiVersion: kafka.strimzi.io/v1beta2
    
    kind: Kafka
    
    metadata:
    
      name: my-cluster
    
    spec:
    
      kafka:
    
        version: 3.4.0
    
        replicas: 1
    
        listeners:
    
          - name: plain
    
            port: 9092
    
            type: internal
    
            tls: false
    
        config:
    
          offsets.topic.replication.factor: 1
    
          transaction.state.log.replication.factor: 1
    
          transaction.state.log.min.isr: 1
    
          default.replication.factor: 1
    
          min.insync.replicas: 1
    
          inter.broker.protocol.version: "3.4"
    
        storage:
    
          type: ephemeral
    
      zookeeper:
    
        replicas: 1
    
        storage:
    
          type: ephemeral
    
      entityOperator:
    
        topicOperator: {}
    
        userOperator: {}
    

    Save and close the file

  4. Deploy the Kafka cluster

    $ kubectl apply -f kafka-cluster.yaml
    
  5. Wait for a few minutes for the cluster creation to complete. Run the following command to wait for the cluster creation to complete

    $ kubectl wait kafka/my-cluster --for=condition=Ready --timeout=600s
    

    When complete, the following output should display:

    kafka.kafka.strimzi.io/my-cluster condition met
    
  6. When the cluster creation is complete, create a topic

    $ kubectl run kafka-topics -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --create --topic test-topic --partitions 5 --replication-factor 1
    

    Your output should look like the one below:

    Created topic test-topic.
    
    pod "kafka-topics" deleted
    

Prepare the Kafka Consumer Application

  1. Create a new Go module

    $ go mod init kafka-consumer
    
  2. Create a new file main.go

    $ nano main.go
    
  3. Add the following contents to the file

    package main
    
    
    
    import (
    
        "context"
    
        "fmt"
    
        "log"
    
        "os"
    
        "os/signal"
    
        "strings"
    
        "syscall"
    
        "time"
    
    
    
        "github.com/twmb/franz-go/pkg/kgo"
    
    )
    
    
    
    const consumerGroupName = "my-group"
    
    
    
    var kafkaBroker string
    
    var topic string
    
    var client *kgo.Client
    
    
    
    func init() {
    
    
    
        kafkaBroker = os.Getenv("KAFKA_BROKER")
    
        if kafkaBroker == "" {
    
            log.Fatal("missing env var KAFKA_BROKER")
    
        }
    
    
    
        topic = os.Getenv("KAFKA_TOPIC")
    
        if topic == "" {
    
            log.Fatal("missing env var KAFKA_TOPIC")
    
        }
    
    
    
        fmt.Println("KAFKA_BROKER", kafkaBroker)
    
        fmt.Println("KAFKA_TOPIC", topic)
    
    
    
        opts := []kgo.Opt{
    
            kgo.SeedBrokers(strings.Split(kafkaBroker, ",")...),
    
            kgo.ConsumeTopics(topic),
    
            kgo.ConsumerGroup(consumerGroupName),
    
            kgo.OnPartitionsAssigned(partitionsAssigned),
    
            kgo.OnPartitionsRevoked(partitionsRevoked),
    
            kgo.OnPartitionsLost(partitionsLost),
    
        }
    
    
    
        var err error
    
    
    
        client, err = kgo.NewClient(opts...)
    
    
    
        if err != nil {
    
            log.Fatal(err)
    
        }
    
    }
    
    
    
    func main() {
    
    
    
        go func() {
    
            fmt.Println("starting kafka consumer goroutine")
    
            for {
    
                err := client.Ping(context.Background())
    
                if err != nil {
    
                    log.Fatal("ping failed - ", err)
    
                }
    
                fmt.Println("fetching records....")
    
    
    
                fetches := client.PollRecords(context.Background(), 0)
    
    
    
                if fetches.IsClientClosed() {
    
                    fmt.Println("kgo kafka client closed")
    
                    return
    
                }
    
                fetches.EachError(func(t string, p int32, err error) {
    
                    fmt.Printf("fetch err - topic %s partition %d: %v\n", t, p, err)
    
                })
    
    
    
                fetches.Records()
    
    
    
                fetches.EachRecord(func(r *kgo.Record) {
    
                    fmt.Printf("got record from partition %v key=%s val=%s\n", r.Partition, string(r.Key), string(r.Value))
    
    
    
                    time.Sleep(3 * time.Second)
    
                    err = client.CommitRecords(context.Background(), r)
    
    
    
                    if err != nil {
    
                        fmt.Println("commit failed for record with offset", r.Offset, "in partition", r.Partition)
    
                    } else {
    
                        fmt.Println("committed record with offset", r.Offset, "in partition", r.Partition)
    
                    }
    
                })
    
            }
    
        }()
    
    
    
        end := make(chan os.Signal, 1)
    
        signal.Notify(end, syscall.SIGINT, syscall.SIGTERM)
    
    
    
        <-end
    
    
    
        client.Close()
    
        fmt.Println("kafka consumer exit")
    
    }
    
    
    
    func partitionsAssigned(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    
        fmt.Printf("partitions ASSIGNED for topic %s %v\n", topic, m[topic])
    
    }
    
    
    
    func partitionsRevoked(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    
        fmt.Printf("partitions REVOKED for topic %s %v\n", topic, m[topic])
    
    }
    
    
    
    func partitionsLost(ctx context.Context, c *kgo.Client, m map[string][]int32) {
    
        fmt.Printf("partitions LOST for topic %s %v\n", topic, m[topic])
    
    }
    

    Save and close the file.

    Below are what the above application parts do:

    • The init function:

      • Fetches the Kafka broker and topic information from the environment variables

      • If these environment variables aren't provided, the program exits with an error code

      • Initializes the Kafka client with various configuration options, Kafka broker addresses, topic to consume, consumer group name, and sends a callback when a partition gets assigned, revoked, or lost

    • The main function starts a goroutine to poll and process Kafka records, then:

      • The goroutine polls for Kafka records and logs any fetch errors

      • Processes each record (simulates some processing with a 3-second sleep), prints the record's details, and commits the record to Kafka

      • Listens for SIGINT or SIGTERM to ensure that the application can be gracefully shut down using CTRL + C, or when receiving a termination signal.

      • When the program detects a termination signal, the Kafka client closes, and the program exits

Deploy the Application to your VKE Cluster

  1. Create a new Dockerfile to store Docker variables

    $ nano Dockerfile
    
  2. Add the following contents to the file

    FROM golang:1.19-buster AS build
    
    
    
    WORKDIR /app
    
    COPY go.mod ./
    
    COPY go.sum ./
    
    
    
    RUN go mod download
    
    
    
    COPY main.go ./
    
    RUN go build -o /kafka-go-app
    
    
    
    FROM gcr.io/distroless/base-debian10
    
    WORKDIR /
    
    COPY --from=build /kafka-go-app /kafka-go-app
    
    USER nonroot:nonroot
    
    ENTRYPOINT ["/kafka-go-app"]
    

    Save and close the file

    The above Dockerfile builds the Kafka consumer Go application using a multi-stage build

  3. Fetch the program Go module dependencies

    $ go mod tidy
    
  4. Log in to Docker using your active Docker Hub account

    $ sudo docker login
    
  5. Build the Docker image. Replace example-user with your actual Docker Hub ID

    $ sudo docker build -t example-user/myapp .
    
  6. Push the image to Docker hub

    $ sudo docker push example-user/myapp
    

    Verify that the command is successful and a new myapp repository is available on your DockerHub profile

  7. Create a new file consumer.yaml

    $ nano consumer.yaml
    
  8. Add the following contents to the file. Replace example-user/myapp with your actual Docker repository

    apiVersion: apps/v1
    
    kind: Deployment
    
    metadata:
    
      name: kafka-consumer-app
    
      labels:
    
        app: kafka-consumer-app
    
    spec:
    
      replicas: 1
    
      selector:
    
        matchLabels:
    
          app: kafka-consumer-app
    
      template:
    
        metadata:
    
          labels:
    
            app: kafka-consumer-app
    
        spec:
    
          containers:
    
          - name: kafka-consumer-app-container
    
            image: example-user/myapp
    
            imagePullPolicy: Always
    
            env:
    
              - name: KAFKA_BROKER
    
                value: my-cluster-kafka-bootstrap:9092
    
              - name: KAFKA_TOPIC
    
                value: test-topic
    

    Save and close the file

  9. Deploy the application to your cluster

    $ kubectl apply -f consumer.yaml
    
  10. Verify the application deployment status

    $ kubectl get pods -l=app=kafka-consumer-app
    

    Your output should look like the one below:

    NAME                                 READY   STATUS    RESTARTS   AGE
    
    kafka-consumer-app-c4b67d694-mptlw   1/1     Running   0          2m12s
    

    Verify that the Pod status changes to Running

  11. View the application logs

    $ kubectl logs -f -l=app=kafka-consumer-app
    

    Output:

    KAFKA_BROKER my-cluster-kafka-bootstrap:9092
    
    KAFKA_TOPIC test-topic
    
    starting kafka consumer goroutine
    
    fetching records....
    
    partitions ASSIGNED for topic test-topic [0 1 2 3 4]
    

Enable Autoscaling

  1. Create a new file scaled-object.yaml

    $ nano scaled-object.yaml
    
  2. Add the following contents to the file

    apiVersion: keda.sh/v1alpha1
    
    kind: ScaledObject
    
    metadata:
    
      name: kafka-scaledobject
    
    spec:
    
      scaleTargetRef:
    
        name: kafka-consumer-app
    
      minReplicaCount: 1
    
      maxReplicaCount: 5
    
      pollingInterval: 30
    
      triggers:
    
      - type: kafka
    
        metadata:
    
          bootstrapServers: my-cluster-kafka-bootstrap.default.svc.cluster.local:9092
    
          consumerGroup: my-group
    
          topic: test-topic
    
          lagThreshold: "5"
    
          offsetResetPolicy: latest
    

    Save and close the file

  3. Deploy the KEDA scaled object

    $ kubectl apply -f scaled-object.yaml
    

Verify the Consumer Application Autoscaling

  1. Monitor the number of consumer application pods

    $ kubectl get pods -l=app=kafka-consumer-app -w
    
  2. In a new terminal window, export the KUBECONFIG variable to activate Kubectl in the session

    $ export KUBECONFIG=/path/to/vke/YAML
    
  3. Run the following command to send data to the Kafka topic

    $ kubectl run kafka-producer -ti --image=quay.io/strimzi/kafka:0.32.0-kafka-3.3.1 --rm=true --restart=Never -- bin/kafka-producer-perf-test.sh --throughput 200 --record-size 1000 --num-records 500 --topic test-topic --print-metrics --producer-props linger.ms=0 batch.size=16384 bootstrap.servers=my-cluster-kafka-bootstrap:9092
    

    The above native Kafka producer performance script generates the load that sends 500 records of 1000 bytes each to the test-topic at a rate of 200 records per second.

    When successful, your output should look like the one below:

    producer-topic-metrics:record-retry-total:{client-id=perf-producer-client, topic=test-topic} : 0.000
    
    producer-topic-metrics:record-send-rate:{client-id=perf-producer-client, topic=test-topic}   : 15.471
    
    producer-topic-metrics:record-send-total:{client-id=perf-producer-client, topic=test-topic}  : 500.000
    
    pod "kafka-producer" deleted
    
  4. Wait for one minute, and verify the Kafka consumer application Deployment status

    $ kubectl get deployment/kafka-consumer-app
    

    Your output should look like the one below:

    NAME                 READY   UP-TO-DATE   AVAILABLE   AGE
    
    kafka-consumer-app   5/5     5            5           2m48s
    

    Verify that the deployment has scaled up to five pods

  5. Navigate back to the monitoring terminal, and verify that your output looks like the one below:

    kafka-consumer-app-6bc79dd94f-jb867   0/1     ContainerCreating   0          0s
    
    kafka-consumer-app-6bc79dd94f-59bg4   1/1     Running             0          3s
    
    kafka-consumer-app-6bc79dd94f-f5ll4   1/1     Running             0          5s
    
    kafka-consumer-app-6bc79dd94f-jb867   1/1     Running             0          7s
    
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
    
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     Pending             0          0s
    
    kafka-consumer-app-6bc79dd94f-wzrkp   0/1     ContainerCreating   0          0s
    
    kafka-consumer-app-6bc79dd94f-wzrkp   1/1     Running             0          3s
    

    Press CTRL + C to exit the watch session

KEDA Autoscaling Results

Auto-scaling happens due to the KEDA ScaledObject you created earlier. KEDA monitors the test-topic Kafka topic for the consumer message lag, and when the unread message count exceeds 5, KEDA scales out the kafka-consumer-app deployment with a maximum limit of 5 replicas. KEDA checks the Kafka metrics every 30 seconds to make these scaling decisions.

KEDA also scales down to 1 replica when the consumer applications complete processing the messages and the consumer lag decreases.

Conclusion

In this guide, you used KEDA to auto-scale a Kafka consumer application deployed on a RCS Kubernetes Engine (VKE) cluster. You set up KEDA and Kafka, deployed the application, used KEDA ScaledObject, and verified the auto-scaling behavior. For more information about KEDA, visit the official documentation.

Next Steps

To implement more solutions on your VKE cluster, visit the following resources:


Was this answer helpful?
Back

Powered by WHMCompleteSolution