Skip to content

Commit 35ce369

Browse files
committed
Merge branch 'memshell-main'
2 parents 6214ee8 + 63aa263 commit 35ce369

32 files changed

+1083
-194
lines changed

plugins/lib/rust/src/lib.rs

Lines changed: 104 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use parking_lot::Mutex;
88
use protobuf::Message;
99
use signal_hook::consts::SIGTERM;
1010
use std::{
11+
env,
1112
fs::File,
1213
io::{BufReader, BufWriter, Error, Read, Write},
1314
sync::Arc,
@@ -32,6 +33,7 @@ pub enum EncodeType {
3233
}
3334
#[derive(Clone)]
3435
pub struct Client {
36+
high_writer: Arc<Mutex<BufWriter<File>>>,
3537
writer: Arc<Mutex<BufWriter<File>>>,
3638
reader: Arc<Mutex<BufReader<File>>>,
3739
}
@@ -43,9 +45,27 @@ const READ_PIPE_FD: i32 = 3;
4345
const WRITE_PIPE_FD: i32 = 1;
4446
#[cfg(not(feature = "debug"))]
4547
const WRITE_PIPE_FD: i32 = 4;
48+
const HIGH_PRIORIT_FD: i32 = 5;
4649

4750
impl Client {
51+
52+
pub fn can_use_high() -> bool {
53+
match env::var("ELKEID_PLUGIN_HIGH_PRIORITY_PIPE") {
54+
Ok(value) => {
55+
if !value.is_empty() {
56+
return true;
57+
}
58+
59+
}
60+
Err(_) => {
61+
return false;
62+
}
63+
64+
}
65+
false
66+
}
4867
pub fn new(ignore_terminate: bool) -> Self {
68+
4969
let writer = Arc::new(Mutex::new(BufWriter::with_capacity(512 * 1024, unsafe {
5070
#[cfg(target_family = "unix")]
5171
{
@@ -58,6 +78,37 @@ impl Client {
5878
File::from_raw_handle(raw_handle.0 as _)
5979
}
6080
})));
81+
let mut high_writer = writer.clone();
82+
if Self::can_use_high() {
83+
high_writer = Arc::new(Mutex::new(BufWriter::with_capacity(512 * 1024, unsafe {
84+
#[cfg(target_family = "unix")]
85+
{
86+
File::from_raw_fd(HIGH_PRIORIT_FD)
87+
}
88+
89+
#[cfg(target_family = "windows")]
90+
{
91+
let raw_handle = GetStdHandle(STD_OUTPUT_HANDLE).unwrap();
92+
File::from_raw_handle(raw_handle.0 as _)
93+
}
94+
})));
95+
96+
let high_writer_c = high_writer.clone();
97+
thread::spawn(move || {
98+
let ticker = tick(Duration::from_millis(200));
99+
loop {
100+
select! {
101+
recv(ticker)->_=>{
102+
let mut w = high_writer_c.lock();
103+
if w.flush().is_err() {
104+
break;
105+
}
106+
}
107+
}
108+
}
109+
});
110+
}
111+
61112
let reader = Arc::new(Mutex::new(BufReader::new(unsafe {
62113
#[cfg(target_family = "unix")]
63114
{
@@ -70,6 +121,7 @@ impl Client {
70121
File::from_raw_handle(raw_handle.0 as _)
71122
}
72123
})));
124+
73125
let writer_c = writer.clone();
74126
thread::spawn(move || {
75127
let ticker = tick(Duration::from_millis(200));
@@ -93,14 +145,17 @@ impl Client {
93145
info!("received signal: {:?}, wait 3 secs to exit", sig);
94146
thread::sleep(Duration::from_secs(3));
95147
unsafe {
148+
if Self::can_use_high() {
149+
libc::close(HIGH_PRIORIT_FD);
150+
}
96151
libc::close(WRITE_PIPE_FD);
97152
libc::close(READ_PIPE_FD);
98153
}
99154
}
100155
}
101156
});
102157
}
103-
Self { writer, reader }
158+
Self { high_writer, writer, reader }
104159
}
105160
pub fn send_record(&mut self, rec: &Record) -> Result<(), Error> {
106161
let mut w = self.writer.lock();
@@ -120,6 +175,54 @@ impl Client {
120175
w.write_all(b"}\n")
121176
}
122177
}
178+
pub fn send_record_high_priority(&mut self, rec: &Record) -> Result<(), Error> {
179+
180+
let mut w = self.high_writer.lock();
181+
#[cfg(not(feature = "debug"))]
182+
{
183+
w.write_all(&rec.compute_size().to_le_bytes()[..])?;
184+
rec.write_to_writer(&mut (*w)).map_err(|err| err.into())
185+
}
186+
#[cfg(feature = "debug")]
187+
{
188+
w.write_all(b"{\"data_type\":")?;
189+
w.write_all(rec.data_type.to_string().as_bytes())?;
190+
w.write_all(b",\"timestamp\":")?;
191+
w.write_all(rec.timestamp.to_string().as_bytes())?;
192+
w.write_all(b",\"data\":")?;
193+
serde_json::to_writer(w.by_ref(), rec.get_data().get_fields())?;
194+
w.write_all(b"}\n")
195+
}
196+
}
197+
198+
pub fn send_records_high_priority(&mut self, recs: &Vec<Record>) -> Result<(), Error> {
199+
let mut w = self.high_writer.lock();
200+
#[cfg(not(feature = "debug"))]
201+
{
202+
for rec in recs.iter() {
203+
println!("send: {:?}", rec);
204+
w.write_all(&rec.compute_size().to_le_bytes()[..])?;
205+
rec.write_to_writer(&mut (*w))
206+
.map_err(|err| -> std::io::Error { err.into() })?;
207+
}
208+
Ok(())
209+
}
210+
#[cfg(feature = "debug")]
211+
{
212+
213+
for rec in recs.iter() {
214+
w.write_all(b"{\"data_type\":")?;
215+
w.write_all(rec.data_type.to_string().as_bytes())?;
216+
w.write_all(b",\"timestamp\":")?;
217+
w.write_all(rec.timestamp.to_string().as_bytes())?;
218+
w.write_all(b",\"data\":")?;
219+
serde_json::to_writer(w.by_ref(), rec.get_data().get_fields())?;
220+
w.write_all(b"}\n")
221+
}
222+
Ok(())
223+
}
224+
}
225+
123226
pub fn send_records(&mut self, recs: &Vec<Record>) -> Result<(), Error> {
124227
let mut w = self.writer.lock();
125228
#[cfg(not(feature = "debug"))]

rasp/Makefile

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ VCPKG_OVERLAY_PORTS ?= $(abspath overlay-ports)
88

99
.PHONY: all help install clean set-version agent-plugin nsenter pangolin jattach JVMAgent JVMProbe python-probe python-loader go-probe go-probe-ebpf node-probe php-probe librasp rasp-server NSMount
1010

11-
all: rasp-$(VERSION).tar.gz rasp-$(VERSION)-debug.tar.gz SHA256SUMS
11+
all: rasp-linux-default-x86_64-$(VERSION).tar.gz rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz SHA256SUMS
1212

1313

1414
help:
@@ -27,19 +27,19 @@ install: | $(OUTPUT)
2727

2828

2929
clean:
30-
rm -rf $(OUTPUT) $(DEBUG_SYMBOLS) rasp-$(VERSION).tar.gz rasp-$(VERSION)-debug.tar.gz SHA256SUMS
30+
rm -rf $(OUTPUT) $(DEBUG_SYMBOLS) rasp-linux-default-x86_64-$(VERSION).tar.gz rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz SHA256SUMS
3131

3232

33-
rasp-$(VERSION).tar.gz: rasp-$(VERSION)-debug.tar.gz
33+
rasp-linux-default-x86_64-$(VERSION).tar.gz: rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz
3434
cd $(OUTPUT) && tar -czvf ../$@ ./*
3535

3636

37-
rasp-$(VERSION)-debug.tar.gz: | $(DEBUG_SYMBOLS)
37+
rasp-linux-default-x86_64-$(VERSION)-debug.tar.gz: | $(DEBUG_SYMBOLS)
3838
tar -czvf $@ $(DEBUG_SYMBOLS)
3939

4040

41-
SHA256SUMS: rasp-$(VERSION).tar.gz
42-
sha256sum $(OUTPUT)/rasp rasp-$(VERSION).tar.gz > $@
41+
SHA256SUMS: rasp-linux-default-x86_64-$(VERSION).tar.gz
42+
sha256sum $(OUTPUT)/rasp rasp-linux-default-x86_64-$(VERSION).tar.gz > $@
4343

4444

4545
set-version:

rasp/golang/client/smith_client.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include "smith_client.h"
2+
#include "smith_probe.h"
23
#include <aio/ev/timer.h>
34
#include <aio/net/stream.h>
45
#include <zero/log.h>
@@ -80,7 +81,7 @@ startClient(const std::shared_ptr<aio::Context> &context) {
8081
reason.code,
8182
reason.message.c_str()
8283
);
83-
84+
gProbe->discard_send++;
8485
return zero::ptr::makeRef<aio::ev::Timer>(context)->setTimeout(1min);
8586
});
8687
});

rasp/golang/client/smith_message.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ void to_json(nlohmann::json &j, const Heartbeat &heartbeat) {
3232
j = {
3333
{"filter", heartbeat.filter},
3434
{"block", heartbeat.block},
35-
{"limit", heartbeat.limit}
35+
{"limit", heartbeat.limit},
36+
{"discard_surplus", heartbeat.discard_surplus},
37+
{"discard_post", heartbeat.discard_post},
38+
{"discard_send", heartbeat.discard_send}
3639
};
3740
}
3841

rasp/golang/client/smith_message.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@ struct Heartbeat {
3333
std::string filter;
3434
std::string block;
3535
std::string limit;
36+
int64_t discard_surplus;
37+
int64_t discard_post;
38+
int64_t discard_send;
3639
};
3740

3841
struct Request {

rasp/golang/client/smith_probe.cpp

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,9 @@ void startProbe() {
7878
gProbe->quotas[classID][methodID] = it->second;
7979
}
8080

81+
heartbeat->discard_surplus = gProbe->discard_surplus;
82+
heartbeat->discard_post = gProbe->discard_post;
83+
heartbeat->discard_send = gProbe->discard_send;
8184
sender->trySend({HEARTBEAT, *heartbeat});
8285
return true;
8386
});
@@ -265,8 +268,12 @@ void startProbe() {
265268
Trace trace = gProbe->buffer[*index];
266269
gProbe->buffer.release(*index);
267270

268-
if (pass(trace, *filters))
269-
sender->trySend({TRACE, gProbe->buffer[*index]});
271+
if (pass(trace, *filters)) {
272+
auto result = sender->trySend({TRACE, trace});
273+
if (!result) {
274+
gProbe->discard_send++;
275+
}
276+
}
270277

271278
P_CONTINUE(loop);
272279
}

rasp/golang/client/smith_probe.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ struct Probe {
4040
z_rwlock_t locks[CLASS_MAX][METHOD_MAX];
4141
std::pair<size_t, Policy *> policies[CLASS_MAX][METHOD_MAX];
4242
zero::atomic::CircularBuffer<Trace, TRACE_BUFFER_SIZE> buffer;
43+
std::atomic<int64_t> discard_surplus;
44+
std::atomic<int64_t> discard_post;
45+
std::atomic<int64_t> discard_send;
4346
};
4447

4548
void startProbe();

rasp/golang/go/api/api.h

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -353,8 +353,10 @@ struct APIEntry {
353353

354354
static bool handler(uintptr_t sp, uintptr_t g) {
355355
if constexpr (ErrorIndex < 0) {
356-
if (!surplus())
356+
if (!surplus()) {
357+
gProbe->discard_surplus++;
357358
return true;
359+
}
358360
}
359361

360362
size_t FPSize = FLOAT_REGISTER * sizeof(double _Complex);
@@ -393,8 +395,10 @@ struct APIEntry {
393395
return false;
394396
}
395397

396-
if (!surplus())
398+
if (!surplus()) {
399+
gProbe->discard_surplus++;
397400
return true;
401+
}
398402
}
399403

400404
post(trace);
@@ -481,8 +485,10 @@ struct APIEntry {
481485
static void post(const Trace &trace) {
482486
std::optional<size_t> index = gProbe->buffer.reserve();
483487

484-
if (!index)
488+
if (!index) {
489+
gProbe->discard_post++;
485490
return;
491+
}
486492

487493
gProbe->buffer[*index] = trace;
488494
gProbe->buffer.commit(*index);

0 commit comments

Comments
 (0)