50 changes: 46 additions & 4 deletions docker-compose.yaml
@@ -90,6 +90,7 @@ services:
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
volumes:
- postgres-data:/var/lib/postgresql/data
- ./scripts/db-maintenance:/docker-entrypoint-initdb.d
# Server-side COPY in archive-partitions.sql writes local backups here,
# so the archive directory must be mounted on the database container
- ./s3-archives:/s3-archives
ports:
- "${POSTGRES_PORT:-5432}:5432"
env_file: .env
@@ -100,6 +101,12 @@ services:
interval: 10s
timeout: 5s
retries: 5
command: >
bash -c "
apt-get update &&
apt-get install -y postgresql-13-partman &&
docker-entrypoint.sh postgres
"

prometheus:
image: prom/prometheus:v3.1.0
@@ -204,6 +211,41 @@ services:
tempo:
condition: service_healthy
required: true
postgres-maintenance:
image: postgres:13
platform: ${PLATFORM:-}
depends_on:
postgres-db:
condition: service_healthy
volumes:
- ./scripts/db-maintenance:/scripts
- postgres-data:/var/lib/postgresql/data:ro
- ./s3-archives:/s3-archives
environment:
- POSTGRES_DB=${POSTGRES_DB}
- POSTGRES_USER=${POSTGRES_USER}
- POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
- PGHOST=postgres-db
- PGPORT=5432
env_file: .env
networks:
- atoma-network
command: /bin/bash -c "sleep 30 && /scripts/run-maintenance.sh"
profiles:
- maintenance

postgres-cron:
image: mcuadros/ofelia:latest
volumes:
- /var/run/docker.sock:/var/run/docker.sock:ro
# The job definitions must be mounted; without a config file
# (or container labels) the daemon has nothing to schedule
- ./ofelia.ini:/etc/ofelia/config.ini:ro
command: daemon --config=/etc/ofelia/config.ini
environment:
- DOCKER_COMPOSE_PROJECT=atoma-network
networks:
- atoma-network
restart: always
depends_on:
- postgres-db

atoma-node:
<<: *atoma-node
@@ -303,7 +345,7 @@ services:
- CUDA_VISIBLE_DEVICES=1
ipc: host
command: ${VLLM_ENGINE_ARGS}

vllm2:
<<: *inference-service-cuda
container_name: chat-completions2
@@ -368,7 +410,7 @@ services:
<<: *inference-service-cuda
container_name: chat-completions5
profiles: [chat_completions_vllm]
image: vllm/vllm-openai:v0.8.1
environment:
# Backend for attention computation
# Available options:
@@ -397,7 +439,7 @@ services:
# - "XFORMERS": use XFormers
# - "ROCM_FLASH": use ROCmFlashAttention
# - "FLASHINFER": use flashinfer (recommended for fp8 quantized models)
- VLLM_ATTENTION_BACKEND=FLASH_ATTN
- VLLM_FLASH_ATTN_VERSION=3
- VLLM_USE_V1=1
- CUDA_VISIBLE_DEVICES=6
@@ -443,7 +485,7 @@ services:
- CUDA_VISIBLE_DEVICES=8
ipc: host
command: ${VLLM_ENGINE_ARGS}

vllm-cpu:
<<: *inference-service-cpu
container_name: chat-completions
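Both the maintenance container's command and the cron job below invoke scripts/db-maintenance/run-maintenance.sh, which is not part of this diff. A minimal sketch of what that script is assumed to do, wiring the .env.example variables through to the SQL further down (the archive.* GUC prefix is an assumption of this sketch):

#!/usr/bin/env bash
# Hypothetical run-maintenance.sh -- illustrative only, not part of this PR.
set -euo pipefail

export PGDATABASE="${POSTGRES_DB}" PGUSER="${POSTGRES_USER}" PGPASSWORD="${POSTGRES_PASSWORD}"

# Hand the S3 settings to SQL as session-level custom GUCs, then run the
# archive script in the same session so current_setting() can read them.
psql -h "${PGHOST:-postgres-db}" -p "${PGPORT:-5432}" -v ON_ERROR_STOP=1 <<SQL
SET archive.aws_access_key_id = '${AWS_ACCESS_KEY_ID}';
SET archive.aws_secret_key = '${AWS_SECRET_KEY}';
SET archive.aws_region = '${AWS_REGION:-us-east-1}';
\i /scripts/archive-partitions.sql
SQL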
4 changes: 4 additions & 0 deletions ofelia.ini
@@ -0,0 +1,4 @@
[job "archive-postgres-monthly"]
schedule = @monthly
container = atoma-network_postgres-maintenance_1
command = /scripts/db-maintenance/run-maintenance.sh
13 changes: 11 additions & 2 deletions prometheus.yml
@@ -26,13 +26,13 @@ scrape_configs:
- job_name: "vllm3"
metrics_path: "/metrics"
static_configs:
- targets: ["vllm3:8000"]
- targets: ["vllm3:8000"]

- job_name: "vllm4"
metrics_path: "/metrics"
static_configs:
- targets: ["vllm4:8000"]

- job_name: "vllm5"
metrics_path: "/metrics"
static_configs:
@@ -61,3 +61,12 @@ scrape_configs:
- action: labelmap
regex: ^(atoma|libp2p)_(.+)$
replacement: $1_$2
- job_name: "postgres_archival"
metrics_path: "/metrics"
static_configs:
- targets: ["postgres-db:5432"]
relabel_configs:
- source_labels: [__address__]
target_label: instance
regex: "(.*):.*"
replacement: "$1"
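As written, this job scrapes the PostgreSQL wire-protocol port, which does not serve HTTP /metrics. A common pattern (an assumption here, not part of this diff) is to run a postgres_exporter sidecar and target it instead:

# Hypothetical alternative: assumes a postgres-exporter service
# (e.g. prometheuscommunity/postgres-exporter) on its default port
- job_name: "postgres_archival"
  metrics_path: "/metrics"
  static_configs:
    - targets: ["postgres-exporter:9187"]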
6 changes: 6 additions & 0 deletions scripts/db-maintenance/.env.example
@@ -0,0 +1,6 @@
# AWS S3 Configuration for Database Archiving
AWS_ACCESS_KEY_ID=your_access_key
AWS_SECRET_KEY=your_secret_key
AWS_REGION=us-east-1
S3_BUCKET=your-bucket-name
S3_PREFIX=atoma-db-archives
252 changes: 252 additions & 0 deletions scripts/db-maintenance/archive-partitions.sql
@@ -0,0 +1,252 @@
-- Create a logging table for archive operations
CREATE TABLE IF NOT EXISTS archive_log (
id SERIAL PRIMARY KEY,
operation VARCHAR(50),
table_name TEXT,
s3_path TEXT,
rows_affected BIGINT,
status VARCHAR(50),
error_message TEXT,
executed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
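With this log in place, recent runs and failures can be inspected directly, for example:

-- Most recent archive operations, errors first
SELECT executed_at, operation, table_name, status, error_message
FROM archive_log
ORDER BY (status = 'ERROR') DESC, executed_at DESC
LIMIT 20;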

-- Configure AWS credentials
-- Note: In production, use IAM roles instead of embedding credentials
DO $$
BEGIN
-- One-part custom GUC names are not settable in PostgreSQL, so the
-- credentials are read from archive.*-prefixed settings provided by the
-- calling session (e.g. via SET in the maintenance script)
EXECUTE 'ALTER DATABASE ' || current_database() || ' SET aws_s3.aws_access_key_id TO ''' || current_setting('archive.aws_access_key_id', true) || '''';
EXECUTE 'ALTER DATABASE ' || current_database() || ' SET aws_s3.aws_secret_access_key TO ''' || current_setting('archive.aws_secret_key', true) || '''';
EXECUTE 'ALTER DATABASE ' || current_database() || ' SET aws_s3.aws_region TO ''' || coalesce(current_setting('archive.aws_region', true), 'us-east-1') || '''';
END $$;

-- Function to archive a partition to S3
CREATE OR REPLACE FUNCTION archive_partition_to_s3(
partition_table_name TEXT,
s3_bucket TEXT,
s3_prefix TEXT
) RETURNS BIGINT AS $$
DECLARE
s3_path TEXT;
rows_affected BIGINT;
-- "current_time" is reserved in PostgreSQL, so the timestamp gets its own name
archive_time TEXT;
BEGIN
-- Generate a timestamp for the archive
archive_time := to_char(now(), 'YYYY_MM_DD_HH24_MI_SS');

-- Set the complete S3 path
s3_path := s3_prefix || '/' || partition_table_name || '/' || archive_time || '.csv';

-- Count rows in the partition
EXECUTE 'SELECT COUNT(*) FROM ' || quote_ident(partition_table_name) INTO rows_affected;

-- Log the operation start
INSERT INTO archive_log (operation, table_name, s3_path, rows_affected, status)
VALUES ('ARCHIVE_START', partition_table_name, s3_path, rows_affected, 'IN_PROGRESS');

-- Export the data to S3 (the aws_s3 extension exports via
-- query_export_to_s3; it does not provide a table_export_to_s3)
PERFORM aws_s3.query_export_to_s3(
'SELECT * FROM ' || quote_ident(partition_table_name),
aws_commons.create_s3_uri(
s3_bucket,
s3_path,
current_setting('aws_s3.aws_region', true)
),
options := 'FORMAT CSV, HEADER true'
);

-- Log successful completion
INSERT INTO archive_log (operation, table_name, s3_path, rows_affected, status)
VALUES ('ARCHIVE_COMPLETE', partition_table_name, s3_path, rows_affected, 'SUCCESS');

-- Create a local backup; COPY cannot reference plpgsql variables directly,
-- so the statement is built dynamically. Server-side COPY writes on the
-- database server's filesystem, hence the /s3-archives mount on postgres-db.
EXECUTE format(
'COPY (SELECT * FROM %I) TO %L WITH (FORMAT CSV, HEADER true)',
partition_table_name,
'/s3-archives/' || partition_table_name || '_' || archive_time || '.csv'
);

RETURN rows_affected;

EXCEPTION
WHEN OTHERS THEN
-- Log the error before re-raising
INSERT INTO archive_log (operation, table_name, s3_path, rows_affected, status, error_message)
VALUES ('ARCHIVE_ERROR', partition_table_name, s3_path, rows_affected, 'ERROR', SQLERRM);
RAISE;
END;
$$ LANGUAGE plpgsql;
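A single partition can also be archived ad hoc; the table and bucket names here are placeholders:

SELECT archive_partition_to_s3('tasks_42', 'your-bucket-name', 'atoma-db-archives');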

-- Function to find and archive old data
CREATE OR REPLACE FUNCTION archive_old_data(
months_to_keep INT,
s3_bucket TEXT,
s3_prefix TEXT
) RETURNS TABLE (table_name TEXT, rows_archived BIGINT) AS $$
DECLARE
archive_date BIGINT;
rec RECORD;
BEGIN
-- Calculate the cutoff epoch (assuming epoch is in milliseconds)
archive_date := extract(
epoch FROM (CURRENT_DATE - (months_to_keep || ' months')::interval)
) * 1000;

-- Archive old tasks
FOR rec IN
SELECT 'tasks' AS table_name, task_small_id
FROM tasks
WHERE deprecated_at_epoch < archive_date
AND is_deprecated = true
LOOP
table_name := rec.table_name;
rows_archived := archive_partition_to_s3(
'tasks_' || rec.task_small_id,
s3_bucket,
s3_prefix
);
RETURN NEXT;
END LOOP;

-- Archive old stacks
FOR rec IN
SELECT 'stacks' AS table_name, stack_small_id
FROM stacks s
JOIN tasks t ON s.task_small_id = t.task_small_id
WHERE t.deprecated_at_epoch < archive_date
AND t.is_deprecated = true
LOOP
table_name := rec.table_name;
rows_archived := archive_partition_to_s3(
'stacks_' || rec.stack_small_id,
s3_bucket,
s3_prefix
);
RETURN NEXT;
END LOOP;

-- Archive related settlement tickets
FOR rec IN
SELECT 'stack_settlement_tickets' AS table_name, sst.stack_small_id
FROM stack_settlement_tickets sst
JOIN stacks s ON sst.stack_small_id = s.stack_small_id
JOIN tasks t ON s.task_small_id = t.task_small_id
WHERE t.deprecated_at_epoch < archive_date
AND t.is_deprecated = true
LOOP
table_name := rec.table_name;
rows_archived := archive_partition_to_s3(
'stack_settlement_tickets_' || rec.stack_small_id,
s3_bucket,
s3_prefix
);
RETURN NEXT;
END LOOP;

-- Archive related disputes
FOR rec IN
SELECT 'stack_attestation_disputes' AS table_name, sad.stack_small_id
FROM stack_attestation_disputes sad
JOIN stacks s ON sad.stack_small_id = s.stack_small_id
JOIN tasks t ON s.task_small_id = t.task_small_id
WHERE t.deprecated_at_epoch < archive_date
AND t.is_deprecated = true
LOOP
table_name := rec.table_name;
rows_archived := archive_partition_to_s3(
'stack_attestation_disputes_' || rec.stack_small_id,
s3_bucket,
s3_prefix
);
RETURN NEXT;
END LOOP;

RETURN;
END;
$$ LANGUAGE plpgsql;

-- Run the archive function
SELECT *
FROM archive_old_data(
1, -- Keep 1 month of data in PostgreSQL
'your-s3-bucket-name', -- Replace with your S3 bucket
'database-archives' -- S3 prefix
);