Coverage Report

Created: 2026-05-09 09:50

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/udf/python/python_server.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "udf/python/python_server.h"
19
20
#include <arrow/type_fwd.h>
21
#include <butil/fd_utility.h>
22
#include <dirent.h>
23
#include <fmt/core.h>
24
#include <sys/poll.h>
25
#include <sys/stat.h>
26
27
#include <boost/asio.hpp>
28
#include <boost/process.hpp>
29
#include <fstream>
30
#include <future>
31
32
#include "arrow/flight/client.h"
33
#include "common/config.h"
34
#include "udf/python/python_udaf_client.h"
35
#include "udf/python/python_udf_client.h"
36
#include "udf/python/python_udtf_client.h"
37
#include "util/cpu_info.h"
38
39
namespace doris {
40
41
std::shared_ptr<PythonServerManager::VersionedProcessPool>
42
7.24k
PythonServerManager::_get_or_create_process_pool(const PythonVersion& version) {
43
7.24k
    std::lock_guard<std::mutex> lock(_pools_mutex);
44
7.24k
    auto& pool = _process_pools[version];
45
7.24k
    if (!pool) {
46
18
        pool = std::make_shared<VersionedProcessPool>();
47
18
    }
48
7.24k
    return pool;
49
7.24k
}
50
51
std::vector<std::pair<PythonVersion, std::shared_ptr<PythonServerManager::VersionedProcessPool>>>
52
171
PythonServerManager::_snapshot_process_pools() {
53
171
    std::lock_guard<std::mutex> lock(_pools_mutex);
54
171
    std::vector<std::pair<PythonVersion, std::shared_ptr<VersionedProcessPool>>> snapshot;
55
171
    snapshot.reserve(_process_pools.size());
56
171
    for (const auto& [version, pool] : _process_pools) {
57
139
        snapshot.emplace_back(version, pool);
58
139
    }
59
171
    return snapshot;
60
171
}
61
62
#ifdef BE_TEST
63
void PythonServerManager::set_process_pool_for_test(const PythonVersion& version,
64
                                                    std::vector<ProcessPtr> processes,
65
                                                    bool initialized) {
66
    auto versioned_pool = _get_or_create_process_pool(version);
67
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
68
    versioned_pool->processes = std::move(processes);
69
    versioned_pool->initialized = initialized;
70
}
71
72
std::vector<ProcessPtr>& PythonServerManager::process_pool_for_test(const PythonVersion& version) {
73
    auto versioned_pool = _get_or_create_process_pool(version);
74
    return versioned_pool->processes;
75
}
76
#endif
77
78
template <typename ClientType>
79
Status PythonServerManager::get_client(const PythonUDFMeta& func_meta, const PythonVersion& version,
80
                                       std::shared_ptr<ClientType>* client,
81
7.21k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
82
7.21k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
83
7.21k
            DORIS_TRY(_ensure_pool_initialized(version));
84
85
7.21k
    ProcessPtr process;
86
7.21k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
87
88
7.21k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
89
3.36k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
90
3.85k
    } else {
91
3.85k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
92
3.85k
    }
93
94
7.21k
    return Status::OK();
95
7.21k
}
_ZN5doris19PythonServerManager10get_clientINS_15PythonUDFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
81
1.28k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
82
1.28k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
83
1.28k
            DORIS_TRY(_ensure_pool_initialized(version));
84
85
1.28k
    ProcessPtr process;
86
1.28k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
87
88
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
89
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
90
1.28k
    } else {
91
1.28k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
92
1.28k
    }
93
94
1.28k
    return Status::OK();
95
1.28k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDAFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
81
3.36k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
82
3.36k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
83
3.36k
            DORIS_TRY(_ensure_pool_initialized(version));
84
85
3.36k
    ProcessPtr process;
86
3.36k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
87
88
3.36k
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
89
3.36k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
90
    } else {
91
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
92
    }
93
94
3.36k
    return Status::OK();
95
3.36k
}
_ZN5doris19PythonServerManager10get_clientINS_16PythonUDTFClientEEENS_6StatusERKNS_13PythonUDFMetaERKNS_13PythonVersionEPSt10shared_ptrIT_ERKSA_IN5arrow6SchemaEE
Line
Count
Source
81
2.57k
                                       const std::shared_ptr<arrow::Schema>& data_schema) {
82
2.57k
    std::shared_ptr<VersionedProcessPool> versioned_pool =
83
2.57k
            DORIS_TRY(_ensure_pool_initialized(version));
84
85
2.57k
    ProcessPtr process;
86
2.57k
    RETURN_IF_ERROR(_get_process(version, versioned_pool, &process));
87
88
    if constexpr (std::is_same_v<ClientType, PythonUDAFClient>) {
89
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), data_schema, client));
90
2.57k
    } else {
91
2.57k
        RETURN_IF_ERROR(ClientType::create(func_meta, std::move(process), client));
92
2.57k
    }
93
94
2.57k
    return Status::OK();
95
2.57k
}
96
97
Result<std::shared_ptr<PythonServerManager::VersionedProcessPool>>
98
7.22k
PythonServerManager::_ensure_pool_initialized(const PythonVersion& version) {
99
7.22k
    auto versioned_pool = _get_or_create_process_pool(version);
100
7.22k
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
101
102
    // Check if already initialized
103
7.23k
    if (versioned_pool->initialized) return versioned_pool;
104
105
    // 0 means use CPU core count as default, otherwise use the specified value
106
18.4E
    int max_pool_size = config::max_python_process_num > 0 ? config::max_python_process_num
107
18.4E
                                                           : CpuInfo::num_cores();
108
109
18.4E
    LOG(INFO) << "Initializing Python process pool for version " << version.to_string() << " with "
110
18.4E
              << max_pool_size
111
18.4E
              << " processes (config::max_python_process_num=" << config::max_python_process_num
112
18.4E
              << ", CPU cores=" << CpuInfo::num_cores() << ")";
113
114
18.4E
    std::vector<std::future<Status>> futures;
115
18.4E
    std::vector<ProcessPtr> temp_processes(max_pool_size);
116
117
18.4E
    for (int i = 0; i < max_pool_size; i++) {
118
112
        futures.push_back(std::async(std::launch::async, [this, &version, i, &temp_processes]() {
119
112
            ProcessPtr process;
120
112
            Status s = fork(version, &process);
121
112
            if (s.ok()) {
122
76
                temp_processes[i] = std::move(process);
123
76
            }
124
112
            return s;
125
112
        }));
126
112
    }
127
128
18.4E
    int success_count = 0;
129
18.4E
    int failure_count = 0;
130
18.4E
    for (int i = 0; i < max_pool_size; i++) {
131
112
        Status s = futures[i].get();
132
112
        if (s.ok() && temp_processes[i]) {
133
76
            versioned_pool->processes.emplace_back(std::move(temp_processes[i]));
134
76
            success_count++;
135
76
        } else {
136
36
            failure_count++;
137
36
            LOG(WARNING) << "Failed to create Python process " << (i + 1) << "/" << max_pool_size
138
36
                         << ": " << s.to_string();
139
36
        }
140
112
    }
141
142
18.4E
    if (versioned_pool->processes.empty()) {
143
3
        return ResultError(Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
144
3
                "Failed to initialize Python process pool: all {} process creation attempts failed",
145
3
                max_pool_size));
146
3
    }
147
148
18.4E
    LOG(INFO) << "Python process pool initialized for version " << version.to_string()
149
18.4E
              << ": created " << success_count << " processes"
150
18.4E
              << (failure_count > 0 ? fmt::format(" ({} failed)", failure_count) : "");
151
152
18.4E
    versioned_pool->initialized = true;
153
18.4E
    _start_health_check_thread();
154
155
18.4E
    return versioned_pool;
156
18.4E
}
157
158
Status PythonServerManager::_get_process(
159
        const PythonVersion& version, const std::shared_ptr<VersionedProcessPool>& versioned_pool,
160
7.24k
        ProcessPtr* process) {
161
7.24k
    std::lock_guard<std::mutex> lock(versioned_pool->mutex);
162
7.24k
    std::vector<ProcessPtr>& pool = versioned_pool->processes;
163
164
7.24k
    if (UNLIKELY(pool.empty())) {
165
1
        return Status::InternalError("Python process pool is empty for version {}",
166
1
                                     version.to_string());
167
1
    }
168
169
    // Prefer an already-alive process and only use load balancing inside that alive subset.
170
    // keep dead entries stay in the pool for the background health checker
171
    // unless there is no alive process left for the current request.
172
7.24k
    auto min_alive_iter = std::min_element(pool.begin(), pool.end(),
173
455k
                                           [](const ProcessPtr& a, const ProcessPtr& b) {
174
455k
                                               const bool a_alive = a && a->is_alive();
175
455k
                                               const bool b_alive = b && b->is_alive();
176
455k
                                               if (a_alive != b_alive) {
177
1
                                                   return a_alive > b_alive;
178
1
                                               }
179
455k
                                               return a.use_count() < b.use_count();
180
455k
                                           });
181
182
7.24k
    if (min_alive_iter != pool.end() && *min_alive_iter && (*min_alive_iter)->is_alive()) {
183
7.24k
        *process = *min_alive_iter;
184
7.24k
        return Status::OK();
185
7.24k
    }
186
187
    // Only reach here when the pool has no alive process at all. Try one foreground
188
    // recovery so the caller has a chance to proceed; leave batch repair to health check.
189
18.4E
    auto& candidate = pool.front();
190
18.4E
    ProcessPtr replacement;
191
18.4E
    Status status = fork(version, &replacement);
192
18.4E
    if (!status.ok()) {
193
0
        return Status::Error<ErrorCode::SERVICE_UNAVAILABLE>(
194
0
                "Python process pool has no available process for version {}, reason: {}",
195
0
                version.to_string(), status.to_string());
196
0
    }
197
198
18.4E
    candidate = std::move(replacement);
199
18.4E
    *process = candidate;
200
18.4E
    return Status::OK();
201
18.4E
}
202
203
127
Status PythonServerManager::fork(const PythonVersion& version, ProcessPtr* process) {
204
127
    std::string python_executable_path = version.get_executable_path();
205
127
    std::string fight_server_path = get_fight_server_path();
206
127
    std::string base_unix_socket_path = get_base_unix_socket_path();
207
127
    std::vector<std::string> args = {"-u", fight_server_path, base_unix_socket_path};
208
127
    boost::process::environment env = boost::this_process::environment();
209
127
    boost::process::ipstream child_output;
210
211
127
    try {
212
127
        boost::process::child c(
213
127
                python_executable_path, args, boost::process::std_out > child_output,
214
127
                boost::process::env = env,
215
127
                boost::process::on_exit([](int exit_code, const std::error_code& ec) {
216
0
                    if (ec) {
217
0
                        LOG(WARNING) << "Python UDF server exited with error: " << ec.message();
218
0
                    }
219
0
                }));
220
221
        // Wait for socket file to be created (indicates server is ready)
222
127
        std::string expected_socket_path = get_unix_socket_file_path(c.id());
223
127
        bool started_successfully = false;
224
127
        std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
225
127
        const auto timeout = std::chrono::milliseconds(5000);
226
227
4.61k
        while (std::chrono::steady_clock::now() - start < timeout) {
228
4.57k
            struct stat buffer;
229
4.57k
            if (stat(expected_socket_path.c_str(), &buffer) == 0) {
230
85
                started_successfully = true;
231
85
                break;
232
85
            }
233
234
4.48k
            if (!c.running()) {
235
2
                break;
236
2
            }
237
4.48k
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
238
4.48k
        }
239
240
127
        if (!started_successfully) {
241
2
            if (c.running()) {
242
0
                c.terminate();
243
0
                c.wait();
244
0
            }
245
2
            return Status::InternalError("Python server start failed: socket file not found at {}",
246
2
                                         expected_socket_path);
247
2
        }
248
249
125
        *process = std::make_shared<PythonUDFProcess>(std::move(c), std::move(child_output));
250
251
125
    } catch (const std::exception& e) {
252
40
        return Status::InternalError("Failed to start Python UDF server: {}", e.what());
253
40
    }
254
255
85
    return Status::OK();
256
127
}
257
258
11
void PythonServerManager::_start_health_check_thread() {
259
11
    std::lock_guard<std::mutex> lock(_health_check_mutex);
260
11
    if (_health_check_thread) return;
261
262
11
    LOG(INFO) << "Starting Python process health check thread (interval: 30 seconds)";
263
264
9
    _health_check_thread = std::make_unique<std::thread>([this]() {
265
        // Health check loop
266
70
        while (!_shutdown_flag.load(std::memory_order_acquire)) {
267
            // Wait for interval or shutdown signal
268
66
            {
269
66
                std::unique_lock<std::mutex> lock(_health_check_mutex);
270
129
                _health_check_cv.wait_for(lock, std::chrono::seconds(30), [this]() {
271
129
                    return _shutdown_flag.load(std::memory_order_acquire);
272
129
                });
273
66
            }
274
275
66
            if (_shutdown_flag.load(std::memory_order_acquire)) break;
276
277
61
            _check_and_recreate_processes();
278
61
            _refresh_memory_stats();
279
61
        }
280
281
9
        LOG(INFO) << "Python process health check thread exiting";
282
9
    });
283
9
}
284
285
62
void PythonServerManager::_check_and_recreate_processes() {
286
62
    int total_checked = 0;
287
62
    int total_dead = 0;
288
62
    int total_recreated = 0;
289
290
62
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
291
62
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
292
62
        auto& pool = versioned_pool->processes;
293
3.90k
        for (size_t i = 0; i < pool.size(); ++i) {
294
3.84k
            auto& process = pool[i];
295
3.84k
            if (!process) continue;
296
297
3.84k
            total_checked++;
298
3.84k
            if (!process->is_alive()) {
299
3
                total_dead++;
300
3
                LOG(WARNING) << "Detected dead Python process (pid=" << process->get_child_pid()
301
3
                             << ", version=" << version.to_string() << "), recreating...";
302
303
3
                ProcessPtr new_process;
304
3
                Status s = fork(version, &new_process);
305
3
                if (s.ok()) {
306
1
                    pool[i] = std::move(new_process);
307
1
                    total_recreated++;
308
1
                    LOG(INFO) << "Successfully recreated Python process for version "
309
1
                              << version.to_string();
310
2
                } else {
311
2
                    LOG(ERROR) << "Failed to recreate Python process for version "
312
2
                               << version.to_string() << ": " << s.to_string();
313
2
                    pool.erase(pool.begin() + i);
314
2
                    --i;
315
2
                }
316
3
            }
317
3.84k
        }
318
62
    }
319
320
62
    if (total_dead > 0) {
321
2
        LOG(INFO) << "Health check completed: checked=" << total_checked << ", dead=" << total_dead
322
2
                  << ", recreated=" << total_recreated;
323
2
    }
324
62
}
325
326
49
void PythonServerManager::shutdown() {
327
    // Signal health check thread to stop
328
49
    _shutdown_flag.store(true, std::memory_order_release);
329
49
    _health_check_cv.notify_one();
330
331
49
    if (_health_check_thread && _health_check_thread->joinable()) {
332
8
        _health_check_thread->join();
333
8
        _health_check_thread.reset();
334
8
    }
335
336
    // Shutdown all processes
337
49
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
338
17
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
339
17
        auto& pool = versioned_pool->processes;
340
17
        for (auto& process : pool) {
341
17
            if (process) {
342
16
                process->shutdown();
343
16
            }
344
17
        }
345
17
        pool.clear();
346
17
        versioned_pool->initialized = false;
347
17
    }
348
349
49
    {
350
49
        std::lock_guard<std::mutex> lock(_pools_mutex);
351
49
        _process_pools.clear();
352
49
    }
353
49
}
354
355
3.84k
Status PythonServerManager::_read_process_memory(pid_t pid, size_t* rss_bytes) {
356
    // Read from /proc/{pid}/statm
357
    // Format: size resident shared text lib data dt
358
3.84k
    std::string statm_path = fmt::format("/proc/{}/statm", pid);
359
3.84k
    std::ifstream statm_file(statm_path);
360
361
3.84k
    if (!statm_file.is_open()) {
362
0
        return Status::InternalError("Cannot open {}", statm_path);
363
0
    }
364
365
3.84k
    size_t size_pages = 0, rss_pages = 0;
366
    // we only care about RSS, read and ignore the total size field
367
3.84k
    statm_file >> size_pages >> rss_pages;
368
369
3.84k
    if (statm_file.fail()) {
370
0
        return Status::InternalError("Failed to read {}", statm_path);
371
0
    }
372
373
    // Convert pages to bytes
374
3.84k
    long page_size = sysconf(_SC_PAGESIZE);
375
3.84k
    *rss_bytes = rss_pages * page_size;
376
377
3.84k
    return Status::OK();
378
3.84k
}
379
380
60
void PythonServerManager::_refresh_memory_stats() {
381
60
    int64_t total_rss = 0;
382
383
60
    for (const auto& [version, versioned_pool] : _snapshot_process_pools()) {
384
60
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
385
60
        const auto& pool = versioned_pool->processes;
386
3.84k
        for (const auto& process : pool) {
387
3.84k
            if (!process || !process->is_alive()) continue;
388
389
3.84k
            size_t rss_bytes = 0;
390
3.84k
            Status s = _read_process_memory(process->get_child_pid(), &rss_bytes);
391
392
3.84k
            if (s.ok()) {
393
3.84k
                total_rss += rss_bytes;
394
3.84k
            } else [[unlikely]] {
395
0
                LOG(WARNING) << "Failed to read memory info for Python process (pid="
396
0
                             << process->get_child_pid() << "): " << s.to_string();
397
0
            }
398
3.84k
        }
399
60
    }
400
60
    _mem_tracker.set_consumption(total_rss);
401
60
    LOG(INFO) << _mem_tracker.log_usage();
402
403
60
    if (config::python_udf_processes_memory_limit_bytes > 0 &&
404
60
        total_rss > config::python_udf_processes_memory_limit_bytes) {
405
0
        LOG(WARNING) << "Python UDF process memory usage exceeds limit: rss_bytes=" << total_rss
406
0
                     << ", limit_bytes=" << config::python_udf_processes_memory_limit_bytes;
407
0
    }
408
60
}
409
410
0
Status PythonServerManager::clear_module_cache(const std::string& location) {
411
0
    if (location.empty()) {
412
0
        return Status::InvalidArgument("Empty location for clear_module_cache");
413
0
    }
414
415
0
    std::string body = fmt::format(R"({{"location": "{}"}})", location);
416
417
0
    int success_count = 0;
418
0
    int fail_count = 0;
419
0
    bool has_active_process = false;
420
421
0
    for (auto& [version, versioned_pool] : _snapshot_process_pools()) {
422
0
        std::lock_guard<std::mutex> lock(versioned_pool->mutex);
423
0
        auto& pool = versioned_pool->processes;
424
0
        for (auto& process : pool) {
425
0
            if (!process || !process->is_alive()) {
426
0
                continue;
427
0
            }
428
0
            has_active_process = true;
429
0
            try {
430
0
                auto loc_result = arrow::flight::Location::Parse(process->get_uri());
431
0
                if (!loc_result.ok()) [[unlikely]] {
432
0
                    fail_count++;
433
0
                    continue;
434
0
                }
435
436
0
                auto client_result = arrow::flight::FlightClient::Connect(*loc_result);
437
0
                if (!client_result.ok()) [[unlikely]] {
438
0
                    fail_count++;
439
0
                    continue;
440
0
                }
441
0
                auto client = std::move(*client_result);
442
443
0
                arrow::flight::Action action;
444
0
                action.type = "clear_module_cache";
445
0
                action.body = arrow::Buffer::FromString(body);
446
447
0
                auto result_stream = client->DoAction(action);
448
0
                if (!result_stream.ok()) {
449
0
                    fail_count++;
450
0
                    continue;
451
0
                }
452
453
0
                auto result = (*result_stream)->Next();
454
0
                if (result.ok() && *result) {
455
0
                    success_count++;
456
0
                } else {
457
0
                    fail_count++;
458
0
                }
459
460
0
            } catch (...) {
461
0
                fail_count++;
462
0
            }
463
0
        }
464
0
    }
465
466
0
    if (!has_active_process) {
467
0
        return Status::OK();
468
0
    }
469
470
0
    LOG(INFO) << "clear_module_cache completed for location=" << location
471
0
              << ", success=" << success_count << ", failed=" << fail_count;
472
473
0
    if (fail_count > 0) {
474
0
        return Status::InternalError(
475
0
                "clear_module_cache failed for location={}, success={}, failed={}", location,
476
0
                success_count, fail_count);
477
0
    }
478
479
0
    return Status::OK();
480
0
}
481
482
// Explicit template instantiation for UDF, UDAF and UDTF clients
483
template Status PythonServerManager::get_client<PythonUDFClient>(
484
        const PythonUDFMeta& func_meta, const PythonVersion& version,
485
        std::shared_ptr<PythonUDFClient>* client,
486
        const std::shared_ptr<arrow::Schema>& data_schema);
487
488
template Status PythonServerManager::get_client<PythonUDAFClient>(
489
        const PythonUDFMeta& func_meta, const PythonVersion& version,
490
        std::shared_ptr<PythonUDAFClient>* client,
491
        const std::shared_ptr<arrow::Schema>& data_schema);
492
493
template Status PythonServerManager::get_client<PythonUDTFClient>(
494
        const PythonUDFMeta& func_meta, const PythonVersion& version,
495
        std::shared_ptr<PythonUDTFClient>* client,
496
        const std::shared_ptr<arrow::Schema>& data_schema);
497
498
} // namespace doris