
feature: Add syslog server to PegaProx #257

Open
gyptazy wants to merge 3 commits into PegaProx:main from gyptazy:feature/remote-syslog

Conversation

Contributor

@gyptazy gyptazy commented Apr 3, 2026

feature: Add syslog server to PegaProx

  • Add integrated syslog server
  • Write logs into dedicated database
  • Add new backend path for getting paginated log entries
  • Add new backend path for getting filtered log entries
  • Add frontend for viewing (filtered) logs in cluster resource tab

Example:
pegaprox_gyptazy_syslog

@qodo-code-review

Review Summary by Qodo

Add integrated syslog server with database storage and UI

✨ Enhancement


Walkthroughs

Description
• Add integrated async syslog server receiving UDP/TCP/TLS messages
• Store syslog events in SQLite database with parsed metadata
• Implement paginated backend API endpoint for querying log events
• Add frontend UI tab with filtering, search, sorting and pagination
• Support severity, protocol, hostname, source IP and facility filtering
Diagram
flowchart LR
  A["Remote Syslog<br/>Clients"] -->|UDP/TCP/TLS| B["Async Syslog<br/>Server"]
  B -->|Parse & Queue| C["Log Writer<br/>Queue"]
  C -->|Insert| D["SQLite<br/>Database"]
  E["Frontend UI<br/>Logs Tab"] -->|Query| F["Backend API<br/>/api/syslog/events"]
  F -->|Filter & Sort| D
  F -->|Return Events| E


File Changes

1. pegaprox/background/syslog_server.py ✨ Enhancement +205/-0

Async syslog server with database storage

• New async syslog server implementation supporting UDP, TCP, and TLS protocols
• SQLite database initialization with WAL mode for concurrent access
• Syslog message parsing extracting facility, severity, hostname and message
• Background log writer queue for asynchronous database inserts
• Multiprocessing-based server startup with uvloop optimization support

pegaprox/background/syslog_server.py
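The facility/severity extraction mentioned above follows the standard syslog PRI convention (pri = facility * 8 + severity). A minimal illustrative sketch of that decoding step — not the PR's actual parse_syslog() implementation:

```python
import re

def parse_pri(message):
    # RFC 3164/5424 priority: the leading <PRI> value packs
    # facility and severity as pri = facility * 8 + severity.
    m = re.match(r"<(\d{1,3})>", message)
    if not m:
        return None, None, message  # no PRI header present
    facility, severity = divmod(int(m.group(1)), 8)
    return facility, severity, message[m.end():]
```

For example, `<34>` decodes to facility 4 (auth) and severity 2 (critical).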


2. pegaprox/api/reports.py ✨ Enhancement +144/-0

Backend API for syslog event queries

• New /api/syslog/events endpoint for paginated log event retrieval
• Support for filtering by search, severity, protocol, hostname, source IP and facility
• Configurable sorting by multiple fields with ascending/descending direction
• Dynamic protocol list extraction from database
• Pagination with per-page limit capped at 50 events

pegaprox/api/reports.py


3. web/src/dashboard.js ✨ Enhancement +315/-0

Frontend UI for syslog event viewing and filtering

• Add comprehensive state management for log events with pagination and filtering
• Implement fetchLogEvents function with dynamic filter parameter handling
• Add filter application and reset functions for log event queries
• Create sortable column headers with toggle functionality
• Build responsive filter UI with search, severity, protocol, hostname, source IP and facility inputs
• Render paginated log events table with severity-based color coding
• Add logs tab button to cluster resource navigation
• Implement useEffect hook to auto-fetch logs on tab switch and filter changes

web/src/dashboard.js


4. pegaprox/app.py ✨ Enhancement +4/-0

Integrate syslog server into app startup

• Import and initialize syslog server startup function
• Call start_syslog_server() during application initialization
• Add startup confirmation message to console output

pegaprox/app.py


5. web/index.html Miscellaneous +4/-4

HTML template updates

• Minor modifications to support new syslog UI components

web/index.html





qodo-code-review bot commented Apr 3, 2026

Code Review by Qodo

🐞 Bugs (2) 📘 Rule violations (0) 📎 Requirement gaps (0) 🎨 UX Issues (0)



Action required

1. Unbounded syslog ingestion 🐞 Bug ☼ Reliability
Description
The syslog receiver spawns an asyncio task per UDP packet and enqueues into an unbounded
asyncio.Queue, so sustained traffic can grow the pending-task backlog faster than SQLite can
drain it, eventually exhausting memory/CPU (DoS).
Code

pegaprox/background/syslog_server.py[R96-130]

+class LogWriter:
+    def __init__(self):
+        self.queue = asyncio.Queue()
+
+    async def start(self):
+        loop = asyncio.get_running_loop()
+        while True:
+            entry = await self.queue.get()
+            await loop.run_in_executor(None, _insert_log_sync, entry)
+
+    async def log(self, source_ip, protocol, message):
+        hostname, facility, severity, severity_text, msg = parse_syslog(message)
+
+        entry = (
+            datetime.utcnow().isoformat(),
+            source_ip,
+            hostname,
+            facility,
+            severity,
+            severity_text,
+            msg,
+            protocol
+        )
+
+        await self.queue.put(entry)
+
+
+writer = LogWriter()
+
+
+class SyslogUDP(asyncio.DatagramProtocol):
+    def datagram_received(self, data, addr):
+        message = data.decode(errors="ignore").strip()
+        asyncio.create_task(writer.log(addr[0], "UDP", message))
+
Evidence
LogWriter creates an unbounded queue and writer.log awaits queue.put; SyslogUDP creates a new task
per datagram with no rate limiting/backpressure, while LogWriter.start processes entries serially
via run_in_executor, so producers can outpace the single consumer indefinitely.

pegaprox/background/syslog_server.py[96-105]
pegaprox/background/syslog_server.py[106-121]
pegaprox/background/syslog_server.py[126-130]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The integrated syslog server can be DoS'd because it spawns a task per UDP packet and uses an unbounded `asyncio.Queue`, allowing memory/CPU usage to grow without limit when ingestion exceeds SQLite write throughput.
## Issue Context
- `SyslogUDP.datagram_received` schedules `writer.log(...)` for every datagram.
- `LogWriter.queue` has no `maxsize`, and `LogWriter.start()` performs one DB insert at a time.
## Fix Focus Areas
- Add a bounded queue and an explicit drop/backpressure policy (drop newest/oldest, or reject when full).
- Avoid `asyncio.create_task(...)` per datagram; parse/enqueue synchronously with `put_nowait()` and handle `QueueFull`.
- Consider batching inserts (e.g., drain up to N entries and insert in a single transaction) to increase throughput.
### Files/lines
- pegaprox/background/syslog_server.py[96-130]
- pegaprox/background/syslog_server.py[100-105]
- pegaprox/background/syslog_server.py[49-63]
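A minimal sketch of the bounded-queue approach described in the fix focus areas — the class name, the maxsize/batch_size defaults, and the insert_batch callable are all illustrative, not PegaProx code:

```python
import asyncio

class BoundedLogWriter:
    """Sketch: bounded queue with an explicit drop-newest policy and
    batched inserts, replacing the unbounded queue + task-per-packet."""

    def __init__(self, maxsize=10_000, batch_size=100):
        self.queue = asyncio.Queue(maxsize=maxsize)
        self.batch_size = batch_size
        self.dropped = 0  # counter for monitoring dropped entries

    def log_nowait(self, entry):
        # Called synchronously from datagram_received: no per-packet task.
        try:
            self.queue.put_nowait(entry)
        except asyncio.QueueFull:
            self.dropped += 1  # drop newest when the consumer is saturated

    async def start(self, insert_batch):
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]
            # Drain up to batch_size entries for one DB transaction.
            while len(batch) < self.batch_size:
                try:
                    batch.append(self.queue.get_nowait())
                except asyncio.QueueEmpty:
                    break
            await loop.run_in_executor(None, insert_batch, batch)
```

With this shape, SyslogUDP.datagram_received would call log_nowait() directly instead of asyncio.create_task(writer.log(...)).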

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Syslog IPv4 may fail 🐞 Bug ≡ Correctness
Description
The syslog server binds using the IPv6 wildcard host "::" for UDP/TCP without explicitly enabling
dual-stack; on some OS/network configurations this results in IPv6-only listeners, preventing
IPv4-only syslog clients from connecting.
Code

pegaprox/background/syslog_server.py[R154-169]

+async def main_async():
+    init_db()
+
+    loop = asyncio.get_running_loop()
+    asyncio.create_task(writer.start())
+
+    await loop.create_datagram_endpoint(lambda: SyslogUDP(), local_addr=("::", 1514))
+    print("[UDP] Listening on [::]:1514")
+
+    tcp_server = await asyncio.start_server(handle_tcp, host="::", port=1514)
+    print("[TCP] Listening on [::]:1514")
+
+    ssl_ctx = create_ssl_context()
+    if ssl_ctx:
+        tls_server = await asyncio.start_server(handle_tcp, host="::", port=6514, ssl=ssl_ctx)
+        print("[TLS] Listening on [::]:6514")
Evidence
The syslog server uses host/local_addr "::" for both UDP and TCP. Elsewhere in the codebase, console
WebSocket servers explicitly avoid this because asyncio can create IPv6-only sockets for "::" and
instead use host "" to get IPv4+IPv6 listeners; syslog server does not apply that workaround or any
explicit IPV6_V6ONLY control.

pegaprox/background/syslog_server.py[154-169]
pegaprox/app.py[862-865]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The syslog server binds to `host='::'` / `local_addr=('::', port)` without explicitly ensuring dual-stack behavior. In environments where IPv6 wildcard sockets are IPv6-only, IPv4 syslog clients will not be able to send logs.
## Issue Context
The repo already documents/handles this exact asyncio behavior for the console WebSocket servers by using `host=''` when `bind_host == '::'`.
## Fix Focus Areas
- Use the existing pattern from console servers: choose a bind host that yields dual-stack (`''`) when configured for IPv6-any.
- Alternatively, create/bind sockets manually and set `IPV6_V6ONLY=0` for AF_INET6 sockets, or start separate IPv4 + IPv6 endpoints.
- Add a config option for syslog bind host/ports if needed.
### Files/lines
- pegaprox/background/syslog_server.py[154-169]
- pegaprox/app.py[862-865]
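One way to implement the manual-socket alternative above — a hypothetical helper that clears IPV6_V6ONLY so a single AF_INET6 socket accepts both IPv4 and IPv6, and whose result could then be passed to create_datagram_endpoint(sock=...):

```python
import socket

def make_dual_stack_udp_socket(port):
    # Create an IPv6 UDP socket that also accepts IPv4-mapped traffic.
    sock = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # 0 = dual-stack; some platforms default IPV6_V6ONLY to 1.
    sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0)
    sock.bind(("::", port))
    return sock
```

The same pattern applies to the TCP/TLS listeners via asyncio.start_server(sock=...).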



3. Syslog events cross-tenant 🐞 Bug ⛨ Security
Description
The /api/syslog/events endpoint is only guarded by cluster.view and returns the full integrated
syslog database without tenant/cluster scoping, so any viewer can read all collected syslog events
in a multi-tenant deployment.
Code

pegaprox/api/reports.py[R127-239]

+@bp.route('/api/syslog/events', methods=['GET'])
+@require_auth(perms=['cluster.view'])
+def get_integrated_syslog_events():
+    """Paginated overview of events stored by the integrated syslog server."""
+    try:
+        page = max(int(request.args.get('page', 1)), 1)
+    except (TypeError, ValueError):
+        page = 1
+
+    try:
+        per_page = int(request.args.get('per_page', 50))
+    except (TypeError, ValueError):
+        per_page = 50
+    per_page = min(max(per_page, 1), 50)
+
+    search = (request.args.get('search') or '').strip().lower()
+    severity = (request.args.get('severity') or '').strip()
+    protocol = (request.args.get('protocol') or '').strip().upper()
+    hostname = (request.args.get('hostname') or '').strip().lower()
+    source_ip = (request.args.get('source_ip') or '').strip().lower()
+    facility = (request.args.get('facility') or '').strip()
+
+    sort_map = {
+        'id': 'id',
+        'timestamp': 'timestamp',
+        'source_ip': 'source_ip',
+        'hostname': 'hostname',
+        'facility': 'facility',
+        'severity': 'severity',
+        'severity_text': 'severity_text',
+        'message': 'message',
+        'protocol': 'protocol',
+    }
+
+    sort_by = sort_map.get(request.args.get('sort_by', 'timestamp'), 'timestamp')
+    sort_dir = 'asc' if request.args.get('sort_dir', 'desc').lower() == 'asc' else 'desc'
+
+    db_path = os.path.abspath(DB_FILE)
+    if not os.path.exists(db_path):
+        return jsonify({
+            'items': [],
+            'pagination': {'page': page, 'per_page': per_page, 'total': 0, 'total_pages': 0},
+            'filters': {
+                'protocols': [],
+                'severities': [{'value': level, 'label': text} for level, text in sorted(SEVERITY_MAP.items())]
+            }
+        })
+
+    where = []
+    params = []
+
+    if search:
+        like = f'%{search}%'
+        where.append("""(
+            LOWER(COALESCE(timestamp, '')) LIKE ? OR
+            LOWER(COALESCE(source_ip, '')) LIKE ? OR
+            LOWER(COALESCE(hostname, '')) LIKE ? OR
+            LOWER(COALESCE(severity_text, '')) LIKE ? OR
+            LOWER(COALESCE(message, '')) LIKE ? OR
+            LOWER(COALESCE(protocol, '')) LIKE ?
+        )""")
+        params.extend([like, like, like, like, like, like])
+
+    if severity != '':
+        try:
+            severity_value = int(severity)
+            where.append('severity = ?')
+            params.append(severity_value)
+        except ValueError:
+            pass
+
+    if protocol:
+        where.append("UPPER(COALESCE(protocol, '')) = ?")
+        params.append(protocol)
+
+    if hostname:
+        where.append("LOWER(COALESCE(hostname, '')) LIKE ?")
+        params.append(f'%{hostname}%')
+
+    if source_ip:
+        where.append("LOWER(COALESCE(source_ip, '')) LIKE ?")
+        params.append(f'%{source_ip}%')
+
+    if facility != '':
+        try:
+            facility_value = int(facility)
+            where.append('facility = ?')
+            params.append(facility_value)
+        except ValueError:
+            pass
+
+    where_sql = f"WHERE {' AND '.join(where)}" if where else ''
+    offset = (page - 1) * per_page
+
+    conn = sqlite3.connect(db_path)
+    conn.row_factory = sqlite3.Row
+    try:
+        total = conn.execute(
+            f"SELECT COUNT(*) AS count FROM logs {where_sql}",
+            params
+        ).fetchone()['count']
+
+        rows = conn.execute(
+            f"""
+            SELECT id, timestamp, source_ip, hostname, facility, severity, severity_text, message, protocol
+            FROM logs
+            {where_sql}
+            ORDER BY {sort_by} {sort_dir}, id DESC
+            LIMIT ? OFFSET ?
+            """,
+            [*params, per_page, offset]
+        ).fetchall()
+
Evidence
The endpoint requires only cluster.view, which is granted to the default viewer role, and then
queries FROM logs without any tenant/cluster constraint. In the same module, other report
endpoints explicitly compute accessible_clusters = get_user_clusters(user_data) for multi-tenant
filtering, but get_integrated_syslog_events does not perform an equivalent access filter.

pegaprox/api/reports.py[127-239]
pegaprox/api/reports.py[35-40]
pegaprox/models/permissions.py[176-206]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`GET /api/syslog/events` is accessible to any user with `cluster.view` (including ROLE_VIEWER) and returns all integrated syslog rows with no tenant/cluster isolation, which breaks the repository’s multi-tenancy security model.
## Issue Context
- `get_reports_summary()` in the same file already applies tenant filtering via `get_user_clusters(...)`.
- Integrated syslog rows currently have no `cluster_id`/`tenant_id` column, so the API cannot filter results correctly.
## Fix Focus Areas
- Short-term safety: restrict this endpoint to an admin-only permission (e.g., `admin.audit` / `admin.settings`) until proper scoping exists.
- Proper fix: extend the syslog schema to include `tenant_id` and/or `cluster_id` (or an equivalent ownership key) at ingestion time and filter queries based on `get_user_clusters(user_data)`.
- Mirror the tenant-filtering pattern used elsewhere in `pegaprox/api/reports.py`.
### Files/lines
- pegaprox/api/reports.py[127-239]
- pegaprox/api/reports.py[35-40]
- pegaprox/background/syslog_server.py[26-43]
- pegaprox/models/permissions.py[176-206]
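A sketch of the query-scoping step from the proper fix, assuming the logs table gains a cluster_id column and accessible_clusters comes from get_user_clusters(user_data); the helper name is hypothetical:

```python
def add_tenant_scope(where, params, accessible_clusters):
    # Append a cluster_id IN (...) clause so viewers only see rows
    # from clusters they are allowed to access.
    if not accessible_clusters:
        where.append("1 = 0")  # no accessible clusters: match nothing
        return where, params
    placeholders = ", ".join("?" for _ in accessible_clusters)
    where.append(f"cluster_id IN ({placeholders})")
    params.extend(accessible_clusters)
    return where, params
```

This slots into the existing where/params lists built by get_integrated_syslog_events() before where_sql is assembled.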




Remediation recommended

4. Unindexed logs query path 🐞 Bug ➹ Performance
Description
The syslog database schema creates the logs table without indexes, but the API sorts/filters on
timestamp, severity, facility, hostname, and source_ip, which will degrade quickly as the
table grows.
Code

pegaprox/background/syslog_server.py[R26-43]

+def init_db():
+    conn = sqlite3.connect(DB_FILE)
+    cur = conn.cursor()
+    cur.execute("PRAGMA journal_mode=WAL;")
+
+    cur.execute("""
+        CREATE TABLE IF NOT EXISTS logs (
+            id INTEGER PRIMARY KEY AUTOINCREMENT,
+            timestamp TEXT,
+            source_ip TEXT,
+            hostname TEXT,
+            facility INTEGER,
+            severity INTEGER,
+            severity_text TEXT,
+            message TEXT,
+            protocol TEXT
+        )
+    """)
Evidence
The schema defines only an autoincrement primary key and no secondary indexes, while the API issues
ORDER BY timestamp and filters on several columns; without indexes, SQLite will scan/sort
increasingly large datasets for common queries.

pegaprox/background/syslog_server.py[26-43]
pegaprox/api/reports.py[149-159]
pegaprox/api/reports.py[221-238]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The integrated syslog table lacks indexes, but the API supports sorting/filtering on multiple columns. As log volume grows, queries will become slow due to full scans/sorts.
## Issue Context
The API sorts on `timestamp` and filters on `severity`, `facility`, `hostname`, `source_ip`, and `protocol`.
## Fix Focus Areas
- Add indexes suited to query patterns (e.g., `timestamp`, `(timestamp, id)`, `severity`, `facility`, `source_ip`, `hostname`, `protocol`).
- Consider an index for common combined filters (e.g., `(severity, timestamp)`), depending on expected usage.
### Files/lines
- pegaprox/background/syslog_server.py[26-43]
- pegaprox/api/reports.py[149-159]
- pegaprox/api/reports.py[221-238]
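The index DDL could look like the following sketch (index names and column choices are illustrative, matched to the filters/sorts listed above):

```python
import sqlite3

# Candidate secondary indexes for the logs table; init_db() could run
# these after CREATE TABLE. (severity, timestamp) also serves the common
# "filter by severity, order by timestamp" query without a separate sort.
INDEX_DDL = [
    "CREATE INDEX IF NOT EXISTS idx_logs_ts_id ON logs (timestamp, id)",
    "CREATE INDEX IF NOT EXISTS idx_logs_severity ON logs (severity, timestamp)",
    "CREATE INDEX IF NOT EXISTS idx_logs_facility ON logs (facility)",
    "CREATE INDEX IF NOT EXISTS idx_logs_source_ip ON logs (source_ip)",
    "CREATE INDEX IF NOT EXISTS idx_logs_hostname ON logs (hostname)",
    "CREATE INDEX IF NOT EXISTS idx_logs_protocol ON logs (protocol)",
]

def create_indexes(conn):
    for ddl in INDEX_DDL:
        conn.execute(ddl)
    conn.commit()
```

Note that the LIKE-based search and hostname/source_ip substring filters (leading `%`) cannot use these indexes; only the equality and sort paths benefit.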




ⓘ The new review experience is currently in Beta.


