Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions .vscode/query.code-workspace
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
{
"folders": [
{
"path": "../"
}
],
"settings": {
"terminal.integrated.profiles.osx": { "zsh": { "path": "/bin/zsh", "args": ["-l", "-i"] } },
"css.validate": false,
"editor.codeActionsOnSave": {
"source.fixAll": "explicit"
},
"eslint.format.enable": true,
"eslint.enable": true,
"files.autoSave": "afterDelay",
"files.eol": "\n",
"javascript.validate.enable": false,
"json.validate.enable": true,
"less.validate": false,
"scss.validate": false,
"scss.completion.completePropertyWithSemicolon": true,
"scss.completion.triggerPropertyValueCompletion": true,
"stylelint.validate": [
"css",
"scss",
"less"
],

"window.zoomLevel": 2,
"typescript.format.enable": false,
"typescript.preferences.includePackageJsonAutoImports": "on",
"typescript.validate.enable": false,
"javascript.updateImportsOnFileMove.enabled": "prompt",
"javascript.suggest.autoImports": true,
"typescript.suggest.autoImports": true,
"typescript.updateImportsOnFileMove.enabled": "always",
"typescript.disableAutomaticTypeAcquisition": true,
"typescript.tsdk": "node_modules/typescript/lib",
"vitest.maximumConfigs": 15

}
}
12 changes: 7 additions & 5 deletions docs/reference/streamedQuery.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@ const query = queryOptions({
- When set to `'reset'`, the query will erase all data and go back into `pending` state.
- When set to `'append'`, data will be appended to existing data.
- When set to `'replace'`, all data will be written to the cache once the stream ends.
- `maxChunks?: number`
- `reducer?: (accumulator: TData, chunk: TQueryFnData) => TData`
- Optional
- The maximum number of chunks to keep in the cache.
- Defaults to `undefined`, meaning all chunks will be kept.
- If `undefined` or `0`, the number of chunks is unlimited.
- If the number of chunks exceeds this number, the oldest chunk will be removed.
- A function to reduce the streamed chunks into the final data.
- Defaults to a function that appends chunks to the end of the array.
- `placeholderData?: TData = TQueryFnData`
- Optional
- Defines the initial data to be used while the first chunk is being fetched.
- Defaults to an empty array.
111 changes: 32 additions & 79 deletions packages/query-core/src/__tests__/streamedQuery.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -350,13 +350,18 @@ describe('streamedQuery', () => {
unsubscribe()
})

test('should support maxChunks', async () => {
test('should support custom reducer', async () => {
const key = queryKey()

const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
[chunk]: true,
}),
initialValue: {}
}),
})

Expand All @@ -368,41 +373,34 @@ describe('streamedQuery', () => {
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)
await vi.advanceTimersByTimeAsync(100)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
data: {
0: true,
1: true,
},
})

unsubscribe()
})

test('maxChunks with append refetch', async () => {
test('should support custom reducer with initialValue', async () => {
const key = queryKey()
const observer = new QueryObserver(queryClient, {
queryKey: key,
queryFn: streamedQuery({
queryFn: () => createAsyncNumberGenerator(3),
maxChunks: 2,
refetchMode: 'append',
queryFn: streamedQuery<number, Record<number, boolean>>({
queryFn: () => createAsyncNumberGenerator(2),
reducer: (acc, chunk) => ({
...acc,
[chunk]: true,
}),
initialValue: {
10: true,
11: true,
},
}),
})

Expand All @@ -414,62 +412,17 @@ describe('streamedQuery', () => {
data: undefined,
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
})

void observer.refetch()

await vi.advanceTimersByTimeAsync(10)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [1, 2],
})

await vi.advanceTimersByTimeAsync(40)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [2, 0],
})

await vi.advanceTimersByTimeAsync(50)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'fetching',
data: [0, 1],
})

await vi.advanceTimersByTimeAsync(50)
await vi.advanceTimersByTimeAsync(100)

expect(observer.getCurrentResult()).toMatchObject({
status: 'success',
fetchStatus: 'idle',
data: [1, 2],
data: {
10: true,
11: true,
0: true,
1: true,
},
})

unsubscribe()
Expand Down
60 changes: 39 additions & 21 deletions packages/query-core/src/streamedQuery.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,28 @@
import { addToEnd } from './utils'
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'

/**
 * Options shared by every `streamedQuery` call: the streaming query function
 * and how a refetch should interact with previously cached data.
 */
type BaseStreamedQueryParams<TQueryFnData, TQueryKey extends QueryKey> = {
  queryFn: (
    context: QueryFunctionContext<TQueryKey>,
  ) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
  refetchMode?: 'append' | 'reset' | 'replace'
}

/**
 * Variant without a custom reducer: chunks are collected into an array.
 * `never` members forbid passing `reducer`/`initialValue` on their own,
 * so the two options can only appear together (see the union below).
 */
type SimpleStreamedQueryParams<
  TQueryFnData,
  TQueryKey extends QueryKey,
> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
  reducer?: never
  initialValue?: never
}

/**
 * Variant with a custom reducer: every streamed chunk is folded into the
 * accumulated `TData`, starting from the required `initialValue`.
 */
type ReducibleStreamedQueryParams<
  TQueryFnData,
  TData,
  TQueryKey extends QueryKey,
> = BaseStreamedQueryParams<TQueryFnData, TQueryKey> & {
  reducer: (acc: TData, chunk: TQueryFnData) => TData
  initialValue: TData
}

/**
 * Parameters accepted by `streamedQuery` — either the simple array-collecting
 * form or the reducer form; the union makes `reducer` and `initialValue`
 * an all-or-nothing pair at the type level.
 */
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> =
  | SimpleStreamedQueryParams<TQueryFnData, TQueryKey>
  | ReducibleStreamedQueryParams<TQueryFnData, TData, TQueryKey>


/**
* This is a helper function to create a query function that streams data from an AsyncIterable.
* Data will be an Array of all the chunks received.
Expand All @@ -11,31 +33,29 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types'
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state.
* Set to `'append'` to append new data to the existing data.
* Set to `'replace'` to write all data to the cache once the stream ends.
* @param maxChunks - The maximum number of chunks to keep in the cache.
* Defaults to `undefined`, meaning all chunks will be kept.
* If `undefined` or `0`, the number of chunks is unlimited.
* If the number of chunks exceeds this number, the oldest chunk will be removed.
* @param reducer - A function to reduce the streamed chunks into the final data.
* Defaults to a function that appends chunks to the end of the array.
* @param initialValue - Initial value to be used while the first chunk is being fetched.
*/
export function streamedQuery<
TQueryFnData = unknown,
TData = Array<TQueryFnData>,
TQueryKey extends QueryKey = QueryKey,
>({
queryFn,
refetchMode = 'reset',
maxChunks,
}: {
queryFn: (
context: QueryFunctionContext<TQueryKey>,
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>>
refetchMode?: 'append' | 'reset' | 'replace'
maxChunks?: number
}): QueryFunction<Array<TQueryFnData>, TQueryKey> {
reducer = (items, chunk) => addToEnd(items as Array<TQueryFnData>, chunk) as TData,
initialValue = [] as TData,
}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction<
TData,
TQueryKey
> {

return async (context) => {
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
const isRefetch = !!query && query.state.data !== undefined

if (isRefetch && refetchMode === 'reset') {
query.setState({
status: 'pending',
Expand All @@ -45,7 +65,8 @@ export function streamedQuery<
})
}

let result: Array<TQueryFnData> = []
let result = initialValue

const stream = await queryFn(context)

for await (const chunk of stream) {
Expand All @@ -55,19 +76,16 @@ export function streamedQuery<

// don't append to the cache directly when replace-refetching
if (!isRefetch || refetchMode !== 'replace') {
context.client.setQueryData<Array<TQueryFnData>>(
context.queryKey,
(prev = []) => {
return addToEnd(prev, chunk, maxChunks)
},
context.client.setQueryData<TData>(context.queryKey, (prev) =>
reducer(prev ?? initialValue, chunk),
)
}
result = addToEnd(result, chunk, maxChunks)
result = reducer(result, chunk)
}

// finalize result: replace-refetching needs to write to the cache
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) {
context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result)
context.client.setQueryData<TData>(context.queryKey, result)
}

return context.client.getQueryData(context.queryKey)!
Expand Down