/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.io.netty.incubator.channel.uring;

import java.io.IOException;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.shade.io.netty.channel.EventLoopTaskQueueFactory;
import org.apache.pulsar.shade.io.netty.channel.SingleThreadEventLoop;
import org.apache.pulsar.shade.io.netty.channel.unix.Errors;
import org.apache.pulsar.shade.io.netty.channel.unix.FileDescriptor;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.AbstractIOUringChannel;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.IOUring;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.IOUringCompletionQueue;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.IOUringCompletionQueueCallback;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.IOUringEventLoopGroup;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.IOUringSubmissionQueue;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.Native;
import org.apache.pulsar.shade.io.netty.incubator.channel.uring.RingBuffer;
import org.apache.pulsar.shade.io.netty.util.collection.IntObjectHashMap;
import org.apache.pulsar.shade.io.netty.util.collection.IntObjectMap;
import org.apache.pulsar.shade.io.netty.util.concurrent.RejectedExecutionHandler;
import org.apache.pulsar.shade.io.netty.util.internal.PlatformDependent;
import org.apache.pulsar.shade.io.netty.util.internal.logging.InternalLogger;
import org.apache.pulsar.shade.io.netty.util.internal.logging.InternalLoggerFactory;

public final class IOUringEventLoop
extends SingleThreadEventLoop {
    private static final InternalLogger logger = InternalLoggerFactory.getInstance(IOUringEventLoop.class);

    /** Value held by {@link #nextWakeupNanos} once a wakeup was requested or the loop left its wait phase. */
    private static final long AWAKE = -1L;
    /** Deadline value used by {@link #run()} when there is no scheduled-task deadline. */
    private static final long NONE = Long.MAX_VALUE;

    /** Address of the 8-byte off-heap buffer that eventfd reads are issued into; freed in {@link #cleanup()}. */
    private final long eventfdReadBuf = PlatformDependent.allocateMemory(8L);
    /** Registered channels keyed by their socket file descriptor. */
    private final IntObjectMap<AbstractIOUringChannel> channels = new IntObjectHashMap<AbstractIOUringChannel>(4096);
    private final RingBuffer ringBuffer;
    /** Either {@link #AWAKE} or the deadline the loop is currently waiting for. */
    private final AtomicLong nextWakeupNanos = new AtomicLong(AWAKE);
    /** Descriptor written to by {@link #wakeup(boolean)} to interrupt a blocking wait. */
    private final FileDescriptor eventfd;
    // Scratch arrays handed out by inet4AddressArray() / inet6AddressArray().
    private final byte[] inet4AddressArray = new byte[4];
    private final byte[] inet6AddressArray = new byte[16];
    /** Completion callback used by the main loop; dispatches to {@code handle(...)}. */
    private final IOUringCompletionQueueCallback callback = this::handle;
    /** Task used by {@link #submitIO()} when invoked from outside the event loop thread. */
    private final Runnable submitIOTask = () -> this.getRingBuffer().ioUringSubmissionQueue().submit();
    /** Deadline for which a timeout was most recently queued, or {@link #NONE}. */
    private long prevDeadlineNanos = NONE;
    /** True while an eventfd write may still be outstanding and its read completion has not been seen. */
    private boolean pendingWakeup;

    /**
     * Creates the event loop, its io_uring ring buffer and the eventfd used for wakeups.
     *
     * @param parent                   the group this loop belongs to
     * @param executor                 executor passed to the super class
     * @param ringSize                 size passed to {@code Native.createRingBuffer}
     * @param iosqeAsyncThreshold      threshold passed to {@code Native.createRingBuffer}
     * @param rejectedExecutionHandler handler passed to the super class
     * @param queueFactory             factory for the two task queues, or {@code null} for the default MPSC queues
     */
    IOUringEventLoop(IOUringEventLoopGroup parent, Executor executor, int ringSize, int iosqeAsyncThreshold, RejectedExecutionHandler rejectedExecutionHandler, EventLoopTaskQueueFactory queueFactory) {
        super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory), rejectedExecutionHandler);
        // Fails early when the native io_uring support cannot be used.
        IOUring.ensureAvailability();
        ringBuffer = Native.createRingBuffer(ringSize, iosqeAsyncThreshold);
        eventfd = Native.newBlockingEventFd();
        logger.trace("New EventLoop: {}", (Object) toString());
    }

    /**
     * Submits the entries queued on the submission queue. When called from another thread
     * the submission is handed to the event loop as a task instead of being done inline.
     */
    public void submitIO() {
        if (!inEventLoop()) {
            execute(submitIOTask);
            return;
        }
        getRingBuffer().ioUringSubmissionQueue().submit();
    }

    /**
     * Creates a task queue bounded by {@code DEFAULT_MAX_PENDING_TASKS}, using the supplied
     * factory when there is one and the built-in MPSC queue otherwise.
     */
    private static Queue<Runnable> newTaskQueue(EventLoopTaskQueueFactory queueFactory) {
        return queueFactory == null
                ? newTaskQueue0(DEFAULT_MAX_PENDING_TASKS)
                : queueFactory.newTaskQueue(DEFAULT_MAX_PENDING_TASKS);
    }

    /** Creates the task queue for the given capacity by delegating to {@code newTaskQueue0}. */
    @Override
    protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
        return newTaskQueue0(maxPendingTasks);
    }

    /**
     * Creates an MPSC queue from {@code PlatformDependent}: the no-argument variant when
     * {@code maxPendingTasks} is {@code Integer.MAX_VALUE}, otherwise the variant taking the capacity.
     */
    private static Queue<Runnable> newTaskQueue0(int maxPendingTasks) {
        if (maxPendingTasks == Integer.MAX_VALUE) {
            return PlatformDependent.newMpscQueue();
        }
        return PlatformDependent.newMpscQueue(maxPendingTasks);
    }

    /**
     * Registers a channel under its socket file descriptor.
     *
     * @param ch the channel to register
     * @throws RejectedExecutionException if this event loop is shutting down
     */
    void add(AbstractIOUringChannel ch) {
        if (isShuttingDown()) {
            throw new RejectedExecutionException("IoEventLoop is shutting down");
        }
        final int fd = ch.socket.intValue();
        logger.trace("Add Channel: {} ", (Object) fd);
        final AbstractIOUringChannel previous = channels.put(fd, ch);
        // Only a brand-new fd grows the handled-fd count; replacing an existing mapping does not.
        if (previous == null) {
            ringBuffer.ioUringSubmissionQueue().incrementHandledFds();
        }
    }

    /**
     * Unregisters a channel from this event loop.
     *
     * <p>The mapping is removed, and the handled-fd count decremented, only when the fd
     * is still mapped to {@code ch}. If the fd is already mapped to a different channel
     * (the descriptor number was reused), that mapping and the count are left untouched.
     * {@code add} does not increment the count when it replaces a mapping, so decrementing
     * here for a replaced channel would make the count drop below the number of registered
     * fds and be decremented a second time when the newer channel is removed.
     *
     * @param ch the channel to unregister
     */
    void remove(AbstractIOUringChannel ch) {
        int fd = ch.socket.intValue();
        logger.trace("Remove Channel: {}", (Object) fd);
        AbstractIOUringChannel registered = this.channels.get(fd);
        if (registered == ch) {
            this.channels.remove(fd);
            this.ringBuffer.ioUringSubmissionQueue().decrementHandledFds();
        } else if (registered != null) {
            // Another channel owns this fd now, so the given channel must already be closed.
            assert (!ch.isOpen());
        }
    }

    /** Closes every currently registered channel using its void promise. */
    private void closeAll() {
        logger.trace("CloseAll IOUringEvenloop");
        // Iterate over a copy; presumably closing can lead to the channel map being modified.
        final AbstractIOUringChannel[] snapshot = channels.values().toArray(new AbstractIOUringChannel[0]);
        for (int i = 0; i < snapshot.length; i++) {
            final AbstractIOUringChannel channel = snapshot[i];
            channel.unsafe().close(channel.unsafe().voidPromise());
        }
    }

    /**
     * Main loop of this event loop: waits for completions or tasks, processes them, and
     * exits once shutdown has been confirmed.
     *
     * <p>Each iteration has two phases. The wait phase publishes the next deadline in
     * {@code nextWakeupNanos} and, when no tasks are queued, arms a timeout (if the deadline
     * changed) and calls {@code submitAndWait()} unless completions are already available.
     * The work phase repeatedly processes completions and runs tasks until neither reports
     * more work.
     *
     * <p>NOTE: this is decompiler (CFR) output. CFR reported "Removed try catching itself -
     * possible behaviour change" for this method, so the control flow may differ slightly
     * from the original source.
     *
     * @throws AssertionError if submitting the initial eventfd read does not return 1
     */
    @Override
    protected void run() {
        IOUringCompletionQueue completionQueue = this.ringBuffer.ioUringCompletionQueue();
        IOUringSubmissionQueue submissionQueue = this.ringBuffer.ioUringSubmissionQueue();
        // Queue the first read on the eventfd so that wakeup() writes produce a completion.
        this.addEventFdRead(submissionQueue);
        int initialFlushResult = submissionQueue.submit();
        // submit() is expected to return 1 here (presumably the number of submitted entries).
        if (initialFlushResult != 1) {
            throw new AssertionError((Object)("Failed to submit EventFdRead. Result: " + initialFlushResult));
        }
        while (true) {
            try {
                logger.trace("Run IOUringEventLoop {}", (Object)this);
                long curDeadlineNanos = this.nextScheduledTaskDeadlineNanos();
                // -1 is mapped to Long.MAX_VALUE (the NONE constant) so it cannot be
                // confused with AWAKE (-1) once stored in nextWakeupNanos.
                if (curDeadlineNanos == -1L) {
                    curDeadlineNanos = Long.MAX_VALUE;
                }
                // Publish the deadline; from here on wakeup() from another thread will write the eventfd.
                this.nextWakeupNanos.set(curDeadlineNanos);
                try {
                    if (!this.hasTasks()) {
                        // Only queue a new timeout when the deadline differs from the one queued last.
                        if (curDeadlineNanos != this.prevDeadlineNanos) {
                            this.prevDeadlineNanos = curDeadlineNanos;
                            submissionQueue.addTimeout(IOUringEventLoop.deadlineToDelayNanos(curDeadlineNanos), (short)0);
                        }
                        // Skip waiting when completions are already available.
                        if (!completionQueue.hasCompletions()) {
                            logger.trace("submitAndWait {}", (Object)this);
                            submissionQueue.submitAndWait();
                        }
                    }
                }
                finally {
                    // Mark the loop as awake (-1). If the value was already -1, wakeup() ran in
                    // the meantime and wrote the eventfd, so a read completion is still to come.
                    if (this.nextWakeupNanos.get() == -1L || this.nextWakeupNanos.getAndSet(-1L) == -1L) {
                        this.pendingWakeup = true;
                    }
                }
            }
            catch (Throwable t) {
                this.handleLoopException(t);
            }
            boolean maybeMoreWork = true;
            do {
                try {
                    // Non-short-circuit '|': tasks are run even when completions were processed.
                    maybeMoreWork = completionQueue.process(this.callback) != 0 | this.runAllTasks();
                }
                catch (Throwable t) {
                    this.handleLoopException(t);
                }
                try {
                    // 'continue' inside do/while jumps to the 'while (maybeMoreWork)' check.
                    if (!this.isShuttingDown()) continue;
                    this.closeAll();
                    if (this.confirmShutdown()) {
                        return;
                    }
                    if (maybeMoreWork) continue;
                    // Shutting down but not yet confirmed: keep looping while anything is pending.
                    maybeMoreWork = this.hasTasks() || completionQueue.hasCompletions();
                }
                catch (Throwable t) {
                    this.handleLoopException(t);
                }
            } while (maybeMoreWork);
        }
    }

    /**
     * Logs an unexpected failure of the loop and then pauses the calling thread for one second.
     *
     * @param t the failure to report
     */
    void handleLoopException(Throwable t) {
        logger.warn("Unexpected exception in the io_uring event loop", t);
        try {
            // Presumably a back-off so that a persistently failing loop does not spin.
            Thread.sleep(1000L);
        } catch (InterruptedException ignored) {
            // Deliberately ignored: an interrupt only shortens the pause.
        }
    }

    /**
     * Completion callback: routes one completion to the eventfd, timeout or channel handling.
     *
     * @param fd    file descriptor the completed operation was issued for
     * @param res   result of the operation
     * @param flags completion flags (unused here)
     * @param op    opcode of the completed operation
     * @param data  extra data attached to the operation (e.g. the poll mask)
     */
    private void handle(int fd, int res, int flags, byte op, short data) {
        if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
            // The wakeup was consumed; re-arm the eventfd read for the next one.
            pendingWakeup = false;
            addEventFdRead(ringBuffer.ioUringSubmissionQueue());
            return;
        }
        if (op == Native.IORING_OP_TIMEOUT) {
            // An expired timeout means none is queued any more, so forget the remembered deadline.
            if (res == Native.ERRNO_ETIME_NEGATIVE) {
                prevDeadlineNanos = Long.MAX_VALUE;
            }
            return;
        }

        final AbstractIOUringChannel ch = channels.get(fd);
        if (ch == null) {
            // No channel registered for this fd; nothing to deliver the completion to.
            return;
        }

        if (op == Native.IORING_OP_RECV || op == Native.IORING_OP_ACCEPT || op == Native.IORING_OP_RECVMSG || op == Native.IORING_OP_READ) {
            handleRead(ch, res, data);
        } else if (op == Native.IORING_OP_WRITEV || op == Native.IORING_OP_SEND || op == Native.IORING_OP_SENDMSG || op == Native.IORING_OP_WRITE) {
            handleWrite(ch, res, data);
        } else if (op == Native.IORING_OP_POLL_ADD) {
            handlePollAdd(ch, res, data);
        } else if (op == Native.IORING_OP_POLL_REMOVE) {
            if (res == Errors.ERRNO_ENOENT_NEGATIVE) {
                logger.trace("IORING_POLL_REMOVE not successful");
            } else if (res == 0) {
                logger.trace("IORING_POLL_REMOVE successful");
            }
            if (ch.isOpen()) {
                if (logger.isTraceEnabled()) {
                    logger.trace("IGNORING IORING_POLL_REMOVE on not closed fd = {}", (Object) fd);
                }
            } else {
                ch.clearPollFlag(data);
                if (!ch.ioScheduled()) {
                    // Closed and no I/O outstanding: unregister and skip the delayed-close step.
                    remove(ch);
                    return;
                }
            }
        } else if (op == Native.IORING_OP_CONNECT) {
            handleConnect(ch, res);
        }
        ch.ioUringUnsafe().processDelayedClose();
    }

    /** Forwards a completed read-type operation (result and data) to the channel's unsafe. */
    private void handleRead(AbstractIOUringChannel channel, int res, int data) {
        channel.ioUringUnsafe()
                .readComplete(res, data);
    }

    /** Forwards a completed write-type operation (result and data) to the channel's unsafe. */
    private void handleWrite(AbstractIOUringChannel channel, int res, int data) {
        channel.ioUringUnsafe()
                .writeComplete(res, data);
    }

    /**
     * Dispatches a completed poll to the channel, once for each of POLLOUT, POLLIN and
     * POLLRDHUP that is set in {@code pollMask}, in that order.
     */
    private void handlePollAdd(AbstractIOUringChannel channel, int res, int pollMask) {
        final boolean out = (pollMask & Native.POLLOUT) != 0;
        final boolean in = (pollMask & Native.POLLIN) != 0;
        final boolean rdHup = (pollMask & Native.POLLRDHUP) != 0;

        if (out) {
            channel.ioUringUnsafe().pollOut(res);
        }
        if (in) {
            channel.ioUringUnsafe().pollIn(res);
        }
        if (rdHup) {
            channel.ioUringUnsafe().pollRdHup(res);
        }
    }

    /** Queues a read of 8 bytes from the eventfd into {@code eventfdReadBuf} on the given queue. */
    private void addEventFdRead(IOUringSubmissionQueue submissionQueue) {
        final int wakeupFd = eventfd.intValue();
        submissionQueue.addEventFdRead(wakeupFd, eventfdReadBuf, 0, 8, (short) 0);
    }

    /** Forwards the result of a completed connect operation to the channel's unsafe. */
    private void handleConnect(AbstractIOUringChannel channel, int res) {
        channel.ioUringUnsafe()
                .connectComplete(res);
    }

    /**
     * Releases everything owned by this loop: waits for an outstanding eventfd read, closes
     * all channels and drains their completions, then closes the eventfd, frees the read
     * buffer and closes the ring buffer.
     */
    @Override
    protected void cleanup() {
        final IOUringCompletionQueue cq = ringBuffer.ioUringCompletionQueue();
        final IOUringSubmissionQueue sq = ringBuffer.ioUringSubmissionQueue();

        if (pendingWakeup) {
            // Same dispatch as handle(), except the eventfd read is not re-armed once it completes.
            final IOUringCompletionQueueCallback drainWakeup = (fd, res, flags, op, data) -> {
                if (op == Native.IORING_OP_READ && eventfd.intValue() == fd) {
                    pendingWakeup = false;
                } else {
                    handle(fd, res, flags, op, data);
                }
            };
            cq.process(drainWakeup);
            while (pendingWakeup) {
                cq.ioUringWaitCqe();
                cq.process(drainWakeup);
            }
        }

        closeAll();
        // Keep running tasks and processing completions until every channel has unregistered.
        while (!channels.isEmpty()) {
            if (!runAllTasks()) {
                sq.submitAndWait();
                cq.process(callback);
            }
        }

        try {
            eventfd.close();
        } catch (IOException e) {
            logger.warn("Failed to close the event fd.", e);
        }
        PlatformDependent.freeMemory(eventfdReadBuf);
        ringBuffer.close();
    }

    /** Returns the ring buffer owned by this event loop. */
    RingBuffer getRingBuffer() {
        return ringBuffer;
    }

    /**
     * Wakes the loop when called from another thread. The eventfd is written only if the
     * loop was not already marked awake, so repeated wakeups produce a single write.
     *
     * @param inEventLoop {@code true} when the caller is the event loop thread (nothing to do)
     */
    @Override
    protected void wakeup(boolean inEventLoop) {
        if (inEventLoop) {
            return;
        }
        final long previous = nextWakeupNanos.getAndSet(AWAKE);
        if (previous != AWAKE) {
            Native.eventFdWrite(eventfd.intValue(), 1L);
        }
    }

    /** Returns the 4-byte array owned by this loop; the same instance is returned on every call. */
    byte[] inet4AddressArray() {
        return inet4AddressArray;
    }

    /** Returns the 16-byte array owned by this loop; the same instance is returned on every call. */
    byte[] inet6AddressArray() {
        return inet6AddressArray;
    }
}

