SSE-arkitektur og AI-workloads
All Server-Sent Events (SSE) i mobile-appen kjøres gjennom én sentral mekanisme: AIWorkloadCoordinator i KMP shared-modulen. Denne guiden forklarer prinsippet, arkitekturen og hvordan du legger til nye AI-drevne features uten å finne opp hjulet på nytt.
Målgruppe: utviklere som skal tilføye eller vedlikeholde AI-baserte features i appen.
Kjerneprinsippet
heading.anchorLabelAll SSE-subscription skjer i AIWorkloadCoordinator (KMP app-scope). Views og stores skal aldri opprette egne @State Task eller launch { } for SSE-streams.
Hvorfor dette er viktig
heading.anchorLabelFør denne arkitekturen eksisterte, var SSE-håndtering spredt over view-lokale tasks i SwiftUI. Pattern-et var:
@State private var sseTask: Task<Void, Never>?
// onAppear → start task// onDisappear → cancel taskDet førte til en hel bug-kategori: hvis en bruker lukket et view mid-generation (editor, session, app backgrounding), ble SSE-en revet ned før backend fullfortere. Completion-eventet ble tapt, og views som lyttet til samme data endte i “stuck loading”-state.
Den nye arkitekturen eliminerer denne bug-klassen strukturelt. Coordinator-en lever i app-scope, overlever view navigasjon, og fullforer alltid sin workload selv om brukeren navigerer vekk.
Arkitektur-diagram
heading.anchorLabelBackend SSE ↓AIWorkloadCoordinator (KMP, app-scope CoroutineScope) ├── _activeWorkloads: Map<WorkloadKey, Job> ├── workloadProgress: StateFlow<Map<WorkloadKey, WorkloadProgress>> └── On completion → AppEventBus.emit(XReady) ↓ Stores lytter, dispatcher RefreshX intent ↓ clientX.getById() refetcher fra backend ↓ @Observable state oppdateres ↓ Views re-rendrer automatiskPrinsipper
heading.anchorLabel- App-scope SSE — ingen
onDisappear-cancel, ingen view-lifecycle-binding. - Events er triggers, ikke data-carriers — completion events inneholder kun identifier. Stores refetcher fra backend. Dette unngar state-duplisering.
- Idempotent
observeX()— safe å kalle flere ganger; coordinator sjekkerWorkloadKeyog returnererno-ophvis allerede aktiv. - DB-as-truth i backend-endpoints — SSE-endpoints sjekker DB forst for completed state, faller tilbake til queue-subscription kun hvis jobben er in-flight.
- Reconciliation ved app-foreground — coordinator re-scanner pending workloads og emitter missed completion events.
Forbudte mønstre
heading.anchorLabel| Mønster | Hvorfor |
|---|---|
@State private var sseTask: Task<Void, Never>? i views | Bryter app-scope-prinsippet; view-dismissal kanselierer SSE for tidlig |
APIService.streamX(...) direkte fra view | Samme problem; bypass-er coordinator |
AppEventBus.tryEmit() fra Swift for SSE-completion | Coordinator er eneste emitter |
onDisappear { coordinator.cancel(...) } | Aldri avbryt coordinator-observation på view-lifecycle |
| Event payload med domain data (ikke triggers) | Duplikerer state; late subscribers mister data |
Key files
heading.anchorLabelapps/mobile-kmp/shared/src/commonMain/kotlin/com/im/shared/ai/workload/AIWorkloadCoordinator.kt— selve SSE-motorenapps/mobile-kmp/shared/src/commonMain/kotlin/com/im/shared/ai/workload/WorkloadKey.kt— sealed class med unike keys per workload-typeapps/mobile-kmp/iosApp/Stores/AIWorkloadCoordinatorWrapper.swift— @Observable-bridge for SwiftUIapps/mobile-kmp/shared/src/commonMain/kotlin/com/im/shared/state/events/AppEvent.kt— alle workload events (XStarted,XReady,XError)apps/api/src/routes/impulses/enrichment-status.ts— template for backend SSE-endpoint
Legge til en ny AI-workload
heading.anchorLabelFølg denne oppskriften når du tilfoerer et nytt async AI-feature (f.eks. “deep analysis”, “personalized recommendations”, “voice transcription”). 9 steg, estimert tid 1-3 timer.
Steg 1 — Backend SSE-endpoint med DB-as-truth
heading.anchorLabelOpprett apps/api/src/routes/<domain>/<feature>-status.ts etter enrichment-status.ts som mal. Må implementere:
- DB-lookup forst — sjekk DB for completed/failed state. Hvis ferdig, send
completedevent +reply.raw.end()umiddelbart. Ingen queue-subscription nødvendig. - Queue-subscription fallback — hvis DB sier pending/processing, kall
subscribeToX(jobId, callback)fra queue-modulen. - Heartbeat —
setInterval(sendHeartbeat, HEARTBEAT_INTERVAL_MS)holder SSE-forbindelsen i live under lange AI-operasjoner. - Idempotent cleanup — bruk
cleanup()+safeEnd()pattern fraenrichment-status.tsfor å hindre double-close.
Backend må emitte disse event-typene: queued, processing, completed, error, valgfritt retrying, rejected, idle, not_found.
Steg 2 — KMP: WorkloadKey case
heading.anchorLabeldata class DeepAnalysis(val impulseId: String) : WorkloadKey() { override fun toString() = "DeepAnalysis($impulseId)"}toString()-outputet blir cache-key i workloadProgress. Hold den stabil — Swift-accessoren i wrapperen bruker strengen direkte.
Steg 3 — KMP: SSE event sealed class + parser
heading.anchorLabelsealed class DeepAnalysisSSEEvent : SSEEvent { data class Processing(val message: String) : DeepAnalysisSSEEvent() data class Completed(val type: String = "completed") : DeepAnalysisSSEEvent() data class Error(val code: String, val message: String) : DeepAnalysisSSEEvent()
override val isError: Boolean get() = this is Error override val isTerminal: Boolean get() = this is Completed || this is Error}Opprett parseren i shared/api/sse/parsers/DeepAnalysisSSEEventParser.kt etter ImpulseSSEEventParser.kt. Parse event type-felt og returner korresponderende sealed class-instans.
Steg 4 — KMP: AppEvent trigger-events
heading.anchorLabeldata class DeepAnalysisStarted(val impulseId: String) : AppEventdata class DeepAnalysisCompleted(val impulseId: String) : AppEventdata class DeepAnalysisError(val impulseId: String, val error: String) : AppEventOppdater event-matrisen i doc-kommentaren øverst i AppEvent.kt med emitter (AIWorkloadCoordinator) og listeners (stores som skal refetche).
Steg 5 — KMP: coordinator observeX-metode
heading.anchorLabelLegg til i AIWorkloadCoordinator.kt, mirror observeImpulseEnrichment:
fun observeDeepAnalysis(impulseId: String) { val key = WorkloadKey.DeepAnalysis(impulseId) if (_activeWorkloads.value.containsKey(key)) return // idempotent
_pendingWorkloads.update { it + (key to WorkloadMeta( startedAt = Clock.System.now(), impulseId = impulseId )) } updateProgress(key, WorkloadProgress.Loading("Analyzing...")) AppEventBus.tryEmit(AppEvent.DeepAnalysisStarted(impulseId))
val job = scope.launch { try { val authHeaders = apiClient.buildAuthHeaders() val sseUrl = "${apiClient.baseUrl}/api/v1/impulses/$impulseId/deep-analysis/status"
sseClient.stream(sseUrl, DeepAnalysisSSEEventParser(), authHeaders).collect { event -> when (event) { is DeepAnalysisSSEEvent.Processing -> updateProgress(key, WorkloadProgress.Loading(event.message)) is DeepAnalysisSSEEvent.Completed -> { cleanup(key) updateProgress(key, WorkloadProgress.Completed) AppEventBus.emit(AppEvent.DeepAnalysisCompleted(impulseId)) } is DeepAnalysisSSEEvent.Error -> handleDeepAnalysisError(key, impulseId, event.message) } } } catch (e: CancellationException) { throw e // KRITISK — behold cooperative cancellation } catch (e: Exception) { handleDeepAnalysisError(key, impulseId, e.message ?: "Stream failed") } }
_activeWorkloads.update { it + (key to job) }}Hvis workload-en trenger spesielle policies (timeout, idle-retry), lag en dedikert helper som runSummaryStreamWithPolicy — ikke bake det inn i generic observeSSEStream.
Steg 6 — KMP: store lytter og refetcher
heading.anchorLabelRelevant store (ImpulsesStore, SessionsStore, ny domain store) abonnerer på det nye *Completed-eventet i sin init { AppEventBus.events.collect }-blokk:
is AppEvent.DeepAnalysisCompleted -> { Logger.d("Store-Event", "DeepAnalysisCompleted(${event.impulseId.take(8)}) → RefreshImpulse") intent(ImpulsesIntent.RefreshImpulse(event.impulseId))}Store-handleren refetcher autoritativ data fra backend via impulseClient.getById(id, token) og merger inn i state. Aldri kopier data fra eventet — alltid refetch. Dette sikrer at stores alltid reflekterer DB-truth.
Steg 7 — iOS: wrapper accessor-metoder
heading.anchorLabelLegg til typede accessors i AIWorkloadCoordinatorWrapper.swift:
func observeDeepAnalysis(impulseId: String) { coordinator.observeDeepAnalysis(impulseId: impulseId)}
func deepAnalysisProgress(impulseId: String) -> WorkloadProgress? { let keyString = "DeepAnalysis(\(impulseId))" return workloadProgress[keyString]}
func isDeepAnalysisLoading(impulseId: String) -> Bool { deepAnalysisProgress(impulseId: impulseId) is WorkloadProgress.Loading}Key-strengen MÅ matche Kotlin toString()-outputet fra steg 2 eksakt.
Steg 8 — iOS: view-integrering
heading.anchorLabelstruct MyView: View { private var coordinator: AIWorkloadCoordinatorWrapper { KMPStoreProvider.shared.aiWorkloadCoordinator } private var impulsesStore: ImpulsesStoreWrapper { KMPStoreProvider.shared.impulsesStore }
// Derivert state fra store — reagerer automatisk når store refetcher private var impulse: shared.Impulse? { impulsesStore.impulses.first { $0.id == impulseId } }
// Derivert loading state fra coordinator private var isAnalyzing: Bool { coordinator.isDeepAnalysisLoading(impulseId: impulseId) }
var body: some View { Group { if isAnalyzing { ProgressView("Analyzing...") } else if let result = impulse?.deepAnalysisResult { ResultView(data: result) } } .onAppear { // Idempotent — trygt å kalle hver gang coordinator.observeDeepAnalysis(impulseId: impulseId) } // INGEN onDisappear cancel — coordinator eier lifecycle }}Steg 9 — Oppdater dokumentasjon
heading.anchorLabel- Legg workload-en til i “Migration status”-listen i
CLAUDE.mdunder “SSE Architecture Principle” - Oppdater denne guiden hvis det er nye pitfalls eller mønstre som er verdt å dele
Vanlige fallgruver
heading.anchorLabel1. Skriving til derived properties
heading.anchorLabelHvis du gjør guidance/analysisResult til computed properties, kan du ikke lenger assigne til dem (guidance = existingText vil feile compile). Fjern alle slike writes; deriveringen håndterer oppdateringer.
2. Per-session/per-impulse job-keys
heading.anchorLabelFor intents som kan kjøre parallelt for forskjellige ressurser, bruk dynamiske string-keys som "RefreshImpulse_${id}" (Any-type i manageJobs). Dette hindrer cross-resource cancellation.
3. CancellationException først
heading.anchorLabelDenne catch-grenen MÅ komme før generic catch (Exception). Uten den genererer scope-teardown (logout, store reset) falske error-logger.
4. Event-data kontaminering
heading.anchorLabelHvis eventet bærer data, to problemer:
- State dupliseres mellom event-payload og store
- Sene subscribers mister payload
Hold events som triggers; la stores refetche. Unntak: transiente UI-only progress-meldinger (Processing.message) er OK siden de ikke persisteres.
5. DB-delay før refetch
heading.anchorLabelBullMQ sitt completed-event kan fyre før worker-en har skrevet ferdig til DB. Legg til delay(500) i refetch-handleren (se HomeStore.RefreshImpulse-mønsteret).
6. View-lifecycle tasks
heading.anchorLabelAldri opprett @State Task for SSE-arbeid. Alltid bruk coordinator. Views kan ha @State Task for urelaterte lokale jobber (bildelasting, animasjoner), men ikke for SSE.
7. Backend SSE uten DB-as-truth
heading.anchorLabelHvis backend-en emitter cached state for completed jobs men frontend-en ikke håndterer “already complete”-case gracefully, får du race conditions. Returner alltid completed + reply.raw.end() umiddelbart hvis DB allerede viser terminal state.
8. WorkloadKey-streng-drift
heading.anchorLabelHvis du endrer WorkloadKey.toString()-output, bryter alle Swift-accessors stille (de bruker string matching, ingen type check). Grep etter den gamle strengen når du refaktorerer.
9. Flere views som observerer samme workload
heading.anchorLabelCoordinator er idempotent, så to views som kaller observeX samtidig oppretter én connection. Men begge views må lese state fra SAMME kilde (store) — ikke fra eventet selv. Ellers vil de divergere.
10. Timeout-policy leak
heading.anchorLabelHvis workload-en har en timeout, plasser den i coordinator-en (withTimeoutOrNull(...)). Ikke legg til timeout Task.sleep i views — det re-introduserer view-lifecycle-lossiness.
Sjekkliste
heading.anchorLabelBruk denne når du legger til en ny workload:
- Backend SSE-endpoint med DB-as-truth pattern
-
WorkloadKeycase lagt til med stabiltoString() - SSE event sealed class + parser
-
AppEventtrigger-events (Started,Completed,Error) - Coordinator
observeX-metode (idempotent,CancellationExceptionrethrow) - Store lytter på completion event + refetcher
- iOS wrapper accessor-metoder (key-streng matcher Kotlin)
- View bruker
.onAppear { observe }+ derivert state, ingen@State Task -
CLAUDE.mdmigration status oppdatert - KMP compile grønt
- Xcode build grønt
- Manuell smoke test inkludert:
- Happy path
- Lukk view mid-workload → verifiser completion propageres
- App backgrounded mid-workload → åpne igjen → reconciliation firer
- Nettverksfeil → verifiser error-state i view
Relatert dokumentasjon
heading.anchorLabel- AI Pipeline — hvordan AI-workloads kjøres i backend-queues
- AI Feilhåndtering — fallback-strategier
- AI Quality Measurement — variation + relevance scoring
- Tilstandsmaskiner — Robot3-basert state machine pattern