Skip to content
63 changes: 37 additions & 26 deletions src/secops/chronicle/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@
#
"""Parser management functionality for Chronicle."""

from typing import Dict, Any, List, Optional
from secops.exceptions import APIError
import base64
from typing import Any, Dict, List, Optional, Union

from secops.exceptions import APIError

# Constants for size limits
MAX_LOG_SIZE = 10 * 1024 * 1024 # 10MB per log
Expand All @@ -26,7 +26,9 @@


def activate_parser(
client, log_type: str, id: str # pylint: disable=redefined-builtin
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
) -> Dict[str, Any]:
"""Activate a custom parser.

Expand All @@ -42,8 +44,8 @@ def activate_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}:activate"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}:activate"
)
body = {}
response = client.session.post(url, json=body)
Expand All @@ -55,7 +57,9 @@ def activate_parser(


def activate_release_candidate_parser(
client, log_type: str, id: str # pylint: disable=redefined-builtin
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
) -> Dict[str, Any]:
"""Activate the release candidate parser making it live for that customer.

Expand All @@ -71,8 +75,8 @@ def activate_release_candidate_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}:activateReleaseCandidateParser"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}:activateReleaseCandidateParser"
)
body = {}
response = client.session.post(url, json=body)
Expand All @@ -84,7 +88,9 @@ def activate_release_candidate_parser(


def copy_parser(
client, log_type: str, id: str # pylint: disable=redefined-builtin
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
) -> Dict[str, Any]:
"""Makes a copy of a prebuilt parser.

Expand All @@ -100,8 +106,8 @@ def copy_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}:copy"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}:copy"
)
body = {}
response = client.session.post(url, json=body)
Expand All @@ -113,7 +119,7 @@ def copy_parser(


def create_parser(
client,
client: "ChronicleClient",
log_type: str,
parser_code: str,
validated_on_empty_logs: bool = True,
Expand Down Expand Up @@ -148,7 +154,9 @@ def create_parser(


def deactivate_parser(
client, log_type: str, id: str # pylint: disable=redefined-builtin
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
) -> Dict[str, Any]:
"""Deactivate a custom parser.

Expand All @@ -164,8 +172,8 @@ def deactivate_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}:deactivate"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}:deactivate"
)
body = {}
response = client.session.post(url, json=body)
Expand All @@ -177,7 +185,7 @@ def deactivate_parser(


def delete_parser(
client,
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
force: bool = False,
Expand All @@ -197,8 +205,8 @@ def delete_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}"
)
params = {"force": force}
response = client.session.delete(url, params=params)
Expand All @@ -210,7 +218,9 @@ def delete_parser(


def get_parser(
client, log_type: str, id: str # pylint: disable=redefined-builtin
client: "ChronicleClient",
log_type: str,
id: str, # pylint: disable=redefined-builtin
) -> Dict[str, Any]:
"""Get a Parser by ID.

Expand All @@ -226,8 +236,8 @@ def get_parser(
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}"
f"/parsers/{id}"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers/{id}"
)
response = client.session.get(url)

Expand All @@ -238,10 +248,10 @@ def get_parser(


def list_parsers(
client,
client: "ChronicleClient",
log_type: str = "-",
page_size: int = 100,
page_token: str = None,
page_token: Optional[Union[str, None]] = None,
filter: str = None, # pylint: disable=redefined-builtin
) -> List[Any]:
"""List parsers.
Expand Down Expand Up @@ -284,8 +294,8 @@ def list_parsers(
if "parsers" in data:
parsers.extend(data["parsers"])

if "next_page_token" in data:
params["pageToken"] = data["next_page_token"]
if "nextPageToken" in data:
page_token = data["nextPageToken"]
else:
more = False

Expand Down Expand Up @@ -379,7 +389,8 @@ def run_parser(

# Build request
url = (
f"{client.base_url}/{client.instance_id}/logTypes/{log_type}:runParser"
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}:runParser"
)

parser = {
Expand Down
Loading