/*
 * Decompiled with CFR 0.152.
 */
package org.infinispan.server.resp.commands.pubsub;

import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.CompletionStage;
import org.infinispan.commons.logging.LogFactory;
import org.infinispan.commons.marshall.WrappedByteArray;
import org.infinispan.encoding.DataConversion;
import org.infinispan.notifications.cachelistener.filter.CacheEventFilter;
import org.infinispan.server.resp.Resp3Handler;
import org.infinispan.server.resp.RespCommand;
import org.infinispan.server.resp.RespRequestHandler;
import org.infinispan.server.resp.SubscriberHandler;
import org.infinispan.server.resp.commands.PubSubResp3Command;
import org.infinispan.server.resp.commands.Resp3Command;
import org.infinispan.server.resp.commands.pubsub.KeyChannelUtils;
import org.infinispan.server.resp.filter.EventListenerConverter;
import org.infinispan.server.resp.filter.EventListenerKeysFilter;
import org.infinispan.server.resp.logging.Log;
import org.infinispan.util.concurrent.AggregateCompletionStage;
import org.infinispan.util.concurrent.CompletionStages;

/**
 * RESP {@code SUBSCRIBE} command: registers the connection as a listener on one
 * or more channels.
 *
 * <p>Each argument is treated as a channel name. For every channel that the
 * {@link SubscriberHandler} is not already tracking, a
 * {@link SubscriberHandler.PubSubListener} bound to the current Netty channel is
 * recorded in the handler and attached to the cache as a filtered listener.
 */
public class SUBSCRIBE
extends RespCommand
implements Resp3Command,
PubSubResp3Command {
    private static final Log log = (Log)LogFactory.getLog(MethodHandles.lookup().lookupClass(), Log.class);

    /**
     * Creates the command descriptor.
     *
     * <p>NOTE(review): the arguments are presumably (arity, firstKeyPos, lastKeyPos, steps),
     * with {@code -2} meaning "at least one channel argument" — confirm against
     * {@link RespCommand}.
     */
    public SUBSCRIBE() {
        super(-2, 0, 0, 0);
    }

    /**
     * Entry point when the connection is still served by a regular {@link Resp3Handler}.
     *
     * <p>Wraps the current handler in a new {@link SubscriberHandler} and lets it process
     * this same command, so the actual subscription work happens in
     * {@link #perform(SubscriberHandler, ChannelHandlerContext, List)}.
     *
     * @param handler   the handler currently serving the connection
     * @param ctx       the Netty channel context of the connection
     * @param arguments the raw channel names to subscribe to
     * @return the stage produced by {@link SubscriberHandler#handleRequest}
     */
    @Override
    public CompletionStage<RespRequestHandler> perform(Resp3Handler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        SubscriberHandler subscriberHandler = new SubscriberHandler(handler.respServer(), handler);
        return subscriberHandler.handleRequest(ctx, this, arguments);
    }

    /**
     * Subscribes the connection to every channel in {@code arguments} that it is not
     * already subscribed to, then sends the subscription replies.
     *
     * @param handler   the subscriber handler that tracks this connection's listeners
     * @param ctx       the Netty channel context of the connection
     * @param arguments the raw channel names to subscribe to
     * @return the stage returned by {@link SubscriberHandler#sendSubscriptions}, which is
     *         handed the aggregate of all listener registrations started here
     */
    @Override
    public CompletionStage<RespRequestHandler> perform(SubscriberHandler handler, ChannelHandlerContext ctx, List<byte[]> arguments) {
        AggregateCompletionStage<Void> registrations = CompletionStages.aggregateCompletionStage();
        for (byte[] keyChannel : arguments) {
            if (log.isTraceEnabled()) {
                // The channel name is client-supplied: pass it as a format parameter, never as
                // part of the format string, so a '%' in the name cannot break the formatter.
                log.tracef("Subscriber for channel: %s", new String(keyChannel, CharsetUtil.UTF_8));
            }
            WrappedByteArray wrappedByteArray = new WrappedByteArray(keyChannel);
            if (handler.specificChannelSubscribers().get(wrappedByteArray) != null) {
                // Already subscribed to this channel on this connection; nothing to register.
                continue;
            }
            SubscriberHandler.PubSubListener pubSubListener = new SubscriberHandler.PubSubListener(ctx.channel());
            // Record the listener before the asynchronous registration is started.
            handler.specificChannelSubscribers().put(wrappedByteArray, pubSubListener);
            byte[] channel = KeyChannelUtils.keyToChannel(keyChannel);
            DataConversion dc = handler.cache().getValueDataConversion();
            // The raw CacheEventFilter cast is kept from the original call so overload
            // resolution and generic inference stay exactly as before.
            CompletionStage<Void> stage = handler.cache().addListenerAsync(pubSubListener, (CacheEventFilter)new EventListenerKeysFilter(channel), new EventListenerConverter(dc));
            registrations.dependsOn(handler.handleStageListenerError(stage, keyChannel, true));
        }
        return handler.sendSubscriptions(ctx, registrations.freeze(), arguments, true);
    }
}

