/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.di.beam.components;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.POutput;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.runtime.di.JobStateAware;
import org.talend.sdk.component.runtime.di.beam.DelegatingBoundedSource;
import org.talend.sdk.component.runtime.di.beam.DelegatingUnBoundedSource;
import org.talend.sdk.component.runtime.di.beam.InMemoryQueueIO;
import org.talend.sdk.component.runtime.di.beam.SettableSourceListener;
import org.talend.sdk.component.runtime.di.beam.SourceListener;

public class DIPipeline
extends Pipeline {
    private final Collection<PTransform<?, ?>> transformStack = new ArrayList();
    private final Map<PTransform<?, ?>, JobStateAware.State> states = new ConcurrentHashMap();
    private String currentState;

    public DIPipeline(PipelineOptions options) {
        super(options);
    }

    public <OutputT extends POutput> OutputT apply(PTransform<? super PBegin, OutputT> root) {
        this.transformStack.add(root);
        try {
            POutput pOutput = super.apply(this.wrapTransformIfNeeded(root));
            return (OutputT)pOutput;
        }
        finally {
            this.transformStack.remove(root);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <OutputT extends POutput> OutputT apply(String name, PTransform<? super PBegin, OutputT> root) {
        this.transformStack.add(root);
        try {
            POutput pOutput = super.apply(name, this.wrapTransformIfNeeded(root));
            return (OutputT)pOutput;
        }
        finally {
            this.transformStack.remove(root);
        }
    }

    private <PT extends POutput> PTransform<? super PBegin, PT> wrapTransformIfNeeded(PTransform<? super PBegin, PT> root) {
        if (Read.Bounded.class.isInstance(root)) {
            BoundedSource source = ((Read.Bounded)Read.Bounded.class.cast(root)).getSource();
            DelegatingBoundedSource boundedSource = new DelegatingBoundedSource(source, null);
            this.setState(boundedSource);
            return Read.from(boundedSource);
        }
        if (Read.Unbounded.class.isInstance(root)) {
            UnboundedSource source = ((Read.Unbounded)Read.Unbounded.class.cast(root)).getSource();
            if (InMemoryQueueIO.UnboundedQueuedInput.class.isInstance(source)) {
                return root;
            }
            DelegatingUnBoundedSource unBoundedSource = new DelegatingUnBoundedSource(source, null);
            this.setState(unBoundedSource);
            return Read.from(unBoundedSource);
        }
        return root;
    }

    private void setState(SettableSourceListener settableSourceListener) {
        Optional<PTransform> transform = this.transformStack.stream().filter(this.states::containsKey).findFirst();
        if (!transform.isPresent()) {
            throw new IllegalStateException("No state for transforms " + this.transformStack);
        }
        transform.ifPresent(s -> {
            SourceListener.Tracker tracker = new SourceListener.Tracker();
            SourceListener.TRACKERS.put(tracker.getId(), tracker);
            SourceListener.StateReleaserSourceListener listener = new SourceListener.StateReleaserSourceListener(tracker.getId(), Objects.requireNonNull(this.currentState, "currentState is not set"), null, null);
            settableSourceListener.setSourceListener(listener);
        });
    }

    public void registerStateForTransform(PTransform<PBegin, PCollection<Record>> transform, JobStateAware.State jobState) {
        this.states.put(transform, jobState);
    }

    public void withState(String stateId, Runnable task) {
        this.currentState = stateId;
        try {
            task.run();
        }
        finally {
            this.currentState = null;
        }
    }
}

