Tool black: auto-format Python code

This commit is contained in:
Odd Stråbø 2024-06-10 09:19:47 +00:00
parent e4d8084c25
commit 7763762212
54 changed files with 6545 additions and 4723 deletions

View file

@ -15,7 +15,7 @@ LOG = LazyLogger(__name__)
def jellyfin_url(client, handler):
return "%s/%s" % (client.config.data['auth.server'], handler)
return "%s/%s" % (client.config.data["auth.server"], handler)
def basic_info():
@ -42,9 +42,8 @@ def music_info():
class API(object):
"""All the api calls to the server."""
''' All the api calls to the server.
'''
def __init__(self, client, *args, **kwargs):
self.client = client
self.config = client.config
@ -54,18 +53,18 @@ class API(object):
if request is None:
request = {}
request.update({'type': action, 'handler': url})
request.update({"type": action, "handler": url})
return self.client.request(request)
def _get(self, handler, params=None):
return self._http("GET", handler, {'params': params})
return self._http("GET", handler, {"params": params})
def _post(self, handler, json=None, params=None):
return self._http("POST", handler, {'params': params, 'json': json})
return self._http("POST", handler, {"params": params, "json": json})
def _delete(self, handler, params=None):
return self._http("DELETE", handler, {'params': params})
return self._http("DELETE", handler, {"params": params})
#################################################################################################
@ -111,9 +110,17 @@ class API(object):
def artwork(self, item_id, art, max_width, ext="jpg", index=None):
if index is None:
return jellyfin_url(self.client, "Items/%s/Images/%s?MaxWidth=%s&format=%s" % (item_id, art, max_width, ext))
return jellyfin_url(
self.client,
"Items/%s/Images/%s?MaxWidth=%s&format=%s"
% (item_id, art, max_width, ext),
)
return jellyfin_url(self.client, "Items/%s/Images/%s/%s?MaxWidth=%s&format=%s" % (item_id, art, index, max_width, ext))
return jellyfin_url(
self.client,
"Items/%s/Images/%s/%s?MaxWidth=%s&format=%s"
% (item_id, art, index, max_width, ext),
)
#################################################################################################
@ -140,16 +147,16 @@ class API(object):
return self.users("/Items/%s" % item_id)
def get_items(self, item_ids):
return self.users("/Items", params={
'Ids': ','.join(str(x) for x in item_ids),
'Fields': info()
})
return self.users(
"/Items",
params={"Ids": ",".join(str(x) for x in item_ids), "Fields": info()},
)
def get_sessions(self):
return self.sessions(params={'ControllableByUserId': "{UserId}"})
return self.sessions(params={"ControllableByUserId": "{UserId}"})
def get_device(self, device_id):
return self.sessions(params={'DeviceId': device_id})
return self.sessions(params={"DeviceId": device_id})
def post_session(self, session_id, url, params=None, data=None):
return self.sessions("/%s/%s" % (session_id, url), "POST", params, data)
@ -158,64 +165,68 @@ class API(object):
return self.items("/%s/Images" % item_id)
def get_suggestion(self, media="Movie,Episode", limit=1):
return self.users("/Suggestions", params={
'Type': media,
'Limit': limit
})
return self.users("/Suggestions", params={"Type": media, "Limit": limit})
def get_recently_added(self, media=None, parent_id=None, limit=20):
return self.user_items("/Latest", {
'Limit': limit,
'UserId': "{UserId}",
'IncludeItemTypes': media,
'ParentId': parent_id,
'Fields': info()
})
return self.user_items(
"/Latest",
{
"Limit": limit,
"UserId": "{UserId}",
"IncludeItemTypes": media,
"ParentId": parent_id,
"Fields": info(),
},
)
def get_next(self, index=None, limit=1):
return self.shows("/NextUp", {
'Limit': limit,
'UserId': "{UserId}",
'StartIndex': None if index is None else int(index)
})
return self.shows(
"/NextUp",
{
"Limit": limit,
"UserId": "{UserId}",
"StartIndex": None if index is None else int(index),
},
)
def get_adjacent_episodes(self, show_id, item_id):
return self.shows("/%s/Episodes" % show_id, {
'UserId': "{UserId}",
'AdjacentTo': item_id,
'Fields': "Overview"
})
return self.shows(
"/%s/Episodes" % show_id,
{"UserId": "{UserId}", "AdjacentTo": item_id, "Fields": "Overview"},
)
def get_genres(self, parent_id=None):
return self._get("Genres", {
'ParentId': parent_id,
'UserId': "{UserId}",
'Fields': info()
})
return self._get(
"Genres", {"ParentId": parent_id, "UserId": "{UserId}", "Fields": info()}
)
def get_recommendation(self, parent_id=None, limit=20):
return self._get("Movies/Recommendations", {
'ParentId': parent_id,
'UserId': "{UserId}",
'Fields': info(),
'Limit': limit
})
return self._get(
"Movies/Recommendations",
{
"ParentId": parent_id,
"UserId": "{UserId}",
"Fields": info(),
"Limit": limit,
},
)
def get_items_by_letter(self, parent_id=None, media=None, letter=None):
return self.user_items(params={
'ParentId': parent_id,
'NameStartsWith': letter,
'Fields': info(),
'Recursive': True,
'IncludeItemTypes': media
})
return self.user_items(
params={
"ParentId": parent_id,
"NameStartsWith": letter,
"Fields": info(),
"Recursive": True,
"IncludeItemTypes": media,
}
)
def get_channels(self):
return self._get("LiveTv/Channels", {
'UserId': "{UserId}",
'EnableImages': True,
'EnableUserData': True
})
return self._get(
"LiveTv/Channels",
{"UserId": "{UserId}", "EnableImages": True, "EnableUserData": True},
)
def get_intros(self, item_id):
return self.user_items("/%s/Intros" % item_id)
@ -230,30 +241,26 @@ class API(object):
return self.user_items("/%s/LocalTrailers" % item_id)
def get_transcode_settings(self):
return self._get('System/Configuration/encoding')
return self._get("System/Configuration/encoding")
def get_ancestors(self, item_id):
return self.items("/%s/Ancestors" % item_id, params={
'UserId': "{UserId}"
})
return self.items("/%s/Ancestors" % item_id, params={"UserId": "{UserId}"})
def get_items_theme_video(self, parent_id):
return self.users("/Items", params={
'HasThemeVideo': True,
'ParentId': parent_id
})
return self.users(
"/Items", params={"HasThemeVideo": True, "ParentId": parent_id}
)
def get_themes(self, item_id):
return self.items("/%s/ThemeMedia" % item_id, params={
'UserId': "{UserId}",
'InheritFromParent': True
})
return self.items(
"/%s/ThemeMedia" % item_id,
params={"UserId": "{UserId}", "InheritFromParent": True},
)
def get_items_theme_song(self, parent_id):
return self.users("/Items", params={
'HasThemeSong': True,
'ParentId': parent_id
})
return self.users(
"/Items", params={"HasThemeSong": True, "ParentId": parent_id}
)
def check_companion_enabled(self):
"""
@ -262,8 +269,10 @@ class API(object):
None = Unknown
"""
try:
plugin_settings = self._get("Jellyfin.Plugin.KodiSyncQueue/GetPluginSettings") or {}
return plugin_settings.get('IsEnabled')
plugin_settings = (
self._get("Jellyfin.Plugin.KodiSyncQueue/GetPluginSettings") or {}
)
return plugin_settings.get("IsEnabled")
except requests.RequestException as e:
LOG.warning("Error checking companion installed state: %s", e)
@ -277,42 +286,51 @@ class API(object):
return None
def get_seasons(self, show_id):
return self.shows("/%s/Seasons" % show_id, params={
'UserId': "{UserId}",
'EnableImages': True,
'Fields': info()
})
return self.shows(
"/%s/Seasons" % show_id,
params={"UserId": "{UserId}", "EnableImages": True, "Fields": info()},
)
def get_date_modified(self, date, parent_id, media=None):
return self.users("/Items", params={
'ParentId': parent_id,
'Recursive': False,
'IsMissing': False,
'IsVirtualUnaired': False,
'IncludeItemTypes': media or None,
'MinDateLastSaved': date,
'Fields': info()
})
return self.users(
"/Items",
params={
"ParentId": parent_id,
"Recursive": False,
"IsMissing": False,
"IsVirtualUnaired": False,
"IncludeItemTypes": media or None,
"MinDateLastSaved": date,
"Fields": info(),
},
)
def get_userdata_date_modified(self, date, parent_id, media=None):
return self.users("/Items", params={
'ParentId': parent_id,
'Recursive': True,
'IsMissing': False,
'IsVirtualUnaired': False,
'IncludeItemTypes': media or None,
'MinDateLastSavedForUser': date,
'Fields': info()
})
return self.users(
"/Items",
params={
"ParentId": parent_id,
"Recursive": True,
"IsMissing": False,
"IsVirtualUnaired": False,
"IncludeItemTypes": media or None,
"MinDateLastSavedForUser": date,
"Fields": info(),
},
)
def refresh_item(self, item_id):
return self.items("/%s/Refresh" % item_id, "POST", json={
'Recursive': True,
'ImageRefreshMode': "FullRefresh",
'MetadataRefreshMode': "FullRefresh",
'ReplaceAllImages': False,
'ReplaceAllMetadata': True
})
return self.items(
"/%s/Refresh" % item_id,
"POST",
json={
"Recursive": True,
"ImageRefreshMode": "FullRefresh",
"MetadataRefreshMode": "FullRefresh",
"ReplaceAllImages": False,
"ReplaceAllMetadata": True,
},
)
def favorite(self, item_id, option=True):
return self.users("/FavoriteItems/%s" % item_id, "POST" if option else "DELETE")
@ -324,7 +342,9 @@ class API(object):
return self.sessions("/Capabilities/Full", "POST", json=data)
def session_add_user(self, session_id, user_id, option=True):
return self.sessions("/%s/User/%s" % (session_id, user_id), "POST" if option else "DELETE")
return self.sessions(
"/%s/User/%s" % (session_id, user_id), "POST" if option else "DELETE"
)
def session_playing(self, data):
return self.sessions("/Playing", "POST", json=data)
@ -339,115 +359,132 @@ class API(object):
return self.users("/PlayedItems/%s" % item_id, "POST" if watched else "DELETE")
def get_sync_queue(self, date, filters=None):
return self._get("Jellyfin.Plugin.KodiSyncQueue/{UserId}/GetItems", params={
'LastUpdateDT': date,
'filter': filters or 'None'
})
return self._get(
"Jellyfin.Plugin.KodiSyncQueue/{UserId}/GetItems",
params={"LastUpdateDT": date, "filter": filters or "None"},
)
def get_server_time(self):
return self._get("Jellyfin.Plugin.KodiSyncQueue/GetServerDateTime")
def get_play_info(self, item_id, profile):
return self.items("/%s/PlaybackInfo" % item_id, "POST", json={
'UserId': "{UserId}",
'DeviceProfile': profile,
'AutoOpenLiveStream': True
})
return self.items(
"/%s/PlaybackInfo" % item_id,
"POST",
json={
"UserId": "{UserId}",
"DeviceProfile": profile,
"AutoOpenLiveStream": True,
},
)
def get_live_stream(self, item_id, play_id, token, profile):
return self._post("LiveStreams/Open", json={
'UserId': "{UserId}",
'DeviceProfile': profile,
'OpenToken': token,
'PlaySessionId': play_id,
'ItemId': item_id
})
return self._post(
"LiveStreams/Open",
json={
"UserId": "{UserId}",
"DeviceProfile": profile,
"OpenToken": token,
"PlaySessionId": play_id,
"ItemId": item_id,
},
)
def close_live_stream(self, live_id):
return self._post("LiveStreams/Close", json={
'LiveStreamId': live_id
})
return self._post("LiveStreams/Close", json={"LiveStreamId": live_id})
def close_transcode(self, device_id, play_id):
return self._delete("Videos/ActiveEncodings", params={
'DeviceId': device_id,
'PlaySessionId': play_id
})
return self._delete(
"Videos/ActiveEncodings",
params={"DeviceId": device_id, "PlaySessionId": play_id},
)
def get_default_headers(self):
auth = "MediaBrowser "
auth += "Client=%s, " % self.config.data['app.name']
auth += "Device=%s, " % self.config.data['app.device_name']
auth += "DeviceId=%s, " % self.config.data['app.device_id']
auth += "Version=%s" % self.config.data['app.version']
auth += "Client=%s, " % self.config.data["app.name"]
auth += "Device=%s, " % self.config.data["app.device_name"]
auth += "DeviceId=%s, " % self.config.data["app.device_id"]
auth += "Version=%s" % self.config.data["app.version"]
return {
"Accept": "application/json",
"Content-type": "application/x-www-form-urlencoded; charset=UTF-8",
"X-Application": "%s/%s" % (self.config.data['app.name'], self.config.data['app.version']),
"X-Application": "%s/%s"
% (self.config.data["app.name"], self.config.data["app.version"]),
"Accept-Charset": "UTF-8,*",
"Accept-encoding": "gzip",
"User-Agent": self.config.data['http.user_agent'] or "%s/%s" % (self.config.data['app.name'], self.config.data['app.version']),
"x-emby-authorization": ensure_str(auth, 'utf-8')
"User-Agent": self.config.data["http.user_agent"]
or "%s/%s"
% (self.config.data["app.name"], self.config.data["app.version"]),
"x-emby-authorization": ensure_str(auth, "utf-8"),
}
def send_request(self, url, path, method="get", timeout=None, headers=None, data=None):
def send_request(
self, url, path, method="get", timeout=None, headers=None, data=None
):
request_method = getattr(requests, method.lower())
url = "%s/%s" % (url, path)
request_settings = {
"timeout": timeout or self.default_timeout,
"headers": headers or self.get_default_headers(),
"data": data
"data": data,
}
request_settings["verify"] = settings('sslverify.bool')
request_settings["verify"] = settings("sslverify.bool")
LOG.info("Sending %s request to %s" % (method, path))
LOG.debug(request_settings['timeout'])
LOG.debug(request_settings['headers'])
LOG.debug(request_settings["timeout"])
LOG.debug(request_settings["headers"])
return request_method(url, **request_settings)
def login(self, server_url, username, password=""):
path = "Users/AuthenticateByName"
auth_data = {
"username": username,
"Pw": password
}
auth_data = {"username": username, "Pw": password}
headers = self.get_default_headers()
headers.update({'Content-type': "application/json"})
headers.update({"Content-type": "application/json"})
try:
LOG.info("Trying to login to %s/%s as %s" % (server_url, path, username))
response = self.send_request(server_url, path, method="post", timeout=10, headers=headers, data=json.dumps(auth_data))
response = self.send_request(
server_url,
path,
method="post",
timeout=10,
headers=headers,
data=json.dumps(auth_data),
)
if response.status_code == 200:
return response.json()
else:
LOG.error("Failed to login to server with status code: " + str(response.status_code))
LOG.error(
"Failed to login to server with status code: "
+ str(response.status_code)
)
LOG.error("Server Response:\n" + str(response.content))
LOG.debug(headers)
return {}
except Exception as e: # Find exceptions for likely cases i.e, server timeout, etc
except (
Exception
) as e: # Find exceptions for likely cases i.e, server timeout, etc
LOG.error(e)
return {}
def validate_authentication_token(self, server):
auth_token_header = {
'X-MediaBrowser-Token': server['AccessToken']
}
auth_token_header = {"X-MediaBrowser-Token": server["AccessToken"]}
headers = self.get_default_headers()
headers.update(auth_token_header)
response = self.send_request(server['address'], "system/info", headers=headers)
response = self.send_request(server["address"], "system/info", headers=headers)
if response.status_code == 200:
return response.json()
else:
return {'Status_Code': response.status_code}
return {"Status_Code": response.status_code}
def get_public_info(self, server_address):
response = self.send_request(server_address, "system/info/public")
@ -459,8 +496,8 @@ class API(object):
return {}
def check_redirect(self, server_address):
''' Checks if the server is redirecting traffic to a new URL and
"""Checks if the server is redirecting traffic to a new URL and
returns the URL the server prefers to use
'''
"""
response = self.send_request(server_address, "system/info/public")
return response.url.replace('/system/info/public', '')
return response.url.replace("/system/info/public", "")