/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.server.util;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.internals.FatalExitError;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.RequestAndCompletionHandler;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

public class InterBrokerSendThreadTest {
    private final Time time = new MockTime();
    private final KafkaClient networkClient = (KafkaClient)Mockito.mock(KafkaClient.class);
    private final StubCompletionHandler completionHandler = new StubCompletionHandler();
    private final int requestTimeoutMs = 1000;

    @Test
    public void testShutdownThreadShouldNotCauseException() throws InterruptedException, IOException {
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenThrow(new Throwable[]{new DisconnectException()});
        Mockito.when((Object)this.networkClient.active()).thenReturn((Object)false);
        AtomicReference exception = new AtomicReference();
        TestInterBrokerSendThread thread = new TestInterBrokerSendThread(this.networkClient, exception::getAndSet);
        thread.shutdown();
        thread.pollOnce(100L);
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).initiateClose();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).close();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).active();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertNull(exception.get());
    }

    @Test
    public void testDisconnectWithoutShutdownShouldCauseException() {
        DisconnectException de = new DisconnectException();
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenThrow(new Throwable[]{de});
        Mockito.when((Object)this.networkClient.active()).thenReturn((Object)true);
        AtomicReference throwable = new AtomicReference();
        TestInterBrokerSendThread thread = new TestInterBrokerSendThread(this.networkClient, throwable::getAndSet);
        thread.pollOnce(100L);
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).active();
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Throwable thrown = (Throwable)throwable.get();
        Assertions.assertNotNull((Object)thrown);
        Assertions.assertInstanceOf(FatalExitError.class, (Object)thrown);
    }

    @Test
    public void testShouldNotSendAnythingWhenNoRequests() {
        TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(List.of());
        sendThread.doWork();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertFalse((boolean)this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testShouldCreateClientRequestAndSendWhenNodeIsReady() {
        StubRequestBuilder request = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler handler = new RequestAndCompletionHandler(this.time.milliseconds(), node, request, (RequestCompletionHandler)this.completionHandler);
        TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
        ClientRequest clientRequest = new ClientRequest("dest", request, 0, "1", 0L, true, 1000, handler.handler);
        Mockito.when((Object)this.networkClient.newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler))).thenReturn((Object)clientRequest);
        Mockito.when((Object)this.networkClient.ready(node, this.time.milliseconds())).thenReturn((Object)true);
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(List.of());
        sendThread.enqueue(handler);
        sendThread.doWork();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler));
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).ready((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).send((ClientRequest)ArgumentMatchers.same((Object)clientRequest), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertFalse((boolean)this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testShouldCallCompletionHandlerWithDisconnectedResponseWhenNodeNotReady() {
        StubRequestBuilder request = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler handler = new RequestAndCompletionHandler(this.time.milliseconds(), node, request, (RequestCompletionHandler)this.completionHandler);
        TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
        ClientRequest clientRequest = new ClientRequest("dest", request, 0, "1", 0L, true, 1000, handler.handler);
        Mockito.when((Object)this.networkClient.newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler))).thenReturn((Object)clientRequest);
        Mockito.when((Object)this.networkClient.ready(node, this.time.milliseconds())).thenReturn((Object)false);
        Mockito.when((Object)this.networkClient.connectionDelay((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn((Object)0L);
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(List.of());
        Mockito.when((Object)this.networkClient.connectionFailed(node)).thenReturn((Object)true);
        Mockito.when((Object)this.networkClient.authenticationException(node)).thenReturn((Object)new AuthenticationException(""));
        sendThread.enqueue(handler);
        sendThread.doWork();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.anyLong(), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler));
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).ready((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).connectionDelay((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).connectionFailed((Node)ArgumentMatchers.any());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).authenticationException((Node)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertTrue((boolean)this.completionHandler.executedWithDisconnectedResponse);
    }

    @Test
    public void testFailingExpiredRequests() {
        StubRequestBuilder request = new StubRequestBuilder();
        Node node = new Node(1, "", 8080);
        RequestAndCompletionHandler handler = new RequestAndCompletionHandler(this.time.milliseconds(), node, request, (RequestCompletionHandler)this.completionHandler);
        TestInterBrokerSendThread sendThread = new TestInterBrokerSendThread();
        ClientRequest clientRequest = new ClientRequest("dest", request, 0, "1", this.time.milliseconds(), true, 1000, handler.handler);
        this.time.sleep(1500L);
        Mockito.when((Object)this.networkClient.newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.eq((long)handler.creationTimeMs), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler))).thenReturn((Object)clientRequest);
        Mockito.when((Object)this.networkClient.ready(node, this.time.milliseconds())).thenReturn((Object)false);
        Mockito.when((Object)this.networkClient.connectionDelay((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong())).thenReturn((Object)0L);
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenReturn(List.of());
        Mockito.when((Object)this.networkClient.connectionFailed(node)).thenReturn((Object)false);
        sendThread.enqueue(handler);
        sendThread.doWork();
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).newClientRequest((String)ArgumentMatchers.eq((Object)"1"), (AbstractRequest.Builder)ArgumentMatchers.same((Object)handler.request), ArgumentMatchers.eq((long)handler.creationTimeMs), ArgumentMatchers.eq((boolean)true), ArgumentMatchers.eq((int)1000), (RequestCompletionHandler)ArgumentMatchers.same((Object)handler.handler));
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).ready((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).connectionDelay((Node)ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).connectionFailed((Node)ArgumentMatchers.any());
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertFalse((boolean)sendThread.hasUnsentRequests());
        Assertions.assertTrue((boolean)this.completionHandler.executedWithDisconnectedResponse);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testInterruption(boolean isShuttingDown) throws InterruptedException, IOException {
        InterruptedException interrupted = new InterruptedException();
        Mockito.when((Object)this.networkClient.poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong())).thenAnswer(t -> {
            throw interrupted;
        });
        AtomicReference exception = new AtomicReference();
        TestInterBrokerSendThread thread = new TestInterBrokerSendThread(this.networkClient, t -> {
            if (isShuttingDown) {
                Assertions.assertInstanceOf(InterruptedException.class, (Object)t);
            } else {
                Assertions.assertInstanceOf(FatalExitError.class, (Object)t);
            }
            exception.getAndSet(t);
        });
        if (isShuttingDown) {
            thread.shutdown();
        }
        thread.pollOnce(100L);
        ((KafkaClient)Mockito.verify((Object)this.networkClient)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
        if (isShuttingDown) {
            ((KafkaClient)Mockito.verify((Object)this.networkClient)).initiateClose();
            ((KafkaClient)Mockito.verify((Object)this.networkClient)).close();
        }
        Mockito.verifyNoMoreInteractions((Object[])new Object[]{this.networkClient});
        Assertions.assertNotNull(exception.get());
    }

    private static class StubCompletionHandler
    implements RequestCompletionHandler {
        public boolean executedWithDisconnectedResponse = false;
        ClientResponse response = null;

        private StubCompletionHandler() {
        }

        public void onComplete(ClientResponse response) {
            this.executedWithDisconnectedResponse = response.wasDisconnected();
            this.response = response;
        }
    }

    class TestInterBrokerSendThread
    extends InterBrokerSendThread {
        private final Consumer<Throwable> exceptionCallback;
        private final Queue<RequestAndCompletionHandler> queue;

        TestInterBrokerSendThread() {
            this(this$0.networkClient, t -> {
                throw t instanceof RuntimeException ? (RuntimeException)t : new RuntimeException((Throwable)t);
            });
        }

        TestInterBrokerSendThread(KafkaClient networkClient, Consumer<Throwable> exceptionCallback) {
            super("name", networkClient, 1000, InterBrokerSendThreadTest.this.time);
            this.queue = new ArrayDeque<RequestAndCompletionHandler>();
            this.exceptionCallback = exceptionCallback;
        }

        void enqueue(RequestAndCompletionHandler request) {
            this.queue.offer(request);
        }

        public Collection<RequestAndCompletionHandler> generateRequests() {
            return this.queue.isEmpty() ? List.of() : List.of(this.queue.poll());
        }

        protected void pollOnce(long maxTimeoutMs) {
            try {
                super.pollOnce(maxTimeoutMs);
            }
            catch (Throwable t) {
                this.exceptionCallback.accept(t);
            }
        }
    }

    private static class StubRequestBuilder<T extends AbstractRequest>
    extends AbstractRequest.Builder<T> {
        private StubRequestBuilder() {
            super(ApiKeys.END_TXN);
        }

        public T build(short version) {
            return null;
        }
    }
}

