/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class CommitRequestManager
implements RequestManager,
MemberStateListener {
    private final SubscriptionState subscriptions;
    private final LogContext logContext;
    private final Logger log;
    private final Optional<AutoCommitState> autoCommitState;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final long retryBackoffMs;
    private final String groupId;
    private final Optional<String> groupInstanceId;
    private final long retryBackoffMaxMs;
    private final OptionalDouble jitter;
    private final boolean throwOnFetchStableOffsetUnsupported;
    final PendingRequests pendingRequests;
    private boolean closing = false;
    private final MemberInfo memberInfo;

    public CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptions, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, String groupId, Optional<String> groupInstanceId) {
        this(time, logContext, subscriptions, config, coordinatorRequestManager, groupId, groupInstanceId, config.getLong("retry.backoff.ms"), config.getLong("retry.backoff.max.ms"), OptionalDouble.empty());
    }

    CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptions, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, String groupId, Optional<String> groupInstanceId, long retryBackoffMs, long retryBackoffMaxMs, OptionalDouble jitter) {
        Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
        this.logContext = logContext;
        this.log = logContext.logger(this.getClass());
        this.pendingRequests = new PendingRequests();
        if (config.getBoolean("enable.auto.commit").booleanValue()) {
            long autoCommitInterval = Integer.toUnsignedLong(config.getInt("auto.commit.interval.ms"));
            this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval));
        } else {
            this.autoCommitState = Optional.empty();
        }
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.groupId = groupId;
        this.groupInstanceId = groupInstanceId;
        this.subscriptions = subscriptions;
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
        this.jitter = jitter;
        this.throwOnFetchStableOffsetUnsupported = config.getBoolean("internal.throw.on.fetch.stable.offset.unsupported");
        this.memberInfo = new MemberInfo();
    }

    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (!this.coordinatorRequestManager.coordinator().isPresent()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        if (this.closing) {
            return this.drainPendingOffsetCommitRequests();
        }
        this.maybeAutoCommitAllConsumedAsync();
        if (!this.pendingRequests.hasUnsentRequests()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        List<NetworkClientDelegate.UnsentRequest> requests = this.pendingRequests.drain(currentTimeMs);
        long timeUntilNextPoll = Math.min(CommitRequestManager.findMinTime(this.unsentOffsetCommitRequests(), currentTimeMs), CommitRequestManager.findMinTime(this.unsentOffsetFetchRequests(), currentTimeMs));
        return new NetworkClientDelegate.PollResult(timeUntilNextPoll, requests);
    }

    @Override
    public void signalClose() {
        this.closing = true;
    }

    @Override
    public long maximumTimeToWait(long currentTimeMs) {
        return this.autoCommitState.map(ac -> ac.remainingMs(currentTimeMs)).orElse(Long.MAX_VALUE);
    }

    private static long findMinTime(Collection<? extends RequestState> requests, long currentTimeMs) {
        return requests.stream().mapToLong(request -> request.remainingBackoffMs(currentTimeMs)).min().orElse(Long.MAX_VALUE);
    }

    private CompletableFuture<Void> maybeAutoCommit(Map<TopicPartition, OffsetAndMetadata> offsets, Optional<Long> expirationTimeMs, boolean checkInterval, boolean retryOnStaleEpoch) {
        if (!this.autoCommitEnabled()) {
            this.log.debug("Skipping auto-commit because auto-commit config is not enabled.");
            return CompletableFuture.completedFuture(null);
        }
        AutoCommitState autocommit = this.autoCommitState.get();
        if (checkInterval && !autocommit.shouldAutoCommit()) {
            return CompletableFuture.completedFuture(null);
        }
        CompletionStage result = this.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).whenComplete(this.autoCommitCallback(offsets));
        autocommit.resetTimer();
        autocommit.setInflightCommitStatus(true);
        return result;
    }

    public CompletableFuture<Void> maybeAutoCommitAllConsumedAsync() {
        if (!this.autoCommitEnabled()) {
            return CompletableFuture.completedFuture(null);
        }
        Map<TopicPartition, OffsetAndMetadata> offsets = this.subscriptions.allConsumed();
        CompletableFuture<Void> result = this.maybeAutoCommit(offsets, Optional.empty(), true, true);
        result.whenComplete((__, error) -> {
            if (error != null) {
                if (error instanceof RetriableCommitFailedException) {
                    this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", (Object)offsets, error);
                    this.resetAutoCommitTimer(this.retryBackoffMs);
                } else {
                    this.log.warn("Asynchronous auto-commit of offsets {} failed: {}", (Object)offsets, (Object)error.getMessage());
                }
            } else {
                this.log.debug("Completed asynchronous auto-commit of offsets {}", (Object)offsets);
            }
        });
        return result;
    }

    public CompletableFuture<Void> maybeAutoCommitAllConsumedNow(Optional<Long> expirationTimeMs, boolean retryOnStaleEpoch) {
        return this.maybeAutoCommit(this.subscriptions.allConsumed(), expirationTimeMs, false, retryOnStaleEpoch);
    }

    private BiConsumer<? super Void, ? super Throwable> autoCommitCallback(Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
        return (response, throwable) -> {
            this.autoCommitState.ifPresent(autoCommitState -> autoCommitState.setInflightCommitStatus(false));
            if (throwable == null) {
                this.log.debug("Completed asynchronous auto-commit of offsets {}", (Object)allConsumedOffsets);
            } else if (throwable instanceof RetriableCommitFailedException) {
                this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", (Object)allConsumedOffsets, (Object)throwable.getMessage());
            } else {
                this.log.warn("Asynchronous auto-commit of offsets {} failed", (Object)allConsumedOffsets, throwable);
            }
        };
    }

    public CompletableFuture<Void> addOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets, Optional<Long> expirationTimeMs, boolean retryOnStaleEpoch) {
        if (offsets.isEmpty()) {
            this.log.debug("Skipping commit of empty offsets");
            return CompletableFuture.completedFuture(null);
        }
        return this.pendingRequests.addOffsetCommitRequest(offsets, expirationTimeMs, retryOnStaleEpoch).future;
    }

    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(Set<TopicPartition> partitions, long expirationTimeMs) {
        return this.pendingRequests.addOffsetFetchRequest(partitions, expirationTimeMs);
    }

    public void updateAutoCommitTimer(long currentTimeMs) {
        this.autoCommitState.ifPresent(t -> t.updateTimer(currentTimeMs));
    }

    Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
        return this.pendingRequests.unsentOffsetCommits;
    }

    private List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
        return this.pendingRequests.unsentOffsetFetches;
    }

    private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs) {
        if (exception instanceof DisconnectException) {
            this.coordinatorRequestManager.markCoordinatorUnknown(exception.getMessage(), currentTimeMs);
        }
    }

    @Override
    public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
        this.memberInfo.memberId = memberId;
        this.memberInfo.memberEpoch = memberEpoch;
    }

    public boolean autoCommitEnabled() {
        return this.autoCommitState.isPresent();
    }

    public void resetAutoCommitTimer() {
        this.autoCommitState.ifPresent(AutoCommitState::resetTimer);
    }

    public void resetAutoCommitTimer(long retryBackoffMs) {
        this.autoCommitState.ifPresent(s -> s.resetTimer(retryBackoffMs));
    }

    public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() {
        if (this.pendingRequests.unsentOffsetCommits.isEmpty()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        List requests = this.pendingRequests.drainPendingCommits();
        return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, requests);
    }

    static class MemberInfo {
        Optional<String> memberId = Optional.empty();
        Optional<Integer> memberEpoch = Optional.empty();

        MemberInfo() {
        }
    }

    private static class AutoCommitState {
        private final Timer timer;
        private final long autoCommitInterval;
        private boolean hasInflightCommit;

        public AutoCommitState(Time time, long autoCommitInterval) {
            this.autoCommitInterval = autoCommitInterval;
            this.timer = time.timer(autoCommitInterval);
            this.hasInflightCommit = false;
        }

        public boolean shouldAutoCommit() {
            return !this.hasInflightCommit && this.timer.isExpired();
        }

        public void resetTimer() {
            this.timer.reset(this.autoCommitInterval);
        }

        public void resetTimer(long retryBackoffMs) {
            this.timer.reset(retryBackoffMs);
        }

        public long remainingMs(long currentTimeMs) {
            this.timer.update(currentTimeMs);
            return this.timer.remainingMs();
        }

        public void updateTimer(long currentTimeMs) {
            this.timer.update(currentTimeMs);
        }

        public void setInflightCommitStatus(boolean inflightCommitStatus) {
            this.hasInflightCommit = inflightCommitStatus;
        }
    }

    class PendingRequests {
        Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<OffsetCommitRequestState>();
        List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<OffsetFetchRequestState>();
        List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<OffsetFetchRequestState>();

        PendingRequests() {
        }

        boolean hasUnsentRequests() {
            return !this.unsentOffsetCommits.isEmpty() || !this.unsentOffsetFetches.isEmpty();
        }

        OffsetCommitRequestState addOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets, Optional<Long> expirationTimeMs, boolean retryOnStaleEpoch) {
            OffsetCommitRequestState requestState = this.createOffsetCommitRequest(offsets, CommitRequestManager.this.jitter, expirationTimeMs, retryOnStaleEpoch);
            return this.addOffsetCommitRequest(requestState);
        }

        OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request) {
            CommitRequestManager.this.log.debug("Enqueuing OffsetCommit request for offsets: {}", (Object)request.offsets);
            this.unsentOffsetCommits.add(request);
            return request;
        }

        OffsetCommitRequestState createOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets, OptionalDouble jitter, Optional<Long> expirationTimeMs, boolean retryOnStaleEpoch) {
            return jitter.isPresent() ? new OffsetCommitRequestState(offsets, CommitRequestManager.this.groupId, CommitRequestManager.this.groupInstanceId, expirationTimeMs, CommitRequestManager.this.retryBackoffMs, CommitRequestManager.this.retryBackoffMaxMs, jitter.getAsDouble(), CommitRequestManager.this.memberInfo, retryOnStaleEpoch) : new OffsetCommitRequestState(offsets, CommitRequestManager.this.groupId, CommitRequestManager.this.groupInstanceId, expirationTimeMs, CommitRequestManager.this.retryBackoffMs, CommitRequestManager.this.retryBackoffMaxMs, CommitRequestManager.this.memberInfo, retryOnStaleEpoch);
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(OffsetFetchRequestState request) {
            Optional<OffsetFetchRequestState> dupe = this.unsentOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            Optional<OffsetFetchRequestState> inflight = this.inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            if (dupe.isPresent() || inflight.isPresent()) {
                CommitRequestManager.this.log.info("Duplicated OffsetFetchRequest: " + request.requestedPartitions);
                dupe.orElseGet(() -> (OffsetFetchRequestState)inflight.get()).chainFuture(request.future);
            } else {
                request.future.whenComplete((r, t) -> {
                    if (!this.inflightOffsetFetches.remove(request)) {
                        CommitRequestManager.this.log.warn("A duplicated, inflight, request was identified, but unable to find it in the outbound buffer:" + request);
                    }
                });
                this.unsentOffsetFetches.add(request);
            }
            return request.future;
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(Set<TopicPartition> partitions, long expirationTimeMs) {
            OffsetFetchRequestState request = CommitRequestManager.this.jitter.isPresent() ? new OffsetFetchRequestState(partitions, CommitRequestManager.this.retryBackoffMs, CommitRequestManager.this.retryBackoffMaxMs, expirationTimeMs, CommitRequestManager.this.jitter.getAsDouble(), CommitRequestManager.this.memberInfo) : new OffsetFetchRequestState(partitions, CommitRequestManager.this.retryBackoffMs, CommitRequestManager.this.retryBackoffMaxMs, expirationTimeMs, CommitRequestManager.this.memberInfo);
            return this.addOffsetFetchRequest(request);
        }

        List<NetworkClientDelegate.UnsentRequest> drain(long currentTimeMs) {
            ArrayList<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<NetworkClientDelegate.UnsentRequest>();
            List unreadyCommitRequests = this.unsentOffsetCommits.stream().filter(request -> !request.canSendRequest(currentTimeMs)).collect(Collectors.toList());
            this.failAndRemoveExpiredCommitRequests(currentTimeMs);
            unsentRequests.addAll(this.unsentOffsetCommits.stream().filter(request -> request.canSendRequest(currentTimeMs)).peek(request -> request.onSendAttempt(currentTimeMs)).map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
            Map<Boolean, List<OffsetFetchRequestState>> partitionedBySendability = this.unsentOffsetFetches.stream().collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs)));
            this.failAndRemoveExpiredFetchRequests(currentTimeMs);
            for (OffsetFetchRequestState request2 : partitionedBySendability.get(true)) {
                request2.onSendAttempt(currentTimeMs);
                unsentRequests.add(request2.toUnsentRequest());
                this.inflightOffsetFetches.add(request2);
            }
            this.clearAll();
            this.unsentOffsetFetches.addAll((Collection<OffsetFetchRequestState>)partitionedBySendability.get(false));
            this.unsentOffsetCommits.addAll(unreadyCommitRequests);
            return Collections.unmodifiableList(unsentRequests);
        }

        private void failAndRemoveExpiredCommitRequests(long currentTimeMs) {
            this.unsentOffsetCommits.removeIf(req -> ((OffsetCommitRequestState)req).maybeExpire(currentTimeMs));
        }

        private void failAndRemoveExpiredFetchRequests(long currentTimeMs) {
            this.unsentOffsetFetches.removeIf(req -> ((OffsetFetchRequestState)req).maybeExpire(currentTimeMs));
        }

        private void clearAll() {
            this.unsentOffsetCommits.clear();
            this.unsentOffsetFetches.clear();
        }

        private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
            ArrayList<NetworkClientDelegate.UnsentRequest> res = new ArrayList<NetworkClientDelegate.UnsentRequest>();
            res.addAll(this.unsentOffsetCommits.stream().map(OffsetCommitRequestState::toUnsentRequest).collect(Collectors.toList()));
            this.clearAll();
            return res;
        }
    }

    class OffsetFetchRequestState
    extends RetriableRequestState {
        public final Set<TopicPartition> requestedPartitions;
        private final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;
        private final long expirationTimeMs;

        public OffsetFetchRequestState(Set<TopicPartition> partitions, long retryBackoffMs, long retryBackoffMaxMs, long expirationTimeMs, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs, memberInfo, true);
            this.requestedPartitions = partitions;
            this.future = new CompletableFuture();
            this.expirationTimeMs = expirationTimeMs;
        }

        public OffsetFetchRequestState(Set<TopicPartition> partitions, long retryBackoffMs, long retryBackoffMaxMs, long expirationTimeMs, double jitter, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter, memberInfo, true);
            this.requestedPartitions = partitions;
            this.future = new CompletableFuture();
            this.expirationTimeMs = expirationTimeMs;
        }

        public boolean sameRequest(OffsetFetchRequestState request) {
            return this.requestedPartitions.equals(request.requestedPartitions);
        }

        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            OffsetFetchRequest.Builder builder = this.memberInfo.memberId.isPresent() && this.memberInfo.memberEpoch.isPresent() ? new OffsetFetchRequest.Builder(CommitRequestManager.this.groupId, this.memberInfo.memberId.get(), this.memberInfo.memberEpoch.get(), true, new ArrayList<TopicPartition>(this.requestedPartitions), CommitRequestManager.this.throwOnFetchStableOffsetUnsupported) : new OffsetFetchRequest.Builder(CommitRequestManager.this.groupId, true, new ArrayList<TopicPartition>(this.requestedPartitions), CommitRequestManager.this.throwOnFetchStableOffsetUnsupported);
            return new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator()).whenComplete((r, t) -> this.onResponse(r.receivedTimeMs(), (OffsetFetchResponse)r.responseBody()));
        }

        public void onResponse(long currentTimeMs, OffsetFetchResponse response) {
            Errors responseError = response.groupLevelError(CommitRequestManager.this.groupId);
            if (responseError != Errors.NONE) {
                this.onFailure(currentTimeMs, responseError);
                return;
            }
            this.onSuccess(currentTimeMs, response);
        }

        private void onFailure(long currentTimeMs, Errors responseError) {
            CommitRequestManager.this.handleCoordinatorDisconnect(responseError.exception(), currentTimeMs);
            CommitRequestManager.this.log.debug("Offset fetch failed: {}", (Object)responseError.message());
            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                this.maybeRetry(currentTimeMs, responseError.exception());
            } else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
                CommitRequestManager.this.log.error("OffsetFetch failed with {} because the member is not part of the group anymore.", (Object)responseError);
                this.future.completeExceptionally(responseError.exception());
            } else if (responseError == Errors.STALE_MEMBER_EPOCH) {
                if (this.maybeRetryWithNewMemberEpoch(currentTimeMs, responseError)) {
                    CommitRequestManager.this.log.debug("OffsetFetch failed with {} but the consumer is still part of the group, so the request will be retried with the latest member ID and epoch.", (Object)responseError);
                    return;
                }
                CommitRequestManager.this.log.error("OffsetFetch failed with {} and the consumer is not part of the group anymore (it probably left the group, got fenced or failed). The request cannot be retried and will fail.", (Object)responseError);
                this.future.completeExceptionally(responseError.exception());
            } else if (responseError == Errors.NOT_COORDINATOR) {
                CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
                this.maybeRetry(currentTimeMs, responseError.exception());
            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                this.future.completeExceptionally(GroupAuthorizationException.forGroupId(CommitRequestManager.this.groupId));
            } else {
                this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + responseError.message()));
            }
        }

        @Override
        void maybeRetry(long currentTimeMs, Throwable throwable) {
            if (this.isExpired(currentTimeMs)) {
                this.future.completeExceptionally(throwable);
                return;
            }
            this.onFailedAttempt(currentTimeMs);
            CommitRequestManager.this.pendingRequests.inflightOffsetFetches.remove(this);
            CommitRequestManager.this.pendingRequests.addOffsetFetchRequest(this);
        }

        private boolean isExpired(long currentTimeMs) {
            return this.expirationTimeMs <= currentTimeMs;
        }

        private boolean maybeExpire(long currentTimeMs) {
            if (this.isExpired(currentTimeMs)) {
                this.future.completeExceptionally(new TimeoutException("OffsetFetch request could not complete before timeout expired."));
                return true;
            }
            return false;
        }

        private void onSuccess(long currentTimeMs, OffsetFetchResponse response) {
            HashSet<String> unauthorizedTopics = null;
            Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(CommitRequestManager.this.groupId);
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(responseData.size());
            HashSet<TopicPartition> unstableTxnOffsetTopicPartitions = new HashSet<TopicPartition>();
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : responseData.entrySet()) {
                TopicPartition tp = entry.getKey();
                OffsetFetchResponse.PartitionData partitionData = entry.getValue();
                if (partitionData.hasError()) {
                    Errors error = partitionData.error;
                    CommitRequestManager.this.log.debug("Failed to fetch offset for partition {}: {}", (Object)tp, (Object)error.message());
                    if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        this.future.completeExceptionally(new KafkaException("Topic or Partition " + tp + " does not exist"));
                        return;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        if (unauthorizedTopics == null) {
                            unauthorizedTopics = new HashSet<String>();
                        }
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    if (error == Errors.UNSTABLE_OFFSET_COMMIT) {
                        unstableTxnOffsetTopicPartitions.add(tp);
                        continue;
                    }
                    this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response for partition " + tp + ": " + error.message()));
                    return;
                }
                if (partitionData.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(partitionData.offset, partitionData.leaderEpoch, partitionData.metadata));
                    continue;
                }
                CommitRequestManager.this.log.info("Found no committed offset for partition {}", (Object)tp);
                offsets.put(tp, null);
            }
            if (unauthorizedTopics != null) {
                this.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
            } else if (!unstableTxnOffsetTopicPartitions.isEmpty()) {
                CommitRequestManager.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", unstableTxnOffsetTopicPartitions);
                this.maybeRetry(currentTimeMs, Errors.UNSTABLE_OFFSET_COMMIT.exception());
            } else {
                this.future.complete(offsets);
            }
        }

        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> chainFuture(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> otherFuture) {
            return this.future.whenComplete((r, t) -> {
                if (t != null) {
                    otherFuture.completeExceptionally((Throwable)t);
                } else {
                    otherFuture.complete((Map<TopicPartition, OffsetAndMetadata>)r);
                }
            });
        }

        @Override
        public String toString() {
            return "OffsetFetchRequestState{requestedPartitions=" + this.requestedPartitions + ", memberId=" + this.memberInfo.memberId.orElse("undefined") + ", memberEpoch=" + (this.memberInfo.memberEpoch.isPresent() ? (Serializable)this.memberInfo.memberEpoch.get() : "undefined") + ", future=" + this.future + ", " + this.toStringBase() + '}';
        }
    }

    static abstract class RetriableRequestState
    extends RequestState {
        final MemberInfo memberInfo;
        boolean retryOnStaleEpoch;

        RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, long retryBackoffMaxMs, MemberInfo memberInfo, boolean retryOnStaleEpoch) {
            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
            this.memberInfo = memberInfo;
            this.retryOnStaleEpoch = retryOnStaleEpoch;
        }

        RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, int retryBackoffExpBase, long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, boolean retryOnStaleEpoch) {
            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter);
            this.memberInfo = memberInfo;
            this.retryOnStaleEpoch = retryOnStaleEpoch;
        }

        boolean maybeRetryWithNewMemberEpoch(long currentTimeMs, Errors responseError) {
            if (this.retryOnStaleEpoch && this.memberInfo.memberEpoch.isPresent()) {
                this.maybeRetry(currentTimeMs, responseError.exception());
                return true;
            }
            return false;
        }

        abstract void maybeRetry(long var1, Throwable var3);
    }

    private class OffsetCommitRequestState
    extends RetriableRequestState {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;
        private final String groupId;
        private final Optional<String> groupInstanceId;
        private final CompletableFuture<Void> future;
        private final Optional<Long> expirationTimeMs;

        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, Optional<Long> expirationTimeMs, long retryBackoffMs, long retryBackoffMaxMs, MemberInfo memberInfo, boolean retryOnStaleEpoch) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs, memberInfo, retryOnStaleEpoch);
            this.offsets = offsets;
            this.groupId = groupId;
            this.groupInstanceId = groupInstanceId;
            this.future = new CompletableFuture();
            this.expirationTimeMs = expirationTimeMs;
        }

        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, Optional<Long> expirationTimeMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, boolean retryOnStaleEpoch) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter, memberInfo, retryOnStaleEpoch);
            this.offsets = offsets;
            this.groupId = groupId;
            this.groupInstanceId = groupInstanceId;
            this.future = new CompletableFuture();
            this.expirationTimeMs = expirationTimeMs;
        }

        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            HashMap<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<String, OffsetCommitRequestData.OffsetCommitRequestTopic>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.offsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = entry.getValue();
                OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.getOrDefault(topicPartition.topic(), new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topicPartition.topic()));
                topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(topicPartition.partition()).setCommittedOffset(offsetAndMetadata.offset()).setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1)).setCommittedMetadata(offsetAndMetadata.metadata()));
                requestTopicDataMap.put(topicPartition.topic(), topic);
            }
            OffsetCommitRequestData data = new OffsetCommitRequestData().setGroupId(this.groupId).setGroupInstanceId(this.groupInstanceId.orElse(null)).setTopics(new ArrayList<OffsetCommitRequestData.OffsetCommitRequestTopic>(requestTopicDataMap.values()));
            if (this.memberInfo.memberId.isPresent()) {
                data = data.setMemberId(this.memberInfo.memberId.get());
            }
            if (this.memberInfo.memberEpoch.isPresent()) {
                data = data.setGenerationIdOrMemberEpoch(this.memberInfo.memberEpoch.get());
            }
            OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
            NetworkClientDelegate.UnsentRequest resp = new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator());
            resp.whenComplete((response, throwable) -> {
                try {
                    if (throwable == null) {
                        CommitRequestManager.this.log.debug("OffsetCommit response received for offsets {} ", this.offsets);
                        this.onResponse((ClientResponse)response);
                    } else {
                        CommitRequestManager.this.log.debug("OffsetCommit completed with error for offsets {}", this.offsets, throwable);
                        long currentTimeMs = resp.handler().completionTimeMs();
                        CommitRequestManager.this.handleCoordinatorDisconnect(throwable, currentTimeMs);
                        if (throwable instanceof RetriableException) {
                            this.maybeRetry(currentTimeMs, (Throwable)throwable);
                        } else {
                            this.future.completeExceptionally((Throwable)throwable);
                        }
                    }
                }
                catch (Throwable t) {
                    CommitRequestManager.this.log.error("Unexpected error when completing offset commit: {}", (Object)this, (Object)t);
                    this.future.completeExceptionally(t);
                }
            });
            return resp;
        }

        public void onResponse(ClientResponse response) {
            long currentTimeMs = response.receivedTimeMs();
            OffsetCommitResponse commitResponse = (OffsetCommitResponse)response.responseBody();
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
                    OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                    long offset = offsetAndMetadata.offset();
                    Errors error = Errors.forCode(partition.errorCode());
                    if (error == Errors.NONE) {
                        CommitRequestManager.this.log.debug("OffsetCommit completed successfully for offset {} partition {}", (Object)offset, (Object)tp);
                        continue;
                    }
                    if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) {
                        CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs);
                        this.maybeRetry(currentTimeMs, error.exception());
                        return;
                    }
                    if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        this.maybeRetry(currentTimeMs, error.exception());
                        return;
                    }
                    if (error == Errors.UNKNOWN_MEMBER_ID) {
                        CommitRequestManager.this.log.error("OffsetCommit failed with {} on partition {} for offset {}", new Object[]{error, tp, offset});
                        this.future.completeExceptionally(new CommitFailedException("OffsetCommit failed with unknown member ID. " + error.message()));
                        return;
                    }
                    if (error == Errors.STALE_MEMBER_EPOCH) {
                        if (this.maybeRetryWithNewMemberEpoch(currentTimeMs, error)) {
                            CommitRequestManager.this.log.debug("OffsetCommit failed with {} and will be retried with the latest member ID and epoch.", (Object)error);
                            return;
                        }
                        this.future.completeExceptionally(this.commitExceptionForStaleMemberEpoch());
                        continue;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    CommitRequestManager.this.log.error("OffsetCommit failed on partition {} for offset {}: {}", new Object[]{tp, offset, error.message()});
                    if (error.exception() instanceof RetriableException) {
                        this.maybeRetry(currentTimeMs, error.exception());
                        return;
                    }
                    this.future.completeExceptionally(error.exception());
                    return;
                }
            }
            if (!unauthorizedTopics.isEmpty()) {
                CommitRequestManager.this.log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
                this.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                this.future.complete(null);
            }
        }

        @Override
        void maybeRetry(long currentTimeMs, Throwable throwable) {
            if (!this.allowsRetries()) {
                this.future.completeExceptionally(this.commitExceptionForRetriableError(throwable));
                return;
            }
            if (this.isExpired(currentTimeMs)) {
                this.future.completeExceptionally(throwable);
                return;
            }
            this.onFailedAttempt(currentTimeMs);
            CommitRequestManager.this.pendingRequests.addOffsetCommitRequest(this);
        }

        private boolean isExpired(long currentTimeMs) {
            return this.expirationTimeMs.isPresent() && this.expirationTimeMs.get() <= currentTimeMs;
        }

        private boolean allowsRetries() {
            return this.expirationTimeMs.isPresent();
        }

        private boolean maybeExpire(long currentTimeMs) {
            if (this.isExpired(currentTimeMs)) {
                this.future.completeExceptionally(new TimeoutException("OffsetCommit could not complete before timeout expired."));
                return true;
            }
            return false;
        }

        private Throwable commitExceptionForRetriableError(Throwable throwable) {
            if (!this.allowsRetries() && throwable instanceof RetriableException) {
                return new RetriableCommitFailedException(throwable);
            }
            return throwable;
        }

        private Throwable commitExceptionForStaleMemberEpoch() {
            if (this.retryOnStaleEpoch) {
                return new RetriableCommitFailedException(Errors.STALE_MEMBER_EPOCH.exception());
            }
            return new CommitFailedException("OffsetCommit failed with stale member epoch." + Errors.STALE_MEMBER_EPOCH.message());
        }
    }
}

