/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.MemberStateListener;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetCommitCallbackInvoker;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TimedRequestState;
import org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.errors.UnstableOffsetCommitException;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class CommitRequestManager
implements RequestManager,
MemberStateListener {
    // Clock handed to the auto-commit timer and to the deadline timers of offset fetch requests.
    private final Time time;
    // Source of the consumed positions (allConsumed()) used for auto-commits.
    private final SubscriptionState subscriptions;
    private final LogContext logContext;
    private final Logger log;
    // Present only when "enable.auto.commit" is true in the consumer config.
    private final Optional<AutoCommitState> autoCommitState;
    // Used to check whether a coordinator is known and to mark it unknown on errors.
    private final CoordinatorRequestManager coordinatorRequestManager;
    // Receives the committed offsets after a successful auto-commit.
    private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    // Built in the constructor from the Metrics instance passed in.
    private final OffsetCommitMetricsManager metricsManager;
    private final long retryBackoffMs;
    private final String groupId;
    private final Optional<String> groupInstanceId;
    private final long retryBackoffMaxMs;
    // When present, forwarded to the request-state constructors that accept a jitter value.
    private final OptionalDouble jitter;
    // Read from "internal.throw.on.fetch.stable.offset.unsupported"; forwarded to the OffsetFetch request builder.
    private final boolean throwOnFetchStableOffsetUnsupported;
    // Buffers of requests not yet handed out by poll(), plus the offset fetches handed out and still pending.
    final PendingRequests pendingRequests;
    // Set by signalClose(); once true, poll() only drains pending offset commit requests.
    private boolean closing = false;
    // Latest member id/epoch as reported through onMemberEpochUpdated().
    private final MemberInfo memberInfo;

    /**
     * Creates a manager whose retry backoff bounds are read from {@code config}
     * ("retry.backoff.ms" and "retry.backoff.max.ms") and that carries no explicit jitter value.
     */
    public CommitRequestManager(
            Time time,
            LogContext logContext,
            SubscriptionState subscriptions,
            ConsumerConfig config,
            CoordinatorRequestManager coordinatorRequestManager,
            OffsetCommitCallbackInvoker offsetCommitCallbackInvoker,
            String groupId,
            Optional<String> groupInstanceId,
            Metrics metrics) {
        this(
                time,
                logContext,
                subscriptions,
                config,
                coordinatorRequestManager,
                offsetCommitCallbackInvoker,
                groupId,
                groupInstanceId,
                config.getLong("retry.backoff.ms"),
                config.getLong("retry.backoff.max.ms"),
                OptionalDouble.empty(),
                metrics);
    }

    CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptions, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, String groupId, Optional<String> groupInstanceId, long retryBackoffMs, long retryBackoffMaxMs, OptionalDouble jitter, Metrics metrics) {
        Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
        this.time = time;
        this.logContext = logContext;
        this.log = logContext.logger(this.getClass());
        this.pendingRequests = new PendingRequests();
        if (config.getBoolean("enable.auto.commit").booleanValue()) {
            long autoCommitInterval = Integer.toUnsignedLong(config.getInt("auto.commit.interval.ms"));
            this.autoCommitState = Optional.of(new AutoCommitState(time, autoCommitInterval, logContext));
        } else {
            this.autoCommitState = Optional.empty();
        }
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.groupId = groupId;
        this.groupInstanceId = groupInstanceId;
        this.subscriptions = subscriptions;
        this.retryBackoffMs = retryBackoffMs;
        this.retryBackoffMaxMs = retryBackoffMaxMs;
        this.jitter = jitter;
        this.throwOnFetchStableOffsetUnsupported = config.getBoolean("internal.throw.on.fetch.stable.offset.unsupported");
        this.memberInfo = new MemberInfo();
        this.metricsManager = new OffsetCommitMetricsManager(metrics);
        this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
    }

    /**
     * Hands out the requests that are ready to be sent.
     *
     * <p>Returns an empty result while no coordinator is known. Once {@link #signalClose()} has been
     * called only the pending offset commits are drained. Otherwise an auto-commit is triggered if
     * due, the sendable requests are drained, and the result carries the smallest remaining backoff
     * of the requests that stay buffered.
     *
     * @param currentTimeMs current time in milliseconds
     */
    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        boolean coordinatorKnown = coordinatorRequestManager.coordinator().isPresent();
        if (!coordinatorKnown) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        if (closing) {
            return drainPendingOffsetCommitRequests();
        }

        maybeAutoCommitAsync();
        if (pendingRequests.hasUnsentRequests()) {
            List<NetworkClientDelegate.UnsentRequest> drained = pendingRequests.drain(currentTimeMs);
            // Computed after draining, so only the requests left in the buffers are considered.
            long commitWaitMs = CommitRequestManager.findMinTime(unsentOffsetCommitRequests(), currentTimeMs);
            long fetchWaitMs = CommitRequestManager.findMinTime(unsentOffsetFetchRequests(), currentTimeMs);
            return new NetworkClientDelegate.PollResult(Math.min(commitWaitMs, fetchWaitMs), drained);
        }
        return NetworkClientDelegate.PollResult.EMPTY;
    }

    /**
     * Flags this manager as closing; from then on {@code poll} only drains pending offset commits.
     */
    @Override
    public void signalClose() {
        closing = true;
    }

    /**
     * Returns the time left on the auto-commit timer (after updating it to {@code currentTimeMs}),
     * or {@code Long.MAX_VALUE} when auto-commit is disabled.
     */
    @Override
    public long maximumTimeToWait(long currentTimeMs) {
        if (autoCommitState.isPresent()) {
            return autoCommitState.get().remainingMs(currentTimeMs);
        }
        return Long.MAX_VALUE;
    }

    /**
     * Returns the smallest remaining backoff among {@code requests}, or {@code Long.MAX_VALUE}
     * when the collection is empty.
     */
    private static long findMinTime(Collection<? extends RequestState> requests, long currentTimeMs) {
        long smallest = Long.MAX_VALUE;
        for (RequestState request : requests) {
            smallest = Math.min(smallest, request.remainingBackoffMs(currentTimeMs));
        }
        return smallest;
    }

    /**
     * Returns {@code t} itself when it already is a {@link TimeoutException}; otherwise wraps it in
     * a new {@link TimeoutException} with {@code t} as the cause.
     */
    private KafkaException maybeWrapAsTimeoutException(Throwable t) {
        return t instanceof TimeoutException ? (TimeoutException) t : new TimeoutException(t);
    }

    /**
     * Enqueues an auto-commit request unless there is nothing to commit.
     *
     * <p>With empty offsets an already-completed future holding an empty map is returned and nothing
     * is enqueued. Otherwise the auto-commit state is flagged as having an inflight commit, the
     * request is buffered, and the auto-commit callback is attached to the request's future.
     *
     * @throws java.util.NoSuchElementException if auto-commit is not enabled
     */
    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(OffsetCommitRequestState requestState) {
        // Resolved up front so a missing auto-commit state fails even for empty offsets.
        AutoCommitState autocommit = autoCommitState.get();
        if (requestState.offsets.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }

        autocommit.setInflightCommitStatus(true);
        OffsetCommitRequestState enqueued = pendingRequests.addOffsetCommitRequest(requestState);
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> pending = enqueued.future;
        pending.whenComplete(autoCommitCallback(enqueued.offsets));
        return pending;
    }

    /**
     * Triggers an auto-commit of all consumed offsets when auto-commit is enabled and due.
     *
     * <p>The auto-commit timer is reset to the regular interval right away; a follow-up callback may
     * reset it to the retry backoff once the commit completes.
     */
    public void maybeAutoCommitAsync() {
        if (!autoCommitEnabled()) {
            return;
        }
        if (!autoCommitState.get().shouldAutoCommit()) {
            return;
        }
        OffsetCommitRequestState commitRequest = createOffsetCommitRequest(subscriptions.allConsumed(), Long.MAX_VALUE);
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> commitResult = requestAutoCommit(commitRequest);
        resetAutoCommitTimer();
        maybeResetTimerWithBackoff(commitResult);
    }

    /**
     * Attaches a completion callback to {@code result} that logs the outcome and, when the failure
     * is a {@link RetriableCommitFailedException}, resets the auto-commit timer to the retry backoff.
     *
     * <p>Note: {@code CompletableFuture.whenComplete} passes a null value on exceptional completion,
     * so the offsets logged in the failure branches are null.
     */
    private void maybeResetTimerWithBackoff(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
        result.whenComplete((offsets, error) -> {
            if (error == null) {
                log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                return;
            }
            if (error instanceof RetriableCommitFailedException) {
                log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                resetAutoCommitTimer(retryBackoffMs);
                return;
            }
            log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
        });
    }

    /**
     * Commits all consumed offsets before a revocation, retrying until {@code deadlineMs}.
     *
     * @param deadlineMs deadline passed on to the commit request
     * @return a future completed when the commit succeeds or fails for good; already completed
     *         when auto-commit is disabled
     */
    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(long deadlineMs) {
        if (autoCommitEnabled()) {
            CompletableFuture<Void> outcome = new CompletableFuture<>();
            OffsetCommitRequestState firstAttempt = createOffsetCommitRequest(subscriptions.allConsumed(), deadlineMs);
            autoCommitSyncBeforeRevocationWithRetries(firstAttempt, outcome);
            return outcome;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Issues one auto-commit attempt and completes {@code result} from its outcome.
     *
     * <p>Retriable errors (and stale-epoch errors while a member epoch is available) lead to a new
     * attempt with refreshed offsets, unless the request has expired or the error is an
     * {@link UnknownTopicOrPartitionException}. Any other error fails {@code result} as-is.
     */
    private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState requestAttempt, CompletableFuture<Void> result) {
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> attemptFuture = requestAutoCommit(requestAttempt);
        attemptFuture.whenComplete((committedOffsets, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }

            boolean mayRetry = error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error);
            if (!mayRetry) {
                log.debug("Auto-commit sync before revocation failed with non-retriable error", error);
                result.completeExceptionally(error);
                return;
            }
            if (requestAttempt.isExpired()) {
                log.debug("Auto-commit sync before revocation timed out and won't be retried anymore");
                result.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }
            if (error instanceof UnknownTopicOrPartitionException) {
                log.debug("Auto-commit sync before revocation failed because topic or partition were deleted");
                result.completeExceptionally(error);
                return;
            }

            // Retry with whatever has been consumed by now and a fresh future on the same request.
            requestAttempt.offsets = subscriptions.allConsumed();
            requestAttempt.resetFuture();
            autoCommitSyncBeforeRevocationWithRetries(requestAttempt, result);
        });
    }

    /**
     * Builds the completion callback for an auto-commit of {@code allConsumedOffsets}: it clears the
     * inflight flag, hands the offsets to the callback invoker on success, and logs failures
     * (debug for {@link RetriableCommitFailedException}, warn otherwise).
     */
    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets) {
        return (response, throwable) -> {
            autoCommitState.ifPresent(state -> state.setInflightCommitStatus(false));
            if (throwable != null) {
                if (throwable instanceof RetriableCommitFailedException) {
                    log.debug("Auto-commit of offsets {} failed due to retriable error: {}", allConsumedOffsets, throwable.getMessage());
                } else {
                    log.warn("Auto-commit of offsets {} failed", allConsumedOffsets, throwable);
                }
                return;
            }
            offsetCommitCallbackInvoker.enqueueInterceptorInvocation(allConsumedOffsets);
            log.debug("Completed auto-commit of offsets {}", allConsumedOffsets);
        };
    }

    /**
     * Enqueues a single, non-retried commit of {@code offsets}.
     *
     * @return a future completed when the commit request completes; retriable failures are
     *         surfaced as {@link RetriableCommitFailedException}. Already completed when
     *         {@code offsets} is empty.
     */
    public CompletableFuture<Void> commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            log.debug("Skipping commit of empty offsets");
            return CompletableFuture.completedFuture(null);
        }

        OffsetCommitRequestState request = createOffsetCommitRequest(offsets, Long.MAX_VALUE);
        pendingRequests.addOffsetCommitRequest(request);

        CompletableFuture<Void> outcome = new CompletableFuture<>();
        request.future.whenComplete((committedOffsets, error) -> {
            if (error == null) {
                outcome.complete(null);
            } else {
                outcome.completeExceptionally(commitAsyncExceptionForError(error));
            }
        });
        return outcome;
    }

    /**
     * Commits {@code offsets}, retrying on retriable errors until {@code deadlineMs}.
     *
     * <p>As with {@code commitAsync} and the auto-commit path, an empty offsets map is treated as
     * "nothing to commit": no request is enqueued and an already-completed future is returned.
     *
     * @param offsets    offsets to commit
     * @param deadlineMs deadline passed on to the commit request
     * @return a future completed when the commit succeeds, fails with a non-retriable error, or
     *         expires
     */
    public CompletableFuture<Void> commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, long deadlineMs) {
        if (offsets.isEmpty()) {
            this.log.debug("Skipping commit of empty offsets");
            return CompletableFuture.completedFuture(null);
        }
        CompletableFuture<Void> result = new CompletableFuture<>();
        OffsetCommitRequestState requestState = this.createOffsetCommitRequest(offsets, deadlineMs);
        this.commitSyncWithRetries(requestState, result);
        return result;
    }

    /**
     * Builds the request state for committing {@code offsets}, passing the jitter value to the
     * request state only when one was supplied to this manager.
     */
    private OffsetCommitRequestState createOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets, long deadlineMs) {
        if (jitter.isPresent()) {
            return new OffsetCommitRequestState(offsets, groupId, groupInstanceId, deadlineMs, retryBackoffMs, retryBackoffMaxMs, jitter.getAsDouble(), memberInfo);
        }
        return new OffsetCommitRequestState(offsets, groupId, groupInstanceId, deadlineMs, retryBackoffMs, retryBackoffMaxMs, memberInfo);
    }

    /**
     * Enqueues {@code requestAttempt} and completes {@code result} from its outcome, re-enqueuing
     * the same request (with a fresh future) on retriable errors until it expires.
     */
    private void commitSyncWithRetries(OffsetCommitRequestState requestAttempt, CompletableFuture<Void> result) {
        pendingRequests.addOffsetCommitRequest(requestAttempt);
        requestAttempt.future.whenComplete((res, error) -> {
            if (error == null) {
                result.complete(null);
                return;
            }
            if (!(error instanceof RetriableException)) {
                result.completeExceptionally(commitSyncExceptionForError(error));
                return;
            }
            if (requestAttempt.isExpired()) {
                log.info("OffsetCommit timeout expired so it won't be retried anymore");
                result.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }
            requestAttempt.resetFuture();
            commitSyncWithRetries(requestAttempt, result);
        });
    }

    /**
     * Maps a {@link StaleMemberEpochException} to a {@link CommitFailedException}; every other
     * error is returned unchanged.
     */
    private Throwable commitSyncExceptionForError(Throwable error) {
        if (!(error instanceof StaleMemberEpochException)) {
            return error;
        }
        String staleEpochMessage = "OffsetCommit failed with stale member epoch." + Errors.STALE_MEMBER_EPOCH.message();
        return new CommitFailedException(staleEpochMessage);
    }

    /**
     * Wraps a {@link RetriableException} in a {@link RetriableCommitFailedException}; every other
     * error is returned unchanged.
     */
    private Throwable commitAsyncExceptionForError(Throwable error) {
        return error instanceof RetriableException ? new RetriableCommitFailedException(error) : error;
    }

    /**
     * Fetches the committed offsets for {@code partitions}, retrying until {@code deadlineMs}.
     *
     * @return a future with the fetched offsets; already completed with an empty map when
     *         {@code partitions} is empty
     */
    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets(Set<TopicPartition> partitions, long deadlineMs) {
        if (!partitions.isEmpty()) {
            CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> outcome = new CompletableFuture<>();
            OffsetFetchRequestState fetchRequest = createOffsetFetchRequest(partitions, deadlineMs);
            fetchOffsetsWithRetries(fetchRequest, outcome);
            return outcome;
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Builds the request state for fetching offsets of {@code partitions}, passing the jitter value
     * to the request state only when one was supplied to this manager.
     */
    private OffsetFetchRequestState createOffsetFetchRequest(Set<TopicPartition> partitions, long deadlineMs) {
        if (jitter.isPresent()) {
            return new OffsetFetchRequestState(partitions, retryBackoffMs, retryBackoffMaxMs, deadlineMs, jitter.getAsDouble(), memberInfo);
        }
        return new OffsetFetchRequestState(partitions, retryBackoffMs, retryBackoffMaxMs, deadlineMs, memberInfo);
    }

    /**
     * Enqueues {@code fetchRequest} and completes {@code result} from its outcome.
     *
     * <p>On completion the request is removed from the inflight list (a warning is logged when it
     * was not there). Retriable errors, and stale-epoch errors while a member epoch is available,
     * cause the same request to be enqueued again with a fresh future until it expires.
     */
    private void fetchOffsetsWithRetries(OffsetFetchRequestState fetchRequest, CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result) {
        CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> attemptFuture = pendingRequests.addOffsetFetchRequest(fetchRequest);
        attemptFuture.whenComplete((fetched, error) -> {
            if (!pendingRequests.inflightOffsetFetches.remove(fetchRequest)) {
                log.warn("A duplicated, inflight, request was identified, but unable to find it in the outbound buffer:" + fetchRequest);
            }

            if (error == null) {
                result.complete(fetched);
                return;
            }
            boolean mayRetry = error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error);
            if (!mayRetry) {
                result.completeExceptionally(error);
                return;
            }
            if (fetchRequest.isExpired()) {
                log.debug("OffsetFetch request for {} timed out and won't be retried anymore", fetchRequest.requestedPartitions);
                result.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }
            fetchRequest.resetFuture();
            fetchOffsetsWithRetries(fetchRequest, result);
        });
    }

    /**
     * Tells whether {@code error} is a {@link StaleMemberEpochException} and a member epoch is
     * currently recorded in {@code memberInfo}.
     */
    private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable error) {
        if (!(error instanceof StaleMemberEpochException)) {
            return false;
        }
        return memberInfo.memberEpoch.isPresent();
    }

    /**
     * Updates the auto-commit timer to {@code currentTimeMs}; does nothing when auto-commit is
     * disabled.
     */
    public void updateAutoCommitTimer(long currentTimeMs) {
        if (autoCommitState.isPresent()) {
            autoCommitState.get().updateTimer(currentTimeMs);
        }
    }

    /**
     * Returns the live queue of offset commit requests that have not been handed out yet.
     */
    Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
        return pendingRequests.unsentOffsetCommits;
    }

    /**
     * Returns the live list of offset fetch requests that have not been handed out yet.
     */
    private List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
        return pendingRequests.unsentOffsetFetches;
    }

    /**
     * Marks the coordinator as unknown when {@code exception} is a {@link DisconnectException};
     * any other exception is ignored here.
     */
    private void handleCoordinatorDisconnect(Throwable exception, long currentTimeMs) {
        if (!(exception instanceof DisconnectException)) {
            return;
        }
        coordinatorRequestManager.markCoordinatorUnknown(exception.getMessage(), currentTimeMs);
    }

    /**
     * Records the latest member epoch and member id in the shared {@code memberInfo}.
     */
    @Override
    public void onMemberEpochUpdated(Optional<Integer> memberEpoch, Optional<String> memberId) {
        memberInfo.memberId = memberId;
        memberInfo.memberEpoch = memberEpoch;
    }

    /**
     * Tells whether auto-commit state exists, i.e. auto-commit was enabled at construction time.
     */
    public boolean autoCommitEnabled() {
        return autoCommitState.isPresent();
    }

    /**
     * Resets the auto-commit timer to the configured interval; no-op when auto-commit is disabled.
     */
    public void resetAutoCommitTimer() {
        autoCommitState.ifPresent(state -> state.resetTimer());
    }

    /**
     * Resets the auto-commit timer to {@code retryBackoffMs}; no-op when auto-commit is disabled.
     */
    public void resetAutoCommitTimer(long retryBackoffMs) {
        if (autoCommitState.isPresent()) {
            autoCommitState.get().resetTimer(retryBackoffMs);
        }
    }

    /**
     * Hands out every buffered offset commit request at once, regardless of backoff.
     *
     * @return an empty result when no commit request is buffered; otherwise a result holding all
     *         of them with a {@code Long.MAX_VALUE} wait time
     */
    public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() {
        boolean nothingToCommit = pendingRequests.unsentOffsetCommits.isEmpty();
        if (nothingToCommit) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        List<NetworkClientDelegate.UnsentRequest> commits = pendingRequests.drainPendingCommits();
        return new NetworkClientDelegate.PollResult(Long.MAX_VALUE, commits);
    }

    /**
     * Mutable holder for the member id and member epoch; both start out empty.
     */
    static class MemberInfo {
        Optional<String> memberId;
        Optional<Integer> memberEpoch;

        MemberInfo() {
            this.memberId = Optional.empty();
            this.memberEpoch = Optional.empty();
        }
    }

    /**
     * Tracks when the next auto-commit is due (via a timer) and whether one is currently inflight.
     */
    private static class AutoCommitState {
        private final Timer timer;
        private final long autoCommitInterval;
        private boolean hasInflightCommit;
        private final Logger log;

        public AutoCommitState(Time time, long autoCommitInterval, LogContext logContext) {
            this.autoCommitInterval = autoCommitInterval;
            this.timer = time.timer(autoCommitInterval);
            this.hasInflightCommit = false;
            this.log = logContext.logger(this.getClass());
        }

        /**
         * An auto-commit is due only when the timer has expired and no earlier auto-commit is
         * still inflight.
         */
        public boolean shouldAutoCommit() {
            if (timer.isExpired()) {
                if (!hasInflightCommit) {
                    return true;
                }
                log.trace("Skipping auto-commit on the interval because a previous one is still in-flight.");
            }
            return false;
        }

        /** Restarts the timer with the configured auto-commit interval. */
        public void resetTimer() {
            timer.reset(autoCommitInterval);
        }

        /** Restarts the timer with the given backoff instead of the regular interval. */
        public void resetTimer(long retryBackoffMs) {
            timer.reset(retryBackoffMs);
        }

        /** Updates the timer to {@code currentTimeMs} and returns the time left on it. */
        public long remainingMs(long currentTimeMs) {
            timer.update(currentTimeMs);
            return timer.remainingMs();
        }

        public void updateTimer(long currentTimeMs) {
            timer.update(currentTimeMs);
        }

        public void setInflightCommitStatus(boolean inflightCommitStatus) {
            hasInflightCommit = inflightCommitStatus;
        }
    }

    /**
     * Buffers the offset commit and offset fetch requests that have not been handed out by
     * {@code poll} yet, and tracks the offset fetch requests that were handed out and are still
     * pending so that identical fetches can be de-duplicated.
     */
    class PendingRequests {
        Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<OffsetCommitRequestState>();
        List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<OffsetFetchRequestState>();
        List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<OffsetFetchRequestState>();

        PendingRequests() {
        }

        /** Tells whether any commit or fetch request is waiting to be handed out. */
        boolean hasUnsentRequests() {
            return !this.unsentOffsetCommits.isEmpty() || !this.unsentOffsetFetches.isEmpty();
        }

        /**
         * Appends {@code request} to the unsent commit queue and returns it.
         */
        OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request) {
            CommitRequestManager.this.log.debug("Enqueuing OffsetCommit request for offsets: {}", request.offsets);
            this.unsentOffsetCommits.add(request);
            return request;
        }

        /**
         * Enqueues {@code request} unless a request for the same partitions is already unsent or
         * inflight; in that case the new request's future is chained to the existing request
         * instead of enqueuing a second one.
         *
         * @return the future of {@code request}
         */
        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(OffsetFetchRequestState request) {
            Optional<OffsetFetchRequestState> dupe = this.unsentOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            Optional<OffsetFetchRequestState> inflight = this.inflightOffsetFetches.stream().filter(r -> r.sameRequest(request)).findAny();
            if (dupe.isPresent() || inflight.isPresent()) {
                CommitRequestManager.this.log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions);
                dupe.orElseGet(inflight::get).chainFuture(request.future);
            } else {
                CommitRequestManager.this.log.debug("Enqueuing offset fetch request for partitions: {}", request.requestedPartitions);
                this.unsentOffsetFetches.add(request);
            }
            return request.future;
        }

        /**
         * Removes and returns every request that can be sent at {@code currentTimeMs}; requests
         * that cannot be sent yet stay buffered. Sendable fetch requests are additionally recorded
         * as inflight.
         *
         * <p>Expired requests are failed and removed BEFORE the buffers are split into sendable
         * and not-ready requests. Taking those snapshots first (as was done previously) meant a
         * request dropped by the expiry pass could be put back into the unsent buffer, or still be
         * sent and recorded as inflight.
         * NOTE(review): this relies on {@code maybeExpire} (declared outside this class) removing
         * the expired request from the unsent buffer, as the helper names below state.
         */
        List<NetworkClientDelegate.UnsentRequest> drain(long currentTimeMs) {
            List<NetworkClientDelegate.UnsentRequest> unsentRequests = new ArrayList<NetworkClientDelegate.UnsentRequest>();

            // Commits: expire first, then split what is left by sendability.
            this.failAndRemoveExpiredCommitRequests();
            Map<Boolean, List<OffsetCommitRequestState>> commitsBySendability = this.unsentOffsetCommits.stream()
                    .collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs)));
            for (OffsetCommitRequestState request : commitsBySendability.get(true)) {
                request.onSendAttempt(currentTimeMs);
                unsentRequests.add(request.toUnsentRequest());
            }

            // Fetches: same order, and sendable ones are also tracked as inflight.
            this.failAndRemoveExpiredFetchRequests();
            Map<Boolean, List<OffsetFetchRequestState>> fetchesBySendability = this.unsentOffsetFetches.stream()
                    .collect(Collectors.partitioningBy(request -> request.canSendRequest(currentTimeMs)));
            for (OffsetFetchRequestState request : fetchesBySendability.get(true)) {
                request.onSendAttempt(currentTimeMs);
                unsentRequests.add(request.toUnsentRequest());
                this.inflightOffsetFetches.add(request);
            }

            // Keep only the requests that could not be sent yet.
            this.clearAll();
            this.unsentOffsetFetches.addAll(fetchesBySendability.get(false));
            this.unsentOffsetCommits.addAll(commitsBySendability.get(false));
            return Collections.unmodifiableList(unsentRequests);
        }

        /**
         * Invokes {@code maybeExpire} on every unsent commit request. Iterates over a copy, so
         * the unsent queue may be modified while this runs.
         */
        private void failAndRemoveExpiredCommitRequests() {
            List<OffsetCommitRequestState> requestsToPurge = new ArrayList<OffsetCommitRequestState>(this.unsentOffsetCommits);
            requestsToPurge.forEach(RetriableRequestState::maybeExpire);
        }

        /**
         * Invokes {@code maybeExpire} on every unsent fetch request. Iterates over a copy, so
         * the unsent list may be modified while this runs.
         */
        private void failAndRemoveExpiredFetchRequests() {
            List<OffsetFetchRequestState> requestsToPurge = new ArrayList<OffsetFetchRequestState>(this.unsentOffsetFetches);
            requestsToPurge.forEach(RetriableRequestState::maybeExpire);
        }

        /** Empties both unsent buffers; the inflight fetch list is left untouched. */
        private void clearAll() {
            this.unsentOffsetCommits.clear();
            this.unsentOffsetFetches.clear();
        }

        /**
         * Converts every unsent commit request into an outgoing request, ignoring backoff, and
         * then empties both unsent buffers (unsent fetch requests are discarded as well).
         */
        private List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
            List<NetworkClientDelegate.UnsentRequest> res = new ArrayList<NetworkClientDelegate.UnsentRequest>();
            for (OffsetCommitRequestState request : this.unsentOffsetCommits) {
                res.add(request.toUnsentRequest());
            }
            this.clearAll();
            return res;
        }
    }

    class OffsetFetchRequestState
    extends RetriableRequestState {
        public final Set<TopicPartition> requestedPartitions;
        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

        /**
         * Creates the state for an offset fetch of {@code partitions} without an explicit jitter;
         * the deadline timer is built from the manager's clock and {@code deadlineMs}.
         */
        public OffsetFetchRequestState(
                Set<TopicPartition> partitions,
                long retryBackoffMs,
                long retryBackoffMaxMs,
                long deadlineMs,
                MemberInfo memberInfo) {
            super(
                    CommitRequestManager.this.logContext,
                    CommitRequestManager.class.getSimpleName(),
                    retryBackoffMs,
                    retryBackoffMaxMs,
                    memberInfo,
                    OffsetFetchRequestState.deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.requestedPartitions = partitions;
            this.future = new CompletableFuture<>();
        }

        /**
         * Creates the state for an offset fetch of {@code partitions} with an explicit jitter;
         * the deadline timer is built from the manager's clock and {@code deadlineMs}.
         */
        public OffsetFetchRequestState(
                Set<TopicPartition> partitions,
                long retryBackoffMs,
                long retryBackoffMaxMs,
                long deadlineMs,
                double jitter,
                MemberInfo memberInfo) {
            super(
                    CommitRequestManager.this.logContext,
                    CommitRequestManager.class.getSimpleName(),
                    retryBackoffMs,
                    // NOTE(review): presumably the exponential backoff multiplier - confirm against the superclass.
                    2,
                    retryBackoffMaxMs,
                    jitter,
                    memberInfo,
                    OffsetFetchRequestState.deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.requestedPartitions = partitions;
            this.future = new CompletableFuture<>();
        }

        /**
         * Two fetch requests count as the same when they ask for an equal set of partitions.
         */
        public boolean sameRequest(OffsetFetchRequestState request) {
            Set<TopicPartition> otherPartitions = request.requestedPartitions;
            return this.requestedPartitions.equals(otherPartitions);
        }

        /**
         * Builds the outgoing OffsetFetch request. The member id and epoch are included only when
         * both are currently present in {@code memberInfo}.
         */
        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            boolean memberKnown = this.memberInfo.memberId.isPresent() && this.memberInfo.memberEpoch.isPresent();
            OffsetFetchRequest.Builder builder;
            if (memberKnown) {
                builder = new OffsetFetchRequest.Builder(
                        CommitRequestManager.this.groupId,
                        this.memberInfo.memberId.get(),
                        this.memberInfo.memberEpoch.get(),
                        true,
                        new ArrayList<TopicPartition>(this.requestedPartitions),
                        CommitRequestManager.this.throwOnFetchStableOffsetUnsupported);
            } else {
                builder = new OffsetFetchRequest.Builder(
                        CommitRequestManager.this.groupId,
                        true,
                        new ArrayList<TopicPartition>(this.requestedPartitions),
                        CommitRequestManager.this.throwOnFetchStableOffsetUnsupported);
            }
            return this.buildRequestWithResponseHandling(builder);
        }

        /**
         * Dispatches an OffsetFetch response: a group-level error goes to the failure path,
         * anything else to the success path (which inspects per-partition results).
         */
        @Override
        void onResponse(ClientResponse response) {
            long receivedMs = response.receivedTimeMs();
            OffsetFetchResponse fetchResponse = (OffsetFetchResponse) response.responseBody();
            Errors groupError = fetchResponse.groupLevelError(CommitRequestManager.this.groupId);
            if (groupError == Errors.NONE) {
                this.onSuccess(receivedMs, fetchResponse);
            } else {
                this.onFailure(receivedMs, groupError);
            }
        }

        /**
         * Handles a group-level error: records the failed attempt and fails the request's future
         * with an exception chosen by error code. For NOT_COORDINATOR and
         * COORDINATOR_NOT_AVAILABLE the coordinator is marked unknown first.
         */
        private void onFailure(long currentTimeMs, Errors responseError) {
            CommitRequestManager.this.log.debug("Offset fetch failed: {}", responseError.message());
            this.onFailedAttempt(currentTimeMs);

            Throwable failure;
            if (responseError == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                failure = responseError.exception();
            } else if (responseError == Errors.UNKNOWN_MEMBER_ID) {
                CommitRequestManager.this.log.error("OffsetFetch failed with {} because the member is not part of the group anymore.", responseError);
                failure = responseError.exception();
            } else if (responseError == Errors.STALE_MEMBER_EPOCH) {
                CommitRequestManager.this.log.error("OffsetFetch failed with {} and the consumer is not part of the group anymore (it probably left the group, got fenced or failed). The request cannot be retried and will fail.", responseError);
                failure = responseError.exception();
            } else if (responseError == Errors.NOT_COORDINATOR || responseError == Errors.COORDINATOR_NOT_AVAILABLE) {
                CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown("error response " + responseError.name(), currentTimeMs);
                failure = responseError.exception();
            } else if (responseError == Errors.GROUP_AUTHORIZATION_FAILED) {
                failure = GroupAuthorizationException.forGroupId(CommitRequestManager.this.groupId);
            } else {
                failure = new KafkaException("Unexpected error in fetch offset response: " + responseError.message());
            }
            this.future.completeExceptionally(failure);
        }

        /**
         * Short human-readable description of this request, including the requested partitions.
         */
        @Override
        String requestDescription() {
            return "OffsetFetch request for partitions " + requestedPartitions;
        }

        /**
         * Returns the future currently associated with this request (replaced by
         * {@code resetFuture}).
         */
        @Override
        CompletableFuture<?> future() {
            return future;
        }

        /**
         * Replaces this request's future with a new, incomplete one.
         */
        void resetFuture() {
            future = new CompletableFuture<>();
        }

        /**
         * Removes this request from the manager's unsent offset fetch list, logging a warning
         * when it was not there.
         */
        @Override
        void removeRequest() {
            boolean removed = CommitRequestManager.this.unsentOffsetFetchRequests().remove(this);
            if (removed) {
                return;
            }
            CommitRequestManager.this.log.warn("OffsetFetch request to remove not found in the outbound buffer: {}", this);
        }

        /**
         * Translates the per-partition data of an OffsetFetch response into the outcome of this
         * request's future.
         *
         * <p>Partitions are processed one by one:
         * <ul>
         *   <li>no error and a non-negative offset: the offset is recorded for the partition;</li>
         *   <li>no error and a negative offset: {@code null} is recorded for the partition;</li>
         *   <li>{@code TOPIC_AUTHORIZATION_FAILED}: the topic is collected and processing continues;</li>
         *   <li>{@code UNSTABLE_OFFSET_COMMIT}: the partition is collected and processing continues;</li>
         *   <li>{@code UNKNOWN_TOPIC_OR_PARTITION} or any other error: the future is failed with a
         *       {@link KafkaException} immediately and no further partition is examined.</li>
         * </ul>
         * Every partition that carries an error also triggers {@code onFailedAttempt}.
         *
         * <p>Once all partitions were examined, collected unauthorized topics take precedence and
         * fail the future with a {@link TopicAuthorizationException}; otherwise collected unstable
         * partitions fail it with an {@code UnstableOffsetCommitException}; otherwise the attempt is
         * recorded as successful and the future completes with the gathered offsets.
         *
         * @param currentTimeMs time passed on to {@code onFailedAttempt} / {@code onSuccessfulAttempt}
         * @param response      the OffsetFetch response whose partition data is evaluated
         */
        private void onSuccess(long currentTimeMs, OffsetFetchResponse response) {
            Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionResults = response.partitionDataMap(CommitRequestManager.this.groupId);
            Map<TopicPartition, OffsetAndMetadata> committedOffsets = new HashMap<>(partitionResults.size());
            Set<TopicPartition> unstablePartitions = new HashSet<>();
            // Created lazily: staying null means no authorization failure was seen.
            Set<String> deniedTopics = null;

            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionResult : partitionResults.entrySet()) {
                TopicPartition partition = partitionResult.getKey();
                OffsetFetchResponse.PartitionData data = partitionResult.getValue();

                if (!data.hasError()) {
                    if (data.offset < 0L) {
                        // A negative offset is recorded as "no committed offset" via a null value.
                        CommitRequestManager.this.log.info("Found no committed offset for partition {}", partition);
                        committedOffsets.put(partition, null);
                    } else {
                        committedOffsets.put(partition, new OffsetAndMetadata(data.offset, data.leaderEpoch, data.metadata));
                    }
                    continue;
                }

                this.onFailedAttempt(currentTimeMs);
                Errors partitionError = data.error;
                CommitRequestManager.this.log.debug("Failed to fetch offset for partition {}: {}", partition, partitionError.message());

                if (partitionError == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    if (deniedTopics == null) {
                        deniedTopics = new HashSet<>();
                    }
                    deniedTopics.add(partition.topic());
                } else if (partitionError == Errors.UNSTABLE_OFFSET_COMMIT) {
                    unstablePartitions.add(partition);
                } else if (partitionError == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                    this.future.completeExceptionally(new KafkaException("Topic or Partition " + partition + " does not exist"));
                    return;
                } else {
                    this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response for partition " + partition + ": " + partitionError.message()));
                    return;
                }
            }

            if (deniedTopics != null) {
                this.future.completeExceptionally(new TopicAuthorizationException(deniedTopics));
                return;
            }
            if (!unstablePartitions.isEmpty()) {
                CommitRequestManager.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", unstablePartitions);
                this.future.completeExceptionally(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
                return;
            }
            this.onSuccessfulAttempt(currentTimeMs);
            this.future.complete(committedOffsets);
        }

        /**
         * Propagates the outcome of this request's future to another future: when this request's
         * future completes, {@code otherFuture} is completed with the same value or failed with
         * the same throwable.
         *
         * @param otherFuture the future that mirrors the outcome of this request's future
         */
        private void chainFuture(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> otherFuture) {
            this.future.whenComplete((result, failure) -> {
                if (failure == null) {
                    otherFuture.complete((Map<TopicPartition, OffsetAndMetadata>) result);
                    return;
                }
                otherFuture.completeExceptionally(failure);
            });
        }

        /**
         * Renders the requested partitions, member id, member epoch, result future and the base
         * request-state text. An absent member id or member epoch is printed as
         * {@code undefined}.
         *
         * @return the textual form of this request state
         */
        @Override
        public String toString() {
            final String memberIdText = this.memberInfo.memberId.orElse("undefined");
            final String memberEpochText = this.memberInfo.memberEpoch.map(Object::toString).orElse("undefined");

            final StringBuilder text = new StringBuilder("OffsetFetchRequestState{requestedPartitions=");
            text.append(this.requestedPartitions);
            text.append(", memberId=").append(memberIdText);
            text.append(", memberEpoch=").append(memberEpochText);
            text.append(", future=").append(this.future);
            text.append(", ").append(this.toStringBase());
            text.append('}');
            return text.toString();
        }
    }

    abstract class RetriableRequestState
    extends TimedRequestState {
        final MemberInfo memberInfo;

        RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, long retryBackoffMaxMs, MemberInfo memberInfo, Timer timer) {
            super(logContext, owner, retryBackoffMs, retryBackoffMaxMs, timer);
            this.memberInfo = memberInfo;
        }

        RetriableRequestState(LogContext logContext, String owner, long retryBackoffMs, int retryBackoffExpBase, long retryBackoffMaxMs, double jitter, MemberInfo memberInfo, Timer timer) {
            super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter, timer);
            this.memberInfo = memberInfo;
        }

        abstract String requestDescription();

        abstract CompletableFuture<?> future();

        void maybeExpire() {
            if (this.numAttempts > 0 && this.isExpired()) {
                this.removeRequest();
                this.future().completeExceptionally(new TimeoutException(this.requestDescription() + " could not complete before timeout expired."));
            }
        }

        NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(AbstractRequest.Builder<?> builder) {
            NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator());
            request.whenComplete((response, throwable) -> {
                long completionTimeMs = request.handler().completionTimeMs();
                this.handleClientResponse((ClientResponse)response, (Throwable)throwable, completionTimeMs);
            });
            return request;
        }

        private void handleClientResponse(ClientResponse response, Throwable error, long requestCompletionTimeMs) {
            try {
                if (error == null) {
                    this.onResponse(response);
                } else {
                    CommitRequestManager.this.log.debug("{} completed with error", (Object)this.requestDescription(), (Object)error);
                    this.onFailedAttempt(requestCompletionTimeMs);
                    CommitRequestManager.this.handleCoordinatorDisconnect(error, requestCompletionTimeMs);
                    this.future().completeExceptionally(error);
                }
            }
            catch (Throwable t) {
                CommitRequestManager.this.log.error("Unexpected error handling response for {}", (Object)this.requestDescription(), (Object)t);
                this.future().completeExceptionally(t);
            }
        }

        abstract void onResponse(ClientResponse var1);

        abstract void removeRequest();
    }

    /**
     * Request state for one OffsetCommit request: it holds the offsets to commit, the group
     * identity, and the future that is completed from the broker's response.
     */
    private class OffsetCommitRequestState extends RetriableRequestState {
        /** Offsets to commit, keyed by partition. */
        private Map<TopicPartition, OffsetAndMetadata> offsets;
        private final String groupId;
        private final Optional<String> groupInstanceId;
        /** Outcome of the commit; replaced by {@link #resetFuture()}. */
        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

        /**
         * Creates a commit request state using the superclass constructor that takes no
         * explicit exponential base or jitter.
         *
         * @param offsets           offsets to commit, keyed by partition
         * @param groupId           group id set on the request
         * @param groupInstanceId   group instance id set on the request when present
         * @param deadlineMs        deadline used to build the request's timer
         * @param retryBackoffMs    forwarded to the superclass
         * @param retryBackoffMaxMs forwarded to the superclass
         * @param memberInfo        member id / epoch set on the request when present
         */
        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, long deadlineMs, long retryBackoffMs, long retryBackoffMaxMs, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs, memberInfo, OffsetCommitRequestState.deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.offsets = offsets;
            this.groupId = groupId;
            this.groupInstanceId = groupInstanceId;
            this.future = new CompletableFuture<>();
        }

        /**
         * Creates a commit request state with an explicit jitter; the exponential base passed
         * to the superclass is fixed at 2.
         *
         * @param offsets           offsets to commit, keyed by partition
         * @param groupId           group id set on the request
         * @param groupInstanceId   group instance id set on the request when present
         * @param deadlineMs        deadline used to build the request's timer
         * @param retryBackoffMs    forwarded to the superclass
         * @param retryBackoffMaxMs forwarded to the superclass
         * @param jitter            forwarded to the superclass
         * @param memberInfo        member id / epoch set on the request when present
         */
        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, long deadlineMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter, memberInfo, OffsetCommitRequestState.deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.offsets = offsets;
            this.groupId = groupId;
            this.groupInstanceId = groupInstanceId;
            this.future = new CompletableFuture<>();
        }

        /**
         * Builds the OffsetCommit request for the held offsets, grouping partitions by topic,
         * and attaches this state's response handling to it.
         *
         * @return the unsent request ready to be handed to the network layer
         */
        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> requestTopicDataMap = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : this.offsets.entrySet()) {
                TopicPartition topicPartition = entry.getKey();
                OffsetAndMetadata offsetAndMetadata = entry.getValue();
                // computeIfAbsent creates the per-topic entry only the first time a topic is seen,
                // instead of allocating a throw-away default on every iteration.
                OffsetCommitRequestData.OffsetCommitRequestTopic topic = requestTopicDataMap.computeIfAbsent(
                        topicPartition.topic(),
                        topicName -> new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(topicName));
                topic.partitions().add(new OffsetCommitRequestData.OffsetCommitRequestPartition()
                        .setPartitionIndex(topicPartition.partition())
                        .setCommittedOffset(offsetAndMetadata.offset())
                        // -1 is sent when the offset carries no leader epoch.
                        .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1))
                        .setCommittedMetadata(offsetAndMetadata.metadata()));
            }
            OffsetCommitRequestData data = new OffsetCommitRequestData()
                    .setGroupId(this.groupId)
                    .setGroupInstanceId(this.groupInstanceId.orElse(null))
                    .setTopics(new ArrayList<>(requestTopicDataMap.values()));
            if (this.memberInfo.memberId.isPresent()) {
                data = data.setMemberId(this.memberInfo.memberId.get());
            }
            if (this.memberInfo.memberEpoch.isPresent()) {
                data = data.setGenerationIdOrMemberEpoch(this.memberInfo.memberEpoch.get());
            }
            OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(data);
            return this.buildRequestWithResponseHandling(builder);
        }

        /**
         * Evaluates an OffsetCommit response partition by partition and completes the future.
         *
         * <p>The request latency is recorded first. Each partition with an error triggers
         * {@code onFailedAttempt}. {@code TOPIC_AUTHORIZATION_FAILED} errors are collected and
         * reported together after all partitions were examined; every other error fails the
         * future immediately and stops processing. For {@code COORDINATOR_NOT_AVAILABLE},
         * {@code NOT_COORDINATOR} and {@code REQUEST_TIMED_OUT} the coordinator is additionally
         * marked unknown. If no partition reported an error the future completes with
         * {@code null}.
         *
         * @param response the client response whose body is an {@code OffsetCommitResponse}
         */
        @Override
        public void onResponse(ClientResponse response) {
            CommitRequestManager.this.metricsManager.recordRequestLatency(response.requestLatencyMs());
            long currentTimeMs = response.receivedTimeMs();
            OffsetCommitResponse commitResponse = (OffsetCommitResponse) response.responseBody();
            Set<String> unauthorizedTopics = new HashSet<>();
            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
                    Errors error = Errors.forCode(partition.errorCode());
                    if (error == Errors.NONE) {
                        // The lookup only feeds a debug message; a partition missing from the
                        // request map must not turn a successful commit into a NullPointerException.
                        OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                        if (offsetAndMetadata != null) {
                            CommitRequestManager.this.log.debug("OffsetCommit completed successfully for offset {} partition {}", offsetAndMetadata.offset(), tp);
                        } else {
                            CommitRequestManager.this.log.debug("OffsetCommit completed successfully for partition {} which was not part of this request", tp);
                        }
                        continue;
                    }
                    this.onFailedAttempt(currentTimeMs);
                    if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                        this.future.completeExceptionally(GroupAuthorizationException.forGroupId(this.groupId));
                        return;
                    }
                    if (error == Errors.COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR || error == Errors.REQUEST_TIMED_OUT) {
                        CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown(error.message(), currentTimeMs);
                        this.future.completeExceptionally(error.exception());
                        return;
                    }
                    if (error == Errors.FENCED_INSTANCE_ID) {
                        String fencedError = "OffsetCommit failed due to group instance id fenced: " + this.groupInstanceId;
                        CommitRequestManager.this.log.error(fencedError);
                        this.future.completeExceptionally(new CommitFailedException(fencedError));
                        return;
                    }
                    if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                        this.future.completeExceptionally(error.exception());
                        return;
                    }
                    if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS || error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        this.future.completeExceptionally(error.exception());
                        return;
                    }
                    if (error == Errors.UNKNOWN_MEMBER_ID) {
                        CommitRequestManager.this.log.error("OffsetCommit failed with {}", error);
                        this.future.completeExceptionally(new CommitFailedException("OffsetCommit failed with unknown member ID. " + error.message()));
                        return;
                    }
                    if (error == Errors.STALE_MEMBER_EPOCH) {
                        this.future.completeExceptionally(error.exception());
                        return;
                    }
                    if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        // Collected so that all unauthorized topics are reported in one exception.
                        unauthorizedTopics.add(tp.topic());
                        continue;
                    }
                    this.future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message()));
                    return;
                }
            }
            if (!unauthorizedTopics.isEmpty()) {
                CommitRequestManager.this.log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
                this.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                this.future.complete(null);
            }
        }

        /**
         * @return the label of this request, naming the offsets it commits
         */
        @Override
        String requestDescription() {
            return "OffsetCommit request for offsets " + this.offsets;
        }

        /**
         * @return the future that carries the outcome of this commit request
         */
        @Override
        CompletableFuture<?> future() {
            return this.future;
        }

        /**
         * Replaces the result future with a new, not yet completed one.
         */
        void resetFuture() {
            this.future = new CompletableFuture<>();
        }

        /**
         * Takes this request out of the manager's collection of unsent OffsetCommit requests
         * and logs a warning when it was not present there.
         */
        @Override
        void removeRequest() {
            if (!CommitRequestManager.this.unsentOffsetCommitRequests().remove(this)) {
                CommitRequestManager.this.log.warn("OffsetCommit request to remove not found in the outbound buffer: {}", this);
            }
        }
    }
}

