/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp.commands.list.blocking;

import io.netty.channel.ChannelHandlerContext;
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.infinispan.AdvancedCache;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.encoding.DataConversion;
import org.infinispan.multimap.impl.EmbeddedMultimapListCache;
import org.infinispan.multimap.impl.ListBucket;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.server.resp.Consumers;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespErrorUtil;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.commands.ArgumentUtils;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.filter.EventListenerConverter;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.logging.Log;

public class BPOP
extends RespCommand
implements Resp3Command {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);
    protected final boolean isFirst;

    public BPOP(boolean isFirst) {
        super(-3, 1, -2, 1);
        this.isFirst = isFirst;
    }

    @Override
    public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        EmbeddedMultimapListCache<byte[], byte[]> listMultimap = handler.getListMultimap();
        int lastKeyIdx = arguments.size() - 1;
        List<byte[]> filterKeys = arguments.subList(0, lastKeyIdx);
        double argTimeout = ArgumentUtils.toDouble(arguments.get(lastKeyIdx));
        if (argTimeout < 0.0) {
            RespErrorUtil.mustBePositive(handler.allocator());
            return handler.myStage();
        }
        long timeout = (long)(argTimeout * (double)Duration.ofSeconds(1L).toMillis());
        CompletionStage<Collection<byte[]>> pollStage = BPOP.pollAllKeys(listMultimap, filterKeys, this.isFirst);
        return handler.stageToReturn(pollStage.thenCompose(v -> {
            CompletionStage<Collection<Object>> retStage = v != null && !v.isEmpty() ? CompletableFuture.completedFuture(v) : this.addSubscriber(listMultimap, filterKeys, timeout, handler);
            return retStage;
        }), ctx, Consumers.COLLECTION_BULK_BICONSUMER);
    }

    CompletionStage<Collection<byte[]>> addSubscriber(EmbeddedMultimapListCache<byte[], byte[]> listMultimap, List<byte[]> filterKeys, long timeout, Resp3Handler handler) {
        if (log.isTraceEnabled()) {
            log.tracef("Subscriber for keys: " + filterKeys.toString(), new Object[0]);
        }
        AdvancedCache cache = handler.typedCache(null);
        DataConversion vc = cache.getValueDataConversion();
        PubSubListener pubSubListener = new PubSubListener(filterKeys, handler, cache, listMultimap, this.isFirst);
        EventListenerKeysFilter filter = new EventListenerKeysFilter((byte[][])filterKeys.toArray(x$0 -> new byte[x$0][]));
        CompletionStage addListenerStage = cache.addListenerAsync((Object)pubSubListener, (CacheEventFilter)filter, new EventListenerConverter(vc));
        addListenerStage.whenComplete((ignore, t) -> {
            if (t != null) {
                pubSubListener.synchronizer.resultFuture.completeExceptionally((Throwable)t);
                return;
            }
            pubSubListener.startTimer(timeout);
            pubSubListener.synchronizer.onListenerAdded();
        });
        return pubSubListener.getFuture();
    }

    private static CompletionStage<Collection<byte[]>> pollAllKeys(EmbeddedMultimapListCache<byte[], byte[]> listMultimap, List<byte[]> filterKeys, boolean isFirst) {
        CompletionStage<Collection<Object>> pollStage = BPOP.pollKeyValue(listMultimap, filterKeys.get(0), isFirst);
        for (int i = 1; i < filterKeys.size(); ++i) {
            byte[] keyChannel = filterKeys.get(i);
            pollStage = pollStage.thenCompose(v -> v == null || v.isEmpty() ? BPOP.pollKeyValue(listMultimap, keyChannel, isFirst) : CompletableFuture.completedFuture(v));
        }
        return pollStage;
    }

    static CompletionStage<Collection<byte[]>> pollKeyValue(EmbeddedMultimapListCache<byte[], byte[]> mmList, byte[] key, boolean isFirst) {
        CompletionStage cs = isFirst ? mmList.pollFirst((Object)key, 1L) : mmList.pollLast((Object)key, 1L);
        return cs.thenApply(v -> v == null || v.isEmpty() ? null : Arrays.asList(key, (byte[])v.iterator().next()));
    }

    @Listener(clustered=true)
    public static class PubSubListener {
        private final AdvancedCache<byte[], Object> cache;
        private volatile ScheduledFuture<?> scheduledTimer;
        private final Resp3Handler handler;
        private final PollListenerSynchronizer synchronizer;

        private PubSubListener(List<byte[]> filterKeys, Resp3Handler handler, AdvancedCache<byte[], Object> cache, EmbeddedMultimapListCache<byte[], byte[]> mml, boolean isFirst) {
            this.cache = cache;
            this.handler = handler;
            this.synchronizer = new PollListenerSynchronizer(filterKeys, mml, isFirst);
            this.synchronizer.resultFuture.whenComplete((ignore_v, ignore_t) -> {
                this.deleteTimer();
                cache.removeListenerAsync((Object)this);
            });
        }

        public CompletableFuture<Collection<byte[]>> getFuture() {
            return this.synchronizer.resultFuture;
        }

        private void startTimer(long timeout) {
            this.deleteTimer();
            this.scheduledTimer = timeout > 0L ? this.handler.getScheduler().schedule(() -> {
                this.cache.removeListenerAsync((Object)this);
                this.synchronizer.resultFuture.complete(null);
            }, timeout, TimeUnit.MILLISECONDS) : null;
        }

        private void deleteTimer() {
            if (this.scheduledTimer != null) {
                this.scheduledTimer.cancel(true);
            }
            this.scheduledTimer = null;
        }

        @CacheEntryCreated
        @CacheEntryModified
        public void onEvent(CacheEntryEvent<Object, Object> entryEvent) {
            try {
                if (entryEvent.getValue() instanceof ListBucket) {
                    byte[] key = this.unwrapKey(entryEvent.getKey());
                    this.synchronizer.onEvent(key);
                }
            }
            catch (Exception ex) {
                this.synchronizer.resultFuture.completeExceptionally(ex);
            }
        }

        private byte[] unwrapKey(Object key) {
            return key instanceof WrappedByteArray ? ((WrappedByteArray)key).getBytes() : (byte[])key;
        }
    }

    public static class PollListenerSynchronizer {
        private final ArrayDeque<Object> keyQueue = new ArrayDeque();
        private final CompletableFuture<Collection<byte[]>> resultFuture = new CompletableFuture();
        private final EmbeddedMultimapListCache<byte[], byte[]> multimapList;
        private final List<byte[]> keys;
        private final BiConsumer<? super Collection<byte[]>, ? super Throwable> whenCompleteConsumer;
        private volatile boolean canPollJustEventKey;
        private final boolean isFirst;

        private PollListenerSynchronizer(List<byte[]> keys, EmbeddedMultimapListCache<byte[], byte[]> multimapList, boolean isFirst) {
            this.multimapList = multimapList;
            this.keys = keys;
            this.isFirst = isFirst;
            this.whenCompleteConsumer = (v, t) -> {
                if (t != null) {
                    this.resultFuture.completeExceptionally((Throwable)t);
                } else if (v != null && !v.isEmpty()) {
                    this.resultFuture.complete((Collection<byte[]>)v);
                } else {
                    Object key;
                    PollListenerSynchronizer pollListenerSynchronizer = this;
                    synchronized (pollListenerSynchronizer) {
                        if (this.keyQueue.poll() == this) {
                            this.canPollJustEventKey = true;
                        }
                        key = this.keyQueue.peek();
                    }
                    if (key != null) {
                        this.runPoll(key);
                    }
                }
            };
        }

        private void runPoll(Object key) {
            if (this.canPollJustEventKey && key != this) {
                BPOP.pollKeyValue(this.multimapList, (byte[])key, this.isFirst).whenComplete(this.whenCompleteConsumer);
            } else {
                BPOP.pollAllKeys(this.multimapList, this.keys, this.isFirst).whenComplete(this.whenCompleteConsumer);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onListenerAdded() {
            boolean emptyQueue;
            PollListenerSynchronizer pollListenerSynchronizer = this;
            synchronized (pollListenerSynchronizer) {
                emptyQueue = this.keyQueue.isEmpty();
                this.keyQueue.offer(this);
            }
            if (emptyQueue) {
                BPOP.pollAllKeys(this.multimapList, this.keys, this.isFirst).whenComplete(this.whenCompleteConsumer);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        private void onEvent(byte[] key) {
            boolean emptyQueue;
            PollListenerSynchronizer pollListenerSynchronizer = this;
            synchronized (pollListenerSynchronizer) {
                emptyQueue = this.keyQueue.isEmpty();
                this.keyQueue.offer(key);
            }
            if (emptyQueue) {
                this.runPoll(key);
            }
        }
    }
}

