From 82caa071e597d6ab2d2eecc2ba1d70112ed29ecf Mon Sep 17 00:00:00 2001 From: Micha Gorelick Date: Tue, 13 Jan 2015 13:30:44 -0500 Subject: [PATCH 1/2] Major optimizations to update We now only perform the update if we either a) already have all the data necissary or b) expect that there should be a change in the data to warrent the update. This is particularly useful when dealing with very large distributions where updates can be very cumbersome. --- goforget/decay.go | 20 ++----------- goforget/distribution.go | 62 +++++++++++++++++++++++++++------------- goforget/forget.go | 17 +++++------ goforget/redis_utils.go | 28 +++++++++++++----- 4 files changed, 75 insertions(+), 52 deletions(-) diff --git a/goforget/decay.go b/goforget/decay.go index f4bb574..95f2e0f 100644 --- a/goforget/decay.go +++ b/goforget/decay.go @@ -7,18 +7,11 @@ import ( "time" ) -var MAX_ITER = 1000 - func Poisson(lambda float64) int { if lambda == 0.0 { return 0 } e := math.Exp(-1.0 * lambda) - if e < 1e-8 { - return math.MaxInt32 - } - - counter := MAX_ITER r := rand.Float64() k := int(0) p := e @@ -26,22 +19,15 @@ func Poisson(lambda float64) int { k += 1 e *= lambda / float64(k) p += e - if counter == 0 { - return -1 - } } return k } -func Decay(count, Z, t int, rate float64) int { - return DecayTime(count, Z, t, rate, time.Now()) +func Decay(Z, t int, rate float64) int { + return DecayTime(Z, t, rate, time.Now()) } -func DecayTime(count, Z, t int, rate float64, now time.Time) int { - if count < 1 { - return 0.0 - } - +func DecayTime(Z, t int, rate float64, now time.Time) int { dt := int(now.Unix()) - t lambda := rate * float64(dt) diff --git a/goforget/distribution.go b/goforget/distribution.go index aea5034..60603fe 100644 --- a/goforget/distribution.go +++ b/goforget/distribution.go @@ -28,12 +28,14 @@ func (vm ValueMap) MarshalJSON() ([]byte, error) { } type Distribution struct { - Name string `json:"distribution"` - Z int `json:"Z"` - T int - Data ValueMap `json:"data"` - Rate float64 `json:"rate"` - Prune bool `json:"prune"` + Name string `json:"distribution"` + Z int `json:"Z"` + T int + Data ValueMap `json:"data"` + Rate float64 `json:"rate"` + Prune bool `json:"prune"` + LastSyncT int `json:"last_sync_time"` + numEntries int isFull bool hasDecayed bool @@ -41,13 +43,15 @@ type Distribution struct { func (d *Distribution) GetNMostProbable(N int) error { data, err := GetNMostProbable(d.Name, N) - if err != nil || len(data) != 3 { + if err != nil || len(data) != 4 { return fmt.Errorf("Could not fetch data for %s: %s", d.Name, err) } - d.Z, _ = redis.Int(data[1], nil) - d.T, _ = redis.Int(data[2], nil) + d.numEntries, _ = redis.Int(data[1], nil) + d.Z, _ = redis.Int(data[2], nil) + d.T, _ = redis.Int(data[3], nil) d.Data = make(map[string]*Value) + d.LastSyncT = d.T d.addMultiBulkCounts(data[0]) return nil @@ -57,15 +61,15 @@ func (d *Distribution) GetField(fields ...string) error { data, err := GetField(d.Name, fields...) N := len(fields) - if err != nil || len(data) != 2+N { + if err != nil || len(data) != 3+N { return fmt.Errorf("Could not retrieve field") } - Z, _ := redis.Int(data[N], nil) - T, _ := redis.Int(data[N+1], nil) + d.numEntries, _ = redis.Int(data[N], nil) + d.Z, _ = redis.Int(data[N+1], nil) + d.T, _ = redis.Int(data[N+2], nil) + d.LastSyncT = d.T - d.Z = Z - d.T = T d.Data = make(map[string]*Value) var count int for i, field := range fields { @@ -91,16 +95,18 @@ func (d *Distribution) Fill() error { log.Printf("Could not read _T from distribution %s: %s", d.Name, err) } d.T = T + d.LastSyncT = d.T // TODO: don't use the dist map to speed things up! d.Data = make(map[string]*Value) d.Rate = *defaultRate + d.numEntries = len(data) d.addMultiBulkCounts(data[1]) + d.isFull = true + d.Normalize() d.calcProbabilities() - - d.isFull = true return nil } @@ -151,10 +157,16 @@ func (d *Distribution) calcProbabilities() { } func (d *Distribution) Decay() { + if len(d.Data) == 0 { + return + } + startingZ := d.Z now := time.Now() - for k, v := range d.Data { - l := DecayTime(v.Count, d.Z, d.T, d.Rate, now) + Z := 0 + sumDecay := 0 + for k, _ := range d.Data { + l := DecayTime(d.Z, d.T, d.Rate, now) if l >= d.Data[k].Count { if d.Prune { l = d.Data[k].Count @@ -162,14 +174,24 @@ func (d *Distribution) Decay() { l = d.Data[k].Count - 1 } } + sumDecay += l d.Data[k].Count -= l - d.Z -= l + Z += d.Data[k].Count + } + if d.isFull { + d.Z = Z + } else { + d.Z -= (sumDecay) / len(d.Data) * d.numEntries + if d.Z < 0 { + d.Z = 0 + } } if !d.hasDecayed && startingZ != d.Z { d.hasDecayed = true } - d.T = int(time.Now().Unix()) + d.T = int(now.Unix()) + d.calcProbabilities() } diff --git a/goforget/forget.go b/goforget/forget.go index 6b17511..1e980bc 100644 --- a/goforget/forget.go +++ b/goforget/forget.go @@ -14,14 +14,15 @@ import ( ) var ( - VERSION = "0.4.5" - showVersion = flag.Bool("version", false, "print version string") - httpAddress = flag.String("http", ":8080", "HTTP service address (e.g., ':8080')") - redisHost = flag.String("redis-host", "", "Redis host in the form host:port:db.") - defaultRate = flag.Float64("default-rate", 0.5, "Default rate to decay distributions with") - nWorkers = flag.Int("nworkers", 1, "Number of update workers that update the redis DB") - pruneDist = flag.Bool("prune", true, "Whether or not to decay distributional fields out") - expirSigma = flag.Float64("expire-sigma", 2, "Confidence level that a distribution will be empty when set to expire") + VERSION = "0.4.6" + showVersion = flag.Bool("version", false, "print version string") + httpAddress = flag.String("http", ":8080", "HTTP service address (e.g., ':8080')") + redisHost = flag.String("redis-host", "", "Redis host in the form host:port:db.") + defaultRate = flag.Float64("default-rate", 0.5, "Default rate to decay distributions with") + nWorkers = flag.Int("nworkers", 1, "Number of update workers that update the redis DB") + UpdateOutputTime = flag.Int("status-time", 60, "Time in seconds between redis update status output") + pruneDist = flag.Bool("prune", true, "Whether or not to decay distributional fields out") + expirSigma = flag.Float64("expire-sigma", 2, "Confidence level that a distribution will be empty when set to expire") ) var updateChan chan *Distribution diff --git a/goforget/redis_utils.go b/goforget/redis_utils.go index 4ff8083..59dd349 100644 --- a/goforget/redis_utils.go +++ b/goforget/redis_utils.go @@ -67,15 +67,27 @@ func (rs *RedisServer) connectPool(maxIdle int) { func UpdateRedis(readChan chan *Distribution, id int) error { var redisConn redis.Conn + var now int + lastStatusTime := int(time.Now().Unix()) + updateCount := 0 for dist := range readChan { - log.Printf("[%d] Updating distribution: %s", id, dist.Name) - - redisConn = redisServer.GetConnection() - err := UpdateDistribution(redisConn, dist) - if err != nil { - log.Printf("[%d] Failed to update: %s: %v: %s", id, dist.Name, redisConn.Err(), err.Error()) + // Only do a update if we have all the data necissary or we expect + // there to be a decay event + now = int(time.Now().Unix()) + if dist.Full() || float64(now-dist.LastSyncT)*dist.Rate > 0.75 { + redisConn = redisServer.GetConnection() + err := UpdateDistribution(redisConn, dist) + if err != nil { + log.Printf("[%d] Failed to update: %s: %v: %s", id, dist.Name, redisConn.Err(), err.Error()) + } + updateCount += 1 + if now-lastStatusTime > *UpdateOutputTime { + rate := float64(updateCount) / float64(now-lastStatusTime) + log.Printf("[%d] Performing redis updates at %e updates/second", id, rate) + lastStatusTime = now + updateCount = 0 + } } - redisConn.Close() } return nil } @@ -146,6 +158,7 @@ func GetField(distribution string, fields ...string) ([]interface{}, error) { for _, field := range fields { rdb.Send("ZSCORE", distribution, field) } + rdb.Send("ZCARD", distribution) rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z")) rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T")) data, err := redis.MultiBulk(rdb.Do("EXEC")) @@ -157,6 +170,7 @@ func GetNMostProbable(distribution string, N int) ([]interface{}, error) { rdb.Send("MULTI") rdb.Send("ZREVRANGEBYSCORE", distribution, "+INF", "-INF", "WITHSCORES", "LIMIT", 0, N) + rdb.Send("ZCARD", distribution) rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_Z")) rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T")) data, err := redis.MultiBulk(rdb.Do("EXEC")) From c649ef367e447ef3b15b42b4f087bd6e46c6edc8 Mon Sep 17 00:00:00 2001 From: Micha Gorelick Date: Tue, 13 Jan 2015 13:47:32 -0500 Subject: [PATCH 2/2] Close redis connections and non-existant NewRedisServer() --- goforget/forget.go | 1 - goforget/redis_utils.go | 8 +++++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/goforget/forget.go b/goforget/forget.go index 2cf52df..d2e5fea 100644 --- a/goforget/forget.go +++ b/goforget/forget.go @@ -233,7 +233,6 @@ func main() { } rand.Seed(time.Now().UnixNano()) - redisServer = NewRedisServer(*redisHost, *nWorkers*2) if *redisUri != "" { // if a redis URI exists was specified, parse it redisServer = NewRedisServerFromUri(*redisUri) diff --git a/goforget/redis_utils.go b/goforget/redis_utils.go index 2a285eb..2399e5f 100644 --- a/goforget/redis_utils.go +++ b/goforget/redis_utils.go @@ -90,7 +90,7 @@ func (rs *RedisServer) Connect(maxIdle int) { log.Fatal("Could not connect to Redis!") } conn.Close() - + log.Println("Connected to redis") } func (rs *RedisServer) connectPool(maxIdle int) { @@ -143,6 +143,7 @@ func UpdateRedis(readChan chan *Distribution, id int) error { lastStatusTime = now updateCount = 0 } + redisConn.Close() } } return nil @@ -209,6 +210,7 @@ func UpdateDistribution(rconn redis.Conn, dist *Distribution) error { func GetField(distribution string, fields ...string) ([]interface{}, error) { rdb := redisServer.GetConnection() + defer rdb.Close() rdb.Send("MULTI") for _, field := range fields { @@ -223,6 +225,7 @@ func GetField(distribution string, fields ...string) ([]interface{}, error) { func GetNMostProbable(distribution string, N int) ([]interface{}, error) { rdb := redisServer.GetConnection() + defer rdb.Close() rdb.Send("MULTI") rdb.Send("ZREVRANGEBYSCORE", distribution, "+INF", "-INF", "WITHSCORES", "LIMIT", 0, N) @@ -235,6 +238,7 @@ func GetNMostProbable(distribution string, N int) ([]interface{}, error) { func IncrField(distribution string, fields []string, N int) error { rdb := redisServer.GetConnection() + defer rdb.Close() rdb.Send("MULTI") for _, field := range fields { @@ -248,6 +252,7 @@ func IncrField(distribution string, fields []string, N int) error { func GetDistribution(distribution string) ([]interface{}, error) { rdb := redisServer.GetConnection() + defer rdb.Close() rdb.Send("MULTI") rdb.Send("GET", fmt.Sprintf("%s.%s", distribution, "_T")) @@ -258,6 +263,7 @@ func GetDistribution(distribution string) ([]interface{}, error) { func DBSize() (int, error) { rdb := redisServer.GetConnection() + defer rdb.Close() data, err := redis.Int(rdb.Do("DBSIZE")) return data, err