Skip to content

Commit 01d808b

Browse files
committed
Add TestClientConnections to test spawning multiple clients and concurrent handling of Messages
1 parent 1dadc43 commit 01d808b

File tree

2 files changed

+102
-2
lines changed

2 files changed

+102
-2
lines changed

test/requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
pytest
22
pytest-cov
3-
mycroft-messagebus-client
3+
mycroft-messagebus-client
4+
ovos-messagebus

test/unittests/test_client.py

Lines changed: 100 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,17 @@
99
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1010
# See the License for the specific language governing permissions and
1111
# limitations under the License.
12-
12+
import secrets
1313
import unittest
14+
from multiprocessing import Process, Event
15+
from threading import Thread
16+
17+
from time import sleep, time
1418
from unittest.mock import call, Mock, patch
1519

1620
from pyee import ExecutorEventEmitter
1721

22+
import ovos_messagebus.__main__
1823
from ovos_bus_client.message import Message
1924
from ovos_bus_client.client.client import MessageBusClient, GUIWebsocketClient
2025
from ovos_bus_client.client import MessageWaiter, MessageCollector
@@ -217,3 +222,97 @@ def test_message_drop_invalid(self):
217222
collector._receive_response(valid_response)
218223
collector._receive_response(invalid_response)
219224
assert collector.collect() == [valid_response]
225+
226+
227+
class TestClientConnections(unittest.TestCase):
228+
service_proc: Process = None
229+
num_clients = 128
230+
clients = []
231+
232+
@classmethod
233+
def setUpClass(cls):
234+
from ovos_messagebus.__main__ import main
235+
ovos_messagebus.__main__.reset_sigint_handler = Mock()
236+
ready_event = Event()
237+
238+
def ready():
239+
ready_event.set()
240+
241+
cls.service_proc = Process(target=main, args=(ready,))
242+
cls.service_proc.start()
243+
if not ready_event.wait(10):
244+
raise TimeoutError("Timed out waiting for bus service to start")
245+
246+
def tearDown(self):
247+
for client in self.clients:
248+
client.close()
249+
self.clients = []
250+
251+
@classmethod
252+
def tearDownClass(cls):
253+
cls.service_proc.terminate()
254+
cls.service_proc.join(timeout=5)
255+
cls.service_proc.kill()
256+
257+
def test_create_clients(self):
258+
for i in range(self.num_clients):
259+
client = MessageBusClient()
260+
self.clients.append(client)
261+
client.run_in_thread()
262+
self.assertTrue(client.connected_event.wait(5))
263+
264+
self.assertEqual(len(self.clients), self.num_clients)
265+
self.assertTrue(all((client.connected_event.is_set()
266+
for client in self.clients)))
267+
268+
for client in self.clients:
269+
client.close()
270+
self.assertFalse(client.connected_event.is_set())
271+
self.clients = []
272+
273+
def test_handle_messages(self):
274+
handled = []
275+
test_messages = []
276+
277+
def handler(message):
278+
self.assertIsInstance(message, Message)
279+
self.assertIsInstance(message.data['test'], str)
280+
self.assertIsInstance(message.context['test'], str)
281+
handled.append(message)
282+
283+
for i in range(self.num_clients):
284+
client = MessageBusClient()
285+
self.clients.append(client)
286+
client.run_in_thread()
287+
self.assertTrue(client.connected_event.wait(5))
288+
client.on("test.message", handler)
289+
client.on(f"test.message{i}", handler)
290+
test_messages.append(Message(f"test.message{i}",
291+
{"test": secrets.token_hex(1024)},
292+
{"test": secrets.token_hex(512)}))
293+
294+
sender = MessageBusClient()
295+
sender.run_in_thread()
296+
self.assertTrue(sender.connected_event.wait(5))
297+
298+
# Send one message to many handlers
299+
test_message = Message("test.message", {"test": ""}, {"test": ""})
300+
sender.emit(test_message)
301+
timeout = time() + 10
302+
while len(handled) < self.num_clients and time() < timeout:
303+
sleep(1)
304+
self.assertEqual(len(handled), self.num_clients)
305+
306+
# Send many messages to many handlers
307+
handled = []
308+
for message in test_messages:
309+
Thread(target=sender.emit, args=(message,)).start()
310+
timeout = time() + 30
311+
while len(handled) < self.num_clients and time() < timeout:
312+
sleep(1)
313+
self.assertEqual(len(handled), self.num_clients)
314+
315+
sender.close()
316+
for client in self.clients:
317+
client.close()
318+
self.clients = []

0 commit comments

Comments
 (0)