Coverage Report

Created: 2026-05-17 04:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/json/new_json_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/json/new_json_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/Metrics_types.h>
22
#include <gen_cpp/PlanNodes_types.h>
23
#include <gen_cpp/Types_types.h>
24
#include <glog/logging.h>
25
#include <rapidjson/error/en.h>
26
#include <rapidjson/reader.h>
27
#include <rapidjson/stringbuffer.h>
28
#include <rapidjson/writer.h>
29
#include <simdjson/simdjson.h> // IWYU pragma: keep
30
31
#include <algorithm>
32
#include <cinttypes>
33
#include <cstdio>
34
#include <cstring>
35
#include <map>
36
#include <memory>
37
#include <string_view>
38
#include <utility>
39
40
#include "common/compiler_util.h" // IWYU pragma: keep
41
#include "common/config.h"
42
#include "common/exception.h"
43
#include "common/status.h"
44
#include "core/assert_cast.h"
45
#include "core/block/column_with_type_and_name.h"
46
#include "core/column/column.h"
47
#include "core/column/column_array.h"
48
#include "core/column/column_map.h"
49
#include "core/column/column_nullable.h"
50
#include "core/column/column_string.h"
51
#include "core/column/column_struct.h"
52
#include "core/column/column_variant.h"
53
#include "core/custom_allocator.h"
54
#include "core/data_type/data_type_array.h"
55
#include "core/data_type/data_type_factory.hpp"
56
#include "core/data_type/data_type_map.h"
57
#include "core/data_type/data_type_number.h" // IWYU pragma: keep
58
#include "core/data_type/data_type_struct.h"
59
#include "core/data_type/define_primitive_type.h"
60
#include "exec/common/variant_util.h"
61
#include "exec/scan/scanner.h"
62
#include "exprs/json_functions.h"
63
#include "format/file_reader/new_plain_text_line_reader.h"
64
#include "io/file_factory.h"
65
#include "io/fs/buffered_reader.h"
66
#include "io/fs/file_reader.h"
67
#include "io/fs/stream_load_pipe.h"
68
#include "io/fs/tracing_file_reader.h"
69
#include "runtime/descriptors.h"
70
#include "runtime/runtime_state.h"
71
#include "util/slice.h"
72
73
namespace doris::io {
74
struct IOContext;
75
enum class FileCachePolicy : uint8_t;
76
} // namespace doris::io
77
78
namespace doris {
79
using namespace ErrorCode;
80
81
NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
82
                             const TFileScanRangeParams& params, const TFileRangeDesc& range,
83
                             const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
84
                             size_t batch_size, io::IOContext* io_ctx,
85
                             std::shared_ptr<io::IOContext> io_ctx_holder)
86
0
        : _vhandle_json_callback(nullptr),
87
0
          _state(state),
88
0
          _profile(profile),
89
0
          _counter(counter),
90
0
          _params(params),
91
0
          _range(range),
92
0
          _file_slot_descs(file_slot_descs),
93
0
          _file_reader(nullptr),
94
0
          _line_reader(nullptr),
95
0
          _reader_eof(false),
96
0
          _decompressor(nullptr),
97
0
          _skip_first_line(false),
98
0
          _next_row(0),
99
0
          _total_rows(0),
100
0
          _value_allocator(_value_buffer, sizeof(_value_buffer)),
101
0
          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
102
0
          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
103
0
          _scanner_eof(scanner_eof),
104
0
          _current_offset(0),
105
0
          _io_ctx(io_ctx),
106
0
          _io_ctx_holder(std::move(io_ctx_holder)),
107
0
          _batch_size(std::max(batch_size, 1UL)) {
108
0
    if (_io_ctx == nullptr && _io_ctx_holder) {
109
0
        _io_ctx = _io_ctx_holder.get();
110
0
    }
111
0
    _read_timer = ADD_TIMER(_profile, "ReadTime");
112
0
    if (_range.__isset.compress_type) {
113
        // for compatibility
114
0
        _file_compress_type = _range.compress_type;
115
0
    } else {
116
0
        _file_compress_type = _params.compress_type;
117
0
    }
118
0
    _init_system_properties();
119
0
    _init_file_description();
120
0
}
121
122
NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
123
                             const TFileRangeDesc& range,
124
                             const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size,
125
                             io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
126
2
        : _vhandle_json_callback(nullptr),
127
2
          _state(nullptr),
128
2
          _profile(profile),
129
2
          _params(params),
130
2
          _range(range),
131
2
          _file_slot_descs(file_slot_descs),
132
2
          _line_reader(nullptr),
133
2
          _reader_eof(false),
134
2
          _decompressor(nullptr),
135
2
          _skip_first_line(false),
136
2
          _next_row(0),
137
2
          _total_rows(0),
138
2
          _value_allocator(_value_buffer, sizeof(_value_buffer)),
139
2
          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
140
2
          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
141
2
          _io_ctx(io_ctx),
142
2
          _io_ctx_holder(std::move(io_ctx_holder)),
143
2
          _batch_size(std::max(batch_size, 1UL)) {
144
2
    if (_io_ctx == nullptr && _io_ctx_holder) {
145
0
        _io_ctx = _io_ctx_holder.get();
146
0
    }
147
2
    if (_range.__isset.compress_type) {
148
        // for compatibility
149
0
        _file_compress_type = _range.compress_type;
150
2
    } else {
151
2
        _file_compress_type = _params.compress_type;
152
2
    }
153
2
    _init_system_properties();
154
2
    _init_file_description();
155
2
}
156
157
2
void NewJsonReader::_init_system_properties() {
158
2
    if (_range.__isset.file_type) {
159
        // for compatibility
160
0
        _system_properties.system_type = _range.file_type;
161
2
    } else {
162
2
        _system_properties.system_type = _params.file_type;
163
2
    }
164
2
    _system_properties.properties = _params.properties;
165
2
    _system_properties.hdfs_params = _params.hdfs_params;
166
2
    if (_params.__isset.broker_addresses) {
167
0
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
168
0
                                                   _params.broker_addresses.end());
169
0
    }
170
2
}
171
172
2
void NewJsonReader::_init_file_description() {
173
2
    _file_description.path = _range.path;
174
2
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
175
176
2
    if (_range.__isset.fs_name) {
177
0
        _file_description.fs_name = _range.fs_name;
178
0
    }
179
2
    if (_range.__isset.file_cache_admission) {
180
0
        _file_description.file_cache_admission = _range.file_cache_admission;
181
0
    }
182
2
}
183
184
Status NewJsonReader::init_reader(
185
        const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
186
0
        bool is_load) {
187
0
    _is_load = is_load;
188
189
    // generate _col_default_value_map
190
0
    RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx));
191
192
    //use serde insert data to column.
193
0
    for (auto* slot_desc : _file_slot_descs) {
194
0
        _serdes.emplace_back(slot_desc->get_data_type_ptr()->get_serde());
195
0
    }
196
197
    // create decompressor.
198
    // _decompressor may be nullptr if this is not a compressed file
199
0
    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
200
201
0
    RETURN_IF_ERROR(_simdjson_init_reader());
202
0
    return Status::OK();
203
0
}
204
205
// ---- Unified init_reader(ReaderInitContext*) overrides ----
206
207
0
Status NewJsonReader::_open_file_reader(ReaderInitContext* /*ctx*/) {
208
0
    RETURN_IF_ERROR(_get_range_params());
209
0
    RETURN_IF_ERROR(_open_file_reader(false));
210
0
    return Status::OK();
211
0
}
212
213
0
Status NewJsonReader::_do_init_reader(ReaderInitContext* base_ctx) {
214
0
    auto* ctx = checked_context_cast<JsonInitContext>(base_ctx);
215
0
    _is_load = ctx->is_load;
216
217
0
    RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, *ctx->col_default_value_ctx));
218
0
    for (auto* slot_desc : _file_slot_descs) {
219
0
        _serdes.emplace_back(slot_desc->get_data_type_ptr()->get_serde());
220
0
    }
221
222
    // Create decompressor (needed by line reader below)
223
0
    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
224
225
0
    if (LIKELY(_read_json_by_line)) {
226
0
        RETURN_IF_ERROR(_open_line_reader());
227
0
    }
228
0
    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
229
230
0
    if (_parsed_jsonpaths.empty()) {
231
0
        _vhandle_json_callback = &NewJsonReader::_simdjson_handle_simple_json;
232
0
    } else {
233
0
        if (_strip_outer_array) {
234
0
            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
235
0
        } else {
236
0
            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
237
0
        }
238
0
    }
239
0
    _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
240
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
241
0
        _slot_desc_index[StringRef {_file_slot_descs[i]->col_name()}] = i;
242
0
        if (_file_slot_descs[i]->is_skip_bitmap_col()) {
243
0
            skip_bitmap_col_idx = i;
244
0
        }
245
0
    }
246
0
    _simdjson_ondemand_padding_buffer.resize(_padded_size);
247
0
    _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
248
0
    return Status::OK();
249
0
}
250
251
5
void NewJsonReader::set_batch_size(size_t batch_size) {
252
    // 0 means "not set" / "use default" for the row-based readers; we must
253
    // never let _batch_size be 0 because _do_get_next_block uses it as the
254
    // upper bound of a `while (block->rows() < batch_size)` loop and a 0
255
    // would make the reader return without setting eof, causing the scanner
256
    // to spin on empty blocks.
257
5
    _batch_size = std::max(batch_size, 1UL);
258
5
}
259
260
0
Status NewJsonReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
261
0
    if (_reader_eof) {
262
0
        *eof = true;
263
0
        return Status::OK();
264
0
    }
265
266
0
    const auto batch_size = _batch_size;
267
0
    const auto max_block_bytes = _state->preferred_block_size_bytes();
268
269
0
    while (block->rows() < batch_size && !_reader_eof && (block->bytes() < max_block_bytes)) {
270
0
        if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
271
0
            size_t size = 0;
272
0
            const uint8_t* line_ptr = nullptr;
273
0
            RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof, _io_ctx));
274
0
            _skip_first_line = false;
275
0
            continue;
276
0
        }
277
278
0
        bool is_empty_row = false;
279
280
0
        RETURN_IF_ERROR(
281
0
                _read_json_column(_state, *block, _file_slot_descs, &is_empty_row, &_reader_eof));
282
0
        if (is_empty_row) {
283
            // Read empty row, just continue
284
0
            continue;
285
0
        }
286
0
        ++(*read_rows);
287
0
    }
288
289
0
    return Status::OK();
290
0
}
291
292
Status NewJsonReader::_get_columns_impl(
293
0
        std::unordered_map<std::string, DataTypePtr>* name_to_type) {
294
0
    for (const auto& slot : _file_slot_descs) {
295
0
        name_to_type->emplace(slot->col_name(), slot->type());
296
0
    }
297
0
    return Status::OK();
298
0
}
299
300
// init decompressor, file reader and line reader for parsing schema
301
0
Status NewJsonReader::init_schema_reader() {
302
0
    RETURN_IF_ERROR(_get_range_params());
303
    // create decompressor.
304
    // _decompressor may be nullptr if this is not a compressed file
305
0
    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
306
0
    RETURN_IF_ERROR(_open_file_reader(true));
307
0
    if (_read_json_by_line) {
308
0
        RETURN_IF_ERROR(_open_line_reader());
309
0
    }
310
    // generate _parsed_jsonpaths and _parsed_json_root
311
0
    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
312
0
    return Status::OK();
313
0
}
314
315
Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
316
0
                                        std::vector<DataTypePtr>* col_types) {
317
0
    bool eof = false;
318
0
    const uint8_t* json_str = nullptr;
319
0
    DorisUniqueBufferPtr<uint8_t> json_str_ptr;
320
0
    size_t size = 0;
321
0
    if (_line_reader != nullptr) {
322
0
        RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx));
323
0
    } else {
324
0
        size_t read_size = 0;
325
0
        RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size));
326
0
        json_str = json_str_ptr.get();
327
0
        size = read_size;
328
0
        if (read_size == 0) {
329
0
            eof = true;
330
0
        }
331
0
    }
332
333
0
    if (size == 0 || eof) {
334
0
        return Status::EndOfFile("Empty file.");
335
0
    }
336
337
    // clear memory here.
338
0
    _value_allocator.Clear();
339
0
    _parse_allocator.Clear();
340
0
    bool has_parse_error = false;
341
342
    // parse jsondata to JsonDoc
343
    // As the issue: https://github.com/Tencent/rapidjson/issues/1458
344
    // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag.
345
0
    if (_num_as_string) {
346
0
        has_parse_error =
347
0
                _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, size)
348
0
                        .HasParseError();
349
0
    } else {
350
0
        has_parse_error = _origin_json_doc.Parse((char*)json_str, size).HasParseError();
351
0
    }
352
353
0
    if (has_parse_error) {
354
0
        return Status::DataQualityError(
355
0
                "Parse json data for JsonDoc failed. code: {}, error info: {}",
356
0
                _origin_json_doc.GetParseError(),
357
0
                rapidjson::GetParseError_En(_origin_json_doc.GetParseError()));
358
0
    }
359
360
    // set json root
361
0
    if (!_parsed_json_root.empty()) {
362
0
        _json_doc = JsonFunctions::get_json_object_from_parsed_json(
363
0
                _parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator());
364
0
        if (_json_doc == nullptr) {
365
0
            return Status::DataQualityError("JSON Root not found.");
366
0
        }
367
0
    } else {
368
0
        _json_doc = &_origin_json_doc;
369
0
    }
370
371
0
    if (_json_doc->IsArray() && !_strip_outer_array) {
372
0
        return Status::DataQualityError(
373
0
                "JSON data is array-object, `strip_outer_array` must be TRUE.");
374
0
    }
375
0
    if (!_json_doc->IsArray() && _strip_outer_array) {
376
0
        return Status::DataQualityError(
377
0
                "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
378
0
    }
379
380
0
    rapidjson::Value* objectValue = nullptr;
381
0
    if (_json_doc->IsArray()) {
382
0
        if (_json_doc->Size() == 0) {
383
            // may be passing an empty json, such as "[]"
384
0
            return Status::InternalError<false>("Empty first json line");
385
0
        }
386
0
        objectValue = &(*_json_doc)[0];
387
0
    } else {
388
0
        objectValue = _json_doc;
389
0
    }
390
391
0
    if (!objectValue->IsObject()) {
392
0
        return Status::DataQualityError("JSON data is not an object. but: {}",
393
0
                                        objectValue->GetType());
394
0
    }
395
396
    // use jsonpaths to col_names
397
0
    if (!_parsed_jsonpaths.empty()) {
398
0
        for (auto& _parsed_jsonpath : _parsed_jsonpaths) {
399
0
            size_t len = _parsed_jsonpath.size();
400
0
            if (len == 0) {
401
0
                return Status::InvalidArgument("It's invalid jsonpaths.");
402
0
            }
403
0
            std::string key = _parsed_jsonpath[len - 1].key;
404
0
            col_names->emplace_back(key);
405
0
            col_types->emplace_back(
406
0
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
407
0
        }
408
0
        return Status::OK();
409
0
    }
410
411
0
    for (int i = 0; i < objectValue->MemberCount(); ++i) {
412
0
        auto it = objectValue->MemberBegin() + i;
413
0
        col_names->emplace_back(it->name.GetString());
414
0
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
415
0
    }
416
0
    return Status::OK();
417
0
}
418
419
0
Status NewJsonReader::_get_range_params() {
420
0
    if (!_params.__isset.file_attributes) {
421
0
        return Status::InternalError<false>("BE cat get file_attributes");
422
0
    }
423
424
    // get line_delimiter
425
0
    if (_params.file_attributes.__isset.text_params &&
426
0
        _params.file_attributes.text_params.__isset.line_delimiter) {
427
0
        _line_delimiter = _params.file_attributes.text_params.line_delimiter;
428
0
        _line_delimiter_length = _line_delimiter.size();
429
0
    }
430
431
0
    if (_params.file_attributes.__isset.jsonpaths) {
432
0
        _jsonpaths = _params.file_attributes.jsonpaths;
433
0
    }
434
0
    if (_params.file_attributes.__isset.json_root) {
435
0
        _json_root = _params.file_attributes.json_root;
436
0
    }
437
0
    if (_params.file_attributes.__isset.read_json_by_line) {
438
0
        _read_json_by_line = _params.file_attributes.read_json_by_line;
439
0
    }
440
0
    if (_params.file_attributes.__isset.strip_outer_array) {
441
0
        _strip_outer_array = _params.file_attributes.strip_outer_array;
442
0
    }
443
0
    if (_params.file_attributes.__isset.num_as_string) {
444
0
        _num_as_string = _params.file_attributes.num_as_string;
445
0
    }
446
0
    if (_params.file_attributes.__isset.fuzzy_parse) {
447
0
        _fuzzy_parse = _params.file_attributes.fuzzy_parse;
448
0
    }
449
0
    if (_range.table_format_params.table_format_type == "hive") {
450
0
        _is_hive_table = true;
451
0
    }
452
0
    if (_params.file_attributes.__isset.openx_json_ignore_malformed) {
453
0
        _openx_json_ignore_malformed = _params.file_attributes.openx_json_ignore_malformed;
454
0
    }
455
0
    return Status::OK();
456
0
}
457
458
0
static Status ignore_malformed_json_append_null(Block& block) {
459
0
    for (auto& column : block.get_columns()) {
460
0
        if (!column->is_nullable()) [[unlikely]] {
461
0
            return Status::DataQualityError("malformed json, but the column `{}` is not nullable.",
462
0
                                            column->get_name());
463
0
        }
464
0
        static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default();
465
0
    }
466
0
    return Status::OK();
467
0
}
468
469
0
Status NewJsonReader::_open_file_reader(bool need_schema) {
470
0
    int64_t start_offset = _range.start_offset;
471
0
    if (start_offset != 0) {
472
0
        start_offset -= 1;
473
0
    }
474
475
0
    _current_offset = start_offset;
476
477
0
    if (_params.file_type == TFileType::FILE_STREAM) {
478
        // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
479
0
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
480
0
                                                        need_schema));
481
0
    } else {
482
0
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
483
0
        io::FileReaderOptions reader_options =
484
0
                FileFactory::get_reader_options(_state, _file_description);
485
0
        io::FileReaderSPtr file_reader;
486
0
        if (_io_ctx_holder) {
487
0
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
488
0
                    _profile, _system_properties, _file_description, reader_options,
489
0
                    io::DelegateReader::AccessMode::SEQUENTIAL,
490
0
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
491
0
                    io::PrefetchRange(_range.start_offset, _range.size)));
492
0
        } else {
493
0
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
494
0
                    _profile, _system_properties, _file_description, reader_options,
495
0
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
496
0
                    io::PrefetchRange(_range.start_offset, _range.size)));
497
0
        }
498
0
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
499
0
                                                                         _io_ctx->file_reader_stats)
500
0
                               : file_reader;
501
0
    }
502
0
    return Status::OK();
503
0
}
504
505
0
Status NewJsonReader::_open_line_reader() {
506
0
    int64_t size = _range.size;
507
0
    if (_range.start_offset != 0) {
508
        // When we fetch range doesn't start from 0, size will += 1.
509
0
        size += 1;
510
0
        _skip_first_line = true;
511
0
    } else {
512
0
        _skip_first_line = false;
513
0
    }
514
0
    _line_reader = NewPlainTextLineReader::create_unique(
515
0
            _profile, _file_reader, _decompressor.get(),
516
0
            std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length,
517
0
                                                     false),
518
0
            size, _current_offset);
519
0
    return Status::OK();
520
0
}
521
522
0
Status NewJsonReader::_parse_jsonpath_and_json_root() {
523
    // parse jsonpaths
524
0
    if (!_jsonpaths.empty()) {
525
0
        rapidjson::Document jsonpaths_doc;
526
0
        if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) {
527
0
            if (!jsonpaths_doc.IsArray()) {
528
0
                return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
529
0
            }
530
0
            for (int i = 0; i < jsonpaths_doc.Size(); i++) {
531
0
                const rapidjson::Value& path = jsonpaths_doc[i];
532
0
                if (!path.IsString()) {
533
0
                    return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
534
0
                }
535
0
                std::string json_path = path.GetString();
536
                // $ -> $. in json_path
537
0
                if (UNLIKELY(json_path.size() == 1 && json_path[0] == '$')) {
538
0
                    json_path.insert(1, ".");
539
0
                }
540
0
                std::vector<JsonPath> parsed_paths;
541
0
                JsonFunctions::parse_json_paths(json_path, &parsed_paths);
542
0
                _parsed_jsonpaths.push_back(std::move(parsed_paths));
543
0
            }
544
545
0
        } else {
546
0
            return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths);
547
0
        }
548
0
    }
549
550
    // parse jsonroot
551
0
    if (!_json_root.empty()) {
552
0
        std::string json_root = _json_root;
553
        //  $ -> $. in json_root
554
0
        if (json_root.size() == 1 && json_root[0] == '$') {
555
0
            json_root.insert(1, ".");
556
0
        }
557
0
        JsonFunctions::parse_json_paths(json_root, &_parsed_json_root);
558
0
    }
559
0
    return Status::OK();
560
0
}
561
562
Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block,
563
                                        const std::vector<SlotDescriptor*>& slot_descs,
564
0
                                        bool* is_empty_row, bool* eof) {
565
0
    return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof);
566
0
}
567
568
Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf,
569
0
                                        size_t* read_size) {
570
0
    switch (_params.file_type) {
571
0
    case TFileType::FILE_LOCAL:
572
0
        [[fallthrough]];
573
0
    case TFileType::FILE_HDFS:
574
0
    case TFileType::FILE_HTTP:
575
0
        [[fallthrough]];
576
0
    case TFileType::FILE_S3: {
577
0
        size_t file_size = _file_reader->size();
578
0
        *file_buf = make_unique_buffer<uint8_t>(file_size);
579
0
        Slice result(file_buf->get(), file_size);
580
0
        RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx));
581
0
        _current_offset += *read_size;
582
0
        break;
583
0
    }
584
0
    case TFileType::FILE_STREAM: {
585
0
        RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
586
0
        break;
587
0
    }
588
0
    default: {
589
0
        return Status::NotSupported<false>("no supported file reader type: {}", _params.file_type);
590
0
    }
591
0
    }
592
0
    return Status::OK();
593
0
}
594
595
Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf,
596
0
                                                  size_t* read_size) {
597
0
    auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
598
599
    // first read: read from the pipe once.
600
0
    RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size));
601
602
    // When the file is not chunked, the entire file has already been read.
603
0
    if (!stream_load_pipe->is_chunked_transfer()) {
604
0
        return Status::OK();
605
0
    }
606
607
0
    std::vector<uint8_t> buf;
608
0
    uint64_t cur_size = 0;
609
610
    // second read: continuously read data from the pipe until all data is read.
611
0
    DorisUniqueBufferPtr<uint8_t> read_buf;
612
0
    size_t read_buf_size = 0;
613
0
    while (true) {
614
0
        RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
615
0
        if (read_buf_size == 0) {
616
0
            break;
617
0
        } else {
618
0
            buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size);
619
0
            cur_size += read_buf_size;
620
0
            read_buf_size = 0;
621
0
            read_buf.reset();
622
0
        }
623
0
    }
624
625
    // No data is available during the second read.
626
0
    if (cur_size == 0) {
627
0
        return Status::OK();
628
0
    }
629
630
0
    DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size);
631
632
    // copy the data during the first read
633
0
    memcpy(total_buf.get(), file_buf->get(), *read_size);
634
635
    // copy the data during the second read
636
0
    memcpy(total_buf.get() + *read_size, buf.data(), cur_size);
637
0
    *file_buf = std::move(total_buf);
638
0
    *read_size += cur_size;
639
0
    return Status::OK();
640
0
}
641
642
// ---------SIMDJSON----------
643
// simdjson-based implementation, intended to replace the non-simdjson functions once it is ready
644
0
// Prepares the simdjson on-demand reading path.
//
// It opens the file reader (and the line reader when reading json by line) and parses the
// jsonpaths / json_root settings. It then selects the per-document handler callback and
// builds the column-name -> slot index used for key lookup.
Status NewJsonReader::_simdjson_init_reader() {
    RETURN_IF_ERROR(_get_range_params());
    RETURN_IF_ERROR(_open_file_reader(false));
    if (LIKELY(_read_json_by_line)) {
        RETURN_IF_ERROR(_open_line_reader());
    }

    // Populates _parsed_jsonpaths and _parsed_json_root.
    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());

    // Pick the document handler once here rather than branching for every document.
    if (_parsed_jsonpaths.empty()) {
        // No jsonpaths: object keys are matched directly against slot column names.
        _vhandle_json_callback = &NewJsonReader::_simdjson_handle_simple_json;
    } else if (_strip_outer_array) {
        // jsonpaths + stripped outer array: every array element is one row.
        _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
    } else {
        // jsonpaths without strip_outer_array: one row per document.
        _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
    }
    _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();

    // Map each slot's column name to its position and remember the skip-bitmap slot, if any.
    for (int slot_idx = 0; slot_idx < _file_slot_descs.size(); ++slot_idx) {
        auto* slot_desc = _file_slot_descs[slot_idx];
        _slot_desc_index[StringRef {slot_desc->col_name()}] = slot_idx;
        if (slot_desc->is_skip_bitmap_col()) {
            skip_bitmap_col_idx = slot_idx;
        }
    }

    // Scratch buffers for simdjson, which requires padded input.
    _simdjson_ondemand_padding_buffer.resize(_padded_size);
    _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
    return Status::OK();
}
676
677
// Records a simdjson exception for the document currently being processed.
//
// It bumps the filtered-row counter and truncates every block column back to `num_rows`,
// discarding any cells already written for the failed row. It then appends the raw
// document text and the error description to the error file through `_state`.
//
// @param error    the simdjson exception that was caught
// @param block    destination block whose partially written row is discarded
// @param num_rows row count of `block` before the failed document was processed
// @param eof      unused here; kept for signature symmetry with the callers
Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Block& block,
                                             size_t num_rows, bool* eof) {
    const std::string error_msg = fmt::format("Parse json data failed. code: {}, error info: {}",
                                              error.error(), error.what());
    _counter->num_rows_filtered++;

    // Before continuing with other rows, drop whatever was appended for the failed one.
    for (size_t pos = 0; pos < block.columns(); ++pos) {
        auto column = block.get_by_position(pos).column->assume_mutable();
        if (const size_t current_size = column->size(); current_size > num_rows) {
            column->pop_back(current_size - num_rows);
        }
    }

    RETURN_IF_ERROR(_state->append_error_msg_to_file(
            [&]() -> std::string {
                return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size);
            },
            [&]() -> std::string { return error_msg; }));
    return Status::OK();
}
698
699
// Handler for "simple" json (no jsonpaths): reads one document and writes its row(s).
//
// `*is_empty_row` is set to true when no row could be produced from this call. simdjson
// exceptions are converted into a filtered row by `_handle_simdjson_error`. The function
// then reports an empty row if the scanner has decided to stop.
Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Block& block,
                                                   const std::vector<SlotDescriptor*>& slot_descs,
                                                   bool* is_empty_row, bool* eof) {
    const size_t rows_before = block.rows();
    size_t doc_size = 0;
    simdjson::error_code parse_error;
    try {
        // Step 1: fetch the next buffer and parse it into a json document.
        RETURN_IF_ERROR(_simdjson_parse_json(&doc_size, is_empty_row, eof, &parse_error));
        if (doc_size == 0 || *eof) {
            *is_empty_row = true;
            return Status::OK();
        }

        // Step 2: obtain the json value from the parsed document.
        Status value_status = _get_json_value(&doc_size, eof, &parse_error, is_empty_row);
        if (value_status.is<DATA_QUALITY_ERROR>()) {
            if (_is_load) {
                // In load mode a data-quality error is not fatal; skip this document.
                return Status::OK();
            }
            if (_openx_json_ignore_malformed) {
                RETURN_IF_ERROR(ignore_malformed_json_append_null(block));
                return Status::OK();
            }
        }
        RETURN_IF_ERROR(value_status);
        if (*is_empty_row || *eof) {
            return Status::OK();
        }

        // Step 3: write the value's fields into the block columns.
        RETURN_IF_ERROR(
                _simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof));
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, rows_before, eof));
        if (*_scanner_eof) {
            // The scanner chose to stop after unqualified data: report no row.
            *is_empty_row = true;
        }
    }
    return Status::OK();
}
745
746
// Writes the current `_json_value` into `block`.
//
// A json object becomes one row. A json array contributes one row per element; an empty
// array only records an "Empty json line" error. Whenever a row is rejected and the
// scanner has been asked to stop (`*_scanner_eof`), `*is_empty_row` is set and writing ends.
Status NewJsonReader::_simdjson_handle_simple_json_write_columns(
        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
        bool* eof) {
    const size_t rows_before = block.rows();
    simdjson::ondemand::object row_object;
    bool row_valid = false;
    try {
        if (_json_value.type() != simdjson::ondemand::json_type::array) {
            // A single object maps to exactly one row.
            row_object = _json_value;
            RETURN_IF_ERROR(
                    _simdjson_set_column_value(&row_object, block, slot_descs, &row_valid));
            if (!row_valid && *_scanner_eof) {
                *is_empty_row = true;
                return Status::OK();
            }
            *is_empty_row = false;
        } else {
            _array = _json_value.get_array();
            if (_array.count_elements() == 0) {
                // may be passing an empty json, such as "[]"
                RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr));
                if (*_scanner_eof) {
                    *is_empty_row = true;
                }
                return Status::OK();
            }

            // The array is non-empty, so at least one element is visited.
            _array_iter = _array.begin();
            do {
                row_object = *_array_iter;
                RETURN_IF_ERROR(
                        _simdjson_set_column_value(&row_object, block, slot_descs, &row_valid));
                if (!row_valid && *_scanner_eof) {
                    // Unqualified data and the scanner decided to stop.
                    *is_empty_row = true;
                    return Status::OK();
                }
                ++_array_iter;
            } while (!(_array_iter == _array.end()));
        }
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, rows_before, eof));
        if (!row_valid && *_scanner_eof) {
            *is_empty_row = true;
        }
    }
    return Status::OK();
}
806
807
// Handler for jsonpaths + strip_outer_array: reads one document and writes one row per
// element of its outer array.
//
// Data-quality errors from value extraction are swallowed, so this call produces nothing.
// simdjson exceptions become a filtered row via `_handle_simdjson_error`.
Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
        RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
        bool* is_empty_row, bool* eof) {
    const size_t rows_before = block.rows();
    size_t doc_size = 0;
    simdjson::error_code parse_error;
    try {
        // Step 1: fetch the next buffer and parse it into a json document.
        RETURN_IF_ERROR(_simdjson_parse_json(&doc_size, is_empty_row, eof, &parse_error));
        const bool nothing_read = doc_size == 0 || *eof;
        if (nothing_read) {
            *is_empty_row = true;
            return Status::OK();
        }

        // Step 2: obtain the json value from the parsed document.
        Status value_status = _get_json_value(&doc_size, eof, &parse_error, is_empty_row);
        if (value_status.is<DATA_QUALITY_ERROR>()) {
            return Status::OK();
        }
        RETURN_IF_ERROR(value_status);
        if (*is_empty_row) {
            return Status::OK();
        }

        // Step 3: write every array element into the block columns.
        RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs,
                                                                               is_empty_row, eof));
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, rows_before, eof));
        if (*_scanner_eof) {
            // The scanner chose to stop after unqualified data: report no row.
            *is_empty_row = true;
        }
    }
    return Status::OK();
}
847
848
// Writes each element of the array held in `_json_value` as one row, using jsonpaths.
//
// When a json_root is configured and not already applied, it is extracted from every
// element. An element where the root is missing (NOT_FOUND) or is not an object records an
// error and is skipped; any other extraction error is returned. `*is_empty_row` is cleared
// for every row that `_simdjson_write_columns_by_jsonpath` reports as valid.
//
// The loop advances at a single point instead of through a function-local macro. The
// previous `ADVANCE_ROW()` macro hid a `break` and stayed defined for the rest of the
// translation unit. Because it ran before `*is_empty_row = false`, the last element never
// cleared the flag.
Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns(
        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
        bool* eof) {
    simdjson::ondemand::object cur;
    size_t num_rows = block.rows();
    try {
        bool valid = true;
        _array = _json_value.get_array();
        _array_iter = _array.begin();

        while (true) {
            cur = (*_array_iter).get_object();
            // False when this element must be skipped instead of written as a row.
            bool has_row_object = true;
            // extract root
            if (!_parsed_from_json_root && !_parsed_json_root.empty()) {
                simdjson::ondemand::value val;
                Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val);
                if (UNLIKELY(!st.ok())) {
                    if (!st.is<NOT_FOUND>()) {
                        return st;
                    }
                    RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr));
                    has_row_object = false;
                } else if (val.type() != simdjson::ondemand::json_type::object) {
                    RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr));
                    has_row_object = false;
                } else {
                    cur = val.get_object();
                }
            }

            if (has_row_object) {
                RETURN_IF_ERROR(
                        _simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid));
                if (valid) {
                    *is_empty_row = false;
                }
            }

            // Single advance point for every path; stop after the last element.
            ++_array_iter;
            if (_array_iter == _array.end()) {
                break;
            }
        }
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
        if (*_scanner_eof) {
            // When _scanner_eof is true, we have encountered unqualified data and
            // decided to stop the scan.
            *is_empty_row = true;
            return Status::OK();
        }
    }

    return Status::OK();
}
905
906
// Handler for jsonpaths without strip_outer_array: each document yields at most one row.
//
// Documents are read until one row is written or input ends. A document is skipped, and
// the next one read, when any of these happens:
//   - it yields a data-quality error;
//   - its root is not an object;
//   - its columns fail to write (partially written cells are rolled back first);
//   - it throws a simdjson exception, unless the scanner was then asked to stop.
Status NewJsonReader::_simdjson_handle_nested_complex_json(
        RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
        bool* is_empty_row, bool* eof) {
    for (;;) {
        const size_t rows_before = block.rows();
        simdjson::ondemand::object root_object;
        size_t doc_size = 0;
        simdjson::error_code parse_error;
        try {
            RETURN_IF_ERROR(_simdjson_parse_json(&doc_size, is_empty_row, eof, &parse_error));
            if (doc_size == 0 || *eof) {
                *is_empty_row = true;
                return Status::OK();
            }

            Status status = _get_json_value(&doc_size, eof, &parse_error, is_empty_row);
            if (status.is<DATA_QUALITY_ERROR>()) {
                continue; // skip this document and read the next one
            }
            RETURN_IF_ERROR(status);
            if (*is_empty_row) {
                return Status::OK();
            }

            *is_empty_row = false;
            bool row_valid = true;
            if (_json_value.type() != simdjson::ondemand::json_type::object) {
                RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr));
                continue;
            }

            root_object = _json_value.get_object();
            status = _simdjson_write_columns_by_jsonpath(&root_object, slot_descs, block,
                                                         &row_valid);
            if (!status.ok()) {
                RETURN_IF_ERROR(_append_error_msg(nullptr, status.to_string(), "", nullptr));
                // Roll back the cells written for the failed row before reading the next one.
                for (size_t pos = 0; pos < block.columns(); ++pos) {
                    auto column = block.get_by_position(pos).column->assume_mutable();
                    if (column->size() > rows_before) {
                        column->pop_back(column->size() - rows_before);
                    }
                }
                continue;
            }

            if (!row_valid) {
                // Only one row is produced here, so an invalid row means "nothing produced";
                // the caller will go on to read the next line.
                *is_empty_row = true;
            }
            break;
        } catch (simdjson::simdjson_error& e) {
            RETURN_IF_ERROR(_handle_simdjson_error(e, block, rows_before, eof));
            if (*_scanner_eof) {
                // The scanner chose to stop after unqualified data: report no row.
                *is_empty_row = true;
                return Status::OK();
            }
        }
    }
    return Status::OK();
}
967
968
0
size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) {
969
    /// Optimization by caching the order of fields (which is almost always the same)
970
    /// and a quick check to match the next expected field, instead of searching the hash table.
971
0
    if (_prev_positions.size() > key_index && name == _prev_positions[key_index]->first) {
972
0
        return _prev_positions[key_index]->second;
973
0
    }
974
0
    auto it = _slot_desc_index.find(name);
975
0
    if (it != _slot_desc_index.end()) {
976
0
        if (key_index < _prev_positions.size()) {
977
0
            _prev_positions[key_index] = it;
978
0
        }
979
0
        return it->second;
980
0
    }
981
0
    return size_t(-1);
982
0
}
983
984
// Writes the fields of json object `*value` into `block` as one new row.
//
// Keys are matched to slots through `_column_index`. For Hive tables, keys are lower-cased
// first, and a repeated key replaces the earlier value; otherwise the first occurrence wins.
// Slots with no matching key are filled with a default or missing value. Under flexible
// partial update they are marked in the skip bitmap instead.
//
// On return, `*valid` tells whether a complete row was appended. Whenever the row is
// rejected, every column is rolled back to its previous size so the block stays
// rectangular. That now includes the `_fill_missing_column` failure path, which used to
// leave a partially written row behind.
Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
                                                 const std::vector<SlotDescriptor*>& slot_descs,
                                                 bool* valid) {
    // One flag per block column: did the current object already supply it?
    _seen_columns.assign(block.columns(), false);
    size_t cur_row_count = block.rows();
    // Discard a partially written row so that every column has `cur_row_count` entries.
    auto pop_current_row = [&]() {
        for (size_t index = 0; index < block.columns(); ++index) {
            auto column = block.get_by_position(index).column->assume_mutable();
            if (column->size() > cur_row_count) {
                DCHECK(column->size() == cur_row_count + 1);
                column->pop_back(column->size() - cur_row_count);
                DCHECK(column->size() == cur_row_count);
            }
        }
    };
    bool has_valid_value = false;
    // iterate through object, simdjson::ondemand parses on the fly
    size_t key_index = 0;

    for (auto field : *value) {
        std::string_view key = field.unescaped_key();
        StringRef name_ref(key.data(), key.size());
        std::string key_string;
        if (_is_hive_table) {
            key_string = name_ref.to_string();
            // Convert through unsigned char: ::tolower on a negative char (any non-ASCII
            // UTF-8 byte where char is signed) is undefined behavior.
            std::transform(key_string.begin(), key_string.end(), key_string.begin(),
                           [](unsigned char ch) { return static_cast<char>(::tolower(ch)); });
            name_ref = StringRef(key_string);
        }
        const size_t column_index = _column_index(name_ref, key_index++);
        if (UNLIKELY(ssize_t(column_index) < 0)) {
            // This key does not exist in the slot descs; ignore it.
            continue;
        }
        if (skip_bitmap_col_idx >= 0 && std::cmp_equal(column_index, skip_bitmap_col_idx)) {
            continue;
        }
        if (_seen_columns[column_index]) {
            if (_is_hive_table) {
                // Since value can only be traversed once, we can only insert the original
                // value first, then delete it, and then reinsert the new value.
                block.get_by_position(column_index).column->assume_mutable()->pop_back(1);
            } else {
                continue;
            }
        }
        simdjson::ondemand::value val = field.value();
        auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
        RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
                val, slot_descs[column_index]->type(), column_ptr,
                slot_descs[column_index]->col_name(), _serdes[column_index], valid,
                _is_flexible_variant_column(*slot_descs[column_index])));
        if (!(*valid)) {
            pop_current_row();
            return Status::OK();
        }
        _seen_columns[column_index] = true;
        has_valid_value = true;
    }

    if (!has_valid_value && _is_load) {
        std::string col_names;
        for (auto* slot_desc : slot_descs) {
            col_names.append(slot_desc->col_name() + ", ");
        }
        RETURN_IF_ERROR(_append_error_msg(value,
                                          "There is no column matching jsonpaths in the json file, "
                                          "columns:[{}], please check columns "
                                          "and jsonpaths:" +
                                                  _jsonpaths,
                                          col_names, valid));
        return Status::OK();
    }

    if (_should_process_skip_bitmap_col()) {
        _append_empty_skip_bitmap_value(block, cur_row_count);
    }

    // fill missing slot
    int nullcount = 0;
    for (size_t i = 0; i < slot_descs.size(); ++i) {
        if (_seen_columns[i]) {
            continue;
        }
        if (skip_bitmap_col_idx >= 0 && std::cmp_equal(i, skip_bitmap_col_idx)) {
            continue;
        }

        auto* slot_desc = slot_descs[i];
        auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();

        // Quick path to insert default value, instead of using default values in the value map.
        if (!_should_process_skip_bitmap_col() &&
            (_col_default_value_map.empty() ||
             _col_default_value_map.find(slot_desc->col_name()) == _col_default_value_map.end())) {
            column_ptr->insert_default();
            continue;
        }
        if (column_ptr->size() < cur_row_count + 1) {
            DCHECK(column_ptr->size() == cur_row_count);
            if (_should_process_skip_bitmap_col()) {
                // not found, skip this column in flexible partial update
                if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
                    RETURN_IF_ERROR(
                            _append_error_msg(value,
                                              "The key columns can not be ommited in flexible "
                                              "partial update, missing key column: {}",
                                              slot_desc->col_name(), valid));
                    pop_current_row();
                    return Status::OK();
                }
                _set_skip_bitmap_mark(slot_desc, column_ptr, block, cur_row_count, valid);
                column_ptr->insert_default();
            } else {
                RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
                if (!(*valid)) {
                    // Like the other rejection paths, roll back the cells already
                    // written for this row so the columns stay the same length.
                    pop_current_row();
                    return Status::OK();
                }
            }
            ++nullcount;
        }
        DCHECK(column_ptr->size() == cur_row_count + 1);
    }

    // There is at least one valid value here
    DCHECK(nullcount < block.columns());
    *valid = true;
    return Status::OK();
}
1113
1114
template <bool use_string_cache>
1115
Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
1116
                                                     const DataTypePtr& type_desc,
1117
                                                     IColumn* column_ptr,
1118
                                                     const std::string& column_name,
1119
                                                     DataTypeSerDeSPtr serde, bool* valid,
1120
0
                                                     bool is_flexible_variant_column) {
1121
0
    ColumnNullable* nullable_column = nullptr;
1122
0
    IColumn* data_column_ptr = column_ptr;
1123
0
    DataTypeSerDeSPtr data_serde = serde;
1124
1125
0
    auto primitive_type = remove_nullable(type_desc)->get_primitive_type();
1126
0
    const IColumn* nested_column_ptr =
1127
0
            column_ptr->is_nullable()
1128
0
                    ? assert_cast<const ColumnNullable&>(*column_ptr).get_nested_column_ptr().get()
1129
0
                    : column_ptr;
1130
0
    const bool is_flexible_variant_patch_column =
1131
0
            _should_process_skip_bitmap_col() &&
1132
0
            (primitive_type == TYPE_VARIANT ||
1133
0
             check_and_get_column<ColumnVariant>(nested_column_ptr) != nullptr ||
1134
0
             is_flexible_variant_column);
1135
0
    if (is_flexible_variant_patch_column && value.type() != simdjson::ondemand::json_type::object) {
1136
0
        if (_is_load) {
1137
0
            RETURN_IF_ERROR(_append_error_msg(
1138
0
                    nullptr,
1139
0
                    "VARIANT flexible partial update only supports JSON object patch values", "",
1140
0
                    valid));
1141
0
            return Status::OK();
1142
0
        }
1143
0
        return Status::NotSupported(
1144
0
                "VARIANT flexible partial update only supports JSON object patch values");
1145
0
    }
1146
1147
0
    if (column_ptr->is_nullable()) {
1148
0
        nullable_column = reinterpret_cast<ColumnNullable*>(column_ptr);
1149
1150
0
        data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
1151
0
        data_serde = serde->get_nested_serdes()[0];
1152
1153
        // kNullType will put 1 into the Null map, so there is no need to push 0 for kNullType.
1154
0
        if (value.type() == simdjson::ondemand::json_type::null) {
1155
0
            nullable_column->insert_default();
1156
0
            *valid = true;
1157
0
            return Status::OK();
1158
0
        }
1159
0
    } else if (value.type() == simdjson::ondemand::json_type::null) [[unlikely]] {
1160
0
        if (_is_load) {
1161
0
            RETURN_IF_ERROR(_append_error_msg(
1162
0
                    nullptr, "Json value is null, but the column `{}` is not nullable.",
1163
0
                    column_name, valid));
1164
0
            return Status::OK();
1165
0
        } else {
1166
0
            return Status::DataQualityError(
1167
0
                    "Json value is null, but the column `{}` is not nullable.", column_name);
1168
0
        }
1169
0
    }
1170
1171
0
    if (_is_load || !is_complex_type(primitive_type)) {
1172
0
        if (is_flexible_variant_patch_column && primitive_type == TYPE_VARIANT) {
1173
0
            ParseConfig parse_config;
1174
0
            parse_config.check_duplicate_json_path =
1175
0
                    config::variant_enable_duplicate_json_path_check;
1176
0
            parse_config.reject_json_null_value = true;
1177
0
            parse_config.record_empty_object_path = true;
1178
0
            std::string_view json_str = simdjson::to_json_string(value);
1179
0
            StringRef json_ref {json_str.data(), json_str.size()};
1180
0
            try {
1181
0
                variant_util::parse_json_to_variant(*data_column_ptr, json_ref, nullptr,
1182
0
                                                    parse_config);
1183
0
            } catch (const Exception& e) {
1184
0
                return e.to_status();
1185
0
            }
1186
0
            if (nullable_column) {
1187
0
                nullable_column->get_null_map_data().push_back(0);
1188
0
            }
1189
0
            *valid = true;
1190
0
            return Status::OK();
1191
0
        }
1192
0
        if (value.type() == simdjson::ondemand::json_type::string) {
1193
0
            std::string_view value_string;
1194
0
            if constexpr (use_string_cache) {
1195
0
                const auto cache_key = value.raw_json().value();
1196
0
                if (_cached_string_values.contains(cache_key)) {
1197
0
                    value_string = _cached_string_values[cache_key];
1198
0
                } else {
1199
0
                    value_string = value.get_string();
1200
0
                    _cached_string_values.emplace(cache_key, value_string);
1201
0
                }
1202
0
            } else {
1203
0
                DCHECK(_cached_string_values.empty());
1204
0
                value_string = value.get_string();
1205
0
            }
1206
1207
0
            Slice slice {value_string.data(), value_string.size()};
1208
0
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
1209
0
                                                                       _serde_options));
1210
1211
0
        } else if (value.type() == simdjson::ondemand::json_type::boolean) {
1212
0
            const char* str_value = nullptr;
1213
            // insert "1"/"0" , not "true"/"false".
1214
0
            if (value.get_bool()) {
1215
0
                str_value = (char*)"1";
1216
0
            } else {
1217
0
                str_value = (char*)"0";
1218
0
            }
1219
0
            Slice slice {str_value, 1};
1220
0
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
1221
0
                                                                       _serde_options));
1222
0
        } else {
1223
            // Maybe we can `switch (value->GetType()) case: kNumberType`.
1224
            // Note that `if (value->IsInt())`, but column is FloatColumn.
1225
0
            std::string_view json_str = simdjson::to_json_string(value);
1226
0
            Slice slice {json_str.data(), json_str.size()};
1227
0
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
1228
0
                                                                       _serde_options));
1229
0
        }
1230
0
    } else if (primitive_type == TYPE_STRUCT) {
1231
0
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
1232
0
            return Status::DataQualityError(
1233
0
                    "Json value isn't object, but the column `{}` is struct.", column_name);
1234
0
        }
1235
1236
0
        const auto* type_struct =
1237
0
                assert_cast<const DataTypeStruct*>(remove_nullable(type_desc).get());
1238
0
        auto sub_col_size = type_struct->get_elements().size();
1239
0
        simdjson::ondemand::object struct_value = value.get_object();
1240
0
        auto sub_serdes = data_serde->get_nested_serdes();
1241
0
        auto* struct_column_ptr = assert_cast<ColumnStruct*>(data_column_ptr);
1242
1243
0
        std::map<std::string, size_t> sub_col_name_to_idx;
1244
0
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
1245
0
            sub_col_name_to_idx.emplace(type_struct->get_element_name(sub_col_idx), sub_col_idx);
1246
0
        }
1247
0
        std::vector<bool> has_value(sub_col_size, false);
1248
0
        for (simdjson::ondemand::field sub : struct_value) {
1249
0
            std::string_view sub_key_view = sub.unescaped_key();
1250
0
            std::string sub_key(sub_key_view.data(), sub_key_view.length());
1251
0
            std::transform(sub_key.begin(), sub_key.end(), sub_key.begin(), ::tolower);
1252
1253
0
            if (sub_col_name_to_idx.find(sub_key) == sub_col_name_to_idx.end()) [[unlikely]] {
1254
0
                continue;
1255
0
            }
1256
0
            size_t sub_column_idx = sub_col_name_to_idx[sub_key];
1257
0
            auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr();
1258
1259
0
            if (has_value[sub_column_idx]) [[unlikely]] {
1260
                // Since struct_value can only be traversed once, we can only insert
1261
                // the original value first, then delete it, and then reinsert the new value.
1262
0
                sub_column_ptr->pop_back(1);
1263
0
            }
1264
0
            has_value[sub_column_idx] = true;
1265
1266
0
            const auto& sub_col_type = type_struct->get_element(sub_column_idx);
1267
0
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
1268
0
                    sub.value(), sub_col_type, sub_column_ptr.get(), column_name + "." + sub_key,
1269
0
                    sub_serdes[sub_column_idx], valid));
1270
0
        }
1271
1272
        //fill missing subcolumn
1273
0
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
1274
0
            if (has_value[sub_col_idx]) {
1275
0
                continue;
1276
0
            }
1277
1278
0
            auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr();
1279
0
            if (sub_column_ptr->is_nullable()) {
1280
0
                sub_column_ptr->insert_default();
1281
0
                continue;
1282
0
            } else [[unlikely]] {
1283
0
                return Status::DataQualityError(
1284
0
                        "Json file structColumn miss field {} and this column isn't nullable.",
1285
0
                        column_name + "." + type_struct->get_element_name(sub_col_idx));
1286
0
            }
1287
0
        }
1288
0
    } else if (primitive_type == TYPE_MAP) {
1289
0
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
1290
0
            return Status::DataQualityError("Json value isn't object, but the column `{}` is map.",
1291
0
                                            column_name);
1292
0
        }
1293
0
        simdjson::ondemand::object object_value = value.get_object();
1294
1295
0
        auto sub_serdes = data_serde->get_nested_serdes();
1296
0
        auto* map_column_ptr = assert_cast<ColumnMap*>(data_column_ptr);
1297
1298
0
        size_t field_count = 0;
1299
0
        for (simdjson::ondemand::field member_value : object_value) {
1300
0
            auto f = [](std::string_view key_view, const DataTypePtr& type_desc,
1301
0
                        IColumn* column_ptr, DataTypeSerDeSPtr serde,
1302
0
                        DataTypeSerDe::FormatOptions serde_options, bool* valid) {
1303
0
                auto* data_column_ptr = column_ptr;
1304
0
                auto data_serde = serde;
1305
0
                if (column_ptr->is_nullable()) {
1306
0
                    auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);
1307
1308
0
                    nullable_column->get_null_map_data().push_back(0);
1309
0
                    data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
1310
0
                    data_serde = serde->get_nested_serdes()[0];
1311
0
                }
1312
0
                Slice slice(key_view.data(), key_view.length());
1313
1314
0
                RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
1315
0
                                                                           serde_options));
1316
0
                return Status::OK();
1317
0
            };
Unexecuted instantiation: _ZZN5doris13NewJsonReader30_simdjson_write_data_to_columnILb0EEENS_6StatusERN8simdjson8fallback8ondemand5valueERKSt10shared_ptrIKNS_9IDataTypeEEPNS_7IColumnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES8_INS_13DataTypeSerDeEEPbbENKUlSt17basic_string_viewIcSJ_ESD_SF_SP_NSO_13FormatOptionsESQ_E_clESS_SD_SF_SP_ST_SQ_
Unexecuted instantiation: _ZZN5doris13NewJsonReader30_simdjson_write_data_to_columnILb1EEENS_6StatusERN8simdjson8fallback8ondemand5valueERKSt10shared_ptrIKNS_9IDataTypeEEPNS_7IColumnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES8_INS_13DataTypeSerDeEEPbbENKUlSt17basic_string_viewIcSJ_ESD_SF_SP_NSO_13FormatOptionsESQ_E_clESS_SD_SF_SP_ST_SQ_
1318
1319
0
            RETURN_IF_ERROR(f(member_value.unescaped_key(),
1320
0
                              assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
1321
0
                                      ->get_key_type(),
1322
0
                              map_column_ptr->get_keys_ptr()->assume_mutable()->get_ptr().get(),
1323
0
                              sub_serdes[0], _serde_options, valid));
1324
1325
0
            simdjson::ondemand::value field_value = member_value.value();
1326
0
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
1327
0
                    field_value,
1328
0
                    assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
1329
0
                            ->get_value_type(),
1330
0
                    map_column_ptr->get_values_ptr()->assume_mutable()->get_ptr().get(),
1331
0
                    column_name + ".value", sub_serdes[1], valid));
1332
0
            field_count++;
1333
0
        }
1334
1335
0
        auto& offsets = map_column_ptr->get_offsets();
1336
0
        offsets.emplace_back(offsets.back() + field_count);
1337
1338
0
    } else if (primitive_type == TYPE_ARRAY) {
1339
0
        if (value.type() != simdjson::ondemand::json_type::array) [[unlikely]] {
1340
0
            return Status::DataQualityError("Json value isn't array, but the column `{}` is array.",
1341
0
                                            column_name);
1342
0
        }
1343
1344
0
        simdjson::ondemand::array array_value = value.get_array();
1345
1346
0
        auto sub_serdes = data_serde->get_nested_serdes();
1347
0
        auto* array_column_ptr = assert_cast<ColumnArray*>(data_column_ptr);
1348
1349
0
        int field_count = 0;
1350
0
        for (simdjson::ondemand::value sub_value : array_value) {
1351
0
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
1352
0
                    sub_value,
1353
0
                    assert_cast<const DataTypeArray*>(remove_nullable(type_desc).get())
1354
0
                            ->get_nested_type(),
1355
0
                    array_column_ptr->get_data().get_ptr().get(), column_name + ".element",
1356
0
                    sub_serdes[0], valid));
1357
0
            field_count++;
1358
0
        }
1359
0
        auto& offsets = array_column_ptr->get_offsets();
1360
0
        offsets.emplace_back(offsets.back() + field_count);
1361
1362
0
    } else {
1363
0
        return Status::InternalError("Not support load to complex column.");
1364
0
    }
1365
    //We need to finally set the nullmap of column_nullable to keep the size consistent with data_column
1366
0
    if (nullable_column && value.type() != simdjson::ondemand::json_type::null) {
1367
0
        nullable_column->get_null_map_data().push_back(0);
1368
0
    }
1369
0
    *valid = true;
1370
0
    return Status::OK();
1371
0
}
Unexecuted instantiation: _ZN5doris13NewJsonReader30_simdjson_write_data_to_columnILb0EEENS_6StatusERN8simdjson8fallback8ondemand5valueERKSt10shared_ptrIKNS_9IDataTypeEEPNS_7IColumnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES8_INS_13DataTypeSerDeEEPbb
Unexecuted instantiation: _ZN5doris13NewJsonReader30_simdjson_write_data_to_columnILb1EEENS_6StatusERN8simdjson8fallback8ondemand5valueERKSt10shared_ptrIKNS_9IDataTypeEEPNS_7IColumnERKNSt7__cxx1112basic_stringIcSt11char_traitsIcESaIcEEES8_INS_13DataTypeSerDeEEPbb
1372
1373
// Records a data-quality error for the current row.
//
// The message is built first: when `col_name` is non-empty, `error_msg` is used as an fmt format
// string with `col_name` and `_jsonpaths` as its arguments; otherwise it is used verbatim.
// Then the filtered-row counter is incremented, `*valid` is cleared (if given), and the raw JSON
// text of `obj` plus the message are handed to the runtime state's error log.
//
// @param obj        the current row's JSON object, or nullptr (an empty line text is logged then).
// @param error_msg  the message, or a format string when `col_name` is non-empty.
// @param col_name   first format argument; empty disables formatting.
// @param valid      optional out-flag, set to false when non-null.
// @return the status of appending to the error file, otherwise OK.
Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::string error_msg,
                                        std::string col_name, bool* valid) {
    std::string message;
    if (col_name.empty()) {
        message = std::move(error_msg);
    } else {
        fmt::memory_buffer formatted;
        fmt::format_to(formatted, error_msg, col_name, _jsonpaths);
        message = fmt::to_string(formatted);
    }

    _counter->num_rows_filtered++;
    if (valid) {
        // the current row is rejected
        *valid = false;
    }

    auto row_text = [&]() -> std::string {
        if (obj == nullptr) {
            return "";
        }
        std::string_view raw;
        // The error code is deliberately ignored; whatever `raw` holds is logged.
        (void)!obj->raw_json().get(raw);
        return std::string(raw.data(), raw.size());
    };
    auto message_text = [&]() -> std::string { return message; };
    RETURN_IF_ERROR(_state->append_error_msg_to_file(row_text, message_text));
    return Status::OK();
}
1402
1403
// Reads the next raw JSON document and starts simdjson on-demand iteration over it.
//
// The document is one line from _line_reader when a line reader exists, otherwise one whole
// message from _read_one_message. Its bytes (minus a leading UTF-8 BOM, if present) are copied
// into _simdjson_ondemand_padding_buffer and the iteration result is stored in _original_json_doc.
//
// @param size          out: document length in bytes (after BOM removal).
// @param is_empty_row  not used by this function.
// @param eof           out: true when no more data is available; nothing is parsed in that case.
// @param error         out: simdjson error code returned when starting the iteration.
// @return non-OK only if reading the input fails; parse problems are reported through `error`.
Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                           simdjson::error_code* error) {
    SCOPED_TIMER(_read_timer);

    // step1: read the raw bytes.
    if (_line_reader == nullptr) {
        size_t message_length = 0;
        RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &message_length));
        _json_str = _json_str_ptr.get();
        *size = message_length;
        if (message_length == 0) {
            *eof = true;
        }
    } else {
        RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
    }
    if (*eof) {
        return Status::OK();
    }

    // step2: prepare the padded buffer and start iterating.
    // simdjson requires SIMDJSON_PADDING extra bytes after the input, so both buffers are grown
    // whenever the current capacity is too small. The check uses the size before any BOM is
    // stripped, which can only over-allocate.
    const size_t needed_size = *size + simdjson::SIMDJSON_PADDING;
    if (needed_size > _padded_size) {
        _simdjson_ondemand_padding_buffer.resize(needed_size);
        _simdjson_ondemand_unscape_padding_buffer.resize(needed_size);
        _padded_size = needed_size;
    }

    // simdjson does not handle UTF-8 with a byte order mark, so skip the three BOM bytes.
    static constexpr char kUtf8Bom[] = {'\xEF', '\xBB', '\xBF'};
    if (*size >= sizeof(kUtf8Bom) && std::memcmp(_json_str, kUtf8Bom, sizeof(kUtf8Bom)) == 0) {
        _json_str += sizeof(kUtf8Bom);
        *size -= sizeof(kUtf8Bom);
    }

    std::memcpy(_simdjson_ondemand_padding_buffer.data(), _json_str, *size);
    _original_doc_size = *size;
    *error = _ondemand_json_parser
                     ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
                               _padded_size)
                     .get(_original_json_doc);
    return Status::OK();
}
1445
1446
0
// Decides whether the current document yields no rows. In jsonpath + strip_outer_array mode it
// also prepares row iteration over the outer array.
//
// @param size          length in bytes of the current document.
// @param eof           whether the input is exhausted.
// @param is_empty_row  out: set to true for a missing/empty document or an empty outer array;
//                      left untouched otherwise.
Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row) {
    if (eof || size == 0) {
        *is_empty_row = true;
        return Status::OK();
    }

    const bool iterate_array_elements = _strip_outer_array && !_parsed_jsonpaths.empty();
    if (!iterate_array_elements) {
        return Status::OK();
    }

    // _total_rows is the element count of the outer array; the row cursor restarts at 0.
    _total_rows = _json_value.count_elements().value();
    _next_row = 0;
    if (_total_rows == 0) {
        // meet an empty json array.
        *is_empty_row = true;
    }
    return Status::OK();
}
1463
1464
// Validates the document produced by _simdjson_parse_json and selects the value rows are read from.
//
// The document is rejected when:
//   * parsing failed;
//   * its top level is neither an object nor an array;
//   * extracting _parsed_json_root fails;
//   * the selected value's shape disagrees with `strip_outer_array`.
// Otherwise _json_value becomes either the json_root sub-value (only for a top-level object when
// a json_root is configured) or the whole document, and _judge_empty_row finishes the job.
//
// @param size          length in bytes of the current document (_json_str).
// @param eof           out: set to true when the error log asks the scanner to stop.
// @param error         simdjson error code produced while parsing the document.
// @param is_empty_row  out: forwarded to _judge_empty_row.
Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
                                      bool* is_empty_row) {
    SCOPED_TIMER(_read_timer);

    // Counts the document as a filtered row and logs its text together with `message`.
    // If logging set *_scanner_eof (too many invalid rows were met), the scan must stop: *eof is
    // set and OK returned so the caller treats it like end of input. Otherwise a
    // DataQualityError carrying `message` is returned.
    auto reject_document = [&](const std::string& message) -> Status {
        const std::string doc_info(reinterpret_cast<const char*>(_json_str), *size);
        _counter->num_rows_filtered++;
        RETURN_IF_ERROR(_state->append_error_msg_to_file(
                [&]() -> std::string { return doc_info; },
                [&]() -> std::string { return message; }));
        if (*_scanner_eof) {
            *eof = true;
            return Status::OK();
        }
        return Status::DataQualityError(message);
    };

    if (*error != simdjson::error_code::SUCCESS) {
        return reject_document(
                fmt::format("Parse json data for JsonDoc failed. code: {}, error info: {}",
                            *error, simdjson::error_message(*error)));
    }

    auto type_res = _original_json_doc.type();
    if (type_res.error() != simdjson::error_code::SUCCESS) {
        return reject_document(
                fmt::format("Parse json data for JsonDoc failed. code: {}, error info: {}",
                            type_res.error(), simdjson::error_message(type_res.error())));
    }

    const simdjson::ondemand::json_type type = type_res.value();
    const bool is_container = type == simdjson::ondemand::json_type::object ||
                              type == simdjson::ondemand::json_type::array;
    if (!is_container) {
        return reject_document("Not an json object or json array");
    }

    if (_parsed_json_root.empty() || type != simdjson::ondemand::json_type::object) {
        // No json_root, or a top-level array (iterated entirely later on): use the whole doc.
        _json_value = _original_json_doc;
    } else {
        try {
            simdjson::ondemand::object root_object = _original_json_doc;
            Status st = JsonFunctions::extract_from_object(root_object, _parsed_json_root,
                                                           &_json_value);
            if (!st.ok()) {
                return reject_document(st.to_string());
            }
            _parsed_from_json_root = true;
        } catch (const simdjson::simdjson_error& e) {
            return reject_document(fmt::format(
                    "Encounter error while extract_from_object, error: {}", e.what()));
        }
    }

    // The selected value must be an array exactly when strip_outer_array is requested.
    if (_strip_outer_array) {
        if (_json_value.type() != simdjson::ondemand::json_type::array) {
            return reject_document(
                    "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
        }
    } else {
        if (_json_value.type() == simdjson::ondemand::json_type::array) {
            return reject_document("JSON data is array-object, `strip_outer_array` must be TRUE.");
        }
    }

    RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
    return Status::OK();
}
1541
1542
// Writes one row into `block` by evaluating each slot's jsonpath against the row object `value`.
// Column i of `block` receives the value for slot_descs[i]:
//   * jsonpath i is the root path ("$" or "$."): the whole original document text is stored;
//   * there is no jsonpath i, or it is not found: _fill_missing_column fills the column;
//   * otherwise: the extracted value is written by _simdjson_write_data_to_column.
// If no slot received an actual JSON value, the row written so far is popped from every column and
// reported through _append_error_msg.
//
// @param value       the current row's JSON object.
// @param slot_descs  destination slots, in block column order.
// @param block       the block receiving the row.
// @param valid       out: true when the row was written, false when it was rejected.
// @return non-OK for errors other than data-quality rejections of this row.
Status NewJsonReader::_simdjson_write_columns_by_jsonpath(
        simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>& slot_descs,
        Block& block, bool* valid) {
    // write by jsonpath
    bool has_valid_value = false;

    // The cached string values are cleared when this function returns, on every path.
    Defer clear_defer([this]() { _cached_string_values.clear(); });

    for (size_t i = 0; i < slot_descs.size(); i++) {
        auto* slot_desc = slot_descs[i];
        auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();
        simdjson::ondemand::value json_value;
        Status st;
        const bool has_jsonpath = i < _parsed_jsonpaths.size();
        if (has_jsonpath) {
            st = JsonFunctions::extract_from_object(*value, _parsed_jsonpaths[i], &json_value);
            if (!st.ok() && !st.is<NOT_FOUND>()) {
                return st;
            }
        }
        if (has_jsonpath && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) {
            // The jsonpath is "$" or "$.": insert the full original document text directly.
            // For a non-nullable slot the column itself is the string column, so start from it;
            // previously this pointer stayed nullptr and was dereferenced.
            IColumn* target_column_ptr = column_ptr;
            if (slot_desc->is_nullable()) {
                auto* nullable_column = assert_cast<ColumnNullable*>(column_ptr);
                target_column_ptr = &nullable_column->get_nested_column();
                nullable_column->get_null_map_data().push_back(0);
            }
            auto* column_string = assert_cast<ColumnString*>(target_column_ptr);
            column_string->insert_data(_simdjson_ondemand_padding_buffer.data(),
                                       _original_doc_size);
            has_valid_value = true;
        } else if (!has_jsonpath || st.is<NOT_FOUND>()) {
            // not match in jsondata, filling with default value
            RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
            if (!(*valid)) {
                return Status::OK();
            }
        } else {
            RETURN_IF_ERROR(_simdjson_write_data_to_column<true>(json_value, slot_desc->type(),
                                                                 column_ptr, slot_desc->col_name(),
                                                                 _serdes[i], valid));
            if (!(*valid)) {
                return Status::OK();
            }
            has_valid_value = true;
        }
    }

    if (!has_valid_value) {
        // there is no valid value in json line but has filled with default value before
        // so remove this line in block
        for (size_t i = 0; i < block.columns(); ++i) {
            block.get_by_position(i).column->assume_mutable()->pop_back(1);
        }
        std::string col_names;
        for (auto* slot_desc : slot_descs) {
            col_names.append(slot_desc->col_name() + ", ");
        }
        // _append_error_msg formats the message with (col_names, _jsonpaths), so the jsonpaths
        // go through the second placeholder instead of being concatenated into the format
        // string, where braces in user-supplied paths would be parsed as replacement fields.
        RETURN_IF_ERROR(_append_error_msg(value,
                                          "There is no column matching jsonpaths in the json file, "
                                          "columns:[{}], please check columns "
                                          "and jsonpaths:{}",
                                          col_names, valid));
        return Status::OK();
    }
    *valid = true;
    return Status::OK();
}
1612
1613
// Evaluates the constant default-value expression of each slot and caches its string form in
// _col_default_value_map, keyed by column name.
//
// Slots are skipped when they have no entry in `col_default_value_ctx`, when the entry is null,
// or when the expression is a NULL literal (no usable default).
//
// @return the first error from evaluating an expression, otherwise OK.
Status NewJsonReader::_get_column_default_value(
        const std::vector<SlotDescriptor*>& slot_descs,
        const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx) {
    for (auto* slot_desc : slot_descs) {
        const auto found = col_default_value_ctx.find(slot_desc->col_name());
        if (found == col_default_value_ctx.end() || found->second == nullptr) {
            continue;
        }
        const VExprContextSPtr& expr_ctx = found->second;
        // NULL_LITERAL means no valid value of current column
        if (expr_ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) {
            continue;
        }
        ColumnWithTypeAndName evaluated;
        RETURN_IF_ERROR(expr_ctx->execute_const_expr(evaluated));
        DCHECK(evaluated.column->size() == 1);
        _col_default_value_map.emplace(slot_desc->col_name(),
                                       evaluated.column->get_data_at(0).to_string());
    }
    return Status::OK();
}
1633
1634
// Fills `column_ptr` for a slot whose value is absent from the current JSON row.
//
// Preference order:
//   1. the cached default value from _col_default_value_map, deserialized with `serde`;
//   2. NULL, for nullable slots.
// A non-nullable slot without a default is an error. During load the row is reported through
// _append_error_msg and rejected (*valid is false and nothing is inserted). Otherwise a
// DataQualityError is returned.
//
// @param slot_desc   the missing slot.
// @param serde       serde used to deserialize the default value.
// @param column_ptr  the slot's column in the destination block.
// @param valid       out: true when a value was inserted, false when the row was rejected.
Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde,
                                           IColumn* column_ptr, bool* valid) {
    auto col_value = _col_default_value_map.find(slot_desc->col_name());
    if (col_value != _col_default_value_map.end()) {
        const std::string& v_str = col_value->second;
        Slice column_default_value {v_str};
        RETURN_IF_ERROR(serde->deserialize_one_cell_from_json(*column_ptr, column_default_value,
                                                              _serde_options));
        *valid = true;
        return Status::OK();
    }

    if (slot_desc->is_nullable()) {
        auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);
        nullable_column->insert_default();
        *valid = true;
        return Status::OK();
    }

    if (!_is_load) {
        return Status::DataQualityError(
                "The column `{}` is not nullable, but it's not found in jsondata.",
                slot_desc->col_name());
    }

    // _append_error_msg marks the row invalid (*valid = false). Return right away: the old code
    // fell through and reset *valid to true, so the row was kept although this column never
    // received a value for it.
    RETURN_IF_ERROR(_append_error_msg(
            nullptr, "The column `{}` is not nullable, but it's not found in jsondata.",
            slot_desc->col_name(), valid));
    return Status::OK();
}
1661
1662
0
bool NewJsonReader::_is_flexible_variant_column(const SlotDescriptor& slot_desc) const {
1663
0
    if (!_should_process_skip_bitmap_col()) {
1664
0
        return false;
1665
0
    }
1666
0
    if (remove_nullable(slot_desc.type())->get_primitive_type() == TYPE_VARIANT) {
1667
0
        return true;
1668
0
    }
1669
0
    DORIS_CHECK(_state != nullptr);
1670
0
    DORIS_CHECK(_params.__isset.dest_tuple_id);
1671
0
    const auto* dest_tuple_desc = _state->desc_tbl().get_tuple_descriptor(_params.dest_tuple_id);
1672
0
    DORIS_CHECK(dest_tuple_desc != nullptr);
1673
0
    for (const auto* dest_slot_desc : dest_tuple_desc->slots()) {
1674
0
        if (dest_slot_desc->col_name() == slot_desc.col_name()) {
1675
0
            return remove_nullable(dest_slot_desc->type())->get_primitive_type() == TYPE_VARIANT;
1676
0
        }
1677
0
    }
1678
0
    return false;
1679
0
}
1680
1681
0
// Appends one entry to the nullable skip-bitmap column (at skip_bitmap_col_idx): a non-null,
// default (empty) bitmap.
//
// Every row gets such an entry, whether or not it misses columns. _set_skip_bitmap_mark adds
// column ids to the last entry of this column.
//
// @param cur_row_count  number of rows already in the column before this call.
void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) const {
    auto* nullable_bitmap_col = assert_cast<ColumnNullable*>(
            block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get());
    auto* bitmap_col =
            assert_cast<ColumnBitmap*>(nullable_bitmap_col->get_nested_column_ptr().get());
    DCHECK(nullable_bitmap_col->size() == cur_row_count);

    // Mark the new entry as not-null, then append the empty bitmap itself.
    nullable_bitmap_col->get_null_map_data().push_back(0);
    bitmap_col->insert_default();
    DCHECK(bitmap_col->size() == cur_row_count + 1);
}
1692
1693
void NewJsonReader::_set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr,
1694
0
                                          Block& block, size_t cur_row_count, bool* valid) const {
1695
    // we record the missing column's column unique id in skip bitmap
1696
    // to indicate which columns need to do the alignment process
1697
0
    auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>(
1698
0
            block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get());
1699
0
    auto* skip_bitmap_col_ptr =
1700
0
            assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get());
1701
0
    DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1);
1702
0
    auto& skip_bitmap = skip_bitmap_col_ptr->get_data().back();
1703
0
    skip_bitmap.add(slot_desc->col_unique_id());
1704
0
}
1705
1706
0
void NewJsonReader::_collect_profile_before_close() {
1707
0
    if (_line_reader != nullptr) {
1708
0
        _line_reader->collect_profile_before_close();
1709
0
    }
1710
0
    if (_file_reader != nullptr) {
1711
0
        _file_reader->collect_profile_before_close();
1712
0
    }
1713
0
}
1714
1715
} // namespace doris