/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.partitionsender;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.ops.QueryCancelledException;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionOutgoingBatch;
import org.apache.drill.exec.physical.impl.partitionsender.Partitioner;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.apache.drill.exec.testing.CountDownLatchInjection;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
import org.apache.drill.shaded.guava.com.google.common.util.concurrent.MoreExecutors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Wraps a list of {@link Partitioner}s and runs the same operation on each of them through an
 * {@link ExecutorService}.
 *
 * <p>When parallel execution is enabled the executor comes from the {@link FragmentContext};
 * otherwise a direct (caller-runs) executor is used. The thread that constructs the decorator is
 * remembered and is the one that is unparked when the last task finishes; it is also the only
 * thread allowed to cancel tasks.
 */
public final class PartitionerDecorator {
    private static final Logger logger = LoggerFactory.getLogger(PartitionerDecorator.class);
    private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(PartitionerDecorator.class);

    private final List<Partitioner> partitioners;
    private final OperatorStats stats;
    private final ExecutorService executor;
    private final FragmentContext context;
    /** Thread that created this decorator; it waits in {@link #await} and is unparked by the tasks. */
    private final Thread thread;
    private final boolean enableParallelTaskExecution;

    /**
     * Creates a decorator that runs tasks in parallel only when there is more than one partitioner.
     *
     * @param partitioners partitioners to delegate to
     * @param stats operator stats that receive the merged per-partitioner metrics
     * @param context fragment context supplying the executor, execution controls and executor state
     */
    PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context) {
        this(partitioners, stats, context, partitioners.size() > 1);
    }

    /**
     * Creates a decorator with an explicit choice of execution mode.
     *
     * @param partitioners partitioners to delegate to
     * @param stats operator stats that receive the merged per-partitioner metrics
     * @param context fragment context supplying the executor, execution controls and executor state
     * @param enableParallelTaskExecution {@code true} to use {@code context.getExecutor()},
     *        {@code false} to run every task inline on the calling thread
     */
    PartitionerDecorator(List<Partitioner> partitioners, OperatorStats stats, FragmentContext context, boolean enableParallelTaskExecution) {
        this.partitioners = partitioners;
        this.stats = stats;
        this.context = context;
        this.enableParallelTaskExecution = enableParallelTaskExecution;
        this.executor = enableParallelTaskExecution ? context.getExecutor() : MoreExecutors.newDirectExecutorService();
        this.thread = Thread.currentThread();
    }

    /**
     * Invokes {@link Partitioner#partitionBatch} on every partitioner.
     *
     * @param incoming batch handed to each partitioner
     * @throws ExecutionException if any task failed or the executor rejected a task
     */
    public void partitionBatch(RecordBatch incoming) throws ExecutionException {
        executeMethodLogic(new PartitionBatchHandlingClass(incoming));
    }

    /**
     * Invokes {@link Partitioner#flushOutgoingBatches} on every partitioner.
     *
     * @param isLastBatch forwarded unchanged to each partitioner
     * @param schemaChanged forwarded unchanged to each partitioner
     * @throws ExecutionException if any task failed or the executor rejected a task
     */
    public void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws ExecutionException {
        executeMethodLogic(new FlushBatchesHandlingClass(isLastBatch, schemaChanged));
    }

    /** Calls {@code initialize()} on every partitioner, sequentially on the calling thread. */
    public void initialize() {
        for (Partitioner part : partitioners) {
            part.initialize();
        }
    }

    /** Calls {@code clear()} on every partitioner, sequentially on the calling thread. */
    public void clear() {
        for (Partitioner part : partitioners) {
            part.clear();
        }
    }

    /**
     * Returns the first non-null outgoing batch that any partitioner reports for {@code index}.
     *
     * @param index index passed to {@link Partitioner#getOutgoingBatch}
     * @return the first non-null batch found, or {@code null} if every partitioner returned {@code null}
     */
    public PartitionOutgoingBatch getOutgoingBatches(int index) {
        for (Partitioner part : partitioners) {
            PartitionOutgoingBatch outBatch = part.getOutgoingBatch(index);
            if (outBatch != null) {
                return outBatch;
            }
        }
        return null;
    }

    /** Returns the wrapped partitioner list itself (not a copy). */
    List<Partitioner> getPartitioners() {
        return partitioners;
    }

    /**
     * Submits one {@link PartitionerTask} per partitioner, waits for all of them, then merges
     * their stats or rethrows their failure.
     *
     * <p>Tasks block on an injected latch (initialised to 1) that is counted down only after all
     * tasks were submitted and the injected "wait-for-fragment-interrupt" pause returned.
     *
     * <p>The {@code finally} block always waits for the submitted tasks and processes their
     * results. If that processing throws an {@link ExecutionException}, it replaces a
     * {@link QueryCancelledException} raised from the {@code catch} block (standard Java
     * try/finally semantics).
     *
     * @param iface operation to run against each partitioner
     * @throws ExecutionException if a task failed or the executor rejected a task
     */
    @VisibleForTesting
    void executeMethodLogic(GeneralExecuteIface iface) throws ExecutionException {
        try (CountDownLatchInjection testCountDownLatch = injector.getLatch(context.getExecutionControls(), "partitioner-sender-latch")) {
            testCountDownLatch.initialize(1);
            AtomicInteger count = new AtomicInteger();
            List<PartitionerTask> partitionerTasks = new ArrayList<>(partitioners.size());
            ExecutionException executionException = null;
            startWait();
            try {
                partitioners.forEach(partitioner -> createAndExecute(iface, testCountDownLatch, count, partitionerTasks, partitioner));
                injector.injectInterruptiblePause(context.getExecutionControls(), "wait-for-fragment-interrupt", logger);
                testCountDownLatch.countDown();
            } catch (InterruptedException e) {
                logger.warn("fragment thread interrupted", e);
                throw new QueryCancelledException();
            } catch (RejectedExecutionException e) {
                logger.warn("Failed to execute partitioner tasks. Execution service down?", e);
                executionException = new ExecutionException(e);
            } finally {
                await(count, partitionerTasks);
                stopWait();
                processPartitionerTasks(partitionerTasks, executionException);
            }
        }
    }

    /**
     * Creates a task for {@code partitioner} and hands it to the executor.
     *
     * <p>The task is recorded and counted only after {@code execute} returned, so a task rejected
     * by the executor (which throws) is neither tracked nor awaited. As a consequence a fast task
     * may decrement {@code count} before it is incremented here; {@link #await} tolerates that
     * because it only waits while the count is positive.
     */
    private void createAndExecute(GeneralExecuteIface iface, CountDownLatchInjection testCountDownLatch, AtomicInteger count, List<PartitionerTask> partitionerTasks, Partitioner partitioner) {
        PartitionerTask partitionerTask = new PartitionerTask(this, iface, partitioner, count, testCountDownLatch);
        executor.execute(partitionerTask);
        partitionerTasks.add(partitionerTask);
        count.incrementAndGet();
    }

    /**
     * Blocks the calling thread until {@code count} is no longer positive.
     *
     * <p>While the executor state says the fragment should continue, the thread simply parks and
     * re-checks on wake-up. The first time the state says otherwise, every task is cancelled with
     * interruption; afterwards the thread keeps parking until the tasks have finished.
     *
     * <p>NOTE(review): {@link LockSupport#park()} returns immediately while the caller's interrupt
     * status is set, so this loop can spin if the waiting thread is interrupted — confirm this is
     * acceptable for the callers.
     */
    private void await(AtomicInteger count, List<PartitionerTask> partitionerTasks) {
        boolean cancelled = false;
        while (count.get() > 0) {
            if (context.getExecutorState().shouldContinue() || cancelled) {
                LockSupport.park();
                continue;
            }
            logger.warn("Cancelling fragment {} partitioner tasks...", context.getFragIdString());
            partitionerTasks.forEach(partitionerTask -> partitionerTask.cancel(true));
            cancelled = true;
        }
    }

    /** Starts the wait timer on the operator stats; a no-op in sequential mode. */
    private void startWait() {
        if (enableParallelTaskExecution) {
            stats.startWait();
        }
    }

    /** Stops the wait timer on the operator stats; a no-op in sequential mode. */
    private void stopWait() {
        if (enableParallelTaskExecution) {
            stats.stopWait();
        }
    }

    /**
     * Collects task failures and, when there are none, merges each task's stats into the
     * operator stats and adjusts the operator's wait time.
     *
     * <p>The first failure (or the one passed in) becomes the exception that is thrown; the causes
     * of later failures are attached to its cause as suppressed exceptions. Once any failure has
     * been seen, stats of the remaining tasks are no longer merged.
     *
     * <p>In parallel mode the wait time is reduced by the largest processing time among the tasks;
     * in sequential mode it is increased by the sum of the tasks' wait times.
     *
     * @param partitionerTasks tasks that were submitted
     * @param executionException failure detected before the tasks were processed, or {@code null}
     * @throws ExecutionException the first failure, if any
     */
    private void processPartitionerTasks(List<PartitionerTask> partitionerTasks, ExecutionException executionException) throws ExecutionException {
        long maxProcessTime = 0L;
        for (PartitionerTask partitionerTask : partitionerTasks) {
            ExecutionException e = partitionerTask.getException();
            if (e != null) {
                if (executionException == null) {
                    executionException = e;
                } else {
                    Throwable primaryCause = executionException.getCause();
                    Throwable secondaryCause = e.getCause();
                    // Throwable.addSuppressed rejects self-suppression with an
                    // IllegalArgumentException, which would hide the real failure if two tasks
                    // reported the very same throwable instance.
                    if (secondaryCause != primaryCause) {
                        primaryCause.addSuppressed(secondaryCause);
                    }
                }
            }
            if (executionException != null) {
                continue;
            }
            OperatorStats localStats = partitionerTask.getStats();
            if (enableParallelTaskExecution) {
                maxProcessTime = Math.max(maxProcessTime, localStats.getProcessingNanos());
            } else {
                maxProcessTime += localStats.getWaitNanos();
            }
            stats.mergeMetrics(localStats);
        }
        if (executionException != null) {
            throw executionException;
        }
        if (enableParallelTaskExecution) {
            stats.adjustWaitNanos(-maxProcessTime);
        } else {
            stats.adjustWaitNanos(maxProcessTime);
        }
    }

    /** Operation that forwards a fixed incoming batch to {@link Partitioner#partitionBatch}. */
    private static class PartitionBatchHandlingClass implements GeneralExecuteIface {
        private final RecordBatch incoming;

        PartitionBatchHandlingClass(RecordBatch incoming) {
            this.incoming = incoming;
        }

        @Override
        public void execute(Partitioner part) throws IOException {
            part.partitionBatch(incoming);
        }
    }

    /** Operation applied to a single partitioner by a {@link PartitionerTask}. */
    protected interface GeneralExecuteIface {
        /**
         * Runs the operation against one partitioner.
         *
         * @param partitioner partitioner to operate on
         * @throws IOException if the underlying partitioner call fails with one
         */
        void execute(Partitioner partitioner) throws IOException;
    }

    /** Operation that forwards fixed flags to {@link Partitioner#flushOutgoingBatches}. */
    private static class FlushBatchesHandlingClass implements GeneralExecuteIface {
        private final boolean isLastBatch;
        private final boolean schemaChanged;

        public FlushBatchesHandlingClass(boolean isLastBatch, boolean schemaChanged) {
            this.isLastBatch = isLastBatch;
            this.schemaChanged = schemaChanged;
        }

        @Override
        public void execute(Partitioner part) throws IOException {
            part.flushOutgoingBatches(isLastBatch, schemaChanged);
        }
    }

    /**
     * Runnable that applies one {@link GeneralExecuteIface} to one {@link Partitioner} and
     * records the outcome.
     *
     * <p>{@code runner} decides who owns the task: the executing thread claims it in
     * {@link #run()}, or the decorator thread claims it in {@link #cancel(boolean)} before it
     * started, in which case a later {@code run()} does nothing. {@code state} tracks the outcome
     * and coordinates interruption between {@code run()} and {@code cancel()}.
     */
    private static class PartitionerTask implements Runnable {
        private final AtomicReference<STATE> state = new AtomicReference<>(STATE.NEW);
        private final AtomicReference<Thread> runner = new AtomicReference<>();
        private final PartitionerDecorator partitionerDecorator;
        private final AtomicInteger count;
        private final GeneralExecuteIface iface;
        private final Partitioner partitioner;
        private final CountDownLatchInjection testCountDownLatch;
        private volatile ExecutionException exception;

        public PartitionerTask(PartitionerDecorator partitionerDecorator, GeneralExecuteIface iface, Partitioner partitioner, AtomicInteger count, CountDownLatchInjection testCountDownLatch) {
            this.partitionerDecorator = partitionerDecorator;
            this.iface = iface;
            this.partitioner = partitioner;
            this.count = count;
            this.testCountDownLatch = testCountDownLatch;
        }

        /**
         * Executes the operation unless the task was already claimed by {@link #cancel(boolean)}.
         *
         * <p>The executing thread is temporarily renamed, the partitioner's stats are cleared and
         * processing time is started. After waiting on the latch the operation runs only if the
         * state is still {@code NEW}. Any throwable other than {@link InterruptedException} is
         * captured as an {@link ExecutionException}. On exit the shared count is decremented and
         * the decorator thread is unparked when the count reaches exactly zero.
         */
        @Override
        public void run() {
            Thread thread = Thread.currentThread();
            if (!runner.compareAndSet(null, thread)) {
                // Already claimed (cancelled before start, or being run elsewhere).
                return;
            }
            String name = thread.getName();
            thread.setName(String.format("Partitioner-%s-%d", partitionerDecorator.thread.getName(), thread.getId()));
            OperatorStats localStats = partitioner.getStats();
            localStats.clear();
            localStats.startProcessing();
            ExecutionException executionException = null;
            try {
                testCountDownLatch.await();
                if (state.get() == STATE.NEW) {
                    iface.execute(partitioner);
                }
            } catch (InterruptedException e) {
                // Log only when the interrupt did not come through cancel(), which would
                // already have moved the state away from NEW.
                if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTED)) {
                    logger.warn("Partitioner Task interrupted during the run", e);
                }
            } catch (Throwable t) {
                executionException = new ExecutionException(t);
            } finally {
                // Publish the outcome only if nobody interrupted or cancelled the task meanwhile.
                if (state.compareAndSet(STATE.NEW, STATE.COMPLETING)) {
                    if (executionException == null) {
                        localStats.stopProcessing();
                        state.lazySet(STATE.NORMAL);
                    } else {
                        exception = executionException;
                        state.lazySet(STATE.EXCEPTIONAL);
                    }
                }
                if (count.decrementAndGet() == 0) {
                    LockSupport.unpark(partitionerDecorator.thread);
                }
                thread.setName(name);
                // cancel() sets INTERRUPTING before calling interrupt() and INTERRUPTED after;
                // wait for that window to close so the interrupt lands on this task's run.
                while (state.get() == STATE.INTERRUPTING) {
                    Thread.yield();
                }
                // Clear the interrupt status before the thread leaves this task.
                Thread.interrupted();
            }
        }

        /**
         * Cancels this task; must be called from the decorator's thread.
         *
         * <p>If the task has not started, it is claimed so that {@link #run()} becomes a no-op,
         * removed from the executor when that is a {@link ThreadPoolExecutor}, and the shared
         * count is decremented on its behalf. If it has started, it is interrupted when
         * {@code mayInterruptIfRunning} is {@code true}; otherwise its state is moved from
         * {@code NEW} to {@code CANCELLED} if still possible.
         *
         * @param mayInterruptIfRunning whether a task that already started should be interrupted
         * @throws IllegalStateException if called from a thread other than the decorator's thread
         */
        void cancel(boolean mayInterruptIfRunning) {
            // Template overload: the message is only formatted when the check fails.
            Preconditions.checkState(Thread.currentThread() == partitionerDecorator.thread,
                "PartitionerTask can be cancelled only from the main %s thread", partitionerDecorator.thread.getName());
            if (runner.compareAndSet(null, partitionerDecorator.thread)) {
                if (partitionerDecorator.executor instanceof ThreadPoolExecutor) {
                    ((ThreadPoolExecutor) partitionerDecorator.executor).remove(this);
                }
                count.decrementAndGet();
            } else if (mayInterruptIfRunning) {
                if (state.compareAndSet(STATE.NEW, STATE.INTERRUPTING)) {
                    try {
                        runner.get().interrupt();
                    } finally {
                        state.lazySet(STATE.INTERRUPTED);
                    }
                }
            } else {
                state.compareAndSet(STATE.NEW, STATE.CANCELLED);
            }
        }

        /** Returns the failure captured by {@link #run()}, or {@code null} if none was recorded. */
        public ExecutionException getException() {
            return exception;
        }

        /** Returns the stats object of the partitioner this task operates on. */
        public OperatorStats getStats() {
            return partitioner.getStats();
        }

        /** Lifecycle states of a task; every task starts in {@code NEW}. */
        private enum STATE {
            NEW,
            COMPLETING,
            NORMAL,
            EXCEPTIONAL,
            CANCELLED,
            INTERRUPTING,
            INTERRUPTED
        }
    }
}

