/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.base.source.reader;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.source.ReaderOutput;
import org.apache.flink.api.connector.source.SourceEvent;
import org.apache.flink.api.connector.source.SourceOutput;
import org.apache.flink.api.connector.source.SourceReader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.connector.source.SourceSplit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.base.source.reader.RecordEmitter;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.SourceReaderOptions;
import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
implements SourceReader<T, SplitT> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceReaderBase.class);
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final Map<String, SplitContext<T, SplitStateT>> splitStates;
    protected final RecordEmitter<E, T, SplitStateT> recordEmitter;
    protected final SplitFetcherManager<E, SplitT> splitFetcherManager;
    protected final SourceReaderOptions options;
    protected final Configuration config;
    protected SourceReaderContext context;
    @Nullable
    private RecordsWithSplitIds<E> currentFetch;
    @Nullable
    private SplitContext<T, SplitStateT> currentSplitContext;
    @Nullable
    private SourceOutput<T> currentSplitOutput;
    private boolean noMoreSplitsAssignment;

    public SourceReaderBase(FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) {
        this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<String, SplitContext<T, SplitStateT>>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
    }

    public void start() {
    }

    public InputStatus pollNext(ReaderOutput<T> output) throws Exception {
        RecordsWithSplitIds<E> recordsWithSplitId = this.currentFetch;
        if (recordsWithSplitId == null && (recordsWithSplitId = this.getNextFetch(output)) == null) {
            return this.trace(this.finishedOrAvailableLater());
        }
        do {
            E record;
            if ((record = recordsWithSplitId.nextRecordFromSplit()) == null) continue;
            this.recordEmitter.emitRecord(record, this.currentSplitOutput, this.currentSplitContext.state);
            LOG.trace("Emitted record: {}", record);
            return this.trace(InputStatus.MORE_AVAILABLE);
        } while (this.moveToNextSplit(recordsWithSplitId, output));
        return this.pollNext(output);
    }

    private InputStatus trace(InputStatus status) {
        LOG.trace("Source reader status: {}", (Object)status);
        return status;
    }

    @Nullable
    private RecordsWithSplitIds<E> getNextFetch(ReaderOutput<T> output) {
        this.splitFetcherManager.checkErrors();
        LOG.trace("Getting next source data batch from queue");
        RecordsWithSplitIds<E> recordsWithSplitId = this.elementsQueue.poll();
        if (recordsWithSplitId == null || !this.moveToNextSplit(recordsWithSplitId, output)) {
            return null;
        }
        this.currentFetch = recordsWithSplitId;
        return recordsWithSplitId;
    }

    private void finishCurrentFetch(RecordsWithSplitIds<E> fetch, ReaderOutput<T> output) {
        this.currentFetch = null;
        this.currentSplitContext = null;
        this.currentSplitOutput = null;
        Set<String> finishedSplits = fetch.finishedSplits();
        if (!finishedSplits.isEmpty()) {
            LOG.info("Finished reading split(s) {}", finishedSplits);
            HashMap stateOfFinishedSplits = new HashMap();
            for (String finishedSplitId : finishedSplits) {
                stateOfFinishedSplits.put(finishedSplitId, this.splitStates.remove((Object)finishedSplitId).state);
                output.releaseOutputForSplit(finishedSplitId);
            }
            this.onSplitFinished(stateOfFinishedSplits);
        }
        fetch.recycle();
    }

    private boolean moveToNextSplit(RecordsWithSplitIds<E> recordsWithSplitIds, ReaderOutput<T> output) {
        String nextSplitId = recordsWithSplitIds.nextSplit();
        if (nextSplitId == null) {
            LOG.trace("Current fetch is finished.");
            this.finishCurrentFetch(recordsWithSplitIds, output);
            return false;
        }
        this.currentSplitContext = this.splitStates.get(nextSplitId);
        Preconditions.checkState((this.currentSplitContext != null ? 1 : 0) != 0, (Object)"Have records for a split that was not registered");
        this.currentSplitOutput = this.currentSplitContext.getOrCreateSplitOutput(output);
        LOG.trace("Emitting records from fetch for split {}", (Object)nextSplitId);
        return true;
    }

    public CompletableFuture<Void> isAvailable() {
        return this.currentFetch != null ? FutureCompletingBlockingQueue.AVAILABLE : this.elementsQueue.getAvailabilityFuture();
    }

    public List<SplitT> snapshotState(long checkpointId) {
        ArrayList splits = new ArrayList();
        this.splitStates.forEach((id, context) -> splits.add(this.toSplitType((String)id, context.state)));
        return splits;
    }

    public void addSplits(List<SplitT> splits) {
        LOG.info("Adding split(s) to reader: {}", splits);
        splits.forEach(s -> this.splitStates.put(s.splitId(), new SplitContext(s.splitId(), this.initializedState(s))));
        this.splitFetcherManager.addSplits(splits);
    }

    public void notifyNoMoreSplits() {
        LOG.info("Reader received NoMoreSplits event.");
        this.noMoreSplitsAssignment = true;
        this.elementsQueue.notifyAvailable();
    }

    public void handleSourceEvents(SourceEvent sourceEvent) {
        LOG.info("Received unhandled source event: {}", (Object)sourceEvent);
    }

    public void close() throws Exception {
        LOG.info("Closing Source Reader.");
        this.splitFetcherManager.close(this.options.sourceReaderCloseTimeout);
    }

    public int getNumberOfCurrentlyAssignedSplits() {
        return this.splitStates.size();
    }

    protected abstract void onSplitFinished(Map<String, SplitStateT> var1);

    protected abstract SplitStateT initializedState(SplitT var1);

    protected abstract SplitT toSplitType(String var1, SplitStateT var2);

    private InputStatus finishedOrAvailableLater() {
        boolean allFetchersHaveShutdown = this.splitFetcherManager.maybeShutdownFinishedFetchers();
        if (!this.noMoreSplitsAssignment || !allFetchersHaveShutdown) {
            return InputStatus.NOTHING_AVAILABLE;
        }
        if (this.elementsQueue.isEmpty()) {
            this.splitFetcherManager.checkErrors();
            return InputStatus.END_OF_INPUT;
        }
        return InputStatus.MORE_AVAILABLE;
    }

    private static final class SplitContext<T, SplitStateT> {
        final String splitId;
        final SplitStateT state;
        SourceOutput<T> sourceOutput;

        private SplitContext(String splitId, SplitStateT state) {
            this.state = state;
            this.splitId = splitId;
        }

        SourceOutput<T> getOrCreateSplitOutput(ReaderOutput<T> mainOutput) {
            if (this.sourceOutput == null) {
                this.sourceOutput = mainOutput.createOutputForSplit(this.splitId);
            }
            return this.sourceOutput;
        }
    }
}

