Skip to content

Commit c0fd751

Browse files
committed
feat: Add remote session support (ActivitySessionService)
1 parent 1a63985 commit c0fd751

File tree

5 files changed

+391
-9
lines changed

5 files changed

+391
-9
lines changed

src/google/adk/integrations/temporal.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import functools
1818
import inspect
19+
import datetime
1920
from typing import Any, AsyncGenerator, Callable, Optional, List
2021

2122
from temporalio import workflow, activity
@@ -153,3 +154,186 @@ def default_activities(cls) -> List[Callable]:
153154
Useful for registering activities with the Temporal Worker.
154155
"""
155156
return [generate_content_activity]
157+
158+
159+
class SessionServiceActivities:
160+
"""Wraps a BaseSessionService to expose its methods as Temporal Activities."""
161+
162+
def __init__(self, session_service: Any):
163+
# We type hint as Any to avoid circular imports or strict dependency on BaseSessionService
164+
# definition availability at this module level if not necessary,
165+
# but logically it is a BaseSessionService.
166+
self.session_service = session_service
167+
168+
@activity.defn(name="create_session")
169+
async def create_session(
170+
self,
171+
app_name: str,
172+
user_id: str,
173+
state: Optional[dict[str, Any]] = None,
174+
session_id: Optional[str] = None,
175+
extra_kwargs: Optional[dict[str, Any]] = None,
176+
) -> Any:
177+
# We return Any (dict/model) that Temporal can serialize.
178+
# BaseSessionService returns a Session object.
179+
return await self.session_service.create_session(
180+
app_name=app_name,
181+
user_id=user_id,
182+
state=state,
183+
session_id=session_id,
184+
**(extra_kwargs or {})
185+
)
186+
187+
@activity.defn(name="get_session")
188+
async def get_session(
189+
self,
190+
app_name: str,
191+
user_id: str,
192+
session_id: str,
193+
extra_kwargs: Optional[dict[str, Any]] = None,
194+
) -> Optional[Any]:
195+
# Note: 'config' argument in get_session might need Pydantic serialization support
196+
# We assume kwargs handles simple args or properly serialized objects.
197+
return await self.session_service.get_session(
198+
app_name=app_name,
199+
user_id=user_id,
200+
session_id=session_id,
201+
**(extra_kwargs or {})
202+
)
203+
204+
@activity.defn(name="list_sessions")
205+
async def list_sessions(
206+
self,
207+
app_name: str,
208+
user_id: Optional[str] = None
209+
) -> Any:
210+
return await self.session_service.list_sessions(
211+
app_name=app_name,
212+
user_id=user_id
213+
)
214+
215+
@activity.defn(name="delete_session")
216+
async def delete_session(
217+
self,
218+
app_name: str,
219+
user_id: str,
220+
session_id: str
221+
) -> None:
222+
await self.session_service.delete_session(
223+
app_name=app_name,
224+
user_id=user_id,
225+
session_id=session_id
226+
)
227+
228+
@activity.defn(name="append_event")
229+
async def append_event(self, session: Any, event: Any) -> Any:
230+
return await self.session_service.append_event(session, event)
231+
232+
def get_activities(self) -> List[Callable]:
233+
"""Returns the list of activities to register."""
234+
return [
235+
self.create_session,
236+
self.get_session,
237+
self.list_sessions,
238+
self.delete_session,
239+
self.append_event
240+
]
241+
242+
243+
from google.adk.sessions import BaseSessionService, Session
244+
from google.adk.sessions.base_session_service import ListSessionsResponse
245+
from google.adk.events import Event
246+
247+
248+
class ActivitySessionService(BaseSessionService):
249+
"""A SessionService that delegates all calls to Temporal Activities.
250+
251+
This ensures determinism within a Workflow by offloading the actual
252+
session I/O (which might be non-deterministic or remote) to the Worker.
253+
"""
254+
255+
def __init__(
256+
self,
257+
activity_options: Optional[dict[str, Any]] = None
258+
):
259+
"""Initializes the ActivitySessionService.
260+
261+
Args:
262+
activity_options: Default options for activity execution (e.g. timeouts).
263+
Defaults to schedule_to_close_timeout=datetime.timedelta(seconds=30) if not provided.
264+
"""
265+
self.activity_options = activity_options or {
266+
"schedule_to_close_timeout": datetime.timedelta(seconds=30)
267+
}
268+
269+
async def create_session(
270+
self,
271+
*,
272+
app_name: str,
273+
user_id: str,
274+
state: Optional[dict[str, Any]] = None,
275+
session_id: Optional[str] = None,
276+
**kwargs: Any,
277+
) -> Session:
278+
result = await workflow.execute_activity(
279+
"create_session",
280+
args=[app_name, user_id, state, session_id, kwargs],
281+
**self.activity_options
282+
)
283+
if isinstance(result, dict):
284+
return Session.model_validate(result)
285+
return result
286+
287+
async def get_session(
288+
self,
289+
*,
290+
app_name: str,
291+
user_id: str,
292+
session_id: str,
293+
config: Optional[Any] = None,
294+
) -> Optional[Session]:
295+
kwargs = {"config": config} if config else {}
296+
result = await workflow.execute_activity(
297+
"get_session",
298+
args=[app_name, user_id, session_id, kwargs],
299+
**self.activity_options
300+
)
301+
if result and isinstance(result, dict):
302+
return Session.model_validate(result)
303+
return result
304+
305+
async def list_sessions(
306+
self,
307+
*,
308+
app_name: str,
309+
user_id: Optional[str] = None
310+
) -> ListSessionsResponse:
311+
result = await workflow.execute_activity(
312+
"list_sessions",
313+
args=[app_name, user_id],
314+
**self.activity_options
315+
)
316+
if isinstance(result, dict):
317+
return ListSessionsResponse.model_validate(result)
318+
return result
319+
320+
async def delete_session(
321+
self, *, app_name: str, user_id: str, session_id: str
322+
) -> None:
323+
await workflow.execute_activity(
324+
"delete_session",
325+
args=[app_name, user_id, session_id],
326+
**self.activity_options
327+
)
328+
329+
async def append_event(self, session: Session, event: Event) -> Event:
330+
# Note: We might need to serialize session/event to dicts if passing them as args causes issues?
331+
# Usually PydanticConverter handles serialization of args fine.
332+
result = await workflow.execute_activity(
333+
"append_event",
334+
args=[session, event],
335+
**self.activity_options
336+
)
337+
if isinstance(result, dict):
338+
return Event.model_validate(result)
339+
return result

src/google/adk/sessions/in_memory_session_service.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222

2323
from typing_extensions import override
2424

25-
from google.adk import runtime
2625
from . import _session_util
2726
from ..errors.already_exists_error import AlreadyExistsError
2827
from ..events.event import Event
@@ -109,14 +108,14 @@ def _create_session_impl(
109108
session_id = (
110109
session_id.strip()
111110
if session_id and session_id.strip()
112-
else runtime.new_uuid()
111+
else str(uuid.uuid4())
113112
)
114113
session = Session(
115114
app_name=app_name,
116115
user_id=user_id,
117116
id=session_id,
118117
state=session_state or {},
119-
last_update_time=runtime.get_time(),
118+
last_update_time=time.time(),
120119
)
121120

122121
if app_name not in self.sessions:

tests/integration/manual_test_temporal_integration.py

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
from google.adk.sessions import InMemorySessionService
4040
from google.adk.utils.context_utils import Aclosing
4141
from google.adk.events import Event
42-
from google.adk.integrations.temporal import activity_as_tool, TemporalModel, generate_content_activity
42+
from google.adk.integrations.temporal import activity_as_tool, TemporalModel, generate_content_activity, ActivitySessionService, SessionServiceActivities
4343

4444
# Required Environment Variables for this test:
4545
# - GOOGLE_CLOUD_PROJECT
@@ -102,9 +102,13 @@ async def run(self, prompt: str) -> Event | None:
102102
tools=[weather_tool]
103103
)
104104

105-
# 3. Create Session (uses runtime.new_uuid() -> workflow.uuid4())
106-
session_service = InMemorySessionService()
107-
logger.info("Create session.")
105+
# 3. Create Session (uses ActivitySessionService to delegate to Worker)
106+
# This ensures session I/O is deterministic (replay-safe)
107+
session_service = ActivitySessionService()
108+
logger.info("Create session (via Activity).")
109+
# Note: server-generated IDs (or worker-generated) are safe because they come from Activity result.
110+
# But we can also pass a deterministic ID if the service supports it.
111+
# Here we let the service (Activity) handle it.
108112
session = await session_service.create_session(app_name="test_app", user_id="test")
109113

110114
logger.info(f"Session created with ID: {session.id}")
@@ -194,7 +198,8 @@ async def run(self, prompt: str) -> str:
194198
)
195199

196200
# 5. Execute
197-
session_service = InMemorySessionService()
201+
# Use ActivitySessionService to delegate session I/O to activities (replay-safe)
202+
session_service = ActivitySessionService()
198203
session = await session_service.create_session(app_name="multi_agent_app", user_id="user_MULTI")
199204

200205
runner = Runner(
@@ -287,10 +292,15 @@ async def test_temporalio_integration():
287292
except RuntimeError:
288293
pytest.skip("Could not connect to Temporal server. Is it running?")
289294

295+
# Register InMemorySessionService as activities (simulating remote/worker-side service)
296+
# This is key: The state lives in THIS InMemorySessionService instance in the Worker.
297+
real_session_service = InMemorySessionService()
298+
session_activities = SessionServiceActivities(real_session_service)
299+
290300
async with Worker(
291301
client,
292302
workflows=[WeatherAgent, MultiAgentWorkflow],
293-
activities=TemporalModel.default_activities() + [
303+
activities=TemporalModel.default_activities() + session_activities.get_activities() + [
294304
get_weather,
295305
coordinator_think,
296306
tool_agent_think,

0 commit comments

Comments
 (0)