Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions .github/workflows/cppcheck.yml
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@

name: cppcheck

on:
push:
branches: [ main ]
branches: [main]
pull_request:
branches: [ main ]
branches: [main]

permissions:
contents: read
Expand Down Expand Up @@ -66,12 +65,7 @@ jobs:
set +e
readonly WARNING_COUNT=`grep -c -E "\[.+\]" cppcheck.log`
echo "cppcheck reported ${WARNING_COUNT} warning(s)"
# Acceptable limit, to decrease over time down to 0
readonly WARNING_LIMIT=10
# FAIL the build if WARNING_COUNT > WARNING_LIMIT
if [ $WARNING_COUNT -gt $WARNING_LIMIT ] ; then
exit 1
# WARN in annotations if WARNING_COUNT > 0
elif [ $WARNING_COUNT -gt 0 ] ; then
if [ $WARNING_COUNT -gt 0 ] ; then
echo "::warning::cppcheck reported ${WARNING_COUNT} warning(s)"
exit 1
fi
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ class HttpClient : public opentelemetry::ext::http::client::HttpClient

void SetMaxSessionsPerConnection(std::size_t max_requests_per_connection) noexcept override;

bool InternalCancelAllSessions() noexcept;

inline uint64_t GetMaxSessionsPerConnection() const noexcept
{
return max_sessions_per_connection_;
Expand Down
49 changes: 27 additions & 22 deletions ext/src/http/client/curl/http_client_curl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ HttpClient::~HttpClient()
}

// Force to abort all sessions
CancelAllSessions();
InternalCancelAllSessions();

if (!background_thread)
{
Expand Down Expand Up @@ -342,27 +342,7 @@ std::shared_ptr<opentelemetry::ext::http::client::Session> HttpClient::CreateSes

bool HttpClient::CancelAllSessions() noexcept
{
// CancelSession may change sessions_, we can not change a container while iterating it.
while (true)
{
std::unordered_map<uint64_t, std::shared_ptr<Session>> sessions;
{
// We can only cleanup session and curl handles in the IO thread.
std::lock_guard<std::mutex> lock_guard{sessions_m_};
sessions = sessions_;
}

if (sessions.empty())
{
break;
}

for (auto &session : sessions)
{
session.second->CancelSession();
}
}
return true;
return InternalCancelAllSessions();
}

bool HttpClient::FinishAllSessions() noexcept
Expand Down Expand Up @@ -435,6 +415,31 @@ void HttpClient::CleanupSession(uint64_t session_id)
}
}

bool HttpClient::InternalCancelAllSessions() noexcept
{
// CancelSession may change sessions_, we can not change a container while iterating it.
while (true)
{
std::unordered_map<uint64_t, std::shared_ptr<Session>> sessions;
{
// We can only cleanup session and curl handles in the IO thread.
std::lock_guard<std::mutex> lock_guard{sessions_m_};
sessions = sessions_;
}

if (sessions.empty())
{
break;
}

for (auto &session : sessions)
{
session.second->CancelSession();
}
}
return true;
}

bool HttpClient::MaybeSpawnBackgroundThread()
{
std::lock_guard<std::mutex> lock_guard{background_thread_m_};
Expand Down
2 changes: 1 addition & 1 deletion sdk/include/opentelemetry/sdk/common/circular_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class CircularBuffer
{
return {};
}
auto data = data_.get();
AtomicUniquePtr<T> *data = data_.get();
if (tail_index < head_index)
{
return CircularBufferRange<AtomicUniquePtr<T>>{nostd::span<AtomicUniquePtr<T>>{
Expand Down
14 changes: 11 additions & 3 deletions sdk/include/opentelemetry/sdk/logs/batch_log_record_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,10 +93,11 @@ class BatchLogRecordProcessor : public LogRecordProcessor

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter. Any subsequent calls to
* ForceFlush or Shutdown will return immediately without doing anything.
* all its logs and passes them to the exporter.
*
* NOTE: Timeout functionality not supported yet.
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
Expand All @@ -107,6 +108,13 @@ class BatchLogRecordProcessor : public LogRecordProcessor
~BatchLogRecordProcessor() override;

protected:
/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter.
Comment on lines +112 to +113
Copy link
Preview

Copilot AI Aug 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The InternalShutdown method documentation is incomplete. It should include @param timeout and @return documentation to match the method signature, similar to the public Shutdown method documentation.

Suggested change
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its logs and passes them to the exporter.
* all its logs and passes them to the exporter.
*
* @param timeout The maximum time to wait for shutdown to complete.
* @return true if the shutdown completed successfully within the timeout, false otherwise.

Copilot uses AI. Check for mistakes.

*/
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* The background routine performed by the worker thread.
*/
Expand Down
23 changes: 21 additions & 2 deletions sdk/include/opentelemetry/sdk/logs/multi_log_record_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,32 @@ class MultiLogRecordProcessor : public LogRecordProcessor
/**
* Shuts down the processor and does any cleanup required.
* ShutDown should only be called once for each processor.
* @param timeout minimum amount of microseconds to wait for
* shutdown before giving up and returning failure.
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;

protected:
/**
* Exports all log records that have not yet been exported to the configured Exporter.
* @param timeout that the forceflush is required to finish within.
* @return a result code indicating whether it succeeded, failed or timed out
*/
bool InternalForceFlush(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* Shuts down the processor and does any cleanup required.
* ShutDown should only be called once for each processor.
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

private:
std::vector<std::unique_ptr<LogRecordProcessor>> processors_;
};
Expand Down
18 changes: 15 additions & 3 deletions sdk/include/opentelemetry/sdk/trace/batch_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,11 @@ class BatchSpanProcessor : public SpanProcessor

/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its ended spans and passes them to the exporter. Any subsequent calls to OnStart, OnEnd,
* ForceFlush or Shutdown will return immediately without doing anything.
* all its ended spans and passes them to the exporter.
*
* NOTE: Timeout functionality not supported yet.
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override;
Expand All @@ -108,6 +109,17 @@ class BatchSpanProcessor : public SpanProcessor
~BatchSpanProcessor() override;

protected:
/**
* Shuts down the processor and does any cleanup required. Completely drains the buffer/queue of
* all its ended spans and passes them to the exporter.
*
* @param timeout minimum amount of microseconds to wait for shutdown before giving up and
* returning failure.
* @return true if the shutdown succeeded, false otherwise
*/
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept;

/**
* The background routine performed by the worker thread.
*/
Expand Down
19 changes: 13 additions & 6 deletions sdk/include/opentelemetry/sdk/trace/multi_span_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,19 @@ class MultiSpanProcessor : public SpanProcessor

bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
{
return InternalShutdown(timeout);
}

~MultiSpanProcessor() override
{
InternalShutdown();
Cleanup();
}

protected:
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept
{
bool result = true;
ProcessorNode *node = head_;
Expand All @@ -135,12 +148,6 @@ class MultiSpanProcessor : public SpanProcessor
return result;
}

~MultiSpanProcessor() override
{
Shutdown();
Cleanup();
}

private:
struct ProcessorNode
{
Expand Down
11 changes: 9 additions & 2 deletions sdk/include/opentelemetry/sdk/trace/simple_processor.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ class SimpleSpanProcessor : public SpanProcessor

bool Shutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept override
{
return InternalShutdown(timeout);
}

~SimpleSpanProcessor() override { InternalShutdown(); }

protected:
bool InternalShutdown(
std::chrono::microseconds timeout = (std::chrono::microseconds::max)()) noexcept
{
// We only call shutdown ONCE.
if (exporter_ != nullptr && !shutdown_latch_.test_and_set(std::memory_order_acquire))
Expand All @@ -85,8 +94,6 @@ class SimpleSpanProcessor : public SpanProcessor
return true;
}

~SimpleSpanProcessor() override { Shutdown(); }

private:
std::unique_ptr<SpanExporter> exporter_;
opentelemetry::common::SpinLockMutex lock_;
Expand Down
21 changes: 13 additions & 8 deletions sdk/src/logs/batch_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,19 @@ void BatchLogRecordProcessor::GetWaitAdjustedTime(
}

bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
return InternalShutdown(timeout);
}

BatchLogRecordProcessor::~BatchLogRecordProcessor()
{
if (synchronization_data_->is_shutdown.load() == false)
{
InternalShutdown();
}
}

bool BatchLogRecordProcessor::InternalShutdown(std::chrono::microseconds timeout) noexcept
{
auto start_time = std::chrono::system_clock::now();

Expand All @@ -391,14 +404,6 @@ bool BatchLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexce
return true;
}

BatchLogRecordProcessor::~BatchLogRecordProcessor()
{
if (synchronization_data_->is_shutdown.load() == false)
{
Shutdown();
}
}

} // namespace logs
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
16 changes: 13 additions & 3 deletions sdk/src/logs/multi_log_record_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ MultiLogRecordProcessor::MultiLogRecordProcessor(

MultiLogRecordProcessor::~MultiLogRecordProcessor()
{
ForceFlush();
Shutdown();
InternalForceFlush();
InternalShutdown();
}

void MultiLogRecordProcessor::AddProcessor(std::unique_ptr<LogRecordProcessor> &&processor)
Expand Down Expand Up @@ -74,6 +74,16 @@ void MultiLogRecordProcessor::OnEmit(std::unique_ptr<Recordable> &&record) noexc
}

bool MultiLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noexcept
{
return InternalForceFlush(timeout);
}

bool MultiLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
return InternalShutdown(timeout);
}

bool MultiLogRecordProcessor::InternalForceFlush(std::chrono::microseconds timeout) noexcept
{
bool result = true;
auto start_time = std::chrono::system_clock::now();
Expand Down Expand Up @@ -108,7 +118,7 @@ bool MultiLogRecordProcessor::ForceFlush(std::chrono::microseconds timeout) noex
return result;
}

bool MultiLogRecordProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
bool MultiLogRecordProcessor::InternalShutdown(std::chrono::microseconds timeout) noexcept
{
bool result = true;
auto start_time = std::chrono::system_clock::now();
Expand Down
21 changes: 13 additions & 8 deletions sdk/src/trace/batch_span_processor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,19 @@ void BatchSpanProcessor::GetWaitAdjustedTime(
}

bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
{
return InternalShutdown(timeout);
}

BatchSpanProcessor::~BatchSpanProcessor()
{
if (synchronization_data_->is_shutdown.load() == false)
{
InternalShutdown();
}
}

bool BatchSpanProcessor::InternalShutdown(std::chrono::microseconds timeout) noexcept
{
auto start_time = std::chrono::system_clock::now();
std::lock_guard<std::mutex> shutdown_guard{synchronization_data_->shutdown_m};
Expand All @@ -380,14 +393,6 @@ bool BatchSpanProcessor::Shutdown(std::chrono::microseconds timeout) noexcept
return true;
}

BatchSpanProcessor::~BatchSpanProcessor()
{
if (synchronization_data_->is_shutdown.load() == false)
{
Shutdown();
}
}

} // namespace trace
} // namespace sdk
OPENTELEMETRY_END_NAMESPACE
Loading