/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import javax.json.bind.Jsonb;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.record.Schema;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.beam.spi.record.RecordCollectors;
import org.talend.sdk.component.runtime.output.InputFactory;
import org.talend.sdk.component.runtime.output.OutputFactory;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.output.ProcessorImpl;
import org.talend.sdk.component.runtime.record.RecordConverters;
import org.talend.sdk.component.runtime.record.Schemas;
import org.talend.sdk.component.runtime.serialization.ContainerFinder;
import org.talend.sdk.component.runtime.serialization.LightContainer;

abstract class BaseProcessorFn<O>
extends DoFn<Record, O> {
    /** Logger used to report invalid batch size configuration. */
    private static final Logger log = LoggerFactory.getLogger(BaseProcessorFn.class);
    /** Component processor whose lifecycle callbacks are driven by this function. */
    protected Processor processor;
    /** Number of elements forming one group; non-positive values disable size-based {@code afterGroup} calls. */
    protected int maxBatchSize = -1;
    /** Number of elements processed since the last group boundary. */
    protected int currentCount;
    /** Record builder factory, looked up from the plugin container by {@code ensureInit()}. */
    protected volatile RecordBuilderFactory recordFactory;
    /** JSON-B instance, looked up by {@code ensureInit()}; a {@code null} value means the lookup has not happened yet. */
    protected volatile Jsonb jsonb;

    BaseProcessorFn(Processor processor) {
        this.processor = processor;
        if (ProcessorImpl.class.isInstance(processor)) {
            ((ProcessorImpl)ProcessorImpl.class.cast(processor)).getInternalConfiguration().entrySet().stream().filter(it -> ((String)it.getKey()).endsWith("$maxBatchSize") && it.getValue() != null && !((String)it.getValue()).trim().isEmpty()).findFirst().ifPresent(val -> {
                try {
                    this.maxBatchSize = Integer.parseInt(((String)val.getValue()).trim());
                }
                catch (NumberFormatException nfe) {
                    log.warn("Invalid configuration: " + val);
                }
            });
        }
    }

    /**
     * Builds the sink that receives the records produced while handling one element.
     *
     * @param context the Beam context of the element currently being processed
     * @return the consumer handed to the output factories created in {@code processElement}
     */
    protected abstract Consumer<Record> toEmitter(DoFn.ProcessContext context);

    /**
     * Provides the output factory passed to the processor's {@code afterGroup} callback
     * when a bundle finishes with a group still open.
     *
     * @param context the Beam finish-bundle context
     * @return the factory whose {@code postProcessing()} is invoked after {@code afterGroup}
     */
    protected abstract BeamOutputFactory getFinishBundleOutputFactory(DoFn.FinishBundleContext context);

    /**
     * Beam setup hook: starts the wrapped processor.
     *
     * @throws Exception declared for the Beam lifecycle signature
     */
    @DoFn.Setup
    public void setup() throws Exception {
        processor.start();
    }

    /**
     * Processes one element through the wrapped processor and manages group boundaries.
     *
     * <p>Sequence:
     * <ol>
     * <li>services are resolved lazily via {@code ensureInit()};</li>
     * <li>{@code beforeGroup()} is invoked when this is the first element of a group;</li>
     * <li>{@code onNext} is invoked with a per-element output factory, which is then flushed;</li>
     * <li>once {@link #maxBatchSize} elements were seen (and the size is positive), the counter
     * is reset and {@code afterGroup} is invoked with a dedicated group output factory, which
     * is then flushed.</li>
     * </ol>
     *
     * @param context the Beam context carrying the current element
     */
    @DoFn.ProcessElement
    public void processElement(DoFn.ProcessContext context) {
        this.ensureInit();
        if (this.currentCount == 0) {
            // First element since the last group boundary: open a new group.
            this.processor.beforeGroup();
        }
        BeamOutputFactory output = new BeamSingleOutputFactory(this.toEmitter(context), this.recordFactory, this.jsonb);
        this.processor.onNext(new BeamInputFactory(context), output);
        output.postProcessing();
        ++this.currentCount;
        if (this.maxBatchSize > 0 && this.currentCount >= this.maxBatchSize) {
            this.currentCount = 0;
            BeamOutputFactory ago = new BeamMultiOutputFactory(this.toEmitter(context), this.recordFactory, this.jsonb);
            // afterGroup must write into the group factory: the per-element factory above has
            // already been flushed, so anything emitted into it would never reach the pipeline.
            this.processor.afterGroup(ago);
            ago.postProcessing();
        }
    }

    /**
     * Beam finish-bundle hook: closes the group that is still open, if any.
     *
     * <p>Nothing happens when no element was processed since the last group boundary.
     * Otherwise the counter is reset, {@code afterGroup} is invoked with the factory from
     * {@link #getFinishBundleOutputFactory(DoFn.FinishBundleContext)}, and that factory is flushed.
     *
     * @param context the Beam finish-bundle context
     */
    @DoFn.FinishBundle
    public void finishBundle(DoFn.FinishBundleContext context) {
        if (currentCount <= 0) {
            return;
        }
        ensureInit();
        currentCount = 0;
        BeamOutputFactory groupOutput = getFinishBundleOutputFactory(context);
        processor.afterGroup(groupOutput);
        groupOutput.postProcessing();
    }

    /**
     * Beam teardown hook: stops the wrapped processor.
     */
    @DoFn.Teardown
    public void tearDown() {
        processor.stop();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Resolves {@link #recordFactory} and {@link #jsonb} from the processor's plugin container
     * the first time it is needed.
     *
     * <p>{@code jsonb} acts as the "already initialized" marker: it is tested once without the
     * lock and once more while holding the monitor of this instance, and it is assigned last.
     */
    private void ensureInit() {
        if (jsonb != null) {
            return;
        }
        synchronized (this) {
            if (jsonb != null) {
                // Initialized while this call was waiting for the monitor.
                return;
            }
            LightContainer pluginContainer = ContainerFinder.Instance.get().find(processor.plugin());
            recordFactory = (RecordBuilderFactory) pluginContainer.findService(RecordBuilderFactory.class);
            jsonb = (Jsonb) pluginContainer.findService(Jsonb.class);
        }
    }

    /**
     * Creates an instance without a processor: {@link #processor} stays {@code null} and
     * {@link #maxBatchSize} keeps its default of {@code -1}.
     */
    public BaseProcessorFn() {
        // Intentionally empty: all fields keep their declared defaults.
    }

    /**
     * Overrides the group size used by {@code processElement}.
     *
     * @param maxBatchSize number of elements per group; a non-positive value disables
     *                     size-based {@code afterGroup} calls
     */
    public void setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
    }

    /**
     * Output emitter that converts every emitted value to a {@link Record} and appends it
     * to a caller-provided collection. {@code null} values are ignored.
     */
    private static class BeamOutputEmitter
    implements OutputEmitter {
        private final Collection<Record> builder;
        private final RecordBuilderFactory recordBuilderFactory;
        private final Jsonb jsonb;
        private final RecordConverters converters = new RecordConverters();
        private final RecordConverters.MappingMetaRegistry registry = new RecordConverters.MappingMetaRegistry();

        /**
         * @param builder              collection receiving the converted records
         * @param recordBuilderFactory factory supplied to the record converter
         * @param jsonb                JSON-B instance supplied to the record converter
         */
        public BeamOutputEmitter(Collection<Record> builder, RecordBuilderFactory recordBuilderFactory, Jsonb jsonb) {
            this.builder = builder;
            this.recordBuilderFactory = recordBuilderFactory;
            this.jsonb = jsonb;
        }

        /**
         * Converts {@code value} to a record and stores it; does nothing for {@code null}.
         *
         * @param value the emitted object, may be {@code null}
         */
        public void emit(Object value) {
            if (value != null) {
                builder.add(toRecord(value));
            }
        }

        /** Delegates the conversion to {@link RecordConverters} using this emitter's registry and services. */
        private Record toRecord(Object value) {
            Object converted = converters.toRecord(registry, value, () -> jsonb, () -> recordBuilderFactory);
            return Record.class.cast(converted);
        }
    }

    /**
     * Output factory that produces one wrapper record per emitted value: each wrapper holds a
     * single array entry named after the output branch. The wrappers are buffered and sent to
     * the emit consumer by {@link #postProcessing()}.
     */
    protected static final class BeamMultiOutputFactory
    extends BeamOutputFactory {
        private final Collection<Record> outputs = new ArrayList<Record>();

        protected BeamMultiOutputFactory(Consumer<Record> emit, RecordBuilderFactory factory, Jsonb jsonb) {
            super(emit, factory, jsonb);
        }

        /**
         * Returns an emitter for the given branch. Every emitted value gets its own temporary
         * collection, so each wrapper record carries at most that one converted value.
         *
         * @param name the output branch name, used as the array entry name
         */
        @Override
        public OutputEmitter create(final String name) {
            return emitted -> {
                final Collection<Record> converted = new ArrayList<Record>();
                BeamOutputEmitter wrapping = new BeamOutputEmitter(converted, this.factory, this.jsonb){

                    @Override
                    public void emit(Object value) {
                        super.emit(value);
                        // The element schema comes from the first converted record, if there is one.
                        Record first = converted.isEmpty() ? null : converted.iterator().next();
                        Schema elementSchema = first == null ? Schemas.EMPTY_RECORD : first.getSchema();
                        Schema.Entry arrayEntry = factory.newEntryBuilder()
                                .withName(name)
                                .withType(Schema.Type.ARRAY)
                                .withElementSchema(elementSchema)
                                .build();
                        BeamMultiOutputFactory.this.outputs.add(factory.newRecordBuilder().withArray(arrayEntry, converted).build());
                    }
                };
                wrapping.emit(emitted);
            };
        }

        /** Sends every buffered wrapper record to the emit consumer, in insertion order. */
        @Override
        public void postProcessing() {
            if (this.outputs.isEmpty()) {
                return;
            }
            this.outputs.forEach(this.emit);
        }
    }

    /**
     * Output factory that gathers the records of every output branch and emits them as one
     * record, with one array entry per branch, when {@link #postProcessing()} is called.
     */
    protected static final class BeamSingleOutputFactory
    extends BeamOutputFactory {
        private final Map<String, Collection<Record>> outputs = new HashMap<String, Collection<Record>>();

        protected BeamSingleOutputFactory(Consumer<Record> emit, RecordBuilderFactory factory, Jsonb jsonb) {
            super(emit, factory, jsonb);
        }

        /**
         * Returns an emitter appending to the collection registered under the sanitized
         * branch name, creating that collection on first use.
         *
         * @param name the output branch name
         */
        @Override
        public OutputEmitter create(String name) {
            String branch = Schema.sanitizeConnectionName(name);
            Collection<Record> bucket = this.outputs.computeIfAbsent(branch, k -> new ArrayList<Record>());
            return new BeamOutputEmitter(bucket, this.factory, this.jsonb);
        }

        /**
         * Emits a single record with one array entry per requested branch. Nothing is emitted
         * when no branch was requested. A branch without records uses the empty record schema
         * as its element schema.
         */
        @Override
        public void postProcessing() {
            if (this.outputs.isEmpty()) {
                return;
            }
            Record.Builder aggregate = this.factory.newRecordBuilder();
            for (Map.Entry<String, Collection<Record>> branch : this.outputs.entrySet()) {
                Collection<Record> records = branch.getValue();
                Record firstElement = records.isEmpty() ? null : records.iterator().next();
                Schema elementSchema = firstElement == null ? Schemas.EMPTY_RECORD : firstElement.getSchema();
                Schema.Entry arrayEntry = this.factory.newEntryBuilder()
                        .withName(branch.getKey())
                        .withType(Schema.Type.ARRAY)
                        .withElementSchema(elementSchema)
                        .build();
                aggregate.withArray(arrayEntry, records);
            }
            this.emit.accept(aggregate.build());
        }
    }

    /**
     * Base output factory holding the emit consumer and the services needed to convert
     * emitted values into records. Subclasses decide how buffered records are flushed in
     * {@link #postProcessing()}.
     */
    protected static abstract class BeamOutputFactory
    implements OutputFactory {
        protected final Consumer<Record> emit;
        protected final RecordBuilderFactory factory;
        protected final Jsonb jsonb;
        private final Map<String, Collection<Record>> outputs = new HashMap<String, Collection<Record>>();

        /**
         * @param emit    consumer receiving the records produced by {@link #postProcessing()}
         * @param factory record builder factory used for conversion and record assembly
         * @param jsonb   JSON-B instance used for conversion
         */
        public BeamOutputFactory(Consumer<Record> emit, RecordBuilderFactory factory, Jsonb jsonb) {
            this.emit = emit;
            this.factory = factory;
            this.jsonb = jsonb;
        }

        /**
         * Default emitter creation: appends to the collection registered under the sanitized
         * branch name, creating it on first use.
         *
         * @param name the output branch name
         */
        public OutputEmitter create(String name) {
            String branch = Schema.sanitizeConnectionName(name);
            Collection<Record> bucket = this.outputs.computeIfAbsent(branch, k -> new ArrayList<Record>());
            return new BeamOutputEmitter(bucket, this.factory, this.jsonb);
        }

        /** Flushes whatever was emitted through this factory to the {@link #emit} consumer. */
        public abstract void postProcessing();
    }

    /**
     * Input factory backed by the current Beam element: every schema entry whose name does
     * not start with {@code __talend_internal} is read as an array of records and exposed as
     * an input branch.
     */
    protected static final class BeamInputFactory
    implements InputFactory {
        private final Map<String, Iterator<Record>> objects;

        /**
         * @param context the Beam context whose element is the record to split into branches
         */
        BeamInputFactory(DoFn.ProcessContext context) {
            final Record input = (Record) context.element();
            this.objects = input.getSchema()
                    .getAllEntries()
                    .filter(entry -> !entry.getName().startsWith("__talend_internal"))
                    .collect(Collectors.toMap(
                            Schema.Entry::getName,
                            entry -> input.getArray(Record.class, entry.getName()).iterator()));
        }

        /**
         * Returns the next record of the given branch.
         *
         * @param name the branch name; it is sanitized before the lookup
         * @return the next record, or {@code null} when the branch is unknown or exhausted
         */
        public Object read(String name) {
            Iterator<Record> pending = this.objects.getOrDefault(Schema.sanitizeConnectionName(name), Collections.emptyIterator());
            if (pending.hasNext()) {
                return pending.next();
            }
            return null;
        }
    }
}

