Coverage Report

Created: 2026-06-13 22:56

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/text/text_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/text/text_reader.h"
19
20
#include <gen_cpp/PlanNodes_types.h>
21
#include <gen_cpp/Types_types.h>
22
#include <glog/logging.h>
23
24
#include <cstddef>
25
#include <utility>
26
#include <vector>
27
28
#include "common/compiler_util.h" // IWYU pragma: keep
29
#include "common/status.h"
30
#include "core/block/block.h"
31
#include "core/column/column_nullable.h"
32
#include "core/column/column_string.h"
33
#include "core/data_type_serde/data_type_string_serde.h"
34
#include "exec/scan/scanner.h"
35
#include "format/csv/csv_reader.h"
36
#include "format/file_reader/new_plain_text_line_reader.h"
37
#include "format/line_reader.h"
38
#include "io/file_factory.h"
39
#include "io/fs/buffered_reader.h"
40
#include "io/fs/file_reader.h"
41
#include "io/fs/s3_file_reader.h"
42
#include "runtime/descriptors.h"
43
#include "runtime/runtime_state.h"
44
45
namespace doris {
46
47
252k
void HiveTextFieldSplitter::do_split(const Slice& line, std::vector<Slice>* splitted_values) {
48
252k
    if (_value_sep_len == 1) {
49
252k
        _split_field_single_char(line, splitted_values);
50
18.4E
    } else {
51
18.4E
        _split_field_multi_char(line, splitted_values);
52
18.4E
    }
53
252k
}
54
55
void HiveTextFieldSplitter::_split_field_single_char(const Slice& line,
56
252k
                                                     std::vector<Slice>* splitted_values) {
57
252k
    const char* data = line.data;
58
252k
    const size_t size = line.size;
59
252k
    size_t value_start = 0;
60
135M
    for (size_t i = 0; i < size; ++i) {
61
135M
        if (data[i] == _value_sep[0]) {
62
            // hive will escape the field separator in string
63
9.35M
            if (_escape_char != 0 && i > 0 && data[i - 1] == _escape_char) {
64
53
                continue;
65
53
            }
66
9.35M
            process_value_func(data, value_start, i - value_start, _trimming_char, splitted_values);
67
9.35M
            value_start = i + _value_sep_len;
68
9.35M
        }
69
135M
    }
70
252k
    process_value_func(data, value_start, size - value_start, _trimming_char, splitted_values);
71
252k
}
72
73
void HiveTextFieldSplitter::_split_field_multi_char(const Slice& line,
74
17
                                                    std::vector<Slice>* splitted_values) {
75
17
    const char* data = line.data;
76
17
    const size_t size = line.size;
77
17
    size_t start = 0;
78
79
17
    std::vector<int> next(_value_sep_len);
80
17
    next[0] = -1;
81
50
    for (int i = 1, j = -1; i < (int)_value_sep_len; i++) {
82
33
        while (j >= 0 && _value_sep[i] != _value_sep[j + 1]) {
83
0
            j = next[j];
84
0
        }
85
33
        if (_value_sep[i] == _value_sep[j + 1]) {
86
22
            j++;
87
22
        }
88
33
        next[i] = j;
89
33
    }
90
91
    // KMP search
92
198
    for (int i = 0, j = -1; i < (int)size; i++) {
93
200
        while (j >= 0 && data[i] != _value_sep[j + 1]) {
94
19
            j = next[j];
95
19
        }
96
181
        if (data[i] == _value_sep[j + 1]) {
97
90
            j++;
98
90
        }
99
181
        if (j == (int)_value_sep_len - 1) {
100
34
            size_t curpos = i - _value_sep_len + 1;
101
34
            if (_escape_char != 0 && curpos > 0 && data[curpos - 1] == _escape_char) {
102
2
                j = next[j];
103
2
                continue;
104
2
            }
105
106
32
            if (curpos >= start) {
107
25
                process_value_func(data, start, curpos - start, _trimming_char, splitted_values);
108
25
                start = curpos + _value_sep_len;
109
25
            }
110
111
32
            j = next[j];
112
32
        }
113
181
    }
114
17
    process_value_func(data, start, size - start, _trimming_char, splitted_values);
115
17
}
116
117
// Constructs a text reader. All arguments are forwarded unchanged to the CsvReader
// base constructor; `io_ctx_holder` is moved into that call.
TextReader::TextReader(RuntimeState* state, RuntimeProfile* profile, ScannerCounter* counter,
                       const TFileScanRangeParams& params, const TFileRangeDesc& range,
                       const std::vector<SlotDescriptor*>& file_slot_descs, size_t batch_size,
                       io::IOContext* io_ctx, std::shared_ptr<io::IOContext> io_ctx_holder)
        : CsvReader(state, profile, counter, params, range, file_slot_descs, batch_size, io_ctx,
                    std::move(io_ctx_holder)) {
    // No state of its own to initialise here.
}
123
124
4.59k
// Copies the text-format settings from the scan parameters into this reader:
// column separator and line delimiter (with their lengths), the optional escape
// character, and the collection / map-key delimiters and null literal stored in
// `_options`. Always returns Status::OK().
Status TextReader::_init_options() {
    auto& text_params = _params.file_attributes.text_params;

    // Field and line delimiters.
    _value_separator = text_params.column_separator;
    _value_separator_length = _value_separator.size();
    _line_delimiter = text_params.line_delimiter;
    _line_delimiter_length = _line_delimiter.size();

    // The escape character is optional; `_escape` keeps its current value when
    // the parameter is not set.
    if (text_params.__isset.escape) {
        _escape = text_params.escape;
    }
    _options.escape_char = _escape;

    // Only the first character of each nested-type delimiter is used.
    _options.collection_delim = text_params.collection_delimiter[0];
    _options.map_key_delim = text_params.mapkv_delimiter[0];

    // `null_format` points into the parameter string rather than copying it.
    _options.null_format = text_params.null_format.data();
    _options.null_len = text_params.null_format.length();

    return Status::OK();
}
144
145
6.77M
// Deserializes a single cell held in `slice` into `column` by delegating to the
// serde's hive-text entry point with this reader's `_options`; the serde's status
// is returned as is.
Status TextReader::_deserialize_one_cell(DataTypeSerDeSPtr serde, IColumn& column, Slice& slice) {
    Status status = serde->deserialize_one_cell_from_hive_text(column, slice, _options);
    return status;
}
148
149
4.58k
// Creates the two helpers used to read the file: `_fields_splitter`, which cuts a
// line into fields, and `_line_reader`, which yields lines from `_file_reader`
// (through `_decompressor`) for the range described by `_size` and `_start_offset`.
// Always returns Status::OK().
Status TextReader::_create_line_reader() {
    // Line-boundary context built from the configured line delimiter.
    std::shared_ptr<TextLineReaderContextIf> line_ctx = std::make_shared<PlainTextLineReaderCtx>(
            _line_delimiter, _line_delimiter_length, false);

    // Field splitter for the configured column separator and escape character.
    _fields_splitter = std::make_unique<HiveTextFieldSplitter>(
            false, false, _value_separator, _value_separator_length, -1, _escape);

    _line_reader = NewPlainTextLineReader::create_unique(
            _profile, _file_reader, _decompressor.get(), line_ctx, _size, _start_offset);

    return Status::OK();
}
164
165
391k
// Line validation hook. Text files need no UTF-8 check, so `line` is not inspected:
// `*success` is set to true and Status::OK() is returned unconditionally.
Status TextReader::_validate_line(const Slice& line, bool* success) {
    *success = true;
    return Status::OK();
}
170
171
1.62M
// Appends one cell to a nullable string column. Always returns Status::OK().
//
// This is the hot path of hive text load (see
// CsvReader::_deserialize_nullable_string). The casts below skip the type check
// because the column type was already verified by the checked assert_cast in
// _reserve_nullable_string_columns at the beginning of the batch.
Status TextReader::_deserialize_nullable_string(IColumn& column, Slice& slice) {
    auto& nullable = assert_cast<ColumnNullable&, TypeCheckOnRelease::DISABLE>(column);
    auto& strings =
            assert_cast<ColumnString&, TypeCheckOnRelease::DISABLE>(nullable.get_nested_column());

    // A cell equal to the configured null literal becomes a NULL row: a default
    // string plus a null-map entry of 1.
    const Slice null_literal(_options.null_format, _options.null_len);
    const bool is_null = slice.compare(null_literal) == 0;
    if (is_null) {
        strings.insert_default();
        nullable.get_null_map_data().push_back(1);
        return Status::OK();
    }

    // Equivalent to DataTypeStringSerDe::deserialize_one_cell_from_hive_text (which
    // never fails); spelled out here to bypass the SerDe layer and its per-cell
    // assert_cast. escape_string rewrites the slice in place and updates its size.
    if (_options.escape_char != 0) {
        escape_string(slice.data, &slice.size, _options.escape_char);
    }
    strings.insert_data(slice.data, slice.size);
    nullable.get_null_map_data().push_back(0);
    return Status::OK();
}
192
193
} // namespace doris