/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.pulsar.source.reader.split;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.pulsar.common.utils.PulsarExceptionUtils;
import org.apache.flink.connector.pulsar.source.config.CursorVerification;
import org.apache.flink.connector.pulsar.source.config.SourceConfiguration;
import org.apache.flink.connector.pulsar.source.enumerator.cursor.MessageIdUtils;
import org.apache.flink.connector.pulsar.source.enumerator.topic.TopicPartition;
import org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema;
import org.apache.flink.connector.pulsar.source.reader.split.PulsarPartitionSplitReaderBase;
import org.apache.flink.connector.pulsar.source.split.PulsarPartitionSplit;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Split reader that pulls messages through the inherited {@code pulsarConsumer} with a blocking,
 * time-bounded {@code receive} call and acknowledges consumed positions cumulatively once a
 * checkpoint has completed.
 *
 * <p>Before a consumer is created for a split that carries a previously consumed message id, the
 * subscription cursor is created or reset through the Pulsar admin API so that consumption
 * resumes from that position.
 *
 * @param <OUT> the record type handed to the deserialization schema of the base reader.
 */
@Internal
public class PulsarOrderedPartitionSplitReader<OUT>
extends PulsarPartitionSplitReaderBase<OUT> {
    private static final Logger LOG = LoggerFactory.getLogger(PulsarOrderedPartitionSplitReader.class);

    /**
     * Creates the reader; all arguments are forwarded unchanged to the base class.
     *
     * @param pulsarClient client passed to the base reader.
     * @param pulsarAdmin admin client passed to the base reader; used here to manage cursors.
     * @param sourceConfiguration source settings (subscription name, cursor verification mode).
     * @param deserializationSchema schema passed to the base reader.
     */
    public PulsarOrderedPartitionSplitReader(PulsarClient pulsarClient, PulsarAdmin pulsarAdmin, SourceConfiguration sourceConfiguration, PulsarDeserializationSchema<OUT> deserializationSchema) {
        super(pulsarClient, pulsarAdmin, sourceConfiguration, deserializationSchema);
    }

    /**
     * Requests the next message from the consumer, waiting at most {@code timeout}.
     *
     * @param timeout maximum wait; converted to whole milliseconds as an {@code int}.
     * @return whatever the consumer's {@code receive} call yields for this wait.
     * @throws PulsarClientException if the consumer fails to receive.
     * @throws ArithmeticException if the timeout in milliseconds does not fit into an {@code int}.
     */
    @Override
    protected Message<byte[]> pollMessage(Duration timeout) throws PulsarClientException {
        int timeoutMillis = Math.toIntExact(timeout.toMillis());
        return this.pulsarConsumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /**
     * Logs the handled message at debug level and releases it.
     *
     * @param message the message that has just been processed.
     */
    @Override
    protected void finishedPollMessage(Message<byte[]> message) {
        LOG.debug("Finished polling message {}", message);
        message.release();
    }

    /**
     * Positions the subscription cursor for {@code split} before its consumer is created.
     *
     * <p>Nothing happens when the split has no latest consumed id. Otherwise the subscription is
     * created at the initial position if the topic does not list it yet, or its cursor is reset
     * to that position if it does.
     *
     * @param split the split whose consumer is about to be created.
     * @throws IllegalArgumentException if the admin call fails and the configured verification
     *     mode is {@link CursorVerification#FAIL_ON_MISMATCH}; in any other mode the failure is
     *     only logged as a warning.
     */
    @Override
    protected void beforeCreatingConsumer(PulsarPartitionSplit split) {
        MessageId latestConsumedId = split.getLatestConsumedId();
        if (latestConsumedId == null) {
            return;
        }

        LOG.info("Reset subscription position by the checkpoint {}", latestConsumedId);
        try {
            MessageId initialPosition = PulsarOrderedPartitionSplitReader.resolveInitialPosition(latestConsumedId);
            String topicName = split.getPartition().getFullTopicName();
            List<String> subscriptions = this.pulsarAdmin.topics().getSubscriptions(topicName);
            String subscriptionName = this.sourceConfiguration.getSubscriptionName();
            if (subscriptions.contains(subscriptionName)) {
                this.pulsarAdmin.topics().resetCursor(topicName, subscriptionName, initialPosition);
            } else {
                this.pulsarAdmin.topics().createSubscription(topicName, subscriptionName, initialPosition);
            }
        }
        catch (PulsarAdminException e) {
            if (this.sourceConfiguration.getVerifyInitialOffsets() == CursorVerification.FAIL_ON_MISMATCH) {
                // Keep the exception type and cause, but say which position/partition failed.
                throw new IllegalArgumentException("Failed to reset cursor to " + latestConsumedId + " on partition " + split.getPartition(), e);
            }
            // The trailing throwable is not bound to a placeholder, so it is logged as the cause.
            LOG.warn("Failed to reset cursor to {} on partition {}", latestConsumedId, split.getPartition(), e);
        }
    }

    /**
     * Picks the position the subscription cursor should be moved to.
     *
     * <p>The {@code latest} and {@code earliest} sentinels are used as they are; any other id is
     * advanced with {@link MessageIdUtils#nextMessageId(MessageId)}. The sentinels are matched
     * with {@code equals} rather than reference identity so that an id which is equal in value
     * but is a different instance is still recognised as a sentinel instead of being advanced.
     */
    private static MessageId resolveInitialPosition(MessageId latestConsumedId) {
        if (MessageId.latest.equals(latestConsumedId) || MessageId.earliest.equals(latestConsumedId)) {
            return latestConsumedId;
        }
        return MessageIdUtils.nextMessageId(latestConsumedId);
    }

    /**
     * Cumulatively acknowledges {@code offsetsToCommit} on the consumer, creating the consumer
     * for {@code partition} first if none exists yet.
     *
     * @param partition the partition used to create the consumer when it is still {@code null}.
     * @param offsetsToCommit the message id passed to the cumulative acknowledgement.
     */
    public void notifyCheckpointComplete(TopicPartition partition, MessageId offsetsToCommit) {
        if (this.pulsarConsumer == null) {
            this.pulsarConsumer = this.createPulsarConsumer(partition);
        }
        // NOTE(review): sneakyClient presumably rethrows client failures unchecked - confirm.
        PulsarExceptionUtils.sneakyClient(() -> this.pulsarConsumer.acknowledgeCumulative(offsetsToCommit));
    }
}

