HttpDialectUtils.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.plugin.dialect;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Data;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.lang.reflect.Type;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

/**
 * This class is used to convert sql with different dialects using sql convertor service.
 * The sql convertor service is a http service which is used to convert sql.
 * <p>
 * Features:
 * - Support multiple URLs (comma separated)
 * - Blacklist mechanism for failed URLs
 * - Automatic failover and retry
 * - URL caching and smart selection
 */
public class HttpDialectUtils {
    private static final Logger LOG = LogManager.getLogger(HttpDialectUtils.class);

    // Cache URL manager instances to avoid duplicate parsing with automatic expiration
    private static final Cache<String, UrlManager> urlManagerCache = Caffeine.newBuilder()
            .maximumSize(10)
            .expireAfterAccess(8, TimeUnit.HOURS)
            .build();

    // Blacklist recovery time (ms): 1 minute
    private static final long BLACKLIST_RECOVERY_TIME_MS = 60 * 1000;
    // Connection timeout period (ms): 3 seconds
    private static final int CONNECTION_TIMEOUT_MS = 3000;
    // Read timeout period (ms): 10 seconds
    private static final int READ_TIMEOUT_MS = 10000;

    public static String convertSql(String targetURLs, String originStmt, String dialect,
            String[] features, String config) {
        UrlManager urlManager = getOrCreateUrlManager(targetURLs);
        ConvertRequest convertRequest = new ConvertRequest(originStmt, dialect, features, config);
        String requestStr = convertRequest.toJson();

        // Try to convert SQL using intelligent URL selection strategy
        return tryConvertWithIntelligentSelection(urlManager, requestStr, originStmt);
    }

    /**
     * Try to convert SQL using intelligent URL selection strategy
     * CRITICAL: This method ensures 100% success rate when ANY service is available
     */
    private static String tryConvertWithIntelligentSelection(
            UrlManager urlManager, String requestStr, String originStmt) {
        // Strategy: Try ALL URLs in intelligent order, regardless of blacklist status
        // This ensures 100% success rate when any service is actually available

        List<String> allUrls = urlManager.getAllUrlsInPriorityOrder();

        for (String url : allUrls) {
            try {
                String result = doConvertSql(url, requestStr);
                // If no exception thrown, HTTP response was successful (200)
                // Mark URL as healthy and return result (even if empty)
                urlManager.markUrlAsHealthy(url);
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Successfully converted SQL using URL: {}", url);
                }
                return result;
            } catch (Exception e) {
                LOG.warn("Failed to convert SQL using URL: {}, error: {}", url, e.getMessage());
                // Add failed URL to blacklist for future optimization
                urlManager.markUrlAsBlacklisted(url);
                // Continue trying next URL - this is CRITICAL for 100% success rate
            }
        }

        return originStmt;
    }

    /**
     * Get or create a URL manager
     */
    private static UrlManager getOrCreateUrlManager(String targetURLs) {
        return urlManagerCache.get(targetURLs, UrlManager::new);
    }

    /**
     * Perform SQL conversion for individual URL
     */
    private static String doConvertSql(String targetURL, String requestStr) throws Exception {
        HttpURLConnection connection = null;
        try {
            if (targetURL == null || targetURL.trim().isEmpty()) {
                throw new Exception("Target URL is null or empty");
            }
            URL url = new URL(targetURL.trim());
            connection = (HttpURLConnection) url.openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/json");
            connection.setUseCaches(false);
            connection.setDoOutput(true);
            connection.setConnectTimeout(CONNECTION_TIMEOUT_MS);
            connection.setReadTimeout(READ_TIMEOUT_MS);

            try (OutputStream outputStream = connection.getOutputStream()) {
                outputStream.write(requestStr.getBytes(StandardCharsets.UTF_8));
            }

            int responseCode = connection.getResponseCode();
            if (LOG.isDebugEnabled()) {
                LOG.debug("POST Response Code: {}, URL: {}, post data: {}", responseCode, targetURL, requestStr);
            }

            if (responseCode == HttpURLConnection.HTTP_OK) {
                try (InputStreamReader inputStreamReader
                        = new InputStreamReader(connection.getInputStream(), StandardCharsets.UTF_8);
                        BufferedReader in = new BufferedReader(inputStreamReader)) {
                    String inputLine;
                    StringBuilder response = new StringBuilder();

                    while ((inputLine = in.readLine()) != null) {
                        response.append(inputLine);
                    }

                    Type type = new TypeToken<ConvertResponse>() {
                    }.getType();
                    ConvertResponse result = new Gson().fromJson(response.toString(), type);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Convert response: {}, URL: {}", result, targetURL);
                    }
                    if (result.code == 0) {
                        if (!"v1".equals(result.version)) {
                            throw new Exception("Unsupported version: " + result.version);
                        }
                        return result.data;
                    } else {
                        throw new Exception("Conversion failed: " + result.message);
                    }
                }
            } else {
                throw new Exception("HTTP response code: " + responseCode);
            }
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * URL Manager - Responsible for URL parsing, caching, blacklist management, and smart selection
     */
    private static class UrlManager {
        private final List<String> parsedUrls;
        private final ConcurrentHashMap<String, BlacklistEntry> blacklist;

        public UrlManager(String urls) {
            this.parsedUrls = parseUrls(urls);
            this.blacklist = new ConcurrentHashMap<>();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Created UrlManager with URLs: {}, parsed: {}", urls, parsedUrls);
            }
        }

        /**
         * Parse comma separated URL strings
         */
        private List<String> parseUrls(String urls) {
            List<String> result = Lists.newArrayList();
            if (urls != null && !urls.trim().isEmpty()) {
                String[] urlArray = urls.split(",");
                for (String url : urlArray) {
                    String trimmedUrl = url.trim();
                    if (!trimmedUrl.isEmpty()) {
                        result.add(trimmedUrl);
                    }
                }
            }
            return result;
        }

        /**
         * Mark URL as healthy (remove from blacklist)
         */
        public void markUrlAsHealthy(String url) {
            if (blacklist.remove(url) != null) {
                LOG.info("Removed URL from blacklist due to successful request: {}", url);
            }
        }

        /**
         * Add URL to blacklist
         */
        public void markUrlAsBlacklisted(String url) {
            // If URL is already in blacklist, just return
            if (blacklist.containsKey(url)) {
                return;
            }

            long currentTime = System.currentTimeMillis();
            long recoverTime = currentTime + BLACKLIST_RECOVERY_TIME_MS;
            blacklist.put(url, new BlacklistEntry(currentTime, recoverTime));
            LOG.warn("Added URL to blacklist: {}, will recover at: {}", url, new Date(recoverTime));
        }

        /**
         * Check if URL is localhost (127.0.0.1 or localhost)
         */
        private boolean isLocalhost(String url) {
            return url.contains("127.0.0.1") || url.contains("localhost");
        }

        /**
         * Get ALL URLs in priority order for 100% success guarantee
         * CRITICAL: This method ensures we try every URL when any service might be available
         * <p>
         * Priority order:
         * 1. Localhost URLs (127.0.0.1 or localhost) that are healthy
         * 2. Other healthy URLs (randomly selected)
         * 3. Localhost URLs in blacklist
         * 4. Other blacklisted URLs (sorted by recovery time)
         */
        public List<String> getAllUrlsInPriorityOrder() {
            List<String> prioritizedUrls = Lists.newArrayList();
            List<String> healthyLocalhost = Lists.newArrayList();
            List<String> healthyOthers = Lists.newArrayList();
            List<String> blacklistedLocalhost = Lists.newArrayList();
            List<String> blacklistedOthers = Lists.newArrayList();

            long currentTime = System.currentTimeMillis();

            // Single traversal to categorize all URLs
            for (String url : parsedUrls) {
                BlacklistEntry entry = blacklist.get(url);
                boolean isHealthy = false;

                if (entry == null) {
                    // URL is not in blacklist, consider it healthy
                    isHealthy = true;
                } else if (currentTime >= entry.recoverTime) {
                    // URL has reached recovery time, remove from blacklist and consider healthy
                    blacklist.remove(url);
                    isHealthy = true;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("URL recovered from blacklist: {}", url);
                    }
                }

                boolean isLocal = isLocalhost(url);

                if (isHealthy) {
                    if (isLocal) {
                        healthyLocalhost.add(url);
                    } else {
                        healthyOthers.add(url);
                    }
                } else {
                    if (isLocal) {
                        blacklistedLocalhost.add(url);
                    } else {
                        blacklistedOthers.add(url);
                    }
                }
            }

            // Add URLs in priority order
            // 1. Healthy localhost URLs first
            prioritizedUrls.addAll(healthyLocalhost);

            // 2. Other healthy URLs (randomly shuffled for load balancing)
            Collections.shuffle(healthyOthers, ThreadLocalRandom.current());
            prioritizedUrls.addAll(healthyOthers);

            // 3. Blacklisted localhost URLs
            prioritizedUrls.addAll(blacklistedLocalhost);

            // 4. Other blacklisted URLs (sorted by recovery time)
            blacklistedOthers.sort((url1, url2) -> {
                BlacklistEntry entry1 = blacklist.get(url1);
                BlacklistEntry entry2 = blacklist.get(url2);
                if (entry1 == null && entry2 == null) {
                    return 0;
                }
                if (entry1 == null) {
                    return -1;
                }
                if (entry2 == null) {
                    return 1;
                }
                return Long.compare(entry1.recoverTime, entry2.recoverTime);
            });
            prioritizedUrls.addAll(blacklistedOthers);

            if (LOG.isDebugEnabled()) {
                LOG.debug("All URLs in priority order: {}", prioritizedUrls);
            }

            return prioritizedUrls;
        }
    }

    /**
     * Blacklist entry
     */
    private static class BlacklistEntry {
        final long blacklistedTime;
        final long recoverTime;

        BlacklistEntry(long blacklistedTime, long recoverTime) {
            this.blacklistedTime = blacklistedTime;
            this.recoverTime = recoverTime;
        }
    }

    @Data
    private static class ConvertRequest {
        private String version;    // CHECKSTYLE IGNORE THIS LINE
        private String sql_query;   // CHECKSTYLE IGNORE THIS LINE
        private String from;    // CHECKSTYLE IGNORE THIS LINE
        private String to;   // CHECKSTYLE IGNORE THIS LINE
        private String source;  // CHECKSTYLE IGNORE THIS LINE
        private String case_sensitive;  // CHECKSTYLE IGNORE THIS LINE
        private String[] enable_sql_convertor_features; // CHECKSTYLE IGNORE THIS LINE
        private String config; // CHECKSTYLE IGNORE THIS LINE

        public ConvertRequest(String originStmt, String dialect, String[] features, String config) {
            this.version = "v1";
            this.sql_query = originStmt;
            this.from = dialect;
            this.to = "doris";
            this.source = "text";
            this.case_sensitive = "0";
            this.enable_sql_convertor_features = features;
            this.config = config;
        }

        public String toJson() {
            return new Gson().toJson(this);
        }
    }

    @Data
    private static class ConvertResponse {
        private String version;    // CHECKSTYLE IGNORE THIS LINE
        private String data;    // CHECKSTYLE IGNORE THIS LINE
        private int code;   // CHECKSTYLE IGNORE THIS LINE
        private String message; // CHECKSTYLE IGNORE THIS LINE

        public String toJson() {
            return new Gson().toJson(this);
        }

        @Override
        public String toString() {
            return toJson();
        }
    }
}