mirror of
https://github.com/jellyfin/jellyfin-kodi.git
synced 2025-01-27 18:36:11 +00:00
348 lines
9.8 KiB
Python
348 lines
9.8 KiB
Python
# -*- coding: utf-8 -*-
|
|
from __future__ import division, absolute_import, print_function, unicode_literals
|
|
|
|
#################################################################################################
|
|
|
|
import threading
|
|
import concurrent.futures
|
|
from datetime import date
|
|
|
|
import queue
|
|
|
|
import requests
|
|
|
|
from .helper import settings, stop, window, LazyLogger
|
|
from .jellyfin import Jellyfin
|
|
from .jellyfin import api
|
|
from .helper.exceptions import HTTPException
|
|
|
|
#################################################################################################
|
|
|
|
LOG = LazyLogger(__name__)
|
|
|
|
#################################################################################################
|
|
|
|
|
|
def get_jellyfinserver_url(handler):
|
|
|
|
if handler.startswith("/"):
|
|
|
|
handler = handler[1:]
|
|
LOG.info("handler starts with /: %s", handler)
|
|
|
|
return "{server}/%s" % handler
|
|
|
|
|
|
def _http(action, url, request=None, server_id=None):
|
|
|
|
if request is None:
|
|
request = {}
|
|
|
|
request.update({"url": url, "type": action})
|
|
return Jellyfin(server_id).http.request(request)
|
|
|
|
|
|
def _get(handler, params=None, server_id=None):
|
|
return _http("GET", get_jellyfinserver_url(handler), {"params": params}, server_id)
|
|
|
|
|
|
def _post(handler, json=None, params=None, server_id=None):
|
|
return _http(
|
|
"POST",
|
|
get_jellyfinserver_url(handler),
|
|
{"params": params, "json": json},
|
|
server_id,
|
|
)
|
|
|
|
|
|
def _delete(handler, params=None, server_id=None):
|
|
return _http(
|
|
"DELETE", get_jellyfinserver_url(handler), {"params": params}, server_id
|
|
)
|
|
|
|
|
|
def validate_view(library_id, item_id):
|
|
"""This confirms a single item from the library matches the view it belongs to.
|
|
Used to detect grouped libraries.
|
|
"""
|
|
try:
|
|
result = _get(
|
|
"Users/{UserId}/Items",
|
|
{"ParentId": library_id, "Recursive": True, "Ids": item_id},
|
|
)
|
|
except Exception as error:
|
|
LOG.exception(error)
|
|
return False
|
|
|
|
return bool(len(result["Items"]))
|
|
|
|
|
|
def get_single_item(parent_id, media):
|
|
return _get(
|
|
"Users/{UserId}/Items",
|
|
{
|
|
"ParentId": parent_id,
|
|
"Recursive": True,
|
|
"Limit": 1,
|
|
"IncludeItemTypes": media,
|
|
},
|
|
)
|
|
|
|
|
|
def get_movies_by_boxset(boxset_id):
|
|
|
|
for items in get_items(boxset_id, "Movie"):
|
|
yield items
|
|
|
|
|
|
def get_episode_by_show(show_id):
|
|
|
|
query = {
|
|
"url": "Shows/%s/Episodes" % show_id,
|
|
"params": {
|
|
"EnableUserData": True,
|
|
"EnableImages": True,
|
|
"UserId": "{UserId}",
|
|
"Fields": api.info(),
|
|
},
|
|
}
|
|
for items in _get_items(query):
|
|
yield items
|
|
|
|
|
|
def get_episode_by_season(show_id, season_id):
|
|
|
|
query = {
|
|
"url": "Shows/%s/Episodes" % show_id,
|
|
"params": {
|
|
"SeasonId": season_id,
|
|
"EnableUserData": True,
|
|
"EnableImages": True,
|
|
"UserId": "{UserId}",
|
|
"Fields": api.info(),
|
|
},
|
|
}
|
|
for items in _get_items(query):
|
|
yield items
|
|
|
|
|
|
def get_item_count(parent_id, item_type=None):
|
|
|
|
url = "Users/{UserId}/Items"
|
|
|
|
query_params = {
|
|
"ParentId": parent_id,
|
|
"IncludeItemTypes": item_type,
|
|
"EnableTotalRecordCount": True,
|
|
"LocationTypes": "FileSystem,Remote,Offline",
|
|
"Recursive": True,
|
|
"Limit": 1,
|
|
}
|
|
|
|
result = _get(url, query_params)
|
|
|
|
return result.get("TotalRecordCount", 1)
|
|
|
|
|
|
def get_items(parent_id, item_type=None, basic=False, params=None):
|
|
|
|
query = {
|
|
"url": "Users/{UserId}/Items",
|
|
"params": {
|
|
"ParentId": parent_id,
|
|
"IncludeItemTypes": item_type,
|
|
"SortBy": "SortName",
|
|
"SortOrder": "Ascending",
|
|
"Fields": api.basic_info() if basic else api.info(),
|
|
"CollapseBoxSetItems": False,
|
|
"IsVirtualUnaired": False,
|
|
"EnableTotalRecordCount": False,
|
|
"LocationTypes": "FileSystem,Remote,Offline",
|
|
"IsMissing": False,
|
|
"Recursive": True,
|
|
},
|
|
}
|
|
if params:
|
|
query["params"].update(params)
|
|
|
|
for items in _get_items(query):
|
|
yield items
|
|
|
|
|
|
def get_artists(parent_id=None):
|
|
|
|
query = {
|
|
"url": "Artists",
|
|
"params": {
|
|
"UserId": "{UserId}",
|
|
"ParentId": parent_id,
|
|
"SortBy": "SortName",
|
|
"SortOrder": "Ascending",
|
|
"Fields": api.music_info(),
|
|
"CollapseBoxSetItems": False,
|
|
"IsVirtualUnaired": False,
|
|
"EnableTotalRecordCount": False,
|
|
"LocationTypes": "FileSystem,Remote,Offline",
|
|
"IsMissing": False,
|
|
"Recursive": True,
|
|
},
|
|
}
|
|
|
|
for items in _get_items(query):
|
|
yield items
|
|
|
|
|
|
@stop
|
|
def _get_items(query, server_id=None):
|
|
"""query = {
|
|
'url': string,
|
|
'params': dict -- opt, include StartIndex to resume
|
|
}
|
|
"""
|
|
items = {"Items": [], "TotalRecordCount": 0, "RestorePoint": {}}
|
|
|
|
limit = min(int(settings("limitIndex") or 50), 50)
|
|
dthreads = int(settings("limitThreads") or 3)
|
|
|
|
url = query["url"]
|
|
query.setdefault("params", {})
|
|
params = query["params"]
|
|
|
|
try:
|
|
test_params = dict(params)
|
|
test_params["Limit"] = 1
|
|
test_params["EnableTotalRecordCount"] = True
|
|
|
|
items["TotalRecordCount"] = _get(url, test_params, server_id=server_id)[
|
|
"TotalRecordCount"
|
|
]
|
|
|
|
except Exception as error:
|
|
LOG.exception(
|
|
"Failed to retrieve the server response %s: %s params:%s",
|
|
url,
|
|
error,
|
|
params,
|
|
)
|
|
|
|
else:
|
|
params.setdefault("StartIndex", 0)
|
|
|
|
def get_query_params(params, start, count):
|
|
params_copy = dict(params)
|
|
params_copy["StartIndex"] = start
|
|
params_copy["Limit"] = count
|
|
return params_copy
|
|
|
|
query_params = [
|
|
get_query_params(params, offset, limit)
|
|
for offset in range(params["StartIndex"], items["TotalRecordCount"], limit)
|
|
]
|
|
|
|
# multiprocessing.dummy.Pool completes all requests in multiple threads but has to
|
|
# complete all tasks before allowing any results to be processed. ThreadPoolExecutor
|
|
# allows for completed tasks to be processed while other tasks are completed on other
|
|
# threads. Don't be a dummy.Pool, be a ThreadPoolExecutor
|
|
with concurrent.futures.ThreadPoolExecutor(dthreads) as p:
|
|
# dictionary for storing the jobs and their results
|
|
jobs = {}
|
|
|
|
# semaphore to avoid fetching complete library to memory
|
|
thread_buffer = threading.Semaphore(dthreads)
|
|
|
|
# wrapper function for _get that uses a semaphore
|
|
def get_wrapper(params):
|
|
thread_buffer.acquire()
|
|
return _get(url, params, server_id=server_id)
|
|
|
|
# create jobs
|
|
for param in query_params:
|
|
job = p.submit(get_wrapper, param)
|
|
# the query params are later needed again
|
|
jobs[job] = param
|
|
|
|
# process complete jobs
|
|
for job in concurrent.futures.as_completed(jobs):
|
|
# get the result
|
|
result = job.result() or {"Items": []}
|
|
query["params"] = jobs[job]
|
|
|
|
# free job memory
|
|
del jobs[job]
|
|
del job
|
|
|
|
# Mitigates #216 till the server validates the date provided is valid
|
|
if result["Items"][0].get("ProductionYear"):
|
|
try:
|
|
date(result["Items"][0]["ProductionYear"], 1, 1)
|
|
except ValueError:
|
|
LOG.info(
|
|
"#216 mitigation triggered. Setting ProductionYear to None"
|
|
)
|
|
result["Items"][0]["ProductionYear"] = None
|
|
|
|
items["Items"].extend(result["Items"])
|
|
# Using items to return data and communicate a restore point back to the callee is
|
|
# a violation of the SRP. TODO: Separate responsibilities.
|
|
items["RestorePoint"] = query
|
|
yield items
|
|
del items["Items"][:]
|
|
|
|
# release the semaphore again
|
|
thread_buffer.release()
|
|
|
|
|
|
class GetItemWorker(threading.Thread):
|
|
|
|
is_done = False
|
|
|
|
def __init__(self, server, queue, output):
|
|
|
|
self.server = server
|
|
self.queue = queue
|
|
self.output = output
|
|
threading.Thread.__init__(self)
|
|
|
|
def run(self):
|
|
with requests.Session() as s:
|
|
while True:
|
|
try:
|
|
item_ids = self.queue.get(timeout=1)
|
|
except queue.Empty:
|
|
|
|
self.is_done = True
|
|
LOG.info("--<[ q:download/%s ]", id(self))
|
|
|
|
return
|
|
|
|
request = {
|
|
"type": "GET",
|
|
"handler": "Users/{UserId}/Items",
|
|
"params": {
|
|
"Ids": ",".join(str(x) for x in item_ids),
|
|
"Fields": api.info(),
|
|
},
|
|
}
|
|
|
|
try:
|
|
result = self.server.http.request(request, s)
|
|
|
|
for item in result["Items"]:
|
|
|
|
if item["Type"] in self.output:
|
|
self.output[item["Type"]].put(item)
|
|
except HTTPException as error:
|
|
LOG.error("--[ http status: %s ]", error.status)
|
|
|
|
if error.status == "ServerUnreachable":
|
|
self.is_done = True
|
|
|
|
break
|
|
|
|
except Exception as error:
|
|
LOG.exception(error)
|
|
|
|
self.queue.task_done()
|
|
|
|
if window("jellyfin_should_stop.bool"):
|
|
break
|