From bf2e3ee0b7b70c0d185e655d595981776930e563 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Wed, 22 May 2024 10:53:09 -0700 Subject: [PATCH 1/3] Add `TestClientConnections` to test spawning multiple clients and concurrent handling of Messages --- test/requirements.txt | 3 +- test/unittests/test_client.py | 101 +++++++++++++++++++++++++++++++++- 2 files changed, 102 insertions(+), 2 deletions(-) diff --git a/test/requirements.txt b/test/requirements.txt index 2225b64..7fe5533 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,3 +1,4 @@ pytest pytest-cov -mycroft-messagebus-client \ No newline at end of file +mycroft-messagebus-client +ovos-messagebus \ No newline at end of file diff --git a/test/unittests/test_client.py b/test/unittests/test_client.py index 91f82ae..849cd82 100644 --- a/test/unittests/test_client.py +++ b/test/unittests/test_client.py @@ -9,12 +9,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import secrets import unittest +from multiprocessing import Process, Event +from threading import Thread + +from time import sleep, time from unittest.mock import call, Mock, patch from pyee import ExecutorEventEmitter +import ovos_messagebus.__main__ from ovos_bus_client.message import Message from ovos_bus_client.client.client import MessageBusClient, GUIWebsocketClient from ovos_bus_client.client import MessageWaiter, MessageCollector @@ -217,3 +222,97 @@ def test_message_drop_invalid(self): collector._receive_response(valid_response) collector._receive_response(invalid_response) assert collector.collect() == [valid_response] + + +class TestClientConnections(unittest.TestCase): + service_proc: Process = None + num_clients = 128 + clients = [] + + @classmethod + def setUpClass(cls): + from ovos_messagebus.__main__ import main + ovos_messagebus.__main__.reset_sigint_handler = Mock() + ready_event = Event() + + def ready(): + ready_event.set() + + cls.service_proc = Process(target=main, args=(ready,)) + cls.service_proc.start() + if not ready_event.wait(10): + raise TimeoutError("Timed out waiting for bus service to start") + + def tearDown(self): + for client in self.clients: + client.close() + self.clients = [] + + @classmethod + def tearDownClass(cls): + cls.service_proc.terminate() + cls.service_proc.join(timeout=5) + cls.service_proc.kill() + + def test_create_clients(self): + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + + self.assertEqual(len(self.clients), self.num_clients) + self.assertTrue(all((client.connected_event.is_set() + for client in self.clients))) + + for client in self.clients: + client.close() + self.assertFalse(client.connected_event.is_set()) + self.clients = [] + + def test_handle_messages(self): + handled = [] + test_messages = [] + + def handler(message): + self.assertIsInstance(message, Message) + self.assertIsInstance(message.data['test'], str) + self.assertIsInstance(message.context['test'], str) + handled.append(message) + + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + client.on("test.message", handler) + client.on(f"test.message{i}", handler) + test_messages.append(Message(f"test.message{i}", + {"test": secrets.token_hex(1024)}, + {"test": secrets.token_hex(512)})) + + sender = MessageBusClient() + sender.run_in_thread() + self.assertTrue(sender.connected_event.wait(5)) + + # Send one message to many handlers + test_message = Message("test.message", {"test": ""}, {"test": ""}) + sender.emit(test_message) + timeout = time() + 10 + while len(handled) < self.num_clients and time() < timeout: + sleep(1) + self.assertEqual(len(handled), self.num_clients) + + # Send many messages to many handlers + handled = [] + for message in test_messages: + Thread(target=sender.emit, args=(message,)).start() + timeout = time() + 30 + while len(handled) < self.num_clients and time() < timeout: + sleep(1) + self.assertEqual(len(handled), self.num_clients) + + sender.close() + for client in self.clients: + client.close() + self.clients = [] From 61e5dc4ff5fa58f616c2776e58c8b39bfc6d69e5 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Wed, 22 May 2024 11:13:19 -0700 Subject: [PATCH 2/3] Update tests to troubleshoot GHA failure --- test/unittests/test_client.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/unittests/test_client.py b/test/unittests/test_client.py index 849cd82..561617b 100644 --- a/test/unittests/test_client.py +++ b/test/unittests/test_client.py @@ -288,7 +288,7 @@ def handler(message): client.on("test.message", handler) client.on(f"test.message{i}", handler) test_messages.append(Message(f"test.message{i}", - {"test": secrets.token_hex(1024)}, + {"test": secrets.token_hex(512)}, {"test": secrets.token_hex(512)})) sender = MessageBusClient() @@ -307,7 +307,7 @@ def handler(message): handled = [] for message in test_messages: Thread(target=sender.emit, args=(message,)).start() - timeout = time() + 30 + timeout = time() + 60 while len(handled) < self.num_clients and time() < timeout: sleep(1) self.assertEqual(len(handled), self.num_clients) From 5d5efc085ecae59481851bab9f2063478f155526 Mon Sep 17 00:00:00 2001 From: Daniel McKnight Date: Thu, 27 Jun 2024 17:11:55 -0700 Subject: [PATCH 3/3] Add test coverage for `wait_for_response` --- test/unittests/test_client.py | 38 ++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) diff --git a/test/unittests/test_client.py b/test/unittests/test_client.py index 561617b..0bc0aad 100644 --- a/test/unittests/test_client.py +++ b/test/unittests/test_client.py @@ -226,7 +226,7 @@ def test_message_drop_invalid(self): class TestClientConnections(unittest.TestCase): service_proc: Process = None - num_clients = 128 + num_clients = 256 clients = [] @classmethod @@ -316,3 +316,39 @@ def handler(message): for client in self.clients: client.close() self.clients = [] + + def test_wait_for_response(self): + threads = list() + handlers = list() + + def _handler(message): + self.assertIn(message.data['idx'], message.msg_type) + self.clients[0].emit(message.response()) + + def _await_response(client, idx): + message_type = f"test.message.{idx}" + context = {"test": secrets.token_hex(512)} + resp = client.wait_for_response(Message(message_type, {"idx": idx}, + context)) + self.assertIsInstance(resp, Message) + self.assertEqual(resp.msg_type, f"{message_type}.response") + self.assertEqual(resp.context['test'], context['test']) + + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + message_type = f"test.message.{i}" + handler = Mock(side_effect=_handler) + handlers.append(handler) + client.on(message_type, handler) + + for idx, client in enumerate(self.clients): + t = Thread(target=_await_response, args=(client, idx)) + threads.append(t) + t.start() + + for thread in threads: + thread.join(3) + self.assertFalse(thread.is_alive())