Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
3 changes: 3 additions & 0 deletions .openapi-generator/custom_templates/requirements.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
python_dateutil >= 2.5.3
setuptools >= 21.0.0
urllib3 ~= 2.6.1
344 changes: 344 additions & 0 deletions .openapi-generator/custom_templates/rest.mustache
Original file line number Diff line number Diff line change
@@ -0,0 +1,344 @@
{{>partial_header}}

import io
import json
import logging
import re
import ssl
from urllib.parse import urlencode
from urllib.parse import urlparse
from urllib.request import proxy_bypass_environment
import urllib3
import ipaddress

from {{packageName}}.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError


logger = logging.getLogger(__name__)


class RESTResponse(io.IOBase):

def __init__(self, resp):
self.urllib3_response = resp
self.status = resp.status
self.reason = resp.reason
self.data = resp.data

def getheaders(self):
"""Returns a dictionary of the response headers."""
return self.urllib3_response.headers

def getheader(self, name, default=None):
"""Returns a given response header."""
return self.urllib3_response.headers.get(name, default)


class RESTClientObject(object):

def __init__(self, configuration, pools_size=4, maxsize=None):
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501

# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE

addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501

if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries

if configuration.socket_options is not None:
addition_pool_args['socket_options'] = configuration.socket_options

if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4

# https pool manager
if configuration.proxy and not should_bypass_proxies(
configuration.host, no_proxy=configuration.no_proxy or ''):
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
proxy_url=configuration.proxy,
proxy_headers=configuration.proxy_headers,
**addition_pool_args
)
else:
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
**addition_pool_args
)

def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None, _preload_content=True,
_request_timeout=None):
"""Perform requests.

:param method: http request method
:param url: http request url
:param query_params: query parameters in the url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']

if post_params and body:
raise ApiValueError(
"body parameter cannot be used with post_params parameter."
)

post_params = post_params or {}
headers = headers or {}

timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, float)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])

try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
# Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests
if (method != 'DELETE') and ('Content-Type' not in headers):
headers['Content-Type'] = 'application/json'
if query_params:
url += '?' + urlencode(query_params)
if ('Content-Type' not in headers) or (re.search('json',
headers['Content-Type'], re.IGNORECASE)):
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=False,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=True,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is
# provided in serialized form
elif isinstance(body, str) or isinstance(body, bytes):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)

if _preload_content:
r = RESTResponse(r)

# log response body
logger.debug("response body: %s", r.data)

if not 200 <= r.status <= 299:
if r.status == 401:
raise UnauthorizedException(http_resp=r)

if r.status == 403:
raise ForbiddenException(http_resp=r)

if r.status == 404:
raise NotFoundException(http_resp=r)

if 500 <= r.status <= 599:
raise ServiceException(http_resp=r)

raise ApiException(http_resp=r)

return r

def GET(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("GET", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)

def HEAD(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("HEAD", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)

def OPTIONS(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("OPTIONS", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

def DELETE(self, url, headers=None, query_params=None, body=None,
_preload_content=True, _request_timeout=None):
return self.request("DELETE", url,
headers=headers,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

def POST(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("POST", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

def PUT(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PUT", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

def PATCH(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PATCH", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)

# end of class RESTClientObject


def is_ipv4(target):
""" Test if IPv4 address or not
"""
try:
chk = ipaddress.IPv4Address(target)
return True
except ipaddress.AddressValueError:
return False


def in_ipv4net(target, net):
""" Test if target belongs to given IPv4 network
"""
try:
nw = ipaddress.IPv4Network(net)
ip = ipaddress.IPv4Address(target)
if ip in nw:
return True
return False
except ipaddress.AddressValueError:
return False
except ipaddress.NetmaskValueError:
return False


def should_bypass_proxies(url, no_proxy=None):
""" Yet another requests.should_bypass_proxies
Test if proxies should not be used for a particular url.
"""

parsed = urlparse(url)

# special cases
if parsed.hostname in [None, '']:
return True

# special cases
if no_proxy in [None, '']:
return False
if no_proxy == '*':
return True

no_proxy = no_proxy.lower().replace(' ', '');
entries = (
host for host in no_proxy.split(',') if host
)

if is_ipv4(parsed.hostname):
for item in entries:
if in_ipv4net(parsed.hostname, item):
return True
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy})
2 changes: 1 addition & 1 deletion .openapi-generator/custom_templates/setup.mustache
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ VERSION = "{{packageVersion}}"
# http://pypi.python.org/pypi/setuptools

REQUIRES = [
"urllib3 >= 1.25.3",
"urllib3 >= 2.6.1",
"python-dateutil",
{{#asyncio}}
"aiohttp >= 3.0.0",
Expand Down
1 change: 1 addition & 0 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,7 @@ services:
# This is CRITICAL for Python SDK tests which expect consistent fixture names
data-loader:
image: 020413372491.dkr.ecr.us-east-1.amazonaws.com/nas/data-loader:master
platform: linux/amd64
pull_policy: always
tty: true
depends_on:
Expand Down
Loading
Loading