Skip to content

Commit

Permalink
refactor(rclone): use golang.org/x/sync/semaphore instead of a channel
Browse files Browse the repository at this point in the history
  • Loading branch information
RoyXiang committed Jul 28, 2024
1 parent 3fbd733 commit 440800d
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 34 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/rclone/rclone v1.67.0
github.com/samber/lo v1.46.0
golang.org/x/oauth2 v0.21.0
golang.org/x/sync v0.7.0
)

require (
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ golang.org/x/oauth2 v0.21.0 h1:tsimM75w1tF/uws5rbeHzIWxEqElMehnc+iW793zsZs=
golang.org/x/oauth2 v0.21.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
Expand Down
24 changes: 5 additions & 19 deletions rclone/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,34 +22,20 @@ func rcDumpConfig() map[string]RemoteConfig {

func rcCopyDir(src, dest string) bool {
args := append([]string{"copy", src, dest}, moveArgs...)

lArgs := append(args, largeFileArgs...)
if !rcExecCmd(argLargeFileTransfers*2, lArgs...) {
if !rcExecCmd(lArgs...) {
return false
}

sArgs := append(args, smallFileArgs...)
return rcExecCmd(argSmallFileTransfers, sArgs...)
return rcExecCmd(sArgs...)
}

func rcCopyFile(src, dest string, filesize int64) bool {
func rcCopyFile(src, dest string) bool {
args := append([]string{"copyto", src, dest, "--transfers=1", "--checkers=2"}, moveArgs...)
if filesize < argMultiThreadCutoff {
return rcExecCmd(1, args...)
}
return rcExecCmd(2, args...)
return rcExecCmd(args...)
}

func rcExecCmd(transfers int, args ...string) bool {
for i := 0; i < transfers; i++ {
transferQueue <- struct{}{}
}
defer func() {
for i := 0; i < transfers; i++ {
<-transferQueue
}
}()

func rcExecCmd(args ...string) bool {
cmdArgs := append([]string{"--quiet"}, args...)
cmd := exec.Command("rclone", cmdArgs...)
cmd.Env = cmdEnv
Expand Down
18 changes: 9 additions & 9 deletions rclone/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/RoyXiang/putcallback/putio"
"github.com/rclone/rclone/fs"
"github.com/rclone/rclone/fs/fspath"
"golang.org/x/sync/semaphore"
)

var (
Expand All @@ -26,28 +27,27 @@ var (
excludeFileTypes []string

argMultiThreadCutoff int64
argLargeFileTransfers int
argSmallFileTransfers int
argMaxTransfers int
argLargeFileTransfers int64
argSmallFileTransfers int64
argMaxTransfers int64

cmdEnv []string
moveArgs []string
largeFileArgs []string
smallFileArgs []string

taskChan chan *putio.FileInfo
transferQueue chan struct{}
taskChan chan *putio.FileInfo
transferSem *semaphore.Weighted

callbackMu sync.Mutex
folderMu sync.Mutex
workerWg sync.WaitGroup
)

func init() {
rcGlobalConfig := fs.GetConfig(nil)
argMultiThreadCutoff = int64(rcGlobalConfig.MultiThreadCutoff)
argLargeFileTransfers = rcGlobalConfig.Transfers
argSmallFileTransfers = rcGlobalConfig.Transfers * 2
argLargeFileTransfers = int64(rcGlobalConfig.Transfers)
argSmallFileTransfers = argLargeFileTransfers * 2
argMaxTransfers = argSmallFileTransfers + 2

moveArgs = []string{
Expand Down Expand Up @@ -109,7 +109,7 @@ func init() {
Put = putio.New(accessToken, maxTransfers)

taskChan = make(chan *putio.FileInfo, 1)
transferQueue = make(chan struct{}, argMaxTransfers)
transferSem = semaphore.NewWeighted(int64(argMaxTransfers))
}

func Start() {
Expand Down
27 changes: 21 additions & 6 deletions rclone/worker.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rclone

import (
"context"
"fmt"
"log"
"path"
Expand Down Expand Up @@ -85,17 +86,19 @@ func checkBeforeTransfer(info *putio.FileInfo) bool {
}

func moveFolder(folder *putio.FileInfo) {
folderMu.Lock()
defer func() {
workerWg.Done()
folderMu.Unlock()
}()
defer workerWg.Done()

if !checkBeforeTransfer(folder) {
log.Printf("Folder %s skipped", folder.Name)
return
}

if err := transferSem.Acquire(context.Background(), argSmallFileTransfers); err != nil {
log.Printf("Failed acquiring semaphore while moving folder %s", folder.Name)
return
}
defer transferSem.Release(argSmallFileTransfers)

if folder.Size > 0 {
log.Printf("Moving folder %s...", folder.Name)

Expand Down Expand Up @@ -127,6 +130,18 @@ func moveFile(file *putio.FileInfo) {
return
}

var weight int64
if file.Size < argMultiThreadCutoff {
weight = 1
} else {
weight = 2
}
if err := transferSem.Acquire(context.Background(), weight); err != nil {
log.Printf("Failed acquiring semaphore while moving file %s", file.Name)
return
}
defer transferSem.Release(weight)

newFilename := file.Name
if strings.HasPrefix(file.ContentType, putio.ContentTypeVideo) {
switch renamingStyle {
Expand All @@ -139,7 +154,7 @@ func moveFile(file *putio.FileInfo) {

src := remoteSrc.FullPath(file.FullPath, true)
dest := remoteDest.FullPath(newFilename, false)
if rcCopyFile(src, dest, file.Size) {
if rcCopyFile(src, dest) {
Put.DeleteFile(file.ID)
if file.Name == newFilename {
notification.Send(fmt.Sprintf("%s moved", file.Name))
Expand Down

0 comments on commit 440800d

Please sign in to comment.