Skip to content

Reorganize object store metrics #5821

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 37 additions & 67 deletions quickwit/quickwit-storage/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

use once_cell::sync::Lazy;
use quickwit_common::metrics::{
GaugeGuard, Histogram, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
GaugeGuard, HistogramVec, IntCounter, IntCounterVec, IntGauge, new_counter, new_counter_vec,
new_gauge, new_histogram_vec,
};

Expand All @@ -30,19 +30,13 @@ pub struct StorageMetrics {
pub searcher_split_cache: CacheMetrics,
pub get_slice_timeout_successes: [IntCounter; 3],
pub get_slice_timeout_all_timeouts: IntCounter,
pub object_storage_get_total: IntCounter,
pub object_storage_get_errors_total: IntCounterVec<1>,
pub object_storage_requests_total: IntCounterVec<2>,
pub object_storage_request_duration: HistogramVec<2>,
pub object_storage_get_slice_in_flight_count: IntGauge,
pub object_storage_get_slice_in_flight_num_bytes: IntGauge,
pub object_storage_put_total: IntCounter,
pub object_storage_put_parts: IntCounter,
pub object_storage_download_num_bytes: IntCounter,
pub object_storage_upload_num_bytes: IntCounter,

pub object_storage_delete_requests_total: IntCounter,
pub object_storage_bulk_delete_requests_total: IntCounter,
pub object_storage_delete_request_duration: Histogram,
pub object_storage_bulk_delete_request_duration: Histogram,
pub object_storage_download_num_bytes: IntCounterVec<1>,
pub object_storage_download_errors: IntCounterVec<1>,
pub object_storage_upload_num_bytes: IntCounterVec<1>,
}

impl Default for StorageMetrics {
Expand All @@ -63,31 +57,6 @@ impl Default for StorageMetrics {
let get_slice_timeout_all_timeouts =
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);

let object_storage_requests_total = new_counter_vec(
"object_storage_requests_total",
"Total number of object storage requests performed.",
"storage",
&[],
["action"],
);
let object_storage_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_object"]);
let object_storage_bulk_delete_requests_total =
object_storage_requests_total.with_label_values(["delete_objects"]);

let object_storage_request_duration = new_histogram_vec(
"object_storage_request_duration_seconds",
"Duration of object storage requests in seconds.",
"storage",
&[],
["action"],
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
);
let object_storage_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_object"]);
let object_storage_bulk_delete_request_duration =
object_storage_request_duration.with_label_values(["delete_objects"]);

StorageMetrics {
fast_field_cache: CacheMetrics::for_component("fastfields"),
fd_cache_metrics: CacheMetrics::for_component("fd"),
Expand All @@ -97,62 +66,63 @@ impl Default for StorageMetrics {
split_footer_cache: CacheMetrics::for_component("splitfooter"),
get_slice_timeout_successes,
get_slice_timeout_all_timeouts,
object_storage_get_total: new_counter(
"object_storage_gets_total",
"Number of objects fetched. Might be lower than get_slice_timeout_outcome if \
queries are debounced.",
object_storage_requests_total: new_counter_vec(
"object_storage_requests_total",
"Number of requests to the object store, by action and status. Requests are \
recorded when the response headers are returned, download failures will not \
appear as errors.",
"storage",
&[],
["action", "status"],
),
object_storage_get_errors_total: new_counter_vec::<1>(
"object_storage_get_errors_total",
"Number of GetObject errors.",
object_storage_request_duration: new_histogram_vec(
"object_storage_request_duration",
"Durations until the response headers are returned from the object store, by \
action and status. This does not measure the download time for the body content.",
"storage",
&[],
["code"],
["action", "status"],
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
),
object_storage_get_slice_in_flight_count: new_gauge(
"object_storage_get_slice_in_flight_count",
"Number of GetObject for which the memory was allocated but the download is still \
in progress.",
"Number of get_object for which the memory was allocated but the download is \
still in progress.",
"storage",
&[],
),
object_storage_get_slice_in_flight_num_bytes: new_gauge(
"object_storage_get_slice_in_flight_num_bytes",
"Memory allocated for GetObject requests that are still in progress.",
"Memory allocated for get_object requests that are still in progress.",
"storage",
&[],
),
object_storage_put_total: new_counter(
"object_storage_puts_total",
"Number of objects uploaded. May differ from object_storage_requests_parts due to \
multipart upload.",
object_storage_download_num_bytes: new_counter_vec(
"object_storage_download_num_bytes",
"Amount of data downloaded from object storage.",
"storage",
&[],
["status"],
),
object_storage_put_parts: new_counter(
"object_storage_puts_parts",
"Number of object parts uploaded.",
"",
&[],
),
object_storage_download_num_bytes: new_counter(
"object_storage_download_num_bytes",
"Amount of data downloaded from an object storage.",
object_storage_download_errors: new_counter_vec(
"object_storage_download_errors",
// Download errors are recorded separately because the associated
// get_object requests were already recorded as successful in
// object_storage_requests_total
"Number of download requests that received successful response headers but failed \
during download.",
"storage",
&[],
["status"],
),
object_storage_upload_num_bytes: new_counter(
object_storage_upload_num_bytes: new_counter_vec(
"object_storage_upload_num_bytes",
"Amount of data uploaded to an object storage.",
"Amount of data uploaded to object storage. The value recorded for failed and \
aborted uploads is the full payload size.",
"storage",
&[],
["status"],
),
object_storage_delete_requests_total,
object_storage_bulk_delete_requests_total,
object_storage_delete_request_duration,
object_storage_bulk_delete_request_duration,
}
}
}
Expand Down
49 changes: 27 additions & 22 deletions quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ use tracing::{instrument, warn};

use crate::debouncer::DebouncedStorage;
use crate::metrics::object_storage_get_slice_in_flight_guards;
use crate::object_storage::metrics_wrappers::{
ActionLabel, RequestMetricsWrapperExt, copy_with_download_metrics,
};
use crate::storage::SendableAsync;
use crate::{
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, STORAGE_METRICS, Storage,
StorageError, StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
StorageErrorKind, StorageFactory, StorageResolverError, StorageResult,
};

/// Azure object storage resolver.
Expand Down Expand Up @@ -225,10 +228,6 @@ impl AzureBlobStorage {
name: &'a str,
payload: Box<dyn crate::PutPayload>,
) -> StorageResult<()> {
crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(payload.len());
retry(&self.retry_params, || async {
let data = Bytes::from(payload.read_all().await?.to_vec());
let hash = azure_storage_blobs::prelude::Hash::from(md5::compute(&data[..]).0);
Expand All @@ -237,6 +236,7 @@ impl AzureBlobStorage {
.put_block_blob(data)
.hash(hash)
.into_future()
.with_count_and_upload_metrics(ActionLabel::PutObject, payload.len())
.await?;
Result::<(), AzureErrorWrapper>::Ok(())
})
Expand All @@ -261,10 +261,6 @@ impl AzureBlobStorage {
.map(|(num, range)| {
let moved_blob_client = blob_client.clone();
let moved_payload = payload.clone();
crate::STORAGE_METRICS.object_storage_put_parts.inc();
crate::STORAGE_METRICS
.object_storage_upload_num_bytes
.inc_by(range.end - range.start);
async move {
retry(&self.retry_params, || async {
let block_id = format!("block:{num}");
Expand All @@ -276,6 +272,10 @@ impl AzureBlobStorage {
.put_block(block_id.clone(), data)
.hash(hash)
.into_future()
.with_count_and_upload_metrics(
ActionLabel::UploadPart,
range.end - range.start,
)
.await?;
Result::<_, AzureErrorWrapper>::Ok(block_id)
})
Expand All @@ -299,6 +299,7 @@ impl AzureBlobStorage {
blob_client
.put_block_list(block_list)
.into_future()
.with_count_metric(ActionLabel::CompleteMultipartUpload)
.await
.map_err(AzureErrorWrapper::from)?;

Expand All @@ -315,6 +316,7 @@ impl Storage for AzureBlobStorage {
.max_results(NonZeroU32::new(1u32).expect("1 is always non-zero."))
.into_stream()
.next()
.with_count_metric(ActionLabel::ListObjects)
.await
{
let _ = first_blob_result?;
Expand All @@ -327,7 +329,6 @@ impl Storage for AzureBlobStorage {
path: &Path,
payload: Box<dyn crate::PutPayload>,
) -> crate::StorageResult<()> {
crate::STORAGE_METRICS.object_storage_put_total.inc();
let name = self.blob_name(path);
let total_len = payload.len();
let part_num_bytes = self.multipart_policy.part_num_bytes(total_len);
Expand All @@ -345,18 +346,19 @@ impl Storage for AzureBlobStorage {
let name = self.blob_name(path);
let mut output_stream = self.container_client.blob_client(name).get().into_stream();

while let Some(chunk_result) = output_stream.next().await {
while let Some(chunk_result) = output_stream
.next()
.with_count_metric(ActionLabel::GetObject)
.await
{
let chunk_response = chunk_result.map_err(AzureErrorWrapper::from)?;
let chunk_response_body_stream = chunk_response
.data
.map_err(FutureError::other)
.into_async_read()
.compat();
let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(num_bytes_copied);
copy_with_download_metrics(&mut body_stream_reader, output).await?;
}
output.flush().await?;
Ok(())
Expand All @@ -369,6 +371,7 @@ impl Storage for AzureBlobStorage {
.blob_client(blob_name)
.delete()
.into_future()
.with_count_metric(ActionLabel::DeleteObject)
.await
.map_err(|err| AzureErrorWrapper::from(err).into());
ignore_error_kind!(StorageErrorKind::NotFound, delete_res)?;
Expand Down Expand Up @@ -491,6 +494,7 @@ impl Storage for AzureBlobStorage {
.blob_client(name)
.get_properties()
.into_future()
.with_count_metric(ActionLabel::HeadObject)
.await;
match properties_result {
Ok(response) => Ok(response.blob.properties.content_length),
Expand All @@ -513,7 +517,7 @@ async fn extract_range_data_and_hash(
.await?
.into_async_read();
let mut buf: Vec<u8> = Vec::with_capacity(range.count());
tokio::io::copy(&mut reader, &mut buf).await?;
tokio::io::copy_buf(&mut reader, &mut buf).await?;
let data = Bytes::from(buf);
let hash = md5::compute(&data[..]);
Ok((data, hash))
Expand Down Expand Up @@ -544,18 +548,19 @@ async fn download_all(
output: &mut Vec<u8>,
) -> Result<(), AzureErrorWrapper> {
output.clear();
while let Some(chunk_result) = chunk_stream.next().await {
while let Some(chunk_result) = chunk_stream
.next()
.with_count_metric(ActionLabel::GetObject)
.await
{
let chunk_response = chunk_result?;
let chunk_response_body_stream = chunk_response
.data
.map_err(FutureError::other)
.into_async_read()
.compat();
let mut body_stream_reader = BufReader::new(chunk_response_body_stream);
let num_bytes_copied = tokio::io::copy_buf(&mut body_stream_reader, output).await?;
crate::STORAGE_METRICS
.object_storage_download_num_bytes
.inc_by(num_bytes_copied);
copy_with_download_metrics(&mut body_stream_reader, output).await?;
}
// When calling `get_all`, the Vec capacity is not properly set.
output.shrink_to_fit();
Expand Down
7 changes: 1 addition & 6 deletions quickwit/quickwit-storage/src/object_storage/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use aws_sdk_s3::error::{DisplayErrorContext, ProvideErrorMetadata, SdkError};
use aws_sdk_s3::error::{DisplayErrorContext, SdkError};
use aws_sdk_s3::operation::abort_multipart_upload::AbortMultipartUploadError;
use aws_sdk_s3::operation::complete_multipart_upload::CompleteMultipartUploadError;
use aws_sdk_s3::operation::create_multipart_upload::CreateMultipartUploadError;
Expand Down Expand Up @@ -62,11 +62,6 @@ pub trait ToStorageErrorKind {

impl ToStorageErrorKind for GetObjectError {
fn to_storage_error_kind(&self) -> StorageErrorKind {
let error_code = self.code().unwrap_or("unknown");
crate::STORAGE_METRICS
.object_storage_get_errors_total
.with_label_values([error_code])
.inc();
match self {
GetObjectError::InvalidObjectState(_) => StorageErrorKind::Service,
GetObjectError::NoSuchKey(_) => StorageErrorKind::NotFound,
Expand Down
Loading