be/src/format/json/new_json_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/json/new_json_reader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <glog/logging.h> |
25 | | #include <rapidjson/error/en.h> |
26 | | #include <rapidjson/reader.h> |
27 | | #include <rapidjson/stringbuffer.h> |
28 | | #include <rapidjson/writer.h> |
29 | | #include <simdjson/simdjson.h> // IWYU pragma: keep |
30 | | |
31 | | #include <algorithm> |
32 | | #include <cinttypes> |
33 | | #include <cstdio> |
34 | | #include <cstring> |
35 | | #include <map> |
36 | | #include <memory> |
37 | | #include <string_view> |
38 | | #include <utility> |
39 | | |
40 | | #include "common/compiler_util.h" // IWYU pragma: keep |
41 | | #include "common/config.h" |
42 | | #include "common/status.h" |
43 | | #include "core/assert_cast.h" |
44 | | #include "core/block/column_with_type_and_name.h" |
45 | | #include "core/column/column.h" |
46 | | #include "core/column/column_array.h" |
47 | | #include "core/column/column_map.h" |
48 | | #include "core/column/column_nullable.h" |
49 | | #include "core/column/column_string.h" |
50 | | #include "core/column/column_struct.h" |
51 | | #include "core/custom_allocator.h" |
52 | | #include "core/data_type/data_type_array.h" |
53 | | #include "core/data_type/data_type_factory.hpp" |
54 | | #include "core/data_type/data_type_map.h" |
55 | | #include "core/data_type/data_type_number.h" // IWYU pragma: keep |
56 | | #include "core/data_type/data_type_struct.h" |
57 | | #include "core/data_type/define_primitive_type.h" |
58 | | #include "exec/scan/scanner.h" |
59 | | #include "exprs/json_functions.h" |
60 | | #include "format/file_reader/new_plain_text_line_reader.h" |
61 | | #include "io/file_factory.h" |
62 | | #include "io/fs/buffered_reader.h" |
63 | | #include "io/fs/file_reader.h" |
64 | | #include "io/fs/stream_load_pipe.h" |
65 | | #include "io/fs/tracing_file_reader.h" |
66 | | #include "runtime/descriptors.h" |
67 | | #include "runtime/runtime_state.h" |
68 | | #include "util/slice.h" |
69 | | |
70 | | namespace doris::io { |
71 | | struct IOContext; |
72 | | enum class FileCachePolicy : uint8_t; |
73 | | } // namespace doris::io |
74 | | |
75 | | namespace doris { |
76 | | #include "common/compile_check_begin.h" |
77 | | using namespace ErrorCode; |
78 | | |
// Load-path constructor: used where a RuntimeState is available, so runtime
// settings (batch size) and per-scan error reporting can be used.
// `io_ctx_holder` optionally keeps the IO context alive; when `io_ctx` is not
// supplied, the held context is used in its place.
NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                             const TFileScanRangeParams& params, const TFileRangeDesc& range,
                             const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof,
                             io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
        : _vhandle_json_callback(nullptr),
          _state(state),
          _profile(profile),
          _counter(counter),
          _params(params),
          _range(range),
          _file_slot_descs(file_slot_descs),
          _file_reader(nullptr),
          _line_reader(nullptr),
          _reader_eof(false),
          _decompressor(nullptr),
          _skip_first_line(false),
          _next_row(0),
          _total_rows(0),
          // rapidjson allocators backed by fixed in-object buffers so small
          // documents avoid heap allocation.
          _value_allocator(_value_buffer, sizeof(_value_buffer)),
          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
          _scanner_eof(scanner_eof),
          _current_offset(0),
          _io_ctx(io_ctx),
          _io_ctx_holder(std::move(io_ctx_holder)) {
    // Fall back to the held IO context when no raw pointer was supplied.
    if (_io_ctx == nullptr && _io_ctx_holder) {
        _io_ctx = _io_ctx_holder.get();
    }
    _read_timer = ADD_TIMER(_profile, "ReadTime");
    if (_range.__isset.compress_type) {
        // for compatibility: prefer the per-range compress type over the
        // scan-level one.
        _file_compress_type = _range.compress_type;
    } else {
        _file_compress_type = _params.compress_type;
    }
    _init_system_properties();
    _init_file_description();
}
117 | | |
// Stateless constructor: no RuntimeState, counter or scanner_eof flag
// (presumably the schema-parsing path — see init_schema_reader()).
// Note: unlike the other constructor, no "ReadTime" timer is registered.
NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params,
                             const TFileRangeDesc& range,
                             const std::vector<SlotDescriptor*>& file_slot_descs,
                             io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
        : _vhandle_json_callback(nullptr),
          _state(nullptr),
          _profile(profile),
          _params(params),
          _range(range),
          _file_slot_descs(file_slot_descs),
          _line_reader(nullptr),
          _reader_eof(false),
          _decompressor(nullptr),
          _skip_first_line(false),
          _next_row(0),
          _total_rows(0),
          // rapidjson allocators backed by fixed in-object buffers so small
          // documents avoid heap allocation.
          _value_allocator(_value_buffer, sizeof(_value_buffer)),
          _parse_allocator(_parse_buffer, sizeof(_parse_buffer)),
          _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator),
          _io_ctx(io_ctx),
          _io_ctx_holder(std::move(io_ctx_holder)) {
    // Fall back to the held IO context when no raw pointer was supplied.
    if (_io_ctx == nullptr && _io_ctx_holder) {
        _io_ctx = _io_ctx_holder.get();
    }
    if (_range.__isset.compress_type) {
        // for compatibility: prefer the per-range compress type over the
        // scan-level one.
        _file_compress_type = _range.compress_type;
    } else {
        _file_compress_type = _params.compress_type;
    }
    _init_system_properties();
    _init_file_description();
}
151 | | |
152 | 0 | void NewJsonReader::_init_system_properties() { |
153 | 0 | if (_range.__isset.file_type) { |
154 | | // for compatibility |
155 | 0 | _system_properties.system_type = _range.file_type; |
156 | 0 | } else { |
157 | 0 | _system_properties.system_type = _params.file_type; |
158 | 0 | } |
159 | 0 | _system_properties.properties = _params.properties; |
160 | 0 | _system_properties.hdfs_params = _params.hdfs_params; |
161 | 0 | if (_params.__isset.broker_addresses) { |
162 | 0 | _system_properties.broker_addresses.assign(_params.broker_addresses.begin(), |
163 | 0 | _params.broker_addresses.end()); |
164 | 0 | } |
165 | 0 | } |
166 | | |
167 | 0 | void NewJsonReader::_init_file_description() { |
168 | 0 | _file_description.path = _range.path; |
169 | 0 | _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1; |
170 | |
|
171 | 0 | if (_range.__isset.fs_name) { |
172 | 0 | _file_description.fs_name = _range.fs_name; |
173 | 0 | } |
174 | 0 | if (_range.__isset.file_cache_admission) { |
175 | 0 | _file_description.file_cache_admission = _range.file_cache_admission; |
176 | 0 | } |
177 | 0 | } |
178 | | |
// Prepare the reader for reading data rows:
// - builds _col_default_value_map from the given default-value expressions,
// - creates one serde per projected slot (used to insert parsed values),
// - creates the decompressor (nullptr for uncompressed files),
// - initializes the simdjson parsing pipeline and row-handler callback.
Status NewJsonReader::init_reader(
        const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx,
        bool is_load) {
    _is_load = is_load;

    // generate _col_default_value_map
    RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx));

    // Use serde to insert parsed data into columns.
    for (auto* slot_desc : _file_slot_descs) {
        _serdes.emplace_back(slot_desc->get_data_type_ptr()->get_serde());
    }

    // create decompressor.
    // _decompressor may be nullptr if this is not a compressed file
    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));

    RETURN_IF_ERROR(_simdjson_init_reader());
    return Status::OK();
}
199 | | |
// Append rows to `block` until it holds at least batch_size rows or the input
// is exhausted. *eof is only set to true when the reader was already
// exhausted on entry; callers detect end-of-data by calling again.
// NOTE(review): *read_rows is incremented once per json document, but with
// strip_outer_array a single document may append several rows (see
// _simdjson_handle_simple_json_write_columns) — confirm callers rely on
// block->rows() rather than *read_rows for exact counts.
Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    if (_reader_eof) {
        *eof = true;
        return Status::OK();
    }

    const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);

    while (block->rows() < batch_size && !_reader_eof) {
        // If this range starts mid-file, the first (partial) line belongs to
        // the previous range: consume and drop it exactly once.
        if (UNLIKELY(_read_json_by_line && _skip_first_line)) {
            size_t size = 0;
            const uint8_t* line_ptr = nullptr;
            RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof, _io_ctx));
            _skip_first_line = false;
            continue;
        }

        bool is_empty_row = false;

        RETURN_IF_ERROR(
                _read_json_column(_state, *block, _file_slot_descs, &is_empty_row, &_reader_eof));
        if (is_empty_row) {
            // Read empty row, just continue
            continue;
        }
        ++(*read_rows);
    }

    return Status::OK();
}
230 | | |
// Expose the reader's columns as name -> type. `missing_cols` is part of the
// generic reader interface but is not populated by the json reader.
Status NewJsonReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                                  std::unordered_set<std::string>* missing_cols) {
    for (const auto& slot : _file_slot_descs) {
        name_to_type->emplace(slot->col_name(), slot->type());
    }
    return Status::OK();
}
238 | | |
// init decompressor, file reader and line reader for parsing schema.
// Unlike init_reader(), this path opens the file with need_schema=true so a
// stream-load pipe can pre-read data for column inference.
Status NewJsonReader::init_schema_reader() {
    RETURN_IF_ERROR(_get_range_params());
    // create decompressor.
    // _decompressor may be nullptr if this is not a compressed file
    RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
    RETURN_IF_ERROR(_open_file_reader(true));
    if (_read_json_by_line) {
        RETURN_IF_ERROR(_open_line_reader());
    }
    // generate _parsed_jsonpaths and _parsed_json_root
    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());
    return Status::OK();
}
253 | | |
254 | | Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, |
255 | 0 | std::vector<DataTypePtr>* col_types) { |
256 | 0 | bool eof = false; |
257 | 0 | const uint8_t* json_str = nullptr; |
258 | 0 | DorisUniqueBufferPtr<uint8_t> json_str_ptr; |
259 | 0 | size_t size = 0; |
260 | 0 | if (_line_reader != nullptr) { |
261 | 0 | RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx)); |
262 | 0 | } else { |
263 | 0 | size_t read_size = 0; |
264 | 0 | RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size)); |
265 | 0 | json_str = json_str_ptr.get(); |
266 | 0 | size = read_size; |
267 | 0 | if (read_size == 0) { |
268 | 0 | eof = true; |
269 | 0 | } |
270 | 0 | } |
271 | | |
272 | 0 | if (size == 0 || eof) { |
273 | 0 | return Status::EndOfFile("Empty file."); |
274 | 0 | } |
275 | | |
276 | | // clear memory here. |
277 | 0 | _value_allocator.Clear(); |
278 | 0 | _parse_allocator.Clear(); |
279 | 0 | bool has_parse_error = false; |
280 | | |
281 | | // parse jsondata to JsonDoc |
282 | | // As the issue: https://github.com/Tencent/rapidjson/issues/1458 |
283 | | // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag. |
284 | 0 | if (_num_as_string) { |
285 | 0 | has_parse_error = |
286 | 0 | _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, size) |
287 | 0 | .HasParseError(); |
288 | 0 | } else { |
289 | 0 | has_parse_error = _origin_json_doc.Parse((char*)json_str, size).HasParseError(); |
290 | 0 | } |
291 | |
|
292 | 0 | if (has_parse_error) { |
293 | 0 | return Status::DataQualityError( |
294 | 0 | "Parse json data for JsonDoc failed. code: {}, error info: {}", |
295 | 0 | _origin_json_doc.GetParseError(), |
296 | 0 | rapidjson::GetParseError_En(_origin_json_doc.GetParseError())); |
297 | 0 | } |
298 | | |
299 | | // set json root |
300 | 0 | if (!_parsed_json_root.empty()) { |
301 | 0 | _json_doc = JsonFunctions::get_json_object_from_parsed_json( |
302 | 0 | _parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator()); |
303 | 0 | if (_json_doc == nullptr) { |
304 | 0 | return Status::DataQualityError("JSON Root not found."); |
305 | 0 | } |
306 | 0 | } else { |
307 | 0 | _json_doc = &_origin_json_doc; |
308 | 0 | } |
309 | | |
310 | 0 | if (_json_doc->IsArray() && !_strip_outer_array) { |
311 | 0 | return Status::DataQualityError( |
312 | 0 | "JSON data is array-object, `strip_outer_array` must be TRUE."); |
313 | 0 | } |
314 | 0 | if (!_json_doc->IsArray() && _strip_outer_array) { |
315 | 0 | return Status::DataQualityError( |
316 | 0 | "JSON data is not an array-object, `strip_outer_array` must be FALSE."); |
317 | 0 | } |
318 | | |
319 | 0 | rapidjson::Value* objectValue = nullptr; |
320 | 0 | if (_json_doc->IsArray()) { |
321 | 0 | if (_json_doc->Size() == 0) { |
322 | | // may be passing an empty json, such as "[]" |
323 | 0 | return Status::InternalError<false>("Empty first json line"); |
324 | 0 | } |
325 | 0 | objectValue = &(*_json_doc)[0]; |
326 | 0 | } else { |
327 | 0 | objectValue = _json_doc; |
328 | 0 | } |
329 | | |
330 | 0 | if (!objectValue->IsObject()) { |
331 | 0 | return Status::DataQualityError("JSON data is not an object. but: {}", |
332 | 0 | objectValue->GetType()); |
333 | 0 | } |
334 | | |
335 | | // use jsonpaths to col_names |
336 | 0 | if (!_parsed_jsonpaths.empty()) { |
337 | 0 | for (auto& _parsed_jsonpath : _parsed_jsonpaths) { |
338 | 0 | size_t len = _parsed_jsonpath.size(); |
339 | 0 | if (len == 0) { |
340 | 0 | return Status::InvalidArgument("It's invalid jsonpaths."); |
341 | 0 | } |
342 | 0 | std::string key = _parsed_jsonpath[len - 1].key; |
343 | 0 | col_names->emplace_back(key); |
344 | 0 | col_types->emplace_back( |
345 | 0 | DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true)); |
346 | 0 | } |
347 | 0 | return Status::OK(); |
348 | 0 | } |
349 | | |
350 | 0 | for (int i = 0; i < objectValue->MemberCount(); ++i) { |
351 | 0 | auto it = objectValue->MemberBegin() + i; |
352 | 0 | col_names->emplace_back(it->name.GetString()); |
353 | 0 | col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>())); |
354 | 0 | } |
355 | 0 | return Status::OK(); |
356 | 0 | } |
357 | | |
358 | 0 | Status NewJsonReader::_get_range_params() { |
359 | 0 | if (!_params.__isset.file_attributes) { |
360 | 0 | return Status::InternalError<false>("BE cat get file_attributes"); |
361 | 0 | } |
362 | | |
363 | | // get line_delimiter |
364 | 0 | if (_params.file_attributes.__isset.text_params && |
365 | 0 | _params.file_attributes.text_params.__isset.line_delimiter) { |
366 | 0 | _line_delimiter = _params.file_attributes.text_params.line_delimiter; |
367 | 0 | _line_delimiter_length = _line_delimiter.size(); |
368 | 0 | } |
369 | |
|
370 | 0 | if (_params.file_attributes.__isset.jsonpaths) { |
371 | 0 | _jsonpaths = _params.file_attributes.jsonpaths; |
372 | 0 | } |
373 | 0 | if (_params.file_attributes.__isset.json_root) { |
374 | 0 | _json_root = _params.file_attributes.json_root; |
375 | 0 | } |
376 | 0 | if (_params.file_attributes.__isset.read_json_by_line) { |
377 | 0 | _read_json_by_line = _params.file_attributes.read_json_by_line; |
378 | 0 | } |
379 | 0 | if (_params.file_attributes.__isset.strip_outer_array) { |
380 | 0 | _strip_outer_array = _params.file_attributes.strip_outer_array; |
381 | 0 | } |
382 | 0 | if (_params.file_attributes.__isset.num_as_string) { |
383 | 0 | _num_as_string = _params.file_attributes.num_as_string; |
384 | 0 | } |
385 | 0 | if (_params.file_attributes.__isset.fuzzy_parse) { |
386 | 0 | _fuzzy_parse = _params.file_attributes.fuzzy_parse; |
387 | 0 | } |
388 | 0 | if (_range.table_format_params.table_format_type == "hive") { |
389 | 0 | _is_hive_table = true; |
390 | 0 | } |
391 | 0 | if (_params.file_attributes.__isset.openx_json_ignore_malformed) { |
392 | 0 | _openx_json_ignore_malformed = _params.file_attributes.openx_json_ignore_malformed; |
393 | 0 | } |
394 | 0 | return Status::OK(); |
395 | 0 | } |
396 | | |
// Append one NULL row to `block` so a malformed json document can be skipped
// while keeping all columns the same length (openx ignore-malformed mode).
// Fails with a data quality error if any destination column is not nullable.
static Status ignore_malformed_json_append_null(Block& block) {
    for (auto& column : block.get_columns()) {
        if (!column->is_nullable()) [[unlikely]] {
            return Status::DataQualityError("malformed json, but the column `{}` is not nullable.",
                                            column->get_name());
        }
        // Safe: nullability was just checked above.
        static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default();
    }
    return Status::OK();
}
407 | | |
// Open the underlying file reader for this range.
// `need_schema` only matters for FILE_STREAM: the pipe then pre-reads a
// portion of the data so column information can be parsed.
Status NewJsonReader::_open_file_reader(bool need_schema) {
    int64_t start_offset = _range.start_offset;
    if (start_offset != 0) {
        // Step back one byte for non-first ranges; pairs with `size += 1` in
        // _open_line_reader so the line reader can see the delimiter that
        // ends the previous range's last line.
        start_offset -= 1;
    }

    _current_offset = start_offset;

    if (_params.file_type == TFileType::FILE_STREAM) {
        // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
                                                        need_schema));
    } else {
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
        io::FileReaderOptions reader_options =
                FileFactory::get_reader_options(_state, _file_description);
        io::FileReaderSPtr file_reader;
        // The two branches differ only in which IOContext is handed to the
        // delegate reader: the shared holder (as a const shared_ptr) or the
        // raw pointer.
        if (_io_ctx_holder) {
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                    _profile, _system_properties, _file_description, reader_options,
                    io::DelegateReader::AccessMode::SEQUENTIAL,
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
                    io::PrefetchRange(_range.start_offset, _range.size)));
        } else {
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
                    _profile, _system_properties, _file_description, reader_options,
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
                    io::PrefetchRange(_range.start_offset, _range.size)));
        }
        // Wrap with a tracing reader when an IO context (and its stats) exists.
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
                                                                         _io_ctx->file_reader_stats)
                               : file_reader;
    }
    return Status::OK();
}
443 | | |
// Create the plain-text line reader used in read_json_by_line mode.
Status NewJsonReader::_open_line_reader() {
    int64_t size = _range.size;
    if (_range.start_offset != 0) {
        // When we fetch range doesn't start from 0, size will += 1.
        // Together with the one-byte step-back in _open_file_reader this lets
        // the reader detect the previous range's trailing delimiter; the
        // resulting partial first line is skipped via _skip_first_line.
        size += 1;
        _skip_first_line = true;
    } else {
        _skip_first_line = false;
    }
    _line_reader = NewPlainTextLineReader::create_unique(
            _profile, _file_reader, _decompressor.get(),
            std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length,
                                                     false),
            size, _current_offset);
    return Status::OK();
}
460 | | |
461 | 0 | Status NewJsonReader::_parse_jsonpath_and_json_root() { |
462 | | // parse jsonpaths |
463 | 0 | if (!_jsonpaths.empty()) { |
464 | 0 | rapidjson::Document jsonpaths_doc; |
465 | 0 | if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) { |
466 | 0 | if (!jsonpaths_doc.IsArray()) { |
467 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
468 | 0 | } |
469 | 0 | for (int i = 0; i < jsonpaths_doc.Size(); i++) { |
470 | 0 | const rapidjson::Value& path = jsonpaths_doc[i]; |
471 | 0 | if (!path.IsString()) { |
472 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
473 | 0 | } |
474 | 0 | std::string json_path = path.GetString(); |
475 | | // $ -> $. in json_path |
476 | 0 | if (UNLIKELY(json_path.size() == 1 && json_path[0] == '$')) { |
477 | 0 | json_path.insert(1, "."); |
478 | 0 | } |
479 | 0 | std::vector<JsonPath> parsed_paths; |
480 | 0 | JsonFunctions::parse_json_paths(json_path, &parsed_paths); |
481 | 0 | _parsed_jsonpaths.push_back(std::move(parsed_paths)); |
482 | 0 | } |
483 | |
|
484 | 0 | } else { |
485 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
486 | 0 | } |
487 | 0 | } |
488 | | |
489 | | // parse jsonroot |
490 | 0 | if (!_json_root.empty()) { |
491 | 0 | std::string json_root = _json_root; |
492 | | // $ -> $. in json_root |
493 | 0 | if (json_root.size() == 1 && json_root[0] == '$') { |
494 | 0 | json_root.insert(1, "."); |
495 | 0 | } |
496 | 0 | JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); |
497 | 0 | } |
498 | 0 | return Status::OK(); |
499 | 0 | } |
500 | | |
501 | | Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block, |
502 | | const std::vector<SlotDescriptor*>& slot_descs, |
503 | 0 | bool* is_empty_row, bool* eof) { |
504 | 0 | return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof); |
505 | 0 | } |
506 | | |
// Read the whole remaining input as a single message.
// For regular files (local/HDFS/HTTP/S3) the full file is read starting at
// _current_offset; for stream loads the message is drained from the pipe.
Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf,
                                        size_t* read_size) {
    switch (_params.file_type) {
    case TFileType::FILE_LOCAL:
        [[fallthrough]];
    case TFileType::FILE_HDFS:
    case TFileType::FILE_HTTP:
        [[fallthrough]];
    case TFileType::FILE_S3: {
        // Allocate a buffer for the entire file and read it in one call.
        size_t file_size = _file_reader->size();
        *file_buf = make_unique_buffer<uint8_t>(file_size);
        Slice result(file_buf->get(), file_size);
        RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx));
        _current_offset += *read_size;
        break;
    }
    case TFileType::FILE_STREAM: {
        RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size));
        break;
    }
    default: {
        return Status::NotSupported<false>("no supported file reader type: {}", _params.file_type);
    }
    }
    return Status::OK();
}
533 | | |
534 | | Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, |
535 | 0 | size_t* read_size) { |
536 | 0 | auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()); |
537 | | |
538 | | // first read: read from the pipe once. |
539 | 0 | RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size)); |
540 | | |
541 | | // When the file is not chunked, the entire file has already been read. |
542 | 0 | if (!stream_load_pipe->is_chunked_transfer()) { |
543 | 0 | return Status::OK(); |
544 | 0 | } |
545 | | |
546 | 0 | std::vector<uint8_t> buf; |
547 | 0 | uint64_t cur_size = 0; |
548 | | |
549 | | // second read: continuously read data from the pipe until all data is read. |
550 | 0 | DorisUniqueBufferPtr<uint8_t> read_buf; |
551 | 0 | size_t read_buf_size = 0; |
552 | 0 | while (true) { |
553 | 0 | RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size)); |
554 | 0 | if (read_buf_size == 0) { |
555 | 0 | break; |
556 | 0 | } else { |
557 | 0 | buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size); |
558 | 0 | cur_size += read_buf_size; |
559 | 0 | read_buf_size = 0; |
560 | 0 | read_buf.reset(); |
561 | 0 | } |
562 | 0 | } |
563 | | |
564 | | // No data is available during the second read. |
565 | 0 | if (cur_size == 0) { |
566 | 0 | return Status::OK(); |
567 | 0 | } |
568 | | |
569 | 0 | DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size); |
570 | | |
571 | | // copy the data during the first read |
572 | 0 | memcpy(total_buf.get(), file_buf->get(), *read_size); |
573 | | |
574 | | // copy the data during the second read |
575 | 0 | memcpy(total_buf.get() + *read_size, buf.data(), cur_size); |
576 | 0 | *file_buf = std::move(total_buf); |
577 | 0 | *read_size += cur_size; |
578 | 0 | return Status::OK(); |
579 | 0 | } |
580 | | |
581 | | // ---------SIMDJSON---------- |
582 | | // simdjson, replace none simdjson function if it is ready |
// Set up the simdjson-based read path: open file/line readers, parse
// jsonpaths and json root, pick the per-row handler callback, build the
// column-name -> slot-index map, and size the padded parse buffers.
Status NewJsonReader::_simdjson_init_reader() {
    RETURN_IF_ERROR(_get_range_params());

    RETURN_IF_ERROR(_open_file_reader(false));
    if (LIKELY(_read_json_by_line)) {
        RETURN_IF_ERROR(_open_line_reader());
    }

    // generate _parsed_jsonpaths and _parsed_json_root
    RETURN_IF_ERROR(_parse_jsonpath_and_json_root());

    // Choose the specialized handler up-front so per-row dispatch is a single
    // member-pointer call.
    if (_parsed_jsonpaths.empty()) { // input is a simple json-string
        _vhandle_json_callback = &NewJsonReader::_simdjson_handle_simple_json;
    } else { // input is a complex json-string and a json-path
        if (_strip_outer_array) {
            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json;
        } else {
            _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json;
        }
    }
    _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>();
    // Map column name -> slot index for fast lookup during parsing; remember
    // the skip-bitmap column if present.
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
        _slot_desc_index[StringRef {_file_slot_descs[i]->col_name()}] = i;
        if (_file_slot_descs[i]->is_skip_bitmap_col()) {
            skip_bitmap_col_idx = i;
        }
    }
    // simdjson reads from padded buffers; _padded_size presumably accounts
    // for SIMDJSON_PADDING — confirm where _padded_size is defined.
    _simdjson_ondemand_padding_buffer.resize(_padded_size);
    _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size);
    return Status::OK();
}
615 | | |
// Handle a simdjson exception thrown while parsing one document: count the
// filtered row, roll every column back to `num_rows` (dropping partially
// written values), and append the offending document plus the error message
// to the error log. `eof` is currently unused here.
Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Block& block,
                                             size_t num_rows, bool* eof) {
    fmt::memory_buffer error_msg;
    fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", error.error(),
                   error.what());
    _counter->num_rows_filtered++;
    // Before continuing to process other rows, we need to first clean the fail parsed row.
    for (int i = 0; i < block.columns(); ++i) {
        auto column = block.get_by_position(i).column->assume_mutable();
        if (column->size() > num_rows) {
            column->pop_back(column->size() - num_rows);
        }
    }

    // Log the raw document (as buffered for simdjson) and the message.
    RETURN_IF_ERROR(_state->append_error_msg_to_file(
            [&]() -> std::string {
                return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size);
            },
            [&]() -> std::string { return fmt::to_string(error_msg); }));
    return Status::OK();
}
637 | | |
// Handler for plain json input (no jsonpaths). One call consumes one json
// document: parse it, resolve the json root, then write columns. simdjson
// parse failures surface as exceptions and are logged as filtered rows.
Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Block& block,
                                                   const std::vector<SlotDescriptor*>& slot_descs,
                                                   bool* is_empty_row, bool* eof) {
    // simple json
    size_t size = 0;
    simdjson::error_code error;
    size_t num_rows = block.rows();
    try {
        // step1: get and parse buf to get json doc
        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
        if (size == 0 || *eof) {
            *is_empty_row = true;
            return Status::OK();
        }

        // step2: get json value by json doc
        Status st = _get_json_value(&size, eof, &error, is_empty_row);
        if (st.is<DATA_QUALITY_ERROR>()) {
            if (_is_load) {
                // Load path: tolerate the bad document and move on
                // (presumably reported inside _get_json_value — confirm).
                return Status::OK();
            } else if (_openx_json_ignore_malformed) {
                // Query path with openx compatibility: emit a NULL row.
                RETURN_IF_ERROR(ignore_malformed_json_append_null(block));
                return Status::OK();
            }
        }

        RETURN_IF_ERROR(st);
        if (*is_empty_row || *eof) {
            return Status::OK();
        }

        // step 3: write columns by json value
        RETURN_IF_ERROR(
                _simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof));
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
        if (*_scanner_eof) {
            // When _scanner_eof is true and valid is false, it means that we have encountered
            // unqualified data and decided to stop the scan.
            *is_empty_row = true;
            return Status::OK();
        }
    }

    return Status::OK();
}
684 | | |
// Write one parsed _json_value into the block.
// If the value is an array (strip_outer_array case) every element becomes a
// row in this single call; otherwise exactly one row is written. simdjson
// exceptions roll the block back to `num_rows` rows.
Status NewJsonReader::_simdjson_handle_simple_json_write_columns(
        Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row,
        bool* eof) {
    simdjson::ondemand::object objectValue;
    size_t num_rows = block.rows();
    bool valid = false;
    try {
        if (_json_value.type() == simdjson::ondemand::json_type::array) {
            _array = _json_value.get_array();
            if (_array.count_elements() == 0) {
                // may be passing an empty json, such as "[]"
                RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr));
                if (*_scanner_eof) {
                    *is_empty_row = true;
                    return Status::OK();
                }
                return Status::OK();
            }

            // Write one row per array element (on-demand iteration: each
            // element must be consumed exactly once, in order).
            _array_iter = _array.begin();
            while (true) {
                objectValue = *_array_iter;
                RETURN_IF_ERROR(
                        _simdjson_set_column_value(&objectValue, block, slot_descs, &valid));
                if (!valid) {
                    if (*_scanner_eof) {
                        // When _scanner_eof is true and valid is false, it means that we have encountered
                        // unqualified data and decided to stop the scan.
                        *is_empty_row = true;
                        return Status::OK();
                    }
                }
                ++_array_iter;
                if (_array_iter == _array.end()) {
                    // Hint to read next json doc
                    break;
                }
            }
        } else {
            // Single object: write exactly one row.
            objectValue = _json_value;
            RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block, slot_descs, &valid));
            if (!valid) {
                if (*_scanner_eof) {
                    *is_empty_row = true;
                    return Status::OK();
                }
            }
            *is_empty_row = false;
        }
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
        if (!valid) {
            if (*_scanner_eof) {
                *is_empty_row = true;
                return Status::OK();
            }
        }
    }
    return Status::OK();
}
745 | | |
// Handler for jsonpaths + strip_outer_array: each document is a json array
// whose elements are flattened into rows through the configured jsonpaths.
Status NewJsonReader::_simdjson_handle_flat_array_complex_json(
        RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs,
        bool* is_empty_row, bool* eof) {
    // array complex json
    size_t size = 0;
    simdjson::error_code error;
    size_t num_rows = block.rows();
    try {
        // step1: get and parse buf to get json doc
        RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error));
        if (size == 0 || *eof) {
            *is_empty_row = true;
            return Status::OK();
        }

        // step2: get json value by json doc
        Status st = _get_json_value(&size, eof, &error, is_empty_row);
        if (st.is<DATA_QUALITY_ERROR>()) {
            // Bad document: tolerate and continue (presumably reported inside
            // _get_json_value — confirm).
            return Status::OK();
        }
        RETURN_IF_ERROR(st);
        if (*is_empty_row) {
            return Status::OK();
        }

        // step 3: write columns by json value
        RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs,
                                                                               is_empty_row, eof));
    } catch (simdjson::simdjson_error& e) {
        RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof));
        if (*_scanner_eof) {
            // When _scanner_eof is true and valid is false, it means that we have encountered
            // unqualified data and decided to stop the scan.
            *is_empty_row = true;
            return Status::OK();
        }
    }

    return Status::OK();
}
786 | | |
787 | | Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns( |
788 | | Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
789 | 0 | bool* eof) { |
790 | | // Advance one row in array list, if it is the endpoint, stop advance and break the loop |
791 | 0 | #define ADVANCE_ROW() \ |
792 | 0 | ++_array_iter; \ |
793 | 0 | if (_array_iter == _array.end()) { \ |
794 | 0 | break; \ |
795 | 0 | } |
796 | |
|
797 | 0 | simdjson::ondemand::object cur; |
798 | 0 | size_t num_rows = block.rows(); |
799 | 0 | try { |
800 | 0 | bool valid = true; |
801 | 0 | _array = _json_value.get_array(); |
802 | 0 | _array_iter = _array.begin(); |
803 | |
|
804 | 0 | while (true) { |
805 | 0 | cur = (*_array_iter).get_object(); |
806 | | // extract root |
807 | 0 | if (!_parsed_from_json_root && !_parsed_json_root.empty()) { |
808 | 0 | simdjson::ondemand::value val; |
809 | 0 | Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val); |
810 | 0 | if (UNLIKELY(!st.ok())) { |
811 | 0 | if (st.is<NOT_FOUND>()) { |
812 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr)); |
813 | 0 | ADVANCE_ROW(); |
814 | 0 | continue; |
815 | 0 | } |
816 | 0 | return st; |
817 | 0 | } |
818 | 0 | if (val.type() != simdjson::ondemand::json_type::object) { |
819 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr)); |
820 | 0 | ADVANCE_ROW(); |
821 | 0 | continue; |
822 | 0 | } |
823 | 0 | cur = val.get_object(); |
824 | 0 | } |
825 | 0 | RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid)); |
826 | 0 | ADVANCE_ROW(); |
827 | 0 | if (!valid) { |
828 | 0 | continue; // process next line |
829 | 0 | } |
830 | 0 | *is_empty_row = false; |
831 | 0 | } |
832 | 0 | } catch (simdjson::simdjson_error& e) { |
833 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
834 | 0 | if (*_scanner_eof) { |
835 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
836 | | // unqualified data and decided to stop the scan. |
837 | 0 | *is_empty_row = true; |
838 | 0 | return Status::OK(); |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | 0 | return Status::OK(); |
843 | 0 | } |
844 | | |
845 | | Status NewJsonReader::_simdjson_handle_nested_complex_json( |
846 | | RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, |
847 | 0 | bool* is_empty_row, bool* eof) { |
848 | | // nested complex json |
849 | 0 | while (true) { |
850 | 0 | size_t num_rows = block.rows(); |
851 | 0 | simdjson::ondemand::object cur; |
852 | 0 | size_t size = 0; |
853 | 0 | simdjson::error_code error; |
854 | 0 | try { |
855 | 0 | RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); |
856 | 0 | if (size == 0 || *eof) { |
857 | 0 | *is_empty_row = true; |
858 | 0 | return Status::OK(); |
859 | 0 | } |
860 | 0 | Status st = _get_json_value(&size, eof, &error, is_empty_row); |
861 | 0 | if (st.is<DATA_QUALITY_ERROR>()) { |
862 | 0 | continue; // continue to read next |
863 | 0 | } |
864 | 0 | RETURN_IF_ERROR(st); |
865 | 0 | if (*is_empty_row) { |
866 | 0 | return Status::OK(); |
867 | 0 | } |
868 | 0 | *is_empty_row = false; |
869 | 0 | bool valid = true; |
870 | 0 | if (_json_value.type() != simdjson::ondemand::json_type::object) { |
871 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr)); |
872 | 0 | continue; |
873 | 0 | } |
874 | 0 | cur = _json_value.get_object(); |
875 | 0 | st = _simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid); |
876 | 0 | if (!st.ok()) { |
877 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr)); |
878 | | // Before continuing to process other rows, we need to first clean the fail parsed row. |
879 | 0 | for (int i = 0; i < block.columns(); ++i) { |
880 | 0 | auto column = block.get_by_position(i).column->assume_mutable(); |
881 | 0 | if (column->size() > num_rows) { |
882 | 0 | column->pop_back(column->size() - num_rows); |
883 | 0 | } |
884 | 0 | } |
885 | 0 | continue; |
886 | 0 | } |
887 | 0 | if (!valid) { |
888 | | // there is only one line in this case, so if it return false, just set is_empty_row true |
889 | | // so that the caller will continue reading next line. |
890 | 0 | *is_empty_row = true; |
891 | 0 | } |
892 | 0 | break; // read a valid row |
893 | 0 | } catch (simdjson::simdjson_error& e) { |
894 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
895 | 0 | if (*_scanner_eof) { |
896 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
897 | | // unqualified data and decided to stop the scan. |
898 | 0 | *is_empty_row = true; |
899 | 0 | return Status::OK(); |
900 | 0 | } |
901 | 0 | continue; |
902 | 0 | } |
903 | 0 | } |
904 | 0 | return Status::OK(); |
905 | 0 | } |
906 | | |
907 | 0 | size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) { |
908 | | /// Optimization by caching the order of fields (which is almost always the same) |
909 | | /// and a quick check to match the next expected field, instead of searching the hash table. |
910 | 0 | if (_prev_positions.size() > key_index && name == _prev_positions[key_index]->first) { |
911 | 0 | return _prev_positions[key_index]->second; |
912 | 0 | } |
913 | 0 | auto it = _slot_desc_index.find(name); |
914 | 0 | if (it != _slot_desc_index.end()) { |
915 | 0 | if (key_index < _prev_positions.size()) { |
916 | 0 | _prev_positions[key_index] = it; |
917 | 0 | } |
918 | 0 | return it->second; |
919 | 0 | } |
920 | 0 | return size_t(-1); |
921 | 0 | } |
922 | | |
// Writes one flat json object into the block as a single row.
// Each object field whose (case-folded, for hive tables) key matches a slot is
// deserialized into its column; slots with no matching key are then filled with
// defaults, configured default values, or flexible-partial-update markers.
// Sets *valid=false (after reporting the row) when the object cannot produce a
// usable row; otherwise sets *valid=true. The simdjson on-demand object can
// only be traversed once, so all per-field work happens in that single pass.
Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
                                                 const std::vector<SlotDescriptor*>& slot_descs,
                                                 bool* valid) {
    // Reset the per-row "column already written" bitmap.
    _seen_columns.assign(block.columns(), false);
    size_t cur_row_count = block.rows();
    bool has_valid_value = false;
    // Iterate through the object; simdjson on-demand parses fields lazily
    // during this single forward pass.
    size_t key_index = 0;

    for (auto field : *value) {
        std::string_view key = field.unescaped_key();
        StringRef name_ref(key.data(), key.size());
        std::string key_string;
        if (_is_hive_table) {
            // Hive column names are matched case-insensitively: lower-case the key.
            key_string = name_ref.to_string();
            std::transform(key_string.begin(), key_string.end(), key_string.begin(), ::tolower);
            name_ref = StringRef(key_string);
        }
        const size_t column_index = _column_index(name_ref, key_index++);
        if (UNLIKELY(ssize_t(column_index) < 0)) {
            // This key does not exist in the slot descriptors, just ignore it.
            // (_column_index returns size_t(-1) for "not found".)
            continue;
        }
        if (column_index == skip_bitmap_col_idx) {
            // The skip-bitmap column is synthesized below, never read from input.
            continue;
        }
        if (_seen_columns[column_index]) {
            if (_is_hive_table) {
                // Duplicate key: since the object can only be traversed once, the
                // earlier value was already inserted — pop it and let the later
                // (last-wins) value be re-inserted below.
                block.get_by_position(column_index).column->assume_mutable()->pop_back(1);
            } else {
                // Non-hive: first occurrence wins, ignore duplicates.
                continue;
            }
        }
        simdjson::ondemand::value val = field.value();
        auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
        RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
                val, slot_descs[column_index]->type(), column_ptr,
                slot_descs[column_index]->col_name(), _serdes[column_index], valid));
        if (!(*valid)) {
            return Status::OK();
        }
        _seen_columns[column_index] = true;
        has_valid_value = true;
    }

    if (!has_valid_value && _is_load) {
        // No object key matched any slot: report the row as filtered.
        std::string col_names;
        for (auto* slot_desc : slot_descs) {
            col_names.append(slot_desc->col_name() + ", ");
        }
        RETURN_IF_ERROR(_append_error_msg(value,
                                          "There is no column matching jsonpaths in the json file, "
                                          "columns:[{}], please check columns "
                                          "and jsonpaths:" +
                                                  _jsonpaths,
                                          col_names, valid));
        return Status::OK();
    }

    // Flexible partial update bookkeeping — presumably appends an empty skip
    // bitmap cell for this row; see _should_process_skip_bitmap_col (defined
    // elsewhere in this file).
    if (_should_process_skip_bitmap_col()) {
        _append_empty_skip_bitmap_value(block, cur_row_count);
    }

    // Fill every slot that did not appear in the json object.
    int nullcount = 0;
    for (size_t i = 0; i < slot_descs.size(); ++i) {
        if (_seen_columns[i]) {
            continue;
        }
        if (i == skip_bitmap_col_idx) {
            continue;
        }

        auto* slot_desc = slot_descs[i];
        auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();

        // Quick path to insert default value, instead of using default values in the value map.
        if (!_should_process_skip_bitmap_col() &&
            (_col_default_value_map.empty() ||
             _col_default_value_map.find(slot_desc->col_name()) == _col_default_value_map.end())) {
            column_ptr->insert_default();
            continue;
        }
        if (column_ptr->size() < cur_row_count + 1) {
            DCHECK(column_ptr->size() == cur_row_count);
            if (_should_process_skip_bitmap_col()) {
                // Column not found in input: skip this column in flexible partial update,
                // unless it is a key column (keys may not be omitted).
                if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
                    RETURN_IF_ERROR(
                            _append_error_msg(value,
                                              "The key columns can not be ommited in flexible "
                                              "partial update, missing key column: {}",
                                              slot_desc->col_name(), valid));
                    // Roll back the partially written row so every column is
                    // back to exactly cur_row_count rows.
                    for (size_t index = 0; index < block.columns(); ++index) {
                        auto column = block.get_by_position(index).column->assume_mutable();
                        if (column->size() != cur_row_count) {
                            DCHECK(column->size() == cur_row_count + 1);
                            column->pop_back(1);
                            DCHECK(column->size() == cur_row_count);
                        }
                    }
                    return Status::OK();
                }
                _set_skip_bitmap_mark(slot_desc, column_ptr, block, cur_row_count, valid);
                column_ptr->insert_default();
            } else {
                RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
                if (!(*valid)) {
                    return Status::OK();
                }
            }
            ++nullcount;
        }
        DCHECK(column_ptr->size() == cur_row_count + 1);
    }

    // There is at least one valid value here, so not every column can be a
    // fill-in. (NOTE(review): int vs size_t comparison — benign in a DCHECK.)
    DCHECK(nullcount < block.columns());
    *valid = true;
    return Status::OK();
}
1048 | | |
// Deserializes a single simdjson on-demand `value` into `column_ptr` according
// to `type_desc`, recursing for struct/map/array columns.
// - Nullable columns: a json null inserts a default (null) cell and returns;
//   otherwise the nested column/serde are used and a 0 is pushed onto the null
//   map at the end to keep it in sync with the data column.
// - use_string_cache: when true, repeated identical raw string tokens reuse a
//   previously unescaped value via _cached_string_values (cache cleared by the
//   caller, _simdjson_write_columns_by_jsonpath).
// `column_name` is only used to build error messages for nested paths.
// On soft failure sets *valid=false (load path) or returns DataQualityError.
template <bool use_string_cache>
Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
                                                     const DataTypePtr& type_desc,
                                                     IColumn* column_ptr,
                                                     const std::string& column_name,
                                                     DataTypeSerDeSPtr serde, bool* valid) {
    ColumnNullable* nullable_column = nullptr;
    IColumn* data_column_ptr = column_ptr;
    DataTypeSerDeSPtr data_serde = serde;

    if (column_ptr->is_nullable()) {
        nullable_column = reinterpret_cast<ColumnNullable*>(column_ptr);

        // Work on the nested (non-null) column and its serde from here on.
        data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
        data_serde = serde->get_nested_serdes()[0];

        // insert_default() on a nullable column pushes 1 into the null map,
        // so there is no need to push 0 for a json null.
        if (value.type() == simdjson::ondemand::json_type::null) {
            nullable_column->insert_default();
            *valid = true;
            return Status::OK();
        }
    } else if (value.type() == simdjson::ondemand::json_type::null) [[unlikely]] {
        // json null into a non-nullable column: filtered row on the load path,
        // hard data-quality error otherwise.
        if (_is_load) {
            RETURN_IF_ERROR(_append_error_msg(
                    nullptr, "Json value is null, but the column `{}` is not nullable.",
                    column_name, valid));
            return Status::OK();
        } else {
            return Status::DataQualityError(
                    "Json value is null, but the column `{}` is not nullable.", column_name);
        }
    }

    auto primitive_type = type_desc->get_primitive_type();
    if (_is_load || !is_complex_type(primitive_type)) {
        // Scalar path (also used for loads): feed the serde a textual slice.
        if (value.type() == simdjson::ondemand::json_type::string) {
            std::string_view value_string;
            if constexpr (use_string_cache) {
                // Key the cache on the raw (still-escaped) token so unescaping
                // is done once per distinct token.
                const auto cache_key = value.raw_json().value();
                if (_cached_string_values.contains(cache_key)) {
                    value_string = _cached_string_values[cache_key];
                } else {
                    value_string = value.get_string();
                    _cached_string_values.emplace(cache_key, value_string);
                }
            } else {
                DCHECK(_cached_string_values.empty());
                value_string = value.get_string();
            }

            Slice slice {value_string.data(), value_string.size()};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));

        } else if (value.type() == simdjson::ondemand::json_type::boolean) {
            const char* str_value = nullptr;
            // insert "1"/"0" , not "true"/"false".
            if (value.get_bool()) {
                str_value = (char*)"1";
            } else {
                str_value = (char*)"0";
            }
            Slice slice {str_value, 1};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));
        } else {
            // Numbers, objects, arrays: pass the raw json text through and let
            // the serde parse it (handles e.g. an int token for a float column).
            std::string_view json_str = simdjson::to_json_string(value);
            Slice slice {json_str.data(), json_str.size()};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));
        }
    } else if (primitive_type == TYPE_STRUCT) {
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
            return Status::DataQualityError(
                    "Json value isn't object, but the column `{}` is struct.", column_name);
        }

        const auto* type_struct =
                assert_cast<const DataTypeStruct*>(remove_nullable(type_desc).get());
        auto sub_col_size = type_struct->get_elements().size();
        simdjson::ondemand::object struct_value = value.get_object();
        auto sub_serdes = data_serde->get_nested_serdes();
        auto* struct_column_ptr = assert_cast<ColumnStruct*>(data_column_ptr);

        // Map (lower-cased) struct field names to sub-column indexes.
        std::map<std::string, size_t> sub_col_name_to_idx;
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
            sub_col_name_to_idx.emplace(type_struct->get_element_name(sub_col_idx), sub_col_idx);
        }
        std::vector<bool> has_value(sub_col_size, false);
        for (simdjson::ondemand::field sub : struct_value) {
            std::string_view sub_key_view = sub.unescaped_key();
            std::string sub_key(sub_key_view.data(), sub_key_view.length());
            std::transform(sub_key.begin(), sub_key.end(), sub_key.begin(), ::tolower);

            if (sub_col_name_to_idx.find(sub_key) == sub_col_name_to_idx.end()) [[unlikely]] {
                continue; // no matching struct element: ignore the field
            }
            size_t sub_column_idx = sub_col_name_to_idx[sub_key];
            auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr();

            if (has_value[sub_column_idx]) [[unlikely]] {
                // Since struct_value can only be traversed once, we can only insert
                // the original value first, then delete it, and then reinsert the new value.
                sub_column_ptr->pop_back(1);
            }
            has_value[sub_column_idx] = true;

            const auto& sub_col_type = type_struct->get_element(sub_column_idx);
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    sub.value(), sub_col_type, sub_column_ptr.get(), column_name + "." + sub_key,
                    sub_serdes[sub_column_idx], valid));
        }

        // Fill missing sub-columns: nullable ones default to null, a missing
        // non-nullable sub-column is a data-quality error.
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
            if (has_value[sub_col_idx]) {
                continue;
            }

            auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr();
            if (sub_column_ptr->is_nullable()) {
                sub_column_ptr->insert_default();
                continue;
            } else [[unlikely]] {
                return Status::DataQualityError(
                        "Json file structColumn miss field {} and this column isn't nullable.",
                        column_name + "." + type_struct->get_element_name(sub_col_idx));
            }
        }
    } else if (primitive_type == TYPE_MAP) {
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
            return Status::DataQualityError("Json value isn't object, but the column `{}` is map.",
                                            column_name);
        }
        simdjson::ondemand::object object_value = value.get_object();

        auto sub_serdes = data_serde->get_nested_serdes();
        auto* map_column_ptr = assert_cast<ColumnMap*>(data_column_ptr);

        size_t field_count = 0;
        for (simdjson::ondemand::field member_value : object_value) {
            // Helper to write the map *key* (always a json string token) into the
            // keys column, handling a nullable keys column inline.
            auto f = [](std::string_view key_view, const DataTypePtr& type_desc,
                        IColumn* column_ptr, DataTypeSerDeSPtr serde,
                        DataTypeSerDe::FormatOptions serde_options, bool* valid) {
                auto* data_column_ptr = column_ptr;
                auto data_serde = serde;
                if (column_ptr->is_nullable()) {
                    auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);

                    nullable_column->get_null_map_data().push_back(0);
                    data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
                    data_serde = serde->get_nested_serdes()[0];
                }
                Slice slice(key_view.data(), key_view.length());

                RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                           serde_options));
                return Status::OK();
            };

            RETURN_IF_ERROR(f(member_value.unescaped_key(),
                              assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
                                      ->get_key_type(),
                              map_column_ptr->get_keys_ptr()->assume_mutable()->get_ptr().get(),
                              sub_serdes[0], _serde_options, valid));

            // The map *value* is written by the generic recursive path.
            simdjson::ondemand::value field_value = member_value.value();
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    field_value,
                    assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
                            ->get_value_type(),
                    map_column_ptr->get_values_ptr()->assume_mutable()->get_ptr().get(),
                    column_name + ".value", sub_serdes[1], valid));
            field_count++;
        }

        // Close this map cell: extend the offsets column by the entry count.
        auto& offsets = map_column_ptr->get_offsets();
        offsets.emplace_back(offsets.back() + field_count);

    } else if (primitive_type == TYPE_ARRAY) {
        if (value.type() != simdjson::ondemand::json_type::array) [[unlikely]] {
            return Status::DataQualityError("Json value isn't array, but the column `{}` is array.",
                                            column_name);
        }

        simdjson::ondemand::array array_value = value.get_array();

        auto sub_serdes = data_serde->get_nested_serdes();
        auto* array_column_ptr = assert_cast<ColumnArray*>(data_column_ptr);

        int field_count = 0;
        for (simdjson::ondemand::value sub_value : array_value) {
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    sub_value,
                    assert_cast<const DataTypeArray*>(remove_nullable(type_desc).get())
                            ->get_nested_type(),
                    array_column_ptr->get_data().get_ptr().get(), column_name + ".element",
                    sub_serdes[0], valid));
            field_count++;
        }
        // Close this array cell: extend the offsets column by the element count.
        auto& offsets = array_column_ptr->get_offsets();
        offsets.emplace_back(offsets.back() + field_count);

    } else {
        return Status::InternalError("Not support load to complex column.");
    }
    // Finally update the null map of the nullable wrapper so its size stays
    // consistent with the data column (null values returned early above).
    if (nullable_column && value.type() != simdjson::ondemand::json_type::null) {
        nullable_column->get_null_map_data().push_back(0);
    }
    *valid = true;
    return Status::OK();
}
1265 | | |
1266 | | Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::string error_msg, |
1267 | 0 | std::string col_name, bool* valid) { |
1268 | 0 | std::string err_msg; |
1269 | 0 | if (!col_name.empty()) { |
1270 | 0 | fmt::memory_buffer error_buf; |
1271 | 0 | fmt::format_to(error_buf, error_msg, col_name, _jsonpaths); |
1272 | 0 | err_msg = fmt::to_string(error_buf); |
1273 | 0 | } else { |
1274 | 0 | err_msg = error_msg; |
1275 | 0 | } |
1276 | |
|
1277 | 0 | _counter->num_rows_filtered++; |
1278 | 0 | if (valid != nullptr) { |
1279 | | // current row is invalid |
1280 | 0 | *valid = false; |
1281 | 0 | } |
1282 | |
|
1283 | 0 | RETURN_IF_ERROR(_state->append_error_msg_to_file( |
1284 | 0 | [&]() -> std::string { |
1285 | 0 | if (!obj) { |
1286 | 0 | return ""; |
1287 | 0 | } |
1288 | 0 | std::string_view str_view; |
1289 | 0 | (void)!obj->raw_json().get(str_view); |
1290 | 0 | return std::string(str_view.data(), str_view.size()); |
1291 | 0 | }, |
1292 | 0 | [&]() -> std::string { return err_msg; })); |
1293 | 0 | return Status::OK(); |
1294 | 0 | } |
1295 | | |
// Reads the next json buffer and starts a simdjson on-demand iteration on it.
// On return: *size is the payload length (BOM stripped), *eof is set when the
// input is exhausted, and *error carries the simdjson iterate() result — the
// caller (_get_json_value) turns a non-SUCCESS code into a data-quality error.
// NOTE: is_empty_row is accepted but not written here; presumably kept for
// signature symmetry with the callers — confirm before removing.
Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                           simdjson::error_code* error) {
    SCOPED_TIMER(_read_timer);
    // step1: read one message/line, either through the line reader or directly
    // from the file/pipe (_json_str points at the bytes in both cases).
    if (_line_reader != nullptr) {
        RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
    } else {
        size_t length = 0;
        RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &length));
        _json_str = _json_str_ptr.get();
        *size = length;
        if (length == 0) {
            *eof = true;
        }
    }
    if (*eof) {
        return Status::OK();
    }

    // step2: init json parser iterate.
    if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
        // For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
        // Hence, a re-allocation is needed if the space is not enough.
        _simdjson_ondemand_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
        _simdjson_ondemand_unscape_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
        _padded_size = *size + simdjson::SIMDJSON_PADDING;
    }
    // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
    if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
        static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') {
        // skip the first three BOM bytes before copying into the padded buffer
        _json_str += 3;
        *size -= 3;
    }
    // Copy into the padded buffer (simdjson may read up to _padded_size bytes).
    memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
    _original_doc_size = *size;
    *error = _ondemand_json_parser
                     ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
                               _padded_size)
                     .get(_original_json_doc);
    return Status::OK();
}
1338 | | |
1339 | 0 | Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row) { |
1340 | 0 | if (size == 0 || eof) { |
1341 | 0 | *is_empty_row = true; |
1342 | 0 | return Status::OK(); |
1343 | 0 | } |
1344 | | |
1345 | 0 | if (!_parsed_jsonpaths.empty() && _strip_outer_array) { |
1346 | 0 | _total_rows = _json_value.count_elements().value(); |
1347 | 0 | _next_row = 0; |
1348 | |
|
1349 | 0 | if (_total_rows == 0) { |
1350 | | // meet an empty json array. |
1351 | 0 | *is_empty_row = true; |
1352 | 0 | } |
1353 | 0 | } |
1354 | 0 | return Status::OK(); |
1355 | 0 | } |
1356 | | |
// Validates the parse result from _simdjson_parse_json and resolves the
// document into _json_value:
//   - any simdjson error (iterate() or type()) is a data-quality error,
//   - only a top-level object or array is accepted,
//   - an object with a configured json root has the root extracted here
//     (an array root is handled per-element by the flat-array path),
//   - top-level array-ness must agree with the strip_outer_array setting.
// Data-quality failures report the offending doc; when enough rows have been
// filtered (*_scanner_eof), *eof is set and OK is returned to stop the scan.
Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
                                      bool* is_empty_row) {
    SCOPED_TIMER(_read_timer);
    // Shared error path: count + report the bad document, then either stop the
    // scan (see Case A) or surface a DataQualityError to the caller.
    auto return_quality_error = [&](fmt::memory_buffer& error_msg,
                                    const std::string& doc_info) -> Status {
        _counter->num_rows_filtered++;
        RETURN_IF_ERROR(_state->append_error_msg_to_file(
                [&]() -> std::string { return doc_info; },
                [&]() -> std::string { return fmt::to_string(error_msg); }));
        if (*_scanner_eof) {
            // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
            // we meet enough invalid rows and the scanner should be stopped.
            // So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
            *eof = true;
            return Status::OK();
        }
        return Status::DataQualityError(fmt::to_string(error_msg));
    };
    if (*error != simdjson::error_code::SUCCESS) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
                       *error, simdjson::error_message(*error));
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    auto type_res = _original_json_doc.type();
    if (type_res.error() != simdjson::error_code::SUCCESS) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
                       type_res.error(), simdjson::error_message(type_res.error()));
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    simdjson::ondemand::json_type type = type_res.value();
    if (type != simdjson::ondemand::json_type::object &&
        type != simdjson::ondemand::json_type::array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Not an json object or json array");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) {
        try {
            // set json root
            // if it is an array at top level, then we should iterate the entire array in
            // ::_simdjson_handle_flat_array_complex_json
            simdjson::ondemand::object object = _original_json_doc;
            Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value);
            if (!st.ok()) {
                fmt::memory_buffer error_msg;
                fmt::format_to(error_msg, "{}", st.to_string());
                return return_quality_error(error_msg, std::string((char*)_json_str, *size));
            }
            _parsed_from_json_root = true;
        } catch (simdjson::simdjson_error& e) {
            fmt::memory_buffer error_msg;
            fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}",
                           e.what());
            return return_quality_error(error_msg, std::string((char*)_json_str, *size));
        }
    } else {
        // No json root (or a top-level array): use the whole document.
        _json_value = _original_json_doc;
    }

    // The top-level shape must agree with the strip_outer_array setting.
    if (_json_value.type() == simdjson::ondemand::json_type::array && !_strip_outer_array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "{}",
                       "JSON data is array-object, `strip_outer_array` must be TRUE.");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }

    if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "{}",
                       "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
    return Status::OK();
}
1434 | | |
1435 | | Status NewJsonReader::_simdjson_write_columns_by_jsonpath( |
1436 | | simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>& slot_descs, |
1437 | 0 | Block& block, bool* valid) { |
1438 | | // write by jsonpath |
1439 | 0 | bool has_valid_value = false; |
1440 | |
|
1441 | 0 | Defer clear_defer([this]() { _cached_string_values.clear(); }); |
1442 | |
|
1443 | 0 | for (size_t i = 0; i < slot_descs.size(); i++) { |
1444 | 0 | auto* slot_desc = slot_descs[i]; |
1445 | 0 | auto* column_ptr = block.get_by_position(i).column->assume_mutable().get(); |
1446 | 0 | simdjson::ondemand::value json_value; |
1447 | 0 | Status st; |
1448 | 0 | if (i < _parsed_jsonpaths.size()) { |
1449 | 0 | st = JsonFunctions::extract_from_object(*value, _parsed_jsonpaths[i], &json_value); |
1450 | 0 | if (!st.ok() && !st.is<NOT_FOUND>()) { |
1451 | 0 | return st; |
1452 | 0 | } |
1453 | 0 | } |
1454 | 0 | if (i < _parsed_jsonpaths.size() && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) { |
1455 | | // Indicate that the jsonpath is "$" or "$.", read the full root json object, insert the original doc directly |
1456 | 0 | ColumnNullable* nullable_column = nullptr; |
1457 | 0 | IColumn* target_column_ptr = nullptr; |
1458 | 0 | if (slot_desc->is_nullable()) { |
1459 | 0 | nullable_column = assert_cast<ColumnNullable*>(column_ptr); |
1460 | 0 | target_column_ptr = &nullable_column->get_nested_column(); |
1461 | 0 | nullable_column->get_null_map_data().push_back(0); |
1462 | 0 | } |
1463 | 0 | auto* column_string = assert_cast<ColumnString*>(target_column_ptr); |
1464 | 0 | column_string->insert_data(_simdjson_ondemand_padding_buffer.data(), |
1465 | 0 | _original_doc_size); |
1466 | 0 | has_valid_value = true; |
1467 | 0 | } else if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) { |
1468 | | // not match in jsondata, filling with default value |
1469 | 0 | RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid)); |
1470 | 0 | if (!(*valid)) { |
1471 | 0 | return Status::OK(); |
1472 | 0 | } |
1473 | 0 | } else { |
1474 | 0 | RETURN_IF_ERROR(_simdjson_write_data_to_column<true>(json_value, slot_desc->type(), |
1475 | 0 | column_ptr, slot_desc->col_name(), |
1476 | 0 | _serdes[i], valid)); |
1477 | 0 | if (!(*valid)) { |
1478 | 0 | return Status::OK(); |
1479 | 0 | } |
1480 | 0 | has_valid_value = true; |
1481 | 0 | } |
1482 | 0 | } |
1483 | 0 | if (!has_valid_value) { |
1484 | | // there is no valid value in json line but has filled with default value before |
1485 | | // so remove this line in block |
1486 | 0 | std::string col_names; |
1487 | 0 | for (int i = 0; i < block.columns(); ++i) { |
1488 | 0 | auto column = block.get_by_position(i).column->assume_mutable(); |
1489 | 0 | column->pop_back(1); |
1490 | 0 | } |
1491 | 0 | for (auto* slot_desc : slot_descs) { |
1492 | 0 | col_names.append(slot_desc->col_name() + ", "); |
1493 | 0 | } |
1494 | 0 | RETURN_IF_ERROR(_append_error_msg(value, |
1495 | 0 | "There is no column matching jsonpaths in the json file, " |
1496 | 0 | "columns:[{}], please check columns " |
1497 | 0 | "and jsonpaths:" + |
1498 | 0 | _jsonpaths, |
1499 | 0 | col_names, valid)); |
1500 | 0 | return Status::OK(); |
1501 | 0 | } |
1502 | 0 | *valid = true; |
1503 | 0 | return Status::OK(); |
1504 | 0 | } |
1505 | | |
1506 | | Status NewJsonReader::_get_column_default_value( |
1507 | | const std::vector<SlotDescriptor*>& slot_descs, |
1508 | 0 | const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx) { |
1509 | 0 | for (auto* slot_desc : slot_descs) { |
1510 | 0 | auto it = col_default_value_ctx.find(slot_desc->col_name()); |
1511 | 0 | if (it != col_default_value_ctx.end() && it->second != nullptr) { |
1512 | 0 | const auto& ctx = it->second; |
1513 | | // NULL_LITERAL means no valid value of current column |
1514 | 0 | if (ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) { |
1515 | 0 | continue; |
1516 | 0 | } |
1517 | 0 | ColumnWithTypeAndName result; |
1518 | 0 | RETURN_IF_ERROR(ctx->execute_const_expr(result)); |
1519 | 0 | DCHECK(result.column->size() == 1); |
1520 | 0 | _col_default_value_map.emplace(slot_desc->col_name(), |
1521 | 0 | result.column->get_data_at(0).to_string()); |
1522 | 0 | } |
1523 | 0 | } |
1524 | 0 | return Status::OK(); |
1525 | 0 | } |
1526 | | |
1527 | | Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde, |
1528 | 0 | IColumn* column_ptr, bool* valid) { |
1529 | 0 | auto col_value = _col_default_value_map.find(slot_desc->col_name()); |
1530 | 0 | if (col_value == _col_default_value_map.end()) { |
1531 | 0 | if (slot_desc->is_nullable()) { |
1532 | 0 | auto* nullable_column = static_cast<ColumnNullable*>(column_ptr); |
1533 | 0 | nullable_column->insert_default(); |
1534 | 0 | } else { |
1535 | 0 | if (_is_load) { |
1536 | 0 | RETURN_IF_ERROR(_append_error_msg( |
1537 | 0 | nullptr, "The column `{}` is not nullable, but it's not found in jsondata.", |
1538 | 0 | slot_desc->col_name(), valid)); |
1539 | 0 | } else { |
1540 | 0 | return Status::DataQualityError( |
1541 | 0 | "The column `{}` is not nullable, but it's not found in jsondata.", |
1542 | 0 | slot_desc->col_name()); |
1543 | 0 | } |
1544 | 0 | } |
1545 | 0 | } else { |
1546 | 0 | const std::string& v_str = col_value->second; |
1547 | 0 | Slice column_default_value {v_str}; |
1548 | 0 | RETURN_IF_ERROR(serde->deserialize_one_cell_from_json(*column_ptr, column_default_value, |
1549 | 0 | _serde_options)); |
1550 | 0 | } |
1551 | 0 | *valid = true; |
1552 | 0 | return Status::OK(); |
1553 | 0 | } |
1554 | | |
1555 | 0 | void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) { |
1556 | 0 | auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>( |
1557 | 0 | block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()); |
1558 | 0 | auto* skip_bitmap_col_ptr = |
1559 | 0 | assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get()); |
1560 | 0 | DCHECK(skip_bitmap_nullable_col_ptr->size() == cur_row_count); |
1561 | | // should append an empty bitmap for every row wheather this line misses columns |
1562 | 0 | skip_bitmap_nullable_col_ptr->get_null_map_data().push_back(0); |
1563 | 0 | skip_bitmap_col_ptr->insert_default(); |
1564 | 0 | DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1); |
1565 | 0 | } |
1566 | | |
1567 | | void NewJsonReader::_set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, |
1568 | 0 | Block& block, size_t cur_row_count, bool* valid) { |
1569 | | // we record the missing column's column unique id in skip bitmap |
1570 | | // to indicate which columns need to do the alignment process |
1571 | 0 | auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>( |
1572 | 0 | block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()); |
1573 | 0 | auto* skip_bitmap_col_ptr = |
1574 | 0 | assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get()); |
1575 | 0 | DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1); |
1576 | 0 | auto& skip_bitmap = skip_bitmap_col_ptr->get_data().back(); |
1577 | 0 | skip_bitmap.add(slot_desc->col_unique_id()); |
1578 | 0 | } |
1579 | | |
1580 | 0 | void NewJsonReader::_collect_profile_before_close() { |
1581 | 0 | if (_line_reader != nullptr) { |
1582 | 0 | _line_reader->collect_profile_before_close(); |
1583 | 0 | } |
1584 | 0 | if (_file_reader != nullptr) { |
1585 | 0 | _file_reader->collect_profile_before_close(); |
1586 | 0 | } |
1587 | 0 | } |
1588 | | |
1589 | | #include "common/compile_check_end.h" |
1590 | | } // namespace doris |