Skip to content

ReplayMarkerStorage with PgSQL not working with a saved replay id #19

@welcomemat-services

Description

@welcomemat-services

I have the below code that is working if the Pgsql table is empty. But if it pulls an existing replay id, it is throwing the below exception.

Thanks for any help provided here.

Code:

class MyReplayMarkerStorage(ReplayMarkerStorage):

	def __init__(self, connection, cursor):
		super().__init__()
		self.connection = connection
		self.cursor = cursor

	async def set_replay_marker(self, subscription: str, replay_marker: ReplayMarker):
		# store *replay_marker* for the given *subscription*
		event = subscription[subscription.rfind('/')+1:]
		self.cursor.execute(f"""
			INSERT INTO platform_event_setting (channel, replay_id, date_str)
			VALUES ('{event}', '{replay_marker.replay_id}', '{replay_marker.date}')
			ON CONFLICT ON CONSTRAINT platform_event_setting_pkey
			DO UPDATE SET (replay_id, date_str) = (EXCLUDED.replay_id, EXCLUDED.date_str)
		""")

		print(f"Upserting replay id {replay_marker.replay_id} for {subscription}")
		self.connection.commit()

	async def get_replay_marker(self, subscription: str):
		# retrieve the replay marker for the given *subscription*
		event = subscription[subscription.rfind('/')+1:]
		self.cursor.execute(f"""
			SELECT replay_id, date_str
			FROM platform_event_setting
			WHERE channel = '{event}'
			LIMIT 1
		""")

		channel = self.cursor.fetchone()
		
		if channel == None:
			print(f"Platform Event Setting does not exist: {subscription}")
			return None

		print(f"{subscription} - {channel['replay_id']} - {channel['date_str']}")
		return ReplayMarker(date=channel['date_str'], replay_id=channel['replay_id'])
# MyReplayMarkerStorage

async def process_events():
	try:
		connection = psycopg2.connect(os.getenv('DATABASE_URL'), sslmode='require')
		cursor = connection.cursor(cursor_factory=psycopg2.extras.RealDictCursor)

		myReplay = MyReplayMarkerStorage(connection, cursor)

		async with SalesforceStreamingClient(
			consumer_key=os.getenv("SF_APP_KEY"),
			consumer_secret=os.getenv("SF_APP_SECRET"),
			username=os.getenv('SF_USERNAME'),
			password=os.getenv('SF_PASSWORD'),
			replay=myReplay,
			replay_fallback=ReplayOption.ALL_EVENTS,
			replay_storage_policy=ReplayMarkerStoragePolicy.MANUAL) as client:

			await client.subscribe("/event/Notification__e")

			async for message in client:
				async with client.replay_storage(message):
					print(message['data']['payload'])

	except (Exception, psycopg2.Error) as e:
		exc_type, exc_value, exc_traceback = sys.exc_info()
		print(f"Exception - {e}")
	finally:
		if cursor:
			cursor.close()
		if connection:
			connection.close()

# process_events

Error log:

DEBUG:asyncio:Using proactor: IocpProactor
Connected to DB
DEBUG:aiosfstream.client:Client created with replay storage: <main.MyReplayMarkerStorage object at 0x0000024E0287CA30>, replay fallback: <ReplayOption.ALL_EVENTS: -2>
DEBUG:aiosfstream.client:Authenticating using PasswordAuthenticator(consumer_key='xxxxx',consumer_secret='yyyyy', username='test.test.com', password='abcde').
INFO:aiosfstream.client:Successful authentication. Instance URL: 'https://test.my.salesforce.com'.
INFO:aiocometd.client:Opening client with connection types ['websocket', 'long-polling'] ...
INFO:aiocometd.client:Connection types supported by the server: ['long-polling']
DEBUG:aiocometd.transports.base:Connect task finished with: {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'advice': {'interval': 0, 'timeout': 110000, 'reconnect': 'retry'}, 'channel': '/meta/connect', 'id': '1', 'successful': True}
INFO:aiocometd.client:Client opened with connection_type 'long-polling'
/event/Notification__e - 7785481 - 2022-02-10T23:23:49.572Z
INFO:aiocometd.client:Closing client...
ERROR:asyncio:Exception in callback TransportBase._connect_done(<Task cancell...\utils.py:22>>)
handle: <Handle TransportBase._connect_done(<Task cancell...\utils.py:22>>)>
Traceback (most recent call last):
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\utils.py", line 27, in wrapper
return await coro_func(*args, **kwargs)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 524, in _connect
result = await self._send_payload_with_auth(payload)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\transports\base.py", line 323, in _send_payload_with_auth
await self._auth.authenticate()
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 96, in authenticate
status_code, response_data = await self._authenticate()
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\auth.py", line 173, in _authenticate
response = await session.post(self._token_url, data=data)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\client.py", line 535, in _request
conn = await self._connector.connect(
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 542, in connect
proto = await self._create_connection(req, traces, timeout)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 907, in _create_connection
_, proto = await self._create_direct_connection(req, traces, timeout)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiohttp\connector.py", line 1154, in _create_direct_connection
hosts = await asyncio.shield(host_resolved)
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

self._raise_server_error(response)

File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiocometd\client.py", line 385, in _raise_server_error
raise ServerError(message, response)
aiocometd.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "c:\code\Python\scheduled_processes\processor.py", line 70, in process_events
await client.subscribe("/event/Notification__e")
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 143, in async_wrapper
return await func(*args, **kwargs)
File "C:\Users\test\AppData\Local\Programs\Python\Python39\lib\contextlib.py", line 137, in exit
self.gen.throw(typ, value, traceback)
File "C:\Users\test.virtualenvs\scheduled_processes-vrvgpOGN\lib\site-packages\aiosfstream\exceptions.py", line 123, in translate_errors_context
raise error_cls(*cometd_error.args) from cometd_error
aiosfstream.exceptions.ServerError: ('Subscribe request failed.', {'clientId': 'kvl2mi3sd9hs6ei1m3vnsilh4nb9', 'channel': '/meta/subscribe', 'id': '2', 'subscription': '/event/Notification__e', 'error': 'Failed to create an internal subscription!', 'successful': False})

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions