be/src/format/csv/csv_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PlanNodes_types.h> |
21 | | #include <gen_cpp/internal_service.pb.h> |
22 | | |
23 | | #include <cstddef> |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | #include <unordered_map> |
28 | | #include <unordered_set> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/status.h" |
33 | | #include "core/data_type/data_type.h" |
34 | | #include "format/file_reader/new_plain_text_line_reader.h" |
35 | | #include "format/generic_reader.h" |
36 | | #include "io/file_factory.h" |
37 | | #include "io/fs/file_reader_writer_fwd.h" |
38 | | #include "util/decompressor.h" |
39 | | #include "util/slice.h" |
40 | | |
41 | | namespace doris { |
42 | | |
43 | | class SlotDescriptor; |
44 | | class RuntimeProfile; |
45 | | class RuntimeState; |
46 | | |
47 | | namespace io { |
48 | | struct IOContext; |
49 | | } // namespace io |
50 | | |
51 | | struct ScannerCounter; |
52 | | class Block; |
53 | | |
54 | | class LineFieldSplitterIf { |
55 | | public: |
56 | 24 | virtual ~LineFieldSplitterIf() = default; |
57 | | |
58 | | virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0; |
59 | | }; |
60 | | |
61 | | template <typename Splitter> |
62 | | class BaseLineFieldSplitter : public LineFieldSplitterIf { |
63 | | public: |
64 | 0 | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { |
65 | 0 | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); |
66 | 0 | } Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE |
67 | | }; |
68 | | |
69 | | class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> { |
70 | | public: |
71 | 0 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
72 | 0 | auto** row_ptr = reinterpret_cast<PDataRow**>(line.data); |
73 | 0 | PDataRow* row = *row_ptr; |
74 | 0 | for (const PDataColumn& col : row->col()) { |
75 | 0 | splitted_values->emplace_back(col.value()); |
76 | 0 | } |
77 | 0 | } |
78 | | }; |
79 | | |
80 | | template <typename Splitter> |
81 | | class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> { |
82 | | // Use a function pointer to reduce the overhead (found very effective during testing). |
83 | | using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*); |
84 | | |
85 | | public: |
86 | | explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
87 | | size_t value_sep_len = 1, char trimming_char = 0) |
88 | 24 | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { |
89 | 24 | if (trim_tailing_space) { |
90 | 0 | if (trim_ends) { |
91 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; |
92 | 0 | } else { |
93 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; |
94 | 0 | } |
95 | 24 | } else { |
96 | 24 | if (trim_ends) { |
97 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; |
98 | 24 | } else { |
99 | 24 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; |
100 | 24 | } |
101 | 24 | } |
102 | 24 | } _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc Line | Count | Source | 88 | 24 | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { | 89 | 24 | if (trim_tailing_space) { | 90 | 0 | if (trim_ends) { | 91 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; | 92 | 0 | } else { | 93 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; | 94 | 0 | } | 95 | 24 | } else { | 96 | 24 | if (trim_ends) { | 97 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; | 98 | 24 | } else { | 99 | 24 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; | 100 | 24 | } | 101 | 24 | } | 102 | 24 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc |
103 | | |
104 | 0 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
105 | 0 | static_cast<Splitter*>(this)->do_split(line, splitted_values); |
106 | 0 | } Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE |
107 | | |
108 | | protected: |
109 | | const char _trimming_char; |
110 | | const size_t _value_sep_len; |
111 | | ProcessValueFunc process_value_func; |
112 | | |
113 | | private: |
114 | | template <bool TrimTailingSpace, bool TrimEnds> |
115 | | inline static void _process_value(const char* data, size_t start_offset, size_t value_len, |
116 | 58 | char trimming_char, std::vector<Slice>* splitted_values) { |
117 | 58 | if constexpr (TrimTailingSpace) { |
118 | 0 | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { |
119 | 0 | --value_len; |
120 | 0 | } |
121 | 0 | } |
122 | 58 | if constexpr (TrimEnds) { |
123 | 0 | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && |
124 | 0 | *(data + start_offset + value_len - 1) == trimming_char; |
125 | 0 | if (trim_cond) { |
126 | 0 | ++(start_offset); |
127 | 0 | value_len -= 2; |
128 | 0 | } |
129 | 0 | } |
130 | 58 | splitted_values->emplace_back(data + start_offset, value_len); |
131 | 58 | } Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 116 | 58 | char trimming_char, std::vector<Slice>* splitted_values) { | 117 | | if constexpr (TrimTailingSpace) { | 118 | | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 119 | | --value_len; | 120 | | } | 121 | | } | 122 | | if constexpr (TrimEnds) { | 123 | | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 124 | | *(data + start_offset + value_len - 1) == trimming_char; | 125 | | if (trim_cond) { | 126 | | ++(start_offset); | 127 | | value_len -= 2; | 128 | | } | 129 | | } | 130 | 58 | splitted_values->emplace_back(data + start_offset, value_len); | 131 | 58 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE |
132 | | }; |
133 | | |
134 | | class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> { |
135 | | public: |
136 | | explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
137 | | std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx, |
138 | | size_t value_sep_len = 1, char trimming_char = 0) |
139 | 0 | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
140 | 0 | _text_line_reader_ctx(std::move(line_reader_ctx)) {} |
141 | | |
142 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
143 | | |
144 | | private: |
145 | | std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx; |
146 | | }; |
147 | | |
148 | | class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> { |
149 | | public: |
150 | | explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
151 | | std::string value_sep, size_t value_sep_len = 1, |
152 | | char trimming_char = 0) |
153 | 0 | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
154 | 0 | _value_sep(std::move(value_sep)) { |
155 | 0 | is_single_char_delim = (value_sep_len == 1); |
156 | 0 | } |
157 | | |
158 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
159 | | |
160 | | private: |
161 | | void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values); |
162 | | void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values); |
163 | | |
164 | | bool is_single_char_delim; |
165 | | std::string _value_sep; |
166 | | }; |
167 | | |
168 | | class CsvReader : public GenericReader { |
169 | | ENABLE_FACTORY_CREATOR(CsvReader); |
170 | | |
171 | | public: |
172 | | CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
173 | | const TFileScanRangeParams& params, const TFileRangeDesc& range, |
174 | | const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx, |
175 | | std::shared_ptr<io::IOContext> io_ctx_holder = nullptr); |
176 | 0 | ~CsvReader() override = default; |
177 | | |
178 | | Status init_reader(bool is_load); |
179 | | Status get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
180 | | Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type, |
181 | | std::unordered_set<std::string>* missing_cols) override; |
182 | | |
183 | | Status init_schema_reader() override; |
184 | | // Get the schema of the csv file from its first line or its first two lines. |
185 | | // If the file format is FORMAT_CSV_DEFLATE and |
186 | | // 1. header_type is empty, get the schema from the first line. |
187 | | // 2. header_type is CSV_WITH_NAMES, get the schema from the first line. |
188 | | // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get the schema from the first two lines. |
189 | | Status get_parsed_schema(std::vector<std::string>* col_names, |
190 | | std::vector<DataTypePtr>* col_types) override; |
191 | | |
192 | | Status close() override; |
193 | | |
194 | | protected: |
195 | | // init options for type serde |
196 | | virtual Status _init_options(); |
197 | | virtual Status _create_line_reader(); |
198 | | virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice); |
199 | | virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice); |
200 | | // Check the UTF-8 encoding of a line. |
201 | | // Returns an error status to stop processing. |
202 | | // If Status::OK is returned but "success" is false, this is a load request |
203 | | // and the line is skipped as an unqualified row; processing should continue. |
204 | | virtual Status _validate_line(const Slice& line, bool* success); |
205 | | |
206 | | RuntimeProfile* _profile = nullptr; |
207 | | const TFileScanRangeParams& _params; |
208 | | std::string _value_separator; |
209 | | size_t _value_separator_length; |
210 | | std::string _line_delimiter; |
211 | | size_t _line_delimiter_length; |
212 | | char _escape = 0; |
213 | | DataTypeSerDeSPtrs _serdes; |
214 | | DataTypeSerDe::FormatOptions _options; |
215 | | std::unique_ptr<LineFieldSplitterIf> _fields_splitter; |
216 | | int64_t _start_offset; |
217 | | int64_t _size; |
218 | | io::FileReaderSPtr _file_reader; |
219 | | std::unique_ptr<LineReader> _line_reader; |
220 | | std::unique_ptr<Decompressor> _decompressor; |
221 | | |
222 | | private: |
223 | | Status _create_decompressor(); |
224 | | Status _create_file_reader(bool need_schema); |
225 | | Status _fill_dest_columns(const Slice& line, Block* block, |
226 | | std::vector<MutableColumnPtr>& columns, size_t* rows); |
227 | | Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows); |
228 | | Status _line_split_to_values(const Slice& line, bool* success); |
229 | | void _split_line(const Slice& line); |
230 | | void _init_system_properties(); |
231 | | void _init_file_description(); |
232 | | |
233 | | Status _parse_col_nums(size_t* col_nums); |
234 | | Status _parse_col_names(std::vector<std::string>* col_names); |
235 | | // TODO(ftw): parse type |
236 | | Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types); |
237 | | |
238 | | // If the CSV file is UTF-8 encoded with a BOM, |
239 | | // remove the first 3 bytes at the beginning of the file |
240 | | // and set size = size - 3. |
241 | | const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size); |
242 | | |
243 | | RuntimeState* _state = nullptr; |
244 | | ScannerCounter* _counter = nullptr; |
245 | | const TFileRangeDesc& _range; |
246 | | io::FileSystemProperties _system_properties; |
247 | | io::FileDescription _file_description; |
248 | | const std::vector<SlotDescriptor*>& _file_slot_descs; |
249 | | // Only for query tasks: maps each file slot to its column position in the block. |
250 | | // e.g., there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
251 | | // and these 3 columns appear in the block as k2, k3, k1, |
252 | | // then _file_slot_idx_map will save: 2, 0, 1 |
253 | | std::vector<int> _file_slot_idx_map; |
254 | | // Only for query tasks: the indexes of the columns that need to be read. |
255 | | // e.g., there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
256 | | // and their corresponding positions in the file are 0, 3, 5. |
257 | | // So _col_idxs will be: <0, 3, 5> |
258 | | std::vector<int> _col_idxs; |
259 | | // True if this is a load task |
260 | | bool _is_load = false; |
261 | | bool _line_reader_eof; |
262 | | // For schema reader |
263 | | size_t _read_line = 0; |
264 | | bool _is_parse_name = false; |
265 | | TFileFormatType::type _file_format_type; |
266 | | bool _is_proto_format; |
267 | | TFileCompressType::type _file_compress_type; |
268 | | |
269 | | // When the fetched range starts from 0 and header_type="csv_with_names", skip the first line. |
270 | | // When the fetched range starts from 0 and header_type="csv_with_names_and_types", skip the first two lines. |
271 | | // When the fetched range doesn't start from 0, always skip the first line. |
272 | | int _skip_lines; |
273 | | char _enclose = 0; |
274 | | bool _trim_double_quotes = false; |
275 | | bool _trim_tailing_spaces = false; |
276 | | bool _keep_cr = false; |
277 | | bool _empty_field_as_null = false; |
278 | | |
279 | | io::IOContext* _io_ctx = nullptr; |
280 | | std::shared_ptr<io::IOContext> _io_ctx_holder; |
281 | | // Stored to adjust column_sep_positions when BOM is removed in enclose mode |
282 | | std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx; |
283 | | // Saves the source text after it has been split into values. |
284 | | std::vector<Slice> _split_values; |
285 | | std::vector<int> _use_nullable_string_opt; |
286 | | }; |
287 | | } // namespace doris |