Skip to content

Commit

Permalink
支持热点数据Delay时间配置
Browse files Browse the repository at this point in the history
  • Loading branch information
jorben committed Mar 17, 2024
1 parent b6b06b5 commit 6bd90ed
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 5 deletions.
3 changes: 2 additions & 1 deletion config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ remote:
sync:
# 是否启用实时同步(监听本地文件变更进行同步,仅同步服务运行期间发生变更的文件,可结合check_job实现全量同步)
real_time:
enable: true
enable: false
hot_delay: 5 # 单位分钟(1-60),对频繁修改的文件进行延迟同步,避免频繁的覆盖上传
# 是否启用定期文件对账(扫描对比本地与远端文件差异进行同步)
check_job:
enable: true
Expand Down
11 changes: 10 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ type SyncConfig struct {
} `yaml:"remote"`
Sync struct {
RealTime struct {
Enable bool `yaml:"enable"`
Enable bool `yaml:"enable"`
HotDelay int `yaml:"hot_delay"`
} `yaml:"real_time"`
CheckJob struct {
Enable bool `yaml:"enable"`
Expand Down Expand Up @@ -67,6 +68,7 @@ func (c *SyncConfig) GetString() string {
s += fmt.Sprintln("Sync: -----------------------------------")
s += fmt.Sprintln(" Real-time:")
s += fmt.Sprintf(" Enable:\t| %t\n", c.Sync.RealTime.Enable)
s += fmt.Sprintf(" HotDelay:\t| %d minute\n", c.Sync.RealTime.HotDelay)
s += fmt.Sprintln(" Check-job:")
s += fmt.Sprintf(" Enable:\t| %t\n", c.Sync.CheckJob.Enable)
s += fmt.Sprintf(" Interval:\t| %d hour\n", c.Sync.CheckJob.Interval)
Expand Down Expand Up @@ -98,5 +100,12 @@ func loadConfig(path string) *SyncConfig {
cfg.Remote.Path = strings.TrimLeft(cfg.Remote.Path, "/")
}

// 处理Hot delay,最小1分钟,最大60分钟
if cfg.Sync.RealTime.HotDelay < 1 {
cfg.Sync.RealTime.HotDelay = 1
} else if cfg.Sync.RealTime.HotDelay > 60 {
cfg.Sync.RealTime.HotDelay = 60
}

return cfg
}
4 changes: 3 additions & 1 deletion transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
type Transfer struct {
LocalPrefix string
RemotePrefix string
HotDelay time.Duration
PutChan chan string
DeleteChan chan string
Storage *Storage
Expand All @@ -23,6 +24,7 @@ func NewTransfer(c *config.SyncConfig, putCh chan string, deleteCh chan string,
return &Transfer{
LocalPrefix: c.Local.Path,
RemotePrefix: c.Remote.Path,
HotDelay: time.Duration(c.Sync.RealTime.HotDelay) * time.Minute,
PutChan: putCh,
DeleteChan: deleteCh,
Storage: storage,
Expand All @@ -48,7 +50,7 @@ func (t *Transfer) Run(ctx context.Context) {
}
log.Infof("Sync success, path: %s", subPath)
// 将执行成功的记录加入到TTLSet,供热点文件发现
ttlset.Add(subPath, 5*time.Minute)
ttlset.Add(subPath, t.HotDelay)
return nil
})
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
type Watcher struct {
Enable bool
Ignore []string
HotDelay time.Duration
LocalPrefix string
Notify *fsnotify.Watcher
PutChan chan string
Expand All @@ -30,6 +31,7 @@ func NewWatcher(c *config.SyncConfig, putCh chan string, deleteCh chan string) (

return &Watcher{
Enable: c.Sync.RealTime.Enable,
HotDelay: time.Duration(c.Sync.RealTime.HotDelay) * time.Minute,
Notify: notify,
PutChan: putCh,
DeleteChan: deleteCh,
Expand Down Expand Up @@ -82,7 +84,7 @@ func (w *Watcher) Watch() error {
hotKeySet := make(map[string]string)
var mu sync.Mutex

ticker := time.NewTicker(5 * time.Minute)
ticker := time.NewTicker(w.HotDelay)
defer ticker.Stop()

for {
Expand All @@ -108,7 +110,7 @@ func (w *Watcher) Watch() error {
// 判断文件是否热点文件,热点文件进行延迟更新,以节省流量和操作次数
if ttlset.Exists(event.Name) {
// 丢入set中,合并多个同名文件的事件(热点降温)
log.Debugf("Hotspot path, will be delay sync %s", event.Name)
log.Debugf("Hot path, will be delay sync %s", event.Name)
mu.Lock()
hotKeySet[event.Name] = ""
mu.Unlock()
Expand Down Expand Up @@ -148,6 +150,7 @@ func (w *Watcher) Watch() error {
}
mu.Unlock()
deleteKeys = deleteKeys[:0]

case err, ok := <-w.Notify.Errors:
if !ok {
continue
Expand Down

0 comments on commit 6bd90ed

Please sign in to comment.