Skip to content

Conversation

marcog83
Copy link

@marcog83 marcog83 commented Aug 5, 2025

Replace maxChunks parameter with flexible reducer function that delegates data aggregation to consumer code. This provides full control over how streamed chunks are combined into the final data structure.

Add support for custom placeholderData that works seamlessly with the reducer function, allowing initialization of complex data types beyond simple arrays.

#9065

BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery. Use a custom reducer function to control data aggregation behavior instead.

Summary by CodeRabbit

  • New Features

    • Streamed queries now support a reducer-based API for combining chunks into final data.
    • Optional initialValue lets you seed data before the first chunk; required when using a custom reducer.
    • Default behavior continues to append chunks to an array for backward-friendly results.
  • Documentation

    • Updated guidance to describe the reducer-based model and initialValue usage.
    • Removed references to the deprecated maxChunks option and its behavior.

Replace maxChunks parameter with flexible reducer function that delegates
data aggregation to consumer code. This provides full control over how
streamed chunks are combined into the final data structure.

Add support for custom placeholderData that works seamlessly with the
reducer function, allowing initialization of complex data types beyond
simple arrays.

TanStack#9065

BREAKING CHANGE: The maxChunks parameter has been removed from streamedQuery.
Use a custom reducer function to control data aggregation behavior instead.
@github-actions github-actions bot added documentation Improvements or additions to documentation package: query-core labels Aug 5, 2025
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the first time reducer is invoked, acc is an empty array, because placeholderData is not specified.
it seems a bit strange to me. but i'm not sure how to solve it.
i can apply the same logic as in native reduce...

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/reduce#initialvalue.

@@ -1,6 +1,15 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'

type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this type just for clarity. should i keep it inline/move to a dedicated file or in types?
About type safety i tried to split the type in 2 like below, but i feel it is over-engineering.

type BaseStreamedQueryParams<TQueryFnData, TData> = {
   queryFn: (
    context: QueryFunctionContext<TQueryKey>,
  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
  refetchMode?: 'append' | 'reset' | 'replace'
}
type SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {

  reducer?: never;
  placeholderData?: never;
}

type ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {
  
  reducer?: (acc: TData, chunk: TQueryFnData) => TData
  placeholderData?: TData
}
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
  | SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
  | ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>

export function streamedQuery<
  TQueryFnData = unknown,
  TData = Array<TQueryFnData>,
  TQueryKey extends QueryKey = QueryKey,
>(params: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {

  const reducer =
    'reducer' in params && typeof params.reducer === 'function'
      ? params.reducer
      : (items: TData, chunk: TQueryFnData) =>
          addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData;

  const placeholderData =
    'placeholderData' in params ? params.placeholderData : ([] as TData);
// ...

maxChunks?: number
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
reducer = (items, chunk) => addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData,
placeholderData = [] as TData,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i introduced this placeholderData prop to mimic native reduce with an initialValue.
My first approach was to use the existing placeholderData parameter from the Observer, but even though I can access it through the query.options object, TypeScript throws an error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below you can find an implementation about how to mimic Array.prototype.reduce behavior where the initial value is optional, and if not provided, the first element becomes the accumulator. I didn't applied it to the PR because i think it better to have your point of of view about it.

// ...existing code...
return async (context) => {
  const query = context.client
    .getQueryCache()
    .find({ queryKey: context.queryKey, exact: true })
  const isRefetch = !!query && query.state.data !== undefined
  if (isRefetch && params.refetchMode === 'reset') {
    query.setState({
      status: 'pending',
      data: undefined,
      error: null,
      fetchStatus: 'fetching',
    })
  }

  const hasInitialValue = 'placeholderData' in params;
  let result: TData;
  let isFirstChunk = true;

  const stream = await params.queryFn(context)

  for await (const chunk of stream) {
    if (context.signal.aborted) {
      break
    }
    
    if (isFirstChunk) {
      if (hasInitialValue) {
        // If we have placeholderData, use it as initial accumulator
        result = reducer(placeholderData, chunk);
      } else {
        // If no placeholderData, first chunk becomes the accumulator
        result = chunk as unknown as TData;
      }
      isFirstChunk = false;
    } else {
      result = reducer(result, chunk);
    }
    
    // don't append to the cache directly when replace-refetching
    if (!isRefetch || params.refetchMode !== 'replace') {
      context.client.setQueryData<TData>(
        context.queryKey,
        (prev) => {
          if (prev === undefined) {
            return result;
          }
          return hasInitialValue ? reducer(prev, chunk) : result;
        }
      )
    }
  }

  // Handle empty stream case
  if (isFirstChunk) {
    if (hasInitialValue) {
      result = placeholderData;
    } else {
      throw new Error('Reduce of empty stream with no initial value');
    }
  }

  // finalize result: replace-refetching needs to write to the cache
  if (isRefetch && params.refetchMode === 'replace' && !context.signal.aborted) {
    context.client.setQueryData<TData>(context.queryKey, result)
  }

  return context.client.getQueryData(context.queryKey)!
}

The key changes:

  • Initial value logic: Only use placeholderData if it's explicitly provided
  • First chunk handling: If no placeholderData the first chunk becomes the initial accumulator
  • Empty stream error: Throw an error if the stream is empty and no placeholderData is provided

Copy link
Collaborator

@TkDodo TkDodo Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we’d want here is a mandatory initialValue to the reducer if you pass a custom reducer. The term placeholderData is a bit overloaded because here, it does something different than on useQuery itself so I wouldn’t name it like that.

It could be two separate params, like reducer and initialValue, but we’d have to make the types so that initialValue is required when you pass a reducer. That’s possible but usually needs overloads or conditional types. Or, we could also just use a tuple maybe:

reducer: [initialValue: TData, (accumulator: TData, chunk: TQueryFnData) => TData]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great.thanks for the feedback. I'm on vacation right now,but next week I'll back. I'm going to provide a prop initial value, and I'll play with TS in order to make it mandatory when reducer is specified.

Copy link

nx-cloud bot commented Aug 11, 2025

View your CI Pipeline Execution ↗ for commit 890e373

Command Status Duration Result
nx affected --targets=test:sherif,test:knip,tes... ✅ Succeeded 24s View ↗

☁️ Nx Cloud last updated this comment at 2025-08-11 12:32:14 UTC

autofix-ci bot and others added 4 commits August 11, 2025 12:31
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
…treamedQuery

Add type safety by making initialValue mandatory when providing a custom
reducer function. This prevents runtime errors and ensures proper data
initialization for custom data structures beyond simple arrays.

Use conditional types to enforce the relationship between reducer and
initialValue parameters, maintaining backward compatibility for simple
array-based streaming while requiring explicit initialization for
custom reducers.

BREAKING CHANGE: When using a custom reducer function with streamedQuery,
the initialValue parameter is now required and must be provided.
Copy link

coderabbitai bot commented Aug 27, 2025

Walkthrough

Shifts streamedQuery from a maxChunks-based array model to a reducer-driven accumulation API with an optional initialValue. Updates function signature, types, caching logic, and tests. Documentation reflects the new reducer and initialValue options; prior maxChunks guidance is removed.

Changes

Cohort / File(s) Summary
Docs: streamedQuery API
docs/reference/streamedQuery.md
Replaced maxChunks with reducer and initialValue options; updated narrative to describe reducer-based accumulation and default append behavior; removed eviction semantics.
Core Implementation: reducer-based streaming
packages/query-core/src/streamedQuery.ts
Refactored API to support TData accumulator via reducer(initial, chunk) with default append reducer and empty-array initialValue; removed maxChunks logic; introduced parameter union types (simple vs reducible); updated return type to TData and cache updates to use reducer.
Tests: updated for new API
packages/query-core/src/__tests__/streamedQuery.test.tsx
Rewrote tests to use custom reducer and initialValue; adjusted generator output, timing, expectations, and test names to reflect accumulator object instead of arrays; maintained refetch flow assertions with new data shape.

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor Caller
  participant SQ as streamedQuery(params)
  participant QF as queryFn (async generator)
  participant Red as reducer(acc, chunk)
  participant Cache as Query Cache

  Caller->>SQ: invoke streamedQuery({ queryFn, reducer?, initialValue? })
  SQ->>Cache: seed in-memory acc = initialValue (default [])
  SQ->>QF: start generator
  loop For each chunk
    QF-->>SQ: yield chunk
    SQ->>Red: reduce(acc, chunk)
    Red-->>SQ: newAcc
    SQ->>Cache: setQueryData(prev => reduce(prev ?? initialValue, chunk))
  end
  QF-->>SQ: complete
  SQ->>Cache: finalize with accumulated TData
  SQ-->>Caller: returns QueryFunction<TData>
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

I hop through streams where chunks once grew,
Now fold by fold, the data’s new—
A reducer’s dance, initial cheer,
We gather truth as chunks appear.
No max to count, just tidy art,
Combining bites into one smart heart. 🐇✨

Tip

🔌 Remote MCP (Model Context Protocol) integration is now available!

Pro plan users can now connect to remote MCP servers from the Integrations page. Connect with popular remote MCPs such as Notion and Linear to add more context to your reviews and chats.

✨ Finishing Touches
🧪 Generate unit tests
  • Create PR with unit tests
  • Post copyable unit tests in a comment

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

CodeRabbit Commands (Invoked using PR/Issue comments)

Type @coderabbitai help to get the list of available commands.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Status, Documentation and Community

  • Visit our Status Page to check the current availability of CodeRabbit.
  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

♻️ Duplicate comments (2)
packages/query-core/src/streamedQuery.ts (1)

42-49: Type-safety hole: callers can specify TData without a reducer. Add overloads to prevent mismatches.

As written, a caller can do streamedQuery<number, Record<string, boolean>>({ queryFn }) with no reducer. Types accept it, but runtime returns an Array while types claim a Record. Add overloads so TData is only specifiable when reducer is provided; otherwise TData is fixed to Array.

+// Overloads to prevent specifying a custom TData without a reducer
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TQueryKey extends QueryKey = QueryKey,
+>(
+  params: SimpleStreamedQueryParams<TQueryFnData, TQueryKey>
+): QueryFunction<Array<TQueryFnData>, TQueryKey>
+
+export function streamedQuery<
+  TQueryFnData = unknown,
+  TData = Array<TQueryFnData>,
+  TQueryKey extends QueryKey = QueryKey,
+>(
+  params: ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
+): QueryFunction<TData, TQueryKey>
+
 export function streamedQuery<
   TQueryFnData = unknown,
   TData = Array<TQueryFnData>,
   TQueryKey extends QueryKey = QueryKey,
 >({
   queryFn,
   refetchMode = 'reset',
   reducer = (items, chunk) => addToEnd(items as Array<TQueryFnData>, chunk) as TData,
   initialValue = [] as TData,
-}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
-  TData,
-  TQueryKey
-> {
+}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {

Optional: move the type aliases next to this for discoverability.

docs/reference/streamedQuery.md (1)

36-44: Docs/code mismatch: use “initialValue” instead of “placeholderData”, and document requirement when a reducer is provided.

Code uses initialValue and makes it mandatory with a reducer. Update docs accordingly.

-- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
-  - Optional
-  - A function to reduce the streamed chunks into the final data.
-  - Defaults to a function that appends chunks to the end of the array.
-- `placeholderData?: TData = TQueryFnData`
-  - Optional
-  - Defines the initial data to be used while the first chunk is being fetched.
-  - Defaults to an empty array.
+- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
+  - Optional
+  - Reduces streamed chunks into the accumulator.
+  - Defaults to appending chunks to the end of an Array<TQueryFnData>.
+- `initialValue?: TData`
+  - Optional without a reducer; required when a reducer is provided.
+  - The initial accumulator used before the first chunk arrives.
+  - Defaults to an empty array when no reducer is provided.
🧹 Nitpick comments (6)
.vscode/query.code-workspace (1)

28-29: Nit: remove stray blank lines to keep the workspace file minimal.

No functional impact, just trims noise in diffs.

-		
 		"window.zoomLevel": 2,
 		"typescript.format.enable": false,
@@
-		
 	}
 }

Also applies to: 39-41

packages/query-core/src/streamedQuery.ts (2)

26-35: JSDoc still implies Array-only data; clarify that TData can be any accumulator.

Update wording so it doesn’t promise “Data will be an Array…”. With custom reducers, TData is arbitrary.

- * Data will be an Array of all the chunks received.
+ * Data will be accumulated into TData. By default, chunks are appended to an Array<TQueryFnData>.

79-81: Aborted reset-refetch before first chunk can return undefined despite non-null assertion.

If refetchMode==='reset' and the stream is aborted before yielding a chunk, getQueryData(...) is undefined but asserted with !. Either (a) widen return to TData | undefined, or (b) explicitly return query?.state.data to reflect cache state (still undefined), or (c) guard with an invariant/error.

Minimal change:

-    return context.client.getQueryData(context.queryKey)!
+    return context.client.getQueryData(context.queryKey) as TData

Or, if you can accept a type change:

-}: StreamedQueryParams<...>): QueryFunction<TData, TQueryKey> {
+}: StreamedQueryParams<...>): QueryFunction<TData | undefined, TQueryKey> {
...
-    return context.client.getQueryData(context.queryKey)!
+    return context.client.getQueryData(context.queryKey)

Please confirm expected behavior for “reset + aborted before first chunk”.

Also applies to: 91-91

packages/query-core/src/__tests__/streamedQuery.test.tsx (1)

131-181: Consider adding a test for “replace + abort” to lock in cache semantics.

Edge case: refetchMode 'replace' + abort mid-stream should retain old data and never write partials. A focused test would prevent regressions.

I can add a vitest like:

test('should keep old data on replace when aborted before completion', async () => {
  // arrange: stream 2 items, then abort before the 2nd
  // expect: data stays at old [0,1], never becomes [100] mid-flight, ends with [0,1]
})

Also applies to: 240-296

docs/reference/streamedQuery.md (2)

25-29: Typing in docs: queryFn should return AsyncIterable, not TData.

Align with implementation and types.

-- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+- `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`

29-35: Typo: missing closing quote in refetchMode union.

Small formatting fix.

-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between d25c0da and 42b9908.

📒 Files selected for processing (4)
  • .vscode/query.code-workspace (1 hunks)
  • docs/reference/streamedQuery.md (1 hunks)
  • packages/query-core/src/__tests__/streamedQuery.test.tsx (3 hunks)
  • packages/query-core/src/streamedQuery.ts (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (1)
packages/query-core/src/streamedQuery.ts (1)
  • streamedQuery (40-93)
packages/query-core/src/streamedQuery.ts (3)
packages/query-core/src/types.ts (3)
  • QueryKey (53-61)
  • QueryFunctionContext (138-165)
  • QueryFunction (96-100)
packages/query-core/src/index.ts (1)
  • streamedQuery (37-37)
packages/query-core/src/utils.ts (1)
  • addToEnd (392-395)
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...

(QB_NEW_EN)

🔇 Additional comments (2)
packages/query-core/src/__tests__/streamedQuery.test.tsx (2)

353-385: LGTM: reducer mode is exercised with a non-array accumulator.

Good coverage of custom reducer semantics and final shape.


390-429: LGTM: initialValue path works and merges correctly.

Nice example of seeding the accumulator.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
docs/reference/streamedQuery.md (4)

25-28: Fix chunk type in queryFn signature

The async iterable yields chunk type, not the accumulator type. Use TQueryFnData here to avoid type confusion with TData.

- - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TData>>`
+ - `queryFn: (context: QueryFunctionContext) => Promise<AsyncIterable<TQueryFnData>>`

29-35: Close the missing quote in refetchMode options

Small typo: missing closing quote around 'replace'.

-- `refetchMode?: 'append' | 'reset' | 'replace`
+- `refetchMode?: 'append' | 'reset' | 'replace'`

40-45: Correct initialValue typing and requirements

  • “initialValue?: TData = TQueryFnData” reads like a type default on a value; remove “= TQueryFnData”.
  • Clarify when initialValue is required and what the default is.
  • Grammar: add “a” before “custom reducer”.
-`initialValue?: TData = TQueryFnData`
-  - Optional
-  - Defines the initial data to be used while the first chunk is being fetched. 
-  - It is mandatory when custom `reducer` is provided.
-  - Defaults to an empty array.
+`initialValue?: TData`
+  - Optional
+  - Initial accumulator value used while the first chunk is being fetched.
+  - Required when a custom `reducer` is provided or when `TData` is not an array.
+  - Default: `[]` when `TData` is an array; otherwise no default.

6-6: Update overview to reflect reducer-based accumulation (not always an array)

This sentence contradicts the new reducer-based API. By default it’s an array, but with a custom reducer it can be any shape.

-`streamedQuery` is a helper function to create a query function that streams data from an AsyncIterable. Data will be an Array of all the chunks received. The query will be in a `pending` state until the first chunk of data is received, but will go to `success` after that. The query will stay in fetchStatus `fetching` until the stream ends.
+`streamedQuery` helps create a query function that streams data from an AsyncIterable. By default, data accumulates into an array of all received chunks; with a custom `reducer` (and `initialValue`), you can accumulate into any shape. The query will be `pending` until the first chunk arrives, then `success` while fetchStatus remains `fetching` until the stream ends.
🧹 Nitpick comments (2)
docs/reference/streamedQuery.md (2)

36-45: Tighten wording and fix minor grammar nits

  • “the streamed chunks into the final data” → fine, but remove extra spaces.
  • “It is mandatory when custom reducer is provided.” → add article “a”.
-  - A function to reduce the streamed chunks into the final data.
+  - A function that reduces streamed chunks into the final data.
@@
-  - It is mandatory when custom `reducer` is provided.
+  - It is mandatory when a custom `reducer` is provided.

45-45: Consider adding a minimal custom reducer example

A short example will help users migrate from maxChunks and understand TQueryFnData vs TData.

+#### Example: Aggregating into a non-array accumulator
+```ts
+import { experimental_streamedQuery as streamedQuery } from '@tanstack/react-query'
+
+type Chunk = { id: string; value: number }
+type Acc = Map<string, number>
+
+const query = queryOptions({
+  queryKey: ['stats'],
+  queryFn: streamedQuery<Chunk, Acc>({
+    queryFn: fetchChunkStream, // () => Promise<AsyncIterable<Chunk>>
+    initialValue: new Map(),
+    reducer: (acc, chunk) => acc.set(chunk.id, (acc.get(chunk.id) ?? 0) + chunk.value),
+    refetchMode: 'replace',
+  }),
+})
+```

Would you like me to add a migration note showing how to replicate the old maxChunks behavior with a reducer?

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 42b9908 and 5f68a62.

📒 Files selected for processing (1)
  • docs/reference/streamedQuery.md (1 hunks)
🧰 Additional context used
🪛 LanguageTool
docs/reference/streamedQuery.md

[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...

(QB_NEW_EN)


[grammar] ~43-~43: There might be a mistake here.
Context: ... is being fetched. - It is mandatory when custom reducer is provided. - Defau...

(QB_NEW_EN)


[grammar] ~43-~43: There might be a mistake here.
Context: ...atory when custom reducer is provided. - Defaults to an empty array.

(QB_NEW_EN)

Comment on lines +36 to +40
- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
- Optional
- The maximum number of chunks to keep in the cache.
- Defaults to `undefined`, meaning all chunks will be kept.
- If `undefined` or `0`, the number of chunks is unlimited.
- If the number of chunks exceeds this number, the oldest chunk will be removed.
- A function to reduce the streamed chunks into the final data.
- Defaults to a function that appends chunks to the end of the array.
- `initialValue?: TData = TQueryFnData`
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Clarify reducer defaults and applicability beyond arrays

  • The default “append” behavior only makes sense when TData is an array.
  • Explicitly state that when TData is not an array, a custom reducer is required.
  • Minor grammar tweak (“chunks” vs “chunk list” not needed).
-`reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
-  - Optional
-  - A function to reduce the streamed chunks into the final data.
-  - Defaults to a function that appends chunks to the end of the array.
+`reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
+  - Optional
+  - Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`).
+  - Default: appends each chunk to the end of the accumulator when `TData` is an array.
+  - If `TData` is not an array, you must provide a custom `reducer`.
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
- Optional
- The maximum number of chunks to keep in the cache.
- Defaults to `undefined`, meaning all chunks will be kept.
- If `undefined` or `0`, the number of chunks is unlimited.
- If the number of chunks exceeds this number, the oldest chunk will be removed.
- A function to reduce the streamed chunks into the final data.
- Defaults to a function that appends chunks to the end of the array.
- `initialValue?: TData = TQueryFnData`
`reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
- Optional
- Reduces streamed chunks (`TQueryFnData`) into the final data shape (`TData`).
- Default: appends each chunk to the end of the accumulator when `TData` is an array.
- If `TData` is not an array, you must provide a custom `reducer`.
`initialValue?: TData = TQueryFnData`
🧰 Tools
🪛 LanguageTool

[grammar] ~38-~38: There might be a mistake here.
Context: ...the streamed chunks into the final data. - Defaults to a function that appends chun...

(QB_NEW_EN)

🤖 Prompt for AI Agents
In docs/reference/streamedQuery.md around lines 36 to 40, the docs state the
default reducer "appends chunks to the end of the array" but do not clarify that
this default only makes sense when TData is an array; update the text to say the
default reducer appends each incoming chunk to the end of the accumulator only
when TData is an array (or TData extends Array<TQueryFnData>), and explicitly
note that if TData is not an array a custom reducer must be provided; also
adjust the initialValue line to clarify expected type (e.g., initialValue?:
TData — default is [] for array TData) and keep the wording simple (use "chunks"
not "chunk list") as suggested.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
documentation Improvements or additions to documentation package: query-core
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants