lt 0.13 more fix

This commit is contained in:
Marcos Pinto 2007-05-25 20:23:37 +00:00
commit ac3690fdee
20 changed files with 684 additions and 586 deletions

View file

@ -77,6 +77,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/stat.hpp" #include "libtorrent/stat.hpp"
#include "libtorrent/file_pool.hpp" #include "libtorrent/file_pool.hpp"
#include "libtorrent/bandwidth_manager.hpp" #include "libtorrent/bandwidth_manager.hpp"
#include "libtorrent/natpmp.hpp"
#include "libtorrent/upnp.hpp"
#include "libtorrent/lsd.hpp"
#include "libtorrent/socket_type.hpp"
#include "libtorrent/connection_queue.hpp"
namespace libtorrent namespace libtorrent
{ {
@ -104,6 +109,7 @@ namespace libtorrent
std::vector<int> piece_map; std::vector<int> piece_map;
std::vector<piece_picker::downloading_piece> unfinished_pieces; std::vector<piece_picker::downloading_piece> unfinished_pieces;
std::vector<piece_picker::block_info> block_info;
std::vector<tcp::endpoint> peers; std::vector<tcp::endpoint> peers;
entry resume_data; entry resume_data;
@ -163,12 +169,10 @@ namespace libtorrent
#endif #endif
friend struct checker_impl; friend struct checker_impl;
friend class invariant_access; friend class invariant_access;
typedef std::map<boost::shared_ptr<stream_socket> typedef std::map<boost::shared_ptr<socket_type>
, boost::intrusive_ptr<peer_connection> > , boost::intrusive_ptr<peer_connection> >
connection_map; connection_map;
typedef std::map<sha1_hash, boost::shared_ptr<torrent> > torrent_map; typedef std::map<sha1_hash, boost::shared_ptr<torrent> > torrent_map;
typedef std::deque<boost::intrusive_ptr<peer_connection> >
connection_queue;
session_impl( session_impl(
std::pair<int, int> listen_port_range std::pair<int, int> listen_port_range
@ -184,7 +188,7 @@ namespace libtorrent
void open_listen_port(); void open_listen_port();
void async_accept(); void async_accept();
void on_incoming_connection(boost::shared_ptr<stream_socket> const& s void on_incoming_connection(boost::shared_ptr<socket_type> const& s
, boost::weak_ptr<socket_acceptor> const& as, asio::error_code const& e); , boost::weak_ptr<socket_acceptor> const& as, asio::error_code const& e);
// must be locked to access the data // must be locked to access the data
@ -195,14 +199,8 @@ namespace libtorrent
boost::weak_ptr<torrent> find_torrent(const sha1_hash& info_hash); boost::weak_ptr<torrent> find_torrent(const sha1_hash& info_hash);
peer_id const& get_peer_id() const { return m_peer_id; } peer_id const& get_peer_id() const { return m_peer_id; }
// this will see if there are any pending connection attempts
// and in that case initiate new connections until the limit
// is reached.
void process_connection_queue();
void close_connection(boost::intrusive_ptr<peer_connection> const& p); void close_connection(boost::intrusive_ptr<peer_connection> const& p);
void connection_completed(boost::intrusive_ptr<peer_connection> const& p); void connection_failed(boost::shared_ptr<socket_type> const& s
void connection_failed(boost::shared_ptr<stream_socket> const& s
, tcp::endpoint const& a, char const* message); , tcp::endpoint const& a, char const* message);
void set_settings(session_settings const& s); void set_settings(session_settings const& s);
@ -213,11 +211,16 @@ namespace libtorrent
void add_dht_node(udp::endpoint n); void add_dht_node(udp::endpoint n);
void add_dht_router(std::pair<std::string, int> const& node); void add_dht_router(std::pair<std::string, int> const& node);
void set_dht_settings(dht_settings const& s); void set_dht_settings(dht_settings const& s);
dht_settings const& kad_settings() const { return m_dht_settings; } dht_settings const& get_dht_settings() const { return m_dht_settings; }
void start_dht(entry const& startup_state); void start_dht(entry const& startup_state);
void stop_dht(); void stop_dht();
entry dht_state() const; entry dht_state() const;
#endif #endif
// called when a port mapping is successful, or a router returns
// a failure to map a port
void on_port_mapping(int tcp_port, int udp_port, std::string const& errmsg);
bool is_aborted() const { return m_abort; } bool is_aborted() const { return m_abort; }
void set_ip_filter(ip_filter const& f); void set_ip_filter(ip_filter const& f);
@ -232,7 +235,8 @@ namespace libtorrent
, boost::filesystem::path const& save_path , boost::filesystem::path const& save_path
, entry const& resume_data , entry const& resume_data
, bool compact_mode , bool compact_mode
, int block_size); , int block_size
, storage_constructor_type sc);
torrent_handle add_torrent( torrent_handle add_torrent(
char const* tracker_url char const* tracker_url
@ -241,7 +245,8 @@ namespace libtorrent
, boost::filesystem::path const& save_path , boost::filesystem::path const& save_path
, entry const& resume_data , entry const& resume_data
, bool compact_mode , bool compact_mode
, int block_size); , int block_size
, storage_constructor_type sc);
void remove_torrent(torrent_handle const& h); void remove_torrent(torrent_handle const& h);
@ -271,18 +276,56 @@ namespace libtorrent
torrent_handle find_torrent_handle(sha1_hash const& info_hash); torrent_handle find_torrent_handle(sha1_hash const& info_hash);
void announce_lsd(sha1_hash const& ih);
void set_peer_proxy(proxy_settings const& s)
{ m_peer_proxy = s; }
void set_web_seed_proxy(proxy_settings const& s)
{ m_web_seed_proxy = s; }
void set_tracker_proxy(proxy_settings const& s)
{ m_tracker_proxy = s; }
proxy_settings const& peer_proxy() const
{ return m_peer_proxy; }
proxy_settings const& web_seed_proxy() const
{ return m_web_seed_proxy; }
proxy_settings const& tracker_proxy() const
{ return m_tracker_proxy; }
#ifndef TORRENT_DISABLE_DHT
void set_dht_proxy(proxy_settings const& s)
{ m_dht_proxy = s; }
proxy_settings const& dht_proxy() const
{ return m_dht_proxy; }
#endif
// handles delayed alerts // handles delayed alerts
alert_manager m_alerts; alert_manager m_alerts;
// private: // private:
void on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih);
// this is where all active sockets are stored. // this is where all active sockets are stored.
// the selector can sleep while there's no activity on // the selector can sleep while there's no activity on
// them // them
io_service m_io_service; io_service m_io_service;
asio::strand m_strand; asio::strand m_strand;
// the file pool that all storages in this session's
// torrents uses. It sets a limit on the number of
// open files by this session.
// file pool must be destructed after the torrents
// since they will still have references to it
// when they are destructed.
file_pool m_files;
// this is a list of half-open tcp connections
// (only outgoing connections)
// this has to be one of the last
// members to be destructed
connection_queue m_half_open;
// the bandwidth manager is responsible for // the bandwidth manager is responsible for
// handing out bandwidth to connections that // handing out bandwidth to connections that
// asks for it, it can also throttle the // asks for it, it can also throttle the
@ -298,16 +341,6 @@ namespace libtorrent
// peers. // peers.
connection_map m_connections; connection_map m_connections;
// this is a list of half-open tcp connections
// (only outgoing connections)
connection_map m_half_open;
// this is a queue of pending outgoing connections. If the
// list of half-open connections is full (given the global
// limit), new outgoing connections are put on this queue,
// waiting for one slot in the half-open queue to open up.
connection_queue m_connection_queue;
// filters incoming connections // filters incoming connections
ip_filter m_ip_filter; ip_filter m_ip_filter;
@ -329,10 +362,27 @@ namespace libtorrent
// interface to listen on // interface to listen on
tcp::endpoint m_listen_interface; tcp::endpoint m_listen_interface;
// this is typically set to the same as the local
// listen port. In case a NAT port forward was
// successfully opened, this will be set to the
// port that is open on the external (NAT) interface
// on the NAT box itself. This is the port that has
// to be published to peers, since this is the port
// the client is reachable through.
int m_external_listen_port;
boost::shared_ptr<socket_acceptor> m_listen_socket; boost::shared_ptr<socket_acceptor> m_listen_socket;
// the settings for the client // the settings for the client
session_settings m_settings; session_settings m_settings;
// the proxy settings for different
// kinds of connections
proxy_settings m_peer_proxy;
proxy_settings m_web_seed_proxy;
proxy_settings m_tracker_proxy;
#ifndef TORRENT_DISABLE_DHT
proxy_settings m_dht_proxy;
#endif
// set to true when the session object // set to true when the session object
// is being destructed and the thread // is being destructed and the thread
@ -341,9 +391,6 @@ namespace libtorrent
int m_max_uploads; int m_max_uploads;
int m_max_connections; int m_max_connections;
// the number of simultaneous half-open tcp
// connections libtorrent will have.
int m_half_open_limit;
// statistics gathered from all torrents. // statistics gathered from all torrents.
stat m_stat; stat m_stat;
@ -354,23 +401,42 @@ namespace libtorrent
// NAT or not. // NAT or not.
bool m_incoming_connection; bool m_incoming_connection;
// the file pool that all storages in this session's
// torrents uses. It sets a limit on the number of
// open files by this session.
file_pool m_files;
void second_tick(asio::error_code const& e); void second_tick(asio::error_code const& e);
boost::posix_time::ptime m_last_tick; ptime m_last_tick;
#ifndef TORRENT_DISABLE_DHT #ifndef TORRENT_DISABLE_DHT
boost::intrusive_ptr<dht::dht_tracker> m_dht; boost::intrusive_ptr<dht::dht_tracker> m_dht;
dht_settings m_dht_settings; dht_settings m_dht_settings;
// if this is set to true, the dht listen port
// will be set to the same as the tcp listen port
// and will be synchronlized with it as it changes
// it defaults to true
bool m_dht_same_port;
// see m_external_listen_port. This is the same
// but for the udp port used by the DHT.
int m_external_udp_port;
#endif #endif
natpmp m_natpmp;
upnp m_upnp;
lsd m_lsd;
// the timer used to fire the second_tick // the timer used to fire the second_tick
deadline_timer m_timer; deadline_timer m_timer;
// the index of the torrent that will be offered to
// connect to a peer next time second_tick is called.
// This implements a round robin.
int m_next_connect_torrent;
#ifndef NDEBUG #ifndef NDEBUG
void check_invariant(const char *place = 0); void check_invariant(const char *place = 0);
#endif #endif
#ifdef TORRENT_STATS
// logger used to write bandwidth usage statistics
std::ofstream m_stats_logger;
int m_second_counter;
#endif
#if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING)
boost::shared_ptr<logger> create_log(std::string const& name boost::shared_ptr<logger> create_log(std::string const& name
, int instance, bool append = true); , int instance, bool append = true);
@ -380,9 +446,6 @@ namespace libtorrent
// whe shutting down process // whe shutting down process
std::list<boost::shared_ptr<tracker_logger> > m_tracker_loggers; std::list<boost::shared_ptr<tracker_logger> > m_tracker_loggers;
// logger used to write bandwidth usage statistics
boost::shared_ptr<logger> m_stats_logger;
int m_second_counter;
public: public:
boost::shared_ptr<logger> m_logger; boost::shared_ptr<logger> m_logger;
private: private:

View file

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#endif #endif
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include "libtorrent/config.hpp"
#ifdef _MSC_VER #ifdef _MSC_VER
#pragma warning(pop) #pragma warning(pop)
@ -47,7 +48,7 @@ namespace libtorrent
{ {
struct torrent_plugin; struct torrent_plugin;
class torrent; class torrent;
boost::shared_ptr<torrent_plugin> create_metadata_plugin(torrent*); TORRENT_EXPORT boost::shared_ptr<torrent_plugin> create_metadata_plugin(torrent*);
} }
#endif // TORRENT_METADATA_TRANSFER_HPP_INCLUDED #endif // TORRENT_METADATA_TRANSFER_HPP_INCLUDED

View file

@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE.
#endif #endif
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include "libtorrent/config.hpp"
#ifdef _MSC_VER #ifdef _MSC_VER
#pragma warning(pop) #pragma warning(pop)
@ -47,7 +48,7 @@ namespace libtorrent
{ {
struct torrent_plugin; struct torrent_plugin;
class torrent; class torrent;
boost::shared_ptr<torrent_plugin> create_ut_pex_plugin(torrent*); TORRENT_EXPORT boost::shared_ptr<torrent_plugin> create_ut_pex_plugin(torrent*);
} }
#endif // TORRENT_UT_PEX_EXTENSION_HPP_INCLUDED #endif // TORRENT_UT_PEX_EXTENSION_HPP_INCLUDED

View file

@ -38,6 +38,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/traversal_algorithm.hpp> #include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
@ -80,6 +82,35 @@ private:
done_callback m_done_callback; done_callback m_done_callback;
}; };
class closest_nodes_observer : public observer
{
public:
closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~closest_nodes_observer();
void send(msg& p)
{
p.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_target;
node_id const m_self;
};
} } // namespace libtorrent::dht } } // namespace libtorrent::dht
#endif // CLOSEST_NODES_050323_HPP #endif // CLOSEST_NODES_050323_HPP

View file

@ -39,8 +39,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <set> #include <set>
#include <numeric> #include <numeric>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/ref.hpp> #include <boost/ref.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
@ -120,7 +118,7 @@ namespace libtorrent { namespace dht
udp::endpoint m_remote_endpoint[2]; udp::endpoint m_remote_endpoint[2];
std::vector<char> m_send_buf; std::vector<char> m_send_buf;
boost::posix_time::ptime m_last_refresh; ptime m_last_new_key;
deadline_timer m_timer; deadline_timer m_timer;
deadline_timer m_connection_timer; deadline_timer m_connection_timer;
deadline_timer m_refresh_timer; deadline_timer m_refresh_timer;

View file

@ -40,8 +40,10 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/packet_iterator.hpp> #include <libtorrent/kademlia/packet_iterator.hpp>
#include <boost/optional.hpp> #include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/optional.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
@ -89,6 +91,37 @@ private:
bool m_done; bool m_done;
}; };
class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~find_data_observer();
void send(msg& m)
{
m.reply = false;
m.message_id = messages::get_peers;
m.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<find_data> m_algorithm;
node_id const m_target;
node_id const m_self;
};
} } // namespace libtorrent::dht } } // namespace libtorrent::dht
#endif // FIND_DATA_050323_HPP #endif // FIND_DATA_050323_HPP

View file

@ -41,17 +41,18 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp> #include <libtorrent/io.hpp>
#include <libtorrent/session_settings.hpp> #include <libtorrent/session_settings.hpp>
#include <boost/cstdint.hpp> #include <boost/cstdint.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/iterator/transform_iterator.hpp> #include <boost/iterator/transform_iterator.hpp>
#include <boost/ref.hpp> #include <boost/ref.hpp>
#include "libtorrent/socket.hpp"
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
@ -67,7 +68,7 @@ TORRENT_DECLARE_LOG(node);
struct peer_entry struct peer_entry
{ {
tcp::endpoint addr; tcp::endpoint addr;
boost::posix_time::ptime added; ptime added;
}; };
// this is a group. It contains a set of group members // this is a group. It contains a set of group members
@ -85,6 +86,75 @@ inline bool operator<(peer_entry const& lhs, peer_entry const& rhs)
struct null_type {}; struct null_type {};
class announce_observer : public observer
{
public:
announce_observer(boost::pool<>& allocator
, sha1_hash const& info_hash
, int listen_port
, entry const& write_token)
: observer(allocator)
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_token(write_token)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
m.write_token = m_token;
}
void timeout() {}
void reply(msg const&) {}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
entry m_token;
};
class get_peers_observer : public observer
{
public:
get_peers_observer(sha1_hash const& info_hash
, int listen_port
, rpc_manager& rpc
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
: observer(rpc.allocator())
, m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_rpc(rpc)
, m_fun(f)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
}
void timeout() {}
void reply(msg const& r)
{
m_rpc.invoke(messages::announce_peer, r.addr
, observer_ptr(new (m_rpc.allocator().malloc()) announce_observer(
m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token)));
m_fun(r.peers, m_info_hash);
}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
rpc_manager& m_rpc;
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
class node_impl : boost::noncopyable class node_impl : boost::noncopyable
{ {
typedef std::map<node_id, torrent_entry> table_t; typedef std::map<node_id, torrent_entry> table_t;
@ -116,13 +186,17 @@ public:
node_id const& nid() const { return m_id; } node_id const& nid() const { return m_id; }
boost::tuple<int, int> size() const{ return m_table.size(); } boost::tuple<int, int> size() const{ return m_table.size(); }
size_type num_global_nodes() const
{ return m_table.num_global_nodes(); }
data_iterator begin_data() { return m_map.begin(); } data_iterator begin_data() { return m_map.begin(); }
data_iterator end_data() { return m_map.end(); } data_iterator end_data() { return m_map.end(); }
int data_size() const { return int(m_map.size()); } int data_size() const { return int(m_map.size()); }
#ifdef TORRENT_DHT_VERBOSE_LOGGING
void print_state(std::ostream& os) const void print_state(std::ostream& os) const
{ m_table.print_state(os); } { m_table.print_state(os); }
#endif
void announce(sha1_hash const& info_hash, int listen_port void announce(sha1_hash const& info_hash, int listen_port
, boost::function<void(std::vector<tcp::endpoint> const& , boost::function<void(std::vector<tcp::endpoint> const&
@ -133,8 +207,8 @@ public:
// the returned time is the delay until connection_timeout() // the returned time is the delay until connection_timeout()
// should be called again the next time // should be called again the next time
boost::posix_time::time_duration connection_timeout(); time_duration connection_timeout();
boost::posix_time::time_duration refresh_timeout(); time_duration refresh_timeout();
// generates a new secret number used to generate write tokens // generates a new secret number used to generate write tokens
void new_write_key(); void new_write_key();
@ -172,7 +246,7 @@ private:
rpc_manager m_rpc; rpc_manager m_rpc;
table_t m_map; table_t m_map;
boost::posix_time::ptime m_last_tracker_tick; ptime m_last_tracker_tick;
// secret random numbers used to create write tokens // secret random numbers used to create write tokens
int m_secret[2]; int m_secret[2];

View file

@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/traversal_algorithm.hpp> #include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
@ -98,6 +100,59 @@ private:
std::vector<result>::iterator m_leftover_nodes_iterator; std::vector<result>::iterator m_leftover_nodes_iterator;
}; };
class refresh_observer : public observer
{
public:
refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
, node_id target)
: observer(algorithm->allocator())
, m_target(target)
, m_self(self)
, m_algorithm(algorithm)
{}
~refresh_observer();
void send(msg& m)
{
m.info_hash = m_target;
}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_target;
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
class ping_observer : public observer
{
public:
ping_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self)
: observer(algorithm->allocator())
, m_self(self)
, m_algorithm(algorithm)
{}
~ping_observer();
void send(msg& p) {}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
template<class InIt> template<class InIt>
inline refresh::refresh( inline refresh::refresh(
node_id target node_id target

View file

@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector> #include <vector>
#include <deque> #include <deque>
#include <boost/cstdint.hpp> #include <boost/cstdint.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/iterator/iterator_facade.hpp> #include <boost/iterator/iterator_facade.hpp>
#include <boost/iterator/iterator_categories.hpp> #include <boost/iterator/iterator_categories.hpp>
@ -50,8 +49,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/node_entry.hpp> #include <libtorrent/kademlia/node_entry.hpp>
#include <libtorrent/session_settings.hpp> #include <libtorrent/session_settings.hpp>
#include <libtorrent/size_type.hpp>
namespace pt = boost::posix_time;
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
@ -60,7 +58,7 @@ using asio::ip::udp;
//TORRENT_DECLARE_LOG(table); //TORRENT_DECLARE_LOG(table);
typedef std::deque<node_entry> bucket_t; typedef std::vector<node_entry> bucket_t;
// differences in the implementation from the description in // differences in the implementation from the description in
// the paper: // the paper:
@ -104,7 +102,7 @@ namespace aux
, bucket_iterator_t end) , bucket_iterator_t end)
: m_bucket_iterator(begin) : m_bucket_iterator(begin)
, m_bucket_end(end) , m_bucket_end(end)
, m_iterator(begin != end ? begin->first.begin() : bucket_t::iterator()) , m_iterator(begin != end ? begin->first.begin() : bucket_t::const_iterator())
{ {
if (m_bucket_iterator == m_bucket_end) return; if (m_bucket_iterator == m_bucket_end) return;
while (m_iterator == m_bucket_iterator->first.end()) while (m_iterator == m_bucket_iterator->first.end())
@ -177,7 +175,7 @@ public:
// if the given bucket is empty but there are nodes // if the given bucket is empty but there are nodes
// in a bucket closer to us, or if the bucket is non-empty and // in a bucket closer to us, or if the bucket is non-empty and
// the time from the last activity is more than 15 minutes // the time from the last activity is more than 15 minutes
boost::posix_time::ptime next_refresh(int bucket); ptime next_refresh(int bucket);
// fills the vector with the count nodes from our buckets that // fills the vector with the count nodes from our buckets that
// are nearest to the given id. // are nearest to the given id.
@ -204,16 +202,20 @@ public:
iterator end() const; iterator end() const;
boost::tuple<int, int> size() const; boost::tuple<int, int> size() const;
size_type num_global_nodes() const;
// returns true if there are no working nodes // returns true if there are no working nodes
// in the routing table // in the routing table
bool need_bootstrap() const; bool need_bootstrap() const;
int num_active_buckets() const
{ return 160 - m_lowest_active_bucket + 1; }
void replacement_cache(bucket_t& nodes) const; void replacement_cache(bucket_t& nodes) const;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
// used for debug and monitoring purposes. This will print out // used for debug and monitoring purposes. This will print out
// the state of the routing table to the given stream // the state of the routing table to the given stream
void print_state(std::ostream& os) const; void print_state(std::ostream& os) const;
#endif
private: private:
@ -226,7 +228,7 @@ private:
typedef boost::array<std::pair<bucket_t, bucket_t>, 160> table_t; typedef boost::array<std::pair<bucket_t, bucket_t>, 160> table_t;
table_t m_buckets; table_t m_buckets;
// timestamps of the last activity in each bucket // timestamps of the last activity in each bucket
typedef boost::array<boost::posix_time::ptime, 160> table_activity_t; typedef boost::array<ptime, 160> table_activity_t;
table_activity_t m_bucket_activity; table_activity_t m_bucket_activity;
node_id m_id; // our own node id node_id m_id; // our own node id

View file

@ -36,11 +36,11 @@ POSSIBILITY OF SUCH DAMAGE.
#include <vector> #include <vector>
#include <map> #include <map>
#include <boost/function.hpp> #include <boost/function.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/cstdint.hpp> #include <boost/cstdint.hpp>
#include <boost/array.hpp> #include <boost/array.hpp>
#include <boost/pool/pool.hpp>
#include <libtorrent/socket.hpp> #include <libtorrent/socket.hpp>
#include <libtorrent/entry.hpp> #include <libtorrent/entry.hpp>
@ -48,95 +48,27 @@ POSSIBILITY OF SUCH DAMAGE.
#include <libtorrent/kademlia/node_id.hpp> #include <libtorrent/kademlia/node_id.hpp>
#include <libtorrent/kademlia/logging.hpp> #include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/node_entry.hpp> #include <libtorrent/kademlia/node_entry.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include "libtorrent/time.hpp"
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
struct observer;
using asio::ip::udp; using asio::ip::udp;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DECLARE_LOG(rpc); TORRENT_DECLARE_LOG(rpc);
#endif #endif
typedef std::vector<char> packet_t; struct null_observer : public observer
namespace messages
{ {
enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 }; null_observer(boost::pool<>& allocator): observer(allocator) {}
char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" }; virtual void reply(msg const&) {}
} // namespace messages virtual void timeout() {}
virtual void send(msg&) {}
struct msg void abort() {}
{
msg() : reply(false), piggy_backed_ping(false)
, port(0) {}
// true if this message is a reply
bool reply;
// true if this is a reply with a piggy backed ping
bool piggy_backed_ping;
// the kind if message
int message_id;
// if this is a reply, a copy of the transaction id
// from the request. If it's a request, a transaction
// id that should be sent back in the reply
std::string transaction_id;
// if this packet has a piggy backed ping, this
// is the transaction id of that ping
std::string ping_transaction_id;
// the node id of the process sending the message
node_id id;
// the address of the process sending or receiving
// the message.
udp::endpoint addr;
// if this is a nodes response, these are the nodes
typedef std::vector<node_entry> nodes_t;
nodes_t nodes;
typedef std::vector<tcp::endpoint> peers_t;
peers_t peers;
// similar to transaction_id but for write operations.
entry write_token;
// the info has for peer_requests, announce_peer
// and responses
node_id info_hash;
// port for announce_peer messages
int port;
// ERROR MESSAGES
int error_code;
std::string error_msg;
};
struct observer : boost::noncopyable
{
observer()
: sent(boost::posix_time::microsec_clock::universal_time())
{}
virtual ~observer() {}
// this two callbacks lets the observer add
// information to the message before it's sent
virtual void send(msg& m) = 0;
// this is called when a reply is received
virtual void reply(msg const& m) = 0;
// this is called when no reply has been received within
// some timeout
virtual void timeout() = 0;
// if this is called the destructor should
// not invoke any new messages, and should
// only clean up. It means the rpc-manager
// is being destructed
virtual void abort() = 0;
udp::endpoint target_addr;
boost::posix_time::ptime sent;
}; };
class routing_table; class routing_table;
@ -153,31 +85,36 @@ public:
// returns true if the node needs a refresh // returns true if the node needs a refresh
bool incoming(msg const&); bool incoming(msg const&);
boost::posix_time::time_duration tick(); time_duration tick();
void invoke(int message_id, udp::endpoint target void invoke(int message_id, udp::endpoint target
, boost::shared_ptr<observer> o); , observer_ptr o);
void reply(msg& m, msg const& reply_to); void reply(msg& m);
void reply_with_ping(msg& m, msg const& reply_to); void reply_with_ping(msg& m);
#ifndef NDEBUG #ifndef NDEBUG
void check_invariant() const; void check_invariant() const;
#endif #endif
boost::pool<>& allocator() const
{ return m_pool_allocator; }
private: private:
enum { max_transactions = 2048 }; enum { max_transactions = 2048 };
unsigned int new_transaction_id(boost::shared_ptr<observer> o); unsigned int new_transaction_id(observer_ptr o);
void update_oldest_transaction_id(); void update_oldest_transaction_id();
boost::uint32_t calc_connection_id(udp::endpoint addr); boost::uint32_t calc_connection_id(udp::endpoint addr);
typedef boost::array<boost::shared_ptr<observer>, max_transactions> mutable boost::pool<> m_pool_allocator;
typedef boost::array<observer_ptr, max_transactions>
transactions_t; transactions_t;
transactions_t m_transactions; transactions_t m_transactions;
std::vector<boost::shared_ptr<observer> > m_aborted_transactions; std::vector<observer_ptr > m_aborted_transactions;
// this is the next transaction id to be used // this is the next transaction id to be used
int m_next_transaction_id; int m_next_transaction_id;
@ -191,7 +128,7 @@ private:
send_fun m_send; send_fun m_send;
node_id m_our_id; node_id m_our_id;
routing_table& m_table; routing_table& m_table;
boost::posix_time::ptime m_timer; ptime m_timer;
node_id m_random_number; node_id m_random_number;
bool m_destructing; bool m_destructing;
}; };

View file

@ -43,6 +43,7 @@ POSSIBILITY OF SUCH DAMAGE.
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/intrusive_ptr.hpp> #include <boost/intrusive_ptr.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/pool/pool.hpp>
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
@ -60,6 +61,7 @@ public:
void finished(node_id const& id); void finished(node_id const& id);
void failed(node_id const& id, bool prevent_request = false); void failed(node_id const& id, bool prevent_request = false);
virtual ~traversal_algorithm() {} virtual ~traversal_algorithm() {}
boost::pool<>& allocator() const;
protected: protected:
template<class InIt> template<class InIt>

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/closest_nodes.hpp> #include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
@ -39,36 +41,6 @@ namespace libtorrent { namespace dht
using asio::ip::udp; using asio::ip::udp;
typedef boost::shared_ptr<observer> observer_ptr;
class closest_nodes_observer : public observer
{
public:
closest_nodes_observer(
boost::intrusive_ptr<traversal_algorithm> const& algorithm
, node_id self
, node_id target)
: m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~closest_nodes_observer();
void send(msg& p)
{
p.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<traversal_algorithm> m_algorithm;
node_id const m_target;
node_id const m_self;
};
closest_nodes_observer::~closest_nodes_observer() closest_nodes_observer::~closest_nodes_observer()
{ {
if (m_algorithm) m_algorithm->failed(m_self, true); if (m_algorithm) m_algorithm->failed(m_self, true);
@ -127,8 +99,8 @@ closest_nodes::closest_nodes(
void closest_nodes::invoke(node_id const& id, udp::endpoint addr) void closest_nodes::invoke(node_id const& id, udp::endpoint addr)
{ {
observer_ptr p(new closest_nodes_observer(this, id, m_target)); observer_ptr o(new (m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target));
m_rpc.invoke(messages::find_node, addr, p); m_rpc.invoke(messages::find_node, addr, o);
} }
void closest_nodes::done() void closest_nodes::done()

View file

@ -30,13 +30,13 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <fstream> #include <fstream>
#include <set> #include <set>
#include <numeric> #include <numeric>
#include <stdexcept> #include <stdexcept>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/date_time/posix_time/ptime.hpp>
#include <boost/ref.hpp> #include <boost/ref.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/lexical_cast.hpp> #include <boost/lexical_cast.hpp>
@ -52,14 +52,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/io.hpp" #include "libtorrent/io.hpp"
#include "libtorrent/version.hpp" #include "libtorrent/version.hpp"
using boost::posix_time::ptime;
using boost::posix_time::time_duration;
using boost::posix_time::second_clock;
using boost::posix_time::microsec_clock;
using boost::posix_time::seconds;
using boost::posix_time::minutes;
using boost::posix_time::hours;
using boost::posix_time::milliseconds;
using boost::ref; using boost::ref;
using boost::lexical_cast; using boost::lexical_cast;
using libtorrent::dht::node_impl; using libtorrent::dht::node_impl;
@ -70,6 +62,11 @@ using libtorrent::dht::packet_iterator;
namespace messages = libtorrent::dht::messages; namespace messages = libtorrent::dht::messages;
using namespace libtorrent::detail; using namespace libtorrent::detail;
enum
{
key_refresh = 5 // generate a new write token key every 5 minutes
};
using asio::ip::udp; using asio::ip::udp;
typedef asio::ip::address_v4 address; typedef asio::ip::address_v4 address;
@ -155,7 +152,7 @@ namespace libtorrent { namespace dht
, m_dht(bind(&dht_tracker::send_packet, this, _1), settings , m_dht(bind(&dht_tracker::send_packet, this, _1), settings
, read_id(bootstrap)) , read_id(bootstrap))
, m_buffer(0) , m_buffer(0)
, m_last_refresh(second_clock::universal_time() - hours(1)) , m_last_new_key(time_now() - minutes(key_refresh))
, m_timer(ios) , m_timer(ios)
, m_connection_timer(ios) , m_connection_timer(ios)
, m_refresh_timer(ios) , m_refresh_timer(ios)
@ -215,7 +212,7 @@ namespace libtorrent { namespace dht
m_connection_timer.async_wait(m_strand.wrap( m_connection_timer.async_wait(m_strand.wrap(
bind(&dht_tracker::connection_timeout, self(), _1))); bind(&dht_tracker::connection_timeout, self(), _1)));
m_refresh_timer.expires_from_now(minutes(15)); m_refresh_timer.expires_from_now(seconds(5));
m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1))); m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1)));
m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self())); m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self()));
@ -233,6 +230,7 @@ namespace libtorrent { namespace dht
{ {
boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size(); boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size();
s.dht_torrents = m_dht.data_size(); s.dht_torrents = m_dht.data_size();
s.dht_global_nodes = m_dht.num_global_nodes();
} }
void dht_tracker::connection_timeout(asio::error_code const& e) void dht_tracker::connection_timeout(asio::error_code const& e)
@ -281,7 +279,15 @@ namespace libtorrent { namespace dht
m_timer.expires_from_now(minutes(tick_period)); m_timer.expires_from_now(minutes(tick_period));
m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1))); m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1)));
ptime now = time_now();
if (now - m_last_new_key > minutes(key_refresh))
{
m_last_new_key = now;
m_dht.new_write_key(); m_dht.new_write_key();
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string() << " new write key";
#endif
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
static bool first = true; static bool first = true;
@ -304,9 +310,7 @@ namespace libtorrent { namespace dht
if (first) if (first)
{ {
first = false; first = false;
using boost::posix_time::to_simple_string; pc << "\n\n ***** starting log at " << time_now_string() << " *****\n\n"
pc << "\n\n ***** starting log at " << to_simple_string(
second_clock::universal_time()) << " *****\n\n"
<< "minute:active nodes:passive nodes" << "minute:active nodes:passive nodes"
":ping replies sent:ping queries recvd:ping" ":ping replies sent:ping queries recvd:ping"
":ping replies sent:ping queries recvd:ping" ":ping replies sent:ping queries recvd:ping"
@ -409,9 +413,8 @@ namespace libtorrent { namespace dht
, m_in_buf[current_buffer].end()); , m_in_buf[current_buffer].end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << microsec_clock::universal_time() TORRENT_LOG(dht_tracker) << time_now_string() << " RECEIVED ["
<< " RECEIVED [" << m_remote_endpoint[current_buffer] << m_remote_endpoint[current_buffer] << "]:";
<< "]:";
#endif #endif
libtorrent::dht::msg m; libtorrent::dht::msg m;
@ -453,7 +456,7 @@ namespace libtorrent { namespace dht
} }
else else
{ {
TORRENT_LOG(dht_tracker) << " client: generic"; TORRENT_LOG(dht_tracker) << " client: " << client;
} }
} }
catch (std::exception&) catch (std::exception&)
@ -625,9 +628,10 @@ namespace libtorrent { namespace dht
m.error_msg = list.back().string(); m.error_msg = list.back().string();
m.error_code = list.front().integer(); m.error_code = list.front().integer();
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " " TORRENT_LOG(dht_tracker) << " incoming error: " << m.error_code << " "
<< m.error_msg; << m.error_msg;
#endif #endif
throw std::runtime_error("DHT error message");
} }
else else
{ {
@ -646,14 +650,17 @@ namespace libtorrent { namespace dht
} }
TORRENT_LOG(dht_tracker) << e; TORRENT_LOG(dht_tracker) << e;
#endif #endif
assert(m.message_id != messages::error);
m_dht.incoming(m); m_dht.incoming(m);
} }
catch (std::exception& e) catch (std::exception& e)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
int current_buffer = (m_buffer + 1) & 1;
std::string msg(m_in_buf[current_buffer].begin()
, m_in_buf[current_buffer].begin() + bytes_transferred);
TORRENT_LOG(dht_tracker) << "invalid incoming packet: " TORRENT_LOG(dht_tracker) << "invalid incoming packet: "
<< e.what(); << e.what() << "\n" << msg << "\n";
#endif #endif
} }
} }
@ -737,58 +744,9 @@ namespace libtorrent { namespace dht
void dht_tracker::on_bootstrap() void dht_tracker::on_bootstrap()
{} {}
void dht_tracker::send_packet(msg const& m) namespace
{ {
using libtorrent::bencode; void write_nodes_entry(entry& r, libtorrent::dht::msg const& m)
using libtorrent::entry;
entry e(entry::dictionary_t);
e["t"] = m.transaction_id;
std::string version_str("LT ");
std::string::iterator i = version_str.begin() + 2;
detail::write_uint8(LIBTORRENT_VERSION_MAJOR, i);
detail::write_uint8(LIBTORRENT_VERSION_MINOR, i);
e["v"] = version_str;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << microsec_clock::universal_time()
<< " SENDING [" << m.addr << "]:";
TORRENT_LOG(dht_tracker) << " transaction: " << m.transaction_id;
// e.print(std::cerr);
#endif
if (m.message_id == messages::error)
{
assert(m.reply);
e["y"] = "e";
entry error_list(entry::list_t);
error_list.list().push_back(entry(m.error_code));
error_list.list().push_back(entry(m.error_msg));
e["e"] = error_list;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " "
<< m.error_msg;
#endif
}
else if (m.reply)
{
e["y"] = "r";
e["r"] = entry(entry::dictionary_t);
entry& r = e["r"];
r["id"] = std::string(m.id.begin(), m.id.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " reply: "
<< messages::ids[m.message_id];
#endif
if (m.write_token.type() != entry::undefined_t)
r["token"] = m.write_token;
switch (m.message_id)
{
case messages::ping:
break;
case messages::find_node:
{ {
bool ipv6_nodes = false; bool ipv6_nodes = false;
r["nodes"] = entry(entry::string_t); r["nodes"] = entry(entry::string_t);
@ -811,50 +769,99 @@ namespace libtorrent { namespace dht
r["nodes2"] = entry(entry::list_t); r["nodes2"] = entry(entry::list_t);
entry& p = r["nodes2"]; entry& p = r["nodes2"];
std::string endpoint; std::string endpoint;
endpoint.resize(6);
for (msg::nodes_t::const_iterator i = m.nodes.begin() for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i) , end(m.nodes.end()); i != end; ++i)
{ {
if (!i->addr.address().is_v6()) continue;
endpoint.resize(18 + 20);
std::string::iterator out = endpoint.begin(); std::string::iterator out = endpoint.begin();
std::copy(i->id.begin(), i->id.end(), out); std::copy(i->id.begin(), i->id.end(), out);
out += 20;
write_endpoint(i->addr, out); write_endpoint(i->addr, out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint)); p.list().push_back(entry(endpoint));
} }
} }
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size(); TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size();
#endif #endif
}
}
void dht_tracker::send_packet(msg const& m)
try
{
using libtorrent::bencode;
using libtorrent::entry;
entry e(entry::dictionary_t);
assert(!m.transaction_id.empty() || m.message_id == messages::error);
e["t"] = m.transaction_id;
static char const version_str[] = {'L', 'T'
, LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR};
e["v"] = std::string(version_str, version_str + 4);
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string()
<< " SENDING [" << m.addr << "]:";
TORRENT_LOG(dht_tracker) << " transaction: " << m.transaction_id;
#endif
if (m.message_id == messages::error)
{
assert(m.reply);
e["y"] = "e";
entry error_list(entry::list_t);
assert(m.error_code > 200 && m.error_code <= 204);
error_list.list().push_back(entry(m.error_code));
error_list.list().push_back(entry(m.error_msg));
e["e"] = error_list;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string()
<< " outgoing error: " << m.error_code << " " << m.error_msg;
#endif
}
else if (m.reply)
{
e["y"] = "r";
e["r"] = entry(entry::dictionary_t);
entry& r = e["r"];
r["id"] = std::string(m.id.begin(), m.id.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << time_now_string()
<< " reply: " << messages::ids[m.message_id];
#endif
if (m.write_token.type() != entry::undefined_t)
r["token"] = m.write_token;
switch (m.message_id)
{
case messages::ping:
break;
case messages::find_node:
{
write_nodes_entry(r, m);
break; break;
} }
case messages::get_peers: case messages::get_peers:
{ {
if (m.peers.empty()) if (m.peers.empty())
{ {
r["nodes"] = entry(entry::string_t); write_nodes_entry(r, m);
entry& n = r["nodes"];
std::back_insert_iterator<std::string> out(n.string());
for (msg::nodes_t::const_iterator i = m.nodes.begin()
, end(m.nodes.end()); i != end; ++i)
{
if (!i->addr.address().is_v4()) continue;
std::copy(i->id.begin(), i->id.end(), out);
write_endpoint(i->addr, out);
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size();
#endif
} }
else else
{ {
r["values"] = entry(entry::list_t); r["values"] = entry(entry::list_t);
entry& p = r["values"]; entry& p = r["values"];
std::string endpoint; std::string endpoint;
endpoint.resize(6);
for (msg::peers_t::const_iterator i = m.peers.begin() for (msg::peers_t::const_iterator i = m.peers.begin()
, end(m.peers.end()); i != end; ++i) , end(m.peers.end()); i != end; ++i)
{ {
endpoint.resize(18);
std::string::iterator out = endpoint.begin(); std::string::iterator out = endpoint.begin();
write_endpoint(*i, out); write_endpoint(*i, out);
endpoint.resize(out - endpoint.begin());
p.list().push_back(entry(endpoint)); p.list().push_back(entry(endpoint));
} }
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -923,8 +930,10 @@ namespace libtorrent { namespace dht
m_send_buf.clear(); m_send_buf.clear();
bencode(std::back_inserter(m_send_buf), e); bencode(std::back_inserter(m_send_buf), e);
asio::error_code ec;
m_socket.send_to(asio::buffer(&m_send_buf[0] m_socket.send_to(asio::buffer(&m_send_buf[0]
, (int)m_send_buf.size()), m.addr); , (int)m_send_buf.size()), m.addr, 0, ec);
if (ec) return;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
m_total_out_bytes += m_send_buf.size(); m_total_out_bytes += m_send_buf.size();
@ -953,6 +962,13 @@ namespace libtorrent { namespace dht
send_packet(pm); send_packet(pm);
} }
catch (std::exception&)
{
// m_send may fail with "no route to host"
// but it shouldn't throw since an error code
// is passed in instead
assert(false);
}
}} }}

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/find_data.hpp> #include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
@ -38,38 +40,6 @@ POSSIBILITY OF SUCH DAMAGE.
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
typedef boost::shared_ptr<observer> observer_ptr;
class find_data_observer : public observer
{
public:
find_data_observer(
boost::intrusive_ptr<find_data> const& algorithm
, node_id self
, node_id target)
: m_algorithm(algorithm)
, m_target(target)
, m_self(self)
{}
~find_data_observer();
void send(msg& m)
{
m.reply = false;
m.message_id = messages::get_peers;
m.info_hash = m_target;
}
void timeout();
void reply(msg const&);
void abort() { m_algorithm = 0; }
private:
boost::intrusive_ptr<find_data> m_algorithm;
node_id const m_target;
node_id const m_self;
};
find_data_observer::~find_data_observer() find_data_observer::~find_data_observer()
{ {
if (m_algorithm) m_algorithm->failed(m_self); if (m_algorithm) m_algorithm->failed(m_self);
@ -139,8 +109,8 @@ void find_data::invoke(node_id const& id, asio::ip::udp::endpoint addr)
return; return;
} }
observer_ptr p(new find_data_observer(this, id, m_target)); observer_ptr o(new (m_rpc.allocator().malloc()) find_data_observer(this, id, m_target));
m_rpc.invoke(messages::get_peers, addr, p); m_rpc.invoke(messages::get_peers, addr, o);
} }
void find_data::got_data(msg const* m) void find_data::got_data(msg const* m)

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <utility> #include <utility>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/optional.hpp> #include <boost/optional.hpp>
@ -50,11 +52,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include "libtorrent/kademlia/find_data.hpp" #include "libtorrent/kademlia/find_data.hpp"
using boost::bind; using boost::bind;
using boost::posix_time::second_clock;
using boost::posix_time::seconds;
using boost::posix_time::minutes;
using boost::posix_time::ptime;
using boost::posix_time::time_duration;
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
@ -66,8 +63,6 @@ namespace
} }
#endif #endif
typedef boost::shared_ptr<observer> observer_ptr;
// TODO: configurable? // TODO: configurable?
enum { announce_interval = 30 }; enum { announce_interval = 30 };
@ -99,7 +94,7 @@ void purge_peers(std::set<peer_entry>& peers)
, end(peers.end()); i != end;) , end(peers.end()); i != end;)
{ {
// the peer has timed out // the peer has timed out
if (i->added + minutes(int(announce_interval * 1.5f)) < second_clock::universal_time()) if (i->added + minutes(int(announce_interval * 1.5f)) < time_now())
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "peer timed out at: " << i->addr.address(); TORRENT_LOG(node) << "peer timed out at: " << i->addr.address();
@ -120,7 +115,7 @@ node_impl::node_impl(boost::function<void(msg const&)> const& f
, m_table(m_id, 8, settings) , m_table(m_id, 8, settings)
, m_rpc(bind(&node_impl::incoming_request, this, _1) , m_rpc(bind(&node_impl::incoming_request, this, _1)
, m_id, m_table, f) , m_id, m_table, f)
, m_last_tracker_tick(boost::posix_time::second_clock::universal_time()) , m_last_tracker_tick(time_now())
{ {
m_secret[0] = std::rand(); m_secret[0] = std::rand();
m_secret[1] = std::rand(); m_secret[1] = std::rand();
@ -129,9 +124,20 @@ node_impl::node_impl(boost::function<void(msg const&)> const& f
bool node_impl::verify_token(msg const& m) bool node_impl::verify_token(msg const& m)
{ {
if (m.write_token.type() != entry::string_t) if (m.write_token.type() != entry::string_t)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "token of incorrect type " << m.write_token.type();
#endif
return false; return false;
}
std::string const& token = m.write_token.string(); std::string const& token = m.write_token.string();
if (token.length() != 4) return false; if (token.length() != 4)
{
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "token of incorrect length: " << token.length();
#endif
return false;
}
hasher h1; hasher h1;
std::string address = m.addr.address().to_string(); std::string address = m.addr.address().to_string();
@ -146,6 +152,7 @@ bool node_impl::verify_token(msg const& m)
hasher h2; hasher h2;
h2.update(&address[0], address.length()); h2.update(&address[0], address.length());
h2.update((char*)&m_secret[1], sizeof(m_secret[1])); h2.update((char*)&m_secret[1], sizeof(m_secret[1]));
h2.update((char*)&m.info_hash[0], sha1_hash::size);
h = h2.final(); h = h2.final();
if (std::equal(token.begin(), token.end(), (signed char*)&h[0])) if (std::equal(token.begin(), token.end(), (signed char*)&h[0]))
return true; return true;
@ -258,70 +265,6 @@ void node_impl::incoming(msg const& m)
namespace namespace
{ {
class announce_observer : public observer
{
public:
announce_observer(sha1_hash const& info_hash, int listen_port
, entry const& write_token)
: m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_token(write_token)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
m.write_token = m_token;
}
void timeout() {}
void reply(msg const&) {}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
entry m_token;
};
class get_peers_observer : public observer
{
public:
get_peers_observer(sha1_hash const& info_hash, int listen_port
, rpc_manager& rpc
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
: m_info_hash(info_hash)
, m_listen_port(listen_port)
, m_rpc(rpc)
, m_fun(f)
{}
void send(msg& m)
{
m.port = m_listen_port;
m.info_hash = m_info_hash;
}
void timeout() {}
void reply(msg const& r)
{
m_rpc.invoke(messages::announce_peer, r.addr
, boost::shared_ptr<observer>(
new announce_observer(m_info_hash, m_listen_port, r.write_token)));
m_fun(r.peers, m_info_hash);
}
void abort() {}
private:
sha1_hash m_info_hash;
int m_listen_port;
rpc_manager& m_rpc;
boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> m_fun;
};
void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc void announce_fun(std::vector<node_entry> const& v, rpc_manager& rpc
, int listen_port, sha1_hash const& ih , int listen_port, sha1_hash const& ih
, boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f) , boost::function<void(std::vector<tcp::endpoint> const&, sha1_hash const&)> f)
@ -331,23 +274,11 @@ namespace
for (std::vector<node_entry>::const_iterator i = v.begin() for (std::vector<node_entry>::const_iterator i = v.begin()
, end(v.end()); i != end; ++i) , end(v.end()); i != end; ++i)
{ {
rpc.invoke(messages::get_peers, i->addr, boost::shared_ptr<observer>( rpc.invoke(messages::get_peers, i->addr, observer_ptr(
new get_peers_observer(ih, listen_port, rpc, f))); new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f)));
nodes = true; nodes = true;
} }
} }
}
namespace
{
struct dummy_observer : observer
{
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
virtual void abort() {}
};
} }
void node_impl::add_router_node(udp::endpoint router) void node_impl::add_router_node(udp::endpoint router)
@ -359,8 +290,8 @@ void node_impl::add_node(udp::endpoint node)
{ {
// ping the node, and if we get a reply, it // ping the node, and if we get a reply, it
// will be added to the routing table // will be added to the routing table
observer_ptr p(new dummy_observer()); observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator()));
m_rpc.invoke(messages::ping, node, p); m_rpc.invoke(messages::ping, node, o);
} }
void node_impl::announce(sha1_hash const& info_hash, int listen_port void node_impl::announce(sha1_hash const& info_hash, int listen_port
@ -377,24 +308,22 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port
time_duration node_impl::refresh_timeout() time_duration node_impl::refresh_timeout()
{ {
int refresh = -1; int refresh = -1;
ptime now = second_clock::universal_time(); ptime now = time_now();
ptime next = now + minutes(15); ptime next = now + minutes(15);
try try
{ {
for (int i = 0; i < 160; ++i) for (int i = 0; i < 160; ++i)
{ {
ptime r = m_table.next_refresh(i); ptime r = m_table.next_refresh(i);
if (r <= now) if (r <= next)
{
if (refresh == -1) refresh = i;
}
else if (r < next)
{ {
refresh = i;
next = r; next = r;
} }
} }
if (refresh != -1) if (next < now)
{ {
assert(refresh > -1);
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "refreshing bucket: " << refresh; TORRENT_LOG(node) << "refreshing bucket: " << refresh;
#endif #endif
@ -403,8 +332,20 @@ time_duration node_impl::refresh_timeout()
} }
catch (std::exception&) {} catch (std::exception&) {}
if (next < now + seconds(5)) return seconds(5); time_duration next_refresh = next - now;
return next - now; time_duration min_next_refresh
= minutes(15) / (m_table.num_active_buckets());
if (min_next_refresh > seconds(40))
min_next_refresh = seconds(40);
if (next_refresh < min_next_refresh)
next_refresh = min_next_refresh;
#ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds";
#endif
return next_refresh;
} }
time_duration node_impl::connection_timeout() time_duration node_impl::connection_timeout()
@ -412,7 +353,7 @@ time_duration node_impl::connection_timeout()
time_duration d = m_rpc.tick(); time_duration d = m_rpc.tick();
try try
{ {
ptime now(second_clock::universal_time()); ptime now(time_now());
if (now - m_last_tracker_tick < minutes(10)) return d; if (now - m_last_tracker_tick < minutes(10)) return d;
m_last_tracker_tick = now; m_last_tracker_tick = now;
@ -443,7 +384,7 @@ void node_impl::on_announce(msg const& m, msg& reply)
{ {
reply.message_id = messages::error; reply.message_id = messages::error;
reply.error_code = 203; reply.error_code = 203;
reply.error_msg = "Incorrect write token in announce_peer message"; reply.error_msg = "Incorrect token in announce_peer";
return; return;
} }
@ -455,7 +396,7 @@ void node_impl::on_announce(msg const& m, msg& reply)
torrent_entry& v = m_map[m.info_hash]; torrent_entry& v = m_map[m.info_hash];
peer_entry e; peer_entry e;
e.addr = tcp::endpoint(m.addr.address(), m.addr.port()); e.addr = tcp::endpoint(m.addr.address(), m.addr.port());
e.added = second_clock::universal_time(); e.added = time_now();
std::set<peer_entry>::iterator i = v.peers.find(e); std::set<peer_entry>::iterator i = v.peers.find(e);
if (i != v.peers.end()) v.peers.erase(i++); if (i != v.peers.end()) v.peers.erase(i++);
v.peers.insert(i, e); v.peers.insert(i, e);
@ -496,6 +437,11 @@ bool node_impl::on_find(msg const& m, std::vector<tcp::endpoint>& peers) const
void node_impl::incoming_request(msg const& m) void node_impl::incoming_request(msg const& m)
{ {
msg reply; msg reply;
reply.message_id = m.message_id;
reply.addr = m.addr;
reply.reply = true;
reply.transaction_id = m.transaction_id;
switch (m.message_id) switch (m.message_id)
{ {
case messages::ping: case messages::ping:
@ -535,16 +481,16 @@ void node_impl::incoming_request(msg const& m)
} }
break; break;
case messages::announce_peer: case messages::announce_peer:
{
on_announce(m, reply); on_announce(m, reply);
}
break; break;
default:
assert(false);
}; };
if (m_table.need_node(m.id)) if (m_table.need_node(m.id))
m_rpc.reply_with_ping(reply, m); m_rpc.reply_with_ping(reply);
else else
m_rpc.reply(reply, m); m_rpc.reply(reply);
} }

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <algorithm> #include <algorithm>
#include <iomanip> #include <iomanip>
#include <cassert> #include <cassert>

View file

@ -30,10 +30,13 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/refresh.hpp> #include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp> #include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/msg.hpp>
#include <libtorrent/io.hpp> #include <libtorrent/io.hpp>
@ -50,38 +53,6 @@ using asio::ip::udp;
TORRENT_DEFINE_LOG(refresh) TORRENT_DEFINE_LOG(refresh)
#endif #endif
typedef boost::shared_ptr<observer> observer_ptr;
class refresh_observer : public observer
{
public:
refresh_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
, node_id target
)
: m_target(target)
, m_self(self)
, m_algorithm(algorithm)
{}
~refresh_observer();
void send(msg& m)
{
m.info_hash = m_target;
}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_target;
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
refresh_observer::~refresh_observer() refresh_observer::~refresh_observer()
{ {
if (m_algorithm) m_algorithm->failed(m_self, true); if (m_algorithm) m_algorithm->failed(m_self, true);
@ -110,29 +81,6 @@ void refresh_observer::timeout()
m_algorithm = 0; m_algorithm = 0;
} }
class ping_observer : public observer
{
public:
ping_observer(
boost::intrusive_ptr<refresh> const& algorithm
, node_id self
)
: m_self(self)
, m_algorithm(algorithm)
{}
~ping_observer();
void send(msg& p) {}
void timeout();
void reply(msg const& m);
void abort() { m_algorithm = 0; }
private:
node_id const m_self;
boost::intrusive_ptr<refresh> m_algorithm;
};
ping_observer::~ping_observer() ping_observer::~ping_observer()
{ {
if (m_algorithm) m_algorithm->ping_timeout(m_self, true); if (m_algorithm) m_algorithm->ping_timeout(m_self, true);
@ -155,13 +103,10 @@ void ping_observer::timeout()
void refresh::invoke(node_id const& nid, udp::endpoint addr) void refresh::invoke(node_id const& nid, udp::endpoint addr)
{ {
observer_ptr p(new refresh_observer( observer_ptr o(new (m_rpc.allocator().malloc()) refresh_observer(
this this, nid, m_target));
, nid
, m_target
));
m_rpc.invoke(messages::find_node, addr, p); m_rpc.invoke(messages::find_node, addr, o);
} }
void refresh::done() void refresh::done()
@ -209,8 +154,9 @@ void refresh::invoke_pings_or_finish(bool prevent_request)
try try
{ {
observer_ptr p(new ping_observer(this, node.id)); observer_ptr o(new (m_rpc.allocator().malloc()) ping_observer(
m_rpc.invoke(messages::ping, node.addr, p); this, node.id));
m_rpc.invoke(messages::ping, node.addr, o);
++m_active_pings; ++m_active_pings;
++m_leftover_nodes_iterator; ++m_leftover_nodes_iterator;
} }

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <vector> #include <vector>
#include <deque> #include <deque>
#include <algorithm> #include <algorithm>
@ -37,7 +39,6 @@ POSSIBILITY OF SUCH DAMAGE.
#include <numeric> #include <numeric>
#include <boost/cstdint.hpp> #include <boost/cstdint.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "libtorrent/kademlia/routing_table.hpp" #include "libtorrent/kademlia/routing_table.hpp"
#include "libtorrent/kademlia/node_id.hpp" #include "libtorrent/kademlia/node_id.hpp"
@ -46,13 +47,6 @@ POSSIBILITY OF SUCH DAMAGE.
using boost::bind; using boost::bind;
using boost::uint8_t; using boost::uint8_t;
using boost::posix_time::second_clock;
using boost::posix_time::minutes;
using boost::posix_time::seconds;
using boost::posix_time::hours;
namespace pt = boost::posix_time;
namespace libtorrent { namespace dht namespace libtorrent { namespace dht
{ {
@ -69,7 +63,8 @@ routing_table::routing_table(node_id const& id, int bucket_size
// distribute the refresh times for the buckets in an // distribute the refresh times for the buckets in an
// attempt do even out the network load // attempt do even out the network load
for (int i = 0; i < 160; ++i) for (int i = 0; i < 160; ++i)
m_bucket_activity[i] = second_clock::universal_time() - seconds(15*60 - i*5); m_bucket_activity[i] = time_now() - milliseconds(i*5625);
m_bucket_activity[0] = time_now() - minutes(15);
} }
boost::tuple<int, int> routing_table::size() const boost::tuple<int, int> routing_table::size() const
@ -85,14 +80,34 @@ boost::tuple<int, int> routing_table::size() const
return boost::make_tuple(nodes, replacements); return boost::make_tuple(nodes, replacements);
} }
size_type routing_table::num_global_nodes() const
{
int first_full = m_lowest_active_bucket;
int num_nodes = 1; // we are one of the nodes
for (; first_full < 160
&& int(m_buckets[first_full].first.size()) < m_bucket_size;
++first_full)
{
num_nodes += m_buckets[first_full].first.size();
}
return (2 << (160 - first_full)) * num_nodes;
}
#ifdef TORRENT_DHT_VERBOSE_LOGGING
void routing_table::print_state(std::ostream& os) const void routing_table::print_state(std::ostream& os) const
{ {
os << "kademlia routing table state\n" os << "kademlia routing table state\n"
<< "bucket_size: " << m_bucket_size << "\n" << "bucket_size: " << m_bucket_size << "\n"
<< "global node count: " << num_global_nodes() << "\n"
<< "node_id: " << m_id << "\n\n"; << "node_id: " << m_id << "\n\n";
os << "number of nodes per bucket:\n" os << "number of nodes per bucket:\n-- live ";
"live\n"; for (int i = 8; i < 160; ++i)
os << "-";
os << "\n";
for (int k = 0; k < 8; ++k) for (int k = 0; k < 8; ++k)
{ {
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
@ -117,17 +132,20 @@ void routing_table::print_state(std::ostream& os) const
} }
os << "\n"; os << "\n";
} }
os << "cached\n-----------\n"; os << "-- cached ";
for (int i = 10; i < 160; ++i)
os << "-";
os << "\n\n";
os << "nodes:\n"; os << "nodes:\n";
for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end());
i != end; ++i) i != end; ++i)
{ {
int bucket_index = int(i - m_buckets.begin()); int bucket_index = int(i - m_buckets.begin());
os << "bucket " << bucket_index << " " os << "=== BUCKET = " << bucket_index
<< to_simple_string(m_bucket_activity[bucket_index]) << " = " << (bucket_index >= m_lowest_active_bucket?"active":"inactive")
<< " " << (bucket_index >= m_lowest_active_bucket?"active":"inactive") << " = " << total_seconds(time_now() - m_bucket_activity[bucket_index])
<< "\n"; << " s ago ===== \n";
for (bucket_t::const_iterator j = i->first.begin() for (bucket_t::const_iterator j = i->first.begin()
, end(i->first.end()); j != end; ++j) , end(i->first.end()); j != end; ++j)
{ {
@ -137,19 +155,21 @@ void routing_table::print_state(std::ostream& os) const
} }
} }
#endif
void routing_table::touch_bucket(int bucket) void routing_table::touch_bucket(int bucket)
{ {
m_bucket_activity[bucket] = second_clock::universal_time(); m_bucket_activity[bucket] = time_now();
} }
boost::posix_time::ptime routing_table::next_refresh(int bucket) ptime routing_table::next_refresh(int bucket)
{ {
assert(bucket < 160); assert(bucket < 160);
assert(bucket >= 0); assert(bucket >= 0);
// lower than or equal to since a refresh of bucket 0 will // lower than or equal to since a refresh of bucket 0 will
// effectively refresh the lowest active bucket as well // effectively refresh the lowest active bucket as well
if (bucket <= m_lowest_active_bucket && bucket > 0) if (bucket < m_lowest_active_bucket && bucket > 0)
return second_clock::universal_time() + minutes(15); return time_now() + minutes(15);
return m_bucket_activity[bucket] + minutes(15); return m_bucket_activity[bucket] + minutes(15);
} }
@ -177,11 +197,11 @@ bool routing_table::need_node(node_id const& id)
if ((int)rb.size() >= m_bucket_size) return false; if ((int)rb.size() >= m_bucket_size) return false;
// if the node already exists, we don't need it // if the node already exists, we don't need it
if (std::find_if(b.begin(), b.end(), bind(std::equal_to<node_id>() if (std::find_if(b.begin(), b.end(), bind(&node_entry::id, _1) == id)
, bind(&node_entry::id, _1), id)) != b.end()) return false; != b.end()) return false;
if (std::find_if(rb.begin(), rb.end(), bind(std::equal_to<node_id>() if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == id)
, bind(&node_entry::id, _1), id)) != rb.end()) return false; != rb.end()) return false;
return true; return true;
} }
@ -195,8 +215,7 @@ void routing_table::node_failed(node_id const& id)
bucket_t& rb = m_buckets[bucket_index].second; bucket_t& rb = m_buckets[bucket_index].second;
bucket_t::iterator i = std::find_if(b.begin(), b.end() bucket_t::iterator i = std::find_if(b.begin(), b.end()
, bind(std::equal_to<node_id>() , bind(&node_entry::id, _1) == id);
, bind(&node_entry::id, _1), id));
if (i == b.end()) return; if (i == b.end()) return;
@ -245,12 +264,11 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
bucket_t& b = m_buckets[bucket_index].first; bucket_t& b = m_buckets[bucket_index].first;
bucket_t::iterator i = std::find_if(b.begin(), b.end() bucket_t::iterator i = std::find_if(b.begin(), b.end()
, bind(std::equal_to<node_id>() , bind(&node_entry::id, _1) == id);
, bind(&node_entry::id, _1), id));
bool ret = need_bootstrap(); bool ret = need_bootstrap();
m_bucket_activity[bucket_index] = second_clock::universal_time(); //m_bucket_activity[bucket_index] = time_now();
if (i != b.end()) if (i != b.end())
{ {
@ -274,6 +292,7 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
// offline // offline
if ((int)b.size() < m_bucket_size) if ((int)b.size() < m_bucket_size)
{ {
if (b.empty()) b.reserve(m_bucket_size);
b.push_back(node_entry(id, addr)); b.push_back(node_entry(id, addr));
// if bucket index is 0, the node is ourselves // if bucket index is 0, the node is ourselves
// don't updated m_lowest_active_bucket // don't updated m_lowest_active_bucket
@ -293,9 +312,8 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
// with nodes from that cache. // with nodes from that cache.
i = std::max_element(b.begin(), b.end() i = std::max_element(b.begin(), b.end()
, bind(std::less<int>()
, bind(&node_entry::fail_count, _1) , bind(&node_entry::fail_count, _1)
, bind(&node_entry::fail_count, _2))); < bind(&node_entry::fail_count, _2));
if (i != b.end() && i->fail_count > 0) if (i != b.end() && i->fail_count > 0)
{ {
@ -315,14 +333,14 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr)
bucket_t& rb = m_buckets[bucket_index].second; bucket_t& rb = m_buckets[bucket_index].second;
i = std::find_if(rb.begin(), rb.end() i = std::find_if(rb.begin(), rb.end()
, bind(std::equal_to<node_id>() , bind(&node_entry::id, _1) == id);
, bind(&node_entry::id, _1), id));
// if the node is already in the replacement bucket // if the node is already in the replacement bucket
// just return. // just return.
if (i != rb.end()) return ret; if (i != rb.end()) return ret;
if ((int)rb.size() > m_bucket_size) rb.erase(rb.begin()); if ((int)rb.size() > m_bucket_size) rb.erase(rb.begin());
if (rb.empty()) rb.reserve(m_bucket_size);
rb.push_back(node_entry(id, addr)); rb.push_back(node_entry(id, addr));
// TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr; // TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr;
return ret; return ret;
@ -358,8 +376,7 @@ void routing_table::find_node(node_id const& target
if ((int)l.size() == count) if ((int)l.size() == count)
{ {
assert(std::count_if(l.begin(), l.end() assert(std::count_if(l.begin(), l.end()
, boost::bind(std::not_equal_to<int>() , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
, boost::bind(&node_entry::fail_count, _1), 0)) == 0);
return; return;
} }
@ -391,8 +408,7 @@ void routing_table::find_node(node_id const& target
|| bucket_index == (int)m_buckets.size() - 1) || bucket_index == (int)m_buckets.size() - 1)
{ {
assert(std::count_if(l.begin(), l.end() assert(std::count_if(l.begin(), l.end()
, boost::bind(std::not_equal_to<int>() , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
, boost::bind(&node_entry::fail_count, _1), 0)) == 0);
return; return;
} }
@ -406,8 +422,7 @@ void routing_table::find_node(node_id const& target
{ {
l.erase(l.begin() + count, l.end()); l.erase(l.begin() + count, l.end());
assert(std::count_if(l.begin(), l.end() assert(std::count_if(l.begin(), l.end()
, boost::bind(std::not_equal_to<int>() , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
, boost::bind(&node_entry::fail_count, _1), 0)) == 0);
return; return;
} }
} }
@ -416,8 +431,7 @@ void routing_table::find_node(node_id const& target
assert((int)l.size() <= count); assert((int)l.size() <= count);
assert(std::count_if(l.begin(), l.end() assert(std::count_if(l.begin(), l.end()
, boost::bind(std::not_equal_to<int>() , boost::bind(&node_entry::fail_count, _1) != 0) == 0);
, boost::bind(&node_entry::fail_count, _1), 0)) == 0);
} }
routing_table::iterator routing_table::begin() const routing_table::iterator routing_table::begin() const

View file

@ -30,24 +30,31 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include <boost/date_time/posix_time/posix_time_types.hpp> #include "libtorrent/pch.hpp"
#include <boost/date_time/posix_time/ptime.hpp> #include "libtorrent/socket.hpp"
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include <boost/mpl/max_element.hpp>
#include <boost/mpl/vector.hpp>
#include <boost/mpl/sizeof.hpp>
#include <boost/mpl/transform_view.hpp>
#include <boost/mpl/deref.hpp>
#include <boost/lexical_cast.hpp>
#include <libtorrent/io.hpp> #include <libtorrent/io.hpp>
#include <libtorrent/invariant_check.hpp> #include <libtorrent/invariant_check.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
#include <libtorrent/kademlia/logging.hpp> #include <libtorrent/kademlia/logging.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/find_data.hpp>
#include <libtorrent/kademlia/closest_nodes.hpp>
#include <libtorrent/kademlia/refresh.hpp>
#include <libtorrent/kademlia/node.hpp>
#include <libtorrent/kademlia/observer.hpp>
#include <libtorrent/hasher.hpp> #include <libtorrent/hasher.hpp>
#include <fstream> #include <fstream>
using boost::posix_time::ptime;
using boost::posix_time::time_duration;
using boost::posix_time::microsec_clock;
using boost::posix_time::seconds;
using boost::posix_time::milliseconds;
using boost::shared_ptr; using boost::shared_ptr;
using boost::bind; using boost::bind;
@ -55,22 +62,57 @@ namespace libtorrent { namespace dht
{ {
namespace io = libtorrent::detail; namespace io = libtorrent::detail;
namespace mpl = boost::mpl;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_DEFINE_LOG(rpc) TORRENT_DEFINE_LOG(rpc)
#endif #endif
void intrusive_ptr_add_ref(observer const* o)
{
assert(o->m_refs >= 0);
assert(o != 0);
++o->m_refs;
}
void intrusive_ptr_release(observer const* o)
{
assert(o->m_refs > 0);
assert(o != 0);
if (--o->m_refs == 0)
{
boost::pool<>& p = o->pool_allocator;
o->~observer();
p.ordered_free(const_cast<observer*>(o));
}
}
node_id generate_id(); node_id generate_id();
typedef mpl::vector<
closest_nodes_observer
, find_data_observer
, announce_observer
, get_peers_observer
, refresh_observer
, ping_observer
, null_observer
> observer_types;
typedef mpl::max_element<
mpl::transform_view<observer_types, mpl::sizeof_<mpl::_1> >
>::type max_observer_type_iter;
rpc_manager::rpc_manager(fun const& f, node_id const& our_id rpc_manager::rpc_manager(fun const& f, node_id const& our_id
, routing_table& table, send_fun const& sf) , routing_table& table, send_fun const& sf)
: m_next_transaction_id(rand() % max_transactions) : m_pool_allocator(sizeof(mpl::deref<max_observer_type_iter::base>::type))
, m_next_transaction_id(rand() % max_transactions)
, m_oldest_transaction_id(m_next_transaction_id) , m_oldest_transaction_id(m_next_transaction_id)
, m_incoming(f) , m_incoming(f)
, m_send(sf) , m_send(sf)
, m_our_id(our_id) , m_our_id(our_id)
, m_table(table) , m_table(table)
, m_timer(boost::posix_time::microsec_clock::universal_time()) , m_timer(time_now())
, m_random_number(generate_id()) , m_random_number(generate_id())
, m_destructing(false) , m_destructing(false)
{ {
@ -121,12 +163,21 @@ bool rpc_manager::incoming(msg const& m)
// if we don't have the transaction id in our // if we don't have the transaction id in our
// request list, ignore the packet // request list, ignore the packet
if (m.transaction_id.size() != 2) if (m.transaction_id.size() < 2)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with invalid transaction id size: " TORRENT_LOG(rpc) << "Reply with invalid transaction id size: "
<< m.transaction_id.size() << " from " << m.addr; << m.transaction_id.size() << " from " << m.addr;
#endif #endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id, size "
+ boost::lexical_cast<std::string>(m.transaction_id.size());
reply.addr = m.addr;
reply.transaction_id = "";
m_send(reply);
return false; return false;
} }
@ -137,19 +188,27 @@ bool rpc_manager::incoming(msg const& m)
|| tid < 0) || tid < 0)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with unknown transaction id: " TORRENT_LOG(rpc) << "Reply with invalid transaction id: "
<< tid << " from " << m.addr; << tid << " from " << m.addr;
#endif #endif
msg reply;
reply.reply = true;
reply.message_id = messages::error;
reply.error_code = 203; // Protocol error
reply.error_msg = "reply with invalid transaction id";
reply.addr = m.addr;
reply.transaction_id = "";
m_send(reply);
return false; return false;
} }
boost::shared_ptr<observer> o = m_transactions[tid]; observer_ptr o = m_transactions[tid];
if (!o) if (!o)
{ {
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(rpc) << "Reply with unknown transaction id: " TORRENT_LOG(rpc) << "Reply with unknown transaction id: "
<< tid << " from " << m.addr; << tid << " from " << m.addr << " (possibly timed out)";
#endif #endif
return false; return false;
} }
@ -165,11 +224,11 @@ bool rpc_manager::incoming(msg const& m)
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
std::ofstream reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app); std::ofstream reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app);
reply_stats << m.addr << "\t" << (microsec_clock::universal_time() reply_stats << m.addr << "\t" << total_milliseconds(time_now() - o->sent)
- o->sent).total_milliseconds() << std::endl; << std::endl;
#endif #endif
o->reply(m); o->reply(m);
m_transactions[tid].reset(); m_transactions[tid] = 0;
if (m.piggy_backed_ping) if (m.piggy_backed_ping)
{ {
@ -178,17 +237,16 @@ bool rpc_manager::incoming(msg const& m)
msg ph; msg ph;
ph.message_id = messages::ping; ph.message_id = messages::ping;
ph.transaction_id = m.ping_transaction_id; ph.transaction_id = m.ping_transaction_id;
ph.id = m_our_id;
ph.addr = m.addr; ph.addr = m.addr;
ph.reply = true;
msg empty; reply(ph);
reply(empty, ph);
} }
return m_table.node_seen(m.id, m.addr); return m_table.node_seen(m.id, m.addr);
} }
else else
{ {
assert(m.message_id != messages::error);
// this is an incoming request // this is an incoming request
m_incoming(m); m_incoming(m);
} }
@ -199,15 +257,13 @@ time_duration rpc_manager::tick()
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
using boost::posix_time::microsec_clock;
const int timeout_ms = 10 * 1000; const int timeout_ms = 10 * 1000;
// look for observers that has timed out // look for observers that has timed out
if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms);
std::vector<shared_ptr<observer> > timeouts; std::vector<observer_ptr > timeouts;
for (;m_next_transaction_id != m_oldest_transaction_id; for (;m_next_transaction_id != m_oldest_transaction_id;
m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions)
@ -215,11 +271,10 @@ time_duration rpc_manager::tick()
assert(m_oldest_transaction_id >= 0); assert(m_oldest_transaction_id >= 0);
assert(m_oldest_transaction_id < max_transactions); assert(m_oldest_transaction_id < max_transactions);
boost::shared_ptr<observer> o = m_transactions[m_oldest_transaction_id]; observer_ptr o = m_transactions[m_oldest_transaction_id];
if (!o) continue; if (!o) continue;
time_duration diff = o->sent + milliseconds(timeout_ms) time_duration diff = o->sent + milliseconds(timeout_ms) - time_now();
- microsec_clock::universal_time();
if (diff > seconds(0)) if (diff > seconds(0))
{ {
if (diff < seconds(1)) return seconds(1); if (diff < seconds(1)) return seconds(1);
@ -228,7 +283,7 @@ time_duration rpc_manager::tick()
try try
{ {
m_transactions[m_oldest_transaction_id].reset(); m_transactions[m_oldest_transaction_id] = 0;
timeouts.push_back(o); timeouts.push_back(o);
} catch (std::exception) {} } catch (std::exception) {}
} }
@ -239,11 +294,11 @@ time_duration rpc_manager::tick()
// clear the aborted transactions, will likely // clear the aborted transactions, will likely
// generate new requests. We need to swap, since the // generate new requests. We need to swap, since the
// destrutors may add more observers to the m_aborted_transactions // destrutors may add more observers to the m_aborted_transactions
std::vector<shared_ptr<observer> >().swap(m_aborted_transactions); std::vector<observer_ptr >().swap(m_aborted_transactions);
return milliseconds(timeout_ms); return milliseconds(timeout_ms);
} }
unsigned int rpc_manager::new_transaction_id(shared_ptr<observer> o) unsigned int rpc_manager::new_transaction_id(observer_ptr o)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -255,7 +310,7 @@ unsigned int rpc_manager::new_transaction_id(shared_ptr<observer> o)
// it will prevent it from spawning new requests right now, // it will prevent it from spawning new requests right now,
// since that would break the invariant // since that would break the invariant
m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]); m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]);
m_transactions[m_next_transaction_id].reset(); m_transactions[m_next_transaction_id] = 0;
assert(m_oldest_transaction_id == m_next_transaction_id); assert(m_oldest_transaction_id == m_next_transaction_id);
} }
assert(!m_transactions[tid]); assert(!m_transactions[tid]);
@ -288,7 +343,7 @@ void rpc_manager::update_oldest_transaction_id()
} }
void rpc_manager::invoke(int message_id, udp::endpoint target_addr void rpc_manager::invoke(int message_id, udp::endpoint target_addr
, shared_ptr<observer> o) , observer_ptr o)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
@ -315,7 +370,7 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
o->send(m); o->send(m);
o->sent = boost::posix_time::microsec_clock::universal_time(); o->sent = time_now();
o->target_addr = target_addr; o->target_addr = target_addr;
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
@ -333,67 +388,41 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr
} }
} }
void rpc_manager::reply(msg& m, msg const& reply_to) void rpc_manager::reply(msg& m)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
if (m_destructing) return; if (m_destructing) return;
if (m.message_id != messages::error) assert(m.reply);
m.message_id = reply_to.message_id;
m.addr = reply_to.addr;
m.reply = true;
m.piggy_backed_ping = false; m.piggy_backed_ping = false;
m.id = m_our_id; m.id = m_our_id;
m.transaction_id = reply_to.transaction_id;
m_send(m); m_send(m);
} }
namespace void rpc_manager::reply_with_ping(msg& m)
{
struct dummy_observer : observer
{
virtual void reply(msg const&) {}
virtual void timeout() {}
virtual void send(msg&) {}
void abort() {}
};
}
void rpc_manager::reply_with_ping(msg& m, msg const& reply_to)
{ {
INVARIANT_CHECK; INVARIANT_CHECK;
if (m_destructing) return; if (m_destructing) return;
assert(m.reply);
if (m.message_id != messages::error)
m.message_id = reply_to.message_id;
m.addr = reply_to.addr;
m.reply = true;
m.piggy_backed_ping = true; m.piggy_backed_ping = true;
m.id = m_our_id; m.id = m_our_id;
m.transaction_id = reply_to.transaction_id;
try
{
m.ping_transaction_id.clear(); m.ping_transaction_id.clear();
std::back_insert_iterator<std::string> out(m.ping_transaction_id); std::back_insert_iterator<std::string> out(m.ping_transaction_id);
io::write_uint16(m_next_transaction_id, out); io::write_uint16(m_next_transaction_id, out);
boost::shared_ptr<observer> o(new dummy_observer); observer_ptr o(new (allocator().malloc()) null_observer(allocator()));
assert(!m_transactions[m_next_transaction_id]); assert(!m_transactions[m_next_transaction_id]);
o->sent = boost::posix_time::microsec_clock::universal_time(); o->sent = time_now();
o->target_addr = m.addr; o->target_addr = m.addr;
m_send(m); m_send(m);
new_transaction_id(o); new_transaction_id(o);
} }
catch (std::exception& e)
{
// m_send may fail with "no route to host"
}
}

View file

@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "libtorrent/pch.hpp"
#include <libtorrent/kademlia/traversal_algorithm.hpp> #include <libtorrent/kademlia/traversal_algorithm.hpp>
#include <libtorrent/kademlia/routing_table.hpp> #include <libtorrent/kademlia/routing_table.hpp>
#include <libtorrent/kademlia/rpc_manager.hpp> #include <libtorrent/kademlia/rpc_manager.hpp>
@ -66,8 +68,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
if (i == m_results.end() || i->id != id) if (i == m_results.end() || i->id != id)
{ {
assert(std::find_if(m_results.begin(), m_results.end() assert(std::find_if(m_results.begin(), m_results.end()
, bind(std::equal_to<node_id>() , bind(&result::id, _1) == id) == m_results.end());
, bind(&result::id, _1), id)) == m_results.end());
#ifdef TORRENT_DHT_VERBOSE_LOGGING #ifdef TORRENT_DHT_VERBOSE_LOGGING
TORRENT_LOG(traversal) << "adding result: " << id << " " << addr; TORRENT_LOG(traversal) << "adding result: " << id << " " << addr;
#endif #endif
@ -75,6 +76,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig
} }
} }
boost::pool<>& traversal_algorithm::allocator() const
{
return m_rpc.allocator();
}
void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr)
{ {
add_entry(id, addr, 0); add_entry(id, addr, 0);