Coverage Report

Created: 2026-04-05 19:35

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/PlanNodes_types.h>
21
#include <gen_cpp/internal_service.pb.h>
22
23
#include <cstddef>
24
#include <cstdint>
25
#include <memory>
26
#include <string>
27
#include <unordered_map>
28
#include <unordered_set>
29
#include <utility>
30
#include <vector>
31
32
#include "common/status.h"
33
#include "core/data_type/data_type.h"
34
#include "format/file_reader/new_plain_text_line_reader.h"
35
#include "format/table/table_format_reader.h"
36
#include "io/file_factory.h"
37
#include "io/fs/file_reader_writer_fwd.h"
38
#include "util/decompressor.h"
39
#include "util/slice.h"
40
41
namespace doris {
42
#include "common/compile_check_begin.h"
43
44
class SlotDescriptor;
45
class RuntimeProfile;
46
class RuntimeState;
47
48
namespace io {
49
struct IOContext;
50
} // namespace io
51
52
struct ScannerCounter;
53
class Block;
54
55
/// CSV/Text-specific initialization context.
/// Adds one flag on top of whatever ReaderInitContext already carries.
56
struct CsvInitContext final : public ReaderInitContext {
57
    /// Defaults to false.
    /// NOTE(review): presumably marks the reader as serving a load task rather than a
    /// query — confirm against the code that consumes this context.
    bool is_load = false;
58
};
59
60
// Abstract interface for turning one input line into a list of field values.
// Concrete splitters are used through this type, hence the virtual destructor.
class LineFieldSplitterIf {
61
public:
62
24
    virtual ~LineFieldSplitterIf() = default;
63
64
    // Splits `line` into fields and reports them through `splitted_values`.
    // The splitters visible in this header append to the vector and do not clear it.
    virtual void split_line(const Slice& line, std::vector<Slice>* splitted_values) = 0;
65
};
66
67
// CRTP adapter between the virtual LineFieldSplitterIf interface and a concrete
// splitter. `Splitter` is the derived type and must provide
// `split_line_impl(const Slice&, std::vector<Slice>*)`.
template <typename Splitter>
68
class BaseLineFieldSplitter : public LineFieldSplitterIf {
69
public:
70
0
    // The single virtual entry point; marked `final` so derived classes customise
    // behaviour only through split_line_impl, which is resolved statically.
    inline void split_line(const Slice& line, std::vector<Slice>* splitted_values) final {
71
0
        static_cast<Splitter*>(this)->split_line_impl(line, splitted_values);
72
0
    }
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_21CsvProtoFieldSplitterEE10split_lineERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
Unexecuted instantiation: _ZN5doris21BaseLineFieldSplitterINS_24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEEE10split_lineERKNS_5SliceEPSt6vectorIS5_SaIS5_EE
73
};
74
75
// Splitter for rows that arrive as protobuf PDataRow objects instead of text.
class CsvProtoFieldSplitter final : public BaseLineFieldSplitter<CsvProtoFieldSplitter> {
76
public:
77
0
    // Appends one Slice per column of the row to `splitted_values`.
    // `line.data` is not treated as characters here: it is reinterpreted as the
    // address of a `PDataRow*`, so the caller must have stored such a pointer there.
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
78
0
        auto** row_ptr = reinterpret_cast<PDataRow**>(line.data);
79
0
        PDataRow* row = *row_ptr;
80
0
        for (const PDataColumn& col : row->col()) {
81
0
            // NOTE(review): the Slice is built from col.value(); presumably it refers to
            // the row's own storage, so the PDataRow must outlive the slices — confirm.
            splitted_values->emplace_back(col.value());
82
0
        }
83
0
    }
84
};
85
86
// Common base of the text-based field splitters. `Splitter` is the concrete
// derived type and must provide `do_split(const Slice&, std::vector<Slice>*)`.
// This layer stores the trimming configuration and picks, once at construction,
// the `_process_value` variant that matches the trimming flags.
template <typename Splitter>
87
class BaseCsvTextFieldSplitter : public BaseLineFieldSplitter<BaseCsvTextFieldSplitter<Splitter>> {
88
    // using a function ptr to decrease the overhead (found very effective during test).
    // Arguments, in order: data, start_offset, value_len, trimming_char, output vector.
89
    using ProcessValueFunc = void (*)(const char*, size_t, size_t, char, std::vector<Slice>*);
90
91
public:
92
    // trim_tailing_space: when true, trailing ' ' characters are removed from each value.
    // trim_ends:          when true, a matching pair of `trimming_char` at both ends of
    //                     a value is removed.
    // value_sep_len:      stored in _value_sep_len (default 1).
    // trimming_char:      stored in _trimming_char (default 0).
    // The two flags are turned into template arguments here so that _process_value
    // does not test them again for every value.
    explicit BaseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
93
                                      size_t value_sep_len = 1, char trimming_char = 0)
94
24
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
95
24
        if (trim_tailing_space) {
96
0
            if (trim_ends) {
97
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
98
0
            } else {
99
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
100
0
            }
101
24
        } else {
102
24
            if (trim_ends) {
103
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
104
24
            } else {
105
24
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
106
24
            }
107
24
        }
108
24
    }
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEEC2Ebbmc
Line
Count
Source
94
24
            : _trimming_char(trimming_char), _value_sep_len(value_sep_len) {
95
24
        if (trim_tailing_space) {
96
0
            if (trim_ends) {
97
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, true>;
98
0
            } else {
99
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<true, false>;
100
0
            }
101
24
        } else {
102
24
            if (trim_ends) {
103
0
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, true>;
104
24
            } else {
105
24
                process_value_func = &BaseCsvTextFieldSplitter::_process_value<false, false>;
106
24
            }
107
24
        }
108
24
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEEC2Ebbmc
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEEC2Ebbmc
109
110
0
    // Hook called by BaseLineFieldSplitter::split_line; forwards statically to the
    // concrete splitter's do_split().
    inline void split_line_impl(const Slice& line, std::vector<Slice>* splitted_values) {
111
0
        static_cast<Splitter*>(this)->do_split(line, splitted_values);
112
0
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE15split_line_implERKNS_5SliceEPSt6vectorIS3_SaIS3_EE
113
114
protected:
115
    // Character removed from both ends of a value when end-trimming is enabled.
    const char _trimming_char;
116
    // Length of the value separator as passed to the constructor.
    const size_t _value_sep_len;
117
    // The _process_value instantiation selected by the constructor.
    // NOTE(review): presumably invoked by the derived do_split() implementations,
    // which are defined outside this header — confirm.
    ProcessValueFunc process_value_func;
118
119
private:
120
    // Appends the value data[start_offset, start_offset + value_len) to
    // `splitted_values` after applying the trimming selected at compile time:
    //   TrimTailingSpace - drop trailing ' ' characters.
    //   TrimEnds         - drop one leading and one trailing `trimming_char`, but only
    //                      when both ends match and at least two characters remain.
    // The emitted Slice points into `data`; nothing is copied.
    template <bool TrimTailingSpace, bool TrimEnds>
121
    inline static void _process_value(const char* data, size_t start_offset, size_t value_len,
122
58
                                      char trimming_char, std::vector<Slice>* splitted_values) {
123
58
        if constexpr (TrimTailingSpace) {
124
0
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
125
0
                --value_len;
126
0
            }
127
0
        }
128
58
        if constexpr (TrimEnds) {
129
0
            // value_len > 1 keeps a lone trimming_char intact and guarantees that the
            // `value_len -= 2` below cannot underflow.
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
130
0
                                   *(data + start_offset + value_len - 1) == trimming_char;
131
0
            if (trim_cond) {
132
0
                ++(start_offset);
133
0
                value_len -= 2;
134
0
            }
135
0
        }
136
58
        splitted_values->emplace_back(data + start_offset, value_len);
137
58
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
_ZN5doris24BaseCsvTextFieldSplitterINS_21HiveTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Line
Count
Source
122
58
                                      char trimming_char, std::vector<Slice>* splitted_values) {
123
        if constexpr (TrimTailingSpace) {
124
            while (value_len > 0 && *(data + start_offset + value_len - 1) == ' ') {
125
                --value_len;
126
            }
127
        }
128
        if constexpr (TrimEnds) {
129
            const bool trim_cond = value_len > 1 && *(data + start_offset) == trimming_char &&
130
                                   *(data + start_offset + value_len - 1) == trimming_char;
131
            if (trim_cond) {
132
                ++(start_offset);
133
                value_len -= 2;
134
            }
135
        }
136
58
        splitted_values->emplace_back(data + start_offset, value_len);
137
58
    }
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_27EncloseCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb1ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb1EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
Unexecuted instantiation: _ZN5doris24BaseCsvTextFieldSplitterINS_25PlainCsvTextFieldSplitterEE14_process_valueILb0ELb0EEEvPKcmmcPSt6vectorINS_5SliceESaIS7_EE
138
};
139
140
// Text splitter that carries a shared EncloseCsvLineReaderCtx.
// NOTE(review): presumably do_split() uses state recorded in that context by the
// line reader to handle enclosed (quoted) fields; do_split() is defined outside
// this header — confirm there.
class EncloseCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<EncloseCsvTextFieldSplitter> {
141
public:
142
    // Forwards the trimming options to the base class and takes shared ownership of
    // `line_reader_ctx` (passed by value and moved into the member).
    explicit EncloseCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
143
                                         std::shared_ptr<EncloseCsvLineReaderCtx> line_reader_ctx,
144
                                         size_t value_sep_len = 1, char trimming_char = 0)
145
0
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
146
0
              _text_line_reader_ctx(std::move(line_reader_ctx)) {}
147
148
    // Called by BaseCsvTextFieldSplitter::split_line_impl. Declared only; the
    // definition is not part of this header.
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
149
150
private:
151
    // Context object shared with whoever passed it to the constructor.
    std::shared_ptr<EncloseCsvLineReaderCtx> _text_line_reader_ctx;
152
};
153
154
// Text splitter that owns its value separator string.
class PlainCsvTextFieldSplitter : public BaseCsvTextFieldSplitter<PlainCsvTextFieldSplitter> {
155
public:
156
    // Forwards the trimming options to the base class, keeps a copy of `value_sep`,
    // and records whether the separator is a single character.
    // Note that the single-character decision is based on `value_sep_len`, not on
    // value_sep.size(); callers are expected to pass consistent values.
    explicit PlainCsvTextFieldSplitter(bool trim_tailing_space, bool trim_ends,
157
                                       std::string value_sep, size_t value_sep_len = 1,
158
                                       char trimming_char = 0)
159
0
            : BaseCsvTextFieldSplitter(trim_tailing_space, trim_ends, value_sep_len, trimming_char),
160
0
              _value_sep(std::move(value_sep)) {
161
0
        is_single_char_delim = (value_sep_len == 1);
162
0
    }
163
164
    // Called by BaseCsvTextFieldSplitter::split_line_impl. Declared only; the
    // definition is not part of this header.
    void do_split(const Slice& line, std::vector<Slice>* splitted_values);
165
166
private:
167
    // NOTE(review): presumably do_split() picks one of these two helpers based on
    // is_single_char_delim — confirm in the implementation file.
    void _split_field_single_char(const Slice& line, std::vector<Slice>* splitted_values);
168
    void _split_field_multi_char(const Slice& line, std::vector<Slice>* splitted_values);
169
170
    // True when the constructor received value_sep_len == 1.
    bool is_single_char_delim;
171
    // The value separator string passed to the constructor.
    std::string _value_sep;
172
};
173
174
// Reader for CSV/text files, built on TableFormatReader.
// Only declarations are visible in this header; all non-defaulted member functions
// are defined elsewhere.
//
// The members _params, _range and _file_slot_descs are stored as const references,
// so the referenced objects must outlive the reader.
//
// Several members have no in-class initializer (_value_separator_length,
// _line_delimiter_length, _start_offset, _size, _line_reader_eof, _file_format_type,
// _is_proto_format, _file_compress_type, _skip_lines).
// NOTE(review): presumably they are assigned by the constructor or during init —
// confirm in the implementation file.
class CsvReader : public TableFormatReader {
175
    ENABLE_FACTORY_CREATOR(CsvReader);
176
177
public:
178
    // `io_ctx_holder` is optional (defaults to nullptr).
    // NOTE(review): presumably it keeps the object behind `io_ctx` alive when the
    // caller cannot guarantee its lifetime — confirm.
    CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
179
              const TFileScanRangeParams& params, const TFileRangeDesc& range,
180
              const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
181
              std::shared_ptr<io::IOContext> io_ctx_holder = nullptr);
182
0
    ~CsvReader() override = default;
183
184
    Status init_reader(bool is_load);
185
    Status _do_get_next_block(Block* block, size_t* read_rows, bool* eof) override;
186
    Status _get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) override;
187
188
    Status init_schema_reader() override;
189
    // Get the schema of the csv file from its first line or its first two lines.
190
    // if file format is FORMAT_CSV_DEFLATE and if
191
    // 1. header_type is empty, get schema from the first line.
192
    // 2. header_type is CSV_WITH_NAMES, get schema from the first line.
193
    // 3. header_type is CSV_WITH_NAMES_AND_TYPES, get schema from the first two lines.
194
    Status get_parsed_schema(std::vector<std::string>* col_names,
195
                             std::vector<DataTypePtr>* col_types) override;
196
197
    Status close() override;
198
199
protected:
200
    // ---- Unified init_reader(ReaderInitContext*) overrides ----
201
    Status _open_file_reader(ReaderInitContext* ctx) override;
202
    Status _do_init_reader(ReaderInitContext* ctx) override;
203
204
    // Init options for type serde.
205
    virtual Status _init_options();
206
    virtual Status _create_line_reader();
207
    virtual Status _deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice);
208
    virtual Status _deserialize_nullable_string(IColumn& column, Slice& slice);
209
    // Check the utf8 encoding of a line.
210
    // Returns an error status to stop processing.
211
    // If it returns Status::OK but "success" is false, this is a load request
212
    // and the line is skipped as an unqualified row; processing should continue.
213
    virtual Status _validate_line(const Slice& line, bool* success);
214
215
    RuntimeProfile* _profile = nullptr;
216
    const TFileScanRangeParams& _params;
217
    std::string _value_separator;
218
    size_t _value_separator_length;
219
    std::string _line_delimiter;
220
    size_t _line_delimiter_length;
221
    char _escape = 0;
222
    DataTypeSerDeSPtrs _serdes;
223
    DataTypeSerDe::FormatOptions _options;
224
    // Splitter used through the LineFieldSplitterIf interface; the concrete type is
    // chosen outside this header.
    std::unique_ptr<LineFieldSplitterIf> _fields_splitter;
225
    int64_t _start_offset;
226
    int64_t _size;
227
    io::FileReaderSPtr _file_reader;
228
    std::unique_ptr<LineReader> _line_reader;
229
    std::unique_ptr<Decompressor> _decompressor;
230
231
private:
232
    Status _create_decompressor();
233
    Status _create_file_reader(bool need_schema);
234
    Status _fill_dest_columns(const Slice& line, Block* block,
235
                              std::vector<MutableColumnPtr>& columns, size_t* rows);
236
    Status _fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns, size_t* rows);
237
    Status _line_split_to_values(const Slice& line, bool* success);
238
    void _split_line(const Slice& line);
239
    void _init_system_properties();
240
    void _init_file_description();
241
242
    Status _parse_col_nums(size_t* col_nums);
243
    Status _parse_col_names(std::vector<std::string>* col_names);
244
    // TODO(ftw): parse type
245
    Status _parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types);
246
247
    // If the CSV file is UTF-8 encoded with a BOM,
248
    // remove the first 3 bytes at the beginning of the file
249
    // and set size = size - 3.
250
    const uint8_t* _remove_bom(const uint8_t* ptr, size_t& size);
251
252
    RuntimeState* _state = nullptr;
253
    ScannerCounter* _counter = nullptr;
254
    const TFileRangeDesc& _range;
255
    io::FileSystemProperties _system_properties;
256
    io::FileDescription _file_description;
257
    const std::vector<SlotDescriptor*>& _file_slot_descs;
258
    // Only for query tasks: maps the file slots to the columns in the block.
259
    // e.g. there are 3 cols in "_file_slot_descs" named: k1, k2, k3
260
    // and these 3 columns appear in the block as k2, k3, k1;
261
    // the _file_slot_idx_map will save: 2, 0, 1
262
    std::vector<int> _file_slot_idx_map;
263
    // Only for query tasks: the indexes of the columns which need to be read.
264
    // e.g. there are 3 cols in "_file_slot_descs" named: k1, k2, k3
265
    // and their corresponding positions in the file are 0, 3, 5.
266
    // So _col_idxs will be: <0, 3, 5>
267
    std::vector<int> _col_idxs;
268
    // True if this is a load task
269
    bool _is_load = false;
270
    bool _line_reader_eof;
271
    // For schema reader
272
    size_t _read_line = 0;
273
    bool _is_parse_name = false;
274
    TFileFormatType::type _file_format_type;
275
    bool _is_proto_format;
276
    TFileCompressType::type _file_compress_type;
277
278
    // When the fetched range starts from 0 and header_type="csv_with_names", skip the first line.
279
    // When the fetched range starts from 0 and header_type="csv_with_names_and_types", skip the first two lines.
280
    // When the fetched range doesn't start from 0, always skip the first line.
281
    int _skip_lines;
282
    char _enclose = 0;
283
    bool _trim_double_quotes = false;
284
    bool _trim_tailing_spaces = false;
285
    bool _keep_cr = false;
286
    bool _empty_field_as_null = false;
287
288
    io::IOContext* _io_ctx = nullptr;
289
    std::shared_ptr<io::IOContext> _io_ctx_holder;
290
    // Stored to adjust column_sep_positions when BOM is removed in enclose mode
291
    std::shared_ptr<EncloseCsvLineReaderCtx> _enclose_reader_ctx;
292
    // Holds the field slices produced by splitting the source text of a line.
293
    std::vector<Slice> _split_values;
294
    std::vector<int> _use_nullable_string_opt;
295
};
296
#include "common/compile_check_end.h"
297
} // namespace doris