Skip to content

Commit 7a62cd3

Browse files
isapego and ivandasch authored
GG-32996 [IGNITE-14472] Multiple performance improvements (#38)
(cherry picked from commit e48f4be) Co-authored-by: Ivan Dashchinskiy <ivandasch@gmail.com>
1 parent 0728459 commit 7a62cd3

File tree

9 files changed

+159
-115
lines changed

9 files changed

+159
-115
lines changed

pygridgain/binary.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,7 @@ def write_footer(obj, stream, header, header_class, schema_items, offsets, initi
201201
stream.write(schema)
202202

203203
if save_to_buf:
204-
obj._buffer = bytes(stream.mem_view(initial_pos, stream.tell() - initial_pos))
204+
obj._buffer = stream.slice(initial_pos, stream.tell() - initial_pos)
205205
obj._hashcode = header.hash_code
206206

207207
def _setattr(self, attr_name: str, attr_value: Any):

pygridgain/connection/aio_connection.py

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ async def _connect_version(self) -> Union[dict, OrderedDict]:
149149

150150
with AioBinaryStream(self.client) as stream:
151151
await hs_request.from_python_async(stream)
152-
await self._send(stream.getbuffer(), reconnect=False)
152+
await self._send(stream.getvalue(), reconnect=False)
153153

154154
with AioBinaryStream(self.client, await self._recv(reconnect=False)) as stream:
155155
hs_response = await HandshakeResponse.parse_async(stream, self.protocol_context)
@@ -176,7 +176,7 @@ async def _reconnect(self):
176176
except connection_errors:
177177
pass
178178

179-
async def request(self, data: Union[bytes, bytearray, memoryview]) -> bytearray:
179+
async def request(self, data: Union[bytes, bytearray]) -> bytearray:
180180
"""
181181
Perform request.
182182
@@ -186,7 +186,7 @@ async def request(self, data: Union[bytes, bytearray, memoryview]) -> bytearray:
186186
await self._send(data)
187187
return await self._recv()
188188

189-
async def _send(self, data: Union[bytes, bytearray, memoryview], reconnect=True):
189+
async def _send(self, data: Union[bytes, bytearray], reconnect=True):
190190
if self.closed:
191191
raise SocketError('Attempt to use closed connection.')
192192

@@ -203,21 +203,43 @@ async def _recv(self, reconnect=True) -> bytearray:
203203
if self.closed:
204204
raise SocketError('Attempt to use closed connection.')
205205

206-
with BytesIO() as stream:
206+
data = bytearray(1024)
207+
buffer = memoryview(data)
208+
bytes_total_received, bytes_to_receive = 0, 0
209+
while True:
207210
try:
208-
buf = await self._reader.readexactly(4)
209-
response_len = int.from_bytes(buf, PROTOCOL_BYTE_ORDER)
211+
chunk = await self._reader.read(len(buffer))
212+
bytes_received = len(chunk)
213+
if bytes_received == 0:
214+
raise SocketError('Connection broken.')
210215

211-
stream.write(buf)
212-
213-
stream.write(await self._reader.readexactly(response_len))
216+
buffer[0:bytes_received] = chunk
217+
bytes_total_received += bytes_received
214218
except connection_errors:
215219
self.failed = True
216220
if reconnect:
217221
await self._reconnect()
218222
raise
219223

220-
return bytearray(stream.getbuffer())
224+
if bytes_total_received < 4:
225+
continue
226+
elif bytes_to_receive == 0:
227+
response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
228+
bytes_to_receive = response_len
229+
230+
if response_len + 4 > len(data):
231+
buffer.release()
232+
data.extend(bytearray(response_len + 4 - len(data)))
233+
buffer = memoryview(data)[bytes_total_received:]
234+
continue
235+
236+
if bytes_total_received >= bytes_to_receive:
237+
buffer.release()
238+
break
239+
240+
buffer = buffer[bytes_received:]
241+
242+
return data
221243

222244
async def close(self):
223245
async with self._mux:

pygridgain/connection/connection.py

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def _connect_version(self) -> Union[dict, OrderedDict]:
218218

219219
with BinaryStream(self.client) as stream:
220220
hs_request.from_python(stream)
221-
self.send(stream.getbuffer(), reconnect=False)
221+
self.send(stream.getvalue(), reconnect=False)
222222

223223
with BinaryStream(self.client, self.recv(reconnect=False)) as stream:
224224
hs_response = HandshakeResponse.parse(stream, self.protocol_context)
@@ -241,7 +241,7 @@ def reconnect(self):
241241
except connection_errors:
242242
pass
243243

244-
def request(self, data: Union[bytes, bytearray, memoryview], flags=None) -> bytearray:
244+
def request(self, data: Union[bytes, bytearray], flags=None) -> bytearray:
245245
"""
246246
Perform request.
247247
@@ -251,7 +251,7 @@ def request(self, data: Union[bytes, bytearray, memoryview], flags=None) -> byte
251251
self.send(data, flags=flags)
252252
return self.recv()
253253

254-
def send(self, data: Union[bytes, bytearray, memoryview], flags=None, reconnect=True):
254+
def send(self, data: Union[bytes, bytearray], flags=None, reconnect=True):
255255
"""
256256
Send data down the socket.
257257
@@ -281,35 +281,46 @@ def recv(self, flags=None, reconnect=True) -> bytearray:
281281
:param flags: (optional) OS-specific flags.
282282
:param reconnect: (optional) reconnect on failure, default True.
283283
"""
284-
def _recv(buffer, num_bytes):
285-
bytes_to_receive = num_bytes
286-
while bytes_to_receive > 0:
287-
try:
288-
bytes_rcvd = self._socket.recv_into(buffer, bytes_to_receive, **kwargs)
289-
if bytes_rcvd == 0:
290-
raise SocketError('Connection broken.')
291-
except connection_errors:
292-
self.failed = True
293-
if reconnect:
294-
self.reconnect()
295-
raise
296-
297-
buffer = buffer[bytes_rcvd:]
298-
bytes_to_receive -= bytes_rcvd
299-
300284
if self.closed:
301285
raise SocketError('Attempt to use closed connection.')
302286

303287
kwargs = {}
304288
if flags is not None:
305289
kwargs['flags'] = flags
306290

307-
data = bytearray(4)
308-
_recv(memoryview(data), 4)
309-
response_len = int.from_bytes(data, PROTOCOL_BYTE_ORDER)
291+
data = bytearray(1024)
292+
buffer = memoryview(data)
293+
bytes_total_received, bytes_to_receive = 0, 0
294+
while True:
295+
try:
296+
bytes_received = self._socket.recv_into(buffer, len(buffer), **kwargs)
297+
if bytes_received == 0:
298+
raise SocketError('Connection broken.')
299+
bytes_total_received += bytes_received
300+
except connection_errors:
301+
self.failed = True
302+
if reconnect:
303+
self.reconnect()
304+
raise
305+
306+
if bytes_total_received < 4:
307+
continue
308+
elif bytes_to_receive == 0:
309+
response_len = int.from_bytes(data[0:4], PROTOCOL_BYTE_ORDER)
310+
bytes_to_receive = response_len
311+
312+
if response_len + 4 > len(data):
313+
buffer.release()
314+
data.extend(bytearray(response_len + 4 - len(data)))
315+
buffer = memoryview(data)[bytes_total_received:]
316+
continue
317+
318+
if bytes_total_received >= bytes_to_receive:
319+
buffer.release()
320+
break
321+
322+
buffer = buffer[bytes_received:]
310323

311-
data.extend(bytearray(response_len))
312-
_recv(memoryview(data)[4:], response_len)
313324
return data
314325

315326
def close(self):

pygridgain/datatypes/internal.py

Lines changed: 31 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@
3737
from ..stream import READ_BACKWARD
3838

3939

40-
def tc_map(key: bytes, _memo_map: dict = {}):
40+
_tc_map = {}
41+
42+
43+
def tc_map(key: bytes):
4144
"""
4245
Returns a default parser/generator class for the given type code.
4346
@@ -50,7 +53,8 @@ def tc_map(key: bytes, _memo_map: dict = {}):
5053
of the “type code-type class” mapping,
5154
:return: parser/generator class for the type code.
5255
"""
53-
if not _memo_map:
56+
global _tc_map
57+
if not _tc_map:
5458
from pygridgain.datatypes import (
5559
Null, ByteObject, ShortObject, IntObject, LongObject, FloatObject,
5660
DoubleObject, CharObject, BoolObject, UUIDObject, DateObject,
@@ -65,7 +69,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
6569
MapObject, BinaryObject, WrappedDataObject,
6670
)
6771

68-
_memo_map = {
72+
_tc_map = {
6973
TC_NULL: Null,
7074

7175
TC_BYTE: ByteObject,
@@ -111,7 +115,7 @@ def tc_map(key: bytes, _memo_map: dict = {}):
111115
TC_COMPLEX_OBJECT: BinaryObject,
112116
TC_ARRAY_WRAPPED_OBJECTS: WrappedDataObject,
113117
}
114-
return _memo_map[key]
118+
return _tc_map[key]
115119

116120

117121
class Conditional:
@@ -184,7 +188,7 @@ async def parse_async(self, stream):
184188
def __parse_length(self, stream):
185189
counter_type_len = ctypes.sizeof(self.counter_type)
186190
length = int.from_bytes(
187-
stream.mem_view(offset=counter_type_len),
191+
stream.slice(offset=counter_type_len),
188192
byteorder=PROTOCOL_BYTE_ORDER
189193
)
190194
stream.seek(counter_type_len, SEEK_CUR)
@@ -349,6 +353,9 @@ class AnyDataObject:
349353
"""
350354
_python_map = None
351355
_python_array_map = None
356+
_map_obj_type = None
357+
_collection_obj_type = None
358+
_binary_obj_type = None
352359

353360
@staticmethod
354361
def get_subtype(iterable, allow_none=False):
@@ -392,7 +399,7 @@ async def parse_async(cls, stream):
392399

393400
@classmethod
394401
def __data_class_parse(cls, stream):
395-
type_code = bytes(stream.mem_view(offset=ctypes.sizeof(ctypes.c_byte)))
402+
type_code = stream.slice(offset=ctypes.sizeof(ctypes.c_byte))
396403
try:
397404
return tc_map(type_code)
398405
except KeyError:
@@ -417,15 +424,17 @@ def __data_class_from_ctype(cls, ctype_object):
417424
return tc_map(type_code)
418425

419426
@classmethod
420-
def _init_python_map(cls):
427+
def _init_python_mapping(cls):
421428
"""
422429
Optimizes Python types→GridGain types map creation for speed.
423430
424431
Local imports seem inevitable here.
425432
"""
426433
from pygridgain.datatypes import (
427-
LongObject, DoubleObject, String, BoolObject, Null, UUIDObject,
428-
DateObject, TimeObject, DecimalObject, ByteArrayObject,
434+
LongObject, DoubleObject, String, BoolObject, Null, UUIDObject, DateObject, TimeObject,
435+
DecimalObject, ByteArrayObject, LongArrayObject, DoubleArrayObject, StringArrayObject,
436+
BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject, DecimalArrayObject,
437+
MapObject, CollectionObject, BinaryObject
429438
)
430439

431440
cls._python_map = {
@@ -443,17 +452,6 @@ def _init_python_map(cls):
443452
decimal.Decimal: DecimalObject,
444453
}
445454

446-
@classmethod
447-
def _init_python_array_map(cls):
448-
"""
449-
Optimizes Python types→GridGain array types map creation for speed.
450-
"""
451-
from pygridgain.datatypes import (
452-
LongArrayObject, DoubleArrayObject, StringArrayObject,
453-
BoolArrayObject, UUIDArrayObject, DateArrayObject, TimeArrayObject,
454-
DecimalArrayObject,
455-
)
456-
457455
cls._python_array_map = {
458456
int: LongArrayObject,
459457
float: DoubleArrayObject,
@@ -467,18 +465,20 @@ def _init_python_array_map(cls):
467465
decimal.Decimal: DecimalArrayObject,
468466
}
469467

468+
cls._map_obj_type = MapObject
469+
cls._collection_obj_type = CollectionObject
470+
cls._binary_obj_type = BinaryObject
471+
470472
@classmethod
471473
def map_python_type(cls, value):
472-
from pygridgain.datatypes import (
473-
MapObject, CollectionObject, BinaryObject,
474-
)
475-
476-
if cls._python_map is None:
477-
cls._init_python_map()
478-
if cls._python_array_map is None:
479-
cls._init_python_array_map()
474+
if cls._python_map is None or cls._python_array_map is None:
475+
cls._init_python_mapping()
480476

481477
value_type = type(value)
478+
479+
if value_type in cls._python_map:
480+
return cls._python_map[value_type]
481+
482482
if is_iterable(value) and value_type not in (str, bytearray, bytes):
483483
value_subtype = cls.get_subtype(value)
484484
if value_subtype in cls._python_array_map:
@@ -491,15 +491,15 @@ def map_python_type(cls, value):
491491
isinstance(value[0], int),
492492
isinstance(value[1], dict),
493493
]):
494-
return MapObject
494+
return cls._map_obj_type
495495

496496
if all([
497497
value_subtype is None,
498498
len(value) == 2,
499499
isinstance(value[0], int),
500500
is_iterable(value[1]),
501501
]):
502-
return CollectionObject
502+
return cls._collection_obj_type
503503

504504
# no default for ObjectArrayObject, sorry
505505

@@ -508,10 +508,8 @@ def map_python_type(cls, value):
508508
)
509509

510510
if is_binary(value):
511-
return BinaryObject
511+
return cls._binary_obj_type
512512

513-
if value_type in cls._python_map:
514-
return cls._python_map[value_type]
515513
raise TypeError(
516514
'Type `{}` is invalid.'.format(value_type)
517515
)

pygridgain/datatypes/null_object.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ async def to_python_async(cls, ctypes_object, *args, **kwargs):
140140
def __check_null_input(cls, stream):
141141
type_len = ctypes.sizeof(ctypes.c_byte)
142142

143-
if stream.mem_view(offset=type_len) == TC_NULL:
143+
if stream.slice(offset=type_len) == TC_NULL:
144144
stream.seek(type_len, SEEK_CUR)
145145
return True, Null.build_c_type()
146146

pygridgain/datatypes/standard.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def build_c_type(cls, length: int):
9191
@classmethod
9292
def parse_not_null(cls, stream):
9393
length = int.from_bytes(
94-
stream.mem_view(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
94+
stream.slice(stream.tell() + ctypes.sizeof(ctypes.c_byte), ctypes.sizeof(ctypes.c_int)),
9595
byteorder=PROTOCOL_BYTE_ORDER
9696
)
9797

pygridgain/queries/query.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ def perform(
123123
"""
124124
with BinaryStream(conn.client) as stream:
125125
self.from_python(stream, query_params)
126-
response_data = conn.request(stream.getbuffer())
126+
response_data = conn.request(stream.getvalue())
127127

128128
response_struct = self.response_type(protocol_context=conn.protocol_context,
129129
following=response_config, **kwargs)
@@ -155,7 +155,7 @@ async def perform_async(
155155
"""
156156
with AioBinaryStream(conn.client) as stream:
157157
await self.from_python_async(stream, query_params)
158-
data = await conn.request(stream.getbuffer())
158+
data = await conn.request(stream.getvalue())
159159

160160
response_struct = self.response_type(protocol_context=conn.protocol_context,
161161
following=response_config, **kwargs)

0 commit comments

Comments
 (0)