Providers
When a Node or Coordinator calls Request(SomeImpulse), the bus itself
doesn’t know how to fetch anything — it just routes the request. Providers
are the other half: each one is a small class bound to exactly one
DataImpulse type that knows how to produce the data it asks for. The
framework instantiates a provider on demand, wraps its output in
DataState, and tears it down when nobody’s listening.
DataImpulse
Section titled “DataImpulse”A DataImpulse<Need> is both the request definition and the type-safe link
to its result. The generic parameter declares what the caller expects back,
and subclassing as a data class makes structural equality drive request
deduplication:
data class FetchProduct(val productId: String) : DataImpulse<Product>()data class ObserveCart(val userId: String) : DataImpulse<List<CartItemWithProduct>>()Two impulses that compare equal are treated as the same request and share
the same in-flight job. FetchProduct("sku-42") fired from a product
detail screen and again from a “you might also like” carousel runs exactly
once — both callers collect the same flow.
Because Need is declared on the impulse itself, Request(FetchProduct(id))
infers its result type as Flow<DataState<Product>>. You never specify it
at the call site, and the compiler can’t let you ask for the wrong one.
Writing a provider
Section titled “Writing a provider”A provider subclasses Provider<I, Need> and overrides a single method:
produce(impulse: I). The return type is Flow<Need>. For one-shot
fetches, use flow { emit(...) }. For observations, return the underlying
flow directly.
One-shot fetch — run once and emit a single value:
@SynapseProviderclass ProductDetailProvider @Inject constructor( private val api: MarketApi,) : Provider<FetchProduct, Product>() { override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow { emit(api.getProduct(impulse.productId)) }}Streaming / observation — delegate to an underlying flow:
@SynapseProviderclass CartProvider @Inject constructor( private val api: MarketApi,) : Provider<ObserveCart, List<CartItemWithProduct>>() { override fun ProviderScope.produce(impulse: ObserveCart): Flow<List<CartItemWithProduct>> = api.observeCartWithProducts(impulse.userId)}Parallel composition — fan out, await, emit once:
@SynapseProviderclass HomeProductsProvider @Inject constructor( private val api: MarketApi,) : Provider<FetchHomeProducts, HomeProducts>() { override fun ProviderScope.produce(impulse: FetchHomeProducts): Flow<HomeProducts> = flow { coroutineScope { val personalized = async { api.getPersonalizedProducts(impulse.token, 20) } val onSale = async { api.getOnSale(8) } val topRated = async { api.getTopRated(20) } emit( HomeProducts( personalized = personalized.await(), onSale = onSale.await(), topRated = topRated.await(), ) ) } }}Concrete providers are regular Kotlin classes with @Inject constructors.
Hilt constructs them, KSP registers them (more on that in a moment), and
there’s no base wiring you need to touch beyond the Provider superclass.
ProviderScope
Section titled “ ProviderScope”produce is an extension function on ProviderScope, which gives the
provider access to the rest of the bus. Like CoordinatorScope and
NodeScope, it exposes the usual capabilities — plus nested requests so
providers can depend on each other’s data.
Capabilities:
| Category | Method | Returns |
|---|---|---|
| State observation | ListenFor<O>() | SharedFlow<O> |
| Reaction observation | ReactTo<A>() | SharedFlow<A> |
| State broadcast | Broadcast(data) | suspend |
| Reaction trigger | Trigger(event) | suspend |
| Nested request | Request(impulse) | Flow<DataState<Need>> |
ProviderScope implements CoroutineScope by delegation, so launch,
async, and coroutineScope are available directly inside produce. The
backing scope is managed by the framework — it’s canceled when the
provider is disposed, so any work launched through it stops automatically.
Nested requests let one provider depend on another provider’s data:
override fun ProviderScope.produce(impulse: FetchSecureData): Flow<SecurePayload> = flow { val token = Request(FetchCachedToken()) .filterIsInstance<DataState.Success<AuthToken>>() .first() .data emit(api.fetch(impulse.endpoint, token.accessToken))}Most providers don’t need bus access at all — they take their dependencies
from DI and return a flow. Reach for ProviderScope’s capabilities only
when a provider genuinely needs to compose with other bus traffic.
@SynapseProvider and registration
Section titled “ @SynapseProvider and registration”Every concrete provider is annotated @SynapseProvider. A KSP processor
walks every annotated class at compile time, validates that:
- The class extends
Provider<I, Need>directly - The type arguments are concrete (no raw types, no type variables)
- No two providers claim the same
DataImpulsetype
…and generates a Hilt @Module in SingletonComponent that assembles a
ProviderRegistry. Each provider is injected as a javax.inject.Provider<T>
so Hilt can construct fresh instances on demand, and
ProviderRegistry.Builder wires each one to its impulse type:
// Generated — do not edit@Module@InstallIn(SingletonComponent::class)object SynapseProviderModule_App { @Provides @Singleton fun provideRegistry( productDetailProvider: javax.inject.Provider<ProductDetailProvider>, cartProvider: javax.inject.Provider<CartProvider>, // … one entry per @SynapseProvider class ): ProviderRegistry = ProviderRegistry.Builder() .register( impulseType = FetchProduct::class, needClass = Product::class.java, factory = ProviderFactory { productDetailProvider.get() }, ) .register( impulseType = ObserveCart::class, needClass = List::class.java as Class<List<CartItemWithProduct>>, factory = ProviderFactory { cartProvider.get() }, ) // … .build()}The SwitchBoard takes the ProviderRegistry as a constructor dependency,
so once Hilt has built the registry, every Request(FetchProduct(id)) call
flows through it to the right factory. You never touch ProviderRegistry
directly except in tests.
A missing provider is a compile error — KSP refuses to generate the module
if a @SynapseProvider class is malformed. In the unusual case where a
DataImpulse is dispatched at runtime with no registered handler (e.g.,
because the provider lives in a module that wasn’t included in this build),
the request surfaces NoProviderException instead of silently hanging.
The DataState lifecycle
Section titled “The DataState lifecycle”Callers don’t see raw values — every provider’s output is wrapped in
DataState<Need>, a sealed type that captures the full fetch lifecycle:
| State | When |
|---|---|
Idle | No request has been made yet. Default before activation — not emitted by providers themselves. |
Loading | Before produce emits its first value. |
Success(data) | Each value the provider emits. |
Error(cause, staleData) | Any uncaught exception. Carries the last successful value if one existed. |
The sequence is always the same:
Streaming providers stay in Success as long as they keep emitting, drop
to Error if the underlying flow throws (carrying the last successful
value forward as staleData), and recover back to Success on the next
successful emission. One-shot providers go Loading → Success or
Loading → Error and then complete.
Consumers typically read the state with dataOrNull or a when branch:
Node(initialState = ProductScreenState()) { Request(FetchProduct(productId)) { dataState -> update { it.copy(productState = dataState) } }
when (val s = state.productState) { DataState.Idle, DataState.Loading -> Spinner() is DataState.Success -> ProductView(s.data) is DataState.Error -> ErrorBanner(s.cause, stale = s.staleData) }}Deduplication and lifecycle
Section titled “Deduplication and lifecycle”When Request(FetchProduct("sku-42")) arrives at the bus, the
ProviderManager checks whether an active job already exists for that
impulse, keyed by structural equality. If one does, the existing flow is
returned — no new work is started, no new Provider instance is
constructed, and both callers see the same emissions.
If no matching job exists, the manager:
- Resolves the registered
ProviderFactoryfrom the registry. - Creates a fresh
Providerinstance via the factory. - Waits until something actually subscribes, then emits
Loading. - Runs
produce, collecting its flow, and emits each value asSuccess. - On completion (success or error), removes the job from the active table so a future request starts cleanly.
The “wait until something subscribes” step matters: a Request call does
no work until its returned flow is actually collected. And because the
underlying flow retains its most recent DataState, a subscriber that
arrives after the first emission still sees the latest value immediately.
The provider instance itself is throwaway. It’s created when the job starts and garbage-collected when the job ends. There’s no long-lived provider you can hold a reference to — that’s intentional. Any state that needs to outlive a single request belongs in the injected dependencies (a cache, a DAO, a repository), not in the provider.
The full internal path from a Request call to the emitted
Flow<DataState<Need>> — including dedup, concurrency gating, factory
resolution, the first-subscriber wait, collection, and cleanup — looks
like this:
Most of that machinery is invisible to callers — the diagram exists so
that when you do need to reason about dedup, cleanup ordering, or the
“last successful value” stash on DataState.Error, the mental model
matches what’s actually happening.
Freshness windows
Section titled “Freshness windows”A one-shot provider’s job is removed from activeJobs the moment its
last collector leaves. The next Request(FetchProduct(id))
runs produce from scratch, even if the data was fetched thirty seconds
ago — on a screen that remounts often (back navigation, tab switching,
bottom-nav), the same endpoint fires on every visit.
Bus dedup only covers overlapping requests — two callers firing the
same impulse while the fetch is in flight share one job. Once that job
completes, the next Request starts from scratch; a
Provider never caches its own results. For data that’s
safe to reuse for a bounded window — a user profile, a feature config,
a catalog page — use FreshnessRegistry to mint a stable token for
that window and key a cache by it.
This is an opt-in convention, not a framework concern. For cheap, idempotent, infrequent requests, skip it; the indirection earns nothing. Reach for it when remount traffic would otherwise refire the same fetch.
FreshnessKey and FreshnessRegistry
Section titled “FreshnessKey and FreshnessRegistry”A FreshnessKey binds a logical cache entry to how long it stays fresh:
data class ProductBucket(val productId: String) : FreshnessKey { override val duration: Duration = 5.minutes}Hold a FreshnessRegistry<ProductBucket> as a singleton. Every call to
get(key) within the same 5-minute window returns the identical
FreshnessToken; the next call after the window opens a new bucket and
issues a new token.
@Module@InstallIn(SingletonComponent::class)object FreshnessModule { @Provides @Singleton fun productFreshness() = FreshnessRegistry<ProductBucket>(maxSize = 500)}maxSize caps the registry for user-generated key spaces. When the map
is full, inserting a new key evicts the bucket nearest to expiring — a
min-heap keeps that lookup O(log n) regardless of how mixed the
durations are.
Keying a cache by token
Section titled “Keying a cache by token”The token is deliberately opaque. Its only contract is stable within a window, different across windows, which makes it a safe component of a cache key in whatever storage the provider sits in front of:
@SynapseProviderclass ProductDetailProvider @Inject constructor( private val api: MarketApi, private val freshness: FreshnessRegistry<ProductBucket>, private val cache: ProductCache,) : Provider<FetchProduct, Product>() { override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow { val token = freshness.get(ProductBucket(impulse.productId)) emit(cache.getOrPut(impulse.productId, token) { api.getProduct(impulse.productId) }) }}Where ProductCache is a thin memory map:
@Singletonclass ProductCache @Inject constructor() { private val entries = ConcurrentHashMap<Pair<String, FreshnessToken>, Product>() inline fun getOrPut( id: String, token: FreshnessToken, load: () -> Product, ): Product = entries.getOrPut(id to token, load) fun put(id: String, token: FreshnessToken, product: Product) { entries[id to token] = product }}Cache hits and misses ride on FreshnessToken equality — a primitive
Long compare, since FreshnessToken is a @JvmInline value class.
Every new bucket gets a Long the registry has never used before, so
keys that were equal moments ago compare unequal the instant the window
rolls over. Within the window every fetch hits the cache; once the
window rolls over, the (id, token) pair misses, and api.getProduct
runs once — its result then serves the next window.
Bus dedup compounds this: if three callers simultaneously request the
same product on a cold cache, they share a single produce run and only
one api.getProduct call is made.
Choosing a duration
Section titled “Choosing a duration”Pick the duration from how stale the user will tolerate the data being, not from how expensive the fetch is:
- Seconds for things that should feel live (notification counts, presence indicators).
- Minutes for things that are effectively static within a session (catalog entries, profile).
- Hours for things that only change on explicit user action (feature flags, saved preferences).
Calls fired more often than the duration coalesce onto the cache; calls fired less often always refetch, so a long duration on an infrequent request buys nothing.
One registry per key type, or one per scope?
Section titled “One registry per key type, or one per scope?”Per-type is the right default. Each registry is cheap — a HashMap, a
min-heap, and a counter — and splitting gives you an independent
maxSize budget, targeted clear(), and type-safe DI. Mixing unrelated
keys in a single pool lets a burst of product lookups evict your
feature-flag buckets, because the heap evicts by expiry and is
indifferent to which concern each key serves.
Merge registries when a set of key types share both a duration profile
and an invalidation trigger — everything user-scoped that should drop on
sign-out, say. Define a sealed parent and let FreshnessRegistry take
the parent type:
sealed interface UserScopedKey : FreshnessKey { data class Profile(val userId: String) : UserScopedKey { override val duration = 5.minutes } data class Cart(val userId: String) : UserScopedKey { override val duration = 30.seconds } data class Orders(val userId: String) : UserScopedKey { override val duration = 2.minutes }}
// One registry, one clear() call on sign-out.FreshnessRegistry<UserScopedKey>(maxSize = 1_000)Past that, an app-wide registry tends to blur eviction and invalidation semantics without saving much.
Escape hatches
Section titled “Escape hatches”A cache serves the same value for the entire window — wrong behavior when the user asks for fresh data, or when something upstream just changed and you know the cache is stale. Three hatches cover the common cases.
Per-request override. Add a refresh flag to the impulse and let
the provider skip the cache read when it’s set:
data class FetchProduct( val productId: String, val refresh: Boolean = false,) : DataImpulse<Product>()
override fun ProviderScope.produce(impulse: FetchProduct): Flow<Product> = flow { val token = freshness.get(ProductBucket(impulse.productId)) val product = if (impulse.refresh) { api.getProduct(impulse.productId).also { cache.put(impulse.productId, token, it) } } else { cache.getOrPut(impulse.productId, token) { api.getProduct(impulse.productId) } } emit(product)}Because refresh is part of the impulse, FetchProduct(id, refresh = true)
is structurally not equal to a passive FetchProduct(id) — the two
won’t dedup, which is the behavior you want for pull-to-refresh: the
user’s explicit refresh shouldn’t piggyback on an in-flight background
fetch. The refresh result lands in the cache under the current token,
so the next passive caller reads it.
Targeted invalidation. After a write succeeds, the cached entry is
stale but the next reader can refetch. Expose an invalidate hook on the
cache and call it from wherever the mutation is handled — a Reaction
handler for ProductUpdated, say:
@Singletonclass ProductCache @Inject constructor() { private val entries = ConcurrentHashMap<Pair<String, FreshnessToken>, Product>() fun invalidate(productId: String) = entries.keys.removeAll { it.first == productId } // getOrPut, put as before}The next Request(FetchProduct(id)) misses the cache,
refetches, and the refreshed value serves readers for the rest of the
window.
Wholesale clear. FreshnessRegistry.clear() drops every bucket
regardless of expiry, and clearing the downstream cache alongside it
resets every entry. Appropriate for sign-out, tenant switch, or any
transition that should invalidate every cached view at once — heavy for
anything smaller.
Testing with an injected clock
Section titled “Testing with an injected clock”FreshnessRegistry’s first constructor parameter is a
TimeSource.WithComparableMarks, defaulting to TimeSource.Monotonic.
Hand it a kotlin.time.TestTimeSource in tests and advance the clock
manually to exercise window boundaries without wall-clock sleeps:
@Testfun `token rotates once the window elapses`() { val time = TestTimeSource() val registry = FreshnessRegistry<ProductBucket>(timeSource = time) val key = ProductBucket("sku-42")
val first = registry.get(key) time += 4.minutes assertEquals(first, registry.get(key)) // same window
time += 2.minutes assertNotEquals(first, registry.get(key)) // rolled over}For provider-level tests that go through SynapseTestRule, construct
the FreshnessRegistry in the test’s Hilt replacement module with a
TestTimeSource and drive it the same way — the provider sees the same
token semantics production code does, just on a clock you control.
Testing
Section titled “Testing”The arch-test module ships a SynapseTestRule that stands up a real
SwitchBoard for a test and lets you stub providers inline via a small
DSL. You don’t hand-build a ProviderRegistry or uninstall the generated
Hilt module — the rule does it for you:
@get:Ruleval synapse = SynapseTestRule { // Single-value provider — wraps the returned value in a one-shot flow provide<List<Address>, FetchAddresses> { testAddresses }
// Emits nothing — returning null produces an empty flow provide<AuthToken, FetchCachedToken> { null }
// Streaming / multi-emission provider provideFlow<List<Product>, FetchProducts> { impulse -> flow { emit(cachedProducts) delay(100) emit(freshProducts) } }}Inside the test, synapse.switchBoard is a fully wired switchboard with
those providers registered. For Compose screens, hand it to the
composition via LocalSwitchBoard:
composeTestRule.setContent { CompositionLocalProvider(LocalSwitchBoard provides synapse.switchBoard) { CreateContext(appContext) { CheckoutScreen() } }}Only the impulses the test actually exercises need to be registered. The
Testing page covers the full rule surface — capture
helpers, coordinator setup, and runTest integration.
- Interceptors — cross-cutting concerns at every channel