diff --git a/transaction.go b/transaction.go index 67c9ef09..5a2889c1 100644 --- a/transaction.go +++ b/transaction.go @@ -63,6 +63,23 @@ func (transaction *Transaction) Get(opts *ReadOptions, key []byte) (*Slice, erro return NewSlice(cValue, cValLen), nil } +// Get returns the data associated with the key from the database given this transaction and column family. +func (transaction *Transaction) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transaction_get_cf( + transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} + // GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction. func (transaction *Transaction) GetForUpdate(opts *ReadOptions, key []byte) (*Slice, error) { var ( @@ -97,6 +114,23 @@ func (transaction *Transaction) Put(key, value []byte) error { return nil } +// Put writes data associated with a key to the transaction and column family. +func (transaction *Transaction) PutCF(cf *ColumnFamilyHandle, key, value []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + cValue = byteToChar(value) + ) + C.rocksdb_transaction_put_cf( + transaction.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // Delete removes the data associated with the key from the transaction. func (transaction *Transaction) Delete(key []byte) error { var ( @@ -111,6 +145,20 @@ func (transaction *Transaction) Delete(key []byte) error { return nil } +// Delete removes the data associated with the key from the transaction and column family. +func (transaction *Transaction) DeleteCF(cf *ColumnFamilyHandle, key []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + ) + C.rocksdb_transaction_delete_cf(transaction.c, cf.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // NewIterator returns an Iterator over the database that uses the // ReadOptions given. func (transaction *Transaction) NewIterator(opts *ReadOptions) *Iterator { @@ -118,6 +166,13 @@ func (transaction *Transaction) NewIterator(opts *ReadOptions) *Iterator { unsafe.Pointer(C.rocksdb_transaction_create_iterator(transaction.c, opts.c))) } +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given and column family. +func (transaction *Transaction) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transaction_create_iterator_cf(transaction.c, opts.c, cf.c))) +} + // Destroy deallocates the transaction object. func (transaction *Transaction) Destroy() { C.rocksdb_transaction_destroy(transaction.c) diff --git a/transaction_v6.go b/transaction_v6.go new file mode 100644 index 00000000..41630437 --- /dev/null +++ b/transaction_v6.go @@ -0,0 +1,26 @@ +// +build rocksdb_v6 + +package gorocksdb + +import ( + "errors" +) + +import "C" + +// GetForUpdate queries the data associated with the key and puts an exclusive lock on the key from the database given this transaction and column family. +func (transaction *Transaction) GetForUpdateCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transaction_get_for_update_cf( + transaction.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, C.uchar(byte(1)) /*exclusive*/, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} diff --git a/transactiondb.go b/transactiondb.go index cfdeac9c..5799f40e 100644 --- a/transactiondb.go +++ b/transactiondb.go @@ -41,6 +41,25 @@ func OpenTransactionDb( }, nil } +func (db *TransactionDB) CreateColumnFamily(opts *Options, name string) (*ColumnFamilyHandle, error) { + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + + h := C.rocksdb_transactiondb_create_column_family( + db.c, + opts.c, + cName, + &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewNativeColumnFamilyHandle(h), nil +} + // NewSnapshot creates a new snapshot of the database. func (db *TransactionDB) NewSnapshot() *Snapshot { return NewNativeSnapshot(C.rocksdb_transactiondb_create_snapshot(db.c)) @@ -89,6 +108,23 @@ func (db *TransactionDB) Get(opts *ReadOptions, key []byte) (*Slice, error) { return NewSlice(cValue, cValLen), nil } +// Get returns the data associated with the key from the database and column family. +func (db *TransactionDB) GetCF(opts *ReadOptions, cf *ColumnFamilyHandle, key []byte) (*Slice, error) { + var ( + cErr *C.char + cValLen C.size_t + cKey = byteToChar(key) + ) + cValue := C.rocksdb_transactiondb_get_cf( + db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cValLen, &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, errors.New(C.GoString(cErr)) + } + return NewSlice(cValue, cValLen), nil +} + // Put writes data associated with a key to the database. func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { var ( @@ -106,6 +142,23 @@ func (db *TransactionDB) Put(opts *WriteOptions, key, value []byte) error { return nil } +// Put writes data associated with a key to the database and column family. +func (db *TransactionDB) PutCF(opts *WriteOptions, cf *ColumnFamilyHandle, key, value []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + cValue = byteToChar(value) + ) + C.rocksdb_transactiondb_put_cf( + db.c, opts.c, cf.c, cKey, C.size_t(len(key)), cValue, C.size_t(len(value)), &cErr, + ) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + // Delete removes the data associated with the key from the database. func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { var ( @@ -120,6 +173,27 @@ func (db *TransactionDB) Delete(opts *WriteOptions, key []byte) error { return nil } +// Delete removes the data associated with the key from the database and column family. +func (db *TransactionDB) DeleteCF(opts *WriteOptions, cf *ColumnFamilyHandle, key []byte) error { + var ( + cErr *C.char + cKey = byteToChar(key) + ) + C.rocksdb_transactiondb_delete_cf(db.c, opts.c, cf.c, cKey, C.size_t(len(key)), &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return errors.New(C.GoString(cErr)) + } + return nil +} + +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given. +func (db *TransactionDB) NewIterator(opts *ReadOptions) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transactiondb_create_iterator(db.c, opts.c))) +} + // NewCheckpoint creates a new Checkpoint for this db. func (db *TransactionDB) NewCheckpoint() (*Checkpoint, error) { var ( diff --git a/transactiondb_v6.go b/transactiondb_v6.go new file mode 100644 index 00000000..53b69d4c --- /dev/null +++ b/transactiondb_v6.go @@ -0,0 +1,78 @@ +// +build rocksdb_v6 + +package gorocksdb + +import ( + "errors" +) + +import "C" + +// OpenTransactionDbColumnFamilies opens a database with the specified options. +func OpenTransactionDbColumnFamilies( + opts *Options, + transactionDBOpts *TransactionDBOptions, + name string, + cfNames []string, + cfOpts []*Options, +) (*TransactionDB, []*ColumnFamilyHandle, error) { + numColumnFamilies := len(cfNames) + if numColumnFamilies != len(cfOpts) { + return nil, nil, errors.New("must provide the same number of column family names and options") + } + cNames := make([]*C.char, numColumnFamilies) + for i, s := range cfNames { + cNames[i] = C.CString(s) + } + defer func() { + for _, s := range cNames { + C.free(unsafe.Pointer(s)) + } + }() + + cOpts := make([]*C.rocksdb_options_t, numColumnFamilies) + for i, o := range cfOpts { + cOpts[i] = o.c + } + + var ( + cErr *C.char + cName = C.CString(name) + ) + defer C.free(unsafe.Pointer(cName)) + + cHandles := make([]*C.rocksdb_column_family_handle_t, numColumnFamilies) + + db := C.rocksdb_transactiondb_open_column_families( + opts.c, + transactionDBOpts.c, + cName, + C.int(numColumnFamilies), + &cNames[0], + &cOpts[0], + &cHandles[0], + &cErr) + if cErr != nil { + defer C.rocksdb_free(unsafe.Pointer(cErr)) + return nil, nil, errors.New(C.GoString(cErr)) + } + + cfHandles := make([]*ColumnFamilyHandle, numColumnFamilies) + for i, c := range cHandles { + cfHandles[i] = NewNativeColumnFamilyHandle(c) + } + + return &TransactionDB{ + name: name, + c: db, + opts: opts, + transactionDBOpts: transactionDBOpts, + }, cfHandles, nil +} + +// NewIterator returns an Iterator over the database that uses the +// ReadOptions given and column family. +func (db *TransactionDB) NewIteratorCF(opts *ReadOptions, cf *ColumnFamilyHandle) *Iterator { + return NewNativeIterator( + unsafe.Pointer(C.rocksdb_transactiondb_create_iterator_cf(db.c, opts.c, cf.c))) +}