djangofloor.tests

class djangofloor.tests.SignalQueue(immediate_execution: bool = False)

Keeps called signals in memory instead of actually sending them to Redis, for testing purposes.

>>> from djangofloor.tasks import scall, SERVER
>>> from djangofloor.tests import SignalQueue
>>> from djangofloor.wsgi.window_info import WindowInfo
>>> from djangofloor.wsgi.topics import serialize_topic
>>> from djangofloor.decorators import signal
>>> # noinspection PyUnusedLocal
>>> @signal(path='test.signal', queue='demo-queue')
... def test_signal(window_info, value=None):
...     print(value)
...
>>> wi = WindowInfo()
>>> with SignalQueue() as fd:
...     scall(wi, 'test.signal', to=[SERVER, 1], value="test value")
...
>>> 'demo-queue' in fd.python_signals
True
>>> serialize_topic(wi, 1) in fd.ws_signals
True
>>> fd.ws_signals[serialize_topic(wi, 1)]
('test.signal', {'value': 'test value'})
>>> fd.execute_delayed_signals()
test value

In practice, run this example from a method of a django.test.TestCase.

activate()

Replace the private functions that push signal calls to Celery or websockets.

deactivate()

Put back the normal private functions that call Celery or websocket signals.

execute_delayed_signals(queues=None)

Execute the Celery signals that were kept in memory.
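
The three methods above follow a common test pattern: swap a sending function for an in-memory recorder, put the original back afterwards, and replay the recorded calls on demand. The following is a minimal, standard-library-only sketch of that pattern, not djangofloor's implementation. The names `transport`, `_real_send`, `RecordingQueue` and `handlers` are hypothetical, and treating `queues=None` as "every recorded queue" is an assumption.

```python
import types

# Hypothetical stand-in for a module exposing a private "send" function.
transport = types.SimpleNamespace()


def _real_send(queue, name, kwargs):
    # In a real system this would contact a broker; tests must not reach it.
    raise RuntimeError("would contact a broker")


transport.send = _real_send


class RecordingQueue:
    """Record calls in memory while active and replay them on demand."""

    def __init__(self, handlers):
        self.handlers = handlers      # signal name -> callable
        self.python_signals = {}      # queue name -> list of (name, kwargs)
        self._saved = None

    def activate(self):
        # Swap the sending function for the in-memory recorder.
        self._saved = transport.send
        transport.send = self._record

    def deactivate(self):
        # Put the original sending function back.
        transport.send = self._saved
        self._saved = None

    def _record(self, queue, name, kwargs):
        self.python_signals.setdefault(queue, []).append((name, kwargs))

    def execute_delayed_signals(self, queues=None):
        # Assumption: queues=None means "replay every recorded queue".
        if queues is None:
            queues = list(self.python_signals)
        results = []
        for queue in queues:
            for name, kwargs in self.python_signals.pop(queue, []):
                results.append(self.handlers[name](**kwargs))
        return results

    def __enter__(self):
        self.activate()
        return self

    def __exit__(self, exc_type, exc, tb):
        self.deactivate()
        return False


with RecordingQueue({"test.signal": lambda value=None: value}) as fd:
    transport.send("demo-queue", "test.signal", {"value": "test value"})

recorded = "demo-queue" in fd.python_signals
results = fd.execute_delayed_signals()
restored = transport.send is _real_send
```

Restoring the original function in `__exit__` means it is put back even if the body of the `with` block raises, so one failing test does not leave the recorder installed for the next one.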