Coverage Report

Created: 2026-03-15 17:28

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/format/arrow/arrow_stream_reader.cpp
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#include "format/arrow/arrow_stream_reader.h"
19
20
#include "arrow/io/buffered.h"
21
#include "arrow/ipc/options.h"
22
#include "arrow/ipc/reader.h"
23
#include "arrow/record_batch.h"
24
#include "arrow/result.h"
25
#include "common/logging.h"
26
#include "common/status.h"
27
#include "core/block/block.h"
28
#include "core/block/column_with_type_and_name.h"
29
#include "exec/common/arrow_column_to_doris_column.h"
30
#include "format/arrow/arrow_pip_input_stream.h"
31
#include "io/fs/stream_load_pipe.h"
32
#include "io/fs/tracing_file_reader.h"
33
#include "runtime/descriptors.h"
34
#include "runtime/runtime_state.h"
35
36
namespace doris {
37
#include "common/compile_check_begin.h"
38
class RuntimeProfile;
39
} // namespace doris
40
41
namespace doris {
42
43
ArrowStreamReader::ArrowStreamReader(RuntimeState* state, RuntimeProfile* profile,
44
                                     ScannerCounter* counter, const TFileScanRangeParams& params,
45
                                     const TFileRangeDesc& range,
46
                                     const std::vector<SlotDescriptor*>& file_slot_descs,
47
                                     io::IOContext* io_ctx)
48
0
        : _state(state),
49
0
          _range(range),
50
0
          _file_slot_descs(file_slot_descs),
51
0
          _io_ctx(io_ctx),
52
0
          _file_reader(nullptr) {
53
0
    TimezoneUtils::find_cctz_time_zone(TimezoneUtils::default_time_zone, _ctzz);
54
0
}
55
56
0
ArrowStreamReader::~ArrowStreamReader() = default;
57
58
0
// Opens the stream-load pipe identified by `_range.load_id` and wraps it in the Arrow input
// stream adapter used by get_next_block().
//
// When an IO context is present, the pipe reader is wrapped in a TracingFileReader that
// reports into `_io_ctx->file_reader_stats`; otherwise the raw pipe reader is used directly.
// Returns any error produced while creating the pipe reader.
Status ArrowStreamReader::init_reader() {
    io::FileReaderSPtr pipe_reader;
    RETURN_IF_ERROR(FileFactory::create_pipe_reader(_range.load_id, &pipe_reader, _state, false));

    if (_io_ctx == nullptr) {
        _file_reader = pipe_reader;
    } else {
        _file_reader = std::make_shared<io::TracingFileReader>(std::move(pipe_reader),
                                                               _io_ctx->file_reader_stats);
    }

    _pip_stream = ArrowPipInputStream::create_unique(_file_reader);
    return Status::OK();
}
67
68
0
// Reads the Arrow IPC stream currently available on the pipe and appends its rows to `block`.
//
// @param block      destination block. Each Arrow field is matched to the block column at the
//                   same position, and the names must agree.
// @param read_rows  out: number of rows appended by this call. Reset to 0 on entry.
// @param eof        out: true when the pipe reports no more data, or when no rows were read.
// @return InternalError if the IPC stream cannot be opened or decoded, if a field name does
//         not match the block column at its position, or if the conversion throws an Exception.
//         Errors from HasNext() and from the column serde are propagated as-is.
Status ArrowStreamReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
    // Rows are accumulated per batch below, and the EOF decision relies on this counter.
    // Start from zero instead of trusting the caller to have initialised it.
    *read_rows = 0;

    bool has_next = false;
    RETURN_IF_ERROR(_pip_stream->HasNext(&has_next));
    if (!has_next) {
        *eof = true;
        return Status::OK();
    }

    // create a reader to read data
    arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> res_open =
            arrow::ipc::RecordBatchStreamReader::Open(_pip_stream.get(),
                                                      arrow::ipc::IpcReadOptions::Defaults());
    if (!res_open.ok()) {
        LOG(WARNING) << "failed to open stream reader: " << res_open.status().message();
        return Status::InternalError("failed to open stream reader: {}",
                                     res_open.status().message());
    }
    auto reader = std::move(res_open).ValueUnsafe();

    // get arrow data from reader: ToRecordBatches() collects all remaining batches of the stream
    arrow::Result<arrow::RecordBatchVector> res_reader = reader->ToRecordBatches();
    if (!res_reader.ok()) {
        LOG(WARNING) << "failed to read batch: " << res_reader.status().message();
        return Status::InternalError("failed to read batch: {}", res_reader.status().message());
    }
    std::vector<std::shared_ptr<arrow::RecordBatch>> out_batches =
            std::move(res_reader).ValueUnsafe();

    // convert arrow batch to block
    // NOTE(review): `columns` is never read below. The call is kept because
    // Block::mutate_columns() may have side effects on the block's column objects (e.g. making
    // shared sub-columns exclusively owned before they are written in place). Confirm before
    // removing.
    auto columns = block->mutate_columns();
    for (const auto& batch_ptr : out_batches) {
        const arrow::RecordBatch& batch = *batch_ptr;
        const auto num_rows = batch.num_rows();
        const auto num_columns = batch.num_columns();
        for (int c = 0; c < num_columns; ++c) {
            // Hold the shared_ptr so the Array stays alive for the whole conversion rather than
            // relying on the batch to cache it internally.
            const std::shared_ptr<arrow::Array> arrow_column = batch.column(c);
            // Field and Schema are owned by the batch, so binding by reference is safe.
            const std::string& column_name = batch.schema()->field(c)->name();

            try {
                // Throws (caught below) if `block` has fewer columns than the batch.
                const ColumnWithTypeAndName& column_with_name = block->safe_get_by_position(c);

                if (column_with_name.name != column_name) {
                    return Status::InternalError("Column name mismatch: expected {}, got {}",
                                                 column_with_name.name, column_name);
                }

                // Writes directly into the block's column object.
                RETURN_IF_ERROR(column_with_name.type->get_serde()->read_column_from_arrow(
                        column_with_name.column->assume_mutable_ref(), arrow_column.get(), 0,
                        num_rows, _ctzz));
            } catch (const Exception& e) {
                return Status::InternalError("Failed to convert from arrow to block: {}", e.what());
            }
        }
        *read_rows += batch.num_rows();
    }

    *eof = (*read_rows == 0);
    return Status::OK();
}
128
129
// Reports the column name and type of every file slot descriptor given to this reader.
//
// Types come from `_file_slot_descs`, not from the Arrow stream's schema. `missing_cols` is
// never modified. Entries already present in `name_to_type` are kept, because emplace does
// not overwrite an existing key. Always returns OK.
Status ArrowStreamReader::get_columns(std::unordered_map<std::string, DataTypePtr>* name_to_type,
                                      std::unordered_set<std::string>* missing_cols) {
    const size_t slot_count = _file_slot_descs.size();
    for (size_t idx = 0; idx < slot_count; ++idx) {
        auto* slot_desc = _file_slot_descs[idx];
        name_to_type->emplace(slot_desc->col_name(), slot_desc->type());
    }
    return Status::OK();
}
136
137
#include "common/compile_check_end.h"
138
} // namespace doris