22
33import os
44import sys
5+
56from gaussdb import connect
67
78# Set the GaussDB implementation
1213SCHEMA = "demo_schema"
1314TABLE = "demo_table"
1415
16+
1517def _slot_exists (conn , slot_name ):
1618 """Check if a replication slot exists."""
1719 with conn .cursor () as cur :
@@ -22,6 +24,7 @@ def _slot_exists(conn, slot_name):
2224 row = cur .fetchone ()
2325 return bool (row and row [0 ] > 0 )
2426
27+
2528def _cleanup_slot_and_schema (conn ):
2629 """Clean up replication slot and schema."""
2730 with conn .cursor () as cur :
@@ -43,6 +46,7 @@ def _cleanup_slot_and_schema(conn):
4346 print (f"Error dropping schema: { e } " )
4447 conn .commit ()
4548
49+
4650def _create_slot (conn ):
4751 """Create a logical replication slot."""
4852 with conn .cursor () as cur :
@@ -53,6 +57,7 @@ def _create_slot(conn):
5357 conn .commit ()
5458 print (f"Created replication slot: { SLOT_NAME } " )
5559
60+
5661def _read_changes (conn ):
5762 """Read changes from the replication slot."""
5863 with conn .cursor () as cur :
@@ -63,6 +68,7 @@ def _read_changes(conn):
6368 rows = cur .fetchall ()
6469 return [str (row [0 ]) for row in rows if row and row [0 ] is not None ]
6570
71+
6672def main ():
6773 # Get database connection string from environment variable
6874 dsn = os .environ .get ("GAUSSDB_TEST_DSN" )
@@ -75,7 +81,9 @@ def main():
7581 sys .exit (1 )
7682
7783 # Connect to the database
78- with connect (dsn , connect_timeout = 10 , application_name = "logical-replication-demo" ) as conn :
84+ with connect (
85+ dsn , connect_timeout = 10 , application_name = "logical-replication-demo"
86+ ) as conn :
7987 # Display server information
8088 with conn .cursor () as cur :
8189 server_version = conn .execute ("SELECT version()" ).fetchall ()[0 ][0 ]
@@ -89,7 +97,9 @@ def main():
8997 with conn .cursor () as cur :
9098 cur .execute (f"CREATE SCHEMA { SCHEMA } ;" )
9199 cur .execute (f"SET search_path TO { SCHEMA } ;" )
92- cur .execute (f"CREATE TABLE { TABLE } (id int PRIMARY KEY, name varchar(255));" )
100+ cur .execute (
101+ f"CREATE TABLE { TABLE } (id int PRIMARY KEY, name varchar(255));"
102+ )
93103 cur .execute (f"ALTER TABLE { TABLE } REPLICA IDENTITY FULL;" )
94104 conn .commit ()
95105 print (f"Created schema { SCHEMA } and table { TABLE } " )
@@ -105,7 +115,9 @@ def main():
105115 print ("Inserted: (1, 'hello world')" )
106116
107117 # Update
108- cur .execute (f"UPDATE { TABLE } SET name = %s WHERE id = %s;" , ("hello gaussdb" , 1 ))
118+ cur .execute (
119+ f"UPDATE { TABLE } SET name = %s WHERE id = %s;" , ("hello gaussdb" , 1 )
120+ )
109121 conn .commit ()
110122 print ("Updated: name to 'hello gaussdb' for id=1" )
111123
@@ -124,5 +136,6 @@ def main():
124136 _cleanup_slot_and_schema (conn )
125137 print (f"Cleaned up slot { SLOT_NAME } and schema { SCHEMA } " )
126138
139+
127140if __name__ == "__main__" :
128- main ()
141+ main ()
0 commit comments