Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 18 additions & 12 deletions src/ipping/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def __init__(
self.on_error_received = on_error_received

self.expected_packets: WeakValueDictionary[
Tuple[int, int], 'asyncio.Future[Tuple[int, Addr]]'
Tuple[int, int], 'asyncio.Future[Tuple[int, int, Addr]]'
] = WeakValueDictionary()

self.transport: asyncio.DatagramTransport = None # type: ignore
Expand All @@ -33,7 +33,7 @@ def ping_request(
client_id: int,
packet_id: int,
payload_size: int,
response_future: 'asyncio.Future[Tuple[int, Addr]]',
response_future: 'asyncio.Future[Tuple[int, int, Addr]]',
) -> None:
self.expected_packets[(client_id, packet_id)] = response_future
frame = pack_frame(client_id, packet_id, payload_size)
Expand All @@ -48,7 +48,7 @@ def connection_lost(self, exc: Optional[Exception]) -> None:

def datagram_received(self, data: bytes, addr: Addr) -> None:
client_id, packet_id, payload_size = unpack_frame(data)
self.expected_packets[(client_id, packet_id)].set_result((payload_size, addr))
self.expected_packets[(client_id, packet_id)].set_result((packet_id, payload_size, addr))

def error_received(self, exc: Exception) -> None:
print(f'Request error: {exc}')
Expand Down Expand Up @@ -99,7 +99,7 @@ async def connect(self) -> None:
async def stop(self) -> None:
self.transport.close()

async def ping_request(self) -> Tuple[int, int, Addr]:
async def ping_request(self) -> Tuple[int, int, int, Addr]:
loop = self.loop or asyncio.get_event_loop()

packet_id = self.packet_counter
Expand Down Expand Up @@ -142,15 +142,21 @@ async def start_udp_client(
received = 0
try:
while run_infite or transmitted < count:
async def proc() -> None:
nonlocal received
try:
round_trip, packet_id, ret_payload_size, addr = await asyncio.wait_for(
udp_client.ping_request(), timeout)
except asyncio.TimeoutError:
print('Request timeout')
else:
received += 1
round_trips.append(round_trip)
print(f'{ret_payload_size + HEADER_SIZE} bytes from {addr[0]}:{addr[1]}: '
f'seq={packet_id} time={round_trip:.3f} ms')

transmitted += 1
try:
round_trip, ret_payload_size, addr = await asyncio.wait_for(udp_client.ping_request(), timeout)
except asyncio.TimeoutError:
print('Request timeout')
else:
received += 1
round_trips.append(round_trip)
print(f'{ret_payload_size + HEADER_SIZE} bytes from {addr[0]}:{addr[1]}: time={round_trip:.3f} ms')
asyncio.ensure_future(proc())
await asyncio.sleep(wait)
except asyncio.CancelledError:
await udp_client.stop()
Expand Down