Coverage Report

Created: 2026-03-14 06:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/json/new_json_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <rapidjson/allocators.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/rapidjson.h>
24
#include <simdjson/common_defs.h>
25
#include <simdjson/simdjson.h> // IWYU pragma: keep
26
27
#include <memory>
28
#include <string>
29
#include <string_view>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <vector>
33
34
#include "common/status.h"
35
#include "core/custom_allocator.h"
36
#include "core/string_ref.h"
37
#include "core/types.h"
38
#include "exprs/json_functions.h"
39
#include "format/generic_reader.h"
40
#include "format/line_reader.h"
41
#include "io/file_factory.h"
42
#include "io/fs/file_reader_writer_fwd.h"
43
#include "runtime/runtime_profile.h"
44
#include "util/decompressor.h"
45
46
namespace simdjson::fallback::ondemand {
47
class object;
48
} // namespace simdjson::fallback::ondemand
49
50
namespace doris {
51
#include "common/compile_check_begin.h"
52
class SlotDescriptor;
53
class RuntimeState;
54
class TFileRangeDesc;
55
class TFileScanRangeParams;
56
57
namespace io {
58
class FileSystem;
59
struct IOContext;
60
} // namespace io
61
62
struct ScannerCounter;
63
class Block;
64
class IColumn;
65
66
class NewJsonReader : public GenericReader {
67
    ENABLE_FACTORY_CREATOR(NewJsonReader);
68
69
public:
70
    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
71
                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
72
                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
73
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
74
75
    NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
76
                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
77
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
78
0
    ~NewJsonReader() override = default;
79
80
    Status init_reader(
81
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
82
            bool is_load);
83
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
84
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
85
                       std::unordered_set<std::string>* missing_cols) override;
86
    Status init_schema_reader() override;
87
    Status get_parsed_schema(std::vector<std::string>* col_names,
88
                             std::vector<DataTypePtr>* col_types) override;
89
90
protected:
91
    void _collect_profile_before_close() override;
92
93
private:
94
    Status _get_range_params();
95
    void _init_system_properties();
96
    void _init_file_description();
97
    Status _open_file_reader(bool need_schema);
98
    Status _open_line_reader();
99
    Status _parse_jsonpath_and_json_root();
100
101
    Status _read_json_column(RuntimeState* state, Block& block,
102
                             const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
103
                             bool* eof);
104
105
    Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
106
107
    // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
108
    // Need to read all the data before performing JSON parsing.
109
    Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
110
111
    // simdjson-based implementation; intended to replace the non-simdjson functions once it is ready
112
    Status _simdjson_init_reader();
113
    Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
114
                                simdjson::error_code* error);
115
    Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
116
                           bool* is_empty_row);
117
    Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
118
119
    Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows,
120
                                  bool* eof);
121
122
    Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
123
                                        const std::vector<SlotDescriptor*>& slot_descs,
124
                                        bool* is_empty_row, bool* eof);
125
126
    Status _simdjson_handle_simple_json_write_columns(
127
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
128
            bool* eof);
129
130
    Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block,
131
                                                    const std::vector<SlotDescriptor*>& slot_descs,
132
                                                    bool* is_empty_row, bool* eof);
133
134
    Status _simdjson_handle_flat_array_complex_json_write_columns(
135
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
136
            bool* eof);
137
138
    Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block,
139
                                                const std::vector<SlotDescriptor*>& slot_descs,
140
                                                bool* is_empty_row, bool* eof);
141
142
    Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
143
                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
144
145
    template <bool use_string_cache>
146
    Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
147
                                          const DataTypePtr& type_desc, IColumn* column_ptr,
148
                                          const std::string& column_name, DataTypeSerDeSPtr serde,
149
                                          bool* valid);
150
151
    Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value,
152
                                               const std::vector<SlotDescriptor*>& slot_descs,
153
                                               Block& block, bool* valid);
154
    Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
155
                             std::string col_name, bool* valid);
156
157
    size_t _column_index(const StringRef& name, size_t key_index);
158
159
    Status (NewJsonReader::*_vhandle_json_callback)(RuntimeState* state, Block& block,
160
                                                    const std::vector<SlotDescriptor*>& slot_descs,
161
                                                    bool* is_empty_row, bool* eof);
162
    Status _get_column_default_value(
163
            const std::vector<SlotDescriptor*>& slot_descs,
164
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx);
165
166
    Status _fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
167
                                IColumn* column_ptr, bool* valid);
168
169
    // fe will add skip_bitmap_col to _file_slot_descs iff the target olap table has skip_bitmap_col
170
    // and the current load is a flexible partial update
171
    // flexible partial update can not be used when user specify jsonpaths, so we just fill the skip bitmap
172
    // in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (which will be used when jsonpaths is not specified)
173
0
    bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; } // -1 is the sentinel skip_bitmap_col_idx is initialized with
174
    void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count);
175
    void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block,
176
                               size_t cur_row_count, bool* valid);
177
    RuntimeState* _state = nullptr;
178
    RuntimeProfile* _profile = nullptr;
179
    ScannerCounter* _counter = nullptr;
180
    const TFileScanRangeParams& _params;
181
    const TFileRangeDesc& _range;
182
    io::FileSystemProperties _system_properties;
183
    io::FileDescription _file_description;
184
    const std::vector<SlotDescriptor*>& _file_slot_descs;
185
186
    io::FileReaderSPtr _file_reader;
187
    std::unique_ptr<LineReader> _line_reader;
188
    bool _reader_eof;
189
    std::unique_ptr<Decompressor> _decompressor;
190
    TFileCompressType::type _file_compress_type;
191
192
    // When the range we fetch does not start from 0, the first line is always skipped
193
    bool _skip_first_line;
194
195
    std::string _line_delimiter;
196
    size_t _line_delimiter_length;
197
198
    uint32_t _next_row;
199
    size_t _total_rows;
200
201
    std::string _jsonpaths;
202
    std::string _json_root;
203
    bool _read_json_by_line;
204
    bool _strip_outer_array;
205
    bool _num_as_string;
206
    bool _fuzzy_parse;
207
208
    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
209
    std::vector<JsonPath> _parsed_json_root;
210
    bool _parsed_from_json_root = false; // to avoid parsing json root multiple times
211
212
    char _value_buffer[4 * 1024 * 1024]; // 4MB
213
    char _parse_buffer[512 * 1024];      // 512KB
214
215
    using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
216
                                                rapidjson::MemoryPoolAllocator<>>;
217
    rapidjson::MemoryPoolAllocator<> _value_allocator;
218
    rapidjson::MemoryPoolAllocator<> _parse_allocator;
219
    Document _origin_json_doc;   // origin json document object from parsed json string
220
    rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root`
221
    std::unordered_map<std::string, int> _name_map;
222
223
    bool* _scanner_eof = nullptr;
224
225
    size_t _current_offset;
226
227
    io::IOContext* _io_ctx = nullptr;
228
    std::shared_ptr<io::IOContext> _io_ctx_holder;
229
230
    RuntimeProfile::Counter* _read_timer = nullptr;
231
232
    // ======SIMD JSON======
233
    // name mapping
234
    /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
235
    using NameMap = phmap::flat_hash_map<StringRef, size_t, StringRefHash>;
236
    NameMap _slot_desc_index;
237
    /// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
238
    std::vector<NameMap::iterator> _prev_positions;
239
    /// Set of columns already seen in the current row. Exception is thrown if there is more than one column with the same name.
240
    std::vector<UInt8> _seen_columns;
241
    // simdjson
242
    DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
243
    const uint8_t* _json_str = nullptr;
244
    static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
245
    size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
246
    size_t _original_doc_size = 0;
247
    std::string _simdjson_ondemand_padding_buffer;
248
    std::string _simdjson_ondemand_unscape_padding_buffer;
249
    // char _simdjson_ondemand_padding_buffer[_padded_size];
250
    simdjson::ondemand::document _original_json_doc;
251
    simdjson::ondemand::value _json_value;
252
    // for strip outer array
253
    // _array_iter points into _array
254
    simdjson::ondemand::array_iterator _array_iter;
255
    simdjson::ondemand::array _array;
256
    std::unique_ptr<simdjson::ondemand::parser> _ondemand_json_parser;
257
    // column to default value string map
258
    std::unordered_map<std::string, std::string> _col_default_value_map;
259
260
    // From document of simdjson:
261
    // ```
262
    //   Important: a value should be consumed once. Calling get_string() twice on the same value is an error.
263
    // ```
264
    // We should cache the string_views to avoid multiple get_string() calls.
265
    struct StringViewHash { // Hash functor for std::string_view keys of _cached_string_values.
266
0
        size_t operator()(const std::string_view& str) const {
267
0
            return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data())); // hashes the data pointer value, not the characters
268
0
        }
269
    };
270
    struct StringViewEqual { // Equality functor for std::string_view keys of _cached_string_values.
271
0
        bool operator()(const std::string_view& lhs, const std::string_view& rhs) const {
272
0
            return lhs.data() == rhs.data() && lhs.size() == rhs.size(); // same pointer and same length; contents are not compared
273
0
        }
274
    };
275
    std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual>
276
            _cached_string_values;
277
278
    int32_t skip_bitmap_col_idx {-1};
279
280
    //Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
281
    //If an illegal value is encountered during the load process, `_append_error_msg` should be called
282
    //instead of directly returning `Status::DataQualityError`
283
    bool _is_load = true;
284
285
    // In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
286
    // Hive will not allow you to create columns with the same name but different case, including field names inside
287
    // structs, and will automatically convert uppercase names in the create SQL to lowercase. However, when Hive loads data
287
    // to table, the column names in the data may be uppercase, and there may be multiple columns with
288
    // the same name but different capitalization. We follow the behavior of Hive: convert all column names
289
    // in the data to lowercase, and use the last one as the insertion value
291
    bool _is_hive_table = false;
292
293
    // hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop.
294
    // If the variable is true, `null` will be inserted for illegal json format instead of returning an error.
295
    bool _openx_json_ignore_malformed = false;
296
297
    DataTypeSerDeSPtrs _serdes;
298
    DataTypeSerDe::FormatOptions _serde_options;
299
};
300
301
#include "common/compile_check_end.h"
302
} // namespace doris