/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.common.requests;

import java.lang.constant.Constable;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.RequestLogFilter;
import org.apache.kafka.common.requests.SamplingRequestLogFilter;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogAction;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SampledLogAction;
import org.apache.kafka.common.utils.SlowLogAction;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SamplingRequestLogFilter}.
 *
 * <p>Each test feeds the filter synthetic requests that arrive at fixed intervals and advances a
 * {@link MockTime} clock to every request's arrival time, so no test depends on wall-clock time.
 */
public class SamplingRequestLogFilterTest {
    // Config keys exercised by these tests, spelled out literally.
    // NOTE(review): presumably these mirror constants declared by the filter - confirm and reuse them.
    private static final String SAMPLES_PER_MIN_CONFIG = "confluent.request.log.samples.per.min";
    private static final String API_SAMPLES_PER_MIN_CONFIG = "confluent.request.log.api.samples.per.min";
    private static final String ENABLE_ADMIN_APIS_CONFIG = "confluent.request.log.enable.admin.apis";
    private static final String ENABLE_SLOWLOG_CONFIG = "confluent.request.log.enable.slowlog";
    private static final String SLOWLOG_THRESHOLD_OVERRIDE_CONFIG = "confluent.request.slowlog.threshold.override";
    private static final String SLOWLOG_THRESHOLD_P99_MIN_CONFIG = "confluent.request.slowlog.threshold.p99.min";

    private static final Duration ONE_MINUTE = Duration.ofMinutes(1);

    private final MockTime time = new MockTime();

    /**
     * With a default sample rate configured, no CREATE_TOPICS request is skipped; after setting the
     * rate to 0 and disabling admin APIs, none is logged.
     */
    @Test
    public void testAdminRequestSampling() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> createTopicGenerator =
            fixedIntervalRequestGenerator(ApiKeys.CREATE_TOPICS, Duration.ofSeconds(1)).iterator();
        filter.configure(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, 10));

        List<Request> skippedCreateTopicsRequests = filterForDuration(
            createTopicGenerator,
            ONE_MINUTE,
            request -> !filter.shouldLogRequest(request.context, request.time)
        ).collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), skippedCreateTopicsRequests);

        Map<String, Object> configs = new HashMap<>();
        configs.put(SAMPLES_PER_MIN_CONFIG, 0);
        configs.put(ENABLE_ADMIN_APIS_CONFIG, false);
        filter.configure(configs);

        List<Request> sampledCreateTopicsRequests =
            sampleForDuration(filter, createTopicGenerator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), sampledCreateTopicsRequests);
    }

    /** Samples taken under the default rate must be flagged as default-sampled. */
    @Test
    public void testDefaultSampledRequestsAugmentRequestLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, 10));
        testSampledRequestsAugmentRequestLog(filter, true);
    }

    /** Samples taken under a per-API rate must not be flagged as default-sampled. */
    @Test
    public void testApiSampledRequestsAugmentRequestLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap(API_SAMPLES_PER_MIN_CONFIG, "Metadata=10"));
        testSampledRequestsAugmentRequestLog(filter, false);
    }

    /**
     * Shared body of the two "augment request log" tests: checks the bookkeeping fields carried by
     * the {@link SampledLogAction} of the first sample and of a later pair of consecutive samples.
     *
     * @param filter           an already configured filter
     * @param isDefaultSampled expected value of the action's {@code isDefaultSampled} field
     */
    private void testSampledRequestsAugmentRequestLog(SamplingRequestLogFilter filter, boolean isDefaultSampled) {
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofMillis(100)).iterator();

        Sampling initialSample = runUntilFirstSample(generator, ONE_MINUTE, filter);
        Assertions.assertNotNull(initialSample);
        Assertions.assertEquals(isDefaultSampled, initialSample.action.isDefaultSampled);
        Assertions.assertEquals(0L, initialSample.action.nanosSinceLastSample);
        Assertions.assertEquals(0L, initialSample.action.requestsSinceLastSample);

        // The returned stream is deliberately dropped: filterForDuration does its work eagerly, so
        // this call only pushes one minute of requests through the filter before sampling resumes.
        filterForDuration(generator, ONE_MINUTE, request -> filter.shouldLogRequest(request.context, request.time));

        Sampling sample1 = runUntilFirstSample(generator, ONE_MINUTE, filter);
        Assertions.assertNotNull(sample1);
        Sampling sample2 = runUntilFirstSample(generator, ONE_MINUTE, filter);
        Assertions.assertNotNull(sample2);
        Assertions.assertEquals(isDefaultSampled, sample2.action.isDefaultSampled);
        Assertions.assertEquals(sample2.skipped.size(), sample2.action.requestsSinceLastSample);
        Assertions.assertEquals(sample2.sample.time - sample1.sample.time, sample2.action.nanosSinceLastSample);
    }

    /**
     * Feeds requests to {@code filter} until one is logged or the mock clock passes
     * {@code maxDuration}.
     *
     * @param requestGenerator source of requests, consumed in order
     * @param maxDuration      how far the mock clock may advance, measured from the current mock time
     * @param filter           the filter under test
     * @return the first logged request, its action and the requests skipped before it, or
     *         {@code null} if nothing was logged within {@code maxDuration}
     */
    private Sampling runUntilFirstSample(Iterator<Request> requestGenerator, Duration maxDuration, RequestLogFilter filter) {
        List<Request> skipped = new ArrayList<>();
        long endTimeMs = time.milliseconds() + maxDuration.toMillis();
        while (time.milliseconds() <= endTimeMs) {
            Request request = requestGenerator.next();
            advanceClockTo(request);
            LogAction action = filter.processRequest(request.context, request.time);
            if (action.shouldLog()) {
                Assertions.assertTrue(action instanceof SampledLogAction);
                return new Sampling(request, (SampledLogAction) action, skipped);
            }
            skipped.add(request);
        }
        return null;
    }

    /**
     * With a default rate of 10 and one METADATA request per second, ten samples are taken per
     * minute: one second apart in the first minute and six seconds apart in the second.
     */
    @Test
    public void testDefaultSampling() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, 10));
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1)).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(10, firstMinute.size());
        assertSampleInterval(firstMinute, Duration.ofSeconds(1));

        List<Request> secondMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(10, secondMinute.size());
        assertSampleInterval(secondMinute, Duration.ofSeconds(6));
    }

    /**
     * With only per-API rates configured (Metadata=10, Produce=6), exactly those APIs are sampled
     * at their configured rates while FETCH requests are never sampled.
     */
    @Test
    public void testOverrideSamplingNoDefault() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        filter.configure(Collections.singletonMap(API_SAMPLES_PER_MIN_CONFIG, "Metadata=10,Produce=6"));
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(16, firstMinute.size());
        List<Request> firstMinuteProduce = requestsForApi(firstMinute, ApiKeys.PRODUCE);
        List<Request> firstMinuteMetadata = requestsForApi(firstMinute, ApiKeys.METADATA);
        Assertions.assertEquals(6, firstMinuteProduce.size());
        Assertions.assertEquals(10, firstMinuteMetadata.size());
        assertSampleInterval(firstMinuteProduce, Duration.ofSeconds(5));
        assertSampleInterval(firstMinuteMetadata, Duration.ofSeconds(1));

        List<Request> secondMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(16, secondMinute.size());
        List<Request> secondMinuteProduce = requestsForApi(secondMinute, ApiKeys.PRODUCE);
        List<Request> secondMinuteMetadata = requestsForApi(secondMinute, ApiKeys.METADATA);
        Assertions.assertEquals(6, secondMinuteProduce.size());
        Assertions.assertEquals(10, secondMinuteMetadata.size());
        assertSampleInterval(secondMinuteProduce, Duration.ofSeconds(10));
        assertSampleInterval(secondMinuteMetadata, Duration.ofSeconds(6));
    }

    /** A per-API rate of 0 excludes that API even though a default rate is configured. */
    @Test
    public void testDisabledApi() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(API_SAMPLES_PER_MIN_CONFIG, "Metadata=0");
        configs.put(SAMPLES_PER_MIN_CONFIG, "10");
        filter.configure(configs);
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(10, firstMinute.size());
        Assertions.assertTrue(
            firstMinute.stream().allMatch(request -> request.context.header.apiKey() != ApiKeys.METADATA));
    }

    /**
     * With admin APIs enabled and {@code CreateTopics=0}, only DELETE_TOPICS requests are logged.
     */
    @Test
    public void testDisabledAdminApi() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(API_SAMPLES_PER_MIN_CONFIG, "CreateTopics=0");
        configs.put(ENABLE_ADMIN_APIS_CONFIG, "true");
        filter.configure(configs);
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.CREATE_TOPICS, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.DELETE_TOPICS, Duration.ofSeconds(5))
        )).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertFalse(firstMinute.isEmpty());
        Assertions.assertTrue(
            firstMinute.stream().allMatch(request -> request.context.header.apiKey() == ApiKeys.DELETE_TOPICS));
    }

    /** With the slow log disabled and no sample rate configured, no PRODUCE request is logged. */
    @Test
    public void testDisabledSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "false");
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertTrue(firstMinute.isEmpty());
    }

    /**
     * With the slow log enabled, sampling at 0 and admin APIs disabled, PRODUCE requests are still
     * logged and CREATE_TOPICS requests are not.
     */
    @Test
    public void testSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SAMPLES_PER_MIN_CONFIG, "0");
        configs.put(ENABLE_ADMIN_APIS_CONFIG, "false");
        filter.configure(configs);
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.CREATE_TOPICS, Duration.ofSeconds(10))
        )).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertFalse(firstMinute.isEmpty());
        EnumSet<ApiKeys> expectedApis = EnumSet.of(ApiKeys.PRODUCE);
        Assertions.assertTrue(
            firstMinute.stream().allMatch(request -> expectedApis.contains(request.context.header.apiKey())));
    }

    /**
     * With sampling and the slow log both enabled no PRODUCE request is skipped; with both turned
     * off none is logged.
     */
    @Test
    public void testSlowLogAndSampling() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> produceGenerator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)).iterator();

        Map<String, Object> enabledConfigs = new HashMap<>();
        enabledConfigs.put(SAMPLES_PER_MIN_CONFIG, 10);
        enabledConfigs.put(ENABLE_SLOWLOG_CONFIG, "true");
        filter.configure(enabledConfigs);
        List<Request> skippedProduceRequests = filterForDuration(
            produceGenerator,
            ONE_MINUTE,
            request -> !filter.shouldLogRequest(request.context, request.time)
        ).collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), skippedProduceRequests);

        Map<String, Object> disabledConfigs = new HashMap<>();
        disabledConfigs.put(SAMPLES_PER_MIN_CONFIG, 0);
        disabledConfigs.put(ENABLE_SLOWLOG_CONFIG, "false");
        filter.configure(disabledConfigs);
        List<Request> sampledProduceRequests =
            sampleForDuration(filter, produceGenerator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(Collections.emptyList(), sampledProduceRequests);
    }

    /** A per-API rate of {@code Produce=0} suppresses PRODUCE logging even with the slow log on. */
    @Test
    public void TestApiSamplingOverrideWithSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(API_SAMPLES_PER_MIN_CONFIG, "Produce=0");
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertTrue(firstMinute.isEmpty());
    }

    /**
     * With sampling at 0 and the slow log on, the first request yields a {@link SlowLogAction} that
     * logs but does not capture a trace.
     */
    @Test
    public void TestTracesDisabledForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, Object> configs = new HashMap<>();
        configs.put(SAMPLES_PER_MIN_CONFIG, 0);
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)).iterator();

        Request request = generator.next();
        LogAction action = filter.processRequest(request.context, request.time);
        Assertions.assertFalse(action.shouldCaptureTrace());
        Assertions.assertTrue(action.shouldLog());
        Assertions.assertTrue(action instanceof SlowLogAction);
    }

    /** Without a threshold override or a minimum p99 threshold, the reported threshold is 100.0. */
    @Test
    public void TestNoThresholdOverrideAndNoMinP99ThresholdSetForSlowLog_defaultCurrentP99() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SAMPLES_PER_MIN_CONFIG, 1);
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)).iterator();

        SlowLogAction action = slowLogActionAfterDefaultSample(filter, generator);
        SlowLogAction.SlowLogCheckResponse shouldLogResult =
            action.shouldLogSlowRequests(200.0, Double.valueOf(100.0), 2L);
        Assertions.assertTrue(shouldLogResult.shouldLog);
        Assertions.assertEquals(100.0, shouldLogResult.slowLogThreshold);
    }

    /** With a threshold override of 98, the reported threshold is 98.0 and the request is logged. */
    @Test
    public void TestThresholdOverrideSetForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SLOWLOG_THRESHOLD_OVERRIDE_CONFIG, 98);
        configs.put(SAMPLES_PER_MIN_CONFIG, 1);
        filter.reconfigure(configs);

        SlowLogAction action = slowLogActionAfterDefaultSample(filter, generator);
        SlowLogAction.SlowLogCheckResponse shouldLogResult =
            action.shouldLogSlowRequests(99.0, Double.valueOf(100.0), 2L);
        Assertions.assertTrue(shouldLogResult.shouldLog);
        Assertions.assertEquals(98.0, shouldLogResult.slowLogThreshold);
    }

    /**
     * With only a minimum p99 threshold of 150, the reported threshold is 150.0 and the request is
     * not logged.
     */
    @Test
    public void TestNoThresholdOverride_minP99ThresholdSetForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SLOWLOG_THRESHOLD_P99_MIN_CONFIG, 150);
        configs.put(SAMPLES_PER_MIN_CONFIG, 1);
        filter.reconfigure(configs);

        SlowLogAction action = slowLogActionAfterDefaultSample(filter, generator);
        SlowLogAction.SlowLogCheckResponse shouldLogResult =
            action.shouldLogSlowRequests(102.0, Double.valueOf(100.0), 2L);
        Assertions.assertFalse(shouldLogResult.shouldLog);
        Assertions.assertEquals(150.0, shouldLogResult.slowLogThreshold);
    }

    /**
     * With both a threshold override (98) and a minimum p99 threshold (150), the reported threshold
     * is the override, 98.0.
     */
    @Test
    public void TestThresholdOverrideAndMinP99ThresholdSetForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(1)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SAMPLES_PER_MIN_CONFIG, 1);
        configs.put(SLOWLOG_THRESHOLD_OVERRIDE_CONFIG, 98);
        configs.put(SLOWLOG_THRESHOLD_P99_MIN_CONFIG, 150);
        filter.reconfigure(configs);

        SlowLogAction action = slowLogActionAfterDefaultSample(filter, generator);
        SlowLogAction.SlowLogCheckResponse shouldLogResult =
            action.shouldLogSlowRequests(100.0, Double.valueOf(101.0), 2L);
        Assertions.assertTrue(shouldLogResult.shouldLog);
        Assertions.assertEquals(98.0, shouldLogResult.slowLogThreshold);
    }

    /**
     * Slow log on, default rate 20, one PRODUCE request every two seconds: all 32 requests pass the
     * filter, and re-processing them yields 13 that the slow-log sampler also logs.
     */
    @Test
    public void TestRequestRateNotExceedingLimitForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SAMPLES_PER_MIN_CONFIG, 20);
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(2)).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(32, firstMinute.size());

        List<Request> requestsAfterDefaultSampling = firstMinute.stream()
            .filter(request -> isLoggedBySlowLogSampling(filter, request))
            .collect(Collectors.toList());
        Assertions.assertEquals(13, requestsAfterDefaultSampling.size());
    }

    /**
     * Slow log on, default rate 10, one PRODUCE request every two seconds: all 32 requests pass the
     * filter; a second pass yields 22 slow-log actions and a third pass yields 10 that the slow-log
     * sampler logs.
     */
    @Test
    public void TestRequestRateExceedingLimitForSlowLog() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, Object> configs = new HashMap<>();
        configs.put(ENABLE_SLOWLOG_CONFIG, "true");
        configs.put(SAMPLES_PER_MIN_CONFIG, 10);
        filter.configure(configs);
        Iterator<Request> generator =
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(2)).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(32, firstMinute.size());

        // Every pass below calls processRequest again, so the order of the passes matters.
        int slowLogSize = (int) firstMinute.stream()
            .filter(request -> filter.processRequest(request.context, request.time) instanceof SlowLogAction)
            .count();
        Assertions.assertEquals(22, slowLogSize);

        List<Request> requestsAfterDefaultSampling = firstMinute.stream()
            .filter(request -> isLoggedBySlowLogSampling(filter, request))
            .collect(Collectors.toList());
        Assertions.assertEquals(10, requestsAfterDefaultSampling.size());
    }

    /**
     * With {@code Produce=6} plus a default rate of 10, the first minute holds six PRODUCE samples
     * and ten samples drawn from the remaining APIs.
     */
    @Test
    public void testOverrideSamplingWithDefault() {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Map<String, String> configs = new HashMap<>();
        configs.put(API_SAMPLES_PER_MIN_CONFIG, "Produce=6");
        configs.put(SAMPLES_PER_MIN_CONFIG, "10");
        filter.configure(configs);
        Iterator<Request> generator = joinAll(Arrays.asList(
            fixedIntervalRequestGenerator(ApiKeys.METADATA, Duration.ofSeconds(2)),
            fixedIntervalRequestGenerator(ApiKeys.PRODUCE, Duration.ofSeconds(5)),
            fixedIntervalRequestGenerator(ApiKeys.FETCH, Duration.ofSeconds(1))
        )).iterator();

        List<Request> firstMinute = sampleForDuration(filter, generator, ONE_MINUTE).collect(Collectors.toList());
        Assertions.assertEquals(16, firstMinute.size());
        List<Request> sampledProduceRequests = requestsForApi(firstMinute, ApiKeys.PRODUCE);
        List<Request> sampledDefaultRequests = firstMinute.stream()
            .filter(request -> request.context.header.apiKey() != ApiKeys.PRODUCE)
            .collect(Collectors.toList());
        Assertions.assertEquals(6, sampledProduceRequests.size());
        Assertions.assertEquals(10, sampledDefaultRequests.size());
    }

    /** Malformed default and per-API sample-rate values are rejected with a {@link ConfigException}. */
    @Test
    public void testInvalidDefaultConfigs() {
        assertThrowsConfigException(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, "blah"));
        assertThrowsConfigException(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, "null"));
        assertThrowsConfigException(Collections.singletonMap(SAMPLES_PER_MIN_CONFIG, "1.3"));
        assertThrowsConfigException(Collections.singletonMap(API_SAMPLES_PER_MIN_CONFIG, "MD=5"));
        assertThrowsConfigException(Collections.singletonMap(API_SAMPLES_PER_MIN_CONFIG, "Metadata=5,Fetch=dd"));
        assertThrowsConfigException(Collections.singletonMap(API_SAMPLES_PER_MIN_CONFIG, "Metadata=5,blah,"));
    }

    /**
     * Asserts that a fresh filter rejects {@code configs} through all three entry points:
     * {@code validateReconfiguration}, {@code configure} and {@code reconfigure}.
     */
    private void assertThrowsConfigException(Map<String, ?> configs) {
        SamplingRequestLogFilter filter = new SamplingRequestLogFilter();
        Assertions.assertThrows(ConfigException.class, () -> filter.validateReconfiguration(configs));
        Assertions.assertThrows(ConfigException.class, () -> filter.configure(configs));
        Assertions.assertThrows(ConfigException.class, () -> filter.reconfigure(configs));
    }

    /**
     * Processes two consecutive requests and returns the action for the second one.
     *
     * <p>The first request's action is discarded; the calling tests treat that request as the one
     * consumed by default sampling. Fails the test if the second action is not a
     * {@link SlowLogAction}.
     */
    private SlowLogAction slowLogActionAfterDefaultSample(SamplingRequestLogFilter filter, Iterator<Request> generator) {
        Request defaultSampledRequest = generator.next();
        filter.processRequest(defaultSampledRequest.context, defaultSampledRequest.time);
        Request request = generator.next();
        LogAction action = filter.processRequest(request.context, request.time);
        Assertions.assertTrue(action instanceof SlowLogAction);
        return (SlowLogAction) action;
    }

    /**
     * Re-processes {@code request} and reports whether the result is a {@link SlowLogAction} whose
     * {@code maybeSample(2L)} outcome should be logged.
     */
    private static boolean isLoggedBySlowLogSampling(SamplingRequestLogFilter filter, Request request) {
        LogAction action = filter.processRequest(request.context, request.time);
        return action instanceof SlowLogAction && ((SlowLogAction) action).maybeSample(2L).shouldLog();
    }

    /** Returns the requests in {@code requests} whose header API key is {@code api}, in order. */
    private static List<Request> requestsForApi(List<Request> requests, ApiKeys api) {
        return requests.stream()
            .filter(request -> request.context.header.apiKey() == api)
            .collect(Collectors.toList());
    }

    /** Builds a PLAINTEXT, anonymous, loopback request context for the latest version of {@code apiKey}. */
    private RequestContext newRequestContext(ApiKeys apiKey) {
        RequestHeader header = new RequestHeader(apiKey, apiKey.latestVersion(), "clientId", 1);
        return new RequestContext(header, "cxnId", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, new ListenerName("PLAINTEXT"), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false, Optional.empty());
    }

    /**
     * Returns an infinite stream of requests for {@code api}. The first arrives at the mock clock's
     * current nanosecond reading (taken when this method is called) and each subsequent one
     * {@code interval} later.
     */
    private Stream<Request> fixedIntervalRequestGenerator(ApiKeys api, Duration interval) {
        AtomicLong nextArrival = new AtomicLong(time.nanoseconds());
        long intervalNanos = interval.toNanos();
        return Stream.generate(() -> new Request(newRequestContext(api), nextArrival.getAndAdd(intervalNanos)));
    }

    /** Returns the requests that {@code sampler} decides to log while the mock clock advances by {@code duration}. */
    private Stream<Request> sampleForDuration(SamplingRequestLogFilter sampler, Iterator<Request> requestGenerator, Duration duration) {
        return filterForDuration(requestGenerator, duration, request -> sampler.shouldLogRequest(request.context, request.time));
    }

    /**
     * Consumes requests until the mock clock has moved past {@code duration}, keeping those that
     * satisfy {@code accept}.
     *
     * <p>The work is done eagerly: by the time the stream is returned every request has been
     * consumed and tested, so callers may ignore the result when they only want the side effects.
     * The loop condition is checked before each request is pulled, so the last request consumed may
     * arrive after the end of the window.
     */
    private Stream<Request> filterForDuration(Iterator<Request> requestGenerator, Duration duration, Predicate<Request> accept) {
        long endTimeMs = time.milliseconds() + duration.toMillis();
        Stream.Builder<Request> accepted = Stream.builder();
        while (time.milliseconds() <= endTimeMs) {
            Request request = requestGenerator.next();
            advanceClockTo(request);
            if (accept.test(request)) {
                accepted.accept(request);
            }
        }
        return accepted.build();
    }

    /**
     * Advances the mock clock to the arrival time of {@code request}, at millisecond granularity.
     * Does not move the clock backwards when the arrival time is already in the past.
     */
    private void advanceClockTo(Request request) {
        long nanosUntilArrival = Math.max(0L, request.time - time.nanoseconds());
        time.sleep(Duration.ofNanos(nanosUntilArrival).toMillis());
    }

    /** Returns the gaps, in nanoseconds, between each pair of consecutive samples. */
    private Stream<Long> sampleIntervals(List<Request> samples) {
        Stream.Builder<Long> intervals = Stream.builder();
        for (int i = 1; i < samples.size(); i++) {
            intervals.accept(samples.get(i).time - samples.get(i - 1).time);
        }
        return intervals.build();
    }

    /**
     * Asserts that every pair of consecutive samples is exactly {@code expectedInterval} apart.
     * Passes trivially when there are fewer than two samples.
     */
    private void assertSampleInterval(List<Request> samples, Duration expectedInterval) {
        Assertions.assertTrue(
            sampleIntervals(samples).allMatch(intervalNanos -> Duration.ofNanos(intervalNanos).equals(expectedInterval)));
    }

    /**
     * Merges two infinite request streams by arrival time. A request from {@code s1} is emitted
     * only when it is strictly earlier than the head of {@code s2}, so ties go to {@code s2}.
     */
    private Stream<Request> join(Stream<Request> s1, Stream<Request> s2) {
        // Each reference holds the head that was pulled from its stream but lost the last comparison.
        AtomicReference<Request> pending1 = new AtomicReference<>();
        AtomicReference<Request> pending2 = new AtomicReference<>();
        Iterator<Request> iter1 = s1.iterator();
        Iterator<Request> iter2 = s2.iterator();
        return Stream.generate(() -> {
            Request r1 = pending1.getAndSet(null);
            if (r1 == null) {
                r1 = iter1.next();
            }
            Request r2 = pending2.getAndSet(null);
            if (r2 == null) {
                r2 = iter2.next();
            }
            if (r1.time < r2.time) {
                pending2.set(r2);
                return r1;
            }
            pending1.set(r1);
            return r2;
        });
    }

    /**
     * Merges any number of request streams by arrival time by folding {@link #join} over them.
     * Returns an empty stream for an empty collection and the stream itself for a single one.
     */
    private Stream<Request> joinAll(Collection<Stream<Request>> streams) {
        Iterator<Stream<Request>> iter = streams.iterator();
        if (!iter.hasNext()) {
            return Stream.empty();
        }
        Stream<Request> all = iter.next();
        while (iter.hasNext()) {
            all = join(all, iter.next());
        }
        return all;
    }

    /** Result of {@link #runUntilFirstSample}: the logged request, its action and the requests skipped before it. */
    private static class Sampling {
        private final Request sample;
        private final SampledLogAction action;
        private final List<Request> skipped;

        private Sampling(Request sample, SampledLogAction action, List<Request> skipped) {
            this.sample = sample;
            this.action = action;
            this.skipped = skipped;
        }
    }

    /** A synthetic request: its context plus its arrival time in nanoseconds. */
    private static class Request {
        private final RequestContext context;
        private final long time;

        private Request(RequestContext context, long time) {
            this.context = context;
            this.time = time;
        }
    }
}

