Skip to content

Commit 177e283

Browse files
committed
Update Dependant Code
This change updates all of the Dynamo usages of nixl_connect to adopt the changes in the previous commit. - Removal of `Connector.initialize()` calls. - Addition of the `await` keyword to `Connector.create_readable()` and `.create_writable()` calls. Signed-off-by: J Wyman <jwyman@nvidia.com>
1 parent 2dfba0d commit 177e283

File tree

8 files changed

+5
-14
lines changed

8 files changed

+5
-14
lines changed

components/src/dynamo/sglang/request_handlers/multimodal/encode_worker_handler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ async def generate(
159159
# Create descriptor for the multimodal data
160160
descriptor = connect.Descriptor(precomputed_embeddings)
161161

162-
with self._connector.create_readable(descriptor) as readable:
162+
with await self._connector.create_readable(descriptor) as readable:
163163
request.serialized_request = readable.metadata()
164164

165165
logger.debug(f"Request: {request.model_dump_json()}")
@@ -184,6 +184,5 @@ async def async_init(self, runtime: DistributedRuntime):
184184
# Create and initialize a dynamo connector for this worker.
185185
# We'll needs this to move data between this worker and remote workers efficiently.
186186
self._connector = connect.Connector()
187-
await self._connector.initialize()
188187

189188
logger.info("Startup completed.")

components/src/dynamo/sglang/request_handlers/multimodal/worker_handler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,6 @@ def __init__(self):
7777
async def initialize(self):
7878
"""Initialize the connector for embeddings processing"""
7979
self._connector = connect.Connector()
80-
await self._connector.initialize()
8180

8281
async def process_embeddings(self, request: SglangMultimodalRequest):
8382
"""Process embeddings from serialized request"""
@@ -103,7 +102,6 @@ async def process_embeddings(self, request: SglangMultimodalRequest):
103102
"Connector is None - this should not happen after initialization"
104103
)
105104
self._connector = connect.Connector()
106-
await self._connector.initialize()
107105

108106
read_op = await self._connector.begin_read(
109107
request.serialized_request, descriptor

components/src/dynamo/trtllm/encode_helper.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ async def process_embedding_request(
241241

242242
# Create readable operation with main embeddings tensor (works for both formats)
243243
descriptor = nixl_connect.Descriptor(encodings)
244-
with connector.create_readable(descriptor) as readable_op:
244+
with await connector.create_readable(descriptor) as readable_op:
245245
# Get the metadata for the readable operation
246246
op_metadata = readable_op.metadata()
247247

components/src/dynamo/trtllm/main.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,6 @@ async def init(runtime: DistributedRuntime, config: Config):
279279
connector = None
280280
logging.info("Initializing NIXL Connect.")
281281
connector = nixl_connect.Connector()
282-
await connector.initialize()
283282

284283
dump_config(
285284
config.dump_config_to, {"engine_args": engine_args, "dynamo_args": config}

components/src/dynamo/vllm/multimodal_handlers/encode_worker_handler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,6 @@ async def async_init(self, runtime: DistributedRuntime):
6969
# Create and initialize a dynamo connector for this worker.
7070
# We'll needs this to move data between this worker and remote workers efficiently.
7171
self._connector = connect.Connector()
72-
await self._connector.initialize()
7372
logger.info("Encode worker startup completed.")
7473

7574
async def generate(
@@ -130,7 +129,7 @@ async def generate(
130129
request.embeddings_shape = tuple(embeddings.shape)
131130
descriptor = connect.Descriptor(embeddings_cpu)
132131

133-
with self._connector.create_readable(descriptor) as readable:
132+
with await self._connector.create_readable(descriptor) as readable:
134133
request.serialized_request = readable.metadata()
135134
# Clear the image URL as hint that the image is passed as embeddings.
136135
request.multimodal_input.image_url = None

components/src/dynamo/vllm/multimodal_handlers/worker_handler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ def __init__(
4646
async def async_init(self, runtime: DistributedRuntime):
4747
"""Async initialization - connector needs async setup"""
4848
self._connector = connect.Connector()
49-
await self._connector.initialize()
5049
logger.info("Multimodal Decode Worker async initialization completed.")
5150

5251
async def generate(self, request: vLLMMultimodalRequest, context):
@@ -126,7 +125,6 @@ async def async_init(self, runtime: DistributedRuntime):
126125
"""Async initialization for connector that requires async setup"""
127126
# Initialize the connector asynchronously
128127
self._connector = connect.Connector()
129-
await self._connector.initialize()
130128
logger.info("Multimodal PD Worker async initialization completed.")
131129

132130
async def generate(self, request: vLLMMultimodalRequest, context):

examples/multimodal/components/encode_worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def generate(
125125
request.embeddings_shape = tuple(embeddings.shape)
126126
descriptor = connect.Descriptor(embeddings)
127127

128-
with self._connector.create_readable(descriptor) as readable:
128+
with await self._connector.create_readable(descriptor) as readable:
129129
request.serialized_request = readable.metadata()
130130
# Clear the image URL as hint that the image is passed as embeddings.
131131
request.multimodal_input.image_url = None
@@ -158,7 +158,6 @@ async def async_init(self, runtime: DistributedRuntime):
158158
# Create and initialize a dynamo connector for this worker.
159159
# We'll needs this to move data between this worker and remote workers efficiently.
160160
self._connector = connect.Connector()
161-
await self._connector.initialize()
162161

163162
logger.info("Startup completed.")
164163

examples/multimodal/components/video_encode_worker.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ async def generate(
153153
request.embeddings_shape = tuple(tensor_for_descriptor.shape)
154154
descriptor = connect.Descriptor(tensor_for_descriptor)
155155

156-
with self._connector.create_readable(descriptor) as readable:
156+
with await self._connector.create_readable(descriptor) as readable:
157157
request.serialized_request = readable.metadata()
158158
# Clear the image URL as hint that the image is passed as embeddings.
159159
request.multimodal_input.video_url = None
@@ -199,7 +199,6 @@ async def async_init(self, runtime: DistributedRuntime):
199199
# Create and initialize a dynamo connector for this worker.
200200
# We'll needs this to move data between this worker and remote workers efficiently.
201201
self._connector = connect.Connector()
202-
await self._connector.initialize()
203202

204203
logger.info("Startup completed.")
205204

0 commit comments

Comments
 (0)