be/src/exec/operator/repeat_operator.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "exec/operator/repeat_operator.h" |
19 | | |
20 | | #include <memory> |
21 | | |
22 | | #include "common/logging.h" |
23 | | #include "core/assert_cast.h" |
24 | | #include "core/block/block.h" |
25 | | #include "core/block/column_with_type_and_name.h" |
26 | | #include "exec/operator/operator.h" |
27 | | |
28 | | namespace doris { |
29 | | class RuntimeState; |
30 | | } // namespace doris |
31 | | |
32 | | namespace doris { |
33 | | |
34 | | RepeatLocalState::RepeatLocalState(RuntimeState* state, OperatorXBase* parent) |
35 | 3 | : Base(state, parent), _child_block(Block::create_unique()), _repeat_id_idx(0) {} |
36 | | |
37 | 3 | Status RepeatLocalState::open(RuntimeState* state) { |
38 | 3 | SCOPED_TIMER(exec_time_counter()); |
39 | 3 | SCOPED_TIMER(_open_timer); |
40 | 3 | RETURN_IF_ERROR(Base::open(state)); |
41 | 3 | auto& p = _parent->cast<Parent>(); |
42 | 3 | _expr_ctxs.resize(p._expr_ctxs.size()); |
43 | 8 | for (size_t i = 0; i < _expr_ctxs.size(); i++) { |
44 | 5 | RETURN_IF_ERROR(p._expr_ctxs[i]->clone(state, _expr_ctxs[i])); |
45 | 5 | } |
46 | 3 | return Status::OK(); |
47 | 3 | } |
48 | | |
49 | 3 | Status RepeatLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
50 | 3 | RETURN_IF_ERROR(Base::init(state, info)); |
51 | 3 | SCOPED_TIMER(exec_time_counter()); |
52 | 3 | SCOPED_TIMER(_init_timer); |
53 | 3 | _evaluate_input_timer = ADD_TIMER(custom_profile(), "EvaluateInputDataTime"); |
54 | 3 | _get_repeat_data_timer = ADD_TIMER(custom_profile(), "GetRepeatDataTime"); |
55 | 3 | _filter_timer = ADD_TIMER(custom_profile(), "FilterTime"); |
56 | 3 | return Status::OK(); |
57 | 3 | } |
58 | | |
59 | 0 | Status RepeatOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { |
60 | 0 | RETURN_IF_ERROR(OperatorXBase::init(tnode, state)); |
61 | 0 | RETURN_IF_ERROR(VExpr::create_expr_trees(tnode.repeat_node.exprs, _expr_ctxs)); |
62 | 0 | for (const auto& slot_idx : _grouping_list) { |
63 | 0 | if (slot_idx.size() < _repeat_id_list_size) { |
64 | 0 | return Status::InternalError( |
65 | 0 | "grouping_list size {} is less than repeat_id_list size {}", slot_idx.size(), |
66 | 0 | _repeat_id_list_size); |
67 | 0 | } |
68 | 0 | } |
69 | 0 | return Status::OK(); |
70 | 0 | } |
71 | | |
72 | 0 | Status RepeatOperatorX::prepare(RuntimeState* state) { |
73 | 0 | VLOG_CRITICAL << "VRepeatNode::open"; |
74 | 0 | RETURN_IF_ERROR(OperatorXBase::prepare(state)); |
75 | 0 | const auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); |
76 | 0 | if (output_tuple_desc == nullptr) { |
77 | 0 | return Status::InternalError("Failed to get tuple descriptor."); |
78 | 0 | } |
79 | 0 | for (const auto& slot_desc : output_tuple_desc->slots()) { |
80 | 0 | _output_slots.push_back(slot_desc); |
81 | 0 | } |
82 | 0 | RETURN_IF_ERROR(VExpr::prepare(_expr_ctxs, state, _child->row_desc())); |
83 | 0 | RETURN_IF_ERROR(VExpr::open(_expr_ctxs, state)); |
84 | 0 | return Status::OK(); |
85 | 0 | } |
86 | | |
87 | | RepeatOperatorX::RepeatOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
88 | | const DescriptorTbl& descs) |
89 | 0 | : Base(pool, tnode, operator_id, descs), |
90 | 0 | _slot_id_set_list(tnode.repeat_node.slot_id_set_list), |
91 | 0 | _all_slot_ids(tnode.repeat_node.all_slot_ids), |
92 | 0 | _repeat_id_list_size(tnode.repeat_node.repeat_id_list.size()), |
93 | 0 | _grouping_list(tnode.repeat_node.grouping_list), |
94 | 0 | _output_tuple_id(tnode.repeat_node.output_tuple_id) {}; |
95 | | |
96 | | // The control logic of RepeatOperator is |
97 | | // push a block, output _repeat_id_list_size blocks |
98 | | // In the output block, the first part of the columns comes from the input block's columns, and the latter part of the columns is the grouping_id |
99 | | // If there is no expr, there is only grouping_id |
100 | | // If there is an expr, the first part of the columns in the output block uses _all_slot_ids and _slot_id_set_list to control whether it is null |
101 | 12 | bool RepeatOperatorX::need_more_input_data(RuntimeState* state) const { |
102 | 12 | auto& local_state = state->get_local_state(operator_id())->cast<RepeatLocalState>(); |
103 | 12 | return !local_state._child_block->rows() && !local_state._child_eos; |
104 | 12 | } |
105 | | |
106 | | Status RepeatLocalState::get_repeated_block(Block* input_block, int repeat_id_idx, |
107 | 4 | Block* output_block) { |
108 | 4 | auto& p = _parent->cast<RepeatOperatorX>(); |
109 | 4 | DCHECK(input_block != nullptr); |
110 | 4 | DCHECK_EQ(output_block->rows(), 0); |
111 | | |
112 | 4 | size_t input_column_size = input_block->columns(); |
113 | 4 | size_t output_column_size = p._output_slots.size(); |
114 | 4 | DCHECK_LT(input_column_size, output_column_size); |
115 | 4 | auto m_block = VectorizedUtils::build_mutable_mem_reuse_block(output_block, p._output_slots); |
116 | 4 | auto& output_columns = m_block.mutable_columns(); |
117 | | /* Fill all slots according to child, for example:select tc1,tc2,sum(tc3) from t1 group by grouping sets((tc1),(tc2)); |
118 | | * insert into t1 values(1,2,1),(1,3,1),(2,1,1),(3,1,1); |
119 | | * slot_id_set_list=[[0],[1]],repeat_id_idx=0, |
120 | | * child_block 1,2,1 | 1,3,1 | 2,1,1 | 3,1,1 |
121 | | * output_block 1,null,1,1 | 1,null,1,1 | 2,nul,1,1 | 3,null,1,1 |
122 | | */ |
123 | 4 | size_t cur_col = 0; |
124 | 14 | for (size_t i = 0; i < input_column_size; i++) { |
125 | 10 | const ColumnWithTypeAndName& src_column = input_block->get_by_position(i); |
126 | 10 | const auto slot_id = p._output_slots[cur_col]->id(); |
127 | 10 | const bool is_repeat_slot = p._all_slot_ids.contains(slot_id); |
128 | 10 | const bool is_set_null_slot = !p._slot_id_set_list[repeat_id_idx].contains(slot_id); |
129 | 10 | const auto row_size = src_column.column->size(); |
130 | 10 | ColumnPtr src = src_column.column; |
131 | 10 | if (is_repeat_slot) { |
132 | 6 | DCHECK(p._output_slots[cur_col]->is_nullable()); |
133 | 6 | auto* nullable_column = assert_cast<ColumnNullable*>(output_columns[cur_col].get()); |
134 | 6 | if (is_set_null_slot) { |
135 | | // is_set_null_slot = true, output all null |
136 | 1 | nullable_column->insert_many_defaults(row_size); |
137 | 5 | } else { |
138 | 5 | if (!src_column.type->is_nullable()) { |
139 | 3 | nullable_column->get_nested_column().insert_range_from(*src_column.column, 0, |
140 | 3 | row_size); |
141 | 3 | nullable_column->push_false_to_nullmap(row_size); |
142 | 3 | } else { |
143 | 2 | nullable_column->insert_range_from(*src_column.column, 0, row_size); |
144 | 2 | } |
145 | 5 | } |
146 | 6 | } else { |
147 | 4 | output_columns[cur_col]->insert_range_from(*src_column.column, 0, row_size); |
148 | 4 | } |
149 | 10 | cur_col++; |
150 | 10 | } |
151 | | |
152 | 4 | const auto rows = input_block->rows(); |
153 | | // Fill grouping ID to block |
154 | 4 | RETURN_IF_ERROR(add_grouping_id_column(rows, cur_col, output_columns, repeat_id_idx)); |
155 | | |
156 | 4 | DCHECK_EQ(cur_col, output_column_size); |
157 | | |
158 | 4 | return Status::OK(); |
159 | 4 | } |
160 | | |
161 | | Status RepeatLocalState::add_grouping_id_column(std::size_t rows, std::size_t& cur_col, |
162 | 6 | MutableColumns& columns, int repeat_id_idx) { |
163 | 6 | auto& p = _parent->cast<RepeatOperatorX>(); |
164 | 24 | for (auto slot_idx = 0; slot_idx < p._grouping_list.size(); slot_idx++) { |
165 | 18 | DCHECK_LT(slot_idx, p._output_slots.size()); |
166 | 18 | int64_t val = p._grouping_list[slot_idx][repeat_id_idx]; |
167 | 18 | auto* column_ptr = columns[cur_col].get(); |
168 | 18 | DCHECK(!p._output_slots[cur_col]->is_nullable()); |
169 | 18 | auto* col = assert_cast<ColumnInt64*>(column_ptr); |
170 | 18 | col->insert_many_vals(val, rows); |
171 | 18 | cur_col++; |
172 | 18 | } |
173 | 6 | return Status::OK(); |
174 | 6 | } |
175 | | |
176 | 3 | Status RepeatOperatorX::push(RuntimeState* state, Block* input_block, bool eos) const { |
177 | 3 | auto& local_state = get_local_state(state); |
178 | 3 | SCOPED_TIMER(local_state._evaluate_input_timer); |
179 | 3 | local_state._child_eos = eos; |
180 | 3 | auto& intermediate_block = local_state._intermediate_block; |
181 | 3 | auto& expr_ctxs = local_state._expr_ctxs; |
182 | 3 | DCHECK(!intermediate_block || intermediate_block->rows() == 0); |
183 | 3 | if (input_block->rows() > 0) { |
184 | 3 | SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); |
185 | 3 | intermediate_block = Block::create_unique(); |
186 | | |
187 | 5 | for (auto& expr : expr_ctxs) { |
188 | 5 | ColumnWithTypeAndName result_data; |
189 | 5 | RETURN_IF_ERROR(expr->execute(input_block, result_data)); |
190 | 5 | result_data.column = result_data.column->convert_to_full_column_if_const(); |
191 | 5 | intermediate_block->insert(result_data); |
192 | 5 | } |
193 | 3 | DCHECK_EQ(expr_ctxs.size(), intermediate_block->columns()); |
194 | 3 | } |
195 | | |
196 | 3 | return Status::OK(); |
197 | 3 | } |
198 | | |
199 | 6 | Status RepeatOperatorX::pull(doris::RuntimeState* state, Block* output_block, bool* eos) const { |
200 | 6 | auto& local_state = get_local_state(state); |
201 | 6 | auto& _repeat_id_idx = local_state._repeat_id_idx; |
202 | 6 | auto& _child_block = *local_state._child_block; |
203 | 6 | auto& _child_eos = local_state._child_eos; |
204 | 6 | auto& _intermediate_block = local_state._intermediate_block; |
205 | 6 | RETURN_IF_CANCELLED(state); |
206 | | |
207 | 6 | SCOPED_PEAK_MEM(&local_state._estimate_memory_usage); |
208 | | |
209 | 6 | DCHECK(_repeat_id_idx >= 0); |
210 | 18 | for (const std::vector<int64_t>& v : _grouping_list) { |
211 | 18 | DCHECK(_repeat_id_idx <= (int)v.size()); |
212 | 18 | } |
213 | 6 | DCHECK(output_block->rows() == 0); |
214 | | |
215 | 6 | { |
216 | 6 | SCOPED_TIMER(local_state._get_repeat_data_timer); |
217 | | // Each pull increases _repeat_id_idx by one until _repeat_id_idx equals _repeat_id_list_size |
218 | | // Then clear the data of _intermediate_block and _child_block, and set _repeat_id_idx to 0 |
219 | | // need_more_input_data will check if _child_block is empty |
220 | 6 | if (_intermediate_block && _intermediate_block->rows() > 0) { |
221 | 4 | RETURN_IF_ERROR(local_state.get_repeated_block(_intermediate_block.get(), |
222 | 4 | _repeat_id_idx, output_block)); |
223 | | |
224 | 4 | _repeat_id_idx++; |
225 | | |
226 | 4 | if (_repeat_id_idx >= _repeat_id_list_size) { |
227 | 2 | _intermediate_block->clear(); |
228 | 2 | _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); |
229 | 2 | _repeat_id_idx = 0; |
230 | 2 | } |
231 | 4 | } else if (local_state._expr_ctxs.empty()) { |
232 | 2 | auto m_block = |
233 | 2 | VectorizedUtils::build_mutable_mem_reuse_block(output_block, _output_slots); |
234 | 2 | auto rows = _child_block.rows(); |
235 | 2 | auto& columns = m_block.mutable_columns(); |
236 | | |
237 | 2 | std::size_t cur_col = 0; |
238 | 2 | RETURN_IF_ERROR( |
239 | 2 | local_state.add_grouping_id_column(rows, cur_col, columns, _repeat_id_idx)); |
240 | 2 | _repeat_id_idx++; |
241 | | |
242 | 2 | if (_repeat_id_idx >= _repeat_id_list_size) { |
243 | 1 | _intermediate_block->clear(); |
244 | 1 | _child_block.clear_column_data(_child->row_desc().num_materialized_slots()); |
245 | 1 | _repeat_id_idx = 0; |
246 | 1 | } |
247 | 2 | } |
248 | 6 | } |
249 | | |
250 | 6 | { |
251 | 6 | SCOPED_TIMER(local_state._filter_timer); |
252 | 6 | RETURN_IF_ERROR(VExprContext::filter_block(local_state._conjuncts, output_block, |
253 | 6 | output_block->columns())); |
254 | 6 | } |
255 | | |
256 | 6 | *eos = _child_eos && _child_block.rows() == 0; |
257 | 6 | local_state.reached_limit(output_block, eos); |
258 | 6 | return Status::OK(); |
259 | 6 | } |
260 | | |
261 | | } // namespace doris |