Coverage Report

Created: 2026-04-16 16:37

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/materialization_opertor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include "common/status.h"
23
#include "exec/operator/operator.h"
24
25
namespace doris {
26
class RuntimeState;
27
28
class MaterializationOperator;
29
30
struct FetchRpcStruct {
31
    std::shared_ptr<PBackendService_Stub> stub;
32
    std::unique_ptr<brpc::Controller> cntl;
33
    PMultiGetRequestV2 request;
34
    PMultiGetResponseV2 response;
35
};
36
37
struct MaterializationSharedState {
38
public:
39
5
    MaterializationSharedState() = default;
40
41
    Status init_multi_requests(const TMaterializationNode& tnode, RuntimeState* state);
42
    Status create_muiltget_result(const Columns& columns, bool eos);
43
44
    Status merge_multi_response();
45
    void get_block(Block* block);
46
47
private:
48
    void _update_profile_info(int64_t backend_id, RuntimeProfile* response_profile);
49
50
public:
51
    bool rpc_struct_inited = false;
52
53
    bool eos = false;
54
    // empty materialization sink block not need to merge block
55
    bool need_merge_block = true;
56
    Block origin_block;
57
    // The rowid column of the origin block. should be replaced by the column of the result block.
58
    std::vector<int> rowid_locs;
59
    std::vector<MutableBlock> response_blocks;
60
    std::map<int64_t, FetchRpcStruct> rpc_struct_map;
61
    // Register each line in which block to ensure the order of the result.
62
    // Zero means NULL value.
63
    std::vector<std::vector<int64_t>> block_order_results;
64
    // backend id => <rpc profile info string key, rpc profile info string value>.
65
    std::map<int64_t, std::map<std::string, fmt::memory_buffer>> backend_profile_info_string;
66
67
    // Store the maximum number of rows processed by a single backend in the current batch
68
    uint32_t _max_rows_per_backend = 0;
69
    // Store the number of rows processed by each backend
70
    std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id => rows_count
71
};
72
73
class MaterializationLocalState final : public PipelineXLocalState<FakeSharedState> {
74
public:
75
    using Parent = MaterializationOperator;
76
    using Base = PipelineXLocalState<FakeSharedState>;
77
78
    ENABLE_FACTORY_CREATOR(MaterializationLocalState);
79
0
    MaterializationLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
80
81
0
    Status init(RuntimeState* state, LocalStateInfo& info) override {
82
0
        RETURN_IF_ERROR(Base::init(state, info));
83
0
        _max_rpc_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MaxRpcTime", 2);
84
0
        _merge_response_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MergeResponseTime", 2);
85
0
        _max_rows_per_backend_counter =
86
0
                ADD_COUNTER_WITH_LEVEL(custom_profile(), "MaxRowsPerBackend", TUnit::UNIT, 2);
87
0
        return Status::OK();
88
0
    }
89
90
private:
91
    friend class MaterializationOperator;
92
    template <typename LocalStateType>
93
    friend class StatefulOperatorX;
94
95
    std::unique_ptr<Block> _child_block = Block::create_unique();
96
    bool _child_eos = false;
97
    MaterializationSharedState _materialization_state;
98
    RuntimeProfile::Counter* _max_rpc_timer = nullptr;
99
    RuntimeProfile::Counter* _merge_response_timer = nullptr;
100
    RuntimeProfile::Counter* _max_rows_per_backend_counter = nullptr;
101
};
102
103
class MaterializationOperator final : public StatefulOperatorX<MaterializationLocalState> {
104
public:
105
    using Base = StatefulOperatorX<MaterializationLocalState>;
106
    MaterializationOperator(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
107
                            const DescriptorTbl& descs)
108
0
            : Base(pool, tnode, operator_id, descs) {}
109
110
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
111
112
    Status prepare(RuntimeState* state) override;
113
114
0
    bool is_blockable(RuntimeState* state) const override { return true; }
115
    bool need_more_input_data(RuntimeState* state) const override;
116
    Status pull(RuntimeState* state, Block* output_block, bool* eos) const override;
117
    Status push(RuntimeState* state, Block* input_block, bool eos) const override;
118
119
private:
120
    friend class MaterializationLocalState;
121
122
    // Materialized slot by this node. The i-th result expr list refers to a slot of RowId
123
    TMaterializationNode _materialization_node;
124
    VExprContextSPtrs _rowid_exprs;
125
};
126
127
} // namespace doris