-
-
Notifications
You must be signed in to change notification settings - Fork 3.4k
feat(query-core): add custom reducer support to streamedQuery #9532
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 3 commits
d2c4807
9474d4c
890e373
0a4e065
d46d488
42b9908
14b6241
5f68a62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,15 @@ | ||
import { addToEnd } from './utils' | ||
import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' | ||
|
||
type StreamedQueryParams<TQueryFnData, TData, TQueryKey extends QueryKey> = { | ||
queryFn: ( | ||
context: QueryFunctionContext<TQueryKey>, | ||
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>> | ||
refetchMode?: 'append' | 'reset' | 'replace' | ||
reducer?: (acc: TData, chunk: TQueryFnData) => TData | ||
placeholderData?: TData | ||
} | ||
|
||
/** | ||
* This is a helper function to create a query function that streams data from an AsyncIterable. | ||
* Data will be an Array of all the chunks received. | ||
|
@@ -11,31 +20,30 @@ import type { QueryFunction, QueryFunctionContext, QueryKey } from './types' | |
* Defaults to `'reset'`, erases all data and puts the query back into `pending` state. | ||
* Set to `'append'` to append new data to the existing data. | ||
* Set to `'replace'` to write all data to the cache once the stream ends. | ||
* @param maxChunks - The maximum number of chunks to keep in the cache. | ||
* Defaults to `undefined`, meaning all chunks will be kept. | ||
* If `undefined` or `0`, the number of chunks is unlimited. | ||
* If the number of chunks exceeds this number, the oldest chunk will be removed. | ||
* @param reducer - A function to reduce the streamed chunks into the final data. | ||
* Defaults to a function that appends chunks to the end of the array. | ||
* @param placeholderData - Initial data to be used while the first chunk is being fetched. | ||
* Defaults to an empty array. | ||
*/ | ||
export function streamedQuery< | ||
TQueryFnData = unknown, | ||
TData = Array<TQueryFnData>, | ||
TQueryKey extends QueryKey = QueryKey, | ||
>({ | ||
queryFn, | ||
refetchMode = 'reset', | ||
maxChunks, | ||
}: { | ||
queryFn: ( | ||
context: QueryFunctionContext<TQueryKey>, | ||
) => AsyncIterable<TQueryFnData> | Promise<AsyncIterable<TQueryFnData>> | ||
refetchMode?: 'append' | 'reset' | 'replace' | ||
maxChunks?: number | ||
}): QueryFunction<Array<TQueryFnData>, TQueryKey> { | ||
reducer = (items, chunk) => | ||
addToEnd((items ?? []) as Array<TQueryFnData>, chunk) as TData, | ||
placeholderData = [] as TData, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I introduced this. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Below you can find an implementation showing how to mimic the behavior: // ...existing code...
return async (context) => {
const query = context.client
.getQueryCache()
.find({ queryKey: context.queryKey, exact: true })
const isRefetch = !!query && query.state.data !== undefined
if (isRefetch && params.refetchMode === 'reset') {
query.setState({
status: 'pending',
data: undefined,
error: null,
fetchStatus: 'fetching',
})
}
const hasInitialValue = 'placeholderData' in params;
let result: TData;
let isFirstChunk = true;
const stream = await params.queryFn(context)
for await (const chunk of stream) {
if (context.signal.aborted) {
break
}
if (isFirstChunk) {
if (hasInitialValue) {
// If we have placeholderData, use it as initial accumulator
result = reducer(placeholderData, chunk);
} else {
// If no placeholderData, first chunk becomes the accumulator
result = chunk as unknown as TData;
}
isFirstChunk = false;
} else {
result = reducer(result, chunk);
}
// don't append to the cache directly when replace-refetching
if (!isRefetch || params.refetchMode !== 'replace') {
context.client.setQueryData<TData>(
context.queryKey,
(prev) => {
if (prev === undefined) {
return result;
}
return hasInitialValue ? reducer(prev, chunk) : result;
}
)
}
}
// Handle empty stream case
if (isFirstChunk) {
if (hasInitialValue) {
result = placeholderData;
} else {
throw new Error('Reduce of empty stream with no initial value');
}
}
// finalize result: replace-refetching needs to write to the cache
if (isRefetch && params.refetchMode === 'replace' && !context.signal.aborted) {
context.client.setQueryData<TData>(context.queryKey, result)
}
return context.client.getQueryData(context.queryKey)!
} The key changes:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think what we’d want here is a mandatory initial value whenever a custom reducer is specified. It could be two separate params, like `reducer` and `initialValue`.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Great — thanks for the feedback. I'm on vacation right now, but I'll be back next week. I'm going to provide an initial-value prop, and I'll play with TS in order to make it mandatory when a reducer is specified. |
||
}: StreamedQueryParams<TQueryFnData, TData, TQueryKey>): QueryFunction< | ||
TData, | ||
TQueryKey | ||
> { | ||
return async (context) => { | ||
const query = context.client | ||
.getQueryCache() | ||
.find({ queryKey: context.queryKey, exact: true }) | ||
const isRefetch = !!query && query.state.data !== undefined | ||
|
||
if (isRefetch && refetchMode === 'reset') { | ||
query.setState({ | ||
status: 'pending', | ||
|
@@ -45,7 +53,8 @@ export function streamedQuery< | |
}) | ||
} | ||
|
||
let result: Array<TQueryFnData> = [] | ||
let result = placeholderData | ||
|
||
const stream = await queryFn(context) | ||
|
||
for await (const chunk of stream) { | ||
|
@@ -55,19 +64,16 @@ export function streamedQuery< | |
|
||
// don't append to the cache directly when replace-refetching | ||
if (!isRefetch || refetchMode !== 'replace') { | ||
context.client.setQueryData<Array<TQueryFnData>>( | ||
context.queryKey, | ||
(prev = []) => { | ||
return addToEnd(prev, chunk, maxChunks) | ||
}, | ||
context.client.setQueryData<TData>(context.queryKey, (prev) => | ||
reducer(prev ?? placeholderData, chunk), | ||
) | ||
} | ||
result = addToEnd(result, chunk, maxChunks) | ||
result = reducer(result, chunk) | ||
} | ||
|
||
// finalize result: replace-refetching needs to write to the cache | ||
if (isRefetch && refetchMode === 'replace' && !context.signal.aborted) { | ||
context.client.setQueryData<Array<TQueryFnData>>(context.queryKey, result) | ||
context.client.setQueryData<TData>(context.queryKey, result) | ||
} | ||
|
||
return context.client.getQueryData(context.queryKey)! | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I created this type just for clarity. Should I keep it inline, move it to a dedicated file, or put it in types?
Regarding type safety, I tried to split the type in two as shown below, but I feel it is over-engineering.