Skip to content

Commit 1e757b4

Browse files
author
chenyunliang520
committed
Add cluster_ha_showcase.py for demonstrating high availability, load balancing, and primary node discovery in GaussDB clusters
1 parent 1d847dc commit 1e757b4

File tree

1 file changed

+307
-0
lines changed

1 file changed

+307
-0
lines changed

example/cluster_ha_showcase.py

Lines changed: 307 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,307 @@
1+
# -*- coding: utf-8 -*-
2+
import random
3+
import time
4+
import logging
5+
import sys
6+
from gaussdb import connect, Error, Connection
7+
8+
# Configure logging: INFO level, records formatted as "time - level - message".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by the functions in this script.
logger = logging.getLogger(__name__)
11+
12+
def conninfo_to_dict(dsn):
    """Parse a whitespace-separated ``key=value`` DSN string into a dict.

    Tokens are separated by whitespace and each token is split on its first
    ``=``, so a value may itself contain ``=``. Quoting is not interpreted,
    which means a value containing whitespace cannot be expressed.

    Args:
        dsn: Connection string such as ``"host=a,b port=1,2 user=u"``.

    Returns:
        Dict mapping each key to its string value. When a key is repeated the
        last occurrence wins. An empty or blank DSN yields an empty dict.

    Raises:
        ValueError: If a token has no ``=`` separator or has an empty key.
    """
    params = {}
    for position, token in enumerate(dsn.split(), start=1):
        key, separator, value = token.partition("=")
        if not separator or not key:
            # Report the position rather than the token text: a malformed
            # token may be a fragment of the password.
            raise ValueError(
                f"Invalid DSN token at position {position} (expected key=value)"
            )
        params[key] = value
    return params
19+
20+
def get_nodes(params):
    """Pair up the comma-separated hosts and ports of a parsed DSN.

    Args:
        params: Parsed DSN dict. Must contain ``'host'`` and ``'port'`` keys
            whose values are comma-separated lists.

    Returns:
        List of ``(host, port)`` string tuples, in the order given in the DSN.

    Raises:
        KeyError: If ``'host'`` or ``'port'`` is missing from ``params``.
        ValueError: If more than one port is given and the number of ports
            differs from the number of hosts.
    """
    hosts = params['host'].split(',')
    ports = params['port'].split(',')
    if len(ports) == 1:
        # A single port applies to every host (the libpq multi-host
        # convention). zip() alone would silently drop every host after
        # the first one.
        ports = ports * len(hosts)
    elif len(ports) != len(hosts):
        raise ValueError(
            f"could not match {len(ports)} port numbers to {len(hosts)} hosts"
        )
    return list(zip(hosts, ports))
25+
26+
def get_cluster_mode(conn: Connection) -> str:
    """Detect the cluster mode of the server behind ``conn``.

    The mode is derived from ``local_role`` as reported by
    ``pg_stat_get_stream_replications()`` (compared case-insensitively):

    * ``primary`` / ``standby``      -> ``'master-standby'``
    * ``main standby``               -> ``'main standby'``
    * ``cascade standby``            -> ``'cascade standby'``
    * ``normal``                     -> ``'distributed'`` when ``pgxc_node``
      holds at least one row, otherwise ``'single'``

    Args:
        conn: Open database connection used to run the detection queries.

    Returns:
        One of ``'master-standby'``, ``'distributed'``, ``'single'``,
        ``'main standby'`` or ``'cascade standby'``. Any database error, an
        empty result set, a NULL role or an unrecognised role falls back to
        ``'single'``.
    """
    try:
        with conn.cursor() as cur:
            try:
                cur.execute("SELECT local_role FROM pg_stat_get_stream_replications()")
                row = cur.fetchone()
            except Error:
                logger.warning("pg_stat_get_stream_replications 查询失败,返回 single 模式")
                return 'single'

            # fetchone() yields None when the query produced no row, and the
            # column itself may be NULL; neither may reach .lower().
            local_role = None
            if row is not None and row[0] is not None:
                local_role = row[0].lower()

            if local_role in ('primary', 'standby'):
                return 'master-standby'
            if local_role in ('main standby', 'cascade standby'):
                # These roles are reported verbatim as the mode.
                return local_role
            if local_role == 'normal':
                try:
                    cur.execute("SELECT count(1) FROM pgxc_node")
                    count_row = cur.fetchone()
                except Error:
                    logger.warning("pgxc_node 表不存在,返回 single 模式")
                    return 'single'
                node_count = count_row[0] if count_row is not None else 0
                return 'distributed' if node_count > 0 else 'single'

            logger.warning(f"未知的 local_role: {local_role},返回 single 模式")
            return 'single'
    except Error as e:
        logger.error(f"获取集群模式失败: {e}")
        return 'single'
56+
57+
def get_node_role(conn: Connection, cluster_mode: str, host: str, port: str) -> str:
    """Return the role of the node that ``conn`` is connected to.

    * For ``'master-standby'``, ``'main standby'`` and ``'cascade standby'``
      the result is ``'Standby'`` when ``pg_is_in_recovery()`` is true and
      ``'Primary'`` otherwise.
    * For ``'distributed'`` the coordinator rows of ``pgxc_node``
      (``node_type = 'C'``) whose port equals the server's ``port`` setting
      are scanned; the ``node_name`` of the first row whose ``node_host``
      equals ``host`` is returned, or ``'coordinator'`` when none matches.
    * Any other mode yields ``'single'``.

    Args:
        conn: Open database connection to the node.
        cluster_mode: Mode string as produced by ``get_cluster_mode``.
        host: Host the connection was opened against.
        port: Port the connection was opened against (used in log output).

    Returns:
        The role string described above, or ``'unknown'`` when a database
        error occurs.
    """
    replication_modes = ('master-standby', 'main standby', 'cascade standby')
    try:
        with conn.cursor() as cursor:
            if cluster_mode in replication_modes:
                cursor.execute("SELECT CASE WHEN pg_is_in_recovery() THEN 'Standby' ELSE 'Primary' END")
                role_row = cursor.fetchone()
                return role_row[0]

            if cluster_mode != 'distributed':
                return 'single'

            cursor.execute("SELECT node_name, node_host FROM pgxc_node WHERE node_type = 'C' AND node_port = current_setting('port')::int")
            coordinators = cursor.fetchall()
            for coordinator_name, coordinator_host in coordinators:
                if coordinator_host != host:
                    continue
                return coordinator_name

            logger.warning(f"未找到匹配的 node_host: {host},返回 coordinator")
            return 'coordinator'
    except Error as e:
        logger.error(f"获取节点角色失败 (host={host}, port={port}): {e}")
        return 'unknown'
77+
78+
def _mask_password(dsn: str) -> str:
    """Return ``dsn`` with the value of every ``password=`` token masked."""
    return " ".join(
        "password=******" if token.startswith("password=") else token
        for token in dsn.split()
    )


def connect_with_retry(dsn: str, max_attempts: int = 5, timeout: int = 10) -> Connection:
    """Open a database connection, retrying with exponential backoff.

    After a failed attempt the function sleeps ``2 ** attempt`` seconds
    (2, 4, 8, ...) before trying again.

    Args:
        dsn: Connection string passed unchanged to ``connect``.
        max_attempts: Maximum number of connection attempts; must be >= 1.
        timeout: Value passed to ``connect`` as ``connect_timeout``.

    Returns:
        The connection returned by the first successful ``connect`` call.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1.
        Error: The error of the final attempt when every attempt failed.
    """
    if max_attempts < 1:
        # Without this guard the loop body never runs and the function would
        # silently return None instead of a connection.
        raise ValueError("max_attempts must be at least 1")

    # The DSN carries the password; only the masked form may reach the logs.
    safe_dsn = _mask_password(dsn)
    for attempt in range(1, max_attempts + 1):
        start_time = time.time()
        try:
            conn = connect(dsn, connect_timeout=timeout, application_name='pg_connection_test')
        except Error as e:
            logger.error(f"连接失败 ({safe_dsn}),第 {attempt}/{max_attempts} 次尝试: {e}")
            if attempt == max_attempts:
                raise
            time.sleep(2 ** attempt)
        else:
            logger.info(f"连接成功: {safe_dsn},耗时: {time.time() - start_time:.2f} 秒")
            return conn
91+
92+
def disaster_recovery(params, simulate_failure: bool = False):
    """Failover scenario: try the first node, then fall back to the others.

    The first node listed in the DSN is treated as the primary. Unless
    ``simulate_failure`` is set it is tried first; if it is skipped or fails,
    the remaining nodes are tried in DSN order. The outcome is printed.

    Args:
        params: Parsed DSN dict providing ``host``, ``port``, ``user``,
            ``password`` and ``dbname``.
        simulate_failure: When true, the first node is not contacted at all.
    """
    banner_suffix = '(模拟主节点故障)' if simulate_failure else ''
    print(f"\n=== 容灾场景测试{banner_suffix} ===")

    nodes = get_nodes(params)
    first_host, first_port = nodes[0]

    def build_dsn(node_host, node_port):
        # Single-node DSN carrying the shared credentials.
        return f"host={node_host} port={node_port} user={params['user']} password={params['password']} dbname={params['dbname']}"

    primary_dsn = build_dsn(first_host, first_port)
    fallback_targets = [
        (build_dsn(node_host, node_port), node_host, node_port)
        for node_host, node_port in nodes[1:]
    ]

    # Regular path: the first node answers and the scenario ends here.
    if not simulate_failure:
        try:
            with connect_with_retry(primary_dsn) as conn:
                cluster_mode = get_cluster_mode(conn)
                role = get_node_role(conn, cluster_mode, first_host, first_port)
                print(f"容灾测试通过: 连接到节点 {first_host}:{first_port},角色: {role},模式: {cluster_mode}")
                return
        except Error as e:
            logger.error(f"主节点连接失败: {e}")

    # Fallback path: walk the remaining nodes until one accepts a connection.
    for dsn, host, port in fallback_targets:
        try:
            with connect_with_retry(dsn) as conn:
                cluster_mode = get_cluster_mode(conn)
                role = get_node_role(conn, cluster_mode, host, port)
                print(f"容灾测试通过: 切换到节点 {host}:{port},角色: {role},模式: {cluster_mode}")
                return
        except Error as e:
            logger.error(f"节点 {host}:{port} 连接失败: {e}")

    print("容灾测试失败: 无法连接到任何节点")
126+
127+
def _read_test_row(dsn, host, port, cluster_mode):
    """Read the test row from one node; return True on success, False on a database error."""
    try:
        with connect_with_retry(dsn) as conn:
            role = get_node_role(conn, cluster_mode, host, port)
            with conn.cursor() as cur:
                cur.execute("SELECT data FROM test_table WHERE id = 1")
                result = cur.fetchone()
                logger.info(f"读操作结果: {result}")
                print(f"读操作成功: 连接到节点 {host}:{port},角色: {role},数据: {result[0] if result else 'None'}")
        return True
    except Error as e:
        # host/port are this function's own arguments, so the failure is
        # always attributed to the node that was actually being contacted.
        logger.error(f"读操作失败 ({host}:{port}): {e}")
        return False


def load_balancing(params):
    """Load-balancing scenario: write on the first node, then read in two modes.

    Steps:
      1. Connect to the first node of the DSN and detect the cluster mode.
      2. On that node create ``test_table`` (replicated when the mode is
         ``'distributed'``), truncate it and insert one row.
      3. For each mode in ``("disable", "random")`` read the row repeatedly
         and print whether the set of hosts reached matches the expectation:
         * ``disable``: 1 + 19 reads, all against the first node.
         * ``random``: one read against the first node, then one read per
           node in shuffled order (at most 19).
      4. Drop ``test_table``.

    Args:
        params: Parsed DSN dict providing ``host``, ``port``, ``user``,
            ``password`` and ``dbname``.
    """
    print("\n=== 负载均衡场景测试 ===")
    nodes = get_nodes(params)
    primary_host, primary_port = nodes[0]

    # Keep host and port next to each DSN so they never have to be recovered
    # from the DSN text (substring matching breaks when nodes share a host or
    # when one port is a prefix of another).
    targets = [
        (
            f"host={host} port={port} user={params['user']} password={params['password']} dbname={params['dbname']}",
            host,
            port,
        )
        for host, port in nodes
    ]
    primary_target = targets[0]
    primary_dsn = primary_target[0]

    # Detect the cluster mode on the first node.
    try:
        with connect_with_retry(primary_dsn) as conn:
            cluster_mode = get_cluster_mode(conn)
            role = get_node_role(conn, cluster_mode, primary_host, primary_port)
            logger.info(f"主节点 {primary_host}:{primary_port},角色: {role},模式: {cluster_mode}")
    except Error as e:
        logger.error(f"主节点连接失败: {e}")
        return

    # Write phase: create and fill the test table on the first node.
    try:
        with connect_with_retry(primary_dsn) as conn:
            with conn.cursor() as cur:
                if cluster_mode == 'distributed':
                    cur.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, data TEXT) DISTRIBUTE BY REPLICATION")
                else:
                    cur.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, data TEXT)")
                cur.execute("TRUNCATE TABLE test_table")
                cur.execute("INSERT INTO test_table (id, data) VALUES (1, 'test write')")
                conn.commit()
                print(f"写操作成功: 连接到主节点 {primary_host}:{primary_port},角色: {role}")
    except Error as e:
        logger.error(f"写操作失败,主节点连接失败或数据库错误: {e}")
        return

    # Read phase: sequential ("disable") and shuffled ("random") modes.
    extra_reads = 19  # reads issued after the initial read on the first node
    expected_hosts = [host for host, _ in nodes]
    for load_balance_mode in ["disable", "random"]:
        print(f"\n=== 测试 {load_balance_mode} 模式 ===")
        connected_hosts = []
        unavailable_nodes = []

        if load_balance_mode == "random":
            # NOTE: this visits each node once in shuffled order, so fewer
            # than `extra_reads` reads happen when the cluster is smaller.
            followups = targets.copy()
            random.shuffle(followups)
            followups = followups[:extra_reads]
        else:
            followups = [primary_target] * extra_reads

        # The first node is always read first, then the follow-up targets.
        for dsn, host, port in [primary_target] + followups:
            if _read_test_row(dsn, host, port, cluster_mode):
                connected_hosts.append(host)
            else:
                unavailable_nodes.append(f"{host}:{port}")

        # Verify which hosts were reached.
        if load_balance_mode == "disable":
            # An empty list must not count as success: zero successful reads
            # means nothing was verified.
            if connected_hosts and all(h == primary_host for h in connected_hosts):
                print(f"负载均衡测试通过 ({load_balance_mode} 模式): 连接顺序符合预期 {connected_hosts}")
            else:
                print(f"负载均衡测试失败 ({load_balance_mode} 模式): 连接顺序不符合预期 {connected_hosts}")
        else:  # random
            reached = set(connected_hosts)
            if len(reached) >= 2:
                print(f"负载均衡测试通过 ({load_balance_mode} 模式): 随机连接,包含多个节点 {connected_hosts}")
                # De-duplicated so repeated hosts in the DSN neither trigger
                # a spurious warning nor show up twice.
                missing_hosts = list(dict.fromkeys(h for h in expected_hosts if h not in reached))
                if missing_hosts:
                    print(f"警告: 未连接到所有节点,缺失节点: {missing_hosts}")
            else:
                print(f"负载均衡测试失败 ({load_balance_mode} 模式): 未连接到多个节点 {connected_hosts}")
        if unavailable_nodes:
            print(f"警告: 以下节点不可用: {unavailable_nodes}")

    # Cleanup: drop the test table again.
    try:
        with connect_with_retry(primary_dsn) as conn:
            with conn.cursor() as cur:
                cur.execute("DROP TABLE IF EXISTS test_table")
                conn.commit()
    except Error as e:
        logger.error(f"清理表失败: {e}")
241+
242+
def auto_find_primary(params, simulate_failure: bool = False, max_retries: int = 3, retry_interval: int = 5):
    """Primary-discovery scenario: locate a primary or coordinator node.

    Candidate nodes are probed in DSN order. The scenario succeeds on the
    first node that is either
      * in a replication mode (``'master-standby'``, ``'main standby'``,
        ``'cascade standby'``) with role ``'Primary'``, or
      * in ``'distributed'`` mode (reported as a coordinator).
    When a full round finds no such node, the round is repeated after
    ``retry_interval`` seconds, up to ``max_retries`` rounds. The outcome is
    printed.

    Args:
        params: Parsed DSN dict providing ``host``, ``port``, ``user``,
            ``password`` and ``dbname``.
        simulate_failure: When true, the first node of the DSN is skipped.
        max_retries: Maximum number of rounds over the candidate nodes.
        retry_interval: Seconds to sleep between rounds.
    """
    print(f"\n=== 自动寻主场景测试{'(模拟主节点故障)' if simulate_failure else ''} ===")
    nodes = get_nodes(params)
    targets = [
        (
            f"host={host} port={port} user={params['user']} password={params['password']} dbname={params['dbname']}",
            host,
            port,
        )
        for host, port in nodes
    ]
    # When simulating a failure, the first node is skipped.
    candidates = targets[1:] if simulate_failure else targets
    failed_nodes = []

    def record_failure(entry):
        # The same node fails the same way on every round; list it once.
        if entry not in failed_nodes:
            failed_nodes.append(entry)

    replication_modes = ('master-standby', 'main standby', 'cascade standby')
    # With no candidate at all, retrying would only sleep between empty
    # rounds, so the retry loop is skipped entirely.
    rounds = range(1, max_retries + 1) if candidates else ()
    for attempt in rounds:
        for dsn, host, port in candidates:
            try:
                with connect_with_retry(dsn) as conn:
                    cluster_mode = get_cluster_mode(conn)
                    role = get_node_role(conn, cluster_mode, host, port)
                    if cluster_mode in replication_modes and role == 'Primary':
                        print(f"自动寻主测试通过: 连接到主节点 {host}:{port},角色: {role}")
                        return
                    if cluster_mode == 'distributed':
                        print(f"自动寻主测试通过: 连接到协调节点 {host}:{port},角色: {role}")
                        return
                    # Not a primary/coordinator: note it and keep looking.
                    logger.info(f"节点 {host}:{port},角色: {role},模式: {cluster_mode},继续查找")
                    record_failure(f"{host}:{port} ({role})")
            except Error as e:
                logger.error(f"节点 {host}:{port} 连接失败: {e}")
                record_failure(f"{host}:{port} (连接失败)")
                continue
        if attempt < max_retries:
            logger.info(f"第 {attempt}/{max_retries} 次尝试未找到主节点,等待 {retry_interval} 秒后重试")
            time.sleep(retry_interval)

    print(f"自动寻主测试失败: 尝试的节点 {failed_nodes},未找到主节点或协调节点")
282+
283+
def main(dsn: str):
    """Run every scenario against the cluster described by ``dsn``.

    Order: failover (normal, then simulated primary failure), load
    balancing, primary discovery (normal, then simulated primary failure).

    Args:
        dsn: Multi-host connection string given on the command line.
    """
    params = conninfo_to_dict(dsn)
    normal_then_failure = (False, True)

    # Failover scenario: normal run first, then with the first node skipped.
    for simulated in normal_then_failure:
        disaster_recovery(params, simulate_failure=simulated)

    # Load-balancing scenario.
    load_balancing(params)

    # Primary-discovery scenario: normal run first, then with the first node skipped.
    for simulated in normal_then_failure:
        auto_find_primary(params, simulate_failure=simulated)
301+
302+
if __name__ == "__main__":
    # Exactly one argument is expected: the multi-host DSN string.
    if len(sys.argv) != 2:
        print("export DSN=\"dbname=postgres user=root password=your_password host=192.xx.xx.xx,192.xx.xx.xx,192.xx.xx.xx port=8000,8000,8000\"")
        # Show the name this script was actually invoked with instead of a
        # hard-coded name that does not match this file.
        print(f"Usage: python3 {sys.argv[0]} \"$DSN\" > exec.log")
        sys.exit(1)
    main(sys.argv[1])

0 commit comments

Comments
 (0)