@@ -1848,6 +1848,31 @@ void* EventuallyPersistentEngine::getEngineSpecific(const CookieIface* cookie) {
1848
1848
return cookie->getEngineStorage ();
1849
1849
}
1850
1850
1851
+ void EventuallyPersistentEngine::storeStatTask (
1852
+ const CookieIface* cookie, std::shared_ptr<BackgroundStatTask> task) {
1853
+ // store a ptr to a shared_ptr to the task (must ensure the task is not
1854
+ // destroyed before the connection retrieves the result).
1855
+ // Once the frontend thread is notified, it must call retrieveStatTask,
1856
+ // which will free the heap-allocated shared_ptr.
1857
+ auto wrapper = std::make_unique<std::shared_ptr<BackgroundStatTask>>(task);
1858
+ storeEngineSpecific (cookie, wrapper.release ());
1859
+ }
1860
+
1861
+ std::shared_ptr<BackgroundStatTask>
1862
+ EventuallyPersistentEngine::retrieveStatTask (const CookieIface* cookie) {
1863
+ void * data = getEngineSpecific (cookie);
1864
+ if (data) {
1865
+ // take back ownership of the task ptr
1866
+ auto ptr = std::unique_ptr<std::shared_ptr<BackgroundStatTask>>(
1867
+ reinterpret_cast <std::shared_ptr<BackgroundStatTask>*>(data));
1868
+ // clear the engine specific - it's not valid to retrieve the
1869
+ // task twice (would lead to a double free)
1870
+ storeEngineSpecific (cookie, nullptr );
1871
+ return *ptr;
1872
+ }
1873
+ return nullptr ;
1874
+ }
1875
+
1851
1876
bool EventuallyPersistentEngine::isDatatypeSupported (
1852
1877
const CookieIface* cookie, protocol_binary_datatype_t datatype) {
1853
1878
return cookie->isDatatypeSupported (datatype);
@@ -3756,23 +3781,20 @@ class StatCheckpointVisitor : public VBucketVisitor {
3756
3781
AddStatFn add_stat;
3757
3782
};
3758
3783
3759
-
3760
- class StatCheckpointTask : public GlobalTask {
3784
+ class StatCheckpointTask : public BackgroundStatTask {
3761
3785
public:
3762
3786
StatCheckpointTask (EventuallyPersistentEngine* e,
3763
3787
const CookieIface* c,
3764
3788
AddStatFn a)
3765
- : GlobalTask(e, TaskId::StatCheckpointTask, 0 , false ),
3766
- ep (e),
3767
- cookie(c),
3768
- add_stat(std::move(a)) {
3789
+ : BackgroundStatTask(e, c, TaskId::StatCheckpointTask) {
3769
3790
}
3770
- bool run () override {
3791
+
3792
+ cb::engine_errc collectStats () override {
3771
3793
TRACE_EVENT0 (" ep-engine/task" , " StatsCheckpointTask" );
3772
- StatCheckpointVisitor scv (ep-> getKVBucket (), cookie, add_stat);
3773
- ep ->getKVBucket ()-> visit (scv );
3774
- ep-> notifyIOComplete (cookie, cb::engine_errc::success );
3775
- return false ;
3794
+ StatCheckpointVisitor scv (
3795
+ e ->getKVBucket (), cookie, getDeferredAddStat () );
3796
+ e-> getKVBucket ()-> visit (scv );
3797
+ return cb::engine_errc::success ;
3776
3798
}
3777
3799
3778
3800
std::string getDescription () const override {
@@ -3785,11 +3807,6 @@ class StatCheckpointTask : public GlobalTask {
3785
3807
// take /too/ long, so set limit of 100ms.
3786
3808
return std::chrono::milliseconds (100 );
3787
3809
}
3788
-
3789
- private:
3790
- EventuallyPersistentEngine *ep;
3791
- const CookieIface* cookie;
3792
- AddStatFn add_stat;
3793
3810
};
3794
3811
// / @endcond
3795
3812
@@ -3799,15 +3816,14 @@ cb::engine_errc EventuallyPersistentEngine::doCheckpointStats(
3799
3816
const char * stat_key,
3800
3817
int nkey) {
3801
3818
if (nkey == 10 ) {
3802
- void * es = getEngineSpecific (cookie);
3803
- if (es == nullptr ) {
3804
- ExTask task = std::make_shared<StatCheckpointTask>(
3805
- this , cookie, add_stat);
3819
+ auto task = retrieveStatTask (cookie);
3820
+ if (!task) {
3821
+ task = std::make_shared<StatCheckpointTask>(this , cookie, add_stat);
3806
3822
ExecutorPool::get ()->schedule (task);
3807
- storeEngineSpecific (cookie, this );
3823
+ storeStatTask (cookie, task );
3808
3824
return cb::engine_errc::would_block;
3809
3825
} else {
3810
- storeEngineSpecific (cookie, nullptr );
3826
+ return task-> maybeWriteResponse (add_stat );
3811
3827
}
3812
3828
} else if (nkey > 11 ) {
3813
3829
std::string vbid (&stat_key[11 ], nkey - 11 );
@@ -4034,21 +4050,19 @@ static void showConnAggStat(const std::string& connType,
4034
4050
}
4035
4051
}
4036
4052
4037
- class StatDCPTask : public GlobalTask {
4053
+ class StatDCPTask : public BackgroundStatTask {
4038
4054
public:
4039
4055
using Callback = std::function<cb::engine_errc(
4040
4056
EventuallyPersistentEngine* e, const CookieIface* cookie)>;
4041
4057
StatDCPTask (EventuallyPersistentEngine* e,
4042
4058
const CookieIface* cookie,
4043
4059
std::string_view description,
4044
4060
Callback callback)
4045
- : GlobalTask(e, TaskId::StatDCPTask, 0 , false ),
4046
- e(e),
4047
- cookie(cookie),
4061
+ : BackgroundStatTask(e, cookie, TaskId::StatDCPTask),
4048
4062
description(description),
4049
4063
callback(std::move(callback)) {
4050
4064
}
4051
- bool run () override {
4065
+ cb::engine_errc collectStats () override {
4052
4066
TRACE_EVENT0 (" ep-engine/task" , " StatDCPTask" );
4053
4067
cb::engine_errc result = cb::engine_errc::failed;
4054
4068
try {
@@ -4060,8 +4074,7 @@ class StatDCPTask : public GlobalTask {
4060
4074
e.what (),
4061
4075
getDescription ());
4062
4076
}
4063
- e->notifyIOComplete (cookie, result);
4064
- return false ;
4077
+ return result;
4065
4078
}
4066
4079
4067
4080
std::string getDescription () const override {
@@ -4075,8 +4088,6 @@ class StatDCPTask : public GlobalTask {
4075
4088
}
4076
4089
4077
4090
private:
4078
- EventuallyPersistentEngine* e;
4079
- const CookieIface* cookie;
4080
4091
const std::string description;
4081
4092
Callback callback;
4082
4093
};
@@ -4085,9 +4096,9 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
4085
4096
const CookieIface* cookie,
4086
4097
const AddStatFn& add_stat,
4087
4098
std::string_view sep) {
4088
- void * engineSpecific = getEngineSpecific (cookie);
4089
- if (engineSpecific == nullptr ) {
4090
- ExTask task = std::make_shared<StatDCPTask>(
4099
+ auto task = retrieveStatTask (cookie);
4100
+ if (!task ) {
4101
+ task = std::make_shared<StatDCPTask>(
4091
4102
this ,
4092
4103
cookie,
4093
4104
" Aggregated DCP stats" ,
@@ -4100,13 +4111,11 @@ cb::engine_errc EventuallyPersistentEngine::doConnAggStats(
4100
4111
return cb::engine_errc::success;
4101
4112
});
4102
4113
ExecutorPool::get ()->schedule (task);
4103
- storeEngineSpecific (cookie, this );
4114
+ storeStatTask (cookie, task );
4104
4115
return cb::engine_errc::would_block;
4105
4116
} else {
4106
- storeEngineSpecific (cookie, nullptr );
4117
+ return task-> maybeWriteResponse (add_stat );
4107
4118
}
4108
-
4109
- return cb::engine_errc::success;
4110
4119
}
4111
4120
4112
4121
cb::engine_errc EventuallyPersistentEngine::doConnAggStatsInner (
@@ -4137,9 +4146,9 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats(
4137
4146
const CookieIface* cookie,
4138
4147
const AddStatFn& add_stat,
4139
4148
std::string_view value) {
4140
- void * engineSpecific = getEngineSpecific (cookie);
4141
- if (engineSpecific == nullptr ) {
4142
- ExTask task = std::make_shared<StatDCPTask>(
4149
+ auto task = retrieveStatTask (cookie);
4150
+ if (!task ) {
4151
+ task = std::make_shared<StatDCPTask>(
4143
4152
this ,
4144
4153
cookie,
4145
4154
" Summarised bucket-wide DCP stats" ,
@@ -4150,13 +4159,11 @@ cb::engine_errc EventuallyPersistentEngine::doDcpStats(
4150
4159
return cb::engine_errc::success;
4151
4160
});
4152
4161
ExecutorPool::get ()->schedule (task);
4153
- storeEngineSpecific (cookie, this );
4162
+ storeStatTask (cookie, task );
4154
4163
return cb::engine_errc::would_block;
4155
4164
} else {
4156
- storeEngineSpecific (cookie, nullptr );
4165
+ return task-> maybeWriteResponse (add_stat );
4157
4166
}
4158
-
4159
- return cb::engine_errc::success;
4160
4167
}
4161
4168
4162
4169
void EventuallyPersistentEngine::doDcpStatsInner (const CookieIface* cookie,
0 commit comments