Skip to content

onDisconnected Method Takes Significant Time to Detect Disconnection Despite Low KeepAlive Setting #76

Open
@qwerty470

Description

@qwerty470

I have implemented auto-reconnect and used onDisconnected to detect disconnections and trigger reconnection. However, onDisconnected takes 2–3 minutes to be called, causing a significant delay in detecting disconnections, even with keepAlive set to 5 seconds.

Here’s the relevant code snippet:

/**
 * True only when the client reports that it is running and that a CONNACK has been received.
 * The CONNACK check is skipped when the client is not running.
 */
internal val MQTTClient.isConnected: Boolean
    get() {
        if (!isRunning()) return false
        return isConnackReceived()
    }

/**
 * Wraps a KMQTT [MQTTClient] with three background coroutines: a reconnect loop, a step loop
 * that repeatedly calls `step()` on the client, and a publish loop that drains an in-memory queue.
 *
 * @param host broker address, forwarded to the client as `address`.
 * @param port broker port.
 * @param clientId client identifier; a blank value is forwarded to the client as `null`.
 * @param mqttProtocol protocol version, converted with `toKMQTTVersion` before use.
 * @param tlsSettings optional TLS configuration, converted with `toKMQTTTLSSettings`.
 * @param cleanStart clean-start flag used for every connection after the first one
 *   (the first connection always requests a clean start).
 * @param keepAliveSeconds keep-alive forwarded to the client (converted to `Int`).
 * @param sessionExpiry forwarded as the MQTT 5 `sessionExpiryInterval` property.
 * @param publishIntervalMs pause between two queue flushes in the publish loop.
 * @param debugLog when true, diagnostics are printed with `println` and the flag is also
 *   forwarded to the client.
 */
class MQTTManager(
    private val host: String,
    private val port: Int,
    private val clientId: String,
    private val mqttProtocol: MqttProtocolVersion = MqttProtocolVersion.MQTT_5,
    private val tlsSettings: MqttTLSSettings? = null,
    private val cleanStart: Boolean = false,
    private val keepAliveSeconds: Long = 5L,
    private val sessionExpiry: Long = 300L,
    private val publishIntervalMs: Long = 1000L,
    private val debugLog: Boolean = false
) {

    /**
     * Reference to the underlying KMQTT client, or null when none exists.
     * NOTE(review): written by the reconnect coroutine and by stop(), read by the publish
     * coroutine, with no synchronization visible here — confirm this is acceptable.
     */
    private var client: MQTTClient? = null

    /** Guards every access to [packetQueue]. */
    private val queueLock = ReentrantLock()
    /** Outbound packets waiting to be published; only touched while holding [queueLock]. */
    private val packetQueue = mutableListOf<MqttPacket>()

    /**
     * Connection state: set to true by the client's onConnected callback and to false on
     * disconnect, on a failed connection attempt, when the step loop ends, and in stop().
     */
    private val _isConnected = MutableStateFlow(false)
    /** Read-only view of [_isConnected] for observers. */
    val isConnected = _isConnected.asStateFlow()

    /** Parent Job of [scope]; stop() cancels it. */
    private val supervisorJob = SupervisorJob()

    /** Scope in which all three loops are launched (Dispatchers.Default). */
    private val scope = CoroutineScope(Dispatchers.Default + supervisorJob)

    /** Handles of the reconnect loop, step loop, and publishing loop; null until first launched. */
    private var reconnectJob: Job? = null
    private var stepLoopJob: Job? = null
    private var publishLoopJob: Job? = null

    /**
     * Current backoff delay in milliseconds. Doubled after each failed connection attempt,
     * capped at 30_000.
     */
    private var reconnectDelayMs = 2_000L

    /**
     * True until the first client has been constructed; while true, the client is created with
     * cleanStart = true regardless of the constructor argument. stop() sets it back to true.
     */
    private var isFirstConnection = true

    /**
     * Maps the integer QoS carried by a queued packet to the library's [Qos] value.
     * Lookups for other integers fall back to AT_MOST_ONCE at the call site.
     */
    private val qosMap = mapOf(
        0 to Qos.AT_MOST_ONCE,
        1 to Qos.AT_LEAST_ONCE,
        2 to Qos.EXACTLY_ONCE
    )

    // ---------------
    // PUBLIC METHODS
    // ---------------

    /**
     * Start the MQTT manager:
     * - Launch the reconnect logic in one coroutine.
     * - It will create a new client, connect, and if successful, start the step loop & publish loop.
     */
    fun start() {
        // Idempotent: calling start() while the reconnect loop is alive does nothing.
        if (reconnectJob?.isActive == true) {
            if (debugLog) println("Reconnect job already active. Skipping.")
            return
        }
        reconnectJob = scope.launch {
            while (isActive) {
                // Single read of `client`; `!= true` covers both "no client yet" (null)
                // and "client exists but is no longer running" (false).
                if (client?.isRunning() != true) {
                    try {
                        createAndConnectClient()
                        // The step loop drives the handshake and keeps the connection alive.
                        startStepLoop()
                        startPublishLoop()
                        // The attempt did not throw: drop back to the initial backoff so that
                        // one bad period does not slow down every later reconnect.
                        // 2_000L mirrors the initial value of reconnectDelayMs.
                        reconnectDelayMs = 2_000L
                    } catch (e: Exception) {
                        if (debugLog) println("Connection attempt failed: ${e.message}")
                        _isConnected.value = false
                        // Exponential backoff: wait, then double the delay up to 30 s.
                        delay(reconnectDelayMs)
                        reconnectDelayMs = (reconnectDelayMs * 2).coerceAtMost(30_000L)
                    }
                }
                // Poll interval for detecting that the client has stopped running.
                delay(1000)
            }
        }
    }

    /**
     * Stop the MQTT manager:
     * - Cancel reconnect, step, and publish loops.
     * - Disconnect the client gracefully.
     */
    fun stop() {
        // Stop the three loops first so nothing touches the client while it is torn down.
        reconnectJob?.cancel()
        reconnectJob = null

        stepLoopJob?.cancel()
        stepLoopJob = null

        publishLoopJob?.cancel()
        publishLoopJob = null

        _isConnected.value = false
        // The next start() is treated as a first connection again.
        isFirstConnection = true

        // Drop the reference before disconnecting so that a failing disconnect cannot leave
        // a dead client behind or abort the remaining cleanup.
        val current = client
        client = null
        try {
            current?.disconnect(ReasonCode.SUCCESS)
        } catch (e: Exception) {
            // Best effort: the connection may already be gone.
            if (debugLog) println("Disconnect during stop failed: ${e.message}")
        }

        // Cancel whatever is still running in the scope, but keep the parent job itself alive.
        // Cancelling the parent would make every later scope.launch a no-op, so start() could
        // never work again after stop().
        supervisorJob.children.forEach { it.cancel() }
    }

    /**
     * Enqueue an MQTT packet (e.g., for publishing).
     * If we are disconnected, it remains in queue until we reconnect.
     */
    fun enqueuePacket(packet: MqttPacket) {
        // Append under the queue lock; the publish loop drains the queue under the same lock.
        queueLock.lock()
        try {
            packetQueue += packet
        } finally {
            queueLock.unlock()
        }
    }

    // ---------------
    // INTERNAL LOGIC
    // ---------------

    /**
     * Create a new MQTTClient instance and connect.
     *
     * On success, sets _isConnected.value = true.
     */
    private fun createAndConnectClient() {
        // Tear down any previous client. The reference is cleared BEFORE disconnecting:
        // if disconnect() throws (presumably possible when the old connection is already
        // dead — confirm against the library), the stale client must not stay in `client`,
        // otherwise every retry would fail on the same dead instance.
        val previous = client
        client = null
        if (previous != null) {
            try {
                previous.disconnect(ReasonCode.SUCCESS)
            } catch (e: Exception) {
                if (debugLog) println("Ignoring failure while closing previous client: ${e.message}")
            }
        }

        // The very first connection always requests a clean start; later ones use the
        // configured flag.
        val useCleanStart = isFirstConnection || cleanStart

        val newClient = MQTTClient(
            mqttVersion = toKMQTTVersion(mqttProtocol),
            address = host,
            port = port,
            tls = toKMQTTTLSSettings(tlsSettings),
            properties = MQTT5Properties(sessionExpiryInterval = sessionExpiry.toUInt()),
            keepAlive = keepAliveSeconds.toInt(),
            cleanStart = useCleanStart,
            // A blank id is passed as null.
            clientId = clientId.ifBlank { null },
            debugLog = debugLog,
            onConnected = { connack ->
                if (debugLog) println("MQTTClient onConnected: CONNACK arrived!")
                _isConnected.value = true
            },
            onDisconnected = { disconnect ->
                if (debugLog) println("MQTTClient onDisconnected: $disconnect")
                _isConnected.value = false
            },
            publishReceived = { incoming ->
                if (debugLog) {
                    println("Received PUBLISH from broker: topic=${incoming.topicName}")
                }
            }
        )

        if (debugLog) println("createAndConnectClient: constructor done, waiting for CONNACK...")

        // Only reached when construction succeeded; a throwing constructor leaves
        // isFirstConnection untouched so the next attempt still requests a clean start.
        isFirstConnection = false
        client = newClient
    }

    /**
     * Start a coroutine that repeatedly calls client.step() every ~50ms as long as the client is running.
     */
    private fun startStepLoop() {
        val c = client ?: return

        // A loop that is still alive here belongs to a previous client (it may just be sitting
        // in its 50 ms delay after that client stopped). Returning early in that case would
        // leave the new client without anyone calling step(), so replace the stale loop.
        stepLoopJob?.cancel()

        stepLoopJob = scope.launch {
            try {
                while (isActive && c.isRunning()) {
                    // Only step() is guarded: keeping delay() outside this try means a
                    // CancellationException is never swallowed by the broad catch.
                    try {
                        c.step()
                    } catch (e: Exception) {
                        if (debugLog) println("stepLoopJob caught exception: ${e.message}")
                        break
                    }
                    delay(50)
                }
            } finally {
                // Only report "disconnected" on behalf of the client this loop was driving.
                // A late-finishing stale loop must not overwrite the state of a newer client.
                if (client === c) {
                    _isConnected.value = false
                }
            }
        }
    }

    /**
     * Start a coroutine that periodically flushes the outbound queue (publish messages).
     */
    private fun startPublishLoop() {
        // At most one publish loop: keep the existing one if it is still active.
        val alreadyRunning = publishLoopJob?.isActive == true
        if (alreadyRunning) {
            return
        }

        val flushInterval = publishIntervalMs.milliseconds
        publishLoopJob = scope.launch {
            // Flush, pause, repeat until the coroutine is cancelled.
            while (isActive) {
                flushQueue()
                delay(flushInterval)
            }
        }
    }

    /**
     * Publish all queued messages.
     * If a publish fails, re-queue it for later.
     */
    private fun flushQueue() {
        // Nothing to do without a client or before the connection is established.
        val c = client ?: return
        if (!_isConnected.value) return

        // Take everything currently queued in one lock acquisition, so that publishing
        // happens without holding the lock.
        val pending = queueLock.withLock {
            if (packetQueue.isEmpty()) return
            packetQueue.toList().also { packetQueue.clear() }
        }

        // Packets whose publish call threw, in their original order.
        val failed = mutableListOf<MqttPacket>()

        pending.forEach { packet ->
            try {
                // Unknown QoS integers are published as AT_MOST_ONCE.
                val qos = qosMap.getOrElse(packet.qos) { Qos.AT_MOST_ONCE }
                c.publish(
                    retain = packet.retain,
                    qos = qos,
                    topic = packet.topic,
                    payload = packet.payload.toUByteArray()
                )

                // Diagnostic only: a payload that does not parse is reported, never treated
                // as a publish failure.
                if (debugLog) {
                    try {
                        val jsonPayload = packet.payload.decodeToString()
                        val ecgPacket = Json.decodeFromString<ECGPacket>(jsonPayload)
                        println("Publishing packet number: ${ecgPacket.packetNo}")
                    } catch (e: Exception) {
                        println("Failed to parse packet payload: ${e.message}")
                    }
                }
            } catch (e: Exception) {
                if (debugLog) {
                    println("Failed to publish message: ${e.message}. Re-enqueueing.")
                }
                failed += packet
            }
        }

        // Put failed packets back at the HEAD of the queue, in order. Appending them at the
        // tail would place them behind packets enqueued while this flush was running and
        // reorder the stream on retry.
        if (failed.isNotEmpty()) {
            queueLock.withLock { packetQueue.addAll(0, failed) }
        }
    }

Could you suggest a way to reduce this delay or otherwise improve disconnection detection? Additionally, auto-reconnect and offline buffering are important features for an MQTT client, since they ensure lossless delivery during network disruptions. It would be great if they could be implemented in the library itself.

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions