Coverage Report

Created: 2026-04-15 12:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/arrow/arrow_stream_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/arrow/arrow_stream_reader.h"
19
20
#include "arrow/io/buffered.h"
21
#include "arrow/ipc/options.h"
22
#include "arrow/ipc/reader.h"
23
#include "arrow/record_batch.h"
24
#include "arrow/result.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_with_type_and_name.h"
29
#include "exec/common/arrow_column_to_doris_column.h"
30
#include "format/arrow/arrow_pip_input_stream.h"
31
#include "io/fs/stream_load_pipe.h"
32
#include "io/fs/tracing_file_reader.h"
33
#include "runtime/descriptors.h"
34
#include "runtime/runtime_state.h"
35
36
namespace doris {
37
class RuntimeProfile;
38
} // namespace doris
39
40
namespace doris {
41
42
/// Builds a reader for an Arrow IPC stream that is delivered through a stream-load pipe.
///
/// Only `state`, `range`, `file_slot_descs` and `io_ctx` are retained. The runtime profile,
/// scanner counter and scan-range params are not used by this implementation.
/// No I/O happens here: the pipe reader is created later by `init_reader()`.
ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* /*profile*/,
                                     ScannerCounter* /*counter*/,
                                     const TFileScanRangeParams& /*params*/,
                                     const TFileRangeDesc& range,
                                     const std::vector<SlotDescriptor*>& file_slot_descs,
                                     io::IOContext* io_ctx)
        : _state(state),
          _range(range),
          _file_slot_descs(file_slot_descs),
          _io_ctx(io_ctx),
          _file_reader(nullptr) {
    // Resolve the default time zone once. `_ctzz` is later handed to every
    // `read_column_from_arrow` call made while converting batches.
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
}
54
55
0
// Defaulted out of line; all cleanup is left to the members' own destructors.
ArrowStreamReader::~ArrowStreamReader() = default;
56
57
0
/// Opens the stream-load pipe identified by `_range.load_id` and prepares it for Arrow reads.
///
/// If an IO context was supplied, the pipe reader is wrapped in `io::TracingFileReader`
/// together with `_io_ctx->file_reader_stats`. Otherwise the raw pipe reader is used as-is.
/// The resulting reader is then wrapped in an `ArrowPipInputStream` for the IPC decoder.
/// @return the error from `FileFactory::create_pipe_reader`, or OK.
Status ArrowStreamReader::init_reader() {
    io::FileReaderSPtr pipe_reader;
    RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &pipe_reader, _state, false));

    if (_io_ctx == nullptr) {
        _file_reader = pipe_reader;
    } else {
        _file_reader = std::make_shared<io::TracingFileReader>(std::move(pipe_reader),
                                                               _io_ctx->file_reader_stats);
    }

    _pip_stream = ArrowPipInputStream::create_unique(_file_reader);
    return Status::OK();
}
66
67
0
/// Reads one Arrow IPC stream from the pipe and appends its record batches to `block`.
///
/// Each call opens a fresh `RecordBatchStreamReader` on `_pip_stream` and drains every record
/// batch of that stream with `ToRecordBatches()`. It then converts each batch column by column
/// into the columns already present in `block`. Arrow columns are matched to block columns
/// by position, and their names must be identical.
/// NOTE(review): block columns past the batch's column count receive no rows here; callers
/// presumably fill or resize them afterwards - confirm.
///
/// @param block      destination block; its leading columns must mirror the Arrow schema.
/// @param read_rows  out: number of rows appended by this call (reset to 0 on entry).
/// @param eof        out: true when the pipe has no more data, or when this call read 0 rows.
/// @return InternalError if the stream cannot be opened or read, if a column name differs,
///         or if the Arrow-to-Doris conversion throws; otherwise OK.
Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // The counter is accumulated below and drives the `*eof` decision, so it must start at
    // zero regardless of what the caller passed in.
    *read_rows = 0;

    bool has_next = false;
    RETURN_IF_ERROR(_pip_stream->HasNext(&has_next));
    if (!has_next) {
        *eof = true;
        return Status::OK();
    }

    // create a reader to read data
    arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> res_open =
            arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(),
                                                      arrow::ipc::IpcReadOptions::Defaults());
    if (!res_open.ok()) {
        LOG(WARNING) << "failed to open stream reader: " << res_open.status().message();
        return Status::InternalError("failed to open stream reader: {}",
                                     res_open.status().message());
    }
    auto reader = std::move(res_open).ValueUnsafe();

    // get arrow data from reader
    arrow::Result<arrow::RecordBatchVector> res_reader = reader->ToRecordBatches();
    if (!res_reader.ok()) {
        LOG(WARNING) << "failed to read batch: " << res_reader.status().message();
        return Status::InternalError("failed to read batch: {}", res_reader.status().message());
    }
    const arrow::RecordBatchVector out_batches = std::move(res_reader).ValueUnsafe();

    // convert arrow batch to block.
    // Values are written in place into the block's own columns via `assume_mutable_ref()`.
    // No detached MutableColumns copy is taken: nothing would read it, and it could force
    // a needless clone of shared columns.
    for (const std::shared_ptr<arrow::RecordBatch>& batch : out_batches) {
        const auto num_rows = batch->num_rows();
        const int num_columns = batch->num_columns();
        for (int c = 0; c < num_columns; ++c) {
            // The batch keeps the Array alive, so the raw pointer stays valid for this call.
            arrow::Array* column = batch->column(c).get();
            const std::string& column_name = batch->schema()->field(c)->name();

            try {
                // Throws `Exception` if the block has fewer columns than the batch.
                const ColumnWithTypeAndName& column_with_name = block->safe_get_by_position(c);

                if (column_with_name.name != column_name) {
                    return Status::InternalError("Column name mismatch: expected {}, got {}",
                                                 column_with_name.name, column_name);
                }

                RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
                        column_with_name.column->assume_mutable_ref(), column, 0, num_rows, _ctzz));
            } catch (const Exception& e) {
                return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
            }
        }
        *read_rows += static_cast<size_t>(num_rows);
    }

    *eof = (*read_rows == 0);
    return Status::OK();
}
127
128
/// Reports the column name -> type mapping for every file slot this reader was built with.
///
/// Uses `emplace`, so an entry already present in `name_to_type` is left unchanged.
/// `missing_cols` is never written by this reader.
/// @return always OK.
Status ArrowStreamReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                                      std::unordered_set<std::string>* missing_cols) {
    const size_t slot_count = _file_slot_descs.size();
    for (size_t idx = 0; idx < slot_count; ++idx) {
        auto* slot_desc = _file_slot_descs[idx];
        name_to_type->emplace(slot_desc->col_name(), slot_desc->type());
    }
    return Status::OK();
}
135
136
} // namespace doris