/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import org.apache.beam.runners.core.construction.CreatePCollectionViewTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.ParDoTranslation;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.flink.FlinkBatchPipelineTranslator;
import org.apache.beam.runners.flink.FlinkBatchTranslationContext;
import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows;
import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction;
import org.apache.beam.runners.flink.translation.functions.FlinkStatefulDoFnFunction;
import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
import org.apache.beam.runners.flink.translation.types.KvKeySelector;
import org.apache.beam.runners.flink.translation.wrappers.ImpulseInputFormat;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.join.RawUnionValue;
import org.apache.beam.sdk.transforms.join.UnionCoder;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.PValue;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.GroupCombineOperator;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.Grouping;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.api.java.operators.SingleInputUdfOperator;
import org.apache.flink.api.java.operators.UnionOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;

class FlinkBatchTransformTranslators {
    private static final Map<String, FlinkBatchPipelineTranslator.BatchTransformTranslator> TRANSLATORS = new HashMap<String, FlinkBatchPipelineTranslator.BatchTransformTranslator>();

    static FlinkBatchPipelineTranslator.BatchTransformTranslator<?> getTranslator(PTransform<?, ?> transform) {
        String urn = PTransformTranslation.urnForTransformOrNull(transform);
        return urn == null ? null : TRANSLATORS.get(urn);
    }

    private static String getCurrentTransformName(FlinkBatchTranslationContext context) {
        return context.getCurrentTransform().getFullName();
    }

    private static void transformSideInputs(List<PCollectionView<?>> sideInputs, SingleInputUdfOperator<?, ?, ?> outputDataSet, FlinkBatchTranslationContext context) {
        for (PCollectionView<?> input : sideInputs) {
            DataSet broadcastSet = context.getSideInputDataSet(input);
            outputDataSet.withBroadcastSet(broadcastSet, input.getTagInternal().getId());
        }
    }

    private FlinkBatchTransformTranslators() {
    }

    static {
        TRANSLATORS.put(PTransformTranslation.IMPULSE_TRANSFORM_URN, new ImpulseTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.CREATE_VIEW_TRANSFORM_URN, new CreatePCollectionViewTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.COMBINE_PER_KEY_TRANSFORM_URN, new CombinePerKeyTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN, new GroupByKeyTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.RESHUFFLE_URN, new ReshuffleTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.FLATTEN_TRANSFORM_URN, new FlattenPCollectionTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.ASSIGN_WINDOWS_TRANSFORM_URN, new WindowAssignTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.PAR_DO_TRANSFORM_URN, new ParDoTranslatorBatch());
        TRANSLATORS.put(PTransformTranslation.READ_TRANSFORM_URN, new ReadSourceTranslatorBatch());
    }

    private static class CreatePCollectionViewTranslatorBatch<ElemT, ViewT>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollection<ElemT>, PCollection<ElemT>>> {
        private CreatePCollectionViewTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PCollection<ElemT>, PCollection<ElemT>> transform, FlinkBatchTranslationContext context) {
            PCollectionView input;
            DataSet inputDataSet = context.getInputDataSet((PValue)context.getInput(transform));
            AppliedPTransform<?, ?, ?> application = context.getCurrentTransform();
            try {
                input = CreatePCollectionViewTranslation.getView(application);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            context.setSideInputDataSet(input, inputDataSet);
        }
    }

    private static class FlattenPCollectionTranslatorBatch<T>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollectionList<T>, PCollection<T>>> {
        private FlattenPCollectionTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PCollectionList<T>, PCollection<T>> transform, FlinkBatchTranslationContext context) {
            Map<TupleTag<?>, PValue> allInputs = context.getInputs(transform);
            UnionOperator result = null;
            if (allInputs.isEmpty()) {
                DataSource dummySource = context.getExecutionEnvironment().fromElements((Object[])new String[]{"dummy"});
                result = dummySource.flatMap((FlatMapFunction & Serializable)(s, collector) -> {}).returns(new CoderTypeInformation(WindowedValue.getFullCoder((Coder)VoidCoder.of(), (Coder)GlobalWindow.Coder.INSTANCE)));
            } else {
                for (PValue taggedPc : allInputs.values()) {
                    Preconditions.checkArgument((boolean)(taggedPc instanceof PCollection), (String)"Got non-PCollection input to flatten: %s of type %s", (Object)taggedPc, (Object)taggedPc.getClass().getSimpleName());
                    PCollection collection = (PCollection)taggedPc;
                    UnionOperator current = context.getInputDataSet((PValue)collection);
                    if (result == null) {
                        result = current;
                        continue;
                    }
                    result = result.union(current);
                }
            }
            result = result.filter((FilterFunction & Serializable)tWindowedValue -> true).name("UnionFixFilter");
            context.setOutputDataSet((PValue)context.getOutput(transform), result);
        }
    }

    private static class ParDoTranslatorBatch<InputT, OutputT>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollection<InputT>, PCollectionTuple>> {
        private ParDoTranslatorBatch() {
        }

        /*
         * WARNING - void declaration
         */
        @Override
        public void translateNode(PTransform<PCollection<InputT>, PCollectionTuple> transform, FlinkBatchTranslationContext context) {
            MapPartitionOperator outputDataSet;
            boolean usesStateOrTimers;
            List sideInputs;
            void var12_17;
            TupleTag mainOutputTag;
            DoFn doFn;
            try {
                doFn = ParDoTranslation.getDoFn(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            Preconditions.checkState((!DoFnSignatures.signatureForDoFn((DoFn)doFn).processElement().isSplittable() ? 1 : 0) != 0, (String)"Not expected to directly translate splittable DoFn, should have been overridden: %s", (Object)doFn);
            DataSet inputDataSet = context.getInputDataSet((PValue)context.getInput(transform));
            Map<TupleTag<?>, PValue> outputs = context.getOutputs(transform);
            try {
                mainOutputTag = ParDoTranslation.getMainOutputTag(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            DoFnSchemaInformation doFnSchemaInformation = ParDoTranslation.getSchemaInformation(context.getCurrentTransform());
            Map sideInputMapping = ParDoTranslation.getSideInputMapping(context.getCurrentTransform());
            HashMap outputMap = Maps.newHashMap();
            outputMap.put(mainOutputTag, 0);
            int count = 1;
            for (TupleTag<?> tupleTag : outputs.keySet()) {
                if (outputMap.containsKey(tupleTag)) continue;
                outputMap.put(tupleTag, count++);
            }
            TreeMap indexMap = Maps.newTreeMap();
            for (Map.Entry entry : outputMap.entrySet()) {
                indexMap.put((Integer)entry.getValue(), (TupleTag)entry.getKey());
            }
            Object var12_16 = null;
            ArrayList arrayList = Lists.newArrayList();
            for (TupleTag tag : indexMap.values()) {
                PValue taggedValue = outputs.get(tag);
                Preconditions.checkState((boolean)(taggedValue instanceof PCollection), (String)"Within ParDo, got a non-PCollection output %s of type %s", (Object)taggedValue, (Object)taggedValue.getClass().getSimpleName());
                PCollection coll = (PCollection)taggedValue;
                arrayList.add(coll.getCoder());
                WindowingStrategy windowingStrategy = coll.getWindowingStrategy();
            }
            if (var12_17 == null) {
                throw new IllegalStateException("No outputs defined.");
            }
            UnionCoder unionCoder = UnionCoder.of((List)arrayList);
            CoderTypeInformation typeInformation = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)unionCoder, (Coder)var12_17.getWindowFn().windowCoder()));
            try {
                sideInputs = ParDoTranslation.getSideInputs(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            HashMap sideInputStrategies = new HashMap();
            for (PCollectionView sideInput : sideInputs) {
                sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
            }
            try {
                usesStateOrTimers = ParDoTranslation.usesStateOrTimers(context.getCurrentTransform());
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            Map<TupleTag<?>, Coder<?>> outputCoderMap = context.getOutputCoders();
            String fullName = FlinkBatchTransformTranslators.getCurrentTransformName(context);
            if (usesStateOrTimers) {
                KvCoder inputCoder = (KvCoder)context.getInput(transform).getCoder();
                FlinkStatefulDoFnFunction doFnWrapper = new FlinkStatefulDoFnFunction(doFn, fullName, (WindowingStrategy<?, ?>)var12_17, sideInputStrategies, context.getPipelineOptions(), outputMap, mainOutputTag, inputCoder, outputCoderMap, doFnSchemaInformation, sideInputMapping);
                UnsortedGrouping grouping = inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
                outputDataSet = new GroupReduceOperator((Grouping)grouping, typeInformation, doFnWrapper, fullName);
            } else {
                FlinkDoFnFunction doFnWrapper = new FlinkDoFnFunction(doFn, fullName, (WindowingStrategy<?, ?>)var12_17, sideInputStrategies, context.getPipelineOptions(), outputMap, mainOutputTag, context.getInput(transform).getCoder(), outputCoderMap, doFnSchemaInformation, sideInputMapping);
                outputDataSet = new MapPartitionOperator(inputDataSet, typeInformation, doFnWrapper, fullName);
            }
            FlinkBatchTransformTranslators.transformSideInputs(sideInputs, (SingleInputUdfOperator)outputDataSet, context);
            for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) {
                this.pruneOutput((DataSet<WindowedValue<RawUnionValue>>)outputDataSet, context, (Integer)outputMap.get(output.getKey()), (PCollection)output.getValue());
            }
        }

        private <T> void pruneOutput(DataSet<WindowedValue<RawUnionValue>> taggedDataSet, FlinkBatchTranslationContext context, int integerTag, PCollection<T> collection) {
            TypeInformation<WindowedValue<T>> outputType = context.getTypeInfo(collection);
            FlinkMultiOutputPruningFunction pruningFunction = new FlinkMultiOutputPruningFunction(integerTag);
            FlatMapOperator pruningOperator = new FlatMapOperator(taggedDataSet, outputType, pruningFunction, collection.getName());
            context.setOutputDataSet((PValue)collection, pruningOperator);
        }
    }

    private static class CombinePerKeyTranslatorBatch<K, InputT, AccumT, OutputT>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>>> {
        private CombinePerKeyTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, OutputT>>> transform, FlinkBatchTranslationContext context) {
            Coder accumulatorCoder;
            DataSet inputDataSet = context.getInputDataSet((PValue)context.getInput(transform));
            CombineFnBase.GlobalCombineFn combineFn = ((Combine.PerKey)transform).getFn();
            KvCoder inputCoder = (KvCoder)context.getInput(transform).getCoder();
            try {
                accumulatorCoder = combineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
            }
            catch (CannotProvideCoderException e) {
                throw new RuntimeException(e);
            }
            WindowingStrategy windowingStrategy = context.getInput(transform).getWindowingStrategy();
            TypeInformation partialReduceTypeInfo = context.getTypeInfo(KvCoder.of((Coder)inputCoder.getKeyCoder(), (Coder)accumulatorCoder), (WindowingStrategy<?, ?>)windowingStrategy);
            UnsortedGrouping inputGrouping = inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
            HashMap sideInputStrategies = new HashMap();
            for (PCollectionView sideInput : ((Combine.PerKey)transform).getSideInputs()) {
                sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal());
            }
            WindowingStrategy boundedStrategy = windowingStrategy;
            String fullName = FlinkBatchTransformTranslators.getCurrentTransformName(context);
            if (windowingStrategy.getWindowFn().isNonMerging()) {
                FlinkPartialReduceFunction partialReduceFunction = new FlinkPartialReduceFunction(combineFn, boundedStrategy, sideInputStrategies, context.getPipelineOptions());
                FlinkReduceFunction reduceFunction = new FlinkReduceFunction(combineFn, boundedStrategy, sideInputStrategies, context.getPipelineOptions());
                GroupCombineOperator groupCombine = new GroupCombineOperator((Grouping)inputGrouping, partialReduceTypeInfo, partialReduceFunction, "GroupCombine: " + fullName);
                FlinkBatchTransformTranslators.transformSideInputs(((Combine.PerKey)transform).getSideInputs(), (SingleInputUdfOperator)groupCombine, context);
                TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
                UnsortedGrouping intermediateGrouping = groupCombine.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
                GroupReduceOperator outputDataSet = new GroupReduceOperator((Grouping)intermediateGrouping, reduceTypeInfo, reduceFunction, fullName);
                FlinkBatchTransformTranslators.transformSideInputs(((Combine.PerKey)transform).getSideInputs(), (SingleInputUdfOperator)outputDataSet, context);
                context.setOutputDataSet((PValue)context.getOutput(transform), outputDataSet);
            } else {
                FlinkMergingNonShuffleReduceFunction reduceFunction = new FlinkMergingNonShuffleReduceFunction(combineFn, boundedStrategy, sideInputStrategies, context.getPipelineOptions());
                TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform));
                UnsortedGrouping grouping = inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
                GroupReduceOperator outputDataSet = new GroupReduceOperator((Grouping)grouping, reduceTypeInfo, reduceFunction, fullName);
                FlinkBatchTransformTranslators.transformSideInputs(((Combine.PerKey)transform).getSideInputs(), (SingleInputUdfOperator)outputDataSet, context);
                context.setOutputDataSet((PValue)context.getOutput(transform), outputDataSet);
            }
        }
    }

    private static class Concatenate<T>
    extends Combine.CombineFn<T, List<T>, List<T>> {
        private Concatenate() {
        }

        public List<T> createAccumulator() {
            return new ArrayList();
        }

        public List<T> addInput(List<T> accumulator, T input) {
            accumulator.add(input);
            return accumulator;
        }

        public List<T> mergeAccumulators(Iterable<List<T>> accumulators) {
            Object result = this.createAccumulator();
            for (List<T> accumulator : accumulators) {
                result.addAll(accumulator);
            }
            return result;
        }

        public List<T> extractOutput(List<T> accumulator) {
            return accumulator;
        }

        public Coder<List<T>> getAccumulatorCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }

        public Coder<List<T>> getDefaultOutputCoder(CoderRegistry registry, Coder<T> inputCoder) {
            return ListCoder.of(inputCoder);
        }
    }

    private static class ReshuffleTranslatorBatch<K, InputT>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Reshuffle<K, InputT>> {
        private ReshuffleTranslatorBatch() {
        }

        @Override
        public void translateNode(Reshuffle<K, InputT> transform, FlinkBatchTranslationContext context) {
            DataSet inputDataSet = context.getInputDataSet((PValue)context.getInput(transform));
            context.setOutputDataSet((PValue)context.getOutput(transform), inputDataSet.rebalance());
        }
    }

    private static class GroupByKeyTranslatorBatch<K, InputT>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>>> {
        private GroupByKeyTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform, FlinkBatchTranslationContext context) {
            Coder accumulatorCoder;
            DataSet inputDataSet = context.getInputDataSet((PValue)context.getInput(transform));
            Concatenate combineFn = new Concatenate();
            KvCoder inputCoder = (KvCoder)context.getInput(transform).getCoder();
            try {
                accumulatorCoder = combineFn.getAccumulatorCoder(context.getInput(transform).getPipeline().getCoderRegistry(), inputCoder.getValueCoder());
            }
            catch (CannotProvideCoderException e) {
                throw new RuntimeException(e);
            }
            WindowingStrategy windowingStrategy = context.getInput(transform).getWindowingStrategy();
            CoderTypeInformation partialReduceTypeInfo = new CoderTypeInformation(WindowedValue.getFullCoder((Coder)KvCoder.of((Coder)inputCoder.getKeyCoder(), (Coder)accumulatorCoder), (Coder)windowingStrategy.getWindowFn().windowCoder()));
            UnsortedGrouping inputGrouping = inputDataSet.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
            WindowingStrategy boundedStrategy = windowingStrategy;
            FlinkPartialReduceFunction partialReduceFunction = new FlinkPartialReduceFunction(combineFn, boundedStrategy, Collections.emptyMap(), context.getPipelineOptions());
            FlinkReduceFunction reduceFunction = new FlinkReduceFunction(combineFn, boundedStrategy, Collections.emptyMap(), context.getPipelineOptions());
            String fullName = FlinkBatchTransformTranslators.getCurrentTransformName(context);
            GroupCombineOperator groupCombine = new GroupCombineOperator((Grouping)inputGrouping, partialReduceTypeInfo, partialReduceFunction, "GroupCombine: " + fullName);
            UnsortedGrouping intermediateGrouping = groupCombine.groupBy(new KvKeySelector(inputCoder.getKeyCoder()));
            GroupReduceOperator outputDataSet = new GroupReduceOperator((Grouping)intermediateGrouping, partialReduceTypeInfo, reduceFunction, fullName);
            context.setOutputDataSet((PValue)context.getOutput(transform), outputDataSet);
        }
    }

    private static class WindowAssignTranslatorBatch<T>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PCollection<T>, PCollection<T>>> {
        private WindowAssignTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PCollection<T>, PCollection<T>> transform, FlinkBatchTranslationContext context) {
            PCollection<T> input = context.getInput(transform);
            TypeInformation<WindowedValue<T>> resultTypeInfo = context.getTypeInfo(context.getOutput(transform));
            DataSet inputDataSet = context.getInputDataSet((PValue)input);
            WindowingStrategy windowingStrategy = context.getOutput(transform).getWindowingStrategy();
            WindowFn windowFn = windowingStrategy.getWindowFn();
            FlinkAssignWindows assignWindowsFunction = new FlinkAssignWindows(windowFn);
            SingleInputUdfOperator resultDataSet = ((FlatMapOperator)inputDataSet.flatMap(assignWindowsFunction).name(context.getOutput(transform).getName())).returns(resultTypeInfo);
            context.setOutputDataSet((PValue)context.getOutput(transform), resultDataSet);
        }
    }

    private static class ReadSourceTranslatorBatch<T>
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PBegin, PCollection<T>>> {
        private ReadSourceTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PBegin, PCollection<T>> transform, FlinkBatchTranslationContext context) {
            BoundedSource source;
            AppliedPTransform<?, ?, ?> application = context.getCurrentTransform();
            try {
                source = ReadTranslation.boundedSourceFromTransform(application);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            PCollection<T> output = context.getOutput(transform);
            TypeInformation<WindowedValue<T>> typeInformation = context.getTypeInfo(output);
            String fullName = FlinkBatchTransformTranslators.getCurrentTransformName(context);
            DataSource dataSource = new DataSource(context.getExecutionEnvironment(), new SourceInputFormat(fullName, source, context.getPipelineOptions()), typeInformation, fullName);
            context.setOutputDataSet((PValue)output, dataSource);
        }
    }

    private static class ImpulseTranslatorBatch
    implements FlinkBatchPipelineTranslator.BatchTransformTranslator<PTransform<PBegin, PCollection<byte[]>>> {
        private ImpulseTranslatorBatch() {
        }

        @Override
        public void translateNode(PTransform<PBegin, PCollection<byte[]>> transform, FlinkBatchTranslationContext context) {
            String name = transform.getName();
            PCollection<byte[]> output = context.getOutput(transform);
            TypeInformation<WindowedValue<byte[]>> typeInformation = context.getTypeInfo(output);
            DataSource dataSource = new DataSource(context.getExecutionEnvironment(), (InputFormat)new ImpulseInputFormat(), typeInformation, name);
            context.setOutputDataSet((PValue)output, dataSource);
        }
    }
}

