CdcStreamTableValuedFunction.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.tablefunction;

import org.apache.doris.analysis.BrokerDesc;
import org.apache.doris.analysis.StorageBackend.StorageType;
import org.apache.doris.catalog.Column;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.datasource.jdbc.client.JdbcClient;
import org.apache.doris.job.cdc.DataSourceConfigKeys;
import org.apache.doris.job.cdc.request.FetchRecordRequest;
import org.apache.doris.job.common.DataSourceType;
import org.apache.doris.job.util.StreamingJobUtils;
import org.apache.doris.thrift.TBrokerFileStatus;
import org.apache.doris.thrift.TFileType;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

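/**
 * Table-valued function that exposes a CDC (change data capture) record stream as a readable
 * table. It drives the backend's HTTP file reader against a local CDC client endpoint and
 * derives the result schema from the source table via JDBC.
 *
 * A hypothetical usage sketch (the TVF name and exact property set depend on how the function
 * is registered; "jdbc_url", "type", and "table" are the keys this class requires):
 *
 *   SELECT * FROM cdc_stream(
 *       "type" = "mysql",
 *       "jdbc_url" = "jdbc:mysql://127.0.0.1:3306/db1",
 *       "table" = "t1"
 *   );
 */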
public class CdcStreamTableValuedFunction extends ExternalFileTableValuedFunction {
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    // The "{}" placeholder is presumably substituted with the CDC client's port on the backend side.
    private static final String URI = "http://127.0.0.1:{}/api/fetchRecordStream";
    private final Map<String, String> originProps;

    public CdcStreamTableValuedFunction(Map<String, String> properties) throws AnalysisException {
        this.originProps = properties;
        // Validate the required properties before doing any processing, so we fail fast
        // on an incomplete configuration.
        validate(properties);
        processProps(properties);
    }

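    /**
     * Builds the scan parameters: the stream is read as JSON over a chunked HTTP POST to the
     * CDC client, with range requests disabled, and the serialized {@link FetchRecordRequest}
     * sent as the request payload.
     */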
    private void processProps(Map<String, String> properties) throws AnalysisException {
        Map<String, String> copyProps = new HashMap<>(properties);
        copyProps.put("format", "json");
        super.parseCommonProperties(copyProps);
        this.processedParams.put("enable_cdc_client", "true");
        this.processedParams.put("uri", URI);
        this.processedParams.put("http.enable.range.request", "false");
        this.processedParams.put("http.chunk.response", "true");
        this.processedParams.put("http.method", "POST");

        String payload = generateParams(properties);
        this.processedParams.put("http.payload", payload);
        this.backendConnectProperties.putAll(processedParams);
        generateFileStatus();
    }

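    /**
     * Serializes a {@link FetchRecordRequest} to JSON for the HTTP payload. The job id is a
     * random UUID (dashes stripped), and the raw TVF properties are passed through as the
     * request config.
     */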
    private String generateParams(Map<String, String> properties) throws AnalysisException {
        FetchRecordRequest recordRequest = new FetchRecordRequest();
        recordRequest.setJobId(UUID.randomUUID().toString().replace("-", ""));
        recordRequest.setDataSource(properties.get(DataSourceConfigKeys.TYPE));
        recordRequest.setConfig(properties);
        try {
            return OBJECT_MAPPER.writeValueAsString(recordRequest);
        } catch (IOException e) {
            LOG.warn("failed to serialize fetch record request", e);
            throw new AnalysisException("Failed to serialize fetch record request: " + e.getMessage(), e);
        }
    }

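    // Fail fast if any of the required connection properties is missing.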
    private void validate(Map<String, String> properties) {
        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.JDBC_URL), "jdbc_url is required");
        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TYPE), "type is required");
        Preconditions.checkArgument(properties.containsKey(DataSourceConfigKeys.TABLE), "table is required");
    }

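    /**
     * Registers a single pseudo file status for the stream endpoint so the scan has a file
     * entry to read; the size is Integer.MAX_VALUE because the stream length is unknown
     * up front.
     */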
    private void generateFileStatus() {
        this.fileStatuses.clear();
        this.fileStatuses.add(new TBrokerFileStatus(URI, false, Integer.MAX_VALUE, false));
    }

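    /**
     * Resolves the result schema by connecting to the source database over JDBC and reading
     * the column metadata of the configured table.
     */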
    @Override
    public List<Column> getTableColumns() throws AnalysisException {
        DataSourceType dataSourceType =
                DataSourceType.valueOf(processedParams.get(DataSourceConfigKeys.TYPE).toUpperCase());
        JdbcClient jdbcClient = StreamingJobUtils.getJdbcClient(dataSourceType, processedParams);
        String database = StreamingJobUtils.getRemoteDbName(dataSourceType, processedParams);
        String table = processedParams.get(DataSourceConfigKeys.TABLE);
        boolean tableExist = jdbcClient.isTableExist(database, table);
        Preconditions.checkArgument(tableExist, "Table does not exist: %s", table);
        return jdbcClient.getColumnsFromJdbc(database, table);
    }

    @Override
    public TFileType getTFileType() {
        return TFileType.FILE_HTTP;
    }

    @Override
    public String getFilePath() {
        return URI;
    }

    @Override
    public BrokerDesc getBrokerDesc() {
        return new BrokerDesc("CdcStreamTvfBroker", StorageType.HTTP, originProps);
    }

    @Override
    public String getTableName() {
        return "CdcStreamTableValuedFunction";
    }

    @Override
    public List<String> getPathPartitionKeys() {
        return new ArrayList<>();
    }
}