Merge pull request #410 from mammo0/fix_for_#350

Use a semaphore to avoid fetching complete library to memory
This commit is contained in:
mcarlton00 2020-11-02 10:09:11 -05:00 committed by GitHub
commit 7d792ab50b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 41 additions and 19 deletions

View File

@@ -7,7 +7,7 @@ import threading
 import concurrent.futures
 from datetime import date
-from six.moves import range, queue as Queue, zip
+from six.moves import range, queue as Queue
 from kodi_six import xbmc
 import requests
@@ -280,29 +280,51 @@ def _get_items(query, server_id=None):
     # complete all tasks before allowing any results to be processed. ThreadPoolExecutor
     # allows for completed tasks to be processed while other tasks are completed on other
     # threads. Dont be a dummy.Pool, be a ThreadPoolExecutor
-    p = concurrent.futures.ThreadPoolExecutor(dthreads)
-
-    results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)
-
-    for params, result in zip(query_params, results):
-        query['params'] = params
-
-        result = result or {'Items': []}
-
-        # Mitigates #216 till the server validates the date provided is valid
-        if result['Items'][0].get('ProductionYear'):
-            try:
-                date(result['Items'][0]['ProductionYear'], 1, 1)
-            except ValueError:
-                LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
-                result['Items'][0]['ProductionYear'] = None
-
-        items['Items'].extend(result['Items'])
-        # Using items to return data and communicate a restore point back to the callee is
-        # a violation of the SRP. TODO: Seperate responsibilities.
-        items['RestorePoint'] = query
-        yield items
-        del items['Items'][:]
+    with concurrent.futures.ThreadPoolExecutor(dthreads) as p:
+        # dictionary for storing the jobs and their results
+        jobs = {}
+
+        # semaphore to avoid fetching complete library to memory
+        thread_buffer = threading.Semaphore(dthreads)
+
+        # wrapper function for _get that uses a semaphore
+        def get_wrapper(params):
+            thread_buffer.acquire()
+            return _get(url, params, server_id=server_id)
+
+        # create jobs
+        for param in query_params:
+            job = p.submit(get_wrapper, param)
+            # the query params are later needed again
+            jobs[job] = param
+
+        # process complete jobs
+        for job in concurrent.futures.as_completed(jobs):
+            # get the result
+            result = job.result() or {'Items': []}
+            query['params'] = jobs[job]
+
+            # free job memory
+            del jobs[job]
+            del job
+
+            # Mitigates #216 till the server validates the date provided is valid
+            if result['Items'][0].get('ProductionYear'):
+                try:
+                    date(result['Items'][0]['ProductionYear'], 1, 1)
+                except ValueError:
+                    LOG.info('#216 mitigation triggered. Setting ProductionYear to None')
+                    result['Items'][0]['ProductionYear'] = None
+
+            items['Items'].extend(result['Items'])
+            # Using items to return data and communicate a restore point back to the callee is
+            # a violation of the SRP. TODO: Seperate responsibilities.
+            items['RestorePoint'] = query
+            yield items
+            del items['Items'][:]
+
+            # release the semaphore again
+            thread_buffer.release()
 
 class GetItemWorker(threading.Thread):