From 57714fbb38add028e0ea45a7d5ea7681b4fc7295 Mon Sep 17 00:00:00 2001 From: Ali Mohammad Pur Date: Wed, 8 May 2024 20:15:05 +0200 Subject: [PATCH] RequestServer: Handle IPC requests on multiple threads concurrently Previously RS handled all the requests in an event loop, leading to issues with connections being started in the middle of other connections being started (and potentially blowing up the stack), ultimately causing requests to be delayed because of other requests. This commit reworks the way we handle these (specifically starting connections) by first serialising the requests, and then performing them in multiple threads concurrently; which yields a significant loading performance and reliability increase. --- Ladybird/RequestServer/CMakeLists.txt | 2 +- Userland/Libraries/LibCore/ElapsedTimer.cpp | 4 +- Userland/Libraries/LibCore/ElapsedTimer.h | 2 +- Userland/Libraries/LibTLS/TLSv12.cpp | 2 +- .../Services/RequestServer/CMakeLists.txt | 2 +- .../RequestServer/ConnectionCache.cpp | 125 ++++--- .../Services/RequestServer/ConnectionCache.h | 263 ++++++++++---- .../RequestServer/ConnectionFromClient.cpp | 329 +++++++++++------- .../RequestServer/ConnectionFromClient.h | 33 +- .../Services/RequestServer/GeminiProtocol.cpp | 2 +- Userland/Services/RequestServer/HttpCommon.h | 10 +- Userland/Services/RequestServer/main.cpp | 9 +- 12 files changed, 519 insertions(+), 264 deletions(-) diff --git a/Ladybird/RequestServer/CMakeLists.txt b/Ladybird/RequestServer/CMakeLists.txt index 798d7715fae..1f3ae575f89 100644 --- a/Ladybird/RequestServer/CMakeLists.txt +++ b/Ladybird/RequestServer/CMakeLists.txt @@ -33,7 +33,7 @@ target_link_libraries(RequestServer PRIVATE requestserver) target_include_directories(requestserver PRIVATE ${SERENITY_SOURCE_DIR}/Userland/Services/) target_include_directories(requestserver PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/..) -target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL) +target_link_libraries(requestserver PUBLIC LibCore LibMain LibCrypto LibFileSystem LibGemini LibHTTP LibIPC LibMain LibTLS LibWebView LibWebSocket LibURL LibThreading) if (${CMAKE_SYSTEM_NAME} MATCHES "SunOS") # Solaris has socket and networking related functions in two extra libraries target_link_libraries(requestserver PUBLIC nsl socket) diff --git a/Userland/Libraries/LibCore/ElapsedTimer.cpp b/Userland/Libraries/LibCore/ElapsedTimer.cpp index bc81d93be99..e4dd994759f 100644 --- a/Userland/Libraries/LibCore/ElapsedTimer.cpp +++ b/Userland/Libraries/LibCore/ElapsedTimer.cpp @@ -10,9 +10,9 @@ namespace Core { -ElapsedTimer ElapsedTimer::start_new() +ElapsedTimer ElapsedTimer::start_new(TimerType timer_type) { - ElapsedTimer timer; + ElapsedTimer timer(timer_type); timer.start(); return timer; } diff --git a/Userland/Libraries/LibCore/ElapsedTimer.h b/Userland/Libraries/LibCore/ElapsedTimer.h index 380e8bce049..716943cad90 100644 --- a/Userland/Libraries/LibCore/ElapsedTimer.h +++ b/Userland/Libraries/LibCore/ElapsedTimer.h @@ -17,7 +17,7 @@ enum class TimerType { class ElapsedTimer { public: - static ElapsedTimer start_new(); + static ElapsedTimer start_new(TimerType timer_type = TimerType::Coarse); ElapsedTimer(TimerType timer_type = TimerType::Coarse) : m_timer_type(timer_type) diff --git a/Userland/Libraries/LibTLS/TLSv12.cpp b/Userland/Libraries/LibTLS/TLSv12.cpp index 6e4b2cce6b3..6adefa5855c 100644 --- a/Userland/Libraries/LibTLS/TLSv12.cpp +++ b/Userland/Libraries/LibTLS/TLSv12.cpp @@ -569,7 +569,7 @@ DefaultRootCACertificates::DefaultRootCACertificates() DefaultRootCACertificates& DefaultRootCACertificates::the() { - static DefaultRootCACertificates s_the; + static thread_local DefaultRootCACertificates s_the; return s_the; } diff --git a/Userland/Services/RequestServer/CMakeLists.txt b/Userland/Services/RequestServer/CMakeLists.txt index 85711038409..34c533c18ce 100644 --- a/Userland/Services/RequestServer/CMakeLists.txt +++ b/Userland/Services/RequestServer/CMakeLists.txt @@ -26,4 +26,4 @@ set(GENERATED_SOURCES ) serenity_bin(RequestServer) -target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL) +target_link_libraries(RequestServer PRIVATE LibCore LibCrypto LibIPC LibGemini LibHTTP LibMain LibTLS LibWebSocket LibURL LibThreading) diff --git a/Userland/Services/RequestServer/ConnectionCache.cpp b/Userland/Services/RequestServer/ConnectionCache.cpp index 3c9c46985df..c7dfdd13538 100644 --- a/Userland/Services/RequestServer/ConnectionCache.cpp +++ b/Userland/Services/RequestServer/ConnectionCache.cpp @@ -11,9 +11,9 @@ namespace RequestServer::ConnectionCache { -HashMap>>>> g_tcp_connection_cache {}; -HashMap>>>> g_tls_connection_cache {}; -HashMap g_inferred_server_properties; +Threading::RWLockProtected>>>>> g_tcp_connection_cache {}; +Threading::RWLockProtected>>>>> g_tls_connection_cache {}; +Threading::RWLockProtected> g_inferred_server_properties; void request_did_finish(URL::URL const& url, Core::Socket const* socket) { @@ -26,8 +26,20 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) ConnectionKey partial_key { url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(), url.port_or_default() }; auto fire_off_next_job = [&](auto& cache) { - auto it = find_if(cache.begin(), cache.end(), [&](auto& connection) { return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port; }); - if (it == cache.end()) { + using CacheType = typename RemoveCVReference::ProtectedType; + auto [it, end] = cache.with_read_locked([&](auto const& cache) { + struct Result { + decltype(cache.begin()) it; + decltype(cache.end()) end; + }; + return Result { + find_if(cache.begin(), cache.end(), [&](auto& connection) { + return connection.key.hostname == partial_key.hostname && connection.key.port == partial_key.port; + }), + cache.end(), + }; + }); + if (it == end) { dbgln("Request for URL {} finished, but we don't own that!", url); return; } @@ -38,11 +50,16 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) } auto& connection = *connection_it; - auto& properties = g_inferred_server_properties.ensure(partial_key.hostname); + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.performing_request = Duration::from_milliseconds(connection->job_data->timing_info.timer.elapsed_milliseconds()); + connection->job_data->timing_info.timer.start(); + } + + auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(partial_key.hostname); }); if (!connection->socket->is_open()) properties.requests_served_per_connection = min(properties.requests_served_per_connection, connection->max_queue_length + 1); - if (connection->request_queue.is_empty()) { + if (connection->request_queue.with_read_locked([](auto const& queue) { return queue.is_empty(); })) { // Immediately mark the connection as finished, as new jobs will never be run if they are queued // before the deferred_invoke() below runs otherwise. connection->has_started = false; @@ -59,30 +76,47 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) if (ptr->has_started) return; - dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket); - auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; }); - VERIFY(did_remove); - if (cache_entry.is_empty()) - cache.remove(key); + dbgln_if(REQUESTSERVER_DEBUG, "Removing no-longer-used connection {} (socket {})", ptr, ptr->socket.ptr()); + cache.with_write_locked([&](CacheType& cache) { + auto did_remove = cache_entry.remove_first_matching([&](auto& entry) { return entry == ptr; }); + VERIFY(did_remove); + if (cache_entry.is_empty()) + cache.remove(key); + }); }); }; connection->removal_timer->start(); }); } else { + auto timer = Core::ElapsedTimer::start_new(); if (auto result = recreate_socket_if_needed(*connection, url); result.is_error()) { - dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error()); - connection->job_data.fail(Core::NetworkJob::Error::ConnectionFailed); + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds()); + } + cache.with_read_locked([&](auto&) { + dbgln("ConnectionCache request finish handler, reconnection failed with {}", result.error()); + connection->job_data->fail(Core::NetworkJob::Error::ConnectionFailed); + }); return; } + if constexpr (REQUESTSERVER_DEBUG) { + connection->job_data->timing_info.starting_connection += Duration::from_milliseconds(timer.elapsed_milliseconds()); + } connection->has_started = true; - Core::deferred_invoke([&connection = *connection, url] { - dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection); - connection.timer.start(); - connection.current_url = url; - connection.job_data = connection.request_queue.take_first(); - connection.socket->set_notifications_enabled(true); - connection.job_data.start(*connection.socket); + Core::deferred_invoke([&connection = *connection, url, &cache] { + cache.with_read_locked([&](auto&) { + dbgln_if(REQUESTSERVER_DEBUG, "Running next job in queue for connection {}", &connection); + connection.timer.start(); + connection.current_url = url; + connection.job_data = connection.request_queue.with_write_locked([](auto& queue) { return queue.take_first(); }); + if constexpr (REQUESTSERVER_DEBUG) { + connection.job_data->timing_info.waiting_in_queue = Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds() - connection.job_data->timing_info.performing_request.to_milliseconds()); + connection.job_data->timing_info.timer.start(); + } + connection.socket->set_notifications_enabled(true); + connection.job_data->start(*connection.socket); + }); }); } }; @@ -97,28 +131,37 @@ void request_did_finish(URL::URL const& url, Core::Socket const* socket) void dump_jobs() { - dbgln("=========== TLS Connection Cache =========="); - for (auto& connection : g_tls_connection_cache) { - dbgln(" - {}:{}", connection.key.hostname, connection.key.port); - for (auto& entry : *connection.value) { - dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket); - dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); - dbgln(" Request Queue:"); - for (auto& job : entry->request_queue) - dbgln(" - {}", &job); + g_tls_connection_cache.with_read_locked([](auto& cache) { + dbgln("=========== TLS Connection Cache =========="); + for (auto& connection : cache) { + dbgln(" - {}:{}", connection.key.hostname, connection.key.port); + for (auto& entry : *connection.value) { + dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr()); + dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); + dbgln(" Request Queue:"); + entry->request_queue.for_each_locked([](auto const& job) { + dbgln(" - {}", &job); + }); + } } - } - dbgln("=========== TCP Connection Cache =========="); - for (auto& connection : g_tcp_connection_cache) { - dbgln(" - {}:{}", connection.key.hostname, connection.key.port); - for (auto& entry : *connection.value) { - dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket); - dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); - dbgln(" Request Queue:"); - for (auto& job : entry->request_queue) - dbgln(" - {}", &job); + }); + + g_tcp_connection_cache.with_read_locked([](auto& cache) { + dbgln("=========== TCP Connection Cache =========="); + for (auto& connection : cache) { + dbgln(" - {}:{}", connection.key.hostname, connection.key.port); + for (auto& entry : *connection.value) { + dbgln(" - Connection {} (started={}) (socket={})", &entry, entry->has_started, entry->socket.ptr()); + dbgln(" Currently loading {} ({} elapsed)", entry->current_url, entry->timer.is_valid() ? entry->timer.elapsed() : 0); + dbgln(" Request Queue:"); + entry->request_queue.for_each_locked([](auto const& job) { + dbgln(" - {}", &job); + }); + } } - } + }); } +size_t hits; +size_t misses; } diff --git a/Userland/Services/RequestServer/ConnectionCache.h b/Userland/Services/RequestServer/ConnectionCache.h index f812d91ad51..f9ad626da3c 100644 --- a/Userland/Services/RequestServer/ConnectionCache.h +++ b/Userland/Services/RequestServer/ConnectionCache.h @@ -16,6 +16,7 @@ #include #include #include +#include #include namespace RequestServer { @@ -53,42 +54,95 @@ struct Proxy { } }; +struct JobData { + Function start {}; + Function fail {}; + Function()> provide_client_certificates {}; + struct TimingInfo { +#if REQUESTSERVER_DEBUG + bool valid { true }; + Core::ElapsedTimer timer {}; + URL::URL url {}; + Duration waiting_in_queue {}; + Duration starting_connection {}; + Duration performing_request {}; +#endif + } timing_info {}; + + JobData(Function start, Function fail, Function()> provide_client_certificates, TimingInfo timing_info) + : start(move(start)) + , fail(move(fail)) + , provide_client_certificates(move(provide_client_certificates)) + , timing_info(move(timing_info)) + { + } + + JobData(JobData&& other) + : start(move(other.start)) + , fail(move(other.fail)) + , provide_client_certificates(move(other.provide_client_certificates)) + , timing_info(move(other.timing_info)) + { +#if REQUESTSERVER_DEBUG + other.timing_info.valid = false; +#endif + } + +#if REQUESTSERVER_DEBUG + ~JobData() + { + if (timing_info.valid) { + dbgln("JobData for {} timings:", timing_info.url); + dbgln(" - Waiting in queue: {}ms", timing_info.waiting_in_queue.to_milliseconds()); + dbgln(" - Starting connection: {}ms", timing_info.starting_connection.to_milliseconds()); + dbgln(" - Performing request: {}ms", timing_info.performing_request.to_milliseconds()); + } + } +#endif + + template + static JobData create(NonnullRefPtr job, [[maybe_unused]] URL::URL url) + { + return JobData { + [job](auto& socket) { job->start(socket); }, + [job](auto error) { job->fail(error); }, + [job] { + if constexpr (requires { job->on_certificate_requested; }) { + if (job->on_certificate_requested) + return job->on_certificate_requested(); + } else { + // "use" `job`, otherwise clang gets sad. + (void)job; + } + return Vector {}; + }, + { +#if REQUESTSERVER_DEBUG + .timer = Core::ElapsedTimer::start_new(Core::TimerType::Precise), + .url = move(url), + .waiting_in_queue = {}, + .starting_connection = {}, + .performing_request = {}, +#endif + }, + }; + } +}; + template struct Connection { - struct JobData { - Function start {}; - Function fail {}; - Function()> provide_client_certificates {}; - - template - static JobData create(NonnullRefPtr job) - { - return JobData { - .start = [job](auto& socket) { job->start(socket); }, - .fail = [job](auto error) { job->fail(error); }, - .provide_client_certificates = [job] { - if constexpr (requires { job->on_certificate_requested; }) { - if (job->on_certificate_requested) - return job->on_certificate_requested(); - } else { - // "use" `job`, otherwise clang gets sad. - (void)job; - } - return Vector {}; }, - }; - } - }; using QueueType = Vector; using SocketType = Socket; using StorageType = SocketStorageType; - NonnullOwnPtr> socket; - QueueType request_queue; + OwnPtr> socket; + Threading::RWLockProtected request_queue; NonnullRefPtr removal_timer; + Atomic is_being_started { false }; bool has_started { false }; URL::URL current_url {}; Core::ElapsedTimer timer {}; - JobData job_data {}; + Optional job_data {}; Proxy proxy {}; size_t max_queue_length { 0 }; }; @@ -117,15 +171,16 @@ struct InferredServerProperties { size_t requests_served_per_connection { NumericLimits::max() }; }; -extern HashMap>>>> g_tcp_connection_cache; -extern HashMap>>>> g_tls_connection_cache; -extern HashMap g_inferred_server_properties; +extern Threading::RWLockProtected>>>>> g_tcp_connection_cache; +extern Threading::RWLockProtected>>>>> g_tls_connection_cache; +extern Threading::RWLockProtected> g_inferred_server_properties; void request_did_finish(URL::URL const&, Core::Socket const*); void dump_jobs(); constexpr static size_t MaxConcurrentConnectionsPerURL = 4; -constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 10'000; +constexpr static size_t ConnectionKeepAliveTimeMilliseconds = 20'000; +constexpr static size_t ConnectionCacheQueueHighWatermark = 4; template ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) @@ -134,6 +189,7 @@ ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) using SocketStorageType = typename T::StorageType; if (!connection.socket->is_open() || connection.socket->is_eof()) { + connection.socket = nullptr; // Create another socket for the connection. auto set_socket = [&](NonnullOwnPtr&& socket) -> ErrorOr { connection.socket = TRY(Core::BufferedSocket::create(move(socket))); @@ -151,52 +207,82 @@ ErrorOr recreate_socket_if_needed(T& connection, URL::URL const& url) else reason = Core::NetworkJob::Error::TransmissionFailed; - if (connection.job_data.fail) - connection.job_data.fail(reason); + if (connection.job_data->fail) + connection.job_data->fail(reason); }); options.set_certificate_provider([&connection]() -> Vector { - if (connection.job_data.provide_client_certificates) - return connection.job_data.provide_client_certificates(); + if (connection.job_data->provide_client_certificates) + return connection.job_data->provide_client_certificates(); return {}; }); TRY(set_socket(TRY((connection.proxy.template tunnel(url, move(options)))))); } else { TRY(set_socket(TRY((connection.proxy.template tunnel(url))))); } - dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket); + dbgln_if(REQUESTSERVER_DEBUG, "Creating a new socket for {} -> {}", url, connection.socket.ptr()); } return {}; } -decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto job, Core::ProxyData proxy_data = {}) +extern size_t hits; +extern size_t misses; + +template +void start_connection(const URL::URL& url, auto job, auto& sockets_for_url, size_t index, Duration, Cache&); + +void ensure_connection(auto& cache, const URL::URL& url, auto job, Core::ProxyData proxy_data = {}) { - using CacheEntryType = RemoveCVReferencevalue)>; + using CacheEntryType = RemoveCVReference::ProtectedType>().begin()->value)>; auto hostname = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(); - auto& properties = g_inferred_server_properties.ensure(hostname); + auto& properties = g_inferred_server_properties.with_write_locked([&](auto& map) -> InferredServerProperties& { return map.ensure(hostname); }); - auto& sockets_for_url = *cache.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make(); }); + auto& sockets_for_url = *cache.with_write_locked([&](auto& map) -> NonnullOwnPtr& { + return map.ensure({ move(hostname), url.port_or_default(), proxy_data }, [] { return make(); }); + }); - Proxy proxy { proxy_data }; - - using ReturnType = decltype(sockets_for_url[0].ptr()); // Find the connection with an empty queue; if none exist, we'll find the least backed-up connection later. // Note that servers that are known to serve a single request per connection (e.g. HTTP/1.0) usually have // issues with concurrent connections, so we'll only allow one connection per URL in that case to avoid issues. // This is a bit too aggressive, but there's no way to know if the server can handle concurrent connections // without trying it out first, and that's not worth the effort as HTTP/1.0 is a legacy protocol anyway. - auto it = sockets_for_url.find_if([&](auto& connection) { return properties.requests_served_per_connection < 2 || connection->request_queue.is_empty(); }); + auto it = sockets_for_url.find_if([&](auto const& connection) { + return properties.requests_served_per_connection < 2 + || connection->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }) <= ConnectionCacheQueueHighWatermark; + }); auto did_add_new_connection = false; auto failed_to_find_a_socket = it.is_end(); - if (failed_to_find_a_socket && sockets_for_url.size() < ConnectionCache::MaxConcurrentConnectionsPerURL) { - using ConnectionType = RemoveCVReferencevalue->at(0))>; + + Proxy proxy { proxy_data }; + size_t index; + + auto timer = Core::ElapsedTimer::start_new(); + if (failed_to_find_a_socket && sockets_for_url.size() < MaxConcurrentConnectionsPerURL) { + using ConnectionType = RemoveCVReference().at(0))>; + auto& connection = cache.with_write_locked([&](auto&) -> ConnectionType& { + index = sockets_for_url.size(); + sockets_for_url.append(AK::make( + nullptr, + typename ConnectionType::QueueType {}, + Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr), + true)); + auto& connection = sockets_for_url.last(); + connection->proxy = move(proxy); + return *connection; + }); + ScopeGuard start_guard = [&] { + connection.is_being_started = false; + }; + dbgln_if(REQUESTSERVER_DEBUG, "I will start a connection ({}) for URL {}", &connection, url); + auto connection_result = proxy.tunnel(url); + misses++; if (connection_result.is_error()) { dbgln("ConnectionCache: Connection to {} failed: {}", url, connection_result.error()); Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } auto socket_result = Core::BufferedSocket::create(connection_result.release_value()); if (socket_result.is_error()) { @@ -204,25 +290,21 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } - sockets_for_url.append(make( - socket_result.release_value(), - typename ConnectionType::QueueType {}, - Core::Timer::create_single_shot(ConnectionKeepAliveTimeMilliseconds, nullptr))); - sockets_for_url.last()->proxy = move(proxy); did_add_new_connection = true; + connection.socket = socket_result.release_value(); } - size_t index; + + auto elapsed = Duration::from_milliseconds(timer.elapsed_milliseconds()); + if (failed_to_find_a_socket) { - if (did_add_new_connection) { - index = sockets_for_url.size() - 1; - } else { + if (!did_add_new_connection) { // Find the least backed-up connection (based on how many entries are in their request queue). index = 0; auto min_queue_size = (size_t)-1; for (auto it = sockets_for_url.begin(); it != sockets_for_url.end(); ++it) { - if (auto queue_size = (*it)->request_queue.size(); min_queue_size > queue_size) { + if (auto queue_size = (*it)->request_queue.with_read_locked([](auto const& queue) { return queue.size(); }); min_queue_size > queue_size) { index = it.index(); min_queue_size = queue_size; } @@ -230,39 +312,76 @@ decltype(auto) get_or_create_connection(auto& cache, URL::URL const& url, auto j } } else { index = it.index(); + hits++; } + + dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: Hits: {}, Misses: {}", RequestServer::ConnectionCache::hits, RequestServer::ConnectionCache::misses); + start_connection(url, job, sockets_for_url, index, elapsed, cache); +} + +template +void start_connection(URL::URL const& url, auto job, auto& sockets_for_url, size_t index, Duration setup_time, Cache& cache) +{ if (sockets_for_url.is_empty()) { Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); - return ReturnType { nullptr }; + return; } auto& connection = *sockets_for_url[index]; + if (connection.is_being_started) { + // Someone else is creating the connection, queue the job and let them handle it. + dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr()); + auto size = connection.request_queue.with_write_locked([&](auto& queue) { + queue.append(JobData::create(job, url)); + return queue.size(); + }); + connection.max_queue_length = max(connection.max_queue_length, size); + return; + } + if (!connection.has_started) { connection.has_started = true; - Core::deferred_invoke([&connection, url, job] { + Core::deferred_invoke([&connection, &cache, url, job, setup_time] { + (void)setup_time; + auto job_data = JobData::create(job, url); + if constexpr (REQUESTSERVER_DEBUG) { + job_data.timing_info.waiting_in_queue = Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds()); + job_data.timing_info.timer.start(); + } if (auto result = recreate_socket_if_needed(connection, url); result.is_error()) { - dbgln("ConnectionCache: request failed to start, failed to make a socket: {}", result.error()); + dbgln_if(REQUESTSERVER_DEBUG, "ConnectionCache: request failed to start, failed to make a socket: {}", result.error()); + if constexpr (REQUESTSERVER_DEBUG) { + job_data.timing_info.starting_connection += Duration::from_milliseconds(job_data.timing_info.timer.elapsed_milliseconds()) + setup_time; + job_data.timing_info.timer.start(); + } Core::deferred_invoke([job] { job->fail(Core::NetworkJob::Error::ConnectionFailed); }); } else { - dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket); - connection.removal_timer->stop(); - connection.timer.start(); - connection.current_url = url; - connection.job_data = decltype(connection.job_data)::create(job); - connection.socket->set_notifications_enabled(true); - connection.job_data.start(*connection.socket); + cache.with_write_locked([&](auto&) { + dbgln_if(REQUESTSERVER_DEBUG, "Immediately start request for url {} in {} - {}", url, &connection, connection.socket.ptr()); + connection.job_data = move(job_data); + if constexpr (REQUESTSERVER_DEBUG) { + connection.job_data->timing_info.starting_connection += Duration::from_milliseconds(connection.job_data->timing_info.timer.elapsed_milliseconds()) + setup_time; + connection.job_data->timing_info.timer.start(); + } + connection.removal_timer->stop(); + connection.timer.start(); + connection.current_url = url; + connection.socket->set_notifications_enabled(true); + connection.job_data->start(*connection.socket); + }); } }); } else { - dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket); - connection.request_queue.append(decltype(connection.job_data)::create(job)); - connection.max_queue_length = max(connection.max_queue_length, connection.request_queue.size()); + dbgln_if(REQUESTSERVER_DEBUG, "Enqueue request for URL {} in {} - {}", url, &connection, connection.socket.ptr()); + auto size = connection.request_queue.with_write_locked([&](auto& queue) { + queue.append(JobData::create(job, url)); + return queue.size(); + }); + connection.max_queue_length = max(connection.max_queue_length, size); } - return &connection; } - } diff --git a/Userland/Services/RequestServer/ConnectionFromClient.cpp b/Userland/Services/RequestServer/ConnectionFromClient.cpp index 013147ef16e..53d78a7b681 100644 --- a/Userland/Services/RequestServer/ConnectionFromClient.cpp +++ b/Userland/Services/RequestServer/ConnectionFromClient.cpp @@ -26,121 +26,11 @@ static IDAllocator s_client_ids; ConnectionFromClient::ConnectionFromClient(NonnullOwnPtr socket) : IPC::ConnectionFromClient(*this, move(socket), s_client_ids.allocate()) + , m_thread_pool([this](Work work) { worker_do_work(move(work)); }) { s_connections.set(client_id(), *this); } -void ConnectionFromClient::die() -{ - auto client_id = this->client_id(); - s_connections.remove(client_id); - s_client_ids.deallocate(client_id); - - if (s_connections.is_empty()) - Core::EventLoop::current().quit(0); -} - -Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client() -{ - int socket_fds[2] {}; - if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) { - dbgln("Failed to create client socketpair: {}", err.error()); - return IPC::File {}; - } - - auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]); - if (client_socket_or_error.is_error()) { - close(socket_fds[0]); - close(socket_fds[1]); - dbgln("Failed to adopt client socket: {}", client_socket_or_error.error()); - return IPC::File {}; - } - auto client_socket = client_socket_or_error.release_value(); - // Note: A ref is stored in the static s_connections map - auto client = adopt_ref(*new ConnectionFromClient(move(client_socket))); - - return IPC::File::adopt_fd(socket_fds[1]); -} - -Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol) -{ - bool supported = Protocol::find_by_name(protocol.to_lowercase()); - return supported; -} - -void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data) -{ - if (!url.is_valid()) { - dbgln("StartRequest: Invalid URL requested: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - - auto* protocol = Protocol::find_by_name(url.scheme().to_byte_string()); - if (!protocol) { - dbgln("StartRequest: No protocol handler for URL: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - auto request = protocol->start_request(request_id, *this, method, url, request_headers, request_body, proxy_data); - if (!request) { - dbgln("StartRequest: Protocol handler failed to start request: '{}'", url); - (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); - return; - } - auto id = request->id(); - auto fd = request->request_fd(); - m_requests.set(id, move(request)); - (void)post_message(Messages::RequestClient::RequestStarted(request_id, IPC::File::adopt_fd(fd))); -} - -Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id) -{ - auto* request = const_cast(m_requests.get(request_id).value_or(nullptr)); - bool success = false; - if (request) { - request->stop(); - m_requests.remove(request_id); - success = true; - } - return success; -} - -void ConnectionFromClient::did_receive_headers(Badge, Request& request) -{ - auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors(); - async_headers_became_available(request.id(), move(response_headers), request.status_code()); -} - -void ConnectionFromClient::did_finish_request(Badge, Request& request, bool success) -{ - if (request.total_size().has_value()) - async_request_finished(request.id(), success, request.total_size().value()); - - m_requests.remove(request.id()); -} - -void ConnectionFromClient::did_progress_request(Badge, Request& request) -{ - async_request_progress(request.id(), request.total_size(), request.downloaded_size()); -} - -void ConnectionFromClient::did_request_certificates(Badge, Request& request) -{ - async_certificate_requested(request.id()); -} - -Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key) -{ - auto* request = const_cast(m_requests.get(request_id).value_or(nullptr)); - bool success = false; - if (request) { - request->set_certificate(certificate, key); - success = true; - } - return success; -} - class Job : public RefCounted , public Weakable { public: @@ -183,6 +73,195 @@ private: inline static HashMap> s_jobs {}; }; +template +IterationDecision ConnectionFromClient::Looper::next(Pool& pool, bool wait) +{ + bool should_exit = false; + auto timer = Core::Timer::create_repeating(100, [&] { + if (Threading::ThreadPoolLooper::next(pool, false) == IterationDecision::Break) { + event_loop.quit(0); + should_exit = true; + } + }); + + timer->start(); + if (!wait) { + event_loop.deferred_invoke([&] { + event_loop.quit(0); + }); + } + + event_loop.exec(); + + if (should_exit) + return IterationDecision::Break; + return IterationDecision::Continue; +} + +void ConnectionFromClient::worker_do_work(Work work) +{ + work.visit( + [&](StartRequest& start_request) { + auto* protocol = Protocol::find_by_name(start_request.url.scheme().to_byte_string()); + if (!protocol) { + dbgln("StartRequest: No protocol handler for URL: '{}'", start_request.url); + (void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0)); + return; + } + auto request = protocol->start_request(start_request.request_id, *this, start_request.method, start_request.url, start_request.request_headers, start_request.request_body, start_request.proxy_data); + if (!request) { + dbgln("StartRequest: Protocol handler failed to start request: '{}'", start_request.url); + (void)post_message(Messages::RequestClient::RequestFinished(start_request.request_id, false, 0)); + return; + } + auto id = request->id(); + auto fd = request->request_fd(); + m_requests.with_locked([&](auto& map) { map.set(id, move(request)); }); + (void)post_message(Messages::RequestClient::RequestStarted(start_request.request_id, IPC::File::adopt_fd(fd))); + }, + [&](EnsureConnection& ensure_connection) { + auto& url = ensure_connection.url; + auto& cache_level = ensure_connection.cache_level; + + if (cache_level == CacheLevel::ResolveOnly) { + Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] { + dbgln("EnsureConnection: DNS-preload for {}", host); + auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream); + if (resolved_host.is_error()) + dbgln("EnsureConnection: DNS-preload failed for {}", host); + }); + dbgln("EnsureConnection: DNS-preload for {} done", url); + return; + } + + auto job = Job::ensure(url); + dbgln("EnsureConnection: Pre-connect to {}", url); + auto do_preconnect = [&](auto& cache) { + ConnectionCache::ensure_connection(cache, url, job); + }; + + if (url.scheme() == "http"sv) + do_preconnect(ConnectionCache::g_tcp_connection_cache); + else if (url.scheme() == "https"sv) + do_preconnect(ConnectionCache::g_tls_connection_cache); + else + dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme()); + }, + [&](Empty) {}); +} + +void ConnectionFromClient::die() +{ + auto client_id = this->client_id(); + s_connections.remove(client_id); + s_client_ids.deallocate(client_id); + + if (s_connections.is_empty()) + Core::EventLoop::current().quit(0); +} + +Messages::RequestServer::ConnectNewClientResponse ConnectionFromClient::connect_new_client() +{ + int socket_fds[2] {}; + if (auto err = Core::System::socketpair(AF_LOCAL, SOCK_STREAM, 0, socket_fds); err.is_error()) { + dbgln("Failed to create client socketpair: {}", err.error()); + return IPC::File {}; + } + + auto client_socket_or_error = Core::LocalSocket::adopt_fd(socket_fds[0]); + if (client_socket_or_error.is_error()) { + close(socket_fds[0]); + close(socket_fds[1]); + dbgln("Failed to adopt client socket: {}", client_socket_or_error.error()); + return IPC::File {}; + } + auto client_socket = client_socket_or_error.release_value(); + // Note: A ref is stored in the static s_connections map + auto client = adopt_ref(*new ConnectionFromClient(move(client_socket))); + + return IPC::File::adopt_fd(socket_fds[1]); +} + +void ConnectionFromClient::enqueue(Work work) +{ + m_thread_pool.submit(move(work)); +} + +Messages::RequestServer::IsSupportedProtocolResponse ConnectionFromClient::is_supported_protocol(ByteString const& protocol) +{ + bool supported = Protocol::find_by_name(protocol.to_lowercase()); + return supported; +} + +void ConnectionFromClient::start_request(i32 request_id, ByteString const& method, URL::URL const& url, HashMap const& request_headers, ByteBuffer const& request_body, Core::ProxyData const& proxy_data) +{ + if (!url.is_valid()) { + dbgln("StartRequest: Invalid URL requested: '{}'", url); + (void)post_message(Messages::RequestClient::RequestFinished(request_id, false, 0)); + return; + } + + enqueue(StartRequest { + .request_id = request_id, + .method = method, + .url = url, + .request_headers = request_headers, + .request_body = request_body, + .proxy_data = proxy_data, + }); +} + +Messages::RequestServer::StopRequestResponse ConnectionFromClient::stop_request(i32 request_id) +{ + return m_requests.with_locked([&](auto& map) { + auto* request = const_cast(map.get(request_id).value_or(nullptr)); + bool success = false; + if (request) { + request->stop(); + map.remove(request_id); + success = true; + } + return success; + }); +} + +void ConnectionFromClient::did_receive_headers(Badge, Request& request) +{ + auto response_headers = request.response_headers().clone().release_value_but_fixme_should_propagate_errors(); + async_headers_became_available(request.id(), move(response_headers), request.status_code()); +} + +void ConnectionFromClient::did_finish_request(Badge, Request& request, bool success) +{ + if (request.total_size().has_value()) + async_request_finished(request.id(), success, request.total_size().value()); + + m_requests.with_locked([&](auto& map) { map.remove(request.id()); }); +} + +void ConnectionFromClient::did_progress_request(Badge, Request& request) +{ + async_request_progress(request.id(), request.total_size(), request.downloaded_size()); +} + +void ConnectionFromClient::did_request_certificates(Badge, Request& request) +{ + async_certificate_requested(request.id()); +} + +Messages::RequestServer::SetCertificateResponse ConnectionFromClient::set_certificate(i32 request_id, ByteString const& certificate, ByteString const& key) +{ + return m_requests.with_locked([&](auto& map) { + auto* request = const_cast(map.get(request_id).value_or(nullptr)); + bool success = false; + if (request) { + request->set_certificate(certificate, key); + success = true; + } + return success; + }); +} + void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServer::CacheLevel const& cache_level) { if (!url.is_valid()) { @@ -190,30 +269,10 @@ void ConnectionFromClient::ensure_connection(URL::URL const& url, ::RequestServe return; } - if (cache_level == CacheLevel::ResolveOnly) { - return Core::deferred_invoke([host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string()] { - dbgln("EnsureConnection: DNS-preload for {}", host); - auto resolved_host = Core::Socket::resolve_host(host, Core::Socket::SocketType::Stream); - if (resolved_host.is_error()) - dbgln("EnsureConnection: DNS-preload failed for {}", host); - }); - } - - auto job = Job::ensure(url); - dbgln("EnsureConnection: Pre-connect to {}", url); - auto do_preconnect = [&](auto& cache) { - auto serialized_host = url.serialized_host().release_value_but_fixme_should_propagate_errors().to_byte_string(); - auto it = cache.find({ serialized_host, url.port_or_default() }); - if (it == cache.end() || it->value->is_empty()) - ConnectionCache::get_or_create_connection(cache, url, job); - }; - - if (url.scheme() == "http"sv) - do_preconnect(ConnectionCache::g_tcp_connection_cache); - else if (url.scheme() == "https"sv) - do_preconnect(ConnectionCache::g_tls_connection_cache); - else - dbgln("EnsureConnection: Invalid URL scheme: '{}'", url.scheme()); + enqueue(EnsureConnection { + .url = url, + .cache_level = cache_level, + }); } static i32 s_next_websocket_id = 1; diff --git a/Userland/Services/RequestServer/ConnectionFromClient.h b/Userland/Services/RequestServer/ConnectionFromClient.h index 709d5b3b6e3..f1f398e594a 100644 --- a/Userland/Services/RequestServer/ConnectionFromClient.h +++ b/Userland/Services/RequestServer/ConnectionFromClient.h @@ -7,7 +7,10 @@ #pragma once #include +#include #include +#include +#include #include #include #include @@ -46,8 +49,36 @@ private: virtual void websocket_close(i32, u16, ByteString const&) override; virtual Messages::RequestServer::WebsocketSetCertificateResponse websocket_set_certificate(i32, ByteString const&, ByteString const&) override; - HashMap> m_requests; + struct StartRequest { + i32 request_id; + ByteString method; + URL::URL url; + HashMap request_headers; + ByteBuffer request_body; + Core::ProxyData proxy_data; + }; + + struct EnsureConnection { + URL::URL url; + CacheLevel cache_level; + }; + + using Work = Variant; + + void worker_do_work(Work); + + Threading::MutexProtected>> m_requests; HashMap> m_websockets; + + void enqueue(Work); + + template + struct Looper : public Threading::ThreadPoolLooper { + IterationDecision next(Pool& pool, bool wait); + Core::EventLoop event_loop; + }; + + Threading::ThreadPool m_thread_pool; }; } diff --git a/Userland/Services/RequestServer/GeminiProtocol.cpp b/Userland/Services/RequestServer/GeminiProtocol.cpp index 9c79d8ed3ee..eba6f7a7955 100644 --- a/Userland/Services/RequestServer/GeminiProtocol.cpp +++ b/Userland/Services/RequestServer/GeminiProtocol.cpp @@ -32,7 +32,7 @@ OwnPtr GeminiProtocol::start_request(i32 request_id, ConnectionFromClie protocol_request->set_request_fd(pipe_result.value().read_fd); Core::EventLoop::current().deferred_invoke([=] { - ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); + ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); }); return protocol_request; diff --git a/Userland/Services/RequestServer/HttpCommon.h b/Userland/Services/RequestServer/HttpCommon.h index 386991012a0..3323dc6274a 100644 --- a/Userland/Services/RequestServer/HttpCommon.h +++ b/Userland/Services/RequestServer/HttpCommon.h @@ -102,10 +102,12 @@ OwnPtr start_request(TBadgedProtocol&& protocol, i32 request_id, Connec auto protocol_request = TRequest::create_with_job(forward(protocol), client, (TJob&)*job, move(output_stream), request_id); protocol_request->set_request_fd(pipe_result.value().read_fd); - if constexpr (IsSame) - ConnectionCache::get_or_create_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); - else - ConnectionCache::get_or_create_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data); + Core::deferred_invoke([=] { + if constexpr (IsSame) + ConnectionCache::ensure_connection(ConnectionCache::g_tls_connection_cache, url, job, proxy_data); + else + ConnectionCache::ensure_connection(ConnectionCache::g_tcp_connection_cache, url, job, proxy_data); + }); return protocol_request; } diff --git a/Userland/Services/RequestServer/main.cpp b/Userland/Services/RequestServer/main.cpp index 17b2234f86b..823c296d1bd 100644 --- a/Userland/Services/RequestServer/main.cpp +++ b/Userland/Services/RequestServer/main.cpp @@ -20,18 +20,18 @@ ErrorOr serenity_main(Main::Arguments) { if constexpr (TLS_SSL_KEYLOG_DEBUG) - TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd sigaction")); + TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd sigaction")); else - TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd sigaction")); + TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd sigaction")); #ifdef SIGINFO signal(SIGINFO, [](int) { RequestServer::ConnectionCache::dump_jobs(); }); #endif if constexpr (TLS_SSL_KEYLOG_DEBUG) - TRY(Core::System::pledge("stdio inet accept unix cpath wpath rpath sendfd recvfd")); + TRY(Core::System::pledge("stdio inet accept thread unix cpath wpath rpath sendfd recvfd")); else - TRY(Core::System::pledge("stdio inet accept unix rpath sendfd recvfd")); + TRY(Core::System::pledge("stdio inet accept thread unix rpath sendfd recvfd")); // Ensure the certificates are read out here. // FIXME: Allow specifying extra certificates on the command line, or in other configuration. @@ -40,6 +40,7 @@ ErrorOr serenity_main(Main::Arguments) Core::EventLoop event_loop; // FIXME: Establish a connection to LookupServer and then drop "unix"? TRY(Core::System::unveil("/tmp/portal/lookup", "rw")); + TRY(Core::System::unveil("/etc/cacert.pem", "rw")); TRY(Core::System::unveil("/etc/timezone", "r")); if constexpr (TLS_SSL_KEYLOG_DEBUG) TRY(Core::System::unveil("/home/anon", "rwc"));