|
|
|
|
@ -127,26 +127,29 @@ class ExternalMessageConsumer:
|
|
|
|
|
|
|
|
|
|
self._channel_streams = {}
|
|
|
|
|
|
|
|
|
|
asyncio.run_coroutine_threadsafe(self._shutdown_async(), loop=self._loop)
|
|
|
|
|
|
|
|
|
|
self._thread.join(timeout=5)
|
|
|
|
|
|
|
|
|
|
self._thread = None
|
|
|
|
|
self._loop = None
|
|
|
|
|
self._sub_tasks = None
|
|
|
|
|
self._main_task = None
|
|
|
|
|
|
|
|
|
|
async def _shutdown_async(self):
|
|
|
|
|
"""Cancel all tasks, let them finish, then stop the loop."""
|
|
|
|
|
if self._sub_tasks:
|
|
|
|
|
# Cancel sub tasks
|
|
|
|
|
for task in self._sub_tasks:
|
|
|
|
|
task.cancel()
|
|
|
|
|
await asyncio.gather(*self._sub_tasks, return_exceptions=True)
|
|
|
|
|
|
|
|
|
|
if self._main_task:
|
|
|
|
|
# Cancel the main task
|
|
|
|
|
self._main_task.cancel()
|
|
|
|
|
|
|
|
|
|
self._thread.join()
|
|
|
|
|
|
|
|
|
|
self._thread = None
|
|
|
|
|
self._loop = None
|
|
|
|
|
self._sub_tasks = None
|
|
|
|
|
self._main_task = None
|
|
|
|
|
self._loop.stop()
|
|
|
|
|
|
|
|
|
|
async def _main(self):
|
|
|
|
|
"""
|
|
|
|
|
The main task coroutine
|
|
|
|
|
"""
|
|
|
|
|
"""The main task coroutine"""
|
|
|
|
|
lock = asyncio.Lock()
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
@ -161,6 +164,7 @@ class ExternalMessageConsumer:
|
|
|
|
|
pass
|
|
|
|
|
finally:
|
|
|
|
|
# Stop the loop once we are done
|
|
|
|
|
if self._loop:
|
|
|
|
|
self._loop.stop()
|
|
|
|
|
|
|
|
|
|
async def _handle_producer_connection(self, producer: Producer, lock: asyncio.Lock):
|
|
|
|
|
|