/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam.spi;

import java.io.ObjectStreamException;
import java.io.Serializable;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.source.ProducerFinder;
import org.talend.sdk.component.runtime.base.Delegated;
import org.talend.sdk.component.runtime.base.LifecycleImpl;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.manager.service.ProducerFinderImpl;
import org.talend.sdk.component.runtime.manager.service.api.ComponentInstantiator;
import org.talend.sdk.component.runtime.serialization.SerializableService;

public class BeamProducerFinder
extends ProducerFinderImpl {
    private static final Logger log = LoggerFactory.getLogger(BeamProducerFinder.class);
    static final Queue<Record> QUEUE = new ArrayBlockingQueue<Record>(20, true);

    public Iterator<Record> find(String familyName, String inputName, int version, Map<String, String> configuration) {
        ComponentInstantiator instantiator = this.getInstantiator(familyName, inputName);
        Mapper mapper = this.findMapper(instantiator, version, configuration);
        try {
            Input input = mapper.create();
            return this.iterator(input);
        }
        catch (Exception e) {
            log.warn("Component Kit Mapper instantiation failed, trying to wrap native beam mapper...");
            Object delegate = ((Delegated)Delegated.class.cast(mapper)).getDelegate();
            if (PTransform.class.isInstance(delegate)) {
                return new QueueInput(delegate, familyName, inputName, familyName, (PTransform<PBegin, PCollection<Record>>)((PTransform)PTransform.class.cast(delegate)));
            }
            throw new IllegalStateException(e);
        }
    }

    Object writeReplace() throws ObjectStreamException {
        return new SerializableService(this.plugin, ProducerFinder.class.getName());
    }

    static class PushRecord
    extends DoFn<Record, Void>
    implements Serializable {
        PushRecord() {
        }

        @DoFn.ProcessElement
        public void processElement(@DoFn.Element Record record) {
            boolean ok = QUEUE.offer(record);
            while (!ok) {
                this.sleep();
                ok = QUEUE.offer(record);
            }
        }

        private void sleep() {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    static class QueueInput
    extends LifecycleImpl
    implements Input,
    Iterator<Record> {
        private final PTransform<PBegin, PCollection<Record>> transform;
        private final PipelineResult result;
        private boolean started;
        private boolean end;
        private Record next;

        public QueueInput(Object delegate, String rootName, String name, String plugin, PTransform<PBegin, PCollection<Record>> transform) {
            super(delegate, rootName, name, plugin);
            this.transform = transform;
            this.result = this.init();
        }

        @Override
        public boolean hasNext() {
            if (this.next == null && !this.started) {
                this.next = this.findNext();
                this.started = true;
            }
            return this.next != null;
        }

        @Override
        public Record next() {
            if (!this.hasNext()) {
                return null;
            }
            Record current = this.next;
            this.next = this.findNext();
            return current;
        }

        private Record findNext() {
            Record record = QUEUE.poll();
            while (record == null && !this.end) {
                this.end = this.result.getState() != PipelineResult.State.RUNNING;
                this.sleep();
                record = QUEUE.poll();
            }
            return record;
        }

        private PipelineResult init() {
            PipelineOptions options = PipelineOptionsFactory.create();
            PushRecord pushRecord = new PushRecord();
            ParDo.SingleOutput of = ParDo.of((DoFn)pushRecord);
            Pipeline p = Pipeline.create((PipelineOptions)options);
            ((PCollection)p.apply(this.transform)).apply((PTransform)of);
            return p.run();
        }

        private void sleep() {
            try {
                Thread.sleep(100L);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

