18
18
19
19
#include < glog/logging.h>
20
20
21
+ #include < atomic>
21
22
#include < cstdint>
22
23
#include < functional>
24
+ #include < memory>
25
+ #include < utility>
23
26
#include < vector>
24
27
25
28
#include " cache/utils/context.h"
26
29
#include " client/common/utils.h"
27
- #include " client/vfs/ const.h"
30
+ #include " client/const.h"
28
31
#include " client/vfs/data/common/common.h"
29
32
#include " client/vfs/data/common/data_utils.h"
30
33
#include " client/vfs/data/reader/reader_common.h"
31
34
#include " client/vfs/hub/vfs_hub.h"
32
35
#include " common/status.h"
33
36
#include " options/client/vfs/vfs_dynamic_option.h"
34
- #include " trace/tracer .h"
37
+ #include " trace/context .h"
35
38
36
39
namespace dingofs {
37
40
namespace client {
38
41
namespace vfs {
39
42
40
- ChunkReader::ChunkReader (VFSHub* hub, uint64_t ino, uint64_t index)
43
+ #define METHOD_NAME () (" ChunkReader::" + std::string(__FUNCTION__))
44
+
45
+ ChunkReader::ChunkReader (VFSHub* hub, uint64_t fh, uint64_t ino, uint64_t index)
41
46
: hub_(hub),
47
+ fh_ (fh),
42
48
chunk_(hub->GetFsInfo ().id, ino, index, hub->GetFsInfo().chunk_size,
43
49
hub->GetFsInfo().block_size, hub->GetPageSize()) {}
44
50
45
- void ChunkReader::BlockReadCallback (ChunkReader* reader,
51
+ void ChunkReader::BlockReadCallback (ContextSPtr ctx, ChunkReader* reader,
46
52
const BlockCacheReadReq& req,
47
53
ReaderSharedState& shared, Status s) {
54
+ auto span = reader->hub_ ->GetTracer ()->StartSpanWithContext (
55
+ kVFSDataMoudule , METHOD_NAME (), ctx);
56
+
48
57
if (!s.ok ()) {
49
58
LOG (WARNING) << fmt::format (
50
59
" {} ChunkReader fail read block_key: {}, buf_pos: {}, block_req: {} "
@@ -60,11 +69,16 @@ void ChunkReader::BlockReadCallback(ChunkReader* reader,
60
69
}
61
70
62
71
{
63
- std::lock_guard<std::mutex> lock (shared. mtx );
64
-
72
+ auto copy_span = reader-> hub_ -> GetTracer ()-> StartSpanWithParent (
73
+ kVFSDataMoudule , " ChunkReader::BlockReadCallback.IoBufferCopy " , *span);
65
74
if (s.ok ()) {
66
75
req.io_buffer .CopyTo (req.buf_pos );
67
- } else {
76
+ }
77
+ }
78
+
79
+ {
80
+ std::lock_guard<std::mutex> lock (shared.mtx );
81
+ if (!s.ok ()) {
68
82
// Handle read failure with error priority: other errors > NotFound
69
83
if (shared.status .ok ()) {
70
84
// First error, record it directly
@@ -83,13 +97,17 @@ void ChunkReader::BlockReadCallback(ChunkReader* reader,
83
97
}
84
98
}
85
99
86
- void ChunkReader::ReadAsync (const ChunkReadReq& req, StatusCallback cb) {
87
- hub_->GetReadExecutor ()->Execute ([this , &req, cb]() { DoRead (req, cb); });
100
+ void ChunkReader::ReadAsync (ContextSPtr ctx, const ChunkReadReq& req,
101
+ StatusCallback cb) {
102
+ hub_->GetReadExecutor ()->Execute (
103
+ [this , ctx, &req, cb]() { DoRead (ctx, req, cb); });
88
104
}
89
105
90
- void ChunkReader::DoRead (const ChunkReadReq& req, StatusCallback cb) {
91
- // TODO: get ctx from parent
92
- auto span = hub_->GetTracer ()->StartSpan (kVFSDataMoudule , __func__);
106
+ void ChunkReader::DoRead (ContextSPtr ctx, const ChunkReadReq& req,
107
+ StatusCallback cb) {
108
+ auto * tracer = hub_->GetTracer ();
109
+ auto span = tracer->StartSpanWithContext (kVFSDataMoudule , METHOD_NAME (), ctx);
110
+
93
111
uint64_t chunk_offset = req.offset ;
94
112
uint64_t size = req.to_read_size ;
95
113
char * buf = req.buf ;
@@ -112,36 +130,46 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
112
130
do {
113
131
uint64_t remain_len = size;
114
132
115
- std::vector<Slice> slices;
116
- Status s = hub_->GetMetaSystem ()->ReadSlice (span->GetContext (), chunk_.ino ,
117
- chunk_.index , 0 , &slices);
133
+ ChunkSlices chunk_slices;
134
+ Status s = GetSlices (span->GetContext (), &chunk_slices);
118
135
if (!s.ok ()) {
119
- LOG (WARNING) << fmt::format (" {} Read slice failed , status: {}" , UUID (),
136
+ LOG (WARNING) << fmt::format (" {} Failed GetSlices , status: {}" , UUID (),
120
137
s.ToString ());
121
138
cb (s);
122
139
}
123
140
141
+ std::vector<SliceReadReq> slice_reqs;
124
142
FileRange range{.offset = read_file_offset, .len = size};
125
- std::vector<SliceReadReq> slice_reqs = ProcessReadRequest (slices, range);
143
+ {
144
+ auto process_slice_reqs_span = tracer->StartSpanWithParent (
145
+ kVFSDataMoudule , " ChunkReader::DoRead.ProcessReadRequest" , *span);
146
+ slice_reqs = ProcessReadRequest (chunk_slices.slices , range);
147
+ }
126
148
127
149
std::vector<BlockReadReq> block_reqs;
128
150
129
- for (auto & slice_req : slice_reqs) {
130
- VLOG (6 ) << " {} Read slice_req: " << slice_req.ToString ();
131
-
132
- if (slice_req.slice .has_value () && !slice_req.slice .value ().is_zero ) {
133
- std::vector<BlockReadReq> reqs = ConvertSliceReadReqToBlockReadReqs (
134
- slice_req, chunk_.fs_id , chunk_.ino , chunk_.chunk_size ,
135
- chunk_.block_size );
136
-
137
- block_reqs.insert (block_reqs.end (),
138
- std::make_move_iterator (reqs.begin ()),
139
- std::make_move_iterator (reqs.end ()));
140
- } else {
141
- char * buf_pos = buf + (slice_req.file_offset - read_file_offset);
142
- VLOG (6 ) << fmt::format (" {} Read buf: {}, zero fill, read_size: {}" ,
143
- UUID (), Char2Addr (buf_pos), slice_req.len );
144
- memset (buf_pos, 0 , slice_req.len );
151
+ {
152
+ auto slice_req_to_block_req_span = tracer->StartSpanWithParent (
153
+ kVFSDataMoudule ,
154
+ " ChunkReader::DoRead.ConvertSliceReadReqToBlockReadReqs" , *span);
155
+
156
+ for (auto & slice_req : slice_reqs) {
157
+ VLOG (6 ) << " {} Read slice_req: " << slice_req.ToString ();
158
+
159
+ if (slice_req.slice .has_value () && !slice_req.slice .value ().is_zero ) {
160
+ std::vector<BlockReadReq> reqs = ConvertSliceReadReqToBlockReadReqs (
161
+ slice_req, chunk_.fs_id , chunk_.ino , chunk_.chunk_size ,
162
+ chunk_.block_size );
163
+
164
+ block_reqs.insert (block_reqs.end (),
165
+ std::make_move_iterator (reqs.begin ()),
166
+ std::make_move_iterator (reqs.end ()));
167
+ } else {
168
+ char * buf_pos = buf + (slice_req.file_offset - read_file_offset);
169
+ VLOG (6 ) << fmt::format (" {} Read buf: {}, zero fill, read_size: {}" ,
170
+ UUID (), Char2Addr (buf_pos), slice_req.len );
171
+ memset (buf_pos, 0 , slice_req.len );
172
+ }
145
173
}
146
174
}
147
175
@@ -176,13 +204,20 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
176
204
shared.status = Status::OK ();
177
205
178
206
for (auto & block_cache_req : block_cache_reqs) {
207
+ auto block_cache_range_span = tracer->StartSpanWithParent (
208
+ kVFSDataMoudule , " ChunkReader::DoRead.AsyncRange" , *span);
209
+
210
+ auto callback = [this , &span, &block_cache_req, &shared,
211
+ span_ptr = block_cache_range_span.release ()](Status s) {
212
+ std::unique_ptr<ITraceSpan> block_cache_range_span (span_ptr);
213
+ block_cache_range_span->End ();
214
+ BlockReadCallback (span->GetContext (), this , block_cache_req, shared, s);
215
+ };
216
+
179
217
hub_->GetBlockCache ()->AsyncRange (
180
218
cache::NewContext (), block_cache_req.key ,
181
219
block_cache_req.block_req .block_offset , block_cache_req.block_req .len ,
182
- &block_cache_req.io_buffer ,
183
- [this , &block_cache_req, &shared](Status s) {
184
- BlockReadCallback (this , block_cache_req, shared, s);
185
- },
220
+ &block_cache_req.io_buffer , std::move (callback),
186
221
block_cache_req.option );
187
222
}
188
223
@@ -201,6 +236,9 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
201
236
UUID (), ret.ToString (), retry, chunk_offset, end_read_chunk_offet,
202
237
read_file_offset, end_read_file_offset);
203
238
239
+ if (ret.IsNotFound ()) {
240
+ InvalidateSlices (chunk_slices.version );
241
+ }
204
242
} while (ret.IsNotFound () &&
205
243
retry++ < FLAGS_vfs_read_max_retry_block_not_found);
206
244
@@ -209,6 +247,50 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
209
247
cb (ret);
210
248
}
211
249
250
+ void ChunkReader::Invalidate () {
251
+ VLOG (4 ) << fmt::format (" {} Invalidate, cversion: {}" , UUID (),
252
+ cversion_.load (std::memory_order_relaxed));
253
+ std::lock_guard<std::mutex> lg (mutex_);
254
+ cversion_ = kInvalidVersion ;
255
+ slices_.clear ();
256
+ }
257
+
258
+ Status ChunkReader::GetSlices (ContextSPtr ctx, ChunkSlices* chunk_slices) {
259
+ auto * tracer = hub_->GetTracer ();
260
+ auto span = tracer->StartSpanWithContext (kVFSDataMoudule , METHOD_NAME (), ctx);
261
+
262
+ std::lock_guard<std::mutex> lg (mutex_);
263
+ if (cversion_ == kInvalidVersion ) {
264
+ auto slice_span = tracer->StartSpanWithParent (
265
+ kVFSDataMoudule , " ChunkReader::GetSlices.ReadSlice" , *span);
266
+
267
+ std::vector<Slice> slices;
268
+ DINGOFS_RETURN_NOT_OK (hub_->GetMetaSystem ()->ReadSlice (
269
+ slice_span->GetContext (), chunk_.ino , chunk_.index , 0 , &slices));
270
+
271
+ cversion_.store (next_version_, std::memory_order_relaxed);
272
+ slices_ = std::move (slices);
273
+
274
+ next_version_++;
275
+ }
276
+
277
+ chunk_slices->version = cversion_;
278
+ chunk_slices->slices = slices_;
279
+
280
+ return Status::OK ();
281
+ }
282
+
283
+ void ChunkReader::InvalidateSlices (uint32_t version) {
284
+ VLOG (4 ) << fmt::format (" {} InvalidateSlices, version: {}, cversion: {}" ,
285
+ UUID (), version,
286
+ cversion_.load (std::memory_order_relaxed));
287
+ std::lock_guard<std::mutex> lg (mutex_);
288
+ if (cversion_ <= version) {
289
+ cversion_ = kInvalidVersion ;
290
+ slices_.clear ();
291
+ }
292
+ }
293
+
212
294
} // namespace vfs
213
295
214
296
} // namespace client
0 commit comments