From 8052b7a356fa9264973078afeec096c37fb60454 Mon Sep 17 00:00:00 2001 From: nolash Date: Sat, 8 Feb 2020 18:31:24 +0100 Subject: [PATCH 1/3] bmt, file: Extract AsyncHasher to separate package --- bmt/bmt.go | 155 --------------------------- bmt/bmt_test.go | 213 ++++--------------------------------- file/hasher/hasher.go | 163 ++++++++++++++++++++++++++++ file/hasher/hasher_test.go | 179 +++++++++++++++++++++++++++++++ 4 files changed, 360 insertions(+), 350 deletions(-) create mode 100644 file/hasher/hasher.go create mode 100644 file/hasher/hasher_test.go diff --git a/bmt/bmt.go b/bmt/bmt.go index 1013fee0fd..056e5f71aa 100644 --- a/bmt/bmt.go +++ b/bmt/bmt.go @@ -20,7 +20,6 @@ package bmt import ( "context" "encoding/binary" - "errors" "fmt" "hash" "strings" @@ -430,160 +429,6 @@ func (h *Hasher) releaseTree() { }() } -// NewAsyncWriter extends Hasher with an interface for concurrent segment.GetSection() writes -// TODO: Instead of explicitly setting double size of segment should be dynamic and chunked internally. If not, we have to keep different bmt hashers generation functions for different purposes in the same instance, or cope with added complexity of bmt hasher generation functions having to receive parameters -func NewAsyncHasher(ctx context.Context, h *Hasher, double bool, errFunc func(error)) *AsyncHasher { - secsize := h.SectionSize() - if double { - secsize *= 2 - } - seccount := h.Branches() - if double { - seccount /= 2 - } - return &AsyncHasher{ - Hasher: h, - double: double, - secsize: secsize, - seccount: seccount, - ctx: ctx, - errFunc: errFunc, - } -} - -// AsyncHasher extends BMT Hasher with an asynchronous segment.GetSection() writer interface -// AsyncHasher cannot be used as with a hash.Hash interface: It must be used with the -// right indexes and length and the right number of sections -// It is unsafe and does not check indexes and section data lengths -// -// behaviour is undefined if -// * non-final sections are shorter or longer than secsize -// * if final section does not match length -// * write a section with index that is higher than length/secsize -// * set length in Sum call when length/secsize < maxsec -// -// * if Sum() is not called on a Hasher that is fully written -// a process will block, can be terminated with Reset -// * it will not leak processes if not all sections are written but it blocks -// and keeps the resource which can be released calling Reset() -type AsyncHasher struct { - *Hasher // extends the Hasher - mtx sync.Mutex // to lock the cursor access - double bool // whether to use double segments (call Hasher.writeSection) - secsize int // size of base section (size of hash or double) - seccount int // base section count - write func(i int, section []byte, final bool) - errFunc func(error) - ctx context.Context - all bool // if all written in one go, temporary workaround -} - -func (sw *AsyncHasher) raiseError(err string) { - if sw.errFunc != nil { - sw.errFunc(errors.New(err)) - } -} - -// Reset implements file.SectionWriter -func (sw *AsyncHasher) Reset() { - sw.all = false - sw.Hasher.Reset() -} - -// SectionSize implements file.SectionWriter -func (sw *AsyncHasher) SectionSize() int { - return sw.secsize -} - -// Branches implements file.SectionWriter -func (sw *AsyncHasher) Branches() int { - return sw.seccount -} - -// WriteSection writes the i-th section of the BMT base -// this function can and is meant to be called concurrently -// it sets max segment threadsafely -func (sw *AsyncHasher) WriteIndexed(i int, section []byte) { - 
sw.mtx.Lock() - defer sw.mtx.Unlock() - t := sw.GetTree() - // cursor keeps track of the rightmost.GetSection() written so far - // if index is lower than cursor then just write non-final section as is - if i < sw.Hasher.GetCursor() { - // if index is not the rightmost, safe to write section - go sw.WriteSection(i, section, sw.double, false) - return - } - // if there is a previous rightmost.GetSection() safe to write section - if t.GetOffset() > 0 { - if i == sw.Hasher.GetCursor() { - // i==cursor implies cursor was set by Hash call so we can write section as final one - // since it can be shorter, first we copy it to the padded buffer - //t.GetSection() = make([]byte, sw.secsize) - //copy(t.GetSection(), section) - // TODO: Consider whether the section here needs to be copied, maybe we can enforce not change the original slice - copySection := make([]byte, sw.secsize) - copy(copySection, section) - t.SetSection(copySection) - go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true) - return - } - // the rightmost section just changed, so we write the previous one as non-final - go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false) - } - // set i as the index of the righmost.GetSection() written so far - // set t.GetOffset() to cursor*secsize+1 - sw.Hasher.SetCursor(i) - t.SetOffset(i*sw.secsize + 1) - copySection := make([]byte, sw.secsize) - copy(copySection, section) - t.SetSection(copySection) -} - -// Sum can be called any time once the length and the span is known -// potentially even before all segments have been written -// in such cases Sum will block until all segments are present and -// the hash for the length can be calculated. -// -// b: digest is appended to b -// length: known length of the input (unsafe; undefined if out of range) -// meta: metadata to hash together with BMT root for the final digest -// e.g., span for protection against existential forgery -func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) { - sw.mtx.Lock() - t := sw.GetTree() - if length == 0 { - sw.ReleaseTree() - sw.mtx.Unlock() - s = sw.Hasher.GetZeroHash() - return - } else { - // for non-zero input the rightmost.GetSection() is written to the tree asynchronously - // if the actual last.GetSection() has been written (sw.Hasher.GetCursor() == length/t.secsize) - maxsec := (length - 1) / sw.secsize - if t.GetOffset() > 0 { - go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor()) - } - // sesw.Hasher.GetCursor() to maxsec so final section is written when it arrives - sw.Hasher.SetCursor(maxsec) - t.SetOffset(length) - // TODO: must this t.result channel be within lock? - result := t.GetResult() - sw.mtx.Unlock() - // wait for the result or reset - s = <-result - } - // relesase the tree back to the pool - meta := t.GetSpan() - sw.ReleaseTree() - // hash together meta and BMT root hash using the pools - hsh := sw.Hasher.GetHasher() - hsh.Reset() - hsh.Write(meta) - hsh.Write(s) - return hsh.Sum(b) -} - // Writesection writes data to the data level in the section at index i. 
// Setting final to true tells the hasher no further data will be written and prepares the data for h.Sum() // TODO remove double as argument, push responsibility for handling data context to caller diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go index 2662359ba4..055c10462d 100644 --- a/bmt/bmt_test.go +++ b/bmt/bmt_test.go @@ -18,7 +18,6 @@ package bmt import ( "bytes" - "context" "encoding/binary" "fmt" "math/rand" @@ -27,6 +26,7 @@ import ( "testing" "time" + bmttestutil "github.com/ethersphere/swarm/bmt/testutil" "github.com/ethersphere/swarm/testutil" "golang.org/x/crypto/sha3" ) @@ -35,18 +35,6 @@ func init() { testutil.Init() } -// the actual data length generated (could be longer than max datalength of the BMT) -const bufferSize = 4128 - -const ( - // segmentCount is the maximum number of segments of the underlying chunk - // Should be equal to max-chunk-data-size / hash-size - // Currently set to 128 == 4096 (default chunk size) / 32 (sha3.keccak256 size) - segmentCount = 128 -) - -var counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128} - var benchmarkBMTResult []byte // calculates the Keccak256 SHA3 hash of the data @@ -98,7 +86,7 @@ func TestRefHasher(t *testing.T) { }, }) - // all segmentCounts in [5,8] should be: + // all bmttestutil.SegmentCounts in [5,8] should be: // // sha3hash( // sha3hash( @@ -123,12 +111,12 @@ func TestRefHasher(t *testing.T) { // run the tests for i, x := range tests { - for segmentCount := x.from; segmentCount <= x.to; segmentCount++ { - for length := 1; length <= segmentCount*32; length++ { - t.Run(fmt.Sprintf("%d_segments_%d_bytes", segmentCount, length), func(t *testing.T) { + for segCount := x.from; segCount <= x.to; segCount++ { + for length := 1; length <= segCount*32; length++ { + t.Run(fmt.Sprintf("%d_segments_%d_bytes", segCount, length), func(t *testing.T) { data := testutil.RandomBytes(i, length) expected := x.expected(data) - actual := NewRefHasher(sha3.NewLegacyKeccak256, segmentCount).Hash(data) + actual := NewRefHasher(sha3.NewLegacyKeccak256, segCount).Hash(data) if !bytes.Equal(actual, expected) { t.Fatalf("expected %x, got %x", expected, actual) } @@ -142,7 +130,7 @@ func TestRefHasher(t *testing.T) { func TestHasherEmptyData(t *testing.T) { hasher := sha3.NewLegacyKeccak256 var data []byte - for _, count := range counts { + for _, count := range bmttestutil.Counts { t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) { pool := NewTreePool(hasher, count, PoolSize) defer pool.Drain(0) @@ -159,12 +147,12 @@ func TestHasherEmptyData(t *testing.T) { // tests sequential write with entire max size written in one go func TestSyncHasherCorrectness(t *testing.T) { - data := testutil.RandomBytes(1, bufferSize) + data := testutil.RandomBytes(1, bmttestutil.BufferSize) hasher := sha3.NewLegacyKeccak256 size := hasher().Size() var err error - for _, count := range counts { + for _, count := range bmttestutil.Counts { t.Run(fmt.Sprintf("segments_%v", count), func(t *testing.T) { max := count * size var incr int @@ -183,46 +171,6 @@ func TestSyncHasherCorrectness(t *testing.T) { } } -// tests order-neutral concurrent writes with entire max size written in one go -func TestAsyncCorrectness(t *testing.T) { - data := testutil.RandomBytes(1, bufferSize) - hasher := sha3.NewLegacyKeccak256 - size := hasher().Size() - whs := []whenHash{first, last, random} - - for _, double := range []bool{false, true} { - for _, wh := range whs { - for _, count := range counts { - 
t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) { - max := count * size - var incr int - capacity := 1 - pool := NewTreePool(hasher, count, capacity) - defer pool.Drain(0) - for n := 1; n <= max; n += incr { - incr = 1 + rand.Intn(5) - bmtobj := New(pool) - d := data[:n] - rbmtobj := NewRefHasher(hasher, count) - expNoMeta := rbmtobj.Hash(d) - h := hasher() - h.Write(ZeroSpan) - h.Write(expNoMeta) - exp := h.Sum(nil) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - sw := NewAsyncHasher(ctx, bmtobj, double, nil) - got := asyncHashRandom(sw, 0, d, wh) - if !bytes.Equal(got, exp) { - t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got) - } - } - }) - } - } - } -} - // Tests that the BMT hasher can be synchronously reused with poolsizes 1 and PoolSize func TestHasherReuse(t *testing.T) { t.Run(fmt.Sprintf("poolsize_%d", 1), func(t *testing.T) { @@ -236,14 +184,14 @@ func TestHasherReuse(t *testing.T) { // tests if bmt reuse is not corrupting result func testHasherReuse(poolsize int, t *testing.T) { hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, poolsize) + pool := NewTreePool(hasher, bmttestutil.SegmentCount, poolsize) defer pool.Drain(0) bmt := New(pool) for i := 0; i < 100; i++ { - data := testutil.RandomBytes(1, bufferSize) + data := testutil.RandomBytes(1, bmttestutil.BufferSize) n := rand.Intn(bmt.Size()) - err := testHasherCorrectness(bmt, hasher, data, n, segmentCount) + err := testHasherCorrectness(bmt, hasher, data, n, bmttestutil.SegmentCount) if err != nil { t.Fatal(err) } @@ -253,7 +201,7 @@ func testHasherReuse(poolsize int, t *testing.T) { // Tests if pool can be cleanly reused even in concurrent use by several hasher func TestBMTConcurrentUse(t *testing.T) { hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) + pool := NewTreePool(hasher, bmttestutil.SegmentCount, PoolSize) defer pool.Drain(0) cycles := 100 errc := make(chan error) @@ -261,7 +209,7 @@ func TestBMTConcurrentUse(t *testing.T) { for i := 0; i < cycles; i++ { go func() { bmt := New(pool) - data := testutil.RandomBytes(1, bufferSize) + data := testutil.RandomBytes(1, bmttestutil.BufferSize) n := rand.Intn(bmt.Size()) errc <- testHasherCorrectness(bmt, hasher, data, n, 128) }() @@ -288,7 +236,7 @@ LOOP: func TestBMTWriterBuffers(t *testing.T) { hasher := sha3.NewLegacyKeccak256 - for _, count := range counts { + for _, count := range bmttestutil.Counts { t.Run(fmt.Sprintf("%d_segments", count), func(t *testing.T) { errc := make(chan error) pool := NewTreePool(hasher, count, PoolSize) @@ -394,27 +342,6 @@ func BenchmarkBMT(t *testing.B) { } } -type whenHash = int - -const ( - first whenHash = iota - last - random -) - -func BenchmarkBMTAsync(t *testing.B) { - whs := []whenHash{first, last, random} - for size := 4096; size >= 128; size /= 2 { - for _, wh := range whs { - for _, double := range []bool{false, true} { - t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) { - benchmarkBMTAsync(t, size, wh, double) - }) - } - } - } -} - func BenchmarkPool(t *testing.B) { caps := []int{1, PoolSize} for size := 4096; size >= 128; size /= 2 { @@ -473,7 +400,7 @@ func benchmarkBMTBaseline(t *testing.B, n int) { func benchmarkBMT(t *testing.B, n int) { data := testutil.RandomBytes(1, n) hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) + pool := NewTreePool(hasher, 
bmttestutil.SegmentCount, PoolSize) bmt := New(pool) var r []byte @@ -485,32 +412,11 @@ func benchmarkBMT(t *testing.B, n int) { benchmarkBMTResult = r } -// benchmarks BMT hasher with asynchronous concurrent segment/section writes -func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) { - data := testutil.RandomBytes(1, n) - hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) - bmth := New(pool) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - bmtobj := NewAsyncHasher(ctx, bmth, double, nil) - idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) - rand.Shuffle(len(idxs), func(i int, j int) { - idxs[i], idxs[j] = idxs[j], idxs[i] - }) - - t.ReportAllocs() - t.ResetTimer() - for i := 0; i < t.N; i++ { - asyncHash(bmtobj, 0, n, wh, idxs, segments) - } -} - // benchmarks 100 concurrent bmt hashes with pool capacity func benchmarkPool(t *testing.B, poolsize, n int) { data := testutil.RandomBytes(1, n) hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, poolsize) + pool := NewTreePool(hasher, bmttestutil.SegmentCount, poolsize) cycles := 100 t.ReportAllocs() @@ -550,70 +456,10 @@ func syncHash(h *Hasher, spanLength int, data []byte) []byte { return h.Sum(nil) } -func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) { - l := len(data) - n := l / secsize - if l%secsize > 0 { - n++ - } - for i := 0; i < n; i++ { - idxs = append(idxs, i) - end := (i + 1) * secsize - if end > l { - end = l - } - section := data[i*secsize : end] - segments = append(segments, section) - } - rand.Shuffle(n, func(i int, j int) { - idxs[i], idxs[j] = idxs[j], idxs[i] - }) - return idxs, segments -} - -// splits the input data performs a random shuffle to mock async section writes -func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) { - idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data) - return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments) -} - -// mock for async section writes for file.SectionWriter -// requires a permutation (a random shuffle) of list of all indexes of segments -// and writes them in order to the appropriate section -// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes]) -func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh whenHash, idxs []int, segments [][]byte) (s []byte) { - bmtobj.Reset() - if l == 0 { - bmtobj.SetSpan(spanLength) - return bmtobj.SumIndexed(nil, l) - } - c := make(chan []byte, 1) - hashf := func() { - bmtobj.SetSpan(spanLength) - c <- bmtobj.SumIndexed(nil, l) - } - maxsize := len(idxs) - var r int - if wh == random { - r = rand.Intn(maxsize) - } - for i, idx := range idxs { - bmtobj.WriteIndexed(idx, segments[idx]) - if (wh == first || wh == random) && i == r { - go hashf() - } - } - if wh == last { - bmtobj.SetSpan(spanLength) - return bmtobj.SumIndexed(nil, l) - } - return <-c -} - // TestUseSyncAsOrdinaryHasher verifies that the bmt.Hasher can be used with the hash.Hash interface func TestUseSyncAsOrdinaryHasher(t *testing.T) { hasher := sha3.NewLegacyKeccak256 - pool := NewTreePool(hasher, segmentCount, PoolSize) + pool := NewTreePool(hasher, bmttestutil.SegmentCount, PoolSize) bmt := New(pool) bmt.SetSpan(3) bmt.Write([]byte("foo")) @@ -629,26 +475,3 @@ func TestUseSyncAsOrdinaryHasher(t *testing.T) { t.Fatalf("normalhash; expected %x, got %x", refRes, res) } } - -// TestUseAsyncAsOrdinaryHasher verifies 
that the bmt.Hasher can be used with the hash.Hash interface
-func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
-	hasher := sha3.NewLegacyKeccak256
-	pool := NewTreePool(hasher, segmentCount, PoolSize)
-	sbmt := New(pool)
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
-	abmt.SetSpan(3)
-	abmt.Write([]byte("foo"))
-	res := abmt.Sum(nil)
-	refh := NewRefHasher(hasher, 128)
-	resh := refh.Hash([]byte("foo"))
-	hsub := hasher()
-	span := LengthToSpan(3)
-	hsub.Write(span)
-	hsub.Write(resh)
-	refRes := hsub.Sum(nil)
-	if !bytes.Equal(res, refRes) {
-		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
-	}
-}
diff --git a/file/hasher/hasher.go b/file/hasher/hasher.go
new file mode 100644
index 0000000000..f74f9b0647
--- /dev/null
+++ b/file/hasher/hasher.go
@@ -0,0 +1,163 @@
+package file
+
+import (
+	"context"
+	"errors"
+	"sync"
+
+	"github.com/ethersphere/swarm/bmt"
+)
+
+// NewAsyncHasher extends Hasher with an interface for concurrent segment writes
+// TODO: Instead of explicitly setting double, the segment size should be dynamic and chunked internally. If not, we have to keep different bmt hasher generation functions for different purposes in the same instance, or cope with the added complexity of bmt hasher generation functions having to receive parameters
+func NewAsyncHasher(ctx context.Context, h *bmt.Hasher, double bool, errFunc func(error)) *AsyncHasher {
+	secsize := h.SectionSize()
+	if double {
+		secsize *= 2
+	}
+	seccount := h.Branches()
+	if double {
+		seccount /= 2
+	}
+	return &AsyncHasher{
+		Hasher:   h,
+		double:   double,
+		secsize:  secsize,
+		seccount: seccount,
+		ctx:      ctx,
+		errFunc:  errFunc,
+	}
+}
+
+// AsyncHasher extends the BMT Hasher with an asynchronous segment writer interface
+// AsyncHasher cannot be used as a hash.Hash interface: it must be used with the
+// right indexes and length and the right number of sections
+// It is unsafe and does not check indexes and section data lengths
+//
+// behaviour is undefined if
+// * non-final sections are shorter or longer than secsize
+// * the final section does not match length
+// * a section is written with an index that is higher than length/secsize
+// * length/secsize < maxsec when length is set in the Sum call
+//
+// * if Sum() is not called on a Hasher that is fully written
+//   a process will block, and can be terminated with Reset
+// * it will not leak processes if not all sections are written, but it blocks
+//   and keeps the resource, which can be released by calling Reset()
+type AsyncHasher struct {
+	*bmt.Hasher          // extends the Hasher
+	mtx      sync.Mutex  // to lock the cursor access
+	double   bool        // whether to use double segments (call Hasher.WriteSection)
+	secsize  int         // size of base section (size of hash or double)
+	seccount int         // base section count
+	write    func(i int, section []byte, final bool)
+	errFunc  func(error)
+	ctx      context.Context
+	all      bool // if all written in one go, temporary workaround
+}
+
+func (sw *AsyncHasher) raiseError(err string) {
+	if sw.errFunc != nil {
+		sw.errFunc(errors.New(err))
+	}
+}
+
+// Reset implements file.SectionWriter
+func (sw *AsyncHasher) Reset() {
+	sw.all = false
+	sw.Hasher.Reset()
+}
+
+// SectionSize implements file.SectionWriter
+func (sw *AsyncHasher) SectionSize() int {
+	return sw.secsize
+}
+
+// Branches implements file.SectionWriter
+func (sw *AsyncHasher) Branches() int {
+	return sw.seccount
+}
+
+// WriteIndexed writes the i-th section of the BMT base
+// this function can
and is meant to be called concurrently
+// it sets max segment threadsafely
+func (sw *AsyncHasher) WriteIndexed(i int, section []byte) {
+	sw.mtx.Lock()
+	defer sw.mtx.Unlock()
+	t := sw.GetTree()
+	// cursor keeps track of the rightmost section written so far
+	// if index is lower than cursor then just write non-final section as is
+	if i < sw.Hasher.GetCursor() {
+		// if index is not the rightmost, safe to write section
+		go sw.WriteSection(i, section, sw.double, false)
+		return
+	}
+	// if there is a previous rightmost section it is safe to write the section
+	if t.GetOffset() > 0 {
+		if i == sw.Hasher.GetCursor() {
+			// i==cursor implies cursor was set by Hash call so we can write section as final one
+			// since it can be shorter, first we copy it to the padded buffer
+			// TODO: Consider whether the section here needs to be copied, maybe we can enforce not changing the original slice
+			copySection := make([]byte, sw.secsize)
+			copy(copySection, section)
+			t.SetSection(copySection)
+			go sw.Hasher.WriteSection(i, t.GetSection(), sw.double, true)
+			return
+		}
+		// the rightmost section just changed, so we write the previous one as non-final
+		go sw.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, false)
+	}
+	// set i as the index of the rightmost section written so far
+	// set t.GetOffset() to cursor*secsize+1
+	sw.Hasher.SetCursor(i)
+	t.SetOffset(i*sw.secsize + 1)
+	copySection := make([]byte, sw.secsize)
+	copy(copySection, section)
+	t.SetSection(copySection)
+}
+
+// SumIndexed can be called any time once the length and the span are known
+// potentially even before all segments have been written
+// in such cases Sum will block until all segments are present and
+// the hash for the length can be calculated.
+//
+// b: digest is appended to b
+// length: known length of the input (unsafe; undefined if out of range)
+// the span set on the hasher is used as metadata, hashed together with the BMT root for the final digest,
+// e.g., span for protection against existential forgery
+func (sw *AsyncHasher) SumIndexed(b []byte, length int) (s []byte) {
+	sw.mtx.Lock()
+	t := sw.GetTree()
+	if length == 0 {
+		sw.ReleaseTree()
+		sw.mtx.Unlock()
+		s = sw.Hasher.GetZeroHash()
+		return
+	} else {
+		// for non-zero input the rightmost section is written to the tree asynchronously
+		// if the actual last section has been written (sw.Hasher.GetCursor() == length/sw.secsize)
+		maxsec := (length - 1) / sw.secsize
+		if t.GetOffset() > 0 {
+			go sw.Hasher.WriteSection(sw.Hasher.GetCursor(), t.GetSection(), sw.double, maxsec == sw.Hasher.GetCursor())
+		}
+		// set cursor to maxsec so final section is written when it arrives
+		sw.Hasher.SetCursor(maxsec)
+		t.SetOffset(length)
+		// TODO: must this t.result channel be within lock?
+		result := t.GetResult()
+		sw.mtx.Unlock()
+		// wait for the result or reset
+		s = <-result
+	}
+	// release the tree back to the pool
+	meta := t.GetSpan()
+	sw.ReleaseTree()
+	// hash together meta and BMT root hash using the pools
+	hsh := sw.Hasher.GetHasher()
+	hsh.Reset()
+	hsh.Write(meta)
+	hsh.Write(s)
+	return hsh.Sum(b)
+}
diff --git a/file/hasher/hasher_test.go b/file/hasher/hasher_test.go
new file mode 100644
index 0000000000..579e7950fb
--- /dev/null
+++ b/file/hasher/hasher_test.go
@@ -0,0 +1,179 @@
+package file
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"math/rand"
+	"testing"
+
+	"github.com/ethersphere/swarm/bmt"
+	bmttestutil "github.com/ethersphere/swarm/bmt/testutil"
+	"github.com/ethersphere/swarm/testutil"
+	"golang.org/x/crypto/sha3"
+)
+
+type whenHash = int
+
+const (
+	first whenHash = iota
+	last
+	random
+)
+
+func splitAndShuffle(secsize int, data []byte) (idxs []int, segments [][]byte) {
+	l := len(data)
+	n := l / secsize
+	if l%secsize > 0 {
+		n++
+	}
+	for i := 0; i < n; i++ {
+		idxs = append(idxs, i)
+		end := (i + 1) * secsize
+		if end > l {
+			end = l
+		}
+		section := data[i*secsize : end]
+		segments = append(segments, section)
+	}
+	rand.Shuffle(n, func(i int, j int) {
+		idxs[i], idxs[j] = idxs[j], idxs[i]
+	})
+	return idxs, segments
+}
+
+// tests order-neutral concurrent writes with entire max size written in one go
+func TestAsyncCorrectness(t *testing.T) {
+	data := testutil.RandomBytes(1, bmttestutil.BufferSize)
+	hasher := sha3.NewLegacyKeccak256
+	size := hasher().Size()
+	whs := []whenHash{first, last, random}
+
+	for _, double := range []bool{false, true} {
+		for _, wh := range whs {
+			for _, count := range bmttestutil.Counts {
+				t.Run(fmt.Sprintf("double_%v_hash_when_%v_segments_%v", double, wh, count), func(t *testing.T) {
+					max := count * size
+					var incr int
+					capacity := 1
+					pool := bmt.NewTreePool(hasher, count, capacity)
+					defer pool.Drain(0)
+					for n := 1; n <= max; n += incr {
+						incr = 1 + rand.Intn(5)
+						bmtobj := bmt.New(pool)
+						d := data[:n]
+						rbmtobj := bmt.NewRefHasher(hasher, count)
+						expNoMeta := rbmtobj.Hash(d)
+						h := hasher()
+						h.Write(bmt.ZeroSpan)
+						h.Write(expNoMeta)
+						exp := h.Sum(nil)
+						ctx, cancel := context.WithCancel(context.Background())
+						defer cancel()
+						sw := NewAsyncHasher(ctx, bmtobj, double, nil)
+						got := asyncHashRandom(sw, 0, d, wh)
+						if !bytes.Equal(got, exp) {
+							t.Fatalf("wrong async hash (asyncpart) for datalength %v: expected %x, got %x", n, exp, got)
+						}
+					}
+				})
+			}
+		}
+	}
+}
+
+func BenchmarkBMTAsync(t *testing.B) {
+	whs := []whenHash{first, last, random}
+	for size := 4096; size >= 128; size /= 2 {
+		for _, wh := range whs {
+			for _, double := range []bool{false, true} {
+				t.Run(fmt.Sprintf("double_%v_hash_when_%v_size_%v", double, wh, size), func(t *testing.B) {
+					benchmarkBMTAsync(t, size, wh, double)
+				})
+			}
+		}
+	}
+}
+
+// splits the input data and performs a random shuffle to mock async section writes
+func asyncHashRandom(bmtobj *AsyncHasher, spanLength int, data []byte, wh whenHash) (s []byte) {
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
+	return asyncHash(bmtobj, spanLength, len(data), wh, idxs, segments)
+}
+
+// mock for async section writes for file.SectionWriter
+// requires a permutation (a random shuffle) of the list of all indexes of segments
+// and writes them in order to the appropriate section
+// the Sum function is called according to the wh parameter (first, last, random [relative to segment writes])
+func asyncHash(bmtobj *AsyncHasher, spanLength int, l int, wh
whenHash, idxs []int, segments [][]byte) (s []byte) {
+	bmtobj.Reset()
+	if l == 0 {
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
+	}
+	c := make(chan []byte, 1)
+	hashf := func() {
+		bmtobj.SetSpan(spanLength)
+		c <- bmtobj.SumIndexed(nil, l)
+	}
+	maxsize := len(idxs)
+	var r int
+	if wh == random {
+		r = rand.Intn(maxsize)
+	}
+	for i, idx := range idxs {
+		bmtobj.WriteIndexed(idx, segments[idx])
+		if (wh == first || wh == random) && i == r {
+			go hashf()
+		}
+	}
+	if wh == last {
+		bmtobj.SetSpan(spanLength)
+		return bmtobj.SumIndexed(nil, l)
+	}
+	return <-c
+}
+
+// benchmarks BMT hasher with asynchronous concurrent segment/section writes
+func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
+	data := testutil.RandomBytes(1, n)
+	hasher := sha3.NewLegacyKeccak256
+	pool := bmt.NewTreePool(hasher, bmttestutil.SegmentCount, bmt.PoolSize)
+	bmth := bmt.New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	bmtobj := NewAsyncHasher(ctx, bmth, double, nil)
+	idxs, segments := splitAndShuffle(bmtobj.SectionSize(), data)
+	rand.Shuffle(len(idxs), func(i int, j int) {
+		idxs[i], idxs[j] = idxs[j], idxs[i]
+	})
+
+	t.ReportAllocs()
+	t.ResetTimer()
+	for i := 0; i < t.N; i++ {
+		asyncHash(bmtobj, 0, n, wh, idxs, segments)
+	}
+}
+
+// TestUseAsyncAsOrdinaryHasher verifies that the AsyncHasher can be used with the hash.Hash interface
+func TestUseAsyncAsOrdinaryHasher(t *testing.T) {
+	hasher := sha3.NewLegacyKeccak256
+	pool := bmt.NewTreePool(hasher, bmttestutil.SegmentCount, bmt.PoolSize)
+	sbmt := bmt.New(pool)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	abmt := NewAsyncHasher(ctx, sbmt, false, nil)
+	abmt.SetSpan(3)
+	abmt.Write([]byte("foo"))
+	res := abmt.Sum(nil)
+	refh := bmt.NewRefHasher(hasher, 128)
+	resh := refh.Hash([]byte("foo"))
+	hsub := hasher()
+	span := bmt.LengthToSpan(3)
+	hsub.Write(span)
+	hsub.Write(resh)
+	refRes := hsub.Sum(nil)
+	if !bytes.Equal(res, refRes) {
+		t.Fatalf("normalhash; expected %x, got %x", refRes, res)
+	}
+}

From 0a0f5a846e4591c7f445de68acd1235a9aa68895 Mon Sep 17 00:00:00 2001
From: nolash
Date: Sat, 8 Feb 2020 18:56:35 +0100
Subject: [PATCH 2/3] bmt: Add missing testutil package

---
 bmt/testutil/testutil.go | 13 +++++++++++++
 1 file changed, 13 insertions(+)
 create mode 100644 bmt/testutil/testutil.go

diff --git a/bmt/testutil/testutil.go b/bmt/testutil/testutil.go
new file mode 100644
index 0000000000..e9c7e23a3e
--- /dev/null
+++ b/bmt/testutil/testutil.go
@@ -0,0 +1,13 @@
+package testutil
+
+// BufferSize is the actual data length generated (could be longer than max datalength of the BMT)
+const BufferSize = 4128
+
+const (
+	// SegmentCount is the maximum number of segments of the underlying chunk
+	// Should be equal to max-chunk-data-size / hash-size
+	// Currently set to 128 == 4096 (default chunk size) / 32 (sha3.keccak256 size)
+	SegmentCount = 128
+)
+
+var Counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}

From 9799cfb842f64984ddca9ee8c787ab9cfa18184c Mon Sep 17 00:00:00 2001
From: nolash
Date: Tue, 11 Feb 2020 11:32:10 +0100
Subject: [PATCH 3/3] bmt, file: Add copyright, avoid optimization of async hasher test

---
 bmt/bmt_bench_test.go      | 16 ++++++++++++++++
 bmt/bmt_test.go            |  4 +---
 bmt/testutil/testutil.go   | 18 ++++++++++++++++++
 file/hasher/hasher.go      | 16 ++++++++++++++++
 file/hasher/hasher_test.go | 20 +++++++++++++++++++-
 5 files changed, 70 insertions(+), 4 deletions(-)

diff --git a/bmt/bmt_bench_test.go
b/bmt/bmt_bench_test.go index 05b8308344..9f10ce1109 100644
--- a/bmt/bmt_bench_test.go
+++ b/bmt/bmt_bench_test.go
@@ -1,3 +1,19 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
 package bmt
 
 import (
diff --git a/bmt/bmt_test.go b/bmt/bmt_test.go
index 055c10462d..1724ee907d 100644
--- a/bmt/bmt_test.go
+++ b/bmt/bmt_test.go
@@ -35,8 +35,6 @@ func init() {
 	testutil.Init()
 }
 
-var benchmarkBMTResult []byte
-
 // calculates the Keccak256 SHA3 hash of the data
 func sha3hash(data ...[]byte) []byte {
 	h := sha3.NewLegacyKeccak256()
@@ -409,7 +407,7 @@ func benchmarkBMT(t *testing.B, n int) {
 	for i := 0; i < t.N; i++ {
 		r = syncHash(bmt, 0, data)
 	}
-	benchmarkBMTResult = r
+	bmttestutil.BenchmarkBMTResult = r
 }
 
 // benchmarks 100 concurrent bmt hashes with pool capacity
diff --git a/bmt/testutil/testutil.go b/bmt/testutil/testutil.go
index e9c7e23a3e..79c64eaf24 100644
--- a/bmt/testutil/testutil.go
+++ b/bmt/testutil/testutil.go
@@ -1,3 +1,19 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
 package testutil
 
 // BufferSize is the actual data length generated (could be longer than max datalength of the BMT)
@@ -11,3 +27,5 @@ const (
 )
 
 var Counts = []int{1, 2, 3, 4, 5, 8, 9, 15, 16, 17, 32, 37, 42, 53, 63, 64, 65, 111, 127, 128}
+
+var BenchmarkBMTResult []byte
diff --git a/file/hasher/hasher.go b/file/hasher/hasher.go
index f74f9b0647..9478fb79b8 100644
--- a/file/hasher/hasher.go
+++ b/file/hasher/hasher.go
@@ -1,3 +1,19 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
 package file
 
 import (
diff --git a/file/hasher/hasher_test.go b/file/hasher/hasher_test.go
index 579e7950fb..babb981ef3 100644
--- a/file/hasher/hasher_test.go
+++ b/file/hasher/hasher_test.go
@@ -1,3 +1,19 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+//
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see <http://www.gnu.org/licenses/>.
+
 package file
 
 import (
@@ -148,11 +164,13 @@ func benchmarkBMTAsync(t *testing.B, n int, wh whenHash, double bool) {
 		idxs[i], idxs[j] = idxs[j], idxs[i]
 	})
 
+	var r []byte
 	t.ReportAllocs()
 	t.ResetTimer()
 	for i := 0; i < t.N; i++ {
-		asyncHash(bmtobj, 0, n, wh, idxs, segments)
+		r = asyncHash(bmtobj, 0, n, wh, idxs, segments)
 	}
+	bmttestutil.BenchmarkBMTResult = r
 }
 
 // TestUseAsyncAsOrdinaryHasher verifies that the AsyncHasher can be used with the hash.Hash interface
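For reviewers trying out the extracted package, the sketch below shows the indexed write flow that the new AsyncHasher exposes, mirroring what asyncHash does in hasher_test.go: sections may be written in any order, and SumIndexed blocks until every section implied by the byte length has arrived. This is a minimal sketch and not part of the patch; it assumes the package is imported under its new file/hasher directory (the package name in this series is file, so the import alias below is hypothetical).

// async_example.go - minimal usage sketch for the extracted AsyncHasher
package main

import (
	"context"
	"fmt"

	"github.com/ethersphere/swarm/bmt"
	file "github.com/ethersphere/swarm/file/hasher" // assumed import path/alias
	"golang.org/x/crypto/sha3"
)

func main() {
	hasher := sha3.NewLegacyKeccak256
	pool := bmt.NewTreePool(hasher, 128, bmt.PoolSize)
	sbmt := bmt.New(pool)
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	abmt := file.NewAsyncHasher(ctx, sbmt, false, nil)

	data := make([]byte, 64) // two 32-byte sections
	// sections may arrive out of order; WriteIndexed locks the cursor internally
	abmt.WriteIndexed(1, data[32:])
	abmt.WriteIndexed(0, data[:32])
	// the span must be set before summing; SumIndexed needs the total byte
	// length to know which section is final, and blocks until all are written
	abmt.SetSpan(64)
	fmt.Printf("%x\n", abmt.SumIndexed(nil, 64))
}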