Skip to content

Commit bb05876

Browse files
author
Daniele Rolando
committed
Remove mutex - use deque for speed - move implementation in init.lua
1 parent e985a17 commit bb05876

File tree

7 files changed

+204
-55
lines changed

7 files changed

+204
-55
lines changed

.busted

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
return {
22
default = {
3-
lpath = "./lib/?.lua;./lib/?/init.lua;"
3+
lpath = "./lib/?.lua;./lib/?/init.lua;/lib/util/?.lua"
44
}
55
}

lib/cassandra/init.lua

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
local socket = require 'cassandra.socket'
88
local cql = require 'cassandra.cql'
9+
local deque = require 'util.deque'
910

1011
local setmetatable = setmetatable
1112
local requests = cql.requests
@@ -124,18 +125,29 @@ function _Host.new(opts)
124125
local sock, err = socket.tcp()
125126
if err then return nil, err end
126127

128+
local protocol_version = opts.protocol_version or cql.def_protocol_version
129+
130+
-- Initialize stream_ids deque
131+
local max_id = protocol_version < 3 and 2^7-1 or 2^15-1
132+
local stream_ids = deque.new()
133+
134+
for i=1,max_id do
135+
deque.pushright(stream_ids, i)
136+
end
137+
127138
local host = {
128139
sock = sock,
129140
host = opts.host or '127.0.0.1',
130141
port = opts.port or 9042,
131142
keyspace = opts.keyspace,
132-
protocol_version = opts.protocol_version or cql.def_protocol_version,
143+
protocol_version = protocol_version,
133144
ssl = opts.ssl,
134145
verify = opts.verify,
135146
cert = opts.cert,
136147
cafile = opts.cafile,
137148
key = opts.key,
138-
auth = opts.auth
149+
auth = opts.auth,
150+
stream_ids = stream_ids,
139151
}
140152

141153
return setmetatable(host, _Host)
@@ -146,35 +158,58 @@ function _Host:send(request)
146158
return nil, 'no socket created'
147159
end
148160

161+
-- set stream_id
162+
local stream_id, err = deque.popleft(self.stream_ids)
163+
if err == nil then
164+
if request.opts then
165+
request.opts.stream_id = stream_id
166+
else
167+
request.opts = {stream_id = stream_id}
168+
end
169+
end
170+
149171
local frame = request:build_frame(self.protocol_version)
150172
local sent, err = self.sock:send(frame)
151-
if not sent then return nil, err end
173+
if not sent then
174+
deque.pushright(self.stream_ids, stream_id)
175+
return nil, err
176+
end
152177

153178
while true do
154179
-- receive frame version byte
155180
local v_byte, err = self.sock:receive(1)
156-
if not v_byte then return nil, err end
181+
if not v_byte then
182+
deque.pushright(self.stream_ids, stream_id)
183+
return nil, err
184+
end
157185

158186
-- -1 because of the v_byte we just read
159187
local version, n_bytes = cql.frame_reader.version(v_byte)
160188

161189
-- receive frame header
162190
local header_bytes, err = self.sock:receive(n_bytes)
163-
if not header_bytes then return nil, err end
191+
if not header_bytes then
192+
deque.pushright(self.stream_ids, stream_id)
193+
return nil, err
194+
end
164195

165196
local header = cql.frame_reader.read_header(version, header_bytes)
166197

167198
-- receive frame body
168199
local body_bytes
169200
if header.body_length > 0 then
170201
body_bytes, err = self.sock:receive(header.body_length)
171-
if not body_bytes then return nil, err end
202+
if not body_bytes then
203+
deque.pushright(self.stream_ids, stream_id)
204+
return nil, err
205+
end
172206
end
173207

174208
-- If stream_id was set in request.opts, only return a response
175209
-- with a matching stream_id and drop everything else
176-
if not request.opts or not request.opts.stream_id or request.opts.stream_id == header.stream_id then
210+
if stream_id and stream_id == header.stream_id then
177211
-- res, err, cql_err_code
212+
deque.pushright(self.stream_ids, stream_id)
178213
return cql.frame_reader.read_body(header, body_bytes)
179214
end
180215
end

lib/resty/cassandra/cluster.lua

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -387,7 +387,6 @@ function _Cluster.new(opts)
387387
or require('resty.cassandra.policies.reconnection.exp').new(1000, 60000),
388388
retry_policy = opts.retry_policy
389389
or require('resty.cassandra.policies.retry.simple').new(3),
390-
stream_ids = nil,
391390
}, _Cluster)
392391
end
393392

@@ -551,15 +550,6 @@ function _Cluster:refresh()
551550
-- initiate the load balancing policy
552551
self.lb_policy:init(peers)
553552

554-
if self.stream_ids == nil then
555-
-- Initialize the list of available seed ids (only once)
556-
self.stream_ids = {}
557-
local max_id = protocol_version < 3 and 2^7-1 or 2^15-1
558-
for i=1,max_id do
559-
self.stream_ids[i] = i
560-
end
561-
end
562-
563553
-- cluster is ready to be queried
564554
self.init = true
565555

@@ -758,45 +748,8 @@ local function handle_error(self, err, cql_code, coordinator, request)
758748
return nil, err, cql_code
759749
end
760750

761-
local function get_stream_id(self)
762-
local stream_id = 0
763-
764-
local lock = resty_lock:new(self.dict_name, self.lock_opts)
765-
local elapsed, err = lock:lock('stream_id')
766-
if not elapsed then return stream_id, 'failed to acquire stream_id lock: '..err end
767-
768-
if table.getn(self.stream_ids) > 0 then
769-
stream_id = table.remove(self.stream_ids)
770-
else
771-
err = 'Too many inflight requests. No new stream ids available.'
772-
end
773-
774-
local ok, err = lock:unlock()
775-
if not ok then return stream_id, 'failed to unlock: '..err end
776-
777-
return stream_id, nil
778-
end
779-
780-
local function release_stream_id(self, stream_id)
781-
local lock = resty_lock:new(self.dict_name, self.lock_opts)
782-
local elapsed, err = lock:lock('stream_id')
783-
if not elapsed then return stream_id, 'failed to acquire stream_id lock: '..err end
784-
785-
table.insert(self.stream_ids, 1, stream_id)
786-
787-
local ok, err = lock:unlock()
788-
if not ok then return stream_id, 'failed to unlock: '..err end
789-
end
790-
791751
send_request = function(self, coordinator, request)
792-
local err
793-
request.opts.stream_id, err = get_stream_id(self)
794-
if err ~= nil and self.logging then
795-
log(WARN, _log_prefix, err)
796-
end
797-
798752
local res, err, cql_code = coordinator:send(request)
799-
release_stream_id(self, request.opts.stream_id)
800753
if not res then
801754
return handle_error(self, err, cql_code, coordinator, request)
802755
elseif res.warnings and self.logging then

lib/util/deque.lua

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
--[[
2+
Deque implementation, taken from the "Programming in Lua" book.
3+
http://www.lua.org/pil/11.4.html
4+
--]]
5+
6+
local _List = {}
7+
8+
function _List.new ()
9+
return {first = 0, last = -1 }
10+
end
11+
12+
function _List.pushleft (list, value)
13+
local first = list.first - 1
14+
list.first = first
15+
list[first] = value
16+
end
17+
18+
function _List.pushright (list, value)
19+
local last = list.last + 1
20+
list.last = last
21+
list[last] = value
22+
end
23+
24+
function _List.popleft (list)
25+
local first = list.first
26+
if first > list.last then return nil, "list is empty" end
27+
local value = list[first]
28+
list[first] = nil -- to allow garbage collection
29+
list.first = first + 1
30+
return value, nil
31+
end
32+
33+
function _List.popright (list)
34+
local last = list.last
35+
if list.first > last then return nil, "list is empty" end
36+
local value = list[last]
37+
list[last] = nil -- to allow garbage collection
38+
list.last = last - 1
39+
return value, nil
40+
end
41+
42+
function _List.length (list)
43+
return list.last - list.first + 1
44+
end
45+
46+
return _List

lua-cassandra-1.2.3-0.rockspec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ build = {
2020
["cassandra.auth"] = "lib/cassandra/auth.lua",
2121
["cassandra.socket"] = "lib/cassandra/socket.lua",
2222

23+
["util.deque"] = "lib/util/deque.lua",
24+
2325
["resty.cassandra.cluster"] = "lib/resty/cassandra/cluster.lua",
2426
["resty.cassandra.policies.lb"] = "lib/resty/cassandra/policies/lb/init.lua",
2527
["resty.cassandra.policies.lb.rr"] = "lib/resty/cassandra/policies/lb/rr.lua",

lua-cassandra-dev-0.rockspec

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@ build = {
1919
["cassandra.auth"] = "lib/cassandra/auth.lua",
2020
["cassandra.socket"] = "lib/cassandra/socket.lua",
2121

22+
["util.deque"] = "lib/util/deque.lua",
23+
2224
["resty.cassandra.cluster"] = "lib/resty/cassandra/cluster.lua",
2325
["resty.cassandra.policies.lb"] = "lib/resty/cassandra/policies/lb/init.lua",
2426
["resty.cassandra.policies.lb.rr"] = "lib/resty/cassandra/policies/lb/rr.lua",

spec/01-unit/04-host_spec.lua

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
local cassandra = require "cassandra"
2+
local cql = require "cassandra.cql"
3+
local deque = require "util.deque"
4+
5+
describe("_Host", function()
6+
describe("new", function()
7+
it("sets stream_ids to the right length", function()
8+
local host_v2, err = cassandra.new({protocol_version = 2})
9+
assert.is_nil(err)
10+
assert.are.equal(2^7-1, deque.length(host_v2.stream_ids))
11+
12+
local host_v2, err = cassandra.new({protocol_version = 3})
13+
assert.is_nil(err)
14+
assert.are.equal(2^15-1, deque.length(host_v2.stream_ids))
15+
end)
16+
end)
17+
18+
describe("send", function()
19+
20+
local function mock_request()
21+
local r = cql.requests.startup.new()
22+
return mock(r)
23+
end
24+
25+
local function mock_host()
26+
local host, err = cassandra.new()
27+
assert.is_nil(err)
28+
stub(host.sock, "send")
29+
stub(host.sock, "receive")
30+
return host
31+
end
32+
33+
it("sets stream_id without overriding existing opts", function()
34+
local req = mock_request()
35+
local host = mock_host()
36+
req.opts = {custom = "option"}
37+
38+
local _, err = host:send(req)
39+
assert.is_nil(err)
40+
assert.are.same({custom = "option", stream_id = 1}, req.opts)
41+
end)
42+
43+
it("sets stream_id if there are no existing opts", function()
44+
local req = mock_request()
45+
local host = mock_host()
46+
47+
local _, err = host:send(req)
48+
assert.is_nil(err)
49+
assert.are.same({stream_id = 1}, req.opts)
50+
end)
51+
52+
it("doesn't crash if there are no stream_ids left", function()
53+
local req = mock_request()
54+
local host = mock_host()
55+
host.stream_ids["last"] = host.stream_ids["first"] - 1
56+
57+
local _, err = host:send(req)
58+
assert.is_nil(err)
59+
assert.is_nil(req.opts)
60+
end)
61+
62+
it("puts stream_id back if send fails", function()
63+
local req = mock_request()
64+
local host = mock_host()
65+
host.sock.send = function() return nil, "send failure" end
66+
67+
local _, err = host:send(req)
68+
assert.are.equal("send failure", err)
69+
assert.are.equal(1, deque.popright(host.stream_ids))
70+
end)
71+
72+
it("puts stream_id back if receive fails", function()
73+
local req = mock_request()
74+
local host = mock_host()
75+
host.sock.send = function() return true, nil end
76+
host.sock.receive = function() return nil, "receive failure" end
77+
78+
local _, err = host:send(req)
79+
assert.are.equal("receive failure", err)
80+
assert.are.equal(1, deque.popright(host.stream_ids))
81+
end)
82+
83+
it("retries if response stream_id doesn't match", function()
84+
local req = mock_request()
85+
local host = mock_host()
86+
host.sock.send = function() return true, nil end
87+
host.sock.receive = function() return "foobar", nil end
88+
local stream_id = 4
89+
local read_header_count = 0
90+
91+
cql.frame_reader = {
92+
version = function(_) return 3 end,
93+
read_header = function(_)
94+
read_header_count = read_header_count + 1
95+
stream_id = stream_id - 1
96+
return {stream_id = stream_id, body_length = 0}
97+
end,
98+
read_body = function(_, _) return "body" end
99+
}
100+
101+
local res, err = host:send(req)
102+
assert.is_nil(err)
103+
-- The first 2 times the stream_id doesn't match (3 & 2)
104+
-- The third time is 1 so the function should exit correctly
105+
assert.are.equal(3, read_header_count)
106+
-- The stream_id should be back in the deque
107+
assert.are.equal(1, deque.popright(host.stream_ids))
108+
assert.are.equal("body", res)
109+
end)
110+
end)
111+
end)

0 commit comments

Comments
 (0)