From 09b0bdbc48026052dc698345a3e8cff124c9e22f Mon Sep 17 00:00:00 2001 From: mammo0 Date: Wed, 14 Oct 2020 08:24:37 +0200 Subject: [PATCH 1/4] use a semaphore to avoid fetching complete library to memory -> this happens if the processing of items is slower than the fetching of new ones -> if a big library is synced, the old behavior could lead to excessive memory use -> the semaphore acts like a buffer that only allows fetching of new items from the library if old ones are processed -> the current size of the 'buffer' is hard coded to 2 * [max. item fetch limit] * [number of download threads] --- jellyfin_kodi/downloader.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 0dd37da9..41b5d747 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -281,7 +281,13 @@ def _get_items(query, server_id=None): # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor p = concurrent.futures.ThreadPoolExecutor(DTHREADS) - results = p.map(lambda params: _get(url, params, server_id=server_id), query_params) + thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS) + + def get_wrapper(params): + thread_buffer.acquire() + return _get(url, params, server_id=server_id) + + results = p.map(get_wrapper, query_params) for params, result in zip(query_params, results): query['params'] = params @@ -302,6 +308,7 @@ def _get_items(query, server_id=None): items['RestorePoint'] = query yield items del items['Items'][:] + thread_buffer.release() class GetItemWorker(threading.Thread): From 37281f6ca791e5937e8a4ffb7a67b08134f92e98 Mon Sep 17 00:00:00 2001 From: mammo0 Date: Wed, 14 Oct 2020 18:02:06 +0200 Subject: [PATCH 2/4] free memory after a thread completes -> previously, all threads that fetched items from the server and their results stayed in memory until the sync was finished --- jellyfin_kodi/downloader.py | 64 ++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 25 
deletions(-) diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 41b5d747..bb4e70a4 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -7,7 +7,7 @@ import threading import concurrent.futures from datetime import date -from six.moves import range, queue as Queue, zip +from six.moves import range, queue as Queue from kodi_six import xbmc import requests @@ -183,7 +183,6 @@ def get_item_count(parent_id, item_type=None, params=None): return result.get('TotalRecordCount', 1) - def get_items(parent_id, item_type=None, basic=False, params=None): query = { @@ -279,36 +278,51 @@ def _get_items(query, server_id=None): # complete all tasks before allowing any results to be processed. ThreadPoolExecutor # allows for completed tasks to be processed while other tasks are completed on other # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor - p = concurrent.futures.ThreadPoolExecutor(DTHREADS) + with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p: + # dictionary for storing the jobs and their results + jobs = {} - thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS) + # semaphore to avoid fetching complete library to memory + thread_buffer = threading.Semaphore(LIMIT * DTHREADS) - def get_wrapper(params): - thread_buffer.acquire() - return _get(url, params, server_id=server_id) + # wrapper function for _get that uses a semaphore + def get_wrapper(params): + thread_buffer.acquire() + return _get(url, params, server_id=server_id) - results = p.map(get_wrapper, query_params) + # create jobs + for param in query_params: + job = p.submit(get_wrapper, param) + # the query params are later needed again + jobs[job] = param - for params, result in zip(query_params, results): - query['params'] = params + # process complete jobs + for job in concurrent.futures.as_completed(jobs): + # get the result + result = job.result() or {'Items': []} + query['params'] = jobs[job] - result = result or {'Items': []} + # free job memory 
+ del jobs[job] + del job - # Mitigates #216 till the server validates the date provided is valid - if result['Items'][0].get('ProductionYear'): - try: - date(result['Items'][0]['ProductionYear'], 1, 1) - except ValueError: - LOG.info('#216 mitigation triggered. Setting ProductionYear to None') - result['Items'][0]['ProductionYear'] = None + # Mitigates #216 till the server validates the date provided is valid + if result['Items'][0].get('ProductionYear'): + try: + date(result['Items'][0]['ProductionYear'], 1, 1) + except ValueError: + LOG.info('#216 mitigation triggered. Setting ProductionYear to None') + result['Items'][0]['ProductionYear'] = None - items['Items'].extend(result['Items']) - # Using items to return data and communicate a restore point back to the callee is - # a violation of the SRP. TODO: Seperate responsibilities. - items['RestorePoint'] = query - yield items - del items['Items'][:] - thread_buffer.release() + items['Items'].extend(result['Items']) + # Using items to return data and communicate a restore point back to the callee is + # a violation of the SRP. TODO: Seperate responsibilities. 
+ items['RestorePoint'] = query + yield items + del items['Items'][:] + + # release the semaphore again + thread_buffer.release() class GetItemWorker(threading.Thread): From e510097193f0997c455c969d858d1b4c377c3a76 Mon Sep 17 00:00:00 2001 From: mammo0 Date: Wed, 14 Oct 2020 18:06:28 +0200 Subject: [PATCH 3/4] re-added empty line to pass validation checks --- jellyfin_kodi/downloader.py | 1 + 1 file changed, 1 insertion(+) diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index bb4e70a4..47bf971a 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -183,6 +183,7 @@ def get_item_count(parent_id, item_type=None, params=None): return result.get('TotalRecordCount', 1) + def get_items(parent_id, item_type=None, basic=False, params=None): query = { From 5522c93458d0c2e27d7c6d18d47be431e2b8f92f Mon Sep 17 00:00:00 2001 From: mammo0 Date: Mon, 2 Nov 2020 09:40:13 +0100 Subject: [PATCH 4/4] set DTHREADS as semaphore size --- jellyfin_kodi/downloader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 47bf971a..fe9c98d9 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -284,7 +284,7 @@ def _get_items(query, server_id=None): jobs = {} # semaphore to avoid fetching complete library to memory - thread_buffer = threading.Semaphore(LIMIT * DTHREADS) + thread_buffer = threading.Semaphore(DTHREADS) # wrapper function for _get that uses a semaphore def get_wrapper(params):