/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.UnconsumedReads;
import org.apache.beam.runners.core.construction.WriteFilesTranslation;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineTranslator;
import org.apache.beam.runners.flink.FlinkStreamingTransformTranslators;
import org.apache.beam.runners.flink.FlinkStreamingTranslationContext;
import org.apache.beam.runners.flink.translation.wrappers.streaming.FlinkKeyUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.ShardedKeyCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.ShardingFunction;
import org.apache.beam.sdk.io.WriteFiles;
import org.apache.beam.sdk.io.WriteFilesResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformMatcher;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
import org.apache.beam.sdk.runners.TransformHierarchy;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.ShardedKey;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkStreamingPipelineTranslator
extends FlinkPipelineTranslator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkStreamingPipelineTranslator.class);
    private final FlinkStreamingTranslationContext streamingContext;
    private int depth = 0;

    public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) {
        this.streamingContext = new FlinkStreamingTranslationContext(env, options);
    }

    @Override
    public void translate(Pipeline pipeline) {
        UnconsumedReads.ensureAllReadsConsumed((Pipeline)pipeline);
        super.translate(pipeline);
    }

    public Pipeline.PipelineVisitor.CompositeBehavior enterCompositeTransform(TransformHierarchy.Node node) {
        StreamTransformTranslator<?> translator;
        LOG.info("{} enterCompositeTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
        ++this.depth;
        PTransform transform = node.getTransform();
        if (transform != null && (translator = FlinkStreamingTransformTranslators.getTranslator(transform)) != null && this.applyCanTranslate(transform, node, translator)) {
            this.applyStreamingTransform(transform, node, translator);
            LOG.info("{} translated- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
            return Pipeline.PipelineVisitor.CompositeBehavior.DO_NOT_ENTER_TRANSFORM;
        }
        return Pipeline.PipelineVisitor.CompositeBehavior.ENTER_TRANSFORM;
    }

    public void leaveCompositeTransform(TransformHierarchy.Node node) {
        --this.depth;
        LOG.info("{} leaveCompositeTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
    }

    public void visitPrimitiveTransform(TransformHierarchy.Node node) {
        LOG.info("{} visitPrimitiveTransform- {}", (Object)FlinkStreamingPipelineTranslator.genSpaces(this.depth), (Object)node.getFullName());
        PTransform transform = node.getTransform();
        StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform);
        if (translator == null || !this.applyCanTranslate(transform, node, translator)) {
            String transformUrn = PTransformTranslation.urnForTransform((PTransform)transform);
            LOG.info(transformUrn);
            throw new UnsupportedOperationException("The transform " + transformUrn + " is currently not supported.");
        }
        this.applyStreamingTransform(transform, node, translator);
    }

    public void visitValue(PValue value, TransformHierarchy.Node producer) {
    }

    private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformHierarchy.Node node, StreamTransformTranslator<?> translator) {
        PTransform<?, ?> typedTransform = transform;
        StreamTransformTranslator<?> typedTranslator = translator;
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(this.getPipeline()));
        typedTranslator.translateNode(typedTransform, this.streamingContext);
    }

    private <T extends PTransform<?, ?>> boolean applyCanTranslate(PTransform<?, ?> transform, TransformHierarchy.Node node, StreamTransformTranslator<?> translator) {
        PTransform<?, ?> typedTransform = transform;
        StreamTransformTranslator<?> typedTranslator = translator;
        this.streamingContext.setCurrentTransform(node.toAppliedPTransform(this.getPipeline()));
        return typedTranslator.canTranslate(typedTransform, this.streamingContext);
    }

    @VisibleForTesting
    static class FlinkAutoBalancedShardKeyShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT> {
        @VisibleForTesting
        static final int CACHE_MAX_SIZE = 100;
        private static final long CACHE_EXPIRE_SECONDS = 600L;
        private final int parallelism;
        private final int maxParallelism;
        private final Coder<DestinationT> destinationCoder;
        private final ShardedKeyCoder<Integer> shardedKeyCoder = ShardedKeyCoder.of((Coder)VarIntCoder.of());
        private transient Cache<Integer, Map<Integer, ShardedKey<Integer>>> cache;
        private int shardNumber = -1;

        @VisibleForTesting
        Map<Integer, Map<Integer, ShardedKey<Integer>>> getCache() {
            return this.cache == null ? null : this.cache.asMap();
        }

        @VisibleForTesting
        int getMaxParallelism() {
            return this.maxParallelism;
        }

        FlinkAutoBalancedShardKeyShardingFunction(int parallelism, int maxParallelism, Coder<DestinationT> destinationCoder) {
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism > 0 ? maxParallelism : KeyGroupRangeAssignment.computeDefaultMaxParallelism((int)parallelism);
            this.destinationCoder = destinationCoder;
        }

        public ShardedKey<Integer> assignShardKey(DestinationT destination, UserT element, int shardCount) throws Exception {
            this.shardNumber = this.shardNumber == -1 ? ThreadLocalRandom.current().nextInt(shardCount) : (this.shardNumber + 1) % shardCount;
            int destinationKey = Arrays.hashCode(CoderUtils.encodeToByteArray(this.destinationCoder, destination));
            if (this.cache == null) {
                this.cache = CacheBuilder.newBuilder().maximumSize(100L).expireAfterAccess(600L, TimeUnit.SECONDS).build();
            }
            if (this.cache.getIfPresent((Object)destinationKey) == null) {
                this.cache.put((Object)destinationKey, this.generateShardedKeys(destinationKey, shardCount));
            }
            return (ShardedKey)((Map)this.cache.getIfPresent((Object)destinationKey)).get(this.shardNumber);
        }

        private Map<Integer, ShardedKey<Integer>> generateShardedKeys(int key, int shardCount) {
            HashMap<Integer, ShardedKey<Integer>> shardedKeys = new HashMap<Integer, ShardedKey<Integer>>();
            for (int shard = 0; shard < shardCount; ++shard) {
                int targetPartition;
                ShardedKey shk;
                ByteBuffer effectiveKey;
                int partition;
                int salt = -1;
                do {
                    if (salt++ == Integer.MAX_VALUE) {
                        throw new RuntimeException("Failed to find sharded key in [ 2147483647 ] iterations");
                    }
                    shk = ShardedKey.of((Object)Objects.hash(key, salt), (int)shard);
                    targetPartition = shard % this.parallelism;
                } while ((partition = KeyGroupRangeAssignment.assignKeyToParallelOperator((Object)(effectiveKey = FlinkKeyUtils.encodeKey(shk, this.shardedKeyCoder)), (int)this.maxParallelism, (int)this.parallelism)) != targetPartition);
                shardedKeys.put(shard, (ShardedKey<Integer>)shk);
            }
            return shardedKeys;
        }
    }

    @VisibleForTesting
    static class StreamingShardedWriteFactory<UserT, DestinationT, OutputT>
    implements PTransformOverrideFactory<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> {
        FlinkPipelineOptions options;

        static PTransformMatcher writeFilesNeedsOverrides() {
            return application -> {
                if (PTransformTranslation.WRITE_FILES_TRANSFORM_URN.equals(PTransformTranslation.urnForTransformOrNull((PTransform)application.getTransform()))) {
                    try {
                        FlinkPipelineOptions options = (FlinkPipelineOptions)application.getPipeline().getOptions().as(FlinkPipelineOptions.class);
                        ShardingFunction shardingFn = ((WriteFiles)application.getTransform()).getShardingFunction();
                        return WriteFilesTranslation.isRunnerDeterminedSharding((AppliedPTransform)application) || options.isAutoBalanceWriteFilesShardingEnabled() != false && shardingFn == null;
                    }
                    catch (IOException exc) {
                        throw new RuntimeException(String.format("Transform with URN %s failed to parse: %s", PTransformTranslation.WRITE_FILES_TRANSFORM_URN, application.getTransform()), exc);
                    }
                }
                return false;
            };
        }

        StreamingShardedWriteFactory(PipelineOptions options) {
            this.options = (FlinkPipelineOptions)options.as(FlinkPipelineOptions.class);
        }

        public PTransformOverrideFactory.PTransformReplacement<PCollection<UserT>, WriteFilesResult<DestinationT>> getReplacementTransform(AppliedPTransform<PCollection<UserT>, WriteFilesResult<DestinationT>, WriteFiles<UserT, DestinationT, OutputT>> transform) {
            Integer jobParallelism = this.options.getParallelism();
            Preconditions.checkArgument((jobParallelism > 0 ? 1 : 0) != 0, (String)"Parallelism of a job should be greater than 0. Currently set: %s", (Object[])new Object[]{jobParallelism});
            int numShards = jobParallelism * 2;
            try {
                List sideInputs = WriteFilesTranslation.getDynamicDestinationSideInputs(transform);
                FileBasedSink sink = WriteFilesTranslation.getSink(transform);
                WriteFiles replacement = WriteFiles.to((FileBasedSink)sink).withSideInputs(sideInputs);
                if (WriteFilesTranslation.isWindowedWrites(transform)) {
                    replacement = replacement.withWindowedWrites();
                }
                if (WriteFilesTranslation.isRunnerDeterminedSharding(transform)) {
                    replacement = replacement.withNumShards(numShards);
                } else {
                    if (((WriteFiles)transform.getTransform()).getNumShardsProvider() != null) {
                        replacement = replacement.withNumShards(((WriteFiles)transform.getTransform()).getNumShardsProvider());
                    }
                    if (((WriteFiles)transform.getTransform()).getComputeNumShards() != null) {
                        replacement = replacement.withSharding(((WriteFiles)transform.getTransform()).getComputeNumShards());
                    }
                }
                if (this.options.isAutoBalanceWriteFilesShardingEnabled().booleanValue()) {
                    replacement = replacement.withShardingFunction(new FlinkAutoBalancedShardKeyShardingFunction(jobParallelism, this.options.getMaxParallelism(), sink.getDynamicDestinations().getDestinationCoder()));
                }
                return PTransformOverrideFactory.PTransformReplacement.of((PInput)PTransformReplacements.getSingletonMainInput(transform), (PTransform)replacement);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        public Map<PValue, PTransformOverrideFactory.ReplacementOutput> mapOutputs(Map<TupleTag<?>, PValue> outputs, WriteFilesResult<DestinationT> newOutput) {
            return ReplacementOutputs.tagged(outputs, newOutput);
        }
    }

    static abstract class StreamTransformTranslator<T extends PTransform> {
        StreamTransformTranslator() {
        }

        abstract void translateNode(T var1, FlinkStreamingTranslationContext var2);

        boolean canTranslate(T transform, FlinkStreamingTranslationContext context) {
            return true;
        }
    }
}

