Coverage Report

Created: 2026-05-25 15:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/csv/csv_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <cstddef>
27
#include <map>
28
#include <memory>
29
#include <numeric>
30
#include <ostream>
31
#include <regex>
32
#include <utility>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/config.h"
36
#include "common/consts.h"
37
#include "common/status.h"
38
#include "core/block/block.h"
39
#include "core/block/column_with_type_and_name.h"
40
#include "core/data_type/data_type_factory.hpp"
41
#include "exec/scan/scanner.h"
42
#include "format/file_reader/new_plain_binary_line_reader.h"
43
#include "format/file_reader/new_plain_text_line_reader.h"
44
#include "format/line_reader.h"
45
#include "io/file_factory.h"
46
#include "io/fs/broker_file_reader.h"
47
#include "io/fs/buffered_reader.h"
48
#include "io/fs/file_reader.h"
49
#include "io/fs/s3_file_reader.h"
50
#include "io/fs/tracing_file_reader.h"
51
#include "runtime/descriptors.h"
52
#include "runtime/runtime_state.h"
53
#include "util/decompressor.h"
54
#include "util/string_util.h"
55
#include "util/utf8_check.h"
56
57
namespace doris {
58
class RuntimeProfile;
59
class IColumn;
60
namespace io {
61
struct IOContext;
62
enum class FileCachePolicy : uint8_t;
63
} // namespace io
64
} // namespace doris
65
66
namespace doris {
67
68
namespace {
69
70
14.9M
size_t columns_byte_size(const std::vector<MutableColumnPtr>& columns) {
71
14.9M
    size_t bytes = 0;
72
215M
    for (const auto& column : columns) {
73
215M
        DCHECK(column.get() != nullptr);
74
215M
        bytes += column->byte_size();
75
215M
    }
76
14.9M
    return bytes;
77
14.9M
}
78
79
} // namespace
80
81
878
void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
82
878
    const char* data = line.data;
83
878
    const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
84
878
    size_t value_start_offset = 0;
85
2.98k
    for (auto idx : column_sep_positions) {
86
2.98k
        process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char,
87
2.98k
                           splitted_values);
88
2.98k
        value_start_offset = idx + _value_sep_len;
89
2.98k
    }
90
878
    if (line.size >= value_start_offset) {
91
        // process the last column
92
876
        process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char,
93
876
                           splitted_values);
94
876
    }
95
878
}
96
97
void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
98
14.7M
                                                         std::vector<Slice>* splitted_values) {
99
14.7M
    const char* data = line.data;
100
14.7M
    const size_t size = line.size;
101
14.7M
    size_t value_start = 0;
102
3.58G
    for (size_t i = 0; i < size; ++i) {
103
3.56G
        if (data[i] == _value_sep[0]) {
104
446M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
105
446M
            value_start = i + _value_sep_len;
106
446M
        }
107
3.56G
    }
108
14.7M
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
109
14.7M
}
110
111
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
112
2.38k
                                                        std::vector<Slice>* splitted_values) {
113
2.38k
    size_t start = 0;  // point to the start pos of next col value.
114
2.38k
    size_t curpos = 0; // point to the start pos of separator matching sequence.
115
116
    // value_sep : AAAA
117
    // line.data : 1234AAAA5678
118
    // -> 1234,5678
119
120
    //    start   start
121
    //      ▼       ▼
122
    //      1234AAAA5678\0
123
    //          ▲       ▲
124
    //      curpos     curpos
125
126
    // KMP: build the failure table `next` for _value_sep
127
2.38k
    std::vector<int> next(_value_sep_len);
128
2.38k
    next[0] = -1;
129
4.96k
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
130
2.60k
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
131
20
            j = next[j];
132
20
        }
133
2.58k
        if (_value_sep[i] == _value_sep[j + 1]) {
134
2.44k
            j++;
135
2.44k
        }
136
2.58k
        next[i] = j;
137
2.58k
    }
138
139
42.1k
    for (int i = 0, j = -1; i < line.size; i++) {
140
        // i : line
141
        // j : _value_sep
142
42.4k
        while (j > -1 && line[i] != _value_sep[j + 1]) {
143
2.66k
            j = next[j];
144
2.66k
        }
145
39.7k
        if (line[i] == _value_sep[j + 1]) {
146
7.02k
            j++;
147
7.02k
        }
148
39.7k
        if (j == _value_sep_len - 1) {
149
3.19k
            curpos = i - _value_sep_len + 1;
150
151
            /*
152
             * column_separator : "xx"
153
             * data.csv :  data1xxxxdata2
154
             *
155
             * Parse incorrectly:
156
             *      data1[xx]xxdata2
157
             *      data1x[xx]xdata2
158
             *      data1xx[xx]data2
159
             * The string "xxxx" is parsed into three "xx" delimiters.
160
             *
161
             * Parse correctly:
162
             *      data1[xx]xxdata2
163
             *      data1xx[xx]data2
164
             */
165
166
3.19k
            if (curpos >= start) {
167
3.13k
                process_value_func(line.data, start, curpos - start, _trimming_char,
168
3.13k
                                   splitted_values);
169
3.13k
                start = i + 1;
170
3.13k
            }
171
172
3.19k
            j = next[j];
173
3.19k
        }
174
39.7k
    }
175
2.38k
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
176
2.38k
}
177
178
14.7M
void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
179
14.7M
    if (is_single_char_delim) {
180
14.7M
        _split_field_single_char(line, splitted_values);
181
14.7M
    } else {
182
1.33k
        _split_field_multi_char(line, splitted_values);
183
1.33k
    }
184
14.7M
}
185
186
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
187
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
188
                     const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size,
189
                     io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
190
8.05k
        : _profile(profile),
191
8.05k
          _params(params),
192
8.05k
          _file_reader(nullptr),
193
8.05k
          _line_reader(nullptr),
194
8.05k
          _decompressor(nullptr),
195
8.05k
          _state(state),
196
8.05k
          _counter(counter),
197
8.05k
          _range(range),
198
8.05k
          _file_slot_descs(file_slot_descs),
199
8.05k
          _line_reader_eof(false),
200
8.05k
          _skip_lines(0),
201
8.05k
          _io_ctx(io_ctx),
202
8.05k
          _io_ctx_holder(std::move(io_ctx_holder)),
203
8.05k
          _batch_size(std::max(batch_size, 1UL)) {
204
8.05k
    if (_io_ctx == nullptr && _io_ctx_holder) {
205
0
        _io_ctx = _io_ctx_holder.get();
206
0
    }
207
8.05k
    _file_format_type = _params.format_type;
208
8.05k
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
209
8.05k
    if (_range.__isset.compress_type) {
210
        // for compatibility
211
6.07k
        _file_compress_type = _range.compress_type;
212
6.07k
    } else {
213
1.98k
        _file_compress_type = _params.compress_type;
214
1.98k
    }
215
8.05k
    _size = _range.size;
216
217
8.05k
    _split_values.reserve(_file_slot_descs.size());
218
8.05k
    _init_system_properties();
219
8.05k
    _init_file_description();
220
8.05k
    _serdes = create_data_type_serdes(_file_slot_descs);
221
8.05k
}
222
223
8.03k
void CsvReader::_init_system_properties() {
224
8.03k
    if (_range.__isset.file_type) {
225
        // for compatibility
226
5.90k
        _system_properties.system_type = _range.file_type;
227
5.90k
    } else {
228
2.13k
        _system_properties.system_type = _params.file_type;
229
2.13k
    }
230
8.03k
    _system_properties.properties = _params.properties;
231
8.03k
    _system_properties.hdfs_params = _params.hdfs_params;
232
8.03k
    if (_params.__isset.broker_addresses) {
233
1.87k
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
234
1.87k
                                                   _params.broker_addresses.end());
235
1.87k
    }
236
8.03k
}
237
238
8.05k
void CsvReader::_init_file_description() {
239
8.05k
    _file_description.path = _range.path;
240
18.4E
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
241
8.05k
    if (_range.__isset.fs_name) {
242
4.75k
        _file_description.fs_name = _range.fs_name;
243
4.75k
    }
244
8.05k
    if (_range.__isset.file_cache_admission) {
245
5.26k
        _file_description.file_cache_admission = _range.file_cache_admission;
246
5.26k
    }
247
8.05k
}
248
249
0
Status CsvReader::init_reader(bool is_load) {
250
    // set the skip lines and start offset
251
0
    _start_offset = _range.start_offset;
252
0
    if (_start_offset == 0) {
253
        // check header type first
254
0
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
255
0
            !_params.file_attributes.header_type.empty()) {
256
0
            std::string header_type = to_lower(_params.file_attributes.header_type);
257
0
            if (header_type == BeConsts::CSV_WITH_NAMES) {
258
0
                _skip_lines = 1;
259
0
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
260
0
                _skip_lines = 2;
261
0
            }
262
0
        } else if (_params.file_attributes.__isset.skip_lines) {
263
0
            _skip_lines = _params.file_attributes.skip_lines;
264
0
        }
265
0
    } else if (_start_offset != 0) {
266
0
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
267
0
            (_file_compress_type == TFileCompressType::UNKNOWN &&
268
0
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
269
0
            return Status::InternalError<false>("For now we do not support split compressed file");
270
0
        }
271
        // pre-read up to one line delimiter before the range, so that the first line (which is always skipped) can always be read
272
0
        int64_t pre_read_len = std::min(
273
0
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
274
0
                _start_offset);
275
0
        _start_offset -= pre_read_len;
276
0
        _size += pre_read_len;
277
        // a range that is not the first one always skips one line
278
0
        _skip_lines = 1;
279
0
    }
280
281
0
    _use_nullable_string_opt.resize(_file_slot_descs.size());
282
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
283
0
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
284
0
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
285
0
            _use_nullable_string_opt[i] = 1;
286
0
        }
287
0
    }
288
289
0
    RETURN_IF_ERROR(_init_options());
290
0
    RETURN_IF_ERROR(_create_file_reader(false));
291
0
    RETURN_IF_ERROR(_create_decompressor());
292
0
    RETURN_IF_ERROR(_create_line_reader());
293
294
0
    _is_load = is_load;
295
0
    if (!_is_load) {
296
        // For a query task, there are 2 slot mappings.
297
        // One is from file slot to values in line.
298
        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
299
        //      the _col_idxs will save: 0, 2, 4
300
        // The other is from file slot to columns in output block
301
        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
302
        //      where "p1" is the partition col which does not exist in file
303
        //      the _file_slot_idx_map will save: 1, 2, 3
304
0
        DCHECK(_params.__isset.column_idxs);
305
0
        _col_idxs = _params.column_idxs;
306
0
        int idx = 0;
307
0
        for (const auto& slot_info : _params.required_slots) {
308
0
            if (slot_info.is_file_slot) {
309
0
                _file_slot_idx_map.push_back(idx);
310
0
            }
311
0
            idx++;
312
0
        }
313
0
    } else {
314
        // For a load task, the column order is the same as the file column order
315
0
        int i = 0;
316
0
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
317
0
            _col_idxs.push_back(i++);
318
0
        }
319
0
    }
320
321
0
    _line_reader_eof = false;
322
0
    return Status::OK();
323
0
}
324
325
// ---- Unified init_reader(ReaderInitContext*) overrides ----
326
327
7.23k
Status CsvReader::_open_file_reader(ReaderInitContext* base_ctx) {
328
7.23k
    _start_offset = _range.start_offset;
329
7.23k
    if (_start_offset == 0) {
330
7.10k
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
331
7.10k
            !_params.file_attributes.header_type.empty()) {
332
92
            std::string header_type = to_lower(_params.file_attributes.header_type);
333
92
            if (header_type == BeConsts::CSV_WITH_NAMES) {
334
64
                _skip_lines = 1;
335
64
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
336
28
                _skip_lines = 2;
337
28
            }
338
7.01k
        } else if (_params.file_attributes.__isset.skip_lines) {
339
7.01k
            _skip_lines = _params.file_attributes.skip_lines;
340
7.01k
        }
341
7.10k
    } else if (_start_offset != 0) {
342
130
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
343
130
            (_file_compress_type == TFileCompressType::UNKNOWN &&
344
130
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
345
0
            return Status::InternalError<false>("For now we do not support split compressed file");
346
0
        }
347
130
        int64_t pre_read_len = std::min(
348
130
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
349
130
                _start_offset);
350
130
        _start_offset -= pre_read_len;
351
130
        _size += pre_read_len;
352
130
        _skip_lines = 1;
353
130
    }
354
355
7.23k
    RETURN_IF_ERROR(_init_options());
356
7.23k
    RETURN_IF_ERROR(_create_file_reader(false));
357
7.23k
    return Status::OK();
358
7.23k
}
359
360
7.22k
Status CsvReader::_do_init_reader(ReaderInitContext* base_ctx) {
361
7.22k
    auto* ctx = checked_context_cast<CsvInitContext>(base_ctx);
362
7.22k
    _is_load = ctx->is_load;
363
364
7.22k
    _use_nullable_string_opt.resize(_file_slot_descs.size());
365
204k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
366
197k
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
367
197k
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
368
45.0k
            _use_nullable_string_opt[i] = 1;
369
45.0k
        }
370
197k
    }
371
372
7.22k
    RETURN_IF_ERROR(_create_decompressor());
373
7.22k
    RETURN_IF_ERROR(_create_line_reader());
374
375
7.22k
    if (!_is_load) {
376
5.37k
        DCHECK(_params.__isset.column_idxs);
377
5.37k
        _col_idxs = _params.column_idxs;
378
5.37k
        int idx = 0;
379
181k
        for (const auto& slot_info : _params.required_slots) {
380
181k
            if (slot_info.is_file_slot) {
381
179k
                _file_slot_idx_map.push_back(idx);
382
179k
            }
383
181k
            idx++;
384
181k
        }
385
5.37k
    } else {
386
1.85k
        int i = 0;
387
17.5k
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
388
17.5k
            _col_idxs.push_back(i++);
389
17.5k
        }
390
1.85k
    }
391
7.22k
    _line_reader_eof = false;
392
7.22k
    return Status::OK();
393
7.22k
}
394
395
17.9k
void CsvReader::set_batch_size(size_t batch_size) {
396
    // 0 means "not set" / "use default" for the row-based readers; we must
397
    // never let _batch_size be 0 because _do_get_next_block uses it as the
398
    // upper bound of a `while (rows < _batch_size)` loop and a 0 would make
399
    // the reader return empty blocks and incorrectly signal EOF.
400
17.9k
    _batch_size = std::max(batch_size, 1UL);
401
17.9k
}
402
403
// !FIXME: Here we should use MutableBlock
404
19.0k
Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
405
19.0k
    if (_line_reader_eof) {
406
7.16k
        *eof = true;
407
7.16k
        return Status::OK();
408
7.16k
    }
409
410
11.9k
    const size_t batch_size = _batch_size;
411
11.9k
    const auto max_block_bytes = _state->preferred_block_size_bytes();
412
11.9k
    size_t rows = 0;
413
414
11.9k
    bool success = false;
415
11.9k
    bool is_remove_bom = false;
416
11.9k
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
417
140k
        while (rows < batch_size && !_line_reader_eof) {
418
139k
            const uint8_t* ptr = nullptr;
419
139k
            size_t size = 0;
420
139k
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
421
422
            // _skip_lines == 0 means this line is the first actual data line of the entire file
423
            // is_remove_bom means _remove_bom should only execute once
424
140k
            if (_skip_lines == 0 && !is_remove_bom) {
425
560
                ptr = _remove_bom(ptr, size);
426
560
                is_remove_bom = true;
427
560
            }
428
429
            // _skip_lines > 0 means we do not need to remove bom
430
139k
            if (_skip_lines > 0) {
431
0
                _skip_lines--;
432
0
                is_remove_bom = true;
433
0
                continue;
434
0
            }
435
139k
            if (size == 0) {
436
562
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
437
0
                    ++rows;
438
0
                }
439
                // Read empty line, continue
440
562
                continue;
441
562
            }
442
443
138k
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
444
138k
            ++rows;
445
138k
        }
446
560
        auto mutable_columns_guard = block->mutate_columns_scoped();
447
560
        auto& mutate_columns = mutable_columns_guard.mutable_columns();
448
562
        for (auto& col : mutate_columns) {
449
562
            col->resize(rows);
450
562
        }
451
11.3k
    } else {
452
11.3k
        auto columns_guard = block->mutate_columns_scoped();
453
11.3k
        auto& columns = columns_guard.mutable_columns();
454
14.9M
        while (rows < batch_size && !_line_reader_eof &&
455
14.9M
               (columns_byte_size(columns) < max_block_bytes)) {
456
14.9M
            const uint8_t* ptr = nullptr;
457
14.9M
            size_t size = 0;
458
14.9M
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
459
460
            // _skip_lines == 0 means this line is the first actual data line of the entire file
461
            // is_remove_bom means _remove_bom should only execute once
462
14.9M
            if (!is_remove_bom && _skip_lines == 0) {
463
11.1k
                ptr = _remove_bom(ptr, size);
464
11.1k
                is_remove_bom = true;
465
11.1k
            }
466
467
            // _skip_lines > 0 means we do not remove bom
468
14.9M
            if (_skip_lines > 0) {
469
319
                _skip_lines--;
470
319
                is_remove_bom = true;
471
319
                continue;
472
319
            }
473
14.9M
            if (size == 0) {
474
6.71k
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
475
12
                    RETURN_IF_ERROR(_fill_empty_line(columns, &rows));
476
12
                }
477
                // Read empty line, continue
478
6.71k
                continue;
479
6.71k
            }
480
481
14.9M
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
482
14.9M
            if (!success) {
483
128
                continue;
484
128
            }
485
14.9M
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), columns, &rows));
486
14.9M
        }
487
11.3k
    }
488
489
11.9k
    *eof = (rows == 0);
490
11.9k
    *read_rows = rows;
491
492
11.9k
    return Status::OK();
493
11.9k
}
494
495
7.24k
Status CsvReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
496
197k
    for (const auto& slot : _file_slot_descs) {
497
197k
        name_to_type->emplace(slot->col_name(), slot->type());
498
197k
    }
499
7.24k
    return Status::OK();
500
7.24k
}
501
502
// init decompressor, file reader and line reader for parsing schema
503
814
Status CsvReader::init_schema_reader() {
504
814
    _start_offset = _range.start_offset;
505
814
    if (_start_offset != 0) {
506
0
        return Status::InvalidArgument(
507
0
                "start offset of TFileRangeDesc must be zero in get parsered schema");
508
0
    }
509
814
    if (_params.file_type == TFileType::FILE_BROKER) {
510
0
        return Status::InternalError<false>(
511
0
                "Getting parsered schema from csv file do not support stream load and broker "
512
0
                "load.");
513
0
    }
514
515
    // csv file without names line and types line.
516
814
    _read_line = 1;
517
814
    _is_parse_name = false;
518
519
814
    if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
520
814
        !_params.file_attributes.header_type.empty()) {
521
92
        std::string header_type = to_lower(_params.file_attributes.header_type);
522
92
        if (header_type == BeConsts::CSV_WITH_NAMES) {
523
63
            _is_parse_name = true;
524
63
        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
525
29
            _read_line = 2;
526
29
            _is_parse_name = true;
527
29
        }
528
92
    }
529
530
814
    RETURN_IF_ERROR(_init_options());
531
814
    RETURN_IF_ERROR(_create_file_reader(true));
532
814
    RETURN_IF_ERROR(_create_decompressor());
533
814
    RETURN_IF_ERROR(_create_line_reader());
534
814
    return Status::OK();
535
814
}
536
537
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
538
814
                                    std::vector<DataTypePtr>* col_types) {
539
814
    if (_read_line == 1) {
540
785
        if (!_is_parse_name) { //parse csv file without names and types
541
722
            size_t col_nums = 0;
542
722
            RETURN_IF_ERROR(_parse_col_nums(&col_nums));
543
8.00k
            for (size_t i = 0; i < col_nums; ++i) {
544
7.29k
                col_names->emplace_back("c" + std::to_string(i + 1));
545
7.29k
            }
546
715
        } else { // parse csv file with names
547
63
            RETURN_IF_ERROR(_parse_col_names(col_names));
548
63
        }
549
550
8.29k
        for (size_t j = 0; j < col_names->size(); ++j) {
551
7.51k
            col_types->emplace_back(
552
7.51k
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
553
7.51k
        }
554
778
    } else { // parse csv file with names and types
555
29
        RETURN_IF_ERROR(_parse_col_names(col_names));
556
29
        RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
557
29
    }
558
807
    return Status::OK();
559
814
}
560
561
210M
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
562
210M
    auto& null_column = assert_cast<ColumnNullable&>(column);
563
210M
    if (_empty_field_as_null) {
564
60
        if (slice.size == 0) {
565
5
            null_column.insert_data(nullptr, 0);
566
5
            return Status::OK();
567
5
        }
568
60
    }
569
213M
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
570
213M
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
571
31.4k
            null_column.insert_data(nullptr, 0);
572
31.4k
            return Status::OK();
573
31.4k
        }
574
213M
    }
575
210M
    static DataTypeStringSerDe stringSerDe(TYPE_STRING);
576
210M
    auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
577
210M
                                                        _options);
578
210M
    if (!st.ok()) {
579
        // fill null if fail
580
0
        null_column.insert_data(nullptr, 0); // 0 is meaningless here
581
0
        return Status::OK();
582
0
    }
583
    // fill not null if success
584
210M
    null_column.get_null_map_data().push_back(0);
585
210M
    return Status::OK();
586
210M
}
587
588
3.61k
Status CsvReader::_init_options() {
589
    // get column_separator and line_delimiter
590
3.61k
    _value_separator = _params.file_attributes.text_params.column_separator;
591
3.61k
    _value_separator_length = _value_separator.size();
592
3.61k
    _line_delimiter = _params.file_attributes.text_params.line_delimiter;
593
3.61k
    _line_delimiter_length = _line_delimiter.size();
594
3.61k
    if (_params.file_attributes.text_params.__isset.enclose) {
595
3.61k
        _enclose = _params.file_attributes.text_params.enclose;
596
3.61k
    }
597
3.61k
    if (_params.file_attributes.text_params.__isset.escape) {
598
3.61k
        _escape = _params.file_attributes.text_params.escape;
599
3.61k
    }
600
601
3.61k
    _trim_tailing_spaces =
602
3.61k
            (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
603
604
3.61k
    _options.escape_char = _escape;
605
3.61k
    _options.quote_char = _enclose;
606
607
3.61k
    if (_params.file_attributes.text_params.collection_delimiter.empty()) {
608
3.61k
        _options.collection_delim = ',';
609
3.61k
    } else {
610
0
        _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0];
611
0
    }
612
3.61k
    if (_params.file_attributes.text_params.mapkv_delimiter.empty()) {
613
3.61k
        _options.map_key_delim = ':';
614
3.61k
    } else {
615
3
        _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
616
3
    }
617
618
3.61k
    if (_params.file_attributes.text_params.__isset.null_format) {
619
34
        _options.null_format = _params.file_attributes.text_params.null_format.data();
620
34
        _options.null_len = _params.file_attributes.text_params.null_format.length();
621
34
    }
622
623
3.61k
    if (_params.file_attributes.__isset.trim_double_quotes) {
624
3.61k
        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
625
3.61k
    }
626
3.61k
    _options.converted_from_string = _trim_double_quotes;
627
628
3.61k
    if (_state != nullptr) {
629
2.80k
        _keep_cr = _state->query_options().keep_carriage_return;
630
2.80k
    }
631
632
3.61k
    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
633
3.58k
        _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null;
634
3.58k
    }
635
3.61k
    return Status::OK();
636
3.61k
}
637
638
8.05k
Status CsvReader::_create_decompressor() {
639
8.05k
    if (_file_compress_type != TFileCompressType::UNKNOWN) {
640
7.96k
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
641
7.96k
    } else {
642
86
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
643
86
    }
644
645
8.05k
    return Status::OK();
646
8.05k
}
647
648
8.04k
Status CsvReader::_create_file_reader(bool need_schema) {
649
8.04k
    if (_params.file_type == TFileType::FILE_STREAM) {
650
1.95k
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
651
1.95k
                                                        need_schema));
652
6.09k
    } else {
653
18.4E
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
654
6.09k
        io::FileReaderOptions reader_options =
655
6.09k
                FileFactory::get_reader_options(_state, _file_description);
656
6.09k
        io::FileReaderSPtr file_reader;
657
6.09k
        if (_io_ctx_holder) {
658
690
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
659
690
                    _profile, _system_properties, _file_description, reader_options,
660
690
                    io::DelegateReader::AccessMode::SEQUENTIAL,
661
690
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
662
690
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
663
5.40k
        } else {
664
5.40k
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
665
5.40k
                    _profile, _system_properties, _file_description, reader_options,
666
5.40k
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
667
5.40k
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
668
5.40k
        }
669
6.09k
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
670
6.09k
                                                                         _io_ctx->file_reader_stats)
671
18.4E
                               : file_reader;
672
6.09k
    }
673
8.04k
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
674
8.04k
        _params.file_type != TFileType::FILE_BROKER) {
675
0
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
676
0
    }
677
8.04k
    return Status::OK();
678
8.04k
}
679
680
3.61k
Status CsvReader::_create_line_reader() {
681
3.61k
    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
682
3.61k
    if (_enclose == 0) {
683
3.46k
        text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
684
3.46k
                _line_delimiter, _line_delimiter_length, _keep_cr);
685
3.46k
        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
686
3.46k
                _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
687
688
3.46k
    } else {
689
        // in a load task, _file_slot_descs is an empty vector, so col_sep_num must be set to 0
690
157
        size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0;
691
157
        _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>(
692
157
                _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
693
157
                col_sep_num, _enclose, _escape, _keep_cr);
694
157
        text_line_reader_ctx = _enclose_reader_ctx;
695
696
157
        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
697
157
                _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose);
698
157
    }
699
3.61k
    switch (_file_format_type) {
700
3.54k
    case TFileFormatType::FORMAT_CSV_PLAIN:
701
3.54k
        [[fallthrough]];
702
3.54k
    case TFileFormatType::FORMAT_CSV_GZ:
703
3.54k
        [[fallthrough]];
704
3.54k
    case TFileFormatType::FORMAT_CSV_BZ2:
705
3.54k
        [[fallthrough]];
706
3.54k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
707
3.54k
        [[fallthrough]];
708
3.54k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
709
3.54k
        [[fallthrough]];
710
3.54k
    case TFileFormatType::FORMAT_CSV_LZOP:
711
3.54k
        [[fallthrough]];
712
3.54k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
713
3.54k
        [[fallthrough]];
714
3.54k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
715
3.54k
        _line_reader =
716
3.54k
                NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
717
3.54k
                                                      text_line_reader_ctx, _size, _start_offset);
718
719
3.54k
        break;
720
77
    case TFileFormatType::FORMAT_PROTO:
721
77
        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
722
77
        _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
723
77
        break;
724
0
    default:
725
0
        return Status::InternalError<false>(
726
0
                "Unknown format type, cannot init line reader in csv reader, type={}",
727
0
                _file_format_type);
728
3.61k
    }
729
3.61k
    return Status::OK();
730
3.61k
}
731
732
5.24k
// Forwards one CSV cell (`slice`) to `serde`, which deserializes it into
// `column` using the reader's _options. The serde's status is returned as-is.
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
}
735
736
// Splits `line` into fields and appends one value per file slot to `columns`.
//
// When _line_split_to_values reports the row as invalid, nothing is appended,
// `*rows` is left untouched and OK is returned (the row is filtered).
// Otherwise `*rows` is incremented after every slot has been filled.
// Any error from splitting or deserialization is returned immediately.
Status CsvReader::_fill_dest_columns(const Slice& line, std::vector<MutableColumnPtr>& columns,
                                     size_t* rows) {
    bool row_is_valid = false;
    RETURN_IF_ERROR(_line_split_to_values(line, &row_is_valid));
    if (UNLIKELY(!row_is_valid)) {
        // If not success, which means we met an invalid row, filter this row and return.
        return Status::OK();
    }

    for (size_t slot = 0; slot < _file_slot_descs.size(); ++slot) {
        const int col_idx = _col_idxs[slot];
        // col idx is out of range, fill with null format
        auto cell = static_cast<size_t>(col_idx) < _split_values.size()
                            ? _split_values[col_idx]
                            : Slice(_options.null_format, _options.null_len);

        // Load tasks write to the column at the slot's own position; other
        // tasks go through _file_slot_idx_map to find the destination column.
        IColumn* dest = _is_load ? columns[slot].get() : columns[_file_slot_idx_map[slot]].get();

        if (_use_nullable_string_opt[slot]) {
            // For load task, we always read "string" from file.
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
            RETURN_IF_ERROR(_deserialize_nullable_string(*dest, cell));
        } else {
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[slot], *dest, cell));
        }
    }

    ++(*rows);
    return Status::OK();
}
771
772
12
// Handles an empty input line: for every file slot, the destination column is
// treated as a ColumnNullable and receives insert_data(nullptr, 0)
// (presumably a NULL entry -- confirm against ColumnNullable), then `*rows`
// is incremented. Always returns OK.
Status CsvReader::_fill_empty_line(std::vector<MutableColumnPtr>& columns, size_t* rows) {
    for (size_t slot = 0; slot < _file_slot_descs.size(); ++slot) {
        // Load tasks use the slot's own position; other tasks map it through
        // _file_slot_idx_map.
        IColumn* dest = _is_load ? columns[slot].get() : columns[_file_slot_idx_map[slot]].get();
        assert_cast<ColumnNullable&>(*dest).insert_data(nullptr, 0);
    }
    ++(*rows);
    return Status::OK();
}
784
785
14.7M
// Checks that `line` is valid UTF-8 (the check is skipped for the proto format).
//
// - Valid line: `*success` = true, returns OK.
// - Invalid line outside a load task: returns InternalError and leaves
//   `*success` untouched.
// - Invalid line in a load task: the row is counted as filtered, `*success`
//   is set to false, the raw line and a reason are appended to the error file,
//   and OK is returned (unless appending itself fails).
Status CsvReader::_validate_line(const Slice& line, bool* success) {
    const bool line_is_acceptable =
            _is_proto_format || validate_utf8(_params, line.data, line.size);
    if (line_is_acceptable) {
        *success = true;
        return Status::OK();
    }

    if (!_is_load) {
        return Status::InternalError<false>("Only support csv data in utf8 codec");
    }

    _counter->num_rows_filtered++;
    *success = false;
    auto raw_line = [&]() -> std::string { return std::string(line.data, line.size); };
    auto reason = [&]() -> std::string {
        return "Invalid file encoding: all CSV files must be UTF-8 encoded";
    };
    RETURN_IF_ERROR(_state->append_error_msg_to_file(raw_line, reason));
    return Status::OK();
}
803
804
14.9M
// Splits `line` into _split_values and, for load tasks, verifies the number
// of fields against the number of file slots.
//
// Query tasks never fail the check: missing columns are filled with "null"
// later on. For load tasks the field count must equal the slot count, unless
// the ignore_csv_redundant_col file attribute is set, in which case extra
// fields are tolerated and only too few fields are rejected.
// A rejected row is counted as filtered, reported to the error file, and
// signalled through `*success` = false while the function still returns OK.
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
    _split_line(line);

    if (!_is_load) {
        *success = true;
        return Status::OK();
    }

    // Only check for load task. For query task, the non exist column will be filled "null".
    // if actual column number in csv file is not equal to _file_slot_descs.size()
    // then filter this line.
    const bool ignore_col = _params.__isset.file_attributes &&
                            _params.file_attributes.__isset.ignore_csv_redundant_col &&
                            _params.file_attributes.ignore_csv_redundant_col;
    const size_t expected_cols = _file_slot_descs.size();
    const size_t found_cols = _split_values.size();
    const bool count_mismatch =
            ignore_col ? (found_cols < expected_cols) : (found_cols != expected_cols);
    if (!count_mismatch) {
        *success = true;
        return Status::OK();
    }

    _counter->num_rows_filtered++;
    *success = false;

    auto raw_line = [&]() -> std::string { return std::string(line.data, line.size); };
    // Builds the human readable reason, including the separators in use with
    // tab / newline rendered as visible escape sequences.
    auto describe_mismatch = [&]() -> std::string {
        fmt::memory_buffer error_msg;
        fmt::format_to(error_msg, "Column count mismatch: expected {}, but found {}",
                       _file_slot_descs.size(), _split_values.size());
        std::string escaped_separator =
                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
        std::string escaped_delimiter =
                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator, escaped_delimiter);
        if (_enclose != 0) {
            fmt::format_to(error_msg, " encl:{}", _enclose);
        }
        if (_escape != 0) {
            fmt::format_to(error_msg, " esc:{}", _escape);
        }
        fmt::format_to(error_msg, ")");
        return fmt::to_string(error_msg);
    };
    RETURN_IF_ERROR(_state->append_error_msg_to_file(raw_line, describe_mismatch));
    return Status::OK();
}
849
850
14.9M
void CsvReader::_split_line(const Slice& line) {
851
14.9M
    _split_values.clear();
852
14.9M
    _fields_splitter->split_line(line, &_split_values);
853
14.9M
}
854
855
722
// Reads the next line from _line_reader and stores the number of fields it
// splits into in `*col_nums`.
//
// Returns InternalError when the line is empty or is not valid UTF-8; errors
// from the line reader are propagated. A leading UTF-8 BOM is stripped before
// the line is split.
Status CsvReader::_parse_col_nums(size_t* col_nums) {
    const uint8_t* line_data = nullptr;
    size_t line_len = 0;
    RETURN_IF_ERROR(_line_reader->read_line(&line_data, &line_len, &_line_reader_eof, _io_ctx));

    if (line_len == 0) {
        return Status::InternalError<false>(
                "The first line is empty, can not parse column numbers");
    }

    const bool is_utf8 =
            validate_utf8(_params, reinterpret_cast<const char*>(line_data), line_len);
    if (!is_utf8) {
        return Status::InternalError<false>("Only support csv data in utf8 codec");
    }

    // _remove_bom shrinks line_len when it skips a BOM.
    line_data = _remove_bom(line_data, line_len);
    _split_line(Slice(line_data, line_len));
    *col_nums = _split_values.size();
    return Status::OK();
}
871
872
92
// Reads the next line from _line_reader, splits it, and appends every field
// to `col_names` as a string.
//
// Returns InternalError when the line is empty or is not valid UTF-8; errors
// from the line reader are propagated. A leading UTF-8 BOM is stripped before
// the line is split.
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
    const uint8_t* line_data = nullptr;
    size_t line_len = 0;
    // no use of _line_reader_eof
    RETURN_IF_ERROR(_line_reader->read_line(&line_data, &line_len, &_line_reader_eof, _io_ctx));

    if (line_len == 0) {
        return Status::InternalError<false>("The first line is empty, can not parse column names");
    }

    const bool is_utf8 =
            validate_utf8(_params, reinterpret_cast<const char*>(line_data), line_len);
    if (!is_utf8) {
        return Status::InternalError<false>("Only support csv data in utf8 codec");
    }

    // _remove_bom shrinks line_len when it skips a BOM.
    line_data = _remove_bom(line_data, line_len);
    _split_line(Slice(line_data, line_len));
    for (const auto& name_slice : _split_values) {
        col_names->emplace_back(name_slice.to_string());
    }
    return Status::OK();
}
890
891
// TODO(ftw): parse type
892
29
// Appends `col_nums` nullable string types to `col_types`; no type
// information is read from the file yet. Always returns OK.
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) {
    // delete after.
    for (size_t remaining = col_nums; remaining > 0; --remaining) {
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
    }

    // Planned steps for real type parsing:
    // 1. check _line_reader_eof
    // 2. read line
    // 3. check utf8
    // 4. check size
    // 5. check _split_values.size must equal to col_nums.
    // 6. fill col_types
    return Status::OK();
}
906
907
12.4k
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
908
12.4k
    if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) {
909
7
        LOG(INFO) << "remove bom";
910
7
        constexpr size_t bom_size = 3;
911
7
        size -= bom_size;
912
        // In enclose mode, column_sep_positions were computed on the original line
913
        // (including BOM). After shifting the pointer, we must adjust those positions
914
        // so they remain correct relative to the new start.
915
7
        if (_enclose_reader_ctx) {
916
1
            _enclose_reader_ctx->adjust_column_sep_positions(bom_size);
917
1
        }
918
7
        return ptr + bom_size;
919
7
    }
920
12.4k
    return ptr;
921
12.4k
}
922
923
7.24k
// Closes the line reader and then the file reader, skipping whichever one was
// never created. Only the file reader's close status is checked; a failure
// there is returned to the caller.
Status CsvReader::close() {
    if (_line_reader != nullptr) {
        _line_reader->close();
    }
    if (_file_reader != nullptr) {
        RETURN_IF_ERROR(_file_reader->close());
    }
    return Status::OK();
}
934
935
} // namespace doris