Coverage Report

Created: 2026-05-08 18:22

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/csv/csv_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <cstddef>
27
#include <map>
28
#include <memory>
29
#include <numeric>
30
#include <ostream>
31
#include <regex>
32
#include <utility>
33
34
#include "common/compiler_util.h" // IWYU pragma: keep
35
#include "common/config.h"
36
#include "common/consts.h"
37
#include "common/status.h"
38
#include "core/block/block.h"
39
#include "core/block/column_with_type_and_name.h"
40
#include "core/data_type/data_type_factory.hpp"
41
#include "exec/scan/scanner.h"
42
#include "format/file_reader/new_plain_binary_line_reader.h"
43
#include "format/file_reader/new_plain_text_line_reader.h"
44
#include "format/line_reader.h"
45
#include "io/file_factory.h"
46
#include "io/fs/broker_file_reader.h"
47
#include "io/fs/buffered_reader.h"
48
#include "io/fs/file_reader.h"
49
#include "io/fs/s3_file_reader.h"
50
#include "io/fs/tracing_file_reader.h"
51
#include "runtime/descriptors.h"
52
#include "runtime/runtime_state.h"
53
#include "util/decompressor.h"
54
#include "util/string_util.h"
55
#include "util/utf8_check.h"
56
57
namespace doris {
58
class RuntimeProfile;
59
class IColumn;
60
namespace io {
61
struct IOContext;
62
enum class FileCachePolicy : uint8_t;
63
} // namespace io
64
} // namespace doris
65
66
namespace doris {
67
68
878
void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
69
878
    const char* data = line.data;
70
878
    const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
71
878
    size_t value_start_offset = 0;
72
2.98k
    for (auto idx : column_sep_positions) {
73
2.98k
        process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char,
74
2.98k
                           splitted_values);
75
2.98k
        value_start_offset = idx + _value_sep_len;
76
2.98k
    }
77
878
    if (line.size >= value_start_offset) {
78
        // process the last column
79
876
        process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char,
80
876
                           splitted_values);
81
876
    }
82
878
}
83
84
void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
85
14.6M
                                                         std::vector<Slice>* splitted_values) {
86
14.6M
    const char* data = line.data;
87
14.6M
    const size_t size = line.size;
88
14.6M
    size_t value_start = 0;
89
3.56G
    for (size_t i = 0; i < size; ++i) {
90
3.55G
        if (data[i] == _value_sep[0]) {
91
445M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
92
445M
            value_start = i + _value_sep_len;
93
445M
        }
94
3.55G
    }
95
14.6M
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
96
14.6M
}
97
98
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
99
2.38k
                                                        std::vector<Slice>* splitted_values) {
100
2.38k
    size_t start = 0;  // point to the start pos of next col value.
101
2.38k
    size_t curpos = 0; // point to the start pos of separator matching sequence.
102
103
    // value_sep : AAAA
104
    // line.data : 1234AAAA5678
105
    // -> 1234,5678
106
107
    //    start   start
108
    //      ▼       ▼
109
    //      1234AAAA5678\0
110
    //          ▲       ▲
111
    //      curpos     curpos
112
113
    //kmp
114
2.38k
    std::vector<int> next(_value_sep_len);
115
2.38k
    next[0] = -1;
116
4.96k
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
117
2.60k
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
118
20
            j = next[j];
119
20
        }
120
2.58k
        if (_value_sep[i] == _value_sep[j + 1]) {
121
2.44k
            j++;
122
2.44k
        }
123
2.58k
        next[i] = j;
124
2.58k
    }
125
126
42.1k
    for (int i = 0, j = -1; i < line.size; i++) {
127
        // i : line
128
        // j : _value_sep
129
42.4k
        while (j > -1 && line[i] != _value_sep[j + 1]) {
130
2.66k
            j = next[j];
131
2.66k
        }
132
39.7k
        if (line[i] == _value_sep[j + 1]) {
133
7.02k
            j++;
134
7.02k
        }
135
39.7k
        if (j == _value_sep_len - 1) {
136
3.19k
            curpos = i - _value_sep_len + 1;
137
138
            /*
139
             * column_separator : "xx"
140
             * data.csv :  data1xxxxdata2
141
             *
142
             * Parse incorrectly:
143
             *      data1[xx]xxdata2
144
             *      data1x[xx]xdata2
145
             *      data1xx[xx]data2
146
             * The string "xxxx" is parsed into three "xx" delimiters.
147
             *
148
             * Parse correctly:
149
             *      data1[xx]xxdata2
150
             *      data1xx[xx]data2
151
             */
152
153
3.19k
            if (curpos >= start) {
154
3.13k
                process_value_func(line.data, start, curpos - start, _trimming_char,
155
3.13k
                                   splitted_values);
156
3.13k
                start = i + 1;
157
3.13k
            }
158
159
3.19k
            j = next[j];
160
3.19k
        }
161
39.7k
    }
162
2.38k
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
163
2.38k
}
164
165
14.6M
void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
166
14.6M
    if (is_single_char_delim) {
167
14.6M
        _split_field_single_char(line, splitted_values);
168
18.4E
    } else {
169
18.4E
        _split_field_multi_char(line, splitted_values);
170
18.4E
    }
171
14.6M
}
172
173
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
174
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
175
                     const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size,
176
                     io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
177
4.78k
        : _profile(profile),
178
4.78k
          _params(params),
179
4.78k
          _file_reader(nullptr),
180
4.78k
          _line_reader(nullptr),
181
4.78k
          _decompressor(nullptr),
182
4.78k
          _state(state),
183
4.78k
          _counter(counter),
184
4.78k
          _range(range),
185
4.78k
          _file_slot_descs(file_slot_descs),
186
4.78k
          _line_reader_eof(false),
187
4.78k
          _skip_lines(0),
188
4.78k
          _io_ctx(io_ctx),
189
4.78k
          _io_ctx_holder(std::move(io_ctx_holder)),
190
4.78k
          _batch_size(std::max(batch_size, 1UL)) {
191
4.78k
    if (_io_ctx == nullptr && _io_ctx_holder) {
192
0
        _io_ctx = _io_ctx_holder.get();
193
0
    }
194
4.78k
    _file_format_type = _params.format_type;
195
4.78k
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
196
4.78k
    if (_range.__isset.compress_type) {
197
        // for compatibility
198
2.82k
        _file_compress_type = _range.compress_type;
199
2.82k
    } else {
200
1.96k
        _file_compress_type = _params.compress_type;
201
1.96k
    }
202
4.78k
    _size = _range.size;
203
204
4.78k
    _split_values.reserve(_file_slot_descs.size());
205
4.78k
    _init_system_properties();
206
4.78k
    _init_file_description();
207
4.78k
    _serdes = create_data_type_serdes(_file_slot_descs);
208
4.78k
}
209
210
4.78k
void CsvReader::_init_system_properties() {
211
4.78k
    if (_range.__isset.file_type) {
212
        // for compatibility
213
2.66k
        _system_properties.system_type = _range.file_type;
214
2.66k
    } else {
215
2.12k
        _system_properties.system_type = _params.file_type;
216
2.12k
    }
217
4.78k
    _system_properties.properties = _params.properties;
218
4.78k
    _system_properties.hdfs_params = _params.hdfs_params;
219
4.78k
    if (_params.__isset.broker_addresses) {
220
1.85k
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
221
1.85k
                                                   _params.broker_addresses.end());
222
1.85k
    }
223
4.78k
}
224
225
4.78k
void CsvReader::_init_file_description() {
226
4.78k
    _file_description.path = _range.path;
227
18.4E
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
228
4.78k
    if (_range.__isset.fs_name) {
229
1.51k
        _file_description.fs_name = _range.fs_name;
230
1.51k
    }
231
4.78k
    if (_range.__isset.file_cache_admission) {
232
2.01k
        _file_description.file_cache_admission = _range.file_cache_admission;
233
2.01k
    }
234
4.78k
}
235
236
0
Status CsvReader::init_reader(bool is_load) {
237
    // set the skip lines and start offset
238
0
    _start_offset = _range.start_offset;
239
0
    if (_start_offset == 0) {
240
        // check header typer first
241
0
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
242
0
            !_params.file_attributes.header_type.empty()) {
243
0
            std::string header_type = to_lower(_params.file_attributes.header_type);
244
0
            if (header_type == BeConsts::CSV_WITH_NAMES) {
245
0
                _skip_lines = 1;
246
0
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
247
0
                _skip_lines = 2;
248
0
            }
249
0
        } else if (_params.file_attributes.__isset.skip_lines) {
250
0
            _skip_lines = _params.file_attributes.skip_lines;
251
0
        }
252
0
    } else if (_start_offset != 0) {
253
0
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
254
0
            (_file_compress_type == TFileCompressType::UNKNOWN &&
255
0
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
256
0
            return Status::InternalError<false>("For now we do not support split compressed file");
257
0
        }
258
        // pre-read to promise first line skipped always read
259
0
        int64_t pre_read_len = std::min(
260
0
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
261
0
                _start_offset);
262
0
        _start_offset -= pre_read_len;
263
0
        _size += pre_read_len;
264
        // not first range will always skip one line
265
0
        _skip_lines = 1;
266
0
    }
267
268
0
    _use_nullable_string_opt.resize(_file_slot_descs.size());
269
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
270
0
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
271
0
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
272
0
            _use_nullable_string_opt[i] = 1;
273
0
        }
274
0
    }
275
276
0
    RETURN_IF_ERROR(_init_options());
277
0
    RETURN_IF_ERROR(_create_file_reader(false));
278
0
    RETURN_IF_ERROR(_create_decompressor());
279
0
    RETURN_IF_ERROR(_create_line_reader());
280
281
0
    _is_load = is_load;
282
0
    if (!_is_load) {
283
        // For query task, there are 2 slot mapping.
284
        // One is from file slot to values in line.
285
        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
286
        //      the _col_idxs will save: 0, 2, 4
287
        // The other is from file slot to columns in output block
288
        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
289
        //      where "p1" is the partition col which does not exist in file
290
        //      the _file_slot_idx_map will save: 1, 2, 3
291
0
        DCHECK(_params.__isset.column_idxs);
292
0
        _col_idxs = _params.column_idxs;
293
0
        int idx = 0;
294
0
        for (const auto& slot_info : _params.required_slots) {
295
0
            if (slot_info.is_file_slot) {
296
0
                _file_slot_idx_map.push_back(idx);
297
0
            }
298
0
            idx++;
299
0
        }
300
0
    } else {
301
        // For load task, the column order is same as file column order
302
0
        int i = 0;
303
0
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
304
0
            _col_idxs.push_back(i++);
305
0
        }
306
0
    }
307
308
0
    _line_reader_eof = false;
309
0
    return Status::OK();
310
0
}
311
312
// ---- Unified init_reader(ReaderInitContext*) overrides ----
313
314
3.96k
Status CsvReader::_open_file_reader(ReaderInitContext* base_ctx) {
315
3.96k
    _start_offset = _range.start_offset;
316
3.96k
    if (_start_offset == 0) {
317
3.84k
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
318
3.84k
            !_params.file_attributes.header_type.empty()) {
319
92
            std::string header_type = to_lower(_params.file_attributes.header_type);
320
92
            if (header_type == BeConsts::CSV_WITH_NAMES) {
321
64
                _skip_lines = 1;
322
64
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
323
28
                _skip_lines = 2;
324
28
            }
325
3.75k
        } else if (_params.file_attributes.__isset.skip_lines) {
326
3.74k
            _skip_lines = _params.file_attributes.skip_lines;
327
3.74k
        }
328
3.84k
    } else if (_start_offset != 0) {
329
128
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
330
128
            (_file_compress_type == TFileCompressType::UNKNOWN &&
331
128
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
332
0
            return Status::InternalError<false>("For now we do not support split compressed file");
333
0
        }
334
128
        int64_t pre_read_len = std::min(
335
128
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
336
128
                _start_offset);
337
128
        _start_offset -= pre_read_len;
338
128
        _size += pre_read_len;
339
128
        _skip_lines = 1;
340
128
    }
341
342
3.96k
    RETURN_IF_ERROR(_init_options());
343
3.96k
    RETURN_IF_ERROR(_create_file_reader(false));
344
3.96k
    return Status::OK();
345
3.96k
}
346
347
3.96k
Status CsvReader::_do_init_reader(ReaderInitContext* base_ctx) {
348
3.96k
    auto* ctx = checked_context_cast<CsvInitContext>(base_ctx);
349
3.96k
    _is_load = ctx->is_load;
350
351
3.96k
    _use_nullable_string_opt.resize(_file_slot_descs.size());
352
29.2k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
353
25.3k
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
354
25.3k
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
355
22.8k
            _use_nullable_string_opt[i] = 1;
356
22.8k
        }
357
25.3k
    }
358
359
3.96k
    RETURN_IF_ERROR(_create_decompressor());
360
3.96k
    RETURN_IF_ERROR(_create_line_reader());
361
362
3.96k
    if (!_is_load) {
363
2.12k
        DCHECK(_params.__isset.column_idxs);
364
2.12k
        _col_idxs = _params.column_idxs;
365
2.12k
        int idx = 0;
366
9.66k
        for (const auto& slot_info : _params.required_slots) {
367
9.66k
            if (slot_info.is_file_slot) {
368
8.25k
                _file_slot_idx_map.push_back(idx);
369
8.25k
            }
370
9.66k
            idx++;
371
9.66k
        }
372
2.12k
    } else {
373
1.84k
        int i = 0;
374
17.0k
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
375
17.0k
            _col_idxs.push_back(i++);
376
17.0k
        }
377
1.84k
    }
378
3.96k
    _line_reader_eof = false;
379
3.96k
    return Status::OK();
380
3.96k
}
381
382
11.7k
void CsvReader::set_batch_size(size_t batch_size) {
383
    // 0 means "not set" / "use default" for the row-based readers; we must
384
    // never let _batch_size be 0 because _do_get_next_block uses it as the
385
    // upper bound of a `while (rows < _batch_size)` loop and a 0 would make
386
    // the reader return empty blocks and incorrectly signal EOF.
387
11.7k
    _batch_size = std::max(batch_size, 1UL);
388
11.7k
}
389
390
// !FIXME: Here we should use MutableBlock
391
11.7k
Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
392
11.7k
    if (_line_reader_eof) {
393
3.90k
        *eof = true;
394
3.90k
        return Status::OK();
395
3.90k
    }
396
397
7.89k
    const size_t batch_size = _batch_size;
398
7.89k
    const auto max_block_bytes = _state->preferred_block_size_bytes();
399
7.89k
    size_t rows = 0;
400
401
7.89k
    bool success = false;
402
7.89k
    bool is_remove_bom = false;
403
7.89k
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
404
17.9k
        while (rows < batch_size && !_line_reader_eof) {
405
17.9k
            const uint8_t* ptr = nullptr;
406
17.9k
            size_t size = 0;
407
17.9k
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
408
409
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
410
            // is_remove_bom means _remove_bom should only execute once
411
17.9k
            if (_skip_lines == 0 && !is_remove_bom) {
412
34
                ptr = _remove_bom(ptr, size);
413
34
                is_remove_bom = true;
414
34
            }
415
416
            // _skip_lines > 0 means we do not need to remove bom
417
17.9k
            if (_skip_lines > 0) {
418
0
                _skip_lines--;
419
0
                is_remove_bom = true;
420
0
                continue;
421
0
            }
422
17.9k
            if (size == 0) {
423
34
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
424
0
                    ++rows;
425
0
                }
426
                // Read empty line, continue
427
34
                continue;
428
34
            }
429
430
17.8k
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
431
17.8k
            ++rows;
432
17.8k
        }
433
34
        auto mutate_columns = block->mutate_columns();
434
34
        for (auto& col : mutate_columns) {
435
34
            col->resize(rows);
436
34
        }
437
34
        block->set_columns(std::move(mutate_columns));
438
7.86k
    } else {
439
7.86k
        auto columns = block->mutate_columns();
440
14.8M
        while (rows < batch_size && !_line_reader_eof && (block->bytes() < max_block_bytes)) {
441
14.7M
            const uint8_t* ptr = nullptr;
442
14.7M
            size_t size = 0;
443
14.7M
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
444
445
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
446
            // is_remove_bom means _remove_bom should only execute once
447
14.7M
            if (!is_remove_bom && _skip_lines == 0) {
448
7.61k
                ptr = _remove_bom(ptr, size);
449
7.61k
                is_remove_bom = true;
450
7.61k
            }
451
452
            // _skip_lines > 0 means we do not remove bom
453
14.7M
            if (_skip_lines > 0) {
454
319
                _skip_lines--;
455
319
                is_remove_bom = true;
456
319
                continue;
457
319
            }
458
14.7M
            if (size == 0) {
459
3.97k
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
460
12
                    RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
461
12
                }
462
                // Read empty line, continue
463
3.97k
                continue;
464
3.97k
            }
465
466
14.7M
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
467
14.7M
            if (!success) {
468
128
                continue;
469
128
            }
470
14.7M
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
471
14.7M
        }
472
7.84k
        block->set_columns(std::move(columns));
473
7.84k
    }
474
475
7.88k
    *eof = (rows == 0);
476
7.88k
    *read_rows = rows;
477
478
7.88k
    return Status::OK();
479
7.89k
}
480
481
3.97k
Status CsvReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
482
25.3k
    for (const auto& slot : _file_slot_descs) {
483
25.3k
        name_to_type->emplace(slot->col_name(), slot->type());
484
25.3k
    }
485
3.97k
    return Status::OK();
486
3.97k
}
487
488
// init decompressor, file reader and line reader for parsing schema
489
811
Status CsvReader::init_schema_reader() {
490
811
    _start_offset = _range.start_offset;
491
811
    if (_start_offset != 0) {
492
0
        return Status::InvalidArgument(
493
0
                "start offset of TFileRangeDesc must be zero in get parsered schema");
494
0
    }
495
811
    if (_params.file_type == TFileType::FILE_BROKER) {
496
0
        return Status::InternalError<false>(
497
0
                "Getting parsered schema from csv file do not support stream load and broker "
498
0
                "load.");
499
0
    }
500
501
    // csv file without names line and types line.
502
811
    _read_line = 1;
503
811
    _is_parse_name = false;
504
505
811
    if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
506
811
        !_params.file_attributes.header_type.empty()) {
507
92
        std::string header_type = to_lower(_params.file_attributes.header_type);
508
92
        if (header_type == BeConsts::CSV_WITH_NAMES) {
509
63
            _is_parse_name = true;
510
63
        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
511
29
            _read_line = 2;
512
29
            _is_parse_name = true;
513
29
        }
514
92
    }
515
516
811
    RETURN_IF_ERROR(_init_options());
517
811
    RETURN_IF_ERROR(_create_file_reader(true));
518
811
    RETURN_IF_ERROR(_create_decompressor());
519
811
    RETURN_IF_ERROR(_create_line_reader());
520
811
    return Status::OK();
521
811
}
522
523
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
524
811
                                    std::vector<DataTypePtr>* col_types) {
525
811
    if (_read_line == 1) {
526
782
        if (!_is_parse_name) { //parse csv file without names and types
527
719
            size_t col_nums = 0;
528
719
            RETURN_IF_ERROR(_parse_col_nums(&col_nums));
529
7.98k
            for (size_t i = 0; i < col_nums; ++i) {
530
7.27k
                col_names->emplace_back("c" + std::to_string(i + 1));
531
7.27k
            }
532
712
        } else { // parse csv file with names
533
63
            RETURN_IF_ERROR(_parse_col_names(col_names));
534
63
        }
535
536
8.26k
        for (size_t j = 0; j < col_names->size(); ++j) {
537
7.49k
            col_types->emplace_back(
538
7.49k
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
539
7.49k
        }
540
775
    } else { // parse csv file with names and types
541
29
        RETURN_IF_ERROR(_parse_col_names(col_names));
542
29
        RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
543
29
    }
544
804
    return Status::OK();
545
811
}
546
547
210M
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
548
210M
    auto& null_column = assert_cast<ColumnNullable&>(column);
549
210M
    if (_empty_field_as_null) {
550
60
        if (slice.size == 0) {
551
5
            null_column.insert_data(nullptr, 0);
552
5
            return Status::OK();
553
5
        }
554
60
    }
555
210M
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
556
209M
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
557
31.1k
            null_column.insert_data(nullptr, 0);
558
31.1k
            return Status::OK();
559
31.1k
        }
560
209M
    }
561
210M
    static DataTypeStringSerDe stringSerDe(TYPE_STRING);
562
210M
    auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
563
210M
                                                        _options);
564
210M
    if (!st.ok()) {
565
        // fill null if fail
566
0
        null_column.insert_data(nullptr, 0); // 0 is meaningless here
567
0
        return Status::OK();
568
0
    }
569
    // fill not null if success
570
210M
    null_column.get_null_map_data().push_back(0);
571
210M
    return Status::OK();
572
210M
}
573
574
3.59k
Status CsvReader::_init_options() {
575
    // get column_separator and line_delimiter
576
3.59k
    _value_separator = _params.file_attributes.text_params.column_separator;
577
3.59k
    _value_separator_length = _value_separator.size();
578
3.59k
    _line_delimiter = _params.file_attributes.text_params.line_delimiter;
579
3.59k
    _line_delimiter_length = _line_delimiter.size();
580
3.59k
    if (_params.file_attributes.text_params.__isset.enclose) {
581
3.59k
        _enclose = _params.file_attributes.text_params.enclose;
582
3.59k
    }
583
3.59k
    if (_params.file_attributes.text_params.__isset.escape) {
584
3.58k
        _escape = _params.file_attributes.text_params.escape;
585
3.58k
    }
586
587
3.59k
    _trim_tailing_spaces =
588
3.59k
            (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
589
590
3.59k
    _options.escape_char = _escape;
591
3.59k
    _options.quote_char = _enclose;
592
593
3.59k
    if (_params.file_attributes.text_params.collection_delimiter.empty()) {
594
3.59k
        _options.collection_delim = ',';
595
3.59k
    } else {
596
0
        _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0];
597
0
    }
598
3.59k
    if (_params.file_attributes.text_params.mapkv_delimiter.empty()) {
599
3.59k
        _options.map_key_delim = ':';
600
18.4E
    } else {
601
18.4E
        _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
602
18.4E
    }
603
604
3.59k
    if (_params.file_attributes.text_params.__isset.null_format) {
605
34
        _options.null_format = _params.file_attributes.text_params.null_format.data();
606
34
        _options.null_len = _params.file_attributes.text_params.null_format.length();
607
34
    }
608
609
3.59k
    if (_params.file_attributes.__isset.trim_double_quotes) {
610
3.58k
        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
611
3.58k
    }
612
3.59k
    _options.converted_from_string = _trim_double_quotes;
613
614
3.59k
    if (_state != nullptr) {
615
2.78k
        _keep_cr = _state->query_options().keep_carriage_return;
616
2.78k
    }
617
618
3.59k
    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
619
3.55k
        _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null;
620
3.55k
    }
621
3.59k
    return Status::OK();
622
3.59k
}
623
624
4.78k
Status CsvReader::_create_decompressor() {
625
4.78k
    if (_file_compress_type != TFileCompressType::UNKNOWN) {
626
4.70k
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
627
4.70k
    } else {
628
86
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
629
86
    }
630
631
4.78k
    return Status::OK();
632
4.78k
}
633
634
4.77k
Status CsvReader::_create_file_reader(bool need_schema) {
635
4.77k
    if (_params.file_type == TFileType::FILE_STREAM) {
636
1.93k
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
637
1.93k
                                                        need_schema));
638
2.84k
    } else {
639
2.84k
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
640
2.84k
        io::FileReaderOptions reader_options =
641
2.84k
                FileFactory::get_reader_options(_state, _file_description);
642
2.84k
        io::FileReaderSPtr file_reader;
643
2.84k
        if (_io_ctx_holder) {
644
687
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
645
687
                    _profile, _system_properties, _file_description, reader_options,
646
687
                    io::DelegateReader::AccessMode::SEQUENTIAL,
647
687
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
648
687
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
649
2.15k
        } else {
650
2.15k
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
651
2.15k
                    _profile, _system_properties, _file_description, reader_options,
652
2.15k
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
653
2.15k
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
654
2.15k
        }
655
2.85k
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
656
2.85k
                                                                         _io_ctx->file_reader_stats)
657
18.4E
                               : file_reader;
658
2.84k
    }
659
4.77k
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
660
4.77k
        _params.file_type != TFileType::FILE_BROKER) {
661
0
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
662
0
    }
663
4.77k
    return Status::OK();
664
4.77k
}
665
666
3.59k
Status CsvReader::_create_line_reader() {
667
3.59k
    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
668
3.59k
    if (_enclose == 0) {
669
3.44k
        text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
670
3.44k
                _line_delimiter, _line_delimiter_length, _keep_cr);
671
3.44k
        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
672
3.44k
                _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
673
674
3.44k
    } else {
675
        // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0
676
154
        size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0;
677
154
        _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>(
678
154
                _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
679
154
                col_sep_num, _enclose, _escape, _keep_cr);
680
154
        text_line_reader_ctx = _enclose_reader_ctx;
681
682
154
        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
683
154
                _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose);
684
154
    }
685
3.59k
    switch (_file_format_type) {
686
3.51k
    case TFileFormatType::FORMAT_CSV_PLAIN:
687
3.51k
        [[fallthrough]];
688
3.51k
    case TFileFormatType::FORMAT_CSV_GZ:
689
3.51k
        [[fallthrough]];
690
3.51k
    case TFileFormatType::FORMAT_CSV_BZ2:
691
3.51k
        [[fallthrough]];
692
3.51k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
693
3.51k
        [[fallthrough]];
694
3.51k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
695
3.51k
        [[fallthrough]];
696
3.51k
    case TFileFormatType::FORMAT_CSV_LZOP:
697
3.51k
        [[fallthrough]];
698
3.51k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
699
3.51k
        [[fallthrough]];
700
3.51k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
701
3.51k
        _line_reader =
702
3.51k
                NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
703
3.51k
                                                      text_line_reader_ctx, _size, _start_offset);
704
705
3.51k
        break;
706
77
    case TFileFormatType::FORMAT_PROTO:
707
77
        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
708
77
        _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
709
77
        break;
710
0
    default:
711
0
        return Status::InternalError<false>(
712
0
                "Unknown format type, cannot init line reader in csv reader, type={}",
713
0
                _file_format_type);
714
3.59k
    }
715
3.59k
    return Status::OK();
716
3.59k
}
717
718
5.24k
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
719
5.24k
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
720
5.24k
}
721
722
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
723
14.8M
                                     std::vector<MutableColumnPtr>& columns, size_t* rows) {
724
14.8M
    bool is_success = false;
725
726
14.8M
    RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
727
14.8M
    if (UNLIKELY(!is_success)) {
728
        // If not success, which means we met an invalid row, filter this row and return.
729
539
        return Status::OK();
730
539
    }
731
732
221M
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
733
207M
        int col_idx = _col_idxs[i];
734
        // col idx is out of range, fill with null format
735
207M
        auto value = col_idx < _split_values.size()
736
207M
                             ? _split_values[col_idx]
737
18.4E
                             : Slice(_options.null_format, _options.null_len);
738
739
207M
        IColumn* col_ptr = columns[i].get();
740
207M
        if (!_is_load) {
741
            // block is a Block*, and get_by_position returns a ColumnPtr,
742
            // which is a const pointer. Therefore, using const_cast is permissible.
743
33.4M
            col_ptr = const_cast<IColumn*>(
744
33.4M
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
745
33.4M
        }
746
747
213M
        if (_use_nullable_string_opt[i]) {
748
            // For load task, we always read "string" from file.
749
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
750
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
751
213M
            RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
752
18.4E
        } else {
753
18.4E
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
754
18.4E
        }
755
207M
    }
756
14.8M
    ++(*rows);
757
758
14.8M
    return Status::OK();
759
14.8M
}
760
761
Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
762
12
                                   size_t* rows) {
763
48
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
764
36
        IColumn* col_ptr = columns[i].get();
765
36
        if (!_is_load) {
766
            // block is a Block*, and get_by_position returns a ColumnPtr,
767
            // which is a const pointer. Therefore, using const_cast is permissible.
768
36
            col_ptr = const_cast<IColumn*>(
769
36
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
770
36
        }
771
36
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
772
36
        null_column.insert_data(nullptr, 0);
773
36
    }
774
12
    ++(*rows);
775
12
    return Status::OK();
776
12
}
777
778
14.6M
Status CsvReader::_validate_line(const Slice& line, bool* success) {
779
14.6M
    if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) {
780
130
        if (!_is_load) {
781
2
            return Status::InternalError<false>("Only support csv data in utf8 codec");
782
128
        } else {
783
128
            _counter->num_rows_filtered++;
784
128
            *success = false;
785
128
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
786
128
                    [&]() -> std::string { return std::string(line.data, line.size); },
787
128
                    [&]() -> std::string {
788
128
                        return "Invalid file encoding: all CSV files must be UTF-8 encoded";
789
128
                    }));
790
128
            return Status::OK();
791
128
        }
792
130
    }
793
14.6M
    *success = true;
794
14.6M
    return Status::OK();
795
14.6M
}
796
797
14.8M
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
798
14.8M
    _split_line(line);
799
800
14.8M
    if (_is_load) {
801
        // Only check for load task. For query task, the non exist column will be filled "null".
802
        // if actual column number in csv file is not equal to _file_slot_descs.size()
803
        // then filter this line.
804
10.7M
        bool ignore_col = false;
805
10.7M
        ignore_col = _params.__isset.file_attributes &&
806
10.7M
                     _params.file_attributes.__isset.ignore_csv_redundant_col &&
807
10.7M
                     _params.file_attributes.ignore_csv_redundant_col;
808
809
10.7M
        if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
810
10.7M
            (ignore_col && _split_values.size() < _file_slot_descs.size())) {
811
544
            _counter->num_rows_filtered++;
812
544
            *success = false;
813
544
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
814
544
                    [&]() -> std::string { return std::string(line.data, line.size); },
815
544
                    [&]() -> std::string {
816
544
                        fmt::memory_buffer error_msg;
817
544
                        fmt::format_to(error_msg,
818
544
                                       "Column count mismatch: expected {}, but found {}",
819
544
                                       _file_slot_descs.size(), _split_values.size());
820
544
                        std::string escaped_separator =
821
544
                                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
822
544
                        std::string escaped_delimiter =
823
544
                                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
824
544
                        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator,
825
544
                                       escaped_delimiter);
826
544
                        if (_enclose != 0) {
827
544
                            fmt::format_to(error_msg, " encl:{}", _enclose);
828
544
                        }
829
544
                        if (_escape != 0) {
830
544
                            fmt::format_to(error_msg, " esc:{}", _escape);
831
544
                        }
832
544
                        fmt::format_to(error_msg, ")");
833
544
                        return fmt::to_string(error_msg);
834
544
                    }));
835
539
            return Status::OK();
836
544
        }
837
10.7M
    }
838
839
14.8M
    *success = true;
840
14.8M
    return Status::OK();
841
14.8M
}
842
843
14.8M
void CsvReader::_split_line(const Slice& line) {
844
14.8M
    _split_values.clear();
845
14.8M
    _fields_splitter->split_line(line, &_split_values);
846
14.8M
}
847
848
719
Status CsvReader::_parse_col_nums(size_t* col_nums) {
849
719
    const uint8_t* ptr = nullptr;
850
719
    size_t size = 0;
851
719
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
852
719
    if (size == 0) {
853
2
        return Status::InternalError<false>(
854
2
                "The first line is empty, can not parse column numbers");
855
2
    }
856
717
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
857
5
        return Status::InternalError<false>("Only support csv data in utf8 codec");
858
5
    }
859
712
    ptr = _remove_bom(ptr, size);
860
712
    _split_line(Slice(ptr, size));
861
712
    *col_nums = _split_values.size();
862
712
    return Status::OK();
863
717
}
864
865
92
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
866
92
    const uint8_t* ptr = nullptr;
867
92
    size_t size = 0;
868
    // no use of _line_reader_eof
869
92
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
870
92
    if (size == 0) {
871
0
        return Status::InternalError<false>("The first line is empty, can not parse column names");
872
0
    }
873
92
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
874
0
        return Status::InternalError<false>("Only support csv data in utf8 codec");
875
0
    }
876
92
    ptr = _remove_bom(ptr, size);
877
92
    _split_line(Slice(ptr, size));
878
345
    for (auto _split_value : _split_values) {
879
345
        col_names->emplace_back(_split_value.to_string());
880
345
    }
881
92
    return Status::OK();
882
92
}
883
884
// TODO(ftw): parse type
885
29
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) {
886
    // delete after.
887
155
    for (size_t i = 0; i < col_nums; ++i) {
888
126
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
889
126
    }
890
891
    // 1. check _line_reader_eof
892
    // 2. read line
893
    // 3. check utf8
894
    // 4. check size
895
    // 5. check _split_values.size must equal to col_nums.
896
    // 6. fill col_types
897
29
    return Status::OK();
898
29
}
899
900
8.44k
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
901
8.44k
    if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) {
902
7
        LOG(INFO) << "remove bom";
903
7
        constexpr size_t bom_size = 3;
904
7
        size -= bom_size;
905
        // In enclose mode, column_sep_positions were computed on the original line
906
        // (including BOM). After shifting the pointer, we must adjust those positions
907
        // so they remain correct relative to the new start.
908
7
        if (_enclose_reader_ctx) {
909
1
            _enclose_reader_ctx->adjust_column_sep_positions(bom_size);
910
1
        }
911
7
        return ptr + bom_size;
912
7
    }
913
8.43k
    return ptr;
914
8.44k
}
915
916
3.97k
Status CsvReader::close() {
917
3.97k
    if (_line_reader) {
918
3.97k
        _line_reader->close();
919
3.97k
    }
920
921
3.97k
    if (_file_reader) {
922
3.97k
        RETURN_IF_ERROR(_file_reader->close());
923
3.97k
    }
924
925
3.97k
    return Status::OK();
926
3.97k
}
927
928
} // namespace doris