/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.planner.factories;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.table.FileSystemConnectorOptions;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.ProviderContext;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.SinkV2Provider;
import org.apache.flink.table.connector.source.DataStreamScanProvider;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.factories.DynamicTableSinkFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;

public class TestFileFactory
implements DynamicTableSourceFactory,
DynamicTableSinkFactory {
    /** Identifier reported by {@link #factoryIdentifier()}. */
    private static final String IDENTIFIER = "test-file";

    /**
     * Option {@code runtime-source}: selects which runtime provider the table source builds.
     * Defaults to {@code "Source"}; the table source in this file also accepts
     * {@code "DataStream"} and rejects any other value.
     */
    private static final ConfigOption<String> RUNTIME_SOURCE =
            ConfigOptions.key("runtime-source").stringType().defaultValue("Source");

    /**
     * Creates the test file source from the catalog table's options.
     *
     * <p>The file location is read from {@link FileSystemConnectorOptions#PATH} and the kind of
     * runtime provider from the {@code runtime-source} option.
     *
     * @param context factory context supplying the catalog table whose options are read
     * @return a table source for the configured path and runtime source
     */
    public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context context) {
        // Pass the options map straight through; a raw Map cast only adds an unchecked conversion.
        Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
        Path path = new Path(conf.getString(FileSystemConnectorOptions.PATH));
        return new TestFileTableSource(path, conf.getString(RUNTIME_SOURCE));
    }

    /**
     * Creates the test file sink from the catalog table's options.
     *
     * <p>Only {@link FileSystemConnectorOptions#PATH} is read; it becomes the sink's base path.
     *
     * @param context factory context supplying the catalog table whose options are read
     * @return a table sink writing under the configured path
     */
    public DynamicTableSink createDynamicTableSink(DynamicTableFactory.Context context) {
        // Pass the options map straight through; a raw Map cast only adds an unchecked conversion.
        Configuration conf = Configuration.fromMap(context.getCatalogTable().getOptions());
        Path path = new Path(conf.getString(FileSystemConnectorOptions.PATH));
        return new TestFileTableSink(path);
    }

    /**
     * Returns the identifier of this factory.
     *
     * @return the constant {@code "test-file"}
     */
    public String factoryIdentifier() {
        return IDENTIFIER;
    }

    /**
     * Returns the options this factory declares as required.
     *
     * @return a new, empty, mutable set; no option is declared required
     */
    public Set<ConfigOption<?>> requiredOptions() {
        // Diamond instead of a raw HashSet so the result is typed without an unchecked conversion.
        return new HashSet<>();
    }

    /**
     * Returns the options this factory declares as optional.
     *
     * @return a new mutable set containing only the {@code runtime-source} option
     */
    public Set<ConfigOption<?>> optionalOptions() {
        // Typed set instead of a raw HashSet so no unchecked conversion is needed on return.
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(RUNTIME_SOURCE);
        return options;
    }

    /**
     * Encoder that writes a row as one text line: every field is read as a string, fields are
     * joined by a comma, and the line ends with a newline.
     */
    private static class RowDataEncoder implements Encoder<RowData> {
        private static final long serialVersionUID = 1L;
        private static final byte FIELD_DELIMITER = ",".getBytes(StandardCharsets.UTF_8)[0];
        private static final byte LINE_DELIMITER = "\n".getBytes(StandardCharsets.UTF_8)[0];

        /**
         * Writes {@code rowData} to {@code stream} as a single delimited line.
         *
         * @param rowData row whose fields are all read via {@code getString}
         * @param stream destination for the encoded bytes
         * @throws IOException if writing to {@code stream} fails
         */
        public void encode(RowData rowData, OutputStream stream) throws IOException {
            final int fieldCount = rowData.getArity();
            for (int pos = 0; pos < fieldCount; pos++) {
                // The separator goes between fields only, so it precedes every field but the first.
                if (pos > 0) {
                    stream.write(FIELD_DELIMITER);
                }
                stream.write(rowData.getString(pos).toBytes());
            }
            stream.write(LINE_DELIMITER);
        }
    }

    /**
     * Scan provider that exposes a {@link FileSource} through the DataStream API and always
     * reports itself as bounded.
     */
    private static class TestFileSourceDataStreamScanProvider
    implements DataStreamScanProvider {
        private final FileSource<RowData> fileSource;
        private final String name;

        /**
         * @param fileSource source to attach to the execution environment
         * @param name source name passed to {@code fromSource}
         */
        private TestFileSourceDataStreamScanProvider(FileSource<RowData> fileSource, String name) {
            this.fileSource = fileSource;
            this.name = name;
        }

        /**
         * Adds the file source to {@code execEnv} without watermarks and, when the provider
         * context generates a uid for {@code "file"}, assigns it to the resulting operator.
         *
         * @param providerContext used to optionally generate the operator uid
         * @param execEnv environment the source is added to
         * @return the stream produced by the file source
         */
        public DataStream<RowData> produceDataStream(ProviderContext providerContext, StreamExecutionEnvironment execEnv) {
            // Keep the element type on the local so neither the uid call nor the return needs a
            // raw-type cast or an unchecked conversion.
            DataStreamSource<RowData> sourceStream =
                    execEnv.fromSource(this.fileSource, WatermarkStrategy.noWatermarks(), this.name);
            providerContext.generateUid("file").ifPresent(sourceStream::uid);
            return sourceStream;
        }

        /**
         * @return always {@code true}
         */
        public boolean isBounded() {
            return true;
        }
    }

    /**
     * Stream format that decodes a file as UTF-8 text and emits one single-column row per line.
     */
    private static class FileFormat extends SimpleStreamFormat<RowData> {
        private FileFormat() {
        }

        /**
         * Wraps {@code stream} in a buffered UTF-8 line reader.
         *
         * @param config not used by this format
         * @param stream file contents to decode
         * @return a reader producing one row per text line
         */
        public StreamFormat.Reader<RowData> createReader(Configuration config, FSDataInputStream stream) {
            InputStreamReader decoder = new InputStreamReader(stream, StandardCharsets.UTF_8);
            return new LineReader(new BufferedReader(decoder));
        }

        /**
         * Returns no type information.
         *
         * <p>NOTE(review): {@code null} looks deliberate here, presumably because the produced
         * type is determined elsewhere for table sources; confirm before changing it.
         *
         * @return always {@code null}
         */
        public TypeInformation<RowData> getProducedType() {
            return null;
        }

        /** Reader that maps each line of text to a row holding that line as its only field. */
        private static final class LineReader implements StreamFormat.Reader<RowData> {
            private final BufferedReader lines;

            private LineReader(BufferedReader lines) {
                this.lines = lines;
            }

            /**
             * @return the next line wrapped in a one-field row, or {@code null} once the
             *     underlying reader has no more lines
             * @throws IOException if reading the next line fails
             */
            public RowData read() throws IOException {
                final String text = this.lines.readLine();
                return text == null ? null : GenericRowData.of(StringData.fromString(text));
            }

            /**
             * Closes the underlying reader.
             *
             * @throws IOException if closing fails
             */
            public void close() throws IOException {
                this.lines.close();
            }
        }
    }

    /**
     * Table sink that writes rows as delimited text lines under a base path using a row-format
     * {@link FileSink}.
     */
    private static class TestFileTableSink
    implements DynamicTableSink {
        private final Path path;

        /**
         * @param path base path handed to the file sink
         */
        private TestFileTableSink(Path path) {
            this.path = path;
        }

        /**
         * Accepts whatever changelog mode is requested.
         *
         * @param requestedMode mode requested by the caller
         * @return {@code requestedMode}, unchanged
         */
        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
            return requestedMode;
        }

        /**
         * Builds a row-format file sink over the configured path and wraps it as a sink provider.
         *
         * @param context not used
         * @return a {@link SinkV2Provider} around the file sink
         */
        public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
            // Typed end to end: the encoder fixes the element type, so no raw casts are needed.
            FileSink<RowData> fileSink = FileSink.forRowFormat(this.path, new RowDataEncoder()).build();
            return SinkV2Provider.of(fileSink);
        }

        /**
         * @return a new sink instance sharing the same path
         */
        public DynamicTableSink copy() {
            return new TestFileTableSink(this.path);
        }

        /**
         * @return the constant {@code "test-file-sink"}
         */
        public String asSummaryString() {
            return "test-file-sink";
        }
    }

    /**
     * Insert-only table source that reads a file line by line and exposes it through either a
     * {@link SourceProvider} or a DataStream-based scan provider, depending on
     * {@code runtimeSource}.
     */
    private static class TestFileTableSource
    implements ScanTableSource {
        private final Path path;
        private final String runtimeSource;

        /**
         * @param path file location to read
         * @param runtimeSource provider kind, {@code "Source"} or {@code "DataStream"}
         */
        private TestFileTableSource(Path path, String runtimeSource) {
            this.path = path;
            this.runtimeSource = runtimeSource;
        }

        /**
         * @return the insert-only changelog mode
         */
        public ChangelogMode getChangelogMode() {
            return ChangelogMode.insertOnly();
        }

        /**
         * Builds the file source and wraps it in the provider selected by {@code runtimeSource}.
         *
         * @param runtimeProviderContext not used
         * @return a {@link SourceProvider} for {@code "Source"}, or a DataStream scan provider for
         *     {@code "DataStream"}
         * @throws IllegalArgumentException if {@code runtimeSource} is any other value
         */
        public ScanTableSource.ScanRuntimeProvider getScanRuntimeProvider(ScanTableSource.ScanContext runtimeProviderContext) {
            // Typed end to end: the stream format fixes the element type, so no raw casts are
            // needed. The source is built before dispatching, as in the original ordering.
            FileSource<RowData> fileSource =
                    FileSource.forRecordStreamFormat(new FileFormat(), this.path).build();
            switch (this.runtimeSource) {
                case "Source":
                    return SourceProvider.of(fileSource);
                case "DataStream":
                    return new TestFileSourceDataStreamScanProvider(fileSource, this.asSummaryString());
                default:
                    throw new IllegalArgumentException("Unsupported runtime source class: " + this.runtimeSource);
            }
        }

        /**
         * @return a new source instance with the same path and runtime source
         */
        public DynamicTableSource copy() {
            return new TestFileTableSource(this.path, this.runtimeSource);
        }

        /**
         * @return the constant {@code "test-file-source"}
         */
        public String asSummaryString() {
            return "test-file-source";
        }
    }
}

