diff --git a/src/buffer/base.ts b/src/buffer/base.ts index 442d7c5..e8c0de3 100644 --- a/src/buffer/base.ts +++ b/src/buffer/base.ts @@ -11,6 +11,7 @@ import { } from "./index"; import { isInteger, + getDimensions, timestampToMicros, timestampToNanos, TimestampUnit, @@ -235,6 +236,8 @@ abstract class SenderBufferBase implements SenderBuffer { */ abstract floatColumn(name: string, value: number): SenderBuffer; + abstract arrayColumn(name: string, value: unknown[]): SenderBuffer; + /** * Write an integer column with its value into the buffer of the sender. * @@ -367,36 +370,66 @@ abstract class SenderBufferBase implements SenderBuffer { this.writeEscaped(name); this.write("="); writeValue(); + this.assertBufferOverflow(); this.hasColumns = true; } protected write(data: string) { this.position += this.buffer.write(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); - } } protected writeByte(data: number) { this.position = this.buffer.writeInt8(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); - } + } + + protected writeInt(data: number) { + this.position = this.buffer.writeInt32LE(data, this.position); } protected writeDouble(data: number) { this.position = this.buffer.writeDoubleLE(data, this.position); - if (this.position > this.bufferSize) { - // should never happen, if checkCapacity() is correctly used - throw new Error( - `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, - ); + } + + protected writeArray(arr: unknown[]) { + const dimensions = getDimensions(arr); + this.checkCapacity([], 1 + dimensions.length * 4); + this.writeByte(dimensions.length); + let numOfElements = 1; + for (let i = 0; i < dimensions.length; i++) { + numOfElements *= dimensions[i]; + this.writeInt(dimensions[i]); + } + + this.checkCapacity([], numOfElements * 8); + this.writeArrayValues(arr, dimensions); + } + + private writeArrayValues(arr: unknown[], dimensions: number[]) { + if (Array.isArray(arr[0])) { + const length = arr[0].length; + for (let i = 0; i < arr.length; i++) { + const subArray = arr[i] as unknown[]; + if (subArray.length !== length) { + throw new Error( + `length does not match array dimensions [dimensions=[${dimensions}], length=${subArray.length}]`, + ); + } + this.writeArrayValues(subArray, dimensions); + } + } else { + const dataType = typeof arr[0]; + switch (dataType) { + case "number": + for (let i = 0; i < arr.length; i++) { + this.position = this.buffer.writeDoubleLE( + arr[i] as number, + this.position, + ); + } + break; + default: + throw new Error(`unsupported array type [type=${dataType}]`); + } } } @@ -436,6 +469,15 @@ abstract class SenderBufferBase implements SenderBuffer { } } } + + private assertBufferOverflow() { + if (this.position > this.bufferSize) { + // should never happen, if checkCapacity() is correctly used + throw new Error( + `Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`, + ); + } + } } export { SenderBufferBase }; diff --git a/src/buffer/bufferv1.ts b/src/buffer/bufferv1.ts index aa2a37e..0efe7c5 100644 --- a/src/buffer/bufferv1.ts +++ b/src/buffer/bufferv1.ts @@ -21,6 +21,10 @@ class SenderBufferV1 extends SenderBufferBase { ); return this; } + + arrayColumn(): SenderBuffer { + throw new Error("Arrays are not supported in protocol v1"); + } } export { SenderBufferV1 }; diff --git a/src/buffer/bufferv2.ts b/src/buffer/bufferv2.ts index e21fdb1..168d5b5 100644 --- a/src/buffer/bufferv2.ts +++ b/src/buffer/bufferv2.ts @@ -3,7 +3,12 @@ import { SenderOptions } from "../options"; import { SenderBuffer } from "./index"; import { SenderBufferBase } from "./base"; +const COLUMN_TYPE_DOUBLE: number = 10; +const COLUMN_TYPE_NULL: number = 33; + +const ENTITY_TYPE_ARRAY: number = 14; const ENTITY_TYPE_DOUBLE: number = 16; + const EQUALS_SIGN: number = "=".charCodeAt(0); class SenderBufferV2 extends SenderBufferBase { @@ -25,6 +30,25 @@ class SenderBufferV2 extends SenderBufferBase { ); return this; } + + arrayColumn(name: string, value: unknown[]): SenderBuffer { + if (value && !Array.isArray(value)) { + throw new Error(`Value must be an array, received ${value}`); + } + this.writeColumn(name, value, () => { + this.checkCapacity([], 3); + this.writeByte(EQUALS_SIGN); + this.writeByte(ENTITY_TYPE_ARRAY); + + if (!value) { + this.writeByte(COLUMN_TYPE_NULL); + } else { + this.writeByte(COLUMN_TYPE_DOUBLE); + this.writeArray(value); + } + }); + return this; + } } export { SenderBufferV2 }; diff --git a/src/buffer/index.ts b/src/buffer/index.ts index fdac848..cd11d09 100644 --- a/src/buffer/index.ts +++ b/src/buffer/index.ts @@ -109,6 +109,8 @@ interface SenderBuffer { */ floatColumn(name: string, value: number): SenderBuffer; + arrayColumn(name: string, value: unknown[]): SenderBuffer; + /** * Write an integer column with its value into the buffer of the sender. * diff --git a/src/sender.ts b/src/sender.ts index e06df2e..a5767d9 100644 --- a/src/sender.ts +++ b/src/sender.ts @@ -234,6 +234,11 @@ class Sender { return this; } + arrayColumn(name: string, value: unknown[]): Sender { + this.buffer.arrayColumn(name, value); + return this; + } + /** * Write an integer column with its value into the buffer of the sender. * diff --git a/src/utils.ts b/src/utils.ts index 734740d..9a1b8a5 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -36,6 +36,18 @@ function timestampToNanos(timestamp: bigint, unit: TimestampUnit) { } } +function getDimensions(arr: unknown) { + const dimensions: number[] = []; + while (Array.isArray(arr)) { + if (arr.length === 0) { + throw new Error("zero length array not supported"); + } + dimensions.push(arr.length); + arr = arr[0]; + } + return dimensions; +} + async function fetchJson(url: string): Promise { let response: globalThis.Response; try { @@ -59,4 +71,5 @@ export { timestampToNanos, TimestampUnit, fetchJson, + getDimensions, }; diff --git a/test/sender.buffer.test.ts b/test/sender.buffer.test.ts index 4f43623..1fd55f8 100644 --- a/test/sender.buffer.test.ts +++ b/test/sender.buffer.test.ts @@ -167,6 +167,124 @@ describe("Sender message builder test suite (anything not covered in client inte await sender.close(); }); + it("does not support arrays with protocol v1", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "1", + host: "host", + init_buf_size: 1024, + }); + expect(() => + sender.table("tableName").arrayColumn("arrayCol", [12.3, 23.4]), + ).toThrow("Arrays are not supported in protocol v1"); + await sender.close(); + }); + + it("supports arrays with protocol v2", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender + .table("tableName") + .arrayColumn("arrayCol", [12.3, 23.4]) + .atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 0a 01 02 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " + + toHex("\n"), + ); + await sender.close(); + }); + + it("supports multidimensional arrays with protocol v2", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender + .table("tableName") + .arrayColumn("arrayCol", [[12.3], [23.4]]) + .atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 0a 02 02 00 00 00 01 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " + + toHex("\n"), + ); + await sender.close(); + }); + + it("does not accept empty array", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + expect(() => sender.arrayColumn("arrayCol", [])).toThrow( + "zero length array not supported", + ); + expect(() => sender.arrayColumn("arrayCol", [[], []])).toThrow( + "zero length array not supported", + ); + await sender.close(); + }); + + it("does not accept irregular array", async function () { + const sender = new Sender({ + protocol: "tcp", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + expect(() => + sender.table("tableName").arrayColumn("arrayCol", [[1.1, 2.2], [3.3]]), + ).toThrow( + "length does not match array dimensions [dimensions=[2,2], length=1]", + ); + await sender.close(); + }); + + it("does not accept unsupported types", async function () { + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + sender.table("tableName"); + expect(() => sender.arrayColumn("col", ['str'])).toThrow("unsupported array type [type=string]"); + expect(() => sender.arrayColumn("col", [true])).toThrow("unsupported array type [type=boolean]"); + expect(() => sender.arrayColumn("col", [{}])).toThrow("unsupported array type [type=object]"); + expect(() => sender.arrayColumn("col", [null])).toThrow("unsupported array type [type=object]"); + expect(() => sender.arrayColumn("col", [undefined])).toThrow("unsupported array type [type=undefined]"); + await sender.close(); + }); + + it("supports arrays with NULL value", async function () { + const sender = new Sender({ + protocol: "http", + protocol_version: "2", + host: "host", + init_buf_size: 1024, + }); + await sender.table("tableName").arrayColumn("arrayCol", undefined).atNow(); + await sender.table("tableName").arrayColumn("arrayCol", null).atNow(); + expect(bufferContentHex(sender)).toBe( + toHex("tableName arrayCol==") + + " 0e 21 " + + toHex("\ntableName arrayCol==") + + " 0e 21 " + + toHex("\n"), + ); + await sender.close(); + }); + it("supports timestamp field as number", async function () { const sender = new Sender({ protocol: "tcp", @@ -838,6 +956,21 @@ function bufferContent(sender: Sender) { return sender.buffer.toBufferView().toString(); } +function bufferContentHex(sender: Sender) { + // @ts-expect-error - Accessing private field + return toHexString(sender.buffer.toBufferView()); +} + +function toHex(str: string) { + return toHexString(Buffer.from(str)); +} + +function toHexString(buffer: Buffer) { + return Array.from(buffer) + .map((b) => b.toString(16).padStart(2, "0")) + .join(" "); +} + function bufferSize(sender: Sender) { // @ts-expect-error - Accessing private field return sender.buffer.bufferSize;