free memory after a thread completes

-> prior all threads that fetched items from the server and their
results stayed in memory until the sync was finished
This commit is contained in:
mammo0 2020-10-14 18:02:06 +02:00
parent 09b0bdbc48
commit 37281f6ca7
1 changed files with 39 additions and 25 deletions

View File

@ -7,7 +7,7 @@ import threading
import concurrent.futures import concurrent.futures
from datetime import date from datetime import date
from six.moves import range, queue as Queue, zip from six.moves import range, queue as Queue
from kodi_six import xbmc from kodi_six import xbmc
import requests import requests
@ -183,7 +183,6 @@ def get_item_count(parent_id, item_type=None, params=None):
return result.get('TotalRecordCount', 1) return result.get('TotalRecordCount', 1)
def get_items(parent_id, item_type=None, basic=False, params=None): def get_items(parent_id, item_type=None, basic=False, params=None):
query = { query = {
@ -279,36 +278,51 @@ def _get_items(query, server_id=None):
# complete all tasks before allowing any results to be processed. ThreadPoolExecutor # complete all tasks before allowing any results to be processed. ThreadPoolExecutor
# allows for completed tasks to be processed while other tasks are completed on other # allows for completed tasks to be processed while other tasks are completed on other
# threads. Dont be a dummy.Pool, be a ThreadPoolExecutor # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor
p = concurrent.futures.ThreadPoolExecutor(DTHREADS) with concurrent.futures.ThreadPoolExecutor(DTHREADS) as p:
# dictionary for storing the jobs and their results
jobs = {}
thread_buffer = threading.Semaphore(2 * LIMIT * DTHREADS) # semaphore to avoid fetching complete library to memory
thread_buffer = threading.Semaphore(LIMIT * DTHREADS)
def get_wrapper(params): # wrapper function for _get that uses a semaphore
thread_buffer.acquire() def get_wrapper(params):
return _get(url, params, server_id=server_id) thread_buffer.acquire()
return _get(url, params, server_id=server_id)
results = p.map(get_wrapper, query_params) # create jobs
for param in query_params:
job = p.submit(get_wrapper, param)
# the query params are later needed again
jobs[job] = param
for params, result in zip(query_params, results): # process complete jobs
query['params'] = params for job in concurrent.futures.as_completed(jobs):
# get the result
result = job.result() or {'Items': []}
query['params'] = jobs[job]
result = result or {'Items': []} # free job memory
del jobs[job]
del job
# Mitigates #216 till the server validates the date provided is valid # Mitigates #216 till the server validates the date provided is valid
if result['Items'][0].get('ProductionYear'): if result['Items'][0].get('ProductionYear'):
try: try:
date(result['Items'][0]['ProductionYear'], 1, 1) date(result['Items'][0]['ProductionYear'], 1, 1)
except ValueError: except ValueError:
LOG.info('#216 mitigation triggered. Setting ProductionYear to None') LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
result['Items'][0]['ProductionYear'] = None result['Items'][0]['ProductionYear'] = None
items['Items'].extend(result['Items']) items['Items'].extend(result['Items'])
# Using items to return data and communicate a restore point back to the callee is # Using items to return data and communicate a restore point back to the callee is
# a violation of the SRP. TODO: Seperate responsibilities. # a violation of the SRP. TODO: Seperate responsibilities.
items['RestorePoint'] = query items['RestorePoint'] = query
yield items yield items
del items['Items'][:] del items['Items'][:]
thread_buffer.release()
# release the semaphore again
thread_buffer.release()
class GetItemWorker(threading.Thread): class GetItemWorker(threading.Thread):