/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.Booleans;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ClusterSettings;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.CountDown;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.transport.RemoteClusterAwareClient;
import org.elasticsearch.transport.RemoteClusterConnection;
import org.elasticsearch.transport.RemoteConnectionInfo;
import org.elasticsearch.transport.RemoteTransportException;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportService;

public final class RemoteClusterService
extends RemoteClusterAware
implements Closeable {
    /**
     * Node-scoped integer setting {@code search.remote.connections_per_cluster}. Declared with the
     * values 3 and 1 (presumably default and minimum -- confirm against {@code Setting.intSetting}).
     * Read once in the constructor and passed to every {@link RemoteClusterConnection} created here.
     */
    public static final Setting<Integer> REMOTE_CONNECTIONS_PER_CLUSTER = Setting.intSetting("search.remote.connections_per_cluster", 3, 1, Setting.Property.NodeScope);
    /**
     * Node-scoped time setting {@code search.remote.initial_connect_timeout}, declared with a value of
     * 30 seconds. Used by {@link #initializeRemoteClusters()} as the maximum time to wait for the
     * initial connections.
     */
    public static final Setting<TimeValue> REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING = Setting.positiveTimeSetting("search.remote.initial_connect_timeout", TimeValue.timeValueSeconds(30L), Setting.Property.NodeScope);
    /**
     * Node-scoped string setting {@code search.remote.node.attr}. When it exists, its value names a
     * node attribute that must parse as {@code true} for a remote node to pass the node predicate
     * built in {@code updateRemoteClusters}.
     */
    public static final Setting<String> REMOTE_NODE_ATTRIBUTE = Setting.simpleString("search.remote.node.attr", Setting.Property.NodeScope);
    /**
     * Node-scoped boolean setting {@code search.remote.connect}, declared with the value {@code true}.
     * It is not read anywhere in this class.
     */
    public static final Setting<Boolean> ENABLE_REMOTE_CLUSTERS = Setting.boolSetting("search.remote.connect", true, Setting.Property.NodeScope);
    /**
     * Dynamic, node-scoped boolean affix setting with prefix {@code search.remote.} and suffix
     * {@code skip_unavailable}, declared with the value {@code false}. Changes are forwarded to
     * {@link #updateSkipUnavailable(String, Boolean)} once {@link #listenForUpdates(ClusterSettings)}
     * has been called. {@code REMOTE_CLUSTERS_SEEDS} is passed as the trailing argument
     * (presumably a dependency on the seeds setting -- confirm against {@code Setting.affixKeySetting}).
     */
    public static final Setting.AffixSetting<Boolean> REMOTE_CLUSTER_SKIP_UNAVAILABLE = Setting.affixKeySetting("search.remote.", "skip_unavailable", key -> Setting.boolSetting(key, false, Setting.Property.NodeScope, Setting.Property.Dynamic), REMOTE_CLUSTERS_SEEDS);
    /** Transport service handed to every {@link RemoteClusterConnection} and to remote-cluster clients. */
    private final TransportService transportService;
    /** Value of {@link #REMOTE_CONNECTIONS_PER_CLUSTER} captured at construction time. */
    private final int numRemoteConnections;
    /**
     * Registered remote clusters keyed by cluster alias. The map is never mutated in place: updates
     * build a new map and publish it through this volatile field as an unmodifiable view.
     */
    private volatile Map<String, RemoteClusterConnection> remoteClusters = Collections.emptyMap();

    /**
     * Creates a service with no registered remote clusters.
     *
     * @param settings         node settings; forwarded to the superclass and used to resolve
     *                         {@link #REMOTE_CONNECTIONS_PER_CLUSTER}
     * @param transportService transport service used for all remote cluster connections
     */
    RemoteClusterService(Settings settings, TransportService transportService) {
        super(settings);
        this.numRemoteConnections = REMOTE_CONNECTIONS_PER_CLUSTER.get(settings);
        this.transportService = transportService;
    }

    /**
     * Applies a batch of seed-node updates to the registered remote clusters and publishes the
     * resulting map.
     * <p>
     * For every entry in {@code seeds}:
     * <ul>
     *   <li>an empty seed list closes and unregisters the cluster (if it was registered);</li>
     *   <li>a non-empty seed list creates the connection if the alias is new, then updates its
     *       seed nodes.</li>
     * </ul>
     * {@code connectionListener} is completed exactly once: with a response once every entry has
     * been processed (removals count as processed), or with the first failure reported by a seed
     * update. An empty {@code seeds} map leaves the registered clusters untouched and completes the
     * listener immediately.
     *
     * @param seeds              seed nodes keyed by cluster alias; the empty-string alias is rejected
     * @param connectionListener notified when the update has completed or failed
     * @throws IllegalArgumentException if {@code seeds} contains the empty string as a key
     */
    private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
        if (seeds.containsKey("")) {
            throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
        }
        if (seeds.isEmpty()) {
            // Nothing to change. Do NOT republish an empty map here: that would silently drop
            // (and never close) every connection that is currently registered.
            connectionListener.onResponse(null);
            return;
        }

        final Map<String, RemoteClusterConnection> updatedClusters = new HashMap<>(this.remoteClusters);
        final CountDown countDown = new CountDown(seeds.size());
        Predicate<DiscoveryNode> nodePredicate = node -> Version.CURRENT.isCompatible(node.getVersion());
        if (REMOTE_NODE_ATTRIBUTE.exists(this.settings)) {
            // Only nodes whose configured attribute parses as true are eligible.
            final String attribute = REMOTE_NODE_ATTRIBUTE.get(this.settings);
            nodePredicate = nodePredicate.and(node -> Booleans.parseBoolean(node.getAttributes().getOrDefault(attribute, "false")));
        }

        // Removals complete synchronously; they are acknowledged after the new map is published
        // so that the listener observes the updated registry.
        int removedClusters = 0;
        for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
            final String clusterAlias = entry.getKey();
            final List<DiscoveryNode> seedNodes = entry.getValue();
            RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
            if (seedNodes.isEmpty()) {
                // No seed nodes means the cluster is being removed.
                try {
                    IOUtils.close(remote);
                } catch (IOException e) {
                    this.logger.warn("failed to close remote cluster connections for cluster: " + clusterAlias, e);
                }
                updatedClusters.remove(clusterAlias);
                removedClusters++;
                continue;
            }
            if (remote == null) {
                // Previously unknown alias: create its connection before updating the seeds.
                remote = new RemoteClusterConnection(this.settings, clusterAlias, seedNodes, this.transportService, this.numRemoteConnections, nodePredicate);
                updatedClusters.put(clusterAlias, remote);
            }
            final RemoteClusterConnection finalRemote = remote;
            remote.updateSeedNodes(seedNodes, ActionListener.wrap(response -> {
                if (countDown.countDown()) {
                    connectionListener.onResponse(response);
                }
            }, exception -> {
                // fastForward() returns true only for the first failure, so the listener fails once.
                if (countDown.fastForward()) {
                    connectionListener.onFailure(exception);
                }
                if (finalRemote.isClosed() == false) {
                    this.logger.warn("failed to update seed list for cluster: " + clusterAlias, exception);
                }
            }));
        }
        this.remoteClusters = Collections.unmodifiableMap(updatedClusters);

        // Account for removed clusters; without this the listener would never be completed
        // whenever at least one entry carried an empty seed list.
        for (int i = 0; i < removedClusters; i++) {
            if (countDown.countDown()) {
                connectionListener.onResponse(null);
            }
        }
    }

    /**
     * Returns {@code true} if at least one remote cluster is currently registered.
     */
    public boolean isCrossClusterSearchEnabled() {
        final Map<String, RemoteClusterConnection> registered = this.remoteClusters;
        return registered.isEmpty() == false;
    }

    /**
     * Returns {@code true} if the given node is connected through the given remote cluster.
     *
     * @param remoteCluster alias of the remote cluster
     * @param node          node to check
     * @return the connection's answer, or {@code false} when no cluster is registered under
     *         {@code remoteCluster} (previously this case failed with a NullPointerException)
     */
    boolean isRemoteNodeConnected(String remoteCluster, DiscoveryNode node) {
        final RemoteClusterConnection connection = this.remoteClusters.get(remoteCluster);
        return connection != null && connection.isNodeConnected(node);
    }

    /**
     * Groups the given index expressions by cluster alias.
     * <p>
     * The result always contains an entry for the empty-string key. When no remote cluster is
     * registered, that entry holds all of {@code indices}. Otherwise the grouping is delegated to
     * {@code groupClusterIndices}, and an empty entry is added for the empty-string key if the
     * grouping produced none.
     *
     * @param indicesOptions options attached to every produced {@link OriginalIndices}
     * @param indices        index expressions to group
     * @param indexExists    predicate forwarded to {@code groupClusterIndices}
     * @return a mutable map from cluster alias to the indices requested for that alias
     */
    public Map<String, OriginalIndices> groupIndices(IndicesOptions indicesOptions, String[] indices, Predicate<String> indexExists) {
        final Map<String, OriginalIndices> indicesByCluster = new HashMap<>();
        if (this.isCrossClusterSearchEnabled() == false) {
            indicesByCluster.put("", new OriginalIndices(indices, indicesOptions));
            return indicesByCluster;
        }
        final Map<String, List<String>> grouped = this.groupClusterIndices(indices, indexExists);
        grouped.forEach((clusterAlias, names) ->
            indicesByCluster.put(clusterAlias, new OriginalIndices(names.toArray(new String[names.size()]), indicesOptions)));
        // Guarantee an entry for the empty-string key even if nothing was grouped under it.
        indicesByCluster.computeIfAbsent("", alias -> new OriginalIndices(Strings.EMPTY_ARRAY, indicesOptions));
        return indicesByCluster;
    }

    /**
     * Returns {@code true} if a remote cluster is registered under the given alias.
     *
     * @param clusterName alias to look up
     */
    boolean isRemoteClusterRegistered(String clusterName) {
        final Map<String, RemoteClusterConnection> registered = this.remoteClusters;
        return registered.containsKey(clusterName);
    }

    /**
     * Fetches the search shards for the given indices from every listed remote cluster and reports
     * the combined result.
     * <p>
     * One request is sent per cluster. Once all clusters have answered, {@code listener} receives
     * the responses keyed by cluster alias, or fails with a {@link RemoteTransportException} if any
     * cluster reported a failure (earlier failures are attached as suppressed exceptions). An empty
     * {@code remoteIndicesByCluster} completes the listener immediately with an empty map.
     *
     * @param indicesOptions         indices options applied to every request
     * @param preference             preference applied to every request
     * @param routing                routing applied to every request
     * @param remoteIndicesByCluster indices to resolve, keyed by remote cluster alias
     * @param listener               receives the per-cluster responses or the failure
     * @throws IllegalArgumentException if any alias is not a registered remote cluster; this is
     *                                  checked for all aliases before any request is sent
     */
    public void collectSearchShards(IndicesOptions indicesOptions, String preference, String routing, Map<String, OriginalIndices> remoteIndicesByCluster, final ActionListener<Map<String, ClusterSearchShardsResponse>> listener) {
        final Map<String, ClusterSearchShardsResponse> searchShardsResponses = new ConcurrentHashMap<>();
        if (remoteIndicesByCluster.isEmpty()) {
            // With nothing to wait for the count-down would never fire; answer right away.
            listener.onResponse(searchShardsResponses);
            return;
        }

        // Work on a single snapshot of the volatile map and validate every alias up front, so an
        // unknown cluster is rejected before any request has been dispatched.
        final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
        for (String clusterName : remoteIndicesByCluster.keySet()) {
            if (remoteClusters.containsKey(clusterName) == false) {
                throw new IllegalArgumentException("no such remote cluster: " + clusterName);
            }
        }

        final CountDown responsesCountDown = new CountDown(remoteIndicesByCluster.size());
        final AtomicReference<RemoteTransportException> transportException = new AtomicReference<>();
        for (Map.Entry<String, OriginalIndices> entry : remoteIndicesByCluster.entrySet()) {
            final String clusterName = entry.getKey();
            final RemoteClusterConnection remoteClusterConnection = remoteClusters.get(clusterName);
            final String[] indices = entry.getValue().indices();
            final ClusterSearchShardsRequest searchShardsRequest = new ClusterSearchShardsRequest(indices)
                .indicesOptions(indicesOptions)
                .local(true)
                .preference(preference)
                .routing(routing);
            remoteClusterConnection.fetchSearchShards(searchShardsRequest, new ActionListener<ClusterSearchShardsResponse>() {

                @Override
                public void onResponse(ClusterSearchShardsResponse clusterSearchShardsResponse) {
                    searchShardsResponses.put(clusterName, clusterSearchShardsResponse);
                    if (responsesCountDown.countDown()) {
                        // Last answer: succeed only if no cluster failed along the way.
                        final RemoteTransportException exception = transportException.get();
                        if (exception == null) {
                            listener.onResponse(searchShardsResponses);
                        } else {
                            listener.onFailure(exception);
                        }
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    RemoteTransportException exception = new RemoteTransportException("error while communicating with remote cluster [" + clusterName + "]", e);
                    if (transportException.compareAndSet(null, exception) == false) {
                        // Not the first failure: keep the newest one and chain the previous one to it.
                        exception = transportException.accumulateAndGet(exception, (previous, current) -> {
                            current.addSuppressed(previous);
                            return current;
                        });
                    }
                    if (responsesCountDown.countDown()) {
                        listener.onFailure(exception);
                    }
                }
            });
        }
    }

    /**
     * Returns a connection to the given node of the given remote cluster.
     *
     * @param node    target node
     * @param cluster alias of the remote cluster
     * @return the connection supplied by the cluster's {@link RemoteClusterConnection}
     * @throws IllegalArgumentException if no remote cluster is registered under {@code cluster}
     */
    public Transport.Connection getConnection(DiscoveryNode node, String cluster) {
        final RemoteClusterConnection remote = this.remoteClusters.get(cluster);
        if (remote != null) {
            return remote.getConnection(node);
        }
        throw new IllegalArgumentException("no such remote cluster: " + cluster);
    }

    /**
     * Asks the given remote cluster's connection to ensure it is connected.
     *
     * @param clusterAlias alias of the remote cluster
     * @param listener     forwarded to the cluster's {@link RemoteClusterConnection}
     * @throws IllegalArgumentException if no remote cluster is registered under {@code clusterAlias}
     */
    public void ensureConnected(String clusterAlias, ActionListener<Void> listener) {
        final RemoteClusterConnection remote = this.remoteClusters.get(clusterAlias);
        if (remote != null) {
            remote.ensureConnected(listener);
            return;
        }
        throw new IllegalArgumentException("no such remote cluster: " + clusterAlias);
    }

    /**
     * Returns a connection to the given remote cluster, leaving the choice of node to the
     * cluster's {@link RemoteClusterConnection}.
     *
     * @param cluster alias of the remote cluster
     * @throws IllegalArgumentException if no remote cluster is registered under {@code cluster}
     */
    public Transport.Connection getConnection(String cluster) {
        final RemoteClusterConnection remote = this.remoteClusters.get(cluster);
        if (remote != null) {
            return remote.getConnection();
        }
        throw new IllegalArgumentException("no such remote cluster: " + cluster);
    }

    /**
     * Returns the aliases of all currently registered remote clusters, as the key-set view of the
     * current (unmodifiable) registry map.
     */
    @Override
    protected Set<String> getRemoteClusterNames() {
        final Map<String, RemoteClusterConnection> registered = this.remoteClusters;
        return registered.keySet();
    }

    /**
     * Registers this service for settings updates: first whatever the superclass listens for, then
     * changes to {@link #REMOTE_CLUSTER_SKIP_UNAVAILABLE}, which are routed to
     * {@link #updateSkipUnavailable(String, Boolean)}. The validator registered for that setting
     * does nothing.
     *
     * @param clusterSettings settings registry to subscribe to
     */
    @Override
    public void listenForUpdates(ClusterSettings clusterSettings) {
        super.listenForUpdates(clusterSettings);
        clusterSettings.addAffixUpdateConsumer(
            REMOTE_CLUSTER_SKIP_UNAVAILABLE,
            (clusterAlias, skipUnavailable) -> this.updateSkipUnavailable(clusterAlias, skipUnavailable),
            (clusterAlias, skipUnavailable) -> {});
    }

    /**
     * Forwards a new {@code skip_unavailable} value to the connection of the given remote cluster.
     * Does nothing when no cluster is registered under that alias.
     *
     * @param clusterAlias    alias of the remote cluster
     * @param skipUnavailable new value of the setting
     */
    synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavailable) {
        final RemoteClusterConnection connection = this.remoteClusters.get(clusterAlias);
        if (connection == null) {
            return;
        }
        connection.updateSkipUnavailable(skipUnavailable);
    }

    /**
     * Updates the seed addresses of a single remote cluster without reporting the outcome to the
     * caller: the listener passed on ignores both the response and any failure.
     *
     * @param clusterAlias alias of the remote cluster
     * @param addresses    new seed addresses
     */
    @Override
    protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
        final ActionListener<Void> ignoreOutcome = ActionListener.wrap(response -> {}, failure -> {});
        this.updateRemoteCluster(clusterAlias, addresses, ignoreOutcome);
    }

    /**
     * Updates the seed addresses of a single remote cluster.
     * <p>
     * Each address becomes a {@link DiscoveryNode} whose id is {@code clusterAlias + "#" + address}
     * and whose version is the minimum version compatible with the current one. The resulting list
     * is applied through {@code updateRemoteClusters} as a single-entry update.
     *
     * @param clusterAlias       alias of the remote cluster
     * @param addresses          new seed addresses
     * @param connectionListener forwarded to {@code updateRemoteClusters}
     */
    void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses, ActionListener<Void> connectionListener) {
        final List<DiscoveryNode> seedNodes = addresses.stream()
            .map(TransportAddress::new)
            .map(transportAddress -> new DiscoveryNode(
                clusterAlias + "#" + transportAddress.toString(),
                transportAddress,
                Version.CURRENT.minimumCompatibilityVersion()))
            .collect(Collectors.toList());
        final Map<String, List<DiscoveryNode>> singleClusterSeeds = Collections.singletonMap(clusterAlias, seedNodes);
        this.updateRemoteClusters(singleClusterSeeds, connectionListener);
    }

    /**
     * Registers the remote clusters configured in the node settings and blocks until they are
     * connected or {@link #REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING} elapses.
     * <p>
     * A timeout is only logged as a warning. If the waiting thread is interrupted, its interrupt
     * flag is restored and the method returns.
     *
     * @throws IllegalStateException if connecting fails for any other reason
     */
    void initializeRemoteClusters() {
        final TimeValue connectTimeout = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(this.settings);
        final PlainActionFuture<Void> connected = new PlainActionFuture<>();
        this.updateRemoteClusters(RemoteClusterAware.buildRemoteClustersSeeds(this.settings), connected);
        try {
            connected.get(connectTimeout.millis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException interrupted) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
        } catch (TimeoutException timedOut) {
            this.logger.warn("failed to connect to remote clusters within {}", connectTimeout.toString());
        } catch (Exception failure) {
            throw new IllegalStateException("failed to connect to remote clusters", failure);
        }
    }

    /**
     * Closes the connections of all currently registered remote clusters.
     *
     * @throws IOException if closing the connections fails
     */
    @Override
    public void close() throws IOException {
        final Collection<RemoteClusterConnection> connections = this.remoteClusters.values();
        IOUtils.close(connections);
    }

    /**
     * Collects connection information from every registered remote cluster.
     * <p>
     * With no registered clusters the listener is completed immediately with an empty list.
     * Otherwise each connection reports into a {@link GroupedActionListener} sized to the number of
     * clusters, which in turn feeds {@code listener}.
     *
     * @param listener receives the collected {@link RemoteConnectionInfo} objects
     */
    public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
        final Map<String, RemoteClusterConnection> snapshot = this.remoteClusters;
        if (snapshot.isEmpty()) {
            listener.onResponse(Collections.emptyList());
            return;
        }
        final GroupedActionListener<RemoteConnectionInfo> grouped = new GroupedActionListener<RemoteConnectionInfo>(listener, snapshot.size(), Collections.emptyList());
        snapshot.values().forEach(connection -> connection.getConnectionInfo(grouped));
    }

    /**
     * Collects the nodes of the given remote clusters and hands the caller a lookup function.
     * <p>
     * On success {@code listener} receives a function taking {@code (clusterAlias, nodeId)} that
     * delegates to the per-cluster lookup and returns {@code null} for an alias that was not
     * collected. The listener fails with the first error reported by any cluster, or with an
     * {@link IllegalArgumentException} if an alias is not registered (in which case no request is
     * sent). An empty {@code clusters} set completes the listener immediately with a lookup that
     * always returns {@code null}.
     *
     * @param clusters aliases of the remote clusters to query
     * @param listener receives the lookup function or the failure
     */
    public void collectNodes(Set<String> clusters, final ActionListener<BiFunction<String, String, DiscoveryNode>> listener) {
        // Single snapshot so validation and dispatch see the same registry.
        final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
        for (String cluster : clusters) {
            if (remoteClusters.containsKey(cluster) == false) {
                listener.onFailure(new IllegalArgumentException("no such remote cluster: [" + cluster + "]"));
                return;
            }
        }
        if (clusters.isEmpty()) {
            // With nothing to wait for the count-down would never fire; answer right away.
            listener.onResponse((clusterAlias, nodeId) -> null);
            return;
        }

        final Map<String, Function<String, DiscoveryNode>> clusterMap = new HashMap<>();
        final CountDown countDown = new CountDown(clusters.size());
        final Function<String, DiscoveryNode> nullFunction = s -> null;
        for (final String cluster : clusters) {
            final RemoteClusterConnection connection = remoteClusters.get(cluster);
            connection.collectNodes(new ActionListener<Function<String, DiscoveryNode>>() {

                @Override
                public void onResponse(Function<String, DiscoveryNode> nodeLookup) {
                    // Responses may arrive concurrently; guard the plain HashMap while filling it.
                    synchronized (clusterMap) {
                        clusterMap.put(cluster, nodeLookup);
                    }
                    if (countDown.countDown()) {
                        listener.onResponse((clusterAlias, nodeId) -> clusterMap.getOrDefault(clusterAlias, nullFunction).apply(nodeId));
                    }
                }

                @Override
                public void onFailure(Exception e) {
                    // fastForward() is true only for the first failure, so the listener fails once.
                    if (countDown.fastForward()) {
                        listener.onFailure(e);
                    }
                }
            });
        }
    }

    /**
     * Creates a client that targets the given remote cluster.
     *
     * @param threadPool   thread pool passed to the created client
     * @param clusterAlias alias of the remote cluster
     * @return a new {@link RemoteClusterAwareClient} for {@code clusterAlias}
     * @throws IllegalArgumentException if the alias is not among the remote cluster names known to
     *                                  the transport service's remote cluster service
     */
    public Client getRemoteClusterClient(ThreadPool threadPool, String clusterAlias) {
        final Set<String> knownAliases = this.transportService.getRemoteClusterService().getRemoteClusterNames();
        if (knownAliases.contains(clusterAlias)) {
            return new RemoteClusterAwareClient(this.settings, threadPool, this.transportService, clusterAlias);
        }
        throw new IllegalArgumentException("unknown cluster alias [" + clusterAlias + "]");
    }
}

