Skip to content

Commit ddbbec2

Browse files
authored
Add WaitingPlugin support (#395)
* Fix update(.initial) being called twice on destination plugin timeline. * Add WaitingPlugin support
1 parent e83f63e commit ddbbec2

File tree

6 files changed

+529
-25
lines changed

6 files changed

+529
-25
lines changed

Sources/Segment/Analytics.swift

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public class Analytics {
3030
static internal weak var firstInstance: Analytics? = nil
3131

3232
@Atomic static internal var activeWriteKeys = [String]()
33+
34+
// Used for WaitingPlugin's, see waiting.swift
35+
internal var processingTimer: DispatchWorkItem? = nil
3336

3437
/**
3538
This method isn't a traditional singleton implementation. It's provided here

Sources/Segment/Plugins.swift

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ extension DestinationPlugin {
124124
public func add(plugin: Plugin) -> Plugin {
125125
if let analytics = self.analytics {
126126
plugin.configure(analytics: analytics)
127+
if let waiting = plugin as? WaitingPlugin {
128+
analytics.pauseEventProcessing(plugin: waiting)
129+
}
127130
}
128131
timeline.add(plugin: plugin)
129132
analytics?.updateIfNecessary(plugin: plugin)
@@ -188,6 +191,9 @@ extension Analytics {
188191
@discardableResult
189192
public func add(plugin: Plugin) -> Plugin {
190193
plugin.configure(analytics: self)
194+
if let waiting = plugin as? WaitingPlugin {
195+
pauseEventProcessing(plugin: waiting)
196+
}
191197
timeline.add(plugin: plugin)
192198
updateIfNecessary(plugin: plugin)
193199
return plugin

Sources/Segment/Settings.swift

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,11 @@ extension Settings: Equatable {
109109

110110
extension Analytics {
111111
internal func update(settings: Settings) {
112-
guard let system: System = store.currentState() else { return }
113112
apply { plugin in
114-
plugin.update(settings: settings, type: updateType(for: plugin, in: system))
113+
plugin.update(settings: settings, type: updateType(for: plugin))
115114
if let destPlugin = plugin as? DestinationPlugin {
116115
destPlugin.apply { subPlugin in
117-
subPlugin.update(settings: settings, type: updateType(for: subPlugin, in: system))
116+
subPlugin.update(settings: settings, type: updateType(for: subPlugin))
118117
}
119118
}
120119
}
@@ -125,19 +124,12 @@ extension Analytics {
125124
// if we're already running, update has already been called for existing plugins,
126125
// so we just wanna call it on this one if it hasn't been done already.
127126
if system.running, let settings = system.settings {
128-
let alreadyInitialized = system.initializedPlugins.contains { p in
129-
return plugin === p
130-
}
131-
if !alreadyInitialized {
132-
store.dispatch(action: System.AddPluginToInitialized(plugin: plugin))
133-
plugin.update(settings: settings, type: .initial)
134-
} else {
135-
plugin.update(settings: settings, type: .refresh)
136-
}
127+
plugin.update(settings: settings, type: updateType(for: plugin))
137128
}
138129
}
139130

140-
internal func updateType(for plugin: Plugin, in system: System) -> UpdateType {
131+
internal func updateType(for plugin: Plugin) -> UpdateType {
132+
guard let system: System = store.currentState() else { return .initial }
141133
let alreadyInitialized = system.initializedPlugins.contains { p in
142134
return plugin === p
143135
}
@@ -154,14 +146,14 @@ extension Analytics {
154146
if isUnitTesting {
155147
// we don't really wanna wait for this network call during tests...
156148
// but we should make it work similarly.
157-
store.dispatch(action: System.ToggleRunningAction(running: false))
149+
pauseEventProcessing()
158150

159151
operatingMode.run(queue: DispatchQueue.main) {
160152
if let state: System = self.store.currentState(), let settings = state.settings {
161153
self.store.dispatch(action: System.UpdateSettingsAction(settings: settings))
162154
self.update(settings: settings)
163155
}
164-
self.store.dispatch(action: System.ToggleRunningAction(running: true))
156+
self.resumeEventProcessing()
165157
}
166158

167159
return
@@ -172,7 +164,7 @@ extension Analytics {
172164
let httpClient = HTTPClient(analytics: self)
173165

174166
// stop things; queue in case our settings have changed.
175-
store.dispatch(action: System.ToggleRunningAction(running: false))
167+
pauseEventProcessing()
176168
httpClient.settingsFor(writeKey: writeKey) { (success, settings) in
177169
if success, let s = settings {
178170
// put the new settings in the state store.
@@ -186,7 +178,7 @@ extension Analytics {
186178
}
187179

188180
// we're good to go back to a running state.
189-
self.store.dispatch(action: System.ToggleRunningAction(running: true))
181+
self.resumeEventProcessing()
190182
}
191183
}
192184
}

Sources/Segment/State.swift

Lines changed: 95 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ struct System: State {
1616
let running: Bool
1717
let enabled: Bool
1818
let initializedPlugins: [Plugin]
19+
let waitingPlugins: [Plugin]
1920

2021
struct UpdateSettingsAction: Action {
2122
let settings: Settings
@@ -25,7 +26,8 @@ struct System: State {
2526
settings: settings,
2627
running: state.running,
2728
enabled: state.enabled,
28-
initializedPlugins: state.initializedPlugins)
29+
initializedPlugins: state.initializedPlugins,
30+
waitingPlugins: state.waitingPlugins)
2931
return result
3032
}
3133
}
@@ -34,11 +36,29 @@ struct System: State {
3436
let running: Bool
3537

3638
func reduce(state: System) -> System {
39+
var desiredRunning = running
40+
41+
if desiredRunning == true && state.waitingPlugins.count > 0 {
42+
desiredRunning = false
43+
}
44+
3745
return System(configuration: state.configuration,
3846
settings: state.settings,
39-
running: running,
47+
running: desiredRunning,
4048
enabled: state.enabled,
41-
initializedPlugins: state.initializedPlugins)
49+
initializedPlugins: state.initializedPlugins,
50+
waitingPlugins: state.waitingPlugins)
51+
}
52+
}
53+
54+
struct ForceRunningAction: Action {
55+
func reduce(state: System) -> System {
56+
return System(configuration: state.configuration,
57+
settings: state.settings,
58+
running: true,
59+
enabled: state.enabled,
60+
initializedPlugins: state.initializedPlugins,
61+
waitingPlugins: state.waitingPlugins)
4262
}
4363
}
4464

@@ -50,7 +70,8 @@ struct System: State {
5070
settings: state.settings,
5171
running: state.running,
5272
enabled: enabled,
53-
initializedPlugins: state.initializedPlugins)
73+
initializedPlugins: state.initializedPlugins,
74+
waitingPlugins: state.waitingPlugins)
5475
}
5576
}
5677

@@ -62,7 +83,8 @@ struct System: State {
6283
settings: state.settings,
6384
running: state.running,
6485
enabled: state.enabled,
65-
initializedPlugins: state.initializedPlugins)
86+
initializedPlugins: state.initializedPlugins,
87+
waitingPlugins: state.waitingPlugins)
6688
}
6789
}
6890

@@ -79,7 +101,8 @@ struct System: State {
79101
settings: settings,
80102
running: state.running,
81103
enabled: state.enabled,
82-
initializedPlugins: state.initializedPlugins)
104+
initializedPlugins: state.initializedPlugins,
105+
waitingPlugins: state.waitingPlugins)
83106
}
84107
}
85108

@@ -97,7 +120,64 @@ struct System: State {
97120
settings: state.settings,
98121
running: state.running,
99122
enabled: state.enabled,
100-
initializedPlugins: initializedPlugins)
123+
initializedPlugins: initializedPlugins,
124+
waitingPlugins: state.waitingPlugins)
125+
}
126+
}
127+
128+
struct AddWaitingPlugin: Action {
129+
let plugin: Plugin
130+
131+
func reduce(state: System) -> System {
132+
var waitingPlugins = state.waitingPlugins
133+
if !waitingPlugins.contains(where: { p in
134+
return plugin === p
135+
}) {
136+
waitingPlugins.append(plugin)
137+
}
138+
return System(configuration: state.configuration,
139+
settings: state.settings,
140+
running: state.running,
141+
enabled: state.enabled,
142+
initializedPlugins: state.initializedPlugins,
143+
waitingPlugins: waitingPlugins)
144+
}
145+
}
146+
147+
/*struct RemoveWaitingPlugin: Action {
148+
let plugin: Plugin
149+
150+
func reduce(state: System) -> System {
151+
var waitingPlugins = state.waitingPlugins
152+
waitingPlugins.removeAll { p in
153+
return plugin === p
154+
}
155+
return System(configuration: state.configuration,
156+
settings: state.settings,
157+
running: state.running,
158+
enabled: state.enabled,
159+
initializedPlugins: state.initializedPlugins,
160+
waitingPlugins: waitingPlugins)
161+
}
162+
}*/
163+
struct RemoveWaitingPlugin: Action {
164+
let plugin: Plugin
165+
166+
func reduce(state: System) -> System {
167+
var waitingPlugins = state.waitingPlugins
168+
let countBefore = waitingPlugins.count
169+
waitingPlugins.removeAll { p in
170+
return plugin === p
171+
}
172+
let countAfter = waitingPlugins.count
173+
print("RemoveWaitingPlugin: \(countBefore) -> \(countAfter)")
174+
175+
return System(configuration: state.configuration,
176+
settings: state.settings,
177+
running: state.running,
178+
enabled: state.enabled,
179+
initializedPlugins: state.initializedPlugins,
180+
waitingPlugins: waitingPlugins)
101181
}
102182
}
103183
}
@@ -171,7 +251,14 @@ extension System {
171251
settings = Settings(writeKey: configuration.values.writeKey, apiHost: HTTPClient.getDefaultAPIHost())
172252
}
173253
}
174-
return System(configuration: configuration, settings: settings, running: false, enabled: true, initializedPlugins: [Plugin]())
254+
return System(
255+
configuration: configuration,
256+
settings: settings,
257+
running: false,
258+
enabled: true,
259+
initializedPlugins: [Plugin](),
260+
waitingPlugins: [WaitingPlugin]()
261+
)
175262
}
176263
}
177264

Sources/Segment/Waiting.swift

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
//
2+
// Waiting.swift
3+
// Segment
4+
//
5+
// Created by Brandon Sneed on 7/12/25.
6+
//
7+
import Foundation
8+
9+
public protocol WaitingPlugin: Plugin {}
10+
11+
extension Analytics {
12+
/// Pauses event processing, causing events to be queued. When processing resumes
13+
/// any queued events will be replayed to the system with their original timestamps.
14+
/// The system will forcibly resume after 30 seconds, but you should
15+
/// call `resumeEventProcessing(plugin:)` when you've completed your task.
16+
public func pauseEventProcessing(plugin: WaitingPlugin) {
17+
store.dispatch(action: System.AddWaitingPlugin(plugin: plugin))
18+
pauseEventProcessing()
19+
}
20+
21+
/// Resume event processing. Any queued events will be replayed into the system
22+
/// using their original timestamps.
23+
public func resumeEventProcessing(plugin: WaitingPlugin) {
24+
store.dispatch(action: System.RemoveWaitingPlugin(plugin: plugin))
25+
resumeEventProcessing()
26+
}
27+
}
28+
29+
extension Analytics {
30+
internal func running() -> Bool {
31+
if let system: System = store.currentState() {
32+
return system.running
33+
}
34+
// we have no state, so assume no.
35+
return false
36+
}
37+
38+
internal func pauseEventProcessing() {
39+
let running = running()
40+
// if we're already paused, ignore and leave.
41+
if !running {
42+
return
43+
}
44+
// pause processing
45+
store.dispatch(action: System.ToggleRunningAction(running: false))
46+
// if we WERE running, someone stopped us, set a timer for
47+
// 30 seconds so they can't keep the system stopped forever.
48+
startProcessingAfterTimeout()
49+
}
50+
51+
internal func resumeEventProcessing() {
52+
let running = running()
53+
// if we're already running, ignore and leave.
54+
if running {
55+
return
56+
}
57+
store.dispatch(action: System.ToggleRunningAction(running: true))
58+
}
59+
60+
internal func startProcessingAfterTimeout() {
61+
DispatchQueue.main.async { [weak self] in
62+
guard let self else { return }
63+
self.processingTimer?.cancel()
64+
self.processingTimer = DispatchWorkItem { [weak self] in
65+
self?.store.dispatch(action: System.ForceRunningAction())
66+
self?.processingTimer = nil // clean up after ourselves
67+
}
68+
if let processingTimer = self.processingTimer {
69+
DispatchQueue.main.asyncAfter(deadline: .now() + 30, execute: processingTimer)
70+
}
71+
}
72+
}
73+
}

0 commit comments

Comments
 (0)