/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Promise;
import io.vertx.core.VertxException;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.EventBusOptions;
import io.vertx.core.eventbus.impl.MessageImpl;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.internal.VertxInternal;
import io.vertx.core.internal.logging.Logger;
import io.vertx.core.internal.logging.LoggerFactory;
import io.vertx.core.internal.net.NetSocketInternal;
import io.vertx.core.net.NetSocket;
import io.vertx.core.spi.metrics.EventBusMetrics;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Outbound side of a clustered event-bus link to a single remote node.
 *
 * <p>Messages submitted before the socket is available are queued and written in
 * submission order once {@link #connected(NetSocket)} is called. After
 * {@link #handleClose(Throwable)} every queued or subsequently submitted write is failed.
 * While connected, a ping message is sent periodically; if no buffer is delivered to
 * {@link #handle(Buffer)} within the configured reply interval the socket is closed.
 *
 * <p>The {@code pendingWrites}, {@code connected} and {@code closed} fields are only
 * accessed while holding this object's monitor. Promise completion and socket writes
 * issued by {@link #writeMessage(MessageImpl, Promise)} and
 * {@link #handleClose(Throwable)} deliberately happen after the monitor is released so
 * that callback code never runs under the connection lock.
 */
final class OutboundConnection
implements Handler<Buffer> {
    private static final Logger log = LoggerFactory.getLogger(OutboundConnection.class);
    /** Address used for the keep-alive ping messages built in {@link #schedulePing()}. */
    private static final String PING_ADDRESS = "__vertx_ping";
    private final ClusteredEventBus eventBus;
    private final String remoteNodeId;
    private final VertxInternal vertx;
    /** May be {@code null}; checked before every use. */
    private final EventBusMetrics<?> metrics;
    /** Writes submitted before the socket was available; {@code null} when nothing is queued. */
    private Queue<MessageWrite> pendingWrites;
    private NetSocket socket;
    private boolean connected;
    /** Timer that closes the socket when no reply arrives; {@code -1} until first scheduled. */
    private long pingReplyTimeoutID = -1L;
    /** Timer that triggers the next ping; {@code -1} until first scheduled. */
    private long pingTimeoutID = -1L;
    private boolean closed;

    /**
     * @param eventBus     owning event bus; supplies the Vert.x instance, metrics and options
     * @param remoteNodeId identifier of the remote node this connection targets
     */
    OutboundConnection(ClusteredEventBus eventBus, String remoteNodeId) {
        this.eventBus = eventBus;
        this.remoteNodeId = remoteNodeId;
        this.vertx = eventBus.vertx();
        this.metrics = eventBus.getMetrics();
    }

    /** @return the identifier of the remote node this connection targets */
    String remoteNodeId() {
        return remoteNodeId;
    }

    /**
     * Writes {@code message} to the remote node, or queues it when not yet connected.
     *
     * <ul>
     *   <li>closed: {@code writePromise} is failed with {@link NetSocketInternal#CLOSED_EXCEPTION};</li>
     *   <li>connected: the message is written and {@code writePromise} completes with the write;</li>
     *   <li>otherwise: the write is queued until {@link #connected(NetSocket)} or
     *       {@link #handleClose(Throwable)} is called.</li>
     * </ul>
     *
     * <p>Only the state check and the enqueue run under the monitor. The method itself is
     * intentionally not {@code synchronized}: the socket write and the promise completion
     * must not execute while the connection lock is held.
     *
     * @param message      message to send; must be a {@link ClusteredMessage}
     * @param writePromise completed with the outcome of the write
     */
    void writeMessage(MessageImpl<?, ?> message, Promise<Void> writePromise) {
        final boolean isClosed;
        synchronized (this) {
            isClosed = closed;
            if (!isClosed && !connected) {
                if (pendingWrites == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("Not connected to server " + remoteNodeId + " - starting queuing");
                    }
                    pendingWrites = new ArrayDeque<>();
                }
                pendingWrites.add(new MessageWrite(message, writePromise));
                return;
            }
        }
        // Monitor released: safe to perform the write or run promise listeners.
        if (isClosed) {
            writePromise.tryFail(NetSocketInternal.CLOSED_EXCEPTION);
        } else {
            writeMessage(message).onComplete(writePromise);
        }
    }

    /**
     * Handles a buffer delivered to this connection: cancels the pending ping-reply
     * timeout and schedules the next ping. The buffer content itself is not inspected.
     * NOTE(review): presumably installed as the socket's data handler by the owner so that
     * any incoming data counts as a pong — the registration is not visible in this class.
     */
    @Override
    public void handle(Buffer event) {
        vertx.cancelTimer(pingReplyTimeoutID);
        schedulePing();
    }

    /**
     * Marks the connection closed, cancels the ping timers and fails all queued writes.
     *
     * @param cause failure passed to the promise of every queued write
     */
    void handleClose(Throwable cause) {
        if (pingReplyTimeoutID != -1L) {
            vertx.cancelTimer(pingReplyTimeoutID);
        }
        if (pingTimeoutID != -1L) {
            vertx.cancelTimer(pingTimeoutID);
        }
        final Queue<MessageWrite> abandoned;
        synchronized (this) {
            closed = true;
            // Detach the queue under the lock. Once closed is set writeMessage never
            // enqueues, and connected() null-checks the field, so nobody else touches it.
            abandoned = pendingWrites;
            pendingWrites = null;
        }
        if (abandoned != null) {
            // Fail in submission order, outside the monitor so listeners do not run under it.
            MessageWrite write;
            while ((write = abandoned.poll()) != null) {
                write.writePromise.tryFail(cause);
            }
        }
    }

    /**
     * Arms the ping timer. When it fires, a reply timer is armed (which closes the socket
     * if it is not cancelled by {@link #handle(Buffer)} first) and a ping message is written.
     */
    private void schedulePing() {
        EventBusOptions options = eventBus.options();
        pingTimeoutID = vertx.setTimer(options.getClusterPingInterval(), id1 -> {
            // If nothing comes back within the reply interval, give up on the remote node.
            pingReplyTimeoutID = vertx.setTimer(options.getClusterPingReplyInterval(), id2 -> {
                log.warn("No pong from server " + remoteNodeId + " - will consider it dead");
                socket.close();
            });
            ClusteredMessage<String, String> pingMessage = new ClusteredMessage<String, String>(remoteNodeId, PING_ADDRESS, null, null, new PingMessageCodec(), true, eventBus);
            writeMessage(pingMessage);
        });
    }

    /**
     * Attaches the established socket, starts the ping cycle and flushes queued writes in
     * submission order.
     *
     * <p>The flush runs under the monitor so that a concurrent
     * {@link #writeMessage(MessageImpl, Promise)} cannot observe {@code connected} and
     * write ahead of the messages that were queued before it.
     *
     * @param socket the socket connected to the remote node
     */
    synchronized void connected(NetSocket socket) {
        this.socket = socket;
        connected = true;
        schedulePing();
        if (pendingWrites != null) {
            if (log.isDebugEnabled()) {
                log.debug("Draining the queue for server " + remoteNodeId);
            }
            for (MessageWrite queued : pendingWrites) {
                writeMessage(queued.message).onComplete(queued.writePromise);
            }
        }
        pendingWrites = null;
    }

    /**
     * Encodes {@code message} to its wire form, reports it to the metrics (when present)
     * and writes it to the socket.
     *
     * @param message message to send; must be a {@link ClusteredMessage}
     * @return the future returned by the socket write
     */
    private Future<Void> writeMessage(MessageImpl<?, ?> message) {
        Buffer data = ((ClusteredMessage<?, ?>) message).encodeToWire();
        if (metrics != null) {
            metrics.messageWritten(message.address(), data.length());
        }
        return socket.write(data);
    }

    /** A write submitted before the connection was established, with its completion promise. */
    private static class MessageWrite {
        final MessageImpl<?, ?> message;
        final Promise<Void> writePromise;

        MessageWrite(MessageImpl<?, ?> message, Promise<Void> writePromise) {
            this.message = message;
            this.writePromise = writePromise;
        }
    }
}

