jellyfin-kodi/resources/lib/downloader.py
angelblue05 11f771ade2 Remember sync position for manual triggers
Allow to resume sync on restart for manual user triggers (update, repair). Automatically refresh boxsets if movie library is selected.

Use waitForAbort and the emby_should_stop window property to terminate threads.
2018-09-22 02:52:14 -05:00

330 lines
9.2 KiB
Python

# -*- coding: utf-8 -*-
#################################################################################################
import json
import logging
import Queue
import threading
import os
import xbmc
import xbmcvfs
from libraries import requests
from helper.utils import should_stop, delete_folder
from helper import settings, stop, event, window, kodi_version, unzip
from emby import Emby
from emby.core import api
from emby.core.exceptions import HTTPException
#################################################################################################
LOG = logging.getLogger("EMBY."+__name__)
LIMIT = min(int(settings('limitIndex') or 50), 50)
#################################################################################################
def get_embyserver_url(handler):
    ''' Build the full "{server}/emby/..." url for a handler path.

        A leading slash is stripped; it is logged first so the warning
        shows the handler exactly as the caller passed it (previously the
        log ran after stripping and showed the already-fixed value).
    '''
    if handler.startswith('/'):
        LOG.warn("handler starts with /: %s", handler)
        handler = handler[1:]

    return "{server}/emby/%s" % handler
def browse_info():
    ''' Comma-separated list of extra item fields requested for browse listings. '''
    fields = (
        "DateCreated", "EpisodeCount", "SeasonCount", "Path", "Genres",
        "Studios", "Taglines", "MediaStreams", "Overview", "Etag",
        "ProductionLocations", "Width", "Height", "RecursiveItemCount",
        "ChildCount",
    )
    return ",".join(fields)
def _http(action, url, request=None, server_id=None):
    ''' Send an http request through the Emby client.

        request is created fresh per call: the previous mutable default
        ({}) was shared across calls and mutated below, leaking state
        (stale params/json) between unrelated requests.
    '''
    if request is None:
        request = {}

    request.update({'url': url, 'type': action})

    return Emby(server_id)['http/request'](request)
def _get(handler, params=None, server_id=None):
    ''' Send a GET request to the server for the given handler path. '''
    request = {'params': params}
    return _http("GET", get_embyserver_url(handler), request, server_id)
def _post(handler, json=None, params=None, server_id=None):
    ''' Send a POST request with an optional json body to the server. '''
    request = {'params': params, 'json': json}
    return _http("POST", get_embyserver_url(handler), request, server_id)
def _delete(handler, params=None, server_id=None):
    ''' Send a DELETE request to the server for the given handler path. '''
    request = {'params': params}
    return _http("DELETE", get_embyserver_url(handler), request, server_id)
def validate_view(library_id, item_id):
    ''' This confirms a single item from the library matches the view it belongs to.
        Used to detect grouped libraries.

        Returns False on any request failure (best-effort check), but logs
        the error instead of silently swallowing it.
    '''
    try:
        result = _get("Users/{UserId}/Items", {
            'ParentId': library_id,
            'Recursive': True,
            'Ids': item_id
        })
    except Exception as error:
        LOG.exception(error)

        return False

    return bool(result['Items'])
def get_single_item(parent_id, media):
    ''' Request at most one item of the given media type under parent_id. '''
    params = {
        'ParentId': parent_id,
        'Recursive': True,
        'Limit': 1,
        'IncludeItemTypes': media
    }
    return _get("Users/{UserId}/Items", params)
def get_filtered_section(parent_id, media=None, limit=None, recursive=None, sort=None, sort_order=None,
                         filters=None, server_id=None):
    ''' Get dynamic listings.

        filters is copied before use: the previous code called
        filters.remove('Boxsets') on the caller's list, mutating the
        caller's argument as a side effect.
    '''
    params = {
        'ParentId': parent_id,
        'IncludeItemTypes': media,
        'IsMissing': False,
        'Recursive': recursive if recursive is not None else True,
        'Limit': limit,
        'SortBy': sort or "SortName",
        'SortOrder': sort_order or "Ascending",
        'ImageTypeLimit': 1,
        'IsVirtualUnaired': False,
        'Fields': browse_info()
    }
    if filters:
        filters = list(filters)  # don't mutate the caller's list

        if 'Boxsets' in filters:
            filters.remove('Boxsets')
            # Boxsets is not a server-side filter; it toggles set grouping instead.
            params['CollapseBoxSetItems'] = settings('groupedSets.bool')

        params['Filters'] = ','.join(filters)

    if settings('getCast.bool'):
        params['Fields'] += ",People"

    if media and 'Photo' in media:
        params['Fields'] += ",Width,Height"

    return _get("Users/{UserId}/Items", params, server_id)
def get_movies_by_boxset(boxset_id):
    ''' Yield paginated movie results belonging to a boxset. '''
    for page in get_items(boxset_id, "Movie"):
        yield page
def get_episode_by_show(show_id):
    ''' Yield paginated episode results belonging to a show. '''
    for page in get_items(show_id, "Episode"):
        yield page
def get_items(parent_id, item_type=None, basic=False, params=None):
    ''' Yield paginated pages of items under parent_id.

        basic selects a reduced field set; params overrides/extends the
        default query parameters.
    '''
    fields = api.basic_info() if basic else api.info()
    query = {
        'url': "Users/{UserId}/Items",
        'params': {
            'ParentId': parent_id,
            'IncludeItemTypes': item_type,
            'SortBy': "SortName",
            'SortOrder': "Ascending",
            'Fields': fields
        }
    }

    if params:
        query['params'].update(params)

    for page in _get_items(query):
        yield page
def get_artists(parent_id=None, basic=False, params=None, server_id=None):
    ''' Yield paginated pages of artists, optionally scoped to a parent library. '''
    fields = api.basic_info() if basic else api.music_info()
    query = {
        'url': "Artists",
        'params': {
            'UserId': "{UserId}",
            'ParentId': parent_id,
            'SortBy': "SortName",
            'SortOrder': "Ascending",
            'Fields': fields
        }
    }

    if params:
        query['params'].update(params)

    for page in _get_items(query, server_id):
        yield page
def get_albums_by_artist(artist_id, basic=False):
    ''' Yield paginated pages of music albums for the given artist. '''
    extra = {
        'SortBy': "DateCreated",
        'ArtistIds': artist_id
    }
    for page in get_items(None, "MusicAlbum", basic, extra):
        yield page
@stop()
def _get_items(query, server_id=None):
    ''' Generator: fetch items page by page (LIMIT per request), yielding a
        shared dict after each page.

        query = {
        'url': string,
        'params': dict -- opt, include StartIndex to resume
    }
    '''
    # Shared result dict -- 'Items' is emptied after every yield, so
    # consumers must process a page before advancing the generator.
    items = {
        'Items': [],
        'TotalRecordCount': 0,
        'RestorePoint': {}
    }
    url = query['url']
    params = query.get('params', {})
    params.update({
        'CollapseBoxSetItems': False,
        'IsVirtualUnaired': False,
        'EnableTotalRecordCount': False,
        'LocationTypes': "FileSystem,Remote,Offline",
        'IsMissing': False,
        'Recursive': True
    })

    try:
        # Probe request (Limit=1) just to learn the total record count;
        # the paged requests below skip the count for speed.
        test_params = dict(params)
        test_params['Limit'] = 1
        test_params['EnableTotalRecordCount'] = True

        items['TotalRecordCount'] = _get(url, test_params, server_id=server_id)['TotalRecordCount']

    except Exception as error:
        # On failure the generator yields nothing (total stays 0).
        LOG.error("Failed to retrieve the server response %s: %s params:%s", url, error, params)

    else:
        # Honour a caller-supplied StartIndex so an interrupted sync can resume.
        index = params.get('StartIndex', 0)
        total = items['TotalRecordCount']

        while index < total:
            params['StartIndex'] = index
            params['Limit'] = LIMIT
            result = _get(url, params, server_id=server_id)

            items['Items'].extend(result['Items'])
            # RestorePoint shares the live params dict, so its StartIndex
            # always reflects current progress -- callers persist it to resume.
            items['RestorePoint'] = query

            yield items

            # Clear in place (same list object the yielded dict references).
            del items['Items'][:]
            index += LIMIT
class GetItemWorker(threading.Thread):
    ''' Worker thread: pull item ids off a queue, fetch the full item from
        the server, and route the result into the output queue matching the
        item's Type.

        Fix: task_done() now runs in a finally block. Previously the
        HTTPException branch executed `continue` for non-500 statuses,
        skipping task_done() for that item -- any queue.join() on this
        queue would then hang forever. (The old `# to retry` comment was
        misleading: the item was never re-queued, so nothing was retried.)
    '''
    # Set once the queue has drained; read by the coordinator.
    is_done = False

    def __init__(self, server, queue, output):
        self.server = server    # Emby client wrapper (supports ['http/request'])
        self.queue = queue      # input queue of item ids
        self.output = output    # dict: item Type -> destination queue
        threading.Thread.__init__(self)

    def run(self):
        # One requests session per worker, reused for every download.
        with requests.Session() as s:
            while True:

                try:
                    item_id = self.queue.get(timeout=1)
                except Queue.Empty:
                    # No work for a second: assume the queue has drained.
                    self.is_done = True
                    LOG.info("--<[ q:download/%s ]", id(self))

                    return

                try:
                    request = {'type': "GET", 'handler': "Users/{UserId}/Items/%s" % item_id}
                    result = self.server['http/request'](request, s)

                    if result['Type'] in self.output:
                        self.output[result['Type']].put(result)
                except HTTPException as error:
                    LOG.error("--[ http status: %s ]", error.status)
                except Exception as error:
                    LOG.exception(error)
                finally:
                    # Must run for every get() or queue.join() deadlocks.
                    self.queue.task_done()

                if window('emby_should_stop.bool'):
                    break
class TheVoid(object):
    ''' Fire an event at the Emby service and poll a Kodi window property
        for the reply, identified by this object's id().
    '''

    def __init__(self, method, data):
        ''' This will block until response is received.
            This is meant to go as fast as possible, a response will always be returned.

            data must be a dict -- isinstance is used (instead of the old
            `type(data) != dict`) so dict subclasses are accepted too.
        '''
        if not isinstance(data, dict):
            raise Exception("unexpected data format")

        data['VoidName'] = id(self)
        LOG.info("---[ contact mothership/%s ]", method)
        LOG.debug(data)

        event(method, data)
        self.method = method
        self.data = data
        self.monitor = xbmc.Monitor()

    def get(self):
        ''' Poll for the response beacon every 0.2s.

            Returns the response payload, or None if Kodi aborts or the
            emby_should_stop flag is raised first.
        '''
        while True:
            response = window('emby_%s.json' % self.data['VoidName'])

            if response != "":
                LOG.debug("--<[ beacon/emby_%s.json ]", self.data['VoidName'])
                # NOTE(review): reads 'emby_<id>.json' but clears 'emby_<id>' --
                # presumably the window() helper maps both to the same
                # property; confirm against the helper implementation.
                window('emby_%s' % self.data['VoidName'], clear=True)

                return response

            if window('emby_should_stop.bool') or self.monitor.waitForAbort(0.2):
                LOG.info("Abandon mission! A black hole just swallowed [ %s ]", self.data['VoidName'])

                break
def get_objects(src, filename):
    ''' Download objects dependency to temp cache folder.

        Returns True ("restart") when the objects folder did not already
        exist before this call.

        Fixes: removed the no-op `except Exception: raise` wrapper, and the
        destination file handle is now closed even if the write fails.
    '''
    temp = xbmc.translatePath('special://temp/emby').decode('utf-8')
    restart = not xbmcvfs.exists(os.path.join(temp, "objects") + '/')
    path = os.path.join(temp, filename).encode('utf-8')

    if not xbmcvfs.exists(path):
        delete_folder()
        LOG.info("From %s to %s", src, path)

        # NOTE(review): verify=False disables TLS certificate validation
        # for this download -- confirm that is intended.
        response = requests.get(src, stream=True, verify=False)
        response.raise_for_status()  # raises on 4xx/5xx instead of writing an error page

        dl = xbmcvfs.File(path, 'w')
        try:
            dl.write(response.content)
        finally:
            dl.close()

        del response

    unzip(path, temp, "objects")

    return restart