Coverage Report

Created: 2026-05-09 09:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exprs/function/function_variant_element.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include <glog/logging.h>
#include <stddef.h>

#include <memory>
#include <optional>
#include <ostream>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/assert_cast.h"
#include "core/block/block.h"
#include "core/column/column.h"
#include "core/column/column_nullable.h"
#include "core/column/column_string.h"
#include "core/column/column_variant.h"
#include "core/column/subcolumn_tree.h"
#include "core/data_type/data_type.h"
#include "core/data_type/data_type_nothing.h"
#include "core/data_type/data_type_nullable.h"
#include "core/data_type/data_type_string.h"
#include "core/data_type/data_type_variant.h"
#include "core/string_ref.h"
#include "exprs/function/function.h"
#include "exprs/function/function_helpers.h"
#include "exprs/function/simple_function_factory.h"
#include "exprs/json_functions.h"
#include "simdjson.h"
#include "util/defer_op.h"
#include "util/json/path_in_data.h"
49
50
namespace doris {
51
52
class FunctionVariantElement : public IFunction {
53
public:
54
    static constexpr auto name = "element_at";
55
136
    static FunctionPtr create() { return std::make_shared<FunctionVariantElement>(); }
56
57
    // Get function name.
58
1
    String get_name() const override { return name; }
59
60
254
    bool use_default_implementation_for_nulls() const override { return false; }
61
62
127
    size_t get_number_of_arguments() const override { return 2; }
63
64
127
    ColumnNumbers get_arguments_that_are_always_constant() const override { return {1}; }
65
66
8
    DataTypes get_variadic_argument_types_impl() const override {
67
8
        return {std::make_shared<DataTypeVariant>(), std::make_shared<DataTypeString>()};
68
8
    }
69
70
127
    DataTypePtr get_return_type_impl(const DataTypes& arguments) const override {
71
127
        DCHECK_EQ(arguments[0]->get_primitive_type(), TYPE_VARIANT)
72
0
                << "First argument for function: " << name
73
0
                << " should be DataTypeVariant but it has type " << arguments[0]->get_name() << ".";
74
127
        DCHECK(is_string_type(arguments[1]->get_primitive_type()))
75
0
                << "Second argument for function: " << name << " should be String but it has type "
76
0
                << arguments[1]->get_name() << ".";
77
127
        auto arg_variant = remove_nullable(arguments[0]);
78
127
        const auto& data_type_object = assert_cast<const DataTypeVariant&>(*arg_variant);
79
127
        return make_nullable(
80
127
                std::make_shared<DataTypeVariant>(data_type_object.variant_max_subcolumns_count(),
81
127
                                                  data_type_object.enable_doc_mode()));
82
127
    }
83
84
    // wrap variant column with nullable
85
    // 1. if variant is null root(empty or nothing as root), then nullable map is all null
86
    // 2. if variant is scalar variant, then use the root's nullable map
87
    // 3. if variant is hierarchical variant, then create a nullable map with all none null
88
127
    ColumnPtr wrap_variant_nullable(ColumnPtr col) const {
89
127
        const auto& var = assert_cast<const ColumnVariant&>(*col);
90
127
        if (var.is_null_root()) {
91
20
            return make_nullable(col, true);
92
20
        }
93
107
        if (var.is_scalar_variant() && var.get_root()->is_nullable()) {
94
33
            const auto* nullable = assert_cast<const ColumnNullable*>(var.get_root().get());
95
33
            return ColumnNullable::create(
96
33
                    col, nullable->get_null_map_column_ptr()->clone_resized(col->size()));
97
33
        }
98
74
        return make_nullable(col);
99
107
    }
100
101
    Status execute_impl(FunctionContext* context, Block& block, const ColumnNumbers& arguments,
102
127
                        uint32_t result, size_t input_rows_count) const override {
103
127
        const auto* variant_col = check_and_get_column<ColumnVariant>(
104
127
                remove_nullable(block.get_by_position(arguments[0]).column).get());
105
127
        if (!variant_col) {
106
0
            return Status::RuntimeError(
107
0
                    fmt::format("unsupported types for function {}({}, {})", get_name(),
108
0
                                block.get_by_position(arguments[0]).type->get_name(),
109
0
                                block.get_by_position(arguments[1]).type->get_name()));
110
0
        }
111
127
        if (block.empty()) {
112
0
            block.replace_by_position(result, block.get_by_position(result).type->create_column());
113
0
            return Status::OK();
114
0
        }
115
116
127
        auto index_column = block.get_by_position(arguments[1]).column;
117
127
        ColumnPtr result_column;
118
127
        RETURN_IF_ERROR(get_element_column(*variant_col, index_column, &result_column));
119
127
        if (block.get_by_position(result).type->is_nullable()) {
120
127
            result_column = wrap_variant_nullable(result_column);
121
127
        }
122
127
        block.replace_by_position(result, result_column);
123
127
        return Status::OK();
124
127
    }
125
126
private:
127
    // Return sub-path by specified prefix.
128
    // For example, for prefix a.b:
129
    // a.b.c.d -> c.d, a.b.c -> c
130
    static std::optional<std::string_view> get_sub_path(const std::string_view& path,
131
3
                                                        const std::string_view& prefix) {
132
3
        if (path.size() <= prefix.size() || path[prefix.size()] != '.') {
133
1
            return std::nullopt;
134
1
        }
135
2
        return path.substr(prefix.size() + 1);
136
3
    }
137
138
    // Extract and populate sparse column data with given path prefix
139
    // Copies data from source sparse column, extracting only the sub-paths that match the prefix
140
    static void _extract_sparse_column_from_source(ColumnVariant* src_ptr, const PathInData& path,
141
2
                                                   ColumnVariant::MutablePtr& target_ptr) {
142
2
        ColumnVariant::Subcolumn root {0, true, true};
143
        // no root, no sparse column
144
2
        const auto& sparse_data_map = assert_cast<const ColumnMap&>(*src_ptr->get_sparse_column());
145
2
        const auto& src_sparse_data_offsets = sparse_data_map.get_offsets();
146
2
        const auto& src_sparse_data_paths =
147
2
                assert_cast<const ColumnString&>(sparse_data_map.get_keys());
148
2
        const auto& src_sparse_data_values =
149
2
                assert_cast<const ColumnString&>(sparse_data_map.get_values());
150
2
        auto& sparse_data_offsets =
151
2
                assert_cast<ColumnMap&>(*target_ptr->get_sparse_column()->assume_mutable())
152
2
                        .get_offsets();
153
2
        auto [sparse_data_paths, sparse_data_values] =
154
2
                target_ptr->get_sparse_data_paths_and_values();
155
2
        StringRef prefix_ref(path.get_path());
156
2
        std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
157
4
        for (size_t i = 0; i != src_sparse_data_offsets.size(); ++i) {
158
2
            size_t start = src_sparse_data_offsets[ssize_t(i) - 1];
159
2
            size_t end = src_sparse_data_offsets[ssize_t(i)];
160
2
            size_t lower_bound_index = ColumnVariant::find_path_lower_bound_in_sparse_data(
161
2
                    prefix_ref, src_sparse_data_paths, start, end);
162
5
            for (; lower_bound_index != end; ++lower_bound_index) {
163
3
                auto path_ref = src_sparse_data_paths.get_data_at(lower_bound_index);
164
3
                std::string_view nested_path(path_ref.data, path_ref.size);
165
3
                if (!nested_path.starts_with(path_prefix)) {
166
0
                    break;
167
0
                }
168
                // Don't include path that is equal to the prefix.
169
3
                if (nested_path.size() != path_prefix.size()) {
170
3
                    auto sub_path_optional = get_sub_path(nested_path, path_prefix);
171
3
                    if (!sub_path_optional.has_value()) {
172
1
                        continue;
173
1
                    }
174
2
                    std::string_view sub_path = *sub_path_optional;
175
2
                    sparse_data_paths->insert_data(sub_path.data(), sub_path.size());
176
2
                    sparse_data_values->insert_from(src_sparse_data_values, lower_bound_index);
177
2
                } else {
178
                    // insert into root column, example:  access v['b'] and b is in sparse column
179
                    // data example:
180
                    // {"b" : 123}
181
                    // {"b" : {"c" : 456}}
182
                    // b maybe in sparse column, and b.c is in subolumn, put `b` into root column to distinguish
183
                    // from "" which is empty path and root
184
0
                    root.deserialize_from_binary_column(&src_sparse_data_values, lower_bound_index);
185
0
                }
186
3
            }
187
2
            if (root.size() == sparse_data_offsets.size()) {
188
2
                root.insert_default();
189
2
            }
190
2
            sparse_data_offsets.push_back(sparse_data_paths->size());
191
2
        }
192
2
        target_ptr->get_subcolumns().create_root(root);
193
2
        target_ptr->get_doc_value_column()->assume_mutable()->resize(src_ptr->size());
194
2
        target_ptr->set_num_rows(src_ptr->size());
195
2
    }
196
197
    // Extract and populate sparse column data from doc_value column with given path prefix
198
    // Copies data from source doc_value column, extracting only the sub-paths that match the prefix
199
    static void _extract_doc_value_column_from_source(ColumnVariant* src_ptr,
200
                                                      const PathInData& path,
201
21
                                                      ColumnVariant::MutablePtr& target_ptr) {
202
21
        ColumnVariant::Subcolumn root {0, true, true};
203
21
        const auto& doc_value_data_map =
204
21
                assert_cast<const ColumnMap&>(*src_ptr->get_doc_value_column());
205
21
        const auto& src_doc_value_data_offsets = doc_value_data_map.get_offsets();
206
21
        const auto& src_doc_value_data_paths =
207
21
                assert_cast<const ColumnString&>(doc_value_data_map.get_keys());
208
21
        const auto& src_doc_value_data_values =
209
21
                assert_cast<const ColumnString&>(doc_value_data_map.get_values());
210
21
        const bool write_to_doc_value = target_ptr->enable_doc_mode();
211
        // Ordinary Variant extraction keeps the selected prefix in sparse data, matching the
212
        // source branch behavior. Only doc-mode columns keep extracted data in doc_value.
213
21
        auto& extracted_offsets =
214
21
                assert_cast<ColumnMap&>(*(write_to_doc_value ? target_ptr->get_doc_value_column()
215
21
                                                             : target_ptr->get_sparse_column())
216
21
                                                 ->assume_mutable())
217
21
                        .get_offsets();
218
21
        auto [extracted_paths, extracted_values] =
219
21
                write_to_doc_value ? target_ptr->get_doc_value_data_paths_and_values()
220
21
                                   : target_ptr->get_sparse_data_paths_and_values();
221
21
        StringRef prefix_ref(path.get_path());
222
21
        std::string_view path_prefix(prefix_ref.data, prefix_ref.size);
223
42
        for (size_t i = 0; i != src_doc_value_data_offsets.size(); ++i) {
224
21
            size_t start = src_doc_value_data_offsets[ssize_t(i) - 1];
225
21
            size_t end = src_doc_value_data_offsets[ssize_t(i)];
226
21
            size_t lower_bound_index = ColumnVariant::find_path_lower_bound_in_sparse_data(
227
21
                    prefix_ref, src_doc_value_data_paths, start, end);
228
23
            for (; lower_bound_index != end; ++lower_bound_index) {
229
2
                auto path_ref = src_doc_value_data_paths.get_data_at(lower_bound_index);
230
2
                std::string_view nested_path(path_ref.data, path_ref.size);
231
2
                if (!nested_path.starts_with(path_prefix)) {
232
0
                    break;
233
0
                }
234
2
                if (nested_path.size() != path_prefix.size()) {
235
0
                    auto sub_path_optional = get_sub_path(nested_path, path_prefix);
236
0
                    if (!sub_path_optional.has_value()) {
237
0
                        continue;
238
0
                    }
239
0
                    std::string_view sub_path = *sub_path_optional;
240
0
                    extracted_paths->insert_data(sub_path.data(), sub_path.size());
241
0
                    extracted_values->insert_from(src_doc_value_data_values, lower_bound_index);
242
2
                } else {
243
2
                    root.deserialize_from_binary_column(&src_doc_value_data_values,
244
2
                                                        lower_bound_index);
245
2
                }
246
2
            }
247
21
            if (root.size() == extracted_offsets.size()) {
248
19
                root.insert_default();
249
19
            }
250
21
            extracted_offsets.push_back(extracted_paths->size());
251
21
        }
252
21
        target_ptr->get_subcolumns().create_root(root);
253
21
        if (write_to_doc_value) {
254
2
            target_ptr->get_sparse_column()->assume_mutable()->resize(src_ptr->size());
255
19
        } else {
256
19
            target_ptr->get_doc_value_column()->assume_mutable()->resize(src_ptr->size());
257
19
        }
258
21
        target_ptr->set_num_rows(src_ptr->size());
259
21
    }
260
261
    static Status get_element_column(const ColumnVariant& src, const ColumnPtr& index_column,
262
128
                                     ColumnPtr* result) {
263
128
        std::string field_name = index_column->get_data_at(0).to_string();
264
128
        if (src.empty()) {
265
0
            *result = ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode());
266
            // src subcolumns empty but src row count may not be 0
267
0
            (*result)->assume_mutable()->insert_many_defaults(src.size());
268
            // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure
269
0
            (*result)->assume_mutable()->finalize();
270
0
            return Status::OK();
271
0
        }
272
128
        if (src.is_scalar_variant() && is_string_type(src.get_root_type()->get_primitive_type())) {
273
            // use parser to extract from root
274
74
            auto type = std::make_shared<DataTypeString>();
275
74
            MutableColumnPtr result_column = type->create_column();
276
74
            const ColumnString& docs =
277
74
                    *check_and_get_column<ColumnString>(remove_nullable(src.get_root()).get());
278
74
            simdjson::ondemand::parser parser;
279
74
            std::vector<JsonPath> parsed_paths;
280
74
            if (field_name.empty() || field_name[0] != '$') {
281
74
                field_name = "$." + field_name;
282
74
            }
283
74
            JsonFunctions::parse_json_paths(field_name, &parsed_paths);
284
74
            ColumnString* col_str = static_cast<ColumnString*>(result_column.get());
285
149
            for (size_t i = 0; i < docs.size(); ++i) {
286
75
                if (!extract_from_document(parser, docs.get_data_at(i), parsed_paths, col_str)) {
287
15
                    VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) << ", field "
288
0
                               << field_name;
289
15
                    result_column->insert_default();
290
15
                }
291
75
            }
292
74
            *result = ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode(), type,
293
74
                                            std::move(result_column));
294
74
            (*result)->assume_mutable()->finalize();
295
74
            return Status::OK();
296
74
        } else {
297
54
            auto mutable_src = src.clone_finalized();
298
54
            auto* mutable_ptr = assert_cast<ColumnVariant*>(mutable_src.get());
299
54
            PathInData path(field_name);
300
54
            ColumnVariant::Subcolumns subcolumns = mutable_ptr->get_subcolumns();
301
54
            const auto* node = subcolumns.find_exact(path);
302
54
            MutableColumnPtr result_col =
303
54
                    ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode());
304
54
            ColumnVariant::Subcolumns new_subcolumns;
305
306
54
            if (node != nullptr) {
307
32
                std::vector<decltype(node)> nodes;
308
32
                PathsInData paths;
309
32
                ColumnVariant::Subcolumns::get_leaves_of_node(node, nodes, paths);
310
32
                for (const auto* n : nodes) {
311
32
                    PathInData new_path = n->path.copy_pop_front();
312
32
                    VLOG_DEBUG << "add node " << new_path.get_path()
313
0
                               << ", data size: " << n->data.size()
314
0
                               << ", finalized size: " << n->data.get_finalized_column().size()
315
0
                               << ", common type: " << n->data.get_least_common_type()->get_name();
316
                    // if new_path is empty, indicate it's the root column, but adding a root will return false when calling add
317
32
                    if (!new_subcolumns.add(new_path, n->data)) {
318
31
                        VLOG_DEBUG << "failed to add node " << new_path.get_path();
319
31
                    }
320
32
                }
321
322
                // handle the root node
323
32
                if (new_subcolumns.empty() && !nodes.empty()) {
324
31
                    CHECK_EQ(nodes.size(), 1);
325
31
                    new_subcolumns.create_root(ColumnVariant::Subcolumn {
326
31
                            nodes[0]->data.get_finalized_column_ptr()->assume_mutable(),
327
31
                            nodes[0]->data.get_least_common_type(), true, true});
328
31
                    auto container =
329
31
                            ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode(),
330
31
                                                  std::move(new_subcolumns));
331
31
                    result_col->insert_range_from(*container, 0, container->size());
332
31
                } else {
333
1
                    auto container =
334
1
                            ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode(),
335
1
                                                  std::move(new_subcolumns));
336
1
                    container->clear_sparse_column();
337
1
                    _extract_sparse_column_from_source(mutable_ptr, path, container);
338
1
                    result_col->insert_range_from(*container, 0, container->size());
339
1
                }
340
32
            } else {
341
22
                auto container =
342
22
                        ColumnVariant::create(src.max_subcolumns_count(), src.enable_doc_mode(),
343
22
                                              std::move(new_subcolumns));
344
22
                const auto& sparse_offsets = mutable_ptr->serialized_sparse_column_offsets();
345
22
                if (sparse_offsets.back() == sparse_offsets[-1]) {
346
21
                    _extract_doc_value_column_from_source(mutable_ptr, path, container);
347
21
                } else {
348
1
                    _extract_sparse_column_from_source(mutable_ptr, path, container);
349
1
                }
350
22
                result_col->insert_range_from(*container, 0, container->size());
351
22
            }
352
54
            *result = result_col->get_ptr();
353
            // ColumnVariant should be finalized before parsing, finalize maybe modify original column structure
354
54
            (*result)->assume_mutable()->finalize();
355
54
            VLOG_DEBUG << "dump new object "
356
0
                       << static_cast<const ColumnVariant*>(result_col.get())->debug_string()
357
0
                       << ", path " << path.get_path();
358
54
            return Status::OK();
359
54
        }
360
128
    }
361
362
    static Status extract_from_document(simdjson::ondemand::parser& parser, const StringRef& doc,
363
75
                                        const std::vector<JsonPath>& paths, ColumnString* column) {
364
75
        try {
365
75
            simdjson::padded_string json_str {doc.data, doc.size};
366
75
            simdjson::ondemand::document document = parser.iterate(json_str);
367
75
            simdjson::ondemand::object object = document.get_object();
368
75
            simdjson::ondemand::value value;
369
75
            RETURN_IF_ERROR(JsonFunctions::extract_from_object(object, paths, &value));
370
65
            _write_data_to_column(value, column);
371
65
        } catch (simdjson::simdjson_error& e) {
372
5
            VLOG_DEBUG << "simdjson parse exception: " << e.what();
373
5
            return Status::DataQualityError("simdjson parse exception {}", e.what());
374
5
        }
375
60
        return Status::OK();
376
75
    }
377
378
60
    static void _write_data_to_column(simdjson::ondemand::value& value, ColumnString* column) {
379
60
        switch (value.type()) {
380
0
        case simdjson::ondemand::json_type::null: {
381
0
            column->insert_default();
382
0
            break;
383
0
        }
384
0
        case simdjson::ondemand::json_type::boolean: {
385
0
            if (value.get_bool()) {
386
0
                column->insert_data("1", 1);
387
0
            } else {
388
0
                column->insert_data("0", 1);
389
0
            }
390
0
            break;
391
0
        }
392
60
        default: {
393
60
            auto value_str = simdjson::to_json_string(value).value();
394
60
            column->insert_data(value_str.data(), value_str.length());
395
60
        }
396
60
        }
397
60
    }
398
};
399
400
8
// Makes element_at(Variant, String) available through the given function factory.
void register_function_variant_element(SimpleFunctionFactory& factory) {
    factory.register_function<FunctionVariantElement>();
}
403
404
} // namespace doris