From da506ddd0b011d066ad5968702af1ab365bf08a4 Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Fri, 19 Mar 2021 17:44:02 +0800 Subject: [PATCH 1/2] Transaction support column families, #168 --- transaction.go | 72 ++++++++++++++++++++++++ transactiondb.go | 143 +++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) diff --git a/transaction.go b/transaction.go index 67c9ef09..b1696572 100644 --- a/transaction.go +++ b/transaction.go @@ -63,6 +63,23 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro return NewSlice(cValue, cValLen), nil } +// Get returns the data associated with the key from the database given this transaction and column family. +func (transaction *Transaction) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transaction_get_cf( + transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} + // GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction. func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Slice, error) { var ( @@ -80,6 +97,23 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl return NewSlice(cValue, cValLen), nil } +// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction and column family. +func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transaction_get_for_update_cf( + transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} + // Put writes data associated with a key to the transaction. func (transaction *Transaction) Put(key, value []byte) error { var ( @@ -97,6 +131,23 @@ func (transaction *Transaction) Put(key, value []byte) error { return nil } +// Put writes data associated with a key to the transaction and column family. +func (transaction *Transaction) PutCF(cf *ColumnFamilyHandle, key, value []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + cValue = byteToChar(value) + ) + C.rocksdb_transaction_put_cf( + transaction.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // Delete removes the data associated with the key from the transaction. func (transaction *Transaction) Delete(key []byte) error { var ( @@ -111,6 +162,20 @@ func (transaction *Transaction) Delete(key []byte) error { return nil } +// Delete removes the data associated with the key from the transaction and column family. +func (transaction *Transaction) DeleteCF(cf *ColumnFamilyHandle, key []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + ) + C.rocksdb_transaction_delete_cf(transaction.c, cf.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // NewIterator returns an Iterator over the database that uses the // ReadOptions given. func (transaction *Transaction) NewIterator(opts *ReadOptions) *Iterator { @@ -118,6 +183,13 @@ func (transaction *Transaction) NewIterator(opts *ReadOptions) *Iterator { unsafe.Pointer(C.rocksdb_transaction_create_iterator(transaction.c, opts.c))) } +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given and column family. +func (transaction *Transaction) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transaction_create_iterator_cf(transaction.c, opts.c, cf.c))) +} + // Destroy deallocates the transaction object. func (transaction *Transaction) Destroy() { C.rocksdb_transaction_destroy(transaction.c) diff --git a/transactiondb.go b/transactiondb.go index cfdeac9c..60dbf4bf 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -41,6 +41,87 @@ func OpenTransactionDb( }, nil } +// OpenTransactionDbColumnFamilies opens a database with the specified options. +func OpenTransactionDbColumnFamilies( + opts *Options, + transactionDBOpts *TransactionDBOptions, + name string, + cfNames []string, + cfOpts []*Options, +) (*TransactionDB, []*ColumnFamilyHandle, error) { + numColumnFamilies := len(cfNames) + if numColumnFamilies != len(cfOpts) { + return nil, nil, errors.New("must provide the same number of column family names and options") + } + cNames := make([]*C.char, numColumnFamilies) + for i, s := range cfNames { + cNames[i] = C.CString(s) + } + defer func() { + for _, s := range cNames { + C.free(unsafe.Pointer(s)) + } + }() + + cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) + for i, o := range cfOpts { + cOpts[i] = o.c + } + + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + + cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) + + db := C.rocksdb_transactiondb_open_column_families( + opts.c, + transactionDBOpts.c, + cName, + C.int(numColumnFamilies), + &cNames[0], + &cOpts[0], + &cHandles[0], + &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, nil, errors.New(C.GoString(cErr)) + } + + cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) + for i, c := range cHandles { + cfHandles[i] = NewNativeColumnFamilyHandle(c) + } + + return &TransactionDB{ + name: name, + c: db, + opts: opts, + transactionDBOpts: transactionDBOpts, + }, cfHandles, nil +} + +func (db *TransactionDB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandle, error) { + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + + h := C.rocksdb_transactiondb_create_column_family( + db.c, + opts.c, + cName, + &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewNativeColumnFamilyHandle(h), nil +} + // NewSnapshot creates a new snapshot of the database. func (db *TransactionDB) NewSnapshot() *Snapshot { return NewNativeSnapshot(C.rocksdb_transactiondb_create_snapshot(db.c)) @@ -89,6 +170,23 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) { return NewSlice(cValue, cValLen), nil } +// Get returns the data associated with the key from the database and column family. +func (db *TransactionDB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transactiondb_get_cf( + db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} + // Put writes data associated with a key to the database. func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { var ( @@ -106,6 +204,23 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { return nil } +// Put writes data associated with a key to the database and column family. +func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + cValue = byteToChar(value) + ) + C.rocksdb_transactiondb_put_cf( + db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // Delete removes the data associated with the key from the database. func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { var ( @@ -120,6 +235,34 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { return nil } +// Delete removes the data associated with the key from the database and column family. +func (db *TransactionDB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + ) + C.rocksdb_transactiondb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given. +func (db *TransactionDB) NewIterator(opts *ReadOptions) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transactiondb_create_iterator(db.c, opts.c))) +} + +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given and column family. +func (db *TransactionDB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transactiondb_create_iterator_cf(db.c, opts.c, cf.c))) +} + // NewCheckpoint creates a new Checkpoint for this db. func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) { var ( From 66e724b301f0b6887d9e2d0d1d595d1515c91a4f Mon Sep 17 00:00:00 2001 From: Flier Lu Date: Mon, 22 Mar 2021 23:18:17 +0800 Subject: [PATCH 2/2] use build tag rocksdb_v6 --- transaction.go | 17 ---------- transaction_v6.go | 26 +++++++++++++++ transactiondb.go | 69 --------------------------------------- transactiondb_v6.go | 78 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+), 86 deletions(-) create mode 100644 transaction_v6.go create mode 100644 transactiondb_v6.go diff --git a/transaction.go b/transaction.go index b1696572..5a2889c1 100644 --- a/transaction.go +++ b/transaction.go @@ -97,23 +97,6 @@ func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Sl return NewSlice(cValue, cValLen), nil } -// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction and column family. -func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { - var ( - cErr *C.char - cValLen C.size_t - cKey = byteToChar(key) - ) - cValue := C.rocksdb_transaction_get_for_update_cf( - transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, - ) - if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) - return nil, errors.New(C.GoString(cErr)) - } - return NewSlice(cValue, cValLen), nil -} - // Put writes data associated with a key to the transaction. func (transaction *Transaction) Put(key, value []byte) error { var ( diff --git a/transaction_v6.go b/transaction_v6.go new file mode 100644 index 00000000..41630437 --- /dev/null +++ b/transaction_v6.go @@ -0,0 +1,26 @@ +// +build rocksdb_v6 + +package gorocksdb + +import ( + "errors" +) + +import "C" + +// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction and column family. +func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transaction_get_for_update_cf( + transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} diff --git a/transactiondb.go b/transactiondb.go index 60dbf4bf..5799f40e 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -41,68 +41,6 @@ func OpenTransactionDb( }, nil } -// OpenTransactionDbColumnFamilies opens a database with the specified options. -func OpenTransactionDbColumnFamilies( - opts *Options, - transactionDBOpts *TransactionDBOptions, - name string, - cfNames []string, - cfOpts []*Options, -) (*TransactionDB, []*ColumnFamilyHandle, error) { - numColumnFamilies := len(cfNames) - if numColumnFamilies != len(cfOpts) { - return nil, nil, errors.New("must provide the same number of column family names and options") - } - cNames := make([]*C.char, numColumnFamilies) - for i, s := range cfNames { - cNames[i] = C.CString(s) - } - defer func() { - for _, s := range cNames { - C.free(unsafe.Pointer(s)) - } - }() - - cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) - for i, o := range cfOpts { - cOpts[i] = o.c - } - - var ( - cErr *C.char - cName = C.CString(name) - ) - defer C.free(unsafe.Pointer(cName)) - - cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) - - db := C.rocksdb_transactiondb_open_column_families( - opts.c, - transactionDBOpts.c, - cName, - C.int(numColumnFamilies), - &cNames[0], - &cOpts[0], - &cHandles[0], - &cErr) - if cErr != nil { - defer C.rocksdb_free(unsafe.Pointer(cErr)) - return nil, nil, errors.New(C.GoString(cErr)) - } - - cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) - for i, c := range cHandles { - cfHandles[i] = NewNativeColumnFamilyHandle(c) - } - - return &TransactionDB{ - name: name, - c: db, - opts: opts, - transactionDBOpts: transactionDBOpts, - }, cfHandles, nil -} - func (db *TransactionDB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandle, error) { var ( cErr *C.char @@ -256,13 +194,6 @@ func (db *TransactionDB) NewIterator(opts *ReadOptions) *Iterator { unsafe.Pointer(C.rocksdb_transactiondb_create_iterator(db.c, opts.c))) } -// NewIterator returns an Iterator over the database that uses the -// ReadOptions given and column family. -func (db *TransactionDB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { - return NewNativeIterator( - unsafe.Pointer(C.rocksdb_transactiondb_create_iterator_cf(db.c, opts.c, cf.c))) -} - // NewCheckpoint creates a new Checkpoint for this db. func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) { var ( diff --git a/transactiondb_v6.go b/transactiondb_v6.go new file mode 100644 index 00000000..53b69d4c --- /dev/null +++ b/transactiondb_v6.go @@ -0,0 +1,78 @@ +// +build rocksdb_v6 + +package gorocksdb + +import ( + "errors" +) + +import "C" + +// OpenTransactionDbColumnFamilies opens a database with the specified options. +func OpenTransactionDbColumnFamilies( + opts *Options, + transactionDBOpts *TransactionDBOptions, + name string, + cfNames []string, + cfOpts []*Options, +) (*TransactionDB, []*ColumnFamilyHandle, error) { + numColumnFamilies := len(cfNames) + if numColumnFamilies != len(cfOpts) { + return nil, nil, errors.New("must provide the same number of column family names and options") + } + cNames := make([]*C.char, numColumnFamilies) + for i, s := range cfNames { + cNames[i] = C.CString(s) + } + defer func() { + for _, s := range cNames { + C.free(unsafe.Pointer(s)) + } + }() + + cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) + for i, o := range cfOpts { + cOpts[i] = o.c + } + + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + + cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) + + db := C.rocksdb_transactiondb_open_column_families( + opts.c, + transactionDBOpts.c, + cName, + C.int(numColumnFamilies), + &cNames[0], + &cOpts[0], + &cHandles[0], + &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, nil, errors.New(C.GoString(cErr)) + } + + cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) + for i, c := range cHandles { + cfHandles[i] = NewNativeColumnFamilyHandle(c) + } + + return &TransactionDB{ + name: name, + c: db, + opts: opts, + transactionDBOpts: transactionDBOpts, + }, cfHandles, nil +} + +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given and column family. +func (db *TransactionDB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transactiondb_create_iterator_cf(db.c, opts.c, cf.c))) +}