diff --git a/deluge/core/alertmanager.py b/deluge/core/alertmanager.py index ee745f702..54a86e9c0 100644 --- a/deluge/core/alertmanager.py +++ b/deluge/core/alertmanager.py @@ -50,11 +50,10 @@ class AlertManager(component.Component): # handlers is a dictionary of lists {"alert_type": [handler1,h2,..]} self.handlers = {} self.delayed_calls = [] - self.wait_on_handler = False def update(self): self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()] - self.handle_alerts(wait=self.wait_on_handler) + self.handle_alerts() def stop(self): for delayed_call in self.delayed_calls: @@ -92,12 +91,9 @@ class AlertManager(component.Component): # Handler is in this alert type list value.remove(handler) - def handle_alerts(self, wait=False): - """Pops all libtorrent alerts in the session queue and handles them appropriately. - - Args: - wait (bool): If True the handler functions will be run straight away and - waited to return before processing the next alert. + def handle_alerts(self): + """ + Pops all libtorrent alerts in the session queue and handles them appropriately. """ alerts = self.session.pop_alerts() if not alerts: @@ -118,10 +114,7 @@ class AlertManager(component.Component): # Call any handlers for this alert type if alert_type in self.handlers: for handler in self.handlers[alert_type]: - if not wait: - self.delayed_calls.append(reactor.callLater(0, handler, alert)) - else: - handler(alert) + self.delayed_calls.append(reactor.callLater(0, handler, alert)) def set_alert_queue_size(self, queue_size): """Sets the maximum size of the libtorrent alert queue""" diff --git a/deluge/core/core.py b/deluge/core/core.py index acaff8e3f..01afab112 100644 --- a/deluge/core/core.py +++ b/deluge/core/core.py @@ -16,7 +16,7 @@ import shutil import tempfile import threading -from twisted.internet import reactor, task +from twisted.internet import defer, reactor, task from twisted.web.client import getPage import deluge.common @@ -33,7 +33,7 @@ from deluge.core.pluginmanager import PluginManager from deluge.core.preferencesmanager import PreferencesManager from deluge.core.rpcserver import export from deluge.core.torrentmanager import TorrentManager -from deluge.error import DelugeError, InvalidPathError, InvalidTorrentError +from deluge.error import AddTorrentError, DelugeError, InvalidPathError, InvalidTorrentError from deluge.event import NewVersionAvailableEvent, SessionPausedEvent, SessionResumedEvent, TorrentQueueChangedEvent from deluge.httpdownloader import download_file @@ -211,13 +211,14 @@ class Core(component.Component): log.error("There was an error decoding the filedump string: %s", ex) try: - torrent_id = self.torrentmanager.add( + d = self.torrentmanager.add( filedump=filedump, options=options, filename=filename, save_state=save_state ) except RuntimeError as ex: log.error("There was an error adding the torrent file %s: %s", filename, ex) - torrent_id = None - return torrent_id + raise + else: + return d # Exported Methods @export @@ -246,14 +247,18 @@ class Core(component.Component): Deferred """ + @defer.inlineCallbacks def add_torrents(): - torrent_ids = [] - count = len(torrent_files) + errors = [] + last_index = len(torrent_files) - 1 for idx, torrent in enumerate(torrent_files): - torrent_id = self._add_torrent_file(torrent[0], torrent[1], - torrent[2], save_state=idx == (count - 1)) - torrent_ids.append(torrent_id) - return torrent_ids + try: + yield self._add_torrent_file(torrent[0], torrent[1], + torrent[2], save_state=idx == last_index) + except AddTorrentError as ex: + log.warn("Error when adding torrent: '%s'", ex) + errors.append(ex) + defer.returnValue(errors) return task.deferLater(reactor, 0, add_torrents) @export diff --git a/deluge/core/torrentmanager.py b/deluge/core/torrentmanager.py index 77026c670..e4730347a 100644 --- a/deluge/core/torrentmanager.py +++ b/deluge/core/torrentmanager.py @@ -10,6 +10,7 @@ """TorrentManager handles Torrent objects""" import cPickle +import datetime import logging import operator import os @@ -26,7 +27,7 @@ from deluge.common import decode_string, get_magnet_info, utf8_encoded from deluge.configmanager import ConfigManager, get_config_dir from deluge.core.authmanager import AUTH_LEVEL_ADMIN from deluge.core.torrent import Torrent, TorrentOptions, sanitize_filepath -from deluge.error import InvalidTorrentError +from deluge.error import AddTorrentError, InvalidTorrentError from deluge.event import (PreTorrentRemovedEvent, SessionStartedEvent, TorrentAddedEvent, TorrentFileCompletedEvent, TorrentFileRenamedEvent, TorrentFinishedEvent, TorrentRemovedEvent, TorrentResumedEvent) @@ -105,6 +106,7 @@ class TorrentManager(component.Component): self.torrents = {} self.queued_torrents = set() self.is_saving_state = False + self.torrents_loading = {} # This is a map of torrent_ids to Deferreds used to track needed resume data. # The Deferreds will be completed when resume data has been saved. @@ -152,6 +154,7 @@ class TorrentManager(component.Component): self.alerts.register_handler("external_ip_alert", self.on_alert_external_ip) self.alerts.register_handler("performance_alert", self.on_alert_performance) self.alerts.register_handler("fastresume_rejected_alert", self.on_alert_fastresume_rejected) + self.alerts.register_handler("add_torrent_alert", self.on_add_torrent_alert) # Define timers self.save_state_timer = LoopingCall(self.save_state) @@ -163,7 +166,6 @@ class TorrentManager(component.Component): if os.path.isfile(self.temp_file): def archive_file(filename): """Archives the file in 'archive' sub-directory with timestamp appended""" - import datetime filepath = os.path.join(self.state_dir, filename) filepath_bak = filepath + ".bak" archive_dir = os.path.join(get_config_dir(), "archive") @@ -302,16 +304,14 @@ class TorrentManager(component.Component): TorrentAddedEvent: Torrent with torrent_id added to session. """ - if torrent_info is None and filedump is None and magnet is None: - log.error("You must specify a valid torrent_info, torrent state or magnet.") - return + if not torrent_info and not filedump and not magnet: + raise AddTorrentError("You must specify a valid torrent_info, torrent state or magnet.") if filedump: try: torrent_info = lt.torrent_info(lt.bdecode(filedump)) except RuntimeError as ex: - log.error("Unable to add torrent, decoding filedump failed: %s", ex) - return + raise AddTorrentError("Unable to add torrent, decoding filedump failed: %s" % ex) add_torrent_params = {} if torrent_info: @@ -329,15 +329,15 @@ class TorrentManager(component.Component): add_torrent_params["name"] = magnet_info["name"] torrent_id = magnet_info["info_hash"] else: - log.error("Unable to add magnet, invalid magnet info: %s", magnet) - return + raise AddTorrentError("Unable to add magnet, invalid magnet info: %s" % magnet) # Check for existing torrent in session. if torrent_id in self.get_torrent_list(): - log.warning("Unable to add torrent (%s), already in session", torrent_id) # Attempt merge trackers before returning. self.torrents[torrent_id].merge_trackers(torrent_info) - return + raise AddTorrentError("Torrent already in session (%s)." % torrent_id) + elif torrent_id in self.torrents_loading: + raise AddTorrentError("Torrent already being added (%s)." % torrent_id) # Load default options and update if needed. _options = TorrentOptions() @@ -385,24 +385,31 @@ class TorrentManager(component.Component): if options["seed_mode"]: add_torrent_params["flags"] |= lt.add_torrent_params_flags_t.flag_seed_mode - # We need to pause the AlertManager momentarily to prevent alerts - # for this torrent being generated before a Torrent object is created. - component.pause("AlertManager") - + d = Deferred() try: - handle = self.session.add_torrent(add_torrent_params) - if not handle.is_valid(): - raise InvalidTorrentError("Torrent handle is invalid!") - except (RuntimeError, InvalidTorrentError) as ex: - log.error("Unable to add torrent to session: %s", ex) - component.resume("AlertManager") + self.torrents_loading[torrent_id] = (d, options, state, filename, magnet, resume_data, filedump, save_state) + self.session.async_add_torrent(add_torrent_params) + except RuntimeError as ex: + raise AddTorrentError("Unable to add torrent to session: %s" % ex) + return d + + def on_add_torrent_alert(self, alert): + """Alert handler for libtorrent add_torrent_alert""" + if not alert.handle.is_valid(): + log.warn("Torrent handle is invalid!") return - # Create a Torrent object and add to the dictionary. - torrent = Torrent(handle, options, state, filename, magnet) - self.torrents[torrent.torrent_id] = torrent + try: + torrent_id = str(alert.handle.info_hash()) + except RuntimeError as ex: + log.warn("Failed to get torrent id from handle: %s", ex) + return - component.resume("AlertManager") + d, options, state, filename, magnet, resume_data, filedump, save_state = self.torrents_loading.pop(torrent_id) + + # Create a Torrent object and add to the dictionary. + torrent = Torrent(alert.handle, options, state, filename, magnet) + self.torrents[torrent.torrent_id] = torrent # Store the orignal resume_data, in case of errors. if resume_data: @@ -422,7 +429,7 @@ class TorrentManager(component.Component): component.get("EventManager").emit(TorrentAddedEvent(torrent.torrent_id, from_state)) if log.isEnabledFor(logging.DEBUG): - log.debug("Torrent added: %s", str(handle.info_hash())) + log.debug("Torrent added: %s", str(alert.handle.info_hash())) if log.isEnabledFor(logging.INFO): name_and_owner = torrent.get_status(["name", "owner"]) log.info("Torrent %s from user \"%s\" %s", @@ -438,7 +445,7 @@ class TorrentManager(component.Component): if save_state: self.save_state() - return torrent.torrent_id + d.callback(torrent.torrent_id) def remove(self, torrent_id, remove_data=False, save_state=True): """Remove a torrent from the session. @@ -556,6 +563,7 @@ class TorrentManager(component.Component): SessionStartedEvent: Emitted after all torrents are added to the session. """ + start = datetime.datetime.now() state = self.open_state() state = self.fixup_state(state) @@ -563,10 +571,7 @@ class TorrentManager(component.Component): state.torrents.sort(key=operator.attrgetter("queue"), reverse=self.config["queue_new_to_top"]) resume_data = self.load_resume_data_file() - # Tell alertmanager to wait for the handlers while adding torrents. - # This speeds up startup loading the torrents by quite a lot for some reason (~40%) - self.alerts.wait_on_handler = True - + deferreds = [] for t_state in state.torrents: # Populate the options dict from state options = TorrentOptions() @@ -587,12 +592,16 @@ class TorrentManager(component.Component): if torrent_info: magnet = None - self.add(torrent_info=torrent_info, state=t_state, options=options, save_state=False, - magnet=magnet, resume_data=resume_data.get(t_state.torrent_id)) + d = self.add(torrent_info=torrent_info, state=t_state, options=options, save_state=False, + magnet=magnet, resume_data=resume_data.get(t_state.torrent_id)) + deferreds.append(d) - self.alerts.wait_on_handler = False - log.info("Finished loading %d torrents.", len(state.torrents)) - component.get("EventManager").emit(SessionStartedEvent()) + deferred_list = DeferredList(deferreds, consumeErrors=False) + + def on_complete(result): + log.info("Finished loading %d torrents in %s", len(state.torrents), str(datetime.datetime.now() - start)) + component.get("EventManager").emit(SessionStartedEvent()) + deferred_list.addCallback(on_complete) def create_state(self): """Create a state of all the torrents in TorrentManager. diff --git a/deluge/error.py b/deluge/error.py index ed758fecb..4acd09e4d 100644 --- a/deluge/error.py +++ b/deluge/error.py @@ -32,6 +32,10 @@ class InvalidTorrentError(DelugeError): pass +class AddTorrentError(DelugeError): + pass + + class InvalidPathError(DelugeError): pass diff --git a/deluge/tests/test_core.py b/deluge/tests/test_core.py index 76b9dd72c..444d1a332 100644 --- a/deluge/tests/test_core.py +++ b/deluge/tests/test_core.py @@ -3,7 +3,7 @@ import os from hashlib import sha1 as sha import pytest -from twisted.internet import reactor +from twisted.internet import defer, reactor from twisted.internet.error import CannotListenError from twisted.python.failure import Failure from twisted.web.http import FORBIDDEN @@ -16,7 +16,7 @@ import deluge.component as component import deluge.core.torrent from deluge.core.core import Core from deluge.core.rpcserver import RPCServer -from deluge.error import InvalidTorrentError +from deluge.error import AddTorrentError, InvalidTorrentError from deluge.ui.web.common import compress from . import common @@ -102,25 +102,55 @@ class CoreTestCase(BaseTestCase): return component.shutdown().addCallback(on_shutdown) + @defer.inlineCallbacks + def test_add_torrent_files(self): + options = {} + filenames = ["test.torrent", "test_torrent.file.torrent"] + files_to_add = [] + for f in filenames: + filename = os.path.join(os.path.dirname(__file__), f) + filedump = base64.encodestring(open(filename).read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + self.assertEquals(len(errors), 0) + + @defer.inlineCallbacks + def test_add_torrent_files_error_duplicate(self): + options = {} + filenames = ["test.torrent", "test.torrent"] + files_to_add = [] + for f in filenames: + filename = os.path.join(os.path.dirname(__file__), f) + filedump = base64.encodestring(open(filename).read()) + files_to_add.append((filename, filedump, options)) + errors = yield self.core.add_torrent_files(files_to_add) + self.assertEquals(len(errors), 1) + self.assertTrue(str(errors[0]).startswith("Torrent already in session")) + + @defer.inlineCallbacks def test_add_torrent_file(self): options = {} filename = os.path.join(os.path.dirname(__file__), "test.torrent") - torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) # Get the info hash from the test.torrent from deluge.bencode import bdecode, bencode info_hash = sha(bencode(bdecode(open(filename).read())["info"])).hexdigest() - self.assertEquals(torrent_id, info_hash) + def test_add_torrent_file_invalid_filedump(self): + options = {} + filename = os.path.join(os.path.dirname(__file__), "test.torrent") + self.assertRaises(AddTorrentError, self.core.add_torrent_file, filename, False, options) + + @defer.inlineCallbacks def test_add_torrent_url(self): url = "http://localhost:%d/ubuntu-9.04-desktop-i386.iso.torrent" % self.listen_port options = {} info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" - d = self.core.add_torrent_url(url, options) - d.addCallback(self.assertEquals, info_hash) - return d + torrent_id = yield self.core.add_torrent_url(url, options) + self.assertEquals(torrent_id, info_hash) def test_add_torrent_url_with_cookie(self): url = "http://localhost:%d/cookie" % self.listen_port @@ -143,7 +173,6 @@ class CoreTestCase(BaseTestCase): d = self.core.add_torrent_url(url, options) d.addCallback(self.assertEquals, info_hash) - return d def test_add_torrent_url_with_partial_download(self): @@ -153,21 +182,21 @@ class CoreTestCase(BaseTestCase): d = self.core.add_torrent_url(url, options) d.addCallback(self.assertEquals, info_hash) - return d - def test_add_magnet(self): + @defer.inlineCallbacks + def test_add_torrent_magnet(self): info_hash = "60d5d82328b4547511fdeac9bf4d0112daa0ce00" uri = deluge.common.create_magnet_uri(info_hash) options = {} - - torrent_id = self.core.add_torrent_magnet(uri, options) + torrent_id = yield self.core.add_torrent_magnet(uri, options) self.assertEquals(torrent_id, info_hash) + @defer.inlineCallbacks def test_remove_torrent(self): options = {} filename = os.path.join(os.path.dirname(__file__), "test.torrent") - torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) removed = self.core.remove_torrent(torrent_id, True) self.assertTrue(removed) self.assertEquals(len(self.core.get_session_state()), 0) @@ -182,12 +211,13 @@ class CoreTestCase(BaseTestCase): d.addCallback(test_true) return d + @defer.inlineCallbacks def test_remove_torrents(self): options = {} filename = os.path.join(os.path.dirname(__file__), "test.torrent") - torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) filename2 = os.path.join(os.path.dirname(__file__), "unicode_filenames.torrent") - torrent_id2 = self.core.add_torrent_file(filename2, base64.encodestring(open(filename2).read()), options) + torrent_id2 = yield self.core.add_torrent_file(filename2, base64.encodestring(open(filename2).read()), options) d = self.core.remove_torrents([torrent_id, torrent_id2], True) def test_ret(val): @@ -197,12 +227,13 @@ class CoreTestCase(BaseTestCase): def test_session_state(val): self.assertEquals(len(self.core.get_session_state()), 0) d.addCallback(test_session_state) - return d + yield d + @defer.inlineCallbacks def test_remove_torrents_invalid(self): options = {} filename = os.path.join(os.path.dirname(__file__), "test.torrent") - torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) d = self.core.remove_torrents(["invalidid1", "invalidid2", torrent_id], False) def test_ret(val): @@ -212,7 +243,7 @@ class CoreTestCase(BaseTestCase): self.assertTrue(val[1][0] == "invalidid2") self.assertTrue(isinstance(val[1][1], InvalidTorrentError)) d.addCallback(test_ret) - return d + yield d def test_get_session_status(self): status = self.core.get_session_status(["upload_rate", "download_rate"]) diff --git a/deluge/tests/test_torrent.py b/deluge/tests/test_torrent.py index ce9e2afe5..61877978f 100644 --- a/deluge/tests/test_torrent.py +++ b/deluge/tests/test_torrent.py @@ -5,7 +5,7 @@ import os import sys import time -from twisted.internet import reactor +from twisted.internet import defer, reactor from twisted.internet.task import deferLater from twisted.trial import unittest @@ -144,10 +144,11 @@ class TorrentTestCase(unittest.TestCase): # self.print_priority_list(priorities) + @defer.inlineCallbacks def test_torrent_error_data_missing(self): options = {"seed_mode": True} filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent") - torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) torrent = core.torrentmanager.torrents[torrent_id] self.assert_state(torrent, "Seeding") @@ -157,10 +158,11 @@ class TorrentTestCase(unittest.TestCase): time.sleep(0.2) # Delay to wait for alert from lt self.assert_state(torrent, "Error") + @defer.inlineCallbacks def test_torrent_error_resume_original_state(self): options = {"seed_mode": True, "add_paused": True} filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent") - torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) + torrent_id = yield core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options) torrent = core.torrentmanager.torrents[torrent_id] orig_state = "Paused" @@ -173,8 +175,10 @@ class TorrentTestCase(unittest.TestCase): # Clear error and verify returned to original state torrent.force_recheck() - return deferLater(reactor, 0.1, self.assert_state, torrent, orig_state) + yield deferLater(reactor, 0.1, self.assert_state, torrent, orig_state) + return + @defer.inlineCallbacks def test_torrent_error_resume_data_unaltered(self): resume_data = {'active_time': 13399, 'num_incomplete': 16777215, 'announce_to_lsd': 1, 'seed_mode': 0, 'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'paused': 0, @@ -202,8 +206,8 @@ class TorrentTestCase(unittest.TestCase): filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent") filedump = open(filename).read() - torrent_id = core.torrentmanager.add(state=torrent_state, filedump=filedump, - resume_data=lt.bencode(resume_data)) + torrent_id = yield core.torrentmanager.add(state=torrent_state, filedump=filedump, + resume_data=lt.bencode(resume_data)) torrent = core.torrentmanager.torrents[torrent_id] def assert_resume_data(): @@ -211,4 +215,5 @@ class TorrentTestCase(unittest.TestCase): tm_resume_data = lt.bdecode(core.torrentmanager.resume_data[torrent.torrent_id]) self.assertEquals(tm_resume_data, resume_data) - return deferLater(reactor, 0.5, assert_resume_data) + yield deferLater(reactor, 0.5, assert_resume_data) + return diff --git a/deluge/tests/test_torrentmanager.py b/deluge/tests/test_torrentmanager.py index b03c99f89..76e3163a8 100644 --- a/deluge/tests/test_torrentmanager.py +++ b/deluge/tests/test_torrentmanager.py @@ -2,6 +2,7 @@ import base64 import os import warnings +from twisted.internet import defer from twisted.trial import unittest from deluge import component @@ -32,9 +33,10 @@ class TorrentmanagerTestCase(unittest.TestCase): del self.torrentManager return component.shutdown().addCallback(on_shutdown) + @defer.inlineCallbacks def test_remove_torrent(self): filename = os.path.join(os.path.dirname(__file__), "test.torrent") - torrent_id = self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), {}) + torrent_id = yield self.core.add_torrent_file(filename, base64.encodestring(open(filename).read()), {}) self.assertTrue(self.torrentManager.remove(torrent_id, False)) def test_remove_torrent_false(self): diff --git a/deluge/ui/gtkui/addtorrentdialog.py b/deluge/ui/gtkui/addtorrentdialog.py index 177e359e8..74b22c52e 100644 --- a/deluge/ui/gtkui/addtorrentdialog.py +++ b/deluge/ui/gtkui/addtorrentdialog.py @@ -738,8 +738,13 @@ class AddTorrentDialog(component.Component): options)) row = self.torrent_liststore.iter_next(row) - def on_torrents_added(torrent_ids): - log.info("Added %d torrents", len(torrent_ids)) + def on_torrents_added(errors): + if errors: + log.info("Failed to add %d out of %d torrents.", len(errors), len(torrents_to_add)) + for e in errors: + log.info("Torrent add failed: %s", e) + else: + log.info("Successfully added %d torrents.", len(torrents_to_add)) client.core.add_torrent_files(torrents_to_add).addCallback(on_torrents_added) def _on_button_apply_clicked(self, widget):