/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketState;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class Bucket<IN, BucketID> {
    private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
    private final BucketID bucketId;
    private final Path bucketPath;
    private final int subtaskIndex;
    private final BucketWriter<IN, BucketID> bucketWriter;
    private final RollingPolicy<IN, BucketID> rollingPolicy;
    private final NavigableMap<Long, InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverablesPerCheckpoint;
    private final NavigableMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint;
    private final OutputFileConfig outputFileConfig;
    @Nullable
    private final FileLifeCycleListener<BucketID> fileListener;
    private long partCounter;
    @Nullable
    private InProgressFileWriter<IN, BucketID> inProgressPart;
    private List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverablesForCurrentCheckpoint;

    private Bucket(int subtaskIndex, BucketID bucketId, Path bucketPath, long initialPartCounter, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, @Nullable FileLifeCycleListener<BucketID> fileListener, OutputFileConfig outputFileConfig) {
        this.subtaskIndex = subtaskIndex;
        this.bucketId = Preconditions.checkNotNull(bucketId);
        this.bucketPath = (Path)Preconditions.checkNotNull((Object)bucketPath);
        this.partCounter = initialPartCounter;
        this.bucketWriter = (BucketWriter)Preconditions.checkNotNull(bucketWriter);
        this.rollingPolicy = (RollingPolicy)Preconditions.checkNotNull(rollingPolicy);
        this.fileListener = fileListener;
        this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<InProgressFileWriter.PendingFileRecoverable>();
        this.pendingFileRecoverablesPerCheckpoint = new TreeMap<Long, List<InProgressFileWriter.PendingFileRecoverable>>();
        this.inProgressFileRecoverablesPerCheckpoint = new TreeMap<Long, InProgressFileWriter.InProgressFileRecoverable>();
        this.outputFileConfig = (OutputFileConfig)Preconditions.checkNotNull((Object)outputFileConfig);
    }

    private Bucket(int subtaskIndex, long initialPartCounter, BucketWriter<IN, BucketID> partFileFactory, RollingPolicy<IN, BucketID> rollingPolicy, BucketState<BucketID> bucketState, @Nullable FileLifeCycleListener<BucketID> fileListener, OutputFileConfig outputFileConfig) throws IOException {
        this(subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory, rollingPolicy, fileListener, outputFileConfig);
        super.restoreInProgressFile(bucketState);
        super.commitRecoveredPendingFiles(bucketState);
    }

    private void restoreInProgressFile(BucketState<BucketID> state) throws IOException {
        if (!state.hasInProgressFileRecoverable()) {
            return;
        }
        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
        if (this.bucketWriter.getProperties().supportsResume()) {
            this.inProgressPart = this.bucketWriter.resumeInProgressFileFrom(this.bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
        } else {
            this.bucketWriter.recoverPendingFile((InProgressFileWriter.PendingFileRecoverable)inProgressFileRecoverable).commitAfterRecovery();
        }
    }

    private void commitRecoveredPendingFiles(BucketState<BucketID> state) throws IOException {
        for (List<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables : state.getPendingFileRecoverablesPerCheckpoint().values()) {
            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : pendingFileRecoverables) {
                this.bucketWriter.recoverPendingFile(pendingFileRecoverable).commitAfterRecovery();
            }
        }
    }

    public BucketID getBucketId() {
        return this.bucketId;
    }

    public Path getBucketPath() {
        return this.bucketPath;
    }

    public long getPartCounter() {
        return this.partCounter;
    }

    boolean isActive() {
        return this.inProgressPart != null || !this.pendingFileRecoverablesForCurrentCheckpoint.isEmpty() || !this.pendingFileRecoverablesPerCheckpoint.isEmpty();
    }

    void merge(Bucket<IN, BucketID> bucket) throws IOException {
        Preconditions.checkNotNull(bucket);
        Preconditions.checkState((boolean)Objects.equals(bucket.bucketPath, this.bucketPath));
        Preconditions.checkState((boolean)bucket.pendingFileRecoverablesForCurrentCheckpoint.isEmpty());
        Preconditions.checkState((boolean)bucket.pendingFileRecoverablesPerCheckpoint.isEmpty());
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = super.closePartFile();
        if (pendingFileRecoverable != null) {
            this.pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} merging buckets for bucket id={}", (Object)this.subtaskIndex, this.bucketId);
        }
    }

    void write(IN element, long currentTime) throws IOException {
        if (this.inProgressPart == null || this.rollingPolicy.shouldRollOnEvent(this.inProgressPart, element)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to element {}.", new Object[]{this.subtaskIndex, this.bucketId, element});
            }
            this.inProgressPart = this.rollPartFile(currentTime);
        }
        this.inProgressPart.write(element, currentTime);
    }

    private InProgressFileWriter<IN, BucketID> rollPartFile(long currentTime) throws IOException {
        this.closePartFile();
        Path partFilePath = this.assembleNewPartPath();
        if (this.fileListener != null) {
            this.fileListener.onPartFileOpened(this.bucketId, partFilePath);
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} opening new part file \"{}\" for bucket id={}.", new Object[]{this.subtaskIndex, partFilePath.getName(), this.bucketId});
        }
        return this.bucketWriter.openNewInProgressFile(this.bucketId, partFilePath, currentTime);
    }

    private Path assembleNewPartPath() {
        long currentPartCounter = this.partCounter++;
        return new Path(this.bucketPath, this.outputFileConfig.getPartPrefix() + '-' + this.subtaskIndex + '-' + currentPartCounter + this.outputFileConfig.getPartSuffix());
    }

    private InProgressFileWriter.PendingFileRecoverable closePartFile() throws IOException {
        InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = null;
        if (this.inProgressPart != null) {
            pendingFileRecoverable = this.inProgressPart.closeForCommit();
            this.pendingFileRecoverablesForCurrentCheckpoint.add(pendingFileRecoverable);
            this.inProgressPart = null;
        }
        return pendingFileRecoverable;
    }

    void disposePartFile() {
        if (this.inProgressPart != null) {
            this.inProgressPart.dispose();
        }
    }

    BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
        this.prepareBucketForCheckpointing(checkpointId);
        InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
        long inProgressFileCreationTime = Long.MAX_VALUE;
        if (this.inProgressPart != null) {
            inProgressFileRecoverable = this.inProgressPart.persist();
            inProgressFileCreationTime = this.inProgressPart.getCreationTime();
            this.inProgressFileRecoverablesPerCheckpoint.put(checkpointId, inProgressFileRecoverable);
        }
        return new BucketState<BucketID>(this.bucketId, this.bucketPath, inProgressFileCreationTime, inProgressFileRecoverable, this.pendingFileRecoverablesPerCheckpoint);
    }

    private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
        if (this.inProgressPart != null && this.rollingPolicy.shouldRollOnCheckpoint(this.inProgressPart)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} on checkpoint.", (Object)this.subtaskIndex, this.bucketId);
            }
            this.closePartFile();
        }
        if (!this.pendingFileRecoverablesForCurrentCheckpoint.isEmpty()) {
            this.pendingFileRecoverablesPerCheckpoint.put(checkpointId, this.pendingFileRecoverablesForCurrentCheckpoint);
            this.pendingFileRecoverablesForCurrentCheckpoint = new ArrayList<InProgressFileWriter.PendingFileRecoverable>();
        }
    }

    void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        Preconditions.checkNotNull(this.bucketWriter);
        Iterator it = this.pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry entry = it.next();
            for (InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable : (List)entry.getValue()) {
                this.bucketWriter.recoverPendingFile(pendingFileRecoverable).commit();
            }
            it.remove();
        }
        this.cleanupInProgressFileRecoverables(checkpointId);
    }

    private void cleanupInProgressFileRecoverables(long checkpointId) throws IOException {
        Iterator it = this.inProgressFileRecoverablesPerCheckpoint.headMap(checkpointId, false).entrySet().iterator();
        while (it.hasNext()) {
            InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = (InProgressFileWriter.InProgressFileRecoverable)it.next().getValue();
            boolean successfullyDeleted = this.bucketWriter.cleanupInProgressFileRecoverable(inProgressFileRecoverable);
            if (LOG.isDebugEnabled() && successfullyDeleted) {
                LOG.debug("Subtask {} successfully deleted incomplete part for bucket id={}.", (Object)this.subtaskIndex, this.bucketId);
            }
            it.remove();
        }
    }

    void onProcessingTime(long timestamp) throws IOException {
        if (this.inProgressPart != null && this.rollingPolicy.shouldRollOnProcessingTime(this.inProgressPart, timestamp)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy (in-progress file created @ {}, last updated @ {} and current time is {}).", new Object[]{this.subtaskIndex, this.bucketId, this.inProgressPart.getCreationTime(), this.inProgressPart.getLastUpdateTime(), timestamp});
            }
            this.closePartFile();
        }
    }

    @VisibleForTesting
    Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> getPendingFileRecoverablesPerCheckpoint() {
        return this.pendingFileRecoverablesPerCheckpoint;
    }

    @Nullable
    @VisibleForTesting
    InProgressFileWriter<IN, BucketID> getInProgressPart() {
        return this.inProgressPart;
    }

    @VisibleForTesting
    List<InProgressFileWriter.PendingFileRecoverable> getPendingFileRecoverablesForCurrentCheckpoint() {
        return this.pendingFileRecoverablesForCurrentCheckpoint;
    }

    static <IN, BucketID> Bucket<IN, BucketID> getNew(int subtaskIndex, BucketID bucketId, Path bucketPath, long initialPartCounter, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, @Nullable FileLifeCycleListener<BucketID> fileListener, OutputFileConfig outputFileConfig) {
        return new Bucket<IN, BucketID>(subtaskIndex, bucketId, bucketPath, initialPartCounter, bucketWriter, rollingPolicy, fileListener, outputFileConfig);
    }

    static <IN, BucketID> Bucket<IN, BucketID> restore(int subtaskIndex, long initialPartCounter, BucketWriter<IN, BucketID> bucketWriter, RollingPolicy<IN, BucketID> rollingPolicy, BucketState<BucketID> bucketState, @Nullable FileLifeCycleListener<BucketID> fileListener, OutputFileConfig outputFileConfig) throws IOException {
        return new Bucket<IN, BucketID>(subtaskIndex, initialPartCounter, bucketWriter, rollingPolicy, bucketState, fileListener, outputFileConfig);
    }
}

