Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 85 additions & 0 deletions adminapi/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,3 +193,88 @@ def parse_function_string(args, strict=True): # NOQA C901
parsed_args.append(('literal', args[string_start:]))

return parsed_args


def build_query(query_args):
"""Build a text query from parsed query arguments.

This is the inverse of parse_query(). It takes a dictionary mapping
attribute names to filter values and returns a text query string.

Args:
query_args: A dictionary like {'hostname': BaseFilter('web.*'),
'state': BaseFilter('online')}

Returns:
A text query string like 'hostname=Regexp(web.*) state=online'
"""
if not query_args:
return ''

parts = []
for attr, filter_obj in query_args.items():
value_str = _format_filter_value(filter_obj)
parts.append(f'{attr}={value_str}')

return ' '.join(parts)


def _format_filter_value(filter_obj):
"""Format a filter object as a text query value.

Args:
filter_obj: A BaseFilter instance or subclass

Returns:
A string representation suitable for text queries
"""
if not isinstance(filter_obj, BaseFilter):
# Plain value, format it directly
return _format_literal(filter_obj)

filter_type = type(filter_obj)

# BaseFilter with plain value - no function wrapper needed
if filter_type == BaseFilter:
return _format_literal(filter_obj.value)

# Empty filter - no arguments
if filter_type.__name__ == 'Empty':
return 'Empty()'

# Any/All filters - multiple values
if hasattr(filter_obj, 'values'):
inner_parts = [_format_filter_value(v) for v in filter_obj.values]
return '{}({})'.format(filter_type.__name__, ' '.join(inner_parts))

# Not filter - single nested filter
if filter_type.__name__ == 'Not':
inner = _format_filter_value(filter_obj.value)
return '{}({})'.format(filter_type.__name__, inner)

# Other filters (Regexp, GreaterThan, etc.) - single value
return '{}({})'.format(filter_type.__name__, _format_literal(filter_obj.value))


def _format_literal(value):
"""Format a literal value for text query output.

Args:
value: A Python value (str, int, bool, etc.)

Returns:
A string representation suitable for text queries
"""
if isinstance(value, bool):
return 'true' if value else 'false'

if isinstance(value, str):
# Check if the string needs quoting (contains spaces or special chars)
if ' ' in value or '(' in value or ')' in value or '=' in value:
# Escape backslashes and quotes, then quote the string
escaped = value.replace('\\', '\\\\').replace('"', '\\"')
return '"{}"'.format(escaped)
return value

# For numbers, IP addresses, dates, etc. - use string representation
return str(value)
247 changes: 246 additions & 1 deletion adminapi/tests/test_parse.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import unittest

from adminapi.datatype import DatatypeError
from adminapi.filters import (
All, Not, Empty,
)
from adminapi.filters import (
BaseFilter,
Regexp,
Any,
GreaterThan,
)
from adminapi.parse import build_query
from adminapi.parse import parse_query, parse_function_string


Expand Down Expand Up @@ -101,7 +105,7 @@ def test_invalid_function(self):

def test_top_level_literal_error(self):
with self.assertRaisesRegex(
DatatypeError, r"Invalid term: Top level literals are not allowed"
DatatypeError, r"Invalid term: Top level literals are not allowed"
):
parse_query("hostname=test value")

Expand Down Expand Up @@ -185,3 +189,244 @@ def test_parentheses_handling(self):
("endfunc", ""),
]
self.assertEqual(result, expected)


class TestBuildQuery(unittest.TestCase):
"""Tests for the build_query function."""

def test_simple_filter(self):
"""Test building a query with a simple attribute filter."""
query = 'servertype=vm'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'servertype=vm')

def test_multiple_attributes(self):
"""Test building a query with multiple attributes."""
query = 'servertype=vm state=online'
parsed = parse_query(query)
rebuilt = build_query(parsed)
# Order may vary due to dict iteration, check both attributes present
self.assertIn('servertype=vm', rebuilt)
self.assertIn('state=online', rebuilt)

def test_regexp_filter_explicit(self):
"""Test building a query with explicit Regexp filter."""
query = 'hostname=Regexp(web.*)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_any_filter(self):
"""Test building a query with Any filter."""
query = 'state=Any(online offline)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Any(online offline)')

def test_all_filter(self):
"""Test building a query with All filter."""
query = 'tags=All(production web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'tags=All(production web)')

def test_not_filter(self):
"""Test building a query with Not filter."""
query = 'state=Not(offline)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Not(offline)')

def test_empty_filter(self):
"""Test building a query with Empty filter."""
query = 'comment=Empty()'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'comment=Empty()')

def test_numeric_value(self):
"""Test building a query with numeric value."""
query = 'cpu_cores=4'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'cpu_cores=4')

def test_boolean_true(self):
"""Test building a query with boolean true value."""
query = 'active=true'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'active=true')

def test_boolean_false(self):
"""Test building a query with boolean false value."""
query = 'active=false'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'active=false')

def test_hostname_only_plain(self):
"""Test building a query with plain hostname."""
query = 'webserver01'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=webserver01')

def test_hostname_only_with_regex(self):
"""Test building a query with hostname containing regex chars."""
query = 'web.*'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_greater_than_filter(self):
"""Test building a query with GreaterThan filter."""
query = 'cpu_cores=GreaterThan(4)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'cpu_cores=GreaterThan(4)')

def test_less_than_filter(self):
"""Test building a query with LessThan filter."""
query = 'memory=LessThan(8192)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'memory=LessThan(8192)')

def test_nested_not_any(self):
"""Test building a query with nested Not(Any(...))."""
query = 'state=Not(Any(offline maintenance))'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Not(Any(offline maintenance))')

def test_nested_any_not(self):
"""Test building a query with nested Any containing Not."""
query = 'state=Any(online Not(offline))'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'state=Any(online Not(offline))')

def test_empty_query(self):
"""Test building an empty query."""
parsed = {}
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, '')

def test_contains_filter(self):
"""Test building a query with Contains filter."""
query = 'hostname=Contains(web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=Contains(web)')

def test_startswith_filter(self):
"""Test building a query with StartsWith filter."""
query = 'hostname=StartsWith(web)'
parsed = parse_query(query)
rebuilt = build_query(parsed)
self.assertEqual(rebuilt, 'hostname=StartsWith(web)')


class TestBuildQueryRoundTrip(unittest.TestCase):
"""Tests to verify parse_query and build_query are inverses."""

def assert_filters_equal(self, filter1, filter2):
"""Recursively compare two filter objects for equality."""
self.assertEqual(type(filter1), type(filter2))

if hasattr(filter1, 'values'):
# Any/All filters have multiple values
self.assertEqual(len(filter1.values), len(filter2.values))
for v1, v2 in zip(filter1.values, filter2.values):
self.assert_filters_equal(v1, v2)
elif hasattr(filter1, 'value'):
# Single value filters (BaseFilter, Not, Regexp, etc.)
if isinstance(filter1.value, BaseFilter):
self.assert_filters_equal(filter1.value, filter2.value)
else:
self.assertEqual(filter1.value, filter2.value)

def assert_round_trip(self, query):
"""Assert that parsing and rebuilding produces equivalent result."""
parsed = parse_query(query)
rebuilt = build_query(parsed)
reparsed = parse_query(rebuilt)

# Compare keys
self.assertEqual(set(parsed.keys()), set(reparsed.keys()))

# Compare filter types and values recursively
for key in parsed:
self.assert_filters_equal(parsed[key], reparsed[key])

def test_round_trip_simple(self):
"""Test round-trip for simple query."""
self.assert_round_trip('servertype=vm')

def test_round_trip_any(self):
"""Test round-trip for Any filter."""
self.assert_round_trip('state=Any(online offline)')

def test_round_trip_not(self):
"""Test round-trip for Not filter."""
self.assert_round_trip('state=Not(offline)')

def test_round_trip_nested(self):
"""Test round-trip for nested filters."""
self.assert_round_trip('state=Not(Any(offline maintenance))')

def test_round_trip_numeric(self):
"""Test round-trip for numeric value."""
self.assert_round_trip('cpu_cores=4')

def test_round_trip_boolean(self):
"""Test round-trip for boolean value."""
self.assert_round_trip('active=true')


class TestBuildQueryDirectConstruction(unittest.TestCase):
"""Tests for build_query with directly constructed filter objects."""

def test_direct_base_filter(self):
"""Test building query from directly constructed BaseFilter."""
query_args = {'servertype': BaseFilter('vm')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'servertype=vm')

def test_direct_regexp(self):
"""Test building query from directly constructed Regexp."""
query_args = {'hostname': Regexp('web.*')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'hostname=Regexp(web.*)')

def test_direct_any(self):
"""Test building query from directly constructed Any."""
query_args = {'state': Any('online', 'offline')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Any(online offline)')

def test_direct_not(self):
"""Test building query from directly constructed Not."""
query_args = {'state': Not('offline')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Not(offline)')

def test_direct_empty(self):
"""Test building query from directly constructed Empty."""
query_args = {'comment': Empty()}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'comment=Empty()')

def test_direct_nested(self):
"""Test building query from nested filter construction."""
query_args = {'state': Not(Any('offline', 'maintenance'))}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'state=Not(Any(offline maintenance))')

def test_direct_all(self):
"""Test building query from directly constructed All."""
query_args = {'tags': All('production', 'web')}
rebuilt = build_query(query_args)
self.assertEqual(rebuilt, 'tags=All(production web)')
Loading
Loading