/*
 * Decompiled with CFR 0.152.
 */
package net.openhft.chronicle.engine.pubsub;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.openhft.chronicle.core.Jvm;
import net.openhft.chronicle.core.pool.ClassAliasPool;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Publisher;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.Asset;
import net.openhft.chronicle.engine.api.tree.AssetNotFoundException;
import net.openhft.chronicle.engine.api.tree.RequestContext;
import net.openhft.chronicle.engine.server.internal.PublisherHandler;
import net.openhft.chronicle.engine.server.internal.ReferenceHandler;
import net.openhft.chronicle.engine.server.internal.TopicPublisherHandler;
import net.openhft.chronicle.network.connection.AbstractAsyncSubscription;
import net.openhft.chronicle.network.connection.AbstractStatelessClient;
import net.openhft.chronicle.network.connection.AsyncSubscription;
import net.openhft.chronicle.network.connection.CoreFields;
import net.openhft.chronicle.network.connection.TcpChannelHub;
import net.openhft.chronicle.wire.ValueIn;
import net.openhft.chronicle.wire.WireIn;
import net.openhft.chronicle.wire.WireKey;
import net.openhft.chronicle.wire.WireOut;
import net.openhft.chronicle.wire.Wires;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemotePublisher<T, M>
extends AbstractStatelessClient<TopicPublisherHandler.EventId>
implements Publisher<M> {
    private static final Logger LOG = LoggerFactory.getLogger(ReferenceHandler.class);
    private final Class<M> messageClass;
    private final Map<Object, Long> subscribersToTid = new ConcurrentHashMap<Object, Long>();

    public RemotePublisher(@NotNull RequestContext context, @NotNull Asset asset) throws AssetNotFoundException {
        super(asset.findView(TcpChannelHub.class), 0L, RemotePublisher.toUri(context));
        this.messageClass = context.messageType();
    }

    private static String toUri(@NotNull RequestContext context) {
        StringBuilder uri = new StringBuilder(context.fullName() + "?view=publisher");
        if (context.messageType() != String.class) {
            uri.append("&messageType=").append(ClassAliasPool.CLASS_ALIASES.nameFor(context.messageType()));
        }
        if (context.elementType() != String.class) {
            uri.append("&elementType=").append(ClassAliasPool.CLASS_ALIASES.nameFor(context.elementType()));
        }
        if (context.dontPersist()) {
            uri.append("&dontPersist=").append(context.dontPersist());
        }
        return uri.toString();
    }

    private void checkTopic(@Nullable Object topic) {
        if (topic == null) {
            throw new NullPointerException("topic can not be null");
        }
    }

    private void checkMessage(@Nullable Object message) {
        if (message == null) {
            throw new NullPointerException("message can not be null");
        }
    }

    private void onEvent(T topic, @Nullable M message, @NotNull TopicSubscriber<T, M> topicSubscriber) throws InvalidSubscriberException {
        if (message != null) {
            topicSubscriber.onMessage(topic, message);
        }
    }

    private void onEvent(@Nullable M message, @NotNull Subscriber<M> topicSubscriber) throws InvalidSubscriberException {
        if (message != null) {
            topicSubscriber.onMessage(message);
        }
    }

    @Override
    public void publish(M event) {
        this.checkMessage(event);
        this.sendEventAsync((WireKey)TopicPublisherHandler.EventId.publish, valueOut -> valueOut.marshallable(m -> m.write((WireKey)TopicPublisherHandler.Params.message).object(event)), true);
    }

    @Override
    public void registerSubscriber(boolean bootstrap, int throttlePeriodMs, final @NotNull Subscriber<M> subscriber) throws AssetNotFoundException {
        if (this.hub.outBytesLock().isHeldByCurrentThread()) {
            throw new IllegalStateException("Cannot view map while debugging");
        }
        this.hub.subscribe((AsyncSubscription)new AbstractAsyncSubscription(this.hub, this.csp, "Remote Topic publisher register subscribe"){

            public void onSubscribe(@NotNull WireOut wireOut) {
                RemotePublisher.this.subscribersToTid.put(subscriber, this.tid());
                wireOut.writeEventName((WireKey)PublisherHandler.EventId.registerSubscriber).text("");
            }

            public void onConsumer(@NotNull WireIn w) {
                w.readDocument(null, d -> {
                    StringBuilder eventname = Wires.acquireStringBuilder();
                    ValueIn valueIn = d.readEventName(eventname);
                    if (TopicPublisherHandler.EventId.onEndOfSubscription.contentEquals(eventname)) {
                        subscriber.onEndOfSubscription();
                        RemotePublisher.this.subscribersToTid.remove((Object)this);
                        RemotePublisher.this.hub.unsubscribe(this.tid());
                    } else if (CoreFields.reply.contentEquals((CharSequence)eventname)) {
                        valueIn.marshallable(m -> {
                            try {
                                Object message = m.read(() -> "message").object(RemotePublisher.this.messageClass);
                                RemotePublisher.this.onEvent(message, subscriber);
                            }
                            catch (InvalidSubscriberException e) {
                                throw Jvm.rethrow((Throwable)e);
                            }
                        });
                    }
                });
            }
        });
    }

    @Override
    public void unregisterSubscriber(Subscriber subscriber) {
        Long tid = this.subscribersToTid.get(subscriber);
        if (tid == null) {
            Jvm.debug().on(this.getClass(), "No subscriber to unsubscribe");
            return;
        }
        this.hub.preventSubscribeUponReconnect(tid.longValue());
        if (!this.hub.isOpen()) {
            this.hub.unsubscribe(tid.longValue());
            return;
        }
        this.sendEventAsync((WireKey)ReferenceHandler.EventId.unregisterSubscriber, valueOut -> valueOut.int64(tid.longValue()), false);
    }

    @Override
    public int subscriberCount() {
        throw new UnsupportedOperationException("todo");
    }
}

