Skip to content

Commit

Permalink
完成定时任务全量check,优化变量命名,收拢实例化参数
Browse files Browse the repository at this point in the history
- 完成checkjob模块,支持定期执行全量对比补充
- 各模块实例化时尽量完成所需参数准备
- Put、Delete队列调整在一个方法中使用select消费
- 优化部分变量命名,使得更加一致
- 收拢对象在远端的Path到Storage模块中
- 支持单独开启或关闭实时与定时同步
  • Loading branch information
jorben committed Mar 15, 2024
1 parent eb14743 commit c0414bf
Show file tree
Hide file tree
Showing 5 changed files with 236 additions and 103 deletions.
106 changes: 106 additions & 0 deletions checkjob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package main

import (
"context"
"fmt"
"github.com/jorben/rsync-object-storage/config"
"github.com/jorben/rsync-object-storage/helper"
"github.com/jorben/rsync-object-storage/log"
"io/fs"
"path/filepath"
"time"
)

// CheckJob 定时问题
type CheckJob struct {
InitialDelay time.Duration
Interval int
Enable bool
PutChan chan string
LocalPrefix string
Ignore []string
Storage *Storage
}

// NewCheckJob 创建Job实例
func NewCheckJob(c *config.SyncConfig, ch chan string, storage *Storage) *CheckJob {
// 计算首次执行时间
now := time.Now()
targetTime, err := time.ParseInLocation("2006-01-02 15:04:05",
fmt.Sprintf("%d-%02d-%02d %s", now.Year(), now.Month(), now.Day(), c.Sync.CheckJob.StartAt), now.Location())
if err != nil {
// 格式不正确,设置为从0点开始
log.Errorf("Parse StartAt err: %s, Reset start at 0:0:0", err.Error())
targetTime = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
}

// 最高频率1小时一次
if c.Sync.CheckJob.Interval < 1 {
c.Sync.CheckJob.Interval = 1
}

// 如果时间已过去则+24小时看下一个启动时点
if now.After(targetTime) {
targetTime = targetTime.Add(24 * time.Hour)
}

return &CheckJob{
InitialDelay: targetTime.Sub(now),
Interval: c.Sync.CheckJob.Interval,
Enable: c.Sync.CheckJob.Enable,
Storage: storage,
LocalPrefix: c.Local.Path,
PutChan: ch,
Ignore: c.Sync.Ignore,
}
}

// Run Check job 启动入口
func (c *CheckJob) Run(ctx context.Context) {
if !c.Enable {
log.Debug("The check job is disabled")
return
}
log.Debugf("The check job will start at %s", time.Now().Add(c.InitialDelay).Format("2006-01-02 15:04:05"))
time.AfterFunc(c.InitialDelay, func() {
// 执行首次校对任务
go c.Walk(ctx)

// 创建周期定时器
ticker := time.NewTicker(time.Duration(c.Interval) * time.Minute)
defer ticker.Stop()
for range ticker.C {
// 执行周期校对任务
go c.Walk(ctx)
}
})
}

// Walk 遍历本地文件,对比与远端差异,存在差异的丢入变更队列
func (c *CheckJob) Walk(ctx context.Context) {
log.Info("Check job begin")
err := filepath.WalkDir(c.LocalPrefix, func(path string, d fs.DirEntry, err error) error {
if err != nil {
log.Errorf("WalkDir err: %s, skipping %s", err.Error(), path)
return filepath.SkipDir
}
// 在忽略名单的文件夹直接跳过
if d.IsDir() && helper.IsIgnore(path, c.Ignore) {
return filepath.SkipDir
}
// 对比文件
if !d.IsDir() && !helper.IsIgnore(path, c.Ignore) {
if isSame := c.Storage.IsSame(ctx, path); !isSame {
// 文件存在差异,丢入变更队列
c.PutChan <- path
log.Infof("Differences found %s", path)
}
}
return nil
})
if err != nil {
log.Errorf("WalkDir err: %s", err.Error())
return
}
log.Info("Check job ends")
}
23 changes: 15 additions & 8 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,12 @@ func main() {
// 输出配置供检查
fmt.Println(c.GetString())

ctx := context.Background()
PutChan := make(chan string)
DeleteChan := make(chan string)
defer close(PutChan)
defer close(DeleteChan)

// 检查本地路径可读性
if _, err = os.ReadDir(c.Local.Path); err != nil {
log.Fatalf("ReadDir err: %s", err.Error())
Expand All @@ -39,29 +45,30 @@ func main() {
if err != nil {
log.Fatalf("NewStorage err: %s", err.Error())
}
ctx := context.Background()
if err = s.BucketExists(ctx); err != nil {
log.Fatalf("BucketExist err: %s", err.Error())
}

// 创建Watcher实例
w, err := NewWatcher(c.Sync.Ignore)
w, err := NewWatcher(c, PutChan, DeleteChan)
if err != nil {
log.Fatalf("NewWatcher err: %s", err.Error())
}
defer w.Close()

// 创建CheckJob实例
j := NewCheckJob(c, PutChan, s)
// 异步处理定期对账任务
go j.Run(ctx)

// 异步处理变更事件
t := NewTransfer(c.Local.Path, c.Remote.Path, s)
t := NewTransfer(c, PutChan, DeleteChan, s)
for i := 0; i < 8; i++ {
go t.ModifyObject(ctx, w.ModifyCh)
go t.DeleteObject(ctx, w.DeleteCh)
go t.Run(ctx)
}

// 异步处理定期对账任务

// 监听本地路径
if err = w.Watch(c.Local.Path); err != nil {
if err = w.Watch(); err != nil {
log.Fatalf("Watch err: %s", err.Error())
}

Expand Down
53 changes: 33 additions & 20 deletions storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,14 @@ import (
"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
"os"
"strings"
)

type Storage struct {
Client *minio.Client
Bucket string
Prefix string
Minio *minio.Client
Bucket string
LocalPrefix string
RemotePrefix string
}

// NewStorage 获取对象存储客户端实例
Expand All @@ -29,15 +31,16 @@ func NewStorage(c *config.SyncConfig) (*Storage, error) {
return nil, err
}
return &Storage{
Client: cli,
Bucket: c.Remote.Bucket,
Prefix: c.Remote.Path,
Minio: cli,
Bucket: c.Remote.Bucket,
LocalPrefix: c.Local.Path,
RemotePrefix: c.Remote.Path,
}, nil
}

// BucketExists 判断Bucket是否存在
func (s *Storage) BucketExists(ctx context.Context) error {
exist, err := s.Client.BucketExists(ctx, s.Bucket)
exist, err := s.Minio.BucketExists(ctx, s.Bucket)
if err != nil {
return err
}
Expand All @@ -49,23 +52,25 @@ func (s *Storage) BucketExists(ctx context.Context) error {

// RemoveObject 删除对象
func (s *Storage) RemoveObject(ctx context.Context, objectName string) error {
_, err := s.Client.StatObject(ctx, s.Bucket, objectName, minio.StatObjectOptions{})
objectName = s.GetRemotePath(objectName)
_, err := s.Minio.StatObject(ctx, s.Bucket, objectName, minio.StatObjectOptions{})
if err != nil {
// 多半是Key不存在
log.Debugf("StatObject err: %s, path: %s", err.Error(), objectName)
return nil
}

return s.Client.RemoveObject(ctx, s.Bucket, objectName, minio.RemoveObjectOptions{})
return s.Minio.RemoveObject(ctx, s.Bucket, objectName, minio.RemoveObjectOptions{})

}

// RemoveObjects 批量删除对象
func (s *Storage) RemoveObjects(ctx context.Context, objectPath string) (someError error) {
objectsCh := make(chan minio.ObjectInfo)
ch := make(chan minio.ObjectInfo)
objectPath = s.GetRemotePath(objectPath)
go func() {
defer close(objectsCh)
for object := range s.Client.ListObjects(ctx, s.Bucket,
defer close(ch)
for object := range s.Minio.ListObjects(ctx, s.Bucket,
minio.ListObjectsOptions{Prefix: objectPath, Recursive: true}) {
if object.Err != nil {
log.Errorf("ListObjects err: %s", object.Err.Error())
Expand All @@ -74,23 +79,24 @@ func (s *Storage) RemoveObjects(ctx context.Context, objectPath string) (someErr
if object.Key == objectPath ||
(len(object.Key) > len(objectPath) && objectPath+"/" == object.Key[0:len(objectPath)+1]) {
// 避免误删了前缀相同但非子文件,比如 abc abcd.txt
objectsCh <- object
log.Debugf("Will be delete %s", object.Key)
ch <- object
log.Infof("Will be delete %s", object.Key)
}
}
}()

someError = nil
opts := minio.RemoveObjectsOptions{GovernanceBypass: true}
for err := range s.Client.RemoveObjects(ctx, s.Bucket, objectsCh, opts) {
for err := range s.Minio.RemoveObjects(ctx, s.Bucket, ch, opts) {
someError = err.Err
log.Errorf("RemoveObjects err: %s, path: %s", err.Err.Error(), err.ObjectName)
}
return someError
}

// FPutObject 上传对象
func (s *Storage) FPutObject(ctx context.Context, localPath string, objectName string) error {
func (s *Storage) FPutObject(ctx context.Context, localPath string) error {
objectName := localPath
if isDir, _ := helper.IsDir(localPath); isDir {
// 如果是文件夹则创建objectName/.keep文件,现有接口不支持直接创建空文件夹
objectName += "/.keep"
Expand All @@ -106,21 +112,23 @@ func (s *Storage) FPutObject(ctx context.Context, localPath string, objectName s
}
} else {
// 文件 则需要对远端内容一致性比较,内容一致则不重复上传
if isSame := s.IsSame(ctx, localPath, objectName); isSame {
if isSame := s.IsSame(ctx, localPath); isSame {
log.Debugf("Consistent, skipping %s", localPath)
return nil
}
}

if _, err := s.Client.FPutObject(ctx, s.Bucket, objectName, localPath, minio.PutObjectOptions{}); err != nil {
objectName = s.GetRemotePath(objectName)
if _, err := s.Minio.FPutObject(ctx, s.Bucket, objectName, localPath, minio.PutObjectOptions{}); err != nil {
return err
}
return nil
}

// IsSame 判断本地文件和远端文件内容是否一致
func (s *Storage) IsSame(ctx context.Context, localPath string, objectName string) bool {
objectInfo, err := s.Client.StatObject(ctx, s.Bucket, objectName, minio.StatObjectOptions{})
func (s *Storage) IsSame(ctx context.Context, localPath string) bool {
objectName := s.GetRemotePath(localPath)
objectInfo, err := s.Minio.StatObject(ctx, s.Bucket, objectName, minio.StatObjectOptions{})
if err != nil {
// 多半是Key不存在
log.Debugf("StatObject %s, path: %s", err.Error(), objectName)
Expand All @@ -134,3 +142,8 @@ func (s *Storage) IsSame(ctx context.Context, localPath string, objectName strin
}
return false
}

// GetRemotePath 把本地路径映射远端路径
func (s *Storage) GetRemotePath(path string) string {
return strings.TrimLeft(strings.Replace(path, s.LocalPrefix, s.RemotePrefix, 1), "/")
}
Loading

0 comments on commit c0414bf

Please sign in to comment.