jellyfin-kodi/jellyfin_kodi/downloader.py
2020-09-27 04:20:24 +02:00

400 lines
11 KiB
Python

# -*- coding: utf-8 -*-
from __future__ import division, absolute_import, print_function, unicode_literals
#################################################################################################
import threading
import concurrent.futures
from datetime import date
from six.moves import range, queue as Queue, zip
from kodi_six import xbmc
import requests
from helper import settings, stop, event, window, create_id
from jellyfin import Jellyfin
from jellyfin import api
from helper.exceptions import HTTPException
from helper import LazyLogger
#################################################################################################
# Module-level logger for this file.
LOG = LazyLogger(__name__)
# Page size for paged item requests: 'limitIndex' user setting, hard-capped at 50.
LIMIT = min(int(settings('limitIndex') or 50), 50)
# Number of concurrent download threads: 'limitThreads' user setting, default 3.
DTHREADS = int(settings('limitThreads') or 3)
#################################################################################################
def get_jellyfinserver_url(handler):
    ''' Prefix *handler* with the "{server}" placeholder, dropping a single
        leading slash so it is not doubled in the result.
    '''
    if handler.startswith('/'):
        handler = handler[1:]
        LOG.info("handler starts with /: %s", handler)

    return "{server}/" + handler
def browse_info():
    ''' Comma-separated list of extra item fields requested when browsing. '''
    fields = (
        "DateCreated", "EpisodeCount", "SeasonCount", "Path", "Genres",
        "Studios", "Taglines", "MediaStreams", "Overview", "Etag",
        "ProductionLocations", "Width", "Height", "RecursiveItemCount",
        "ChildCount",
    )
    return ",".join(fields)
def _http(action, url, request=None, server_id=None):
    ''' Dispatch a raw HTTP request through the Jellyfin client for server_id. '''
    payload = {} if request is None else request
    payload['url'] = url
    payload['type'] = action

    return Jellyfin(server_id).http.request(payload)
def _get(handler, params=None, server_id=None):
    ''' Issue a GET request against the given server handler. '''
    url = get_jellyfinserver_url(handler)
    return _http("GET", url, {'params': params}, server_id)
def _post(handler, json=None, params=None, server_id=None):
    ''' Issue a POST request with an optional JSON body. '''
    url = get_jellyfinserver_url(handler)
    return _http("POST", url, {'params': params, 'json': json}, server_id)
def _delete(handler, params=None, server_id=None):
    ''' Issue a DELETE request against the given server handler. '''
    url = get_jellyfinserver_url(handler)
    return _http("DELETE", url, {'params': params}, server_id)
def validate_view(library_id, item_id):
    ''' This confirms a single item from the library matches the view it belongs to.
        Used to detect grouped libraries.

        Returns True when the item is found under the view, False on any
        request failure or when the result set is empty.
    '''
    try:
        result = _get("Users/{UserId}/Items", {
            'ParentId': library_id,
            'Recursive': True,
            'Ids': item_id
        })
    except Exception as error:
        LOG.exception(error)
        return False

    # bool() instead of "True if len(...) else False"; .get() so a response
    # without an 'Items' key reads as "not found" rather than raising KeyError
    # outside the try block above.
    return bool(result.get('Items'))
def get_single_item(parent_id, media):
    ''' Fetch at most one item of the given media type below parent_id. '''
    params = {
        'ParentId': parent_id,
        'Recursive': True,
        'Limit': 1,
        'IncludeItemTypes': media
    }
    return _get("Users/{UserId}/Items", params)
def get_filtered_section(parent_id=None, media=None, limit=None, recursive=None, sort=None, sort_order=None,
                         filters=None, extra=None, server_id=None):
    ''' Get dynamic listings.
    '''
    params = {
        'ParentId': parent_id,
        'IncludeItemTypes': media,
        'IsMissing': False,
        'Recursive': True if recursive is None else recursive,
        'Limit': limit,
        'SortBy': sort if sort else "SortName",
        'SortOrder': sort_order if sort_order else "Ascending",
        'ImageTypeLimit': 1,
        'IsVirtualUnaired': False,
        'Fields': browse_info()
    }

    if filters:
        # Boxsets are not a server-side filter; they map to CollapseBoxSetItems.
        if 'Boxsets' in filters:
            filters.remove('Boxsets')
            params['CollapseBoxSetItems'] = settings('groupedSets.bool')

        params['Filters'] = ','.join(filters)

    if settings('getCast.bool'):
        params['Fields'] += ",People"

    if media and 'Photo' in media:
        params['Fields'] += ",Width,Height"

    if extra is not None:
        params.update(extra)

    return _get("Users/{UserId}/Items", params, server_id)
def get_movies_by_boxset(boxset_id):
    ''' Stream every movie that belongs to the given boxset, page by page. '''
    for page in get_items(boxset_id, "Movie"):
        yield page
def get_episode_by_show(show_id):
    ''' Stream all episodes of a show, page by page. '''
    params = {
        'EnableUserData': True,
        'EnableImages': True,
        'UserId': "{UserId}",
        'Fields': api.info()
    }
    query = {'url': "Shows/%s/Episodes" % show_id, 'params': params}

    for page in _get_items(query):
        yield page
def get_episode_by_season(show_id, season_id):
    ''' Stream all episodes of one season of a show, page by page. '''
    params = {
        'SeasonId': season_id,
        'EnableUserData': True,
        'EnableImages': True,
        'UserId': "{UserId}",
        'Fields': api.info()
    }
    query = {'url': "Shows/%s/Episodes" % show_id, 'params': params}

    for page in _get_items(query):
        yield page
def get_item_count(parent_id, item_type=None, params=None):
    ''' Return the server's total record count for items below parent_id.

        Falls back to 1 when the response omits TotalRecordCount.
    '''
    url = "Users/{UserId}/Items"

    query_params = {
        'ParentId': parent_id,
        'IncludeItemTypes': item_type,
        'EnableTotalRecordCount': True,
        'LocationTypes': "FileSystem,Remote,Offline",
        'Recursive': True,
        'Limit': 1
    }
    if params:
        # Bug fix: was query_params['params'].update(params), which raised
        # KeyError since query_params has no 'params' key -- merge extras
        # directly into the request parameters instead.
        query_params.update(params)

    result = _get(url, query_params)

    return result.get('TotalRecordCount', 1)
def get_items(parent_id, item_type=None, basic=False, params=None):
    ''' Stream items below parent_id, page by page.

        basic -- request the reduced field set from api.basic_info().
        params -- optional extra request parameters merged over the defaults.
    '''
    request_params = {
        'ParentId': parent_id,
        'IncludeItemTypes': item_type,
        'SortBy': "SortName",
        'SortOrder': "Ascending",
        'Fields': api.basic_info() if basic else api.info(),
        'CollapseBoxSetItems': False,
        'IsVirtualUnaired': False,
        'EnableTotalRecordCount': False,
        'LocationTypes': "FileSystem,Remote,Offline",
        'IsMissing': False,
        'Recursive': True
    }
    if params:
        request_params.update(params)

    query = {'url': "Users/{UserId}/Items", 'params': request_params}

    for page in _get_items(query):
        yield page
def get_artists(parent_id=None):
    ''' Stream all artists (optionally below parent_id), page by page. '''
    params = {
        'UserId': "{UserId}",
        'ParentId': parent_id,
        'SortBy': "SortName",
        'SortOrder': "Ascending",
        'Fields': api.music_info(),
        'CollapseBoxSetItems': False,
        'IsVirtualUnaired': False,
        'EnableTotalRecordCount': False,
        'LocationTypes': "FileSystem,Remote,Offline",
        'IsMissing': False,
        'Recursive': True
    }
    query = {'url': 'Artists', 'params': params}

    for page in _get_items(query):
        yield page
@stop
def _get_items(query, server_id=None):
    ''' Stream the results of *query* in pages of LIMIT items.

        query = {
            'url': string,
            'params': dict -- opt, include StartIndex to resume
        }

        Yields a dict with the current page of 'Items', the total count and a
        'RestorePoint' the caller can use to resume an interrupted sync.
    '''
    items = {
        'Items': [],
        'TotalRecordCount': 0,
        'RestorePoint': {}
    }

    url = query['url']
    query.setdefault('params', {})
    params = query['params']

    try:
        # Cheap probe: request a single record just to learn the total count.
        test_params = dict(params)
        test_params['Limit'] = 1
        test_params['EnableTotalRecordCount'] = True

        items['TotalRecordCount'] = _get(url, test_params, server_id=server_id)['TotalRecordCount']

    except Exception as error:
        LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params)

    else:
        params.setdefault('StartIndex', 0)

        def get_query_params(params, start, count):
            # Copy so each page request carries its own window.
            params_copy = dict(params)
            params_copy['StartIndex'] = start
            params_copy['Limit'] = count
            return params_copy

        query_params = [
            get_query_params(params, offset, LIMIT)
            for offset
            in range(params['StartIndex'], items['TotalRecordCount'], LIMIT)
        ]

        # multiprocessing.dummy.Pool completes all requests in multiple threads but has to
        # complete all tasks before allowing any results to be processed. ThreadPoolExecutor
        # allows for completed tasks to be processed while other tasks are completed on other
        # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor
        # Context manager so the executor's threads are released when the
        # generator finishes or is closed (the original leaked them).
        with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p:
            results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)

            for params, result in zip(query_params, results):
                query['params'] = params

                result = result or {'Items': []}

                # Mitigates #216 till the server validates the date provided is valid.
                # Guard against an empty page before indexing Items[0] -- the
                # original raised IndexError on an empty/failed page.
                if result['Items'] and result['Items'][0].get('ProductionYear'):
                    try:
                        date(result['Items'][0]['ProductionYear'], 1, 1)
                    except ValueError:
                        LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
                        result['Items'][0]['ProductionYear'] = None

                items['Items'].extend(result['Items'])

                # Using items to return data and communicate a restore point back to the callee is
                # a violation of the SRP. TODO: Seperate responsibilities.
                items['RestorePoint'] = query
                yield items
                del items['Items'][:]
class GetItemWorker(threading.Thread):
    ''' Worker thread: pull batches of item ids off *queue*, fetch their full
        metadata from the server, and push each item onto the matching
        per-type queue in *output*.
    '''
    # Read by the coordinator: True once the input queue is drained or the
    # server became unreachable.
    is_done = False

    def __init__(self, server, queue, output):
        self.server = server  # Jellyfin server connection used for requests
        self.queue = queue    # input queue of item-id batches
        self.output = output  # dict mapping item Type -> output queue
        threading.Thread.__init__(self)

    def run(self):
        with requests.Session() as s:
            while True:
                try:
                    item_ids = self.queue.get(timeout=1)
                except Queue.Empty:
                    self.is_done = True
                    LOG.info("--<[ q:download/%s ]", id(self))
                    return

                request = {
                    'type': "GET",
                    'handler': "Users/{UserId}/Items",
                    'params': {
                        'Ids': ','.join(str(x) for x in item_ids),
                        'Fields': api.info()
                    }
                }

                try:
                    result = self.server.http.request(request, s)

                    for item in result['Items']:
                        if item['Type'] in self.output:
                            self.output[item['Type']].put(item)
                except HTTPException as error:
                    LOG.error("--[ http status: %s ]", error.status)

                    if error.status == 'ServerUnreachable':
                        self.is_done = True
                except Exception as error:
                    LOG.exception(error)
                finally:
                    # Always balance queue.get(); the original skipped
                    # task_done() when breaking on an unreachable server,
                    # which would leave queue.join() hanging forever.
                    self.queue.task_done()

                if self.is_done or window('jellyfin_should_stop.bool'):
                    break
class TheVoid(object):
    ''' One-shot request/response channel between Kodi entrypoints, built on
        window properties.
    '''

    def __init__(self, method, data):
        ''' If you call get, this will block until response is received.
            This is used to communicate between entrypoints.

            method -- event name broadcast to the other side.
            data -- dict payload; a unique 'VoidName' key is added so
                    concurrent requests don't clobber each other.
        '''
        # isinstance instead of type() comparison (also accepts dict subclasses).
        if not isinstance(data, dict):
            raise Exception("unexpected data format")

        data['VoidName'] = str(create_id())
        LOG.info("---[ contact MU-TH-UR 6000/%s ]", method)
        LOG.debug(data)

        event(method, data)
        self.method = method
        self.data = data

    def get(self):
        ''' Poll the response window property until it is set, then clear it
            and return the response. Returns None if shutdown is requested
            while waiting.
        '''
        while True:
            response = window('jellyfin_%s.json' % self.data['VoidName'])

            if response != "":
                LOG.debug("--<[ nostromo/jellyfin_%s.json ]", self.data['VoidName'])
                window('jellyfin_%s' % self.data['VoidName'], clear=True)

                return response

            if window('jellyfin_should_stop.bool'):
                LOG.info("Abandon mission! A black hole just swallowed [ %s/%s ]", self.method, self.data['VoidName'])

                return

            xbmc.sleep(100)
            LOG.info("--[ void/%s ]", self.data['VoidName'])