Coverage Report

Created: 2026-06-01 17:43

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/vec/json/parse2column.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "vec/json/parse2column.h"
19
20
#include <fmt/format.h>
21
#include <glog/logging.h>
22
#include <parallel_hashmap/phmap.h>
23
#include <simdjson/simdjson.h> // IWYU pragma: keep
24
25
#include <algorithm>
26
#include <cassert>
27
#include <cstddef>
28
#include <memory>
29
#include <mutex>
30
#include <optional>
31
#include <ostream>
32
#include <stack>
33
#include <string>
34
#include <string_view>
35
#include <utility>
36
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "vec/columns/column.h"
40
#include "vec/columns/column_string.h"
41
#include "vec/columns/column_variant.h"
42
#include "vec/common/assert_cast.h"
43
#include "vec/common/field_visitors.h"
44
#include "vec/common/schema_util.h"
45
#include "vec/common/string_ref.h"
46
#include "vec/core/field.h"
47
#include "vec/data_types/data_type.h"
48
#include "vec/json/json_parser.h"
49
#include "vec/json/path_in_data.h"
50
#include "vec/json/simd_json_parser.h"
51
52
namespace doris::vectorized {
53
54
/** Pool for objects that cannot be used from different threads simultaneously.
  * Allows creating an object for each thread.
  * The pool has unbounded size; objects handed back to it are kept until the pool itself is
  * destroyed (or dropped immediately if the pool cannot grow to hold them).
  *
  * Use it in cases when thread local storage is not appropriate
  *  (when maximum number of simultaneously used objects is less
  *   than number of running/sleeping threads that have ever used an object,
  *   and creation/destruction of objects is expensive).
  *
  * get() may be called from any thread; the stack of idle objects is guarded by `mutex`.
  * A Pointer returned by get() keeps a raw pointer to the pool, so it must not outlive it.
  */
template <typename T>
class SimpleObjectPool {
protected:
    /// Hold all available objects in stack.
    std::mutex mutex;
    std::stack<std::unique_ptr<T>> stack;

    /// Specialized deleter for std::unique_ptr.
    /// Returns underlying pointer back to stack thus reclaiming its ownership.
    struct Deleter {
        SimpleObjectPool<T>* parent;

        Deleter(SimpleObjectPool<T>* parent_ = nullptr) : parent {parent_} {} /// NOLINT

        /// Gives `owning_ptr` back to the pool. Never throws: a deleter runs inside
        /// unique_ptr's noexcept destructor, so a failure here would terminate the process.
        void operator()(T* owning_ptr) const {
            // Take ownership first: if the object cannot be returned to the pool, `holder`
            // destroys it at scope exit (after the lock is released) instead of leaking it.
            std::unique_ptr<T> holder {owning_ptr};
            if (parent == nullptr) {
                // Not attached to any pool (default-constructed deleter): just destroy it.
                return;
            }
            std::lock_guard lock {parent->mutex};
            try {
                parent->stack.push(std::move(holder));
            } catch (...) {
                // The stack could not grow (e.g. allocation failure). Dropping the object is
                // the only safe option inside a non-throwing deleter.
            }
        }
    };

public:
    using Pointer = std::unique_ptr<T, Deleter>;

    /// Extracts and returns a pointer from the stack if it's not empty,
    ///  creates a new one by calling provided f() otherwise.
    /// f() must return a heap-allocated T* (the pool takes ownership of it).
    /// f() is invoked without holding the pool's lock, so a slow constructor does not block
    /// other borrowers and the factory may itself use this pool.
    template <typename Factory>
    Pointer get(Factory&& f) {
        {
            std::lock_guard lock {mutex};
            if (!stack.empty()) {
                T* object = stack.top().release();
                stack.pop();
                return Pointer(object, Deleter(this));
            }
        }
        return Pointer(f(), Deleter(this));
    }

    /// Like get(), but creates object using default constructor.
    Pointer getDefault() {
        return get([] { return new T; });
    }
};
99
100
SimpleObjectPool<JsonParser> parsers_pool;
101
102
using Node = typename ColumnVariant::Subcolumns::Node;
103
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
104
/// and replaces all scalars or nested arrays to @replacement at that level.
105
class FieldVisitorReplaceScalars : public StaticVisitor<Field> {
106
public:
107
    FieldVisitorReplaceScalars(const Field& replacement_, size_t num_dimensions_to_keep_)
108
            : replacement(replacement_), num_dimensions_to_keep(num_dimensions_to_keep_) {}
109
    template <PrimitiveType T>
110
    Field operator()(const typename PrimitiveTypeTraits<T>::CppType& x) const {
111
        if constexpr (T == TYPE_ARRAY) {
112
            if (num_dimensions_to_keep == 0) {
113
                return replacement;
114
            }
115
            const size_t size = x.size();
116
            Array res(size);
117
            for (size_t i = 0; i < size; ++i) {
118
                res[i] = apply_visitor(
119
                        FieldVisitorReplaceScalars(replacement, num_dimensions_to_keep - 1), x[i]);
120
            }
121
            return Field::create_field<TYPE_ARRAY>(res);
122
        } else {
123
            return replacement;
124
        }
125
    }
126
127
private:
128
    const Field& replacement;
129
    size_t num_dimensions_to_keep;
130
};
131
132
template <typename ParserImpl>
133
void parse_json_to_variant(IColumn& column, const char* src, size_t length,
134
69.5k
                           JSONDataParser<ParserImpl>* parser, const ParseConfig& config) {
135
69.5k
    auto& column_variant = assert_cast<ColumnVariant&>(column);
136
69.5k
    std::optional<ParseResult> result;
137
    /// Treat empty string as an empty object
138
    /// for better CAST from String to Object.
139
69.5k
    if (length > 0) {
140
69.5k
        result = parser->parse(src, length, config);
141
69.5k
    } else {
142
1
        result = ParseResult {};
143
1
    }
144
69.5k
    if (!result) {
145
11
        VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
146
11
        if (config::variant_throw_exeception_on_invalid_json) {
147
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
148
0
                                   std::string_view(src, length));
149
0
        }
150
        // Treat as string
151
11
        PathInData root_path;
152
11
        Field field = Field::create_field<TYPE_STRING>(String(src, length));
153
11
        result = ParseResult {{root_path}, {field}};
154
11
    }
155
69.5k
    auto& [paths, values] = *result;
156
69.5k
    assert(paths.size() == values.size());
157
69.0k
    size_t old_num_rows = column_variant.rows();
158
69.0k
    if (config.enable_flatten_nested) {
159
        // here we should check the paths in variant and paths in result,
160
        // if two paths which same prefix have different structure, we should throw an exception
161
3.00k
        std::vector<PathInData> check_paths;
162
11.9k
        for (const auto& entry : column_variant.get_subcolumns()) {
163
11.9k
            check_paths.push_back(entry->path);
164
11.9k
        }
165
3.00k
        check_paths.insert(check_paths.end(), paths.begin(), paths.end());
166
3.00k
        THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
167
3.00k
    }
168
169
69.0k
    auto is_plain_path = [](const PathInData& path) {
170
4
        for (const auto& part : path.get_parts()) {
171
4
            if (part.is_nested || part.anonymous_array_level != 0) {
172
0
                return false;
173
0
            }
174
4
        }
175
3
        return true;
176
3
    };
177
178
69.0k
    auto get_or_create_subcolumn = [&](const PathInData& path, size_t index_hint,
179
349k
                                       const FieldInfo& field_info) -> ColumnVariant::Subcolumn* {
180
349k
        auto* subcolumn = column_variant.get_subcolumn(path, index_hint);
181
349k
        if (subcolumn == nullptr) {
182
1.07k
            if (path.has_nested_part()) {
183
8
                column_variant.add_nested_subcolumn(path, field_info, old_num_rows);
184
1.07k
            } else {
185
1.07k
                column_variant.add_sub_column(path, old_num_rows);
186
1.07k
            }
187
1.07k
            subcolumn = column_variant.get_subcolumn(path, index_hint);
188
1.07k
        }
189
349k
        if (!subcolumn) {
190
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
191
0
                                   path.get_path());
192
0
        }
193
349k
        return subcolumn;
194
349k
    };
195
196
349k
    auto normalize_plain_path = [&](const PathInData& path) {
197
349k
        if (!config.check_duplicate_json_path || path.empty() || !is_plain_path(path)) {
198
349k
            return path;
199
349k
        }
200
3
        return PathInData(path.get_path());
201
349k
    };
202
203
419k
    for (size_t i = 0; i < paths.size(); ++i) {
204
350k
        FieldInfo field_info;
205
350k
        schema_util::get_field_info(values[i], &field_info);
206
350k
        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
207
101
            continue;
208
101
        }
209
349k
        auto path = normalize_plain_path(paths[i]);
210
349k
        auto* subcolumn = get_or_create_subcolumn(path, i, field_info);
211
349k
        if (subcolumn->cur_num_of_defaults() > 0) {
212
104k
            subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
213
104k
            subcolumn->reset_current_num_of_defaults();
214
104k
        }
215
349k
        if (subcolumn->size() != old_num_rows) {
216
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
217
0
                                   "subcolumn {} size missmatched, may contains duplicated entry",
218
0
                                   path.get_path());
219
0
        }
220
349k
        subcolumn->insert(std::move(values[i]), std::move(field_info));
221
349k
    }
222
    // /// Insert default values to missed subcolumns.
223
69.0k
    const auto& subcolumns = column_variant.get_subcolumns();
224
723k
    for (const auto& entry : subcolumns) {
225
723k
        if (entry->data.size() == old_num_rows) {
226
            // Handle nested paths differently from simple paths
227
373k
            if (entry->path.has_nested_part()) {
228
                // Try to insert default from nested, if failed, insert regular default
229
0
                bool success = UNLIKELY(column_variant.try_insert_default_from_nested(entry));
230
0
                if (!success) {
231
0
                    entry->data.insert_default();
232
0
                }
233
373k
            } else {
234
                // For non-nested paths, increment default counter
235
373k
                entry->data.increment_default_counter();
236
373k
            }
237
373k
        }
238
723k
    }
239
69.0k
    column_variant.incr_num_rows();
240
69.0k
    auto sparse_column = column_variant.get_sparse_column();
241
69.0k
    if (sparse_column->size() == old_num_rows) {
242
69.0k
        sparse_column->assume_mutable()->insert_default();
243
69.0k
    }
244
69.0k
#ifndef NDEBUG
245
69.0k
    column_variant.check_consistency();
246
69.0k
#endif
247
69.0k
}
248
249
// exposed interfaces
250
/// Appends the JSON document referenced by @p json as one new row of @p column,
/// parsing it with the caller-supplied @p parser.
void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser,
                           const ParseConfig& config) {
    const char* const text = json.data;
    const size_t text_size = json.size;
    parse_json_to_variant(column, text, text_size, parser, config);
}
254
255
/// Parses every row of @p raw_json_column into @p column (one variant row per input row)
/// and finalizes @p column afterwards.
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
                           const ParseConfig& config) {
    // One parser is borrowed for the whole batch; it returns to parsers_pool when
    // `borrowed` goes out of scope.
    auto borrowed = parsers_pool.get([] { return new JsonParser(); });
    JsonParser* const json_parser = borrowed.get();
    const size_t num_rows = raw_json_column.size();
    for (size_t row = 0; row != num_rows; ++row) {
        const StringRef cell = raw_json_column.get_data_at(row);
        parse_json_to_variant(column, cell.data, cell.size, json_parser, config);
    }
    column.finalize();
}
264
265
} // namespace doris::vectorized