-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathhandler.go
More file actions
242 lines (210 loc) · 6.26 KB
/
handler.go
File metadata and controls
242 lines (210 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
package qcloud_im_callback
import (
"sync"
"encoding/json"
"github.com/BPing/go-toolkit/producer_consumer"
)
// 具体事件处理程序
type CallbackHandle func(*CallbackEvent) interface{}
//
// 回调事件处理句柄
// 为某个的事件注册相应的事件处理程序
//
type CallbackHandler struct {
// 事件处理路由,CallbackCommand对应的的处理方式
router map[CallbackCommand]RouterInfo
// 默认的处理程序
// 当Router里面没有注册的理由存在时候,
// 将默认使用本程序处理事件
defaultHandle CallbackHandle
// 开始处理事件之前的钩子
// 如果返回数据不为nil,代表结束处理事件,
// 否则,继续处理事件
beforeHook CallbackHandle
//生产/消费 消费异步事件
producerConsumer producerConsumer.IContainer
// CallbackEvent对象池,避免创建过多对象
eventPool *sync.Pool
}
// 事件处理信息和程序
type RouterInfo struct {
// 异步或者同步
Async bool
// 如果是异步处理,
// 默认的返回的数据。
// 一般为BaseResponse结构体即可
AsyncResponse interface{}
// 处理句柄
// 事件具体处理的程序。
// 如果同步处理,返回的数据将返回到客户端去,如果异步的话,将会忽略
Handle CallbackHandle
}
// 新建回调事件处理句柄(底层消费队列为channel缓冲队列)
func NewCallbackHandler(masterNum, msgEventLen int, defaultHandle CallbackHandle) (*CallbackHandler, error) {
ch := &CallbackHandler{
router: make(map[CallbackCommand]RouterInfo),
defaultHandle: defaultHandle,
eventPool: &sync.Pool{New: func() interface{} {
return NewCallbackEvent("", nil, nil)
}}}
err := ch.InitProducerConsumer(masterNum, msgEventLen, "channel", nil)
return ch, err
}
// 缓存Cache接口
// 封装producerConsumer.ICache
type ICache interface {
producerConsumer.ICache
}
// 新建回调事件处理句柄(底层消费队列为自定义Cache队列)
// @cache ICache
// // 缓存Cache接口
/*type ICache interface {
// BLPOP key1 timeout(秒)
// 移出并获取列表的第一个元素,
// 如果列表没有元素会阻塞列表直到等待超时或发现可弹出元素为止。
BLPop(key string,timeout int64)(map[string]string, error)
// 在列表尾部中添加一个或多个值
RPush(key string,values ... interface{}) (int64, error)
// 获取列表长度
LLen(key string) (int64, error)
}*/
//
func NewCallbackHandlerWithCache(masterNum, msgEventLen int, defaultHandle CallbackHandle,cache ICache) (*CallbackHandler, error) {
ch := &CallbackHandler{
router: make(map[CallbackCommand]RouterInfo),
defaultHandle: defaultHandle,
eventPool: &sync.Pool{New: func() interface{} {
return NewCallbackEvent("", nil, nil)
}}}
err := ch.InitProducerConsumer(masterNum, msgEventLen, "cache", cache)
return ch, err
}
//
// @masterNum 主消费线程数目,必须大于等于1
// @chanLen 消费信息(事件)队列长度
func (ch *CallbackHandler) InitProducerConsumer(masterNum, msgEventLen int, pcType string, cache producerConsumer.ICache) error {
pcConf := producerConsumer.Config{
MsgLen: int64(msgEventLen),
ConsumeFunc: func(msg producerConsumer.IMessage) {
// 处理异步延后处理消息(事件)
event, ok := msg.(*CallbackEvent)
if ok {
event.handle()
}
},
}
if pcType == "cache" {
pcConf.Type = producerConsumer.CacheType
pcConf.CacheInstance = cache
pcConf.Marshal = func(msg producerConsumer.IMessage) ([]byte, error) {
return json.Marshal(msg.(*CallbackEvent))
}
pcConf.Unmarshal = func(msgByte []byte) (producerConsumer.IMessage, error) {
msg := ch.eventPool.Get().(*CallbackEvent)
err := json.Unmarshal(msgByte, msg)
if msg != nil {
msg.Handler = ch
}
return msg, err
}
} else {
// 默认为ChannelType
pcConf.Type = producerConsumer.ChannelType
}
pc, err := producerConsumer.NewContainer(pcConf)
if err != nil {
return err
}
ch.producerConsumer = pc
if masterNum < 1 {
masterNum = 1
}
for i := 0; i < masterNum; i++ {
ch.producerConsumer.Consume()
}
return nil
}
// 注册
// 如果重复注册,新的将覆盖旧的
func (ch *CallbackHandler) Register(cc CallbackCommand, ri RouterInfo) *CallbackHandler {
ch.router[cc] = ri
return ch
}
// 注册默认处理程序
func (ch *CallbackHandler) RegisterDefaultHandle(callbackHandle CallbackHandle) *CallbackHandler {
ch.defaultHandle = callbackHandle
return ch
}
// 注册钩子
func (ch *CallbackHandler) RegisterBeforeHook(beforeHook CallbackHandle) *CallbackHandler {
ch.beforeHook = beforeHook
return ch
}
// 注销事件处理路由信息
func (ch *CallbackHandler) UnRegister(cc CallbackCommand) *CallbackHandler {
delete(ch.router, cc)
return ch
}
// 是否已注册
func (ch *CallbackHandler) Exist(cc CallbackCommand) bool {
_, ok := ch.router[cc]
return ok
}
// 获取事件处理路由信息
func (ch *CallbackHandler) Get(cc CallbackCommand) (RouterInfo, bool) {
ri, ok := ch.router[cc]
return ri, ok
}
// 事件队列处理协程数目情况
func (ch *CallbackHandler) ConsumerNumGoroutine() (master, assistActive int64) {
if nil != ch.producerConsumer {
master, assistActive = ch.producerConsumer.NumGoroutine()
}
return
}
// 处理事件
func (ch *CallbackHandler) Handle(ce *CallbackEvent) interface{} {
if ch.beforeHook != nil {
hr := ch.beforeHook(ce)
if nil != hr {
// 如果钩子有返回数据则代表结束
return hr
}
}
ri, ok := ch.Get(ce.CallbackCommand)
if ok {
if ri.Async {
if nil != ch.producerConsumer {
// 放进消费队列延后处理
ch.producerConsumer.Produce(ce)
}
if nil != ri.AsyncResponse {
return ri.AsyncResponse
}
return ch.defaultHandle(ce)
} else {
defer ch.eventPool.Put(ce)
resp := ri.Handle(ce)
return resp
}
}
defer ch.eventPool.Put(ce)
return ch.defaultHandle(ce)
}
// producerConsumer消费事件时调用
func (ch *CallbackHandler) handle(ce *CallbackEvent) {
defer ch.eventPool.Put(ce)
ri, ok := ch.Get(ce.CallbackCommand)
if ok && ri.Async {
ri.Handle(ce)
}
}
// 新建事件
func (ch *CallbackHandler) NewCallbackEvent(cc CallbackCommand, up *URLParams, body []byte) *CallbackEvent {
ce := ch.eventPool.Get().(*CallbackEvent)
ce.CallbackCommand = cc
ce.URLParams = up
ce.Body = body
ce.Handler = ch
return ce
}