forked from OpenBMB/ChatDev
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexceptions.py
More file actions
executable file
·115 lines (82 loc) · 4.27 KB
/
exceptions.py
File metadata and controls
executable file
·115 lines (82 loc) · 4.27 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
"""Custom exceptions for the DevAll workflow system."""
from typing import Optional, Dict, Any
import json
class MACException(Exception):
"""Base exception for DevAll workflow system."""
def __init__(self, message: str, error_code: str = None, details: Dict[str, Any] = None):
super().__init__(message)
self.message = message
self.error_code = error_code or "GENERIC_ERROR"
self.details = details or {}
def to_dict(self) -> Dict[str, Any]:
"""Convert exception to dictionary format for JSON response."""
return {
"error_code": self.error_code,
"message": self.message,
"details": self.details
}
def to_json(self) -> str:
"""Convert exception to JSON string."""
return json.dumps(self.to_dict())
class ValidationError(MACException):
"""Raised when validation fails."""
def __init__(self, message: str, field: str = None, details: Dict[str, Any] = None):
super().__init__(message, "VALIDATION_ERROR", details or {})
if field:
self.details["field"] = field
class SecurityError(MACException):
"""Raised when a security violation occurs."""
def __init__(self, message: str, details: Dict[str, Any] = None):
super().__init__(message, "SECURITY_ERROR", details or {})
class ConfigurationError(MACException):
"""Raised when configuration is invalid or missing."""
def __init__(self, message: str, config_key: str = None, details: Dict[str, Any] = None):
super().__init__(message, "CONFIGURATION_ERROR", details or {})
if config_key:
self.details["config_key"] = config_key
class WorkflowExecutionError(MACException):
"""Raised when workflow execution fails."""
def __init__(self, message: str, workflow_id: str = None, node_id: str = None, details: Dict[str, Any] = None):
super().__init__(message, "WORKFLOW_EXECUTION_ERROR", details or {})
if workflow_id:
self.details["workflow_id"] = workflow_id
if node_id:
self.details["node_id"] = node_id
class WorkflowCancelledError(MACException):
"""Raised when a workflow execution is cancelled mid-flight."""
def __init__(self, message: str, workflow_id: str = None, details: Dict[str, Any] = None):
super().__init__(message, "WORKFLOW_CANCELLED", details or {})
if workflow_id:
self.details["workflow_id"] = workflow_id
class ResourceNotFoundError(MACException):
"""Raised when a requested resource is not found."""
def __init__(self, message: str, resource_type: str = None, resource_id: str = None, details: Dict[str, Any] = None):
super().__init__(message, "RESOURCE_NOT_FOUND", details or {})
if resource_type:
self.details["resource_type"] = resource_type
if resource_id:
self.details["resource_id"] = resource_id
class ResourceConflictError(MACException):
"""Raised when there's a conflict with an existing resource."""
def __init__(self, message: str, resource_type: str = None, resource_id: str = None, details: Dict[str, Any] = None):
super().__init__(message, "RESOURCE_CONFLICT", details or {})
if resource_type:
self.details["resource_type"] = resource_type
if resource_id:
self.details["resource_id"] = resource_id
class TimeoutError(MACException):
"""Raised when an operation times out."""
def __init__(self, message: str, operation: str = None, timeout_duration: float = None, details: Dict[str, Any] = None):
super().__init__(message, "TIMEOUT_ERROR", details or {})
if operation:
self.details["operation"] = operation
if timeout_duration is not None:
self.details["timeout_duration"] = timeout_duration
class ExternalServiceError(MACException):
"""Raised when an external service call fails."""
def __init__(self, message: str, service_name: str = None, status_code: int = None, details: Dict[str, Any] = None):
super().__init__(message, "EXTERNAL_SERVICE_ERROR", details or {})
if service_name:
self.details["service_name"] = service_name
if status_code is not None:
self.details["status_code"] = status_code