Skip to content

Commit 2cfb090

Browse files
authored
feat(loader): batch imdb requests (#54)
1 parent 0bfe528 commit 2cfb090

File tree

7 files changed

+162
-69
lines changed

7 files changed

+162
-69
lines changed

AGENTS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
accurate episode lookups.
2323
- Plex metadata is fetched in batches using `fetchItems` to reduce repeated
2424
network calls when loading library items.
25+
- IMDb metadata is fetched via `titles:batchGet` to minimize repeated API calls.
2526

2627
## User Queries
2728
The project should handle natural-language searches and recommendations such as:

mcp_plex/loader.py

Lines changed: 95 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,11 @@ async def _gather_in_batches(
5151
"""Gather awaitable tasks in fixed-size batches."""
5252

5353
results: List[T] = []
54-
for i in range(0, len(tasks), batch_size):
54+
total = len(tasks)
55+
for i in range(0, total, batch_size):
5556
batch = tasks[i : i + batch_size]
5657
results.extend(await asyncio.gather(*batch))
58+
logger.info("Processed %d/%d items", min(i + batch_size, total), total)
5759
return results
5860

5961

@@ -86,6 +88,54 @@ async def _fetch_imdb(client: httpx.AsyncClient, imdb_id: str) -> Optional[IMDbT
8688
return None
8789

8890

91+
async def _fetch_imdb_batch(
    client: httpx.AsyncClient, imdb_ids: Sequence[str]
) -> dict[str, Optional[IMDbTitle]]:
    """Fetch metadata for multiple IMDb IDs in a single request.

    IDs already present in the module-level IMDb cache are served from it;
    the remaining IDs are requested together from ``titles:batchGet``.

    Args:
        client: HTTP client used to issue the batch request.
        imdb_ids: IMDb title IDs to resolve. Duplicates are collapsed.

    Returns:
        A mapping with one entry per requested ID. The value is the parsed
        title, or ``None`` when the title was not returned, the request
        failed, or rate limiting persisted through every retry.
    """
    results: dict[str, Optional[IMDbTitle]] = {}
    ids_to_fetch: list[str] = []
    # dict.fromkeys de-duplicates while keeping first-seen order, so the same
    # ID is never sent twice in one request.
    for imdb_id in dict.fromkeys(imdb_ids):
        # Test for presence, not truthiness: a cache object that happens to be
        # falsy (e.g. empty) must still be consulted and populated.
        cached = _imdb_cache.get(imdb_id) if _imdb_cache is not None else None
        if cached:
            results[imdb_id] = IMDbTitle.model_validate(cached)
        else:
            ids_to_fetch.append(imdb_id)

    if not ids_to_fetch:
        return results

    # NOTE(review): all uncached IDs go out in one request; confirm whether the
    # endpoint caps the number of ``titleIds`` it accepts per call.
    delay = _imdb_backoff
    params = [("titleIds", imdb_id) for imdb_id in ids_to_fetch]
    for attempt in range(_imdb_max_retries + 1):
        resp = await client.get("https://api.imdbapi.dev/titles:batchGet", params=params)
        if resp.status_code == 429:
            if attempt < _imdb_max_retries:
                # Rate limited: wait, double the delay, and try again.
                await asyncio.sleep(delay)
                delay *= 2
                continue
            # Retries exhausted: hand every pending ID to the retry queue.
            if _imdb_retry_queue is not None:
                for imdb_id in ids_to_fetch:
                    await _imdb_retry_queue.put(imdb_id)
            break
        if resp.is_success:
            for title_data in resp.json().get("titles", []):
                imdb_title = IMDbTitle.model_validate(title_data)
                results[imdb_title.id] = imdb_title
                if _imdb_cache is not None:
                    _imdb_cache.set(imdb_title.id, title_data)
        # Success or a non-429 failure: either way there is nothing to retry.
        break

    # Every requested ID gets an entry, whichever path ended the loop, so
    # callers may index the mapping directly.
    for imdb_id in ids_to_fetch:
        results.setdefault(imdb_id, None)
    return results
137+
138+
89139
def _load_imdb_retry_queue(path: Path) -> None:
90140
"""Populate the retry queue from a JSON file if it exists."""
91141

@@ -279,46 +329,29 @@ async def _load_from_plex(
279329
server: PlexServer, tmdb_api_key: str, *, batch_size: int = 50
280330
) -> List[AggregatedItem]:
281331
"""Load items from a live Plex server."""
282-
283-
async def _augment_movie(client: httpx.AsyncClient, movie: PlexPartialObject) -> AggregatedItem:
284-
ids = _extract_external_ids(movie)
285-
imdb_task = (
286-
_fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None)
287-
)
288-
tmdb_task = (
289-
_fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key)
290-
if ids.tmdb
291-
else asyncio.sleep(0, result=None)
292-
)
293-
imdb, tmdb = await asyncio.gather(imdb_task, tmdb_task)
294-
return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb)
295-
296-
async def _augment_episode(
297-
client: httpx.AsyncClient,
298-
episode: PlexPartialObject,
299-
show_tmdb: Optional[TMDBShow],
300-
) -> AggregatedItem:
301-
ids = _extract_external_ids(episode)
302-
imdb_task = (
303-
_fetch_imdb(client, ids.imdb) if ids.imdb else asyncio.sleep(0, result=None)
304-
)
305-
season = resolve_tmdb_season_number(show_tmdb, episode)
306-
ep_num = getattr(episode, "index", None)
307-
tmdb_task = (
308-
_fetch_tmdb_episode(client, show_tmdb.id, season, ep_num, tmdb_api_key)
309-
if show_tmdb and season is not None and ep_num is not None
310-
else asyncio.sleep(0, result=None)
311-
)
312-
imdb, tmdb_episode = await asyncio.gather(imdb_task, tmdb_task)
313-
tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb
314-
return AggregatedItem(plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb)
315-
316332
results: List[AggregatedItem] = []
317333
async with httpx.AsyncClient(timeout=30) as client:
318334
movie_section = server.library.section("Movies")
319335
movie_keys = [int(m.ratingKey) for m in movie_section.all()]
320336
movies = server.fetchItems(movie_keys) if movie_keys else []
321-
movie_tasks = [_augment_movie(client, movie) for movie in movies]
337+
movie_imdb_ids = [
338+
_extract_external_ids(m).imdb for m in movies if _extract_external_ids(m).imdb
339+
]
340+
movie_imdb_map = (
341+
await _fetch_imdb_batch(client, movie_imdb_ids) if movie_imdb_ids else {}
342+
)
343+
344+
async def _augment_movie(movie: PlexPartialObject) -> AggregatedItem:
345+
ids = _extract_external_ids(movie)
346+
imdb = movie_imdb_map.get(ids.imdb) if ids.imdb else None
347+
tmdb = (
348+
await _fetch_tmdb_movie(client, ids.tmdb, tmdb_api_key)
349+
if ids.tmdb
350+
else None
351+
)
352+
return AggregatedItem(plex=_build_plex_item(movie), imdb=imdb, tmdb=tmdb)
353+
354+
movie_tasks = [_augment_movie(movie) for movie in movies]
322355
if movie_tasks:
323356
results.extend(await _gather_in_batches(movie_tasks, batch_size))
324357

@@ -332,9 +365,33 @@ async def _augment_episode(
332365
show_tmdb = await _fetch_tmdb_show(client, show_ids.tmdb, tmdb_api_key)
333366
episode_keys = [int(e.ratingKey) for e in full_show.episodes()]
334367
episodes = server.fetchItems(episode_keys) if episode_keys else []
335-
episode_tasks = [
336-
_augment_episode(client, episode, show_tmdb) for episode in episodes
368+
ep_imdb_ids = [
369+
_extract_external_ids(e).imdb
370+
for e in episodes
371+
if _extract_external_ids(e).imdb
337372
]
373+
ep_imdb_map = (
374+
await _fetch_imdb_batch(client, ep_imdb_ids) if ep_imdb_ids else {}
375+
)
376+
377+
async def _augment_episode(episode: PlexPartialObject) -> AggregatedItem:
378+
ids = _extract_external_ids(episode)
379+
imdb = ep_imdb_map.get(ids.imdb) if ids.imdb else None
380+
season = resolve_tmdb_season_number(show_tmdb, episode)
381+
ep_num = getattr(episode, "index", None)
382+
tmdb_episode = (
383+
await _fetch_tmdb_episode(
384+
client, show_tmdb.id, season, ep_num, tmdb_api_key
385+
)
386+
if show_tmdb and season is not None and ep_num is not None
387+
else None
388+
)
389+
tmdb: Optional[TMDBItem] = tmdb_episode or show_tmdb
390+
return AggregatedItem(
391+
plex=_build_plex_item(episode), imdb=imdb, tmdb=tmdb
392+
)
393+
394+
episode_tasks = [_augment_episode(ep) for ep in episodes]
338395
if episode_tasks:
339396
results.extend(await _gather_in_batches(episode_tasks, batch_size))
340397
return results

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "mcp-plex"
7-
version = "0.26.19"
7+
version = "0.26.21"
88

99
description = "Plex-Oriented Model Context Protocol Server"
1010
requires-python = ">=3.11,<3.13"

tests/test_gather_in_batches.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import asyncio
2+
import logging
23

34
from mcp_plex import loader
45

@@ -8,7 +9,7 @@ async def _echo(value: int) -> int:
89
return value
910

1011

11-
def test_gather_in_batches(monkeypatch):
12+
def test_gather_in_batches(monkeypatch, caplog):
1213
calls: list[int] = []
1314
orig_gather = asyncio.gather
1415

@@ -19,8 +20,12 @@ async def fake_gather(*coros):
1920
monkeypatch.setattr(asyncio, "gather", fake_gather)
2021

2122
tasks = [_echo(i) for i in range(5)]
22-
results = asyncio.run(loader._gather_in_batches(tasks, 2))
23+
with caplog.at_level(logging.INFO, logger="mcp_plex.loader"):
24+
results = asyncio.run(loader._gather_in_batches(tasks, 2))
2325

2426
assert results == list(range(5))
2527
assert calls == [2, 2, 1]
28+
assert "Processed 2/5 items" in caplog.text
29+
assert "Processed 4/5 items" in caplog.text
30+
assert "Processed 5/5 items" in caplog.text
2631

tests/test_load_from_plex.py

Lines changed: 28 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -62,33 +62,34 @@ async def handler(request):
6262
url = str(request.url)
6363
if "themoviedb.org" in url:
6464
assert request.headers.get("Authorization") == "Bearer key"
65-
if "tt1375666" in url:
66-
return httpx.Response(
67-
200,
68-
json={
69-
"id": "tt1375666",
70-
"type": "movie",
71-
"primaryTitle": "Inception",
72-
},
73-
)
74-
if "tt0959621" in url:
75-
return httpx.Response(
76-
200,
77-
json={
78-
"id": "tt0959621",
79-
"type": "episode",
80-
"primaryTitle": "Pilot",
81-
},
82-
)
83-
if "tt0959622" in url:
84-
return httpx.Response(
85-
200,
86-
json={
87-
"id": "tt0959622",
88-
"type": "episode",
89-
"primaryTitle": "Cat's in the Bag...",
90-
},
91-
)
65+
if "titles:batchGet" in url:
66+
ids = request.url.params.get_list("titleIds")
67+
titles = []
68+
if "tt1375666" in ids:
69+
titles.append(
70+
{
71+
"id": "tt1375666",
72+
"type": "movie",
73+
"primaryTitle": "Inception",
74+
}
75+
)
76+
if "tt0959621" in ids:
77+
titles.append(
78+
{
79+
"id": "tt0959621",
80+
"type": "episode",
81+
"primaryTitle": "Pilot",
82+
}
83+
)
84+
if "tt0959622" in ids:
85+
titles.append(
86+
{
87+
"id": "tt0959622",
88+
"type": "episode",
89+
"primaryTitle": "Cat's in the Bag...",
90+
}
91+
)
92+
return httpx.Response(200, json={"titles": titles})
9293
if "/movie/27205" in url:
9394
return httpx.Response(200, json={"id": 27205, "title": "Inception"})
9495
if "/tv/1396/season/1/episode/1" in url:

tests/test_loader_unit.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
_build_plex_item,
1212
_extract_external_ids,
1313
_fetch_imdb,
14+
_fetch_imdb_batch,
1415
_fetch_tmdb_episode,
1516
_fetch_tmdb_movie,
1617
_fetch_tmdb_show,
@@ -176,6 +177,34 @@ async def main():
176177
asyncio.run(main())
177178

178179

180+
def test_fetch_imdb_batch(tmp_path, monkeypatch):
    """A batched IMDb lookup returns both titles and writes both to the cache file."""
    cache_path = tmp_path / "cache.json"
    monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path))

    async def imdb_mock(request):
        # Both IDs must arrive as repeated ``titleIds`` query parameters.
        requested = request.url.params.get_list("titleIds")
        assert sorted(requested) == ["tt1", "tt2"]
        payload = {
            "titles": [
                {"id": "tt1", "type": "movie", "primaryTitle": "A"},
                {"id": "tt2", "type": "movie", "primaryTitle": "B"},
            ]
        }
        return httpx.Response(200, json=payload)

    async def main():
        transport = httpx.MockTransport(imdb_mock)
        async with httpx.AsyncClient(transport=transport) as client:
            result = await _fetch_imdb_batch(client, ["tt1", "tt2"])
            assert result["tt1"] and result["tt1"].primaryTitle == "A"
            assert result["tt2"] and result["tt2"].primaryTitle == "B"

    asyncio.run(main())
    cached = json.loads(cache_path.read_text())
    assert set(cached.keys()) == {"tt1", "tt2"}
206+
207+
179208
def test_fetch_imdb_retries_on_429(monkeypatch, tmp_path):
180209
cache_path = tmp_path / "cache.json"
181210
monkeypatch.setattr(loader, "_imdb_cache", IMDbCache(cache_path))

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)