diff --git a/config.yaml b/config.yaml index 2c7899d..3d4ba72 100644 --- a/config.yaml +++ b/config.yaml @@ -16,7 +16,8 @@ remote: sync: # 是否启用实时同步(监听本地文件变更进行同步,仅同步服务运行期间发生变更的文件,可结合check_job实现全量同步) real_time: - enable: true + enable: false + hot_delay: 5 # 单位分钟(1-60),对频繁修改的文件进行延迟同步,避免频繁的覆盖上传 # 是否启用定期文件对账(扫描对比本地与远端文件差异进行同步) check_job: enable: true diff --git a/config/config.go b/config/config.go index 880227b..b8c3975 100644 --- a/config/config.go +++ b/config/config.go @@ -27,7 +27,8 @@ type SyncConfig struct { } `yaml:"remote"` Sync struct { RealTime struct { - Enable bool `yaml:"enable"` + Enable bool `yaml:"enable"` + HotDelay int `yaml:"hot_delay"` } `yaml:"real_time"` CheckJob struct { Enable bool `yaml:"enable"` @@ -67,6 +68,7 @@ func (c *SyncConfig) GetString() string { s += fmt.Sprintln("Sync: -----------------------------------") s += fmt.Sprintln(" Real-time:") s += fmt.Sprintf(" Enable:\t| %t\n", c.Sync.RealTime.Enable) + s += fmt.Sprintf(" HotDelay:\t| %d minute\n", c.Sync.RealTime.HotDelay) s += fmt.Sprintln(" Check-job:") s += fmt.Sprintf(" Enable:\t| %t\n", c.Sync.CheckJob.Enable) s += fmt.Sprintf(" Interval:\t| %d hour\n", c.Sync.CheckJob.Interval) @@ -98,5 +100,12 @@ func loadConfig(path string) *SyncConfig { cfg.Remote.Path = strings.TrimLeft(cfg.Remote.Path, "/") } + // 处理Hot delay,最小1分钟,最大60分钟 + if cfg.Sync.RealTime.HotDelay < 1 { + cfg.Sync.RealTime.HotDelay = 1 + } else if cfg.Sync.RealTime.HotDelay > 60 { + cfg.Sync.RealTime.HotDelay = 60 + } + return cfg } diff --git a/transfer.go b/transfer.go index e45fe1c..f808e02 100644 --- a/transfer.go +++ b/transfer.go @@ -14,6 +14,7 @@ import ( type Transfer struct { LocalPrefix string RemotePrefix string + HotDelay time.Duration PutChan chan string DeleteChan chan string Storage *Storage @@ -23,6 +24,7 @@ func NewTransfer(c *config.SyncConfig, putCh chan string, deleteCh chan string, return &Transfer{ LocalPrefix: c.Local.Path, RemotePrefix: c.Remote.Path, + HotDelay: time.Duration(c.Sync.RealTime.HotDelay) * time.Minute, PutChan: putCh, DeleteChan: deleteCh, Storage: storage, @@ -48,7 +50,7 @@ func (t *Transfer) Run(ctx context.Context) { } log.Infof("Sync success, path: %s", subPath) // 将执行成功的记录加入到TTLSet,供热点文件发现 - ttlset.Add(subPath, 5*time.Minute) + ttlset.Add(subPath, t.HotDelay) return nil }) if err != nil { diff --git a/watcher.go b/watcher.go index 59954ff..d654f04 100644 --- a/watcher.go +++ b/watcher.go @@ -15,6 +15,7 @@ import ( type Watcher struct { Enable bool Ignore []string + HotDelay time.Duration LocalPrefix string Notify *fsnotify.Watcher PutChan chan string @@ -30,6 +31,7 @@ func NewWatcher(c *config.SyncConfig, putCh chan string, deleteCh chan string) ( return &Watcher{ Enable: c.Sync.RealTime.Enable, + HotDelay: time.Duration(c.Sync.RealTime.HotDelay) * time.Minute, Notify: notify, PutChan: putCh, DeleteChan: deleteCh, @@ -82,7 +84,7 @@ func (w *Watcher) Watch() error { hotKeySet := make(map[string]string) var mu sync.Mutex - ticker := time.NewTicker(5 * time.Minute) + ticker := time.NewTicker(w.HotDelay) defer ticker.Stop() for { @@ -108,7 +110,7 @@ func (w *Watcher) Watch() error { // 判断文件是否热点文件,热点文件进行延迟更新,以节省流量和操作次数 if ttlset.Exists(event.Name) { // 丢入set中,合并多个同名文件的事件(热点降温) - log.Debugf("Hotspot path, will be delay sync %s", event.Name) + log.Debugf("Hot path, will be delay sync %s", event.Name) mu.Lock() hotKeySet[event.Name] = "" mu.Unlock() @@ -148,6 +150,7 @@ func (w *Watcher) Watch() error { } mu.Unlock() deleteKeys = deleteKeys[:0] + case err, ok := <-w.Notify.Errors: if !ok { continue