# src/darca_storage/connectors/local.py
# License: MIT
"""
Async connector for a local-filesystem backend.
• Performs reachability and access probes without blocking the event-loop
(`asyncio.to_thread`).
• Returns a ready-scoped *async* `StorageClient`.
• Supports credential injection (e.g. posix_user) via
CredentialAware interface.
"""
from __future__ import annotations
import asyncio
import os
from typing import Dict, Optional
from darca_file_utils.directory_utils import (
DirectoryUtils,
DirectoryUtilsException,
)
from darca_file_utils.file_utils import FileUtils, FileUtilsException
from darca_storage.backends.local_file_backend import LocalFileBackend
from darca_storage.decorators.scoped_backend import ScopedFileBackend
from darca_storage.interfaces.credential_aware import CredentialAware
from darca_storage.interfaces.storage_connector import StorageConnector
[docs]
class LocalStorageConnector(StorageConnector, CredentialAware):
def __init__(
self,
base_path: str,
credentials: Optional[Dict[str, str]] = None,
parameters: Optional[Dict[str, str]] = None,
) -> None:
if not base_path:
raise ValueError(
"A base_path must be provided for LocalStorageConnector."
)
self._base_path: str = os.path.abspath(base_path)
self._credentials: Dict[str, str] = credentials or {}
self._parameters: Dict[str, str] = parameters or {}
[docs]
def inject_credentials(self, credentials: Dict[str, str]) -> None:
"""
Store credentials for downstream use (e.g., POSIX identity,
audit context).
"""
self._credentials = credentials
[docs]
async def connect(self) -> ScopedFileBackend:
if not await self.verify_connection():
raise RuntimeError(
f"Local storage path '{self._base_path}' is not reachable."
)
if not await self.verify_access():
raise PermissionError(f"Access to '{self._base_path}' is denied.")
return ScopedFileBackend(
backend=LocalFileBackend(), base_path=self._base_path
)
[docs]
async def verify_connection(self) -> bool:
"""True if the directory exists."""
return await asyncio.to_thread(
DirectoryUtils.directory_exist, self._base_path
)
[docs]
async def verify_access(
self,
*,
user: Optional[str] = None,
permissions: Optional[int] = None,
) -> bool:
"""
Attempt mkdir / touch / rm to prove we have RW access.
Uses injected credentials if available (e.g. posix_user).
"""
effective_user = self._credentials.get("posix_user") or user
try:
# Ensure root directory exists (mkdir may be needed the first time)
await asyncio.to_thread(
self._ensure_dir,
user=effective_user,
permissions=permissions,
)
# Probe write / delete
test_file = os.path.join(
self._base_path, f".access_check_{os.getpid()}"
)
await asyncio.to_thread(
FileUtils.write_file,
file_path=test_file,
content="ok",
binary=False,
permissions=permissions,
user=effective_user,
)
await asyncio.to_thread(FileUtils.remove_file, test_file)
return True
except (DirectoryUtilsException, FileUtilsException):
return False
def _ensure_dir(
self, *, user: Optional[str], permissions: Optional[int]
) -> None:
if not DirectoryUtils.directory_exist(self._base_path):
DirectoryUtils.create_directory(
path=self._base_path,
permissions=permissions,
user=user,
)
@property
def base_path(self) -> str:
"""Absolute root directory this connector targets."""
return self._base_path