Coverage Report

Created: 2026-03-27 12:09

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/materialization_opertor.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/materialization_opertor.h"
19
20
#include <bthread/countdown_event.h>
21
#include <fmt/format.h>
22
#include <gen_cpp/internal_service.pb.h>
23
24
#include <utility>
25
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/column/column.h"
29
#include "exec/operator/operator.h"
30
#include "exec/rowid_fetcher.h"
31
#include "exec/scan/file_scanner.h"
32
#include "util/brpc_client_cache.h"
33
#include "util/brpc_closure.h"
34
35
namespace doris {
36
37
3
void MaterializationSharedState::get_block(Block* block) {
38
12
    for (int i = 0, j = 0, rowid_to_block_loc = rowid_locs[j]; i < origin_block.columns(); i++) {
39
9
        if (i != rowid_to_block_loc) {
40
4
            block->insert(origin_block.get_by_position(i));
41
5
        } else {
42
5
            auto response_block = response_blocks[j].to_block();
43
10
            for (int k = 0; k < response_block.columns(); k++) {
44
5
                auto& data = response_block.get_by_position(k);
45
5
                response_blocks[j].mutable_columns()[k] = data.column->clone_empty();
46
5
                block->insert(data);
47
5
            }
48
5
            if (++j < rowid_locs.size()) {
49
2
                rowid_to_block_loc = rowid_locs[j];
50
2
            }
51
5
        }
52
9
    }
53
3
    origin_block.clear();
54
3
}
55
56
// Merges RPC responses from multiple BEs into `response_blocks` in the original row order.
57
//
58
// After parallel multiget_data_v2 RPCs complete, each BE's response contains a partial block
59
// with only the rows that BE owns (ordered by file_id/row_id). This function reassembles them
60
// into the correct TopN output order using `block_order_results` as the ordering guide.
61
//
62
// Data flow:
63
//   rpc_struct_map[backend_id].response  (per-BE partial blocks, unordered across BEs)
64
//       + block_order_results[i][j]      (maps each output row → its source backend_id)
65
//       → response_blocks[i]             (final merged result in original TopN row order)
66
4
Status MaterializationSharedState::merge_multi_response() {
67
    // Outer loop: iterate over each relation (i.e., each rowid column / table).
68
    // A query with lazy materialization on 2 tables would have block_order_results.size() == 2,
69
    // each with its own set of response_blocks and RPC request_block_descs.
70
9
    for (int i = 0; i < block_order_results.size(); ++i) {
71
        // Maps backend_id → (deserialized block from that BE, row cursor into the block).
72
        // The cursor tracks how many rows we've consumed from this BE's block so far,
73
        // since the rows in the partial block are in the same order as the row_ids we sent.
74
75
        // block_maps must be rebuilt for each relation (each i), because a backend that
76
        // returned a non-empty block for relation i-1 may return an empty block for
77
        // relation i (e.g. it holds rows only from one of the two tables in a UNION ALL).
78
        // Keeping block_maps across iterations would leave stale entries from the previous
79
        // relation and miss entries for the current one, causing the
80
        // "backend_id not found in block_maps" error.
81
6
        std::unordered_map<int64_t, std::pair<Block, int>> block_maps;
82
83
        // Phase 1: Deserialize the i-th response block from every BE into block_maps.
84
        // Each BE's response.blocks(i) corresponds to the i-th relation's fetched columns.
85
12
        for (auto& [backend_id, rpc_struct] : rpc_struct_map) {
86
12
            Block partial_block;
87
12
            size_t uncompressed_size = 0;
88
12
            int64_t uncompressed_time = 0;
89
12
            DCHECK(rpc_struct.response.blocks_size() > i);
90
12
            RETURN_IF_ERROR(partial_block.deserialize(rpc_struct.response.blocks(i).block(),
91
12
                                                      &uncompressed_size, &uncompressed_time));
92
            // Check that the multiget result row count matches the requested row id count.
93
            // 1. A BE may return an empty block even if
94
            // request.request_block_descs(i).row_id_size() != 0,
95
            // e.g. when the id_file_map was GC'd on the BE before it could process the request
96
            // (see the 'if (!id_file_map)' branch in RowIdStorageReader::read_by_rowids).
97
            // 2. Report error in any case where the row count doesn't match, even if it's not empty,
98
            //    since that indicates a bug in BE's row fetching logic or serialization logic.
99
12
            if (rpc_struct.request.request_block_descs(i).row_id_size() != partial_block.rows()) {
100
1
                return Status::InternalError(
101
1
                        fmt::format("merge_multi_response, "
102
1
                                    "backend_id {} returned block with row count {} not match "
103
1
                                    "request row id count {}",
104
1
                                    backend_id, partial_block.rows(),
105
1
                                    rpc_struct.request.request_block_descs(i).row_id_size()));
106
1
            }
107
11
            if (rpc_struct.response.blocks(i).has_profile()) {
108
0
                auto response_profile =
109
0
                        RuntimeProfile::from_proto(rpc_struct.response.blocks(i).profile());
110
0
                _update_profile_info(backend_id, response_profile.get());
111
0
            }
112
113
            // Only insert non-empty blocks. A BE may return an empty block if
114
            // request.request_block_descs(i).row_id_size() is 0
115
11
            if (!partial_block.is_empty_column()) {
116
                // Reset row cursor to 0 — we'll consume rows from this block sequentially.
117
9
                block_maps[backend_id] = std::make_pair(std::move(partial_block), 0);
118
9
            }
119
11
        }
120
121
        // return error if any column in response block is not compatible with source block column
122
10
        for (int k = 0; k < response_blocks[i].columns(); ++k) {
123
5
            const auto& resp_col_type = response_blocks[i].get_datatype_by_position(k);
124
8
            for (const auto& [_, source_block_rows] : block_maps) {
125
8
                RETURN_IF_ERROR(resp_col_type->check_column(
126
8
                        *source_block_rows.first.get_by_position(k).column));
127
8
            }
128
5
        }
129
        // Phase 2: Walk the original row order and copy each row from the correct BE's block
130
        // into response_blocks[i]. block_order_results[i][j] tells us which backend_id owns
131
        // row j. A value of 0 means the rowid was NULL (e.g., from an outer join).
132
16
        for (int j = 0; j < block_order_results[i].size(); ++j) {
133
11
            auto backend_id = block_order_results[i][j];
134
            // Non-null rowid: copy the next row from this BE's partial block.
135
11
            if (backend_id) {
136
8
                if (UNLIKELY(block_maps.find(backend_id) == block_maps.end())) {
137
0
                    return Status::InternalError(
138
0
                            fmt::format("MaterializationSharedState::merge_multi_response, "
139
0
                                        "backend_id {} not found in block_maps",
140
0
                                        backend_id));
141
0
                }
142
                // source_block_rows.first  = the deserialized Block from this BE
143
                // source_block_rows.second = current row cursor (how many rows consumed so far)
144
8
                auto& source_block_rows = block_maps[backend_id];
145
8
                DCHECK(source_block_rows.second < source_block_rows.first.rows());
146
                // Copy column-by-column from the source block's current row into response_blocks.
147
16
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
148
8
                    response_blocks[i].get_column_by_position(k)->insert_from(
149
8
                            *source_block_rows.first.get_by_position(k).column,
150
8
                            source_block_rows.second);
151
8
                }
152
                // Advance the cursor — next time we see this backend_id, we take the next row.
153
8
                source_block_rows.second++;
154
8
            } else {
155
6
                for (int k = 0; k < response_blocks[i].columns(); ++k) {
156
3
                    response_blocks[i].get_column_by_position(k)->insert_default();
157
3
                }
158
3
            }
159
11
        }
160
5
    }
161
162
    // Clear per-batch request data (row_id/file_id); responses are not touched here.
163
    // Phase 3: Clear the row_id and file_id arrays in each RPC request to prepare for the
164
    // next batch. The request template (column_descs, slots, etc.) is reused across batches;
165
    // only the per-row data (file_id, row_id) needs to be cleared.
166
6
    for (auto& [_, rpc_struct] : rpc_struct_map) {
167
16
        for (int i = 0; i < rpc_struct.request.request_block_descs_size(); ++i) {
168
10
            rpc_struct.request.mutable_request_block_descs(i)->clear_row_id();
169
10
            rpc_struct.request.mutable_request_block_descs(i)->clear_file_id();
170
10
        }
171
6
    }
172
3
    return Status::OK();
173
4
}
174
175
void MaterializationSharedState::_update_profile_info(int64_t backend_id,
176
0
                                                      RuntimeProfile* response_profile) {
177
0
    if (!backend_profile_info_string.contains(backend_id)) {
178
0
        backend_profile_info_string.emplace(backend_id,
179
0
                                            std::map<std::string, fmt::memory_buffer> {});
180
0
    }
181
0
    auto& info_map = backend_profile_info_string[backend_id];
182
183
0
    auto update_profile_info_key = [&](const std::string& info_key) {
184
0
        const auto* info_value = response_profile->get_info_string(info_key);
185
0
        if (info_value == nullptr) [[unlikely]] {
186
0
            LOG(WARNING) << "Get row id fetch rpc profile success, but no info key :" << info_key;
187
0
            return;
188
0
        }
189
0
        if (!info_map.contains(info_key)) {
190
0
            info_map.emplace(info_key, fmt::memory_buffer {});
191
0
        }
192
0
        fmt::format_to(info_map[info_key], "{}, ", *info_value);
193
0
    };
194
195
0
    update_profile_info_key(RowIdStorageReader::ScannersRunningTimeProfile);
196
0
    update_profile_info_key(RowIdStorageReader::InitReaderAvgTimeProfile);
197
0
    update_profile_info_key(RowIdStorageReader::GetBlockAvgTimeProfile);
198
0
    update_profile_info_key(RowIdStorageReader::FileReadLinesProfile);
199
0
    update_profile_info_key(FileScanner::FileReadBytesProfile);
200
0
    update_profile_info_key(FileScanner::FileReadTimeProfile);
201
0
}
202
203
Status MaterializationSharedState::create_muiltget_result(const Columns& columns, bool child_eos,
204
1
                                                          bool gc_id_map) {
205
1
    const auto rows = columns.empty() ? 0 : columns[0]->size();
206
1
    block_order_results.resize(columns.size());
207
208
2
    for (int i = 0; i < columns.size(); ++i) {
209
1
        const uint8_t* null_map = nullptr;
210
1
        const ColumnString* column_rowid = nullptr;
211
1
        const auto& column = columns[i];
212
213
1
        if (const auto* const column_ptr = check_and_get_column<ColumnNullable>(*column)) {
214
0
            null_map = column_ptr->get_null_map_data().data();
215
0
            column_rowid =
216
0
                    assert_cast<const ColumnString*>(column_ptr->get_nested_column_ptr().get());
217
1
        } else {
218
1
            column_rowid = assert_cast<const ColumnString*>(column.get());
219
1
        }
220
221
1
        auto& block_order = block_order_results[i];
222
1
        block_order.resize(rows);
223
224
3
        for (int j = 0; j < rows; ++j) {
225
2
            if (!null_map || !null_map[j]) {
226
2
                DCHECK(column_rowid->get_data_at(j).size == sizeof(GlobalRowLoacationV2));
227
2
                GlobalRowLoacationV2 row_location =
228
2
                        *((GlobalRowLoacationV2*)column_rowid->get_data_at(j).data);
229
2
                auto rpc_struct = rpc_struct_map.find(row_location.backend_id);
230
2
                if (UNLIKELY(rpc_struct == rpc_struct_map.end())) {
231
0
                    return Status::InternalError(
232
0
                            "MaterializationSinkOperatorX failed to find rpc_struct, backend_id={}",
233
0
                            row_location.backend_id);
234
0
                }
235
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_row_id(
236
2
                        row_location.row_id);
237
2
                rpc_struct->second.request.mutable_request_block_descs(i)->add_file_id(
238
2
                        row_location.file_id);
239
2
                block_order[j] = row_location.backend_id;
240
241
                // Count rows per backend
242
2
                _backend_rows_count[row_location.backend_id]++;
243
2
            } else {
244
0
                block_order[j] = 0;
245
0
            }
246
2
        }
247
1
    }
248
249
    // Update max rows per backend
250
2
    for (const auto& [_, row_count] : _backend_rows_count) {
251
2
        if (row_count > _max_rows_per_backend) {
252
1
            _max_rows_per_backend = row_count;
253
1
        }
254
2
    }
255
256
1
    eos = child_eos;
257
1
    if (eos && gc_id_map) {
258
2
        for (auto& [_, rpc_struct] : rpc_struct_map) {
259
2
            rpc_struct.request.set_gc_id_map(true);
260
2
        }
261
1
    }
262
1
    need_merge_block = rows > 0;
263
264
1
    return Status::OK();
265
1
}
266
267
Status MaterializationSharedState::init_multi_requests(
268
0
        const TMaterializationNode& materialization_node, RuntimeState* state) {
269
0
    rpc_struct_inited = true;
270
0
    PMultiGetRequestV2 multi_get_request;
271
    // Initialize the base struct of PMultiGetRequestV2
272
0
    multi_get_request.set_be_exec_version(state->be_exec_version());
273
0
    multi_get_request.set_wg_id(state->get_query_ctx()->workload_group()->id());
274
0
    auto* query_id = multi_get_request.mutable_query_id();
275
0
    query_id->set_hi(state->query_id().hi);
276
0
    query_id->set_lo(state->query_id().lo);
277
0
    DCHECK_EQ(materialization_node.column_descs_lists.size(),
278
0
              materialization_node.slot_locs_lists.size());
279
280
0
    const auto& tuple_desc =
281
0
            state->desc_tbl().get_tuple_descriptor(materialization_node.intermediate_tuple_id);
282
0
    const auto& slots = tuple_desc->slots();
283
0
    response_blocks = std::vector<MutableBlock>(materialization_node.column_descs_lists.size());
284
285
0
    for (int i = 0; i < materialization_node.column_descs_lists.size(); ++i) {
286
0
        auto* request_block_desc = multi_get_request.add_request_block_descs();
287
0
        request_block_desc->set_fetch_row_store(materialization_node.fetch_row_stores[i]);
288
        // Initialize the column_descs and slot_locs
289
0
        const auto& column_descs = materialization_node.column_descs_lists[i];
290
0
        for (const auto& column_desc_item : column_descs) {
291
0
            TabletColumn(column_desc_item).to_schema_pb(request_block_desc->add_column_descs());
292
0
        }
293
294
0
        const auto& slot_locs = materialization_node.slot_locs_lists[i];
295
0
        tuple_desc->to_protobuf(request_block_desc->mutable_desc());
296
297
0
        const auto& column_idxs = materialization_node.column_idxs_lists[i];
298
0
        for (auto idx : column_idxs) {
299
0
            request_block_desc->add_column_idxs(idx);
300
0
        }
301
302
0
        std::vector<SlotDescriptor*> slots_res;
303
0
        for (const auto& slot_loc_item : slot_locs) {
304
0
            slots[slot_loc_item]->to_protobuf(request_block_desc->add_slots());
305
0
            slots_res.emplace_back(slots[slot_loc_item]);
306
0
        }
307
0
        response_blocks[i] = MutableBlock(Block(slots_res, 10));
308
0
    }
309
310
    // Initialize the stubs and requests for each BE
311
0
    for (const auto& node_info : materialization_node.nodes_info.nodes) {
312
0
        auto client = ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(
313
0
                node_info.host, node_info.async_internal_port);
314
0
        if (!client) {
315
0
            LOG(WARNING) << "Get rpc stub failed, host=" << node_info.host
316
0
                         << ", port=" << node_info.async_internal_port;
317
0
            return Status::InternalError("RowIDFetcher failed to init rpc client, host={}, port={}",
318
0
                                         node_info.host, node_info.async_internal_port);
319
0
        }
320
0
        rpc_struct_map.emplace(node_info.id,
321
0
                               FetchRpcStruct {.stub = std::move(client),
322
0
                                               .cntl = std::make_unique<brpc::Controller>(),
323
0
                                               .request = multi_get_request,
324
0
                                               .response = PMultiGetResponseV2()});
325
0
    }
326
327
0
    return Status::OK();
328
0
}
329
330
0
Status MaterializationOperator::init(const doris::TPlanNode& tnode, doris::RuntimeState* state) {
331
0
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
332
0
    DCHECK(tnode.__isset.materialization_node);
333
0
    _materialization_node = tnode.materialization_node;
334
0
    _gc_id_map = tnode.materialization_node.gc_id_map;
335
    // Create result_expr_ctx_lists_ from thrift exprs.
336
0
    const auto& fetch_expr_lists = tnode.materialization_node.fetch_expr_lists;
337
0
    RETURN_IF_ERROR(VExpr::create_expr_trees(fetch_expr_lists, _rowid_exprs));
338
0
    return Status::OK();
339
0
}
340
341
0
Status MaterializationOperator::prepare(RuntimeState* state) {
342
0
    RETURN_IF_ERROR(Base::prepare(state));
343
0
    RETURN_IF_ERROR(VExpr::prepare(_rowid_exprs, state, _child->row_desc()));
344
0
    RETURN_IF_ERROR(VExpr::open(_rowid_exprs, state));
345
0
    return Status::OK();
346
0
}
347
348
0
bool MaterializationOperator::need_more_input_data(RuntimeState* state) const {
349
0
    auto& local_state = get_local_state(state);
350
0
    return !local_state._materialization_state.origin_block.rows() &&
351
0
           !local_state._materialization_state.eos;
352
0
}
353
354
0
Status MaterializationOperator::pull(RuntimeState* state, Block* output_block, bool* eos) const {
355
0
    auto& local_state = get_local_state(state);
356
0
    output_block->clear();
357
0
    if (local_state._materialization_state.need_merge_block) {
358
0
        local_state._materialization_state.get_block(output_block);
359
0
    }
360
0
    *eos = local_state._materialization_state.eos;
361
362
0
    if (*eos) {
363
0
        for (const auto& [backend_id, child_info] :
364
0
             local_state._materialization_state.backend_profile_info_string) {
365
0
            auto* child_profile = local_state.operator_profile()->create_child(
366
0
                    "RowIDFetcher: BackendId:" + std::to_string(backend_id));
367
0
            for (const auto& [info_key, info_value] :
368
0
                 local_state._materialization_state.backend_profile_info_string[backend_id]) {
369
0
                child_profile->add_info_string(info_key, "{" + fmt::to_string(info_value) + "}");
370
0
            }
371
0
            local_state.operator_profile()->add_child(child_profile, true);
372
0
        }
373
0
    }
374
375
0
    return Status::OK();
376
0
}
377
378
0
Status MaterializationOperator::push(RuntimeState* state, Block* in_block, bool eos) const {
379
0
    auto& local_state = get_local_state(state);
380
0
    SCOPED_TIMER(local_state.exec_time_counter());
381
0
    if (!local_state._materialization_state.rpc_struct_inited) {
382
0
        RETURN_IF_ERROR(local_state._materialization_state.init_multi_requests(
383
0
                _materialization_node, state));
384
0
    }
385
386
0
    if (in_block->rows() > 0 || eos) {
387
        // execute the rowid exprs
388
0
        Columns columns;
389
0
        if (in_block->rows() != 0) {
390
0
            local_state._materialization_state.rowid_locs.resize(_rowid_exprs.size());
391
0
            for (int i = 0; i < _rowid_exprs.size(); ++i) {
392
0
                const auto& rowid_expr = _rowid_exprs[i];
393
0
                RETURN_IF_ERROR(rowid_expr->execute(
394
0
                        in_block, &local_state._materialization_state.rowid_locs[i]));
395
0
                columns.emplace_back(
396
0
                        in_block->get_by_position(local_state._materialization_state.rowid_locs[i])
397
0
                                .column);
398
0
            }
399
0
            local_state._materialization_state.origin_block.swap(*in_block);
400
0
        }
401
0
        RETURN_IF_ERROR(local_state._materialization_state.create_muiltget_result(columns, eos,
402
0
                                                                                  _gc_id_map));
403
404
0
        auto size = local_state._materialization_state.rpc_struct_map.size();
405
0
        bthread::CountdownEvent counter(static_cast<int>(size));
406
0
        MonotonicStopWatch rpc_timer(true);
407
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
408
0
            auto* callback = brpc::NewCallback(fetch_callback, &counter);
409
0
            rpc_struct.cntl->set_timeout_ms(state->execution_timeout() * 1000);
410
            // send brpc request
411
0
            rpc_struct.stub->multiget_data_v2(rpc_struct.cntl.get(), &rpc_struct.request,
412
0
                                              &rpc_struct.response, callback);
413
0
        }
414
0
        counter.wait();
415
0
        if (auto time = rpc_timer.elapsed_time(); time > local_state._max_rpc_timer->value()) {
416
0
            local_state._max_rpc_timer->set(static_cast<int64_t>(time));
417
0
        }
418
419
0
        for (auto& [backend_id, rpc_struct] : local_state._materialization_state.rpc_struct_map) {
420
0
            if (rpc_struct.cntl->Failed()) {
421
0
                std::string error_text =
422
0
                        "Failed to send brpc request, error_text=" + rpc_struct.cntl->ErrorText() +
423
0
                        " Materialization Sink node id:" + std::to_string(node_id()) +
424
0
                        " target_backend_id:" + std::to_string(backend_id);
425
0
                return Status::InternalError(error_text);
426
0
            }
427
0
            rpc_struct.cntl->Reset();
428
0
        }
429
430
0
        if (local_state._materialization_state.need_merge_block) {
431
0
            SCOPED_TIMER(local_state._merge_response_timer);
432
0
            RETURN_IF_ERROR(local_state._materialization_state.merge_multi_response());
433
0
            local_state._max_rows_per_backend_counter->set(
434
0
                    (int64_t)local_state._materialization_state._max_rows_per_backend);
435
0
        }
436
0
    }
437
438
0
    return Status::OK();
439
0
}
440
441
} // namespace doris