From f9c64fd995605dbf42bc64502df538c28a939b72 Mon Sep 17 00:00:00 2001 From: vickash Date: Sat, 22 Jul 2023 13:32:58 -0400 Subject: [PATCH 1/3] Make message receiving non-blocking. Happens inside event handler --- README.md | 17 ++++++++-- mrblib/mrb_esp32_mqtt.rb | 22 +++++++++++++ src/mrb_esp32_mqtt.c | 70 ++++++++++++++++++++-------------------- 3 files changed, 71 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index b20c5fe..0a40e34 100644 --- a/README.md +++ b/README.md @@ -52,11 +52,22 @@ Publish message to topic. mqtt.publish("topic", 'message') ``` -Subscribe to topic and get message. +Subscribe to one or more topics to receive messages. Received messages are event driven. For each message, the main mruby task pauses temporarily, allowing the block given for the receiving topic to run. ```ruby -mqtt.subscribe("topic") -topic, message = mqtt.get +mqtt.subscribe("topic1") do |message| + puts "Received from topic1: #{message}" +end + +mqtt.on_message_from("topic2") do |message| + puts "New message from topic2: #{message}" +end +mqtt.subscribe("topic2") + +loop do + # Do whatever in your main loop. + ESP32::System.delay(1000) +end ``` Disconnect. diff --git a/mrblib/mrb_esp32_mqtt.rb b/mrblib/mrb_esp32_mqtt.rb index 7b9c122..b52d057 100644 --- a/mrblib/mrb_esp32_mqtt.rb +++ b/mrblib/mrb_esp32_mqtt.rb @@ -2,6 +2,28 @@ module ESP32 module MQTT class Client attr_accessor :ca, :cert, :key + + def initialize(host, port) + @callbacks = {} + + # C calls the block given here with every received message. + self._initialize(host, port) do |topic, message| + @callbacks[topic].call(message) if @callbacks[topic] + end + end + + def subscribe(topic, &block) + @callbacks[topic] = block if block + self._subscribe(topic) + end + + def on_message_from(topic, &block) + @callbacks[topic] = block if block + end + + def update(topic, message) + @callbacks[topic].call(message) if @callbacks[topic] + end end end end diff --git a/src/mrb_esp32_mqtt.c b/src/mrb_esp32_mqtt.c index 93a6cf4..b5cfc7c 100644 --- a/src/mrb_esp32_mqtt.c +++ b/src/mrb_esp32_mqtt.c @@ -31,8 +31,32 @@ typedef struct mqtt_client_t { mrb_bool ssl; esp_mqtt_client_handle_t client; QueueHandle_t queue; + TaskHandle_t main_task_handle; + mrb_value message_proc; } mqtt_client_t; +static void +mqtt_message_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Suspend main task. + vTaskSuspend(client->main_task_handle); + int arena_index = mrb_gc_arena_save(client->mrb); + + // Check message_proc is a a proc? + mrb_assert(mrb_type(client->message_proc) == MRB_TT_PROC); + + // Prep arguments to pass. + mrb_value args[2]; + args[0] = mrb_str_new_static(client->mrb, event->topic, event->topic_len); + args[1] = mrb_str_new_static(client->mrb, event->data, event->data_len); + + // Call message_proc. + mrb_yield_argv(client->mrb, client->message_proc, 2, &args[0]); + + // Resume main task. + mrb_gc_arena_restore(client->mrb, arena_index); + vTaskResume(client->main_task_handle); +} + static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); @@ -61,7 +85,7 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_message_handler(client, event); break; case MQTT_EVENT_ERROR: ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); @@ -88,17 +112,6 @@ mqtt_wait_for_event(mrb_state *mrb, mrb_value self, int32_t event_id) { mrb_raise(mrb, error_class, "Timeout wait for mqtt event."); } -static void -mqtt_wait_for_data(mrb_state *mrb, mrb_value self, esp_mqtt_event_t *event) { - mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); - - while(true) { - if(xQueueReceive(client->queue, (void*)event, (TickType_t)(1000 / portTICK_PERIOD_MS))) { - if(event->event_id == MQTT_EVENT_DATA) break; - } - } -} - static void mrb_mqtt_client_free(mrb_state *mrb, void *p) { mqtt_client_t *client = (mqtt_client_t *)p; @@ -110,13 +123,14 @@ mrb_mqtt_client_free(mrb_state *mrb, void *p) { } static mrb_value -mrb_mqtt_client_init(mrb_state *mrb, mrb_value self) { +mrb_mqtt_client_initialize(mrb_state *mrb, mrb_value self) { mqtt_client_t *client = mrb_malloc(mrb, sizeof(mqtt_client_t)); mrb_value host; mrb_int port; + mrb_value block; - mrb_get_args(mrb, "Si", &host, &port); + mrb_get_args(mrb, "Si&", &host, &port, &block); client->mrb = mrb; client->host = mrb_malloc(mrb, strlen(mrb_str_to_cstr(mrb, host))); @@ -125,6 +139,11 @@ mrb_mqtt_client_init(mrb_state *mrb, mrb_value self) { client->ssl = FALSE; client->queue = xQueueCreate(WAIT_EVENT_QUEUE_LEN, sizeof(esp_mqtt_event_t)); + // Save block given and main task for handling incoming messages. + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@message_proc"), block); + client->message_proc = block; + client->main_task_handle = xTaskGetCurrentTaskHandle(); + mrb_data_init(self, client, &mrb_mqtt_client); ESP_LOGI(TAG, "initialize(%s, %d)", client->host, client->port); @@ -270,24 +289,6 @@ mrb_mqtt_client_unsubscribe(mrb_state *mrb, mrb_value self) { return self; } -static mrb_value -mrb_mqtt_client_get(mrb_state *mrb, mrb_value self) { - esp_mqtt_event_t event; - - mqtt_wait_for_data(mrb, self, &event); - - mrb_value topic = mrb_str_new_static(mrb, event.topic, event.topic_len); - mrb_value message = mrb_str_new_static(mrb, event.data, event.data_len); - - mrb_value ary = mrb_ary_new(mrb); - mrb_ary_push(mrb, ary, topic); - mrb_ary_push(mrb, ary, message); - - ESP_LOGI(TAG, "get()"); - - return ary; -} - static mrb_value mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) { mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); @@ -312,13 +313,12 @@ mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) { struct RClass *mqtt_module = mrb_define_module_under(mrb, esp32_module, "MQTT"); struct RClass *client_class = mrb_define_class_under(mrb, mqtt_module, "Client", mrb->object_class); - mrb_define_method(mrb, client_class, "initialize", mrb_mqtt_client_init, MRB_ARGS_REQ(2)); + mrb_define_method(mrb, client_class, "_initialize", mrb_mqtt_client_initialize, MRB_ARGS_REQ(2)|MRB_ARGS_BLOCK()); mrb_define_method(mrb, client_class, "ssl=", mrb_mqtt_client_set_ssl, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "connect", mrb_mqtt_client_connect, MRB_ARGS_NONE()); mrb_define_method(mrb, client_class, "publish", mrb_mqtt_client_publish, MRB_ARGS_REQ(2)); - mrb_define_method(mrb, client_class, "subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1)); + mrb_define_method(mrb, client_class, "_subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "unsubscribe", mrb_mqtt_client_unsubscribe, MRB_ARGS_REQ(1)); - mrb_define_method(mrb, client_class, "get", mrb_mqtt_client_get, MRB_ARGS_NONE()); mrb_define_method(mrb, client_class, "disconnect", mrb_mqtt_client_disconnect, MRB_ARGS_NONE()); mrb_define_class_under(mrb, mqtt_module, "TimeoutError", mrb->eStandardError_class); From 1d6db52c261bfec4de5e3017a5d5ec6ee6aff4b0 Mon Sep 17 00:00:00 2001 From: vickash Date: Sat, 22 Jul 2023 19:58:12 -0400 Subject: [PATCH 2/3] Remove unused #update method --- mrblib/mrb_esp32_mqtt.rb | 4 ---- 1 file changed, 4 deletions(-) diff --git a/mrblib/mrb_esp32_mqtt.rb b/mrblib/mrb_esp32_mqtt.rb index b52d057..a10824b 100644 --- a/mrblib/mrb_esp32_mqtt.rb +++ b/mrblib/mrb_esp32_mqtt.rb @@ -20,10 +20,6 @@ def subscribe(topic, &block) def on_message_from(topic, &block) @callbacks[topic] = block if block end - - def update(topic, message) - @callbacks[topic].call(message) if @callbacks[topic] - end end end end From 6ecb48d270f86ada53a90754aad0a59368f75ed5 Mon Sep 17 00:00:00 2001 From: vickash Date: Mon, 24 Jul 2023 12:47:34 -0400 Subject: [PATCH 3/3] Make the whole thing event driven --- README.md | 3 + mrblib/mrb_esp32_mqtt.rb | 49 +++++++++-- src/mrb_esp32_mqtt.c | 170 ++++++++++++++++++++++++++------------- 3 files changed, 158 insertions(+), 64 deletions(-) diff --git a/README.md b/README.md index 0a40e34..fa50374 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,9 @@ mqtt.connect Publish message to topic. ```ruby +while !mqtt.connected do + ESP32::System.delay(100) +end mqtt.publish("topic", 'message') ``` diff --git a/mrblib/mrb_esp32_mqtt.rb b/mrblib/mrb_esp32_mqtt.rb index a10824b..33827ea 100644 --- a/mrblib/mrb_esp32_mqtt.rb +++ b/mrblib/mrb_esp32_mqtt.rb @@ -1,24 +1,57 @@ module ESP32 module MQTT class Client - attr_accessor :ca, :cert, :key + attr_accessor :ca, :cert, :key, :connected def initialize(host, port) - @callbacks = {} + self._initialize(host, port) - # C calls the block given here with every received message. - self._initialize(host, port) do |topic, message| - @callbacks[topic].call(message) if @callbacks[topic] + @connected = false + @connect_callbacks = [] + @message_callbacks = {} + + self.set_connected_handler do + @connected = true + @connect_callbacks.each { |cb| cb.call } + @connect_callbacks = [] + end + + self.set_disconnected_handler do + @connected = false + end + + self.set_unsubscribed_handler do |topic| + @message_callbacks[topic] = nil + end + + # C calls this block with every received message. + self.set_data_handler do |topic, message| + @message_callbacks[topic].call(message) if @message_callbacks[topic] end end def subscribe(topic, &block) - @callbacks[topic] = block if block - self._subscribe(topic) + @message_callbacks[topic] = block if block + + # Take semaphore + + if @connected + self._subscribe(topic) + else + self.on_connect do + self._subscribe(topic) + end + end + + # Release semaphore end + def on_connect(&block) + @connect_callbacks << block + end + def on_message_from(topic, &block) - @callbacks[topic] = block if block + @message_callbacks[topic] = block end end end diff --git a/src/mrb_esp32_mqtt.c b/src/mrb_esp32_mqtt.c index b5cfc7c..2563cd2 100644 --- a/src/mrb_esp32_mqtt.c +++ b/src/mrb_esp32_mqtt.c @@ -13,10 +13,7 @@ #include "esp_log.h" #include "mqtt_client.h" - #define TAG ("mruby-esp32-mqtt") -#define WAIT_EVENT_TIMEOUT_SEC (20) -#define WAIT_EVENT_QUEUE_LEN (10) static void mrb_mqtt_client_free(mrb_state *mrb, void *p); @@ -30,31 +27,62 @@ typedef struct mqtt_client_t { mrb_int port; mrb_bool ssl; esp_mqtt_client_handle_t client; - QueueHandle_t queue; - TaskHandle_t main_task_handle; - mrb_value message_proc; + + TaskHandle_t mruby_task_handle; + mrb_value connected_proc; + mrb_value disconnected_proc; + mrb_value unsubscribed_proc; + mrb_value data_proc; } mqtt_client_t; + +static void +mqtt_connected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Call @connected_proc. + mrb_assert(mrb_type(client->connected_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->connected_proc, 0, NULL); + + // Release semaphore. +} + +static void +mqtt_disconnected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Call @disconnected_proc. + mrb_assert(mrb_type(client->disconnected_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->disconnected_proc, 0, NULL); + + // Release semaphore. +} + static void -mqtt_message_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { - // Suspend main task. - vTaskSuspend(client->main_task_handle); - int arena_index = mrb_gc_arena_save(client->mrb); +mqtt_unsubscribed_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. - // Check message_proc is a a proc? - mrb_assert(mrb_type(client->message_proc) == MRB_TT_PROC); + // Call @unsubscribed_proc. + mrb_assert(mrb_type(client->unsubscribed_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->unsubscribed_proc, 0, NULL); + + // Release semaphore. +} + +static void +mqtt_data_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. // Prep arguments to pass. mrb_value args[2]; args[0] = mrb_str_new_static(client->mrb, event->topic, event->topic_len); args[1] = mrb_str_new_static(client->mrb, event->data, event->data_len); - // Call message_proc. - mrb_yield_argv(client->mrb, client->message_proc, 2, &args[0]); + // Call @data_proc + mrb_assert(mrb_type(client->data_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->data_proc, 2, &args[0]); - // Resume main task. - mrb_gc_arena_restore(client->mrb, arena_index); - vTaskResume(client->main_task_handle); + // Release semaphore. } static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_id, void *event_data) @@ -64,31 +92,33 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i esp_mqtt_event_handle_t event = event_data; switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + break; case MQTT_EVENT_CONNECTED: - ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED"); - xQueueSend(client->queue, event_data, (TickType_t)0); + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + mqtt_connected_handler(client, event); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_disconnected_handler(client, event); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - xQueueSend(client->queue, event_data, (TickType_t)0); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_unsubscribed_handler(client, event); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - mqtt_message_handler(client, event); + mqtt_data_handler(client, event); break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + case MQTT_EVENT_BEFORE_CONNECT: + ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); @@ -96,22 +126,6 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i } } -static void -mqtt_wait_for_event(mrb_state *mrb, mrb_value self, int32_t event_id) { - mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); - esp_mqtt_event_t event; - int wait_count; - struct RClass* error_class; - - for(wait_count = 0 ; wait_count < WAIT_EVENT_TIMEOUT_SEC ; wait_count++) { - if(xQueueReceive(client->queue, (void*)&event, (TickType_t)(1000 / portTICK_PERIOD_MS))) { - if(event.event_id == event_id) return; - } - } - error_class = mrb_exc_get_id(mrb, MRB_ERROR_SYM(ESP32::MQTT::TimeoutError)); - mrb_raise(mrb, error_class, "Timeout wait for mqtt event."); -} - static void mrb_mqtt_client_free(mrb_state *mrb, void *p) { mqtt_client_t *client = (mqtt_client_t *)p; @@ -128,21 +142,14 @@ mrb_mqtt_client_initialize(mrb_state *mrb, mrb_value self) { mrb_value host; mrb_int port; - mrb_value block; - - mrb_get_args(mrb, "Si&", &host, &port, &block); + mrb_get_args(mrb, "Si", &host, &port); + client->mruby_task_handle = xTaskGetCurrentTaskHandle(); client->mrb = mrb; client->host = mrb_malloc(mrb, strlen(mrb_str_to_cstr(mrb, host))); strcpy(client->host, mrb_str_to_cstr(mrb, host)); client->port = port; client->ssl = FALSE; - client->queue = xQueueCreate(WAIT_EVENT_QUEUE_LEN, sizeof(esp_mqtt_event_t)); - - // Save block given and main task for handling incoming messages. - mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@message_proc"), block); - client->message_proc = block; - client->main_task_handle = xTaskGetCurrentTaskHandle(); mrb_data_init(self, client, &mrb_mqtt_client); ESP_LOGI(TAG, "initialize(%s, %d)", client->host, client->port); @@ -199,7 +206,6 @@ mrb_mqtt_client_connect(mrb_state *mrb, mrb_value self) { client->client = mqtt_client; - mqtt_wait_for_event(mrb, self, MQTT_EVENT_CONNECTED); ESP_LOGI( TAG, "connect(%s://%s:%d)", @@ -245,7 +251,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) { struct RClass* error_class; mrb_value topic; - mrb_get_args(mrb, "S", &topic); ret = esp_mqtt_client_subscribe( @@ -258,7 +263,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to subscribe."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_SUBSCRIBED); ESP_LOGI(TAG, "subscribe(%s)", mrb_str_to_cstr(mrb, topic)); return self; @@ -283,7 +287,6 @@ mrb_mqtt_client_unsubscribe(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to unsubscribe."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_UNSUBSCRIBED); ESP_LOGI(TAG, "unsubscribe(%s)", mrb_str_to_cstr(mrb, topic)); return self; @@ -301,12 +304,63 @@ mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to disconnect."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_DISCONNECTED); ESP_LOGI(TAG, "disconnect"); return self; } +static mrb_value +mrb_mqtt_client_set_connected_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@connected_proc"), block); + client->connected_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_disconnected_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@disconnected_proc"), block); + client->disconnected_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_unsubscribed_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@unsubscribed_proc"), block); + client->unsubscribed_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_data_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@data_proc"), block); + client->data_proc = block; + + return self; +} + void mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) { struct RClass *esp32_module = mrb_define_module(mrb, "ESP32"); @@ -320,8 +374,12 @@ mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) { mrb_define_method(mrb, client_class, "_subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "unsubscribe", mrb_mqtt_client_unsubscribe, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "disconnect", mrb_mqtt_client_disconnect, MRB_ARGS_NONE()); + + mrb_define_method(mrb, client_class, "set_connected_handler", mrb_mqtt_client_set_connected_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_disconnected_handler", mrb_mqtt_client_set_disconnected_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_unsubscribed_handler", mrb_mqtt_client_set_unsubscribed_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_data_handler", mrb_mqtt_client_set_data_handler, MRB_ARGS_BLOCK()); - mrb_define_class_under(mrb, mqtt_module, "TimeoutError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "ConnectError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "PublishError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "SubscribeError", mrb->eStandardError_class);