[Core] Add prefetch metadata methods for magnets

This commit is contained in:
Calum Lind 2017-06-10 16:58:21 +01:00
parent 23171ad205
commit 633c56f54e
3 changed files with 164 additions and 9 deletions

View file

@ -403,6 +403,26 @@ class Core(component.Component):
else:
return d
@export
def prefetch_magnet_metadata(self, magnet, timeout=60):
"""Download the metadata for the magnet uri without adding torrent to deluge session.
Args:
magnet (str): The magnet uri.
timeout (int): Time to wait to recieve metadata from peers.
Returns:
Deferred: A tuple of (torrent_id (str), metadata (bytes)) for the magnet.
The metadata is base64 encoded.
"""
def on_metadata(result):
torrent_id, metadata = result
return torrent_id, b64encode(metadata)
return self.torrentmanager.prefetch_metadata(magnet, timeout).addCallback(on_metadata)
@export
def add_torrent_file(self, filename, filedump, options):
"""Adds a torrent file to the session.

View file

@ -17,7 +17,7 @@ import operator
import os
import time
from twisted.internet import defer, reactor, threads
from twisted.internet import defer, error, reactor, threads
from twisted.internet.defer import Deferred, DeferredList
from twisted.internet.task import LoopingCall
@ -114,6 +114,7 @@ class TorrentManager(component.Component):
This object is also responsible for saving the state of the session for use on restart.
"""
callLater = reactor.callLater
def __init__(self):
component.Component.__init__(
@ -140,6 +141,7 @@ class TorrentManager(component.Component):
self.is_saving_state = False
self.save_resume_data_file_lock = defer.DeferredLock()
self.torrents_loading = {}
self.prefetching_metadata = {}
# This is a map of torrent_ids to Deferreds used to track needed resume data.
# The Deferreds will be completed when resume data has been saved.
@ -301,6 +303,59 @@ class TorrentManager(component.Component):
else:
return torrent_info
def prefetch_metadata(self, magnet, timeout=60):
"""Download metadata for a magnet uri.
Args:
magnet (str): A magnet uri to download the metadata for.
timeout (int): How long
Returns:
Deferred: A tuple of (torrent_id (str), bencoded metadata (bytes))
"""
add_torrent_params = {}
# need a temp save_path
add_torrent_params['save_path'] = '/tmp'
add_torrent_params['url'] = magnet.strip().encode('utf8')
# do we need to make it not auto_managed to force start. what about queue?
add_torrent_params['flags'] = ((LT_DEFAULT_ADD_TORRENT_FLAGS |
lt.add_torrent_params_flags_t.flag_duplicate_is_error |
lt.add_torrent_params_flags_t.flag_upload_mode))
torrent_handle = self.session.add_torrent(add_torrent_params)
torrent_id = str(torrent_handle.info_hash())
def on_metadata(torrent_info, torrent_id, defer_timeout):
# Cancel reactor.callLater.
try:
defer_timeout.cancel()
except error.AlreadyCalled:
pass
log.debug('remove magnet from session')
try:
torrent_handle = self.prefetching_metadata.pop(torrent_id)[1]
except KeyError:
pass
else:
self.session.remove_torrent(torrent_handle, 1)
metadata = ''
if isinstance(torrent_info, lt.torrent_info):
log.debug('metadata received')
metadata = torrent_info.metadata()
return torrent_id, metadata
d = Deferred()
# Cancel the defer if timeout reached.
defer_timeout = self.callLater(timeout, d.cancel)
d.addBoth(on_metadata, torrent_id, defer_timeout)
self.prefetching_metadata[torrent_id] = (d, torrent_handle)
return d
def _build_torrent_options(self, options):
"""Load default options and update if needed."""
_options = TorrentOptions()
@ -343,6 +398,10 @@ class TorrentManager(component.Component):
raise AddTorrentError('Torrent already in session (%s).' % torrent_id)
elif torrent_id in self.torrents_loading:
raise AddTorrentError('Torrent already being added (%s).' % torrent_id)
elif torrent_id in self.prefetching_metadata:
# Cancel and remove metadata fetching torrent.
d = self.prefetching_metadata[torrent_id][0]
d.cancel()
# Check for renamed files and if so, rename them in the torrent_info before adding.
if options['mapped_files'] and torrent_info:
@ -1320,10 +1379,25 @@ class TorrentManager(component.Component):
def on_alert_metadata_received(self, alert):
"""Alert handler for libtorrent metadata_received_alert"""
try:
torrent = self.torrents[str(alert.handle.info_hash())]
except (RuntimeError, KeyError):
torrent_id = str(alert.handle.info_hash())
except RuntimeError:
return
torrent.on_metadata_received()
try:
torrent = self.torrents[torrent_id]
except KeyError:
pass
else:
return torrent.on_metadata_received()
# Try callback to prefetch_metadata method.
try:
d = self.prefetching_metadata[torrent_id][0]
except KeyError:
pass
else:
torrent_info = alert.handle.get_torrent_info()
return d.callback(torrent_info)
def on_alert_file_error(self, alert):
"""Alert handler for libtorrent file_error_alert"""

View file

@ -10,10 +10,12 @@ from __future__ import unicode_literals
import warnings
from base64 import b64encode
import mock
import pytest
from twisted.internet import defer
from twisted.internet import defer, task
from deluge import component
from deluge.bencode import bencode
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.error import InvalidTorrentError
@ -32,6 +34,9 @@ class TorrentmanagerTestCase(BaseTestCase):
self.rpcserver = RPCServer(listen=False)
self.core = Core()
self.core.config.config['lsd'] = False
self.clock = task.Clock()
self.tm = self.core.torrentmanager
self.tm.callLater = self.clock.callLater
return component.start()
def tear_down(self):
@ -46,9 +51,64 @@ class TorrentmanagerTestCase(BaseTestCase):
def test_remove_torrent(self):
filename = common.get_test_data_file('test.torrent')
with open(filename) as _file:
filedump = b64encode(_file.read())
torrent_id = yield self.core.add_torrent_file_async(filename, filedump, {})
self.assertTrue(self.core.torrentmanager.remove(torrent_id, False))
filedump = _file.read()
torrent_id = yield self.core.add_torrent_file_async(
filename, b64encode(filedump), {})
self.assertTrue(self.tm.remove(torrent_id, False))
def test_prefetch_metadata(self):
from deluge._libtorrent import lt
with open(common.get_test_data_file('test.torrent')) as _file:
t_info = lt.torrent_info(lt.bdecode(_file.read()))
mock_alert = mock.MagicMock()
mock_alert.handle.info_hash = mock.MagicMock(
return_value='ab570cdd5a17ea1b61e970bb72047de141bce173')
mock_alert.handle.get_torrent_info = mock.MagicMock(
return_value=t_info)
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet)
self.tm.on_alert_metadata_received(mock_alert)
expected = (
'ab570cdd5a17ea1b61e970bb72047de141bce173',
bencode({
'piece length': 32768,
'sha1': (
b'2\xce\xb6\xa8"\xd7\xf0\xd4\xbf\xdc^K\xba\x1bh'
b'\x9d\xc5\xb7\xac\xdd'
),
'name': 'azcvsupdater_2.6.2.jar',
'private': 0,
'pieces': (
b'\xdb\x04B\x05\xc3\'\xdab\xb8su97\xa9u'
b'\xca<w\\\x1ef\xd4\x9b\x16\xa9}\xc0\x9f:\xfd'
b'\x97qv\x83\xa2"\xef\x9d7\x0by!\rl\xe5v\xb7'
b'\x18{\xf7/"P\xe9\x8d\x01D\x9e8\xbd\x16\xe3'
b'\xfb-\x9d\xaa\xbcM\x11\xba\x92\xfc\x13F\xf0'
b'\x1c\x86x+\xc8\xd0S\xa9\x90`\xa1\xe4\x82\xe8'
b'\xfc\x08\xf7\xe3\xe5\xf6\x85\x1c%\xe7%\n\xed'
b'\xc0\x1f\xa1;\x9a\xea\xcf\x90\x0c/F>\xdf\xdagA'
b'\xc42|\xda\x82\xf5\xa6b\xa1\xb8#\x80wI\xd8f'
b'\xf8\xbd\xacW\xab\xc3s\xe0\xbbw\xf2K\xbe\xee'
b'\xa8rG\xe1W\xe8\xb7\xc2i\xf3\xd8\xaf\x9d\xdc'
b'\xd0#\xf4\xc1\x12u\xcd\x0bE?:\xe8\x9c\x1cu'
b'\xabb(oj\r^\xd5\xd5A\x83\x88\x9a\xa1J\x1c?'
b'\xa1\xd6\x8c\x83\x9e&'
),
'length': 307949,
'name.utf-8': b'azcvsupdater_2.6.2.jar',
'ed2k': b'>p\xefl\xfa]\x95K\x1b^\xc2\\;;e\xb7',
}),
)
self.assertEqual(expected, self.successResultOf(d))
def test_prefetch_metadata_timeout(self):
magnet = 'magnet:?xt=urn:btih:ab570cdd5a17ea1b61e970bb72047de141bce173'
d = self.tm.prefetch_metadata(magnet)
self.clock.advance(60)
expected = ('ab570cdd5a17ea1b61e970bb72047de141bce173', '')
return d.addCallback(self.assertEqual, expected)
@pytest.mark.todo
def test_remove_torrent_false(self):
@ -56,4 +116,5 @@ class TorrentmanagerTestCase(BaseTestCase):
common.todo_test(self)
def test_remove_invalid_torrent(self):
self.assertRaises(InvalidTorrentError, self.core.torrentmanager.remove, 'torrentidthatdoesntexist')
self.assertRaises(
InvalidTorrentError, self.tm.remove, 'torrentidthatdoesntexist')