be/src/exec/operator/repeat_operator.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/repeat_operator.h" |
19 | | |
20 | | #include <memory> |
21 | | |
22 | | #include "common/logging.h" |
23 | | #include "core/assert_cast.h" |
24 | | #include "core/block/block.h" |
25 | | #include "core/block/column_with_type_and_name.h" |
26 | | #include "exec/operator/operator.h" |
27 | | |
28 | | namespace doris { |
29 | | #include "common/compile_check_begin.h" |
30 | | class RuntimeState; |
31 | | } // namespace doris |
32 | | |
33 | | namespace doris { |
34 | | |
35 | | RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent) |
36 | 3 | : Base(state, parent), _child_block(Block::create_unique()), _repeat_id_idx(0) {} |
37 | | |
38 | 3 | Status RepeatLocalState::open(RuntimeState* state) { |
39 | 3 | SCOPED_TIMER(exec_time_counter()); |
40 | 3 | SCOPED_TIMER(_open_timer); |
41 | 3 | RETURN_IF_ERROR(Base::open(state)); |
42 | 3 | auto& p = _parent->cast<Parent>(); |
43 | 3 | _expr_ctxs.resize(p._expr_ctxs.size()); |
44 | 8 | for (size_t i = 0; i < _expr_ctxs.size(); i++) { |
45 | 5 | RETURN_IF_ERROR(p._expr_ctxs[i]->clone(state, _expr_ctxs[i])); |
46 | 5 | } |
47 | 3 | return Status::OK(); |
48 | 3 | } |
49 | | |
50 | 3 | Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
51 | 3 | RETURN_IF_ERROR(Base::init(state, info)); |
52 | 3 | SCOPED_TIMER(exec_time_counter()); |
53 | 3 | SCOPED_TIMER(_init_timer); |
54 | 3 | _evaluate_input_timer = ADD_TIMER(custom_profile(), "EvaluateInputDataTime"); |
55 | 3 | _get_repeat_data_timer = ADD_TIMER(custom_profile(), "GetRepeatDataTime"); |
56 | 3 | _filter_timer = ADD_TIMER(custom_profile(), "FilterTime"); |
57 | 3 | return Status::OK(); |
58 | 3 | } |
59 | | |
60 | 0 | Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { |
61 | 0 | RETURN_IF_ERROR(OperatorXBase::init(tnode, state)); |
62 | 0 | RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.repeat_node.exprs, _expr_ctxs)); |
63 | 0 | for (const auto& slot_idx : _grouping_list) { |
64 | 0 | if (slot_idx.size() < _repeat_id_list_size) { |
65 | 0 | return Status::InternalError( |
66 | 0 | "grouping_list size {} is less than repeat_id_list size {}", slot_idx.size(), |
67 | 0 | _repeat_id_list_size); |
68 | 0 | } |
69 | 0 | } |
70 | 0 | return Status::OK(); |
71 | 0 | } |
72 | | |
73 | 0 | Status RepeatOperatorX::prepare(RuntimeState* state) { |
74 | 0 | VLOG_CRITICAL << "VRepeatNode::open"; |
75 | 0 | RETURN_IF_ERROR(OperatorXBase::prepare(state)); |
76 | 0 | const auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); |
77 | 0 | if (output_tuple_desc == nullptr) { |
78 | 0 | return Status::InternalError("Failed to get tuple descriptor."); |
79 | 0 | } |
80 | 0 | for (const auto& slot_desc : output_tuple_desc->slots()) { |
81 | 0 | _output_slots.push_back(slot_desc); |
82 | 0 | } |
83 | 0 | RETURN_IF_ERROR(VExpr::prepare(_expr_ctxs, state, _child->row_desc())); |
84 | 0 | RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state)); |
85 | 0 | return Status::OK(); |
86 | 0 | } |
87 | | |
88 | | RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
89 | | const DescriptorTbl& descs) |
90 | 0 | : Base(pool, tnode, operator_id, descs), |
91 | 0 | _slot_id_set_list(tnode.repeat_node.slot_id_set_list), |
92 | 0 | _all_slot_ids(tnode.repeat_node.all_slot_ids), |
93 | 0 | _repeat_id_list_size(tnode.repeat_node.repeat_id_list.size()), |
94 | 0 | _grouping_list(tnode.repeat_node.grouping_list), |
95 | 0 | _output_tuple_id(tnode.repeat_node.output_tuple_id) {}; |
96 | | |
97 | | // The control logic of RepeatOperator is |
98 | | // push a block, output _repeat_id_list_size blocks |
99 | | // In the output block, the first part of the columns comes from the input block's columns, and the latter part of the columns is the grouping_id |
100 | | // If there is no expr, there is only grouping_id |
101 | | // If there is an expr, the first part of the columns in the output block uses _all_slot_ids and _slot_id_set_list to control whether it is null |
102 | 12 | bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const { |
103 | 12 | auto& local_state = state->get_local_state(operator_id())->cast<RepeatLocalState>(); |
104 | 12 | return !local_state._child_block->rows() && !local_state._child_eos; |
105 | 12 | } |
106 | | |
107 | | Status RepeatLocalState::get_repeated_block(Block* input_block, int repeat_id_idx, |
108 | 4 | Block* output_block) { |
109 | 4 | auto& p = _parent->cast<RepeatOperatorX>(); |
110 | 4 | DCHECK(input_block != nullptr); |
111 | 4 | DCHECK_EQ(output_block->rows(), 0); |
112 | | |
113 | 4 | size_t input_column_size = input_block->columns(); |
114 | 4 | size_t output_column_size = p._output_slots.size(); |
115 | 4 | DCHECK_LT(input_column_size, output_column_size); |
116 | 4 | auto m_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, p._output_slots); |
117 | 4 | auto& output_columns = m_block.mutable_columns(); |
118 | | /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2)); |
119 | | * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1); |
120 | | * slot_id_set_list=[[0],[1]],repeat_id_idx=0, |
121 | | * child_block 1,2,1 | 1,3,1 | 2,1,1 | 3,1,1 |
122 | | * output_block 1,null,1,1 | 1,null,1,1 | 2,nul,1,1 | 3,null,1,1 |
123 | | */ |
124 | 4 | size_t cur_col = 0; |
125 | 14 | for (size_t i = 0; i < input_column_size; i++) { |
126 | 10 | const ColumnWithTypeAndName& src_column = input_block->get_by_position(i); |
127 | 10 | const auto slot_id = p._output_slots[cur_col]->id(); |
128 | 10 | const bool is_repeat_slot = p._all_slot_ids.contains(slot_id); |
129 | 10 | const bool is_set_null_slot = !p._slot_id_set_list[repeat_id_idx].contains(slot_id); |
130 | 10 | const auto row_size = src_column.column->size(); |
131 | 10 | ColumnPtr src = src_column.column; |
132 | 10 | if (is_repeat_slot) { |
133 | 6 | DCHECK(p._output_slots[cur_col]->is_nullable()); |
134 | 6 | auto* nullable_column = assert_cast<ColumnNullable*>(output_columns[cur_col].get()); |
135 | 6 | if (is_set_null_slot) { |
136 | | // is_set_null_slot = true, output all null |
137 | 1 | nullable_column->insert_many_defaults(row_size); |
138 | 5 | } else { |
139 | 5 | if (!src_column.type->is_nullable()) { |
140 | 3 | nullable_column->get_nested_column().insert_range_from(*src_column.column, 0, |
141 | 3 | row_size); |
142 | 3 | nullable_column->push_false_to_nullmap(row_size); |
143 | 3 | } else { |
144 | 2 | nullable_column->insert_range_from(*src_column.column, 0, row_size); |
145 | 2 | } |
146 | 5 | } |
147 | 6 | } else { |
148 | 4 | output_columns[cur_col]->insert_range_from(*src_column.column, 0, row_size); |
149 | 4 | } |
150 | 10 | cur_col++; |
151 | 10 | } |
152 | | |
153 | 4 | const auto rows = input_block->rows(); |
154 | | // Fill grouping ID to block |
155 | 4 | RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, output_columns, repeat_id_idx)); |
156 | | |
157 | 4 | DCHECK_EQ(cur_col, output_column_size); |
158 | | |
159 | 4 | return Status::OK(); |
160 | 4 | } |
161 | | |
162 | | Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& cur_col, |
163 | 6 | MutableColumns& columns, int repeat_id_idx) { |
164 | 6 | auto& p = _parent->cast<RepeatOperatorX>(); |
165 | 24 | for (auto slot_idx = 0; slot_idx < p._grouping_list.size(); slot_idx++) { |
166 | 18 | DCHECK_LT(slot_idx, p._output_slots.size()); |
167 | 18 | int64_t val = p._grouping_list[slot_idx][repeat_id_idx]; |
168 | 18 | auto* column_ptr = columns[cur_col].get(); |
169 | 18 | DCHECK(!p._output_slots[cur_col]->is_nullable()); |
170 | 18 | auto* col = assert_cast<ColumnInt64*>(column_ptr); |
171 | 18 | col->insert_many_vals(val, rows); |
172 | 18 | cur_col++; |
173 | 18 | } |
174 | 6 | return Status::OK(); |
175 | 6 | } |
176 | | |
177 | 3 | Status RepeatOperatorX::push(RuntimeState* state, Block* input_block, bool eos) const { |
178 | 3 | auto& local_state = get_local_state(state); |
179 | 3 | SCOPED_TIMER(local_state._evaluate_input_timer); |
180 | 3 | local_state._child_eos = eos; |
181 | 3 | auto& intermediate_block = local_state._intermediate_block; |
182 | 3 | auto& expr_ctxs = local_state._expr_ctxs; |
183 | 3 | DCHECK(!intermediate_block || intermediate_block->rows() == 0); |
184 | 3 | if (input_block->rows() > 0) { |
185 | 3 | SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); |
186 | 3 | intermediate_block = Block::create_unique(); |
187 | | |
188 | 5 | for (auto& expr : expr_ctxs) { |
189 | 5 | ColumnWithTypeAndName result_data; |
190 | 5 | RETURN_IF_ERROR(expr->execute(input_block, result_data)); |
191 | 5 | result_data.column = result_data.column->convert_to_full_column_if_const(); |
192 | 5 | intermediate_block->insert(result_data); |
193 | 5 | } |
194 | 3 | DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns()); |
195 | 3 | } |
196 | | |
197 | 3 | return Status::OK(); |
198 | 3 | } |
199 | | |
200 | 6 | Status RepeatOperatorX::pull(doris::RuntimeState* state, Block* output_block, bool* eos) const { |
201 | 6 | auto& local_state = get_local_state(state); |
202 | 6 | auto& _repeat_id_idx = local_state._repeat_id_idx; |
203 | 6 | auto& _child_block = *local_state._child_block; |
204 | 6 | auto& _child_eos = local_state._child_eos; |
205 | 6 | auto& _intermediate_block = local_state._intermediate_block; |
206 | 6 | RETURN_IF_CANCELLED(state); |
207 | | |
208 | 6 | SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); |
209 | | |
210 | 6 | DCHECK(_repeat_id_idx >= 0); |
211 | 18 | for (const std::vector<int64_t>& v : _grouping_list) { |
212 | 18 | DCHECK(_repeat_id_idx <= (int)v.size()); |
213 | 18 | } |
214 | 6 | DCHECK(output_block->rows() == 0); |
215 | | |
216 | 6 | { |
217 | 6 | SCOPED_TIMER(local_state._get_repeat_data_timer); |
218 | | // Each pull increases _repeat_id_idx by one until _repeat_id_idx equals _repeat_id_list_size |
219 | | // Then clear the data of _intermediate_block and _child_block, and set _repeat_id_idx to 0 |
220 | | // need_more_input_data will check if _child_block is empty |
221 | 6 | if (_intermediate_block && _intermediate_block->rows() > 0) { |
222 | 4 | RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(), |
223 | 4 | _repeat_id_idx, output_block)); |
224 | | |
225 | 4 | _repeat_id_idx++; |
226 | | |
227 | 4 | if (_repeat_id_idx >= _repeat_id_list_size) { |
228 | 2 | _intermediate_block->clear(); |
229 | 2 | _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); |
230 | 2 | _repeat_id_idx = 0; |
231 | 2 | } |
232 | 4 | } else if (local_state._expr_ctxs.empty()) { |
233 | 2 | auto m_block = |
234 | 2 | VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots); |
235 | 2 | auto rows = _child_block.rows(); |
236 | 2 | auto& columns = m_block.mutable_columns(); |
237 | | |
238 | 2 | std::size_t cur_col = 0; |
239 | 2 | RETURN_IF_ERROR( |
240 | 2 | local_state.add_grouping_id_column(rows, cur_col, columns, _repeat_id_idx)); |
241 | 2 | _repeat_id_idx++; |
242 | | |
243 | 2 | if (_repeat_id_idx >= _repeat_id_list_size) { |
244 | 1 | _intermediate_block->clear(); |
245 | 1 | _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); |
246 | 1 | _repeat_id_idx = 0; |
247 | 1 | } |
248 | 2 | } |
249 | 6 | } |
250 | | |
251 | 6 | { |
252 | 6 | SCOPED_TIMER(local_state._filter_timer); |
253 | 6 | RETURN_IF_ERROR(VExprContext::filter_block(local_state._conjuncts, output_block, |
254 | 6 | output_block->columns())); |
255 | 6 | } |
256 | | |
257 | 6 | *eos = _child_eos && _child_block.rows() == 0; |
258 | 6 | local_state.reached_limit(output_block, eos); |
259 | 6 | return Status::OK(); |
260 | 6 | } |
261 | | |
262 | | #include "common/compile_check_end.h" |
263 | | } // namespace doris |