/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.client.hotrod.impl.transport.netty;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import net.jcip.annotations.GuardedBy;
import net.jcip.annotations.ThreadSafe;
import org.infinispan.client.hotrod.CacheTopologyInfo;
import org.infinispan.client.hotrod.FailoverRequestBalancingStrategy;
import org.infinispan.client.hotrod.RemoteCacheManager;
import org.infinispan.client.hotrod.configuration.ClusterConfiguration;
import org.infinispan.client.hotrod.configuration.Configuration;
import org.infinispan.client.hotrod.configuration.ServerConfiguration;
import org.infinispan.client.hotrod.event.impl.ClientListenerNotifier;
import org.infinispan.client.hotrod.impl.ConfigurationProperties;
import org.infinispan.client.hotrod.impl.MarshallerRegistry;
import org.infinispan.client.hotrod.impl.TopologyInfo;
import org.infinispan.client.hotrod.impl.Util;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHash;
import org.infinispan.client.hotrod.impl.consistenthash.ConsistentHashFactory;
import org.infinispan.client.hotrod.impl.consistenthash.SegmentConsistentHash;
import org.infinispan.client.hotrod.impl.operations.OperationsFactory;
import org.infinispan.client.hotrod.impl.protocol.Codec;
import org.infinispan.client.hotrod.impl.topology.CacheInfo;
import org.infinispan.client.hotrod.impl.topology.ClusterInfo;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelInitializer;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelOperation;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelPool;
import org.infinispan.client.hotrod.impl.transport.netty.ChannelRecord;
import org.infinispan.client.hotrod.impl.transport.netty.SecurityActions;
import org.infinispan.client.hotrod.impl.transport.netty.TransportHelper;
import org.infinispan.client.hotrod.logging.Log;
import org.infinispan.client.hotrod.logging.LogFactory;
import org.infinispan.commons.marshall.Marshaller;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.commons.marshall.WrappedBytes;
import org.infinispan.commons.util.Immutables;
import org.infinispan.commons.util.ProcessorInfo;

@ThreadSafe
public class ChannelFactory {
    public static final String DEFAULT_CLUSTER_NAME = "___DEFAULT-CLUSTER___";
    private static final Log log = LogFactory.getLog(ChannelFactory.class, Log.class);
    private static final boolean trace = log.isTraceEnabled();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final ConcurrentMap<SocketAddress, ChannelPool> channelPoolMap = new ConcurrentHashMap<SocketAddress, ChannelPool>();
    private final Function<SocketAddress, ChannelPool> newPool = this::newPool;
    private EventLoopGroup eventLoopGroup;
    private ExecutorService executorService;
    private OperationsFactory operationsFactory;
    private Configuration configuration;
    private int maxRetries;
    private Marshaller marshaller;
    private ClientListenerNotifier listenerNotifier;
    @GuardedBy(value="lock")
    private volatile TopologyInfo topologyInfo;
    private List<ClusterInfo> clusters;
    private MarshallerRegistry marshallerRegistry;
    private LongAdder totalRetries = new LongAdder();
    @GuardedBy(value="lock")
    private CompletableFuture<Void> clusterSwitchStage;
    @GuardedBy(value="lock")
    private final Set<SocketAddress> failedServers = new HashSet<SocketAddress>();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(Codec codec, Configuration configuration, Marshaller marshaller, ExecutorService executorService, ClientListenerNotifier listenerNotifier, MarshallerRegistry marshallerRegistry) {
        this.marshallerRegistry = marshallerRegistry;
        this.lock.writeLock().lock();
        try {
            this.marshaller = marshaller;
            this.configuration = configuration;
            this.executorService = executorService;
            this.listenerNotifier = listenerNotifier;
            int asyncThreads = this.maxAsyncThreads(executorService, configuration);
            int eventLoopThreads = SecurityActions.getIntProperty("io.netty.eventLoopThreads", ProcessorInfo.availableProcessors() * 2);
            int maxExecutors = Math.min(asyncThreads, eventLoopThreads);
            this.eventLoopGroup = TransportHelper.createEventLoopGroup(maxExecutors, executorService);
            ArrayList<InetSocketAddress> initialServers = new ArrayList<InetSocketAddress>();
            for (ServerConfiguration server : configuration.servers()) {
                initialServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
            }
            ClusterInfo mainCluster = new ClusterInfo(DEFAULT_CLUSTER_NAME, initialServers);
            ArrayList<ClusterInfo> clustersDefinitions = new ArrayList<ClusterInfo>();
            if (log.isDebugEnabled()) {
                log.debugf("Statically configured servers: %s", initialServers);
                log.debugf("Tcp no delay = %b; client socket timeout = %d ms; connect timeout = %d ms", configuration.tcpNoDelay(), configuration.socketTimeout(), configuration.connectionTimeout());
            }
            if (!configuration.clusters().isEmpty()) {
                for (ClusterConfiguration clusterConfiguration : configuration.clusters()) {
                    ArrayList<InetSocketAddress> alternateServers = new ArrayList<InetSocketAddress>();
                    for (ServerConfiguration server : clusterConfiguration.getCluster()) {
                        alternateServers.add(InetSocketAddress.createUnresolved(server.host(), server.port()));
                    }
                    ClusterInfo alternateCluster = new ClusterInfo(clusterConfiguration.getClusterName(), alternateServers);
                    log.debugf("Add secondary cluster: %s", alternateCluster);
                    clustersDefinitions.add(alternateCluster);
                }
                clustersDefinitions.add(mainCluster);
            }
            this.clusters = Immutables.immutableListCopy(clustersDefinitions);
            this.topologyInfo = new TopologyInfo(configuration, mainCluster);
            this.operationsFactory = new OperationsFactory(this, codec, listenerNotifier, configuration);
            this.maxRetries = configuration.maxRetries();
            WrappedByteArray defaultCacheName = Util.wrapBytes(RemoteCacheManager.cacheNameBytes());
            this.topologyInfo.getOrCreateCacheInfo((WrappedBytes)defaultCacheName);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        this.pingServersIgnoreException();
    }

    private int maxAsyncThreads(ExecutorService executorService, Configuration configuration) {
        if (executorService instanceof ThreadPoolExecutor) {
            return ((ThreadPoolExecutor)executorService).getMaximumPoolSize();
        }
        return new ConfigurationProperties((Properties)configuration.asyncExecutorFactory().properties()).getDefaultExecutorFactoryPoolSize();
    }

    public MarshallerRegistry getMarshallerRegistry() {
        return this.marshallerRegistry;
    }

    private ChannelPool newPool(SocketAddress address) {
        log.debugf("Creating new channel pool for %s", address);
        Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.eventLoopGroup)).channel(TransportHelper.socketChannel())).remoteAddress(address).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (Object)this.configuration.connectionTimeout())).option(ChannelOption.SO_KEEPALIVE, (Object)this.configuration.tcpKeepAlive())).option(ChannelOption.TCP_NODELAY, (Object)this.configuration.tcpNoDelay())).option(ChannelOption.SO_RCVBUF, (Object)1024576);
        int maxConnections = this.configuration.connectionPool().maxActive();
        if (maxConnections < 0) {
            maxConnections = Integer.MAX_VALUE;
        }
        ChannelInitializer channelInitializer = new ChannelInitializer(bootstrap, address, this.operationsFactory, this.configuration, this);
        bootstrap.handler((ChannelHandler)channelInitializer);
        ChannelPool pool = new ChannelPool((EventExecutor)bootstrap.config().group().next(), address, channelInitializer, this.configuration.connectionPool().exhaustedAction(), this::onConnectionEvent, this.configuration.connectionPool().maxWait(), maxConnections, this.configuration.connectionPool().maxPendingRequests());
        channelInitializer.setChannelPool(pool);
        return pool;
    }

    private void pingServersIgnoreException() {
        Collection<InetSocketAddress> servers = this.topologyInfo.getAllServers();
        for (SocketAddress socketAddress : servers) {
            try {
                Util.await(this.fetchChannelAndInvoke(socketAddress, this.operationsFactory.newPingOperation(true)));
            }
            catch (Exception e) {
                if (!trace) continue;
                log.tracef(e, "Ignoring exception pinging configured servers %s to establish a connection", servers);
            }
        }
    }

    public void destroy() {
        try {
            this.channelPoolMap.values().forEach(ChannelPool::close);
            this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS).get();
            this.executorService.shutdownNow();
        }
        catch (Exception e) {
            log.warn("Exception while shutting down the connection pool.", e);
        }
    }

    public CacheTopologyInfo getCacheTopologyInfo(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            CacheTopologyInfo cacheTopologyInfo = this.topologyInfo.getCacheTopologyInfo(cacheName);
            return cacheTopologyInfo;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T extends ChannelOperation> T fetchChannelAndInvoke(Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        SocketAddress server;
        this.lock.writeLock().lock();
        try {
            CompletableFuture<Void> switchStage;
            if (failedServers != null && (switchStage = this.clusterSwitchStage) != null) {
                switchStage.whenComplete((__, t) -> this.fetchChannelAndInvoke(failedServers, cacheName, operation));
                ChannelOperation t2 = operation;
                return t2;
            }
            CacheInfo cacheInfo = this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName));
            FailoverRequestBalancingStrategy balancer = cacheInfo.getBalancer();
            server = balancer.nextServer(failedServers);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        return this.fetchChannelAndInvoke(server, operation);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void updateConsistentHash1x(List<InetSocketAddress> servers, Map<InetSocketAddress, Set<Integer>> servers2Hash, int numKeyOwners, short hashFunctionVersion, int hashSpace, byte[] cacheName, int topologyId) {
        this.lock.writeLock().lock();
        try {
            WrappedByteArray wrappedCacheName = Util.wrapBytes(cacheName);
            CacheInfo oldCacheInfo = this.topologyInfo.getCacheInfo((WrappedBytes)wrappedCacheName);
            assert (oldCacheInfo != null) : "The cache info must exist before receiving a topology update";
            ConsistentHash consistentHash = this.topologyInfo.createConsistentHash1x(servers2Hash, numKeyOwners, hashFunctionVersion, hashSpace);
            CacheInfo newCacheInfo = oldCacheInfo.withNewHash(this.topologyInfo.getTopologyAge(), topologyId, servers, consistentHash, -1);
            this.updateCacheInfo((WrappedBytes)wrappedCacheName, newCacheInfo, false);
            newCacheInfo.getTopologyIdRef().set(topologyId);
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public <T extends ChannelOperation> T fetchChannelAndInvoke(SocketAddress server, T operation) {
        ChannelPool pool = this.channelPoolMap.computeIfAbsent(server, this.newPool);
        pool.acquire(operation);
        return operation;
    }

    private void closeChannelPools(Set<? extends SocketAddress> failedServers) {
        for (SocketAddress socketAddress : failedServers) {
            log.removingServer(socketAddress);
            ChannelPool pool = (ChannelPool)this.channelPoolMap.remove(socketAddress);
            if (pool == null) continue;
            pool.close();
        }
    }

    public SocketAddress getHashAwareServer(Object key, byte[] cacheName) {
        CacheInfo cacheInfo = this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName));
        if (cacheInfo != null && cacheInfo.getConsistentHash() != null) {
            return cacheInfo.getConsistentHash().getServer(key);
        }
        return null;
    }

    public <T extends ChannelOperation> T fetchChannelAndInvoke(Object key, Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        SocketAddress server;
        CacheInfo cacheInfo = this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName));
        if (!(cacheInfo == null || cacheInfo.getConsistentHash() == null || (server = cacheInfo.getConsistentHash().getServer(key)) == null || failedServers != null && failedServers.contains(server))) {
            return this.fetchChannelAndInvoke(server, operation);
        }
        return this.fetchChannelAndInvoke(failedServers, cacheName, operation);
    }

    public <T extends ChannelOperation> T fetchChannelAndInvokeForSegments(Set<Integer> segments, Set<SocketAddress> failedServers, byte[] cacheName, T operation) {
        SocketAddress hashAwareServer = this.topologyInfo.getHashAwareServer(segments, cacheName);
        if (hashAwareServer != null) {
            return this.fetchChannelAndInvoke(hashAwareServer, operation);
        }
        return this.fetchChannelAndInvoke(failedServers, cacheName, operation);
    }

    public void releaseChannel(Channel channel) {
        if (trace) {
            log.tracef("Releasing channel %s", channel);
        }
        ChannelRecord record = ChannelRecord.of(channel);
        record.getChannelPool().release(channel, record);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void receiveTopology(byte[] cacheName, int responseTopologyAge, int responseTopologyId, List<InetSocketAddress> addresses, SocketAddress[][] segmentOwners, short hashFunctionVersion) {
        WrappedByteArray wrappedCacheName = Util.wrapBytes(cacheName);
        this.lock.writeLock().lock();
        try {
            CacheInfo cacheInfo = this.topologyInfo.getCacheInfo((WrappedBytes)wrappedCacheName);
            assert (cacheInfo != null) : "The cache info must exist before receiving a topology update";
            if (responseTopologyAge == cacheInfo.getTopologyAge() && responseTopologyId > cacheInfo.getTopologyId()) {
                CacheInfo newCacheInfo;
                log.newTopology(responseTopologyId, responseTopologyAge, addresses.size(), addresses);
                if (hashFunctionVersion >= 0) {
                    SegmentConsistentHash consistentHash = this.createConsistentHash(segmentOwners, hashFunctionVersion, cacheInfo.getCacheName());
                    newCacheInfo = cacheInfo.withNewHash(responseTopologyAge, responseTopologyId, addresses, consistentHash, segmentOwners.length);
                } else {
                    newCacheInfo = cacheInfo.withNewServers(responseTopologyAge, responseTopologyId, addresses);
                }
                this.updateCacheInfo((WrappedBytes)wrappedCacheName, newCacheInfo, false);
                newCacheInfo.getTopologyIdRef().set(responseTopologyId);
            } else if (log.isTraceEnabled()) {
                log.tracef("[%s] Ignoring outdated topology: topology id = %s, topology age = %s, servers = %s", new Object[]{cacheInfo.getCacheName(), responseTopologyId, responseTopologyAge, addresses});
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private SegmentConsistentHash createConsistentHash(SocketAddress[][] segmentOwners, short hashFunctionVersion, String cacheNameString) {
        if (log.isTraceEnabled()) {
            if (hashFunctionVersion == 0) {
                log.tracef("[%s] Not using a consistent hash function (hash function version == 0).", cacheNameString);
            } else {
                log.tracef("[%s] Updating client hash function with %s number of segments", cacheNameString, segmentOwners.length);
            }
        }
        return this.topologyInfo.createConsistentHash(segmentOwners.length, hashFunctionVersion, segmentOwners);
    }

    @GuardedBy(value="lock")
    protected void updateCacheInfo(WrappedBytes cacheName, CacheInfo newCacheInfo, boolean quiet) {
        List<InetSocketAddress> newServers = newCacheInfo.getServers();
        CacheInfo oldCacheInfo = this.topologyInfo.getCacheInfo(cacheName);
        List<InetSocketAddress> oldServers = oldCacheInfo.getServers();
        HashSet<InetSocketAddress> addedServers = new HashSet<InetSocketAddress>(newServers);
        addedServers.removeAll(oldServers);
        HashSet<SocketAddress> removedServers = new HashSet<SocketAddress>(oldServers);
        removedServers.removeAll(newServers);
        if (trace) {
            String cacheNameString = newCacheInfo.getCacheName();
            log.tracef("[%s] Current list: %s", cacheNameString, oldServers);
            log.tracef("[%s] New list: %s", cacheNameString, newServers);
            log.tracef("[%s] Added servers: %s", cacheNameString, addedServers);
            log.tracef("[%s] Removed servers: %s", cacheNameString, removedServers);
        }
        for (SocketAddress socketAddress : addedServers) {
            log.newServerAdded(socketAddress);
            this.fetchChannelAndInvoke(socketAddress, new ReleaseChannelOperation(quiet));
        }
        this.topologyInfo.updateCacheInfo(cacheName, oldCacheInfo, newCacheInfo);
        this.closeChannelPools(removedServers);
        if (!removedServers.isEmpty()) {
            this.listenerNotifier.failoverListeners(removedServers);
        }
    }

    public Collection<InetSocketAddress> getServers() {
        this.lock.readLock().lock();
        try {
            Collection<InetSocketAddress> collection = this.topologyInfo.getAllServers();
            return collection;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public List<InetSocketAddress> getServers(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            List<InetSocketAddress> list = this.topologyInfo.getServers((WrappedBytes)Util.wrapBytes(cacheName));
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public ConsistentHash getConsistentHash(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            ConsistentHash consistentHash = this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName)).getConsistentHash();
            return consistentHash;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public ConsistentHashFactory getConsistentHashFactory() {
        return this.topologyInfo.getConsistentHashFactory();
    }

    public boolean isTcpNoDelay() {
        return this.configuration.tcpNoDelay();
    }

    public boolean isTcpKeepAlive() {
        return this.configuration.tcpKeepAlive();
    }

    public int getMaxRetries() {
        return this.maxRetries;
    }

    public AtomicInteger createTopologyId(byte[] cacheName) {
        this.lock.writeLock().lock();
        try {
            AtomicInteger atomicInteger = this.topologyInfo.getOrCreateCacheInfo((WrappedBytes)Util.wrapBytes(cacheName)).getTopologyIdRef();
            return atomicInteger;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    public int getTopologyId(byte[] cacheName) {
        return this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName)).getTopologyId();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void onConnectionEvent(ChannelPool pool, ChannelPool.ChannelEventType type) {
        boolean allInitialServersFailed;
        this.lock.writeLock().lock();
        try {
            if (type == ChannelPool.ChannelEventType.CONNECTED) {
                this.failedServers.remove(pool.getAddress());
                return;
            }
            if (type == ChannelPool.ChannelEventType.CONNECT_FAILED) {
                if (pool.getConnected() <= 0) {
                    this.failedServers.add(pool.getAddress());
                }
            } else {
                return;
            }
            if (trace) {
                log.tracef("Connection attempt failed, we now have %d servers with no established connections: %s", this.failedServers.size(), this.failedServers);
            }
            if (!(allInitialServersFailed = this.failedServers.containsAll(this.topologyInfo.getCluster().getInitialServers()))) {
                this.resetCachesWithFailedServers();
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (allInitialServersFailed && !this.clusters.isEmpty()) {
            this.trySwitchCluster(allInitialServersFailed);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void trySwitchCluster(boolean allInitialServersFailed) {
        ClusterInfo cluster;
        int ageBeforeSwitch;
        this.lock.writeLock().lock();
        try {
            ageBeforeSwitch = this.topologyInfo.getTopologyAge();
            cluster = this.topologyInfo.getCluster();
            if (this.clusterSwitchStage != null) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cluster switch is already in progress for topology age %d", ageBeforeSwitch);
                }
                return;
            }
            this.clusterSwitchStage = new CompletableFuture();
        }
        finally {
            this.lock.writeLock().unlock();
        }
        this.checkServersAlive(cluster.getInitialServers()).thenCompose(alive -> {
            if (alive.booleanValue()) {
                if (log.isTraceEnabled()) {
                    log.tracef("Cluster %s is still alive, not switching", cluster);
                }
                return CompletableFuture.completedFuture(null);
            }
            if (trace) {
                log.tracef("Trying to switch cluster away from '%s'", cluster.getName());
            }
            return this.findLiveCluster(cluster, ageBeforeSwitch);
        }).thenAccept(newCluster -> {
            if (newCluster != null) {
                this.automaticSwitchToCluster((ClusterInfo)newCluster, cluster, ageBeforeSwitch);
            }
        }).whenComplete((__, t) -> this.completeClusterSwitch());
    }

    @GuardedBy(value="lock")
    private void resetCachesWithFailedServers() {
        ArrayList failedCaches = new ArrayList();
        ArrayList<String> nameStrings = new ArrayList<String>();
        this.topologyInfo.forEachCache((cacheNameBytes, cacheInfo) -> {
            if (this.failedServers.containsAll(cacheInfo.getServers())) {
                failedCaches.add(cacheNameBytes);
                nameStrings.add(cacheInfo.getCacheName());
            }
        });
        if (!failedCaches.isEmpty()) {
            log.revertCacheToInitialServerList(nameStrings);
            for (WrappedBytes cacheNameBytes2 : failedCaches) {
                this.topologyInfo.reset(cacheNameBytes2);
            }
        }
    }

    private void completeClusterSwitch() {
        CompletableFuture<Void> localStage;
        this.lock.writeLock().lock();
        try {
            localStage = this.clusterSwitchStage;
            this.clusterSwitchStage = null;
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (localStage != null) {
            localStage.complete(null);
        }
    }

    private CompletionStage<ClusterInfo> findLiveCluster(ClusterInfo failedCluster, int ageBeforeSwitch) {
        ArrayList<ClusterInfo> candidateClusters = new ArrayList<ClusterInfo>();
        for (ClusterInfo cluster : this.clusters) {
            String clusterName = cluster.getName();
            if (clusterName.equals(failedCluster.getName())) continue;
            candidateClusters.add(cluster);
        }
        Iterator<ClusterInfo> clusterIterator = candidateClusters.iterator();
        return this.findLiveCluster0(false, null, clusterIterator, ageBeforeSwitch);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletionStage<ClusterInfo> findLiveCluster0(boolean alive, ClusterInfo testedCluster, Iterator<ClusterInfo> clusterIterator, int ageBeforeSwitch) {
        this.lock.writeLock().lock();
        try {
            if (this.clusterSwitchStage == null || this.topologyInfo.getTopologyAge() != ageBeforeSwitch) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                CompletableFuture<Object> completableFuture = CompletableFuture.completedFuture(null);
                return completableFuture;
            }
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (alive) {
            return CompletableFuture.completedFuture(testedCluster);
        }
        if (!clusterIterator.hasNext()) {
            log.debugf("All cluster addresses viewed and none worked: %s", this.clusters);
            return CompletableFuture.completedFuture(null);
        }
        ClusterInfo nextCluster = clusterIterator.next();
        return this.checkServersAlive(nextCluster.getInitialServers()).thenCompose(aliveNext -> this.findLiveCluster0((boolean)aliveNext, nextCluster, clusterIterator, ageBeforeSwitch));
    }

    private CompletionStage<Boolean> checkServersAlive(Collection<InetSocketAddress> servers) {
        if (servers.isEmpty()) {
            return CompletableFuture.completedFuture(false);
        }
        AtomicInteger remainingResponses = new AtomicInteger(servers.size());
        CompletableFuture<Boolean> allFuture = new CompletableFuture<Boolean>();
        for (SocketAddress socketAddress : servers) {
            this.fetchChannelAndInvoke(socketAddress, this.operationsFactory.newPingOperation(true)).whenComplete((result, throwable) -> {
                if (throwable != null) {
                    if (trace) {
                        log.tracef((Throwable)throwable, "Error checking whether this server is alive: %s", server);
                    }
                    if (remainingResponses.decrementAndGet() == 0) {
                        allFuture.complete(false);
                    }
                } else {
                    log.tracef("Ping to server %s succeeded", server);
                    allFuture.complete(true);
                }
            });
        }
        return allFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void automaticSwitchToCluster(ClusterInfo newCluster, ClusterInfo failedCluster, int ageBeforeSwitch) {
        this.lock.writeLock().lock();
        try {
            if (this.clusterSwitchStage == null || this.topologyInfo.getTopologyAge() != ageBeforeSwitch) {
                log.debugf("Cluster switch already completed by another thread, bailing out", new Object[0]);
                return;
            }
            this.topologyInfo.switchCluster(newCluster);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (!newCluster.getName().equals(DEFAULT_CLUSTER_NAME)) {
            log.switchedToCluster(newCluster.getName());
        } else {
            log.switchedBackToMainCluster();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public boolean manualSwitchToCluster(String clusterName) {
        if (this.clusters.isEmpty()) {
            log.debugf("No alternative clusters configured, so can't switch cluster", new Object[0]);
            return false;
        }
        ClusterInfo cluster = this.findCluster(clusterName);
        if (cluster == null) {
            log.debugf("Cluster named %s does not exist in the configuration", clusterName);
            return false;
        }
        this.lock.writeLock().lock();
        boolean shouldComplete = false;
        try {
            if (this.clusterSwitchStage != null) {
                log.debugf("Another cluster switch is already in progress, overriding it", new Object[0]);
                shouldComplete = true;
            }
            log.debugf("Switching to cluster %s, servers: %s", clusterName, cluster.getInitialServers());
            this.topologyInfo.switchCluster(cluster);
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (log.isInfoEnabled()) {
            if (!clusterName.equals(DEFAULT_CLUSTER_NAME)) {
                log.manuallySwitchedToCluster(clusterName);
            } else {
                log.manuallySwitchedBackToMainCluster();
            }
        }
        if (shouldComplete) {
            this.completeClusterSwitch();
        }
        return true;
    }

    public Marshaller getMarshaller() {
        return this.marshaller;
    }

    public String getCurrentClusterName() {
        return this.topologyInfo.getCluster().getName();
    }

    public int getTopologyAge() {
        return this.topologyInfo.getTopologyAge();
    }

    private ClusterInfo findCluster(String clusterName) {
        for (ClusterInfo cluster : this.clusters) {
            if (!cluster.getName().equals(clusterName)) continue;
            return cluster;
        }
        return null;
    }

    public FailoverRequestBalancingStrategy getBalancer(byte[] cacheName) {
        this.lock.readLock().lock();
        try {
            FailoverRequestBalancingStrategy failoverRequestBalancingStrategy = this.topologyInfo.getCacheInfo((WrappedBytes)Util.wrapBytes(cacheName)).getBalancer();
            return failoverRequestBalancingStrategy;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public int socketTimeout() {
        return this.configuration.socketTimeout();
    }

    public int getNumActive(SocketAddress address) {
        ChannelPool pool = (ChannelPool)this.channelPoolMap.get(address);
        return pool == null ? 0 : pool.getActive();
    }

    public int getNumIdle(SocketAddress address) {
        ChannelPool pool = (ChannelPool)this.channelPoolMap.get(address);
        return pool == null ? 0 : pool.getIdle();
    }

    public int getNumActive() {
        return this.channelPoolMap.values().stream().mapToInt(ChannelPool::getActive).sum();
    }

    public int getNumIdle() {
        return this.channelPoolMap.values().stream().mapToInt(ChannelPool::getIdle).sum();
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public long getRetries() {
        return this.totalRetries.longValue();
    }

    public void incrementRetryCount() {
        this.totalRetries.increment();
    }

    private class ReleaseChannelOperation
    implements ChannelOperation {
        private final boolean quiet;

        private ReleaseChannelOperation(boolean quiet) {
            this.quiet = quiet;
        }

        @Override
        public void invoke(Channel channel) {
            ChannelFactory.this.releaseChannel(channel);
        }

        @Override
        public void cancel(SocketAddress address, Throwable cause) {
            if (!this.quiet) {
                log.failedAddingNewServer(address, cause);
            }
        }
    }
}

