From 302880f67ac339898d3f3c783bf6ca49f7939762 Mon Sep 17 00:00:00 2001 From: Chuddah Date: Wed, 19 Feb 2020 22:18:54 +0000 Subject: [PATCH 1/2] Parallelize multiple http GET requests Added ThreadPoolExecutor and used it to process GET requests in multiple threads, which enables chunks of data to always be available for processing. Processing of the data can happen as soon as the first chunk arrives. Refactored the code to help implement this. The idea is that the "params" are built in batch and passed to the thread pool, which gets the actual results. --- addon.xml | 1 + jellyfin_kodi/downloader.py | 36 +++++++++++++++++++++++++++--------- 2 files changed, 28 insertions(+), 9 deletions(-) diff --git a/addon.xml b/addon.xml index 557bd9bb..042a7146 100644 --- a/addon.xml +++ b/addon.xml @@ -10,6 +10,7 @@ + diff --git a/jellyfin_kodi/downloader.py b/jellyfin_kodi/downloader.py index 2a94c698..3b734deb 100644 --- a/jellyfin_kodi/downloader.py +++ b/jellyfin_kodi/downloader.py @@ -19,6 +19,7 @@ from jellyfin.exceptions import HTTPException LOG = logging.getLogger("JELLYFIN." 
+ __name__) LIMIT = min(int(settings('limitIndex') or 50), 50) +DTHREADS = int(settings('limitThreads') or 3) ################################################################################################# @@ -243,7 +244,8 @@ def _get_items(query, server_id=None): } url = query['url'] - params = query.get('params', {}) + query.setdefault('params', {}) + params = query['params'] try: test_params = dict(params) @@ -256,21 +258,37 @@ def _get_items(query, server_id=None): LOG.exception("Failed to retrieve the server response %s: %s params:%s", url, error, params) else: - index = params.get('StartIndex', 0) - total = items['TotalRecordCount'] + params.setdefault('StartIndex', 0) - while index < total: + def get_query_params(params, start, count): + params_copy = dict(params) + params_copy['StartIndex'] = start + params_copy['Limit'] = count + return params_copy - params['StartIndex'] = index - params['Limit'] = LIMIT - result = _get(url, params, server_id=server_id) or {'Items': []} + query_params = [get_query_params(params, offset, LIMIT) \ + for offset in xrange(params['StartIndex'], items['TotalRecordCount'], LIMIT)] + from itertools import izip + # multiprocessing.dummy.Pool completes all requests in multiple threads but has to + # complete all tasks before allowing any results to be processed. ThreadPoolExecutor + # allows for completed tasks to be processed while other tasks are completed on other + # threads. Don't be a dummy.Pool, be a ThreadPoolExecutor + import concurrent.futures + p = concurrent.futures.ThreadPoolExecutor(DTHREADS) + + results = p.map(lambda params: _get(url, params, server_id=server_id), query_params) + + for params, result in izip(query_params, results): + query['params'] = params + + result = result or {'Items': []} items['Items'].extend(result['Items']) + # Using items to return data and communicate a restore point back to the caller is + # a violation of the SRP. TODO: Separate responsibilities. 
items['RestorePoint'] = query yield items - del items['Items'][:] - index += LIMIT class GetItemWorker(threading.Thread): From bf465483657708891ab9164ad638a80c2c5632d9 Mon Sep 17 00:00:00 2001 From: Chuddah Date: Wed, 19 Feb 2020 23:23:21 +0000 Subject: [PATCH 2/2] Removed previous fixup. The recursive call to get_workers was implemented as it looked like the original intent of the DTHREADS variable. Otherwise this variable was redundant. The ThreadPool is a much better use of this setting. --- jellyfin_kodi/library.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/jellyfin_kodi/library.py b/jellyfin_kodi/library.py index 5ca1bdd9..8d4a3f7b 100644 --- a/jellyfin_kodi/library.py +++ b/jellyfin_kodi/library.py @@ -138,8 +138,7 @@ class Library(threading.Thread): if not self.player.isPlayingVideo() or settings('syncDuringPlay.bool') or xbmc.getCondVisibility('VideoPlayer.Content(livetv)'): - while self.worker_downloads(): - pass + self.worker_downloads() self.worker_sort() self.worker_updates() @@ -225,7 +224,6 @@ class Library(threading.Thread): ''' Get items from jellyfin and place them in the appropriate queues. ''' - added_threads = False for queue in ((self.updated_queue, self.updated_output), (self.userdata_queue, self.userdata_output)): if queue[0].qsize() and len(self.download_threads) < DTHREADS: @@ -233,8 +231,6 @@ class Library(threading.Thread): new_thread.start() LOG.info("-->[ q:download/%s ]", id(new_thread)) self.download_threads.append(new_thread) - added_threads = True - return added_threads def worker_sort(self):