|
4 | 4 | import weakref |
5 | 5 | from functools import lru_cache |
6 | 6 | from pathlib import Path |
| 7 | +from contextlib import asynccontextmanager |
7 | 8 | import warnings |
8 | 9 |
|
9 | 10 | import asyncio |
|
12 | 13 |
|
13 | 14 | from fsspec.asyn import AsyncFileSystem, sync, sync_wrapper |
14 | 15 | from fsspec.exceptions import FSTimeoutError |
| 16 | +from fsspec.callbacks import DEFAULT_CALLBACK |
| 17 | +from fsspec.utils import isfilelike |
15 | 18 |
|
16 | 19 | from multiformats import CID, multicodec |
17 | 20 | from . import unixfsv1 |
@@ -118,6 +121,17 @@ async def cat(self, path, session): |
118 | 121 | self._raise_not_found_for_status(res, path) |
119 | 122 | return await res.read() |
120 | 123 |
|
| 124 | + @asynccontextmanager |
| 125 | + async def iter_chunked(self, path, session, chunk_size): |
| 126 | + res = await self.get(path, session) |
| 127 | + async with res: |
| 128 | + self._raise_not_found_for_status(res, path) |
| 129 | + try: |
| 130 | + size = int(res.headers["content-length"]) |
| 131 | + except (ValueError, KeyError): |
| 132 | + size = None |
| 133 | + yield size, res.content.iter_chunked(chunk_size) |
| 134 | + |
121 | 135 | async def ls(self, path, session, detail=False): |
122 | 136 | res = await self.get(path, session, headers={"Accept": "application/vnd.ipld.raw"}, params={"format": "raw"}) |
123 | 137 | self._raise_not_found_for_status(res, path) |
@@ -293,6 +307,28 @@ async def _cat_file(self, path, start=None, end=None, **kwargs): |
293 | 307 | session = await self.set_session() |
294 | 308 | return (await self.gateway.cat(path, session))[start:end] |
295 | 309 |
|
| 310 | + async def _get_file( |
| 311 | + self, rpath, lpath, chunk_size=5 * 2**20, callback=DEFAULT_CALLBACK, **kwargs |
| 312 | + ): |
| 313 | + logger.debug(rpath) |
| 314 | + rpath = self._strip_protocol(rpath) |
| 315 | + session = await self.set_session() |
| 316 | + |
| 317 | + if isfilelike(lpath): |
| 318 | + outfile = lpath |
| 319 | + else: |
| 320 | + outfile = open(lpath, "wb") # noqa: ASYNC101, ASYNC230 |
| 321 | + |
| 322 | + try: |
| 323 | + async with self.gateway.iter_chunked(rpath, session, chunk_size) as (size, chunks): |
| 324 | + callback.set_size(size) |
| 325 | + async for chunk in chunks: |
| 326 | + outfile.write(chunk) |
| 327 | + callback.relative_update(len(chunk)) |
| 328 | + finally: |
| 329 | + if not isfilelike(lpath): |
| 330 | + outfile.close() |
| 331 | + |
296 | 332 | async def _info(self, path, **kwargs): |
297 | 333 | path = self._strip_protocol(path) |
298 | 334 | session = await self.set_session() |
|
0 commit comments