Coverage Report

Created: 2026-07-02 15:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <algorithm>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <utility>
29
30
#include "common/cast_set.h"
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column.h"
37
#include "core/data_type/data_type.h"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "core/string_ref.h"
41
#include "exec/common/util.hpp"
42
#include "exec/operator/scan_operator.h"
43
#include "exec/scan/access_path_parser.h"
44
#include "exprs/runtime_filter_expr.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "exprs/vslot_ref.h"
48
#include "format/format_common.h"
49
#include "format_v2/column_mapper.h"
50
#include "format_v2/jni/iceberg_sys_table_reader.h"
51
#include "format_v2/jni/jdbc_reader.h"
52
#include "format_v2/jni/max_compute_jni_reader.h"
53
#include "format_v2/jni/trino_connector_jni_reader.h"
54
#include "format_v2/table/hive_reader.h"
55
#include "format_v2/table/hudi_reader.h"
56
#include "format_v2/table/iceberg_reader.h"
57
#include "format_v2/table/paimon_reader.h"
58
#include "format_v2/table/remote_doris_reader.h"
59
#include "format_v2/table_reader.h"
60
#include "io/fs/file_meta_cache.h"
61
#include "io/io_common.h"
62
#include "runtime/descriptors.h"
63
#include "runtime/exec_env.h"
64
#include "runtime/runtime_state.h"
65
#include "service/backend_options.h"
66
#include "storage/id_manager.h"
67
68
namespace doris {
69
namespace {
70
71
76.2k
std::string table_format_name(const TFileRangeDesc& range) {
72
76.2k
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
73
76.2k
                                             : "NotSet";
74
76.2k
}
75
76
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
77
185k
                                            const TFileRangeDesc& range) {
78
185k
    return range.__isset.format_type ? range.format_type : params.format_type;
79
185k
}
80
81
38.5k
bool is_supported_table_format(const TFileRangeDesc& range) {
82
38.5k
    const auto table_format = table_format_name(range);
83
38.5k
    if (table_format == "hudi" && range.__isset.table_format_params &&
84
38.5k
        range.table_format_params.__isset.hudi_params &&
85
38.5k
        range.table_format_params.hudi_params.__isset.delta_logs &&
86
38.5k
        !range.table_format_params.hudi_params.delta_logs.empty()) {
87
        // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path.
88
        // FileScannerV2 currently supports native Parquet data files only.
89
1
        return false;
90
1
    }
91
38.5k
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
92
38.5k
           table_format == "iceberg" || table_format == "paimon" || table_format == "hudi";
93
38.5k
}
94
95
101
bool is_supported_arrow_table_format(const TFileRangeDesc& range) {
96
101
    return table_format_name(range) == "remote_doris";
97
101
}
98
99
5.14k
bool is_supported_jni_table_format(const TFileRangeDesc& range) {
100
5.14k
    const auto table_format = table_format_name(range);
101
5.14k
    if (table_format == "paimon") {
102
1.85k
        return range.__isset.table_format_params &&
103
1.85k
               range.table_format_params.__isset.paimon_params &&
104
1.86k
               range.table_format_params.paimon_params.__isset.reader_type &&
105
1.85k
               range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI;
106
1.85k
    }
107
3.28k
    return table_format == "jdbc" || table_format == "iceberg" || table_format == "hudi" ||
108
3.28k
           table_format == "max_compute" || table_format == "trino_connector";
109
5.14k
}
110
111
15.6k
bool is_csv_format(TFileFormatType::type format_type) {
112
15.6k
    switch (format_type) {
113
951
    case TFileFormatType::FORMAT_CSV_PLAIN:
114
952
    case TFileFormatType::FORMAT_CSV_GZ:
115
953
    case TFileFormatType::FORMAT_CSV_BZ2:
116
954
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
117
955
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
118
956
    case TFileFormatType::FORMAT_CSV_LZOP:
119
957
    case TFileFormatType::FORMAT_CSV_DEFLATE:
120
958
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
121
959
    case TFileFormatType::FORMAT_PROTO:
122
959
        return true;
123
14.7k
    default:
124
14.7k
        return false;
125
15.6k
    }
126
15.6k
}
127
128
14.6k
bool is_text_format(TFileFormatType::type format_type) {
129
14.6k
    return format_type == TFileFormatType::FORMAT_TEXT;
130
14.6k
}
131
132
10.0k
bool is_json_format(TFileFormatType::type format_type) {
133
10.0k
    return format_type == TFileFormatType::FORMAT_JSON;
134
10.0k
}
135
136
9.51k
bool is_native_format(TFileFormatType::type format_type) {
137
9.51k
    return format_type == TFileFormatType::FORMAT_NATIVE;
138
9.51k
}
139
140
208k
bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
141
208k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
142
208k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
143
1.96k
        return false;
144
1.96k
    }
145
206k
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
146
206k
                                      : !slot_info.is_file_slot;
147
208k
}
148
149
198k
bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
150
198k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
151
198k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
152
1.96k
        return false;
153
1.96k
    }
154
    // CSV and other non-self-describing formats need FE slot descriptors for only the columns that
155
    // are physically read from the file. Partition/default/virtual columns stay in TableReader's
156
    // mapping layer and are materialized after the file-local block is read. New FE provides an
157
    // explicit category; old FE falls back to `is_file_slot`.
158
196k
    if (slot_info.__isset.category) {
159
196k
        return slot_info.category == TColumnCategory::REGULAR ||
160
196k
               slot_info.category == TColumnCategory::GENERATED;
161
196k
    }
162
4
    return slot_info.is_file_slot;
163
196k
}
164
165
Status rewrite_slot_refs_to_global_index(
166
        VExprSPtr* expr,
167
70.0k
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
168
70.0k
    DORIS_CHECK(expr != nullptr);
169
70.0k
    if (*expr == nullptr) {
170
0
        return Status::OK();
171
0
    }
172
70.0k
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr->get());
173
70.0k
        runtime_filter != nullptr) {
174
6.61k
        auto impl = runtime_filter->get_impl();
175
6.61k
        DORIS_CHECK(impl != nullptr);
176
6.61k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&impl, slot_id_to_global_index));
177
6.61k
        runtime_filter->set_impl(std::move(impl));
178
6.61k
        return Status::OK();
179
6.61k
    }
180
63.4k
    if ((*expr)->is_slot_ref()) {
181
21.3k
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
182
21.3k
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
183
21.3k
        if (global_index_it == slot_id_to_global_index.end()) {
184
1
            DORIS_CHECK(slot_ref->slot_id() >= 0);
185
1
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
186
1
            *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
187
1
                                            cast_set<int>(global_index.value()), -1,
188
1
                                            slot_ref->data_type(), slot_ref->column_name());
189
1
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
190
1
            return Status::OK();
191
1
        }
192
21.3k
        const auto global_index = global_index_it->second;
193
21.3k
        *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
194
21.3k
                                        cast_set<int>(global_index.value()), -1,
195
21.3k
                                        slot_ref->data_type(), slot_ref->column_name());
196
21.3k
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
197
21.3k
        return Status::OK();
198
21.3k
    }
199
42.0k
    auto children = (*expr)->children();
200
42.7k
    for (auto& child : children) {
201
42.7k
        if (child == nullptr) {
202
0
            continue;
203
0
        }
204
42.7k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
205
42.7k
    }
206
42.0k
    (*expr)->set_children(std::move(children));
207
42.0k
    return Status::OK();
208
42.0k
}
209
210
} // namespace
211
212
#ifdef BE_TEST
213
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
214
                                          format::FileFormat* file_format) {
215
    return _to_file_format(format_type, file_format);
216
}
217
218
bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
219
                                           const std::string& column_name) {
220
    return is_partition_slot(slot_info, column_name);
221
}
222
223
bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
224
                                           const std::string& column_name) {
225
    return is_data_file_slot(slot_info, column_name);
226
}
227
228
Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
229
        VExprSPtr* expr,
230
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
231
    return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
232
}
233
#endif
234
235
53.2k
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
236
53.2k
    const auto format_type = get_range_format_type(params, range);
237
53.2k
    if (format_type == TFileFormatType::FORMAT_PARQUET) {
238
32.4k
        return is_supported_table_format(range);
239
32.4k
    } else if (format_type == TFileFormatType::FORMAT_ARROW) {
240
101
        return is_supported_arrow_table_format(range);
241
20.6k
    } else if (format_type == TFileFormatType::FORMAT_JNI) {
242
5.14k
        return is_supported_jni_table_format(range);
243
15.5k
    } else if (is_csv_format(format_type) || is_text_format(format_type) ||
244
15.5k
               is_json_format(format_type) || is_native_format(format_type)) {
245
6.11k
        return is_supported_table_format(range);
246
9.42k
    } else {
247
9.42k
        LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
248
9.42k
        return false;
249
9.42k
    }
250
53.2k
}
251
252
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
253
                             std::shared_ptr<SplitSourceConnector> split_source,
254
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
255
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
256
40.3k
        : Scanner(state, local_state, limit, profile),
257
40.3k
          _split_source(std::move(split_source)),
258
40.3k
          _kv_cache(kv_cache) {
259
40.3k
    (void)colname_to_slot_id;
260
40.3k
    if (state->get_query_ctx() != nullptr &&
261
40.3k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
262
40.3k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
263
40.3k
    } else {
264
0
        _params = _split_source->get_params();
265
0
    }
266
40.3k
}
267
268
40.2k
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
269
40.2k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
270
40.2k
    _get_block_timer =
271
40.2k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
272
40.2k
    _file_counter =
273
40.2k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
274
40.2k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
275
40.2k
                                                      "FileReadBytes", TUnit::BYTES, 1);
276
40.2k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
277
40.2k
                                                      "FileReadCalls", TUnit::UNIT, 1);
278
40.2k
    _file_read_time_counter =
279
40.2k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
280
40.2k
    _adaptive_batch_predicted_rows_counter = ADD_COUNTER_WITH_LEVEL(
281
40.2k
            _local_state->scanner_profile(), "AdaptiveBatchPredictedRows", TUnit::UNIT, 1);
282
40.2k
    _adaptive_batch_actual_bytes_counter = ADD_COUNTER_WITH_LEVEL(
283
40.2k
            _local_state->scanner_profile(), "AdaptiveBatchActualBytes", TUnit::BYTES, 1);
284
40.2k
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
285
40.2k
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);
286
40.2k
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
287
40.2k
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
288
40.2k
    RETURN_IF_ERROR(_init_io_ctx());
289
40.2k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
290
40.2k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
291
40.2k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
292
40.2k
    return Status::OK();
293
40.2k
}
294
295
40.3k
Status FileScannerV2::_open_impl(RuntimeState* state) {
296
40.3k
    RETURN_IF_CANCELLED(state);
297
40.3k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
298
40.3k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
299
40.3k
    if (_first_scan_range) {
300
32.4k
        RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader));
301
32.4k
        DORIS_CHECK(_table_reader != nullptr);
302
32.4k
        RETURN_IF_ERROR(_init_expr_ctxes());
303
32.4k
        RETURN_IF_ERROR(_init_table_reader(_current_range));
304
32.4k
    }
305
40.3k
    return Status::OK();
306
40.3k
}
307
308
125k
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
309
168k
    while (true) {
310
168k
        RETURN_IF_CANCELLED(state);
311
168k
        if (!_has_prepared_split) {
312
83.7k
            RETURN_IF_ERROR(_prepare_next_split(eof));
313
83.7k
            if (*eof) {
314
39.9k
                return Status::OK();
315
39.9k
            }
316
83.7k
        }
317
318
128k
        {
319
128k
            SCOPED_TIMER(_get_block_timer);
320
128k
            if (_should_run_adaptive_batch_size()) {
321
121k
                _table_reader->set_batch_size(_predict_reader_batch_rows());
322
121k
            }
323
128k
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
324
128k
        }
325
128k
        if (*eof) {
326
43.3k
            _state->update_num_finished_scan_range(1);
327
43.3k
            _has_prepared_split = false;
328
43.3k
            *eof = false;
329
43.3k
            continue;
330
43.3k
        }
331
85.2k
        _update_adaptive_batch_size(*block);
332
85.2k
        return Status::OK();
333
128k
    }
334
125k
}
335
336
83.7k
Status FileScannerV2::_prepare_next_split(bool* eos) {
337
83.7k
    bool has_next = _first_scan_range;
338
83.7k
    if (!_first_scan_range) {
339
51.2k
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
340
51.2k
    }
341
83.7k
    _first_scan_range = false;
342
83.7k
    if (!has_next || _should_stop) {
343
39.9k
        *eos = true;
344
39.9k
        return Status::OK();
345
39.9k
    }
346
43.8k
    DORIS_CHECK(_table_reader != nullptr);
347
43.8k
    _current_range_path = _current_range.path;
348
43.8k
    _init_adaptive_batch_size_state(get_range_format_type(*_params, _current_range));
349
43.8k
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
350
43.8k
    COUNTER_UPDATE(_file_counter, 1);
351
43.8k
    _has_prepared_split = true;
352
43.8k
    *eos = false;
353
43.8k
    return Status::OK();
354
43.8k
}
355
356
32.4k
Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) {
357
32.4k
    const auto format_type = get_range_format_type(*_params, range);
358
32.4k
    format::FileFormat file_format;
359
32.4k
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
360
32.4k
    DORIS_CHECK(_table_reader != nullptr);
361
362
32.4k
    format::TableColumnPredicates table_column_predicates;
363
32.4k
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
364
32.4k
    VExprContextSPtrs table_conjuncts;
365
32.4k
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
366
32.4k
    RETURN_IF_ERROR(_table_reader->init({
367
32.4k
            .projected_columns = _projected_columns,
368
32.4k
            .column_predicates = std::move(table_column_predicates),
369
32.4k
            .conjuncts = std::move(table_conjuncts),
370
32.4k
            .format = file_format,
371
32.4k
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
372
32.4k
            .io_ctx = _io_ctx,
373
32.4k
            .runtime_state = _state,
374
32.4k
            .scanner_profile = _local_state->scanner_profile(),
375
32.4k
            .file_slot_descs = &_file_slot_descs,
376
32.4k
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
377
32.4k
            .condition_cache_digest = _local_state->get_condition_cache_digest(),
378
32.4k
    }));
379
32.4k
    return Status::OK();
380
32.4k
}
381
382
Status FileScannerV2::_create_table_reader_for_format(
383
32.4k
        const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const {
384
32.4k
    DORIS_CHECK(reader != nullptr);
385
32.4k
    const auto table_format = table_format_name(range);
386
32.4k
    if (table_format == "NotSet" || table_format == "tvf") {
387
3.02k
        *reader = std::make_unique<format::TableReader>();
388
29.4k
    } else if (table_format == "hive") {
389
11.3k
        *reader = format::hive::HiveReader::create_unique();
390
18.1k
    } else if (table_format == "iceberg") {
391
12.0k
        if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) {
392
1.21k
            *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>();
393
10.8k
        } else {
394
10.8k
            *reader = std::make_unique<format::iceberg::IcebergTableReader>();
395
10.8k
        }
396
12.0k
    } else if (table_format == "paimon") {
397
4.08k
        *reader = std::make_unique<format::paimon::PaimonHybridReader>();
398
4.08k
    } else if (table_format == "hudi") {
399
0
        *reader = std::make_unique<format::hudi::HudiHybridReader>();
400
1.98k
    } else if (table_format == "jdbc") {
401
1.24k
        *reader = std::make_unique<format::jdbc::JdbcJniReader>();
402
1.24k
    } else if (table_format == "max_compute") {
403
0
        const auto* mc_desc =
404
0
                static_cast<const MaxComputeTableDescriptor*>(_output_tuple_desc->table_desc());
405
0
        RETURN_IF_ERROR(mc_desc->init_status());
406
0
        *reader = std::make_unique<format::max_compute::MaxComputeJniReader>(mc_desc);
407
734
    } else if (table_format == "trino_connector") {
408
636
        *reader = std::make_unique<format::trino_connector::TrinoConnectorJniReader>();
409
636
    } else if (table_format == "remote_doris") {
410
98
        *reader = std::make_unique<format::remote_doris::RemoteDorisReader>();
411
98
    } else {
412
0
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
413
0
    }
414
32.4k
    return Status::OK();
415
32.4k
}
416
417
43.8k
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
418
43.8k
    std::map<std::string, Field> partition_values;
419
43.8k
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
420
43.8k
    format::FileFormat current_split_format;
421
43.8k
    RETURN_IF_ERROR(_to_file_format(get_range_format_type(*_params, range), &current_split_format));
422
43.8k
    RETURN_IF_ERROR(_table_reader->prepare_split({
423
43.8k
            .partition_values = std::move(partition_values),
424
43.8k
            .cache = _kv_cache,
425
43.8k
            .current_range = range,
426
43.8k
            .current_split_format = current_split_format,
427
43.8k
            .global_rowid_context = _create_global_rowid_context(range),
428
43.8k
    }));
429
43.8k
    return Status::OK();
430
43.8k
}
431
432
2.13k
bool FileScannerV2::_should_enable_file_meta_cache() const {
433
2.13k
    return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
434
2.13k
           _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
435
2.13k
}
436
437
std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context(
438
43.8k
        const TFileRangeDesc& range) const {
439
43.8k
    if (!_need_global_rowid_column) {
440
41.6k
        return std::nullopt;
441
41.6k
    }
442
2.12k
    auto& id_file_map = _state->get_id_file_map();
443
2.12k
    DORIS_CHECK(id_file_map != nullptr);
444
2.12k
    const auto file_id = id_file_map->get_file_mapping_id(
445
2.12k
            std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(),
446
2.12k
                                          range, _should_enable_file_meta_cache()));
447
2.12k
    return format::GlobalRowIdContext {
448
2.12k
            .version = IdManager::ID_VERSION,
449
2.12k
            .backend_id = BackendOptions::get_backend_id(),
450
2.12k
            .file_id = file_id,
451
2.12k
    };
452
43.8k
}
453
454
Status FileScannerV2::_generate_partition_values(
455
43.8k
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
456
43.8k
    DORIS_CHECK(partition_values != nullptr);
457
43.8k
    partition_values->clear();
458
43.8k
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
459
33.3k
        return Status::OK();
460
33.3k
    }
461
10.4k
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
462
26.1k
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
463
15.6k
        const auto& key = range.columns_from_path_keys[idx];
464
15.6k
        const auto it = _partition_slot_descs.find(key);
465
15.6k
        if (it == _partition_slot_descs.end()) {
466
4.65k
            continue;
467
4.65k
        }
468
11.0k
        const auto& value = range.columns_from_path[idx];
469
11.0k
        const bool is_null = range.__isset.columns_from_path_is_null &&
470
11.0k
                             idx < range.columns_from_path_is_null.size() &&
471
11.0k
                             range.columns_from_path_is_null[idx];
472
11.0k
        Field field;
473
11.0k
        DORIS_CHECK(it->second.slot_desc != nullptr);
474
11.0k
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
475
11.0k
        partition_values->emplace(it->second.canonical_name, std::move(field));
476
11.0k
    }
477
10.4k
    return Status::OK();
478
10.4k
}
479
480
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
481
                                             const std::string& value, bool is_null,
482
11.0k
                                             Field* field) const {
483
11.0k
    DORIS_CHECK(slot_desc != nullptr);
484
11.0k
    DORIS_CHECK(field != nullptr);
485
11.0k
    if (is_null) {
486
808
        *field = Field::create_field<TYPE_NULL>(Null());
487
808
        return Status::OK();
488
808
    }
489
10.2k
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
490
10.2k
    auto column = data_type->create_column();
491
10.2k
    auto serde = data_type->get_serde();
492
10.2k
    DataTypeSerDe::FormatOptions options;
493
10.2k
    options.converted_from_string = true;
494
10.2k
    StringRef ref(value.data(), value.size());
495
10.2k
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
496
10.2k
    DORIS_CHECK(column->size() == 1);
497
10.2k
    *field = (*column)[0];
498
10.2k
    return Status::OK();
499
10.2k
}
500
501
32.4k
Status FileScannerV2::_init_expr_ctxes() {
502
32.4k
    _slot_id_to_desc.clear();
503
32.4k
    _slot_id_to_global_index.clear();
504
32.4k
    _partition_slot_descs.clear();
505
32.4k
    _file_slot_descs.clear();
506
208k
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
507
208k
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
508
208k
    }
509
32.4k
    DORIS_CHECK(_table_reader != nullptr);
510
32.4k
    RETURN_IF_ERROR(_build_projected_columns(*_table_reader));
511
32.4k
    return Status::OK();
512
32.4k
}
513
514
32.4k
Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) {
515
32.4k
    _projected_columns.clear();
516
32.4k
    _projected_columns.reserve(_params->required_slots.size());
517
32.4k
    _need_global_rowid_column = false;
518
32.4k
    format::ProjectedColumnBuildContext build_context {
519
32.4k
            .scan_params = _params,
520
32.4k
            .range = &_current_range,
521
32.4k
            .runtime_state = _state,
522
32.4k
    };
523
524
241k
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
525
208k
        const auto& slot_info = _params->required_slots[slot_idx];
526
208k
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
527
208k
        if (it == _slot_id_to_desc.end()) {
528
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
529
0
                                         slot_info.slot_id);
530
0
        }
531
208k
        auto column = _build_table_column(it->second);
532
208k
        if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
533
1.87k
            _need_global_rowid_column = true;
534
1.87k
        }
535
208k
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
536
208k
        build_context.schema_column.reset();
537
208k
        RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column));
538
        // Build nested children from access paths generated by the slot's access-path
539
        // expressions. A projected column can therefore contain only a subset of the schema
540
        // column's nested children.
541
208k
        RETURN_IF_ERROR(AccessPathParser::build_nested_children(
542
208k
                &column, it->second,
543
208k
                build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr));
544
208k
        if (is_partition_slot(slot_info, column.name)) {
545
10.5k
            column.is_partition_key = true;
546
10.5k
            _partition_slot_descs.emplace(
547
10.5k
                    column.name,
548
10.5k
                    PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
549
10.5k
            for (const auto& alias : column.name_mapping) {
550
0
                _partition_slot_descs.emplace(
551
0
                        alias,
552
0
                        PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
553
0
            }
554
198k
        } else if (is_data_file_slot(slot_info, column.name)) {
555
196k
            _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second));
556
196k
        }
557
208k
        const auto global_index = format::GlobalIndex(slot_idx);
558
208k
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
559
208k
        _projected_columns.push_back(std::move(column));
560
208k
    }
561
32.4k
    RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context));
562
32.4k
    return Status::OK();
563
32.4k
}
564
565
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
566
208k
                                          VExprContextSPtr* ctx) const {
567
208k
    DORIS_CHECK(ctx != nullptr);
568
208k
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
569
205k
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
570
205k
    }
571
572
3.64k
    if (_params->__isset.default_value_of_src_slot) {
573
3.64k
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
574
3.64k
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
575
0
            return VExpr::create_expr_tree(it->second, *ctx);
576
0
        }
577
3.64k
    }
578
3.62k
    return Status::OK();
579
3.62k
}
580
581
208k
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
582
208k
    DORIS_CHECK(slot_desc != nullptr);
583
208k
    format::ColumnDefinition column;
584
    // TODO(gabriel): why always BY_NAME here?
585
208k
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
586
208k
    column.name = slot_desc->col_name();
587
208k
    column.type = slot_desc->get_data_type_ptr();
588
208k
    return column;
589
208k
}
590
591
Status FileScannerV2::_build_table_column_predicates(
592
32.4k
        format::TableColumnPredicates* predicates) const {
593
32.4k
    DORIS_CHECK(predicates != nullptr);
594
32.4k
    predicates->clear();
595
32.4k
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
596
208k
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
597
208k
        const auto it = _slot_id_to_desc.find(slot_id);
598
208k
        if (it == _slot_id_to_desc.end()) {
599
0
            continue;
600
0
        }
601
208k
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
602
208k
        if (global_index_it == _slot_id_to_global_index.end()) {
603
0
            continue;
604
0
        }
605
208k
        (*predicates)[global_index_it->second] = slot_predicate_list;
606
208k
    }
607
32.4k
    return Status::OK();
608
32.4k
}
609
610
32.4k
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
611
32.4k
    DORIS_CHECK(conjuncts != nullptr);
612
32.4k
    conjuncts->clear();
613
32.4k
    conjuncts->reserve(_conjuncts.size());
614
32.4k
    for (const auto& conjunct : _conjuncts) {
615
20.7k
        VExprSPtr root;
616
20.7k
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
617
20.7k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
618
20.7k
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
619
20.7k
    }
620
32.4k
    return Status::OK();
621
32.4k
}
622
623
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
624
0
    return get_range_format_type(*_params, _current_range);
625
0
}
626
627
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
628
76.3k
                                      format::FileFormat* file_format) {
629
76.3k
    DORIS_CHECK(file_format != nullptr);
630
76.3k
    switch (format_type) {
631
56.2k
    case TFileFormatType::FORMAT_PARQUET:
632
56.2k
        *file_format = format::FileFormat::PARQUET;
633
56.2k
        return Status::OK();
634
9.93k
    case TFileFormatType::FORMAT_JNI:
635
9.93k
        *file_format = format::FileFormat::JNI;
636
9.93k
        return Status::OK();
637
1.79k
    case TFileFormatType::FORMAT_CSV_PLAIN:
638
1.79k
    case TFileFormatType::FORMAT_CSV_GZ:
639
1.79k
    case TFileFormatType::FORMAT_CSV_BZ2:
640
1.79k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
641
1.79k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
642
1.79k
    case TFileFormatType::FORMAT_CSV_LZOP:
643
1.79k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
644
1.79k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
645
1.80k
    case TFileFormatType::FORMAT_PROTO:
646
1.80k
        *file_format = format::FileFormat::CSV;
647
1.80k
        return Status::OK();
648
7.00k
    case TFileFormatType::FORMAT_TEXT:
649
7.00k
        *file_format = format::FileFormat::TEXT;
650
7.00k
        return Status::OK();
651
1.13k
    case TFileFormatType::FORMAT_JSON:
652
1.13k
        *file_format = format::FileFormat::JSON;
653
1.13k
        return Status::OK();
654
5
    case TFileFormatType::FORMAT_NATIVE:
655
5
        *file_format = format::FileFormat::NATIVE;
656
5
        return Status::OK();
657
197
    case TFileFormatType::FORMAT_ARROW:
658
197
        *file_format = format::FileFormat::ARROW;
659
197
        return Status::OK();
660
1
    default:
661
1
        return Status::NotSupported("FileScannerV2 does not support file format {}",
662
1
                                    to_string(format_type));
663
76.3k
    }
664
76.3k
}
665
666
40.3k
Status FileScannerV2::_init_io_ctx() {
667
40.3k
    _io_ctx = std::make_shared<io::IOContext>();
668
40.3k
    _io_ctx->query_id = &_state->query_id();
669
40.3k
    return Status::OK();
670
40.3k
}
671
672
43.8k
void FileScannerV2::_reset_adaptive_batch_size_state() {
673
43.8k
    _block_size_predictor.reset();
674
43.8k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
675
43.8k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, int64_t(0));
676
43.8k
}
677
678
43.8k
void FileScannerV2::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
679
43.8k
    _reset_adaptive_batch_size_state();
680
43.8k
    if (!_should_enable_adaptive_batch_size(format_type)) {
681
100
        return;
682
100
    }
683
684
    // V2 native file readers do not have reliable row-width hints before the first batch. Start
685
    // every split with a small probe, then learn bytes-per-row from the materialized table block
686
    // and keep later batches close to RuntimeState::preferred_block_size_bytes().
687
43.7k
    _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
688
43.7k
            _state->preferred_block_size_bytes(), 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS,
689
43.7k
            _state->batch_size());
690
43.7k
}
691
692
43.8k
bool FileScannerV2::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
693
43.8k
    if (!config::enable_adaptive_batch_size) {
694
0
        return false;
695
0
    }
696
43.8k
    switch (format_type) {
697
32.4k
    case TFileFormatType::FORMAT_PARQUET:
698
32.4k
    case TFileFormatType::FORMAT_ORC:
699
33.4k
    case TFileFormatType::FORMAT_CSV_PLAIN:
700
33.4k
    case TFileFormatType::FORMAT_CSV_GZ:
701
33.4k
    case TFileFormatType::FORMAT_CSV_BZ2:
702
33.4k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
703
33.4k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
704
33.4k
    case TFileFormatType::FORMAT_CSV_LZOP:
705
33.4k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
706
33.4k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
707
33.4k
    case TFileFormatType::FORMAT_PROTO:
708
38.0k
    case TFileFormatType::FORMAT_TEXT:
709
38.6k
    case TFileFormatType::FORMAT_JSON:
710
43.7k
    case TFileFormatType::FORMAT_JNI:
711
43.7k
        return true;
712
100
    default:
713
100
        return false;
714
43.8k
    }
715
43.8k
}
716
717
213k
bool FileScannerV2::_should_run_adaptive_batch_size() const {
718
    // COUNT pushdown emits synthetic rows from file metadata and does not materialize file columns,
719
    // so there is no useful row-width sample to learn from.
720
213k
    return _block_size_predictor != nullptr &&
721
213k
           _local_state->get_push_down_agg_type() != TPushAggOp::type::COUNT;
722
213k
}
723
724
121k
size_t FileScannerV2::_predict_reader_batch_rows() {
725
121k
    DORIS_CHECK(_block_size_predictor != nullptr);
726
    // Before history exists this returns the probe row count; after update(), it returns roughly
727
    // preferred_block_size_bytes / EWMA(bytes_per_row), capped by RuntimeState::batch_size().
728
121k
    const size_t predicted_rows = _block_size_predictor->predict_next_rows();
729
121k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
730
121k
    return predicted_rows;
731
121k
}
732
733
85.2k
void FileScannerV2::_update_adaptive_batch_size(const Block& block) {
734
85.2k
    if (!_should_run_adaptive_batch_size()) {
735
5.29k
        return;
736
5.29k
    }
737
79.9k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, static_cast<int64_t>(block.bytes()));
738
79.9k
    if (block.rows() == 0) {
739
0
        return;
740
0
    }
741
    // The sample is taken after TableReader has finalized file-local columns to table columns.
742
    // This matches the memory shape seen by upstream operators and catches very wide nested
743
    // columns, such as map/string payloads, after the first probe batch.
744
79.9k
    if (!_block_size_predictor->has_history()) {
745
35.6k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
746
35.6k
    }
747
79.9k
    _block_size_predictor->update(block);
748
79.9k
}
749
750
40.4k
Status FileScannerV2::close(RuntimeState* state) {
751
40.4k
    if (!_try_close()) {
752
0
        return Status::OK();
753
0
    }
754
40.4k
    if (_table_reader != nullptr) {
755
32.4k
        RETURN_IF_ERROR(_table_reader->close());
756
32.4k
        _report_condition_cache_profile();
757
32.4k
        _table_reader.reset();
758
32.4k
    }
759
40.4k
    return Scanner::close(state);
760
40.4k
}
761
762
40.4k
void FileScannerV2::try_stop() {
763
40.4k
    Scanner::try_stop();
764
40.4k
    if (_io_ctx) {
765
40.4k
        _io_ctx->should_stop = true;
766
40.4k
    }
767
40.4k
}
768
769
83.9k
void FileScannerV2::update_realtime_counters() {
770
83.9k
    if (_file_reader_stats == nullptr) {
771
0
        return;
772
0
    }
773
83.9k
    const int64_t bytes_read = _file_reader_stats->read_bytes;
774
83.9k
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
775
83.9k
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
776
83.9k
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
777
83.9k
}
778
779
40.4k
void FileScannerV2::_collect_profile_before_close() {
780
40.4k
    _report_file_reader_predicate_filtered_rows();
781
40.4k
    Scanner::_collect_profile_before_close();
782
40.4k
    if (_file_reader_stats != nullptr) {
783
40.4k
        COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes));
784
40.4k
        COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
785
40.4k
        COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
786
40.4k
    }
787
    // Query profiles can be collected before Scanner::close() runs. Publish condition-cache
788
    // counters here as well, using deltas so this method and close() cannot double count.
789
40.4k
    _report_condition_cache_profile();
790
40.4k
}
791
792
40.4k
bool FileScannerV2::_should_update_load_counters() const {
793
40.4k
    if (_is_load) {
794
0
        return true;
795
0
    }
796
    // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
797
    // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
798
    // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
799
    // is only reachable from such load entries, never from normal queries, so use it to
800
    // identify these scanners.
801
40.4k
    return (_params != nullptr && _params->__isset.file_type &&
802
40.4k
            _params->file_type == TFileType::FILE_STREAM) ||
803
40.4k
           (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
804
40.4k
}
805
806
40.4k
void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
807
18.4E
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
808
40.4k
    const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
809
40.4k
    if (filtered_delta > 0) {
810
        // File readers can evaluate localized conjuncts before a block reaches Scanner. Count
811
        // those rows as scanner-level unselected rows so load statistics stay identical no matter
812
        // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block().
813
2.81k
        _counter.num_rows_unselected += filtered_delta;
814
2.81k
        _reported_predicate_filtered_rows = filtered_rows;
815
2.81k
    }
816
40.4k
}
817
818
72.8k
void FileScannerV2::_report_condition_cache_profile() {
819
72.8k
    auto* local_state = static_cast<FileScanLocalState*>(_local_state);
820
72.8k
    const int64_t hit_count =
821
72.8k
            _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0;
822
72.8k
    const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count;
823
72.8k
    if (hit_delta > 0) {
824
774
        COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta);
825
774
        _reported_condition_cache_hit_count = hit_count;
826
774
    }
827
72.8k
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0;
828
72.8k
    const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows;
829
72.8k
    if (filtered_delta > 0) {
830
120
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta);
831
120
        _reported_condition_cache_filtered_rows = filtered_rows;
832
120
    }
833
72.8k
}
834
835
} // namespace doris