-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathrecon.go
305 lines (291 loc) · 8.46 KB
/
recon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
package putiosync
import (
"sort"
"strings"
"github.com/cenkalti/log"
"github.com/putdotio/putio-sync/v2/internal/inode"
)
// Reconciliation function does not perform any operation.
// It only takes the filesystem state as input and returns operations to be performed on those filesystems.
// It must be testable without side effects.
func reconciliation(syncFiles map[string]*syncFile) []iJob {
	// Deterministic ordering makes the output reproducible and testable.
	files := sortSyncFiles(syncFiles)
	for _, sf := range files {
		log.Debugln(sf.String())
	}
	// Index files by remote ID and local inode for O(1) move detection.
	filesByRemoteID := mapRemoteFilesByID(syncFiles)
	filesByInode := mapLocalFilesByInode(syncFiles)
	var jobs []iJob
	// Pass 1: files with a state saved from a previous sync.
	// This must run first so that simple move operations are detected correctly.
	for _, sf := range files {
		if sf.state == nil {
			continue
		}
		for _, job := range syncWithState(sf, filesByRemoteID, filesByInode) {
			if job != nil {
				jobs = append(jobs, job)
			}
		}
	}
	// Pass 2: files seen for the first time. Move targets were marked
	// skip=true by pass 1 and must not be processed again.
	for _, sf := range files {
		if sf.state != nil || sf.skip {
			continue
		}
		if job := syncFresh(sf); job != nil {
			jobs = append(jobs, job)
		}
	}
	return jobs
}
// syncFresh decides the job for a file that has no saved state from a
// previous sync. It returns nil when there is nothing to do or when the
// file is in a conflicting situation that must be skipped.
func syncFresh(sf *syncFile) iJob {
	if sf.local != nil && sf.remote == nil {
		// File present only on local side. Copy to the remote side.
		if sf.local.Info().IsDir() {
			return &createRemoteFolderJob{relpath: sf.relpath}
		}
		return &uploadJob{localFile: sf.local}
	}
	if sf.local == nil && sf.remote != nil {
		// File present only on remote side. Copy to the local side.
		if sf.remote.PutioFile().IsDir() {
			return &createLocalFolderJob{
				relpath:  sf.relpath,
				remoteID: sf.remote.PutioFile().ID,
			}
		}
		return &downloadJob{remoteFile: sf.remote}
	}
	if sf.local != nil && sf.remote != nil {
		// File exists on both sides.
		localIsDir := sf.local.Info().IsDir()
		remoteIsDir := sf.remote.Info().IsDir()
		if localIsDir && remoteIsDir {
			// Dir exists on both sides, save this state to db.
			return &writeDirStateJob{
				remoteID: sf.remote.PutioFile().ID,
				relpath:  sf.remote.RelPath(),
			}
		}
		if localIsDir || remoteIsDir {
			// One of the sides is a dir, the other is a file.
			log.Warningf("Conflicting file, skipping sync: %q", sf.relpath)
			return nil
		}
		// Both sides are regular files.
		if sf.local.Info().Size() != sf.remote.PutioFile().Size {
			log.Warningf("File sizes differ, skipping sync: %q", sf.relpath)
			return nil
		}
		// Assume files are same if they are in same size.
		return &writeFileStateJob{
			localFile:  sf.local,
			remoteFile: sf.remote,
		}
	}
	// Neither side exists; nothing to do.
	return nil
}
// syncWithState decides the jobs for a file that has a record from a
// previous sync. It compares the current local and remote sides against the
// saved state and returns the operations needed to bring them back in sync.
//
// filesByRemoteID and filesByInode index all sync files by remote file ID
// and local inode; they are used to detect files moved to another path.
// When a move is detected, the move target is marked skip=true so the
// fresh-file pass in reconciliation does not process it again.
func syncWithState(sf *syncFile, filesByRemoteID map[int64]*syncFile, filesByInode map[uint64]*syncFile) []iJob {
	// We have a state from previous sync. Compare local and remote sides with existing state.
	switch sf.state.Status {
	case statusSynced:
		switch {
		case sf.local != nil && sf.remote != nil:
			// Exist on both sides
			if sf.local.Info().IsDir() && sf.remote.Info().IsDir() {
				// Both sides are directory
				return nil
			}
			if sf.local.Info().IsDir() || sf.remote.Info().IsDir() {
				// One of the sides is a file
				log.Warningf("Conflicting file, one side is a directory, skipping sync: %q", sf.relpath)
				return nil
			}
			// Change detection below is by size only, not checksum.
			if sf.state.Size != sf.local.Info().Size() && sf.state.Size != sf.remote.PutioFile().Size {
				log.Warningf("Conflicting file, both files have changed, skipping sync: %q", sf.relpath)
				return nil
			}
			if sf.state.Size != sf.local.Info().Size() {
				// Local file has changed
				return []iJob{
					&uploadJob{
						localFile: sf.local,
					},
				}
			}
			if sf.state.Size != sf.remote.PutioFile().Size {
				// Remote file has changed
				return []iJob{
					&downloadJob{
						remoteFile: sf.remote,
					},
				}
			}
			// Assume files didn't change if their size didn't change
			// This is the most common case that is executed most because once all files are in sync no operations will be done later.
			return nil
		case sf.local != nil && sf.remote == nil:
			// File missing in remote side, could be deleted or moved elsewhere
			target, ok := filesByRemoteID[sf.state.RemoteID]
			if ok { // nolint: nestif
				// File with the same id is found on another path
				if target.state == nil {
					// There is no existing state in move target
					if target.remote.PutioFile().CRC32 == sf.state.CRC32 {
						// Remote file is not changed
						// inode.Get error deliberately ignored: a zero inode
						// simply fails the comparison below and falls through
						// to the delete branch.
						in, _ := inode.Get(sf.local.FullPath(), sf.local.Info())
						if in == sf.state.LocalInode {
							// Local file is not changed too
							// Then, file must be moved. We can move the local file to same path.
							target.skip = true
							return []iJob{&moveLocalFileJob{
								localFile: sf.local,
								toRelpath: target.relpath,
								state:     *sf.state,
							}}
						}
					}
				}
			}
			// File is deleted on remote side
			return []iJob{&deleteLocalFileJob{
				localFile: sf.local,
				state:     *sf.state,
			}}
		case sf.local == nil && sf.remote != nil:
			// File missing in local side, could be deleted or moved elsewhere
			target, ok := filesByInode[sf.state.LocalInode]
			if ok { // nolint: nestif
				// File with same inode is found on another path
				if target.state == nil {
					if sf.remote.PutioFile().CRC32 == sf.state.CRC32 {
						// Remote file is not changed
						in, _ := inode.Get(target.local.FullPath(), target.local.Info())
						if in == sf.state.LocalInode {
							// Local file is not changed too
							// Then, file must be moved. We can move the remote file to same path.
							target.skip = true
							return []iJob{&moveRemoteFileJob{
								remoteFile: sf.remote,
								toRelpath:  target.relpath,
								state:      *sf.state,
							}}
						}
					}
				}
			}
			// File was deleted on local side
			return []iJob{&deleteRemoteFileJob{
				remoteFile: sf.remote,
				state:      *sf.state,
			}}
		case sf.local == nil && sf.remote == nil:
			// File deleted on both sides, handled by other cases above, let's delete the state.
			return []iJob{&deleteStateJob{
				state: *sf.state,
			}}
		default:
			log.Errorf("Unhandled case for %q", sf.relpath)
			return nil
		}
	case statusDownloading:
		// A download was in progress when the previous sync ended.
		if sf.local == nil && sf.remote != nil {
			if sf.remote.PutioFile().CRC32 == sf.state.CRC32 {
				// Remote file is still the same, resume download
				return []iJob{&downloadJob{
					remoteFile: sf.remote,
					state:      sf.state,
				}}
			}
		}
		// Cancel current download and make a new sync decision
		return []iJob{
			&deleteStateJob{
				state: *sf.state,
			},
			syncFresh(sf),
		}
	case statusUploading:
		// An upload was in progress when the previous sync ended.
		if sf.local != nil && sf.remote == nil {
			in, _ := inode.Get(sf.local.FullPath(), sf.local.Info())
			if in == sf.state.LocalInode {
				// Local file is still the same, resume upload
				return []iJob{&uploadJob{
					localFile: sf.local,
					state:     sf.state,
				}}
			}
		}
		// Cancel current upload and make a new sync decision
		return []iJob{
			&deleteStateJob{
				state: *sf.state,
			},
			syncFresh(sf),
		}
	default:
		// Invalid status, should not happen in normal usage.
		return []iJob{&deleteStateJob{
			state: *sf.state,
		}}
	}
}
// sortSyncFiles returns the files ordered so that the contents of a folder
// come before the folder itself.
// This is required for correct moving of folders with files inside.
// First, files are synced, then folder can be moved or deleted.
func sortSyncFiles(m map[string]*syncFile) []*syncFile {
	a := make([]*syncFile, 0, len(m))
	for _, sf := range m {
		a = append(a, sf)
	}
	// NOTE: the comparator must be a strict weak ordering for sort.Slice to
	// produce a defined result. The previous prefix-based comparison was not
	// transitive (e.g. "z/f" < "z" by the child rule, "z" < "z!" and
	// "z!" < "z/f" lexicographically — a cycle), so the sort could return an
	// arbitrary order. pathLess is a total order with the same
	// child-before-parent property.
	sort.Slice(a, func(i, j int) bool {
		return pathLess(a[i].relpath, a[j].relpath)
	})
	return a
}

// pathLess is a strict total order on slash-separated relative paths in
// which a path sorts before every one of its ancestors. It compares path
// components lexicographically, treating end-of-path as greater than any
// component; this is an ordinary lexicographic order over an extended
// alphabet, hence transitive.
func pathLess(x, y string) bool {
	xs := strings.Split(x, "/")
	ys := strings.Split(y, "/")
	for i := 0; i < len(xs) && i < len(ys); i++ {
		if xs[i] != ys[i] {
			return xs[i] < ys[i]
		}
	}
	// One path is a prefix of the other: the longer one (the child) first.
	return len(xs) > len(ys)
}
// isChildOf reports whether child lies somewhere under parent in the
// slash-separated path hierarchy.
func isChildOf(child, parent string) bool {
	rest := strings.TrimPrefix(child, parent+"/")
	// TrimPrefix returns the input unchanged when the prefix is absent.
	return rest != child
}
// mapRemoteFilesByID indexes the sync files that have a remote regular file
// by their put.io file ID. Directories and files with no remote side are
// left out.
func mapRemoteFilesByID(syncFiles map[string]*syncFile) map[int64]*syncFile {
	byID := make(map[int64]*syncFile, len(syncFiles))
	for _, sf := range syncFiles {
		if sf.remote == nil || sf.remote.Info().IsDir() {
			continue
		}
		byID[sf.remote.PutioFile().ID] = sf
	}
	return byID
}
// mapLocalFilesByInode indexes the sync files that have a local regular file
// by their filesystem inode number. Directories and files with no local side
// are left out; files whose inode cannot be read are logged and skipped.
func mapLocalFilesByInode(syncFiles map[string]*syncFile) map[uint64]*syncFile {
	byInode := make(map[uint64]*syncFile, len(syncFiles))
	for _, sf := range syncFiles {
		if sf.local == nil || sf.local.Info().IsDir() {
			continue
		}
		in, err := inode.Get(sf.local.FullPath(), sf.local.Info())
		if err != nil {
			log.Error(err)
			continue
		}
		byInode[in] = sf
	}
	return byInode
}