Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/internal_service.pb.h>

#include <cstddef>
#include <cstdint>
#include <cstring>
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/status.h"
#include "core/data_type/data_type.h"
#include "format/file_reader/new_plain_text_line_reader.h"
#include "format/table/table_format_reader.h"
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/decompressor.h"
#include "util/slice.h"
40 | | |
41 | | namespace doris { |
42 | | #include "common/compile_check_begin.h" |
43 | | |
44 | | class SlotDescriptor; |
45 | | class RuntimeProfile; |
46 | | class RuntimeState; |
47 | | |
48 | | namespace io { |
49 | | struct IOContext; |
50 | | } // namespace io |
51 | | |
52 | | struct ScannerCounter; |
53 | | class Block; |
54 | | |
55 | | /// CSV/Text-specific initialization context. |
56 | | struct CsvInitContext final : public ReaderInitContext { |
57 | | bool is_load = false; |
58 | | }; |
59 | | |
60 | | class LineFieldSplitterIf { |
61 | | public: |
62 | 24 | virtual ~LineFieldSplitterIf() = default; |
63 | | |
64 | | virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0; |
65 | | }; |
66 | | |
67 | | template <typename Splitter> |
68 | | class BaseLineFieldSplitter : public LineFieldSplitterIf { |
69 | | public: |
70 | 0 | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { |
71 | 0 | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); |
72 | 0 | } Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE |
73 | | }; |
74 | | |
75 | | class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> { |
76 | | public: |
77 | 0 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
78 | 0 | auto** row_ptr = reinterpret_cast<PDataRow**>(line.data); |
79 | 0 | PDataRow* row = *row_ptr; |
80 | 0 | for (const PDataColumn& col : row->col()) { |
81 | 0 | splitted_values->emplace_back(col.value()); |
82 | 0 | } |
83 | 0 | } |
84 | | }; |
85 | | |
86 | | template <typename Splitter> |
87 | | class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> { |
88 | | // using a function ptr to decrease the overhead (found very effective during test). |
89 | | using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*); |
90 | | |
91 | | public: |
92 | | explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
93 | | size_t value_sep_len = 1, char trimming_char = 0) |
94 | 24 | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { |
95 | 24 | if (trim_tailing_space) { |
96 | 0 | if (trim_ends) { |
97 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; |
98 | 0 | } else { |
99 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; |
100 | 0 | } |
101 | 24 | } else { |
102 | 24 | if (trim_ends) { |
103 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; |
104 | 24 | } else { |
105 | 24 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; |
106 | 24 | } |
107 | 24 | } |
108 | 24 | } _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc Line | Count | Source | 94 | 24 | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { | 95 | 24 | if (trim_tailing_space) { | 96 | 0 | if (trim_ends) { | 97 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; | 98 | 0 | } else { | 99 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; | 100 | 0 | } | 101 | 24 | } else { | 102 | 24 | if (trim_ends) { | 103 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; | 104 | 24 | } else { | 105 | 24 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; | 106 | 24 | } | 107 | 24 | } | 108 | 24 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc |
109 | | |
110 | 0 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
111 | 0 | static_cast<Splitter*>(this)->do_split(line, splitted_values); |
112 | 0 | } Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE |
113 | | |
114 | | protected: |
115 | | const char _trimming_char; |
116 | | const size_t _value_sep_len; |
117 | | ProcessValueFunc process_value_func; |
118 | | |
119 | | private: |
120 | | template <bool TrimTailingSpace, bool TrimEnds> |
121 | | inline static void _process_value(const char* data, size_t start_offset, size_t value_len, |
122 | 58 | char trimming_char, std::vector<Slice>* splitted_values) { |
123 | 58 | if constexpr (TrimTailingSpace) { |
124 | 0 | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { |
125 | 0 | --value_len; |
126 | 0 | } |
127 | 0 | } |
128 | 58 | if constexpr (TrimEnds) { |
129 | 0 | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && |
130 | 0 | *(data + start_offset + value_len - 1) == trimming_char; |
131 | 0 | if (trim_cond) { |
132 | 0 | ++(start_offset); |
133 | 0 | value_len -= 2; |
134 | 0 | } |
135 | 0 | } |
136 | 58 | splitted_values->emplace_back(data + start_offset, value_len); |
137 | 58 | } Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 122 | 58 | char trimming_char, std::vector<Slice>* splitted_values) { | 123 | | if constexpr (TrimTailingSpace) { | 124 | | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 125 | | --value_len; | 126 | | } | 127 | | } | 128 | | if constexpr (TrimEnds) { | 129 | | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 130 | | *(data + start_offset + value_len - 1) == trimming_char; | 131 | | if (trim_cond) { | 132 | | ++(start_offset); | 133 | | value_len -= 2; | 134 | | } | 135 | | } | 136 | 58 | splitted_values->emplace_back(data + start_offset, value_len); | 137 | 58 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE |
138 | | }; |
139 | | |
140 | | class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> { |
141 | | public: |
142 | | explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
143 | | std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx, |
144 | | size_t value_sep_len = 1, char trimming_char = 0) |
145 | 0 | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
146 | 0 | _text_line_reader_ctx(std::move(line_reader_ctx)) {} |
147 | | |
148 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
149 | | |
150 | | private: |
151 | | std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx; |
152 | | }; |
153 | | |
154 | | class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> { |
155 | | public: |
156 | | explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
157 | | std::string value_sep, size_t value_sep_len = 1, |
158 | | char trimming_char = 0) |
159 | 0 | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
160 | 0 | _value_sep(std::move(value_sep)) { |
161 | 0 | is_single_char_delim = (value_sep_len == 1); |
162 | 0 | } |
163 | | |
164 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
165 | | |
166 | | private: |
167 | | void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values); |
168 | | void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values); |
169 | | |
170 | | bool is_single_char_delim; |
171 | | std::string _value_sep; |
172 | | }; |
173 | | |
174 | | class CsvReader : public TableFormatReader { |
175 | | ENABLE_FACTORY_CREATOR(CsvReader); |
176 | | |
177 | | public: |
178 | | CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
179 | | const TFileScanRangeParams& params, const TFileRangeDesc& range, |
180 | | const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx, |
181 | | std::shared_ptr<io::IOContext> io_ctx_holder = nullptr); |
182 | 0 | ~CsvReader() override = default; |
183 | | |
184 | | Status init_reader(bool is_load); |
185 | | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
186 | | Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override; |
187 | | |
188 | | Status init_schema_reader() override; |
189 | | // get schema of csv file from first one line or first two lines. |
190 | | // if file format is FORMAT_CSV_DEFLATE and if |
191 | | // 1. header_type is empty, get schema from first line. |
192 | | // 2. header_type is CSV_WITH_NAMES, get schema from first line. |
193 | | // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line. |
194 | | Status get_parsed_schema(std::vector<std::string>* col_names, |
195 | | std::vector<DataTypePtr>* col_types) override; |
196 | | |
197 | | Status close() override; |
198 | | |
199 | | protected: |
200 | | // ---- Unified init_reader(ReaderInitContext*) overrides ---- |
201 | | Status _open_file_reader(ReaderInitContext* ctx) override; |
202 | | Status _do_init_reader(ReaderInitContext* ctx) override; |
203 | | |
204 | | // init options for type serde |
205 | | virtual Status _init_options(); |
206 | | virtual Status _create_line_reader(); |
207 | | virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice); |
208 | | virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice); |
209 | | // check the utf8 encoding of a line. |
210 | | // return error status to stop processing. |
211 | | // If return Status::OK but "success" is false, which means this is load request |
212 | | // and the line is skipped as unqualified row, and the process should continue. |
213 | | virtual Status _validate_line(const Slice& line, bool* success); |
214 | | |
215 | | RuntimeProfile* _profile = nullptr; |
216 | | const TFileScanRangeParams& _params; |
217 | | std::string _value_separator; |
218 | | size_t _value_separator_length; |
219 | | std::string _line_delimiter; |
220 | | size_t _line_delimiter_length; |
221 | | char _escape = 0; |
222 | | DataTypeSerDeSPtrs _serdes; |
223 | | DataTypeSerDe::FormatOptions _options; |
224 | | std::unique_ptr<LineFieldSplitterIf> _fields_splitter; |
225 | | int64_t _start_offset; |
226 | | int64_t _size; |
227 | | io::FileReaderSPtr _file_reader; |
228 | | std::unique_ptr<LineReader> _line_reader; |
229 | | std::unique_ptr<Decompressor> _decompressor; |
230 | | |
231 | | private: |
232 | | Status _create_decompressor(); |
233 | | Status _create_file_reader(bool need_schema); |
234 | | Status _fill_dest_columns(const Slice& line, Block* block, |
235 | | std::vector<MutableColumnPtr>& columns, size_t* rows); |
236 | | Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows); |
237 | | Status _line_split_to_values(const Slice& line, bool* success); |
238 | | void _split_line(const Slice& line); |
239 | | void _init_system_properties(); |
240 | | void _init_file_description(); |
241 | | |
242 | | Status _parse_col_nums(size_t* col_nums); |
243 | | Status _parse_col_names(std::vector<std::string>* col_names); |
244 | | // TODO(ftw): parse type |
245 | | Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types); |
246 | | |
247 | | // If the CSV file is an UTF8 encoding with BOM, |
248 | | // then remove the first 3 bytes at the beginning of this file |
249 | | // and set size = size - 3. |
250 | | const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size); |
251 | | |
252 | | RuntimeState* _state = nullptr; |
253 | | ScannerCounter* _counter = nullptr; |
254 | | const TFileRangeDesc& _range; |
255 | | io::FileSystemProperties _system_properties; |
256 | | io::FileDescription _file_description; |
257 | | const std::vector<SlotDescriptor*>& _file_slot_descs; |
258 | | // Only for query task, save the file slot to columns in block map. |
259 | | // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
260 | | // and this 3 columns in block are k2, k3, k1, |
261 | | // the _file_slot_idx_map will save: 2, 0, 1 |
262 | | std::vector<int> _file_slot_idx_map; |
263 | | // Only for query task, save the columns' index which need to be read. |
264 | | // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
265 | | // and the corresponding position in file is 0, 3, 5. |
266 | | // So the _col_idx will be: <0, 3, 5> |
267 | | std::vector<int> _col_idxs; |
268 | | // True if this is a load task |
269 | | bool _is_load = false; |
270 | | bool _line_reader_eof; |
271 | | // For schema reader |
272 | | size_t _read_line = 0; |
273 | | bool _is_parse_name = false; |
274 | | TFileFormatType::type _file_format_type; |
275 | | bool _is_proto_format; |
276 | | TFileCompressType::type _file_compress_type; |
277 | | |
278 | | // When we fetch range start from 0, header_type="csv_with_names" skip first line |
279 | | // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line |
280 | | // When we fetch range doesn't start from 0 will always skip the first line |
281 | | int _skip_lines; |
282 | | char _enclose = 0; |
283 | | bool _trim_double_quotes = false; |
284 | | bool _trim_tailing_spaces = false; |
285 | | bool _keep_cr = false; |
286 | | bool _empty_field_as_null = false; |
287 | | |
288 | | io::IOContext* _io_ctx = nullptr; |
289 | | std::shared_ptr<io::IOContext> _io_ctx_holder; |
290 | | // Stored to adjust column_sep_positions when BOM is removed in enclose mode |
291 | | std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx; |
292 | | // save source text which have been splitted. |
293 | | std::vector<Slice> _split_values; |
294 | | std::vector<int> _use_nullable_string_opt; |
295 | | }; |
296 | | #include "common/compile_check_end.h" |
297 | | } // namespace doris |