/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.remote;

import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.nifi.admin.service.AccountDisabledException;
import org.apache.nifi.admin.service.AccountNotFoundException;
import org.apache.nifi.admin.service.AccountPendingException;
import org.apache.nifi.admin.service.AdministrationException;
import org.apache.nifi.admin.service.UserService;
import org.apache.nifi.authorization.Authority;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.controller.AbstractPort;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.events.BulletinFactory;
import org.apache.nifi.events.EventReporter;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.remote.Peer;
import org.apache.nifi.remote.PortAuthorizationResult;
import org.apache.nifi.remote.RootGroupPort;
import org.apache.nifi.remote.TransferDirection;
import org.apache.nifi.remote.codec.FlowFileCodec;
import org.apache.nifi.remote.exception.BadRequestException;
import org.apache.nifi.remote.exception.NotAuthorizedException;
import org.apache.nifi.remote.exception.ProtocolException;
import org.apache.nifi.remote.exception.RequestExpiredException;
import org.apache.nifi.remote.exception.TransmissionDisabledException;
import org.apache.nifi.remote.protocol.CommunicationsSession;
import org.apache.nifi.remote.protocol.ServerProtocol;
import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.scheduling.SchedulingStrategy;
import org.apache.nifi.user.NiFiUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StandardRootGroupPort
extends AbstractPort
implements RootGroupPort {
    // Category string passed to the event reporter for every bulletin this port raises.
    private static final String CATEGORY = "Site to Site";
    private static final Logger logger = LoggerFactory.getLogger(StandardRootGroupPort.class);
    // Group names consulted by checkUserAuthorization(); the whole set is swapped by setGroupAccessControl().
    private final AtomicReference<Set<String>> groupAccessControl = new AtomicReference(new HashSet());
    // User DNs consulted by checkUserAuthorization(); the whole set is swapped by setUserAccessControl().
    private final AtomicReference<Set<String>> userAccessControl = new AtomicReference(new HashSet());
    // Assigned from the constructor's scheduler argument; queried for the active thread count in isTransmitting().
    private final ProcessScheduler processScheduler;
    // When false, checkUserAuthorization() authorizes every caller without consulting the user service.
    private final boolean secure;
    private final UserService userService;
    private final BulletinRepository bulletinRepository;
    // Turns reported events into bulletins attributed to this port (built in the constructor).
    private final EventReporter eventReporter;
    // Assigned from the same constructor argument as processScheduler; used to register an event when a request is queued.
    private final ProcessScheduler scheduler;
    // Fixed in the constructor: the single port relationship for RECEIVE ports, otherwise empty.
    private final Set<Relationship> relationships;
    // Hand-off queue between remote callers and onTrigger(); bounded at 1000 pending requests.
    private final BlockingQueue<FlowFileRequest> requestQueue = new ArrayBlockingQueue<FlowFileRequest>(1000);
    // Requests currently being serviced; every access in this class happens while holding requestLock.
    private final Set<FlowFileRequest> activeRequests = new HashSet<FlowFileRequest>();
    private final Lock requestLock = new ReentrantLock();
    // Set by shutdown() and cleared by onSchedulingStart(); read and written under requestLock.
    private boolean shutdown = false;

    /**
     * Creates a root-group port that exchanges FlowFiles with remote peers.
     *
     * @param id                 identifier handed to the superclass
     * @param name               name handed to the superclass
     * @param processGroup       owning process group, handed to the superclass
     * @param direction          when {@link TransferDirection#RECEIVE} the port exposes
     *                           {@link AbstractPort#PORT_RELATIONSHIP}; otherwise it exposes no relationships
     * @param type               connectable type handed to the superclass
     * @param userService        service used by {@link #checkUserAuthorization(String)}
     * @param bulletinRepository repository that receives the bulletins raised by this port
     * @param scheduler          scheduler handed to the superclass and kept for event registration
     * @param secure             whether callers must be authorized before being serviced
     */
    public StandardRootGroupPort(String id, String name, ProcessGroup processGroup, TransferDirection direction, ConnectableType type, UserService userService, final BulletinRepository bulletinRepository, ProcessScheduler scheduler, boolean secure) {
        super(id, name, processGroup, type, scheduler);

        this.processScheduler = scheduler;
        this.setScheduldingPeriod("30000 nanos");

        this.userService = userService;
        this.secure = secure;
        this.bulletinRepository = bulletinRepository;
        this.scheduler = scheduler;
        this.setYieldPeriod("100 millis");

        // Every reported event becomes a bulletin attributed to this port and its process group.
        this.eventReporter = new EventReporter() {
            public void reportEvent(Severity severity, String category, String message) {
                final StandardRootGroupPort port = StandardRootGroupPort.this;
                final String groupId = port.getProcessGroup().getIdentifier();
                final String sourceId = port.getIdentifier();
                final String sourceName = port.getName();
                bulletinRepository.addBulletin(
                        BulletinFactory.createBulletin(groupId, sourceId, sourceName, category, severity.name(), message));
            }
        };

        if (direction == TransferDirection.RECEIVE) {
            this.relationships = Collections.singleton(AbstractPort.PORT_RELATIONSHIP);
        } else {
            this.relationships = Collections.<Relationship>emptySet();
        }
    }

    /**
     * Returns the relationship set chosen at construction time.
     *
     * @return the set stored in the {@code relationships} field
     */
    public Collection<Relationship> getRelationships() {
        return relationships;
    }

    /**
     * Always reports {@code true}.
     *
     * NOTE(review): presumably needed because this port's work arrives through its internal
     * request queue rather than through incoming connections - confirm against the scheduler contract.
     *
     * @return {@code true}
     */
    public boolean isTriggerWhenEmpty() {
        return true;
    }

    /**
     * Services at most one queued site-to-site request per invocation.
     *
     * <p>The next request is polled from the request queue; if none is waiting the context is
     * yielded. Otherwise the request is marked as being serviced, recorded in the active-request
     * set (its communications session is interrupted first if the port has been shut down), and
     * handed to the private three-argument {@code onTrigger} with a fresh session.</p>
     *
     * <p>If servicing fails with an exception, the session is rolled back and the failure is
     * published on the request's response queue. Without that, the remote-facing thread blocked
     * on the response queue would never be released.</p>
     *
     * @param context        process context; yielded when there is nothing to service
     * @param sessionFactory factory used to create the session for the serviced request
     */
    public void onTrigger(ProcessContext context, ProcessSessionFactory sessionFactory) {
        final FlowFileRequest flowFileRequest = this.requestQueue.poll();
        if (flowFileRequest == null) {
            context.yield();
            return;
        }

        // Releases the caller's polling loop; from here on it blocks on the response queue.
        flowFileRequest.setServiceBegin();

        this.requestLock.lock();
        try {
            if (this.shutdown) {
                final CommunicationsSession commsSession = flowFileRequest.getPeer().getCommunicationsSession();
                if (commsSession != null) {
                    commsSession.interrupt();
                }
            }
            this.activeRequests.add(flowFileRequest);
        } finally {
            this.requestLock.unlock();
        }

        final ProcessSession session = sessionFactory.createSession();
        try {
            this.onTrigger(context, session, flowFileRequest);
        } catch (TransmissionDisabledException e) {
            this.abortRequest(session, flowFileRequest, e);
        } catch (Exception e) {
            logger.error("{} Failed to process data due to {}", new Object[]{this, e});
            if (logger.isDebugEnabled()) {
                logger.error("", (Throwable) e);
            }
            this.abortRequest(session, flowFileRequest, e);
        } finally {
            this.requestLock.lock();
            try {
                this.activeRequests.remove(flowFileRequest);
            } finally {
                this.requestLock.unlock();
            }
        }
    }

    /**
     * Rolls the session back and reports {@code cause} to whoever is waiting on the request.
     */
    private void abortRequest(ProcessSession session, FlowFileRequest request, Exception cause) {
        try {
            session.rollback();
        } finally {
            // offer() rather than add(): the queue holds one element, and a result that is
            // somehow already present must not turn this into a second failure.
            request.getResponseQueue().offer(new ProcessingResult(cause));
        }
    }

    /**
     * Intentionally empty: this class does its work in the
     * {@code onTrigger(ProcessContext, ProcessSessionFactory)} overload instead.
     *
     * @param context unused
     * @param session unused
     * @throws ProcessException declared by the signature; never thrown by this empty body
     */
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
    }

    /**
     * Services one dequeued request and, on every path that returns normally, adds a single
     * {@link ProcessingResult} to the request's response queue.
     *
     * <p>Checks are made in this order: request expiry, peer authorization, availability of a
     * pre-negotiated codec. Only then are FlowFiles received (input port) or transferred
     * (any other port type). A failure during the exchange rolls the session back and is
     * reported as the result; success commits the session and reports the FlowFile count.</p>
     *
     * @param context         process context passed through to the protocol
     * @param session         session that is committed on success and rolled back on failure
     * @param flowFileRequest the request to service
     */
    private void onTrigger(ProcessContext context, ProcessSession session, FlowFileRequest flowFileRequest) {
        final ServerProtocol serverProtocol = flowFileRequest.getProtocol();
        final BlockingQueue<ProcessingResult> responses = flowFileRequest.getResponseQueue();

        // 1. The caller may already have given up on this request.
        if (flowFileRequest.isExpired()) {
            final String expiredMessage = String.format("%s Cannot service request from %s because the request has timed out", this, flowFileRequest.getPeer());
            logger.warn(expiredMessage);
            this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, expiredMessage);
            responses.add(new ProcessingResult(new RequestExpiredException()));
            return;
        }

        // 2. The peer must be allowed to talk to this port.
        final Peer requester = flowFileRequest.getPeer();
        final CommunicationsSession requesterSession = requester.getCommunicationsSession();
        final String requesterDn = requesterSession.getUserDn();
        logger.debug("{} Servicing request for {} (DN={})", new Object[]{this, requester, requesterDn});

        final PortAuthorizationResult authorization = this.checkUserAuthorization(requesterDn);
        if (!authorization.isAuthorized()) {
            final String deniedMessage = String.format(
                    "%s Cannot service request from %s (DN=%s) because peer is not authorized to communicate with this port: %s",
                    this,
                    flowFileRequest.getPeer(),
                    flowFileRequest.getPeer().getCommunicationsSession().getUserDn(),
                    authorization.getExplanation());
            logger.error(deniedMessage);
            this.eventReporter.reportEvent(Severity.ERROR, CATEGORY, deniedMessage);
            responses.add(new ProcessingResult(new NotAuthorizedException(authorization.getExplanation())));
            return;
        }

        // 3. A codec must have been agreed on during negotiation.
        final FlowFileCodec negotiatedCodec = serverProtocol.getPreNegotiatedCodec();
        if (negotiatedCodec == null) {
            responses.add(new ProcessingResult(new BadRequestException("None of the supported FlowFile Codecs supplied is compatible with this instance")));
            return;
        }

        // 4. Perform the exchange; any failure is handed back to the waiting caller.
        final int flowFileCount;
        try {
            if (this.getConnectableType() == ConnectableType.INPUT_PORT) {
                flowFileCount = this.receiveFlowFiles(context, session, negotiatedCodec, flowFileRequest);
            } else {
                flowFileCount = this.transferFlowFiles(context, session, negotiatedCodec, flowFileRequest);
            }
        } catch (Exception failure) {
            session.rollback();
            responses.add(new ProcessingResult(failure));
            return;
        }

        session.commit();
        responses.add(new ProcessingResult(flowFileCount));
    }

    /**
     * Delegates an outbound exchange to the request's protocol.
     *
     * @return the value returned by the protocol's {@code transferFlowFiles}
     */
    private int transferFlowFiles(ProcessContext context, ProcessSession session, FlowFileCodec codec, FlowFileRequest request) throws IOException, ProtocolException {
        final ServerProtocol serverProtocol = request.getProtocol();
        final Peer destination = request.getPeer();
        return serverProtocol.transferFlowFiles(destination, context, session, codec);
    }

    /**
     * Delegates an inbound exchange to the request's protocol.
     *
     * @return the value returned by the protocol's {@code receiveFlowFiles}
     */
    private int receiveFlowFiles(ProcessContext context, ProcessSession session, FlowFileCodec codec, FlowFileRequest receiveRequest) throws IOException, ProtocolException {
        final ServerProtocol serverProtocol = receiveRequest.getProtocol();
        final Peer origin = receiveRequest.getPeer();
        return serverProtocol.receiveFlowFiles(origin, context, session, codec);
    }

    /**
     * Reports whether this port is valid: an input port needs at least one connection on
     * {@link Relationship#ANONYMOUS}; any other port type is always valid.
     *
     * @return {@code true} when the port is valid
     */
    public boolean isValid() {
        if (this.getConnectableType() != ConnectableType.INPUT_PORT) {
            return true;
        }
        return !this.getConnections(Relationship.ANONYMOUS).isEmpty();
    }

    /**
     * Builds the list of validation problems for this port.
     *
     * @return a new collection that is empty when {@link #isValid()} holds, and otherwise
     *         contains one invalid result describing the missing output connection
     */
    public Collection<ValidationResult> getValidationErrors() {
        final Collection<ValidationResult> problems = new ArrayList<ValidationResult>();
        if (this.isValid()) {
            return problems;
        }

        final String explanation = String.format("Output connection for port '%s' is not defined.", this.getName());
        final String subject = String.format("Port '%s'", this.getName());
        problems.add(new ValidationResult.Builder()
                .explanation(explanation)
                .subject(subject)
                .valid(false)
                .build());
        return problems;
    }

    /**
     * Reports whether this port is currently exchanging data.
     *
     * <p>A port that is not running never transmits. A running port transmits when the
     * scheduler reports an active thread for it, or when requests are queued and at least one
     * request is recorded as active.</p>
     *
     * @return {@code true} when the port is considered to be transmitting
     */
    public boolean isTransmitting() {
        if (this.isRunning()) {
            if (this.processScheduler.getActiveThreadCount(this) > 0) {
                return true;
            }
            if (!this.requestQueue.isEmpty()) {
                this.requestLock.lock();
                try {
                    return !this.activeRequests.isEmpty();
                } finally {
                    this.requestLock.unlock();
                }
            }
        }
        return false;
    }

    /**
     * Replaces the set of groups allowed to use this port with a copy of {@code groups}.
     *
     * @param groups the new allowed groups; must not be {@code null}
     * @throws NullPointerException if {@code groups} is {@code null}
     */
    public void setGroupAccessControl(Set<String> groups) {
        final Set<String> snapshot = new HashSet<String>(Objects.requireNonNull(groups));
        this.groupAccessControl.set(snapshot);
    }

    /**
     * Returns a read-only view of the groups currently allowed to use this port.
     *
     * @return an unmodifiable view of the current group set
     */
    public Set<String> getGroupAccessControl() {
        final Set<String> currentGroups = this.groupAccessControl.get();
        return Collections.unmodifiableSet(currentGroups);
    }

    /**
     * Replaces the set of user DNs allowed to use this port with a copy of {@code users}.
     *
     * @param users the new allowed users; must not be {@code null}
     * @throws NullPointerException if {@code users} is {@code null}
     */
    public void setUserAccessControl(Set<String> users) {
        final Set<String> snapshot = new HashSet<String>(Objects.requireNonNull(users));
        this.userAccessControl.set(snapshot);
    }

    /**
     * Returns a read-only view of the user DNs currently allowed to use this port.
     *
     * @return an unmodifiable view of the current user set
     */
    public Set<String> getUserAccessControl() {
        final Set<String> currentUsers = this.userAccessControl.get();
        return Collections.unmodifiableSet(currentUsers);
    }

    /**
     * Shuts the port down: after the superclass shutdown, the shutdown flag is raised and the
     * communications session of every active request that has one is interrupted, all while
     * holding the request lock.
     */
    public void shutdown() {
        super.shutdown();

        this.requestLock.lock();
        try {
            this.shutdown = true;
            for (final FlowFileRequest active : this.activeRequests) {
                final CommunicationsSession activeSession = active.getPeer().getCommunicationsSession();
                if (activeSession != null) {
                    activeSession.interrupt();
                }
            }
        } finally {
            this.requestLock.unlock();
        }
    }

    /**
     * Runs the superclass hook and then clears the shutdown flag under the request lock, so
     * requests serviced from now on are no longer interrupted on pickup.
     */
    public void onSchedulingStart() {
        super.onSchedulingStart();

        final Lock lock = this.requestLock;
        lock.lock();
        try {
            this.shutdown = false;
        } finally {
            lock.unlock();
        }
    }

    /**
     * Decides whether the user identified by {@code dn} may communicate with this port.
     *
     * <p>When the port is not secure every caller is authorized. Otherwise the checks run in
     * this order, and the first one that fails produces a denial:</p>
     * <ol>
     *   <li>the DN must be known (non-null and accepted by the user service);</li>
     *   <li>the account must not be disabled or pending;</li>
     *   <li>the user must hold {@link Authority#ROLE_NIFI};</li>
     *   <li>the DN must be in the allowed-users set, or the user's group must be in the
     *       allowed-groups set.</li>
     * </ol>
     *
     * <p>Every denial is logged at WARN level and reported through the event reporter. Any
     * exception from the user service is converted into a denial rather than propagated.</p>
     *
     * @param dn distinguished name of the remote user; may be {@code null}
     * @return the authorization decision together with a human-readable explanation
     */
    public PortAuthorizationResult checkUserAuthorization(String dn) {
        if (!this.secure) {
            return new StandardPortAuthorizationResult(true, "Site-to-Site is not Secure");
        }
        if (dn == null) {
            this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the DN is unknown", this, dn));
            return new StandardPortAuthorizationResult(false, "User DN is not known");
        }

        try {
            final NiFiUser user = this.userService.checkAuthorization(dn);

            if (!user.getAuthorities().contains(Authority.ROLE_NIFI)) {
                this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the user does not have Role NiFi", this, dn));
                return new StandardPortAuthorizationResult(false, "User does not contain required Role: NiFi");
            }

            // An explicit per-user grant wins without looking at the group.
            final Set<String> allowedUsers = this.userAccessControl.get();
            if (allowedUsers.contains(dn)) {
                return new StandardPortAuthorizationResult(true, "User is Authorized");
            }

            final String userGroup = user.getUserGroup();
            if (userGroup == null) {
                this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the user does not have a group and is not in the set of Allowed Users for this Port", this, dn));
                return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + this);
            }

            final Set<String> allowedGroups = this.groupAccessControl.get();
            if (!allowedGroups.contains(userGroup)) {
                this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the user is not in the set of Allowed Users, and the user's group is not in the set of Allowed Groups for this Port", this, dn));
                return new StandardPortAuthorizationResult(false, "User is not Authorized to communicate with " + this);
            }

            return new StandardPortAuthorizationResult(true, "User is part of group '" + userGroup + "', which is Authorized to communicate with " + this);
        } catch (AccountNotFoundException anfe) {
            this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the DN is unknown", this, dn));
            return new StandardPortAuthorizationResult(false, "User DN is not known");
        } catch (AccountDisabledException ade) {
            this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the User Status is not 'ACTIVE' but instead is 'DISABLED'", this, dn));
            return new StandardPortAuthorizationResult(false, "User Status is 'DISABLED' rather than 'ACTIVE'");
        } catch (AccountPendingException ape) {
            this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because the User Status is not 'ACTIVE' but instead is 'PENDING'", this, dn));
            return new StandardPortAuthorizationResult(false, "User Status is 'PENDING' rather than 'ACTIVE'");
        } catch (Exception e) {
            // Covers AdministrationException as well, which was handled identically. The
            // format string previously ended in "because " with no placeholder, so the cause
            // was dropped from the log and bulletin; it is now included.
            this.reportAuthorizationFailure(String.format("%s authorization failed for user %s because %s", this, dn, e));
            return new StandardPortAuthorizationResult(false, "Authorization failed because " + e);
        }
    }

    /**
     * Logs an authorization failure at WARN level and raises a matching WARNING bulletin.
     */
    private void reportAuthorizationFailure(String message) {
        logger.warn(message);
        this.eventReporter.reportEvent(Severity.WARNING, CATEGORY, message);
    }

    /**
     * Queues a request to receive FlowFiles from {@code peer} and blocks until it has been
     * serviced by {@code onTrigger}.
     *
     * @param peer           the remote peer sending data
     * @param serverProtocol the protocol that performs the exchange
     * @param requestHeaders not used by this implementation
     * @return the number of FlowFiles reported by the servicing thread
     * @throws IllegalStateException   if this port is not an input port or is not running
     * @throws NotAuthorizedException  if the peer is not authorized for this port
     * @throws BadRequestException     if the request is rejected or a protocol error occurs
     * @throws RequestExpiredException if the request queue is full or the request expired
     *                                 before it could be serviced
     * @throws ProcessException        wrapping any other failure, including a timeout while
     *                                 waiting to be picked up
     */
    public int receiveFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
        if (this.getConnectableType() != ConnectableType.INPUT_PORT) {
            throw new IllegalStateException("Cannot receive FlowFiles because this port is not an Input Port");
        }
        return this.submitAndAwait(peer, serverProtocol);
    }

    /**
     * Queues a request to send FlowFiles to {@code peer} and blocks until it has been serviced
     * by {@code onTrigger}.
     *
     * @param peer           the remote peer receiving data
     * @param serverProtocol the protocol that performs the exchange
     * @param requestHeaders not used by this implementation
     * @return the number of FlowFiles reported by the servicing thread
     * @throws IllegalStateException   if this port is not an output port or is not running
     * @throws NotAuthorizedException  if the peer is not authorized for this port
     * @throws BadRequestException     if the request is rejected or a protocol error occurs
     * @throws RequestExpiredException if the request queue is full or the request expired
     *                                 before it could be serviced
     * @throws ProcessException        wrapping any other failure, including a timeout while
     *                                 waiting to be picked up
     */
    public int transferFlowFiles(Peer peer, ServerProtocol serverProtocol, Map<String, String> requestHeaders) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
        if (this.getConnectableType() != ConnectableType.OUTPUT_PORT) {
            throw new IllegalStateException("Cannot send FlowFiles because this port is not an Output Port");
        }
        return this.submitAndAwait(peer, serverProtocol);
    }

    /**
     * Shared hand-off used by both public entry points: enqueues a request, waits for a
     * servicing thread to pick it up, then waits for and unwraps the result.
     */
    private int submitAndAwait(Peer peer, ServerProtocol serverProtocol) throws NotAuthorizedException, BadRequestException, RequestExpiredException {
        if (!this.isRunning()) {
            throw new IllegalStateException("Port not running");
        }
        try {
            final FlowFileRequest request = new FlowFileRequest(peer, serverProtocol);
            if (!this.requestQueue.offer(request)) {
                // Queue is at capacity; reject immediately rather than block the caller.
                throw new RequestExpiredException();
            }
            this.scheduler.registerEvent((Connectable) this);

            // Phase 1: poll until a servicing thread has picked the request up, or give up.
            while (!request.isBeingServiced()) {
                if (request.isExpired()) {
                    // Take the dead request out of the bounded queue so it neither uses up
                    // capacity nor gets serviced later with nobody waiting for the answer.
                    // If it was dequeued in the meantime, remove() is simply a no-op.
                    this.requestQueue.remove(request);
                    throw new SocketTimeoutException("Read timed out");
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException ignored) {
                    // NOTE(review): the interrupt is deliberately not propagated, matching the
                    // existing behavior; waiting continues until the request is serviced or
                    // expires. Confirm whether callers rely on this.
                }
            }

            // Phase 2: the request is being serviced; wait for its single result.
            final ProcessingResult result = request.getResponseQueue().take();
            final Exception problem = result.getProblem();
            if (problem == null) {
                return result.getFileCount();
            }
            throw problem;
        } catch (BadRequestException | NotAuthorizedException | RequestExpiredException e) {
            throw e;
        } catch (ProtocolException e) {
            throw new BadRequestException(e);
        } catch (Exception e) {
            throw new ProcessException(e);
        }
    }

    /**
     * Returns the scheduling strategy of this port, which is fixed.
     *
     * @return always {@link SchedulingStrategy#TIMER_DRIVEN}
     */
    public SchedulingStrategy getSchedulingStrategy() {
        return SchedulingStrategy.TIMER_DRIVEN;
    }

    /**
     * Reports that this port is not side-effect free.
     *
     * @return always {@code false}
     */
    public boolean isSideEffectFree() {
        return false;
    }

    /**
     * One pending site-to-site exchange: the peer and protocol to use, a one-slot queue on
     * which the outcome is delivered, and a flag telling the waiting caller that servicing
     * has begun.
     */
    private static class FlowFileRequest {
        private final Peer peer;
        private final ServerProtocol protocol;
        // Capacity 1: a request produces a single result.
        private final BlockingQueue<ProcessingResult> queue = new ArrayBlockingQueue<ProcessingResult>(1);
        private final long creationTime = System.currentTimeMillis();
        private final AtomicBoolean beingServiced = new AtomicBoolean(false);

        public FlowFileRequest(Peer peer, ServerProtocol protocol) {
            this.peer = peer;
            this.protocol = protocol;
        }

        /** Marks this request as picked up by a servicing thread. */
        public void setServiceBegin() {
            beingServiced.set(true);
        }

        /** @return {@code true} once {@link #setServiceBegin()} has been called */
        public boolean isBeingServiced() {
            return beingServiced.get();
        }

        /** @return the queue on which the outcome of this request is delivered */
        public BlockingQueue<ProcessingResult> getResponseQueue() {
            return queue;
        }

        public Peer getPeer() {
            return peer;
        }

        public ServerProtocol getProtocol() {
            return protocol;
        }

        /**
         * A request expires once it is older than twice the protocol's request expiration,
         * with a floor of 500 ms. A non-positive expiration means the request never expires.
         *
         * @return {@code true} if the request has outlived its allowed window
         */
        public boolean isExpired() {
            final long doubled = protocol.getRequestExpiration() * 2L;
            if (doubled <= 0L) {
                return false;
            }
            final long window = Math.max(doubled, 500L);
            return System.currentTimeMillis() > creationTime + window;
        }
    }

    /**
     * Outcome of servicing a request: either a FlowFile count (with no problem) or the
     * exception that prevented the exchange (with a count of zero).
     */
    private static class ProcessingResult {
        private final int fileCount;
        private final Exception problem;

        /** Successful outcome carrying the number of FlowFiles exchanged. */
        public ProcessingResult(int fileCount) {
            this(fileCount, null);
        }

        /** Failed outcome carrying the cause; the file count is reported as zero. */
        public ProcessingResult(Exception problem) {
            this(0, problem);
        }

        private ProcessingResult(int fileCount, Exception problem) {
            this.fileCount = fileCount;
            this.problem = problem;
        }

        /** @return the failure cause, or {@code null} for a successful outcome */
        public Exception getProblem() {
            return problem;
        }

        /** @return the number of FlowFiles exchanged; zero for a failed outcome */
        public int getFileCount() {
            return fileCount;
        }
    }

    /**
     * Immutable {@link PortAuthorizationResult}: an allow/deny flag and the explanation that
     * accompanies it.
     */
    public static class StandardPortAuthorizationResult
    implements PortAuthorizationResult {
        private final boolean authorized;
        private final String reason;

        /**
         * @param isAuthorized whether access is granted
         * @param explanation  human-readable reason for the decision
         */
        public StandardPortAuthorizationResult(boolean isAuthorized, String explanation) {
            this.authorized = isAuthorized;
            this.reason = explanation;
        }

        /** @return {@code true} when access is granted */
        public boolean isAuthorized() {
            return authorized;
        }

        /** @return the explanation supplied at construction */
        public String getExplanation() {
            return reason;
        }
    }
}

