/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Preconditions;

public class CancellingIntegerSource
extends RichSourceFunction<Integer>
implements CheckpointedFunction,
CheckpointListener {
    private final int count;
    private final Integer cancelAfter;
    @Nullable
    private transient Long cancelAfterCheckpointId;
    private volatile transient boolean isCanceled;
    private volatile transient int sentCount;
    private transient ListState<Integer> lastSentStored;

    private CancellingIntegerSource(int count, @Nullable Integer cancelAfter) {
        Preconditions.checkArgument((count > 0 ? 1 : 0) != 0);
        Preconditions.checkArgument((cancelAfter == null || cancelAfter > 0 ? 1 : 0) != 0);
        this.cancelAfter = cancelAfter;
        this.count = count;
    }

    public void run(SourceFunction.SourceContext<Integer> ctx) throws InterruptedException {
        this.emitInLoop(ctx);
        this.awaitCancellation();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void emitInLoop(SourceFunction.SourceContext<Integer> ctx) throws InterruptedException {
        while (this.sentCount < this.count && !this.isCanceled) {
            Object object = ctx.getCheckpointLock();
            synchronized (object) {
                if (this.sentCount < this.count && !this.isCanceled) {
                    ctx.collect((Object)this.sentCount++);
                }
            }
            Thread.sleep(10L);
        }
    }

    private void awaitCancellation() {
        while (!this.isCanceled) {
            try {
                Thread.sleep(50L);
            }
            catch (InterruptedException e) {
                if (!this.isCanceled) continue;
                Thread.currentThread().interrupt();
            }
        }
    }

    public void initializeState(FunctionInitializationContext context) throws Exception {
        this.lastSentStored = context.getOperatorStateStore().getListState(new ListStateDescriptor("counter", Integer.class));
        if (context.isRestored()) {
            this.sentCount = (Integer)Iterables.getOnlyElement((Iterable)((Iterable)this.lastSentStored.get()));
        }
        Preconditions.checkState((this.cancelAfter == null || this.sentCount < this.cancelAfter ? 1 : 0) != 0);
    }

    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.lastSentStored.update(Collections.singletonList(this.sentCount));
        if (this.cancelAfter != null && this.cancelAfter <= this.sentCount && this.cancelAfterCheckpointId == null) {
            this.cancelAfterCheckpointId = context.getCheckpointId();
        }
    }

    public void notifyCheckpointComplete(long checkpointId) {
        if (this.cancelAfterCheckpointId != null && this.cancelAfterCheckpointId <= checkpointId) {
            this.cancel();
        }
    }

    public void notifyCheckpointAborted(long checkpointId) {
    }

    public void cancel() {
        this.isCanceled = true;
    }

    public static CancellingIntegerSource upTo(int max, boolean continueAfterCount) {
        return new CancellingIntegerSource(max, continueAfterCount ? null : Integer.valueOf(max));
    }
}

