Coverage Report

Created: 2026-06-24 14:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/csv/csv_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <cstddef>
27
#include <map>
28
#include <memory>
29
#include <numeric>
30
#include <ostream>
31
#include <regex>
32
#include <utility>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/config.h"
36
#include "common/consts.h"
37
#include "common/status.h"
38
#include "core/block/block.h"
39
#include "core/block/column_with_type_and_name.h"
40
#include "core/column/column_nullable.h"
41
#include "core/column/column_string.h"
42
#include "core/data_type/data_type_factory.hpp"
43
#include "core/data_type_serde/data_type_string_serde.h"
44
#include "exec/scan/scanner.h"
45
#include "format/file_reader/new_plain_binary_line_reader.h"
46
#include "format/file_reader/new_plain_text_line_reader.h"
47
#include "format/line_reader.h"
48
#include "io/file_factory.h"
49
#include "io/fs/broker_file_reader.h"
50
#include "io/fs/buffered_reader.h"
51
#include "io/fs/file_reader.h"
52
#include "io/fs/s3_file_reader.h"
53
#include "io/fs/tracing_file_reader.h"
54
#include "runtime/descriptors.h"
55
#include "runtime/runtime_state.h"
56
#include "util/decompressor.h"
57
#include "util/string_util.h"
58
#include "util/utf8_check.h"
59
60
namespace doris {
61
class RuntimeProfile;
62
class IColumn;
63
namespace io {
64
struct IOContext;
65
enum class FileCachePolicy : uint8_t;
66
} // namespace io
67
} // namespace doris
68
69
namespace doris {
70
71
namespace {
72
73
15.1M
size_t columns_byte_size(const std::vector<MutableColumnPtr>& columns) {
74
15.1M
    size_t bytes = 0;
75
213M
    for (const auto& column : columns) {
76
213M
        DCHECK(column.get() != nullptr);
77
213M
        bytes += column->byte_size();
78
213M
    }
79
15.1M
    return bytes;
80
15.1M
}
81
82
} // namespace
83
84
878
void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
85
878
    const char* data = line.data;
86
878
    const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
87
878
    size_t value_start_offset = 0;
88
2.98k
    for (auto idx : column_sep_positions) {
89
2.98k
        process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char,
90
2.98k
                           splitted_values);
91
2.98k
        value_start_offset = idx + _value_sep_len;
92
2.98k
    }
93
878
    if (line.size >= value_start_offset) {
94
        // process the last column
95
876
        process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char,
96
876
                           splitted_values);
97
876
    }
98
878
}
99
100
void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
101
14.9M
                                                         std::vector<Slice>* splitted_values) {
102
14.9M
    const char* data = line.data;
103
14.9M
    const size_t size = line.size;
104
14.9M
    size_t value_start = 0;
105
3.60G
    for (size_t i = 0; i < size; ++i) {
106
3.59G
        if (data[i] == _value_sep[0]) {
107
446M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
108
446M
            value_start = i + _value_sep_len;
109
446M
        }
110
3.59G
    }
111
14.9M
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
112
14.9M
}
113
114
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
115
2.38k
                                                        std::vector<Slice>* splitted_values) {
116
2.38k
    size_t start = 0;  // point to the start pos of next col value.
117
2.38k
    size_t curpos = 0; // point to the start pos of separator matching sequence.
118
119
    // value_sep : AAAA
120
    // line.data : 1234AAAA5678
121
    // -> 1234,5678
122
123
    //    start   start
124
    //      ▼       ▼
125
    //      1234AAAA5678\0
126
    //          ▲       ▲
127
    //      curpos     curpos
128
129
    //kmp
130
2.38k
    std::vector<int> next(_value_sep_len);
131
2.38k
    next[0] = -1;
132
4.96k
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
133
2.60k
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
134
20
            j = next[j];
135
20
        }
136
2.58k
        if (_value_sep[i] == _value_sep[j + 1]) {
137
2.44k
            j++;
138
2.44k
        }
139
2.58k
        next[i] = j;
140
2.58k
    }
141
142
42.1k
    for (int i = 0, j = -1; i < line.size; i++) {
143
        // i : line
144
        // j : _value_sep
145
42.4k
        while (j > -1 && line[i] != _value_sep[j + 1]) {
146
2.66k
            j = next[j];
147
2.66k
        }
148
39.7k
        if (line[i] == _value_sep[j + 1]) {
149
7.02k
            j++;
150
7.02k
        }
151
39.7k
        if (j == _value_sep_len - 1) {
152
3.19k
            curpos = i - _value_sep_len + 1;
153
154
            /*
155
             * column_separator : "xx"
156
             * data.csv :  data1xxxxdata2
157
             *
158
             * Parse incorrectly:
159
             *      data1[xx]xxdata2
160
             *      data1x[xx]xdata2
161
             *      data1xx[xx]data2
162
             * The string "xxxx" is parsed into three "xx" delimiters.
163
             *
164
             * Parse correctly:
165
             *      data1[xx]xxdata2
166
             *      data1xx[xx]data2
167
             */
168
169
3.19k
            if (curpos >= start) {
170
3.13k
                process_value_func(line.data, start, curpos - start, _trimming_char,
171
3.13k
                                   splitted_values);
172
3.13k
                start = i + 1;
173
3.13k
            }
174
175
3.19k
            j = next[j];
176
3.19k
        }
177
39.7k
    }
178
2.38k
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
179
2.38k
}
180
181
14.9M
void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
182
14.9M
    if (is_single_char_delim) {
183
14.9M
        _split_field_single_char(line, splitted_values);
184
18.4E
    } else {
185
18.4E
        _split_field_multi_char(line, splitted_values);
186
18.4E
    }
187
14.9M
}
188
189
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
190
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
191
                     const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size,
192
                     io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
193
8.28k
        : _profile(profile),
194
8.28k
          _params(params),
195
8.28k
          _file_reader(nullptr),
196
8.28k
          _line_reader(nullptr),
197
8.28k
          _decompressor(nullptr),
198
8.28k
          _state(state),
199
8.28k
          _counter(counter),
200
8.28k
          _range(range),
201
8.28k
          _file_slot_descs(file_slot_descs),
202
8.28k
          _line_reader_eof(false),
203
8.28k
          _skip_lines(0),
204
8.28k
          _io_ctx(io_ctx),
205
8.28k
          _io_ctx_holder(std::move(io_ctx_holder)),
206
8.28k
          _batch_size(std::max(batch_size, 1UL)) {
207
8.28k
    if (_io_ctx == nullptr && _io_ctx_holder) {
208
7.47k
        _io_ctx = _io_ctx_holder.get();
209
7.47k
    }
210
8.28k
    _file_format_type = _params.format_type;
211
8.28k
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
212
8.28k
    if (_range.__isset.compress_type) {
213
        // for compatibility
214
6.28k
        _file_compress_type = _range.compress_type;
215
6.28k
    } else {
216
1.99k
        _file_compress_type = _params.compress_type;
217
1.99k
    }
218
8.28k
    _size = _range.size;
219
220
8.28k
    _split_values.reserve(_file_slot_descs.size());
221
8.28k
    _init_system_properties();
222
8.28k
    _init_file_description();
223
8.28k
    _serdes = create_data_type_serdes(_file_slot_descs);
224
8.28k
}
225
226
8.28k
void CsvReader::_init_system_properties() {
227
8.28k
    if (_range.__isset.file_type) {
228
        // for compatibility
229
6.12k
        _system_properties.system_type = _range.file_type;
230
6.12k
    } else {
231
2.15k
        _system_properties.system_type = _params.file_type;
232
2.15k
    }
233
8.28k
    _system_properties.properties = _params.properties;
234
8.28k
    _system_properties.hdfs_params = _params.hdfs_params;
235
8.28k
    if (_params.__isset.broker_addresses) {
236
1.88k
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
237
1.88k
                                                   _params.broker_addresses.end());
238
1.88k
    }
239
8.28k
}
240
241
8.28k
void CsvReader::_init_file_description() {
242
8.28k
    _file_description.path = _range.path;
243
8.28k
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
244
8.28k
    if (_range.__isset.fs_name) {
245
4.96k
        _file_description.fs_name = _range.fs_name;
246
4.96k
    }
247
8.28k
    if (_range.__isset.file_cache_admission) {
248
5.47k
        _file_description.file_cache_admission = _range.file_cache_admission;
249
5.47k
    }
250
8.28k
}
251
252
0
Status CsvReader::init_reader(bool is_load) {
253
    // set the skip lines and start offset
254
0
    _start_offset = _range.start_offset;
255
0
    if (_start_offset == 0) {
256
        // check header typer first
257
0
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
258
0
            !_params.file_attributes.header_type.empty()) {
259
0
            std::string header_type = to_lower(_params.file_attributes.header_type);
260
0
            if (header_type == BeConsts::CSV_WITH_NAMES) {
261
0
                _skip_lines = 1;
262
0
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
263
0
                _skip_lines = 2;
264
0
            }
265
0
        } else if (_params.file_attributes.__isset.skip_lines) {
266
0
            _skip_lines = _params.file_attributes.skip_lines;
267
0
        }
268
0
    } else if (_start_offset != 0) {
269
0
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
270
0
            (_file_compress_type == TFileCompressType::UNKNOWN &&
271
0
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
272
0
            return Status::InternalError<false>("For now we do not support split compressed file");
273
0
        }
274
        // pre-read to promise first line skipped always read
275
0
        int64_t pre_read_len = std::min(
276
0
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
277
0
                _start_offset);
278
0
        _start_offset -= pre_read_len;
279
0
        _size += pre_read_len;
280
        // not first range will always skip one line
281
0
        _skip_lines = 1;
282
0
    }
283
284
0
    _use_nullable_string_opt.resize(_file_slot_descs.size());
285
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
286
0
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
287
0
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
288
0
            _use_nullable_string_opt[i] = 1;
289
0
        }
290
0
    }
291
292
0
    RETURN_IF_ERROR(_init_options());
293
0
    RETURN_IF_ERROR(_create_file_reader(false));
294
0
    RETURN_IF_ERROR(_create_decompressor());
295
0
    RETURN_IF_ERROR(_create_line_reader());
296
297
0
    _is_load = is_load;
298
0
    if (!_is_load) {
299
        // For query task, there are 2 slot mapping.
300
        // One is from file slot to values in line.
301
        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
302
        //      the _col_idxs will save: 0, 2, 4
303
        // The other is from file slot to columns in output block
304
        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
305
        //      where "p1" is the partition col which does not exist in file
306
        //      the _file_slot_idx_map will save: 1, 2, 3
307
0
        DCHECK(_params.__isset.column_idxs);
308
0
        _col_idxs = _params.column_idxs;
309
0
        int idx = 0;
310
0
        for (const auto& slot_info : _params.required_slots) {
311
0
            if (slot_info.is_file_slot) {
312
0
                _file_slot_idx_map.push_back(idx);
313
0
            }
314
0
            idx++;
315
0
        }
316
0
    } else {
317
        // For load task, the column order is same as file column order
318
0
        int i = 0;
319
0
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
320
0
            _col_idxs.push_back(i++);
321
0
        }
322
0
    }
323
324
0
    _line_reader_eof = false;
325
0
    return Status::OK();
326
0
}
327
328
// ---- Unified init_reader(ReaderInitContext*) overrides ----
329
330
7.46k
Status CsvReader::_open_file_reader(ReaderInitContext* base_ctx) {
331
7.46k
    _start_offset = _range.start_offset;
332
7.46k
    if (_start_offset == 0) {
333
7.33k
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
334
7.33k
            !_params.file_attributes.header_type.empty()) {
335
92
            std::string header_type = to_lower(_params.file_attributes.header_type);
336
92
            if (header_type == BeConsts::CSV_WITH_NAMES) {
337
64
                _skip_lines = 1;
338
64
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
339
28
                _skip_lines = 2;
340
28
            }
341
7.24k
        } else if (_params.file_attributes.__isset.skip_lines) {
342
7.24k
            _skip_lines = _params.file_attributes.skip_lines;
343
7.24k
        }
344
7.33k
    } else if (_start_offset != 0) {
345
130
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
346
130
            (_file_compress_type == TFileCompressType::UNKNOWN &&
347
130
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
348
0
            return Status::InternalError<false>("For now we do not support split compressed file");
349
0
        }
350
130
        int64_t pre_read_len = std::min(
351
130
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
352
130
                _start_offset);
353
130
        _start_offset -= pre_read_len;
354
130
        _size += pre_read_len;
355
130
        _skip_lines = 1;
356
130
    }
357
358
7.46k
    RETURN_IF_ERROR(_init_options());
359
7.46k
    RETURN_IF_ERROR(_create_file_reader(false));
360
7.46k
    return Status::OK();
361
7.46k
}
362
363
7.46k
Status CsvReader::_do_init_reader(ReaderInitContext* base_ctx) {
364
7.46k
    auto* ctx = checked_context_cast<CsvInitContext>(base_ctx);
365
7.46k
    _is_load = ctx->is_load;
366
367
7.46k
    _use_nullable_string_opt.resize(_file_slot_descs.size());
368
215k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
369
207k
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
370
207k
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
371
46.8k
            _use_nullable_string_opt[i] = 1;
372
46.8k
        }
373
207k
    }
374
375
7.46k
    RETURN_IF_ERROR(_create_decompressor());
376
7.46k
    RETURN_IF_ERROR(_create_line_reader());
377
378
7.46k
    if (!_is_load) {
379
5.58k
        DCHECK(_params.__isset.column_idxs);
380
5.58k
        _col_idxs = _params.column_idxs;
381
5.58k
        int idx = 0;
382
191k
        for (const auto& slot_info : _params.required_slots) {
383
191k
            if (slot_info.is_file_slot) {
384
189k
                _file_slot_idx_map.push_back(idx);
385
189k
            }
386
191k
            idx++;
387
191k
        }
388
5.58k
    } else {
389
1.88k
        int i = 0;
390
18.0k
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
391
18.0k
            _col_idxs.push_back(i++);
392
18.0k
        }
393
1.88k
    }
394
7.46k
    _line_reader_eof = false;
395
7.46k
    return Status::OK();
396
7.46k
}
397
398
18.2k
void CsvReader::set_batch_size(size_t batch_size) {
399
    // 0 means "not set" / "use default" for the row-based readers; we must
400
    // never let _batch_size be 0 because _do_get_next_block uses it as the
401
    // upper bound of a `while (rows < _batch_size)` loop and a 0 would make
402
    // the reader return empty blocks and incorrectly signal EOF.
403
18.2k
    _batch_size = std::max(batch_size, 1UL);
404
18.2k
}
405
406
// !FIXME: Here we should use MutableBlock
407
19.4k
Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
408
19.4k
    if (_line_reader_eof) {
409
7.39k
        *eof = true;
410
7.39k
        return Status::OK();
411
7.39k
    }
412
413
12.1k
    const size_t batch_size = _batch_size;
414
12.1k
    const auto max_block_bytes = _state->preferred_block_size_bytes();
415
12.1k
    size_t rows = 0;
416
417
12.1k
    bool success = false;
418
12.1k
    bool is_remove_bom = false;
419
12.1k
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
420
140k
        while (rows < batch_size && !_line_reader_eof) {
421
139k
            const uint8_t* ptr = nullptr;
422
139k
            size_t size = 0;
423
139k
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
424
425
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
426
            // is_remove_bom means _remove_bom should only execute once
427
141k
            if (_skip_lines == 0 && !is_remove_bom) {
428
598
                ptr = _remove_bom(ptr, size);
429
598
                is_remove_bom = true;
430
598
            }
431
432
            // _skip_lines > 0 means we do not need to remove bom
433
139k
            if (_skip_lines > 0) {
434
0
                _skip_lines--;
435
0
                is_remove_bom = true;
436
0
                continue;
437
0
            }
438
139k
            if (size == 0) {
439
602
                if (!_line_reader_eof) {
440
4
                    if (_empty_line_as_record() || _state->is_read_csv_empty_line_as_null()) {
441
4
                        ++rows;
442
4
                    }
443
4
                }
444
                // Read empty line, continue
445
602
                continue;
446
602
            }
447
448
138k
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
449
138k
            ++rows;
450
138k
        }
451
598
        auto mutable_columns_guard = block->mutate_columns_scoped();
452
598
        auto& mutate_columns = mutable_columns_guard.mutable_columns();
453
598
        for (auto& col : mutate_columns) {
454
598
            col->resize(rows);
455
598
        }
456
11.5k
    } else {
457
11.5k
        auto columns_guard = block->mutate_columns_scoped();
458
11.5k
        auto& columns = columns_guard.mutable_columns();
459
11.5k
        _reserve_nullable_string_columns(columns, batch_size);
460
15.1M
        while (rows < batch_size && !_line_reader_eof &&
461
15.1M
               (columns_byte_size(columns) < max_block_bytes)) {
462
15.0M
            const uint8_t* ptr = nullptr;
463
15.0M
            size_t size = 0;
464
15.0M
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
465
466
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
467
            // is_remove_bom means _remove_bom should only execute once
468
15.0M
            if (!is_remove_bom && _skip_lines == 0) {
469
11.2k
                ptr = _remove_bom(ptr, size);
470
11.2k
                is_remove_bom = true;
471
11.2k
            }
472
473
            // _skip_lines > 0 means we do not remove bom
474
15.0M
            if (_skip_lines > 0) {
475
319
                _skip_lines--;
476
319
                is_remove_bom = true;
477
319
                continue;
478
319
            }
479
15.0M
            if (size == 0) {
480
6.91k
                if (!_line_reader_eof) {
481
56
                    if (_empty_line_as_record()) {
482
4
                        Slice empty_line("", 0);
483
4
                        RETURN_IF_ERROR(_validate_line(empty_line, &success));
484
4
                        if (success) {
485
4
                            RETURN_IF_ERROR(_fill_dest_columns(empty_line, columns, &rows));
486
4
                        }
487
52
                    } else if (_state->is_read_csv_empty_line_as_null()) {
488
12
                        RETURN_IF_ERROR(_fill_empty_line(columns, &rows));
489
12
                    }
490
56
                }
491
                // Read empty line, continue
492
6.91k
                continue;
493
6.91k
            }
494
495
15.0M
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
496
15.0M
            if (!success) {
497
128
                continue;
498
128
            }
499
15.0M
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns, &rows));
500
15.0M
        }
501
11.5k
    }
502
503
12.0k
    *eof = (rows == 0);
504
12.0k
    *read_rows = rows;
505
506
12.0k
    return Status::OK();
507
12.1k
}
508
509
7.46k
Status CsvReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
510
207k
    for (const auto& slot : _file_slot_descs) {
511
207k
        name_to_type->emplace(slot->col_name(), slot->type());
512
207k
    }
513
7.46k
    return Status::OK();
514
7.46k
}
515
516
// init decompressor, file reader and line reader for parsing schema
517
815
Status CsvReader::init_schema_reader() {
518
815
    _start_offset = _range.start_offset;
519
815
    if (_start_offset != 0) {
520
0
        return Status::InvalidArgument(
521
0
                "start offset of TFileRangeDesc must be zero in get parsered schema");
522
0
    }
523
815
    if (_params.file_type == TFileType::FILE_BROKER) {
524
0
        return Status::InternalError<false>(
525
0
                "Getting parsered schema from csv file do not support stream load and broker "
526
0
                "load.");
527
0
    }
528
529
    // csv file without names line and types line.
530
815
    _read_line = 1;
531
815
    _is_parse_name = false;
532
533
815
    if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
534
815
        !_params.file_attributes.header_type.empty()) {
535
92
        std::string header_type = to_lower(_params.file_attributes.header_type);
536
92
        if (header_type == BeConsts::CSV_WITH_NAMES) {
537
63
            _is_parse_name = true;
538
63
        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
539
29
            _read_line = 2;
540
29
            _is_parse_name = true;
541
29
        }
542
92
    }
543
544
815
    RETURN_IF_ERROR(_init_options());
545
815
    RETURN_IF_ERROR(_create_file_reader(true));
546
815
    RETURN_IF_ERROR(_create_decompressor());
547
815
    RETURN_IF_ERROR(_create_line_reader());
548
815
    return Status::OK();
549
815
}
550
551
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
552
815
                                    std::vector<DataTypePtr>* col_types) {
553
815
    if (_read_line == 1) {
554
786
        if (!_is_parse_name) { //parse csv file without names and types
555
723
            size_t col_nums = 0;
556
723
            RETURN_IF_ERROR(_parse_col_nums(&col_nums));
557
8.01k
            for (size_t i = 0; i < col_nums; ++i) {
558
7.29k
                col_names->emplace_back("c" + std::to_string(i + 1));
559
7.29k
            }
560
716
        } else { // parse csv file with names
561
63
            RETURN_IF_ERROR(_parse_col_names(col_names));
562
63
        }
563
564
8.29k
        for (size_t j = 0; j < col_names->size(); ++j) {
565
7.51k
            col_types->emplace_back(
566
7.51k
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
567
7.51k
        }
568
779
    } else { // parse csv file with names and types
569
29
        RETURN_IF_ERROR(_parse_col_names(col_names));
570
29
        RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
571
29
    }
572
808
    return Status::OK();
573
815
}
574
575
212M
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
576
    // This is the per-row per-column hot path of CSV load (load reads every column as
577
    // nullable string). The column type was already verified by the checked assert_cast
578
    // in _reserve_nullable_string_columns at the beginning of the batch, so the casts
579
    // here can skip the release-build typeid check.
580
212M
    auto& null_column = assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
581
212M
    auto& string_column = assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>(
582
212M
            null_column.get_nested_column());
583
212M
    if (_empty_field_as_null && slice.size == 0) {
584
5
        string_column.insert_default();
585
5
        null_column.get_null_map_data().push_back(1);
586
5
        return Status::OK();
587
5
    }
588
212M
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
589
210M
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
590
31.7k
            string_column.insert_default();
591
31.7k
            null_column.get_null_map_data().push_back(1);
592
31.7k
            return Status::OK();
593
31.7k
        }
594
210M
    }
595
    // Same as DataTypeStringSerDe::deserialize_one_cell_from_csv (which never fails),
596
    // written out here to skip the SerDe layer and its per-cell assert_cast.
597
212M
    if (_options.escape_char != 0) {
598
3.62k
        escape_string_for_csv(slice.data, &slice.size, _options.escape_char, _options.quote_char);
599
3.62k
    }
600
212M
    string_column.insert_data(slice.data, slice.size);
601
212M
    null_column.get_null_map_data().push_back(0);
602
212M
    return Status::OK();
603
212M
}
604
605
3.63k
Status CsvReader::_init_options() {
606
    // get column_separator and line_delimiter
607
3.63k
    _value_separator = _params.file_attributes.text_params.column_separator;
608
3.63k
    _value_separator_length = _value_separator.size();
609
3.63k
    _line_delimiter = _params.file_attributes.text_params.line_delimiter;
610
3.63k
    _line_delimiter_length = _line_delimiter.size();
611
3.63k
    if (_params.file_attributes.text_params.__isset.enclose) {
612
3.63k
        _enclose = _params.file_attributes.text_params.enclose;
613
3.63k
    }
614
3.63k
    if (_params.file_attributes.text_params.__isset.escape) {
615
3.63k
        _escape = _params.file_attributes.text_params.escape;
616
3.63k
    }
617
618
3.63k
    _trim_tailing_spaces =
619
3.63k
            (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
620
621
3.63k
    _options.escape_char = _escape;
622
3.63k
    _options.quote_char = _enclose;
623
624
3.63k
    if (_params.file_attributes.text_params.collection_delimiter.empty()) {
625
3.63k
        _options.collection_delim = ',';
626
3.63k
    } else {
627
0
        _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0];
628
0
    }
629
3.63k
    if (_params.file_attributes.text_params.mapkv_delimiter.empty()) {
630
3.63k
        _options.map_key_delim = ':';
631
3.63k
    } else {
632
0
        _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
633
0
    }
634
635
3.63k
    if (_params.file_attributes.text_params.__isset.null_format) {
636
34
        _options.null_format = _params.file_attributes.text_params.null_format.data();
637
34
        _options.null_len = _params.file_attributes.text_params.null_format.length();
638
34
    }
639
640
3.63k
    if (_params.file_attributes.__isset.trim_double_quotes) {
641
3.63k
        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
642
3.63k
    }
643
3.63k
    _options.converted_from_string = _trim_double_quotes;
644
645
3.63k
    if (_state != nullptr) {
646
2.82k
        _keep_cr = _state->query_options().keep_carriage_return;
647
2.82k
    }
648
649
3.63k
    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
650
3.60k
        _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null;
651
3.60k
    }
652
3.63k
    return Status::OK();
653
3.63k
}
654
655
8.28k
Status CsvReader::_create_decompressor() {
656
8.28k
    if (_file_compress_type != TFileCompressType::UNKNOWN) {
657
8.19k
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
658
8.19k
    } else {
659
86
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
660
86
    }
661
662
8.28k
    return Status::OK();
663
8.28k
}
664
665
8.28k
Status CsvReader::_create_file_reader(bool need_schema) {
666
8.28k
    if (_params.file_type == TFileType::FILE_STREAM) {
667
1.96k
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
668
1.96k
                                                        need_schema));
669
6.31k
    } else {
670
6.31k
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
671
6.31k
        io::FileReaderOptions reader_options =
672
6.31k
                FileFactory::get_reader_options(_state, _file_description);
673
6.31k
        io::FileReaderSPtr file_reader;
674
6.31k
        if (_io_ctx_holder) {
675
6.30k
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
676
6.30k
                    _profile, _system_properties, _file_description, reader_options,
677
6.30k
                    io::DelegateReader::AccessMode::SEQUENTIAL,
678
6.30k
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
679
6.30k
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
680
6.30k
        } else {
681
6
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
682
6
                    _profile, _system_properties, _file_description, reader_options,
683
6
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
684
6
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
685
6
        }
686
6.31k
        _file_reader = _io_ctx && _io_ctx->file_reader_stats
687
6.31k
                               ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
688
6.31k
                                                                         _io_ctx->file_reader_stats)
689
6.31k
                               : file_reader;
690
6.31k
    }
691
8.28k
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
692
8.28k
        _params.file_type != TFileType::FILE_BROKER) {
693
0
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
694
0
    }
695
8.28k
    return Status::OK();
696
8.28k
}
697
698
3.63k
Status CsvReader::_create_line_reader() {
699
3.63k
    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
700
3.63k
    if (_enclose == 0) {
701
3.47k
        text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
702
3.47k
                _line_delimiter, _line_delimiter_length, _keep_cr);
703
3.47k
        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
704
3.47k
                _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
705
706
3.47k
    } else {
707
        // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0
708
154
        size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0;
709
154
        _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>(
710
154
                _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
711
154
                col_sep_num, _enclose, _escape, _keep_cr);
712
154
        text_line_reader_ctx = _enclose_reader_ctx;
713
714
154
        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
715
154
                _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose);
716
154
    }
717
3.63k
    switch (_file_format_type) {
718
3.55k
    case TFileFormatType::FORMAT_CSV_PLAIN:
719
3.55k
        [[fallthrough]];
720
3.55k
    case TFileFormatType::FORMAT_CSV_GZ:
721
3.55k
        [[fallthrough]];
722
3.55k
    case TFileFormatType::FORMAT_CSV_BZ2:
723
3.55k
        [[fallthrough]];
724
3.55k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
725
3.55k
        [[fallthrough]];
726
3.55k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
727
3.55k
        [[fallthrough]];
728
3.55k
    case TFileFormatType::FORMAT_CSV_LZOP:
729
3.55k
        [[fallthrough]];
730
3.55k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
731
3.55k
        [[fallthrough]];
732
3.55k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
733
3.55k
        _line_reader =
734
3.55k
                NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
735
3.55k
                                                      text_line_reader_ctx, _size, _start_offset);
736
737
3.55k
        break;
738
77
    case TFileFormatType::FORMAT_PROTO:
739
77
        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
740
77
        _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
741
77
        break;
742
0
    default:
743
0
        return Status::InternalError<false>(
744
0
                "Unknown format type, cannot init line reader in csv reader, type={}",
745
0
                _file_format_type);
746
3.63k
    }
747
3.63k
    return Status::OK();
748
3.63k
}
749
750
5.24k
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
751
5.24k
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
752
5.24k
}
753
754
Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns,
755
15.1M
                                     size_t* rows) {
756
15.1M
    bool is_success = false;
757
758
15.1M
    RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
759
15.1M
    if (UNLIKELY(!is_success)) {
760
        // If not success, which means we met an invalid row, filter this row and return.
761
539
        return Status::OK();
762
539
    }
763
764
228M
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
765
213M
        int col_idx = _col_idxs[i];
766
        // col idx is out of range, fill with null format
767
213M
        auto value = col_idx < _split_values.size()
768
214M
                             ? _split_values[col_idx]
769
18.4E
                             : Slice(_options.null_format, _options.null_len);
770
771
213M
        IColumn* col_ptr = columns[i].get();
772
213M
        if (!_is_load) {
773
41.2M
            col_ptr = columns[_file_slot_idx_map[i]].get();
774
41.2M
        }
775
776
214M
        if (_use_nullable_string_opt[i]) {
777
            // For load task, we always read "string" from file.
778
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
779
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
780
214M
            RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
781
18.4E
        } else {
782
18.4E
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
783
18.4E
        }
784
213M
    }
785
15.1M
    ++(*rows);
786
787
15.1M
    return Status::OK();
788
15.1M
}
789
790
void CsvReader::_reserve_nullable_string_columns(std::vector<MutableColumnPtr>& columns,
791
11.5k
                                                 size_t batch_size) {
792
316k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
793
305k
        if (!_use_nullable_string_opt[i]) {
794
195k
            continue;
795
195k
        }
796
109k
        IColumn* col_ptr = _is_load ? columns[i].get() : columns[_file_slot_idx_map[i]].get();
797
        // The checked casts here (once per batch) guarantee the column types for the
798
        // unchecked per-row casts in _deserialize_nullable_string.
799
109k
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
800
109k
        auto& string_column = assert_cast<ColumnString&>(null_column.get_nested_column());
801
        // Reserve up front so the per-row loop does not pay for incremental growth.
802
        // The string chars are not reserved because their total size is unpredictable.
803
109k
        string_column.get_offsets().reserve(string_column.size() + batch_size);
804
109k
        null_column.get_null_map_data().reserve(null_column.get_null_map_data().size() +
805
109k
                                                batch_size);
806
109k
    }
807
11.5k
}
808
809
12
Status CsvReader::_fill_empty_line(std::vector<MutableColumnPtr>& columns, size_t* rows) {
810
48
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
811
36
        IColumn* col_ptr = columns[i].get();
812
36
        if (!_is_load) {
813
36
            col_ptr = columns[_file_slot_idx_map[i]].get();
814
36
        }
815
36
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
816
36
        null_column.insert_data(nullptr, 0);
817
36
    }
818
12
    ++(*rows);
819
12
    return Status::OK();
820
12
}
821
822
14.9M
Status CsvReader::_validate_line(const Slice& line, bool* success) {
823
14.9M
    if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) {
824
130
        if (!_is_load) {
825
2
            return Status::InternalError<false>("Only support csv data in utf8 codec");
826
128
        } else {
827
128
            _counter->num_rows_filtered++;
828
128
            *success = false;
829
128
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
830
128
                    [&]() -> std::string { return std::string(line.data, line.size); },
831
128
                    [&]() -> std::string {
832
128
                        return "Invalid file encoding: all CSV files must be UTF-8 encoded";
833
128
                    }));
834
128
            return Status::OK();
835
128
        }
836
130
    }
837
14.9M
    *success = true;
838
14.9M
    return Status::OK();
839
14.9M
}
840
841
15.1M
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
842
15.1M
    _split_line(line);
843
844
15.1M
    if (_is_load) {
845
        // Only check for load task. For query task, the non exist column will be filled "null".
846
        // if actual column number in csv file is not equal to _file_slot_descs.size()
847
        // then filter this line.
848
10.9M
        bool ignore_col = false;
849
10.9M
        ignore_col = _params.__isset.file_attributes &&
850
10.9M
                     _params.file_attributes.__isset.ignore_csv_redundant_col &&
851
10.9M
                     _params.file_attributes.ignore_csv_redundant_col;
852
853
10.9M
        if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
854
10.9M
            (ignore_col && _split_values.size() < _file_slot_descs.size())) {
855
544
            _counter->num_rows_filtered++;
856
544
            *success = false;
857
544
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
858
544
                    [&]() -> std::string { return std::string(line.data, line.size); },
859
544
                    [&]() -> std::string {
860
544
                        fmt::memory_buffer error_msg;
861
544
                        fmt::format_to(error_msg,
862
544
                                       "Column count mismatch: expected {}, but found {}",
863
544
                                       _file_slot_descs.size(), _split_values.size());
864
544
                        std::string escaped_separator =
865
544
                                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
866
544
                        std::string escaped_delimiter =
867
544
                                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
868
544
                        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator,
869
544
                                       escaped_delimiter);
870
544
                        if (_enclose != 0) {
871
544
                            fmt::format_to(error_msg, " encl:{}", _enclose);
872
544
                        }
873
544
                        if (_escape != 0) {
874
544
                            fmt::format_to(error_msg, " esc:{}", _escape);
875
544
                        }
876
544
                        fmt::format_to(error_msg, ")");
877
544
                        return fmt::to_string(error_msg);
878
544
                    }));
879
539
            return Status::OK();
880
544
        }
881
10.9M
    }
882
883
15.1M
    *success = true;
884
15.1M
    return Status::OK();
885
15.1M
}
886
887
15.1M
void CsvReader::_split_line(const Slice& line) {
888
15.1M
    _split_values.clear();
889
15.1M
    _fields_splitter->split_line(line, &_split_values);
890
15.1M
}
891
892
723
Status CsvReader::_parse_col_nums(size_t* col_nums) {
893
723
    const uint8_t* ptr = nullptr;
894
723
    size_t size = 0;
895
723
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
896
723
    if (size == 0) {
897
2
        return Status::InternalError<false>(
898
2
                "The first line is empty, can not parse column numbers");
899
2
    }
900
721
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
901
5
        return Status::InternalError<false>("Only support csv data in utf8 codec");
902
5
    }
903
716
    ptr = _remove_bom(ptr, size);
904
716
    _split_line(Slice(ptr, size));
905
716
    *col_nums = _split_values.size();
906
716
    return Status::OK();
907
721
}
908
909
92
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
910
92
    const uint8_t* ptr = nullptr;
911
92
    size_t size = 0;
912
    // no use of _line_reader_eof
913
92
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
914
92
    if (size == 0) {
915
0
        return Status::InternalError<false>("The first line is empty, can not parse column names");
916
0
    }
917
92
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
918
0
        return Status::InternalError<false>("Only support csv data in utf8 codec");
919
0
    }
920
92
    ptr = _remove_bom(ptr, size);
921
92
    _split_line(Slice(ptr, size));
922
345
    for (auto _split_value : _split_values) {
923
345
        col_names->emplace_back(_split_value.to_string());
924
345
    }
925
92
    return Status::OK();
926
92
}
927
928
// TODO(ftw): parse type
929
29
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) {
930
    // delete after.
931
155
    for (size_t i = 0; i < col_nums; ++i) {
932
126
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
933
126
    }
934
935
    // 1. check _line_reader_eof
936
    // 2. read line
937
    // 3. check utf8
938
    // 4. check size
939
    // 5. check _split_values.size must equal to col_nums.
940
    // 6. fill col_types
941
29
    return Status::OK();
942
29
}
943
944
12.6k
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
945
12.6k
    if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) {
946
7
        LOG(INFO) << "remove bom";
947
7
        constexpr size_t bom_size = 3;
948
7
        size -= bom_size;
949
        // In enclose mode, column_sep_positions were computed on the original line
950
        // (including BOM). After shifting the pointer, we must adjust those positions
951
        // so they remain correct relative to the new start.
952
7
        if (_enclose_reader_ctx) {
953
1
            _enclose_reader_ctx->adjust_column_sep_positions(bom_size);
954
1
        }
955
7
        return ptr + bom_size;
956
7
    }
957
12.6k
    return ptr;
958
12.6k
}
959
960
7.47k
Status CsvReader::close() {
961
7.47k
    if (_line_reader) {
962
7.47k
        _line_reader->close();
963
7.47k
    }
964
965
7.47k
    if (_file_reader) {
966
7.47k
        RETURN_IF_ERROR(_file_reader->close());
967
7.47k
    }
968
969
7.47k
    return Status::OK();
970
7.47k
}
971
972
} // namespace doris