Coverage Report

Created: 2026-03-27 17:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
be/src/exec/sink/viceberg_merge_sink.h
Line
Count
Source
1
// Licensed to the Apache Software Foundation (ASF) under one
2
// or more contributor license agreements.  See the NOTICE file
3
// distributed with this work for additional information
4
// regarding copyright ownership.  The ASF licenses this file
5
// to you under the Apache License, Version 2.0 (the
6
// "License"); you may not use this file except in compliance
7
// with the License.  You may obtain a copy of the License at
8
//
9
//   http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing,
12
// software distributed under the License is distributed on an
13
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
// KIND, either express or implied.  See the License for the
15
// specific language governing permissions and limitations
16
// under the License.
17
18
#pragma once
19
20
#include <gen_cpp/DataSinks_types.h>
21
22
#include <memory>
23
#include <string>
24
#include <vector>
25
26
#include "common/status.h"
27
#include "exec/sink/writer/async_result_writer.h"
28
#include "exprs/vexpr_fwd.h"
29
#include "runtime/descriptors.h"
30
#include "runtime/runtime_profile.h"
31
32
namespace doris {
33
34
// Forward declarations: this header only refers to the types below through
// raw pointers, std::shared_ptr or std::unique_ptr, so their full definitions
// are not required here.
class ObjectPool;
35
class RuntimeState;
36
class Dependency;
37
38
// Types of the two inner writers held by VIcebergMergeSink via std::unique_ptr.
class VIcebergDeleteSink;
39
class VIcebergTableWriter;
40
41
// AsyncResultWriter implementation for an Iceberg merge sink.
//
// The class holds two inner writers (_table_writer and _delete_writer) and
// keeps separate insert/delete row counters.
// NOTE(review): presumably each incoming row is routed to one of the two
// inner writers based on an operation column (_operation_idx) -- confirm
// against the .cpp implementation.
class VIcebergMergeSink final : public AsyncResultWriter {
42
public:
43
    // Builds the sink from the thrift sink description and the output
    // expression contexts.
    // NOTE(review): dep / fin_dep are presumably forwarded to the
    // AsyncResultWriter base -- confirm in the constructor definition.
    VIcebergMergeSink(const TDataSink& t_sink, const VExprContextSPtrs& output_exprs,
44
                      std::shared_ptr<Dependency> dep, std::shared_ptr<Dependency> fin_dep);
45
46
    // Declared without an inline body. The unique_ptr members below point to
    // types that are only forward-declared in this header, so the definition
    // presumably lives in the .cpp where those types are complete.
    ~VIcebergMergeSink() override;
47
48
    // Not an override of the base class.
    // NOTE(review): presumably has to be called before open() -- confirm the
    // expected call order with the caller.
    Status init_properties(ObjectPool* pool, const RowDescriptor& row_desc);
49
50
    // AsyncResultWriter override.
    Status open(RuntimeState* state, RuntimeProfile* profile) override;
51
52
    // AsyncResultWriter override; receives one block of rows to write.
    Status write(RuntimeState* state, Block& block) override;
53
54
    // AsyncResultWriter override; the Status parameter is unnamed in this
    // declaration.
    Status close(Status) override;
55
56
#ifdef BE_TEST
57
3
    // Test-only hook (compiled only when BE_TEST is defined): sets _skip_io.
    void set_skip_io(bool skip) { _skip_io = skip; }
58
#endif
59
60
private:
61
    // NOTE(review): presumably creates _table_writer / _delete_writer from
    // _table_sink / _delete_sink -- confirm in the .cpp.
    Status _build_inner_sinks();
62
    // NOTE(review): presumably resolves _operation_idx, _row_id_idx and
    // _data_column_indices -- confirm in the .cpp.
    Status _prepare_output_layout();
63
64
    // Thrift sink descriptions, stored by value.
    // NOTE(review): _t_sink is presumably a copy of the constructor argument,
    // with _table_sink / _delete_sink derived from it -- confirm.
    TDataSink _t_sink;
65
    TDataSink _table_sink;
66
    TDataSink _delete_sink;
67
68
    // Inner writers, exclusively owned by this sink.
    std::unique_ptr<VIcebergTableWriter> _table_writer;
69
    std::unique_ptr<VIcebergDeleteSink> _delete_writer;
70
71
    // Starts as nullptr.
    // NOTE(review): presumably non-owning and assigned in open() -- confirm.
    RuntimeState* _state = nullptr;
72
73
    // Column positions; both indices start at -1.
    // NOTE(review): -1 presumably means "not resolved yet" -- confirm.
    int _operation_idx = -1;
74
    int _row_id_idx = -1;
75
    std::vector<int> _data_column_indices;
76
77
    // Separate output expression contexts for the table and delete paths.
    VExprContextSPtrs _table_output_expr_ctxs;
78
    VExprContextSPtrs _delete_output_expr_ctxs;
79
80
    // Row tallies, all starting at zero.
    size_t _row_count = 0;
81
    size_t _insert_row_count = 0;
82
    size_t _delete_row_count = 0;
83
84
    // Profile counters and timers; all start as nullptr.
    // NOTE(review): presumably registered on the RuntimeProfile passed to
    // open() -- confirm.
    RuntimeProfile::Counter* _written_rows_counter = nullptr;
85
    RuntimeProfile::Counter* _insert_rows_counter = nullptr;
86
    RuntimeProfile::Counter* _delete_rows_counter = nullptr;
87
    RuntimeProfile::Counter* _send_data_timer = nullptr;
88
    RuntimeProfile::Counter* _open_timer = nullptr;
89
    RuntimeProfile::Counter* _close_timer = nullptr;
90
91
#ifdef BE_TEST
92
    // Test-only flag written by set_skip_io(); defaults to false.
    bool _skip_io = false;
93
#endif
94
};
95
96
} // namespace doris