Coverage Report

Created: 2026-03-15 18:33

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PlanNodes_types.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <cstddef>
24
#include <cstdint>
25
#include <memory>
26
#include <string>
27
#include <unordered_map>
28
#include <unordered_set>
29
#include <utility>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "core/data_type/data_type.h"
34
#include "format/file_reader/new_plain_text_line_reader.h"
35
#include "format/generic_reader.h"
36
#include "io/file_factory.h"
37
#include "io/fs/file_reader_writer_fwd.h"
38
#include "util/decompressor.h"
39
#include "util/slice.h"
40
41
namespace doris {
42
#include "common/compile_check_begin.h"
43
44
class SlotDescriptor;
45
class RuntimeProfile;
46
class RuntimeState;
47
48
namespace io {
49
struct IOContext;
50
} // namespace io
51
52
struct ScannerCounter;
53
class Block;
54
55
class LineFieldSplitterIf {
56
public:
57
2.50k
    virtual ~LineFieldSplitterIf() = default;
58
59
    virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0;
60
};
61
62
template <typename Splitter>
63
class BaseLineFieldSplitter : public LineFieldSplitterIf {
64
public:
65
12.2M
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
66
12.2M
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
67
12.2M
    }
_ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Line
Count
Source
65
148
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
66
148
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
67
148
    }
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Line
Count
Source
65
712
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
66
712
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
67
712
    }
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Line
Count
Source
65
12.2M
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
66
12.2M
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
67
12.2M
    }
_ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Line
Count
Source
65
9
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
66
9
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
67
9
    }
68
};
69
70
class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> {
71
public:
72
148
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
73
148
        auto** row_ptr = reinterpret_cast<PDataRow**>(line.data);
74
148
        PDataRow* row = *row_ptr;
75
732
        for (const PDataColumn& col : row->col()) {
76
732
            splitted_values->emplace_back(col.value());
77
732
        }
78
148
    }
79
};
80
81
template <typename Splitter>
82
class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> {
83
    // using a function ptr to decrease the overhead (found very effective during test).
84
    using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*);
85
86
public:
87
    explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
88
                                      size_t value_sep_len = 1, char trimming_char = 0)
89
2.42k
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
90
2.42k
        if (trim_tailing_space) {
91
0
            if (trim_ends) {
92
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
93
0
            } else {
94
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
95
0
            }
96
2.42k
        } else {
97
2.42k
            if (trim_ends) {
98
97
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
99
2.32k
            } else {
100
2.32k
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
101
2.32k
            }
102
2.42k
        }
103
2.42k
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc
Line
Count
Source
89
2.30k
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
90
2.30k
        if (trim_tailing_space) {
91
0
            if (trim_ends) {
92
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
93
0
            } else {
94
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
95
0
            }
96
2.30k
        } else {
97
2.30k
            if (trim_ends) {
98
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
99
2.30k
            } else {
100
2.30k
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
101
2.30k
            }
102
2.30k
        }
103
2.30k
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc
Line
Count
Source
89
97
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
90
97
        if (trim_tailing_space) {
91
0
            if (trim_ends) {
92
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
93
0
            } else {
94
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
95
0
            }
96
97
        } else {
97
97
            if (trim_ends) {
98
97
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
99
97
            } else {
100
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
101
0
            }
102
97
        }
103
97
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc
Line
Count
Source
89
27
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
90
27
        if (trim_tailing_space) {
91
0
            if (trim_ends) {
92
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
93
0
            } else {
94
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
95
0
            }
96
27
        } else {
97
27
            if (trim_ends) {
98
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
99
27
            } else {
100
27
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
101
27
            }
102
27
        }
103
27
    }
104
105
12.2M
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
106
12.2M
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
107
12.2M
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Line
Count
Source
105
712
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
106
712
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
107
712
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Line
Count
Source
105
12.2M
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
106
12.2M
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
107
12.2M
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Line
Count
Source
105
9
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
106
9
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
107
9
    }
108
109
protected:
110
    const char _trimming_char;
111
    const size_t _value_sep_len;
112
    ProcessValueFunc process_value_func;
113
114
private:
115
    template <bool TrimTailingSpace, bool TrimEnds>
116
    inline static void _process_value(const char* data, size_t start_offset, size_t value_len,
117
201M
                                      char trimming_char, std::vector<Slice>* splitted_values) {
118
201M
        if constexpr (TrimTailingSpace) {
119
0
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
120
0
                --value_len;
121
0
            }
122
0
        }
123
201M
        if constexpr (TrimEnds) {
124
3.30k
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
125
3.30k
                                   *(data + start_offset + value_len - 1) == trimming_char;
126
3.30k
            if (trim_cond) {
127
146
                ++(start_offset);
128
146
                value_len -= 2;
129
146
            }
130
3.30k
        }
131
201M
        splitted_values->emplace_back(data + start_offset, value_len);
132
201M
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
_ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Line
Count
Source
117
3.30k
                                      char trimming_char, std::vector<Slice>* splitted_values) {
118
        if constexpr (TrimTailingSpace) {
119
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
120
                --value_len;
121
            }
122
        }
123
3.30k
        if constexpr (TrimEnds) {
124
3.30k
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
125
3.30k
                                   *(data + start_offset + value_len - 1) == trimming_char;
126
3.30k
            if (trim_cond) {
127
146
                ++(start_offset);
128
146
                value_len -= 2;
129
146
            }
130
3.30k
        }
131
3.30k
        splitted_values->emplace_back(data + start_offset, value_len);
132
3.30k
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
_ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Line
Count
Source
117
201M
                                      char trimming_char, std::vector<Slice>* splitted_values) {
118
        if constexpr (TrimTailingSpace) {
119
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
120
                --value_len;
121
            }
122
        }
123
        if constexpr (TrimEnds) {
124
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
125
                                   *(data + start_offset + value_len - 1) == trimming_char;
126
            if (trim_cond) {
127
                ++(start_offset);
128
                value_len -= 2;
129
            }
130
        }
131
201M
        splitted_values->emplace_back(data + start_offset, value_len);
132
201M
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Line
Count
Source
117
184
                                      char trimming_char, std::vector<Slice>* splitted_values) {
118
        if constexpr (TrimTailingSpace) {
119
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
120
                --value_len;
121
            }
122
        }
123
        if constexpr (TrimEnds) {
124
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
125
                                   *(data + start_offset + value_len - 1) == trimming_char;
126
            if (trim_cond) {
127
                ++(start_offset);
128
                value_len -= 2;
129
            }
130
        }
131
184
        splitted_values->emplace_back(data + start_offset, value_len);
132
184
    }
133
};
134
135
class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> {
136
public:
137
    explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
138
                                         std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx,
139
                                         size_t value_sep_len = 1, char trimming_char = 0)
140
97
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
141
97
              _text_line_reader_ctx(std::move(line_reader_ctx)) {}
142
143
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
144
145
private:
146
    std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx;
147
};
148
149
class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> {
150
public:
151
    explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
152
                                       std::string value_sep, size_t value_sep_len = 1,
153
                                       char trimming_char = 0)
154
2.30k
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
155
2.30k
              _value_sep(std::move(value_sep)) {
156
2.30k
        is_single_char_delim = (value_sep_len == 1);
157
2.30k
    }
158
159
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
160
161
private:
162
    void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values);
163
    void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values);
164
165
    bool is_single_char_delim;
166
    std::string _value_sep;
167
};
168
169
class CsvReader : public GenericReader {
170
    ENABLE_FACTORY_CREATOR(CsvReader);
171
172
public:
173
    CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
174
              const TFileScanRangeParams& params, const TFileRangeDesc& range,
175
              const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
176
              std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
177
2.40k
    ~CsvReader() override = default;
178
179
    Status init_reader(bool is_load);
180
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
181
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
182
                       std::unordered_set<std::string>* missing_cols) override;
183
184
    Status init_schema_reader() override;
185
    // get schema of csv file from first one line or first two lines.
186
    // if file format is FORMAT_CSV_DEFLATE and if
187
    // 1. header_type is empty, get schema from first line.
188
    // 2. header_type is CSV_WITH_NAMES, get schema from first line.
189
    // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line.
190
    Status get_parsed_schema(std::vector<std::string>* col_names,
191
                             std::vector<DataTypePtr>* col_types) override;
192
193
    Status close() override;
194
195
protected:
196
    // init options for type serde
197
    virtual Status _init_options();
198
    virtual Status _create_line_reader();
199
    virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice);
200
    virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice);
201
    // check the utf8 encoding of a line.
202
    // return error status to stop processing.
203
    // If return Status::OK but "success" is false, which means this is load request
204
    // and the line is skipped as unqualified row, and the process should continue.
205
    virtual Status _validate_line(const Slice& line, bool* success);
206
207
    RuntimeProfile* _profile = nullptr;
208
    const TFileScanRangeParams& _params;
209
    std::string _value_separator;
210
    size_t _value_separator_length;
211
    std::string _line_delimiter;
212
    size_t _line_delimiter_length;
213
    char _escape = 0;
214
    DataTypeSerDeSPtrs _serdes;
215
    DataTypeSerDe::FormatOptions _options;
216
    std::unique_ptr<LineFieldSplitterIf> _fields_splitter;
217
    int64_t _start_offset;
218
    int64_t _size;
219
    io::FileReaderSPtr _file_reader;
220
    std::unique_ptr<LineReader> _line_reader;
221
    std::unique_ptr<Decompressor> _decompressor;
222
223
private:
224
    Status _create_decompressor();
225
    Status _create_file_reader(bool need_schema);
226
    Status _fill_dest_columns(const Slice& line, Block* block,
227
                              std::vector<MutableColumnPtr>& columns, size_t* rows);
228
    Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
229
    Status _line_split_to_values(const Slice& line, bool* success);
230
    void _split_line(const Slice& line);
231
    void _init_system_properties();
232
    void _init_file_description();
233
234
    Status _parse_col_nums(size_t* col_nums);
235
    Status _parse_col_names(std::vector<std::string>* col_names);
236
    // TODO(ftw): parse type
237
    Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types);
238
239
    // If the CSV file is an UTF8 encoding with BOM,
240
    // then remove the first 3 bytes at the beginning of this file
241
    // and set size = size - 3.
242
    const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size);
243
244
    RuntimeState* _state = nullptr;
245
    ScannerCounter* _counter = nullptr;
246
    const TFileRangeDesc& _range;
247
    io::FileSystemProperties _system_properties;
248
    io::FileDescription _file_description;
249
    const std::vector<SlotDescriptor*>& _file_slot_descs;
250
    // Only for query task, save the file slot to columns in block map.
251
    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
252
    // and this 3 columns in block are k2, k3, k1,
253
    // the _file_slot_idx_map will save: 2, 0, 1
254
    std::vector<int> _file_slot_idx_map;
255
    // Only for query task, save the columns' index which need to be read.
256
    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
257
    // and the corresponding position in file is 0, 3, 5.
258
    // So the _col_idx will be: <0, 3, 5>
259
    std::vector<int> _col_idxs;
260
    // True if this is a load task
261
    bool _is_load = false;
262
    bool _line_reader_eof;
263
    // For schema reader
264
    size_t _read_line = 0;
265
    bool _is_parse_name = false;
266
    TFileFormatType::type _file_format_type;
267
    bool _is_proto_format;
268
    TFileCompressType::type _file_compress_type;
269
270
    // When we fetch range start from 0, header_type="csv_with_names" skip first line
271
    // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
272
    // When we fetch range doesn't start from 0 will always skip the first line
273
    int _skip_lines;
274
    char _enclose = 0;
275
    bool _trim_double_quotes = false;
276
    bool _trim_tailing_spaces = false;
277
    bool _keep_cr = false;
278
    bool _empty_field_as_null = false;
279
280
    io::IOContext* _io_ctx = nullptr;
281
    std::shared_ptr<io::IOContext> _io_ctx_holder;
282
    // Stored to adjust column_sep_positions when BOM is removed in enclose mode
283
    std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx;
284
    // save source text which have been splitted.
285
    std::vector<Slice> _split_values;
286
    std::vector<int> _use_nullable_string_opt;
287
};
288
#include "common/compile_check_end.h"
289
} // namespace doris