Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions packages/cubejs-server-core/src/core/OrchestratorStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ export class OrchestratorStorage {
});
}

protected readonly initializers: Map<string, Promise<OrchestratorApi>> = new Map();

public has(orchestratorId: string) {
return this.storage.has(orchestratorId);
}
Expand All @@ -24,6 +26,29 @@ export class OrchestratorStorage {
return this.storage.set(orchestratorId, orchestratorApi);
}

public async getOrInit(orchestratorId: string, init: () => Promise<OrchestratorApi>): Promise<OrchestratorApi> {
if (this.storage.has(orchestratorId)) {
return this.storage.get(orchestratorId);
}

if (this.initializers.has(orchestratorId)) {
return this.initializers.get(orchestratorId);
}

try {
const initPromise = init();
this.initializers.set(orchestratorId, initPromise);

const instance = await initPromise;

this.storage.set(orchestratorId, instance);

return instance;
} finally {
this.initializers.delete(orchestratorId);
}
}

public clear() {
this.storage.clear();
}
Expand Down
168 changes: 81 additions & 87 deletions packages/cubejs-server-core/src/core/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -569,88 +569,49 @@ export class CubejsServerCore {
public async getOrchestratorApi(context: RequestContext): Promise<OrchestratorApi> {
const orchestratorId = await this.contextToOrchestratorId(context);

if (this.orchestratorStorage.has(orchestratorId)) {
return this.orchestratorStorage.get(orchestratorId);
}

/**
* Hash table to store promises which will be resolved with the
* datasource drivers. DriverFactoryByDataSource function is closure
* this constant.
*/
const driverPromise: Record<string, Promise<BaseDriver>> = {};

let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;

const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
const externalDbType = this.contextToExternalDbType(context);

// orchestrator options can be empty, if user didn't define it.
// so we are adding default and configuring queues concurrency.
const orchestratorOptions =
this.optsHandler.getOrchestratorInitializedOptions(
context,
(await this.orchestratorOptions(context)) || {},
);

const orchestratorApi = this.createOrchestratorApi(
return this.orchestratorStorage.getOrInit(orchestratorId, async () => {
/**
* Driver factory function `DriverFactoryByDataSource`.
* Hash table to store promises which will be resolved with the
* datasource drivers. DriverFactoryByDataSource function is a closure
* this constant.
*/
async (dataSource = 'default') => {
if (driverPromise[dataSource]) {
return driverPromise[dataSource];
}
const driverPromise: Record<string, Promise<BaseDriver>> = {};

// eslint-disable-next-line no-return-assign
return driverPromise[dataSource] = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.resolveDriver(
{
...context,
dataSource,
},
orchestratorOptions,
);

if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
}
let externalPreAggregationsDriverPromise: Promise<BaseDriver> | null = null;

await driver.testConnection();
const contextToDbType: DbTypeAsyncFn = this.contextToDbType.bind(this);
const externalDbType = this.contextToExternalDbType(context);

return driver;
}

throw new Error(
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
);
} catch (e) {
driverPromise[dataSource] = null;

if (driver) {
await driver.release();
}
// orchestrator options can be empty, if user didn't define it.
// so we are adding default and configuring queues concurrency.
const orchestratorOptions =
this.optsHandler.getOrchestratorInitializedOptions(
context,
(await this.orchestratorOptions(context)) || {},
);

throw e;
}
})();
},
{
externalDriverFactory: this.options.externalDriverFactory && (async () => {
if (externalPreAggregationsDriverPromise) {
return externalPreAggregationsDriverPromise;
return this.createOrchestratorApi(
/**
* Driver factory function `DriverFactoryByDataSource`.
*/
async (dataSource = 'default') => {
if (driverPromise[dataSource]) {
return driverPromise[dataSource];
}

// eslint-disable-next-line no-return-assign
return externalPreAggregationsDriverPromise = (async () => {
return driverPromise[dataSource] = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.options.externalDriverFactory(context);
driver = await this.resolveDriver(
{
...context,
dataSource,
},
orchestratorOptions,
);

if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
Expand All @@ -662,10 +623,10 @@ export class CubejsServerCore {
}

throw new Error(
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
`Unexpected return type, driverFactory must return driver (dataSource: "${dataSource}"), actual: ${getRealType(driver)}`
);
} catch (e) {
externalPreAggregationsDriverPromise = null;
driverPromise[dataSource] = null;

if (driver) {
await driver.release();
Expand All @@ -674,23 +635,56 @@ export class CubejsServerCore {
throw e;
}
})();
}),
contextToDbType: async (dataSource) => contextToDbType({
...context,
dataSource
}),
// speedup with cache
contextToExternalDbType: () => externalDbType,
redisPrefix: orchestratorId,
skipExternalCacheAndQueue: externalDbType === 'cubestore',
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
...orchestratorOptions,
}
);
},
{
externalDriverFactory: this.options.externalDriverFactory && (async () => {
if (externalPreAggregationsDriverPromise) {
return externalPreAggregationsDriverPromise;
}

// eslint-disable-next-line no-return-assign
return externalPreAggregationsDriverPromise = (async () => {
let driver: BaseDriver | null = null;

try {
driver = await this.options.externalDriverFactory(context);
if (typeof driver === 'object' && driver != null) {
if (driver.setLogger) {
driver.setLogger(this.logger);
}

this.orchestratorStorage.set(orchestratorId, orchestratorApi);
await driver.testConnection();

return driver;
}

return orchestratorApi;
throw new Error(
`Unexpected return type, externalDriverFactory must return driver, actual: ${getRealType(driver)}`
);
} catch (e) {
externalPreAggregationsDriverPromise = null;

if (driver) {
await driver.release();
}

throw e;
}
})();
}),
contextToDbType: async (dataSource) => contextToDbType({
...context,
dataSource
}),
// speedup with cache
contextToExternalDbType: () => externalDbType,
redisPrefix: orchestratorId,
skipExternalCacheAndQueue: externalDbType === 'cubestore',
cacheAndQueueDriver: this.options.cacheAndQueueDriver,
...orchestratorOptions,
}
);
});
}

protected createCompilerApi(repository, options: Record<string, any> = {}) {
Expand Down
Loading