[#1032] Error out torrent if data is missing on startup

This commit is contained in:
Calum Lind 2014-08-12 11:16:01 +01:00
commit 4ae43c5f2a
4 changed files with 256 additions and 66 deletions

View file

@ -169,6 +169,13 @@ class TorrentOptions(dict):
self["seed_mode"] = False self["seed_mode"] = False
class TorrentError(object):
def __init__(self, error_message, was_paused=False, restart_to_resume=False):
self.error_message = error_message
self.was_paused = was_paused
self.restart_to_resume = restart_to_resume
class Torrent(object): class Torrent(object):
"""Torrent holds information about torrents added to the libtorrent session. """Torrent holds information about torrents added to the libtorrent session.
@ -196,7 +203,7 @@ class Torrent(object):
options (dict): The torrent options. options (dict): The torrent options.
filename (str): The filename of the torrent file in case it is required. filename (str): The filename of the torrent file in case it is required.
is_finished (bool): Keep track if torrent is finished to prevent some weird things on state load. is_finished (bool): Keep track if torrent is finished to prevent some weird things on state load.
statusmsg (str): Status message holds error info about the torrent statusmsg (str): Status message holds error/extra info about the torrent.
state (str): The torrent's state state (str): The torrent's state
trackers (list of dict): The torrent's trackers trackers (list of dict): The torrent's trackers
tracker_status (str): Status message of currently connected tracker tracker_status (str): Status message of currently connected tracker
@ -204,6 +211,7 @@ class Torrent(object):
forcing_recheck (bool): Keep track if we're forcing a recheck of the torrent forcing_recheck (bool): Keep track if we're forcing a recheck of the torrent
forcing_recheck_paused (bool): Keep track if we're forcing a recheck of the torrent so that forcing_recheck_paused (bool): Keep track if we're forcing a recheck of the torrent so that
we can re-pause it after its done if necessary we can re-pause it after its done if necessary
forced_error (TorrentError): Keep track if we have forced this torrent to be in Error state.
""" """
def __init__(self, handle, options, state=None, filename=None, magnet=None): def __init__(self, handle, options, state=None, filename=None, magnet=None):
self.torrent_id = str(handle.info_hash()) self.torrent_id = str(handle.info_hash())
@ -235,18 +243,13 @@ class Torrent(object):
self.set_trackers(state.trackers) self.set_trackers(state.trackers)
self.is_finished = state.is_finished self.is_finished = state.is_finished
self.filename = state.filename self.filename = state.filename
last_sess_prepend = "[Error from Previous Session] "
if state.error_statusmsg and not state.error_statusmsg.startswith(last_sess_prepend):
self.error_statusmsg = last_sess_prepend + state.error_statusmsg
else:
self.error_statusmsg = state.error_statusmsg
else: else:
self.set_trackers() self.set_trackers()
self.is_finished = False self.is_finished = False
self.filename = filename self.filename = filename
self.error_statusmsg = None
self.statusmsg = "OK" self.forced_error = None
self.statusmsg = None
self.state = None self.state = None
self.moving_storage = False self.moving_storage = False
self.moving_storage_dest_path = None self.moving_storage_dest_path = None
@ -610,18 +613,15 @@ class Torrent(object):
status = self.handle.status() status = self.handle.status()
session_paused = component.get("Core").session.is_paused() session_paused = component.get("Core").session.is_paused()
old_state = self.state old_state = self.state
if status.error or self.error_statusmsg:
if self.forced_error:
self.state = "Error" self.state = "Error"
# This will be reverted upon resuming. self.set_status_message(self.forced_error.error_message)
elif status.error:
self.state = "Error"
# auto-manage status will be reverted upon resuming.
self.handle.auto_managed(False) self.handle.auto_managed(False)
if not status.paused: self.set_status_message(decode_string(status.error))
self.handle.pause()
if status.error:
self.set_error_statusmsg(decode_string(status.error))
log.debug("Error state from lt: %s", self.error_statusmsg)
else:
log.debug("Error state forced by Deluge, error_statusmsg: %s", self.error_statusmsg)
self.set_status_message(self.error_statusmsg)
elif self.moving_storage: elif self.moving_storage:
self.state = "Moving" self.state = "Moving"
elif not session_paused and status.paused and status.auto_managed: elif not session_paused and status.paused and status.auto_managed:
@ -636,29 +636,52 @@ class Torrent(object):
if log.isEnabledFor(logging.DEBUG): if log.isEnabledFor(logging.DEBUG):
log.debug("State from lt was: %s | Session is paused: %s\nTorrent state set from '%s' to '%s' (%s)", log.debug("State from lt was: %s | Session is paused: %s\nTorrent state set from '%s' to '%s' (%s)",
status.state, session_paused, old_state, self.state, self.torrent_id) "error" if status.error else status.state, session_paused, old_state, self.state, self.torrent_id)
if self.forced_error:
log.debug("Torrent Error state message: %s", self.forced_error.error_message)
def set_status_message(self, message): def set_status_message(self, message=None):
"""Sets the torrent status message. """Sets the torrent status message.
Calling method without a message will reset the message to 'OK'.
Args: Args:
message (str): The status message. message (str, optional): The status message.
""" """
if not message:
message = "OK"
self.statusmsg = message self.statusmsg = message
def set_error_statusmsg(self, message): def force_error_state(self, message, restart_to_resume=True):
"""Sets the torrent error status message. """Forces the torrent into an error state.
Note: For setting an error state not covered by libtorrent.
This will force a torrent into an error state. It is used for
setting those errors that are not covered by libtorrent.
Args: Args:
message (str): The error status message. message (str): The error status message.
restart_to_resume (bool, optional): Prevent resuming clearing the error, only restarting
session can resume.
""" """
self.error_statusmsg = message status = self.handle.status()
self.handle.auto_managed(False)
self.forced_error = TorrentError(message, status.paused, restart_to_resume)
if not status.paused:
self.handle.pause()
self.update_state()
def clear_forced_error_state(self):
if not self.forced_error:
return
if self.forced_error.restart_to_resume:
log.error("Restart deluge to clear this torrent error")
if not self.force_error.was_paused and self.options["auto_managed"]:
self.handle.auto_managed(True)
self.force_error = None
self.set_status_message()
self.update_state()
def get_eta(self): def get_eta(self):
"""Get the ETA for this torrent. """Get the ETA for this torrent.
@ -1018,7 +1041,9 @@ class Torrent(object):
""" """
# Turn off auto-management so the torrent will not be unpaused by lt queueing # Turn off auto-management so the torrent will not be unpaused by lt queueing
self.handle.auto_managed(False) self.handle.auto_managed(False)
if self.status.paused: if self.state == "Error":
return False
elif self.status.paused:
# This torrent was probably paused due to being auto managed by lt # This torrent was probably paused due to being auto managed by lt
# Since we turned auto_managed off, we should update the state which should # Since we turned auto_managed off, we should update the state which should
# show it as 'Paused'. We need to emit a torrent_paused signal because # show it as 'Paused'. We need to emit a torrent_paused signal because
@ -1036,29 +1061,27 @@ class Torrent(object):
def resume(self): def resume(self):
"""Resumes this torrent.""" """Resumes this torrent."""
if self.status.paused and self.status.auto_managed: if self.status.paused and self.status.auto_managed:
log.debug("Torrent is being auto-managed, cannot resume!") log.debug("Resume not possible for auto-managed torrent!")
return elif self.forced_error and self.forced_error.was_paused:
log.debug("Resume skipped for error'd torrent as it was originally paused.")
# Reset the status message just in case of resuming an Error'd torrent elif (self.status.is_finished and self.options["stop_at_ratio"] and
self.set_status_message("OK") self.get_ratio() >= self.options["stop_ratio"]):
self.set_error_statusmsg(None) log.debug("Resume skipped for torrent as it has reached 'stop_seed_ratio'.")
else:
if self.status.is_finished: # Check if torrent was originally being auto-managed.
# If the torrent has already reached it's 'stop_seed_ratio' then do not do anything
if self.options["stop_at_ratio"]:
if self.get_ratio() >= self.options["stop_ratio"]:
# XXX: This should just be returned in the RPC Response, no event
return
if self.options["auto_managed"]: if self.options["auto_managed"]:
# This torrent is to be auto-managed by lt queueing
self.handle.auto_managed(True) self.handle.auto_managed(True)
try: try:
self.handle.resume() self.handle.resume()
except RuntimeError as ex: except RuntimeError as ex:
log.debug("Unable to resume torrent: %s", ex) log.debug("Unable to resume torrent: %s", ex)
# Clear torrent error state.
if self.forced_error and not self.forced_error.restart_to_resume:
self.clear_forced_error_state()
elif self.state == "Error" and not self.forced_error:
self.handle.clear_error()
def connect_peer(self, peer_ip, peer_port): def connect_peer(self, peer_ip, peer_port):
"""Manually add a peer to the torrent """Manually add a peer to the torrent
@ -1123,7 +1146,14 @@ class Torrent(object):
None: The response with resume data is returned in a libtorrent save_resume_data_alert. None: The response with resume data is returned in a libtorrent save_resume_data_alert.
""" """
if log.isEnabledFor(logging.DEBUG):
log.debug("Requesting save_resume_data for torrent: %s", self.torrent_id)
flags = lt.save_resume_flags_t.flush_disk_cache if flush_disk_cache else 0 flags = lt.save_resume_flags_t.flush_disk_cache if flush_disk_cache else 0
# Don't generate fastresume data if torrent is in a Deluge Error state.
if self.forced_error:
component.get("TorrentManager").waiting_on_resume_data[self.torrent_id].errback(
UserWarning("Skipped creating resume_data while in Error state"))
else:
self.handle.save_resume_data(flags) self.handle.save_resume_data(flags)
def write_torrentfile(self, filedump=None): def write_torrentfile(self, filedump=None):
@ -1135,6 +1165,7 @@ class Torrent(object):
""" """
def write_file(filepath, filedump): def write_file(filepath, filedump):
"""Write out the torrent file"""
log.debug("Writing torrent file to: %s", filepath) log.debug("Writing torrent file to: %s", filepath)
try: try:
with open(filepath, "wb") as save_file: with open(filepath, "wb") as save_file:
@ -1195,16 +1226,20 @@ class Torrent(object):
def force_recheck(self): def force_recheck(self):
"""Forces a recheck of the torrent's pieces""" """Forces a recheck of the torrent's pieces"""
paused = self.status.paused if self.forced_error:
self.forcing_recheck_paused = self.forced_error.was_paused
self.clear_forced_error_state(update_state=False)
else:
self.forcing_recheck_paused = self.status.paused
try: try:
self.handle.force_recheck() self.handle.force_recheck()
self.handle.resume() self.handle.resume()
self.forcing_recheck = True
except RuntimeError as ex: except RuntimeError as ex:
log.debug("Unable to force recheck: %s", ex) log.debug("Unable to force recheck: %s", ex)
return False self.forcing_recheck = False
self.forcing_recheck = True return self.forcing_recheck
self.forcing_recheck_paused = paused
return True
def rename_files(self, filenames): def rename_files(self, filenames):
"""Renames files in the torrent. """Renames files in the torrent.

View file

@ -52,7 +52,6 @@ class TorrentState:
queue=None, queue=None,
auto_managed=True, auto_managed=True,
is_finished=False, is_finished=False,
error_statusmsg=None,
stop_ratio=2.00, stop_ratio=2.00,
stop_at_ratio=False, stop_at_ratio=False,
remove_at_ratio=False, remove_at_ratio=False,
@ -406,6 +405,10 @@ class TorrentManager(component.Component):
component.resume("AlertManager") component.resume("AlertManager")
# Store the orignal resume_data, in case of errors.
if resume_data:
self.resume_data[torrent.torrent_id] = resume_data
# Add to queued torrents set. # Add to queued torrents set.
self.queued_torrents.add(torrent.torrent_id) self.queued_torrents.add(torrent.torrent_id)
if self.config["queue_new_to_top"]: if self.config["queue_new_to_top"]:
@ -604,7 +607,9 @@ class TorrentManager(component.Component):
for torrent in self.torrents.values(): for torrent in self.torrents.values():
if self.session.is_paused(): if self.session.is_paused():
paused = torrent.handle.is_paused() paused = torrent.handle.is_paused()
elif torrent.state in ["Paused", "Error"]: elif torrent.forced_error:
paused = torrent.forced_error.was_paused
elif torrent.state == "Paused":
paused = True paused = True
else: else:
paused = False paused = False
@ -626,7 +631,6 @@ class TorrentManager(component.Component):
torrent.get_queue_position(), torrent.get_queue_position(),
torrent.options["auto_managed"], torrent.options["auto_managed"],
torrent.is_finished, torrent.is_finished,
torrent.error_statusmsg,
torrent.options["stop_ratio"], torrent.options["stop_ratio"],
torrent.options["stop_at_ratio"], torrent.options["stop_at_ratio"],
torrent.options["remove_at_ratio"], torrent.options["remove_at_ratio"],
@ -1022,10 +1026,8 @@ class TorrentManager(component.Component):
return return
# Set an Error message and pause the torrent # Set an Error message and pause the torrent
alert_msg = decode_string(alert.message()).split(':', 1)[1].strip() alert_msg = decode_string(alert.message()).split(':', 1)[1].strip()
torrent.set_error_statusmsg("Failed to move download folder: %s" % alert_msg)
torrent.moving_storage = False torrent.moving_storage = False
torrent.pause() torrent.force_error_state("Failed to move download folder: %s" % alert_msg)
torrent.update_state()
if torrent_id in self.waiting_on_finish_moving: if torrent_id in self.waiting_on_finish_moving:
self.waiting_on_finish_moving.remove(torrent_id) self.waiting_on_finish_moving.remove(torrent_id)
@ -1069,7 +1071,6 @@ class TorrentManager(component.Component):
torrent_id = str(alert.handle.info_hash()) torrent_id = str(alert.handle.info_hash())
except RuntimeError: except RuntimeError:
return return
if torrent_id in self.torrents: if torrent_id in self.torrents:
# libtorrent add_torrent expects bencoded resume_data. # libtorrent add_torrent expects bencoded resume_data.
self.resume_data[torrent_id] = lt.bencode(alert.resume_data) self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
@ -1090,7 +1091,8 @@ class TorrentManager(component.Component):
def on_alert_fastresume_rejected(self, alert): def on_alert_fastresume_rejected(self, alert):
"""Alert handler for libtorrent fastresume_rejected_alert""" """Alert handler for libtorrent fastresume_rejected_alert"""
log.warning("on_alert_fastresume_rejected: %s", decode_string(alert.message())) alert_msg = decode_string(alert.message())
log.error("on_alert_fastresume_rejected: %s", alert_msg)
try: try:
torrent_id = str(alert.handle.info_hash()) torrent_id = str(alert.handle.info_hash())
torrent = self.torrents[torrent_id] torrent = self.torrents[torrent_id]
@ -1103,8 +1105,8 @@ class TorrentManager(component.Component):
else: else:
error_msg = "Missing or invalid torrent data!" error_msg = "Missing or invalid torrent data!"
else: else:
error_msg = "Problem with resume data: %s" % decode_string(alert.message()).split(':', 1)[1].strip() error_msg = "Problem with resume data: %s" % alert_msg.split(":", 1)[1].strip()
torrent.force_error_state(error_msg) torrent.force_error_state(error_msg, restart_to_resume=True)
def on_alert_file_renamed(self, alert): def on_alert_file_renamed(self, alert):
"""Alert handler for libtorrent file_renamed_alert """Alert handler for libtorrent file_renamed_alert

View file

@ -0,0 +1,2 @@
d7:comment17:Test torrent file10:created by11:Deluge Team13:creation datei1411826665e8:encoding5:UTF-84:infod6:lengthi512000e4:name17:test_torrent.file12:piece lengthi32768e6:pieces320:²î$2ÜøTj šðäèû>hžU.±ð´“ÿ--~œÂÞ”Bzu<7A>Œ¼žEBÞë1Ù¬ˆƒ@óªÍ¥ƒ/Kž²Î"z÷FÃ0‰Ö£Ñ[asV<73>1²B^Wp-éSÓÂ×¶æ©æF`¸M9Ð)}©ö,ƒ˜<7F>4nîiW ¡“˜QåI•ØçËÌ—(ï,œtØcã¡hŸi†Þ*MÐë}^çWSÕñô7´ú h:-<2D>­ãeÍTXa3åÒm|Ϫ<>áJÀ"¨‡˜]Û0$°<>}¬ÒÑëŸöøêÞl¾ç@Lä žõVöÝ,ƒ4ÉË«½ÅëzMLÄ£JÝ*š
\´APõ&I 9Œ¹}ö20ØÖŽHë¯Ó:Ù_±ƒœ8Õ<V¹2JYb2©'Ý2hÊÂ0\¦ú*_ûj7:privatei0eee

View file

@ -0,0 +1,151 @@
from __future__ import print_function
import base64
import os
import sys
import time
from twisted.internet import reactor
from twisted.internet.task import deferLater
from twisted.trial import unittest
import deluge.component as component
import deluge.core.torrent
import deluge.tests.common as common
from deluge._libtorrent import lt
from deluge.core.core import Core
from deluge.core.rpcserver import RPCServer
from deluge.core.torrentmanager import TorrentState
config_setup = False
core = None
rpcserver = None
eventmanager = None
# This is called by torrent.py when calling component.get("...")
def get(key):
if key is "Core":
return core
elif key is "RPCServer":
return rpcserver
elif key is "EventManager":
return core.eventmanager
elif key is "TorrentManager":
return core.torrentmanager
else:
return None
class TorrentTestCase(unittest.TestCase):
def setup_config(self):
global config_setup
config_setup = True
config_dir = common.set_tmp_config_dir()
core_config = deluge.config.Config("core.conf", defaults=deluge.core.preferencesmanager.DEFAULT_PREFS,
config_dir=config_dir)
core_config.save()
def setUp(self): # NOQA
# Save component and set back on teardown
self.original_component = deluge.core.torrent.component
deluge.core.torrent.component = sys.modules[__name__]
self.setup_config()
global rpcserver
global core
rpcserver = RPCServer(listen=False)
core = Core()
return component.start()
def tearDown(self): # NOQA
deluge.core.torrent.component = self.original_component
def on_shutdown(result):
component._ComponentRegistry.components = {}
return component.shutdown().addCallback(on_shutdown)
def assert_state(self, torrent, state):
torrent.update_state()
self.assertEquals(torrent.state, state)
def get_torrent_atp(self, filename):
filename = os.path.join(os.path.dirname(__file__), filename)
e = lt.bdecode(open(filename, 'rb').read())
info = lt.torrent_info(e)
atp = {"ti": info}
atp["save_path"] = os.getcwd()
atp["storage_mode"] = lt.storage_mode_t.storage_mode_sparse
atp["add_paused"] = False
atp["auto_managed"] = True
atp["duplicate_is_error"] = True
return atp
def test_torrent_error_data_missing(self):
options = {"seed_mode": True}
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent = core.torrentmanager.torrents[torrent_id]
self.assert_state(torrent, "Seeding")
# Force an error by reading (non-existant) piece from disk
torrent.handle.read_piece(0)
time.sleep(0.2) # Delay to wait for alert from lt
self.assert_state(torrent, "Error")
def test_torrent_error_resume_original_state(self):
options = {"seed_mode": True, "add_paused": True}
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
torrent_id = core.add_torrent_file(filename, base64.encodestring(open(filename).read()), options)
torrent = core.torrentmanager.torrents[torrent_id]
orig_state = "Paused"
self.assert_state(torrent, orig_state)
# Force an error by reading (non-existant) piece from disk
torrent.handle.read_piece(0)
time.sleep(0.2) # Delay to wait for alert from lt
self.assert_state(torrent, "Error")
# Clear error and verify returned to original state
torrent.force_recheck()
return deferLater(reactor, 0.1, self.assert_state, torrent, orig_state)
def test_torrent_error_resume_data_unaltered(self):
resume_data = {'active_time': 13399L, 'num_incomplete': 16777215L, 'announce_to_lsd': 1L, 'seed_mode': 0L,
'pieces': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', 'paused': 0L,
'seeding_time': 13399L, 'last_scrape': 13399L,
'info-hash': '-\xc5\xd0\xe7\x1af\xfeid\x9ad\r9\xcb\x00\xa2YpIs', 'max_uploads': 16777215L,
'max_connections': 16777215L, 'num_downloaders': 16777215L, 'total_downloaded': 0L,
'file-format': 'libtorrent resume file', 'peers6': '', 'added_time': 1411826665L,
'banned_peers6': '', 'file_priority': [1L], 'last_seen_complete': 0L, 'total_uploaded': 0L,
'piece_priority': '\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01',
'file-version': 1L, 'announce_to_dht': 1L, 'auto_managed': 1L, 'upload_rate_limit': 0L,
'completed_time': 1411826665L, 'allocation': 'sparse', 'blocks per piece': 2L,
'download_rate_limit': 0L, 'libtorrent-version': '0.16.17.0', 'banned_peers': '',
'num_seeds': 16777215L, 'sequential_download': 0L, 'announce_to_trackers': 1L,
'peers': '\n\x00\x02\x0f=\xc6SC\x17]\xd8}\x7f\x00\x00\x01=\xc6', 'finished_time': 13399L,
'last_upload': 13399L, 'trackers': [[]], 'super_seeding': 0L,
'file sizes': [[512000L, 1411826586L]], 'last_download': 13399L}
torrent_state = TorrentState(
torrent_id='2dc5d0e71a66fe69649a640d39cb00a259704973',
filename='test_torrent.file.torrent',
name='',
save_path='/home/ubuntu/Downloads',
file_priorities=[1],
is_finished=True,
)
filename = os.path.join(os.path.dirname(__file__), "test_torrent.file.torrent")
filedump = open(filename).read()
torrent_id = core.torrentmanager.add(state=torrent_state, filedump=filedump,
resume_data=lt.bencode(resume_data))
torrent = core.torrentmanager.torrents[torrent_id]
def assert_resume_data():
self.assert_state(torrent, "Error")
tm_resume_data = lt.bdecode(core.torrentmanager.resume_data[torrent.torrent_id])
self.assertEquals(tm_resume_data, resume_data)
return deferLater(reactor, 0.5, assert_resume_data)