Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/client/vfs/const.h → src/client/const.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,12 @@

namespace dingofs {
namespace client {
namespace vfs {

// module name
static const std::string kVFSMoudule = "vfs";
static const std::string kVFSWrapperMoudule = "vfs_wrapper";
static const std::string kVFSDataMoudule = "vfs_data";

} // namespace vfs
} // namespace client
} // namespace dingofs

Expand Down
14 changes: 8 additions & 6 deletions src/client/vfs/data/file.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,11 @@
#include <mutex>
#include <utility>

#include "client/common/utils.h"
#include "client/vfs/data/chunk.h"
#include "client/vfs/data/common/async_util.h"
#include "client/vfs/hub/vfs_hub.h"
#include "common/callback.h"
#include "common/status.h"
#include "options/client/vfs/vfs_option.h"
#include "trace/context.h"

namespace dingofs {
namespace client {
Expand Down Expand Up @@ -57,13 +55,17 @@ Status File::Write(const char* buf, uint64_t size, uint64_t offset,
uint64_t* out_wsize) {
DINGOFS_RETURN_NOT_OK(PreCheck());

return file_writer_->Write(buf, size, offset, out_wsize);
Status s = file_writer_->Write(buf, size, offset, out_wsize);
if (s.ok()) {
file_reader_->Invalidate();
}
return s;
}

Status File::Read(char* buf, uint64_t size, uint64_t offset,
Status File::Read(ContextSPtr ctx, char* buf, uint64_t size, uint64_t offset,
uint64_t* out_rsize) {
DINGOFS_RETURN_NOT_OK(PreCheck());
return file_reader_->Read(buf, size, offset, out_rsize);
return file_reader_->Read(ctx, buf, size, offset, out_rsize);
}

void File::FileFlushed(StatusCallback cb, Status status) {
Expand Down
10 changes: 6 additions & 4 deletions src/client/vfs/data/file.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,18 +35,19 @@ class VFSHub;

class File : public IFile {
public:
File(VFSHub* hub, uint64_t ino)
File(VFSHub* hub, uint64_t fh, int64_t ino)
: vfs_hub_(hub),
fh_(fh),
ino_(ino),
file_writer_(std::make_unique<FileWriter>(hub, ino)),
file_reader_(std::make_unique<FileReader>(hub, ino)) {}
file_writer_(std::make_unique<FileWriter>(hub, fh_, ino)),
file_reader_(std::make_unique<FileReader>(hub, fh_, ino)) {}

~File() override = default;

Status Write(const char* buf, uint64_t size, uint64_t offset,
uint64_t* out_wsize) override;

Status Read(char* buf, uint64_t size, uint64_t offset,
Status Read(ContextSPtr ctx, char* buf, uint64_t size, uint64_t offset,
uint64_t* out_rsize) override;

Status Flush() override;
Expand All @@ -59,6 +60,7 @@ class File : public IFile {
void FileFlushed(StatusCallback cb, Status status);

VFSHub* vfs_hub_;
const uint64_t fh_;
const uint64_t ino_;

FileWriterUPtr file_writer_;
Expand Down
3 changes: 2 additions & 1 deletion src/client/vfs/data/ifile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include "common/callback.h"
#include "common/status.h"
#include "trace/context.h"

namespace dingofs {
namespace client {
Expand All @@ -33,7 +34,7 @@ class IFile {
virtual Status Write(const char* buf, uint64_t size, uint64_t offset,
uint64_t* out_wsize) = 0;

virtual Status Read(char* buf, uint64_t size,
virtual Status Read(ContextSPtr ctx, char* buf, uint64_t size,
uint64_t offset, uint64_t* out_rsize) = 0;

virtual Status Flush() = 0;
Expand Down
156 changes: 119 additions & 37 deletions src/client/vfs/data/reader/chunk_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,33 +18,42 @@

#include <glog/logging.h>

#include <atomic>
#include <cstdint>
#include <functional>
#include <memory>
#include <utility>
#include <vector>

#include "cache/utils/context.h"
#include "client/common/utils.h"
#include "client/vfs/const.h"
#include "client/const.h"
#include "client/vfs/data/common/common.h"
#include "client/vfs/data/common/data_utils.h"
#include "client/vfs/data/reader/reader_common.h"
#include "client/vfs/hub/vfs_hub.h"
#include "common/status.h"
#include "options/client/vfs/vfs_dynamic_option.h"
#include "trace/tracer.h"
#include "trace/context.h"

namespace dingofs {
namespace client {
namespace vfs {

ChunkReader::ChunkReader(VFSHub* hub, uint64_t ino, uint64_t index)
#define METHOD_NAME() ("ChunkReader::" + std::string(__FUNCTION__))

ChunkReader::ChunkReader(VFSHub* hub, uint64_t fh, uint64_t ino, uint64_t index)
: hub_(hub),
fh_(fh),
chunk_(hub->GetFsInfo().id, ino, index, hub->GetFsInfo().chunk_size,
hub->GetFsInfo().block_size, hub->GetPageSize()) {}

void ChunkReader::BlockReadCallback(ChunkReader* reader,
void ChunkReader::BlockReadCallback(ContextSPtr ctx, ChunkReader* reader,
const BlockCacheReadReq& req,
ReaderSharedState& shared, Status s) {
auto span = reader->hub_->GetTracer()->StartSpanWithContext(
kVFSDataMoudule, METHOD_NAME(), ctx);

if (!s.ok()) {
LOG(WARNING) << fmt::format(
"{} ChunkReader fail read block_key: {}, buf_pos: {}, block_req: {} "
Expand All @@ -60,11 +69,16 @@ void ChunkReader::BlockReadCallback(ChunkReader* reader,
}

{
std::lock_guard<std::mutex> lock(shared.mtx);

auto copy_span = reader->hub_->GetTracer()->StartSpanWithParent(
kVFSDataMoudule, "ChunkReader::BlockReadCallback.IoBufferCopy", *span);
if (s.ok()) {
req.io_buffer.CopyTo(req.buf_pos);
} else {
}
}

{
std::lock_guard<std::mutex> lock(shared.mtx);
if (!s.ok()) {
// Handle read failure with error priority: other errors > NotFound
if (shared.status.ok()) {
// First error, record it directly
Expand All @@ -83,13 +97,17 @@ void ChunkReader::BlockReadCallback(ChunkReader* reader,
}
}

void ChunkReader::ReadAsync(const ChunkReadReq& req, StatusCallback cb) {
hub_->GetReadExecutor()->Execute([this, &req, cb]() { DoRead(req, cb); });
void ChunkReader::ReadAsync(ContextSPtr ctx, const ChunkReadReq& req,
StatusCallback cb) {
hub_->GetReadExecutor()->Execute(
[this, ctx, &req, cb]() { DoRead(ctx, req, cb); });
}

void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
// TODO: get ctx from parent
auto span = hub_->GetTracer()->StartSpan(kVFSDataMoudule, __func__);
void ChunkReader::DoRead(ContextSPtr ctx, const ChunkReadReq& req,
StatusCallback cb) {
auto* tracer = hub_->GetTracer();
auto span = tracer->StartSpanWithContext(kVFSDataMoudule, METHOD_NAME(), ctx);

uint64_t chunk_offset = req.offset;
uint64_t size = req.to_read_size;
char* buf = req.buf;
Expand All @@ -112,36 +130,46 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
do {
uint64_t remain_len = size;

std::vector<Slice> slices;
Status s = hub_->GetMetaSystem()->ReadSlice(span->GetContext(), chunk_.ino,
chunk_.index, 0, &slices);
ChunkSlices chunk_slices;
Status s = GetSlices(span->GetContext(), &chunk_slices);
if (!s.ok()) {
LOG(WARNING) << fmt::format("{} Read slice failed, status: {}", UUID(),
LOG(WARNING) << fmt::format("{} Failed GetSlices, status: {}", UUID(),
s.ToString());
cb(s);
}

std::vector<SliceReadReq> slice_reqs;
FileRange range{.offset = read_file_offset, .len = size};
std::vector<SliceReadReq> slice_reqs = ProcessReadRequest(slices, range);
{
auto process_slice_reqs_span = tracer->StartSpanWithParent(
kVFSDataMoudule, "ChunkReader::DoRead.ProcessReadRequest", *span);
slice_reqs = ProcessReadRequest(chunk_slices.slices, range);
}

std::vector<BlockReadReq> block_reqs;

for (auto& slice_req : slice_reqs) {
VLOG(6) << "{} Read slice_req: " << slice_req.ToString();

if (slice_req.slice.has_value() && !slice_req.slice.value().is_zero) {
std::vector<BlockReadReq> reqs = ConvertSliceReadReqToBlockReadReqs(
slice_req, chunk_.fs_id, chunk_.ino, chunk_.chunk_size,
chunk_.block_size);

block_reqs.insert(block_reqs.end(),
std::make_move_iterator(reqs.begin()),
std::make_move_iterator(reqs.end()));
} else {
char* buf_pos = buf + (slice_req.file_offset - read_file_offset);
VLOG(6) << fmt::format("{} Read buf: {}, zero fill, read_size: {}",
UUID(), Char2Addr(buf_pos), slice_req.len);
memset(buf_pos, 0, slice_req.len);
{
auto slice_req_to_block_req_span = tracer->StartSpanWithParent(
kVFSDataMoudule,
"ChunkReader::DoRead.ConvertSliceReadReqToBlockReadReqs", *span);

for (auto& slice_req : slice_reqs) {
VLOG(6) << "{} Read slice_req: " << slice_req.ToString();

if (slice_req.slice.has_value() && !slice_req.slice.value().is_zero) {
std::vector<BlockReadReq> reqs = ConvertSliceReadReqToBlockReadReqs(
slice_req, chunk_.fs_id, chunk_.ino, chunk_.chunk_size,
chunk_.block_size);

block_reqs.insert(block_reqs.end(),
std::make_move_iterator(reqs.begin()),
std::make_move_iterator(reqs.end()));
} else {
char* buf_pos = buf + (slice_req.file_offset - read_file_offset);
VLOG(6) << fmt::format("{} Read buf: {}, zero fill, read_size: {}",
UUID(), Char2Addr(buf_pos), slice_req.len);
memset(buf_pos, 0, slice_req.len);
}
}
}

Expand Down Expand Up @@ -176,13 +204,20 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
shared.status = Status::OK();

for (auto& block_cache_req : block_cache_reqs) {
auto block_cache_range_span = tracer->StartSpanWithParent(
kVFSDataMoudule, "ChunkReader::DoRead.AsyncRange", *span);

auto callback = [this, &span, &block_cache_req, &shared,
span_ptr = block_cache_range_span.release()](Status s) {
std::unique_ptr<ITraceSpan> block_cache_range_span(span_ptr);
block_cache_range_span->End();
BlockReadCallback(span->GetContext(), this, block_cache_req, shared, s);
};

hub_->GetBlockCache()->AsyncRange(
cache::NewContext(), block_cache_req.key,
block_cache_req.block_req.block_offset, block_cache_req.block_req.len,
&block_cache_req.io_buffer,
[this, &block_cache_req, &shared](Status s) {
BlockReadCallback(this, block_cache_req, shared, s);
},
&block_cache_req.io_buffer, std::move(callback),
block_cache_req.option);
}

Expand All @@ -201,6 +236,9 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
UUID(), ret.ToString(), retry, chunk_offset, end_read_chunk_offet,
read_file_offset, end_read_file_offset);

if (ret.IsNotFound()) {
InvalidateSlices(chunk_slices.version);
}
} while (ret.IsNotFound() &&
retry++ < FLAGS_vfs_read_max_retry_block_not_found);

Expand All @@ -209,6 +247,50 @@ void ChunkReader::DoRead(const ChunkReadReq& req, StatusCallback cb) {
cb(ret);
}

void ChunkReader::Invalidate() {
VLOG(4) << fmt::format("{} Invalidate, cversion: {}", UUID(),
cversion_.load(std::memory_order_relaxed));
std::lock_guard<std::mutex> lg(mutex_);
cversion_ = kInvalidVersion;
slices_.clear();
}

Status ChunkReader::GetSlices(ContextSPtr ctx, ChunkSlices* chunk_slices) {
auto* tracer = hub_->GetTracer();
auto span = tracer->StartSpanWithContext(kVFSDataMoudule, METHOD_NAME(), ctx);

std::lock_guard<std::mutex> lg(mutex_);
if (cversion_ == kInvalidVersion) {
auto slice_span = tracer->StartSpanWithParent(
kVFSDataMoudule, "ChunkReader::GetSlices.ReadSlice", *span);

std::vector<Slice> slices;
DINGOFS_RETURN_NOT_OK(hub_->GetMetaSystem()->ReadSlice(
slice_span->GetContext(), chunk_.ino, chunk_.index, 0, &slices));

cversion_.store(next_version_, std::memory_order_relaxed);
slices_ = std::move(slices);

next_version_++;
}

chunk_slices->version = cversion_;
chunk_slices->slices = slices_;

return Status::OK();
}

void ChunkReader::InvalidateSlices(uint32_t version) {
VLOG(4) << fmt::format("{} InvalidateSlices, version: {}, cversion: {}",
UUID(), version,
cversion_.load(std::memory_order_relaxed));
std::lock_guard<std::mutex> lg(mutex_);
if (cversion_ <= version) {
cversion_ = kInvalidVersion;
slices_.clear();
}
}

} // namespace vfs

} // namespace client
Expand Down
Loading