resync asio

This commit is contained in:
Marcos Pinto 2008-02-10 22:05:58 +00:00
commit cc5ebf59d8
13 changed files with 254 additions and 82 deletions

View file

@ -392,7 +392,12 @@ public:
~buffer_debug_check() ~buffer_debug_check()
{ {
#if BOOST_WORKAROUND(BOOST_MSVC, >= 1400)
// MSVC's string iterator checking may crash in a std::string::iterator
// object's destructor when the iterator points to an already-destroyed
// std::string object, unless the iterator is cleared first.
iter_ = Iterator(); iter_ = Iterator();
#endif // BOOST_WORKAROUND(BOOST_MSVC, >= 1400)
} }
void operator()() void operator()()

View file

@ -66,7 +66,8 @@ public:
pending_cancellations_(), pending_cancellations_(),
stop_thread_(false), stop_thread_(false),
thread_(0), thread_(0),
shutdown_(false) shutdown_(false),
need_epoll_wait_(true)
{ {
// Start the reactor's internal thread only if needed. // Start the reactor's internal thread only if needed.
if (Own_Thread) if (Own_Thread)
@ -387,7 +388,9 @@ private:
// Block on the epoll descriptor. // Block on the epoll descriptor.
epoll_event events[128]; epoll_event events[128];
int num_events = epoll_wait(epoll_fd_, events, 128, timeout); int num_events = (block || need_epoll_wait_)
? epoll_wait(epoll_fd_, events, 128, timeout)
: 0;
lock.lock(); lock.lock();
wait_in_progress_ = false; wait_in_progress_ = false;
@ -478,6 +481,10 @@ private:
cancel_ops_unlocked(pending_cancellations_[i]); cancel_ops_unlocked(pending_cancellations_[i]);
pending_cancellations_.clear(); pending_cancellations_.clear();
// Determine whether epoll_wait should be called when the reactor next runs.
need_epoll_wait_ = !read_op_queue_.empty()
|| !write_op_queue_.empty() || !except_op_queue_.empty();
cleanup_operations_and_timers(lock); cleanup_operations_and_timers(lock);
} }
@ -632,6 +639,9 @@ private:
// Whether the service has been shut down. // Whether the service has been shut down.
bool shutdown_; bool shutdown_;
// Whether we need to call epoll_wait the next time the reactor is run.
bool need_epoll_wait_;
}; };
} // namespace detail } // namespace detail

View file

@ -74,7 +74,8 @@ public:
pending_cancellations_(), pending_cancellations_(),
stop_thread_(false), stop_thread_(false),
thread_(0), thread_(0),
shutdown_(false) shutdown_(false),
need_kqueue_wait_(true)
{ {
// Start the reactor's internal thread only if needed. // Start the reactor's internal thread only if needed.
if (Own_Thread) if (Own_Thread)
@ -373,7 +374,9 @@ private:
// Block on the kqueue descriptor. // Block on the kqueue descriptor.
struct kevent events[128]; struct kevent events[128];
int num_events = kevent(kqueue_fd_, 0, 0, events, 128, timeout); int num_events = (block || need_kqueue_wait_)
? kevent(kqueue_fd_, 0, 0, events, 128, timeout)
: 0;
lock.lock(); lock.lock();
wait_in_progress_ = false; wait_in_progress_ = false;
@ -478,6 +481,10 @@ private:
cancel_ops_unlocked(pending_cancellations_[i]); cancel_ops_unlocked(pending_cancellations_[i]);
pending_cancellations_.clear(); pending_cancellations_.clear();
// Determine whether kqueue needs to be called next time the reactor is run.
need_kqueue_wait_ = !read_op_queue_.empty()
|| !write_op_queue_.empty() || !except_op_queue_.empty();
cleanup_operations_and_timers(lock); cleanup_operations_and_timers(lock);
} }
@ -630,6 +637,9 @@ private:
// Whether the service has been shut down. // Whether the service has been shut down.
bool shutdown_; bool shutdown_;
// Whether we need to call kqueue the next time the reactor is run.
bool need_kqueue_wait_;
}; };
} // namespace detail } // namespace detail

View file

@ -31,6 +31,10 @@
#if defined(ASIO_HAS_OLD_WIN_SDK) #if defined(ASIO_HAS_OLD_WIN_SDK)
// Emulation of types that are missing from old Platform SDKs. // Emulation of types that are missing from old Platform SDKs.
//
// N.B. this emulation is also used if building for a Windows 2000 target with
// a recent (i.e. Vista or later) SDK, as the SDK does not provide IPv6 support
// in that case.
namespace asio { namespace asio {
namespace detail { namespace detail {
@ -54,9 +58,19 @@ struct sockaddr_storage_emulation
struct in6_addr_emulation struct in6_addr_emulation
{ {
u_char s6_addr[16]; union
{
u_char Byte[16];
u_short Word[8];
} u;
}; };
#if !defined(s6_addr)
# define _S6_un u
# define _S6_u8 Byte
# define s6_addr _S6_un._S6_u8
#endif // !defined(s6_addr)
struct sockaddr_in6_emulation struct sockaddr_in6_emulation
{ {
short sin6_family; short sin6_family;

View file

@ -90,6 +90,12 @@
# pragma warning (disable:4244) # pragma warning (disable:4244)
# pragma warning (disable:4355) # pragma warning (disable:4355)
# pragma warning (disable:4675) # pragma warning (disable:4675)
# if defined(_M_IX86) && defined(_Wp64)
// The /Wp64 option is broken. If you want to check 64 bit portability, use a
// 64 bit compiler!
# pragma warning (disable:4311)
# pragma warning (disable:4312)
# endif // defined(_M_IX86) && defined(_Wp64)
# pragma pack (push, 8) # pragma pack (push, 8)
// Note that if the /Og optimisation flag is enabled with MSVC6, the compiler // Note that if the /Og optimisation flag is enabled with MSVC6, the compiler
// has a tendency to incorrectly optimise away some calls to member template // has a tendency to incorrectly optimise away some calls to member template

View file

@ -1811,19 +1811,22 @@ inline asio::error_code getnameinfo(const socket_addr_type* addr,
# if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0501) || defined(UNDER_CE) # if defined(_WIN32_WINNT) && (_WIN32_WINNT >= 0x0501) || defined(UNDER_CE)
// Building for Windows XP, Windows Server 2003, or later. // Building for Windows XP, Windows Server 2003, or later.
clear_error(ec); clear_error(ec);
int error = ::getnameinfo(addr, addrlen, host, static_cast<DWORD>(hostlen), int error = ::getnameinfo(addr, static_cast<socklen_t>(addrlen),
host, static_cast<DWORD>(hostlen),
serv, static_cast<DWORD>(servlen), flags); serv, static_cast<DWORD>(servlen), flags);
return ec = translate_addrinfo_error(error); return ec = translate_addrinfo_error(error);
# else # else
// Building for Windows 2000 or earlier. // Building for Windows 2000 or earlier.
typedef int (WSAAPI *gni_t)(const socket_addr_type*, typedef int (WSAAPI *gni_t)(const socket_addr_type*,
int, char*, std::size_t, char*, std::size_t, int); int, char*, DWORD, char*, DWORD, int);
if (HMODULE winsock_module = ::GetModuleHandleA("ws2_32")) if (HMODULE winsock_module = ::GetModuleHandleA("ws2_32"))
{ {
if (gni_t gni = (gni_t)::GetProcAddress(winsock_module, "getnameinfo")) if (gni_t gni = (gni_t)::GetProcAddress(winsock_module, "getnameinfo"))
{ {
clear_error(ec); clear_error(ec);
int error = gni(addr, addrlen, host, hostlen, serv, servlen, flags); int error = gni(addr, static_cast<int>(addrlen),
host, static_cast<DWORD>(hostlen),
serv, static_cast<DWORD>(servlen), flags);
return ec = translate_addrinfo_error(error); return ec = translate_addrinfo_error(error);
} }
} }

View file

@ -92,7 +92,7 @@
# include <sys/ioctl.h> # include <sys/ioctl.h>
# include <sys/poll.h> # include <sys/poll.h>
# include <sys/types.h> # include <sys/types.h>
# if defined(__hpux) # if defined(__hpux) && !defined(__HP_aCC)
# include <sys/time.h> # include <sys/time.h>
# else # else
# include <sys/select.h> # include <sys/select.h>

View file

@ -34,7 +34,6 @@
#include "asio/detail/service_base.hpp" #include "asio/detail/service_base.hpp"
#include "asio/detail/socket_types.hpp" #include "asio/detail/socket_types.hpp"
#include "asio/detail/timer_queue.hpp" #include "asio/detail/timer_queue.hpp"
#include "asio/detail/win_iocp_operation.hpp"
#include "asio/detail/mutex.hpp" #include "asio/detail/mutex.hpp"
namespace asio { namespace asio {
@ -44,14 +43,64 @@ class win_iocp_io_service
: public asio::detail::service_base<win_iocp_io_service> : public asio::detail::service_base<win_iocp_io_service>
{ {
public: public:
// Base class for all operations. // Base class for all operations. A function pointer is used instead of
typedef win_iocp_operation operation; // virtual functions to avoid the associated overhead.
//
// This class inherits from OVERLAPPED so that we can downcast to get back to
// the operation pointer from the LPOVERLAPPED out parameter of
// GetQueuedCompletionStatus.
class operation
: public OVERLAPPED
{
public:
typedef void (*invoke_func_type)(operation*, DWORD, size_t);
typedef void (*destroy_func_type)(operation*);
operation(win_iocp_io_service& iocp_service,
invoke_func_type invoke_func, destroy_func_type destroy_func)
: outstanding_operations_(&iocp_service.outstanding_operations_),
invoke_func_(invoke_func),
destroy_func_(destroy_func)
{
Internal = 0;
InternalHigh = 0;
Offset = 0;
OffsetHigh = 0;
hEvent = 0;
::InterlockedIncrement(outstanding_operations_);
}
void do_completion(DWORD last_error, size_t bytes_transferred)
{
invoke_func_(this, last_error, bytes_transferred);
}
void destroy()
{
destroy_func_(this);
}
protected:
// Prevent deletion through this type.
~operation()
{
::InterlockedDecrement(outstanding_operations_);
}
private:
long* outstanding_operations_;
invoke_func_type invoke_func_;
destroy_func_type destroy_func_;
};
// Constructor. // Constructor.
win_iocp_io_service(asio::io_service& io_service) win_iocp_io_service(asio::io_service& io_service)
: asio::detail::service_base<win_iocp_io_service>(io_service), : asio::detail::service_base<win_iocp_io_service>(io_service),
iocp_(), iocp_(),
outstanding_work_(0), outstanding_work_(0),
outstanding_operations_(0),
stopped_(0), stopped_(0),
shutdown_(0), shutdown_(0),
timer_thread_(0), timer_thread_(0),
@ -79,7 +128,7 @@ public:
{ {
::InterlockedExchange(&shutdown_, 1); ::InterlockedExchange(&shutdown_, 1);
for (;;) while (::InterlockedExchangeAdd(&outstanding_operations_, 0) > 0)
{ {
DWORD bytes_transferred = 0; DWORD bytes_transferred = 0;
#if (WINVER < 0x0500) #if (WINVER < 0x0500)
@ -88,12 +137,8 @@ public:
DWORD_PTR completion_key = 0; DWORD_PTR completion_key = 0;
#endif #endif
LPOVERLAPPED overlapped = 0; LPOVERLAPPED overlapped = 0;
::SetLastError(0); ::GetQueuedCompletionStatus(iocp_.handle, &bytes_transferred,
BOOL ok = ::GetQueuedCompletionStatus(iocp_.handle, &completion_key, &overlapped, INFINITE);
&bytes_transferred, &completion_key, &overlapped, 0);
DWORD last_error = ::GetLastError();
if (!ok && overlapped == 0 && last_error == WAIT_TIMEOUT)
break;
if (overlapped) if (overlapped)
static_cast<operation*>(overlapped)->destroy(); static_cast<operation*>(overlapped)->destroy();
} }
@ -249,7 +294,7 @@ public:
} }
// Request invocation of the given OVERLAPPED-derived operation. // Request invocation of the given OVERLAPPED-derived operation.
void post_completion(win_iocp_operation* op, DWORD op_last_error, void post_completion(operation* op, DWORD op_last_error,
DWORD bytes_transferred) DWORD bytes_transferred)
{ {
// Enqueue the operation on the I/O completion port. // Enqueue the operation on the I/O completion port.
@ -347,7 +392,7 @@ private:
&timer_thread_, this_thread_id, 0) == 0); &timer_thread_, this_thread_id, 0) == 0);
// Calculate timeout for GetQueuedCompletionStatus call. // Calculate timeout for GetQueuedCompletionStatus call.
DWORD timeout = max_timeout; DWORD timeout = INFINITE;
if (dispatching_timers) if (dispatching_timers)
{ {
asio::detail::mutex::scoped_lock lock(timer_mutex_); asio::detail::mutex::scoped_lock lock(timer_mutex_);
@ -370,6 +415,8 @@ private:
// Dispatch any pending timers. // Dispatch any pending timers.
if (dispatching_timers) if (dispatching_timers)
{
try
{ {
asio::detail::mutex::scoped_lock lock(timer_mutex_); asio::detail::mutex::scoped_lock lock(timer_mutex_);
timer_queues_copy_ = timer_queues_; timer_queues_copy_ = timer_queues_;
@ -380,6 +427,19 @@ private:
timer_queues_[i]->cleanup_timers(); timer_queues_[i]->cleanup_timers();
} }
} }
catch (...)
{
// Transfer responsibility for dispatching timers to another thread.
if (::InterlockedCompareExchange(&timer_thread_,
0, this_thread_id) == this_thread_id)
{
::PostQueuedCompletionStatus(iocp_.handle,
0, transfer_timer_dispatching, 0);
}
throw;
}
}
if (!ok && overlapped == 0) if (!ok && overlapped == 0)
{ {
@ -532,7 +592,7 @@ private:
{ {
handler_operation(win_iocp_io_service& io_service, handler_operation(win_iocp_io_service& io_service,
Handler handler) Handler handler)
: operation(&handler_operation<Handler>::do_completion_impl, : operation(io_service, &handler_operation<Handler>::do_completion_impl,
&handler_operation<Handler>::destroy_impl), &handler_operation<Handler>::destroy_impl),
io_service_(io_service), io_service_(io_service),
handler_(handler) handler_(handler)
@ -593,6 +653,10 @@ private:
// The count of unfinished work. // The count of unfinished work.
long outstanding_work_; long outstanding_work_;
// The count of unfinished operations.
long outstanding_operations_;
friend class operation;
// Flag to indicate whether the event loop has been stopped. // Flag to indicate whether the event loop has been stopped.
long stopped_; long stopped_;
@ -602,7 +666,7 @@ private:
enum enum
{ {
// Maximum GetQueuedCompletionStatus timeout, in milliseconds. // Maximum GetQueuedCompletionStatus timeout, in milliseconds.
max_timeout = 1000, max_timeout = 500,
// Completion key value to indicate that responsibility for dispatching // Completion key value to indicate that responsibility for dispatching
// timers is being cooperatively transferred from one thread to another. // timers is being cooperatively transferred from one thread to another.

View file

@ -56,7 +56,7 @@ public:
typedef typename Protocol::endpoint endpoint_type; typedef typename Protocol::endpoint endpoint_type;
// Base class for all operations. // Base class for all operations.
typedef win_iocp_operation operation; typedef win_iocp_io_service::operation operation;
struct noop_deleter { void operator()(void*) {} }; struct noop_deleter { void operator()(void*) {} };
typedef boost::shared_ptr<void> shared_cancel_token_type; typedef boost::shared_ptr<void> shared_cancel_token_type;
@ -680,13 +680,13 @@ public:
: public operation : public operation
{ {
public: public:
send_operation(asio::io_service& io_service, send_operation(win_iocp_io_service& io_service,
weak_cancel_token_type cancel_token, weak_cancel_token_type cancel_token,
const ConstBufferSequence& buffers, Handler handler) const ConstBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&send_operation<ConstBufferSequence, Handler>::do_completion_impl, &send_operation<ConstBufferSequence, Handler>::do_completion_impl,
&send_operation<ConstBufferSequence, Handler>::destroy_impl), &send_operation<ConstBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
cancel_token_(cancel_token), cancel_token_(cancel_token),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
@ -782,8 +782,8 @@ public:
typedef send_operation<ConstBufferSequence, Handler> value_type; typedef send_operation<ConstBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), impl.cancel_token_, buffers, handler); impl.cancel_token_, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -860,7 +860,7 @@ public:
// Send the data. // Send the data.
DWORD bytes_transferred = 0; DWORD bytes_transferred = 0;
int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred,
flags, destination.data(), destination.size(), 0, 0); flags, destination.data(), static_cast<int>(destination.size()), 0, 0);
if (result != 0) if (result != 0)
{ {
DWORD last_error = ::WSAGetLastError(); DWORD last_error = ::WSAGetLastError();
@ -880,12 +880,12 @@ public:
: public operation : public operation
{ {
public: public:
send_to_operation(asio::io_service& io_service, send_to_operation(win_iocp_io_service& io_service,
const ConstBufferSequence& buffers, Handler handler) const ConstBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&send_to_operation<ConstBufferSequence, Handler>::do_completion_impl, &send_to_operation<ConstBufferSequence, Handler>::do_completion_impl,
&send_to_operation<ConstBufferSequence, Handler>::destroy_impl), &send_to_operation<ConstBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
{ {
@ -973,8 +973,7 @@ public:
typedef send_to_operation<ConstBufferSequence, Handler> value_type; typedef send_to_operation<ConstBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_, buffers, handler);
this->get_io_service(), buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -991,8 +990,8 @@ public:
// Send the data. // Send the data.
DWORD bytes_transferred = 0; DWORD bytes_transferred = 0;
int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, int result = ::WSASendTo(impl.socket_, bufs, i, &bytes_transferred, flags,
flags, destination.data(), destination.size(), ptr.get(), 0); destination.data(), static_cast<int>(destination.size()), ptr.get(), 0);
DWORD last_error = ::WSAGetLastError(); DWORD last_error = ::WSAGetLastError();
// Check if the operation completed immediately. // Check if the operation completed immediately.
@ -1074,15 +1073,15 @@ public:
: public operation : public operation
{ {
public: public:
receive_operation(asio::io_service& io_service, receive_operation(win_iocp_io_service& io_service,
weak_cancel_token_type cancel_token, weak_cancel_token_type cancel_token,
const MutableBufferSequence& buffers, Handler handler) const MutableBufferSequence& buffers, Handler handler)
: operation( : operation(io_service,
&receive_operation< &receive_operation<
MutableBufferSequence, Handler>::do_completion_impl, MutableBufferSequence, Handler>::do_completion_impl,
&receive_operation< &receive_operation<
MutableBufferSequence, Handler>::destroy_impl), MutableBufferSequence, Handler>::destroy_impl),
work_(io_service), work_(io_service.get_io_service()),
cancel_token_(cancel_token), cancel_token_(cancel_token),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
@ -1185,8 +1184,8 @@ public:
typedef receive_operation<MutableBufferSequence, Handler> value_type; typedef receive_operation<MutableBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), impl.cancel_token_, buffers, handler); impl.cancel_token_, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -1290,17 +1289,17 @@ public:
: public operation : public operation
{ {
public: public:
receive_from_operation(asio::io_service& io_service, receive_from_operation(win_iocp_io_service& io_service,
endpoint_type& endpoint, const MutableBufferSequence& buffers, endpoint_type& endpoint, const MutableBufferSequence& buffers,
Handler handler) Handler handler)
: operation( : operation(io_service,
&receive_from_operation< &receive_from_operation<
MutableBufferSequence, Handler>::do_completion_impl, MutableBufferSequence, Handler>::do_completion_impl,
&receive_from_operation< &receive_from_operation<
MutableBufferSequence, Handler>::destroy_impl), MutableBufferSequence, Handler>::destroy_impl),
endpoint_(endpoint), endpoint_(endpoint),
endpoint_size_(static_cast<int>(endpoint.capacity())), endpoint_size_(static_cast<int>(endpoint.capacity())),
work_(io_service), work_(io_service.get_io_service()),
buffers_(buffers), buffers_(buffers),
handler_(handler) handler_(handler)
{ {
@ -1405,8 +1404,8 @@ public:
typedef receive_from_operation<MutableBufferSequence, Handler> value_type; typedef receive_from_operation<MutableBufferSequence, Handler> value_type;
typedef handler_alloc_traits<Handler, value_type> alloc_traits; typedef handler_alloc_traits<Handler, value_type> alloc_traits;
raw_handler_ptr<alloc_traits> raw_ptr(handler); raw_handler_ptr<alloc_traits> raw_ptr(handler);
handler_ptr<alloc_traits> ptr(raw_ptr, handler_ptr<alloc_traits> ptr(raw_ptr, iocp_service_,
this->get_io_service(), sender_endp, buffers, handler); sender_endp, buffers, handler);
// Copy buffers into WSABUF array. // Copy buffers into WSABUF array.
::WSABUF bufs[max_buffers]; ::WSABUF bufs[max_buffers];
@ -1508,7 +1507,7 @@ public:
socket_type socket, socket_type new_socket, Socket& peer, socket_type socket, socket_type new_socket, Socket& peer,
const protocol_type& protocol, endpoint_type* peer_endpoint, const protocol_type& protocol, endpoint_type* peer_endpoint,
bool enable_connection_aborted, Handler handler) bool enable_connection_aborted, Handler handler)
: operation( : operation(io_service,
&accept_operation<Socket, Handler>::do_completion_impl, &accept_operation<Socket, Handler>::do_completion_impl,
&accept_operation<Socket, Handler>::destroy_impl), &accept_operation<Socket, Handler>::destroy_impl),
io_service_(io_service), io_service_(io_service),

View file

@ -70,6 +70,11 @@ enum basic_errors
/// Operation already in progress. /// Operation already in progress.
already_started = ASIO_SOCKET_ERROR(EALREADY), already_started = ASIO_SOCKET_ERROR(EALREADY),
/// Broken pipe.
broken_pipe = ASIO_WIN_OR_POSIX(
ASIO_NATIVE_ERROR(ERROR_BROKEN_PIPE),
ASIO_NATIVE_ERROR(EPIPE)),
/// A connection has been aborted. /// A connection has been aborted.
connection_aborted = ASIO_SOCKET_ERROR(ECONNABORTED), connection_aborted = ASIO_SOCKET_ERROR(ECONNABORTED),

View file

@ -20,10 +20,12 @@
#include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp"
#include <vector> #include <vector>
#include <boost/assert.hpp>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include "asio/detail/pop_options.hpp" #include "asio/detail/pop_options.hpp"
#include "asio/detail/mutex.hpp" #include "asio/detail/mutex.hpp"
#include "asio/detail/tss_ptr.hpp"
#include "asio/ssl/detail/openssl_types.hpp" #include "asio/ssl/detail/openssl_types.hpp"
namespace asio { namespace asio {
@ -51,6 +53,7 @@ private:
for (size_t i = 0; i < mutexes_.size(); ++i) for (size_t i = 0; i < mutexes_.size(); ++i)
mutexes_[i].reset(new asio::detail::mutex); mutexes_[i].reset(new asio::detail::mutex);
::CRYPTO_set_locking_callback(&do_init::openssl_locking_func); ::CRYPTO_set_locking_callback(&do_init::openssl_locking_func);
::CRYPTO_set_id_callback(&do_init::openssl_id_func);
} }
} }
@ -58,6 +61,7 @@ private:
{ {
if (Do_Init) if (Do_Init)
{ {
::CRYPTO_set_id_callback(0);
::CRYPTO_set_locking_callback(0); ::CRYPTO_set_locking_callback(0);
::ERR_free_strings(); ::ERR_free_strings();
::ERR_remove_state(0); ::ERR_remove_state(0);
@ -80,6 +84,15 @@ private:
} }
private: private:
static unsigned long openssl_id_func()
{
void* id = instance()->thread_id_;
if (id == 0)
instance()->thread_id_ = id = &id; // Ugh.
BOOST_ASSERT(sizeof(unsigned long) >= sizeof(void*));
return reinterpret_cast<unsigned long>(id);
}
static void openssl_locking_func(int mode, int n, static void openssl_locking_func(int mode, int n,
const char *file, int line) const char *file, int line)
{ {
@ -91,6 +104,9 @@ private:
// Mutexes to be used in locking callbacks. // Mutexes to be used in locking callbacks.
std::vector<boost::shared_ptr<asio::detail::mutex> > mutexes_; std::vector<boost::shared_ptr<asio::detail::mutex> > mutexes_;
// The thread identifiers to be used by openssl.
asio::detail::tss_ptr<void> thread_id_;
}; };
public: public:

View file

@ -19,6 +19,7 @@
#include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp"
#include <boost/function.hpp> #include <boost/function.hpp>
#include <boost/assert.hpp>
#include <boost/bind.hpp> #include <boost/bind.hpp>
#include "asio/detail/pop_options.hpp" #include "asio/detail/pop_options.hpp"
@ -87,10 +88,12 @@ public:
net_buffer& recv_buf, net_buffer& recv_buf,
SSL* session, SSL* session,
BIO* ssl_bio, BIO* ssl_bio,
user_handler_func handler user_handler_func handler,
asio::io_service::strand& strand
) )
: primitive_(primitive) : primitive_(primitive)
, user_handler_(handler) , user_handler_(handler)
, strand_(&strand)
, recv_buf_(recv_buf) , recv_buf_(recv_buf)
, socket_(socket) , socket_(socket)
, ssl_bio_(ssl_bio) , ssl_bio_(ssl_bio)
@ -117,6 +120,7 @@ public:
SSL* session, SSL* session,
BIO* ssl_bio) BIO* ssl_bio)
: primitive_(primitive) : primitive_(primitive)
, strand_(0)
, recv_buf_(recv_buf) , recv_buf_(recv_buf)
, socket_(socket) , socket_(socket)
, ssl_bio_(ssl_bio) , ssl_bio_(ssl_bio)
@ -240,6 +244,7 @@ private:
ssl_primitive_func primitive_; ssl_primitive_func primitive_;
user_handler_func user_handler_; user_handler_func user_handler_;
asio::io_service::strand* strand_;
write_func write_; write_func write_;
read_func read_; read_func read_;
int_handler_func handler_; int_handler_func handler_;
@ -304,10 +309,13 @@ private:
unsigned char *data_start = send_buf_.get_unused_start(); unsigned char *data_start = send_buf_.get_unused_start();
send_buf_.data_added(len); send_buf_.data_added(len);
BOOST_ASSERT(strand_);
asio::async_write asio::async_write
( (
socket_, socket_,
asio::buffer(data_start, len), asio::buffer(data_start, len),
strand_->wrap
(
boost::bind boost::bind
( (
&openssl_operation::async_write_handler, &openssl_operation::async_write_handler,
@ -317,6 +325,7 @@ private:
asio::placeholders::error, asio::placeholders::error,
asio::placeholders::bytes_transferred asio::placeholders::bytes_transferred
) )
)
); );
return 0; return 0;
@ -365,10 +374,13 @@ private:
int do_async_read() int do_async_read()
{ {
// Wait for new data // Wait for new data
BOOST_ASSERT(strand_);
socket_.async_read_some socket_.async_read_some
( (
asio::buffer(recv_buf_.get_unused_start(), asio::buffer(recv_buf_.get_unused_start(),
recv_buf_.get_unused_len()), recv_buf_.get_unused_len()),
strand_->wrap
(
boost::bind boost::bind
( (
&openssl_operation::async_read_handler, &openssl_operation::async_read_handler,
@ -376,6 +388,7 @@ private:
asio::placeholders::error, asio::placeholders::error,
asio::placeholders::bytes_transferred asio::placeholders::bytes_transferred
) )
)
); );
return 0; return 0;
} }

View file

@ -20,6 +20,7 @@
#include "asio/detail/push_options.hpp" #include "asio/detail/push_options.hpp"
#include <cstddef> #include <cstddef>
#include <climits>
#include <boost/config.hpp> #include <boost/config.hpp>
#include <boost/noncopyable.hpp> #include <boost/noncopyable.hpp>
#include <boost/function.hpp> #include <boost/function.hpp>
@ -28,6 +29,7 @@
#include "asio/error.hpp" #include "asio/error.hpp"
#include "asio/io_service.hpp" #include "asio/io_service.hpp"
#include "asio/strand.hpp"
#include "asio/detail/service_base.hpp" #include "asio/detail/service_base.hpp"
#include "asio/ssl/basic_context.hpp" #include "asio/ssl/basic_context.hpp"
#include "asio/ssl/stream_base.hpp" #include "asio/ssl/stream_base.hpp"
@ -42,6 +44,8 @@ class openssl_stream_service
: public asio::detail::service_base<openssl_stream_service> : public asio::detail::service_base<openssl_stream_service>
{ {
private: private:
enum { max_buffer_size = INT_MAX };
//Base handler for asyncrhonous operations //Base handler for asyncrhonous operations
template <typename Stream> template <typename Stream>
class base_handler class base_handler
@ -160,7 +164,8 @@ public:
// Construct a new stream socket service for the specified io_service. // Construct a new stream socket service for the specified io_service.
explicit openssl_stream_service(asio::io_service& io_service) explicit openssl_stream_service(asio::io_service& io_service)
: asio::detail::service_base<openssl_stream_service>(io_service) : asio::detail::service_base<openssl_stream_service>(io_service),
strand_(io_service)
{ {
} }
@ -255,11 +260,12 @@ public:
local_handler, local_handler,
boost::arg<1>(), boost::arg<1>(),
boost::arg<2>() boost::arg<2>()
) ),
strand_
); );
local_handler->set_operation(op); local_handler->set_operation(op);
get_io_service().post(boost::bind(&openssl_operation<Stream>::start, op)); strand_.post(boost::bind(&openssl_operation<Stream>::start, op));
} }
// Shut down SSL on the stream. // Shut down SSL on the stream.
@ -309,11 +315,12 @@ public:
local_handler, local_handler,
boost::arg<1>(), boost::arg<1>(),
boost::arg<2>() boost::arg<2>()
) ),
strand_
); );
local_handler->set_operation(op); local_handler->set_operation(op);
get_io_service().post(boost::bind(&openssl_operation<Stream>::start, op)); strand_.post(boost::bind(&openssl_operation<Stream>::start, op));
} }
// Write some data to the stream. // Write some data to the stream.
@ -324,10 +331,14 @@ public:
size_t bytes_transferred = 0; size_t bytes_transferred = 0;
try try
{ {
std::size_t buffer_size = asio::buffer_size(*buffers.begin());
if (buffer_size > max_buffer_size)
buffer_size = max_buffer_size;
boost::function<int (SSL*)> send_func = boost::function<int (SSL*)> send_func =
boost::bind(&::SSL_write, boost::arg<1>(), boost::bind(&::SSL_write, boost::arg<1>(),
asio::buffer_cast<const void*>(*buffers.begin()), asio::buffer_cast<const void*>(*buffers.begin()),
static_cast<int>(asio::buffer_size(*buffers.begin()))); static_cast<int>(buffer_size));
openssl_operation<Stream> op( openssl_operation<Stream> op(
send_func, send_func,
next_layer, next_layer,
@ -356,10 +367,14 @@ public:
send_handler* local_handler = new send_handler(handler, get_io_service()); send_handler* local_handler = new send_handler(handler, get_io_service());
std::size_t buffer_size = asio::buffer_size(*buffers.begin());
if (buffer_size > max_buffer_size)
buffer_size = max_buffer_size;
boost::function<int (SSL*)> send_func = boost::function<int (SSL*)> send_func =
boost::bind(&::SSL_write, boost::arg<1>(), boost::bind(&::SSL_write, boost::arg<1>(),
asio::buffer_cast<const void*>(*buffers.begin()), asio::buffer_cast<const void*>(*buffers.begin()),
static_cast<int>(asio::buffer_size(*buffers.begin()))); static_cast<int>(buffer_size));
openssl_operation<Stream>* op = new openssl_operation<Stream> openssl_operation<Stream>* op = new openssl_operation<Stream>
( (
@ -374,11 +389,12 @@ public:
local_handler, local_handler,
boost::arg<1>(), boost::arg<1>(),
boost::arg<2>() boost::arg<2>()
) ),
strand_
); );
local_handler->set_operation(op); local_handler->set_operation(op);
get_io_service().post(boost::bind(&openssl_operation<Stream>::start, op)); strand_.post(boost::bind(&openssl_operation<Stream>::start, op));
} }
// Read some data from the stream. // Read some data from the stream.
@ -389,10 +405,14 @@ public:
size_t bytes_transferred = 0; size_t bytes_transferred = 0;
try try
{ {
std::size_t buffer_size = asio::buffer_size(*buffers.begin());
if (buffer_size > max_buffer_size)
buffer_size = max_buffer_size;
boost::function<int (SSL*)> recv_func = boost::function<int (SSL*)> recv_func =
boost::bind(&::SSL_read, boost::arg<1>(), boost::bind(&::SSL_read, boost::arg<1>(),
asio::buffer_cast<void*>(*buffers.begin()), asio::buffer_cast<void*>(*buffers.begin()),
asio::buffer_size(*buffers.begin())); static_cast<int>(buffer_size));
openssl_operation<Stream> op(recv_func, openssl_operation<Stream> op(recv_func,
next_layer, next_layer,
impl->recv_buf, impl->recv_buf,
@ -421,10 +441,14 @@ public:
recv_handler* local_handler = new recv_handler(handler, get_io_service()); recv_handler* local_handler = new recv_handler(handler, get_io_service());
std::size_t buffer_size = asio::buffer_size(*buffers.begin());
if (buffer_size > max_buffer_size)
buffer_size = max_buffer_size;
boost::function<int (SSL*)> recv_func = boost::function<int (SSL*)> recv_func =
boost::bind(&::SSL_read, boost::arg<1>(), boost::bind(&::SSL_read, boost::arg<1>(),
asio::buffer_cast<void*>(*buffers.begin()), asio::buffer_cast<void*>(*buffers.begin()),
asio::buffer_size(*buffers.begin())); static_cast<int>(buffer_size));
openssl_operation<Stream>* op = new openssl_operation<Stream> openssl_operation<Stream>* op = new openssl_operation<Stream>
( (
@ -439,11 +463,12 @@ public:
local_handler, local_handler,
boost::arg<1>(), boost::arg<1>(),
boost::arg<2>() boost::arg<2>()
) ),
strand_
); );
local_handler->set_operation(op); local_handler->set_operation(op);
get_io_service().post(boost::bind(&openssl_operation<Stream>::start, op)); strand_.post(boost::bind(&openssl_operation<Stream>::start, op));
} }
// Peek at the incoming data on the stream. // Peek at the incoming data on the stream.
@ -465,6 +490,8 @@ public:
} }
private: private:
asio::io_service::strand strand_;
typedef asio::detail::mutex mutex_type; typedef asio::detail::mutex mutex_type;
template<typename Mutex> template<typename Mutex>