/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc;

import com.google.protobuf.Internal;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.ReadTimeoutHandler;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.BindException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.rpc.AbstractHandshakeHandler;
import org.apache.drill.exec.rpc.OutOfMemoryHandler;
import org.apache.drill.exec.rpc.OutboundRpcMessage;
import org.apache.drill.exec.rpc.ProtobufLengthDecoder;
import org.apache.drill.exec.rpc.RemoteConnection;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcConfig;
import org.apache.drill.exec.rpc.RpcDecoder;
import org.apache.drill.exec.rpc.RpcEncoder;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcExceptionHandler;
import org.apache.drill.exec.rpc.ServerConnection;
import org.apache.drill.exec.rpc.TransportCheck;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class BasicServer<T extends Internal.EnumLite, SC extends ServerConnection<SC>>
extends RpcBus<T, SC> {
    final Logger logger = LoggerFactory.getLogger(this.getClass());
    private final ServerBootstrap b;
    private volatile boolean connect = false;
    private final EventLoopGroup eventLoopGroup;

    public BasicServer(final RpcConfig rpcMapping, ByteBufAllocator alloc, EventLoopGroup eventLoopGroup) {
        super(rpcMapping);
        this.eventLoopGroup = eventLoopGroup;
        this.b = ((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)((ServerBootstrap)new ServerBootstrap().channel(TransportCheck.getServerSocketChannel())).option(ChannelOption.SO_BACKLOG, 1000)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 30000)).option(ChannelOption.TCP_NODELAY, true)).option(ChannelOption.SO_REUSEADDR, true)).option(ChannelOption.SO_RCVBUF, 131072)).option(ChannelOption.SO_SNDBUF, 131072)).group(eventLoopGroup).childOption(ChannelOption.ALLOCATOR, alloc).childHandler(new ChannelInitializer<SocketChannel>(){

            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                RemoteConnection connection = BasicServer.this.initRemoteConnection(ch);
                ch.closeFuture().addListener((GenericFutureListener<? extends Future<? super Void>>)BasicServer.this.getCloseHandler(ch, connection));
                ChannelPipeline pipe = ch.pipeline();
                if (BasicServer.this.isSslEnabled()) {
                    BasicServer.this.setupSSL(pipe);
                }
                pipe.addLast("protocol-decoder", (ChannelHandler)BasicServer.this.getDecoder(connection.getAllocator(), BasicServer.this.getOutOfMemoryHandler()));
                pipe.addLast("message-decoder", (ChannelHandler)new RpcDecoder("s-" + BasicServer.this.rpcConfig.getName()));
                pipe.addLast("protocol-encoder", (ChannelHandler)new RpcEncoder("s-" + BasicServer.this.rpcConfig.getName()));
                pipe.addLast("handshake-handler", BasicServer.this.getHandshakeHandler(connection));
                if (rpcMapping.hasTimeout()) {
                    pipe.addLast("timeout-handler", (ChannelHandler)new LoggingReadTimeoutHandler(BasicServer.this, (ServerConnection)connection, rpcMapping.getTimeout()));
                }
                pipe.addLast("message-handler", (ChannelHandler)new RpcBus.InboundHandler((RpcBus)BasicServer.this, connection));
                pipe.addLast("exception-handler", new RpcExceptionHandler<RemoteConnection>(connection));
                BasicServer.this.connect = true;
            }
        });
    }

    protected void setupSSL(ChannelPipeline pipe) {
        throw new UnsupportedOperationException("SSL is implemented only by the User Server.");
    }

    protected boolean isSslEnabled() {
        return false;
    }

    public void setSslChannel(Channel c) {
    }

    protected void closeSSL() {
    }

    protected OutOfMemoryHandler getOutOfMemoryHandler() {
        return OutOfMemoryHandler.DEFAULT_INSTANCE;
    }

    protected abstract ProtobufLengthDecoder getDecoder(BufferAllocator var1, OutOfMemoryHandler var2);

    protected abstract ServerHandshakeHandler<?> getHandshakeHandler(SC var1);

    @Override
    protected abstract MessageLite getResponseDefaultInstance(int var1) throws RpcException;

    @Override
    protected void handle(SC connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        connection.getCurrentHandler().handle(connection, rpcType, pBody, dBody, sender);
    }

    @Override
    protected SC initRemoteConnection(SocketChannel channel) {
        this.local = channel.localAddress();
        this.remote = channel.remoteAddress();
        return null;
    }

    public int bind(String bindAddr, int initialPort, boolean allowPortHunting) {
        int port = initialPort - 1;
        while (true) {
            try {
                this.b.bind(bindAddr, ++port).sync();
            }
            catch (Exception e) {
                if (e instanceof BindException && allowPortHunting) continue;
                UserException bindException = UserException.resourceError(e).addContext("Server type", this.getClass().getSimpleName()).message("Drillbit could not bind to port %s.", port).build(this.logger);
                throw bindException;
            }
            break;
        }
        this.connect = !this.connect;
        this.logger.debug("Server of type {} started on port {}.", (Object)this.getClass().getSimpleName(), (Object)port);
        return port;
    }

    @Override
    public void close() throws IOException {
        try {
            Stopwatch watch = Stopwatch.createStarted();
            this.eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.SECONDS).get();
            long elapsed = watch.elapsed(TimeUnit.MILLISECONDS);
            if (elapsed > 500L) {
                this.logger.info("closed eventLoopGroup " + this.eventLoopGroup + " in " + elapsed + " ms");
            }
            if (this.isSslEnabled()) {
                this.closeSSL();
            }
        }
        catch (InterruptedException | ExecutionException e) {
            this.logger.warn("Failure while shutting down {}. ", (Object)this.getClass().getName(), (Object)e);
            Thread.currentThread().interrupt();
        }
    }

    protected static abstract class ServerHandshakeHandler<T extends MessageLite>
    extends AbstractHandshakeHandler<T> {
        public ServerHandshakeHandler(Internal.EnumLite handshakeType, Parser<T> parser) {
            super(handshakeType, parser);
        }

        @Override
        protected void consumeHandshake(ChannelHandlerContext ctx, T inbound) throws Exception {
            OutboundRpcMessage msg = new OutboundRpcMessage(GeneralRPCProtos.RpcMode.RESPONSE, this.handshakeType, this.coordinationId, this.getHandshakeResponse(inbound), new ByteBuf[0]);
            ctx.writeAndFlush(msg);
        }

        public abstract MessageLite getHandshakeResponse(T var1) throws Exception;
    }

    private class LoggingReadTimeoutHandler
    extends ReadTimeoutHandler {
        private final SC connection;
        private final int timeoutSeconds;
        final /* synthetic */ BasicServer this$0;

        /*
         * WARNING - Possible parameter corruption
         */
        public LoggingReadTimeoutHandler(SC connection, int timeoutSeconds) {
            this.this$0 = (BasicServer)n;
            super(timeoutSeconds);
            this.connection = connection;
            this.timeoutSeconds = timeoutSeconds;
        }

        @Override
        protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
            this.this$0.logger.info("RPC connection {} timed out.  Timeout was set to {} seconds. Closing connection.", (Object)this.connection.getName(), (Object)this.timeoutSeconds);
            super.readTimedOut(ctx);
        }
    }
}

