/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.disk.iomanager;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.disk.iomanager.ReadRequest;
import org.apache.flink.runtime.io.disk.iomanager.RequestQueue;
import org.apache.flink.runtime.io.disk.iomanager.WriteRequest;
import org.assertj.core.api.AbstractIntegerAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

class IOManagerAsyncTest {
    private IOManagerAsync ioManager;

    IOManagerAsyncTest() {
    }

    @BeforeEach
    void beforeTest() {
        this.ioManager = new IOManagerAsync();
    }

    @AfterEach
    void afterTest() throws Exception {
        this.ioManager.close();
    }

    @Test
    void channelReadWriteOneSegment() throws Exception {
        int NUM_IOS = 1111;
        FileIOChannel.ID channelID = this.ioManager.createChannel();
        BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
        MemorySegment memSeg = MemorySegmentFactory.allocateUnpooledSegment((int)32768);
        for (int i = 0; i < 1111; ++i) {
            for (int pos = 0; pos < memSeg.size(); pos += 4) {
                memSeg.putInt(pos, i);
            }
            writer.writeBlock((Object)memSeg);
            memSeg = (MemorySegment)writer.getNextReturnedBlock();
        }
        writer.close();
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
        for (int i = 0; i < 1111; ++i) {
            reader.readBlock((Object)memSeg);
            memSeg = (MemorySegment)reader.getNextReturnedBlock();
            for (int pos = 0; pos < memSeg.size(); pos += 4) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)memSeg.getInt(pos)).withFailMessage("Read memory segment contains invalid data.", new Object[0])).isEqualTo(i);
            }
        }
        reader.closeAndDelete();
    }

    @Test
    void channelReadWriteMultipleSegments() throws Exception {
        int NUM_IOS = 1111;
        int NUM_SEGS = 16;
        ArrayList<Object> memSegs = new ArrayList<Object>();
        for (int i = 0; i < 16; ++i) {
            memSegs.add(MemorySegmentFactory.allocateUnpooledSegment((int)32768));
        }
        FileIOChannel.ID channelID = this.ioManager.createChannel();
        BlockChannelWriter writer = this.ioManager.createBlockChannelWriter(channelID);
        for (int i = 0; i < 1111; ++i) {
            MemorySegment memSeg = memSegs.isEmpty() ? (MemorySegment)writer.getNextReturnedBlock() : (MemorySegment)memSegs.remove(memSegs.size() - 1);
            for (int pos = 0; pos < memSeg.size(); pos += 4) {
                memSeg.putInt(pos, i);
            }
            writer.writeBlock((Object)memSeg);
        }
        writer.close();
        while (memSegs.size() < 16) {
            memSegs.add(writer.getNextReturnedBlock());
        }
        BlockChannelReader reader = this.ioManager.createBlockChannelReader(channelID);
        while (!memSegs.isEmpty()) {
            reader.readBlock(memSegs.remove(0));
        }
        for (int i = 0; i < 1111; ++i) {
            MemorySegment memSeg = (MemorySegment)reader.getNextReturnedBlock();
            for (int pos = 0; pos < memSeg.size(); pos += 4) {
                ((AbstractIntegerAssert)Assertions.assertThat((int)memSeg.getInt(pos)).withFailMessage("Read memory segment contains invalid data.", new Object[0])).isEqualTo(i);
            }
            reader.readBlock((Object)memSeg);
        }
        reader.closeAndDelete();
        while (memSegs.size() < 16) {
            memSegs.add(reader.getNextReturnedBlock());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testExceptionPropagationReader() throws Exception {
        final AtomicBoolean handlerCalled = new AtomicBoolean();
        final AtomicBoolean exceptionForwarded = new AtomicBoolean();
        ReadRequest req = new ReadRequest(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void requestDone(IOException ioex) {
                if (ioex instanceof TestIOException) {
                    exceptionForwarded.set(true);
                }
                AtomicBoolean atomicBoolean = handlerCalled;
                synchronized (atomicBoolean) {
                    handlerCalled.set(true);
                    handlerCalled.notifyAll();
                }
            }

            public void read() throws IOException {
                throw new TestIOException();
            }
        };
        RequestQueue rq = this.ioManager.getReadRequestQueue(this.ioManager.createChannel());
        rq.add((Object)req);
        AtomicBoolean atomicBoolean = handlerCalled;
        synchronized (atomicBoolean) {
            while (!handlerCalled.get()) {
                handlerCalled.wait();
            }
        }
        Assertions.assertThat((AtomicBoolean)exceptionForwarded).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testExceptionPropagationWriter() throws Exception {
        final AtomicBoolean handlerCalled = new AtomicBoolean();
        final AtomicBoolean exceptionForwarded = new AtomicBoolean();
        WriteRequest req = new WriteRequest(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void requestDone(IOException ioex) {
                if (ioex instanceof TestIOException) {
                    exceptionForwarded.set(true);
                }
                AtomicBoolean atomicBoolean = handlerCalled;
                synchronized (atomicBoolean) {
                    handlerCalled.set(true);
                    handlerCalled.notifyAll();
                }
            }

            public void write() throws IOException {
                throw new TestIOException();
            }
        };
        RequestQueue rq = this.ioManager.getWriteRequestQueue(this.ioManager.createChannel());
        rq.add((Object)req);
        AtomicBoolean atomicBoolean = handlerCalled;
        synchronized (atomicBoolean) {
            while (!handlerCalled.get()) {
                handlerCalled.wait();
            }
        }
        Assertions.assertThat((AtomicBoolean)exceptionForwarded).isTrue();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testExceptionInCallbackRead() throws Exception {
        final AtomicBoolean handlerCalled = new AtomicBoolean();
        ReadRequest regularRequest = new ReadRequest(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void requestDone(IOException ioex) {
                AtomicBoolean atomicBoolean = handlerCalled;
                synchronized (atomicBoolean) {
                    handlerCalled.set(true);
                    handlerCalled.notifyAll();
                }
            }

            public void read() {
            }
        };
        ReadRequest exceptionThrower = new ReadRequest(){

            public void requestDone(IOException ioex) {
                throw new RuntimeException();
            }

            public void read() {
            }
        };
        RequestQueue rq = this.ioManager.getReadRequestQueue(this.ioManager.createChannel());
        rq.add((Object)exceptionThrower);
        rq.add((Object)regularRequest);
        AtomicBoolean atomicBoolean = handlerCalled;
        synchronized (atomicBoolean) {
            while (!handlerCalled.get()) {
                handlerCalled.wait();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testExceptionInCallbackWrite() throws Exception {
        final AtomicBoolean handlerCalled = new AtomicBoolean();
        WriteRequest regularRequest = new WriteRequest(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            public void requestDone(IOException ioex) {
                AtomicBoolean atomicBoolean = handlerCalled;
                synchronized (atomicBoolean) {
                    handlerCalled.set(true);
                    handlerCalled.notifyAll();
                }
            }

            public void write() {
            }
        };
        WriteRequest exceptionThrower = new WriteRequest(){

            public void requestDone(IOException ioex) {
                throw new RuntimeException();
            }

            public void write() {
            }
        };
        RequestQueue rq = this.ioManager.getWriteRequestQueue(this.ioManager.createChannel());
        rq.add((Object)exceptionThrower);
        rq.add((Object)regularRequest);
        AtomicBoolean atomicBoolean = handlerCalled;
        synchronized (atomicBoolean) {
            while (!handlerCalled.get()) {
                handlerCalled.wait();
            }
        }
    }

    private static final class TestIOException
    extends IOException {
        private static final long serialVersionUID = -814705441998024472L;

        private TestIOException() {
        }
    }
}

