/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.lineage;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageDataset;
import org.apache.flink.streaming.api.lineage.LineageDatasetFacet;
import org.apache.flink.streaming.api.lineage.LineageVertex;
import org.apache.flink.streaming.api.lineage.SourceLineageVertex;

/**
 * Static helpers for building lineage objects (datasets, namespaces and vertices) for the Kafka
 * connector.
 */
public class LineageUtil {
    private static final String KAFKA_DATASET_PREFIX = "kafka://";
    private static final String COMMA = ",";
    private static final String SEMICOLON = ";";

    /**
     * Creates a dataset that carries only the Kafka facet.
     *
     * @param namespace value returned by the dataset's {@code namespace()}
     * @param kafkaDatasetFacet facet registered under the {@code "kafka"} key; also supplies the
     *     dataset name
     * @return a dataset view backed by the given arguments
     */
    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet) {
        return LineageUtil.datasetOf(namespace, kafkaDatasetFacet, Collections.emptyList());
    }

    /**
     * Creates a dataset that carries the Kafka facet plus one type facet.
     *
     * @param namespace value returned by the dataset's {@code namespace()}
     * @param kafkaDatasetFacet facet registered under the {@code "kafka"} key; also supplies the
     *     dataset name
     * @param typeFacet additional facet, registered under its own {@code name()}
     * @return a dataset view backed by the given arguments
     */
    public static LineageDataset datasetOf(String namespace, KafkaDatasetFacet kafkaDatasetFacet, TypeDatasetFacet typeFacet) {
        return LineageUtil.datasetOf(namespace, kafkaDatasetFacet, Collections.singletonList(typeFacet));
    }

    /**
     * Shared implementation behind the public {@code datasetOf} overloads.
     *
     * <p>The returned object reads from its arguments lazily: the name and the facet map are
     * computed on every call rather than once at construction time.
     */
    private static LineageDataset datasetOf(final String namespace, final KafkaDatasetFacet kafkaDatasetFacet, final List<LineageDatasetFacet> facets) {
        return new LineageDataset(){

            public String name() {
                return kafkaDatasetFacet.getTopicIdentifier().toLineageName();
            }

            public String namespace() {
                return namespace;
            }

            public Map<String, LineageDatasetFacet> facets() {
                Map<String, LineageDatasetFacet> facetMap = new HashMap<>();
                facetMap.put("kafka", kafkaDatasetFacet);
                // Extra facets are keyed by their own name and are added after the Kafka facet,
                // so an extra facet named "kafka" would replace it.
                facetMap.putAll(facets.stream().collect(Collectors.toMap(LineageDatasetFacet::name, item -> item)));
                return facetMap;
            }
        };
    }

    /**
     * Derives the lineage namespace from the {@code bootstrap.servers} property.
     *
     * <p>Only the first server of a comma-separated list is used; if the value contains no comma,
     * a semicolon is tried as the separator instead. The server text is appended to the
     * {@code kafka://} prefix verbatim.
     *
     * @param properties properties to read {@code bootstrap.servers} from
     * @return {@code kafka://} followed by the first server, or just {@code kafka://} when the
     *     property is not set
     */
    public static String namespaceOf(Properties properties) {
        String bootstrapServers = properties.getProperty("bootstrap.servers");
        if (bootstrapServers == null) {
            return KAFKA_DATASET_PREFIX;
        }
        // Plain concatenation on purpose: the server string is user-supplied and must not be
        // interpreted as a String.format pattern (a '%' in it would throw or be rewritten).
        return KAFKA_DATASET_PREFIX + LineageUtil.firstServer(bootstrapServers);
    }

    /**
     * Returns the text before the first comma, or before the first semicolon when there is no
     * comma, or the whole input when neither delimiter is present.
     */
    private static String firstServer(String bootstrapServers) {
        int delimiterIndex = bootstrapServers.indexOf(COMMA);
        if (delimiterIndex < 0) {
            delimiterIndex = bootstrapServers.indexOf(SEMICOLON);
        }
        // substring() instead of split()[0]: split() returns an empty array for inputs made up
        // solely of delimiters (e.g. ","), which would make the [0] access throw.
        return delimiterIndex < 0 ? bootstrapServers : bootstrapServers.substring(0, delimiterIndex);
    }

    /**
     * Wraps the datasets in a source vertex that always reports
     * {@link Boundedness#CONTINUOUS_UNBOUNDED}.
     *
     * @param datasets datasets exposed by the vertex; copied into a new list on every
     *     {@code datasets()} call
     * @return a source lineage vertex backed by {@code datasets}
     */
    public static SourceLineageVertex sourceLineageVertexOf(final Collection<LineageDataset> datasets) {
        return new SourceLineageVertex(){

            public Boundedness boundedness() {
                return Boundedness.CONTINUOUS_UNBOUNDED;
            }

            public List<LineageDataset> datasets() {
                return datasets.stream().collect(Collectors.toList());
            }
        };
    }

    /**
     * Wraps the datasets in a plain lineage vertex.
     *
     * @param datasets datasets exposed by the vertex; copied into a new list on every
     *     {@code datasets()} call
     * @return a lineage vertex backed by {@code datasets}
     */
    public static LineageVertex lineageVertexOf(final Collection<LineageDataset> datasets) {
        return new LineageVertex(){

            public List<LineageDataset> datasets() {
                return datasets.stream().collect(Collectors.toList());
            }
        };
    }
}

