Providers

When a Node or Coordinator calls Request(SomeImpulse), the bus itself doesn’t know how to fetch anything — it just routes the request. Providers are the other half: each one is a small class bound to exactly one DataImpulse type that knows how to produce the data it asks for. The framework instantiates a provider on demand, wraps its output in DataState, and tears it down when nobody’s listening.

A DataImpulse<Need> is both the request definition and the type-safe link to its result. The generic parameter declares what the caller expects back, and subclassing as a data class makes structural equality drive request deduplication:

```kotlin
data class FetchProduct(val productId: String) : DataImpulse<Product>()
data class ObserveCart(val userId: String) : DataImpulse<List<CartItemWithProduct>>()
```

Two impulses that compare equal are treated as the same request and share the same in-flight job. FetchProduct("sku-42") fired from a product detail screen and again from a “you might also like” carousel while the first request is still in flight runs exactly once: both callers collect the same flow.
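The deduplication rule rests on ordinary data class equality. The following is a minimal sketch, assuming a hypothetical stand-in `DataImpulse` base class and a plain map (`ActiveJobs`) in place of the framework's real job table; neither is the library's actual implementation. It shows why two separately constructed impulses land on the same entry:

```kotlin
// Hypothetical stand-ins for illustration; not the framework's real types.
abstract class DataImpulse<Need>
data class Product(val id: String)
data class FetchProduct(val productId: String) : DataImpulse<Product>()

// A toy "active job" table keyed by impulse. Equal impulses share one entry.
class ActiveJobs {
    private val jobs = HashMap<DataImpulse<*>, String>()
    var started = 0
        private set

    fun jobFor(impulse: DataImpulse<*>): String =
        jobs.getOrPut(impulse) {
            started++ // only runs when no equal impulse is already present
            "job-$started"
        }
}

fun main() {
    val table = ActiveJobs()
    val fromDetail = table.jobFor(FetchProduct("sku-42"))
    val fromCarousel = table.jobFor(FetchProduct("sku-42")) // separate instance, equal value
    val other = table.jobFor(FetchProduct("sku-7"))

    println(fromDetail == fromCarousel) // true: same job
    println(other)                      // job-2
    println(table.started)              // 2
}
```

An impulse declared as a regular class would fall back to identity equality, and each call site would start its own job.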

Because Need is declared on the impulse itself, Request(FetchProduct(id)) infers its result type as Flow<DataState<Product>>. You never specify it at the call site, and the compiler won’t let you ask for the wrong one.
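The inference itself is plain Kotlin generics. As a rough illustration, assuming a hypothetical `resultOf` function standing in for Request (the real call returns a Flow of DataState, which is left out here to keep the sketch dependency-free):

```kotlin
// Hypothetical stand-ins; the real Request returns Flow<DataState<Need>>.
abstract class DataImpulse<Need>
data class Product(val name: String)
data class FetchProduct(val productId: String) : DataImpulse<Product>()

// Need is read off the impulse's supertype, so callers never spell it out.
fun <Need> resultOf(impulse: DataImpulse<Need>, produce: () -> Need): Need = produce()

fun main() {
    val product = resultOf(FetchProduct("sku-42")) { Product("Kettle") }
    println(product.name) // Kettle; `product` is inferred as Product

    // val wrong: String = resultOf(FetchProduct("sku-42")) { Product("Kettle") }
    // ^ does not compile: the impulse fixes the result type to Product.
}
```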

A provider subclasses Provider<I, Need> and overrides a single method: produce(impulse: I). The return type is Flow<Need>. For one-shot fetches, use flow { emit(...) }. For observations, return the underlying flow directly.

One-shot fetch — run once and emit a single value:

```kotlin
@SynapseProvider
class ProductDetailProvider @Inject constructor(
    private val api: MarketApi,
) : Provider<FetchProduct, Product>() {
    override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow {
        emit(api.getProduct(impulse.productId))
    }
}
```

Streaming / observation — delegate to an underlying flow:

```kotlin
@SynapseProvider
class CartProvider @Inject constructor(
    private val api: MarketApi,
) : Provider<ObserveCart, List<CartItemWithProduct>>() {
    override fun ProviderScope.produce(impulse: ObserveCart): Flow<List<CartItemWithProduct>> =
        api.observeCartWithProducts(impulse.userId)
}
```

Parallel composition — fan out, await, emit once:

```kotlin
@SynapseProvider
class HomeProductsProvider @Inject constructor(
    private val api: MarketApi,
) : Provider<FetchHomeProducts, HomeProducts>() {
    override fun ProviderScope.produce(impulse: FetchHomeProducts): Flow<HomeProducts> = flow {
        // coroutineScope cancels the sibling calls if any one of them fails
        coroutineScope {
            val personalized = async { api.getPersonalizedProducts(impulse.token, 20) }
            val onSale = async { api.getOnSale(8) }
            val topRated = async { api.getTopRated(20) }
            emit(
                HomeProducts(
                    personalized = personalized.await(),
                    onSale = onSale.await(),
                    topRated = topRated.await(),
                )
            )
        }
    }
}
```

Concrete providers are regular Kotlin classes with @Inject constructors. Hilt constructs them, KSP registers them (more on that in a moment), and there’s no base wiring you need to touch beyond the Provider superclass.

produce is an extension function on ProviderScope, which gives the provider access to the rest of the bus. Like CoordinatorScope and NodeScope, it exposes the usual capabilities — plus nested requests so providers can depend on each other’s data.

Capabilities:

| Category | Method | Returns |
|---|---|---|
| State observation | ListenFor<O>() | SharedFlow<O> |
| Reaction observation | ReactTo<A>() | SharedFlow<A> |
| State broadcast | Broadcast(data) | suspend |
| Reaction trigger | Trigger(event) | suspend |
| Nested request | Request(impulse) | Flow<DataState<Need>> |

ProviderScope implements CoroutineScope by delegation, so launch, async, and coroutineScope are available directly inside produce. The backing scope is managed by the framework — it’s canceled when the provider is disposed, so any work launched through it stops automatically.

Nested requests let one provider depend on another provider’s data:

```kotlin
override fun ProviderScope.produce(impulse: FetchSecureData): Flow<SecurePayload> = flow {
    // Suspend until the token provider reports its first Success
    val token = Request(FetchCachedToken())
        .filterIsInstance<DataState.Success<AuthToken>>()
        .first()
        .data
    emit(api.fetch(impulse.endpoint, token.accessToken))
}
```

Most providers don’t need bus access at all — they take their dependencies from DI and return a flow. Reach for ProviderScope’s capabilities only when a provider genuinely needs to compose with other bus traffic.

Every concrete provider is annotated @SynapseProvider. A KSP processor walks every annotated class at compile time and validates that:

  • The class extends Provider<I, Need> directly
  • The type arguments are concrete (no raw types, no type variables)
  • No two providers claim the same DataImpulse type

…and generates a Hilt @Module in SingletonComponent that assembles a ProviderRegistry. Each provider is injected as a javax.inject.Provider<T> so Hilt can construct fresh instances on demand, and ProviderRegistry.Builder wires each one to its impulse type:

```kotlin
// Generated — do not edit
@Module
@InstallIn(SingletonComponent::class)
object SynapseProviderModule_App {
    @Provides @Singleton
    fun provideRegistry(
        productDetailProvider: javax.inject.Provider<ProductDetailProvider>,
        cartProvider: javax.inject.Provider<CartProvider>,
        // … one entry per @SynapseProvider class
    ): ProviderRegistry = ProviderRegistry.Builder()
        .register(
            impulseType = FetchProduct::class,
            needClass = Product::class.java,
            factory = ProviderFactory { productDetailProvider.get() },
        )
        .register(
            impulseType = ObserveCart::class,
            needClass = List::class.java as Class<List<CartItemWithProduct>>,
            factory = ProviderFactory { cartProvider.get() },
        )
        // …
        .build()
}
```

The SwitchBoard takes the ProviderRegistry as a constructor dependency, so once Hilt has built the registry, every Request(FetchProduct(id)) call flows through it to the right factory. You never touch ProviderRegistry directly except in tests.

A malformed provider is a compile error: KSP refuses to generate the module if a @SynapseProvider class fails validation. A missing provider, by contrast, can only surface at runtime. In the unusual case where a DataImpulse is dispatched with no registered handler (e.g., because the provider lives in a module that wasn’t included in this build), the request surfaces NoProviderException instead of silently hanging.
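To make the registry's role concrete, here is a small dependency-free sketch. Everything in it is an assumption for illustration: `SketchRegistry`, its `create` method, and the stand-in `NoProviderException` only borrow names from the text, and plain lambdas replace `javax.inject.Provider` and `ProviderFactory`. It shows the three behaviors described above: one factory per impulse type, a fresh instance per lookup, and a loud failure for an unregistered impulse.

```kotlin
import kotlin.reflect.KClass

// Hypothetical stand-ins; names mirror the text but the bodies are illustrative only.
abstract class DataImpulse<Need>
class NoProviderException(type: KClass<*>) :
    IllegalStateException("No provider registered for ${type.simpleName}")

class SketchRegistry private constructor(
    private val factories: Map<KClass<*>, () -> Any>,
) {
    // Every lookup invokes the factory, so each request gets a fresh provider instance.
    fun create(impulse: DataImpulse<*>): Any =
        factories[impulse::class]?.invoke() ?: throw NoProviderException(impulse::class)

    class Builder {
        private val factories = LinkedHashMap<KClass<*>, () -> Any>()

        fun register(impulseType: KClass<out DataImpulse<*>>, factory: () -> Any) = apply {
            // Runtime analogue of the "no two providers claim the same impulse" check.
            require(impulseType !in factories) { "Duplicate provider for ${impulseType.simpleName}" }
            factories[impulseType] = factory
        }

        fun build() = SketchRegistry(factories.toMap())
    }
}

data class FetchProduct(val productId: String) : DataImpulse<String>()
data class FetchOrders(val userId: String) : DataImpulse<List<String>>()
class ProductDetailProvider

fun main() {
    val registry = SketchRegistry.Builder()
        .register(FetchProduct::class) { ProductDetailProvider() }
        .build()

    val a = registry.create(FetchProduct("sku-42"))
    val b = registry.create(FetchProduct("sku-42"))
    println(a !== b) // true: a new instance per lookup

    val failure = runCatching { registry.create(FetchOrders("u1")) }.exceptionOrNull()
    println(failure is NoProviderException) // true
}
```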

Callers don’t see raw values — every provider’s output is wrapped in DataState<Need>, a sealed type that captures the full fetch lifecycle:

| State | When |
|---|---|
| Idle | No request has been made yet. Default before activation; not emitted by providers themselves. |
| Loading | Before produce emits its first value. |
| Success(data) | Each value the provider emits. |
| Error(cause, staleData) | Any uncaught exception. Carries the last successful value if one existed. |

The sequence is always the same: Loading first, then one Success per emitted value, with Error taking over whenever produce throws.

Streaming providers stay in Success as long as they keep emitting, drop to Error if the underlying flow throws (carrying the last successful value forward as staleData), and recover back to Success on the next successful emission. One-shot providers go Loading → Success or Loading → Error and then complete.
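The transitions can be traced with a small model. This is a sketch only: the `DataState` shape below is re-created from the table (the `cause` and `staleData` names come from the text, everything else is assumed), and `lifecycle` is a hypothetical helper that folds a list of raw provider results into the states a caller would observe.

```kotlin
// Re-created from the table above for illustration; not the library's source.
sealed interface DataState<out T> {
    object Idle : DataState<Nothing>
    object Loading : DataState<Nothing>
    data class Success<T>(val data: T) : DataState<T>
    data class Error<T>(val cause: Throwable, val staleData: T?) : DataState<T>
}

// Folds raw provider results into the state sequence a caller would see.
fun <T> lifecycle(results: List<Result<T>>): List<DataState<T>> {
    val states = mutableListOf<DataState<T>>(DataState.Loading)
    var lastGood: T? = null
    for (result in results) {
        val next: DataState<T> = result.fold(
            onSuccess = { lastGood = it; DataState.Success(it) },
            onFailure = { DataState.Error(it, lastGood) }, // carry the last good value forward
        )
        states.add(next)
    }
    return states
}

fun main() {
    val states = lifecycle(
        listOf(
            Result.success("v1"),
            Result.failure<String>(IllegalStateException("socket closed")),
            Result.success("v2"),
        )
    )
    for (s in states) {
        println(
            when (s) {
                DataState.Idle -> "Idle"
                DataState.Loading -> "Loading"
                is DataState.Success -> "Success(${s.data})"
                is DataState.Error -> "Error(stale=${s.staleData})"
            }
        )
    }
    // Loading
    // Success(v1)
    // Error(stale=v1)
    // Success(v2)
}
```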

Consumers typically read the state with dataOrNull or a when branch:

```kotlin
Node(initialState = ProductScreenState()) {
    Request(FetchProduct(productId)) { dataState ->
        update { it.copy(productState = dataState) }
    }
    when (val s = state.productState) {
        DataState.Idle, DataState.Loading -> Spinner()
        is DataState.Success -> ProductView(s.data)
        is DataState.Error -> ErrorBanner(s.cause, stale = s.staleData)
    }
}
```

When Request(FetchProduct("sku-42")) arrives at the bus, the ProviderManager checks whether an active job already exists for that impulse, keyed by structural equality. If one does, the existing flow is returned — no new work is started, no new Provider instance is constructed, and both callers see the same emissions.

If no matching job exists, the manager:

  1. Resolves the registered ProviderFactory from the registry.
  2. Creates a fresh Provider instance via the factory.
  3. Waits until something actually subscribes, then emits Loading.
  4. Runs produce, collecting its flow, and emits each value as Success.
  5. On completion (success or error), removes the job from the active table so a future request starts cleanly.

The “wait until something subscribes” step matters: a Request call does no work until its returned flow is actually collected. And because the underlying flow retains its most recent DataState, a subscriber that arrives after the first emission still sees the latest value immediately.
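Both behaviors, deferring work until the first subscriber and replaying the latest state to late subscribers, can be modeled without coroutines. The sketch below is a deliberately simplified, synchronous stand-in: `SharedJob` and its callback-based `subscribe` are hypothetical, and the real implementation is flow-based.

```kotlin
// Hypothetical synchronous model; the framework itself works with Flow, not callbacks.
class SharedJob<T : Any>(private val produce: (emit: (T) -> Unit) -> Unit) {
    private val subscribers = mutableListOf<(T) -> Unit>()
    private var latest: T? = null
    var runs = 0
        private set

    fun subscribe(onValue: (T) -> Unit) {
        latest?.let(onValue)          // late subscriber: replay the most recent value
        subscribers.add(onValue)
        if (runs == 0) {              // first subscriber: only now does work start
            runs++
            produce { value ->
                latest = value
                subscribers.forEach { it(value) }
            }
        }
    }
}

fun main() {
    val job = SharedJob<String> { emit ->
        emit("Loading")
        emit("Success(sku-42)")
    }
    println(job.runs) // 0: creating the job did no work

    val first = mutableListOf<String>()
    val second = mutableListOf<String>()
    job.subscribe { first += it }
    job.subscribe { second += it } // arrives after both emissions

    println(first)    // [Loading, Success(sku-42)]
    println(second)   // [Success(sku-42)]
    println(job.runs) // 1
}
```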

The provider instance itself is throwaway. It’s created when the job starts and garbage-collected when the job ends. There’s no long-lived provider you can hold a reference to — that’s intentional. Any state that needs to outlive a single request belongs in the injected dependencies (a cache, a DAO, a repository), not in the provider.

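The full internal path from a Request call to the emitted Flow<DataState<Need>>, including dedup, concurrency gating, factory resolution, the first-subscriber wait, collection, and cleanup, looks like this:

[Diagram: Request call → dedup → concurrency gating → factory resolution → first-subscriber wait → collection → cleanup → Flow<DataState<Need>>]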

Most of that machinery is invisible to callers — the diagram exists so that when you do need to reason about dedup, cleanup ordering, or the “last successful value” stash on DataState.Error, the mental model matches what’s actually happening.

A one-shot provider’s job is removed from activeJobs the moment its last collector leaves. The next Request(FetchProduct(id)) runs produce from scratch, even if the data was fetched thirty seconds ago — on a screen that remounts often (back navigation, tab switching, bottom-nav), the same endpoint fires on every visit.

Bus dedup only covers overlapping requests — two callers firing the same impulse while the fetch is in flight share one job. Once that job completes, the next Request starts from scratch; a Provider never caches its own results. For data that’s safe to reuse for a bounded window — a user profile, a feature config, a catalog page — use FreshnessRegistry to mint a stable token for that window and key a cache by it.

This is an opt-in convention, not a framework concern. For cheap, idempotent, infrequent requests, skip it; the indirection earns nothing. Reach for it when remount traffic would otherwise refire the same fetch.

A FreshnessKey binds a logical cache entry to how long it stays fresh:

```kotlin
data class ProductBucket(val productId: String) : FreshnessKey {
    override val duration: Duration = 5.minutes
}
```

Hold a FreshnessRegistry<ProductBucket> as a singleton. Every call to get(key) within the same 5-minute window returns the identical FreshnessToken; the next call after the window opens a new bucket and issues a new token.

```kotlin
@Module
@InstallIn(SingletonComponent::class)
object FreshnessModule {
    @Provides @Singleton
    fun productFreshness() = FreshnessRegistry<ProductBucket>(maxSize = 500)
}
```

maxSize caps the registry for user-generated key spaces. When the map is full, inserting a new key evicts the bucket nearest to expiring; a min-heap keeps that eviction O(log n) regardless of how mixed the durations are.
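The window and eviction rules can be sketched in a few dozen lines. This is not the library's implementation: `SketchFreshnessRegistry` and `Bucket` are hypothetical, `FreshnessKey` and `FreshnessToken` are re-declared as stand-ins, `java.util.PriorityQueue` plays the min-heap, and the code assumes Kotlin 1.9 or later for the stable `kotlin.time` APIs.

```kotlin
import java.util.PriorityQueue
import kotlin.time.ComparableTimeMark
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes
import kotlin.time.TestTimeSource
import kotlin.time.TimeSource

// Stand-ins re-declared so the sketch compiles on its own.
interface FreshnessKey { val duration: Duration }

@JvmInline
value class FreshnessToken(val id: Long)

// Illustrative only: not thread-safe, and not the library's implementation.
class SketchFreshnessRegistry<K : FreshnessKey>(
    private val timeSource: TimeSource.WithComparableMarks = TimeSource.Monotonic,
    private val maxSize: Int = 100,
) {
    init { require(maxSize > 0) { "maxSize must be positive" } }

    private class Bucket<K>(val key: K, val token: FreshnessToken, val expiresAt: ComparableTimeMark)

    private val buckets = HashMap<K, Bucket<K>>()
    private val byExpiry = PriorityQueue<Bucket<K>>(compareBy<Bucket<K>> { it.expiresAt })
    private var nextId = 0L

    fun get(key: K): FreshnessToken {
        val current = buckets[key]
        if (current != null && !current.expiresAt.hasPassedNow()) return current.token

        if (current != null) {
            byExpiry.remove(current) // expired bucket for this key (O(n) in PriorityQueue)
        } else if (buckets.size >= maxSize) {
            val soonest = byExpiry.poll() // heap root: the bucket nearest to expiring
            buckets.remove(soonest.key)
        }
        val fresh = Bucket(key, FreshnessToken(nextId++), timeSource.markNow() + key.duration)
        buckets[key] = fresh
        byExpiry.add(fresh)
        return fresh.token
    }
}

data class ProductBucket(val productId: String) : FreshnessKey {
    override val duration: Duration = 5.minutes
}

fun main() {
    val clock = TestTimeSource()
    val registry = SketchFreshnessRegistry<ProductBucket>(clock, maxSize = 2)

    val a1 = registry.get(ProductBucket("a"))
    clock += 1.minutes
    registry.get(ProductBucket("b"))
    println(registry.get(ProductBucket("a")) == a1) // true: still inside a's window

    registry.get(ProductBucket("c"))                // registry full: "a" expires soonest and is evicted
    println(registry.get(ProductBucket("a")) == a1) // false: "a" had to be minted again
}
```

The token counter never repeats, which is what guarantees that a re-minted bucket cannot collide with a token issued earlier for the same key.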

The token is deliberately opaque. Its only contract is that it stays stable within a window and differs across windows, which makes it a safe component of a cache key in whatever storage the provider sits in front of:

```kotlin
@SynapseProvider
class ProductDetailProvider @Inject constructor(
    private val api: MarketApi,
    private val freshness: FreshnessRegistry<ProductBucket>,
    private val cache: ProductCache,
) : Provider<FetchProduct, Product>() {
    override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow {
        val token = freshness.get(ProductBucket(impulse.productId))
        emit(cache.getOrPut(impulse.productId, token) { api.getProduct(impulse.productId) })
    }
}
```

Where ProductCache is a thin memory map:

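```kotlin
@Singleton
class ProductCache @Inject constructor() {
    // A public inline function may only reference public or @PublishedApi members,
    // so the backing map cannot be private.
    @PublishedApi
    internal val entries = ConcurrentHashMap<Pair<String, FreshnessToken>, Product>()

    // inline, so `load` may call suspend functions when invoked from a suspending call site
    inline fun getOrPut(
        id: String,
        token: FreshnessToken,
        load: () -> Product,
    ): Product = entries.getOrPut(id to token, load)

    fun put(id: String, token: FreshnessToken, product: Product) {
        entries[id to token] = product
    }
}
```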

Cache hits and misses ride on FreshnessToken equality, which is in effect a Long comparison because FreshnessToken is a @JvmInline value class. Every new bucket gets a Long the registry has never used before, so keys that were equal moments ago compare unequal the instant the window rolls over. Within the window every fetch after the first hits the cache; once the window rolls over, the (id, token) pair misses, api.getProduct runs once, and its result serves the rest of the new window.

Bus dedup compounds this: if three callers simultaneously request the same product on a cold cache, they share a single produce run and only one api.getProduct call is made.

Pick the duration from how stale the user will tolerate the data being, not from how expensive the fetch is:

  • Seconds for things that should feel live (notification counts, presence indicators).
  • Minutes for things that are effectively static within a session (catalog entries, profile).
  • Hours for things that only change on explicit user action (feature flags, saved preferences).

Calls fired more often than the duration coalesce onto the cache; calls fired less often always refetch, so a long duration on an infrequent request buys nothing.
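A quick simulation makes the trade-off concrete. The function below is a hypothetical model with illustrative numbers, not part of the framework: it counts how many calls would reach the network over a period when each fetched result is reused for `windowSec` seconds.

```kotlin
// Counts how many calls would reach the network if each result is reused for
// `windowSec` seconds after it was fetched. All numbers are illustrative.
fun fetchCount(callEverySec: Int, windowSec: Int, totalSec: Int): Int {
    var fetches = 0
    var windowEndsAt = 0 // exclusive end of the current window
    var t = 0
    while (t < totalSec) {
        if (fetches == 0 || t >= windowEndsAt) {
            fetches++
            windowEndsAt = t + windowSec
        }
        t += callEverySec
    }
    return fetches
}

fun main() {
    // One hour of traffic against a 5-minute window.
    println(fetchCount(callEverySec = 30, windowSec = 300, totalSec = 3600))  // 12 fetches for 120 calls
    println(fetchCount(callEverySec = 600, windowSec = 300, totalSec = 3600)) // 6 fetches for 6 calls
}
```

In the first case the window absorbs nine of every ten calls; in the second every call lands after the previous window has closed, so the cache never serves a hit.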

One registry per key type, or one per scope?

Per-type is the right default. Each registry is cheap — a HashMap, a min-heap, and a counter — and splitting gives you an independent maxSize budget, targeted clear(), and type-safe DI. Mixing unrelated keys in a single pool lets a burst of product lookups evict your feature-flag buckets, because the heap evicts by expiry and is indifferent to which concern each key serves.

Merge registries when a set of key types share both a duration profile and an invalidation trigger — everything user-scoped that should drop on sign-out, say. Define a sealed parent and let FreshnessRegistry take the parent type:

```kotlin
sealed interface UserScopedKey : FreshnessKey {
    data class Profile(val userId: String) : UserScopedKey { override val duration = 5.minutes }
    data class Cart(val userId: String) : UserScopedKey { override val duration = 30.seconds }
    data class Orders(val userId: String) : UserScopedKey { override val duration = 2.minutes }
}

// One registry, one clear() call on sign-out.
FreshnessRegistry<UserScopedKey>(maxSize = 1_000)
```

Past that, an app-wide registry tends to blur eviction and invalidation semantics without saving much.

A cache serves the same value for the entire window, which is the wrong behavior when the user asks for fresh data, or when something upstream just changed and you know the cache is stale. Three escape hatches cover the common cases.

Per-request override. Add a refresh flag to the impulse and let the provider skip the cache read when it’s set:

```kotlin
data class FetchProduct(
    val productId: String,
    val refresh: Boolean = false,
) : DataImpulse<Product>()

// Inside ProductDetailProvider:
override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow {
    val token = freshness.get(ProductBucket(impulse.productId))
    val product = if (impulse.refresh) {
        // Explicit refresh: skip the cache read, then store the result under the current token
        api.getProduct(impulse.productId).also { cache.put(impulse.productId, token, it) }
    } else {
        cache.getOrPut(impulse.productId, token) { api.getProduct(impulse.productId) }
    }
    emit(product)
}
```

Because refresh is part of the impulse, FetchProduct(id, refresh = true) is structurally not equal to a passive FetchProduct(id) — the two won’t dedup, which is the behavior you want for pull-to-refresh: the user’s explicit refresh shouldn’t piggyback on an in-flight background fetch. The refresh result lands in the cache under the current token, so the next passive caller reads it.
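The equality claim is easy to check in isolation. A minimal sketch, with `DataImpulse` and `Product` re-declared as hypothetical stand-ins so the snippet compiles on its own:

```kotlin
abstract class DataImpulse<Need> // hypothetical stand-in
data class Product(val id: String)

data class FetchProduct(
    val productId: String,
    val refresh: Boolean = false,
) : DataImpulse<Product>()

fun main() {
    val passive = FetchProduct("sku-42")
    val forced = FetchProduct("sku-42", refresh = true)

    println(passive == FetchProduct("sku-42")) // true: would share an in-flight job
    println(passive == forced)                 // false: gets its own job
    println(setOf(passive, forced).size)       // 2 distinct keys
}
```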

Targeted invalidation. After a write succeeds, the cached entry is stale but the next reader can refetch. Expose an invalidate hook on the cache and call it from wherever the mutation is handled — a Reaction handler for ProductUpdated, say:

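```kotlin
@Singleton
class ProductCache @Inject constructor() {
    // @PublishedApi internal rather than private, so the inline getOrPut can reach it
    @PublishedApi
    internal val entries = ConcurrentHashMap<Pair<String, FreshnessToken>, Product>()

    // Removes the product under every token, including leftovers from expired windows
    fun invalidate(productId: String) = entries.keys.removeAll { it.first == productId }

    // getOrPut, put as before
}
```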

The next Request(FetchProduct(id)) misses the cache, refetches, and the refreshed value serves readers for the rest of the window.
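The effect of invalidate on a map keyed by (id, token) pairs can be seen in a standalone sketch. `SketchCache` is hypothetical, with Long standing in for FreshnessToken and String for Product:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Long stands in for FreshnessToken and String for Product, purely for illustration.
class SketchCache {
    private val entries = ConcurrentHashMap<Pair<String, Long>, String>()

    fun put(id: String, token: Long, value: String) { entries[id to token] = value }
    fun get(id: String, token: Long): String? = entries[id to token]

    // Drops the entry under every token, so expired-window leftovers go too.
    fun invalidate(id: String): Boolean = entries.keys.removeAll { it.first == id }
}

fun main() {
    val cache = SketchCache()
    cache.put("sku-42", token = 1, value = "old window")
    cache.put("sku-42", token = 2, value = "current window")
    cache.put("sku-7", token = 2, value = "untouched")

    println(cache.invalidate("sku-42")) // true: something was removed
    println(cache.get("sku-42", 2))     // null
    println(cache.get("sku-7", 2))      // untouched
}
```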

Wholesale clear. FreshnessRegistry.clear() drops every bucket regardless of expiry, and clearing the downstream cache alongside it resets every entry. Appropriate for sign-out, tenant switch, or any transition that should invalidate every cached view at once — heavy for anything smaller.

FreshnessRegistry’s first constructor parameter is a TimeSource.WithComparableMarks, defaulting to TimeSource.Monotonic. Hand it a kotlin.time.TestTimeSource in tests and advance the clock manually to exercise window boundaries without wall-clock sleeps:

```kotlin
@Test
fun `token rotates once the window elapses`() {
    val time = TestTimeSource()
    val registry = FreshnessRegistry<ProductBucket>(timeSource = time)
    val key = ProductBucket("sku-42")

    val first = registry.get(key)
    time += 4.minutes
    assertEquals(first, registry.get(key)) // same window

    time += 2.minutes
    assertNotEquals(first, registry.get(key)) // rolled over
}
```

For provider-level tests that go through SynapseTestRule, construct the FreshnessRegistry in the test’s Hilt replacement module with a TestTimeSource and drive it the same way — the provider sees the same token semantics production code does, just on a clock you control.

The arch-test module ships a SynapseTestRule that stands up a real SwitchBoard for a test and lets you stub providers inline via a small DSL. You don’t hand-build a ProviderRegistry or uninstall the generated Hilt module — the rule does it for you:

```kotlin
@get:Rule
val synapse = SynapseTestRule {
    // Single-value provider — wraps the returned value in a one-shot flow
    provide<List<Address>, FetchAddresses> { testAddresses }

    // Emits nothing — returning null produces an empty flow
    provide<AuthToken, FetchCachedToken> { null }

    // Streaming / multi-emission provider
    provideFlow<List<Product>, FetchProducts> { impulse ->
        flow {
            emit(cachedProducts)
            delay(100)
            emit(freshProducts)
        }
    }
}
```

Inside the test, synapse.switchBoard is a fully wired switchboard with those providers registered. For Compose screens, hand it to the composition via LocalSwitchBoard:

```kotlin
composeTestRule.setContent {
    CompositionLocalProvider(LocalSwitchBoard provides synapse.switchBoard) {
        CreateContext(appContext) { CheckoutScreen() }
    }
}
```

Only the impulses the test actually exercises need to be registered. The Testing page covers the full rule surface — capture helpers, coordinator setup, and runTest integration.