/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.util.LinkedList;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.GroupRebalanceConfig;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.internals.CommitRequestManager;
import org.apache.kafka.clients.consumer.internals.ConsumerMetadata;
import org.apache.kafka.clients.consumer.internals.ConsumerUtils;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.ErrorEventHandler;
import org.apache.kafka.clients.consumer.internals.GroupState;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.clients.consumer.internals.TopicMetadataRequestManager;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.BackgroundEvent;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;

public class DefaultBackgroundThread
extends KafkaThread {
    private static final long MAX_POLL_TIMEOUT_MS = 5000L;
    private static final String BACKGROUND_THREAD_NAME = "consumer_background_thread";
    private final Time time;
    private final Logger log;
    private final BlockingQueue<ApplicationEvent> applicationEventQueue;
    private final BlockingQueue<BackgroundEvent> backgroundEventQueue;
    private final ConsumerMetadata metadata;
    private final ConsumerConfig config;
    private final ApplicationEventProcessor applicationEventProcessor;
    private final NetworkClientDelegate networkClientDelegate;
    private final ErrorEventHandler errorEventHandler;
    private final GroupState groupState;
    private boolean running;
    private final RequestManagers requestManagers;

    DefaultBackgroundThread(Time time, ConsumerConfig config, LogContext logContext, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, ErrorEventHandler errorEventHandler, ApplicationEventProcessor processor, ConsumerMetadata metadata, NetworkClientDelegate networkClient, GroupState groupState, CoordinatorRequestManager coordinatorManager, CommitRequestManager commitRequestManager, OffsetsRequestManager offsetsRequestManager, TopicMetadataRequestManager topicMetadataRequestManager) {
        super(BACKGROUND_THREAD_NAME, true);
        this.time = time;
        this.running = true;
        this.log = logContext.logger(this.getClass());
        this.applicationEventQueue = applicationEventQueue;
        this.backgroundEventQueue = backgroundEventQueue;
        this.applicationEventProcessor = processor;
        this.config = config;
        this.metadata = metadata;
        this.networkClientDelegate = networkClient;
        this.errorEventHandler = errorEventHandler;
        this.groupState = groupState;
        this.requestManagers = new RequestManagers(offsetsRequestManager, topicMetadataRequestManager, Optional.ofNullable(coordinatorManager), Optional.ofNullable(commitRequestManager));
    }

    public DefaultBackgroundThread(Time time, ConsumerConfig config, GroupRebalanceConfig rebalanceConfig, LogContext logContext, BlockingQueue<ApplicationEvent> applicationEventQueue, BlockingQueue<BackgroundEvent> backgroundEventQueue, ConsumerMetadata metadata, SubscriptionState subscriptionState, ApiVersions apiVersions, Metrics metrics, Sensor fetcherThrottleTimeSensor) {
        super(BACKGROUND_THREAD_NAME, true);
        Objects.requireNonNull(config);
        Objects.requireNonNull(rebalanceConfig);
        Objects.requireNonNull(logContext);
        Objects.requireNonNull(applicationEventQueue);
        Objects.requireNonNull(backgroundEventQueue);
        Objects.requireNonNull(metadata);
        Objects.requireNonNull(subscriptionState);
        try {
            this.time = time;
            this.log = logContext.logger(this.getClass());
            this.applicationEventQueue = applicationEventQueue;
            this.backgroundEventQueue = backgroundEventQueue;
            this.config = config;
            this.metadata = metadata;
            NetworkClient networkClient = ClientUtils.createNetworkClient(config, metrics, "consumer", logContext, apiVersions, time, 100, metadata, fetcherThrottleTimeSensor);
            this.networkClientDelegate = new NetworkClientDelegate(this.time, this.config, logContext, networkClient);
            this.running = true;
            this.errorEventHandler = new ErrorEventHandler(this.backgroundEventQueue);
            this.groupState = new GroupState(rebalanceConfig);
            long retryBackoffMs = config.getLong("retry.backoff.ms");
            int requestTimeoutMs = config.getInt("request.timeout.ms");
            OffsetsRequestManager offsetsRequestManager = new OffsetsRequestManager(subscriptionState, metadata, ConsumerUtils.configuredIsolationLevel(config), time, retryBackoffMs, requestTimeoutMs, apiVersions, this.networkClientDelegate, logContext);
            CoordinatorRequestManager coordinatorRequestManager = null;
            CommitRequestManager commitRequestManager = null;
            TopicMetadataRequestManager topicMetadataRequestManger = new TopicMetadataRequestManager(logContext, config);
            if (this.groupState.groupId != null) {
                coordinatorRequestManager = new CoordinatorRequestManager(this.time, logContext, retryBackoffMs, this.errorEventHandler, this.groupState.groupId);
                commitRequestManager = new CommitRequestManager(this.time, logContext, subscriptionState, config, coordinatorRequestManager, this.groupState);
            }
            this.requestManagers = new RequestManagers(offsetsRequestManager, topicMetadataRequestManger, Optional.ofNullable(coordinatorRequestManager), Optional.ofNullable(commitRequestManager));
            this.applicationEventProcessor = new ApplicationEventProcessor(backgroundEventQueue, this.requestManagers, metadata);
        }
        catch (Exception e) {
            this.close();
            throw new KafkaException("Failed to construct background processor", e.getCause());
        }
    }

    @Override
    public void run() {
        try {
            this.log.debug("Background thread started");
            while (this.running) {
                try {
                    this.runOnce();
                }
                catch (WakeupException e) {
                    this.log.debug("WakeupException caught, background thread won't be interrupted");
                }
            }
        }
        catch (Throwable t) {
            this.log.error("The background thread failed due to unexpected error", t);
            throw new KafkaException(t);
        }
        finally {
            this.close();
            this.log.debug("{} closed", this.getClass());
        }
    }

    void runOnce() {
        if (!this.applicationEventQueue.isEmpty()) {
            LinkedList res = new LinkedList();
            this.applicationEventQueue.drainTo(res);
            for (ApplicationEvent event : res) {
                this.log.debug("Consuming application event: {}", (Object)event);
                Objects.requireNonNull(event);
                this.applicationEventProcessor.process(event);
            }
        }
        long currentTimeMs = this.time.milliseconds();
        long pollWaitTimeMs = this.requestManagers.entries().stream().filter(Optional::isPresent).map(m -> ((RequestManager)m.get()).poll(currentTimeMs)).map(this::handlePollResult).reduce(5000L, Math::min);
        this.networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
    }

    long handlePollResult(NetworkClientDelegate.PollResult res) {
        if (!res.unsentRequests.isEmpty()) {
            this.networkClientDelegate.addAll(res.unsentRequests);
        }
        return res.timeUntilNextPollMs;
    }

    public boolean isRunning() {
        return this.running;
    }

    public final void wakeup() {
        this.networkClientDelegate.wakeup();
    }

    public final void close() {
        this.running = false;
        this.wakeup();
        Utils.closeQuietly(this.networkClientDelegate, "network client utils");
        Utils.closeQuietly(this.metadata, "consumer metadata client");
    }
}

