Coverage Report

Created: 2026-04-09 06:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/csv/csv_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <cstddef>
27
#include <map>
28
#include <memory>
29
#include <ostream>
30
#include <regex>
31
#include <utility>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/consts.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/data_type/data_type_factory.hpp"
40
#include "exec/scan/scanner.h"
41
#include "format/file_reader/new_plain_binary_line_reader.h"
42
#include "format/file_reader/new_plain_text_line_reader.h"
43
#include "format/line_reader.h"
44
#include "io/file_factory.h"
45
#include "io/fs/broker_file_reader.h"
46
#include "io/fs/buffered_reader.h"
47
#include "io/fs/file_reader.h"
48
#include "io/fs/s3_file_reader.h"
49
#include "io/fs/tracing_file_reader.h"
50
#include "runtime/descriptors.h"
51
#include "runtime/runtime_state.h"
52
#include "util/decompressor.h"
53
#include "util/string_util.h"
54
#include "util/utf8_check.h"
55
56
namespace doris {
57
class RuntimeProfile;
58
class IColumn;
59
namespace io {
60
struct IOContext;
61
enum class FileCachePolicy : uint8_t;
62
} // namespace io
63
} // namespace doris
64
65
namespace doris {
66
#include "common/compile_check_begin.h"
67
68
166
void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
69
166
    const char* data = line.data;
70
166
    const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
71
166
    size_t value_start_offset = 0;
72
386
    for (auto idx : column_sep_positions) {
73
386
        process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char,
74
386
                           splitted_values);
75
386
        value_start_offset = idx + _value_sep_len;
76
386
    }
77
166
    if (line.size >= value_start_offset) {
78
        // process the last column
79
166
        process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char,
80
166
                           splitted_values);
81
166
    }
82
166
}
83
84
void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
85
2.79M
                                                         std::vector<Slice>* splitted_values) {
86
2.79M
    const char* data = line.data;
87
2.79M
    const size_t size = line.size;
88
2.79M
    size_t value_start = 0;
89
2.03G
    for (size_t i = 0; i < size; ++i) {
90
2.03G
        if (data[i] == _value_sep[0]) {
91
259M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
92
259M
            value_start = i + _value_sep_len;
93
259M
        }
94
2.03G
    }
95
2.79M
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
96
2.79M
}
97
98
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
99
2.22k
                                                        std::vector<Slice>* splitted_values) {
100
2.22k
    size_t start = 0;  // point to the start pos of next col value.
101
2.22k
    size_t curpos = 0; // point to the start pos of separator matching sequence.
102
103
    // value_sep : AAAA
104
    // line.data : 1234AAAA5678
105
    // -> 1234,5678
106
107
    //    start   start
108
    //      ▼       ▼
109
    //      1234AAAA5678\0
110
    //          ▲       ▲
111
    //      curpos     curpos
112
113
    //kmp
114
2.22k
    std::vector<int> next(_value_sep_len);
115
2.22k
    next[0] = -1;
116
4.56k
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
117
2.34k
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
118
0
            j = next[j];
119
0
        }
120
2.34k
        if (_value_sep[i] == _value_sep[j + 1]) {
121
2.31k
            j++;
122
2.31k
        }
123
2.34k
        next[i] = j;
124
2.34k
    }
125
126
35.1k
    for (int i = 0, j = -1; i < line.size; i++) {
127
        // i : line
128
        // j : _value_sep
129
35.3k
        while (j > -1 && line[i] != _value_sep[j + 1]) {
130
2.37k
            j = next[j];
131
2.37k
        }
132
32.9k
        if (line[i] == _value_sep[j + 1]) {
133
4.85k
            j++;
134
4.85k
        }
135
32.9k
        if (j == _value_sep_len - 1) {
136
2.36k
            curpos = i - _value_sep_len + 1;
137
138
            /*
139
             * column_separator : "xx"
140
             * data.csv :  data1xxxxdata2
141
             *
142
             * Parse incorrectly:
143
             *      data1[xx]xxdata2
144
             *      data1x[xx]xdata2
145
             *      data1xx[xx]data2
146
             * The string "xxxx" is parsed into three "xx" delimiters.
147
             *
148
             * Parse correctly:
149
             *      data1[xx]xxdata2
150
             *      data1xx[xx]data2
151
             */
152
153
2.36k
            if (curpos >= start) {
154
2.33k
                process_value_func(line.data, start, curpos - start, _trimming_char,
155
2.33k
                                   splitted_values);
156
2.33k
                start = i + 1;
157
2.33k
            }
158
159
2.36k
            j = next[j];
160
2.36k
        }
161
32.9k
    }
162
2.22k
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
163
2.22k
}
164
165
2.80M
void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
166
2.80M
    if (is_single_char_delim) {
167
2.80M
        _split_field_single_char(line, splitted_values);
168
2.80M
    } else {
169
2.02k
        _split_field_multi_char(line, splitted_values);
170
2.02k
    }
171
2.80M
}
172
173
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
174
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
175
                     const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
176
                     std::shared_ptr<io::IOContext> io_ctx_holder)
177
5.78k
        : _profile(profile),
178
5.78k
          _params(params),
179
5.78k
          _file_reader(nullptr),
180
5.78k
          _line_reader(nullptr),
181
5.78k
          _decompressor(nullptr),
182
5.78k
          _state(state),
183
5.78k
          _counter(counter),
184
5.78k
          _range(range),
185
5.78k
          _file_slot_descs(file_slot_descs),
186
5.78k
          _line_reader_eof(false),
187
5.78k
          _skip_lines(0),
188
5.78k
          _io_ctx(io_ctx),
189
5.78k
          _io_ctx_holder(std::move(io_ctx_holder)) {
190
5.78k
    if (_io_ctx == nullptr && _io_ctx_holder) {
191
0
        _io_ctx = _io_ctx_holder.get();
192
0
    }
193
5.78k
    _file_format_type = _params.format_type;
194
5.78k
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
195
5.78k
    if (_range.__isset.compress_type) {
196
        // for compatibility
197
5.60k
        _file_compress_type = _range.compress_type;
198
5.60k
    } else {
199
180
        _file_compress_type = _params.compress_type;
200
180
    }
201
5.78k
    _size = _range.size;
202
203
5.78k
    _split_values.reserve(_file_slot_descs.size());
204
5.78k
    _init_system_properties();
205
5.78k
    _init_file_description();
206
5.78k
    _serdes = create_data_type_serdes(_file_slot_descs);
207
5.78k
}
208
209
5.77k
void CsvReader::_init_system_properties() {
210
5.77k
    if (_range.__isset.file_type) {
211
        // for compatibility
212
5.45k
        _system_properties.system_type = _range.file_type;
213
5.45k
    } else {
214
322
        _system_properties.system_type = _params.file_type;
215
322
    }
216
5.77k
    _system_properties.properties = _params.properties;
217
5.77k
    _system_properties.hdfs_params = _params.hdfs_params;
218
5.77k
    if (_params.__isset.broker_addresses) {
219
180
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
220
180
                                                   _params.broker_addresses.end());
221
180
    }
222
5.77k
}
223
224
5.78k
void CsvReader::_init_file_description() {
225
5.78k
    _file_description.path = _range.path;
226
5.78k
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
227
5.78k
    if (_range.__isset.fs_name) {
228
4.90k
        _file_description.fs_name = _range.fs_name;
229
4.90k
    }
230
5.78k
    if (_range.__isset.file_cache_admission) {
231
5.18k
        _file_description.file_cache_admission = _range.file_cache_admission;
232
5.18k
    }
233
5.78k
}
234
235
5.36k
Status CsvReader::init_reader(bool is_load) {
236
    // set the skip lines and start offset
237
5.36k
    _start_offset = _range.start_offset;
238
5.36k
    if (_start_offset == 0) {
239
        // check header typer first
240
5.23k
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
241
5.23k
            !_params.file_attributes.header_type.empty()) {
242
48
            std::string header_type = to_lower(_params.file_attributes.header_type);
243
48
            if (header_type == BeConsts::CSV_WITH_NAMES) {
244
40
                _skip_lines = 1;
245
40
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
246
8
                _skip_lines = 2;
247
8
            }
248
5.18k
        } else if (_params.file_attributes.__isset.skip_lines) {
249
5.18k
            _skip_lines = _params.file_attributes.skip_lines;
250
5.18k
        }
251
5.23k
    } else if (_start_offset != 0) {
252
126
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
253
126
            (_file_compress_type == TFileCompressType::UNKNOWN &&
254
126
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
255
0
            return Status::InternalError<false>("For now we do not support split compressed file");
256
0
        }
257
        // pre-read to promise first line skipped always read
258
126
        int64_t pre_read_len = std::min(
259
126
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
260
126
                _start_offset);
261
126
        _start_offset -= pre_read_len;
262
126
        _size += pre_read_len;
263
        // not first range will always skip one line
264
126
        _skip_lines = 1;
265
126
    }
266
267
5.36k
    _use_nullable_string_opt.resize(_file_slot_descs.size());
268
195k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
269
190k
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
270
190k
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
271
30.3k
            _use_nullable_string_opt[i] = 1;
272
30.3k
        }
273
190k
    }
274
275
5.36k
    RETURN_IF_ERROR(_init_options());
276
5.36k
    RETURN_IF_ERROR(_create_file_reader(false));
277
5.36k
    RETURN_IF_ERROR(_create_decompressor());
278
5.36k
    RETURN_IF_ERROR(_create_line_reader());
279
280
5.36k
    _is_load = is_load;
281
5.36k
    if (!_is_load) {
282
        // For query task, there are 2 slot mapping.
283
        // One is from file slot to values in line.
284
        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
285
        //      the _col_idxs will save: 0, 2, 4
286
        // The other is from file slot to columns in output block
287
        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
288
        //      where "p1" is the partition col which does not exist in file
289
        //      the _file_slot_idx_map will save: 1, 2, 3
290
5.18k
        DCHECK(_params.__isset.column_idxs);
291
5.18k
        _col_idxs = _params.column_idxs;
292
5.18k
        int idx = 0;
293
188k
        for (const auto& slot_info : _params.required_slots) {
294
188k
            if (slot_info.is_file_slot) {
295
186k
                _file_slot_idx_map.push_back(idx);
296
186k
            }
297
188k
            idx++;
298
188k
        }
299
5.18k
    } else {
300
        // For load task, the column order is same as file column order
301
176
        int i = 0;
302
3.94k
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
303
3.94k
            _col_idxs.push_back(i++);
304
3.94k
        }
305
176
    }
306
307
5.36k
    _line_reader_eof = false;
308
5.36k
    return Status::OK();
309
5.36k
}
310
311
// !FIXME: Here we should use MutableBlock
312
10.9k
Status CsvReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
313
10.9k
    if (_line_reader_eof) {
314
5.34k
        *eof = true;
315
5.34k
        return Status::OK();
316
5.34k
    }
317
318
5.62k
    const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
319
5.62k
    const int64_t max_block_bytes =
320
5.62k
            (_state->query_type() == TQueryType::LOAD && config::load_reader_max_block_bytes > 0)
321
5.62k
                    ? config::load_reader_max_block_bytes
322
5.62k
                    : 0;
323
5.62k
    size_t rows = 0;
324
5.62k
    size_t block_bytes = 0;
325
326
5.62k
    bool success = false;
327
5.62k
    bool is_remove_bom = false;
328
5.62k
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
329
140k
        while (rows < batch_size && !_line_reader_eof &&
330
140k
               (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
331
139k
            const uint8_t* ptr = nullptr;
332
139k
            size_t size = 0;
333
139k
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
334
335
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
336
            // is_remove_bom means _remove_bom should only execute once
337
140k
            if (_skip_lines == 0 && !is_remove_bom) {
338
564
                ptr = _remove_bom(ptr, size);
339
564
                is_remove_bom = true;
340
564
            }
341
342
            // _skip_lines > 0 means we do not need to remove bom
343
139k
            if (_skip_lines > 0) {
344
0
                _skip_lines--;
345
0
                is_remove_bom = true;
346
0
                continue;
347
0
            }
348
139k
            if (size == 0) {
349
564
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
350
0
                    ++rows;
351
0
                }
352
                // Read empty line, continue
353
564
                continue;
354
564
            }
355
356
139k
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
357
139k
            ++rows;
358
139k
            block_bytes += size;
359
139k
        }
360
564
        auto mutate_columns = block->mutate_columns();
361
564
        for (auto& col : mutate_columns) {
362
564
            col->resize(rows);
363
564
        }
364
564
        block->set_columns(std::move(mutate_columns));
365
5.06k
    } else {
366
5.06k
        auto columns = block->mutate_columns();
367
3.03M
        while (rows < batch_size && !_line_reader_eof &&
368
3.03M
               (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
369
3.03M
            const uint8_t* ptr = nullptr;
370
3.03M
            size_t size = 0;
371
3.03M
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
372
373
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
374
            // is_remove_bom means _remove_bom should only execute once
375
3.03M
            if (!is_remove_bom && _skip_lines == 0) {
376
4.89k
                ptr = _remove_bom(ptr, size);
377
4.89k
                is_remove_bom = true;
378
4.89k
            }
379
380
            // _skip_lines > 0 means we do not remove bom
381
3.03M
            if (_skip_lines > 0) {
382
196
                _skip_lines--;
383
196
                is_remove_bom = true;
384
196
                continue;
385
196
            }
386
3.03M
            if (size == 0) {
387
4.82k
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
388
12
                    RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
389
12
                }
390
                // Read empty line, continue
391
4.82k
                continue;
392
4.82k
            }
393
394
3.02M
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
395
3.02M
            if (!success) {
396
0
                continue;
397
0
            }
398
3.02M
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
399
3.02M
            block_bytes += size;
400
3.02M
        }
401
5.06k
        block->set_columns(std::move(columns));
402
5.06k
    }
403
404
5.62k
    *eof = (rows == 0);
405
5.62k
    *read_rows = rows;
406
407
5.62k
    return Status::OK();
408
5.62k
}
409
410
Status CsvReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
411
5.36k
                              std::unordered_set<std::string>* missing_cols) {
412
190k
    for (const auto& slot : _file_slot_descs) {
413
190k
        name_to_type->emplace(slot->col_name(), slot->type());
414
190k
    }
415
5.36k
    return Status::OK();
416
5.36k
}
417
418
// init decompressor, file reader and line reader for parsing schema
419
416
Status CsvReader::init_schema_reader() {
420
416
    _start_offset = _range.start_offset;
421
416
    if (_start_offset != 0) {
422
0
        return Status::InvalidArgument(
423
0
                "start offset of TFileRangeDesc must be zero in get parsered schema");
424
0
    }
425
416
    if (_params.file_type == TFileType::FILE_BROKER) {
426
0
        return Status::InternalError<false>(
427
0
                "Getting parsered schema from csv file do not support stream load and broker "
428
0
                "load.");
429
0
    }
430
431
    // csv file without names line and types line.
432
416
    _read_line = 1;
433
416
    _is_parse_name = false;
434
435
416
    if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
436
416
        !_params.file_attributes.header_type.empty()) {
437
44
        std::string header_type = to_lower(_params.file_attributes.header_type);
438
44
        if (header_type == BeConsts::CSV_WITH_NAMES) {
439
38
            _is_parse_name = true;
440
38
        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
441
6
            _read_line = 2;
442
6
            _is_parse_name = true;
443
6
        }
444
44
    }
445
446
416
    RETURN_IF_ERROR(_init_options());
447
416
    RETURN_IF_ERROR(_create_file_reader(true));
448
416
    RETURN_IF_ERROR(_create_decompressor());
449
416
    RETURN_IF_ERROR(_create_line_reader());
450
416
    return Status::OK();
451
416
}
452
453
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
454
416
                                    std::vector<DataTypePtr>* col_types) {
455
416
    if (_read_line == 1) {
456
410
        if (!_is_parse_name) { //parse csv file without names and types
457
372
            size_t col_nums = 0;
458
372
            RETURN_IF_ERROR(_parse_col_nums(&col_nums));
459
5.08k
            for (size_t i = 0; i < col_nums; ++i) {
460
4.71k
                col_names->emplace_back("c" + std::to_string(i + 1));
461
4.71k
            }
462
366
        } else { // parse csv file with names
463
38
            RETURN_IF_ERROR(_parse_col_names(col_names));
464
38
        }
465
466
5.24k
        for (size_t j = 0; j < col_names->size(); ++j) {
467
4.83k
            col_types->emplace_back(
468
4.83k
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
469
4.83k
        }
470
404
    } else { // parse csv file with names and types
471
6
        RETURN_IF_ERROR(_parse_col_names(col_names));
472
6
        RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
473
6
    }
474
410
    return Status::OK();
475
416
}
476
477
17.1M
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
478
17.1M
    auto& null_column = assert_cast<ColumnNullable&>(column);
479
17.1M
    if (_empty_field_as_null) {
480
0
        if (slice.size == 0) {
481
0
            null_column.insert_data(nullptr, 0);
482
0
            return Status::OK();
483
0
        }
484
0
    }
485
17.1M
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
486
17.1M
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
487
456
            null_column.insert_data(nullptr, 0);
488
456
            return Status::OK();
489
456
        }
490
17.1M
    }
491
17.1M
    static DataTypeStringSerDe stringSerDe(TYPE_STRING);
492
17.1M
    auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
493
17.1M
                                                        _options);
494
17.1M
    if (!st.ok()) {
495
        // fill null if fail
496
0
        null_column.insert_data(nullptr, 0); // 0 is meaningless here
497
0
        return Status::OK();
498
0
    }
499
    // fill not null if success
500
17.1M
    null_column.get_null_map_data().push_back(0);
501
17.1M
    return Status::OK();
502
17.1M
}
503
504
1.20k
Status CsvReader::_init_options() {
505
    // get column_separator and line_delimiter
506
1.20k
    _value_separator = _params.file_attributes.text_params.column_separator;
507
1.20k
    _value_separator_length = _value_separator.size();
508
1.20k
    _line_delimiter = _params.file_attributes.text_params.line_delimiter;
509
1.20k
    _line_delimiter_length = _line_delimiter.size();
510
1.20k
    if (_params.file_attributes.text_params.__isset.enclose) {
511
1.19k
        _enclose = _params.file_attributes.text_params.enclose;
512
1.19k
    }
513
1.20k
    if (_params.file_attributes.text_params.__isset.escape) {
514
1.19k
        _escape = _params.file_attributes.text_params.escape;
515
1.19k
    }
516
517
1.20k
    _trim_tailing_spaces =
518
1.20k
            (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
519
520
1.20k
    _options.escape_char = _escape;
521
1.20k
    _options.quote_char = _enclose;
522
523
1.20k
    if (_params.file_attributes.text_params.collection_delimiter.empty()) {
524
1.20k
        _options.collection_delim = ',';
525
1.20k
    } else {
526
0
        _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0];
527
0
    }
528
1.20k
    if (_params.file_attributes.text_params.mapkv_delimiter.empty()) {
529
1.20k
        _options.map_key_delim = ':';
530
1.20k
    } else {
531
0
        _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
532
0
    }
533
534
1.20k
    if (_params.file_attributes.text_params.__isset.null_format) {
535
34
        _options.null_format = _params.file_attributes.text_params.null_format.data();
536
34
        _options.null_len = _params.file_attributes.text_params.null_format.length();
537
34
    }
538
539
1.20k
    if (_params.file_attributes.__isset.trim_double_quotes) {
540
1.19k
        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
541
1.19k
    }
542
1.20k
    _options.converted_from_string = _trim_double_quotes;
543
544
1.20k
    if (_state != nullptr) {
545
790
        _keep_cr = _state->query_options().keep_carriage_return;
546
790
    }
547
548
1.20k
    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
549
1.16k
        _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null;
550
1.16k
    }
551
1.20k
    return Status::OK();
552
1.20k
}
553
554
5.77k
Status CsvReader::_create_decompressor() {
555
5.78k
    if (_file_compress_type != TFileCompressType::UNKNOWN) {
556
5.78k
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
557
18.4E
    } else {
558
18.4E
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
559
18.4E
    }
560
561
5.77k
    return Status::OK();
562
5.77k
}
563
564
5.77k
Status CsvReader::_create_file_reader(bool need_schema) {
565
5.77k
    if (_params.file_type == TFileType::FILE_STREAM) {
566
108
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
567
108
                                                        need_schema));
568
5.67k
    } else {
569
18.4E
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
570
5.67k
        io::FileReaderOptions reader_options =
571
5.67k
                FileFactory::get_reader_options(_state, _file_description);
572
5.67k
        io::FileReaderSPtr file_reader;
573
5.67k
        if (_io_ctx_holder) {
574
412
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
575
412
                    _profile, _system_properties, _file_description, reader_options,
576
412
                    io::DelegateReader::AccessMode::SEQUENTIAL,
577
412
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
578
412
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
579
5.25k
        } else {
580
5.25k
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
581
5.25k
                    _profile, _system_properties, _file_description, reader_options,
582
5.25k
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
583
5.25k
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
584
5.25k
        }
585
5.67k
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
586
5.66k
                                                                         _io_ctx->file_reader_stats)
587
5.67k
                               : file_reader;
588
5.67k
    }
589
5.77k
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
590
5.77k
        _params.file_type != TFileType::FILE_BROKER) {
591
0
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
592
0
    }
593
5.77k
    return Status::OK();
594
5.77k
}
595
596
1.20k
Status CsvReader::_create_line_reader() {
597
1.20k
    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
598
1.20k
    if (_enclose == 0) {
599
1.14k
        text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
600
1.14k
                _line_delimiter, _line_delimiter_length, _keep_cr);
601
1.14k
        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
602
1.14k
                _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
603
604
1.14k
    } else {
605
        // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0
606
56
        size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0;
607
56
        _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>(
608
56
                _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
609
56
                col_sep_num, _enclose, _escape, _keep_cr);
610
56
        text_line_reader_ctx = _enclose_reader_ctx;
611
612
56
        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
613
56
                _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose);
614
56
    }
615
1.20k
    switch (_file_format_type) {
616
1.20k
    case TFileFormatType::FORMAT_CSV_PLAIN:
617
1.20k
        [[fallthrough]];
618
1.20k
    case TFileFormatType::FORMAT_CSV_GZ:
619
1.20k
        [[fallthrough]];
620
1.20k
    case TFileFormatType::FORMAT_CSV_BZ2:
621
1.20k
        [[fallthrough]];
622
1.20k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
623
1.20k
        [[fallthrough]];
624
1.20k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
625
1.20k
        [[fallthrough]];
626
1.20k
    case TFileFormatType::FORMAT_CSV_LZOP:
627
1.20k
        [[fallthrough]];
628
1.20k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
629
1.20k
        [[fallthrough]];
630
1.20k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
631
1.20k
        _line_reader =
632
1.20k
                NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
633
1.20k
                                                      text_line_reader_ctx, _size, _start_offset);
634
635
1.20k
        break;
636
0
    case TFileFormatType::FORMAT_PROTO:
637
0
        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
638
0
        _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
639
0
        break;
640
0
    default:
641
0
        return Status::InternalError<false>(
642
0
                "Unknown format type, cannot init line reader in csv reader, type={}",
643
0
                _file_format_type);
644
1.20k
    }
645
1.20k
    return Status::OK();
646
1.20k
}
647
648
4.94k
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
649
4.94k
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
650
4.94k
}
651
652
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
653
3.05M
                                     std::vector<MutableColumnPtr>& columns, size_t* rows) {
654
3.05M
    bool is_success = false;
655
656
3.05M
    RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
657
3.05M
    if (UNLIKELY(!is_success)) {
658
        // If not success, which means we met an invalid row, filter this row and return.
659
0
        return Status::OK();
660
0
    }
661
662
28.4M
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
663
25.3M
        int col_idx = _col_idxs[i];
664
        // col idx is out of range, fill with null format
665
25.3M
        auto value = col_idx < _split_values.size()
666
25.3M
                             ? _split_values[col_idx]
667
18.4E
                             : Slice(_options.null_format, _options.null_len);
668
669
25.3M
        IColumn* col_ptr = columns[i].get();
670
25.3M
        if (!_is_load) {
671
            // block is a Block*, and get_by_position returns a ColumnPtr,
672
            // which is a const pointer. Therefore, using const_cast is permissible.
673
20.5M
            col_ptr = const_cast<IColumn*>(
674
20.5M
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
675
20.5M
        }
676
677
25.3M
        if (_use_nullable_string_opt[i]) {
678
            // For load task, we always read "string" from file.
679
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
680
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
681
18.7M
            RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
682
18.7M
        } else {
683
6.61M
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
684
6.61M
        }
685
25.3M
    }
686
3.05M
    ++(*rows);
687
688
3.05M
    return Status::OK();
689
3.05M
}
690
691
Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
692
12
                                   size_t* rows) {
693
48
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
694
36
        IColumn* col_ptr = columns[i].get();
695
36
        if (!_is_load) {
696
            // block is a Block*, and get_by_position returns a ColumnPtr,
697
            // which is a const pointer. Therefore, using const_cast is permissible.
698
36
            col_ptr = const_cast<IColumn*>(
699
36
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
700
36
        }
701
36
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
702
36
        null_column.insert_data(nullptr, 0);
703
36
    }
704
12
    ++(*rows);
705
12
    return Status::OK();
706
12
}
707
708
2.80M
Status CsvReader::_validate_line(const Slice& line, bool* success) {
709
2.80M
    if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) {
710
2
        if (!_is_load) {
711
2
            return Status::InternalError<false>("Only support csv data in utf8 codec");
712
2
        } else {
713
0
            _counter->num_rows_filtered++;
714
0
            *success = false;
715
0
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
716
0
                    [&]() -> std::string { return std::string(line.data, line.size); },
717
0
                    [&]() -> std::string {
718
0
                        return "Invalid file encoding: all CSV files must be UTF-8 encoded";
719
0
                    }));
720
0
            return Status::OK();
721
0
        }
722
2
    }
723
2.80M
    *success = true;
724
2.80M
    return Status::OK();
725
2.80M
}
726
727
3.05M
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
728
3.05M
    _split_line(line);
729
730
3.05M
    if (_is_load) {
731
        // Only check for load task. For query task, the non exist column will be filled "null".
732
        // if actual column number in csv file is not equal to _file_slot_descs.size()
733
        // then filter this line.
734
119k
        bool ignore_col = false;
735
119k
        ignore_col = _params.__isset.file_attributes &&
736
119k
                     _params.file_attributes.__isset.ignore_csv_redundant_col &&
737
119k
                     _params.file_attributes.ignore_csv_redundant_col;
738
739
119k
        if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
740
119k
            (ignore_col && _split_values.size() < _file_slot_descs.size())) {
741
0
            _counter->num_rows_filtered++;
742
0
            *success = false;
743
0
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
744
0
                    [&]() -> std::string { return std::string(line.data, line.size); },
745
0
                    [&]() -> std::string {
746
0
                        fmt::memory_buffer error_msg;
747
0
                        fmt::format_to(error_msg,
748
0
                                       "Column count mismatch: expected {}, but found {}",
749
0
                                       _file_slot_descs.size(), _split_values.size());
750
0
                        std::string escaped_separator =
751
0
                                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
752
0
                        std::string escaped_delimiter =
753
0
                                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
754
0
                        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator,
755
0
                                       escaped_delimiter);
756
0
                        if (_enclose != 0) {
757
0
                            fmt::format_to(error_msg, " encl:{}", _enclose);
758
0
                        }
759
0
                        if (_escape != 0) {
760
0
                            fmt::format_to(error_msg, " esc:{}", _escape);
761
0
                        }
762
0
                        fmt::format_to(error_msg, ")");
763
0
                        return fmt::to_string(error_msg);
764
0
                    }));
765
0
            return Status::OK();
766
0
        }
767
119k
    }
768
769
3.05M
    *success = true;
770
3.05M
    return Status::OK();
771
3.05M
}
772
773
3.05M
void CsvReader::_split_line(const Slice& line) {
774
3.05M
    _split_values.clear();
775
3.05M
    _fields_splitter->split_line(line, &_split_values);
776
3.05M
}
777
778
372
Status CsvReader::_parse_col_nums(size_t* col_nums) {
779
372
    const uint8_t* ptr = nullptr;
780
372
    size_t size = 0;
781
372
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
782
372
    if (size == 0) {
783
2
        return Status::InternalError<false>(
784
2
                "The first line is empty, can not parse column numbers");
785
2
    }
786
370
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
787
4
        return Status::InternalError<false>("Only support csv data in utf8 codec");
788
4
    }
789
366
    ptr = _remove_bom(ptr, size);
790
366
    _split_line(Slice(ptr, size));
791
366
    *col_nums = _split_values.size();
792
366
    return Status::OK();
793
370
}
794
795
44
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
796
44
    const uint8_t* ptr = nullptr;
797
44
    size_t size = 0;
798
    // no use of _line_reader_eof
799
44
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
800
44
    if (size == 0) {
801
0
        return Status::InternalError<false>("The first line is empty, can not parse column names");
802
0
    }
803
44
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
804
0
        return Status::InternalError<false>("Only support csv data in utf8 codec");
805
0
    }
806
44
    ptr = _remove_bom(ptr, size);
807
44
    _split_line(Slice(ptr, size));
808
158
    for (auto _split_value : _split_values) {
809
158
        col_names->emplace_back(_split_value.to_string());
810
158
    }
811
44
    return Status::OK();
812
44
}
813
814
// TODO(ftw): parse type
815
6
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) {
816
    // delete after.
817
42
    for (size_t i = 0; i < col_nums; ++i) {
818
36
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
819
36
    }
820
821
    // 1. check _line_reader_eof
822
    // 2. read line
823
    // 3. check utf8
824
    // 4. check size
825
    // 5. check _split_values.size must equal to col_nums.
826
    // 6. fill col_types
827
6
    return Status::OK();
828
6
}
829
830
5.86k
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
831
5.86k
    if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) {
832
0
        LOG(INFO) << "remove bom";
833
0
        constexpr size_t bom_size = 3;
834
0
        size -= bom_size;
835
        // In enclose mode, column_sep_positions were computed on the original line
836
        // (including BOM). After shifting the pointer, we must adjust those positions
837
        // so they remain correct relative to the new start.
838
0
        if (_enclose_reader_ctx) {
839
0
            _enclose_reader_ctx->adjust_column_sep_positions(bom_size);
840
0
        }
841
0
        return ptr + bom_size;
842
0
    }
843
5.86k
    return ptr;
844
5.86k
}
845
846
5.36k
Status CsvReader::close() {
847
5.36k
    if (_line_reader) {
848
5.36k
        _line_reader->close();
849
5.36k
    }
850
851
5.36k
    if (_file_reader) {
852
5.36k
        RETURN_IF_ERROR(_file_reader->close());
853
5.36k
    }
854
855
5.36k
    return Status::OK();
856
5.36k
}
857
858
#include "common/compile_check_end.h"
859
} // namespace doris