/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.autobench;

import io.nats.client.Connection;
import io.nats.client.Dispatcher;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.examples.autobench.ThrottledBenchmark;
import java.text.NumberFormat;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

public class PubDispatchBenchmark
extends ThrottledBenchmark {
    public PubDispatchBenchmark(String name, long messageCount, long messageSize) {
        super(name, messageCount, messageSize);
    }

    @Override
    void executeWithLimiter(Options connectOptions) throws InterruptedException {
        byte[] payload = this.createPayload();
        String subject = this.getSubject();
        CompletableFuture<Object> go = new CompletableFuture<Object>();
        CompletableFuture<Void> subReady = new CompletableFuture<Void>();
        CompletableFuture<Void> pubReady = new CompletableFuture<Void>();
        CompletableFuture<Void> subDone = new CompletableFuture<Void>();
        CompletableFuture<Void> pubDone = new CompletableFuture<Void>();
        Thread subThread = new Thread(() -> {
            try {
                Connection subConnect = Nats.connect(connectOptions);
                if (subConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    AtomicInteger count = new AtomicInteger(0);
                    Dispatcher d = subConnect.createDispatcher(msg -> {
                        if ((long)count.incrementAndGet() >= this.getMessageCount()) {
                            subDone.complete(null);
                        }
                    });
                    d.subscribe(subject);
                    subConnect.flush(Duration.ofSeconds(5L));
                    subReady.complete(null);
                    while (subConnect.getStatus() == Connection.Status.CONNECTED && !subDone.isDone()) {
                        try {
                            subDone.get(100L, TimeUnit.MILLISECONDS);
                        }
                        catch (CancellationException | TimeoutException exception) {}
                    }
                    if ((long)count.get() < this.getMessageCount()) {
                        throw new Exception("Dispatcher missed " + NumberFormat.getIntegerInstance().format(this.getMessageCount() - (long)count.get()) + " messages.");
                    }
                }
                catch (Exception exp) {
                    this.setException(exp);
                }
                finally {
                    subConnect.close();
                }
            }
            catch (Exception ex) {
                subReady.cancel(true);
                this.setException(ex);
            }
            finally {
                subDone.complete(null);
            }
        }, "PubDispatch Test - Subscriber");
        subThread.start();
        Thread pubThread = new Thread(() -> {
            try {
                Connection pubConnect = Nats.connect(connectOptions);
                if (pubConnect.getStatus() != Connection.Status.CONNECTED) {
                    throw new Exception("Unable to connect");
                }
                try {
                    pubReady.complete(null);
                    go.get();
                    int i = 0;
                    while ((long)i < this.getMessageCount()) {
                        pubConnect.publish(subject, payload);
                        this.adjustAndSleep(pubConnect);
                        ++i;
                    }
                    this.defaultFlush(pubConnect);
                    pubDone.complete(null);
                }
                finally {
                    pubConnect.close();
                }
            }
            catch (Exception ex) {
                pubReady.cancel(true);
                this.setException(ex);
                this.pubFailed();
            }
            finally {
                pubDone.complete(null);
            }
        }, "PubDispatch Test - Publisher");
        pubThread.start();
        this.getFutureSafely(subReady);
        this.getFutureSafely(pubReady);
        if (this.getException() != null) {
            go.complete(null);
            return;
        }
        this.startTiming();
        go.complete(null);
        this.getFutureSafely(pubDone);
        this.getFutureSafely(subDone);
        this.endTiming();
    }
}

