Coverage Report

Created: 2026-03-17 19:14

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/operator/repeat_operator.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/operator/repeat_operator.h"
19
20
#include <memory>
21
22
#include "common/logging.h"
23
#include "core/assert_cast.h"
24
#include "core/block/block.h"
25
#include "core/block/column_with_type_and_name.h"
26
#include "exec/operator/operator.h"
27
28
namespace doris {
29
#include "common/compile_check_begin.h"
30
class RuntimeState;
31
} // namespace doris
32
33
namespace doris {
34
35
RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent)
36
3
        : Base(state, parent), _child_block(Block::create_unique()), _repeat_id_idx(0) {}
37
38
3
Status RepeatLocalState::open(RuntimeState* state) {
39
3
    SCOPED_TIMER(exec_time_counter());
40
3
    SCOPED_TIMER(_open_timer);
41
3
    RETURN_IF_ERROR(Base::open(state));
42
3
    auto& p = _parent->cast<Parent>();
43
3
    _expr_ctxs.resize(p._expr_ctxs.size());
44
8
    for (size_t i = 0; i < _expr_ctxs.size(); i++) {
45
5
        RETURN_IF_ERROR(p._expr_ctxs[i]->clone(state, _expr_ctxs[i]));
46
5
    }
47
3
    return Status::OK();
48
3
}
49
50
3
Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) {
51
3
    RETURN_IF_ERROR(Base::init(state, info));
52
3
    SCOPED_TIMER(exec_time_counter());
53
3
    SCOPED_TIMER(_init_timer);
54
3
    _evaluate_input_timer = ADD_TIMER(custom_profile(), "EvaluateInputDataTime");
55
3
    _get_repeat_data_timer = ADD_TIMER(custom_profile(), "GetRepeatDataTime");
56
3
    _filter_timer = ADD_TIMER(custom_profile(), "FilterTime");
57
3
    return Status::OK();
58
3
}
59
60
0
Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
61
0
    RETURN_IF_ERROR(OperatorXBase::init(tnode, state));
62
0
    RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.repeat_node.exprs, _expr_ctxs));
63
0
    for (const auto& slot_idx : _grouping_list) {
64
0
        if (slot_idx.size() < _repeat_id_list_size) {
65
0
            return Status::InternalError(
66
0
                    "grouping_list size {} is less than repeat_id_list size {}", slot_idx.size(),
67
0
                    _repeat_id_list_size);
68
0
        }
69
0
    }
70
0
    return Status::OK();
71
0
}
72
73
0
Status RepeatOperatorX::prepare(RuntimeState* state) {
74
0
    VLOG_CRITICAL << "VRepeatNode::open";
75
0
    RETURN_IF_ERROR(OperatorXBase::prepare(state));
76
0
    const auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
77
0
    if (output_tuple_desc == nullptr) {
78
0
        return Status::InternalError("Failed to get tuple descriptor.");
79
0
    }
80
0
    for (const auto& slot_desc : output_tuple_desc->slots()) {
81
0
        _output_slots.push_back(slot_desc);
82
0
    }
83
0
    RETURN_IF_ERROR(VExpr::prepare(_expr_ctxs, state, _child->row_desc()));
84
0
    RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state));
85
0
    return Status::OK();
86
0
}
87
88
RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
89
                                 const DescriptorTbl& descs)
90
0
        : Base(pool, tnode, operator_id, descs),
91
0
          _slot_id_set_list(tnode.repeat_node.slot_id_set_list),
92
0
          _all_slot_ids(tnode.repeat_node.all_slot_ids),
93
0
          _repeat_id_list_size(tnode.repeat_node.repeat_id_list.size()),
94
0
          _grouping_list(tnode.repeat_node.grouping_list),
95
0
          _output_tuple_id(tnode.repeat_node.output_tuple_id) {};
96
97
// The control logic of RepeatOperator is
98
// push a block, output _repeat_id_list_size blocks
99
// In the output block, the first part of the columns comes from the input block's columns, and the latter part of the columns is the grouping_id
100
// If there is no expr, there is only grouping_id
101
// If there is an expr, the first part of the columns in the output block uses _all_slot_ids and _slot_id_set_list to control whether it is null
102
12
bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const {
103
12
    auto& local_state = state->get_local_state(operator_id())->cast<RepeatLocalState>();
104
12
    return !local_state._child_block->rows() && !local_state._child_eos;
105
12
}
106
107
Status RepeatLocalState::get_repeated_block(Block* input_block, int repeat_id_idx,
108
4
                                            Block* output_block) {
109
4
    auto& p = _parent->cast<RepeatOperatorX>();
110
4
    DCHECK(input_block != nullptr);
111
4
    DCHECK_EQ(output_block->rows(), 0);
112
113
4
    size_t input_column_size = input_block->columns();
114
4
    size_t output_column_size = p._output_slots.size();
115
4
    DCHECK_LT(input_column_size, output_column_size);
116
4
    auto m_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, p._output_slots);
117
4
    auto& output_columns = m_block.mutable_columns();
118
    /* Fill all slots according to child, for example: select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2));
119
     * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1);
120
     * slot_id_set_list=[[0],[1]],repeat_id_idx=0,
121
     * child_block 1,2,1 | 1,3,1 | 2,1,1 | 3,1,1
122
     * output_block 1,null,1,1 | 1,null,1,1 | 2,null,1,1 | 3,null,1,1
123
     */
124
4
    size_t cur_col = 0;
125
14
    for (size_t i = 0; i < input_column_size; i++) {
126
10
        const ColumnWithTypeAndName& src_column = input_block->get_by_position(i);
127
10
        const auto slot_id = p._output_slots[cur_col]->id();
128
10
        const bool is_repeat_slot = p._all_slot_ids.contains(slot_id);
129
10
        const bool is_set_null_slot = !p._slot_id_set_list[repeat_id_idx].contains(slot_id);
130
10
        const auto row_size = src_column.column->size();
131
10
        ColumnPtr src = src_column.column;
132
10
        if (is_repeat_slot) {
133
6
            DCHECK(p._output_slots[cur_col]->is_nullable());
134
6
            auto* nullable_column = assert_cast<ColumnNullable*>(output_columns[cur_col].get());
135
6
            if (is_set_null_slot) {
136
                // is_set_null_slot = true, output all null
137
1
                nullable_column->insert_many_defaults(row_size);
138
5
            } else {
139
5
                if (!src_column.type->is_nullable()) {
140
3
                    nullable_column->get_nested_column().insert_range_from(*src_column.column, 0,
141
3
                                                                           row_size);
142
3
                    nullable_column->push_false_to_nullmap(row_size);
143
3
                } else {
144
2
                    nullable_column->insert_range_from(*src_column.column, 0, row_size);
145
2
                }
146
5
            }
147
6
        } else {
148
4
            output_columns[cur_col]->insert_range_from(*src_column.column, 0, row_size);
149
4
        }
150
10
        cur_col++;
151
10
    }
152
153
4
    const auto rows = input_block->rows();
154
    // Fill grouping ID to block
155
4
    RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, output_columns, repeat_id_idx));
156
157
4
    DCHECK_EQ(cur_col, output_column_size);
158
159
4
    return Status::OK();
160
4
}
161
162
Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& cur_col,
163
6
                                                MutableColumns& columns, int repeat_id_idx) {
164
6
    auto& p = _parent->cast<RepeatOperatorX>();
165
24
    for (auto slot_idx = 0; slot_idx < p._grouping_list.size(); slot_idx++) {
166
18
        DCHECK_LT(slot_idx, p._output_slots.size());
167
18
        int64_t val = p._grouping_list[slot_idx][repeat_id_idx];
168
18
        auto* column_ptr = columns[cur_col].get();
169
18
        DCHECK(!p._output_slots[cur_col]->is_nullable());
170
18
        auto* col = assert_cast<ColumnInt64*>(column_ptr);
171
18
        col->insert_many_vals(val, rows);
172
18
        cur_col++;
173
18
    }
174
6
    return Status::OK();
175
6
}
176
177
3
Status RepeatOperatorX::push(RuntimeState* state, Block* input_block, bool eos) const {
178
3
    auto& local_state = get_local_state(state);
179
3
    SCOPED_TIMER(local_state._evaluate_input_timer);
180
3
    local_state._child_eos = eos;
181
3
    auto& intermediate_block = local_state._intermediate_block;
182
3
    auto& expr_ctxs = local_state._expr_ctxs;
183
3
    DCHECK(!intermediate_block || intermediate_block->rows() == 0);
184
3
    if (input_block->rows() > 0) {
185
3
        SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
186
3
        intermediate_block = Block::create_unique();
187
188
5
        for (auto& expr : expr_ctxs) {
189
5
            ColumnWithTypeAndName result_data;
190
5
            RETURN_IF_ERROR(expr->execute(input_block, result_data));
191
5
            result_data.column = result_data.column->convert_to_full_column_if_const();
192
5
            intermediate_block->insert(result_data);
193
5
        }
194
3
        DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns());
195
3
    }
196
197
3
    return Status::OK();
198
3
}
199
200
6
Status RepeatOperatorX::pull(doris::RuntimeState* state, Block* output_block, bool* eos) const {
201
6
    auto& local_state = get_local_state(state);
202
6
    auto& _repeat_id_idx = local_state._repeat_id_idx;
203
6
    auto& _child_block = *local_state._child_block;
204
6
    auto& _child_eos = local_state._child_eos;
205
6
    auto& _intermediate_block = local_state._intermediate_block;
206
6
    RETURN_IF_CANCELLED(state);
207
208
6
    SCOPED_PEAK_MEM(&local_state._estimate_memory_usage);
209
210
6
    DCHECK(_repeat_id_idx >= 0);
211
18
    for (const std::vector<int64_t>& v : _grouping_list) {
212
18
        DCHECK(_repeat_id_idx <= (int)v.size());
213
18
    }
214
6
    DCHECK(output_block->rows() == 0);
215
216
6
    {
217
6
        SCOPED_TIMER(local_state._get_repeat_data_timer);
218
        // Each pull increases _repeat_id_idx by one until _repeat_id_idx equals _repeat_id_list_size
219
        // Then clear the data of _intermediate_block and _child_block, and set _repeat_id_idx to 0
220
        // need_more_input_data will check if _child_block is empty
221
6
        if (_intermediate_block && _intermediate_block->rows() > 0) {
222
4
            RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(),
223
4
                                                           _repeat_id_idx, output_block));
224
225
4
            _repeat_id_idx++;
226
227
4
            if (_repeat_id_idx >= _repeat_id_list_size) {
228
2
                _intermediate_block->clear();
229
2
                _child_block.clear_column_data(_child->row_desc().num_materialized_slots());
230
2
                _repeat_id_idx = 0;
231
2
            }
232
4
        } else if (local_state._expr_ctxs.empty()) {
233
2
            auto m_block =
234
2
                    VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots);
235
2
            auto rows = _child_block.rows();
236
2
            auto& columns = m_block.mutable_columns();
237
238
2
            std::size_t cur_col = 0;
239
2
            RETURN_IF_ERROR(
240
2
                    local_state.add_grouping_id_column(rows, cur_col, columns, _repeat_id_idx));
241
2
            _repeat_id_idx++;
242
243
2
            if (_repeat_id_idx >= _repeat_id_list_size) {
244
1
                _intermediate_block->clear();
245
1
                _child_block.clear_column_data(_child->row_desc().num_materialized_slots());
246
1
                _repeat_id_idx = 0;
247
1
            }
248
2
        }
249
6
    }
250
251
6
    {
252
6
        SCOPED_TIMER(local_state._filter_timer);
253
6
        RETURN_IF_ERROR(VExprContext::filter_block(local_state._conjuncts, output_block,
254
6
                                                   output_block->columns()));
255
6
    }
256
257
6
    *eos = _child_eos && _child_block.rows() == 0;
258
6
    local_state.reached_limit(output_block, eos);
259
6
    return Status::OK();
260
6
}
261
262
#include "common/compile_check_end.h"
263
} // namespace doris