diff --git a/addon.xml b/addon.xml index 557bd9bb..042a7146 100644 --- a/addon.xml +++ b/addon.xml @@ -10,6 +10,7 @@ + diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 2a94c698..3b734deb 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -19,6 +19,7 @@ from jellyfin.exceptions import HTTPException LOG = logging.getLogger("JELLYFIN." + __name__) LIMIT = min(int(settings('limitIndex') or 50), 50) +DTHREADS = int(settings('limitThreads') or 3) ################################################################################################# @@ -243,7 +244,8 @@ def _get_items(query, server_id=None): } url = query['url'] - params = query.get('params', {}) + query.setdefault('params', {}) + params = query['params'] try: test_params = dict(params) @@ -256,21 +258,37 @@ def _get_items(query, server_id=None): LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params) else: - index = params.get('StartIndex', 0) - total = items['TotalRecordCount'] + params.setdefault('StartIndex', 0) - while index < total: + def get_query_params(params, start, count): + params_copy = dict(params) + params_copy['StartIndex'] = start + params_copy['Limit'] = count + return params_copy - params['StartIndex'] = index - params['Limit'] = LIMIT - result = _get(url, params, server_id=server_id) or {'Items': []} + query_params = [get_query_params(params, offset, LIMIT) \ + for offset in xrange(params['StartIndex'], items['TotalRecordCount'], LIMIT)] + from itertools import izip + # multiprocessing.dummy.Pool completes all requests in multiple threads but has to + # complete all tasks before allowing any results to be processed. ThreadPoolExecutor + # allows for completed tasks to be processed while other tasks are completed on other + # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor + import concurrent.futures + p = concurrent.futures.ThreadPoolExecutor(DTHREADS) + + results = p.map(lambda params: _get(url, params, server_id=server_id), query_params) + + for params, result in izip(query_params, results): + query['params'] = params + + result = result or {'Items': []} items['Items'].extend(result['Items']) + # Using items to return data and communicate a restore point back to the callee is + # a violation of the SRP. TODO: Seperate responsibilities. items['RestorePoint'] = query yield items - del items['Items'][:] - index += LIMIT class GetItemWorker(threading.Thread): diff --git a/jellyfin_kodi/library.py b/jellyfin_kodi/library.py index 5ca1bdd9..8d4a3f7b 100644 --- a/jellyfin_kodi/library.py +++ b/jellyfin_kodi/library.py @@ -138,8 +138,7 @@ class Library(threading.Thread): if not self.player.isPlayingVideo() or settings('syncDuringPlay.bool') or xbmc.getCondVisibility('VideoPlayer.Content(livetv)'): - while self.worker_downloads(): - pass + self.worker_downloads() self.worker_sort() self.worker_updates() @@ -225,7 +224,6 @@ class Library(threading.Thread): ''' Get items from jellyfin and place them in the appropriate queues. ''' - added_threads = False for queue in ((self.updated_queue, self.updated_output), (self.userdata_queue, self.userdata_output)): if queue[0].qsize() and len(self.download_threads) < DTHREADS: @@ -233,8 +231,6 @@ class Library(threading.Thread): new_thread.start() LOG.info("-->[ q:download/%s ]", id(new_thread)) self.download_threads.append(new_thread) - added_threads = True - return added_threads def worker_sort(self):