Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,114 @@ def _kv_list_to_dict(field) -> dict:
logger.info(f"fetch_services returning {len(services)} total unique services")
return services

def _get_grpc_host(self) -> str:
"""Derive the gRPC host (host:443) from the configured HTTP endpoint."""
endpoint_url = self.__endpoint
if endpoint_url.startswith('https://'):
endpoint_url = endpoint_url[8:]
elif endpoint_url.startswith('http://'):
endpoint_url = endpoint_url[7:]
domain = endpoint_url.split('/')[0]

region_map = {
'eu2': 'api.eu2.coralogix.com',
'eu1': 'api.eu1.coralogix.com',
'us2': 'api.us2.coralogix.com',
'us1': 'api.us1.coralogix.com',
'ap1': 'api.ap1.coralogix.com',
'ap2': 'api.ap2.coralogix.com',
'ap3': 'api.ap3.coralogix.com',
}
for region, host in region_map.items():
if region in domain.lower():
return f'{host}:443'

# Fall back to the domain itself
return domain if ':' in domain else f'{domain}:443'

def fetch_apm_services(self):
"""
Fetch all services from the Coralogix APM Service Catalog.

Calls:
com.coralogixapis.apm.services.v1.ApmServiceService/ListApmServices

Uses the grpcio Python library (no grpcurl binary needed). Protobuf
message types are defined inline so no generated stubs are required.

Returns:
list[dict]: Each entry contains id, name, type, workloads, technology,
and sloStatusCount as returned by the API.
"""
try:
import grpc
from google.protobuf import descriptor_pb2
from google.protobuf.message_factory import GetMessages
from google.protobuf import json_format

# --- Build inline proto descriptors ---
file_proto = descriptor_pb2.FileDescriptorProto()
file_proto.name = "coralogix_apm_services.proto"
file_proto.package = "com.coralogixapis.apm.services.v1"
file_proto.syntax = "proto3"

# message ApmService
svc = file_proto.message_type.add()
svc.name = "ApmService"
for num, fname in [(1, "id"), (2, "name"), (3, "type"), (5, "technology")]:
f = svc.field.add()
f.name = fname
f.number = num
f.type = 9 # TYPE_STRING
f.label = 1 # LABEL_OPTIONAL
f = svc.field.add()
f.name = "workloads"
f.number = 4
f.type = 9 # TYPE_STRING
f.label = 3 # LABEL_REPEATED

# message ListApmServicesRequest (empty)
req_msg = file_proto.message_type.add()
req_msg.name = "ListApmServicesRequest"

# message ListApmServicesResponse
resp_msg = file_proto.message_type.add()
resp_msg.name = "ListApmServicesResponse"
f = resp_msg.field.add()
f.name = "services"
f.number = 1
f.type = 11 # TYPE_MESSAGE
f.label = 3 # LABEL_REPEATED
f.type_name = ".com.coralogixapis.apm.services.v1.ApmService"

classes = GetMessages([file_proto])
pkg = "com.coralogixapis.apm.services.v1"
RequestClass = classes[f"{pkg}.ListApmServicesRequest"]
ResponseClass = classes[f"{pkg}.ListApmServicesResponse"]

# --- Make the gRPC call ---
grpc_host = self._get_grpc_host()
channel = grpc.secure_channel(grpc_host, grpc.ssl_channel_credentials())
method = channel.unary_unary(
'/com.coralogixapis.apm.services.v1.ApmServiceService/ListApmServices',
request_serializer=RequestClass.SerializeToString,
response_deserializer=ResponseClass.FromString,
)

response = method(
RequestClass(),
metadata=[('authorization', f'Bearer {self.__api_key}')],
timeout=30,
)

services = [json_format.MessageToDict(svc) for svc in response.services]
logger.info(f"Fetched {len(services)} APM services from Coralogix Service Catalog")
return services

except Exception as e:
logger.error(f"Exception fetching Coralogix APM services: {e}")
raise e

def fetch_alert_defs(self):
"""
Fetch all alert definition configurations from Coralogix using gRPC API.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,81 @@ def __init__(self):
"display_name": "Fetch Alert Definitions from Coralogix",
"category": "Alerts",
"form_fields": [],
}
},
Coralogix.TaskType.FETCH_SPANS: {
"executor": self.execute_fetch_spans,
"model_types": [],
"result_type": PlaybookTaskResultType.API_RESPONSE,
"display_name": "Fetch Spans from Coralogix",
"category": "Traces",
"form_fields": [
FormField(
key_name=StringValue(value="query"),
display_name=StringValue(value="Lucene Query"),
description=StringValue(value="Lucene query to filter spans (e.g., '*', 'serviceName:frontend', 'operationName:\"GET /api/users\" AND duration:>1000')"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.MULTILINE_FT,
is_optional=True,
),
FormField(
key_name=StringValue(value="from_time"),
display_name=StringValue(value="From Time"),
description=StringValue(value="Start time (e.g., 'now-1h', '2024-01-01T00:00:00Z')"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.TEXT_FT,
is_optional=True,
),
FormField(
key_name=StringValue(value="to_time"),
display_name=StringValue(value="To Time"),
description=StringValue(value="End time (e.g., 'now', '2024-01-01T01:00:00Z')"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.TEXT_FT,
is_optional=True,
),
FormField(
key_name=StringValue(value="limit"),
display_name=StringValue(value="Limit"),
description=StringValue(value="Maximum number of spans to return (default: 100)"),
data_type=LiteralType.LONG,
form_field_type=FormFieldType.TEXT_FT,
is_optional=True,
),
],
},
Coralogix.TaskType.FETCH_TRACE: {
"executor": self.execute_fetch_trace,
"model_types": [],
"result_type": PlaybookTaskResultType.API_RESPONSE,
"display_name": "Fetch Trace by ID from Coralogix",
"category": "Traces",
"form_fields": [
FormField(
key_name=StringValue(value="trace_id"),
display_name=StringValue(value="Trace ID"),
description=StringValue(value="The trace ID to fetch all spans for (e.g., 'abc123def456...')"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.TEXT_FT,
is_optional=False,
),
FormField(
key_name=StringValue(value="from_time"),
display_name=StringValue(value="From Time"),
description=StringValue(value="Start of search window (e.g., 'now-3h'). Default: now-3h"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.TEXT_FT,
is_optional=True,
),
FormField(
key_name=StringValue(value="to_time"),
display_name=StringValue(value="To Time"),
description=StringValue(value="End of search window (e.g., 'now'). Default: now"),
data_type=LiteralType.STRING,
form_field_type=FormFieldType.TEXT_FT,
is_optional=True,
),
],
},
}

self.connector_form_configs = [
Expand Down Expand Up @@ -624,6 +698,148 @@ def execute_fetch_logs(self, time_range: TimeRange, coralogix_task: Coralogix, c
metadata=metadata
)

def execute_fetch_spans(self, time_range: TimeRange, coralogix_task: Coralogix, coralogix_connector: ConnectorProto):
try:
self._validate_connector(coralogix_connector)
task = coralogix_task.fetch_spans

query = task.query.value if task.HasField("query") and task.query.value else "*"
from_time = task.from_time.value if task.HasField("from_time") and task.from_time.value else "now-1h"
to_time = task.to_time.value if task.HasField("to_time") and task.to_time.value else "now"
limit = task.limit.value if task.HasField("limit") and task.limit.value else 100

processor = self.get_connector_processor(coralogix_connector)

print(
f"Playbook Task Downstream Request: Type -> Coralogix Spans, Query -> {query}, "
f"From -> {from_time}, To -> {to_time}, Limit -> {limit}"
)

response = processor.execute_spans_query(query=query, from_time=from_time, to_time=to_time)

domain = self._extract_domain_from_connector(coralogix_connector)
time_params = self._get_coralogix_time_params(time_range)
metadata = self._create_metadata_with_coralogix_url(domain, "spans", {
"query": query, "from_time": from_time, "to_time": to_time, **time_params
})

if not response:
return PlaybookTaskResult(
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"No spans returned from Coralogix for query: {query}")),
source=self.source, metadata=metadata
)

normalized = self._normalize_logs_response(response)
if normalized is not None:
spans = normalized['results']
count = normalized.get('count', len(spans))
elif isinstance(response, dict) and 'results' in response:
spans = response['results']
count = response.get('count', len(spans))
else:
spans = response if isinstance(response, list) else []
count = len(spans)

try:
response_struct = dict_to_proto({'spans': spans, 'count': count}, Struct)
return PlaybookTaskResult(
source=self.source,
type=PlaybookTaskResultType.API_RESPONSE,
api_response=ApiResponseResult(response_body=response_struct),
metadata=metadata
)
except Exception:
return PlaybookTaskResult(
source=self.source,
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"Spans fetched. Found {count} spans. Response: {str(spans)}")),
metadata=metadata
)

except Exception as e:
logger.error(f"Error executing Coralogix fetch spans task: {e}")
domain = self._extract_domain_from_connector(coralogix_connector)
metadata = self._create_metadata_with_coralogix_url(domain, "spans", {})
return PlaybookTaskResult(
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"Error while executing Coralogix spans task: {e}")),
source=self.source, metadata=metadata
)

def execute_fetch_trace(self, time_range: TimeRange, coralogix_task: Coralogix, coralogix_connector: ConnectorProto):
try:
self._validate_connector(coralogix_connector)
task = coralogix_task.fetch_trace

trace_id = task.trace_id.value if task.HasField("trace_id") and task.trace_id.value else None
if not trace_id:
raise Exception("trace_id is required for FETCH_TRACE task")

from_time = task.from_time.value if task.HasField("from_time") and task.from_time.value else "now-3h"
to_time = task.to_time.value if task.HasField("to_time") and task.to_time.value else "now"

processor = self.get_connector_processor(coralogix_connector)

print(f"Playbook Task Downstream Request: Type -> Coralogix Trace, TraceId -> {trace_id}")

# Fetch all spans belonging to this trace
response = processor.execute_spans_query(
query=f"traceId:{trace_id}",
from_time=from_time,
to_time=to_time
)

domain = self._extract_domain_from_connector(coralogix_connector)
time_params = self._get_coralogix_time_params(time_range)
metadata = self._create_metadata_with_coralogix_url(domain, "spans", {
"query": f"traceId:{trace_id}", "from_time": from_time, "to_time": to_time, **time_params
})

if not response:
return PlaybookTaskResult(
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"No spans found for trace ID: {trace_id}")),
source=self.source, metadata=metadata
)

normalized = self._normalize_logs_response(response)
if normalized is not None:
spans = normalized['results']
count = normalized.get('count', len(spans))
elif isinstance(response, dict) and 'results' in response:
spans = response['results']
count = response.get('count', len(spans))
else:
spans = response if isinstance(response, list) else []
count = len(spans)

try:
response_struct = dict_to_proto({'trace_id': trace_id, 'spans': spans, 'count': count}, Struct)
return PlaybookTaskResult(
source=self.source,
type=PlaybookTaskResultType.API_RESPONSE,
api_response=ApiResponseResult(response_body=response_struct),
metadata=metadata
)
except Exception:
return PlaybookTaskResult(
source=self.source,
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"Trace fetched. Found {count} spans for trace {trace_id}. Response: {str(spans)}")),
metadata=metadata
)

except Exception as e:
logger.error(f"Error executing Coralogix fetch trace task: {e}")
domain = self._extract_domain_from_connector(coralogix_connector)
metadata = self._create_metadata_with_coralogix_url(domain, "spans", {})
return PlaybookTaskResult(
type=PlaybookTaskResultType.TEXT,
text=TextResult(output=StringValue(value=f"Error while executing Coralogix trace task: {e}")),
source=self.source, metadata=metadata
)

def execute_fetch_metrics(self, time_range: TimeRange, coralogix_task: Coralogix, coralogix_connector: ConnectorProto):
"""
Execute the fetch metrics task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,39 @@ def extract_services(self):

return model_data

@log_function_call
def extract_apm_services(self):
"""
Extract services from the Coralogix APM Service Catalog (gRPC).

Each service is keyed by its name and the full API response is stored
as-is so callers have access to id, type, workloads, technology, and
sloStatusCount.

Returns:
dict: Mapping of service name -> raw service dict from the API.
"""
model_type = SourceModelType.CORALOGIX_APM_SERVICE
model_data = {}
try:
services = self.__coralogix_api_processor.fetch_apm_services()
if not services:
return model_data

for service in services:
name = service.get("name")
if not name:
continue
model_data[name] = service

if model_data:
self.create_or_update_model_metadata(model_type, model_data)
logger.info(f"Extracted {len(model_data)} Coralogix APM services")
except Exception as e:
logger.error(f"Error extracting Coralogix APM services: {e}")

return model_data

@log_function_call
def extract_index_mappings(self, index_pattern="*"):
"""
Expand Down
Loading