Coverage Report

Created: 2026-04-22 09:47

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/csv/csv_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/csv/csv_reader.h"
19
20
#include <fmt/format.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
#include <gen_cpp/Types_types.h>
23
#include <glog/logging.h>
24
25
#include <algorithm>
26
#include <cstddef>
27
#include <map>
28
#include <memory>
29
#include <ostream>
30
#include <regex>
31
#include <utility>
32
33
#include "common/compiler_util.h" // IWYU pragma: keep
34
#include "common/config.h"
35
#include "common/consts.h"
36
#include "common/status.h"
37
#include "core/block/block.h"
38
#include "core/block/column_with_type_and_name.h"
39
#include "core/data_type/data_type_factory.hpp"
40
#include "exec/scan/scanner.h"
41
#include "format/file_reader/new_plain_binary_line_reader.h"
42
#include "format/file_reader/new_plain_text_line_reader.h"
43
#include "format/line_reader.h"
44
#include "io/file_factory.h"
45
#include "io/fs/broker_file_reader.h"
46
#include "io/fs/buffered_reader.h"
47
#include "io/fs/file_reader.h"
48
#include "io/fs/s3_file_reader.h"
49
#include "io/fs/tracing_file_reader.h"
50
#include "runtime/descriptors.h"
51
#include "runtime/runtime_state.h"
52
#include "util/decompressor.h"
53
#include "util/string_util.h"
54
#include "util/utf8_check.h"
55
56
namespace doris {
57
class RuntimeProfile;
58
class IColumn;
59
namespace io {
60
struct IOContext;
61
enum class FileCachePolicy : uint8_t;
62
} // namespace io
63
} // namespace doris
64
65
namespace doris {
66
67
712
void EncloseCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
68
712
    const char* data = line.data;
69
712
    const auto& column_sep_positions = _text_line_reader_ctx->column_sep_positions();
70
712
    size_t value_start_offset = 0;
71
2.59k
    for (auto idx : column_sep_positions) {
72
2.59k
        process_value_func(data, value_start_offset, idx - value_start_offset, _trimming_char,
73
2.59k
                           splitted_values);
74
2.59k
        value_start_offset = idx + _value_sep_len;
75
2.59k
    }
76
712
    if (line.size >= value_start_offset) {
77
        // process the last column
78
710
        process_value_func(data, value_start_offset, line.size - value_start_offset, _trimming_char,
79
710
                           splitted_values);
80
710
    }
81
712
}
82
83
void PlainCsvTextFieldSplitter::_split_field_single_char(const Slice& line,
84
11.8M
                                                         std::vector<Slice>* splitted_values) {
85
11.8M
    const char* data = line.data;
86
11.8M
    const size_t size = line.size;
87
11.8M
    size_t value_start = 0;
88
1.53G
    for (size_t i = 0; i < size; ++i) {
89
1.52G
        if (data[i] == _value_sep[0]) {
90
185M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
91
185M
            value_start = i + _value_sep_len;
92
185M
        }
93
1.52G
    }
94
11.8M
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
95
11.8M
}
96
97
void PlainCsvTextFieldSplitter::_split_field_multi_char(const Slice& line,
98
162
                                                        std::vector<Slice>* splitted_values) {
99
162
    size_t start = 0;  // point to the start pos of next col value.
100
162
    size_t curpos = 0; // point to the start pos of separator matching sequence.
101
102
    // value_sep : AAAA
103
    // line.data : 1234AAAA5678
104
    // -> 1234,5678
105
106
    //    start   start
107
    //      ▼       ▼
108
    //      1234AAAA5678\0
109
    //          ▲       ▲
110
    //      curpos     curpos
111
112
    //kmp
113
162
    std::vector<int> next(_value_sep_len);
114
162
    next[0] = -1;
115
403
    for (int i = 1, j = -1; i < _value_sep_len; i++) {
116
261
        while (j > -1 && _value_sep[i] != _value_sep[j + 1]) {
117
20
            j = next[j];
118
20
        }
119
241
        if (_value_sep[i] == _value_sep[j + 1]) {
120
130
            j++;
121
130
        }
122
241
        next[i] = j;
123
241
    }
124
125
6.96k
    for (int i = 0, j = -1; i < line.size; i++) {
126
        // i : line
127
        // j : _value_sep
128
7.09k
        while (j > -1 && line[i] != _value_sep[j + 1]) {
129
290
            j = next[j];
130
290
        }
131
6.80k
        if (line[i] == _value_sep[j + 1]) {
132
2.17k
            j++;
133
2.17k
        }
134
6.80k
        if (j == _value_sep_len - 1) {
135
832
            curpos = i - _value_sep_len + 1;
136
137
            /*
138
             * column_separator : "xx"
139
             * data.csv :  data1xxxxdata2
140
             *
141
             * Parse incorrectly:
142
             *      data1[xx]xxdata2
143
             *      data1x[xx]xdata2
144
             *      data1xx[xx]data2
145
             * The string "xxxx" is parsed into three "xx" delimiters.
146
             *
147
             * Parse correctly:
148
             *      data1[xx]xxdata2
149
             *      data1xx[xx]data2
150
             */
151
152
832
            if (curpos >= start) {
153
803
                process_value_func(line.data, start, curpos - start, _trimming_char,
154
803
                                   splitted_values);
155
803
                start = i + 1;
156
803
            }
157
158
832
            j = next[j];
159
832
        }
160
6.80k
    }
161
162
    process_value_func(line.data, start, line.size - start, _trimming_char, splitted_values);
162
162
}
163
164
11.8M
void PlainCsvTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
165
11.8M
    if (is_single_char_delim) {
166
11.8M
        _split_field_single_char(line, splitted_values);
167
18.4E
    } else {
168
18.4E
        _split_field_multi_char(line, splitted_values);
169
18.4E
    }
170
11.8M
}
171
172
CsvReader::CsvReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
173
                     const TFileScanRangeParams& params, const TFileRangeDesc& range,
174
                     const std::vector<SlotDescriptor*>& file_slot_descs, io::IOContext* io_ctx,
175
                     std::shared_ptr<io::IOContext> io_ctx_holder)
176
2.38k
        : _profile(profile),
177
2.38k
          _params(params),
178
2.38k
          _file_reader(nullptr),
179
2.38k
          _line_reader(nullptr),
180
2.38k
          _decompressor(nullptr),
181
2.38k
          _state(state),
182
2.38k
          _counter(counter),
183
2.38k
          _range(range),
184
2.38k
          _file_slot_descs(file_slot_descs),
185
2.38k
          _line_reader_eof(false),
186
2.38k
          _skip_lines(0),
187
2.38k
          _io_ctx(io_ctx),
188
2.38k
          _io_ctx_holder(std::move(io_ctx_holder)) {
189
2.38k
    if (_io_ctx == nullptr && _io_ctx_holder) {
190
0
        _io_ctx = _io_ctx_holder.get();
191
0
    }
192
2.38k
    _file_format_type = _params.format_type;
193
2.38k
    _is_proto_format = _file_format_type == TFileFormatType::FORMAT_PROTO;
194
2.38k
    if (_range.__isset.compress_type) {
195
        // for compatibility
196
599
        _file_compress_type = _range.compress_type;
197
1.78k
    } else {
198
1.78k
        _file_compress_type = _params.compress_type;
199
1.78k
    }
200
2.38k
    _size = _range.size;
201
202
2.38k
    _split_values.reserve(_file_slot_descs.size());
203
2.38k
    _init_system_properties();
204
2.38k
    _init_file_description();
205
2.38k
    _serdes = create_data_type_serdes(_file_slot_descs);
206
2.38k
}
207
208
2.38k
void CsvReader::_init_system_properties() {
209
2.38k
    if (_range.__isset.file_type) {
210
        // for compatibility
211
579
        _system_properties.system_type = _range.file_type;
212
1.80k
    } else {
213
1.80k
        _system_properties.system_type = _params.file_type;
214
1.80k
    }
215
2.38k
    _system_properties.properties = _params.properties;
216
2.38k
    _system_properties.hdfs_params = _params.hdfs_params;
217
2.38k
    if (_params.__isset.broker_addresses) {
218
1.67k
        _system_properties.broker_addresses.assign(_params.broker_addresses.begin(),
219
1.67k
                                                   _params.broker_addresses.end());
220
1.67k
    }
221
2.38k
}
222
223
2.38k
void CsvReader::_init_file_description() {
224
2.38k
    _file_description.path = _range.path;
225
18.4E
    _file_description.file_size = _range.__isset.file_size ? _range.file_size : -1;
226
2.38k
    if (_range.__isset.fs_name) {
227
0
        _file_description.fs_name = _range.fs_name;
228
0
    }
229
2.38k
    if (_range.__isset.file_cache_admission) {
230
215
        _file_description.file_cache_admission = _range.file_cache_admission;
231
215
    }
232
2.38k
}
233
234
0
Status CsvReader::init_reader(bool is_load) {
235
    // set the skip lines and start offset
236
0
    _start_offset = _range.start_offset;
237
0
    if (_start_offset == 0) {
238
        // check header typer first
239
0
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
240
0
            !_params.file_attributes.header_type.empty()) {
241
0
            std::string header_type = to_lower(_params.file_attributes.header_type);
242
0
            if (header_type == BeConsts::CSV_WITH_NAMES) {
243
0
                _skip_lines = 1;
244
0
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
245
0
                _skip_lines = 2;
246
0
            }
247
0
        } else if (_params.file_attributes.__isset.skip_lines) {
248
0
            _skip_lines = _params.file_attributes.skip_lines;
249
0
        }
250
0
    } else if (_start_offset != 0) {
251
0
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
252
0
            (_file_compress_type == TFileCompressType::UNKNOWN &&
253
0
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
254
0
            return Status::InternalError<false>("For now we do not support split compressed file");
255
0
        }
256
        // pre-read to promise first line skipped always read
257
0
        int64_t pre_read_len = std::min(
258
0
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
259
0
                _start_offset);
260
0
        _start_offset -= pre_read_len;
261
0
        _size += pre_read_len;
262
        // not first range will always skip one line
263
0
        _skip_lines = 1;
264
0
    }
265
266
0
    _use_nullable_string_opt.resize(_file_slot_descs.size());
267
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
268
0
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
269
0
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
270
0
            _use_nullable_string_opt[i] = 1;
271
0
        }
272
0
    }
273
274
0
    RETURN_IF_ERROR(_init_options());
275
0
    RETURN_IF_ERROR(_create_file_reader(false));
276
0
    RETURN_IF_ERROR(_create_decompressor());
277
0
    RETURN_IF_ERROR(_create_line_reader());
278
279
0
    _is_load = is_load;
280
0
    if (!_is_load) {
281
        // For query task, there are 2 slot mapping.
282
        // One is from file slot to values in line.
283
        //      eg, the file_slot_descs is k1, k3, k5, and values in line are k1, k2, k3, k4, k5
284
        //      the _col_idxs will save: 0, 2, 4
285
        // The other is from file slot to columns in output block
286
        //      eg, the file_slot_descs is k1, k3, k5, and columns in block are p1, k1, k3, k5
287
        //      where "p1" is the partition col which does not exist in file
288
        //      the _file_slot_idx_map will save: 1, 2, 3
289
0
        DCHECK(_params.__isset.column_idxs);
290
0
        _col_idxs = _params.column_idxs;
291
0
        int idx = 0;
292
0
        for (const auto& slot_info : _params.required_slots) {
293
0
            if (slot_info.is_file_slot) {
294
0
                _file_slot_idx_map.push_back(idx);
295
0
            }
296
0
            idx++;
297
0
        }
298
0
    } else {
299
        // For load task, the column order is same as file column order
300
0
        int i = 0;
301
0
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
302
0
            _col_idxs.push_back(i++);
303
0
        }
304
0
    }
305
306
0
    _line_reader_eof = false;
307
0
    return Status::OK();
308
0
}
309
310
// ---- Unified init_reader(ReaderInitContext*) overrides ----
311
312
1.99k
Status CsvReader::_open_file_reader(ReaderInitContext* base_ctx) {
313
1.99k
    _start_offset = _range.start_offset;
314
1.99k
    if (_start_offset == 0) {
315
1.99k
        if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
316
1.99k
            !_params.file_attributes.header_type.empty()) {
317
44
            std::string header_type = to_lower(_params.file_attributes.header_type);
318
44
            if (header_type == BeConsts::CSV_WITH_NAMES) {
319
24
                _skip_lines = 1;
320
24
            } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
321
20
                _skip_lines = 2;
322
20
            }
323
1.95k
        } else if (_params.file_attributes.__isset.skip_lines) {
324
1.95k
            _skip_lines = _params.file_attributes.skip_lines;
325
1.95k
        }
326
1.99k
    } else if (_start_offset != 0) {
327
0
        if ((_file_compress_type != TFileCompressType::PLAIN) ||
328
0
            (_file_compress_type == TFileCompressType::UNKNOWN &&
329
0
             _file_format_type != TFileFormatType::FORMAT_CSV_PLAIN)) {
330
0
            return Status::InternalError<false>("For now we do not support split compressed file");
331
0
        }
332
0
        int64_t pre_read_len = std::min(
333
0
                static_cast<int64_t>(_params.file_attributes.text_params.line_delimiter.size()),
334
0
                _start_offset);
335
0
        _start_offset -= pre_read_len;
336
0
        _size += pre_read_len;
337
0
        _skip_lines = 1;
338
0
    }
339
340
1.99k
    RETURN_IF_ERROR(_init_options());
341
1.99k
    RETURN_IF_ERROR(_create_file_reader(false));
342
1.99k
    return Status::OK();
343
1.99k
}
344
345
1.99k
Status CsvReader::_do_init_reader(ReaderInitContext* base_ctx) {
346
1.99k
    auto* ctx = checked_context_cast<CsvInitContext>(base_ctx);
347
1.99k
    _is_load = ctx->is_load;
348
349
1.99k
    _use_nullable_string_opt.resize(_file_slot_descs.size());
350
17.4k
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
351
15.4k
        auto data_type_ptr = _file_slot_descs[i]->get_data_type_ptr();
352
15.4k
        if (data_type_ptr->is_nullable() && is_string_type(data_type_ptr->get_primitive_type())) {
353
15.4k
            _use_nullable_string_opt[i] = 1;
354
15.4k
        }
355
15.4k
    }
356
357
1.99k
    RETURN_IF_ERROR(_create_decompressor());
358
1.99k
    RETURN_IF_ERROR(_create_line_reader());
359
360
1.99k
    if (!_is_load) {
361
323
        DCHECK(_params.__isset.column_idxs);
362
323
        _col_idxs = _params.column_idxs;
363
323
        int idx = 0;
364
2.34k
        for (const auto& slot_info : _params.required_slots) {
365
2.34k
            if (slot_info.is_file_slot) {
366
2.34k
                _file_slot_idx_map.push_back(idx);
367
2.34k
            }
368
2.34k
            idx++;
369
2.34k
        }
370
1.67k
    } else {
371
1.67k
        int i = 0;
372
13.1k
        for (const auto& desc [[maybe_unused]] : _file_slot_descs) {
373
13.1k
            _col_idxs.push_back(i++);
374
13.1k
        }
375
1.67k
    }
376
1.99k
    _line_reader_eof = false;
377
1.99k
    return Status::OK();
378
1.99k
}
379
380
// !FIXME: Here we should use MutableBlock
381
6.58k
Status CsvReader::_do_get_next_block(Block* block, size_t* read_rows, bool* eof) {
382
6.58k
    if (_line_reader_eof) {
383
1.95k
        *eof = true;
384
1.95k
        return Status::OK();
385
1.95k
    }
386
387
4.63k
    const int batch_size = std::max(_state->batch_size(), (int)_MIN_BATCH_SIZE);
388
4.63k
    const int64_t max_block_bytes =
389
4.63k
            (_state->query_type() == TQueryType::LOAD && config::load_reader_max_block_bytes > 0)
390
4.63k
                    ? config::load_reader_max_block_bytes
391
4.63k
                    : 0;
392
4.63k
    size_t rows = 0;
393
4.63k
    size_t block_bytes = 0;
394
395
4.63k
    bool success = false;
396
4.63k
    bool is_remove_bom = false;
397
4.63k
    if (_push_down_agg_type == TPushAggOp::type::COUNT) {
398
0
        while (rows < batch_size && !_line_reader_eof &&
399
0
               (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
400
0
            const uint8_t* ptr = nullptr;
401
0
            size_t size = 0;
402
0
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
403
404
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
405
            // is_remove_bom means _remove_bom should only execute once
406
0
            if (_skip_lines == 0 && !is_remove_bom) {
407
0
                ptr = _remove_bom(ptr, size);
408
0
                is_remove_bom = true;
409
0
            }
410
411
            // _skip_lines > 0 means we do not need to remove bom
412
0
            if (_skip_lines > 0) {
413
0
                _skip_lines--;
414
0
                is_remove_bom = true;
415
0
                continue;
416
0
            }
417
0
            if (size == 0) {
418
0
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
419
0
                    ++rows;
420
0
                }
421
                // Read empty line, continue
422
0
                continue;
423
0
            }
424
425
0
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
426
0
            ++rows;
427
0
            block_bytes += size;
428
0
        }
429
0
        auto mutate_columns = block->mutate_columns();
430
0
        for (auto& col : mutate_columns) {
431
0
            col->resize(rows);
432
0
        }
433
0
        block->set_columns(std::move(mutate_columns));
434
4.63k
    } else {
435
4.63k
        auto columns = block->mutate_columns();
436
11.8M
        while (rows < batch_size && !_line_reader_eof &&
437
11.8M
               (max_block_bytes <= 0 || (int64_t)block_bytes < max_block_bytes)) {
438
11.8M
            const uint8_t* ptr = nullptr;
439
11.8M
            size_t size = 0;
440
11.8M
            RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
441
442
            // _skip_lines == 0 means this line is the actual data beginning line for the entire file
443
            // is_remove_bom means _remove_bom should only execute once
444
11.8M
            if (!is_remove_bom && _skip_lines == 0) {
445
4.56k
                ptr = _remove_bom(ptr, size);
446
4.56k
                is_remove_bom = true;
447
4.56k
            }
448
449
            // _skip_lines > 0 means we do not remove bom
450
11.8M
            if (_skip_lines > 0) {
451
119
                _skip_lines--;
452
119
                is_remove_bom = true;
453
119
                continue;
454
119
            }
455
11.8M
            if (size == 0) {
456
2.01k
                if (!_line_reader_eof && _state->is_read_csv_empty_line_as_null()) {
457
0
                    RETURN_IF_ERROR(_fill_empty_line(block, columns, &rows));
458
0
                }
459
                // Read empty line, continue
460
2.01k
                continue;
461
2.01k
            }
462
463
11.8M
            RETURN_IF_ERROR(_validate_line(Slice(ptr, size), &success));
464
11.8M
            if (!success) {
465
128
                continue;
466
128
            }
467
11.8M
            RETURN_IF_ERROR(_fill_dest_columns(Slice(ptr, size), block, columns, &rows));
468
11.8M
            block_bytes += size;
469
11.8M
        }
470
4.62k
        block->set_columns(std::move(columns));
471
4.62k
    }
472
473
4.62k
    *eof = (rows == 0);
474
4.62k
    *read_rows = rows;
475
476
4.62k
    return Status::OK();
477
4.63k
}
478
479
1.99k
Status CsvReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>* name_to_type) {
480
15.4k
    for (const auto& slot : _file_slot_descs) {
481
15.4k
        name_to_type->emplace(slot->col_name(), slot->type());
482
15.4k
    }
483
1.99k
    return Status::OK();
484
1.99k
}
485
486
// init decompressor, file reader and line reader for parsing schema
487
384
Status CsvReader::init_schema_reader() {
488
384
    _start_offset = _range.start_offset;
489
384
    if (_start_offset != 0) {
490
0
        return Status::InvalidArgument(
491
0
                "start offset of TFileRangeDesc must be zero in get parsered schema");
492
0
    }
493
384
    if (_params.file_type == TFileType::FILE_BROKER) {
494
0
        return Status::InternalError<false>(
495
0
                "Getting parsered schema from csv file do not support stream load and broker "
496
0
                "load.");
497
0
    }
498
499
    // csv file without names line and types line.
500
384
    _read_line = 1;
501
384
    _is_parse_name = false;
502
503
384
    if (_params.__isset.file_attributes && _params.file_attributes.__isset.header_type &&
504
384
        !_params.file_attributes.header_type.empty()) {
505
48
        std::string header_type = to_lower(_params.file_attributes.header_type);
506
48
        if (header_type == BeConsts::CSV_WITH_NAMES) {
507
25
            _is_parse_name = true;
508
25
        } else if (header_type == BeConsts::CSV_WITH_NAMES_AND_TYPES) {
509
23
            _read_line = 2;
510
23
            _is_parse_name = true;
511
23
        }
512
48
    }
513
514
384
    RETURN_IF_ERROR(_init_options());
515
384
    RETURN_IF_ERROR(_create_file_reader(true));
516
384
    RETURN_IF_ERROR(_create_decompressor());
517
384
    RETURN_IF_ERROR(_create_line_reader());
518
384
    return Status::OK();
519
384
}
520
521
Status CsvReader::get_parsed_schema(std::vector<std::string>* col_names,
522
384
                                    std::vector<DataTypePtr>* col_types) {
523
384
    if (_read_line == 1) {
524
361
        if (!_is_parse_name) { //parse csv file without names and types
525
336
            size_t col_nums = 0;
526
336
            RETURN_IF_ERROR(_parse_col_nums(&col_nums));
527
2.92k
            for (size_t i = 0; i < col_nums; ++i) {
528
2.58k
                col_names->emplace_back("c" + std::to_string(i + 1));
529
2.58k
            }
530
335
        } else { // parse csv file with names
531
25
            RETURN_IF_ERROR(_parse_col_names(col_names));
532
25
        }
533
534
3.04k
        for (size_t j = 0; j < col_names->size(); ++j) {
535
2.68k
            col_types->emplace_back(
536
2.68k
                    DataTypeFactory::instance().create_data_type(PrimitiveType::TYPE_STRING, true));
537
2.68k
        }
538
360
    } else { // parse csv file with names and types
539
23
        RETURN_IF_ERROR(_parse_col_names(col_names));
540
23
        RETURN_IF_ERROR(_parse_col_types(col_names->size(), col_types));
541
23
    }
542
383
    return Status::OK();
543
384
}
544
545
192M
Status CsvReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
546
192M
    auto& null_column = assert_cast<ColumnNullable&>(column);
547
192M
    if (_empty_field_as_null) {
548
60
        if (slice.size == 0) {
549
5
            null_column.insert_data(nullptr, 0);
550
5
            return Status::OK();
551
5
        }
552
60
    }
553
192M
    if (_options.null_len > 0 && !(_options.converted_from_string && slice.trim_double_quotes())) {
554
192M
        if (slice.compare(Slice(_options.null_format, _options.null_len)) == 0) {
555
30.6k
            null_column.insert_data(nullptr, 0);
556
30.6k
            return Status::OK();
557
30.6k
        }
558
192M
    }
559
192M
    static DataTypeStringSerDe stringSerDe(TYPE_STRING);
560
192M
    auto st = stringSerDe.deserialize_one_cell_from_csv(null_column.get_nested_column(), slice,
561
192M
                                                        _options);
562
192M
    if (!st.ok()) {
563
        // fill null if fail
564
0
        null_column.insert_data(nullptr, 0); // 0 is meaningless here
565
0
        return Status::OK();
566
0
    }
567
    // fill not null if success
568
192M
    null_column.get_null_map_data().push_back(0);
569
192M
    return Status::OK();
570
192M
}
571
572
2.37k
Status CsvReader::_init_options() {
573
    // get column_separator and line_delimiter
574
2.37k
    _value_separator = _params.file_attributes.text_params.column_separator;
575
2.37k
    _value_separator_length = _value_separator.size();
576
2.37k
    _line_delimiter = _params.file_attributes.text_params.line_delimiter;
577
2.37k
    _line_delimiter_length = _line_delimiter.size();
578
2.37k
    if (_params.file_attributes.text_params.__isset.enclose) {
579
2.37k
        _enclose = _params.file_attributes.text_params.enclose;
580
2.37k
    }
581
2.37k
    if (_params.file_attributes.text_params.__isset.escape) {
582
2.37k
        _escape = _params.file_attributes.text_params.escape;
583
2.37k
    }
584
585
2.37k
    _trim_tailing_spaces =
586
2.37k
            (_state != nullptr && _state->trim_tailing_spaces_for_external_table_query());
587
588
2.37k
    _options.escape_char = _escape;
589
2.37k
    _options.quote_char = _enclose;
590
591
2.37k
    if (_params.file_attributes.text_params.collection_delimiter.empty()) {
592
2.37k
        _options.collection_delim = ',';
593
2.37k
    } else {
594
1
        _options.collection_delim = _params.file_attributes.text_params.collection_delimiter[0];
595
1
    }
596
2.37k
    if (_params.file_attributes.text_params.mapkv_delimiter.empty()) {
597
2.37k
        _options.map_key_delim = ':';
598
2.37k
    } else {
599
1
        _options.map_key_delim = _params.file_attributes.text_params.mapkv_delimiter[0];
600
1
    }
601
602
2.37k
    if (_params.file_attributes.text_params.__isset.null_format) {
603
0
        _options.null_format = _params.file_attributes.text_params.null_format.data();
604
0
        _options.null_len = _params.file_attributes.text_params.null_format.length();
605
0
    }
606
607
2.37k
    if (_params.file_attributes.__isset.trim_double_quotes) {
608
2.37k
        _trim_double_quotes = _params.file_attributes.trim_double_quotes;
609
2.37k
    }
610
2.37k
    _options.converted_from_string = _trim_double_quotes;
611
612
2.37k
    if (_state != nullptr) {
613
1.99k
        _keep_cr = _state->query_options().keep_carriage_return;
614
1.99k
    }
615
616
2.37k
    if (_params.file_attributes.text_params.__isset.empty_field_as_null) {
617
2.37k
        _empty_field_as_null = _params.file_attributes.text_params.empty_field_as_null;
618
2.37k
    }
619
2.37k
    return Status::OK();
620
2.37k
}
621
622
2.38k
Status CsvReader::_create_decompressor() {
623
2.38k
    if (_file_compress_type != TFileCompressType::UNKNOWN) {
624
2.29k
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_compress_type, &_decompressor));
625
2.29k
    } else {
626
86
        RETURN_IF_ERROR(Decompressor::create_decompressor(_file_format_type, &_decompressor));
627
86
    }
628
629
2.38k
    return Status::OK();
630
2.38k
}
631
632
2.38k
Status CsvReader::_create_file_reader(bool need_schema) {
633
2.38k
    if (_params.file_type == TFileType::FILE_STREAM) {
634
1.82k
        RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &_file_reader, _state,
635
1.82k
                                                        need_schema));
636
1.82k
    } else {
637
554
        _file_description.mtime = _range.__isset.modification_time ? _range.modification_time : 0;
638
554
        io::FileReaderOptions reader_options =
639
554
                FileFactory::get_reader_options(_state, _file_description);
640
554
        io::FileReaderSPtr file_reader;
641
554
        if (_io_ctx_holder) {
642
264
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
643
264
                    _profile, _system_properties, _file_description, reader_options,
644
264
                    io::DelegateReader::AccessMode::SEQUENTIAL,
645
264
                    std::static_pointer_cast<const io::IOContext>(_io_ctx_holder),
646
264
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
647
290
        } else {
648
290
            file_reader = DORIS_TRY(io::DelegateReader::create_file_reader(
649
290
                    _profile, _system_properties, _file_description, reader_options,
650
290
                    io::DelegateReader::AccessMode::SEQUENTIAL, _io_ctx,
651
290
                    io::PrefetchRange(_range.start_offset, _range.start_offset + _range.size)));
652
290
        }
653
554
        _file_reader = _io_ctx ? std::make_shared<io::TracingFileReader>(std::move(file_reader),
654
552
                                                                         _io_ctx->file_reader_stats)
655
554
                               : file_reader;
656
554
    }
657
2.38k
    if (_file_reader->size() == 0 && _params.file_type != TFileType::FILE_STREAM &&
658
2.38k
        _params.file_type != TFileType::FILE_BROKER) {
659
0
        return Status::EndOfFile("init reader failed, empty csv file: " + _range.path);
660
0
    }
661
2.38k
    return Status::OK();
662
2.38k
}
663
664
2.37k
Status CsvReader::_create_line_reader() {
665
2.37k
    std::shared_ptr<TextLineReaderContextIf> text_line_reader_ctx;
666
2.37k
    if (_enclose == 0) {
667
2.28k
        text_line_reader_ctx = std::make_shared<PlainTextLineReaderCtx>(
668
2.28k
                _line_delimiter, _line_delimiter_length, _keep_cr);
669
2.28k
        _fields_splitter = std::make_unique<PlainCsvTextFieldSplitter>(
670
2.28k
                _trim_tailing_spaces, false, _value_separator, _value_separator_length, -1);
671
672
2.28k
    } else {
673
        // in load task, the _file_slot_descs is empty vector, so we need to set col_sep_num to 0
674
96
        size_t col_sep_num = _file_slot_descs.size() > 1 ? _file_slot_descs.size() - 1 : 0;
675
96
        _enclose_reader_ctx = std::make_shared<EncloseCsvLineReaderCtx>(
676
96
                _line_delimiter, _line_delimiter_length, _value_separator, _value_separator_length,
677
96
                col_sep_num, _enclose, _escape, _keep_cr);
678
96
        text_line_reader_ctx = _enclose_reader_ctx;
679
680
96
        _fields_splitter = std::make_unique<EncloseCsvTextFieldSplitter>(
681
96
                _trim_tailing_spaces, true, _enclose_reader_ctx, _value_separator_length, _enclose);
682
96
    }
683
2.37k
    switch (_file_format_type) {
684
2.30k
    case TFileFormatType::FORMAT_CSV_PLAIN:
685
2.30k
        [[fallthrough]];
686
2.30k
    case TFileFormatType::FORMAT_CSV_GZ:
687
2.30k
        [[fallthrough]];
688
2.30k
    case TFileFormatType::FORMAT_CSV_BZ2:
689
2.30k
        [[fallthrough]];
690
2.30k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
691
2.30k
        [[fallthrough]];
692
2.30k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
693
2.30k
        [[fallthrough]];
694
2.30k
    case TFileFormatType::FORMAT_CSV_LZOP:
695
2.30k
        [[fallthrough]];
696
2.30k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
697
2.30k
        [[fallthrough]];
698
2.30k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
699
2.30k
        _line_reader =
700
2.30k
                NewPlainTextLineReader::create_unique(_profile, _file_reader, _decompressor.get(),
701
2.30k
                                                      text_line_reader_ctx, _size, _start_offset);
702
703
2.30k
        break;
704
77
    case TFileFormatType::FORMAT_PROTO:
705
77
        _fields_splitter = std::make_unique<CsvProtoFieldSplitter>();
706
77
        _line_reader = NewPlainBinaryLineReader::create_unique(_file_reader);
707
77
        break;
708
0
    default:
709
0
        return Status::InternalError<false>(
710
0
                "Unknown format type, cannot init line reader in csv reader, type={}",
711
0
                _file_format_type);
712
2.37k
    }
713
2.37k
    return Status::OK();
714
2.37k
}
715
716
300
Status CsvReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
717
300
    return serde->deserialize_one_cell_from_csv(column, slice, _options);
718
300
}
719
720
Status CsvReader::_fill_dest_columns(const Slice& line, Block* block,
721
11.8M
                                     std::vector<MutableColumnPtr>& columns, size_t* rows) {
722
11.8M
    bool is_success = false;
723
724
11.8M
    RETURN_IF_ERROR(_line_split_to_values(line, &is_success));
725
11.8M
    if (UNLIKELY(!is_success)) {
726
        // If not success, which means we met an invalid row, filter this row and return.
727
539
        return Status::OK();
728
539
    }
729
730
201M
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
731
189M
        int col_idx = _col_idxs[i];
732
        // col idx is out of range, fill with null format
733
189M
        auto value = col_idx < _split_values.size()
734
190M
                             ? _split_values[col_idx]
735
18.4E
                             : Slice(_options.null_format, _options.null_len);
736
737
189M
        IColumn* col_ptr = columns[i].get();
738
189M
        if (!_is_load) {
739
            // block is a Block*, and get_by_position returns a ColumnPtr,
740
            // which is a const pointer. Therefore, using const_cast is permissible.
741
20.5M
            col_ptr = const_cast<IColumn*>(
742
20.5M
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
743
20.5M
        }
744
745
195M
        if (_use_nullable_string_opt[i]) {
746
            // For load task, we always read "string" from file.
747
            // So serdes[i] here must be DataTypeNullableSerDe, and DataTypeNullableSerDe -> nested_serde must be DataTypeStringSerDe.
748
            // So we use deserialize_nullable_string and stringSerDe to reduce virtual function calls.
749
195M
            RETURN_IF_ERROR(_deserialize_nullable_string(*col_ptr, value));
750
18.4E
        } else {
751
18.4E
            RETURN_IF_ERROR(_deserialize_one_cell(_serdes[i], *col_ptr, value));
752
18.4E
        }
753
189M
    }
754
11.8M
    ++(*rows);
755
756
11.8M
    return Status::OK();
757
11.8M
}
758
759
Status CsvReader::_fill_empty_line(Block* block, std::vector<MutableColumnPtr>& columns,
760
0
                                   size_t* rows) {
761
0
    for (int i = 0; i < _file_slot_descs.size(); ++i) {
762
0
        IColumn* col_ptr = columns[i].get();
763
0
        if (!_is_load) {
764
            // block is a Block*, and get_by_position returns a ColumnPtr,
765
            // which is a const pointer. Therefore, using const_cast is permissible.
766
0
            col_ptr = const_cast<IColumn*>(
767
0
                    block->get_by_position(_file_slot_idx_map[i]).column.get());
768
0
        }
769
0
        auto& null_column = assert_cast<ColumnNullable&>(*col_ptr);
770
0
        null_column.insert_data(nullptr, 0);
771
0
    }
772
0
    ++(*rows);
773
0
    return Status::OK();
774
0
}
775
776
11.8M
Status CsvReader::_validate_line(const Slice& line, bool* success) {
777
11.8M
    if (!_is_proto_format && !validate_utf8(_params, line.data, line.size)) {
778
128
        if (!_is_load) {
779
0
            return Status::InternalError<false>("Only support csv data in utf8 codec");
780
128
        } else {
781
128
            _counter->num_rows_filtered++;
782
128
            *success = false;
783
128
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
784
128
                    [&]() -> std::string { return std::string(line.data, line.size); },
785
128
                    [&]() -> std::string {
786
128
                        return "Invalid file encoding: all CSV files must be UTF-8 encoded";
787
128
                    }));
788
128
            return Status::OK();
789
128
        }
790
128
    }
791
11.8M
    *success = true;
792
11.8M
    return Status::OK();
793
11.8M
}
794
795
11.8M
Status CsvReader::_line_split_to_values(const Slice& line, bool* success) {
796
11.8M
    _split_line(line);
797
798
11.8M
    if (_is_load) {
799
        // Only check for load task. For query task, the non exist column will be filled "null".
800
        // if actual column number in csv file is not equal to _file_slot_descs.size()
801
        // then filter this line.
802
10.6M
        bool ignore_col = false;
803
10.6M
        ignore_col = _params.__isset.file_attributes &&
804
10.6M
                     _params.file_attributes.__isset.ignore_csv_redundant_col &&
805
10.6M
                     _params.file_attributes.ignore_csv_redundant_col;
806
807
10.6M
        if ((!ignore_col && _split_values.size() != _file_slot_descs.size()) ||
808
10.6M
            (ignore_col && _split_values.size() < _file_slot_descs.size())) {
809
544
            _counter->num_rows_filtered++;
810
544
            *success = false;
811
544
            RETURN_IF_ERROR(_state->append_error_msg_to_file(
812
544
                    [&]() -> std::string { return std::string(line.data, line.size); },
813
544
                    [&]() -> std::string {
814
544
                        fmt::memory_buffer error_msg;
815
544
                        fmt::format_to(error_msg,
816
544
                                       "Column count mismatch: expected {}, but found {}",
817
544
                                       _file_slot_descs.size(), _split_values.size());
818
544
                        std::string escaped_separator =
819
544
                                std::regex_replace(_value_separator, std::regex("\t"), "\\t");
820
544
                        std::string escaped_delimiter =
821
544
                                std::regex_replace(_line_delimiter, std::regex("\n"), "\\n");
822
544
                        fmt::format_to(error_msg, " (sep:{} delim:{}", escaped_separator,
823
544
                                       escaped_delimiter);
824
544
                        if (_enclose != 0) {
825
544
                            fmt::format_to(error_msg, " encl:{}", _enclose);
826
544
                        }
827
544
                        if (_escape != 0) {
828
544
                            fmt::format_to(error_msg, " esc:{}", _escape);
829
544
                        }
830
544
                        fmt::format_to(error_msg, ")");
831
544
                        return fmt::to_string(error_msg);
832
544
                    }));
833
539
            return Status::OK();
834
544
        }
835
10.6M
    }
836
837
11.8M
    *success = true;
838
11.8M
    return Status::OK();
839
11.8M
}
840
841
11.8M
void CsvReader::_split_line(const Slice& line) {
842
11.8M
    _split_values.clear();
843
11.8M
    _fields_splitter->split_line(line, &_split_values);
844
11.8M
}
845
846
336
Status CsvReader::_parse_col_nums(size_t* col_nums) {
847
336
    const uint8_t* ptr = nullptr;
848
336
    size_t size = 0;
849
336
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
850
336
    if (size == 0) {
851
0
        return Status::InternalError<false>(
852
0
                "The first line is empty, can not parse column numbers");
853
0
    }
854
336
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
855
1
        return Status::InternalError<false>("Only support csv data in utf8 codec");
856
1
    }
857
335
    ptr = _remove_bom(ptr, size);
858
335
    _split_line(Slice(ptr, size));
859
335
    *col_nums = _split_values.size();
860
335
    return Status::OK();
861
336
}
862
863
48
Status CsvReader::_parse_col_names(std::vector<std::string>* col_names) {
864
48
    const uint8_t* ptr = nullptr;
865
48
    size_t size = 0;
866
    // no use of _line_reader_eof
867
48
    RETURN_IF_ERROR(_line_reader->read_line(&ptr, &size, &_line_reader_eof, _io_ctx));
868
48
    if (size == 0) {
869
0
        return Status::InternalError<false>("The first line is empty, can not parse column names");
870
0
    }
871
48
    if (!validate_utf8(_params, reinterpret_cast<const char*>(ptr), size)) {
872
0
        return Status::InternalError<false>("Only support csv data in utf8 codec");
873
0
    }
874
48
    ptr = _remove_bom(ptr, size);
875
48
    _split_line(Slice(ptr, size));
876
187
    for (auto _split_value : _split_values) {
877
187
        col_names->emplace_back(_split_value.to_string());
878
187
    }
879
48
    return Status::OK();
880
48
}
881
882
// TODO(ftw): parse type
883
23
Status CsvReader::_parse_col_types(size_t col_nums, std::vector<DataTypePtr>* col_types) {
884
    // delete after.
885
113
    for (size_t i = 0; i < col_nums; ++i) {
886
90
        col_types->emplace_back(make_nullable(std::make_shared<DataTypeString>()));
887
90
    }
888
889
    // 1. check _line_reader_eof
890
    // 2. read line
891
    // 3. check utf8
892
    // 4. check size
893
    // 5. check _split_values.size must equal to col_nums.
894
    // 6. fill col_types
895
23
    return Status::OK();
896
23
}
897
898
4.95k
const uint8_t* CsvReader::_remove_bom(const uint8_t* ptr, size_t& size) {
899
4.95k
    if (size >= 3 && ptr[0] == 0xEF && ptr[1] == 0xBB && ptr[2] == 0xBF) {
900
7
        LOG(INFO) << "remove bom";
901
7
        constexpr size_t bom_size = 3;
902
7
        size -= bom_size;
903
        // In enclose mode, column_sep_positions were computed on the original line
904
        // (including BOM). After shifting the pointer, we must adjust those positions
905
        // so they remain correct relative to the new start.
906
7
        if (_enclose_reader_ctx) {
907
1
            _enclose_reader_ctx->adjust_column_sep_positions(bom_size);
908
1
        }
909
7
        return ptr + bom_size;
910
7
    }
911
4.94k
    return ptr;
912
4.95k
}
913
914
1.99k
Status CsvReader::close() {
915
1.99k
    if (_line_reader) {
916
1.99k
        _line_reader->close();
917
1.99k
    }
918
919
1.99k
    if (_file_reader) {
920
1.99k
        RETURN_IF_ERROR(_file_reader->close());
921
1.99k
    }
922
923
1.99k
    return Status::OK();
924
1.99k
}
925
926
} // namespace doris