diff --git a/test/requirements.txt b/test/requirements.txt index 2225b64..7fe5533 100644 --- a/test/requirements.txt +++ b/test/requirements.txt @@ -1,3 +1,4 @@ pytest pytest-cov -mycroft-messagebus-client \ No newline at end of file +mycroft-messagebus-client +ovos-messagebus \ No newline at end of file diff --git a/test/unittests/test_client.py b/test/unittests/test_client.py index 91f82ae..0bc0aad 100644 --- a/test/unittests/test_client.py +++ b/test/unittests/test_client.py @@ -9,12 +9,17 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import secrets import unittest +from multiprocessing import Process, Event +from threading import Thread + +from time import sleep, time from unittest.mock import call, Mock, patch from pyee import ExecutorEventEmitter +import ovos_messagebus.__main__ from ovos_bus_client.message import Message from ovos_bus_client.client.client import MessageBusClient, GUIWebsocketClient from ovos_bus_client.client import MessageWaiter, MessageCollector @@ -217,3 +222,133 @@ def test_message_drop_invalid(self): collector._receive_response(valid_response) collector._receive_response(invalid_response) assert collector.collect() == [valid_response] + + +class TestClientConnections(unittest.TestCase): + service_proc: Process = None + num_clients = 256 + clients = [] + + @classmethod + def setUpClass(cls): + from ovos_messagebus.__main__ import main + ovos_messagebus.__main__.reset_sigint_handler = Mock() + ready_event = Event() + + def ready(): + ready_event.set() + + cls.service_proc = Process(target=main, args=(ready,)) + cls.service_proc.start() + if not ready_event.wait(10): + raise TimeoutError("Timed out waiting for bus service to start") + + def tearDown(self): + for client in self.clients: + client.close() + self.clients = [] + + @classmethod + def tearDownClass(cls): + cls.service_proc.terminate() + cls.service_proc.join(timeout=5) + cls.service_proc.kill() + + def test_create_clients(self): + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + + self.assertEqual(len(self.clients), self.num_clients) + self.assertTrue(all((client.connected_event.is_set() + for client in self.clients))) + + for client in self.clients: + client.close() + self.assertFalse(client.connected_event.is_set()) + self.clients = [] + + def test_handle_messages(self): + handled = [] + test_messages = [] + + def handler(message): + self.assertIsInstance(message, Message) + self.assertIsInstance(message.data['test'], str) + self.assertIsInstance(message.context['test'], str) + handled.append(message) + + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + client.on("test.message", handler) + client.on(f"test.message{i}", handler) + test_messages.append(Message(f"test.message{i}", + {"test": secrets.token_hex(512)}, + {"test": secrets.token_hex(512)})) + + sender = MessageBusClient() + sender.run_in_thread() + self.assertTrue(sender.connected_event.wait(5)) + + # Send one message to many handlers + test_message = Message("test.message", {"test": ""}, {"test": ""}) + sender.emit(test_message) + timeout = time() + 10 + while len(handled) < self.num_clients and time() < timeout: + sleep(1) + self.assertEqual(len(handled), self.num_clients) + + # Send many messages to many handlers + handled = [] + for message in test_messages: + Thread(target=sender.emit, args=(message,)).start() + timeout = time() + 60 + while len(handled) < self.num_clients and time() < timeout: + sleep(1) + self.assertEqual(len(handled), self.num_clients) + + sender.close() + for client in self.clients: + client.close() + self.clients = [] + + def test_wait_for_response(self): + threads = list() + handlers = list() + + def _handler(message): + self.assertIn(message.data['idx'], message.msg_type) + self.clients[0].emit(message.response()) + + def _await_response(client, idx): + message_type = f"test.message.{idx}" + context = {"test": secrets.token_hex(512)} + resp = client.wait_for_response(Message(message_type, {"idx": idx}, + context)) + self.assertIsInstance(resp, Message) + self.assertEqual(resp.msg_type, f"{message_type}.response") + self.assertEqual(resp.context['test'], context['test']) + + for i in range(self.num_clients): + client = MessageBusClient() + self.clients.append(client) + client.run_in_thread() + self.assertTrue(client.connected_event.wait(5)) + message_type = f"test.message.{i}" + handler = Mock(side_effect=_handler) + handlers.append(handler) + client.on(message_type, handler) + + for idx, client in enumerate(self.clients): + t = Thread(target=_await_response, args=(client, idx)) + threads.append(t) + t.start() + + for thread in threads: + thread.join(3) + self.assertFalse(thread.is_alive())