-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmongoqueue.py
More file actions
99 lines (78 loc) · 2.71 KB
/
mongoqueue.py
File metadata and controls
99 lines (78 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# Copyright 2012 Andrey Aleksandrov
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pymongo
from pymongo.errors import OperationFailure, AutoReconnect
from time import time
class Queue(object):
    """Task queue stored in a MongoDB collection.

    Every task is a document that carries bookkeeping fields next to its
    payload:

    * ``_p`` -- priority. ``add`` defaults it to the current Unix time and
      ``reserve`` only hands out tasks whose ``_p`` is at or below the
      requested priority, lowest value first.
    * ``_r`` -- Unix time at which the task was reserved; the field is
      absent while the task is available.
    * ``_a`` -- counter incremented each time the task is rescheduled.
    * ``_e`` -- list of messages appended by ``error``.
    """

    def __init__(self, collection, log):
        """Bind the queue to a collection and a logger.

        collection -- collection object used for all storage operations.
        log        -- object with a ``debug`` method, used by ``reserve``.
        """
        self.log = log
        self.collection = collection

    def clear(self):
        """Clear the queue by dropping the underlying collection."""
        return self.collection.drop()

    def size(self):
        """Return the total number of documents in the queue."""
        return self.collection.count()

    # The default query dict is only read, never mutated, so sharing it
    # between calls is harmless here.
    def count(self, query = {"_r": {'$exists': True }}):
        """Return the number of tasks matching ``query``.

        By default this counts the reserved tasks, i.e. the documents
        that currently have an ``_r`` field.
        """
        return self.collection.find(query).count()

    def add(self, task = None, opts = None):
        """Insert ``task`` into the queue and return it.

        task -- dict holding the payload; it is updated in place with the
                bookkeeping fields and then inserted. A new empty dict is
                used when omitted.
        opts -- dict of bookkeeping fields merged into ``task``. When
                omitted it is ``{"_p": <now>, "_a": 0, "_e": []}``.

        The defaults are built on every call on purpose: as default
        argument values they would be evaluated once at definition time,
        freezing ``_p`` at import time and sharing one task dict and one
        ``_e`` list between all callers.
        """
        if task is None:
            task = {}
        if opts is None:
            opts = {"_p": int(time()), "_a": 0, "_e": []}
        task.update(opts)
        self.collection.insert(task)
        return task

    def reserve(self, priority = -1):
        """Reserve and return the next available task, or ``None``.

        priority -- upper bound for ``_p``; the default ``-1`` means
                    "the current Unix time".

        A task is available when it has no ``_r`` field. The selected
        document gets ``_r`` set to the current Unix time. Driver errors
        (``OperationFailure``, ``AutoReconnect``) are logged at debug
        level and reported to the caller as ``None`` (best effort).
        """
        if priority == -1:
            priority = int(time())
        result = None
        try:
            result = self.collection.find_and_modify(
                query = {
                    "_p": {'$lte': priority},
                    "_r": {'$exists': False}
                },
                sort = {"_p": 1},
                update = {'$set': {"_r": int(time())}}
            )
            self.log.debug(result)
        except (OperationFailure, AutoReconnect) as e:
            # Deliberately swallowed: the caller simply sees "no task".
            self.log.debug(e)
        return result

    def reschedule(self, task):
        """Release ``task`` so it can be reserved again.

        Removes ``_r``, stores the task's current ``_p`` and increments
        ``_a`` by one. Returns the result of the update call.
        """
        return self.collection.update(
            {"_id": task["_id"]},
            {
                '$unset': {"_r": 0},
                '$set' : {
                    "_p": task["_p"],
                    "_a": int(task["_a"] + 1)
                }
            }
        )

    def error(self, task, message):
        """Append ``message`` to the ``_e`` list of ``task``."""
        return self.collection.update(
            {"_id": task["_id"]},
            {'$push': {"_e": message }}
        )

    def remove(self, task):
        """Delete ``task`` from the queue by its ``_id``."""
        return self.collection.remove({"_id": task["_id"]})

    def timeout(self, delay = 120):
        """Release every task reserved more than ``delay`` seconds ago.

        Removes ``_r`` from all documents whose reservation time is
        older than ``now - delay``. Returns nothing.
        """
        cutoff = int(time()) - delay
        # safe/multi are legacy PyMongo flags: multi applies the update to
        # every matching document instead of only the first one.
        self.collection.update(
            {"_r": {'$lt': cutoff}},
            {'$unset':{"_r": 0}},
            safe = True,
            multi = True
        )