MetaServiceListResolverProvider.java

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.cloud.rpc;

import org.apache.doris.system.SystemInfoService;

import io.grpc.EquivalentAddressGroup;
import io.grpc.NameResolver;
import io.grpc.NameResolver.Args;
import io.grpc.NameResolverProvider;
import io.grpc.Status;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MetaServiceListResolverProvider extends NameResolverProvider {
    public static final Logger LOG = LogManager.getLogger(MetaServiceClient.class);
    public static final String MS_LIST_SCHEME = "ms-list";
    public static final String MS_LIST_SCHEME_PREFIX = MS_LIST_SCHEME + "://";

    @Override
    protected boolean isAvailable() {
        return true;
    }

    @Override
    protected int priority() {
        return 5;
    }

    @Override
    public NameResolver newNameResolver(URI targetUri, Args args) {
        return new MetaServiceListResolver(targetUri);
    }

    @Override
    public String getDefaultScheme() {
        return MS_LIST_SCHEME;
    }

    static class MetaServiceListResolver extends NameResolver {

        private Listener2 listener;

        private final URI uri;

        public MetaServiceListResolver(URI targetUri) {
            this.uri = targetUri;
        }

        private String getNamingResolverUrl() {
            return uri.getAuthority();
        }

        @Override
        public String getServiceAuthority() {
            // Be consistent with behavior in grpc-go, authority is saved in Host field of
            // URI.
            if (uri.getHost() != null) {
                return uri.getHost();
            }
            return "no host";
        }

        @Override
        public void shutdown() {
        }

        @Override
        public void start(Listener2 listener) {
            this.listener = listener;
            this.resolve();
        }

        private void resolve() {
            try {
                List<SocketAddress> endpoints = new ArrayList<>();
                for (String endpoint : getNamingResolverUrl().split(",")) {
                    if (endpoint.isEmpty()) {
                        continue;
                    }
                    SystemInfoService.HostInfo info = SystemInfoService.getHostAndPort(endpoint);
                    endpoints.add(new InetSocketAddress(info.getHost(), info.getPort()));
                }

                List<EquivalentAddressGroup> equivalentAddressGroup = endpoints.stream()
                        // every socket address is a single EquivalentAddressGroup, so they can be
                        // accessed randomly
                        .map(Arrays::asList)
                        .map(this::addrToEquivalentAddressGroup)
                        .collect(Collectors.toList());

                ResolutionResult resolutionResult = ResolutionResult.newBuilder()
                        .setAddresses(equivalentAddressGroup)
                        .build();

                this.listener.onResult(resolutionResult);

            } catch (Exception e) {
                // when error occurs, notify listener
                this.listener.onError(Status.UNAVAILABLE
                        .withDescription(
                                "Unable to resolve host for meta service endpoint list: " + getNamingResolverUrl())
                        .withCause(e));
            }
        }

        private EquivalentAddressGroup addrToEquivalentAddressGroup(List<SocketAddress> addrList) {
            return new EquivalentAddressGroup(addrList);
        }
    }

}