Coverage Report

Created: 2026-03-14 04:23

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/materialization_opertor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/materialization_opertor.h"
19
20
#include <bthread/countdown_event.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
24
#include <utility>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column.h"
29
#include "exec/operator/operator.h"
30
#include "exec/rowid_fetcher.h"
31
#include "exec/scan/file_scanner.h"
32
#include "util/brpc_client_cache.h"
33
#include "util/brpc_closure.h"
34
35
namespace doris {
36
37
2
void MaterializationSharedState::get_block(Block* block) {
38
8
    for (int i = 0, j = 0, rowid_to_block_loc = rowid_locs[j]; i < origin_block.columns(); i++) {
39
6
        if (i != rowid_to_block_loc) {
40
3
            block->insert(origin_block.get_by_position(i));
41
3
        } else {
42
3
            auto response_block = response_blocks[j].to_block();
43
6
            for (int k = 0; k < response_block.columns(); k++) {
44
3
                auto& data = response_block.get_by_position(k);
45
3
                response_blocks[j].mutable_columns()[k] = data.column->clone_empty();
46
3
                block->insert(data);
47
3
            }
48
3
            if (++j < rowid_locs.size()) {
49
1
                rowid_to_block_loc = rowid_locs[j];
50
1
            }
51
3
        }
52
6
    }
53
2
    origin_block.clear();
54
2
}
55
56
2
Status MaterializationSharedState::merge_multi_response() {
57
2
    std::unordered_map<int64_t, std::pair<Block, int>> block_maps;
58
59
5
    for (int i = 0; i < block_order_results.size(); ++i) {
60
6
        for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
61
6
            Block partial_block;
62
6
            size_t uncompressed_size = 0;
63
6
            int64_t uncompressed_time = 0;
64
6
            DCHECK(rpc_struct.response.blocks_size() > i);
65
6
            RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
66
6
                                                      &uncompressed_size, &uncompressed_time));
67
6
            if (rpc_struct.response.blocks(i).has_profile()) {
68
0
                auto response_profile =
69
0
                        RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
70
0
                _update_profile_info(backend_id, response_profile.get());
71
0
            }
72
73
6
            if (!partial_block.is_empty_column()) {
74
6
                block_maps[backend_id] = std::make_pair(std::move(partial_block), 0);
75
6
            }
76
6
        }
77
78
12
        for (int j = 0; j < block_order_results[i].size(); ++j) {
79
9
            auto backend_id = block_order_results[i][j];
80
9
            if (backend_id) {
81
6
                if (UNLIKELY(block_maps.find(backend_id) == block_maps.end())) {
82
0
                    return Status::InternalError(
83
0
                            fmt::format("MaterializationSharedState::merge_multi_response, "
84
0
                                        "backend_id {} not found in block_maps",
85
0
                                        backend_id));
86
0
                }
87
6
                auto& source_block_rows = block_maps[backend_id];
88
6
                DCHECK(source_block_rows.second < source_block_rows.first.rows());
89
12
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
90
6
                    response_blocks[i].get_column_by_position(k)->insert_from(
91
6
                            *source_block_rows.first.get_by_position(k).column,
92
6
                            source_block_rows.second);
93
6
                }
94
6
                source_block_rows.second++;
95
6
            } else {
96
6
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
97
3
                    response_blocks[i].get_column_by_position(k)->insert_default();
98
3
                }
99
3
            }
100
9
        }
101
3
    }
102
103
    // clear request/response
104
4
    for (auto& [_, rpc_struct] : rpc_struct_map) {
105
8
        for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) {
106
4
            rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
107
4
            rpc_struct.request.mutable_request_block_descs(i)->clear_file_id();
108
4
        }
109
4
    }
110
2
    return Status::OK();
111
2
}
112
113
void MaterializationSharedState::_update_profile_info(int64_t backend_id,
114
0
                                                      RuntimeProfile* response_profile) {
115
0
    if (!backend_profile_info_string.contains(backend_id)) {
116
0
        backend_profile_info_string.emplace(backend_id,
117
0
                                            std::map<std::string, fmt::memory_buffer> {});
118
0
    }
119
0
    auto& info_map = backend_profile_info_string[backend_id];
120
121
0
    auto update_profile_info_key = [&](const std::string& info_key) {
122
0
        const auto* info_value = response_profile->get_info_string(info_key);
123
0
        if (info_value == nullptr) [[unlikely]] {
124
0
            LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key;
125
0
            return;
126
0
        }
127
0
        if (!info_map.contains(info_key)) {
128
0
            info_map.emplace(info_key, fmt::memory_buffer {});
129
0
        }
130
0
        fmt::format_to(info_map[info_key], "{}, ", *info_value);
131
0
    };
132
133
0
    update_profile_info_key(RowIdStorageReader::ScannersRunningTimeProfile);
134
0
    update_profile_info_key(RowIdStorageReader::InitReaderAvgTimeProfile);
135
0
    update_profile_info_key(RowIdStorageReader::GetBlockAvgTimeProfile);
136
0
    update_profile_info_key(RowIdStorageReader::FileReadLinesProfile);
137
0
    update_profile_info_key(FileScanner::FileReadBytesProfile);
138
0
    update_profile_info_key(FileScanner::FileReadTimeProfile);
139
0
}
140
141
Status MaterializationSharedState::create_muiltget_result(const Columns& columns, bool child_eos,
142
1
                                                          bool gc_id_map) {
143
1
    const auto rows = columns.empty() ? 0 : columns[0]->size();
144
1
    block_order_results.resize(columns.size());
145
146
2
    for (int i = 0; i < columns.size(); ++i) {
147
1
        const uint8_t* null_map = nullptr;
148
1
        const ColumnString* column_rowid = nullptr;
149
1
        auto& column = columns[i];
150
151
1
        if (auto column_ptr = check_and_get_column<ColumnNullable>(*column)) {
152
0
            null_map = column_ptr->get_null_map_data().data();
153
0
            column_rowid =
154
0
                    assert_cast<const ColumnString*>(column_ptr->get_nested_column_ptr().get());
155
1
        } else {
156
1
            column_rowid = assert_cast<const ColumnString*>(column.get());
157
1
        }
158
159
1
        auto& block_order = block_order_results[i];
160
1
        block_order.resize(rows);
161
162
3
        for (int j = 0; j < rows; ++j) {
163
2
            if (!null_map || !null_map[j]) {
164
2
                DCHECK(column_rowid->get_data_at(j).size == sizeof(GlobalRowLoacationV2));
165
2
                GlobalRowLoacationV2 row_location =
166
2
                        *((GlobalRowLoacationV2*)column_rowid->get_data_at(j).data);
167
2
                auto rpc_struct = rpc_struct_map.find(row_location.backend_id);
168
2
                if (UNLIKELY(rpc_struct == rpc_struct_map.end())) {
169
0
                    return Status::InternalError(
170
0
                            "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}",
171
0
                            row_location.backend_id);
172
0
                }
173
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_row_id(
174
2
                        row_location.row_id);
175
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
176
2
                        row_location.file_id);
177
2
                block_order[j] = row_location.backend_id;
178
179
                // Count rows per backend
180
2
                _backend_rows_count[row_location.backend_id]++;
181
2
            } else {
182
0
                block_order[j] = 0;
183
0
            }
184
2
        }
185
1
    }
186
187
    // Update max rows per backend
188
2
    for (const auto& [_, row_count] : _backend_rows_count) {
189
2
        if (row_count > _max_rows_per_backend) {
190
1
            _max_rows_per_backend = row_count;
191
1
        }
192
2
    }
193
194
1
    eos = child_eos;
195
1
    if (eos && gc_id_map) {
196
2
        for (auto& [_, rpc_struct] : rpc_struct_map) {
197
2
            rpc_struct.request.set_gc_id_map(true);
198
2
        }
199
1
    }
200
1
    need_merge_block = rows > 0;
201
202
1
    return Status::OK();
203
1
}
204
205
Status MaterializationSharedState::init_multi_requests(
206
0
        const TMaterializationNode& materialization_node, RuntimeState* state) {
207
0
    rpc_struct_inited = true;
208
0
    PMultiGetRequestV2 multi_get_request;
209
    // Initialize the base struct of PMultiGetRequestV2
210
0
    multi_get_request.set_be_exec_version(state->be_exec_version());
211
0
    multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
212
0
    auto query_id = multi_get_request.mutable_query_id();
213
0
    query_id->set_hi(state->query_id().hi);
214
0
    query_id->set_lo(state->query_id().lo);
215
0
    DCHECK_EQ(materialization_node.column_descs_lists.size(),
216
0
              materialization_node.slot_locs_lists.size());
217
218
0
    const auto& tuple_desc =
219
0
            state->desc_tbl().get_tuple_descriptor(materialization_node.intermediate_tuple_id);
220
0
    const auto& slots = tuple_desc->slots();
221
0
    response_blocks = std::vector<MutableBlock>(materialization_node.column_descs_lists.size());
222
223
0
    for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
224
0
        auto request_block_desc = multi_get_request.add_request_block_descs();
225
0
        request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
226
        // Initialize the column_descs and slot_locs
227
0
        auto& column_descs = materialization_node.column_descs_lists[i];
228
0
        for (auto& column_desc_item : column_descs) {
229
0
            TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
230
0
        }
231
232
0
        auto& slot_locs = materialization_node.slot_locs_lists[i];
233
0
        tuple_desc->to_protobuf(request_block_desc->mutable_desc());
234
235
0
        auto& column_idxs = materialization_node.column_idxs_lists[i];
236
0
        for (auto idx : column_idxs) {
237
0
            request_block_desc->add_column_idxs(idx);
238
0
        }
239
240
0
        std::vector<SlotDescriptor*> slots_res;
241
0
        for (auto& slot_loc_item : slot_locs) {
242
0
            slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
243
0
            slots_res.emplace_back(slots[slot_loc_item]);
244
0
        }
245
0
        response_blocks[i] = MutableBlock(Block(slots_res, 10));
246
0
    }
247
248
    // Initialize the stubs and requests for each BE
249
0
    for (const auto& node_info : materialization_node.nodes_info.nodes) {
250
0
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
251
0
                node_info.host, node_info.async_internal_port);
252
0
        if (!client) {
253
0
            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
254
0
                         << ", port=" << node_info.async_internal_port;
255
0
            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
256
0
                                         node_info.host, node_info.async_internal_port);
257
0
        }
258
0
        rpc_struct_map.emplace(node_info.id,
259
0
                               FetchRpcStruct {.stub = std::move(client),
260
0
                                               .cntl = std::make_unique<brpc::Controller>(),
261
0
                                               .request = multi_get_request,
262
0
                                               .response = PMultiGetResponseV2()});
263
0
    }
264
265
0
    return Status::OK();
266
0
}
267
268
0
Status MaterializationOperator::init(const doris::TPlanNode& tnode, doris::RuntimeState* state) {
269
0
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
270
0
    DCHECK(tnode.__isset.materialization_node);
271
0
    _materialization_node = tnode.materialization_node;
272
0
    _gc_id_map = tnode.materialization_node.gc_id_map;
273
    // Create result_expr_ctx_lists_ from thrift exprs.
274
0
    auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists;
275
0
    RETURN_IF_ERROR(VExpr::create_expr_trees(fetch_expr_lists, _rowid_exprs));
276
0
    return Status::OK();
277
0
}
278
279
0
Status MaterializationOperator::prepare(RuntimeState* state) {
280
0
    RETURN_IF_ERROR(Base::prepare(state));
281
0
    RETURN_IF_ERROR(VExpr::prepare(_rowid_exprs, state, _child->row_desc()));
282
0
    RETURN_IF_ERROR(VExpr::open(_rowid_exprs, state));
283
0
    return Status::OK();
284
0
}
285
286
0
bool MaterializationOperator::need_more_input_data(RuntimeState* state) const {
287
0
    auto& local_state = get_local_state(state);
288
0
    return !local_state._materialization_state.origin_block.rows() &&
289
0
           !local_state._materialization_state.eos;
290
0
}
291
292
0
Status MaterializationOperator::pull(RuntimeState* state, Block* output_block, bool* eos) const {
293
0
    auto& local_state = get_local_state(state);
294
0
    output_block->clear();
295
0
    if (local_state._materialization_state.need_merge_block) {
296
0
        local_state._materialization_state.get_block(output_block);
297
0
    }
298
0
    *eos = local_state._materialization_state.eos;
299
300
0
    if (*eos) {
301
0
        for (const auto& [backend_id, child_info] :
302
0
             local_state._materialization_state.backend_profile_info_string) {
303
0
            auto child_profile = local_state.operator_profile()->create_child(
304
0
                    "RowIDFetcher: BackendId:" + std::to_string(backend_id));
305
0
            for (const auto& [info_key, info_value] :
306
0
                 local_state._materialization_state.backend_profile_info_string[backend_id]) {
307
0
                child_profile->add_info_string(info_key, "{" + fmt::to_string(info_value) + "}");
308
0
            }
309
0
            local_state.operator_profile()->add_child(child_profile, true);
310
0
        }
311
0
    }
312
313
0
    return Status::OK();
314
0
}
315
316
0
Status MaterializationOperator::push(RuntimeState* state, Block* in_block, bool eos) const {
317
0
    auto& local_state = get_local_state(state);
318
0
    SCOPED_TIMER(local_state.exec_time_counter());
319
0
    if (!local_state._materialization_state.rpc_struct_inited) {
320
0
        RETURN_IF_ERROR(local_state._materialization_state.init_multi_requests(
321
0
                _materialization_node, state));
322
0
    }
323
324
0
    if (in_block->rows() > 0 || eos) {
325
        // execute the rowid exprs
326
0
        Columns columns;
327
0
        if (in_block->rows() != 0) {
328
0
            local_state._materialization_state.rowid_locs.resize(_rowid_exprs.size());
329
0
            for (int i = 0; i < _rowid_exprs.size(); ++i) {
330
0
                auto& rowid_expr = _rowid_exprs[i];
331
0
                RETURN_IF_ERROR(rowid_expr->execute(
332
0
                        in_block, &local_state._materialization_state.rowid_locs[i]));
333
0
                columns.emplace_back(
334
0
                        in_block->get_by_position(local_state._materialization_state.rowid_locs[i])
335
0
                                .column);
336
0
            }
337
0
            local_state._materialization_state.origin_block.swap(*in_block);
338
0
        }
339
0
        RETURN_IF_ERROR(local_state._materialization_state.create_muiltget_result(columns, eos,
340
0
                                                                                  _gc_id_map));
341
342
0
        auto size = local_state._materialization_state.rpc_struct_map.size();
343
0
        bthread::CountdownEvent counter(static_cast<int>(size));
344
0
        MonotonicStopWatch rpc_timer(true);
345
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
346
0
            auto callback = brpc::NewCallback(fetch_callback, &counter);
347
0
            rpc_struct.cntl->set_timeout_ms(state->execution_timeout() * 1000);
348
            // send brpc request
349
0
            rpc_struct.stub->multiget_data_v2(rpc_struct.cntl.get(), &rpc_struct.request,
350
0
                                              &rpc_struct.response, callback);
351
0
        }
352
0
        counter.wait();
353
0
        if (auto time = rpc_timer.elapsed_time(); time > local_state._max_rpc_timer->value()) {
354
0
            local_state._max_rpc_timer->set(static_cast<int64_t>(time));
355
0
        }
356
357
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
358
0
            if (rpc_struct.cntl->Failed()) {
359
0
                std::string error_text =
360
0
                        "Failed to send brpc request, error_text=" + rpc_struct.cntl->ErrorText() +
361
0
                        " Materialization Sink node id:" + std::to_string(node_id()) +
362
0
                        " target_backend_id:" + std::to_string(backend_id);
363
0
                return Status::InternalError(error_text);
364
0
            }
365
0
            rpc_struct.cntl->Reset();
366
0
        }
367
368
0
        if (local_state._materialization_state.need_merge_block) {
369
0
            SCOPED_TIMER(local_state._merge_response_timer);
370
0
            RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
371
0
            local_state._max_rows_per_backend_counter->set(
372
0
                    (int64_t)local_state._materialization_state._max_rows_per_backend);
373
0
        }
374
0
    }
375
376
0
    return Status::OK();
377
0
}
378
379
} // namespace doris