Skip to content

feat(nodejs): array support #50

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: binary_protocol_arrays
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 59 additions & 17 deletions src/buffer/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
} from "./index";
import {
isInteger,
getDimensions,
timestampToMicros,
timestampToNanos,
TimestampUnit,
Expand Down Expand Up @@ -235,6 +236,8 @@ abstract class SenderBufferBase implements SenderBuffer {
*/
abstract floatColumn(name: string, value: number): SenderBuffer;

abstract arrayColumn(name: string, value: unknown[]): SenderBuffer;

/**
* Write an integer column with its value into the buffer of the sender.
*
Expand Down Expand Up @@ -367,36 +370,66 @@ abstract class SenderBufferBase implements SenderBuffer {
this.writeEscaped(name);
this.write("=");
writeValue();
this.assertBufferOverflow();
this.hasColumns = true;
}

protected write(data: string) {
this.position += this.buffer.write(data, this.position);
if (this.position > this.bufferSize) {
// should never happen, if checkCapacity() is correctly used
throw new Error(
`Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`,
);
}
}

protected writeByte(data: number) {
this.position = this.buffer.writeInt8(data, this.position);
if (this.position > this.bufferSize) {
// should never happen, if checkCapacity() is correctly used
throw new Error(
`Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`,
);
}
}

protected writeInt(data: number) {
this.position = this.buffer.writeInt32LE(data, this.position);
}

protected writeDouble(data: number) {
this.position = this.buffer.writeDoubleLE(data, this.position);
if (this.position > this.bufferSize) {
// should never happen, if checkCapacity() is correctly used
throw new Error(
`Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`,
);
}

protected writeArray(arr: unknown[]) {
const dimensions = getDimensions(arr);
this.checkCapacity([], 1 + dimensions.length * 4);
this.writeByte(dimensions.length);
let numOfElements = 1;
for (let i = 0; i < dimensions.length; i++) {
numOfElements *= dimensions[i];
this.writeInt(dimensions[i]);
}

this.checkCapacity([], numOfElements * 8);
this.writeArrayValues(arr, dimensions);
}

private writeArrayValues(arr: unknown[], dimensions: number[]) {
if (Array.isArray(arr[0])) {
const length = arr[0].length;
for (let i = 0; i < arr.length; i++) {
const subArray = arr[i] as unknown[];
if (subArray.length !== length) {
throw new Error(
`length does not match array dimensions [dimensions=[${dimensions}], length=${subArray.length}]`,
);
}
this.writeArrayValues(subArray, dimensions);
}
} else {
const dataType = typeof arr[0];
switch (dataType) {
case "number":
for (let i = 0; i < arr.length; i++) {
this.position = this.buffer.writeDoubleLE(
arr[i] as number,
this.position,
);
}
break;
default:
throw new Error(`unsupported array type [type=${dataType}]`);
}
}
}

Expand Down Expand Up @@ -436,6 +469,15 @@ abstract class SenderBufferBase implements SenderBuffer {
}
}
}

private assertBufferOverflow() {
if (this.position > this.bufferSize) {
// should never happen, if checkCapacity() is correctly used
throw new Error(
`Buffer overflow [position=${this.position}, bufferSize=${this.bufferSize}]`,
);
}
}
}

export { SenderBufferBase };
4 changes: 4 additions & 0 deletions src/buffer/bufferv1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class SenderBufferV1 extends SenderBufferBase {
);
return this;
}

arrayColumn(): SenderBuffer {
throw new Error("Arrays are not supported in protocol v1");
}
}

export { SenderBufferV1 };
24 changes: 24 additions & 0 deletions src/buffer/bufferv2.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@ import { SenderOptions } from "../options";
import { SenderBuffer } from "./index";
import { SenderBufferBase } from "./base";

const COLUMN_TYPE_DOUBLE: number = 10;
const COLUMN_TYPE_NULL: number = 33;

const ENTITY_TYPE_ARRAY: number = 14;
const ENTITY_TYPE_DOUBLE: number = 16;

const EQUALS_SIGN: number = "=".charCodeAt(0);

class SenderBufferV2 extends SenderBufferBase {
Expand All @@ -25,6 +30,25 @@ class SenderBufferV2 extends SenderBufferBase {
);
return this;
}

arrayColumn(name: string, value: unknown[]): SenderBuffer {
if (value && !Array.isArray(value)) {
throw new Error(`Value must be an array, received ${value}`);
}
this.writeColumn(name, value, () => {
this.checkCapacity([], 3);
this.writeByte(EQUALS_SIGN);
this.writeByte(ENTITY_TYPE_ARRAY);

if (!value) {
this.writeByte(COLUMN_TYPE_NULL);
} else {
this.writeByte(COLUMN_TYPE_DOUBLE);
this.writeArray(value);
}
});
return this;
}
}

export { SenderBufferV2 };
2 changes: 2 additions & 0 deletions src/buffer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,8 @@ interface SenderBuffer {
*/
floatColumn(name: string, value: number): SenderBuffer;

arrayColumn(name: string, value: unknown[]): SenderBuffer;

/**
* Write an integer column with its value into the buffer of the sender.
*
Expand Down
5 changes: 5 additions & 0 deletions src/sender.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,11 @@ class Sender {
return this;
}

arrayColumn(name: string, value: unknown[]): Sender {
this.buffer.arrayColumn(name, value);
return this;
}

/**
* Write an integer column with its value into the buffer of the sender.
*
Expand Down
13 changes: 13 additions & 0 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,18 @@ function timestampToNanos(timestamp: bigint, unit: TimestampUnit) {
}
}

function getDimensions(arr: unknown) {
const dimensions: number[] = [];
while (Array.isArray(arr)) {
if (arr.length === 0) {
throw new Error("zero length array not supported");
}
dimensions.push(arr.length);
arr = arr[0];
}
return dimensions;
}

async function fetchJson<T>(url: string): Promise<T> {
let response: globalThis.Response;
try {
Expand All @@ -59,4 +71,5 @@ export {
timestampToNanos,
TimestampUnit,
fetchJson,
getDimensions,
};
117 changes: 117 additions & 0 deletions test/sender.buffer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,108 @@ describe("Sender message builder test suite (anything not covered in client inte
await sender.close();
});

it("does not support arrays with protocol v1", async function () {
const sender = new Sender({
protocol: "tcp",
protocol_version: "1",
host: "host",
init_buf_size: 1024,
});
expect(() =>
sender.table("tableName").arrayColumn("arrayCol", [12.3, 23.4]),
).toThrow("Arrays are not supported in protocol v1");
await sender.close();
});

it("supports arrays with protocol v2", async function () {
const sender = new Sender({
protocol: "tcp",
protocol_version: "2",
host: "host",
init_buf_size: 1024,
});
await sender
.table("tableName")
.arrayColumn("arrayCol", [12.3, 23.4])
.atNow();
expect(bufferContentHex(sender)).toBe(
toHex("tableName arrayCol==") +
" 0e 0a 01 02 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " +
toHex("\n"),
);
await sender.close();
});

it("supports multidimensional arrays with protocol v2", async function () {
const sender = new Sender({
protocol: "tcp",
protocol_version: "2",
host: "host",
init_buf_size: 1024,
});
await sender
.table("tableName")
.arrayColumn("arrayCol", [[12.3], [23.4]])
.atNow();
expect(bufferContentHex(sender)).toBe(
toHex("tableName arrayCol==") +
" 0e 0a 02 02 00 00 00 01 00 00 00 9a 99 99 99 99 99 28 40 66 66 66 66 66 66 37 40 " +
toHex("\n"),
);
await sender.close();
});

it("does not accept empty array", async function () {
const sender = new Sender({
protocol: "tcp",
protocol_version: "2",
host: "host",
init_buf_size: 1024,
});
sender.table("tableName");
expect(() => sender.arrayColumn("arrayCol", [])).toThrow(
"zero length array not supported",
);
expect(() => sender.arrayColumn("arrayCol", [[], []])).toThrow(
"zero length array not supported",
);
await sender.close();
});

it("does not accept irregular array", async function () {
const sender = new Sender({
protocol: "tcp",
protocol_version: "2",
host: "host",
init_buf_size: 1024,
});
expect(() =>
sender.table("tableName").arrayColumn("arrayCol", [[1.1, 2.2], [3.3]]),
).toThrow(
"length does not match array dimensions [dimensions=[2,2], length=1]",
);
await sender.close();
});

it("supports arrays with NULL value", async function () {
const sender = new Sender({
protocol: "http",
protocol_version: "2",
host: "host",
init_buf_size: 1024,
});
await sender.table("tableName").arrayColumn("arrayCol", undefined).atNow();
await sender.table("tableName").arrayColumn("arrayCol", null).atNow();
expect(bufferContentHex(sender)).toBe(
toHex("tableName arrayCol==") +
" 0e 21 " +
toHex("\ntableName arrayCol==") +
" 0e 21 " +
toHex("\n"),
);
await sender.close();
});

it("supports timestamp field as number", async function () {
const sender = new Sender({
protocol: "tcp",
Expand Down Expand Up @@ -838,6 +940,21 @@ function bufferContent(sender: Sender) {
return sender.buffer.toBufferView().toString();
}

function bufferContentHex(sender: Sender) {
// @ts-expect-error - Accessing private field
return toHexString(sender.buffer.toBufferView());
}

function toHex(str: string) {
return toHexString(Buffer.from(str));
}

function toHexString(buffer: Buffer) {
return Array.from(buffer)
.map((b) => b.toString(16).padStart(2, "0"))
.join(" ");
}

function bufferSize(sender: Sender) {
// @ts-expect-error - Accessing private field
return sender.buffer.bufferSize;
Expand Down