Coverage Report

Created: 2026-03-17 19:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/materialization_opertor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/materialization_opertor.h"
19
20
#include <bthread/countdown_event.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
24
#include <utility>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column.h"
29
#include "exec/operator/operator.h"
30
#include "exec/rowid_fetcher.h"
31
#include "exec/scan/file_scanner.h"
32
#include "util/brpc_client_cache.h"
33
#include "util/brpc_closure.h"
34
35
namespace doris {
36
37
2
void MaterializationSharedState::get_block(Block* block) {
38
8
    for (int i = 0, j = 0, rowid_to_block_loc = rowid_locs[j]; i < origin_block.columns(); i++) {
39
6
        if (i != rowid_to_block_loc) {
40
3
            block->insert(origin_block.get_by_position(i));
41
3
        } else {
42
3
            auto response_block = response_blocks[j].to_block();
43
6
            for (int k = 0; k < response_block.columns(); k++) {
44
3
                auto& data = response_block.get_by_position(k);
45
3
                response_blocks[j].mutable_columns()[k] = data.column->clone_empty();
46
3
                block->insert(data);
47
3
            }
48
3
            if (++j < rowid_locs.size()) {
49
1
                rowid_to_block_loc = rowid_locs[j];
50
1
            }
51
3
        }
52
6
    }
53
2
    origin_block.clear();
54
2
}
55
56
2
Status MaterializationSharedState::merge_multi_response() {
57
5
    for (int i = 0; i < block_order_results.size(); ++i) {
58
        // block_maps must be rebuilt for each relation (each i), because a backend that
59
        // returned a non-empty block for relation i-1 may return an empty block for
60
        // relation i (e.g. it holds rows only from one of the two tables in a UNION ALL).
61
        // Keeping block_maps across iterations would leave stale entries from the previous
62
        // relation and miss entries for the current one, causing the
63
        // "backend_id not found in block_maps" error.
64
3
        std::unordered_map<int64_t, std::pair<Block, int>> block_maps;
65
6
        for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
66
6
            Block partial_block;
67
6
            size_t uncompressed_size = 0;
68
6
            int64_t uncompressed_time = 0;
69
6
            DCHECK(rpc_struct.response.blocks_size() > i);
70
6
            RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
71
6
                                                      &uncompressed_size, &uncompressed_time));
72
6
            if (rpc_struct.response.blocks(i).has_profile()) {
73
0
                auto response_profile =
74
0
                        RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
75
0
                _update_profile_info(backend_id, response_profile.get());
76
0
            }
77
78
6
            if (!partial_block.is_empty_column()) {
79
6
                block_maps[backend_id] = std::make_pair(std::move(partial_block), 0);
80
6
            }
81
6
        }
82
83
12
        for (int j = 0; j < block_order_results[i].size(); ++j) {
84
9
            auto backend_id = block_order_results[i][j];
85
9
            if (backend_id) {
86
6
                if (UNLIKELY(block_maps.find(backend_id) == block_maps.end())) {
87
0
                    return Status::InternalError(
88
0
                            fmt::format("MaterializationSharedState::merge_multi_response, "
89
0
                                        "backend_id {} not found in block_maps",
90
0
                                        backend_id));
91
0
                }
92
6
                auto& source_block_rows = block_maps[backend_id];
93
6
                DCHECK(source_block_rows.second < source_block_rows.first.rows());
94
12
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
95
6
                    response_blocks[i].get_column_by_position(k)->insert_from(
96
6
                            *source_block_rows.first.get_by_position(k).column,
97
6
                            source_block_rows.second);
98
6
                }
99
6
                source_block_rows.second++;
100
6
            } else {
101
6
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
102
3
                    response_blocks[i].get_column_by_position(k)->insert_default();
103
3
                }
104
3
            }
105
9
        }
106
3
    }
107
108
    // clear request/response
109
4
    for (auto& [_, rpc_struct] : rpc_struct_map) {
110
8
        for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) {
111
4
            rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
112
4
            rpc_struct.request.mutable_request_block_descs(i)->clear_file_id();
113
4
        }
114
4
    }
115
2
    return Status::OK();
116
2
}
117
118
void MaterializationSharedState::_update_profile_info(int64_t backend_id,
119
0
                                                      RuntimeProfile* response_profile) {
120
0
    if (!backend_profile_info_string.contains(backend_id)) {
121
0
        backend_profile_info_string.emplace(backend_id,
122
0
                                            std::map<std::string, fmt::memory_buffer> {});
123
0
    }
124
0
    auto& info_map = backend_profile_info_string[backend_id];
125
126
0
    auto update_profile_info_key = [&](const std::string& info_key) {
127
0
        const auto* info_value = response_profile->get_info_string(info_key);
128
0
        if (info_value == nullptr) [[unlikely]] {
129
0
            LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key;
130
0
            return;
131
0
        }
132
0
        if (!info_map.contains(info_key)) {
133
0
            info_map.emplace(info_key, fmt::memory_buffer {});
134
0
        }
135
0
        fmt::format_to(info_map[info_key], "{}, ", *info_value);
136
0
    };
137
138
0
    update_profile_info_key(RowIdStorageReader::ScannersRunningTimeProfile);
139
0
    update_profile_info_key(RowIdStorageReader::InitReaderAvgTimeProfile);
140
0
    update_profile_info_key(RowIdStorageReader::GetBlockAvgTimeProfile);
141
0
    update_profile_info_key(RowIdStorageReader::FileReadLinesProfile);
142
0
    update_profile_info_key(FileScanner::FileReadBytesProfile);
143
0
    update_profile_info_key(FileScanner::FileReadTimeProfile);
144
0
}
145
146
Status MaterializationSharedState::create_muiltget_result(const Columns& columns, bool child_eos,
147
1
                                                          bool gc_id_map) {
148
1
    const auto rows = columns.empty() ? 0 : columns[0]->size();
149
1
    block_order_results.resize(columns.size());
150
151
2
    for (int i = 0; i < columns.size(); ++i) {
152
1
        const uint8_t* null_map = nullptr;
153
1
        const ColumnString* column_rowid = nullptr;
154
1
        auto& column = columns[i];
155
156
1
        if (auto column_ptr = check_and_get_column<ColumnNullable>(*column)) {
157
0
            null_map = column_ptr->get_null_map_data().data();
158
0
            column_rowid =
159
0
                    assert_cast<const ColumnString*>(column_ptr->get_nested_column_ptr().get());
160
1
        } else {
161
1
            column_rowid = assert_cast<const ColumnString*>(column.get());
162
1
        }
163
164
1
        auto& block_order = block_order_results[i];
165
1
        block_order.resize(rows);
166
167
3
        for (int j = 0; j < rows; ++j) {
168
2
            if (!null_map || !null_map[j]) {
169
2
                DCHECK(column_rowid->get_data_at(j).size == sizeof(GlobalRowLoacationV2));
170
2
                GlobalRowLoacationV2 row_location =
171
2
                        *((GlobalRowLoacationV2*)column_rowid->get_data_at(j).data);
172
2
                auto rpc_struct = rpc_struct_map.find(row_location.backend_id);
173
2
                if (UNLIKELY(rpc_struct == rpc_struct_map.end())) {
174
0
                    return Status::InternalError(
175
0
                            "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}",
176
0
                            row_location.backend_id);
177
0
                }
178
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_row_id(
179
2
                        row_location.row_id);
180
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
181
2
                        row_location.file_id);
182
2
                block_order[j] = row_location.backend_id;
183
184
                // Count rows per backend
185
2
                _backend_rows_count[row_location.backend_id]++;
186
2
            } else {
187
0
                block_order[j] = 0;
188
0
            }
189
2
        }
190
1
    }
191
192
    // Update max rows per backend
193
2
    for (const auto& [_, row_count] : _backend_rows_count) {
194
2
        if (row_count > _max_rows_per_backend) {
195
1
            _max_rows_per_backend = row_count;
196
1
        }
197
2
    }
198
199
1
    eos = child_eos;
200
1
    if (eos && gc_id_map) {
201
2
        for (auto& [_, rpc_struct] : rpc_struct_map) {
202
2
            rpc_struct.request.set_gc_id_map(true);
203
2
        }
204
1
    }
205
1
    need_merge_block = rows > 0;
206
207
1
    return Status::OK();
208
1
}
209
210
Status MaterializationSharedState::init_multi_requests(
211
0
        const TMaterializationNode& materialization_node, RuntimeState* state) {
212
0
    rpc_struct_inited = true;
213
0
    PMultiGetRequestV2 multi_get_request;
214
    // Initialize the base struct of PMultiGetRequestV2
215
0
    multi_get_request.set_be_exec_version(state->be_exec_version());
216
0
    multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
217
0
    auto query_id = multi_get_request.mutable_query_id();
218
0
    query_id->set_hi(state->query_id().hi);
219
0
    query_id->set_lo(state->query_id().lo);
220
0
    DCHECK_EQ(materialization_node.column_descs_lists.size(),
221
0
              materialization_node.slot_locs_lists.size());
222
223
0
    const auto& tuple_desc =
224
0
            state->desc_tbl().get_tuple_descriptor(materialization_node.intermediate_tuple_id);
225
0
    const auto& slots = tuple_desc->slots();
226
0
    response_blocks = std::vector<MutableBlock>(materialization_node.column_descs_lists.size());
227
228
0
    for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
229
0
        auto request_block_desc = multi_get_request.add_request_block_descs();
230
0
        request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
231
        // Initialize the column_descs and slot_locs
232
0
        auto& column_descs = materialization_node.column_descs_lists[i];
233
0
        for (auto& column_desc_item : column_descs) {
234
0
            TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
235
0
        }
236
237
0
        auto& slot_locs = materialization_node.slot_locs_lists[i];
238
0
        tuple_desc->to_protobuf(request_block_desc->mutable_desc());
239
240
0
        auto& column_idxs = materialization_node.column_idxs_lists[i];
241
0
        for (auto idx : column_idxs) {
242
0
            request_block_desc->add_column_idxs(idx);
243
0
        }
244
245
0
        std::vector<SlotDescriptor*> slots_res;
246
0
        for (auto& slot_loc_item : slot_locs) {
247
0
            slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
248
0
            slots_res.emplace_back(slots[slot_loc_item]);
249
0
        }
250
0
        response_blocks[i] = MutableBlock(Block(slots_res, 10));
251
0
    }
252
253
    // Initialize the stubs and requests for each BE
254
0
    for (const auto& node_info : materialization_node.nodes_info.nodes) {
255
0
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
256
0
                node_info.host, node_info.async_internal_port);
257
0
        if (!client) {
258
0
            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
259
0
                         << ", port=" << node_info.async_internal_port;
260
0
            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
261
0
                                         node_info.host, node_info.async_internal_port);
262
0
        }
263
0
        rpc_struct_map.emplace(node_info.id,
264
0
                               FetchRpcStruct {.stub = std::move(client),
265
0
                                               .cntl = std::make_unique<brpc::Controller>(),
266
0
                                               .request = multi_get_request,
267
0
                                               .response = PMultiGetResponseV2()});
268
0
    }
269
270
0
    return Status::OK();
271
0
}
272
273
0
Status MaterializationOperator::init(const doris::TPlanNode& tnode, doris::RuntimeState* state) {
274
0
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
275
0
    DCHECK(tnode.__isset.materialization_node);
276
0
    _materialization_node = tnode.materialization_node;
277
0
    _gc_id_map = tnode.materialization_node.gc_id_map;
278
    // Create result_expr_ctx_lists_ from thrift exprs.
279
0
    auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists;
280
0
    RETURN_IF_ERROR(VExpr::create_expr_trees(fetch_expr_lists, _rowid_exprs));
281
0
    return Status::OK();
282
0
}
283
284
0
Status MaterializationOperator::prepare(RuntimeState* state) {
285
0
    RETURN_IF_ERROR(Base::prepare(state));
286
0
    RETURN_IF_ERROR(VExpr::prepare(_rowid_exprs, state, _child->row_desc()));
287
0
    RETURN_IF_ERROR(VExpr::open(_rowid_exprs, state));
288
0
    return Status::OK();
289
0
}
290
291
0
bool MaterializationOperator::need_more_input_data(RuntimeState* state) const {
292
0
    auto& local_state = get_local_state(state);
293
0
    return !local_state._materialization_state.origin_block.rows() &&
294
0
           !local_state._materialization_state.eos;
295
0
}
296
297
0
Status MaterializationOperator::pull(RuntimeState* state, Block* output_block, bool* eos) const {
298
0
    auto& local_state = get_local_state(state);
299
0
    output_block->clear();
300
0
    if (local_state._materialization_state.need_merge_block) {
301
0
        local_state._materialization_state.get_block(output_block);
302
0
    }
303
0
    *eos = local_state._materialization_state.eos;
304
305
0
    if (*eos) {
306
0
        for (const auto& [backend_id, child_info] :
307
0
             local_state._materialization_state.backend_profile_info_string) {
308
0
            auto child_profile = local_state.operator_profile()->create_child(
309
0
                    "RowIDFetcher: BackendId:" + std::to_string(backend_id));
310
0
            for (const auto& [info_key, info_value] :
311
0
                 local_state._materialization_state.backend_profile_info_string[backend_id]) {
312
0
                child_profile->add_info_string(info_key, "{" + fmt::to_string(info_value) + "}");
313
0
            }
314
0
            local_state.operator_profile()->add_child(child_profile, true);
315
0
        }
316
0
    }
317
318
0
    return Status::OK();
319
0
}
320
321
0
Status MaterializationOperator::push(RuntimeState* state, Block* in_block, bool eos) const {
322
0
    auto& local_state = get_local_state(state);
323
0
    SCOPED_TIMER(local_state.exec_time_counter());
324
0
    if (!local_state._materialization_state.rpc_struct_inited) {
325
0
        RETURN_IF_ERROR(local_state._materialization_state.init_multi_requests(
326
0
                _materialization_node, state));
327
0
    }
328
329
0
    if (in_block->rows() > 0 || eos) {
330
        // execute the rowid exprs
331
0
        Columns columns;
332
0
        if (in_block->rows() != 0) {
333
0
            local_state._materialization_state.rowid_locs.resize(_rowid_exprs.size());
334
0
            for (int i = 0; i < _rowid_exprs.size(); ++i) {
335
0
                auto& rowid_expr = _rowid_exprs[i];
336
0
                RETURN_IF_ERROR(rowid_expr->execute(
337
0
                        in_block, &local_state._materialization_state.rowid_locs[i]));
338
0
                columns.emplace_back(
339
0
                        in_block->get_by_position(local_state._materialization_state.rowid_locs[i])
340
0
                                .column);
341
0
            }
342
0
            local_state._materialization_state.origin_block.swap(*in_block);
343
0
        }
344
0
        RETURN_IF_ERROR(local_state._materialization_state.create_muiltget_result(columns, eos,
345
0
                                                                                  _gc_id_map));
346
347
0
        auto size = local_state._materialization_state.rpc_struct_map.size();
348
0
        bthread::CountdownEvent counter(static_cast<int>(size));
349
0
        MonotonicStopWatch rpc_timer(true);
350
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
351
0
            auto callback = brpc::NewCallback(fetch_callback, &counter);
352
0
            rpc_struct.cntl->set_timeout_ms(state->execution_timeout() * 1000);
353
            // send brpc request
354
0
            rpc_struct.stub->multiget_data_v2(rpc_struct.cntl.get(), &rpc_struct.request,
355
0
                                              &rpc_struct.response, callback);
356
0
        }
357
0
        counter.wait();
358
0
        if (auto time = rpc_timer.elapsed_time(); time > local_state._max_rpc_timer->value()) {
359
0
            local_state._max_rpc_timer->set(static_cast<int64_t>(time));
360
0
        }
361
362
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
363
0
            if (rpc_struct.cntl->Failed()) {
364
0
                std::string error_text =
365
0
                        "Failed to send brpc request, error_text=" + rpc_struct.cntl->ErrorText() +
366
0
                        " Materialization Sink node id:" + std::to_string(node_id()) +
367
0
                        " target_backend_id:" + std::to_string(backend_id);
368
0
                return Status::InternalError(error_text);
369
0
            }
370
0
            rpc_struct.cntl->Reset();
371
0
        }
372
373
0
        if (local_state._materialization_state.need_merge_block) {
374
0
            SCOPED_TIMER(local_state._merge_response_timer);
375
0
            RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
376
0
            local_state._max_rows_per_backend_counter->set(
377
0
                    (int64_t)local_state._materialization_state._max_rows_per_backend);
378
0
        }
379
0
    }
380
381
0
    return Status::OK();
382
0
}
383
384
} // namespace doris