diff --git a/.gitignore b/.gitignore index 11c9110..12f423e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ **/*.xlog **/*.snap +.DS_Store \ No newline at end of file diff --git a/examples/example-locks/init.lua b/examples/example-locks/init.lua new file mode 100644 index 0000000..b378126 --- /dev/null +++ b/examples/example-locks/init.lua @@ -0,0 +1,59 @@ +box.cfg{ + -- listen = '127.0.0.1:4221' +} +-- box.once('access:v1', function() +-- box.schema.user.grant('guest', 'read,write,execute', 'universe') +-- end) + +require 'strict'.on() + +-- box.once('schema',function() + local format = { + {name = "id", type = "string"}, + {name = "status", type = "string"}, + {name = "lock", type = "number", nullable=true}, + + {name = "action", type = "string"}, + {name = "attr", type = "*"}, + {name = "runat", type = "number"}, + } + box.schema.space.create('tasks', { if_not_exists = true, format = format }) + + box.space.tasks:create_index('primary', { unique = true, parts = {'id'}, if_not_exists = true}) + box.space.tasks:create_index('xq', { unique = false, parts = { 'status', 'id' }, if_not_exists = true}) + box.space.tasks:create_index('lock', { unique = false, parts = { 'lock', 'status', 'id' }, if_not_exists = true}) + box.space.tasks:create_index('runat', { unique = false, parts = { 'runat', 'id' }, if_not_exists = true}) +-- end) + +if not package.path:match('%.%./%?%.lua;') then + package.path = '../?.lua;' .. package.path +end + +require 'xqueue' ( box.space.tasks, { + debug = true; + fields = { + status = 'status'; + lock = 'lock'; + runat = 'runat'; + }; + features = { + id = 'uuid', + delayed = true, + ttl = true, + }; + workers = 0; + worker = function(task) + print(require'fiber'.self().id(), "got task",task.id, require'yaml'.encode(box.space.tasks:select())) + require'fiber'.sleep(10) + end; +} ) + + +print(require'yaml'.encode(box.space.tasks:select())) +-- for i = 1, 5 do +-- local t = box.space.tasks:put{ lock=1, action = "doit", attr = {} } +-- print('inserted', t.id, t.status) +-- end + +require'console'.start() +os.exit() diff --git a/example-putwait/init.lua b/examples/example-putwait/init.lua similarity index 100% rename from example-putwait/init.lua rename to examples/example-putwait/init.lua diff --git a/example-runat/init.lua b/examples/example-runat/init.lua similarity index 100% rename from example-runat/init.lua rename to examples/example-runat/init.lua diff --git a/example-tube/init.lua b/examples/example-tube/init.lua similarity index 100% rename from example-tube/init.lua rename to examples/example-tube/init.lua diff --git a/example-worker/init.lua b/examples/example-worker/init.lua similarity index 100% rename from example-worker/init.lua rename to examples/example-worker/init.lua diff --git a/example1/init.lua b/examples/example1/init.lua similarity index 100% rename from example1/init.lua rename to examples/example1/init.lua diff --git a/rockspecs/xqueue-scm-6.rockspec b/rockspecs/xqueue-scm-6.rockspec new file mode 100644 index 0000000..a118904 --- /dev/null +++ b/rockspecs/xqueue-scm-6.rockspec @@ -0,0 +1,20 @@ +package = "xqueue" +version = "scm-6" +source = { + url = "git://github.com/ktsstudio/xqueue.git", + branch = "locks", +} +description = { + summary = "Package for loading external lua config", + homepage = "https://github.com/moonlibs/xqueue.git", + license = "BSD" +} +dependencies = { + "lua ~> 5.1" +} +build = { + type = "builtin", + modules = { + xqueue = "xqueue.lua" + } +} diff --git a/xqueue.lua b/xqueue.lua index 5911e61..31dfe4a 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -84,6 +84,9 @@ Status: D - done - task was processed and ack'ed and permanently left in database enabled when keep feature is set + + L - locked - task is locked via `lock' index. Tasks will be processed only + if there are no older tasks by `lock` index X - reserved for statistics @@ -102,6 +105,7 @@ Interface: status = 'status_field_name' | status_field_no, runat = 'runat_field_name' | runat_field_no, priority = 'priority_field_name' | priority_field_no, + lock = 'lock_field_name' | lock_field_no, }, features = { id = 'auto_increment' | 'uuid' | 'required' | function @@ -243,6 +247,7 @@ function M.upgrade(space,opts,depth) self.taken = space.xq.taken self._stat = space.xq._stat self.bysid = space.xq.bysid + self.sid_take_fids = space.xq.sid_take_fids self._lock = space.xq._lock self.take_wait = space.xq.take_wait self.take_chans = space.xq.take_chans or setmetatable({}, { __mode = 'v' }) @@ -252,6 +257,7 @@ function M.upgrade(space,opts,depth) else self.taken = {} self.bysid = {} + self.sid_take_fids = {} -- byfid = {}; self._lock = {} self.put_wait = setmetatable({}, { __mode = 'v' }) @@ -300,7 +306,7 @@ function M.upgrade(space,opts,depth) -- 1. fields check local fields = {} local fieldmap = {} - for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"}}) do + for _,f in pairs({{"status","str"},{"runat","num"},{"priority","num"},{"tube","str"},{"lock","*"}}) do local fname,ftype = f[1], f[2] local num = opts.fields[fname] if num then @@ -320,7 +326,7 @@ function M.upgrade(space,opts,depth) error(string.format("wrong type %s for field %s, number or string required",type(num),fname),2 + depth) end -- check type - if format[num] then + if format[num] and ftype ~= "*" then if not typeeq(format[num].type, ftype) then error(string.format("type mismatch for field %s, required %s, got %s",fname, ftype, format[fname].type),2+depth) end @@ -457,6 +463,23 @@ function M.upgrade(space,opts,depth) end end end + + if fields.lock then + local lock_index + for _,index in pairs(space.index) do + if type(_) == 'number' and #index.parts >= 2 then + if index.parts[1].fieldno == fields.lock and index.parts[2].fieldno == fields.status then + -- print("found lock index", index.name) + lock_index = index + break + end + end + end + if not lock_index then + error(string.format("fields.lock requires tree index with at least fields (`lock', `status', ...)"), 2+depth) + end + self.lock_index = lock_index + end end local runat_index @@ -626,6 +649,10 @@ function M.upgrade(space,opts,depth) features.tube = true end + if fields.lock then + features.lockable = true + end + self.gen_id = gen_id self.features = features self.space = space.id @@ -649,6 +676,50 @@ function M.upgrade(space,opts,depth) return t[pkf.no] end + function self:register_take_fid() + local sid = box.session.id() + local fid = fiber.id() + if self.sid_take_fids[sid] == nil then + self.sid_take_fids[sid] = {} + end + self.sid_take_fids[sid][fid] = true + end + + function self:deregister_take_fid() + local sid = box.session.id() + local fid = fiber.id() + if self.sid_take_fids[sid] == nil then + return + end + + self.sid_take_fids[sid][fid] = nil + end + + function self:wakeup_locked_task(t) + local locker_t = self.lock_index:select({ t[ self.fieldmap.lock ], 'L' }, {limit=1})[1] + if locker_t ~= nil then + locker_t = box.space[self.space]:update({ locker_t[ self.key.no ] }, { + { '=', self.fields.status, 'R' } + }) + log.info("Unlock: L->R {%s} (by %s) from %s/sid=%s/fid=%s", + self:getkey(locker_t), self:getkey(t), box.session.storage.peer, box.session.id(), fiber.id() ) + self:wakeup(locker_t) + end + return locker_t + end + + function self:find_locker_task(lock_value) + local locker_t + for _, status in ipairs({'L', 'T', 'R'}) do + locker_t = self.lock_index:select({ lock_value, status }, {limit=1})[1] + if locker_t ~= nil then + -- there is a task that should be processed first + return locker_t + end + end + return nil + end + -- Notify producer if it is still waiting us. -- Producer waits only for successfully processed task -- or for task which would never be processed. @@ -709,7 +780,7 @@ function M.upgrade(space,opts,depth) if xq.ready then xq.ready:get() end log.info("I am worker %s",i) if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end while box.space[space.name] and space.xq == xq do @@ -747,7 +818,7 @@ function M.upgrade(space,opts,depth) local chan = xq.runat_chan log.info("Runat started") if box.info.ro then - log.notice("Shutting down on ro instance") + log.info("Shutting down on ro instance") return end local maxrun = 1000 @@ -769,25 +840,35 @@ function M.upgrade(space,opts,depth) if #collect >= maxrun then remaining = 0 break end end + local status for _,t in ipairs(collect) do -- log.info("Runat: %s, %s", _, t) - if t[xq.fields.status] == 'W' then - log.info("Runat: W->R %s",xq:keyfield(t)) + status = t[ xq.fields.status ] + + if status == 'W' then + local target_status = 'R' + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then + -- check if we need to set status L or R by looking up locker task + if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then + target_status = 'L' + end + end + log.info("Runat: W->%s %s",target_status,xq:keyfield(t)) -- TODO: default ttl? local u = space:update({ xq:keyfield(t) },{ - { '=',xq.fields.status,'R' }, + { '=',xq.fields.status,target_status }, { '=',xq.fields.runat, xq.NEVER } }) xq:wakeup(u) - elseif t[xq.fields.status] == 'R' and xq.features.ttl then + elseif (status == 'R' or status == 'L') and xq.features.ttl then local key = xq:keyfield(t) log.info("Runat: Kill R by ttl %s (%+0.2fs)", key, fiber.time() - t[ xq.fields.runat ]) t = space:delete{key} notify_producer(key, t) - elseif t[xq.fields.status] == 'Z' and xq.features.zombie then + elseif status == 'Z' and xq.features.zombie then log.info("Runat: Kill Zombie %s",xq:keyfield(t)) space:delete{ xq:keyfield(t) } - elseif t[xq.fields.status] == 'T' and xq.features.ttr then + elseif status == 'T' and xq.features.ttr then local key = xq:keypack(t) local sid = xq.taken[ key ] local peer = peers[sid] or sid @@ -803,7 +884,7 @@ function M.upgrade(space,opts,depth) end xq:wakeup(u) else - log.error("Runat: unsupported status %s for %s",t[xq.fields.status], tostring(t)) + log.error("Runat: unsupported status %s for %s",status, tostring(t)) space:update({ xq:keyfield(t) },{ { '=',xq.fields.runat, xq.NEVER } }) @@ -967,6 +1048,15 @@ function M.upgrade(space,opts,depth) end self.bysid[sid] = nil end + + if self.sid_take_fids[sid] then + for fid in pairs(self.sid_take_fids[sid]) do + log.info('Killing take fiber %d in sid %d', fid, sid) + fiber.kill(fid) + end + + self.sid_take_fids[sid] = nil + end end, self._on_dis) rawset(space,'xq',self) @@ -1104,6 +1194,14 @@ function methods:put(t, opts) t[ xq.fieldmap.status ] = 'R' end + -- check lock index + if xq.features.lockable and t[ xq.fieldmap.status ] == 'R' and t[ xq.fieldmap.lock ] ~= nil then + -- check if we need to set status L or R by looking up tasks in L, R or T states + if xq:find_locker_task(t[ xq.fieldmap.lock ]) ~= nil then + t[ xq.fieldmap.status ] = 'L' + end + end + local tuple = xq.tuple(t) local key = tuple[ xq.key.no ] @@ -1142,6 +1240,7 @@ local wait_for = { R = true, T = true, W = true, + L = true, } function methods:wait(key, timeout) @@ -1228,6 +1327,7 @@ function methods:take(timeout, opts) start_with = {'R'} end + local wait_chan = (tube_chan or xq.take_wait) local now = fiber.time() local key local found @@ -1244,8 +1344,14 @@ function methods:take(timeout, opts) if not found then local left = (now + timeout) - fiber.time() if left <= 0 then goto finish end - - (tube_chan or xq.take_wait):get(left) + + self.xq:register_take_fid() + local ok, r = pcall(wait_chan.get, wait_chan, left) + self.xq:deregister_take_fid() + if not ok then + log.info('take finished abruptly: %s', r) + goto finish + end if box.session.storage.destroyed then goto finish end end end @@ -1415,6 +1521,11 @@ function methods:ack(key, attr) log.info("Ack: %s->delete {%s} +%s from %s/sid=%s/fid=%s", old, key, attr.delay, box.session.storage.peer, box.session.id(), fiber.id() ) end + + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then + -- find any task that is locked + xq:wakeup_locked_task(t) + end end) xq:putback(t) -- in real drop form taken key @@ -1443,6 +1554,10 @@ function methods:bury(key, attr) xq.runat_chan:put(true,0) end log.info("Bury {%s} by %s, sid=%s, fid=%s", key, box.session.storage.peer, box.session.id(), fiber.id()) + + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then + xq:wakeup_locked_task(t) + end end) xq:putback(t) @@ -1497,6 +1612,10 @@ function methods:kill(key) end xq.taken[key] = nil xq._lock[key] = nil + + if xq.features.lockable and t[ xq.fieldmap.lock ] ~= nil then + xq:wakeup_locked_task(task) + end end end @@ -1527,6 +1646,7 @@ local pretty_st = { B = "Buried", Z = "Zombie", D = "Done", + L = "Locked", } function methods:stats(pretty)