Skip to content

Commit ba46a61

Browse files
authored
perf: optimised marker operations removing heap allocations [backport] (#108)
1 parent ce8c43d commit ba46a61

File tree

2 files changed

+103
-20
lines changed

2 files changed

+103
-20
lines changed

questdb-rs/src/ingress/mod.rs

Lines changed: 31 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use std::collections::HashMap;
3434
use std::convert::Infallible;
3535
use std::fmt::{Debug, Display, Formatter, Write};
3636
use std::io::{self, BufRead, BufReader, ErrorKind, Write as IoWrite};
37+
use std::num::NonZeroUsize;
3738
use std::ops::Deref;
3839
use std::path::PathBuf;
3940
use std::str::FromStr;
@@ -430,11 +431,13 @@ impl OpCase {
430431
}
431432
}
432433

433-
#[derive(Debug, Clone)]
434+
// IMPORTANT: This struct MUST remain `Copy` to ensure that
435+
// there are no heap allocations when performing marker operations.
436+
#[derive(Debug, Clone, Copy)]
434437
struct BufferState {
435438
op_case: OpCase,
436439
row_count: usize,
437-
first_table: Option<String>,
440+
first_table_len: Option<NonZeroUsize>,
438441
transactional: bool,
439442
}
440443

@@ -443,17 +446,10 @@ impl BufferState {
443446
Self {
444447
op_case: OpCase::Init,
445448
row_count: 0,
446-
first_table: None,
449+
first_table_len: None,
447450
transactional: true,
448451
}
449452
}
450-
451-
fn clear(&mut self) {
452-
self.op_case = OpCase::Init;
453-
self.row_count = 0;
454-
self.first_table = None;
455-
self.transactional = true;
456-
}
457453
}
458454

459455
/// A reusable buffer to prepare a batch of ILP messages.
@@ -494,11 +490,11 @@ impl BufferState {
494490
/// * A row always starts with [`table`](Buffer::table).
495491
/// * A row must contain at least one [`symbol`](Buffer::symbol) or
496492
/// column (
497-
/// [`column_bool`](Buffer::column_bool),
498-
/// [`column_i64`](Buffer::column_i64),
499-
/// [`column_f64`](Buffer::column_f64),
500-
/// [`column_str`](Buffer::column_str),
501-
/// [`column_ts`](Buffer::column_ts)).
493+
/// [`column_bool`](Buffer::column_bool),
494+
/// [`column_i64`](Buffer::column_i64),
495+
/// [`column_f64`](Buffer::column_f64),
496+
/// [`column_str`](Buffer::column_str),
497+
/// [`column_ts`](Buffer::column_ts)).
502498
/// * Symbols must appear before columns.
503499
/// * A row must be terminated with either [`at`](Buffer::at) or
504500
/// [`at_now`](Buffer::at_now).
@@ -630,7 +626,7 @@ impl Buffer {
630626
)
631627
));
632628
}
633-
self.marker = Some((self.output.len(), self.state.clone()));
629+
self.marker = Some((self.output.len(), self.state));
634630
Ok(())
635631
}
636632

@@ -663,7 +659,7 @@ impl Buffer {
663659
/// [`capacity`](Buffer::capacity).
664660
pub fn clear(&mut self) {
665661
self.output.clear();
666-
self.state.clear();
662+
self.state = BufferState::new();
667663
self.marker = None;
668664
}
669665

@@ -729,16 +725,31 @@ impl Buffer {
729725
let name: TableName<'a> = name.try_into()?;
730726
self.validate_max_name_len(name.name)?;
731727
self.check_op(Op::Table)?;
728+
let table_begin = self.output.len();
732729
write_escaped_unquoted(&mut self.output, name.name);
730+
let table_end = self.output.len();
733731
self.state.op_case = OpCase::TableWritten;
734732

735733
// A buffer stops being transactional if it targets multiple tables.
736-
if let Some(first_table) = &self.state.first_table {
737-
if first_table != name.name {
734+
if let Some(first_table_len) = &self.state.first_table_len {
735+
let first_table = &self.output[0..(first_table_len.get())];
736+
let this_table = &self.output[table_begin..table_end];
737+
if first_table != this_table {
738738
self.state.transactional = false;
739739
}
740740
} else {
741-
self.state.first_table = Some(name.name.to_owned());
741+
debug_assert!(table_begin == 0);
742+
743+
// This is a bit confusing, so worth explaining:
744+
// `NonZeroUsize::new(table_end)` will return `None` if `table_end` is 0,
745+
// but we know that `table_end` is never 0 here, we just need an option type
746+
// anyway, so we don't bother unwrapping it to then wrap it again.
747+
let first_table_len = NonZeroUsize::new(table_end);
748+
749+
// Instead we just assert that it's `Some`.
750+
debug_assert!(first_table_len.is_some());
751+
752+
self.state.first_table_len = first_table_len;
742753
}
743754
Ok(self)
744755
}

questdb-rs/src/tests/sender.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,78 @@ fn test_row_count() -> TestResult {
151151
Ok(())
152152
}
153153

154+
#[test]
155+
fn test_transactional() -> TestResult {
156+
let mut buffer = Buffer::new();
157+
158+
// transactional since there are no recorded tables yet
159+
assert_eq!(buffer.row_count(), 0);
160+
assert!(buffer.transactional());
161+
162+
buffer.set_marker()?;
163+
buffer.table("table 1.test")?.symbol("a", "b")?.at_now()?;
164+
assert_eq!(buffer.row_count(), 1); // tables={'table 1.test'}
165+
166+
// still transactional since there is only one single table.
167+
assert!(buffer.transactional());
168+
169+
buffer.table("table 2.test")?.symbol("c", "d")?.at_now()?;
170+
171+
// not transactional since we have both tables "x" and "y".
172+
assert_eq!(buffer.row_count(), 2); // tables={'table 1.test', 'table 2.test'}
173+
assert!(!buffer.transactional());
174+
175+
buffer.rewind_to_marker()?;
176+
// no tables, so we're transactional again
177+
assert_eq!(buffer.row_count(), 0); // tables=[]
178+
assert!(buffer.transactional());
179+
assert!(buffer.is_empty());
180+
181+
// We add another new and different table, so we are still transactional.
182+
buffer.table("test=table=3")?.symbol("e", "f")?.at_now()?;
183+
assert_eq!(buffer.row_count(), 1); // tables={'test=table=3'}
184+
assert!(buffer.transactional());
185+
186+
// Same table again, so we are still transactional.
187+
buffer.table("test=table=3")?.symbol("g", "h")?.at_now()?;
188+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
189+
assert!(buffer.transactional());
190+
191+
buffer.set_marker()?;
192+
// We add a new different table: Name differs in length.
193+
buffer.table("test=table=3 ")?.symbol("i", "j")?.at_now()?;
194+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=3 '}
195+
assert!(!buffer.transactional());
196+
197+
buffer.rewind_to_marker()?;
198+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
199+
assert!(buffer.transactional());
200+
201+
buffer.set_marker()?;
202+
// We add a new different table: Name differs in content, but not in length.
203+
buffer.table("test=table=4")?.symbol("k", "l")?.at_now()?;
204+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=3', 'test=table=4'}
205+
assert!(!buffer.transactional());
206+
207+
buffer.rewind_to_marker()?;
208+
assert_eq!(buffer.row_count(), 2); // tables={'test=table=3'}
209+
assert!(buffer.transactional());
210+
211+
buffer.clear();
212+
assert_eq!(buffer.row_count(), 0); // tables=[]
213+
assert!(buffer.transactional());
214+
assert!(buffer.is_empty());
215+
216+
// We add three rows of the same new table, so we are still transactional.
217+
buffer.table("test=table=5")?.symbol("m", "n")?.at_now()?;
218+
buffer.table("test=table=5")?.symbol("o", "p")?.at_now()?;
219+
buffer.table("test=table=5")?.symbol("q", "r")?.at_now()?;
220+
assert_eq!(buffer.row_count(), 3); // tables={'test=table=5'}
221+
assert!(buffer.transactional());
222+
223+
Ok(())
224+
}
225+
154226
#[test]
155227
fn test_auth_inconsistent_keys() -> TestResult {
156228
test_bad_key("fLKYEaoEb9lrn3nkwLDA-M_xnuFOdSt9y0Z7_vWSHLU", // d

0 commit comments

Comments
 (0)