Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ func main() {
Value: 0,
Usage: "snowflake machine id, 0-1023",
},
&cli.IntFlag{
Name: "incr-step",
Value: 1,
Usage: "auto increment step for cache",
EnvVars: []string{"INCR_STEP"},
},
&cli.StringFlag{
Name: "pk-root",
Value: "/seqs",
Expand All @@ -52,6 +58,7 @@ func main() {
log.Println("machine-id:", c.Int("machine-id"))
log.Println("pk-root:", c.String("pk-root"))
log.Println("uuid-key:", c.String("uuid-key"))
log.Println("incr-step:", c.Int("incr-step"))
// 监听
lis, err := net.Listen("tcp", c.String("listen"))
if err != nil {
Expand Down
86 changes: 60 additions & 26 deletions service.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ const (
)

type server struct {
pkroot string
uuidkey string
machine_id uint64 // 10-bit machine id
ch_proc chan chan uint64
muNext sync.Mutex
pkroot string
uuidkey string
machine_id uint64 // 10-bit machine id
ch_proc chan chan uint64
incr_step int64
incr_pendings map[string][]int64
incr_use_cache bool
muNext sync.Mutex
}

func (s *server) init(c *cli.Context) {
Expand All @@ -43,39 +46,70 @@ func (s *server) init(c *cli.Context) {
s.machine_id = (uint64(c.Int("machine-id")) & MACHINE_ID_MASK) << 12
s.pkroot = c.String("pk-root")
s.uuidkey = c.String("uuid-key")
s.incr_step = int64(c.Int("incr-step"))

if s.incr_step > 1 {
s.incr_use_cache = true
s.incr_pendings = make(map[string][]int64)
}

go s.uuid_task()
}

// get next value of a key, like auto-increment in mysql
func (s *server) Next(ctx context.Context, in *pb.Snowflake_Key) (*pb.Snowflake_Value, error) {
s.muNext.Lock()
defer s.muNext.Unlock()
client := etcdclient.KeysAPI()
key := s.pkroot + "/" + in.Name
for {
// get the key
resp, err := client.Get(context.Background(), key, nil)
if err != nil {
log.Error(err)
return nil, errors.New("Key not exists, need to create first")
}

// get prevValue & prevIndex
prevValue, err := strconv.Atoi(resp.Node.Value)
if err != nil {
log.Error(err)
return nil, errors.New("marlformed value")
if s.incr_use_cache {
if _, ok := s.incr_pendings[in.Name]; !ok {
s.incr_pendings[in.Name] = make([]int64, 0, int(s.incr_step))
}
prevIndex := resp.Node.ModifiedIndex
}

if !s.incr_use_cache || len(s.incr_pendings[in.Name]) == 0 {
client := etcdclient.KeysAPI()
key := s.pkroot + "/" + in.Name
for {
// get the key
resp, err := client.Get(context.Background(), key, nil)
if err != nil {
log.Error(err)
return nil, errors.New("Key not exists, need to create first")
}

// CompareAndSwap
resp, err = client.Set(context.Background(), key, fmt.Sprint(prevValue+1), &etcd.SetOptions{PrevIndex: prevIndex})
if err != nil {
log.Warn(err)
continue
// get prevValue & prevIndex
prevValue, err := strconv.ParseInt(resp.Node.Value, 10, 64)
if err != nil {
log.Error(err)
return nil, errors.New("marlformed value")
}
prevIndex := resp.Node.ModifiedIndex

nextValue := prevValue + s.incr_step
// CompareAndSwap
resp, err = client.Set(context.Background(), key, fmt.Sprint(nextValue), &etcd.SetOptions{PrevIndex: prevIndex})
if err != nil {
log.Warn(err)
continue
}

// cache value list
if s.incr_use_cache {
for i := prevValue + 1; i <= nextValue; i++ {
s.incr_pendings[in.Name] = append(s.incr_pendings[in.Name], i)
}

break
}

return &pb.Snowflake_Value{nextValue}, nil
}
return &pb.Snowflake_Value{int64(prevValue + 1)}, nil
}

nextId := s.incr_pendings[in.Name][0]
s.incr_pendings[in.Name] = s.incr_pendings[in.Name][1:]
return &pb.Snowflake_Value{nextId}, nil
}

// generate an unique uuid
Expand Down