be/src/format/json/new_json_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/json/new_json_reader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/Metrics_types.h> |
22 | | #include <gen_cpp/PlanNodes_types.h> |
23 | | #include <gen_cpp/Types_types.h> |
24 | | #include <glog/logging.h> |
25 | | #include <rapidjson/error/en.h> |
26 | | #include <rapidjson/reader.h> |
27 | | #include <rapidjson/stringbuffer.h> |
28 | | #include <rapidjson/writer.h> |
29 | | #include <simdjson/simdjson.h> // IWYU pragma: keep |
30 | | |
31 | | #include <algorithm> |
32 | | #include <cinttypes> |
33 | | #include <cstdio> |
34 | | #include <cstring> |
35 | | #include <map> |
36 | | #include <memory> |
37 | | #include <string_view> |
38 | | #include <utility> |
39 | | |
40 | | #include "common/compiler_util.h" // IWYU pragma: keep |
41 | | #include "common/config.h" |
42 | | #include "common/status.h" |
43 | | #include "core/assert_cast.h" |
44 | | #include "core/block/column_with_type_and_name.h" |
45 | | #include "core/column/column.h" |
46 | | #include "core/column/column_array.h" |
47 | | #include "core/column/column_map.h" |
48 | | #include "core/column/column_nullable.h" |
49 | | #include "core/column/column_string.h" |
50 | | #include "core/column/column_struct.h" |
51 | | #include "core/custom_allocator.h" |
52 | | #include "core/data_type/data_type_array.h" |
53 | | #include "core/data_type/data_type_factory.hpp" |
54 | | #include "core/data_type/data_type_map.h" |
55 | | #include "core/data_type/data_type_number.h" // IWYU pragma: keep |
56 | | #include "core/data_type/data_type_struct.h" |
57 | | #include "core/data_type/define_primitive_type.h" |
58 | | #include "exec/scan/scanner.h" |
59 | | #include "exprs/json_functions.h" |
60 | | #include "format/file_reader/new_plain_text_line_reader.h" |
61 | | #include "io/file_factory.h" |
62 | | #include "io/fs/buffered_reader.h" |
63 | | #include "io/fs/file_reader.h" |
64 | | #include "io/fs/stream_load_pipe.h" |
65 | | #include "io/fs/tracing_file_reader.h" |
66 | | #include "runtime/descriptors.h" |
67 | | #include "runtime/runtime_state.h" |
68 | | #include "util/slice.h" |
69 | | |
70 | | namespace doris::io { |
71 | | struct IOContext; |
72 | | enum class FileCachePolicy : uint8_t; |
73 | | } // namespace doris::io |
74 | | |
75 | | namespace doris { |
76 | | #include "common/compile_check_begin.h" |
77 | | using namespace ErrorCode; |
78 | | |
79 | | NewJsonReader::NewJsonReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
80 | | const TFileScanRangeParams& params, const TFileRangeDesc& range, |
81 | | const std::vector<SlotDescriptor*>& file_slot_descs, bool* scanner_eof, |
82 | | io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder) |
83 | 0 | : _vhandle_json_callback(nullptr), |
84 | 0 | _state(state), |
85 | 0 | _profile(profile), |
86 | 0 | _counter(counter), |
87 | 0 | _params(params), |
88 | 0 | _range(range), |
89 | 0 | _file_slot_descs(file_slot_descs), |
90 | 0 | _file_reader(nullptr), |
91 | 0 | _line_reader(nullptr), |
92 | 0 | _reader_eof(false), |
93 | 0 | _decompressor(nullptr), |
94 | 0 | _skip_first_line(false), |
95 | 0 | _next_row(0), |
96 | 0 | _total_rows(0), |
97 | 0 | _value_allocator(_value_buffer, sizeof(_value_buffer)), |
98 | 0 | _parse_allocator(_parse_buffer, sizeof(_parse_buffer)), |
99 | 0 | _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator), |
100 | 0 | _scanner_eof(scanner_eof), |
101 | 0 | _current_offset(0), |
102 | 0 | _io_ctx(io_ctx), |
103 | 0 | _io_ctx_holder(std::move(io_ctx_holder)) { |
104 | 0 | if (_io_ctx == nullptr && _io_ctx_holder) { |
105 | 0 | _io_ctx = _io_ctx_holder.get(); |
106 | 0 | } |
107 | 0 | _read_timer = ADD_TIMER(_profile, "ReadTime"); |
108 | 0 | if (_range.__isset.compress_type) { |
109 | | // for compatibility |
110 | 0 | _file_compress_type = _range.compress_type; |
111 | 0 | } else { |
112 | 0 | _file_compress_type = _params.compress_type; |
113 | 0 | } |
114 | 0 | _init_system_properties(); |
115 | 0 | _init_file_description(); |
116 | 0 | } |
117 | | |
118 | | NewJsonReader::NewJsonReader(RuntimeProfile* profile, const TFileScanRangeParams& params, |
119 | | const TFileRangeDesc& range, |
120 | | const std::vector<SlotDescriptor*>& file_slot_descs, |
121 | | io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder) |
122 | 0 | : _vhandle_json_callback(nullptr), |
123 | 0 | _state(nullptr), |
124 | 0 | _profile(profile), |
125 | 0 | _params(params), |
126 | 0 | _range(range), |
127 | 0 | _file_slot_descs(file_slot_descs), |
128 | 0 | _line_reader(nullptr), |
129 | 0 | _reader_eof(false), |
130 | 0 | _decompressor(nullptr), |
131 | 0 | _skip_first_line(false), |
132 | 0 | _next_row(0), |
133 | 0 | _total_rows(0), |
134 | 0 | _value_allocator(_value_buffer, sizeof(_value_buffer)), |
135 | 0 | _parse_allocator(_parse_buffer, sizeof(_parse_buffer)), |
136 | 0 | _origin_json_doc(&_value_allocator, sizeof(_parse_buffer), &_parse_allocator), |
137 | 0 | _io_ctx(io_ctx), |
138 | 0 | _io_ctx_holder(std::move(io_ctx_holder)) { |
139 | 0 | if (_io_ctx == nullptr && _io_ctx_holder) { |
140 | 0 | _io_ctx = _io_ctx_holder.get(); |
141 | 0 | } |
142 | 0 | if (_range.__isset.compress_type) { |
143 | | // for compatibility |
144 | 0 | _file_compress_type = _range.compress_type; |
145 | 0 | } else { |
146 | 0 | _file_compress_type = _params.compress_type; |
147 | 0 | } |
148 | 0 | _init_system_properties(); |
149 | 0 | _init_file_description(); |
150 | 0 | } |
151 | | |
152 | 0 | void NewJsonReader::_init_system_properties() { |
153 | 0 | if (_range.__isset.file_type) { |
154 | | // for compatibility |
155 | 0 | _system_properties.system_type = _range.file_type; |
156 | 0 | } else { |
157 | 0 | _system_properties.system_type = _params.file_type; |
158 | 0 | } |
159 | 0 | _system_properties.properties = _params.properties; |
160 | 0 | _system_properties.hdfs_params = _params.hdfs_params; |
161 | 0 | if (_params.__isset.broker_addresses) { |
162 | 0 | _system_properties.broker_addresses.assign(_params.broker_addresses.begin(), |
163 | 0 | _params.broker_addresses.end()); |
164 | 0 | } |
165 | 0 | } |
166 | | |
167 | 0 | void NewJsonReader::_init_file_description() { |
168 | 0 | _file_description.path = _range.path; |
169 | 0 | _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1; |
170 | |
|
171 | 0 | if (_range.__isset.fs_name) { |
172 | 0 | _file_description.fs_name = _range.fs_name; |
173 | 0 | } |
174 | 0 | } |
175 | | |
176 | | Status NewJsonReader::init_reader( |
177 | | const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx, |
178 | 0 | bool is_load) { |
179 | 0 | _is_load = is_load; |
180 | | |
181 | | // generate _col_default_value_map |
182 | 0 | RETURN_IF_ERROR(_get_column_default_value(_file_slot_descs, col_default_value_ctx)); |
183 | | |
184 | | //use serde insert data to column. |
185 | 0 | for (auto* slot_desc : _file_slot_descs) { |
186 | 0 | _serdes.emplace_back(slot_desc->get_data_type_ptr()->get_serde()); |
187 | 0 | } |
188 | | |
189 | | // create decompressor. |
190 | | // _decompressor may be nullptr if this is not a compressed file |
191 | 0 | RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); |
192 | | |
193 | 0 | RETURN_IF_ERROR(_simdjson_init_reader()); |
194 | 0 | return Status::OK(); |
195 | 0 | } |
196 | | |
197 | 0 | Status NewJsonReader::get_next_block(Block* block, size_t* read_rows, bool* eof) { |
198 | 0 | if (_reader_eof) { |
199 | 0 | *eof = true; |
200 | 0 | return Status::OK(); |
201 | 0 | } |
202 | | |
203 | 0 | const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE); |
204 | |
|
205 | 0 | while (block->rows() < batch_size && !_reader_eof) { |
206 | 0 | if (UNLIKELY(_read_json_by_line && _skip_first_line)) { |
207 | 0 | size_t size = 0; |
208 | 0 | const uint8_t* line_ptr = nullptr; |
209 | 0 | RETURN_IF_ERROR(_line_reader->read_line(&line_ptr, &size, &_reader_eof, _io_ctx)); |
210 | 0 | _skip_first_line = false; |
211 | 0 | continue; |
212 | 0 | } |
213 | | |
214 | 0 | bool is_empty_row = false; |
215 | |
|
216 | 0 | RETURN_IF_ERROR( |
217 | 0 | _read_json_column(_state, *block, _file_slot_descs, &is_empty_row, &_reader_eof)); |
218 | 0 | if (is_empty_row) { |
219 | | // Read empty row, just continue |
220 | 0 | continue; |
221 | 0 | } |
222 | 0 | ++(*read_rows); |
223 | 0 | } |
224 | | |
225 | 0 | return Status::OK(); |
226 | 0 | } |
227 | | |
228 | | Status NewJsonReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
229 | 0 | std::unordered_set<std::string>* missing_cols) { |
230 | 0 | for (const auto& slot : _file_slot_descs) { |
231 | 0 | name_to_type->emplace(slot->col_name(), slot->type()); |
232 | 0 | } |
233 | 0 | return Status::OK(); |
234 | 0 | } |
235 | | |
236 | | // init decompressor, file reader and line reader for parsing schema |
237 | 0 | Status NewJsonReader::init_schema_reader() { |
238 | 0 | RETURN_IF_ERROR(_get_range_params()); |
239 | | // create decompressor. |
240 | | // _decompressor may be nullptr if this is not a compressed file |
241 | 0 | RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); |
242 | 0 | RETURN_IF_ERROR(_open_file_reader(true)); |
243 | 0 | if (_read_json_by_line) { |
244 | 0 | RETURN_IF_ERROR(_open_line_reader()); |
245 | 0 | } |
246 | | // generate _parsed_jsonpaths and _parsed_json_root |
247 | 0 | RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); |
248 | 0 | return Status::OK(); |
249 | 0 | } |
250 | | |
251 | | Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, |
252 | 0 | std::vector<DataTypePtr>* col_types) { |
253 | 0 | bool eof = false; |
254 | 0 | const uint8_t* json_str = nullptr; |
255 | 0 | DorisUniqueBufferPtr<uint8_t> json_str_ptr; |
256 | 0 | size_t size = 0; |
257 | 0 | if (_line_reader != nullptr) { |
258 | 0 | RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx)); |
259 | 0 | } else { |
260 | 0 | size_t read_size = 0; |
261 | 0 | RETURN_IF_ERROR(_read_one_message(&json_str_ptr, &read_size)); |
262 | 0 | json_str = json_str_ptr.get(); |
263 | 0 | size = read_size; |
264 | 0 | if (read_size == 0) { |
265 | 0 | eof = true; |
266 | 0 | } |
267 | 0 | } |
268 | | |
269 | 0 | if (size == 0 || eof) { |
270 | 0 | return Status::EndOfFile("Empty file."); |
271 | 0 | } |
272 | | |
273 | | // clear memory here. |
274 | 0 | _value_allocator.Clear(); |
275 | 0 | _parse_allocator.Clear(); |
276 | 0 | bool has_parse_error = false; |
277 | | |
278 | | // parse jsondata to JsonDoc |
279 | | // As the issue: https://github.com/Tencent/rapidjson/issues/1458 |
280 | | // Now, rapidjson only support uint64_t, So lagreint load cause bug. We use kParseNumbersAsStringsFlag. |
281 | 0 | if (_num_as_string) { |
282 | 0 | has_parse_error = |
283 | 0 | _origin_json_doc.Parse<rapidjson::kParseNumbersAsStringsFlag>((char*)json_str, size) |
284 | 0 | .HasParseError(); |
285 | 0 | } else { |
286 | 0 | has_parse_error = _origin_json_doc.Parse((char*)json_str, size).HasParseError(); |
287 | 0 | } |
288 | |
|
289 | 0 | if (has_parse_error) { |
290 | 0 | return Status::DataQualityError( |
291 | 0 | "Parse json data for JsonDoc failed. code: {}, error info: {}", |
292 | 0 | _origin_json_doc.GetParseError(), |
293 | 0 | rapidjson::GetParseError_En(_origin_json_doc.GetParseError())); |
294 | 0 | } |
295 | | |
296 | | // set json root |
297 | 0 | if (!_parsed_json_root.empty()) { |
298 | 0 | _json_doc = JsonFunctions::get_json_object_from_parsed_json( |
299 | 0 | _parsed_json_root, &_origin_json_doc, _origin_json_doc.GetAllocator()); |
300 | 0 | if (_json_doc == nullptr) { |
301 | 0 | return Status::DataQualityError("JSON Root not found."); |
302 | 0 | } |
303 | 0 | } else { |
304 | 0 | _json_doc = &_origin_json_doc; |
305 | 0 | } |
306 | | |
307 | 0 | if (_json_doc->IsArray() && !_strip_outer_array) { |
308 | 0 | return Status::DataQualityError( |
309 | 0 | "JSON data is array-object, `strip_outer_array` must be TRUE."); |
310 | 0 | } |
311 | 0 | if (!_json_doc->IsArray() && _strip_outer_array) { |
312 | 0 | return Status::DataQualityError( |
313 | 0 | "JSON data is not an array-object, `strip_outer_array` must be FALSE."); |
314 | 0 | } |
315 | | |
316 | 0 | rapidjson::Value* objectValue = nullptr; |
317 | 0 | if (_json_doc->IsArray()) { |
318 | 0 | if (_json_doc->Size() == 0) { |
319 | | // may be passing an empty json, such as "[]" |
320 | 0 | return Status::InternalError<false>("Empty first json line"); |
321 | 0 | } |
322 | 0 | objectValue = &(*_json_doc)[0]; |
323 | 0 | } else { |
324 | 0 | objectValue = _json_doc; |
325 | 0 | } |
326 | | |
327 | 0 | if (!objectValue->IsObject()) { |
328 | 0 | return Status::DataQualityError("JSON data is not an object. but: {}", |
329 | 0 | objectValue->GetType()); |
330 | 0 | } |
331 | | |
332 | | // use jsonpaths to col_names |
333 | 0 | if (!_parsed_jsonpaths.empty()) { |
334 | 0 | for (auto& _parsed_jsonpath : _parsed_jsonpaths) { |
335 | 0 | size_t len = _parsed_jsonpath.size(); |
336 | 0 | if (len == 0) { |
337 | 0 | return Status::InvalidArgument("It's invalid jsonpaths."); |
338 | 0 | } |
339 | 0 | std::string key = _parsed_jsonpath[len - 1].key; |
340 | 0 | col_names->emplace_back(key); |
341 | 0 | col_types->emplace_back( |
342 | 0 | DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true)); |
343 | 0 | } |
344 | 0 | return Status::OK(); |
345 | 0 | } |
346 | | |
347 | 0 | for (int i = 0; i < objectValue->MemberCount(); ++i) { |
348 | 0 | auto it = objectValue->MemberBegin() + i; |
349 | 0 | col_names->emplace_back(it->name.GetString()); |
350 | 0 | col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>())); |
351 | 0 | } |
352 | 0 | return Status::OK(); |
353 | 0 | } |
354 | | |
355 | 0 | Status NewJsonReader::_get_range_params() { |
356 | 0 | if (!_params.__isset.file_attributes) { |
357 | 0 | return Status::InternalError<false>("BE cat get file_attributes"); |
358 | 0 | } |
359 | | |
360 | | // get line_delimiter |
361 | 0 | if (_params.file_attributes.__isset.text_params && |
362 | 0 | _params.file_attributes.text_params.__isset.line_delimiter) { |
363 | 0 | _line_delimiter = _params.file_attributes.text_params.line_delimiter; |
364 | 0 | _line_delimiter_length = _line_delimiter.size(); |
365 | 0 | } |
366 | |
|
367 | 0 | if (_params.file_attributes.__isset.jsonpaths) { |
368 | 0 | _jsonpaths = _params.file_attributes.jsonpaths; |
369 | 0 | } |
370 | 0 | if (_params.file_attributes.__isset.json_root) { |
371 | 0 | _json_root = _params.file_attributes.json_root; |
372 | 0 | } |
373 | 0 | if (_params.file_attributes.__isset.read_json_by_line) { |
374 | 0 | _read_json_by_line = _params.file_attributes.read_json_by_line; |
375 | 0 | } |
376 | 0 | if (_params.file_attributes.__isset.strip_outer_array) { |
377 | 0 | _strip_outer_array = _params.file_attributes.strip_outer_array; |
378 | 0 | } |
379 | 0 | if (_params.file_attributes.__isset.num_as_string) { |
380 | 0 | _num_as_string = _params.file_attributes.num_as_string; |
381 | 0 | } |
382 | 0 | if (_params.file_attributes.__isset.fuzzy_parse) { |
383 | 0 | _fuzzy_parse = _params.file_attributes.fuzzy_parse; |
384 | 0 | } |
385 | 0 | if (_range.table_format_params.table_format_type == "hive") { |
386 | 0 | _is_hive_table = true; |
387 | 0 | } |
388 | 0 | if (_params.file_attributes.__isset.openx_json_ignore_malformed) { |
389 | 0 | _openx_json_ignore_malformed = _params.file_attributes.openx_json_ignore_malformed; |
390 | 0 | } |
391 | 0 | return Status::OK(); |
392 | 0 | } |
393 | | |
394 | 0 | static Status ignore_malformed_json_append_null(Block& block) { |
395 | 0 | for (auto& column : block.get_columns()) { |
396 | 0 | if (!column->is_nullable()) [[unlikely]] { |
397 | 0 | return Status::DataQualityError("malformed json, but the column `{}` is not nullable.", |
398 | 0 | column->get_name()); |
399 | 0 | } |
400 | 0 | static_cast<ColumnNullable*>(column->assume_mutable().get())->insert_default(); |
401 | 0 | } |
402 | 0 | return Status::OK(); |
403 | 0 | } |
404 | | |
405 | 0 | Status NewJsonReader::_open_file_reader(bool need_schema) { |
406 | 0 | int64_t start_offset = _range.start_offset; |
407 | 0 | if (start_offset != 0) { |
408 | 0 | start_offset -= 1; |
409 | 0 | } |
410 | |
|
411 | 0 | _current_offset = start_offset; |
412 | |
|
413 | 0 | if (_params.file_type == TFileType::FILE_STREAM) { |
414 | | // Due to http_stream needs to pre read a portion of the data to parse column information, so it is set to true here |
415 | 0 | RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, |
416 | 0 | need_schema)); |
417 | 0 | } else { |
418 | 0 | _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; |
419 | 0 | io::FileReaderOptions reader_options = |
420 | 0 | FileFactory::get_reader_options(_state, _file_description); |
421 | 0 | io::FileReaderSPtr file_reader; |
422 | 0 | if (_io_ctx_holder) { |
423 | 0 | file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( |
424 | 0 | _profile, _system_properties, _file_description, reader_options, |
425 | 0 | io::DelegateReader::AccessMode::SEQUENTIAL, |
426 | 0 | std::static_pointer_cast<const io::IOContext>(_io_ctx_holder), |
427 | 0 | io::PrefetchRange(_range.start_offset, _range.size))); |
428 | 0 | } else { |
429 | 0 | file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( |
430 | 0 | _profile, _system_properties, _file_description, reader_options, |
431 | 0 | io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, |
432 | 0 | io::PrefetchRange(_range.start_offset, _range.size))); |
433 | 0 | } |
434 | 0 | _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader), |
435 | 0 | _io_ctx->file_reader_stats) |
436 | 0 | : file_reader; |
437 | 0 | } |
438 | 0 | return Status::OK(); |
439 | 0 | } |
440 | | |
441 | 0 | Status NewJsonReader::_open_line_reader() { |
442 | 0 | int64_t size = _range.size; |
443 | 0 | if (_range.start_offset != 0) { |
444 | | // When we fetch range doesn't start from 0, size will += 1. |
445 | 0 | size += 1; |
446 | 0 | _skip_first_line = true; |
447 | 0 | } else { |
448 | 0 | _skip_first_line = false; |
449 | 0 | } |
450 | 0 | _line_reader = NewPlainTextLineReader::create_unique( |
451 | 0 | _profile, _file_reader, _decompressor.get(), |
452 | 0 | std::make_shared<PlainTextLineReaderCtx>(_line_delimiter, _line_delimiter_length, |
453 | 0 | false), |
454 | 0 | size, _current_offset); |
455 | 0 | return Status::OK(); |
456 | 0 | } |
457 | | |
458 | 0 | Status NewJsonReader::_parse_jsonpath_and_json_root() { |
459 | | // parse jsonpaths |
460 | 0 | if (!_jsonpaths.empty()) { |
461 | 0 | rapidjson::Document jsonpaths_doc; |
462 | 0 | if (!jsonpaths_doc.Parse(_jsonpaths.c_str(), _jsonpaths.length()).HasParseError()) { |
463 | 0 | if (!jsonpaths_doc.IsArray()) { |
464 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
465 | 0 | } |
466 | 0 | for (int i = 0; i < jsonpaths_doc.Size(); i++) { |
467 | 0 | const rapidjson::Value& path = jsonpaths_doc[i]; |
468 | 0 | if (!path.IsString()) { |
469 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
470 | 0 | } |
471 | 0 | std::string json_path = path.GetString(); |
472 | | // $ -> $. in json_path |
473 | 0 | if (UNLIKELY(json_path.size() == 1 && json_path[0] == '$')) { |
474 | 0 | json_path.insert(1, "."); |
475 | 0 | } |
476 | 0 | std::vector<JsonPath> parsed_paths; |
477 | 0 | JsonFunctions::parse_json_paths(json_path, &parsed_paths); |
478 | 0 | _parsed_jsonpaths.push_back(std::move(parsed_paths)); |
479 | 0 | } |
480 | |
|
481 | 0 | } else { |
482 | 0 | return Status::InvalidJsonPath("Invalid json path: {}", _jsonpaths); |
483 | 0 | } |
484 | 0 | } |
485 | | |
486 | | // parse jsonroot |
487 | 0 | if (!_json_root.empty()) { |
488 | 0 | std::string json_root = _json_root; |
489 | | // $ -> $. in json_root |
490 | 0 | if (json_root.size() == 1 && json_root[0] == '$') { |
491 | 0 | json_root.insert(1, "."); |
492 | 0 | } |
493 | 0 | JsonFunctions::parse_json_paths(json_root, &_parsed_json_root); |
494 | 0 | } |
495 | 0 | return Status::OK(); |
496 | 0 | } |
497 | | |
498 | | Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block, |
499 | | const std::vector<SlotDescriptor*>& slot_descs, |
500 | 0 | bool* is_empty_row, bool* eof) { |
501 | 0 | return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof); |
502 | 0 | } |
503 | | |
504 | | Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, |
505 | 0 | size_t* read_size) { |
506 | 0 | switch (_params.file_type) { |
507 | 0 | case TFileType::FILE_LOCAL: |
508 | 0 | [[fallthrough]]; |
509 | 0 | case TFileType::FILE_HDFS: |
510 | 0 | case TFileType::FILE_HTTP: |
511 | 0 | [[fallthrough]]; |
512 | 0 | case TFileType::FILE_S3: { |
513 | 0 | size_t file_size = _file_reader->size(); |
514 | 0 | *file_buf = make_unique_buffer<uint8_t>(file_size); |
515 | 0 | Slice result(file_buf->get(), file_size); |
516 | 0 | RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx)); |
517 | 0 | _current_offset += *read_size; |
518 | 0 | break; |
519 | 0 | } |
520 | 0 | case TFileType::FILE_STREAM: { |
521 | 0 | RETURN_IF_ERROR(_read_one_message_from_pipe(file_buf, read_size)); |
522 | 0 | break; |
523 | 0 | } |
524 | 0 | default: { |
525 | 0 | return Status::NotSupported<false>("no supported file reader type: {}", _params.file_type); |
526 | 0 | } |
527 | 0 | } |
528 | 0 | return Status::OK(); |
529 | 0 | } |
530 | | |
531 | | Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, |
532 | 0 | size_t* read_size) { |
533 | 0 | auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()); |
534 | | |
535 | | // first read: read from the pipe once. |
536 | 0 | RETURN_IF_ERROR(stream_load_pipe->read_one_message(file_buf, read_size)); |
537 | | |
538 | | // When the file is not chunked, the entire file has already been read. |
539 | 0 | if (!stream_load_pipe->is_chunked_transfer()) { |
540 | 0 | return Status::OK(); |
541 | 0 | } |
542 | | |
543 | 0 | std::vector<uint8_t> buf; |
544 | 0 | uint64_t cur_size = 0; |
545 | | |
546 | | // second read: continuously read data from the pipe until all data is read. |
547 | 0 | DorisUniqueBufferPtr<uint8_t> read_buf; |
548 | 0 | size_t read_buf_size = 0; |
549 | 0 | while (true) { |
550 | 0 | RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size)); |
551 | 0 | if (read_buf_size == 0) { |
552 | 0 | break; |
553 | 0 | } else { |
554 | 0 | buf.insert(buf.end(), read_buf.get(), read_buf.get() + read_buf_size); |
555 | 0 | cur_size += read_buf_size; |
556 | 0 | read_buf_size = 0; |
557 | 0 | read_buf.reset(); |
558 | 0 | } |
559 | 0 | } |
560 | | |
561 | | // No data is available during the second read. |
562 | 0 | if (cur_size == 0) { |
563 | 0 | return Status::OK(); |
564 | 0 | } |
565 | | |
566 | 0 | DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size); |
567 | | |
568 | | // copy the data during the first read |
569 | 0 | memcpy(total_buf.get(), file_buf->get(), *read_size); |
570 | | |
571 | | // copy the data during the second read |
572 | 0 | memcpy(total_buf.get() + *read_size, buf.data(), cur_size); |
573 | 0 | *file_buf = std::move(total_buf); |
574 | 0 | *read_size += cur_size; |
575 | 0 | return Status::OK(); |
576 | 0 | } |
577 | | |
578 | | // ---------SIMDJSON---------- |
579 | | // simdjson, replace none simdjson function if it is ready |
580 | 0 | Status NewJsonReader::_simdjson_init_reader() { |
581 | 0 | RETURN_IF_ERROR(_get_range_params()); |
582 | | |
583 | 0 | RETURN_IF_ERROR(_open_file_reader(false)); |
584 | 0 | if (LIKELY(_read_json_by_line)) { |
585 | 0 | RETURN_IF_ERROR(_open_line_reader()); |
586 | 0 | } |
587 | | |
588 | | // generate _parsed_jsonpaths and _parsed_json_root |
589 | 0 | RETURN_IF_ERROR(_parse_jsonpath_and_json_root()); |
590 | | |
591 | | //improve performance |
592 | 0 | if (_parsed_jsonpaths.empty()) { // input is a simple json-string |
593 | 0 | _vhandle_json_callback = &NewJsonReader::_simdjson_handle_simple_json; |
594 | 0 | } else { // input is a complex json-string and a json-path |
595 | 0 | if (_strip_outer_array) { |
596 | 0 | _vhandle_json_callback = &NewJsonReader::_simdjson_handle_flat_array_complex_json; |
597 | 0 | } else { |
598 | 0 | _vhandle_json_callback = &NewJsonReader::_simdjson_handle_nested_complex_json; |
599 | 0 | } |
600 | 0 | } |
601 | 0 | _ondemand_json_parser = std::make_unique<simdjson::ondemand::parser>(); |
602 | 0 | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
603 | 0 | _slot_desc_index[StringRef {_file_slot_descs[i]->col_name()}] = i; |
604 | 0 | if (_file_slot_descs[i]->is_skip_bitmap_col()) { |
605 | 0 | skip_bitmap_col_idx = i; |
606 | 0 | } |
607 | 0 | } |
608 | 0 | _simdjson_ondemand_padding_buffer.resize(_padded_size); |
609 | 0 | _simdjson_ondemand_unscape_padding_buffer.resize(_padded_size); |
610 | 0 | return Status::OK(); |
611 | 0 | } |
612 | | |
613 | | Status NewJsonReader::_handle_simdjson_error(simdjson::simdjson_error& error, Block& block, |
614 | 0 | size_t num_rows, bool* eof) { |
615 | 0 | fmt::memory_buffer error_msg; |
616 | 0 | fmt::format_to(error_msg, "Parse json data failed. code: {}, error info: {}", error.error(), |
617 | 0 | error.what()); |
618 | 0 | _counter->num_rows_filtered++; |
619 | | // Before continuing to process other rows, we need to first clean the fail parsed row. |
620 | 0 | for (int i = 0; i < block.columns(); ++i) { |
621 | 0 | auto column = block.get_by_position(i).column->assume_mutable(); |
622 | 0 | if (column->size() > num_rows) { |
623 | 0 | column->pop_back(column->size() - num_rows); |
624 | 0 | } |
625 | 0 | } |
626 | |
|
627 | 0 | RETURN_IF_ERROR(_state->append_error_msg_to_file( |
628 | 0 | [&]() -> std::string { |
629 | 0 | return std::string(_simdjson_ondemand_padding_buffer.data(), _original_doc_size); |
630 | 0 | }, |
631 | 0 | [&]() -> std::string { return fmt::to_string(error_msg); })); |
632 | 0 | return Status::OK(); |
633 | 0 | } |
634 | | |
635 | | Status NewJsonReader::_simdjson_handle_simple_json(RuntimeState* /*state*/, Block& block, |
636 | | const std::vector<SlotDescriptor*>& slot_descs, |
637 | 0 | bool* is_empty_row, bool* eof) { |
638 | | // simple json |
639 | 0 | size_t size = 0; |
640 | 0 | simdjson::error_code error; |
641 | 0 | size_t num_rows = block.rows(); |
642 | 0 | try { |
643 | | // step1: get and parse buf to get json doc |
644 | 0 | RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); |
645 | 0 | if (size == 0 || *eof) { |
646 | 0 | *is_empty_row = true; |
647 | 0 | return Status::OK(); |
648 | 0 | } |
649 | | |
650 | | // step2: get json value by json doc |
651 | 0 | Status st = _get_json_value(&size, eof, &error, is_empty_row); |
652 | 0 | if (st.is<DATA_QUALITY_ERROR>()) { |
653 | 0 | if (_is_load) { |
654 | 0 | return Status::OK(); |
655 | 0 | } else if (_openx_json_ignore_malformed) { |
656 | 0 | RETURN_IF_ERROR(ignore_malformed_json_append_null(block)); |
657 | 0 | return Status::OK(); |
658 | 0 | } |
659 | 0 | } |
660 | | |
661 | 0 | RETURN_IF_ERROR(st); |
662 | 0 | if (*is_empty_row || *eof) { |
663 | 0 | return Status::OK(); |
664 | 0 | } |
665 | | |
666 | | // step 3: write columns by json value |
667 | 0 | RETURN_IF_ERROR( |
668 | 0 | _simdjson_handle_simple_json_write_columns(block, slot_descs, is_empty_row, eof)); |
669 | 0 | } catch (simdjson::simdjson_error& e) { |
670 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
671 | 0 | if (*_scanner_eof) { |
672 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
673 | | // unqualified data and decided to stop the scan. |
674 | 0 | *is_empty_row = true; |
675 | 0 | return Status::OK(); |
676 | 0 | } |
677 | 0 | } |
678 | | |
679 | 0 | return Status::OK(); |
680 | 0 | } |
681 | | |
682 | | Status NewJsonReader::_simdjson_handle_simple_json_write_columns( |
683 | | Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
684 | 0 | bool* eof) { |
685 | 0 | simdjson::ondemand::object objectValue; |
686 | 0 | size_t num_rows = block.rows(); |
687 | 0 | bool valid = false; |
688 | 0 | try { |
689 | 0 | if (_json_value.type() == simdjson::ondemand::json_type::array) { |
690 | 0 | _array = _json_value.get_array(); |
691 | 0 | if (_array.count_elements() == 0) { |
692 | | // may be passing an empty json, such as "[]" |
693 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, "Empty json line", "", nullptr)); |
694 | 0 | if (*_scanner_eof) { |
695 | 0 | *is_empty_row = true; |
696 | 0 | return Status::OK(); |
697 | 0 | } |
698 | 0 | return Status::OK(); |
699 | 0 | } |
700 | | |
701 | 0 | _array_iter = _array.begin(); |
702 | 0 | while (true) { |
703 | 0 | objectValue = *_array_iter; |
704 | 0 | RETURN_IF_ERROR( |
705 | 0 | _simdjson_set_column_value(&objectValue, block, slot_descs, &valid)); |
706 | 0 | if (!valid) { |
707 | 0 | if (*_scanner_eof) { |
708 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
709 | | // unqualified data and decided to stop the scan. |
710 | 0 | *is_empty_row = true; |
711 | 0 | return Status::OK(); |
712 | 0 | } |
713 | 0 | } |
714 | 0 | ++_array_iter; |
715 | 0 | if (_array_iter == _array.end()) { |
716 | | // Hint to read next json doc |
717 | 0 | break; |
718 | 0 | } |
719 | 0 | } |
720 | 0 | } else { |
721 | 0 | objectValue = _json_value; |
722 | 0 | RETURN_IF_ERROR(_simdjson_set_column_value(&objectValue, block, slot_descs, &valid)); |
723 | 0 | if (!valid) { |
724 | 0 | if (*_scanner_eof) { |
725 | 0 | *is_empty_row = true; |
726 | 0 | return Status::OK(); |
727 | 0 | } |
728 | 0 | } |
729 | 0 | *is_empty_row = false; |
730 | 0 | } |
731 | 0 | } catch (simdjson::simdjson_error& e) { |
732 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
733 | 0 | if (!valid) { |
734 | 0 | if (*_scanner_eof) { |
735 | 0 | *is_empty_row = true; |
736 | 0 | return Status::OK(); |
737 | 0 | } |
738 | 0 | } |
739 | 0 | } |
740 | 0 | return Status::OK(); |
741 | 0 | } |
742 | | |
743 | | Status NewJsonReader::_simdjson_handle_flat_array_complex_json( |
744 | | RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, |
745 | 0 | bool* is_empty_row, bool* eof) { |
746 | | // array complex json |
747 | 0 | size_t size = 0; |
748 | 0 | simdjson::error_code error; |
749 | 0 | size_t num_rows = block.rows(); |
750 | 0 | try { |
751 | | // step1: get and parse buf to get json doc |
752 | 0 | RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); |
753 | 0 | if (size == 0 || *eof) { |
754 | 0 | *is_empty_row = true; |
755 | 0 | return Status::OK(); |
756 | 0 | } |
757 | | |
758 | | // step2: get json value by json doc |
759 | 0 | Status st = _get_json_value(&size, eof, &error, is_empty_row); |
760 | 0 | if (st.is<DATA_QUALITY_ERROR>()) { |
761 | 0 | return Status::OK(); |
762 | 0 | } |
763 | 0 | RETURN_IF_ERROR(st); |
764 | 0 | if (*is_empty_row) { |
765 | 0 | return Status::OK(); |
766 | 0 | } |
767 | | |
768 | | // step 3: write columns by json value |
769 | 0 | RETURN_IF_ERROR(_simdjson_handle_flat_array_complex_json_write_columns(block, slot_descs, |
770 | 0 | is_empty_row, eof)); |
771 | 0 | } catch (simdjson::simdjson_error& e) { |
772 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
773 | 0 | if (*_scanner_eof) { |
774 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
775 | | // unqualified data and decided to stop the scan. |
776 | 0 | *is_empty_row = true; |
777 | 0 | return Status::OK(); |
778 | 0 | } |
779 | 0 | } |
780 | | |
781 | 0 | return Status::OK(); |
782 | 0 | } |
783 | | |
784 | | Status NewJsonReader::_simdjson_handle_flat_array_complex_json_write_columns( |
785 | | Block& block, const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, |
786 | 0 | bool* eof) { |
787 | | // Advance one row in array list, if it is the endpoint, stop advance and break the loop |
788 | 0 | #define ADVANCE_ROW() \ |
789 | 0 | ++_array_iter; \ |
790 | 0 | if (_array_iter == _array.end()) { \ |
791 | 0 | break; \ |
792 | 0 | } |
793 | |
|
794 | 0 | simdjson::ondemand::object cur; |
795 | 0 | size_t num_rows = block.rows(); |
796 | 0 | try { |
797 | 0 | bool valid = true; |
798 | 0 | _array = _json_value.get_array(); |
799 | 0 | _array_iter = _array.begin(); |
800 | |
|
801 | 0 | while (true) { |
802 | 0 | cur = (*_array_iter).get_object(); |
803 | | // extract root |
804 | 0 | if (!_parsed_from_json_root && !_parsed_json_root.empty()) { |
805 | 0 | simdjson::ondemand::value val; |
806 | 0 | Status st = JsonFunctions::extract_from_object(cur, _parsed_json_root, &val); |
807 | 0 | if (UNLIKELY(!st.ok())) { |
808 | 0 | if (st.is<NOT_FOUND>()) { |
809 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr)); |
810 | 0 | ADVANCE_ROW(); |
811 | 0 | continue; |
812 | 0 | } |
813 | 0 | return st; |
814 | 0 | } |
815 | 0 | if (val.type() != simdjson::ondemand::json_type::object) { |
816 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr)); |
817 | 0 | ADVANCE_ROW(); |
818 | 0 | continue; |
819 | 0 | } |
820 | 0 | cur = val.get_object(); |
821 | 0 | } |
822 | 0 | RETURN_IF_ERROR(_simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid)); |
823 | 0 | ADVANCE_ROW(); |
824 | 0 | if (!valid) { |
825 | 0 | continue; // process next line |
826 | 0 | } |
827 | 0 | *is_empty_row = false; |
828 | 0 | } |
829 | 0 | } catch (simdjson::simdjson_error& e) { |
830 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
831 | 0 | if (*_scanner_eof) { |
832 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
833 | | // unqualified data and decided to stop the scan. |
834 | 0 | *is_empty_row = true; |
835 | 0 | return Status::OK(); |
836 | 0 | } |
837 | 0 | } |
838 | | |
839 | 0 | return Status::OK(); |
840 | 0 | } |
841 | | |
842 | | Status NewJsonReader::_simdjson_handle_nested_complex_json( |
843 | | RuntimeState* /*state*/, Block& block, const std::vector<SlotDescriptor*>& slot_descs, |
844 | 0 | bool* is_empty_row, bool* eof) { |
845 | | // nested complex json |
846 | 0 | while (true) { |
847 | 0 | size_t num_rows = block.rows(); |
848 | 0 | simdjson::ondemand::object cur; |
849 | 0 | size_t size = 0; |
850 | 0 | simdjson::error_code error; |
851 | 0 | try { |
852 | 0 | RETURN_IF_ERROR(_simdjson_parse_json(&size, is_empty_row, eof, &error)); |
853 | 0 | if (size == 0 || *eof) { |
854 | 0 | *is_empty_row = true; |
855 | 0 | return Status::OK(); |
856 | 0 | } |
857 | 0 | Status st = _get_json_value(&size, eof, &error, is_empty_row); |
858 | 0 | if (st.is<DATA_QUALITY_ERROR>()) { |
859 | 0 | continue; // continue to read next |
860 | 0 | } |
861 | 0 | RETURN_IF_ERROR(st); |
862 | 0 | if (*is_empty_row) { |
863 | 0 | return Status::OK(); |
864 | 0 | } |
865 | 0 | *is_empty_row = false; |
866 | 0 | bool valid = true; |
867 | 0 | if (_json_value.type() != simdjson::ondemand::json_type::object) { |
868 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, "Not object item", "", nullptr)); |
869 | 0 | continue; |
870 | 0 | } |
871 | 0 | cur = _json_value.get_object(); |
872 | 0 | st = _simdjson_write_columns_by_jsonpath(&cur, slot_descs, block, &valid); |
873 | 0 | if (!st.ok()) { |
874 | 0 | RETURN_IF_ERROR(_append_error_msg(nullptr, st.to_string(), "", nullptr)); |
875 | | // Before continuing to process other rows, we need to first clean the fail parsed row. |
876 | 0 | for (int i = 0; i < block.columns(); ++i) { |
877 | 0 | auto column = block.get_by_position(i).column->assume_mutable(); |
878 | 0 | if (column->size() > num_rows) { |
879 | 0 | column->pop_back(column->size() - num_rows); |
880 | 0 | } |
881 | 0 | } |
882 | 0 | continue; |
883 | 0 | } |
884 | 0 | if (!valid) { |
885 | | // there is only one line in this case, so if it return false, just set is_empty_row true |
886 | | // so that the caller will continue reading next line. |
887 | 0 | *is_empty_row = true; |
888 | 0 | } |
889 | 0 | break; // read a valid row |
890 | 0 | } catch (simdjson::simdjson_error& e) { |
891 | 0 | RETURN_IF_ERROR(_handle_simdjson_error(e, block, num_rows, eof)); |
892 | 0 | if (*_scanner_eof) { |
893 | | // When _scanner_eof is true and valid is false, it means that we have encountered |
894 | | // unqualified data and decided to stop the scan. |
895 | 0 | *is_empty_row = true; |
896 | 0 | return Status::OK(); |
897 | 0 | } |
898 | 0 | continue; |
899 | 0 | } |
900 | 0 | } |
901 | 0 | return Status::OK(); |
902 | 0 | } |
903 | | |
904 | 0 | size_t NewJsonReader::_column_index(const StringRef& name, size_t key_index) { |
905 | | /// Optimization by caching the order of fields (which is almost always the same) |
906 | | /// and a quick check to match the next expected field, instead of searching the hash table. |
907 | 0 | if (_prev_positions.size() > key_index && name == _prev_positions[key_index]->first) { |
908 | 0 | return _prev_positions[key_index]->second; |
909 | 0 | } |
910 | 0 | auto it = _slot_desc_index.find(name); |
911 | 0 | if (it != _slot_desc_index.end()) { |
912 | 0 | if (key_index < _prev_positions.size()) { |
913 | 0 | _prev_positions[key_index] = it; |
914 | 0 | } |
915 | 0 | return it->second; |
916 | 0 | } |
917 | 0 | return size_t(-1); |
918 | 0 | } |
919 | | |
// Writes one JSON object (one row) into the block's columns by matching object
// keys against slot names (case-insensitive for Hive tables). Unmatched slots
// are then filled with defaults or configured column defaults, and the
// skip-bitmap column used for flexible partial update is maintained. On an
// unqualified row *valid is set to false and OK is returned so the caller can
// decide whether to continue.
// NOTE: `value` is a simdjson on-demand object and can only be traversed once;
// the code below is careful never to iterate it twice.
Status NewJsonReader::_simdjson_set_column_value(simdjson::ondemand::object* value, Block& block,
                                                const std::vector<SlotDescriptor*>& slot_descs,
                                                bool* valid) {
    // set
    _seen_columns.assign(block.columns(), false);
    size_t cur_row_count = block.rows();
    bool has_valid_value = false;
    // iterate through object, simdjson::ondemond will parsing on the fly
    size_t key_index = 0;

    for (auto field : *value) {
        std::string_view key = field.unescaped_key();
        StringRef name_ref(key.data(), key.size());
        std::string key_string;
        if (_is_hive_table) {
            // Hive column names are case-insensitive; normalize to lower case
            // before the slot lookup.
            key_string = name_ref.to_string();
            std::transform(key_string.begin(), key_string.end(), key_string.begin(), ::tolower);
            name_ref = StringRef(key_string);
        }
        const size_t column_index = _column_index(name_ref, key_index++);
        if (UNLIKELY(ssize_t(column_index) < 0)) {
            // This key is not exist in slot desc, just ignore
            continue;
        }
        if (column_index == skip_bitmap_col_idx) {
            // The skip-bitmap column is managed below, never read from input.
            continue;
        }
        if (_seen_columns[column_index]) {
            if (_is_hive_table) {
                //Since value can only be traversed once,
                // we can only insert the original value first, then delete it, and then reinsert the new value
                block.get_by_position(column_index).column->assume_mutable()->pop_back(1);
            } else {
                // Duplicate key outside Hive mode: keep the first occurrence.
                continue;
            }
        }
        simdjson::ondemand::value val = field.value();
        auto* column_ptr = block.get_by_position(column_index).column->assume_mutable().get();
        RETURN_IF_ERROR(_simdjson_write_data_to_column<false>(
                val, slot_descs[column_index]->type(), column_ptr,
                slot_descs[column_index]->col_name(), _serdes[column_index], valid));
        if (!(*valid)) {
            // Row rejected during deserialization; caller handles the rollback.
            return Status::OK();
        }
        _seen_columns[column_index] = true;
        has_valid_value = true;
    }

    if (!has_valid_value && _is_load) {
        // Not a single key matched a slot: report the row as filtered with the
        // full column list to help the user fix columns/jsonpaths.
        std::string col_names;
        for (auto* slot_desc : slot_descs) {
            col_names.append(slot_desc->col_name() + ", ");
        }
        RETURN_IF_ERROR(_append_error_msg(value,
                                          "There is no column matching jsonpaths in the json file, "
                                          "columns:[{}], please check columns "
                                          "and jsonpaths:" +
                                                  _jsonpaths,
                                          col_names, valid));
        return Status::OK();
    }

    if (_should_process_skip_bitmap_col()) {
        // Flexible partial update: reserve an (empty) skip-bitmap entry for this row.
        _append_empty_skip_bitmap_value(block, cur_row_count);
    }

    // fill missing slot
    int nullcount = 0;
    for (size_t i = 0; i < slot_descs.size(); ++i) {
        if (_seen_columns[i]) {
            continue;
        }
        if (i == skip_bitmap_col_idx) {
            continue;
        }

        auto* slot_desc = slot_descs[i];
        auto* column_ptr = block.get_by_position(i).column->assume_mutable().get();

        // Quick path to insert default value, instead of using default values in the value map.
        if (!_should_process_skip_bitmap_col() &&
            (_col_default_value_map.empty() ||
             _col_default_value_map.find(slot_desc->col_name()) == _col_default_value_map.end())) {
            column_ptr->insert_default();
            continue;
        }
        if (column_ptr->size() < cur_row_count + 1) {
            DCHECK(column_ptr->size() == cur_row_count);
            if (_should_process_skip_bitmap_col()) {
                // not found, skip this column in flexible partial update
                if (slot_desc->is_key() && !slot_desc->is_auto_increment()) {
                    RETURN_IF_ERROR(
                            _append_error_msg(value,
                                              "The key columns can not be ommited in flexible "
                                              "partial update, missing key column: {}",
                                              slot_desc->col_name(), valid));
                    // remove this line in block
                    for (size_t index = 0; index < block.columns(); ++index) {
                        auto column = block.get_by_position(index).column->assume_mutable();
                        if (column->size() != cur_row_count) {
                            DCHECK(column->size() == cur_row_count + 1);
                            column->pop_back(1);
                            DCHECK(column->size() == cur_row_count);
                        }
                    }
                    return Status::OK();
                }
                // Mark the column as "skipped" in the bitmap and keep the block
                // rectangular with a default value.
                _set_skip_bitmap_mark(slot_desc, column_ptr, block, cur_row_count, valid);
                column_ptr->insert_default();
            } else {
                RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid));
                if (!(*valid)) {
                    return Status::OK();
                }
            }
            ++nullcount;
        }
        DCHECK(column_ptr->size() == cur_row_count + 1);
    }

    // There is at least one valid value here
    DCHECK(nullcount < block.columns());
    *valid = true;
    return Status::OK();
}
1045 | | |
// Recursively deserializes a single simdjson on-demand `value` into
// `column_ptr` according to `type_desc`:
//  - null       -> default/null entry (error if the column is not nullable)
//  - scalars    -> routed through the serde's JSON cell deserializer
//  - STRUCT/MAP/ARRAY -> recurse into the corresponding sub-columns.
// `use_string_cache` memoizes unescaped string values keyed by their raw JSON,
// for callers that may visit the same raw text more than once.
// On a per-row data problem *valid is set false (load path) or a
// DataQualityError is returned (query path).
template <bool use_string_cache>
Status NewJsonReader::_simdjson_write_data_to_column(simdjson::ondemand::value& value,
                                                     const DataTypePtr& type_desc,
                                                     IColumn* column_ptr,
                                                     const std::string& column_name,
                                                     DataTypeSerDeSPtr serde, bool* valid) {
    ColumnNullable* nullable_column = nullptr;
    IColumn* data_column_ptr = column_ptr;
    DataTypeSerDeSPtr data_serde = serde;

    if (column_ptr->is_nullable()) {
        nullable_column = reinterpret_cast<ColumnNullable*>(column_ptr);

        // Work on the nested (non-null) column; the null map entry is pushed at
        // the end of this function once the data write succeeded.
        data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
        data_serde = serde->get_nested_serdes()[0];

        // kNullType will put 1 into the Null map, so there is no need to push 0 for kNullType.
        if (value.type() == simdjson::ondemand::json_type::null) {
            nullable_column->insert_default();
            *valid = true;
            return Status::OK();
        }
    } else if (value.type() == simdjson::ondemand::json_type::null) [[unlikely]] {
        if (_is_load) {
            RETURN_IF_ERROR(_append_error_msg(
                    nullptr, "Json value is null, but the column `{}` is not nullable.",
                    column_name, valid));
            return Status::OK();
        } else {
            return Status::DataQualityError(
                    "Json value is null, but the column `{}` is not nullable.", column_name);
        }
    }

    auto primitive_type = type_desc->get_primitive_type();
    if (_is_load || !is_complex_type(primitive_type)) {
        // Scalar path: hand the textual form of the value to the serde.
        if (value.type() == simdjson::ondemand::json_type::string) {
            std::string_view value_string;
            if constexpr (use_string_cache) {
                // Cache unescaped strings by their raw JSON span; unescaping the
                // same raw text twice would otherwise re-do the work (and
                // on-demand values cannot always be re-read).
                const auto cache_key = value.raw_json().value();
                if (_cached_string_values.contains(cache_key)) {
                    value_string = _cached_string_values[cache_key];
                } else {
                    value_string = value.get_string();
                    _cached_string_values.emplace(cache_key, value_string);
                }
            } else {
                DCHECK(_cached_string_values.empty());
                value_string = value.get_string();
            }

            Slice slice {value_string.data(), value_string.size()};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));

        } else if (value.type() == simdjson::ondemand::json_type::boolean) {
            const char* str_value = nullptr;
            // insert "1"/"0" , not "true"/"false".
            if (value.get_bool()) {
                str_value = (char*)"1";
            } else {
                str_value = (char*)"0";
            }
            Slice slice {str_value, 1};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));
        } else {
            // Maybe we can `switch (value->GetType()) case: kNumberType`.
            // Note that `if (value->IsInt())`, but column is FloatColumn.
            std::string_view json_str = simdjson::to_json_string(value);
            Slice slice {json_str.data(), json_str.size()};
            RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                       _serde_options));
        }
    } else if (primitive_type == TYPE_STRUCT) {
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
            return Status::DataQualityError(
                    "Json value isn't object, but the column `{}` is struct.", column_name);
        }

        const auto* type_struct =
                assert_cast<const DataTypeStruct*>(remove_nullable(type_desc).get());
        auto sub_col_size = type_struct->get_elements().size();
        simdjson::ondemand::object struct_value = value.get_object();
        auto sub_serdes = data_serde->get_nested_serdes();
        auto* struct_column_ptr = assert_cast<ColumnStruct*>(data_column_ptr);

        // Map sub-field name (lower-cased during lookup below) -> element index.
        std::map<std::string, size_t> sub_col_name_to_idx;
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
            sub_col_name_to_idx.emplace(type_struct->get_element_name(sub_col_idx), sub_col_idx);
        }
        std::vector<bool> has_value(sub_col_size, false);
        for (simdjson::ondemand::field sub : struct_value) {
            std::string_view sub_key_view = sub.unescaped_key();
            std::string sub_key(sub_key_view.data(), sub_key_view.length());
            std::transform(sub_key.begin(), sub_key.end(), sub_key.begin(), ::tolower);

            if (sub_col_name_to_idx.find(sub_key) == sub_col_name_to_idx.end()) [[unlikely]] {
                // Unknown sub-field: ignore it.
                continue;
            }
            size_t sub_column_idx = sub_col_name_to_idx[sub_key];
            auto sub_column_ptr = struct_column_ptr->get_column(sub_column_idx).get_ptr();

            if (has_value[sub_column_idx]) [[unlikely]] {
                // Since struct_value can only be traversed once, we can only insert
                // the original value first, then delete it, and then reinsert the new value.
                sub_column_ptr->pop_back(1);
            }
            has_value[sub_column_idx] = true;

            const auto& sub_col_type = type_struct->get_element(sub_column_idx);
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    sub.value(), sub_col_type, sub_column_ptr.get(), column_name + "." + sub_key,
                    sub_serdes[sub_column_idx], valid));
        }

        //fill missing subcolumn
        for (size_t sub_col_idx = 0; sub_col_idx < sub_col_size; sub_col_idx++) {
            if (has_value[sub_col_idx]) {
                continue;
            }

            auto sub_column_ptr = struct_column_ptr->get_column(sub_col_idx).get_ptr();
            if (sub_column_ptr->is_nullable()) {
                sub_column_ptr->insert_default();
                continue;
            } else [[unlikely]] {
                return Status::DataQualityError(
                        "Json file structColumn miss field {} and this column isn't nullable.",
                        column_name + "." + type_struct->get_element_name(sub_col_idx));
            }
        }
    } else if (primitive_type == TYPE_MAP) {
        if (value.type() != simdjson::ondemand::json_type::object) [[unlikely]] {
            return Status::DataQualityError("Json value isn't object, but the column `{}` is map.",
                                            column_name);
        }
        simdjson::ondemand::object object_value = value.get_object();

        auto sub_serdes = data_serde->get_nested_serdes();
        auto* map_column_ptr = assert_cast<ColumnMap*>(data_column_ptr);

        size_t field_count = 0;
        for (simdjson::ondemand::field member_value : object_value) {
            // Helper: write the object key (always a string) into the map's key
            // column through the key serde, handling nullability inline.
            auto f = [](std::string_view key_view, const DataTypePtr& type_desc,
                        IColumn* column_ptr, DataTypeSerDeSPtr serde,
                        DataTypeSerDe::FormatOptions serde_options, bool* valid) {
                auto* data_column_ptr = column_ptr;
                auto data_serde = serde;
                if (column_ptr->is_nullable()) {
                    auto* nullable_column = static_cast<ColumnNullable*>(column_ptr);

                    nullable_column->get_null_map_data().push_back(0);
                    data_column_ptr = nullable_column->get_nested_column().get_ptr().get();
                    data_serde = serde->get_nested_serdes()[0];
                }
                Slice slice(key_view.data(), key_view.length());

                RETURN_IF_ERROR(data_serde->deserialize_one_cell_from_json(*data_column_ptr, slice,
                                                                           serde_options));
                return Status::OK();
            };

            RETURN_IF_ERROR(f(member_value.unescaped_key(),
                              assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
                                      ->get_key_type(),
                              map_column_ptr->get_keys_ptr()->assume_mutable()->get_ptr().get(),
                              sub_serdes[0], _serde_options, valid));

            // The member's value recurses through the generic writer.
            simdjson::ondemand::value field_value = member_value.value();
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    field_value,
                    assert_cast<const DataTypeMap*>(remove_nullable(type_desc).get())
                            ->get_value_type(),
                    map_column_ptr->get_values_ptr()->assume_mutable()->get_ptr().get(),
                    column_name + ".value", sub_serdes[1], valid));
            field_count++;
        }

        // Close this map row: the new offset is previous offset + entry count.
        auto& offsets = map_column_ptr->get_offsets();
        offsets.emplace_back(offsets.back() + field_count);

    } else if (primitive_type == TYPE_ARRAY) {
        if (value.type() != simdjson::ondemand::json_type::array) [[unlikely]] {
            return Status::DataQualityError("Json value isn't array, but the column `{}` is array.",
                                            column_name);
        }

        simdjson::ondemand::array array_value = value.get_array();

        auto sub_serdes = data_serde->get_nested_serdes();
        auto* array_column_ptr = assert_cast<ColumnArray*>(data_column_ptr);

        int field_count = 0;
        for (simdjson::ondemand::value sub_value : array_value) {
            RETURN_IF_ERROR(_simdjson_write_data_to_column<use_string_cache>(
                    sub_value,
                    assert_cast<const DataTypeArray*>(remove_nullable(type_desc).get())
                            ->get_nested_type(),
                    array_column_ptr->get_data().get_ptr().get(), column_name + ".element",
                    sub_serdes[0], valid));
            field_count++;
        }
        // Close this array row with its cumulative element offset.
        auto& offsets = array_column_ptr->get_offsets();
        offsets.emplace_back(offsets.back() + field_count);

    } else {
        return Status::InternalError("Not support load to complex column.");
    }
    //We need to finally set the nullmap of column_nullable to keep the size consistent with data_column
    if (nullable_column && value.type() != simdjson::ondemand::json_type::null) {
        nullable_column->get_null_map_data().push_back(0);
    }
    *valid = true;
    return Status::OK();
}
1262 | | |
1263 | | Status NewJsonReader::_append_error_msg(simdjson::ondemand::object* obj, std::string error_msg, |
1264 | 0 | std::string col_name, bool* valid) { |
1265 | 0 | std::string err_msg; |
1266 | 0 | if (!col_name.empty()) { |
1267 | 0 | fmt::memory_buffer error_buf; |
1268 | 0 | fmt::format_to(error_buf, error_msg, col_name, _jsonpaths); |
1269 | 0 | err_msg = fmt::to_string(error_buf); |
1270 | 0 | } else { |
1271 | 0 | err_msg = error_msg; |
1272 | 0 | } |
1273 | |
|
1274 | 0 | _counter->num_rows_filtered++; |
1275 | 0 | if (valid != nullptr) { |
1276 | | // current row is invalid |
1277 | 0 | *valid = false; |
1278 | 0 | } |
1279 | |
|
1280 | 0 | RETURN_IF_ERROR(_state->append_error_msg_to_file( |
1281 | 0 | [&]() -> std::string { |
1282 | 0 | if (!obj) { |
1283 | 0 | return ""; |
1284 | 0 | } |
1285 | 0 | std::string_view str_view; |
1286 | 0 | (void)!obj->raw_json().get(str_view); |
1287 | 0 | return std::string(str_view.data(), str_view.size()); |
1288 | 0 | }, |
1289 | 0 | [&]() -> std::string { return err_msg; })); |
1290 | 0 | return Status::OK(); |
1291 | 0 | } |
1292 | | |
// Reads the next JSON document into the internal padded buffer and starts a
// simdjson on-demand iteration over it. On return:
//   *size  - byte length of the document (after BOM trimming)
//   *eof   - the underlying reader is exhausted (then *size may be 0)
//   *error - the result of parser->iterate(); callers check it via
//            _get_json_value rather than here.
Status NewJsonReader::_simdjson_parse_json(size_t* size, bool* is_empty_row, bool* eof,
                                           simdjson::error_code* error) {
    SCOPED_TIMER(_read_timer);
    // step1: read buf from pipe.
    if (_line_reader != nullptr) {
        RETURN_IF_ERROR(_line_reader->read_line(&_json_str, size, eof, _io_ctx));
    } else {
        // Whole-message mode: _json_str_ptr owns the buffer, _json_str aliases it.
        size_t length = 0;
        RETURN_IF_ERROR(_read_one_message(&_json_str_ptr, &length));
        _json_str = _json_str_ptr.get();
        *size = length;
        if (length == 0) {
            *eof = true;
        }
    }
    if (*eof) {
        return Status::OK();
    }

    // step2: init json parser iterate.
    if (*size + simdjson::SIMDJSON_PADDING > _padded_size) {
        // For efficiency reasons, simdjson requires a string with a few bytes (simdjson::SIMDJSON_PADDING) at the end.
        // Hence, a re-allocation is needed if the space is not enough.
        _simdjson_ondemand_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
        _simdjson_ondemand_unscape_padding_buffer.resize(*size + simdjson::SIMDJSON_PADDING);
        _padded_size = *size + simdjson::SIMDJSON_PADDING;
    }
    // trim BOM since simdjson does not handle UTF-8 Unicode (with BOM)
    if (*size >= 3 && static_cast<char>(_json_str[0]) == '\xEF' &&
        static_cast<char>(_json_str[1]) == '\xBB' && static_cast<char>(_json_str[2]) == '\xBF') {
        // skip the first three BOM bytes
        _json_str += 3;
        *size -= 3;
    }
    // Copy into the padded buffer: the on-demand parser keeps referencing this
    // memory for the lifetime of the document iteration.
    memcpy(&_simdjson_ondemand_padding_buffer.front(), _json_str, *size);
    _original_doc_size = *size;
    *error = _ondemand_json_parser
                     ->iterate(std::string_view(_simdjson_ondemand_padding_buffer.data(), *size),
                               _padded_size)
                     .get(_original_json_doc);
    return Status::OK();
}
1335 | | |
1336 | 0 | Status NewJsonReader::_judge_empty_row(size_t size, bool eof, bool* is_empty_row) { |
1337 | 0 | if (size == 0 || eof) { |
1338 | 0 | *is_empty_row = true; |
1339 | 0 | return Status::OK(); |
1340 | 0 | } |
1341 | | |
1342 | 0 | if (!_parsed_jsonpaths.empty() && _strip_outer_array) { |
1343 | 0 | _total_rows = _json_value.count_elements().value(); |
1344 | 0 | _next_row = 0; |
1345 | |
|
1346 | 0 | if (_total_rows == 0) { |
1347 | | // meet an empty json array. |
1348 | 0 | *is_empty_row = true; |
1349 | 0 | } |
1350 | 0 | } |
1351 | 0 | return Status::OK(); |
1352 | 0 | } |
1353 | | |
// Validates the document produced by _simdjson_parse_json and resolves
// _json_value from it: checks the parse error code, requires a top-level
// object or array, applies the configured json_root (only when the top level
// is an object), and enforces the strip_outer_array setting. Malformed docs
// are counted as filtered rows and either reported as DataQualityError or,
// when the scanner has decided to stop, surfaced as *eof with OK.
Status NewJsonReader::_get_json_value(size_t* size, bool* eof, simdjson::error_code* error,
                                      bool* is_empty_row) {
    SCOPED_TIMER(_read_timer);
    // Shared handler for all quality failures: log the offending doc and the
    // message, then decide between stopping the scan and returning the error.
    auto return_quality_error = [&](fmt::memory_buffer& error_msg,
                                    const std::string& doc_info) -> Status {
        _counter->num_rows_filtered++;
        RETURN_IF_ERROR(_state->append_error_msg_to_file(
                [&]() -> std::string { return doc_info; },
                [&]() -> std::string { return fmt::to_string(error_msg); }));
        if (*_scanner_eof) {
            // Case A: if _scanner_eof is set to true in "append_error_msg_to_file", which means
            // we meet enough invalid rows and the scanner should be stopped.
            // So we set eof to true and return OK, the caller will stop the process as we meet the end of file.
            *eof = true;
            return Status::OK();
        }
        return Status::DataQualityError(fmt::to_string(error_msg));
    };
    if (*error != simdjson::error_code::SUCCESS) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
                       *error, simdjson::error_message(*error));
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    auto type_res = _original_json_doc.type();
    if (type_res.error() != simdjson::error_code::SUCCESS) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Parse json data for JsonDoc failed. code: {}, error info: {}",
                       type_res.error(), simdjson::error_message(type_res.error()));
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    simdjson::ondemand::json_type type = type_res.value();
    if (type != simdjson::ondemand::json_type::object &&
        type != simdjson::ondemand::json_type::array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Not an json object or json array");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    if (!_parsed_json_root.empty() && type == simdjson::ondemand::json_type::object) {
        try {
            // set json root
            // if it is an array at top level, then we should iterate the entire array in
            // ::_simdjson_handle_flat_array_complex_json
            simdjson::ondemand::object object = _original_json_doc;
            Status st = JsonFunctions::extract_from_object(object, _parsed_json_root, &_json_value);
            if (!st.ok()) {
                fmt::memory_buffer error_msg;
                fmt::format_to(error_msg, "{}", st.to_string());
                return return_quality_error(error_msg, std::string((char*)_json_str, *size));
            }
            _parsed_from_json_root = true;
        } catch (simdjson::simdjson_error& e) {
            fmt::memory_buffer error_msg;
            fmt::format_to(error_msg, "Encounter error while extract_from_object, error: {}",
                           e.what());
            return return_quality_error(error_msg, std::string((char*)_json_str, *size));
        }
    } else {
        // No json_root (or top level is an array): the doc itself is the value.
        _json_value = _original_json_doc;
    }

    if (_json_value.type() == simdjson::ondemand::json_type::array && !_strip_outer_array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "{}",
                       "JSON data is array-object, `strip_outer_array` must be TRUE.");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }

    if (_json_value.type() != simdjson::ondemand::json_type::array && _strip_outer_array) {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "{}",
                       "JSON data is not an array-object, `strip_outer_array` must be FALSE.");
        return return_quality_error(error_msg, std::string((char*)_json_str, *size));
    }
    RETURN_IF_ERROR(_judge_empty_row(*size, *eof, is_empty_row));
    return Status::OK();
}
1431 | | |
1432 | | Status NewJsonReader::_simdjson_write_columns_by_jsonpath( |
1433 | | simdjson::ondemand::object* value, const std::vector<SlotDescriptor*>& slot_descs, |
1434 | 0 | Block& block, bool* valid) { |
1435 | | // write by jsonpath |
1436 | 0 | bool has_valid_value = false; |
1437 | |
|
1438 | 0 | Defer clear_defer([this]() { _cached_string_values.clear(); }); |
1439 | |
|
1440 | 0 | for (size_t i = 0; i < slot_descs.size(); i++) { |
1441 | 0 | auto* slot_desc = slot_descs[i]; |
1442 | 0 | auto* column_ptr = block.get_by_position(i).column->assume_mutable().get(); |
1443 | 0 | simdjson::ondemand::value json_value; |
1444 | 0 | Status st; |
1445 | 0 | if (i < _parsed_jsonpaths.size()) { |
1446 | 0 | st = JsonFunctions::extract_from_object(*value, _parsed_jsonpaths[i], &json_value); |
1447 | 0 | if (!st.ok() && !st.is<NOT_FOUND>()) { |
1448 | 0 | return st; |
1449 | 0 | } |
1450 | 0 | } |
1451 | 0 | if (i < _parsed_jsonpaths.size() && JsonFunctions::is_root_path(_parsed_jsonpaths[i])) { |
1452 | | // Indicate that the jsonpath is "$" or "$.", read the full root json object, insert the original doc directly |
1453 | 0 | ColumnNullable* nullable_column = nullptr; |
1454 | 0 | IColumn* target_column_ptr = nullptr; |
1455 | 0 | if (slot_desc->is_nullable()) { |
1456 | 0 | nullable_column = assert_cast<ColumnNullable*>(column_ptr); |
1457 | 0 | target_column_ptr = &nullable_column->get_nested_column(); |
1458 | 0 | nullable_column->get_null_map_data().push_back(0); |
1459 | 0 | } |
1460 | 0 | auto* column_string = assert_cast<ColumnString*>(target_column_ptr); |
1461 | 0 | column_string->insert_data(_simdjson_ondemand_padding_buffer.data(), |
1462 | 0 | _original_doc_size); |
1463 | 0 | has_valid_value = true; |
1464 | 0 | } else if (i >= _parsed_jsonpaths.size() || st.is<NOT_FOUND>()) { |
1465 | | // not match in jsondata, filling with default value |
1466 | 0 | RETURN_IF_ERROR(_fill_missing_column(slot_desc, _serdes[i], column_ptr, valid)); |
1467 | 0 | if (!(*valid)) { |
1468 | 0 | return Status::OK(); |
1469 | 0 | } |
1470 | 0 | } else { |
1471 | 0 | RETURN_IF_ERROR(_simdjson_write_data_to_column<true>(json_value, slot_desc->type(), |
1472 | 0 | column_ptr, slot_desc->col_name(), |
1473 | 0 | _serdes[i], valid)); |
1474 | 0 | if (!(*valid)) { |
1475 | 0 | return Status::OK(); |
1476 | 0 | } |
1477 | 0 | has_valid_value = true; |
1478 | 0 | } |
1479 | 0 | } |
1480 | 0 | if (!has_valid_value) { |
1481 | | // there is no valid value in json line but has filled with default value before |
1482 | | // so remove this line in block |
1483 | 0 | std::string col_names; |
1484 | 0 | for (int i = 0; i < block.columns(); ++i) { |
1485 | 0 | auto column = block.get_by_position(i).column->assume_mutable(); |
1486 | 0 | column->pop_back(1); |
1487 | 0 | } |
1488 | 0 | for (auto* slot_desc : slot_descs) { |
1489 | 0 | col_names.append(slot_desc->col_name() + ", "); |
1490 | 0 | } |
1491 | 0 | RETURN_IF_ERROR(_append_error_msg(value, |
1492 | 0 | "There is no column matching jsonpaths in the json file, " |
1493 | 0 | "columns:[{}], please check columns " |
1494 | 0 | "and jsonpaths:" + |
1495 | 0 | _jsonpaths, |
1496 | 0 | col_names, valid)); |
1497 | 0 | return Status::OK(); |
1498 | 0 | } |
1499 | 0 | *valid = true; |
1500 | 0 | return Status::OK(); |
1501 | 0 | } |
1502 | | |
1503 | | Status NewJsonReader::_get_column_default_value( |
1504 | | const std::vector<SlotDescriptor*>& slot_descs, |
1505 | 0 | const std::unordered_map<std::string, VExprContextSPtr>& col_default_value_ctx) { |
1506 | 0 | for (auto* slot_desc : slot_descs) { |
1507 | 0 | auto it = col_default_value_ctx.find(slot_desc->col_name()); |
1508 | 0 | if (it != col_default_value_ctx.end() && it->second != nullptr) { |
1509 | 0 | const auto& ctx = it->second; |
1510 | | // NULL_LITERAL means no valid value of current column |
1511 | 0 | if (ctx->root()->node_type() == TExprNodeType::type::NULL_LITERAL) { |
1512 | 0 | continue; |
1513 | 0 | } |
1514 | 0 | ColumnWithTypeAndName result; |
1515 | 0 | RETURN_IF_ERROR(ctx->execute_const_expr(result)); |
1516 | 0 | DCHECK(result.column->size() == 1); |
1517 | 0 | _col_default_value_map.emplace(slot_desc->col_name(), |
1518 | 0 | result.column->get_data_at(0).to_string()); |
1519 | 0 | } |
1520 | 0 | } |
1521 | 0 | return Status::OK(); |
1522 | 0 | } |
1523 | | |
1524 | | Status NewJsonReader::_fill_missing_column(SlotDescriptor* slot_desc, DataTypeSerDeSPtr serde, |
1525 | 0 | IColumn* column_ptr, bool* valid) { |
1526 | 0 | auto col_value = _col_default_value_map.find(slot_desc->col_name()); |
1527 | 0 | if (col_value == _col_default_value_map.end()) { |
1528 | 0 | if (slot_desc->is_nullable()) { |
1529 | 0 | auto* nullable_column = static_cast<ColumnNullable*>(column_ptr); |
1530 | 0 | nullable_column->insert_default(); |
1531 | 0 | } else { |
1532 | 0 | if (_is_load) { |
1533 | 0 | RETURN_IF_ERROR(_append_error_msg( |
1534 | 0 | nullptr, "The column `{}` is not nullable, but it's not found in jsondata.", |
1535 | 0 | slot_desc->col_name(), valid)); |
1536 | 0 | } else { |
1537 | 0 | return Status::DataQualityError( |
1538 | 0 | "The column `{}` is not nullable, but it's not found in jsondata.", |
1539 | 0 | slot_desc->col_name()); |
1540 | 0 | } |
1541 | 0 | } |
1542 | 0 | } else { |
1543 | 0 | const std::string& v_str = col_value->second; |
1544 | 0 | Slice column_default_value {v_str}; |
1545 | 0 | RETURN_IF_ERROR(serde->deserialize_one_cell_from_json(*column_ptr, column_default_value, |
1546 | 0 | _serde_options)); |
1547 | 0 | } |
1548 | 0 | *valid = true; |
1549 | 0 | return Status::OK(); |
1550 | 0 | } |
1551 | | |
1552 | 0 | void NewJsonReader::_append_empty_skip_bitmap_value(Block& block, size_t cur_row_count) { |
1553 | 0 | auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>( |
1554 | 0 | block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()); |
1555 | 0 | auto* skip_bitmap_col_ptr = |
1556 | 0 | assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get()); |
1557 | 0 | DCHECK(skip_bitmap_nullable_col_ptr->size() == cur_row_count); |
1558 | | // should append an empty bitmap for every row wheather this line misses columns |
1559 | 0 | skip_bitmap_nullable_col_ptr->get_null_map_data().push_back(0); |
1560 | 0 | skip_bitmap_col_ptr->insert_default(); |
1561 | 0 | DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1); |
1562 | 0 | } |
1563 | | |
1564 | | void NewJsonReader::_set_skip_bitmap_mark(SlotDescriptor* slot_desc, IColumn* column_ptr, |
1565 | 0 | Block& block, size_t cur_row_count, bool* valid) { |
1566 | | // we record the missing column's column unique id in skip bitmap |
1567 | | // to indicate which columns need to do the alignment process |
1568 | 0 | auto* skip_bitmap_nullable_col_ptr = assert_cast<ColumnNullable*>( |
1569 | 0 | block.get_by_position(skip_bitmap_col_idx).column->assume_mutable().get()); |
1570 | 0 | auto* skip_bitmap_col_ptr = |
1571 | 0 | assert_cast<ColumnBitmap*>(skip_bitmap_nullable_col_ptr->get_nested_column_ptr().get()); |
1572 | 0 | DCHECK(skip_bitmap_col_ptr->size() == cur_row_count + 1); |
1573 | 0 | auto& skip_bitmap = skip_bitmap_col_ptr->get_data().back(); |
1574 | 0 | skip_bitmap.add(slot_desc->col_unique_id()); |
1575 | 0 | } |
1576 | | |
1577 | 0 | void NewJsonReader::_collect_profile_before_close() { |
1578 | 0 | if (_line_reader != nullptr) { |
1579 | 0 | _line_reader->collect_profile_before_close(); |
1580 | 0 | } |
1581 | 0 | if (_file_reader != nullptr) { |
1582 | 0 | _file_reader->collect_profile_before_close(); |
1583 | 0 | } |
1584 | 0 | } |
1585 | | |
1586 | | #include "common/compile_check_end.h" |
1587 | | } // namespace doris |