diff --git a/transformers/action_mapper.py b/transformers/action_mapper.py
new file mode 100644
index 00000000..fc6cb9bc
--- /dev/null
+++ b/transformers/action_mapper.py
@@ -0,0 +1,45 @@
+"""Action mapping transformer.
+
+This module defines a transformer responsible for mapping action values
+between vendor-specific representations and the universal data model.
+"""
+
+from typing import Any
+from typing import Dict
+
+from .base_transformer import BaseTransformer
+
+
+class ActionMapper(BaseTransformer):
+    """Map action values between vendor and universal models.
+
+    This transformer replaces the ``action`` field of an item using a
+    predefined mapping dictionary. If the action is not found in the
+    mapping, it is left unchanged.
+    """
+
+    def __init__(self, action_map: Dict[str, str]) -> None:
+        """Initialize the ActionMapper.
+
+        Args:
+            action_map: A dictionary mapping source action values to
+                destination action values.
+        """
+        self.action_map = action_map
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform an item's action field using the action mapping.
+
+        Args:
+            item: A dictionary representing a single rule or configuration
+                entry containing an ``action`` field.
+
+        Returns:
+            The transformed item with its ``action`` field mapped according
+            to the configured action map.
+        """
+        action = item.get("action")
+        if action in self.action_map:
+            item["action"] = self.action_map[action]
+
+        return item
diff --git a/transformers/base_transformer.py b/transformers/base_transformer.py
new file mode 100644
index 00000000..04bd40b4
--- /dev/null
+++ b/transformers/base_transformer.py
@@ -0,0 +1,32 @@
+"""Base transformer definition.
+
+This module defines the abstract base class used by all transformers
+in the transformation pipeline.
+"""
+
+from abc import ABC
+from abc import abstractmethod
+from typing import Any
+from typing import Dict
+
+
+class BaseTransformer(ABC):
+    """Define the interface for all transformers.
+
+    All concrete transformers must implement the ``transform`` method,
+    which takes a single dictionary item and returns a transformed
+    dictionary.
+    """
+
+    @abstractmethod
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform a single dictionary item.
+
+        Args:
+            item: A dictionary representing a single configuration or
+                URL entry.
+
+        Returns:
+            A transformed dictionary.
+        """
+        raise NotImplementedError
diff --git a/transformers/category_mapper.py b/transformers/category_mapper.py
new file mode 100644
index 00000000..3010f14b
--- /dev/null
+++ b/transformers/category_mapper.py
@@ -0,0 +1,46 @@
+"""Category mapping transformer.
+
+This module defines a transformer responsible for mapping category
+identifiers between vendor-specific representations and the universal
+data model.
+"""
+
+from typing import Any
+from typing import Dict
+
+from .base_transformer import BaseTransformer
+
+
+class CategoryMapper(BaseTransformer):
+    """Map category identifiers between vendor and universal models.
+
+    This transformer replaces the ``category_id`` field of an item using
+    a predefined mapping dictionary. If the category is not found in the
+    mapping, it is left unchanged.
+    """
+
+    def __init__(self, category_map: Dict[str, str]) -> None:
+        """Initialize the CategoryMapper.
+
+        Args:
+            category_map: A dictionary mapping source category identifiers
+                to destination category identifiers.
+        """
+        self.category_map = category_map
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform an item's category identifier using the category map.
+
+        Args:
+            item: A dictionary representing a single rule or configuration
+                entry containing a ``category_id`` field.
+
+        Returns:
+            The transformed item with its ``category_id`` field mapped
+            according to the configured category map.
+        """
+        category_id = item.get("category_id")
+        if category_id in self.category_map:
+            item["category_id"] = self.category_map[category_id]
+
+        return item
diff --git a/transformers/metadata_enricher.py b/transformers/metadata_enricher.py
new file mode 100644
index 00000000..95669776
--- /dev/null
+++ b/transformers/metadata_enricher.py
@@ -0,0 +1,53 @@
+"""Metadata enrichment transformer.
+
+This module defines a transformer that adds vendor information and
+metadata timestamps to each item in the transformation pipeline.
+"""
+
+from datetime import datetime
+from datetime import timezone
+from typing import Any
+from typing import Dict
+
+from .base_transformer import BaseTransformer
+
+
+class MetadataEnricher(BaseTransformer):
+    """Enrich items with vendor and metadata information.
+
+    This transformer adds a ``vendor`` field and a ``metadata`` dictionary
+    containing a ``processed_at`` timestamp to each item.
+    """
+
+    def __init__(self, vendor: str) -> None:
+        """Initialize the MetadataEnricher.
+
+        Args:
+            vendor: The vendor name to attach to each item.
+        """
+        self.vendor = vendor
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Add vendor and metadata information to an item.
+
+        Args:
+            item: A dictionary representing a single configuration or URL entry.
+
+        Returns:
+            The transformed dictionary containing the ``vendor`` field and
+            a ``metadata.processed_at`` timestamp (timezone-aware UTC,
+            ISO 8601 format).
+        """
+        item["vendor"] = self.vendor
+
+        # A missing or ``None`` metadata value cannot take the nested
+        # assignment below, so start from an empty dictionary.
+        if item.get("metadata") is None:
+            item["metadata"] = {}
+
+        # ``datetime.utcnow()`` is deprecated since Python 3.12 and returns
+        # a naive value; record an explicit UTC offset instead.
+        processed_at = datetime.now(timezone.utc).isoformat()
+        item["metadata"]["processed_at"] = processed_at
+
+        return item
diff --git a/transformers/pattern_normalizer.py b/transformers/pattern_normalizer.py
new file mode 100644
index 00000000..f88212d0
--- /dev/null
+++ b/transformers/pattern_normalizer.py
@@ -0,0 +1,30 @@
+"""Pattern normalization transformer.
+
+This module defines a generic pattern normalizer that ensures each item
+has a ``pattern`` field. Currently, this transformer acts as a pass-through.
+"""
+
+from typing import Any
+from typing import Dict
+
+from .base_transformer import BaseTransformer
+
+
+class PatternNormalizer(BaseTransformer):
+    """Normalize or enforce the presence of a pattern field in items.
+
+    This transformer guarantees that each dictionary item contains a
+    ``pattern`` key. If the key is missing, it is initialized to an empty string.
+    """
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Ensure the item has a pattern field.
+
+        Args:
+            item: A dictionary representing a single configuration or URL entry.
+
+        Returns:
+            The same dictionary with a ``pattern`` key ensured.
+        """
+        item["pattern"] = item.get("pattern", "")
+        return item
diff --git a/transformers/pipelines.py b/transformers/pipelines.py
new file mode 100644
index 00000000..1ec2647d
--- /dev/null
+++ b/transformers/pipelines.py
@@ -0,0 +1,39 @@
+"""Pipeline helper functions for URL transformations.
+
+This module provides utilities to apply a sequence of transformers
+to vendor configuration items, producing universal model dictionaries.
+"""
+
+from typing import Any
+from typing import Dict
+from typing import List
+
+from transformers.base_transformer import BaseTransformer
+
+
+def apply_transformers(
+    items: List[Dict[str, Any]],
+    transformers: List[BaseTransformer]
+) -> List[Dict[str, Any]]:
+    """Apply a sequence of transformers to a list of items.
+
+    Each item in the input list is processed sequentially by all
+    transformers in the given order.
+
+    Args:
+        items: A list of dictionaries representing vendor configuration
+            entries.
+        transformers: An ordered list of transformer instances that
+            implement the `transform` method.
+
+    Returns:
+        A list of transformed dictionaries.
+    """
+    result: List[Dict[str, Any]] = []
+
+    for item in items:
+        for transformer in transformers:
+            item = transformer.transform(item)
+        result.append(item)
+
+    return result
diff --git a/transformers/type_mapper.py b/transformers/type_mapper.py
new file mode 100644
index 00000000..79a6afd8
--- /dev/null
+++ b/transformers/type_mapper.py
@@ -0,0 +1,46 @@
+"""Type mapping transformer.
+
+This module defines a transformer that maps item types (e.g., literal,
+wildcard, regex, substring) between vendor-specific representations
+and the universal data model.
+"""
+
+from typing import Any
+from typing import Dict
+
+from .base_transformer import BaseTransformer
+
+
+class TypeMapper(BaseTransformer):
+    """Map type values between vendor and universal models.
+
+    This transformer replaces the ``type`` field of an item using a
+    predefined mapping dictionary. If the type is not found in the
+    mapping, it is left unchanged.
+    """
+
+    def __init__(self, type_map: Dict[str, str]) -> None:
+        """Initialize the TypeMapper.
+
+        Args:
+            type_map: A dictionary mapping source type values to
+                destination type values.
+        """
+        self.type_map = type_map
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform an item's type field using the type mapping.
+
+        Args:
+            item: A dictionary representing a single rule or configuration
+                entry containing a ``type`` field.
+
+        Returns:
+            The transformed item with its ``type`` field mapped according
+            to the configured type map.
+        """
+        item_type = item.get("type")
+        if item_type in self.type_map:
+            item["type"] = self.type_map[item_type]
+
+        return item
diff --git a/transformers/vendors/fortinet_transformer.py b/transformers/vendors/fortinet_transformer.py
new file mode 100644
index 00000000..48804404
--- /dev/null
+++ b/transformers/vendors/fortinet_transformer.py
@@ -0,0 +1,54 @@
+"""Fortinet transformation pipelines.
+
+This module defines action, category, and type mappings for Fortinet
+URL filtering configurations, as well as transformation pipelines
+to convert between Fortinet-specific and universal data models.
+"""
+
+from transformers.action_mapper import ActionMapper
+from transformers.base_transformer import BaseTransformer
+from transformers.category_mapper import CategoryMapper
+from transformers.metadata_enricher import MetadataEnricher
+from transformers.pattern_normalizer import PatternNormalizer
+from transformers.type_mapper import TypeMapper
+
+# ---------------- FORTINET MAPPINGS ----------------
+
+FORTINET_ACTION_MAP = {
+    "block": "block",
+    "allow": "allow",
+    "monitor": "monitor",
+}
+
+FORTINET_CATEGORY_MAP = {
+    "3": "malware",
+    "4": "phishing",
+    "5": "gambling",
+    "default": "uncategorized",
+}
+
+FORTINET_TYPE_MAP = {
+    "simple": "literal",
+    "wildcard": "wildcard",
+    "regex": "regex",
+    "substring": "substring",
+}
+
+
+# ---------------- FORTINET PIPELINES ----------------
+
+VENDOR_TO_UNIVERSAL_PIPELINES = [
+    ActionMapper(FORTINET_ACTION_MAP),
+    PatternNormalizer(),
+    TypeMapper(FORTINET_TYPE_MAP),
+    CategoryMapper(FORTINET_CATEGORY_MAP),
+    MetadataEnricher("fortinet"),
+]
+
+UNIVERSAL_TO_VENDOR_PIPELINES = [
+    ActionMapper({value: key for key, value in FORTINET_ACTION_MAP.items()}),
+    PatternNormalizer(),
+    TypeMapper({value: key for key, value in FORTINET_TYPE_MAP.items()}),
+    CategoryMapper({value: key for key, value in FORTINET_CATEGORY_MAP.items()}),
+    MetadataEnricher("fortinet"),
+]
diff --git a/transformers/vendors/netskope_transformer.py b/transformers/vendors/netskope_transformer.py
new file mode 100644
index 00000000..c4e2e13a
--- /dev/null
+++ b/transformers/vendors/netskope_transformer.py
@@ -0,0 +1,237 @@
+"""Netskope transformation pipelines and pattern normalization.
+
+This module defines transformers and mappings required to convert
+Netskope URL list configurations to and from the universal data model.
+"""
+
+import re
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+
+from transformers.action_mapper import ActionMapper
+from transformers.base_transformer import BaseTransformer
+from transformers.category_mapper import CategoryMapper
+from transformers.metadata_enricher import MetadataEnricher
+from transformers.pattern_normalizer import PatternNormalizer
+from transformers.type_mapper import TypeMapper
+
+
+class NetskopePatternNormalizer(BaseTransformer):
+    """Normalize Netskope URL patterns for vendor compatibility.
+
+    This transformer converts universal URL patterns into Netskope-
+    compatible formats:
+
+    - ``literal`` / ``exact`` patterns are preserved.
+    - ``wildcard`` patterns are converted into regex.
+    - ``regex`` patterns are passed through unchanged unless they start
+      with the ``*.`` wildcard prefix, in which case they are converted
+      like wildcards (the universal-to-vendor pipeline maps ``wildcard``
+      to ``regex`` before this transformer runs).
+    """
+
+    def wildcard_to_regex(self, pattern: str) -> str:
+        r"""Convert a wildcard domain pattern to a regex.
+
+        Example:
+            ``*.example.com`` → ``^([^.]+\.)*example\.com$``
+
+        Args:
+            pattern: A wildcard URL pattern.
+
+        Returns:
+            A regex representation of the wildcard pattern, or the
+            pattern unchanged when it does not start with ``*.``.
+        """
+        if pattern.startswith("*."):
+            domain = pattern[2:].replace(".", r"\.")
+            return rf"^([^.]+\.)*{domain}$"
+
+        return pattern
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform a URL item into a Netskope-compatible pattern.
+
+        Args:
+            item: A universal URL dictionary.
+
+        Returns:
+            The transformed dictionary with Netskope pattern semantics.
+        """
+        raw_pattern = item.get("pattern", "")
+        universal_type = item.get("type", "literal")
+
+        if universal_type in ("literal", "exact"):
+            item["pattern"] = raw_pattern
+            item["netskope_type"] = "exact"
+
+        elif universal_type in ("wildcard", "regex"):
+            item["pattern"] = self.wildcard_to_regex(raw_pattern)
+            item["netskope_type"] = "regex"
+
+        else:
+            item["pattern"] = raw_pattern
+            item["netskope_type"] = "exact"
+
+        return item
+
+    def transform_list(
+        self,
+        items: List[Dict[str, Any]],
+    ) -> List[Dict[str, Any]]:
+        """Transform a list of URL items.
+
+        Args:
+            items: A list of universal URL dictionaries.
+
+        Returns:
+            A list of Netskope-compatible URL dictionaries.
+        """
+        return [self.transform(item) for item in items]
+
+
+class NetskopePatternDenormalizer(BaseTransformer):
+    """Convert Netskope patterns back to universal model patterns."""
+
+    # Characters whose presence marks a pattern as a regex.
+    _REGEX_MARKERS = ("^", "$", "(", ")", "[", "]", "+", "?", "|", "{", "}")
+
+    # Exact shape emitted by ``NetskopePatternNormalizer.wildcard_to_regex``:
+    # the literal prefix ``^([^.]+\.)*``, the escaped domain, then ``$``.
+    _WILDCARD_REGEX = re.compile(r"\^\(\[\^\.\]\+\\\.\)\*(.+)\$")
+
+    def regex_to_wildcard(self, pattern: str) -> Optional[str]:
+        r"""Convert a Netskope regex pattern back to wildcard format.
+
+        Example:
+            ``^([^.]+\.)*example\.com$`` → ``*.example.com``
+
+        Args:
+            pattern: A Netskope regex pattern.
+
+        Returns:
+            A wildcard pattern if conversion is possible, otherwise ``None``.
+        """
+        match = self._WILDCARD_REGEX.fullmatch(pattern)
+        if match is None:
+            return None
+
+        # Every label is separated by an escaped dot.  Unescape all of
+        # them (not only the one before the TLD) so multi-label domains
+        # such as ``sub\.example\.com`` round-trip correctly.
+        labels = match.group(1).split(r"\.")
+        forbidden = self._REGEX_MARKERS + (".", "*", "\\")
+        for label in labels:
+            # Anything beyond plain text means this is a real regex and
+            # not a wildcard produced by the normalizer.
+            if not label or any(char in label for char in forbidden):
+                return None
+
+        return "*." + ".".join(labels)
+
+    def is_regex(self, pattern: str) -> bool:
+        """Determine whether a pattern contains regex syntax.
+
+        Args:
+            pattern: A URL pattern string.
+
+        Returns:
+            ``True`` if the pattern appears to be a regex, otherwise ``False``.
+        """
+        return any(marker in pattern for marker in self._REGEX_MARKERS)
+
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Transform a Netskope URL item into a universal-compatible form.
+
+        Args:
+            item: A Netskope URL dictionary.
+
+        Returns:
+            A universal URL dictionary whose ``type`` is one of
+            ``literal``, ``wildcard`` or ``regex``.
+        """
+        # Treat a missing or ``None`` pattern as empty, then collapse
+        # doubled backslashes into single ones.
+        pattern = (item.get("pattern") or "").replace("\\\\", "\\")
+
+        if pattern.startswith("*.") and pattern.count("*") == 1:
+            item["type"] = "wildcard"
+
+        else:
+            # Try the wildcard round-trip before the generic ``*`` test:
+            # the regex built by the normalizer always contains ``*``, so
+            # checking for ``*`` first would never reach this conversion.
+            wildcard = self.regex_to_wildcard(pattern)
+            if wildcard is not None:
+                item["type"] = "wildcard"
+                pattern = wildcard
+            elif "*" in pattern or self.is_regex(pattern):
+                item["type"] = "regex"
+            else:
+                # ``literal`` is the universal name for Netskope's
+                # ``exact`` (see ``NETSKOPE_TO_UNIVERSAL_TYPE_MAP``).
+                item["type"] = "literal"
+
+        item["pattern"] = pattern
+        item.pop("netskope_type", None)
+
+        return item
+
+
+# ---------------- NETSKOPE MAPPINGS ----------------
+
+# Universal action -> Netskope action.  ``monitor`` has no distinct
+# entry on the Netskope side, so it maps to ``allow``.
+NETSKOPE_ACTION_MAP = {
+    "block": "deny",
+    "allow": "allow",
+    "monitor": "allow",
+}
+
+# Netskope action -> universal action.  Spelled out explicitly because
+# ``NETSKOPE_ACTION_MAP`` is not one-to-one: inverting it with a dict
+# comprehension would turn ``allow`` into ``monitor``.
+NETSKOPE_TO_UNIVERSAL_ACTION_MAP = {
+    "deny": "block",
+    "allow": "allow",
+}
+
+NETSKOPE_CATEGORY_MAP = {
+    "malware": "malware",
+    "phishing": "phishing",
+    "gambling": "gambling",
+    "uncategorized": "uncategorized",
+}
+
+NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {
+    "exact": "literal",
+    "regex": "regex",
+}
+
+UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {
+    "literal": "exact",
+    "regex": "regex",
+    "wildcard": "regex",
+    "substring": "regex",
+}
+
+
+# ---------------- NETSKOPE PIPELINES ----------------
+
+VENDOR_TO_UNIVERSAL_PIPELINES = [
+    ActionMapper(NETSKOPE_TO_UNIVERSAL_ACTION_MAP),
+    TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP),
+    NetskopePatternDenormalizer(),
+    CategoryMapper(NETSKOPE_CATEGORY_MAP),
+    MetadataEnricher("netskope"),
+]
+
+UNIVERSAL_TO_VENDOR_PIPELINES = [
+    ActionMapper(NETSKOPE_ACTION_MAP),
+    TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP),
+    NetskopePatternNormalizer(),
+    CategoryMapper({value: key for key, value in NETSKOPE_CATEGORY_MAP.items()}),
+    MetadataEnricher("netskope"),
+]
diff --git a/transformers/vendors/prisma_transformer.py b/transformers/vendors/prisma_transformer.py
new file mode 100644
index 00000000..87ec8120
--- /dev/null
+++ b/transformers/vendors/prisma_transformer.py
@@ -0,0 +1,56 @@
+"""Prisma transformation pipelines.
+
+This module defines action, category, and type mappings for Prisma
+URL filtering configurations, along with transformation pipelines
+to convert between Prisma-specific and universal data models.
+"""
+
+from transformers.action_mapper import ActionMapper
+from transformers.base_transformer import BaseTransformer
+from transformers.category_mapper import CategoryMapper
+from transformers.metadata_enricher import MetadataEnricher
+from transformers.pattern_normalizer import PatternNormalizer
+from transformers.type_mapper import TypeMapper
+
+# ---------------- PRISMA MAPPINGS ----------------
+
+# Keyed by universal action (universal -> Prisma), unlike the type map
+# below, which is keyed by the Prisma value.
+PRISMA_ACTION_MAP = {
+    "block": "deny",
+    "allow": "allow",
+    "monitor": "alert",
+}
+
+PRISMA_CATEGORY_MAP = {
+    "malware": "malware",
+    "phishing": "phishing",
+    "gambling": "gambling",
+    "uncategorized": "uncategorized",
+}
+
+PRISMA_TYPE_MAP = {
+    "simple": "literal",
+    "wildcard": "wildcard",
+    "regex": "regex",
+    "substring": "substring",
+}
+
+
+# ---------------- PRISMA PIPELINES ----------------
+
+VENDOR_TO_UNIVERSAL_PIPELINES = [
+    ActionMapper({value: key for key, value in PRISMA_ACTION_MAP.items()}),
+    PatternNormalizer(),
+    TypeMapper(PRISMA_TYPE_MAP),
+    CategoryMapper(PRISMA_CATEGORY_MAP),
+    MetadataEnricher("prisma"),
+]
+
+UNIVERSAL_TO_VENDOR_PIPELINES = [
+    ActionMapper(PRISMA_ACTION_MAP),
+    PatternNormalizer(),
+    TypeMapper({value: key for key, value in PRISMA_TYPE_MAP.items()}),
+    CategoryMapper({value: key for key, value in PRISMA_CATEGORY_MAP.items()}),
+    MetadataEnricher("prisma"),
+]
diff --git a/transformers/vendors/zscaler_transformer.py b/transformers/vendors/zscaler_transformer.py
new file mode 100644
index 00000000..600cb85f
--- /dev/null
+++ b/transformers/vendors/zscaler_transformer.py
@@ -0,0 +1,56 @@
+"""Zscaler transformation pipelines.
+
+This module defines action, category, and type mappings for Zscaler
+URL filtering configurations, along with transformation pipelines
+to convert between Zscaler-specific and universal data models.
+"""
+
+from transformers.action_mapper import ActionMapper
+from transformers.base_transformer import BaseTransformer
+from transformers.category_mapper import CategoryMapper
+from transformers.metadata_enricher import MetadataEnricher
+from transformers.pattern_normalizer import PatternNormalizer
+from transformers.type_mapper import TypeMapper
+
+# ---------------- ZSCALER MAPPINGS ----------------
+
+# Keyed by universal action (universal -> Zscaler), unlike the type map
+# below, which is keyed by the Zscaler value.
+ZSCALER_ACTION_MAP = {
+    "block": "BLOCK",
+    "allow": "ALLOW",
+    "monitor": "MONITOR",
+}
+
+ZSCALER_CATEGORY_MAP = {
+    "malware": "malware",
+    "phishing": "phishing",
+    "gambling": "gambling",
+    "uncategorized": "uncategorized",
+}
+
+ZSCALER_TYPE_MAP = {
+    "STRING": "literal",
+    "WILDCARD": "wildcard",
+    "REGEX": "regex",
+}
+
+
+# ---------------- ZSCALER PIPELINES ----------------
+
+VENDOR_TO_UNIVERSAL_PIPELINES = [
+    ActionMapper({value: key for key, value in ZSCALER_ACTION_MAP.items()}),
+    PatternNormalizer(),
+    TypeMapper(ZSCALER_TYPE_MAP),
+    CategoryMapper(ZSCALER_CATEGORY_MAP),
+    MetadataEnricher("zscaler"),
+]
+
+UNIVERSAL_TO_VENDOR_PIPELINES = [
+    ActionMapper(ZSCALER_ACTION_MAP),
+    PatternNormalizer(),
+    TypeMapper({value: key for key, value in ZSCALER_TYPE_MAP.items()}),
+    CategoryMapper({value: key for key, value in ZSCALER_CATEGORY_MAP.items()}),
+    MetadataEnricher("zscaler"),
+]
+