/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hbase.io.asyncfs;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput;
import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Throwables;
import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.io.netty.channel.Channel;
import org.apache.hadoop.hbase.shaded.io.netty.channel.EventLoop;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * Static factory for {@link AsyncFSOutput} instances.
 *
 * <p>For a {@link DistributedFileSystem} the work is delegated to
 * {@link FanOutOneBlockAsyncDFSOutputHelper}. For every other file system a simple wrapper is
 * returned that buffers writes in memory and pushes them to a regular {@link FSDataOutputStream}
 * from a dedicated single-thread executor.
 */
@InterfaceAudience.Private
public final class AsyncFSOutputHelper {
    private AsyncFSOutputHelper() {
    }

    /**
     * Creates an {@link AsyncFSOutput} for the given path.
     *
     * @param fs file system to create the file on
     * @param f path of the file to create
     * @param overwrite passed through to the underlying create call
     * @param createParent if {@code true} the file is created with {@code FileSystem.create},
     *        otherwise with {@code FileSystem.createNonRecursive}
     * @param replication passed through to the underlying create call
     * @param blockSize passed through to the underlying create call
     * @param eventLoop event loop on which writes are buffered and on which flush futures are
     *        completed
     * @param channelClass only forwarded to {@link FanOutOneBlockAsyncDFSOutputHelper}; unused for
     *        non-DFS file systems
     * @return the created output
     * @throws IOException if the underlying file cannot be created
     */
    public static AsyncFSOutput createOutput(FileSystem fs, Path f, boolean overwrite, boolean createParent, short replication, long blockSize, final EventLoop eventLoop, Class<? extends Channel> channelClass) throws IOException {
        if (fs instanceof DistributedFileSystem) {
            return FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem)fs, f, overwrite, createParent, replication, blockSize, eventLoop, channelClass);
        }
        int bufferSize = fs.getConf().getInt("io.file.buffer.size", 4096);
        final FSDataOutputStream fsOut = createParent ? fs.create(f, overwrite, bufferSize, replication, blockSize, null) : fs.createNonRecursive(f, overwrite, bufferSize, replication, blockSize, null);
        // The name is handed to ThreadFactoryBuilder as a format string, so a literal '%' in the
        // path has to be escaped as "%%".
        final ExecutorService flushExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AsyncFSOutputFlusher-" + f.toString().replace("%", "%%")).build());
        return new AsyncFSOutput(){
            /** In-memory buffer holding data written since the last drain. */
            private final ByteArrayOutputStream out = new ByteArrayOutputStream();

            /**
             * Appends to the in-memory buffer. When called from outside the event loop the append
             * is submitted to the event loop and this call blocks until it has run.
             */
            @Override
            public void write(byte[] b, int off, int len) {
                if (eventLoop.inEventLoop()) {
                    this.out.write(b, off, len);
                } else {
                    eventLoop.submit(() -> this.out.write(b, off, len)).syncUninterruptibly();
                }
            }

            @Override
            public void write(byte[] b) {
                this.write(b, 0, b.length);
            }

            /**
             * Stops the flush executor and closes the underlying stream. Data still sitting in the
             * in-memory buffer is not written out by this method.
             */
            @Override
            public void recoverAndClose(CancelableProgressable reporter) throws IOException {
                // Without this the flusher thread would outlive every output that is closed
                // through this path instead of close().
                flushExecutor.shutdown();
                fsOut.close();
            }

            /** Always returns an empty array: this wrapper exposes no datanode pipeline. */
            @Override
            public DatanodeInfo[] getPipeline() {
                return new DatanodeInfo[0];
            }

            /**
             * Writes everything currently buffered to the underlying stream and empties the
             * buffer, holding the buffer's monitor for the duration.
             *
             * NOTE(review): the write paths above do not take this monitor themselves; whether
             * they are otherwise safe against a concurrent drain depends on
             * ByteArrayOutputStream, which is not visible here - confirm.
             */
            private void drainBuffer() throws IOException {
                synchronized (this.out) {
                    fsOut.write(this.out.getBuffer(), 0, this.out.size());
                    this.out.reset();
                }
            }

            /**
             * Runs on the flush executor: drains the buffer, then hsync/hflush the stream. The
             * future is always completed from the event loop, either with the stream position
             * after the flush or with the IOException that occurred.
             */
            private void flush0(CompletableFuture<Long> future, boolean sync) {
                try {
                    this.drainBuffer();
                    if (sync) {
                        fsOut.hsync();
                    } else {
                        fsOut.hflush();
                    }
                    long pos = fsOut.getPos();
                    eventLoop.execute(() -> future.complete(pos));
                }
                catch (IOException e) {
                    eventLoop.execute(() -> future.completeExceptionally(e));
                }
            }

            /**
             * Schedules a flush on the flush executor.
             *
             * @param sync {@code true} to hsync the underlying stream, {@code false} to hflush it
             * @return future completed with the stream position after the flush
             */
            @Override
            public CompletableFuture<Long> flush(boolean sync) {
                CompletableFuture<Long> future = new CompletableFuture<Long>();
                flushExecutor.execute(() -> this.flush0(future, sync));
                return future;
            }

            /**
             * Drains the remaining buffered data on the flush executor, shuts the executor down
             * and closes the underlying stream.
             *
             * @throws InterruptedIOException if the calling thread is interrupted while waiting
             *         for the final drain; the interrupt status is restored
             * @throws IOException if the final drain or closing the stream fails
             */
            @Override
            public void close() throws IOException {
                try {
                    flushExecutor.submit(() -> {
                        this.drainBuffer();
                        return null;
                    }).get();
                }
                catch (InterruptedException e) {
                    // Keep the interrupt visible to the caller and keep the original cause.
                    Thread.currentThread().interrupt();
                    InterruptedIOException interrupted = new InterruptedIOException("Interrupted while flushing buffered data to " + f);
                    interrupted.initCause(e);
                    // The drain task may still be running and writing to fsOut, so the stream is
                    // intentionally not closed here; callers can use recoverAndClose.
                    throw interrupted;
                }
                catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    // The drain task has finished, so nothing else is using fsOut: release it
                    // instead of leaking it, but report the drain failure as the primary error.
                    try {
                        fsOut.close();
                    }
                    catch (IOException | RuntimeException closeFailure) {
                        cause.addSuppressed(closeFailure);
                    }
                    Throwables.propagateIfPossible(cause, IOException.class);
                    throw new IOException(cause);
                }
                finally {
                    flushExecutor.shutdown();
                }
                fsOut.close();
            }

            /** Returns the number of bytes currently held in the in-memory buffer. */
            @Override
            public int buffered() {
                return this.out.size();
            }

            // NOTE(review): unlike write(byte[], int, int), the two methods below touch the buffer
            // directly without hopping to the event loop - presumably callers already run on it;
            // confirm.
            @Override
            public void writeInt(int i) {
                this.out.writeInt(i);
            }

            @Override
            public void write(ByteBuffer bb) {
                this.out.write(bb, bb.position(), bb.remaining());
            }
        };
    }
}

