/*
 * Decompiled with CFR 0.152.
 */
package org.apache.ratis.netty.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.ratis.client.DataStreamClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.security.TlsConf;
import org.apache.ratis.thirdparty.io.netty.bootstrap.Bootstrap;
import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.Channel;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFuture;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelFutureListener;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandler;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelInitializer;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelOption;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelPipeline;
import org.apache.ratis.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.ratis.thirdparty.io.netty.channel.socket.SocketChannel;
import org.apache.ratis.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.ratis.thirdparty.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.ratis.thirdparty.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.thirdparty.io.netty.util.concurrent.GenericFutureListener;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NettyClientStreamRpc
implements DataStreamClientRpc {
    public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
    private final String name;
    private final Connection connection;
    private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<ClientInvocationId, ReplyQueue>();
    private final TimeDuration replyQueueGracePeriod;
    private final TimeoutScheduler timeoutScheduler = TimeoutScheduler.getInstance();

    /**
     * Creates a stream client for the data-stream address of the given server and starts
     * connecting to it.
     *
     * @param server the peer whose data-stream address is connected to; also used in this object's name
     * @param tlsConf the TLS configuration passed to {@code NettyUtils.buildSslContextForClient}
     * @param properties the properties from which the reply-queue grace period and the worker
     *                   group settings are read
     */
    public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
        this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
        this.replyQueueGracePeriod = NettyConfigKeys.DataStream.Client.replyQueueGracePeriod(properties);

        final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
        final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
        final WorkerGroupGetter workerGroup = new WorkerGroupGetter(properties);
        // A supplier is passed so that a fresh initializer (and handler) is built per connect attempt.
        final Supplier<ChannelInitializer<SocketChannel>> initializerSupplier =
            () -> newChannelInitializer(address, sslContext, getClientHandler());
        this.connection = new Connection(address, workerGroup, initializerSupplier);
    }

    /**
     * Creates the inbound handler that hands each {@link DataStreamReply} read from the channel
     * to the next pending future in the reply queue of the matching invocation id.
     *
     * @return a new handler; it remembers the invocation id of the last reply it has read
     */
    private ChannelInboundHandler getClientHandler() {
        return new ChannelInboundHandlerAdapter() {
            /** Invocation id of the most recently read reply; {@code null} until a reply is read. */
            private ClientInvocationId clientInvocationId;

            /**
             * Completes the next pending future of the reply's queue with the reply.
             * For an unsuccessful reply, the queue is removed from the map and all its
             * remaining futures are failed.
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                if (!(msg instanceof DataStreamReply)) {
                    LOG.error("{}: unexpected message {}", this, msg.getClass());
                    return;
                }
                final DataStreamReply reply = (DataStreamReply) msg;
                LOG.debug("{}: read {}", this, reply);

                // Keep the id in a local: the field is overwritten by every later reply, while the
                // scheduled check below runs later and must still refer to this reply's id.
                final ClientInvocationId id = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
                this.clientInvocationId = id;

                final ReplyQueue queue = reply.isSuccess()
                    ? NettyClientStreamRpc.this.replies.get(id)
                    : NettyClientStreamRpc.this.replies.remove(id);
                if (queue == null) {
                    return;
                }
                final CompletableFuture<DataStreamReply> f = queue.poll();
                if (f == null) {
                    return;
                }
                f.complete(reply);

                if (!reply.isSuccess() && queue.size() > 0) {
                    final IllegalStateException e = new IllegalStateException(
                        this + ": an earlier request failed with " + reply);
                    queue.forEach(future -> future.completeExceptionally(e));
                }

                final Integer emptyId = queue.getEmptyId();
                if (emptyId != null) {
                    // Remove the queue only if it is still the same queue and it reports the same
                    // empty id when the grace period elapses.
                    NettyClientStreamRpc.this.timeoutScheduler.onTimeout(
                        NettyClientStreamRpc.this.replyQueueGracePeriod,
                        () -> NettyClientStreamRpc.this.replies.computeIfPresent(id,
                            (key, q) -> q == queue && emptyId.equals(q.getEmptyId()) ? null : q),
                        LOG, () -> "Timeout check failed, clientInvocationId=" + id);
                }
            }

            /**
             * Logs the failure once, fails every future pending for the last seen invocation id
             * (if any) and closes the channel.
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                LOG.warn(NettyClientStreamRpc.this.name + ": exceptionCaught", cause);
                Optional.ofNullable(this.clientInvocationId)
                    .map(NettyClientStreamRpc.this.replies::remove)
                    .orElse(ReplyQueue.EMPTY)
                    .forEach(f -> f.completeExceptionally(cause));
                ctx.close();
            }

            /** Schedules a reconnect unless the connection has already been closed. */
            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                if (!NettyClientStreamRpc.this.connection.isClosed()) {
                    NettyClientStreamRpc.this.connection.scheduleReconnect("channel is inactive", null);
                }
                super.channelInactive(ctx);
            }
        };
    }

    /**
     * Builds the channel initializer that assembles the client pipeline.
     * The pipeline order is: an optional "ssl" handler, the two request encoders,
     * the reply decoder and finally the given handler.
     *
     * @param address the remote address; its host name and port are passed to the SSL handler
     * @param sslContext the SSL context, or {@code null} to skip the "ssl" handler
     * @param handler the handler appended at the end of the pipeline
     * @return a new channel initializer
     */
    static ChannelInitializer<SocketChannel> newChannelInitializer(final InetSocketAddress address, final SslContext sslContext, final ChannelInboundHandler handler) {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) {
                final ChannelPipeline pipeline = ch.pipeline();
                if (sslContext != null) {
                    final ChannelHandler ssl = sslContext.newHandler(ch.alloc(), address.getHostName(), address.getPort());
                    pipeline.addLast("ssl", ssl);
                }
                pipeline.addLast(newEncoder());
                pipeline.addLast(newEncoderDataStreamRequestFilePositionCount());
                pipeline.addLast(newDecoder());
                pipeline.addLast(handler);
            }
        };
    }

    /**
     * Creates the encoder for {@link DataStreamRequestByteBuffer} requests; the encoding itself is
     * delegated to {@code NettyDataStreamUtils.encodeDataStreamRequestByteBuffer}.
     *
     * @return a new encoder instance
     */
    static MessageToMessageEncoder<DataStreamRequestByteBuffer> newEncoder() {
        return new MessageToMessageEncoder<DataStreamRequestByteBuffer>() {
            @Override
            protected void encode(ChannelHandlerContext ctx, DataStreamRequestByteBuffer msg, List<Object> out) {
                // Everything produced by the utility is appended to the outbound list.
                NettyDataStreamUtils.encodeDataStreamRequestByteBuffer(msg, encoded -> out.add(encoded), ctx.alloc());
            }
        };
    }

    /**
     * Creates the encoder for {@link DataStreamRequestFilePositionCount} requests; the encoding
     * itself is delegated to {@code NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount}.
     *
     * @return a new encoder instance
     */
    static MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
        return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
            @Override
            protected void encode(ChannelHandlerContext context, DataStreamRequestFilePositionCount msg, List<Object> out) {
                // Everything produced by the utility is appended to the outbound list.
                NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(msg, encoded -> out.add(encoded), context.alloc());
            }
        };
    }

    /**
     * Creates the decoder that turns inbound bytes into replies using
     * {@code NettyDataStreamUtils.decodeDataStreamReplyByteBuffer}. The decoder is configured
     * with {@code ByteToMessageDecoder.COMPOSITE_CUMULATOR}.
     *
     * @return a new decoder instance
     */
    static ByteToMessageDecoder newDecoder() {
        return new ByteToMessageDecoder() {
            {
                setCumulator(ByteToMessageDecoder.COMPOSITE_CUMULATOR);
            }

            @Override
            protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
                // A null result produces no output for this call.
                final Object decoded = NettyDataStreamUtils.decodeDataStreamReplyByteBuffer(in);
                if (decoded != null) {
                    out.add(decoded);
                }
            }
        };
    }

    /**
     * Sends the given request and returns a future for its reply.
     * The future is first queued under the request's invocation id and the request is then
     * written to the channel.
     *
     * @param request the request to send
     * @return a future that is completed exceptionally right away when it cannot be queued or no
     *         channel is available, and later when the channel write fails
     */
    @Override
    public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
        final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
        final ClientInvocationId invocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());

        // Register the future before writing so that it is already queued when the reply is read.
        final ReplyQueue pending = replies.computeIfAbsent(invocationId, key -> new ReplyQueue());
        final boolean queued = pending.offer(replyFuture);
        if (!queued) {
            replyFuture.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
            return replyFuture;
        }

        final Channel channel = connection.getChannelUninterruptibly();
        if (channel == null) {
            replyFuture.completeExceptionally(new AlreadyClosedException(this + ": Failed to send " + request));
            return replyFuture;
        }

        LOG.debug("{}: write {}", this, request);
        channel.writeAndFlush(request).addListener(written -> {
            if (written.isSuccess()) {
                return;
            }
            final IOException failure = new IOException(this + ": Failed to send " + request, written.cause());
            LOG.error("Channel write failed", failure);
            replyFuture.completeExceptionally(failure);
        });
        return replyFuture;
    }

    /** Closes the underlying connection. */
    @Override
    public void close() {
        connection.close();
    }

    /** @return the name of this client, as built in the constructor. */
    @Override
    public String toString() {
        return name;
    }

    static class Connection {
        static final TimeDuration RECONNECT = TimeDuration.valueOf((long)100L, (TimeUnit)TimeUnit.MILLISECONDS);
        private final InetSocketAddress address;
        private final WorkerGroupGetter workerGroup;
        private final Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier;
        private final AtomicReference<ChannelFuture> ref;

        Connection(InetSocketAddress address, WorkerGroupGetter workerGroup, Supplier<ChannelInitializer<SocketChannel>> channelInitializerSupplier) {
            this.address = address;
            this.workerGroup = workerGroup;
            this.channelInitializerSupplier = channelInitializerSupplier;
            this.ref = new AtomicReference<ChannelFuture>(this.connect());
        }

        Channel getChannelUninterruptibly() {
            ChannelFuture future = this.ref.get();
            if (future == null) {
                return null;
            }
            Channel channel = future.syncUninterruptibly().channel();
            if (channel.isOpen()) {
                return channel;
            }
            return this.reconnect().syncUninterruptibly().channel();
        }

        private EventLoopGroup getWorkerGroup() {
            return this.workerGroup.get();
        }

        private ChannelFuture connect() {
            return ((Bootstrap)((Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().group(this.getWorkerGroup())).channel(NioSocketChannel.class)).handler((ChannelHandler)this.channelInitializerSupplier.get())).option(ChannelOption.SO_KEEPALIVE, (Object)true)).connect((SocketAddress)this.address).addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) {
                    if (!future.isSuccess()) {
                        this.scheduleReconnect(this + " failed", future.cause());
                    } else {
                        LOG.trace("{} succeed.", (Object)this);
                    }
                }
            });
        }

        void scheduleReconnect(String message, Throwable cause) {
            LOG.warn("{}: {}; schedule reconnecting to {} in {}", new Object[]{this, message, this.address, RECONNECT});
            if (cause != null) {
                LOG.warn("", cause);
            }
            this.getWorkerGroup().schedule(this::reconnect, RECONNECT.getDuration(), RECONNECT.getUnit());
        }

        private ChannelFuture reconnect() {
            MemoizedSupplier supplier = MemoizedSupplier.valueOf(this::connect);
            ChannelFuture previous = this.ref.getAndUpdate(prev -> prev == null ? null : (ChannelFuture)supplier.get());
            if (previous != null) {
                previous.channel().close();
            }
            return supplier.isInitialized() ? (ChannelFuture)supplier.get() : null;
        }

        void close() {
            ChannelFuture previous = this.ref.getAndSet(null);
            if (previous != null) {
                previous.channel().close();
            }
            this.workerGroup.shutdownGracefully();
        }

        boolean isClosed() {
            return this.ref.get() == null;
        }

        public String toString() {
            return JavaUtils.getClassSimpleName(this.getClass()) + "-" + this.address;
        }
    }

    /**
     * A queue of reply futures together with a counter that is incremented on every
     * successful {@link #offer}.
     */
    static class ReplyQueue
    implements Iterable<CompletableFuture<DataStreamReply>> {
        /** A shared instance used where a queue is required but none exists. */
        static final ReplyQueue EMPTY = new ReplyQueue();
        private final Queue<CompletableFuture<DataStreamReply>> futures = new ConcurrentLinkedQueue<>();
        /** Number of successful offers so far. */
        private int emptyId;

        ReplyQueue() {
        }

        /** @return the current counter value if the queue is empty; otherwise {@code null}. */
        synchronized Integer getEmptyId() {
            if (!futures.isEmpty()) {
                return null;
            }
            return emptyId;
        }

        /**
         * Appends the given future and, on success, increments the counter.
         *
         * @param f the future to append
         * @return true if the future was added
         */
        synchronized boolean offer(CompletableFuture<DataStreamReply> f) {
            final boolean accepted = futures.offer(f);
            if (accepted) {
                emptyId++;
            }
            return accepted;
        }

        /** @return the head future after removing it, or {@code null} if the queue is empty. */
        CompletableFuture<DataStreamReply> poll() {
            return futures.poll();
        }

        /** @return the number of queued futures. */
        int size() {
            return futures.size();
        }

        @Override
        public Iterator<CompletableFuture<DataStreamReply>> iterator() {
            return futures.iterator();
        }
    }

    /**
     * Supplies the event loop group used by a connection: either a group shared by all
     * instances configured for sharing, or a group owned by this instance.
     */
    private static class WorkerGroupGetter
    implements Supplier<EventLoopGroup> {
        /** The shared group; created when first requested. */
        private static final AtomicReference<EventLoopGroup> SHARED_WORKER_GROUP = new AtomicReference<>();
        private final EventLoopGroup workerGroup;
        /** True when {@link #workerGroup} is the shared group, which this instance must not shut down. */
        private final boolean ignoreShutdown;

        /**
         * Creates a new event loop group sized by the client worker-group-size setting.
         *
         * @param properties the properties to read the size from
         * @return a new event loop group
         */
        static EventLoopGroup newWorkerGroup(RaftProperties properties) {
            return NettyUtils.newEventLoopGroup(JavaUtils.getClassSimpleName(NettyClientStreamRpc.class) + "-workerGroup", NettyConfigKeys.DataStream.Client.workerGroupSize(properties), false);
        }

        /**
         * Selects the shared group or creates an owned group, depending on the
         * client worker-group-share setting.
         *
         * @param properties the properties to read the settings from
         */
        WorkerGroupGetter(RaftProperties properties) {
            if (NettyConfigKeys.DataStream.Client.workerGroupShare(properties)) {
                // The update function may be re-applied under contention, so the creation is
                // memoized to build at most one group per constructor call.
                final MemoizedSupplier<EventLoopGroup> created = MemoizedSupplier.valueOf(() -> newWorkerGroup(properties));
                final EventLoopGroup shared = SHARED_WORKER_GROUP.updateAndGet(g -> g != null ? g : created.get());
                if (created.isInitialized() && created.get() != shared) {
                    // Another thread installed its group first; release the one created here.
                    created.get().shutdownGracefully();
                }
                this.workerGroup = shared;
                this.ignoreShutdown = true;
            } else {
                this.workerGroup = newWorkerGroup(properties);
                this.ignoreShutdown = false;
            }
        }

        @Override
        public EventLoopGroup get() {
            return workerGroup;
        }

        /** Shuts down the group unless it is the shared one. */
        void shutdownGracefully() {
            if (!ignoreShutdown) {
                workerGroup.shutdownGracefully();
            }
        }
    }
}

