/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.lifecycle.validation;

import java.util.ArrayList;
import java.util.BitSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.flink.runtime.OperatorIDPair;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.lifecycle.TestJobWithDescription;
import org.apache.flink.runtime.operators.lifecycle.command.TestCommand;
import org.apache.flink.runtime.operators.lifecycle.event.InputEndedEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestCommandAckEvent;
import org.apache.flink.runtime.operators.lifecycle.event.TestEvent;
import org.apache.flink.runtime.operators.lifecycle.event.WatermarkReceivedEvent;
import org.apache.flink.runtime.operators.lifecycle.validation.TestOperatorLifecycleValidator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.junit.Assert;

public class DrainingValidator
implements TestOperatorLifecycleValidator {
    @Override
    public void validateOperatorLifecycle(TestJobWithDescription job, String operatorId, int subtaskIndex, List<TestEvent> operatorEvents) {
        HashMap<Integer, List> byAttempt = new HashMap<Integer, List>();
        HashSet<Integer> normallyFinishedAttempts = new HashSet<Integer>();
        int lastAttempt = Integer.MIN_VALUE;
        for (TestEvent testEvent : operatorEvents) {
            byAttempt.computeIfAbsent(testEvent.attemptNumber, ign -> new ArrayList()).add(testEvent);
            if (this.isFinishAck(testEvent)) {
                normallyFinishedAttempts.add(testEvent.attemptNumber);
            }
            lastAttempt = Math.max(lastAttempt, testEvent.attemptNumber);
        }
        for (Map.Entry entry : byAttempt.entrySet()) {
            if (lastAttempt != (Integer)entry.getKey() && !normallyFinishedAttempts.contains(entry.getKey())) continue;
            this.validateSubtaskAttempt(job, operatorId, subtaskIndex, (List)entry.getValue());
        }
    }

    private void validateSubtaskAttempt(TestJobWithDescription job, String operatorId, int subtaskIndex, List<TestEvent> operatorEvents) {
        BitSet endedInputs = new BitSet();
        BitSet inputsWithMaxWatermark = new BitSet();
        for (TestEvent ev : operatorEvents) {
            TestEvent w;
            if (ev instanceof WatermarkReceivedEvent) {
                w = (WatermarkReceivedEvent)ev;
                if (w.ts != Watermark.MAX_WATERMARK.getTimestamp()) continue;
                Assert.assertFalse((String)String.format("Max Watermark received twice by %s/%d/%d", w.operatorId, w.subtaskIndex, w.inputId), (boolean)inputsWithMaxWatermark.get(w.inputId));
                inputsWithMaxWatermark.set(w.inputId);
                continue;
            }
            if (!(ev instanceof InputEndedEvent)) continue;
            w = (InputEndedEvent)ev;
            Assert.assertTrue((String)String.format("Input %d ended before receiving max watermark by %s[%d]#%d", ((InputEndedEvent)w).inputId, operatorId, subtaskIndex, ((InputEndedEvent)w).attemptNumber), (boolean)inputsWithMaxWatermark.get(((InputEndedEvent)w).inputId));
            Assert.assertFalse((boolean)endedInputs.get(((InputEndedEvent)w).inputId));
            endedInputs.set(((InputEndedEvent)w).inputId);
        }
        Assert.assertEquals((String)String.format("Incorrect number of ended inputs for %s[%d]", operatorId, subtaskIndex), (long)DrainingValidator.getNumInputs(job, operatorId), (long)endedInputs.cardinality());
    }

    private boolean isFinishAck(TestEvent ev) {
        return ev instanceof TestCommandAckEvent && ((TestCommandAckEvent)ev).getCommand() == TestCommand.FINISH_SOURCES;
    }

    private static int getNumInputs(TestJobWithDescription testJob, String operator) {
        Integer explicitNumInputs = testJob.operatorsNumberOfInputs.get(operator);
        if (explicitNumInputs != null) {
            return explicitNumInputs;
        }
        Iterable vertices = testJob.jobGraph.getVertices();
        for (JobVertex vertex : vertices) {
            for (OperatorIDPair p : vertex.getOperatorIDs()) {
                OperatorID operatorID = p.getUserDefinedOperatorID().orElse(p.getGeneratedOperatorID());
                if (!operatorID.toString().equals(operator)) continue;
                return vertex.getNumberOfInputs();
            }
        }
        throw new NoSuchElementException(operator);
    }
}

