/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.action.cdc;

import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.paimon.flink.action.cdc.CdcSourceRecord;
import org.apache.paimon.flink.action.cdc.SyncTableActionBase;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.flink.action.cdc.format.DataFormat;
import org.apache.paimon.flink.action.cdc.format.RecordParser;
import org.apache.paimon.schema.Schema;

/**
 * Static helpers for deriving a Paimon {@link Schema} from the records available on a message
 * queue topic.
 */
public class MessageQueueSchemaUtils {
    /** Maximum number of additional polls performed after the first unsuccessful one. */
    private static final int MAX_RETRY = 5;

    /** Timeout, in milliseconds, handed to {@link ConsumerWrapper#getRecords(int)} on each poll. */
    private static final int POLL_TIMEOUT_MILLIS = 1000;

    /** Pause before the first retry, in milliseconds; doubled after every unsuccessful poll. */
    private static final int INITIAL_RETRY_INTERVAL_MILLIS = 1000;

    /**
     * Polls the consumer until one of the returned records yields a non-null schema.
     *
     * <p>Each poll's records are passed through {@link RecordParser#buildSchema} in order, and the
     * first non-null result is returned. When a poll yields no schema, the method sleeps and polls
     * again, doubling the sleep time on every attempt. After {@code MAX_RETRY} retries without a
     * schema, the method gives up.
     *
     * @param consumer source of records; also supplies the topic name used in the error message
     * @param dataFormat format used to create the record parser
     * @param typeMapping type mapping forwarded to {@link DataFormat#createParser}
     * @return the first non-null schema built from a polled record
     * @throws SyncTableActionBase.SchemaRetrievalException if no schema could be built after the
     *     initial poll plus {@code MAX_RETRY} retries
     */
    public static Schema getSchema(ConsumerWrapper consumer, DataFormat dataFormat, TypeMapping typeMapping) throws SyncTableActionBase.SchemaRetrievalException {
        int retry = 0;
        int retryInterval = INITIAL_RETRY_INTERVAL_MILLIS;
        // NOTE(review): the meaning of the 'true' flag and the empty list is defined by
        // DataFormat#createParser, which is not visible here; they are passed through unchanged.
        RecordParser recordParser = dataFormat.createParser(true, typeMapping, Collections.emptyList());

        while (true) {
            Optional<Schema> schema = consumer.getRecords(POLL_TIMEOUT_MILLIS).stream()
                    .map(recordParser::buildSchema)
                    .filter(Objects::nonNull)
                    .findFirst();
            if (schema.isPresent()) {
                return schema.get();
            }

            if (retry >= MAX_RETRY) {
                throw new SyncTableActionBase.SchemaRetrievalException(String.format("Could not get metadata from server, topic: %s. If this topic is not empty, please check the configuration of synchronization job. Otherwise, you should create the Paimon table first.", consumer.topic()));
            }

            // Exponential back-off: wait, then double the wait for the next round.
            MessageQueueSchemaUtils.sleepSafely(retryInterval);
            retryInterval *= 2;
            ++retry;
        }
    }

    /**
     * Sleeps for the given number of milliseconds without propagating {@link InterruptedException}.
     *
     * <p>If the thread is interrupted, the interrupt flag is restored and the method returns
     * early. The caller's retry loop is not aborted by this; with the flag still set, later
     * sleeps return immediately.
     *
     * @param duration time to sleep, in milliseconds
     */
    private static void sleepSafely(int duration) {
        try {
            Thread.sleep(duration);
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for code further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /** Minimal view of a message queue consumer as needed by {@link #getSchema}. */
    public static interface ConsumerWrapper
    extends AutoCloseable {
        /**
         * Fetches a batch of records from the underlying queue.
         *
         * @param pollTimeoutMillis poll timeout; {@link #getSchema} passes a value in milliseconds
         * @return the records obtained by this poll
         */
        public List<CdcSourceRecord> getRecords(int pollTimeoutMillis);

        /**
         * @return the topic name, as used in the schema retrieval error message
         */
        public String topic();
    }
}

