Coverage Report

Created: 2026-07-03 22:26

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <algorithm>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <utility>
29
30
#include "common/cast_set.h"
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column.h"
37
#include "core/data_type/data_type.h"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "core/string_ref.h"
41
#include "exec/common/util.hpp"
42
#include "exec/operator/scan_operator.h"
43
#include "exec/scan/access_path_parser.h"
44
#include "exprs/runtime_filter_expr.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "exprs/vslot_ref.h"
48
#include "format/format_common.h"
49
#include "format_v2/column_mapper.h"
50
#include "format_v2/jni/iceberg_sys_table_reader.h"
51
#include "format_v2/jni/jdbc_reader.h"
52
#include "format_v2/jni/max_compute_jni_reader.h"
53
#include "format_v2/jni/trino_connector_jni_reader.h"
54
#include "format_v2/table/hive_reader.h"
55
#include "format_v2/table/hudi_reader.h"
56
#include "format_v2/table/iceberg_reader.h"
57
#include "format_v2/table/paimon_reader.h"
58
#include "format_v2/table/remote_doris_reader.h"
59
#include "format_v2/table_reader.h"
60
#include "io/fs/file_meta_cache.h"
61
#include "io/io_common.h"
62
#include "runtime/descriptors.h"
63
#include "runtime/exec_env.h"
64
#include "runtime/runtime_state.h"
65
#include "service/backend_options.h"
66
#include "storage/id_manager.h"
67
68
namespace doris {
69
namespace {
70
71
77.8k
std::string table_format_name(const TFileRangeDesc& range) {
72
77.8k
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
73
77.8k
                                             : "NotSet";
74
77.8k
}
75
76
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
77
188k
                                            const TFileRangeDesc& range) {
78
188k
    return range.__isset.format_type ? range.format_type : params.format_type;
79
188k
}
80
81
39.4k
bool is_supported_table_format(const TFileRangeDesc& range) {
82
39.4k
    const auto table_format = table_format_name(range);
83
39.4k
    if (table_format == "hudi" && range.__isset.table_format_params &&
84
39.4k
        range.table_format_params.__isset.hudi_params &&
85
39.4k
        range.table_format_params.hudi_params.__isset.delta_logs &&
86
39.4k
        !range.table_format_params.hudi_params.delta_logs.empty()) {
87
        // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path.
88
        // FileScannerV2 currently supports native Parquet data files only.
89
1
        return false;
90
1
    }
91
39.4k
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
92
39.4k
           table_format == "iceberg" || table_format == "paimon" || table_format == "hudi";
93
39.4k
}
94
95
101
bool is_supported_arrow_table_format(const TFileRangeDesc& range) {
96
101
    return table_format_name(range) == "remote_doris";
97
101
}
98
99
5.14k
bool is_supported_jni_table_format(const TFileRangeDesc& range) {
100
5.14k
    const auto table_format = table_format_name(range);
101
5.14k
    if (table_format == "paimon") {
102
1.85k
        return range.__isset.table_format_params &&
103
1.85k
               range.table_format_params.__isset.paimon_params &&
104
1.85k
               range.table_format_params.paimon_params.__isset.reader_type &&
105
1.86k
               range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI;
106
1.85k
    }
107
3.29k
    return table_format == "jdbc" || table_format == "iceberg" || table_format == "hudi" ||
108
3.29k
           table_format == "max_compute" || table_format == "trino_connector";
109
5.14k
}
110
111
15.7k
bool is_csv_format(TFileFormatType::type format_type) {
112
15.7k
    switch (format_type) {
113
949
    case TFileFormatType::FORMAT_CSV_PLAIN:
114
950
    case TFileFormatType::FORMAT_CSV_GZ:
115
951
    case TFileFormatType::FORMAT_CSV_BZ2:
116
952
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
117
953
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
118
954
    case TFileFormatType::FORMAT_CSV_LZOP:
119
955
    case TFileFormatType::FORMAT_CSV_DEFLATE:
120
956
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
121
957
    case TFileFormatType::FORMAT_PROTO:
122
957
        return true;
123
14.8k
    default:
124
14.8k
        return false;
125
15.7k
    }
126
15.7k
}
127
128
14.7k
bool is_text_format(TFileFormatType::type format_type) {
129
14.7k
    return format_type == TFileFormatType::FORMAT_TEXT;
130
14.7k
}
131
132
10.0k
bool is_json_format(TFileFormatType::type format_type) {
133
10.0k
    return format_type == TFileFormatType::FORMAT_JSON;
134
10.0k
}
135
136
9.52k
bool is_native_format(TFileFormatType::type format_type) {
137
9.52k
    return format_type == TFileFormatType::FORMAT_NATIVE;
138
9.52k
}
139
140
212k
bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
141
212k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
142
212k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
143
1.96k
        return false;
144
1.96k
    }
145
210k
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
146
210k
                                      : !slot_info.is_file_slot;
147
212k
}
148
149
202k
bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
150
202k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
151
202k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
152
1.96k
        return false;
153
1.96k
    }
154
    // CSV and other non-self-describing formats need FE slot descriptors for only the columns that
155
    // are physically read from the file. Partition/default/virtual columns stay in TableReader's
156
    // mapping layer and are materialized after the file-local block is read. New FE provides an
157
    // explicit category; old FE falls back to `is_file_slot`.
158
200k
    if (slot_info.__isset.category) {
159
200k
        return slot_info.category == TColumnCategory::REGULAR ||
160
200k
               slot_info.category == TColumnCategory::GENERATED;
161
200k
    }
162
4
    return slot_info.is_file_slot;
163
200k
}
164
165
Status rewrite_slot_refs_to_global_index(
166
        VExprSPtr* expr,
167
70.3k
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
168
70.3k
    DORIS_CHECK(expr != nullptr);
169
70.3k
    if (*expr == nullptr) {
170
0
        return Status::OK();
171
0
    }
172
70.3k
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr->get());
173
70.3k
        runtime_filter != nullptr) {
174
6.71k
        auto impl = runtime_filter->get_impl();
175
6.71k
        DORIS_CHECK(impl != nullptr);
176
6.71k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&impl, slot_id_to_global_index));
177
6.71k
        runtime_filter->set_impl(std::move(impl));
178
6.71k
        return Status::OK();
179
6.71k
    }
180
63.6k
    if ((*expr)->is_slot_ref()) {
181
21.4k
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
182
21.4k
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
183
21.4k
        if (global_index_it == slot_id_to_global_index.end()) {
184
1
            DORIS_CHECK(slot_ref->slot_id() >= 0);
185
1
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
186
1
            *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
187
1
                                            cast_set<int>(global_index.value()), -1,
188
1
                                            slot_ref->data_type(), slot_ref->column_name());
189
1
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
190
1
            return Status::OK();
191
1
        }
192
21.4k
        const auto global_index = global_index_it->second;
193
21.4k
        *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
194
21.4k
                                        cast_set<int>(global_index.value()), -1,
195
21.4k
                                        slot_ref->data_type(), slot_ref->column_name());
196
21.4k
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
197
21.4k
        return Status::OK();
198
21.4k
    }
199
42.1k
    auto children = (*expr)->children();
200
42.9k
    for (auto& child : children) {
201
42.9k
        if (child == nullptr) {
202
0
            continue;
203
0
        }
204
42.9k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
205
42.9k
    }
206
42.1k
    (*expr)->set_children(std::move(children));
207
42.1k
    return Status::OK();
208
42.1k
}
209
210
} // namespace
211
212
#ifdef BE_TEST
213
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
214
                                          format::FileFormat* file_format) {
215
    return _to_file_format(format_type, file_format);
216
}
217
218
bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
219
                                           const std::string& column_name) {
220
    return is_partition_slot(slot_info, column_name);
221
}
222
223
bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
224
                                           const std::string& column_name) {
225
    return is_data_file_slot(slot_info, column_name);
226
}
227
228
Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
229
        VExprSPtr* expr,
230
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
231
    return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
232
}
233
#endif
234
235
54.1k
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
236
54.1k
    const auto format_type = get_range_format_type(params, range);
237
54.1k
    if (format_type == TFileFormatType::FORMAT_PARQUET) {
238
33.2k
        return is_supported_table_format(range);
239
33.2k
    } else if (format_type == TFileFormatType::FORMAT_ARROW) {
240
101
        return is_supported_arrow_table_format(range);
241
20.7k
    } else if (format_type == TFileFormatType::FORMAT_JNI) {
242
5.14k
        return is_supported_jni_table_format(range);
243
15.6k
    } else if (is_csv_format(format_type) || is_text_format(format_type) ||
244
15.6k
               is_json_format(format_type) || is_native_format(format_type)) {
245
6.19k
        return is_supported_table_format(range);
246
9.45k
    } else {
247
9.45k
        LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
248
9.45k
        return false;
249
9.45k
    }
250
54.1k
}
251
252
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
253
                             std::shared_ptr<SplitSourceConnector> split_source,
254
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
255
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
256
41.1k
        : Scanner(state, local_state, limit, profile),
257
41.1k
          _split_source(std::move(split_source)),
258
41.1k
          _kv_cache(kv_cache) {
259
41.1k
    (void)colname_to_slot_id;
260
41.1k
    if (state->get_query_ctx() != nullptr &&
261
41.1k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
262
41.1k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
263
18.4E
    } else {
264
18.4E
        _params = _split_source->get_params();
265
18.4E
    }
266
41.1k
}
267
268
41.0k
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
269
41.0k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
270
41.0k
    _get_block_timer =
271
41.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
272
41.0k
    _file_counter =
273
41.0k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
274
41.0k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
275
41.0k
                                                      "FileReadBytes", TUnit::BYTES, 1);
276
41.0k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
277
41.0k
                                                      "FileReadCalls", TUnit::UNIT, 1);
278
41.0k
    _file_read_time_counter =
279
41.0k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
280
41.0k
    _adaptive_batch_predicted_rows_counter = ADD_COUNTER_WITH_LEVEL(
281
41.0k
            _local_state->scanner_profile(), "AdaptiveBatchPredictedRows", TUnit::UNIT, 1);
282
41.0k
    _adaptive_batch_actual_bytes_counter = ADD_COUNTER_WITH_LEVEL(
283
41.0k
            _local_state->scanner_profile(), "AdaptiveBatchActualBytes", TUnit::BYTES, 1);
284
41.0k
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
285
41.0k
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);
286
41.0k
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
287
41.0k
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
288
41.0k
    RETURN_IF_ERROR(_init_io_ctx());
289
41.0k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
290
41.0k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
291
41.0k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
292
41.0k
    return Status::OK();
293
41.0k
}
294
295
41.2k
Status FileScannerV2::_open_impl(RuntimeState* state) {
296
41.2k
    RETURN_IF_CANCELLED(state);
297
41.2k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
298
41.2k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
299
41.2k
    if (_first_scan_range) {
300
33.2k
        RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader));
301
33.2k
        DORIS_CHECK(_table_reader != nullptr);
302
33.2k
        RETURN_IF_ERROR(_init_expr_ctxes());
303
33.2k
        RETURN_IF_ERROR(_init_table_reader(_current_range));
304
33.2k
    }
305
41.2k
    return Status::OK();
306
41.2k
}
307
308
126k
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
309
170k
    while (true) {
310
170k
        RETURN_IF_CANCELLED(state);
311
170k
        if (!_has_prepared_split) {
312
85.2k
            RETURN_IF_ERROR(_prepare_next_split(eof));
313
85.2k
            if (*eof) {
314
40.6k
                return Status::OK();
315
40.6k
            }
316
85.2k
        }
317
318
130k
        {
319
130k
            SCOPED_TIMER(_get_block_timer);
320
130k
            if (_should_run_adaptive_batch_size()) {
321
122k
                _table_reader->set_batch_size(_predict_reader_batch_rows());
322
122k
            }
323
130k
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
324
130k
        }
325
130k
        if (*eof) {
326
44.0k
            _state->update_num_finished_scan_range(1);
327
44.0k
            _has_prepared_split = false;
328
44.0k
            *eof = false;
329
44.0k
            continue;
330
44.0k
        }
331
86.1k
        _update_adaptive_batch_size(*block);
332
86.1k
        return Status::OK();
333
130k
    }
334
126k
}
335
336
85.2k
Status FileScannerV2::_prepare_next_split(bool* eos) {
337
85.2k
    bool has_next = _first_scan_range;
338
85.2k
    if (!_first_scan_range) {
339
52.1k
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
340
52.1k
    }
341
85.2k
    _first_scan_range = false;
342
85.2k
    if (!has_next || _should_stop) {
343
40.6k
        *eos = true;
344
40.6k
        return Status::OK();
345
40.6k
    }
346
44.6k
    DORIS_CHECK(_table_reader != nullptr);
347
44.6k
    _current_range_path = _current_range.path;
348
44.6k
    _init_adaptive_batch_size_state(get_range_format_type(*_params, _current_range));
349
44.6k
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
350
44.6k
    COUNTER_UPDATE(_file_counter, 1);
351
44.6k
    _has_prepared_split = true;
352
44.6k
    *eos = false;
353
44.6k
    return Status::OK();
354
44.6k
}
355
356
33.2k
Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) {
357
33.2k
    const auto format_type = get_range_format_type(*_params, range);
358
33.2k
    format::FileFormat file_format;
359
33.2k
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
360
33.2k
    DORIS_CHECK(_table_reader != nullptr);
361
362
33.2k
    format::TableColumnPredicates table_column_predicates;
363
33.2k
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
364
33.2k
    VExprContextSPtrs table_conjuncts;
365
33.2k
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
366
33.2k
    RETURN_IF_ERROR(_table_reader->init({
367
33.2k
            .projected_columns = _projected_columns,
368
33.2k
            .column_predicates = std::move(table_column_predicates),
369
33.2k
            .conjuncts = std::move(table_conjuncts),
370
33.2k
            .format = file_format,
371
33.2k
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
372
33.2k
            .io_ctx = _io_ctx,
373
33.2k
            .runtime_state = _state,
374
33.2k
            .scanner_profile = _local_state->scanner_profile(),
375
33.2k
            .file_slot_descs = &_file_slot_descs,
376
33.2k
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
377
33.2k
            .condition_cache_digest = _local_state->get_condition_cache_digest(),
378
33.2k
    }));
379
33.2k
    return Status::OK();
380
33.2k
}
381
382
Status FileScannerV2::_create_table_reader_for_format(
383
33.2k
        const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const {
384
33.2k
    DORIS_CHECK(reader != nullptr);
385
33.2k
    const auto table_format = table_format_name(range);
386
33.2k
    if (table_format == "NotSet" || table_format == "tvf") {
387
3.68k
        *reader = std::make_unique<format::TableReader>();
388
29.5k
    } else if (table_format == "hive") {
389
11.2k
        *reader = format::hive::HiveReader::create_unique();
390
18.2k
    } else if (table_format == "iceberg") {
391
12.1k
        if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) {
392
1.21k
            *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>();
393
10.9k
        } else {
394
10.9k
            *reader = std::make_unique<format::iceberg::IcebergTableReader>();
395
10.9k
        }
396
12.1k
    } else if (table_format == "paimon") {
397
4.11k
        *reader = std::make_unique<format::paimon::PaimonHybridReader>();
398
4.11k
    } else if (table_format == "hudi") {
399
0
        *reader = std::make_unique<format::hudi::HudiHybridReader>();
400
1.98k
    } else if (table_format == "jdbc") {
401
1.24k
        *reader = std::make_unique<format::jdbc::JdbcJniReader>();
402
1.24k
    } else if (table_format == "max_compute") {
403
0
        const auto* mc_desc =
404
0
                static_cast<const MaxComputeTableDescriptor*>(_output_tuple_desc->table_desc());
405
0
        RETURN_IF_ERROR(mc_desc->init_status());
406
0
        *reader = std::make_unique<format::max_compute::MaxComputeJniReader>(mc_desc);
407
736
    } else if (table_format == "trino_connector") {
408
636
        *reader = std::make_unique<format::trino_connector::TrinoConnectorJniReader>();
409
636
    } else if (table_format == "remote_doris") {
410
98
        *reader = std::make_unique<format::remote_doris::RemoteDorisReader>();
411
98
    } else {
412
2
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
413
2
    }
414
33.2k
    return Status::OK();
415
33.2k
}
416
417
44.6k
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
418
44.6k
    std::map<std::string, Field> partition_values;
419
44.6k
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
420
44.6k
    format::FileFormat current_split_format;
421
44.6k
    RETURN_IF_ERROR(_to_file_format(get_range_format_type(*_params, range), &current_split_format));
422
44.6k
    RETURN_IF_ERROR(_table_reader->prepare_split({
423
44.6k
            .partition_values = std::move(partition_values),
424
44.6k
            .cache = _kv_cache,
425
44.6k
            .current_range = range,
426
44.6k
            .current_split_format = current_split_format,
427
44.6k
            .global_rowid_context = _create_global_rowid_context(range),
428
44.6k
    }));
429
44.6k
    return Status::OK();
430
44.6k
}
431
432
2.12k
bool FileScannerV2::_should_enable_file_meta_cache() const {
433
2.12k
    return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
434
2.13k
           _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
435
2.12k
}
436
437
std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context(
438
44.6k
        const TFileRangeDesc& range) const {
439
44.6k
    if (!_need_global_rowid_column) {
440
42.5k
        return std::nullopt;
441
42.5k
    }
442
2.11k
    auto& id_file_map = _state->get_id_file_map();
443
2.11k
    DORIS_CHECK(id_file_map != nullptr);
444
2.11k
    const auto file_id = id_file_map->get_file_mapping_id(
445
2.11k
            std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(),
446
2.11k
                                          range, _should_enable_file_meta_cache()));
447
2.11k
    return format::GlobalRowIdContext {
448
2.11k
            .version = IdManager::ID_VERSION,
449
2.11k
            .backend_id = BackendOptions::get_backend_id(),
450
2.11k
            .file_id = file_id,
451
2.11k
    };
452
44.6k
}
453
454
Status FileScannerV2::_generate_partition_values(
455
44.6k
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
456
44.6k
    DORIS_CHECK(partition_values != nullptr);
457
44.6k
    partition_values->clear();
458
44.6k
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
459
34.1k
        return Status::OK();
460
34.1k
    }
461
10.4k
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
462
26.1k
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
463
15.6k
        const auto& key = range.columns_from_path_keys[idx];
464
15.6k
        const auto it = _partition_slot_descs.find(key);
465
15.6k
        if (it == _partition_slot_descs.end()) {
466
4.65k
            continue;
467
4.65k
        }
468
11.0k
        const auto& value = range.columns_from_path[idx];
469
11.0k
        const bool is_null = range.__isset.columns_from_path_is_null &&
470
11.0k
                             idx < range.columns_from_path_is_null.size() &&
471
11.0k
                             range.columns_from_path_is_null[idx];
472
11.0k
        Field field;
473
11.0k
        DORIS_CHECK(it->second.slot_desc != nullptr);
474
11.0k
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
475
11.0k
        partition_values->emplace(it->second.canonical_name, std::move(field));
476
11.0k
    }
477
10.4k
    return Status::OK();
478
10.4k
}
479
480
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
481
                                             const std::string& value, bool is_null,
482
11.0k
                                             Field* field) const {
483
11.0k
    DORIS_CHECK(slot_desc != nullptr);
484
11.0k
    DORIS_CHECK(field != nullptr);
485
11.0k
    if (is_null) {
486
808
        *field = Field::create_field<TYPE_NULL>(Null());
487
808
        return Status::OK();
488
808
    }
489
10.2k
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
490
10.2k
    auto column = data_type->create_column();
491
10.2k
    auto serde = data_type->get_serde();
492
10.2k
    DataTypeSerDe::FormatOptions options;
493
10.2k
    options.converted_from_string = true;
494
10.2k
    StringRef ref(value.data(), value.size());
495
10.2k
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
496
10.2k
    DORIS_CHECK(column->size() == 1);
497
10.2k
    *field = (*column)[0];
498
10.2k
    return Status::OK();
499
10.2k
}
500
501
33.2k
Status FileScannerV2::_init_expr_ctxes() {
502
33.2k
    _slot_id_to_desc.clear();
503
33.2k
    _slot_id_to_global_index.clear();
504
33.2k
    _partition_slot_descs.clear();
505
33.2k
    _file_slot_descs.clear();
506
212k
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
507
212k
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
508
212k
    }
509
33.2k
    DORIS_CHECK(_table_reader != nullptr);
510
33.2k
    RETURN_IF_ERROR(_build_projected_columns(*_table_reader));
511
33.2k
    return Status::OK();
512
33.2k
}
513
514
33.1k
Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) {
515
33.1k
    _projected_columns.clear();
516
33.1k
    _projected_columns.reserve(_params->required_slots.size());
517
33.1k
    _need_global_rowid_column = false;
518
33.1k
    format::ProjectedColumnBuildContext build_context {
519
33.1k
            .scan_params = _params,
520
33.1k
            .range = &_current_range,
521
33.1k
            .runtime_state = _state,
522
33.1k
    };
523
524
245k
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
525
212k
        const auto& slot_info = _params->required_slots[slot_idx];
526
212k
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
527
212k
        if (it == _slot_id_to_desc.end()) {
528
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
529
0
                                         slot_info.slot_id);
530
0
        }
531
212k
        auto column = _build_table_column(it->second);
532
212k
        if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
533
1.87k
            _need_global_rowid_column = true;
534
1.87k
        }
535
212k
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
536
212k
        build_context.schema_column.reset();
537
212k
        RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column));
538
        // Build nested children from access paths generated by the slot's access-path
539
        // expressions. A projected column can therefore contain only a subset of the schema
540
        // column's nested children.
541
212k
        RETURN_IF_ERROR(AccessPathParser::build_nested_children(
542
212k
                &column, it->second,
543
212k
                build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr));
544
212k
        if (is_partition_slot(slot_info, column.name)) {
545
10.5k
            column.is_partition_key = true;
546
10.5k
            _partition_slot_descs.emplace(
547
10.5k
                    column.name,
548
10.5k
                    PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
549
10.5k
            for (const auto& alias : column.name_mapping) {
550
0
                _partition_slot_descs.emplace(
551
0
                        alias,
552
0
                        PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
553
0
            }
554
202k
        } else if (is_data_file_slot(slot_info, column.name)) {
555
200k
            _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second));
556
200k
        }
557
212k
        const auto global_index = format::GlobalIndex(slot_idx);
558
212k
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
559
212k
        _projected_columns.push_back(std::move(column));
560
212k
    }
561
33.1k
    RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context));
562
33.1k
    return Status::OK();
563
33.1k
}
564
565
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
566
212k
                                          VExprContextSPtr* ctx) const {
567
212k
    DORIS_CHECK(ctx != nullptr);
568
212k
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
569
208k
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
570
208k
    }
571
572
3.63k
    if (_params->__isset.default_value_of_src_slot) {
573
3.63k
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
574
3.63k
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
575
0
            return VExpr::create_expr_tree(it->second, *ctx);
576
0
        }
577
3.63k
    }
578
3.63k
    return Status::OK();
579
3.63k
}
580
581
212k
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
582
212k
    DORIS_CHECK(slot_desc != nullptr);
583
212k
    format::ColumnDefinition column;
584
    // TODO(gabriel): why always BY_NAME here?
585
212k
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
586
212k
    column.name = slot_desc->col_name();
587
212k
    column.type = slot_desc->get_data_type_ptr();
588
212k
    return column;
589
212k
}
590
591
Status FileScannerV2::_build_table_column_predicates(
592
33.2k
        format::TableColumnPredicates* predicates) const {
593
33.2k
    DORIS_CHECK(predicates != nullptr);
594
33.2k
    predicates->clear();
595
33.2k
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
596
212k
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
597
212k
        const auto it = _slot_id_to_desc.find(slot_id);
598
212k
        if (it == _slot_id_to_desc.end()) {
599
0
            continue;
600
0
        }
601
212k
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
602
212k
        if (global_index_it == _slot_id_to_global_index.end()) {
603
0
            continue;
604
0
        }
605
212k
        (*predicates)[global_index_it->second] = slot_predicate_list;
606
212k
    }
607
33.2k
    return Status::OK();
608
33.2k
}
609
610
33.2k
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
611
33.2k
    DORIS_CHECK(conjuncts != nullptr);
612
33.2k
    conjuncts->clear();
613
33.2k
    conjuncts->reserve(_conjuncts.size());
614
33.2k
    for (const auto& conjunct : _conjuncts) {
615
20.8k
        VExprSPtr root;
616
20.8k
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
617
20.8k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
618
20.8k
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
619
20.8k
    }
620
33.2k
    return Status::OK();
621
33.2k
}
622
623
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
624
0
    return get_range_format_type(*_params, _current_range);
625
0
}
626
627
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
628
77.8k
                                      format::FileFormat* file_format) {
629
77.8k
    DORIS_CHECK(file_format != nullptr);
630
77.8k
    switch (format_type) {
631
57.7k
    case TFileFormatType::FORMAT_PARQUET:
632
57.7k
        *file_format = format::FileFormat::PARQUET;
633
57.7k
        return Status::OK();
634
9.93k
    case TFileFormatType::FORMAT_JNI:
635
9.93k
        *file_format = format::FileFormat::JNI;
636
9.93k
        return Status::OK();
637
1.78k
    case TFileFormatType::FORMAT_CSV_PLAIN:
638
1.78k
    case TFileFormatType::FORMAT_CSV_GZ:
639
1.79k
    case TFileFormatType::FORMAT_CSV_BZ2:
640
1.79k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
641
1.79k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
642
1.79k
    case TFileFormatType::FORMAT_CSV_LZOP:
643
1.79k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
644
1.79k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
645
1.79k
    case TFileFormatType::FORMAT_PROTO:
646
1.79k
        *file_format = format::FileFormat::CSV;
647
1.79k
        return Status::OK();
648
7.07k
    case TFileFormatType::FORMAT_TEXT:
649
7.07k
        *file_format = format::FileFormat::TEXT;
650
7.07k
        return Status::OK();
651
1.11k
    case TFileFormatType::FORMAT_JSON:
652
1.11k
        *file_format = format::FileFormat::JSON;
653
1.11k
        return Status::OK();
654
5
    case TFileFormatType::FORMAT_NATIVE:
655
5
        *file_format = format::FileFormat::NATIVE;
656
5
        return Status::OK();
657
197
    case TFileFormatType::FORMAT_ARROW:
658
197
        *file_format = format::FileFormat::ARROW;
659
197
        return Status::OK();
660
1
    default:
661
1
        return Status::NotSupported("FileScannerV2 does not support file format {}",
662
1
                                    to_string(format_type));
663
77.8k
    }
664
77.8k
}
665
666
41.1k
Status FileScannerV2::_init_io_ctx() {
667
41.1k
    _io_ctx = std::make_shared<io::IOContext>();
668
41.1k
    _io_ctx->query_id = &_state->query_id();
669
41.1k
    return Status::OK();
670
41.1k
}
671
672
44.6k
void FileScannerV2::_reset_adaptive_batch_size_state() {
673
44.6k
    _block_size_predictor.reset();
674
44.6k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
675
44.6k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, int64_t(0));
676
44.6k
}
677
678
44.6k
void FileScannerV2::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
679
44.6k
    _reset_adaptive_batch_size_state();
680
44.6k
    if (!_should_enable_adaptive_batch_size(format_type)) {
681
100
        return;
682
100
    }
683
684
    // V2 native file readers do not have reliable row-width hints before the first batch. Start
685
    // every split with a small probe, then learn bytes-per-row from the materialized table block
686
    // and keep later batches close to RuntimeState::preferred_block_size_bytes().
687
44.5k
    _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
688
44.5k
            _state->preferred_block_size_bytes(), 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS,
689
44.5k
            _state->batch_size());
690
44.5k
}
691
692
44.6k
bool FileScannerV2::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
693
44.6k
    if (!config::enable_adaptive_batch_size) {
694
0
        return false;
695
0
    }
696
44.6k
    switch (format_type) {
697
33.2k
    case TFileFormatType::FORMAT_PARQUET:
698
33.2k
    case TFileFormatType::FORMAT_ORC:
699
34.1k
    case TFileFormatType::FORMAT_CSV_PLAIN:
700
34.1k
    case TFileFormatType::FORMAT_CSV_GZ:
701
34.1k
    case TFileFormatType::FORMAT_CSV_BZ2:
702
34.1k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
703
34.1k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
704
34.1k
    case TFileFormatType::FORMAT_CSV_LZOP:
705
34.1k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
706
34.1k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
707
34.1k
    case TFileFormatType::FORMAT_PROTO:
708
38.8k
    case TFileFormatType::FORMAT_TEXT:
709
39.4k
    case TFileFormatType::FORMAT_JSON:
710
44.5k
    case TFileFormatType::FORMAT_JNI:
711
44.5k
        return true;
712
100
    default:
713
100
        return false;
714
44.6k
    }
715
44.6k
}
716
717
216k
bool FileScannerV2::_should_run_adaptive_batch_size() const {
718
    // COUNT pushdown emits synthetic rows from file metadata and does not materialize file columns,
719
    // so there is no useful row-width sample to learn from.
720
216k
    return _block_size_predictor != nullptr &&
721
216k
           _local_state->get_push_down_agg_type() != TPushAggOp::type::COUNT;
722
216k
}
723
724
122k
size_t FileScannerV2::_predict_reader_batch_rows() {
725
122k
    DORIS_CHECK(_block_size_predictor != nullptr);
726
    // Before history exists this returns the probe row count; after update(), it returns roughly
727
    // preferred_block_size_bytes / EWMA(bytes_per_row), capped by RuntimeState::batch_size().
728
122k
    const size_t predicted_rows = _block_size_predictor->predict_next_rows();
729
122k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
730
122k
    return predicted_rows;
731
122k
}
732
733
86.1k
void FileScannerV2::_update_adaptive_batch_size(const Block& block) {
734
86.1k
    if (!_should_run_adaptive_batch_size()) {
735
5.33k
        return;
736
5.33k
    }
737
80.8k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, static_cast<int64_t>(block.bytes()));
738
80.8k
    if (block.rows() == 0) {
739
0
        return;
740
0
    }
741
    // The sample is taken after TableReader has finalized file-local columns to table columns.
742
    // This matches the memory shape seen by upstream operators and catches very wide nested
743
    // columns, such as map/string payloads, after the first probe batch.
744
80.8k
    if (!_block_size_predictor->has_history()) {
745
36.4k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
746
36.4k
    }
747
80.8k
    _block_size_predictor->update(block);
748
80.8k
}
749
750
41.2k
Status FileScannerV2::close(RuntimeState* state) {
751
41.2k
    if (!_try_close()) {
752
0
        return Status::OK();
753
0
    }
754
41.2k
    if (_table_reader != nullptr) {
755
33.2k
        RETURN_IF_ERROR(_table_reader->close());
756
33.2k
        _report_condition_cache_profile();
757
33.2k
        _table_reader.reset();
758
33.2k
    }
759
41.2k
    return Scanner::close(state);
760
41.2k
}
761
762
41.2k
void FileScannerV2::try_stop() {
763
41.2k
    Scanner::try_stop();
764
41.2k
    if (_io_ctx) {
765
41.2k
        _io_ctx->should_stop = true;
766
41.2k
    }
767
41.2k
}
768
769
85.1k
void FileScannerV2::update_realtime_counters() {
770
85.1k
    if (_file_reader_stats == nullptr) {
771
0
        return;
772
0
    }
773
85.1k
    const int64_t bytes_read = _file_reader_stats->read_bytes;
774
85.1k
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
775
85.1k
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
776
85.1k
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
777
85.1k
}
778
779
41.2k
void FileScannerV2::_collect_profile_before_close() {
780
41.2k
    _report_file_reader_predicate_filtered_rows();
781
41.2k
    Scanner::_collect_profile_before_close();
782
41.2k
    if (_file_reader_stats != nullptr) {
783
41.2k
        COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes));
784
41.2k
        COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
785
41.2k
        COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
786
41.2k
    }
787
    // Query profiles can be collected before Scanner::close() runs. Publish condition-cache
788
    // counters here as well, using deltas so this method and close() cannot double count.
789
41.2k
    _report_condition_cache_profile();
790
41.2k
}
791
792
41.2k
bool FileScannerV2::_should_update_load_counters() const {
793
41.2k
    if (_is_load) {
794
0
        return true;
795
0
    }
796
    // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
797
    // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
798
    // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
799
    // is only reachable from such load entries, never from normal queries, so use it to
800
    // identify these scanners.
801
41.2k
    return (_params != nullptr && _params->__isset.file_type &&
802
41.2k
            _params->file_type == TFileType::FILE_STREAM) ||
803
41.2k
           (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
804
41.2k
}
805
806
41.2k
void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
807
41.2k
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
808
41.2k
    const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
809
41.2k
    if (filtered_delta > 0) {
810
        // File readers can evaluate localized conjuncts before a block reaches Scanner. Count
811
        // those rows as scanner-level unselected rows so load statistics stay identical no matter
812
        // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block().
813
2.90k
        _counter.num_rows_unselected += filtered_delta;
814
2.90k
        _reported_predicate_filtered_rows = filtered_rows;
815
2.90k
    }
816
41.2k
}
817
818
74.4k
void FileScannerV2::_report_condition_cache_profile() {
819
74.4k
    auto* local_state = static_cast<FileScanLocalState*>(_local_state);
820
74.4k
    const int64_t hit_count =
821
74.4k
            _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0;
822
74.4k
    const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count;
823
74.4k
    if (hit_delta > 0) {
824
798
        COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta);
825
798
        _reported_condition_cache_hit_count = hit_count;
826
798
    }
827
74.4k
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0;
828
74.4k
    const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows;
829
74.4k
    if (filtered_delta > 0) {
830
190
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta);
831
190
        _reported_condition_cache_filtered_rows = filtered_rows;
832
190
    }
833
74.4k
}
834
835
} // namespace doris