Coverage Report

Created: 2026-06-25 12:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <algorithm>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <utility>
29
30
#include "common/cast_set.h"
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column.h"
37
#include "core/data_type/data_type.h"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "core/string_ref.h"
41
#include "exec/common/util.hpp"
42
#include "exec/operator/scan_operator.h"
43
#include "exec/scan/access_path_parser.h"
44
#include "exprs/vexpr.h"
45
#include "exprs/vexpr_context.h"
46
#include "exprs/vslot_ref.h"
47
#include "format/format_common.h"
48
#include "format_v2/column_mapper.h"
49
#include "format_v2/jni/iceberg_sys_table_reader.h"
50
#include "format_v2/jni/jdbc_reader.h"
51
#include "format_v2/table/hive_reader.h"
52
#include "format_v2/table/hudi_reader.h"
53
#include "format_v2/table/iceberg_reader.h"
54
#include "format_v2/table/paimon_reader.h"
55
#include "format_v2/table_reader.h"
56
#include "io/fs/file_meta_cache.h"
57
#include "io/io_common.h"
58
#include "runtime/descriptors.h"
59
#include "runtime/exec_env.h"
60
#include "runtime/runtime_state.h"
61
#include "service/backend_options.h"
62
#include "storage/id_manager.h"
63
64
namespace doris {
65
namespace {
66
67
31
std::string table_format_name(const TFileRangeDesc& range) {
68
31
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
69
31
                                             : "NotSet";
70
31
}
71
72
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
73
34
                                            const TFileRangeDesc& range) {
74
34
    return range.__isset.format_type ? range.format_type : params.format_type;
75
34
}
76
77
28
bool is_supported_table_format(const TFileRangeDesc& range) {
78
28
    const auto table_format = table_format_name(range);
79
28
    if (table_format == "hudi" && range.__isset.table_format_params &&
80
28
        range.table_format_params.__isset.hudi_params &&
81
28
        range.table_format_params.hudi_params.__isset.delta_logs &&
82
28
        !range.table_format_params.hudi_params.delta_logs.empty()) {
83
        // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path.
84
        // FileScannerV2 currently supports native Parquet data files only.
85
1
        return false;
86
1
    }
87
27
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
88
27
           table_format == "iceberg" || table_format == "paimon" || table_format == "hudi";
89
28
}
90
91
3
bool is_supported_jni_table_format(const TFileRangeDesc& range) {
92
3
    const auto table_format = table_format_name(range);
93
3
    if (table_format == "paimon") {
94
0
        return range.__isset.table_format_params &&
95
0
               range.table_format_params.__isset.paimon_params &&
96
0
               range.table_format_params.paimon_params.__isset.reader_type &&
97
0
               range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI;
98
0
    }
99
3
    return table_format == "jdbc" || table_format == "iceberg";
100
3
}
101
102
15
bool is_csv_format(TFileFormatType::type format_type) {
103
15
    switch (format_type) {
104
2
    case TFileFormatType::FORMAT_CSV_PLAIN:
105
3
    case TFileFormatType::FORMAT_CSV_GZ:
106
4
    case TFileFormatType::FORMAT_CSV_BZ2:
107
5
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
108
6
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
109
7
    case TFileFormatType::FORMAT_CSV_LZOP:
110
8
    case TFileFormatType::FORMAT_CSV_DEFLATE:
111
9
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
112
10
    case TFileFormatType::FORMAT_PROTO:
113
10
        return true;
114
5
    default:
115
5
        return false;
116
15
    }
117
15
}
118
119
5
bool is_text_format(TFileFormatType::type format_type) {
120
5
    return format_type == TFileFormatType::FORMAT_TEXT;
121
5
}
122
123
6
bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
124
6
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
125
6
        column_name == BeConsts::ICEBERG_ROWID_COL) {
126
2
        return false;
127
2
    }
128
4
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
129
4
                                      : !slot_info.is_file_slot;
130
6
}
131
132
8
bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
133
8
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
134
8
        column_name == BeConsts::ICEBERG_ROWID_COL) {
135
2
        return false;
136
2
    }
137
    // CSV and other non-self-describing formats need FE slot descriptors for only the columns that
138
    // are physically read from the file. Partition/default/virtual columns stay in TableReader's
139
    // mapping layer and are materialized after the file-local block is read. New FE provides an
140
    // explicit category; old FE falls back to `is_file_slot`.
141
6
    if (slot_info.__isset.category) {
142
4
        return slot_info.category == TColumnCategory::REGULAR ||
143
4
               slot_info.category == TColumnCategory::GENERATED;
144
4
    }
145
2
    return slot_info.is_file_slot;
146
6
}
147
148
Status rewrite_slot_refs_to_global_index(
149
        VExprSPtr* expr,
150
4
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
151
4
    DORIS_CHECK(expr != nullptr);
152
4
    if (*expr == nullptr) {
153
0
        return Status::OK();
154
0
    }
155
4
    if ((*expr)->is_slot_ref()) {
156
3
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
157
3
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
158
3
        if (global_index_it == slot_id_to_global_index.end()) {
159
1
            DORIS_CHECK(slot_ref->slot_id() >= 0);
160
1
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
161
1
            *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
162
1
                                            cast_set<int>(global_index.value()), -1,
163
1
                                            slot_ref->data_type(), slot_ref->column_name());
164
1
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
165
1
            return Status::OK();
166
1
        }
167
2
        const auto global_index = global_index_it->second;
168
2
        *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
169
2
                                        cast_set<int>(global_index.value()), -1,
170
2
                                        slot_ref->data_type(), slot_ref->column_name());
171
2
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
172
2
        return Status::OK();
173
2
    }
174
1
    auto children = (*expr)->children();
175
1
    for (auto& child : children) {
176
1
        if (child == nullptr) {
177
0
            continue;
178
0
        }
179
1
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
180
1
    }
181
1
    (*expr)->set_children(std::move(children));
182
1
    return Status::OK();
183
1
}
184
185
} // namespace
186
187
#ifdef BE_TEST
188
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
189
13
                                          format::FileFormat* file_format) {
190
13
    return _to_file_format(format_type, file_format);
191
13
}
192
193
bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
194
6
                                           const std::string& column_name) {
195
6
    return is_partition_slot(slot_info, column_name);
196
6
}
197
198
bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
199
8
                                           const std::string& column_name) {
200
8
    return is_data_file_slot(slot_info, column_name);
201
8
}
202
203
Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
204
        VExprSPtr* expr,
205
3
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
206
3
    return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
207
3
}
208
#endif
209
210
34
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
211
34
    const auto format_type = get_range_format_type(params, range);
212
34
    if (format_type == TFileFormatType::FORMAT_PARQUET) {
213
16
        return is_supported_table_format(range);
214
18
    } else if (format_type == TFileFormatType::FORMAT_JNI) {
215
3
        return is_supported_jni_table_format(range);
216
15
    } else if (is_csv_format(format_type) || is_text_format(format_type)) {
217
12
        return is_supported_table_format(range);
218
12
    } else {
219
3
        LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
220
3
        return false;
221
3
    }
222
34
}
223
224
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
225
                             std::shared_ptr<SplitSourceConnector> split_source,
226
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
227
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
228
0
        : Scanner(state, local_state, limit, profile),
229
0
          _split_source(std::move(split_source)),
230
0
          _kv_cache(kv_cache) {
231
0
    (void)colname_to_slot_id;
232
0
    if (state->get_query_ctx() != nullptr &&
233
0
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
234
0
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
235
0
    } else {
236
0
        _params = _split_source->get_params();
237
0
    }
238
0
}
239
240
0
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
241
0
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
242
0
    _get_block_timer =
243
0
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
244
0
    _file_counter =
245
0
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
246
0
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
247
0
                                                      "FileReadBytes", TUnit::BYTES, 1);
248
0
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
249
0
                                                      "FileReadCalls", TUnit::UNIT, 1);
250
0
    _file_read_time_counter =
251
0
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
252
0
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
253
0
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
254
0
    RETURN_IF_ERROR(_init_io_ctx());
255
0
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
256
0
    _io_ctx->file_reader_stats = _file_reader_stats.get();
257
0
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
258
0
    return Status::OK();
259
0
}
260
261
0
Status FileScannerV2::_open_impl(RuntimeState* state) {
262
0
    RETURN_IF_CANCELLED(state);
263
0
    RETURN_IF_ERROR(Scanner::_open_impl(state));
264
0
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
265
0
    if (_first_scan_range) {
266
0
        RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader));
267
0
        DORIS_CHECK(_table_reader != nullptr);
268
0
        RETURN_IF_ERROR(_init_expr_ctxes());
269
0
        RETURN_IF_ERROR(_init_table_reader(_current_range));
270
0
    }
271
0
    return Status::OK();
272
0
}
273
274
0
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
275
0
    while (true) {
276
0
        RETURN_IF_CANCELLED(state);
277
0
        if (!_has_prepared_split) {
278
0
            RETURN_IF_ERROR(_prepare_next_split(eof));
279
0
            if (*eof) {
280
0
                return Status::OK();
281
0
            }
282
0
        }
283
284
0
        {
285
0
            SCOPED_TIMER(_get_block_timer);
286
0
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
287
0
        }
288
0
        if (*eof) {
289
0
            _state->update_num_finished_scan_range(1);
290
0
            _has_prepared_split = false;
291
0
            *eof = false;
292
0
            continue;
293
0
        }
294
0
        return Status::OK();
295
0
    }
296
0
}
297
298
0
Status FileScannerV2::_prepare_next_split(bool* eos) {
299
0
    bool has_next = _first_scan_range;
300
0
    if (!_first_scan_range) {
301
0
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
302
0
    }
303
0
    _first_scan_range = false;
304
0
    if (!has_next || _should_stop) {
305
0
        *eos = true;
306
0
        return Status::OK();
307
0
    }
308
0
    DORIS_CHECK(_table_reader != nullptr);
309
0
    _current_range_path = _current_range.path;
310
0
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
311
0
    COUNTER_UPDATE(_file_counter, 1);
312
0
    _has_prepared_split = true;
313
0
    *eos = false;
314
0
    return Status::OK();
315
0
}
316
317
0
Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) {
318
0
    const auto format_type = get_range_format_type(*_params, range);
319
0
    format::FileFormat file_format;
320
0
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
321
0
    DORIS_CHECK(_table_reader != nullptr);
322
323
0
    format::TableColumnPredicates table_column_predicates;
324
0
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
325
0
    VExprContextSPtrs table_conjuncts;
326
0
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
327
0
    RETURN_IF_ERROR(_table_reader->init({
328
0
            .projected_columns = _projected_columns,
329
0
            .column_predicates = std::move(table_column_predicates),
330
0
            .conjuncts = std::move(table_conjuncts),
331
0
            .format = file_format,
332
0
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
333
0
            .io_ctx = _io_ctx,
334
0
            .runtime_state = _state,
335
0
            .scanner_profile = _local_state->scanner_profile(),
336
0
            .file_slot_descs = &_file_slot_descs,
337
0
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
338
0
            .condition_cache_digest = _local_state->get_condition_cache_digest(),
339
0
    }));
340
0
    return Status::OK();
341
0
}
342
343
Status FileScannerV2::_create_table_reader_for_format(
344
0
        const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const {
345
0
    DORIS_CHECK(reader != nullptr);
346
0
    const auto table_format = table_format_name(range);
347
0
    if (table_format == "NotSet" || table_format == "tvf") {
348
0
        *reader = std::make_unique<format::TableReader>();
349
0
    } else if (table_format == "hive") {
350
0
        *reader = format::hive::HiveReader::create_unique();
351
0
    } else if (table_format == "iceberg") {
352
0
        if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) {
353
0
            *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>();
354
0
        } else {
355
0
            *reader = std::make_unique<format::iceberg::IcebergTableReader>();
356
0
        }
357
0
    } else if (table_format == "paimon") {
358
0
        *reader = std::make_unique<format::paimon::PaimonHybridReader>();
359
0
    } else if (table_format == "hudi") {
360
0
        *reader = format::hudi::HudiReader::create_unique();
361
0
    } else if (table_format == "jdbc") {
362
0
        *reader = std::make_unique<format::jdbc::JdbcJniReader>();
363
0
    } else {
364
0
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
365
0
    }
366
0
    return Status::OK();
367
0
}
368
369
0
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
370
0
    std::map<std::string, Field> partition_values;
371
0
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
372
0
    RETURN_IF_ERROR(_table_reader->prepare_split({
373
0
            .partition_values = std::move(partition_values),
374
0
            .cache = _kv_cache,
375
0
            .current_range = range,
376
0
            .global_rowid_context = _create_global_rowid_context(range),
377
0
    }));
378
0
    return Status::OK();
379
0
}
380
381
0
bool FileScannerV2::_should_enable_file_meta_cache() const {
382
0
    return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
383
0
           _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
384
0
}
385
386
std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context(
387
0
        const TFileRangeDesc& range) const {
388
0
    if (!_need_global_rowid_column) {
389
0
        return std::nullopt;
390
0
    }
391
0
    auto& id_file_map = _state->get_id_file_map();
392
0
    DORIS_CHECK(id_file_map != nullptr);
393
0
    const auto file_id = id_file_map->get_file_mapping_id(
394
0
            std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(),
395
0
                                          range, _should_enable_file_meta_cache()));
396
0
    return format::GlobalRowIdContext {
397
0
            .version = IdManager::ID_VERSION,
398
0
            .backend_id = BackendOptions::get_backend_id(),
399
0
            .file_id = file_id,
400
0
    };
401
0
}
402
403
Status FileScannerV2::_generate_partition_values(
404
0
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
405
0
    DORIS_CHECK(partition_values != nullptr);
406
0
    partition_values->clear();
407
0
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
408
0
        return Status::OK();
409
0
    }
410
0
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
411
0
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
412
0
        const auto& key = range.columns_from_path_keys[idx];
413
0
        const auto it = _partition_slot_descs.find(key);
414
0
        if (it == _partition_slot_descs.end()) {
415
0
            continue;
416
0
        }
417
0
        const auto& value = range.columns_from_path[idx];
418
0
        const bool is_null = range.__isset.columns_from_path_is_null &&
419
0
                             idx < range.columns_from_path_is_null.size() &&
420
0
                             range.columns_from_path_is_null[idx];
421
0
        Field field;
422
0
        DORIS_CHECK(it->second.slot_desc != nullptr);
423
0
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
424
0
        partition_values->emplace(it->second.canonical_name, std::move(field));
425
0
    }
426
0
    return Status::OK();
427
0
}
428
429
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
430
                                             const std::string& value, bool is_null,
431
0
                                             Field* field) const {
432
0
    DORIS_CHECK(slot_desc != nullptr);
433
0
    DORIS_CHECK(field != nullptr);
434
0
    if (is_null) {
435
0
        *field = Field::create_field<TYPE_NULL>(Null());
436
0
        return Status::OK();
437
0
    }
438
0
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
439
0
    auto column = data_type->create_column();
440
0
    auto serde = data_type->get_serde();
441
0
    DataTypeSerDe::FormatOptions options;
442
0
    options.converted_from_string = true;
443
0
    StringRef ref(value.data(), value.size());
444
0
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
445
0
    DORIS_CHECK(column->size() == 1);
446
0
    *field = (*column)[0];
447
0
    return Status::OK();
448
0
}
449
450
0
Status FileScannerV2::_init_expr_ctxes() {
451
0
    _slot_id_to_desc.clear();
452
0
    _slot_id_to_global_index.clear();
453
0
    _partition_slot_descs.clear();
454
0
    _file_slot_descs.clear();
455
0
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
456
0
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
457
0
    }
458
0
    DORIS_CHECK(_table_reader != nullptr);
459
0
    RETURN_IF_ERROR(_build_projected_columns(*_table_reader));
460
0
    return Status::OK();
461
0
}
462
463
0
Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) {
464
0
    _projected_columns.clear();
465
0
    _projected_columns.reserve(_params->required_slots.size());
466
0
    _need_global_rowid_column = false;
467
0
    format::ProjectedColumnBuildContext build_context {
468
0
            .scan_params = _params,
469
0
            .range = &_current_range,
470
0
            .runtime_state = _state,
471
0
    };
472
473
0
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
474
0
        const auto& slot_info = _params->required_slots[slot_idx];
475
0
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
476
0
        if (it == _slot_id_to_desc.end()) {
477
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
478
0
                                         slot_info.slot_id);
479
0
        }
480
0
        auto column = _build_table_column(it->second);
481
0
        if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
482
0
            _need_global_rowid_column = true;
483
0
        }
484
0
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
485
0
        build_context.schema_column.reset();
486
0
        RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column));
487
        // Build nested children from access paths generated by the slot's access-path
488
        // expressions. A projected column can therefore contain only a subset of the schema
489
        // column's nested children.
490
0
        RETURN_IF_ERROR(AccessPathParser::build_nested_children(
491
0
                &column, it->second,
492
0
                build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr));
493
0
        if (is_partition_slot(slot_info, column.name)) {
494
0
            column.is_partition_key = true;
495
0
            _partition_slot_descs.emplace(
496
0
                    column.name,
497
0
                    PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
498
0
            for (const auto& alias : column.name_mapping) {
499
0
                _partition_slot_descs.emplace(
500
0
                        alias,
501
0
                        PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
502
0
            }
503
0
        } else if (is_data_file_slot(slot_info, column.name)) {
504
0
            _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second));
505
0
        }
506
0
        const auto global_index = format::GlobalIndex(slot_idx);
507
0
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
508
0
        _projected_columns.push_back(std::move(column));
509
0
    }
510
0
    RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context));
511
0
    return Status::OK();
512
0
}
513
514
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
515
0
                                          VExprContextSPtr* ctx) const {
516
0
    DORIS_CHECK(ctx != nullptr);
517
0
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
518
0
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
519
0
    }
520
521
0
    if (_params->__isset.default_value_of_src_slot) {
522
0
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
523
0
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
524
0
            return VExpr::create_expr_tree(it->second, *ctx);
525
0
        }
526
0
    }
527
0
    return Status::OK();
528
0
}
529
530
0
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
531
0
    DORIS_CHECK(slot_desc != nullptr);
532
0
    format::ColumnDefinition column;
533
    // TODO(gabriel): why always BY_NAME here?
534
0
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
535
0
    column.name = slot_desc->col_name();
536
0
    column.type = slot_desc->get_data_type_ptr();
537
0
    return column;
538
0
}
539
540
Status FileScannerV2::_build_table_column_predicates(
541
0
        format::TableColumnPredicates* predicates) const {
542
0
    DORIS_CHECK(predicates != nullptr);
543
0
    predicates->clear();
544
0
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
545
0
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
546
0
        const auto it = _slot_id_to_desc.find(slot_id);
547
0
        if (it == _slot_id_to_desc.end()) {
548
0
            continue;
549
0
        }
550
0
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
551
0
        if (global_index_it == _slot_id_to_global_index.end()) {
552
0
            continue;
553
0
        }
554
0
        (*predicates)[global_index_it->second] = slot_predicate_list;
555
0
    }
556
0
    return Status::OK();
557
0
}
558
559
0
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
560
0
    DORIS_CHECK(conjuncts != nullptr);
561
0
    conjuncts->clear();
562
0
    conjuncts->reserve(_conjuncts.size());
563
0
    for (const auto& conjunct : _conjuncts) {
564
0
        VExprSPtr root;
565
0
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
566
0
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
567
0
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
568
0
    }
569
0
    return Status::OK();
570
0
}
571
572
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
573
0
    return get_range_format_type(*_params, _current_range);
574
0
}
575
576
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
577
13
                                      format::FileFormat* file_format) {
578
13
    DORIS_CHECK(file_format != nullptr);
579
13
    switch (format_type) {
580
1
    case TFileFormatType::FORMAT_PARQUET:
581
1
        *file_format = format::FileFormat::PARQUET;
582
1
        return Status::OK();
583
1
    case TFileFormatType::FORMAT_JNI:
584
1
        *file_format = format::FileFormat::JNI;
585
1
        return Status::OK();
586
1
    case TFileFormatType::FORMAT_CSV_PLAIN:
587
2
    case TFileFormatType::FORMAT_CSV_GZ:
588
3
    case TFileFormatType::FORMAT_CSV_BZ2:
589
4
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
590
5
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
591
6
    case TFileFormatType::FORMAT_CSV_LZOP:
592
7
    case TFileFormatType::FORMAT_CSV_DEFLATE:
593
8
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
594
9
    case TFileFormatType::FORMAT_PROTO:
595
9
        *file_format = format::FileFormat::CSV;
596
9
        return Status::OK();
597
1
    case TFileFormatType::FORMAT_TEXT:
598
1
        *file_format = format::FileFormat::TEXT;
599
1
        return Status::OK();
600
1
    default:
601
1
        return Status::NotSupported("FileScannerV2 does not support file format {}",
602
1
                                    to_string(format_type));
603
13
    }
604
13
}
605
606
0
Status FileScannerV2::_init_io_ctx() {
607
0
    _io_ctx = std::make_shared<io::IOContext>();
608
0
    _io_ctx->query_id = &_state->query_id();
609
0
    return Status::OK();
610
0
}
611
612
0
Status FileScannerV2::close(RuntimeState* state) {
613
0
    if (!_try_close()) {
614
0
        return Status::OK();
615
0
    }
616
0
    if (_table_reader != nullptr) {
617
0
        RETURN_IF_ERROR(_table_reader->close());
618
0
        _report_condition_cache_profile();
619
0
        _table_reader.reset();
620
0
    }
621
0
    return Scanner::close(state);
622
0
}
623
624
0
void FileScannerV2::try_stop() {
625
0
    Scanner::try_stop();
626
0
    if (_io_ctx) {
627
0
        _io_ctx->should_stop = true;
628
0
    }
629
0
}
630
631
0
void FileScannerV2::update_realtime_counters() {
632
0
    if (_file_reader_stats == nullptr) {
633
0
        return;
634
0
    }
635
0
    const int64_t bytes_read = _file_reader_stats->read_bytes;
636
0
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
637
0
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
638
0
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
639
0
}
640
641
0
void FileScannerV2::_collect_profile_before_close() {
642
0
    _report_file_reader_predicate_filtered_rows();
643
0
    Scanner::_collect_profile_before_close();
644
0
    if (_file_reader_stats != nullptr) {
645
0
        COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes));
646
0
        COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
647
0
        COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
648
0
    }
649
    // Query profiles can be collected before Scanner::close() runs. Publish condition-cache
650
    // counters here as well, using deltas so this method and close() cannot double count.
651
0
    _report_condition_cache_profile();
652
0
}
653
654
0
void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
655
0
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
656
0
    const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
657
0
    if (filtered_delta > 0) {
658
        // File readers can evaluate localized conjuncts before a block reaches Scanner. Count
659
        // those rows as scanner-level unselected rows so load statistics stay identical no matter
660
        // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block().
661
0
        _counter.num_rows_unselected += filtered_delta;
662
0
        _reported_predicate_filtered_rows = filtered_rows;
663
0
    }
664
0
}
665
666
0
void FileScannerV2::_report_condition_cache_profile() {
667
0
    auto* local_state = static_cast<FileScanLocalState*>(_local_state);
668
0
    const int64_t hit_count =
669
0
            _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0;
670
0
    const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count;
671
0
    if (hit_delta > 0) {
672
0
        COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta);
673
0
        _reported_condition_cache_hit_count = hit_count;
674
0
    }
675
0
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0;
676
0
    const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows;
677
0
    if (filtered_delta > 0) {
678
0
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta);
679
0
        _reported_condition_cache_filtered_rows = filtered_rows;
680
0
    }
681
0
}
682
683
} // namespace doris