mirror of
https://git.deluge-torrent.org/deluge
synced 2025-08-02 22:48:40 +00:00
1035 lines
40 KiB
Python
1035 lines
40 KiB
Python
#
|
|
# torrentmanager.py
|
|
#
|
|
# Copyright (C) 2007, 2008 Andrew Resch <andrewresch@gmail.com>
|
|
#
|
|
# Deluge is free software.
|
|
#
|
|
# You may redistribute it and/or modify it under the terms of the
|
|
# GNU General Public License, as published by the Free Software
|
|
# Foundation; either version 3 of the License, or (at your option)
|
|
# any later version.
|
|
#
|
|
# deluge is distributed in the hope that it will be useful,
|
|
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
|
|
# See the GNU General Public License for more details.
|
|
#
|
|
# You should have received a copy of the GNU General Public License
|
|
# along with deluge. If not, write to:
|
|
# The Free Software Foundation, Inc.,
|
|
# 51 Franklin Street, Fifth Floor
|
|
# Boston, MA 02110-1301, USA.
|
|
#
|
|
# In addition, as a special exception, the copyright holders give
|
|
# permission to link the code of portions of this program with the OpenSSL
|
|
# library.
|
|
# You must obey the GNU General Public License in all respects for all of
|
|
# the code used other than OpenSSL. If you modify file(s) with this
|
|
# exception, you may extend this exception to your version of the file(s),
|
|
# but you are not obligated to do so. If you do not wish to do so, delete
|
|
# this exception statement from your version. If you delete this exception
|
|
# statement from all source files in the program, then also delete it here.
|
|
#
|
|
#
|
|
|
|
|
|
"""TorrentManager handles Torrent objects"""
|
|
|
|
import cPickle
|
|
import os
|
|
import time
|
|
import shutil
|
|
import operator
|
|
|
|
from twisted.internet import reactor
|
|
from twisted.internet.task import LoopingCall
|
|
|
|
from deluge._libtorrent import lt
|
|
|
|
from deluge.event import *
|
|
from deluge.error import *
|
|
import deluge.component as component
|
|
from deluge.configmanager import ConfigManager, get_config_dir
|
|
from deluge.core.torrent import Torrent
|
|
from deluge.core.torrent import TorrentOptions
|
|
import deluge.core.oldstateupgrader
|
|
from deluge.common import utf8_encoded
|
|
|
|
from deluge.log import LOG as log
|
|
|
|
class TorrentState:
    """Plain value holder describing one torrent's saved state.

    Every constructor argument is optional and is stored unchanged on the
    attribute of the same name.  The parameter order is part of the
    interface: callers may pass the values positionally.
    """
    # NOTE: intentionally left as a classic (non-``object``) class so that
    # previously pickled instances keep loading the same way.

    def __init__(self,
                 torrent_id=None,
                 filename=None,
                 total_uploaded=0,
                 trackers=None,
                 compact=False,
                 paused=False,
                 save_path=None,
                 max_connections=-1,
                 max_upload_slots=-1,
                 max_upload_speed=-1.0,
                 max_download_speed=-1.0,
                 prioritize_first_last=False,
                 file_priorities=None,
                 queue=None,
                 auto_managed=True,
                 is_finished=False,
                 stop_ratio=2.00,
                 stop_at_ratio=False,
                 remove_at_ratio=False,
                 move_completed=False,
                 move_completed_path=None,
                 magnet=None,
                 time_added=-1,
                 owner="",
                 public=False):
        # Identity and bookkeeping
        self.torrent_id = torrent_id
        self.filename = filename
        self.magnet = magnet
        self.owner = owner
        self.time_added = time_added
        self.total_uploaded = total_uploaded
        self.trackers = trackers
        self.queue = queue
        self.is_finished = is_finished

        # Options: storage
        self.compact = compact
        self.save_path = save_path
        self.move_completed = move_completed
        self.move_completed_path = move_completed_path
        self.file_priorities = file_priorities
        self.prioritize_first_last = prioritize_first_last

        # Options: limits
        self.max_connections = max_connections
        self.max_upload_slots = max_upload_slots
        self.max_upload_speed = max_upload_speed
        self.max_download_speed = max_download_speed

        # Options: scheduling and ratio handling
        self.paused = paused
        self.auto_managed = auto_managed
        self.stop_ratio = stop_ratio
        self.stop_at_ratio = stop_at_ratio
        self.remove_at_ratio = remove_at_ratio
        self.public = public
|
|
|
|
class TorrentManagerState:
    """Container for the per-torrent state objects of a session."""

    def __init__(self):
        # One entry per torrent; starts out empty.
        self.torrents = list()
|
|
|
|
class TorrentManager(component.Component):
|
|
"""
|
|
TorrentManager contains a list of torrents in the current libtorrent
|
|
session. This object is also responsible for saving the state of the
|
|
session for use on restart.
|
|
"""
|
|
|
|
def __init__(self):
    """Set up session/config references, bookkeeping containers and callbacks."""
    component.Component.__init__(self, "TorrentManager", interval=5, depend=["CorePluginManager"])
    log.debug("TorrentManager init..")

    # References to the libtorrent session, the alert manager and the core config
    self.session = component.get("Core").session
    self.alerts = component.get("AlertManager")
    self.config = ConfigManager("core.conf")

    # Make sure the state folder has been created
    state_dir = os.path.join(get_config_dir(), "state")
    if not os.path.exists(state_dir):
        os.makedirs(state_dir)

    # { torrent_id: Torrent }
    self.torrents = {}

    # torrent_ids paused by stop(); used there to tell when every active
    # torrent has been paused and its resume data written.
    self.shutdown_torrent_pause_list = []

    # Number of outstanding resume-data requests (resume data is saved in bulk)
    self.num_resume_data = 0

    # Resume data collected so far that still has to be written to disk
    self.resume_data = {}

    # Config keys and the callbacks invoked when they are set
    set_functions = (
        ("max_connections_per_torrent", self.on_set_max_connections_per_torrent),
        ("max_upload_slots_per_torrent", self.on_set_max_upload_slots_per_torrent),
        ("max_upload_speed_per_torrent", self.on_set_max_upload_speed_per_torrent),
        ("max_download_speed_per_torrent", self.on_set_max_download_speed_per_torrent),
    )
    for config_key, callback in set_functions:
        self.config.register_set_function(config_key, callback)

    # Alert types and their handlers, registered in this order
    alert_handlers = (
        ("torrent_finished_alert", self.on_alert_torrent_finished),
        ("torrent_paused_alert", self.on_alert_torrent_paused),
        ("torrent_checked_alert", self.on_alert_torrent_checked),
        ("tracker_reply_alert", self.on_alert_tracker_reply),
        ("tracker_announce_alert", self.on_alert_tracker_announce),
        ("tracker_warning_alert", self.on_alert_tracker_warning),
        ("tracker_error_alert", self.on_alert_tracker_error),
        ("storage_moved_alert", self.on_alert_storage_moved),
        ("torrent_resumed_alert", self.on_alert_torrent_resumed),
        ("state_changed_alert", self.on_alert_state_changed),
        ("save_resume_data_alert", self.on_alert_save_resume_data),
        ("save_resume_data_failed_alert", self.on_alert_save_resume_data_failed),
        ("file_renamed_alert", self.on_alert_file_renamed),
        ("metadata_received_alert", self.on_alert_metadata_received),
        ("file_error_alert", self.on_alert_file_error),
        ("file_completed_alert", self.on_alert_file_completed),
    )
    for alert_type, handler in alert_handlers:
        self.alerts.register_handler(alert_type, handler)
|
|
|
|
def start(self):
    """Load the saved torrents and start the periodic save timers."""
    # Get the pluginmanager reference
    self.plugins = component.get("CorePluginManager")

    # The old state upgrader has to run before the state is loaded
    deluge.core.oldstateupgrader.OldStateUpgrader()
    self.load_state()

    # Save the torrent state every 200 seconds; now=False skips the
    # immediate first call.
    state_timer = LoopingCall(self.save_state)
    self.save_state_timer = state_timer
    state_timer.start(200, now=False)

    # Request resume data every 190 seconds, beginning right away.
    resume_timer = LoopingCall(self.save_resume_data)
    self.save_resume_data_timer = resume_timer
    resume_timer.start(190)
|
|
|
|
def stop(self):
    """Stop the timers, save state, pause all torrents and wait for their alerts."""
    # Stop timers
    for timer in (self.save_state_timer, self.save_resume_data_timer):
        if timer.running:
            timer.stop()

    # Save state on shutdown
    self.save_state()

    # Keep a private copy of the ids paused here: entries may already be
    # removed from self.shutdown_torrent_pause_list by
    # self.on_alert_torrent_paused() before self.save_resume_data() runs.
    paused_here = []
    for torrent_id in self.torrents:
        torrent = self.torrents[torrent_id]
        # Stop the status cleanup LoopingCall here
        torrent.prev_status_cleanup_loop.stop()
        if torrent.handle.is_paused():
            continue
        # auto_managed is switched off to prevent lt from resuming the torrent
        torrent.handle.auto_managed(False)
        torrent.handle.pause()
        self.shutdown_torrent_pause_list.append(torrent_id)
        paused_here.append(torrent_id)

    self.save_resume_data(paused_here)

    # Block until every torrent has been paused and none is still waiting
    # on resume data.  Alerts are handled once more after the condition
    # has been evaluated, exactly once per pass.
    while True:
        still_waiting = bool(self.shutdown_torrent_pause_list) or any(
            torrent.waiting_on_resume_data
            for torrent in self.torrents.values())

        time.sleep(0.01)
        # Wait for all alerts
        self.alerts.handle_alerts(True)

        if not still_waiting:
            break
|
|
|
|
def update(self):
    """Apply the per-torrent stop/remove-at-ratio options.

    For every torrent that has ``stop_at_ratio`` enabled, is finished, is not
    in the Checking/Allocating/Paused/Queued state and has reached its
    ``stop_ratio``: remove it when ``remove_at_ratio`` is set, otherwise
    pause it if it is not paused already.
    """
    skipped_states = ("Checking", "Allocating", "Paused", "Queued")

    # Iterate over a snapshot because self.remove() mutates self.torrents.
    for torrent_id, torrent in list(self.torrents.items()):
        # Torrents without the per-torrent option are never stopped here.
        if not torrent.options["stop_at_ratio"]:
            continue
        if torrent.state in skipped_states:
            continue
        if not (torrent.get_ratio() >= torrent.options["stop_ratio"] and torrent.is_finished):
            continue

        if torrent.options["remove_at_ratio"]:
            self.remove(torrent_id)
            # Stop after one removal; the remaining torrents are looked at
            # on the next update() call.
            break

        if not torrent.handle.is_paused():
            torrent.pause()
|
|
|
|
def __getitem__(self, torrent_id):
    """Look up and return the Torrent stored under torrent_id"""
    torrent = self.torrents[torrent_id]
    return torrent
|
|
|
|
def get_torrent_list(self):
    """Return the ids of all torrents currently held by the manager"""
    torrent_ids = self.torrents.keys()
    return torrent_ids
|
|
|
|
def get_torrent_info_from_file(self, filepath):
    """Build a torrent_info from the torrent file at `filepath`.

    :param filepath: path of the .torrent file to read
    :returns: the ``lt.torrent_info`` created from the bdecoded file
        contents, or None if the file could not be opened or decoded
        (IOError / RuntimeError); the failure is logged as a warning
    """
    torrent_info = None
    try:
        log.debug("Attempting to create torrent_info from %s", filepath)
        # The with-block closes the file even when decoding raises.
        with open(filepath, "rb") as _file:
            torrent_info = lt.torrent_info(lt.bdecode(_file.read()))
    except (IOError, RuntimeError) as e:
        log.warning("Unable to open %s: %s", filepath, e)

    return torrent_info
|
|
|
|
def legacy_get_resume_data_from_file(self, torrent_id):
    """Read the legacy per-torrent ``<torrent_id>.fastresume`` file.

    :param torrent_id: id of the torrent whose legacy file is read
    :returns: the raw file contents as a string, or an empty string when
        the file cannot be read (the IOError is logged at debug level)
    """
    fastresume = ""
    path = os.path.join(get_config_dir(), "state", torrent_id + ".fastresume")
    try:
        # The with-block closes the file even when read() raises.
        with open(path, "rb") as _file:
            fastresume = _file.read()
    except IOError as e:
        log.debug("Unable to load .fastresume: %s", e)

    return str(fastresume)
|
|
|
|
def legacy_delete_resume_data(self, torrent_id):
    """Remove the torrent's legacy .fastresume file, logging any failure"""
    fastresume_name = torrent_id + ".fastresume"
    path = os.path.join(get_config_dir(), "state", fastresume_name)
    log.debug("Deleting fastresume file: %s", path)
    try:
        os.remove(path)
    except Exception as e:
        # Best effort: a missing or undeletable file is only reported.
        log.warning("Unable to delete the fastresume file: %s", e)
|
|
|
|
def add(self, torrent_info=None, state=None, options=None, save_state=True,
        filedump=None, filename=None, magnet=None, resume_data=None):
    """Add a torrent to the manager and return its torrent_id.

    At least one of `torrent_info`, `state`, `filedump` or `magnet` has to
    be given.

    :param torrent_info: an ``lt.torrent_info`` to add
    :param state: a TorrentState to restore the torrent from; used when no
        torrent_info is available
    :param options: options overriding the TorrentOptions defaults
        (ignored when restoring from `state`)
    :param save_state: if True, save the session state after adding
    :param filedump: bencoded contents of a .torrent file; when given it
        replaces `torrent_info` and is written to the state directory
    :param filename: name used for the optional copy of the .torrent file
    :param magnet: magnet uri to add
    :param resume_data: bencoded resume data used when restoring from
        `state`; read from the legacy .fastresume file when None

    :returns: the torrent_id, or None when the torrent could not be added
    """
    if torrent_info is None and state is None and filedump is None and magnet is None:
        log.debug("You must specify a valid torrent_info, torrent state or magnet.")
        return

    log.debug("torrentmanager.add")
    add_torrent_params = {}

    if filedump is not None:
        try:
            torrent_info = lt.torrent_info(lt.bdecode(filedump))
        except Exception as e:
            log.error("Unable to decode torrent file!: %s", e)
            # XXX: Probably should raise an exception here..
            return

    if torrent_info is None and state:
        # We have no torrent_info so we need to add the torrent with
        # information from the state object.

        # Populate the options dict from state
        options = TorrentOptions()
        options["max_connections"] = state.max_connections
        options["max_upload_slots"] = state.max_upload_slots
        options["max_upload_speed"] = state.max_upload_speed
        options["max_download_speed"] = state.max_download_speed
        options["prioritize_first_last_pieces"] = state.prioritize_first_last
        options["file_priorities"] = state.file_priorities
        options["compact_allocation"] = state.compact
        options["download_location"] = state.save_path
        options["auto_managed"] = state.auto_managed
        options["stop_at_ratio"] = state.stop_at_ratio
        options["stop_ratio"] = state.stop_ratio
        options["remove_at_ratio"] = state.remove_at_ratio
        options["move_completed"] = state.move_completed
        options["move_completed_path"] = state.move_completed_path
        options["add_paused"] = state.paused
        options["public"] = state.public

        # Prefer the saved .torrent file, fall back to the saved magnet uri
        ti = self.get_torrent_info_from_file(
            os.path.join(get_config_dir(),
                         "state", state.torrent_id + ".torrent"))
        if ti:
            add_torrent_params["ti"] = ti
        elif state.magnet:
            magnet = state.magnet
        else:
            log.error("Unable to add torrent!")
            return

        # Handle legacy case with storing resume data in individual files
        # for each torrent
        if resume_data is None:
            resume_data = self.legacy_get_resume_data_from_file(state.torrent_id)
            self.legacy_delete_resume_data(state.torrent_id)

        add_torrent_params["resume_data"] = resume_data
    else:
        # We are not loading from state.  Start from the default options
        # and overlay whatever the caller supplied.
        if options is None:
            options = TorrentOptions()
        else:
            o = TorrentOptions()
            o.update(options)
            options = o

        # Check for renamed files and if so, rename them in the torrent_info
        # before adding to the session.
        if options["mapped_files"]:
            for index, name in options["mapped_files"].items():
                log.debug("renaming file index %s to %s", index, name)
                torrent_info.rename_file(index, utf8_encoded(name))

        add_torrent_params["ti"] = torrent_info
        add_torrent_params["resume_data"] = ""

    #log.info("Adding torrent: %s", filename)
    log.debug("options: %s", options)

    # Set the right storage_mode
    if options["compact_allocation"]:
        storage_mode = lt.storage_mode_t(2)
    else:
        storage_mode = lt.storage_mode_t(1)

    # Fill in the rest of the add_torrent_params dictionary
    add_torrent_params["save_path"] = utf8_encoded(options["download_location"])
    add_torrent_params["storage_mode"] = storage_mode
    add_torrent_params["paused"] = True
    add_torrent_params["auto_managed"] = False
    add_torrent_params["duplicate_is_error"] = True

    # We need to pause the AlertManager momentarily to prevent alerts
    # for this torrent being generated before a Torrent object is created.
    component.pause("AlertManager")

    handle = None
    try:
        if magnet:
            handle = lt.add_magnet_uri(self.session, magnet, add_torrent_params)
        else:
            handle = self.session.add_torrent(add_torrent_params)
    except RuntimeError as e:
        log.warning("Error adding torrent: %s", e)

    if not handle or not handle.is_valid():
        log.debug("torrent handle is invalid!")
        # The torrent was not added to the session
        component.resume("AlertManager")
        return

    log.debug("handle id: %s", str(handle.info_hash()))
    # Set auto_managed to False because the torrent is paused
    handle.auto_managed(False)
    # Create a Torrent object
    owner = state.owner if state else component.get("RPCServer").get_session_user()
    torrent = Torrent(handle, options, state, filename, magnet, owner)
    # Add the torrent object to the dictionary
    self.torrents[torrent.torrent_id] = torrent
    if self.config["queue_new_to_top"]:
        handle.queue_position_top()

    component.resume("AlertManager")

    # Resume the torrent if needed
    if not options["add_paused"]:
        torrent.resume()

    # Write the .torrent file to the state directory
    if filedump:
        try:
            # The with-block closes the file even when write() fails.
            with open(os.path.join(get_config_dir(), "state",
                                   torrent.torrent_id + ".torrent"),
                      "wb") as save_file:
                save_file.write(filedump)
        except IOError as e:
            log.warning("Unable to save torrent file: %s", e)

        # If the user has requested a copy of the torrent be saved elsewhere
        # we need to do that.
        if self.config["copy_torrent_file"] and filename is not None:
            try:
                with open(
                        os.path.join(self.config["torrentfiles_location"], filename),
                        "wb") as save_file:
                    save_file.write(filedump)
            except IOError as e:
                log.warning("Unable to save torrent file: %s", e)

    if save_state:
        # Save the session state
        self.save_state()

    # Emit the torrent_added signal
    component.get("EventManager").emit(TorrentAddedEvent(torrent.torrent_id))

    log.info("Torrent %s added by user: %s", torrent.get_status(["name"])["name"], component.get("RPCServer").get_session_user())
    return torrent.torrent_id
|
|
|
|
def load_torrent(self, torrent_id):
    """Load a torrent file from the state directory.

    :param torrent_id: id of the torrent whose ``<torrent_id>.torrent``
        file is read
    :returns: the bdecoded contents of the file, or False when the file
        could not be opened or decoded (IOError / RuntimeError)
    """
    filedump = None
    path = os.path.join(get_config_dir(), "state", torrent_id + ".torrent")
    try:
        log.debug("Attempting to open %s for add.", torrent_id)
        # The with-block closes the file even when decoding raises.
        with open(path, "rb") as _file:
            filedump = lt.bdecode(_file.read())
    except (IOError, RuntimeError) as e:
        log.warning("Unable to open %s: %s", torrent_id, e)
        return False

    return filedump
|
|
|
|
def remove(self, torrent_id, remove_data=False):
    """
    Remove a torrent from the session.

    :param torrent_id: the torrent to remove
    :type torrent_id: string
    :param remove_data: if True, remove the downloaded data
    :type remove_data: bool

    :returns: True if removed successfully, False if not
    :rtype: bool

    :raises InvalidTorrentError: if the torrent_id is not in the session

    """

    if torrent_id not in self.torrents:
        raise InvalidTorrentError("torrent_id not in session")

    # Looked up now because the Torrent is gone by the time it is logged
    torrent_name = self.torrents[torrent_id].get_status(["name"])["name"]

    # Emit the signal to the clients
    component.get("EventManager").emit(PreTorrentRemovedEvent(torrent_id))

    try:
        self.session.remove_torrent(self.torrents[torrent_id].handle,
                                    1 if remove_data else 0)
    except (RuntimeError, KeyError) as e:
        log.warning("Error removing torrent: %s", e)
        return False

    # Remove fastresume data if it exists
    resume_data = self.load_resume_data_file()
    resume_data.pop(torrent_id, None)
    self.save_resume_data_file(resume_data)

    torrent = self.torrents[torrent_id]

    # Remove the .torrent file in the state
    torrent.delete_torrentfile()

    # Remove the torrent file from the user specified directory
    filename = torrent.filename
    if self.config["copy_torrent_file"] \
            and self.config["del_copy_torrent_file"] \
            and filename:
        try:
            users_torrent_file = os.path.join(
                self.config["torrentfiles_location"],
                filename)
            log.info("Delete user's torrent file: %s",
                     users_torrent_file)
            os.remove(users_torrent_file)
        except Exception as e:
            # Best effort: failing to delete the copy does not abort removal
            log.warning("Unable to remove copy torrent file: %s", e)

    # Stop the looping call
    torrent.prev_status_cleanup_loop.stop()

    # Remove the torrent from deluge's session
    try:
        del self.torrents[torrent_id]
    except (KeyError, ValueError):
        # A tuple is required here: "except KeyError, ValueError" would
        # catch only KeyError and bind it to the name ValueError.
        return False

    # Save the session state
    self.save_state()

    # Emit the signal to the clients
    component.get("EventManager").emit(TorrentRemovedEvent(torrent_id))
    log.info("Torrent %s removed by user: %s", torrent_name, component.get("RPCServer").get_session_user())
    return True
|
|
|
|
def load_state(self):
    """Load the state of the TorrentManager from the torrents.state file.

    Torrents are re-added in queue order with their saved resume data and
    a SessionStartedEvent is emitted afterwards.  A missing or unreadable
    state file is logged and treated as an empty state.
    """
    state = TorrentManagerState()

    try:
        log.debug("Opening torrent state file for load.")
        # The with-block closes the file even when unpickling raises.
        with open(os.path.join(get_config_dir(), "state", "torrents.state"),
                  "rb") as state_file:
            state = cPickle.load(state_file)
    except Exception as e:
        log.warning("Unable to load state file: %s", e)

    # Try to use an old state: give every loaded entry the attributes that
    # a current TorrentState has but the loaded objects lack.
    try:
        # An empty list has nothing to upgrade (and no [0] to inspect).
        if state.torrents:
            state_tmp = TorrentState()
            if dir(state.torrents[0]) != dir(state_tmp):
                for attr in (set(dir(state_tmp)) - set(dir(state.torrents[0]))):
                    for s in state.torrents:
                        setattr(s, attr, getattr(state_tmp, attr, None))
    except Exception as e:
        log.warning("Unable to update state file to a compatible version: %s", e)

    # Reorder the state.torrents list to add torrents in the correct queue
    # order.
    state.torrents.sort(key=operator.attrgetter("queue"))

    resume_data = self.load_resume_data_file()

    for torrent_state in state.torrents:
        try:
            self.add(state=torrent_state, save_state=False,
                     resume_data=resume_data.get(torrent_state.torrent_id))
        except AttributeError as e:
            log.error("Torrent state file is either corrupt or incompatible! %s", e)
            break

    component.get("EventManager").emit(SessionStartedEvent())
|
|
|
|
def save_state(self):
    """Save the state of the TorrentManager to the torrents.state file.

    The state is pickled to ``torrents.state.new`` first and then moved
    over ``torrents.state``.  File-system errors are logged, not raised.

    :returns: always True, so that the timer calling this keeps running
    """
    state = TorrentManagerState()
    # Create the state for each Torrent and append to the list
    for torrent in self.torrents.values():
        state.torrents.append(TorrentState(
            torrent_id=torrent.torrent_id,
            filename=torrent.filename,
            total_uploaded=torrent.get_status(["total_uploaded"])["total_uploaded"],
            trackers=torrent.trackers,
            compact=torrent.options["compact_allocation"],
            paused=(torrent.state == "Paused"),
            save_path=torrent.options["download_location"],
            max_connections=torrent.options["max_connections"],
            max_upload_slots=torrent.options["max_upload_slots"],
            max_upload_speed=torrent.options["max_upload_speed"],
            max_download_speed=torrent.options["max_download_speed"],
            prioritize_first_last=torrent.options["prioritize_first_last_pieces"],
            file_priorities=torrent.options["file_priorities"],
            queue=torrent.get_queue_position(),
            auto_managed=torrent.options["auto_managed"],
            is_finished=torrent.is_finished,
            stop_ratio=torrent.options["stop_ratio"],
            stop_at_ratio=torrent.options["stop_at_ratio"],
            remove_at_ratio=torrent.options["remove_at_ratio"],
            move_completed=torrent.options["move_completed"],
            move_completed_path=torrent.options["move_completed_path"],
            magnet=torrent.magnet,
            time_added=torrent.time_added,
            owner=torrent.owner,
            public=torrent.options["public"]
        ))

    state_dir = os.path.join(get_config_dir(), "state")
    new_path = os.path.join(state_dir, "torrents.state.new")
    final_path = os.path.join(state_dir, "torrents.state")

    # Pickle the TorrentManagerState object
    try:
        log.debug("Saving torrent state file.")
        # The with-block closes the file even when dump/fsync fails.
        with open(new_path, "wb") as state_file:
            cPickle.dump(state, state_file)
            state_file.flush()
            os.fsync(state_file.fileno())
    except EnvironmentError:
        # EnvironmentError also covers the OSError raised by os.fsync()
        log.warning("Unable to save state file.")
        return True

    # We have to move the 'torrents.state.new' file to 'torrents.state'
    try:
        shutil.move(new_path, final_path)
    except EnvironmentError:
        log.warning("Unable to save state file.")
        return True

    # We return True so that the timer thread will continue
    return True
|
|
|
|
def save_resume_data(self, torrent_ids=None):
    """
    Ask each torrent in torrent_ids (every torrent when torrent_ids is
    None) to save its resume data, and record how many were asked in
    self.num_resume_data.
    """
    requested = self.torrents.keys() if torrent_ids is None else torrent_ids

    for requested_id in requested:
        torrent = self.torrents[requested_id]
        torrent.save_resume_data()

    self.num_resume_data = len(requested)
|
|
|
|
def load_resume_data_file(self):
    """Load the combined ``torrents.fastresume`` file from the state directory.

    :returns: the bdecoded contents of the file, or an empty dict when the
        file cannot be read or decoded (the failure is logged as a warning)
    """
    resume_data = {}
    path = os.path.join(get_config_dir(), "state", "torrents.fastresume")
    try:
        log.debug("Opening torrents fastresume file for load.")
        # The with-block closes the file even when decoding raises.
        with open(path, "rb") as fastresume_file:
            resume_data = lt.bdecode(fastresume_file.read())
    except Exception as e:
        log.warning("Unable to load fastresume file: %s", e)

    # If the libtorrent bdecode doesn't happen properly, it will return None
    # so we need to make sure we return a {}
    if resume_data is None:
        return {}

    return resume_data
|
|
|
|
def save_resume_data_file(self, resume_data=None):
    """
    Saves the resume data file with the contents of self.resume_data.  If
    `resume_data` is None, then we grab the resume_data from the file on
    disk, else, we update `resume_data` with self.resume_data and save
    that to disk.

    Nothing is written while resume data requests are still outstanding
    (self.num_resume_data is non-zero), nor when there is neither collected
    data nor a caller-supplied dict.  File-system errors are logged, not
    raised.

    :param resume_data: the current resume_data, this will be loaded from disk if not provided
    :type resume_data: dict

    """
    # Check to see if we're waiting on more resume data
    if self.num_resume_data:
        return

    # Without new data there is only something to write when the caller
    # handed in a dict of its own (e.g. one with an entry removed).
    if resume_data is None and not self.resume_data:
        return

    path = os.path.join(get_config_dir(), "state", "torrents.fastresume")

    # First step is to load the existing file and update the dictionary
    if resume_data is None:
        resume_data = self.load_resume_data_file()

    resume_data.update(self.resume_data)
    self.resume_data = {}

    try:
        log.debug("Saving fastresume file: %s", path)
        # The with-block closes the file even when write/fsync fails.
        with open(path, "wb") as fastresume_file:
            fastresume_file.write(lt.bencode(resume_data))
            fastresume_file.flush()
            os.fsync(fastresume_file.fileno())
    except EnvironmentError:
        # EnvironmentError also covers the OSError raised by os.fsync()
        log.warning("Error trying to save fastresume file")
|
|
|
|
def queue_top(self, torrent_id):
    """Move the torrent to the top of the queue; False if it is already at position 0"""
    torrent = self.torrents[torrent_id]
    already_first = torrent.get_queue_position() == 0
    if already_first:
        return False

    self.torrents[torrent_id].handle.queue_position_top()
    return True
|
|
|
|
def queue_up(self, torrent_id):
    """Move the torrent one position up; False if it is already at position 0"""
    torrent = self.torrents[torrent_id]
    already_first = torrent.get_queue_position() == 0
    if already_first:
        return False

    self.torrents[torrent_id].handle.queue_position_up()
    return True
|
|
|
|
def queue_down(self, torrent_id):
    """Move the torrent one position down; False if it is already last"""
    last_position = len(self.torrents) - 1
    torrent = self.torrents[torrent_id]
    if torrent.get_queue_position() == last_position:
        return False

    self.torrents[torrent_id].handle.queue_position_down()
    return True
|
|
|
|
def queue_bottom(self, torrent_id):
    """Move the torrent to the bottom of the queue; False if it is already last"""
    last_position = len(self.torrents) - 1
    torrent = self.torrents[torrent_id]
    if torrent.get_queue_position() == last_position:
        return False

    self.torrents[torrent_id].handle.queue_position_bottom()
    return True
|
|
|
|
def on_set_max_connections_per_torrent(self, key, value):
    """Apply the new per-torrent connection limit to every torrent"""
    log.debug("max_connections_per_torrent set to %s..", value)
    for torrent in self.torrents.values():
        torrent.set_max_connections(value)
|
|
|
|
def on_set_max_upload_slots_per_torrent(self, key, value):
    """Apply the new per-torrent upload slot limit to every torrent"""
    log.debug("max_upload_slots_per_torrent set to %s..", value)
    for torrent in self.torrents.values():
        torrent.set_max_upload_slots(value)
|
|
|
|
def on_set_max_upload_speed_per_torrent(self, key, value):
    """Apply the new per-torrent upload speed limit to every torrent"""
    log.debug("max_upload_speed_per_torrent set to %s..", value)
    for torrent in self.torrents.values():
        torrent.set_max_upload_speed(value)
|
|
|
|
def on_set_max_download_speed_per_torrent(self, key, value):
    """Apply the new per-torrent download speed limit to every torrent"""
    log.debug("max_download_speed_per_torrent set to %s..", value)
    for torrent in self.torrents.values():
        torrent.set_max_download_speed(value)
|
|
|
|
## Alert handlers ##
|
|
def on_alert_torrent_finished(self, alert):
    """Handle torrent_finished_alert.

    Marks the torrent finished (moving its storage first when
    ``move_completed`` is set), emits TorrentFinishedEvent, refreshes the
    torrent state and requests resume data.  Torrents that report no
    payload download are only refreshed.
    """
    log.debug("on_alert_torrent_finished")
    try:
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return
    log.debug("%s is finished..", torrent_id)

    # Get the total_download and if it's 0, do not move.. It's likely
    # that the torrent wasn't downloaded, but just added.
    total_download = torrent.get_status(["total_payload_download"])["total_payload_download"]

    # Move completed download to completed folder if needed
    if not torrent.is_finished and total_download:
        if torrent.options["move_completed"]:
            move_path = torrent.options["move_completed_path"]
            if torrent.options["download_location"] != move_path:
                torrent.move_storage(move_path)

        torrent.is_finished = True
        component.get("EventManager").emit(TorrentFinishedEvent(torrent_id))

    torrent.update_state()

    # Only save resume data if it was actually downloaded something. Helps
    # on startup with big queues with lots of seeding torrents. Libtorrent
    # emits alert_torrent_finished for them, but there seems like nothing
    # worth really to save in resume data, we just read it up in
    # self.load_state().
    if total_download:
        self.save_resume_data((torrent_id, ))
|
|
|
|
def on_alert_torrent_paused(self, alert):
    """Handle torrent_paused_alert.

    Refreshes the torrent state (emitting TorrentStateChangedEvent when it
    changed), requests resume data while the periodic timer is running and
    removes the torrent from self.shutdown_torrent_pause_list.
    """
    log.debug("on_alert_torrent_paused")
    try:
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return

    # Set the torrent state
    old_state = torrent.state
    torrent.update_state()
    if torrent.state != old_state:
        component.get("EventManager").emit(TorrentStateChangedEvent(torrent_id, torrent.state))

    # Don't save resume data for each torrent after self.stop() was called.
    # We save resume data in bulk in self.stop() in this case.
    if self.save_resume_data_timer.running:
        # Write the fastresume file
        self.save_resume_data((torrent_id, ))

    if torrent_id in self.shutdown_torrent_pause_list:
        self.shutdown_torrent_pause_list.remove(torrent_id)
|
|
|
|
def on_alert_torrent_checked(self, alert):
    """Handle torrent_checked_alert: finish a forced recheck and refresh the state."""
    log.debug("on_alert_torrent_checked")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return

    # Check to see if we're forcing a recheck and set it back to paused
    # if necessary
    if torrent.forcing_recheck:
        torrent.forcing_recheck = False
        if torrent.forcing_recheck_paused:
            torrent.handle.pause()

    # Set the torrent state
    torrent.update_state()
|
|
|
|
def on_alert_tracker_reply(self, alert):
    """Handle tracker_reply_alert.

    Sets the tracker status to "Announce OK" (except for the DHT peers
    message) and sends a scrape request when the handle status reports -1
    for num_complete or num_incomplete.
    """
    log.debug("on_alert_tracker_reply: %s", alert.message().decode("utf8"))
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return

    # Set the tracker status for the torrent
    if alert.message() != "Got peers from DHT":
        torrent.set_tracker_status(_("Announce OK"))

    # Check to see if we got any peer information from the tracker
    status = alert.handle.status()
    if status.num_complete == -1 or status.num_incomplete == -1:
        # We didn't get peer information, so lets send a scrape request
        torrent.scrape_tracker()
|
|
|
|
def on_alert_tracker_announce(self, alert):
    """Handle tracker_announce_alert: set the tracker status to "Announce Sent"."""
    log.debug("on_alert_tracker_announce")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return

    # Set the tracker status for the torrent
    torrent.set_tracker_status(_("Announce Sent"))
|
|
|
|
def on_alert_tracker_warning(self, alert):
    """Handle tracker_warning_alert: show the alert message as the tracker status."""
    log.debug("on_alert_tracker_warning")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return
    tracker_status = '%s: %s' % (_("Warning"), str(alert.message()))
    # Set the tracker status for the torrent
    torrent.set_tracker_status(tracker_status)
|
|
|
|
def on_alert_tracker_error(self, alert):
    """Handle tracker_error_alert: show ``alert.msg`` as the tracker status."""
    log.debug("on_alert_tracker_error")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # Invalid handle or a torrent this manager does not know about
        return
    tracker_status = "%s: %s" % (_("Error"), alert.msg)
    torrent.set_tracker_status(tracker_status)
|
|
|
|
def on_alert_storage_moved(self, alert):
    """Handle a storage-moved alert.

    Records the handle's current save path on the torrent and turns the
    torrent's move-completed option off.  Alerts for torrents that are not
    in ``self.torrents`` are ignored.

    :param alert: alert object; only ``alert.handle`` is used
    """
    log.debug("on_alert_storage_moved")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    torrent.set_save_path(alert.handle.save_path())
    torrent.set_move_completed(False)
|
|
def on_alert_torrent_resumed(self, alert):
    """Handle a torrent-resumed alert.

    Refreshes the torrent's ``is_finished`` flag and state, emits a
    ``TorrentStateChangedEvent`` if the state changed, and always emits a
    ``TorrentResumedEvent``.  Alerts for torrents that are not in
    ``self.torrents`` are ignored.

    :param alert: alert object; only ``alert.handle`` is used
    """
    log.debug("on_alert_torrent_resumed")
    try:
        # Compute the id once, inside the guarded region, so a failing
        # handle lookup cannot escape the handler.
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    torrent.is_finished = torrent.handle.is_seed()
    old_state = torrent.state
    torrent.update_state()

    event_manager = component.get("EventManager")
    if torrent.state != old_state:
        # We need to emit a TorrentStateChangedEvent too
        event_manager.emit(TorrentStateChangedEvent(torrent_id, torrent.state))
    event_manager.emit(TorrentResumedEvent(torrent_id))
|
|
def on_alert_state_changed(self, alert):
    """Handle a state-changed alert.

    Refreshes the torrent's state and emits a ``TorrentStateChangedEvent``
    only when the state differs from its previous value.  Alerts for
    torrents that are not in ``self.torrents`` are ignored.

    :param alert: alert object; only ``alert.handle`` is used
    """
    log.debug("on_alert_state_changed")
    try:
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    old_state = torrent.state
    torrent.update_state()
    # Only emit a state changed event if the state has actually changed
    if torrent.state != old_state:
        component.get("EventManager").emit(
            TorrentStateChangedEvent(torrent_id, torrent.state))
|
|
def on_alert_save_resume_data(self, alert):
    """Handle a save-resume-data alert.

    Stores the bencoded resume data under the torrent's id in
    ``self.resume_data``, decrements ``self.num_resume_data``, clears the
    torrent's ``waiting_on_resume_data`` flag and writes the resume data
    file.  Alerts for torrents that are not in ``self.torrents`` are ignored.

    :param alert: alert object providing ``handle`` and ``resume_data``
    """
    log.debug("on_alert_save_resume_data")

    try:
        # The id lookup is inside the guarded region, like the other alert
        # handlers, so a failing handle lookup cannot escape.
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        # NOTE(review): self.num_resume_data is not decremented on this
        # path -- confirm that is intended.
        return

    # Libtorrent in add_torrent() expects resume_data to be bencoded
    self.resume_data[torrent_id] = lt.bencode(alert.resume_data)
    self.num_resume_data -= 1

    torrent.waiting_on_resume_data = False

    self.save_resume_data_file()
|
|
def on_alert_save_resume_data_failed(self, alert):
    """Handle a save-resume-data-failed alert.

    Decrements ``self.num_resume_data``, clears the torrent's
    ``waiting_on_resume_data`` flag and writes the resume data file.
    Alerts for torrents that are not in ``self.torrents`` are ignored.

    :param alert: alert object providing ``message()`` and ``handle``
    """
    log.debug("on_alert_save_resume_data_failed: %s", alert.message())
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    self.num_resume_data -= 1
    torrent.waiting_on_resume_data = False

    self.save_resume_data_file()
|
|
|
|
def on_alert_file_renamed(self, alert):
    """Handle a file-renamed alert.

    If the renamed file's index belongs to an entry of the torrent's
    ``waiting_on_folder_rename`` list, the index is removed from that entry.
    When it was the entry's last outstanding index, a
    ``TorrentFolderRenamedEvent`` is emitted, the entry is dropped and resume
    data is saved.  Otherwise the rename is treated as a plain file rename:
    a ``TorrentFileRenamedEvent`` is emitted and resume data is saved.
    Alerts for torrents that are not in ``self.torrents`` are ignored.

    :param alert: alert object providing ``index``, ``name`` and ``handle``
    """
    log.debug("on_alert_file_renamed")
    log.debug("index: %s name: %s", alert.index, alert.name.decode("utf8"))
    try:
        # Compute the id once, inside the guarded region, so a failing
        # handle lookup cannot escape the handler.
        torrent_id = str(alert.handle.info_hash())
        torrent = self.torrents[torrent_id]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    # We need to see if this file index is in a waiting_on_folder list.
    # Each entry is indexed as [0], [1], [2]; [2] holds the file indices
    # still awaiting a rename alert (presumably [0]/[1] are the old and new
    # folder names -- verify against the code that fills the list).
    folder_rename = False
    for i, wait_on_folder in enumerate(torrent.waiting_on_folder_rename):
        pending_indexes = wait_on_folder[2]
        if alert.index not in pending_indexes:
            continue

        folder_rename = True
        if len(pending_indexes) == 1:
            # This is the last alert we were waiting for, time to send signal
            component.get("EventManager").emit(
                TorrentFolderRenamedEvent(
                    torrent_id, wait_on_folder[0], wait_on_folder[1]))
            # Safe to delete while iterating: we leave the loop right away.
            del torrent.waiting_on_folder_rename[i]
            self.save_resume_data((torrent_id,))
            break

        # This isn't the last file to be renamed in this folder, so just
        # remove the index and continue
        pending_indexes.remove(alert.index)

    if not folder_rename:
        # This is just a regular file rename so send the signal
        component.get("EventManager").emit(
            TorrentFileRenamedEvent(torrent_id, alert.index, alert.name))
        self.save_resume_data((torrent_id,))
|
|
def on_alert_metadata_received(self, alert):
    """Handle a metadata-received alert.

    Asks the torrent to write out its torrent file.  Alerts for torrents
    that are not in ``self.torrents`` are ignored.

    :param alert: alert object; only ``alert.handle`` is used
    """
    log.debug("on_alert_metadata_received")
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    torrent.write_torrentfile()
|
|
def on_alert_file_error(self, alert):
    """Handle a file-error alert.

    Logs the alert message and refreshes the torrent's state.  Alerts for
    torrents that are not in ``self.torrents`` are ignored.

    :param alert: alert object providing ``message()`` and ``handle``
    """
    log.debug("on_alert_file_error: %s", alert.message())
    try:
        torrent = self.torrents[str(alert.handle.info_hash())]
    except (RuntimeError, KeyError):
        # KeyError: the torrent is not one we manage.
        # RuntimeError: presumably raised by the libtorrent bindings for an
        # invalid handle -- TODO confirm.
        return

    torrent.update_state()
|
|
def on_alert_file_completed(self, alert):
    """Handle a file-completed alert.

    Emits a ``TorrentFileCompletedEvent`` carrying the torrent id and the
    completed file's index.  Like the other alert handlers, alerts whose
    handle cannot be resolved or whose torrent is not in ``self.torrents``
    are ignored.

    :param alert: alert object providing ``message()``, ``index`` and
        ``handle``
    """
    log.debug("file_completed_alert: %s", alert.message())
    try:
        torrent_id = str(alert.handle.info_hash())
    except RuntimeError:
        # Presumably raised by the libtorrent bindings for an invalid
        # handle -- TODO confirm.
        return

    if torrent_id not in self.torrents:
        # Do not announce completion for a torrent we do not manage.
        return

    component.get("EventManager").emit(
        TorrentFileCompletedEvent(torrent_id, alert.index))