Coverage Report

Created: 2026-04-11 00:05

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PlanNodes_types.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <cstddef>
24
#include <cstdint>
25
#include <memory>
26
#include <string>
27
#include <unordered_map>
28
#include <unordered_set>
29
#include <utility>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "core/data_type/data_type.h"
34
#include "format/file_reader/new_plain_text_line_reader.h"
35
#include "format/generic_reader.h"
36
#include "io/file_factory.h"
37
#include "io/fs/file_reader_writer_fwd.h"
38
#include "util/decompressor.h"
39
#include "util/slice.h"
40
41
namespace doris {
42
43
class SlotDescriptor;
44
class RuntimeProfile;
45
class RuntimeState;
46
47
namespace io {
48
struct IOContext;
49
} // namespace io
50
51
struct ScannerCounter;
52
class Block;
53
54
class LineFieldSplitterIf {
55
public:
56
24
    virtual ~LineFieldSplitterIf() = default;
57
58
    virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0;
59
};
60
61
template <typename Splitter>
62
class BaseLineFieldSplitter : public LineFieldSplitterIf {
63
public:
64
0
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
65
0
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
66
0
    }
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
67
};
68
69
/// Splitter for rows that arrive as protobuf PDataRow objects instead of text.
class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> {
public:
    /// Appends the value of every column of the row to `*splitted_values`.
    ///
    /// @param line             not text: `line.data` is reinterpreted as the address
    ///                         of a `PDataRow*`, which is then dereferenced.
    /// @param splitted_values  output vector; one entry is appended per column.
    ///                         NOTE(review): the entries are built from `col.value()`,
    ///                         so presumably the row must outlive them - confirm.
    void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
        PDataRow* const row = *reinterpret_cast<PDataRow**>(line.data);
        for (const PDataColumn& column : row->col()) {
            splitted_values->emplace_back(column.value());
        }
    }
};
79
80
template <typename Splitter>
81
class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> {
82
    // using a function ptr to decrease the overhead (found very effective during test).
83
    using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*);
84
85
public:
86
    explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
87
                                      size_t value_sep_len = 1, char trimming_char = 0)
88
24
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
89
24
        if (trim_tailing_space) {
90
0
            if (trim_ends) {
91
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
92
0
            } else {
93
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
94
0
            }
95
24
        } else {
96
24
            if (trim_ends) {
97
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
98
24
            } else {
99
24
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
100
24
            }
101
24
        }
102
24
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc
Line
Count
Source
88
24
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
89
24
        if (trim_tailing_space) {
90
0
            if (trim_ends) {
91
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
92
0
            } else {
93
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
94
0
            }
95
24
        } else {
96
24
            if (trim_ends) {
97
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
98
24
            } else {
99
24
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
100
24
            }
101
24
        }
102
24
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc
103
104
0
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
105
0
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
106
0
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
107
108
protected:
109
    const char _trimming_char;
110
    const size_t _value_sep_len;
111
    ProcessValueFunc process_value_func;
112
113
private:
114
    template <bool TrimTailingSpace, bool TrimEnds>
115
    inline static void _process_value(const char* data, size_t start_offset, size_t value_len,
116
58
                                      char trimming_char, std::vector<Slice>* splitted_values) {
117
58
        if constexpr (TrimTailingSpace) {
118
0
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
119
0
                --value_len;
120
0
            }
121
0
        }
122
58
        if constexpr (TrimEnds) {
123
0
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
124
0
                                   *(data + start_offset + value_len - 1) == trimming_char;
125
0
            if (trim_cond) {
126
0
                ++(start_offset);
127
0
                value_len -= 2;
128
0
            }
129
0
        }
130
58
        splitted_values->emplace_back(data + start_offset, value_len);
131
58
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Line
Count
Source
116
58
                                      char trimming_char, std::vector<Slice>* splitted_values) {
117
        if constexpr (TrimTailingSpace) {
118
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
119
                --value_len;
120
            }
121
        }
122
        if constexpr (TrimEnds) {
123
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
124
                                   *(data + start_offset + value_len - 1) == trimming_char;
125
            if (trim_cond) {
126
                ++(start_offset);
127
                value_len -= 2;
128
            }
129
        }
130
58
        splitted_values->emplace_back(data + start_offset, value_len);
131
58
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
132
};
133
134
/// Text splitter that shares an EncloseCsvLineReaderCtx with its creator.
/// `do_split()` is defined outside this header.
class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> {
public:
    /// @param trim_tailing_space  forwarded to BaseCsvTextFieldSplitter.
    /// @param trim_ends           forwarded to BaseCsvTextFieldSplitter.
    /// @param line_reader_ctx     line reader context; ownership is shared with the caller.
    /// @param value_sep_len       forwarded to BaseCsvTextFieldSplitter.
    /// @param trimming_char       forwarded to BaseCsvTextFieldSplitter.
    explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
                                         std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx,
                                         size_t value_sep_len = 1, char trimming_char = 0)
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
              _text_line_reader_ctx(std::move(line_reader_ctx)) {}

    /// Splits `line` into `*splitted_values`; called through
    /// BaseCsvTextFieldSplitter::split_line_impl().
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);

private:
    std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx;
};
147
148
/// Text splitter configured with an explicit field separator string.
/// `do_split()` and the two `_split_field_*` helpers are defined outside this header.
class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> {
public:
    /// @param trim_tailing_space  forwarded to BaseCsvTextFieldSplitter.
    /// @param trim_ends           forwarded to BaseCsvTextFieldSplitter.
    /// @param value_sep           the field separator; taken by value and moved in.
    /// @param value_sep_len       separator length; a value of 1 marks the separator
    ///                            as a single character.
    /// @param trimming_char       forwarded to BaseCsvTextFieldSplitter.
    explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
                                       std::string value_sep, size_t value_sep_len = 1,
                                       char trimming_char = 0)
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
              is_single_char_delim(value_sep_len == 1),
              _value_sep(std::move(value_sep)) {}

    /// Splits `line` into `*splitted_values`; called through
    /// BaseCsvTextFieldSplitter::split_line_impl().
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);

private:
    void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values);
    void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values);

    // True when the separator length passed to the constructor was exactly 1.
    bool is_single_char_delim;
    std::string _value_sep;
};
167
168
class CsvReader : public GenericReader {
169
    ENABLE_FACTORY_CREATOR(CsvReader);
170
171
public:
172
    CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
173
              const TFileScanRangeParams& params, const TFileRangeDesc& range,
174
              const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
175
              std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
176
0
    ~CsvReader() override = default;
177
178
    Status init_reader(bool is_load);
179
    Status get_next_block(Block* block, size_t* read_rows, bool* eof) override;
180
    Status get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
181
                       std::unordered_set<std::string>* missing_cols) override;
182
183
    Status init_schema_reader() override;
184
    // get schema of csv file from first one line or first two lines.
185
    // if file format is FORMAT_CSV_DEFLATE and if
186
    // 1. header_type is empty, get schema from first line.
187
    // 2. header_type is CSV_WITH_NAMES, get schema from first line.
188
    // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from first two line.
189
    Status get_parsed_schema(std::vector<std::string>* col_names,
190
                             std::vector<DataTypePtr>* col_types) override;
191
192
    Status close() override;
193
194
protected:
195
    // init options for type serde
196
    virtual Status _init_options();
197
    virtual Status _create_line_reader();
198
    virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice);
199
    virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice);
200
    // check the utf8 encoding of a line.
201
    // return error status to stop processing.
202
    // If return Status::OK but "success" is false, which means this is load request
203
    // and the line is skipped as unqualified row, and the process should continue.
204
    virtual Status _validate_line(const Slice& line, bool* success);
205
206
    RuntimeProfile* _profile = nullptr;
207
    const TFileScanRangeParams& _params;
208
    std::string _value_separator;
209
    size_t _value_separator_length;
210
    std::string _line_delimiter;
211
    size_t _line_delimiter_length;
212
    char _escape = 0;
213
    DataTypeSerDeSPtrs _serdes;
214
    DataTypeSerDe::FormatOptions _options;
215
    std::unique_ptr<LineFieldSplitterIf> _fields_splitter;
216
    int64_t _start_offset;
217
    int64_t _size;
218
    io::FileReaderSPtr _file_reader;
219
    std::unique_ptr<LineReader> _line_reader;
220
    std::unique_ptr<Decompressor> _decompressor;
221
222
private:
223
    Status _create_decompressor();
224
    Status _create_file_reader(bool need_schema);
225
    Status _fill_dest_columns(const Slice& line, Block* block,
226
                              std::vector<MutableColumnPtr>& columns, size_t* rows);
227
    Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
228
    Status _line_split_to_values(const Slice& line, bool* success);
229
    void _split_line(const Slice& line);
230
    void _init_system_properties();
231
    void _init_file_description();
232
233
    Status _parse_col_nums(size_t* col_nums);
234
    Status _parse_col_names(std::vector<std::string>* col_names);
235
    // TODO(ftw): parse type
236
    Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types);
237
238
    // If the CSV file is an UTF8 encoding with BOM,
239
    // then remove the first 3 bytes at the beginning of this file
240
    // and set size = size - 3.
241
    const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size);
242
243
    RuntimeState* _state = nullptr;
244
    ScannerCounter* _counter = nullptr;
245
    const TFileRangeDesc& _range;
246
    io::FileSystemProperties _system_properties;
247
    io::FileDescription _file_description;
248
    const std::vector<SlotDescriptor*>& _file_slot_descs;
249
    // Only for query task, save the file slot to columns in block map.
250
    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
251
    // and this 3 columns in block are k2, k3, k1,
252
    // the _file_slot_idx_map will save: 2, 0, 1
253
    std::vector<int> _file_slot_idx_map;
254
    // Only for query task, save the columns' index which need to be read.
255
    // eg, there are 3 cols in "_file_slot_descs" named: k1, k2, k3
256
    // and the corresponding position in file is 0, 3, 5.
257
    // So the _col_idx will be: <0, 3, 5>
258
    std::vector<int> _col_idxs;
259
    // True if this is a load task
260
    bool _is_load = false;
261
    bool _line_reader_eof;
262
    // For schema reader
263
    size_t _read_line = 0;
264
    bool _is_parse_name = false;
265
    TFileFormatType::type _file_format_type;
266
    bool _is_proto_format;
267
    TFileCompressType::type _file_compress_type;
268
269
    // When we fetch range start from 0, header_type="csv_with_names" skip first line
270
    // When we fetch range start from 0, header_type="csv_with_names_and_types" skip first two line
271
    // When we fetch range doesn't start from 0 will always skip the first line
272
    int _skip_lines;
273
    char _enclose = 0;
274
    bool _trim_double_quotes = false;
275
    bool _trim_tailing_spaces = false;
276
    bool _keep_cr = false;
277
    bool _empty_field_as_null = false;
278
279
    io::IOContext* _io_ctx = nullptr;
280
    std::shared_ptr<io::IOContext> _io_ctx_holder;
281
    // Stored to adjust column_sep_positions when BOM is removed in enclose mode
282
    std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx;
283
    // save source text which have been splitted.
284
    std::vector<Slice> _split_values;
285
    std::vector<int> _use_nullable_string_opt;
286
};
287
} // namespace doris