/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote.protocol.socket;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.zip.CRC32;
import java.util.zip.CheckedInputStream;
import java.util.zip.CheckedOutputStream;
import org.apache.nifi.cluster.NodeInformant;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RemoteResourceFactory;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.StandardVersionNegotiator;
import org.apache.nifi.remote.VersionNegotiator;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.HandshakeException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.io.CompressionInputStream;
import org.apache.nifi.remote.io.CompressionOutputStream;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.RequestType;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.remote.protocol.socket.HandshakeProperty;
import org.apache.nifi.remote.protocol.socket.Response;
import org.apache.nifi.remote.protocol.socket.ResponseCode;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server side of the socket-based FlowFile protocol identified by {@link #RESOURCE_NAME}.
 *
 * <p>An instance is driven through a fixed life cycle: {@link #handshake(Peer)} must complete
 * before any of {@link #negotiateCodec(Peer)}, {@link #getRequestType(Peer)},
 * {@link #transferFlowFiles(Peer, ProcessContext, ProcessSession, FlowFileCodec)},
 * {@link #receiveFlowFiles(Peer, ProcessContext, ProcessSession, FlowFileCodec)} or
 * {@link #sendPeerList(Peer)} may be called, and none of them may be called once
 * {@link #shutdown(Peer)} has been invoked.
 *
 * <p>The instance keeps per-connection state (negotiated properties, selected port, codec) in
 * plain fields; no synchronization is performed by this class.
 */
public class SocketFlowFileServerProtocol implements ServerProtocol {
    public static final String RESOURCE_NAME = "SocketFlowFileProtocol";

    /** Maximum time a single outbound transaction keeps pulling additional FlowFiles. */
    private static final long BATCH_NANOS = TimeUnit.SECONDS.toNanos(5L);

    private static final Logger logger = LoggerFactory.getLogger(SocketFlowFileServerProtocol.class);

    private ProcessGroup rootGroup;
    private String commsIdentifier;
    private boolean handshakeCompleted;
    /** {@code null} until the peer supplies the GZIP handshake property. */
    private Boolean useGzip;
    private long requestExpirationMillis;
    private RootGroupPort port;
    private boolean shutdown = false;
    private FlowFileCodec negotiatedFlowFileCodec = null;
    /** Always ends with "/" when set; stays {@code null} for protocol versions below 3. */
    private String transitUriPrefix = null;
    private final VersionNegotiator versionNegotiator = new StandardVersionNegotiator(new int[]{4, 3, 2, 1});

    /**
     * Sets the process group in which port identifiers received during the handshake are looked up.
     *
     * @param group the root process group
     * @throws IllegalArgumentException if {@code group} is not a root group
     */
    public void setRootProcessGroup(ProcessGroup group) {
        if (!group.isRootGroup()) {
            throw new IllegalArgumentException("Specified Process Group is not the root group");
        }
        this.rootGroup = group;
    }

    /**
     * Reads the peer's communications identifier, (for protocol version 3 and above) the transit
     * URI prefix, and the handshake properties, then answers with a single response code.
     *
     * <p>Rejections that relate to the state of the requested port (unauthorized, invalid, not
     * running, destination full) are written to the peer but do not raise an exception; the
     * handshake is still marked as completed in that case.
     *
     * @param peer the peer to handshake with
     * @throws IOException if reading from or writing to the peer fails
     * @throws HandshakeException if a property name is unknown, the request expiration is not a
     *         number, the port is unknown or not a {@link RootGroupPort}, or a mandatory property
     *         (GZIP, PORT_IDENTIFIER) is missing
     * @throws IllegalStateException if the handshake was already completed or the protocol is shut down
     */
    public void handshake(Peer peer) throws IOException, HandshakeException {
        if (this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has already been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
        logger.debug("{} Handshaking with {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());

        this.commsIdentifier = dis.readUTF();
        if (this.versionNegotiator.getVersion() >= 3) {
            String prefix = dis.readUTF();
            this.transitUriPrefix = prefix.endsWith("/") ? prefix : prefix + "/";
        }

        // Read every property before acting on any of them.
        Map<String, String> properties = new HashMap<String, String>();
        int numProperties = dis.readInt();
        for (int i = 0; i < numProperties; ++i) {
            String propertyName = dis.readUTF();
            String propertyValue = dis.readUTF();
            properties.put(propertyName, propertyValue);
        }

        boolean responseWritten = false;
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            String propertyName = entry.getKey();
            String value = entry.getValue();
            HandshakeProperty property;
            try {
                property = HandshakeProperty.valueOf(propertyName);
            } catch (IllegalArgumentException e) {
                ResponseCode.UNKNOWN_PROPERTY_NAME.writeResponse(dos, "Unknown Property Name: " + propertyName);
                throw new HandshakeException("Received unknown property: " + propertyName);
            }
            switch (property) {
                case GZIP:
                    this.useGzip = Boolean.parseBoolean(value);
                    break;
                case REQUEST_EXPIRATION_MILLIS:
                    this.requestExpirationMillis = parseRequestExpiration(value);
                    break;
                case PORT_IDENTIFIER:
                    if (selectPort(value, peer, dos)) {
                        responseWritten = true;
                    }
                    break;
                default:
                    break;
            }
        }

        if (this.useGzip == null) {
            logger.debug("Responding with ResponseCode MISSING_PROPERTY because GZIP Property missing");
            ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.GZIP.name());
            throw new HandshakeException("Missing Property " + HandshakeProperty.GZIP.name());
        }
        if (this.port == null) {
            logger.debug("Responding with ResponseCode MISSING_PROPERTY because Port Identifier property is missing");
            ResponseCode.MISSING_PROPERTY.writeResponse(dos, HandshakeProperty.PORT_IDENTIFIER.name());
            throw new HandshakeException("Missing Property " + HandshakeProperty.PORT_IDENTIFIER.name());
        }
        if (!responseWritten) {
            ResponseCode.PROPERTIES_OK.writeResponse(dos);
        }
        logger.debug("{} Finished handshake with {}", this, peer);
        this.handshakeCompleted = true;
    }

    /**
     * Parses the peer-supplied request expiration, turning a malformed number into the checked
     * handshake failure instead of letting an unchecked exception escape.
     */
    private static long parseRequestExpiration(String value) throws HandshakeException {
        try {
            return Long.parseLong(value);
        } catch (NumberFormatException e) {
            // The offending value is included in the message; only the String constructor of
            // HandshakeException is known to exist, so the cause is not chained.
            throw new HandshakeException("Received invalid value for property "
                    + HandshakeProperty.REQUEST_EXPIRATION_MILLIS.name() + ": " + value);
        }
    }

    /**
     * Resolves the port named by the peer, stores it in {@link #port} and checks that it may be used.
     *
     * @return {@code true} if a rejection response was written to {@code dos}; {@code false} if
     *         the port is usable and no response has been written yet
     * @throws HandshakeException if the identifier is unknown or does not name a {@link RootGroupPort}
     */
    private boolean selectPort(String portIdentifier, Peer peer, DataOutputStream dos) throws IOException, HandshakeException {
        Port receivedPort = this.rootGroup.getInputPort(portIdentifier);
        if (receivedPort == null) {
            receivedPort = this.rootGroup.getOutputPort(portIdentifier);
        }
        if (receivedPort == null) {
            logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portIdentifier);
            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
            throw new HandshakeException("Received unknown port identifier: " + portIdentifier);
        }
        if (!(receivedPort instanceof RootGroupPort)) {
            logger.debug("Responding with ResponseCode UNKNOWN_PORT for identifier {}", portIdentifier);
            ResponseCode.UNKNOWN_PORT.writeResponse(dos);
            throw new HandshakeException("Received port identifier " + portIdentifier + ", but this Port is not a RootGroupPort");
        }
        this.port = (RootGroupPort) receivedPort;

        PortAuthorizationResult portAuthResult = this.port.checkUserAuthorization(peer.getCommunicationsSession().getUserDn());
        if (!portAuthResult.isAuthorized()) {
            logger.debug("Responding with ResponseCode UNAUTHORIZED: {}", portAuthResult.getExplanation());
            ResponseCode.UNAUTHORIZED.writeResponse(dos, portAuthResult.getExplanation());
            return true;
        }
        if (!receivedPort.isValid()) {
            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
            ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port is not valid");
            return true;
        }
        if (!receivedPort.isRunning()) {
            logger.debug("Responding with ResponseCode PORT_NOT_IN_VALID_STATE for {}", receivedPort);
            ResponseCode.PORT_NOT_IN_VALID_STATE.writeResponse(dos, "Port not running");
            return true;
        }
        // The destination-full response is only sent for protocol versions above 1.
        if (this.versionNegotiator.getVersion() <= 1) {
            return false;
        }
        for (Connection connection : this.port.getConnections()) {
            if (connection.getFlowFileQueue().isFull()) {
                logger.debug("Responding with ResponseCode PORTS_DESTINATION_FULL for {}", receivedPort);
                ResponseCode.PORTS_DESTINATION_FULL.writeResponse(dos);
                return true;
            }
        }
        return false;
    }

    /** Fails fast unless the handshake has completed and the protocol has not been shut down. */
    private void verifyReadyForCommunication() {
        if (!this.handshakeCompleted) {
            throw new IllegalStateException("Handshake has not been completed");
        }
        if (this.shutdown) {
            throw new IllegalStateException("Protocol is shutdown");
        }
    }

    /** @return {@code true} once {@link #handshake(Peer)} has run to completion */
    public boolean isHandshakeSuccessful() {
        return this.handshakeCompleted;
    }

    /** @return the port selected during the handshake, or {@code null} if none has been selected */
    public RootGroupPort getPort() {
        return this.port;
    }

    /**
     * Negotiates the FlowFile codec with the peer and remembers the result.
     *
     * @param peer the peer to negotiate with
     * @return the negotiated codec
     * @throws IOException if communication with the peer fails
     * @throws ProtocolException if the underlying negotiation reports a {@link HandshakeException}
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     */
    public FlowFileCodec negotiateCodec(Peer peer) throws IOException, ProtocolException {
        verifyReadyForCommunication();
        logger.debug("{} Negotiating Codec with {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        try {
            this.negotiatedFlowFileCodec = RemoteResourceFactory.receiveCodecNegotiation(dis, dos);
            logger.debug("{} Negotiated Codec {} with {}", new Object[]{this, this.negotiatedFlowFileCodec, peer});
            return this.negotiatedFlowFileCodec;
        } catch (HandshakeException e) {
            // NOTE(review): the cause is flattened into the message; chain it if ProtocolException
            // offers a (String, Throwable) constructor — not visible from this file.
            throw new ProtocolException(e.toString());
        }
    }

    /** @return the codec from the last successful {@link #negotiateCodec(Peer)}, or {@code null} */
    public FlowFileCodec getPreNegotiatedCodec() {
        return this.negotiatedFlowFileCodec;
    }

    /**
     * Sends FlowFiles pulled from {@code session} to the peer as one transaction.
     *
     * <p>FlowFiles keep being added to the transaction until the session has no more data or
     * {@link #BATCH_NANOS} have elapsed. The transaction is then confirmed with the peer (with a
     * CRC32 comparison for protocol versions above 3) and the session is committed once the peer
     * reports the transaction as finished.
     *
     * @param peer the peer to send to
     * @param context not used by this method
     * @param session the session that supplies the FlowFiles
     * @param codec the codec used to encode each FlowFile
     * @return the number of FlowFiles sent; 0 if the session had no data
     * @throws IOException if communication fails or the checksums do not match (session is rolled back)
     * @throws ProtocolException if the peer answers with an unexpected response code
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     */
    public int transferFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException {
        verifyReadyForCommunication();
        logger.debug("{} Sending FlowFiles to {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        String remoteDn = commsSession.getUserDn();
        if (remoteDn == null) {
            remoteDn = "none";
        }

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            logger.debug("{} No data to send to {}", this, peer);
            ResponseCode.NO_MORE_DATA.writeResponse(dos);
            return 0;
        }
        logger.debug("{} Data is available to send to {}", this, peer);
        ResponseCode.MORE_DATA.writeResponse(dos);

        StopWatch stopWatch = new StopWatch(true);
        long bytesSent = 0L;
        HashSet<FlowFile> flowFilesSent = new HashSet<FlowFile>();
        // One CRC instance is shared by all FlowFiles so the checksum covers the whole transaction.
        CRC32 crc = new CRC32();
        long startNanos = System.nanoTime();
        String calculatedCRC = "";
        boolean continueTransaction = true;
        while (continueTransaction) {
            OutputStream flowFileOutputStream = this.useGzip ? new CompressionOutputStream(dos) : dos;
            logger.debug("{} Sending {} to {}", new Object[]{this, flowFile, peer});
            CheckedOutputStream checkedOutputStream = new CheckedOutputStream(flowFileOutputStream, crc);
            StopWatch transferWatch = new StopWatch(true);
            flowFile = codec.encode(flowFile, session, checkedOutputStream);
            long transmissionMillis = transferWatch.getElapsed(TimeUnit.MILLISECONDS);
            if (this.useGzip) {
                // Only the per-FlowFile compression wrapper is closed; presumably this finishes
                // the compressed frame without closing dos — confirm in CompressionOutputStream.
                checkedOutputStream.close();
            }
            flowFilesSent.add(flowFile);
            bytesSent += flowFile.getSize();

            String transitUri = this.transitUriPrefix == null
                    ? peer.getUrl()
                    : this.transitUriPrefix + flowFile.getAttribute(CoreAttributes.UUID.key());
            session.getProvenanceReporter().send(flowFile, transitUri, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transmissionMillis, false);
            session.remove(flowFile);

            // Stop pulling more FlowFiles once the batch window has elapsed.
            long sendingNanos = System.nanoTime() - startNanos;
            flowFile = sendingNanos < BATCH_NANOS ? session.get() : null;
            continueTransaction = flowFile != null;
            if (continueTransaction) {
                logger.debug("{} Sending ContinueTransaction indicator to {}", this, peer);
                ResponseCode.CONTINUE_TRANSACTION.writeResponse(dos);
            } else {
                logger.debug("{} Sending FinishTransaction indicator to {}", this, peer);
                ResponseCode.FINISH_TRANSACTION.writeResponse(dos);
                calculatedCRC = String.valueOf(checkedOutputStream.getChecksum().getValue());
            }
        }

        Response transactionConfirmationResponse = Response.read(dis);
        if (transactionConfirmationResponse.getCode() != ResponseCode.CONFIRM_TRANSACTION) {
            throw new ProtocolException("Expected to receive 'Confirm Transaction' response from peer " + peer + " but received " + transactionConfirmationResponse);
        }
        logger.debug("{} Received {}  from {}", new Object[]{this, transactionConfirmationResponse, peer});
        String receivedCRC = transactionConfirmationResponse.getMessage();
        // calculatedCRC is never null, so comparing from its side tolerates a response without a message.
        if (this.versionNegotiator.getVersion() > 3 && !calculatedCRC.equals(receivedCRC)) {
            ResponseCode.BAD_CHECKSUM.writeResponse(dos);
            session.rollback();
            throw new IOException(this + " Sent data to peer " + peer + " but calculated CRC32 Checksum as " + calculatedCRC + " while peer calculated CRC32 Checksum as " + receivedCRC + "; canceling transaction and rolling back session");
        }
        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, "");

        String flowFileDescription = flowFilesSent.size() < 20 ? flowFilesSent.toString() : flowFilesSent.size() + " FlowFiles";
        Response transactionResponse;
        try {
            transactionResponse = Response.read(dis);
        } catch (IOException e) {
            logger.error("{} Failed to receive a response from {} when expecting a TransactionFinished Indicator. It is unknown whether or not the peer successfully received/processed the data. Therefore, {} will be rolled back, possibly resulting in data duplication of {}", new Object[]{this, peer, session, flowFileDescription});
            session.rollback();
            throw e;
        }
        logger.debug("{} received {} from {}", new Object[]{this, transactionResponse, peer});
        if (transactionResponse.getCode() == ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL) {
            peer.penalize(this.port.getYieldPeriod(TimeUnit.MILLISECONDS));
        } else if (transactionResponse.getCode() != ResponseCode.TRANSACTION_FINISHED) {
            throw new ProtocolException("After sending data, expected TRANSACTION_FINISHED response but got " + transactionResponse);
        }
        session.commit();

        stopWatch.stop();
        String uploadDataRate = stopWatch.calculateDataRate(bytesSent);
        long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
        String dataSize = FormatUtils.formatDataSize((double) bytesSent);
        logger.info("{} Successfully sent {} ({}) to {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
        return flowFilesSent.size();
    }

    /**
     * Receives one transaction of FlowFiles from the peer into {@code session}.
     *
     * <p>Each decoded FlowFile is given a fresh UUID, reported as a provenance receive event and
     * transferred to {@link Relationship#ANONYMOUS}. After the peer signals the end of the
     * transaction the CRC32 computed over the received data is sent back for confirmation, and
     * the session is committed once the peer confirms.
     *
     * @param peer the peer to receive from
     * @param context used to determine whether any relationship is still available after the commit
     * @param session the session that receives the FlowFiles
     * @param codec the codec used to decode each FlowFile
     * @return the number of FlowFiles received
     * @throws IOException if communication fails or the peer reports a bad checksum (session is rolled back)
     * @throws ProtocolException if the peer answers with an unexpected response code
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     */
    public int receiveFlowFiles(Peer peer, ProcessContext context, ProcessSession session, FlowFileCodec codec) throws IOException, ProtocolException {
        verifyReadyForCommunication();
        logger.debug("{} receiving FlowFiles from {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataInputStream dis = new DataInputStream(commsSession.getInput().getInputStream());
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        String remoteDn = commsSession.getUserDn();
        if (remoteDn == null) {
            remoteDn = "none";
        }

        StopWatch stopWatch = new StopWatch(true);
        // One CRC instance is shared by all FlowFiles so the checksum covers the whole transaction.
        CRC32 crc = new CRC32();
        HashSet<FlowFile> flowFilesReceived = new HashSet<FlowFile>();
        long bytesReceived = 0L;
        String calculatedCRC = "";
        boolean continueTransaction = true;
        while (continueTransaction) {
            long startNanos = System.nanoTime();
            InputStream flowFileInputStream = this.useGzip ? new CompressionInputStream(dis) : dis;
            CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc);
            FlowFile flowFile = codec.decode(checkedInputStream, session);
            long transferNanos = System.nanoTime() - startNanos;
            long transferMillis = TimeUnit.MILLISECONDS.convert(transferNanos, TimeUnit.NANOSECONDS);

            // Keep the sender's UUID for provenance, but give the local FlowFile a new identity.
            String sourceSystemFlowFileUuid = flowFile.getAttribute(CoreAttributes.UUID.key());
            flowFile = session.putAttribute(flowFile, CoreAttributes.UUID.key(), UUID.randomUUID().toString());
            String transitUri = this.transitUriPrefix == null ? peer.getUrl() : this.transitUriPrefix + sourceSystemFlowFileUuid;
            session.getProvenanceReporter().receive(flowFile, transitUri, "urn:nifi:" + sourceSystemFlowFileUuid, "Remote Host=" + peer.getHost() + ", Remote DN=" + remoteDn, transferMillis);
            session.transfer(flowFile, Relationship.ANONYMOUS);
            flowFilesReceived.add(flowFile);
            bytesReceived += flowFile.getSize();

            Response transactionResponse = Response.read(dis);
            switch (transactionResponse.getCode()) {
                case CONTINUE_TRANSACTION:
                    logger.debug("{} Received ContinueTransaction indicator from {}", this, peer);
                    break;
                case FINISH_TRANSACTION:
                    logger.debug("{} Received FinishTransaction indicator from {}", this, peer);
                    continueTransaction = false;
                    calculatedCRC = String.valueOf(checkedInputStream.getChecksum().getValue());
                    break;
                default:
                    throw new ProtocolException("Received unexpected response from peer: when expecting Continue Transaction or Finish Transaction, received " + transactionResponse);
            }
        }

        logger.debug("{} Sending CONFIRM_TRANSACTION Response Code to {}", this, peer);
        ResponseCode.CONFIRM_TRANSACTION.writeResponse(dos, calculatedCRC);
        Response confirmTransactionResponse = Response.read(dis);
        logger.debug("{} Received {} from {}", new Object[]{this, confirmTransactionResponse, peer});
        switch (confirmTransactionResponse.getCode()) {
            case CONFIRM_TRANSACTION:
                break;
            case BAD_CHECKSUM:
                session.rollback();
                throw new IOException(this + " Received a BadChecksum response from peer " + peer);
            default:
                throw new ProtocolException(this + " Received unexpected Response Code from peer " + peer + " : " + confirmTransactionResponse + "; expected 'Confirm Transaction' Response Code");
        }
        session.commit();

        // With no relationship available, tell the peer the data arrived but the destination is full.
        if (context.getAvailableRelationships().isEmpty()) {
            logger.debug("{} Sending TRANSACTION_FINISHED_BUT_DESTINATION_FULL to {}", this, peer);
            ResponseCode.TRANSACTION_FINISHED_BUT_DESTINATION_FULL.writeResponse(dos);
        } else {
            logger.debug("{} Sending TRANSACTION_FINISHED to {}", this, peer);
            ResponseCode.TRANSACTION_FINISHED.writeResponse(dos);
        }

        stopWatch.stop();
        String flowFileDescription = flowFilesReceived.size() < 20 ? flowFilesReceived.toString() : flowFilesReceived.size() + " FlowFiles";
        String uploadDataRate = stopWatch.calculateDataRate(bytesReceived);
        long uploadMillis = stopWatch.getDuration(TimeUnit.MILLISECONDS);
        String dataSize = FormatUtils.formatDataSize((double) bytesReceived);
        logger.info("{} Successfully received {} ({}) from {} in {} milliseconds at a rate of {}", new Object[]{this, flowFileDescription, dataSize, peer, uploadMillis, uploadDataRate});
        return flowFilesReceived.size();
    }

    /**
     * Reads the next request type from the peer's input stream.
     *
     * @param peer the peer to read from
     * @return the request type sent by the peer
     * @throws IOException if reading from the peer fails
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     */
    public RequestType getRequestType(Peer peer) throws IOException {
        verifyReadyForCommunication();
        logger.debug("{} Reading Request Type from {} using {}", new Object[]{this, peer, peer.getCommunicationsSession()});
        DataInputStream dis = new DataInputStream(peer.getCommunicationsSession().getInput().getInputStream());
        RequestType requestType = RequestType.readRequestType(dis);
        logger.debug("{} Got Request Type {} from {}", new Object[]{this, requestType, peer});
        return requestType;
    }

    /** @return the negotiator for this protocol, which offers versions 4, 3, 2 and 1 */
    public VersionNegotiator getVersionNegotiator() {
        return this.versionNegotiator;
    }

    /**
     * Marks this protocol as shut down; subsequent communication methods throw
     * {@link IllegalStateException}.
     *
     * @param peer the peer the protocol was communicating with (only logged)
     */
    public void shutdown(Peer peer) {
        logger.debug("{} Shutting down with {}", this, peer);
        this.shutdown = true;
    }

    /** @return {@code true} once {@link #shutdown(Peer)} has been called */
    public boolean isShutdown() {
        return this.shutdown;
    }

    /**
     * Writes a peer list containing exactly one entry — this host — to the peer.
     *
     * <p>The entry consists of the local host name, the remote input port and the secure flag
     * from {@link NiFiProperties}, followed by an int value of 0.
     *
     * @param peer the peer to send the list to
     * @throws IOException if the local host name cannot be resolved or writing fails
     * @throws IllegalStateException if the handshake has not completed or the protocol is shut down
     */
    public void sendPeerList(Peer peer) throws IOException {
        verifyReadyForCommunication();
        logger.debug("{} Sending Peer List to {}", this, peer);
        CommunicationsSession commsSession = peer.getCommunicationsSession();
        DataOutputStream dos = new DataOutputStream(commsSession.getOutput().getOutputStream());
        NiFiProperties properties = NiFiProperties.getInstance();
        dos.writeInt(1); // number of peers that follow
        dos.writeUTF(InetAddress.getLocalHost().getHostName());
        dos.writeInt(properties.getRemoteInputPort());
        dos.writeBoolean(properties.isSiteToSiteSecure());
        dos.writeInt(0); // NOTE(review): presumably this node's FlowFile count/load — confirm against the client
        dos.flush();
    }

    /** @return {@link #RESOURCE_NAME} */
    public String getResourceName() {
        return RESOURCE_NAME;
    }

    /**
     * Intentionally a no-op: this implementation does not use a node informant.
     *
     * @param nodeInformant ignored
     */
    public void setNodeInformant(NodeInformant nodeInformant) {
    }

    /** @return the request expiration, in milliseconds, supplied by the peer during the handshake (0 if none) */
    public long getRequestExpiration() {
        return this.requestExpirationMillis;
    }

    @Override
    public String toString() {
        return "SocketFlowFileServerProtocol[CommsID=" + this.commsIdentifier + "]";
    }
}

