/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.controller;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.controller.ClusterControlManager;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.ProducerIdsBlock;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineLong;

public class ProducerIdControlManager {
    private final ClusterControlManager clusterControlManager;
    private final TimelineLong lastProducerId;

    ProducerIdControlManager(ClusterControlManager clusterControlManager, SnapshotRegistry snapshotRegistry) {
        this.clusterControlManager = clusterControlManager;
        this.lastProducerId = new TimelineLong(snapshotRegistry);
    }

    ControllerResult<ProducerIdsBlock> generateNextProducerId(int brokerId, long brokerEpoch) {
        this.clusterControlManager.checkBrokerEpoch(brokerId, brokerEpoch);
        long producerId = this.lastProducerId.get();
        if (producerId > 9223372036854774807L) {
            throw new UnknownServerException("Exhausted all producerIds as the next block's end producerId is will has exceeded long type limit");
        }
        long nextProducerId = producerId + 1000L;
        ProducerIdsRecord record = new ProducerIdsRecord().setProducerIdsEnd(nextProducerId).setBrokerId(brokerId).setBrokerEpoch(brokerEpoch);
        ProducerIdsBlock block = new ProducerIdsBlock(brokerId, producerId, 1000);
        return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion((ApiMessage)record, 0)), block);
    }

    void replay(ProducerIdsRecord record) {
        long currentProducerId = this.lastProducerId.get();
        if (record.producerIdsEnd() <= currentProducerId) {
            throw new RuntimeException("Producer ID from record is not monotonically increasing");
        }
        this.lastProducerId.set(record.producerIdsEnd());
    }

    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
        ArrayList<ApiMessageAndVersion> records = new ArrayList<ApiMessageAndVersion>(1);
        long producerId = this.lastProducerId.get(epoch);
        if (producerId > 0L) {
            records.add(new ApiMessageAndVersion((ApiMessage)new ProducerIdsRecord().setProducerIdsEnd(producerId).setBrokerId(0).setBrokerEpoch(0L), 0));
        }
        return Collections.singleton(records).iterator();
    }
}

