Merge pull request #202 from druscoe/threaded_http_gets

Parallelize multiple http GETs
commit 4d4f45cd42, mcarlton00, 2020-02-20 20:12:29 -05:00, committed by GitHub
3 changed files with 29 additions and 14 deletions

addon.xml

@@ -10,6 +10,7 @@
         <import addon="script.module.six" />
         <import addon="script.module.kodi-six" />
         <import addon="script.module.addon.signals" version="0.0.1"/>
+        <import addon="script.module.futures" version="2.2.0"/>
     </requires>
     <extension point="xbmc.python.pluginsource"
                library="default.py">

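For context on the new dependency: script.module.futures packages the `futures` backport, which provides the Python 3 concurrent.futures API under Kodi's Python 2 interpreter, so the same `import concurrent.futures` works on either version. A minimal sketch of what the dependency enables (the squaring workload is illustrative, not part of this commit):

```python
# Works on Python 3 (stdlib) and on Kodi's Python 2 via script.module.futures.
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as pool:
    # map dispatches the calls across the pool's worker threads.
    print(list(pool.map(lambda n: n * n, range(5))))
```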
downloader.py

@@ -19,6 +19,7 @@ from jellyfin.exceptions import HTTPException
 LOG = logging.getLogger("JELLYFIN." + __name__)
 LIMIT = min(int(settings('limitIndex') or 50), 50)
+DTHREADS = int(settings('limitThreads') or 3)

 #################################################################################################
@@ -243,7 +244,8 @@ def _get_items(query, server_id=None):
     }

     url = query['url']
-    params = query.get('params', {})
+    query.setdefault('params', {})
+    params = query['params']

     try:
         test_params = dict(params)
@@ -256,21 +258,37 @@
         LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params)
     else:
-        index = params.get('StartIndex', 0)
-        total = items['TotalRecordCount']
+        params.setdefault('StartIndex', 0)

-        while index < total:
+        def get_query_params(params, start, count):
+            params_copy = dict(params)
+            params_copy['StartIndex'] = start
+            params_copy['Limit'] = count
+            return params_copy

-            params['StartIndex'] = index
-            params['Limit'] = LIMIT
-            result = _get(url, params, server_id=server_id) or {'Items': []}
+        query_params = [get_query_params(params, offset, LIMIT)
+                        for offset in xrange(params['StartIndex'], items['TotalRecordCount'], LIMIT)]
+
+        from itertools import izip
+        # multiprocessing.dummy.Pool completes all requests in multiple threads but has to
+        # complete all tasks before allowing any results to be processed. ThreadPoolExecutor
+        # allows for completed tasks to be processed while other tasks are completed on other
+        # threads. Don't be a dummy.Pool, be a ThreadPoolExecutor.
+        import concurrent.futures
+        p = concurrent.futures.ThreadPoolExecutor(DTHREADS)
+        results = p.map(lambda params: _get(url, params, server_id=server_id), query_params)
+
+        for params, result in izip(query_params, results):
+            query['params'] = params
+            result = result or {'Items': []}
             items['Items'].extend(result['Items'])
+            # Using items to return data and communicate a restore point back to the callee is
+            # a violation of the SRP. TODO: Separate responsibilities.
             items['RestorePoint'] = query
             yield items
             del items['Items'][:]
-            index += LIMIT

 class GetItemWorker(threading.Thread):
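The core of this hunk, restated outside the add-on: instead of fetching pages sequentially and advancing StartIndex in a loop, the query is pre-split into one parameter set per page and the pages are fetched concurrently. A runnable sketch of the same pattern, with hypothetical names (fetch_page, PAGE_SIZE, TOTAL stand in for _get, LIMIT, and TotalRecordCount):

```python
import concurrent.futures

PAGE_SIZE = 50
TOTAL = 230  # e.g. the server's TotalRecordCount

def fetch_page(start):
    # Stand-in for the real HTTP GET; returns one page of items.
    return {'Items': list(range(start, min(start + PAGE_SIZE, TOTAL)))}

# One start offset per page, mirroring the get_query_params list comprehension.
starts = range(0, TOTAL, PAGE_SIZE)

pool = concurrent.futures.ThreadPoolExecutor(max_workers=3)
try:
    # map submits every page up front but yields results lazily, in order.
    for result in pool.map(fetch_page, starts):
        print(len(result['Items']))  # 50, 50, 50, 50, 30
finally:
    pool.shutdown()
```

Note that ThreadPoolExecutor.map preserves submission order while yielding results as they complete, which is what lets _get_items keep yielding pages (and RestorePoint checkpoints) in ascending StartIndex order while later requests are still in flight.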

library.py

@@ -138,8 +138,7 @@ class Library(threading.Thread):
             if not self.player.isPlayingVideo() or settings('syncDuringPlay.bool') or xbmc.getCondVisibility('VideoPlayer.Content(livetv)'):
-                while self.worker_downloads():
-                    pass
+                self.worker_downloads()
                 self.worker_sort()

                 self.worker_updates()
@@ -225,7 +224,6 @@ class Library(threading.Thread):
         ''' Get items from jellyfin and place them in the appropriate queues.
         '''
-        added_threads = False
         for queue in ((self.updated_queue, self.updated_output), (self.userdata_queue, self.userdata_output)):
             if queue[0].qsize() and len(self.download_threads) < DTHREADS:
@@ -233,8 +231,6 @@ class Library(threading.Thread):
                 new_thread.start()
                 LOG.info("-->[ q:download/%s ]", id(new_thread))
                 self.download_threads.append(new_thread)
-                added_threads = True

-        return added_threads

     def worker_sort(self):
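Why the caller's loop disappeared: worker_downloads used to report whether it had spawned a new thread so Library could keep re-invoking it, and presumably a single pass now suffices because each download thread's _get_items call fans its GETs out across DTHREADS pool workers. A standalone sketch of the bounded worker-spawn pattern that remains (the queue contents and drain function are illustrative, not the add-on's code):

```python
import threading
try:
    import Queue as queue  # Python 2 (Kodi Leia)
except ImportError:
    import queue           # Python 3

DTHREADS = 3
download_threads = []

def drain(q):
    # Stand-in for a download worker consuming one input queue.
    while not q.empty():
        q.get()

pending = [queue.Queue(), queue.Queue()]
for q in pending:
    for n in range(5):
        q.put(n)

# Spawn at most DTHREADS threads, one per non-empty queue,
# mirroring the qsize()/len(download_threads) check above.
for q in pending:
    if q.qsize() and len(download_threads) < DTHREADS:
        t = threading.Thread(target=drain, args=(q,))
        t.start()
        download_threads.append(t)

for t in download_threads:
    t.join()
```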