forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_concurrent_create_objects.py
More file actions
114 lines (99 loc) · 3.81 KB
/
test_concurrent_create_objects.py
File metadata and controls
114 lines (99 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2018 Snowflake Computing Inc. All right reserved.
#
from logging import getLogger
from multiprocessing.pool import ThreadPool
import pytest
try:
from parameters import (CONNECTION_PARAMETERS_ADMIN)
except:
CONNECTION_PARAMETERS_ADMIN ={}
logger = getLogger(__name__)
from snowflake.connector import ProgrammingError
from snowflake.connector.compat import TO_UNICODE
@pytest.mark.skipif(
not CONNECTION_PARAMETERS_ADMIN,
reason="Snowflake admin account is not accessible."
)
def test_snow5871(conn_cnx, db_parameters):
_test_snow5871(conn_cnx, db_parameters,
number_of_threads=5,
rt_max_outgoing_rate=60, rt_max_burst_size=5,
rt_max_borrowing_limt=1000, rt_reset_period=10000)
_test_snow5871(conn_cnx, db_parameters,
number_of_threads=40,
rt_max_outgoing_rate=60, rt_max_burst_size=1,
rt_max_borrowing_limt=200, rt_reset_period=1000)
def _create_a_table(meta):
cnx = meta['cnx']
name = meta['name']
try:
cnx.cursor().execute("""
create table {0} (aa int)
""".format(name))
# print("Success #" + meta['idx'])
return {'success': True}
except ProgrammingError:
logger.exception('Failed to create a table')
return {'success': False}
def _test_snow5871(conn_cnx,
db_parameters,
number_of_threads=10,
rt_max_outgoing_rate=60,
rt_max_burst_size=1,
rt_max_borrowing_limt=1000,
rt_reset_period=10000):
"""
SNOW-5871: rate limiting for creation of non-recycable objects
"""
logger.debug((
'number_of_threads = %s, rt_max_outgoing_rate = %s, '
'rt_max_burst_size = %s, rt_max_borrowing_limt = %s, '
'rt_reset_period = %s'),
number_of_threads, rt_max_outgoing_rate, rt_max_burst_size,
rt_max_borrowing_limt, rt_reset_period)
with conn_cnx(user=db_parameters['sf_user'],
password=db_parameters['sf_password'],
account=db_parameters['sf_account']) as cnx:
cnx.cursor().execute("""
alter system set
RT_MAX_OUTGOING_RATE={0},
RT_MAX_BURST_SIZE={1},
RT_MAX_BORROWING_LIMIT={2},
RT_RESET_PERIOD={3}""".format(
rt_max_outgoing_rate, rt_max_burst_size, rt_max_borrowing_limt, rt_reset_period))
try:
with conn_cnx() as cnx:
cnx.cursor().execute(
"create or replace database {name}_db".format(
name=db_parameters['name']))
meta = []
for i in range(number_of_threads):
meta.append({'idx': TO_UNICODE(i + 1),
'cnx': cnx,
'name': db_parameters[
'name'] + 'tbl_5871_' + TO_UNICODE(
i + 1)})
pool = ThreadPool(processes=number_of_threads)
results = pool.map(_create_a_table, meta)
success = 0
for r in results:
success += 1 if r['success'] else 0
# at least one should be success
assert success >= 1, 'success queries'
finally:
with conn_cnx() as cnx:
cnx.cursor().execute(
"drop database if exists {name}_db".format(
name=db_parameters['name']))
with conn_cnx(user=db_parameters['sf_user'],
password=db_parameters['sf_password'],
account=db_parameters['sf_account']) as cnx:
cnx.cursor().execute("""
alter system set
RT_MAX_OUTGOING_RATE=default,
RT_MAX_BURST_SIZE=default,
RT_RESET_PERIOD=default,
RT_MAX_BORROWING_LIMIT=default""")