/*
 * Decompiled with CFR 0.152.
 */
package iep.io.reactivex.netty.client;

import iep.io.reactivex.netty.channel.ObservableConnection;
import iep.io.reactivex.netty.client.ClientChannelFactory;
import iep.io.reactivex.netty.client.ClientConnectionFactory;
import iep.io.reactivex.netty.client.ClientMetricsEvent;
import iep.io.reactivex.netty.client.ConnectionPool;
import iep.io.reactivex.netty.client.MaxConnectionsBasedStrategy;
import iep.io.reactivex.netty.client.PoolConfig;
import iep.io.reactivex.netty.client.PoolExhaustedException;
import iep.io.reactivex.netty.client.PoolLimitDeterminationStrategy;
import iep.io.reactivex.netty.client.PooledConnection;
import iep.io.reactivex.netty.client.PooledConnectionFactory;
import iep.io.reactivex.netty.client.PooledConnectionReleasedEvent;
import iep.io.reactivex.netty.client.RxClient;
import iep.io.reactivex.netty.metrics.Clock;
import iep.io.reactivex.netty.metrics.MetricEventsListener;
import iep.io.reactivex.netty.metrics.MetricEventsSubject;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.Action1;
import rx.observers.Subscribers;

/**
 * {@link ConnectionPool} implementation that keeps released connections in an idle queue and hands
 * them out again on subsequent {@link #acquire()} calls.
 *
 * <p>Behaviour visible in this class:
 * <ul>
 *   <li>Idle connections are stored in a {@link ConcurrentLinkedQueue}.</li>
 *   <li>When no idle connection is available, a new one is created only if the configured
 *       {@link PoolLimitDeterminationStrategy} grants a creation permit; otherwise the acquire
 *       fails with a {@link PoolExhaustedException}.</li>
 *   <li>{@link #acquire()} and the idle-connection cleanup task run under the read side of
 *       {@code shutdownLock}; the actual shutdown runs under the write side.</li>
 *   <li>{@link #shutdown()} is deferred until the count of acquired connections is zero.</li>
 * </ul>
 *
 * @param <I> first type parameter of the pooled {@link ObservableConnection}
 * @param <O> second type parameter of the pooled {@link ObservableConnection}
 */
public class ConnectionPoolImpl<I, O> implements ConnectionPool<I, O> {
    private static final Logger logger = LoggerFactory.getLogger(ConnectionPoolImpl.class);

    /** Lower bound, in milliseconds, for the delay between two idle-connection cleanup runs. */
    private static final long MIN_IDLE_CLEANUP_INTERVAL_MILLIS = 30L;

    /** Delay, in milliseconds, before a shutdown that could not complete is attempted again. */
    private static final long SHUTDOWN_RETRY_INTERVAL_MILLIS = 200L;

    /** Shared exception instance; this class itself creates a fresh exception per failure. */
    @Deprecated
    public static final PoolExhaustedException POOL_EXHAUSTED_EXCEPTION = new PoolExhaustedException("Rx Connection Pool exhausted.");

    private final ConcurrentLinkedQueue<PooledConnection<I, O>> idleConnections;
    private final ClientChannelFactory<I, O> channelFactory;
    private final ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory;
    private final PoolLimitDeterminationStrategy limitDeterminationStrategy;
    private final MetricEventsSubject<ClientMetricsEvent<?>> metricEventsSubject;
    private final RxClient.ServerInfo serverInfo;
    private final PoolConfig poolConfig;
    private final ScheduledExecutorService cleanupScheduler;
    /** Number of connections currently handed out (or being created) and not yet released. */
    private final AtomicInteger aquiredConnectionsCounter = new AtomicInteger();
    private final AtomicBoolean isShutdownRequested = new AtomicBoolean();
    private final AtomicBoolean isShutdownPerformed = new AtomicBoolean();
    private final ReadWriteLock shutdownLock = new ReentrantReadWriteLock();
    /** Handle of the scheduled cleanup task; {@code null} when no scheduler was supplied. */
    private final ScheduledFuture<?> idleConnCleanupScheduleFuture;

    /**
     * Creates a pool that builds its connections with a {@link PooledConnectionFactory}.
     *
     * @param serverInfo       server passed to the channel factory when connecting
     * @param poolConfig       pool configuration; its max idle time is read here
     * @param strategy         limit strategy, or {@code null} to use a {@link MaxConnectionsBasedStrategy}
     * @param cleanupScheduler scheduler for the idle-connection cleanup, or {@code null} to disable it
     * @param channelFactory   factory used to establish new connections
     * @param eventsSubject    subject on which pool metric events are published
     */
    public ConnectionPoolImpl(RxClient.ServerInfo serverInfo, PoolConfig poolConfig, PoolLimitDeterminationStrategy strategy, ScheduledExecutorService cleanupScheduler, ClientChannelFactory<I, O> channelFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        this(serverInfo, poolConfig, strategy, cleanupScheduler, new PooledConnectionFactory<I, O>(poolConfig, eventsSubject), channelFactory, eventsSubject);
    }

    /**
     * Creates a pool with an explicit connection factory.
     *
     * @param serverInfo        server passed to the channel factory when connecting
     * @param poolConfig        pool configuration; its max idle time is read here
     * @param strategy          limit strategy, or {@code null} to use a {@link MaxConnectionsBasedStrategy}
     * @param cleanupScheduler  scheduler for the idle-connection cleanup, or {@code null} to disable it
     * @param connectionFactory factory passed to the channel factory for new connections
     * @param channelFactory    factory used to establish new connections
     * @param eventsSubject     subject on which pool metric events are published
     */
    public ConnectionPoolImpl(RxClient.ServerInfo serverInfo, PoolConfig poolConfig, PoolLimitDeterminationStrategy strategy, ScheduledExecutorService cleanupScheduler, ClientConnectionFactory<I, O, PooledConnection<I, O>> connectionFactory, ClientChannelFactory<I, O> channelFactory, MetricEventsSubject<ClientMetricsEvent<?>> eventsSubject) {
        this.serverInfo = serverInfo;
        this.poolConfig = poolConfig;
        this.cleanupScheduler = cleanupScheduler;
        this.connectionFactory = connectionFactory;
        this.channelFactory = channelFactory;
        this.metricEventsSubject = eventsSubject;
        long scheduleDurationMillis = Math.max(MIN_IDLE_CLEANUP_INTERVAL_MILLIS, this.poolConfig.getMaxIdleTimeMillis());
        // Everything the cleanup task reads must be assigned BEFORE the task is handed to the
        // scheduler: scheduling publishes `this` to another thread while the constructor is still
        // running, and final-field visibility is not guaranteed for such an escaped reference.
        this.idleConnections = new ConcurrentLinkedQueue<PooledConnection<I, O>>();
        this.limitDeterminationStrategy = null == strategy ? new MaxConnectionsBasedStrategy() : strategy;
        this.idleConnCleanupScheduleFuture = null != cleanupScheduler
                ? cleanupScheduler.scheduleWithFixedDelay(new IdleConnectionsCleanupTask(), scheduleDurationMillis, scheduleDurationMillis, TimeUnit.MILLISECONDS)
                : null;
        this.metricEventsSubject.subscribe(this.limitDeterminationStrategy);
    }

    /**
     * Returns an observable that, on subscription, emits a connection from the pool.
     *
     * <p>The subscriber receives an {@link IllegalStateException} when the shutdown read lock
     * cannot be obtained or a shutdown has already been requested.
     *
     * @return observable performing the acquisition each time it is subscribed to
     */
    @Override
    public Observable<ObservableConnection<I, O>> acquire() {
        return Observable.create(new Observable.OnSubscribe<ObservableConnection<I, O>>() {
            @Override
            public void call(Subscriber<? super ObservableConnection<I, O>> subscriber) {
                Lock readLock = shutdownLock.readLock();
                // The write side is only taken while shutting down, so a failed tryLock is
                // reported as "already shutdown".
                if (!readLock.tryLock()) {
                    subscriber.onError(new IllegalStateException("Connection pool is already shutdown."));
                    return;
                }
                try {
                    if (isShutdownRequested.get()) {
                        subscriber.onError(new IllegalStateException("Connection pool is already shutdown."));
                        return;
                    }
                    performAquire(subscriber);
                } finally {
                    readLock.unlock();
                }
            }
        });
    }

    /**
     * Serves one acquire request: reuses an idle connection if one can be claimed, otherwise
     * connects a new one if the limit strategy permits, otherwise fails with
     * {@link PoolExhaustedException}. Publishes the matching pool metric events.
     *
     * @param subscriber receiver of the connection or of the failure
     */
    private void performAquire(Subscriber<? super ObservableConnection<I, O>> subscriber) {
        long startTimeMillis = Clock.newStartTimeMillis();
        try {
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_START);
            PooledConnection<I, O> idleConnection = this.getAnIdleConnection(true);
            if (null != idleConnection) {
                idleConnection.beforeReuse();
                this.channelFactory.onNewConnection(idleConnection, subscriber);
                long endTime = Clock.onEndMillis(startTimeMillis);
                this.metricEventsSubject.onEvent(ClientMetricsEvent.POOLED_CONNECTION_REUSE, endTime);
                this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_SUCCESS, endTime);
                this.aquiredConnectionsCounter.incrementAndGet();
            } else if (this.limitDeterminationStrategy.acquireCreationPermit(startTimeMillis, TimeUnit.MILLISECONDS)) {
                // Counted as acquired up front; the error callback of the wrapping subscriber
                // undoes this when the connect fails.
                this.aquiredConnectionsCounter.incrementAndGet();
                Subscriber<? super ObservableConnection<I, O>> newConnectionSubscriber = this.newConnectionSubscriber(subscriber, startTimeMillis);
                try {
                    this.channelFactory.connect(newConnectionSubscriber, this.serverInfo, this.connectionFactory);
                } catch (Throwable throwable) {
                    newConnectionSubscriber.onError(throwable);
                }
            } else {
                PoolExhaustedException e = new PoolExhaustedException();
                this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTimeMillis), e);
                subscriber.onError(e);
            }
        } catch (Throwable throwable) {
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
            subscriber.onError(throwable);
        }
    }

    /**
     * Returns a connection to the pool.
     *
     * <p>A usable connection is appended to the idle queue; an unusable one, or any connection
     * released after shutdown was requested, is discarded instead. For every non-null connection
     * the acquired-connections counter is decremented, whether or not the release succeeded.
     *
     * @param connection connection being returned
     * @return an empty observable on success, or an observable that errors with the failure
     *         ({@link IllegalArgumentException} for a {@code null} connection)
     */
    @Override
    public Observable<Void> release(PooledConnection<I, O> connection) {
        if (null == connection) {
            return Observable.error(new IllegalArgumentException("Returned a null connection to the pool."));
        }
        long startTimeMillis = Clock.newStartTimeMillis();
        try {
            connection.getChannel().pipeline().fireUserEventTriggered(new PooledConnectionReleasedEvent(connection));
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_START);
            if (this.isShutdownRequested.get() || !connection.isUsable()) {
                this.discardConnection(connection);
            } else {
                this.idleConnections.add(connection);
            }
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_SUCCESS, Clock.onEndMillis(startTimeMillis));
            return Observable.empty();
        } catch (Throwable throwable) {
            // Report the cause with the failure event, as the acquire-failure events do.
            this.metricEventsSubject.onEvent(ClientMetricsEvent.POOL_RELEASE_FAILED, Clock.onEndMillis(startTimeMillis), throwable);
            return Observable.error(throwable);
        } finally {
            this.aquiredConnectionsCounter.decrementAndGet();
        }
    }

    /**
     * Removes the given connection from the idle queue and, if it was present there, closes it.
     * A connection that is not in the idle queue is left untouched.
     *
     * @param connection connection to discard
     * @return an empty observable, or an error observable for a {@code null} connection
     */
    @Override
    public Observable<Void> discard(PooledConnection<I, O> connection) {
        if (null == connection) {
            return Observable.error(new IllegalArgumentException("Returned a null connection to the pool."));
        }
        if (this.idleConnections.remove(connection)) {
            this.discardConnection(connection);
        }
        return Observable.empty();
    }

    /**
     * Requests a shutdown of this pool. Only the first call has an effect. The shutdown itself is
     * attempted immediately and retried periodically until it can be performed.
     */
    @Override
    public void shutdown() {
        if (!this.isShutdownRequested.compareAndSet(false, true)) {
            return;
        }
        Observable.just(1L).subscribe(this.createShutdownAction());
    }

    /**
     * Creates an action that tries to shut the pool down and, when that is not yet possible,
     * re-arms itself on a timer.
     *
     * @return the self-rescheduling shutdown action; its argument is ignored
     */
    private Action1<Long> createShutdownAction() {
        return new Action1<Long>() {
            @Override
            public void call(Long ignoredTick) {
                if (!performShutdownIfPossible()) {
                    Observable.timer(SHUTDOWN_RETRY_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
                              .subscribe(createShutdownAction());
                }
            }
        };
    }

    /**
     * Performs the shutdown if no connection is currently acquired and the write lock is free.
     *
     * @return {@code true} only if this invocation performed the shutdown
     */
    private boolean performShutdownIfPossible() {
        if (this.aquiredConnectionsCounter.get() != 0) {
            return false;
        }
        Lock writeLock = this.shutdownLock.writeLock();
        if (!writeLock.tryLock()) {
            return false;
        }
        try {
            // Re-check under the lock: the counter may have changed since the first check.
            if (this.aquiredConnectionsCounter.get() == 0 && this.isShutdownPerformed.compareAndSet(false, true)) {
                this.performShutdown();
                return true;
            }
            return false;
        } finally {
            writeLock.unlock();
        }
    }

    /**
     * Cancels the cleanup task (if any), closes every idle connection and completes the metric
     * events subject.
     */
    private void performShutdown() {
        if (null != this.idleConnCleanupScheduleFuture) {
            this.idleConnCleanupScheduleFuture.cancel(true);
        }
        for (PooledConnection<I, O> idleConnection = this.getAnIdleConnection(false);
             null != idleConnection;
             idleConnection = this.getAnIdleConnection(false)) {
            this.discardConnection(idleConnection);
        }
        this.metricEventsSubject.onCompleted();
    }

    /**
     * Polls the idle queue until a usable connection is found, discarding unusable ones on the way.
     *
     * @param claimConnectionIfFound when {@code true}, a usable connection is only returned if
     *                               its {@code claim()} succeeds; connections whose claim fails
     *                               are skipped (they have already been removed from the queue)
     * @return a usable idle connection, or {@code null} if the queue ran empty
     */
    private PooledConnection<I, O> getAnIdleConnection(boolean claimConnectionIfFound) {
        PooledConnection<I, O> candidate;
        while ((candidate = this.idleConnections.poll()) != null) {
            if (!candidate.isUsable()) {
                this.discardConnection(candidate);
                continue;
            }
            if (!claimConnectionIfFound || candidate.claim()) {
                break;
            }
        }
        return candidate;
    }

    /**
     * Publishes an eviction event and closes the connection's underlying channel.
     *
     * @param idleConnection connection to close
     * @return the observable returned by {@code closeUnderlyingChannel()}
     */
    private Observable<Void> discardConnection(PooledConnection<I, O> idleConnection) {
        this.metricEventsSubject.onEvent(ClientMetricsEvent.POOLED_CONNECTION_EVICTION);
        return idleConnection.closeUnderlyingChannel();
    }

    /**
     * Wraps the caller's subscriber for a newly created connection: on success the connection is
     * bound to this pool and forwarded; on failure the acquired counter is rolled back and the
     * error forwarded. Both paths publish the corresponding acquire metric event.
     *
     * @param subscriber the caller's subscriber
     * @param startTime  acquire start time used to compute the event duration
     * @return the wrapping subscriber to hand to the channel factory
     */
    private Subscriber<? super ObservableConnection<I, O>> newConnectionSubscriber(final Subscriber<? super ObservableConnection<I, O>> subscriber, final long startTime) {
        return Subscribers.create(new Action1<ObservableConnection<I, O>>() {
            @Override
            public void call(ObservableConnection<I, O> connection) {
                metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_SUCCESS, Clock.onEndMillis(startTime));
                PooledConnection<I, O> pooledConnection = (PooledConnection<I, O>) connection;
                pooledConnection.setConnectionPool(ConnectionPoolImpl.this);
                pooledConnection.updateMaxIdleTimeMillis(poolConfig.getMaxIdleTimeMillis());
                subscriber.onNext(connection);
                subscriber.onCompleted();
            }
        }, new Action1<Throwable>() {
            @Override
            public void call(Throwable throwable) {
                metricEventsSubject.onEvent(ClientMetricsEvent.POOL_ACQUIRE_FAILED, Clock.onEndMillis(startTime), throwable);
                // Undo the increment done before the connect was attempted.
                aquiredConnectionsCounter.decrementAndGet();
                subscriber.onError(throwable);
            }
        });
    }

    /**
     * Subscribes the listener to the metric events published by this pool.
     *
     * @param listener listener to register
     * @return subscription returned by the underlying metric events subject
     */
    @Override
    public Subscription subscribe(MetricEventsListener<? extends ClientMetricsEvent<?>> listener) {
        return this.metricEventsSubject.subscribe(listener);
    }

    /**
     * Periodic task that removes connections which are no longer usable from the idle queue and
     * closes them. It does nothing when the shutdown read lock cannot be obtained.
     */
    private class IdleConnectionsCleanupTask implements Runnable {
        private IdleConnectionsCleanupTask() {
        }

        @Override
        public void run() {
            try {
                Lock readLock = shutdownLock.readLock();
                if (!readLock.tryLock()) {
                    return;
                }
                try {
                    Iterator<PooledConnection<I, O>> iterator = idleConnections.iterator();
                    while (iterator.hasNext()) {
                        PooledConnection<I, O> idleConnection = iterator.next();
                        // Only evict connections that are unusable AND could be claimed here.
                        if (!idleConnection.isUsable() && idleConnection.claim()) {
                            iterator.remove();
                            // Close directly: discard() would try to remove it from the queue again.
                            discardConnection(idleConnection);
                        }
                    }
                } finally {
                    readLock.unlock();
                }
            } catch (Exception e) {
                // An exception escaping a fixed-delay task suppresses its future executions,
                // so it is logged here instead of being propagated.
                logger.error("Exception in the idle connection cleanup task. This does NOT stop the next schedule of the task. ", e);
            }
        }
    }
}

