Coverage Report

Created: 2024-11-22 11:49

/root/doris/be/src/vec/json/parse2column.cpp
Line
Count
Source (jump to first uncovered line)
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "vec/json/parse2column.h"
19
20
#include <assert.h>
21
#include <fmt/format.h>
22
#include <glog/logging.h>
23
#include <parallel_hashmap/phmap.h>
24
#include <simdjson/simdjson.h> // IWYU pragma: keep
25
#include <stddef.h>
26
27
#include <algorithm>
28
#include <memory>
29
#include <mutex>
30
#include <optional>
31
#include <ostream>
32
#include <stack>
33
#include <string>
34
#include <string_view>
35
#include <utility>
36
37
#include "common/config.h"
38
#include "common/status.h"
39
#include "vec/columns/column.h"
40
#include "vec/columns/column_object.h"
41
#include "vec/columns/column_string.h"
42
#include "vec/common/assert_cast.h"
43
#include "vec/common/field_visitors.h"
44
#include "vec/common/schema_util.h"
45
#include "vec/common/string_ref.h"
46
#include "vec/core/field.h"
47
#include "vec/data_types/data_type.h"
48
#include "vec/json/json_parser.h"
49
#include "vec/json/path_in_data.h"
50
#include "vec/json/simd_json_parser.h"
51
52
namespace doris::vectorized {
53
54
/** Pool for objects that cannot be used from different threads simultaneously.
55
  * Allows to create an object for each thread.
56
  * Pool has unbounded size and objects are not destroyed before destruction of pool.
57
  *
58
  * Use it in cases when thread local storage is not appropriate
59
  *  (when maximum number of simultaneously used objects is less
60
  *   than number of running/sleeping threads, that has ever used object,
61
  *   and creation/destruction of objects is expensive).
62
  */
63
template <typename T>
64
class SimpleObjectPool {
65
protected:
66
    /// Hold all available objects in stack.
67
    std::mutex mutex;
68
    std::stack<std::unique_ptr<T>> stack;
69
    /// Specialized deleter for std::unique_ptr.
70
    /// Returns underlying pointer back to stack thus reclaiming its ownership.
71
    struct Deleter {
72
        SimpleObjectPool<T>* parent;
73
0
        Deleter(SimpleObjectPool<T>* parent_ = nullptr) : parent {parent_} {} /// NOLINT
74
0
        void operator()(T* owning_ptr) const {
75
0
            std::lock_guard lock {parent->mutex};
76
0
            parent->stack.emplace(owning_ptr);
77
0
        }
78
    };
79
80
public:
81
    using Pointer = std::unique_ptr<T, Deleter>;
82
    /// Extracts and returns a pointer from the stack if it's not empty,
83
    ///  creates a new one by calling provided f() otherwise.
84
    template <typename Factory>
85
0
    Pointer get(Factory&& f) {
86
0
        std::unique_lock lock(mutex);
87
0
        if (stack.empty()) {
88
0
            return {f(), this};
89
0
        }
90
0
        auto object = stack.top().release();
91
0
        stack.pop();
92
0
        return std::unique_ptr<T, Deleter>(object, Deleter(this));
93
0
    }
94
    /// Like get(), but creates object using default constructor.
95
    Pointer getDefault() {
96
        return get([] { return new T; });
97
    }
98
};
99
100
SimpleObjectPool<JsonParser> parsers_pool;
101
102
using Node = typename ColumnObject::Subcolumns::Node;
103
/// Visitor that keeps @num_dimensions_to_keep dimensions in arrays
104
/// and replaces all scalars or nested arrays to @replacement at that level.
105
class FieldVisitorReplaceScalars : public StaticVisitor<Field> {
106
public:
107
    FieldVisitorReplaceScalars(const Field& replacement_, size_t num_dimensions_to_keep_)
108
0
            : replacement(replacement_), num_dimensions_to_keep(num_dimensions_to_keep_) {}
109
    template <typename T>
110
    Field operator()(const T& x) const {
111
        if constexpr (std::is_same_v<T, Array>) {
112
            if (num_dimensions_to_keep == 0) {
113
                return replacement;
114
            }
115
            const size_t size = x.size();
116
            Array res(size);
117
            for (size_t i = 0; i < size; ++i) {
118
                res[i] = apply_visitor(
119
                        FieldVisitorReplaceScalars(replacement, num_dimensions_to_keep - 1), x[i]);
120
            }
121
            return res;
122
        } else {
123
            return replacement;
124
        }
125
    }
126
127
private:
128
    const Field& replacement;
129
    size_t num_dimensions_to_keep;
130
};
131
132
template <typename ParserImpl>
133
void parse_json_to_variant(IColumn& column, const char* src, size_t length,
134
0
                           JSONDataParser<ParserImpl>* parser, const ParseConfig& config) {
135
0
    auto& column_object = assert_cast<ColumnObject&>(column);
136
0
    std::optional<ParseResult> result;
137
    /// Treat empty string as an empty object
138
    /// for better CAST from String to Object.
139
0
    if (length > 0) {
140
0
        result = parser->parse(src, length, config);
141
0
    } else {
142
0
        result = ParseResult {};
143
0
    }
144
0
    if (!result) {
145
0
        VLOG_DEBUG << "failed to parse " << std::string_view(src, length) << ", length= " << length;
146
0
        if (config::variant_throw_exeception_on_invalid_json) {
147
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to parse object {}",
148
0
                                   std::string_view(src, length));
149
0
        }
150
        // Treat as string
151
0
        PathInData root_path;
152
0
        Field field(String(src, length));
153
0
        result = ParseResult {{root_path}, {field}};
154
0
    }
155
0
    auto& [paths, values] = *result;
156
0
    assert(paths.size() == values.size());
157
0
    size_t old_num_rows = column_object.size();
158
0
    for (size_t i = 0; i < paths.size(); ++i) {
159
0
        FieldInfo field_info;
160
0
        get_field_info(values[i], &field_info);
161
0
        if (WhichDataType(field_info.scalar_type_id).is_nothing()) {
162
0
            continue;
163
0
        }
164
0
        if (column_object.get_subcolumn(paths[i], i) == nullptr) {
165
0
            if (paths[i].has_nested_part()) {
166
0
                column_object.add_nested_subcolumn(paths[i], field_info, old_num_rows);
167
0
            } else {
168
0
                column_object.add_sub_column(paths[i], old_num_rows);
169
0
            }
170
0
        }
171
0
        auto* subcolumn = column_object.get_subcolumn(paths[i], i);
172
0
        if (!subcolumn) {
173
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT, "Failed to find sub column {}",
174
0
                                   paths[i].get_path());
175
0
        }
176
0
        if (subcolumn->size() != old_num_rows) {
177
0
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
178
0
                                   "subcolumn {} size missmatched, may contains duplicated entry",
179
0
                                   paths[i].get_path());
180
0
        }
181
0
        subcolumn->insert(std::move(values[i]), std::move(field_info));
182
0
    }
183
    // /// Insert default values to missed subcolumns.
184
0
    const auto& subcolumns = column_object.get_subcolumns();
185
0
    for (const auto& entry : subcolumns) {
186
0
        if (entry->data.size() == old_num_rows) {
187
0
            bool inserted = column_object.try_insert_default_from_nested(entry);
188
0
            if (!inserted) {
189
0
                entry->data.insert_default();
190
0
            }
191
0
        }
192
0
    }
193
0
    column_object.incr_num_rows();
194
0
}
195
196
// exposed interfaces
197
void parse_json_to_variant(IColumn& column, const StringRef& json, JsonParser* parser,
198
0
                           const ParseConfig& config) {
199
0
    return parse_json_to_variant(column, json.data, json.size, parser, config);
200
0
}
201
202
void parse_json_to_variant(IColumn& column, const ColumnString& raw_json_column,
203
0
                           const ParseConfig& config) {
204
0
    auto parser = parsers_pool.get([] { return new JsonParser(); });
205
0
    for (size_t i = 0; i < raw_json_column.size(); ++i) {
206
0
        StringRef raw_json = raw_json_column.get_data_at(i);
207
0
        parse_json_to_variant(column, raw_json.data, raw_json.size, parser.get(), config);
208
0
    }
209
0
}
210
211
} // namespace doris::vectorized