diff --git a/libtorrent/include/libtorrent/aux_/session_impl.hpp b/libtorrent/include/libtorrent/aux_/session_impl.hpp index 04d71885c..c878eb95c 100644 --- a/libtorrent/include/libtorrent/aux_/session_impl.hpp +++ b/libtorrent/include/libtorrent/aux_/session_impl.hpp @@ -77,6 +77,11 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/stat.hpp" #include "libtorrent/file_pool.hpp" #include "libtorrent/bandwidth_manager.hpp" +#include "libtorrent/natpmp.hpp" +#include "libtorrent/upnp.hpp" +#include "libtorrent/lsd.hpp" +#include "libtorrent/socket_type.hpp" +#include "libtorrent/connection_queue.hpp" namespace libtorrent { @@ -104,6 +109,7 @@ namespace libtorrent std::vector piece_map; std::vector unfinished_pieces; + std::vector block_info; std::vector peers; entry resume_data; @@ -163,12 +169,10 @@ namespace libtorrent #endif friend struct checker_impl; friend class invariant_access; - typedef std::map + typedef std::map , boost::intrusive_ptr > connection_map; typedef std::map > torrent_map; - typedef std::deque > - connection_queue; session_impl( std::pair listen_port_range @@ -184,7 +188,7 @@ namespace libtorrent void open_listen_port(); void async_accept(); - void on_incoming_connection(boost::shared_ptr const& s + void on_incoming_connection(boost::shared_ptr const& s , boost::weak_ptr const& as, asio::error_code const& e); // must be locked to access the data @@ -195,14 +199,8 @@ namespace libtorrent boost::weak_ptr find_torrent(const sha1_hash& info_hash); peer_id const& get_peer_id() const { return m_peer_id; } - // this will see if there are any pending connection attempts - // and in that case initiate new connections until the limit - // is reached. - void process_connection_queue(); - void close_connection(boost::intrusive_ptr const& p); - void connection_completed(boost::intrusive_ptr const& p); - void connection_failed(boost::shared_ptr const& s + void connection_failed(boost::shared_ptr const& s , tcp::endpoint const& a, char const* message); void set_settings(session_settings const& s); @@ -213,11 +211,16 @@ namespace libtorrent void add_dht_node(udp::endpoint n); void add_dht_router(std::pair const& node); void set_dht_settings(dht_settings const& s); - dht_settings const& kad_settings() const { return m_dht_settings; } + dht_settings const& get_dht_settings() const { return m_dht_settings; } void start_dht(entry const& startup_state); void stop_dht(); entry dht_state() const; #endif + + // called when a port mapping is successful, or a router returns + // a failure to map a port + void on_port_mapping(int tcp_port, int udp_port, std::string const& errmsg); + bool is_aborted() const { return m_abort; } void set_ip_filter(ip_filter const& f); @@ -232,7 +235,8 @@ namespace libtorrent , boost::filesystem::path const& save_path , entry const& resume_data , bool compact_mode - , int block_size); + , int block_size + , storage_constructor_type sc); torrent_handle add_torrent( char const* tracker_url @@ -241,7 +245,8 @@ namespace libtorrent , boost::filesystem::path const& save_path , entry const& resume_data , bool compact_mode - , int block_size); + , int block_size + , storage_constructor_type sc); void remove_torrent(torrent_handle const& h); @@ -271,18 +276,56 @@ namespace libtorrent torrent_handle find_torrent_handle(sha1_hash const& info_hash); - + void announce_lsd(sha1_hash const& ih); + + void set_peer_proxy(proxy_settings const& s) + { m_peer_proxy = s; } + void set_web_seed_proxy(proxy_settings const& s) + { m_web_seed_proxy = s; } + void set_tracker_proxy(proxy_settings const& s) + { m_tracker_proxy = s; } + + proxy_settings const& peer_proxy() const + { return m_peer_proxy; } + proxy_settings const& web_seed_proxy() const + { return m_web_seed_proxy; } + proxy_settings const& tracker_proxy() const + { return m_tracker_proxy; } + +#ifndef TORRENT_DISABLE_DHT + void set_dht_proxy(proxy_settings const& s) + { m_dht_proxy = s; } + proxy_settings const& dht_proxy() const + { return m_dht_proxy; } +#endif + // handles delayed alerts alert_manager m_alerts; // private: + void on_lsd_peer(tcp::endpoint peer, sha1_hash const& ih); + // this is where all active sockets are stored. // the selector can sleep while there's no activity on // them io_service m_io_service; asio::strand m_strand; + // the file pool that all storages in this session's + // torrents uses. It sets a limit on the number of + // open files by this session. + // file pool must be destructed after the torrents + // since they will still have references to it + // when they are destructed. + file_pool m_files; + + // this is a list of half-open tcp connections + // (only outgoing connections) + // this has to be one of the last + // members to be destructed + connection_queue m_half_open; + // the bandwidth manager is responsible for // handing out bandwidth to connections that // asks for it, it can also throttle the @@ -298,16 +341,6 @@ namespace libtorrent // peers. connection_map m_connections; - // this is a list of half-open tcp connections - // (only outgoing connections) - connection_map m_half_open; - - // this is a queue of pending outgoing connections. If the - // list of half-open connections is full (given the global - // limit), new outgoing connections are put on this queue, - // waiting for one slot in the half-open queue to open up. - connection_queue m_connection_queue; - // filters incoming connections ip_filter m_ip_filter; @@ -328,11 +361,28 @@ namespace libtorrent // that we should let the os decide which // interface to listen on tcp::endpoint m_listen_interface; + + // this is typically set to the same as the local + // listen port. In case a NAT port forward was + // successfully opened, this will be set to the + // port that is open on the external (NAT) interface + // on the NAT box itself. This is the port that has + // to be published to peers, since this is the port + // the client is reachable through. + int m_external_listen_port; boost::shared_ptr m_listen_socket; // the settings for the client session_settings m_settings; + // the proxy settings for different + // kinds of connections + proxy_settings m_peer_proxy; + proxy_settings m_web_seed_proxy; + proxy_settings m_tracker_proxy; +#ifndef TORRENT_DISABLE_DHT + proxy_settings m_dht_proxy; +#endif // set to true when the session object // is being destructed and the thread @@ -341,9 +391,6 @@ namespace libtorrent int m_max_uploads; int m_max_connections; - // the number of simultaneous half-open tcp - // connections libtorrent will have. - int m_half_open_limit; // statistics gathered from all torrents. stat m_stat; @@ -354,23 +401,42 @@ namespace libtorrent // NAT or not. bool m_incoming_connection; - // the file pool that all storages in this session's - // torrents uses. It sets a limit on the number of - // open files by this session. - file_pool m_files; - void second_tick(asio::error_code const& e); - boost::posix_time::ptime m_last_tick; + ptime m_last_tick; #ifndef TORRENT_DISABLE_DHT boost::intrusive_ptr m_dht; dht_settings m_dht_settings; + // if this is set to true, the dht listen port + // will be set to the same as the tcp listen port + // and will be synchronlized with it as it changes + // it defaults to true + bool m_dht_same_port; + + // see m_external_listen_port. This is the same + // but for the udp port used by the DHT. + int m_external_udp_port; #endif + natpmp m_natpmp; + upnp m_upnp; + lsd m_lsd; + // the timer used to fire the second_tick deadline_timer m_timer; + + // the index of the torrent that will be offered to + // connect to a peer next time second_tick is called. + // This implements a round robin. + int m_next_connect_torrent; #ifndef NDEBUG void check_invariant(const char *place = 0); #endif + +#ifdef TORRENT_STATS + // logger used to write bandwidth usage statistics + std::ofstream m_stats_logger; + int m_second_counter; +#endif #if defined(TORRENT_VERBOSE_LOGGING) || defined(TORRENT_LOGGING) boost::shared_ptr create_log(std::string const& name , int instance, bool append = true); @@ -379,10 +445,7 @@ namespace libtorrent // shutting down. This list is just here to keep them alive during // whe shutting down process std::list > m_tracker_loggers; - - // logger used to write bandwidth usage statistics - boost::shared_ptr m_stats_logger; - int m_second_counter; + public: boost::shared_ptr m_logger; private: diff --git a/libtorrent/include/libtorrent/extensions/metadata_transfer.hpp b/libtorrent/include/libtorrent/extensions/metadata_transfer.hpp index 6e11eb5b1..210642161 100644 --- a/libtorrent/include/libtorrent/extensions/metadata_transfer.hpp +++ b/libtorrent/include/libtorrent/extensions/metadata_transfer.hpp @@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE. #endif #include +#include "libtorrent/config.hpp" #ifdef _MSC_VER #pragma warning(pop) @@ -47,7 +48,7 @@ namespace libtorrent { struct torrent_plugin; class torrent; - boost::shared_ptr create_metadata_plugin(torrent*); + TORRENT_EXPORT boost::shared_ptr create_metadata_plugin(torrent*); } #endif // TORRENT_METADATA_TRANSFER_HPP_INCLUDED diff --git a/libtorrent/include/libtorrent/extensions/ut_pex.hpp b/libtorrent/include/libtorrent/extensions/ut_pex.hpp index c21c56816..efd9ab4f6 100644 --- a/libtorrent/include/libtorrent/extensions/ut_pex.hpp +++ b/libtorrent/include/libtorrent/extensions/ut_pex.hpp @@ -38,6 +38,7 @@ POSSIBILITY OF SUCH DAMAGE. #endif #include +#include "libtorrent/config.hpp" #ifdef _MSC_VER #pragma warning(pop) @@ -47,7 +48,7 @@ namespace libtorrent { struct torrent_plugin; class torrent; - boost::shared_ptr create_ut_pex_plugin(torrent*); + TORRENT_EXPORT boost::shared_ptr create_ut_pex_plugin(torrent*); } #endif // TORRENT_UT_PEX_EXTENSION_HPP_INCLUDED diff --git a/libtorrent/include/libtorrent/kademlia/closest_nodes.hpp b/libtorrent/include/libtorrent/kademlia/closest_nodes.hpp index d5580b9c9..244e4bb38 100644 --- a/libtorrent/include/libtorrent/kademlia/closest_nodes.hpp +++ b/libtorrent/include/libtorrent/kademlia/closest_nodes.hpp @@ -38,6 +38,8 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include +#include #include @@ -80,6 +82,35 @@ private: done_callback m_done_callback; }; +class closest_nodes_observer : public observer +{ +public: + closest_nodes_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_algorithm(algorithm) + , m_target(target) + , m_self(self) + {} + ~closest_nodes_observer(); + + void send(msg& p) + { + p.info_hash = m_target; + } + + void timeout(); + void reply(msg const&); + void abort() { m_algorithm = 0; } + +private: + boost::intrusive_ptr m_algorithm; + node_id const m_target; + node_id const m_self; +}; + } } // namespace libtorrent::dht #endif // CLOSEST_NODES_050323_HPP diff --git a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp index d1a3a8247..f61364707 100644 --- a/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp +++ b/libtorrent/include/libtorrent/kademlia/dht_tracker.hpp @@ -39,8 +39,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include -#include #include #include #include @@ -120,7 +118,7 @@ namespace libtorrent { namespace dht udp::endpoint m_remote_endpoint[2]; std::vector m_send_buf; - boost::posix_time::ptime m_last_refresh; + ptime m_last_new_key; deadline_timer m_timer; deadline_timer m_connection_timer; deadline_timer m_refresh_timer; diff --git a/libtorrent/include/libtorrent/kademlia/find_data.hpp b/libtorrent/include/libtorrent/kademlia/find_data.hpp index bbafcdd77..17d77c9d8 100644 --- a/libtorrent/include/libtorrent/kademlia/find_data.hpp +++ b/libtorrent/include/libtorrent/kademlia/find_data.hpp @@ -40,8 +40,10 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include +#include +#include +#include #include namespace libtorrent { namespace dht @@ -89,6 +91,37 @@ private: bool m_done; }; +class find_data_observer : public observer +{ +public: + find_data_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_algorithm(algorithm) + , m_target(target) + , m_self(self) + {} + ~find_data_observer(); + + void send(msg& m) + { + m.reply = false; + m.message_id = messages::get_peers; + m.info_hash = m_target; + } + + void timeout(); + void reply(msg const&); + void abort() { m_algorithm = 0; } + +private: + boost::intrusive_ptr m_algorithm; + node_id const m_target; + node_id const m_self; +}; + } } // namespace libtorrent::dht #endif // FIND_DATA_050323_HPP diff --git a/libtorrent/include/libtorrent/kademlia/node.hpp b/libtorrent/include/libtorrent/kademlia/node.hpp index c44a21f33..850333043 100644 --- a/libtorrent/include/libtorrent/kademlia/node.hpp +++ b/libtorrent/include/libtorrent/kademlia/node.hpp @@ -41,17 +41,18 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include #include #include #include #include -#include -#include #include #include +#include "libtorrent/socket.hpp" + namespace libtorrent { namespace dht { @@ -67,7 +68,7 @@ TORRENT_DECLARE_LOG(node); struct peer_entry { tcp::endpoint addr; - boost::posix_time::ptime added; + ptime added; }; // this is a group. It contains a set of group members @@ -85,6 +86,75 @@ inline bool operator<(peer_entry const& lhs, peer_entry const& rhs) struct null_type {}; +class announce_observer : public observer +{ +public: + announce_observer(boost::pool<>& allocator + , sha1_hash const& info_hash + , int listen_port + , entry const& write_token) + : observer(allocator) + , m_info_hash(info_hash) + , m_listen_port(listen_port) + , m_token(write_token) + {} + + void send(msg& m) + { + m.port = m_listen_port; + m.info_hash = m_info_hash; + m.write_token = m_token; + } + + void timeout() {} + void reply(msg const&) {} + void abort() {} + +private: + sha1_hash m_info_hash; + int m_listen_port; + entry m_token; +}; + +class get_peers_observer : public observer +{ +public: + get_peers_observer(sha1_hash const& info_hash + , int listen_port + , rpc_manager& rpc + , boost::function const&, sha1_hash const&)> f) + : observer(rpc.allocator()) + , m_info_hash(info_hash) + , m_listen_port(listen_port) + , m_rpc(rpc) + , m_fun(f) + {} + + void send(msg& m) + { + m.port = m_listen_port; + m.info_hash = m_info_hash; + } + + void timeout() {} + void reply(msg const& r) + { + m_rpc.invoke(messages::announce_peer, r.addr + , observer_ptr(new (m_rpc.allocator().malloc()) announce_observer( + m_rpc.allocator(), m_info_hash, m_listen_port, r.write_token))); + m_fun(r.peers, m_info_hash); + } + void abort() {} + +private: + sha1_hash m_info_hash; + int m_listen_port; + rpc_manager& m_rpc; + boost::function const&, sha1_hash const&)> m_fun; +}; + + + class node_impl : boost::noncopyable { typedef std::map table_t; @@ -116,14 +186,18 @@ public: node_id const& nid() const { return m_id; } boost::tuple size() const{ return m_table.size(); } + size_type num_global_nodes() const + { return m_table.num_global_nodes(); } data_iterator begin_data() { return m_map.begin(); } data_iterator end_data() { return m_map.end(); } int data_size() const { return int(m_map.size()); } +#ifdef TORRENT_DHT_VERBOSE_LOGGING void print_state(std::ostream& os) const { m_table.print_state(os); } - +#endif + void announce(sha1_hash const& info_hash, int listen_port , boost::function const& , sha1_hash const&)> f); @@ -133,8 +207,8 @@ public: // the returned time is the delay until connection_timeout() // should be called again the next time - boost::posix_time::time_duration connection_timeout(); - boost::posix_time::time_duration refresh_timeout(); + time_duration connection_timeout(); + time_duration refresh_timeout(); // generates a new secret number used to generate write tokens void new_write_key(); @@ -172,7 +246,7 @@ private: rpc_manager m_rpc; table_t m_map; - boost::posix_time::ptime m_last_tracker_tick; + ptime m_last_tracker_tick; // secret random numbers used to create write tokens int m_secret[2]; diff --git a/libtorrent/include/libtorrent/kademlia/refresh.hpp b/libtorrent/include/libtorrent/kademlia/refresh.hpp index 7231b26f2..953c4d871 100644 --- a/libtorrent/include/libtorrent/kademlia/refresh.hpp +++ b/libtorrent/include/libtorrent/kademlia/refresh.hpp @@ -37,6 +37,8 @@ POSSIBILITY OF SUCH DAMAGE. #include #include +#include +#include #include @@ -98,6 +100,59 @@ private: std::vector::iterator m_leftover_nodes_iterator; }; +class refresh_observer : public observer +{ +public: + refresh_observer( + boost::intrusive_ptr const& algorithm + , node_id self + , node_id target) + : observer(algorithm->allocator()) + , m_target(target) + , m_self(self) + , m_algorithm(algorithm) + {} + ~refresh_observer(); + + void send(msg& m) + { + m.info_hash = m_target; + } + + void timeout(); + void reply(msg const& m); + void abort() { m_algorithm = 0; } + + +private: + node_id const m_target; + node_id const m_self; + boost::intrusive_ptr m_algorithm; +}; + +class ping_observer : public observer +{ +public: + ping_observer( + boost::intrusive_ptr const& algorithm + , node_id self) + : observer(algorithm->allocator()) + , m_self(self) + , m_algorithm(algorithm) + {} + ~ping_observer(); + + void send(msg& p) {} + void timeout(); + void reply(msg const& m); + void abort() { m_algorithm = 0; } + + +private: + node_id const m_self; + boost::intrusive_ptr m_algorithm; +}; + template inline refresh::refresh( node_id target diff --git a/libtorrent/include/libtorrent/kademlia/routing_table.hpp b/libtorrent/include/libtorrent/kademlia/routing_table.hpp index 3abc472b4..45a7dd762 100644 --- a/libtorrent/include/libtorrent/kademlia/routing_table.hpp +++ b/libtorrent/include/libtorrent/kademlia/routing_table.hpp @@ -36,7 +36,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include #include @@ -50,8 +49,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include - -namespace pt = boost::posix_time; +#include namespace libtorrent { namespace dht { @@ -60,7 +58,7 @@ using asio::ip::udp; //TORRENT_DECLARE_LOG(table); -typedef std::deque bucket_t; +typedef std::vector bucket_t; // differences in the implementation from the description in // the paper: @@ -104,7 +102,7 @@ namespace aux , bucket_iterator_t end) : m_bucket_iterator(begin) , m_bucket_end(end) - , m_iterator(begin != end ? begin->first.begin() : bucket_t::iterator()) + , m_iterator(begin != end ? begin->first.begin() : bucket_t::const_iterator()) { if (m_bucket_iterator == m_bucket_end) return; while (m_iterator == m_bucket_iterator->first.end()) @@ -177,7 +175,7 @@ public: // if the given bucket is empty but there are nodes // in a bucket closer to us, or if the bucket is non-empty and // the time from the last activity is more than 15 minutes - boost::posix_time::ptime next_refresh(int bucket); + ptime next_refresh(int bucket); // fills the vector with the count nodes from our buckets that // are nearest to the given id. @@ -204,17 +202,21 @@ public: iterator end() const; boost::tuple size() const; + size_type num_global_nodes() const; // returns true if there are no working nodes // in the routing table bool need_bootstrap() const; + int num_active_buckets() const + { return 160 - m_lowest_active_bucket + 1; } void replacement_cache(bucket_t& nodes) const; - +#ifdef TORRENT_DHT_VERBOSE_LOGGING // used for debug and monitoring purposes. This will print out // the state of the routing table to the given stream void print_state(std::ostream& os) const; - +#endif + private: // constant called k in paper @@ -226,7 +228,7 @@ private: typedef boost::array, 160> table_t; table_t m_buckets; // timestamps of the last activity in each bucket - typedef boost::array table_activity_t; + typedef boost::array table_activity_t; table_activity_t m_bucket_activity; node_id m_id; // our own node id diff --git a/libtorrent/include/libtorrent/kademlia/rpc_manager.hpp b/libtorrent/include/libtorrent/kademlia/rpc_manager.hpp index 2603071fc..a7c47f29a 100644 --- a/libtorrent/include/libtorrent/kademlia/rpc_manager.hpp +++ b/libtorrent/include/libtorrent/kademlia/rpc_manager.hpp @@ -36,11 +36,11 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include #include #include #include +#include #include #include @@ -48,95 +48,27 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include + +#include "libtorrent/time.hpp" namespace libtorrent { namespace dht { +struct observer; + using asio::ip::udp; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DECLARE_LOG(rpc); #endif -typedef std::vector packet_t; - -namespace messages +struct null_observer : public observer { - enum { ping = 0, find_node = 1, get_peers = 2, announce_peer = 3, error = 4 }; - char const* const ids[] = { "ping", "find_node", "get_peers", "announce_peer", "error" }; -} // namespace messages - -struct msg -{ - msg() : reply(false), piggy_backed_ping(false) - , port(0) {} - - // true if this message is a reply - bool reply; - // true if this is a reply with a piggy backed ping - bool piggy_backed_ping; - // the kind if message - int message_id; - // if this is a reply, a copy of the transaction id - // from the request. If it's a request, a transaction - // id that should be sent back in the reply - std::string transaction_id; - // if this packet has a piggy backed ping, this - // is the transaction id of that ping - std::string ping_transaction_id; - // the node id of the process sending the message - node_id id; - // the address of the process sending or receiving - // the message. - udp::endpoint addr; - // if this is a nodes response, these are the nodes - typedef std::vector nodes_t; - nodes_t nodes; - - typedef std::vector peers_t; - peers_t peers; - - // similar to transaction_id but for write operations. - entry write_token; - - // the info has for peer_requests, announce_peer - // and responses - node_id info_hash; - - // port for announce_peer messages - int port; - - // ERROR MESSAGES - int error_code; - std::string error_msg; -}; - -struct observer : boost::noncopyable -{ - observer() - : sent(boost::posix_time::microsec_clock::universal_time()) - {} - - virtual ~observer() {} - - // this two callbacks lets the observer add - // information to the message before it's sent - virtual void send(msg& m) = 0; - - // this is called when a reply is received - virtual void reply(msg const& m) = 0; - - // this is called when no reply has been received within - // some timeout - virtual void timeout() = 0; - - // if this is called the destructor should - // not invoke any new messages, and should - // only clean up. It means the rpc-manager - // is being destructed - virtual void abort() = 0; - - udp::endpoint target_addr; - boost::posix_time::ptime sent; + null_observer(boost::pool<>& allocator): observer(allocator) {} + virtual void reply(msg const&) {} + virtual void timeout() {} + virtual void send(msg&) {} + void abort() {} }; class routing_table; @@ -153,31 +85,36 @@ public: // returns true if the node needs a refresh bool incoming(msg const&); - boost::posix_time::time_duration tick(); + time_duration tick(); void invoke(int message_id, udp::endpoint target - , boost::shared_ptr o); + , observer_ptr o); - void reply(msg& m, msg const& reply_to); - void reply_with_ping(msg& m, msg const& reply_to); + void reply(msg& m); + void reply_with_ping(msg& m); #ifndef NDEBUG void check_invariant() const; #endif + boost::pool<>& allocator() const + { return m_pool_allocator; } + private: enum { max_transactions = 2048 }; - unsigned int new_transaction_id(boost::shared_ptr o); + unsigned int new_transaction_id(observer_ptr o); void update_oldest_transaction_id(); boost::uint32_t calc_connection_id(udp::endpoint addr); - typedef boost::array, max_transactions> + mutable boost::pool<> m_pool_allocator; + + typedef boost::array transactions_t; transactions_t m_transactions; - std::vector > m_aborted_transactions; + std::vector m_aborted_transactions; // this is the next transaction id to be used int m_next_transaction_id; @@ -191,7 +128,7 @@ private: send_fun m_send; node_id m_our_id; routing_table& m_table; - boost::posix_time::ptime m_timer; + ptime m_timer; node_id m_random_number; bool m_destructing; }; diff --git a/libtorrent/include/libtorrent/kademlia/traversal_algorithm.hpp b/libtorrent/include/libtorrent/kademlia/traversal_algorithm.hpp index 6fa647ba4..d51ed5506 100644 --- a/libtorrent/include/libtorrent/kademlia/traversal_algorithm.hpp +++ b/libtorrent/include/libtorrent/kademlia/traversal_algorithm.hpp @@ -43,6 +43,7 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include +#include namespace libtorrent { namespace dht { @@ -60,6 +61,7 @@ public: void finished(node_id const& id); void failed(node_id const& id, bool prevent_request = false); virtual ~traversal_algorithm() {} + boost::pool<>& allocator() const; protected: template diff --git a/libtorrent/src/kademlia/closest_nodes.cpp b/libtorrent/src/kademlia/closest_nodes.cpp index e8bb9781c..0c7d9d276 100644 --- a/libtorrent/src/kademlia/closest_nodes.cpp +++ b/libtorrent/src/kademlia/closest_nodes.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include @@ -39,36 +41,6 @@ namespace libtorrent { namespace dht using asio::ip::udp; -typedef boost::shared_ptr observer_ptr; - -class closest_nodes_observer : public observer -{ -public: - closest_nodes_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target) - : m_algorithm(algorithm) - , m_target(target) - , m_self(self) - {} - ~closest_nodes_observer(); - - void send(msg& p) - { - p.info_hash = m_target; - } - - void timeout(); - void reply(msg const&); - void abort() { m_algorithm = 0; } - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_target; - node_id const m_self; -}; - closest_nodes_observer::~closest_nodes_observer() { if (m_algorithm) m_algorithm->failed(m_self, true); @@ -127,8 +99,8 @@ closest_nodes::closest_nodes( void closest_nodes::invoke(node_id const& id, udp::endpoint addr) { - observer_ptr p(new closest_nodes_observer(this, id, m_target)); - m_rpc.invoke(messages::find_node, addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) closest_nodes_observer(this, id, m_target)); + m_rpc.invoke(messages::find_node, addr, o); } void closest_nodes::done() diff --git a/libtorrent/src/kademlia/dht_tracker.cpp b/libtorrent/src/kademlia/dht_tracker.cpp index 3b2e9a50b..eda6cd864 100644 --- a/libtorrent/src/kademlia/dht_tracker.cpp +++ b/libtorrent/src/kademlia/dht_tracker.cpp @@ -30,13 +30,13 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include #include #include -#include -#include #include #include #include @@ -52,14 +52,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/io.hpp" #include "libtorrent/version.hpp" -using boost::posix_time::ptime; -using boost::posix_time::time_duration; -using boost::posix_time::second_clock; -using boost::posix_time::microsec_clock; -using boost::posix_time::seconds; -using boost::posix_time::minutes; -using boost::posix_time::hours; -using boost::posix_time::milliseconds; using boost::ref; using boost::lexical_cast; using libtorrent::dht::node_impl; @@ -70,6 +62,11 @@ using libtorrent::dht::packet_iterator; namespace messages = libtorrent::dht::messages; using namespace libtorrent::detail; +enum +{ + key_refresh = 5 // generate a new write token key every 5 minutes +}; + using asio::ip::udp; typedef asio::ip::address_v4 address; @@ -155,7 +152,7 @@ namespace libtorrent { namespace dht , m_dht(bind(&dht_tracker::send_packet, this, _1), settings , read_id(bootstrap)) , m_buffer(0) - , m_last_refresh(second_clock::universal_time() - hours(1)) + , m_last_new_key(time_now() - minutes(key_refresh)) , m_timer(ios) , m_connection_timer(ios) , m_refresh_timer(ios) @@ -215,7 +212,7 @@ namespace libtorrent { namespace dht m_connection_timer.async_wait(m_strand.wrap( bind(&dht_tracker::connection_timeout, self(), _1))); - m_refresh_timer.expires_from_now(minutes(15)); + m_refresh_timer.expires_from_now(seconds(5)); m_refresh_timer.async_wait(m_strand.wrap(bind(&dht_tracker::refresh_timeout, self(), _1))); m_dht.bootstrap(initial_nodes, bind(&dht_tracker::on_bootstrap, self())); @@ -233,6 +230,7 @@ namespace libtorrent { namespace dht { boost::tie(s.dht_nodes, s.dht_node_cache) = m_dht.size(); s.dht_torrents = m_dht.data_size(); + s.dht_global_nodes = m_dht.num_global_nodes(); } void dht_tracker::connection_timeout(asio::error_code const& e) @@ -281,7 +279,15 @@ namespace libtorrent { namespace dht m_timer.expires_from_now(minutes(tick_period)); m_timer.async_wait(m_strand.wrap(bind(&dht_tracker::tick, this, _1))); - m_dht.new_write_key(); + ptime now = time_now(); + if (now - m_last_new_key > minutes(key_refresh)) + { + m_last_new_key = now; + m_dht.new_write_key(); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << time_now_string() << " new write key"; +#endif + } #ifdef TORRENT_DHT_VERBOSE_LOGGING static bool first = true; @@ -304,9 +310,7 @@ namespace libtorrent { namespace dht if (first) { first = false; - using boost::posix_time::to_simple_string; - pc << "\n\n ***** starting log at " << to_simple_string( - second_clock::universal_time()) << " *****\n\n" + pc << "\n\n ***** starting log at " << time_now_string() << " *****\n\n" << "minute:active nodes:passive nodes" ":ping replies sent:ping queries recvd:ping" ":ping replies sent:ping queries recvd:ping" @@ -409,9 +413,8 @@ namespace libtorrent { namespace dht , m_in_buf[current_buffer].end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << microsec_clock::universal_time() - << " RECEIVED [" << m_remote_endpoint[current_buffer] - << "]:"; + TORRENT_LOG(dht_tracker) << time_now_string() << " RECEIVED [" + << m_remote_endpoint[current_buffer] << "]:"; #endif libtorrent::dht::msg m; @@ -453,7 +456,7 @@ namespace libtorrent { namespace dht } else { - TORRENT_LOG(dht_tracker) << " client: generic"; + TORRENT_LOG(dht_tracker) << " client: " << client; } } catch (std::exception&) @@ -625,9 +628,10 @@ namespace libtorrent { namespace dht m.error_msg = list.back().string(); m.error_code = list.front().integer(); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " " + TORRENT_LOG(dht_tracker) << " incoming error: " << m.error_code << " " << m.error_msg; #endif + throw std::runtime_error("DHT error message"); } else { @@ -646,14 +650,17 @@ namespace libtorrent { namespace dht } TORRENT_LOG(dht_tracker) << e; #endif - + assert(m.message_id != messages::error); m_dht.incoming(m); } catch (std::exception& e) { #ifdef TORRENT_DHT_VERBOSE_LOGGING + int current_buffer = (m_buffer + 1) & 1; + std::string msg(m_in_buf[current_buffer].begin() + , m_in_buf[current_buffer].begin() + bytes_transferred); TORRENT_LOG(dht_tracker) << "invalid incoming packet: " - << e.what(); + << e.what() << "\n" << msg << "\n"; #endif } } @@ -737,23 +744,66 @@ namespace libtorrent { namespace dht void dht_tracker::on_bootstrap() {} + namespace + { + void write_nodes_entry(entry& r, libtorrent::dht::msg const& m) + { + bool ipv6_nodes = false; + r["nodes"] = entry(entry::string_t); + entry& n = r["nodes"]; + std::back_insert_iterator out(n.string()); + for (msg::nodes_t::const_iterator i = m.nodes.begin() + , end(m.nodes.end()); i != end; ++i) + { + if (!i->addr.address().is_v4()) + { + ipv6_nodes = true; + continue; + } + std::copy(i->id.begin(), i->id.end(), out); + write_endpoint(i->addr, out); + } + + if (ipv6_nodes) + { + r["nodes2"] = entry(entry::list_t); + entry& p = r["nodes2"]; + std::string endpoint; + for (msg::nodes_t::const_iterator i = m.nodes.begin() + , end(m.nodes.end()); i != end; ++i) + { + if (!i->addr.address().is_v6()) continue; + endpoint.resize(18 + 20); + std::string::iterator out = endpoint.begin(); + std::copy(i->id.begin(), i->id.end(), out); + out += 20; + write_endpoint(i->addr, out); + endpoint.resize(out - endpoint.begin()); + p.list().push_back(entry(endpoint)); + } + } +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size(); +#endif + } + } + void dht_tracker::send_packet(msg const& m) + try { using libtorrent::bencode; using libtorrent::entry; entry e(entry::dictionary_t); + assert(!m.transaction_id.empty() || m.message_id == messages::error); e["t"] = m.transaction_id; - std::string version_str("LT "); - std::string::iterator i = version_str.begin() + 2; - detail::write_uint8(LIBTORRENT_VERSION_MAJOR, i); - detail::write_uint8(LIBTORRENT_VERSION_MINOR, i); - e["v"] = version_str; + static char const version_str[] = {'L', 'T' + , LIBTORRENT_VERSION_MAJOR, LIBTORRENT_VERSION_MINOR}; + e["v"] = std::string(version_str, version_str + 4); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << microsec_clock::universal_time() + TORRENT_LOG(dht_tracker) << time_now_string() << " SENDING [" << m.addr << "]:"; TORRENT_LOG(dht_tracker) << " transaction: " << m.transaction_id; -// e.print(std::cerr); #endif if (m.message_id == messages::error) @@ -761,12 +811,13 @@ namespace libtorrent { namespace dht assert(m.reply); e["y"] = "e"; entry error_list(entry::list_t); + assert(m.error_code > 200 && m.error_code <= 204); error_list.list().push_back(entry(m.error_code)); error_list.list().push_back(entry(m.error_msg)); e["e"] = error_list; #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " error: " << m.error_code << " " - << m.error_msg; + TORRENT_LOG(dht_tracker) << time_now_string() + << " outgoing error: " << m.error_code << " " << m.error_msg; #endif } else if (m.reply) @@ -777,8 +828,8 @@ namespace libtorrent { namespace dht r["id"] = std::string(m.id.begin(), m.id.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " reply: " - << messages::ids[m.message_id]; + TORRENT_LOG(dht_tracker) << time_now_string() + << " reply: " << messages::ids[m.message_id]; #endif if (m.write_token.type() != entry::undefined_t) @@ -790,71 +841,27 @@ namespace libtorrent { namespace dht break; case messages::find_node: { - bool ipv6_nodes = false; - r["nodes"] = entry(entry::string_t); - entry& n = r["nodes"]; - std::back_insert_iterator out(n.string()); - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) - { - if (!i->addr.address().is_v4()) - { - ipv6_nodes = true; - continue; - } - std::copy(i->id.begin(), i->id.end(), out); - write_endpoint(i->addr, out); - } - - if (ipv6_nodes) - { - r["nodes2"] = entry(entry::list_t); - entry& p = r["nodes2"]; - std::string endpoint; - endpoint.resize(6); - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) - { - std::string::iterator out = endpoint.begin(); - std::copy(i->id.begin(), i->id.end(), out); - write_endpoint(i->addr, out); - p.list().push_back(entry(endpoint)); - } - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size(); -#endif + write_nodes_entry(r, m); break; } case messages::get_peers: { if (m.peers.empty()) { - r["nodes"] = entry(entry::string_t); - entry& n = r["nodes"]; - std::back_insert_iterator out(n.string()); - for (msg::nodes_t::const_iterator i = m.nodes.begin() - , end(m.nodes.end()); i != end; ++i) - { - if (!i->addr.address().is_v4()) continue; - std::copy(i->id.begin(), i->id.end(), out); - write_endpoint(i->addr, out); - } -#ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(dht_tracker) << " nodes: " << m.nodes.size(); -#endif + write_nodes_entry(r, m); } else { r["values"] = entry(entry::list_t); entry& p = r["values"]; std::string endpoint; - endpoint.resize(6); for (msg::peers_t::const_iterator i = m.peers.begin() , end(m.peers.end()); i != end; ++i) { + endpoint.resize(18); std::string::iterator out = endpoint.begin(); write_endpoint(*i, out); + endpoint.resize(out - endpoint.begin()); p.list().push_back(entry(endpoint)); } #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -923,8 +930,10 @@ namespace libtorrent { namespace dht m_send_buf.clear(); bencode(std::back_inserter(m_send_buf), e); + asio::error_code ec; m_socket.send_to(asio::buffer(&m_send_buf[0] - , (int)m_send_buf.size()), m.addr); + , (int)m_send_buf.size()), m.addr, 0, ec); + if (ec) return; #ifdef TORRENT_DHT_VERBOSE_LOGGING m_total_out_bytes += m_send_buf.size(); @@ -953,6 +962,13 @@ namespace libtorrent { namespace dht send_packet(pm); } + catch (std::exception&) + { + // m_send may fail with "no route to host" + // but it shouldn't throw since an error code + // is passed in instead + assert(false); + } }} diff --git a/libtorrent/src/kademlia/find_data.cpp b/libtorrent/src/kademlia/find_data.cpp index 9fe787807..4ada42fb3 100644 --- a/libtorrent/src/kademlia/find_data.cpp +++ b/libtorrent/src/kademlia/find_data.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include @@ -38,38 +40,6 @@ POSSIBILITY OF SUCH DAMAGE. namespace libtorrent { namespace dht { -typedef boost::shared_ptr observer_ptr; - -class find_data_observer : public observer -{ -public: - find_data_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target) - : m_algorithm(algorithm) - , m_target(target) - , m_self(self) - {} - ~find_data_observer(); - - void send(msg& m) - { - m.reply = false; - m.message_id = messages::get_peers; - m.info_hash = m_target; - } - - void timeout(); - void reply(msg const&); - void abort() { m_algorithm = 0; } - -private: - boost::intrusive_ptr m_algorithm; - node_id const m_target; - node_id const m_self; -}; - find_data_observer::~find_data_observer() { if (m_algorithm) m_algorithm->failed(m_self); @@ -139,8 +109,8 @@ void find_data::invoke(node_id const& id, asio::ip::udp::endpoint addr) return; } - observer_ptr p(new find_data_observer(this, id, m_target)); - m_rpc.invoke(messages::get_peers, addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) find_data_observer(this, id, m_target)); + m_rpc.invoke(messages::get_peers, addr, o); } void find_data::got_data(msg const* m) diff --git a/libtorrent/src/kademlia/node.cpp b/libtorrent/src/kademlia/node.cpp index 07da958bb..74641ec43 100644 --- a/libtorrent/src/kademlia/node.cpp +++ b/libtorrent/src/kademlia/node.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include @@ -50,11 +52,6 @@ POSSIBILITY OF SUCH DAMAGE. #include "libtorrent/kademlia/find_data.hpp" using boost::bind; -using boost::posix_time::second_clock; -using boost::posix_time::seconds; -using boost::posix_time::minutes; -using boost::posix_time::ptime; -using boost::posix_time::time_duration; namespace libtorrent { namespace dht { @@ -66,8 +63,6 @@ namespace } #endif -typedef boost::shared_ptr observer_ptr; - // TODO: configurable? enum { announce_interval = 30 }; @@ -99,7 +94,7 @@ void purge_peers(std::set& peers) , end(peers.end()); i != end;) { // the peer has timed out - if (i->added + minutes(int(announce_interval * 1.5f)) < second_clock::universal_time()) + if (i->added + minutes(int(announce_interval * 1.5f)) < time_now()) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(node) << "peer timed out at: " << i->addr.address(); @@ -120,7 +115,7 @@ node_impl::node_impl(boost::function const& f , m_table(m_id, 8, settings) , m_rpc(bind(&node_impl::incoming_request, this, _1) , m_id, m_table, f) - , m_last_tracker_tick(boost::posix_time::second_clock::universal_time()) + , m_last_tracker_tick(time_now()) { m_secret[0] = std::rand(); m_secret[1] = std::rand(); @@ -129,9 +124,20 @@ node_impl::node_impl(boost::function const& f bool node_impl::verify_token(msg const& m) { if (m.write_token.type() != entry::string_t) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "token of incorrect type " << m.write_token.type(); +#endif return false; + } std::string const& token = m.write_token.string(); - if (token.length() != 4) return false; + if (token.length() != 4) + { +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "token of incorrect length: " << token.length(); +#endif + return false; + } hasher h1; std::string address = m.addr.address().to_string(); @@ -142,10 +148,11 @@ bool node_impl::verify_token(msg const& m) sha1_hash h = h1.final(); if (std::equal(token.begin(), token.end(), (signed char*)&h[0])) return true; - + hasher h2; h2.update(&address[0], address.length()); h2.update((char*)&m_secret[1], sizeof(m_secret[1])); + h2.update((char*)&m.info_hash[0], sha1_hash::size); h = h2.final(); if (std::equal(token.begin(), token.end(), (signed char*)&h[0])) return true; @@ -258,70 +265,6 @@ void node_impl::incoming(msg const& m) namespace { - - class announce_observer : public observer - { - public: - announce_observer(sha1_hash const& info_hash, int listen_port - , entry const& write_token) - : m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_token(write_token) - {} - - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - m.write_token = m_token; - } - - void timeout() {} - void reply(msg const&) {} - void abort() {} - - private: - sha1_hash m_info_hash; - int m_listen_port; - entry m_token; - }; - - class get_peers_observer : public observer - { - public: - get_peers_observer(sha1_hash const& info_hash, int listen_port - , rpc_manager& rpc - , boost::function const&, sha1_hash const&)> f) - : m_info_hash(info_hash) - , m_listen_port(listen_port) - , m_rpc(rpc) - , m_fun(f) - {} - - void send(msg& m) - { - m.port = m_listen_port; - m.info_hash = m_info_hash; - } - - void timeout() {} - void reply(msg const& r) - { - m_rpc.invoke(messages::announce_peer, r.addr - , boost::shared_ptr( - new announce_observer(m_info_hash, m_listen_port, r.write_token))); - m_fun(r.peers, m_info_hash); - } - void abort() {} - - private: - sha1_hash m_info_hash; - int m_listen_port; - rpc_manager& m_rpc; - boost::function const&, sha1_hash const&)> m_fun; - }; - - void announce_fun(std::vector const& v, rpc_manager& rpc , int listen_port, sha1_hash const& ih , boost::function const&, sha1_hash const&)> f) @@ -331,23 +274,11 @@ namespace for (std::vector::const_iterator i = v.begin() , end(v.end()); i != end; ++i) { - rpc.invoke(messages::get_peers, i->addr, boost::shared_ptr( - new get_peers_observer(ih, listen_port, rpc, f))); + rpc.invoke(messages::get_peers, i->addr, observer_ptr( + new (rpc.allocator().malloc()) get_peers_observer(ih, listen_port, rpc, f))); nodes = true; } } - -} - -namespace -{ - struct dummy_observer : observer - { - virtual void reply(msg const&) {} - virtual void timeout() {} - virtual void send(msg&) {} - virtual void abort() {} - }; } void node_impl::add_router_node(udp::endpoint router) @@ -359,8 +290,8 @@ void node_impl::add_node(udp::endpoint node) { // ping the node, and if we get a reply, it // will be added to the routing table - observer_ptr p(new dummy_observer()); - m_rpc.invoke(messages::ping, node, p); + observer_ptr o(new (m_rpc.allocator().malloc()) null_observer(m_rpc.allocator())); + m_rpc.invoke(messages::ping, node, o); } void node_impl::announce(sha1_hash const& info_hash, int listen_port @@ -377,34 +308,44 @@ void node_impl::announce(sha1_hash const& info_hash, int listen_port time_duration node_impl::refresh_timeout() { int refresh = -1; - ptime now = second_clock::universal_time(); + ptime now = time_now(); ptime next = now + minutes(15); try { for (int i = 0; i < 160; ++i) { ptime r = m_table.next_refresh(i); - if (r <= now) - { - if (refresh == -1) refresh = i; - } - else if (r < next) + if (r <= next) { + refresh = i; next = r; } } - if (refresh != -1) + if (next < now) { - #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(node) << "refreshing bucket: " << refresh; - #endif + assert(refresh > -1); +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "refreshing bucket: " << refresh; +#endif refresh_bucket(refresh); } } catch (std::exception&) {} - if (next < now + seconds(5)) return seconds(5); - return next - now; + time_duration next_refresh = next - now; + time_duration min_next_refresh + = minutes(15) / (m_table.num_active_buckets()); + if (min_next_refresh > seconds(40)) + min_next_refresh = seconds(40); + + if (next_refresh < min_next_refresh) + next_refresh = min_next_refresh; + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + TORRENT_LOG(node) << "next refresh: " << total_seconds(next_refresh) << " seconds"; +#endif + + return next_refresh; } time_duration node_impl::connection_timeout() @@ -412,7 +353,7 @@ time_duration node_impl::connection_timeout() time_duration d = m_rpc.tick(); try { - ptime now(second_clock::universal_time()); + ptime now(time_now()); if (now - m_last_tracker_tick < minutes(10)) return d; m_last_tracker_tick = now; @@ -443,7 +384,7 @@ void node_impl::on_announce(msg const& m, msg& reply) { reply.message_id = messages::error; reply.error_code = 203; - reply.error_msg = "Incorrect write token in announce_peer message"; + reply.error_msg = "Incorrect token in announce_peer"; return; } @@ -455,7 +396,7 @@ void node_impl::on_announce(msg const& m, msg& reply) torrent_entry& v = m_map[m.info_hash]; peer_entry e; e.addr = tcp::endpoint(m.addr.address(), m.addr.port()); - e.added = second_clock::universal_time(); + e.added = time_now(); std::set::iterator i = v.peers.find(e); if (i != v.peers.end()) v.peers.erase(i++); v.peers.insert(i, e); @@ -496,6 +437,11 @@ bool node_impl::on_find(msg const& m, std::vector& peers) const void node_impl::incoming_request(msg const& m) { msg reply; + reply.message_id = m.message_id; + reply.addr = m.addr; + reply.reply = true; + reply.transaction_id = m.transaction_id; + switch (m.message_id) { case messages::ping: @@ -535,16 +481,16 @@ void node_impl::incoming_request(msg const& m) } break; case messages::announce_peer: - { - on_announce(m, reply); - } + on_announce(m, reply); break; + default: + assert(false); }; if (m_table.need_node(m.id)) - m_rpc.reply_with_ping(reply, m); + m_rpc.reply_with_ping(reply); else - m_rpc.reply(reply, m); + m_rpc.reply(reply); } diff --git a/libtorrent/src/kademlia/node_id.cpp b/libtorrent/src/kademlia/node_id.cpp index d435846d2..4ed413714 100644 --- a/libtorrent/src/kademlia/node_id.cpp +++ b/libtorrent/src/kademlia/node_id.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include diff --git a/libtorrent/src/kademlia/refresh.cpp b/libtorrent/src/kademlia/refresh.cpp index ccd753de9..ce94ca93b 100644 --- a/libtorrent/src/kademlia/refresh.cpp +++ b/libtorrent/src/kademlia/refresh.cpp @@ -30,10 +30,13 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include #include +#include #include @@ -50,38 +53,6 @@ using asio::ip::udp; TORRENT_DEFINE_LOG(refresh) #endif -typedef boost::shared_ptr observer_ptr; - -class refresh_observer : public observer -{ -public: - refresh_observer( - boost::intrusive_ptr const& algorithm - , node_id self - , node_id target - ) - : m_target(target) - , m_self(self) - , m_algorithm(algorithm) - {} - ~refresh_observer(); - - void send(msg& m) - { - m.info_hash = m_target; - } - - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - node_id const m_target; - node_id const m_self; - boost::intrusive_ptr m_algorithm; -}; - refresh_observer::~refresh_observer() { if (m_algorithm) m_algorithm->failed(m_self, true); @@ -110,29 +81,6 @@ void refresh_observer::timeout() m_algorithm = 0; } -class ping_observer : public observer -{ -public: - ping_observer( - boost::intrusive_ptr const& algorithm - , node_id self - ) - : m_self(self) - , m_algorithm(algorithm) - {} - ~ping_observer(); - - void send(msg& p) {} - void timeout(); - void reply(msg const& m); - void abort() { m_algorithm = 0; } - - -private: - node_id const m_self; - boost::intrusive_ptr m_algorithm; -}; - ping_observer::~ping_observer() { if (m_algorithm) m_algorithm->ping_timeout(m_self, true); @@ -155,13 +103,10 @@ void ping_observer::timeout() void refresh::invoke(node_id const& nid, udp::endpoint addr) { - observer_ptr p(new refresh_observer( - this - , nid - , m_target - )); + observer_ptr o(new (m_rpc.allocator().malloc()) refresh_observer( + this, nid, m_target)); - m_rpc.invoke(messages::find_node, addr, p); + m_rpc.invoke(messages::find_node, addr, o); } void refresh::done() @@ -209,8 +154,9 @@ void refresh::invoke_pings_or_finish(bool prevent_request) try { - observer_ptr p(new ping_observer(this, node.id)); - m_rpc.invoke(messages::ping, node.addr, p); + observer_ptr o(new (m_rpc.allocator().malloc()) ping_observer( + this, node.id)); + m_rpc.invoke(messages::ping, node.addr, o); ++m_active_pings; ++m_leftover_nodes_iterator; } diff --git a/libtorrent/src/kademlia/routing_table.cpp b/libtorrent/src/kademlia/routing_table.cpp index 32f7514f2..45091481c 100644 --- a/libtorrent/src/kademlia/routing_table.cpp +++ b/libtorrent/src/kademlia/routing_table.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include @@ -37,7 +39,6 @@ POSSIBILITY OF SUCH DAMAGE. #include #include #include -#include #include "libtorrent/kademlia/routing_table.hpp" #include "libtorrent/kademlia/node_id.hpp" @@ -46,13 +47,6 @@ POSSIBILITY OF SUCH DAMAGE. using boost::bind; using boost::uint8_t; -using boost::posix_time::second_clock; -using boost::posix_time::minutes; -using boost::posix_time::seconds; -using boost::posix_time::hours; - -namespace pt = boost::posix_time; - namespace libtorrent { namespace dht { @@ -69,7 +63,8 @@ routing_table::routing_table(node_id const& id, int bucket_size // distribute the refresh times for the buckets in an // attempt do even out the network load for (int i = 0; i < 160; ++i) - m_bucket_activity[i] = second_clock::universal_time() - seconds(15*60 - i*5); + m_bucket_activity[i] = time_now() - milliseconds(i*5625); + m_bucket_activity[0] = time_now() - minutes(15); } boost::tuple routing_table::size() const @@ -85,14 +80,34 @@ boost::tuple routing_table::size() const return boost::make_tuple(nodes, replacements); } +size_type routing_table::num_global_nodes() const +{ + int first_full = m_lowest_active_bucket; + int num_nodes = 1; // we are one of the nodes + for (; first_full < 160 + && int(m_buckets[first_full].first.size()) < m_bucket_size; + ++first_full) + { + num_nodes += m_buckets[first_full].first.size(); + } + + return (2 << (160 - first_full)) * num_nodes; +} + +#ifdef TORRENT_DHT_VERBOSE_LOGGING + void routing_table::print_state(std::ostream& os) const { os << "kademlia routing table state\n" << "bucket_size: " << m_bucket_size << "\n" + << "global node count: " << num_global_nodes() << "\n" << "node_id: " << m_id << "\n\n"; - os << "number of nodes per bucket:\n" - "live\n"; + os << "number of nodes per bucket:\n-- live "; + for (int i = 8; i < 160; ++i) + os << "-"; + os << "\n"; + for (int k = 0; k < 8; ++k) { for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); @@ -117,17 +132,20 @@ void routing_table::print_state(std::ostream& os) const } os << "\n"; } - os << "cached\n-----------\n"; + os << "-- cached "; + for (int i = 10; i < 160; ++i) + os << "-"; + os << "\n\n"; os << "nodes:\n"; for (table_t::const_iterator i = m_buckets.begin(), end(m_buckets.end()); i != end; ++i) { int bucket_index = int(i - m_buckets.begin()); - os << "bucket " << bucket_index << " " - << to_simple_string(m_bucket_activity[bucket_index]) - << " " << (bucket_index >= m_lowest_active_bucket?"active":"inactive") - << "\n"; + os << "=== BUCKET = " << bucket_index + << " = " << (bucket_index >= m_lowest_active_bucket?"active":"inactive") + << " = " << total_seconds(time_now() - m_bucket_activity[bucket_index]) + << " s ago ===== \n"; for (bucket_t::const_iterator j = i->first.begin() , end(i->first.end()); j != end; ++j) { @@ -137,19 +155,21 @@ void routing_table::print_state(std::ostream& os) const } } +#endif + void routing_table::touch_bucket(int bucket) { - m_bucket_activity[bucket] = second_clock::universal_time(); + m_bucket_activity[bucket] = time_now(); } -boost::posix_time::ptime routing_table::next_refresh(int bucket) +ptime routing_table::next_refresh(int bucket) { assert(bucket < 160); assert(bucket >= 0); // lower than or equal to since a refresh of bucket 0 will // effectively refresh the lowest active bucket as well - if (bucket <= m_lowest_active_bucket && bucket > 0) - return second_clock::universal_time() + minutes(15); + if (bucket < m_lowest_active_bucket && bucket > 0) + return time_now() + minutes(15); return m_bucket_activity[bucket] + minutes(15); } @@ -177,11 +197,11 @@ bool routing_table::need_node(node_id const& id) if ((int)rb.size() >= m_bucket_size) return false; // if the node already exists, we don't need it - if (std::find_if(b.begin(), b.end(), bind(std::equal_to() - , bind(&node_entry::id, _1), id)) != b.end()) return false; + if (std::find_if(b.begin(), b.end(), bind(&node_entry::id, _1) == id) + != b.end()) return false; - if (std::find_if(rb.begin(), rb.end(), bind(std::equal_to() - , bind(&node_entry::id, _1), id)) != rb.end()) return false; + if (std::find_if(rb.begin(), rb.end(), bind(&node_entry::id, _1) == id) + != rb.end()) return false; return true; } @@ -195,8 +215,7 @@ void routing_table::node_failed(node_id const& id) bucket_t& rb = m_buckets[bucket_index].second; bucket_t::iterator i = std::find_if(b.begin(), b.end() - , bind(std::equal_to() - , bind(&node_entry::id, _1), id)); + , bind(&node_entry::id, _1) == id); if (i == b.end()) return; @@ -245,12 +264,11 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr) bucket_t& b = m_buckets[bucket_index].first; bucket_t::iterator i = std::find_if(b.begin(), b.end() - , bind(std::equal_to() - , bind(&node_entry::id, _1), id)); + , bind(&node_entry::id, _1) == id); bool ret = need_bootstrap(); - m_bucket_activity[bucket_index] = second_clock::universal_time(); + //m_bucket_activity[bucket_index] = time_now(); if (i != b.end()) { @@ -274,6 +292,7 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr) // offline if ((int)b.size() < m_bucket_size) { + if (b.empty()) b.reserve(m_bucket_size); b.push_back(node_entry(id, addr)); // if bucket index is 0, the node is ourselves // don't updated m_lowest_active_bucket @@ -293,9 +312,8 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr) // with nodes from that cache. i = std::max_element(b.begin(), b.end() - , bind(std::less() - , bind(&node_entry::fail_count, _1) - , bind(&node_entry::fail_count, _2))); + , bind(&node_entry::fail_count, _1) + < bind(&node_entry::fail_count, _2)); if (i != b.end() && i->fail_count > 0) { @@ -315,14 +333,14 @@ bool routing_table::node_seen(node_id const& id, udp::endpoint addr) bucket_t& rb = m_buckets[bucket_index].second; i = std::find_if(rb.begin(), rb.end() - , bind(std::equal_to() - , bind(&node_entry::id, _1), id)); + , bind(&node_entry::id, _1) == id); // if the node is already in the replacement bucket // just return. if (i != rb.end()) return ret; if ((int)rb.size() > m_bucket_size) rb.erase(rb.begin()); + if (rb.empty()) rb.reserve(m_bucket_size); rb.push_back(node_entry(id, addr)); // TORRENT_LOG(table) << "inserting node in replacement cache: " << id << " " << addr; return ret; @@ -358,8 +376,7 @@ void routing_table::find_node(node_id const& target if ((int)l.size() == count) { assert(std::count_if(l.begin(), l.end() - , boost::bind(std::not_equal_to() - , boost::bind(&node_entry::fail_count, _1), 0)) == 0); + , boost::bind(&node_entry::fail_count, _1) != 0) == 0); return; } @@ -391,8 +408,7 @@ void routing_table::find_node(node_id const& target || bucket_index == (int)m_buckets.size() - 1) { assert(std::count_if(l.begin(), l.end() - , boost::bind(std::not_equal_to() - , boost::bind(&node_entry::fail_count, _1), 0)) == 0); + , boost::bind(&node_entry::fail_count, _1) != 0) == 0); return; } @@ -406,8 +422,7 @@ void routing_table::find_node(node_id const& target { l.erase(l.begin() + count, l.end()); assert(std::count_if(l.begin(), l.end() - , boost::bind(std::not_equal_to() - , boost::bind(&node_entry::fail_count, _1), 0)) == 0); + , boost::bind(&node_entry::fail_count, _1) != 0) == 0); return; } } @@ -416,8 +431,7 @@ void routing_table::find_node(node_id const& target assert((int)l.size() <= count); assert(std::count_if(l.begin(), l.end() - , boost::bind(std::not_equal_to() - , boost::bind(&node_entry::fail_count, _1), 0)) == 0); + , boost::bind(&node_entry::fail_count, _1) != 0) == 0); } routing_table::iterator routing_table::begin() const diff --git a/libtorrent/src/kademlia/rpc_manager.cpp b/libtorrent/src/kademlia/rpc_manager.cpp index 7295938d0..93eac8565 100644 --- a/libtorrent/src/kademlia/rpc_manager.cpp +++ b/libtorrent/src/kademlia/rpc_manager.cpp @@ -30,24 +30,31 @@ POSSIBILITY OF SUCH DAMAGE. */ -#include -#include +#include "libtorrent/pch.hpp" +#include "libtorrent/socket.hpp" + #include +#include +#include +#include +#include +#include +#include #include #include #include #include #include +#include +#include +#include +#include +#include #include #include -using boost::posix_time::ptime; -using boost::posix_time::time_duration; -using boost::posix_time::microsec_clock; -using boost::posix_time::seconds; -using boost::posix_time::milliseconds; using boost::shared_ptr; using boost::bind; @@ -55,22 +62,57 @@ namespace libtorrent { namespace dht { namespace io = libtorrent::detail; +namespace mpl = boost::mpl; #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_DEFINE_LOG(rpc) #endif +void intrusive_ptr_add_ref(observer const* o) +{ + assert(o->m_refs >= 0); + assert(o != 0); + ++o->m_refs; +} + +void intrusive_ptr_release(observer const* o) +{ + assert(o->m_refs > 0); + assert(o != 0); + if (--o->m_refs == 0) + { + boost::pool<>& p = o->pool_allocator; + o->~observer(); + p.ordered_free(const_cast(o)); + } +} + node_id generate_id(); +typedef mpl::vector< + closest_nodes_observer + , find_data_observer + , announce_observer + , get_peers_observer + , refresh_observer + , ping_observer + , null_observer + > observer_types; + +typedef mpl::max_element< + mpl::transform_view > + >::type max_observer_type_iter; + rpc_manager::rpc_manager(fun const& f, node_id const& our_id , routing_table& table, send_fun const& sf) - : m_next_transaction_id(rand() % max_transactions) + : m_pool_allocator(sizeof(mpl::deref::type)) + , m_next_transaction_id(rand() % max_transactions) , m_oldest_transaction_id(m_next_transaction_id) , m_incoming(f) , m_send(sf) , m_our_id(our_id) , m_table(table) - , m_timer(boost::posix_time::microsec_clock::universal_time()) + , m_timer(time_now()) , m_random_number(generate_id()) , m_destructing(false) { @@ -121,12 +163,21 @@ bool rpc_manager::incoming(msg const& m) // if we don't have the transaction id in our // request list, ignore the packet - if (m.transaction_id.size() != 2) + if (m.transaction_id.size() < 2) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Reply with invalid transaction id size: " << m.transaction_id.size() << " from " << m.addr; #endif + msg reply; + reply.reply = true; + reply.message_id = messages::error; + reply.error_code = 203; // Protocol error + reply.error_msg = "reply with invalid transaction id, size " + + boost::lexical_cast(m.transaction_id.size()); + reply.addr = m.addr; + reply.transaction_id = ""; + m_send(reply); return false; } @@ -137,19 +188,27 @@ bool rpc_manager::incoming(msg const& m) || tid < 0) { #ifdef TORRENT_DHT_VERBOSE_LOGGING - TORRENT_LOG(rpc) << "Reply with unknown transaction id: " + TORRENT_LOG(rpc) << "Reply with invalid transaction id: " << tid << " from " << m.addr; #endif + msg reply; + reply.reply = true; + reply.message_id = messages::error; + reply.error_code = 203; // Protocol error + reply.error_msg = "reply with invalid transaction id"; + reply.addr = m.addr; + reply.transaction_id = ""; + m_send(reply); return false; } - boost::shared_ptr o = m_transactions[tid]; + observer_ptr o = m_transactions[tid]; if (!o) { #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(rpc) << "Reply with unknown transaction id: " - << tid << " from " << m.addr; + << tid << " from " << m.addr << " (possibly timed out)"; #endif return false; } @@ -165,11 +224,11 @@ bool rpc_manager::incoming(msg const& m) #ifdef TORRENT_DHT_VERBOSE_LOGGING std::ofstream reply_stats("libtorrent_logs/round_trip_ms.log", std::ios::app); - reply_stats << m.addr << "\t" << (microsec_clock::universal_time() - - o->sent).total_milliseconds() << std::endl; + reply_stats << m.addr << "\t" << total_milliseconds(time_now() - o->sent) + << std::endl; #endif o->reply(m); - m_transactions[tid].reset(); + m_transactions[tid] = 0; if (m.piggy_backed_ping) { @@ -178,17 +237,16 @@ bool rpc_manager::incoming(msg const& m) msg ph; ph.message_id = messages::ping; ph.transaction_id = m.ping_transaction_id; - ph.id = m_our_id; ph.addr = m.addr; - - msg empty; + ph.reply = true; - reply(empty, ph); + reply(ph); } return m_table.node_seen(m.id, m.addr); } else { + assert(m.message_id != messages::error); // this is an incoming request m_incoming(m); } @@ -199,15 +257,13 @@ time_duration rpc_manager::tick() { INVARIANT_CHECK; - using boost::posix_time::microsec_clock; - const int timeout_ms = 10 * 1000; // look for observers that has timed out if (m_next_transaction_id == m_oldest_transaction_id) return milliseconds(timeout_ms); - std::vector > timeouts; + std::vector timeouts; for (;m_next_transaction_id != m_oldest_transaction_id; m_oldest_transaction_id = (m_oldest_transaction_id + 1) % max_transactions) @@ -215,11 +271,10 @@ time_duration rpc_manager::tick() assert(m_oldest_transaction_id >= 0); assert(m_oldest_transaction_id < max_transactions); - boost::shared_ptr o = m_transactions[m_oldest_transaction_id]; + observer_ptr o = m_transactions[m_oldest_transaction_id]; if (!o) continue; - time_duration diff = o->sent + milliseconds(timeout_ms) - - microsec_clock::universal_time(); + time_duration diff = o->sent + milliseconds(timeout_ms) - time_now(); if (diff > seconds(0)) { if (diff < seconds(1)) return seconds(1); @@ -228,7 +283,7 @@ time_duration rpc_manager::tick() try { - m_transactions[m_oldest_transaction_id].reset(); + m_transactions[m_oldest_transaction_id] = 0; timeouts.push_back(o); } catch (std::exception) {} } @@ -239,11 +294,11 @@ time_duration rpc_manager::tick() // clear the aborted transactions, will likely // generate new requests. We need to swap, since the // destrutors may add more observers to the m_aborted_transactions - std::vector >().swap(m_aborted_transactions); + std::vector().swap(m_aborted_transactions); return milliseconds(timeout_ms); } -unsigned int rpc_manager::new_transaction_id(shared_ptr o) +unsigned int rpc_manager::new_transaction_id(observer_ptr o) { INVARIANT_CHECK; @@ -255,7 +310,7 @@ unsigned int rpc_manager::new_transaction_id(shared_ptr o) // it will prevent it from spawning new requests right now, // since that would break the invariant m_aborted_transactions.push_back(m_transactions[m_next_transaction_id]); - m_transactions[m_next_transaction_id].reset(); + m_transactions[m_next_transaction_id] = 0; assert(m_oldest_transaction_id == m_next_transaction_id); } assert(!m_transactions[tid]); @@ -288,7 +343,7 @@ void rpc_manager::update_oldest_transaction_id() } void rpc_manager::invoke(int message_id, udp::endpoint target_addr - , shared_ptr o) + , observer_ptr o) { INVARIANT_CHECK; @@ -315,7 +370,7 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr o->send(m); - o->sent = boost::posix_time::microsec_clock::universal_time(); + o->sent = time_now(); o->target_addr = target_addr; #ifdef TORRENT_DHT_VERBOSE_LOGGING @@ -333,66 +388,40 @@ void rpc_manager::invoke(int message_id, udp::endpoint target_addr } } -void rpc_manager::reply(msg& m, msg const& reply_to) +void rpc_manager::reply(msg& m) { INVARIANT_CHECK; if (m_destructing) return; - if (m.message_id != messages::error) - m.message_id = reply_to.message_id; - m.addr = reply_to.addr; - m.reply = true; + assert(m.reply); m.piggy_backed_ping = false; m.id = m_our_id; - m.transaction_id = reply_to.transaction_id; m_send(m); } -namespace -{ - struct dummy_observer : observer - { - virtual void reply(msg const&) {} - virtual void timeout() {} - virtual void send(msg&) {} - void abort() {} - }; -} - -void rpc_manager::reply_with_ping(msg& m, msg const& reply_to) +void rpc_manager::reply_with_ping(msg& m) { INVARIANT_CHECK; if (m_destructing) return; + assert(m.reply); - if (m.message_id != messages::error) - m.message_id = reply_to.message_id; - m.addr = reply_to.addr; - m.reply = true; m.piggy_backed_ping = true; m.id = m_our_id; - m.transaction_id = reply_to.transaction_id; - try - { - m.ping_transaction_id.clear(); - std::back_insert_iterator out(m.ping_transaction_id); - io::write_uint16(m_next_transaction_id, out); + m.ping_transaction_id.clear(); + std::back_insert_iterator out(m.ping_transaction_id); + io::write_uint16(m_next_transaction_id, out); - boost::shared_ptr o(new dummy_observer); - assert(!m_transactions[m_next_transaction_id]); - o->sent = boost::posix_time::microsec_clock::universal_time(); - o->target_addr = m.addr; + observer_ptr o(new (allocator().malloc()) null_observer(allocator())); + assert(!m_transactions[m_next_transaction_id]); + o->sent = time_now(); + o->target_addr = m.addr; - m_send(m); - new_transaction_id(o); - } - catch (std::exception& e) - { - // m_send may fail with "no route to host" - } + m_send(m); + new_transaction_id(o); } diff --git a/libtorrent/src/kademlia/traversal_algorithm.cpp b/libtorrent/src/kademlia/traversal_algorithm.cpp index 1efe76e77..ceb977f19 100644 --- a/libtorrent/src/kademlia/traversal_algorithm.cpp +++ b/libtorrent/src/kademlia/traversal_algorithm.cpp @@ -30,6 +30,8 @@ POSSIBILITY OF SUCH DAMAGE. */ +#include "libtorrent/pch.hpp" + #include #include #include @@ -66,8 +68,7 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig if (i == m_results.end() || i->id != id) { assert(std::find_if(m_results.begin(), m_results.end() - , bind(std::equal_to() - , bind(&result::id, _1), id)) == m_results.end()); + , bind(&result::id, _1) == id) == m_results.end()); #ifdef TORRENT_DHT_VERBOSE_LOGGING TORRENT_LOG(traversal) << "adding result: " << id << " " << addr; #endif @@ -75,6 +76,11 @@ void traversal_algorithm::add_entry(node_id const& id, udp::endpoint addr, unsig } } +boost::pool<>& traversal_algorithm::allocator() const +{ + return m_rpc.allocator(); +} + void traversal_algorithm::traverse(node_id const& id, udp::endpoint addr) { add_entry(id, addr, 0);