@@ -226,7 +226,7 @@ def test_message_drop_invalid(self):
226
226
227
227
class TestClientConnections (unittest .TestCase ):
228
228
service_proc : Process = None
229
- num_clients = 128
229
+ num_clients = 256
230
230
clients = []
231
231
232
232
@classmethod
@@ -316,3 +316,39 @@ def handler(message):
316
316
for client in self .clients :
317
317
client .close ()
318
318
self .clients = []
319
+
320
+ def test_wait_for_response (self ):
321
+ threads = list ()
322
+ handlers = list ()
323
+
324
+ def _handler (message ):
325
+ self .assertIn (message .data ['idx' ], message .msg_type )
326
+ self .clients [0 ].emit (message .response ())
327
+
328
+ def _await_response (client , idx ):
329
+ message_type = f"test.message.{ idx } "
330
+ context = {"test" : secrets .token_hex (512 )}
331
+ resp = client .wait_for_response (Message (message_type , {"idx" : idx },
332
+ context ))
333
+ self .assertIsInstance (resp , Message )
334
+ self .assertEqual (resp .msg_type , f"{ message_type } .response" )
335
+ self .assertEqual (resp .context ['test' ], context ['test' ])
336
+
337
+ for i in range (self .num_clients ):
338
+ client = MessageBusClient ()
339
+ self .clients .append (client )
340
+ client .run_in_thread ()
341
+ self .assertTrue (client .connected_event .wait (5 ))
342
+ message_type = f"test.message.{ i } "
343
+ handler = Mock (side_effect = _handler )
344
+ handlers .append (handler )
345
+ client .on (message_type , handler )
346
+
347
+ for idx , client in enumerate (self .clients ):
348
+ t = Thread (target = _await_response , args = (client , idx ))
349
+ threads .append (t )
350
+ t .start ()
351
+
352
+ for thread in threads :
353
+ thread .join (3 )
354
+ self .assertFalse (thread .is_alive ())
0 commit comments