Coverage Report

Created: 2026-04-21 19:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/json/new_json_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <rapidjson/allocators.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/rapidjson.h>
24
#include <simdjson/common_defs.h>
25
#include <simdjson/simdjson.h> // IWYU pragma: keep
26
27
#include <memory>
28
#include <string>
29
#include <string_view>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <vector>
33
34
#include "common/status.h"
35
#include "core/custom_allocator.h"
36
#include "core/string_ref.h"
37
#include "core/types.h"
38
#include "exprs/json_functions.h"
39
#include "format/line_reader.h"
40
#include "format/table/table_format_reader.h"
41
#include "io/file_factory.h"
42
#include "io/fs/file_reader_writer_fwd.h"
43
#include "runtime/runtime_profile.h"
44
#include "util/decompressor.h"
45
46
// Forward declaration of simdjson's fallback-implementation on-demand `object`.
// NOTE(review): <simdjson/simdjson.h> is already fully included by this header,
// so this declaration looks redundant; presumably kept for IWYU — confirm before removing.
namespace simdjson {
namespace fallback {
namespace ondemand {
class object;
} // namespace ondemand
} // namespace fallback
} // namespace simdjson
49
50
namespace doris {
51
// Forward declarations of types this header refers to by name.
// Declaration order does not matter; the class-keys match the original declarations.
class Block;
class IColumn;
class RuntimeState;
class SlotDescriptor;
class TFileRangeDesc;
class TFileScanRangeParams;
struct ScannerCounter;

namespace io {
// NOTE(review): io::FileSystem does not appear to be referenced in this header.
class FileSystem;
struct IOContext;
} // namespace io
65
/// JSON-specific initialization context.
66
/// Extends ReaderInitContext with default value context (unique to JSON reader).
67
struct JsonInitContext final : public ReaderInitContext {
68
    const std::unordered_map<std::string, VExprContextSPtr>* col_default_value_ctx = nullptr;
69
    bool is_load = false;
70
};
71
72
class NewJsonReader : public TableFormatReader {
73
    ENABLE_FACTORY_CREATOR(NewJsonReader);
74
75
public:
76
    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
77
                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
78
                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
79
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
80
81
    NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
82
                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
83
                  io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
84
0
    ~NewJsonReader() override = default;
85
86
    Status init_reader(
87
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
88
            bool is_load);
89
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;
90
    Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;
91
    Status init_schema_reader() override;
92
    Status get_parsed_schema(std::vector<std::string>* col_names,
93
                             std::vector<DataTypePtr>* col_types) override;
94
95
protected:
96
    // ---- Unified init_reader(ReaderInitContext*) overrides ----
97
    Status _open_file_reader(ReaderInitContext* ctx) override;
98
    Status _do_init_reader(ReaderInitContext* ctx) override;
99
100
    void _collect_profile_before_close() override;
101
102
private:
103
    Status _get_range_params();
104
    void _init_system_properties();
105
    void _init_file_description();
106
    Status _open_file_reader(bool need_schema);
107
    Status _open_line_reader();
108
    Status _parse_jsonpath_and_json_root();
109
110
    Status _read_json_column(RuntimeState* state, Block& block,
111
                             const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
112
                             bool* eof);
113
114
    Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
115
116
    // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
117
    // Need to read all the data before performing JSON parsing.
118
    Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
119
120
    // simdjson, replace none simdjson function if it is ready
121
    Status _simdjson_init_reader();
122
    Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
123
                                simdjson::error_code* error);
124
    Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
125
                           bool* is_empty_row);
126
    Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
127
128
    Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows,
129
                                  bool* eof);
130
131
    Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
132
                                        const std::vector<SlotDescriptor*>& slot_descs,
133
                                        bool* is_empty_row, bool* eof);
134
135
    Status _simdjson_handle_simple_json_write_columns(
136
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
137
            bool* eof);
138
139
    Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block,
140
                                                    const std::vector<SlotDescriptor*>& slot_descs,
141
                                                    bool* is_empty_row, bool* eof);
142
143
    Status _simdjson_handle_flat_array_complex_json_write_columns(
144
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
145
            bool* eof);
146
147
    Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block,
148
                                                const std::vector<SlotDescriptor*>& slot_descs,
149
                                                bool* is_empty_row, bool* eof);
150
151
    Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
152
                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
153
154
    template <bool use_string_cache>
155
    Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
156
                                          const DataTypePtr& type_desc, IColumn* column_ptr,
157
                                          const std::string& column_name, DataTypeSerDeSPtr serde,
158
                                          bool* valid);
159
160
    Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value,
161
                                               const std::vector<SlotDescriptor*>& slot_descs,
162
                                               Block& block, bool* valid);
163
    Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
164
                             std::string col_name, bool* valid);
165
166
    size_t _column_index(const StringRef& name, size_t key_index);
167
168
    Status (NewJsonReader::*_vhandle_json_callback)(RuntimeState* state, Block& block,
169
                                                    const std::vector<SlotDescriptor*>& slot_descs,
170
                                                    bool* is_empty_row, bool* eof);
171
    Status _get_column_default_value(
172
            const std::vector<SlotDescriptor*>& slot_descs,
173
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx);
174
175
    Status _fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
176
                                IColumn* column_ptr, bool* valid);
177
178
    // fe will add skip_bitmap_col to _file_slot_descs iff the target olap table has skip_bitmap_col
179
    // and the current load is a flexible partial update
180
    // flexible partial update can not be used when user specify jsonpaths, so we just fill the skip bitmap
181
    // in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (which will be used when jsonpaths is not specified)
182
0
    bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; }
183
    void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count);
184
    void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block,
185
                               size_t cur_row_count, bool* valid);
186
    RuntimeState* _state = nullptr;
187
    RuntimeProfile* _profile = nullptr;
188
    ScannerCounter* _counter = nullptr;
189
    const TFileScanRangeParams& _params;
190
    const TFileRangeDesc& _range;
191
    io::FileSystemProperties _system_properties;
192
    io::FileDescription _file_description;
193
    const std::vector<SlotDescriptor*>& _file_slot_descs;
194
195
    io::FileReaderSPtr _file_reader;
196
    std::unique_ptr<LineReader> _line_reader;
197
    bool _reader_eof;
198
    std::unique_ptr<Decompressor> _decompressor;
199
    TFileCompressType::type _file_compress_type;
200
201
    // When we fetch range doesn't start from 0 will always skip the first line
202
    bool _skip_first_line;
203
204
    std::string _line_delimiter;
205
    size_t _line_delimiter_length;
206
207
    uint32_t _next_row;
208
    size_t _total_rows;
209
210
    std::string _jsonpaths;
211
    std::string _json_root;
212
    bool _read_json_by_line;
213
    bool _strip_outer_array;
214
    bool _num_as_string;
215
    bool _fuzzy_parse;
216
217
    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
218
    std::vector<JsonPath> _parsed_json_root;
219
    bool _parsed_from_json_root = false; // to avoid parsing json root multiple times
220
221
    char _value_buffer[4 * 1024 * 1024]; // 4MB
222
    char _parse_buffer[512 * 1024];      // 512KB
223
224
    using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
225
                                                rapidjson::MemoryPoolAllocator<>>;
226
    rapidjson::MemoryPoolAllocator<> _value_allocator;
227
    rapidjson::MemoryPoolAllocator<> _parse_allocator;
228
    Document _origin_json_doc;   // origin json document object from parsed json string
229
    rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root`
230
    std::unordered_map<std::string, int> _name_map;
231
232
    bool* _scanner_eof = nullptr;
233
234
    size_t _current_offset;
235
236
    io::IOContext* _io_ctx = nullptr;
237
    std::shared_ptr<io::IOContext> _io_ctx_holder;
238
239
    RuntimeProfile::Counter* _read_timer = nullptr;
240
241
    // ======SIMD JSON======
242
    // name mapping
243
    /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
244
    using NameMap = phmap::flat_hash_map<StringRef, size_t, StringRefHash>;
245
    NameMap _slot_desc_index;
246
    /// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
247
    std::vector<NameMap::iterator> _prev_positions;
248
    /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
249
    std::vector<UInt8> _seen_columns;
250
    // simdjson
251
    DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
252
    const uint8_t* _json_str = nullptr;
253
    static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
254
    size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
255
    size_t _original_doc_size = 0;
256
    std::string _simdjson_ondemand_padding_buffer;
257
    std::string _simdjson_ondemand_unscape_padding_buffer;
258
    // char _simdjson_ondemand_padding_buffer[_padded_size];
259
    simdjson::ondemand::document _original_json_doc;
260
    simdjson::ondemand::value _json_value;
261
    // for strip outer array
262
    // array_iter pointed to _array
263
    simdjson::ondemand::array_iterator _array_iter;
264
    simdjson::ondemand::array _array;
265
    std::unique_ptr<simdjson::ondemand::parser> _ondemand_json_parser;
266
    // column to default value string map
267
    std::unordered_map<std::string, std::string> _col_default_value_map;
268
269
    // From document of simdjson:
270
    // ```
271
    //   Important: a value should be consumed once. Calling get_string() twice on the same value is an error.
272
    // ```
273
    // We should cache the string_views to avoid multiple get_string() calls.
274
    struct StringViewHash {
275
0
        size_t operator()(const std::string_view& str) const {
276
0
            return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data()));
277
0
        }
278
    };
279
    struct StringViewEqual {
280
0
        bool operator()(const std::string_view& lhs, const std::string_view& rhs) const {
281
0
            return lhs.data() == rhs.data() && lhs.size() == rhs.size();
282
0
        }
283
    };
284
    std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual>
285
            _cached_string_values;
286
287
    int32_t skip_bitmap_col_idx {-1};
288
289
    //Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
290
    //If an illegal value is encountered during the load process, `_append_error_msg` should be called
291
    //instead of directly returning `Status::DataQualityError`
292
    bool _is_load = true;
293
294
    // In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
295
    // Hive will not allow you to create columns with the same name but different case, including field names inside
296
    // structs, and will automatically convert uppercase names in create sql to lowercase.However, when Hive loads data
297
    // to table, the column names in the data may be uppercase,and there may be multiple columns with
298
    // the same name but different capitalization.We refer to the behavior of hive, convert all column names
299
    // in the data to lowercase,and use the last one as the insertion value
300
    bool _is_hive_table = false;
301
302
    // hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop.
303
    // If the variable is true, `null` will be inserted for llegal json format instead of return error.
304
    bool _openx_json_ignore_malformed = false;
305
306
    DataTypeSerDeSPtrs _serdes;
307
    DataTypeSerDe::FormatOptions _serde_options;
308
};
309
310
} // namespace doris