Skip to content

Commit 2840500

Browse files
author
xiemalin
committed
add Peek method supporting pagination
1 parent 955a044 commit 2840500

4 files changed

Lines changed: 181 additions & 0 deletions

File tree

filefanoutqueue.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,18 @@ func (q *FileFanoutQueue) PeekAll(fanoutID int64) ([][]byte, []int64, error) {
180180
return q.fileQueue.peekAll(index, q.Size(fanoutID))
181181
}
182182

183+
// PeekPagination to peek data from queue by paing feature.
184+
func (q *FileFanoutQueue) PeekPagination(fanoutID int64, page, pagesize uint64) ([][]byte, []int64, error) {
185+
186+
qf, err := q.getQueueFront(fanoutID)
187+
if err != nil {
188+
return nil, nil, err
189+
}
190+
index := qf.frontIndex
191+
192+
return q.fileQueue.peekPagination(index, q.fileQueue.size(index), page, pagesize)
193+
}
194+
183195
// Skip To skip deqeue target number of items
184196
func (q *FileFanoutQueue) Skip(fanoutID int64, count int64) error {
185197
if count <= 0 {

filefanoutqueue_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,3 +368,63 @@ func TestFanoutQueue_Status(t *testing.T) {
368368
})
369369

370370
}
371+
372+
func TestFanoutQueue_PeekPagination(t *testing.T) {
373+
Convey("Test PeekPagination", t, func() {
374+
path := Tempfile()
375+
defer clearFiles(path, "fanoutqueue")
376+
fanoutID := int64(100)
377+
378+
defer clearFrontIndexFiles(path, "fanoutqueue", fanoutID)
379+
380+
queue := FileFanoutQueue{}
381+
err := queue.Open(path, "fanoutqueue", nil)
382+
if err != nil {
383+
t.Error(err)
384+
}
385+
defer queue.Close()
386+
387+
Convey("test PeekPagination on empty queue", func() {
388+
data, indexs, err := queue.PeekPagination(fanoutID, 0, 0)
389+
So(err, ShouldBeNil)
390+
So(data, ShouldBeEmpty)
391+
So(indexs, ShouldBeEmpty)
392+
393+
data, indexs, err = queue.PeekPagination(fanoutID, 1, 1)
394+
So(err, ShouldBeNil)
395+
So(data, ShouldBeEmpty)
396+
So(indexs, ShouldBeEmpty)
397+
})
398+
399+
Convey("test PeekPagination on items small than pagesize", func() {
400+
for i := 0; i < 5; i++ { // add value
401+
_, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i)))
402+
So(err, ShouldBeNil)
403+
}
404+
405+
data, indexs, err := queue.PeekPagination(fanoutID, 0, 0)
406+
So(err, ShouldBeNil)
407+
So(len(data), ShouldEqual, 5)
408+
So(string(data[4]), ShouldEqual, "hello matthew 4")
409+
So(len(indexs), ShouldEqual, 5)
410+
411+
data, indexs, err = queue.PeekPagination(fanoutID, 1, 10)
412+
So(err, ShouldBeNil)
413+
So(len(data), ShouldEqual, 5)
414+
So(string(data[4]), ShouldEqual, "hello matthew 4")
415+
So(len(indexs), ShouldEqual, 5)
416+
417+
data, indexs, err = queue.PeekPagination(fanoutID, 2, 10) // large paing
418+
So(err, ShouldBeNil)
419+
So(data, ShouldBeEmpty)
420+
So(indexs, ShouldBeEmpty)
421+
422+
data, indexs, err = queue.PeekPagination(fanoutID, 2, 2)
423+
So(err, ShouldBeNil)
424+
So(len(data), ShouldEqual, 2)
425+
So(string(data[1]), ShouldEqual, "hello matthew 3")
426+
So(len(indexs), ShouldEqual, 2)
427+
})
428+
429+
})
430+
}

filequeue.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ const (
4040
fileSuffix = ".dat"
4141

4242
defaultFileMode = 0666
43+
44+
Default_Page_Size = 10
4345
)
4446

4547
// DefaultOptions default options
@@ -435,6 +437,52 @@ func (q *FileQueue) PeekAll() ([][]byte, []int64, error) {
435437
return q.peekAll(index, q.Size())
436438
}
437439

440+
// PeekPagination to peek data from queue by paing feature.
441+
func (q *FileQueue) PeekPagination(page, pagesize uint64) ([][]byte, []int64, error) {
442+
return q.peekPagination(q.frontIndex, q.Size(), page, pagesize)
443+
}
444+
445+
// peekPagination to peek data from queue by paing feature.
446+
func (q *FileQueue) peekPagination(frontindex int64, size int64, page, pagesize uint64) ([][]byte, []int64, error) {
447+
if page == 0 {
448+
page = 1
449+
}
450+
if pagesize == 0 {
451+
pagesize = Default_Page_Size
452+
}
453+
454+
begin := (page - 1) * pagesize
455+
end := begin + pagesize
456+
457+
if begin > uint64(size) { // no data return
458+
return [][]byte{}, []int64{}, nil
459+
}
460+
461+
if end > uint64(size) {
462+
end = uint64(size)
463+
pagesize = end - begin
464+
}
465+
466+
// fix the offset
467+
begin = begin + uint64(frontindex)
468+
end = end + uint64(frontindex)
469+
470+
result := make([][]byte, pagesize)
471+
indexs := make([]int64, pagesize)
472+
473+
var index int = 0
474+
for i := begin; i < end; i++ {
475+
bb, err := q.peek(int64(i))
476+
if err != nil {
477+
return nil, nil, err
478+
}
479+
result[index] = bb
480+
indexs[index] = int64(i)
481+
index++
482+
}
483+
return result, indexs, nil
484+
}
485+
438486
// Skip the target n items to front index
439487
func (q *FileQueue) Skip(count int64) error {
440488
if q.IsEmpty() {

filequeue_test.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bigqueue
22

33
import (
44
"fmt"
5+
"strconv"
56
"strings"
67
"testing"
78
"time"
@@ -431,6 +432,66 @@ func TestFileQueue_Status(t *testing.T) {
431432

432433
}
433434

435+
// TestFileQueue_PeekPagination
436+
func TestFileQueue_PeekPagination(t *testing.T) {
437+
Convey("Test PeekPagination", t, func() {
438+
439+
path := Tempfile()
440+
defer clearFiles(path, "testqueue")
441+
442+
var queue = new(FileQueue)
443+
444+
err := queue.Open(path, "testqueue", nil)
445+
if err != nil {
446+
t.Error(err)
447+
}
448+
defer queue.Close()
449+
450+
Convey("test PeekPagination on empty queue", func() {
451+
data, indexs, err := queue.PeekPagination(0, 0)
452+
So(err, ShouldBeNil)
453+
So(data, ShouldBeEmpty)
454+
So(indexs, ShouldBeEmpty)
455+
456+
data, indexs, err = queue.PeekPagination(1, 1)
457+
So(err, ShouldBeNil)
458+
So(data, ShouldBeEmpty)
459+
So(indexs, ShouldBeEmpty)
460+
})
461+
462+
Convey("test PeekPagination on items small than pagesize", func() {
463+
for i := 0; i < 5; i++ { // add value
464+
_, err := queue.Enqueue([]byte("hello matthew " + strconv.Itoa(i)))
465+
So(err, ShouldBeNil)
466+
}
467+
468+
data, indexs, err := queue.PeekPagination(0, 0)
469+
So(err, ShouldBeNil)
470+
So(len(data), ShouldEqual, 5)
471+
So(string(data[4]), ShouldEqual, "hello matthew 4")
472+
So(len(indexs), ShouldEqual, 5)
473+
474+
data, indexs, err = queue.PeekPagination(1, 10)
475+
So(err, ShouldBeNil)
476+
So(len(data), ShouldEqual, 5)
477+
So(string(data[4]), ShouldEqual, "hello matthew 4")
478+
So(len(indexs), ShouldEqual, 5)
479+
480+
data, indexs, err = queue.PeekPagination(2, 10) // large paing
481+
So(err, ShouldBeNil)
482+
So(data, ShouldBeEmpty)
483+
So(indexs, ShouldBeEmpty)
484+
485+
data, indexs, err = queue.PeekPagination(2, 2)
486+
So(err, ShouldBeNil)
487+
So(len(data), ShouldEqual, 2)
488+
So(string(data[1]), ShouldEqual, "hello matthew 3")
489+
So(len(indexs), ShouldEqual, 2)
490+
})
491+
492+
})
493+
}
494+
434495
// tempfile returns a temporary file path.
435496
func Tempfile() string {
436497
return "./bin/temp"

0 commit comments

Comments
 (0)