Skip to content

Commit d58c002

Browse files
Merge pull request #13 from CleverCloud/feat/tracing
Implements tracing trough opentelemetry
2 parents af573e5 + b4b4fdb commit d58c002

File tree

18 files changed

+1233
-165
lines changed

18 files changed

+1233
-165
lines changed

Cargo.lock

Lines changed: 792 additions & 112 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 60 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -13,49 +13,86 @@ keywords = ["kubernetes", "operator", "clevercloud", "openshift"]
1313
[dependencies]
1414
async-trait = "^0.1.51"
1515
chrono = "^0.4.19"
16-
clevercloud-sdk = { version = "^0.1.1", features = ["jsonschemas", "logging", "metrics"] }
16+
clevercloud-sdk = { version = "^0.2.0", features = ["jsonschemas"] }
1717
config = "^0.11.0"
1818
futures = "^0.3.17"
1919
hostname = "^0.3.1"
2020
hyper = { version = "^0.14.13", features = ["server", "tcp", "http1"] }
2121
json-patch = "^0.2.6"
22-
kube = { version = "^0.61.0", default-features = false, features = ["client", "rustls-tls", "ws", "gzip", "derive", "jsonpatch"] }
22+
kube = { version = "^0.61.0", default-features = false, features = [
23+
"client",
24+
"rustls-tls",
25+
"ws",
26+
"gzip",
27+
"derive",
28+
"jsonpatch",
29+
] }
2330
kube-derive = "^0.61.0"
2431
kube-runtime = "^0.61.0"
25-
k8s-openapi = { version = "^0.13.0", default-features = false, features = ["v1_21"] }
32+
k8s-openapi = { version = "^0.13.0", default-features = false, features = [
33+
"v1_21",
34+
] }
2635
lazy_static = { version = "^1.4.0", optional = true }
36+
opentelemetry = { version = "^0.16.0", features = [
37+
"rt-tokio",
38+
], optional = true }
39+
opentelemetry-jaeger = { version = "^0.15.0", features = [
40+
"rt-tokio",
41+
"collector_client",
42+
], optional = true }
2743
paw = "^1.0.0"
2844
prometheus = { version = "^0.13.0", optional = true }
29-
schemars = { version = "^0.8.6", features = [ "chrono", "indexmap", "uuid", "bytes", "url" ] }
45+
schemars = { version = "^0.8.6", features = [
46+
"chrono",
47+
"indexmap",
48+
"uuid",
49+
"bytes",
50+
"url",
51+
] }
3052
sentry = { version = "^0.23.0", optional = true }
3153
serde = { version = "^1.0.130", features = ["derive"] }
32-
serde_json = { version = "^1.0.68", features = ["preserve_order", "float_roundtrip"] }
54+
serde_json = { version = "^1.0.68", features = [
55+
"preserve_order",
56+
"float_roundtrip",
57+
] }
3358
serde_yaml = "^0.8.21"
3459
slog = { version = "^2.7.0" }
3560
slog-async = "^2.7.0"
3661
slog-term = "^2.8.0"
3762
slog-scope = "^4.4.0"
3863
slog-stdlog = { version = "^4.1.0", optional = true }
39-
structopt = { version = "^0.3.23", features = ["paw"] }
64+
structopt = { version = "^0.3.25", features = ["paw"] }
4065
thiserror = "^1.0.29"
4166
tokio = { version = "^1.12.0", features = ["full"] }
67+
tracing = { version = "^0.1.29", optional = true }
68+
tracing-futures = { version = "^0.2.5", features = ["tokio"], optional = true }
69+
tracing-subscriber = { version = "^0.2.25", features = [
70+
"chrono",
71+
], optional = true }
72+
tracing-opentelemetry = { version = "^0.15.0", optional = true }
4273

4374
[features]
44-
default = ["metrics", "release-log-debug", "debug-log-trace"]
45-
logging = ["slog-stdlog"]
46-
metrics = ["lazy_static", "prometheus"]
75+
default = [
76+
"metrics",
77+
"trace",
78+
"tracker",
79+
"slog/release_max_level_debug",
80+
"slog/max_level_trace",
81+
]
82+
logging = [
83+
"clevercloud-sdk/logging",
84+
"tracing-subscriber/tracing-log",
85+
"slog-stdlog",
86+
]
87+
metrics = ["clevercloud-sdk/metrics", "lazy_static", "prometheus"]
4788
tracker = ["sentry"]
48-
49-
release-log-off = ["slog/release_max_level_off"]
50-
release-log-error = ["slog/release_max_level_error"]
51-
release-log-warning = ["slog/release_max_level_warn"]
52-
release-log-info = ["slog/release_max_level_info"]
53-
release-log-debug = ["slog/release_max_level_debug"]
54-
release-log-trace = ["slog/release_max_level_trace"]
55-
56-
debug-log-off = ["slog/max_level_off"]
57-
debug-log-error = ["slog/max_level_error"]
58-
debug-log-warning = ["slog/max_level_warn"]
59-
debug-log-info = ["slog/max_level_info"]
60-
debug-log-debug = ["slog/max_level_debug"]
61-
debug-log-trace = ["slog/max_level_trace"]
89+
trace = [
90+
"clevercloud-sdk/trace",
91+
"clevercloud-sdk/tokio",
92+
"tracing",
93+
"tracing-futures",
94+
"tracing-subscriber",
95+
"tracing-opentelemetry",
96+
"opentelemetry",
97+
"opentelemetry-jaeger",
98+
]

src/cmd/crd.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ pub enum CustomResource {
2525
impl FromStr for CustomResource {
2626
type Err = Box<dyn Error + Send + Sync>;
2727

28+
#[cfg_attr(feature = "trace", tracing::instrument)]
2829
fn from_str(s: &str) -> Result<Self, Self::Err> {
2930
match s.to_lowercase().as_str() {
3031
"postgresql" => Ok(Self::PostgreSql),
@@ -59,6 +60,7 @@ pub enum CustomResourceDefinition {
5960
impl Executor for CustomResourceDefinition {
6061
type Error = CustomResourceDefinitionError;
6162

63+
#[cfg_attr(feature = "trace", tracing::instrument)]
6264
async fn execute(&self, config: Arc<Configuration>) -> Result<(), Self::Error> {
6365
match self {
6466
Self::View { custom_resource } => view(config, custom_resource).await,
@@ -69,6 +71,7 @@ impl Executor for CustomResourceDefinition {
6971
// -----------------------------------------------------------------------------
7072
// view function
7173

74+
#[cfg_attr(feature = "trace", tracing::instrument)]
7275
pub async fn view(
7376
_config: Arc<Configuration>,
7477
custom_resource: &Option<CustomResource>,

src/cmd/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub enum Command {
5959
impl Executor for Command {
6060
type Error = CommandError;
6161

62+
#[cfg_attr(feature = "trace", tracing::instrument)]
6263
async fn execute(&self, config: Arc<Configuration>) -> Result<(), Self::Error> {
6364
match self {
6465
Self::CustomResourceDefinition(crd) => crd
@@ -110,6 +111,7 @@ pub enum DaemonError {
110111
// -----------------------------------------------------------------------------
111112
// daemon function
112113

114+
#[cfg_attr(feature = "trace", tracing::instrument)]
113115
pub async fn daemon(
114116
kubeconfig: Option<PathBuf>,
115117
config: Arc<Configuration>,

src/main.rs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,16 @@
44
//! resource definition
55
use std::{convert::TryFrom, error::Error, sync::Arc};
66

7+
#[cfg(feature = "trace")]
8+
use opentelemetry::global;
9+
#[cfg(feature = "trace")]
10+
use opentelemetry_jaeger::Propagator;
711
use slog::{o, Drain, Level, LevelFilter, Logger};
812
use slog_async::Async;
913
use slog_scope::{crit, debug, info, set_global_logger, GlobalLoggerGuard as Guard};
1014
use slog_term::{FullFormat, TermDecorator};
15+
#[cfg(feature = "trace")]
16+
use tracing_subscriber::{layer::SubscriberExt, Registry};
1117

1218
use crate::{
1319
cmd::{daemon, Args, Executor},
@@ -51,14 +57,36 @@ pub(crate) async fn main(args: Args) -> Result<(), Box<dyn Error + Send + Sync>>
5157
}
5258

5359
#[cfg(feature = "tracker")]
54-
if let Some(dsn) = config.sentry.dsn.to_owned() {
55-
let _sguard = sentry::init((
56-
dsn,
60+
let _sguard = config.sentry.dsn.as_ref().map(|dsn| {
61+
sentry::init((
62+
dsn.to_owned(),
5763
sentry::ClientOptions {
5864
release: sentry::release_name!(),
5965
..Default::default()
6066
},
61-
));
67+
))
68+
});
69+
70+
#[cfg(feature = "trace")]
71+
if let Some(jaeger) = &config.jaeger {
72+
info!("Start to trace using jaeger with opentelemetry compatibility"; "endpoint" => jaeger.endpoint.to_owned());
73+
global::set_text_map_propagator(Propagator::new());
74+
75+
let mut builder = opentelemetry_jaeger::new_pipeline()
76+
.with_collector_endpoint(jaeger.endpoint.to_owned())
77+
.with_service_name(env!("CARGO_PKG_NAME"));
78+
79+
if let Some(user) = &jaeger.user {
80+
builder = builder.with_collector_username(user);
81+
}
82+
83+
if let Some(password) = &jaeger.password {
84+
builder = builder.with_collector_password(password);
85+
}
86+
87+
let layer = tracing_opentelemetry::layer().with_tracer(builder.install_simple()?);
88+
89+
tracing::subscriber::set_global_default(Registry::default().with(layer))?;
6290
}
6391

6492
let result: Result<_, Box<dyn Error + Send + Sync>> = match &args.command {
@@ -71,6 +99,9 @@ pub(crate) async fn main(args: Args) -> Result<(), Box<dyn Error + Send + Sync>>
7199
return Err(err);
72100
}
73101

102+
#[cfg(feature = "trace")]
103+
global::shutdown_tracer_provider();
104+
74105
info!("{} halted!", env!("CARGO_PKG_NAME"));
75106
Ok(())
76107
}

src/svc/cfg.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub struct Api {
4141

4242
#[allow(clippy::from_over_into)]
4343
impl Into<Credentials> for Api {
44+
#[cfg_attr(feature = "trace", tracing::instrument)]
4445
fn into(self) -> Credentials {
4546
Credentials {
4647
token: self.token.to_owned(),
@@ -76,6 +77,17 @@ pub struct Sentry {
7677
pub dsn: Option<String>,
7778
}
7879

80+
// -----------------------------------------------------------------------------
81+
// Jaeger structure
82+
83+
#[cfg(feature = "trace")]
84+
#[derive(Serialize, Deserialize, PartialEq, Eq, Clone, Debug, Default)]
85+
pub struct Jaeger {
86+
pub endpoint: String,
87+
pub user: Option<String>,
88+
pub password: Option<String>,
89+
}
90+
7991
// -----------------------------------------------------------------------------
8092
// Configuration structures
8193

@@ -88,11 +100,15 @@ pub struct Configuration {
88100
#[cfg(feature = "tracker")]
89101
#[serde(rename = "sentry", default = "Default::default")]
90102
pub sentry: Sentry,
103+
#[cfg(feature = "trace")]
104+
#[serde(rename = "jaeger")]
105+
pub jaeger: Option<Jaeger>,
91106
}
92107

93108
impl TryFrom<PathBuf> for Configuration {
94109
type Error = ConfigurationError;
95110

111+
#[cfg_attr(feature = "trace", tracing::instrument)]
96112
fn try_from(path: PathBuf) -> Result<Self, Self::Error> {
97113
let mut config = Config::default();
98114

@@ -119,6 +135,7 @@ impl TryFrom<PathBuf> for Configuration {
119135
}
120136

121137
impl Configuration {
138+
#[cfg_attr(feature = "trace", tracing::instrument)]
122139
pub fn try_default() -> Result<Self, ConfigurationError> {
123140
let mut config = Config::default();
124141

src/svc/clevercloud/ext.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
//! This module provide extensions to help building custom resource reconciler
44
//! loop
55
6-
use std::collections::BTreeMap;
6+
use std::{collections::BTreeMap, fmt::Debug};
77

88
use async_trait::async_trait;
99
use clevercloud_sdk::{
@@ -18,7 +18,7 @@ use slog_scope::{debug, trace};
1818
// AddonExt trait
1919

2020
#[async_trait]
21-
pub trait AddonExt: Into<CreateAddonOpts> + Clone + Sync + Send {
21+
pub trait AddonExt: Into<CreateAddonOpts> + Clone + Debug + Sync + Send {
2222
type Error: From<ClientError> + Sync + Send;
2323

2424
fn id(&self) -> Option<String>;
@@ -27,6 +27,7 @@ pub trait AddonExt: Into<CreateAddonOpts> + Clone + Sync + Send {
2727

2828
fn name(&self) -> String;
2929

30+
#[cfg_attr(feature = "trace", tracing::instrument)]
3031
async fn get(&self, client: &Client) -> Result<Option<Addon>, Self::Error> {
3132
if let Some(id) = &self.id() {
3233
trace!("Retrieve the addon from the identifier"; "id" => &id, "name" => self.name());
@@ -56,6 +57,7 @@ pub trait AddonExt: Into<CreateAddonOpts> + Clone + Sync + Send {
5657
Ok(None)
5758
}
5859

60+
#[cfg_attr(feature = "trace", tracing::instrument)]
5961
async fn upsert(&self, client: &Client) -> Result<Addon, Self::Error> {
6062
debug!("Try to retrieve the addon, before creating a new one"; "id" => &self.id(), "name" => self.name());
6163
if let Some(addon) = self.get(client).await? {
@@ -66,6 +68,7 @@ pub trait AddonExt: Into<CreateAddonOpts> + Clone + Sync + Send {
6668
Ok(addon::create(client, &self.organisation(), &self.to_owned().into()).await?)
6769
}
6870

71+
#[cfg_attr(feature = "trace", tracing::instrument)]
6972
async fn delete(&self, client: &Client) -> Result<(), Self::Error> {
7073
if let Some(a) = self.get(client).await? {
7174
addon::delete(client, &self.organisation(), &a.id).await?;
@@ -74,6 +77,7 @@ pub trait AddonExt: Into<CreateAddonOpts> + Clone + Sync + Send {
7477
Ok(())
7578
}
7679

80+
#[cfg_attr(feature = "trace", tracing::instrument)]
7781
async fn secrets(
7882
&self,
7983
client: &Client,

src/svc/crd/postgresql.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ pub struct PostgreSqlStatus {
9898

9999
#[allow(clippy::from_over_into)]
100100
impl Into<CreateAddonOpts> for PostgreSql {
101+
#[cfg_attr(feature = "trace", tracing::instrument)]
101102
fn into(self) -> CreateAddonOpts {
102103
CreateAddonOpts {
103104
name: AddonExt::name(&self),
@@ -112,6 +113,7 @@ impl Into<CreateAddonOpts> for PostgreSql {
112113
impl AddonExt for PostgreSql {
113114
type Error = ReconcilerError;
114115

116+
#[cfg_attr(feature = "trace", tracing::instrument)]
115117
fn id(&self) -> Option<String> {
116118
if let Some(status) = &self.status {
117119
return status.addon.to_owned();
@@ -120,10 +122,12 @@ impl AddonExt for PostgreSql {
120122
None
121123
}
122124

125+
#[cfg_attr(feature = "trace", tracing::instrument)]
123126
fn organisation(&self) -> String {
124127
self.spec.organisation.to_owned()
125128
}
126129

130+
#[cfg_attr(feature = "trace", tracing::instrument)]
127131
fn name(&self) -> String {
128132
"kubernetes_".to_string()
129133
+ &self
@@ -133,13 +137,15 @@ impl AddonExt for PostgreSql {
133137
}
134138

135139
impl PostgreSql {
140+
#[cfg_attr(feature = "trace", tracing::instrument)]
136141
pub fn set_addon_id(&mut self, id: Option<String>) {
137142
let mut status = self.status.get_or_insert_with(PostgreSqlStatus::default);
138143

139144
status.addon = id;
140145
self.status = Some(status.to_owned());
141146
}
142147

148+
#[cfg_attr(feature = "trace", tracing::instrument)]
143149
pub fn get_addon_id(&self) -> Option<String> {
144150
self.status
145151
.to_owned()
@@ -190,18 +196,21 @@ pub enum ReconcilerError {
190196
}
191197

192198
impl From<kube::Error> for ReconcilerError {
199+
#[cfg_attr(feature = "trace", tracing::instrument)]
193200
fn from(err: kube::Error) -> Self {
194201
Self::KubeClient(err)
195202
}
196203
}
197204

198205
impl From<ClientError> for ReconcilerError {
206+
#[cfg_attr(feature = "trace", tracing::instrument)]
199207
fn from(err: ClientError) -> Self {
200208
Self::CleverClient(err)
201209
}
202210
}
203211

204212
impl From<controller::Error<Self, watcher::Error>> for ReconcilerError {
213+
#[cfg_attr(feature = "trace", tracing::instrument)]
205214
fn from(err: controller::Error<ReconcilerError, watcher::Error>) -> Self {
206215
Self::Reconcile(err.to_string())
207216
}

src/svc/k8s/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use kube::{
99
Config,
1010
};
1111

12+
#[cfg_attr(feature = "trace", tracing::instrument)]
1213
/// returns a new kubernetes client from the given path if defined
1314
/// or retrieve it from environment or defaults paths
1415
pub async fn try_new(path: Option<PathBuf>) -> Result<kube::Client, kube::Error> {
@@ -20,5 +21,5 @@ pub async fn try_new(path: Option<PathBuf>) -> Result<kube::Client, kube::Error>
2021
let opts = KubeConfigOptions::default();
2122
let config = Config::from_custom_kubeconfig(kubeconfig, &opts).await?;
2223

23-
Ok(kube::Client::try_from(config)?)
24+
kube::Client::try_from(config)
2425
}

0 commit comments

Comments
 (0)