be/src/exprs/function/function_utility.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | #include <stddef.h> |
18 | | |
19 | | #include <boost/iterator/iterator_facade.hpp> |
20 | | // IWYU pragma: no_include <bits/chrono.h> |
21 | | #include <algorithm> |
22 | | #include <chrono> // IWYU pragma: keep |
23 | | #include <cmath> |
24 | | #include <limits> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | #include <thread> |
28 | | #include <utility> |
29 | | |
30 | | #include "common/status.h" |
31 | | #include "core/assert_cast.h" |
32 | | #include "core/block/block.h" |
33 | | #include "core/block/column_numbers.h" |
34 | | #include "core/block/column_with_type_and_name.h" |
35 | | #include "core/block/columns_with_type_and_name.h" |
36 | | #include "core/column/column.h" |
37 | | #include "core/column/column_const.h" |
38 | | #include "core/column/column_nullable.h" |
39 | | #include "core/column/column_string.h" |
40 | | #include "core/column/column_vector.h" |
41 | | #include "core/data_type/data_type.h" |
42 | | #include "core/data_type/data_type_nullable.h" |
43 | | #include "core/data_type/data_type_number.h" |
44 | | #include "core/data_type/data_type_string.h" |
45 | | #include "core/types.h" |
46 | | #include "exprs/aggregate/aggregate_function.h" |
47 | | #include "exprs/function/function.h" |
48 | | #include "exprs/function/simple_function_factory.h" |
49 | | |
50 | | namespace doris { |
51 | | class FunctionContext; |
52 | | } // namespace doris |
53 | | |
54 | | namespace doris { |
55 | | |
56 | | class FunctionSleep : public IFunction { |
57 | | public: |
58 | | static constexpr auto name = "sleep"; |
59 | 2 | static FunctionPtr create() { return std::make_shared<FunctionSleep>(); } |
60 | | |
61 | 1 | String get_name() const override { return name; } |
62 | | |
63 | 0 | size_t get_number_of_arguments() const override { return 1; } |
64 | | |
65 | 0 | DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
66 | 0 | if (arguments[0]->is_nullable()) { |
67 | 0 | return make_nullable(std::make_shared<DataTypeUInt8>()); |
68 | 0 | } |
69 | 0 | return std::make_shared<DataTypeUInt8>(); |
70 | 0 | } |
71 | | |
72 | 0 | bool use_default_implementation_for_nulls() const override { return false; } |
73 | | |
74 | | // Sleep function should not be executed during open stage, this will makes fragment prepare |
75 | | // waiting too long, so we do not use default impl. |
76 | 0 | bool use_default_implementation_for_constants() const override { return false; } |
77 | | |
78 | | Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
79 | 0 | uint32_t result, size_t input_rows_count) const override { |
80 | 0 | const auto& argument_column = |
81 | 0 | block.get_by_position(arguments[0]).column->convert_to_full_column_if_const(); |
82 | |
|
83 | 0 | auto res_column = ColumnUInt8::create(input_rows_count, 0); |
84 | |
|
85 | 0 | if (auto* nullable_column = check_and_get_column<ColumnNullable>(*argument_column)) { |
86 | 0 | auto null_map_column = ColumnUInt8::create(); |
87 | |
|
88 | 0 | auto nested_column = nullable_column->get_nested_column_ptr(); |
89 | 0 | auto data_column = assert_cast<const ColumnInt32*>(nested_column.get()); |
90 | |
|
91 | 0 | for (int i = 0; i < input_rows_count; i++) { |
92 | 0 | if (nullable_column->is_null_at(i)) { |
93 | 0 | null_map_column->insert(Field::create_field<TYPE_BOOLEAN>(1)); |
94 | 0 | } else { |
95 | 0 | int seconds = data_column->get_data()[i]; |
96 | 0 | std::this_thread::sleep_for(std::chrono::seconds(seconds)); |
97 | 0 | null_map_column->insert(Field::create_field<TYPE_BOOLEAN>(0)); |
98 | 0 | } |
99 | 0 | } |
100 | |
|
101 | 0 | block.replace_by_position(result, ColumnNullable::create(std::move(res_column), |
102 | 0 | std::move(null_map_column))); |
103 | 0 | } else { |
104 | 0 | auto data_column = assert_cast<const ColumnInt32*>(argument_column.get()); |
105 | |
|
106 | 0 | for (int i = 0; i < input_rows_count; i++) { |
107 | 0 | int seconds = data_column->get_element(i); |
108 | 0 | std::this_thread::sleep_for(std::chrono::seconds(seconds)); |
109 | 0 | } |
110 | |
|
111 | 0 | block.replace_by_position(result, std::move(res_column)); |
112 | 0 | } |
113 | 0 | return Status::OK(); |
114 | 0 | } |
115 | | }; |
116 | | |
117 | | class FunctionVersion : public IFunction { |
118 | | public: |
119 | | static constexpr auto name = "version"; |
120 | | |
121 | | static const std::string version; |
122 | | |
123 | 2 | static FunctionPtr create() { return std::make_shared<FunctionVersion>(); } |
124 | | |
125 | 1 | String get_name() const override { return name; } |
126 | | |
127 | 0 | size_t get_number_of_arguments() const override { return 0; } |
128 | | |
129 | 0 | DataTypePtr get_return_type_impl(const ColumnsWithTypeAndName& arguments) const override { |
130 | 0 | return std::make_shared<DataTypeString>(); |
131 | 0 | } |
132 | | |
133 | | Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
134 | 0 | uint32_t result, size_t input_rows_count) const override { |
135 | 0 | auto res_column = ColumnString::create(); |
136 | 0 | res_column->insert_data(version.c_str(), version.length()); |
137 | 0 | auto col_const = ColumnConst::create(std::move(res_column), input_rows_count); |
138 | 0 | block.replace_by_position(result, std::move(col_const)); |
139 | 0 | return Status::OK(); |
140 | 0 | } |
141 | | }; |
142 | | |
143 | | class FunctionHumanReadableSeconds : public IFunction { |
144 | | public: |
145 | | static constexpr auto name = "human_readable_seconds"; |
146 | | |
147 | 20 | static FunctionPtr create() { return std::make_shared<FunctionHumanReadableSeconds>(); } |
148 | | |
149 | 1 | String get_name() const override { return name; } |
150 | | |
151 | 18 | size_t get_number_of_arguments() const override { return 1; } |
152 | | |
153 | 18 | DataTypePtr get_return_type_impl(const DataTypes& /*arguments*/) const override { |
154 | 18 | return std::make_shared<DataTypeString>(); |
155 | 18 | } |
156 | | |
157 | | Status execute_impl(FunctionContext* /*context*/, Block& block, const ColumnNumbers& arguments, |
158 | 17 | uint32_t result, size_t input_rows_count) const override { |
159 | 17 | const auto& argument_column = block.get_by_position(arguments[0]).column; |
160 | 17 | const auto* data_column = check_and_get_column<ColumnFloat64>(*argument_column); |
161 | 17 | if (data_column == nullptr) { |
162 | 0 | return Status::InvalidArgument("Illegal column {} of first argument of function {}", |
163 | 0 | argument_column->get_name(), name); |
164 | 0 | } |
165 | | |
166 | 17 | auto result_column = ColumnString::create(); |
167 | 17 | result_column->reserve(input_rows_count); |
168 | 17 | std::string buffer; |
169 | 50 | for (size_t i = 0; i < input_rows_count; ++i) { |
170 | 33 | double value = data_column->get_element(i); |
171 | 33 | if (std::isnan(value) || std::isinf(value)) { |
172 | 0 | return Status::InvalidArgument("Invalid argument value {} for function {}", value, |
173 | 0 | name); |
174 | 0 | } |
175 | 33 | buffer.clear(); |
176 | 33 | to_human_readable(value, buffer); |
177 | 33 | result_column->insert_data(buffer.data(), buffer.size()); |
178 | 33 | } |
179 | | |
180 | 17 | block.replace_by_position(result, std::move(result_column)); |
181 | 17 | return Status::OK(); |
182 | 17 | } |
183 | | |
184 | | private: |
185 | | static void append_unit(std::string& out, int64_t value, const char* singular, |
186 | 69 | const char* plural) { |
187 | 69 | if (!out.empty()) { |
188 | 36 | out += ", "; |
189 | 36 | } |
190 | 69 | out += std::to_string(value); |
191 | 69 | out += ' '; |
192 | 69 | out += (value == 1 ? singular : plural); |
193 | 69 | } |
194 | | |
195 | 33 | static void to_human_readable(double seconds, std::string& out) { |
196 | | // Match Presto/Trino: round to whole seconds and ignore the sign. |
197 | | // Saturate at int64_t max for very large finite inputs. This both matches the |
198 | | // FE constant-folding path (Java Math.round saturates to Long.MAX_VALUE) and |
199 | | // avoids std::llround's domain error when the rounded value exceeds int64_t. |
200 | 33 | double abs_seconds = std::fabs(seconds); |
201 | 33 | int64_t remain; |
202 | 33 | if (abs_seconds >= static_cast<double>(std::numeric_limits<int64_t>::max())) { |
203 | 4 | remain = std::numeric_limits<int64_t>::max(); |
204 | 29 | } else { |
205 | 29 | remain = std::llround(abs_seconds); |
206 | 29 | } |
207 | | |
208 | 33 | constexpr int64_t WEEK = 7 * 24 * 60 * 60; |
209 | 33 | constexpr int64_t DAY = 24 * 60 * 60; |
210 | 33 | constexpr int64_t HOUR = 60 * 60; |
211 | 33 | constexpr int64_t MINUTE = 60; |
212 | | |
213 | 33 | const int64_t weeks = remain / WEEK; |
214 | 33 | remain %= WEEK; |
215 | 33 | const int64_t days = remain / DAY; |
216 | 33 | remain %= DAY; |
217 | 33 | const int64_t hours = remain / HOUR; |
218 | 33 | remain %= HOUR; |
219 | 33 | const int64_t minutes = remain / MINUTE; |
220 | 33 | const int64_t secs = remain % MINUTE; |
221 | | |
222 | 33 | if (weeks > 0) { |
223 | 8 | append_unit(out, weeks, "week", "weeks"); |
224 | 8 | } |
225 | 33 | if (days > 0) { |
226 | 6 | append_unit(out, days, "day", "days"); |
227 | 6 | } |
228 | 33 | if (hours > 0) { |
229 | 10 | append_unit(out, hours, "hour", "hours"); |
230 | 10 | } |
231 | 33 | if (minutes > 0) { |
232 | 16 | append_unit(out, minutes, "minute", "minutes"); |
233 | 16 | } |
234 | 33 | if (secs > 0 || out.empty()) { |
235 | 29 | append_unit(out, secs, "second", "seconds"); |
236 | 29 | } |
237 | 33 | } |
238 | | }; |
239 | | |
240 | | const std::string FunctionVersion::version = "5.7.99"; |
241 | | |
242 | 1 | void register_function_utility(SimpleFunctionFactory& factory) { |
243 | 1 | factory.register_function<FunctionSleep>(); |
244 | 1 | factory.register_function<FunctionVersion>(); |
245 | 1 | factory.register_function<FunctionHumanReadableSeconds>(); |
246 | 1 | } |
247 | | |
248 | | } // namespace doris |