# -*- coding: utf-8 -*- from __future__ import division, absolute_import, print_function, unicode_literals ################################################################################################# import threading import concurrent.futures from datetime import date from six.moves import range, queue as Queue from kodi_six import xbmc import requests from helper import settings, stop, event, window, create_id from jellyfin import Jellyfin from jellyfin import api from helper.exceptions import HTTPException from helper import LazyLogger ################################################################################################# LOG = LazyLogger(__name__) ################################################################################################# def get_jellyfinserver_url(handler): if handler.startswith('/'): handler = handler[1:] LOG.info("handler starts with /: %s", handler) return "{server}/%s" % handler def browse_info(): return ( "DateCreated,EpisodeCount,SeasonCount,Path,Genres,Studios,Taglines,MediaStreams,Overview,Etag," "ProductionLocations,Width,Height,RecursiveItemCount,ChildCount" ) def _http(action, url, request=None, server_id=None): if request is None: request = {} request.update({'url': url, 'type': action}) return Jellyfin(server_id).http.request(request) def _get(handler, params=None, server_id=None): return _http("GET", get_jellyfinserver_url(handler), {'params': params}, server_id) def _post(handler, json=None, params=None, server_id=None): return _http("POST", get_jellyfinserver_url(handler), {'params': params, 'json': json}, server_id) def _delete(handler, params=None, server_id=None): return _http("DELETE", get_jellyfinserver_url(handler), {'params': params}, server_id) def validate_view(library_id, item_id): ''' This confirms a single item from the library matches the view it belongs to. Used to detect grouped libraries. ''' try: result = _get("Users/{UserId}/Items", { 'ParentId': library_id, 'Recursive': True, 'Ids': item_id }) except Exception as error: LOG.exception(error) return False return True if len(result['Items']) else False def get_single_item(parent_id, media): return _get("Users/{UserId}/Items", { 'ParentId': parent_id, 'Recursive': True, 'Limit': 1, 'IncludeItemTypes': media }) def get_filtered_section(parent_id=None, media=None, limit=None, recursive=None, sort=None, sort_order=None, filters=None, extra=None, server_id=None): ''' Get dynamic listings. ''' params = { 'ParentId': parent_id, 'IncludeItemTypes': media, 'IsMissing': False, 'Recursive': recursive if recursive is not None else True, 'Limit': limit, 'SortBy': sort or "SortName", 'SortOrder': sort_order or "Ascending", 'ImageTypeLimit': 1, 'IsVirtualUnaired': False, 'Fields': browse_info() } if filters: if 'Boxsets' in filters: filters.remove('Boxsets') params['CollapseBoxSetItems'] = settings('groupedSets.bool') params['Filters'] = ','.join(filters) if settings('getCast.bool'): params['Fields'] += ",People" if media and 'Photo' in media: params['Fields'] += ",Width,Height" if extra is not None: params.update(extra) return _get("Users/{UserId}/Items", params, server_id) def get_movies_by_boxset(boxset_id): for items in get_items(boxset_id, "Movie"): yield items def get_episode_by_show(show_id): query = { 'url': "Shows/%s/Episodes" % show_id, 'params': { 'EnableUserData': True, 'EnableImages': True, 'UserId': "{UserId}", 'Fields': api.info() } } for items in _get_items(query): yield items def get_episode_by_season(show_id, season_id): query = { 'url': "Shows/%s/Episodes" % show_id, 'params': { 'SeasonId': season_id, 'EnableUserData': True, 'EnableImages': True, 'UserId': "{UserId}", 'Fields': api.info() } } for items in _get_items(query): yield items def get_item_count(parent_id, item_type=None, params=None): url = "Users/{UserId}/Items" query_params = { 'ParentId': parent_id, 'IncludeItemTypes': item_type, 'EnableTotalRecordCount': True, 'LocationTypes': "FileSystem,Remote,Offline", 'Recursive': True, 'Limit': 1 } if params: query_params['params'].update(params) result = _get(url, query_params) return result.get('TotalRecordCount', 1) def get_items(parent_id, item_type=None, basic=False, params=None): query = { 'url': "Users/{UserId}/Items", 'params': { 'ParentId': parent_id, 'IncludeItemTypes': item_type, 'SortBy': "SortName", 'SortOrder': "Ascending", 'Fields': api.basic_info() if basic else api.info(), 'CollapseBoxSetItems': False, 'IsVirtualUnaired': False, 'EnableTotalRecordCount': False, 'LocationTypes': "FileSystem,Remote,Offline", 'IsMissing': False, 'Recursive': True } } if params: query['params'].update(params) for items in _get_items(query): yield items def get_artists(parent_id=None): query = { 'url': 'Artists', 'params': { 'UserId': "{UserId}", 'ParentId': parent_id, 'SortBy': "SortName", 'SortOrder': "Ascending", 'Fields': api.music_info(), 'CollapseBoxSetItems': False, 'IsVirtualUnaired': False, 'EnableTotalRecordCount': False, 'LocationTypes': "FileSystem,Remote,Offline", 'IsMissing': False, 'Recursive': True } } for items in _get_items(query): yield items @stop def _get_items(query, server_id=None): ''' query = { 'url': string, 'params': dict -- opt, include StartIndex to resume } ''' items = { 'Items': [], 'TotalRecordCount': 0, 'RestorePoint': {} } limit = min(int(settings('limitIndex') or 50), 50) dthreads = int(settings('limitThreads') or 3) url = query['url'] query.setdefault('params', {}) params = query['params'] try: test_params = dict(params) test_params['Limit'] = 1 test_params['EnableTotalRecordCount'] = True items['TotalRecordCount'] = _get(url, test_params, server_id=server_id)['TotalRecordCount'] except Exception as error: LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params) else: params.setdefault('StartIndex', 0) def get_query_params(params, start, count): params_copy = dict(params) params_copy['StartIndex'] = start params_copy['Limit'] = count return params_copy query_params = [ get_query_params(params, offset, limit) for offset in range(params['StartIndex'], items['TotalRecordCount'], limit) ] # multiprocessing.dummy.Pool completes all requests in multiple threads but has to # complete all tasks before allowing any results to be processed. ThreadPoolExecutor # allows for completed tasks to be processed while other tasks are completed on other # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor with concurrent.futures.ThreadPoolExecutor(dthreads) as p: # dictionary for storing the jobs and their results jobs = {} # semaphore to avoid fetching complete library to memory thread_buffer = threading.Semaphore(dthreads) # wrapper function for _get that uses a semaphore def get_wrapper(params): thread_buffer.acquire() return _get(url, params, server_id=server_id) # create jobs for param in query_params: job = p.submit(get_wrapper, param) # the query params are later needed again jobs[job] = param # process complete jobs for job in concurrent.futures.as_completed(jobs): # get the result result = job.result() or {'Items': []} query['params'] = jobs[job] # free job memory del jobs[job] del job # Mitigates #216 till the server validates the date provided is valid if result['Items'][0].get('ProductionYear'): try: date(result['Items'][0]['ProductionYear'], 1, 1) except ValueError: LOG.info('#216 mitigation triggered. Setting ProductionYear to None') result['Items'][0]['ProductionYear'] = None items['Items'].extend(result['Items']) # Using items to return data and communicate a restore point back to the callee is # a violation of the SRP. TODO: Seperate responsibilities. items['RestorePoint'] = query yield items del items['Items'][:] # release the semaphore again thread_buffer.release() class GetItemWorker(threading.Thread): is_done = False def __init__(self, server, queue, output): self.server = server self.queue = queue self.output = output threading.Thread.__init__(self) def run(self): with requests.Session() as s: while True: try: item_ids = self.queue.get(timeout=1) except Queue.Empty: self.is_done = True LOG.info("--<[ q:download/%s ]", id(self)) return request = { 'type': "GET", 'handler': "Users/{UserId}/Items", 'params': { 'Ids': ','.join(str(x) for x in item_ids), 'Fields': api.info() } } try: result = self.server.http.request(request, s) for item in result['Items']: if item['Type'] in self.output: self.output[item['Type']].put(item) except HTTPException as error: LOG.error("--[ http status: %s ]", error.status) if error.status == 'ServerUnreachable': self.is_done = True break except Exception as error: LOG.exception(error) self.queue.task_done() if window('jellyfin_should_stop.bool'): break