Skip to content

Commit 0d16430

Browse files
committed
[ECO-5456] feat: add VCDiff support for delta message decoding
- Introduced `VCDiffDecoder` abstract class and `VCDiffPlugin` implementation. - Enhanced delta message processing with proper support for VCDiff decoding. - Updated `Options` to accept a `vcdiff_decoder`. - Handle delta failures with recovery mechanisms (RTL18-RTL20 compliance).
1 parent 5bec2a7 commit 0d16430

File tree

10 files changed

+424
-15
lines changed

10 files changed

+424
-15
lines changed

ably/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,10 @@
55
from ably.types.capability import Capability
66
from ably.types.channelsubscription import PushChannelSubscription
77
from ably.types.device import DeviceDetails
8-
from ably.types.options import Options
8+
from ably.types.options import Options, VCDiffDecoder
99
from ably.util.crypto import CipherParams
1010
from ably.util.exceptions import AblyException, AblyAuthException, IncompatibleClientIdException
11+
from ably.vcdiff_plugin import VCDiffPlugin
1112

1213
import logging
1314

ably/realtime/realtime_channel.py

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
from __future__ import annotations
2+
23
import asyncio
34
import logging
45
from typing import Optional, TYPE_CHECKING, Dict, Any, Union
56
from ably.realtime.connection import ConnectionState
6-
from ably.transport.websockettransport import ProtocolMessageAction
77
from ably.rest.channel import Channel, Channels as RestChannels
8+
from ably.transport.websockettransport import ProtocolMessageAction
89
from ably.types.channelstate import ChannelState, ChannelStateChange
910
from ably.types.flags import Flag, has_flag
1011
from ably.types.message import Message
12+
from ably.types.mixins import DecodingContext
1113
from ably.util.eventemitter import EventEmitter
1214
from ably.util.exceptions import AblyException
1315
from ably.util.helper import Timer, is_callable_or_coroutine
@@ -134,6 +136,10 @@ def __init__(self, realtime: AblyRealtime, name: str, channel_options: Optional[
134136
self.__error_reason: Optional[AblyException] = None
135137
self.__channel_options = channel_options or ChannelOptions()
136138

139+
# Delta-specific fields for RTL19/RTL20 compliance
140+
vcdiff_decoder = self.__realtime.options.vcdiff_decoder if self.__realtime.options.vcdiff_decoder else None
141+
self.__decoding_context = DecodingContext(vcdiff_decoder=vcdiff_decoder)
142+
137143
# Used to listen to state changes internally, if we use the public event emitter interface then internals
138144
# will be disrupted if the user called .off() to remove all listeners
139145
self.__internal_state_emitter = EventEmitter()
@@ -420,8 +426,16 @@ def _on_message(self, proto_msg: dict) -> None:
420426
else:
421427
self._request_state(ChannelState.ATTACHING)
422428
elif action == ProtocolMessageAction.MESSAGE:
423-
messages = Message.from_encoded_array(proto_msg.get('messages'))
424-
self.__channel_serial = channel_serial
429+
messages = []
430+
try:
431+
messages = Message.from_encoded_array(proto_msg.get('messages'), context=self.__decoding_context)
432+
self.__decoding_context.last_message_id = messages[-1].id
433+
self.__channel_serial = channel_serial
434+
except AblyException as e:
435+
if e.code == 40018: # Delta decode failure - start recovery
436+
self._start_decode_failure_recovery(e)
437+
else:
438+
log.error(f"Message processing error {e}. Skip messages {proto_msg.get('messages')}")
425439
for message in messages:
426440
self.__message_emitter._emit(message.name, message)
427441
elif action == ProtocolMessageAction.ERROR:
@@ -553,6 +567,18 @@ def error_reason(self) -> Optional[AblyException]:
553567
"""An AblyException instance describing the last error which occurred on the channel, if any."""
554568
return self.__error_reason
555569

570+
def _start_decode_failure_recovery(self, error: AblyException) -> None:
571+
"""Start RTL18 decode failure recovery procedure"""
572+
573+
# RTL18a: Log error with code 40018
574+
log.error(f'VCDiff decode failure: {error}')
575+
576+
# RTL18b: Message is already discarded by not processing it
577+
578+
# RTL18c: Send ATTACH with previous channel serial and transition to ATTACHING
579+
self._notify_state(ChannelState.ATTACHING, reason=error)
580+
self._check_pending_state()
581+
556582

557583
class Channels(RestChannels):
558584
"""Creates and destroys RealtimeChannel objects.

ably/types/message.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import logging
44

55
from ably.types.typedbuffer import TypedBuffer
6-
from ably.types.mixins import EncodeDataMixin
6+
from ably.types.mixins import EncodeDataMixin, DeltaExtras
77
from ably.util.crypto import CipherData
88
from ably.util.exceptions import AblyException
99

@@ -178,7 +178,7 @@ def as_dict(self, binary=False):
178178
return request_body
179179

180180
@staticmethod
181-
def from_encoded(obj, cipher=None):
181+
def from_encoded(obj, cipher=None, context=None):
182182
id = obj.get('id')
183183
name = obj.get('name')
184184
data = obj.get('data')
@@ -188,7 +188,12 @@ def from_encoded(obj, cipher=None):
188188
encoding = obj.get('encoding', '')
189189
extras = obj.get('extras', None)
190190

191-
decoded_data = Message.decode(data, encoding, cipher)
191+
delta_extra = DeltaExtras(extras)
192+
if delta_extra.from_id and delta_extra.from_id != context.last_message_id:
193+
raise AblyException(f"Delta message decode failure - previous message not available. "
194+
f"Message id = {id}", 400, 40018)
195+
196+
decoded_data = Message.decode(data, encoding, cipher, context)
192197

193198
return Message(
194199
id=id,

ably/types/mixins.py

Lines changed: 62 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,29 @@
33
import logging
44

55
from ably.util.crypto import CipherData
6+
from ably.util.exceptions import AblyException
67

78

89
log = logging.getLogger(__name__)
910

11+
ENC_VCDIFF = "vcdiff"
12+
13+
14+
class DeltaExtras:
15+
def __init__(self, extras):
16+
self.from_id = None
17+
if extras and 'delta' in extras:
18+
delta_info = extras['delta']
19+
if isinstance(delta_info, dict):
20+
self.from_id = delta_info.get('from')
21+
22+
23+
class DecodingContext:
24+
def __init__(self, base_payload=None, last_message_id=None, vcdiff_decoder=None):
25+
self.base_payload = base_payload
26+
self.last_message_id = last_message_id
27+
self.vcdiff_decoder = vcdiff_decoder
28+
1029

1130
class EncodeDataMixin:
1231

@@ -25,10 +44,12 @@ def encoding(self, encoding):
2544
self._encoding_array = encoding.strip('/').split('/')
2645

2746
@staticmethod
28-
def decode(data, encoding='', cipher=None):
47+
def decode(data, encoding='', cipher=None, context=None):
2948
encoding = encoding.strip('/')
3049
encoding_list = encoding.split('/')
3150

51+
last_payload = data
52+
3253
while encoding_list:
3354
encoding = encoding_list.pop()
3455
if not encoding:
@@ -46,10 +67,43 @@ def decode(data, encoding='', cipher=None):
4667
if isinstance(data, list) or isinstance(data, dict):
4768
continue
4869
data = json.loads(data)
49-
elif encoding == 'base64' and isinstance(data, bytes):
50-
data = bytearray(base64.b64decode(data))
5170
elif encoding == 'base64':
52-
data = bytearray(base64.b64decode(data.encode('utf-8')))
71+
data = bytearray(base64.b64decode(data)) if isinstance(data, bytes) \
72+
else bytearray(base64.b64decode(data.encode('utf-8')))
73+
if not encoding_list:
74+
last_payload = data
75+
elif encoding == ENC_VCDIFF:
76+
if not context or not context.vcdiff_decoder:
77+
log.error('Message cannot be decoded as no VCDiff decoder available')
78+
raise AblyException('VCDiff decoder not available', 40019, 40019)
79+
80+
if not context.base_payload:
81+
log.error('VCDiff decoding requires base payload')
82+
raise AblyException('VCDiff decode failure', 40018, 40018)
83+
84+
try:
85+
# Convert base payload to bytes if it's a string
86+
base_data = context.base_payload
87+
if isinstance(base_data, str):
88+
base_data = base_data.encode('utf-8')
89+
else:
90+
base_data = bytes(base_data)
91+
92+
# Convert delta to bytes if needed
93+
delta_data = data
94+
if isinstance(delta_data, (bytes, bytearray)):
95+
delta_data = bytes(delta_data)
96+
else:
97+
delta_data = str(delta_data).encode('utf-8')
98+
99+
# Decode with VCDiff
100+
data = bytearray(context.vcdiff_decoder.decode(delta_data, base_data))
101+
last_payload = data
102+
103+
except Exception as e:
104+
log.error(f'VCDiff decode failed: {e}')
105+
raise AblyException('VCDiff decode failure', 40018, 40018)
106+
53107
elif encoding.startswith('%s+' % CipherData.ENCODING_ID):
54108
if not cipher:
55109
log.error('Message cannot be decrypted as the channel is '
@@ -67,9 +121,11 @@ def decode(data, encoding='', cipher=None):
67121
encoding_list.append(encoding)
68122
break
69123

124+
if context:
125+
context.base_payload = last_payload
70126
encoding = '/'.join(encoding_list)
71127
return {'encoding': encoding, 'data': data}
72128

73129
@classmethod
74-
def from_encoded_array(cls, objs, cipher=None):
75-
return [cls.from_encoded(obj, cipher=cipher) for obj in objs]
130+
def from_encoded_array(cls, objs, cipher=None, context=None):
131+
return [cls.from_encoded(obj, cipher=cipher, context=context) for obj in objs]

ably/types/options.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,28 @@
11
import random
22
import logging
3+
from abc import ABC, abstractmethod
34

45
from ably.transport.defaults import Defaults
56
from ably.types.authoptions import AuthOptions
67

78
log = logging.getLogger(__name__)
89

910

11+
class VCDiffDecoder(ABC):
12+
@abstractmethod
13+
def decode(self, delta: bytes, base: bytes) -> bytes:
14+
pass
15+
16+
1017
class Options(AuthOptions):
1118
def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realtime_host=None, port=0,
1219
tls_port=0, use_binary_protocol=True, queue_messages=False, recover=False, environment=None,
1320
http_open_timeout=None, http_request_timeout=None, realtime_request_timeout=None,
1421
http_max_retry_count=None, http_max_retry_duration=None, fallback_hosts=None,
1522
fallback_retry_timeout=None, disconnected_retry_timeout=None, idempotent_rest_publishing=None,
1623
loop=None, auto_connect=True, suspended_retry_timeout=None, connectivity_check_url=None,
17-
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False, **kwargs):
24+
channel_retry_timeout=Defaults.channel_retry_timeout, add_request_ids=False,
25+
vcdiff_decoder=None, **kwargs):
1826

1927
super().__init__(**kwargs)
2028

@@ -77,6 +85,7 @@ def __init__(self, client_id=None, log_level=0, tls=True, rest_host=None, realti
7785
self.__connectivity_check_url = connectivity_check_url
7886
self.__fallback_realtime_host = None
7987
self.__add_request_ids = add_request_ids
88+
self.__vcdiff_decoder = vcdiff_decoder
8089

8190
self.__rest_hosts = self.__get_rest_hosts()
8291
self.__realtime_hosts = self.__get_realtime_hosts()
@@ -259,6 +268,10 @@ def fallback_realtime_host(self, value):
259268
def add_request_ids(self):
260269
return self.__add_request_ids
261270

271+
@property
272+
def vcdiff_decoder(self):
273+
return self.__vcdiff_decoder
274+
262275
def __get_rest_hosts(self):
263276
"""
264277
Return the list of hosts as they should be tried. First comes the main

ably/vcdiff_plugin.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""
2+
VCDiff Plugin for Ably Python SDK
3+
4+
This module provides a production-ready VCDiff decoder plugin using the vcdiff library.
5+
It implements the VCDiffDecoder interface.
6+
7+
Usage:
8+
from ably import VCDiffPlugin, AblyRealtime
9+
10+
# Create VCDiff plugin
11+
plugin = VCDiffPlugin()
12+
13+
# Create client with plugin
14+
client = AblyRealtime(key="your-key", vcdiff_decoder=plugin)
15+
16+
# Get channel with delta enabled
17+
channel = client.channels.get("test", {"delta": "vcdiff"})
18+
"""
19+
20+
import logging
21+
22+
from ably.types.options import VCDiffDecoder
23+
from ably.util.exceptions import AblyException
24+
25+
log = logging.getLogger(__name__)
26+
27+
28+
class VCDiffPlugin(VCDiffDecoder):
29+
"""
30+
Production VCDiff decoder plugin using Ably's vcdiff library.
31+
32+
Raises:
33+
ImportError: If vcdiff is not installed
34+
AblyException: If VCDiff decoding fails
35+
"""
36+
37+
def __init__(self):
38+
"""Initialize the VCDiff plugin.
39+
40+
Raises:
41+
ImportError: If vcdiff library is not available
42+
"""
43+
try:
44+
import vcdiff
45+
self._vcdiff = vcdiff
46+
except ImportError as e:
47+
log.error("vcdiff library not found. Install with: pip install ably[vcdiff]")
48+
raise ImportError(
49+
"VCDiff plugin requires vcdiff library. "
50+
"Install with: pip install ably[vcdiff]"
51+
) from e
52+
53+
def decode(self, delta: bytes, base: bytes) -> bytes:
54+
"""
55+
Decode a VCDiff delta against a base payload.
56+
57+
Args:
58+
delta: The VCDiff-encoded delta data
59+
base: The base payload to apply the delta to
60+
61+
Returns:
62+
bytes: The decoded message payload
63+
64+
Raises:
65+
AblyException: If VCDiff decoding fails (error code 40018)
66+
"""
67+
if not isinstance(delta, bytes):
68+
raise TypeError("Delta must be bytes")
69+
if not isinstance(base, bytes):
70+
raise TypeError("Base must be bytes")
71+
72+
try:
73+
# Use the vcdiff library to decode
74+
result = self._vcdiff.decode(base, delta)
75+
return result
76+
except Exception as e:
77+
log.error(f"VCDiff decode failed: {e}")
78+
raise AblyException(f"VCDiff decode failure: {e}", 40018, 40018) from e
79+
80+
81+
# Export for easy importing
82+
__all__ = ['VCDiffPlugin']

poetry.lock

Lines changed: 18 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)