/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.MembershipManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestState;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import org.apache.kafka.clients.consumer.internals.events.ErrorBackgroundEvent;
import org.apache.kafka.clients.consumer.internals.events.GroupMetadataUpdateEvent;
import org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatRequest;
import org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

public class HeartbeatRequestManager
implements RequestManager {
    private final Logger logger;
    private final int maxPollIntervalMs;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final HeartbeatRequestState heartbeatRequestState;
    private final HeartbeatState heartbeatState;
    private final MembershipManager membershipManager;
    private final BackgroundEventHandler backgroundEventHandler;
    private final Timer pollTimer;
    private GroupMetadataUpdateEvent previousGroupMetadataUpdateEvent = null;
    private final HeartbeatMetricsManager metricsManager;

    public HeartbeatRequestManager(LogContext logContext, Time time, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, SubscriptionState subscriptions, MembershipManager membershipManager, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.logger = logContext.logger(this.getClass());
        this.membershipManager = membershipManager;
        this.backgroundEventHandler = backgroundEventHandler;
        this.maxPollIntervalMs = config.getInt("max.poll.interval.ms");
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        long retryBackoffMaxMs = config.getLong("retry.backoff.max.ms");
        this.heartbeatState = new HeartbeatState(subscriptions, membershipManager, this.maxPollIntervalMs);
        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0L, retryBackoffMs, retryBackoffMaxMs, (double)this.maxPollIntervalMs);
        this.pollTimer = time.timer(this.maxPollIntervalMs);
        this.metricsManager = new HeartbeatMetricsManager(metrics);
    }

    HeartbeatRequestManager(LogContext logContext, Timer timer, ConsumerConfig config, CoordinatorRequestManager coordinatorRequestManager, MembershipManager membershipManager, HeartbeatState heartbeatState, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        this.logger = logContext.logger(this.getClass());
        this.maxPollIntervalMs = config.getInt("max.poll.interval.ms");
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.heartbeatRequestState = heartbeatRequestState;
        this.heartbeatState = heartbeatState;
        this.membershipManager = membershipManager;
        this.backgroundEventHandler = backgroundEventHandler;
        this.pollTimer = timer;
        this.metricsManager = new HeartbeatMetricsManager(metrics);
    }

    @Override
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        boolean heartbeatNow;
        if (!this.coordinatorRequestManager.coordinator().isPresent() || this.membershipManager.shouldSkipHeartbeat() || this.pollTimer.isExpired()) {
            this.membershipManager.onHeartbeatRequestSkipped();
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired()) {
            this.logger.warn("consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
            this.membershipManager.transitionToStale();
            NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(currentTimeMs, true);
            this.heartbeatRequestState.reset();
            this.heartbeatState.reset();
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
        }
        boolean bl = heartbeatNow = this.membershipManager.shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight();
        if (!this.heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.nextHeartbeatMs(currentTimeMs));
        }
        NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(currentTimeMs, false);
        return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(request));
    }

    public MembershipManager membershipManager() {
        return this.membershipManager;
    }

    @Override
    public long maximumTimeToWait(long currentTimeMs) {
        boolean heartbeatNow = this.membershipManager.shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight();
        return heartbeatNow ? 0L : this.heartbeatRequestState.nextHeartbeatMs(currentTimeMs);
    }

    public void resetPollTimer(long pollMs) {
        this.pollTimer.update(pollMs);
        this.pollTimer.reset(this.maxPollIntervalMs);
    }

    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(long currentTimeMs, boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = this.makeHeartbeatRequest(ignoreResponse);
        this.heartbeatRequestState.onSendAttempt(currentTimeMs);
        this.membershipManager.onHeartbeatRequestSent();
        this.metricsManager.recordHeartbeatSentMs(currentTimeMs);
        return request;
    }

    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(new ConsumerGroupHeartbeatRequest.Builder(this.heartbeatState.buildRequestData()), this.coordinatorRequestManager.coordinator());
        if (ignoreResponse) {
            return this.logResponse(request);
        }
        return request.whenComplete((response, exception) -> {
            long completionTimeMs = request.handler().completionTimeMs();
            if (response != null) {
                this.metricsManager.recordRequestLatency(response.requestLatencyMs());
                this.onResponse((ConsumerGroupHeartbeatResponse)response.responseBody(), completionTimeMs);
            } else {
                this.onFailure((Throwable)exception, completionTimeMs);
            }
        });
    }

    private NetworkClientDelegate.UnsentRequest logResponse(NetworkClientDelegate.UnsentRequest request) {
        return request.whenComplete((response, exception) -> {
            if (response != null) {
                this.metricsManager.recordRequestLatency(response.requestLatencyMs());
                Errors error = Errors.forCode(((ConsumerGroupHeartbeatResponse)response.responseBody()).data().errorCode());
                if (error == Errors.NONE) {
                    this.logger.debug("GroupHeartbeat responded successfully: {}", response);
                } else {
                    this.logger.error("GroupHeartbeat failed because of {}: {}", (Object)error, response);
                }
            } else {
                this.logger.error("GroupHeartbeat failed because of unexpected exception.", exception);
            }
        });
    }

    private void onFailure(Throwable exception, long responseTimeMs) {
        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
        this.heartbeatState.reset();
        if (exception instanceof RetriableException) {
            String message = String.format("GroupHeartbeatRequest failed because of the retriable exception. Will retry in %s ms: %s", this.heartbeatRequestState.remainingBackoffMs(responseTimeMs), exception.getMessage());
            this.logger.debug(message);
        } else {
            this.logger.error("GroupHeartbeatRequest failed due to fatal error: " + exception.getMessage());
            this.handleFatalFailure(exception);
        }
    }

    private void onResponse(ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        if (Errors.forCode(response.data().errorCode()) == Errors.NONE) {
            this.heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
            this.heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
            this.heartbeatRequestState.resetTimer();
            this.membershipManager.onHeartbeatResponseReceived(response.data());
            this.maybeSendGroupMetadataUpdateEvent();
            return;
        }
        this.onErrorResponse(response, currentTimeMs);
    }

    private void maybeSendGroupMetadataUpdateEvent() {
        if (this.previousGroupMetadataUpdateEvent == null || !this.previousGroupMetadataUpdateEvent.memberId().equals(this.membershipManager.memberId()) || this.previousGroupMetadataUpdateEvent.memberEpoch() != this.membershipManager.memberEpoch()) {
            GroupMetadataUpdateEvent currentGroupMetadataUpdateEvent = new GroupMetadataUpdateEvent(this.membershipManager.memberEpoch(), this.previousGroupMetadataUpdateEvent != null && this.membershipManager.memberId() == null ? this.previousGroupMetadataUpdateEvent.memberId() : this.membershipManager.memberId());
            this.backgroundEventHandler.add(currentGroupMetadataUpdateEvent);
            this.previousGroupMetadataUpdateEvent = currentGroupMetadataUpdateEvent;
        }
    }

    private void onErrorResponse(ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        Errors error = Errors.forCode(response.data().errorCode());
        String errorMessage = response.data().errorMessage();
        this.heartbeatState.reset();
        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
        switch (error) {
            case NOT_COORDINATOR: {
                String message = String.format("GroupHeartbeatRequest failed because the group coordinator %s is incorrect. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            }
            case COORDINATOR_NOT_AVAILABLE: {
                String message = String.format("GroupHeartbeatRequest failed because the group coordinator %s is not available. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            }
            case COORDINATOR_LOAD_IN_PROGRESS: {
                String message = String.format("GroupHeartbeatRequest failed because the group coordinator %s is still loading.Will retry", this.coordinatorRequestManager.coordinator());
                this.logInfo(message, response, currentTimeMs);
                break;
            }
            case GROUP_AUTHORIZATION_FAILED: {
                GroupAuthorizationException exception = GroupAuthorizationException.forGroupId(this.membershipManager.groupId());
                this.logger.error("GroupHeartbeatRequest failed due to group authorization failure: {}", (Object)exception.getMessage());
                this.handleFatalFailure(error.exception(exception.getMessage()));
                break;
            }
            case UNRELEASED_INSTANCE_ID: {
                this.logger.error("GroupHeartbeatRequest failed due to the instance id {} was not released: {}", (Object)this.membershipManager.groupInstanceId().orElse("null"), (Object)errorMessage);
                this.handleFatalFailure(Errors.UNRELEASED_INSTANCE_ID.exception(errorMessage));
                break;
            }
            case INVALID_REQUEST: 
            case GROUP_MAX_SIZE_REACHED: 
            case UNSUPPORTED_ASSIGNOR: 
            case UNSUPPORTED_VERSION: {
                this.logger.error("GroupHeartbeatRequest failed due to error: {}", (Object)error);
                this.handleFatalFailure(error.exception(errorMessage));
                break;
            }
            case FENCED_MEMBER_EPOCH: {
                String message = String.format("GroupHeartbeatRequest failed for member %s because epoch %s is fenced.", this.membershipManager.memberId(), this.membershipManager.memberEpoch());
                this.logInfo(message, response, currentTimeMs);
                this.membershipManager.transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            }
            case UNKNOWN_MEMBER_ID: {
                String message = String.format("GroupHeartbeatRequest failed because member %s is unknown.", this.membershipManager.memberId());
                this.logInfo(message, response, currentTimeMs);
                this.membershipManager.transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            }
            default: {
                this.logger.error("GroupHeartbeatRequest failed due to unexpected error: {}", (Object)error);
                this.handleFatalFailure(error.exception(errorMessage));
            }
        }
    }

    private void logInfo(String message, ConsumerGroupHeartbeatResponse response, long currentTimeMs) {
        this.logger.info("{} in {}ms: {}", new Object[]{message, this.heartbeatRequestState.remainingBackoffMs(currentTimeMs), response.data().errorMessage()});
    }

    private void handleFatalFailure(Throwable error) {
        this.backgroundEventHandler.add(new ErrorBackgroundEvent(error));
        this.membershipManager.transitionToFatal();
    }

    static class HeartbeatState {
        private final SubscriptionState subscriptions;
        private final MembershipManager membershipManager;
        private final int rebalanceTimeoutMs;
        private final SentFields sentFields;

        public HeartbeatState(SubscriptionState subscriptions, MembershipManager membershipManager, int rebalanceTimeoutMs) {
            this.subscriptions = subscriptions;
            this.membershipManager = membershipManager;
            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
            this.sentFields = new SentFields();
        }

        public void reset() {
            this.sentFields.reset();
        }

        public ConsumerGroupHeartbeatRequestData buildRequestData() {
            TreeSet<String> subscribedTopicNames;
            ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
            data.setGroupId(this.membershipManager.groupId());
            data.setMemberId(this.membershipManager.memberId());
            data.setMemberEpoch(this.membershipManager.memberEpoch());
            this.membershipManager.groupInstanceId().ifPresent(groupInstanceId -> {
                if (!groupInstanceId.equals(this.sentFields.instanceId)) {
                    data.setInstanceId((String)groupInstanceId);
                    this.sentFields.instanceId = groupInstanceId;
                }
            });
            if (this.sentFields.rebalanceTimeoutMs != this.rebalanceTimeoutMs) {
                data.setRebalanceTimeoutMs(this.rebalanceTimeoutMs);
                this.sentFields.rebalanceTimeoutMs = this.rebalanceTimeoutMs;
            }
            if (!this.subscriptions.hasPatternSubscription() && !(subscribedTopicNames = new TreeSet<String>(this.subscriptions.subscription())).equals(this.sentFields.subscribedTopicNames)) {
                data.setSubscribedTopicNames(new ArrayList<String>(this.subscriptions.subscription()));
                this.sentFields.subscribedTopicNames = subscribedTopicNames;
            }
            this.membershipManager.serverAssignor().ifPresent(serverAssignor -> {
                if (!serverAssignor.equals(this.sentFields.serverAssignor)) {
                    data.setServerAssignor((String)serverAssignor);
                    this.sentFields.serverAssignor = serverAssignor;
                }
            });
            TreeSet assignedPartitions = this.membershipManager.currentAssignment().entrySet().stream().map(entry -> entry.getKey() + "-" + entry.getValue()).collect(Collectors.toCollection(TreeSet::new));
            if (!assignedPartitions.equals(this.sentFields.topicPartitions)) {
                List<ConsumerGroupHeartbeatRequestData.TopicPartitions> topicPartitions = this.buildTopicPartitionsList(this.membershipManager.currentAssignment());
                data.setTopicPartitions(topicPartitions);
                this.sentFields.topicPartitions = assignedPartitions;
            }
            return data;
        }

        private List<ConsumerGroupHeartbeatRequestData.TopicPartitions> buildTopicPartitionsList(Map<Uuid, SortedSet<Integer>> topicIdPartitions) {
            return topicIdPartitions.entrySet().stream().map(entry -> new ConsumerGroupHeartbeatRequestData.TopicPartitions().setTopicId((Uuid)entry.getKey()).setPartitions(new ArrayList<Integer>((Collection)entry.getValue()))).collect(Collectors.toList());
        }

        static class SentFields {
            private String instanceId = null;
            private int rebalanceTimeoutMs = -1;
            private TreeSet<String> subscribedTopicNames = null;
            private String serverAssignor = null;
            private TreeSet<String> topicPartitions = null;

            SentFields() {
            }

            void reset() {
                this.instanceId = null;
                this.rebalanceTimeoutMs = -1;
                this.subscribedTopicNames = null;
                this.serverAssignor = null;
                this.topicPartitions = null;
            }
        }
    }

    static class HeartbeatRequestState
    extends RequestState {
        private final Timer heartbeatTimer;
        private long heartbeatIntervalMs;

        public HeartbeatRequestState(LogContext logContext, Time time, long heartbeatIntervalMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter) {
            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
        }

        private void update(long currentTimeMs) {
            this.heartbeatTimer.update(currentTimeMs);
        }

        public void resetTimer() {
            this.heartbeatTimer.reset(this.heartbeatIntervalMs);
        }

        @Override
        public boolean canSendRequest(long currentTimeMs) {
            this.update(currentTimeMs);
            return this.heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
        }

        public long nextHeartbeatMs(long currentTimeMs) {
            if (this.heartbeatTimer.remainingMs() == 0L) {
                return this.remainingBackoffMs(currentTimeMs);
            }
            return this.heartbeatTimer.remainingMs();
        }

        private void updateHeartbeatIntervalMs(long heartbeatIntervalMs) {
            if (this.heartbeatIntervalMs == heartbeatIntervalMs) {
                return;
            }
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.heartbeatTimer.updateAndReset(heartbeatIntervalMs);
        }
    }
}

