diff --git a/cmd/redis-shake/main.go b/cmd/redis-shake/main.go index 3930ed83..12e22739 100644 --- a/cmd/redis-shake/main.go +++ b/cmd/redis-shake/main.go @@ -177,10 +177,17 @@ func main() { theWriter = writer.NewRedisStandaloneWriter(ctx, opts) } if config.Opt.Advanced.EmptyDBBeforeSync { - // exec FLUSHALL command to flush db - entry := entry.NewEntry() - entry.Argv = []string{"FLUSHALL"} - theWriter.Write(entry) + if config.Opt.Advanced.EmptyDBByAsync { + log.Infof("execute FLUSHALL ASYNC to empty target db before sync") + if err := theWriter.FlushAllAsync(); err != nil { + log.Panicf("failed to flush target db: %v", err) + } + } else { + log.Infof("execute FLUSHALL to empty target db before sync") + flushEntry := entry.NewEntry() + flushEntry.Argv = []string{"FLUSHALL"} + theWriter.Write(flushEntry) + } } default: log.Panicf("no writer config entry found") diff --git a/internal/client/func.go b/internal/client/func.go index 14488e71..586433b0 100644 --- a/internal/client/func.go +++ b/internal/client/func.go @@ -3,9 +3,13 @@ package client import ( "bytes" "errors" + "fmt" + "strconv" "strings" + "time" "RedisShake/internal/client/proto" + "RedisShake/internal/config" "RedisShake/internal/log" ) @@ -54,3 +58,38 @@ func (r *Redis) IsValkey() bool { } return isValkey } + +func (r *Redis) FlushAllAsync() error { + reply := r.DoWithStringReply("FLUSHALL", "ASYNC") + if reply != "OK" { + return fmt.Errorf("FLUSHALL ASYNC failed: %s", reply) + } + + deadline := time.Now().Add(config.Opt.Advanced.LazyFreePendingObjectsMaxWait) + for time.Now().Before(deadline) { + info := r.DoWithStringReply("INFO", "memory") + pending := parseLazyFreePendingObjects(info) + if pending == 0 { + return nil + } + time.Sleep(config.Opt.Advanced.LazyFreePendingObjectsCheckInterval) + } + return fmt.Errorf("timeout waiting for lazyfree_pending_objects to be 0") +} + +func parseLazyFreePendingObjects(info string) int { + for _, line := range strings.Split(info, "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "lazyfree_pending_objects:") { + parts := strings.Split(line, ":") + if len(parts) >= 2 { + val, err := strconv.Atoi(strings.TrimSpace(parts[1])) + if err != nil { + return -1 + } + return val + } + } + } + return -1 +} diff --git a/internal/client/func_test.go b/internal/client/func_test.go index e11b2245..b89d362a 100644 --- a/internal/client/func_test.go +++ b/internal/client/func_test.go @@ -126,3 +126,57 @@ redis_mode:standalone }) } } + +func TestParseLazyFreePendingObjects(t *testing.T) { + tests := []struct { + name string + info string + want int + }{ + { + name: "lazyfree_pending_objects is 0", + info: `# Stats +lazyfree_pending_objects:0 +used_cpu_sys:0.5 +`, + want: 0, + }, + { + name: "lazyfree_pending_objects is 5", + info: `# Stats +lazyfree_pending_objects:5 +used_cpu_sys:0.5 +`, + want: 5, + }, + { + name: "no lazyfree_pending_objects field", + info: `# Stats +used_cpu_sys:0.5 +`, + want: -1, + }, + { + name: "empty info", + info: ``, + want: -1, + }, + { + name: "lazyfree_pending_objects with spaces", + info: `# Stats +lazyfree_pending_objects: 10 +used_cpu_sys:0.5 +`, + want: 10, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := parseLazyFreePendingObjects(tt.info) + if got != tt.want { + t.Errorf("parseLazyFreePendingObjects() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/internal/config/config.go b/internal/config/config.go index 68cb13d3..0525c54e 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -7,6 +7,7 @@ import ( "os" "regexp" "strings" + "time" "github.com/mcuadros/go-defaults" "github.com/rs/zerolog" @@ -75,6 +76,13 @@ type AdvancedOptions struct { AwsPSync string `mapstructure:"aws_psync" default:""` // 10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379@xhma21xfkssync EmptyDBBeforeSync bool `mapstructure:"empty_db_before_sync" default:"false"` + + // flush options + // When true, uses FLUSHALL ASYNC and waits for lazyfree_pending_objects to be 0. + // When false, skips flush operation. + EmptyDBByAsync bool `mapstructure:"empty_db_by_async" default:"false"` + LazyFreePendingObjectsMaxWait time.Duration `mapstructure:"lazy_free_pending_objects_max_wait" default:"60s"` + LazyFreePendingObjectsCheckInterval time.Duration `mapstructure:"lazy_free_pending_objects_check_interval" default:"100ms"` } type ModuleOptions struct { diff --git a/internal/writer/file_writer.go b/internal/writer/file_writer.go index ef4dacbc..3a578753 100644 --- a/internal/writer/file_writer.go +++ b/internal/writer/file_writer.go @@ -127,3 +127,7 @@ func (w *fileWriter) writeEntry(writer *bufio.Writer, e *entry.Entry) { writer.WriteString("\n") } } + +func (w *fileWriter) FlushAllAsync() error { + return nil +} diff --git a/internal/writer/interface.go b/internal/writer/interface.go index 21329d29..bdbdad3a 100644 --- a/internal/writer/interface.go +++ b/internal/writer/interface.go @@ -11,4 +11,5 @@ type Writer interface { Write(entry *entry.Entry) StartWrite(ctx context.Context) (ch chan *entry.Entry) Close() + FlushAllAsync() error } diff --git a/internal/writer/redis_cluster_writer.go b/internal/writer/redis_cluster_writer.go index 40f193f5..44354601 100644 --- a/internal/writer/redis_cluster_writer.go +++ b/internal/writer/redis_cluster_writer.go @@ -121,3 +121,12 @@ func (r *RedisClusterWriter) StatusConsistent() bool { } return true } + +func (r *RedisClusterWriter) FlushAllAsync() error { + for _, writer := range r.writers { + if err := writer.FlushAllAsync(); err != nil { + return err + } + } + return nil +} diff --git a/internal/writer/redis_standalone_writer.go b/internal/writer/redis_standalone_writer.go index b8070947..db603444 100644 --- a/internal/writer/redis_standalone_writer.go +++ b/internal/writer/redis_standalone_writer.go @@ -181,3 +181,7 @@ func (w *redisStandaloneWriter) StatusString() string { func (w *redisStandaloneWriter) StatusConsistent() bool { return atomic.LoadInt64(&w.stat.UnansweredBytes) == 0 && atomic.LoadInt64(&w.stat.UnansweredEntries) == 0 } + +func (w *redisStandaloneWriter) FlushAllAsync() error { + return w.client.FlushAllAsync() +} diff --git a/shake.toml b/shake.toml index f0bf612f..29f01326 100644 --- a/shake.toml +++ b/shake.toml @@ -166,6 +166,17 @@ aws_psync = "" # example: aws_psync = "10.0.0.1:6379@nmfu2sl5osync,10.0.0.1:6379 # repl-diskless-load on-empty-db empty_db_before_sync = false +# FLUSHALL ASYNC options +# When true, uses FLUSHALL ASYNC and waits for lazyfree_pending_objects to be 0. +# When false, uses synchronous FLUSHALL. +empty_db_by_async = false +# The maximum time to wait for lazyfree_pending_objects to be 0 +# Default: 60s +lazy_free_pending_objects_max_wait = "60s" +# The interval to check lazyfree_pending_objects status +# Default: 100ms +lazy_free_pending_objects_check_interval = "100ms" + [module] # The data format for BF.LOADCHUNK is not compatible in different versions. v2.6.3 <=> 20603 target_mbbloom_version = 20603