/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
import org.apache.flink.connector.kafka.lineage.KafkaDatasetIdentifier;
import org.apache.flink.connector.kafka.sink.KafkaCommittable;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaWriter;
import org.apache.flink.connector.kafka.sink.KafkaWriterState;
import org.apache.flink.connector.kafka.sink.internal.BackchannelFactory;
import org.apache.flink.connector.kafka.sink.internal.CheckpointTransaction;
import org.apache.flink.connector.kafka.sink.internal.FlinkKafkaInternalProducer;
import org.apache.flink.connector.kafka.sink.internal.ProducerPool;
import org.apache.flink.connector.kafka.sink.internal.ProducerPoolImpl;
import org.apache.flink.connector.kafka.sink.internal.ReadableBackchannel;
import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyContextImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionAbortStrategyImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionFinished;
import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyContextImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionNamingStrategyImpl;
import org.apache.flink.connector.kafka.sink.internal.TransactionOwnership;
import org.apache.flink.connector.kafka.util.AdminUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class ExactlyOnceKafkaWriter<IN>
extends KafkaWriter<IN> {
    private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceKafkaWriter.class);
    private final String transactionalIdPrefix;
    private final TransactionAbortStrategyImpl transactionAbortStrategy;
    private final TransactionNamingStrategyImpl transactionNamingStrategy;
    private final Collection<KafkaWriterState> recoveredStates;
    private final long restoredCheckpointId;
    private final ProducerPool producerPool;
    private final ReadableBackchannel<TransactionFinished> backchannel;
    private final TransactionNamingStrategyContextImpl namingContext;
    private final int totalNumberOfOwnedSubtasks;
    private final int[] ownedSubtaskIds;
    private AdminClient adminClient;

    ExactlyOnceKafkaWriter(DeliveryGuarantee deliveryGuarantee, Properties kafkaProducerConfig, String transactionalIdPrefix, WriterInitContext sinkInitContext, KafkaRecordSerializationSchema<IN> recordSerializer, SerializationSchema.InitializationContext schemaContext, TransactionAbortStrategyImpl transactionAbortStrategy, TransactionNamingStrategyImpl transactionNamingStrategy, Collection<KafkaWriterState> recoveredStates) {
        super(deliveryGuarantee, kafkaProducerConfig, sinkInitContext, recordSerializer, schemaContext);
        this.transactionalIdPrefix = (String)Preconditions.checkNotNull((Object)transactionalIdPrefix, (String)"transactionalIdPrefix must not be null");
        this.transactionAbortStrategy = (TransactionAbortStrategyImpl)((Object)Preconditions.checkNotNull((Object)((Object)transactionAbortStrategy), (String)"transactionAbortStrategy must not be null"));
        this.transactionNamingStrategy = (TransactionNamingStrategyImpl)((Object)Preconditions.checkNotNull((Object)((Object)transactionNamingStrategy), (String)"transactionNamingStrategy must not be null"));
        try {
            recordSerializer.open(schemaContext, this.kafkaSinkContext);
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Cannot initialize schema.", (Throwable)e);
        }
        this.recoveredStates = (Collection)Preconditions.checkNotNull(recoveredStates, (String)"recoveredStates");
        TaskInfo taskInfo = sinkInitContext.getTaskInfo();
        TransactionOwnership ownership = transactionNamingStrategy.getOwnership();
        int subtaskId = taskInfo.getIndexOfThisSubtask();
        int parallelism = taskInfo.getNumberOfParallelSubtasks();
        this.ownedSubtaskIds = ownership.getOwnedSubtaskIds(subtaskId, parallelism, recoveredStates);
        this.totalNumberOfOwnedSubtasks = ownership.getTotalNumberOfOwnedSubtasks(subtaskId, parallelism, recoveredStates);
        this.initFlinkMetrics();
        this.restoredCheckpointId = sinkInitContext.getRestoredCheckpointId().orElse(0L);
        this.producerPool = new ProducerPoolImpl(kafkaProducerConfig, this::initKafkaMetrics, recoveredStates.stream().flatMap(r -> r.getPrecommittedTransactionalIds().stream()).collect(Collectors.toList()));
        this.backchannel = BackchannelFactory.getInstance().getReadableBackchannel(subtaskId, taskInfo.getAttemptNumber(), transactionalIdPrefix);
        this.namingContext = new TransactionNamingStrategyContextImpl(transactionalIdPrefix, this.ownedSubtaskIds[0], this.restoredCheckpointId, this.producerPool);
    }

    @Override
    public void initialize() {
        try {
            this.abortLingeringTransactions((Collection)Preconditions.checkNotNull(this.recoveredStates, (String)"recoveredStates"), this.restoredCheckpointId + 1L);
            this.currentProducer = this.startTransaction(this.restoredCheckpointId + 1L);
        }
        catch (Throwable t) {
            try {
                this.close();
            }
            catch (Exception e) {
                t.addSuppressed(e);
            }
            throw t;
        }
    }

    private FlinkKafkaInternalProducer<byte[], byte[]> startTransaction(long checkpointId) {
        this.namingContext.setNextCheckpointId(checkpointId);
        this.namingContext.setOngoingTransactions(this.producerPool.getOngoingTransactions().stream().map(CheckpointTransaction::getTransactionalId).collect(Collectors.toSet()));
        FlinkKafkaInternalProducer<byte[], byte[]> producer = this.transactionNamingStrategy.getTransactionalProducer(this.namingContext);
        this.namingContext.setLastCheckpointId(checkpointId);
        producer.beginTransaction();
        return producer;
    }

    @Override
    public Collection<KafkaCommittable> prepareCommit() {
        if (this.currentProducer.hasRecordsInTransaction()) {
            KafkaCommittable committable = KafkaCommittable.of(this.currentProducer);
            LOG.debug("Prepare {}.", (Object)committable);
            this.currentProducer.precommitTransaction();
            return Collections.singletonList(committable);
        }
        this.producerPool.recycle(this.currentProducer);
        return Collections.emptyList();
    }

    @Override
    public List<KafkaWriterState> snapshotState(long checkpointId) throws IOException {
        TransactionFinished finishedTransaction;
        while ((finishedTransaction = this.backchannel.poll()) != null) {
            this.producerPool.recycleByTransactionId(finishedTransaction.getTransactionId(), finishedTransaction.isSuccess());
        }
        Collection<CheckpointTransaction> ongoingTransactions = this.producerPool.getOngoingTransactions();
        this.currentProducer = this.startTransaction(checkpointId + 1L);
        return this.createSnapshots(ongoingTransactions);
    }

    private List<KafkaWriterState> createSnapshots(Collection<CheckpointTransaction> ongoingTransactions) {
        ArrayList<KafkaWriterState> states = new ArrayList<KafkaWriterState>();
        int[] subtaskIds = this.ownedSubtaskIds;
        for (int index = 0; index < subtaskIds.length; ++index) {
            int ownedSubtask = subtaskIds[index];
            states.add(new KafkaWriterState(this.transactionalIdPrefix, ownedSubtask, this.totalNumberOfOwnedSubtasks, this.transactionNamingStrategy.getOwnership(), index == 0 ? ongoingTransactions : List.of()));
        }
        LOG.debug("Snapshotting state {}", states);
        return states;
    }

    @Override
    public void close() throws Exception {
        IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this::abortCurrentProducer, () -> IOUtils.closeAll((AutoCloseable[])new AutoCloseable[]{this.producerPool}), this.backchannel, () -> super.close()});
    }

    private void abortCurrentProducer() {
        if (this.currentProducer.hasRecordsInTransaction()) {
            try {
                this.currentProducer.abortTransaction();
            }
            catch (ProducerFencedException e) {
                LOG.debug("Producer {} fenced while aborting", (Object)this.currentProducer.getTransactionalId());
            }
        }
    }

    @VisibleForTesting
    ProducerPool getProducerPool() {
        return this.producerPool;
    }

    @VisibleForTesting
    public String getTransactionalIdPrefix() {
        return this.transactionalIdPrefix;
    }

    private void abortLingeringTransactions(Collection<KafkaWriterState> recoveredStates, long startCheckpointId) {
        KafkaWriterState lastState;
        ArrayList<String> prefixesToAbort = new ArrayList<String>();
        prefixesToAbort.add(this.transactionalIdPrefix);
        LOG.info("Aborting lingering transactions from previous execution. Recovered states: {}.", recoveredStates);
        Optional<KafkaWriterState> lastStateOpt = recoveredStates.stream().findFirst();
        if (lastStateOpt.isPresent() && !(lastState = lastStateOpt.get()).getTransactionalIdPrefix().equals(this.transactionalIdPrefix)) {
            prefixesToAbort.add(lastState.getTransactionalIdPrefix());
            LOG.warn("Transactional id prefix from previous execution {} has changed to {}.", (Object)lastState.getTransactionalIdPrefix(), (Object)this.transactionalIdPrefix);
        }
        LOG.info("Aborting lingering transactions with prefixes {} using {}", prefixesToAbort, (Object)this.transactionAbortStrategy);
        TransactionAbortStrategyContextImpl context = this.getTransactionAbortStrategyContext(startCheckpointId, prefixesToAbort);
        this.transactionAbortStrategy.abortTransactions(context);
    }

    private TransactionAbortStrategyContextImpl getTransactionAbortStrategyContext(long startCheckpointId, List<String> prefixesToAbort) {
        TransactionAbortStrategyImpl.TransactionAborter aborter = transactionalId -> {
            FlinkKafkaInternalProducer<byte[], byte[]> producer = this.producerPool.getTransactionalProducer(transactionalId, 0L);
            LOG.debug("Aborting transaction {}", (Object)transactionalId);
            producer.flush();
            short epoch = producer.getEpoch();
            this.producerPool.recycle(producer);
            return epoch;
        };
        Set<String> precommittedTransactionalIds = this.recoveredStates.stream().flatMap(s -> s.getPrecommittedTransactionalIds().stream().map(CheckpointTransaction::getTransactionalId)).collect(Collectors.toSet());
        return new TransactionAbortStrategyContextImpl(this::getTopicNames, this.kafkaSinkContext.getParallelInstanceId(), this.kafkaSinkContext.getNumberOfParallelInstances(), this.ownedSubtaskIds, this.totalNumberOfOwnedSubtasks, prefixesToAbort, startCheckpointId, aborter, this::getAdminClient, precommittedTransactionalIds);
    }

    private Collection<String> getTopicNames() {
        KafkaDatasetIdentifier identifier = this.getDatasetIdentifier().orElseThrow(() -> new IllegalStateException("The record serializer does not expose a static list of target topics."));
        if (identifier.getTopics() != null) {
            return identifier.getTopics();
        }
        return AdminUtils.getTopicsByPattern(this.getAdminClient(), identifier.getTopicPattern());
    }

    private Optional<KafkaDatasetIdentifier> getDatasetIdentifier() {
        if (this.recordSerializer instanceof KafkaDatasetFacetProvider) {
            Optional<KafkaDatasetFacet> kafkaDatasetFacet = ((KafkaDatasetFacetProvider)((Object)this.recordSerializer)).getKafkaDatasetFacet();
            return kafkaDatasetFacet.map(KafkaDatasetFacet::getTopicIdentifier);
        }
        return Optional.empty();
    }

    private Admin getAdminClient() {
        if (this.adminClient == null) {
            this.adminClient = AdminClient.create((Properties)this.kafkaProducerConfig);
        }
        return this.adminClient;
    }
}

