Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions docs/reference/streamedQuery.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ const query = queryOptions({
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
- When set to `'append'`, data will be appended to existing data.
- When set to `'replace'`, all data will be written to the cache once the stream ends.
- `maxChunks?: number`
- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
- Optional
- The maximum number of chunks to keep in the cache.
- Defaults to `undefined`, meaning all chunks will be kept.
- If `undefined` or `0`, the number of chunks is unlimited.
- If the number of chunks exceeds this number, the oldest chunk will be removed.
- A function to reduce the streamed chunks into the final data.
- Defaults to a function that appends chunks to the end of the array.
- `placeholderData?: TData = TQueryFnData`
- Optional
- Defines the initial data to be used while the first chunk is being fetched.
- Defaults to an empty array.
110 changes: 31 additions & 79 deletions packages/query-core/src/__tests__/streamedQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,17 @@ describe('streamedQuery', () => {
unsubscribe()
})

test('should support maxChunks', async () => {
test('should support custom reducer', async () => {
const key = queryKey()

const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
[chunk]: true,
}),
}),
})

Expand All @@ -368,41 +372,34 @@ describe('streamedQuery', () => {
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)
await vi.advanceTimersByTimeAsync(100)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
data: {
0: true,
1: true,
},
})

unsubscribe()
})

test('maxChunks with append refetch', async () => {
test('should support custom reducer with placeholderData', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
refetchMode: 'append',
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
[chunk]: true,
}),
placeholderData: {
10: true,
11: true,
},
}),
})

Expand All @@ -414,62 +411,17 @@ describe('streamedQuery', () => {
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
})

void observer.refetch()

await vi.advanceTimersByTimeAsync(10)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [1, 2],
})

await vi.advanceTimersByTimeAsync(40)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [2, 0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)
await vi.advanceTimersByTimeAsync(100)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
data: {
10: true,
11: true,
0: true,
1: true,
},
})

unsubscribe()
Expand Down
48 changes: 27 additions & 21 deletions packages/query-core/src/streamedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,15 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'

type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this type just for clarity. Should I keep it inline, move it to a dedicated file, or put it in types?
Regarding type safety, I tried to split the type in two as shown below, but I feel it is over-engineering.

type BaseStreamedQueryParams<TQueryFnData, TData> = {
   queryFn: (
    context: QueryFunctionContext<TQueryKey>,
  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
  refetchMode?: 'append' | 'reset' | 'replace'
}
type SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {

  reducer?: never;
  placeholderData?: never;
}

type ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = BaseStreamedQueryParams<TQueryFnData, TData> & {
  
  reducer?: (acc: TData, chunk: TQueryFnData) => TData
  placeholderData?: TData
}
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
  | SimpleStreamedQueryParams<TQueryFnData, TData, TQueryKey>
  | ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>

export function streamedQuery<
  TQueryFnData = unknown,
  TData = Array<TQueryFnData>,
  TQueryKey extends QueryKey = QueryKey,
>(params: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<TData, TQueryKey> {

  const reducer =
    'reducer' in params && typeof params.reducer === 'function'
      ? params.reducer
      : (items: TData, chunk: TQueryFnData) =>
          addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData;

  const placeholderData =
    'placeholderData' in params ? params.placeholderData : ([] as TData);
// ...

queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset' | 'replace'
reducer?: (acc: TData, chunk: TQueryFnData) => TData
placeholderData?: TData
}

/**
* This is a helper function to create a query function that streams data from an AsyncIterable.
* Data will be an Array of all the chunks received.
Expand All @@ -11,31 +20,30 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
* Set to `'append'` to append new data to the existing data.
* Set to `'replace'` to write all data to the cache once the stream ends.
* @param maxChunks - The maximum number of chunks to keep in the cache.
* Defaults to `undefined`, meaning all chunks will be kept.
* If `undefined` or `0`, the number of chunks is unlimited.
* If the number of chunks exceeds this number, the oldest chunk will be removed.
* @param reducer - A function to reduce the streamed chunks into the final data.
* Defaults to a function that appends chunks to the end of the array.
* @param placeholderData - Initial data to be used while the first chunk is being fetched.
* Defaults to an empty array.
*/
export function streamedQuery<
TQueryFnData = unknown,
TData = Array<TQueryFnData>,
TQueryKey extends QueryKey = QueryKey,
>({
queryFn,
refetchMode = 'reset',
maxChunks,
}: {
queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset' | 'replace'
maxChunks?: number
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
reducer = (items, chunk) =>
addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData,
placeholderData = [] as TData,
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced this placeholderData prop to mimic native reduce with an initialValue.
My first approach was to use the existing placeholderData parameter from the Observer, but even though I can access it through the query.options object, TypeScript throws an error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Below you can find an implementation that mimics Array.prototype.reduce behavior, where the initial value is optional and, if not provided, the first element becomes the accumulator. I didn't apply it to the PR because I think it is better to get your point of view about it first.

// ...existing code...
return async (context) => {
  const query = context.client
    .getQueryCache()
    .find({ queryKey: context.queryKey, exact: true })
  const isRefetch = !!query && query.state.data !== undefined
  if (isRefetch && params.refetchMode === 'reset') {
    query.setState({
      status: 'pending',
      data: undefined,
      error: null,
      fetchStatus: 'fetching',
    })
  }

  const hasInitialValue = 'placeholderData' in params;
  let result: TData;
  let isFirstChunk = true;

  const stream = await params.queryFn(context)

  for await (const chunk of stream) {
    if (context.signal.aborted) {
      break
    }
    
    if (isFirstChunk) {
      if (hasInitialValue) {
        // If we have placeholderData, use it as initial accumulator
        result = reducer(placeholderData, chunk);
      } else {
        // If no placeholderData, first chunk becomes the accumulator
        result = chunk as unknown as TData;
      }
      isFirstChunk = false;
    } else {
      result = reducer(result, chunk);
    }
    
    // don't append to the cache directly when replace-refetching
    if (!isRefetch || params.refetchMode !== 'replace') {
      context.client.setQueryData<TData>(
        context.queryKey,
        (prev) => {
          if (prev === undefined) {
            return result;
          }
          return hasInitialValue ? reducer(prev, chunk) : result;
        }
      )
    }
  }

  // Handle empty stream case
  if (isFirstChunk) {
    if (hasInitialValue) {
      result = placeholderData;
    } else {
      throw new Error('Reduce of empty stream with no initial value');
    }
  }

  // finalize result: replace-refetching needs to write to the cache
  if (isRefetch && params.refetchMode === 'replace' && !context.signal.aborted) {
    context.client.setQueryData<TData>(context.queryKey, result)
  }

  return context.client.getQueryData(context.queryKey)!
}

The key changes:

  • Initial value logic: only use placeholderData if it is explicitly provided.
  • First chunk handling: if no placeholderData is provided, the first chunk becomes the initial accumulator.
  • Empty stream error: throw an error if the stream is empty and no placeholderData is provided.

Copy link
Collaborator

@TkDodo TkDodo Aug 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think what we’d want here is a mandatory initialValue for the reducer if you pass a custom reducer. The term placeholderData is a bit overloaded because, here, it does something different than on useQuery itself, so I wouldn’t name it like that.

It could be two separate params, like reducer and initialValue, but we’d have to make the types so that initialValue is required when you pass a reducer. That’s possible but usually needs overloads or conditional types. Or, we could also just use a tuple maybe:

reducer: [initialValue: TData, (accumulator: TData, chunk: TQueryFnData) => TData]

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great, thanks for the feedback. I'm on vacation right now, but I'll be back next week. I'm going to provide an initialValue prop, and I'll play with TS in order to make it mandatory when a reducer is specified.

}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
TData,
TQueryKey
> {
return async (context) => {
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
const isRefetch = !!query && query.state.data !== undefined

if (isRefetch && refetchMode === 'reset') {
query.setState({
status: 'pending',
Expand All @@ -45,7 +53,8 @@ export function streamedQuery<
})
}

let result: Array<TQueryFnData> = []
let result = placeholderData

const stream = await queryFn(context)

for await (const chunk of stream) {
Expand All @@ -55,19 +64,16 @@ export function streamedQuery<

// don't append to the cache directly when replace-refetching
if (!isRefetch || refetchMode !== 'replace') {
context.client.setQueryData<Array<TQueryFnData>>(
context.queryKey,
(prev = []) => {
return addToEnd(prev, chunk, maxChunks)
},
context.client.setQueryData<TData>(context.queryKey, (prev) =>
reducer(prev ?? placeholderData, chunk),
)
}
result = addToEnd(result, chunk, maxChunks)
result = reducer(result, chunk)
}

// finalize result: replace-refetching needs to write to the cache
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
context.client.setQueryData<TData>(context.queryKey, result)
}

return context.client.getQueryData(context.queryKey)!
Expand Down