jellyfin-kodi/jellyfin_kodi/downloader.py
Chuddah 302880f67a Parallelize multiple http GET requests
Added a ThreadPoolExecutor and used it to process GET requests in multiple
threads, so that chunks of data are always available for processing.
Processing of the data can start as soon as the first chunk arrives.

Refactored the code to support this. The idea is that the "params" are
built in batch and passed to the thread pool, which fetches the actual
results.
2020-02-19 22:18:54 +00:00
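
A minimal sketch of the pattern this commit describes, using a stand-in fetch()
helper and made-up page sizes rather than the add-on's _get() and LIMIT:

from concurrent.futures import ThreadPoolExecutor


def fetch(params):
    # Stand-in for an HTTP GET that returns one page of results.
    return {'Items': list(range(params['StartIndex'], params['StartIndex'] + params['Limit']))}


def paged_params(total, page_size):
    # Build every request's parameters up front ("params are built in batch").
    return [{'StartIndex': start, 'Limit': page_size} for start in range(0, total, page_size)]


def stream_items(total, page_size=50, threads=3):
    # executor.map keeps `threads` requests in flight and yields each page in
    # order as soon as it is ready, so processing can start with the first
    # chunk while later chunks are still downloading.
    with ThreadPoolExecutor(threads) as executor:
        for page in executor.map(fetch, paged_params(total, page_size)):
            for item in page['Items']:
                yield item


if __name__ == '__main__':
    print(sum(1 for _ in stream_items(205)))  # 205 items fetched in pages of 50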


# -*- coding: utf-8 -*-
from __future__ import division, absolute_import, print_function, unicode_literals
#################################################################################################
import logging
import threading
from six.moves import range, zip
from six.moves import queue as Queue
from kodi_six import xbmc
import requests
from helper import settings, stop, event, window, create_id
from jellyfin import Jellyfin
from jellyfin import api
from jellyfin.exceptions import HTTPException
#################################################################################################
LOG = logging.getLogger("JELLYFIN." + __name__)
LIMIT = min(int(settings('limitIndex') or 50), 50)
DTHREADS = int(settings('limitThreads') or 3)
#################################################################################################
def get_jellyfinserver_url(handler):
if handler.startswith('/'):
handler = handler[1:]
LOG.info("handler starts with /: %s", handler)
return "{server}/%s" % handler
def browse_info():
return (
"DateCreated,EpisodeCount,SeasonCount,Path,Genres,Studios,Taglines,MediaStreams,Overview,Etag,"
"ProductionLocations,Width,Height,RecursiveItemCount,ChildCount"
)
def _http(action, url, request=None, server_id=None):
    # Avoid a shared mutable default argument; start from a fresh dict per call.
    request = dict(request or {})
    request.update({'url': url, 'type': action})
    return Jellyfin(server_id).http.request(request)
def _get(handler, params=None, server_id=None):
return _http("GET", get_jellyfinserver_url(handler), {'params': params}, server_id)
def _post(handler, json=None, params=None, server_id=None):
return _http("POST", get_jellyfinserver_url(handler), {'params': params, 'json': json}, server_id)
def _delete(handler, params=None, server_id=None):
return _http("DELETE", get_jellyfinserver_url(handler), {'params': params}, server_id)
def validate_view(library_id, item_id):
''' This confirms a single item from the library matches the view it belongs to.
Used to detect grouped libraries.
'''
try:
result = _get("Users/{UserId}/Items", {
'ParentId': library_id,
'Recursive': True,
'Ids': item_id
})
except Exception as error:
LOG.exception(error)
return False
    return bool(result['Items'])
def get_single_item(parent_id, media):
return _get("Users/{UserId}/Items", {
'ParentId': parent_id,
'Recursive': True,
'Limit': 1,
'IncludeItemTypes': media
})
def get_filtered_section(parent_id=None, media=None, limit=None, recursive=None, sort=None, sort_order=None,
filters=None, extra=None, server_id=None):
''' Get dynamic listings.
'''
params = {
'ParentId': parent_id,
'IncludeItemTypes': media,
'IsMissing': False,
'Recursive': recursive if recursive is not None else True,
'Limit': limit,
'SortBy': sort or "SortName",
'SortOrder': sort_order or "Ascending",
'ImageTypeLimit': 1,
'IsVirtualUnaired': False,
'Fields': browse_info()
}
if filters:
if 'Boxsets' in filters:
filters.remove('Boxsets')
params['CollapseBoxSetItems'] = settings('groupedSets.bool')
params['Filters'] = ','.join(filters)
if settings('getCast.bool'):
params['Fields'] += ",People"
if media and 'Photo' in media:
params['Fields'] += ",Width,Height"
if extra is not None:
params.update(extra)
return _get("Users/{UserId}/Items", params, server_id)
def get_movies_by_boxset(boxset_id):
for items in get_items(boxset_id, "Movie"):
yield items
def get_episode_by_show(show_id):
query = {
'url': "Shows/%s/Episodes" % show_id,
'params': {
'EnableUserData': True,
'EnableImages': True,
'UserId': "{UserId}",
'Fields': api.info()
}
}
for items in _get_items(query):
yield items
def get_episode_by_season(show_id, season_id):
query = {
'url': "Shows/%s/Episodes" % show_id,
'params': {
'SeasonId': season_id,
'EnableUserData': True,
'EnableImages': True,
'UserId': "{UserId}",
'Fields': api.info()
}
}
for items in _get_items(query):
yield items
def get_items(parent_id, item_type=None, basic=False, params=None):
query = {
'url': "Users/{UserId}/Items",
'params': {
'ParentId': parent_id,
'IncludeItemTypes': item_type,
'SortBy': "SortName",
'SortOrder': "Ascending",
'Fields': api.basic_info() if basic else api.info(),
'CollapseBoxSetItems': False,
'IsVirtualUnaired': False,
'EnableTotalRecordCount': False,
'LocationTypes': "FileSystem,Remote,Offline",
'IsMissing': False,
'Recursive': True
}
}
if params:
query['params'].update(params)
for items in _get_items(query):
yield items
def get_artists(parent_id=None, basic=False, params=None, server_id=None):
query = {
'url': "Artists",
'params': {
'UserId': "{UserId}",
'ParentId': parent_id,
'SortBy': "SortName",
'SortOrder': "Ascending",
'Fields': api.basic_info() if basic else api.music_info(),
'CollapseBoxSetItems': False,
'IsVirtualUnaired': False,
'EnableTotalRecordCount': False,
'LocationTypes': "FileSystem,Remote,Offline",
'IsMissing': False,
'Recursive': True
}
}
if params:
query['params'].update(params)
for items in _get_items(query, server_id):
yield items
def get_albums_by_artist(artist_id, basic=False):
params = {
'SortBy': "DateCreated",
'ArtistIds': artist_id
}
for items in get_items(None, "MusicAlbum", basic, params):
yield items
def get_songs_by_artist(artist_id, basic=False):
params = {
'SortBy': "DateCreated",
'ArtistIds': artist_id
}
for items in get_items(None, "Audio", basic, params):
yield items
@stop()
def _get_items(query, server_id=None):
''' query = {
'url': string,
'params': dict -- opt, include StartIndex to resume
}
'''
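    # Illustrative call (library_id is hypothetical):
    #   for chunk in _get_items({'url': "Users/{UserId}/Items", 'params': {'ParentId': library_id}}):
    #       process(chunk['Items'])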
items = {
'Items': [],
'TotalRecordCount': 0,
'RestorePoint': {}
}
url = query['url']
query.setdefault('params', {})
params = query['params']
try:
test_params = dict(params)
test_params['Limit'] = 1
test_params['EnableTotalRecordCount'] = True
items['TotalRecordCount'] = _get(url, test_params, server_id=server_id)['TotalRecordCount']
except Exception as error:
LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params)
else:
params.setdefault('StartIndex', 0)
def get_query_params(params, start, count):
params_copy = dict(params)
params_copy['StartIndex'] = start
params_copy['Limit'] = count
return params_copy
        query_params = [
            get_query_params(params, offset, LIMIT)
            for offset in range(params['StartIndex'], items['TotalRecordCount'], LIMIT)
        ]

        # multiprocessing.dummy.Pool completes all requests in multiple threads but has to
        # finish every task before any result can be processed. ThreadPoolExecutor lets
        # completed tasks be processed while other tasks are still running on other
        # threads. Don't be a dummy.Pool, be a ThreadPoolExecutor.
        import concurrent.futures
        p = concurrent.futures.ThreadPoolExecutor(DTHREADS)
        results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)

        for params, result in zip(query_params, results):
query['params'] = params
result = result or {'Items': []}
items['Items'].extend(result['Items'])
            # Using items to return data and communicate a restore point back to the caller is
            # a violation of the SRP. TODO: Separate responsibilities.
items['RestorePoint'] = query
yield items
del items['Items'][:]
class GetItemWorker(threading.Thread):
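    ''' Pull batches of item ids from the queue, fetch their full metadata from the
        server and push each returned item onto the matching per-type output queue.
    '''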
is_done = False
def __init__(self, server, queue, output):
self.server = server
self.queue = queue
self.output = output
threading.Thread.__init__(self)
def run(self):
with requests.Session() as s:
while True:
try:
item_ids = self.queue.get(timeout=1)
except Queue.Empty:
self.is_done = True
LOG.info("--<[ q:download/%s ]", id(self))
return
request = {
'type': "GET",
'handler': "Users/{UserId}/Items",
'params': {
'Ids': ','.join(str(x) for x in item_ids),
'Fields': api.info()
}
}
try:
result = self.server.http.request(request, s)
for item in result['Items']:
if item['Type'] in self.output:
self.output[item['Type']].put(item)
except HTTPException as error:
LOG.error("--[ http status: %s ]", error.status)
if error.status == 'ServerUnreachable':
self.is_done = True
break
except Exception as error:
LOG.exception(error)
self.queue.task_done()
if window('jellyfin_should_stop.bool'):
break
class TheVoid(object):
def __init__(self, method, data):
''' If you call get, this will block until response is received.
This is used to communicate between entrypoints.
'''
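        # Hypothetical usage from another entrypoint (names illustrative):
        # TheVoid('GetItem', {'Id': item_id, 'ServerId': server_id}).get() blocks the caller
        # until the service side writes the matching jellyfin_<VoidName>.json window property.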
        if not isinstance(data, dict):
            raise Exception("unexpected data format")
data['VoidName'] = str(create_id())
LOG.info("---[ contact MU-TH-UR 6000/%s ]", method)
LOG.debug(data)
event(method, data)
self.method = method
self.data = data
def get(self):
while True:
response = window('jellyfin_%s.json' % self.data['VoidName'])
if response != "":
LOG.debug("--<[ nostromo/jellyfin_%s.json ]", self.data['VoidName'])
window('jellyfin_%s' % self.data['VoidName'], clear=True)
return response
if window('jellyfin_should_stop.bool'):
LOG.info("Abandon mission! A black hole just swallowed [ %s/%s ]", self.method, self.data['VoidName'])
return
xbmc.sleep(100)
LOG.info("--[ void/%s ]", self.data['VoidName'])