-
Notifications
You must be signed in to change notification settings - Fork 125
Expand file tree
/
Copy path27-transcription-live-sagemaker.py
More file actions
121 lines (90 loc) · 4.41 KB
/
27-transcription-live-sagemaker.py
File metadata and controls
121 lines (90 loc) · 4.41 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""
Example: Live Transcription via SageMaker (Listen V1)
This example shows how to use a custom transport to stream audio for real-time
transcription through a SageMaker endpoint running Deepgram, instead of the
Deepgram Cloud WebSocket API.
The SageMaker transport uses HTTP/2 bidirectional streaming under the hood,
but exposes the same SDK interface — just swap in a transport_factory.
**Async-only** — the SageMaker transport requires ``AsyncDeepgramClient``.
It cannot be used with the sync ``DeepgramClient``.
Requirements::
pip install deepgram-sagemaker
Environment:
AWS credentials must be configured (via environment variables,
``~/.aws/credentials``, or an IAM role).
Set ``SAGEMAKER_ENDPOINT_NAME`` and ``AWS_REGION`` in ``.env`` or your shell.
"""
import asyncio
import os
import sys
from typing import Union
# Ensure print output appears immediately (e.g. in Docker, piped output, IDE consoles)
sys.stdout.reconfigure(line_buffering=True)
from dotenv import load_dotenv
load_dotenv()
from deepgram import AsyncDeepgramClient
from deepgram.core.events import EventType
from deepgram.listen.v1.types import (
ListenV1Metadata,
ListenV1Results,
ListenV1SpeechStarted,
ListenV1UtteranceEnd,
)
from deepgram_sagemaker import SageMakerTransportFactory
ListenV1SocketClientResponse = Union[ListenV1Results, ListenV1Metadata, ListenV1UtteranceEnd, ListenV1SpeechStarted]
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
SAGEMAKER_ENDPOINT = os.getenv("SAGEMAKER_ENDPOINT_NAME", "deepgram-nova-3")
SAGEMAKER_REGION = os.getenv("AWS_REGION", "us-west-2")
CHUNK_SIZE = 512_000 # 512 KB per chunk (optimal for streaming performance)
CHUNK_DELAY = 0.5 # seconds between chunks
# ---------------------------------------------------------------------------
# Create the client with SageMaker transport
# ---------------------------------------------------------------------------
factory = SageMakerTransportFactory(
endpoint_name=SAGEMAKER_ENDPOINT,
region=SAGEMAKER_REGION,
)
# SageMaker uses AWS credentials (not Deepgram API keys), so api_key is unused
client = AsyncDeepgramClient(api_key="unused", transport_factory=factory)
async def main() -> None:
try:
async with client.listen.v1.connect(model="nova-3") as connection:
def on_message(message: ListenV1SocketClientResponse) -> None:
msg_type = getattr(message, "type", "Unknown")
print(f"Received {msg_type} event")
# Extract transcription from Results events
if isinstance(message, ListenV1Results):
if message.channel and message.channel.alternatives:
transcript = message.channel.alternatives[0].transcript
if transcript:
print(f"Transcript: {transcript}")
connection.on(EventType.OPEN, lambda _: print("Connection opened"))
connection.on(EventType.MESSAGE, on_message)
connection.on(EventType.CLOSE, lambda _: print("Connection closed"))
connection.on(EventType.ERROR, lambda error: print(f"Error: {error}"))
# Start listening in a background task so we can send audio concurrently
listen_task = asyncio.create_task(connection.start_listening())
# Wait for the connection to establish
await asyncio.sleep(1)
# Read and send audio in chunks
audio_path = os.path.join(os.path.dirname(__file__), "fixtures", "audio.wav")
with open(audio_path, "rb") as audio_file:
audio_data = audio_file.read()
print(f"Sending {len(audio_data)} bytes in {CHUNK_SIZE}-byte chunks...")
for i in range(0, len(audio_data), CHUNK_SIZE):
chunk = audio_data[i : i + CHUNK_SIZE]
await connection.send_media(chunk)
print(f"Sent chunk {i // CHUNK_SIZE + 1} ({len(chunk)} bytes)")
await asyncio.sleep(CHUNK_DELAY)
# Signal end of audio
await connection.send_finalize()
print("Finished sending audio")
# Wait for final responses
await asyncio.sleep(5)
# Cancel the listening task
listen_task.cancel()
except Exception as e:
print(f"Error: {e}")
asyncio.run(main())