Skip to content

Commit 83fa8c5

Browse files
authored
Fix LivestreamChannelController not reconnecting when connection is dropped (#3782)
* Fix not reconnecting livestream channel controller when connection is dropped * Add test coverage * Update CHANGELOG.md * Add sync operation test coverage
1 parent 01052a9 commit 83fa8c5

File tree

10 files changed

+479
-26
lines changed

10 files changed

+479
-26
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
55

66
## StreamChat
77
### 🐞 Fixed
8+
- Fix LivestreamChannelController not reconnecting when connection is dropped [#3782](https://github.com/GetStream/stream-chat-swift/pull/3782)
89
- Fix `StreamAudioRecorder` not overridable because of init method [#3783](https://github.com/GetStream/stream-chat-swift/pull/3783)
910

1011
# [4.85.0](https://github.com/GetStream/stream-chat-swift/releases/tag/4.85.0)

Sources/StreamChat/Controllers/ChannelController/LivestreamChannelController.swift

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,12 +219,56 @@ public class LivestreamChannelController: DataStoreProvider, EventsControllerDel
219219
messages = channel.latestMessages
220220
}
221221

222+
client.syncRepository.startTrackingLivestreamController(self)
223+
222224
updateChannelData(
223225
channelQuery: channelQuery,
224226
completion: completion
225227
)
226228
}
227229

230+
/// Start watching a channel
231+
///
232+
/// - Parameter completion: Called when the API call is finished. Called with `Error` if the remote update fails.
233+
public func startWatching(isInRecoveryMode: Bool, completion: ((Error?) -> Void)? = nil) {
234+
guard let cid = cid else {
235+
let error = ClientError.ChannelNotCreatedYet()
236+
callback {
237+
completion?(error)
238+
}
239+
return
240+
}
241+
242+
client.syncRepository.startTrackingLivestreamController(self)
243+
244+
updater.startWatching(cid: cid, isInRecoveryMode: isInRecoveryMode) { error in
245+
self.callback {
246+
completion?(error)
247+
}
248+
}
249+
}
250+
251+
/// Stop watching a channel
252+
///
253+
/// - Parameter completion: Called when the API call is finished. Called with `Error` if the remote update fails.
254+
public func stopWatching(completion: ((Error?) -> Void)? = nil) {
255+
guard let cid = cid else {
256+
let error = ClientError.ChannelNotCreatedYet()
257+
callback {
258+
completion?(error)
259+
}
260+
return
261+
}
262+
263+
client.syncRepository.stopTrackingLivestreamController(self)
264+
265+
updater.stopWatching(cid: cid) { error in
266+
self.callback {
267+
completion?(error)
268+
}
269+
}
270+
}
271+
228272
/// Loads previous (older) messages from backend.
229273
/// - Parameters:
230274
/// - messageId: ID of the last fetched message. You will get messages `older` than the provided ID.

Sources/StreamChat/Repositories/SyncOperations.swift

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,30 @@ final class WatchChannelOperation: AsyncOperation, @unchecked Sendable {
161161
}
162162
}
163163
}
164-
164+
165+
init(livestreamController: LivestreamChannelController, context: SyncContext, recovery: Bool) {
166+
super.init(maxRetries: syncOperationsMaximumRetries) { [weak livestreamController] _, done in
167+
guard let controller = livestreamController else {
168+
done(.continue)
169+
return
170+
}
171+
172+
let cidString = (controller.cid?.rawValue ?? "unknown")
173+
log.info("Watching active channel \(cidString)", subsystems: .offlineSupport)
174+
controller.startWatching(isInRecoveryMode: recovery) { error in
175+
if let cid = controller.cid, error == nil {
176+
log.info("Successfully watched active channel \(cidString)", subsystems: .offlineSupport)
177+
context.watchedAndSynchedChannelIds.insert(cid)
178+
done(.continue)
179+
} else {
180+
let errorMessage = error?.localizedDescription ?? "missing cid"
181+
log.error("Failed watching active channel \(cidString): \(errorMessage)", subsystems: .offlineSupport)
182+
done(.retry)
183+
}
184+
}
185+
}
186+
}
187+
165188
init(chat: Chat, context: SyncContext) {
166189
super.init(maxRetries: syncOperationsMaximumRetries) { [weak chat] _, done in
167190
guard let chat else {

Sources/StreamChat/Repositories/SyncRepository.swift

Lines changed: 43 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ class SyncRepository {
3636
private let channelListUpdater: ChannelListUpdater
3737
let offlineRequestsRepository: OfflineRequestsRepository
3838
let eventNotificationCenter: EventNotificationCenter
39-
39+
4040
let activeChannelControllers = ThreadSafeWeakCollection<ChatChannelController>()
4141
let activeChannelListControllers = ThreadSafeWeakCollection<ChatChannelListController>()
4242
let activeChats = ThreadSafeWeakCollection<Chat>()
43+
let activeLivestreamControllers = ThreadSafeWeakCollection<LivestreamChannelController>()
4344
let activeChannelLists = ThreadSafeWeakCollection<ChannelList>()
44-
45+
4546
private lazy var operationQueue: OperationQueue = {
4647
let operationQueue = OperationQueue()
4748
operationQueue.maxConcurrentOperationCount = 1
@@ -69,54 +70,64 @@ class SyncRepository {
6970
deinit {
7071
cancelRecoveryFlow()
7172
}
72-
73+
7374
// MARK: - Tracking Active
74-
75+
7576
func startTrackingChat(_ chat: Chat) {
7677
guard !activeChats.contains(chat) else { return }
7778
activeChats.add(chat)
7879
}
79-
80+
8081
func stopTrackingChat(_ chat: Chat) {
8182
activeChats.remove(chat)
8283
}
83-
84+
8485
func startTrackingChannelController(_ controller: ChatChannelController) {
8586
guard !activeChannelControllers.contains(controller) else { return }
8687
activeChannelControllers.add(controller)
8788
}
88-
89+
8990
func stopTrackingChannelController(_ controller: ChatChannelController) {
9091
activeChannelControllers.remove(controller)
9192
}
92-
93+
94+
func startTrackingLivestreamController(_ controller: LivestreamChannelController) {
95+
guard !activeLivestreamControllers.contains(controller) else { return }
96+
activeLivestreamControllers.add(controller)
97+
}
98+
99+
func stopTrackingLivestreamController(_ controller: LivestreamChannelController) {
100+
activeLivestreamControllers.remove(controller)
101+
}
102+
93103
func startTrackingChannelList(_ channelList: ChannelList) {
94104
guard !activeChannelLists.contains(channelList) else { return }
95105
activeChannelLists.add(channelList)
96106
}
97-
107+
98108
func stopTrackingChannelList(_ channelList: ChannelList) {
99109
activeChannelLists.remove(channelList)
100110
}
101-
111+
102112
func startTrackingChannelListController(_ controller: ChatChannelListController) {
103113
guard !activeChannelListControllers.contains(controller) else { return }
104114
activeChannelListControllers.add(controller)
105115
}
106-
116+
107117
func stopTrackingChannelListController(_ controller: ChatChannelListController) {
108118
activeChannelListControllers.remove(controller)
109119
}
110-
120+
111121
func removeAllTracked() {
112122
activeChats.removeAllObjects()
113123
activeChannelControllers.removeAllObjects()
114124
activeChannelLists.removeAllObjects()
115125
activeChannelListControllers.removeAllObjects()
126+
activeLivestreamControllers.removeAllObjects()
116127
}
117-
128+
118129
// MARK: - Syncing
119-
130+
120131
func syncLocalState(completion: @escaping () -> Void) {
121132
cancelRecoveryFlow()
122133

@@ -137,9 +148,9 @@ class SyncRepository {
137148
self?.syncLocalState(lastSyncAt: lastSyncAt, completion: completion)
138149
}
139150
}
140-
151+
141152
// MARK: -
142-
153+
143154
/// Runs offline tasks and updates the local state for channels
144155
///
145156
/// Recovery mode (pauses regular API requests while it is running)
@@ -159,7 +170,7 @@ class SyncRepository {
159170
var operations: [Operation] = []
160171
let start = CFAbsoluteTimeGetCurrent()
161172
log.info("Starting to refresh offline state", subsystems: .offlineSupport)
162-
173+
163174
//
164175
// Recovery mode operations (other API requests are paused)
165176
//
@@ -170,34 +181,41 @@ class SyncRepository {
170181
apiClient.exitRecoveryMode()
171182
}))
172183
}
173-
184+
174185
//
175186
// Background mode operations
176187
//
177-
188+
178189
/// 1. Collect all the **active** channel ids
179190
operations.append(ActiveChannelIdsOperation(syncRepository: self, context: context))
180-
191+
181192
// 2. Refresh channel lists
182193
operations.append(contentsOf: activeChannelLists.allObjects.map { RefreshChannelListOperation(channelList: $0, context: context) })
183194
operations.append(contentsOf: activeChannelListControllers.allObjects.map { RefreshChannelListOperation(controller: $0, context: context) })
184195

185196
// 3. /sync (for channels what not part of active channel lists)
186197
operations.append(SyncEventsOperation(syncRepository: self, context: context, recovery: false))
187-
198+
188199
// 4. Re-watch channels what we were watching before disconnect
189200
// Needs to be done explicitly after reconnection, otherwise SDK users need to handle connection changes
190-
operations.append(contentsOf: activeChannelControllers.allObjects.map { WatchChannelOperation(controller: $0, context: context, recovery: false) })
191-
operations.append(contentsOf: activeChats.allObjects.map { WatchChannelOperation(chat: $0, context: context) })
192-
201+
operations.append(contentsOf: activeChannelControllers.allObjects.map {
202+
WatchChannelOperation(controller: $0, context: context, recovery: false)
203+
})
204+
operations.append(contentsOf: activeChats.allObjects.map {
205+
WatchChannelOperation(chat: $0, context: context)
206+
})
207+
operations.append(contentsOf: activeLivestreamControllers.allObjects.map {
208+
WatchChannelOperation(livestreamController: $0, context: context, recovery: false)
209+
})
210+
193211
operations.append(BlockOperation(block: {
194212
let duration = CFAbsoluteTimeGetCurrent() - start
195213
log.info("Finished refreshing offline state (\(context.synchedChannelIds.count) channels in \(String(format: "%.1f", duration)) seconds)", subsystems: .offlineSupport)
196214
DispatchQueue.main.async {
197215
completion()
198216
}
199217
}))
200-
218+
201219
var previousOperation: Operation?
202220
operations.reversed().forEach { operation in
203221
defer { previousOperation = operation }

StreamChat.xcodeproj/project.pbxproj

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1439,6 +1439,7 @@
14391439
AD2F2D992D271B07006ED24B /* UserAnnotation.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD2F2D982D271B07006ED24B /* UserAnnotation.swift */; };
14401440
AD2F2D9B2D271B36006ED24B /* UserAnnotationView.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD2F2D9A2D271B36006ED24B /* UserAnnotationView.swift */; };
14411441
AD3331702A30DB2E00ABF38F /* SwipeToReplyGestureHandler_Mock.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD33316F2A30DB2E00ABF38F /* SwipeToReplyGestureHandler_Mock.swift */; };
1442+
AD35D5392E538712003142CD /* LivestreamChannelController_Spy.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD35D5382E538712003142CD /* LivestreamChannelController_Spy.swift */; };
14421443
AD37D7C42BC979B000800D8C /* ThreadDTO.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD37D7C32BC979B000800D8C /* ThreadDTO.swift */; };
14431444
AD37D7C52BC979B000800D8C /* ThreadDTO.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD37D7C32BC979B000800D8C /* ThreadDTO.swift */; };
14441445
AD37D7C72BC98A4400800D8C /* ThreadParticipantDTO.swift in Sources */ = {isa = PBXBuildFile; fileRef = AD37D7C62BC98A4400800D8C /* ThreadParticipantDTO.swift */; };
@@ -4305,6 +4306,7 @@
43054306
AD2F2D982D271B07006ED24B /* UserAnnotation.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserAnnotation.swift; sourceTree = "<group>"; };
43064307
AD2F2D9A2D271B36006ED24B /* UserAnnotationView.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = UserAnnotationView.swift; sourceTree = "<group>"; };
43074308
AD33316F2A30DB2E00ABF38F /* SwipeToReplyGestureHandler_Mock.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SwipeToReplyGestureHandler_Mock.swift; sourceTree = "<group>"; };
4309+
AD35D5382E538712003142CD /* LivestreamChannelController_Spy.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = LivestreamChannelController_Spy.swift; sourceTree = "<group>"; };
43084310
AD37D7C32BC979B000800D8C /* ThreadDTO.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadDTO.swift; sourceTree = "<group>"; };
43094311
AD37D7C62BC98A4400800D8C /* ThreadParticipantDTO.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadParticipantDTO.swift; sourceTree = "<group>"; };
43104312
AD37D7C92BC98A5300800D8C /* ThreadReadDTO.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadReadDTO.swift; sourceTree = "<group>"; };
@@ -8470,6 +8472,7 @@
84708472
A3F65E3127EB6E71003F6256 /* Spy */ = {
84718473
isa = PBXGroup;
84728474
children = (
8475+
AD35D5382E538712003142CD /* LivestreamChannelController_Spy.swift */,
84738476
792921C624C047DD00116BBB /* APIClient_Spy.swift */,
84748477
649968D8264E6A71000515AB /* CDNClient_Spy.swift */,
84758478
4FBD840A2C774E5C00B1E680 /* AttachmentDownloader_Spy.swift */,
@@ -11484,6 +11487,7 @@
1148411487
40D4840B2A1264F1009E4134 /* MockAudioPlayerDelegate.swift in Sources */,
1148511488
A3C3BC4727E87F5C00224761 /* TypingEventsSender_Mock.swift in Sources */,
1148611489
8459C9F22BFB6D3200F0D235 /* PollVoteListController_Mock.swift in Sources */,
11490+
AD35D5392E538712003142CD /* LivestreamChannelController_Spy.swift in Sources */,
1148711491
A3C3BC3727E87F3200224761 /* Logger_Spy.swift in Sources */,
1148811492
82F714A52B07831700442A74 /* AssertDate.swift in Sources */,
1148911493
A3C3BC6B27E8AA4300224761 /* TestUser.swift in Sources */,

TestTools/StreamChatTestTools/Mocks/StreamChat/Workers/ChannelUpdater_Mock.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ final class ChannelUpdater_Mock: ChannelUpdater {
117117
@Atomic var disableSlowMode_completion_result: Result<Void, Error>?
118118

119119
@Atomic var startWatching_cid: ChannelId?
120+
@Atomic var startWatching_isInRecoveryMode: Bool?
120121
@Atomic var startWatching_completion: ((Error?) -> Void)?
121122
@Atomic var startWatching_completion_result: Result<Void, Error>?
122123

@@ -258,6 +259,7 @@ final class ChannelUpdater_Mock: ChannelUpdater {
258259
disableSlowMode_completion_result = nil
259260

260261
startWatching_cid = nil
262+
startWatching_isInRecoveryMode = nil
261263
startWatching_completion = nil
262264
startWatching_completion_result = nil
263265

@@ -524,6 +526,7 @@ final class ChannelUpdater_Mock: ChannelUpdater {
524526

525527
override func startWatching(cid: ChannelId, isInRecoveryMode: Bool, completion: ((Error?) -> Void)? = nil) {
526528
startWatching_cid = cid
529+
startWatching_isInRecoveryMode = isInRecoveryMode
527530
startWatching_completion = completion
528531
startWatching_completion_result?.invoke(with: completion)
529532
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
@testable import StreamChat
7+
8+
final class LivestreamChannelController_Spy: LivestreamChannelController, Spy {
9+
var startWatchingError: Error?
10+
let spyState = SpyState()
11+
12+
init(client: ChatClient_Mock) {
13+
super.init(channelQuery: .init(cid: .unique), client: client)
14+
}
15+
16+
override func startWatching(isInRecoveryMode: Bool, completion: ((Error?) -> Void)? = nil) {
17+
record()
18+
completion?(startWatchingError)
19+
}
20+
}

0 commit comments

Comments
 (0)