/*
 * Decompiled with CFR 0.152.
 */
package com.emc.ecs.nfsclient.network;

import com.emc.ecs.nfsclient.network.ClientIOHandler;
import com.emc.ecs.nfsclient.network.NetMgr;
import com.emc.ecs.nfsclient.network.RPCRecordDecoder;
import com.emc.ecs.nfsclient.network.RecordMarkingUtil;
import com.emc.ecs.nfsclient.rpc.RpcException;
import com.emc.ecs.nfsclient.rpc.RpcStatus;
import com.emc.ecs.nfsclient.rpc.Xdr;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.socket.nio.NioSocketChannelConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Connection {
    static final String CONNECTION_OPTION = "bourneLocalConn";
    static final String REMOTE_ADDRESS_OPTION = "remoteAddress";
    private static final Logger LOG = LoggerFactory.getLogger(Connection.class);
    private static final int CONNECT_TIMEOUT = 10000;
    private static final int MAX_SENDING_QUEUE_SIZE = 0x40000000;
    private final ClientBootstrap _clientBootstrap;
    private Channel _channel;
    ChannelFuture _channelFuture = Channels.future(null, (boolean)true);
    private final String _remoteHost;
    private final int _port;
    private final boolean _usePrivilegedPort;
    private final ConcurrentHashMap<Integer, ChannelFuture> _futureMap = new ConcurrentHashMap();
    private final ConcurrentHashMap<Integer, Xdr> _responseMap = new ConcurrentHashMap();
    private State _state = State.DISCONNECTED;

    public Connection(String remoteHost, int port, boolean usePrivilegedPort) {
        this._remoteHost = remoteHost;
        this._port = port;
        this._usePrivilegedPort = usePrivilegedPort;
        this._clientBootstrap = new ClientBootstrap(NetMgr.getInstance().getFactory());
        this._clientBootstrap.setOption(REMOTE_ADDRESS_OPTION, (Object)new InetSocketAddress(this._remoteHost, this._port));
        this._clientBootstrap.setOption("connectTimeoutMillis", (Object)10000);
        this._clientBootstrap.setOption("tcpNoDelay", (Object)true);
        this._clientBootstrap.setOption("keepAlive", (Object)true);
        this._clientBootstrap.setOption(CONNECTION_OPTION, (Object)this);
        this._clientBootstrap.setPipelineFactory(new ChannelPipelineFactory(){
            private final ChannelHandler ioHandler;
            {
                this.ioHandler = new ClientIOHandler(Connection.this._clientBootstrap);
            }

            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline((ChannelHandler[])new ChannelHandler[]{new RPCRecordDecoder(), this.ioHandler});
            }
        });
    }

    public InetSocketAddress getRemoteAddress() {
        return (InetSocketAddress)this._clientBootstrap.getOption(REMOTE_ADDRESS_OPTION);
    }

    public State getConnectionState() {
        return this._state;
    }

    public Xdr sendAndWait(int timeout, Xdr xdrRequest) throws RpcException {
        if (!this._state.equals((Object)State.CONNECTED)) {
            this._channelFuture.awaitUninterruptibly();
            if (!this._channelFuture.isSuccess()) {
                String msg = String.format("waiting for connection to be established, but failed %s", this.getRemoteAddress());
                LOG.error(msg);
                throw new RpcException(RpcStatus.NETWORK_ERROR, msg);
            }
        }
        if (!this._channel.isWritable()) {
            String msg = this._channel.isConnected() ? String.format("too many pending requests for the connection: %s", this.getRemoteAddress()) : String.format("the connection is broken: %s", this.getRemoteAddress());
            throw new RpcException(RpcStatus.NETWORK_ERROR, msg);
        }
        ChannelFuture timeoutFuture = Channels.future((Channel)this._channel);
        Integer xid = xdrRequest.getXid();
        this._futureMap.put(xid, timeoutFuture);
        RecordMarkingUtil.putRecordMarkingAndSend(this._channel, xdrRequest);
        timeoutFuture.awaitUninterruptibly((long)timeout, TimeUnit.SECONDS);
        Xdr response = this._responseMap.remove(xid);
        this._futureMap.remove(xid);
        if (!timeoutFuture.isSuccess()) {
            LOG.warn("cause:", timeoutFuture.getCause());
            if (timeoutFuture.isDone()) {
                String msg = String.format("tcp IO error on the connection: %s", this.getRemoteAddress());
                throw new RpcException(RpcStatus.NETWORK_ERROR, msg);
            }
            String msg = String.format("rpc request timeout on the connection: %s", this.getRemoteAddress());
            throw new RpcException(RpcStatus.NETWORK_ERROR, msg);
        }
        return response;
    }

    protected void connect() throws RpcException {
        if (this._state.equals((Object)State.CONNECTED)) {
            return;
        }
        final ChannelFuture oldChannelFuture = this._channelFuture;
        if (LOG.isDebugEnabled()) {
            String logPrefix = this._usePrivilegedPort ? "usePrivilegedPort " : "";
            LOG.debug("{}connecting to {}", (Object)logPrefix, (Object)this.getRemoteAddress());
        }
        this._state = State.CONNECTING;
        if (this._usePrivilegedPort) {
            this._channel = this.bindToPrivilegedPort();
            this._channelFuture = this._channel.connect((SocketAddress)this.getRemoteAddress());
        } else {
            this._channelFuture = this._clientBootstrap.connect();
            this._channel = this._channelFuture.getChannel();
        }
        NioSocketChannelConfig cfg = (NioSocketChannelConfig)this._channel.getConfig();
        cfg.setWriteBufferHighWaterMark(0x40000000);
        this._channelFuture.addListener(new ChannelFutureListener(){

            public void operationComplete(ChannelFuture future) {
                if (Connection.this._channelFuture.isSuccess()) {
                    Connection.this._state = State.CONNECTED;
                    oldChannelFuture.setSuccess();
                } else {
                    Connection.this._state = State.DISCONNECTED;
                    oldChannelFuture.cancel();
                }
            }
        });
    }

    protected void shutdown() {
        if (this._channel != null) {
            this._channel.close();
        }
    }

    protected void close() {
        this._state = State.DISCONNECTED;
        this.shutdown();
        NetMgr.getInstance().dropConnection(InetSocketAddress.createUnresolved(this._remoteHost, this._port));
        this.notifyAllPendingSenders("Channel closed, connection closing.");
    }

    protected void notifySender(Integer xid, Xdr response) {
        ChannelFuture future = this._futureMap.get(xid);
        if (future != null) {
            this._responseMap.put(xid, response);
            future.setSuccess();
        }
    }

    protected void notifyAllPendingSenders(String message) {
        for (ChannelFuture future : this._futureMap.values()) {
            future.setFailure((Throwable)new Error(message));
        }
    }

    private Channel bindToPrivilegedPort() throws RpcException {
        System.out.println("Attempting to use privileged port.");
        for (int port = 1023; port > 0; --port) {
            try {
                ChannelPipeline pipeline = this._clientBootstrap.getPipelineFactory().getPipeline();
                Channel channel = this._clientBootstrap.getFactory().newChannel(pipeline);
                channel.getConfig().setOptions(this._clientBootstrap.getOptions());
                ChannelFuture bindFuture = channel.bind((SocketAddress)new InetSocketAddress(port)).awaitUninterruptibly();
                if (!bindFuture.isSuccess()) continue;
                System.out.println("Success! Bound to port " + port);
                return bindFuture.getChannel();
            }
            catch (Exception e) {
                String msg = String.format("rpc request bind error for address: %s", this.getRemoteAddress());
                throw new RpcException(RpcStatus.NETWORK_ERROR, msg, e);
            }
        }
        throw new RpcException(RpcStatus.LOCAL_BINDING_ERROR, String.format("Cannot bind a port < 1024: %s", this.getRemoteAddress()));
    }

    public static enum State {
        DISCONNECTED,
        CONNECTING,
        CONNECTED;

    }
}

