diff --git a/libtorrent/include/libtorrent/asio/buffer.hpp b/libtorrent/include/libtorrent/asio/buffer.hpp index 881bc1176..57df33edf 100644 --- a/libtorrent/include/libtorrent/asio/buffer.hpp +++ b/libtorrent/include/libtorrent/asio/buffer.hpp @@ -392,7 +392,12 @@ public: ~buffer_debug_check() { +#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400) + // MSVC's string iterator checking may crash in a std::string::iterator + // object's destructor when the iterator points to an already-destroyed + // std::string object, unless the iterator is cleared first. iter_ = Iterator(); +#endif // BOOST_WORKAROUND(BOOST_MSVC, >= 1400) } void operator()() diff --git a/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp b/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp index 93d39a23c..68b6fff4c 100644 --- a/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp +++ b/libtorrent/include/libtorrent/asio/detail/epoll_reactor.hpp @@ -66,7 +66,8 @@ public: pending_cancellations_(), stop_thread_(false), thread_(0), - shutdown_(false) + shutdown_(false), + need_epoll_wait_(true) { // Start the reactor's internal thread only if needed. if (Own_Thread) @@ -387,7 +388,9 @@ private: // Block on the epoll descriptor. epoll_event events[128]; - int num_events = epoll_wait(epoll_fd_, events, 128, timeout); + int num_events = (block || need_epoll_wait_) + ? epoll_wait(epoll_fd_, events, 128, timeout) + : 0; lock.lock(); wait_in_progress_ = false; @@ -478,6 +481,10 @@ private: cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + // Determine whether epoll_wait should be called when the reactor next runs. + need_epoll_wait_ = !read_op_queue_.empty() + || !write_op_queue_.empty() || !except_op_queue_.empty(); + cleanup_operations_and_timers(lock); } @@ -632,6 +639,9 @@ private: // Whether the service has been shut down. bool shutdown_; + + // Whether we need to call epoll_wait the next time the reactor is run. + bool need_epoll_wait_; }; } // namespace detail diff --git a/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp b/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp index 896ff5403..6b09d4dbc 100644 --- a/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp +++ b/libtorrent/include/libtorrent/asio/detail/kqueue_reactor.hpp @@ -74,7 +74,8 @@ public: pending_cancellations_(), stop_thread_(false), thread_(0), - shutdown_(false) + shutdown_(false), + need_kqueue_wait_(true) { // Start the reactor's internal thread only if needed. if (Own_Thread) @@ -373,7 +374,9 @@ private: // Block on the kqueue descriptor. struct kevent events[128]; - int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); + int num_events = (block || need_kqueue_wait_) + ? kevent(kqueue_fd_, 0, 0, events, 128, timeout) + : 0; lock.lock(); wait_in_progress_ = false; @@ -478,6 +481,10 @@ private: cancel_ops_unlocked(pending_cancellations_[i]); pending_cancellations_.clear(); + // Determine whether kqueue needs to be called next time the reactor is run. + need_kqueue_wait_ = !read_op_queue_.empty() + || !write_op_queue_.empty() || !except_op_queue_.empty(); + cleanup_operations_and_timers(lock); } @@ -630,6 +637,9 @@ private: // Whether the service has been shut down. bool shutdown_; + + // Whether we need to call kqueue the next time the reactor is run. + bool need_kqueue_wait_; }; } // namespace detail diff --git a/libtorrent/include/libtorrent/asio/detail/old_win_sdk_compat.hpp b/libtorrent/include/libtorrent/asio/detail/old_win_sdk_compat.hpp index 2fb957aa9..e32552197 100644 --- a/libtorrent/include/libtorrent/asio/detail/old_win_sdk_compat.hpp +++ b/libtorrent/include/libtorrent/asio/detail/old_win_sdk_compat.hpp @@ -31,6 +31,10 @@ #if defined(ASIO_HAS_OLD_WIN_SDK) // Emulation of types that are missing from old Platform SDKs. +// +// N.B. this emulation is also used if building for a Windows 2000 target with +// a recent (i.e. Vista or later) SDK, as the SDK does not provide IPv6 support +// in that case. namespace asio { namespace detail { @@ -54,9 +58,19 @@ struct sockaddr_storage_emulation struct in6_addr_emulation { - u_char s6_addr[16]; + union + { + u_char Byte[16]; + u_short Word[8]; + } u; }; +#if !defined(s6_addr) +# define _S6_un u +# define _S6_u8 Byte +# define s6_addr _S6_un._S6_u8 +#endif // !defined(s6_addr) + struct sockaddr_in6_emulation { short sin6_family; diff --git a/libtorrent/include/libtorrent/asio/detail/push_options.hpp b/libtorrent/include/libtorrent/asio/detail/push_options.hpp index 0b68d2933..96ddf7d04 100644 --- a/libtorrent/include/libtorrent/asio/detail/push_options.hpp +++ b/libtorrent/include/libtorrent/asio/detail/push_options.hpp @@ -90,6 +90,12 @@ # pragma warning (disable:4244) # pragma warning (disable:4355) # pragma warning (disable:4675) +# if defined(_M_IX86) && defined(_Wp64) +// The /Wp64 option is broken. If you want to check 64 bit portability, use a +// 64 bit compiler! +# pragma warning (disable:4311) +# pragma warning (disable:4312) +# endif // defined(_M_IX86) && defined(_Wp64) # pragma pack (push, 8) // Note that if the /Og optimisation flag is enabled with MSVC6, the compiler // has a tendency to incorrectly optimise away some calls to member template diff --git a/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp b/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp index b99383a80..06ffe9f91 100644 --- a/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp +++ b/libtorrent/include/libtorrent/asio/detail/socket_ops.hpp @@ -1811,19 +1811,22 @@ inline asio::error_code getnameinfo(const socket_addr_type* addr, # if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0501) || defined(UNDER_CE) // Building for Windows XP, Windows Server 2003, or later. clear_error(ec); - int error = ::getnameinfo(addr, addrlen, host, static_cast(hostlen), + int error = ::getnameinfo(addr, static_cast(addrlen), + host, static_cast(hostlen), serv, static_cast(servlen), flags); return ec = translate_addrinfo_error(error); # else // Building for Windows 2000 or earlier. typedef int (WSAAPI *gni_t)(const socket_addr_type*, - int, char*, std::size_t, char*, std::size_t, int); + int, char*, DWORD, char*, DWORD, int); if (HMODULE winsock_module = ::GetModuleHandleA("ws2_32")) { if (gni_t gni = (gni_t)::GetProcAddress(winsock_module, "getnameinfo")) { clear_error(ec); - int error = gni(addr, addrlen, host, hostlen, serv, servlen, flags); + int error = gni(addr, static_cast(addrlen), + host, static_cast(hostlen), + serv, static_cast(servlen), flags); return ec = translate_addrinfo_error(error); } } diff --git a/libtorrent/include/libtorrent/asio/detail/socket_types.hpp b/libtorrent/include/libtorrent/asio/detail/socket_types.hpp index 4ff207f57..d1497a345 100644 --- a/libtorrent/include/libtorrent/asio/detail/socket_types.hpp +++ b/libtorrent/include/libtorrent/asio/detail/socket_types.hpp @@ -92,7 +92,7 @@ # include # include # include -# if defined(__hpux) +# if defined(__hpux) && !defined(__HP_aCC) # include # else # include diff --git a/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service.hpp b/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service.hpp index 46e651653..fe2e58a0e 100644 --- a/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service.hpp +++ b/libtorrent/include/libtorrent/asio/detail/win_iocp_io_service.hpp @@ -34,7 +34,6 @@ #include "asio/detail/service_base.hpp" #include "asio/detail/socket_types.hpp" #include "asio/detail/timer_queue.hpp" -#include "asio/detail/win_iocp_operation.hpp" #include "asio/detail/mutex.hpp" namespace asio { @@ -44,14 +43,64 @@ class win_iocp_io_service : public asio::detail::service_base { public: - // Base class for all operations. - typedef win_iocp_operation operation; + // Base class for all operations. A function pointer is used instead of + // virtual functions to avoid the associated overhead. + // + // This class inherits from OVERLAPPED so that we can downcast to get back to + // the operation pointer from the LPOVERLAPPED out parameter of + // GetQueuedCompletionStatus. + class operation + : public OVERLAPPED + { + public: + typedef void (*invoke_func_type)(operation*, DWORD, size_t); + typedef void (*destroy_func_type)(operation*); + + operation(win_iocp_io_service& iocp_service, + invoke_func_type invoke_func, destroy_func_type destroy_func) + : outstanding_operations_(&iocp_service.outstanding_operations_), + invoke_func_(invoke_func), + destroy_func_(destroy_func) + { + Internal = 0; + InternalHigh = 0; + Offset = 0; + OffsetHigh = 0; + hEvent = 0; + + ::InterlockedIncrement(outstanding_operations_); + } + + void do_completion(DWORD last_error, size_t bytes_transferred) + { + invoke_func_(this, last_error, bytes_transferred); + } + + void destroy() + { + destroy_func_(this); + } + + protected: + // Prevent deletion through this type. + ~operation() + { + ::InterlockedDecrement(outstanding_operations_); + } + + private: + long* outstanding_operations_; + invoke_func_type invoke_func_; + destroy_func_type destroy_func_; + }; + // Constructor. win_iocp_io_service(asio::io_service& io_service) : asio::detail::service_base(io_service), iocp_(), outstanding_work_(0), + outstanding_operations_(0), stopped_(0), shutdown_(0), timer_thread_(0), @@ -79,7 +128,7 @@ public: { ::InterlockedExchange(&shutdown_, 1); - for (;;) + while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0) { DWORD bytes_transferred = 0; #if (WINVER < 0x0500) @@ -88,12 +137,8 @@ public: DWORD_PTR completion_key = 0; #endif LPOVERLAPPED overlapped = 0; - ::SetLastError(0); - BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, - &bytes_transferred, &completion_key, &overlapped, 0); - DWORD last_error = ::GetLastError(); - if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT) - break; + ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred, + &completion_key, &overlapped, INFINITE); if (overlapped) static_cast(overlapped)->destroy(); } @@ -249,7 +294,7 @@ public: } // Request invocation of the given OVERLAPPED-derived operation. - void post_completion(win_iocp_operation* op, DWORD op_last_error, + void post_completion(operation* op, DWORD op_last_error, DWORD bytes_transferred) { // Enqueue the operation on the I/O completion port. @@ -347,7 +392,7 @@ private: &timer_thread_, this_thread_id, 0) == 0); // Calculate timeout for GetQueuedCompletionStatus call. - DWORD timeout = max_timeout; + DWORD timeout = INFINITE; if (dispatching_timers) { asio::detail::mutex::scoped_lock lock(timer_mutex_); @@ -371,13 +416,28 @@ private: // Dispatch any pending timers. if (dispatching_timers) { - asio::detail::mutex::scoped_lock lock(timer_mutex_); - timer_queues_copy_ = timer_queues_; - for (std::size_t i = 0; i < timer_queues_.size(); ++i) + try { - timer_queues_[i]->dispatch_timers(); - timer_queues_[i]->dispatch_cancellations(); - timer_queues_[i]->cleanup_timers(); + asio::detail::mutex::scoped_lock lock(timer_mutex_); + timer_queues_copy_ = timer_queues_; + for (std::size_t i = 0; i < timer_queues_.size(); ++i) + { + timer_queues_[i]->dispatch_timers(); + timer_queues_[i]->dispatch_cancellations(); + timer_queues_[i]->cleanup_timers(); + } + } + catch (...) + { + // Transfer responsibility for dispatching timers to another thread. + if (::InterlockedCompareExchange(&timer_thread_, + 0, this_thread_id) == this_thread_id) + { + ::PostQueuedCompletionStatus(iocp_.handle, + 0, transfer_timer_dispatching, 0); + } + + throw; } } @@ -532,7 +592,7 @@ private: { handler_operation(win_iocp_io_service& io_service, Handler handler) - : operation(&handler_operation::do_completion_impl, + : operation(io_service, &handler_operation::do_completion_impl, &handler_operation::destroy_impl), io_service_(io_service), handler_(handler) @@ -593,6 +653,10 @@ private: // The count of unfinished work. long outstanding_work_; + // The count of unfinished operations. + long outstanding_operations_; + friend class operation; + // Flag to indicate whether the event loop has been stopped. long stopped_; @@ -602,7 +666,7 @@ private: enum { // Maximum GetQueuedCompletionStatus timeout, in milliseconds. - max_timeout = 1000, + max_timeout = 500, // Completion key value to indicate that responsibility for dispatching // timers is being cooperatively transferred from one thread to another. diff --git a/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp b/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp index cb3640638..faf6a5c1e 100644 --- a/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp +++ b/libtorrent/include/libtorrent/asio/detail/win_iocp_socket_service.hpp @@ -56,7 +56,7 @@ public: typedef typename Protocol::endpoint endpoint_type; // Base class for all operations. - typedef win_iocp_operation operation; + typedef win_iocp_io_service::operation operation; struct noop_deleter { void operator()(void*) {} }; typedef boost::shared_ptr shared_cancel_token_type; @@ -680,13 +680,13 @@ public: : public operation { public: - send_operation(asio::io_service& io_service, + send_operation(win_iocp_io_service& io_service, weak_cancel_token_type cancel_token, const ConstBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &send_operation::do_completion_impl, &send_operation::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) @@ -782,8 +782,8 @@ public: typedef send_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), impl.cancel_token_, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + impl.cancel_token_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -860,7 +860,7 @@ public: // Send the data. DWORD bytes_transferred = 0; int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, - flags, destination.data(), destination.size(), 0, 0); + flags, destination.data(), static_cast(destination.size()), 0, 0); if (result != 0) { DWORD last_error = ::WSAGetLastError(); @@ -880,12 +880,12 @@ public: : public operation { public: - send_to_operation(asio::io_service& io_service, + send_to_operation(win_iocp_io_service& io_service, const ConstBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &send_to_operation::do_completion_impl, &send_to_operation::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), buffers_(buffers), handler_(handler) { @@ -973,8 +973,7 @@ public: typedef send_to_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -991,8 +990,8 @@ public: // Send the data. DWORD bytes_transferred = 0; - int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, - flags, destination.data(), destination.size(), ptr.get(), 0); + int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags, + destination.data(), static_cast(destination.size()), ptr.get(), 0); DWORD last_error = ::WSAGetLastError(); // Check if the operation completed immediately. @@ -1074,15 +1073,15 @@ public: : public operation { public: - receive_operation(asio::io_service& io_service, + receive_operation(win_iocp_io_service& io_service, weak_cancel_token_type cancel_token, const MutableBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &receive_operation< MutableBufferSequence, Handler>::do_completion_impl, &receive_operation< MutableBufferSequence, Handler>::destroy_impl), - work_(io_service), + work_(io_service.get_io_service()), cancel_token_(cancel_token), buffers_(buffers), handler_(handler) @@ -1185,8 +1184,8 @@ public: typedef receive_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), impl.cancel_token_, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + impl.cancel_token_, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -1290,17 +1289,17 @@ public: : public operation { public: - receive_from_operation(asio::io_service& io_service, + receive_from_operation(win_iocp_io_service& io_service, endpoint_type& endpoint, const MutableBufferSequence& buffers, Handler handler) - : operation( + : operation(io_service, &receive_from_operation< MutableBufferSequence, Handler>::do_completion_impl, &receive_from_operation< MutableBufferSequence, Handler>::destroy_impl), endpoint_(endpoint), endpoint_size_(static_cast(endpoint.capacity())), - work_(io_service), + work_(io_service.get_io_service()), buffers_(buffers), handler_(handler) { @@ -1405,8 +1404,8 @@ public: typedef receive_from_operation value_type; typedef handler_alloc_traits alloc_traits; raw_handler_ptr raw_ptr(handler); - handler_ptr ptr(raw_ptr, - this->get_io_service(), sender_endp, buffers, handler); + handler_ptr ptr(raw_ptr, iocp_service_, + sender_endp, buffers, handler); // Copy buffers into WSABUF array. ::WSABUF bufs[max_buffers]; @@ -1508,7 +1507,7 @@ public: socket_type socket, socket_type new_socket, Socket& peer, const protocol_type& protocol, endpoint_type* peer_endpoint, bool enable_connection_aborted, Handler handler) - : operation( + : operation(io_service, &accept_operation::do_completion_impl, &accept_operation::destroy_impl), io_service_(io_service), diff --git a/libtorrent/include/libtorrent/asio/error.hpp b/libtorrent/include/libtorrent/asio/error.hpp index 4fd47d2d4..1c1946af6 100644 --- a/libtorrent/include/libtorrent/asio/error.hpp +++ b/libtorrent/include/libtorrent/asio/error.hpp @@ -70,6 +70,11 @@ enum basic_errors /// Operation already in progress. already_started = ASIO_SOCKET_ERROR(EALREADY), + /// Broken pipe. + broken_pipe = ASIO_WIN_OR_POSIX( + ASIO_NATIVE_ERROR(ERROR_BROKEN_PIPE), + ASIO_NATIVE_ERROR(EPIPE)), + /// A connection has been aborted. connection_aborted = ASIO_SOCKET_ERROR(ECONNABORTED), diff --git a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_init.hpp b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_init.hpp index 06fbf86fe..e646acf48 100755 --- a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_init.hpp +++ b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_init.hpp @@ -20,10 +20,12 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include "asio/detail/pop_options.hpp" #include "asio/detail/mutex.hpp" +#include "asio/detail/tss_ptr.hpp" #include "asio/ssl/detail/openssl_types.hpp" namespace asio { @@ -51,6 +53,7 @@ private: for (size_t i = 0; i < mutexes_.size(); ++i) mutexes_[i].reset(new asio::detail::mutex); ::CRYPTO_set_locking_callback(&do_init::openssl_locking_func); + ::CRYPTO_set_id_callback(&do_init::openssl_id_func); } } @@ -58,6 +61,7 @@ private: { if (Do_Init) { + ::CRYPTO_set_id_callback(0); ::CRYPTO_set_locking_callback(0); ::ERR_free_strings(); ::ERR_remove_state(0); @@ -80,6 +84,15 @@ private: } private: + static unsigned long openssl_id_func() + { + void* id = instance()->thread_id_; + if (id == 0) + instance()->thread_id_ = id = &id; // Ugh. + BOOST_ASSERT(sizeof(unsigned long) >= sizeof(void*)); + return reinterpret_cast(id); + } + static void openssl_locking_func(int mode, int n, const char *file, int line) { @@ -91,6 +104,9 @@ private: // Mutexes to be used in locking callbacks. std::vector > mutexes_; + + // The thread identifiers to be used by openssl. + asio::detail::tss_ptr thread_id_; }; public: diff --git a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_operation.hpp b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_operation.hpp index f9ca336df..503559e7f 100755 --- a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_operation.hpp +++ b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_operation.hpp @@ -19,6 +19,7 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include "asio/detail/pop_options.hpp" @@ -87,10 +88,12 @@ public: net_buffer& recv_buf, SSL* session, BIO* ssl_bio, - user_handler_func handler + user_handler_func handler, + asio::io_service::strand& strand ) : primitive_(primitive) , user_handler_(handler) + , strand_(&strand) , recv_buf_(recv_buf) , socket_(socket) , ssl_bio_(ssl_bio) @@ -117,6 +120,7 @@ public: SSL* session, BIO* ssl_bio) : primitive_(primitive) + , strand_(0) , recv_buf_(recv_buf) , socket_(socket) , ssl_bio_(ssl_bio) @@ -240,6 +244,7 @@ private: ssl_primitive_func primitive_; user_handler_func user_handler_; + asio::io_service::strand* strand_; write_func write_; read_func read_; int_handler_func handler_; @@ -303,19 +308,23 @@ private: { unsigned char *data_start = send_buf_.get_unused_start(); send_buf_.data_added(len); - + + BOOST_ASSERT(strand_); asio::async_write ( socket_, asio::buffer(data_start, len), - boost::bind + strand_->wrap ( - &openssl_operation::async_write_handler, - this, - is_operation_done, - rc, - asio::placeholders::error, - asio::placeholders::bytes_transferred + boost::bind + ( + &openssl_operation::async_write_handler, + this, + is_operation_done, + rc, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) ) ); @@ -365,17 +374,21 @@ private: int do_async_read() { // Wait for new data + BOOST_ASSERT(strand_); socket_.async_read_some ( asio::buffer(recv_buf_.get_unused_start(), recv_buf_.get_unused_len()), - boost::bind + strand_->wrap ( - &openssl_operation::async_read_handler, - this, - asio::placeholders::error, - asio::placeholders::bytes_transferred - ) + boost::bind + ( + &openssl_operation::async_read_handler, + this, + asio::placeholders::error, + asio::placeholders::bytes_transferred + ) + ) ); return 0; } diff --git a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_stream_service.hpp b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_stream_service.hpp index 3812deb80..ea0339326 100644 --- a/libtorrent/include/libtorrent/asio/ssl/detail/openssl_stream_service.hpp +++ b/libtorrent/include/libtorrent/asio/ssl/detail/openssl_stream_service.hpp @@ -20,6 +20,7 @@ #include "asio/detail/push_options.hpp" #include +#include #include #include #include @@ -28,6 +29,7 @@ #include "asio/error.hpp" #include "asio/io_service.hpp" +#include "asio/strand.hpp" #include "asio/detail/service_base.hpp" #include "asio/ssl/basic_context.hpp" #include "asio/ssl/stream_base.hpp" @@ -42,6 +44,8 @@ class openssl_stream_service : public asio::detail::service_base { private: + enum { max_buffer_size = INT_MAX }; + //Base handler for asyncrhonous operations template class base_handler @@ -160,7 +164,8 @@ public: // Construct a new stream socket service for the specified io_service. explicit openssl_stream_service(asio::io_service& io_service) - : asio::detail::service_base(io_service) + : asio::detail::service_base(io_service), + strand_(io_service) { } @@ -255,11 +260,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Shut down SSL on the stream. @@ -309,11 +315,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Write some data to the stream. @@ -324,10 +331,14 @@ public: size_t bytes_transferred = 0; try { + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function send_func = boost::bind(&::SSL_write, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - static_cast(asio::buffer_size(*buffers.begin()))); + static_cast(buffer_size)); openssl_operation op( send_func, next_layer, @@ -356,10 +367,14 @@ public: send_handler* local_handler = new send_handler(handler, get_io_service()); + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function send_func = boost::bind(&::SSL_write, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - static_cast(asio::buffer_size(*buffers.begin()))); + static_cast(buffer_size)); openssl_operation* op = new openssl_operation ( @@ -374,11 +389,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Read some data from the stream. @@ -389,10 +405,14 @@ public: size_t bytes_transferred = 0; try { + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function recv_func = boost::bind(&::SSL_read, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - asio::buffer_size(*buffers.begin())); + static_cast(buffer_size)); openssl_operation op(recv_func, next_layer, impl->recv_buf, @@ -421,10 +441,14 @@ public: recv_handler* local_handler = new recv_handler(handler, get_io_service()); + std::size_t buffer_size = asio::buffer_size(*buffers.begin()); + if (buffer_size > max_buffer_size) + buffer_size = max_buffer_size; + boost::function recv_func = boost::bind(&::SSL_read, boost::arg<1>(), asio::buffer_cast(*buffers.begin()), - asio::buffer_size(*buffers.begin())); + static_cast(buffer_size)); openssl_operation* op = new openssl_operation ( @@ -439,11 +463,12 @@ public: local_handler, boost::arg<1>(), boost::arg<2>() - ) + ), + strand_ ); local_handler->set_operation(op); - get_io_service().post(boost::bind(&openssl_operation::start, op)); + strand_.post(boost::bind(&openssl_operation::start, op)); } // Peek at the incoming data on the stream. @@ -465,6 +490,8 @@ public: } private: + asio::io_service::strand strand_; + typedef asio::detail::mutex mutex_type; template