@@ -2992,12 +2992,16 @@ FetchResult Executor::fetchChunks(
2992
2992
if (memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
2993
2993
std::mutex all_frag;
2994
2994
std::atomic<bool > empty_frags{false };
2995
- tbb::task_arena limitedArena (16 );
2995
+ tbb::task_arena limitedArena (1 );
2996
+ for (const auto & selected_frag_ids : frag_ids_crossjoin) {
2997
+ selected_frag_ids_vec.push_back (selected_frag_ids);
2998
+ }
2999
+ all_frag_col_buffers.resize (selected_frag_ids_vec.size ());
3000
+
2996
3001
limitedArena.execute ([&]() {
2997
- tbb::parallel_for_each (
2998
- frag_ids_crossjoin.begin (),
2999
- frag_ids_crossjoin.end (),
3000
- [&](const std::vector<size_t >& selected_frag_ids) {
3002
+ tbb::parallel_for (
3003
+ 0ul , selected_frag_ids_vec.size (), [&](const size_t idx) {
3004
+ const auto & selected_frag_ids = selected_frag_ids_vec[idx];
3001
3005
std::vector<const int8_t *> frag_col_buffers (
3002
3006
plan_state_->global_to_local_col_ids_ .size ());
3003
3007
for (const auto & col_id : col_global_ids) {
@@ -3029,6 +3033,7 @@ FetchResult Executor::fetchChunks(
3029
3033
// determine if we need special treatment to linearlize multi-frag table
3030
3034
// i.e., a column that is classified as varlen type, i.e., array
3031
3035
// for now, we can support more types in this way
3036
+ all_frag.lock ();
3032
3037
if (needLinearizeAllFragments (
3033
3038
*col_id, ra_exe_unit, selected_fragments, memory_level)) {
3034
3039
bool for_lazy_fetch = false ;
@@ -3055,7 +3060,9 @@ FetchResult Executor::fetchChunks(
3055
3060
device_allocator,
3056
3061
/* thread_idx=*/ 0 );
3057
3062
}
3063
+ all_frag.unlock ();
3058
3064
} else {
3065
+ LOG (INFO) << " Pushing to idx " << idx;
3059
3066
frag_col_buffers[it->second ] =
3060
3067
column_fetcher.getOneTableColumnFragment (col_id->getColInfo (),
3061
3068
frag_id,
@@ -3067,10 +3074,8 @@ FetchResult Executor::fetchChunks(
3067
3074
device_allocator);
3068
3075
}
3069
3076
}
3070
- all_frag.lock ();
3071
- selected_frag_ids_vec.push_back (selected_frag_ids);
3072
- all_frag_col_buffers.push_back (frag_col_buffers);
3073
- all_frag.unlock ();
3077
+ LOG (INFO) << " frag_col_buffers size to push: " << frag_col_buffers.size ();
3078
+ all_frag_col_buffers[idx] = frag_col_buffers;
3074
3079
});
3075
3080
});
3076
3081
if (empty_frags) {
@@ -3152,6 +3157,29 @@ FetchResult Executor::fetchChunks(
3152
3157
}
3153
3158
std::tie (all_num_rows, all_frag_offsets) = getRowCountAndOffsetForAllFrags (
3154
3159
ra_exe_unit, selected_frag_ids_vec, ra_exe_unit.input_descs , all_tables_fragments);
3160
+ // if(memory_level == Data_Namespace::MemoryLevel::GPU_LEVEL){
3161
+ // LOG(INFO) << "selected_frag_ids_vec - all_frag_col_buffers - all_num_rows -
3162
+ // all_frag_offsets"; LOG(INFO) << "Sizes: " << selected_frag_ids_vec.size() << " - "
3163
+ // << all_frag_col_buffers.size() << " - " << all_num_rows.size() << " - " <<
3164
+ // all_frag_offsets.size(); for(size_t idx = 0; idx < selected_frag_ids_vec.size();
3165
+ // idx++){
3166
+ // LOG(INFO) << "Sizes[" << idx << "]: " << selected_frag_ids_vec[idx].size() << " -
3167
+ // " << all_frag_col_buffers[idx].size() << " - " << all_num_rows[idx].size() << "
3168
+ // - " << all_frag_offsets[idx].size(); for(size_t iidx = 0; iidx <
3169
+ // selected_frag_ids_vec[idx].size(); iidx++){
3170
+ // LOG(INFO) << " -- "<< selected_frag_ids_vec[idx][iidx] << " - " <<
3171
+ // (all_frag_col_buffers[idx].size()? ((void*)(all_frag_col_buffers[idx][iidx])) :
3172
+ // "__") << " - " << all_num_rows[idx][iidx] << " - " <<
3173
+ // all_frag_offsets[idx][iidx];
3174
+ // }
3175
+ // for(size_t iidx = selected_frag_ids_vec[idx].size(); iidx <
3176
+ // all_frag_col_buffers[idx].size(); iidx++){
3177
+ // LOG(INFO) << " -- "<< " __ "<< " - " <<
3178
+ // (void*)(all_frag_col_buffers[idx][iidx]) << " - " << " __ " << " - " << " __
3179
+ // ";
3180
+ // }
3181
+ // }
3182
+ // }
3155
3183
return {all_frag_col_buffers, all_num_rows, all_frag_offsets};
3156
3184
}
3157
3185
0 commit comments