From a00d55ac0eba1e1348a56756f1981148d1c0e19e Mon Sep 17 00:00:00 2001 From: angelblue05 Date: Mon, 26 Sep 2016 21:50:58 -0500 Subject: [PATCH] Rework downloadutils (#63) --- resources/lib/artwork.py | 2 +- resources/lib/connect/connectionmanager.py | 8 +- resources/lib/connectmanager.py | 17 ++ resources/lib/downloadutils.py | 266 +++++++++++++-------- resources/lib/entrypoint.py | 9 + resources/lib/kodimonitor.py | 2 + resources/lib/librarysync.py | 4 +- resources/lib/service_entry.py | 181 +++++++------- resources/lib/userclient.py | 34 ++- resources/lib/websocket_client.py | 2 +- 10 files changed, 333 insertions(+), 192 deletions(-) diff --git a/resources/lib/artwork.py b/resources/lib/artwork.py index 0ae965cf..2e3bdded 100644 --- a/resources/lib/artwork.py +++ b/resources/lib/artwork.py @@ -37,7 +37,7 @@ class Artwork(object): self.enable_texture_cache = settings('enableTextureCache') == "true" self.image_cache_limit = int(settings('imageCacheLimit')) * 5 - log.info("image cache thread count: %s", self.image_cache_limit) + log.debug("image cache thread count: %s", self.image_cache_limit) if not self.xbmc_port and self.enable_texture_cache: self._set_webserver_details() diff --git a/resources/lib/connect/connectionmanager.py b/resources/lib/connect/connectionmanager.py index b061a17b..2e404e97 100644 --- a/resources/lib/connect/connectionmanager.py +++ b/resources/lib/connect/connectionmanager.py @@ -387,7 +387,7 @@ class ConnectionManager(object): return servers - def _getAvailableServers(self): + def getAvailableServers(self): log.info("Begin getAvailableServers") @@ -512,7 +512,7 @@ class ConnectionManager(object): if mode == ConnectionMode['Local']: enableRetry = True - timeout = 8 + timeout = 5 if self._stringEqualsIgnoreCase(address, server.get('ManualAddress')): log.info("skipping LocalAddress test because it is the same as ManualAddress") @@ -522,7 +522,7 @@ class ConnectionManager(object): if self._stringEqualsIgnoreCase(address, server.get('LocalAddress')): enableRetry = True - timeout = 8 + timeout = 5 if skipTest or not address: log.info("skipping test at index: %s" % index) @@ -744,7 +744,7 @@ class ConnectionManager(object): log.info("Begin connect") - servers = self._getAvailableServers() + servers = self.getAvailableServers() return self._connectToServers(servers, options) def _connectToServers(self, servers, options): diff --git a/resources/lib/connectmanager.py b/resources/lib/connectmanager.py index 88309a2b..cfa9d158 100644 --- a/resources/lib/connectmanager.py +++ b/resources/lib/connectmanager.py @@ -18,6 +18,7 @@ from dialogs import ServerConnect, UsersConnect, LoginConnect, LoginManual, Serv log = logging.getLogger("EMBY."+__name__) addon = xbmcaddon.Addon(id='plugin.video.emby') +STATE = connectionmanager.ConnectionState XML_PATH = (addon.getAddonInfo('path'), "default", "1080i") ################################################################################################## @@ -201,3 +202,19 @@ class ConnectManager(object): server.update(updated_server) # Update the token in data.txt self._connect.credentialProvider.getCredentials(credentials) + + def get_connect_servers(self): + + connect_servers = [] + servers = self._connect.getAvailableServers() + for server in servers: + if 'ExchangeToken' in server: + result = self.connect_server(server) + if result['State'] == STATE['SignedIn']: + connect_servers.append(server) + + log.info(connect_servers) + return connect_servers + + def connect_server(self, server): + return self._connect.connectToServer(server, {'updateDateLastAccessed': False}) diff --git a/resources/lib/downloadutils.py b/resources/lib/downloadutils.py index bc0084fd..71f279b3 100644 --- a/resources/lib/downloadutils.py +++ b/resources/lib/downloadutils.py @@ -10,6 +10,7 @@ import xbmc import xbmcgui import clientinfo +import connect.connectionmanager as connectionmanager from utils import window, settings, language as lang ################################################################################################## @@ -24,44 +25,69 @@ log = logging.getLogger("EMBY."+__name__) ################################################################################################## -class DownloadUtils(): +class DownloadUtils(object): # Borg - multiple instances, shared state _shared_state = {} - clientInfo = clientinfo.ClientInfo() # Requests session - s = None + session = {} + session_requests = None + servers = {} # Multi server setup default_timeout = 30 def __init__(self): self.__dict__ = self._shared_state + self.client_info = clientinfo.ClientInfo() - def setUserId(self, userId): + def _set_session(self, **kwargs): # Reserved for userclient only - self.userId = userId - log.debug("Set userId: %s" % userId) + info = {} + for key in kwargs: + info[key] = kwargs[key] - def setServer(self, server): + self.session.update(info) + log.info("Set info for server %s: %s", self.session['ServerId'], self.session) + + def add_server(self, server, ssl): # Reserved for userclient only - self.server = server - log.debug("Set server: %s" % server) + server_id = server['Id'] + info = { + 'UserId': server['UserId'], + 'Server': connectionmanager.getServerAddress(server, server['LastConnectionMode']), + 'Token': server['AccessToken'], + 'SSL': ssl + } + for s in self.servers: + if s == server_id: + s.update(info) + # Set window prop + self._set_server_properties(server_id, server['Name'], json.dumps(info)) + log.info("updating %s to available servers: %s", server_id, self.servers) + break + else: + self.servers[server_id] = info + self._set_server_properties(server_id, server['Name'], json.dumps(info)) + log.info("adding %s to available servers: %s", server_id, self.servers) - def setToken(self, token): + def reset_server(self, server_id): # Reserved for userclient only - self.token = token - log.debug("Set token: %s" % token) + for server in self.servers: + if server['ServerId'] == server_id: + self.servers.remove(s) + window('emby_server%s.json' % server_id, clear=True) + window('emby_server%s.name' % server_id, clear=True) + log.info("removing %s from available servers", server_id) - def setSSL(self, ssl): - # Reserved for userclient only - self.sslverify = ssl - log.debug("Verify SSL verify/certificate: %s" % ssl) + @staticmethod + def _set_server_properties(server_id, name, info): + window('emby_server%s.json' % server_id, value=info) + window('emby_server%s.name' % server_id, value=name) - - def postCapabilities(self, deviceId): + def post_capabilities(self, device_id): # Post settings to session url = "{server}/emby/Sessions/Capabilities/Full?format=json" @@ -83,14 +109,14 @@ class DownloadUtils(): ) } - log.debug("Capabilities URL: %s" % url) + log.debug("capabilities URL: %s" % url) log.debug("Postdata: %s" % data) self.downloadUrl(url, postBody=data, action_type="POST") - log.debug("Posted capabilities to %s" % self.server) + log.debug("Posted capabilities to %s" % self.session['Server']) # Attempt at getting sessionId - url = "{server}/emby/Sessions?DeviceId=%s&format=json" % deviceId + url = "{server}/emby/Sessions?DeviceId=%s&format=json" % device_id result = self.downloadUrl(url) try: sessionId = result[0]['Id'] @@ -130,75 +156,77 @@ class DownloadUtils(): self.downloadUrl(url, postBody={}, action_type="POST") - def startSession(self): - - self.deviceId = self.clientInfo.get_device_id() - + def start_session(self): # User is identified from this point # Attach authenticated header to the session - header = self.getHeader() - - # If user enabled host certificate verification - try: - verify = self.sslverify - except: - verify = False - log.info("Could not load SSL settings.") - # Start session - self.s = requests.Session() - self.s.headers = header - self.s.verify = verify + s = requests.Session() + s.headers = self.get_header() + s.verify = self.session['SSL'] # Retry connections to the server - self.s.mount("http://", requests.adapters.HTTPAdapter(max_retries=1)) - self.s.mount("https://", requests.adapters.HTTPAdapter(max_retries=1)) + s.mount("http://", requests.adapters.HTTPAdapter(max_retries=1)) + s.mount("https://", requests.adapters.HTTPAdapter(max_retries=1)) + self.session_requests = s - log.info("Requests session started on: %s" % self.server) + log.info("Requests session started on: %s" % self.session['Server']) - def stopSession(self): + def stop_session(self): try: - self.s.close() - except Exception: - log.warn("Requests session could not be terminated.") + self.session_requests.close() + except Exception as error: + log.error(error) + log.warn("requests session could not be terminated") - def getHeader(self, authenticate=True): + def get_header(self, server_id=None, authenticate=True): - deviceName = self.clientInfo.get_device_name() - deviceName = deviceName.encode('utf-8') - deviceId = self.clientInfo.get_device_id() - version = self.clientInfo.get_version() + device_name = self.client_info.get_device_name().encode('utf-8') + device_id = self.client_info.get_device_id() + version = self.client_info.get_version() if authenticate: + + user = self.get_user(server_id) + user_id = user['UserId'] + token = user['Token'] + auth = ( 'MediaBrowser UserId="%s", Client="Kodi", Device="%s", DeviceId="%s", Version="%s"' - % (self.userId, deviceName, deviceId, version)) - + % (user_id, device_name, device_id, version) + ) header = { - - 'Content-type': 'application/json', - 'Accept-encoding': 'gzip', - 'Accept-Charset': 'UTF-8,*', 'Authorization': auth, - 'X-MediaBrowser-Token': self.token + 'X-MediaBrowser-Token': token } else: - # If user is not authenticated auth = ( 'MediaBrowser Client="Kodi", Device="%s", DeviceId="%s", Version="%s"' - % (deviceName, deviceId, version)) - - header = { - - 'Content-type': 'application/json', - 'Accept-encoding': 'gzip', - 'Accept-Charset': 'UTF-8,*', - 'Authorization': auth - } + % (device_name, device_id, version) + ) + header = {'Authorization': auth} + header.update({ + 'Content-type': 'application/json', + 'Accept-encoding': 'gzip', + 'Accept-Charset': 'UTF-8,*', + }) return header + def get_user(self, server_id=None): + + if server_id is None: + return { + 'UserId': self.session['UserId'], + 'Token': self.session['Token'] + } + else: + server = self.servers[server_id] + return { + 'UserId': server['UserId'], + 'Token': server['Token'] + } + def downloadUrl(self, url, postBody=None, action_type="GET", parameters=None, - authenticate=True): + authenticate=True, server_id=None): log.debug("===== ENTER downloadUrl =====") @@ -206,31 +234,30 @@ class DownloadUtils(): kwargs = {} default_link = "" - try: - if self.s is not None: - session = self.s + try: # Ensure server info is loaded + if not self._ensure_server(server_id): + raise AttributeError("unable to load server information: %s" % server_id) + + if server_id is None: + if self.session_requests is not None: + session = self.session_requests + else: + kwargs.update({ + 'verify': self.session['SSL'], + 'headers': self.get_header(authenticate=authenticate) + }) + # Replace for the real values + url = url.replace("{server}", self.session['Server']) + url = url.replace("{UserId}", self.session['UserId']) else: - # request session does not exists - # Get user information - self.userId = window('emby_currUser') - self.server = window('emby_server%s' % self.userId) - self.token = window('emby_accessToken%s' % self.userId) - verifyssl = False - - # IF user enables ssl verification - if settings('sslverify') == "true": - verifyssl = True - if settings('sslcert') != "None": - verifyssl = settings('sslcert') - + server = self.servers[server_id] kwargs.update({ - 'verify': verifyssl, - 'headers': self.getHeader(authenticate) + 'verify': server['SSL'], + 'headers': self.get_header(server_id, authenticate) }) - - # Replace for the real values - url = url.replace("{server}", self.server) - url = url.replace("{UserId}", self.userId) + # Replace for the real values + url = url.replace("{server}", server['Server']) + url = url.replace("{UserId}", server['UserId']) ##### PREPARE REQUEST ##### kwargs.update({ @@ -324,15 +351,66 @@ class DownloadUtils(): except requests.exceptions.RequestException as e: log.error("Unknown error connecting to: %s" % url) + except AttributeError as error: + log.error(error) + return default_link - def _requests(self, action, session=requests, **kwargs): + + def _ensure_server(self, server_id=None): + + if server_id is None and self.session_requests is None: + + server = self._get_session_info() + self.session.update(server) + + elif server_id and server_id not in self.servers: + + server = self._get_session_info(server_id) + if server is None: + return False + + self.servers[server_id] = server + + return True + + @classmethod + def _get_session_info(cls, server_id=None): + + info = {} + + if server_id is None: # Main server + + user_id = window('emby_currUser') + info.update({ + 'UserId': user_id, + 'Server': window('emby_server%s' % user_id), + 'Token': window('emby_accessToken%s' % user_id) + }) + verifyssl = False + # If user enables ssl verification + if settings('sslverify') == "true": + verifyssl = True + if settings('sslcert') != "None": + verifyssl = settings('sslcert') + + info['SSL'] = verifyssl + + else: # Other connect servers + server = window('emby_server%s.json' % server_id) + if server: + info.update(json.loads(server)) + + return info + + @classmethod + def _requests(cls, action, session=requests, **kwargs): if action == "GET": - r = session.get(**kwargs) + response = session.get(**kwargs) elif action == "POST": - r = session.post(**kwargs) + response = session.post(**kwargs) elif action == "DELETE": - r = session.delete(**kwargs) + response = session.delete(**kwargs) - return r \ No newline at end of file + return response \ No newline at end of file diff --git a/resources/lib/entrypoint.py b/resources/lib/entrypoint.py index a5039945..aa33ffec 100644 --- a/resources/lib/entrypoint.py +++ b/resources/lib/entrypoint.py @@ -96,6 +96,15 @@ def doMainListing(): addDirectoryItem(lang(33052), "plugin://plugin.video.emby/?mode=browsecontent&type=recordings&folderid=root") + ''' + TODO: Create plugin listing for servers + servers = window('emby_servers') + if servers: + servers = json.loads(servers) + for server in servers: + log.info(window('emby_server%s.name' % server)) + addDirectoryItem(window('emby_server%s.name' % server), "plugin://plugin.video.emby/?mode=%s" % server)''' + addDirectoryItem(lang(30517), "plugin://plugin.video.emby/?mode=passwords") addDirectoryItem(lang(33053), "plugin://plugin.video.emby/?mode=settings") addDirectoryItem(lang(33054), "plugin://plugin.video.emby/?mode=adduser") diff --git a/resources/lib/kodimonitor.py b/resources/lib/kodimonitor.py index da15013d..6247a82e 100644 --- a/resources/lib/kodimonitor.py +++ b/resources/lib/kodimonitor.py @@ -25,6 +25,8 @@ class KodiMonitor(xbmc.Monitor): def __init__(self): + xbmc.Monitor.__init__(self) + self.download = downloadutils.DownloadUtils().downloadUrl log.info("Kodi monitor started") diff --git a/resources/lib/librarysync.py b/resources/lib/librarysync.py index 77264f94..31f7c4df 100644 --- a/resources/lib/librarysync.py +++ b/resources/lib/librarysync.py @@ -1032,7 +1032,6 @@ class LibrarySync(threading.Thread): line1=lang(33024)) break - # Run start up sync log.warn("Database version: %s", window('emby_version')) log.info("SyncDatabase (started)") @@ -1041,6 +1040,9 @@ class LibrarySync(threading.Thread): elapsedTime = datetime.now() - startTime log.info("SyncDatabase (finished in: %s) %s" % (str(elapsedTime).split('.')[0], librarySync)) + + # Add other servers at this point + self.user.load_connect_servers() # Only try the initial sync once per kodi session regardless # This will prevent an infinite loop in case something goes wrong. startupComplete = True diff --git a/resources/lib/service_entry.py b/resources/lib/service_entry.py index a37b8b92..21819550 100644 --- a/resources/lib/service_entry.py +++ b/resources/lib/service_entry.py @@ -33,9 +33,15 @@ class Service(object): warn_auth = True userclient_running = False + userclient_thread = None + websocket_running = False + websocket_thread = None + library_running = False - kodimonitor_running = False + library_thread = None + + monitor = False def __init__(self): @@ -43,7 +49,6 @@ class Service(object): self.client_info = clientinfo.ClientInfo() self.addon_name = self.client_info.get_addon_name() log_level = settings('logLevel') - self.monitor = xbmc.Monitor() window('emby_logLevel', value=str(log_level)) window('emby_kodiProfile', value=xbmc.translatePath('special://profile')) @@ -81,21 +86,21 @@ class Service(object): # Important: Threads depending on abortRequest will not trigger # if profile switch happens more than once. - monitor = self.monitor + self.monitor = kodimonitor.KodiMonitor() kodiProfile = xbmc.translatePath('special://profile') # Server auto-detect initialsetup.InitialSetup().setup() # Initialize important threads - user = userclient.UserClient() - ws = wsc.WebSocketClient() - library = librarysync.LibrarySync() + self.userclient_thread = userclient.UserClient() + self.websocket_thread = wsc.WebSocketClient() + self.library_thread = librarysync.LibrarySync() kplayer = player.Player() # Sync and progress report lastProgressUpdate = datetime.today() - while not monitor.abortRequested(): + while not self.monitor.abortRequested(): if window('emby_kodiProfile') != kodiProfile: # Profile change happened, terminate this thread and others @@ -112,7 +117,7 @@ class Service(object): # Emby server is online # Verify if user is set and has access to the server - if user.get_user() is not None and user.get_access(): + if self.userclient_thread.get_user() is not None and self.userclient_thread.get_access(): # If an item is playing if xbmc.Player().isPlaying(): @@ -158,27 +163,23 @@ class Service(object): dialog(type_="notification", heading="{emby}", message=("%s %s%s!" - % (lang(33000), user.get_username().decode('utf-8'), + % (lang(33000), self.userclient_thread.get_username().decode('utf-8'), add.decode('utf-8'))), icon="{emby}", time=2000, sound=False) - # Start monitoring kodi events - if not self.kodimonitor_running: - self.kodimonitor_running = kodimonitor.KodiMonitor() - # Start the Websocket Client if not self.websocket_running: self.websocket_running = True - ws.start() + self.websocket_thread.start() # Start the syncing thread if not self.library_running: self.library_running = True - library.start() + self.library_thread.start() else: - if (user.get_user() is None) and self.warn_auth: + if (self.userclient_thread.get_user() is None) and self.warn_auth: # Alert user is not authenticated and suppress future warning self.warn_auth = False log.info("Not authenticated yet.") @@ -186,96 +187,106 @@ class Service(object): # User access is restricted. # Keep verifying until access is granted # unless server goes offline or Kodi is shut down. - while not user.get_access(): + while not self.userclient_thread.get_access(): # Verify access with an API call + if window('emby_online') != "true": # Server went offline break - if monitor.waitForAbort(5): + if self.monitor.waitForAbort(5): # Abort was requested while waiting. We should exit break else: # Wait until Emby server is online # or Kodi is shut down. - while not monitor.abortRequested(): - - if user.get_server() is None: - # No server info set in add-on settings - pass - - elif not user.verify_server(): - # Server is offline. - # Alert the user and suppress future warning - if self.server_online: - log.info("Server is offline.") - window('emby_online', value="false") + self._server_online_check() - if settings('offlineMsg') == "true": - dialog(type_="notification", - heading=lang(33001), - message="%s %s" % (self.addon_name, lang(33002)), - icon="{emby}", - sound=False) - self.server_online = False - - elif window('emby_online') == "sleep": - # device going to sleep - if self.websocket_running: - ws.stop_client() - ws = wsc.WebSocketClient() - self.websocket_running = False - - if self.library_running: - library.stopThread() - library = librarysync.LibrarySync() - self.library_running = False - - else: - # Server is online - if not self.server_online: - # Server was offline when Kodi started. - # Wait for server to be fully established. - if monitor.waitForAbort(5): - # Abort was requested while waiting. - break - # Alert the user that server is online. - dialog(type_="notification", - heading="{emby}", - message=lang(33003), - icon="{emby}", - time=2000, - sound=False) - - self.server_online = True - log.info("Server is online and ready.") - window('emby_online', value="true") - - # Start the userclient thread - if not self.userclient_running: - self.userclient_running = True - user.start() - - break - - if monitor.waitForAbort(1): - # Abort was requested while waiting. - break - - if monitor.waitForAbort(1): + if self.monitor.waitForAbort(1): # Abort was requested while waiting. We should exit break ##### Emby thread is terminating. ##### + self._shutdown() + + def _server_online_check(self): + # Set emby_online true/false property + user = self.userclient_thread + while not self.monitor.abortRequested(): + + if user.get_server() is None: + # No server info set in add-on settings + pass + + elif not user.verify_server(): + # Server is offline. + # Alert the user and suppress future warning + if self.server_online: + log.info("Server is offline") + window('emby_online', value="false") + + if settings('offlineMsg') == "true": + dialog(type_="notification", + heading=lang(33001), + message="%s %s" % (self.addon_name, lang(33002)), + icon="{emby}", + sound=False) + + self.server_online = False + + elif window('emby_online') == "sleep": + # device going to sleep + if self.websocket_running: + self.websocket_thread.stop_client() + self.websocket_thread = wsc.WebSocketClient() + self.websocket_running = False + + if self.library_running: + self.library_thread.stopThread() + self.library_thread = librarysync.LibrarySync() + self.library_running = False + + else: + # Server is online + if not self.server_online: + # Server was offline when Kodi started. + # Wait for server to be fully established. + if self.monitor.waitForAbort(5): + # Abort was requested while waiting. + break + # Alert the user that server is online. + dialog(type_="notification", + heading="{emby}", + message=lang(33003), + icon="{emby}", + time=2000, + sound=False) + + self.server_online = True + window('emby_online', value="true") + log.info("Server is online and ready") + + # Start the userclient thread + if not self.userclient_running: + self.userclient_running = True + user.start() + + break + + if self.monitor.waitForAbort(1): + # Abort was requested while waiting. + break + + def _shutdown(self): if self.userclient_running: - user.stop_client() + self.userclient_thread.stop_client() if self.library_running: - library.stopThread() + self.library_thread.stopThread() if self.websocket_running: - ws.stop_client() + self.websocket_thread.stop_client() log.warn("======== STOP %s ========", self.addon_name) diff --git a/resources/lib/userclient.py b/resources/lib/userclient.py index d43a4f55..56a3016e 100644 --- a/resources/lib/userclient.py +++ b/resources/lib/userclient.py @@ -2,6 +2,7 @@ ################################################################################################## +import json import logging import threading @@ -213,10 +214,14 @@ class UserClient(threading.Thread): raise # Set downloadutils.py values - doutils.setUserId(userid) - doutils.setServer(server) - doutils.setToken(token) - doutils.setSSL(self.get_ssl()) + session = { + 'UserId': userid, + 'Server': server, + 'ServerId': settings('serverId'), + 'Token': token, + 'SSL': self.get_ssl() + } + doutils._set_session(**session) # verify user access try: @@ -225,10 +230,27 @@ class UserClient(threading.Thread): pass # Start downloadutils.py session - doutils.startSession() + doutils.start_session() # Set _user and _server self._set_user_server() + def load_connect_servers(self): + # Set connect servers + if not settings('connectUsername'): + return + + servers = self.connectmanager.get_connect_servers() + added_servers = [] + for server in servers: + if server['Id'] != settings('serverId'): + # TODO: SSL setup + self.doutils.add_server(server, False) + added_servers.append(server['Id']) + + # Set properties + log.info(added_servers) + window('emby_servers', value=json.dumps(added_servers)) + def _reset_client(self): log.info("reset UserClient authentication") @@ -295,7 +317,7 @@ class UserClient(threading.Thread): # Abort was requested while waiting. We should exit break - self.doutils.stopSession() + self.doutils.stop_session() log.warn("##===---- UserClient Stopped ----===##") def stop_client(self): diff --git a/resources/lib/websocket_client.py b/resources/lib/websocket_client.py index 447c81c7..3b70595f 100644 --- a/resources/lib/websocket_client.py +++ b/resources/lib/websocket_client.py @@ -292,7 +292,7 @@ class WebSocketClient(threading.Thread): log.debug("closed") def on_open(self, ws): - self.doutils.postCapabilities(self.device_id) + self.doutils.post_capabilities(self.device_id) def on_error(self, ws, error):