/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.protocol.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PeerStatus;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.VersionedRemoteResource;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.codec.StandardFlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.ClientProtocol;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.socket.HandshakeProperty;
import org.apache.nifi.remote.protocol.socket.Response;
import org.apache.nifi.remote.protocol.socket.ResponseCode;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client side of the socket-based FlowFile exchange implemented in this file.
 *
 * <p>Typical call order, as enforced by the state checks in this class:
 * {@link #setPort(RemoteGroupPort)}, then {@link #handshake(Peer)}, then any of
 * {@link #getPeerStatuses(Peer)}, {@link #negotiateCodec(Peer)},
 * {@link #receiveFlowFiles(Peer, ProcessContext, ProcessSession, FlowFileCodec)} or
 * {@link #transferFlowFiles(Peer, ProcessContext, ProcessSession, FlowFileCodec)},
 * and finally {@link #shutdown(Peer)}.
 *
 * <p>Instances hold mutable, unsynchronized handshake state; no thread-safety
 * is provided by this class itself.
 */
public class SocketClientProtocol
implements ClientProtocol {
    /** Maximum time spent pulling additional FlowFiles into a single outgoing transaction. */
    private static final long BATCH_SEND_NANOS = TimeUnit.SECONDS.toNanos(5L);
    private static final Logger logger = LoggerFactory.getLogger(SocketClientProtocol.class);

    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(new int[]{4, 3, 2, 1});
    private RemoteGroupPort port;
    private boolean useCompression;
    private String commsIdentifier;
    private boolean handshakeComplete = false;
    private Response handshakeResponse = null;
    private boolean readyForFileTransfer = false;
    /** Peer URL with a trailing '/', set during handshake for protocol versions >= 3; otherwise null. */
    private String transitUriPrefix = null;

    /**
     * Sets the port this protocol communicates on behalf of and caches its compression setting.
     *
     * @param port the remote group port; must be set before {@link #handshake(Peer)}
     */
    public void setPort(RemoteGroupPort port) {
        this.port = port;
        this.useCompression = port.isUseCompression();
    }

    /**
     * Performs the initial handshake with the peer: sends a freshly generated
     * communications identifier, the peer URL (protocol version 3 and above) and the
     * handshake properties, then reads and interprets the peer's response.
     *
     * <p>A response of PROPERTIES_OK marks this protocol as ready for file transfer.
     * PORT_NOT_IN_VALID_STATE, UNKNOWN_PORT and PORTS_DESTINATION_FULL complete the
     * handshake without enabling file transfer; they can be queried afterwards via
     * {@link #isPortInvalid()}, {@link #isPortUnknown()} and {@link #isDestinationFull()}.
     *
     * @param peer the peer to handshake with
     * @throws IOException if communication with the peer fails
     * @throws HandshakeException if the response cannot be parsed or is unexpected
     *         (in the latter case the peer is closed first)
     * @throws IllegalStateException if the handshake has already been completed
     */
    public void handshake(Peer peer) throws IOException, HandshakeException {
        if (this.handshakeComplete) {
            throw new IllegalStateException("Handshake has already been completed");
        }
        this.commsIdentifier = UUID.randomUUID().toString();
        logger.debug("{} handshaking with {}", this, peer);

        long timeoutMillis = this.port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS);
        Map<HandshakeProperty, String> properties = new HashMap<HandshakeProperty, String>();
        properties.put(HandshakeProperty.GZIP, String.valueOf(this.useCompression));
        properties.put(HandshakeProperty.PORT_IDENTIFIER, this.port.getIdentifier());
        properties.put(HandshakeProperty.REQUEST_EXPIRATION_MILLIS, String.valueOf(timeoutMillis));

        CommunicationsSession commsSession = peer.getCommunicationsSession();
        commsSession.setTimeout(this.port.getRemoteProcessGroup().getCommunicationsTimeout(TimeUnit.MILLISECONDS));
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());

        dos.writeUTF(this.commsIdentifier);
        if (this.versionNegotiator.getVersion() >= 3) {
            // From version 3 on, the peer URL is sent and also used as prefix for transit URIs.
            String peerUrl = peer.getUrl();
            dos.writeUTF(peerUrl);
            this.transitUriPrefix = peerUrl.endsWith("/") ? peerUrl : peerUrl + "/";
        }
        dos.writeInt(properties.size());
        for (Map.Entry<HandshakeProperty, String> entry : properties.entrySet()) {
            dos.writeUTF(entry.getKey().name());
            dos.writeUTF(entry.getValue());
        }
        dos.flush();

        try {
            this.handshakeResponse = Response.read(dis);
        }
        catch (ProtocolException e) {
            throw new HandshakeException((Throwable)e);
        }
        switch (this.handshakeResponse.getCode()) {
            case PORT_NOT_IN_VALID_STATE:
            case UNKNOWN_PORT:
            case PORTS_DESTINATION_FULL: {
                // Handshake completes, but file transfer stays disabled.
                break;
            }
            case PROPERTIES_OK: {
                this.readyForFileTransfer = true;
                break;
            }
            default: {
                logger.error("{} received unexpected response {} from {} during handshake", new Object[]{this, this.handshakeResponse, peer});
                peer.close();
                throw new HandshakeException("Received unexpected response " + this.handshakeResponse);
            }
        }
        logger.debug("{} Finished handshake with {}", this, peer);
        this.handshakeComplete = true;
    }

    /**
     * @return true if the handshake ended with PROPERTIES_OK and {@link #shutdown(Peer)}
     *         has not been called since
     */
    public boolean isReadyForFileTransfer() {
        return this.readyForFileTransfer;
    }

    /**
     * @return true if the handshake response code was PORT_NOT_IN_VALID_STATE
     * @throws IllegalStateException if the handshake has not completed
     */
    public boolean isPortInvalid() {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not completed successfully");
        }
        return this.handshakeResponse.getCode() == ResponseCode.PORT_NOT_IN_VALID_STATE;
    }

    /**
     * @return true if the handshake response code was UNKNOWN_PORT
     * @throws IllegalStateException if the handshake has not completed
     */
    public boolean isPortUnknown() {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not completed successfully");
        }
        return this.handshakeResponse.getCode() == ResponseCode.UNKNOWN_PORT;
    }

    /**
     * @return true if the handshake response code was PORTS_DESTINATION_FULL
     * @throws IllegalStateException if the handshake has not completed
     */
    public boolean isDestinationFull() {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not completed successfully");
        }
        return this.handshakeResponse.getCode() == ResponseCode.PORTS_DESTINATION_FULL;
    }

    /**
     * Requests the peer list from the given peer and reads it from the wire. Each entry
     * is read as hostname (UTF), port (int), secure flag (boolean) and FlowFile count (int).
     *
     * @param peer the peer to query
     * @return the peer statuses reported by the peer
     * @throws IOException if communication fails or the peer reports a negative peer count
     * @throws IllegalStateException if the handshake has not been performed
     */
    public Set<PeerStatus> getPeerStatuses(Peer peer) throws IOException {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not been performed");
        }
        logger.debug("{} Get Peer Statuses from {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        RequestType.REQUEST_PEER_LIST.writeRequestType(dos);
        dos.flush();

        int numPeers = dis.readInt();
        if (numPeers < 0) {
            // The count comes off the wire; a negative value would otherwise surface as an
            // IllegalArgumentException from the HashSet constructor.
            throw new IOException(this + " Received invalid number of peers (" + numPeers + ") from " + peer);
        }
        Set<PeerStatus> peers = new HashSet<PeerStatus>(numPeers);
        for (int i = 0; i < numPeers; ++i) {
            String hostname = dis.readUTF();
            int peerPort = dis.readInt();
            boolean secure = dis.readBoolean();
            int flowFileCount = dis.readInt();
            peers.add(new PeerStatus(hostname, peerPort, secure, flowFileCount));
        }
        logger.debug("{} Received {} Peer Statuses from {}", new Object[]{this, peers.size(), peer});
        return peers;
    }

    /**
     * Sends a codec negotiation request and negotiates a {@link FlowFileCodec} with the
     * peer, starting from a {@link StandardFlowFileCodec}.
     *
     * @param peer the peer to negotiate with
     * @return the codec resulting from the negotiation
     * @throws IOException if communication with the peer fails
     * @throws ProtocolException if the negotiation fails with a {@link HandshakeException}
     * @throws IllegalStateException if the handshake has not been performed
     */
    public FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not been performed");
        }
        logger.debug("{} Negotiating Codec with {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        RequestType.NEGOTIATE_FLOWFILE_CODEC.writeRequestType(dos);

        StandardFlowFileCodec requestedCodec = new StandardFlowFileCodec();
        FlowFileCodec codec;
        try {
            codec = (FlowFileCodec)RemoteResourceFactory.initiateResourceNegotiation((VersionedRemoteResource)requestedCodec, dis, dos);
        }
        catch (HandshakeException e) {
            throw new ProtocolException(e.toString());
        }
        logger.debug("{} negotiated FlowFileCodec {} with {}", new Object[]{this, codec, commsSession});
        return codec;
    }

    /**
     * Pulls FlowFiles from the peer in a single transaction.
     *
     * <p>Each FlowFile is decoded through a CRC32-checked (and, if compression is enabled,
     * decompressing) stream, given a new UUID, reported as a provenance RECEIVE event and
     * transferred to {@link Relationship#ANONYMOUS}. After the peer signals
     * FINISH_TRANSACTION, the accumulated checksum is sent back with CONFIRM_TRANSACTION.
     * The session is committed only if the peer confirms; on BAD_CHECKSUM it is rolled back.
     *
     * @param peer the peer to receive from
     * @param context used to decide whether to report the destination as full after commit
     * @param session the session into which FlowFiles are received
     * @param codec the codec used to decode FlowFiles
     * @throws IOException if communication fails or the peer reports a bad checksum
     * @throws ProtocolException if the peer sends an unexpected response
     * @throws IllegalStateException if the handshake was not performed or did not result
     *         in readiness for file transfer
     */
    public void receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not been performed");
        }
        if (!this.readyForFileTransfer) {
            throw new IllegalStateException("Cannot receive files; handshake resolution was " + this.handshakeResponse);
        }
        logger.debug("{} Receiving FlowFiles from {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        String userDn = commsSession.getUserDn();
        if (userDn == null) {
            userDn = "none";
        }

        RequestType.RECEIVE_FLOWFILES.writeRequestType(dos);
        dos.flush();
        Response dataAvailableCode = Response.read(dis);
        switch (dataAvailableCode.getCode()) {
            case MORE_DATA: {
                logger.debug("{} {} Indicates that data is available", this, peer);
                break;
            }
            case NO_MORE_DATA: {
                logger.debug("{} No data available from {}", this, peer);
                return;
            }
            default: {
                throw new ProtocolException("Got unexpected response when asking for data: " + dataAvailableCode);
            }
        }

        StopWatch stopWatch = new StopWatch(true);
        Set<FlowFile> flowFilesReceived = new HashSet<FlowFile>();
        long bytesReceived = 0L;
        // A single CRC32 is shared by every per-FlowFile checked stream so the checksum
        // covers the whole transaction.
        CRC32 crc = new CRC32();
        boolean continueTransaction = true;
        String calculatedCRC = "";
        while (continueTransaction) {
            InputStream flowFileInputStream = this.useCompression ? new CompressionInputStream((InputStream)dis) : dis;
            CheckedInputStream checkedIn = new CheckedInputStream(flowFileInputStream, crc);

            long startNanos = System.nanoTime();
            FlowFile flowFile = codec.decode((InputStream)checkedIn, session);
            long transmissionNanos = System.nanoTime() - startNanos;
            long transmissionMillis = TimeUnit.MILLISECONDS.convert(transmissionNanos, TimeUnit.NANOSECONDS);

            // Remember the sender's UUID for provenance, then assign a fresh local one.
            String sourceFlowFileIdentifier = flowFile.getAttribute(CoreAttributes.UUID.key());
            flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
            String transitUri = this.transitUriPrefix == null ? peer.getUrl() : this.transitUriPrefix + sourceFlowFileIdentifier;
            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceFlowFileIdentifier, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transmissionMillis);
            session.transfer(flowFile, Relationship.ANONYMOUS);
            bytesReceived += flowFile.getSize();
            flowFilesReceived.add(flowFile);
            logger.debug("{} Received {} from {}", new Object[]{this, flowFile, peer});

            Response transactionCode = Response.read(dis);
            switch (transactionCode.getCode()) {
                case CONTINUE_TRANSACTION: {
                    logger.trace("{} Received ContinueTransaction indicator from {}", this, peer);
                    break;
                }
                case FINISH_TRANSACTION: {
                    logger.trace("{} Received FinishTransaction indicator from {}", this, peer);
                    continueTransaction = false;
                    calculatedCRC = String.valueOf(checkedIn.getChecksum().getValue());
                    break;
                }
                default: {
                    throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received " + transactionCode);
                }
            }
        }

        logger.trace("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
        Response confirmTransactionResponse = Response.read(dis);
        logger.trace("{} Received {} from {}", new Object[]{this, confirmTransactionResponse, peer});
        switch (confirmTransactionResponse.getCode()) {
            case CONFIRM_TRANSACTION: {
                break;
            }
            case BAD_CHECKSUM: {
                session.rollback();
                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
            }
            default: {
                throw new ProtocolException(this + " Received unexpected Response from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
            }
        }

        session.commit();
        if (context.getAvailableRelationships().isEmpty()) {
            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
            ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
        } else {
            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
        }

        stopWatch.stop();
        String flowFileDescription = describeFlowFiles(flowFilesReceived);
        String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
        long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
        String dataSize = FormatUtils.formatDataSize((double)bytesReceived);
        logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
    }

    /**
     * Pushes FlowFiles from the session to the peer in a single transaction.
     *
     * <p>Returns immediately if the session has no FlowFile. Otherwise FlowFiles are
     * encoded through a CRC32-checked (and, if compression is enabled, compressing)
     * stream, reported as provenance SEND events and removed from the session. Further
     * FlowFiles are pulled into the same transaction until the session is empty or
     * {@link #BATCH_SEND_NANOS} has elapsed. For protocol versions above 3 the checksum
     * returned by the peer is compared with the local one; a mismatch (including a missing
     * checksum) sends BAD_CHECKSUM and rolls the session back. The session is committed
     * once the peer reports the transaction as finished.
     *
     * @param peer the peer to send to
     * @param context the process context (not used by this implementation)
     * @param session the session from which FlowFiles are taken
     * @param codec the codec used to encode FlowFiles
     * @throws IOException if communication fails or the checksums do not match
     * @throws ProtocolException if the peer sends an unexpected response
     * @throws IllegalStateException if the handshake was not performed or did not result
     *         in readiness for file transfer
     */
    public void transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException {
        if (!this.handshakeComplete) {
            throw new IllegalStateException("Handshake has not been performed");
        }
        if (!this.readyForFileTransfer) {
            throw new IllegalStateException("Cannot transfer files; handshake resolution was " + this.handshakeResponse);
        }
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        logger.debug("{} Sending FlowFiles to {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        String userDn = commsSession.getUserDn();
        if (userDn == null) {
            userDn = "none";
        }

        RequestType.SEND_FLOWFILES.writeRequestType(dos);
        dos.flush();

        StopWatch stopWatch = new StopWatch(true);
        // Shared across all per-FlowFile checked streams so the checksum spans the transaction.
        CRC32 crc = new CRC32();
        long bytesSent = 0L;
        Set<FlowFile> flowFilesSent = new HashSet<FlowFile>();
        boolean continueTransaction = true;
        String calculatedCRC = "";
        long startSendingNanos = System.nanoTime();
        while (continueTransaction) {
            OutputStream flowFileOutputStream = this.useCompression ? new CompressionOutputStream((OutputStream)dos) : dos;
            logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
            CheckedOutputStream checkedOutStream = new CheckedOutputStream(flowFileOutputStream, crc);

            long startNanos = System.nanoTime();
            flowFile = codec.encode(flowFile, session, (OutputStream)checkedOutStream);
            long transferNanos = System.nanoTime() - startNanos;
            long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);
            if (this.useCompression) {
                // Only the per-FlowFile compression wrapper is closed here; the plain
                // 'dos' stream is never closed by this loop.
                checkedOutStream.close();
            }

            flowFilesSent.add(flowFile);
            bytesSent += flowFile.getSize();
            logger.debug("{} Sent {} to {}", new Object[]{this, flowFile, peer});
            String transitUri = this.transitUriPrefix == null ? peer.getUrl() : this.transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
            session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + userDn, transferMillis, false);
            session.remove(flowFile);

            // Keep batching only while within the time budget and more FlowFiles are queued.
            long sendingNanos = System.nanoTime() - startSendingNanos;
            flowFile = sendingNanos < BATCH_SEND_NANOS ? session.get() : null;
            continueTransaction = flowFile != null;
            if (continueTransaction) {
                logger.debug("{} Sent CONTINUE_TRANSACTION indicator to {}", this, peer);
                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
            } else {
                logger.debug("{} Sent FINISH_TRANSACTION indicator to {}", this, peer);
                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
                calculatedCRC = String.valueOf(checkedOutStream.getChecksum().getValue());
            }
        }

        Response transactionConfirmationResponse = Response.read(dis);
        if (transactionConfirmationResponse.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
            throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
        }
        logger.trace("{} Received {} from {}", new Object[]{this, transactionConfirmationResponse, peer});
        String receivedCRC = transactionConfirmationResponse.getMessage();
        // Compare from the locally computed (never-null) side so that a response without a
        // checksum is handled as a mismatch instead of raising a NullPointerException.
        if (this.versionNegotiator.getVersion() > 3 && !calculatedCRC.equals(receivedCRC)) {
            ResponseCode.BAD_CHECKSUM.writeResponse(dos);
            session.rollback();
            throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
        }

        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");
        String flowFileDescription = describeFlowFiles(flowFilesSent);
        Response transactionResponse;
        try {
            transactionResponse = Response.read(dis);
        }
        catch (IOException e) {
            logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data. Therefore, {} will be rolled back, possibly resulting in data duplication of {}", new Object[]{this, peer, session, flowFileDescription});
            session.rollback();
            throw e;
        }
        logger.debug("{} Received {} from {}", new Object[]{this, transactionResponse, peer});
        if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
            peer.penalize(this.port.getYieldPeriod(TimeUnit.MILLISECONDS));
        } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
            throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
        }

        stopWatch.stop();
        String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
        long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
        String dataSize = FormatUtils.formatDataSize((double)bytesSent);
        session.commit();
        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
    }

    /**
     * @return the version negotiator used by this protocol (offers versions 4, 3, 2 and 1)
     */
    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    /**
     * Marks this protocol as no longer ready for file transfer and sends a SHUTDOWN
     * request to the peer.
     *
     * @param peer the peer to notify
     * @throws IOException if the request cannot be written
     */
    public void shutdown(Peer peer) throws IOException {
        this.readyForFileTransfer = false;
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        logger.debug("{} Shutting down with {}", this, peer);
        RequestType.SHUTDOWN.writeRequestType(dos);
        dos.flush();
    }

    /**
     * @return the resource name of this protocol, {@code "SocketFlowFileProtocol"}
     */
    public String getResourceName() {
        return "SocketFlowFileProtocol";
    }

    /**
     * @return a description containing the communications identifier generated during
     *         handshake (null before the handshake)
     */
    @Override
    public String toString() {
        return "SocketClientProtocol[CommsID=" + this.commsIdentifier + "]";
    }

    /**
     * Builds the FlowFile description used in log messages: the full set for fewer than
     * 20 FlowFiles, otherwise just the count.
     */
    private static String describeFlowFiles(Set<FlowFile> flowFiles) {
        return flowFiles.size() < 20 ? flowFiles.toString() : flowFiles.size() + " FlowFiles";
    }
}

