Golang is a useful programming language that can solve everyday problems efficiently. It’s easy to learn and doesn’t require writing a lot of code to perform well.
Let’s take a look at how Golang can help in a simple and practical case involving copying large amounts of Redis keys.
At some point it became necessary to split our Amazon ElastiCache store into two parts — one for storing cached data, and the other for storing users’ sessions.
Unfortunately, we previously had them on the same instance, and we didn’t want to interrupt long-lived sessions by resetting the storage.
Amazon ElastiCache is compatible with the Redis protocol, though with certain limitations. Redis supports the MIGRATE command, which allows you to move keys from one instance to another.
Internally, it works by executing DUMP+DEL on the source instance and recreating the keys on the target instance with RESTORE. However, Amazon’s version didn’t support this command at the time.
Back then, my practical experience with Golang was limited. I’d only implemented projects for fun and was familiar with basic syntax and concepts like goroutines and channels. But I decided that was enough to make use of Golang’s strengths to solve the problem I was facing.
Step 1: Let’s Write Some Simple Code
Let’s assume that Golang is fast enough to do the job. Keep in mind that Redis is, for the most part, a single-threaded server from the point of view of command execution, and it implements replication with no concurrency.
Preparation
I’ve picked two base libraries for this challenge:
- Radix, to connect to the Redis API, and
- Cobra, to make it easier to build the command-line interface for the tool
package cmd

import (
	"log"

	"github.com/mediocregopher/radix/v3"
	"github.com/spf13/cobra"
)

var pattern string
var scanCount, report, limit int

var copyCmd = &cobra.Command{
	Use:   "copy [sourceHost:port] [targetHost:port]",
	Short: "Copy keys from source redis instance to destination by given pattern",
	Long:  "",
	Args:  cobra.MinimumNArgs(2),
	Run: func(cmd *cobra.Command, args []string) {
		clientSource, err := radix.DefaultClientFunc("tcp", args[0])
		if err != nil {
			log.Fatal(err)
		}
		clientTarget, err := radix.DefaultClientFunc("tcp", args[1])
		if err != nil {
			log.Fatal(err)
		}
		// ... here the copying will happen
	},
}

var rootCmd = &cobra.Command{
	Use:   "go-redis-migrate",
	Short: "Application to migrate redis data from one instance to another",
	Long:  "",
}

func init() {
	rootCmd.AddCommand(copyCmd)
	copyCmd.Flags().StringVar(&pattern, "pattern", "*", "Match pattern for keys")
	copyCmd.Flags().IntVar(&scanCount, "scanCount", 100, "COUNT parameter for redis SCAN command")
	copyCmd.Flags().IntVar(&report, "report", 1000, "After what number of keys copied to report time")
	copyCmd.Flags().IntVar(&limit, "limit", 0, "After what number of keys copied to stop (0 - unlimited)")
}
The interface is ready: it supports the “pattern” parameter to match keys and the “limit” parameter to cap the number of keys to copy. The source and destination are provided as required positional arguments.
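For illustration, once the copying logic is in place, an invocation of the tool could look like this (hosts and values are placeholders; the binary is assumed to be built as go-redis-migrate):
go-redis-migrate copy localhost:6379 localhost:6380 --pattern "session:*" --scanCount 100 --limit 10000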
The Main Loop
Radix supports creating a “scanner” structure that helps you iterate over keys:
scanOpts := radix.ScanOpts{
	Command: "SCAN",
	Count:   scanCount,
}

if pattern != "*" {
	scanOpts.Pattern = pattern
}

scanner := radix.NewScanner(clientSource, scanOpts)

var key string
counter := 0
start := time.Now() // requires the "time" import

for scanner.Next(&key) {
	// copy the key
	counter++
}

if err := scanner.Close(); err != nil {
	log.Fatal(err)
}

log.Printf("In total %d keys copied in %s", counter, time.Since(start))
The loop is now ready. What’s left is to read the data and restore it in the target. I combined the PTTL and DUMP commands in a pipeline to fetch the time to live and the value of each key, saving execution time.
var value string
var ttl int

p := radix.Pipeline(
	radix.Cmd(&ttl, "PTTL", key),
	radix.Cmd(&value, "DUMP", key),
)
if err := clientSource.Do(p); err != nil {
	log.Fatal(err)
}

// PTTL returns a negative value for keys without an expiry;
// RESTORE interprets a TTL of 0 as "no expiration"
if ttl < 0 {
	ttl = 0
}

if err := clientTarget.Do(radix.FlatCmd(nil, "RESTORE", key, ttl, value, "REPLACE")); err != nil {
	log.Fatal(err)
}
That’s already enough for the code to work, but adding some reporting logic would definitely improve the user experience.
The complete code can be found here: https://github.com/obukhov/go-redis-migrate/blob/v1.0/cmd/copy.go
But is it really that good?
Let’s run some benchmark tests by quickly spawning two Redis instances locally with Docker, and seeding the source with data (453,967 keys in total, but we only copy part of them by matching a pattern).
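For reference, a local setup along these lines could look roughly as follows (container names and key counts are illustrative; DEBUG POPULATE creates keys named key:&lt;n&gt;):
docker run -d --name redis-source -p 6379:6379 redis
docker run -d --name redis-target -p 6380:6379 redis
docker exec redis-source redis-cli DEBUG POPULATE 450000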
Then, we run each test three times to see the random deviation:
- 10,000 keys to copy: 17.79s, 18.01s, 17.98s
- 367,610 keys to copy: 8m57.98s, 8m44.98s, 8m58.07s
That’s not bad, but let’s see if we can improve it.
Step 2: Utilize Concurrency
Let’s visualize the sequence of operations in the current implementation:
What can we do to improve the performance here?
We can clearly see the following shortcomings:
- Reading from the source and writing to the target are serialized, although they could be executed in parallel.
- The single-threaded nature of Redis only affects command execution; serving data (network I/O) can be parallelized. Depending on the value size, this can make a big difference.
- Sending multiple concurrent requests could be a good strategy: they will block each other on processing but will make better use of the I/O. This applies to both dumping and restoring data.
There is one process that can’t be parallelized: scanning the database. It relies on the scanning cursor, and there’s no way to perform the scan in multiple threads.
Let’s split the process into three stages: scanning, dumping data, and restoring data.
Scanned keys can be fed through a channel to a set of goroutines that concurrently dump these keys’ values and TTLs and send them through another channel to another set of goroutines. Those, in turn, restore the data in the target instance.
Here’s an example of a visualization:
The Go gopher in this image was created by Renee French, the image was composited by me.
Gophers running in circles are loops in the goroutines, reading from the channel, processing data, and sending the processed data to another channel.
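To make the shape of this pipeline concrete before diving into the real code, here is a minimal, generic sketch of the same pattern (names and payloads are hypothetical and unrelated to the tool itself):
// one producer -> keys channel -> N "exporters" -> dumps channel -> M "pushers"
keys := make(chan string)
dumps := make(chan string)

go func() { // stage 1: scanning is a single goroutine
	for _, k := range []string{"a", "b", "c"} {
		keys <- k
	}
	close(keys) // no more keys
}()

wgExport := new(sync.WaitGroup)
for i := 0; i < 3; i++ { // stage 2: several dumping goroutines
	wgExport.Add(1)
	go func() {
		defer wgExport.Done()
		for k := range keys {
			dumps <- "dump-of-" + k
		}
	}()
}
go func() {
	wgExport.Wait() // close the second channel only when every exporter is done
	close(dumps)
}()

wgPush := new(sync.WaitGroup)
for i := 0; i < 3; i++ { // stage 3: several restoring goroutines
	wgPush.Add(1)
	go func() {
		defer wgPush.Done()
		for d := range dumps {
			fmt.Println("restored", d)
		}
	}()
}
wgPush.Wait()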
Implementation
For simplicity, we’ll implement the scanner and exporter in the same package, starting with the structure declarations:
type KeyDump struct {
	Key   string
	Value string
	Ttl   int
}

type RedisScannerOpts struct {
	Pattern          string
	ScanCount        int
	PullRoutineCount int
}

type RedisScanner struct {
	client      radix.Client
	options     RedisScannerOpts
	keyChannel  chan string
	dumpChannel chan KeyDump
}

func NewScanner(client radix.Client, options RedisScannerOpts) *RedisScanner {
	return &RedisScanner{
		client:      client,
		options:     options,
		dumpChannel: make(chan KeyDump),
		keyChannel:  make(chan string),
	}
}
Two channels are declared here. The first is a plain string channel used to send scanned keys from the scanner to the group of exporting goroutines. The second, a channel of KeyDump structures, is for sending dumped data to the goroutines restoring it.
A KeyDump structure contains all the necessary information about a simple Redis value: key, value, and TTL.
First Goroutines
The following function orchestrates goroutines for scanning and exporting data:
func (s *RedisScanner) Start() {
	wgPull := new(sync.WaitGroup)
	wgPull.Add(s.options.PullRoutineCount)

	go s.scanRoutine()
	for i := 0; i < s.options.PullRoutineCount; i++ {
		go s.exportRoutine(wgPull)
	}

	wgPull.Wait()
	close(s.dumpChannel)
}
As you can see, it spawns one scanning goroutine and the number of exporting goroutines defined by the PullRoutineCount option. Pay attention to the variable named wgPull of type sync.WaitGroup, a handy tool that makes sure our code doesn’t exit before the process is complete.
A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for. Then each of the goroutines runs and calls Done when finished. At the same time, Wait can be used to block execution until all goroutines have finished.
The WaitGroup is initialized through the Add method with the overall number of goroutines. The variable is provided to each goroutine as an argument, and when a goroutine finishes its work, it calls the Done method. Add increments an internal counter, Done decrements it, and the Wait method blocks execution until the counter reaches zero.
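Here is a minimal, standalone illustration of that Add/Done/Wait life cycle (a hypothetical example, not part of the tool):
wg := new(sync.WaitGroup)
wg.Add(3) // we are going to wait for three goroutines

for i := 0; i < 3; i++ {
	go func(id int) {
		defer wg.Done() // decrement the counter when this goroutine finishes
		log.Printf("goroutine %d finished", id)
	}(i)
}

wg.Wait() // blocks until the counter is back to zero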
The scanning goroutine is similar to what we had in the first version:
func (s *RedisScanner) scanRoutine() {
	var key string
	scanOpts := radix.ScanOpts{
		Command: "SCAN",
		Count:   s.options.ScanCount,
	}

	if s.options.Pattern != "*" {
		scanOpts.Pattern = s.options.Pattern
	}

	radixScanner := radix.NewScanner(s.client, scanOpts)
	for radixScanner.Next(&key) {
		s.keyChannel <- key // send every scanned key to the exporting goroutines
	}

	close(s.keyChannel) // signal that no more keys are coming
}
Everything is self-explanatory, but there are a few things worth mentioning:
- The <- operator sends data to the channel
- At the very end, we close the channel to let the goroutines know that no more data is going to be sent
The exporting goroutine already looks familiar. Terminating execution on a client error is not the nicest way to handle errors, but as long as the connection is reliable, it won’t be a problem.
func (s *RedisScanner) exportRoutine(wg *sync.WaitGroup) {
	for key := range s.keyChannel {
		var value string
		var ttl int

		p := radix.Pipeline(
			radix.Cmd(&ttl, "PTTL", key),
			radix.Cmd(&value, "DUMP", key),
		)
		if err := s.client.Do(p); err != nil {
			log.Fatal(err)
		}

		if ttl < 0 {
			ttl = 0
		}

		// the reporter (progress counters) is part of the full version of the code
		s.reporter.AddExportedCounter(1)

		s.dumpChannel <- KeyDump{
			Key:   key,
			Ttl:   ttl,
			Value: value,
		}
	}
	wg.Done()
}
Reading from the channel is implemented with the range keyword, which exits the for loop automatically when the channel (s.keyChannel) is closed. The wg.Done() call in the last line helps to ensure that all the keys passed through s.keyChannel were dumped and sent through s.dumpChannel.
As you may know, struct fields starting with a lower-case letter are internal to the package, so we have to provide a getter to allow other packages to read the dumpChannel field. This is also a chance to declare the return type as a channel that is only intended to be read from (<-chan instead of just chan):
func (s *RedisScanner) GetDumpChannel() <-chan KeyDump {
	return s.dumpChannel
}
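As a small illustration of what the receive-only type buys us, a hypothetical consumer of this channel (the function name is made up) can read from it but cannot send to it or close it:
func consume(dumps <-chan scanner.KeyDump) {
	for dump := range dumps {
		_ = dump // process the dump
	}
	// dumps <- scanner.KeyDump{} // would not compile: cannot send to a receive-only channel
}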
Goroutines to Restore Exported Data
The pusher can also be configured, and it uses a WaitGroup to orchestrate its goroutines:
func NewRedisPusher(client radix.Client, dumpChannel <-chan scanner.KeyDump) *RedisPusher {
	return &RedisPusher{
		client:      client,
		dumpChannel: dumpChannel,
	}
}

type RedisPusher struct {
	client      radix.Client
	dumpChannel <-chan scanner.KeyDump
}

func (p *RedisPusher) Start(wg *sync.WaitGroup, number int) {
	wg.Add(number)
	for i := 0; i < number; i++ {
		go p.pushRoutine(wg)
	}
}
And pushRoutine uses a similar approach to read from the channel and exit:
func (p *RedisPusher) pushRoutine(wg *sync.WaitGroup) {
	for dump := range p.dumpChannel {
		// the reporter (progress counters) is part of the full version of the code
		p.reporter.AddPushedCounter(1)

		err := p.client.Do(radix.FlatCmd(nil, "RESTORE", dump.Key, dump.Ttl, dump.Value, "REPLACE"))
		if err != nil {
			log.Fatal(err)
		}
	}
	wg.Done()
}
There is one important thing to note here: dumpChannel is closed by the scanner only after all exporters have finished. That guarantees no data is lost at the very end. It’s achieved with wgPull and two lines in RedisScanner’s Start() method:
wgPull.Wait()
close(s.dumpChannel)
Wiring Everything Together
Now let’s use the packages we developed in the Cobra command to put it all together.
First, extend the command definition to add more options:
func init() {
	rootCmd.AddCommand(copyCmd)
	copyCmd.Flags().StringVar(&pattern, "pattern", "*", "Match pattern for keys")
	copyCmd.Flags().IntVar(&scanCount, "scanCount", 100, "COUNT parameter for redis SCAN command")
	copyCmd.Flags().IntVar(&report, "report", 1, "Report current status every N seconds")
	copyCmd.Flags().IntVar(&exportRoutines, "exportRoutines", 30, "Number of parallel export goroutines")
	copyCmd.Flags().IntVar(&pushRoutines, "pushRoutines", 30, "Number of parallel push goroutines")
}
Then, create a scanner and a pusher (and a WaitGroup for them). Don’t forget to call Wait() on it; otherwise the command will exit immediately:
// clientSource and clientTarget initialization

redisScanner := scanner.NewScanner(
	clientSource,
	scanner.RedisScannerOpts{
		Pattern:          pattern,
		ScanCount:        scanCount,
		PullRoutineCount: exportRoutines,
	},
)

redisPusher := pusher.NewRedisPusher(clientTarget, redisScanner.GetDumpChannel())

waitingGroup := new(sync.WaitGroup)
redisPusher.Start(waitingGroup, pushRoutines)
redisScanner.Start()
waitingGroup.Wait()
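With the new flags wired in, an invocation might look like this (hosts and values are placeholders; the binary is again assumed to be built as go-redis-migrate):
go-redis-migrate copy localhost:6379 localhost:6380 --pattern "session:*" --exportRoutines 30 --pushRoutines 30 --report 1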
Benchmark
The most exciting part is to see the difference. Let’s take a look at the same test cases and compare them:
Test #1: source database of 453,967 keys; 10,000 keys to copy.
Test #2: source database of 453,967 keys; 367,610 keys to copy.
Processing is three to nine times faster when testing on a local machine. The real execution on the production infrastructure took less than two minutes to copy about a million keys.
Conclusion
This application was one of my first codebases written in Golang, and it helped fix a real-life problem with very little development and operations time.
If you look at the full version of the code here, you’ll see a “sidecar” goroutine that collects counters of scanned, exported, and pushed keys and reports them to stdout at a configured interval. It helps to see the progress of execution in the following format:
Start copying
2021/02/14 13:11:42 Scanned: 29616 Exported: 29616 Pushed: 29616 after 1.000153648s
2021/02/14 13:11:43 Scanned: 59621 Exported: 59615 Pushed: 59615 after 2.000128223s
2021/02/14 13:11:44 Scanned: 89765 Exported: 89765 Pushed: 89765 after 3.0001194s
2021/02/14 13:11:44 Scanned: 100000 Exported: 100000 Pushed: 100000 after 3.347127281s
Finish copying
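A minimal sketch of such a reporting goroutine could look like the following, assuming atomic counters and a ticker (the names are modeled on the snippets above but hypothetical; the real implementation lives in the repository; it requires the "log", "sync/atomic", and "time" imports):
type Reporter struct {
	scanned, exported, pushed int64
}

func (r *Reporter) AddScannedCounter(n int64)  { atomic.AddInt64(&r.scanned, n) }
func (r *Reporter) AddExportedCounter(n int64) { atomic.AddInt64(&r.exported, n) }
func (r *Reporter) AddPushedCounter(n int64)   { atomic.AddInt64(&r.pushed, n) }

func (r *Reporter) Start(interval time.Duration) {
	start := time.Now()
	ticker := time.NewTicker(interval)
	go func() {
		for range ticker.C { // report on every tick until the process exits
			log.Printf(
				"Scanned: %d Exported: %d Pushed: %d after %s",
				atomic.LoadInt64(&r.scanned),
				atomic.LoadInt64(&r.exported),
				atomic.LoadInt64(&r.pushed),
				time.Since(start),
			)
		}
	}()
}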
Do you have some examples of how Golang helped you to find a simple solution to a tricky problem? Tweet me a response with the link below.