/*
 * Decompiled with CFR 0.152.
 */
package kafka.log.remote;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import kafka.log.remote.RemoteLogManager;
import kafka.utils.TestUtils;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.util.MockTime;
import org.apache.kafka.server.util.Scheduler;
import org.apache.kafka.storage.internals.checkpoint.LeaderEpochCheckpointFile;
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
import org.apache.kafka.storage.internals.log.AsyncOffsetReadFutureHolder;
import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import scala.Option;

class RemoteLogOffsetReaderTest {
    private final MockTime time = new MockTime();
    private final TopicPartition topicPartition = new TopicPartition("test", 0);
    private Path logDir;
    private LeaderEpochFileCache cache;
    private MockRemoteLogManager rlm;

    RemoteLogOffsetReaderTest() {
    }

    @BeforeEach
    void setUp() throws IOException {
        this.logDir = Files.createTempDirectory("kafka-test", new FileAttribute[0]);
        LeaderEpochCheckpointFile checkpoint = new LeaderEpochCheckpointFile(TestUtils.tempFile(), new LogDirFailureChannel(1));
        this.cache = new LeaderEpochFileCache(this.topicPartition, checkpoint, (Scheduler)this.time.scheduler);
        this.rlm = new MockRemoteLogManager(2, 1, this.logDir.toString());
    }

    @AfterEach
    void tearDown() throws IOException {
        this.rlm.close();
        Utils.delete((File)this.logDir.toFile());
    }

    @Test
    public void testReadRemoteLog() throws Exception {
        AsyncOffsetReadFutureHolder asyncOffsetReadFutureHolder = this.rlm.asyncOffsetRead(this.topicPartition, this.time.milliseconds(), 0L, this.cache, Option::empty);
        asyncOffsetReadFutureHolder.taskFuture().get(1L, TimeUnit.SECONDS);
        Assertions.assertTrue((boolean)asyncOffsetReadFutureHolder.taskFuture().isDone());
        OffsetResultHolder.FileRecordsOrError result = (OffsetResultHolder.FileRecordsOrError)asyncOffsetReadFutureHolder.taskFuture().get();
        Assertions.assertFalse((boolean)result.hasException());
        Assertions.assertTrue((boolean)result.hasTimestampAndOffset());
        Assertions.assertEquals((Object)new FileRecords.TimestampAndOffset(100L, 90L, Optional.of(3)), result.timestampAndOffset().get());
    }

    @Test
    public void testTaskQueueFullAndCancelTask() throws Exception {
        this.rlm.pause();
        ArrayList<AsyncOffsetReadFutureHolder> holderList = new ArrayList<AsyncOffsetReadFutureHolder>();
        for (int i = 0; i < 3; ++i) {
            holderList.add(this.rlm.asyncOffsetRead(this.topicPartition, this.time.milliseconds(), 0L, this.cache, Option::empty));
        }
        Assertions.assertThrows(TimeoutException.class, () -> ((AsyncOffsetReadFutureHolder)holderList.get(0)).taskFuture().get(10L, TimeUnit.MILLISECONDS));
        Assertions.assertEquals((long)0L, (long)holderList.stream().filter(h -> h.taskFuture().isDone()).count());
        Assertions.assertThrows(RejectedExecutionException.class, () -> holderList.add(this.rlm.asyncOffsetRead(this.topicPartition, this.time.milliseconds(), 0L, this.cache, Option::empty)));
        ((AsyncOffsetReadFutureHolder)holderList.get(2)).jobFuture().cancel(false);
        this.rlm.resume();
        for (AsyncOffsetReadFutureHolder holder : holderList) {
            if (holder.jobFuture().isCancelled()) continue;
            holder.taskFuture().get(1L, TimeUnit.SECONDS);
        }
        Assertions.assertEquals((int)3, (int)holderList.size());
        Assertions.assertEquals((long)2L, (long)holderList.stream().filter(h -> h.taskFuture().isDone()).count());
        Assertions.assertEquals((long)1L, (long)holderList.stream().filter(h -> !h.taskFuture().isDone()).count());
    }

    @Test
    public void testThrowErrorOnFindOffsetByTimestamp() throws Exception {
        final RemoteStorageException exception = new RemoteStorageException("Error");
        try (MockRemoteLogManager rlm = new MockRemoteLogManager(2, 1, this.logDir.toString()){

            @Override
            public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition tp, long timestamp, long startingOffset, LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
                throw exception;
            }
        };){
            AsyncOffsetReadFutureHolder futureHolder = rlm.asyncOffsetRead(this.topicPartition, this.time.milliseconds(), 0L, this.cache, Option::empty);
            futureHolder.taskFuture().get(1L, TimeUnit.SECONDS);
            Assertions.assertTrue((boolean)futureHolder.taskFuture().isDone());
            Assertions.assertTrue((boolean)((OffsetResultHolder.FileRecordsOrError)futureHolder.taskFuture().get()).hasException());
            Assertions.assertEquals((Object)((Object)exception), ((OffsetResultHolder.FileRecordsOrError)futureHolder.taskFuture().get()).exception().get());
        }
    }

    private static RemoteLogManagerConfig rlmConfig(int threads, int taskQueueSize) {
        Properties props = new Properties();
        props.put("remote.log.storage.system.enable", "true");
        props.put("remote.log.storage.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager");
        props.put("remote.log.metadata.manager.class.name", "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager");
        props.put("remote.log.reader.threads", (Object)threads);
        props.put("remote.log.reader.max.pending.tasks", (Object)taskQueueSize);
        AbstractConfig config = new AbstractConfig(RemoteLogManagerConfig.configDef(), (Map)props, false);
        return new RemoteLogManagerConfig(config);
    }

    private static class MockRemoteLogManager
    extends RemoteLogManager {
        private final ReadWriteLock lock = new ReentrantReadWriteLock();

        public MockRemoteLogManager(int threads, int taskQueueSize, String logDir) throws IOException {
            super(RemoteLogOffsetReaderTest.rlmConfig(threads, taskQueueSize), 1, logDir, "mock-cluster-id", (Time)new MockTime(), tp -> Optional.empty(), (tp, logStartOffset) -> {}, new BrokerTopicStats(true), new Metrics());
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public Optional<FileRecords.TimestampAndOffset> findOffsetByTimestamp(TopicPartition tp, long timestamp, long startingOffset, LeaderEpochFileCache leaderEpochCache) throws RemoteStorageException {
            this.lock.readLock().lock();
            try {
                Optional<FileRecords.TimestampAndOffset> optional = Optional.of(new FileRecords.TimestampAndOffset(100L, 90L, Optional.of(3)));
                return optional;
            }
            finally {
                this.lock.readLock().unlock();
            }
        }

        void pause() {
            this.lock.writeLock().lock();
        }

        void resume() {
            this.lock.writeLock().unlock();
        }
    }
}

