/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kudu.client;

import com.stumbleupon.async.Callback;
import com.stumbleupon.async.Deferred;
import java.net.ConnectException;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Set;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kudu.client.CallResponse;
import org.apache.kudu.client.InvalidAuthnTokenException;
import org.apache.kudu.client.InvalidAuthzTokenException;
import org.apache.kudu.client.KuduException;
import org.apache.kudu.client.KuduRpc;
import org.apache.kudu.client.Negotiator;
import org.apache.kudu.client.NonRecoverableException;
import org.apache.kudu.client.RecoverableException;
import org.apache.kudu.client.RpcOutboundMessage;
import org.apache.kudu.client.RpcRemoteException;
import org.apache.kudu.client.SecurityContext;
import org.apache.kudu.client.ServerInfo;
import org.apache.kudu.client.Status;
import org.apache.kudu.rpc.RpcHeader;
import org.apache.kudu.shaded.com.google.common.base.Preconditions;
import org.apache.kudu.shaded.com.google.common.collect.Lists;
import org.apache.kudu.shaded.io.netty.bootstrap.Bootstrap;
import org.apache.kudu.shaded.io.netty.buffer.Unpooled;
import org.apache.kudu.shaded.io.netty.channel.Channel;
import org.apache.kudu.shaded.io.netty.channel.ChannelFuture;
import org.apache.kudu.shaded.io.netty.channel.ChannelFutureListener;
import org.apache.kudu.shaded.io.netty.channel.ChannelHandler;
import org.apache.kudu.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.kudu.shaded.io.netty.channel.ChannelInitializer;
import org.apache.kudu.shaded.io.netty.channel.ChannelPipeline;
import org.apache.kudu.shaded.io.netty.channel.SimpleChannelInboundHandler;
import org.apache.kudu.shaded.io.netty.channel.embedded.EmbeddedChannel;
import org.apache.kudu.shaded.io.netty.channel.socket.SocketChannel;
import org.apache.kudu.shaded.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.kudu.shaded.io.netty.handler.timeout.ReadTimeoutException;
import org.apache.kudu.shaded.io.netty.handler.timeout.ReadTimeoutHandler;
import org.apache.kudu.shaded.io.netty.util.concurrent.Future;
import org.apache.kudu.shaded.io.netty.util.concurrent.GenericFutureListener;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
@InterfaceStability.Unstable
class Connection
extends SimpleChannelInboundHandler<Object> {
    /** Description of the remote server; supplies the address to connect to and the hostname used for negotiation. */
    private final ServerInfo serverInfo;
    /** Security context handed to the {@link Negotiator} when the channel becomes active. */
    private final SecurityContext securityContext;
    /** Credentials policy this connection was created with. */
    private final CredentialsPolicy credentialsPolicy;
    /** Private clone of the caller's bootstrap, configured with this connection's channel initializer. */
    private final Bootstrap bootstrap;
    /** SASL protocol name passed through to the {@link Negotiator}. */
    private final String saslProtocolName;
    /** Passed through to the {@link Negotiator}. */
    private final boolean requireAuthentication;
    /** Passed through to the {@link Negotiator}. */
    private final boolean requireEncryption;
    /** Passed through to the {@link Negotiator}. */
    private final boolean encryptLoopback;
    /** Channel of the pending/established connection; assigned in {@code connect()} while {@code lock} is held. */
    private SocketChannel channel;
    /** Set once {@code disconnect()} has been called; used to pick the right message for later channel errors. */
    private volatile boolean explicitlyDisconnected = false;
    private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
    /** RPC protocol version advertised in the connection header. */
    private static final byte RPC_CURRENT_VERSION = 9;
    /** Bytes written first on a new channel: the magic "hrpc", the RPC version, and two trailing zero bytes. */
    private static final byte[] CONNECTION_HEADER = new byte[]{'h', 'r', 'p', 'c', RPC_CURRENT_VERSION, 0, 0};
    /** Pipeline name of the read-timeout handler that is removed once negotiation succeeds. */
    private static final String NEGOTIATION_TIMEOUT_HANDLER = "negotiation-timeout-handler";
    /** Read timeout, in milliseconds, applied by the negotiation timeout handler. */
    private final long negotiationTimeoutMs;
    /** Protects the mutable connection state below. */
    private final ReentrantLock lock = new ReentrantLock();
    /** Current lifecycle state of the connection. */
    @GuardedBy(value="lock")
    private State state;
    /** Callbacks of calls already handed to the wire, keyed by call id; set to null once terminated. */
    @GuardedBy(value="lock")
    private HashMap<Integer, Callback<Void, CallResponseInfo>> inflightMessages = new HashMap<Integer, Callback<Void, CallResponseInfo>>();
    /** Calls accepted before the connection became ready; set to null once ready or terminated. */
    @GuardedBy(value="lock")
    private ArrayList<QueuedMessage> queuedMessages = Lists.newArrayList();
    /** Result of a successful negotiation, or null if negotiation has not succeeded. */
    @GuardedBy(value="lock")
    private Negotiator.Success negotiationResult = null;
    /** Details of a failed negotiation, or null if negotiation has not failed. */
    @GuardedBy(value="lock")
    private Negotiator.Failure negotiationFailure = null;
    /** Call id to assign to the next enqueued message. */
    @GuardedBy(value="lock")
    private int nextCallId = 0;
    /** Future of the connect attempt; null until {@code connect()} has been invoked. */
    @Nullable
    @GuardedBy(value="lock")
    private ChannelFuture connectFuture;

    Connection(ServerInfo serverInfo, SecurityContext securityContext, Bootstrap bootstrap, CredentialsPolicy credentialsPolicy, String saslProtocolName, boolean requireAuthentication, boolean requireEncryption, boolean encryptLoopback, long negotiationTimeoutMs) {
        this.serverInfo = serverInfo;
        this.securityContext = securityContext;
        this.saslProtocolName = saslProtocolName;
        this.state = State.NEW;
        this.credentialsPolicy = credentialsPolicy;
        this.bootstrap = bootstrap.clone();
        this.bootstrap.handler(new ConnectionChannelInitializer());
        this.requireAuthentication = requireAuthentication;
        this.requireEncryption = requireEncryption;
        this.encryptLoopback = encryptLoopback;
        this.negotiationTimeoutMs = negotiationTimeoutMs;
    }

    /**
     * Invoked when the channel becomes active. Moves the connection from
     * {@code CONNECTING} to {@code NEGOTIATING}, writes the connection header and
     * installs a {@link Negotiator} in front of this handler to start negotiation.
     * Does nothing if the connection has already been terminated.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        boolean alreadyTerminated;
        lock.lock();
        try {
            alreadyTerminated = state == State.TERMINATED;
            if (!alreadyTerminated) {
                Preconditions.checkState(state == State.CONNECTING);
                state = State.NEGOTIATING;
            }
        } finally {
            lock.unlock();
        }
        if (alreadyTerminated) {
            return;
        }

        // The header goes out first; negotiation messages follow it.
        ctx.writeAndFlush(Unpooled.wrappedBuffer(CONNECTION_HEADER), ctx.voidPromise());

        Negotiator negotiator = new Negotiator(
                serverInfo.getAndCanonicalizeHostname(),
                securityContext,
                credentialsPolicy == CredentialsPolicy.PRIMARY_CREDENTIALS,
                saslProtocolName,
                requireAuthentication,
                requireEncryption,
                encryptLoopback);
        ChannelPipeline pipeline = ctx.pipeline();
        pipeline.addBefore(ctx.name(), "negotiation", negotiator);
        negotiator.sendHello(ctx);
    }

    /**
     * Invoked when the channel goes inactive. Fails all outstanding calls with a
     * recoverable network error whose message is the connect failure (if the
     * connect future recorded one) or {@code "connection closed"} otherwise.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        LOG.debug("{} handling channelInactive", getLogPrefix());

        Throwable connectFailure = null;
        lock.lock();
        try {
            if (connectFuture != null) {
                connectFailure = connectFuture.cause();
            }
        } finally {
            lock.unlock();
        }

        String reason = connectFailure == null ? "connection closed" : connectFailure.toString();
        cleanup(new RecoverableException(Status.NetworkError(reason)));
    }

    /**
     * Handles an inbound message. Three message kinds are processed here:
     * <ul>
     *   <li>{@link Negotiator.Success}: the result is recorded, every queued call is
     *       moved to the in-flight map and written to the wire, and the connection
     *       becomes {@code READY};</li>
     *   <li>{@link Negotiator.Failure}: the failure is recorded, the state becomes
     *       {@code NEGOTIATION_FAILED} and the channel is closed;</li>
     *   <li>{@link CallResponse}: the callback registered for the response's call id
     *       is removed from the in-flight map and invoked with the response and,
     *       for error responses, a matching exception.</li>
     * </ul>
     * Any other message is passed on via {@code fireChannelRead}.
     *
     * @param ctx the handler context
     * @param m4 the inbound message
     * @throws Exception a {@link NonRecoverableException} if a response carries no call
     *     id or an id with no registered callback; also anything thrown by the
     *     invoked callback
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, Object m4) throws Exception {
        Callback<Void, CallResponseInfo> responseCbk;
        if (m4 instanceof Negotiator.Success) {
            this.lock.lock();
            try {
                this.negotiationResult = (Negotiator.Success)m4;
                // Nothing can be in flight before negotiation has completed.
                Preconditions.checkState(this.state == State.TERMINATED || this.inflightMessages.isEmpty());
                // Drain the queue. The loop repeats because more messages may be queued
                // while the lock is released for the wire writes below.
                while (this.state != State.TERMINATED && !this.queuedMessages.isEmpty()) {
                    ArrayList<QueuedMessage> queued = this.queuedMessages;
                    // Register the callbacks first, while still holding the lock.
                    for (QueuedMessage qm : queued) {
                        Callback<Void, CallResponseInfo> empty = this.inflightMessages.put(qm.message.getHeaderBuilder().getCallId(), qm.cb);
                        Preconditions.checkState(empty == null);
                    }
                    this.queuedMessages = Lists.newArrayList();
                    // sendCallToWire() asserts that the lock is not held, so release it for the
                    // writes and re-acquire it afterwards; the inner finally keeps the
                    // lock/unlock calls balanced with the outer finally.
                    this.lock.unlock();
                    try {
                        for (QueuedMessage qm : queued) {
                            this.sendCallToWire(qm.message);
                        }
                    }
                    finally {
                        this.lock.lock();
                    }
                }
                if (this.state == State.TERMINATED) {
                    return;
                }
                Preconditions.checkState(this.state == State.NEGOTIATING);
                // From now on calls bypass the queue and go straight to the wire.
                this.queuedMessages = null;
                // The read timeout is only meant to bound the negotiation phase.
                ctx.pipeline().remove(NEGOTIATION_TIMEOUT_HANDLER);
                this.state = State.READY;
            }
            finally {
                this.lock.unlock();
            }
            return;
        }
        if (m4 instanceof Negotiator.Failure) {
            this.lock.lock();
            try {
                if (this.state == State.TERMINATED) {
                    return;
                }
                Preconditions.checkState(this.state == State.NEGOTIATING);
                Preconditions.checkState(this.inflightMessages.isEmpty());
                this.state = State.NEGOTIATION_FAILED;
                // Kept so that cleanup() can inspect the failure status.
                this.negotiationFailure = (Negotiator.Failure)m4;
            }
            finally {
                this.lock.unlock();
            }
            // NOTE(review): closing presumably leads to channelInactive() and thus cleanup() - confirm.
            ctx.close();
            return;
        }
        if (!(m4 instanceof CallResponse)) {
            // Not something this handler understands; pass it along the pipeline.
            ctx.fireChannelRead(m4);
            return;
        }
        CallResponse response = (CallResponse)m4;
        RpcHeader.ResponseHeader header = response.getHeader();
        if (!header.hasCallId()) {
            // Without a call id the response cannot be matched to any callback.
            int size = response.getTotalResponseSize();
            String msg = this.getLogPrefix() + " RPC response (size: " + size + ") doesn't have callID: " + header;
            LOG.error(msg);
            throw new NonRecoverableException(Status.Incomplete(msg));
        }
        int callId = header.getCallId();
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                return;
            }
            Preconditions.checkState(this.state == State.READY);
            responseCbk = this.inflightMessages.remove(callId);
        }
        finally {
            this.lock.unlock();
        }
        if (responseCbk == null) {
            String msg = this.getLogPrefix() + " invalid callID: " + callId;
            LOG.error(msg);
            throw new NonRecoverableException(Status.IllegalState(msg));
        }
        // Callbacks are invoked outside the lock.
        if (!header.hasIsError() || !header.getIsError()) {
            // Successful response: no exception attached.
            responseCbk.call(new CallResponseInfo(response, null));
            return;
        }
        // Error response: decode the error status and map its code to an exception type.
        RpcHeader.ErrorStatusPB.Builder errorBuilder = RpcHeader.ErrorStatusPB.newBuilder();
        KuduRpc.readProtobuf(response.getPBMessage(), errorBuilder);
        RpcHeader.ErrorStatusPB error = errorBuilder.build();
        RpcHeader.ErrorStatusPB.RpcErrorCodePB code = error.getCode();
        if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_SERVER_TOO_BUSY) || code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_UNAVAILABLE)) {
            // Busy/unavailable server is reported as a recoverable "service unavailable".
            responseCbk.call(new CallResponseInfo(response, new RecoverableException(Status.ServiceUnavailable(error.getMessage()))));
            return;
        }
        if (code.equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.ERROR_INVALID_AUTHORIZATION_TOKEN)) {
            responseCbk.call(new CallResponseInfo(response, new InvalidAuthzTokenException(Status.NotAuthorized(error.getMessage()))));
            return;
        }
        // Any other error code is surfaced as a remote error carrying the decoded status.
        String message = this.getLogPrefix() + " server sent error " + error.getMessage();
        LOG.error(message);
        responseCbk.call(new CallResponseInfo(response, new RpcRemoteException(Status.RemoteError(message), error)));
    }

    /**
     * Converts the given throwable into a {@link KuduException}, fails all pending
     * calls with it and closes the channel if a context is available.
     *
     * @param ctx the handler context; may be null when invoked from the connect listener
     * @param e the error that occurred
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
        KuduException translated = toKuduException(ctx, e);
        cleanup(translated);
        if (ctx != null) {
            ctx.close();
        }
    }

    /** Maps a throwable to the KuduException reported to callers, logging it at the level appropriate for its kind. */
    private KuduException toKuduException(ChannelHandlerContext ctx, Throwable e) {
        if (e instanceof KuduException) {
            return (KuduException) e;
        }
        if (e instanceof RejectedExecutionException) {
            String text = String.format("%s RPC rejected by the executor (ignore if shutting down)", getLogPrefix());
            KuduException result = new RecoverableException(Status.NetworkError(text), e);
            LOG.warn(text, e);
            return result;
        }
        if (e instanceof ReadTimeoutException) {
            String text = String.format("%s encountered a read timeout; closing the channel", getLogPrefix());
            KuduException result = new RecoverableException(Status.NetworkError(text), e);
            LOG.debug(text);
            return result;
        }
        if (e instanceof ClosedChannelException) {
            String template = explicitlyDisconnected ? "%s disconnected from peer" : "%s lost connection to peer";
            String text = String.format(template, getLogPrefix());
            KuduException result = new RecoverableException(Status.NetworkError(text), e);
            LOG.info(text);
            return result;
        }
        if (e instanceof ConnectException) {
            String text = "Failed to connect to peer " + serverInfo + ": " + e.getMessage();
            KuduException result = new RecoverableException(Status.NetworkError(text), e);
            LOG.info(text);
            return result;
        }
        // An SSL error after an explicit disconnect is reported as a plain disconnect;
        // this check intentionally precedes the peer-verification case below.
        if (e instanceof SSLException && explicitlyDisconnected) {
            return new RecoverableException(Status.NetworkError(String.format("%s disconnected from peer", getLogPrefix())));
        }
        if (e instanceof SSLPeerUnverifiedException) {
            String text = String.format("unable to verify identity of peer %s: %s", serverInfo, e.getMessage());
            KuduException result = new NonRecoverableException(Status.NetworkError(text), e);
            LOG.error(text, e);
            return result;
        }
        // Anything else is unexpected; it is not supposed to happen after an explicit disconnect.
        assert (!explicitlyDisconnected);
        String where = ctx == null ? "" : String.format(" on %s", ctx.channel());
        String text = String.format("%s unexpected exception from downstream%s", getLogPrefix(), where);
        KuduException result = new RecoverableException(Status.NetworkError(text), e);
        LOG.error(text, e);
        return result;
    }

    /** @return the description of the server this connection targets */
    public ServerInfo getServerInfo() {
        return serverInfo;
    }

    /** @return the credentials policy this connection was created with */
    CredentialsPolicy getCredentialsPolicy() {
        return credentialsPolicy;
    }

    /** @return true if the connection has reached the {@code TERMINATED} state */
    boolean isTerminated() {
        lock.lock();
        try {
            return state == State.TERMINATED;
        } finally {
            lock.unlock();
        }
    }

    /**
     * @return the server feature flags recorded by a successful negotiation, or
     *     null if no successful negotiation result is available
     */
    @Nullable
    Set<RpcHeader.RpcFeatureFlag> getPeerFeatures() {
        lock.lock();
        try {
            if (negotiationResult == null) {
                return null;
            }
            return negotiationResult.serverFeatures;
        } finally {
            lock.unlock();
        }
    }

    /** @return the prefix used in log and error messages, of the form {@code [peer <serverInfo>]} */
    String getLogPrefix() {
        StringBuilder prefix = new StringBuilder("[peer ");
        prefix.append(serverInfo);
        prefix.append("]");
        return prefix.toString();
    }

    /**
     * Assigns a call id to the message and either queues it (connection not yet
     * ready) or registers its callback and writes it to the wire (connection ready).
     * The first call on a {@code NEW} connection also starts connecting.
     *
     * @param msg the outbound message; its header receives the call id
     * @param cb callback invoked with the response or with the error that aborted the call
     * @throws RecoverableException if the connection is terminated, including the case
     *     where the connect attempt started by this very call failed immediately
     */
    void enqueueMessage(RpcOutboundMessage msg, Callback<Void, CallResponseInfo> cb) throws RecoverableException {
        this.lock.lock();
        try {
            if (this.state == State.NEW) {
                this.connect();
            }
            // Checked after connect() on purpose: the lock is reentrant, so if the connect
            // future has already failed and its listener runs inline on this thread,
            // cleanup() terminates the connection and nulls the queue before we get here.
            // Queuing the message then would fail with a NullPointerException.
            if (this.state == State.TERMINATED) {
                throw new RecoverableException(Status.IllegalState("connection is terminated"));
            }
            int callId = this.nextCallId++;
            RpcHeader.RequestHeader.Builder headerBuilder = msg.getHeaderBuilder();
            headerBuilder.setCallId(callId);
            // NOTE(review): this re-applies the timeout value already present in the header
            // and looks like a no-op; kept as-is to preserve the serialized header.
            int timeoutMs = headerBuilder.getTimeoutMillis();
            if (timeoutMs > 0) {
                headerBuilder.setTimeoutMillis(timeoutMs);
            }
            if (this.state != State.READY) {
                // Not ready yet: the message is sent once negotiation succeeds, or aborted by cleanup().
                this.queuedMessages.add(new QueuedMessage(msg, cb));
                return;
            }
            Callback<Void, CallResponseInfo> previous = this.inflightMessages.put(callId, cb);
            Preconditions.checkState(previous == null);
        }
        finally {
            this.lock.unlock();
        }
        // The wire write must happen without the lock held.
        this.sendCallToWire(msg);
    }

    /**
     * Marks the connection as explicitly disconnected and disconnects its channel.
     * If connecting was never started, a fresh {@link EmbeddedChannel} is
     * disconnected instead so that a {@link ChannelFuture} can still be returned.
     *
     * @return the future of the disconnect operation
     */
    ChannelFuture disconnect() {
        lock.lock();
        try {
            LOG.debug("{} disconnecting while in state {}", getLogPrefix(), state);
            explicitlyDisconnected = true;
            Channel target = state == State.NEW ? new EmbeddedChannel() : connectFuture.channel();
            return target.disconnect();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Disconnects the connection and exposes the outcome as a {@link Deferred}.
     *
     * @return a deferred that is called back when the disconnect future completes
     */
    Deferred<Void> shutdown() {
        Deferred<Void> completion = new Deferred<Void>();
        disconnect().addListener(new ShutdownListener(completion));
        return completion;
    }

    /**
     * Returns a diagnostic description with the channel, the server UUID and the
     * number of queued and in-flight messages.
     */
    @Override
    public String toString() {
        SocketChannel currentChannel;
        int queuedMessagesNum;
        int inflightMessagesNum;
        this.lock.lock();
        try {
            // 'channel' is written under the lock in connect() and is not volatile, so it is
            // read under the lock too; only the reference is read here, formatting happens
            // after the lock is released.
            currentChannel = this.channel;
            // Both collections are set to null at certain lifecycle points.
            queuedMessagesNum = this.queuedMessages == null ? 0 : this.queuedMessages.size();
            inflightMessagesNum = this.inflightMessages == null ? 0 : this.inflightMessages.size();
        }
        finally {
            this.lock.unlock();
        }
        StringBuilder buf = new StringBuilder();
        buf.append("Connection@").append(this.hashCode()).append("(channel=").append(currentChannel).append(", uuid=").append(this.serverInfo.getUuid());
        buf.append(", #queued=").append(queuedMessagesNum).append(", #inflight=").append(inflightMessagesNum).append(")");
        return buf.toString();
    }

    /** @return true if the connection is in the {@code READY} state */
    @InterfaceAudience.LimitedPrivate(value={"Test"})
    boolean isReady() {
        lock.lock();
        try {
            return state == State.READY;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Writes the message to the channel and flushes it. Must be called without
     * the connection lock held.
     *
     * @param msg the outbound message to send
     */
    private void sendCallToWire(RpcOutboundMessage msg) {
        assert (!lock.isHeldByCurrentThread());
        if (LOG.isTraceEnabled()) {
            LOG.trace("{} sending {}", getLogPrefix(), msg);
        }
        SocketChannel out = channel;
        out.writeAndFlush(msg, out.voidPromise());
    }

    /**
     * Terminates the connection and aborts every pending call with the given error.
     *
     * <p>Under the lock, the queued and in-flight collections are detached and the
     * state becomes {@code TERMINATED}; the callbacks are then invoked outside the
     * lock. Calling this on an already terminated connection is a no-op. If
     * negotiation failed with {@code FATAL_INVALID_AUTHENTICATION_TOKEN}, the error
     * passed to the callbacks is replaced by an {@link InvalidAuthnTokenException}
     * carrying the same status.
     *
     * @param error the error to report to the aborted calls
     */
    private void cleanup(KuduException error) {
        HashMap<Integer, Callback<Void, CallResponseInfo>> inflight;
        ArrayList<QueuedMessage> queued;
        boolean needNewAuthnToken = false;
        this.lock.lock();
        try {
            if (this.state == State.TERMINATED) {
                // Already cleaned up: both collections must have been detached.
                Preconditions.checkState(this.queuedMessages == null);
                Preconditions.checkState(this.inflightMessages == null);
                return;
            }
            if (this.state == State.NEGOTIATION_FAILED) {
                Preconditions.checkState(this.negotiationFailure != null);
                Preconditions.checkState(this.inflightMessages.isEmpty());
                needNewAuthnToken = this.negotiationFailure.status.getCode().equals(RpcHeader.ErrorStatusPB.RpcErrorCodePB.FATAL_INVALID_AUTHENTICATION_TOKEN);
            }
            LOG.debug("{} cleaning up while in state {} due to: {}", new Object[]{this.getLogPrefix(), this.state, error.getMessage()});
            // Detach the collections so the callbacks can be run without holding the lock.
            queued = this.queuedMessages;
            this.queuedMessages = null;
            inflight = this.inflightMessages;
            this.inflightMessages = null;
            this.state = State.TERMINATED;
        }
        finally {
            this.lock.unlock();
        }
        if (needNewAuthnToken) {
            error = new InvalidAuthnTokenException(error.getStatus());
        }
        // A failing callback must not prevent the remaining calls from being aborted.
        for (Callback<Void, CallResponseInfo> cb : inflight.values()) {
            try {
                cb.call(new CallResponseInfo(null, error));
            }
            catch (Exception e) {
                LOG.warn("{} exception while aborting in-flight call: {}", (Object)this.getLogPrefix(), (Object)e);
            }
        }
        // The queue is already null once the connection has become ready.
        if (queued != null) {
            for (QueuedMessage qm : queued) {
                try {
                    qm.cb.call(new CallResponseInfo(null, error));
                }
                catch (Exception e) {
                    LOG.warn("{} exception while aborting enqueued call: {}", (Object)this.getLogPrefix(), (Object)e);
                }
            }
        }
    }

    /**
     * Starts connecting to the server. Must be called with the lock held while the
     * connection is in the {@code NEW} state; moves it to {@code CONNECTING}. A
     * failed connect attempt is routed to {@code exceptionCaught} with a null context.
     */
    @GuardedBy(value="lock")
    private void connect() {
        LOG.debug("{} connecting to peer", getLogPrefix());
        Preconditions.checkState(lock.isHeldByCurrentThread());
        Preconditions.checkState(state == State.NEW);
        state = State.CONNECTING;

        ChannelFuture pending = bootstrap.connect(serverInfo.getResolvedAddress());
        connectFuture = pending;
        pending.addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                if (!future.isSuccess()) {
                    // No handler context is available here, hence the null.
                    Connection.this.exceptionCaught(null, future.cause());
                    return;
                }
                LOG.debug("{} Successfully connected to peer", Connection.this.getLogPrefix());
            }
        });
        channel = (SocketChannel) pending.channel();
    }

    /**
     * Builds the channel pipeline for this connection: frame decoder, response
     * decoder, request encoder, negotiation read-timeout handler and finally the
     * enclosing {@link Connection} itself.
     */
    private final class ConnectionChannelInitializer
    extends ChannelInitializer<Channel> {
        @Override
        public void initChannel(Channel ch) throws Exception {
            // Frames carry a 4-byte length prefix, which is stripped; frames are capped at 0x10000000 bytes.
            LengthFieldBasedFrameDecoder frameDecoder = new LengthFieldBasedFrameDecoder(0x10000000, 0, 4, 0, 4);
            ReadTimeoutHandler negotiationTimeout = new ReadTimeoutHandler(negotiationTimeoutMs, TimeUnit.MILLISECONDS);

            ch.pipeline()
                .addFirst("decode-frames", frameDecoder)
                .addLast("decode-inbound", new CallResponse.Decoder())
                .addLast("encode-outbound", new RpcOutboundMessage.Encoder())
                .addLast(NEGOTIATION_TIMEOUT_HANDLER, negotiationTimeout)
                .addLast("kudu-handler", Connection.this);
        }
    }

    /** An outbound message waiting to be sent, paired with the callback for its outcome. */
    private static final class QueuedMessage {
        /** The message to write to the wire. */
        private final RpcOutboundMessage message;
        /** Invoked with the response, or with the error that aborted the call. */
        private final Callback<Void, CallResponseInfo> cb;

        QueuedMessage(RpcOutboundMessage outbound, Callback<Void, CallResponseInfo> callback) {
            message = outbound;
            cb = callback;
        }
    }

    static final class CallResponseInfo {
        public final CallResponse response;
        public final KuduException exception;

        CallResponseInfo(CallResponse response, KuduException exception) {
            this.response = response;
            this.exception = exception;
        }
    }

    private static enum State {
        NEW,
        CONNECTING,
        NEGOTIATING,
        NEGOTIATION_FAILED,
        READY,
        TERMINATED;

    }

    /** Completes a {@link Deferred} with the outcome of a disconnect future. */
    private static class ShutdownListener
    implements ChannelFutureListener {
        private final Deferred<Void> deferred;

        public ShutdownListener(Deferred<Void> deferred) {
            this.deferred = deferred;
        }

        /**
         * Calls the deferred back with null on success, with the failure cause when
         * it is an {@link Exception}, or with a {@link NonRecoverableException}
         * wrapping the cause otherwise.
         */
        @Override
        public void operationComplete(ChannelFuture future) {
            Object outcome;
            if (future.isSuccess()) {
                outcome = null;
            } else {
                Throwable cause = future.cause();
                outcome = cause instanceof Exception
                        ? cause
                        : new NonRecoverableException(Status.IllegalState("failed to shutdown: " + this), cause);
            }
            deferred.callback(outcome);
        }
    }

    /** Credentials policy a connection is created with. */
    public enum CredentialsPolicy {
        /** NOTE(review): presumably allows negotiating with any available credentials - confirm. */
        ANY_CREDENTIALS,
        /** Makes the connection pass {@code true} for the negotiator's primary-credentials flag. */
        PRIMARY_CREDENTIALS
    }
}

