/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ManualMetadataUpdater;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MetadataUpdater;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.CommonFields;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ResponseHeader;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class NetworkClientTest {
    // Named timeout constant; the client factories in this class pass this value (1000)
    // to the NetworkClient constructor.
    protected final int minRequestTimeoutMs = 1000;
    // Mock clock shared by the selector and every client; tests move it forward with sleep().
    protected final MockTime time = new MockTime();
    // Mock selector built on the shared clock; every client under test is constructed on top of it.
    protected final MockSelector selector = new MockSelector(this.time);
    // Metadata instance given to the metadata-backed clients; setup() loads the cluster fixture into it.
    // NOTE(review): the arguments look like (refresh backoff, expiry, allow auto topic creation) -- confirm.
    protected final Metadata metadata = new Metadata(0L, Long.MAX_VALUE, true);
    // Not referenced by name anywhere else inside this class.
    protected final int nodeId = 1;
    // Cluster fixture produced by TestUtils.singletonCluster("test", 1).
    protected final Cluster cluster = TestUtils.singletonCluster("test", 1);
    // First node of the cluster fixture; this is the node every test connects to.
    protected final Node node = (Node)this.cluster.nodes().get(0);
    // 10000: the base reconnect backoff value used by the metadata-backed client factories and the
    // delay asserted by testConnectionDelayDisconnectedWithNoExponentialBackoff.
    protected final long reconnectBackoffMsTest = 10000L;
    // 100000: the value handed to createNetworkClient for the default client below.
    protected final long reconnectBackoffMaxMsTest = 100000L;
    // Default client under test (createNetworkClient argument = 100000).
    private final NetworkClient client = this.createNetworkClient(100000L);
    // Same construction as the default client, but createNetworkClient receives 10000, i.e. the same
    // value as the base reconnect backoff.
    private final NetworkClient clientWithNoExponentialBackoff = this.createNetworkClient(10000L);
    // Client built on a ManualMetadataUpdater seeded with the single fixture node.
    private final NetworkClient clientWithStaticNodes = this.createNetworkClientWithStaticNodes();
    // Client constructed with the boolean flag set to false; awaitReady() checks
    // discoverBrokerVersions() to decide whether an ApiVersions response must be queued.
    private final NetworkClient clientWithNoVersionDiscovery = this.createNetworkClientWithNoVersionDiscovery();

    /**
     * Creates a client backed by the shared {@code metadata}, mock selector and mock clock, with
     * the version-discovery flag set to {@code true}.
     *
     * @param reconnectBackoffMaxMs value passed to the constructor right after the base reconnect
     *                              backoff (10000 ms)
     * @return the newly constructed client, using the client id {@code "mock"}
     */
    private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
        // Both buffer-size positions of the constructor receive the same value.
        final int bufferBytes = 65536;
        final boolean discoverBrokerVersions = true;
        return new NetworkClient(
                this.selector,
                this.metadata,
                "mock",
                Integer.MAX_VALUE,
                this.reconnectBackoffMsTest,
                reconnectBackoffMaxMs,
                bufferBytes,
                bufferBytes,
                this.minRequestTimeoutMs,
                this.time,
                discoverBrokerVersions,
                new ApiVersions(),
                new LogContext());
    }

    /**
     * Creates a client whose metadata comes from a {@link ManualMetadataUpdater} seeded with the
     * single fixture node instead of the shared {@code metadata} object. Both reconnect backoff
     * arguments are zero.
     *
     * @return the newly constructed client, using the client id {@code "mock-static"}
     */
    private NetworkClient createNetworkClientWithStaticNodes() {
        final int bufferBytes = 65536;
        final long noBackoff = 0L;
        MetadataUpdater staticNodes = new ManualMetadataUpdater(Arrays.asList(this.node));
        return new NetworkClient(
                this.selector,
                staticNodes,
                "mock-static",
                Integer.MAX_VALUE,
                noBackoff,
                noBackoff,
                bufferBytes,
                bufferBytes,
                this.minRequestTimeoutMs,
                this.time,
                true,
                new ApiVersions(),
                new LogContext());
    }

    /**
     * Creates a client identical to the default one except that the version-discovery flag passed
     * to the constructor is {@code false}.
     *
     * @return the newly constructed client, using the client id {@code "mock"}
     */
    private NetworkClient createNetworkClientWithNoVersionDiscovery() {
        final int bufferBytes = 65536;
        final boolean discoverBrokerVersions = false;
        return new NetworkClient(
                this.selector,
                this.metadata,
                "mock",
                Integer.MAX_VALUE,
                this.reconnectBackoffMsTest,
                this.reconnectBackoffMaxMsTest,
                bufferBytes,
                bufferBytes,
                this.minRequestTimeoutMs,
                this.time,
                discoverBrokerVersions,
                new ApiVersions(),
                new LogContext());
    }

    /**
     * Per-test fixture: resets the mock selector and loads the single-node cluster into the
     * shared metadata, stamped with the current mock time.
     */
    @Before
    public void setup() {
        this.selector.reset();
        final long nowMs = this.time.milliseconds();
        this.metadata.update(this.cluster, Collections.emptySet(), nowMs);
    }

    /**
     * Sending to node id {@code "5"} without ever calling {@code ready()} for it is expected to
     * raise {@link IllegalStateException}.
     */
    @Test(expected=IllegalStateException.class)
    public void testSendToUnreadyNode() {
        final long sendTimeMs = this.time.milliseconds();
        MetadataRequest.Builder metadataBuilder = new MetadataRequest.Builder(Arrays.asList("test"), true);
        ClientRequest toUnreadyNode = this.client.newClientRequest("5", metadataBuilder, sendTimeMs, false);
        this.client.send(toUnreadyNode, sendTimeMs);
        this.client.poll(1L, this.time.milliseconds());
    }

    /** Runs the request/response round trip against the default client. */
    @Test
    public void testSimpleRequestResponse() {
        final NetworkClient underTest = this.client;
        this.checkSimpleRequestResponse(underTest);
    }

    /** Runs the request/response round trip against the client built on a static node list. */
    @Test
    public void testSimpleRequestResponseWithStaticNodes() {
        final NetworkClient underTest = this.clientWithStaticNodes;
        this.checkSimpleRequestResponse(underTest);
    }

    /** Runs the request/response round trip against the client created without version discovery. */
    @Test
    public void testSimpleRequestResponseWithNoBrokerDiscovery() {
        final NetworkClient underTest = this.clientWithNoVersionDiscovery;
        this.checkSimpleRequestResponse(underTest);
    }

    /**
     * Closing the connection to a node must discard its in-flight requests and leave the node
     * not ready.
     */
    @Test
    public void testClose() {
        final String destination = this.node.idString();

        // Bring the connection up and confirm it is usable.
        this.client.ready(this.node, this.time.milliseconds());
        this.awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assert.assertTrue("The client should be ready", this.client.isReady(this.node, this.time.milliseconds()));

        // Put exactly one produce request in flight.
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = this.client.newClientRequest(destination, produceBuilder, this.time.milliseconds(), true);
        this.client.send(produce, this.time.milliseconds());
        Assert.assertEquals("There should be 1 in-flight request after send", 1L, this.client.inFlightRequestCount(destination));
        Assert.assertTrue(this.client.hasInFlightRequests(destination));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Close the connection: nothing may remain in flight and the node must not be ready.
        this.client.close(destination);
        Assert.assertEquals("There should be no in-flight request after close", 0L, this.client.inFlightRequestCount(destination));
        Assert.assertFalse(this.client.hasInFlightRequests(destination));
        Assert.assertFalse(this.client.hasInFlightRequests());
        Assert.assertFalse("Connection should not be ready after close", this.client.isReady(this.node, 0L));
    }

    /**
     * Sends one produce request through {@code networkClient}, feeds a matching produce response
     * into the mock selector and verifies that the completion handler receives it.
     *
     * @param networkClient the client variant to exercise against the fixture node
     */
    private void checkSimpleRequestResponse(NetworkClient networkClient) {
        this.awaitReady(networkClient, this.node);
        final String destination = this.node.idString();

        // Send the request and make sure it is counted as in flight.
        TestCallbackHandler callback = new TestCallbackHandler();
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = networkClient.newClientRequest(destination, produceBuilder, this.time.milliseconds(), true, 1000, callback);
        networkClient.send(produce, this.time.milliseconds());
        networkClient.poll(1L, this.time.milliseconds());
        Assert.assertEquals(1L, networkClient.inFlightRequestCount());

        // Serialize a response header carrying the request's correlation id, followed by a
        // produce response body whose "responses" array is empty.
        Struct headerStruct = new ResponseHeader(produce.correlationId()).toStruct();
        Struct bodyStruct = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
        bodyStruct.set("responses", new Object[0]);
        ByteBuffer payload = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
        headerStruct.writeTo(payload);
        bodyStruct.writeTo(payload);
        payload.flip();
        this.selector.completeReceive(new NetworkReceive(destination, payload));

        // The next poll must surface exactly one response and invoke the handler with it.
        List<ClientResponse> completed = networkClient.poll(1L, this.time.milliseconds());
        Assert.assertEquals(1L, completed.size());
        Assert.assertTrue("The handler should have executed.", callback.executed);
        Assert.assertTrue("Should have a response body.", callback.response.hasResponse());
        Assert.assertEquals("Should be correlated to the original request", produce.correlationId(), callback.response.requestHeader().correlationId());
    }

    /**
     * Queues {@code response} on the mock selector as a delayed receive from the fixture node.
     * The response is serialized at the maximum API_VERSIONS version it advertises itself, with
     * a response header whose correlation id is 0.
     *
     * @param response the ApiVersions response the client should receive
     */
    private void setExpectedApiVersionsResponse(ApiVersionsResponse response) {
        final String source = this.node.idString();
        short serializationVersion = response.apiVersion(ApiKeys.API_VERSIONS.id).maxVersion;
        ByteBuffer serialized = response.serialize(serializationVersion, new ResponseHeader(0));
        NetworkReceive receive = new NetworkReceive(source, serialized);
        this.selector.delayedReceive(new DelayedReceive(source, receive));
    }

    /**
     * Polls {@code client} until it reports {@code node} as ready, then clears the mock selector.
     * When the client discovers broker versions, a default ApiVersions response is queued first.
     *
     * @param client the client to drive
     * @param node   the node the client should become ready for
     */
    private void awaitReady(NetworkClient client, Node node) {
        if (client.discoverBrokerVersions()) {
            ApiVersionsResponse defaults = ApiVersionsResponse.defaultApiVersionsResponse();
            this.setExpectedApiVersionsResponse(defaults);
        }
        // Keep polling until ready() succeeds; each iteration re-reads the mock clock.
        while (true) {
            if (client.ready(node, this.time.milliseconds())) {
                break;
            }
            client.poll(1L, this.time.milliseconds());
        }
        this.selector.clear();
    }

    /**
     * A produce request created with an explicit 6000 ms timeout must be failed once that
     * timeout has elapsed.
     */
    @Test
    public void testRequestTimeout() {
        this.awaitReady(this.client, this.node);
        final int requestTimeoutMs = 6000;
        TestCallbackHandler callback = new TestCallbackHandler();
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = this.client.newClientRequest(
                this.node.idString(), produceBuilder, this.time.milliseconds(), true, requestTimeoutMs, callback);
        this.testRequestTimeout(produce);
    }

    /**
     * Same timeout check as {@code testRequestTimeout()}, but the request is created through the
     * overload that takes no explicit timeout or handler.
     */
    @Test
    public void testMinRequestTimeout() {
        this.awaitReady(this.client, this.node);
        final long createdMs = this.time.milliseconds();
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = this.client.newClientRequest(this.node.idString(), produceBuilder, createdMs, true);
        this.testRequestTimeout(produce);
    }

    /**
     * Sends {@code request} on the default client, advances the mock clock just past the
     * request's own timeout and verifies that the next poll yields exactly one response, addressed
     * to the fixture node and flagged as disconnected.
     *
     * @param request the request to send and let time out
     */
    private void testRequestTimeout(ClientRequest request) {
        this.client.send(request, this.time.milliseconds());
        // One millisecond beyond the timeout so the request is strictly overdue.
        this.time.sleep(request.requestTimeoutMs() + 1);
        // Typed list instead of a raw List, so no downcast is needed below.
        List<ClientResponse> responses = this.client.poll(0L, this.time.milliseconds());
        Assert.assertEquals(1L, responses.size());
        ClientResponse clientResponse = responses.get(0);
        Assert.assertEquals(this.node.idString(), clientResponse.destination());
        Assert.assertTrue("Expected response to fail due to disconnection", clientResponse.wasDisconnected());
    }

    /**
     * A produce response carrying a throttle time of 100 ms must make the client report the node
     * as not ready until 100 ms of mock time have passed, with the remaining throttle delay
     * counting down accordingly.
     */
    @Test
    public void testConnectionThrottling() {
        this.awaitReady(this.client, this.node);
        final String destination = this.node.idString();

        // Send a produce request.
        TestCallbackHandler callback = new TestCallbackHandler();
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = this.client.newClientRequest(destination, produceBuilder, this.time.milliseconds(), true, 1000, callback);
        this.client.send(produce, this.time.milliseconds());
        this.client.poll(1L, this.time.milliseconds());

        // Build the matching response with THROTTLE_TIME_MS = 100 and deliver it.
        Struct headerStruct = new ResponseHeader(produce.correlationId()).toStruct();
        Struct bodyStruct = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
        bodyStruct.set("responses", new Object[0]);
        bodyStruct.set(CommonFields.THROTTLE_TIME_MS, 100);
        ByteBuffer payload = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
        headerStruct.writeTo(payload);
        bodyStruct.writeTo(payload);
        payload.flip();
        this.selector.completeReceive(new NetworkReceive(destination, payload));
        this.client.poll(1L, this.time.milliseconds());

        // Immediately after the response: throttled for the full 100 ms.
        Assert.assertFalse(this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertEquals(100L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));

        // Half-way through: still throttled, 50 ms remaining.
        this.time.sleep(50L);
        Assert.assertFalse(this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertEquals(50L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));

        // Throttle time fully elapsed: ready again, no delay left.
        this.time.sleep(50L);
        Assert.assertTrue(this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertEquals(0L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
    }

    /**
     * Builds an ApiVersions response listing every {@link ApiKeys} value. The entry for
     * {@code key} is restricted to the version range {@code [0, apiVersionsMaxProtocolVersion]};
     * every other API is added through the single-argument {@code ApiVersion(ApiKeys)} constructor.
     *
     * @param node                          unused by this method; kept so existing callers compile
     * @param key                           the API whose advertised version range is overridden
     * @param apiVersionsMaxProtocolVersion maximum version advertised for {@code key}
     * @return a response with throttle time 0 and {@link Errors#NONE}
     */
    private ApiVersionsResponse createExpectedApiVersionsResponse(Node node, ApiKeys key, short apiVersionsMaxProtocolVersion) {
        List<ApiVersionsResponse.ApiVersion> versionList = new ArrayList<ApiVersionsResponse.ApiVersion>();
        for (ApiKeys apiKey : ApiKeys.values()) {
            if (apiKey == key) {
                // The explicit (short) cast is required: an int literal is not implicitly
                // narrowed to short when passed as a constructor argument.
                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey.id, (short) 0, apiVersionsMaxProtocolVersion));
            } else {
                versionList.add(new ApiVersionsResponse.ApiVersion(apiKey));
            }
        }
        return new ApiVersionsResponse(0, Errors.NONE, versionList);
    }

    /**
     * When the ApiVersions response caps PRODUCE at version 5, a produce response carrying a
     * throttle time of 100 ms must not throttle the connection: the node stays ready and the
     * reported throttle delay is 0.
     */
    @Test
    public void testThrottlingNotEnabledForConnectionToOlderBroker() {
        final String destination = this.node.idString();

        // Connect using an ApiVersions response that limits PRODUCE to max version 5.
        ApiVersionsResponse cappedVersions = this.createExpectedApiVersionsResponse(this.node, ApiKeys.PRODUCE, (short) 5);
        this.setExpectedApiVersionsResponse(cappedVersions);
        while (true) {
            if (this.client.ready(this.node, this.time.milliseconds())) {
                break;
            }
            this.client.poll(1L, this.time.milliseconds());
        }
        this.selector.clear();

        // Send a produce request.
        TestCallbackHandler callback = new TestCallbackHandler();
        ProduceRequest.Builder produceBuilder = ProduceRequest.Builder.forCurrentMagic((short) 1, 1000, Collections.emptyMap());
        ClientRequest produce = this.client.newClientRequest(destination, produceBuilder, this.time.milliseconds(), true, 1000, callback);
        this.client.send(produce, this.time.milliseconds());
        this.client.poll(1L, this.time.milliseconds());

        // Deliver a response with THROTTLE_TIME_MS = 100.
        Struct headerStruct = new ResponseHeader(produce.correlationId()).toStruct();
        Struct bodyStruct = new Struct(ApiKeys.PRODUCE.responseSchema(ApiKeys.PRODUCE.latestVersion()));
        bodyStruct.set("responses", new Object[0]);
        bodyStruct.set(CommonFields.THROTTLE_TIME_MS, 100);
        ByteBuffer payload = ByteBuffer.allocate(headerStruct.sizeOf() + bodyStruct.sizeOf());
        headerStruct.writeTo(payload);
        bodyStruct.writeTo(payload);
        payload.flip();
        this.selector.completeReceive(new NetworkReceive(destination, payload));
        this.client.poll(1L, this.time.milliseconds());

        // No throttling is applied on this connection.
        Assert.assertTrue(this.client.ready(this.node, this.time.milliseconds()));
        Assert.assertEquals(0L, this.client.throttleDelayMs(this.node, this.time.milliseconds()));
    }

    /**
     * While connected, {@code leastLoadedNode} must return the fixture node; after a server-side
     * disconnect it must return {@code null}.
     */
    @Test
    public void testLeastLoadedNode() {
        this.client.ready(this.node, this.time.milliseconds());
        this.awaitReady(this.client, this.node);
        this.client.poll(1L, this.time.milliseconds());
        Assert.assertTrue("The client should be ready", this.client.isReady(this.node, this.time.milliseconds()));

        Node leastNode = this.client.leastLoadedNode(this.time.milliseconds());
        // Fail with a readable assertion rather than a NullPointerException if nothing is returned.
        Assert.assertNotNull("There should be one leastloadednode", leastNode);
        // Expected value first, actual second, so a failure message reads correctly.
        Assert.assertEquals("There should be one leastloadednode", this.node.id(), leastNode.id());

        // Advance the clock by 10000 ms, then drop the connection from the server side.
        this.time.sleep(10000L);
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1L, this.time.milliseconds());
        Assert.assertFalse("After we forced the disconnection the client is no longer ready.", this.client.ready(this.node, this.time.milliseconds()));

        leastNode = this.client.leastLoadedNode(this.time.milliseconds());
        Assert.assertNull("There should be NO leastloadednode", leastNode);
    }

    /** Before any connection attempt, the no-exponential-backoff client reports a delay of 0. */
    @Test
    public void testConnectionDelayWithNoExponentialBackoff() {
        final long nowMs = this.time.milliseconds();
        Assert.assertEquals(0L, this.clientWithNoExponentialBackoff.connectionDelay(this.node, nowMs));
    }

    /** Once connected, the no-exponential-backoff client reports a delay of {@code Long.MAX_VALUE}. */
    @Test
    public void testConnectionDelayConnectedWithNoExponentialBackoff() {
        final NetworkClient underTest = this.clientWithNoExponentialBackoff;
        this.awaitReady(underTest, this.node);
        final long nowMs = this.time.milliseconds();
        Assert.assertEquals(Long.MAX_VALUE, underTest.connectionDelay(this.node, nowMs));
    }

    /**
     * With the client created by {@code createNetworkClient(10000L)}, every disconnect must
     * produce the same reconnect delay of 10000 ms; the delay must not grow on the second failure.
     *
     * <p>The second half previously drove the default client and re-asserted the delay measured
     * in the first half, so it could never fail. It now reconnects the no-backoff client and
     * measures the delay again.
     */
    @Test
    public void testConnectionDelayDisconnectedWithNoExponentialBackoff() {
        final NetworkClient noBackoffClient = this.clientWithNoExponentialBackoff;
        this.awaitReady(noBackoffClient, this.node);

        // First disconnect: the delay equals the base backoff.
        this.selector.serverDisconnect(this.node.idString());
        noBackoffClient.poll(1000L, this.time.milliseconds());
        long delay = noBackoffClient.connectionDelay(this.node, this.time.milliseconds());
        Assert.assertEquals(this.reconnectBackoffMsTest, delay);

        // Sleep until there is no connection delay left.
        this.time.sleep(delay);
        Assert.assertEquals(0L, noBackoffClient.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting again and disconnect before the connection is established.
        noBackoffClient.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        noBackoffClient.poll(1000L, this.time.milliseconds());

        // Second failure: re-measure; the delay must still be the base backoff.
        long secondDelay = noBackoffClient.connectionDelay(this.node, this.time.milliseconds());
        Assert.assertEquals(this.reconnectBackoffMsTest, secondDelay);
    }

    /** Before any connection attempt, the default client reports a connection delay of 0. */
    @Test
    public void testConnectionDelay() {
        final long nowMs = this.time.milliseconds();
        Assert.assertEquals(0L, this.client.connectionDelay(this.node, nowMs));
    }

    /** Once connected, the default client reports a connection delay of {@code Long.MAX_VALUE}. */
    @Test
    public void testConnectionDelayConnected() {
        this.awaitReady(this.client, this.node);
        final long nowMs = this.time.milliseconds();
        Assert.assertEquals(Long.MAX_VALUE, this.client.connectionDelay(this.node, nowMs));
    }

    /**
     * After a first disconnect the default client's reconnect delay must be 10000 ms within a
     * 30% tolerance; after a second failed attempt it must be about twice the first measured
     * delay, within a 60% tolerance.
     */
    @Test
    public void testConnectionDelayDisconnected() {
        this.awaitReady(this.client, this.node);

        // First disconnect.
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());
        long delay = this.client.connectionDelay(this.node, this.time.milliseconds());
        long expectedDelay = this.reconnectBackoffMsTest;
        double jitter = 0.3;
        Assert.assertEquals((double) expectedDelay, (double) delay, expectedDelay * jitter);

        // Sleep until there is no connection delay left.
        this.time.sleep(delay);
        Assert.assertEquals(0L, this.client.connectionDelay(this.node, this.time.milliseconds()));

        // Start connecting again and disconnect before the connection is established.
        this.client.ready(this.node, this.time.milliseconds());
        this.selector.serverDisconnect(this.node.idString());
        this.client.poll(1000L, this.time.milliseconds());

        // Plain long arithmetic: Math.round(long) would resolve to Math.round(float) and push the
        // value through a lossy long -> float -> int conversion.
        expectedDelay = delay * 2L;
        delay = this.client.connectionDelay(this.node, this.time.milliseconds());
        jitter = 0.6;
        Assert.assertEquals((double) expectedDelay, (double) delay, expectedDelay * jitter);
    }

    /**
     * A metadata request sent by the test itself must be completed with a "disconnected"
     * response when the connection is closed through the selector while it is in flight.
     */
    @Test
    public void testDisconnectDuringUserMetadataRequest() {
        this.awaitReady(this.client, this.node);
        final String destination = this.node.idString();

        // Put one metadata request in flight.
        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
        long now = this.time.milliseconds();
        ClientRequest request = this.client.newClientRequest(destination, builder, now, true);
        this.client.send(request, now);
        this.client.poll(1000L, now);
        Assert.assertEquals(1L, this.client.inFlightRequestCount(destination));
        Assert.assertTrue(this.client.hasInFlightRequests(destination));
        Assert.assertTrue(this.client.hasInFlightRequests());

        // Close the channel, then poll: exactly one response, flagged as disconnected.
        this.selector.close(destination);
        // Typed list instead of a raw List, so no cast is needed on the element.
        List<ClientResponse> responses = this.client.poll(1000L, this.time.milliseconds());
        Assert.assertEquals(1L, responses.size());
        Assert.assertTrue(responses.iterator().next().wasDisconnected());
    }

    /**
     * A server-side disconnect while the request awaited by
     * {@code awaitInFlightApiVersionRequest()} is in flight must clear the in-flight state
     * without surfacing any response to the caller.
     */
    @Test
    public void testServerDisconnectAfterInternalApiVersionRequest() throws Exception {
        this.awaitInFlightApiVersionRequest();
        final String destination = this.node.idString();
        this.selector.serverDisconnect(destination);
        List<ClientResponse> surfaced = this.client.poll(0L, this.time.milliseconds());
        Assert.assertFalse(this.client.hasInFlightRequests(destination));
        Assert.assertTrue(surfaced.isEmpty());
    }

    /**
     * A client-initiated disconnect while the request awaited by
     * {@code awaitInFlightApiVersionRequest()} is in flight must clear the in-flight state
     * immediately, and the following poll must not surface any response.
     */
    @Test
    public void testClientDisconnectAfterInternalApiVersionRequest() throws Exception {
        this.awaitInFlightApiVersionRequest();
        final String destination = this.node.idString();
        this.client.disconnect(destination);
        Assert.assertFalse(this.client.hasInFlightRequests(destination));
        List<ClientResponse> surfaced = this.client.poll(0L, this.time.milliseconds());
        Assert.assertTrue(surfaced.isEmpty());
    }

    /**
     * With two metadata requests in flight, a client-initiated disconnect must complete both of
     * them as disconnected, in send order, and hand the same responses to the completion callback.
     */
    @Test
    public void testDisconnectWithMultipleInFlights() {
        NetworkClient client = this.clientWithNoVersionDiscovery;
        this.awaitReady(client, this.node);
        final String destination = this.node.idString();
        Assert.assertTrue("Expected NetworkClient to be ready to send to node " + destination, client.isReady(this.node, this.time.milliseconds()));

        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.<String>emptyList(), true);
        long now = this.time.milliseconds();

        // Typed collection: the callback records every response it is invoked with.
        final List<ClientResponse> callbackResponses = new ArrayList<ClientResponse>();
        RequestCompletionHandler callback = new RequestCompletionHandler(){

            @Override
            public void onComplete(ClientResponse response) {
                callbackResponses.add(response);
            }
        };

        // Put two requests in flight on the same connection.
        ClientRequest request1 = client.newClientRequest(destination, builder, now, true, 1000, callback);
        client.send(request1, now);
        client.poll(0L, now);
        ClientRequest request2 = client.newClientRequest(destination, builder, now, true, 1000, callback);
        client.send(request2, now);
        client.poll(0L, now);
        Assert.assertNotEquals(request1.correlationId(), request2.correlationId());
        Assert.assertEquals(2L, client.inFlightRequestCount());
        Assert.assertEquals(2L, client.inFlightRequestCount(destination));

        // Disconnect and collect the completions.
        client.disconnect(destination);
        List<ClientResponse> responses = client.poll(0L, this.time.milliseconds());
        Assert.assertEquals(2L, responses.size());
        Assert.assertEquals(responses, callbackResponses);
        Assert.assertEquals(0L, client.inFlightRequestCount());
        Assert.assertEquals(0L, client.inFlightRequestCount(destination));

        // Both responses are marked disconnected and correlate to their requests in send order.
        ClientResponse response1 = responses.get(0);
        Assert.assertTrue(response1.wasDisconnected());
        Assert.assertEquals(request1.correlationId(), response1.requestHeader().correlationId());
        ClientResponse response2 = responses.get(1);
        Assert.assertTrue(response2.wasDisconnected());
        Assert.assertEquals(request2.correlationId(), response2.requestHeader().correlationId());
    }

    /**
     * {@code disconnect()} on a ready node must mark the connection as failed and block
     * reconnects until 100000 ms of mock time have passed; a further {@code disconnect()} after
     * that must leave the node connectable.
     */
    @Test
    public void testCallDisconnect() throws Exception {
        final String id = this.node.idString();
        this.awaitReady(this.client, this.node);

        // Connected: ready and not failed.
        Assert.assertTrue("Expected NetworkClient to be ready to send to node " + id, this.client.isReady(this.node, this.time.milliseconds()));
        Assert.assertFalse("Did not expect connection to node " + id + " to be failed", this.client.connectionFailed(this.node));

        // After an explicit disconnect: not ready, failed, and not yet allowed to reconnect.
        this.client.disconnect(id);
        Assert.assertFalse("Expected node " + id + " to be disconnected.", this.client.isReady(this.node, this.time.milliseconds()));
        Assert.assertTrue("Expected connection to node " + id + " to be failed after disconnect", this.client.connectionFailed(this.node));
        Assert.assertFalse(this.client.canConnect(this.node, this.time.milliseconds()));

        // After 100000 ms a reconnect is permitted, and stays permitted after a second disconnect.
        this.time.sleep(100000L);
        Assert.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));
        this.client.disconnect(id);
        Assert.assertTrue(this.client.canConnect(this.node, this.time.milliseconds()));
    }

    /**
     * Starts connecting the default client to the fixture node and polls until the client reports
     * an in-flight request for that node (waiting up to 1000 ms), then checks that the node is
     * not ready yet.
     *
     * @throws Exception if {@code TestUtils.waitForCondition} fails or is interrupted
     */
    private void awaitInFlightApiVersionRequest() throws Exception {
        this.client.ready(this.node, this.time.milliseconds());
        // A descriptive message instead of an empty string, so a timeout is diagnosable.
        final String failureMessage = "Timed out waiting for an in-flight request to node " + this.node.idString();
        TestUtils.waitForCondition(new TestCondition(){

            @Override
            public boolean conditionMet() {
                NetworkClientTest.this.client.poll(0L, NetworkClientTest.this.time.milliseconds());
                return NetworkClientTest.this.client.hasInFlightRequests(NetworkClientTest.this.node.idString());
            }
        }, 1000L, failureMessage);
        Assert.assertFalse(this.client.isReady(this.node, this.time.milliseconds()));
    }

    private static class TestCallbackHandler
    implements RequestCompletionHandler {
        public boolean executed = false;
        public ClientResponse response;

        private TestCallbackHandler() {
        }

        public void onComplete(ClientResponse response) {
            this.executed = true;
            this.response = response;
        }
    }
}

