Coverage Report

Created: 2026-05-25 12:01

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/json/new_json_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <rapidjson/allocators.h>
21
#include <rapidjson/document.h>
22
#include <rapidjson/encodings.h>
23
#include <rapidjson/rapidjson.h>
24
#include <simdjson/common_defs.h>
25
#include <simdjson/simdjson.h> // IWYU pragma: keep
26
27
#include <memory>
28
#include <string>
29
#include <string_view>
30
#include <unordered_map>
31
#include <unordered_set>
32
#include <vector>
33
34
#include "common/status.h"
35
#include "core/custom_allocator.h"
36
#include "core/string_ref.h"
37
#include "core/types.h"
38
#include "exprs/json_functions.h"
39
#include "format/line_reader.h"
40
#include "format/table/table_format_reader.h"
41
#include "io/file_factory.h"
42
#include "io/fs/file_reader_writer_fwd.h"
43
#include "runtime/runtime_profile.h"
44
#include "util/decompressor.h"
45
46
// Forward declaration of simdjson's fallback-implementation on-demand object type,
// so this header does not need the full definition.
namespace simdjson {
namespace fallback {
namespace ondemand {
class object;
} // namespace ondemand
} // namespace fallback
} // namespace simdjson
49
50
namespace doris {
51
// Forward declarations for types this header refers to (mostly by pointer or reference).
class RuntimeState;
class SlotDescriptor;
class TFileRangeDesc;
class TFileScanRangeParams;

class Block;
class IColumn;
struct ScannerCounter;

namespace io {
class FileSystem;
struct IOContext;
} // namespace io
65
namespace json_reader_detail {
66
Status append_null_for_malformed_json(Block& block);
67
void truncate_block_to_rows(Block& block, size_t num_rows);
68
void pop_back_last_inserted_value(Block& block, size_t column_index);
69
} // namespace json_reader_detail
70
71
/// JSON-specific initialization context.
72
/// Extends ReaderInitContext with default value context (unique to JSON reader).
73
struct JsonInitContext final : public ReaderInitContext {
74
    const std::unordered_map<std::string, VExprContextSPtr>* col_default_value_ctx = nullptr;
75
    bool is_load = false;
76
};
77
78
class NewJsonReader : public TableFormatReader {
79
    ENABLE_FACTORY_CREATOR(NewJsonReader);
80
81
public:
82
    NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
83
                  const TFileScanRangeParams& params, const TFileRangeDesc& range,
84
                  const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
85
                  size_t batch_size, io::IOContext* io_ctx,
86
                  std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
87
88
    NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
89
                  const TFileRangeDesc& range, const std::vector<SlotDescriptor*>& file_slot_descs,
90
                  size_t batch_size, io::IOContext* io_ctx,
91
                  std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
92
2
    ~NewJsonReader() override = default;
93
94
    Status init_reader(
95
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
96
            bool is_load);
97
98
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;
99
    Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;
100
101
    // Row-based readers control throughput via row count, not byte budget.
102
    // The FileScanner's AdaptiveBlockSizePredictor converts the byte budget
103
    // into a predicted row count and calls set_batch_size() with it.
104
    void set_batch_size(size_t batch_size) override;
105
6
    size_t get_batch_size() const override { return _batch_size; }
106
107
    Status init_schema_reader() override;
108
    Status get_parsed_schema(std::vector<std::string>* col_names,
109
                             std::vector<DataTypePtr>* col_types) override;
110
111
protected:
112
    // ---- Unified init_reader(ReaderInitContext*) overrides ----
113
    Status _open_file_reader(ReaderInitContext* ctx) override;
114
    Status _do_init_reader(ReaderInitContext* ctx) override;
115
116
    void _collect_profile_before_close() override;
117
118
private:
119
    Status _get_range_params();
120
    void _init_system_properties();
121
    void _init_file_description();
122
    Status _open_file_reader(bool need_schema);
123
    Status _open_line_reader();
124
    Status _parse_jsonpath_and_json_root();
125
126
    Status _read_json_column(RuntimeState* state, Block& block,
127
                             const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
128
                             bool* eof);
129
130
    Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
131
132
    // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
133
    // Need to read all the data before performing JSON parsing.
134
    Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
135
136
    // simdjson, replace none simdjson function if it is ready
137
    Status _simdjson_init_reader();
138
    Status _simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
139
                                simdjson::error_code* error);
140
    Status _get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
141
                           bool* is_empty_row);
142
    Status _judge_empty_row(size_t size, bool eof, bool* is_empty_row);
143
144
    Status _handle_simdjson_error(simdjson::simdjson_error& error, Block& block, size_t num_rows,
145
                                  bool* eof);
146
147
    Status _simdjson_handle_simple_json(RuntimeState* state, Block& block,
148
                                        const std::vector<SlotDescriptor*>& slot_descs,
149
                                        bool* is_empty_row, bool* eof);
150
151
    Status _simdjson_handle_simple_json_write_columns(
152
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
153
            bool* eof);
154
155
    Status _simdjson_handle_flat_array_complex_json(RuntimeState* state, Block& block,
156
                                                    const std::vector<SlotDescriptor*>& slot_descs,
157
                                                    bool* is_empty_row, bool* eof);
158
159
    Status _simdjson_handle_flat_array_complex_json_write_columns(
160
            Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
161
            bool* eof);
162
163
    Status _simdjson_handle_nested_complex_json(RuntimeState* state, Block& block,
164
                                                const std::vector<SlotDescriptor*>& slot_descs,
165
                                                bool* is_empty_row, bool* eof);
166
167
    Status _simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
168
                                      const std::vector<SlotDescriptor*>& slot_descs, bool* valid);
169
170
    template <bool use_string_cache>
171
    Status _simdjson_write_data_to_column(simdjson::ondemand::value& value,
172
                                          const DataTypePtr& type_desc, IColumn* column_ptr,
173
                                          const std::string& column_name, DataTypeSerDeSPtr serde,
174
                                          bool* valid);
175
176
    Status _simdjson_write_columns_by_jsonpath(simdjson::ondemand::object* value,
177
                                               const std::vector<SlotDescriptor*>& slot_descs,
178
                                               Block& block, bool* valid);
179
    Status _append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
180
                             std::string col_name, bool* valid);
181
182
    size_t _column_index(const StringRef& name, size_t key_index);
183
184
    Status (NewJsonReader::*_vhandle_json_callback)(RuntimeState* state, Block& block,
185
                                                    const std::vector<SlotDescriptor*>& slot_descs,
186
                                                    bool* is_empty_row, bool* eof);
187
    Status _get_column_default_value(
188
            const std::vector<SlotDescriptor*>& slot_descs,
189
            const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx);
190
191
    Status _fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
192
                                IColumn* column_ptr, bool* valid);
193
194
    // fe will add skip_bitmap_col to _file_slot_descs iff the target olap table has skip_bitmap_col
195
    // and the current load is a flexible partial update
196
    // flexible partial update can not be used when user specify jsonpaths, so we just fill the skip bitmap
197
    // in `_simdjson_handle_simple_json` and `_vhandle_simple_json` (which will be used when jsonpaths is not specified)
198
0
    bool _should_process_skip_bitmap_col() const { return skip_bitmap_col_idx != -1; }
199
    void _append_empty_skip_bitmap_value(Block& block, size_t cur_row_count);
200
    void _set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, Block& block,
201
                               size_t cur_row_count, bool* valid);
202
    RuntimeState* _state = nullptr;
203
    RuntimeProfile* _profile = nullptr;
204
    ScannerCounter* _counter = nullptr;
205
    const TFileScanRangeParams& _params;
206
    const TFileRangeDesc& _range;
207
    io::FileSystemProperties _system_properties;
208
    io::FileDescription _file_description;
209
    const std::vector<SlotDescriptor*>& _file_slot_descs;
210
211
    io::FileReaderSPtr _file_reader;
212
    std::unique_ptr<LineReader> _line_reader;
213
    bool _reader_eof;
214
    std::unique_ptr<Decompressor> _decompressor;
215
    TFileCompressType::type _file_compress_type;
216
217
    // When we fetch range doesn't start from 0 will always skip the first line
218
    bool _skip_first_line;
219
220
    std::string _line_delimiter;
221
    size_t _line_delimiter_length;
222
223
    uint32_t _next_row;
224
    size_t _total_rows;
225
226
    std::string _jsonpaths;
227
    std::string _json_root;
228
    bool _read_json_by_line;
229
    bool _strip_outer_array;
230
    bool _num_as_string;
231
    bool _fuzzy_parse;
232
233
    std::vector<std::vector<JsonPath>> _parsed_jsonpaths;
234
    std::vector<JsonPath> _parsed_json_root;
235
    bool _parsed_from_json_root = false; // to avoid parsing json root multiple times
236
237
    char _value_buffer[4 * 1024 * 1024]; // 4MB
238
    char _parse_buffer[512 * 1024];      // 512KB
239
240
    using Document = rapidjson::GenericDocument<rapidjson::UTF8<>, rapidjson::MemoryPoolAllocator<>,
241
                                                rapidjson::MemoryPoolAllocator<>>;
242
    rapidjson::MemoryPoolAllocator<> _value_allocator;
243
    rapidjson::MemoryPoolAllocator<> _parse_allocator;
244
    Document _origin_json_doc;   // origin json document object from parsed json string
245
    rapidjson::Value* _json_doc; // _json_doc equals _final_json_doc iff not set `json_root`
246
    std::unordered_map<std::string, int> _name_map;
247
248
    bool* _scanner_eof = nullptr;
249
250
    size_t _current_offset;
251
252
    io::IOContext* _io_ctx = nullptr;
253
    std::shared_ptr<io::IOContext> _io_ctx_holder;
254
255
    RuntimeProfile::Counter* _read_timer = nullptr;
256
257
    // ======SIMD JSON======
258
    // name mapping
259
    /// Hash table match `field name -> position in the block`. NOTE You can use perfect hash map.
260
    using NameMap = phmap::flat_hash_map<StringRef, size_t, StringRefHash>;
261
    NameMap _slot_desc_index;
262
    /// Cached search results for previous row (keyed as index in JSON object) - used as a hint.
263
    std::vector<NameMap::iterator> _prev_positions;
264
    /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
265
    std::vector<UInt8> _seen_columns;
266
    // simdjson
267
    DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
268
    const uint8_t* _json_str = nullptr;
269
    static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
270
    size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
271
    size_t _original_doc_size = 0;
272
    std::string _simdjson_ondemand_padding_buffer;
273
    std::string _simdjson_ondemand_unscape_padding_buffer;
274
    // char _simdjson_ondemand_padding_buffer[_padded_size];
275
    simdjson::ondemand::document _original_json_doc;
276
    simdjson::ondemand::value _json_value;
277
    // for strip outer array
278
    // array_iter pointed to _array
279
    simdjson::ondemand::array_iterator _array_iter;
280
    simdjson::ondemand::array _array;
281
    std::unique_ptr<simdjson::ondemand::parser> _ondemand_json_parser;
282
    // column to default value string map
283
    std::unordered_map<std::string, std::string> _col_default_value_map;
284
285
    // From document of simdjson:
286
    // ```
287
    //   Important: a value should be consumed once. Calling get_string() twice on the same value is an error.
288
    // ```
289
    // We should cache the string_views to avoid multiple get_string() calls.
290
    struct StringViewHash {
291
0
        size_t operator()(const std::string_view& str) const {
292
0
            return std::hash<int64_t>()(reinterpret_cast<int64_t>(str.data()));
293
0
        }
294
    };
295
    struct StringViewEqual {
296
0
        bool operator()(const std::string_view& lhs, const std::string_view& rhs) const {
297
0
            return lhs.data() == rhs.data() && lhs.size() == rhs.size();
298
0
        }
299
    };
300
    std::unordered_map<std::string_view, std::string_view, StringViewHash, StringViewEqual>
301
            _cached_string_values;
302
303
    int32_t skip_bitmap_col_idx {-1};
304
305
    //Used to indicate whether it is a stream load. When loading, only data will be inserted into columnString.
306
    //If an illegal value is encountered during the load process, `_append_error_msg` should be called
307
    //instead of directly returning `Status::DataQualityError`
308
    bool _is_load = true;
309
310
    // In hive : create table xxx ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe';
311
    // Hive will not allow you to create columns with the same name but different case, including field names inside
312
    // structs, and will automatically convert uppercase names in create sql to lowercase.However, when Hive loads data
313
    // to table, the column names in the data may be uppercase,and there may be multiple columns with
314
    // the same name but different capitalization.We refer to the behavior of hive, convert all column names
315
    // in the data to lowercase,and use the last one as the insertion value
316
    bool _is_hive_table = false;
317
318
    // hive : org.openx.data.jsonserde.JsonSerDe, `ignore.malformed.json` prop.
319
    // If the variable is true, `null` will be inserted for llegal json format instead of return error.
320
    bool _openx_json_ignore_malformed = false;
321
322
    DataTypeSerDeSPtrs _serdes;
323
    DataTypeSerDe::FormatOptions _serde_options;
324
    // Adaptive batch size set by FileScanner.
325
    size_t _batch_size;
326
};
327
328
} // namespace doris