-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtxn.go
195 lines (167 loc) · 4.85 KB
/
txn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package lmdb
import (
"errors"
"fmt"
mdb "github.com/libreoscar/gomdb"
)
// ReadTxner is the read-only surface of a transaction: bucket statistics,
// point lookups and iteration. ReadTxn in this file provides all four methods.
type ReadTxner interface {
// BucketStat returns the statistics of {bucket}.
BucketStat(bucket string) *Stat
// Get looks up {key} in {bucket}; the bool reports whether the key exists.
Get(bucket string, key []byte) ([]byte, bool)
// GetNoCopy is the same lookup as Get; ReadTxn implements it with the
// binding's no-copy accessor instead of the copying one.
GetNoCopy(bucket string, key []byte) ([]byte, bool)
// Iterate returns an iterator over {bucket}. ReadTxn's implementation
// returns nil when the bucket is empty.
Iterate(bucket string) *Iterator
}
// ReadTxn is a transaction handle exposing the read operations of this package.
//
// NOTE: field order matters; the struct is built with an unkeyed composite
// literal elsewhere in this file.
type ReadTxn struct {
// Maps bucket names to their DBI handles.
// In this package, a DBI is obtained only through Open/Open2, and is never closed until
// Context.Close(), in which all dbis are closed automatically.
buckets map[string]mdb.DBI
// The underlying gomdb transaction all operations are issued against.
txn *mdb.Txn
// Cached iterators in the current transaction, will be closed when txn finishes.
itrs []*Iterator
}
// ReadWriteTxn extends ReadTxn (embedded) with write operations and nested
// transactions.
//
// NOTE: field order matters; the struct is built with an unkeyed composite
// literal elsewhere in this file.
type ReadWriteTxn struct {
// Environment used to begin nested transactions.
env *mdb.Env
*ReadTxn
// Set of keys touched by Put/Delete; each map key is a serialized CellKey.
// A nil map means dirty-key tracking is disabled for this transaction.
dirtyKeys map[string]bool // the key is serialized CellKey
}
//--------------------------------- ReadTxn -------------------------------------------------------
// getBucketId resolves {bucket} to its DBI handle. Internal use only.
// Panics if {bucket} is not known to this transaction.
func (txn *ReadTxn) getBucketId(bucket string) mdb.DBI {
	dbi, found := txn.buckets[bucket]
	if found {
		return dbi
	}
	panic(fmt.Errorf("bucket does not exist: %s", bucket))
}
// BucketStat returns the statistics of {bucket}.
// Panics if {bucket} does not exist or if the underlying Stat call fails.
func (txn *ReadTxn) BucketStat(bucket string) *Stat {
	dbi := txn.getBucketId(bucket)
	raw, statErr := txn.txn.Stat(dbi)
	if statErr != nil {
		// Possible errors: EINVAL, MDB_BAD_TXN
		panic(statErr)
	}
	return (*Stat)(raw)
}
// IsBucketEmpty reports whether {bucket} contains no items.
// Panics if {bucket} does not exist.
//
// The iterator opened for the probe is released before returning, so repeated
// emptiness checks do not pile up open cursors in the transaction's cache.
func (txn *ReadTxn) IsBucketEmpty(bucket string) bool {
	itr := txn.Iterate(bucket)
	if itr == nil {
		// Iterate already closed its cursor on the empty path.
		return true
	}
	// Iterate caches a successful iterator as the last element of txn.itrs.
	// Pop it and close it right away; nobody else holds a reference to it.
	// If the cache does not look as expected, leave the iterator in place so
	// it is still closed exactly once when the transaction finishes.
	if n := len(txn.itrs); n > 0 && txn.itrs[n-1] == itr {
		txn.itrs[n-1] = nil
		txn.itrs = txn.itrs[:n-1]
		itr.Close()
	}
	return false
}
// Get looks up {key} in {bucket}.
// Returns {val, true} if {key} exists and {nil, false} if it does not.
// Panics if {bucket} does not exist or on any lookup error other than NotFound.
func (txn *ReadTxn) Get(bucket string, key []byte) ([]byte, bool) {
	val, getErr := txn.txn.GetVal(txn.getBucketId(bucket), key)
	switch {
	case getErr == nil:
		return val.Bytes(), true
	case getErr == mdb.NotFound:
		return nil, false
	default:
		// Possible errors: EINVAL, MDB_BAD_TXN, MDB_BAD_VALSIZE, etc
		panic(getErr)
	}
}
// GetNoCopy looks up {key} in {bucket} and returns the value through the
// binding's BytesNoCopy accessor.
// Returns {val, true} if {key} exists and {nil, false} if it does not.
// Panics if {bucket} does not exist or on any lookup error other than NotFound.
// NOTE(review): presumably the returned slice aliases memory owned by the
// transaction; verify against gomdb before retaining it.
func (txn *ReadTxn) GetNoCopy(bucket string, key []byte) ([]byte, bool) {
	dbi := txn.getBucketId(bucket)
	val, lookupErr := txn.txn.GetVal(dbi, key)
	if lookupErr == mdb.NotFound {
		return nil, false
	}
	if lookupErr != nil {
		// Possible errors: EINVAL, MDB_BAD_TXN, MDB_BAD_VALSIZE, etc
		panic(lookupErr)
	}
	return val.BytesNoCopy(), true
}
// Iterate opens a cursor on {bucket} and positions it on the first item.
// Returns nil if the bucket is empty (the cursor is closed in that case).
// A non-nil iterator is cached in the transaction's iterator list.
// Panics if {bucket} does not exist or the cursor cannot be opened.
func (txn *ReadTxn) Iterate(bucket string) *Iterator {
	dbi := txn.getBucketId(bucket)
	cursor, openErr := txn.txn.CursorOpen(dbi)
	if openErr != nil {
		panic(openErr)
	}
	it := (*Iterator)(cursor)
	if !it.SeekFirst() {
		// Nothing to iterate over: release the cursor right away.
		it.Close()
		return nil
	}
	txn.itrs = append(txn.itrs, it)
	return it
}
//--------------------------------- ReadWriteTxn --------------------------------------------------
// TransactionalRW runs {f} inside a nested read-write transaction whose parent
// is this transaction.
//
// If {f} returns nil and does not panic, the nested transaction is committed
// and its dirty keys (if tracked) are merged into the parent's. Otherwise the
// nested transaction is aborted; an error from {f} is returned to the caller
// and a panic from {f} is re-raised after the abort.
//
// Panics if the nested transaction cannot be started or committed.
func (parent *ReadWriteTxn) TransactionalRW(f func(*ReadWriteTxn) error) (err error) {
// err here is the named result, not a new variable; only txn is newly declared.
txn, err := parent.env.BeginTxn(parent.txn, 0)
if err != nil { // Possible Errors: MDB_PANIC, MDB_MAP_RESIZED, MDB_READERS_FULL, ENOMEM
panic(err)
}
var panicF interface{} // panic from f
// Track dirty keys in the child only when the parent tracks them.
var subDirtyKeys map[string]bool
if parent.dirtyKeys != nil {
subDirtyKeys = make(map[string]bool)
}
// The child shares the parent's bucket table and starts with no cached iterators.
rwCtx := ReadWriteTxn{parent.env, &ReadTxn{parent.buckets, txn, nil}, subDirtyKeys}
// Registered before f runs; executes after the inner func below has stored
// f's error in err and any recovered panic in panicF.
defer func() {
// Close the child's cached iterators before the transaction is finished.
for _, itr := range rwCtx.itrs {
itr.Close() // no panic
}
rwCtx.itrs = nil
if err == nil && panicF == nil {
e := txn.Commit()
if e != nil { // Possible errors: EINVAL, ENOSPEC, EIO, ENOMEM
panic(e)
}
// Sanity check: parent and child must agree on whether dirty keys are tracked.
if (parent.dirtyKeys == nil) != (rwCtx.dirtyKeys == nil) {
panic(fmt.Errorf("unexpected error"))
}
// Propagate the child's dirty keys to the parent only after a successful commit.
for dirtyKey := range rwCtx.dirtyKeys {
parent.dirtyKeys[dirtyKey] = true
}
} else {
txn.Abort()
if panicF != nil {
panic(panicF) // re-panic
}
}
}()
// Run f in its own frame so that a panic is captured into panicF instead of
// unwinding past the deferred commit/abort logic above.
func() {
defer func() {
panicF = recover()
}()
err = f(&rwCtx)
}()
return
}
// ApplyPatch replays every cell of {patch} onto this transaction: a cell
// marked as existing is written with Put, any other cell is removed with
// Delete. The returned error is always nil; failures surface as panics from
// Put/Delete.
func (txn *ReadWriteTxn) ApplyPatch(patch TxnPatch) error {
	for _, c := range patch {
		if !c.exists {
			txn.Delete(c.bucket, c.key)
			continue
		}
		txn.Put(c.bucket, c.key, c.value)
	}
	return nil
}
// ClearBucket drops the contents of {bucket} (Drop is called with del = 0).
//
// Panics if {bucket} does not exist, if the transaction is tracking dirty keys
// (clearing a bucket is not supported while making a TxnPatch), or if the
// underlying Drop call fails.
func (txn *ReadWriteTxn) ClearBucket(bucket string) {
	dbi := txn.getBucketId(bucket)
	// Reject the unsupported case before touching any data, so the bucket is
	// never dropped by a call that is about to fail anyway.
	if txn.dirtyKeys != nil {
		// currently we do not support this operation when making a TxnPatch
		panic(errors.New("Encountered ClearBucket operation when making a TxnPatch"))
	}
	if err := txn.txn.Drop(dbi, 0); err != nil {
		// Possible errors: EINVAL, EACCES, MDB_BAD_DBI
		panic(err)
	}
}
// Put stores {val} under {key} in {bucket} and, when dirty-key tracking is
// enabled, records the cell as dirty.
// Panics if {bucket} does not exist or the write fails.
func (txn *ReadWriteTxn) Put(bucket string, key, val []byte) {
	dbi := txn.getBucketId(bucket)
	if putErr := txn.txn.Put(dbi, key, val, 0); putErr != nil {
		// Possible errors: MDB_MAP_FULL, MDB_TXN_FULL, EACCES, EINVAL
		panic(putErr)
	}
	if txn.dirtyKeys == nil {
		return
	}
	cell := CellKey{bucket, key}
	txn.dirtyKeys[cell.Serialize()] = true
}
// Delete removes {key} from {bucket}; a missing key is not an error.
// When dirty-key tracking is enabled the cell is recorded as dirty, whether
// or not the key was present.
// Panics if {bucket} does not exist or on any error other than NotFound.
func (txn *ReadWriteTxn) Delete(bucket string, key []byte) {
	dbi := txn.getBucketId(bucket)
	switch delErr := txn.txn.Del(dbi, key, nil); delErr {
	case nil, mdb.NotFound:
		// Removed, or nothing to remove.
	default:
		// Possible errors: EINVAL, EACCES, MDB_BAD_TXN
		panic(delErr)
	}
	if txn.dirtyKeys == nil {
		return
	}
	cell := CellKey{bucket, key}
	txn.dirtyKeys[cell.Serialize()] = true
}