Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 189 additions & 0 deletions src/app/data-client.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,14 @@ import {
CaptureInterval,
ConfigureDatabaseUserRequest,
ConfigureDatabaseUserResponse,
CreateIndexRequest,
CreateIndexResponse,
DataRequest,
DeleteBinaryDataByFilterRequest,
DeleteBinaryDataByFilterResponse,
DeleteBinaryDataByIDsResponse,
DeleteIndexRequest,
DeleteIndexResponse,
DeleteTabularDataResponse,
ExportTabularDataRequest,
ExportTabularDataResponse,
Expand All @@ -35,6 +39,11 @@ import {
GetDatabaseConnectionResponse,
GetLatestTabularDataRequest,
GetLatestTabularDataResponse,
Index,
IndexableCollection,
IndexCreator,
ListIndexesRequest,
ListIndexesResponse,
RemoveBinaryDataFromDatasetByIDsRequest,
RemoveBinaryDataFromDatasetByIDsResponse,
RemoveBoundingBoxFromImageByIDRequest,
Expand All @@ -46,6 +55,7 @@ import {
TabularData,
TabularDataByFilterRequest,
TabularDataByFilterResponse,
TabularDataByMQLRequest,
TabularDataByMQLResponse,
TabularDataBySQLResponse,
TabularDataSourceType,
Expand Down Expand Up @@ -325,6 +335,36 @@ describe('DataClient tests', () => {
expect(result[0]?.key1).toBeInstanceOf(Date);
expect(promise).toEqual(data);
});

it('get tabular data from MQL with queryPrefixName', async () => {
const expectedRequest = new TabularDataByMQLRequest({
organizationId: 'some_org_id',
mqlBinary: [BSON.serialize({ query: 'some_mql_query' })],
queryPrefixName: 'my_prefix',
});
let capReq: TabularDataByMQLRequest | undefined = undefined;
mockTransport = createRouterTransport(({ service }) => {
service(DataService, {
tabularDataByMQL: (req) => {
capReq = req;
return new TabularDataByMQLResponse({
rawData: data.map((x) => BSON.serialize(x)),
});
},
});
});
const promise = await subject().tabularDataByMQL(
'some_org_id',
[{ query: 'some_mql_query' }],
false,
undefined,
'my_prefix'
);
expect(capReq).toStrictEqual(expectedRequest);
const result = promise as typeof data;
expect(result[0]?.key1).toBeInstanceOf(Date);
expect(promise).toEqual(data);
});
});

describe('tabularDataByFilter tests', () => {
Expand Down Expand Up @@ -1040,6 +1080,155 @@ describe('DataClient tests', () => {
});
});

describe('createIndex tests', () => {
let capReq: CreateIndexRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataService, {
createIndex: (req) => {
capReq = req;
return new CreateIndexResponse();
},
});
});
});
it('creates an index', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const indexSpec = { keys: { field: 1 }, options: { priority: 1 } };
const pipelineName = 'pipeline1';
await subject().createIndex(
organizationId,
collectionType,
indexSpec,
pipelineName
);
expect(capReq.organizationId).toBe(organizationId);
expect(capReq.collectionType).toBe(collectionType);
expect(
capReq.indexSpec.map((spec) => BSON.deserialize(spec))[0]
).toStrictEqual(indexSpec);
expect(capReq.pipelineName).toBe(pipelineName);
});
it('creates an index without pipeline name', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const indexSpec = { keys: { field: 2 }, options: { priority: 2 } };
await subject().createIndex(organizationId, collectionType, indexSpec);
expect(capReq.organizationId).toBe(organizationId);
expect(capReq.collectionType).toBe(collectionType);
expect(
capReq.indexSpec.map((spec) => BSON.deserialize(spec))[0]
).toStrictEqual(indexSpec);
});
});
describe('listIndexes tests', () => {
let capReq: ListIndexesRequest;
const index1 = new Index({
collectionType: IndexableCollection.HOT_STORE,
indexName: 'index1',
indexSpec: [new TextEncoder().encode(JSON.stringify({ field: 1 }))],
createdBy: IndexCreator.CUSTOMER,
});
const index2 = new Index({
collectionType: IndexableCollection.PIPELINE_SINK,
pipelineName: 'pipeline1',
indexName: 'index2',
indexSpec: [
new TextEncoder().encode(JSON.stringify({ another_field: -1 })),
],
createdBy: IndexCreator.VIAM,
});
const indexes = [index1, index2];
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataService, {
listIndexes: (req) => {
capReq = req;
return new ListIndexesResponse({
indexes,
});
},
});
});
});
it('lists indexes', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const pipelineName = 'pipeline1';
const expectedRequest = new ListIndexesRequest({
organizationId,
collectionType,
pipelineName,
});
const result = await subject().listIndexes(
organizationId,
collectionType,
pipelineName
);
expect(capReq).toStrictEqual(expectedRequest);
expect(result).toEqual(indexes);
});
it('lists indexes without pipeline name', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const expectedRequest = new ListIndexesRequest({
organizationId,
collectionType,
});
const result = await subject().listIndexes(
organizationId,
collectionType
);
expect(capReq).toStrictEqual(expectedRequest);
expect(result).toEqual(indexes);
});
});
describe('deleteIndex tests', () => {
let capReq: DeleteIndexRequest;
beforeEach(() => {
mockTransport = createRouterTransport(({ service }) => {
service(DataService, {
deleteIndex: (req) => {
capReq = req;
return new DeleteIndexResponse();
},
});
});
});
it('deletes an index', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const indexName = 'my_index';
const pipelineName = 'pipeline1';
const expectedRequest = new DeleteIndexRequest({
organizationId,
collectionType,
indexName,
pipelineName,
});
await subject().deleteIndex(
organizationId,
collectionType,
indexName,
pipelineName
);
expect(capReq).toStrictEqual(expectedRequest);
});
it('deletes an index without pipeline name', async () => {
const organizationId = 'orgId';
const collectionType = IndexableCollection.HOT_STORE;
const indexName = 'my_index';
const expectedRequest = new DeleteIndexRequest({
organizationId,
collectionType,
indexName,
});
await subject().deleteIndex(organizationId, collectionType, indexName);
expect(capReq).toStrictEqual(expectedRequest);
});
});

describe('createFilter tests', () => {
it('create empty filter', () => {
const testFilter = DataClient.createFilter({});
Expand Down
117 changes: 114 additions & 3 deletions src/app/data-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import {
CaptureInterval,
CaptureMetadata,
Filter,
Index,
IndexableCollection,
Order,
TabularDataSource,
TabularDataSourceType,
Expand Down Expand Up @@ -117,7 +119,7 @@ export class DataClient {
private datasetClient: Client<typeof DatasetService>;
private dataSyncClient: Client<typeof DataSyncService>;
private dataPipelinesClient: Client<typeof DataPipelinesService>;
static readonly UPLOAD_CHUNK_SIZE = 8;
static readonly UPLOAD_CHUNK_SIZE = 1024 * 64;

constructor(transport: Transport) {
this.dataClient = createClient(DataService, transport);
Expand Down Expand Up @@ -273,13 +275,15 @@ export class DataClient {
* store. Defaults to false. Deprecated - use dataSource instead.
* @param dataSource The data source to query. Defaults to the standard data
* source.
* @param queryPrefixName Optional name of the query prefix.
* @returns An array of data objects
*/
async tabularDataByMQL(
organizationId: string,
query: Uint8Array[] | Record<string, Date | JsonValue>[],
useRecentData?: boolean,
tabularDataSource?: TabularDataSource
tabularDataSource?: TabularDataSource,
queryPrefixName?: string
) {
const binary: Uint8Array[] =
query[0] instanceof Uint8Array
Expand All @@ -301,6 +305,7 @@ export class DataClient {
organizationId,
mqlBinary: binary,
dataSource,
queryPrefixName,
});
return resp.rawData.map((value) => BSON.deserialize(value));
}
Expand Down Expand Up @@ -1544,6 +1549,7 @@ export class DataClient {
* @param query The MQL query to run as a list of BSON documents
* @param schedule The schedule to run the query on (cron expression)
* @param dataSourceType The type of data source to use for the data pipeline
* @param enableBackfill Whether to enable backfill for the data pipeline
* @returns The ID of the created data pipeline
*/
async createDataPipeline(
Expand Down Expand Up @@ -1630,6 +1636,107 @@ export class DataClient {
resp.nextPageToken
);
}

/**
* CreateIndex starts a custom index build
*
* @example
*
* ```ts
* await dataClient.createIndex(
* '123abc45-1234-5678-90ab-cdef12345678',
* IndexableCollection.HOT_STORE,
* [new TextEncoder().encode(JSON.stringify({ field: 1 }))]
* );
* ```
*
* @param organizationId The ID of the organization
* @param collectionType The type of collection to create the index on
* @param indexSpec The MongoDB index specification in JSON format, as a
* Uint8Array
* @param pipelineName Optional name of the pipeline if collectionType is
* PIPELINE_SINK
*/
async createIndex(
organizationId: string,
collectionType: IndexableCollection,
indexSpec: {
keys: Record<string, number>;
options?: Record<string, unknown>;
},
pipelineName?: string
) {
await this.dataClient.createIndex({
organizationId,
collectionType,
indexSpec: [BSON.serialize(indexSpec)],
pipelineName,
});
}

/**
* ListIndexes returns all the indexes for a given collection
*
* @example
*
* ```ts
* const indexes = await dataClient.listIndexes(
* '123abc45-1234-5678-90ab-cdef12345678',
* IndexableCollection.HOT_STORE
* );
* ```
*
* @param organizationId The ID of the organization
* @param collectionType The type of collection to list indexes for
* @param pipelineName Optional name of the pipeline if collectionType is
* PIPELINE_SINK
* @returns An array of indexes
*/
async listIndexes(
organizationId: string,
collectionType: IndexableCollection,
pipelineName?: string
): Promise<Index[]> {
const resp = await this.dataClient.listIndexes({
organizationId,
collectionType,
pipelineName,
});
return resp.indexes;
}

/**
* DeleteIndex drops the specified custom index from a collection
*
* @example
*
* ```ts
* await dataClient.deleteIndex(
* '123abc45-1234-5678-90ab-cdef12345678',
* IndexableCollection.HOT_STORE,
* 'my_index'
* );
* ```
*
* @param organizationId The ID of the organization
* @param collectionType The type of collection to delete the index from
* @param indexName The name of the index to delete
* @param pipelineName Optional name of the pipeline if collectionType is
* PIPELINE_SINK
*/
async deleteIndex(
organizationId: string,
collectionType: IndexableCollection,
indexName: string,
pipelineName?: string
) {
await this.dataClient.deleteIndex({
organizationId,
collectionType,
indexName,
pipelineName,
});
}
}

export class ListDataPipelineRunsPage {
Expand Down Expand Up @@ -1682,5 +1789,9 @@ export class ListDataPipelineRunsPage {
}
}

export { type BinaryID, type Order } from '../gen/app/data/v1/data_pb';
export {
type BinaryID,
type IndexableCollection,
type Order,
} from '../gen/app/data/v1/data_pb';
export { type UploadMetadata } from '../gen/app/datasync/v1/data_sync_pb';
Loading