Skip to content

Commit 63cabfc

Browse files
author
chenyunliang520
committed
add pytest for verifying logical replication and decoding (create, insert, update, delete, drop)
1 parent 5aa4150 commit 63cabfc

File tree

1 file changed

+148
-0
lines changed

1 file changed

+148
-0
lines changed

tests/test_logical_decoding.py

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
import pytest
2+
3+
SLOT_NAME = "slot_test"
4+
SCHEMA = "my_schema"
5+
TABLE = "test01"
6+
7+
8+
def _slot_exists(conn, slot_name):
9+
cur = conn.cursor()
10+
cur.execute(
11+
"SELECT count(1) FROM pg_replication_slots WHERE slot_name = %s",
12+
(slot_name,),
13+
)
14+
row = cur.fetchone()
15+
return bool(row and row[0] > 0)
16+
17+
18+
def _cleanup_slot_and_schema(conn):
19+
cur = conn.cursor()
20+
# Drop slot if exists
21+
try:
22+
cur.execute(
23+
"SELECT count(1) FROM pg_replication_slots WHERE slot_name = %s",
24+
(SLOT_NAME,),
25+
)
26+
if cur.fetchone()[0] > 0:
27+
cur.execute("SELECT pg_drop_replication_slot(%s);", (SLOT_NAME,))
28+
except Exception:
29+
pass
30+
31+
# Drop schema cascade
32+
try:
33+
cur.execute(f"DROP SCHEMA IF EXISTS {SCHEMA} CASCADE;")
34+
except Exception:
35+
pass
36+
conn.commit()
37+
38+
39+
@pytest.fixture(scope="function")
40+
def setup_env(conn):
41+
"""Ensure clean environment for each test."""
42+
_cleanup_slot_and_schema(conn)
43+
cur = conn.cursor()
44+
cur.execute(f"CREATE SCHEMA {SCHEMA};")
45+
cur.execute(f"SET search_path TO {SCHEMA};")
46+
cur.execute(f"CREATE TABLE {TABLE} (id int, name varchar(255));")
47+
cur.execute(f"ALTER TABLE {TABLE} REPLICA IDENTITY FULL;")
48+
conn.commit()
49+
yield conn
50+
_cleanup_slot_and_schema(conn)
51+
52+
53+
def _create_slot(cur):
54+
cur.execute(
55+
"SELECT * FROM pg_create_logical_replication_slot(%s, %s);",
56+
(SLOT_NAME, "mppdb_decoding"),
57+
)
58+
59+
60+
def _read_changes(cur):
61+
cur.execute(
62+
"SELECT data FROM pg_logical_slot_peek_changes(%s, NULL, %s);",
63+
(SLOT_NAME, 4096),
64+
)
65+
rows = cur.fetchall()
66+
return [str(r[0]) for r in rows if r and r[0] is not None]
67+
68+
69+
def test_create_replication_slot(setup_env):
70+
cur = setup_env.cursor()
71+
_create_slot(cur)
72+
assert _slot_exists(setup_env, SLOT_NAME)
73+
74+
75+
def test_insert_produces_changes(setup_env):
76+
cur = setup_env.cursor()
77+
_create_slot(cur)
78+
assert _slot_exists(setup_env, SLOT_NAME)
79+
80+
# insert
81+
cur.execute(f"INSERT INTO {TABLE} VALUES (%s, %s);", (1, "hello world"))
82+
setup_env.commit()
83+
84+
changes = _read_changes(cur)
85+
joined = "\n".join(changes).lower()
86+
87+
assert "insert" in joined, "Insert event not present"
88+
assert "hello world" in joined, "Inserted value missing"
89+
90+
91+
def test_update_produces_changes(setup_env):
92+
cur = setup_env.cursor()
93+
_create_slot(cur)
94+
assert _slot_exists(setup_env, SLOT_NAME)
95+
96+
# prepare row
97+
cur.execute(f"INSERT INTO {TABLE} VALUES (%s, %s);", (1, "hello world"))
98+
setup_env.commit()
99+
100+
# update
101+
cur.execute(
102+
f"UPDATE {TABLE} SET name = %s WHERE id = %s;",
103+
("hello gaussdb", 1),
104+
)
105+
setup_env.commit()
106+
107+
changes = _read_changes(cur)
108+
joined = "\n".join(changes).lower()
109+
110+
assert "update" in joined, "Update event not present"
111+
assert "hello gaussdb" in joined, "Updated value missing"
112+
113+
114+
def test_delete_produces_changes(setup_env):
115+
cur = setup_env.cursor()
116+
_create_slot(cur)
117+
assert _slot_exists(setup_env, SLOT_NAME)
118+
119+
# prepare row
120+
cur.execute(f"INSERT INTO {TABLE} VALUES (%s, %s);", (1, "to_delete"))
121+
setup_env.commit()
122+
123+
# delete
124+
cur.execute(f"DELETE FROM {TABLE} WHERE id = %s;", (1,))
125+
setup_env.commit()
126+
127+
changes = _read_changes(cur)
128+
joined = "\n".join(changes).lower()
129+
130+
assert "delete" in joined, "Delete event not present"
131+
assert "to_delete" in joined, "Deleted value missing"
132+
133+
134+
def test_drop_replication_slot(setup_env):
135+
cur = setup_env.cursor()
136+
_create_slot(cur)
137+
assert _slot_exists(setup_env, SLOT_NAME)
138+
139+
# drop slot
140+
cur.execute("SELECT pg_drop_replication_slot(%s);", (SLOT_NAME,))
141+
setup_env.commit()
142+
143+
# verify removed
144+
cur.execute(
145+
"SELECT count(1) FROM pg_replication_slots WHERE slot_name = %s;",
146+
(SLOT_NAME,),
147+
)
148+
assert cur.fetchone()[0] == 0

0 commit comments

Comments
 (0)