/*
 * Decompiled with CFR 0.152.
 */
package com.arangodb.internal.velocystream.internal;

import com.arangodb.ArangoDBException;
import com.arangodb.internal.ArangoDefaults;
import com.arangodb.internal.net.Connection;
import com.arangodb.internal.net.HostDescription;
import com.arangodb.internal.velocystream.internal.Chunk;
import com.arangodb.internal.velocystream.internal.ChunkStore;
import com.arangodb.internal.velocystream.internal.Message;
import com.arangodb.internal.velocystream.internal.MessageStore;
import com.arangodb.velocypack.VPackBuilder;
import com.arangodb.velocypack.VPackSlice;
import com.arangodb.velocypack.ValueType;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocket;
import javax.net.ssl.SSLSocketFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class VstConnection<T>
implements Connection {
    private static final Logger LOGGER = LoggerFactory.getLogger(VstConnection.class);
    private static final byte[] PROTOCOL_HEADER = "VST/1.0\r\n\r\n".getBytes();
    private ExecutorService executor;
    private ScheduledExecutorService keepAliveScheduler;
    private final AtomicLong keepAliveId = new AtomicLong();
    protected final MessageStore messageStore;
    protected final Integer timeout;
    private final Long ttl;
    private final Integer keepAliveInterval;
    private int keepAliveFailCounter = 0;
    private final Boolean useSsl;
    private final SSLContext sslContext;
    private Socket socket;
    private OutputStream outputStream;
    private InputStream inputStream;
    private final HostDescription host;
    private final Map<Long, Long> sendTimestamps = new ConcurrentHashMap<Long, Long>();
    private final String connectionName;
    private final VPackSlice keepAliveRequest = new VPackBuilder().add(ValueType.ARRAY).add(Integer.valueOf(1)).add(Integer.valueOf(1)).add("_system").add(Integer.valueOf(1)).add("/_admin/server/availability").add(ValueType.OBJECT).close().add(ValueType.OBJECT).close().close().slice();

    protected VstConnection(HostDescription host, Integer timeout, Long ttl, Integer keepAliveInterval, Boolean useSsl, SSLContext sslContext, MessageStore messageStore) {
        this.host = host;
        this.timeout = timeout;
        this.ttl = ttl;
        this.keepAliveInterval = keepAliveInterval;
        this.useSsl = useSsl;
        this.sslContext = sslContext;
        this.messageStore = messageStore;
        this.connectionName = "connection_" + System.currentTimeMillis() + "_" + Math.random();
        LOGGER.debug("[" + this.connectionName + "]: Connection created");
    }

    protected T sendKeepAlive() {
        long id = this.keepAliveId.decrementAndGet();
        Message message = new Message(id, this.keepAliveRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("[%s]: Send keepalive probe (id=%s, head=%s, body=%s)", this.connectionName, message.getId(), message.getHead(), message.getBody() != null ? message.getBody() : "{}"));
        }
        return this.write(message, Collections.singleton(new Chunk(id, 0, 1, -1L, 0, this.keepAliveRequest.getByteSize())));
    }

    public abstract T write(Message var1, Collection<Chunk> var2);

    protected abstract void doKeepAlive();

    private void keepAlive() {
        block2: {
            try {
                this.doKeepAlive();
                this.keepAliveFailCounter = 0;
            }
            catch (Exception e) {
                LOGGER.error("Got exception while performing keepAlive request:", (Throwable)e);
                ++this.keepAliveFailCounter;
                if (this.keepAliveFailCounter < 3) break block2;
                LOGGER.error("KeepAlive request failed consecutively for 3 times, closing connection now...");
                this.messageStore.clear(new IOException("Connection unresponsive!"));
                this.close();
            }
        }
    }

    public boolean isOpen() {
        return this.socket != null && this.socket.isConnected() && !this.socket.isClosed();
    }

    public synchronized void open() throws IOException {
        if (this.isOpen()) {
            return;
        }
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("[%s]: Open connection to %s", this.connectionName, this.host));
        }
        this.socket = Boolean.TRUE == this.useSsl ? (this.sslContext != null ? this.sslContext.getSocketFactory().createSocket() : SSLSocketFactory.getDefault().createSocket()) : SocketFactory.getDefault().createSocket();
        this.socket.connect(new InetSocketAddress(this.host.getHost(), this.host.getPort()), this.timeout != null ? this.timeout : ArangoDefaults.DEFAULT_TIMEOUT);
        this.socket.setKeepAlive(true);
        this.socket.setTcpNoDelay(true);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("[%s]: Connected to %s", this.connectionName, this.socket));
        }
        this.outputStream = new BufferedOutputStream(this.socket.getOutputStream());
        this.inputStream = this.socket.getInputStream();
        if (Boolean.TRUE == this.useSsl) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug(String.format("[%s]: Start Handshake on %s", this.connectionName, this.socket));
            }
            ((SSLSocket)this.socket).startHandshake();
        }
        this.sendProtocolHeader();
        this.executor = Executors.newSingleThreadExecutor();
        this.executor.submit(() -> {
            LOGGER.debug("[" + this.connectionName + "]: Start Callable");
            long openTime = new Date().getTime();
            Long ttlTime = this.ttl != null ? Long.valueOf(openTime + this.ttl) : null;
            ChunkStore chunkStore = new ChunkStore(this.messageStore);
            while (true) {
                if (ttlTime != null && new Date().getTime() > ttlTime && this.messageStore.isEmpty()) {
                    this.close();
                    break;
                }
                if (!this.isOpen()) {
                    this.messageStore.clear(new IOException("The socket is closed."));
                    this.close();
                    break;
                }
                try {
                    Chunk chunk = this.readChunk();
                    ByteBuffer chunkBuffer = chunkStore.storeChunk(chunk);
                    if (chunkBuffer == null) continue;
                    byte[] buf = new byte[chunk.getContentLength()];
                    this.readBytesIntoBuffer(buf, 0, buf.length);
                    chunkBuffer.put(buf);
                    chunkStore.checkCompleteness(chunk.getMessageId());
                }
                catch (Exception e) {
                    this.messageStore.clear(e);
                    this.close();
                    break;
                }
            }
            LOGGER.debug("[" + this.connectionName + "]: Stop Callable");
            return null;
        });
        if (this.keepAliveInterval != null) {
            this.keepAliveScheduler = Executors.newScheduledThreadPool(1);
            this.keepAliveScheduler.scheduleAtFixedRate(this::keepAlive, 0L, this.keepAliveInterval.intValue(), TimeUnit.SECONDS);
        }
    }

    @Override
    public synchronized void close() {
        if (this.keepAliveScheduler != null) {
            this.keepAliveScheduler.shutdownNow();
        }
        this.messageStore.clear();
        if (this.executor != null && !this.executor.isShutdown()) {
            this.executor.shutdown();
        }
        if (this.socket != null && !this.socket.isClosed()) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("[%s]: Close connection %s", this.connectionName, this.socket));
                }
                this.socket.close();
            }
            catch (IOException e) {
                throw new ArangoDBException(e);
            }
        }
    }

    private synchronized void sendProtocolHeader() throws IOException {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("[%s]: Send velocystream protocol header to %s", this.connectionName, this.socket));
        }
        this.outputStream.write(PROTOCOL_HEADER);
        this.outputStream.flush();
    }

    protected synchronized void writeIntern(Message message, Collection<Chunk> chunks) throws ArangoDBException {
        for (Chunk chunk : chunks) {
            try {
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug(String.format("[%s]: Send chunk %s:%s from message %s", this.connectionName, chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
                    this.sendTimestamps.put(chunk.getMessageId(), System.currentTimeMillis());
                }
                this.writeChunkHead(chunk);
                int contentOffset = chunk.getContentOffset();
                int contentLength = chunk.getContentLength();
                VPackSlice head = message.getHead();
                int headLength = head.getByteSize();
                int written = 0;
                if (contentOffset < headLength) {
                    written = Math.min(contentLength, headLength - contentOffset);
                    this.outputStream.write(head.getBuffer(), contentOffset, written);
                }
                if (written < contentLength) {
                    VPackSlice body = message.getBody();
                    this.outputStream.write(body.getBuffer(), contentOffset + written - headLength, contentLength - written);
                }
                this.outputStream.flush();
            }
            catch (IOException e) {
                LOGGER.error("Error on Connection " + this.connectionName);
                throw new ArangoDBException(e);
            }
        }
    }

    private synchronized void writeChunkHead(Chunk chunk) throws IOException {
        long messageLength = chunk.getMessageLength();
        int headLength = messageLength > -1L ? 24 : 16;
        int length = chunk.getContentLength() + headLength;
        ByteBuffer buffer = ByteBuffer.allocate(headLength).order(ByteOrder.LITTLE_ENDIAN);
        buffer.putInt(length);
        buffer.putInt(chunk.getChunkX());
        buffer.putLong(chunk.getMessageId());
        if (messageLength > -1L) {
            buffer.putLong(messageLength);
        }
        this.outputStream.write(buffer.array());
    }

    protected Chunk readChunk() throws IOException {
        int contentLength;
        long messageLength;
        ByteBuffer chunkHeadBuffer = this.readBytes(16);
        int length = chunkHeadBuffer.getInt();
        int chunkX = chunkHeadBuffer.getInt();
        long messageId = chunkHeadBuffer.getLong();
        if (1 == (chunkX & 1) && chunkX >> 1 > 1) {
            messageLength = this.readBytes(8).getLong();
            contentLength = length - 24;
        } else {
            messageLength = -1L;
            contentLength = length - 16;
        }
        Chunk chunk = new Chunk(messageId, chunkX, messageLength, 0, contentLength);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug(String.format("[%s]: Received chunk %s:%s from message %s", this.connectionName, chunk.getChunk(), chunk.isFirstChunk() ? 1 : 0, chunk.getMessageId()));
            LOGGER.debug("[" + this.connectionName + "]: Responsetime for Message " + chunk.getMessageId() + " is " + (System.currentTimeMillis() - this.sendTimestamps.get(chunk.getMessageId())));
        }
        return chunk;
    }

    private ByteBuffer readBytes(int len) throws IOException {
        byte[] buf = new byte[len];
        this.readBytesIntoBuffer(buf, 0, len);
        return ByteBuffer.wrap(buf).order(ByteOrder.LITTLE_ENDIAN);
    }

    protected void readBytesIntoBuffer(byte[] buf, int off, int len) throws IOException {
        int read;
        for (int readed = 0; readed < len; readed += read) {
            read = this.inputStream.read(buf, off + readed, len - readed);
            if (read != -1) continue;
            throw new IOException("Reached the end of the stream.");
        }
    }

    public String getConnectionName() {
        return this.connectionName;
    }

    @Override
    public void setJwt(String jwt) {
    }
}

