/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream;

import io.nats.client.Connection;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamSubscription;
import io.nats.client.Message;
import io.nats.client.api.ConsumerInfo;
import io.nats.client.api.StorageType;
import io.nats.client.api.StreamConfiguration;
import io.nats.client.api.StreamInfo;
import io.nats.client.impl.NatsJetStreamMetaData;
import io.nats.client.impl.NatsMessage;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class NatsJsUtils {
    /**
     * Fetches the info for a stream, mapping the API's 404 error to {@code null}.
     *
     * @param jsm management context used for the lookup
     * @param streamName name of the stream to look up
     * @return the stream info, or {@code null} when the API reports error code 404
     * @throws IOException propagated from {@code jsm.getStreamInfo}
     * @throws JetStreamApiException for any API error whose error code is not 404
     */
    public static StreamInfo getStreamInfoOrNullWhenNotExist(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
        StreamInfo found = null;
        try {
            found = jsm.getStreamInfo(streamName);
        }
        catch (JetStreamApiException apiFailure) {
            // Only a 404 is treated as "absent"; everything else is a real error.
            if (apiFailure.getErrorCode() != 404) {
                throw apiFailure;
            }
        }
        return found;
    }

    /**
     * Reports whether a lookup of the named stream yields stream info.
     *
     * @param jsm management context used for the lookup
     * @param streamName name of the stream to look for
     * @return {@code true} when the lookup returns info, {@code false} when it returns {@code null}
     * @throws IOException propagated from the lookup
     * @throws JetStreamApiException propagated from the lookup
     */
    public static boolean streamExists(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
        StreamInfo info = NatsJsUtils.getStreamInfoOrNullWhenNotExist(jsm, streamName);
        return info != null;
    }

    /**
     * Reports whether a lookup of the named stream yields stream info, using a
     * management context obtained from the connection.
     *
     * @param nc connection from which the management context is obtained
     * @param streamName name of the stream to look for
     * @return {@code true} when the lookup returns info, {@code false} when it returns {@code null}
     * @throws IOException propagated from obtaining the management context or from the lookup
     * @throws JetStreamApiException propagated from the lookup
     */
    public static boolean streamExists(Connection nc, String streamName) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        StreamInfo info = NatsJsUtils.getStreamInfoOrNullWhenNotExist(jsm, streamName);
        return info != null;
    }

    /**
     * Terminates the JVM with status -1, after printing an explanation to standard
     * output, when the named stream already exists. Returns normally otherwise.
     *
     * @param jsm management context used for the existence check
     * @param streamName name of the stream that must not exist
     * @throws IOException propagated from the existence check
     * @throws JetStreamApiException propagated from the existence check
     */
    public static void exitIfStreamExists(JetStreamManagement jsm, String streamName) throws IOException, JetStreamApiException {
        if (!NatsJsUtils.streamExists(jsm, streamName)) {
            return;
        }
        String notice = "\nThe example cannot run since the stream '" + streamName + "' already exists.\n"
            + "It depends on the stream being in a new state. You can either:\n"
            + "  1) Change the stream name in the example.\n"
            + "  2) Delete the stream.\n"
            + "  3) Restart the server if the stream is a memory stream.";
        System.out.println(notice);
        System.exit(-1);
    }

    /**
     * Terminates the JVM with status -1, after printing an explanation to standard
     * output, when the named stream does not exist. Returns normally otherwise.
     *
     * @param nc connection used for the existence check
     * @param streamName name of the stream that must exist
     * @throws IOException propagated from the existence check
     * @throws JetStreamApiException propagated from the existence check
     */
    public static void exitIfStreamNotExists(Connection nc, String streamName) throws IOException, JetStreamApiException {
        if (NatsJsUtils.streamExists(nc, streamName)) {
            return;
        }
        String notice = "\nThe example cannot run since the stream '" + streamName + "' does not exist.\n"
            + "It depends on the stream existing and having data.";
        System.out.println(notice);
        System.exit(-1);
    }

    /**
     * Deletes (best effort) and re-creates a memory-backed stream, using a
     * management context obtained from the connection.
     *
     * @param nc connection from which the management context is obtained
     * @param stream name of the stream
     * @param subjects subjects the new stream is configured with
     * @return the info returned for the newly created stream
     * @throws IOException propagated from obtaining the management context
     */
    public static StreamInfo createOrReplaceStream(Connection nc, String stream, String ... subjects) throws IOException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        return NatsJsUtils.createOrReplaceStream(jsm, stream, StorageType.Memory, subjects);
    }

    /**
     * Deletes (best effort) and re-creates a stream with memory storage.
     *
     * @param jsm management context used for the delete and create calls
     * @param stream name of the stream
     * @param subjects subjects the new stream is configured with
     * @return the info returned for the newly created stream
     */
    public static StreamInfo createOrReplaceStream(JetStreamManagement jsm, String stream, String ... subjects) {
        StorageType storage = StorageType.Memory;
        return NatsJsUtils.createOrReplaceStream(jsm, stream, storage, subjects);
    }

    /**
     * Deletes the named stream (best effort) and then creates it again with the
     * given storage type and subjects.
     *
     * <p>If creation fails for any reason, the failure is written to standard
     * error, including the causing exception, and the JVM is terminated with
     * status -1.
     *
     * @param jsm management context used for the delete and create calls
     * @param stream name of the stream
     * @param storageType storage type for the new stream
     * @param subjects subjects the new stream is configured with
     * @return the info returned by {@code addStream}
     */
    public static StreamInfo createOrReplaceStream(JetStreamManagement jsm, String stream, StorageType storageType, String ... subjects) {
        // Remove any previous incarnation; failures here are ignored by safeDeleteStream.
        NatsJsUtils.safeDeleteStream(jsm, stream);
        try {
            StreamConfiguration sc = StreamConfiguration.builder()
                .name(stream)
                .storageType(storageType)
                .subjects(subjects)
                .build();
            return jsm.addStream(sc);
        }
        catch (Exception e) {
            System.err.println("Fatal error, cannot create stream.");
            // Report the cause as well; without it the failure cannot be diagnosed.
            System.err.println("Stream '" + stream + "': " + e);
            System.exit(-1);
            // Required by the compiler, which treats System.exit as an ordinary call.
            return null;
        }
    }

    /**
     * Creates a stream with the given storage type and subjects and prints a
     * confirmation line listing the subjects of the created stream.
     *
     * @param jsm management context used for the create call
     * @param streamName name of the stream
     * @param storageType storage type for the stream
     * @param subjects subjects the stream is configured with
     * @return the info returned by {@code addStream}
     * @throws IOException propagated from {@code addStream}
     * @throws JetStreamApiException propagated from {@code addStream}
     */
    public static StreamInfo createStream(JetStreamManagement jsm, String streamName, StorageType storageType, String ... subjects) throws IOException, JetStreamApiException {
        StreamInfo created = jsm.addStream(
            StreamConfiguration.builder()
                .name(streamName)
                .storageType(storageType)
                .subjects(subjects)
                .build());
        System.out.printf("Created stream '%s' with subject(s) %s\n", streamName, created.getConfiguration().getSubjects());
        return created;
    }

    /**
     * Creates a stream with memory storage.
     *
     * @param jsm management context used for the create call
     * @param streamName name of the stream
     * @param subjects subjects the stream is configured with
     * @return the info for the created stream
     * @throws IOException propagated from stream creation
     * @throws JetStreamApiException propagated from stream creation
     */
    public static StreamInfo createStream(JetStreamManagement jsm, String streamName, String ... subjects) throws IOException, JetStreamApiException {
        StorageType storage = StorageType.Memory;
        return NatsJsUtils.createStream(jsm, streamName, storage, subjects);
    }

    /**
     * Creates a stream with memory storage, using a management context obtained
     * from the connection.
     *
     * @param nc connection from which the management context is obtained
     * @param stream name of the stream
     * @param subjects subjects the stream is configured with
     * @return the info for the created stream
     * @throws IOException propagated from obtaining the context or from stream creation
     * @throws JetStreamApiException propagated from stream creation
     */
    public static StreamInfo createStream(Connection nc, String stream, String ... subjects) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        return NatsJsUtils.createStream(jsm, stream, StorageType.Memory, subjects);
    }

    /**
     * Creates a memory-backed stream, terminating the JVM instead when the stream
     * already exists. Uses a management context obtained from the connection.
     *
     * @param nc connection from which the management context is obtained
     * @param streamName name of the stream
     * @param subjects subjects the stream is configured with
     * @return the info for the created stream
     * @throws IOException propagated from obtaining the context, the check, or the creation
     * @throws JetStreamApiException propagated from the check or the creation
     */
    public static StreamInfo createStreamExitWhenExists(Connection nc, String streamName, String ... subjects) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        return NatsJsUtils.createStreamExitWhenExists(jsm, streamName, subjects);
    }

    /**
     * Creates a memory-backed stream after first checking that it does not exist;
     * if it does exist the check terminates the JVM.
     *
     * @param jsm management context used for the check and the creation
     * @param streamName name of the stream
     * @param subjects subjects the stream is configured with
     * @return the info for the created stream
     * @throws IOException propagated from the check or the creation
     * @throws JetStreamApiException propagated from the check or the creation
     */
    public static StreamInfo createStreamExitWhenExists(JetStreamManagement jsm, String streamName, String ... subjects) throws IOException, JetStreamApiException {
        // Does not return when the stream is already present.
        NatsJsUtils.exitIfStreamExists(jsm, streamName);
        StreamInfo created = NatsJsUtils.createStream(jsm, streamName, StorageType.Memory, subjects);
        return created;
    }

    /**
     * Ensures a stream exists and carries all of the requested subjects.
     *
     * <p>When the stream does not exist it is created with the given storage type
     * and subjects. When it exists, any requested subject it lacks is added via a
     * stream update; the storage type argument is not applied to an existing
     * stream. A line describing the outcome is printed to standard output.
     *
     * @param jsm management context used for lookup, creation and update
     * @param streamName name of the stream
     * @param storageType storage type used only when the stream must be created
     * @param subjects subjects the stream must contain
     * @return the info of the created, updated, or unchanged stream
     * @throws IOException propagated from the management calls
     * @throws JetStreamApiException propagated from the management calls
     */
    public static StreamInfo createStreamOrUpdateSubjects(JetStreamManagement jsm, String streamName, StorageType storageType, String ... subjects) throws IOException, JetStreamApiException {
        StreamInfo si = NatsJsUtils.getStreamInfoOrNullWhenNotExist(jsm, streamName);
        if (si == null) {
            return NatsJsUtils.createStream(jsm, streamName, storageType, subjects);
        }

        StreamConfiguration sc = si.getConfiguration();
        // Merge into a private copy: the list returned by the configuration belongs to
        // the fetched StreamInfo and must not be altered (it may also be unmodifiable).
        List<String> merged = new ArrayList<String>(sc.getSubjects());
        boolean needToUpdate = false;
        for (String sub : subjects) {
            if (!merged.contains(sub)) {
                merged.add(sub);
                needToUpdate = true;
            }
        }

        if (needToUpdate) {
            si = jsm.updateStream(StreamConfiguration.builder(sc).subjects(merged).build());
            System.out.printf("Existing stream '%s' was updated, has subject(s) %s\n", streamName, si.getConfiguration().getSubjects());
        } else {
            System.out.printf("Existing stream '%s' already contained subject(s) %s\n", streamName, si.getConfiguration().getSubjects());
        }
        return si;
    }

    /**
     * Ensures a stream exists and carries the requested subjects, using memory
     * storage if the stream has to be created.
     *
     * @param jsm management context used for the operation
     * @param streamName name of the stream
     * @param subjects subjects the stream must contain
     * @return the info of the resulting stream
     * @throws IOException propagated from the management calls
     * @throws JetStreamApiException propagated from the management calls
     */
    public static StreamInfo createStreamOrUpdateSubjects(JetStreamManagement jsm, String streamName, String ... subjects) throws IOException, JetStreamApiException {
        StorageType storage = StorageType.Memory;
        return NatsJsUtils.createStreamOrUpdateSubjects(jsm, streamName, storage, subjects);
    }

    /**
     * Ensures a stream exists and carries the requested subjects, using memory
     * storage if it has to be created and a management context obtained from the
     * connection.
     *
     * @param nc connection from which the management context is obtained
     * @param stream name of the stream
     * @param subjects subjects the stream must contain
     * @return the info of the resulting stream
     * @throws IOException propagated from obtaining the context or the management calls
     * @throws JetStreamApiException propagated from the management calls
     */
    public static StreamInfo createStreamOrUpdateSubjects(Connection nc, String stream, String ... subjects) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        return NatsJsUtils.createStreamOrUpdateSubjects(jsm, stream, StorageType.Memory, subjects);
    }

    /**
     * Publishes {@code count} messages using the prefix "data", message size -1
     * and no console output, through a JetStream context obtained from the connection.
     *
     * @param nc connection from which the JetStream context is obtained
     * @param subject subject to publish to
     * @param count number of messages to publish
     * @throws IOException propagated from obtaining the context or publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(Connection nc, String subject, int count) throws IOException, JetStreamApiException {
        JetStream js = nc.jetStream();
        NatsJsUtils.publish(js, subject, "data", count, -1, false);
    }

    /**
     * Publishes {@code count} messages using the prefix "data", message size -1
     * and no console output.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param count number of messages to publish
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, int count) throws IOException, JetStreamApiException {
        final String defaultPrefix = "data";
        NatsJsUtils.publish(js, subject, defaultPrefix, count, -1, false);
    }

    /**
     * Publishes {@code count} messages using the prefix "data" and the given
     * message size, without console output.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param count number of messages to publish
     * @param msgSize message size handed to the payload builder
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, int count, int msgSize) throws IOException, JetStreamApiException {
        final String defaultPrefix = "data";
        NatsJsUtils.publish(js, subject, defaultPrefix, count, msgSize, false);
    }

    /**
     * Publishes {@code count} messages using the given prefix, message size -1
     * and no console output.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, String prefix, int count) throws IOException, JetStreamApiException {
        final boolean quiet = false;
        NatsJsUtils.publish(js, subject, prefix, count, -1, quiet);
    }

    /**
     * Publishes {@code count} messages using the given prefix and message size,
     * without console output.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @param msgSize message size handed to the payload builder
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, String prefix, int count, int msgSize) throws IOException, JetStreamApiException {
        final boolean quiet = false;
        NatsJsUtils.publish(js, subject, prefix, count, msgSize, quiet);
    }

    /**
     * Publishes {@code count} messages using the given prefix and message size -1,
     * optionally echoing each payload to standard output.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @param verbose whether to print progress to standard output
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, String prefix, int count, boolean verbose) throws IOException, JetStreamApiException {
        final int noExplicitSize = -1;
        NatsJsUtils.publish(js, subject, prefix, count, noExplicitSize, verbose);
    }

    /**
     * Publishes {@code count} messages, numbered from 1, to a subject.
     *
     * <p>Each payload is produced by {@code makeData} from the prefix, the message
     * size and the sequence number. In verbose mode the run is bracketed on
     * standard output by "Publish -&gt;" and " &lt;-".
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @param msgSize message size handed to the payload builder
     * @param verbose whether to print progress to standard output
     * @throws IOException propagated from publishing
     * @throws JetStreamApiException propagated from publishing
     */
    public static void publish(JetStream js, String subject, String prefix, int count, int msgSize, boolean verbose) throws IOException, JetStreamApiException {
        if (verbose) {
            System.out.print("Publish ->");
        }
        int seq = 1;
        while (seq <= count) {
            byte[] payload = NatsJsUtils.makeData(prefix, msgSize, verbose, seq);
            js.publish(NatsMessage.builder().subject(subject).data(payload).build());
            seq++;
        }
        if (verbose) {
            System.out.println(" <-");
        }
    }

    /**
     * Publishes {@code count} messages whose payloads are {@code prefix-1},
     * {@code prefix-2}, ... and terminates the JVM with status -1 if any publish fails.
     *
     * <p>Payloads are encoded as UTF-8 so the bytes do not depend on the platform
     * default charset. On failure the causing exception is written to standard
     * error before exiting.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     */
    public static void publishOrExit(JetStream js, String subject, String prefix, int count) {
        try {
            for (int x = 1; x <= count; ++x) {
                byte[] payload = (prefix + "-" + x).getBytes(StandardCharsets.UTF_8);
                js.publish(subject, payload);
            }
        }
        catch (Exception e) {
            System.err.println("Fatal error, publish failure.");
            // Report the cause as well; without it the failure cannot be diagnosed.
            System.err.println("Subject '" + subject + "': " + e);
            System.exit(-1);
        }
    }

    /**
     * Builds the payload {@code prefix-x}, encoded as US-ASCII.
     *
     * <p>A message size of 0 yields {@code null}. When the message size exceeds
     * the encoded length, the payload is extended to that size with trailing zero
     * bytes; a smaller (or negative) size leaves the payload as is, it is never
     * truncated.
     *
     * @param prefix text placed before the sequence number
     * @param msgSize requested payload size, see above
     * @param verbose when {@code true}, prints a space followed by the text to standard output
     * @param x sequence number appended after the dash
     * @return the payload bytes, or {@code null} when {@code msgSize} is 0
     */
    public static byte[] makeData(String prefix, int msgSize, boolean verbose, int x) {
        if (msgSize == 0) {
            return null;
        }
        final String text = prefix + "-" + x;
        if (verbose) {
            System.out.print(" " + text);
        }
        final byte[] encoded = text.getBytes(StandardCharsets.US_ASCII);
        // copyOf right-pads with zeros, matching a fresh array plus arraycopy.
        return msgSize > encoded.length ? java.util.Arrays.copyOf(encoded, msgSize) : encoded;
    }

    /**
     * Extracts the numeric id enclosed by the first two {@code #} characters,
     * e.g. {@code "abc#42#xyz"} yields 42.
     *
     * <p>Every "no usable id" case yields -1: a {@code null} input, fewer than
     * two markers, or text between the markers that is not a valid {@code long}.
     *
     * @param data text to search, may be {@code null}
     * @return the parsed id, or -1 when no id can be extracted
     */
    public static long extractId(String data) {
        if (data == null) {
            return -1L;
        }
        int at1 = data.indexOf('#');
        if (at1 == -1) {
            return -1L;
        }
        int at2 = data.indexOf('#', at1 + 1);
        if (at2 == -1) {
            return -1L;
        }
        try {
            return Long.parseLong(data.substring(at1 + 1, at2));
        }
        catch (NumberFormatException notNumeric) {
            // Markers present but no number between them: same outcome as missing markers.
            return -1L;
        }
    }

    /**
     * Extracts the numeric id enclosed by the first two {@code #} bytes of a
     * payload, e.g. the bytes of {@code "abc#42#xyz"} yield 42.
     *
     * <p>The bytes between the markers are decoded as US-ASCII, independent of
     * the platform default charset. Every "no usable id" case yields -1: a
     * {@code null} payload, fewer than two markers, or content between the
     * markers that is not a valid {@code long}.
     *
     * @param data payload to search, may be {@code null}
     * @return the parsed id, or -1 when no id can be extracted
     */
    public static long extractId(byte[] data) {
        if (data == null) {
            return -1L;
        }
        int at1 = -1;
        int at2 = -1;
        for (int x = 0; x < data.length; ++x) {
            if (data[x] != '#') {
                continue;
            }
            if (at1 == -1) {
                at1 = x;
                continue;
            }
            at2 = x;
            break;
        }
        if (at1 == -1 || at2 == -1) {
            return -1L;
        }
        try {
            return Long.parseLong(new String(data, at1 + 1, at2 - at1 - 1, StandardCharsets.US_ASCII));
        }
        catch (NumberFormatException notNumeric) {
            // Markers present but no number between them: same outcome as missing markers.
            return -1L;
        }
    }

    /**
     * Extracts the id from a message's data bytes.
     *
     * @param m message whose data is searched
     * @return the result of the byte-array extraction applied to {@code m.getData()}
     */
    public static long extractId(Message m) {
        byte[] payload = m.getData();
        return NatsJsUtils.extractId(payload);
    }

    /**
     * Starts a background thread that publishes {@code count} messages with no
     * initial delay.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @return the thread returned by the delayed variant
     */
    public static Thread publishInBackground(JetStream js, String subject, String prefix, int count) {
        final long noDelay = 0L;
        return NatsJsUtils.publishInBackground(js, subject, prefix, count, noDelay);
    }

    /**
     * Starts a thread that optionally sleeps and then publishes {@code count}
     * messages with US-ASCII payloads {@code prefix-1}, {@code prefix-2}, ...
     *
     * <p>Any exception inside the thread has its stack trace printed and
     * terminates the JVM with status -1.
     *
     * @param js JetStream context used for publishing
     * @param subject subject to publish to
     * @param prefix text placed before the sequence number in each payload
     * @param count number of messages to publish
     * @param delay milliseconds to sleep before publishing; skipped unless greater than 0
     * @return the thread, which has already been started
     */
    public static Thread publishInBackground(JetStream js, String subject, String prefix, int count, long delay) {
        Runnable task = () -> {
            try {
                if (delay > 0L) {
                    Thread.sleep(delay);
                }
                int seq = 1;
                while (seq <= count) {
                    byte[] payload = (prefix + "-" + seq).getBytes(StandardCharsets.US_ASCII);
                    js.publish(NatsMessage.builder().subject(subject).data(payload).build());
                    seq++;
                }
            }
            catch (Exception e) {
                e.printStackTrace();
                System.exit(-1);
            }
        };
        Thread worker = new Thread(task);
        worker.start();
        return worker;
    }

    /**
     * Reads and acknowledges all available messages, verbosely, waiting up to one
     * second for each next message.
     *
     * @param sub subscription to read from
     * @return the messages read, in arrival order
     * @throws InterruptedException propagated from waiting for a message
     */
    public static List<Message> readMessagesAck(JetStreamSubscription sub) throws InterruptedException {
        Duration oneSecond = Duration.ofSeconds(1L);
        return NatsJsUtils.readMessagesAck(sub, true, oneSecond);
    }

    /**
     * Reads and acknowledges all available messages, waiting up to one second for
     * each next message.
     *
     * @param sub subscription to read from
     * @param verbose whether to print progress to standard output
     * @return the messages read, in arrival order
     * @throws InterruptedException propagated from waiting for a message
     */
    public static List<Message> readMessagesAck(JetStreamSubscription sub, boolean verbose) throws InterruptedException {
        Duration oneSecond = Duration.ofSeconds(1L);
        return NatsJsUtils.readMessagesAck(sub, verbose, oneSecond);
    }

    /**
     * Reads and acknowledges all available messages, verbosely, with a
     * caller-supplied wait for each next message.
     *
     * @param sub subscription to read from
     * @param nextMessageTimeout how long to wait for each next message
     * @return the messages read, in arrival order
     * @throws InterruptedException propagated from waiting for a message
     */
    public static List<Message> readMessagesAck(JetStreamSubscription sub, Duration nextMessageTimeout) throws InterruptedException {
        final boolean verbose = true;
        return NatsJsUtils.readMessagesAck(sub, verbose, nextMessageTimeout);
    }

    /**
     * Drains a subscription: repeatedly takes the next message, collects it and
     * acknowledges it, until {@code nextMessage} returns {@code null}.
     *
     * <p>In verbose mode the run is printed to standard output, starting with
     * "Read/Ack -&gt;", followed by each message's data as text, and ending with a
     * closing marker that differs when nothing was read.
     *
     * @param sub subscription to read from
     * @param verbose whether to print progress to standard output
     * @param nextMessageTimeout how long to wait for each next message
     * @return the messages read, in arrival order; empty when none arrived
     * @throws InterruptedException propagated from waiting for a message
     */
    public static List<Message> readMessagesAck(JetStreamSubscription sub, boolean verbose, Duration nextMessageTimeout) throws InterruptedException {
        if (verbose) {
            System.out.print("Read/Ack ->");
        }
        List<Message> received = new ArrayList<>();
        for (Message next = sub.nextMessage(nextMessageTimeout); next != null; next = sub.nextMessage(nextMessageTimeout)) {
            received.add(next);
            next.ack();
            if (verbose) {
                System.out.print(" " + new String(next.getData()));
            }
        }
        if (verbose) {
            System.out.println(received.isEmpty() ? " No messages available <-" : " <- ");
        }
        return received;
    }

    /**
     * Prints a stream info with line breaks inserted before its named sub-sections.
     *
     * @param si stream info to print
     */
    public static void printStreamInfo(StreamInfo si) {
        String[] sections = {"StreamConfiguration", "StreamState", "ClusterInfo", "Mirror", "subjects", "sources"};
        NatsJsUtils.printObject(si, sections);
    }

    /**
     * Prints a list of stream infos with line breaks inserted before each entry
     * (unindented) and before its named sub-sections (indented).
     *
     * @param list stream infos to print
     */
    public static void printStreamInfoList(List<StreamInfo> list) {
        String[] sections = {"!StreamInfo", "StreamConfiguration", "StreamState"};
        NatsJsUtils.printObject(list, sections);
    }

    /**
     * Prints a consumer info with line breaks inserted before its named sub-sections.
     *
     * @param ci consumer info to print
     */
    public static void printConsumerInfo(ConsumerInfo ci) {
        String[] sections = {"ConsumerConfiguration", "Delivered", "AckFloor"};
        NatsJsUtils.printObject(ci, sections);
    }

    /**
     * Prints a list of consumer infos with line breaks inserted before each entry
     * (unindented) and before its named sub-sections (indented).
     *
     * @param list consumer infos to print
     */
    public static void printConsumerInfoList(List<ConsumerInfo> list) {
        String[] sections = {"!ConsumerInfo", "ConsumerConfiguration", "Delivered", "AckFloor"};
        NatsJsUtils.printObject(list, sections);
    }

    /**
     * Prints an object's {@code toString()} text with line breaks inserted in
     * front of the given names.
     *
     * <p>For each name, every occurrence of {@code ", name"} becomes a comma, a
     * newline, four spaces of indent and the name. A name written with a leading
     * {@code !} is matched without the {@code !} and gets no indent.
     *
     * @param o object to print; its {@code toString()} supplies the text
     * @param subObjectNames names to break before, optionally prefixed with {@code !}
     */
    public static void printObject(Object o, String ... subObjectNames) {
        String rendered = o.toString();
        for (String marker : subObjectNames) {
            String name = marker;
            String separator = ",\n    ";
            if (marker.startsWith("!")) {
                name = marker.substring(1);
                separator = ",\n";
            }
            // Literal (non-regex) replacement of every occurrence.
            rendered = rendered.replace(", " + name, separator + name);
        }
        System.out.println(rendered);
    }

    /**
     * Formats message metadata as a single-line {@code Meta{...}} string holding
     * the stream, consumer, delivered count, stream sequence, consumer sequence
     * and pending count.
     *
     * @param meta metadata to format
     * @return the formatted text
     */
    public static String metaString(NatsJetStreamMetaData meta) {
        StringBuilder sb = new StringBuilder("Meta{");
        sb.append("str='").append(meta.getStream()).append('\'');
        sb.append(", con='").append(meta.getConsumer()).append('\'');
        sb.append(", delivered=").append(meta.deliveredCount());
        sb.append(", strSeq=").append(meta.streamSequence());
        sb.append(", conSeq=").append(meta.consumerSequence());
        sb.append(", pending=").append(meta.pendingCount());
        sb.append('}');
        return sb.toString();
    }

    /**
     * Prints a message to standard output, prefixed in square brackets with the
     * last nine digits of the current time in milliseconds.
     *
     * @param message text to print after the timestamp
     */
    public static void report(String message) {
        String millis = String.valueOf(System.currentTimeMillis());
        String stamp = millis.substring(millis.length() - 9);
        System.out.println("[" + stamp + "] " + message);
    }

    /**
     * Prints the data of each message as text on one line of standard output,
     * between "Fetch -&gt;" and " &lt;- ".
     *
     * @param list messages to print
     */
    public static void reportFetch(List<Message> list) {
        System.out.print("Fetch ->");
        Iterator<Message> it = list.iterator();
        while (it.hasNext()) {
            String text = new String(it.next().getData());
            System.out.print(" " + text);
        }
        System.out.println(" <- ");
    }

    /**
     * Consumes an iterator of messages, printing each message's data as text on
     * one line of standard output between "Fetch -&gt;" and " &lt;- ", and collects
     * the messages.
     *
     * @param list iterator to drain
     * @return the messages taken from the iterator, in iteration order
     */
    public static List<Message> reportFetch(Iterator<Message> list) {
        List<Message> collected = new ArrayList<>();
        System.out.print("Fetch ->");
        for (Iterator<Message> remaining = list; remaining.hasNext(); ) {
            Message current = remaining.next();
            collected.add(current);
            System.out.print(" " + new String(current.getData()));
        }
        System.out.println(" <- ");
        return collected;
    }

    /**
     * Builds a thread that periodically reports a stream's configuration, cluster
     * info and state. The thread is returned unstarted.
     *
     * <p>Each cycle fetches the stream info, reports it, and then sleeps. A failure
     * while fetching or reporting is itself reported and the loop carries on after
     * the regular pause, so a persistent failure does not turn into a busy loop.
     * Interrupting the thread ends it.
     *
     * @param jsm management context used to fetch the stream info
     * @param stream name of the stream to report on
     * @param reportFrequency pause between cycles in milliseconds, passed to {@code Thread.sleep}
     * @return a new thread that has not been started
     */
    public static Thread getStreamReportingThread(JetStreamManagement jsm, String stream, long reportFrequency) {
        return new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    StreamInfo si = jsm.getStreamInfo(stream);
                    NatsJsUtils.report("Stream Configuration:" + si.getConfiguration().toJson());
                    // String.valueOf prints "null" for an absent section instead of
                    // failing the whole cycle with a NullPointerException.
                    NatsJsUtils.report(String.valueOf(si.getClusterInfo()));
                    NatsJsUtils.report(String.valueOf(si.getStreamState()));
                }
                catch (Exception e) {
                    NatsJsUtils.report("Misc Reporting Exception: " + e);
                }
                try {
                    // Pause after every cycle, failed or not.
                    Thread.sleep(reportFrequency);
                }
                catch (InterruptedException interrupted) {
                    // Keep the interrupt status visible and stop reporting.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    /**
     * Counts the messages in the list that are JetStream messages.
     *
     * <p>NOTE(review): the previous loop counted every element without looking at
     * it, so status messages were included; the filter below follows the method
     * name and the sibling {@code count408s} — confirm against callers.
     *
     * @param messages messages to inspect
     * @return the number of messages for which {@code isJetStream()} is true
     */
    public static int countJs(List<Message> messages) {
        int count = 0;
        for (Message m : messages) {
            if (m.isJetStream()) {
                ++count;
            }
        }
        return count;
    }

    /**
     * Counts the messages in the list that are status messages with code 408.
     *
     * @param messages messages to inspect
     * @return the number of status messages whose status code is 408
     */
    public static int count408s(List<Message> messages) {
        int matches = 0;
        for (Message candidate : messages) {
            // The status is only read for status messages (short-circuit).
            boolean is408 = candidate.isStatusMessage() && candidate.getStatus().getCode() == 408;
            if (is408) {
                matches++;
            }
        }
        return matches;
    }

    /**
     * Deletes (best effort) and re-creates a memory-backed stream, using a
     * management context obtained from the connection.
     *
     * @param nc connection from which the management context is obtained
     * @param stream name of the stream
     * @param subs subjects the new stream is configured with
     * @throws IOException propagated from obtaining the context or creating the stream
     * @throws JetStreamApiException propagated from creating the stream
     */
    public static void createCleanMemStream(Connection nc, String stream, String ... subs) throws IOException, JetStreamApiException {
        JetStreamManagement jsm = nc.jetStreamManagement();
        NatsJsUtils.createCleanMemStream(jsm, stream, subs);
    }

    /**
     * Deletes the named stream (best effort) and creates it again with memory
     * storage and the given subjects.
     *
     * @param jsm management context used for the delete and create calls
     * @param stream name of the stream
     * @param subs subjects the new stream is configured with
     * @throws IOException propagated from {@code addStream}
     * @throws JetStreamApiException propagated from {@code addStream}
     */
    public static void createCleanMemStream(JetStreamManagement jsm, String stream, String ... subs) throws IOException, JetStreamApiException {
        NatsJsUtils.safeDeleteStream(jsm, stream);
        jsm.addStream(
            StreamConfiguration.builder()
                .name(stream)
                .storageType(StorageType.Memory)
                .subjects(subs)
                .build());
    }

    /**
     * Attempts to delete a stream and ignores any exception the attempt raises,
     * so callers can use it as an unconditional cleanup step.
     *
     * @param jsm management context used for the delete call
     * @param stream name of the stream to delete
     */
    public static void safeDeleteStream(JetStreamManagement jsm, String stream) {
        try {
            jsm.deleteStream(stream);
        }
        catch (Exception ignored) {
            // Deliberately best effort: a failed delete is not reported to the caller.
        }
    }
}

