Skip to content

Commit 7a3698c

Browse files
authored
Merge pull request #78 from neon-bindings/kv/gzip-stream
feat: Add streaming gzip compression example
2 parents b603cd9 + 3d650a0 commit 7a3698c

File tree

10 files changed

+2311
-5
lines changed

10 files changed

+2311
-5
lines changed

Cargo.lock

Lines changed: 80 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
[workspace]
22
members = [
33
"examples/async-sqlite",
4-
"examples/hello-world",
54
"examples/cpu-count",
5+
"examples/gzip-stream",
6+
"examples/hello-world",
67
]
8+
9+
[profile.release]
10+
lto = true

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@ All examples are for [`napi-backend`][napi-migration]. For examples using `legac
1616
| ------------------------------ | ------------------------------------------ |
1717
| [`async-sqlite`][async-sqlite] | Async interface to a SQLite database |
1818
| [`cpu-count`][cpu-count] | Return the number of CPUs |
19+
| [`gzip-stream`][gzip-stream] | Asynchronously compress a stream of data |
1920
| [`hello-world`][hello-world] | Return a JS String with a greeting message |
2021

2122
[async-sqlite]: examples/async-sqlite
2223
[cpu-count]: examples/cpu-count
24+
[gzip-stream]: examples/gzip-stream
2325
[hello-world]: examples/hello-world
2426

2527
## Contributing

examples/gzip-stream/Cargo.toml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "gzip-stream"
3+
version = "0.1.0"
4+
description = "Neon Stream Example"
5+
license = "MIT"
6+
edition = "2018"
7+
exclude = ["index.node"]
8+
9+
[lib]
10+
crate-type = ["cdylib"]
11+
12+
[dependencies]
13+
flate2 = "1"
14+
15+
[dependencies.neon]
16+
version = "0.10.0-alpha.2"
17+
default-features = false
18+
features = ["napi-6", "promise-api", "task-api"]

examples/gzip-stream/README.md

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
# Async gzip compress
2+
3+
The gzip compression example demonstrates building a ["through" stream](https://nodejs.dev/learn/nodejs-streams) with Neon. The stream is both readable and writeable and CPU intensive processing occurs on the Node worker thread pool.
4+
5+
## Design
6+
7+
A small amount of JavaScript glue code can often simplify Neon modules. In this case, Node.js already provides a [`Transform`](https://nodejs.org/api/stream.html#stream_duplex_and_transform_streams) stream class. The `Transform` stream provides important features that could be complex to implement:
8+
9+
* Writeable
10+
* Readable
11+
* Backpressure
12+
* Asynchronous
13+
14+
Two methods must be implemented.
15+
16+
### `transform(chunk, encoding, callback)`
17+
18+
The `transform` method accepts a chunk of data and its encoding, as well a callback. The callback should be called when processing the `chunk` has completed.
19+
20+
### `flush(callback)`
21+
22+
The `flush` method allows any internally buffered data to be processed before completion. The `callback` is identical to the callback in `transform`.
23+
24+
## Glue
25+
26+
```js
27+
function compress() {
28+
const compressor = compressNew();
29+
30+
return new Transform({
31+
transform(chunk, encoding, callback) {
32+
compressChunk(compressor, encoding, chunk)
33+
.then(data => callback(null, data))
34+
.catch(callback);
35+
},
36+
37+
flush(callback) {
38+
compressFinish(compressor)
39+
.then(data => callback(null, data))
40+
.catch(callback);
41+
}
42+
});
43+
}
44+
```
45+
46+
The glue code exports a single function `compress` that creates a `Transform` stream delegating the implementation to Neon functions. Since these functions return promises, they are adapted to the `callback` style continuation that `Transform` expects.
47+
48+
## Neon
49+
50+
The Neon module exports three functions:
51+
52+
* [`compressNew`](#compressnew)
53+
* [`compressChunk`](#compresschunkcompressstream-chunk-encoding-callback)
54+
* [`compressFinish`](#compressfinishcompressstream-callback)
55+
56+
### `compressNew()`
57+
58+
```rust
59+
fn compress_new(mut cx: FunctionContext) -> JsResult<JsBox<CompressStream>> {
60+
let stream = CompressStream::new(Compression::best());
61+
62+
Ok(cx.boxed(stream))
63+
}
64+
```
65+
66+
`compressNew` creates an instance of the stateful Rust struct, `CompressStream`, and returns it wrapped in a [`JsBox`](https://docs.rs/neon/latest/neon/types/struct.JsBox.html). Each of the other two methods expects `CompressStream` as the first argument. This pattern is similar to using [`Function.prototype.call`](https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Function/call) on a class method to manually bind `this`.
67+
68+
### `compressChunk(compressStream, chunk, encoding, callback)`
69+
70+
```rust
71+
fn compress_chunk(mut cx: FunctionContext) -> JsResult<JsPromise> {
72+
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
73+
let chunk = cx.argument::<JsTypedArray<u8>>(2)?
74+
.as_slice(&cx)
75+
.to_vec();
76+
77+
let promise = cx
78+
.task(move || stream.write(chunk))
79+
.promise(CompressStream::and_buffer);
80+
81+
Ok(promise)
82+
}
83+
```
84+
85+
`compressChunk` accepts the instance of the `CompressStream` struct and the other arguments to the [`transform`](#transformchunk-encoding-callback) function. The chunk is cloned to a `Vec<u8>` and passed to a task to execute on the Node worker pool. The asynchronous task compresses the data and passes the compressed data to the `.promise(|cx, result| { ... })` callback. The callback to `promise` is executed on the JavaScript main thread and converts the compressed `Vec<u8>` to a `JsBuffer` and resolves the promise.
86+
87+
`CompressChunk::and_buffer` is used to create a `Buffer`. `ArrayBuffer` cannot be used because stream chunks are required to be an instance of `Uint8Array`. `Buffer` is a subclass of `Uint8Array`.
88+
89+
### `compressFinish(compressStream, callback)`
90+
91+
fn compress_finish(mut cx: FunctionContext) -> JsResult<JsPromise> {
92+
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
93+
94+
```rust
95+
fn compress_finish(mut cx: FunctionContext) -> JsResult<JsPromise> {
96+
let stream = (&**cx.argument::<JsBox<CompressStream>>(0)?).clone();
97+
98+
let promise = cx
99+
.task(move || stream.finish())
100+
.promise(CompressStream::and_buffer);
101+
102+
Ok(promise)
103+
}
104+
```
105+
106+
`compressFinish` works very similar to [`compressChunkl`](#compresschunkcompressstream-chunk-encoding-callback), except it is provided the arguments to [`flush`](#flushcallback) which does not include any data. Instead, the remaining buffered data is compressed, a CRC is calculated, and the compressed gzip data is completed.
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#!/usr/bin/env node
2+
3+
"use strict";
4+
5+
const { pipeline } = require("stream");
6+
const { Gzip, constants: { Z_BEST_COMPRESSION } } = require("zlib");
7+
8+
const neonCompress = require("..");
9+
const builtInCompress = () => new Gzip({ level: Z_BEST_COMPRESSION });
10+
11+
const compress = process.argv[2] === "built-in" ? builtInCompress : neonCompress;
12+
13+
pipeline(
14+
process.stdin,
15+
compress(),
16+
process.stdout,
17+
(err) => {
18+
if (err) {
19+
console.error(err);
20+
process.exitCode = -1;
21+
}
22+
}
23+
);

examples/gzip-stream/index.js

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"use strict";
2+
3+
// Transform stream reduces the boilerplate of a stream that reads bytes from a
4+
// source and produces new bytes for a destination sink.
5+
const { Transform } = require("stream");
6+
7+
const { compressNew, compressChunk, compressFinish } = require("./index.node");
8+
9+
// Creates a gzip compression transform stream, implemented asynchronously in Rust
10+
function compress() {
11+
// Create a native streaming gzip compressing with Neon
12+
const compressor = compressNew();
13+
14+
return new Transform({
15+
// Compress a chunk of data by delegating to `compressChunk`
16+
transform(chunk, encoding, callback) {
17+
compressChunk(compressor, encoding, chunk)
18+
.then(data => callback(null, data))
19+
.catch(callback);
20+
},
21+
22+
// Complete the compression by delegating to `compressFinish`
23+
flush(callback) {
24+
compressFinish(compressor)
25+
.then(data => callback(null, data))
26+
.catch(callback);
27+
}
28+
});
29+
}
30+
31+
module.exports = compress;

0 commit comments

Comments
 (0)