/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.pubsub;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import net.openhft.chronicle.bytes.BytesStore;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.threads.EventLoop;
import net.openhft.chronicle.core.threads.InvalidEventHandlerException;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.pubsub.SimpleSubscription;
import net.openhft.chronicle.engine.query.Filter;
import net.openhft.chronicle.engine.tree.ChronicleQueueView;
import net.openhft.chronicle.engine.tree.QueueView;
import org.jetbrains.annotations.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class QueueSimpleSubscription<E>
implements SimpleSubscription<E> {
    private static final Logger LOG = LoggerFactory.getLogger(QueueSimpleSubscription.class);
    private final Map<Subscriber<E>, AtomicBoolean> subscribers = new ConcurrentHashMap<Subscriber<E>, AtomicBoolean>();
    private final Function<Object, E> valueReader;
    @NotNull
    private final ChronicleQueueView<?, E> chronicleQueue;
    @NotNull
    private final EventLoop eventLoop;
    private final String topic;

    public QueueSimpleSubscription(Function<Object, E> valueReader, @NotNull Asset parent, String topic) {
        this.valueReader = valueReader;
        this.topic = topic;
        this.chronicleQueue = (ChronicleQueueView)parent.acquireView(QueueView.class);
        this.eventLoop = parent.acquireView(EventLoop.class);
    }

    @Override
    public void registerSubscriber(@NotNull RequestContext rc, @NotNull Subscriber<E> subscriber, @NotNull Filter<E> filter) {
        this.registerSubscriber(false, 0, subscriber);
    }

    public void registerSubscriber(boolean bootstrap, int throttlePeriodMs, @NotNull Subscriber<E> subscriber) throws AssetNotFoundException {
        AtomicBoolean terminate = new AtomicBoolean();
        this.subscribers.put(subscriber, terminate);
        QueueView.Tailer<?, E> tailer = this.chronicleQueue.tailer();
        this.eventLoop.addHandler(() -> {
            if (terminate.get()) {
                throw new InvalidEventHandlerException();
            }
            QueueView.Excerpt next = tailer.read();
            if (next == null) {
                return false;
            }
            try {
                Object topic = next.topic();
                if (!this.topic.equals(topic.toString())) {
                    return true;
                }
                subscriber.onMessage(next.message());
            }
            catch (InvalidSubscriberException e) {
                terminate.set(true);
            }
            return true;
        });
    }

    @Override
    public void unregisterSubscriber(Subscriber subscriber) {
        AtomicBoolean terminator = this.subscribers.remove(subscriber);
        if (terminator != null) {
            terminator.set(true);
        }
    }

    @Override
    public int keySubscriberCount() {
        return this.subscriberCount();
    }

    @Override
    public int entrySubscriberCount() {
        return 0;
    }

    @Override
    public int topicSubscriberCount() {
        return 0;
    }

    @Override
    public int subscriberCount() {
        return this.subscribers.size();
    }

    @Override
    public void notifyMessage(Object e) {
        try {
            Object object = e instanceof BytesStore ? this.valueReader.apply(e) : e;
        }
        catch (ClassCastException e1) {
            if (LOG.isDebugEnabled()) {
                Jvm.debug().on(this.getClass(), "Is " + this.valueReader + " the correct ValueReader?");
            }
            throw e1;
        }
    }

    public void close() {
        for (Subscriber<E> subscriber : this.subscribers.keySet()) {
            try {
                subscriber.onEndOfSubscription();
            }
            catch (Exception e) {
                Jvm.debug().on(this.getClass(), (Throwable)e);
            }
        }
    }
}

