Skip to content

Add test coverage to diagnose disconnection errors #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion test/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
pytest
pytest-cov
mycroft-messagebus-client
mycroft-messagebus-client
ovos-messagebus
137 changes: 136 additions & 1 deletion test/unittests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,17 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import secrets
import unittest
from multiprocessing import Process, Event
from threading import Thread

from time import sleep, time
from unittest.mock import call, Mock, patch

from pyee import ExecutorEventEmitter

import ovos_messagebus.__main__
from ovos_bus_client.message import Message
from ovos_bus_client.client.client import MessageBusClient, GUIWebsocketClient
from ovos_bus_client.client import MessageWaiter, MessageCollector
Expand Down Expand Up @@ -217,3 +222,133 @@ def test_message_drop_invalid(self):
collector._receive_response(valid_response)
collector._receive_response(invalid_response)
assert collector.collect() == [valid_response]


class TestClientConnections(unittest.TestCase):
service_proc: Process = None
num_clients = 256
clients = []

@classmethod
def setUpClass(cls):
from ovos_messagebus.__main__ import main
ovos_messagebus.__main__.reset_sigint_handler = Mock()
ready_event = Event()

def ready():
ready_event.set()

cls.service_proc = Process(target=main, args=(ready,))
cls.service_proc.start()
if not ready_event.wait(10):
raise TimeoutError("Timed out waiting for bus service to start")

def tearDown(self):
for client in self.clients:
client.close()
self.clients = []

@classmethod
def tearDownClass(cls):
cls.service_proc.terminate()
cls.service_proc.join(timeout=5)
cls.service_proc.kill()

def test_create_clients(self):
for i in range(self.num_clients):
client = MessageBusClient()
self.clients.append(client)
client.run_in_thread()
self.assertTrue(client.connected_event.wait(5))

self.assertEqual(len(self.clients), self.num_clients)
self.assertTrue(all((client.connected_event.is_set()
for client in self.clients)))

for client in self.clients:
client.close()
self.assertFalse(client.connected_event.is_set())
self.clients = []

def test_handle_messages(self):
handled = []
test_messages = []

def handler(message):
self.assertIsInstance(message, Message)
self.assertIsInstance(message.data['test'], str)
self.assertIsInstance(message.context['test'], str)
handled.append(message)

for i in range(self.num_clients):
client = MessageBusClient()
self.clients.append(client)
client.run_in_thread()
self.assertTrue(client.connected_event.wait(5))
client.on("test.message", handler)
client.on(f"test.message{i}", handler)
test_messages.append(Message(f"test.message{i}",
{"test": secrets.token_hex(512)},
{"test": secrets.token_hex(512)}))

sender = MessageBusClient()
sender.run_in_thread()
self.assertTrue(sender.connected_event.wait(5))

# Send one message to many handlers
test_message = Message("test.message", {"test": ""}, {"test": ""})
sender.emit(test_message)
timeout = time() + 10
while len(handled) < self.num_clients and time() < timeout:
sleep(1)
self.assertEqual(len(handled), self.num_clients)

# Send many messages to many handlers
handled = []
for message in test_messages:
Thread(target=sender.emit, args=(message,)).start()
timeout = time() + 60
while len(handled) < self.num_clients and time() < timeout:
sleep(1)
self.assertEqual(len(handled), self.num_clients)

sender.close()
for client in self.clients:
client.close()
self.clients = []

def test_wait_for_response(self):
threads = list()
handlers = list()

def _handler(message):
self.assertIn(message.data['idx'], message.msg_type)
self.clients[0].emit(message.response())

def _await_response(client, idx):
message_type = f"test.message.{idx}"
context = {"test": secrets.token_hex(512)}
resp = client.wait_for_response(Message(message_type, {"idx": idx},
context))
self.assertIsInstance(resp, Message)
self.assertEqual(resp.msg_type, f"{message_type}.response")
self.assertEqual(resp.context['test'], context['test'])

for i in range(self.num_clients):
client = MessageBusClient()
self.clients.append(client)
client.run_in_thread()
self.assertTrue(client.connected_event.wait(5))
message_type = f"test.message.{i}"
handler = Mock(side_effect=_handler)
handlers.append(handler)
client.on(message_type, handler)

for idx, client in enumerate(self.clients):
t = Thread(target=_await_response, args=(client, idx))
threads.append(t)
t.start()

for thread in threads:
thread.join(3)
self.assertFalse(thread.is_alive())
Loading