/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncBufferedMutatorImpl;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.AsyncConnectionImpl;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Tests for {@link AsyncBufferedMutator} run against a mini cluster that is started once for
 * the whole class.
 *
 * <p>The tests cover size-triggered flushing, periodic flushing and its cancellation, manual
 * flushing, and the behaviour of a mutator that has already been closed. Several tests read the
 * {@code periodicFlushTask} field of {@link AsyncBufferedMutatorImpl} directly, so they are
 * coupled to that implementation.
 */
@Category(value={MediumTests.class, ClientTests.class})
public class TestAsyncBufferMutator {
    @ClassRule
    public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestAsyncBufferMutator.class);

    private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    private static final TableName TABLE_NAME = TableName.valueOf("async");
    private static final TableName MULTI_REGION_TABLE_NAME = TableName.valueOf("async-multi-region");
    private static final byte[] CF = Bytes.toBytes("cf");
    private static final byte[] CQ = Bytes.toBytes("cq");

    /** Number of rows written by {@link #test(TableName)}. */
    private static final int COUNT = 100;

    /** 1 KiB cell value shared by every put; filled with random bytes in {@link #setUp()}. */
    private static final byte[] VALUE = new byte[1024];

    /** Connection shared by all tests; opened in {@link #setUp()}, closed in {@link #tearDown()}. */
    private static AsyncConnection CONN;

    /**
     * Starts the mini cluster, creates the two test tables, opens the shared connection and
     * randomizes {@link #VALUE}.
     *
     * @throws Exception if the cluster, the tables or the connection cannot be set up
     */
    @BeforeClass
    public static void setUp() throws Exception {
        TEST_UTIL.startMiniCluster(1);
        TEST_UTIL.createTable(TABLE_NAME, CF);
        TEST_UTIL.createMultiRegionTable(MULTI_REGION_TABLE_NAME, CF);
        CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
        Bytes.random(VALUE);
    }

    /**
     * Closes the shared connection and shuts the mini cluster down.
     *
     * @throws Exception if closing the connection or stopping the cluster fails
     */
    @AfterClass
    public static void tearDown() throws Exception {
        CONN.close();
        TEST_UTIL.shutdownMiniCluster();
    }

    /** Runs the common write/read scenario against the multi-region table. */
    @Test
    public void testWithMultiRegionTable() throws InterruptedException {
        this.test(MULTI_REGION_TABLE_NAME);
    }

    /** Runs the common write/read scenario against the single-region table. */
    @Test
    public void testWithSingleRegionTable() throws InterruptedException {
        this.test(TABLE_NAME);
    }

    /**
     * Writes {@link #COUNT} rows through a mutator with a 16 KiB write buffer and then reads
     * every row back.
     *
     * @param tableName table to write to and read from
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void test(TableName tableName) throws InterruptedException {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        try (AsyncBufferedMutator mutator =
                CONN.getBufferedMutatorBuilder(tableName).setWriteBufferSize(16 * 1024).build()) {
            List<Put> firstHalf = IntStream.range(0, COUNT / 2)
                .mapToObj(i -> new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))
                .collect(Collectors.toList());
            // COUNT / 2 puts carrying 1 KiB values are larger than the 16 KiB buffer, so this
            // batch is expected to be sent without an explicit flush; join() blocks until it is.
            List<CompletableFuture<Void>> batchFutures = mutator.mutate(firstHalf);
            batchFutures.forEach(CompletableFuture::join);

            // Second half is added one put at a time.
            IntStream.range(COUNT / 2, COUNT).forEach(
                i -> futures.add(mutator.mutate(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, VALUE))));
            // The earliest of these puts must have been sent out by now.
            futures.get(0).join();
            Thread.sleep(2000L);
            // The latest put is expected to still be sitting in the write buffer.
            // NOTE(review): relies on no periodic flush firing within 2 s for this builder - confirm.
            Assert.assertFalse(futures.get(futures.size() - 1).isDone());
        }
        // The mutator has been closed at this point; all pending futures are expected to complete.
        futures.forEach(CompletableFuture::join);

        AsyncTable<?> table = CONN.getTable(tableName);
        IntStream.range(0, COUNT)
            .mapToObj(i -> new Get(Bytes.toBytes(i)))
            .map(get -> table.get(get).join())
            .forEach(result -> Assert.assertArrayEquals(VALUE, result.getValue(CF, CQ)));
    }

    /**
     * Verifies that both {@code mutate} overloads of a closed mutator return futures that fail
     * with an {@link IOException} whose message starts with "Already closed".
     */
    @Test
    public void testClosedMutate() throws InterruptedException {
        AsyncBufferedMutator mutator = CONN.getBufferedMutator(TABLE_NAME);
        mutator.close();
        Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);

        assertAlreadyClosed(mutator.mutate(put));
        for (CompletableFuture<Void> future : mutator.mutate(Arrays.asList(put))) {
            assertAlreadyClosed(future);
        }
    }

    /**
     * Asserts that {@code future} completes exceptionally with an {@link IOException} whose
     * message starts with "Already closed".
     *
     * @param future future returned by a {@code mutate} call on a closed mutator
     * @throws InterruptedException if the thread is interrupted while waiting on the future
     */
    private static void assertAlreadyClosed(CompletableFuture<?> future) throws InterruptedException {
        try {
            future.get();
            Assert.fail("Close check failed");
        } catch (ExecutionException e) {
            MatcherAssert.assertThat(e.getCause(), CoreMatchers.instanceOf(IOException.class));
            Assert.assertTrue(e.getCause().getMessage().startsWith("Already closed"));
        }
    }

    /**
     * Verifies that with periodic flushing disabled a single put stays pending for at least two
     * seconds and completes once {@code flush()} is called.
     */
    @Test
    public void testNoPeriodicFlush() throws InterruptedException, ExecutionException {
        try (AsyncBufferedMutator mutator =
                CONN.getBufferedMutatorBuilder(TABLE_NAME).disableWriteBufferPeriodicFlush().build()) {
            Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
            CompletableFuture<?> future = mutator.mutate(put);
            Thread.sleep(2000L);
            // Nothing should have pushed the put out yet.
            Assert.assertFalse(future.isDone());
            mutator.flush();
            future.get();
        }
        AsyncTable<?> table = CONN.getTable(TABLE_NAME);
        Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }

    /**
     * Verifies that with a one-second periodic flush a single put completes without any explicit
     * flush and can then be read back.
     */
    @Test
    public void testPeriodicFlush() throws InterruptedException, ExecutionException {
        // try-with-resources so the mutator is released even when an assertion fails.
        try (AsyncBufferedMutator mutator = CONN.getBufferedMutatorBuilder(TABLE_NAME)
                .setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build()) {
            Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
            CompletableFuture<?> future = mutator.mutate(put);
            // No flush() call here: get() only returns once something else sent the put out.
            future.get();
            AsyncTable<?> table = CONN.getTable(TABLE_NAME);
            Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
        }
    }

    /**
     * Verifies that the scheduled periodic flush task is cancelled once enough puts have been
     * added for the mutator to clear its {@code periodicFlushTask} field.
     */
    @Test
    public void testCancelPeriodicFlush() throws InterruptedException, ExecutionException {
        Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
        try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
                .setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS)
                .setWriteBufferSize(10L * put.heapSize())
                .build()) {
            List<CompletableFuture<?>> futures = new ArrayList<>();
            futures.add(mutator.mutate(put));
            Timeout task = mutator.periodicFlushTask;
            // The first put must have scheduled a periodic flush task.
            Assert.assertNotNull(task);

            // Keep adding rows until the mutator drops its periodic task reference.
            // NOTE(review): presumably this happens when the write buffer fills and a
            // size-triggered flush runs - confirm against AsyncBufferedMutatorImpl.
            for (int row = 1; ; row++) {
                futures.add(mutator.mutate(new Put(Bytes.toBytes(row)).addColumn(CF, CQ, VALUE)));
                if (mutator.periodicFlushTask == null) {
                    break;
                }
            }
            Assert.assertTrue(task.isCancelled());

            CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();
            AsyncTable<?> table = CONN.getTable(TABLE_NAME);
            // One row was written per future, keyed 0 .. futures.size() - 1.
            for (int row = 0; row < futures.size(); row++) {
                Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(row))).get().getValue(CF, CQ));
            }
        }
    }

    /**
     * Verifies that calling {@code flush()} manually cancels the scheduled periodic flush task.
     */
    @Test
    public void testCancelPeriodicFlushByManuallyFlush() throws InterruptedException, ExecutionException {
        try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
                .setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build()) {
            CompletableFuture<?> future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
            Timeout task = mutator.periodicFlushTask;
            Assert.assertNotNull(task);
            mutator.flush();
            Assert.assertTrue(task.isCancelled());
            future.get();
            AsyncTable<?> table = CONN.getTable(TABLE_NAME);
            Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
        }
    }

    /**
     * Verifies that closing the mutator cancels the scheduled periodic flush task and that the
     * pending put still completes and can be read back.
     */
    @Test
    public void testCancelPeriodicFlushByClose() throws InterruptedException, ExecutionException {
        Timeout task;
        CompletableFuture<?> future;
        try (AsyncBufferedMutatorImpl mutator = (AsyncBufferedMutatorImpl) CONN.getBufferedMutatorBuilder(TABLE_NAME)
                .setWriteBufferPeriodicFlush(1L, TimeUnit.SECONDS).build()) {
            future = mutator.mutate(new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE));
            task = mutator.periodicFlushTask;
            Assert.assertNotNull(task);
        }
        // The mutator has been closed by the try-with-resources block above.
        Assert.assertTrue(task.isCancelled());
        future.get();
        AsyncTable<?> table = CONN.getTable(TABLE_NAME);
        Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
    }

    /**
     * Verifies that when the 200 ms periodic flush timer expires while the test holds the
     * mutator's monitor, a manual flush issued under that monitor is the only flush performed.
     */
    @Test
    public void testRaceBetweenNormalFlushAndPeriodicFlush() throws InterruptedException, ExecutionException {
        Put put = new Put(Bytes.toBytes(0)).addColumn(CF, CQ, VALUE);
        try (AsyncBufferMutatorForTest mutator = new AsyncBufferMutatorForTest(
                AsyncConnectionImpl.RETRY_TIMER,
                CONN.getTable(TABLE_NAME),
                10L * put.heapSize(),
                TimeUnit.MILLISECONDS.toNanos(200L),
                1024 * 1024)) {
            CompletableFuture<?> future = mutator.mutate(put);
            Timeout task = mutator.periodicFlushTask;
            // A periodic flush task must have been scheduled by the put.
            Assert.assertNotNull(task);
            // NOTE(review): holding the mutator's monitor is presumably what keeps the periodic
            // task from flushing (i.e. the implementation locks on `this`) - confirm.
            synchronized (mutator) {
                Thread.sleep(500L);
                // The 200 ms timer has fired ...
                Assert.assertTrue(task.isExpired());
                // ... but no flush has been performed and the put is still pending.
                Assert.assertEquals(0, mutator.flushCount);
                Assert.assertFalse(future.isDone());
                // Flush manually before releasing the monitor.
                mutator.flush();
            }
            // The task had already expired, so it is not reported as cancelled.
            Assert.assertFalse(task.isCancelled());
            future.get();
            AsyncTable<?> table = CONN.getTable(TABLE_NAME);
            Assert.assertArrayEquals(VALUE, table.get(new Get(Bytes.toBytes(0))).get().getValue(CF, CQ));
            // Only the manual flush may have been counted; the expired task must not flush again.
            Assert.assertEquals(1, mutator.flushCount);
        }
    }

    /**
     * {@link AsyncBufferedMutatorImpl} subclass that counts how often {@code internalFlush()} is
     * invoked.
     */
    private static final class AsyncBufferMutatorForTest
    extends AsyncBufferedMutatorImpl {
        /** Number of {@link #internalFlush()} invocations so far. */
        private int flushCount;

        AsyncBufferMutatorForTest(HashedWheelTimer periodicalFlushTimer, AsyncTable<?> table, long writeBufferSize, long periodicFlushTimeoutNs, int maxKeyValueSize) {
            super(periodicalFlushTimer, table, writeBufferSize, periodicFlushTimeoutNs, maxKeyValueSize);
        }

        /** Records the invocation, then delegates to the real flush. */
        @Override
        protected void internalFlush() {
            ++this.flushCount;
            super.internalFlush();
        }
    }
}

