Coverage Report

Created: 2026-03-14 20:54

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/materialization_opertor.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <stdint.h>
21
22
#include "common/status.h"
23
#include "exec/operator/operator.h"
24
25
namespace doris {
26
#include "common/compile_check_begin.h"
27
class RuntimeState;
28
29
class MaterializationOperator;
30
31
struct FetchRpcStruct {
32
    std::shared_ptr<PBackendService_Stub> stub;
33
    std::unique_ptr<brpc::Controller> cntl;
34
    PMultiGetRequestV2 request;
35
    PMultiGetResponseV2 response;
36
};
37
38
struct MaterializationSharedState {
39
public:
40
3
    MaterializationSharedState() = default;
41
42
    Status init_multi_requests(const TMaterializationNode& tnode, RuntimeState* state);
43
    Status create_muiltget_result(const Columns& columns, bool eos, bool gc_id_map);
44
45
    Status merge_multi_response();
46
    void get_block(Block* block);
47
48
private:
49
    void _update_profile_info(int64_t backend_id, RuntimeProfile* response_profile);
50
51
public:
52
    bool rpc_struct_inited = false;
53
54
    bool eos = false;
55
    // empty materialization sink block not need to merge block
56
    bool need_merge_block = true;
57
    Block origin_block;
58
    // The rowid column of the origin block. should be replaced by the column of the result block.
59
    std::vector<int> rowid_locs;
60
    std::vector<MutableBlock> response_blocks;
61
    std::map<int64_t, FetchRpcStruct> rpc_struct_map;
62
    // Register each line in which block to ensure the order of the result.
63
    // Zero means NULL value.
64
    std::vector<std::vector<int64_t>> block_order_results;
65
    // backend id => <rpc profile info string key, rpc profile info string value>.
66
    std::map<int64_t, std::map<std::string, fmt::memory_buffer>> backend_profile_info_string;
67
68
    // Store the maximum number of rows processed by a single backend in the current batch
69
    uint32_t _max_rows_per_backend = 0;
70
    // Store the number of rows processed by each backend
71
    std::unordered_map<int64_t, uint32_t> _backend_rows_count; // backend_id => rows_count
72
};
73
74
class MaterializationLocalState final : public PipelineXLocalState<FakeSharedState> {
75
public:
76
    using Parent = MaterializationOperator;
77
    using Base = PipelineXLocalState<FakeSharedState>;
78
79
    ENABLE_FACTORY_CREATOR(MaterializationLocalState);
80
0
    MaterializationLocalState(RuntimeState* state, OperatorXBase* parent) : Base(state, parent) {};
81
82
0
    Status init(RuntimeState* state, LocalStateInfo& info) override {
83
0
        RETURN_IF_ERROR(Base::init(state, info));
84
0
        _max_rpc_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MaxRpcTime", 2);
85
0
        _merge_response_timer = ADD_TIMER_WITH_LEVEL(custom_profile(), "MergeResponseTime", 2);
86
0
        _max_rows_per_backend_counter =
87
0
                ADD_COUNTER_WITH_LEVEL(custom_profile(), "MaxRowsPerBackend", TUnit::UNIT, 2);
88
0
        return Status::OK();
89
0
    }
90
91
private:
92
    friend class MaterializationOperator;
93
    template <typename LocalStateType>
94
    friend class StatefulOperatorX;
95
96
    std::unique_ptr<Block> _child_block = Block::create_unique();
97
    bool _child_eos = false;
98
    MaterializationSharedState _materialization_state;
99
    RuntimeProfile::Counter* _max_rpc_timer = nullptr;
100
    RuntimeProfile::Counter* _merge_response_timer = nullptr;
101
    RuntimeProfile::Counter* _max_rows_per_backend_counter = nullptr;
102
};
103
104
class MaterializationOperator final : public StatefulOperatorX<MaterializationLocalState> {
105
public:
106
    using Base = StatefulOperatorX<MaterializationLocalState>;
107
    MaterializationOperator(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
108
                            const DescriptorTbl& descs)
109
0
            : Base(pool, tnode, operator_id, descs) {}
110
111
    Status init(const TPlanNode& tnode, RuntimeState* state) override;
112
113
    Status prepare(RuntimeState* state) override;
114
115
0
    bool is_blockable(RuntimeState* state) const override { return true; }
116
    bool need_more_input_data(RuntimeState* state) const override;
117
    Status pull(RuntimeState* state, Block* output_block, bool* eos) const override;
118
    Status push(RuntimeState* state, Block* input_block, bool eos) const override;
119
120
private:
121
    friend class MaterializationLocalState;
122
123
    // Materialized slot by this node. The i-th result expr list refers to a slot of RowId
124
    TMaterializationNode _materialization_node;
125
    VExprContextSPtrs _rowid_exprs;
126
    bool _gc_id_map = false;
127
};
128
129
#include "common/compile_check_end.h"
130
} // namespace doris