From 9f1cf14cf6b43f1d8bad0a99f9f4101fb16259b1 Mon Sep 17 00:00:00 2001 From: angelblue05 Date: Sat, 3 Dec 2016 03:11:38 -0600 Subject: [PATCH] Change to throttle --- resources/language/English/strings.xml | 3 +- resources/lib/read_embyserver.py | 256 ++++++++++++------------- resources/settings.xml | 4 +- 3 files changed, 125 insertions(+), 138 deletions(-) diff --git a/resources/language/English/strings.xml b/resources/language/English/strings.xml index bfe0d9ff..d2b9e0cc 100644 --- a/resources/language/English/strings.xml +++ b/resources/language/English/strings.xml @@ -201,7 +201,7 @@ Force artwork caching Limit artwork cache threads (recommended for rpi) Enable fast startup (requires server plugin) - Maximum items to request from the server at once + Maximum items to request from the server per download thread Video Playback Network credentials Enable Emby cinema mode @@ -234,6 +234,7 @@ Enable server offline message Enable analytic metric logging Display message (in seconds) + Download threads (recommended: 2-3) Sign in with Emby Connect diff --git a/resources/lib/read_embyserver.py b/resources/lib/read_embyserver.py index cd36c116..7d9b539e 100644 --- a/resources/lib/read_embyserver.py +++ b/resources/lib/read_embyserver.py @@ -4,6 +4,8 @@ import logging import hashlib +import threading +import Queue import xbmc @@ -19,20 +21,74 @@ log = logging.getLogger("EMBY."+__name__) ################################################################################################# +class DownloadThreader(threading.Thread): + + is_finished = False + + def __init__(self, queue, output): + + self.queue = queue + self.output = output + threading.Thread.__init__(self) + + def run(self): + + try: + query = self.queue.get() + except Queue.Empty: + self.is_finished = True + return + + try: + result = downloadutils.DownloadUtils().downloadUrl(query['url'], + parameters=query.get('params')) + if result: + self.output.extend(result['Items']) + except Exception as error: + log.error(error) + + self.queue.task_done() + self.is_finished = True + + class Read_EmbyServer(): - limitIndex = int(settings('limitindex')) - + limitIndex = min(int(settings('limitIndex')), 50) + download_limit = int(settings('downloadThreads')) + download_threads = list() def __init__(self): - self.doUtils = downloadutils.DownloadUtils().downloadUrl + self.doUtils = downloadutils.DownloadUtils() self.userId = window('emby_currUser') self.server = window('emby_server%s' % self.userId) def get_emby_url(self, handler): return "{server}/emby/%s" % handler + def _add_worker_thread(self, queue, output): + + while True: + for thread in self.download_threads: + if thread.is_finished: + self.download_threads.remove(thread) + + if window('emby_online') != "true": + # Something happened + log.error("Server is not online, don't start new download thread") + queue.task_done() + return False + + if len(self.download_threads) < self.download_limit: + # Start new "daemon thread" - actual daemon thread is not supported in Kodi + new_thread = DownloadThreader(queue, output) + new_thread.start() + self.download_threads.append(new_thread) + return True + else: + log.info("Waiting for empty download spot: %s", len(self.download_threads)) + xbmc.sleep(100) + def split_list(self, itemlist, size): # Split up list in pieces of size. Will generate a list of lists @@ -43,7 +99,7 @@ class Read_EmbyServer(): item = {} try: - result = self.doUtils("{server}/emby/Users/{UserId}/Items/%s?format=json" % itemid) + result = self.doUtils.downloadUrl("{server}/emby/Users/{UserId}/Items/%s?format=json" % itemid) except Exception as error: log.info("Error getting item from server: " + str(error)) result = None @@ -53,41 +109,37 @@ class Read_EmbyServer(): return item - def getItems(self, itemlist): + def getItems(self, item_list): items = [] + queue = Queue.Queue() - itemlists = self.split_list(itemlist, 50) - for itemlist in itemlists: + url = "{server}/emby/Users/{UserId}/Items?&format=json" + for item_ids in self.split_list(item_list, self.limitIndex): # Will return basic information params = { - 'Ids': ",".join(itemlist), + 'Ids': ",".join(item_ids), 'Fields': "Etag" } - url = "{server}/emby/Users/{UserId}/Items?&format=json" + queue.put({'url': url, 'params': params}) + if not self._add_worker_thread(queue, items): + break - try: - result = self.doUtils(url, parameters=params) - except Exception as error: - log.info("Error getting items form server: " + str(error)) - result = None - - if result is not None: - items.extend(result['Items']) + queue.join() return items - def getFullItems(self, itemlist): - + def getFullItems(self, item_list): + items = [] + queue = Queue.Queue() - itemlists = self.split_list(itemlist, 50) - for itemlist in itemlists: - + url = "{server}/emby/Users/{UserId}/Items?format=json" + for item_ids in self.split_list(item_list, self.limitIndex): params = { - "Ids": ",".join(itemlist), + "Ids": ",".join(item_ids), "Fields": ( "Path,Genres,SortName,Studios,Writer,ProductionYear,Taglines," @@ -98,15 +150,11 @@ class Read_EmbyServer(): "MediaSources,VoteCount" ) } - url = "{server}/emby/Users/{UserId}/Items?format=json" - try: - result = self.doUtils(url, parameters=params) - except Exception as error: - log.info("Error getting full items: " + str(error)) - result = None + queue.put({'url': url, 'params': params}) + if not self._add_worker_thread(queue, items): + break - if result is not None: - items.extend(result['Items']) + queue.join() return items @@ -133,7 +181,7 @@ class Read_EmbyServer(): "Tags,ProviderIds,ParentId,RemoteTrailers,SpecialEpisodeNumbers" ) } - return self.doUtils("{server}/emby/Users/{UserId}/Items?format=json", parameters=params) + return self.doUtils.downloadUrl("{server}/emby/Users/{UserId}/Items?format=json", parameters=params) def getTvChannels(self): @@ -150,7 +198,7 @@ class Read_EmbyServer(): ) } url = "{server}/emby/LiveTv/Channels/?userid={UserId}&format=json" - return self.doUtils(url, parameters=params) + return self.doUtils.downloadUrl(url, parameters=params) def getTvRecordings(self, groupid): @@ -171,7 +219,7 @@ class Read_EmbyServer(): ) } url = "{server}/emby/LiveTv/Recordings/?userid={UserId}&format=json" - return self.doUtils(url, parameters=params) + return self.doUtils.downloadUrl(url, parameters=params) def getSection(self, parentid, itemtype=None, sortby="SortName", artist_id=None, basic=False, dialog=None): @@ -180,7 +228,6 @@ class Read_EmbyServer(): 'Items': [], 'TotalRecordCount': 0 } - # Get total number of items url = "{server}/emby/Users/{UserId}/Items?format=json" params = { @@ -195,18 +242,16 @@ class Read_EmbyServer(): 'Recursive': True, 'Limit': 1 } - try: - result = self.doUtils(url, parameters=params) + result = self.doUtils.downloadUrl(url, parameters=params) total = result['TotalRecordCount'] items['TotalRecordCount'] = total except Exception as error: # Failed to retrieve - log.debug("%s:%s Failed to retrieve the server response: %s" % (url, params, error)) + log.debug("%s:%s Failed to retrieve the server response: %s", url, params, error) else: index = 0 jump = self.limitIndex - throttled = False - highestjump = 0 + queue = Queue.Queue() while index < total: # Get items by chunk to increase retrieval speed at scale @@ -217,6 +262,7 @@ class Read_EmbyServer(): 'IncludeItemTypes': itemtype, 'CollapseBoxSetItems': False, 'IsVirtualUnaired': False, + 'EnableTotalRecordCount': False, 'LocationTypes': "FileSystem,Remote,Offline", 'IsMissing': False, 'Recursive': True, @@ -237,66 +283,20 @@ class Read_EmbyServer(): "Tags,ProviderIds,ParentId,RemoteTrailers,SpecialEpisodeNumbers," "MediaSources,VoteCount" ) - try: - result = self.doUtils(url, parameters=params) - items['Items'].extend(result['Items']) - except Warning as error: - if "400" in error: - log.info("Something went wrong, aborting request.") - index += jump - except Exception as error: - # Something happened to the connection - if not throttled: - throttled = True - log.info("Throttle activated.") - - if jump == highestjump: - # We already tried with the highestjump, but it failed. Reset value. - log.info("Reset highest value.") - highestjump = 0 + queue.put({'url': url, 'params': params}) + if not self._add_worker_thread(queue, items['Items']): + break - # Lower the number by half - if highestjump: - throttled = False - jump = highestjump - log.info("Throttle deactivated.") - else: - jump = int(jump/4) - log.debug("Set jump limit to recover: %s" % jump) - - retry = 0 - while window('emby_online') != "true": - # Wait server to come back online - if retry == 5: - log.info("Unable to reconnect to server. Abort process.") - return items - - retry += 1 - if xbmc.Monitor().waitForAbort(1): - # Abort was requested while waiting. - return items - else: - # Request succeeded - index += jump + index += jump - if dialog: - percentage = int((float(index) / float(total))*100) - dialog.update(percentage) + if dialog: + percentage = int((float(index) / float(total))*100) + dialog.update(percentage) - if jump > highestjump: - # Adjust with the latest number, if it's greater - highestjump = jump + queue.join() + if dialog: + dialog.update(100) - if throttled: - # We needed to adjust the number of item requested. - # keep increasing until the connection times out again - # to find the highest value - increment = int(jump*0.33) - if not increment: # Incase the increment is 0 - increment = 10 - - jump += increment - log.info("Increase jump limit to: %s" % jump) return items def get_views(self, root=False): @@ -306,7 +306,7 @@ class Read_EmbyServer(): else: # Views ungrouped url = "{server}/emby/Users/{UserId}/Items?Sortby=SortName&format=json" - return self.doUtils(url) + return self.doUtils.downloadUrl(url) def getViews(self, mediatype="", root=False, sortedlist=False): # Build a list of user views @@ -351,7 +351,6 @@ class Read_EmbyServer(): def verifyView(self, parentid, itemid): - belongs = False params = { 'ParentId': parentid, @@ -362,41 +361,32 @@ class Read_EmbyServer(): 'Recursive': True, 'Ids': itemid } - try: - result = self.doUtils("{server}/emby/Users/{UserId}/Items?format=json", parameters=params) + result = self.doUtils.downloadUrl("{server}/emby/Users/{UserId}/Items?format=json", parameters=params) total = result['TotalRecordCount'] except Exception as error: # Something happened to the connection log.info("Error getting item count: " + str(error)) - pass - else: - if total: - belongs = True + return False - return belongs + return True if total else False def getMovies(self, parentId, basic=False, dialog=None): - return self.getSection(parentId, "Movie", basic=basic, dialog=dialog) def getBoxset(self, dialog=None): - return self.getSection(None, "BoxSet", dialog=dialog) def getMovies_byBoxset(self, boxsetid): return self.getSection(boxsetid, "Movie") def getMusicVideos(self, parentId, basic=False, dialog=None): - return self.getSection(parentId, "MusicVideo", basic=basic, dialog=dialog) def getHomeVideos(self, parentId): - return self.getSection(parentId, "Video") def getShows(self, parentId, basic=False, dialog=None): - return self.getSection(parentId, "Series", basic=basic, dialog=dialog) def getSeasons(self, showId): @@ -415,7 +405,7 @@ class Read_EmbyServer(): url = "{server}/emby/Shows/%s/Seasons?UserId={UserId}&format=json" % showId try: - result = self.doUtils(url, parameters=params) + result = self.doUtils.downloadUrl(url, parameters=params) except Exception as error: log.info("Error getting Seasons form server: " + str(error)) result = None @@ -426,15 +416,12 @@ class Read_EmbyServer(): return items def getEpisodes(self, parentId, basic=False, dialog=None): - return self.getSection(parentId, "Episode", basic=basic, dialog=dialog) def getEpisodesbyShow(self, showId): - return self.getSection(showId, "Episode") def getEpisodesbySeason(self, seasonId): - return self.getSection(seasonId, "Episode") def getArtists(self, parent_id=None, dialog=None): @@ -444,7 +431,6 @@ class Read_EmbyServer(): 'Items': [], 'TotalRecordCount': 0 } - # Get total number of items url = "{server}/emby/Artists?UserId={UserId}&format=json" params = { @@ -453,16 +439,16 @@ class Read_EmbyServer(): 'Recursive': True, 'Limit': 1 } - try: - result = self.doUtils(url, parameters=params) + result = self.doUtils.downloadUrl(url, parameters=params) total = result['TotalRecordCount'] items['TotalRecordCount'] = total except Exception as error: # Failed to retrieve - log.debug("%s:%s Failed to retrieve the server response: %s" % (url, params, error)) + log.debug("%s:%s Failed to retrieve the server response: %s", url, params, error) else: index = 0 jump = self.limitIndex + queue = Queue.Queue() while index < total: # Get items by chunk to increase retrieval speed at scale @@ -471,6 +457,7 @@ class Read_EmbyServer(): 'ParentId': parent_id, 'Recursive': True, 'IsVirtualUnaired': False, + 'EnableTotalRecordCount': False, 'LocationTypes': "FileSystem,Remote,Offline", 'IsMissing': False, 'StartIndex': index, @@ -484,33 +471,32 @@ class Read_EmbyServer(): "AirTime,DateCreated,MediaStreams,People,ProviderIds,Overview" ) } + queue.put({'url': url, 'params': params}) + if not self._add_worker_thread(queue, items['Items']): + break - try: - result = self.doUtils(url, parameters=params) - items['Items'].extend(result['Items']) - index += jump - except Exception as error: - log.debug("Error getting artists from server: " + str(error)) + index += jump if dialog: percentage = int((float(index) / float(total))*100) dialog.update(percentage) + + queue.join() + if dialog: + dialog.update(100) + return items def getAlbums(self, basic=False, dialog=None): - return self.getSection(None, "MusicAlbum", sortby="DateCreated", basic=basic, dialog=dialog) def getAlbumsbyArtist(self, artistId): - return self.getSection(None, "MusicAlbum", sortby="DateCreated", artist_id=artistId) def getSongs(self, basic=False, dialog=None): - return self.getSection(None, "Audio", basic=basic, dialog=dialog) def getSongsbyAlbum(self, albumId): - return self.getSection(albumId, "Audio") def getAdditionalParts(self, itemId): @@ -523,7 +509,7 @@ class Read_EmbyServer(): url = "{server}/emby/Videos/%s/AdditionalParts?UserId={UserId}&format=json" % itemId try: - result = self.doUtils(url) + result = self.doUtils.downloadUrl(url) except Exception as error: log.info("Error getting additional parts form server: " + str(error)) result = None @@ -549,7 +535,7 @@ class Read_EmbyServer(): def updateUserRating(self, itemid, favourite=None): # Updates the user rating to Emby - doUtils = self.doUtils + doUtils = self.doUtils.downloadUrl if favourite: url = "{server}/emby/Users/{UserId}/FavoriteItems/%s?format=json" % itemid @@ -574,18 +560,18 @@ class Read_EmbyServer(): 'ReplaceAllMetadata': True } - self.doUtils(url, postBody=params, action_type="POST") + self.doUtils.downloadUrl(url, postBody=params, action_type="POST") def deleteItem(self, itemid): url = "{server}/emby/Items/%s?format=json" % itemid - self.doUtils(url, action_type="DELETE") + self.doUtils.downloadUrl(url, action_type="DELETE") def getUsers(self, server): url = "%s/emby/Users/Public?format=json" % server try: - users = self.doUtils(url, authenticate=False) + users = self.doUtils.downloadUrl(url, authenticate=False) except Exception as error: log.info("Error getting users from server: " + str(error)) users = [] @@ -597,7 +583,7 @@ class Read_EmbyServer(): password = password or "" url = "%s/emby/Users/AuthenticateByName?format=json" % server data = {'username': username, 'password': hashlib.sha1(password).hexdigest()} - user = self.doUtils(url, postBody=data, action_type="POST", authenticate=False) + user = self.doUtils.downloadUrl(url, postBody=data, action_type="POST", authenticate=False) return user @@ -610,4 +596,4 @@ class Read_EmbyServer(): 'IncludeItemTypes': media_type } url = self.get_emby_url('Users/{UserId}/Items?format=json') - return self.doUtils(url, parameters=params) \ No newline at end of file + return self.doUtils.downloadUrl(url, parameters=params) \ No newline at end of file diff --git a/resources/settings.xml b/resources/settings.xml index 76533344..1a554199 100644 --- a/resources/settings.xml +++ b/resources/settings.xml @@ -25,8 +25,8 @@ - - + +