be/src/format/csv/csv_reader.cpp
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #include "format/csv/csv_reader.h" |
19 | | |
20 | | #include <fmt/format.h> |
21 | | #include <gen_cpp/PlanNodes_types.h> |
22 | | #include <gen_cpp/Types_types.h> |
23 | | #include <glog/logging.h> |
24 | | |
25 | | #include <algorithm> |
26 | | #include <cstddef> |
27 | | #include <map> |
28 | | #include <memory> |
29 | | #include <numeric> |
30 | | #include <ostream> |
31 | | #include <regex> |
32 | | #include <utility> |
33 | | |
34 | | #include "common/compiler_util.h" // IWYU pragma: keep |
35 | | #include "common/config.h" |
36 | | #include "common/consts.h" |
37 | | #include "common/status.h" |
38 | | #include "core/block/block.h" |
39 | | #include "core/block/column_with_type_and_name.h" |
40 | | #include "core/column/column_nullable.h" |
41 | | #include "core/column/column_string.h" |
42 | | #include "core/data_type/data_type_factory.hpp" |
43 | | #include "core/data_type_serde/data_type_string_serde.h" |
44 | | #include "exec/scan/scanner.h" |
45 | | #include "format/file_reader/new_plain_binary_line_reader.h" |
46 | | #include "format/file_reader/new_plain_text_line_reader.h" |
47 | | #include "format/line_reader.h" |
48 | | #include "io/file_factory.h" |
49 | | #include "io/fs/broker_file_reader.h" |
50 | | #include "io/fs/buffered_reader.h" |
51 | | #include "io/fs/file_reader.h" |
52 | | #include "io/fs/s3_file_reader.h" |
53 | | #include "io/fs/tracing_file_reader.h" |
54 | | #include "runtime/descriptors.h" |
55 | | #include "runtime/runtime_state.h" |
56 | | #include "util/decompressor.h" |
57 | | #include "util/string_util.h" |
58 | | #include "util/utf8_check.h" |
59 | | |
60 | | namespace doris { |
61 | | class RuntimeProfile; |
62 | | class IColumn; |
63 | | namespace io { |
64 | | struct IOContext; |
65 | | enum class FileCachePolicy : uint8_t; |
66 | | } // namespace io |
67 | | } // namespace doris |
68 | | |
69 | | namespace doris { |
70 | | |
71 | | namespace { |
72 | | |
73 | 15.1M | size_t columns_byte_size(const std::vector<MutableColumnPtr>& columns) { |
74 | 15.1M | size_t bytes = 0; |
75 | 212M | for (const auto& column : columns) { |
76 | 212M | DCHECK(column.get() != nullptr); |
77 | 212M | bytes += column->byte_size(); |
78 | 212M | } |
79 | 15.1M | return bytes; |
80 | 15.1M | } |
81 | | |
82 | | } // namespace |
83 | | |
84 | 878 | void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) { |
85 | 878 | const char* data = line.data; |
86 | 878 | const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions(); |
87 | 878 | size_t value_start_offset = 0; |
88 | 2.98k | for (auto idx : column_sep_positions) { |
89 | 2.98k | process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char, |
90 | 2.98k | splitted_values); |
91 | 2.98k | value_start_offset = idx + _value_sep_len; |
92 | 2.98k | } |
93 | 878 | if (line.size >= value_start_offset) { |
94 | | // process the last column |
95 | 876 | process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char, |
96 | 876 | splitted_values); |
97 | 876 | } |
98 | 878 | } |
99 | | |
100 | | void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line, |
101 | 14.9M | std::vector<Slice>* splitted_values) { |
102 | 14.9M | const char* data = line.data; |
103 | 14.9M | const size_t size = line.size; |
104 | 14.9M | size_t value_start = 0; |
105 | 3.59G | for (size_t i = 0; i < size; ++i) { |
106 | 3.57G | if (data[i] == _value_sep[0]) { |
107 | 445M | process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values); |
108 | 445M | value_start = i + _value_sep_len; |
109 | 445M | } |
110 | 3.57G | } |
111 | 14.9M | process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values); |
112 | 14.9M | } |
113 | | |
114 | | void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line, |
115 | 2.38k | std::vector<Slice>* splitted_values) { |
116 | 2.38k | size_t start = 0; // point to the start pos of next col value. |
117 | 2.38k | size_t curpos = 0; // point to the start pos of separator matching sequence. |
118 | | |
119 | | // value_sep : AAAA |
120 | | // line.data : 1234AAAA5678 |
121 | | // -> 1234,5678 |
122 | | |
123 | | // start start |
124 | | // ▼ ▼ |
125 | | // 1234AAAA5678\0 |
126 | | // ▲ ▲ |
127 | | // curpos curpos |
128 | | |
129 | | //kmp |
130 | 2.38k | std::vector<int> next(_value_sep_len); |
131 | 2.38k | next[0] = -1; |
132 | 4.96k | for (int i = 1, j = -1; i < _value_sep_len; i++) { |
133 | 2.60k | while (j > -1 && _value_sep[i] != _value_sep[j + 1]) { |
134 | 20 | j = next[j]; |
135 | 20 | } |
136 | 2.58k | if (_value_sep[i] == _value_sep[j + 1]) { |
137 | 2.44k | j++; |
138 | 2.44k | } |
139 | 2.58k | next[i] = j; |
140 | 2.58k | } |
141 | | |
142 | 42.1k | for (int i = 0, j = -1; i < line.size; i++) { |
143 | | // i : line |
144 | | // j : _value_sep |
145 | 42.4k | while (j > -1 && line[i] != _value_sep[j + 1]) { |
146 | 2.66k | j = next[j]; |
147 | 2.66k | } |
148 | 39.7k | if (line[i] == _value_sep[j + 1]) { |
149 | 7.02k | j++; |
150 | 7.02k | } |
151 | 39.7k | if (j == _value_sep_len - 1) { |
152 | 3.19k | curpos = i - _value_sep_len + 1; |
153 | | |
154 | | /* |
155 | | * column_separator : "xx" |
156 | | * data.csv : data1xxxxdata2 |
157 | | * |
158 | | * Parse incorrectly: |
159 | | * data1[xx]xxdata2 |
160 | | * data1x[xx]xdata2 |
161 | | * data1xx[xx]data2 |
162 | | * The string "xxxx" is parsed into three "xx" delimiters. |
163 | | * |
164 | | * Parse correctly: |
165 | | * data1[xx]xxdata2 |
166 | | * data1xx[xx]data2 |
167 | | */ |
168 | | |
169 | 3.19k | if (curpos >= start) { |
170 | 3.13k | process_value_func(line.data, start, curpos - start, _trimming_char, |
171 | 3.13k | splitted_values); |
172 | 3.13k | start = i + 1; |
173 | 3.13k | } |
174 | | |
175 | 3.19k | j = next[j]; |
176 | 3.19k | } |
177 | 39.7k | } |
178 | 2.38k | process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values); |
179 | 2.38k | } |
180 | | |
181 | 14.9M | void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) { |
182 | 14.9M | if (is_single_char_delim) { |
183 | 14.9M | _split_field_single_char(line, splitted_values); |
184 | 18.4E | } else { |
185 | 18.4E | _split_field_multi_char(line, splitted_values); |
186 | 18.4E | } |
187 | 14.9M | } |
188 | | |
189 | | CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
190 | | const TFileScanRangeParams& params, const TFileRangeDesc& range, |
191 | | const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size, |
192 | | io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder) |
193 | 8.41k | : _profile(profile), |
194 | 8.41k | _params(params), |
195 | 8.41k | _file_reader(nullptr), |
196 | 8.41k | _line_reader(nullptr), |
197 | 8.41k | _decompressor(nullptr), |
198 | 8.41k | _state(state), |
199 | 8.41k | _counter(counter), |
200 | 8.41k | _range(range), |
201 | 8.41k | _file_slot_descs(file_slot_descs), |
202 | 8.41k | _line_reader_eof(false), |
203 | 8.41k | _skip_lines(0), |
204 | 8.41k | _io_ctx(io_ctx), |
205 | 8.41k | _io_ctx_holder(std::move(io_ctx_holder)), |
206 | 8.41k | _batch_size(std::max(batch_size, 1UL)) { |
207 | 8.41k | if (_io_ctx == nullptr && _io_ctx_holder) { |
208 | 7.59k | _io_ctx = _io_ctx_holder.get(); |
209 | 7.59k | } |
210 | 8.41k | _file_format_type = _params.format_type; |
211 | 8.41k | _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO; |
212 | 8.41k | if (_range.__isset.compress_type) { |
213 | | // for compatibility |
214 | 6.42k | _file_compress_type = _range.compress_type; |
215 | 6.42k | } else { |
216 | 1.99k | _file_compress_type = _params.compress_type; |
217 | 1.99k | } |
218 | 8.41k | _size = _range.size; |
219 | | |
220 | 8.41k | _split_values.reserve(_file_slot_descs.size()); |
221 | 8.41k | _init_system_properties(); |
222 | 8.41k | _init_file_description(); |
223 | 8.41k | _serdes = create_data_type_serdes(_file_slot_descs); |
224 | 8.41k | } |
225 | | |
226 | 8.40k | void CsvReader::_init_system_properties() { |
227 | 8.40k | if (_range.__isset.file_type) { |
228 | | // for compatibility |
229 | 6.26k | _system_properties.system_type = _range.file_type; |
230 | 6.26k | } else { |
231 | 2.14k | _system_properties.system_type = _params.file_type; |
232 | 2.14k | } |
233 | 8.40k | _system_properties.properties = _params.properties; |
234 | 8.40k | _system_properties.hdfs_params = _params.hdfs_params; |
235 | 8.40k | if (_params.__isset.broker_addresses) { |
236 | 1.87k | _system_properties.broker_addresses.assign(_params.broker_addresses.begin(), |
237 | 1.87k | _params.broker_addresses.end()); |
238 | 1.87k | } |
239 | 8.40k | } |
240 | | |
241 | 8.40k | void CsvReader::_init_file_description() { |
242 | 8.40k | _file_description.path = _range.path; |
243 | 8.40k | _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1; |
244 | 8.40k | if (_range.__isset.fs_name) { |
245 | 5.10k | _file_description.fs_name = _range.fs_name; |
246 | 5.10k | } |
247 | 8.40k | if (_range.__isset.file_cache_admission) { |
248 | 5.60k | _file_description.file_cache_admission = _range.file_cache_admission; |
249 | 5.60k | } |
250 | 8.40k | } |
251 | | |
252 | 0 | Status CsvReader::init_reader(bool is_load) { |
253 | | // set the skip lines and start offset |
254 | 0 | _start_offset = _range.start_offset; |
255 | 0 | if (_start_offset == 0) { |
256 | | // check header typer first |
257 | 0 | if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && |
258 | 0 | !_params.file_attributes.header_type.empty()) { |
259 | 0 | std::string header_type = to_lower(_params.file_attributes.header_type); |
260 | 0 | if (header_type == BeConsts::CSV_WITH_NAMES) { |
261 | 0 | _skip_lines = 1; |
262 | 0 | } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { |
263 | 0 | _skip_lines = 2; |
264 | 0 | } |
265 | 0 | } else if (_params.file_attributes.__isset.skip_lines) { |
266 | 0 | _skip_lines = _params.file_attributes.skip_lines; |
267 | 0 | } |
268 | 0 | } else if (_start_offset != 0) { |
269 | 0 | if ((_file_compress_type != TFileCompressType::PLAIN) || |
270 | 0 | (_file_compress_type == TFileCompressType::UNKNOWN && |
271 | 0 | _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) { |
272 | 0 | return Status::InternalError<false>("For now we do not support split compressed file"); |
273 | 0 | } |
274 | | // pre-read to promise first line skipped always read |
275 | 0 | int64_t pre_read_len = std::min( |
276 | 0 | static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()), |
277 | 0 | _start_offset); |
278 | 0 | _start_offset -= pre_read_len; |
279 | 0 | _size += pre_read_len; |
280 | | // not first range will always skip one line |
281 | 0 | _skip_lines = 1; |
282 | 0 | } |
283 | | |
284 | 0 | _use_nullable_string_opt.resize(_file_slot_descs.size()); |
285 | 0 | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
286 | 0 | auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr(); |
287 | 0 | if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) { |
288 | 0 | _use_nullable_string_opt[i] = 1; |
289 | 0 | } |
290 | 0 | } |
291 | |
|
292 | 0 | RETURN_IF_ERROR(_init_options()); |
293 | 0 | RETURN_IF_ERROR(_create_file_reader(false)); |
294 | 0 | RETURN_IF_ERROR(_create_decompressor()); |
295 | 0 | RETURN_IF_ERROR(_create_line_reader()); |
296 | | |
297 | 0 | _is_load = is_load; |
298 | 0 | if (!_is_load) { |
299 | | // For query task, there are 2 slot mapping. |
300 | | // One is from file slot to values in line. |
301 | | // eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5 |
302 | | // the _col_idxs will save: 0, 2, 4 |
303 | | // The other is from file slot to columns in output block |
304 | | // eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5 |
305 | | // where "p1" is the partition col which does not exist in file |
306 | | // the _file_slot_idx_map will save: 1, 2, 3 |
307 | 0 | DCHECK(_params.__isset.column_idxs); |
308 | 0 | _col_idxs = _params.column_idxs; |
309 | 0 | int idx = 0; |
310 | 0 | for (const auto& slot_info : _params.required_slots) { |
311 | 0 | if (slot_info.is_file_slot) { |
312 | 0 | _file_slot_idx_map.push_back(idx); |
313 | 0 | } |
314 | 0 | idx++; |
315 | 0 | } |
316 | 0 | } else { |
317 | | // For load task, the column order is same as file column order |
318 | 0 | int i = 0; |
319 | 0 | for (const auto& desc [[maybe_unused]] : _file_slot_descs) { |
320 | 0 | _col_idxs.push_back(i++); |
321 | 0 | } |
322 | 0 | } |
323 | |
|
324 | 0 | _line_reader_eof = false; |
325 | 0 | return Status::OK(); |
326 | 0 | } |
327 | | |
328 | | // ---- Unified init_reader(ReaderInitContext*) overrides ---- |
329 | | |
330 | 7.58k | Status CsvReader::_open_file_reader(ReaderInitContext* base_ctx) { |
331 | 7.58k | _start_offset = _range.start_offset; |
332 | 7.58k | if (_start_offset == 0) { |
333 | 7.46k | if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && |
334 | 7.46k | !_params.file_attributes.header_type.empty()) { |
335 | 92 | std::string header_type = to_lower(_params.file_attributes.header_type); |
336 | 92 | if (header_type == BeConsts::CSV_WITH_NAMES) { |
337 | 64 | _skip_lines = 1; |
338 | 64 | } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { |
339 | 28 | _skip_lines = 2; |
340 | 28 | } |
341 | 7.36k | } else if (_params.file_attributes.__isset.skip_lines) { |
342 | 7.36k | _skip_lines = _params.file_attributes.skip_lines; |
343 | 7.36k | } |
344 | 7.46k | } else if (_start_offset != 0) { |
345 | 130 | if ((_file_compress_type != TFileCompressType::PLAIN) || |
346 | 130 | (_file_compress_type == TFileCompressType::UNKNOWN && |
347 | 130 | _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) { |
348 | 0 | return Status::InternalError<false>("For now we do not support split compressed file"); |
349 | 0 | } |
350 | 130 | int64_t pre_read_len = std::min( |
351 | 130 | static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()), |
352 | 130 | _start_offset); |
353 | 130 | _start_offset -= pre_read_len; |
354 | 130 | _size += pre_read_len; |
355 | 130 | _skip_lines = 1; |
356 | 130 | } |
357 | | |
358 | 7.58k | RETURN_IF_ERROR(_init_options()); |
359 | 7.58k | RETURN_IF_ERROR(_create_file_reader(false)); |
360 | 7.58k | return Status::OK(); |
361 | 7.58k | } |
362 | | |
363 | 7.58k | Status CsvReader::_do_init_reader(ReaderInitContext* base_ctx) { |
364 | 7.58k | auto* ctx = checked_context_cast<CsvInitContext>(base_ctx); |
365 | 7.58k | _is_load = ctx->is_load; |
366 | | |
367 | 7.58k | _use_nullable_string_opt.resize(_file_slot_descs.size()); |
368 | 223k | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
369 | 216k | auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr(); |
370 | 216k | if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) { |
371 | 47.6k | _use_nullable_string_opt[i] = 1; |
372 | 47.6k | } |
373 | 216k | } |
374 | | |
375 | 7.58k | RETURN_IF_ERROR(_create_decompressor()); |
376 | 7.58k | RETURN_IF_ERROR(_create_line_reader()); |
377 | | |
378 | 7.58k | if (!_is_load) { |
379 | 5.72k | DCHECK(_params.__isset.column_idxs); |
380 | 5.72k | _col_idxs = _params.column_idxs; |
381 | 5.72k | int idx = 0; |
382 | 200k | for (const auto& slot_info : _params.required_slots) { |
383 | 200k | if (slot_info.is_file_slot) { |
384 | 198k | _file_slot_idx_map.push_back(idx); |
385 | 198k | } |
386 | 200k | idx++; |
387 | 200k | } |
388 | 5.72k | } else { |
389 | 1.86k | int i = 0; |
390 | 17.8k | for (const auto& desc [[maybe_unused]] : _file_slot_descs) { |
391 | 17.8k | _col_idxs.push_back(i++); |
392 | 17.8k | } |
393 | 1.86k | } |
394 | 7.58k | _line_reader_eof = false; |
395 | 7.58k | return Status::OK(); |
396 | 7.58k | } |
397 | | |
398 | 18.4k | void CsvReader::set_batch_size(size_t batch_size) { |
399 | | // 0 means "not set" / "use default" for the row-based readers; we must |
400 | | // never let _batch_size be 0 because _do_get_next_block uses it as the |
401 | | // upper bound of a `while (rows < _batch_size)` loop and a 0 would make |
402 | | // the reader return empty blocks and incorrectly signal EOF. |
403 | 18.4k | _batch_size = std::max(batch_size, 1UL); |
404 | 18.4k | } |
405 | | |
406 | | // !FIXME: Here we should use MutableBlock |
407 | 19.7k | Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) { |
408 | 19.7k | if (_line_reader_eof) { |
409 | 7.52k | *eof = true; |
410 | 7.52k | return Status::OK(); |
411 | 7.52k | } |
412 | | |
413 | 12.1k | const size_t batch_size = _batch_size; |
414 | 12.1k | const auto max_block_bytes = _state->preferred_block_size_bytes(); |
415 | 12.1k | size_t rows = 0; |
416 | | |
417 | 12.1k | bool success = false; |
418 | 12.1k | bool is_remove_bom = false; |
419 | 12.1k | if (_push_down_agg_type == TPushAggOp::type::COUNT) { |
420 | 141k | while (rows < batch_size && !_line_reader_eof) { |
421 | 140k | const uint8_t* ptr = nullptr; |
422 | 140k | size_t size = 0; |
423 | 140k | RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
424 | | |
425 | | // _skip_lines == 0 means this line is the actual data beginning line for the entire file |
426 | | // is_remove_bom means _remove_bom should only execute once |
427 | 141k | if (_skip_lines == 0 && !is_remove_bom) { |
428 | 606 | ptr = _remove_bom(ptr, size); |
429 | 606 | is_remove_bom = true; |
430 | 606 | } |
431 | | |
432 | | // _skip_lines > 0 means we do not need to remove bom |
433 | 140k | if (_skip_lines > 0) { |
434 | 0 | _skip_lines--; |
435 | 0 | is_remove_bom = true; |
436 | 0 | continue; |
437 | 0 | } |
438 | 140k | if (size == 0) { |
439 | 606 | if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) { |
440 | 0 | ++rows; |
441 | 0 | } |
442 | | // Read empty line, continue |
443 | 606 | continue; |
444 | 606 | } |
445 | | |
446 | 140k | RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); |
447 | 140k | ++rows; |
448 | 140k | } |
449 | 606 | auto mutable_columns_guard = block->mutate_columns_scoped(); |
450 | 606 | auto& mutate_columns = mutable_columns_guard.mutable_columns(); |
451 | 606 | for (auto& col : mutate_columns) { |
452 | 606 | col->resize(rows); |
453 | 606 | } |
454 | 11.5k | } else { |
455 | 11.5k | auto columns_guard = block->mutate_columns_scoped(); |
456 | 11.5k | auto& columns = columns_guard.mutable_columns(); |
457 | 11.5k | _reserve_nullable_string_columns(columns, batch_size); |
458 | 15.1M | while (rows < batch_size && !_line_reader_eof && |
459 | 15.1M | (columns_byte_size(columns) < max_block_bytes)) { |
460 | 15.0M | const uint8_t* ptr = nullptr; |
461 | 15.0M | size_t size = 0; |
462 | 15.0M | RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
463 | | |
464 | | // _skip_lines == 0 means this line is the actual data beginning line for the entire file |
465 | | // is_remove_bom means _remove_bom should only execute once |
466 | 15.0M | if (!is_remove_bom && _skip_lines == 0) { |
467 | 11.3k | ptr = _remove_bom(ptr, size); |
468 | 11.3k | is_remove_bom = true; |
469 | 11.3k | } |
470 | | |
471 | | // _skip_lines > 0 means we do not remove bom |
472 | 15.0M | if (_skip_lines > 0) { |
473 | 319 | _skip_lines--; |
474 | 319 | is_remove_bom = true; |
475 | 319 | continue; |
476 | 319 | } |
477 | 15.0M | if (size == 0) { |
478 | 7.02k | if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) { |
479 | 12 | RETURN_IF_ERROR(_fill_empty_line(columns, &rows)); |
480 | 12 | } |
481 | | // Read empty line, continue |
482 | 7.02k | continue; |
483 | 7.02k | } |
484 | | |
485 | 15.0M | RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success)); |
486 | 15.0M | if (!success) { |
487 | 128 | continue; |
488 | 128 | } |
489 | 15.0M | RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns, &rows)); |
490 | 15.0M | } |
491 | 11.5k | } |
492 | | |
493 | 12.1k | *eof = (rows == 0); |
494 | 12.1k | *read_rows = rows; |
495 | | |
496 | 12.1k | return Status::OK(); |
497 | 12.1k | } |
498 | | |
499 | 7.59k | Status CsvReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) { |
500 | 216k | for (const auto& slot : _file_slot_descs) { |
501 | 216k | name_to_type->emplace(slot->col_name(), slot->type()); |
502 | 216k | } |
503 | 7.59k | return Status::OK(); |
504 | 7.59k | } |
505 | | |
506 | | // init decompressor, file reader and line reader for parsing schema |
507 | 812 | Status CsvReader::init_schema_reader() { |
508 | 812 | _start_offset = _range.start_offset; |
509 | 812 | if (_start_offset != 0) { |
510 | 0 | return Status::InvalidArgument( |
511 | 0 | "start offset of TFileRangeDesc must be zero in get parsered schema"); |
512 | 0 | } |
513 | 812 | if (_params.file_type == TFileType::FILE_BROKER) { |
514 | 0 | return Status::InternalError<false>( |
515 | 0 | "Getting parsered schema from csv file do not support stream load and broker " |
516 | 0 | "load."); |
517 | 0 | } |
518 | | |
519 | | // csv file without names line and types line. |
520 | 812 | _read_line = 1; |
521 | 812 | _is_parse_name = false; |
522 | | |
523 | 812 | if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type && |
524 | 812 | !_params.file_attributes.header_type.empty()) { |
525 | 92 | std::string header_type = to_lower(_params.file_attributes.header_type); |
526 | 92 | if (header_type == BeConsts::CSV_WITH_NAMES) { |
527 | 63 | _is_parse_name = true; |
528 | 63 | } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) { |
529 | 29 | _read_line = 2; |
530 | 29 | _is_parse_name = true; |
531 | 29 | } |
532 | 92 | } |
533 | | |
534 | 812 | RETURN_IF_ERROR(_init_options()); |
535 | 812 | RETURN_IF_ERROR(_create_file_reader(true)); |
536 | 812 | RETURN_IF_ERROR(_create_decompressor()); |
537 | 812 | RETURN_IF_ERROR(_create_line_reader()); |
538 | 812 | return Status::OK(); |
539 | 812 | } |
540 | | |
541 | | Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names, |
542 | 812 | std::vector<DataTypePtr>* col_types) { |
543 | 812 | if (_read_line == 1) { |
544 | 783 | if (!_is_parse_name) { //parse csv file without names and types |
545 | 720 | size_t col_nums = 0; |
546 | 720 | RETURN_IF_ERROR(_parse_col_nums(&col_nums)); |
547 | 8.00k | for (size_t i = 0; i < col_nums; ++i) { |
548 | 7.28k | col_names->emplace_back("c" + std::to_string(i + 1)); |
549 | 7.28k | } |
550 | 713 | } else { // parse csv file with names |
551 | 63 | RETURN_IF_ERROR(_parse_col_names(col_names)); |
552 | 63 | } |
553 | | |
554 | 8.28k | for (size_t j = 0; j < col_names->size(); ++j) { |
555 | 7.50k | col_types->emplace_back( |
556 | 7.50k | DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true)); |
557 | 7.50k | } |
558 | 776 | } else { // parse csv file with names and types |
559 | 29 | RETURN_IF_ERROR(_parse_col_names(col_names)); |
560 | 29 | RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types)); |
561 | 29 | } |
562 | 805 | return Status::OK(); |
563 | 812 | } |
564 | | |
565 | 213M | Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) { |
566 | | // This is the per-row per-column hot path of CSV load (load reads every column as |
567 | | // nullable string). The column type was already verified by the checked assert_cast |
568 | | // in _reserve_nullable_string_columns at the beginning of the batch, so the casts |
569 | | // here can skip the release-build typeid check. |
570 | 213M | auto& null_column = assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(column); |
571 | 213M | auto& string_column = assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>( |
572 | 213M | null_column.get_nested_column()); |
573 | 213M | if (_empty_field_as_null && slice.size == 0) { |
574 | 5 | string_column.insert_default(); |
575 | 5 | null_column.get_null_map_data().push_back(1); |
576 | 5 | return Status::OK(); |
577 | 5 | } |
578 | 213M | if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) { |
579 | 210M | if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) { |
580 | 31.6k | string_column.insert_default(); |
581 | 31.6k | null_column.get_null_map_data().push_back(1); |
582 | 31.6k | return Status::OK(); |
583 | 31.6k | } |
584 | 210M | } |
585 | | // Same as DataTypeStringSerDe::deserialize_one_cell_from_csv (which never fails), |
586 | | // written out here to skip the SerDe layer and its per-cell assert_cast. |
587 | 213M | if (_options.escape_char != 0) { |
588 | 3.62k | escape_string_for_csv(slice.data, &slice.size, _options.escape_char, _options.quote_char); |
589 | 3.62k | } |
590 | 213M | string_column.insert_data(slice.data, slice.size); |
591 | 213M | null_column.get_null_map_data().push_back(0); |
592 | 213M | return Status::OK(); |
593 | 213M | } |
594 | | |
595 | 3.62k | Status CsvReader::_init_options() { |
596 | | // get column_separator and line_delimiter |
597 | 3.62k | _value_separator = _params.file_attributes.text_params.column_separator; |
598 | 3.62k | _value_separator_length = _value_separator.size(); |
599 | 3.62k | _line_delimiter = _params.file_attributes.text_params.line_delimiter; |
600 | 3.62k | _line_delimiter_length = _line_delimiter.size(); |
601 | 3.62k | if (_params.file_attributes.text_params.__isset.enclose) { |
602 | 3.61k | _enclose = _params.file_attributes.text_params.enclose; |
603 | 3.61k | } |
604 | 3.62k | if (_params.file_attributes.text_params.__isset.escape) { |
605 | 3.61k | _escape = _params.file_attributes.text_params.escape; |
606 | 3.61k | } |
607 | | |
608 | 3.62k | _trim_tailing_spaces = |
609 | 3.62k | (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query()); |
610 | | |
611 | 3.62k | _options.escape_char = _escape; |
612 | 3.62k | _options.quote_char = _enclose; |
613 | | |
614 | 3.62k | if (_params.file_attributes.text_params.collection_delimiter.empty()) { |
615 | 3.62k | _options.collection_delim = ','; |
616 | 3.62k | } else { |
617 | 0 | _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0]; |
618 | 0 | } |
619 | 3.62k | if (_params.file_attributes.text_params.mapkv_delimiter.empty()) { |
620 | 3.61k | _options.map_key_delim = ':'; |
621 | 3.61k | } else { |
622 | 2 | _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0]; |
623 | 2 | } |
624 | | |
625 | 3.62k | if (_params.file_attributes.text_params.__isset.null_format) { |
626 | 34 | _options.null_format = _params.file_attributes.text_params.null_format.data(); |
627 | 34 | _options.null_len = _params.file_attributes.text_params.null_format.length(); |
628 | 34 | } |
629 | | |
630 | 3.62k | if (_params.file_attributes.__isset.trim_double_quotes) { |
631 | 3.61k | _trim_double_quotes = _params.file_attributes.trim_double_quotes; |
632 | 3.61k | } |
633 | 3.62k | _options.converted_from_string = _trim_double_quotes; |
634 | | |
635 | 3.62k | if (_state != nullptr) { |
636 | 2.81k | _keep_cr = _state->query_options().keep_carriage_return; |
637 | 2.81k | } |
638 | | |
639 | 3.62k | if (_params.file_attributes.text_params.__isset.empty_field_as_null) { |
640 | 3.58k | _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null; |
641 | 3.58k | } |
642 | 3.62k | return Status::OK(); |
643 | 3.62k | } |
644 | | |
645 | 8.41k | Status CsvReader::_create_decompressor() { |
646 | 8.41k | if (_file_compress_type != TFileCompressType::UNKNOWN) { |
647 | 8.32k | RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor)); |
648 | 8.32k | } else { |
649 | 84 | RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor)); |
650 | 84 | } |
651 | | |
652 | 8.41k | return Status::OK(); |
653 | 8.41k | } |
654 | | |
655 | 8.39k | Status CsvReader::_create_file_reader(bool need_schema) { |
656 | 8.39k | if (_params.file_type == TFileType::FILE_STREAM) { |
657 | 1.95k | RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state, |
658 | 1.95k | need_schema)); |
659 | 6.44k | } else { |
660 | 6.44k | _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0; |
661 | 6.44k | io::FileReaderOptions reader_options = |
662 | 6.44k | FileFactory::get_reader_options(_state, _file_description); |
663 | 6.44k | io::FileReaderSPtr file_reader; |
664 | 6.45k | if (_io_ctx_holder) { |
665 | 6.45k | file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( |
666 | 6.45k | _profile, _system_properties, _file_description, reader_options, |
667 | 6.45k | io::DelegateReader::AccessMode::SEQUENTIAL, |
668 | 6.45k | std::static_pointer_cast<const io::IOContext>(_io_ctx_holder), |
669 | 6.45k | io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size))); |
670 | 18.4E | } else { |
671 | 18.4E | file_reader = DORIS_TRY(io::DelegateReader::create_file_reader( |
672 | 18.4E | _profile, _system_properties, _file_description, reader_options, |
673 | 18.4E | io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx, |
674 | 18.4E | io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size))); |
675 | 18.4E | } |
676 | 6.45k | _file_reader = _io_ctx && _io_ctx->file_reader_stats |
677 | 6.45k | ? std::make_shared<io::TracingFileReader>(std::move(file_reader), |
678 | 6.45k | _io_ctx->file_reader_stats) |
679 | 18.4E | : file_reader; |
680 | 6.44k | } |
681 | 8.39k | if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM && |
682 | 8.39k | _params.file_type != TFileType::FILE_BROKER) { |
683 | 0 | return Status::EndOfFile("init reader failed, empty csv file: " + _range.path); |
684 | 0 | } |
685 | 8.39k | return Status::OK(); |
686 | 8.39k | } |
687 | | |
688 | 3.62k | Status CsvReader::_create_line_reader() { |
689 | 3.62k | std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx; |
690 | 3.62k | if (_enclose == 0) { |
691 | 3.46k | text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>( |
692 | 3.46k | _line_delimiter, _line_delimiter_length, _keep_cr); |
693 | 3.46k | _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>( |
694 | 3.46k | _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1); |
695 | | |
696 | 3.46k | } else { |
697 | | // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0 |
698 | 155 | size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0; |
699 | 155 | _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>( |
700 | 155 | _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length, |
701 | 155 | col_sep_num, _enclose, _escape, _keep_cr); |
702 | 155 | text_line_reader_ctx = _enclose_reader_ctx; |
703 | | |
704 | 155 | _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>( |
705 | 155 | _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose); |
706 | 155 | } |
707 | 3.62k | switch (_file_format_type) { |
708 | 3.54k | case TFileFormatType::FORMAT_CSV_PLAIN: |
709 | 3.54k | [[fallthrough]]; |
710 | 3.54k | case TFileFormatType::FORMAT_CSV_GZ: |
711 | 3.54k | [[fallthrough]]; |
712 | 3.54k | case TFileFormatType::FORMAT_CSV_BZ2: |
713 | 3.54k | [[fallthrough]]; |
714 | 3.54k | case TFileFormatType::FORMAT_CSV_LZ4FRAME: |
715 | 3.54k | [[fallthrough]]; |
716 | 3.54k | case TFileFormatType::FORMAT_CSV_LZ4BLOCK: |
717 | 3.54k | [[fallthrough]]; |
718 | 3.54k | case TFileFormatType::FORMAT_CSV_LZOP: |
719 | 3.54k | [[fallthrough]]; |
720 | 3.54k | case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK: |
721 | 3.54k | [[fallthrough]]; |
722 | 3.54k | case TFileFormatType::FORMAT_CSV_DEFLATE: |
723 | 3.54k | _line_reader = |
724 | 3.54k | NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(), |
725 | 3.54k | text_line_reader_ctx, _size, _start_offset); |
726 | | |
727 | 3.54k | break; |
728 | 77 | case TFileFormatType::FORMAT_PROTO: |
729 | 77 | _fields_splitter = std::make_unique<CsvProtoFieldSplitter>(); |
730 | 77 | _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader); |
731 | 77 | break; |
732 | 0 | default: |
733 | 0 | return Status::InternalError<false>( |
734 | 0 | "Unknown format type, cannot init line reader in csv reader, type={}", |
735 | 0 | _file_format_type); |
736 | 3.62k | } |
737 | 3.62k | return Status::OK(); |
738 | 3.62k | } |
739 | | |
740 | 5.24k | Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) { |
741 | 5.24k | return serde->deserialize_one_cell_from_csv(column, slice, _options); |
742 | 5.24k | } |
743 | | |
744 | | Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns, |
745 | 15.1M | size_t* rows) { |
746 | 15.1M | bool is_success = false; |
747 | | |
748 | 15.1M | RETURN_IF_ERROR(_line_split_to_values(line, &is_success)); |
749 | 15.1M | if (UNLIKELY(!is_success)) { |
750 | | // If not success, which means we met an invalid row, filter this row and return. |
751 | 539 | return Status::OK(); |
752 | 539 | } |
753 | | |
754 | 229M | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
755 | 214M | int col_idx = _col_idxs[i]; |
756 | | // col idx is out of range, fill with null format |
757 | 214M | auto value = col_idx < _split_values.size() |
758 | 215M | ? _split_values[col_idx] |
759 | 18.4E | : Slice(_options.null_format, _options.null_len); |
760 | | |
761 | 214M | IColumn* col_ptr = columns[i].get(); |
762 | 214M | if (!_is_load) { |
763 | 41.1M | col_ptr = columns[_file_slot_idx_map[i]].get(); |
764 | 41.1M | } |
765 | | |
766 | 216M | if (_use_nullable_string_opt[i]) { |
767 | | // For load task, we always read "string" from file. |
768 | | // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe. |
769 | | // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls. |
770 | 216M | RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value)); |
771 | 18.4E | } else { |
772 | 18.4E | RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value)); |
773 | 18.4E | } |
774 | 214M | } |
775 | 15.1M | ++(*rows); |
776 | | |
777 | 15.1M | return Status::OK(); |
778 | 15.1M | } |
779 | | |
780 | | void CsvReader::_reserve_nullable_string_columns(std::vector<MutableColumnPtr>& columns, |
781 | 11.5k | size_t batch_size) { |
782 | 322k | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
783 | 310k | if (!_use_nullable_string_opt[i]) { |
784 | 200k | continue; |
785 | 200k | } |
786 | 110k | IColumn* col_ptr = _is_load ? columns[i].get() : columns[_file_slot_idx_map[i]].get(); |
787 | | // The checked casts here (once per batch) guarantee the column types for the |
788 | | // unchecked per-row casts in _deserialize_nullable_string. |
789 | 110k | auto& null_column = assert_cast<ColumnNullable&>(*col_ptr); |
790 | 110k | auto& string_column = assert_cast<ColumnString&>(null_column.get_nested_column()); |
791 | | // Reserve up front so the per-row loop does not pay for incremental growth. |
792 | | // The string chars are not reserved because their total size is unpredictable. |
793 | 110k | string_column.get_offsets().reserve(string_column.size() + batch_size); |
794 | 110k | null_column.get_null_map_data().reserve(null_column.get_null_map_data().size() + |
795 | 110k | batch_size); |
796 | 110k | } |
797 | 11.5k | } |
798 | | |
799 | 12 | Status CsvReader::_fill_empty_line(std::vector<MutableColumnPtr>& columns, size_t* rows) { |
800 | 48 | for (int i = 0; i < _file_slot_descs.size(); ++i) { |
801 | 36 | IColumn* col_ptr = columns[i].get(); |
802 | 36 | if (!_is_load) { |
803 | 36 | col_ptr = columns[_file_slot_idx_map[i]].get(); |
804 | 36 | } |
805 | 36 | auto& null_column = assert_cast<ColumnNullable&>(*col_ptr); |
806 | 36 | null_column.insert_data(nullptr, 0); |
807 | 36 | } |
808 | 12 | ++(*rows); |
809 | 12 | return Status::OK(); |
810 | 12 | } |
811 | | |
812 | 14.9M | Status CsvReader::_validate_line(const Slice& line, bool* success) { |
813 | 14.9M | if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) { |
814 | 130 | if (!_is_load) { |
815 | 2 | return Status::InternalError<false>("Only support csv data in utf8 codec"); |
816 | 128 | } else { |
817 | 128 | _counter->num_rows_filtered++; |
818 | 128 | *success = false; |
819 | 128 | RETURN_IF_ERROR(_state->append_error_msg_to_file( |
820 | 128 | [&]() -> std::string { return std::string(line.data, line.size); }, |
821 | 128 | [&]() -> std::string { |
822 | 128 | return "Invalid file encoding: all CSV files must be UTF-8 encoded"; |
823 | 128 | })); |
824 | 128 | return Status::OK(); |
825 | 128 | } |
826 | 130 | } |
827 | 14.9M | *success = true; |
828 | 14.9M | return Status::OK(); |
829 | 14.9M | } |
830 | | |
831 | 15.1M | Status CsvReader::_line_split_to_values(const Slice& line, bool* success) { |
832 | 15.1M | _split_line(line); |
833 | | |
834 | 15.1M | if (_is_load) { |
835 | | // Only check for load task. For query task, the non exist column will be filled "null". |
836 | | // if actual column number in csv file is not equal to _file_slot_descs.size() |
837 | | // then filter this line. |
838 | 10.9M | bool ignore_col = false; |
839 | 10.9M | ignore_col = _params.__isset.file_attributes && |
840 | 10.9M | _params.file_attributes.__isset.ignore_csv_redundant_col && |
841 | 10.9M | _params.file_attributes.ignore_csv_redundant_col; |
842 | | |
843 | 10.9M | if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) || |
844 | 10.9M | (ignore_col && _split_values.size() < _file_slot_descs.size())) { |
845 | 544 | _counter->num_rows_filtered++; |
846 | 544 | *success = false; |
847 | 544 | RETURN_IF_ERROR(_state->append_error_msg_to_file( |
848 | 544 | [&]() -> std::string { return std::string(line.data, line.size); }, |
849 | 544 | [&]() -> std::string { |
850 | 544 | fmt::memory_buffer error_msg; |
851 | 544 | fmt::format_to(error_msg, |
852 | 544 | "Column count mismatch: expected {}, but found {}", |
853 | 544 | _file_slot_descs.size(), _split_values.size()); |
854 | 544 | std::string escaped_separator = |
855 | 544 | std::regex_replace(_value_separator, std::regex("\t"), "\\t"); |
856 | 544 | std::string escaped_delimiter = |
857 | 544 | std::regex_replace(_line_delimiter, std::regex("\n"), "\\n"); |
858 | 544 | fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator, |
859 | 544 | escaped_delimiter); |
860 | 544 | if (_enclose != 0) { |
861 | 544 | fmt::format_to(error_msg, " encl:{}", _enclose); |
862 | 544 | } |
863 | 544 | if (_escape != 0) { |
864 | 544 | fmt::format_to(error_msg, " esc:{}", _escape); |
865 | 544 | } |
866 | 544 | fmt::format_to(error_msg, ")"); |
867 | 544 | return fmt::to_string(error_msg); |
868 | 544 | })); |
869 | 539 | return Status::OK(); |
870 | 544 | } |
871 | 10.9M | } |
872 | | |
873 | 15.1M | *success = true; |
874 | 15.1M | return Status::OK(); |
875 | 15.1M | } |
876 | | |
877 | 15.1M | void CsvReader::_split_line(const Slice& line) { |
878 | 15.1M | _split_values.clear(); |
879 | 15.1M | _fields_splitter->split_line(line, &_split_values); |
880 | 15.1M | } |
881 | | |
882 | 720 | Status CsvReader::_parse_col_nums(size_t* col_nums) { |
883 | 720 | const uint8_t* ptr = nullptr; |
884 | 720 | size_t size = 0; |
885 | 720 | RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
886 | 720 | if (size == 0) { |
887 | 2 | return Status::InternalError<false>( |
888 | 2 | "The first line is empty, can not parse column numbers"); |
889 | 2 | } |
890 | 718 | if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) { |
891 | 5 | return Status::InternalError<false>("Only support csv data in utf8 codec"); |
892 | 5 | } |
893 | 713 | ptr = _remove_bom(ptr, size); |
894 | 713 | _split_line(Slice(ptr, size)); |
895 | 713 | *col_nums = _split_values.size(); |
896 | 713 | return Status::OK(); |
897 | 718 | } |
898 | | |
899 | 92 | Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) { |
900 | 92 | const uint8_t* ptr = nullptr; |
901 | 92 | size_t size = 0; |
902 | | // no use of _line_reader_eof |
903 | 92 | RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx)); |
904 | 92 | if (size == 0) { |
905 | 0 | return Status::InternalError<false>("The first line is empty, can not parse column names"); |
906 | 0 | } |
907 | 92 | if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) { |
908 | 0 | return Status::InternalError<false>("Only support csv data in utf8 codec"); |
909 | 0 | } |
910 | 92 | ptr = _remove_bom(ptr, size); |
911 | 92 | _split_line(Slice(ptr, size)); |
912 | 345 | for (auto _split_value : _split_values) { |
913 | 345 | col_names->emplace_back(_split_value.to_string()); |
914 | 345 | } |
915 | 92 | return Status::OK(); |
916 | 92 | } |
917 | | |
918 | | // TODO(ftw): parse type |
919 | 29 | Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) { |
920 | | // delete after. |
921 | 155 | for (size_t i = 0; i < col_nums; ++i) { |
922 | 126 | col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>())); |
923 | 126 | } |
924 | | |
925 | | // 1. check _line_reader_eof |
926 | | // 2. read line |
927 | | // 3. check utf8 |
928 | | // 4. check size |
929 | | // 5. check _split_values.size must equal to col_nums. |
930 | | // 6. fill col_types |
931 | 29 | return Status::OK(); |
932 | 29 | } |
933 | | |
934 | 12.7k | const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) { |
935 | 12.7k | if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) { |
936 | 7 | LOG(INFO) << "remove bom"; |
937 | 7 | constexpr size_t bom_size = 3; |
938 | 7 | size -= bom_size; |
939 | | // In enclose mode, column_sep_positions were computed on the original line |
940 | | // (including BOM). After shifting the pointer, we must adjust those positions |
941 | | // so they remain correct relative to the new start. |
942 | 7 | if (_enclose_reader_ctx) { |
943 | 1 | _enclose_reader_ctx->adjust_column_sep_positions(bom_size); |
944 | 1 | } |
945 | 7 | return ptr + bom_size; |
946 | 7 | } |
947 | 12.7k | return ptr; |
948 | 12.7k | } |
949 | | |
950 | 7.59k | Status CsvReader::close() { |
951 | 7.59k | if (_line_reader) { |
952 | 7.59k | _line_reader->close(); |
953 | 7.59k | } |
954 | | |
955 | 7.59k | if (_file_reader) { |
956 | 7.59k | RETURN_IF_ERROR(_file_reader->close()); |
957 | 7.59k | } |
958 | | |
959 | 7.59k | return Status::OK(); |
960 | 7.59k | } |
961 | | |
962 | | } // namespace doris |