Coverage Report

Created: 2026-04-14 13:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/json/new_json_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <rapidjson/allocators.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/rapidjson.h>
24
#include <simdjson/common_defs.h>
25
#include <simdjson/simdjson.h> // IWYU pragma: keep
26
27
#include <memory>
28
#include <string>
29
#include <string_view>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <vector>
33
34
#include "common/status.h"
35
#include "core/custom_allocator.h"
36
#include "core/string_ref.h"
37
#include "core/types.h"
38
#include "exprs/json_functions.h"
39
#include "format/generic_reader.h"
40
#include "format/line_reader.h"
41
#include "io/file_factory.h"
42
#include "io/fs/file_reader_writer_fwd.h"
43
#include "runtime/runtime_profile.h"
44
#include "util/decompressor.h"
45
46
namespace simdjson::fallback::ondemand {
47
class object;
48
} // namespace simdjson::fallback::ondemand
49
50
namespace doris {
51
class SlotDescriptor;
52
class RuntimeState;
53
class TFileRangeDesc;
54
class TFileScanRangeParams;
55
56
namespace io {
57
class FileSystem;
58
struct IOContext;
59
} // namespace io
60
61
struct ScannerCounter;
62
class Block;
63
class IColumn;
64
65
class NewJsonReader : public GenericReader {
66
    ENABLE_FACTORY_CREATOR(NewJsonReader);
67
68
public:
69
    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
70
                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
71
                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
72
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
73
74
    NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
75
                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
76
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
77
0
    ~NewJsonReader() override = default;
78
79
    Status init_reader(
80
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
81
            bool is_load);
82
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
83
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
84
                       std::unordered_set<std::string>* missing_cols) override;
85
    Status init_schema_reader() override;
86
    Status get_parsed_schema(std::vector<std::string>* col_names,
87
                             std::vector<DataTypePtr>* col_types) override;
88
89
protected:
90
    void _collect_profile_before_close() override;
91
92
private:
93
    Status _get_range_params();
94
    void _init_system_properties();
95
    void _init_file_description();
96
    Status _open_file_reader(bool need_schema);
97
    Status _open_line_reader();
98
    Status _parse_jsonpath_and_json_root();
99
100
    Status _read_json_column(RuntimeState* state, Block& block,
101
                             const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
102
                             bool* eof);
103
104
    Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
105
106
    // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
107
    // Need to read all the data before performing JSON parsing.
108
    Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
109
110
    // simdjson-based implementation; meant to replace the non-simdjson functions once it is ready
111
    Status _simdjson_init_reader();
112
    Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
113
                                simdjson::error_code* error);
114
    Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
115
                           bool* is_empty_row);
116
    Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
117
118
    Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows,
119
                                  bool* eof);
120
121
    Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
122
                                        const std::vector<SlotDescriptor*>& slot_descs,
123
                                        bool* is_empty_row, bool* eof);
124
125
    Status _simdjson_handle_simple_json_write_columns(
126
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
127
            bool* eof);
128
129
    Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block,
130
                                                    const std::vector<SlotDescriptor*>& slot_descs,
131
                                                    bool* is_empty_row, bool* eof);
132
133
    Status _simdjson_handle_flat_array_complex_json_write_columns(
134
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
135
            bool* eof);
136
137
    Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block,
138
                                                const std::vector<SlotDescriptor*>& slot_descs,
139
                                                bool* is_empty_row, bool* eof);
140
141
    Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
142
                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
143
144
    template <bool use_string_cache>
145
    Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
146
                                          const DataTypePtr& type_desc, IColumn* column_ptr,
147
                                          const std::string& column_name, DataTypeSerDeSPtr serde,
148
                                          bool* valid);
149
150
    Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value,
151
                                               const std::vector<SlotDescriptor*>& slot_descs,
152
                                               Block& block, bool* valid);
153
    Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
154
                             std::string col_name, bool* valid);
155
156
    size_t _column_index(const StringRef& name, size_t key_index);
157
158
    Status (NewJsonReader::*_vhandle_json_callback)(RuntimeState* state, Block& block,
159
                                                    const std::vector<SlotDescriptor*>& slot_descs,
160
                                                    bool* is_empty_row, bool* eof);
161
    Status _get_column_default_value(
162
            const std::vector<SlotDescriptor*>& slot_descs,
163
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx);
164
165
    Status _fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
166
                                IColumn* column_ptr, bool* valid);
167
168
    // The FE adds skip_bitmap_col to _file_slot_descs iff the target OLAP table has a skip_bitmap_col
169
    // and the current load is a flexible partial update.
170
    // Flexible partial update cannot be used when the user specifies jsonpaths, so the skip bitmap is only filled
171
    // in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (which are used when jsonpaths is not specified).
    // NOTE(review): `_vhandle_simple_json` is not declared in this header -- confirm the reference is still current.
    // Returns true when skip_bitmap_col_idx holds anything other than -1, the value it is initialized with.
172
0
    bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; }
173
    void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count);
174
    void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block,
175
                               size_t cur_row_count, bool* valid);
176
    RuntimeState* _state = nullptr;
177
    RuntimeProfile* _profile = nullptr;
178
    ScannerCounter* _counter = nullptr;
179
    const TFileScanRangeParams& _params;
180
    const TFileRangeDesc& _range;
181
    io::FileSystemProperties _system_properties;
182
    io::FileDescription _file_description;
183
    const std::vector<SlotDescriptor*>& _file_slot_descs;
184
185
    io::FileReaderSPtr _file_reader;
186
    std::unique_ptr<LineReader> _line_reader;
187
    bool _reader_eof;
188
    std::unique_ptr<Decompressor> _decompressor;
189
    TFileCompressType::type _file_compress_type;
190
191
    // When the range being fetched does not start from 0, the first line is always skipped
192
    bool _skip_first_line;
193
194
    std::string _line_delimiter;
195
    size_t _line_delimiter_length;
196
197
    uint32_t _next_row;
198
    size_t _total_rows;
199
200
    std::string _jsonpaths;
201
    std::string _json_root;
202
    bool _read_json_by_line;
203
    bool _strip_outer_array;
204
    bool _num_as_string;
205
    bool _fuzzy_parse;
206
207
    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
208
    std::vector<JsonPath> _parsed_json_root;
209
    bool _parsed_from_json_root = false; // to avoid parsing json root multiple times
210
211
    char _value_buffer[4 * 1024 * 1024]; // 4MB
212
    char _parse_buffer[512 * 1024];      // 512KB
213
214
    using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
215
                                                rapidjson::MemoryPoolAllocator<>>;
216
    rapidjson::MemoryPoolAllocator<> _value_allocator;
217
    rapidjson::MemoryPoolAllocator<> _parse_allocator;
218
    Document _origin_json_doc;   // origin json document object from parsed json string
219
    rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root`
220
    std::unordered_map<std::string, int> _name_map;
221
222
    bool* _scanner_eof = nullptr;
223
224
    size_t _current_offset;
225
226
    io::IOContext* _io_ctx = nullptr;
227
    std::shared_ptr<io::IOContext> _io_ctx_holder;
228
229
    RuntimeProfile::Counter* _read_timer = nullptr;
230
231
    // ======SIMD JSON======
232
    // name mapping
233
    /// Hash table mapping `field name -> position in the block`. NOTE: a perfect hash map could be used here.
234
    using NameMap = phmap::flat_hash_map<StringRef, size_t, StringRefHash>;
235
    NameMap _slot_desc_index;
236
    /// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
237
    std::vector<NameMap::iterator> _prev_positions;
238
    /// Set of columns already encountered in the current row. An exception is thrown if more than one column has the same name.
239
    std::vector<UInt8> _seen_columns;
240
    // simdjson
241
    DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
242
    const uint8_t* _json_str = nullptr;
243
    static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
244
    size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
245
    size_t _original_doc_size = 0;
246
    std::string _simdjson_ondemand_padding_buffer;
247
    std::string _simdjson_ondemand_unscape_padding_buffer;
248
    // char _simdjson_ondemand_padding_buffer[_padded_size];
249
    simdjson::ondemand::document _original_json_doc;
250
    simdjson::ondemand::value _json_value;
251
    // for stripping the outer array
252
    // _array_iter points into _array
253
    simdjson::ondemand::array_iterator _array_iter;
254
    simdjson::ondemand::array _array;
255
    std::unique_ptr<simdjson::ondemand::parser> _ondemand_json_parser;
256
    // column to default value string map
257
    std::unordered_map<std::string, std::string> _col_default_value_map;
258
259
    // From document of simdjson:
260
    // ```
261
    //   Important: a value should be consumed once. Calling get_string() twice on the same value is an error.
262
    // ```
263
    // We should cache the string_views to avoid multiple get_string() calls.
264
    // Hash functor used by _cached_string_values.
    // It hashes the address returned by str.data() (reinterpreted as an int64_t); neither the
    // characters nor the length of the view take part in the hash, so two views with the same
    // text at different addresses hash independently.
    struct StringViewHash {
265
0
        size_t operator()(const std::string_view& str) const {
266
0
            return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data()));
267
0
        }
268
    };
269
    // Equality functor used by _cached_string_values.
    // Two views compare equal only when they start at the same address and have the same length;
    // the characters themselves are never compared. This is consistent with StringViewHash, which
    // depends on the address alone.
    struct StringViewEqual {
270
0
        bool operator()(const std::string_view& lhs, const std::string_view& rhs) const {
271
0
            return lhs.data() == rhs.data() && lhs.size() == rhs.size();
272
0
        }
273
    };
274
    // Cache from one string_view to another. Because of StringViewHash/StringViewEqual, keys are
    // matched by identity (address + length) rather than by text content.
    // NOTE(review): presumably key = raw view inside the JSON buffer and value = the string
    // obtained from it -- confirm against the implementation file.
    std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual>
275
            _cached_string_values;
276
277
    int32_t skip_bitmap_col_idx {-1};
278
279
    // Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
280
    // If an illegal value is encountered during the load process, `_append_error_msg` should be called
281
    // instead of directly returning `Status::DataQualityError`.
282
    bool _is_load = true;
283
284
    // In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
285
    // Hive will not allow you to create columns with the same name but different case, including field names inside
286
    // structs, and will automatically convert uppercase names in create sql to lowercase. However, when Hive loads data
287
    // to table, the column names in the data may be uppercase, and there may be multiple columns with
288
    // the same name but different capitalization. Following the behavior of Hive, we convert all column names
289
    // in the data to lowercase, and use the last one as the insertion value.
290
    bool _is_hive_table = false;
291
292
    // hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop.
293
    // If the variable is true, `null` will be inserted for illegal json format instead of returning an error.
294
    bool _openx_json_ignore_malformed = false;
295
296
    DataTypeSerDeSPtrs _serdes;
297
    DataTypeSerDe::FormatOptions _serde_options;
298
};
299
300
} // namespace doris