-
Notifications
You must be signed in to change notification settings - Fork 429
feat[accl-barex]: add barex_transport by build with USE_BAREX #1045
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Summary of ChangesHello @ZechaoZhang-beta, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly expands the data transfer capabilities by introducing and integrating the Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a new barex transport layer, which is a significant feature addition. The changes span across CMake configuration, C++ source code for the transfer engine, and Python integration files. While the feature implementation is comprehensive, there are several critical issues related to correctness, memory management, and maintainability that need to be addressed. These include a copy-paste error in the transport name, memory leaks, incorrect concurrency control, duplicated complex logic, and unsafe parsing of environment variables. Addressing these points will greatly improve the robustness and quality of the new transport layer.
| std::shared_ptr<TransferMetadata> meta, | ||
| std::shared_ptr<Topology> topo) override; | ||
|
|
||
| const char *getName() const override { return "rdma"; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This transport's getName() method returns "rdma", which is incorrect for BarexTransport. This seems to be a copy-paste error and will cause issues with transport selection logic. It should return "barex".
| const char *getName() const override { return "rdma"; } | |
| const char *getName() const override { return "barex"; } |
| status = 1; | ||
| } | ||
| } | ||
| if (!status) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic in CheckStatus is inverted. The status flag is set to true when a bad channel is found. The if (!status) check then incorrectly returns an error when all channels are healthy. This should be if (status) to correctly report an error when there are invalid channels.
| if (!status) { | |
| if (status) { |
| } | ||
| ContextConfig server_config = XConfigUtil::DefaultContextConfig(); | ||
| XContext *raw_server_context = nullptr; | ||
| result = XContext::NewInstance(raw_server_context, server_config, new EmptyCallback(), dev, mempool, server_threadpool); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a memory leak here. new EmptyCallback() allocates a new callback object on the heap, but it is never deleted. Since this is inside a loop, it will leak memory for each device context created. The same issue exists on line 1216. You should manage the lifecycle of the callback object, for example by creating it on the stack if its lifetime is contained within the function, or using a smart pointer if it needs to be shared.
| Status BarexTransport::submitTransfer( | ||
| BatchID batch_id, const std::vector<TransferRequest> &entries) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The logic for processing slices, especially handling memory regions that span multiple registered blocks (the 1:1, 1:2, 2:1, 2:2 cases), is very complex and is duplicated almost identically in submitTransferTask (starting at line 679). This massive code duplication makes the code extremely difficult to maintain and prone to bugs. This complex logic should be extracted into a shared private helper function to ensure consistency and improve readability.
|
|
||
| // 查询某个 SegmentID 下的 channel 状态 | ||
| bool CheckAllChannels(SegmentID segment_id) { | ||
| RWSpinlock::WriteGuard guard(lock_); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
CheckAllChannels appears to be a read-only operation, but it uses a RWSpinlock::WriteGuard. This is inefficient and semantically incorrect. It should use a RWSpinlock::ReadGuard to allow for concurrent reads. The same issue exists in copyAll at line 148.
| RWSpinlock::WriteGuard guard(lock_); | |
| RWSpinlock::ReadGuard guard(lock_); |
| int total_channels = client_ctx_cnt * client_context_list_[0]->getQpNum(); | ||
| CountDownLatch connect_latch(total_channels); | ||
| std::vector<XChannel *> channels; | ||
| static std::mutex push_channel_mtx; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a static std::mutex inside a member function makes it shared across all instances of BarexTransport. If multiple instances of BarexTransport call OpenChannel concurrently, they will contend for the same lock, which is likely not the intended behavior and can lead to performance bottlenecks or incorrect behavior. Consider making this a member variable (std::mutex) if thread safety is needed within a single instance, or redesign the channel collection logic to be thread-safe without a static lock.
| if (pass_alloc_env) { | ||
| int val = atoi(pass_alloc_env); | ||
| if (val != 0) { | ||
| pass_alloc = true; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The use of atoi is generally discouraged as it has undefined behavior on errors (e.g., when the input string is not a valid number or is out of range). For parsing environment variables, it's safer to use std::stoi within a try-catch block or strtol to handle potential parsing errors gracefully. This pattern of using atoi appears in other places in this PR as well (transfer_engine.cpp:79, transfer_metadata_plugin.cpp:1150, etc.). Consider replacing it with a safer alternative throughout.
| if (pass_alloc_env) { | |
| int val = atoi(pass_alloc_env); | |
| if (val != 0) { | |
| pass_alloc = true; | |
| } | |
| } | |
| if (pass_alloc_env) { | |
| try { | |
| if (std::stoi(pass_alloc_env) != 0) { | |
| pass_alloc = true; | |
| } | |
| } catch (const std::exception&) { | |
| // Ignore invalid values or log a warning | |
| } | |
| } |
| Status segment_status = engine_->CheckSegmentStatus(handle); | ||
| if (!segment_status.ok()) { | ||
| LOG(WARNING) << "submitTransfer failed with target " << target_hostname << ", CheckSegmentStatus not ok, ready to closeSegment"; | ||
| std::lock_guard<std::mutex> guard(mutex_); | ||
| engine_->closeSegment(handle); | ||
| engine_->getMetadata()->removeSegmentDesc(target_hostname); | ||
| handle_map_.erase(target_hostname); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
|
||
| private: | ||
| std::unordered_map<SegmentID, std::unordered_map<int, std::vector<XChannel*>>> cache_; | ||
| std::unordered_map<SegmentID, bool> status_map_; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR adds support for the barex_transport backend as an alternative to RDMA transport. The implementation can be enabled at build time with the USE_BAREX flag and at runtime with the USE_BAREX environment variable. Key changes include new barex transport implementation files, integration with the transfer engine, build system updates, and modifications to wheel packaging to handle the barex library dependency.
Reviewed Changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 13 comments.
Show a summary per file
| File | Description |
|---|---|
| scripts/build_wheel.sh | Updated to conditionally copy allocator files when barex or nvlink libraries are present and exclude libaccl_barex.so from wheel |
| mooncake-transfer-engine/src/transport/barex_transport/barex_transport.cpp | New 1322-line implementation of barex transport layer with memory registration and transfer logic |
| mooncake-transfer-engine/src/transport/barex_transport/barex_context.cpp | New context management implementation for barex channels and batch operations |
| mooncake-transfer-engine/src/transport/barex_transport/CMakeLists.txt | Build configuration for barex transport module |
| mooncake-transfer-engine/src/transport/CMakeLists.txt | Added conditional compilation of barex_transport subdirectory |
| mooncake-transfer-engine/src/transfer_metadata_plugin.cpp | Added port range configuration for barex via environment variables |
| mooncake-transfer-engine/src/transfer_metadata.cpp | Extended metadata encoding/decoding to support barex protocol and barex_port field |
| mooncake-transfer-engine/src/transfer_engine.cpp | Integrated barex transport selection and channel management into main engine |
| mooncake-transfer-engine/src/multi_transport.cpp | Added barex transport registration and EIC NIC filtering logic |
| mooncake-transfer-engine/src/config.cpp | Added MC_MIN_REG_SIZE environment variable for configuring eic_max_block_size |
| mooncake-transfer-engine/src/CMakeLists.txt | Added CMAKE_INCLUDE definition and barex_transport linking |
| mooncake-transfer-engine/include/transport/transport.h | Extended Slice struct with dest_rkeys vector and lkey_index fields; added OpenChannel/CheckStatus virtual methods |
| mooncake-transfer-engine/include/transport/barex_transport/barex_transport.h | New header defining BarexTransport class with 175 lines |
| mooncake-transfer-engine/include/transport/barex_transport/barex_context.h | New header defining BarexContext and ChannelCache classes |
| mooncake-transfer-engine/include/transfer_metadata_plugin.h | Updated findAvailableTcpPort signature to accept optional set_range parameter |
| mooncake-transfer-engine/include/transfer_metadata.h | Added barex_port fields to RpcMetaDesc and HandShakeDesc structures |
| mooncake-transfer-engine/include/transfer_engine.h | Added CheckSegmentStatus method and use_barex_ flag |
| mooncake-transfer-engine/include/config.h | Added eic_max_block_size configuration field |
| mooncake-transfer-engine/example/transfer_engine_bench.cpp | Extended protocol options to support barex in example code |
| mooncake-integration/transfer_engine/transfer_engine_py.cpp | Added segment status checking and recovery logic; optional buddy allocation bypass |
| mooncake-integration/allocator.py | Changed allocator wrapper function names to stream-based versions |
| mooncake-common/common.cmake | Added USE_BAREX build option |
Comments suppressed due to low confidence (1)
mooncake-transfer-engine/src/transfer_engine.cpp:1
- Inconsistent spacing: double space before 'target_hostname'. Should be single space.
// Copyright 2024 KVCache.AI
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| int min_port = globalConfig().rpc_min_port;; | ||
| int max_port = globalConfig().rpc_max_port;; |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra semicolons after variable declarations. Should use single semicolon.
| int min_port = globalConfig().rpc_min_port;; | |
| int max_port = globalConfig().rpc_max_port;; | |
| int min_port = globalConfig().rpc_min_port; | |
| int max_port = globalConfig().rpc_max_port; |
| nics += ","; | ||
| } | ||
|
|
||
| // 移除最后一个多余的逗号 |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is in Chinese. Should be translated to English for consistency: '// Remove the last extra comma'.
| // 移除最后一个多余的逗号 | |
| // Remove the last extra comma |
| << ", listening on " << desc.ip_or_host_name << ":" | ||
| << desc.rpc_port; | ||
| << desc.rpc_port | ||
| << (use_barex_ ? ", barex use port:" + std::to_string(desc.barex_port) : ""); |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Inconsistent spacing in log message. Should be 'barex uses port:' for correct grammar.
| << (use_barex_ ? ", barex use port:" + std::to_string(desc.barex_port) : ""); | |
| << (use_barex_ ? ", barex uses port:" + std::to_string(desc.barex_port) : ""); |
| size_t val = atoll(min_reg_size_env); | ||
| if (val > 0) { | ||
| config.eic_max_block_size = val; | ||
| LOG(INFO) << "Barex set MC_MIN_REG_SIZE=" << val; |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error message prefix 'Barex set' is unclear. Should be 'Set MC_MIN_REG_SIZE from environment variable' for clarity.
| LOG(INFO) << "Barex set MC_MIN_REG_SIZE=" << val; | |
| LOG(INFO) << "Set MC_MIN_REG_SIZE from environment variable: " << val; |
|
|
||
| class ChannelCache { | ||
| public: | ||
| // 添加一个 channel 到指定 key & nic_id |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is in Chinese. Should be translated to English: '// Add a channel to the specified key & nic_id'.
| // 添加一个 channel 到指定 key & nic_id | |
| // Add a channel to the specified key & nic_id |
| return true; | ||
| } | ||
|
|
||
| // 检查并删除某个 SegmentID 下的异常channel,并返回删除的数量 |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is in Chinese. Should be translated to English: '// Check and remove invalid channels under a SegmentID, return the number of removed channels'.
| // 检查并删除某个 SegmentID 下的异常channel,并返回删除的数量 | |
| // Check and remove invalid channels under a SegmentID, return the number of removed channels |
| return invalid_count; | ||
| } | ||
|
|
||
| // 将所有的 channel 以 vector 形式返回 |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comment is in Chinese. Should be translated to English: '// Return all channels as a vector'.
| // 将所有的 channel 以 vector 形式返回 | |
| // Return all channels as a vector |
| if (dtype == CPU) { | ||
| buf_length_map_.emplace(addr, std::make_pair(length, 0)); | ||
| } else { | ||
| buf_length_map_.emplace(addr, std::make_pair(length, 1)); | ||
| } |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Magic numbers 0 and 1 are used to represent CPU and GPU types. Consider using boolean or enum constants for clarity, e.g., 'false' for CPU and 'true' for GPU, or define named constants.
| if (dtype == CPU) { | |
| buf_length_map_.emplace(addr, std::make_pair(length, 0)); | |
| } else { | |
| buf_length_map_.emplace(addr, std::make_pair(length, 1)); | |
| } | |
| buf_length_map_.emplace(addr, std::make_pair(length, dtype)); |
| << ", " << buffer.name | ||
| << ", " << buffer.addr | ||
| << ", " << buffer.length | ||
| << ", " << buffer.rkey.size() | ||
| << ", " << buffer.lkey.size(); |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Error message lacks field labels, making it difficult to understand which values correspond to which fields. Should include labels like 'buffer.name=', 'buffer.addr=', etc.
| << ", " << buffer.name | |
| << ", " << buffer.addr | |
| << ", " << buffer.length | |
| << ", " << buffer.rkey.size() | |
| << ", " << buffer.lkey.size(); | |
| << ", buffer.name=" << buffer.name | |
| << ", buffer.addr=" << buffer.addr | |
| << ", buffer.length=" << buffer.length | |
| << ", buffer.rkey.size=" << buffer.rkey.size() | |
| << ", buffer.lkey.size=" << buffer.lkey.size(); |
| if [ -f build/mooncake-transfer-engine/nvlink-allocator/nvlink_allocator.so ] \ | ||
| || [ -f /usr/lib/libaccl_barex.so ] \ | ||
| || [ -f /usr/lib64/libaccl_barex.so ]; then |
Copilot
AI
Nov 11, 2025
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The condition checks for barex library existence but the code inside still only copies nvlink_allocator.so conditionally. The outer condition logic seems inconsistent - if barex exists, it will copy allocator.py even though nvlink_allocator.so might not exist.
| engine_->closeSegment(handle); | ||
| engine_->getMetadata()->removeSegmentDesc(target_hostname); | ||
| handle_map_.erase(target_hostname); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code block looks like the same as above
| so_path = cls._get_so_path() | ||
| cls._instances[device] = CUDAPluggableAllocator( | ||
| so_path, "u2mm_alloc_wrapper", "u2mm_free_wrapper" | ||
| so_path, "u2mm_alloc_wrapper_with_stream", "u2mm_free_wrapper_with_stream" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not sure if this has a compatible issue
| if (min_reg_size_env) { | ||
| size_t val = atoll(min_reg_size_env); | ||
| if (val > 0) { | ||
| config.eic_max_block_size = val; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's confusing to assign a miv value to a max variable
| nics.pop_back(); | ||
| } | ||
|
|
||
| if (!nics.empty()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The empty nics have any side effects?
| pass_alloc = true; | ||
| } | ||
| } catch (const std::exception &) { | ||
| // Ignore invalid values or log a warning |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A warning seems to be necessary
| #endif | ||
| if (metadata_conn_string == P2PHANDSHAKE) { | ||
| rpc_binding_method = "P2P handshake"; | ||
| desc.rpc_port = findAvailableTcpPort(desc.sockfd); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This call to findAvailableTcpPort needs two args
| int min_port = globalConfig().rpc_min_port; | ||
| int max_port = globalConfig().rpc_max_port; | ||
| #ifdef USE_BAREX | ||
| if (set_range) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A new arg for a single MACRO is tricky
No description provided.