diff --git a/DEPRECATION.md b/DEPRECATION.md index 1ee5c4f14295a..c55794741cc52 100644 --- a/DEPRECATION.md +++ b/DEPRECATION.md @@ -64,8 +64,9 @@ features: | Removed | [`initApp` hook](#initapp-hook) | v0.35.0 | v0.35.0 | | Removed | [`/v1/run-scheduled-refresh` REST API endpoint](#v1run-scheduled-refresh-rest-api-endpoint) | v0.35.0 | v0.36.0 | | Removed | [Node.js 18](#nodejs-18) | v0.36.0 | v1.3.0 | -| Deprecated | [`CUBEJS_SCHEDULED_REFRESH_CONCURRENCY`](#cubejs_scheduled_refresh_concurrency) | v1.2.7 | | +| Deprecated | [`CUBEJS_SCHEDULED_REFRESH_CONCURRENCY`](#cubejs_scheduled_refresh_concurrency) | v1.2.7 | | | Deprecated | [Node.js 20](#nodejs-20) | v1.3.0 | | +| Deprecated | [`renewQuery` parameter of the `/v1/load` endpoint](#renewquery-parameter-of-the-v1load-endpoint) | v1.3.73 | | ### Node.js 8 @@ -412,3 +413,8 @@ This environment variable was renamed to [`CUBEJS_SCHEDULED_REFRESH_QUERIES_PER_ Node.js 20 is in maintenance mode from [November 22, 2024][link-nodejs-eol]. This means no more new features, only security updates. Please upgrade to Node.js 22 or higher. + +### `renewQuery` parameter of the `/v1/load` endpoint + +This parameter is deprecated and will be removed in future releases. See [cache control](https://cube.dev/docs/product/apis-integrations/rest-api#cache-control) +options and use the new `cache` parameter of the `/v1/load` endpoint instead. \ No newline at end of file diff --git a/docs/pages/product/apis-integrations/rest-api.mdx b/docs/pages/product/apis-integrations/rest-api.mdx index 98b1e1d9a0cb7..607d19272472e 100644 --- a/docs/pages/product/apis-integrations/rest-api.mdx +++ b/docs/pages/product/apis-integrations/rest-api.mdx @@ -161,7 +161,7 @@ accessible for everyone. | API scope | REST API endpoints | Accessible by default? | | --- | --- | --- | | `meta` | [`/v1/meta`][ref-ref-meta] | ✅ Yes | -| `data` | [`/v1/load`][ref-ref-load] | ✅ Yes | +| `data` | [`/v1/load`][ref-ref-load], [`/v1/cubesql`][ref-ref-cubesql] | ✅ Yes | | `graphql` | `/graphql` | ✅ Yes | | `sql` | [`/v1/sql`][ref-ref-sql] | ✅ Yes | | `jobs` | [`/v1/pre-aggregations/jobs`][ref-ref-paj] | ❌ No | @@ -248,9 +248,20 @@ should be unique for each separate request. `spanId` should define user interaction span such us `Continue wait` retry cycle and it's value shouldn't change during one single interaction. -## Troubleshooting +## Cache control -### `Continue wait` +[`/v1/load`][ref-ref-load] and [`/v1/cubesql`][ref-ref-cubesql] endpoints of the REST API +allow to control the cache behavior. The following querying strategies with regards to +the cache are supported: + +| Strategy | Description | +| --- | --- | +| `stale-if-slow` | If [refresh keys][ref-refresh-keys] are up-to-date, returns cached value. If expired, tries to return fresh value from the data source. If the data source query is slow (hits [`Continue wait`](#continue-wait)), returns stale value from cache. | +| `stale-while-revalidate`| If [refresh keys][ref-refresh-keys] are up-to-date, returns cached value. If expired, returns stale data from cache and updates cache in background. | +| `must-revalidate` | If [refresh keys][ref-refresh-keys] are up-to-date, returns cached value. If expired, always waits for fresh value from the data source, even if slow (hits one or more [`Continue wait`](#continue-wait) intervals). | +| `no-cache` | Skips [refresh key][ref-refresh-keys] checks. Always returns fresh data from the data source, regardless of cache or query performance. | + +## `Continue wait` If the request takes too long to be processed, the REST API responds with `{ "error": "Continue wait" }` and the status code 200. @@ -295,6 +306,7 @@ warehouse][ref-data-warehouses]. [ref-ref-load]: /product/apis-integrations/rest-api/reference#base_pathv1load [ref-ref-meta]: /product/apis-integrations/rest-api/reference#base_pathv1meta [ref-ref-sql]: /product/apis-integrations/rest-api/reference#base_pathv1sql +[ref-ref-cubesql]: /product/apis-integrations/rest-api/reference#base_pathv1cubesql [ref-ref-paj]: /product/apis-integrations/rest-api/reference#base_pathv1pre-aggregationsjobs [ref-security-context]: /product/auth/context [ref-graphql-api]: /product/apis-integrations/graphql-api @@ -313,4 +325,5 @@ warehouse][ref-data-warehouses]. [ref-traditional-databases]: /product/configuration/data-sources#transactional-databases [ref-pre-aggregations]: /product/caching/using-pre-aggregations [ref-javascript-sdk]: /product/apis-integrations/javascript-sdk -[ref-recipe-real-time-data-fetch]: /product/apis-integrations/recipes/real-time-data-fetch \ No newline at end of file +[ref-recipe-real-time-data-fetch]: /product/apis-integrations/recipes/real-time-data-fetch +[ref-refresh-keys]: /product/data-modeling/reference/cube#refresh_key \ No newline at end of file diff --git a/docs/pages/product/apis-integrations/rest-api/query-format.mdx b/docs/pages/product/apis-integrations/rest-api/query-format.mdx index 12840981bbd74..17683e70f1d0e 100644 --- a/docs/pages/product/apis-integrations/rest-api/query-format.mdx +++ b/docs/pages/product/apis-integrations/rest-api/query-format.mdx @@ -41,17 +41,6 @@ The default value is `false`. - `timezone`: A [time zone][ref-time-zone] for your query. You can set the desired time zone in the [TZ Database Name](https://en.wikipedia.org/wiki/Tz_database) format, e.g., `America/Los_Angeles`. -- `renewQuery`: If `renewQuery` is set to `true`, Cube will renew all - [`refreshKey`][ref-schema-ref-preaggs-refreshkey] for queries and query - results in the foreground. However, if the - [`refreshKey`][ref-schema-ref-preaggs-refreshkey] (or - [`refreshKey.every`][ref-schema-ref-preaggs-refreshkey-every]) doesn't - indicate that there's a need for an update this setting has no effect. The - default value is `false`. - > **NOTE**: Cube provides only eventual consistency guarantee. Using a small - > [`refreshKey.every`][ref-schema-ref-preaggs-refreshkey-every] value together - > with `renewQuery` to achieve immediate consistency can lead to endless - > refresh loops and overall system instability. - `ungrouped`: If set to `true`, Cube will run an [ungrouped query][ref-ungrouped-query]. - `joinHints`: Query-time [join hints][ref-join-hints], provided as an array of diff --git a/docs/pages/product/apis-integrations/rest-api/reference.mdx b/docs/pages/product/apis-integrations/rest-api/reference.mdx index 8e30162750ee8..00c7c2357225f 100644 --- a/docs/pages/product/apis-integrations/rest-api/reference.mdx +++ b/docs/pages/product/apis-integrations/rest-api/reference.mdx @@ -13,10 +13,11 @@ By default, it's `/cubejs-api`. Run the query to the REST API and get the results. -| Parameter | Description | -| ----------- | --------------------------------------------------------------------------------------------------------------------- | -| `query` | Either a single URL encoded Cube [Query](/product/apis-integrations/rest-api/query-format), or an array of queries | -| `queryType` | If multiple queries are passed in `query` for [data blending][ref-recipes-data-blending], this must be set to `multi` | +| Parameter | Description | Required | +| ----------- | --------------------------------------------------------------------------------------------------------------------- | --- | +| `query` | Either a single URL encoded Cube [Query](/product/apis-integrations/rest-api/query-format), or an array of queries | ✅ Yes | +| `queryType` | If multiple queries are passed in `query` for [data blending][ref-recipes-data-blending], this must be set to `multi` | ❌ No | +| `cache` | See [cache control][ref-cache-control]. `stale-if-slow` by default | ❌ No | Response @@ -319,9 +320,10 @@ This endpoint is part of the [SQL API][ref-sql-api]. -| Parameter | Description | -| --- | --- | -| `query` | The SQL query to run. | +| Parameter | Description | Required | +| --- | --- | --- | +| `query` | The SQL query to run. | ✅ Yes | +| `cache` | See [cache control][ref-cache-control]. `stale-if-slow` by default | ❌ No | Response: a stream of newline-delimited JSON objects. The first object contains the `schema` property with column names and types. The following objects contain @@ -639,4 +641,5 @@ Keep-Alive: timeout=5 [ref-query-wpd]: /product/apis-integrations/queries#query-with-pushdown [ref-sql-api]: /product/apis-integrations/sql-api [ref-orchestration-api]: /product/apis-integrations/orchestration-api -[ref-folders]: /product/data-modeling/reference/view#folders \ No newline at end of file +[ref-folders]: /product/data-modeling/reference/view#folders +[ref-cache-control]: /product/apis-integrations/rest-api#cache-control \ No newline at end of file diff --git a/packages/cubejs-api-gateway/openspec.yml b/packages/cubejs-api-gateway/openspec.yml index be44c1369937b..4ad8edb08ddc3 100644 --- a/packages/cubejs-api-gateway/openspec.yml +++ b/packages/cubejs-api-gateway/openspec.yml @@ -489,6 +489,13 @@ components: properties: queryType: type: "string" + cache: + type: "string" + enum: + - stale-if-slow + - stale-while-revalidate + - must-revalidate + - no-cache query: type: "object" $ref: "#/components/schemas/V1LoadRequestQuery" diff --git a/packages/cubejs-api-gateway/package.json b/packages/cubejs-api-gateway/package.json index bdedf6eb96c71..d4c4813bd9f8e 100644 --- a/packages/cubejs-api-gateway/package.json +++ b/packages/cubejs-api-gateway/package.json @@ -29,6 +29,7 @@ "dependencies": { "@cubejs-backend/native": "1.3.74", "@cubejs-backend/shared": "1.3.74", + "@cubejs-backend/query-orchestrator": "1.3.74", "@ungap/structured-clone": "^0.3.4", "assert-never": "^1.4.0", "body-parser": "^1.19.0", diff --git a/packages/cubejs-api-gateway/src/gateway.ts b/packages/cubejs-api-gateway/src/gateway.ts index 8e2cfce793f26..d02135496e40d 100644 --- a/packages/cubejs-api-gateway/src/gateway.ts +++ b/packages/cubejs-api-gateway/src/gateway.ts @@ -12,6 +12,7 @@ import { getRealType, parseUtcIntoLocalDate, QueryAlias, + CacheMode, } from '@cubejs-backend/shared'; import { ResultArrayWrapper, @@ -28,6 +29,7 @@ import type { } from 'express'; import { createProxyMiddleware } from 'http-proxy-middleware'; +import { QueryBody } from '@cubejs-backend/query-orchestrator'; import { QueryType, ApiScopes, @@ -177,7 +179,13 @@ class ApiGateway { public constructor( protected readonly apiSecret: string, + /** + * It actually returns a Promise + */ protected readonly compilerApi: (ctx: RequestContext) => Promise, + /** + * It actually returns a Promise + */ protected readonly adapterApi: (ctx: RequestContext) => Promise, protected readonly logger: any, protected readonly options: ApiGatewayOptions, @@ -311,6 +319,7 @@ class ApiGateway { context: req.context, res: this.resToResultFn(res), queryType: req.query.queryType, + cacheMode: this.normalizeCacheMode(req.query.query, req.query.cache), }); })); @@ -320,7 +329,8 @@ class ApiGateway { query: req.body.query, context: req.context, res: this.resToResultFn(res), - queryType: req.body.queryType + queryType: req.body.queryType, + cacheMode: this.normalizeCacheMode(req.body.query, req.body.cache), }); })); @@ -329,7 +339,8 @@ class ApiGateway { query: req.query.query, context: req.context, res: this.resToResultFn(res), - queryType: req.query.queryType + queryType: req.query.queryType, + cacheMode: this.normalizeCacheMode(req.query.query, req.query.cache), }); })); @@ -425,7 +436,7 @@ class ApiGateway { try { await this.assertApiScope('data', req.context?.securityContext); - await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext); + await this.sqlServer.execSql(req.body.query, res, req.context?.securityContext, req.body.cache); } catch (e: any) { this.handleError({ e, @@ -576,6 +587,19 @@ class ApiGateway { return requestStarted && (new Date().getTime() - requestStarted.getTime()); } + // TODO: Drop this when renewQuery will be removed + private normalizeCacheMode(query, cache: string): CacheMode { + if (cache !== undefined) { + return cache as CacheMode; + } else if (query?.renewQuery !== undefined) { + return query.renewQuery === true + ? 'must-revalidate' + : 'stale-if-slow'; + } + + return 'stale-if-slow'; + } + private filterVisibleItemsInMeta(context: RequestContext, cubes: any[]) { const isDevMode = getEnv('devMode'); function visibilityFilter(item) { @@ -1636,13 +1660,14 @@ class ApiGateway { context: RequestContext, normalizedQuery: NormalizedQuery, sqlQuery: any, + cacheMode: CacheMode = 'stale-if-slow', ): Promise { - const queries = [{ + const queries: QueryBody[] = [{ ...sqlQuery, query: sqlQuery.sql[0], values: sqlQuery.sql[1], - continueWait: true, renewQuery: normalizedQuery.renewQuery, + cacheMode, requestId: context.requestId, context, persistent: false, @@ -1665,8 +1690,8 @@ class ApiGateway { ...totalQuery, query: totalQuery.sql[0], values: totalQuery.sql[1], - continueWait: true, renewQuery: normalizedTotal.renewQuery, + cacheMode, requestId: context.requestId, context }); @@ -1782,12 +1807,12 @@ class ApiGateway { this.log({ type: 'Load Request', query, streaming: true }, context); const [, normalizedQueries] = await this.getNormalizedQueries(query, context, true); const sqlQuery = (await this.getSqlQueriesInternal(context, normalizedQueries))[0]; - const q = { + const q: QueryBody = { ...sqlQuery, query: sqlQuery.sql[0], values: sqlQuery.sql[1], - continueWait: true, renewQuery: false, + cacheMode: 'stale-if-slow', requestId: context.requestId, context, persistent: true, @@ -1880,6 +1905,7 @@ class ApiGateway { context, normalizedQuery, sqlQueries[index], + props.cacheMode, ); const annotation = prepareAnnotation( @@ -1970,17 +1996,17 @@ class ApiGateway { normalizedQueries.map(q => ({ ...q, disableExternalPreAggregations: request.sqlQuery })) ); - let results; + let results: any[]; let slowQuery = false; const streamResponse = async (sqlQuery) => { - const q = { + const q: QueryBody = { ...sqlQuery, query: sqlQuery.query || sqlQuery.sql[0], values: sqlQuery.values || sqlQuery.sql[1], - continueWait: true, renewQuery: false, + cacheMode: 'stale-if-slow', requestId: context.requestId, context, persistent: true, @@ -1995,11 +2021,11 @@ class ApiGateway { }; if (request.sqlQuery) { - const finalQuery = { + const finalQuery: QueryBody = { query: request.sqlQuery[0], values: request.sqlQuery[1], - continueWait: true, renewQuery: normalizedQueries[0].renewQuery, + cacheMode: request.cacheMode, requestId: context.requestId, context, ...sqlQueries[0], diff --git a/packages/cubejs-api-gateway/src/graphql.ts b/packages/cubejs-api-gateway/src/graphql.ts index b00a9d4738b4e..b9002bf4ade5d 100644 --- a/packages/cubejs-api-gateway/src/graphql.ts +++ b/packages/cubejs-api-gateway/src/graphql.ts @@ -363,7 +363,7 @@ function parseDates(result: any) { } export function getJsonQuery(metaConfig: any, args: Record, infos: GraphQLResolveInfo) { - const { where, limit, offset, timezone, orderBy, renewQuery, ungrouped } = args; + const { where, limit, offset, timezone, orderBy, renewQuery, ungrouped, cache } = args; const measures: string[] = []; const dimensions: string[] = []; @@ -461,6 +461,7 @@ export function getJsonQuery(metaConfig: any, args: Record, infos: ...(timezone && { timezone }), ...(filters.length && { filters }), ...(renewQuery && { renewQuery }), + ...(cache && { cache }), ...(ungrouped && { ungrouped }), }; } @@ -639,6 +640,7 @@ export function makeSchema(metaConfig: any): GraphQLSchema { offset: intArg(), timezone: stringArg(), renewQuery: booleanArg(), + cache: stringArg(), ungrouped: booleanArg(), orderBy: arg({ type: 'RootOrderByInput' @@ -651,6 +653,7 @@ export function makeSchema(metaConfig: any): GraphQLSchema { apiGateway.load({ query, queryType: QueryType.REGULAR_QUERY, + ...(query.cache ? { cacheMode: query.cache } : {}), context: req.context, res: async (message) => { if (message.error) { diff --git a/packages/cubejs-api-gateway/src/query.js b/packages/cubejs-api-gateway/src/query.js index b9320d128168f..8bee96aa83802 100644 --- a/packages/cubejs-api-gateway/src/query.js +++ b/packages/cubejs-api-gateway/src/query.js @@ -187,6 +187,7 @@ const querySchema = Joi.object().keys({ limit: Joi.number().integer().strict().min(0), offset: Joi.number().integer().strict().min(0), total: Joi.boolean(), + // @deprecated renewQuery: Joi.boolean(), ungrouped: Joi.boolean(), responseFormat: Joi.valid('default', 'compact'), diff --git a/packages/cubejs-api-gateway/src/sql-server.ts b/packages/cubejs-api-gateway/src/sql-server.ts index cd3a6eeff0d05..8384547bf9149 100644 --- a/packages/cubejs-api-gateway/src/sql-server.ts +++ b/packages/cubejs-api-gateway/src/sql-server.ts @@ -10,7 +10,7 @@ import { Sql4SqlResponse, } from '@cubejs-backend/native'; import type { ShutdownMode } from '@cubejs-backend/native'; -import { displayCLIWarning, getEnv } from '@cubejs-backend/shared'; +import { displayCLIWarning, getEnv, CacheMode } from '@cubejs-backend/shared'; import * as crypto from 'crypto'; import type { ApiGateway } from './gateway'; @@ -65,8 +65,8 @@ export class SQLServer { throw new Error('Native api gateway is not enabled'); } - public async execSql(sqlQuery: string, stream: any, securityContext?: any) { - await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext); + public async execSql(sqlQuery: string, stream: any, securityContext?: any, cacheMode?: CacheMode) { + await execSql(this.sqlInterfaceInstance!, sqlQuery, stream, securityContext, cacheMode); } public async sql4sql(sqlQuery: string, disablePostProcessing: boolean, securityContext?: unknown): Promise { @@ -207,7 +207,7 @@ export class SQLServer { } }); }, - sqlApiLoad: async ({ request, session, query, queryKey, sqlQuery, streaming }) => { + sqlApiLoad: async ({ request, session, query, queryKey, sqlQuery, streaming, cacheMode }) => { const context = await contextByRequest(request, session); // eslint-disable-next-line no-async-promise-executor @@ -218,6 +218,7 @@ export class SQLServer { query, sqlQuery, streaming, + cacheMode, context, memberExpressions: true, res: (response) => { diff --git a/packages/cubejs-api-gateway/src/types/query.ts b/packages/cubejs-api-gateway/src/types/query.ts index 7c470a6aaa27c..ef8deb3bd8c89 100644 --- a/packages/cubejs-api-gateway/src/types/query.ts +++ b/packages/cubejs-api-gateway/src/types/query.ts @@ -139,6 +139,7 @@ interface Query { totalQuery?: boolean; order?: any; timezone?: string; + // @deprecated renewQuery?: boolean; ungrouped?: boolean; responseFormat?: ResultType; diff --git a/packages/cubejs-api-gateway/src/types/request.ts b/packages/cubejs-api-gateway/src/types/request.ts index 9ff0ba90121b6..ed9310b5e95da 100644 --- a/packages/cubejs-api-gateway/src/types/request.ts +++ b/packages/cubejs-api-gateway/src/types/request.ts @@ -7,6 +7,7 @@ import type { Request as ExpressRequest } from 'express'; import type { DataResult } from '@cubejs-backend/native'; +import { CacheMode } from '@cubejs-backend/shared'; import { RequestType, ApiType, ResultType } from './strings'; import { Query } from './query'; @@ -133,6 +134,7 @@ type QueryRequest = BaseRequest & { memberExpressions?: boolean; disableExternalPreAggregations?: boolean; disableLimitEnforcing?: boolean; + cacheMode?: CacheMode; }; type SqlApiRequest = BaseRequest & { @@ -142,6 +144,7 @@ type SqlApiRequest = BaseRequest & { queryKey: any; streaming?: boolean; memberExpressions?: boolean; + cacheMode?: CacheMode; }; /** diff --git a/packages/cubejs-backend-native/js/index.ts b/packages/cubejs-backend-native/js/index.ts index a7343a76cf8b2..36e3cd8a20296 100644 --- a/packages/cubejs-backend-native/js/index.ts +++ b/packages/cubejs-backend-native/js/index.ts @@ -3,6 +3,7 @@ import fs from 'fs'; import path from 'path'; import { Writable } from 'stream'; import type { Request as ExpressRequest } from 'express'; +import { CacheMode } from '@cubejs-backend/shared'; import { ResultWrapper } from './ResultWrapper'; export * from './ResultWrapper'; @@ -77,12 +78,13 @@ export interface SqlPayload { } export interface SqlApiLoadPayload { - request: Request, - session: SessionContext, - query: any, - queryKey: any, - sqlQuery: any, - streaming: boolean, + request: Request; + session: SessionContext; + query: any; + queryKey: any; + sqlQuery: any; + streaming: boolean; + cacheMode: CacheMode; } export interface LogLoadEventPayload { @@ -435,10 +437,10 @@ export const shutdownInterface = async (instance: SqlInterfaceInstance, shutdown await native.shutdownInterface(instance, shutdownMode); }; -export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any): Promise => { +export const execSql = async (instance: SqlInterfaceInstance, sqlQuery: string, stream: any, securityContext?: any, cacheMode: CacheMode = 'stale-if-slow'): Promise => { const native = loadNative(); - await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null); + await native.execSql(instance, sqlQuery, stream, securityContext ? JSON.stringify(securityContext) : null, cacheMode); }; // TODO parse result from native code diff --git a/packages/cubejs-backend-native/src/node_export.rs b/packages/cubejs-backend-native/src/node_export.rs index 7f1c38861b8ff..42470dc7f9a4c 100644 --- a/packages/cubejs-backend-native/src/node_export.rs +++ b/packages/cubejs-backend-native/src/node_export.rs @@ -223,6 +223,7 @@ async fn handle_sql_query( channel: Arc, stream_methods: WritableStreamMethods, sql_query: &str, + cache_mode: &str, ) -> Result<(), CubeError> { let span_id = Some(Arc::new(SpanId::new( Uuid::new_v4().to_string(), @@ -252,6 +253,17 @@ async fn handle_sql_query( .await?; } + let cache_enum = cache_mode.parse().map_err(CubeError::user)?; + + { + let mut cm = session + .state + .cache_mode + .write() + .expect("failed to unlock session cache_mode for change"); + *cm = Some(cache_enum); + } + let session_clone = Arc::clone(&session); let span_id_clone = span_id.clone(); @@ -424,6 +436,8 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { Err(_) => None, }; + let cache_mode = cx.argument::(4)?.value(&mut cx); + let js_stream_on_fn = Arc::new( node_stream .get::(&mut cx, "on")? @@ -471,6 +485,7 @@ fn exec_sql(mut cx: FunctionContext) -> JsResult { channel.clone(), stream_methods, &sql_query, + &cache_mode, ) .await; diff --git a/packages/cubejs-backend-native/src/transport.rs b/packages/cubejs-backend-native/src/transport.rs index 0331f4e78add3..eaf048d126763 100644 --- a/packages/cubejs-backend-native/src/transport.rs +++ b/packages/cubejs-backend-native/src/transport.rs @@ -13,7 +13,7 @@ use crate::{ }; use async_trait::async_trait; use cubesql::compile::engine::df::scan::{ - convert_transport_response, transform_response, MemberField, RecordBatch, SchemaRef, + convert_transport_response, transform_response, CacheMode, MemberField, RecordBatch, SchemaRef, }; use cubesql::compile::engine::df::wrapper::SqlQuery; use cubesql::transport::{ @@ -91,6 +91,8 @@ struct LoadRequest { streaming: bool, #[serde(rename = "queryKey", skip_serializing_if = "Option::is_none")] query_key: Option, + #[serde(rename = "cacheMode", skip_serializing_if = "Option::is_none")] + cache_mode: Option, } #[derive(Debug, Serialize)] @@ -287,6 +289,7 @@ impl TransportService for NodeBridgeTransport { member_to_alias, expression_params, streaming: false, + cache_mode: None, })?; let response: serde_json::Value = call_js_with_channel_as_callback( @@ -338,6 +341,7 @@ impl TransportService for NodeBridgeTransport { meta: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + cache_mode: Option, ) -> Result, CubeError> { trace!("[transport] Request ->"); @@ -371,6 +375,7 @@ impl TransportService for NodeBridgeTransport { member_to_alias: None, expression_params: None, streaming: false, + cache_mode: cache_mode.clone(), })?; let result = call_raw_js_with_channel_as_callback( @@ -527,6 +532,7 @@ impl TransportService for NodeBridgeTransport { member_to_alias: None, expression_params: None, streaming: true, + cache_mode: None, })?; let res = call_js_with_stream_as_callback( diff --git a/packages/cubejs-backend-shared/src/index.ts b/packages/cubejs-backend-shared/src/index.ts index ac7ff477ac440..29f89efa7d72e 100644 --- a/packages/cubejs-backend-shared/src/index.ts +++ b/packages/cubejs-backend-shared/src/index.ts @@ -13,6 +13,7 @@ export * from './convert'; export * from './helpers'; export * from './machine-id'; export * from './type-helpers'; +export * from './shared-types'; export * from './http-utils'; export * from './cli'; export * from './proxy'; diff --git a/packages/cubejs-backend-shared/src/shared-types.ts b/packages/cubejs-backend-shared/src/shared-types.ts new file mode 100644 index 0000000000000..52eb4fc09db74 --- /dev/null +++ b/packages/cubejs-backend-shared/src/shared-types.ts @@ -0,0 +1,22 @@ +/* +stale-if-slow (default) — equivalent to previously used renewQuery: false + If refresh keys are up-to-date, returns the value from cache + If refresh keys are expired, tries to return the value from the database + Returns fresh value from the database if the query executed in the database until the first “Continue wait” interval is reached + Returns stale value from cache otherwise + +stale-while-revalidate — AKA “backgroundRefresh” + If refresh keys are up-to-date, returns the value from cache + If refresh keys are expired, returns stale data from cache + Updates the cache in background + +must-revalidate — equivalent to previously used renewQuery: true + If refresh keys are up-to-date, returns the value from cache + If refresh keys are expired, tries to return the value from the database + Returns fresh value from the database even if it takes minutes and many “Continue wait” intervals + +no-cache — AKA “forceRefresh” + Skips refresh key checks + Returns fresh data from the database, even if it takes minutes and many “Continue wait” intervals +*/ +export type CacheMode = 'stale-if-slow' | 'stale-while-revalidate' | 'must-revalidate' | 'no-cache'; diff --git a/packages/cubejs-client-core/src/types.ts b/packages/cubejs-client-core/src/types.ts index 58f5adba4c24a..400de81044e93 100644 --- a/packages/cubejs-client-core/src/types.ts +++ b/packages/cubejs-client-core/src/types.ts @@ -113,6 +113,7 @@ export interface Query { offset?: number; order?: TQueryOrderObject | TQueryOrderArray; timezone?: string; + // @deprecated renewQuery?: boolean; ungrouped?: boolean; responseFormat?: 'compact' | 'default'; diff --git a/packages/cubejs-client-dx/index.d.ts b/packages/cubejs-client-dx/index.d.ts index 9e61ed1fe0de6..c37dcb86385c0 100644 --- a/packages/cubejs-client-dx/index.d.ts +++ b/packages/cubejs-client-dx/index.d.ts @@ -19,6 +19,7 @@ declare module "@cubejs-client/core" { offset?: number; order?: IntrospectedTQueryOrderObject | IntrospectedTQueryOrderArray; timezone?: string; + // @deprecated renewQuery?: boolean; ungrouped?: boolean; } diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts index c8a6722cc9049..14c1150bb2c69 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/PreAggregations.ts @@ -498,7 +498,7 @@ export class PreAggregations { maxPartitions: this.options.maxPartitions, maxSourceRowLimit: this.options.maxSourceRowLimit, isJob: queryBody.isJob, - waitForRenew: queryBody.renewQuery, + waitForRenew: queryBody.cacheMode !== undefined ? queryBody.cacheMode === 'must-revalidate' : queryBody.renewQuery, // TODO workaround to avoid continuous waiting on building pre-aggregation dependencies forceBuild: i === preAggregations.length - 1 ? queryBody.forceBuildPreAggregations : false, requestId: queryBody.requestId, @@ -603,7 +603,7 @@ export class PreAggregations { { maxPartitions: this.options.maxPartitions, maxSourceRowLimit: this.options.maxSourceRowLimit, - waitForRenew: queryBody.renewQuery, + waitForRenew: queryBody.cacheMode !== undefined ? queryBody.cacheMode === 'must-revalidate' : queryBody.renewQuery, requestId: queryBody.requestId, externalRefresh: this.externalRefresh, compilerCacheFn: queryBody.compilerCacheFn, diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts index a492e974aa510..0887aac625065 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryCache.ts @@ -2,7 +2,13 @@ import crypto from 'crypto'; import csvWriter from 'csv-write-stream'; import { LRUCache } from 'lru-cache'; import { pipeline } from 'stream'; -import { AsyncDebounce, getEnv, MaybeCancelablePromise, streamToArray } from '@cubejs-backend/shared'; +import { + AsyncDebounce, + getEnv, + MaybeCancelablePromise, + streamToArray, + CacheMode, +} from '@cubejs-backend/shared'; import { CubeStoreCacheDriver, CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { BaseDriver, @@ -20,6 +26,23 @@ import { LoadPreAggregationResult, PreAggregationDescription } from './PreAggreg import { getCacheHash } from './utils'; import { CacheAndQueryDriverType, MetadataOperationType } from './QueryOrchestrator'; +export type CacheQueryResultOptions = { + renewalThreshold?: number, + renewalKey?: any, + priority?: number, + external?: boolean, + requestId?: string, + dataSource: string, + waitForRenew?: boolean, + forceNoCache?: boolean, + useInMemory?: boolean, + useCsvQuery?: boolean, + lambdaTypes?: TableStructure, + persistent?: boolean, + primaryQuery?: boolean, + renewCycle?: boolean, +}; + type QueryOptions = { external?: boolean; renewalThreshold?: number; @@ -46,7 +69,9 @@ export type Query = { preAggregations?: PreAggregationDescription[]; groupedPartitionPreAggregations?: PreAggregationDescription[][]; preAggregationsLoadCacheByDataSource?: any; + // @deprecated renewQuery?: boolean; + cacheMode?: CacheMode; compilerCacheFn?: (subKey: string[], cacheFn: () => T) => T; }; @@ -55,8 +80,11 @@ export type QueryBody = { persistent?: boolean; query?: string; values?: string[]; - continueWait?: boolean; + loadRefreshKeysOnly?: boolean; + scheduledRefresh?: boolean; + // @deprecated renewQuery?: boolean; + cacheMode?: CacheMode; requestId?: string; external?: boolean; isJob?: boolean; @@ -202,7 +230,7 @@ export class QueryCache { queuePriority = queryBody.queuePriority; } - const forceNoCache = queryBody.forceNoCache || false; + const forceNoCache = queryBody.forceNoCache || (queryBody.cacheMode === 'no-cache') || false; const { values } = queryBody; @@ -252,7 +280,8 @@ export class QueryCache { } } - if (queryBody.renewQuery) { + // renewQuery has been deprecated, but keeping it for now + if (queryBody.cacheMode === 'must-revalidate' || queryBody.renewQuery) { this.logger('Requested renew', { cacheKey, requestId: queryBody.requestId }); return this.renewQuery( query, @@ -270,7 +299,7 @@ export class QueryCache { ); } - if (!this.options.backgroundRenew) { + if (!this.options.backgroundRenew && queryBody.cacheMode !== 'stale-while-revalidate') { const resultPromise = this.renewQuery( query, values, @@ -843,22 +872,7 @@ export class QueryCache { values: string[], cacheKey: CacheKey, expiration: number, - options: { - renewalThreshold?: number, - renewalKey?: any, - priority?: number, - external?: boolean, - requestId?: string, - dataSource: string, - waitForRenew?: boolean, - forceNoCache?: boolean, - useInMemory?: boolean, - useCsvQuery?: boolean, - lambdaTypes?: TableStructure, - persistent?: boolean, - primaryQuery?: boolean, - renewCycle?: boolean, - } + options: CacheQueryResultOptions, ) { const spanId = crypto.randomBytes(16).toString('hex'); options = options || { dataSource: 'default' }; diff --git a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts index fe53c59a55888..f682955ae2331 100644 --- a/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts +++ b/packages/cubejs-query-orchestrator/src/orchestrator/QueryOrchestrator.ts @@ -1,6 +1,6 @@ import * as stream from 'stream'; import R from 'ramda'; -import { getEnv } from '@cubejs-backend/shared'; +import { CacheMode, getEnv } from '@cubejs-backend/shared'; import { CubeStoreDriver } from '@cubejs-backend/cubestore-driver'; import { QuerySchemasResult, @@ -408,7 +408,6 @@ export class QueryOrchestrator { const data = await this.fetchQuery({ dataSource: preAggregation.dataSource, - continueWait: true, query, external, preAggregations: [ diff --git a/packages/cubejs-server-core/src/core/CompilerApi.js b/packages/cubejs-server-core/src/core/CompilerApi.js index b00a9b92927ab..dfe6027c5eecb 100644 --- a/packages/cubejs-server-core/src/core/CompilerApi.js +++ b/packages/cubejs-server-core/src/core/CompilerApi.js @@ -504,7 +504,7 @@ export class CompilerApi { /** * - * @param {unknown} filter + * @param {unknown|undefined} filter * @returns {Promise>} */ async preAggregations(filter) { diff --git a/packages/cubejs-server-core/src/core/OrchestratorApi.ts b/packages/cubejs-server-core/src/core/OrchestratorApi.ts index 77caf0d556c3f..e715e55a9be69 100644 --- a/packages/cubejs-server-core/src/core/OrchestratorApi.ts +++ b/packages/cubejs-server-core/src/core/OrchestratorApi.ts @@ -81,7 +81,7 @@ export class OrchestratorApi { requestId: query.requestId }); - let fetchQueryPromise = query.loadRefreshKeysOnly + let fetchQueryPromise: Promise = query.loadRefreshKeysOnly ? this.orchestrator.loadRefreshKeys(query) : this.orchestrator.fetchQuery(query); @@ -120,7 +120,7 @@ export class OrchestratorApi { return data; } catch (err) { - if ((err instanceof pt.TimeoutError || err instanceof ContinueWaitError)) { + if (err instanceof pt.TimeoutError || err instanceof ContinueWaitError) { this.logger('Continue wait', { duration: ((new Date()).getTime() - startQueryTime), query: queryForLog, @@ -128,14 +128,18 @@ export class OrchestratorApi { requestId: query.requestId }); + if (query.scheduledRefresh) { + throw { + error: 'Continue wait', + stage: null + }; + } + const fromCache = await this .orchestrator .resultFromCacheIfExists(query); - if ( - !query.renewQuery && - fromCache && - !query.scheduledRefresh - ) { + + if ((query.cacheMode === 'stale-if-slow' || query.cacheMode === 'stale-while-revalidate') && fromCache) { this.logger('Slow Query Warning', { query: queryForLog, requestId: query.requestId, @@ -152,9 +156,7 @@ export class OrchestratorApi { throw { error: 'Continue wait', - stage: !query.scheduledRefresh - ? await this.orchestrator.queryStage(query) - : null + stage: await this.orchestrator.queryStage(query) }; } @@ -260,7 +262,7 @@ export class OrchestratorApi { this.seenDataSources[dataSource] = true; } - public getPreAggregationVersionEntries(context: RequestContext, preAggregations, preAggregationsSchema) { + public getPreAggregationVersionEntries(context: RequestContext, preAggregations, preAggregationsSchema): Promise { return this.orchestrator.getPreAggregationVersionEntries( preAggregations, preAggregationsSchema, diff --git a/packages/cubejs-server-core/src/core/RefreshScheduler.ts b/packages/cubejs-server-core/src/core/RefreshScheduler.ts index d9d947ec5b1b5..50b0d9b8dc924 100644 --- a/packages/cubejs-server-core/src/core/RefreshScheduler.ts +++ b/packages/cubejs-server-core/src/core/RefreshScheduler.ts @@ -368,8 +368,7 @@ export class RefreshScheduler { ...sqlQuery, sql: null, preAggregations: [], - continueWait: true, - renewQuery: true, + cacheMode: 'must-revalidate', requestId: context.requestId, scheduledRefresh: true, loadRefreshKeysOnly: true, @@ -579,8 +578,7 @@ export class RefreshScheduler { ...partition, priority: preAggregationsWarmup ? 1 : queryCursor - queries.length })), - continueWait: true, - renewQuery: true, + cacheMode: 'must-revalidate', requestId: context.requestId, timezone: timezones[timezoneCursor], scheduledRefresh: true, @@ -644,8 +642,7 @@ export class RefreshScheduler { Promise.all(partitions.map(async (partition) => { await orchestratorApi.executeQuery({ preAggregations: dependencies.concat([partition]), - continueWait: true, - renewQuery: true, + cacheMode: 'must-revalidate', forceBuildPreAggregations: queryingOptions.forceBuildPreAggregations ?? true, orphanedTimeout: 60 * 60, requestId: context.requestId, @@ -738,8 +735,7 @@ export class RefreshScheduler { async (partition): Promise => { const job = await orchestratorApi.executeQuery({ preAggregations: dependencies.concat([partition]), - continueWait: true, - renewQuery: false, + cacheMode: 'stale-if-slow', forceBuildPreAggregations: true, orphanedTimeout: 60 * 60, requestId: context.requestId, diff --git a/rust/cubesql/cubeclient/.openapi-generator/VERSION b/rust/cubesql/cubeclient/.openapi-generator/VERSION index e465da43155f4..368fd8fd8d784 100644 --- a/rust/cubesql/cubeclient/.openapi-generator/VERSION +++ b/rust/cubesql/cubeclient/.openapi-generator/VERSION @@ -1 +1 @@ -7.14.0 +7.15.0 diff --git a/rust/cubesql/cubeclient/src/models/mod.rs b/rust/cubesql/cubeclient/src/models/mod.rs index afd9f0deb3e28..599604ea7c8b0 100644 --- a/rust/cubesql/cubeclient/src/models/mod.rs +++ b/rust/cubesql/cubeclient/src/models/mod.rs @@ -33,6 +33,7 @@ pub use self::v1_error::V1Error; pub mod v1_load_continue_wait; pub use self::v1_load_continue_wait::V1LoadContinueWait; pub mod v1_load_request; +pub use self::v1_load_request::Cache; pub use self::v1_load_request::V1LoadRequest; pub mod v1_load_request_query; pub use self::v1_load_request_query::V1LoadRequestQuery; diff --git a/rust/cubesql/cubeclient/src/models/v1_load_request.rs b/rust/cubesql/cubeclient/src/models/v1_load_request.rs index c9d0c5e28fedb..8363498c4fa63 100644 --- a/rust/cubesql/cubeclient/src/models/v1_load_request.rs +++ b/rust/cubesql/cubeclient/src/models/v1_load_request.rs @@ -15,6 +15,8 @@ use serde::{Deserialize, Serialize}; pub struct V1LoadRequest { #[serde(rename = "queryType", skip_serializing_if = "Option::is_none")] pub query_type: Option, + #[serde(rename = "cache", skip_serializing_if = "Option::is_none")] + pub cache: Option, #[serde(rename = "query", skip_serializing_if = "Option::is_none")] pub query: Option, } @@ -23,7 +25,26 @@ impl V1LoadRequest { pub fn new() -> V1LoadRequest { V1LoadRequest { query_type: None, + cache: None, query: None, } } } + +#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] +pub enum Cache { + #[serde(rename = "stale-if-slow")] + StaleIfSlow, + #[serde(rename = "stale-while-revalidate")] + StaleWhileRevalidate, + #[serde(rename = "must-revalidate")] + MustRevalidate, + #[serde(rename = "no-cache")] + NoCache, +} + +impl Default for Cache { + fn default() -> Cache { + Self::StaleIfSlow + } +} diff --git a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs index cf6e6af5f28e4..db87dca4cef9a 100644 --- a/rust/cubesql/cubesql/src/compile/engine/df/scan.rs +++ b/rust/cubesql/cubesql/src/compile/engine/df/scan.rs @@ -1,4 +1,16 @@ +use crate::compile::date_parser::parse_date_str; +use crate::{ + compile::{ + engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery}, + test::find_cube_scans_deep_search, + }, + config::ConfigObj, + sql::AuthContextRef, + transport::{CubeStreamReceiver, LoadRequestMeta, SpanId, TransportService}, + CubeError, +}; use async_trait::async_trait; +use chrono::{Datelike, NaiveDate}; use cubeclient::models::{V1LoadRequestQuery, V1LoadResponse}; pub use datafusion::{ arrow::{ @@ -18,28 +30,6 @@ pub use datafusion::{ Partitioning, PhysicalPlanner, RecordBatchStream, SendableRecordBatchStream, Statistics, }, }; -use futures::Stream; -use log::warn; -use std::{ - any::Any, - borrow::Cow, - fmt, - sync::Arc, - task::{Context, Poll}, -}; - -use crate::compile::date_parser::parse_date_str; -use crate::{ - compile::{ - engine::df::wrapper::{CubeScanWrappedSqlNode, CubeScanWrapperNode, SqlQuery}, - test::find_cube_scans_deep_search, - }, - config::ConfigObj, - sql::AuthContextRef, - transport::{CubeStreamReceiver, LoadRequestMeta, SpanId, TransportService}, - CubeError, -}; -use chrono::{Datelike, NaiveDate}; use datafusion::{ arrow::{ array::{ @@ -51,7 +41,18 @@ use datafusion::{ execution::context::TaskContext, scalar::ScalarValue, }; +use futures::Stream; +use log::warn; +use serde::Serialize; use serde_json::Value; +use std::str::FromStr; +use std::{ + any::Any, + borrow::Cow, + fmt, + sync::Arc, + task::{Context, Poll}, +}; #[derive(Debug, Clone, Eq, PartialEq)] pub struct RegularMember { @@ -79,10 +80,33 @@ impl MemberField { } } +#[derive(Debug, Clone, Serialize)] +pub enum CacheMode { + StaleIfSlow, + StaleWhileRevalidate, + MustRevalidate, + NoCache, +} + +impl FromStr for CacheMode { + type Err = String; + + fn from_str(s: &str) -> std::result::Result { + match s { + "stale-if-slow" => Ok(Self::StaleIfSlow), + "stale-while-revalidate" => Ok(Self::StaleWhileRevalidate), + "must-revalidate" => Ok(Self::MustRevalidate), + "no-cache" => Ok(Self::NoCache), + other => Err(format!("Unknown cache mode: {}", other)), + } + } +} + #[derive(Debug, Clone)] pub struct CubeScanOptions { pub change_user: Option, pub max_records: Option, + pub cache_mode: Option, } #[derive(Debug, Clone)] @@ -682,6 +706,7 @@ async fn load_data( meta, schema, member_fields, + options.cache_mode, ) .await .map_err(|err| ArrowError::ComputeError(err.to_string()))?; @@ -1192,6 +1217,7 @@ mod tests { _meta_fields: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + _cache_mode: Option, ) -> Result, CubeError> { let response = r#" { @@ -1317,6 +1343,7 @@ mod tests { options: CubeScanOptions { change_user: None, max_records: None, + cache_mode: None, }, transport: get_test_transport(), meta: get_test_load_meta(DatabaseProtocol::PostgreSQL), diff --git a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs index 3717d2e99d45d..68b715d99f4ad 100644 --- a/rust/cubesql/cubesql/src/compile/rewrite/converter.rs +++ b/rust/cubesql/cubesql/src/compile/rewrite/converter.rs @@ -2055,6 +2055,13 @@ impl LanguageToLogicalPlanConverter { let member_fields = fields.iter().map(|(_, m)| m.clone()).collect(); + let cache_mode = &*self + .cube_context + .session_state + .cache_mode + .read() + .expect("failed to read lock for session cache_mode"); + let node = Arc::new(CubeScanNode::new( Arc::new(DFSchema::new_with_metadata( fields.into_iter().map(|(f, _)| f).collect(), @@ -2066,6 +2073,7 @@ impl LanguageToLogicalPlanConverter { CubeScanOptions { change_user, max_records, + cache_mode: cache_mode.clone(), }, alias_to_cube.into_iter().map(|(_, c)| c).unique().collect(), self.span_id.clone(), diff --git a/rust/cubesql/cubesql/src/compile/test/mod.rs b/rust/cubesql/cubesql/src/compile/test/mod.rs index ce0d8fd6576a6..e9bbf05a49cdc 100644 --- a/rust/cubesql/cubesql/src/compile/test/mod.rs +++ b/rust/cubesql/cubesql/src/compile/test/mod.rs @@ -47,6 +47,7 @@ pub mod test_user_change; #[cfg(test)] pub mod test_wrapper; pub mod utils; +use crate::compile::engine::df::scan::CacheMode; use crate::compile::{ arrow::record_batch::RecordBatch, engine::df::scan::convert_transport_response, }; @@ -915,6 +916,7 @@ impl TransportService for TestConnectionTransport { meta: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + _cache_mode: Option, ) -> Result, CubeError> { { let mut calls = self.load_calls.lock().await; diff --git a/rust/cubesql/cubesql/src/sql/session.rs b/rust/cubesql/cubesql/src/sql/session.rs index dd7a7d39cfa85..91d476b9b87f8 100644 --- a/rust/cubesql/cubesql/src/sql/session.rs +++ b/rust/cubesql/cubesql/src/sql/session.rs @@ -9,6 +9,7 @@ use std::{ use tokio_util::sync::CancellationToken; use super::{server_manager::ServerManager, session_manager::SessionManager, AuthContextRef}; +use crate::compile::engine::df::scan::CacheMode; use crate::{ compile::{ DatabaseProtocol, DatabaseProtocolDetails, DatabaseVariable, DatabaseVariables, @@ -92,6 +93,8 @@ pub struct SessionState { pub statements: RWLockAsync>, auth_context_expiration: Duration, + + pub cache_mode: RwLockSync>, } impl SessionState { @@ -123,6 +126,7 @@ impl SessionState { query: RwLockSync::new(QueryState::None), statements: RWLockAsync::new(HashMap::new()), auth_context_expiration, + cache_mode: RwLockSync::new(None), } } diff --git a/rust/cubesql/cubesql/src/transport/mod.rs b/rust/cubesql/cubesql/src/transport/mod.rs index 9977caa19737a..ac625e874a2c1 100644 --- a/rust/cubesql/cubesql/src/transport/mod.rs +++ b/rust/cubesql/cubesql/src/transport/mod.rs @@ -22,6 +22,7 @@ pub type CubeMetaMeasureFormat = cubeclient::models::V1CubeMetaMeasureFormat; pub type TransportLoadResponse = cubeclient::models::V1LoadResponse; pub type TransportLoadRequestQuery = cubeclient::models::V1LoadRequestQuery; pub type TransportLoadRequest = cubeclient::models::V1LoadRequest; +pub type TransportLoadRequestCacheMode = cubeclient::models::Cache; pub type TransportMetaResponse = cubeclient::models::V1MetaResponse; pub type TransportError = cubeclient::models::V1Error; diff --git a/rust/cubesql/cubesql/src/transport/service.rs b/rust/cubesql/cubesql/src/transport/service.rs index 8b5a9dabdf20f..e6e5e32591f7d 100644 --- a/rust/cubesql/cubesql/src/transport/service.rs +++ b/rust/cubesql/cubesql/src/transport/service.rs @@ -25,6 +25,8 @@ use tokio::{ }; use uuid::Uuid; +use crate::compile::engine::df::scan::CacheMode; +use crate::transport::TransportLoadRequestCacheMode; use crate::{ compile::{ engine::df::{ @@ -142,6 +144,7 @@ pub trait TransportService: Send + Sync + Debug { meta_fields: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + cache_mode: Option, ) -> Result, CubeError>; async fn load_stream( @@ -282,6 +285,7 @@ impl TransportService for HttpTransport { meta: LoadRequestMeta, schema: SchemaRef, member_fields: Vec, + cache_mode: Option, ) -> Result, CubeError> { if meta.change_user().is_some() { return Err(CubeError::internal( @@ -290,10 +294,23 @@ impl TransportService for HttpTransport { )); } + let cache_mode = match cache_mode { + None => None, + Some(m) => match m { + CacheMode::StaleIfSlow => Some(TransportLoadRequestCacheMode::StaleIfSlow), + CacheMode::StaleWhileRevalidate => { + Some(TransportLoadRequestCacheMode::StaleWhileRevalidate) + } + CacheMode::MustRevalidate => Some(TransportLoadRequestCacheMode::MustRevalidate), + CacheMode::NoCache => Some(TransportLoadRequestCacheMode::NoCache), + }, + }; + // TODO: support meta_fields for HTTP let request = TransportLoadRequest { query: Some(query), query_type: Some("multi".to_string()), + cache: cache_mode, }; let response = cube_api::load_v1(&self.get_client_config_for_ctx(ctx), Some(request)).await?;