/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.control;

import com.google.protobuf.Internal;
import com.google.protobuf.MessageLite;
import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.SocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.rpc.BasicServer;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.ConnectionManagerRegistry;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlConnectionConfig;
import org.apache.drill.exec.rpc.control.ControlConnectionManager;
import org.apache.drill.exec.rpc.control.ControlProtobufLengthDecoder;
import org.apache.drill.exec.rpc.control.ControlRpcConfig;
import org.apache.drill.exec.rpc.control.DefaultInstanceHandler;
import org.apache.drill.exec.rpc.security.ServerAuthenticationHandler;

public class ControlServer
extends BasicServer<BitControl.RpcType, ControlConnection> {
    private final ControlConnectionConfig config;
    private final ConnectionManagerRegistry connectionRegistry;
    private volatile ProxyCloseHandler proxyCloseHandler;

    public ControlServer(ControlConnectionConfig config, ConnectionManagerRegistry connectionRegistry) {
        super(ControlRpcConfig.getMapping(config.getBootstrapContext().getConfig(), config.getBootstrapContext().getExecutor()), config.getAllocator().getAsByteBufAllocator(), config.getBootstrapContext().getControlLoopGroup());
        this.config = config;
        this.connectionRegistry = connectionRegistry;
    }

    @Override
    public MessageLite getResponseDefaultInstance(int rpcType) throws RpcException {
        return DefaultInstanceHandler.getResponseDefaultInstance(rpcType);
    }

    @Override
    protected GenericFutureListener<ChannelFuture> getCloseHandler(SocketChannel ch, ControlConnection connection) {
        this.proxyCloseHandler = new ProxyCloseHandler(super.getCloseHandler(ch, connection));
        return this.proxyCloseHandler;
    }

    @Override
    protected ControlConnection initRemoteConnection(SocketChannel channel) {
        super.initRemoteConnection(channel);
        ControlConnection controlConnection = new ControlConnection(channel, "control server", this.config, this.config.getAuthMechanismToUse() == null ? this.config.getMessageHandler() : new ServerAuthenticationHandler<ControlConnection, BitControl.RpcType>(this.config.getMessageHandler(), 19, BitControl.RpcType.SASL_MESSAGE), this);
        controlConnection.incConnectionCounter();
        return controlConnection;
    }

    @Override
    protected BasicServer.ServerHandshakeHandler<BitControl.BitControlHandshake> getHandshakeHandler(final ControlConnection connection) {
        return new BasicServer.ServerHandshakeHandler<BitControl.BitControlHandshake>((Internal.EnumLite)BitControl.RpcType.HANDSHAKE, BitControl.BitControlHandshake.PARSER){

            @Override
            public MessageLite getHandshakeResponse(BitControl.BitControlHandshake inbound) throws Exception {
                if (inbound.getRpcVersion() != 3) {
                    throw new RpcException(String.format("Invalid rpc version.  Expected %d, actual %d.", inbound.getRpcVersion(), 3));
                }
                if (!inbound.hasEndpoint() || inbound.getEndpoint().getAddress().isEmpty() || inbound.getEndpoint().getControlPort() < 1) {
                    throw new RpcException(String.format("RPC didn't provide valid counter endpoint information.  Received %s.", inbound.getEndpoint()));
                }
                connection.setEndpoint(inbound.getEndpoint());
                ControlConnectionManager manager = ControlServer.this.connectionRegistry.getConnectionManager(inbound.getEndpoint());
                ControlServer.this.proxyCloseHandler.setHandler(manager.getCloseHandlerCreator().getHandler(connection, ControlServer.this.proxyCloseHandler.getHandler()));
                manager.addExternalConnection(connection);
                BitControl.BitControlHandshake.Builder builder = BitControl.BitControlHandshake.newBuilder();
                builder.setRpcVersion(3);
                if (ControlServer.this.config.getAuthMechanismToUse() != null) {
                    builder.addAllAuthenticationMechanisms(ControlServer.this.config.getAuthProvider().getAllFactoryNames());
                }
                return builder.build();
            }
        };
    }

    @Override
    protected ProtobufLengthDecoder getDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
        return new ControlProtobufLengthDecoder(allocator, outOfMemoryHandler);
    }

    private class ProxyCloseHandler
    implements GenericFutureListener<ChannelFuture> {
        private volatile GenericFutureListener<ChannelFuture> handler;

        public ProxyCloseHandler(GenericFutureListener<ChannelFuture> handler) {
            this.handler = handler;
        }

        public GenericFutureListener<ChannelFuture> getHandler() {
            return this.handler;
        }

        public void setHandler(GenericFutureListener<ChannelFuture> handler) {
            this.handler = handler;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            this.handler.operationComplete(future);
        }
    }
}

