Coverage Report

Created: 2026-07-02 15:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <algorithm>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <utility>
29
30
#include "common/cast_set.h"
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column.h"
37
#include "core/data_type/data_type.h"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "core/string_ref.h"
41
#include "exec/common/util.hpp"
42
#include "exec/operator/scan_operator.h"
43
#include "exec/scan/access_path_parser.h"
44
#include "exprs/runtime_filter_expr.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "exprs/vslot_ref.h"
48
#include "format/format_common.h"
49
#include "format_v2/column_mapper.h"
50
#include "format_v2/jni/iceberg_sys_table_reader.h"
51
#include "format_v2/jni/jdbc_reader.h"
52
#include "format_v2/jni/max_compute_jni_reader.h"
53
#include "format_v2/jni/trino_connector_jni_reader.h"
54
#include "format_v2/table/hive_reader.h"
55
#include "format_v2/table/hudi_reader.h"
56
#include "format_v2/table/iceberg_reader.h"
57
#include "format_v2/table/paimon_reader.h"
58
#include "format_v2/table/remote_doris_reader.h"
59
#include "format_v2/table_reader.h"
60
#include "io/fs/file_meta_cache.h"
61
#include "io/io_common.h"
62
#include "runtime/descriptors.h"
63
#include "runtime/exec_env.h"
64
#include "runtime/runtime_state.h"
65
#include "service/backend_options.h"
66
#include "storage/id_manager.h"
67
68
namespace doris {
69
namespace {
70
71
971
std::string table_format_name(const TFileRangeDesc& range) {
72
971
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
73
971
                                             : "NotSet";
74
971
}
75
76
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
77
2.00k
                                            const TFileRangeDesc& range) {
78
2.00k
    return range.__isset.format_type ? range.format_type : params.format_type;
79
2.00k
}
80
81
497
bool is_supported_table_format(const TFileRangeDesc& range) {
82
497
    const auto table_format = table_format_name(range);
83
497
    if (table_format == "hudi" && range.__isset.table_format_params &&
84
497
        range.table_format_params.__isset.hudi_params &&
85
497
        range.table_format_params.hudi_params.__isset.delta_logs &&
86
497
        !range.table_format_params.hudi_params.delta_logs.empty()) {
87
        // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path.
88
        // FileScannerV2 currently supports native Parquet data files only.
89
1
        return false;
90
1
    }
91
496
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
92
496
           table_format == "iceberg" || table_format == "paimon" || table_format == "hudi";
93
497
}
94
95
3
bool is_supported_arrow_table_format(const TFileRangeDesc& range) {
96
3
    return table_format_name(range) == "remote_doris";
97
3
}
98
99
5
bool is_supported_jni_table_format(const TFileRangeDesc& range) {
100
5
    const auto table_format = table_format_name(range);
101
5
    if (table_format == "paimon") {
102
0
        return range.__isset.table_format_params &&
103
0
               range.table_format_params.__isset.paimon_params &&
104
0
               range.table_format_params.paimon_params.__isset.reader_type &&
105
0
               range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI;
106
0
    }
107
5
    return table_format == "jdbc" || table_format == "iceberg" || table_format == "hudi" ||
108
5
           table_format == "max_compute" || table_format == "trino_connector";
109
5
}
110
111
472
bool is_csv_format(TFileFormatType::type format_type) {
112
472
    switch (format_type) {
113
338
    case TFileFormatType::FORMAT_CSV_PLAIN:
114
339
    case TFileFormatType::FORMAT_CSV_GZ:
115
340
    case TFileFormatType::FORMAT_CSV_BZ2:
116
341
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
117
342
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
118
343
    case TFileFormatType::FORMAT_CSV_LZOP:
119
344
    case TFileFormatType::FORMAT_CSV_DEFLATE:
120
345
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
121
346
    case TFileFormatType::FORMAT_PROTO:
122
346
        return true;
123
126
    default:
124
126
        return false;
125
472
    }
126
472
}
127
128
126
bool is_text_format(TFileFormatType::type format_type) {
129
126
    return format_type == TFileFormatType::FORMAT_TEXT;
130
126
}
131
132
124
bool is_json_format(TFileFormatType::type format_type) {
133
124
    return format_type == TFileFormatType::FORMAT_JSON;
134
124
}
135
136
104
bool is_native_format(TFileFormatType::type format_type) {
137
104
    return format_type == TFileFormatType::FORMAT_NATIVE;
138
104
}
139
140
3.07k
bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
141
3.07k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
142
3.07k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
143
11
        return false;
144
11
    }
145
3.05k
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
146
3.05k
                                      : !slot_info.is_file_slot;
147
3.07k
}
148
149
3.07k
bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
150
3.07k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
151
3.07k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
152
11
        return false;
153
11
    }
154
    // CSV and other non-self-describing formats need FE slot descriptors for only the columns that
155
    // are physically read from the file. Partition/default/virtual columns stay in TableReader's
156
    // mapping layer and are materialized after the file-local block is read. New FE provides an
157
    // explicit category; old FE falls back to `is_file_slot`.
158
3.06k
    if (slot_info.__isset.category) {
159
3.05k
        return slot_info.category == TColumnCategory::REGULAR ||
160
3.05k
               slot_info.category == TColumnCategory::GENERATED;
161
3.05k
    }
162
2
    return slot_info.is_file_slot;
163
3.06k
}
164
165
Status rewrite_slot_refs_to_global_index(
166
        VExprSPtr* expr,
167
29
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
168
29
    DORIS_CHECK(expr != nullptr);
169
29
    if (*expr == nullptr) {
170
0
        return Status::OK();
171
0
    }
172
29
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr->get());
173
29
        runtime_filter != nullptr) {
174
1
        auto impl = runtime_filter->get_impl();
175
1
        DORIS_CHECK(impl != nullptr);
176
1
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&impl, slot_id_to_global_index));
177
1
        runtime_filter->set_impl(std::move(impl));
178
1
        return Status::OK();
179
1
    }
180
28
    if ((*expr)->is_slot_ref()) {
181
13
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
182
13
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
183
13
        if (global_index_it == slot_id_to_global_index.end()) {
184
1
            DORIS_CHECK(slot_ref->slot_id() >= 0);
185
1
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
186
1
            *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
187
1
                                            cast_set<int>(global_index.value()), -1,
188
1
                                            slot_ref->data_type(), slot_ref->column_name());
189
1
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
190
1
            return Status::OK();
191
1
        }
192
12
        const auto global_index = global_index_it->second;
193
12
        *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
194
12
                                        cast_set<int>(global_index.value()), -1,
195
12
                                        slot_ref->data_type(), slot_ref->column_name());
196
12
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
197
12
        return Status::OK();
198
12
    }
199
15
    auto children = (*expr)->children();
200
15
    for (auto& child : children) {
201
15
        if (child == nullptr) {
202
0
            continue;
203
0
        }
204
15
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
205
15
    }
206
15
    (*expr)->set_children(std::move(children));
207
15
    return Status::OK();
208
15
}
209
210
} // namespace
211
212
#ifdef BE_TEST
213
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
214
                                          format::FileFormat* file_format) {
215
    return _to_file_format(format_type, file_format);
216
}
217
218
bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
219
                                           const std::string& column_name) {
220
    return is_partition_slot(slot_info, column_name);
221
}
222
223
bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
224
                                           const std::string& column_name) {
225
    return is_data_file_slot(slot_info, column_name);
226
}
227
228
Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
229
        VExprSPtr* expr,
230
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
231
    return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
232
}
233
#endif
234
235
607
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
236
607
    const auto format_type = get_range_format_type(params, range);
237
607
    if (format_type == TFileFormatType::FORMAT_PARQUET) {
238
126
        return is_supported_table_format(range);
239
481
    } else if (format_type == TFileFormatType::FORMAT_ARROW) {
240
3
        return is_supported_arrow_table_format(range);
241
478
    } else if (format_type == TFileFormatType::FORMAT_JNI) {
242
5
        return is_supported_jni_table_format(range);
243
473
    } else if (is_csv_format(format_type) || is_text_format(format_type) ||
244
473
               is_json_format(format_type) || is_native_format(format_type)) {
245
371
        return is_supported_table_format(range);
246
371
    } else {
247
102
        LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
248
102
        return false;
249
102
    }
250
607
}
251
252
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
253
                             std::shared_ptr<SplitSourceConnector> split_source,
254
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
255
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
256
469
        : Scanner(state, local_state, limit, profile),
257
469
          _split_source(std::move(split_source)),
258
469
          _kv_cache(kv_cache) {
259
469
    (void)colname_to_slot_id;
260
469
    if (state->get_query_ctx() != nullptr &&
261
469
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
262
468
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
263
468
    } else {
264
1
        _params = _split_source->get_params();
265
1
    }
266
469
}
267
268
468
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
269
468
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
270
468
    _get_block_timer =
271
468
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
272
468
    _file_counter =
273
468
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
274
468
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
275
468
                                                      "FileReadBytes", TUnit::BYTES, 1);
276
468
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
277
468
                                                      "FileReadCalls", TUnit::UNIT, 1);
278
468
    _file_read_time_counter =
279
468
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
280
468
    _adaptive_batch_predicted_rows_counter = ADD_COUNTER_WITH_LEVEL(
281
468
            _local_state->scanner_profile(), "AdaptiveBatchPredictedRows", TUnit::UNIT, 1);
282
468
    _adaptive_batch_actual_bytes_counter = ADD_COUNTER_WITH_LEVEL(
283
468
            _local_state->scanner_profile(), "AdaptiveBatchActualBytes", TUnit::BYTES, 1);
284
468
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
285
468
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);
286
468
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
287
468
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
288
468
    RETURN_IF_ERROR(_init_io_ctx());
289
468
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
290
468
    _io_ctx->file_reader_stats = _file_reader_stats.get();
291
468
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
292
468
    return Status::OK();
293
468
}
294
295
469
Status FileScannerV2::_open_impl(RuntimeState* state) {
296
469
    RETURN_IF_CANCELLED(state);
297
469
    RETURN_IF_ERROR(Scanner::_open_impl(state));
298
469
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
299
469
    if (_first_scan_range) {
300
467
        RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader));
301
467
        DORIS_CHECK(_table_reader != nullptr);
302
467
        RETURN_IF_ERROR(_init_expr_ctxes());
303
467
        RETURN_IF_ERROR(_init_table_reader(_current_range));
304
467
    }
305
469
    return Status::OK();
306
469
}
307
308
1.12k
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
309
1.59k
    while (true) {
310
1.59k
        RETURN_IF_CANCELLED(state);
311
1.59k
        if (!_has_prepared_split) {
312
938
            RETURN_IF_ERROR(_prepare_next_split(eof));
313
938
            if (*eof) {
314
469
                return Status::OK();
315
469
            }
316
938
        }
317
318
1.12k
        {
319
1.12k
            SCOPED_TIMER(_get_block_timer);
320
1.12k
            if (_should_run_adaptive_batch_size()) {
321
1.11k
                _table_reader->set_batch_size(_predict_reader_batch_rows());
322
1.11k
            }
323
1.12k
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
324
1.12k
        }
325
1.12k
        if (*eof) {
326
469
            _state->update_num_finished_scan_range(1);
327
469
            _has_prepared_split = false;
328
469
            *eof = false;
329
469
            continue;
330
469
        }
331
657
        _update_adaptive_batch_size(*block);
332
657
        return Status::OK();
333
1.12k
    }
334
1.12k
}
335
336
938
Status FileScannerV2::_prepare_next_split(bool* eos) {
337
938
    bool has_next = _first_scan_range;
338
938
    if (!_first_scan_range) {
339
471
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
340
471
    }
341
938
    _first_scan_range = false;
342
938
    if (!has_next || _should_stop) {
343
469
        *eos = true;
344
469
        return Status::OK();
345
469
    }
346
469
    DORIS_CHECK(_table_reader != nullptr);
347
469
    _current_range_path = _current_range.path;
348
469
    _init_adaptive_batch_size_state(get_range_format_type(*_params, _current_range));
349
469
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
350
469
    COUNTER_UPDATE(_file_counter, 1);
351
469
    _has_prepared_split = true;
352
469
    *eos = false;
353
469
    return Status::OK();
354
469
}
355
356
467
Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) {
357
467
    const auto format_type = get_range_format_type(*_params, range);
358
467
    format::FileFormat file_format;
359
467
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
360
467
    DORIS_CHECK(_table_reader != nullptr);
361
362
467
    format::TableColumnPredicates table_column_predicates;
363
467
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
364
467
    VExprContextSPtrs table_conjuncts;
365
467
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
366
467
    RETURN_IF_ERROR(_table_reader->init({
367
467
            .projected_columns = _projected_columns,
368
467
            .column_predicates = std::move(table_column_predicates),
369
467
            .conjuncts = std::move(table_conjuncts),
370
467
            .format = file_format,
371
467
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
372
467
            .io_ctx = _io_ctx,
373
467
            .runtime_state = _state,
374
467
            .scanner_profile = _local_state->scanner_profile(),
375
467
            .file_slot_descs = &_file_slot_descs,
376
467
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
377
467
            .condition_cache_digest = _local_state->get_condition_cache_digest(),
378
467
    }));
379
467
    return Status::OK();
380
467
}
381
382
Status FileScannerV2::_create_table_reader_for_format(
383
467
        const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const {
384
467
    DORIS_CHECK(reader != nullptr);
385
467
    const auto table_format = table_format_name(range);
386
467
    if (table_format == "NotSet" || table_format == "tvf") {
387
465
        *reader = std::make_unique<format::TableReader>();
388
465
    } else if (table_format == "hive") {
389
0
        *reader = format::hive::HiveReader::create_unique();
390
2
    } else if (table_format == "iceberg") {
391
0
        if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) {
392
0
            *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>();
393
0
        } else {
394
0
            *reader = std::make_unique<format::iceberg::IcebergTableReader>();
395
0
        }
396
2
    } else if (table_format == "paimon") {
397
0
        *reader = std::make_unique<format::paimon::PaimonHybridReader>();
398
2
    } else if (table_format == "hudi") {
399
0
        *reader = std::make_unique<format::hudi::HudiHybridReader>();
400
2
    } else if (table_format == "jdbc") {
401
2
        *reader = std::make_unique<format::jdbc::JdbcJniReader>();
402
2
    } else if (table_format == "max_compute") {
403
0
        const auto* mc_desc =
404
0
                static_cast<const MaxComputeTableDescriptor*>(_output_tuple_desc->table_desc());
405
0
        RETURN_IF_ERROR(mc_desc->init_status());
406
0
        *reader = std::make_unique<format::max_compute::MaxComputeJniReader>(mc_desc);
407
0
    } else if (table_format == "trino_connector") {
408
0
        *reader = std::make_unique<format::trino_connector::TrinoConnectorJniReader>();
409
0
    } else if (table_format == "remote_doris") {
410
0
        *reader = std::make_unique<format::remote_doris::RemoteDorisReader>();
411
0
    } else {
412
0
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
413
0
    }
414
467
    return Status::OK();
415
467
}
416
417
466
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
418
466
    std::map<std::string, Field> partition_values;
419
466
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
420
466
    format::FileFormat current_split_format;
421
466
    RETURN_IF_ERROR(_to_file_format(get_range_format_type(*_params, range), &current_split_format));
422
466
    RETURN_IF_ERROR(_table_reader->prepare_split({
423
466
            .partition_values = std::move(partition_values),
424
466
            .cache = _kv_cache,
425
466
            .current_range = range,
426
466
            .current_split_format = current_split_format,
427
466
            .global_rowid_context = _create_global_rowid_context(range),
428
466
    }));
429
466
    return Status::OK();
430
466
}
431
432
9
bool FileScannerV2::_should_enable_file_meta_cache() const {
433
9
    return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
434
9
           _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
435
9
}
436
437
std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context(
438
467
        const TFileRangeDesc& range) const {
439
467
    if (!_need_global_rowid_column) {
440
459
        return std::nullopt;
441
459
    }
442
8
    auto& id_file_map = _state->get_id_file_map();
443
8
    DORIS_CHECK(id_file_map != nullptr);
444
8
    const auto file_id = id_file_map->get_file_mapping_id(
445
8
            std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(),
446
8
                                          range, _should_enable_file_meta_cache()));
447
8
    return format::GlobalRowIdContext {
448
8
            .version = IdManager::ID_VERSION,
449
8
            .backend_id = BackendOptions::get_backend_id(),
450
8
            .file_id = file_id,
451
8
    };
452
467
}
453
454
Status FileScannerV2::_generate_partition_values(
455
467
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
456
467
    DORIS_CHECK(partition_values != nullptr);
457
467
    partition_values->clear();
458
467
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
459
467
        return Status::OK();
460
467
    }
461
0
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
462
0
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
463
0
        const auto& key = range.columns_from_path_keys[idx];
464
0
        const auto it = _partition_slot_descs.find(key);
465
0
        if (it == _partition_slot_descs.end()) {
466
0
            continue;
467
0
        }
468
0
        const auto& value = range.columns_from_path[idx];
469
0
        const bool is_null = range.__isset.columns_from_path_is_null &&
470
0
                             idx < range.columns_from_path_is_null.size() &&
471
0
                             range.columns_from_path_is_null[idx];
472
0
        Field field;
473
0
        DORIS_CHECK(it->second.slot_desc != nullptr);
474
0
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
475
0
        partition_values->emplace(it->second.canonical_name, std::move(field));
476
0
    }
477
0
    return Status::OK();
478
0
}
479
480
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
481
                                             const std::string& value, bool is_null,
482
0
                                             Field* field) const {
483
0
    DORIS_CHECK(slot_desc != nullptr);
484
0
    DORIS_CHECK(field != nullptr);
485
0
    if (is_null) {
486
0
        *field = Field::create_field<TYPE_NULL>(Null());
487
0
        return Status::OK();
488
0
    }
489
0
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
490
0
    auto column = data_type->create_column();
491
0
    auto serde = data_type->get_serde();
492
0
    DataTypeSerDe::FormatOptions options;
493
0
    options.converted_from_string = true;
494
0
    StringRef ref(value.data(), value.size());
495
0
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
496
0
    DORIS_CHECK(column->size() == 1);
497
0
    *field = (*column)[0];
498
0
    return Status::OK();
499
0
}
500
501
467
Status FileScannerV2::_init_expr_ctxes() {
502
467
    _slot_id_to_desc.clear();
503
467
    _slot_id_to_global_index.clear();
504
467
    _partition_slot_descs.clear();
505
467
    _file_slot_descs.clear();
506
3.06k
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
507
3.06k
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
508
3.06k
    }
509
467
    DORIS_CHECK(_table_reader != nullptr);
510
467
    RETURN_IF_ERROR(_build_projected_columns(*_table_reader));
511
467
    return Status::OK();
512
467
}
513
514
467
Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) {
515
467
    _projected_columns.clear();
516
467
    _projected_columns.reserve(_params->required_slots.size());
517
467
    _need_global_rowid_column = false;
518
467
    format::ProjectedColumnBuildContext build_context {
519
467
            .scan_params = _params,
520
467
            .range = &_current_range,
521
467
            .runtime_state = _state,
522
467
    };
523
524
3.53k
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
525
3.06k
        const auto& slot_info = _params->required_slots[slot_idx];
526
3.06k
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
527
3.06k
        if (it == _slot_id_to_desc.end()) {
528
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
529
0
                                         slot_info.slot_id);
530
0
        }
531
3.06k
        auto column = _build_table_column(it->second);
532
3.06k
        if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
533
9
            _need_global_rowid_column = true;
534
9
        }
535
3.06k
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
536
3.06k
        build_context.schema_column.reset();
537
3.06k
        RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column));
538
        // Build nested children from access paths generated by the slot's access-path
539
        // expressions. A projected column can therefore contain only a subset of the schema
540
        // column's nested children.
541
3.06k
        RETURN_IF_ERROR(AccessPathParser::build_nested_children(
542
3.06k
                &column, it->second,
543
3.06k
                build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr));
544
3.06k
        if (is_partition_slot(slot_info, column.name)) {
545
0
            column.is_partition_key = true;
546
0
            _partition_slot_descs.emplace(
547
0
                    column.name,
548
0
                    PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
549
0
            for (const auto& alias : column.name_mapping) {
550
0
                _partition_slot_descs.emplace(
551
0
                        alias,
552
0
                        PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
553
0
            }
554
3.06k
        } else if (is_data_file_slot(slot_info, column.name)) {
555
3.05k
            _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second));
556
3.05k
        }
557
3.06k
        const auto global_index = format::GlobalIndex(slot_idx);
558
3.06k
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
559
3.06k
        _projected_columns.push_back(std::move(column));
560
3.06k
    }
561
467
    RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context));
562
467
    return Status::OK();
563
467
}
564
565
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
566
3.06k
                                          VExprContextSPtr* ctx) const {
567
3.06k
    DORIS_CHECK(ctx != nullptr);
568
3.06k
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
569
3.05k
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
570
3.05k
    }
571
572
10
    if (_params->__isset.default_value_of_src_slot) {
573
10
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
574
10
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
575
0
            return VExpr::create_expr_tree(it->second, *ctx);
576
0
        }
577
10
    }
578
10
    return Status::OK();
579
10
}
580
581
3.06k
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
582
3.06k
    DORIS_CHECK(slot_desc != nullptr);
583
3.06k
    format::ColumnDefinition column;
584
    // TODO(gabriel): why always BY_NAME here?
585
3.06k
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
586
3.06k
    column.name = slot_desc->col_name();
587
3.06k
    column.type = slot_desc->get_data_type_ptr();
588
3.06k
    return column;
589
3.06k
}
590
591
Status FileScannerV2::_build_table_column_predicates(
592
467
        format::TableColumnPredicates* predicates) const {
593
467
    DORIS_CHECK(predicates != nullptr);
594
467
    predicates->clear();
595
467
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
596
3.06k
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
597
3.06k
        const auto it = _slot_id_to_desc.find(slot_id);
598
3.06k
        if (it == _slot_id_to_desc.end()) {
599
0
            continue;
600
0
        }
601
3.06k
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
602
3.06k
        if (global_index_it == _slot_id_to_global_index.end()) {
603
0
            continue;
604
0
        }
605
3.06k
        (*predicates)[global_index_it->second] = slot_predicate_list;
606
3.06k
    }
607
467
    return Status::OK();
608
467
}
609
610
467
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
611
467
    DORIS_CHECK(conjuncts != nullptr);
612
467
    conjuncts->clear();
613
467
    conjuncts->reserve(_conjuncts.size());
614
467
    for (const auto& conjunct : _conjuncts) {
615
9
        VExprSPtr root;
616
9
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
617
9
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
618
9
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
619
9
    }
620
467
    return Status::OK();
621
467
}
622
623
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
624
0
    return get_range_format_type(*_params, _current_range);
625
0
}
626
627
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
628
950
                                      format::FileFormat* file_format) {
629
950
    DORIS_CHECK(file_format != nullptr);
630
950
    switch (format_type) {
631
221
    case TFileFormatType::FORMAT_PARQUET:
632
221
        *file_format = format::FileFormat::PARQUET;
633
221
        return Status::OK();
634
5
    case TFileFormatType::FORMAT_JNI:
635
5
        *file_format = format::FileFormat::JNI;
636
5
        return Status::OK();
637
670
    case TFileFormatType::FORMAT_CSV_PLAIN:
638
671
    case TFileFormatType::FORMAT_CSV_GZ:
639
672
    case TFileFormatType::FORMAT_CSV_BZ2:
640
673
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
641
674
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
642
675
    case TFileFormatType::FORMAT_CSV_LZOP:
643
676
    case TFileFormatType::FORMAT_CSV_DEFLATE:
644
677
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
645
678
    case TFileFormatType::FORMAT_PROTO:
646
678
        *file_format = format::FileFormat::CSV;
647
678
        return Status::OK();
648
1
    case TFileFormatType::FORMAT_TEXT:
649
1
        *file_format = format::FileFormat::TEXT;
650
1
        return Status::OK();
651
37
    case TFileFormatType::FORMAT_JSON:
652
37
        *file_format = format::FileFormat::JSON;
653
37
        return Status::OK();
654
5
    case TFileFormatType::FORMAT_NATIVE:
655
5
        *file_format = format::FileFormat::NATIVE;
656
5
        return Status::OK();
657
1
    case TFileFormatType::FORMAT_ARROW:
658
1
        *file_format = format::FileFormat::ARROW;
659
1
        return Status::OK();
660
1
    default:
661
1
        return Status::NotSupported("FileScannerV2 does not support file format {}",
662
1
                                    to_string(format_type));
663
950
    }
664
950
}
665
666
469
Status FileScannerV2::_init_io_ctx() {
667
469
    _io_ctx = std::make_shared<io::IOContext>();
668
469
    _io_ctx->query_id = &_state->query_id();
669
469
    return Status::OK();
670
469
}
671
672
469
void FileScannerV2::_reset_adaptive_batch_size_state() {
673
469
    _block_size_predictor.reset();
674
469
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
675
469
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, int64_t(0));
676
469
}
677
678
468
// Resets adaptive batching state and, when the format qualifies (see
// _should_enable_adaptive_batch_size), installs a new AdaptiveBlockSizePredictor. Otherwise no
// predictor is left installed.
//
// @param format_type  thrift file format of the range being opened.
void FileScannerV2::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
    _reset_adaptive_batch_size_state();

    if (_should_enable_adaptive_batch_size(format_type)) {
        // V2 native file readers have no trustworthy row-width hint before the first batch, so
        // each split starts with a small probe (ADAPTIVE_BATCH_INITIAL_PROBE_ROWS). Bytes-per-row
        // is then learned from materialized table blocks, and later batches are kept close to
        // RuntimeState::preferred_block_size_bytes(), never above RuntimeState::batch_size().
        const auto target_block_bytes = _state->preferred_block_size_bytes();
        const auto max_batch_rows = _state->batch_size();
        _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
                target_block_bytes, 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS, max_batch_rows);
    }
}
691
692
468
// Returns true when the global config::enable_adaptive_batch_size switch is on AND the format is
// one of the whitelisted families: Parquet, ORC, all CSV variants, PROTO, TEXT, JSON, JNI.
// Any other format returns false.
bool FileScannerV2::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
    bool format_allowed = false;
    switch (format_type) {
    case TFileFormatType::FORMAT_PARQUET:
    case TFileFormatType::FORMAT_ORC:
    case TFileFormatType::FORMAT_CSV_PLAIN:
    case TFileFormatType::FORMAT_CSV_GZ:
    case TFileFormatType::FORMAT_CSV_BZ2:
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
    case TFileFormatType::FORMAT_CSV_LZOP:
    case TFileFormatType::FORMAT_CSV_DEFLATE:
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
    case TFileFormatType::FORMAT_PROTO:
    case TFileFormatType::FORMAT_TEXT:
    case TFileFormatType::FORMAT_JSON:
    case TFileFormatType::FORMAT_JNI:
        format_allowed = true;
        break;
    default:
        break;
    }
    return config::enable_adaptive_batch_size && format_allowed;
}
716
717
1.78k
bool FileScannerV2::_should_run_adaptive_batch_size() const {
718
    // COUNT pushdown emits synthetic rows from file metadata and does not materialize file columns,
719
    // so there is no useful row-width sample to learn from.
720
1.78k
    return _block_size_predictor != nullptr &&
721
1.78k
           _local_state->get_push_down_agg_type() != TPushAggOp::type::COUNT;
722
1.78k
}
723
724
1.11k
size_t FileScannerV2::_predict_reader_batch_rows() {
725
1.11k
    DORIS_CHECK(_block_size_predictor != nullptr);
726
    // Before history exists this returns the probe row count; after update(), it returns roughly
727
    // preferred_block_size_bytes / EWMA(bytes_per_row), capped by RuntimeState::batch_size().
728
1.11k
    const size_t predicted_rows = _block_size_predictor->predict_next_rows();
729
1.11k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
730
1.11k
    return predicted_rows;
731
1.11k
}
732
733
657
void FileScannerV2::_update_adaptive_batch_size(const Block& block) {
734
657
    if (!_should_run_adaptive_batch_size()) {
735
7
        return;
736
7
    }
737
650
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, static_cast<int64_t>(block.bytes()));
738
650
    if (block.rows() == 0) {
739
0
        return;
740
0
    }
741
    // The sample is taken after TableReader has finalized file-local columns to table columns.
742
    // This matches the memory shape seen by upstream operators and catches very wide nested
743
    // columns, such as map/string payloads, after the first probe batch.
744
650
    if (!_block_size_predictor->has_history()) {
745
467
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
746
467
    }
747
650
    _block_size_predictor->update(block);
748
650
}
749
750
469
// Closes the scanner. Re-entry is gated by _try_close(); when it reports the scanner was already
// handled, this returns OK without doing anything.
//
// Teardown order:
//   1. close the table reader, if one exists;
//   2. flush its condition-cache counters;
//   3. release it;
//   4. close the base Scanner.
//
// Steps 2-4 run even if the table reader fails to close. Once _try_close() has let us through,
// there is presumably no second chance to close the base Scanner or drop the reader. The
// previous early return on a table-reader error skipped both of them.
//
// @param state  runtime state forwarded to Scanner::close().
// @return the table reader's close error if there was one, otherwise Scanner::close()'s status.
Status FileScannerV2::close(RuntimeState* state) {
    if (!_try_close()) {
        return Status::OK();
    }

    Status table_reader_status = Status::OK();
    if (_table_reader != nullptr) {
        table_reader_status = _table_reader->close();
        // Counters are published as deltas, so this is safe even if they were already
        // reported from _collect_profile_before_close().
        _report_condition_cache_profile();
        _table_reader.reset();
    }

    Status scanner_status = Scanner::close(state);
    // The table reader's failure is the root cause; surface it ahead of any base-close error.
    RETURN_IF_ERROR(table_reader_status);
    return scanner_status;
}
761
762
469
void FileScannerV2::try_stop() {
763
469
    Scanner::try_stop();
764
469
    if (_io_ctx) {
765
469
        _io_ctx->should_stop = true;
766
469
    }
767
469
}
768
769
886
void FileScannerV2::update_realtime_counters() {
770
886
    if (_file_reader_stats == nullptr) {
771
0
        return;
772
0
    }
773
886
    const int64_t bytes_read = _file_reader_stats->read_bytes;
774
886
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
775
886
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
776
886
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
777
886
}
778
779
469
void FileScannerV2::_collect_profile_before_close() {
780
469
    _report_file_reader_predicate_filtered_rows();
781
469
    Scanner::_collect_profile_before_close();
782
469
    if (_file_reader_stats != nullptr) {
783
469
        COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes));
784
469
        COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
785
469
        COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
786
469
    }
787
    // Query profiles can be collected before Scanner::close() runs. Publish condition-cache
788
    // counters here as well, using deltas so this method and close() cannot double count.
789
469
    _report_condition_cache_profile();
790
469
}
791
792
469
bool FileScannerV2::_should_update_load_counters() const {
793
469
    if (_is_load) {
794
0
        return true;
795
0
    }
796
    // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
797
    // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
798
    // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
799
    // is only reachable from such load entries, never from normal queries, so use it to
800
    // identify these scanners.
801
469
    return (_params != nullptr && _params->__isset.file_type &&
802
469
            _params->file_type == TFileType::FILE_STREAM) ||
803
469
           (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
804
469
}
805
806
469
void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
807
469
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
808
469
    const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
809
469
    if (filtered_delta > 0) {
810
        // File readers can evaluate localized conjuncts before a block reaches Scanner. Count
811
        // those rows as scanner-level unselected rows so load statistics stay identical no matter
812
        // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block().
813
9
        _counter.num_rows_unselected += filtered_delta;
814
9
        _reported_predicate_filtered_rows = filtered_rows;
815
9
    }
816
469
}
817
818
936
void FileScannerV2::_report_condition_cache_profile() {
819
936
    auto* local_state = static_cast<FileScanLocalState*>(_local_state);
820
936
    const int64_t hit_count =
821
936
            _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0;
822
936
    const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count;
823
936
    if (hit_delta > 0) {
824
0
        COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta);
825
0
        _reported_condition_cache_hit_count = hit_count;
826
0
    }
827
936
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0;
828
936
    const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows;
829
936
    if (filtered_delta > 0) {
830
0
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta);
831
0
        _reported_condition_cache_filtered_rows = filtered_rows;
832
0
    }
833
936
}
834
835
} // namespace doris