Skip to content

Commit ee051ac

Browse files
committed
use current aiomqtt conventions
This appears to address kubealex#3 in as much as it works and the warning I reported there is silenced. Note that this patch contains the PR I submitted in kubealex#4. closes kubealex#3
1 parent f8d48ea commit ee051ac

File tree

1 file changed

+17
-18
lines changed

1 file changed

+17
-18
lines changed

plugins/event_source/mqtt.py

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,29 +56,28 @@ async def main(queue: asyncio.Queue, args: Dict[str, Any]) -> None:
5656
cert_reqs=validate_certs if validate_certs is not None else True,
5757
)
5858

59-
mqtt_consumer = aiomqtt.Client(
59+
async with aiomqtt.Client(
6060
hostname=host,
6161
port=port,
6262
username=username,
6363
password=password,
6464
tls_params=tls_params if ca_certs else None,
65-
)
66-
67-
await mqtt_consumer.connect()
68-
69-
try:
70-
async with mqtt_consumer.messages() as messages:
71-
await mqtt_consumer.subscribe(topic)
72-
async for message in messages:
73-
try:
74-
data = json.loads(message.payload.decode())
75-
await queue.put(data)
76-
except json.decoder.JSONDecodeError:
77-
logger.exception("Decoding exception for incoming message")
78-
finally:
79-
logger.info("Disconneccting from broker")
80-
mqtt_consumer.disconnect()
81-
65+
) as mqtt_consumer:
66+
67+
try:
68+
async with mqtt_consumer.messages() as messages:
69+
await mqtt_consumer.subscribe(topic)
70+
async for message in messages:
71+
try:
72+
try:
73+
data = json.loads(message.payload.decode())
74+
except json.decoder.JSONDecodeError:
75+
data = dict(payload=message.payload.decode())
76+
await queue.put(data)
77+
except json.decoder.JSONDecodeError:
78+
logger.exception("Decoding exception for incoming message")
79+
finally:
80+
logger.info("Disconneccting from broker")
8281

8382
if __name__ == "__main__":
8483
"""MockQueue if running directly."""

0 commit comments

Comments
 (0)