From 37281f6ca791e5937e8a4ffb7a67b08134f92e98 Mon Sep 17 00:00:00 2001 From: mammo0 Date: Wed, 14 Oct 2020 18:02:06 +0200 Subject: [PATCH] free memory after a thread completes -> prior all threads that fetched items from the server and their results stayed in memory until the sync was finished --- jellyfin_kodi/downloader.py | 64 ++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 25 deletions(-) diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 41b5d747..bb4e70a4 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -7,7 +7,7 @@ import threading import concurrent.futures from datetime import date -from six.moves import range, queue as Queue, zip +from six.moves import range, queue as Queue from kodi_six import xbmc import requests @@ -183,7 +183,6 @@ def get_item_count(parent_id, item_type=None, params=None): return result.get('TotalRecordCount', 1) - def get_items(parent_id, item_type=None, basic=False, params=None): query = { @@ -279,36 +278,51 @@ def _get_items(query, server_id=None): # complete all tasks before allowing any results to be processed. ThreadPoolExecutor # allows for completed tasks to be processed while other tasks are completed on other # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor - p = concurrent.futures.ThreadPoolExecutor(DTHREADS) + with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p: + # dictionary for storing the jobs and their results + jobs = {} - thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS) + # semaphore to avoid fetching complete library to memory + thread_buffer = threading.Semaphore(LIMIT * DTHREADS) - def get_wrapper(params): - thread_buffer.acquire() - return _get(url, params, server_id=server_id) + # wrapper function for _get that uses a semaphore + def get_wrapper(params): + thread_buffer.acquire() + return _get(url, params, server_id=server_id) - results = p.map(get_wrapper, query_params) + # create jobs + for param in query_params: + job = p.submit(get_wrapper, param) + # the query params are later needed again + jobs[job] = param - for params, result in zip(query_params, results): - query['params'] = params + # process complete jobs + for job in concurrent.futures.as_completed(jobs): + # get the result + result = job.result() or {'Items': []} + query['params'] = jobs[job] - result = result or {'Items': []} + # free job memory + del jobs[job] + del job - # Mitigates #216 till the server validates the date provided is valid - if result['Items'][0].get('ProductionYear'): - try: - date(result['Items'][0]['ProductionYear'], 1, 1) - except ValueError: - LOG.info('#216 mitigation triggered. Setting ProductionYear to None') - result['Items'][0]['ProductionYear'] = None + # Mitigates #216 till the server validates the date provided is valid + if result['Items'][0].get('ProductionYear'): + try: + date(result['Items'][0]['ProductionYear'], 1, 1) + except ValueError: + LOG.info('#216 mitigation triggered. Setting ProductionYear to None') + result['Items'][0]['ProductionYear'] = None - items['Items'].extend(result['Items']) - # Using items to return data and communicate a restore point back to the callee is - # a violation of the SRP. TODO: Seperate responsibilities. - items['RestorePoint'] = query - yield items - del items['Items'][:] - thread_buffer.release() + items['Items'].extend(result['Items']) + # Using items to return data and communicate a restore point back to the callee is + # a violation of the SRP. TODO: Seperate responsibilities. + items['RestorePoint'] = query + yield items + del items['Items'][:] + + # release the semaphore again + thread_buffer.release() class GetItemWorker(threading.Thread):