How to Use RCS Managed Databases for MySQL in Rust

Introduction

This article demonstrates how to integrate an RCS Managed Database for MySQL with Apache Kafka using the Debezium source connector for MySQL. Behind the scenes, this is made possible by Change Data Capture (CDC), which tracks row-level create, update, and delete operations in MySQL tables. These change data capture events can be used to integrate MySQL with other systems.

Change data capture is a popular solution, and it is often combined with Apache Kafka and Kafka Connect.

Apache Kafka

Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate these data streams with other parts of your system in a secure, reliable, and scalable manner.

Some key components of Kafka include:

  • Broker (Node): A Kafka broker runs the Kafka JVM process. A best practice is to run three or more brokers for scalability and high availability. Together, these brokers form a cluster.

  • Producers: These are client applications that send messages to Kafka. Each message is a key-value pair.

  • Topics: Events (messages) are stored in topics, and each topic has one or more partitions. The data in these partitions is distributed across the Kafka cluster for high availability and redundancy.

  • Consumers: Just like producers, consumers are also client applications. They receive and process data/events from Kafka topics.
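To make the relationship between message keys and partitions concrete, here is a minimal sketch of key-based partition assignment. It is an illustration under stated assumptions, not Kafka's implementation: the function name partition_for is hypothetical, and Rust's DefaultHasher stands in for the murmur2 hash that Kafka's default partitioner applies to keyed messages.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a partition for a message key by hashing the key.
/// Assumption: `DefaultHasher` is only a stand-in for Kafka's murmur2 hash,
/// so the partition numbers differ from what a real producer would choose.
fn partition_for(key: &str, num_partitions: u32) -> u32 {
    assert!(num_partitions > 0, "a topic has at least one partition");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    // Reduce the 64-bit hash to a partition index in 0..num_partitions.
    (hasher.finish() % u64::from(num_partitions)) as u32
}
```

Because the same key always hashes to the same partition, all events for one key stay in order relative to each other within that partition.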

Kafka Connect

To build integration solutions, you can use the Kafka Connect framework, which provides a suite of connectors to integrate Kafka with external systems. There are two types of Kafka connectors:

  • Source connector: Used to move data from source systems to Kafka topics.

  • Sink connector: Used to send data from Kafka topics into target (sink) systems.

Debezium

Debezium is a set of source connectors for Kafka Connect. You can use it to capture changes in your databases so that your applications can respond to them in real time. It records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.

When it comes to integrating with relational databases, the JDBC connector (source and sink) and Debezium (source) connectors are widely used solutions. The JDBC connector works by polling the database table to retrieve data, while Debezium relies on change data capture.

Demonstration Scenario

The solution demonstrated in this article consists of the following components:

  • MySQL - You will set up an RCS Managed Database for MySQL.

  • Apache Kafka and Zookeeper - These will run as Docker containers and are orchestrated using Docker Compose.

  • Kafka Connect - A Kafka Connect instance will also run as a Docker container. The Debezium MySQL connector will be installed in this instance.

You will create a sample table (called Persons) in the MySQL database. Changes made to data in this table will be detected by the Debezium MySQL connector and sent to a Kafka topic. Under the hood, the connector reads from the MySQL binlog and produces change events for row-level operations.
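The idea of replaying row-level change events in order can be sketched as follows. This is a simplified model, not the Debezium event format: the ChangeEvent enum, the Person struct, and apply_events are hypothetical names, and the fields mirror the Persons table columns created later in this guide.

```rust
use std::collections::BTreeMap;

/// One row of the Persons table (columns as created later in this guide).
#[derive(Debug, Clone, PartialEq, Eq)]
struct Person {
    person_id: i32,
    last_name: String,
    first_name: Option<String>,
    age: Option<i32>,
}

/// Simplified row-level change event; an assumption for illustration,
/// not the structure Debezium actually emits.
#[derive(Debug, Clone)]
enum ChangeEvent {
    Create(Person),
    Update(Person),
    Delete { person_id: i32 },
}

/// Applies events in the order given to an in-memory copy of the table.
fn apply_events(table: &mut BTreeMap<i32, Person>, events: &[ChangeEvent]) {
    for event in events {
        match event {
            // A create or update stores the latest state of the row.
            ChangeEvent::Create(p) | ChangeEvent::Update(p) => {
                table.insert(p.person_id, p.clone());
            }
            // A delete removes the row by primary key.
            ChangeEvent::Delete { person_id } => {
                table.remove(person_id);
            }
        }
    }
}
```

A consumer that applies events in the order they occurred ends up with the same rows as the source table, which is why ordering matters for change data capture.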

Prerequisites

Before following the steps in this guide, you need to:

  1. Deploy a new Ubuntu 22.04 LTS RCS cloud server for Kafka.

  2. Create a non-root sudo user.

  3. Install Docker.

  4. Install Docker Compose.

  5. Install mysql, the command-line client for MySQL. Because this guide uses an RCS managed database, you only need the MySQL command-line client to query the database.

    $ sudo apt install mysql-client
    
  6. Install curl, which is a popular command-line HTTP client.

Create a MySQL Database

Log in to your RCS account, navigate to Add Managed Database, and follow the steps below.

  1. Choose the MySQL database engine.

  2. Choose a Server Type: Cloud Compute, Cloud Compute High Performance - AMD or Intel, Optimized Cloud Compute - General Purpose, Storage, or Memory Optimized. Also select zero or more replica nodes and the cluster location. A replica node is the same server type and plan as the primary node. For this guide, you can opt for the Cloud Compute server type without a replica node.

  3. After you add a label for the database cluster, click Deploy Now to create the cluster. It will take a few minutes for the cluster to be available and the Status should change to Running.

After the database is ready, you can proceed with the next steps.

Create a Table

Get the connection string details for the MySQL database.

  1. In the Managed Database section of the RCS customer portal, click the Manage icon to open the Overview tab.

  2. From the Connection Details section, click Copy Connection String.

  3. SSH as the non-root user to the Kafka server you deployed in the Prerequisites section.

  4. In the SSH session, paste the mysql connection string you copied from the RCS customer portal.

    After a successful database connection, you should see this prompt as the output:

    mysql>
    
  5. Switch to the correct database:

    use defaultdb;
    

    You should see this output:

    Database changed
    
  6. To create the Persons table, enter this SQL:

    CREATE TABLE Persons (
        Personid int NOT NULL AUTO_INCREMENT,
        LastName varchar(255) NOT NULL,
        FirstName varchar(255),
        Age int,
        PRIMARY KEY (Personid)
    );
    

    After successful table creation, you should see this output:

    Query OK, 0 rows affected (0.28 sec)
    
  7. To confirm, you can query the table:

    select * from Persons;
    

    Because there are no rows in the table, you should see this output:

    Empty set (0.26 sec)
    

Start the Services

  1. On the Kafka server, create a new directory and change to it.

    $ mkdir RCS-mysql-kafka
    
    $ cd RCS-mysql-kafka
    
  2. Create a new file docker-compose.yaml:

    $ touch docker-compose.yaml
    
  3. Add the following to docker-compose.yaml file and save it:

    version: "2"
    services:
        zookeeper:
            image: quay.io/debezium/zookeeper
            ports:
            - 2181:2181
        kafka:
            image: quay.io/debezium/kafka
            ports:
            - 9092:9092
            links:
            - zookeeper
            depends_on:
            - zookeeper
            environment:
            - ZOOKEEPER_CONNECT=zookeeper:2181

    Introduction

    MySQL is a free and open-source relational database management system (RDBMS) that manages data stored in tables using Structured Query Language (SQL). In a relational database, data is organized into one or more data tables which may be related to each other, while an RDBMS is a piece of software that lets you create, update, and manage such databases.

    In MySQL, data is organized into tables consisting of rows and columns, with each column representing a different piece of data, and each row representing a unique record. For instance, a table of customer information might have columns for names, phone numbers, social security numbers (SSN), and amount purchased; each row would represent a different customer with their values for those columns. SQL is used to create, modify, and query tables and other database objects, as well as insert, update and delete data stored in tables.

    MySQL is commonly used in web applications such as e-commerce, data warehousing, or logging applications.

    This guide covers how to build a Rust application that connects to an RCS Managed Database for MySQL.

    Prerequisites

    • Working knowledge of Rust and SQL.

    • A properly installed Rust toolchain, including cargo (Rust 1.65 or later).

    Create the Managed Database

    To create the RCS managed MySQL database, follow these steps:

    1. Log in to your RCS account, and navigate to the Add Managed Database section.

    2. Choose the MySQL database engine.

    3. Select the Server Type from the options. The options include Cloud Compute, Cloud Compute High Performance - AMD or Intel, Optimized Cloud Compute - General Purpose, Storage, or Memory Optimized.

    4. Select zero or more replica nodes. A replica node is the same server type and plan as the primary node. This guide opts for the Cloud Compute server type without any replica node for demonstration purposes.

    5. Choose a server location.

    6. Add a label for the database cluster.

    7. Click Deploy Now to create the cluster. It takes a few minutes for the cluster to become available, and the Status should change to Running.

    The MySQL database is ready. Now you can connect to it using a Rust program.

    Setting Up

    Initialize the project crate using Cargo:

    $ cargo new vmd_demo --bin
    

    Navigate to the newly created project directory:

    $ cd vmd_demo
    

    The project directory should look like this:

    .
    |-- Cargo.toml
    `-- src
        `-- main.rs

    1 directory, 2 files
    

    Open the Cargo.toml file, and add the following dependency:

    mysql = "*"
    

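    This adds the mysql crate as a dependency of the project. The wildcard * accepts any published version of the crate; for reproducible builds, pin a specific version instead. The Cargo.toml should look like this: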

    [package]
    name = "vmd_demo"
    version = "0.1.0"
    edition = "2021"

    # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

    [dependencies]
    mysql = "*"
    

    Import Libraries

    Open the main.rs file in the src/ directory, and overwrite its contents with the following import lines:

    use mysql::*;
    use mysql::prelude::Queryable;
    

    Define Data Structure

    This guide performs CRUD (Create, Read, Update, Delete) operations on customer records in the database. Add the following struct to hold the customer information:

    #[derive(Debug, PartialEq, Eq)]
    struct Customer {
        id: i32,
        name: String,
        amount: i32,
    }
    

    This struct represents customer entries in the database.

    Initialize Customers Table in the MySQL Database

    To create a customers table in the database, add the following function to initialize the table:

    fn init_table(conn: &mut mysql::PooledConn) {
        // Drop the table if it already exists.
        // IF EXISTS prevents an error (and a panic) on the first run.
        conn.query_drop("DROP TABLE IF EXISTS customers").unwrap();

        // Create a table for customers.
        conn.query_drop(
            r"CREATE TABLE customers (
                id int not null,
                name text not null,
                amount int not null,
                PRIMARY KEY(id)
            )").unwrap();

        println!("Table created.");
    }
    

    The init_table function takes a mysql::PooledConn as an argument. SQL queries and statements are executed on the PooledConn object.

    The query_drop method takes a text query as an argument, performs it, and drops the result. It is first used to drop the customers table if it already exists in the database.

    Next, query_drop is used to create the customers table with the required columns; the id column is also used as the primary key for the table.

    Adding Data to the customers Table

    Create a function to insert row entries into the table:

    fn insert_customers(conn: &mut mysql::PooledConn, customers: Vec<Customer>) {
        // Insert vector of customers to the database
        conn.exec_batch(
            r"INSERT INTO customers (id, name, amount)
            VALUES (:id, :name, :amount)",
            customers.iter().map(|customer| params! {
                "id" => customer.id,
                "name" => &customer.name,
                "amount" => customer.amount,
            })
        ).unwrap();

        println!("Inserted customer entries successfully.");
    }
    

    The insert_customers function takes two parameters: a PooledConn object and a vector of Customer values to add to the database.

    The exec_batch method takes two arguments: a statement to execute and an iterator of parameters. The statement is executed once for each item in the iterator. The named parameters in the statement (:id, :name, :amount) are bound to the values produced by the iterator on each iteration. The params! macro is a convenient way to pass named parameters to a statement.

    Updating Data in the customers Table

    To update row entries in the table, add the following function:

    fn update_customer(conn: &mut mysql::PooledConn, customer: &mut Customer) {
        // Look for the matching id and update the columns
        conn.exec_drop(
            r"UPDATE customers SET name=:name, amount=:amount WHERE id=:id",
            params! {
                "name" => &customer.name,
                "amount" => customer.amount,
                "id" => customer.id,
            }
        ).unwrap();

        println!("Successfully updated customer.");
    }
    

    The function takes a PooledConn object and a Customer whose column values are to be updated.

    The exec_drop method takes two arguments - a statement to be executed and a Params type to pass named parameters to the query statement. exec_drop executes the given statement and drops the result. In this example, the customer with the matching id in the database is updated with the new name and amount fields.

    Reading Data in the customers Table

    To access the rows in the table, add the following lines:

    fn read_customers(conn: &mut mysql::PooledConn) {
        // Read all database values
        let customers = conn.query_map(
            "SELECT id, name, amount from customers",
            |(id, name, amount)| {
                Customer { id, name, amount }
            },
        ).unwrap();

        println!("{:#?}", customers);
    }
    

    The query_map method takes a text query and a closure that maps each row of the first result set. Here, the closure converts the columns of each row into a Customer struct, and query_map collects the results into a vector with one Customer per row in the table.
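The row-to-struct mapping can be tried without a database. The sketch below is a standard-library analogy only: rows_to_customers is a hypothetical function that applies the same closure shape to plain tuples, so it shows what the closure does rather than how the mysql crate runs the query.

```rust
#[derive(Debug, PartialEq, Eq)]
struct Customer {
    id: i32,
    name: String,
    amount: i32,
}

/// Maps each (id, name, amount) tuple to a Customer and collects the results,
/// mirroring the shape of the closure passed to query_map.
/// Assumption: the tuples stand in for rows returned by the SELECT statement.
fn rows_to_customers(rows: Vec<(i32, String, i32)>) -> Vec<Customer> {
    rows.into_iter()
        .map(|(id, name, amount)| Customer { id, name, amount })
        .collect()
}
```

The tuple element types must line up with the selected columns in order, which is why the SELECT statement lists id, name, and amount explicitly.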

    Deleting Data in the customers Table

    To delete rows in the table:

    fn delete_customer(conn: &mut mysql::PooledConn, customer_id: i32) {
        // Delete matching id entry
        conn.exec_drop(
            r"DELETE FROM customers WHERE id=:id",
            params! {
                "id" => customer_id,
            }
        ).unwrap();

        println!("Deleted customer with id - {}.", customer_id);
    }
    

    Here, the statement looks for a row with the corresponding id. If a match is found, the row is deleted from the table.
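Each helper function calls .unwrap() on the result, which panics on any database error. As one possible alternative, the sketch below turns a failed step into a descriptive error message. The with_context helper is hypothetical and not part of the mysql crate; it works with any error type that implements Display.

```rust
use std::fmt::Display;

/// Converts any displayable error into a message that names the failed step.
/// Assumption: `with_context` is an illustrative helper, not a mysql crate API.
fn with_context<T, E: Display>(step: &str, result: Result<T, E>) -> Result<T, String> {
    result.map_err(|err| format!("{step} failed: {err}"))
}
```

A helper function could then return Result<(), String> and wrap its database call in with_context, leaving the caller to decide whether to retry, log, or exit.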

    Connect to RCS MySQL with Rust

    After writing the helper functions, it's time to bring them all together in the main function.

    Before proceeding, you need to get the connection details for the managed database to connect to it.

    1. Navigate to the Managed Databases section.

    2. Select the previously deployed database.

    3. Navigate to the Connection Details section, and click the Copy MySQL URL icon to get the connection URL.

    Next, add the following lines to the src/main.rs file:

    fn main() {
        let url = "[paste-url-string-here]";
        let pool = Pool::new(url).unwrap();

        let mut conn = pool.get_conn().unwrap();
    

    Replace the placeholder in the url variable with the copied URL string.

    Pool::new creates a connection pool, and the get_conn method retrieves a live connection from it. Queries and statements can then be run on this connection.

    Add the following lines to run operations:

        // Initialize tables
        init_table(&mut conn);

        // Create vector of customer information to insert into the database
        let customers = vec![
            Customer { id: 1, name: "Gates Bill".to_string(), amount: 1000 },
            Customer { id: 2, name: "Buffer Warren".to_string(), amount: 999 },
            Customer { id: 3, name: "Musk Noel".to_string(), amount: 7 },
        ];

        // Insert customers into database
        insert_customers(&mut conn, customers);

        // Update entry
        let mut customer3 = Customer { id: 3, name: "Musk Elon".to_string(), amount: 876 };
        update_customer(&mut conn, &mut customer3);

        // Delete entry with matching id
        let customer_id = 2;
        delete_customer(&mut conn, customer_id);

        // Print list
        read_customers(&mut conn);
    }
    

    The above code simply makes use of the functions created earlier to perform database operations.
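Hardcoding the connection URL places the database password in the source file. A minimal sketch of reading it from the environment instead is shown below. The variable name DATABASE_URL and both function names are assumptions for illustration, and the scheme check assumes the copied URL begins with mysql://, as the mysql crate's URL parser expects.

```rust
use std::env;

/// Validates a candidate connection URL.
/// Accepts only strings that start with the mysql:// scheme (an assumption
/// about the URL copied from the portal).
fn validate_url(candidate: Option<String>) -> Result<String, String> {
    match candidate {
        Some(url) if url.starts_with("mysql://") => Ok(url),
        Some(_) => Err("connection URL must start with mysql://".to_string()),
        None => Err("connection URL is not set".to_string()),
    }
}

/// Reads the URL from the hypothetical DATABASE_URL environment variable.
fn database_url() -> Result<String, String> {
    validate_url(env::var("DATABASE_URL").ok())
}
```

With this in place, the main function could pass the returned string to Pool::new instead of the pasted literal, and the URL would be supplied when running the program.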

    Final Code

    For reference, the full code in the src/main.rs file:

    use mysql::*;
    use mysql::prelude::Queryable;

    #[derive(Debug, PartialEq, Eq)]
    struct Customer {
        id: i32,
        name: String,
        amount: i32,
    }

    fn init_table(conn: &mut mysql::PooledConn) {
        // Drop the table if it already exists.
        // IF EXISTS prevents an error (and a panic) on the first run.
        conn.query_drop("DROP TABLE IF EXISTS customers").unwrap();

        // Create a table for customers.
        conn.query_drop(
            r"CREATE TABLE customers (
                id int not null,
                name text not null,
                amount int not null,
                PRIMARY KEY(id)
            )").unwrap();

        println!("Table created.");
    }

    fn insert_customers(conn: &mut mysql::PooledConn, customers: Vec<Customer>) {
        // Insert vector of customers to the database
        conn.exec_batch(
            r"INSERT INTO customers (id, name, amount)
            VALUES (:id, :name, :amount)",
            customers.iter().map(|customer| params! {
                "id" => customer.id,
                "name" => &customer.name,
                "amount" => customer.amount,
            })
        ).unwrap();

        println!("Inserted customer entries successfully.");
    }

    fn update_customer(conn: &mut mysql::PooledConn, customer: &mut Customer) {
        // Look for the matching id and update the columns
        conn.exec_drop(
            r"UPDATE customers SET name=:name, amount=:amount WHERE id=:id",
            params! {
                "name" => &customer.name,
                "amount" => customer.amount,
                "id" => customer.id,
            }
        ).unwrap();

        println!("Successfully updated customer.");
    }

    fn read_customers(conn: &mut mysql::PooledConn) {
        // Read all database values
        let customers = conn.query_map(
            "SELECT id, name, amount from customers",
            |(id, name, amount)| {
                Customer { id, name, amount }
            },
        ).unwrap();

        println!("{:#?}", customers);
    }

    fn delete_customer(conn: &mut mysql::PooledConn, customer_id: i32) {
        // Delete matching id entry
        conn.exec_drop(
            r"DELETE FROM customers WHERE id=:id",
            params! {
                "id" => customer_id,
            }
        ).unwrap();

        println!("Deleted customer with id - {}.", customer_id);
    }

    fn main() {
        let url = "[paste-url-string-here]";
        let pool = Pool::new(url).unwrap();

        let mut conn = pool.get_conn().unwrap();

        // Initialize tables
        init_table(&mut conn);

        // Create vector of customer information to insert into the database
        let customers = vec![
            Customer { id: 1, name: "Gates Bill".to_string(), amount: 1000 },
            Customer { id: 2, name: "Buffer Warren".to_string(), amount: 999 },
            Customer { id: 3, name: "Musk Noel".to_string(), amount: 7 },
        ];

        // Insert customers into database
        insert_customers(&mut conn, customers);

        // Update entry
        let mut customer3 = Customer { id: 3, name: "Musk Elon".to_string(), amount: 876 };
        update_customer(&mut conn, &mut customer3);

        // Delete entry with matching id
        let customer_id = 2;
        delete_customer(&mut conn, customer_id);

        // Print list
        read_customers(&mut conn);
    }
    

    Running the Code

    To run the code, enter the following command:

    $ cargo run
    

    You should see output similar to this:

    Table created.
    Inserted customer entries successfully.
    Successfully updated customer.
    Deleted customer with id - 2.
    [
        Customer {
            id: 1,
            name: "Gates Bill",
            amount: 1000,
        },
        Customer {
            id: 3,
            name: "Musk Elon",
            amount: 876,
        },
    ]
    

    This output means the program ran successfully and all operations on the RCS Managed Database for MySQL completed.

    Delete the Managed Database

    While logged into your RCS account, navigate to the Managed Databases section:

    • For the database you created earlier, click the delete icon.

    • In the Destroy Managed Database? pop-up window, select the checkbox Yes, destroy this Managed Database, and click on Destroy Managed Database.

    Conclusion

    This guide covered how to use RCS's managed MySQL database in Rust.


