/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl.spill;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashAggregate;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.config.Sort;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.shaded.guava.com.google.common.base.Joiner;
import org.apache.drill.shaded.guava.com.google.common.collect.Iterators;
import org.apache.drill.shaded.guava.com.google.common.collect.Sets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SpillSet {
    private static final Logger logger = LoggerFactory.getLogger(SpillSet.class);
    private final Iterator<String> dirs;
    private Set<String> currSpillDirs = Sets.newTreeSet();
    private final String spillDirName;
    private int fileCount = 0;
    private FileManager fileManager;
    private long readBytes;
    private long writeBytes;

    public SpillSet(FragmentContext context, PhysicalOperator popConfig) {
        this(context.getConfig(), context.getHandle(), popConfig);
    }

    public SpillSet(DrillConfig config, ExecProtos.FragmentHandle handle, PhysicalOperator popConfig) {
        List dirList;
        String spillFs;
        String operName;
        if (popConfig instanceof Sort) {
            operName = "Sort";
            spillFs = config.getString("drill.exec.sort.external.spill.fs");
            dirList = config.getStringList("drill.exec.sort.external.spill.directories");
        } else if (popConfig instanceof HashAggregate) {
            operName = "HashAgg";
            spillFs = config.getString("drill.exec.hashagg.spill.fs");
            dirList = config.getStringList("drill.exec.hashagg.spill.directories");
        } else if (popConfig instanceof HashJoinPOP) {
            operName = "HashJoin";
            spillFs = config.getString("drill.exec.hashjoin.spill.fs");
            dirList = config.getStringList("drill.exec.hashjoin.spill.directories");
        } else {
            operName = "Unknown";
            spillFs = config.getString("drill.exec.spill.fs");
            dirList = config.getStringList("drill.exec.spill.directories");
        }
        this.dirs = Iterators.cycle(dirList);
        if (dirList.size() > 1) {
            int hash = handle.getQueryId().hashCode() + handle.getMajorFragmentId() + handle.getMinorFragmentId() + popConfig.getOperatorId();
            int offset = hash % dirList.size();
            for (int i = 0; i < offset; ++i) {
                this.dirs.next();
            }
        }
        boolean impersonationEnabled = config.getBoolean("drill.exec.impersonation.enabled");
        this.fileManager = spillFs.startsWith("file:///") && !impersonationEnabled ? new LocalFileManager(spillFs) : new HadoopFileManager(spillFs);
        this.spillDirName = String.format("%s_%s_%s-%s-%s", QueryIdHelper.getQueryId(handle.getQueryId()), operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
    }

    public String getNextSpillFile() {
        return this.getNextSpillFile(null);
    }

    public String getNextSpillFile(String extraName) {
        String spillDir = this.dirs.next();
        String currSpillPath = Joiner.on("/").join(spillDir, this.spillDirName, new Object[0]);
        this.currSpillDirs.add(currSpillPath);
        String outputFile = Joiner.on("/").join(currSpillPath, "spill" + ++this.fileCount, new Object[0]);
        if (extraName != null) {
            outputFile = outputFile + "_" + extraName;
        }
        try {
            this.fileManager.deleteOnExit(currSpillPath);
        }
        catch (IOException e) {
            logger.warn("Unable to mark spill directory " + currSpillPath + " for deleting on exit", (Throwable)e);
        }
        return outputFile;
    }

    public boolean hasSpilled() {
        return this.fileCount > 0;
    }

    public int getFileCount() {
        return this.fileCount;
    }

    public InputStream openForInput(String fileName) throws IOException {
        return this.fileManager.openForInput(fileName);
    }

    public WritableByteChannel openForOutput(String fileName) throws IOException {
        return this.fileManager.createForWrite(fileName);
    }

    public void delete(String fileName) throws IOException {
        this.fileManager.deleteFile(fileName);
    }

    public long getWriteBytes() {
        return this.writeBytes;
    }

    public long getReadBytes() {
        return this.readBytes;
    }

    public void close() {
        for (String path : this.currSpillDirs) {
            try {
                this.fileManager.deleteDir(path);
            }
            catch (IOException e) {
                logger.warn("Unable to delete spill directory " + path, (Throwable)e);
            }
            this.currSpillDirs.clear();
        }
    }

    public long getPosition(InputStream inputStream) {
        return this.fileManager.getReadBytes(inputStream);
    }

    public long getPosition(WritableByteChannel channel) {
        return this.fileManager.getWriteBytes(channel);
    }

    public void tallyReadBytes(long readLength) {
        this.readBytes += readLength;
    }

    public void tallyWriteBytes(long writeLength) {
        this.writeBytes += writeLength;
    }

    public VectorSerializer.Writer writer(String fileName) throws IOException {
        return VectorSerializer.writer(this.openForOutput(fileName));
    }

    public void close(VectorSerializer.Writer writer) throws IOException {
        this.tallyWriteBytes(writer.getBytesWritten());
        writer.close();
    }

    private static class LocalFileManager
    implements FileManager {
        private File baseDir;

        public LocalFileManager(String fsName) {
            this.baseDir = new File(fsName.replace("file:///", ""));
        }

        @Override
        public void deleteOnExit(String fragmentSpillDir) throws IOException {
            File dir = new File(this.baseDir, fragmentSpillDir);
            dir.mkdirs();
            dir.deleteOnExit();
        }

        @Override
        public WritableByteChannel createForWrite(String fileName) throws IOException {
            return FileChannel.open(new File(this.baseDir, fileName).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        }

        @Override
        public InputStream openForInput(String fileName) throws IOException {
            return new CountingInputStream(new BufferedInputStream(new FileInputStream(new File(this.baseDir, fileName))));
        }

        @Override
        public void deleteFile(String fileName) throws IOException {
            new File(this.baseDir, fileName).delete();
        }

        @Override
        public void deleteDir(String fragmentSpillDir) throws IOException {
            File spillDir = new File(this.baseDir, fragmentSpillDir);
            for (File spillFile : spillDir.listFiles()) {
                spillFile.delete();
            }
            spillDir.delete();
        }

        @Override
        public long getWriteBytes(WritableByteChannel channel) {
            try {
                return ((FileChannel)channel).position();
            }
            catch (Exception e) {
                return 0L;
            }
        }

        @Override
        public long getReadBytes(InputStream inputStream) {
            return ((CountingInputStream)inputStream).getCount();
        }
    }

    private static interface FileManager {
        public void deleteOnExit(String var1) throws IOException;

        public WritableByteChannel createForWrite(String var1) throws IOException;

        public InputStream openForInput(String var1) throws IOException;

        public void deleteFile(String var1) throws IOException;

        public void deleteDir(String var1) throws IOException;

        public long getWriteBytes(WritableByteChannel var1);

        public long getReadBytes(InputStream var1);
    }

    private static class HadoopFileManager
    implements FileManager {
        private static final int TRANSFER_SIZE = 73728;
        private final byte[] buffer = new byte[73728];
        private FileSystem fs;

        protected HadoopFileManager(String fsName) {
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", fsName);
            try {
                this.fs = FileSystem.get((Configuration)conf);
            }
            catch (IOException e) {
                throw UserException.resourceError(e).message("Failed to get the File System for external sort", new Object[0]).build(logger);
            }
        }

        @Override
        public void deleteOnExit(String fragmentSpillDir) throws IOException {
            this.fs.deleteOnExit(new Path(fragmentSpillDir));
        }

        @Override
        public WritableByteChannel createForWrite(String fileName) throws IOException {
            return new WritableByteChannelImpl(this.buffer, (OutputStream)this.fs.create(new Path(fileName)));
        }

        @Override
        public InputStream openForInput(String fileName) throws IOException {
            return this.fs.open(new Path(fileName));
        }

        @Override
        public void deleteFile(String fileName) throws IOException {
            Path path = new Path(fileName);
            if (this.fs.exists(path)) {
                this.fs.delete(path, false);
            }
        }

        @Override
        public void deleteDir(String fragmentSpillDir) throws IOException {
            Path path = new Path(fragmentSpillDir);
            if (path != null && this.fs.exists(path) && this.fs.delete(path, true)) {
                this.fs.cancelDeleteOnExit(path);
            }
        }

        @Override
        public long getWriteBytes(WritableByteChannel channel) {
            try {
                return ((FSDataOutputStream)((WritableByteChannelImpl)channel).out).getPos();
            }
            catch (Exception e) {
                return 0L;
            }
        }

        @Override
        public long getReadBytes(InputStream inputStream) {
            try {
                return ((FSDataInputStream)inputStream).getPos();
            }
            catch (IOException e) {
                return 0L;
            }
        }
    }

    private static class WritableByteChannelImpl
    implements WritableByteChannel {
        private final byte[] buffer;
        private OutputStream out;

        WritableByteChannelImpl(byte[] buffer, OutputStream out) {
            this.buffer = buffer;
            this.out = out;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public int write(ByteBuffer src) throws IOException {
            int remaining = src.remaining();
            int totalWritten = 0;
            byte[] byArray = this.buffer;
            synchronized (this.buffer) {
                for (int posn = 0; posn < remaining; posn += this.buffer.length) {
                    int len = Math.min(this.buffer.length, remaining - posn);
                    src.get(this.buffer, 0, len);
                    this.out.write(this.buffer, 0, len);
                    totalWritten += len;
                }
                // ** MonitorExit[var4_4] (shouldn't be in output)
                return totalWritten;
            }
        }

        @Override
        public boolean isOpen() {
            return this.out != null;
        }

        @Override
        public void close() throws IOException {
            this.out.close();
            this.out = null;
        }
    }

    public static class CountingOutputStream
    extends OutputStream {
        private OutputStream out;
        private long count;

        public CountingOutputStream(OutputStream out) {
            this.out = out;
        }

        @Override
        public void write(int b) throws IOException {
            ++this.count;
            this.out.write(b);
        }

        @Override
        public void write(byte[] b) throws IOException {
            this.count += (long)b.length;
            this.out.write(b);
        }

        @Override
        public void write(byte[] b, int off, int len) throws IOException {
            this.count += (long)len;
            this.out.write(b, off, len);
        }

        @Override
        public void flush() throws IOException {
            this.out.flush();
        }

        @Override
        public void close() throws IOException {
            this.out.close();
        }

        public long getCount() {
            return this.count;
        }
    }

    public static class CountingInputStream
    extends InputStream {
        private InputStream in;
        private long count;

        public CountingInputStream(InputStream in) {
            this.in = in;
        }

        @Override
        public int read() throws IOException {
            int b = this.in.read();
            if (b != -1) {
                ++this.count;
            }
            return b;
        }

        @Override
        public int read(byte[] b) throws IOException {
            int n = this.in.read(b);
            if (n != -1) {
                this.count += (long)n;
            }
            return n;
        }

        @Override
        public int read(byte[] b, int off, int len) throws IOException {
            int n = this.in.read(b, off, len);
            if (n != -1) {
                this.count += (long)n;
            }
            return n;
        }

        @Override
        public long skip(long n) throws IOException {
            return this.in.skip(n);
        }

        @Override
        public void close() throws IOException {
            this.in.close();
        }

        public long getCount() {
            return this.count;
        }
    }
}

