import json
from djangofloor import tasks as tasks_module
__author__ = "Matthieu Gallet"
[docs]class SignalQueue:
"""Allow to keep called signals in memory instead of actualling sending them to Redis, for testing purposes.
>>> from djangofloor.tasks import scall, SERVER
>>> from djangofloor.wsgi.window_info import WindowInfo
>>> from djangofloor.wsgi.topics import serialize_topic
>>> from djangofloor.decorators import signal
>>> # noinspection PyUnusedLocal
>>> @signal(path='test.signal', queue='demo-queue')
>>> def test_signal(window_info, value=None):
>>> print(value)
>>>
>>> wi = WindowInfo()
>>> with SignalQueue() as fd:
>>> scall(wi, 'test.signal', to=[SERVER, 1], value="test value")
>>> 'demo-queue' in fd.python_signals
True
>>> serialize_topic(wi, 1) in fd.ws_signals
True
>>> fd.ws_signals[serialize_topic(wi, 1)]
('test.signal', {'value': 'test value'})
>>> fd.execute_delayed_signals()
'test value'
Of course, you should use this example from a method of a :class:`django.test.TestCase`.
"""
def __init__(self, immediate_execution: bool = False):
self.immediate = immediate_execution
self.ws_signals = {} # javascript_signals[client] = [signal1, signal2, …]
self.python_signals = {} # javascript_signals[queue] = [signal1, signal2, …]
self._old_async_method = None
self._old_ws_function = None
[docs] def activate(self):
"""replace private function that push signal calls to Celery or Websockets """
task_function = getattr(tasks_module, "_server_signal_call")
encoder = getattr(tasks_module, "_signal_encoder")
if self.immediate:
# noinspection PyUnusedLocal
def apply_async(signal_arguments, queue=None, **kwargs):
json.dumps(
signal_arguments, cls=encoder
) # to check if args are JSON-serializable
return task_function(*signal_arguments)
else:
# noinspection PyUnusedLocal
def apply_async(signal_arguments, queue=None, **kwargs):
json.dumps(
signal_arguments, cls=encoder
) # to check if args are JSON-serializable
self.python_signals.setdefault(queue, []).append(signal_arguments)
self._old_async_method = getattr(task_function, "apply_async")
setattr(task_function, "apply_async", apply_async)
# noinspection PyUnusedLocal
def ws_signal_call(signal_name, signal_id, serialized_topic, kwargs):
json.dumps(kwargs, cls=encoder) # to check if args are JSON-serializable
self.ws_signals.setdefault(serialized_topic, []).append(
(signal_name, kwargs)
)
self._old_ws_function = getattr(tasks_module, "_call_ws_signal")
setattr(tasks_module, "_call_ws_signal", ws_signal_call)
[docs] def deactivate(self):
""" replace the normal private functions for calling Celery or websockets signals"""
task = getattr(tasks_module, "_server_signal_call")
setattr(task, "apply_async", self._old_async_method)
self._old_async_method = None
setattr(tasks_module, "_call_ws_signal", self._old_ws_function)
self._old_ws_function = None
[docs] def execute_delayed_signals(self, queues=None):
""" execute the Celery signals """
task = getattr(tasks_module, "_server_signal_call")
if queues is None:
queues = self.python_signals.keys()
for queue in queues:
while queues[queue]:
signal_arguments = self.python_signals[queue].pop()
task(*signal_arguments)
def __enter__(self):
self.activate()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.deactivate()