Skip to content
This repository was archived by the owner on Aug 23, 2022. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
*.sw[op]
*.rdb
pyforget/build/
pyforget/dist/
pyforget/*.egg-info/
1 change: 1 addition & 0 deletions pyforget/MANIFEST.in
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include readme.md
Empty file.
62 changes: 33 additions & 29 deletions pyforget/distribution.py → pyforget/forgettable/distribution.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import numpy as np
import logging
import time
import redis
import os

r = redis.StrictRedis(
'localhost',
port=6379,
db=2
)

def interleave_izip(*iterables):
# interleave_izip('ABCD', 'xy') --> A x B y
Expand All @@ -16,57 +11,67 @@ def interleave_izip(*iterables):
for i in iterators:
yield i.next()


class Distribution(object):
    """A decaying categorical distribution stored in Redis.

    Bins live in a Redis sorted set under key ``k``; the normalizing
    constant and last-update timestamp live under ``k + '_z'`` and
    ``k + '_t'``.
    """

    def __init__(self, k, redis_client=None, rate=0.02):
        """
        k            -- Redis key naming this distribution.
        redis_client -- optional pre-built Redis client (injected for
                        testing); when falsy, a StrictRedis is created
                        from the REDIS_1_PORT_6379_TCP_ADDR env var
                        (raises KeyError if unset), port 6379, db 1.
        rate         -- per-second decay rate used by decay().
        """
        self.k = k
        self.rate = rate
        if not redis_client:
            redis_client = redis.StrictRedis(
                os.environ['REDIS_1_PORT_6379_TCP_ADDR'],
                port=6379,
                db=1
            )
        self.redis = redis_client

def decay(self, rate=0.02):
def decay(self):
"""
returns the amount to decay each bin by
"""
t = int(time.time())
tau = t-self.last_updated
rates = [v * rate * tau for v in self.values]
tau = t - self.last_updated
rates = [v * self.rate * tau for v in self.values]
y = np.random.poisson(rates)
return y,t
return y, t

def incr(self,bin):
def incr(self, bin):
"""
on an event, update the sorted set and the normalizing constant
"""
r.zincrby(self.k, bin)
a = r.incr(self.k+"_z")
self.redis.zincrby(self.k, bin)
a = self.redis.incr(self.k + "_z")
if a == 1:
# this catches the situtation where we've never seen the
# this catches the situtation where we've never seen the
# the key before, setting t to the time of the initial write
r.set(self.k+'_t', int(time.time()))
self.redis.set(self.k + '_t', int(time.time()))

def __str__(self):
return str(dict(zip(self.keys,self.values)))
return str(dict(zip(self.keys, self.values)))

def decrement(self):
# check this distribution exists to decrement
if not r.exists(self.k):
if not self.redis.exists(self.k):
raise KeyError('Cannot find distribution in Redis')
# get the currently stored data
self.keys, self.values = zip(*r.zrevrange(self.k,0,-1,withscores=True))
self.z = r.get(self.k+"_z")
self.keys, self.values = zip(*self.redis.zrevrange(self.k, 0, -1, withscores=True))
self.z = self.redis.get(self.k + "_z")
self.n = len(self.values)
self.last_updated = int(r.get(self.k+"_t"))
self.last_updated = int(self.redis.get(self.k + "_t"))
# get the amount to decay by
y,t = self.decay()
y, t = self.decay()
# decay values by y
self.values -= y
self.values[self.values <= 0] = 1
# normalizing constant
self.z = int(self.values.sum())
# build multi call
pipeline = r.pipeline()
pipeline.watch(self.k, self.k+'_t', self.k+'_z')
pipeline = self.redis.pipeline()
pipeline.watch(self.k, self.k + '_t', self.k + '_z')
pipeline.multi()
pipeline.zadd(self.k, *interleave_izip(self.values, self.keys))
pipeline.set(self.k+'_t', t)
pipeline.set(self.k+'_z', self.z)
pipeline.zadd(self.k, *interleave_izip(self.values, self.keys))
pipeline.set(self.k + '_t', t)
pipeline.set(self.k + '_z', self.z)
try:
# try to excute
pipeline.execute()
Expand All @@ -75,7 +80,7 @@ def decrement(self):

def get_dist(self):
self.decrement()
normalised = dict([(k, v/self.z) for k,v in zip(self.keys, self.values)])
normalised = dict([(k, v / self.z) for k, v in zip(self.keys, self.values)])
return normalised

def get_bin(self, bin):
Expand All @@ -85,4 +90,3 @@ def get_bin(self, bin):
except ValueError:
raise ValueError('bin not in distribution')
return out

49 changes: 33 additions & 16 deletions pyforget/forget_table.py → pyforget/forgettable/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@
import tornado.web
import tornado.httpserver
import tornado.ioloop
from distribution import Distribution

from .distribution import Distribution


class Application(tornado.web.Application):

def __init__(self):

app_settings = {
'debug': True,
"autoescape" : None,
"autoescape": None,
}

handlers = [
Expand All @@ -20,65 +23,79 @@ def __init__(self):
]
tornado.web.Application.__init__(self, handlers, **app_settings)


class PingHandler(tornado.web.RequestHandler):
    """Health-check endpoint: answers 'OK' to GET and HEAD requests."""

    def get(self):
        self.finish('OK')

    def head(self):
        self.finish('OK')


class IncrHandler(tornado.web.RequestHandler):
    """Increment one bin of a distribution.

    GET parameters: ``key`` (distribution name) and ``bin`` (bin to
    bump). Both are required; tornado rejects the request when missing.
    """

    def get(self):
        key = self.get_argument('key')
        target_bin = self.get_argument('bin')
        Distribution(key).incr(target_bin)


class GetHandler(tornado.web.RequestHandler):
    """Return the probability of a single bin within a distribution.

    GET parameters: ``key`` (distribution name) and ``bin``. Responds
    with a JSON envelope carrying status_code 200 on success, or 404
    when either the bin or the whole distribution is unknown.
    """

    def get(self):
        key = self.get_argument('key')
        bin = self.get_argument('bin')
        try:
            self.finish({
                "status_code": 200,
                "data": [{
                    "bin": bin,
                    "probability": Distribution(key).get_bin(bin)
                }]
            })
        except ValueError:
            # get_bin raises ValueError when the bin is absent.
            self.finish({
                "status_code": 404,
                "data": [],
                "error_message": "Could not find bin in distribution"
            })
        except KeyError:
            # Distribution raises KeyError when the key is unknown.
            self.finish({
                "status_code": 404,
                "data": [],
                "error_message": "Could not find distribution in Forget Table"
            })


class DistHandler(tornado.web.RequestHandler):
    """Return an entire normalised distribution.

    GET parameter: ``key`` (distribution name). Responds with a JSON
    envelope listing every bin with its probability, or a 404 envelope
    when the distribution does not exist.
    """

    def get(self):
        key = self.get_argument('key')
        try:
            dist = Distribution(key).get_dist()
        except KeyError:
            return self.finish({
                "status_code": 404,
                "data": [],
                "error_message": "Could not find distribution in Forget Table"
            })
        # Loop variables renamed so they don't shadow the request `key`.
        return self.finish({
            "status_code": 200,
            "data": [{
                "bin": bin_name,
                "probability": probability
            } for bin_name, probability in dist.iteritems()]
        })

if __name__ == "__main__":

def main():
    """Parse the --port option and serve the forgettable HTTP API."""
    tornado.options.define("port", default=8000, help="Listen on port", type=int)
    tornado.options.parse_command_line()
    app = Application()
    server = tornado.httpserver.HTTPServer(request_callback=app)
    # Bind on every interface so the service is reachable externally.
    server.listen(tornado.options.options.port, address="0.0.0.0")
    tornado.ioloop.IOLoop.instance().start()


if __name__ == "__main__":
main()
4 changes: 3 additions & 1 deletion pyforget/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

Written by [Mike Dewar](http://twitter.com/mikedewar) and [Micha Gorelick](http://micha.gd/).

To start the service run `python forget-table.py --port=8080` which will start the wrapper. Note that you will need a Redis database running locally on port 6379. Forget Table will write into db 2 by default.
To install run `pip install forgettable`.

To start the service run `forgettable --port=8080` which will start the wrapper. Note that you will need a Redis database reachable via the `REDIS_1_PORT_6379_TCP_ADDR` environment variable on port 6379. Forget Table will write into db 1 by default.

Upon receiving an event, to increment a bin in a distribution call

Expand Down
23 changes: 23 additions & 0 deletions pyforget/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from setuptools import setup, find_packages


setup(
    name="forgettable",
    version="0.1.0",
    packages=find_packages(),
    entry_points={
        'console_scripts': [
            'forgettable=forgettable.server:main'
        ]
    },
    install_requires=[
        'tornado',
        'numpy',
        'redis',
    ],
    long_description=open('readme.md').read(),
    # readme.md is Markdown; without this PyPI renders it as plain text.
    long_description_content_type="text/markdown",
    url="https://github.com/bitly/forgettable/tree/master/pyforget",
    maintainer="Saul Shanabrook",
    maintainer_email="s.shanabrook@gmail.com",
)