Coverage Report

Created: 2026-07-02 10:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/scan/file_scanner_v2.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "exec/scan/file_scanner_v2.h"
19
20
#include <gen_cpp/Exprs_types.h>
21
#include <gen_cpp/PlanNodes_types.h>
22
23
#include <algorithm>
24
#include <map>
25
#include <memory>
26
#include <optional>
27
#include <string>
28
#include <utility>
29
30
#include "common/cast_set.h"
31
#include "common/config.h"
32
#include "common/consts.h"
33
#include "common/status.h"
34
#include "core/assert_cast.h"
35
#include "core/block/column_with_type_and_name.h"
36
#include "core/column/column.h"
37
#include "core/data_type/data_type.h"
38
#include "core/data_type/data_type_nullable.h"
39
#include "core/data_type_serde/data_type_serde.h"
40
#include "core/string_ref.h"
41
#include "exec/common/util.hpp"
42
#include "exec/operator/scan_operator.h"
43
#include "exec/scan/access_path_parser.h"
44
#include "exprs/runtime_filter_expr.h"
45
#include "exprs/vexpr.h"
46
#include "exprs/vexpr_context.h"
47
#include "exprs/vslot_ref.h"
48
#include "format/format_common.h"
49
#include "format_v2/column_mapper.h"
50
#include "format_v2/jni/iceberg_sys_table_reader.h"
51
#include "format_v2/jni/jdbc_reader.h"
52
#include "format_v2/jni/max_compute_jni_reader.h"
53
#include "format_v2/jni/trino_connector_jni_reader.h"
54
#include "format_v2/table/hive_reader.h"
55
#include "format_v2/table/hudi_reader.h"
56
#include "format_v2/table/iceberg_reader.h"
57
#include "format_v2/table/paimon_reader.h"
58
#include "format_v2/table/remote_doris_reader.h"
59
#include "format_v2/table_reader.h"
60
#include "io/fs/file_meta_cache.h"
61
#include "io/io_common.h"
62
#include "runtime/descriptors.h"
63
#include "runtime/exec_env.h"
64
#include "runtime/runtime_state.h"
65
#include "service/backend_options.h"
66
#include "storage/id_manager.h"
67
68
namespace doris {
69
namespace {
70
71
38.6k
std::string table_format_name(const TFileRangeDesc& range) {
72
38.6k
    return range.__isset.table_format_params ? range.table_format_params.table_format_type
73
38.6k
                                             : "NotSet";
74
38.6k
}
75
76
TFileFormatType::type get_range_format_type(const TFileScanRangeParams& params,
77
93.6k
                                            const TFileRangeDesc& range) {
78
93.6k
    return range.__isset.format_type ? range.format_type : params.format_type;
79
93.6k
}
80
81
19.5k
bool is_supported_table_format(const TFileRangeDesc& range) {
82
19.5k
    const auto table_format = table_format_name(range);
83
19.5k
    if (table_format == "hudi" && range.__isset.table_format_params &&
84
19.5k
        range.table_format_params.__isset.hudi_params &&
85
19.5k
        range.table_format_params.hudi_params.__isset.delta_logs &&
86
19.5k
        !range.table_format_params.hudi_params.delta_logs.empty()) {
87
        // Hudi MOR splits need log-file merge semantics and must stay on the existing JNI path.
88
        // FileScannerV2 currently supports native Parquet data files only.
89
1
        return false;
90
1
    }
91
19.5k
    return table_format == "NotSet" || table_format == "tvf" || table_format == "hive" ||
92
19.5k
           table_format == "iceberg" || table_format == "paimon" || table_format == "hudi";
93
19.5k
}
94
95
52
bool is_supported_arrow_table_format(const TFileRangeDesc& range) {
96
52
    return table_format_name(range) == "remote_doris";
97
52
}
98
99
2.57k
bool is_supported_jni_table_format(const TFileRangeDesc& range) {
100
2.57k
    const auto table_format = table_format_name(range);
101
2.57k
    if (table_format == "paimon") {
102
929
        return range.__isset.table_format_params &&
103
929
               range.table_format_params.__isset.paimon_params &&
104
930
               range.table_format_params.paimon_params.__isset.reader_type &&
105
929
               range.table_format_params.paimon_params.reader_type == TPaimonReaderType::PAIMON_JNI;
106
929
    }
107
1.64k
    return table_format == "jdbc" || table_format == "iceberg" || table_format == "hudi" ||
108
1.64k
           table_format == "max_compute" || table_format == "trino_connector";
109
2.57k
}
110
111
8.05k
bool is_csv_format(TFileFormatType::type format_type) {
112
8.05k
    switch (format_type) {
113
646
    case TFileFormatType::FORMAT_CSV_PLAIN:
114
647
    case TFileFormatType::FORMAT_CSV_GZ:
115
648
    case TFileFormatType::FORMAT_CSV_BZ2:
116
649
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
117
650
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
118
651
    case TFileFormatType::FORMAT_CSV_LZOP:
119
652
    case TFileFormatType::FORMAT_CSV_DEFLATE:
120
653
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
121
654
    case TFileFormatType::FORMAT_PROTO:
122
654
        return true;
123
7.40k
    default:
124
7.40k
        return false;
125
8.05k
    }
126
8.05k
}
127
128
7.39k
bool is_text_format(TFileFormatType::type format_type) {
129
7.39k
    return format_type == TFileFormatType::FORMAT_TEXT;
130
7.39k
}
131
132
5.12k
bool is_json_format(TFileFormatType::type format_type) {
133
5.12k
    return format_type == TFileFormatType::FORMAT_JSON;
134
5.12k
}
135
136
4.82k
bool is_native_format(TFileFormatType::type format_type) {
137
4.82k
    return format_type == TFileFormatType::FORMAT_NATIVE;
138
4.82k
}
139
140
105k
bool is_partition_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
141
105k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
142
105k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
143
990
        return false;
144
990
    }
145
104k
    return slot_info.__isset.category ? slot_info.category == TColumnCategory::PARTITION_KEY
146
104k
                                      : !slot_info.is_file_slot;
147
105k
}
148
149
100k
bool is_data_file_slot(const TFileScanSlotInfo& slot_info, const std::string& column_name) {
150
100k
    if (column_name.starts_with(BeConsts::GLOBAL_ROWID_COL) ||
151
100k
        column_name == BeConsts::ICEBERG_ROWID_COL) {
152
990
        return false;
153
990
    }
154
    // CSV and other non-self-describing formats need FE slot descriptors for only the columns that
155
    // are physically read from the file. Partition/default/virtual columns stay in TableReader's
156
    // mapping layer and are materialized after the file-local block is read. New FE provides an
157
    // explicit category; old FE falls back to `is_file_slot`.
158
99.4k
    if (slot_info.__isset.category) {
159
99.4k
        return slot_info.category == TColumnCategory::REGULAR ||
160
99.4k
               slot_info.category == TColumnCategory::GENERATED;
161
99.4k
    }
162
3
    return slot_info.is_file_slot;
163
99.4k
}
164
165
Status rewrite_slot_refs_to_global_index(
166
        VExprSPtr* expr,
167
34.7k
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
168
34.7k
    DORIS_CHECK(expr != nullptr);
169
34.7k
    if (*expr == nullptr) {
170
0
        return Status::OK();
171
0
    }
172
34.7k
    if (auto* runtime_filter = dynamic_cast<RuntimeFilterExpr*>(expr->get());
173
34.7k
        runtime_filter != nullptr) {
174
3.25k
        auto impl = runtime_filter->get_impl();
175
3.25k
        DORIS_CHECK(impl != nullptr);
176
3.25k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&impl, slot_id_to_global_index));
177
3.25k
        runtime_filter->set_impl(std::move(impl));
178
3.25k
        return Status::OK();
179
3.25k
    }
180
31.5k
    if ((*expr)->is_slot_ref()) {
181
10.6k
        const auto* slot_ref = assert_cast<const VSlotRef*>(expr->get());
182
10.6k
        const auto global_index_it = slot_id_to_global_index.find(slot_ref->slot_id());
183
10.6k
        if (global_index_it == slot_id_to_global_index.end()) {
184
1
            DORIS_CHECK(slot_ref->slot_id() >= 0);
185
1
            const auto global_index = format::GlobalIndex(cast_set<size_t>(slot_ref->slot_id()));
186
1
            *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
187
1
                                            cast_set<int>(global_index.value()), -1,
188
1
                                            slot_ref->data_type(), slot_ref->column_name());
189
1
            RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
190
1
            return Status::OK();
191
1
        }
192
10.6k
        const auto global_index = global_index_it->second;
193
10.6k
        *expr = VSlotRef::create_shared(cast_set<int>(global_index.value()),
194
10.6k
                                        cast_set<int>(global_index.value()), -1,
195
10.6k
                                        slot_ref->data_type(), slot_ref->column_name());
196
10.6k
        RETURN_IF_ERROR(expr->get()->prepare(nullptr, RowDescriptor(), nullptr));
197
10.6k
        return Status::OK();
198
10.6k
    }
199
20.9k
    auto children = (*expr)->children();
200
21.2k
    for (auto& child : children) {
201
21.2k
        if (child == nullptr) {
202
0
            continue;
203
0
        }
204
21.2k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&child, slot_id_to_global_index));
205
21.2k
    }
206
20.9k
    (*expr)->set_children(std::move(children));
207
20.9k
    return Status::OK();
208
20.9k
}
209
210
} // namespace
211
212
#ifdef BE_TEST
213
Status FileScannerV2::TEST_to_file_format(TFileFormatType::type format_type,
214
                                          format::FileFormat* file_format) {
215
    return _to_file_format(format_type, file_format);
216
}
217
218
bool FileScannerV2::TEST_is_partition_slot(const TFileScanSlotInfo& slot_info,
219
                                           const std::string& column_name) {
220
    return is_partition_slot(slot_info, column_name);
221
}
222
223
bool FileScannerV2::TEST_is_data_file_slot(const TFileScanSlotInfo& slot_info,
224
                                           const std::string& column_name) {
225
    return is_data_file_slot(slot_info, column_name);
226
}
227
228
Status FileScannerV2::TEST_rewrite_slot_refs_to_global_index(
229
        VExprSPtr* expr,
230
        const std::unordered_map<int32_t, format::GlobalIndex>& slot_id_to_global_index) {
231
    return rewrite_slot_refs_to_global_index(expr, slot_id_to_global_index);
232
}
233
#endif
234
235
26.9k
bool FileScannerV2::is_supported(const TFileScanRangeParams& params, const TFileRangeDesc& range) {
236
26.9k
    const auto format_type = get_range_format_type(params, range);
237
26.9k
    if (format_type == TFileFormatType::FORMAT_PARQUET) {
238
16.3k
        return is_supported_table_format(range);
239
16.3k
    } else if (format_type == TFileFormatType::FORMAT_ARROW) {
240
52
        return is_supported_arrow_table_format(range);
241
10.5k
    } else if (format_type == TFileFormatType::FORMAT_JNI) {
242
2.57k
        return is_supported_jni_table_format(range);
243
7.99k
    } else if (is_csv_format(format_type) || is_text_format(format_type) ||
244
7.99k
               is_json_format(format_type) || is_native_format(format_type)) {
245
3.22k
        return is_supported_table_format(range);
246
4.76k
    } else {
247
4.76k
        LOG(WARNING) << "Unsupported file format type " << format_type << " for file scanner v2";
248
4.76k
        return false;
249
4.76k
    }
250
26.9k
}
251
252
FileScannerV2::FileScannerV2(RuntimeState* state, FileScanLocalState* local_state, int64_t limit,
253
                             std::shared_ptr<SplitSourceConnector> split_source,
254
                             RuntimeProfile* profile, ShardedKVCache* kv_cache,
255
                             const std::unordered_map<std::string, int>* colname_to_slot_id)
256
20.3k
        : Scanner(state, local_state, limit, profile),
257
20.3k
          _split_source(std::move(split_source)),
258
20.3k
          _kv_cache(kv_cache) {
259
20.3k
    (void)colname_to_slot_id;
260
20.3k
    if (state->get_query_ctx() != nullptr &&
261
20.3k
        state->get_query_ctx()->file_scan_range_params_map.count(local_state->parent_id()) > 0) {
262
20.3k
        _params = &(state->get_query_ctx()->file_scan_range_params_map[local_state->parent_id()]);
263
20.3k
    } else {
264
4
        _params = _split_source->get_params();
265
4
    }
266
20.3k
}
267
268
20.3k
Status FileScannerV2::init(RuntimeState* state, const VExprContextSPtrs& conjuncts) {
269
20.3k
    RETURN_IF_ERROR(Scanner::init(state, conjuncts));
270
20.3k
    _get_block_timer =
271
20.3k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileScannerV2GetBlockTime", 1);
272
20.3k
    _file_counter =
273
20.3k
            ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(), "FileNumber", TUnit::UNIT, 1);
274
20.3k
    _file_read_bytes_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
275
20.3k
                                                      "FileReadBytes", TUnit::BYTES, 1);
276
20.3k
    _file_read_calls_counter = ADD_COUNTER_WITH_LEVEL(_local_state->scanner_profile(),
277
20.3k
                                                      "FileReadCalls", TUnit::UNIT, 1);
278
20.3k
    _file_read_time_counter =
279
20.3k
            ADD_TIMER_WITH_LEVEL(_local_state->scanner_profile(), "FileReadTime", 1);
280
20.3k
    _adaptive_batch_predicted_rows_counter = ADD_COUNTER_WITH_LEVEL(
281
20.3k
            _local_state->scanner_profile(), "AdaptiveBatchPredictedRows", TUnit::UNIT, 1);
282
20.3k
    _adaptive_batch_actual_bytes_counter = ADD_COUNTER_WITH_LEVEL(
283
20.3k
            _local_state->scanner_profile(), "AdaptiveBatchActualBytes", TUnit::BYTES, 1);
284
20.3k
    _adaptive_batch_probe_count_counter = ADD_COUNTER_WITH_LEVEL(
285
20.3k
            _local_state->scanner_profile(), "AdaptiveBatchProbeCount", TUnit::UNIT, 1);
286
20.3k
    _file_cache_statistics = std::make_unique<io::FileCacheStatistics>();
287
20.3k
    _file_reader_stats = std::make_unique<io::FileReaderStats>();
288
20.3k
    RETURN_IF_ERROR(_init_io_ctx());
289
20.3k
    _io_ctx->file_cache_stats = _file_cache_statistics.get();
290
20.3k
    _io_ctx->file_reader_stats = _file_reader_stats.get();
291
20.3k
    _io_ctx->is_disposable = _state->query_options().disable_file_cache;
292
20.3k
    return Status::OK();
293
20.3k
}
294
295
20.3k
Status FileScannerV2::_open_impl(RuntimeState* state) {
296
20.3k
    RETURN_IF_CANCELLED(state);
297
20.3k
    RETURN_IF_ERROR(Scanner::_open_impl(state));
298
20.3k
    RETURN_IF_ERROR(_split_source->get_next(&_first_scan_range, &_current_range));
299
20.3k
    if (_first_scan_range) {
300
16.4k
        RETURN_IF_ERROR(_create_table_reader_for_format(_current_range, &_table_reader));
301
16.4k
        DORIS_CHECK(_table_reader != nullptr);
302
16.4k
        RETURN_IF_ERROR(_init_expr_ctxes());
303
16.4k
        RETURN_IF_ERROR(_init_table_reader(_current_range));
304
16.4k
    }
305
20.3k
    return Status::OK();
306
20.3k
}
307
308
63.1k
Status FileScannerV2::_get_block_impl(RuntimeState* state, Block* block, bool* eof) {
309
85.0k
    while (true) {
310
85.0k
        RETURN_IF_CANCELLED(state);
311
85.0k
        if (!_has_prepared_split) {
312
42.2k
            RETURN_IF_ERROR(_prepare_next_split(eof));
313
42.2k
            if (*eof) {
314
20.1k
                return Status::OK();
315
20.1k
            }
316
42.2k
        }
317
318
64.8k
        {
319
64.8k
            SCOPED_TIMER(_get_block_timer);
320
64.8k
            if (_should_run_adaptive_batch_size()) {
321
61.0k
                _table_reader->set_batch_size(_predict_reader_batch_rows());
322
61.0k
            }
323
64.8k
            RETURN_IF_ERROR(_table_reader->get_block(block, eof));
324
64.8k
        }
325
64.8k
        if (*eof) {
326
21.8k
            _state->update_num_finished_scan_range(1);
327
21.8k
            _has_prepared_split = false;
328
21.8k
            *eof = false;
329
21.8k
            continue;
330
21.8k
        }
331
42.9k
        _update_adaptive_batch_size(*block);
332
42.9k
        return Status::OK();
333
64.8k
    }
334
63.1k
}
335
336
42.2k
Status FileScannerV2::_prepare_next_split(bool* eos) {
337
42.2k
    bool has_next = _first_scan_range;
338
42.2k
    if (!_first_scan_range) {
339
25.8k
        RETURN_IF_ERROR(_split_source->get_next(&has_next, &_current_range));
340
25.8k
    }
341
42.2k
    _first_scan_range = false;
342
42.2k
    if (!has_next || _should_stop) {
343
20.1k
        *eos = true;
344
20.1k
        return Status::OK();
345
20.1k
    }
346
22.1k
    DORIS_CHECK(_table_reader != nullptr);
347
22.1k
    _current_range_path = _current_range.path;
348
22.1k
    _init_adaptive_batch_size_state(get_range_format_type(*_params, _current_range));
349
22.1k
    RETURN_IF_ERROR(_prepare_table_reader_split(_current_range));
350
22.1k
    COUNTER_UPDATE(_file_counter, 1);
351
22.1k
    _has_prepared_split = true;
352
22.1k
    *eos = false;
353
22.1k
    return Status::OK();
354
22.1k
}
355
356
16.4k
Status FileScannerV2::_init_table_reader(const TFileRangeDesc& range) {
357
16.4k
    const auto format_type = get_range_format_type(*_params, range);
358
16.4k
    format::FileFormat file_format;
359
16.4k
    RETURN_IF_ERROR(_to_file_format(format_type, &file_format));
360
16.4k
    DORIS_CHECK(_table_reader != nullptr);
361
362
16.4k
    format::TableColumnPredicates table_column_predicates;
363
16.4k
    RETURN_IF_ERROR(_build_table_column_predicates(&table_column_predicates));
364
16.4k
    VExprContextSPtrs table_conjuncts;
365
16.4k
    RETURN_IF_ERROR(_build_table_conjuncts(&table_conjuncts));
366
16.4k
    RETURN_IF_ERROR(_table_reader->init({
367
16.4k
            .projected_columns = _projected_columns,
368
16.4k
            .column_predicates = std::move(table_column_predicates),
369
16.4k
            .conjuncts = std::move(table_conjuncts),
370
16.4k
            .format = file_format,
371
16.4k
            .scan_params = const_cast<TFileScanRangeParams*>(_params),
372
16.4k
            .io_ctx = _io_ctx,
373
16.4k
            .runtime_state = _state,
374
16.4k
            .scanner_profile = _local_state->scanner_profile(),
375
16.4k
            .file_slot_descs = &_file_slot_descs,
376
16.4k
            .push_down_agg_type = _local_state->get_push_down_agg_type(),
377
16.4k
            .condition_cache_digest = _local_state->get_condition_cache_digest(),
378
16.4k
    }));
379
16.4k
    return Status::OK();
380
16.4k
}
381
382
Status FileScannerV2::_create_table_reader_for_format(
383
16.4k
        const TFileRangeDesc& range, std::unique_ptr<format::TableReader>* reader) const {
384
16.4k
    DORIS_CHECK(reader != nullptr);
385
16.4k
    const auto table_format = table_format_name(range);
386
16.4k
    if (table_format == "NotSet" || table_format == "tvf") {
387
1.75k
        *reader = std::make_unique<format::TableReader>();
388
14.6k
    } else if (table_format == "hive") {
389
5.64k
        *reader = format::hive::HiveReader::create_unique();
390
9.05k
    } else if (table_format == "iceberg") {
391
6.02k
        if (get_range_format_type(*_params, range) == TFileFormatType::FORMAT_JNI) {
392
608
            *reader = std::make_unique<format::iceberg::IcebergSysTableJniReader>();
393
5.42k
        } else {
394
5.42k
            *reader = std::make_unique<format::iceberg::IcebergTableReader>();
395
5.42k
        }
396
6.02k
    } else if (table_format == "paimon") {
397
2.03k
        *reader = std::make_unique<format::paimon::PaimonHybridReader>();
398
2.03k
    } else if (table_format == "hudi") {
399
0
        *reader = std::make_unique<format::hudi::HudiHybridReader>();
400
990
    } else if (table_format == "jdbc") {
401
625
        *reader = std::make_unique<format::jdbc::JdbcJniReader>();
402
625
    } else if (table_format == "max_compute") {
403
0
        const auto* mc_desc =
404
0
                static_cast<const MaxComputeTableDescriptor*>(_output_tuple_desc->table_desc());
405
0
        RETURN_IF_ERROR(mc_desc->init_status());
406
0
        *reader = std::make_unique<format::max_compute::MaxComputeJniReader>(mc_desc);
407
365
    } else if (table_format == "trino_connector") {
408
318
        *reader = std::make_unique<format::trino_connector::TrinoConnectorJniReader>();
409
318
    } else if (table_format == "remote_doris") {
410
49
        *reader = std::make_unique<format::remote_doris::RemoteDorisReader>();
411
18.4E
    } else {
412
18.4E
        return Status::NotSupported("FileScannerV2 does not support table format {}", table_format);
413
18.4E
    }
414
16.4k
    return Status::OK();
415
16.4k
}
416
417
22.1k
Status FileScannerV2::_prepare_table_reader_split(const TFileRangeDesc& range) {
418
22.1k
    std::map<std::string, Field> partition_values;
419
22.1k
    RETURN_IF_ERROR(_generate_partition_values(range, &partition_values));
420
22.1k
    format::FileFormat current_split_format;
421
22.1k
    RETURN_IF_ERROR(_to_file_format(get_range_format_type(*_params, range), &current_split_format));
422
22.1k
    RETURN_IF_ERROR(_table_reader->prepare_split({
423
22.1k
            .partition_values = std::move(partition_values),
424
22.1k
            .cache = _kv_cache,
425
22.1k
            .current_range = range,
426
22.1k
            .current_split_format = current_split_format,
427
22.1k
            .global_rowid_context = _create_global_rowid_context(range),
428
22.1k
    }));
429
22.1k
    return Status::OK();
430
22.1k
}
431
432
1.07k
bool FileScannerV2::_should_enable_file_meta_cache() const {
433
1.07k
    return ExecEnv::GetInstance()->file_meta_cache()->enabled() &&
434
1.07k
           _split_source->num_scan_ranges() < config::max_external_file_meta_cache_num / 3;
435
1.07k
}
436
437
std::optional<format::GlobalRowIdContext> FileScannerV2::_create_global_rowid_context(
438
22.1k
        const TFileRangeDesc& range) const {
439
22.1k
    if (!_need_global_rowid_column) {
440
21.0k
        return std::nullopt;
441
21.0k
    }
442
1.06k
    auto& id_file_map = _state->get_id_file_map();
443
1.06k
    DORIS_CHECK(id_file_map != nullptr);
444
1.06k
    const auto file_id = id_file_map->get_file_mapping_id(
445
1.06k
            std::make_shared<FileMapping>(_local_state->cast<FileScanLocalState>().parent_id(),
446
1.06k
                                          range, _should_enable_file_meta_cache()));
447
1.06k
    return format::GlobalRowIdContext {
448
1.06k
            .version = IdManager::ID_VERSION,
449
1.06k
            .backend_id = BackendOptions::get_backend_id(),
450
1.06k
            .file_id = file_id,
451
1.06k
    };
452
22.1k
}
453
454
Status FileScannerV2::_generate_partition_values(
455
22.1k
        const TFileRangeDesc& range, std::map<std::string, Field>* partition_values) const {
456
22.1k
    DORIS_CHECK(partition_values != nullptr);
457
22.1k
    partition_values->clear();
458
22.1k
    if (!range.__isset.columns_from_path_keys || !range.__isset.columns_from_path) {
459
16.8k
        return Status::OK();
460
16.8k
    }
461
5.23k
    DORIS_CHECK(range.columns_from_path_keys.size() == range.columns_from_path.size());
462
13.0k
    for (size_t idx = 0; idx < range.columns_from_path_keys.size(); ++idx) {
463
7.82k
        const auto& key = range.columns_from_path_keys[idx];
464
7.82k
        const auto it = _partition_slot_descs.find(key);
465
7.82k
        if (it == _partition_slot_descs.end()) {
466
2.32k
            continue;
467
2.32k
        }
468
5.49k
        const auto& value = range.columns_from_path[idx];
469
5.49k
        const bool is_null = range.__isset.columns_from_path_is_null &&
470
5.50k
                             idx < range.columns_from_path_is_null.size() &&
471
5.50k
                             range.columns_from_path_is_null[idx];
472
5.49k
        Field field;
473
5.49k
        DORIS_CHECK(it->second.slot_desc != nullptr);
474
5.49k
        RETURN_IF_ERROR(_parse_partition_value(it->second.slot_desc, value, is_null, &field));
475
5.49k
        partition_values->emplace(it->second.canonical_name, std::move(field));
476
5.49k
    }
477
5.23k
    return Status::OK();
478
5.23k
}
479
480
Status FileScannerV2::_parse_partition_value(const SlotDescriptor* slot_desc,
481
                                             const std::string& value, bool is_null,
482
5.50k
                                             Field* field) const {
483
5.50k
    DORIS_CHECK(slot_desc != nullptr);
484
5.50k
    DORIS_CHECK(field != nullptr);
485
5.50k
    if (is_null) {
486
404
        *field = Field::create_field<TYPE_NULL>(Null());
487
404
        return Status::OK();
488
404
    }
489
5.09k
    const auto data_type = remove_nullable(slot_desc->get_data_type_ptr());
490
5.09k
    auto column = data_type->create_column();
491
5.09k
    auto serde = data_type->get_serde();
492
5.09k
    DataTypeSerDe::FormatOptions options;
493
5.09k
    options.converted_from_string = true;
494
5.09k
    StringRef ref(value.data(), value.size());
495
5.09k
    RETURN_IF_ERROR(serde->from_string(ref, *column, options));
496
5.09k
    DORIS_CHECK(column->size() == 1);
497
5.09k
    *field = (*column)[0];
498
5.09k
    return Status::OK();
499
5.09k
}
500
501
16.4k
Status FileScannerV2::_init_expr_ctxes() {
502
16.4k
    _slot_id_to_desc.clear();
503
16.4k
    _slot_id_to_global_index.clear();
504
16.4k
    _partition_slot_descs.clear();
505
16.4k
    _file_slot_descs.clear();
506
105k
    for (const auto* slot_desc : _output_tuple_desc->slots()) {
507
105k
        _slot_id_to_desc.emplace(slot_desc->id(), slot_desc);
508
105k
    }
509
16.4k
    DORIS_CHECK(_table_reader != nullptr);
510
16.4k
    RETURN_IF_ERROR(_build_projected_columns(*_table_reader));
511
16.4k
    return Status::OK();
512
16.4k
}
513
514
16.4k
Status FileScannerV2::_build_projected_columns(const format::TableReader& table_reader) {
515
16.4k
    _projected_columns.clear();
516
16.4k
    _projected_columns.reserve(_params->required_slots.size());
517
16.4k
    _need_global_rowid_column = false;
518
16.4k
    format::ProjectedColumnBuildContext build_context {
519
16.4k
            .scan_params = _params,
520
16.4k
            .range = &_current_range,
521
16.4k
            .runtime_state = _state,
522
16.4k
    };
523
524
122k
    for (size_t slot_idx = 0; slot_idx < _params->required_slots.size(); ++slot_idx) {
525
105k
        const auto& slot_info = _params->required_slots[slot_idx];
526
105k
        const auto it = _slot_id_to_desc.find(slot_info.slot_id);
527
105k
        if (it == _slot_id_to_desc.end()) {
528
0
            return Status::InternalError("Unknown source slot descriptor, slot_id={}",
529
0
                                         slot_info.slot_id);
530
0
        }
531
105k
        auto column = _build_table_column(it->second);
532
105k
        if (column.name.starts_with(BeConsts::GLOBAL_ROWID_COL)) {
533
940
            _need_global_rowid_column = true;
534
940
        }
535
105k
        RETURN_IF_ERROR(_build_default_expr(slot_info, &column.default_expr));
536
105k
        build_context.schema_column.reset();
537
105k
        RETURN_IF_ERROR(table_reader.annotate_projected_column(slot_info, &build_context, &column));
538
        // Build nested children from access paths generated by the slot's access-path
539
        // expressions. A projected column can therefore contain only a subset of the schema
540
        // column's nested children.
541
105k
        RETURN_IF_ERROR(AccessPathParser::build_nested_children(
542
105k
                &column, it->second,
543
105k
                build_context.schema_column.has_value() ? &*build_context.schema_column : nullptr));
544
105k
        if (is_partition_slot(slot_info, column.name)) {
545
5.26k
            column.is_partition_key = true;
546
5.26k
            _partition_slot_descs.emplace(
547
5.26k
                    column.name,
548
5.26k
                    PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
549
5.26k
            for (const auto& alias : column.name_mapping) {
550
0
                _partition_slot_descs.emplace(
551
0
                        alias,
552
0
                        PartitionSlotInfo {.slot_desc = it->second, .canonical_name = column.name});
553
0
            }
554
100k
        } else if (is_data_file_slot(slot_info, column.name)) {
555
99.4k
            _file_slot_descs.push_back(const_cast<SlotDescriptor*>(it->second));
556
99.4k
        }
557
105k
        const auto global_index = format::GlobalIndex(slot_idx);
558
105k
        _slot_id_to_global_index.emplace(slot_info.slot_id, global_index);
559
105k
        _projected_columns.push_back(std::move(column));
560
105k
    }
561
16.4k
    RETURN_IF_ERROR(table_reader.validate_projected_columns(build_context));
562
16.4k
    return Status::OK();
563
16.4k
}
564
565
Status FileScannerV2::_build_default_expr(const TFileScanSlotInfo& slot_info,
566
105k
                                          VExprContextSPtr* ctx) const {
567
105k
    DORIS_CHECK(ctx != nullptr);
568
105k
    if (slot_info.__isset.default_value_expr && !slot_info.default_value_expr.nodes.empty()) {
569
103k
        return VExpr::create_expr_tree(slot_info.default_value_expr, *ctx);
570
103k
    }
571
572
1.83k
    if (_params->__isset.default_value_of_src_slot) {
573
1.83k
        const auto it = _params->default_value_of_src_slot.find(slot_info.slot_id);
574
1.83k
        if (it != _params->default_value_of_src_slot.end() && !it->second.nodes.empty()) {
575
0
            return VExpr::create_expr_tree(it->second, *ctx);
576
0
        }
577
1.83k
    }
578
1.82k
    return Status::OK();
579
1.82k
}
580
581
105k
format::ColumnDefinition FileScannerV2::_build_table_column(const SlotDescriptor* slot_desc) {
582
105k
    DORIS_CHECK(slot_desc != nullptr);
583
105k
    format::ColumnDefinition column;
584
    // TODO(gabriel): why always BY_NAME here?
585
105k
    column.identifier = Field::create_field<TYPE_STRING>(slot_desc->col_name());
586
105k
    column.name = slot_desc->col_name();
587
105k
    column.type = slot_desc->get_data_type_ptr();
588
105k
    return column;
589
105k
}
590
591
Status FileScannerV2::_build_table_column_predicates(
592
16.4k
        format::TableColumnPredicates* predicates) const {
593
16.4k
    DORIS_CHECK(predicates != nullptr);
594
16.4k
    predicates->clear();
595
16.4k
    const auto& slot_predicates = _local_state->cast<FileScanLocalState>()._slot_id_to_predicates;
596
105k
    for (const auto& [slot_id, slot_predicate_list] : slot_predicates) {
597
105k
        const auto it = _slot_id_to_desc.find(slot_id);
598
105k
        if (it == _slot_id_to_desc.end()) {
599
0
            continue;
600
0
        }
601
105k
        const auto global_index_it = _slot_id_to_global_index.find(slot_id);
602
105k
        if (global_index_it == _slot_id_to_global_index.end()) {
603
0
            continue;
604
0
        }
605
105k
        (*predicates)[global_index_it->second] = slot_predicate_list;
606
105k
    }
607
16.4k
    return Status::OK();
608
16.4k
}
609
610
16.4k
Status FileScannerV2::_build_table_conjuncts(VExprContextSPtrs* conjuncts) const {
611
16.4k
    DORIS_CHECK(conjuncts != nullptr);
612
16.4k
    conjuncts->clear();
613
16.4k
    conjuncts->reserve(_conjuncts.size());
614
16.4k
    for (const auto& conjunct : _conjuncts) {
615
10.2k
        VExprSPtr root;
616
10.2k
        RETURN_IF_ERROR(format::clone_table_expr_tree(conjunct->root(), &root));
617
10.2k
        RETURN_IF_ERROR(rewrite_slot_refs_to_global_index(&root, _slot_id_to_global_index));
618
10.2k
        conjuncts->push_back(VExprContext::create_shared(std::move(root)));
619
10.2k
    }
620
16.4k
    return Status::OK();
621
16.4k
}
622
623
0
TFileFormatType::type FileScannerV2::_get_current_format_type() const {
624
0
    return get_range_format_type(*_params, _current_range);
625
0
}
626
627
Status FileScannerV2::_to_file_format(TFileFormatType::type format_type,
628
38.5k
                                      format::FileFormat* file_format) {
629
38.5k
    DORIS_CHECK(file_format != nullptr);
630
38.5k
    switch (format_type) {
631
28.2k
    case TFileFormatType::FORMAT_PARQUET:
632
28.2k
        *file_format = format::FileFormat::PARQUET;
633
28.2k
        return Status::OK();
634
4.97k
    case TFileFormatType::FORMAT_JNI:
635
4.97k
        *file_format = format::FileFormat::JNI;
636
4.97k
        return Status::OK();
637
1.23k
    case TFileFormatType::FORMAT_CSV_PLAIN:
638
1.23k
    case TFileFormatType::FORMAT_CSV_GZ:
639
1.23k
    case TFileFormatType::FORMAT_CSV_BZ2:
640
1.23k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
641
1.23k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
642
1.23k
    case TFileFormatType::FORMAT_CSV_LZOP:
643
1.24k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
644
1.24k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
645
1.24k
    case TFileFormatType::FORMAT_PROTO:
646
1.24k
        *file_format = format::FileFormat::CSV;
647
1.24k
        return Status::OK();
648
3.45k
    case TFileFormatType::FORMAT_TEXT:
649
3.45k
        *file_format = format::FileFormat::TEXT;
650
3.45k
        return Status::OK();
651
591
    case TFileFormatType::FORMAT_JSON:
652
591
        *file_format = format::FileFormat::JSON;
653
591
        return Status::OK();
654
5
    case TFileFormatType::FORMAT_NATIVE:
655
5
        *file_format = format::FileFormat::NATIVE;
656
5
        return Status::OK();
657
99
    case TFileFormatType::FORMAT_ARROW:
658
99
        *file_format = format::FileFormat::ARROW;
659
99
        return Status::OK();
660
1
    default:
661
1
        return Status::NotSupported("FileScannerV2 does not support file format {}",
662
1
                                    to_string(format_type));
663
38.5k
    }
664
38.5k
}
665
666
20.3k
Status FileScannerV2::_init_io_ctx() {
667
20.3k
    _io_ctx = std::make_shared<io::IOContext>();
668
20.3k
    _io_ctx->query_id = &_state->query_id();
669
20.3k
    return Status::OK();
670
20.3k
}
671
672
22.1k
void FileScannerV2::_reset_adaptive_batch_size_state() {
673
22.1k
    _block_size_predictor.reset();
674
22.1k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, int64_t(0));
675
22.1k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, int64_t(0));
676
22.1k
}
677
678
22.1k
void FileScannerV2::_init_adaptive_batch_size_state(TFileFormatType::type format_type) {
679
22.1k
    _reset_adaptive_batch_size_state();
680
22.1k
    if (!_should_enable_adaptive_batch_size(format_type)) {
681
51
        return;
682
51
    }
683
684
    // V2 native file readers do not have reliable row-width hints before the first batch. Start
685
    // every split with a small probe, then learn bytes-per-row from the materialized table block
686
    // and keep later batches close to RuntimeState::preferred_block_size_bytes().
687
22.0k
    _block_size_predictor = std::make_unique<AdaptiveBlockSizePredictor>(
688
22.0k
            _state->preferred_block_size_bytes(), 0.0, ADAPTIVE_BATCH_INITIAL_PROBE_ROWS,
689
22.0k
            _state->batch_size());
690
22.0k
}
691
692
22.1k
bool FileScannerV2::_should_enable_adaptive_batch_size(TFileFormatType::type format_type) const {
693
22.1k
    if (!config::enable_adaptive_batch_size) {
694
0
        return false;
695
0
    }
696
22.1k
    switch (format_type) {
697
16.3k
    case TFileFormatType::FORMAT_PARQUET:
698
16.3k
    case TFileFormatType::FORMAT_ORC:
699
16.9k
    case TFileFormatType::FORMAT_CSV_PLAIN:
700
16.9k
    case TFileFormatType::FORMAT_CSV_GZ:
701
16.9k
    case TFileFormatType::FORMAT_CSV_BZ2:
702
16.9k
    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
703
16.9k
    case TFileFormatType::FORMAT_CSV_LZ4BLOCK:
704
16.9k
    case TFileFormatType::FORMAT_CSV_LZOP:
705
16.9k
    case TFileFormatType::FORMAT_CSV_DEFLATE:
706
16.9k
    case TFileFormatType::FORMAT_CSV_SNAPPYBLOCK:
707
16.9k
    case TFileFormatType::FORMAT_PROTO:
708
19.2k
    case TFileFormatType::FORMAT_TEXT:
709
19.5k
    case TFileFormatType::FORMAT_JSON:
710
22.0k
    case TFileFormatType::FORMAT_JNI:
711
22.0k
        return true;
712
51
    default:
713
51
        return false;
714
22.1k
    }
715
22.1k
}
716
717
107k
bool FileScannerV2::_should_run_adaptive_batch_size() const {
718
    // COUNT pushdown emits synthetic rows from file metadata and does not materialize file columns,
719
    // so there is no useful row-width sample to learn from.
720
107k
    return _block_size_predictor != nullptr &&
721
107k
           _local_state->get_push_down_agg_type() != TPushAggOp::type::COUNT;
722
107k
}
723
724
61.0k
size_t FileScannerV2::_predict_reader_batch_rows() {
725
61.0k
    DORIS_CHECK(_block_size_predictor != nullptr);
726
    // Before history exists this returns the probe row count; after update(), it returns roughly
727
    // preferred_block_size_bytes / EWMA(bytes_per_row), capped by RuntimeState::batch_size().
728
61.0k
    const size_t predicted_rows = _block_size_predictor->predict_next_rows();
729
61.0k
    COUNTER_SET(_adaptive_batch_predicted_rows_counter, static_cast<int64_t>(predicted_rows));
730
61.0k
    return predicted_rows;
731
61.0k
}
732
733
42.9k
void FileScannerV2::_update_adaptive_batch_size(const Block& block) {
734
42.9k
    if (!_should_run_adaptive_batch_size()) {
735
2.64k
        return;
736
2.64k
    }
737
40.3k
    COUNTER_SET(_adaptive_batch_actual_bytes_counter, static_cast<int64_t>(block.bytes()));
738
40.3k
    if (block.rows() == 0) {
739
0
        return;
740
0
    }
741
    // The sample is taken after TableReader has finalized file-local columns to table columns.
742
    // This matches the memory shape seen by upstream operators and catches very wide nested
743
    // columns, such as map/string payloads, after the first probe batch.
744
40.3k
    if (!_block_size_predictor->has_history()) {
745
18.0k
        COUNTER_UPDATE(_adaptive_batch_probe_count_counter, 1);
746
18.0k
    }
747
40.3k
    _block_size_predictor->update(block);
748
40.3k
}
749
750
20.4k
Status FileScannerV2::close(RuntimeState* state) {
751
20.4k
    if (!_try_close()) {
752
0
        return Status::OK();
753
0
    }
754
20.4k
    if (_table_reader != nullptr) {
755
16.4k
        RETURN_IF_ERROR(_table_reader->close());
756
16.4k
        _report_condition_cache_profile();
757
16.4k
        _table_reader.reset();
758
16.4k
    }
759
20.4k
    return Scanner::close(state);
760
20.4k
}
761
762
20.4k
void FileScannerV2::try_stop() {
763
20.4k
    Scanner::try_stop();
764
20.4k
    if (_io_ctx) {
765
20.4k
        _io_ctx->should_stop = true;
766
20.4k
    }
767
20.4k
}
768
769
42.5k
void FileScannerV2::update_realtime_counters() {
770
42.5k
    if (_file_reader_stats == nullptr) {
771
0
        return;
772
0
    }
773
42.5k
    const int64_t bytes_read = _file_reader_stats->read_bytes;
774
42.5k
    COUNTER_SET(_file_read_bytes_counter, bytes_read);
775
42.5k
    COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
776
42.5k
    COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
777
42.5k
}
778
779
20.4k
void FileScannerV2::_collect_profile_before_close() {
780
20.4k
    _report_file_reader_predicate_filtered_rows();
781
20.4k
    Scanner::_collect_profile_before_close();
782
20.4k
    if (_file_reader_stats != nullptr) {
783
20.4k
        COUNTER_SET(_file_read_bytes_counter, cast_set<int64_t>(_file_reader_stats->read_bytes));
784
20.4k
        COUNTER_SET(_file_read_calls_counter, cast_set<int64_t>(_file_reader_stats->read_calls));
785
20.4k
        COUNTER_SET(_file_read_time_counter, cast_set<int64_t>(_file_reader_stats->read_time_ns));
786
20.4k
    }
787
    // Query profiles can be collected before Scanner::close() runs. Publish condition-cache
788
    // counters here as well, using deltas so this method and close() cannot double count.
789
20.4k
    _report_condition_cache_profile();
790
20.4k
}
791
792
20.4k
bool FileScannerV2::_should_update_load_counters() const {
793
20.4k
    if (_is_load) {
794
0
        return true;
795
0
    }
796
    // TVF based loads (e.g. http_stream, group commit relay) plan the load source as a
797
    // tvf query scan without src tuple desc, so _is_load is false. But rows filtered by
798
    // the load's WHERE clause still need to be reported as unselected rows. FILE_STREAM
799
    // is only reachable from such load entries, never from normal queries, so use it to
800
    // identify these scanners.
801
20.4k
    return (_params != nullptr && _params->__isset.file_type &&
802
20.4k
            _params->file_type == TFileType::FILE_STREAM) ||
803
20.4k
           (_current_range.__isset.file_type && _current_range.file_type == TFileType::FILE_STREAM);
804
20.4k
}
805
806
20.3k
void FileScannerV2::_report_file_reader_predicate_filtered_rows() {
807
18.4E
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->predicate_filtered_rows : 0;
808
20.3k
    const int64_t filtered_delta = filtered_rows - _reported_predicate_filtered_rows;
809
20.3k
    if (filtered_delta > 0) {
810
        // File readers can evaluate localized conjuncts before a block reaches Scanner. Count
811
        // those rows as scanner-level unselected rows so load statistics stay identical no matter
812
        // whether a predicate is pushed down or evaluated by Scanner::_filter_output_block().
813
1.43k
        _counter.num_rows_unselected += filtered_delta;
814
1.43k
        _reported_predicate_filtered_rows = filtered_rows;
815
1.43k
    }
816
20.3k
}
817
818
36.8k
void FileScannerV2::_report_condition_cache_profile() {
819
36.8k
    auto* local_state = static_cast<FileScanLocalState*>(_local_state);
820
36.8k
    const int64_t hit_count =
821
36.8k
            _table_reader != nullptr ? _table_reader->condition_cache_hit_count() : 0;
822
36.8k
    const int64_t hit_delta = hit_count - _reported_condition_cache_hit_count;
823
36.8k
    if (hit_delta > 0) {
824
399
        COUNTER_UPDATE(local_state->_condition_cache_hit_counter, hit_delta);
825
399
        _reported_condition_cache_hit_count = hit_count;
826
399
    }
827
18.4E
    const int64_t filtered_rows = _io_ctx != nullptr ? _io_ctx->condition_cache_filtered_rows : 0;
828
36.8k
    const int64_t filtered_delta = filtered_rows - _reported_condition_cache_filtered_rows;
829
36.8k
    if (filtered_delta > 0) {
830
96
        COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, filtered_delta);
831
96
        _reported_condition_cache_filtered_rows = filtered_rows;
832
96
    }
833
36.8k
}
834
835
} // namespace doris