Introduction
This article demonstrates how to integrate an RCS Managed Database for MySQL with Apache Kafka using the Debezium source connector for MySQL. Behind the scenes, this is made possible by Change Data Capture (CDC), which tracks row-level create, update, and delete operations in MySQL tables. These change data capture events can then be used to integrate MySQL with other systems.
Change data capture is a popular approach to data integration, and it is often combined with Apache Kafka and Kafka Connect.
Apache Kafka
Apache Kafka is a messaging system that allows clients to publish and read streams of data (also called events). It has an ecosystem of open-source solutions that you can combine to store, process, and integrate these data streams with other parts of your system in a secure, reliable, and scalable manner.
Some key components of Kafka include:
- Broker (Node): A Kafka broker runs the Kafka JVM process. A best practice is to run three or more brokers for scalability and high availability. A group of Kafka brokers forms a cluster.
- Producers: Client applications that send messages to Kafka. Each message is a key-value pair.
- Topics: Events (messages) are stored in topics, and each topic has one or more partitions. Data in these partitions is distributed across the Kafka cluster for high availability and redundancy.
- Consumers: Like producers, consumers are client applications. They receive and process data (events) from Kafka topics.
Kafka Connect
To build integration solutions, you can use the Kafka Connect framework, which provides a suite of connectors to integrate Kafka with external systems. There are two types of Kafka connectors:
- Source connector: Used to move data from source systems to Kafka topics.
- Sink connector: Used to send data from Kafka topics into target (sink) systems.
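Connectors are created and managed through the Kafka Connect REST API, which listens on port 8083 by default. As a rough illustration only (the connector name and configuration below are placeholders, not values used later in this guide), registering a source connector with curl typically looks like this:
$ curl -X POST -H "Content-Type: application/json" \
    --data '{"name": "example-connector", "config": {"connector.class": "...", "tasks.max": "1"}}' \
    http://localhost:8083/connectors
You can list the connectors that are currently registered with a simple GET request:
$ curl http://localhost:8083/connectors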
Debezium
Debezium is a set of source connectors for Kafka Connect. You can use it to capture changes in your databases so that your applications can respond to them in real time. It records all row-level changes within each database table in a change event stream, and applications simply read these streams to see the change events in the same order in which they occurred.
When it comes to integrating with relational databases, the JDBC connector (source and sink) and Debezium (source) connectors are widely used solutions. The JDBC connector works by polling the database table to retrieve data, while Debezium relies on change data capture.
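To make this concrete, each Debezium change event describes one row-level operation. The following is a simplified, illustrative sketch of an update event's payload (using the Persons table created later in this guide; real events carry additional metadata fields):
{
  "before": { "Personid": 1, "LastName": "Doe", "FirstName": "Jane", "Age": 30 },
  "after":  { "Personid": 1, "LastName": "Doe", "FirstName": "Jane", "Age": 31 },
  "source": { "connector": "mysql", "db": "defaultdb", "table": "Persons" },
  "op": "u",
  "ts_ms": 1672531200000
}
The op field indicates the operation type (c for create, u for update, d for delete), while before and after hold the row state on either side of the change.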
Demonstration Scenario
The solution demonstrated in this article consists of the following components:
- MySQL - You will set up an RCS Managed Database for MySQL.
- Apache Kafka and Zookeeper - These run as Docker containers and are orchestrated using Docker Compose.
- Kafka Connect - A Kafka Connect instance also runs as a Docker container, with the Debezium MySQL connector installed in it.
You will create a sample table (called Persons) in the MySQL database. Changes made to data in this table are detected by the Debezium MySQL connector and sent to a Kafka topic. Under the hood, the connector reads the MySQL binlog and produces change events for row-level operations.
Prerequisites
Before following the steps in this guide, you need to:
- Deploy a new Ubuntu 22.04 LTS RCS cloud server for Kafka.
- Create a non-root sudo user.
- Install Docker.
- Install Docker Compose.
- Install mysql, the command-line client for MySQL. Because this guide uses an RCS managed database, you only need the MySQL command-line client to query the database.
$ sudo apt install mysql-client
- Install curl, a popular command-line HTTP client.
Create a MySQL Database
Log in to your RCS account, navigate to Add Managed Database, and follow the steps below.
- Choose the MySQL database engine.
- Choose a Server Type: Cloud Compute, Cloud Compute High Performance - AMD or Intel, or Optimized Cloud Compute - General Purpose, Storage, or Memory Optimized. Also select zero or more replica nodes and the cluster location. A replica node uses the same server type and plan as the primary node. You can opt for the Cloud Compute server type without a replica node.
- Add a label for the database cluster, then click Deploy Now to create the cluster. It takes a few minutes for the cluster to become available, after which its Status changes to Running.
After the database is ready, you can proceed with the next steps.
Create a Table
Get the connection string details for the MySQL database.
- In the Managed Database section of the RCS customer portal, click the Manage icon to open the Overview tab.
- From the Connection Details section, click Copy Connection String.
- SSH to the Kafka server you deployed in the Prerequisites section as the non-root user.
- In the SSH session, paste the mysql connection string you copied from the RCS customer portal. After a successful database connection, you should see this prompt as the output:
mysql>
- Switch to the correct database:
use defaultdb;
You should see this output:
Database changed
- To create the Persons table, enter this SQL:
CREATE TABLE Persons (
    Personid int NOT NULL AUTO_INCREMENT,
    LastName varchar(255) NOT NULL,
    FirstName varchar(255),
    Age int,
    PRIMARY KEY (Personid)
);
After successful table creation, you should see this output:
Query OK, 0 rows affected (0.28 sec)
- To confirm, you can query the table:
select * from Persons;
Because there are no rows in the table, you should see this output:
Empty set (0.26 sec)
Start the Services
- On the Kafka server, create a new directory and change to it:
$ mkdir RCS-mysql-kafka
$ cd RCS-mysql-kafka
- Create a new file named docker-compose.yaml:
$ touch docker-compose.yaml
- Add the following to the docker-compose.yaml file and save it:
version: "2"
services:
  zookeeper:
    image: quay.io/debezium/zookeeper
    ports:
      - 2181:2181
  kafka:
    image: quay.io/debezium/kafka
    ports:
      - 9092:9092
    links:
      - zookeeper
    depends_on:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
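The Demonstration Scenario section also calls for a Kafka Connect container that hosts the Debezium MySQL connector. As a rough sketch, assuming the quay.io/debezium/connect image and its standard environment variables (the topic names below are illustrative placeholders), you could extend the same docker-compose.yaml with an additional service:
  connect:
    image: quay.io/debezium/connect
    ports:
      - 8083:8083
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=connect_configs
      - OFFSET_STORAGE_TOPIC=connect_offsets
      - STATUS_STORAGE_TOPIC=connect_statuses
With the file in place, start the services in the background (use docker-compose instead of docker compose if you installed the standalone binary):
$ docker compose up -d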
Introduction
MySQL is a free and open-source relational database management system (RDBMS) that manages data stored in tables using Structured Query Language (SQL). In a relational database, data is organized into one or more data tables which may be related to each other, while an RDBMS is a piece of software that lets you create, update, and manage such databases.
In MySQL, data is organized into tables consisting of rows and columns, with each column representing a different piece of data, and each row representing a unique record. For instance, a table of customer information might have columns for names, phone numbers, social security numbers (SSN), and amount purchased; each row would represent a different customer with their values for those columns. SQL is used to create, modify, and query tables and other database objects, as well as insert, update and delete data stored in tables.
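For illustration only (the table and column names below are hypothetical and separate from the examples later in this guide), such a customer table could be defined in SQL as:
CREATE TABLE customer_info (
    name varchar(255),
    phone_number varchar(20),
    ssn varchar(11),
    amount_purchased decimal(10, 2)
);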
MySQL is commonly used in web applications such as e-commerce, data warehousing, or logging applications.
This guide covers how to build a Rust application that connects to an RCS Managed Database for MySQL.
Prerequisites
- Working knowledge of Rust and SQL.
- A properly installed Rust toolchain, including cargo (Rust version >= 1.65).
Create the Managed Database
To create the RCS managed MySQL database, follow these steps:
- Log in to your RCS account, and navigate to the Add Managed Database section.
- Choose the MySQL database engine.
- Select the Server Type from the options: Cloud Compute, Cloud Compute High Performance - AMD or Intel, or Optimized Cloud Compute - General Purpose, Storage, or Memory Optimized.
- Select zero or more replica nodes. A replica node uses the same server type and plan as the primary node. This guide opts for the Cloud Compute server type without any replica node for demonstration purposes.
- Choose a server location.
- Add a label for the database cluster.
- Click Deploy Now to create the cluster. It takes a few minutes for the cluster to become available, after which its Status changes to Running.
The MySQL database is ready. Now you can connect to it using a Rust program.
Setting Up
Initialize the project crate using Cargo:
$ cargo new vmd_demo --bin
Navigate to the newly created project directory:
$ cd vmd_demo
The project directory should look like this:
.
|-- Cargo.toml
`-- src
    `-- main.rs

1 directory, 2 files
Open the Cargo.toml file, and add the following line under the [dependencies] section:
mysql = "*"
This adds the mysql crate as a dependency of the project. The Cargo.toml should look like this:
[package]
name = "vmd_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
mysql = "*"
Import Libraries
Open the main.rs file in the src/ directory, and overwrite its contents with the following import lines:
use mysql::*;
use mysql::prelude::Queryable;
Define Data Structure
In this guide, CRUD (Create, Read, Update, Delete) operations are performed on customer information in the database. Create a struct to hold the customer information to be saved by adding the following lines:
#[derive(Debug, PartialEq, Eq)]
struct Customer {
    id: i32,
    name: String,
    amount: i32,
}
This struct represents customer entries in the database.
Initialize Customers Table in the MySQL Database
To create a customers table in the database, add the following function to initialize the table:
fn init_table(conn: &mut mysql::PooledConn) {
    // Drop the table if it already exists
    conn.query_drop(
        r"DROP TABLE IF EXISTS customers").unwrap();

    // Create a table for customers
    conn.query_drop(
        r"CREATE TABLE customers (
            id int not null,
            name text not null,
            amount int not null,
            PRIMARY KEY(id)
        )").unwrap();

    println!("Table created.");
}
The init_table function takes a mysql::PooledConn as an argument. SQL queries and statements are executed on the PooledConn object. The query_drop method takes a text query as an argument, executes it, and drops the result. It is first used to drop the customers table if it already exists in the database.
Next, query_drop is used to create the customers table with the required columns; the id column is also used as the primary key for the table.
Adding Data to the customers Table
Create a function to insert row entries into the table:
fn insert_customers(conn: &mut mysql::PooledConn, customers: Vec<Customer>) {
    // Insert the vector of customers into the database
    conn.exec_batch(
        r"INSERT INTO customers (id, name, amount)
          VALUES (:id, :name, :amount)",
        customers.iter().map(|customer| params! {
            "id" => customer.id,
            "name" => &customer.name,
            "amount" => customer.amount,
        })
    ).unwrap();

    println!("Inserted customer entries successfully.");
}
The insert_customers function takes two parameters - a PooledConn object and a vector of type Customer to be added to the database.
The exec_batch method takes two arguments: a statement to execute and an iterator of params. The statement is executed against each item in the given iterator, and the named parameters in the query statement (:id, :name, :amount) are replaced by the values supplied on each iteration. The params! macro is a convenient way to pass named parameters to a statement.
Updating Data in the customers Table
To update row entries in the table, add the following function:
fn update_customer(conn: &mut mysql::PooledConn, customer: &mut Customer) {
    // Look for the matching id and update the columns
    conn.exec_drop(
        r"UPDATE customers SET name=:name, amount=:amount WHERE id=:id",
        params! {
            "name" => &customer.name,
            "amount" => customer.amount,
            "id" => customer.id,
        }
    ).unwrap();

    println!("Successfully updated customer.");
}
The function takes a PooledConn object and a Customer whose column values are to be updated.
The exec_drop method takes two arguments - a statement to be executed and a Params type to pass named parameters to the query statement. exec_drop executes the given statement and drops the result. In this example, the customer with the matching id in the database is updated with the new name and amount fields.
Reading Data in the customers Table
To access the rows in the table, add the following lines:
fn read_customers(conn: &mut mysql::PooledConn) {
    // Read all rows from the customers table
    let customers = conn.query_map(
        "SELECT id, name, amount from customers",
        |(id, name, amount)| {
            Customer { id, name, amount }
        },
    ).unwrap();

    println!("{:#?}", customers);
}
The query_map method takes a text query and a closure that maps each row of the first result set. The closure captures the columns of each database row into a Customer struct, and the results are collected into a vector. This vector contains every entry in the table represented as a Customer value.
Deleting Data in the customers Table
To delete rows in the table:
fn delete_customer(conn: &mut mysql::PooledConn, customer_id: i32) {
    // Delete the entry with the matching id
    conn.exec_drop(
        r"DELETE FROM customers WHERE id=:id",
        params! {
            "id" => customer_id,
        }
    ).unwrap();

    println!("Deleted customer with id - {}.", customer_id);
}
Here, the database is queried for a row with the corresponding id; if a match is found, the row is deleted from the table.
Connect to RCS MySQL with Rust
After writing the helper functions, it's time to bring them all together in the main function.
Before proceeding, you need to get the connection details for the managed database to connect to it.
- Navigate to the Managed Databases section.
- Select the previously deployed database.
- Navigate to the Connection Details section, and click the Copy MySQL URL icon to get the connection URL.
Next, add the following lines to the src/main.rs file:
fn main() {
    let url = "[paste-url-string-here]";
    let pool = Pool::new(url).unwrap();
    let mut conn = pool.get_conn().unwrap();
Save the copied URL string in the url variable. Pool::new is used to create a connection Pool, and get_conn retrieves a connection from that pool and checks that it is alive. Once this connection is established, queries and operations can be run against it.
Add the following lines to run operations:
    // Initialize tables
    init_table(&mut conn);

    // Create a vector of customer information to insert into the database
    let customers = vec![
        Customer { id: 1, name: "Gates Bill".to_string(), amount: 1000 },
        Customer { id: 2, name: "Buffer Warren".to_string(), amount: 999 },
        Customer { id: 3, name: "Musk Noel".to_string(), amount: 7 },
    ];

    // Insert customers into the database
    insert_customers(&mut conn, customers);

    // Update an entry
    let mut customer3 = Customer { id: 3, name: "Musk Elon".to_string(), amount: 876 };
    update_customer(&mut conn, &mut customer3);

    // Delete the entry with the matching id
    let customer_id = 2;
    delete_customer(&mut conn, customer_id);

    // Print the list
    read_customers(&mut conn);
}
The above code simply makes use of the functions created earlier to perform database operations.
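Note that the connection URL (and therefore the database credentials) is hardcoded in the source. As a minimal sketch of an alternative, assuming a hypothetical DATABASE_URL environment variable that you export before running the program, the URL could be read at runtime instead:
// Read the connection URL from the environment instead of hardcoding it.
// DATABASE_URL is an assumed variable name; export it first, for example:
// $ export DATABASE_URL="mysql://user:password@host:port/defaultdb"
let url = std::env::var("DATABASE_URL").expect("DATABASE_URL is not set");
let pool = Pool::new(url.as_str()).unwrap();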
Final Code
For reference, the full code in the src/main.rs file:
use mysql::*;
use mysql::prelude::Queryable;

#[derive(Debug, PartialEq, Eq)]
struct Customer {
    id: i32,
    name: String,
    amount: i32,
}

fn init_table(conn: &mut mysql::PooledConn) {
    // Drop the table if it already exists
    conn.query_drop(
        r"DROP TABLE IF EXISTS customers").unwrap();

    // Create a table for customers
    conn.query_drop(
        r"CREATE TABLE customers (
            id int not null,
            name text not null,
            amount int not null,
            PRIMARY KEY(id)
        )").unwrap();

    println!("Table created.");
}

fn insert_customers(conn: &mut mysql::PooledConn, customers: Vec<Customer>) {
    // Insert the vector of customers into the database
    conn.exec_batch(
        r"INSERT INTO customers (id, name, amount)
          VALUES (:id, :name, :amount)",
        customers.iter().map(|customer| params! {
            "id" => customer.id,
            "name" => &customer.name,
            "amount" => customer.amount,
        })
    ).unwrap();

    println!("Inserted customer entries successfully.");
}

fn update_customer(conn: &mut mysql::PooledConn, customer: &mut Customer) {
    // Look for the matching id and update the columns
    conn.exec_drop(
        r"UPDATE customers SET name=:name, amount=:amount WHERE id=:id",
        params! {
            "name" => &customer.name,
            "amount" => customer.amount,
            "id" => customer.id,
        }
    ).unwrap();

    println!("Successfully updated customer.");
}

fn read_customers(conn: &mut mysql::PooledConn) {
    // Read all rows from the customers table
    let customers = conn.query_map(
        "SELECT id, name, amount from customers",
        |(id, name, amount)| {
            Customer { id, name, amount }
        },
    ).unwrap();

    println!("{:#?}", customers);
}

fn delete_customer(conn: &mut mysql::PooledConn, customer_id: i32) {
    // Delete the entry with the matching id
    conn.exec_drop(
        r"DELETE FROM customers WHERE id=:id",
        params! {
            "id" => customer_id,
        }
    ).unwrap();

    println!("Deleted customer with id - {}.", customer_id);
}

fn main() {
    let url = "[paste-url-string-here]";
    let pool = Pool::new(url).unwrap();
    let mut conn = pool.get_conn().unwrap();

    // Initialize tables
    init_table(&mut conn);

    // Create a vector of customer information to insert into the database
    let customers = vec![
        Customer { id: 1, name: "Gates Bill".to_string(), amount: 1000 },
        Customer { id: 2, name: "Buffer Warren".to_string(), amount: 999 },
        Customer { id: 3, name: "Musk Noel".to_string(), amount: 7 },
    ];

    // Insert customers into the database
    insert_customers(&mut conn, customers);

    // Update an entry
    let mut customer3 = Customer { id: 3, name: "Musk Elon".to_string(), amount: 876 };
    update_customer(&mut conn, &mut customer3);

    // Delete the entry with the matching id
    let customer_id = 2;
    delete_customer(&mut conn, customer_id);

    // Print the list
    read_customers(&mut conn);
}
Running the Code
To run the code, enter the following command:
$ cargo run
You should see output similar to this:
Table created.
Inserted customer entries successfully.
Successfully updated customer.
Deleted customer with id - 2.
[
    Customer {
        id: 1,
        name: "Gates Bill",
        amount: 1000,
    },
    Customer {
        id: 3,
        name: "Musk Elon",
        amount: 876,
    },
]
This means the program ran correctly and all database operations against the RCS Managed Database for MySQL completed successfully.
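The examples call unwrap() on every database operation for brevity, which aborts the program on the first error. As a minimal sketch of a more robust approach (not required by this guide), a helper can return a Result and propagate errors with the ? operator instead:
// Sketch: init_table rewritten to propagate errors instead of panicking.
// The other helpers and main() could be adapted in the same way.
fn init_table(conn: &mut mysql::PooledConn) -> Result<(), mysql::Error> {
    conn.query_drop(r"DROP TABLE IF EXISTS customers")?;
    conn.query_drop(
        r"CREATE TABLE customers (
            id int not null,
            name text not null,
            amount int not null,
            PRIMARY KEY(id)
        )")?;
    println!("Table created.");
    Ok(())
}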
Delete the Managed Database
While logged into your RCS account, navigate to the Managed Databases section:
- For the database you created earlier, click the delete icon.
- In the Destroy Managed Database? pop-up window, select the Yes, destroy this Managed Database checkbox, and click Destroy Managed Database.
Conclusion
This guide covered how to use an RCS Managed Database for MySQL from a Rust application.
To learn more about RCS Managed Databases, you can refer to the following documentation:
-