/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.ImmutableList;
import org.apache.beam.repackaged.beam_runners_direct_java.com.google.common.collect.Iterables;
import org.apache.beam.repackaged.beam_runners_direct_java.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard;
import org.apache.beam.runners.direct.CommittedBundle;
import org.apache.beam.runners.direct.EvaluationContext;
import org.apache.beam.runners.direct.RootInputProvider;
import org.apache.beam.runners.direct.SourceShard;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.TransformEvaluator;
import org.apache.beam.runners.direct.TransformEvaluatorFactory;
import org.apache.beam.runners.direct.TransformResult;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.runners.direct.UncommittedBundle;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

class UnboundedReadEvaluatorFactory
implements TransformEvaluatorFactory {
    // Default reader-reuse chance; the two-argument constructor supplies this same value (0.95).
    private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
    // Context handed to every evaluator this factory creates.
    private final EvaluationContext evaluationContext;
    // Pipeline options handed to every evaluator this factory creates.
    private final PipelineOptions options;
    // Threshold forwarded to each evaluator: after a successful read the evaluator closes its
    // reader when a random draw from [0, 1) is greater than or equal to this value.
    private final double readerReuseChance;

    /**
     * Creates a factory whose evaluators use the default reader-reuse chance,
     * {@link #DEFAULT_READER_REUSE_CHANCE}.
     *
     * @param evaluationContext context handed to every evaluator this factory creates
     * @param options pipeline options handed to every evaluator this factory creates
     */
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options) {
        // Reference the named constant instead of repeating its literal value.
        this(evaluationContext, options, DEFAULT_READER_REUSE_CHANCE);
    }

    /**
     * Creates a factory with an explicit reader-reuse chance.
     *
     * @param evaluationContext context handed to every evaluator this factory creates
     * @param options pipeline options handed to every evaluator this factory creates
     * @param readerReuseChance threshold forwarded to each evaluator; an evaluator keeps its
     *     reader open after a read only when a random draw from [0, 1) is below this value
     */
    @VisibleForTesting
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions options, double readerReuseChance) {
        this.readerReuseChance = readerReuseChance;
        this.options = options;
        this.evaluationContext = evaluationContext;
    }

    /**
     * Returns a new evaluator for the given application by delegating to
     * {@code createEvaluator}. The input bundle is not consulted.
     *
     * @param application the applied transform to evaluate
     * @param inputBundle unused by this implementation
     * @return a newly created evaluator
     */
    @Override
    @Nullable
    // The wildcard-typed application cannot be passed to the fully-typed helper without a raw
    // cast. NOTE(review): presumably only unbounded-read applications are routed to this
    // factory -- confirm at the registration site.
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle) {
        return this.createEvaluator((AppliedPTransform) application);
    }

    /**
     * Builds an {@code UnboundedReadEvaluator} for the given unbounded-read application, passing
     * along this factory's evaluation context, pipeline options and reader-reuse chance.
     *
     * @param application the applied unbounded-read transform
     * @return a new evaluator bound to {@code application}
     */
    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> application) {
        // Diamond instead of a raw-type instantiation so the element type is checked.
        return new UnboundedReadEvaluator<>(application, this.evaluationContext, this.options, this.readerReuseChance);
    }

    /** No-op: this implementation performs no cleanup work. */
    @Override
    public void cleanup() {
        // Nothing to do.
    }

    static class InputProvider<T>
    implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> {
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;

        InputProvider(EvaluationContext evaluationContext, PipelineOptions options) {
            this.evaluationContext = evaluationContext;
            this.options = options;
        }

        @Override
        public Collection<CommittedBundle<UnboundedSourceShard<T, ?>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> transform, int targetParallelism) throws Exception {
            UnboundedSource source = ReadTranslation.unboundedSourceFromTransform(transform);
            List splits = source.split(targetParallelism, this.options);
            UnboundedReadDeduplicator deduplicator = source.requiresDeduping() ? UnboundedReadDeduplicator.CachedIdDeduplicator.create() : UnboundedReadDeduplicator.NeverDeduplicator.create();
            ImmutableList.Builder initialShards = ImmutableList.builder();
            for (UnboundedSource split : splits) {
                UnboundedSourceShard shard = UnboundedSourceShard.unstarted(split, deduplicator);
                initialShards.add(this.evaluationContext.createRootBundle().add(WindowedValue.valueInGlobalWindow(shard)).commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return initialShards.build();
        }
    }

    /**
     * Value type describing one shard of an unbounded source: the source itself, the
     * deduplicator to consult, and optionally an already-open reader and the last checkpoint.
     *
     * <p>The abstract accessors are declared in the same order as the arguments passed to the
     * generated constructor in {@link #of}; keep the two in sync.
     *
     * @param <T> element type produced by the source
     * @param <CheckpointT> checkpoint mark type of the source
     */
    @AutoValue
    abstract static class UnboundedSourceShard<T, CheckpointT extends UnboundedSource.CheckpointMark>
    implements SourceShard<T> {
        UnboundedSourceShard() {
        }

        /** Returns the source this shard reads from. */
        public abstract UnboundedSource<T, CheckpointT> getSource();

        /** Returns the deduplicator associated with this shard. */
        abstract UnboundedReadDeduplicator getDeduplicator();

        /** Returns the reader carried by this shard, or {@code null} if there is none. */
        @Nullable
        abstract UnboundedSource.UnboundedReader<T> getExistingReader();

        /** Returns the checkpoint carried by this shard, or {@code null} if there is none. */
        @Nullable
        abstract CheckpointT getCheckpoint();

        /**
         * Creates a shard from all four components.
         *
         * @param source the source to read from
         * @param deduplicator the deduplicator to associate with the shard
         * @param reader an existing reader, or {@code null}
         * @param checkpoint an existing checkpoint, or {@code null}
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator, @Nullable UnboundedSource.UnboundedReader<T> reader, @Nullable CheckpointT checkpoint) {
            return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<>(source, deduplicator, reader, checkpoint);
        }

        /**
         * Creates a shard that has neither a reader nor a checkpoint yet.
         *
         * @param source the source to read from
         * @param deduplicator the deduplicator to associate with the shard
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(UnboundedSource<T, CheckpointT> source, UnboundedReadDeduplicator deduplicator) {
            return of(source, deduplicator, null, null);
        }
    }

    private static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark>
    implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
        // Cap on the number of reader records consumed by one processElement call
        // (the read loop stops once 10 records have been visited).
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        // The applied read transform; its single output PCollection receives the elements read.
        private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
        // Used to create output bundles and to schedule the deferred checkpoint finalization.
        private final EvaluationContext evaluationContext;
        // Passed to the source when a new reader has to be created.
        private final PipelineOptions options;
        // After a successful read the reader is closed when a random draw from [0, 1) is
        // greater than or equal to this value; otherwise it is carried over in the residual shard.
        private final double readerReuseChance;
        // Collects output bundles and unprocessed (residual) shards; built in finishBundle().
        private final StepTransformResult.Builder resultBuilder;

        /**
         * Creates an evaluator for one applied unbounded read.
         *
         * @param transform the applied read transform being evaluated
         * @param evaluationContext context used for bundle creation and callbacks
         * @param options pipeline options used when a reader must be created
         * @param readerReuseChance threshold below which a random draw keeps the reader open
         */
        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> transform, EvaluationContext evaluationContext, PipelineOptions options, double readerReuseChance) {
            // The result builder is created without a watermark hold for this transform.
            this.resultBuilder = StepTransformResult.withoutHold(transform);
            this.readerReuseChance = readerReuseChance;
            this.options = options;
            this.evaluationContext = evaluationContext;
            this.transform = transform;
        }

        /**
         * Reads a bounded batch of records from the shard carried by {@code element}.
         *
         * <p>Three outcomes are possible:
         * <ul>
         *   <li>Records are available: up to {@code ARBITRARY_MAX_ELEMENTS} records are visited,
         *       those accepted by the shard's deduplicator are added to a fresh output bundle,
         *       the read is checkpointed via {@code finishRead}, and a residual shard is
         *       registered as unprocessed at the reader's watermark. The reader is closed first
         *       when a random draw from [0, 1) is at least {@code readerReuseChance}; the
         *       residual then carries no reader.</li>
         *   <li>No record is available and the watermark is before
         *       {@code BoundedWindow.TIMESTAMP_MAX_VALUE}: the shard is re-registered as
         *       unprocessed with the same reader and checkpoint.</li>
         *   <li>No record is available and the watermark has reached the maximum: the shard's
         *       checkpoint (if any) is finalized and the reader is closed; nothing is
         *       re-registered.</li>
         * </ul>
         *
         * @param element the windowed shard to read from
         * @throws IOException if reading, checkpoint finalization or closing the reader fails;
         *     on such a failure the reader is closed before rethrowing unless it was already
         *     handed to {@code close()}
         */
        @Override
        public void processElement(WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> element) throws IOException {
            @SuppressWarnings("unchecked")
            UncommittedBundle<OutputT> output = this.evaluationContext.createBundle((PCollection<OutputT>) Iterables.getOnlyElement(this.transform.getOutputs().values()));
            UnboundedSourceShard<OutputT, CheckpointMarkT> shard = element.getValue();
            UnboundedSource.UnboundedReader<OutputT> reader = null;
            try {
                reader = this.getReader(shard);
                boolean elementAvailable = this.startReader(reader, shard);
                if (elementAvailable) {
                    UnboundedReadDeduplicator deduplicator = shard.getDeduplicator();
                    int numElements = 0;
                    do {
                        if (deduplicator.shouldOutput(reader.getCurrentRecordId())) {
                            output.add(WindowedValue.timestampedValueInGlobalWindow(reader.getCurrent(), reader.getCurrentTimestamp()));
                        }
                        // Records rejected by the deduplicator still count toward the cap.
                        numElements++;
                    } while (numElements < ARBITRARY_MAX_ELEMENTS && reader.advance());

                    Instant watermark = reader.getWatermark();
                    CheckpointMarkT finishedCheckpoint = this.finishRead(reader, watermark, shard);
                    if (ThreadLocalRandom.current().nextDouble(1.0) >= this.readerReuseChance) {
                        // Null the field before closing so the catch block below cannot close
                        // the same reader a second time if close() throws.
                        UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                        reader = null;
                        toClose.close();
                    }
                    UnboundedSourceShard<OutputT, CheckpointMarkT> residual = UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, finishedCheckpoint);
                    this.resultBuilder
                            .addOutput(output)
                            .addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(residual, watermark)));
                } else {
                    Instant watermark = reader.getWatermark();
                    if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                        // Nothing to read right now, but the shard is not finished: retry later
                        // with the same reader and checkpoint.
                        UnboundedSourceShard<OutputT, CheckpointMarkT> retry = UnboundedSourceShard.of(shard.getSource(), shard.getDeduplicator(), reader, shard.getCheckpoint());
                        this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(retry, watermark)));
                    } else {
                        // Watermark at the maximum: finalize the previous checkpoint (may be
                        // absent) and close the reader.
                        CheckpointMarkT checkpoint = shard.getCheckpoint();
                        IOException ioe = null;
                        try {
                            if (checkpoint != null) {
                                checkpoint.finalizeCheckpoint();
                            }
                        } catch (IOException finalizeCheckpointException) {
                            ioe = finalizeCheckpointException;
                        } finally {
                            try {
                                // Null first to avoid a second close in the outer catch.
                                UnboundedSource.UnboundedReader<OutputT> toClose = reader;
                                reader = null;
                                toClose.close();
                            } catch (IOException closeEx) {
                                if (ioe != null) {
                                    // The finalize failure is the primary error; keep the close
                                    // failure attached to it instead of replacing it.
                                    ioe.addSuppressed(closeEx);
                                } else {
                                    throw closeEx;
                                }
                            }
                        }
                        if (ioe != null) {
                            throw ioe;
                        }
                    }
                }
            } catch (IOException e) {
                if (reader != null) {
                    reader.close();
                }
                throw e;
            }
        }

        /**
         * Returns the reader to use for {@code shard}: the shard's existing reader when it has
         * one, otherwise a reader newly created from the shard's source.
         *
         * <p>When a new reader is created and the shard carries a checkpoint, the checkpoint is
         * first cloned through the source's checkpoint-mark coder and the clone is passed to
         * {@code createReader} (presumably so the reader does not share the shard's checkpoint
         * instance -- confirm against the source contract).
         *
         * @param shard the shard to obtain a reader for
         * @return the existing reader or a newly created one
         * @throws IOException if cloning the checkpoint or creating the reader fails
         */
        private UnboundedSource.UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            UnboundedSource.UnboundedReader<OutputT> existing = shard.getExistingReader();
            if (existing != null) {
                return existing;
            }
            // Typed as CheckpointMarkT (not Object) so it can be passed to createReader.
            CheckpointMarkT checkpoint = shard.getCheckpoint();
            if (checkpoint != null) {
                checkpoint = CoderUtils.clone(shard.getSource().getCheckpointMarkCoder(), checkpoint);
            }
            return shard.getSource().createReader(this.options, checkpoint);
        }

        /**
         * Positions the reader on its next record.
         *
         * <p>If the shard carries no existing reader, {@code reader} is started; otherwise the
         * shard's existing reader is advanced.
         *
         * @param reader the reader to start when the shard has no existing reader
         * @param shard the shard being read
         * @return {@code true} if a record is available
         * @throws IOException if starting or advancing fails
         */
        private boolean startReader(UnboundedSource.UnboundedReader<OutputT> reader, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            UnboundedSource.UnboundedReader<OutputT> carriedOver = shard.getExistingReader();
            return carriedOver == null ? reader.start() : carriedOver.advance();
        }

        /**
         * Takes a new checkpoint from {@code reader} and finalizes the shard's previous one.
         *
         * <p>When {@code watermark} is not before {@code BoundedWindow.TIMESTAMP_MAX_VALUE}, a
         * callback is additionally scheduled through the evaluation context that finalizes the
         * new checkpoint.
         *
         * @param reader the reader that was just read from
         * @param watermark the reader's watermark after the read
         * @param shard the shard that was read; its checkpoint, if any, is finalized
         * @return the checkpoint mark obtained from {@code reader}
         * @throws IOException if finalizing the previous checkpoint fails
         */
        private CheckpointMarkT finishRead(UnboundedSource.UnboundedReader<OutputT> reader, Instant watermark, UnboundedSourceShard<OutputT, CheckpointMarkT> shard) throws IOException {
            CheckpointMarkT previousMark = shard.getCheckpoint();
            @SuppressWarnings("unchecked")
            CheckpointMarkT newMark = (CheckpointMarkT) reader.getCheckpointMark();
            if (previousMark != null) {
                previousMark.finalizeCheckpoint();
            }
            if (watermark.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                return newMark;
            }
            // Watermark has reached the maximum: schedule finalization of the new mark too.
            PCollection<?> outputPc = (PCollection<?>) Iterables.getOnlyElement(this.transform.getOutputs().values());
            Runnable finalizeNewMark = () -> {
                try {
                    newMark.finalizeCheckpoint();
                } catch (IOException e) {
                    throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                }
            };
            this.evaluationContext.scheduleAfterOutputWouldBeProduced(outputPc, GlobalWindow.INSTANCE, outputPc.getWindowingStrategy(), finalizeNewMark);
            return newMark;
        }

        /**
         * Builds and returns the result accumulated by {@code processElement}.
         *
         * @return the transform result produced by the result builder
         */
        @Override
        public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() throws IOException {
            TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> result = this.resultBuilder.build();
            return result;
        }
    }
}

