Copy 1 Million Redis Keys in 2 Minutes with Golang

Golang is a practical language for solving everyday problems efficiently. It’s easy to learn, and it doesn’t take much code to get good performance.

Let’s take a look at how Golang can help in a simple and practical case involving copying large amounts of Redis keys.

At some point it became necessary to split our Amazon ElastiCache store into two parts: one for cached data and one for users’ sessions.

Unfortunately, both had been living on the same instance, and we didn’t want to interrupt long-lived sessions by resetting the storage.

Amazon ElastiCache is compatible with the Redis protocol, though with certain limitations. Stock Redis supports the MIGRATE command, which atomically moves one or more keys from one instance to another.

Internally, MIGRATE executes DUMP+DEL on the source instance and recreates the keys on the target instance with RESTORE. However, Amazon’s version didn’t support this command at the time.
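For reference, migrating specific keys with stock Redis looks like this (standard MIGRATE syntax; the host and key names here are placeholders):

MIGRATE target.example.com 6379 "" 0 5000 REPLACE KEYS session:1 session:2

The empty string takes the place of the single-key argument when the KEYS option is used; 0 is the destination database and 5000 is the timeout in milliseconds.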

Back then, my practical experience with Golang was limited. I’d only implemented projects for fun and was familiar with basic syntax and concepts like goroutines and channels. But I decided that was enough to put Golang’s strengths to use on the problem I was facing.

Step 1: Let’s Write Some Simple Code

Let’s assume that Golang is fast enough to do the job. Keep in mind that Redis is, mostly, a single-threaded server from the point of view of command execution and implements replication with no concurrency.

Preparation

I’ve picked two base libraries for this challenge:

  • Radix as the Redis client library, and

  • Cobra to make it easier to build the command-line interface for the tool

package cmd

import (
  "github.com/mediocregopher/radix/v3"
  "github.com/spf13/cobra"
  "log"
)

var pattern string
var scanCount, report, limit int

var copyCmd = &cobra.Command{
  Use:   "copy [sourceHost:port] [targetHost:port]",
  Short: "Copy keys from source redis instance to destination by given pattern",
  Long: "",
  Args:  cobra.MinimumNArgs(2),
  Run: func(cmd *cobra.Command, args []string) {
    clientSource, err := radix.DefaultClientFunc("tcp", args[0])
    if err != nil {
      log.Fatal(err)
    }

    clientTarget, err := radix.DefaultClientFunc("tcp", args[1])
    if err != nil {
      log.Fatal(err)
    }

    // ... here the copying will happen
  },
}

var rootCmd = &cobra.Command{
  Use:   "go-redis-migrate",
  Short: "Application to migrate redis data from one instance to another",
  Long:  "",
}

func init() {
  rootCmd.AddCommand(copyCmd)

  copyCmd.Flags().StringVar(&pattern, "pattern", "*", "Match pattern for keys")
  copyCmd.Flags().IntVar(&scanCount, "scanCount", 100, "COUNT parameter for redis SCAN command")
  copyCmd.Flags().IntVar(&report, "report", 1000, "After what number of keys copied to report time")
  copyCmd.Flags().IntVar(&limit, "limit", 0, "After what number of keys copied to stop (0 - unlimited)")
}

The interface is ready: it supports the “pattern” parameter to match keys and the “limit” parameter to cap the number of keys copied. The source and destination addresses are required positional arguments.
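For completeness, Cobra needs an entry point that executes the root command. A typical way to expose it from the cmd package looks like this (my assumption; the repository may organize it differently):

// Execute runs the root command; call this from main().
func Execute() {
  if err := rootCmd.Execute(); err != nil {
    log.Fatal(err)
  }
}

After building, a run could then look like: go-redis-migrate copy localhost:6379 localhost:6380 --pattern "user:*" --limit 10000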

The Main Loop

Radix supports creating a “scanner” structure that helps you iterate over keys:

start := time.Now() // for the final report; requires the "time" import

scanOpts := radix.ScanOpts{
  Command: "SCAN",
  Count:   scanCount,
}

if pattern != "*" {
  scanOpts.Pattern = pattern
}

scanner := radix.NewScanner(clientSource, scanOpts)

var key string
counter := 0

for scanner.Next(&key) {
  // copy the key

  counter++
}

if err := scanner.Close(); err != nil {
  log.Fatal(err)
}

log.Printf("In total %d keys copied in %s", counter, time.Since(start))

The loop is now ready. What’s left is to read each key and restore it on the target. I combined the PTTL and DUMP commands in a pipeline to fetch the key’s time to live and serialized value in a single round trip, saving execution time.

var value string
var ttl int

// PTTL and DUMP are pipelined to fetch the TTL and the serialized
// value in one round trip.
p := radix.Pipeline(
  radix.Cmd(&ttl, "PTTL", key),
  radix.Cmd(&value, "DUMP", key),
)

if err := clientSource.Do(p); err != nil {
  log.Fatal(err)
}

// PTTL returns a negative value for keys without an expiry;
// RESTORE expects 0 in that case.
if ttl < 0 {
  ttl = 0
}

if err := clientTarget.Do(radix.FlatCmd(nil, "RESTORE", key, ttl, value, "REPLACE")); err != nil {
  log.Fatal(err)
}

That’s already enough for the code to work, but adding some reporting logic would definitely improve the user experience.
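For example, the report and limit flags declared earlier can be wired into the main loop. A minimal sketch (the repository’s actual reporting logic differs):

for scanner.Next(&key) {
  // ... copy the key as shown above

  counter++
  if report > 0 && counter%report == 0 {
    // print intermediate progress every `report` keys
    log.Printf("%d keys copied in %s", counter, time.Since(start))
  }
  if limit > 0 && counter >= limit {
    break // stop after `limit` keys (0 means unlimited)
  }
}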

The complete code can be found here: https://github.com/obukhov/go-redis-migrate/blob/v1.0/cmd/copy.go

But is it really that good?

Let’s run some benchmarks: spin up two Redis instances locally with Docker and seed the source with data (453,967 keys in total, though we only copy part of them by matching a pattern).
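Something like the following reproduces the setup; these exact commands are my assumption, since the original doesn’t show them (DEBUG POPULATE creates keys named key:<n>):

docker run -d --name redis-source -p 6379:6379 redis
docker run -d --name redis-target -p 6380:6379 redis
docker exec redis-source redis-cli DEBUG POPULATE 453967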

Then we run each test three times to gauge run-to-run variance:

10000 keys to copy: 17.79s 18.01s 17.98s
367610 keys to copy: 8m57.98s 8m44.98s 8m58.07s

That’s not bad, but let’s see if we can improve it.

Step 2: Utilize Concurrency

Let’s look at the sequence of operations in the current implementation: every key is scanned, dumped from the source, and restored on the target, strictly one step after another.

What can we do to improve the performance here?

We can clearly see the following shortcomings:

  • Reading from the source and writing to the target are serialized, although they could run in parallel

  • The single-threaded nature of Redis only affects command execution; serving data (network I/O) can be parallelized. Depending on the value size, this can make a big difference

Sending multiple concurrent requests could be a good strategy: they will block each other during command processing but will make better use of the network I/O. This applies to both dumping and restoring data.

There is one step that can’t be parallelized: scanning the database. It relies on a scanning cursor, and there’s no way to run the scan in multiple threads.

Let’s split the process into three stages:

  • Scanning,

  • Dumping data, and

  • Restoring data

Scanned keys can be fed through a channel to a set of goroutines that concurrently dump the keys’ values and TTLs and send them through another channel to a second set of goroutines, which in turn restore the data in the target instance.

Here’s a visualization of the pipeline. (The Go gopher in this image was created by Renee French; the image was composited by me.)

Gophers running in circles are loops in the goroutines, reading from the channel, processing data, and sending the processed data to another channel.

Implementation

For simplicity, we’ll implement the scanner and the exporter in the same package, beginning with the structure declarations:

type KeyDump struct {
  Key   string
  Value string
  Ttl   int
}

type RedisScannerOpts struct {
  Pattern          string
  ScanCount        int
  PullRoutineCount int
}

type RedisScanner struct {
  client      radix.Client
  options     RedisScannerOpts
  keyChannel  chan string
  dumpChannel chan KeyDump
}

func NewScanner(client radix.Client, options RedisScannerOpts) *RedisScanner {
  return &RedisScanner{
    client:      client,
    options:     options,
    dumpChannel: make(chan KeyDump),
    keyChannel:  make(chan string),
  }
}

Two channels are declared here. The first is a plain string channel to send scanned keys from the scanner to the group of exporting goroutines. The second channel of KeyDump structures is for sending dumped data to the goroutines restoring data.

A KeyDump structure contains all the necessary information about simple Redis values: key, value, and TTL.

First Goroutines

The following function orchestrates goroutines for scanning and exporting data:

func (s *RedisScanner) Start() {
  wgPull := new(sync.WaitGroup)
  wgPull.Add(s.options.PullRoutineCount)

  go s.scanRoutine()
  for i := 0; i < s.options.PullRoutineCount; i++ {
    go s.exportRoutine(wgPull)
  }

  wgPull.Wait()
  close(s.dumpChannel)
}

As you can see, it spawns one scanning routine and the number of exporting goroutines defined by the PullRoutineCount option. Pay attention to the variable named wgPull of type WaitGroup, a handy tool that makes sure our code doesn’t exit before the process is complete.

A WaitGroup waits for a collection of goroutines to finish. The main goroutine calls Add to set the number of goroutines to wait for, and each goroutine calls Done when it finishes. Internally, Add increments a counter and Done decrements it, while Wait blocks execution until the counter reaches zero.
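A minimal, self-contained illustration of the pattern (not code from the tool itself):

package main

import (
  "fmt"
  "sync"
)

func main() {
  wg := new(sync.WaitGroup)
  wg.Add(3) // we are going to wait for three goroutines

  for i := 0; i < 3; i++ {
    go func(n int) {
      defer wg.Done() // decrement the counter when this goroutine returns
      fmt.Println("worker", n, "finished")
    }(i)
  }

  wg.Wait() // block until the counter drops to zero
}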

The scanning goroutine is similar to what we had in the first version:

func (s *RedisScanner) scanRoutine() {
  var key string
  scanOpts := radix.ScanOpts{
    Command: "SCAN",
    Count:   s.options.ScanCount,
  }

  if s.options.Pattern != "*" {
    scanOpts.Pattern = s.options.Pattern
  }

  radixScanner := radix.NewScanner(s.client, scanOpts)
  for radixScanner.Next(&key) {
    s.keyChannel <- key
  }

  if err := radixScanner.Close(); err != nil {
    log.Fatal(err)
  }

  close(s.keyChannel)
}

Everything is self-explanatory, but there are a few things worth mentioning:

  • The <- operator (s.keyChannel <- key) sends a value into the channel

  • At the very end, we close the channel to let the goroutines know that no more data is going to be sent

The exporting goroutine already looks familiar. Terminating execution on a client error is not a graceful way to handle failures, but as long as the connection is reliable, it will never be a problem.

func (s *RedisScanner) exportRoutine(wg *sync.WaitGroup) {
  for key := range s.keyChannel {
    var value string
    var ttl int

    p := radix.Pipeline(
      radix.Cmd(&ttl, "PTTL", key),
      radix.Cmd(&value, "DUMP", key),
    )

    if err := s.client.Do(p); err != nil {
      log.Fatal(err)
    }

    if ttl < 0 {
      ttl = 0
    }

    // The full version also updates a progress reporter here.
    s.dumpChannel <- KeyDump{
      Key:   key,
      Ttl:   ttl,
      Value: value,
    }
  }

  wg.Done()
}

Reading from the channel is implemented with the range keyword, which exits the loop automatically when the channel (s.keyChannel) is closed. The wg.Done() call on the last line ensures that all keys received through s.keyChannel have been dumped and sent through s.dumpChannel before the WaitGroup is released.
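For comparison, the same loop without range would use the two-value receive form to detect the closed channel; a small illustration (not from the tool’s code):

for {
  key, ok := <-s.keyChannel
  if !ok {
    break // the channel is closed and drained, stop the worker
  }
  // ... dump the key as above
  _ = key
}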

As you may know, struct fields starting with a lower-case letter are unexported (private to the package), so we have to provide a getter to allow other packages to read the dumpChannel field. This is also a chance to declare the return type as a receive-only channel (<-chan instead of just chan):

func (s *RedisScanner) GetDumpChannel() <-chan KeyDump {
  return s.dumpChannel
}

Goroutines to Restore Exported Data

The pusher is also configurable and uses a WaitGroup to orchestrate its goroutines:

func NewRedisPusher(client radix.Client, dumpChannel <-chan scanner.KeyDump) *RedisPusher {
  return &RedisPusher{
    client:      client,
    dumpChannel: dumpChannel,
  }
}

type RedisPusher struct {
  client      radix.Client
  dumpChannel <-chan scanner.KeyDump
}

func (p *RedisPusher) Start(wg *sync.WaitGroup, number int) {
  wg.Add(number)
  for i := 0; i < number; i++ {
    go p.pushRoutine(wg)
  }
}

And pushRoutine uses the same pattern to read from the channel and exit:

func (p *RedisPusher) pushRoutine(wg *sync.WaitGroup) {
  for dump := range p.dumpChannel {
    // The full version also updates a progress reporter here.
    err := p.client.Do(radix.FlatCmd(nil, "RESTORE", dump.Key, dump.Ttl, dump.Value, "REPLACE"))
    if err != nil {
      log.Fatal(err)
    }
  }

  wg.Done()
}

There is one important thing to note here: the scanner closes dumpChannel only after all exporters have finished. That guarantees no data is lost at the very end. It’s achieved with wgPull and two lines in RedisScanner’s Start() method:

wgPull.Wait()
close(s.dumpChannel)

Wiring Everything Together

Now let’s use the developed packages in the Cobra command to put it all together.

First, extend the command definition to add more options:

func init() {
  rootCmd.AddCommand(copyCmd)
  copyCmd.Flags().StringVar(&pattern, "pattern", "*", "Match pattern for keys")
  copyCmd.Flags().IntVar(&scanCount, "scanCount", 100, "COUNT parameter for redis SCAN command")
  copyCmd.Flags().IntVar(&report, "report", 1, "Report current status every N seconds")
  copyCmd.Flags().IntVar(&exportRoutines, "exportRoutines", 30, "Number of parallel export goroutines")
  copyCmd.Flags().IntVar(&pushRoutines, "pushRoutines", 30, "Number of parallel push goroutines")
}

Then, create a scanner and a pusher (and a WaitGroup for them). Don’t forget to call Wait() on it; otherwise the command will exit immediately:

// clientSource and clientTarget initialization
redisScanner := scanner.NewScanner(
  clientSource,
  scanner.RedisScannerOpts{
    Pattern:          pattern,
    ScanCount:        scanCount,
    PullRoutineCount: exportRoutines,
  },
)

redisPusher := pusher.NewRedisPusher(clientTarget, redisScanner.GetDumpChannel())

waitingGroup := new(sync.WaitGroup)

redisPusher.Start(waitingGroup, pushRoutines)
redisScanner.Start()

waitingGroup.Wait()

Benchmark

The most exciting part is to see the difference. Let’s take a look at the same test cases and compare them:

Test #1

Source database: 453,967 keys. Keys to copy: 10,000 keys.

Test #2

Source database: 453,967 keys. Keys to copy: 367,610 keys.

Processing is three to nine times faster on a local machine. The real run on the production infrastructure took less than two minutes to copy about a million keys.

Conclusion

This application was one of my first codebases written in Golang, and it solved a real-life problem with very little development and operations time.

If you look at the full version of the code here, you’ll see a “sidecar” goroutine that collects counters of scanned, exported, and pushed keys and reports them to stdout at a configured interval (a sketch of the idea follows the sample output below). It lets you watch the progress of execution in the following format:

Start copying
2021/02/14 13:11:42 Scanned: 29616 Exported: 29616 Pushed: 29616 after 1.000153648s
2021/02/14 13:11:43 Scanned: 59621 Exported: 59615 Pushed: 59615 after 2.000128223s
2021/02/14 13:11:44 Scanned: 89765 Exported: 89765 Pushed: 89765 after 3.0001194s
2021/02/14 13:11:44 Scanned: 100000 Exported: 100000 Pushed: 100000 after 3.347127281s
Finish copying
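A minimal sketch of such a reporter, using atomic counters and a ticker; the method names are my assumptions, and the actual implementation in the repository differs in its details (requires the "log", "sync/atomic", and "time" imports):

type Reporter struct {
  scanned, exported, pushed int64
  start                     time.Time
}

func NewReporter() *Reporter {
  return &Reporter{start: time.Now()}
}

func (r *Reporter) AddScannedCounter(n int64)  { atomic.AddInt64(&r.scanned, n) }
func (r *Reporter) AddExportedCounter(n int64) { atomic.AddInt64(&r.exported, n) }
func (r *Reporter) AddPushedCounter(n int64)   { atomic.AddInt64(&r.pushed, n) }

// Report logs the counters every interval until stop is closed.
func (r *Reporter) Report(interval time.Duration, stop <-chan struct{}) {
  ticker := time.NewTicker(interval)
  defer ticker.Stop()
  for {
    select {
    case <-ticker.C:
      log.Printf("Scanned: %d Exported: %d Pushed: %d after %s",
        atomic.LoadInt64(&r.scanned),
        atomic.LoadInt64(&r.exported),
        atomic.LoadInt64(&r.pushed),
        time.Since(r.start))
    case <-stop:
      return
    }
  }
}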

Do you have some examples of how Golang helped you to find a simple solution to a tricky problem? Tweet me a response with the link below.