forked from snowflakedb/snowflake-connector-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_concurrent_insert.py
More file actions
181 lines (165 loc) · 5.79 KB
/
test_concurrent_insert.py
File metadata and controls
181 lines (165 loc) · 5.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# Copyright (c) 2012-2018 Snowflake Computing Inc. All right reserved.
#
"""
Concurrent test module
"""
from logging import getLogger
from multiprocessing.pool import ThreadPool
import pytest
# Admin credentials are optional: tests that need the privilege to create a
# warehouse are skipped when the local `parameters` module does not supply
# CONNECTION_PARAMETERS_ADMIN.
try:
    from parameters import (CONNECTION_PARAMETERS_ADMIN)
except ImportError:
    # Only swallow a missing module/name. The original bare `except:` would
    # also hide SyntaxError in parameters.py and even KeyboardInterrupt.
    CONNECTION_PARAMETERS_ADMIN = {}
logger = getLogger(__name__)
import snowflake.connector
from snowflake.connector.compat import TO_UNICODE
from snowflake.connector.errors import ProgrammingError
def _concurrent_insert(meta):
    """
    Insert one row into ``meta['table']`` on a dedicated connection.

    ``meta`` must carry the connection parameters (``user``, ``password``,
    ``host``, ``port``, ``account``, ``database``, ``schema``,
    ``warehouse``), the target ``table`` and a string ``idx`` used for both
    columns. Sets ``meta['success']`` to True/False and returns ``meta`` so
    the caller can tally the outcome of each worker thread.
    """
    # Resolve the table name before the try block: the original code
    # assigned it inside `try`, so a failure on the very first execute()
    # raised NameError on `table` in the except handler instead of logging.
    table = meta['table']
    cnx = snowflake.connector.connect(
        user=meta['user'],
        password=meta['password'],
        host=meta['host'],
        port=meta['port'],
        account=meta['account'],
        database=meta['database'],
        schema=meta['schema'],
        timezone='UTC',
        protocol='http'
    )
    try:
        cnx.cursor().execute("use warehouse {0}".format(meta['warehouse']))
        sql = "insert into {name} values(%(c1)s, %(c2)s)".format(name=table)
        logger.debug(sql)
        cnx.cursor().execute(sql, {
            'c1': meta['idx'],
            'c2': 'test string ' + meta['idx'],
        })
        meta['success'] = True
        logger.debug("Succeeded process #%s", meta['idx'])
    except Exception:
        # Catch broadly (but not BaseException, unlike the original bare
        # `except:`) so one failing worker does not abort the whole pool;
        # the caller inspects meta['success'] instead.
        logger.exception('failed to insert into a table [%s]', table)
        meta['success'] = False
    finally:
        cnx.close()
    return meta
@pytest.mark.skipif(
    not CONNECTION_PARAMETERS_ADMIN,
    reason="The user needs a privilege of create warehouse."
)
def test_concurrent_insert(conn_cnx, db_parameters):
    """
    Concurrent insert tests. Inserts block on the one that's running.

    Spawns one connection per worker thread; inserts serialize on the
    warehouse, so at least ``number_of_threads - 1`` of them are expected
    to succeed.
    """
    number_of_threads = 10  # change this to increase the concurrency
    expected_success_runs = number_of_threads - 1
    cnx_array = []
    try:
        with conn_cnx() as cnx:
            cnx.cursor().execute("""
create or replace warehouse {0}
warehouse_type=standard
warehouse_size=small
""".format(db_parameters['name_wh']))
            sql = """
create or replace table {name} (c1 integer, c2 string)
""".format(name=db_parameters['name'])
            cnx.cursor().execute(sql)
            for i in range(number_of_threads):
                cnx_array.append({
                    'host': db_parameters['host'],
                    'port': db_parameters['port'],
                    'user': db_parameters['user'],
                    'password': db_parameters['password'],
                    'account': db_parameters['account'],
                    'database': db_parameters['database'],
                    'schema': db_parameters['schema'],
                    'table': db_parameters['name'],
                    'idx': TO_UNICODE(i),
                    'warehouse': db_parameters['name_wh']
                })

            # close()/join() in a finally so worker threads are reaped even
            # if map() raises; the original code leaked the pool.
            pool = ThreadPool(processes=number_of_threads)
            try:
                results = pool.map(_concurrent_insert, cnx_array)
            finally:
                pool.close()
                pool.join()

            success = 0
            for record in results:
                success += 1 if record['success'] else 0

            # 9 threads or more
            assert success >= expected_success_runs, "Number of success run"

            c = cnx.cursor()
            try:
                sql = "select * from {name} order by 1".format(
                    name=db_parameters['name'])
                c.execute(sql)
                for rec in c:
                    logger.debug(rec)
            finally:
                # Close the cursor even when iteration fails.
                c.close()
    finally:
        with conn_cnx() as cnx:
            cnx.cursor().execute(
                "drop table if exists {0}".format(db_parameters['name']))
            cnx.cursor().execute(
                "drop warehouse if exists {0}".format(
                    db_parameters['name_wh']))
def _concurrent_insert_using_connection(meta):
connection = meta['connection']
idx = meta['idx']
name = meta['name']
try:
connection.cursor().execute(
"INSERT INTO {name} VALUES(%s, %s)".format(
name=name),
(idx, 'test string{0}'.format(idx)))
except ProgrammingError as e:
if e.errno != 619: # SQL Execution Canceled
raise
@pytest.mark.skipif(
    not CONNECTION_PARAMETERS_ADMIN,
    reason="The user needs a privilege of create warehouse."
)
def test_concurrent_insert_using_connection(conn_cnx, db_parameters):
    """
    Concurrent insert tests using the same connection.

    All worker threads share one connection, so some inserts may be
    canceled (errno 619, swallowed by the worker); only lower/upper bounds
    on the resulting row count are asserted.
    """
    try:
        with conn_cnx() as cnx:
            cnx.cursor().execute("""
create or replace warehouse {0}
warehouse_type=standard
warehouse_size=small
""".format(db_parameters['name_wh']))
            cnx.cursor().execute("""
CREATE OR REPLACE TABLE {name} (c1 INTEGER, c2 STRING)
""".format(
                name=db_parameters['name']))
            number_of_threads = 5
            metas = []
            for i in range(number_of_threads):
                metas.append({
                    'connection': cnx,
                    'idx': i,
                    'name': db_parameters['name'],
                })

            # close()/join() in a finally so worker threads are reaped even
            # if map() raises; the original code leaked the pool.
            pool = ThreadPool(processes=number_of_threads)
            try:
                pool.map(_concurrent_insert_using_connection, metas)
            finally:
                pool.close()
                pool.join()

            cnt = 0
            for _ in cnx.cursor().execute(
                    "SELECT * FROM {name} ORDER BY 1".format(
                        name=db_parameters['name'])):
                cnt += 1
            assert cnt <= number_of_threads, \
                "Number of records should be less than the number of threads"
            assert cnt > 0, \
                "Number of records should be one or more"
    finally:
        with conn_cnx() as cnx:
            cnx.cursor().execute(
                "drop table if exists {0}".format(db_parameters['name']))
            cnx.cursor().execute(
                "drop warehouse if exists {0}".format(
                    db_parameters['name_wh']))