be/src/runtime/fold_constant_executor.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "runtime/fold_constant_executor.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Descriptors_types.h> |
22 | | #include <gen_cpp/Exprs_types.h> |
23 | | #include <gen_cpp/PaloInternalService_types.h> |
24 | | #include <gen_cpp/internal_service.pb.h> |
25 | | #include <gen_cpp/types.pb.h> |
26 | | #include <glog/logging.h> |
27 | | #include <stdint.h> |
28 | | |
29 | | #include <algorithm> |
30 | | #include <boost/iterator/iterator_facade.hpp> |
31 | | #include <map> |
32 | | #include <ostream> |
33 | | #include <utility> |
34 | | |
35 | | #include "common/compiler_util.h" // IWYU pragma: keep |
36 | | #include "common/signal_handler.h" |
37 | | #include "common/status.h" |
38 | | #include "core/binary_cast.hpp" |
39 | | #include "core/block/block.h" |
40 | | #include "core/block/column_with_type_and_name.h" |
41 | | #include "core/column/column.h" |
42 | | #include "core/column/column_array.h" |
43 | | #include "core/column/column_vector.h" |
44 | | #include "core/data_type/data_type_array.h" |
45 | | #include "core/data_type/data_type_number.h" |
46 | | #include "core/data_type/define_primitive_type.h" |
47 | | #include "core/data_type/primitive_type.h" |
48 | | #include "core/field.h" |
49 | | #include "core/string_ref.h" |
50 | | #include "core/types.h" |
51 | | #include "core/value/decimalv2_value.h" |
52 | | #include "core/value/large_int_value.h" |
53 | | #include "core/value/timestamptz_value.h" |
54 | | #include "core/value/vdatetime_value.h" |
55 | | #include "exprs/function/cast/cast_to_string.h" |
56 | | #include "exprs/vexpr.h" |
57 | | #include "exprs/vexpr_context.h" |
58 | | #include "runtime/descriptors.h" |
59 | | #include "runtime/exec_env.h" |
60 | | #include "runtime/memory/mem_tracker.h" |
61 | | #include "runtime/runtime_profile.h" |
62 | | #include "runtime/runtime_state.h" |
63 | | #include "runtime/thread_context.h" |
64 | | #include "util/defer_op.h" |
65 | | #include "util/uid_util.h" |
66 | | |
67 | | namespace doris { |
68 | | |
69 | | static std::unordered_set<PrimitiveType> PRIMITIVE_TYPE_SET { |
70 | | TYPE_BOOLEAN, TYPE_TINYINT, TYPE_SMALLINT, TYPE_INT, TYPE_BIGINT, |
71 | | TYPE_LARGEINT, TYPE_FLOAT, TYPE_DOUBLE, TYPE_TIMEV2, TYPE_CHAR, |
72 | | TYPE_VARCHAR, TYPE_STRING, TYPE_HLL, TYPE_BITMAP, TYPE_DATE, |
73 | | TYPE_DATETIME, TYPE_DATEV2, TYPE_DATETIMEV2, TYPE_DECIMALV2, TYPE_TIMESTAMPTZ}; |
74 | | |
75 | | Status FoldConstantExecutor::fold_constant_vexpr(const TFoldConstantParams& params, |
76 | 566 | PConstantExprResult* response) { |
77 | 566 | const auto& expr_map = params.expr_map; |
78 | 566 | auto* expr_result_map = response->mutable_expr_result_map(); |
79 | | |
80 | 566 | TQueryGlobals query_globals = params.query_globals; |
81 | 566 | _query_id = params.query_id; |
82 | 566 | LOG(INFO) << "fold_query_id: " << print_id(_query_id); |
83 | | // init |
84 | 566 | RETURN_IF_ERROR(_init(query_globals, params.query_options)); |
85 | | // only after init operation, _mem_tracker is ready |
86 | 566 | SCOPED_ATTACH_TASK(_mem_tracker); |
87 | 566 | signal::SignalTaskIdKeeper keeper(_query_id); |
88 | | |
89 | 566 | DataTypeSerDe::FormatOptions format_options = DataTypeSerDe::get_default_format_options(); |
90 | 566 | format_options.timezone = &_runtime_state->timezone_obj(); |
91 | | |
92 | 566 | for (const auto& m : expr_map) { |
93 | 566 | PExprResultMap pexpr_result_map; |
94 | 566 | for (const auto& n : m.second) { |
95 | 566 | VExprContextSPtr ctx; |
96 | 566 | const TExpr& texpr = n.second; |
97 | | // create expr tree from TExpr |
98 | 566 | RETURN_IF_ERROR(VExpr::create_expr_tree(texpr, ctx)); |
99 | | // prepare and open context |
100 | 566 | RETURN_IF_ERROR(_prepare_and_open(ctx.get())); |
101 | | |
102 | 558 | ColumnWithTypeAndName tmp_data; |
103 | | // calc vexpr |
104 | 558 | RETURN_IF_ERROR(ctx->execute_const_expr(tmp_data)); |
105 | | // covert to thrift type |
106 | 528 | const auto& res_type = ctx->root()->data_type(); |
107 | 528 | TPrimitiveType::type t_type = doris::to_thrift(res_type->get_primitive_type()); |
108 | | // collect result |
109 | 528 | PExprResult expr_result; |
110 | 528 | std::string result; |
111 | 528 | const auto& column_ptr = tmp_data.column; |
112 | 528 | const auto& column_type = tmp_data.type; |
113 | | // 4 from fe: Config.be_exec_version maybe need remove after next version, now in 2.1 |
114 | 528 | if (_runtime_state->be_exec_version() >= 4 && params.__isset.is_nereids && |
115 | 528 | params.is_nereids) { |
116 | 528 | auto* p_type_desc = expr_result.mutable_type_desc(); |
117 | 528 | auto* p_values = expr_result.mutable_result_content(); |
118 | 528 | res_type->to_protobuf(p_type_desc); |
119 | 528 | auto datatype_serde = column_type->get_serde(); |
120 | 528 | RETURN_IF_ERROR(datatype_serde->write_column_to_pb( |
121 | 528 | *column_ptr->convert_to_full_column_if_const(), *p_values, 0, 1)); |
122 | 528 | expr_result.set_success(true); |
123 | | // after refactor, this field is useless, but it's required |
124 | 528 | expr_result.set_content("ERROR"); |
125 | 528 | expr_result.mutable_type()->set_type(t_type); |
126 | 528 | pexpr_result_map.mutable_map()->insert({n.first, expr_result}); |
127 | 528 | } else { |
128 | 0 | if (column_ptr->is_null_at(0)) { |
129 | 0 | expr_result.set_success(false); |
130 | 0 | } else { |
131 | 0 | expr_result.set_success(true); |
132 | 0 | StringRef string_ref; |
133 | 0 | auto type = ctx->root()->data_type()->get_primitive_type(); |
134 | | //eg: strcut, array, map VARIANT... will not impl get_data_at, so could use column->to_string() |
135 | 0 | if (PRIMITIVE_TYPE_SET.contains(type)) { |
136 | 0 | string_ref = column_ptr->get_data_at(0); |
137 | 0 | } |
138 | 0 | RETURN_IF_ERROR(_get_result((void*)string_ref.data, string_ref.size, |
139 | 0 | ctx->root()->data_type(), column_ptr, column_type, |
140 | 0 | result, format_options)); |
141 | 0 | } |
142 | 0 | expr_result.set_content(std::move(result)); |
143 | 0 | expr_result.mutable_type()->set_type(t_type); |
144 | 0 | expr_result.mutable_type()->set_scale(res_type->get_scale()); |
145 | 0 | expr_result.mutable_type()->set_precision(res_type->get_precision()); |
146 | 0 | expr_result.mutable_type()->set_len( |
147 | 0 | res_type->get_primitive_type() == PrimitiveType::TYPE_STRING |
148 | 0 | ? assert_cast<const DataTypeString*>( |
149 | 0 | remove_nullable(res_type).get()) |
150 | 0 | ->len() |
151 | 0 | : -1); |
152 | 0 | pexpr_result_map.mutable_map()->insert({n.first, expr_result}); |
153 | 0 | } |
154 | 528 | } |
155 | 528 | expr_result_map->insert({m.first, pexpr_result_map}); |
156 | 528 | } |
157 | 528 | return Status::OK(); |
158 | 566 | } |
159 | | |
160 | | Status FoldConstantExecutor::_init(const TQueryGlobals& query_globals, |
161 | 566 | const TQueryOptions& query_options) { |
162 | | // init runtime state, runtime profile |
163 | 566 | TPlanFragmentExecParams params; |
164 | 566 | params.fragment_instance_id = _query_id; |
165 | 566 | params.query_id = _query_id; |
166 | 566 | _mem_tracker = MemTrackerLimiter::create_shared( |
167 | 566 | MemTrackerLimiter::Type::OTHER, |
168 | 566 | fmt::format("FoldConstant:query_id={}", print_id(_query_id))); |
169 | 566 | _runtime_state = RuntimeState::create_unique(params, query_options, query_globals, |
170 | 566 | ExecEnv::GetInstance(), nullptr, _mem_tracker); |
171 | 566 | DescriptorTbl* desc_tbl = nullptr; |
172 | 566 | Status status = |
173 | 566 | DescriptorTbl::create(_runtime_state->obj_pool(), TDescriptorTable(), &desc_tbl); |
174 | 566 | if (UNLIKELY(!status.ok())) { |
175 | 0 | LOG(WARNING) << "Failed to create descriptor table, msg: " << status; |
176 | 0 | return status; |
177 | 0 | } |
178 | 566 | _runtime_state->set_desc_tbl(desc_tbl); |
179 | | |
180 | 566 | _runtime_profile = _runtime_state->runtime_profile(); |
181 | 566 | _runtime_profile->set_name("FoldConstantExpr"); |
182 | | |
183 | 566 | return Status::OK(); |
184 | 566 | } |
185 | | |
186 | | template <typename Context> |
187 | 566 | Status FoldConstantExecutor::_prepare_and_open(Context* ctx) { |
188 | 566 | RETURN_IF_ERROR(ctx->prepare(_runtime_state.get(), RowDescriptor())); |
189 | 566 | return ctx->open(_runtime_state.get()); |
190 | 566 | } |
191 | | |
192 | | Status FoldConstantExecutor::_get_result(void* src, size_t size, const DataTypePtr& type, |
193 | | const ColumnPtr column_ptr, const DataTypePtr column_type, |
194 | | std::string& result, |
195 | 0 | const DataTypeSerDe::FormatOptions& options) { |
196 | 0 | switch (type->get_primitive_type()) { |
197 | 0 | case TYPE_BOOLEAN: { |
198 | 0 | bool val = *reinterpret_cast<const bool*>(src); |
199 | 0 | result = val ? "true" : "false"; |
200 | 0 | break; |
201 | 0 | } |
202 | 0 | case TYPE_TINYINT: { |
203 | 0 | int8_t val = *reinterpret_cast<const int8_t*>(src); |
204 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
205 | 0 | break; |
206 | 0 | } |
207 | 0 | case TYPE_SMALLINT: { |
208 | 0 | int16_t val = *reinterpret_cast<const int16_t*>(src); |
209 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
210 | 0 | break; |
211 | 0 | } |
212 | 0 | case TYPE_INT: { |
213 | 0 | int32_t val = *reinterpret_cast<const int32_t*>(src); |
214 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
215 | 0 | break; |
216 | 0 | } |
217 | 0 | case TYPE_BIGINT: { |
218 | 0 | int64_t val = *reinterpret_cast<const int64_t*>(src); |
219 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
220 | 0 | break; |
221 | 0 | } |
222 | 0 | case TYPE_LARGEINT: { |
223 | 0 | result = LargeIntValue::to_string(*reinterpret_cast<__int128*>(src)); |
224 | 0 | break; |
225 | 0 | } |
226 | 0 | case TYPE_FLOAT: { |
227 | 0 | float val = *reinterpret_cast<const float*>(src); |
228 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
229 | 0 | break; |
230 | 0 | } |
231 | 0 | case TYPE_DOUBLE: { |
232 | 0 | double val = *reinterpret_cast<double*>(src); |
233 | 0 | result = fmt::format(FMT_COMPILE("{}"), val); |
234 | 0 | break; |
235 | 0 | } |
236 | 0 | case TYPE_TIMEV2: { |
237 | 0 | constexpr static auto ratio_to_time = (1000 * 1000); |
238 | 0 | double val = *reinterpret_cast<double*>(src); |
239 | 0 | result = fmt::format(FMT_COMPILE("{}"), val / ratio_to_time); |
240 | 0 | break; |
241 | 0 | } |
242 | 0 | case TYPE_CHAR: |
243 | 0 | case TYPE_VARCHAR: |
244 | 0 | case TYPE_STRING: |
245 | 0 | case TYPE_HLL: |
246 | 0 | case TYPE_BITMAP: { |
247 | 0 | result = std::string((char*)src, size); |
248 | 0 | break; |
249 | 0 | } |
250 | 0 | case TYPE_DATE: |
251 | 0 | case TYPE_DATETIME: { |
252 | 0 | auto* date_value = reinterpret_cast<VecDateTimeValue*>(src); |
253 | 0 | result = CastToString::from_date_or_datetime(*date_value); |
254 | 0 | break; |
255 | 0 | } |
256 | 0 | case TYPE_DATEV2: { |
257 | 0 | DateV2Value<DateV2ValueType> value = |
258 | 0 | binary_cast<uint32_t, DateV2Value<DateV2ValueType>>(*(int32_t*)src); |
259 | 0 | result = CastToString::from_datev2(value); |
260 | 0 | break; |
261 | 0 | } |
262 | 0 | case TYPE_DATETIMEV2: { |
263 | 0 | DateV2Value<DateTimeV2ValueType> value = |
264 | 0 | binary_cast<uint64_t, DateV2Value<DateTimeV2ValueType>>(*(int64_t*)src); |
265 | 0 | result = CastToString::from_datetimev2(value, type->get_scale()); |
266 | 0 | break; |
267 | 0 | } |
268 | 0 | case TYPE_TIMESTAMPTZ: { |
269 | 0 | auto value = binary_cast<uint64_t, TimestampTzValue>(*(int64_t*)src); |
270 | 0 | result = CastToString::from_timestamptz(value, type->get_scale(), options.timezone); |
271 | 0 | break; |
272 | 0 | } |
273 | 0 | case TYPE_DECIMALV2: { |
274 | 0 | result = reinterpret_cast<DecimalV2Value*>(src)->to_string(type->get_scale()); |
275 | 0 | break; |
276 | 0 | } |
277 | 0 | case TYPE_DECIMAL32: |
278 | 0 | case TYPE_DECIMAL64: |
279 | 0 | case TYPE_DECIMAL128I: |
280 | 0 | case TYPE_DECIMAL256: |
281 | 0 | case TYPE_ARRAY: |
282 | 0 | case TYPE_JSONB: |
283 | 0 | case TYPE_MAP: |
284 | 0 | case TYPE_STRUCT: |
285 | 0 | case TYPE_VARIANT: |
286 | 0 | case TYPE_QUANTILE_STATE: |
287 | 0 | case TYPE_IPV4: |
288 | 0 | case TYPE_IPV6: { |
289 | 0 | result = column_type->to_string(*column_ptr, 0, options); |
290 | 0 | break; |
291 | 0 | } |
292 | 0 | default: |
293 | 0 | auto error_msg = |
294 | 0 | fmt::format("Type not implemented:{} need check it, and exec_query_id is: {}.", |
295 | 0 | type->get_name(), query_id_string()); |
296 | 0 | return Status::InternalError(error_msg); |
297 | 0 | } |
298 | 0 | return Status::OK(); |
299 | 0 | } |
300 | | |
301 | | } // namespace doris |