|
|
|
|
@ -2,7 +2,6 @@
|
|
|
|
|
Unit test file for rpc/external_message_consumer.py
|
|
|
|
|
"""
|
|
|
|
|
import asyncio
|
|
|
|
|
import functools
|
|
|
|
|
import logging
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from unittest.mock import MagicMock
|
|
|
|
|
@ -302,19 +301,16 @@ async def test_emc_receive_messages_valid(default_conf, caplog, mocker):
|
|
|
|
|
dp = DataProvider(default_conf, None, None, None)
|
|
|
|
|
emc = ExternalMessageConsumer(default_conf, dp)
|
|
|
|
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
def change_running(emc): emc._running = not emc._running
|
|
|
|
|
|
|
|
|
|
class TestChannel:
|
|
|
|
|
async def recv(self, *args, **kwargs):
|
|
|
|
|
emc._running = False
|
|
|
|
|
return {"type": "whitelist", "data": ["BTC/USDT"]}
|
|
|
|
|
|
|
|
|
|
async def ping(self, *args, **kwargs):
|
|
|
|
|
return asyncio.Future()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
change_running(emc)
|
|
|
|
|
loop.call_soon(functools.partial(change_running, emc=emc))
|
|
|
|
|
emc._running = True
|
|
|
|
|
await emc._receive_messages(TestChannel(), test_producer, lock)
|
|
|
|
|
|
|
|
|
|
assert log_has_re(r"Received message of type `whitelist`.+", caplog)
|
|
|
|
|
@ -349,19 +345,16 @@ async def test_emc_receive_messages_invalid(default_conf, caplog, mocker):
|
|
|
|
|
dp = DataProvider(default_conf, None, None, None)
|
|
|
|
|
emc = ExternalMessageConsumer(default_conf, dp)
|
|
|
|
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
def change_running(emc): emc._running = not emc._running
|
|
|
|
|
|
|
|
|
|
class TestChannel:
|
|
|
|
|
async def recv(self, *args, **kwargs):
|
|
|
|
|
emc._running = False
|
|
|
|
|
return {"type": ["BTC/USDT"]}
|
|
|
|
|
|
|
|
|
|
async def ping(self, *args, **kwargs):
|
|
|
|
|
return asyncio.Future()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
change_running(emc)
|
|
|
|
|
loop.call_soon(functools.partial(change_running, emc=emc))
|
|
|
|
|
emc._running = True
|
|
|
|
|
await emc._receive_messages(TestChannel(), test_producer, lock)
|
|
|
|
|
|
|
|
|
|
assert log_has_re(r"Invalid message from.+", caplog)
|
|
|
|
|
@ -396,8 +389,8 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
|
|
|
|
|
dp = DataProvider(default_conf, None, None, None)
|
|
|
|
|
emc = ExternalMessageConsumer(default_conf, dp)
|
|
|
|
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
def change_running(emc): emc._running = not emc._running
|
|
|
|
|
def change_running():
|
|
|
|
|
emc._running = not emc._running
|
|
|
|
|
|
|
|
|
|
class TestChannel:
|
|
|
|
|
async def recv(self, *args, **kwargs):
|
|
|
|
|
@ -407,8 +400,7 @@ async def test_emc_receive_messages_timeout(default_conf, caplog, mocker):
|
|
|
|
|
return asyncio.Future()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
change_running(emc)
|
|
|
|
|
loop.call_soon(functools.partial(change_running, emc=emc))
|
|
|
|
|
change_running()
|
|
|
|
|
|
|
|
|
|
with pytest.raises(asyncio.TimeoutError):
|
|
|
|
|
await emc._receive_messages(TestChannel(), test_producer, lock)
|
|
|
|
|
@ -447,19 +439,16 @@ async def test_emc_receive_messages_handle_error(default_conf, caplog, mocker):
|
|
|
|
|
|
|
|
|
|
emc.handle_producer_message = MagicMock(side_effect=Exception)
|
|
|
|
|
|
|
|
|
|
loop = asyncio.get_event_loop()
|
|
|
|
|
def change_running(emc): emc._running = not emc._running
|
|
|
|
|
|
|
|
|
|
class TestChannel:
|
|
|
|
|
async def recv(self, *args, **kwargs):
|
|
|
|
|
emc._running = False
|
|
|
|
|
return {"type": "whitelist", "data": ["BTC/USDT"]}
|
|
|
|
|
|
|
|
|
|
async def ping(self, *args, **kwargs):
|
|
|
|
|
return asyncio.Future()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
change_running(emc)
|
|
|
|
|
loop.call_soon(functools.partial(change_running, emc=emc))
|
|
|
|
|
emc._running = True
|
|
|
|
|
await emc._receive_messages(TestChannel(), test_producer, lock)
|
|
|
|
|
|
|
|
|
|
assert log_has_re(r"Error handling producer message.+", caplog)
|
|
|
|
|
|