Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions opendalfs/decorator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import inspect
import functools

def generate_blocking_methods(cls):
"""
Class decorator that automatically creates blocking versions of async methods.

For each async method that starts with '_', creates a non-underscore
blocking version that uses the sync version from self.fs.
"""
async_methods = []

# Find all async methods that start with underscore
for name, method in inspect.getmembers(cls, predicate=inspect.isfunction):
if name.startswith('_') and inspect.iscoroutinefunction(method):
# Skip any private methods with double underscore
if name.startswith('__'):
continue

# Get the method name without underscore
blocking_name = name[1:]

# Skip if the blocking version is already defined
if hasattr(cls, blocking_name):
continue

async_methods.append((name, blocking_name, method))

# Now add the blocking versions
for async_name, blocking_name, async_method in async_methods:
# Create a factory function that captures the current values
def make_blocking_method(async_name, blocking_name):
@functools.wraps(async_method)
def blocking_method(self, *args, **kwargs):
fs_method = getattr(self.fs, blocking_name)
return fs_method(*args, **kwargs)

# Add docstring
blocking_method.__doc__ = f"Synchronous version of {async_name}"
return blocking_method

# Create the method with the factory and add it to the class
blocking_impl = make_blocking_method(async_name, blocking_name)
setattr(cls, blocking_name, blocking_impl)

return cls
31 changes: 2 additions & 29 deletions opendalfs/fs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,11 @@
import logging
from opendal import Operator, AsyncOperator
from .file import OpendalBufferedFile
from .decorator import generate_blocking_methods

logger = logging.getLogger("opendalfs")


@generate_blocking_methods
class OpendalFileSystem(AsyncFileSystem):
"""OpenDAL implementation of fsspec AsyncFileSystem.

Expand Down Expand Up @@ -71,37 +72,18 @@ async def _rmdir(self, path: str, recursive: bool = False) -> None:
else:
await self.async_fs.delete(path)

def rmdir(self, path: str, recursive: bool = False) -> None:
"""Sync version of rmdir"""
if recursive:
self.fs.remove_all(path)
else:
self.fs.delete(path)

async def _rm_file(self, path: str, **kwargs) -> None:
"""Remove file"""
await self.async_fs.delete(path)

def rm_file(self, path: str) -> None:
"""Sync version of rm_file"""
return sync(self.loop, self._rm_file, path)

async def _read(self, path: str):
"""Read file contents"""
return await self.async_fs.read(path)

def read(self, path: str):
"""Read file contents"""
return self.fs.read(path)

async def _write(self, path: str, data: bytes):
"""Write file contents"""
await self.async_fs.write(path, data)

def write(self, path: str, data: bytes):
"""Synchronous version of write"""
self.fs.write(path, data)

# Higher-level async operations built on core methods
async def _exists(self, path: str, **kwargs):
"""Check path existence"""
Expand All @@ -127,16 +109,7 @@ def _open(
**kwargs,
)

def created(self, path: str) -> None:
"""Get creation time (not supported)"""
raise NotImplementedError("Creation time is not supported by OpenDAL")

async def _modified(self, path: str):
"""Get modified time (async version)"""
info = await self.async_fs.stat(path)
return info.last_modified

def modified(self, path: str):
"""Get modified time (sync version)"""
info = self.fs.stat(path)
return info.last_modified