Introduction
Redis is an open-source, in-memory data structure store. Redis Lists are collections of string elements sorted by insertion order. They offer a range of commands that enable various operations such as push, pop, and traversal from both ends of the list, along with the ability to access or remove elements by indexing or value.
Redis List blocking operations enable a list to be used as a message queue. A client can wait for a message to become available when the list is empty, which facilitates asynchronous task processing. In this article, you will implement worker queues for background job processing in applications using a rcs Managed Database for Redis.
Prerequisites
Before you begin:
Deploy a rcs Managed Database for Redis.
When the database is deployed and ready, copy its connection string, which has the format below:
rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Deploy an Ubuntu 22.04 management server.
Using SSH, access the server as a non-root sudo user and install:
The latest stable version of the Go programming language.
$ sudo snap install go --classic
The Redis CLI tool.
$ sudo apt-get install redis
Redis List commands
In this section, explore the Redis List commands, and implement them in your Redis database as described in the steps below:
To implement the commands, use redis-cli to access your rcs Managed Database for Redis using a connection string as below.
$ redis-cli -u rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Replace DATABASE_PASSWORD, DATABASE_HOST, and DATABASE_PORT with your actual Redis database values.
When connected, your terminal prompt should change to the Redis Shell as below:
>
LPUSH
The LPUSH command inserts all specified values at the head of the list stored at the key.
> LPUSH students "Sam" "Alex"
Output:
(integer) 2
The above command adds two students, Sam and Alex, to the list. Because each value is pushed to the head in turn, Alex ends up ahead of Sam.
RPUSH
RPUSH inserts specified values at the tail of the list stored at the key.
> RPUSH students "Lily" "John"
Output:
(integer) 4
The command adds two more students, Lily and John, to the list. This brings the total number of students to 4.
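To see why the list ends up in the order Alex, Sam, Lily, John, the head and tail insertions can be modelled on a plain Go slice. This is only an in-memory illustration; the helper names lpush and rpush are hypothetical and are not part of any Redis client.

```go
package main

import "fmt"

// lpush mimics LPUSH: each value is inserted at the head in argument order,
// so the last argument ends up first in the list.
func lpush(list []string, values ...string) []string {
	for _, v := range values {
		list = append([]string{v}, list...)
	}
	return list
}

// rpush mimics RPUSH: values are appended at the tail in argument order.
func rpush(list []string, values ...string) []string {
	return append(list, values...)
}

func main() {
	var students []string
	students = lpush(students, "Sam", "Alex")
	students = rpush(students, "Lily", "John")
	fmt.Println(students) // prints [Alex Sam Lily John]
}
```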
LRANGE
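LRANGE returns the elements of a list between a start and a stop index, both inclusive. Indexes are zero-based, and negative values count from the tail, so 0 -1 returns the whole list as below.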
> LRANGE students 0 -1
Output:
1) "Alex"
2) "Sam"
3) "Lily"
4) "John"
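The index handling of LRANGE (zero-based, inclusive, negative values counted from the tail, out-of-range values clamped) can be sketched on a Go slice. The function name lrange is hypothetical, and the sketch models the documented behaviour rather than Redis internals.

```go
package main

import "fmt"

// lrange models LRANGE index rules on an in-memory slice:
// start and stop are inclusive, negative values count from the tail,
// and out-of-range values are clamped instead of raising an error.
func lrange(list []string, start, stop int) []string {
	n := len(list)
	if start < 0 {
		start += n
	}
	if stop < 0 {
		stop += n
	}
	if start < 0 {
		start = 0
	}
	if start > stop || start >= n {
		return []string{}
	}
	if stop >= n {
		stop = n - 1
	}
	return list[start : stop+1]
}

func main() {
	students := []string{"Alex", "Sam", "Lily", "John"}
	fmt.Println(lrange(students, 0, -1))  // prints [Alex Sam Lily John]
	fmt.Println(lrange(students, 1, 2))   // prints [Sam Lily]
	fmt.Println(lrange(students, -2, -1)) // prints [Lily John]
	fmt.Println(lrange(students, 2, 100)) // prints [Lily John]
}
```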
LLEN
LLEN returns the length of the list stored at the key.
> LLEN students
Output:
(integer) 4
As displayed in the output, the total number of students in the list is 4.
LPOP
LPOP removes and returns the first element of the list stored at the key.
> LPOP students
Output:
"Alex"
As displayed in the output, Alex, the first student entry in the list, is now removed.
RPOP
RPOP removes and returns the last element of the list stored at the key.
> RPOP students
Output:
"John"
As per the output, John, the last student in the list, is now removed.
LINDEX
LINDEX returns the element at the given index position in the list stored at the key.
> LINDEX students 0
Output:
"Sam"
As per the output, Sam is the first student in the list.
LREM
LREM removes the first count occurrences of elements equal to a value from the list stored at the key.
> LREM students 1 "Sam"
Output:
(integer) 1
The above output shows that Sam is removed from the list, and the command affects 1 entry.
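The count argument of LREM controls both how many matches are removed and from which end the scan starts: a positive count scans from the head, a negative count scans from the tail, and 0 removes every match. The sketch below models that rule on a Go slice. The function name lrem and the sample values are hypothetical.

```go
package main

import "fmt"

// lrem models the LREM count rules on an in-memory slice and returns the
// new list plus the number of removed elements.
//   count > 0: remove up to count matches, scanning head to tail
//   count < 0: remove up to |count| matches, scanning tail to head
//   count = 0: remove all matches
func lrem(list []string, count int, value string) ([]string, int) {
	limit := count
	if limit < 0 {
		limit = -limit
	}
	remove := make(map[int]bool)
	removed := 0
	visit := func(i int) {
		if list[i] == value && (limit == 0 || removed < limit) {
			remove[i] = true
			removed++
		}
	}
	if count < 0 {
		for i := len(list) - 1; i >= 0; i-- {
			visit(i)
		}
	} else {
		for i := 0; i < len(list); i++ {
			visit(i)
		}
	}
	out := make([]string, 0, len(list)-removed)
	for i, v := range list {
		if !remove[i] {
			out = append(out, v)
		}
	}
	return out, removed
}

func main() {
	list := []string{"a", "b", "a", "c", "a"}
	fmt.Println(lrem(list, 1, "a"))  // prints [b a c a] 1
	fmt.Println(lrem(list, -1, "a")) // prints [a b a c] 1
	fmt.Println(lrem(list, 0, "a"))  // prints [b c] 3
}
```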
Redis as a worker queue
Redis lists support blocking operations (such as BLPOP and BRPOP). This makes it possible to wait for a specified time duration if there is no item in the list. When an item is added to the list (for example, by another application using the LPUSH method), it's retrieved and processed. This allows you to decouple applications and build asynchronous, event-driven solutions.
This is also known as the worker queue pattern, where:
A Redis List acts as a queue.
The producer applications add items to this queue.
The worker applications (consumers) retrieve and process them.
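The blocking behaviour behind this pattern can be pictured with a buffered Go channel standing in for the Redis List. This is only an in-memory analogy, not how Redis implements BRPOP, and the function name blockingPop is hypothetical.

```go
package main

import (
	"fmt"
	"time"
)

// blockingPop waits up to timeout for an item. Like a blocking pop, it
// returns either an element or a timeout indication. The channel is an
// in-memory stand-in for the Redis List.
func blockingPop(queue <-chan string, timeout time.Duration) (string, bool) {
	select {
	case job := <-queue:
		return job, true
	case <-time.After(timeout):
		return "", false
	}
}

func main() {
	queue := make(chan string, 10)

	// Producer: adds a job after a short delay.
	go func() {
		time.Sleep(50 * time.Millisecond)
		queue <- "user1@foo.com,application installed"
	}()

	// Worker: blocks until the job arrives or the timeout elapses.
	if job, ok := blockingPop(queue, time.Second); ok {
		fmt.Println("processing", job)
	}
	if _, ok := blockingPop(queue, 100*time.Millisecond); !ok {
		fmt.Println("timed out, queue is empty")
	}
}
```

The worker never polls in a tight loop: it sleeps inside the pop until a producer delivers work, which is the property that makes blocking list commands suitable for queues.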
In this section, you will implement a sample application that uses Redis as a worker queue. The application retrieves and processes notification tasks from a Redis List as described in the steps below.
Initialize the project
Create a directory.
$ mkdir redis-notification-processor
Switch to the directory.
$ cd redis-notification-processor
Create a new Go module:
$ go mod init redis-notification-processor
The above command creates a new go.mod file in the directory.
To correctly use your Redis connection, add your rcs Managed Database for Redis connection string to the REDIS_URL variable.
$ export REDIS_URL=rediss://default:[DATABASE_PASSWORD]@[DATABASE_HOST]:[DATABASE_PORT]
Replace DATABASE_PASSWORD, DATABASE_HOST, and DATABASE_PORT with your actual database values.
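Before writing the consumer, it can help to confirm that the exported value has the expected shape. The sketch below uses only Go's standard net/url package. The helper name checkRedisURL and the fallback host, port, and password are hypothetical placeholders, and a password containing reserved URL characters may need percent-encoding before it parses.

```go
package main

import (
	"fmt"
	"net/url"
	"os"
)

// checkRedisURL is a hypothetical helper: it parses a Redis connection
// string and reports the host:port and whether the scheme requests TLS
// ("rediss") or a plain connection ("redis").
func checkRedisURL(raw string) (addr string, useTLS bool, err error) {
	u, err := url.Parse(raw)
	if err != nil {
		return "", false, fmt.Errorf("parse url: %w", err)
	}
	switch u.Scheme {
	case "rediss":
		useTLS = true
	case "redis":
		useTLS = false
	default:
		return "", false, fmt.Errorf("unexpected scheme %q", u.Scheme)
	}
	if u.Hostname() == "" || u.Port() == "" {
		return "", false, fmt.Errorf("missing host or port in %q", u.Host)
	}
	return u.Host, useTLS, nil
}

func main() {
	raw := os.Getenv("REDIS_URL")
	if raw == "" {
		// Placeholder value for illustration only.
		raw = "rediss://default:secret@db.example.com:16752"
	}
	addr, useTLS, err := checkRedisURL(raw)
	if err != nil {
		fmt.Println("invalid REDIS_URL:", err)
		return
	}
	fmt.Printf("host=%s tls=%t\n", addr, useTLS)
}
```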
Create a Consumer application
Using a text editor such as Nano, create a new file consumer.go.
$ nano consumer.go
Add the following contents to the file.
package main

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

const listName = "jobs"

var client *redis.Client

func init() {
	redisURL := os.Getenv("REDIS_URL")
	if redisURL == "" {
		log.Fatal("missing environment variable REDIS_URL")
	}

	opt, err := redis.ParseURL(redisURL)
	if err != nil {
		log.Fatal("invalid redis url", err)
	}

	opt.TLSConfig = &tls.Config{}
	client = redis.NewClient(opt)

	_, err = client.Ping(context.Background()).Result()
	if err != nil {
		log.Fatal("ping failed. could not connect", err)
	}

	fmt.Println("successfully connected to redis")
}

func main() {
	fmt.Println("consumer application is ready")

	for {
		// Block for up to 2 seconds waiting for a job at the tail of the list.
		val, err := client.BRPop(context.Background(), 2*time.Second, listName).Result()
		if err != nil {
			if errors.Is(err, redis.Nil) {
				continue // timeout: the list is still empty
			}
			log.Println("brpop issue", err)
			continue // val is not usable after an error
		}

		// BRPop returns the list name followed by the popped element.
		job := val[1]
		info := strings.Split(job, ",")
		if len(info) < 2 {
			log.Println("skipping malformed job", job)
			continue
		}

		fmt.Println("received notification job", info)
		fmt.Printf("sending notification '%s' to %s\n", info[1], info[0])
	}
}
Save and close the file.
In the above code:
The init function reads the rcs Managed Database for Redis URL from the REDIS_URL environment variable and fails if it's missing.
Then, it creates a new redis.Client instance and verifies connectivity to the rcs Managed Database for Redis using the Ping utility. If connectivity fails, the program exits with an error message.
The main function runs an infinite loop and:
Continuously tries to use BRPop to get a message from the jobs list with a timeout of 2 seconds. This function acts as a blocking operation where the client waits for a message to become available if the list is empty.
When it receives a message, it splits it using a comma as a delimiter (the message is assumed to be in the format "recipient,message"), and prints out the notification.
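Splitting on every comma works for the sample jobs, but it would break a message that itself contains a comma, and a job without any comma has no message part at all. A slightly more defensive parser could look like the sketch below. The notification type and the parseJob function are hypothetical names introduced for illustration, and the "recipient,message" format is the one assumed above.

```go
package main

import (
	"fmt"
	"strings"
)

// notification is a hypothetical type holding the two parts of a job.
type notification struct {
	Recipient string
	Message   string
}

// parseJob splits a "recipient,message" payload at the first comma only,
// so commas inside the message text are preserved, and rejects payloads
// that are missing either part.
func parseJob(job string) (notification, error) {
	parts := strings.SplitN(job, ",", 2)
	if len(parts) != 2 {
		return notification{}, fmt.Errorf("job %q has no comma delimiter", job)
	}
	recipient := strings.TrimSpace(parts[0])
	message := strings.TrimSpace(parts[1])
	if recipient == "" || message == "" {
		return notification{}, fmt.Errorf("job %q has an empty recipient or message", job)
	}
	return notification{Recipient: recipient, Message: message}, nil
}

func main() {
	jobs := []string{
		"user1@foo.com,application installed",
		"user2@foo.com,fee due tomorrow, please pay",
		"malformed-job",
	}
	for _, job := range jobs {
		n, err := parseJob(job)
		if err != nil {
			fmt.Println("skipping:", err)
			continue
		}
		fmt.Printf("sending notification '%s' to %s\n", n.Message, n.Recipient)
	}
}
```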
Fetch the Go module dependencies for the program.
$ go get
Run the program.
$ go run consumer.go
Your output should look like the one below:
successfully connected to redis
consumer application is ready
In a new terminal session, establish a connection to your Redis database, and add a new item to the list to simulate a new job.
> LPUSH jobs 'user1@foo.com,application installed'
Output:
(integer) 1
Add another item to the list.
> LPUSH jobs 'user2@foo.com,monthly fee due tomorrow'
Output:
(integer) 1
Add another item to the list.
> LPUSH jobs 'user3@foo.com,please upgrade for faster service'
Output:
(integer) 1
Navigate to your first terminal session, and verify that your output looks like the one below:
received notification job [user1@foo.com application installed]
sending notification 'application installed' to user1@foo.com
received notification job [user2@foo.com monthly fee due tomorrow]
sending notification 'monthly fee due tomorrow' to user2@foo.com
received notification job [user3@foo.com please upgrade for faster service]
sending notification 'please upgrade for faster service' to user3@foo.com
To stop the application, press CTRL + C on your keyboard to send an interrupt signal.
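Pressing CTRL + C ends the consumer wherever it happens to be. If the worker should instead finish its current poll and exit cleanly, one option is to watch for the interrupt with signal.NotifyContext from the standard library. The sketch below is a minimal outline under that assumption. The runWorker function is a hypothetical name, the poll callback only simulates a blocking pop, and the one-second timeout exists solely so the example terminates by itself.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// runWorker calls poll repeatedly until done is closed and returns the
// number of completed polls. poll stands in for one blocking pop.
func runWorker(done <-chan struct{}, poll func()) int {
	polls := 0
	for {
		select {
		case <-done:
			return polls
		default:
			poll()
			polls++
		}
	}
}

func main() {
	// ctx is cancelled when the process receives an interrupt (CTRL + C).
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
	defer stop()

	// Demo only: also stop after one second so the example ends by itself.
	ctx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	n := runWorker(ctx.Done(), func() {
		// Placeholder for a blocking pop with a short timeout.
		select {
		case <-ctx.Done():
		case <-time.After(200 * time.Millisecond):
		}
	})
	fmt.Println("worker stopped after", n, "polls")
}
```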
Reliable processing with Redis Lists
In the previous application, the consumer process used the BRPop method to get the job information required for it to send notifications. Using BRPop (or BLPop) removes the job item from the list.
If the consumer fails after popping a job but before it finishes processing, that notification job is lost. For the application to be reliable and fault-tolerant, a job must survive a consumer failure. The BLMOVE command makes this possible by first moving the item from the original list to another list, which reduces the possibility of data loss.
BLMOVE works in an atomic manner, and if it fails, the item remains in the original list, and you can retry the process. When processed, the items can be deleted from the destination list. You can have a monitoring application in the form of another background application that checks the destination list for failed jobs.
To make its operation reliable and fault-tolerant, the consumer application:
Uses BLMove to retrieve notification job information and simultaneously move it to another temporary list.
Processes the notification job:
If the job finishes successfully, the entry is removed from the temporary list.
If the job fails, it remains in the temporary list, where it can eventually be picked up by a separate monitoring job.
The monitoring job can retry the failed entry, for example by moving it back to the original list, and the cycle repeats until processing succeeds.
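The claim, acknowledge, and retry cycle described above can be modelled in memory to make the state transitions explicit. The sketch below is not a Redis client: the reliableQueue type and its claim, ack, and requeue methods are hypothetical names, the two slices stand in for the source and temporary lists, and the code is not safe for concurrent use.

```go
package main

import "fmt"

// reliableQueue is an in-memory stand-in for the two Redis Lists.
type reliableQueue struct {
	pending    []string // plays the role of the source list
	processing []string // plays the role of the temporary list
}

// claim moves the oldest pending job to the processing list, which is the
// hand-off that BLMOVE performs atomically in Redis.
func (q *reliableQueue) claim() (string, bool) {
	if len(q.pending) == 0 {
		return "", false
	}
	job := q.pending[0]
	q.pending = q.pending[1:]
	q.processing = append(q.processing, job)
	return job, true
}

// ack removes the first matching job from the processing list once it has
// been handled, similar in spirit to an LREM on the temporary list.
func (q *reliableQueue) ack(job string) {
	for i, j := range q.processing {
		if j == job {
			q.processing = append(q.processing[:i], q.processing[i+1:]...)
			return
		}
	}
}

// requeue moves every unacknowledged job back to pending for a retry,
// which is one thing a monitoring job could do. It returns the count.
func (q *reliableQueue) requeue() int {
	n := len(q.processing)
	q.pending = append(q.pending, q.processing...)
	q.processing = nil
	return n
}

func main() {
	q := &reliableQueue{pending: []string{"job-1", "job-2"}}

	first, _ := q.claim()
	q.ack(first) // job-1 was processed successfully

	q.claim() // job-2 is claimed, but the worker "crashes" before ack

	n := q.requeue()
	fmt.Println(n, q.pending) // prints 1 [job-2]
}
```

Because a claimed job always lives in exactly one of the two lists, a crash between claim and ack leaves it recoverable instead of lost.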
Reliable consumer application
Create a new file reliable_consumer.go.
$ nano reliable_consumer.go
Add the following contents to the file.
package main

import (
	"context"
	"crypto/tls"
	"errors"
	"fmt"
	"log"
	"os"
	"strings"
	"time"

	"github.com/redis/go-redis/v9"
)

const listName = "new_jobs"
const emailTempQueueList = "jobs_temp"

var client *redis.Client

func init() {
	redisURL := os.Getenv("REDIS_URL")
	if redisURL == "" {
		log.Fatal("missing environment variable REDIS_URL")
	}

	opt, err := redis.ParseURL(redisURL)
	if err != nil {
		log.Fatal("invalid redis url", err)
	}

	opt.TLSConfig = &tls.Config{}
	client = redis.NewClient(opt)

	_, err = client.Ping(context.Background()).Result()
	if err != nil {
		log.Fatal("ping failed. could not connect", err)
	}

	fmt.Println("successfully connected to redis")
}

func main() {
	fmt.Println("reliable consumer application is ready")

	for {
		// Move a job from the tail of new_jobs to the head of jobs_temp,
		// waiting up to 2 seconds for one to arrive.
		val, err := client.BLMove(context.Background(), listName, emailTempQueueList, "RIGHT", "LEFT", 2*time.Second).Result()
		if err != nil {
			if errors.Is(err, redis.Nil) {
				continue // timeout: no job available yet
			}
			fmt.Println("blmove issue", err)
			continue // val is not usable after an error
		}

		job := val
		info := strings.Split(job, ",")
		if len(info) < 2 {
			// The malformed job stays in jobs_temp for later inspection.
			log.Println("skipping malformed job", job)
			continue
		}

		fmt.Println("received notification job", info)
		fmt.Printf("sending notification '%s' to %s\n", info[1], info[0])

		go func() {
			// Keep the error local to the goroutine.
			err := client.LRem(context.Background(), emailTempQueueList, 0, job).Err()
			if err != nil {
				log.Fatal("lrem failed for", job, "error", err)
			}
			fmt.Println("removed job from temporary list", job)
		}()
	}
}
Save and close the file.
In the above code:
init reads the rcs Managed Database for Redis URL from the REDIS_URL environment variable.
A new redis.Client instance is created and used to verify connectivity to the rcs Managed Database for Redis. If the connection fails, the program exits with an error message.
The main function starts an infinite loop and does the following:
Uses the BLMove command to move an element from the new_jobs list to the jobs_temp list (a temporary list used for processing jobs) in blocking mode, where it waits up to 2 seconds for an element to be available in the new_jobs list.
Checks whether there is an error in the BLMove operation. When the Redis list is empty (redis.Nil), it continues the loop to try again. If the error is due to some other issue, it logs the error.
When an element (job) is successfully moved to the jobs_temp list, it's split using a comma as a delimiter (the message is assumed to be in the format "recipient,message"), and both the recipient and the message are printed.
It launches a new goroutine to remove the processed job from the jobs_temp list.
Fetch the Go module dependencies for the program:
$ go get
Run the program.
$ go run reliable_consumer.go
Your output should look like the one below:
successfully connected to redis
reliable consumer application is ready
In your Redis connection terminal session, add a new item to the list to simulate a job.
> LPUSH new_jobs 'user1@foo.com,application installed'
Output:
(integer) 1
Add another item to the list:
> LPUSH new_jobs 'user2@foo.com,monthly fee due tomorrow'
Output:
(integer) 1
Navigate back to the application terminal session, and verify that your output looks like the one below.
received notification job [user1@foo.com application installed]
sending notification 'application installed' to user1@foo.com
removed job from temporary list user1@foo.com,application installed
received notification job [user2@foo.com monthly fee due tomorrow]
sending notification 'monthly fee due tomorrow' to user2@foo.com
removed job from temporary list user2@foo.com,monthly fee due tomorrow
To stop the application, press Ctrl + C on your keyboard.
Conclusion
In this article, you implemented Redis List operations using a rcs Managed Database for Redis. Then, you used blocking list commands for asynchronous job processing, and also explored a reliable option to avoid data loss in case of consumer application failures. For more information about Redis lists, visit the official documentation.