be/src/format/csv/csv_reader.h
Line | Count | Source |
1 | | // Licensed to the Apache Software Foundation (ASF) under one |
2 | | // or more contributor license agreements. See the NOTICE file |
3 | | // distributed with this work for additional information |
4 | | // regarding copyright ownership. The ASF licenses this file |
5 | | // to you under the Apache License, Version 2.0 (the |
6 | | // "License"); you may not use this file except in compliance |
7 | | // with the License. You may obtain a copy of the License at |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, |
12 | | // software distributed under the License is distributed on an |
13 | | // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
14 | | // KIND, either express or implied. See the License for the |
15 | | // specific language governing permissions and limitations |
16 | | // under the License. |
17 | | |
18 | | #pragma once |
19 | | |
20 | | #include <gen_cpp/PlanNodes_types.h> |
21 | | #include <gen_cpp/internal_service.pb.h> |
22 | | |
23 | | #include <cstddef> |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <string> |
27 | | #include <unordered_map> |
28 | | #include <unordered_set> |
29 | | #include <utility> |
30 | | #include <vector> |
31 | | |
32 | | #include "common/status.h" |
33 | | #include "core/data_type/data_type.h" |
34 | | #include "format/file_reader/new_plain_text_line_reader.h" |
35 | | #include "format/table/table_format_reader.h" |
36 | | #include "io/file_factory.h" |
37 | | #include "io/fs/file_reader_writer_fwd.h" |
38 | | #include "util/decompressor.h" |
39 | | #include "util/slice.h" |
40 | | |
41 | | namespace doris { |
42 | | |
43 | | class SlotDescriptor; |
44 | | class RuntimeProfile; |
45 | | class RuntimeState; |
46 | | |
47 | | namespace io { |
48 | | struct IOContext; |
49 | | } // namespace io |
50 | | |
51 | | struct ScannerCounter; |
52 | | class Block; |
53 | | |
54 | | /// CSV/Text-specific initialization context. |
55 | | struct CsvInitContext final : public ReaderInitContext { |
56 | | bool is_load = false; |
57 | | }; |
58 | | |
59 | | class LineFieldSplitterIf { |
60 | | public: |
61 | 8.28k | virtual ~LineFieldSplitterIf() = default; |
62 | | |
63 | | virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0; |
64 | | }; |
65 | | |
66 | | template <typename Splitter> |
67 | | class BaseLineFieldSplitter : public LineFieldSplitterIf { |
68 | | public: |
69 | 14.9M | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { |
70 | 14.9M | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); |
71 | 14.9M | } _ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Line | Count | Source | 69 | 148 | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { | 70 | 148 | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); | 71 | 148 | } |
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Line | Count | Source | 69 | 878 | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { | 70 | 878 | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); | 71 | 878 | } |
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Line | Count | Source | 69 | 14.6M | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { | 70 | 14.6M | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); | 71 | 14.6M | } |
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE Line | Count | Source | 69 | 252k | inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final { | 70 | 252k | static_cast<Splitter*>(this)->split_line_impl(line, splitted_values); | 71 | 252k | } |
|
72 | | }; |
73 | | |
74 | | class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> { |
75 | | public: |
76 | 148 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
77 | 148 | auto** row_ptr = reinterpret_cast<PDataRow**>(line.data); |
78 | 148 | PDataRow* row = *row_ptr; |
79 | 732 | for (const PDataColumn& col : row->col()) { |
80 | 732 | splitted_values->emplace_back(col.value()); |
81 | 732 | } |
82 | 148 | } |
83 | | }; |
84 | | |
85 | | template <typename Splitter> |
86 | | class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> { |
87 | | // Use a function pointer to reduce the overhead (found to be very effective in testing). |
88 | | using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*); |
89 | | |
90 | | public: |
91 | | explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
92 | | size_t value_sep_len = 1, char trimming_char = 0) |
93 | 8.20k | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { |
94 | 8.20k | if (trim_tailing_space) { |
95 | 50 | if (trim_ends) { |
96 | 10 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; |
97 | 40 | } else { |
98 | 40 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; |
99 | 40 | } |
100 | 8.15k | } else { |
101 | 8.15k | if (trim_ends) { |
102 | 145 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; |
103 | 8.00k | } else { |
104 | 8.00k | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; |
105 | 8.00k | } |
106 | 8.15k | } |
107 | 8.20k | } _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc Line | Count | Source | 93 | 4.64k | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { | 94 | 4.64k | if (trim_tailing_space) { | 95 | 0 | if (trim_ends) { | 96 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; | 97 | 0 | } else { | 98 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; | 99 | 0 | } | 100 | 4.64k | } else { | 101 | 4.64k | if (trim_ends) { | 102 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; | 103 | 4.64k | } else { | 104 | 4.64k | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; | 105 | 4.64k | } | 106 | 4.64k | } | 107 | 4.64k | } |
_ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc Line | Count | Source | 93 | 3.40k | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { | 94 | 3.40k | if (trim_tailing_space) { | 95 | 40 | if (trim_ends) { | 96 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; | 97 | 40 | } else { | 98 | 40 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; | 99 | 40 | } | 100 | 3.36k | } else { | 101 | 3.36k | if (trim_ends) { | 102 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; | 103 | 3.36k | } else { | 104 | 3.36k | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; | 105 | 3.36k | } | 106 | 3.36k | } | 107 | 3.40k | } |
_ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc Line | Count | Source | 93 | 155 | : _trimming_char(trimming_char), _value_sep_len(value_sep_len) { | 94 | 155 | if (trim_tailing_space) { | 95 | 10 | if (trim_ends) { | 96 | 10 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>; | 97 | 10 | } else { | 98 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>; | 99 | 0 | } | 100 | 145 | } else { | 101 | 145 | if (trim_ends) { | 102 | 145 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>; | 103 | 145 | } else { | 104 | 0 | process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>; | 105 | 0 | } | 106 | 145 | } | 107 | 155 | } |
|
108 | | |
109 | 14.9M | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { |
110 | 14.9M | static_cast<Splitter*>(this)->do_split(line, splitted_values); |
111 | 14.9M | } _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Line | Count | Source | 109 | 878 | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { | 110 | 878 | static_cast<Splitter*>(this)->do_split(line, splitted_values); | 111 | 878 | } |
_ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Line | Count | Source | 109 | 14.7M | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { | 110 | 14.7M | static_cast<Splitter*>(this)->do_split(line, splitted_values); | 111 | 14.7M | } |
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE Line | Count | Source | 109 | 252k | inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) { | 110 | 252k | static_cast<Splitter*>(this)->do_split(line, splitted_values); | 111 | 252k | } |
|
112 | | |
113 | | protected: |
114 | | const char _trimming_char; |
115 | | const size_t _value_sep_len; |
116 | | ProcessValueFunc process_value_func; |
117 | | |
118 | | private: |
119 | | template <bool TrimTailingSpace, bool TrimEnds> |
120 | | inline static void _process_value(const char* data, size_t start_offset, size_t value_len, |
121 | 468M | char trimming_char, std::vector<Slice>* splitted_values) { |
122 | 468M | if constexpr (TrimTailingSpace) { |
123 | 520 | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { |
124 | 0 | --value_len; |
125 | 0 | } |
126 | 520 | } |
127 | 468M | if constexpr (TrimEnds) { |
128 | 3.85k | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && |
129 | 3.85k | *(data + start_offset + value_len - 1) == trimming_char; |
130 | 3.85k | if (trim_cond) { |
131 | 568 | ++(start_offset); |
132 | 568 | value_len -= 2; |
133 | 568 | } |
134 | 3.85k | } |
135 | 468M | splitted_values->emplace_back(data + start_offset, value_len); |
136 | 468M | } _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 121 | 100 | char trimming_char, std::vector<Slice>* splitted_values) { | 122 | 100 | if constexpr (TrimTailingSpace) { | 123 | 100 | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 124 | 0 | --value_len; | 125 | 0 | } | 126 | 100 | } | 127 | 100 | if constexpr (TrimEnds) { | 128 | 100 | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 129 | 100 | *(data + start_offset + value_len - 1) == trimming_char; | 130 | 100 | if (trim_cond) { | 131 | 80 | ++(start_offset); | 132 | 80 | value_len -= 2; | 133 | 80 | } | 134 | 100 | } | 135 | 100 | splitted_values->emplace_back(data + start_offset, value_len); | 136 | 100 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 121 | 3.75k | char trimming_char, std::vector<Slice>* splitted_values) { | 122 | | if constexpr (TrimTailingSpace) { | 123 | | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 124 | | --value_len; | 125 | | } | 126 | | } | 127 | 3.75k | if constexpr (TrimEnds) { | 128 | 3.75k | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 129 | 3.75k | *(data + start_offset + value_len - 1) == trimming_char; | 130 | 3.75k | if (trim_cond) { | 131 | 488 | ++(start_offset); | 132 | 488 | value_len -= 2; | 133 | 488 | } | 134 | 3.75k | } | 135 | 3.75k | splitted_values->emplace_back(data + start_offset, value_len); | 136 | 3.75k | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 121 | 420 | char trimming_char, std::vector<Slice>* splitted_values) { | 122 | 420 | if constexpr (TrimTailingSpace) { | 123 | 420 | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 124 | 0 | --value_len; | 125 | 0 | } | 126 | 420 | } | 127 | | if constexpr (TrimEnds) { | 128 | | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 129 | | *(data + start_offset + value_len - 1) == trimming_char; | 130 | | if (trim_cond) { | 131 | | ++(start_offset); | 132 | | value_len -= 2; | 133 | | } | 134 | | } | 135 | 420 | splitted_values->emplace_back(data + start_offset, value_len); | 136 | 420 | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 121 | 458M | char trimming_char, std::vector<Slice>* splitted_values) { | 122 | | if constexpr (TrimTailingSpace) { | 123 | | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 124 | | --value_len; | 125 | | } | 126 | | } | 127 | | if constexpr (TrimEnds) { | 128 | | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 129 | | *(data + start_offset + value_len - 1) == trimming_char; | 130 | | if (trim_cond) { | 131 | | ++(start_offset); | 132 | | value_len -= 2; | 133 | | } | 134 | | } | 135 | 458M | splitted_values->emplace_back(data + start_offset, value_len); | 136 | 458M | } |
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE Line | Count | Source | 121 | 9.61M | char trimming_char, std::vector<Slice>* splitted_values) { | 122 | | if constexpr (TrimTailingSpace) { | 123 | | while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') { | 124 | | --value_len; | 125 | | } | 126 | | } | 127 | | if constexpr (TrimEnds) { | 128 | | const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char && | 129 | | *(data + start_offset + value_len - 1) == trimming_char; | 130 | | if (trim_cond) { | 131 | | ++(start_offset); | 132 | | value_len -= 2; | 133 | | } | 134 | | } | 135 | 9.61M | splitted_values->emplace_back(data + start_offset, value_len); | 136 | 9.61M | } |
|
137 | | }; |
138 | | |
139 | | class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> { |
140 | | public: |
141 | | explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
142 | | std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx, |
143 | | size_t value_sep_len = 1, char trimming_char = 0) |
144 | 155 | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
145 | 155 | _text_line_reader_ctx(std::move(line_reader_ctx)) {} |
146 | | |
147 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
148 | | |
149 | | private: |
150 | | std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx; |
151 | | }; |
152 | | |
153 | | class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> { |
154 | | public: |
155 | | explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends, |
156 | | std::string value_sep, size_t value_sep_len = 1, |
157 | | char trimming_char = 0) |
158 | 3.40k | : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char), |
159 | 3.40k | _value_sep(std::move(value_sep)) { |
160 | 3.40k | is_single_char_delim = (value_sep_len == 1); |
161 | 3.40k | } |
162 | | |
163 | | void do_split(const Slice& line, std::vector<Slice>* splitted_values); |
164 | | |
165 | | private: |
166 | | void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values); |
167 | | void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values); |
168 | | |
169 | | bool is_single_char_delim; |
170 | | std::string _value_sep; |
171 | | }; |
172 | | |
173 | | class CsvReader : public TableFormatReader { |
174 | | ENABLE_FACTORY_CREATOR(CsvReader); |
175 | | |
176 | | public: |
177 | | CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter, |
178 | | const TFileScanRangeParams& params, const TFileRangeDesc& range, |
179 | | const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx, |
180 | | std::shared_ptr<io::IOContext> io_ctx_holder = nullptr); |
181 | 8.18k | ~CsvReader() override = default; |
182 | | |
183 | | Status init_reader(bool is_load); |
184 | | Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override; |
185 | | Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override; |
186 | | |
187 | | Status init_schema_reader() override; |
188 | | // Get the schema of the CSV file from its first line or its first two lines. |
189 | | // If the file format is FORMAT_CSV_DEFLATE and |
190 | | // 1. header_type is empty, get the schema from the first line. |
191 | | // 2. header_type is CSV_WITH_NAMES, get the schema from the first line. |
192 | | // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get the schema from the first two lines. |
193 | | Status get_parsed_schema(std::vector<std::string>* col_names, |
194 | | std::vector<DataTypePtr>* col_types) override; |
195 | | |
196 | | Status close() override; |
197 | | |
198 | | protected: |
199 | | // ---- Unified init_reader(ReaderInitContext*) overrides ---- |
200 | | Status _open_file_reader(ReaderInitContext* ctx) override; |
201 | | Status _do_init_reader(ReaderInitContext* ctx) override; |
202 | | |
203 | | // init options for type serde |
204 | | virtual Status _init_options(); |
205 | | virtual Status _create_line_reader(); |
206 | | virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice); |
207 | | virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice); |
208 | | // Check the UTF-8 encoding of a line. |
209 | | // Returns an error status to stop processing. |
210 | | // If it returns Status::OK but "success" is false, this is a load request, |
211 | | // the line is skipped as an unqualified row, and processing should continue. |
212 | | virtual Status _validate_line(const Slice& line, bool* success); |
213 | | |
214 | | RuntimeProfile* _profile = nullptr; |
215 | | const TFileScanRangeParams& _params; |
216 | | std::string _value_separator; |
217 | | size_t _value_separator_length; |
218 | | std::string _line_delimiter; |
219 | | size_t _line_delimiter_length; |
220 | | char _escape = 0; |
221 | | DataTypeSerDeSPtrs _serdes; |
222 | | DataTypeSerDe::FormatOptions _options; |
223 | | std::unique_ptr<LineFieldSplitterIf> _fields_splitter; |
224 | | int64_t _start_offset; |
225 | | int64_t _size; |
226 | | io::FileReaderSPtr _file_reader; |
227 | | std::unique_ptr<LineReader> _line_reader; |
228 | | std::unique_ptr<Decompressor> _decompressor; |
229 | | |
230 | | private: |
231 | | Status _create_decompressor(); |
232 | | Status _create_file_reader(bool need_schema); |
233 | | Status _fill_dest_columns(const Slice& line, Block* block, |
234 | | std::vector<MutableColumnPtr>& columns, size_t* rows); |
235 | | Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows); |
236 | | Status _line_split_to_values(const Slice& line, bool* success); |
237 | | void _split_line(const Slice& line); |
238 | | void _init_system_properties(); |
239 | | void _init_file_description(); |
240 | | |
241 | | Status _parse_col_nums(size_t* col_nums); |
242 | | Status _parse_col_names(std::vector<std::string>* col_names); |
243 | | // TODO(ftw): parse type |
244 | | Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types); |
245 | | |
246 | | // If the CSV file is UTF-8 encoded with a BOM, |
247 | | // remove the first 3 bytes at the beginning of the file |
248 | | // and set size = size - 3. |
249 | | const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size); |
250 | | |
251 | | RuntimeState* _state = nullptr; |
252 | | ScannerCounter* _counter = nullptr; |
253 | | const TFileRangeDesc& _range; |
254 | | io::FileSystemProperties _system_properties; |
255 | | io::FileDescription _file_description; |
256 | | const std::vector<SlotDescriptor*>& _file_slot_descs; |
257 | | // Only for query tasks: saves the map from file slots to columns in the block. |
258 | | // e.g., there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
259 | | // and these 3 columns in the block are k2, k3, k1; |
260 | | // then _file_slot_idx_map will save: 2, 0, 1 |
261 | | std::vector<int> _file_slot_idx_map; |
262 | | // Only for query tasks: saves the indexes of the columns that need to be read. |
263 | | // e.g., there are 3 cols in "_file_slot_descs" named: k1, k2, k3 |
264 | | // and their corresponding positions in the file are 0, 3, 5. |
265 | | // So _col_idxs will be: <0, 3, 5> |
266 | | std::vector<int> _col_idxs; |
267 | | // True if this is a load task |
268 | | bool _is_load = false; |
269 | | bool _line_reader_eof; |
270 | | // For schema reader |
271 | | size_t _read_line = 0; |
272 | | bool _is_parse_name = false; |
273 | | TFileFormatType::type _file_format_type; |
274 | | bool _is_proto_format; |
275 | | TFileCompressType::type _file_compress_type; |
276 | | |
277 | | // When the fetched range starts from 0 and header_type="csv_with_names", skip the first line. |
278 | | // When the fetched range starts from 0 and header_type="csv_with_names_and_types", skip the first two lines. |
279 | | // When the fetched range doesn't start from 0, always skip the first line. |
280 | | int _skip_lines; |
281 | | char _enclose = 0; |
282 | | bool _trim_double_quotes = false; |
283 | | bool _trim_tailing_spaces = false; |
284 | | bool _keep_cr = false; |
285 | | bool _empty_field_as_null = false; |
286 | | |
287 | | io::IOContext* _io_ctx = nullptr; |
288 | | std::shared_ptr<io::IOContext> _io_ctx_holder; |
289 | | // Stored to adjust column_sep_positions when BOM is removed in enclose mode |
290 | | std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx; |
291 | | // Saves the source text that has been split into values. |
292 | | std::vector<Slice> _split_values; |
293 | | std::vector<int> _use_nullable_string_opt; |
294 | | }; |
295 | | } // namespace doris |