Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions src/connectors/aws_s3/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from session_manager import User
from utils.logging_config import get_logger

from .auth import create_s3_resource
from .auth import create_s3_resource, verify_s3_credentials
from .models import S3ConfigureBody
from .support import build_s3_config

Expand Down Expand Up @@ -68,8 +68,7 @@ async def s3_configure(

# Test credentials
try:
s3 = create_s3_resource(conn_config)
list(s3.buckets.all())
verify_s3_credentials(conn_config)
except Exception:
logger.exception("Failed to connect to S3 during credential test.")
return JSONResponse(
Expand Down
46 changes: 46 additions & 0 deletions src/connectors/aws_s3/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,3 +88,49 @@ def create_s3_client(config: Dict[str, Any]):
kwargs = _build_boto3_kwargs(creds)
logger.debug("Creating S3 client with HMAC authentication (boto3)")
return boto3.client("s3", **kwargs)


def verify_s3_credentials(config: Dict[str, Any]) -> None:
    """Probe AWS with the least-privileged call that can validate *config*'s credentials.

    Strategy:

    - ``bucket_names`` present: issue ``HeadBucket`` against each bucket, which
      needs only ``s3:ListBucket`` on that bucket.
    - ``bucket_names`` absent/empty: issue ``ListBuckets`` (``s3:ListAllMyBuckets``).
      ``AccessDenied`` here is swallowed — the credentials are real, the caller
      just has a bucket-scoped policy and no buckets configured yet. Every other
      error code (e.g. ``InvalidAccessKeyId``) is re-raised as a genuine failure.

    Raises:
        ValueError: If credentials cannot be resolved from *config* or env vars.
        botocore.exceptions.ClientError: If the credentials are invalid or the
            bucket(s) are inaccessible (except the ``AccessDenied`` case above).
        ImportError: If boto3/botocore is not installed.
    """
    try:
        import botocore.exceptions
    except ImportError as exc:
        raise ImportError(
            "botocore is required for the S3 connector. "
            "Install it with: pip install boto3"
        ) from exc

    client = create_s3_client(config)
    configured_buckets: list = config.get("bucket_names") or []

    # Bucket-scoped probe: one HeadBucket per configured bucket; any failure propagates.
    if configured_buckets:
        for bucket_name in configured_buckets:
            client.head_bucket(Bucket=bucket_name)
        return

    # Account-wide probe; AccessDenied alone is tolerated (see docstring).
    try:
        client.list_buckets()
    except botocore.exceptions.ClientError as exc:
        code = exc.response.get("Error", {}).get("Code", "")
        if code != "AccessDenied":
            raise
        logger.warning(
            "S3 credentials valid but lack s3:ListAllMyBuckets. "
            "Proceeding — configure bucket_names to enable auto-discovery."
        )
16 changes: 10 additions & 6 deletions src/connectors/aws_s3/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from connectors.base import BaseConnector, ConnectorDocument, DocumentACL
from utils.logging_config import get_logger

from .auth import create_s3_client, create_s3_resource
from .auth import create_s3_client, create_s3_resource, verify_s3_credentials

logger = get_logger(__name__)

Expand Down Expand Up @@ -98,15 +98,19 @@ def _get_client(self):
# ------------------------------------------------------------------

async def authenticate(self) -> bool:
"""Validate credentials by listing accessible buckets."""
"""Validate credentials using the minimal required AWS permission.

Delegates to ``verify_s3_credentials`` which probes ``HeadBucket`` when
bucket names are configured (requires only ``s3:ListBucket`` per bucket)
or ``ListBuckets`` otherwise (gracefully treats ``AccessDenied`` as valid).
"""
try:
resource = self._get_resource()
list(resource.buckets.all())
verify_s3_credentials(self.config)
self._authenticated = True
logger.debug(f"S3 authenticated for connection {self.connection_id}")
logger.debug("S3 authenticated for connection %s", self.connection_id)
return True
except Exception as exc:
logger.warning(f"S3 authentication failed: {exc}")
logger.warning("S3 authentication failed: %s", exc)
self._authenticated = False
return False

Expand Down
199 changes: 199 additions & 0 deletions tests/unit/test_aws_s3_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""Unit tests for verify_s3_credentials() and S3Connector.authenticate()."""
import pytest
from unittest.mock import MagicMock, patch, call


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _client_error(code: str):
    """Build a botocore ClientError carrying *code* as its error code."""
    import botocore.exceptions

    error_response = {"Error": {"Code": code, "Message": code}}
    return botocore.exceptions.ClientError(error_response, "operation")


# ---------------------------------------------------------------------------
# verify_s3_credentials — bucket_names configured
# ---------------------------------------------------------------------------

class TestVerifyWithBuckets:
    """verify_s3_credentials() when bucket_names are configured (HeadBucket path)."""

    def _make_client(self):
        # Stub S3 client whose head_bucket succeeds by default.
        mock_client = MagicMock()
        mock_client.head_bucket.return_value = {}
        return mock_client

    def test_success_calls_head_bucket_for_each_bucket(self):
        from connectors.aws_s3.auth import verify_s3_credentials

        config = {
            "access_key": "AKIA_FAKE",
            "secret_key": "fake_secret",
            "bucket_names": ["bucket-a", "bucket-b"],
        }
        mock_client = self._make_client()

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            verify_s3_credentials(config)

        # Exactly one HeadBucket probe per configured bucket.
        assert mock_client.head_bucket.call_count == 2
        mock_client.head_bucket.assert_any_call(Bucket="bucket-a")
        mock_client.head_bucket.assert_any_call(Bucket="bucket-b")

    def test_head_bucket_403_propagates(self):
        import botocore.exceptions

        from connectors.aws_s3.auth import verify_s3_credentials

        config = {
            "access_key": "AKIA_FAKE",
            "secret_key": "fake_secret",
            "bucket_names": ["restricted-bucket"],
        }
        mock_client = self._make_client()
        mock_client.head_bucket.side_effect = _client_error("403")

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            # Tightened from bare Exception so an unrelated bug (TypeError,
            # AttributeError, ...) in the implementation cannot pass this test.
            with pytest.raises(botocore.exceptions.ClientError):
                verify_s3_credentials(config)

    def test_head_bucket_404_propagates(self):
        import botocore.exceptions

        from connectors.aws_s3.auth import verify_s3_credentials

        config = {
            "access_key": "AKIA_FAKE",
            "secret_key": "fake_secret",
            "bucket_names": ["nonexistent-bucket"],
        }
        mock_client = self._make_client()
        # NoSuchBucket is the error code S3 pairs with an HTTP 404 on HeadBucket.
        mock_client.head_bucket.side_effect = _client_error("NoSuchBucket")

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            with pytest.raises(botocore.exceptions.ClientError):
                verify_s3_credentials(config)


# ---------------------------------------------------------------------------
# verify_s3_credentials — no bucket_names
# ---------------------------------------------------------------------------

class TestVerifyWithoutBuckets:
    """verify_s3_credentials() without bucket_names (ListBuckets path)."""

    def _make_client(self):
        # Stub S3 client whose list_buckets succeeds with an empty account.
        mock_client = MagicMock()
        mock_client.list_buckets.return_value = {"Buckets": []}
        return mock_client

    def test_success_calls_list_buckets(self):
        from connectors.aws_s3.auth import verify_s3_credentials

        config = {"access_key": "AKIA_FAKE", "secret_key": "fake_secret"}
        mock_client = self._make_client()

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            verify_s3_credentials(config)

        mock_client.list_buckets.assert_called_once()

    def test_access_denied_does_not_raise(self):
        """Bucket-scoped IAM users receive AccessDenied on ListBuckets — treat as valid creds."""
        from connectors.aws_s3.auth import verify_s3_credentials

        config = {"access_key": "AKIA_FAKE", "secret_key": "fake_secret"}
        mock_client = self._make_client()
        mock_client.list_buckets.side_effect = _client_error("AccessDenied")

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            # Should not raise
            verify_s3_credentials(config)

    def test_invalid_access_key_propagates(self):
        import botocore.exceptions

        from connectors.aws_s3.auth import verify_s3_credentials

        config = {"access_key": "INVALID", "secret_key": "INVALID"}
        mock_client = self._make_client()
        mock_client.list_buckets.side_effect = _client_error("InvalidAccessKeyId")

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            # Tightened from bare Exception so an unrelated bug in the
            # implementation cannot pass this test.
            with pytest.raises(botocore.exceptions.ClientError):
                verify_s3_credentials(config)

    def test_signature_mismatch_propagates(self):
        import botocore.exceptions

        from connectors.aws_s3.auth import verify_s3_credentials

        config = {"access_key": "AKIA_FAKE", "secret_key": "wrong_secret"}
        mock_client = self._make_client()
        mock_client.list_buckets.side_effect = _client_error("SignatureDoesNotMatch")

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            with pytest.raises(botocore.exceptions.ClientError):
                verify_s3_credentials(config)

    def test_empty_bucket_names_list_treated_as_no_buckets(self):
        """An explicit empty list should follow the no-buckets path."""
        from connectors.aws_s3.auth import verify_s3_credentials

        config = {
            "access_key": "AKIA_FAKE",
            "secret_key": "fake_secret",
            "bucket_names": [],
        }
        mock_client = self._make_client()

        with patch("connectors.aws_s3.auth.create_s3_client", return_value=mock_client):
            verify_s3_credentials(config)

        mock_client.list_buckets.assert_called_once()
        mock_client.head_bucket.assert_not_called()


# ---------------------------------------------------------------------------
# S3Connector.authenticate()
# ---------------------------------------------------------------------------

class TestS3ConnectorAuthenticate:
    """S3Connector.authenticate() — success, failure, and AccessDenied paths."""

    def _make_connector(self, bucket_names=None):
        from connectors.aws_s3.connector import S3Connector

        base_config = {
            "access_key": "AKIA_FAKE",
            "secret_key": "fake_secret",
        }
        if bucket_names is not None:
            base_config["bucket_names"] = bucket_names
        return S3Connector(base_config)

    @pytest.mark.asyncio
    async def test_returns_true_on_success(self):
        connector = self._make_connector(bucket_names=["my-bucket"])
        verify_target = "connectors.aws_s3.connector.verify_s3_credentials"

        with patch(verify_target):
            outcome = await connector.authenticate()

        assert outcome is True
        assert connector._authenticated is True

    @pytest.mark.asyncio
    async def test_returns_false_on_failure(self):
        connector = self._make_connector(bucket_names=["my-bucket"])
        verify_target = "connectors.aws_s3.connector.verify_s3_credentials"

        with patch(verify_target, side_effect=_client_error("403")):
            outcome = await connector.authenticate()

        assert outcome is False
        assert connector._authenticated is False

    @pytest.mark.asyncio
    async def test_returns_true_when_access_denied_on_list_buckets(self):
        """AccessDenied on list_buckets (no buckets configured) should still auth successfully."""
        # No bucket_names configured on purpose.
        connector = self._make_connector()
        verify_target = "connectors.aws_s3.connector.verify_s3_credentials"

        # verify_s3_credentials absorbs AccessDenied and returns None (no raise)
        with patch(verify_target, return_value=None):
            outcome = await connector.authenticate()

        assert outcome is True
Loading