/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam.impl;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InvalidObjectException;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.Lifecycle;
import org.talend.sdk.component.runtime.base.Serializer;
import org.talend.sdk.component.runtime.beam.impl.BeamProcessorImpl;
import org.talend.sdk.component.runtime.beam.impl.CapturingPipeline;
import org.talend.sdk.component.runtime.beam.impl.StoringOuputFactory;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.serialization.ContainerFinder;
import org.talend.sdk.component.runtime.serialization.EnhancedObjectInputStream;

public class BeamProcessorChainImpl
implements Processor,
Serializable,
Delegated {
    /** Object handed back by {@link #getDelegate()}. */
    private final Object original;
    /** Value returned by {@link #rootName()}. */
    private final String family;
    /** Value returned by {@link #name()}. */
    private final String name;
    /** Value returned by {@link #plugin()}. */
    private final String plugin;
    /** Every processor of the chain, in execution order. */
    private final List<Processor> processors;
    /** All processors except the last one; empty when the chain holds at most one processor. */
    private final List<Processor> preProcessors;
    /** Last processor of the chain; the only one that receives the caller's output factory in onNext. */
    private final Processor lastProcessor;
    /** Number of processors whose start() returned during the last start() call; stop() walks exactly these. */
    private int startedCount;
    /** Number of processors whose beforeGroup() returned during the last beforeGroup() call; afterGroup() walks exactly these. */
    private int beforeChunkCount;

    /**
     * Builds a chain out of a single transform.
     * <p>
     * The transform is wrapped in a {@code TransformWithCoder} whose second constructor argument is
     * {@code null} (presumably "no explicit coders" - confirm against {@code TransformWithCoder}),
     * then handed to the list based constructor.
     *
     * @param transform     the transform the processors are extracted from
     * @param coderRegistry registry forwarded to the list based constructor, may be {@code null}
     * @param plugin        value later returned by {@link #plugin()}
     * @param family        value later returned by {@link #rootName()}
     * @param name          value later returned by {@link #name()}
     */
    public BeamProcessorChainImpl(PTransform<PCollection<?>, ?> transform, CoderRegistry coderRegistry, String plugin, String family, String name) {
        this(
                Collections.singletonList(new CapturingPipeline.TransformWithCoder(transform, null)),
                coderRegistry,
                plugin,
                family,
                name);
    }

    /**
     * Builds a chain out of several transforms.
     * <p>
     * Each transform is converted to its processors and the results are concatenated in list
     * order. The last transform of the list becomes the object exposed by {@link #getDelegate()}.
     *
     * @param transforms    the transforms to convert; reading its last element fails on an empty list
     * @param coderRegistry registry used while extracting the processors, may be {@code null}
     * @param plugin        value later returned by {@link #plugin()}
     * @param family        value later returned by {@link #rootName()}
     * @param name          value later returned by {@link #name()}
     */
    public BeamProcessorChainImpl(List<CapturingPipeline.TransformWithCoder> transforms, CoderRegistry coderRegistry, String plugin, String family, String name) {
        this(
                transforms.get(transforms.size() - 1),
                transforms.stream()
                        .flatMap(step -> toProcessors(step, coderRegistry, plugin, family, name))
                        .collect(Collectors.toList()),
                plugin,
                family,
                name);
    }

    protected BeamProcessorChainImpl(Object original, List<Processor> processors, String plugin, String family, String name) {
        this.original = original;
        this.plugin = plugin;
        this.family = family;
        this.name = name;
        this.processors = processors;
        this.preProcessors = processors.size() <= 1 ? Collections.emptyList() : processors.subList(0, processors.size() - 1);
        this.lastProcessor = processors.get(processors.size() - 1);
    }

    /**
     * Serialization hook: substitutes this instance with a {@code SerializationReplacer} carrying
     * the plugin, family and name identifiers plus the processors converted to bytes by
     * {@code Serializer.toBytes}.
     *
     * @return the replacement object written to the stream instead of this chain
     * @throws ObjectStreamException declared for the serialization contract
     */
    Object writeReplace() throws ObjectStreamException {
        final String pluginId = this.plugin();
        final String familyName = this.rootName();
        final String componentName = this.name();
        final byte[] serializedProcessors = Serializer.toBytes(this.processors);
        return new SerializationReplacer(pluginId, familyName, componentName, serializedProcessors);
    }

    /**
     * Calls {@code beforeGroup()} on every processor in chain order.
     * <p>
     * The counter is bumped only after a processor returned, so if one of them throws the counter
     * still tells {@link #afterGroup(OutputFactory)} how many processors were actually prepared.
     */
    public void beforeGroup() {
        this.beforeChunkCount = 0;
        final int total = this.processors.size();
        for (int index = 0; index < total; index++) {
            this.processors.get(index).beforeGroup();
            this.beforeChunkCount++;
        }
    }

    /**
     * Calls {@code afterGroup(output)} on the processors that went through
     * {@link #beforeGroup()}, in reverse chain order, then resets the counter.
     * <p>
     * Nothing is invoked when no processor was prepared. If a processor throws, the exception
     * propagates and the counter is left untouched.
     *
     * @param output output factory handed to every processor
     */
    public void afterGroup(OutputFactory output) {
        // Snapshot the counter: only the processors prepared so far are walked, last one first.
        final int prepared = this.beforeChunkCount;
        for (int index = prepared - 1; index >= 0; index--) {
            this.processors.get(index).afterGroup(output);
        }
        this.beforeChunkCount = 0;
    }

    /**
     * Pushes one input through the whole chain.
     * <p>
     * Every intermediate processor is fed with the current inputs and writes into a temporary
     * {@code StoringOuputFactory}; the values it stored become the inputs of the next processor.
     * The last processor receives the final inputs together with the caller's {@code output}.
     * <p>
     * If the temporary factory of an intermediate processor reports {@code null} values, the
     * method returns immediately and the remaining processors are not invoked for this input.
     *
     * @param input  the input handed to the first processor of the chain
     * @param output the output factory handed to the last processor of the chain
     */
    public void onNext(InputFactory input, OutputFactory output) {
        // Typed as InputFactory (not Object) so each element can be passed to Processor#onNext directly.
        Collection<InputFactory> pending = Collections.singletonList(input);
        for (Processor stage : this.preProcessors) {
            final StoringOuputFactory captured = new StoringOuputFactory();
            for (InputFactory in : pending) {
                stage.onNext(in, (OutputFactory) captured);
            }
            if (captured.getValues() == null) {
                return;
            }
            // Expose each stored value as an input that only answers the default branch.
            pending = captured.getValues().stream().map(val -> (InputFactory) name -> {
                if (!"__default__".equals(name)) {
                    throw new IllegalArgumentException("Only default branch is supported at the moment");
                }
                return val;
            }).collect(Collectors.toList());
        }
        for (InputFactory in : pending) {
            this.lastProcessor.onNext(in, output);
        }
    }

    /**
     * @return the plugin identifier given at construction time
     */
    public String plugin() {
        return plugin;
    }

    /**
     * @return the family given at construction time
     */
    public String rootName() {
        return family;
    }

    /**
     * @return the name given at construction time
     */
    public String name() {
        return name;
    }

    /**
     * Starts every processor in chain order.
     * <p>
     * The counter is bumped only after a processor returned from {@code start()}, so if one of
     * them throws the counter still tells {@link #stop()} how many processors were started.
     */
    public void start() {
        this.startedCount = 0;
        final int total = this.processors.size();
        for (int index = 0; index < total; index++) {
            this.processors.get(index).start();
            this.startedCount++;
        }
    }

    /**
     * Stops the processors that were started, in reverse start order.
     * <p>
     * Every started processor gets its {@code stop()} call even when an earlier one fails, so a
     * single failure no longer leaves the remaining processors running. Once all of them have
     * been attempted the counter is reset, and the first failure (if any) is rethrown with the
     * later ones attached as suppressed exceptions.
     *
     * @throws RuntimeException the first exception raised by a processor's {@code stop()}
     */
    public void stop() {
        final int started = this.startedCount;
        if (started == 0) {
            return;
        }
        RuntimeException firstFailure = null;
        for (int index = started - 1; index >= 0; index--) {
            try {
                this.processors.get(index).stop();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else if (e != firstFailure) {
                    // addSuppressed rejects self-suppression, hence the identity check above.
                    firstFailure.addSuppressed(e);
                }
            }
        }
        this.startedCount = 0;
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * @return the object this chain was built around, as received by the constructor
     */
    public Object getDelegate() {
        return original;
    }

    /**
     * Converts one transform into processors: one {@code BeamProcessorImpl} per {@code DoFn}
     * returned by {@code extractDoFn}.
     *
     * @param transform     the transform (with its coders) to convert
     * @param coderRegistry registry forwarded to {@code extractDoFn}, may be {@code null}
     * @param plugin        plugin identifier given to each processor
     * @param family        family given to each processor
     * @param name          name given to each processor
     * @return a stream of the created processors
     */
    private static Stream<Processor> toProcessors(CapturingPipeline.TransformWithCoder transform, CoderRegistry coderRegistry, String plugin, String family, String name) {
        final Collection<DoFn<?, ?>> functions = extractDoFn(transform, coderRegistry);
        return functions.stream()
                .map(fn -> new BeamProcessorImpl(fn, (DoFn<?, ?>) fn, plugin, family, name));
    }

    /**
     * Applies the transform of {@code step} on a throw-away {@code CapturingPipeline} and returns
     * what a {@code CapturingPipeline.SinkExtractor} collects while traversing that pipeline.
     * <p>
     * The pipeline is rooted on a placeholder source producing a bounded, globally windowed
     * {@code PCollection<Object>} coded with {@code TypingCoder}. When {@code step} carries
     * coders they are set on the transform's output before the traversal.
     *
     * @param step          the transform to apply, with its optional coders
     * @param coderRegistry registry installed on the capturing pipeline when not {@code null}
     * @return the outputs gathered by the sink extractor
     */
    private static Collection<DoFn<?, ?>> extractDoFn(CapturingPipeline.TransformWithCoder step, CoderRegistry coderRegistry) {
        final CapturingPipeline capturingPipeline = new CapturingPipeline(PipelineOptionsFactory.create());
        if (coderRegistry != null) {
            capturingPipeline.setCoderRegistry(coderRegistry);
        }

        // Placeholder root: it only exists so the transform under inspection has an input to be applied on.
        final PTransform<PBegin, PCollection<Object>> placeholderSource = new PTransform<PBegin, PCollection<Object>>() {

            public PCollection<Object> expand(PBegin input) {
                return PCollection.createPrimitiveOutputInternal(capturingPipeline, WindowingStrategy.globalDefault(), PCollection.IsBounded.BOUNDED, TypingCoder.INSTANCE);
            }

            protected Coder<?> getDefaultOutputCoder() {
                return TypingCoder.INSTANCE;
            }
        };
        final POutput apply = capturingPipeline.apply(placeholderSource).apply(step.getTransform());

        if (apply instanceof PCollectionTuple && step.getCoders() != null) {
            // Several outputs: look each configured coder's key up in the tuple and set the coder when present.
            final Map outputsByTag = ((PCollectionTuple) apply).getAll();
            step.getCoders().forEach((k, v) -> {
                final PCollection collection = (PCollection) outputsByTag.get(k);
                if (collection != null) {
                    collection.setCoder((Coder) v);
                }
            });
        } else if (apply instanceof PCollection && step.getCoders() != null && !step.getCoders().isEmpty()) {
            // Single output: only the first configured coder is used.
            ((PCollection) apply).setCoder((Coder) step.getCoders().values().iterator().next());
        }

        final CapturingPipeline.SinkExtractor sinkExtractor = new CapturingPipeline.SinkExtractor();
        capturingPipeline.traverseTopologically(sinkExtractor);
        return sinkExtractor.getOutputs();
    }

    /**
     * Placeholder coder: it writes nothing when encoding and always decodes to {@code null}.
     * Within this file it is only used as the coder of the placeholder source built in
     * {@code extractDoFn}.
     */
    private static class TypingCoder extends Coder<Object> {
        /** Single shared instance; the constructor is private. */
        private static final Coder<Object> INSTANCE = new TypingCoder();

        private TypingCoder() {
            // no state to initialize
        }

        /** @return an empty list, this coder has no component coders */
        public List<? extends Coder<?>> getCoderArguments() {
            return Collections.emptyList();
        }

        /** Never throws. */
        public void verifyDeterministic() throws Coder.NonDeterministicException {
            // nothing to verify
        }

        /** Writes nothing to the stream. */
        public void encode(Object o, OutputStream outputStream) throws IOException {
            // intentionally empty
        }

        /** @return always {@code null}; the stream is not read */
        public Object decode(InputStream inputStream) throws IOException {
            return null;
        }
    }

    /**
     * Serialized stand-in for a {@code BeamProcessorChainImpl}.
     * <p>
     * It carries the chain identifiers and the serialized processors. On deserialization
     * {@link #readResolve()} reloads the processors with the class loader looked up from the
     * plugin identifier and rebuilds the chain.
     */
    private static class SerializationReplacer
    implements Serializable {
        /** Plugin identifier; also used to look up the class loader for deserialization. */
        private final String plugin;
        /** Passed as the family argument when the chain is rebuilt. */
        private final String component;
        /** Passed as the name argument when the chain is rebuilt. */
        private final String name;
        /** Serialized form of the processors list. */
        private final byte[] value;

        public SerializationReplacer(String plugin, String component, String name, byte[] value) {
            this.plugin = plugin;
            this.component = component;
            this.name = name;
            this.value = value;
        }

        /**
         * Rebuilds the chain from the serialized processors. The deserialized list is used both
         * as the delegate and as the processors of the new chain.
         *
         * @return a new {@code BeamProcessorChainImpl}
         * @throws ObjectStreamException an {@link InvalidObjectException} wrapping the original
         *                               failure when the processors cannot be read back
         */
        Object readResolve() throws ObjectStreamException {
            try {
                // Only the List-ness is checked at runtime; element types are trusted as written by writeReplace.
                @SuppressWarnings("unchecked")
                final List<Processor> processors = (List<Processor>) this.loadDelegate();
                return new BeamProcessorChainImpl((Object) processors, processors, this.plugin, this.component, this.name);
            }
            catch (IOException | ClassNotFoundException e) {
                // InvalidObjectException(String) takes no cause, so attach it to keep the original stack trace.
                final InvalidObjectException failure = new InvalidObjectException(e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Deserializes {@link #value} through an {@code EnhancedObjectInputStream} bound to the
         * class loader that {@code ContainerFinder} returns for {@link #plugin}.
         *
         * @return the deserialized object
         * @throws IOException            if reading the stream fails
         * @throws ClassNotFoundException if a serialized class cannot be resolved
         */
        private Serializable loadDelegate() throws IOException, ClassNotFoundException {
            final ClassLoader pluginLoader = ContainerFinder.Instance.get().find(this.plugin).classloader();
            try (EnhancedObjectInputStream ois = new EnhancedObjectInputStream((InputStream) new ByteArrayInputStream(this.value), pluginLoader)) {
                return Serializable.class.cast(ois.readObject());
            }
        }
    }
}

