/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.io;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.streaming.runtime.io.MultipleInputSelectionHandler;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.io.StreamOneInputProcessor;
import org.apache.flink.util.ExceptionUtils;

/**
 * {@link StreamInputProcessor} that multiplexes several {@link StreamOneInputProcessor}s.
 *
 * <p>Which input is read next is delegated to a {@link MultipleInputSelectionHandler}; this class
 * keeps the handler's availability information up to date and forwards the per-input status back
 * to it after every read.
 */
@Internal
public final class StreamMultipleInputProcessor implements StreamInputProcessor {

    /**
     * Index value meaning "no input can be read right now". The result of
     * {@code selectNextInputIndex} is compared against it.
     */
    private static final int NONE_AVAILABLE = -1;

    private final MultipleInputSelectionHandler inputSelectionHandler;
    private final StreamOneInputProcessor<?>[] inputProcessors;
    private final MultipleInputAvailabilityHelper availabilityHelper;

    /**
     * Index of the input that was read most recently; handed to the selection handler when the
     * next input is chosen.
     *
     * <p>NOTE(review): the initial value 1 presumably makes the very first selection favour
     * input 0 - confirm against {@code MultipleInputSelectionHandler#selectNextInputIndex}.
     */
    private int lastReadInputIndex = 1;

    /** Becomes {@code true} once {@link #selectFirstReadingInputIndex()} has run. */
    private boolean isPrepared;

    /**
     * Creates a processor over the given inputs.
     *
     * @param inputSelectionHandler tracks which inputs are selected, available and finished
     * @param inputProcessors one processor per input; the array index is the input index
     */
    public StreamMultipleInputProcessor(MultipleInputSelectionHandler inputSelectionHandler, StreamOneInputProcessor<?>[] inputProcessors) {
        this.inputSelectionHandler = inputSelectionHandler;
        this.inputProcessors = inputProcessors;
        this.availabilityHelper = new MultipleInputAvailabilityHelper(inputProcessors.length);
    }

    /**
     * Returns a future that signals when this processor may have something to do.
     *
     * <p>If the selection handler already reports an available input, or all inputs are finished,
     * the shared {@code AVAILABLE} future is returned. Otherwise the availability futures of all
     * inputs that are neither finished nor deselected are registered with the helper and the
     * helper's combined future is returned.
     *
     * @return {@code AVAILABLE} or the helper's combined availability future
     */
    // NOTE(review): AVAILABLE is resolved from a supertype, so this method most likely implements
    // an inherited declaration - confirm and annotate with @Override.
    public CompletableFuture<?> getAvailableFuture() {
        if (this.inputSelectionHandler.isAnyInputAvailable() || this.inputSelectionHandler.areAllInputsFinished()) {
            return AVAILABLE;
        }
        this.availabilityHelper.resetToUnAvailable();
        for (int i = 0; i < this.inputProcessors.length; ++i) {
            if (!this.inputSelectionHandler.isInputFinished(i) && this.inputSelectionHandler.isInputSelected(i)) {
                this.availabilityHelper.anyOf(i, this.inputProcessors[i].getAvailableFuture());
            }
        }
        return this.availabilityHelper.getAvailableFuture();
    }

    /**
     * Reads from one input chosen by the selection handler.
     *
     * @return {@link InputStatus#NOTHING_AVAILABLE} when no input could be chosen, otherwise the
     *     status computed by the selection handler from the chosen input's own status
     * @throws Exception whatever the chosen input processor throws
     */
    @Override
    public InputStatus processInput() throws Exception {
        int readingInputIndex;
        if (this.isPrepared) {
            readingInputIndex = this.selectNextReadingInputIndex();
        } else {
            // First invocation: the selection handler is initialised lazily here rather than in
            // the constructor.
            readingInputIndex = this.selectFirstReadingInputIndex();
        }
        if (readingInputIndex == NONE_AVAILABLE) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        this.lastReadInputIndex = readingInputIndex;
        InputStatus inputStatus = this.inputProcessors[readingInputIndex].processInput();
        // Refresh the selection after the read and before the status is reported back.
        this.inputSelectionHandler.nextSelection();
        return this.inputSelectionHandler.updateStatus(inputStatus, readingInputIndex);
    }

    /**
     * Performs the one-time initial {@code nextSelection()} call, marks this processor as
     * prepared and then selects an input like any later round.
     */
    private int selectFirstReadingInputIndex() {
        this.inputSelectionHandler.nextSelection();
        this.isPrepared = true;
        return this.selectNextReadingInputIndex();
    }

    /**
     * Closes every input processor, continuing past failures so that all inputs get closed.
     *
     * @throws IOException the exception produced by merging all failures through
     *     {@code ExceptionUtils.firstOrSuppressed} (by its name: the first failure, with later
     *     ones attached as suppressed)
     */
    @Override
    public void close() throws IOException {
        IOException ex = null;
        for (StreamOneInputProcessor<?> input : this.inputProcessors) {
            try {
                input.close();
            }
            catch (IOException e) {
                ex = ExceptionUtils.firstOrSuppressed(e, ex);
            }
        }
        if (ex != null) {
            throw ex;
        }
    }

    /**
     * Asks every input processor to prepare its snapshot.
     *
     * @param channelStateWriter forwarded unchanged to every input processor
     * @param checkpointId forwarded unchanged to every input processor
     * @return a future that completes once the futures of all inputs have completed
     * @throws CheckpointException if one of the delegated per-input calls throws it
     */
    @Override
    public CompletableFuture<Void> prepareSnapshot(ChannelStateWriter channelStateWriter, long checkpointId) throws CheckpointException {
        CompletableFuture<?>[] inputFutures = new CompletableFuture<?>[this.inputProcessors.length];
        for (int index = 0; index < inputFutures.length; ++index) {
            inputFutures[index] = this.inputProcessors[index].prepareSnapshot(channelStateWriter, checkpointId);
        }
        return CompletableFuture.allOf(inputFutures);
    }

    /**
     * Picks the index of the input to read next.
     *
     * @return the chosen input index, or {@link #NONE_AVAILABLE} if the handler chose none
     */
    private int selectNextReadingInputIndex() {
        // The handler knows of no available input: poll all inputs before asking it to choose.
        if (!this.inputSelectionHandler.isAnyInputAvailable()) {
            this.fullCheckAndSetAvailable();
        }
        int readingInputIndex = this.inputSelectionHandler.selectNextInputIndex(this.lastReadInputIndex);
        if (readingInputIndex == NONE_AVAILABLE) {
            return NONE_AVAILABLE;
        }
        // The handler asks for the availability of the other inputs to be refreshed as well.
        if (this.inputSelectionHandler.shouldSetAvailableForAnotherInput()) {
            this.fullCheckAndSetAvailable();
        }
        return readingInputIndex;
    }

    /**
     * Polls every input processor and marks it as available in the selection handler if either
     * of its availability checks succeeds. Inputs are never marked unavailable here.
     */
    private void fullCheckAndSetAvailable() {
        for (int i = 0; i < this.inputProcessors.length; ++i) {
            StreamOneInputProcessor<?> inputProcessor = this.inputProcessors[i];
            // isAvailable() is only consulted when the approximate check fails.
            if (inputProcessor.isApproximatelyAvailable() || inputProcessor.isAvailable()) {
                this.inputSelectionHandler.setAvailableInput(i);
            }
        }
    }

    /**
     * Combines the availability futures of several inputs into one future that completes as soon
     * as any registered input future completes.
     *
     * <p>A completion callback is attached to an input's future only if no still-pending future
     * is tracked for that input, so callbacks do not pile up on a future that stays pending.
     *
     * <p>NOTE(review): {@code availableFuture} is volatile, presumably because
     * {@link #notifyCompletion()} may run on whichever thread completes an input future -
     * confirm the threading model.
     */
    private static class MultipleInputAvailabilityHelper {
        /** Per input index: the future a completion callback was last attached to. */
        private final CompletableFuture<?>[] futuresToCombine;
        private volatile CompletableFuture<?> availableFuture = new CompletableFuture<>();

        public MultipleInputAvailabilityHelper(int inputSize) {
            this.futuresToCombine = new CompletableFuture<?>[inputSize];
        }

        /** Returns the current combined future. */
        public CompletableFuture<?> getAvailableFuture() {
            return this.availableFuture;
        }

        /**
         * Replaces the combined future with a fresh, incomplete one if the current one is already
         * done; a still-pending combined future is kept. Callbacks registered earlier are not
         * removed and will complete whatever combined future is current when they fire.
         */
        public void resetToUnAvailable() {
            if (this.availableFuture.isDone()) {
                this.availableFuture = new CompletableFuture<>();
            }
        }

        /** Completes the current combined future; invoked from the input-future callbacks. */
        private void notifyCompletion() {
            this.availableFuture.complete(null);
        }

        /**
         * Tracks {@code availabilityFuture} for input {@code idx} unless a still-pending future
         * is already tracked for that input, in which case the argument is ignored.
         *
         * @param idx index of the input the future belongs to
         * @param availabilityFuture availability future of that input
         */
        public void anyOf(int idx, CompletableFuture<?> availabilityFuture) {
            if (this.futuresToCombine[idx] == null || this.futuresToCombine[idx].isDone()) {
                this.futuresToCombine[idx] = availabilityFuture;
                FutureUtils.assertNoException(availabilityFuture.thenRun(this::notifyCompletion));
            }
        }
    }
}

