/*
 * Decompiled with CFR 0.152.
 */
package org.talend.sdk.component.runtime.beam;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.json.bind.Jsonb;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedReadFromUnboundedSource;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.talend.sdk.component.api.processor.OutputEmitter;
import org.talend.sdk.component.api.record.Record;
import org.talend.sdk.component.api.service.record.RecordBuilderFactory;
import org.talend.sdk.component.runtime.base.Lifecycle;
import org.talend.sdk.component.runtime.beam.BaseProcessorFn;
import org.talend.sdk.component.runtime.beam.coder.NoCheckpointCoder;
import org.talend.sdk.component.runtime.beam.coder.registry.SchemaRegistryCoder;
import org.talend.sdk.component.runtime.input.Input;
import org.talend.sdk.component.runtime.input.Mapper;
import org.talend.sdk.component.runtime.output.Processor;
import org.talend.sdk.component.runtime.record.RecordConverters;
import org.talend.sdk.component.runtime.serialization.ContainerFinder;
import org.talend.sdk.component.runtime.serialization.LightContainer;

@Experimental(value=Experimental.Kind.SOURCE_SINK)
public final class TalendIO {
    /**
     * Creates a read transform for the given mapper without any extra configuration.
     *
     * <p>Delegates to {@link #read(Mapper, Map)} with an empty configuration map.
     *
     * @param mapper the mapper to read from.
     * @return the transform wrapping {@code mapper}.
     */
    public static Base<PBegin, PCollection<Record>, Mapper> read(Mapper mapper) {
        final Map<String, String> noConfiguration = Collections.emptyMap();
        return read(mapper, noConfiguration);
    }

    /**
     * Creates a read transform for the given mapper.
     *
     * <p>When {@code mapper.isStream()} is true an {@code InfiniteRead} is created; the only
     * configuration keys accepted are {@code maxRecords} and {@code maxDurationMs}, both parsed
     * as {@code long} after trimming (a missing key maps to {@code -1}). Otherwise a bounded
     * {@code Read} is created and the configuration must be empty.
     *
     * <p>A {@code null} configuration is treated as an empty one for both kinds of mapper.
     *
     * @param mapper the mapper to read from.
     * @param mapperConfiguration optional read configuration, may be {@code null}.
     * @return the transform wrapping {@code mapper}.
     * @throws IllegalArgumentException if the configuration holds an unsupported key
     *         (this includes {@link NumberFormatException} for non numeric values).
     */
    public static Base<PBegin, PCollection<Record>, Mapper> read(Mapper mapper, Map<String, String> mapperConfiguration) {
        // Normalize once so that neither branch dereferences a null map.
        final Map<String, String> configuration =
                mapperConfiguration == null ? Collections.<String, String>emptyMap() : mapperConfiguration;

        if (!mapper.isStream()) {
            // Bounded reads do not support any option.
            if (!configuration.isEmpty()) {
                throw new IllegalArgumentException("Unsupported configuration: " + configuration);
            }
            return new Read(mapper);
        }

        final boolean hasUnsupportedKey = configuration.keySet().stream()
                .anyMatch(it -> Stream.of("maxRecords", "maxDurationMs").noneMatch(k -> k.equals(it)));
        if (hasUnsupportedKey) {
            throw new IllegalArgumentException("Unsupported configuration: " + configuration);
        }

        final String maxRecords = configuration.get("maxRecords");
        final String maxDurationMs = configuration.get("maxDurationMs");
        return new InfiniteRead(
                mapper,
                maxRecords == null ? -1L : Long.parseLong(maxRecords.trim()),
                maxDurationMs == null ? -1L : Long.parseLong(maxDurationMs.trim()));
    }

    /**
     * Creates a write transform delegating to the given processor.
     *
     * @param output the processor receiving the records.
     * @return a new {@link Write} wrapping {@code output}.
     */
    public static Write write(Processor output) {
        final Write sink = new Write(output);
        return sink;
    }

    /**
     * Unbounded reader pulling its elements from an {@link Input}.
     *
     * <p>Elements which are not already {@link Record} instances are passed through a lazily
     * created {@link Converter}. Timestamps and watermark are always "now" and the checkpoint
     * mark is the no-op one.
     */
    private static class UnBoundedReaderImpl<T> extends UnboundedSource.UnboundedReader<T> {
        private UnboundedSource<T, ?> source;
        private Input input;
        private Object current;
        private volatile Converter converter;

        UnBoundedReaderImpl(UnboundedSource<T, ?> source, Input input) {
            this.input = input;
            this.source = source;
        }

        /** Starts the underlying input, then reads the first element. */
        public boolean start() {
            input.start();
            return advance();
        }

        /**
         * Reads the next element from the input and stores it as the current one.
         *
         * @return {@code true} if an element is available, {@code false} if the input returned
         *         {@code null} (or the conversion produced {@code null}).
         */
        public boolean advance() {
            final Object element = input.next();
            final boolean needsConversion = element != null && !(element instanceof Record);
            current = needsConversion ? converter().convert(element) : element;
            return current != null;
        }

        /** Returns the last element stored by {@link #advance()}, cast without check. */
        public T getCurrent() throws NoSuchElementException {
            return (T) current;
        }

        /** Stops the underlying input. */
        public void close() {
            input.stop();
        }

        public Instant getCurrentTimestamp() throws NoSuchElementException {
            return Instant.now();
        }

        public Instant getWatermark() {
            return Instant.now();
        }

        public UnboundedSource.CheckpointMark getCheckpointMark() {
            return UnboundedSource.CheckpointMark.NOOP_CHECKPOINT_MARK;
        }

        public UnboundedSource<T, ?> getCurrentSource() {
            return source;
        }

        /**
         * Returns the converter, creating it on first use from the container found for the
         * input plugin. Creation is guarded by a lock on this reader and re-checked inside it.
         */
        private Converter converter() {
            Converter existing = converter;
            if (existing == null) {
                synchronized (this) {
                    existing = converter;
                    if (existing == null) {
                        existing = new Converter(ContainerFinder.Instance.get().find(input.plugin()));
                        converter = existing;
                    }
                }
            }
            return existing;
        }
    }

    /**
     * Bounded reader pulling its elements from an {@link Input}.
     *
     * <p>Elements which are not already {@link Record} instances are passed through a lazily
     * created {@link Converter}.
     */
    private static class BoundedReaderImpl<T> extends BoundedSource.BoundedReader<T> {
        private BoundedSource<T> source;
        private Input input;
        private Object current;
        private volatile Converter converter;

        BoundedReaderImpl(BoundedSource<T> source, Input input) {
            this.input = input;
            this.source = source;
        }

        /** Starts the underlying input, then reads the first element. */
        public boolean start() throws IOException {
            input.start();
            return advance();
        }

        /**
         * Reads the next element from the input and stores it as the current one.
         *
         * @return {@code true} if an element is available, {@code false} otherwise.
         */
        public boolean advance() {
            final Object element = input.next();
            if (element == null || element instanceof Record) {
                // Nothing to convert: end of data or already a record.
                current = element;
                return current != null;
            }
            ensureConverter();
            current = converter.convert(element);
            return current != null;
        }

        /** Returns the last element stored by {@link #advance()}, cast without check. */
        public T getCurrent() throws NoSuchElementException {
            return (T) current;
        }

        /** Stops the underlying input. */
        public void close() {
            input.stop();
        }

        public BoundedSource<T> getCurrentSource() {
            return source;
        }

        /**
         * Creates the converter on first use from the container found for the input plugin.
         * Creation is guarded by a lock on this reader and re-checked inside it.
         */
        private void ensureConverter() {
            if (converter != null) {
                return;
            }
            synchronized (this) {
                if (converter == null) {
                    converter = new Converter(ContainerFinder.Instance.get().find(input.plugin()));
                }
            }
        }
    }

    /**
     * Converts arbitrary objects to records through {@link RecordConverters}, using the
     * {@link Jsonb} and {@link RecordBuilderFactory} services looked up in a container.
     */
    private static class Converter {
        private final RecordConverters converters;
        private final RecordConverters.MappingMetaRegistry registry;
        private final RecordBuilderFactory recordBuilder;
        private final Jsonb jsonb;

        /**
         * @param container the container the services are looked up from.
         */
        private Converter(LightContainer container) {
            registry = new RecordConverters.MappingMetaRegistry();
            recordBuilder = RecordBuilderFactory.class.cast(container.findService(RecordBuilderFactory.class));
            jsonb = Jsonb.class.cast(container.findService(Jsonb.class));
            converters = new RecordConverters();
        }

        /**
         * @param next the object to convert.
         * @return whatever {@code RecordConverters#toRecord} produces for {@code next}.
         */
        private Object convert(Object next) {
            return converters.toRecord(registry, next, () -> jsonb, () -> recordBuilder);
        }
    }

    /**
     * Unbounded source backed by a {@link Mapper}.
     *
     * <p>Records are encoded with {@link SchemaRegistryCoder} and checkpoint marks with
     * {@link NoCheckpointCoder}. Equality and hash code are those of the wrapped mapper.
     */
    private static class UnBoundedSourceImpl extends UnboundedSource<Record, UnboundedSource.CheckpointMark> {
        private Mapper mapper;

        // NOTE(review): no-arg constructor leaves the mapper unset - presumably only intended
        // for serialization frameworks; confirm.
        public UnBoundedSourceImpl() {
        }

        public UnBoundedSourceImpl(Mapper mapper) {
            this.mapper = mapper;
        }

        /**
         * Splits the mapper and wraps each resulting mapper in a new source. The mapper is
         * started before the split and always stopped afterwards.
         */
        public List<? extends UnboundedSource<Record, UnboundedSource.CheckpointMark>> split(int desiredNumSplits, PipelineOptions options) {
            mapper.start();
            try {
                return mapper.split((long) desiredNumSplits)
                        .stream()
                        .map(part -> new UnBoundedSourceImpl(part))
                        .collect(Collectors.toList());
            } finally {
                mapper.stop();
            }
        }

        /** Creates a reader over a new input created by the mapper; the checkpoint mark is unused. */
        public UnboundedSource.UnboundedReader<Record> createReader(PipelineOptions options, UnboundedSource.CheckpointMark checkpointMark) {
            final Input input = mapper.create();
            return new UnBoundedReaderImpl<Record>(this, input);
        }

        public Coder<Record> getOutputCoder() {
            return SchemaRegistryCoder.of();
        }

        public Coder<UnboundedSource.CheckpointMark> getCheckpointMarkCoder() {
            return new NoCheckpointCoder();
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null) {
                return false;
            }
            if (getClass() != o.getClass()) {
                return false;
            }
            final UnBoundedSourceImpl other = (UnBoundedSourceImpl) o;
            return mapper.equals(other.mapper);
        }

        public int hashCode() {
            return mapper.hashCode();
        }
    }

    /**
     * Bounded source backed by a {@link Mapper}.
     *
     * <p>Every operation starts the mapper, performs its work and always stops the mapper
     * afterwards. Records are encoded with {@link SchemaRegistryCoder}.
     */
    private static class BoundedSourceImpl extends BoundedSource<Record> {
        private Mapper mapper;

        // NOTE(review): no-arg constructor leaves the mapper unset - presumably only intended
        // for serialization frameworks; confirm.
        public BoundedSourceImpl() {
        }

        public BoundedSourceImpl(Mapper mapper) {
            this.mapper = mapper;
        }

        /** Splits the mapper and wraps each resulting mapper in a new source. */
        public List<? extends BoundedSource<Record>> split(long desiredBundleSizeBytes, PipelineOptions options) {
            mapper.start();
            try {
                return mapper.split(desiredBundleSizeBytes)
                        .stream()
                        .map(part -> new BoundedSourceImpl(part))
                        .collect(Collectors.toList());
            } finally {
                mapper.stop();
            }
        }

        /** Returns the value of {@code mapper.assess()}. */
        public long getEstimatedSizeBytes(PipelineOptions options) {
            mapper.start();
            try {
                return mapper.assess();
            } finally {
                mapper.stop();
            }
        }

        /** Creates a reader over a new input created by the mapper. */
        public BoundedSource.BoundedReader<Record> createReader(PipelineOptions options) {
            mapper.start();
            try {
                final Input input = mapper.create();
                return new BoundedReaderImpl<Record>(this, input);
            } finally {
                mapper.stop();
            }
        }

        /** No validation is performed. */
        public void validate() {
        }

        public Coder<Record> getOutputCoder() {
            return SchemaRegistryCoder.of();
        }
    }

    /**
     * Processor function used by {@link Write}: whatever the processor emits is dropped,
     * since both the per-element emitter and the finish-bundle output factory are no-ops.
     */
    private static class WriteFn extends BaseProcessorFn<Void> {
        /** Record consumer ignoring its argument. */
        private static final Consumer<Record> NOOP_CONSUMER = ignored -> {};
        /** Output emitter ignoring its argument. */
        private static final OutputEmitter NOOP_OUTPUT_EMITTER = ignored -> {};
        /** Output factory always handing out the no-op emitter and doing no post processing. */
        private static final BaseProcessorFn.BeamOutputFactory NOOP_OUTPUT_FACTORY = new BaseProcessorFn.BeamOutputFactory(null, null, null){

            @Override
            public void postProcessing() {
            }

            @Override
            public OutputEmitter create(String name) {
                return NOOP_OUTPUT_EMITTER;
            }
        };

        // NOTE(review): presumably only intended for serialization frameworks; confirm.
        public WriteFn() {
        }

        WriteFn(Processor processor) {
            super(processor);
        }

        @Override
        protected BaseProcessorFn.BeamOutputFactory getFinishBundleOutputFactory(DoFn.FinishBundleContext context) {
            return NOOP_OUTPUT_FACTORY;
        }

        @Override
        protected Consumer<Record> toEmitter(DoFn.ProcessContext context) {
            return NOOP_CONSUMER;
        }
    }

    /**
     * Terminal transform applying a {@link WriteFn} built from the delegate processor to the
     * incoming records.
     */
    public static class Write extends Base<PCollection<Record>, PDone, Processor> {
        private Write(Processor delegate) {
            super(delegate);
        }

        /**
         * Applies the processor function to {@code incoming}.
         *
         * @param incoming the records to write.
         * @return a {@link PDone} in the pipeline of {@code incoming}.
         */
        public PDone expand(PCollection<Record> incoming) {
            incoming.apply(ParDo.of(new WriteFn(delegate)));
            final Pipeline pipeline = incoming.getPipeline();
            return PDone.in(pipeline);
        }
    }

    private static class InfiniteRead
    extends Base<PBegin, PCollection<Record>, Mapper> {
        private final long maxRecords;
        private final long maxDurationMs;

        private InfiniteRead(Mapper delegate, long maxRecords, long maxDurationMs) {
            super(delegate);
            this.maxRecords = maxRecords;
            this.maxDurationMs = maxDurationMs;
        }

        public PCollection<Record> expand(PBegin incoming) {
            Read.Unbounded unbounded = org.apache.beam.sdk.io.Read.from((UnboundedSource)new UnBoundedSourceImpl((Mapper)this.delegate));
            if (this.maxRecords > 0L) {
                unbounded = unbounded.withMaxNumRecords(this.maxRecords);
            }
            if (this.maxDurationMs > 0L) {
                unbounded = UnboundedSource.class.isInstance(unbounded) ? unbounded.withMaxReadTime(Duration.millis((long)this.maxDurationMs)) : ((BoundedReadFromUnboundedSource)unbounded).withMaxReadTime(Duration.millis((long)this.maxDurationMs));
            }
            return (PCollection)incoming.apply((PTransform)unbounded);
        }
    }

    /**
     * Read transform for non streaming mappers, reading from a {@link BoundedSourceImpl}
     * wrapping the delegate.
     */
    private static class Read extends Base<PBegin, PCollection<Record>, Mapper> {
        private Read(Mapper delegate) {
            super(delegate);
        }

        /**
         * @param incoming the pipeline start.
         * @return the collection of records read from the delegate mapper.
         */
        public PCollection<Record> expand(PBegin incoming) {
            // Fully qualified: this nested class shadows the imported Beam Read.
            final BoundedSourceImpl source = new BoundedSourceImpl(delegate);
            return incoming.apply(org.apache.beam.sdk.io.Read.from(source));
        }
    }

    /**
     * Common parent of the transforms of this class: holds the delegate component and derives
     * the transform name from it.
     *
     * @param <A> the transform input type.
     * @param <B> the transform output type.
     * @param <D> the delegate type.
     */
    public abstract static class Base<A extends PInput, B extends POutput, D extends Lifecycle> extends PTransform<A, B> {
        protected D delegate;

        // NOTE(review): leaves the delegate unset - presumably only intended for serialization
        // frameworks; confirm.
        protected Base() {
        }

        protected Base(D delegate) {
            this.delegate = delegate;
        }

        /** No validation is performed. */
        public void validate(PipelineOptions options) {
        }

        /** Returns the transform name wrapped as {@code Talend[<name>]}. */
        protected String getKindString() {
            final String name = getName();
            return "Talend[" + name + "]";
        }

        /** Returns {@code <rootName>/<name>} of the delegate. */
        public String getName() {
            final String root = delegate.rootName();
            final String name = delegate.name();
            return root + "/" + name;
        }

        protected Coder<?> getDefaultOutputCoder() {
            return SchemaRegistryCoder.of();
        }
    }
}

