-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdatabase.go
300 lines (256 loc) · 7.07 KB
/
database.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package lmdb
import (
"errors"
"fmt"
mdb "github.com/libreoscar/gomdb"
"log"
)
// Thread Safety
// 1) NOTLS mode is used exclusively, which allows read txns to freely migrate across
// threads and for a single thread to maintain multiple read txns. This enables mostly
// care-free use of read txns.
// 2) Most objects can be safely called by a single caller from a single thread, and usually it
// only makes sense to have a single caller, except in the case of Database.
// 3) Most Database methods are thread-safe, and may be called concurrently, except for
// Database.Close().
// 4) A write txn may only be used from the thread it was created on.
// 5) A read-only txn can move across threads, but it cannot be used concurrently from multiple
// threads.
// 6) Iterator is not thread-safe, but it does not make sense to use it on any thread except the
// thread that currently owns its associated txn.
//
//----------------------------------------------------------------------------------------
//
// Best practice:
// 1) Use iterators only within the txn in which they were created.
// 2) DO NOT modify the memory slice from GetNoCopy
// 3) Make sure all read/write txns are finished before Database.Close().
// Defaults that Open passes to Open2.
const (
// MAP_SIZE_DEFAULT is the default maximum map size, in bytes, handed to
// env.SetMapSize by Open2 when the database is opened through Open.
// There is no penalty for making this huge.
// If you are on a 32-bit system, use Open2 and specify a smaller map size.
MAP_SIZE_DEFAULT uint64 = 1 * 1024 * 1024 * 1024 * 1024 // 1 TiB (1024^4 bytes)
// MAX_DB_DEFAULT is the default maximum number of named databases (buckets)
// handed to env.SetMaxDBs. Open2 raises the limit when more buckets than this
// are requested.
// Background on choosing the value:
// http://www.openldap.org/lists/openldap-technical/201305/msg00176.html
MAX_DB_DEFAULT int = 32
)
// RWTxnCreator is anything that can run a callback inside a read-write
// transaction. *Database satisfies it through Database.TransactionalRW;
// DryRunRWTxn and MakePatch accept it so they are not tied to *Database.
type RWTxnCreator interface {
// TransactionalRW runs the callback with a read-write transaction and returns
// an error. In the Database implementation the transaction is committed when
// the callback returns nil and aborted otherwise, and the callback's error is
// returned unchanged; other implementations are presumably expected to do the
// same (DryRunRWTxn relies on getting the callback's error back).
TransactionalRW(func(*ReadWriteTxn) error) error
}
// dryRunDummyError is a sentinel error type. Dry-run helpers return it from a
// transaction callback that actually succeeded, purely to make the surrounding
// transaction roll back; it is filtered out again before reaching the caller.
type dryRunDummyError struct{}

// Error implements the error interface with a fixed message.
func (dryRunDummyError) Error() string {
	return "Dummy error of dry running a db transaction."
}
// DryRunRWTxn executes f inside a read-write transaction obtained from rwtxner
// and always asks for a rollback, even when f succeeds.
//
// When f returns nil the callback reports a dryRunDummyError instead, so the
// transaction is not committed; that sentinel is translated back to nil before
// returning. Any other error (from f or from rwtxner) is returned unchanged.
func DryRunRWTxn(rwtxner RWTxnCreator, f func(*ReadWriteTxn) error) error {
	outcome := rwtxner.TransactionalRW(func(rwtx *ReadWriteTxn) error {
		if failure := f(rwtx); failure != nil {
			return failure
		}
		// Success: still return an error so the transaction is rolled back.
		return dryRunDummyError{}
	})

	switch outcome.(type) {
	case dryRunDummyError:
		// Only our own sentinel means "f succeeded".
		return nil
	default:
		return outcome
	}
}
// MakePatch is a dry run that additionally reports what f would have written.
//
// f runs inside a read-write transaction from rwtxner with a fresh dirty-key
// set. If f succeeds, every key it dirtied is read back through the same
// transaction and recorded in patch (bucket, key, current value and whether the
// key exists). The transaction is then rolled back by returning a
// dryRunDummyError, which is converted back to a nil error for the caller.
//
// Returns:
//   - patch: one cellState per dirtied key; nil when f fails. The order follows
//     Go map iteration over the dirty-key set and is therefore unspecified.
//   - err: the error returned by f, or the error returned by rwtxner itself when
//     the transaction could not be run at all; nil on success.
//
// Panics if a dirty key cannot be deserialized, since that indicates corrupted
// internal bookkeeping rather than a caller error.
func MakePatch(rwtxner RWTxnCreator, f func(*ReadWriteTxn) error) (patch TxnPatch, err error) {
	txErr := rwtxner.TransactionalRW(func(rwtxn *ReadWriteTxn) error {
		// Track only the keys touched by f; the previous set is restored below.
		origin := rwtxn.dirtyKeys
		rwtxn.dirtyKeys = make(map[string]bool)

		err = f(rwtxn)
		if err == nil {
			for serializedCellKey := range rwtxn.dirtyKeys {
				cellKey, err2 := DeserializeCellKey(serializedCellKey)
				if err2 != nil {
					panic(fmt.Errorf("deserialization error: %s, serialized key = %v",
						err2.Error(), serializedCellKey))
				}
				cell := cellState{bucket: cellKey.Bucket, key: cellKey.Key}
				cell.value, cell.exists = rwtxn.Get(cellKey.Bucket, cellKey.Key)
				patch = append(patch, cell)
			}
			// Force a rollback even though f succeeded.
			err = dryRunDummyError{}
		}

		rwtxn.dirtyKeys = origin
		return err
	})

	// The callback always leaves err non-nil when it ran. A nil err here means
	// the callback was never invoked, so surface whatever the creator reported
	// instead of silently returning an empty patch with no error.
	if err == nil {
		err = txErr
	}
	if _, ok := err.(dryRunDummyError); ok {
		err = nil
	}
	return patch, err
}
//--------------------------------- Database ---------------------------------------------
// Database bundles an opened mdb environment with the DBI handles of the
// buckets that were opened (or created) through Open/Open2.
type Database struct {
// env is the underlying mdb environment. Open2 resets it to nil when opening
// fails, and Close skips a nil env.
env *mdb.Env
// buckets maps a bucket name to its DBI handle.
// In this package, a DBI is obtained only through Open/Open2, and is never closed until
// Database.Close(), where closing the environment closes all DBIs automatically.
buckets map[string]mdb.DBI
}
// Stat is a named type over mdb.Stat; Database.Stat returns a pointer to it.
type Stat mdb.Stat
// Info is a named type over mdb.Info; Database.Info returns a pointer to it.
type Info mdb.Info
// Version returns the version string reported by the underlying mdb binding
// (it forwards to mdb.Version).
func Version() string {
return mdb.Version()
}
// Open opens the database at path and makes sure the named buckets exist,
// using the package defaults MAP_SIZE_DEFAULT and MAX_DB_DEFAULT.
// It is a convenience wrapper around Open2; use Open2 directly to choose a
// different map size (e.g. on 32-bit systems) or bucket limit.
func Open(path string, buckets []string) (*Database, error) {
return Open2(path, buckets, MAP_SIZE_DEFAULT, MAX_DB_DEFAULT)
}
// Open2 creates an mdb environment, configures it, opens it at path and then
// opens (creating if necessary) every bucket in buckets.
//
// Parameters:
//   - path: location handed to env.Open.
//   - buckets: names of the buckets to open or create.
//   - maxMapSize: value for env.SetMapSize.
//   - maxDB: value for env.SetMaxDBs; raised to len(buckets) when it is smaller.
//
// A non-nil *Database is returned even on failure. When a step after
// environment creation fails, the error is logged, the environment is closed
// and db.env is reset to nil so that a later Close is a no-op.
func Open2(path string, buckets []string, maxMapSize uint64, maxDB int) (db *Database, err error) {
	// Environment flags, spelled out numerically as in the LMDB headers.
	const (
		flagNoTLS       = uint(0x200000) // MDB_NOTLS
		flagNoReadAhead = uint(0x800000) // MDB_NORDAHEAD
	)

	// The DBI limit must at least cover the buckets that were requested.
	if len(buckets) > maxDB {
		maxDB = len(buckets)
	}

	// TODO: (Potential bug):
	// According to the documentation of mdb_env_open,
	// "If this function fails, #mdb_env_close() must be called to discard the #MDB_env handle."
	// However, mdb.NewEnv does not call mdb_env_close() when it fails, and it just returns nil
	// as env. Patch gomdb if this turns out to be a big issue.
	env, err := mdb.NewEnv()
	db = &Database{env: env, buckets: make(map[string]mdb.DBI)}

	// Cleanup runs on every return path and inspects the final value of err.
	defer func() {
		if err == nil || env == nil {
			return
		}
		log.Printf("[ERROR] Open db failed. %v", err)
		env.Close()
		db.env = nil
	}()

	if err != nil {
		return
	}
	if err = env.SetMapSize(maxMapSize); err != nil {
		return
	}
	if err = env.SetMaxDBs(mdb.DBI(maxDB)); err != nil {
		return
	}
	if err = env.Open(path, flagNoTLS|flagNoReadAhead, 0664); err != nil {
		return
	}

	err = db.openBuckets(buckets)
	return
}
// openBuckets opens (creating if necessary) every named bucket inside a single
// read-write transaction and registers the resulting DBI handles in db.buckets.
//
// Names already present in db.buckets, and duplicates within the argument, are
// skipped. An empty name is rejected with an error.
//
// Handles are collected locally and published to db.buckets only after the
// transaction has committed. A DBI opened inside a transaction that is later
// aborted is closed by LMDB, so publishing it earlier would leave stale
// handles in the map whenever a later bucket fails.
func (db *Database) openBuckets(buckets []string) error {
	opened := make(map[string]mdb.DBI, len(buckets))

	err := db.TransactionalRW(func(txn *ReadWriteTxn) error {
		for _, name := range buckets {
			if name == "" {
				return errors.New("Bucket name is empty")
			}
			if _, exist := db.buckets[name]; exist {
				continue
			}
			if _, exist := opened[name]; exist {
				continue
			}
			dbi, err := txn.txn.DBIOpen(&name, mdb.CREATE)
			if err != nil {
				return err
			}
			opened[name] = dbi
		}
		return nil
	})
	if err != nil {
		// The transaction was aborted; none of the new handles are valid.
		return err
	}

	for name, dbi := range opened {
		db.buckets[name] = dbi
	}
	return nil
}
// GetExistingBuckets lists the names of the buckets stored in the environment
// by iterating over the keys of the unnamed (main) database.
//
// Returns:
//   - buckets: the keys found, in cursor order; nil when there are none or when
//     an error occurs.
//   - err: any error from opening the main database or its cursor. The original
//     code dropped this error and always reported success.
func (db *Database) GetExistingBuckets() (buckets []string, err error) {
	err = db.TransactionalRW(func(txn *ReadWriteTxn) error {
		dbi, openErr := txn.txn.DBIOpen(nil, mdb.CREATE)
		if openErr != nil {
			return openErr
		}
		cur, curErr := txn.txn.CursorOpen(dbi)
		if curErr != nil {
			return curErr
		}
		itr := (*Iterator)(cur)
		defer itr.Close()

		for more := itr.SeekFirst(); more; more = itr.Next() {
			// The no-copy slice is only borrowed; string() makes our own copy.
			key, _ := itr.GetNoCopy()
			buckets = append(buckets, string(key))
		}
		return nil
	})
	return buckets, err
}
// Close shuts down the underlying environment, if there is one. A Database
// whose env is nil (for example after a failed Open2) is left untouched.
func (db *Database) Close() {
	if db.env == nil {
		return
	}
	// Closing the environment also closes every DBI opened on it.
	db.env.Close()
}
// Stat returns the statistics reported by the environment.
// It panics if the environment reports an error (possible errors: EINVAL).
func (db *Database) Stat() *Stat {
	raw, err := db.env.Stat()
	if err == nil {
		return (*Stat)(raw)
	}
	panic(err)
}
// Info returns the information reported by the environment.
// It panics if the environment reports an error, which per the original note
// happens when the env is nil.
func (db *Database) Info() *Info {
	raw, err := db.env.Info()
	if err == nil {
		return (*Info)(raw)
	}
	panic(err)
}
// TransactionalR runs f inside a read-only transaction.
//
// The transaction is always aborted when f finishes, whether it returns
// normally or panics, and every iterator still registered on the transaction is
// closed first. A panic raised by f propagates to the caller unchanged.
//
// Panics if the transaction cannot be started
// (possible errors: MDB_PANIC, MDB_MAP_RESIZED, MDB_READERS_FULL, ENOMEM).
func (db *Database) TransactionalR(f func(ReadTxner)) {
	txn, err := db.env.BeginTxn(nil, mdb.RDONLY)
	if err != nil {
		panic(err)
	}

	rdTxn := ReadTxn{db.buckets, txn, nil}

	// Deferred functions run during a panic too, so there is no need to recover
	// and re-panic: letting the panic pass through keeps its original stack
	// trace and never swallows a nil panic value.
	defer func() {
		for _, itr := range rdTxn.itrs {
			itr.Close() // no panic
		}
		rdTxn.itrs = nil
		txn.Abort()
	}()

	f(&rdTxn)
}
// TransactionalRW runs f inside a read-write transaction.
//
// The transaction is committed only when f returns normally with a nil error.
// If f returns an error, panics, or terminates the goroutine via
// runtime.Goexit, the transaction is aborted. Iterators still registered on the
// transaction are closed before it is committed or aborted.
//
// Returns the error produced by f. A panic raised by f propagates to the
// caller unchanged.
//
// Panics if the transaction cannot be started
// (possible errors: MDB_PANIC, MDB_MAP_RESIZED, MDB_READERS_FULL, ENOMEM)
// or if the commit fails (possible errors: EINVAL, ENOSPEC, EIO, ENOMEM).
func (db *Database) TransactionalRW(f func(*ReadWriteTxn) error) (err error) {
	txn, err := db.env.BeginTxn(nil, 0)
	if err != nil {
		panic(err)
	}

	rwCtx := ReadWriteTxn{db.env, &ReadTxn{db.buckets, txn, nil}, nil}

	// completed is set only after f has returned normally. Checking
	// recover() == nil is not a reliable success signal: recover also yields nil
	// during runtime.Goexit (and for panic(nil) before Go 1.21), which would
	// commit a half-finished transaction.
	completed := false

	defer func() {
		for _, itr := range rwCtx.itrs {
			itr.Close() // no panic
		}
		rwCtx.itrs = nil

		if !completed || err != nil {
			// Error, panic or Goexit: roll back. An in-flight panic keeps
			// unwinding on its own once this deferred function returns.
			txn.Abort()
			return
		}
		if e := txn.Commit(); e != nil {
			panic(e)
		}
	}()

	err = f(&rwCtx)
	completed = true
	return err
}