mirror of
https://github.com/jellyfin/jellyfin-kodi.git
synced 2024-12-28 11:46:11 +00:00
170 lines
4.8 KiB
Python
170 lines
4.8 KiB
Python
|
"""A library of helper functions for the Cheroot test suite."""
|
||
|
|
||
|
from __future__ import absolute_import, division, print_function
|
||
|
__metaclass__ = type
|
||
|
|
||
|
import datetime
|
||
|
import logging
|
||
|
import os
|
||
|
import sys
|
||
|
import time
|
||
|
import threading
|
||
|
import types
|
||
|
|
||
|
from six.moves import http_client
|
||
|
|
||
|
import six
|
||
|
|
||
|
import cheroot.server
|
||
|
import cheroot.wsgi
|
||
|
|
||
|
from cheroot.test import webtest
|
||
|
|
||
|
log = logging.getLogger(__name__)
|
||
|
thisdir = os.path.abspath(os.path.dirname(__file__))
|
||
|
serverpem = os.path.join(os.getcwd(), thisdir, 'test.pem')
|
||
|
|
||
|
|
||
|
config = {
|
||
|
'bind_addr': ('127.0.0.1', 54583),
|
||
|
'server': 'wsgi',
|
||
|
'wsgi_app': None,
|
||
|
}
|
||
|
|
||
|
|
||
|
class CherootWebCase(webtest.WebCase):
|
||
|
"""Helper class for a web app test suite."""
|
||
|
|
||
|
script_name = ''
|
||
|
scheme = 'http'
|
||
|
|
||
|
available_servers = {
|
||
|
'wsgi': cheroot.wsgi.Server,
|
||
|
'native': cheroot.server.HTTPServer,
|
||
|
}
|
||
|
|
||
|
@classmethod
|
||
|
def setup_class(cls):
|
||
|
"""Create and run one HTTP server per class."""
|
||
|
conf = config.copy()
|
||
|
conf.update(getattr(cls, 'config', {}))
|
||
|
|
||
|
s_class = conf.pop('server', 'wsgi')
|
||
|
server_factory = cls.available_servers.get(s_class)
|
||
|
if server_factory is None:
|
||
|
raise RuntimeError('Unknown server in config: %s' % conf['server'])
|
||
|
cls.httpserver = server_factory(**conf)
|
||
|
|
||
|
cls.HOST, cls.PORT = cls.httpserver.bind_addr
|
||
|
if cls.httpserver.ssl_adapter is None:
|
||
|
ssl = ''
|
||
|
cls.scheme = 'http'
|
||
|
else:
|
||
|
ssl = ' (ssl)'
|
||
|
cls.HTTP_CONN = http_client.HTTPSConnection
|
||
|
cls.scheme = 'https'
|
||
|
|
||
|
v = sys.version.split()[0]
|
||
|
log.info('Python version used to run this test script: %s' % v)
|
||
|
log.info('Cheroot version: %s' % cheroot.__version__)
|
||
|
log.info('HTTP server version: %s%s' % (cls.httpserver.protocol, ssl))
|
||
|
log.info('PID: %s' % os.getpid())
|
||
|
|
||
|
if hasattr(cls, 'setup_server'):
|
||
|
# Clear the wsgi server so that
|
||
|
# it can be updated with the new root
|
||
|
cls.setup_server()
|
||
|
cls.start()
|
||
|
|
||
|
@classmethod
|
||
|
def teardown_class(cls):
|
||
|
"""Cleanup HTTP server."""
|
||
|
if hasattr(cls, 'setup_server'):
|
||
|
cls.stop()
|
||
|
|
||
|
@classmethod
|
||
|
def start(cls):
|
||
|
"""Load and start the HTTP server."""
|
||
|
threading.Thread(target=cls.httpserver.safe_start).start()
|
||
|
while not cls.httpserver.ready:
|
||
|
time.sleep(0.1)
|
||
|
|
||
|
@classmethod
|
||
|
def stop(cls):
|
||
|
"""Terminate HTTP server."""
|
||
|
cls.httpserver.stop()
|
||
|
td = getattr(cls, 'teardown', None)
|
||
|
if td:
|
||
|
td()
|
||
|
|
||
|
date_tolerance = 2
|
||
|
|
||
|
def assertEqualDates(self, dt1, dt2, seconds=None):
|
||
|
"""Assert abs(dt1 - dt2) is within Y seconds."""
|
||
|
if seconds is None:
|
||
|
seconds = self.date_tolerance
|
||
|
|
||
|
if dt1 > dt2:
|
||
|
diff = dt1 - dt2
|
||
|
else:
|
||
|
diff = dt2 - dt1
|
||
|
if not diff < datetime.timedelta(seconds=seconds):
|
||
|
raise AssertionError('%r and %r are not within %r seconds.' %
|
||
|
(dt1, dt2, seconds))
|
||
|
|
||
|
|
||
|
class Request:
|
||
|
"""HTTP request container."""
|
||
|
|
||
|
def __init__(self, environ):
|
||
|
"""Initialize HTTP request."""
|
||
|
self.environ = environ
|
||
|
|
||
|
|
||
|
class Response:
|
||
|
"""HTTP response container."""
|
||
|
|
||
|
def __init__(self):
|
||
|
"""Initialize HTTP response."""
|
||
|
self.status = '200 OK'
|
||
|
self.headers = {'Content-Type': 'text/html'}
|
||
|
self.body = None
|
||
|
|
||
|
def output(self):
|
||
|
"""Generate iterable response body object."""
|
||
|
if self.body is None:
|
||
|
return []
|
||
|
elif isinstance(self.body, six.text_type):
|
||
|
return [self.body.encode('iso-8859-1')]
|
||
|
elif isinstance(self.body, six.binary_type):
|
||
|
return [self.body]
|
||
|
else:
|
||
|
return [x.encode('iso-8859-1') for x in self.body]
|
||
|
|
||
|
|
||
|
class Controller:
|
||
|
"""WSGI app for tests."""
|
||
|
|
||
|
def __call__(self, environ, start_response):
|
||
|
"""WSGI request handler."""
|
||
|
req, resp = Request(environ), Response()
|
||
|
try:
|
||
|
# Python 3 supports unicode attribute names
|
||
|
# Python 2 encodes them
|
||
|
handler = self.handlers[environ['PATH_INFO']]
|
||
|
except KeyError:
|
||
|
resp.status = '404 Not Found'
|
||
|
else:
|
||
|
output = handler(req, resp)
|
||
|
if (output is not None and
|
||
|
not any(resp.status.startswith(status_code)
|
||
|
for status_code in ('204', '304'))):
|
||
|
resp.body = output
|
||
|
try:
|
||
|
resp.headers.setdefault('Content-Length', str(len(output)))
|
||
|
except TypeError:
|
||
|
if not isinstance(output, types.GeneratorType):
|
||
|
raise
|
||
|
start_response(resp.status, resp.headers.items())
|
||
|
return resp.output()
|