Coverage Report

Created: 2025-09-11 18:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/root/doris/be/src/vec/json/parse2column.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "vec/json/parse2column.h"
19
20
#include <fmt/format.h>
21
#include <glog/logging.h>
22
#include <parallel_hashmap/phmap.h>
23
#include <simdjson/simdjson.h> // IWYU pragma: keep
24
25
#include <algorithm>
26
#include <cassert>
27
#include <cstddef>
28
#include <memory>
29
#include <mutex>
30
#include <optional>
31
#include <ostream>
32
#include <stack>
33
#include <string>
34
#include <string_view>
35
#include <utility>
36
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "vec/columns/column.h"
40
#include "vec/columns/column_string.h"
41
#include "vec/columns/column_variant.h"
42
#include "vec/common/assert_cast.h"
43
#include "vec/common/field_visitors.h"
44
#include "vec/common/schema_util.h"
45
#include "vec/common/string_ref.h"
46
#include "vec/core/field.h"
47
#include "vec/data_types/data_type.h"
48
#include "vec/json/json_parser.h"
49
#include "vec/json/path_in_data.h"
50
#include "vec/json/simd_json_parser.h"
51
52
namespace doris::vectorized {
53
54
/** Pool for objects that cannot be used from several threads at the same time.
  * Each caller borrows an object through get(); when the returned smart pointer
  * goes out of scope the object is handed back to the pool instead of being destroyed.
  * The pool has unbounded size and pooled objects are destroyed only together with the pool.
  *
  * Use it in cases when thread local storage is not appropriate
  *  (when the maximum number of simultaneously used objects is less
  *   than the number of running/sleeping threads that have ever used an object,
  *   and creation/destruction of objects is expensive).
  *
  * The pool must outlive every Pointer obtained from it, because the deleter
  * stored in the Pointer refers back to the pool.
  */
template <typename T>
class SimpleObjectPool {
protected:
    /// Guards `stack`.
    std::mutex mutex;
    /// Holds all currently idle objects.
    std::stack<std::unique_ptr<T>> stack;

    /// Specialized deleter for std::unique_ptr.
    /// Returns the underlying pointer back to the stack, so the pool reclaims ownership.
    struct Deleter {
        SimpleObjectPool<T>* parent;

        Deleter(SimpleObjectPool<T>* parent_ = nullptr) : parent {parent_} {} /// NOLINT

        void operator()(T* owning_ptr) const {
            // A default-constructed Deleter has no pool to return the object to;
            // destroy the object instead of dereferencing a null parent.
            if (parent == nullptr) {
                delete owning_ptr;
                return;
            }
            std::lock_guard lock {parent->mutex};
            parent->stack.emplace(owning_ptr);
        }
    };

public:
    using Pointer = std::unique_ptr<T, Deleter>;

    /// Extracts and returns a pointer from the stack if it's not empty,
    ///  creates a new one by calling provided f() otherwise.
    /// f() must return a `T*` whose ownership is transferred to the returned Pointer.
    template <typename Factory>
    Pointer get(Factory&& f) {
        std::unique_lock lock(mutex);
        if (stack.empty()) {
            // Run the factory without holding the mutex: object creation may be
            // slow, and it must not block other threads that are borrowing or
            // returning objects (nor deadlock if the factory itself uses the pool).
            lock.unlock();
            return Pointer(f(), Deleter(this));
        }
        auto object = stack.top().release();
        stack.pop();
        return Pointer(object, Deleter(this));
    }

    /// Like get(), but creates object using default constructor.
    Pointer getDefault() {
        return get([] { return new T; });
    }
};
99
100
/// Namespace-scope pool of JsonParser instances. The ColumnString overload of
/// parse_json_to_variant() below borrows one parser per call from this pool
/// rather than constructing a new parser each time.
SimpleObjectPool<JsonParser> parsers_pool;
101
102
/// Shorthand for the node type nested in ColumnVariant::Subcolumns.
using Node = typename ColumnVariant::Subcolumns::Node;
103
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
104
/// and replaces all scalars or nested arrays to @replacement at that level.
105
class FieldVisitorReplaceScalars : public StaticVisitor<Field> {
106
public:
107
    FieldVisitorReplaceScalars(const Field& replacement_, size_t num_dimensions_to_keep_)
108
            : replacement(replacement_), num_dimensions_to_keep(num_dimensions_to_keep_) {}
109
    template <PrimitiveType T>
110
    Field operator()(const typename PrimitiveTypeTraits<T>::NearestFieldType& x) const {
111
        if constexpr (T == TYPE_ARRAY) {
112
            if (num_dimensions_to_keep == 0) {
113
                return replacement;
114
            }
115
            const size_t size = x.size();
116
            Array res(size);
117
            for (size_t i = 0; i < size; ++i) {
118
                res[i] = apply_visitor(
119
                        FieldVisitorReplaceScalars(replacement, num_dimensions_to_keep - 1), x[i]);
120
            }
121
            return Field::create_field<TYPE_ARRAY>(res);
122
        } else {
123
            return replacement;
124
        }
125
    }
126
127
private:
128
    const Field& replacement;
129
    size_t num_dimensions_to_keep;
130
};
131
132
template <typename ParserImpl>
133
void parse_json_to_variant(IColumn& column, const char* src, size_t length,
134
57.3k
                           JSONDataParser<ParserImpl>* parser, const ParseConfig& config) {
135
57.3k
    auto& column_variant = assert_cast<ColumnVariant&>(column);
136
57.3k
    std::optional<ParseResult> result;
137
    /// Treat empty string as an empty object
138
    /// for better CAST from String to Object.
139
57.3k
    if (length > 0) {
140
57.3k
        result = parser->parse(src, length, config);
141
57.3k
    } else {
142
1
        result = ParseResult {};
143
1
    }
144
57.3k
    if (!result) {
145
11
        VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
146
11
        if (config::variant_throw_exeception_on_invalid_json) {
147
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
148
0
                                   std::string_view(src, length));
149
0
        }
150
        // Treat as string
151
11
        PathInData root_path;
152
11
        Field field = Field::create_field<TYPE_STRING>(String(src, length));
153
11
        result = ParseResult {{root_path}, {field}};
154
11
    }
155
57.3k
    auto& [paths, values] = *result;
156
57.3k
    assert(paths.size() == values.size());
157
56.7k
    size_t old_num_rows = column_variant.rows();
158
56.7k
    if (config.enable_flatten_nested) {
159
        // here we should check the paths in variant and paths in result,
160
        // if two paths which same prefix have different structure, we should throw an exception
161
3.00k
        std::vector<PathInData> check_paths;
162
11.9k
        for (const auto& entry : column_variant.get_subcolumns()) {
163
11.9k
            check_paths.push_back(entry->path);
164
11.9k
        }
165
3.00k
        check_paths.insert(check_paths.end(), paths.begin(), paths.end());
166
3.00k
        THROW_IF_ERROR(vectorized::schema_util::check_variant_has_no_ambiguous_paths(check_paths));
167
3.00k
    }
168
341k
    for (size_t i = 0; i < paths.size(); ++i) {
169
284k
        FieldInfo field_info;
170
284k
        schema_util::get_field_info(values[i], &field_info);
171
284k
        if (field_info.scalar_type_id == PrimitiveType::INVALID_TYPE) {
172
101
            continue;
173
101
        }
174
284k
        if (column_variant.get_subcolumn(paths[i], i) == nullptr) {
175
941
            if (paths[i].has_nested_part()) {
176
8
                column_variant.add_nested_subcolumn(paths[i], field_info, old_num_rows);
177
933
            } else {
178
933
                column_variant.add_sub_column(paths[i], old_num_rows);
179
933
            }
180
941
        }
181
284k
        auto* subcolumn = column_variant.get_subcolumn(paths[i], i);
182
284k
        if (!subcolumn) {
183
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
184
0
                                   paths[i].get_path());
185
0
        }
186
284k
        if (subcolumn->cur_num_of_defaults() > 0) {
187
84.0k
            subcolumn->insert_many_defaults(subcolumn->cur_num_of_defaults());
188
84.0k
            subcolumn->reset_current_num_of_defaults();
189
84.0k
        }
190
284k
        if (subcolumn->size() != old_num_rows) {
191
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
192
0
                                   "subcolumn {} size missmatched, may contains duplicated entry",
193
0
                                   paths[i].get_path());
194
0
        }
195
284k
        subcolumn->insert(std::move(values[i]), std::move(field_info));
196
284k
    }
197
    // /// Insert default values to missed subcolumns.
198
56.7k
    const auto& subcolumns = column_variant.get_subcolumns();
199
589k
    for (const auto& entry : subcolumns) {
200
589k
        if (entry->data.size() == old_num_rows) {
201
            // Handle nested paths differently from simple paths
202
305k
            if (entry->path.has_nested_part()) {
203
                // Try to insert default from nested, if failed, insert regular default
204
0
                bool success = UNLIKELY(column_variant.try_insert_default_from_nested(entry));
205
0
                if (!success) {
206
0
                    entry->data.insert_default();
207
0
                }
208
305k
            } else {
209
                // For non-nested paths, increment default counter
210
305k
                entry->data.increment_default_counter();
211
305k
            }
212
305k
        }
213
589k
    }
214
56.7k
    column_variant.incr_num_rows();
215
56.7k
    auto sparse_column = column_variant.get_sparse_column();
216
56.7k
    if (sparse_column->size() == old_num_rows) {
217
56.7k
        sparse_column->assume_mutable()->insert_default();
218
56.7k
    }
219
56.7k
#ifndef NDEBUG
220
56.7k
    column_variant.check_consistency();
221
56.7k
#endif
222
56.7k
}
223
224
// exposed interfaces
225
/// Convenience overload: parses the JSON text referenced by @json and appends
/// it as one row to @column, using the caller-supplied @parser. Simply unpacks
/// the StringRef and forwards to the pointer/length implementation.
void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser,
                           const ParseConfig& config) {
    const char* const json_begin = json.data;
    const size_t json_length = json.size;
    parse_json_to_variant(column, json_begin, json_length, parser, config);
}
229
230
/// Parses every string of @raw_json_column as a JSON document and appends each
/// one as a row to @column, then finalizes @column.
/// A single parser is borrowed from parsers_pool for the whole batch and goes
/// back to the pool when this function returns.
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
                           const ParseConfig& config) {
    auto pooled_parser = parsers_pool.get([] { return new JsonParser(); });
    const size_t row_count = raw_json_column.size();
    size_t row = 0;
    while (row < row_count) {
        const StringRef document = raw_json_column.get_data_at(row);
        parse_json_to_variant(column, document.data, document.size, pooled_parser.get(), config);
        ++row;
    }
    column.finalize();
}
239
240
} // namespace doris::vectorized