be/src/exprs/function/function_variant_element.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
#include <glog/logging.h>
#include <stddef.h>

#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_variant.h"
#include "core/column/subcolumn_tree.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nothing.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_variant.h"
#include "core/string_ref.h"
#include "exprs/function/function.h"
#include "exprs/function/function_helpers.h"
#include "exprs/function/simple_function_factory.h"
#include "exprs/json_functions.h"
#include "simdjson.h"
#include "util/defer_op.h"
#include "util/json/path_in_data.h"
49 | | |
50 | | namespace doris { |
51 | | |
52 | | class FunctionVariantElement : public IFunction { |
53 | | public: |
54 | | static constexpr auto name = "element_at"; |
55 | 142 | static FunctionPtr create() { return std::make_shared<FunctionVariantElement>(); } |
56 | | |
57 | | // Get function name. |
58 | 1 | String get_name() const override { return name; } |
59 | | |
60 | 272 | bool use_default_implementation_for_nulls() const override { return false; } |
61 | | |
62 | 133 | size_t get_number_of_arguments() const override { return 2; } |
63 | | |
64 | 139 | ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; } |
65 | | |
66 | 8 | DataTypes get_variadic_argument_types_impl() const override { |
67 | 8 | return {std::make_shared<DataTypeVariant>(), std::make_shared<DataTypeString>()}; |
68 | 8 | } |
69 | | |
70 | 133 | DataTypePtr get_return_type_impl(const DataTypes& arguments) const override { |
71 | 133 | DCHECK_EQ(arguments[0]->get_primitive_type(), TYPE_VARIANT) |
72 | 0 | << "First argument for function: " << name |
73 | 0 | << " should be DataTypeVariant but it has type " << arguments[0]->get_name() << "."; |
74 | 133 | DCHECK(is_string_type(arguments[1]->get_primitive_type())) |
75 | 0 | << "Second argument for function: " << name << " should be String but it has type " |
76 | 0 | << arguments[1]->get_name() << "."; |
77 | 133 | auto arg_variant = remove_nullable(arguments[0]); |
78 | 133 | const auto& data_type_object = assert_cast<const DataTypeVariant&>(*arg_variant); |
79 | 133 | return make_nullable( |
80 | 133 | std::make_shared<DataTypeVariant>(data_type_object.variant_max_subcolumns_count())); |
81 | 133 | } |
82 | | |
83 | | // wrap variant column with nullable |
84 | | // 1. if variant is null root(empty or nothing as root), then nullable map is all null |
85 | | // 2. if variant is scalar variant, then use the root's nullable map |
86 | | // 3. if variant is hierarchical variant, then create a nullable map with all none null |
87 | 139 | ColumnPtr wrap_variant_nullable(ColumnPtr col) const { |
88 | 139 | const auto& var = assert_cast<const ColumnVariant&>(*col); |
89 | 139 | if (var.is_null_root()) { |
90 | 20 | return make_nullable(col, true); |
91 | 20 | } |
92 | 119 | if (var.is_scalar_variant() && var.get_root()->is_nullable()) { |
93 | 44 | const auto* nullable = assert_cast<const ColumnNullable*>(var.get_root().get()); |
94 | 44 | return ColumnNullable::create( |
95 | 44 | col, nullable->get_null_map_column_ptr()->clone_resized(col->size())); |
96 | 44 | } |
97 | 75 | return make_nullable(col); |
98 | 119 | } |
99 | | |
100 | | Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments, |
101 | 139 | uint32_t result, size_t input_rows_count) const override { |
102 | 139 | const auto* variant_col = check_and_get_column<ColumnVariant>( |
103 | 139 | remove_nullable(block.get_by_position(arguments[0]).column).get()); |
104 | 139 | if (!variant_col) { |
105 | 0 | return Status::RuntimeError( |
106 | 0 | fmt::format("unsupported types for function {}({}, {})", get_name(), |
107 | 0 | block.get_by_position(arguments[0]).type->get_name(), |
108 | 0 | block.get_by_position(arguments[1]).type->get_name())); |
109 | 0 | } |
110 | 139 | if (block.empty()) { |
111 | 0 | block.replace_by_position(result, block.get_by_position(result).type->create_column()); |
112 | 0 | return Status::OK(); |
113 | 0 | } |
114 | | |
115 | 139 | auto index_column = block.get_by_position(arguments[1]).column; |
116 | 139 | ColumnPtr result_column; |
117 | 139 | RETURN_IF_ERROR(get_element_column(*variant_col, index_column, &result_column)); |
118 | 139 | if (block.get_by_position(result).type->is_nullable()) { |
119 | 139 | result_column = wrap_variant_nullable(result_column); |
120 | 139 | } |
121 | 139 | block.replace_by_position(result, result_column); |
122 | 139 | return Status::OK(); |
123 | 139 | } |
124 | | |
125 | | private: |
126 | | // Return sub-path by specified prefix. |
127 | | // For example, for prefix a.b: |
128 | | // a.b.c.d -> c.d, a.b.c -> c |
129 | | static std::optional<std::string_view> get_sub_path(const std::string_view& path, |
130 | 19 | const std::string_view& prefix) { |
131 | 19 | if (path.size() <= prefix.size() || path[prefix.size()] != '.') { |
132 | 16 | return std::nullopt; |
133 | 16 | } |
134 | 3 | return path.substr(prefix.size() + 1); |
135 | 19 | } |
136 | | |
137 | | // Extract and populate sparse column data with given path prefix |
138 | | // Copies data from source sparse column, extracting only the sub-paths that match the prefix |
139 | | static void _extract_sparse_column_from_source(ColumnVariant* src_ptr, const PathInData& path, |
140 | 10 | ColumnVariant::MutablePtr& target_ptr) { |
141 | 10 | ColumnVariant::Subcolumn root {0, true, true}; |
142 | | // no root, no sparse column |
143 | 10 | const auto& sparse_data_map = assert_cast<const ColumnMap&>(*src_ptr->get_sparse_column()); |
144 | 10 | const auto& src_sparse_data_offsets = sparse_data_map.get_offsets(); |
145 | 10 | const auto& src_sparse_data_paths = |
146 | 10 | assert_cast<const ColumnString&>(sparse_data_map.get_keys()); |
147 | 10 | const auto& src_sparse_data_values = |
148 | 10 | assert_cast<const ColumnString&>(sparse_data_map.get_values()); |
149 | 10 | auto& sparse_data_offsets = |
150 | 10 | assert_cast<ColumnMap&>(*target_ptr->get_sparse_column()->assume_mutable()) |
151 | 10 | .get_offsets(); |
152 | 10 | auto [sparse_data_paths, sparse_data_values] = |
153 | 10 | target_ptr->get_sparse_data_paths_and_values(); |
154 | 10 | StringRef prefix_ref(path.get_path()); |
155 | 10 | std::string_view path_prefix(prefix_ref.data, prefix_ref.size); |
156 | 124 | for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) { |
157 | 114 | size_t start = src_sparse_data_offsets[ssize_t(i) - 1]; |
158 | 114 | size_t end = src_sparse_data_offsets[ssize_t(i)]; |
159 | 114 | size_t lower_bound_index = ColumnVariant::find_path_lower_bound_in_sparse_data( |
160 | 114 | prefix_ref, src_sparse_data_paths, start, end); |
161 | 171 | for (; lower_bound_index != end; ++lower_bound_index) { |
162 | 70 | auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index); |
163 | 70 | std::string_view nested_path(path_ref.data, path_ref.size); |
164 | 70 | if (!nested_path.starts_with(path_prefix)) { |
165 | 13 | break; |
166 | 13 | } |
167 | | // Don't include path that is equal to the prefix. |
168 | 57 | if (nested_path.size() != path_prefix.size()) { |
169 | 18 | auto sub_path_optional = get_sub_path(nested_path, path_prefix); |
170 | 18 | if (!sub_path_optional.has_value()) { |
171 | 16 | continue; |
172 | 16 | } |
173 | 2 | std::string_view sub_path = *sub_path_optional; |
174 | 2 | sparse_data_paths->insert_data(sub_path.data(), sub_path.size()); |
175 | 2 | sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index); |
176 | 39 | } else { |
177 | | // insert into root column, example: access v['b'] and b is in sparse column |
178 | | // data example: |
179 | | // {"b" : 123} |
180 | | // {"b" : {"c" : 456}} |
181 | | // b maybe in sparse column, and b.c is in subolumn, put `b` into root column to distinguish |
182 | | // from "" which is empty path and root |
183 | 39 | root.deserialize_from_binary_column(&src_sparse_data_values, lower_bound_index); |
184 | 39 | } |
185 | 57 | } |
186 | 114 | if (root.size() == sparse_data_offsets.size()) { |
187 | 75 | root.insert_default(); |
188 | 75 | } |
189 | 114 | sparse_data_offsets.push_back(sparse_data_paths->size()); |
190 | 114 | } |
191 | 10 | target_ptr->get_subcolumns().create_root(root); |
192 | 10 | target_ptr->get_doc_value_column()->assume_mutable()->resize(src_ptr->size()); |
193 | 10 | target_ptr->set_num_rows(src_ptr->size()); |
194 | 10 | } |
195 | | |
196 | | // Extract and populate sparse column data from doc_value column with given path prefix |
197 | | // Copies data from source doc_value column, extracting only the sub-paths that match the prefix |
198 | | static void _extract_doc_value_column_from_source(ColumnVariant* src_ptr, |
199 | | const PathInData& path, |
200 | 48 | ColumnVariant::MutablePtr& target_ptr) { |
201 | 48 | ColumnVariant::Subcolumn root {0, true, true}; |
202 | | // no root, no sparse column |
203 | 48 | const auto& doc_value_data_map = |
204 | 48 | assert_cast<const ColumnMap&>(*src_ptr->get_doc_value_column()); |
205 | 48 | const auto& src_doc_value_data_offsets = doc_value_data_map.get_offsets(); |
206 | 48 | const auto& src_doc_value_data_paths = |
207 | 48 | assert_cast<const ColumnString&>(doc_value_data_map.get_keys()); |
208 | 48 | const auto& src_doc_value_data_values = |
209 | 48 | assert_cast<const ColumnString&>(doc_value_data_map.get_values()); |
210 | 48 | auto& sparse_data_offsets = |
211 | 48 | assert_cast<ColumnMap&>(*target_ptr->get_sparse_column()->assume_mutable()) |
212 | 48 | .get_offsets(); |
213 | 48 | auto [sparse_data_paths, sparse_data_values] = |
214 | 48 | target_ptr->get_sparse_data_paths_and_values(); |
215 | 48 | StringRef prefix_ref(path.get_path()); |
216 | 48 | std::string_view path_prefix(prefix_ref.data, prefix_ref.size); |
217 | 96 | for (size_t i = 0; i != src_doc_value_data_offsets.size(); ++i) { |
218 | 48 | size_t start = src_doc_value_data_offsets[ssize_t(i) - 1]; |
219 | 48 | size_t end = src_doc_value_data_offsets[ssize_t(i)]; |
220 | 48 | size_t lower_bound_index = ColumnVariant::find_path_lower_bound_in_sparse_data( |
221 | 48 | prefix_ref, src_doc_value_data_paths, start, end); |
222 | 77 | for (; lower_bound_index != end; ++lower_bound_index) { |
223 | 60 | auto path_ref = src_doc_value_data_paths.get_data_at(lower_bound_index); |
224 | 60 | std::string_view nested_path(path_ref.data, path_ref.size); |
225 | 60 | if (!nested_path.starts_with(path_prefix)) { |
226 | 31 | break; |
227 | 31 | } |
228 | | // Don't include path that is equal to the prefix. |
229 | 29 | if (nested_path.size() != path_prefix.size()) { |
230 | 1 | auto sub_path_optional = get_sub_path(nested_path, path_prefix); |
231 | 1 | if (!sub_path_optional.has_value()) { |
232 | 0 | continue; |
233 | 0 | } |
234 | 1 | std::string_view sub_path = *sub_path_optional; |
235 | 1 | sparse_data_paths->insert_data(sub_path.data(), sub_path.size()); |
236 | 1 | sparse_data_values->insert_from(src_doc_value_data_values, lower_bound_index); |
237 | 28 | } else { |
238 | | // insert into root column, example: access v['b'] and b is in sparse column |
239 | | // data example: |
240 | | // {"b" : 123} |
241 | | // {"b" : {"c" : 456}} |
242 | | // b maybe in sparse column, and b.c is in subolumn, put `b` into root column to distinguish |
243 | | // from "" which is empty path and root |
244 | 28 | root.deserialize_from_binary_column(&src_doc_value_data_values, |
245 | 28 | lower_bound_index); |
246 | 28 | } |
247 | 29 | } |
248 | 48 | if (root.size() == sparse_data_offsets.size()) { |
249 | 20 | root.insert_default(); |
250 | 20 | } |
251 | 48 | sparse_data_offsets.push_back(sparse_data_paths->size()); |
252 | 48 | } |
253 | 48 | target_ptr->get_subcolumns().create_root(root); |
254 | 48 | target_ptr->get_doc_value_column()->assume_mutable()->resize(src_ptr->size()); |
255 | 48 | target_ptr->set_num_rows(src_ptr->size()); |
256 | 48 | } |
257 | | |
258 | | static Status get_element_column(const ColumnVariant& src, const ColumnPtr& index_column, |
259 | 140 | ColumnPtr* result) { |
260 | 140 | std::string field_name = index_column->get_data_at(0).to_string(); |
261 | 140 | if (src.empty()) { |
262 | 0 | *result = ColumnVariant::create(src.max_subcolumns_count()); |
263 | | // src subcolumns empty but src row count may not be 0 |
264 | 0 | (*result)->assume_mutable()->insert_many_defaults(src.size()); |
265 | | // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure |
266 | 0 | (*result)->assume_mutable()->finalize(); |
267 | 0 | return Status::OK(); |
268 | 0 | } |
269 | 140 | if (src.is_scalar_variant() && is_string_type(src.get_root_type()->get_primitive_type())) { |
270 | | // use parser to extract from root |
271 | 75 | auto type = std::make_shared<DataTypeString>(); |
272 | 75 | MutableColumnPtr result_column = type->create_column(); |
273 | 75 | const ColumnString& docs = |
274 | 75 | *check_and_get_column<ColumnString>(remove_nullable(src.get_root()).get()); |
275 | 75 | simdjson::ondemand::parser parser; |
276 | 75 | std::vector<JsonPath> parsed_paths; |
277 | 75 | if (field_name.empty() || field_name[0] != '$') { |
278 | 75 | field_name = "$." + field_name; |
279 | 75 | } |
280 | 75 | JsonFunctions::parse_json_paths(field_name, &parsed_paths); |
281 | 75 | ColumnString* col_str = assert_cast<ColumnString*>(result_column.get()); |
282 | 151 | for (size_t i = 0; i < docs.size(); ++i) { |
283 | 76 | if (!extract_from_document(parser, docs.get_data_at(i), parsed_paths, col_str)) { |
284 | 15 | VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) << ", field " |
285 | 0 | << field_name; |
286 | 15 | result_column->insert_default(); |
287 | 15 | } |
288 | 76 | } |
289 | 75 | *result = ColumnVariant::create(src.max_subcolumns_count(), type, |
290 | 75 | std::move(result_column)); |
291 | 75 | (*result)->assume_mutable()->finalize(); |
292 | 75 | return Status::OK(); |
293 | 75 | } else { |
294 | 65 | auto mutable_src = src.clone_finalized(); |
295 | 65 | auto* mutable_ptr = assert_cast<ColumnVariant*>(mutable_src.get()); |
296 | 65 | PathInData path(field_name); |
297 | 65 | ColumnVariant::Subcolumns subcolumns = mutable_ptr->get_subcolumns(); |
298 | 65 | const auto* node = subcolumns.find_exact(path); |
299 | 65 | MutableColumnPtr result_col = ColumnVariant::create(src.max_subcolumns_count()); |
300 | 65 | ColumnVariant::Subcolumns new_subcolumns; |
301 | | |
302 | 65 | if (node != nullptr) { |
303 | 7 | std::vector<decltype(node)> nodes; |
304 | 7 | PathsInData paths; |
305 | 7 | ColumnVariant::Subcolumns::get_leaves_of_node(node, nodes, paths); |
306 | 7 | for (const auto* n : nodes) { |
307 | 7 | PathInData new_path = n->path.copy_pop_front(); |
308 | 7 | VLOG_DEBUG << "add node " << new_path.get_path() |
309 | 0 | << ", data size: " << n->data.size() |
310 | 0 | << ", finalized size: " << n->data.get_finalized_column().size() |
311 | 0 | << ", common type: " << n->data.get_least_common_type()->get_name(); |
312 | | // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add |
313 | 7 | if (!new_subcolumns.add(new_path, n->data)) { |
314 | 7 | VLOG_DEBUG << "failed to add node " << new_path.get_path(); |
315 | 7 | } |
316 | 7 | } |
317 | | |
318 | | // handle the root node |
319 | 7 | if (new_subcolumns.empty() && !nodes.empty()) { |
320 | 7 | CHECK_EQ(nodes.size(), 1); |
321 | 7 | new_subcolumns.create_root(ColumnVariant::Subcolumn { |
322 | 7 | nodes[0]->data.get_finalized_column_ptr()->assume_mutable(), |
323 | 7 | nodes[0]->data.get_least_common_type(), true, true}); |
324 | 7 | auto container = ColumnVariant::create(src.max_subcolumns_count(), |
325 | 7 | std::move(new_subcolumns)); |
326 | 7 | result_col->insert_range_from(*container, 0, container->size()); |
327 | 7 | } else { |
328 | 0 | auto container = ColumnVariant::create(src.max_subcolumns_count(), |
329 | 0 | std::move(new_subcolumns)); |
330 | 0 | container->clear_sparse_column(); |
331 | 0 | _extract_sparse_column_from_source(mutable_ptr, path, container); |
332 | 0 | result_col->insert_range_from(*container, 0, container->size()); |
333 | 0 | } |
334 | 58 | } else { |
335 | 58 | auto container = ColumnVariant::create(src.max_subcolumns_count(), |
336 | 58 | std::move(new_subcolumns)); |
337 | 58 | const auto& sparse_offsets = mutable_ptr->serialized_sparse_column_offsets(); |
338 | 58 | if (sparse_offsets.back() == sparse_offsets[-1]) { |
339 | 48 | _extract_doc_value_column_from_source(mutable_ptr, path, container); |
340 | 48 | } else { |
341 | 10 | _extract_sparse_column_from_source(mutable_ptr, path, container); |
342 | 10 | } |
343 | 58 | result_col->insert_range_from(*container, 0, container->size()); |
344 | 58 | } |
345 | 65 | *result = result_col->get_ptr(); |
346 | | // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure |
347 | 65 | (*result)->assume_mutable()->finalize(); |
348 | 65 | VLOG_DEBUG << "dump new object " |
349 | 0 | << static_cast<const ColumnVariant*>(result_col.get())->debug_string() |
350 | 0 | << ", path " << path.get_path(); |
351 | 65 | return Status::OK(); |
352 | 65 | } |
353 | 140 | } |
354 | | |
355 | | static Status extract_from_document(simdjson::ondemand::parser& parser, const StringRef& doc, |
356 | 76 | const std::vector<JsonPath>& paths, ColumnString* column) { |
357 | 76 | try { |
358 | 76 | simdjson::padded_string json_str {doc.data, doc.size}; |
359 | 76 | simdjson::ondemand::document document = parser.iterate(json_str); |
360 | 76 | simdjson::ondemand::object object = document.get_object(); |
361 | 76 | simdjson::ondemand::value value; |
362 | 76 | RETURN_IF_ERROR(JsonFunctions::extract_from_object(object, paths, &value)); |
363 | 66 | _write_data_to_column(value, column); |
364 | 66 | } catch (simdjson::simdjson_error& e) { |
365 | 5 | VLOG_DEBUG << "simdjson parse exception: " << e.what(); |
366 | 5 | return Status::DataQualityError("simdjson parse exception {}", e.what()); |
367 | 5 | } |
368 | 61 | return Status::OK(); |
369 | 76 | } |
370 | | |
371 | 61 | static void _write_data_to_column(simdjson::ondemand::value& value, ColumnString* column) { |
372 | 61 | switch (value.type()) { |
373 | 0 | case simdjson::ondemand::json_type::null: { |
374 | 0 | column->insert_default(); |
375 | 0 | break; |
376 | 0 | } |
377 | 0 | case simdjson::ondemand::json_type::boolean: { |
378 | 0 | if (value.get_bool()) { |
379 | 0 | column->insert_data("1", 1); |
380 | 0 | } else { |
381 | 0 | column->insert_data("0", 1); |
382 | 0 | } |
383 | 0 | break; |
384 | 0 | } |
385 | 61 | default: { |
386 | 61 | auto value_str = simdjson::to_json_string(value).value(); |
387 | 61 | column->insert_data(value_str.data(), value_str.length()); |
388 | 61 | } |
389 | 61 | } |
390 | 61 | } |
391 | | }; |
392 | | |
393 | 8 | void register_function_variant_element(SimpleFunctionFactory& factory) { |
394 | 8 | factory.register_function<FunctionVariantElement>(); |
395 | 8 | } |
396 | | |
397 | | } // namespace doris |