3
3
from time import time
4
4
from typing import Any , Dict , List , Optional
5
5
from grpclib import GRPCError , Status
6
+ from google .protobuf .struct_pb2 import Struct
6
7
7
8
from grpclib .client import Channel
8
9
import viam
19
20
StreamEventsResponse ,
20
21
TriggerEventRequest ,
21
22
)
23
+ from viam .utils import dict_to_struct
22
24
23
25
from .input import Control , ControlFunction , Controller , Event , EventType
24
26
@@ -36,19 +38,29 @@ def __init__(self, name: str, channel: Channel):
36
38
self ._stream_lock = Lock ()
37
39
self ._is_streaming = False
38
40
self ._is_stream_ready = False
41
+ self ._callback_extra : Struct = dict_to_struct ({})
39
42
super ().__init__ (name )
40
43
41
- async def get_controls (self , * , timeout : Optional [float ] = None ) -> List [Control ]:
42
- request = GetControlsRequest (controller = self .name )
44
+ async def get_controls (self , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ) -> List [Control ]:
45
+ if extra is None :
46
+ extra = {}
47
+ request = GetControlsRequest (controller = self .name , extra = dict_to_struct (extra ))
43
48
response : GetControlsResponse = await self .client .GetControls (request , timeout = timeout )
44
49
return [Control (control ) for control in response .controls ]
45
50
46
- async def get_events (self , * , timeout : Optional [float ] = None ) -> Dict [Control , Event ]:
47
- request = GetEventsRequest (controller = self .name )
51
+ async def get_events (self , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ) -> Dict [Control , Event ]:
52
+ if extra is None :
53
+ extra = {}
54
+ request = GetEventsRequest (controller = self .name , extra = dict_to_struct (extra ))
48
55
response : GetEventsResponse = await self .client .GetEvents (request , timeout = timeout )
49
56
return {Control (event .control ): Event .from_proto (event ) for (event ) in response .events }
50
57
51
- def register_control_callback (self , control : Control , triggers : List [EventType ], function : Optional [ControlFunction ]):
58
+ def register_control_callback (
59
+ self , control : Control , triggers : List [EventType ], function : Optional [ControlFunction ], extra : Optional [Dict [str , Any ]] = None
60
+ ):
61
+ if extra is None :
62
+ extra = {}
63
+ self ._callback_extra = dict_to_struct (extra )
52
64
with self ._lock :
53
65
callbacks = self .callbacks .get (control , {})
54
66
for trigger in triggers :
@@ -71,8 +83,10 @@ def handle_task_result(task: asyncio.Task):
71
83
task = asyncio .create_task (self ._stream_events (), name = f"{ viam ._TASK_PREFIX } -input_stream_events" )
72
84
task .add_done_callback (handle_task_result )
73
85
74
- async def trigger_event (self , event : Event , * , timeout : Optional [float ] = None ):
75
- request = TriggerEventRequest (controller = self .name , event = event .proto )
86
+ async def trigger_event (self , event : Event , * , extra : Optional [Dict [str , Any ]] = None , timeout : Optional [float ] = None ):
87
+ if extra is None :
88
+ extra = {}
89
+ request = TriggerEventRequest (controller = self .name , event = event .proto , extra = dict_to_struct (extra ))
76
90
try :
77
91
await self .client .TriggerEvent (request , timeout = timeout )
78
92
except GRPCError as e :
@@ -88,7 +102,7 @@ async def _stream_events(self):
88
102
if not self .callbacks :
89
103
return
90
104
91
- request = StreamEventsRequest (controller = self .name , events = [])
105
+ request = StreamEventsRequest (controller = self .name , events = [], extra = self . _callback_extra )
92
106
with self ._lock :
93
107
for (control , callbacks ) in self .callbacks .items ():
94
108
event = StreamEventsRequest .Events (
0 commit comments