-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathmemdb_iterator.go
157 lines (139 loc) · 3.52 KB
/
memdb_iterator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package db
import (
"bytes"
"context"
"github.com/google/btree"
)
// chBufferSize is the capacity of the channel that carries items from the traversal goroutine
// to the iterator. With an unbuffered channel every item costs two context switches; a buffer
// lets more work happen per switch. The value was tuned with benchmarks.
const chBufferSize = 64
// memDBIterator iterates over the items of a MemDB. The items are produced by a background
// traversal goroutine and handed over through a channel.
type memDBIterator struct {
	// ch delivers items from the traversal goroutine; it is closed once the traversal ends.
	ch <-chan *item

	// cancel cancels the context watched by the traversal goroutine.
	cancel context.CancelFunc

	// item is the item the iterator currently points at; nil means the iterator is invalid.
	item *item

	// start and end are the domain bounds the iterator was created with.
	start, end []byte

	// useMtx records whether the constructor was asked to take the database read lock.
	useMtx bool
}
// Compile-time check that *memDBIterator satisfies the Iterator interface.
var _ Iterator = (*memDBIterator)(nil)
// newMemDBIterator returns a memDBIterator over db for the given bounds and direction. It
// always asks newMemDBIteratorMtxChoice to take the database read lock.
func newMemDBIterator(db *MemDB, start []byte, end []byte, reverse bool) *memDBIterator {
	const withMtx = true
	return newMemDBIteratorMtxChoice(db, start, end, reverse, withMtx)
}
// newMemDBIteratorMtxChoice builds a memDBIterator over db's btree for the domain
// [start, end). A nil bound is treated as open on that side. When reverse is set, the
// btree's Descend* traversals are used instead of the Ascend* ones.
//
// The traversal runs in its own goroutine and feeds items into a channel buffered to
// chBufferSize. It ends when the range is exhausted or when the iterator's cancel function is
// called, after which the channel is closed.
//
// If useMtx is true, db.mtx is read-locked before this function returns and is released by
// the traversal goroutine when it exits; if false, no locking is done here.
//
// The returned iterator is already primed with the first item, if there is one.
func newMemDBIteratorMtxChoice(db *MemDB, start []byte, end []byte, reverse bool, useMtx bool) *memDBIterator {
	ctx, cancel := context.WithCancel(context.Background())
	items := make(chan *item, chBufferSize)

	// The lock is taken on the calling goroutine, so it is held by the time we return; the
	// traversal goroutine is responsible for releasing it.
	if useMtx {
		db.mtx.RLock()
	}

	go func() {
		if useMtx {
			defer db.mtx.RUnlock()
		}

		// Reverse ranges are [start, end) for us but (start, end] for btree, so the visitor
		// enforces the difference itself using these two keys.
		var (
			skipEqual     []byte
			abortLessThan []byte
		)

		visit := func(bi btree.Item) bool {
			cur := bi.(*item)
			switch {
			case skipEqual != nil && bytes.Equal(cur.key, skipEqual):
				// Exclude the end key once, then stop checking for it.
				skipEqual = nil
				return true
			case abortLessThan != nil && bytes.Compare(cur.key, abortLessThan) < 0:
				// Walked past the inclusive start bound: end the traversal.
				return false
			}
			select {
			case items <- cur:
				return true
			case <-ctx.Done():
				return false
			}
		}

		unbounded := start == nil && end == nil
		if !reverse {
			switch {
			case unbounded:
				db.btree.Ascend(visit)
			case end == nil:
				// Needs its own case, because nil is considered less than anything else.
				db.btree.AscendGreaterOrEqual(newKey(start), visit)
			default:
				db.btree.AscendRange(newKey(start), newKey(end), visit)
			}
		} else {
			switch {
			case unbounded:
				db.btree.Descend(visit)
			case end == nil:
				// Stop once below start, since we use [start, end) while btree uses
				// (start, end].
				abortLessThan = start
				db.btree.Descend(visit)
			default:
				// Drop end and stop once below start, since we use [start, end) while
				// btree uses (start, end].
				skipEqual = end
				abortLessThan = start
				db.btree.DescendLessOrEqual(newKey(end), visit)
			}
		}
		close(items)
	}()

	return &memDBIterator{
		ch:     items,
		cancel: cancel,
		// Prime the iterator: this receive blocks until the first item arrives, or yields nil
		// if the channel is closed without any item having been sent.
		item:   <-items,
		start:  start,
		end:    end,
		useMtx: useMtx,
	}
}
// Close implements Iterator. It cancels the traversal, receives from the channel until it is
// closed, and invalidates the iterator. The returned error is always nil.
func (i *memDBIterator) Close() error {
	i.cancel()
	// Discard whatever is still buffered; the loop ends once the channel has been closed.
	for {
		if _, open := <-i.ch; !open {
			break
		}
	}
	i.item = nil
	return nil
}
// Domain implements Iterator. It returns the start and end bounds stored on the iterator.
func (i *memDBIterator) Domain() ([]byte, []byte) {
	lower, upper := i.start, i.end
	return lower, upper
}
// Valid implements Iterator. It reports whether the iterator currently points at an item.
func (i *memDBIterator) Valid() bool {
	current := i.item
	return current != nil
}
// Next implements Iterator. It advances to the next item received from the channel, or
// invalidates the iterator when the channel has been closed. It panics if the iterator is
// already invalid.
func (i *memDBIterator) Next() {
	i.assertIsValid()
	if next, open := <-i.ch; open {
		i.item = next
		return
	}
	i.item = nil
}
// Error implements Iterator. This iterator never records an error, so the result is always nil.
func (i *memDBIterator) Error() error {
	var none error
	return none
}
// Key implements Iterator. It returns the key of the current item without copying it, and
// panics if the iterator is invalid.
func (i *memDBIterator) Key() []byte {
	i.assertIsValid()
	current := i.item
	return current.key
}
// Value implements Iterator. It returns the value of the current item without copying it, and
// panics if the iterator is invalid.
func (i *memDBIterator) Value() []byte {
	i.assertIsValid()
	current := i.item
	return current.value
}
// assertIsValid panics with "iterator is invalid" unless Valid reports true.
func (i *memDBIterator) assertIsValid() {
	if i.Valid() {
		return
	}
	panic("iterator is invalid")
}