mirror of https://github.com/docker/docs.git
commit b6e1f4dd69
@@ -36,7 +36,7 @@ clone git github.com/coreos/etcd v2.2.0
 fix_rewritten_imports github.com/coreos/etcd
 clone git github.com/ugorji/go 5abd4e96a45c386928ed2ca2a7ef63e2533e18ec
 clone git github.com/hashicorp/consul v0.5.2
-clone git github.com/boltdb/bolt v1.0
+clone git github.com/boltdb/bolt v1.1.0

 # get graph and distribution packages
 clone git github.com/docker/distribution 20c4b7a1805a52753dfd593ee1cc35558722a0ce # docker/1.9 branch
@@ -1,3 +1,4 @@
 *.prof
 *.test
+*.swp
 /bin/
@@ -16,7 +16,7 @@ and setting values. That's it.

 ## Project Status

 Bolt is stable and the API is fixed. Full unit test coverage and randomized
 black box testing are used to ensure database consistency and thread safety.
 Bolt is currently in high-load production environments serving databases as
 large as 1TB. Many companies such as Shopify and Heroku use Bolt-backed
@@ -87,6 +87,11 @@ are not thread safe. To work with data in multiple goroutines you must start
 a transaction for each one or use locking to ensure only one goroutine accesses
 a transaction at a time. Creating a transaction from the `DB` is thread safe.

+Read-only transactions and read-write transactions should not depend on one
+another and generally shouldn't be opened simultaneously in the same goroutine.
+This can cause a deadlock as the read-write transaction needs to periodically
+re-map the data file but it cannot do so while a read-only transaction is open.
+
 #### Read-write transactions

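A minimal sketch of the deadlock pattern this new paragraph warns about (bucket and key names are illustrative, not from the change):

```go
// BAD: may deadlock. The read-write transaction opened by Update()
// can need to re-map the data file, which must wait for all open
// read-only transactions, including the enclosing View().
err := db.View(func(tx *bolt.Tx) error {
	return db.Update(func(tx2 *bolt.Tx) error {
		return tx2.Bucket([]byte("MyBucket")).Put([]byte("k"), []byte("v"))
	})
})
if err != nil {
	log.Fatal(err)
}
```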
@@ -120,12 +125,88 @@ err := db.View(func(tx *bolt.Tx) error {
 })
 ```

 You also get a consistent view of the database within this closure, however,
 no mutating operations are allowed within a read-only transaction. You can only
 retrieve buckets, retrieve values, and copy the database within a read-only
 transaction.

+#### Batch read-write transactions
+
+Each `DB.Update()` waits for disk to commit the writes. This overhead
+can be minimized by combining multiple updates with the `DB.Batch()`
+function:
+
+```go
+err := db.Batch(func(tx *bolt.Tx) error {
+	...
+	return nil
+})
+```
+
+Concurrent Batch calls are opportunistically combined into larger
+transactions. Batch is only useful when there are multiple goroutines
+calling it.
+
+The trade-off is that `Batch` can call the given
+function multiple times, if parts of the transaction fail. The
+function must be idempotent and side effects must take effect only
+after a successful return from `DB.Batch()`.
+
+For example: don't display messages from inside the function, instead
+set variables in the enclosing scope:
+
+```go
+var id uint64
+err := db.Batch(func(tx *bolt.Tx) error {
+	// Find last key in bucket, decode as bigendian uint64, increment
+	// by one, encode back to []byte, and add new key.
+	...
+	id = newValue
+	return nil
+})
+if err != nil {
+	return ...
+}
+fmt.Printf("Allocated ID %d\n", id)
+```
+
+#### Managing transactions manually
+
+The `DB.View()` and `DB.Update()` functions are wrappers around the `DB.Begin()`
+function. These helper functions will start the transaction, execute a function,
+and then safely close your transaction if an error is returned. This is the
+recommended way to use Bolt transactions.
+
+However, sometimes you may want to manually start and end your transactions.
+You can use the `DB.Begin()` function directly but _please_ be sure to close the
+transaction.
+
+```go
+// Start a writable transaction.
+tx, err := db.Begin(true)
+if err != nil {
+	return err
+}
+defer tx.Rollback()
+
+// Use the transaction...
+_, err = tx.CreateBucket([]byte("MyBucket"))
+if err != nil {
+	return err
+}
+
+// Commit the transaction and check for error.
+if err := tx.Commit(); err != nil {
+	return err
+}
+```
+
+The first argument to `DB.Begin()` is a boolean stating if the transaction
+should be writable.
+
 ### Using buckets

 Buckets are collections of key/value pairs within the database. All keys in a
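For symmetry with the writable example above, a read-only variant (a sketch; the bucket name is an assumption):

```go
// Start a read-only transaction with DB.Begin(false). Read-only
// transactions must be rolled back, not committed.
tx, err := db.Begin(false)
if err != nil {
	return err
}
defer tx.Rollback()

v := tx.Bucket([]byte("MyBucket")).Get([]byte("key"))
_ = v // only valid until the transaction is closed
```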
@@ -175,13 +256,61 @@ db.View(func(tx *bolt.Tx) error {
 ```

 The `Get()` function does not return an error because its operation is
-guarenteed to work (unless there is some kind of system failure). If the key
+guaranteed to work (unless there is some kind of system failure). If the key
 exists then it will return its byte slice value. If it doesn't exist then it
 will return `nil`. It's important to note that you can have a zero-length value
 set to a key which is different than the key not existing.

 Use the `Bucket.Delete()` function to delete a key from the bucket.

+Please note that values returned from `Get()` are only valid while the
+transaction is open. If you need to use a value outside of the transaction
+then you must use `copy()` to copy it to another byte slice.
+
+### Autoincrementing integer for the bucket
+By using the NextSequence() function, you can let Bolt determine a sequence
+which can be used as the unique identifier for your key/value pairs. See the
+example below.
+
+```go
+// CreateUser saves u to the store. The new user ID is set on u once the data is persisted.
+func (s *Store) CreateUser(u *User) error {
+	return s.db.Update(func(tx *bolt.Tx) error {
+		// Retrieve the users bucket.
+		// This should be created when the DB is first opened.
+		b := tx.Bucket([]byte("users"))
+
+		// Generate ID for the user.
+		// This returns an error only if the Tx is closed or not writeable.
+		// That can't happen in an Update() call so I ignore the error check.
+		id, _ := b.NextSequence()
+		u.ID = int(id)
+
+		// Marshal user data into bytes.
+		buf, err := json.Marshal(u)
+		if err != nil {
+			return err
+		}
+
+		// Persist bytes to users bucket.
+		return b.Put(itob(u.ID), buf)
+	})
+}
+
+// itob returns an 8-byte big endian representation of v.
+func itob(v int) []byte {
+	b := make([]byte, 8)
+	binary.BigEndian.PutUint64(b, uint64(v))
+	return b
+}
+
+type User struct {
+	ID int
+	...
+}
+```
+
 ### Iterating over keys

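A small sketch of the `copy()` pattern described above (bucket and key names are illustrative):

```go
var valCopy []byte
err := db.View(func(tx *bolt.Tx) error {
	v := tx.Bucket([]byte("users")).Get([]byte("alice"))
	// v points into the read-only mmap and becomes invalid once the
	// transaction closes, so copy it into a fresh slice.
	valCopy = make([]byte, len(v))
	copy(valCopy, v)
	return nil
})
```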
@@ -254,7 +383,7 @@ db.View(func(tx *bolt.Tx) error {
 	max := []byte("2000-01-01T00:00:00Z")

 	// Iterate over the 90's.
-	for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) != -1; k, v = c.Next() {
+	for k, v := c.Seek(min); k != nil && bytes.Compare(k, max) <= 0; k, v = c.Next() {
 		fmt.Printf("%s: %s\n", k, v)
 	}

@@ -294,7 +423,7 @@ func (*Bucket) DeleteBucket(key []byte) error

 ### Database backups

-Bolt is a single file so it's easy to backup. You can use the `Tx.Copy()`
+Bolt is a single file so it's easy to backup. You can use the `Tx.WriteTo()`
 function to write a consistent view of the database to a writer. If you call
 this from a read-only transaction, it will perform a hot backup and not block
 your other database reads and writes. It will also use `O_DIRECT` when available
@@ -305,11 +434,12 @@ do database backups:

 ```go
 func BackupHandleFunc(w http.ResponseWriter, req *http.Request) {
-	err := db.View(func(tx bolt.Tx) error {
+	err := db.View(func(tx *bolt.Tx) error {
 		w.Header().Set("Content-Type", "application/octet-stream")
 		w.Header().Set("Content-Disposition", `attachment; filename="my.db"`)
 		w.Header().Set("Content-Length", strconv.Itoa(int(tx.Size())))
-		return tx.Copy(w)
+		_, err := tx.WriteTo(w)
+		return err
 	})
 	if err != nil {
 		http.Error(w, err.Error(), http.StatusInternalServerError)
@@ -351,14 +481,13 @@ go func() {
 		// Grab the current stats and diff them.
 		stats := db.Stats()
 		diff := stats.Sub(&prev)

 		// Encode stats to JSON and print to STDERR.
 		json.NewEncoder(os.Stderr).Encode(diff)

 		// Save stats for the next loop.
 		prev = stats
 	}
-}
 }()
 ```

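Building on the stats loop above, a hedged sketch of the HTTP endpoint idea mentioned in the next hunk (the handler path and the 10-second window are assumptions):

```go
http.HandleFunc("/stats", func(w http.ResponseWriter, r *http.Request) {
	// Take a fixed-length sample and report the delta.
	prev := db.Stats()
	time.Sleep(10 * time.Second)
	diff := db.Stats().Sub(&prev)
	json.NewEncoder(w).Encode(diff)
})
```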
@@ -366,25 +495,83 @@ It's also useful to pipe these stats to a service such as statsd for monitoring
 or to provide an HTTP endpoint that will perform a fixed-length sample.

+### Read-Only Mode
+
+Sometimes it is useful to create a shared, read-only Bolt database. To do this,
+set the `Options.ReadOnly` flag when opening your database. Read-only mode
+uses a shared lock to allow multiple processes to read from the database but
+it will block any processes from opening the database in read-write mode.
+
+```go
+db, err := bolt.Open("my.db", 0666, &bolt.Options{ReadOnly: true})
+if err != nil {
+	log.Fatal(err)
+}
+```
+
 ## Resources

 For more information on getting started with Bolt, check out the following articles:

 * [Intro to BoltDB: Painless Performant Persistence](http://npf.io/2014/07/intro-to-boltdb-painless-performant-persistence/) by [Nate Finch](https://github.com/natefinch).
+* [Bolt -- an embedded key/value database for Go](https://www.progville.com/go/bolt-embedded-db-golang/) by Progville
+
+## Comparison with other databases
+
-## Comparing Bolt to LMDB
+### Postgres, MySQL, & other relational databases
+
+Relational databases structure data into rows and are only accessible through
+the use of SQL. This approach provides flexibility in how you store and query
+your data but also incurs overhead in parsing and planning SQL statements. Bolt
+accesses all data by a byte slice key. This makes Bolt fast to read and write
+data by key but provides no built-in support for joining values together.
+
+Most relational databases (with the exception of SQLite) are standalone servers
+that run separately from your application. This gives your systems
+flexibility to connect multiple application servers to a single database
+server but also adds overhead in serializing and transporting data over the
+network. Bolt runs as a library included in your application so all data access
+has to go through your application's process. This brings data closer to your
+application but limits multi-process access to the data.
+
+### LevelDB, RocksDB
+
+LevelDB and its derivatives (RocksDB, HyperLevelDB) are similar to Bolt in that
+they are libraries bundled into the application, however, their underlying
+structure is a log-structured merge-tree (LSM tree). An LSM tree optimizes
+random writes by using a write ahead log and multi-tiered, sorted files called
+SSTables. Bolt uses a B+tree internally and only a single file. Both approaches
+have trade-offs.
+
+If you require a high random write throughput (>10,000 w/sec) or you need to use
+spinning disks then LevelDB could be a good choice. If your application is
+read-heavy or does a lot of range scans then Bolt could be a good choice.
+
+One other important consideration is that LevelDB does not have transactions.
+It supports batch writing of key/value pairs and it supports read snapshots
+but it will not give you the ability to do a compare-and-swap operation safely.
+Bolt supports fully serializable ACID transactions.
+
+### LMDB

 Bolt was originally a port of LMDB so it is architecturally similar. Both use
-a B+tree, have ACID semanetics with fully serializable transactions, and support
+a B+tree, have ACID semantics with fully serializable transactions, and support
 lock-free MVCC using a single writer and multiple readers.

 The two projects have somewhat diverged. LMDB heavily focuses on raw performance
 while Bolt has focused on simplicity and ease of use. For example, LMDB allows
-several unsafe actions such as direct writes and append writes for the sake of
-performance. Bolt opts to disallow actions which can leave the database in a
-corrupted state. The only exception to this in Bolt is `DB.NoSync`.
+several unsafe actions such as direct writes for the sake of performance. Bolt
+opts to disallow actions which can leave the database in a corrupted state. The
+only exception to this in Bolt is `DB.NoSync`.
+
+There are also a few differences in API. LMDB requires a maximum mmap size when
+opening an `mdb_env` whereas Bolt will handle incremental mmap resizing
+automatically. LMDB overloads the getter and setter functions with multiple
+flags whereas Bolt splits these specialized cases into their own functions.

 ## Caveats & Limitations
@@ -425,14 +612,33 @@ Here are a few things to note when evaluating and using Bolt:
   can in memory and will release memory as needed to other processes. This means
   that Bolt can show very high memory usage when working with large databases.
   However, this is expected and the OS will release memory as needed. Bolt can
-  handle databases much larger than the available physical RAM.
+  handle databases much larger than the available physical RAM, provided its
+  memory-map fits in the process virtual address space. It may be problematic
+  on 32-bit systems.
+
+* The data structures in the Bolt database are memory mapped so the data file
+  will be endian specific. This means that you cannot copy a Bolt file from a
+  little endian machine to a big endian machine and have it work. For most
+  users this is not a concern since most modern CPUs are little endian.
+
+* Because of the way pages are laid out on disk, Bolt cannot truncate data files
+  and return free pages back to the disk. Instead, Bolt maintains a free list
+  of unused pages within its data file. These free pages can be reused by later
+  transactions. This works well for many use cases as databases generally tend
+  to grow. However, it's important to note that deleting large chunks of data
+  will not allow you to reclaim that space on disk.
+
+  For more information on page allocation, [see this comment][page-allocation].
+
+[page-allocation]: https://github.com/boltdb/bolt/issues/308#issuecomment-74811638

 ## Other Projects Using Bolt

 Below is a list of public, open source projects that use Bolt:

-* [Bazil](https://github.com/bazillion/bazil) - A file system that lets your data reside where it is most convenient for it to reside.
+* [Operation Go: A Routine Mission](http://gocode.io) - An online programming game for Golang using Bolt for user accounts and a leaderboard.
+* [Bazil](https://bazil.org/) - A file system that lets your data reside where it is most convenient for it to reside.
 * [DVID](https://github.com/janelia-flyem/dvid) - Added Bolt as optional storage engine and testing it against Basho-tuned leveldb.
 * [Skybox Analytics](https://github.com/skybox/skybox) - A standalone funnel analysis tool for web analytics.
 * [Scuttlebutt](https://github.com/benbjohnson/scuttlebutt) - Uses Bolt to store and process all Twitter mentions of GitHub projects.
@@ -450,6 +656,16 @@ Below is a list of public, open source projects that use Bolt:
 * [bleve](http://www.blevesearch.com/) - A pure Go search engine similar to ElasticSearch that uses Bolt as the default storage backend.
 * [tentacool](https://github.com/optiflows/tentacool) - REST api server to manage system stuff (IP, DNS, Gateway...) on a linux server.
 * [SkyDB](https://github.com/skydb/sky) - Behavioral analytics database.
+* [Seaweed File System](https://github.com/chrislusf/weed-fs) - Highly scalable distributed key~file system with O(1) disk read.
+* [InfluxDB](http://influxdb.com) - Scalable datastore for metrics, events, and real-time analytics.
+* [Freehold](http://tshannon.bitbucket.org/freehold/) - An open, secure, and lightweight platform for your files and data.
+* [Prometheus Annotation Server](https://github.com/oliver006/prom_annotation_server) - Annotation server for PromDash & Prometheus service monitoring system.
+* [Consul](https://github.com/hashicorp/consul) - Consul is service discovery and configuration made easy. Distributed, highly available, and datacenter-aware.
+* [Kala](https://github.com/ajvb/kala) - Kala is a modern job scheduler optimized to run on a single node. It is persistent, JSON over HTTP API, ISO 8601 duration notation, and dependent jobs.
+* [drive](https://github.com/odeke-em/drive) - drive is an unofficial Google Drive command line client for \*NIX operating systems.
+* [stow](https://github.com/djherbis/stow) - a persistence manager for objects
+  backed by boltdb.
+* [buckets](https://github.com/joyrexus/buckets) - a bolt wrapper streamlining
+  simple tx and key scans.

 If you are using Bolt in a project please send a pull request to add it to the list.
@@ -0,0 +1,138 @@
+package bolt
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+)
+
+// Batch calls fn as part of a batch. It behaves similar to Update,
+// except:
+//
+// 1. concurrent Batch calls can be combined into a single Bolt
+// transaction.
+//
+// 2. the function passed to Batch may be called multiple times,
+// regardless of whether it returns error or not.
+//
+// This means that Batch function side effects must be idempotent and
+// take permanent effect only after a successful return is seen in
+// caller.
+//
+// The maximum batch size and delay can be adjusted with DB.MaxBatchSize
+// and DB.MaxBatchDelay, respectively.
+//
+// Batch is only useful when there are multiple goroutines calling it.
+func (db *DB) Batch(fn func(*Tx) error) error {
+	errCh := make(chan error, 1)
+
+	db.batchMu.Lock()
+	if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) {
+		// There is no existing batch, or the existing batch is full; start a new one.
+		db.batch = &batch{
+			db: db,
+		}
+		db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger)
+	}
+	db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh})
+	if len(db.batch.calls) >= db.MaxBatchSize {
+		// wake up batch, it's ready to run
+		go db.batch.trigger()
+	}
+	db.batchMu.Unlock()
+
+	err := <-errCh
+	if err == trySolo {
+		err = db.Update(fn)
+	}
+	return err
+}
+
+type call struct {
+	fn  func(*Tx) error
+	err chan<- error
+}
+
+type batch struct {
+	db    *DB
+	timer *time.Timer
+	start sync.Once
+	calls []call
+}
+
+// trigger runs the batch if it hasn't already been run.
+func (b *batch) trigger() {
+	b.start.Do(b.run)
+}
+
+// run performs the transactions in the batch and communicates results
+// back to DB.Batch.
+func (b *batch) run() {
+	b.db.batchMu.Lock()
+	b.timer.Stop()
+	// Make sure no new work is added to this batch, but don't break
+	// other batches.
+	if b.db.batch == b {
+		b.db.batch = nil
+	}
+	b.db.batchMu.Unlock()
+
+retry:
+	for len(b.calls) > 0 {
+		var failIdx = -1
+		err := b.db.Update(func(tx *Tx) error {
+			for i, c := range b.calls {
+				if err := safelyCall(c.fn, tx); err != nil {
+					failIdx = i
+					return err
+				}
+			}
+			return nil
+		})
+
+		if failIdx >= 0 {
+			// take the failing transaction out of the batch. it's
+			// safe to shorten b.calls here because db.batch no longer
+			// points to us, and we hold the mutex anyway.
+			c := b.calls[failIdx]
+			b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1]
+			// tell the submitter re-run it solo, continue with the rest of the batch
+			c.err <- trySolo
+			continue retry
+		}
+
+		// pass success, or bolt internal errors, to all callers
+		for _, c := range b.calls {
+			if c.err != nil {
+				c.err <- err
+			}
+		}
+		break retry
+	}
+}
+
+// trySolo is a special sentinel error value used for signaling that a
+// transaction function should be re-run. It should never be seen by
+// callers.
+var trySolo = errors.New("batch function returned an error and should be re-run solo")
+
+type panicked struct {
+	reason interface{}
+}
+
+func (p panicked) Error() string {
+	if err, ok := p.reason.(error); ok {
+		return err.Error()
+	}
+	return fmt.Sprintf("panic: %v", p.reason)
+}
+
+func safelyCall(fn func(*Tx) error, tx *Tx) (err error) {
+	defer func() {
+		if p := recover(); p != nil {
+			err = panicked{p}
+		}
+	}()
+	return fn(tx)
+}
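To make the combining behavior concrete, a hypothetical caller-side sketch (bucket and key names are assumptions): several goroutines call `Batch` concurrently, so their functions can run inside a single fsync'd transaction. If one function fails, the whole transaction rolls back, the failing call is retried alone through `db.Update(fn)` (the `trySolo` sentinel), and the rest are re-batched, which is why the functions must be idempotent.

```go
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
	i := i
	wg.Add(1)
	go func() {
		defer wg.Done()
		// Each call may share a transaction with the other goroutines.
		_ = db.Batch(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("demo"))
			if err != nil {
				return err
			}
			return b.Put([]byte(fmt.Sprintf("key-%d", i)), []byte("v"))
		})
	}()
}
wg.Wait()
```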
@@ -1,4 +1,7 @@
 package bolt

 // maxMapSize represents the largest mmap size supported by Bolt.
-const maxMapSize = 0xFFFFFFF // 256MB
+const maxMapSize = 0x7FFFFFFF // 2GB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0xFFFFFFF
@@ -2,3 +2,6 @@ package bolt

 // maxMapSize represents the largest mmap size supported by Bolt.
 const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF
@@ -1,4 +1,7 @@
 package bolt

 // maxMapSize represents the largest mmap size supported by Bolt.
-const maxMapSize = 0xFFFFFFF // 256MB
+const maxMapSize = 0x7FFFFFFF // 2GB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0xFFFFFFF
@@ -0,0 +1,9 @@
+// +build arm64
+
+package bolt
+
+// maxMapSize represents the largest mmap size supported by Bolt.
+const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF
@@ -0,0 +1,9 @@
+// +build ppc64le
+
+package bolt
+
+// maxMapSize represents the largest mmap size supported by Bolt.
+const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF
@@ -0,0 +1,9 @@
+// +build s390x
+
+package bolt
+
+// maxMapSize represents the largest mmap size supported by Bolt.
+const maxMapSize = 0xFFFFFFFFFFFF // 256TB
+
+// maxAllocSize is the size used when creating array pointers.
+const maxAllocSize = 0x7FFFFFFF
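A brief note on why these per-architecture constants exist: the mmap'd file is addressed through a single `*[maxMapSize]byte` pointer (see the `db.data` assignment in the mmap changes below), so the constant must fit the platform's virtual address space, roughly 2GB on the 32-bit platforms and 256TB on the 64-bit ones.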
@@ -1,8 +1,9 @@
-// +build !windows,!plan9
+// +build !windows,!plan9,!solaris

 package bolt

 import (
+	"fmt"
 	"os"
 	"syscall"
 	"time"
@@ -10,7 +11,7 @@ import (
 )

 // flock acquires an advisory lock on a file descriptor.
-func flock(f *os.File, timeout time.Duration) error {
+func flock(f *os.File, exclusive bool, timeout time.Duration) error {
 	var t time.Time
 	for {
 		// If we're beyond our timeout then return an error.
@@ -20,9 +21,13 @@ func flock(f *os.File, timeout time.Duration) error {
 		} else if timeout > 0 && time.Since(t) > timeout {
 			return ErrTimeout
 		}
+		flag := syscall.LOCK_SH
+		if exclusive {
+			flag = syscall.LOCK_EX
+		}
+
 		// Otherwise attempt to obtain an exclusive lock.
-		err := syscall.Flock(int(f.Fd()), syscall.LOCK_EX|syscall.LOCK_NB)
+		err := syscall.Flock(int(f.Fd()), flag|syscall.LOCK_NB)
 		if err == nil {
 			return nil
 		} else if err != syscall.EWOULDBLOCK {
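In short: read-write opens pass `exclusive=true` and take `LOCK_EX`, while the read-only mode added by this change passes `exclusive=false` and takes `LOCK_SH`, which is what lets several read-only processes share one database file (the wiring is in the `Open` change further down).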
@@ -41,11 +46,28 @@ func funlock(f *os.File) error {

 // mmap memory maps a DB's data file.
 func mmap(db *DB, sz int) error {
+	// Truncate and fsync to ensure file size metadata is flushed.
+	// https://github.com/boltdb/bolt/issues/284
+	if !db.NoGrowSync && !db.readOnly {
+		if err := db.file.Truncate(int64(sz)); err != nil {
+			return fmt.Errorf("file resize error: %s", err)
+		}
+		if err := db.file.Sync(); err != nil {
+			return fmt.Errorf("file sync error: %s", err)
+		}
+	}
+
+	// Map the data file to memory.
 	b, err := syscall.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED)
 	if err != nil {
 		return err
 	}
+
+	// Advise the kernel that the mmap is accessed randomly.
+	if err := madvise(b, syscall.MADV_RANDOM); err != nil {
+		return fmt.Errorf("madvise: %s", err)
+	}
+
 	// Save the original byte slice and convert to a byte array pointer.
 	db.dataref = b
 	db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
@@ -67,3 +89,12 @@ func munmap(db *DB) error {
 	db.datasz = 0
 	return err
 }
+
+// NOTE: This function is copied from stdlib because it is not available on darwin.
+func madvise(b []byte, advice int) (err error) {
+	_, _, e1 := syscall.Syscall(syscall.SYS_MADVISE, uintptr(unsafe.Pointer(&b[0])), uintptr(len(b)), uintptr(advice))
+	if e1 != 0 {
+		err = e1
+	}
+	return
+}
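The `MADV_RANDOM` hint above tells the kernel that faults on this mapping will not follow a sequential pattern, so aggressive readahead is not worthwhile; that matches how a B+tree touches pages. This is standard madvise(2) behavior rather than anything specific to this change.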
@@ -0,0 +1,101 @@
+package bolt
+
+import (
+	"fmt"
+	"os"
+	"syscall"
+	"time"
+	"unsafe"
+
+	"golang.org/x/sys/unix"
+)
+
+// flock acquires an advisory lock on a file descriptor.
+func flock(f *os.File, exclusive bool, timeout time.Duration) error {
+	var t time.Time
+	for {
+		// If we're beyond our timeout then return an error.
+		// This can only occur after we've attempted a flock once.
+		if t.IsZero() {
+			t = time.Now()
+		} else if timeout > 0 && time.Since(t) > timeout {
+			return ErrTimeout
+		}
+		var lock syscall.Flock_t
+		lock.Start = 0
+		lock.Len = 0
+		lock.Pid = 0
+		lock.Whence = 0
+		if exclusive {
+			lock.Type = syscall.F_WRLCK
+		} else {
+			lock.Type = syscall.F_RDLCK
+		}
+		err := syscall.FcntlFlock(f.Fd(), syscall.F_SETLK, &lock)
+		if err == nil {
+			return nil
+		} else if err != syscall.EAGAIN {
+			return err
+		}
+
+		// Wait for a bit and try again.
+		time.Sleep(50 * time.Millisecond)
+	}
+}
+
+// funlock releases an advisory lock on a file descriptor.
+func funlock(f *os.File) error {
+	var lock syscall.Flock_t
+	lock.Start = 0
+	lock.Len = 0
+	lock.Type = syscall.F_UNLCK
+	lock.Whence = 0
+	return syscall.FcntlFlock(uintptr(f.Fd()), syscall.F_SETLK, &lock)
+}
+
+// mmap memory maps a DB's data file.
+func mmap(db *DB, sz int) error {
+	// Truncate and fsync to ensure file size metadata is flushed.
+	// https://github.com/boltdb/bolt/issues/284
+	if !db.NoGrowSync && !db.readOnly {
+		if err := db.file.Truncate(int64(sz)); err != nil {
+			return fmt.Errorf("file resize error: %s", err)
+		}
+		if err := db.file.Sync(); err != nil {
+			return fmt.Errorf("file sync error: %s", err)
+		}
+	}
+
+	// Map the data file to memory.
+	b, err := unix.Mmap(int(db.file.Fd()), 0, sz, syscall.PROT_READ, syscall.MAP_SHARED)
+	if err != nil {
+		return err
+	}
+
+	// Advise the kernel that the mmap is accessed randomly.
+	if err := unix.Madvise(b, syscall.MADV_RANDOM); err != nil {
+		return fmt.Errorf("madvise: %s", err)
+	}
+
+	// Save the original byte slice and convert to a byte array pointer.
+	db.dataref = b
+	db.data = (*[maxMapSize]byte)(unsafe.Pointer(&b[0]))
+	db.datasz = sz
+	return nil
+}
+
+// munmap unmaps a DB's data file from memory.
+func munmap(db *DB) error {
+	// Ignore the unmap if we have no mapped data.
+	if db.dataref == nil {
+		return nil
+	}
+
+	// Unmap using the original byte slice.
+	err := unix.Munmap(db.dataref)
+	db.dataref = nil
+	db.data = nil
+	db.datasz = 0
+	return err
+}
@@ -16,7 +16,7 @@ func fdatasync(db *DB) error {
 }

 // flock acquires an advisory lock on a file descriptor.
-func flock(f *os.File, _ time.Duration) error {
+func flock(f *os.File, _ bool, _ time.Duration) error {
 	return nil
 }

@@ -28,9 +28,11 @@ func funlock(f *os.File) error {
 // mmap memory maps a DB's data file.
 // Based on: https://github.com/edsrzf/mmap-go
 func mmap(db *DB, sz int) error {
-	// Truncate the database to the size of the mmap.
-	if err := db.file.Truncate(int64(sz)); err != nil {
-		return fmt.Errorf("truncate: %s", err)
+	if !db.readOnly {
+		// Truncate the database to the size of the mmap.
+		if err := db.file.Truncate(int64(sz)); err != nil {
+			return fmt.Errorf("truncate: %s", err)
+		}
 	}

 	// Open a file mapping handle.
@@ -99,6 +99,7 @@ func (b *Bucket) Cursor() *Cursor {

 // Bucket retrieves a nested bucket by name.
 // Returns nil if the bucket does not exist.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (b *Bucket) Bucket(name []byte) *Bucket {
 	if b.buckets != nil {
 		if child := b.buckets[string(name)]; child != nil {
@@ -148,6 +149,7 @@ func (b *Bucket) openBucket(value []byte) *Bucket {

 // CreateBucket creates a new bucket at the given key and returns the new bucket.
 // Returns an error if the key already exists, if the bucket name is blank, or if the bucket name is too long.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {
 	if b.tx.db == nil {
 		return nil, ErrTxClosed
@@ -192,6 +194,7 @@ func (b *Bucket) CreateBucket(key []byte) (*Bucket, error) {

 // CreateBucketIfNotExists creates a new bucket if it doesn't already exist and returns a reference to it.
 // Returns an error if the bucket name is blank, or if the bucket name is too long.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (b *Bucket) CreateBucketIfNotExists(key []byte) (*Bucket, error) {
 	child, err := b.CreateBucket(key)
 	if err == ErrBucketExists {
@@ -252,6 +255,7 @@ func (b *Bucket) DeleteBucket(key []byte) error {

 // Get retrieves the value for a key in the bucket.
 // Returns a nil value if the key does not exist or if the key is a nested bucket.
+// The returned value is only valid for the life of the transaction.
 func (b *Bucket) Get(key []byte) []byte {
 	k, v, flags := b.Cursor().seek(key)

@@ -332,6 +336,12 @@ func (b *Bucket) NextSequence() (uint64, error) {
 		return 0, ErrTxNotWritable
 	}

+	// Materialize the root node if it hasn't been already so that the
+	// bucket will be saved during commit.
+	if b.rootNode == nil {
+		_ = b.node(b.root, nil)
+	}
+
 	// Increment and return the sequence.
 	b.bucket.sequence++
 	return b.bucket.sequence, nil
@@ -339,7 +349,8 @@ func (b *Bucket) NextSequence() (uint64, error) {

 // ForEach executes a function for each key/value pair in a bucket.
 // If the provided function returns an error then the iteration is stopped and
-// the error is returned to the caller.
+// the error is returned to the caller. The provided function must not modify
+// the bucket; this will result in undefined behavior.
 func (b *Bucket) ForEach(fn func(k, v []byte) error) error {
 	if b.tx.db == nil {
 		return ErrTxClosed
@@ -511,8 +522,12 @@ func (b *Bucket) spill() error {
 		// Update parent node.
 		var c = b.Cursor()
 		k, _, flags := c.seek([]byte(name))
-		_assert(bytes.Equal([]byte(name), k), "misplaced bucket header: %x -> %x", []byte(name), k)
-		_assert(flags&bucketLeafFlag != 0, "unexpected bucket header flag: %x", flags)
+		if !bytes.Equal([]byte(name), k) {
+			panic(fmt.Sprintf("misplaced bucket header: %x -> %x", []byte(name), k))
+		}
+		if flags&bucketLeafFlag == 0 {
+			panic(fmt.Sprintf("unexpected bucket header flag: %x", flags))
+		}
 		c.node().put([]byte(name), []byte(name), value, 0, bucketLeafFlag)
 	}

@@ -528,7 +543,9 @@ func (b *Bucket) spill() error {
 	b.rootNode = b.rootNode.root()

 	// Update the root node for this bucket.
-	_assert(b.rootNode.pgid < b.tx.meta.pgid, "pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid)
+	if b.rootNode.pgid >= b.tx.meta.pgid {
+		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", b.rootNode.pgid, b.tx.meta.pgid))
+	}
 	b.root = b.rootNode.pgid

 	return nil
@@ -659,7 +676,9 @@ func (b *Bucket) pageNode(id pgid) (*page, *node) {
 	// Inline buckets have a fake page embedded in their value so treat them
 	// differently. We'll return the rootNode (if available) or the fake page.
 	if b.root == 0 {
-		_assert(id == 0, "inline bucket non-zero page access(2): %d != 0", id)
+		if id != 0 {
+			panic(fmt.Sprintf("inline bucket non-zero page access(2): %d != 0", id))
+		}
 		if b.rootNode != nil {
 			return nil, b.rootNode
 		}
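A note on the recurring pattern in these bucket changes: with `_assert(cond, msg, args...)` the variadic arguments were evaluated and boxed on every call even when the condition held, whereas the open-coded `if ... { panic(fmt.Sprintf(...)) }` form only formats the message when the invariant is actually violated. The diff itself does not state this rationale, so treat it as an inference.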
@@ -2,6 +2,7 @@ package bolt

 import (
 	"bytes"
+	"fmt"
 	"sort"
 )

@@ -9,6 +10,8 @@ import (
 // Cursors see nested buckets with value == nil.
 // Cursors can be obtained from a transaction and are valid as long as the transaction is open.
 //
+// Keys and values returned from the cursor are only valid for the life of the transaction.
+//
 // Changing data while traversing with a cursor may cause it to be invalidated
 // and return unexpected keys and/or values. You must reposition your cursor
 // after mutating data.
@@ -24,6 +27,7 @@ func (c *Cursor) Bucket() *Bucket {

 // First moves the cursor to the first item in the bucket and returns its key and value.
 // If the bucket is empty then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) First() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	c.stack = c.stack[:0]
@@ -40,6 +44,7 @@ func (c *Cursor) First() (key []byte, value []byte) {

 // Last moves the cursor to the last item in the bucket and returns its key and value.
 // If the bucket is empty then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Last() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	c.stack = c.stack[:0]
@@ -57,6 +62,7 @@ func (c *Cursor) Last() (key []byte, value []byte) {

 // Next moves the cursor to the next item in the bucket and returns its key and value.
 // If the cursor is at the end of the bucket then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Next() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")
 	k, v, flags := c.next()
@@ -68,6 +74,7 @@ func (c *Cursor) Next() (key []byte, value []byte) {

 // Prev moves the cursor to the previous item in the bucket and returns its key and value.
 // If the cursor is at the beginning of the bucket then a nil key and value are returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Prev() (key []byte, value []byte) {
 	_assert(c.bucket.tx.db != nil, "tx closed")

@@ -99,6 +106,7 @@ func (c *Cursor) Prev() (key []byte, value []byte) {
 // Seek moves the cursor to a given key and returns it.
 // If the key does not exist then the next key is used. If no keys
 // follow, a nil key is returned.
+// The returned key and value are only valid for the life of the transaction.
 func (c *Cursor) Seek(seek []byte) (key []byte, value []byte) {
 	k, v, flags := c.seek(seek)

@@ -228,8 +236,8 @@ func (c *Cursor) next() (key []byte, value []byte, flags uint32) {
 // search recursively performs a binary search against a given page/node until it finds a given key.
 func (c *Cursor) search(key []byte, pgid pgid) {
 	p, n := c.bucket.pageNode(pgid)
-	if p != nil {
-		_assert((p.flags&(branchPageFlag|leafPageFlag)) != 0, "invalid page type: %d: %x", p.id, p.flags)
+	if p != nil && (p.flags&(branchPageFlag|leafPageFlag)) == 0 {
+		panic(fmt.Sprintf("invalid page type: %d: %x", p.id, p.flags))
 	}
 	e := elemRef{page: p, node: n}
 	c.stack = append(c.stack, e)
@@ -12,9 +12,6 @@ import (
 	"unsafe"
 )

-// The smallest size that the mmap can be.
-const minMmapSize = 1 << 22 // 4MB
-
 // The largest step that can be taken when remapping the mmap.
 const maxMmapStep = 1 << 30 // 1GB

@@ -30,6 +27,12 @@ const magic uint32 = 0xED0CDAED
 // must be synchronized using the msync(2) syscall.
 const IgnoreNoSync = runtime.GOOS == "openbsd"

+// Default values if not set in a DB instance.
+const (
+	DefaultMaxBatchSize  int = 1000
+	DefaultMaxBatchDelay     = 10 * time.Millisecond
+)
+
 // DB represents a collection of buckets persisted to a file on disk.
 // All data access is performed through transactions which can be obtained through the DB.
 // All the functions on DB will return an ErrDatabaseNotOpen if accessed before Open() is called.
@@ -52,9 +55,33 @@ type DB struct {
 	// THIS IS UNSAFE. PLEASE USE WITH CAUTION.
 	NoSync bool

+	// When true, skips the truncate call when growing the database.
+	// Setting this to true is only safe on non-ext3/ext4 systems.
+	// Skipping truncation avoids preallocation of hard drive space and
+	// bypasses a truncate() and fsync() syscall on remapping.
+	//
+	// https://github.com/boltdb/bolt/issues/284
+	NoGrowSync bool
+
+	// MaxBatchSize is the maximum size of a batch. Default value is
+	// copied from DefaultMaxBatchSize in Open.
+	//
+	// If <=0, disables batching.
+	//
+	// Do not change concurrently with calls to Batch.
+	MaxBatchSize int
+
+	// MaxBatchDelay is the maximum delay before a batch starts.
+	// Default value is copied from DefaultMaxBatchDelay in Open.
+	//
+	// If <=0, effectively disables batching.
+	//
+	// Do not change concurrently with calls to Batch.
+	MaxBatchDelay time.Duration
+
 	path     string
 	file     *os.File
-	dataref  []byte
+	dataref  []byte // mmap'ed readonly, write throws SEGV
 	data     *[maxMapSize]byte
 	datasz   int
 	meta0    *meta
@@ -66,6 +93,9 @@ type DB struct {
 	freelist *freelist
 	stats    Stats

+	batchMu sync.Mutex
+	batch   *batch
+
 	rwlock   sync.Mutex   // Allows only one writer at a time.
 	metalock sync.Mutex   // Protects meta page access.
 	mmaplock sync.RWMutex // Protects mmap access during remapping.
@@ -74,6 +104,10 @@ type DB struct {
 	ops struct {
 		writeAt func(b []byte, off int64) (n int, err error)
 	}
+
+	// Read only mode.
+	// When true, Update() and Begin(true) return ErrDatabaseReadOnly immediately.
+	readOnly bool
 }

 // Path returns the path to currently open database file.
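Since the new knobs above are plain fields set after `Open`, a hypothetical tuning sketch (the values are arbitrary):

```go
db, err := bolt.Open("my.db", 0666, nil)
if err != nil {
	log.Fatal(err)
}
// Smaller batches that flush sooner. Per the field docs above, do not
// change these concurrently with calls to Batch.
db.MaxBatchSize = 100
db.MaxBatchDelay = 2 * time.Millisecond
```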
@@ -101,20 +135,34 @@ func Open(path string, mode os.FileMode, options *Options) (*DB, error) {
 	if options == nil {
 		options = DefaultOptions
 	}
+	db.NoGrowSync = options.NoGrowSync
+
+	// Set default values for later DB operations.
+	db.MaxBatchSize = DefaultMaxBatchSize
+	db.MaxBatchDelay = DefaultMaxBatchDelay
+
+	flag := os.O_RDWR
+	if options.ReadOnly {
+		flag = os.O_RDONLY
+		db.readOnly = true
+	}

 	// Open data file and separate sync handler for metadata writes.
 	db.path = path

 	var err error
-	if db.file, err = os.OpenFile(db.path, os.O_RDWR|os.O_CREATE, mode); err != nil {
+	if db.file, err = os.OpenFile(db.path, flag|os.O_CREATE, mode); err != nil {
 		_ = db.close()
 		return nil, err
 	}

-	// Lock file so that other processes using Bolt cannot use the database
-	// at the same time. This would cause corruption since the two processes
-	// would write meta pages and free pages separately.
-	if err := flock(db.file, options.Timeout); err != nil {
+	// Lock file so that other processes using Bolt in read-write mode cannot
+	// use the database at the same time. This would cause corruption since
+	// the two processes would write meta pages and free pages separately.
+	// The database file is locked exclusively (only one process can grab the lock)
+	// if !options.ReadOnly.
+	// The database file is locked using the shared lock (more than one process may
+	// hold a lock at the same time) otherwise (options.ReadOnly is set).
+	if err := flock(db.file, !db.readOnly, options.Timeout); err != nil {
 		_ = db.close()
 		return nil, err
 	}
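With `Options.ReadOnly`, both the open flags and the flock mode change, so several processes can read one file concurrently while writers stay locked out. A short sketch of the caller's side, assuming `app.db` already exists:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	// Open with a shared lock; multiple read-only processes may hold it.
	db, err := bolt.Open("app.db", 0666, &bolt.Options{ReadOnly: true})
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Any attempt to write fails fast with ErrDatabaseReadOnly.
	if err := db.Update(func(tx *bolt.Tx) error { return nil }); err != bolt.ErrDatabaseReadOnly {
		log.Fatalf("expected ErrDatabaseReadOnly, got %v", err)
	}
}
```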
@@ -162,16 +210,6 @@ func (db *DB) mmap(minsz int) error {
 	db.mmaplock.Lock()
 	defer db.mmaplock.Unlock()

-	// Dereference all mmap references before unmapping.
-	if db.rwtx != nil {
-		db.rwtx.root.dereference()
-	}
-
-	// Unmap existing data before continuing.
-	if err := db.munmap(); err != nil {
-		return err
-	}
-
 	info, err := db.file.Stat()
 	if err != nil {
 		return fmt.Errorf("mmap stat error: %s", err)
@@ -184,7 +222,20 @@ func (db *DB) mmap(minsz int) error {
 	if size < minsz {
 		size = minsz
 	}
-	size = db.mmapSize(size)
+	size, err = db.mmapSize(size)
+	if err != nil {
+		return err
+	}
+
+	// Dereference all mmap references before unmapping.
+	if db.rwtx != nil {
+		db.rwtx.root.dereference()
+	}
+
+	// Unmap existing data before continuing.
+	if err := db.munmap(); err != nil {
+		return err
+	}

 	// Memory-map the data file as a byte slice.
 	if err := mmap(db, size); err != nil {
@@ -215,22 +266,40 @@ func (db *DB) munmap() error {
 }

 // mmapSize determines the appropriate size for the mmap given the current size
-// of the database. The minimum size is 4MB and doubles until it reaches 1GB.
-func (db *DB) mmapSize(size int) int {
-	if size <= minMmapSize {
-		return minMmapSize
-	} else if size < maxMmapStep {
-		size *= 2
-	} else {
-		size += maxMmapStep
+// of the database. The minimum size is 32KB and doubles until it reaches 1GB.
+// Returns an error if the new mmap size is greater than the max allowed.
+func (db *DB) mmapSize(size int) (int, error) {
+	// Double the size from 32KB until 1GB.
+	for i := uint(15); i <= 30; i++ {
+		if size <= 1<<i {
+			return 1 << i, nil
+		}
+	}
+
+	// Verify the requested size is not above the maximum allowed.
+	if size > maxMapSize {
+		return 0, fmt.Errorf("mmap too large")
+	}
+
+	// If larger than 1GB then grow by 1GB at a time.
+	sz := int64(size)
+	if remainder := sz % int64(maxMmapStep); remainder > 0 {
+		sz += int64(maxMmapStep) - remainder
 	}

 	// Ensure that the mmap size is a multiple of the page size.
-	if (size % db.pageSize) != 0 {
-		size = ((size / db.pageSize) + 1) * db.pageSize
+	// This should always be true since we're incrementing in MBs.
+	pageSize := int64(db.pageSize)
+	if (sz % pageSize) != 0 {
+		sz = ((sz / pageSize) + 1) * pageSize
 	}

-	return size
+	// If we've exceeded the max size then only grow up to the max size.
+	if sz > maxMapSize {
+		sz = maxMapSize
+	}
+
+	return int(sz), nil
 }

 // init creates a new database file and initializes its meta pages.
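The new growth policy is easiest to see as a table of inputs and outputs: round up to the next power of two between 32KB and 1GB, then grow in 1GB steps, page-aligned and capped. Below is an illustrative standalone mirror of the function above; the `maxMapSize`/`maxMmapStep` constants live in Bolt's per-architecture files, so the 64-bit values are assumed here:

```go
package main

import (
	"errors"
	"fmt"
)

const (
	maxMapSize  = 0xFFFFFFFFFFFF // 256TB, Bolt's amd64 value (assumed)
	maxMmapStep = 1 << 30        // 1GB
)

// mmapSizeMirror reproduces the growth policy for illustration only.
func mmapSizeMirror(size, pageSize int) (int, error) {
	// Double from 32KB (1<<15) until 1GB (1<<30).
	for i := uint(15); i <= 30; i++ {
		if size <= 1<<i {
			return 1 << i, nil
		}
	}
	if size > maxMapSize {
		return 0, errors.New("mmap too large")
	}
	// Above 1GB: grow in 1GB steps, then page-align, then cap.
	sz := int64(size)
	if r := sz % int64(maxMmapStep); r > 0 {
		sz += int64(maxMmapStep) - r
	}
	ps := int64(pageSize)
	if sz%ps != 0 {
		sz = (sz/ps + 1) * ps
	}
	if sz > maxMapSize {
		sz = maxMapSize
	}
	return int(sz), nil
}

func main() {
	for _, n := range []int{1000, 100000, 900 << 20, 3 << 30} {
		sz, _ := mmapSizeMirror(n, 4096)
		fmt.Printf("%12d -> %12d\n", n, sz)
	}
}
```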
@@ -250,7 +319,6 @@ func (db *DB) init() error {
 	m.magic = magic
 	m.version = version
 	m.pageSize = uint32(db.pageSize)
-	m.version = version
 	m.freelist = 2
 	m.root = bucket{root: 3}
 	m.pgid = 4
@@ -283,8 +351,15 @@ func (db *DB) init() error {
 // Close releases all database resources.
 // All transactions must be closed before closing the database.
 func (db *DB) Close() error {
+	db.rwlock.Lock()
+	defer db.rwlock.Unlock()
+
 	db.metalock.Lock()
 	defer db.metalock.Unlock()

+	db.mmaplock.RLock()
+	defer db.mmaplock.RUnlock()
+
 	return db.close()
 }

@@ -304,8 +379,11 @@ func (db *DB) close() error {

 	// Close file handles.
 	if db.file != nil {
-		// Unlock the file.
-		_ = funlock(db.file)
+		// No need to unlock read-only file.
+		if !db.readOnly {
+			// Unlock the file.
+			_ = funlock(db.file)
+		}

 		// Close the file descriptor.
 		if err := db.file.Close(); err != nil {
@@ -323,6 +401,11 @@ func (db *DB) close() error {
 // will cause the calls to block and be serialized until the current write
 // transaction finishes.
 //
+// Transactions should not be dependent on one another. Opening a read
+// transaction and a write transaction in the same goroutine can cause the
+// writer to deadlock because the database periodically needs to re-mmap itself
+// as it grows and it cannot do that while a read transaction is open.
+//
 // IMPORTANT: You must close read-only transactions after you are finished or
 // else the database will not reclaim old pages.
 func (db *DB) Begin(writable bool) (*Tx, error) {
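The safe ordering that the new comment asks for, sketched against the vendored API (the bucket name is illustrative): finish the read transaction before opening the writer in the same goroutine, so the writer is free to grow and re-mmap the file.

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("app.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Safe: do all the reading first...
	tx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	_ = tx.Bucket([]byte("events")) // read whatever is needed
	if err := tx.Rollback(); err != nil { // ...then release the mapping
		log.Fatal(err)
	}

	// Only now start the writer; no read transaction pins the mmap.
	if err := db.Update(func(tx *bolt.Tx) error {
		_, err := tx.CreateBucketIfNotExists([]byte("events"))
		return err
	}); err != nil {
		log.Fatal(err)
	}
}
```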
@@ -371,6 +454,11 @@ func (db *DB) beginTx() (*Tx, error) {
 }

 func (db *DB) beginRWTx() (*Tx, error) {
+	// If the database was opened with Options.ReadOnly, return an error.
+	if db.readOnly {
+		return nil, ErrDatabaseReadOnly
+	}
+
 	// Obtain writer lock. This is released by the transaction when it closes.
 	// This enforces only one writer transaction at a time.
 	db.rwlock.Lock()
@@ -501,6 +589,12 @@ func (db *DB) View(fn func(*Tx) error) error {
 	return nil
 }

+// Sync executes fdatasync() against the database file handle.
+//
+// This is not necessary under normal operation, however, if you use NoSync
+// then it allows you to force the database file to sync against the disk.
+func (db *DB) Sync() error { return fdatasync(db) }
+
 // Stats retrieves ongoing performance stats for the database.
 // This is only updated when a transaction closes.
 func (db *DB) Stats() Stats {
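`Sync()` pairs with the existing `DB.NoSync` flag: skip the per-commit fsync during a bulk load, then force a single sync at the end. A sketch under the assumption that losing the whole load on a crash is acceptable:

```go
package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("bulk.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Skip the per-commit fsync. If the process crashes before the
	// Sync below, the load must be redone from scratch.
	db.NoSync = true
	for i := 0; i < 1000; i++ {
		if err := db.Update(func(tx *bolt.Tx) error {
			b, err := tx.CreateBucketIfNotExists([]byte("bulk"))
			if err != nil {
				return err
			}
			return b.Put([]byte(fmt.Sprintf("k%d", i)), []byte("v"))
		}); err != nil {
			log.Fatal(err)
		}
	}
	db.NoSync = false

	// Flush everything to disk once at the end.
	if err := db.Sync(); err != nil {
		log.Fatal(err)
	}
}
```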
@@ -561,18 +655,30 @@ func (db *DB) allocate(count int) (*page, error) {
 	return p, nil
 }

+func (db *DB) IsReadOnly() bool {
+	return db.readOnly
+}
+
 // Options represents the options that can be set when opening a database.
 type Options struct {
 	// Timeout is the amount of time to wait to obtain a file lock.
 	// When set to zero it will wait indefinitely. This option is only
 	// available on Darwin and Linux.
 	Timeout time.Duration

+	// Sets the DB.NoGrowSync flag before memory mapping the file.
+	NoGrowSync bool
+
+	// Open database in read-only mode. Uses flock(..., LOCK_SH|LOCK_NB) to
+	// grab a shared lock (UNIX).
+	ReadOnly bool
 }

 // DefaultOptions represent the options used if nil options are passed into Open().
 // No timeout is used which will cause Bolt to wait indefinitely for a lock.
 var DefaultOptions = &Options{
 	Timeout:    0,
+	NoGrowSync: false,
 }

 // Stats represents statistics about the database.
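A sketch of opening with the expanded `Options`; the one-second timeout is illustrative, and when another process already holds the exclusive lock the open error should be Bolt's `ErrTimeout`:

```go
package main

import (
	"log"
	"time"

	"github.com/boltdb/bolt"
)

func main() {
	// Fail after one second instead of blocking forever if another
	// process already holds the exclusive file lock.
	db, err := bolt.Open("app.db", 0600, &bolt.Options{
		Timeout:    time.Second,
		NoGrowSync: false, // keep the truncate/fsync on growth (the default)
	})
	if err != nil {
		log.Fatal(err) // e.g. "timeout" if the lock could not be acquired
	}
	defer db.Close()

	log.Println("read-only?", db.IsReadOnly()) // false for this handle
}
```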
@@ -647,9 +753,11 @@ func (m *meta) copy(dest *meta) {

 // write writes the meta onto a page.
 func (m *meta) write(p *page) {
-	_assert(m.root.root < m.pgid, "root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid)
-	_assert(m.freelist < m.pgid, "freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid)
+	if m.root.root >= m.pgid {
+		panic(fmt.Sprintf("root bucket pgid (%d) above high water mark (%d)", m.root.root, m.pgid))
+	} else if m.freelist >= m.pgid {
+		panic(fmt.Sprintf("freelist pgid (%d) above high water mark (%d)", m.freelist, m.pgid))
+	}

 	// Page id is either going to be 0 or 1 which we can determine by the transaction ID.
 	p.id = pgid(m.txid % 2)
@@ -675,13 +783,8 @@ func _assert(condition bool, msg string, v ...interface{}) {
 	}
 }

-func warn(v ...interface{}) {
-	fmt.Fprintln(os.Stderr, v...)
-}
-
-func warnf(msg string, v ...interface{}) {
-	fmt.Fprintf(os.Stderr, msg+"\n", v...)
-}
+func warn(v ...interface{})              { fmt.Fprintln(os.Stderr, v...) }
+func warnf(msg string, v ...interface{}) { fmt.Fprintf(os.Stderr, msg+"\n", v...) }

 func printstack() {
 	stack := strings.Join(strings.Split(string(debug.Stack()), "\n")[2:], "\n")
@@ -36,6 +36,10 @@ var (
 	// ErrTxClosed is returned when committing or rolling back a transaction
 	// that has already been committed or rolled back.
 	ErrTxClosed = errors.New("tx closed")

+	// ErrDatabaseReadOnly is returned when a mutating transaction is started on a
+	// read-only database.
+	ErrDatabaseReadOnly = errors.New("database is in read-only mode")
 )

 // These errors can occur when putting or deleting a value or a bucket.
@@ -1,6 +1,7 @@
 package bolt

 import (
+	"fmt"
 	"sort"
 	"unsafe"
 )
@@ -47,15 +48,14 @@ func (f *freelist) pending_count() int {

 // all returns a list of all free ids and all pending ids in one sorted list.
 func (f *freelist) all() []pgid {
-	ids := make([]pgid, len(f.ids))
-	copy(ids, f.ids)
+	m := make(pgids, 0)

 	for _, list := range f.pending {
-		ids = append(ids, list...)
+		m = append(m, list...)
 	}

-	sort.Sort(pgids(ids))
-	return ids
+	sort.Sort(m)
+	return pgids(f.ids).merge(m)
 }

 // allocate returns the starting page id of a contiguous list of pages of a given size.
@@ -67,7 +67,9 @@ func (f *freelist) allocate(n int) pgid {

 	var initial, previd pgid
 	for i, id := range f.ids {
-		_assert(id > 1, "invalid page allocation: %d", id)
+		if id <= 1 {
+			panic(fmt.Sprintf("invalid page allocation: %d", id))
+		}

 		// Reset initial page if this is not contiguous.
 		if previd == 0 || id-previd != 1 {
@@ -103,13 +105,17 @@ func (f *freelist) allocate(n int) pgid {
 // free releases a page and its overflow for a given transaction id.
 // If the page is already free then a panic will occur.
 func (f *freelist) free(txid txid, p *page) {
-	_assert(p.id > 1, "cannot free page 0 or 1: %d", p.id)
+	if p.id <= 1 {
+		panic(fmt.Sprintf("cannot free page 0 or 1: %d", p.id))
+	}

 	// Free page and all its overflow pages.
 	var ids = f.pending[txid]
 	for id := p.id; id <= p.id+pgid(p.overflow); id++ {
 		// Verify that page is not already free.
-		_assert(!f.cache[id], "page %d already freed", id)
+		if f.cache[id] {
+			panic(fmt.Sprintf("page %d already freed", id))
+		}

 		// Add to the freelist and cache.
 		ids = append(ids, id)
@@ -120,15 +126,17 @@ func (f *freelist) free(txid txid, p *page) {

 // release moves all page ids for a transaction id (or older) to the freelist.
 func (f *freelist) release(txid txid) {
+	m := make(pgids, 0)
 	for tid, ids := range f.pending {
 		if tid <= txid {
 			// Move transaction's pending pages to the available freelist.
 			// Don't remove from the cache since the page is still free.
-			f.ids = append(f.ids, ids...)
+			m = append(m, ids...)
 			delete(f.pending, tid)
 		}
 	}
-	sort.Sort(pgids(f.ids))
+	sort.Sort(m)
+	f.ids = pgids(f.ids).merge(m)
 }

 // rollback removes the pages from a given pending tx.
@@ -2,6 +2,7 @@ package bolt

 import (
 	"bytes"
+	"fmt"
 	"sort"
 	"unsafe"
 )
@@ -70,7 +71,9 @@ func (n *node) pageElementSize() int {

 // childAt returns the child node at a given index.
 func (n *node) childAt(index int) *node {
-	_assert(!n.isLeaf, "invalid childAt(%d) on a leaf node", index)
+	if n.isLeaf {
+		panic(fmt.Sprintf("invalid childAt(%d) on a leaf node", index))
+	}
 	return n.bucket.node(n.inodes[index].pgid, n)
 }

@@ -111,9 +114,13 @@ func (n *node) prevSibling() *node {

 // put inserts a key/value.
 func (n *node) put(oldKey, newKey, value []byte, pgid pgid, flags uint32) {
-	_assert(pgid < n.bucket.tx.meta.pgid, "pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid)
-	_assert(len(oldKey) > 0, "put: zero-length old key")
-	_assert(len(newKey) > 0, "put: zero-length new key")
+	if pgid >= n.bucket.tx.meta.pgid {
+		panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", pgid, n.bucket.tx.meta.pgid))
+	} else if len(oldKey) <= 0 {
+		panic("put: zero-length old key")
+	} else if len(newKey) <= 0 {
+		panic("put: zero-length new key")
+	}

 	// Find insertion index.
 	index := sort.Search(len(n.inodes), func(i int) bool { return bytes.Compare(n.inodes[i].key, oldKey) != -1 })
@@ -189,7 +196,9 @@ func (n *node) write(p *page) {
 		p.flags |= branchPageFlag
 	}

-	_assert(len(n.inodes) < 0xFFFF, "inode overflow: %d (pgid=%d)", len(n.inodes), p.id)
+	if len(n.inodes) >= 0xFFFF {
+		panic(fmt.Sprintf("inode overflow: %d (pgid=%d)", len(n.inodes), p.id))
+	}
 	p.count = uint16(len(n.inodes))

 	// Loop over each item and write it to the page.
@@ -212,11 +221,20 @@ func (n *node) write(p *page) {
 			_assert(elem.pgid != p.id, "write: circular dependency occurred")
 		}

+		// If the length of key+value is larger than the max allocation size
+		// then we need to reallocate the byte array pointer.
+		//
+		// See: https://github.com/boltdb/bolt/pull/335
+		klen, vlen := len(item.key), len(item.value)
+		if len(b) < klen+vlen {
+			b = (*[maxAllocSize]byte)(unsafe.Pointer(&b[0]))[:]
+		}
+
 		// Write data for the element to the end of the page.
 		copy(b[0:], item.key)
-		b = b[len(item.key):]
+		b = b[klen:]
 		copy(b[0:], item.value)
-		b = b[len(item.value):]
+		b = b[vlen:]
 	}

 	// DEBUG ONLY: n.dump()
@@ -348,7 +366,9 @@ func (n *node) spill() error {
 		}

 		// Write the node.
-		_assert(p.id < tx.meta.pgid, "pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid)
+		if p.id >= tx.meta.pgid {
+			panic(fmt.Sprintf("pgid (%d) above high water mark (%d)", p.id, tx.meta.pgid))
+		}
 		node.pgid = p.id
 		node.write(p)
 		node.spilled = true
@@ -3,12 +3,12 @@ package bolt
 import (
 	"fmt"
 	"os"
+	"sort"
 	"unsafe"
 )

 const pageHeaderSize = int(unsafe.Offsetof(((*page)(nil)).ptr))

-const maxAllocSize = 0xFFFFFFF
 const minKeysPerPage = 2

 const branchPageElementSize = int(unsafe.Sizeof(branchPageElement{}))
@@ -97,7 +97,7 @@ type branchPageElement struct {
 // key returns a byte slice of the node key.
 func (n *branchPageElement) key() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos : n.pos+n.ksize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
 }

 // leafPageElement represents a node on a leaf page.
@@ -111,13 +111,13 @@ type leafPageElement struct {
 // key returns a byte slice of the node key.
 func (n *leafPageElement) key() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos : n.pos+n.ksize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos]))[:n.ksize]
 }

 // value returns a byte slice of the node value.
 func (n *leafPageElement) value() []byte {
 	buf := (*[maxAllocSize]byte)(unsafe.Pointer(n))
-	return buf[n.pos+n.ksize : n.pos+n.ksize+n.vsize]
+	return (*[maxAllocSize]byte)(unsafe.Pointer(&buf[n.pos+n.ksize]))[:n.vsize]
 }

 // PageInfo represents human readable information about a page.
@@ -133,3 +133,40 @@ type pgids []pgid
 func (s pgids) Len() int           { return len(s) }
 func (s pgids) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
 func (s pgids) Less(i, j int) bool { return s[i] < s[j] }
+
+// merge returns the sorted union of a and b.
+func (a pgids) merge(b pgids) pgids {
+	// Return the opposite slice if one is nil.
+	if len(a) == 0 {
+		return b
+	} else if len(b) == 0 {
+		return a
+	}
+
+	// Create a list to hold all elements from both lists.
+	merged := make(pgids, 0, len(a)+len(b))
+
+	// Assign lead to the slice with a lower starting value, follow to the higher value.
+	lead, follow := a, b
+	if b[0] < a[0] {
+		lead, follow = b, a
+	}
+
+	// Continue while there are elements in the lead.
+	for len(lead) > 0 {
+		// Merge largest prefix of lead that is ahead of follow[0].
+		n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] })
+		merged = append(merged, lead[:n]...)
+		if n >= len(lead) {
+			break
+		}
+
+		// Swap lead and follow.
+		lead, follow = follow, lead[n:]
+	}
+
+	// Append what's left in follow.
+	merged = append(merged, follow...)
+
+	return merged
+}
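Since `pgids` is unexported, here is an illustrative standalone copy of the same merge over plain ints, showing how `sort.Search` advances through whole runs instead of comparing element by element:

```go
package main

import (
	"fmt"
	"sort"
)

// mergeSorted mirrors pgids.merge: repeatedly copy the longest prefix of
// the "lead" slice whose values do not exceed the head of the "follow"
// slice, then swap roles. Each cut point is found in O(log n).
func mergeSorted(a, b []int) []int {
	if len(a) == 0 {
		return b
	} else if len(b) == 0 {
		return a
	}
	merged := make([]int, 0, len(a)+len(b))
	lead, follow := a, b
	if b[0] < a[0] {
		lead, follow = b, a
	}
	for len(lead) > 0 {
		n := sort.Search(len(lead), func(i int) bool { return lead[i] > follow[0] })
		merged = append(merged, lead[:n]...)
		if n >= len(lead) {
			break
		}
		lead, follow = follow, lead[n:]
	}
	return append(merged, follow...)
}

func main() {
	free := []int{3, 4, 9, 12}     // e.g. existing free pages
	released := []int{5, 6, 7, 10} // e.g. newly released pages
	fmt.Println(mergeSorted(free, released))
	// Output: [3 4 5 6 7 9 10 12]
}
```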
@@ -87,18 +87,21 @@ func (tx *Tx) Stats() TxStats {

 // Bucket retrieves a bucket by name.
 // Returns nil if the bucket does not exist.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (tx *Tx) Bucket(name []byte) *Bucket {
 	return tx.root.Bucket(name)
 }

 // CreateBucket creates a new bucket.
 // Returns an error if the bucket already exists, if the bucket name is blank, or if the bucket name is too long.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (tx *Tx) CreateBucket(name []byte) (*Bucket, error) {
 	return tx.root.CreateBucket(name)
 }

 // CreateBucketIfNotExists creates a new bucket if it doesn't already exist.
 // Returns an error if the bucket name is blank, or if the bucket name is too long.
+// The bucket instance is only valid for the lifetime of the transaction.
 func (tx *Tx) CreateBucketIfNotExists(name []byte) (*Bucket, error) {
 	return tx.root.CreateBucketIfNotExists(name)
 }
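What "only valid for the lifetime of the transaction" means in practice: neither the `*Bucket` nor any byte slice it hands out may outlive the closure, because both point into the transaction's mmap. A sketch of the pitfall, with illustrative names:

```go
package main

import (
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("app.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Wrong: letting the bucket escape the transaction.
	var leaked *bolt.Bucket

	err = db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists([]byte("cfg"))
		if err != nil {
			return err
		}
		leaked = b // do NOT do this
		// Right: finish all bucket work inside the transaction,
		// and copy out any values you need to keep.
		return b.Put([]byte("mode"), []byte("fast"))
	})
	if err != nil {
		log.Fatal(err)
	}
	_ = leaked // using it here is undefined behavior
}
```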
@@ -127,7 +130,8 @@ func (tx *Tx) OnCommit(fn func()) {
 }

 // Commit writes all changes to disk and updates the meta page.
-// Returns an error if a disk write error occurs.
+// Returns an error if a disk write error occurs, or if Commit is
+// called on a read-only transaction.
 func (tx *Tx) Commit() error {
 	_assert(!tx.managed, "managed tx commit not allowed")
 	if tx.db == nil {
@@ -203,7 +207,8 @@ func (tx *Tx) Commit() error {
 	return nil
 }

-// Rollback closes the transaction and ignores all previous updates.
+// Rollback closes the transaction and ignores all previous updates. Read-only
+// transactions must be rolled back and not committed.
 func (tx *Tx) Rollback() error {
 	_assert(!tx.managed, "managed tx rollback not allowed")
 	if tx.db == nil {
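The updated `Commit`/`Rollback` contracts, exercised with manual transactions (bucket and key names are illustrative):

```go
package main

import (
	"fmt"
	"log"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("app.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Writable transaction: Commit on success, Rollback otherwise.
	tx, err := db.Begin(true)
	if err != nil {
		log.Fatal(err)
	}
	defer tx.Rollback() // harmless after a successful Commit

	b, err := tx.CreateBucketIfNotExists([]byte("cfg"))
	if err != nil {
		log.Fatal(err)
	}
	if err := b.Put([]byte("k"), []byte("v")); err != nil {
		log.Fatal(err)
	}
	if err := tx.Commit(); err != nil {
		log.Fatal(err)
	}

	// Read-only transaction: always roll back, never commit.
	rtx, err := db.Begin(false)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("k = %s\n", rtx.Bucket([]byte("cfg")).Get([]byte("k")))
	if err := rtx.Rollback(); err != nil {
		log.Fatal(err)
	}
}
```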
@@ -234,7 +239,8 @@ func (tx *Tx) close() {
 		var freelistPendingN = tx.db.freelist.pending_count()
 		var freelistAlloc = tx.db.freelist.size()

-		// Remove writer lock.
+		// Remove transaction ref & writer lock.
+		tx.db.rwtx = nil
 		tx.db.rwlock.Unlock()

 		// Merge statistics.
@@ -248,41 +254,51 @@ func (tx *Tx) close() {
 	} else {
 		tx.db.removeTx(tx)
 	}

+	// Clear all references.
 	tx.db = nil
+	tx.meta = nil
+	tx.root = Bucket{tx: tx}
+	tx.pages = nil
 }

 // Copy writes the entire database to a writer.
-// A reader transaction is maintained during the copy so it is safe to continue
-// using the database while a copy is in progress.
-// Copy will write exactly tx.Size() bytes into the writer.
+// This function exists for backwards compatibility. Use WriteTo() instead.
 func (tx *Tx) Copy(w io.Writer) error {
-	var f *os.File
-	var err error
+	_, err := tx.WriteTo(w)
+	return err
+}
+
+// WriteTo writes the entire database to a writer.
+// If err == nil then exactly tx.Size() bytes will be written into the writer.
+func (tx *Tx) WriteTo(w io.Writer) (n int64, err error) {
 	// Attempt to open reader directly.
+	var f *os.File
 	if f, err = os.OpenFile(tx.db.path, os.O_RDONLY|odirect, 0); err != nil {
 		// Fallback to a regular open if that doesn't work.
 		if f, err = os.OpenFile(tx.db.path, os.O_RDONLY, 0); err != nil {
-			return err
+			return 0, err
 		}
 	}

 	// Copy the meta pages.
 	tx.db.metalock.Lock()
-	_, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
+	n, err = io.CopyN(w, f, int64(tx.db.pageSize*2))
 	tx.db.metalock.Unlock()
 	if err != nil {
 		_ = f.Close()
-		return fmt.Errorf("meta copy: %s", err)
+		return n, fmt.Errorf("meta copy: %s", err)
 	}

 	// Copy data pages.
-	if _, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2)); err != nil {
+	wn, err := io.CopyN(w, f, tx.Size()-int64(tx.db.pageSize*2))
+	n += wn
+	if err != nil {
 		_ = f.Close()
-		return err
+		return n, err
 	}

-	return f.Close()
+	return n, f.Close()
 }

 // CopyFile copies the entire database to file at the given path.
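`WriteTo` makes `io.Writer`-based hot backups possible, with `Copy` kept as a thin shim over it. A sketch that snapshots a live database to a file from inside `db.View` (the filenames are illustrative); the read transaction pins a consistent view while other readers and writers keep working:

```go
package main

import (
	"log"
	"os"

	"github.com/boltdb/bolt"
)

func main() {
	db, err := bolt.Open("app.db", 0600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	// Hot backup: the read transaction gives a consistent snapshot.
	err = db.View(func(tx *bolt.Tx) error {
		f, err := os.Create("app.db.bak")
		if err != nil {
			return err
		}
		defer f.Close()

		n, err := tx.WriteTo(f)
		if err != nil {
			return err
		}
		log.Printf("wrote %d bytes (tx.Size() = %d)", n, tx.Size())
		return f.Sync()
	})
	if err != nil {
		log.Fatal(err)
	}
}
```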
@@ -416,15 +432,39 @@ func (tx *Tx) write() error {
 	// Write pages to disk in order.
 	for _, p := range pages {
 		size := (int(p.overflow) + 1) * tx.db.pageSize
-		buf := (*[maxAllocSize]byte)(unsafe.Pointer(p))[:size]
 		offset := int64(p.id) * int64(tx.db.pageSize)
-		if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
-			return err
-		}

-		// Update statistics.
-		tx.stats.Write++
+		// Write out page in "max allocation" sized chunks.
+		ptr := (*[maxAllocSize]byte)(unsafe.Pointer(p))
+		for {
+			// Limit our write to our max allocation size.
+			sz := size
+			if sz > maxAllocSize-1 {
+				sz = maxAllocSize - 1
+			}
+
+			// Write chunk to disk.
+			buf := ptr[:sz]
+			if _, err := tx.db.ops.writeAt(buf, offset); err != nil {
+				return err
+			}
+
+			// Update statistics.
+			tx.stats.Write++
+
+			// Exit inner for loop if we've written all the chunks.
+			size -= sz
+			if size == 0 {
+				break
+			}
+
+			// Otherwise move offset forward and move pointer to next chunk.
+			offset += int64(sz)
+			ptr = (*[maxAllocSize]byte)(unsafe.Pointer(&ptr[sz]))
+		}
 	}

+	// Ignore file sync if flag is set on DB.
 	if !tx.db.NoSync || IgnoreNoSync {
 		if err := fdatasync(tx.db); err != nil {
 			return err
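The chunking arithmetic in the new inner loop can be mirrored as a pure function. This hypothetical helper prints the `(offset, length)` pairs that `writeAt` would receive; the 32-bit `maxAllocSize` value (`0xFFFFFFF`) is assumed for illustration, since the constant now lives in per-architecture files:

```go
package main

import "fmt"

// chunks mirrors the loop above: split a write of size bytes starting
// at offset into pieces of at most max bytes. Bolt uses
// max = maxAllocSize-1 so each chunk fits inside the array bound.
func chunks(offset int64, size, max int) (out [][2]int64) {
	for {
		sz := size
		if sz > max {
			sz = max
		}
		out = append(out, [2]int64{offset, int64(sz)})
		size -= sz
		if size == 0 {
			return out
		}
		offset += int64(sz)
	}
}

func main() {
	// A hypothetical 640MB page run with the 32-bit ~256MB chunk limit.
	for _, c := range chunks(4096, 640<<20, 0xFFFFFFF-1) {
		fmt.Printf("writeAt(offset=%d, len=%d)\n", c[0], c[1])
	}
}
```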