/*
 * Decompiled with CFR 0.152.
 */
package io.nats.examples.jetstream.simple;

import io.nats.client.Connection;
import io.nats.client.ConsumerContext;
import io.nats.client.FetchConsumeOptions;
import io.nats.client.FetchConsumer;
import io.nats.client.JetStream;
import io.nats.client.JetStreamApiException;
import io.nats.client.JetStreamManagement;
import io.nats.client.JetStreamStatusCheckedException;
import io.nats.client.Message;
import io.nats.client.Nats;
import io.nats.client.Options;
import io.nats.client.StreamContext;
import io.nats.client.api.ConsumerConfiguration;
import io.nats.examples.jetstream.NatsJsUtils;
import java.io.IOException;

/**
 * Example that fetches messages from a JetStream stream using a byte limit,
 * optionally combined with a message-count limit.
 *
 * <p>{@link #main(String[])} (re)creates a stream, publishes {@value #MESSAGES}
 * messages to it and then runs three fetches (labelled A, B and C), each against
 * its own durable consumer and each with different limits.
 */
public class FetchBytesExample {
    private static final String STREAM = "fetch-bytes-stream";
    private static final String SUBJECT = "fetch-bytes-subject";
    private static final String MESSAGE_TEXT = "fetch-bytes";
    private static final String CONSUMER_NAME_PREFIX = "fetch-bytes-consumer";
    private static final int MESSAGES = 20;
    private static final int EXPIRES_SECONDS = 2;

    /** URL of the server to connect to; left public and mutable so callers can override it. */
    public static String SERVER = "nats://localhost:4222";

    /**
     * Connects to {@link #SERVER}, prepares the stream and runs the three fetch scenarios.
     *
     * @param args command line arguments (not used)
     */
    public static void main(String[] args) {
        Options options = Options.builder().server(SERVER).build();
        try (Connection nc = Nats.connect(options)) {
            if (nc.getServerInfo().isOlderThanVersion("2.9.1")) {
                // Say why nothing happens instead of exiting silently.
                System.out.println("This example requires server version 2.9.1 or later.");
                return;
            }
            JetStreamManagement jsm = nc.jetStreamManagement();
            JetStream js = nc.jetStream();
            NatsJsUtils.createOrReplaceStream(jsm, STREAM, SUBJECT);
            NatsJsUtils.publishOrExit(js, SUBJECT, MESSAGE_TEXT, MESSAGES);
            simpleFetch(nc, js, "A", 0, 1000);
            simpleFetch(nc, js, "B", 10, 2000);
            simpleFetch(nc, js, "C", 0, 4000);
        }
        catch (IOException e) {
            System.err.println("Unable to run the example against " + SERVER + ": " + e);
        }
        catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while running the example: " + e);
        }
    }

    /**
     * Creates (or updates) a durable consumer for the given limits, performs one fetch
     * and prints how many bytes and messages were received and how long it took.
     *
     * @param nc          connection used to obtain the stream context
     * @param js          not used by this method; kept so the signature is unchanged
     * @param label       scenario label ("A", "B" or "C") selecting the printed explanation
     * @param maxMessages maximum number of messages to fetch, or 0 to limit by bytes only
     * @param maxBytes    maximum number of bytes to fetch
     */
    private static void simpleFetch(Connection nc, JetStream js, String label, int maxMessages, int maxBytes) {
        String consumerName = generateConsumerName(maxMessages, maxBytes);

        ConsumerContext consumerContext;
        try {
            StreamContext streamContext = nc.getStreamContext(STREAM);
            consumerContext = streamContext.createOrUpdateConsumer(
                ConsumerConfiguration.builder().durable(consumerName).build());
        }
        catch (JetStreamApiException | IOException e) {
            // Still best-effort (this scenario is skipped), but no longer silent.
            System.err.println("Unable to create consumer '" + consumerName + "': " + e);
            return;
        }

        FetchConsumeOptions.Builder builder = FetchConsumeOptions.builder().expiresIn(EXPIRES_SECONDS * 1000L);
        if (maxMessages == 0) {
            builder.maxBytes(maxBytes);
        }
        else {
            builder.max(maxBytes, maxMessages);
        }
        FetchConsumeOptions fetchConsumeOptions = builder.build();

        printExplanation(label, consumerName, maxMessages, maxBytes);

        int receivedMessages = 0;
        long receivedBytes = 0L;
        long start = System.currentTimeMillis();
        try (FetchConsumer consumer = consumerContext.fetch(fetchConsumeOptions)) {
            Message msg = consumer.nextMessage();
            while (msg != null) {
                msg.ack();
                // Update both totals before testing the limits. Folding the increment into
                // a short-circuit "||" would skip counting the message that reaches the
                // byte limit.
                receivedMessages++;
                receivedBytes += msg.consumeByteCount();
                if (receivedBytes >= maxBytes || receivedMessages == maxMessages) {
                    break;
                }
                msg = consumer.nextMessage();
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            exitOnFetchFailure(e);
        }
        catch (Exception e) {
            // Covers API, status and I/O failures as well as anything thrown while closing the consumer.
            exitOnFetchFailure(e);
        }
        long elapsed = System.currentTimeMillis() - start;
        printSummary(receivedMessages, receivedBytes, elapsed);
    }

    /** Reports a failure that occurred while fetching and terminates the JVM with status -1. */
    private static void exitOnFetchFailure(Exception e) {
        System.err.println("Exception should be handled properly, just exiting here.");
        System.err.println(e);
        System.exit(-1);
    }

    /**
     * Builds the durable consumer name for a scenario from its limits.
     *
     * @param maxMessages message limit, 0 meaning "unlimited messages"
     * @param maxBytes    byte limit
     * @return the consumer name
     */
    private static String generateConsumerName(int maxMessages, int maxBytes) {
        if (maxMessages == 0) {
            return CONSUMER_NAME_PREFIX + "-" + maxBytes + "-bytes-unlimited-messages";
        }
        return CONSUMER_NAME_PREFIX + "-" + maxBytes + "-bytes-" + maxMessages + "-messages";
    }

    /** Prints the totals gathered by one fetch. */
    private static void printSummary(int receivedMessages, long receivedBytes, long elapsed) {
        System.out.println("+++ Fetch executed and " + receivedBytes + "/" + receivedMessages
            + " bytes/message(s) were received in " + elapsed + "ms\n");
    }

    /**
     * Prints a header for the scenario followed by a short description of what the
     * fetch is expected to do. Labels other than "A", "B" and "C" print the header only.
     */
    private static void printExplanation(String label, String name, int maxMessages, int maxBytes) {
        System.out.println("--------------------------------------------------------------------------------");
        System.out.println(label + ". " + name);
        switch (label) {
            case "A":
                System.out.println("=== Max bytes (" + maxBytes + ") threshold will be met since the next message would put the byte count over " + maxBytes + " bytes");
                System.out.println("=== nextMessage() will return null when consume is done.");
                break;
            case "B":
                System.out.println("=== Fetch max messages (" + maxMessages + ") will be reached before max bytes (" + maxBytes + ")");
                System.out.println("=== nextMessage() will return null when consume is done");
                break;
            case "C":
                System.out.println("=== Max bytes (" + maxBytes + ") is larger than available bytes (about 2700).");
                System.out.println("=== FetchConsumeOption \"expires in\" is " + EXPIRES_SECONDS + " seconds.");
                System.out.println("=== nextMessage() blocks until expiration when there are no messages available, then returns null.");
                break;
            default:
                break;
        }
    }
}

