Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions autorca_core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from autorca_core.reasoning.loop import run_rca, RCARunResult
from autorca_core.logging import configure_logging, get_logger
from autorca_core.config import ThresholdConfig
from autorca_core.validation import IngestionLimits, ValidationError

__all__ = [
"Event",
Expand All @@ -28,4 +29,6 @@
"configure_logging",
"get_logger",
"ThresholdConfig",
"IngestionLimits",
"ValidationError",
]
60 changes: 53 additions & 7 deletions autorca_core/ingestion/logs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,18 @@
import re
from pathlib import Path
from typing import List, Optional, Dict, Any
from datetime import datetime
from datetime import datetime, timezone

from autorca_core.model.events import LogEvent, Severity
from autorca_core.logging import get_logger
from autorca_core.validation import (
IngestionLimits,
validate_path,
check_file_size,
check_line_length,
check_total_events,
sanitize_error_message,
)

logger = get_logger(__name__)

Expand All @@ -21,6 +29,7 @@
time_from: Optional[datetime] = None,
time_to: Optional[datetime] = None,
service_filter: Optional[str] = None,
limits: Optional[IngestionLimits] = None,
) -> List[LogEvent]:
"""
Load logs from a file or directory.
Expand All @@ -30,10 +39,14 @@
time_from: Start of time window (inclusive)
time_to: End of time window (inclusive)
service_filter: Only include logs from this service
limits: Optional ingestion limits for security

Returns:
List of LogEvent objects
"""
if limits is None:
limits = IngestionLimits()

source_path = Path(source)

if not source_path.exists():
Expand All @@ -42,13 +55,39 @@
events = []

if source_path.is_file():
events.extend(_load_log_file(source_path))
check_file_size(source_path, limits)
events.extend(_load_log_file(source_path, limits))
else:
# Load all .log, .jsonl, .txt files in directory
extensions = ['*.log', '*.jsonl', '*.txt']
<<<<<<< HEAD

Check failure on line 63 in autorca_core/ingestion/logs.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

Ruff (invalid-syntax)

autorca_core/ingestion/logs.py:63:7: invalid-syntax: Expected a statement

Check failure on line 63 in autorca_core/ingestion/logs.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

Ruff (invalid-syntax)

autorca_core/ingestion/logs.py:63:5: invalid-syntax: Expected a statement

Check failure on line 63 in autorca_core/ingestion/logs.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

Ruff (invalid-syntax)

autorca_core/ingestion/logs.py:63:3: invalid-syntax: Expected a statement

Check failure on line 63 in autorca_core/ingestion/logs.py

View workflow job for this annotation

GitHub Actions / Code Quality Checks

Ruff (invalid-syntax)

autorca_core/ingestion/logs.py:63:1: invalid-syntax: Expected a statement
for ext in extensions:
for file_path in source_path.glob(f"**/{ext}"):
events.extend(_load_log_file(file_path))
=======
file_count = 0
for ext in extensions:
for file_path in source_path.glob(f"**/{ext}"):
# Validate path to prevent traversal
validate_path(source_path, file_path)

# Check file count limit
file_count += 1
if file_count > limits.max_files_per_directory:
print(f"Warning: Reached file limit ({limits.max_files_per_directory}), skipping remaining files")
break

# Check file size
try:
check_file_size(file_path, limits)
events.extend(_load_log_file(file_path, limits))

# Check total event count
check_total_events(len(events), limits)
except Exception as e:
print(f"Warning: Skipping file {file_path.name}: {sanitize_error_message(e, file_path)}")
continue
>>>>>>> 0ac8e01 (security: add input validation and size limits for data ingestion)

# Apply filters
if time_from:
Expand All @@ -61,7 +100,7 @@
return sorted(events, key=lambda e: e.timestamp)


def _load_log_file(file_path: Path) -> List[LogEvent]:
def _load_log_file(file_path: Path, limits: IngestionLimits) -> List[LogEvent]:
"""Load a single log file."""
events = []

Expand All @@ -72,6 +111,9 @@
continue

try:
# Check line length
check_line_length(line, limits)

# Try JSON parsing first
event = _parse_json_log(line)
if event:
Expand All @@ -83,7 +125,11 @@
events.append(event)
except Exception as e:
# Log parsing errors are non-fatal
<<<<<<< HEAD
logger.warning(f"Failed to parse line {line_num} in {file_path}: {e}")
=======
print(f"Warning: Failed to parse line {line_num} in {file_path.name}: {sanitize_error_message(e)}")
>>>>>>> 0ac8e01 (security: add input validation and size limits for data ingestion)

return events

Expand All @@ -96,8 +142,8 @@
# Extract timestamp
timestamp_str = data.get('timestamp') or data.get('time') or data.get('@timestamp')
if not timestamp_str:
# Use current time as fallback
timestamp = datetime.utcnow()
# Use current time as fallback (timezone-aware)
timestamp = datetime.now(timezone.utc)
else:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))

Expand Down Expand Up @@ -151,7 +197,7 @@
try:
timestamp = datetime.fromisoformat(timestamp_str.replace('Z', '+00:00'))
except ValueError:
timestamp = datetime.utcnow()
timestamp = datetime.now(timezone.utc)

level = _parse_severity(level_str)

Expand All @@ -165,7 +211,7 @@

# If pattern doesn't match, create a basic log event
return LogEvent(
timestamp=datetime.utcnow(),
timestamp=datetime.now(timezone.utc),
service="unknown",
message=line,
level=Severity.INFO,
Expand Down
181 changes: 181 additions & 0 deletions autorca_core/validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
"""
Input validation and security controls for AutoRCA-Core.

Provides path validation, size limits, and sanitization to prevent security issues.
"""

import os
from pathlib import Path
from dataclasses import dataclass
from typing import Optional


@dataclass
class IngestionLimits:
    """
    Resource-exhaustion guards applied while ingesting data.

    Attributes:
        max_file_size_mb: Largest accepted file size, in megabytes
        max_total_events: Cap on the total number of events ingested
        max_line_length: Cap on the length of any single line
        max_files_per_directory: Cap on files processed from one directory
    """

    max_file_size_mb: float = 100.0
    max_total_events: int = 1_000_000
    max_line_length: int = 65536  # 64KB per line
    max_files_per_directory: int = 1000

    @classmethod
    def strict(cls) -> "IngestionLimits":
        """Create strict limits for untrusted data."""
        strict_settings = {
            "max_file_size_mb": 10.0,
            "max_total_events": 100_000,
            "max_line_length": 8192,
            "max_files_per_directory": 100,
        }
        return cls(**strict_settings)

    @classmethod
    def relaxed(cls) -> "IngestionLimits":
        """Create relaxed limits for trusted data."""
        relaxed_settings = {
            "max_file_size_mb": 500.0,
            "max_total_events": 10_000_000,
            "max_line_length": 131072,  # 128KB
            "max_files_per_directory": 10000,
        }
        return cls(**relaxed_settings)


class ValidationError(Exception):
    """Root of the validation exception hierarchy."""


class PathTraversalError(ValidationError):
    """Raised when a file path escapes its expected root directory."""


class FileSizeError(ValidationError):
    """Raised when a file is larger than the configured limit."""


class LineLengthError(ValidationError):
    """Raised when a single line is longer than the configured limit."""


def validate_path(source_path: Path, file_path: Path) -> bool:
    """
    Ensure file_path is contained within source_path (path-traversal guard).

    Both paths are resolved to absolute form (following symlinks) before the
    containment check, so ``..`` segments and symlinks cannot escape the root.

    Args:
        source_path: The expected root directory
        file_path: The file path to validate

    Returns:
        True if the path is safely contained within source_path
        (the function never returns False -- it raises instead)

    Raises:
        PathTraversalError: If file_path resolves outside source_path
    """
    source_resolved = source_path.resolve()
    file_resolved = file_path.resolve()

    try:
        # relative_to raises ValueError when file_resolved is not under
        # source_resolved -- that is exactly the traversal case.
        file_resolved.relative_to(source_resolved)
    except ValueError:
        # The ValueError is expected control flow, not an error cause
        # worth chaining into the traceback.
        raise PathTraversalError(
            f"Path traversal detected: {file_path} is outside {source_path}"
        ) from None
    return True


def check_file_size(file_path: Path, limits: "IngestionLimits") -> None:
    """
    Check that a file's on-disk size is within the configured limit.

    Args:
        file_path: Path to the file
        limits: Ingestion limits configuration

    Raises:
        FileSizeError: If the file exceeds limits.max_file_size_mb
        ValidationError: If the size cannot be determined (e.g. file missing
            or unreadable)
    """
    try:
        # Keep the try body minimal: only stat() can raise OSError here.
        size_bytes = file_path.stat().st_size
    except OSError as e:
        # Chain the original OSError so the root cause stays visible.
        raise ValidationError(f"Error checking file size: {e}") from e

    size_mb = size_bytes / (1024 * 1024)
    if size_mb > limits.max_file_size_mb:
        raise FileSizeError(
            f"File size {size_mb:.1f}MB exceeds limit of {limits.max_file_size_mb}MB"
        )


def check_line_length(line: str, limits: "IngestionLimits") -> None:
    """
    Check that a single line's length is within the configured limit.

    Note: the limit is applied to the character count of the decoded
    string (``len``), not the encoded byte length, so multi-byte UTF-8
    lines may occupy more bytes on disk than this check suggests.

    Args:
        line: The line to check
        limits: Ingestion limits configuration

    Raises:
        LineLengthError: If the line exceeds limits.max_line_length
    """
    length = len(line)
    if length > limits.max_line_length:
        raise LineLengthError(
            f"Line length {length} exceeds limit of {limits.max_line_length}"
        )


def sanitize_error_message(error: Exception, file_path: Optional[Path] = None) -> str:
    """
    Sanitize error messages to avoid leaking sensitive path information.

    Args:
        error: The exception to sanitize
        file_path: Optional file path to redact

    Returns:
        Sanitized error message
    """
    sanitized = str(error)

    # Redact the specific file's absolute path first, then any remaining
    # home-directory prefix.
    if file_path:
        resolved = str(file_path.resolve())
        sanitized = sanitized.replace(resolved, f"<file:{file_path.name}>")

    home_dir = os.path.expanduser("~")
    sanitized = sanitized.replace(home_dir, "~")

    return sanitized


def check_total_events(current_count: int, limits: "IngestionLimits") -> None:
    """
    Check that the ingested event count is still below the configured cap.

    Note: the check is inclusive -- reaching exactly
    limits.max_total_events already raises, so callers stop ingesting
    as soon as the cap is hit.

    Args:
        current_count: Current number of events
        limits: Ingestion limits configuration

    Raises:
        ValidationError: If current_count reaches or exceeds the limit
    """
    if current_count >= limits.max_total_events:
        # Message matches the >= semantics: the cap was reached, not
        # necessarily strictly exceeded.
        raise ValidationError(
            f"Event count {current_count} reached limit of {limits.max_total_events}"
        )

Loading