/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.changelog.fs.RetryPolicy;
import org.apache.flink.changelog.fs.SchedulerFactory;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.util.ExceptionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class RetryingExecutor
implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);
    private final ScheduledExecutorService timer;
    private final ScheduledExecutorService blockingExecutor;
    private final Histogram attemptsPerTaskHistogram;
    private final Histogram totalAttemptsPerTaskHistogram;

    RetryingExecutor(int nThreads, Histogram attemptsPerTaskHistogram, Histogram totalAttemptsPerTaskHistogram) {
        this(SchedulerFactory.create(1, "ChangelogRetryScheduler", LOG), SchedulerFactory.create(nThreads, "ChangelogBlockingExecutor", LOG), attemptsPerTaskHistogram, totalAttemptsPerTaskHistogram);
    }

    @VisibleForTesting
    RetryingExecutor(ScheduledExecutorService executor, Histogram attemptsPerTaskHistogram, Histogram totalAttemptsPerTaskHistogram) {
        this(executor, executor, attemptsPerTaskHistogram, totalAttemptsPerTaskHistogram);
    }

    RetryingExecutor(ScheduledExecutorService timer, ScheduledExecutorService blockingExecutor, Histogram attemptsPerTaskHistogram, Histogram totalAttemptsPerTaskHistogram) {
        this.timer = timer;
        this.blockingExecutor = blockingExecutor;
        this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
        this.totalAttemptsPerTaskHistogram = totalAttemptsPerTaskHistogram;
    }

    <T> void execute(RetryPolicy retryPolicy, RetriableAction<T> action) {
        LOG.debug("execute with retryPolicy: {}", (Object)retryPolicy);
        RetriableActionAttempt task = RetriableActionAttempt.initialize(action, retryPolicy, this.blockingExecutor, this.attemptsPerTaskHistogram, this.totalAttemptsPerTaskHistogram, this.timer);
        this.blockingExecutor.submit(task);
    }

    @Override
    public void close() throws Exception {
        LOG.debug("close");
        Exception closeException = null;
        try {
            this.timer.shutdownNow();
        }
        catch (Exception e) {
            closeException = e;
        }
        try {
            this.blockingExecutor.shutdownNow();
        }
        catch (Exception e) {
            closeException = (Exception)ExceptionUtils.firstOrSuppressed((Throwable)e, (Throwable)closeException);
        }
        if (!this.timer.awaitTermination(1L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 1s");
        }
        if (!this.blockingExecutor.awaitTermination(1L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown blockingExecutor in 1s");
        }
        if (closeException != null) {
            throw closeException;
        }
    }

    private static final class RetriableActionAttempt<Result>
    implements Runnable {
        private final RetriableAction<Result> action;
        private final ScheduledExecutorService blockingExecutor;
        private final ScheduledExecutorService timer;
        private final int attemptNumber;
        private final RetryPolicy retryPolicy;
        private final AtomicBoolean actionCompleted;
        private final AtomicBoolean attemptCompleted;
        private final AtomicInteger activeAttempts;
        private final AtomicInteger totalAttempts;
        private final Histogram attemptsPerTaskHistogram;
        private final Histogram totalAttemptsPerTaskHistogram;

        private RetriableActionAttempt(int attemptNumber, AtomicBoolean actionCompleted, RetriableAction<Result> action, RetryPolicy retryPolicy, ScheduledExecutorService blockingExecutor, ScheduledExecutorService timer, AtomicInteger activeAttempts, AtomicInteger totalAttempts, Histogram attemptsPerTaskHistogram, Histogram totalAttemptsPerTaskHistogram) {
            this.attemptNumber = attemptNumber;
            this.action = action;
            this.retryPolicy = retryPolicy;
            this.blockingExecutor = blockingExecutor;
            this.actionCompleted = actionCompleted;
            this.attemptsPerTaskHistogram = attemptsPerTaskHistogram;
            this.totalAttemptsPerTaskHistogram = totalAttemptsPerTaskHistogram;
            this.timer = timer;
            this.activeAttempts = activeAttempts;
            this.totalAttempts = totalAttempts;
            this.attemptCompleted = new AtomicBoolean(false);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            LOG.debug("starting attempt {}", (Object)this.attemptNumber);
            if (this.actionCompleted.get()) {
                return;
            }
            Optional<ScheduledFuture<?>> timeoutFuture = this.scheduleTimeout();
            try {
                Result result = this.action.tryExecute();
                if (this.actionCompleted.compareAndSet(false, true)) {
                    LOG.debug("succeeded with {} attempts", (Object)this.attemptNumber);
                    this.action.completeWithResult(result);
                    this.attemptsPerTaskHistogram.update((long)this.attemptNumber);
                    this.totalAttemptsPerTaskHistogram.update((long)this.totalAttempts.get());
                } else {
                    LOG.debug("discard unnecessarily uploaded state, attempt {}", (Object)this.attemptNumber);
                    try {
                        this.action.discardResult(result);
                    }
                    catch (Exception e) {
                        LOG.warn("unable to discard execution attempt result", (Throwable)e);
                    }
                }
            }
            catch (Exception e) {
                this.handleError(e);
            }
            finally {
                timeoutFuture.ifPresent(f -> f.cancel(true));
            }
        }

        private void handleError(Exception e) {
            if (!this.attemptCompleted.compareAndSet(false, true) || this.actionCompleted.get()) {
                return;
            }
            LOG.debug("execution attempt {} failed: {}", (Object)this.attemptNumber, (Object)e.getMessage());
            long nextAttemptDelay = this.retryPolicy.retryAfter(this.attemptNumber, e);
            if (nextAttemptDelay >= 0L) {
                this.activeAttempts.incrementAndGet();
                this.totalAttempts.incrementAndGet();
                this.scheduleNext(nextAttemptDelay, this.next());
            }
            if (this.activeAttempts.decrementAndGet() == 0 && this.actionCompleted.compareAndSet(false, true)) {
                LOG.info("failed with {} attempts: {}", (Object)this.attemptNumber, (Object)e.getMessage());
                this.action.handleFailure(e);
            }
        }

        private void scheduleNext(long nextAttemptDelay, RetriableActionAttempt<Result> next) {
            if (nextAttemptDelay == 0L) {
                this.blockingExecutor.submit(next);
            } else if (nextAttemptDelay > 0L) {
                this.blockingExecutor.schedule(next, nextAttemptDelay, TimeUnit.MILLISECONDS);
            }
        }

        private static <T> RetriableActionAttempt<T> initialize(RetriableAction<T> runnable, RetryPolicy retryPolicy, ScheduledExecutorService blockingExecutor, Histogram attemptsPerTaskHistogram, Histogram totalAttemptsPerTaskHistogram, ScheduledExecutorService timer) {
            return new RetriableActionAttempt<T>(1, new AtomicBoolean(false), runnable, retryPolicy, blockingExecutor, timer, new AtomicInteger(1), new AtomicInteger(1), attemptsPerTaskHistogram, totalAttemptsPerTaskHistogram);
        }

        private RetriableActionAttempt<Result> next() {
            return new RetriableActionAttempt<Result>(this.attemptNumber + 1, this.actionCompleted, this.action, this.retryPolicy, this.blockingExecutor, this.timer, this.activeAttempts, this.totalAttempts, this.attemptsPerTaskHistogram, this.totalAttemptsPerTaskHistogram);
        }

        private Optional<ScheduledFuture<?>> scheduleTimeout() {
            long timeout = this.retryPolicy.timeoutFor(this.attemptNumber);
            return timeout <= 0L ? Optional.empty() : Optional.of(this.timer.schedule(() -> this.handleError(this.fmtError(timeout)), timeout, TimeUnit.MILLISECONDS));
        }

        private TimeoutException fmtError(long timeout) {
            return new TimeoutException(String.format("Attempt %d timed out after %dms", this.attemptNumber, timeout));
        }
    }

    static interface RetriableAction<Result> {
        public Result tryExecute() throws Exception;

        public void completeWithResult(Result var1);

        public void discardResult(Result var1) throws Exception;

        public void handleFailure(Throwable var1);
    }
}

