/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.jgroups.Address;
import org.jgroups.BatchMessage;
import org.jgroups.Event;
import org.jgroups.Header;
import org.jgroups.Message;
import org.jgroups.NullAddress;
import org.jgroups.View;
import org.jgroups.annotations.Experimental;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.conf.AttributeType;
import org.jgroups.stack.Protocol;
import org.jgroups.util.DefaultThreadFactory;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.Runner;
import org.jgroups.util.Util;

@Experimental
@MBean(description="Protocol just below flow control that wraps messages to improve throughput with small messages.")
public class BATCH2
extends Protocol {
    @Property(description="The maximum number of messages per batch")
    public int max_batch_size = 100;
    @Property(description="Time (microseconds) to wait on poll() from the down_queue. A value of <= 0 doesn't wait", type=AttributeType.TIME, unit=TimeUnit.MICROSECONDS)
    protected long poll_timeout = 100L;
    @ManagedAttribute(description="Number of messages sent in BatchMessages", type=AttributeType.SCALAR)
    protected long num_msgs_sent;
    @ManagedAttribute(description="Number of BatchMessages sent", type=AttributeType.SCALAR)
    protected long num_ebs_sent;
    @ManagedAttribute(description="Number of BatchMessages sent because the queue was full", type=AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_full_queue;
    @ManagedAttribute(description="Number of BatchMessages sent because the max number of messages has been reached (max_batch_size)", type=AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_max_number_of_msgs;
    @ManagedAttribute(description="Number of BatchMessages sent because the timeout kicked in", type=AttributeType.SCALAR)
    protected long num_ebs_sent_due_to_timeout;
    protected final NullAddress nullAddress = new NullAddress();
    protected volatile boolean running;
    protected Map<Address, Buffer> msgMap = Util.createConcurrentMap();
    protected static Batch2Header HEADER = new Batch2Header();
    protected final BlockingQueue<Message> down_queue = new ArrayBlockingQueue<Message>(9084);
    protected final List<Message> remove_queue = new ArrayList<Message>(1024);
    protected final Runner runner = new Runner(new DefaultThreadFactory("runner", true, true), "runner", this::processDownMessage, null);

    @ManagedAttribute(description="Average number of messages in an BatchMessage")
    public double avgBatchSize() {
        if (this.num_ebs_sent == 0L || this.num_msgs_sent == 0L) {
            return 0.0;
        }
        return (double)this.num_msgs_sent / (double)this.num_ebs_sent;
    }

    @Override
    public void init() throws Exception {
        this.msgMap.putIfAbsent(this.nullAddress, new Buffer(this.nullAddress));
    }

    @Override
    public void resetStats() {
        super.resetStats();
        this.num_msgs_sent = 0L;
        this.num_ebs_sent = 0L;
        this.num_ebs_sent_due_to_full_queue = 0L;
        this.num_ebs_sent_due_to_timeout = 0L;
        this.num_ebs_sent_due_to_max_number_of_msgs = 0L;
    }

    @Override
    public Object down(Event evt) {
        switch (evt.getType()) {
            case 6: {
                View v = (View)evt.getArg();
                this.handleViewChange(v.getMembers());
            }
        }
        return this.down_prot.down(evt);
    }

    @Override
    public Object up(Event evt) {
        switch (evt.getType()) {
            case 6: {
                this.handleViewChange(((View)evt.getArg()).getMembers());
            }
        }
        return this.up_prot.up(evt);
    }

    protected void handleViewChange(List<Address> mbrs) {
        if (mbrs == null) {
            return;
        }
        mbrs.stream().filter(dest -> !this.msgMap.containsKey(dest)).forEach(dest -> this.msgMap.putIfAbsent((Address)dest, new Buffer((Address)dest)));
        this.msgMap.keySet().stream().filter(dest -> !mbrs.contains(dest) && !(dest instanceof NullAddress)).forEach(dest -> {
            Buffer removed = this.msgMap.remove(dest);
            removed.close();
        });
    }

    @Override
    public Object down(Message msg) {
        if (msg.isFlagSet(Message.Flag.OOB) || msg.isFlagSet(Message.Flag.DONT_BUNDLE)) {
            return this.down_prot.down(msg);
        }
        if (msg.getSrc() == null) {
            msg.setSrc(this.local_addr);
        }
        if (!Objects.equals(msg.getSrc(), this.local_addr)) {
            return this.down_prot.down(msg);
        }
        try {
            this.down_queue.put(msg);
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        return msg;
    }

    protected void processDownMessage() {
        try {
            Message msg = this.down_queue.take();
            if (msg == null) {
                return;
            }
            this._down(msg);
            block2: while (true) {
                this.remove_queue.clear();
                int num_msgs = this.down_queue.drainTo(this.remove_queue);
                if (num_msgs > 0) {
                    Iterator<Message> iterator = this.remove_queue.iterator();
                    while (true) {
                        if (!iterator.hasNext()) continue block2;
                        Message m = iterator.next();
                        this._down(m);
                    }
                }
                msg = this.down_queue.poll(this.poll_timeout, TimeUnit.MICROSECONDS);
                if (msg == null) break;
                this._down(msg);
            }
            this.msgMap.forEach((k, v) -> v.sendBatch(true));
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }

    protected Object _down(Message msg) {
        Address dest = msg.dest() == null ? this.nullAddress : msg.dest();
        Buffer ebbuffer = this.msgMap.computeIfAbsent(dest, k -> new Buffer(dest));
        boolean add_successful = ebbuffer.addMessage(msg);
        if (!add_successful) {
            return this.down_prot.down(msg);
        }
        return null;
    }

    @Override
    public Object up(Message msg) {
        if (msg.getHeader(this.getId()) == null) {
            return this.up_prot.up(msg);
        }
        BatchMessage comp = (BatchMessage)msg;
        for (Message bundledMsg : comp) {
            bundledMsg.setDest(comp.getDest());
            if (bundledMsg.getSrc() != null) continue;
            bundledMsg.setSrc(comp.getSrc());
        }
        MessageBatch batch = new MessageBatch();
        batch.set(comp.getDest(), comp.getSrc(), comp.getMessages());
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
        return null;
    }

    @Override
    public void up(MessageBatch batch) {
        int len = 0;
        for (Message msg : batch) {
            if (!(msg instanceof BatchMessage)) continue;
            len += ((BatchMessage)msg).getNumberOfMessages();
        }
        if (len > 0) {
            MessageBatch mb = new MessageBatch(len + 1).setDest(batch.dest()).setSender(batch.getSender());
            Iterator<Message> it = batch.iterator();
            while (it.hasNext()) {
                Message m = it.next();
                if (!(m instanceof BatchMessage)) continue;
                BatchMessage ebm = (BatchMessage)m;
                it.remove();
                mb.add(ebm.getMessages(), ebm.getNumberOfMessages());
            }
            if (!mb.isEmpty()) {
                this.up_prot.up(mb);
            }
        }
        if (!batch.isEmpty()) {
            this.up_prot.up(batch);
        }
    }

    @Override
    public void start() throws Exception {
        this.running = true;
        this.runner.start();
    }

    @Override
    public void stop() {
        this.running = false;
        this.runner.stop();
    }

    public static class Batch2Header
    extends Header {
        @Override
        public short getMagicId() {
            return 96;
        }

        @Override
        public Supplier<? extends Header> create() {
            return Batch2Header::new;
        }

        @Override
        public int serializedSize() {
            return 0;
        }

        @Override
        public void writeTo(DataOutput out) throws IOException {
        }

        @Override
        public void readFrom(DataInput in) throws IOException {
        }

        @Override
        public String toString() {
            return "BatchHeader";
        }
    }

    protected class Buffer {
        private final Address dest;
        private Message[] msgs;
        private int index;
        private boolean closed;
        private long total_bytes;

        protected Buffer(Address address) {
            this.dest = address;
            this.msgs = new Message[BATCH2.this.max_batch_size];
            this.index = 0;
        }

        protected boolean addMessage(Message msg) {
            if (this.closed) {
                return false;
            }
            int msg_bytes = msg.getLength();
            if (this.total_bytes + (long)msg_bytes > (long)BATCH2.this.getTransport().getBundler().getMaxSize()) {
                ++BATCH2.this.num_ebs_sent_due_to_full_queue;
                this.sendBatch(false);
            }
            this.msgs[this.index++] = msg;
            this.total_bytes += (long)msg_bytes;
            if (this.index == this.msgs.length) {
                ++BATCH2.this.num_ebs_sent_due_to_max_number_of_msgs;
                this.sendBatch(false);
            }
            return true;
        }

        protected void sendBatch(boolean due_to_timeout) {
            if (this.index == 0) {
                return;
            }
            if (this.index == 1) {
                BATCH2.this.down_prot.down(this.msgs[0]);
                this.msgs[0] = null;
                this.index = 0;
                this.total_bytes = 0L;
                ++BATCH2.this.num_msgs_sent;
                return;
            }
            Address ebdest = this.dest instanceof NullAddress ? null : this.dest;
            Message comp = new BatchMessage(ebdest, BATCH2.this.local_addr, this.msgs, this.index).putHeader(BATCH2.this.id, HEADER).setSrc(BATCH2.this.local_addr);
            this.msgs = new Message[BATCH2.this.max_batch_size];
            BATCH2.this.num_msgs_sent += (long)this.index;
            ++BATCH2.this.num_ebs_sent;
            if (due_to_timeout) {
                ++BATCH2.this.num_ebs_sent_due_to_timeout;
            }
            this.index = 0;
            this.total_bytes = 0L;
            BATCH2.this.down_prot.down(comp);
        }

        protected void close() {
            this.closed = true;
            this.sendBatch(false);
        }
    }
}

