Skip to content
This repository was archived by the owner on Aug 21, 2025. It is now read-only.

Commit 93bf498

Browse files
committed
add: [send] listener on a msg ref
1 parent e5bd333 commit 93bf498

File tree

5 files changed

+48
-18
lines changed

5 files changed

+48
-18
lines changed

poetry.lock

Lines changed: 12 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ python = "^3.8"
99
websockets = "^11.0"
1010
python-dateutil = "^2.8.1"
1111
typing-extensions = "^4.2.0"
12+
uuid = "^1.30"
1213

1314
[tool.poetry.dev-dependencies]
1415
pytest = "^7.2.0"

realtime/channel.py

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import asyncio
44
import json
5+
import uuid
56
from typing import Any, List, Dict, TYPE_CHECKING, NamedTuple
67
from realtime.message import *
78

@@ -14,13 +15,15 @@
1415
class CallbackListener(NamedTuple):
1516
"""A tuple with `event` and `callback` """
1617
event: str
18+
ref: str
1719
callback: Callback
1820

1921

2022
class Channel:
2123
"""
2224
`Channel` is an abstraction for a topic listener for an existing socket connection.
2325
Each Channel has its own topic and a list of event-callbacks that responds to messages.
26+
A client can also send messages to a channel and register callback when expecting replies.
2427
Should only be instantiated through `connection.Socket().set_channel(topic)`
2528
Topic-Channel has a 1-many relationship.
2629
"""
@@ -36,6 +39,8 @@ def __init__(self, socket: Socket, topic: str, params: Dict[str, Any] = {}) -> N
3639
self.topic = topic
3740
self.listeners: List[CallbackListener] = []
3841
self.joined = False
42+
self.join_ref = str(uuid.uuid4())
43+
self.join_msg_ref = str(uuid.uuid4())
3944

4045
def join(self) -> Channel:
4146
"""
@@ -56,7 +61,8 @@ async def _join(self) -> None:
5661
join_req = dict(topic=self.topic, event=ChannelEvents.join,
5762
payload={}, ref=None)
5863
elif self.socket.version == 2:
59-
join_req = [None, None, self.topic, ChannelEvents.join, {}]
64+
#[join_reference, message_reference, topic_name, event_name, payload]
65+
join_req = [self.join_ref, self.join_msg_ref, self.topic, ChannelEvents.join, {}]
6066

6167
try:
6268
await self.socket.ws_connection.send(json.dumps(join_req))
@@ -83,60 +89,64 @@ async def _leave(self) -> None:
8389
join_req = dict(topic=self.topic, event=ChannelEvents.leave,
8490
payload={}, ref=None)
8591
elif self.socket.version == 2:
86-
join_req = [None, None, self.topic, ChannelEvents.leave, {}]
92+
join_req = [self.join_ref, None, self.topic, ChannelEvents.leave, {}]
8793

8894
try:
8995
await self.socket.ws_connection.send(json.dumps(join_req))
9096
except Exception as e:
9197
print(str(e)) # TODO: better error propagation
9298
return
9399

94-
def on(self, event: str, callback: Callback) -> Channel:
100+
def on(self, event: str, ref: str, callback: Callback) -> Channel:
95101
"""
96102
:param event: A specific event will have a specific callback
103+
:param ref: A specific reference that will have a specific callback
97104
:param callback: Callback that takes msg payload as its first argument
98105
:return: Channel
99106
"""
100-
cl = CallbackListener(event=event, callback=callback)
107+
cl = CallbackListener(event=event, ref=ref, callback=callback)
101108
self.listeners.append(cl)
102109
return self
103110

104-
def off(self, event: str) -> None:
111+
def off(self, event: str, ref: str) -> None:
105112
"""
106113
:param event: Stop responding to a certain event
114+
:param event: Stop responding to a certain reference
107115
:return: None
108116
"""
109117
self.listeners = [
110-
callback for callback in self.listeners if callback.event != event]
118+
callback for callback in self.listeners if (callback.event != event and callback.ref != ref)]
111119

112-
def send(self, event_name: str, payload:str) -> None:
120+
def send(self, event_name: str, payload:str, ref: uuid = str(uuid.uuid4())) -> None:
113121
"""
114122
Wrapper for async def _send() to expose a non-async interface
115123
Essentially gets the only event loop and attempt sending a payload
116124
to a topic
117125
:param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module.
118126
:param payload: The payload to be sent to the phoenix server
127+
:param ref: The message reference that the server will use for replying - if none is set, generates the string repr of a uuidv4
119128
:return: None
120129
"""
121130
loop = asyncio.get_event_loop() # TODO: replace with get_running_loop
122-
loop.run_until_complete(self._send(event_name, payload))
131+
loop.run_until_complete(self._send(event_name, payload, ref))
123132
return self
124133

125-
async def _send(self, event_name: str, payload:str) -> None:
134+
async def _send(self, event_name: str, payload: str, ref: str) -> None:
126135
"""
127136
Coroutine that attempts to join Phoenix Realtime server via a certain topic
128137
:param event_name: The event_name: it must match the first argument of a handle_in function on the server channel module.
129138
:param payload: The payload to be sent to the phoenix server
139+
:param ref: The message reference that the server will use for replying
130140
:return: None
131141
"""
132142
if self.socket.version == 1:
133143
msg = dict(topic=self.topic, event=event_name,
134144
payload=payload, ref=None)
135145
elif self.socket.version == 2:
136-
msg = [3, 3, self.topic, event_name, payload]
146+
msg = [None, ref, self.topic, event_name, payload]
137147

138148
try:
139149
await self.socket.ws_connection.send(json.dumps(msg))
140150
except Exception as e:
141151
print(str(e)) # TODO: better error propagation
142-
return
152+
return

realtime/connection.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import asyncio
22
import json
33
import logging
4+
import pdb
45
from collections import defaultdict
56
from functools import wraps
67
from typing import Any, Callable, List, Dict, TypeVar, DefaultDict
@@ -76,15 +77,23 @@ async def _listen(self) -> None:
7677
msg = Message(**json.loads(msg))
7778
elif self.version == 2:
7879
msg_array = json.loads(msg)
79-
msg = Message(chanid=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4])
80+
msg = Message(join_ref=msg_array[0], ref= msg_array[1], topic=msg_array[2], event= msg_array[3], payload= msg_array[4])
8081

8182
if msg.event == ChannelEvents.reply:
82-
continue
83+
for channel in self.channels.get(msg.topic, []):
84+
if msg.ref == channel.join_msg_ref :
85+
logging.info(f"Successfully joined {msg.topic}")
86+
continue
87+
else:
88+
for cl in channel.listeners:
89+
if cl.ref in ["*", msg.ref]:
90+
cl.callback(msg.payload)
8391

8492
for channel in self.channels.get(msg.topic, []):
8593
for cl in channel.listeners:
8694
if cl.event in ["*", msg.event]:
8795
cl.callback(msg.payload)
96+
8897
except websockets.exceptions.ConnectionClosed:
8998
if self.auto_reconnect:
9099
logging.info("Connection with server closed, trying to reconnect...")
@@ -155,10 +164,10 @@ def set_channel(self, topic: str) -> Channel:
155164

156165
def summary(self) -> None:
157166
"""
158-
Prints a list of topics and event the socket is listening to
167+
Prints a list of topics and event, and reference that the socket is listening to
159168
:return: None
160169
"""
161170
for topic, chans in self.channels.items():
162171
for chan in chans:
163172
print(
164-
f"Topic: {topic} | Events: {[e for e, _ in chan.listeners]}]")
173+
f"Topic: {topic} | Events: {[e for e, _, _ in chan.listeners]} | References: {[r for _, r, _ in chan.listeners]}]")

realtime/message.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ class Message:
1111
event: str
1212
payload: Dict[str, Any]
1313
ref: Any
14-
chanid: Any
14+
join_ref: Any
1515
topic: str
1616

1717
def __hash__(self):

0 commit comments

Comments
 (0)