From 633c56f54ecb5733aa29670273c6d5f88db404ad Mon Sep 17 00:00:00 2001 From: Calum Lind Date: Sat, 10 Jun 2017 16:58:21 +0100 Subject: [PATCH] [Core] Add prefetch metadata methods for magnets --- deluge/core/core.py | 20 +++++++ deluge/core/torrentmanager.py | 82 +++++++++++++++++++++++++++-- deluge/tests/test_torrentmanager.py | 71 +++++++++++++++++++++++-- 3 files changed, 164 insertions(+), 9 deletions(-) diff --git a/deluge/core/core.py b/deluge/core/core.py index 00266fac2..387e32488 100644 --- a/deluge/core/core.py +++ b/deluge/core/core.py @@ -403,6 +403,26 @@ class Core(component.Component): else: return d + @export + def prefetch_magnet_metadata(self, magnet, timeout=60): + """Download the metadata for the magnet uri without adding torrent to deluge session. + + Args: + magnet (str): The magnet uri. + timeout (int): Time to wait to recieve metadata from peers. + + Returns: + Deferred: A tuple of (torrent_id (str), metadata (bytes)) for the magnet. + + The metadata is base64 encoded. + + """ + def on_metadata(result): + torrent_id, metadata = result + return torrent_id, b64encode(metadata) + + return self.torrentmanager.prefetch_metadata(magnet, timeout).addCallback(on_metadata) + @export def add_torrent_file(self, filename, filedump, options): """Adds a torrent file to the session. diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index 2004dd5c8..86ad76466 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -17,7 +17,7 @@ import operator import os import time -from twisted.internet import defer, reactor, threads +from twisted.internet import defer, error, reactor, threads from twisted.internet.defer import Deferred, DeferredList from twisted.internet.task import LoopingCall @@ -114,6 +114,7 @@ class TorrentManager(component.Component): This object is also responsible for saving the state of the session for use on restart. """ + callLater = reactor.callLater def __init__(self): component.Component.__init__( @@ -140,6 +141,7 @@ class TorrentManager(component.Component): self.is_saving_state = False self.save_resume_data_file_lock = defer.DeferredLock() self.torrents_loading = {} + self.prefetching_metadata = {} # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. @@ -301,6 +303,59 @@ class TorrentManager(component.Component): else: return torrent_info + def prefetch_metadata(self, magnet, timeout=60): + """Download metadata for a magnet uri. + + Args: + magnet (str): A magnet uri to download the metadata for. + timeout (int): How long + + Returns: + Deferred: A tuple of (torrent_id (str), bencoded metadata (bytes)) + + """ + + add_torrent_params = {} + # need a temp save_path + add_torrent_params['save_path'] = '/tmp' + add_torrent_params['url'] = magnet.strip().encode('utf8') + # do we need to make it not auto_managed to force start. what about queue? + add_torrent_params['flags'] = ((LT_DEFAULT_ADD_TORRENT_FLAGS | + lt.add_torrent_params_flags_t.flag_duplicate_is_error | + lt.add_torrent_params_flags_t.flag_upload_mode)) + + torrent_handle = self.session.add_torrent(add_torrent_params) + torrent_id = str(torrent_handle.info_hash()) + + def on_metadata(torrent_info, torrent_id, defer_timeout): + # Cancel reactor.callLater. + try: + defer_timeout.cancel() + except error.AlreadyCalled: + pass + + log.debug('remove magnet from session') + try: + torrent_handle = self.prefetching_metadata.pop(torrent_id)[1] + except KeyError: + pass + else: + self.session.remove_torrent(torrent_handle, 1) + + metadata = '' + if isinstance(torrent_info, lt.torrent_info): + log.debug('metadata received') + metadata = torrent_info.metadata() + + return torrent_id, metadata + + d = Deferred() + # Cancel the defer if timeout reached. + defer_timeout = self.callLater(timeout, d.cancel) + d.addBoth(on_metadata, torrent_id, defer_timeout) + self.prefetching_metadata[torrent_id] = (d, torrent_handle) + return d + def _build_torrent_options(self, options): """Load default options and update if needed.""" _options = TorrentOptions() @@ -343,6 +398,10 @@ class TorrentManager(component.Component): raise AddTorrentError('Torrent already in session (%s).' % torrent_id) elif torrent_id in self.torrents_loading: raise AddTorrentError('Torrent already being added (%s).' % torrent_id) + elif torrent_id in self.prefetching_metadata: + # Cancel and remove metadata fetching torrent. + d = self.prefetching_metadata[torrent_id][0] + d.cancel() # Check for renamed files and if so, rename them in the torrent_info before adding. if options['mapped_files'] and torrent_info: @@ -1320,10 +1379,25 @@ class TorrentManager(component.Component): def on_alert_metadata_received(self, alert): """Alert handler for libtorrent metadata_received_alert""" try: - torrent = self.torrents[str(alert.handle.info_hash())] - except (RuntimeError, KeyError): + torrent_id = str(alert.handle.info_hash()) + except RuntimeError: return - torrent.on_metadata_received() + + try: + torrent = self.torrents[torrent_id] + except KeyError: + pass + else: + return torrent.on_metadata_received() + + # Try callback to prefetch_metadata method. + try: + d = self.prefetching_metadata[torrent_id][0] + except KeyError: + pass + else: + torrent_info = alert.handle.get_torrent_info() + return d.callback(torrent_info) def on_alert_file_error(self, alert): """Alert handler for libtorrent file_error_alert""" diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py index 88e0fdf49..d5dc8fd32 100644 --- a/deluge/tests/test_torrentmanager.py +++ b/deluge/tests/test_torrentmanager.py @@ -10,10 +10,12 @@ from __future__ import unicode_literals import warnings from base64 import b64encode +import mock import pytest -from twisted.internet import defer +from twisted.internet import defer, task from deluge import component +from deluge.bencode import bencode from deluge.core.core import Core from deluge.core.rpcserver import RPCServer from deluge.error import InvalidTorrentError @@ -32,6 +34,9 @@ class TorrentmanagerTestCase(BaseTestCase): self.rpcserver = RPCServer(listen=False) self.core = Core() self.core.config.config['lsd'] = False + self.clock = task.Clock() + self.tm = self.core.torrentmanager + self.tm.callLater = self.clock.callLater return component.start() def tear_down(self): @@ -46,9 +51,64 @@ class TorrentmanagerTestCase(BaseTestCase): def test_remove_torrent(self): filename = common.get_test_data_file('test.torrent') with open(filename) as _file: - filedump = b64encode(_file.read()) - torrent_id = yield self.core.add_torrent_file_async(filename, filedump, {}) - self.assertTrue(self.core.torrentmanager.remove(torrent_id, False)) + filedump = _file.read() + torrent_id = yield self.core.add_torrent_file_async( + filename, b64encode(filedump), {}) + self.assertTrue(self.tm.remove(torrent_id, False)) + + def test_prefetch_metadata(self): + from deluge._libtorrent import lt + with open(common.get_test_data_file('test.torrent')) as _file: + t_info = lt.torrent_info(lt.bdecode(_file.read())) + mock_alert = mock.MagicMock() + mock_alert.handle.info_hash = mock.MagicMock( + return_value='ab570cdd5a17ea1b61e970bb72047de141bce173') + mock_alert.handle.get_torrent_info = mock.MagicMock( + return_value=t_info) + + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet) + self.tm.on_alert_metadata_received(mock_alert) + + expected = ( + 'ab570cdd5a17ea1b61e970bb72047de141bce173', + bencode({ + 'piece length': 32768, + 'sha1': ( + b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh' + b'\x9d\xc5\xb7\xac\xdd' + ), + 'name': 'azcvsupdater_2.6.2.jar', + 'private': 0, + 'pieces': ( + b'\xdb\x04B\x05\xc3\'\xdab\xb8su97\xa9u' + b'\xca\xdf\xdagA' + b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f' + b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee' + b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc' + b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu' + b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?' + b'\xa1\xd6\x8c\x83\x9e&' + ), + 'length': 307949, + 'name.utf-8': b'azcvsupdater_2.6.2.jar', + 'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7', + }), + ) + self.assertEqual(expected, self.successResultOf(d)) + + def test_prefetch_metadata_timeout(self): + magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173' + d = self.tm.prefetch_metadata(magnet) + self.clock.advance(60) + expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', '') + return d.addCallback(self.assertEqual, expected) @pytest.mark.todo def test_remove_torrent_false(self): @@ -56,4 +116,5 @@ class TorrentmanagerTestCase(BaseTestCase): common.todo_test(self) def test_remove_invalid_torrent(self): - self.assertRaises(InvalidTorrentError, self.core.torrentmanager.remove, 'torrentidthatdoesntexist') + self.assertRaises( + InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist')