mirror of
https://git.deluge-torrent.org/deluge
synced 2025-08-08 01:18:39 +00:00
[Core] Call wait_for_alert in a thread
This spawns a thread in alertmanager to call wait_for_alert in a thread. This reduces latency to deluge responding to events. And removes all `hasattr` checks in Component Closes: https://github.com/deluge-torrent/deluge/pull/221
This commit is contained in:
parent
c7dc60571e
commit
b5f8c5af2d
4 changed files with 85 additions and 10 deletions
|
@ -64,9 +64,9 @@ class Component:
|
||||||
paused by instructing the :class:`ComponentRegistry` to pause
|
paused by instructing the :class:`ComponentRegistry` to pause
|
||||||
this Component.
|
this Component.
|
||||||
|
|
||||||
**pause()** - This method is called when the component is being paused.
|
**pause()** - This method is called when the component is being paused.
|
||||||
|
|
||||||
**resume()** - This method is called when the component resumes from a Paused
|
**resume()** - This method is called when the component resumes from a Paused
|
||||||
state.
|
state.
|
||||||
|
|
||||||
**shutdown()** - This method is called when the client is exiting. If the
|
**shutdown()** - This method is called when the client is exiting. If the
|
||||||
|
@ -85,7 +85,7 @@ class Component:
|
||||||
|
|
||||||
**Stopped** - The Component has either been stopped or has yet to be started.
|
**Stopped** - The Component has either been stopped or has yet to be started.
|
||||||
|
|
||||||
**Stopping** - The Component has had it's stop method called, but it hasn't
|
**Stopping** - The Component has had its stop method called, but it hasn't
|
||||||
fully stopped yet.
|
fully stopped yet.
|
||||||
|
|
||||||
**Paused** - The Component has had its update timer stopped, but will
|
**Paused** - The Component has had its update timer stopped, but will
|
||||||
|
|
|
@ -42,11 +42,13 @@ def mock_callback():
|
||||||
The returned Mock instance will have a `deferred` attribute which will complete when the callback has been called.
|
The returned Mock instance will have a `deferred` attribute which will complete when the callback has been called.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def reset():
|
def reset(timeout=0.5, *args, **kwargs):
|
||||||
if mock.called:
|
if mock.called:
|
||||||
original_reset_mock()
|
original_reset_mock(*args, **kwargs)
|
||||||
deferred = Deferred()
|
if mock.deferred:
|
||||||
deferred.addTimeout(0.5, reactor)
|
mock.deferred.cancel()
|
||||||
|
deferred = Deferred(canceller=lambda x: deferred.callback(None))
|
||||||
|
deferred.addTimeout(timeout, reactor)
|
||||||
mock.side_effect = lambda *args, **kw: deferred.callback((args, kw))
|
mock.side_effect = lambda *args, **kw: deferred.callback((args, kw))
|
||||||
mock.deferred = deferred
|
mock.deferred = deferred
|
||||||
|
|
||||||
|
|
|
@ -16,11 +16,12 @@ This should typically only be used by the Core. Plugins should utilize the
|
||||||
"""
|
"""
|
||||||
import contextlib
|
import contextlib
|
||||||
import logging
|
import logging
|
||||||
|
import threading
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
from typing import Any, Callable
|
from typing import Any, Callable
|
||||||
|
|
||||||
from twisted.internet import reactor
|
from twisted.internet import reactor, threads
|
||||||
|
|
||||||
import deluge.component as component
|
import deluge.component as component
|
||||||
from deluge._libtorrent import lt
|
from deluge._libtorrent import lt
|
||||||
|
@ -34,7 +35,7 @@ class AlertManager(component.Component):
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
log.debug('AlertManager init...')
|
log.debug('AlertManager init...')
|
||||||
component.Component.__init__(self, 'AlertManager', interval=0.3)
|
component.Component.__init__(self, 'AlertManager')
|
||||||
self.session = component.get('Core').session
|
self.session = component.get('Core').session
|
||||||
|
|
||||||
# Increase the alert queue size so that alerts don't get lost.
|
# Increase the alert queue size so that alerts don't get lost.
|
||||||
|
@ -57,10 +58,17 @@ class AlertManager(component.Component):
|
||||||
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
|
# handlers is a dictionary of lists {"alert_type": [handler1,h2,..]}
|
||||||
self.handlers = defaultdict(list)
|
self.handlers = defaultdict(list)
|
||||||
self.delayed_calls = []
|
self.delayed_calls = []
|
||||||
|
self._event = threading.Event()
|
||||||
|
|
||||||
def update(self):
|
def update(self):
|
||||||
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
|
self.delayed_calls = [dc for dc in self.delayed_calls if dc.active()]
|
||||||
self.handle_alerts()
|
|
||||||
|
def start(self):
|
||||||
|
thread = threading.Thread(
|
||||||
|
target=self.wait_for_alert_in_thread, name='alert-poller', daemon=True
|
||||||
|
)
|
||||||
|
thread.start()
|
||||||
|
self._event.set()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
for delayed_call in self.delayed_calls:
|
for delayed_call in self.delayed_calls:
|
||||||
|
@ -68,6 +76,23 @@ class AlertManager(component.Component):
|
||||||
delayed_call.cancel()
|
delayed_call.cancel()
|
||||||
self.delayed_calls = []
|
self.delayed_calls = []
|
||||||
|
|
||||||
|
def pause(self):
|
||||||
|
self._event.clear()
|
||||||
|
|
||||||
|
def resume(self):
|
||||||
|
self._event.set()
|
||||||
|
|
||||||
|
def wait_for_alert_in_thread(self):
|
||||||
|
while self._component_state not in ('Stopping', 'Stopped'):
|
||||||
|
if self.session.wait_for_alert(1000) is None:
|
||||||
|
continue
|
||||||
|
if self._event.wait():
|
||||||
|
threads.blockingCallFromThread(reactor, self.maybe_handle_alerts)
|
||||||
|
|
||||||
|
def maybe_handle_alerts(self):
|
||||||
|
if self._component_state == 'Started':
|
||||||
|
self.handle_alerts()
|
||||||
|
|
||||||
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
|
def register_handler(self, alert_type: str, handler: Callable[[Any], None]) -> None:
|
||||||
"""
|
"""
|
||||||
Registers a function that will be called when 'alert_type' is pop'd
|
Registers a function that will be called when 'alert_type' is pop'd
|
||||||
|
|
|
@ -3,17 +3,47 @@
|
||||||
# the additional special exception to link portions of this program with the OpenSSL library.
|
# the additional special exception to link portions of this program with the OpenSSL library.
|
||||||
# See LICENSE for more details.
|
# See LICENSE for more details.
|
||||||
#
|
#
|
||||||
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
import pytest_twisted
|
||||||
|
|
||||||
import deluge.component as component
|
import deluge.component as component
|
||||||
from deluge.conftest import BaseTestCase
|
from deluge.conftest import BaseTestCase
|
||||||
from deluge.core.core import Core
|
from deluge.core.core import Core
|
||||||
|
|
||||||
|
|
||||||
|
class DummyAlert1:
|
||||||
|
def __init__(self):
|
||||||
|
self.message = '1'
|
||||||
|
|
||||||
|
|
||||||
|
class DummyAlert2:
|
||||||
|
def __init__(self):
|
||||||
|
self.message = '2'
|
||||||
|
|
||||||
|
|
||||||
|
class SessionMock:
|
||||||
|
def __init__(self):
|
||||||
|
self.alerts = []
|
||||||
|
|
||||||
|
def set_alerts(self):
|
||||||
|
self.alerts = [DummyAlert1(), DummyAlert2()]
|
||||||
|
|
||||||
|
def wait_for_alert(self, timeout):
|
||||||
|
return self.alerts[0] if len(self.alerts) > 0 else None
|
||||||
|
|
||||||
|
def pop_alerts(self):
|
||||||
|
alerts = self.alerts
|
||||||
|
self.alerts = []
|
||||||
|
return alerts
|
||||||
|
|
||||||
|
|
||||||
class TestAlertManager(BaseTestCase):
|
class TestAlertManager(BaseTestCase):
|
||||||
def set_up(self):
|
def set_up(self):
|
||||||
self.core = Core()
|
self.core = Core()
|
||||||
self.core.config.config['lsd'] = False
|
self.core.config.config['lsd'] = False
|
||||||
self.am = component.get('AlertManager')
|
self.am = component.get('AlertManager')
|
||||||
|
self.am.session = SessionMock()
|
||||||
return component.start(['AlertManager'])
|
return component.start(['AlertManager'])
|
||||||
|
|
||||||
def tear_down(self):
|
def tear_down(self):
|
||||||
|
@ -28,6 +58,24 @@ class TestAlertManager(BaseTestCase):
|
||||||
assert self.am.handlers['dummy1'] == [handler]
|
assert self.am.handlers['dummy1'] == [handler]
|
||||||
assert self.am.handlers['dummy2'] == [handler]
|
assert self.am.handlers['dummy2'] == [handler]
|
||||||
|
|
||||||
|
@pytest_twisted.ensureDeferred
|
||||||
|
async def test_pop_alert(self, mock_callback):
|
||||||
|
mock_callback.reset_mock()
|
||||||
|
self.am.register_handler('DummyAlert1', mock_callback)
|
||||||
|
self.am.session.set_alerts()
|
||||||
|
await mock_callback.deferred
|
||||||
|
mock_callback.assert_called_once_with(SimpleNamespace(message='1'))
|
||||||
|
|
||||||
|
@pytest_twisted.ensureDeferred
|
||||||
|
async def test_pause_not_pop_alert(self, mock_callback):
|
||||||
|
await component.pause(['AlertManager'])
|
||||||
|
self.am.register_handler('DummyAlert1', mock_callback)
|
||||||
|
self.am.session.set_alerts()
|
||||||
|
await mock_callback.deferred
|
||||||
|
mock_callback.assert_not_called()
|
||||||
|
assert not self.am._event.isSet()
|
||||||
|
assert len(self.am.session.alerts) == 2
|
||||||
|
|
||||||
def test_deregister_handler(self):
|
def test_deregister_handler(self):
|
||||||
def handler(alert):
|
def handler(alert):
|
||||||
...
|
...
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue