diff --git a/README.md b/README.md index b20c5fe..fa50374 100644 --- a/README.md +++ b/README.md @@ -49,14 +49,28 @@ mqtt.connect Publish message to topic. ```ruby +while !mqtt.connected do + ESP32::System.delay(100) +end mqtt.publish("topic", 'message') ``` -Subscribe to topic and get message. +Subscribe to one or more topics to receive messages. Received messages are event driven. For each message, the main mruby task pauses temporarily, allowing the block given for the receiving topic to run. ```ruby -mqtt.subscribe("topic") -topic, message = mqtt.get +mqtt.subscribe("topic1") do |message| + puts "Received from topic1: #{message}" +end + +mqtt.on_message_from("topic2") do |message| + puts "New message from topic2: #{message}" +end +mqtt.subscribe("topic2") + +loop do + # Do whatever in your main loop. + ESP32::System.delay(1000) +end ``` Disconnect. diff --git a/mrblib/mrb_esp32_mqtt.rb b/mrblib/mrb_esp32_mqtt.rb index 7b9c122..33827ea 100644 --- a/mrblib/mrb_esp32_mqtt.rb +++ b/mrblib/mrb_esp32_mqtt.rb @@ -1,7 +1,58 @@ module ESP32 module MQTT class Client - attr_accessor :ca, :cert, :key + attr_accessor :ca, :cert, :key, :connected + + def initialize(host, port) + self._initialize(host, port) + + @connected = false + @connect_callbacks = [] + @message_callbacks = {} + + self.set_connected_handler do + @connected = true + @connect_callbacks.each { |cb| cb.call } + @connect_callbacks = [] + end + + self.set_disconnected_handler do + @connected = false + end + + self.set_unsubscribed_handler do |topic| + @message_callbacks[topic] = nil + end + + # C calls this block with every received message. + self.set_data_handler do |topic, message| + @message_callbacks[topic].call(message) if @message_callbacks[topic] + end + end + + def subscribe(topic, &block) + @message_callbacks[topic] = block if block + + # Take semaphore + + if @connected + self._subscribe(topic) + else + self.on_connect do + self._subscribe(topic) + end + end + + # Release semaphore + end + + def on_connect(&block) + @connect_callbacks << block + end + + def on_message_from(topic, &block) + @message_callbacks[topic] = block + end end end end diff --git a/src/mrb_esp32_mqtt.c b/src/mrb_esp32_mqtt.c index 93a6cf4..2563cd2 100644 --- a/src/mrb_esp32_mqtt.c +++ b/src/mrb_esp32_mqtt.c @@ -13,10 +13,7 @@ #include "esp_log.h" #include "mqtt_client.h" - #define TAG ("mruby-esp32-mqtt") -#define WAIT_EVENT_TIMEOUT_SEC (20) -#define WAIT_EVENT_QUEUE_LEN (10) static void mrb_mqtt_client_free(mrb_state *mrb, void *p); @@ -30,9 +27,64 @@ typedef struct mqtt_client_t { mrb_int port; mrb_bool ssl; esp_mqtt_client_handle_t client; - QueueHandle_t queue; + + TaskHandle_t mruby_task_handle; + mrb_value connected_proc; + mrb_value disconnected_proc; + mrb_value unsubscribed_proc; + mrb_value data_proc; } mqtt_client_t; + +static void +mqtt_connected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Call @connected_proc. + mrb_assert(mrb_type(client->connected_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->connected_proc, 0, NULL); + + // Release semaphore. +} + +static void +mqtt_disconnected_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Call @disconnected_proc. + mrb_assert(mrb_type(client->disconnected_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->disconnected_proc, 0, NULL); + + // Release semaphore. +} + +static void +mqtt_unsubscribed_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Call @unsubscribed_proc. + mrb_assert(mrb_type(client->unsubscribed_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->unsubscribed_proc, 0, NULL); + + // Release semaphore. +} + +static void +mqtt_data_handler(mqtt_client_t *client, esp_mqtt_event_handle_t event) { + // Get semaphore. + + // Prep arguments to pass. + mrb_value args[2]; + args[0] = mrb_str_new_static(client->mrb, event->topic, event->topic_len); + args[1] = mrb_str_new_static(client->mrb, event->data, event->data_len); + + // Call @data_proc + mrb_assert(mrb_type(client->data_proc) == MRB_TT_PROC); + mrb_yield_argv(client->mrb, client->data_proc, 2, &args[0]); + + // Release semaphore. +} + static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_id, void *event_data) { ESP_LOGD(TAG, "Event dispatched from event loop base=%s, event_id=%d", base, event_id); @@ -40,31 +92,33 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i esp_mqtt_event_handle_t event = event_data; switch ((esp_mqtt_event_id_t)event_id) { + case MQTT_EVENT_ERROR: + ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + break; case MQTT_EVENT_CONNECTED: - ESP_LOGD(TAG, "MQTT_EVENT_CONNECTED"); - xQueueSend(client->queue, event_data, (TickType_t)0); + ESP_LOGI(TAG, "MQTT_EVENT_CONNECTED"); + mqtt_connected_handler(client, event); break; case MQTT_EVENT_DISCONNECTED: ESP_LOGI(TAG, "MQTT_EVENT_DISCONNECTED"); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_disconnected_handler(client, event); break; case MQTT_EVENT_SUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id); - xQueueSend(client->queue, event_data, (TickType_t)0); break; case MQTT_EVENT_UNSUBSCRIBED: ESP_LOGI(TAG, "MQTT_EVENT_UNSUBSCRIBED, msg_id=%d", event->msg_id); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_unsubscribed_handler(client, event); break; case MQTT_EVENT_PUBLISHED: ESP_LOGI(TAG, "MQTT_EVENT_PUBLISHED, msg_id=%d", event->msg_id); break; case MQTT_EVENT_DATA: ESP_LOGI(TAG, "MQTT_EVENT_DATA"); - xQueueSend(client->queue, event_data, (TickType_t)0); + mqtt_data_handler(client, event); break; - case MQTT_EVENT_ERROR: - ESP_LOGI(TAG, "MQTT_EVENT_ERROR"); + case MQTT_EVENT_BEFORE_CONNECT: + ESP_LOGI(TAG, "MQTT_EVENT_BEFORE_CONNECT"); break; default: ESP_LOGI(TAG, "Other event id:%d", event->event_id); @@ -72,33 +126,6 @@ static void mqtt_event_handler(void *arg, esp_event_base_t base, int32_t event_i } } -static void -mqtt_wait_for_event(mrb_state *mrb, mrb_value self, int32_t event_id) { - mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); - esp_mqtt_event_t event; - int wait_count; - struct RClass* error_class; - - for(wait_count = 0 ; wait_count < WAIT_EVENT_TIMEOUT_SEC ; wait_count++) { - if(xQueueReceive(client->queue, (void*)&event, (TickType_t)(1000 / portTICK_PERIOD_MS))) { - if(event.event_id == event_id) return; - } - } - error_class = mrb_exc_get_id(mrb, MRB_ERROR_SYM(ESP32::MQTT::TimeoutError)); - mrb_raise(mrb, error_class, "Timeout wait for mqtt event."); -} - -static void -mqtt_wait_for_data(mrb_state *mrb, mrb_value self, esp_mqtt_event_t *event) { - mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); - - while(true) { - if(xQueueReceive(client->queue, (void*)event, (TickType_t)(1000 / portTICK_PERIOD_MS))) { - if(event->event_id == MQTT_EVENT_DATA) break; - } - } -} - static void mrb_mqtt_client_free(mrb_state *mrb, void *p) { mqtt_client_t *client = (mqtt_client_t *)p; @@ -110,20 +137,19 @@ mrb_mqtt_client_free(mrb_state *mrb, void *p) { } static mrb_value -mrb_mqtt_client_init(mrb_state *mrb, mrb_value self) { +mrb_mqtt_client_initialize(mrb_state *mrb, mrb_value self) { mqtt_client_t *client = mrb_malloc(mrb, sizeof(mqtt_client_t)); mrb_value host; mrb_int port; - mrb_get_args(mrb, "Si", &host, &port); + client->mruby_task_handle = xTaskGetCurrentTaskHandle(); client->mrb = mrb; client->host = mrb_malloc(mrb, strlen(mrb_str_to_cstr(mrb, host))); strcpy(client->host, mrb_str_to_cstr(mrb, host)); client->port = port; client->ssl = FALSE; - client->queue = xQueueCreate(WAIT_EVENT_QUEUE_LEN, sizeof(esp_mqtt_event_t)); mrb_data_init(self, client, &mrb_mqtt_client); ESP_LOGI(TAG, "initialize(%s, %d)", client->host, client->port); @@ -180,7 +206,6 @@ mrb_mqtt_client_connect(mrb_state *mrb, mrb_value self) { client->client = mqtt_client; - mqtt_wait_for_event(mrb, self, MQTT_EVENT_CONNECTED); ESP_LOGI( TAG, "connect(%s://%s:%d)", @@ -226,7 +251,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) { struct RClass* error_class; mrb_value topic; - mrb_get_args(mrb, "S", &topic); ret = esp_mqtt_client_subscribe( @@ -239,7 +263,6 @@ mrb_mqtt_client_subscribe(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to subscribe."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_SUBSCRIBED); ESP_LOGI(TAG, "subscribe(%s)", mrb_str_to_cstr(mrb, topic)); return self; @@ -264,30 +287,11 @@ mrb_mqtt_client_unsubscribe(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to unsubscribe."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_UNSUBSCRIBED); ESP_LOGI(TAG, "unsubscribe(%s)", mrb_str_to_cstr(mrb, topic)); return self; } -static mrb_value -mrb_mqtt_client_get(mrb_state *mrb, mrb_value self) { - esp_mqtt_event_t event; - - mqtt_wait_for_data(mrb, self, &event); - - mrb_value topic = mrb_str_new_static(mrb, event.topic, event.topic_len); - mrb_value message = mrb_str_new_static(mrb, event.data, event.data_len); - - mrb_value ary = mrb_ary_new(mrb); - mrb_ary_push(mrb, ary, topic); - mrb_ary_push(mrb, ary, message); - - ESP_LOGI(TAG, "get()"); - - return ary; -} - static mrb_value mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) { mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); @@ -300,28 +304,82 @@ mrb_mqtt_client_disconnect(mrb_state *mrb, mrb_value self) { mrb_raise(mrb, error_class, "Failed to disconnect."); return self; } - mqtt_wait_for_event(mrb, self, MQTT_EVENT_DISCONNECTED); ESP_LOGI(TAG, "disconnect"); return self; } +static mrb_value +mrb_mqtt_client_set_connected_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@connected_proc"), block); + client->connected_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_disconnected_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@disconnected_proc"), block); + client->disconnected_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_unsubscribed_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@unsubscribed_proc"), block); + client->unsubscribed_proc = block; + + return self; +} + +static mrb_value +mrb_mqtt_client_set_data_handler(mrb_state *mrb, mrb_value self) { + mqtt_client_t *client = (mqtt_client_t *) DATA_PTR(self); + + mrb_value block; + mrb_get_args(mrb, "&", &block); + + mrb_iv_set(mrb, self, mrb_intern_lit(mrb, "@data_proc"), block); + client->data_proc = block; + + return self; +} + void mrb_mruby_esp32_mqtt_gem_init(mrb_state* mrb) { struct RClass *esp32_module = mrb_define_module(mrb, "ESP32"); struct RClass *mqtt_module = mrb_define_module_under(mrb, esp32_module, "MQTT"); struct RClass *client_class = mrb_define_class_under(mrb, mqtt_module, "Client", mrb->object_class); - mrb_define_method(mrb, client_class, "initialize", mrb_mqtt_client_init, MRB_ARGS_REQ(2)); + mrb_define_method(mrb, client_class, "_initialize", mrb_mqtt_client_initialize, MRB_ARGS_REQ(2)|MRB_ARGS_BLOCK()); mrb_define_method(mrb, client_class, "ssl=", mrb_mqtt_client_set_ssl, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "connect", mrb_mqtt_client_connect, MRB_ARGS_NONE()); mrb_define_method(mrb, client_class, "publish", mrb_mqtt_client_publish, MRB_ARGS_REQ(2)); - mrb_define_method(mrb, client_class, "subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1)); + mrb_define_method(mrb, client_class, "_subscribe", mrb_mqtt_client_subscribe, MRB_ARGS_REQ(1)); mrb_define_method(mrb, client_class, "unsubscribe", mrb_mqtt_client_unsubscribe, MRB_ARGS_REQ(1)); - mrb_define_method(mrb, client_class, "get", mrb_mqtt_client_get, MRB_ARGS_NONE()); mrb_define_method(mrb, client_class, "disconnect", mrb_mqtt_client_disconnect, MRB_ARGS_NONE()); + + mrb_define_method(mrb, client_class, "set_connected_handler", mrb_mqtt_client_set_connected_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_disconnected_handler", mrb_mqtt_client_set_disconnected_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_unsubscribed_handler", mrb_mqtt_client_set_unsubscribed_handler, MRB_ARGS_BLOCK()); + mrb_define_method(mrb, client_class, "set_data_handler", mrb_mqtt_client_set_data_handler, MRB_ARGS_BLOCK()); - mrb_define_class_under(mrb, mqtt_module, "TimeoutError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "ConnectError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "PublishError", mrb->eStandardError_class); mrb_define_class_under(mrb, mqtt_module, "SubscribeError", mrb->eStandardError_class);