From d598c682e27ab0d3a160dba1c2f69fd62cc31f14 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 4 May 2022 18:38:51 +0300 Subject: [PATCH 01/16] slightly refactor --- python/example_sender.py | 17 ++++++++------- python/gs_netstream/__init__.py | 4 ++++ python/gs_netstream/common.py | 37 ++++++++++++++++----------------- python/gs_netstream/sender.py | 26 +++++++++++------------ python/gs_netstream/varint.py | 20 ++++++++++-------- 5 files changed, 55 insertions(+), 49 deletions(-) diff --git a/python/example_sender.py b/python/example_sender.py index e3d261f..c98140f 100644 --- a/python/example_sender.py +++ b/python/example_sender.py @@ -1,10 +1,10 @@ import logging -from gs_netstream.sender import NetStreamProxyGraph, NetStreamSender +from gs_netstream import sender as sdr logging.basicConfig(level=logging.DEBUG) -sender = NetStreamSender(2012) -proxy = NetStreamProxyGraph(sender) +sender = sdr.NetStreamSender(2012) +proxy = sdr.NetStreamProxyGraph(sender) style = "node{fill-mode:plain;fill-color:gray;size:1px;}" proxy.add_attribute("stylesheet", style) @@ -12,8 +12,9 @@ proxy.add_attribute("ui.antialias", True) proxy.add_attribute("layout.stabilization-limit", 0) -for i in range(0,500): - proxy.add_node(str(i)) - if i > 0: - proxy.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) - proxy.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) +if __name__ == '__main__': + for i in range(0,500): + proxy.add_node(str(i)) + if i > 0: + proxy.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) + proxy.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) diff --git a/python/gs_netstream/__init__.py b/python/gs_netstream/__init__.py index e69de29..57596be 100644 --- a/python/gs_netstream/__init__.py +++ b/python/gs_netstream/__init__.py @@ -0,0 +1,4 @@ +from . import common +from .constants import * +from . import sender +from . import varint diff --git a/python/gs_netstream/common.py b/python/gs_netstream/common.py index 5beaa1f..6ec1482 100644 --- a/python/gs_netstream/common.py +++ b/python/gs_netstream/common.py @@ -7,52 +7,51 @@ Copyright (c) 2011 University of Luxembourg. All rights reserved. """ -import sys -import os -class AttributeSink(object): +class AttributeSink: def graph_attribute_added(self, source_id, time_id, attribute, value): - raise NotImplementedError + raise NotImplemented def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): - raise NotImplementedError + raise NotImplemented def graph_attribute_removed(self, source_id, time_id, attribute): - raise NotImplementedError + raise NotImplemented def node_attribute_added(self, source_id, time_id, node_id, attribute, value): - raise NotImplementedError + raise NotImplemented def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): - raise NotImplementedError + raise NotImplemented def node_attribute_removed(self, source_id, time_id, node_id, attribute): - raise NotImplementedError + raise NotImplemented def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): - raise NotImplementedError + raise NotImplemented def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): - raise NotImplementedError + raise NotImplemented def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): - raise NotImplementedError + raise NotImplemented -class ElementSink(object): + +class ElementSink: def node_added(self, source_id, time_id, node_id): - raise NotImplementedError + raise NotImplemented def node_removed(self, source_id, time_id, node_id): - raise NotImplementedError + raise NotImplemented def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): - raise NotImplementedError + raise NotImplemented def edge_removed(self, source_id, time_id, edge_id): - raise NotImplementedError + raise NotImplemented def step_begun(self, source_id, time_id, timestamp): - raise NotImplementedError + raise NotImplemented def graph_cleared(self, source_id, time_id): - raise NotImplementedError + raise NotImplemented diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index 656c821..221e9ea 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -14,14 +14,16 @@ import socket import struct -import varint +from . import varint +from .constants import * import logging from random import random -from common import AttributeSink, ElementSink -from constants import * +from .common import AttributeSink, ElementSink -class DefaultNetStreamTransport(object): + +class DefaultNetStreamTransport: """Default transport class using TCP/IP networking.""" + def __init__(self, host, port): """Initialize using host and port.""" self.host = host @@ -52,6 +54,7 @@ def close(self): self.socket = None logging.info("disconnected from remote server") + class NetStreamSender(AttributeSink, ElementSink): """One client must send to only one identified stream (streamID, host, port)""" @@ -88,7 +91,7 @@ def send(self, event): packet.extend(self.stream_buff) packet.extend(event) buff = bytearray() - buff.extend(struct.pack("!i", len(packet))) # fixed 4-bytes size! + buff.extend(struct.pack("!i", len(packet))) # fixed 4-bytes size! buff.extend(packet) self.transport.send(buff) @@ -110,15 +113,11 @@ def get_type(self, value): if is_array: return TYPE_INT_ARRAY return TYPE_INT - elif isinstance(value, long): - if is_array: - return TYPE_LONG_ARRAY - return TYPE_LONG elif isinstance(value, float): if is_array: return TYPE_DOUBLE_ARRAY return TYPE_DOUBLE - elif isinstance(value, str) or isinstance(value, unicode): + elif isinstance(value, str): return TYPE_STRING elif isinstance(value, dict): raise NotImplementedError("dicts are not supported") @@ -179,11 +178,11 @@ def encode_int_array(self, value): def encode_long(self, value): """Encode a long type.""" - return self.encode_int(value) # same as int for now + return self.encode_int(value) # same as int for now def encode_long_array(self, value): """Encode an array of long values.""" - return self.encode_int_array(value) # same as int_array for now + return self.encode_int_array(value) # same as int_array for now def encode_double(self, value): """Encode a double type.""" @@ -509,7 +508,8 @@ def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): "attribute": attribute }) -class NetStreamProxyGraph(): + +class NetStreamProxyGraph: """ This is a utility class that handles 'source id' and 'time id' synchronization tokens. It proposes utile classes that allow to directly send events through the network pipe. diff --git a/python/gs_netstream/varint.py b/python/gs_netstream/varint.py index 7d3b03e..a16c400 100644 --- a/python/gs_netstream/varint.py +++ b/python/gs_netstream/varint.py @@ -2,33 +2,35 @@ """Implementation for encoding unsigned varints.""" + def encoding_size(value): """Computes the encoding size of a value.""" - if value < (1L << 7): + if value < (1 << 7): return 1 - if value < (1L << 14): + if value < (1 << 14): return 2 - if value < (1L << 21): + if value < (1 << 21): return 3 - if value < (1L << 28): + if value < (1 << 28): return 4 - if value < (1L << 35): + if value < (1 << 35): return 5 - if value < (1L << 42): + if value < (1 << 42): return 6 - if value < (1L << 49): + if value < (1 << 49): return 7 - if value < (1L << 56): + if value < (1 << 56): return 8 return 9 + def encode_unsigned(value): """Encodes a Python integer into its varint representation.""" if not isinstance(value, int) or value < 0: raise TypeError("value argument is not an integer or is negative") size = encoding_size(value) buff = bytearray(size) - for i in xrange(size): + for i in range(size): head = 128 if i == size - 1: head = 0 From 58b6b2cecaf962684d05e868f4dace0403c0d9b8 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 09:05:48 +0300 Subject: [PATCH 02/16] convert sinks into abstractclasses --- python/gs_netstream/common.py | 50 +++++++++++++++++++++++------------ 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/python/gs_netstream/common.py b/python/gs_netstream/common.py index 6ec1482..2bb8e9c 100644 --- a/python/gs_netstream/common.py +++ b/python/gs_netstream/common.py @@ -6,52 +6,68 @@ Created by Yoann Pigné on 2011-08-21. Copyright (c) 2011 University of Luxembourg. All rights reserved. """ +from abc import ABC, abstractmethod -class AttributeSink: +class AttributeSink(ABC): + @abstractmethod def graph_attribute_added(self, source_id, time_id, attribute, value): - raise NotImplemented + pass + @abstractmethod def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): - raise NotImplemented + pass + @abstractmethod def graph_attribute_removed(self, source_id, time_id, attribute): - raise NotImplemented + pass + @abstractmethod def node_attribute_added(self, source_id, time_id, node_id, attribute, value): - raise NotImplemented + pass + @abstractmethod def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): - raise NotImplemented + pass + @abstractmethod def node_attribute_removed(self, source_id, time_id, node_id, attribute): - raise NotImplemented + pass + @abstractmethod def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): - raise NotImplemented + pass + @abstractmethod def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): - raise NotImplemented + pass + @abstractmethod def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): - raise NotImplemented + pass -class ElementSink: +class ElementSink(ABC): + @abstractmethod def node_added(self, source_id, time_id, node_id): - raise NotImplemented + pass + @abstractmethod def node_removed(self, source_id, time_id, node_id): - raise NotImplemented + pass + @abstractmethod def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): - raise NotImplemented + pass + @abstractmethod def edge_removed(self, source_id, time_id, edge_id): - raise NotImplemented + pass + @abstractmethod def step_begun(self, source_id, time_id, timestamp): - raise NotImplemented + pass + @abstractmethod def graph_cleared(self, source_id, time_id): - raise NotImplemented + pass From 6ba31f9b2e9875fa3c157aabb05fe9d9cdaa35ab Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 09:10:20 +0300 Subject: [PATCH 03/16] unnecessary imports in init --- python/gs_netstream/__init__.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/gs_netstream/__init__.py b/python/gs_netstream/__init__.py index 57596be..e69de29 100644 --- a/python/gs_netstream/__init__.py +++ b/python/gs_netstream/__init__.py @@ -1,4 +0,0 @@ -from . import common -from .constants import * -from . import sender -from . import varint From 94b51e2f187e039c463448d3df9e1a75d7708c03 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 10:17:57 +0300 Subject: [PATCH 04/16] refactor getting encoding size --- python/gs_netstream/constants.py | 7 +++++++ python/gs_netstream/varint.py | 24 +++++------------------- 2 files changed, 12 insertions(+), 19 deletions(-) diff --git a/python/gs_netstream/constants.py b/python/gs_netstream/constants.py index 1fc5489..4912414 100644 --- a/python/gs_netstream/constants.py +++ b/python/gs_netstream/constants.py @@ -203,3 +203,10 @@ themselves. The elements themselves have to give their type. """ TYPE_ARRAY = 0x60 + + +######################### +# Utils constants +######################### +import numpy as np +ENCODING_SIZES = np.array([1 << 7, 1 << 14, 1 << 21, 1 << 28, 1 << 35, 1 << 42, 1 << 49, 1 << 56]) diff --git a/python/gs_netstream/varint.py b/python/gs_netstream/varint.py index a16c400..1ab88ce 100644 --- a/python/gs_netstream/varint.py +++ b/python/gs_netstream/varint.py @@ -1,27 +1,13 @@ #!/usr/bin/env python - +import numpy as np +from .constants import ENCODING_SIZES """Implementation for encoding unsigned varints.""" -def encoding_size(value): +def encoding_size(value: int): """Computes the encoding size of a value.""" - if value < (1 << 7): - return 1 - if value < (1 << 14): - return 2 - if value < (1 << 21): - return 3 - if value < (1 << 28): - return 4 - if value < (1 << 35): - return 5 - if value < (1 << 42): - return 6 - if value < (1 << 49): - return 7 - if value < (1 << 56): - return 8 - return 9 + dist = (ENCODING_SIZES - value) <= 0 + return 9 if not np.all(dist) else np.argmin(dist) + 1 def encode_unsigned(value): From 0b207613a6120d7f0609fdd06cd9b4e3a3ad9919 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 11:11:25 +0300 Subject: [PATCH 05/16] refactor sender encoders --- python/gs_netstream/sender_utils.py | 107 ++++++++++++++++++++++++++++ python/gs_netstream/varint.py | 24 ------- 2 files changed, 107 insertions(+), 24 deletions(-) create mode 100644 python/gs_netstream/sender_utils.py delete mode 100644 python/gs_netstream/varint.py diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py new file mode 100644 index 0000000..8c25cd6 --- /dev/null +++ b/python/gs_netstream/sender_utils.py @@ -0,0 +1,107 @@ +"""Sender encoding utils""" +import struct +from itertools import chain +from typing import List, Callable + +from .constants import * +import numpy as np + + +def encoding_size(value: int): + """Computes the encoding size of a value.""" + dist = (ENCODING_SIZES - value) <= 0 + return 9 if not np.all(dist) else np.argmin(dist) + 1 + + +def encode_unsigned(value): + """Encodes a Python integer into its varint representation.""" + assert isinstance(value, int) and value >= 0, f"Value argument is not an integer or is negative = {value}" + size = encoding_size(value) + buff = bytearray(size) + for i in range(size): + head = 128 + if i == size - 1: + head = 0 + buff[i] = (((value >> (7 * i)) & 127) ^ head) & 255 + return buff + + +TYPES_CONVERTER = { + 'bool': (TYPE_BOOLEAN, TYPE_BOOLEAN_ARRAY), + 'int': (TYPE_INT, TYPE_INT_ARRAY), + 'float': (TYPE_DOUBLE, TYPE_DOUBLE_ARRAY), + 'str': (TYPE_STRING, TYPE_STRING) +} + + +def get_type(value): + """Get the data type for a given value.""" + is_array = isinstance(value, list) + value_type_str = type(value[0] if is_array else value).__name__ + netstream_type = value_type_str.get(value_type_str, None) + if netstream_type is None: + raise NotImplementedError("dicts are not supported") + + type_pos = int(is_array) + return netstream_type[type_pos] + + +def encode_array(values_array: List, single_value_type, encoding_method: Callable) -> bytearray: + assert isinstance(values_array, list) and all([isinstance(v, single_value_type) for v in values_array]), \ + f"Values_array should be an array with values of type {single_value_type}, but values array = {values_array}" + return bytearray(chain(encode_unsigned(len(values_array)), *[encoding_method(elem) for elem in values_array])) + + +def encode_boolean(value): + """Encode a boolean type.""" + return bytearray([value & 1]) + + +def encode_boolean_array(value: List[bool]): + """Encode an array of boolean values.""" + return encode_array(value, bool, encode_boolean) + + +def encode_int_array(value): + """Encode an array of integer values.""" + return encode_array(value, int, encode_unsigned) + + +def encode_double(value): + """Encode a double type.""" + return bytearray(struct.pack("!d", value)) + + +def encode_double_array(value): + """Encode an array of double values.""" + return encode_array(value, float, encode_double) + + +def encode_string(string): + """Encode a string type.""" + data = bytearray(string, "UTF-8") + return bytearray(chain(encode_unsigned(len(data)), data)) + + +def encode_byte(value): + """Encode a byte type.""" + return bytearray([value]) + + +TYPE_TO_ENCODER = { + TYPE_BOOLEAN: encode_boolean, + TYPE_BOOLEAN_ARRAY: encode_boolean_array, + TYPE_INT: encode_unsigned, + TYPE_INT_ARRAY: encode_int_array, + TYPE_LONG: encode_unsigned, + TYPE_LONG_ARRAY: encode_int_array, + TYPE_DOUBLE: encode_double, + TYPE_DOUBLE_ARRAY: encode_double_array, + TYPE_STRING: encode_string +} + + +def encode_value(value, dtype): + """Encode a value according to a given data type.""" + encoder = TYPE_TO_ENCODER[dtype] + return encoder(value) if encoder is not None else None diff --git a/python/gs_netstream/varint.py b/python/gs_netstream/varint.py deleted file mode 100644 index 1ab88ce..0000000 --- a/python/gs_netstream/varint.py +++ /dev/null @@ -1,24 +0,0 @@ -#!/usr/bin/env python -import numpy as np -from .constants import ENCODING_SIZES -"""Implementation for encoding unsigned varints.""" - - -def encoding_size(value: int): - """Computes the encoding size of a value.""" - dist = (ENCODING_SIZES - value) <= 0 - return 9 if not np.all(dist) else np.argmin(dist) + 1 - - -def encode_unsigned(value): - """Encodes a Python integer into its varint representation.""" - if not isinstance(value, int) or value < 0: - raise TypeError("value argument is not an integer or is negative") - size = encoding_size(value) - buff = bytearray(size) - for i in range(size): - head = 128 - if i == size - 1: - head = 0 - buff[i] = (((value >> (7 * i)) & 127) ^ head) & 255 - return buff From 12a6c5e35f7397697363314fa75ed0042c2533ab Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 11:18:02 +0300 Subject: [PATCH 06/16] add typings --- python/gs_netstream/sender_utils.py | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py index 8c25cd6..3e45c35 100644 --- a/python/gs_netstream/sender_utils.py +++ b/python/gs_netstream/sender_utils.py @@ -1,19 +1,19 @@ """Sender encoding utils""" import struct from itertools import chain -from typing import List, Callable +from typing import List, Callable, Any, Optional from .constants import * import numpy as np -def encoding_size(value: int): +def encoding_size(value: int) -> int: """Computes the encoding size of a value.""" dist = (ENCODING_SIZES - value) <= 0 return 9 if not np.all(dist) else np.argmin(dist) + 1 -def encode_unsigned(value): +def encode_unsigned(value: int) -> bytearray: """Encodes a Python integer into its varint representation.""" assert isinstance(value, int) and value >= 0, f"Value argument is not an integer or is negative = {value}" size = encoding_size(value) @@ -34,7 +34,7 @@ def encode_unsigned(value): } -def get_type(value): +def get_type(value: Any) -> int: """Get the data type for a given value.""" is_array = isinstance(value, list) value_type_str = type(value[0] if is_array else value).__name__ @@ -52,38 +52,38 @@ def encode_array(values_array: List, single_value_type, encoding_method: Callabl return bytearray(chain(encode_unsigned(len(values_array)), *[encoding_method(elem) for elem in values_array])) -def encode_boolean(value): +def encode_boolean(value: bool) -> bytearray: """Encode a boolean type.""" return bytearray([value & 1]) -def encode_boolean_array(value: List[bool]): +def encode_boolean_array(value: List[bool]) -> bytearray: """Encode an array of boolean values.""" return encode_array(value, bool, encode_boolean) -def encode_int_array(value): +def encode_int_array(value: List[int]) -> bytearray: """Encode an array of integer values.""" return encode_array(value, int, encode_unsigned) -def encode_double(value): +def encode_double(value: int) -> bytearray: """Encode a double type.""" return bytearray(struct.pack("!d", value)) -def encode_double_array(value): +def encode_double_array(value: List[int]) -> bytearray: """Encode an array of double values.""" return encode_array(value, float, encode_double) -def encode_string(string): +def encode_string(string: str) -> bytearray: """Encode a string type.""" data = bytearray(string, "UTF-8") return bytearray(chain(encode_unsigned(len(data)), data)) -def encode_byte(value): +def encode_byte(value) -> bytearray: """Encode a byte type.""" return bytearray([value]) @@ -93,15 +93,13 @@ def encode_byte(value): TYPE_BOOLEAN_ARRAY: encode_boolean_array, TYPE_INT: encode_unsigned, TYPE_INT_ARRAY: encode_int_array, - TYPE_LONG: encode_unsigned, - TYPE_LONG_ARRAY: encode_int_array, TYPE_DOUBLE: encode_double, TYPE_DOUBLE_ARRAY: encode_double_array, TYPE_STRING: encode_string } -def encode_value(value, dtype): +def encode_value(value: Any, dtype: int) -> Optional[bytearray]: """Encode a value according to a given data type.""" encoder = TYPE_TO_ENCODER[dtype] return encoder(value) if encoder is not None else None From 92f1d6505d7f55a637126a35e5c26ea3e9433734 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 12:19:46 +0300 Subject: [PATCH 07/16] refactor sender --- python/gs_netstream/sender.py | 346 +++++++--------------------- python/gs_netstream/sender_utils.py | 5 + 2 files changed, 82 insertions(+), 269 deletions(-) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index 221e9ea..d1ff014 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -14,17 +14,19 @@ import socket import struct -from . import varint +from typing import List, Any + from .constants import * import logging from random import random from .common import AttributeSink, ElementSink +from .sender_utils import get_msg, get_type class DefaultNetStreamTransport: """Default transport class using TCP/IP networking.""" - def __init__(self, host, port): + def __init__(self, host, port: int): """Initialize using host and port.""" self.host = host self.port = port @@ -32,10 +34,10 @@ def __init__(self, host, port): def connect(self): """Connect to remote server if necessary.""" - if not self.socket: + if self.socket is None: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.socket.connect((self.host, self.port)) - logging.info("connected to remote server") + logging.info(f"Connected to remote server - host = {self.host}, port = {self.port}") def send(self, data): """Send data to remote server.""" @@ -100,132 +102,22 @@ def close(self): if self.transport: self.transport.close() - def get_type(self, value): - """Get the data type for a given value.""" - is_array = isinstance(value, list) - if is_array: - value = value[0] - if isinstance(value, bool): - if is_array: - return TYPE_BOOLEAN_ARRAY - return TYPE_BOOLEAN - elif isinstance(value, int): - if is_array: - return TYPE_INT_ARRAY - return TYPE_INT - elif isinstance(value, float): - if is_array: - return TYPE_DOUBLE_ARRAY - return TYPE_DOUBLE - elif isinstance(value, str): - return TYPE_STRING - elif isinstance(value, dict): - raise NotImplementedError("dicts are not supported") - - def encode_value(self, value, dtype): - """Encode a value according to a given data type.""" - if dtype is TYPE_BOOLEAN: - return self.encode_boolean(value) - elif dtype is TYPE_BOOLEAN_ARRAY: - return self.encode_boolean_array(value) - elif dtype is TYPE_INT: - return self.encode_int(value) - elif dtype is TYPE_INT_ARRAY: - return self.encode_int_array(value) - elif dtype is TYPE_LONG: - return self.encode_long(value) - elif dtype is TYPE_LONG_ARRAY: - return self.encode_long_array(value) - elif dtype is TYPE_DOUBLE: - return self.encode_double(value) - elif dtype is TYPE_DOUBLE_ARRAY: - return self.encode_double_array(value) - elif dtype is TYPE_STRING: - return self.encode_string(value) - return None - - def encode_boolean(self, value): - """Encode a boolean type.""" - return bytearray([value & 1]) - - def encode_boolean_array(self, value): - """Encode an array of boolean values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, bool): - raise TypeError("array element is not a boolean") - buff.extend(self.encode_boolean(elem)) - return buff - - def encode_int(self, value): - """Encode an integer type.""" - return varint.encode_unsigned(value) - - def encode_int_array(self, value): - """Encode an array of integer values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, int): - raise TypeError("array element is not an integer") - buff.extend(self.encode_int(elem)) - return buff - - def encode_long(self, value): - """Encode a long type.""" - return self.encode_int(value) # same as int for now - - def encode_long_array(self, value): - """Encode an array of long values.""" - return self.encode_int_array(value) # same as int_array for now - - def encode_double(self, value): - """Encode a double type.""" - return bytearray(struct.pack("!d", value)) - - def encode_double_array(self, value): - """Encode an array of double values.""" - if not isinstance(value, list): - raise TypeError("value is not an array") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(value))) - for elem in value: - if not isinstance(elem, float): - raise TypeError("array element is not a float/double") - buff.extend(self.encode_double(elem)) - return buff - - def encode_string(self, string): - """Encode a string type.""" - data = bytearray(string, "UTF-8") - buff = bytearray() - buff.extend(varint.encode_unsigned(len(data))) - buff.extend(data) - return buff - - def encode_byte(self, value): - """Encode a byte type.""" - return bytearray([value]) # ========================= # = AttributeSink methods = # ========================= - def node_added(self, source_id, time_id, node_id): - """A node was added.""" + def send_msg(self, source_id: str, values: List[Any], value_types: List[int]): if not source_id is self.source_id: self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_NODE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) + buff = get_msg(values, value_types) self.send(buff) + + def node_added(self, source_id, time_id, node_id): + """A node was added.""" + self.send_msg(source_id=source_id, + values=[EVENT_ADD_NODE, source_id, time_id, node_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("node added: %s", { "source_id": source_id, "time_id": time_id, @@ -234,14 +126,9 @@ def node_added(self, source_id, time_id, node_id): def node_removed(self, source_id, time_id, node_id): """A node was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_NODE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_NODE, source_id, time_id, node_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("node removed: %s", { "source_id": source_id, "time_id": time_id, @@ -250,17 +137,10 @@ def node_removed(self, source_id, time_id, node_id): def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): """An edge was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_EDGE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(from_node)) - buff.extend(self.encode_string(to_node)) - buff.extend(self.encode_boolean(directed)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_EDGE, source_id, time_id, edge_id, from_node, to_node, directed], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_STRING, + TYPE_BOOLEAN]) logging.debug("edge added: %s", { "source_id": source_id, "time_id": time_id, @@ -272,14 +152,9 @@ def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): def edge_removed(self, source_id, time_id, edge_id): """An edge was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_EDGE)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_EDGE, source_id, time_id, edge_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("edge removed: %s", { "source_id": source_id, "time_id": time_id, @@ -288,14 +163,9 @@ def edge_removed(self, source_id, time_id, edge_id): def step_begun(self, source_id, time_id, timestamp): """A new step begun.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_STEP)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_double(timestamp)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_STEP, source_id, time_id, timestamp], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_DOUBLE]) logging.debug("step begun: %s", { "source_id": source_id, "time_id": time_id, @@ -304,13 +174,9 @@ def step_begun(self, source_id, time_id, timestamp): def graph_cleared(self, source_id, time_id): """The graph was cleared.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CLEARED)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_CLEARED, source_id, time_id], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT]) logging.debug("graph cleared: %s", { "source_id": source_id, "time_id": time_id @@ -318,17 +184,10 @@ def graph_cleared(self, source_id, time_id): def graph_attribute_added(self, source_id, time_id, attribute, value): """A graph attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_GRAPH_ATTR, source_id, time_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("graph attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -338,20 +197,14 @@ def graph_attribute_added(self, source_id, time_id, attribute, value): def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): """A graph attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_GRAPH_ATTR, source_id, time_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_BYTE, old_value_dtype, + TYPE_BYTE, new_value_dtype]) logging.debug("graph attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -362,14 +215,9 @@ def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_ def graph_attribute_removed(self, source_id, time_id, attribute): """A graph attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_GRAPH_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_GRAPH_ATTR, source_id, time_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING]) logging.debug("graph attribute removed: %s", { "source_id": source_id, "time_id": time_id, @@ -378,18 +226,10 @@ def graph_attribute_removed(self, source_id, time_id, attribute): def node_attribute_added(self, source_id, time_id, node_id, attribute, value): """A node attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_NODE_ATTR, source_id, time_id, node_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("node attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -400,21 +240,14 @@ def node_attribute_added(self, source_id, time_id, node_id, attribute, value): def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): """A node attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_NODE_ATTR, source_id, time_id, node_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, + old_value_dtype, TYPE_BYTE, new_value_dtype]) logging.debug("node attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -426,15 +259,9 @@ def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_val def node_attribute_removed(self, source_id, time_id, node_id, attribute): """A node attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_NODE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(node_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_NODE_ATTR, source_id, time_id, node_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING]) logging.debug("node attribute removed: %s", { "source_id": source_id, "time_id": time_id, @@ -444,18 +271,10 @@ def node_attribute_removed(self, source_id, time_id, node_id, attribute): def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): """An edge attribute was added.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_ADD_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(value, dtype)) - self.send(buff) + dtype = get_type(value) + self.send_msg(source_id=source_id, + values=[EVENT_ADD_EDGE_ATTR, source_id, time_id, edge_id, attribute, dtype, value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, dtype]) logging.debug("edge attribute added: %s", { "source_id": source_id, "time_id": time_id, @@ -466,21 +285,15 @@ def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): """An edge attribute was changed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_CHG_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - dtype = self.get_type(old_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(old_value, dtype)) - dtype = self.get_type(new_value) - buff.extend(self.encode_byte(dtype)) - buff.extend(self.encode_value(new_value, dtype)) - self.send(buff) + old_value_dtype = get_type(old_value) + new_value_dtype = get_type(new_value) + + self.send_msg(source_id=source_id, + values=[EVENT_CHG_EDGE_ATTR, source_id, time_id, edge_id, attribute, old_value_dtype, old_value, + new_value_dtype, new_value], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING, TYPE_BYTE, + old_value_dtype, TYPE_BYTE, new_value_dtype]) + logging.debug("edge attribute changed: %s", { "source_id": source_id, "time_id": time_id, @@ -492,15 +305,10 @@ def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_val def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): """An edge attribute was removed.""" - if not source_id is self.source_id: - self.set_source_id(source_id) - buff = bytearray() - buff.extend(self.encode_byte(EVENT_DEL_EDGE_ATTR)) - buff.extend(self.encode_string(source_id)) - buff.extend(self.encode_long(time_id)) - buff.extend(self.encode_string(edge_id)) - buff.extend(self.encode_string(attribute)) - self.send(buff) + self.send_msg(source_id=source_id, + values=[EVENT_DEL_EDGE_ATTR, source_id, time_id, edge_id, attribute], + value_types=[TYPE_BYTE, TYPE_STRING, TYPE_INT, TYPE_STRING, TYPE_STRING]) + logging.debug("edge attribute removed: %s", { "source_id": source_id, "time_id": time_id, diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py index 3e45c35..1846316 100644 --- a/python/gs_netstream/sender_utils.py +++ b/python/gs_netstream/sender_utils.py @@ -89,6 +89,7 @@ def encode_byte(value) -> bytearray: TYPE_TO_ENCODER = { + TYPE_BYTE: encode_byte, TYPE_BOOLEAN: encode_boolean, TYPE_BOOLEAN_ARRAY: encode_boolean_array, TYPE_INT: encode_unsigned, @@ -103,3 +104,7 @@ def encode_value(value: Any, dtype: int) -> Optional[bytearray]: """Encode a value according to a given data type.""" encoder = TYPE_TO_ENCODER[dtype] return encoder(value) if encoder is not None else None + + +def get_msg(values: List[Any], types: List[int]) -> bytearray: + return bytearray(chain(*[encode_value(value, value_type) for value, value_type in zip(values, types)])) From 0bee9f9da00666594506adf5b4189d9364266556 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 12:27:58 +0300 Subject: [PATCH 08/16] fix function usage --- python/gs_netstream/sender.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index d1ff014..6cfc7c4 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -20,7 +20,7 @@ import logging from random import random from .common import AttributeSink, ElementSink -from .sender_utils import get_msg, get_type +from .sender_utils import get_msg, get_type, encode_value class DefaultNetStreamTransport: @@ -76,12 +76,12 @@ def __init__(self, port, host="localhost", stream="default"): def set_stream(self, stream): """Set and cache a stream ID.""" self.stream = stream - self.stream_buff = self.encode_string(stream) + self.stream_buff = encode_value(stream, TYPE_STRING) def set_source_id(self, source_id): """Set and cache a source ID.""" self.source_id = source_id - self.source_id_buff = self.encode_string(source_id) + self.source_id_buff = encode_value(source_id, TYPE_STRING) def connect(self): """Connect to the underlying transport.""" From eaf602a6fc1e8ac75c498f007a2982afb174259d Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:04:46 +0300 Subject: [PATCH 09/16] add typings --- python/gs_netstream/sender.py | 67 ++++++++++++++++++----------------- 1 file changed, 35 insertions(+), 32 deletions(-) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index 6cfc7c4..4e40f3d 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -78,7 +78,7 @@ def set_stream(self, stream): self.stream = stream self.stream_buff = encode_value(stream, TYPE_STRING) - def set_source_id(self, source_id): + def set_source_id(self, source_id: str): """Set and cache a source ID.""" self.source_id = source_id self.source_id_buff = encode_value(source_id, TYPE_STRING) @@ -113,7 +113,7 @@ def send_msg(self, source_id: str, values: List[Any], value_types: List[int]): buff = get_msg(values, value_types) self.send(buff) - def node_added(self, source_id, time_id, node_id): + def node_added(self, source_id: str, time_id: int, node_id: str): """A node was added.""" self.send_msg(source_id=source_id, values=[EVENT_ADD_NODE, source_id, time_id, node_id], @@ -124,7 +124,7 @@ def node_added(self, source_id, time_id, node_id): "node_id": node_id }) - def node_removed(self, source_id, time_id, node_id): + def node_removed(self, source_id: str, time_id: int, node_id: str): """A node was removed.""" self.send_msg(source_id=source_id, values=[EVENT_DEL_NODE, source_id, time_id, node_id], @@ -135,7 +135,7 @@ def node_removed(self, source_id, time_id, node_id): "node_id": node_id }) - def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): + def edge_added(self, source_id: str, time_id: int, edge_id: str, from_node: str, to_node: str, directed: bool): """An edge was added.""" self.send_msg(source_id=source_id, values=[EVENT_ADD_EDGE, source_id, time_id, edge_id, from_node, to_node, directed], @@ -150,7 +150,7 @@ def edge_added(self, source_id, time_id, edge_id, from_node, to_node, directed): "directed": directed }) - def edge_removed(self, source_id, time_id, edge_id): + def edge_removed(self, source_id: str, time_id: int, edge_id: str): """An edge was removed.""" self.send_msg(source_id=source_id, values=[EVENT_DEL_EDGE, source_id, time_id, edge_id], @@ -161,7 +161,7 @@ def edge_removed(self, source_id, time_id, edge_id): "node_id": edge_id }) - def step_begun(self, source_id, time_id, timestamp): + def step_begun(self, source_id: str, time_id: int, timestamp: int): """A new step begun.""" self.send_msg(source_id=source_id, values=[EVENT_STEP, source_id, time_id, timestamp], @@ -172,7 +172,7 @@ def step_begun(self, source_id, time_id, timestamp): "timestamp": timestamp }) - def graph_cleared(self, source_id, time_id): + def graph_cleared(self, source_id: str, time_id: int): """The graph was cleared.""" self.send_msg(source_id=source_id, values=[EVENT_CLEARED, source_id, time_id], @@ -182,7 +182,7 @@ def graph_cleared(self, source_id, time_id): "time_id": time_id }) - def graph_attribute_added(self, source_id, time_id, attribute, value): + def graph_attribute_added(self, source_id: str, time_id: int, attribute: str, value): """A graph attribute was added.""" dtype = get_type(value) self.send_msg(source_id=source_id, @@ -195,7 +195,7 @@ def graph_attribute_added(self, source_id, time_id, attribute, value): "value": value }) - def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_value): + def graph_attribute_changed(self, source_id: str, time_id: int, attribute: str, old_value, new_value): """A graph attribute was changed.""" old_value_dtype = get_type(old_value) new_value_dtype = get_type(new_value) @@ -213,7 +213,7 @@ def graph_attribute_changed(self, source_id, time_id, attribute, old_value, new_ "new_value": new_value }) - def graph_attribute_removed(self, source_id, time_id, attribute): + def graph_attribute_removed(self, source_id: str, time_id: int, attribute: str): """A graph attribute was removed.""" self.send_msg(source_id=source_id, values=[EVENT_DEL_GRAPH_ATTR, source_id, time_id, attribute], @@ -224,7 +224,7 @@ def graph_attribute_removed(self, source_id, time_id, attribute): "attribute": attribute }) - def node_attribute_added(self, source_id, time_id, node_id, attribute, value): + def node_attribute_added(self, source_id: str, time_id: int, node_id: str, attribute: str, value): """A node attribute was added.""" dtype = get_type(value) self.send_msg(source_id=source_id, @@ -238,7 +238,7 @@ def node_attribute_added(self, source_id, time_id, node_id, attribute, value): "value": value }) - def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_value, new_value): + def node_attribute_changed(self, source_id: str, time_id: int, node_id: str, attribute: str, old_value, new_value): """A node attribute was changed.""" old_value_dtype = get_type(old_value) new_value_dtype = get_type(new_value) @@ -257,7 +257,7 @@ def node_attribute_changed(self, source_id, time_id, node_id, attribute, old_val "new_value": new_value }) - def node_attribute_removed(self, source_id, time_id, node_id, attribute): + def node_attribute_removed(self, source_id: str, time_id: int, node_id: str, attribute: str): """A node attribute was removed.""" self.send_msg(source_id=source_id, values=[EVENT_DEL_NODE_ATTR, source_id, time_id, node_id, attribute], @@ -269,7 +269,7 @@ def node_attribute_removed(self, source_id, time_id, node_id, attribute): "attribute": attribute }) - def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): + def edge_attribute_added(self, source_id: str, time_id: int, edge_id: str, attribute: str, value): """An edge attribute was added.""" dtype = get_type(value) self.send_msg(source_id=source_id, @@ -283,7 +283,7 @@ def edge_attribute_added(self, source_id, time_id, edge_id, attribute, value): "value": value }) - def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_value, new_value): + def edge_attribute_changed(self, source_id: str, time_id: int, edge_id: str, attribute: str, old_value, new_value): """An edge attribute was changed.""" old_value_dtype = get_type(old_value) new_value_dtype = get_type(new_value) @@ -303,7 +303,7 @@ def edge_attribute_changed(self, source_id, time_id, edge_id, attribute, old_val "new_value": new_value }) - def edge_attribute_removed(self, source_id, time_id, edge_id, attribute): + def edge_attribute_removed(self, source_id: str, time_id: int, edge_id: str, attribute: str): """An edge attribute was removed.""" self.send_msg(source_id=source_id, values=[EVENT_DEL_EDGE_ATTR, source_id, time_id, edge_id, attribute], @@ -329,67 +329,70 @@ def __init__(self, sender, source_id=None): self.source_id = source_id if source_id else "nss%d" % (1000 * random()) self.time_id = 0 - def add_node(self, node): - """Add a node to the graph.""" - self.sender.node_added(self.source_id, self.time_id, node) + def run_sender_method(self, sender_method, *args, **kwargs): + sender_method(self.source_id, self.time_id, *args, **kwargs) self.time_id += 1 - def remove_node(self, node): + def add_node(self, node: str): + """Add a node to the graph.""" + self.run_sender_method(self.sender.node_added, node) + + def remove_node(self, node: str): """Remove a node from the graph.""" self.sender.node_removed(self.source_id, self.time_id, node) self.time_id += 1 - def add_edge(self, edge, from_node, to_node, directed=False): + def add_edge(self, edge: str, from_node: str, to_node: str, directed: bool = False): """Add an edge to the graph.""" self.sender.edge_added(self.source_id, self.time_id, edge, from_node, to_node, directed) self.time_id += 1 - def remove_edge(self, edge): + def remove_edge(self, edge: str): """Remove an edge from the graph.""" self.sender.edge_removed(self.source_id, self.time_id, edge) self.time_id += 1 - def add_attribute(self, attribute, value): + def add_attribute(self, attribute: str, value): """Add an attribute to the graph.""" self.sender.graph_attribute_added(self.source_id, self.time_id, attribute, value) self.time_id += 1 - def remove_attribute(self, attribute): + def remove_attribute(self, attribute: str): """Remove an attribute from the graph.""" self.sender.graph_attribute_removed(self.source_id, self.time_id, attribute) self.time_id += 1 - def change_attribute(self, attribute, old_value, new_value): + def change_attribute(self, attribute: str, old_value, new_value): """Change an attribute of the graph.""" self.sender.graph_attribute_changed(self.source_id, self.time_id, attribute, old_value, new_value) self.time_id += 1 - def add_node_attribute(self, node, attribute, value): + def add_node_attribute(self, node: str, attribute: str, value): """Add an attribute to a node.""" self.sender.node_attribute_added(self.source_id, self.time_id, node, attribute, value) self.time_id += 1 - def remove_node_attibute(self, node, attribute): + def remove_node_attibute(self, node: str, attribute: str): """Remove an attribute from a node.""" self.sender.node_attribute_removed(self.source_id, self.time_id, node, attribute) self.time_id += 1 - def change_node_attribute(self, node, attribute, old_value, new_value): + def change_node_attribute(self, node: str, attribute: str, old_value, new_value): """Change an attribute of a node.""" self.sender.node_attribute_changed(self.source_id, self.time_id, node, attribute, old_value, new_value) self.time_id += 1 - def add_edge_attribute(self, edge, attribute, value): + def add_edge_attribute(self, edge: str, attribute: str, value): """Add an attribute to an edge.""" self.sender.edge_attribute_added(self.source_id, self.time_id, edge, attribute, value) self.time_id += 1 - def remove_edge_attribute(self, edge, attribute): + def remove_edge_attribute(self, edge: str, attribute: str): """Remove an attribute from an edge.""" self.sender.edge_attribute_removed(self.source_id, self.time_id, edge, attribute) self.time_id += 1 - def change_edge_attribute(self, edge, attribute, old_value, new_value): + def change_edge_attribute(self, edge: str, attribute: str, old_value, new_value): """Change an attribute of an edge.""" self.sender.edge_attribute_changed(self.source_id, self.time_id, edge, attribute, old_value, new_value) self.time_id += 1 @@ -399,7 +402,7 @@ def clear_graph(self): self.sender.graph_cleared(self.source_id, self.time_id) self.time_id += 1 - def step_begins(self, time): + def step_begins(self, time: int): """Begin a step.""" self.sender.step_begun(self.source_id, self.time_id, time) self.time_id += 1 From 92be28867d476378a16cfe42d2fde11ef6a9a6a9 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:11:04 +0300 Subject: [PATCH 10/16] refactor netstream proxy graph --- python/gs_netstream/sender.py | 48 ++++++++++++++--------------------- 1 file changed, 19 insertions(+), 29 deletions(-) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index 4e40f3d..e78bfa3 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -324,7 +324,11 @@ class NetStreamProxyGraph: """ def __init__(self, sender, source_id=None): - """Constructor can be with one NetStreamSender object and a source id OR with with 4 args: Source ID, Stream ID, Host, and port number""" + """Constructor can be with one NetStreamSender object and a source id OR with with 4 args. + + Notes: + 4 args: Source ID, Stream ID, Host, and port number + """ self.sender = sender self.source_id = source_id if source_id else "nss%d" % (1000 * random()) self.time_id = 0 @@ -339,70 +343,56 @@ def add_node(self, node: str): def remove_node(self, node: str): """Remove a node from the graph.""" - self.sender.node_removed(self.source_id, self.time_id, node) - self.time_id += 1 + self.run_sender_method(self.sender.node_removed, node) def add_edge(self, edge: str, from_node: str, to_node: str, directed: bool = False): """Add an edge to the graph.""" - self.sender.edge_added(self.source_id, self.time_id, edge, from_node, to_node, directed) - self.time_id += 1 + self.run_sender_method(self.sender.edge_added, edge, from_node, to_node, directed) def remove_edge(self, edge: str): """Remove an edge from the graph.""" - self.sender.edge_removed(self.source_id, self.time_id, edge) - self.time_id += 1 + self.run_sender_method(self.sender.edge_removed, edge) def add_attribute(self, attribute: str, value): """Add an attribute to the graph.""" - self.sender.graph_attribute_added(self.source_id, self.time_id, attribute, value) - self.time_id += 1 + self.run_sender_method(self.sender.graph_attribute_added, attribute, value) def remove_attribute(self, attribute: str): """Remove an attribute from the graph.""" - self.sender.graph_attribute_removed(self.source_id, self.time_id, attribute) - self.time_id += 1 + self.run_sender_method(self.sender.graph_attribute_removed, attribute) def change_attribute(self, attribute: str, old_value, new_value): """Change an attribute of the graph.""" - self.sender.graph_attribute_changed(self.source_id, self.time_id, attribute, old_value, new_value) - self.time_id += 1 + self.run_sender_method(self.sender.graph_attribute_changed, attribute, old_value, new_value) def add_node_attribute(self, node: str, attribute: str, value): """Add an attribute to a node.""" - self.sender.node_attribute_added(self.source_id, self.time_id, node, attribute, value) - self.time_id += 1 + self.run_sender_method(self.sender.node_attribute_added, node, attribute, value) def remove_node_attibute(self, node: str, attribute: str): """Remove an attribute from a node.""" - self.sender.node_attribute_removed(self.source_id, self.time_id, node, attribute) - self.time_id += 1 + self.run_sender_method(self.sender.node_attribute_removed, node, attribute) def change_node_attribute(self, node: str, attribute: str, old_value, new_value): """Change an attribute of a node.""" - self.sender.node_attribute_changed(self.source_id, self.time_id, node, attribute, old_value, new_value) - self.time_id += 1 + self.run_sender_method(self.sender.node_attribute_changed, node, attribute, old_value, new_value) def add_edge_attribute(self, edge: str, attribute: str, value): """Add an attribute to an edge.""" - self.sender.edge_attribute_added(self.source_id, self.time_id, edge, attribute, value) - self.time_id += 1 + self.run_sender_method(self.sender.edge_attribute_added, edge, attribute, value) def remove_edge_attribute(self, edge: str, attribute: str): """Remove an attribute from an edge.""" - self.sender.edge_attribute_removed(self.source_id, self.time_id, edge, attribute) - self.time_id += 1 + self.run_sender_method(self.sender.edge_attribute_removed, edge, attribute) def change_edge_attribute(self, edge: str, attribute: str, old_value, new_value): """Change an attribute of an edge.""" - self.sender.edge_attribute_changed(self.source_id, self.time_id, edge, attribute, old_value, new_value) - self.time_id += 1 + self.run_sender_method(self.sender.edge_attribute_changed, edge, attribute, old_value, new_value) def clear_graph(self): """Clear the graph.""" - self.sender.graph_cleared(self.source_id, self.time_id) - self.time_id += 1 + self.run_sender_method(self.sender.graph_cleared) def step_begins(self, time: int): """Begin a step.""" - self.sender.step_begun(self.source_id, self.time_id, time) - self.time_id += 1 + self.run_sender_method(self.sender.step_begun, time) From c0cc3e54560a1cd35a204dd6bc59d1f03a1090d7 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:16:55 +0300 Subject: [PATCH 11/16] move proxy graph implementation into a separate file --- python/gs_netstream/graph_sender.py | 82 +++++++++++++++++++++++++ python/gs_netstream/sender.py | 95 +++-------------------------- 2 files changed, 90 insertions(+), 87 deletions(-) create mode 100644 python/gs_netstream/graph_sender.py diff --git a/python/gs_netstream/graph_sender.py b/python/gs_netstream/graph_sender.py new file mode 100644 index 0000000..1462859 --- /dev/null +++ b/python/gs_netstream/graph_sender.py @@ -0,0 +1,82 @@ +"""Proxy Netstream graph class""" + + +class NetStreamProxyGraph: + """ + This is a utility class that handles 'source id' and 'time id' synchronization tokens. + It proposes utile classes that allow to directly send events through the network pipe. + """ + + def __init__(self, sender, source_id=None): + """Constructor can be with one NetStreamSender object and a source id OR with with 4 args. + + Notes: + 4 args: Source ID, Stream ID, Host, and port number + """ + self.sender = sender + self.source_id = source_id if source_id else "nss%d" % (1000 * random()) + self.time_id = 0 + + def run_sender_method(self, sender_method, *args, **kwargs): + sender_method(self.source_id, self.time_id, *args, **kwargs) + self.time_id += 1 + + def add_node(self, node: str): + """Add a node to the graph.""" + self.run_sender_method(self.sender.node_added, node) + + def remove_node(self, node: str): + """Remove a node from the graph.""" + self.run_sender_method(self.sender.node_removed, node) + + def add_edge(self, edge: str, from_node: str, to_node: str, directed: bool = False): + """Add an edge to the graph.""" + self.run_sender_method(self.sender.edge_added, edge, from_node, to_node, directed) + + def remove_edge(self, edge: str): + """Remove an edge from the graph.""" + self.run_sender_method(self.sender.edge_removed, edge) + + def add_attribute(self, attribute: str, value): + """Add an attribute to the graph.""" + self.run_sender_method(self.sender.graph_attribute_added, attribute, value) + + def remove_attribute(self, attribute: str): + """Remove an attribute from the graph.""" + self.run_sender_method(self.sender.graph_attribute_removed, attribute) + + def change_attribute(self, attribute: str, old_value, new_value): + """Change an attribute of the graph.""" + self.run_sender_method(self.sender.graph_attribute_changed, attribute, old_value, new_value) + + def add_node_attribute(self, node: str, attribute: str, value): + """Add an attribute to a node.""" + self.run_sender_method(self.sender.node_attribute_added, node, attribute, value) + + def remove_node_attibute(self, node: str, attribute: str): + """Remove an attribute from a node.""" + self.run_sender_method(self.sender.node_attribute_removed, node, attribute) + + def change_node_attribute(self, node: str, attribute: str, old_value, new_value): + """Change an attribute of a node.""" + self.run_sender_method(self.sender.node_attribute_changed, node, attribute, old_value, new_value) + + def add_edge_attribute(self, edge: str, attribute: str, value): + """Add an attribute to an edge.""" + self.run_sender_method(self.sender.edge_attribute_added, edge, attribute, value) + + def remove_edge_attribute(self, edge: str, attribute: str): + """Remove an attribute from an edge.""" + self.run_sender_method(self.sender.edge_attribute_removed, edge, attribute) + + def change_edge_attribute(self, edge: str, attribute: str, old_value, new_value): + """Change an attribute of an edge.""" + self.run_sender_method(self.sender.edge_attribute_changed, edge, attribute, old_value, new_value) + + def clear_graph(self): + """Clear the graph.""" + self.run_sender_method(self.sender.graph_cleared) + + def step_begins(self, time: int): + """Begin a step.""" + self.run_sender_method(self.sender.step_begun, time) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index e78bfa3..ed1d806 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -18,7 +18,6 @@ from .constants import * import logging -from random import random from .common import AttributeSink, ElementSink from .sender_utils import get_msg, get_type, encode_value @@ -102,17 +101,16 @@ def close(self): if self.transport: self.transport.close() - - # ========================= - # = AttributeSink methods = - # ========================= - def send_msg(self, source_id: str, values: List[Any], value_types: List[int]): if not source_id is self.source_id: self.set_source_id(source_id) buff = get_msg(values, value_types) self.send(buff) + ########################### + # ElementSink methods + ########################### + def node_added(self, source_id: str, time_id: int, node_id: str): """A node was added.""" self.send_msg(source_id=source_id, @@ -182,6 +180,10 @@ def graph_cleared(self, source_id: str, time_id: int): "time_id": time_id }) + ########################### + # AttributeSink methods + ########################### + def graph_attribute_added(self, source_id: str, time_id: int, attribute: str, value): """A graph attribute was added.""" dtype = get_type(value) @@ -315,84 +317,3 @@ def edge_attribute_removed(self, source_id: str, time_id: int, edge_id: str, att "edge_id": edge_id, "attribute": attribute }) - - -class NetStreamProxyGraph: - """ - This is a utility class that handles 'source id' and 'time id' synchronization tokens. - It proposes utile classes that allow to directly send events through the network pipe. - """ - - def __init__(self, sender, source_id=None): - """Constructor can be with one NetStreamSender object and a source id OR with with 4 args. - - Notes: - 4 args: Source ID, Stream ID, Host, and port number - """ - self.sender = sender - self.source_id = source_id if source_id else "nss%d" % (1000 * random()) - self.time_id = 0 - - def run_sender_method(self, sender_method, *args, **kwargs): - sender_method(self.source_id, self.time_id, *args, **kwargs) - self.time_id += 1 - - def add_node(self, node: str): - """Add a node to the graph.""" - self.run_sender_method(self.sender.node_added, node) - - def remove_node(self, node: str): - """Remove a node from the graph.""" - self.run_sender_method(self.sender.node_removed, node) - - def add_edge(self, edge: str, from_node: str, to_node: str, directed: bool = False): - """Add an edge to the graph.""" - self.run_sender_method(self.sender.edge_added, edge, from_node, to_node, directed) - - def remove_edge(self, edge: str): - """Remove an edge from the graph.""" - self.run_sender_method(self.sender.edge_removed, edge) - - def add_attribute(self, attribute: str, value): - """Add an attribute to the graph.""" - self.run_sender_method(self.sender.graph_attribute_added, attribute, value) - - def remove_attribute(self, attribute: str): - """Remove an attribute from the graph.""" - self.run_sender_method(self.sender.graph_attribute_removed, attribute) - - def change_attribute(self, attribute: str, old_value, new_value): - """Change an attribute of the graph.""" - self.run_sender_method(self.sender.graph_attribute_changed, attribute, old_value, new_value) - - def add_node_attribute(self, node: str, attribute: str, value): - """Add an attribute to a node.""" - self.run_sender_method(self.sender.node_attribute_added, node, attribute, value) - - def remove_node_attibute(self, node: str, attribute: str): - """Remove an attribute from a node.""" - self.run_sender_method(self.sender.node_attribute_removed, node, attribute) - - def change_node_attribute(self, node: str, attribute: str, old_value, new_value): - """Change an attribute of a node.""" - self.run_sender_method(self.sender.node_attribute_changed, node, attribute, old_value, new_value) - - def add_edge_attribute(self, edge: str, attribute: str, value): - """Add an attribute to an edge.""" - self.run_sender_method(self.sender.edge_attribute_added, edge, attribute, value) - - def remove_edge_attribute(self, edge: str, attribute: str): - """Remove an attribute from an edge.""" - self.run_sender_method(self.sender.edge_attribute_removed, edge, attribute) - - def change_edge_attribute(self, edge: str, attribute: str, old_value, new_value): - """Change an attribute of an edge.""" - self.run_sender_method(self.sender.edge_attribute_changed, edge, attribute, old_value, new_value) - - def clear_graph(self): - """Clear the graph.""" - self.run_sender_method(self.sender.graph_cleared) - - def step_begins(self, time: int): - """Begin a step.""" - self.run_sender_method(self.sender.step_begun, time) From 999aa5d6689d9354969cb1b572f97145bba59ce7 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:23:32 +0300 Subject: [PATCH 12/16] slightly refactor NetStreamSender --- python/gs_netstream/sender.py | 23 +++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) diff --git a/python/gs_netstream/sender.py b/python/gs_netstream/sender.py index ed1d806..3332596 100644 --- a/python/gs_netstream/sender.py +++ b/python/gs_netstream/sender.py @@ -14,7 +14,7 @@ import socket import struct -from typing import List, Any +from typing import List, Any, Optional from .constants import * import logging @@ -61,17 +61,20 @@ class NetStreamSender(AttributeSink, ElementSink): def __init__(self, port, host="localhost", stream="default"): """Initialize using port, host (optional) and stream ID (optional).""" - self.host = host - self.port = port - self.stream = None - self.stream_buff = None + self.host: str = host + self.port: int = port + self.stream: Optional[str] = None + self.stream_buff: Optional[bytearray] = None self.set_stream(stream) - self.source_id = None - self.source_id_buff = None + self.source_id: Optional[str] = None + self.source_id_buff: Optional[bytearray] = None self.set_source_id("") - self.transport = None + self.transport: Optional[DefaultNetStreamTransport] = None self.connect() + def __del__(self): + self.close() + def set_stream(self, stream): """Set and cache a stream ID.""" self.stream = stream @@ -98,11 +101,11 @@ def send(self, event): def close(self): """Close the underlying transport.""" - if self.transport: + if self.transport is not None: self.transport.close() def send_msg(self, source_id: str, values: List[Any], value_types: List[int]): - if not source_id is self.source_id: + if source_id != self.source_id: self.set_source_id(source_id) buff = get_msg(values, value_types) self.send(buff) From 731c9b0f96063f1347432ff0a18ce2f75a04f481 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:59:13 +0300 Subject: [PATCH 13/16] fix bugs --- python/gs_netstream/__init__.py | 1 + python/gs_netstream/graph_sender.py | 8 ++++++-- python/gs_netstream/sender_utils.py | 2 +- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/python/gs_netstream/__init__.py b/python/gs_netstream/__init__.py index e69de29..7acce47 100644 --- a/python/gs_netstream/__init__.py +++ b/python/gs_netstream/__init__.py @@ -0,0 +1 @@ +from .graph_sender import NetStreamProxyGraph diff --git a/python/gs_netstream/graph_sender.py b/python/gs_netstream/graph_sender.py index 1462859..593eeb9 100644 --- a/python/gs_netstream/graph_sender.py +++ b/python/gs_netstream/graph_sender.py @@ -1,4 +1,8 @@ """Proxy Netstream graph class""" +from random import random +from typing import Optional + +from gs_netstream.sender import NetStreamSender class NetStreamProxyGraph: @@ -7,13 +11,13 @@ class NetStreamProxyGraph: It proposes utile classes that allow to directly send events through the network pipe. """ - def __init__(self, sender, source_id=None): + def __init__(self, sender: Optional[NetStreamSender] = None, source_id: Optional[str] = None, port: int = 8008): """Constructor can be with one NetStreamSender object and a source id OR with with 4 args. Notes: 4 args: Source ID, Stream ID, Host, and port number """ - self.sender = sender + self.sender = sender if sender is not None else NetStreamSender(port) self.source_id = source_id if source_id else "nss%d" % (1000 * random()) self.time_id = 0 diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py index 1846316..09dde9f 100644 --- a/python/gs_netstream/sender_utils.py +++ b/python/gs_netstream/sender_utils.py @@ -38,7 +38,7 @@ def get_type(value: Any) -> int: """Get the data type for a given value.""" is_array = isinstance(value, list) value_type_str = type(value[0] if is_array else value).__name__ - netstream_type = value_type_str.get(value_type_str, None) + netstream_type = TYPES_CONVERTER.get(value_type_str, None) if netstream_type is None: raise NotImplementedError("dicts are not supported") From f243c4cad336da17168e3f14c90b320bd7995b3a Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 13:59:27 +0300 Subject: [PATCH 14/16] refactor example, add one more --- python/example_sender.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/python/example_sender.py b/python/example_sender.py index c98140f..6a19568 100644 --- a/python/example_sender.py +++ b/python/example_sender.py @@ -1,20 +1,35 @@ import logging -from gs_netstream import sender as sdr +from time import sleep +from gs_netstream import NetStreamProxyGraph +from random import randint logging.basicConfig(level=logging.DEBUG) -sender = sdr.NetStreamSender(2012) -proxy = sdr.NetStreamProxyGraph(sender) -style = "node{fill-mode:plain;fill-color:gray;size:1px;}" -proxy.add_attribute("stylesheet", style) +def ex1(graph: NetStreamProxyGraph) -> None: + style = "node{fill-mode:plain;fill-color:gray;size:1px;}" + graph.add_attribute("stylesheet", style) -proxy.add_attribute("ui.antialias", True) -proxy.add_attribute("layout.stabilization-limit", 0) + graph.add_attribute("ui.antialias", True) + graph.add_attribute("layout.stabilization-limit", 0) -if __name__ == '__main__': - for i in range(0,500): - proxy.add_node(str(i)) + for i in range(500): + sleep(0.2) + graph.add_node(str(i)) if i > 0: - proxy.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) - proxy.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) + graph.add_edge(str(i) + "_" + str(i-1), str(i), str(i-1), False) + graph.add_edge(str(i) + "__" + str(i/2), str(i), str(i/2), False) + + +def ex2(graph: NetStreamProxyGraph) -> None: + graph.add_node("0") + for i in range(1, 200): + sleep(0.2) + graph.add_node(str(i)) + i2 = str(randint(0, i-1)) + graph.add_edge(f"{str(i)}_{i2}", str(i), i2) + + +if __name__ == '__main__': + graph = NetStreamProxyGraph(port=8008) + ex2(graph) From 2f894416f5a4f045627af3327bc7336d6b5f2e45 Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 14:12:30 +0300 Subject: [PATCH 15/16] Add some small readme --- python/Readme.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) create mode 100644 python/Readme.md diff --git a/python/Readme.md b/python/Readme.md new file mode 100644 index 0000000..f6e44da --- /dev/null +++ b/python/Readme.md @@ -0,0 +1,12 @@ +# Python example of the NetStreamGraph usage + +For running NetStreamGraph on python you need to do the following: +1. Run a server receiver for graph visualization. + + [Java server example](https://github.com/max-kalganov/graph_stream_server) +2. Use NetStreamProxyGraph to fill a graph. + + class import - `from gs_netstream import NetStreamProxyGraph` + See NetStreamProxyGraph implementation to look up graph methods. + +Run `example_sender.py` for an experiment. \ No newline at end of file From 11c75ecb44d3f9cc0bba309a56cb404e0b35632c Mon Sep 17 00:00:00 2001 From: max Date: Wed, 18 May 2022 14:16:46 +0300 Subject: [PATCH 16/16] place constant near to it's util --- python/gs_netstream/constants.py | 7 ------- python/gs_netstream/sender_utils.py | 5 ++++- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/python/gs_netstream/constants.py b/python/gs_netstream/constants.py index 4912414..1fc5489 100644 --- a/python/gs_netstream/constants.py +++ b/python/gs_netstream/constants.py @@ -203,10 +203,3 @@ themselves. The elements themselves have to give their type. """ TYPE_ARRAY = 0x60 - - -######################### -# Utils constants -######################### -import numpy as np -ENCODING_SIZES = np.array([1 << 7, 1 << 14, 1 << 21, 1 << 28, 1 << 35, 1 << 42, 1 << 49, 1 << 56]) diff --git a/python/gs_netstream/sender_utils.py b/python/gs_netstream/sender_utils.py index 09dde9f..cf270ec 100644 --- a/python/gs_netstream/sender_utils.py +++ b/python/gs_netstream/sender_utils.py @@ -7,6 +7,9 @@ import numpy as np +ENCODING_SIZES = np.array([1 << 7, 1 << 14, 1 << 21, 1 << 28, 1 << 35, 1 << 42, 1 << 49, 1 << 56]) + + def encoding_size(value: int) -> int: """Computes the encoding size of a value.""" dist = (ENCODING_SIZES - value) <= 0 @@ -102,7 +105,7 @@ def encode_byte(value) -> bytearray: def encode_value(value: Any, dtype: int) -> Optional[bytearray]: """Encode a value according to a given data type.""" - encoder = TYPE_TO_ENCODER[dtype] + encoder = TYPE_TO_ENCODER.get(dtype, None) return encoder(value) if encoder is not None else None