From 4b9e4900537b751be2877599719ac6234c63e126 Mon Sep 17 00:00:00 2001 From: Gaurav Lalchandani Date: Fri, 18 Jun 2021 00:13:22 +0530 Subject: [PATCH] Implemented thread pool to handle concurrent connections - Performs worse than with no concurrent threads. --- README.md | 30 +++++++++++++++++++ lilcache/cache.py | 73 ++++++++++++++++++++++++++++------------------- 2 files changed, 73 insertions(+), 30 deletions(-) diff --git a/README.md b/README.md index 94d4e91..d97e17a 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,36 @@ $ python3 tests/benchmarks/performance.py +--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ ``` +### lilcache vs redis + +**Note: lilcache was running with a connection pool.** +A high number of threads performs worse than no threads. +It seems the Python GIL is biting us here. + +``` +$ python3 tests/benchmarks/performance.py ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| Cache type | Concurrency | Number of operations (get/set/delete) | Mean response time (ms) | Lowest time (ms) | Highest time (ms) | ++==============+===============+=========================================+===========================+====================+=====================+ +| lilcache | 1 | 1000 | 0.0102515 | 0.00786781 | 0.223398 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| redis | 1 | 1000 | 0.0333166 | 0.0283718 | 1.07622 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| lilcache | 5 | 1000 | 0.0693438 | 0.0123978 | 0.691175 | 
++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| redis | 5 | 1000 | 0.0532017 | 0.0312328 | 1.34492 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| lilcache | 10 | 1000 | 0.233874 | 0.0140667 | 2.32434 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| redis | 10 | 1000 | 0.0799031 | 0.0324249 | 2.46119 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| lilcache | 20 | 1000 | 0.578124 | 0.0114441 | 8.94666 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +| redis | 20 | 1000 | 0.100958 | 0.0321865 | 4.00949 | ++--------------+---------------+-----------------------------------------+---------------------------+--------------------+---------------------+ +``` + + ## Timeline - [ ] Write unittests diff --git a/lilcache/cache.py b/lilcache/cache.py index f9ce905..d4c096e 100644 --- a/lilcache/cache.py +++ b/lilcache/cache.py @@ -3,6 +3,7 @@ """ import os +import threading import socket import pickle @@ -17,34 +18,11 @@ NONE_VALUE = pickle.dumps(None) RESPONSE_OK = b"OK" RESPONSE_ERROR = b"ERR" +cache = dict() -def _handler(conn, cache_path, snapshot, expires): - pass - - -def is_last(packet): - return packet.endswith(END_LIMIT) - - -def manager(sock_path, cache_path, snapshot, expires, poolsize): - """ - The manager thread. - - poolsize is used to manage the persistent client connections to the - manager. If there are more than desired connections, then we kill - the most infrequently used connections. - - TODO: Later think about snapshot and expires! 
- """ - cache = dict() - - mgr = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - mgr.bind(sock_path) - mgr.listen() - +def handle_connection(conn, cache_path, snapshot, expires): while True: - conn, addr = mgr.accept() command = conn.recv(BUFFER_SIZE) packet = command while not is_last(command): @@ -62,7 +40,7 @@ def manager(sock_path, cache_path, snapshot, expires, poolsize): cache[key] = value except Exception as e: payload = create_payload(RESPONSE_ERROR, BUFFER_SIZE, - pickle.dumps(e)) + pickle.dumps(e)) else: payload = create_payload(RESPONSE_OK, BUFFER_SIZE) elif args[0] == b'POP': @@ -76,12 +54,47 @@ def manager(sock_path, cache_path, snapshot, expires, poolsize): conn.send(p) +def is_last(packet): + return packet.endswith(END_LIMIT) + + + +def manager(sock_path, cache_path, snapshot, expires, poolsize): + """ + The manager thread. + + poolsize is used to manage the persistent client connections to the + manager. If there are more than desired connections, then we kill + the most infrequently used connections. + + TODO: Later think about snapshot and expires! 
+ """ + mgr = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + mgr.bind(sock_path) + mgr.listen() + + connection_pool = [] + + while True: + conn, addr = mgr.accept() + if len(connection_pool) <= poolsize: + # Handle it in a thread + th = threading.Thread(target=handle_connection, + args=(conn, cache_path, snapshot, expires)) + th.setDaemon(True) + th.start() + connection_pool.append(th) + continue + else: + raise NotImplemented("poolsize should be equal or more than 'expected' " + "concurrency level") + def init( cache_name=None, root=None, snapshot=None, expires=None, - poolsize=10 + poolsize=20 ): if not cache_name: cache_name = generate_cache_name() @@ -97,13 +110,10 @@ def init( if os.path.exists(sock_path): return - import threading mgr = threading.Thread(target=manager, args=(sock_path, cache_path, snapshot, expires, poolsize)) mgr.setDaemon(True) mgr.start() - # manager(sock_path, cache_path, snapshot, expires, poolsize) - return def establish_connection(): @@ -112,10 +122,13 @@ def establish_connection(): We should try to reuse connections (from the connection pool) """ + if "connection" in STATE: + return STATE["connection"] sock_path = STATE["sock_path"] if os.path.exists(sock_path): conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) conn.connect(sock_path) + STATE["connection"] = conn return conn