/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.wal.CorruptWalFileException;
import io.confluent.connect.hdfs.wal.WALEntry;
import java.io.Closeable;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.rmi.server.UID;
import java.security.MessageDigest;
import java.util.Arrays;
import org.apache.commons.io.Charsets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.VersionMismatchException;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.serializer.Deserializer;
import org.apache.hadoop.io.serializer.SerializationFactory;
import org.apache.hadoop.io.serializer.Serializer;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Options;
import org.apache.hadoop.util.Time;
import org.apache.kafka.connect.errors.ConnectException;

public class WALFile {
    private static final Log log = LogFactory.getLog(WALFile.class);
    private static final byte INITIAL_VERSION = 0;
    private static final int SYNC_ESCAPE = -1;
    private static final int SYNC_HASH_SIZE = 16;
    private static final int SYNC_SIZE = 20;
    public static final int SYNC_INTERVAL = 2000;
    private static byte[] VERSION = new byte[]{87, 65, 76, 0};
    private static String deserErrorFmt = "Could not find a deserializer for the %s class: '%s'. Please ensure that the configuration '%s' is properly configured, if you're using custom serialization.";

    private WALFile() {
    }

    public static Writer createWriter(HdfsSinkConnectorConfig conf, Writer.Option ... opts) throws IOException {
        return new Writer(conf, opts);
    }

    private static int getBufferSize(Configuration conf) {
        return conf.getInt("io.file.buffer.size", 4096);
    }

    public static class Reader
    implements Closeable {
        private String filename;
        private FileSystem fs;
        private FSDataInputStream in;
        private DataOutputBuffer outBuf = new DataOutputBuffer();
        private byte version;
        private byte[] sync = new byte[16];
        private byte[] syncCheck = new byte[16];
        private boolean syncSeen;
        private long headerEnd;
        private long end;
        private int keyLength;
        private int recordLength;
        private Configuration conf;
        private DataInputBuffer valBuffer = null;
        private DataInputStream valIn = null;
        private Deserializer<WALEntry> keyDeserializer;
        private Deserializer<WALEntry> valDeserializer;

        public Reader(Configuration conf, Option ... opts) throws IOException {
            FileOption fileOpt = (FileOption)Options.getOption(FileOption.class, (Object[])opts);
            InputStreamOption streamOpt = (InputStreamOption)Options.getOption(InputStreamOption.class, (Object[])opts);
            LengthOption lenOpt = (LengthOption)Options.getOption(LengthOption.class, (Object[])opts);
            BufferSizeOption bufOpt = (BufferSizeOption)Options.getOption(BufferSizeOption.class, (Object[])opts);
            if (fileOpt == null == (streamOpt == null)) {
                throw new IllegalArgumentException("File or stream option must be specified");
            }
            if (fileOpt == null && bufOpt != null) {
                throw new IllegalArgumentException("buffer size can only be set when a file is specified.");
            }
            Path filename = null;
            try {
                FSDataInputStream file;
                long len;
                if (fileOpt != null) {
                    filename = fileOpt.getValue();
                    this.fs = FileSystem.newInstance((URI)filename.toUri(), (Configuration)conf);
                    int bufSize = bufOpt == null ? WALFile.getBufferSize(conf) : bufOpt.getValue();
                    len = null == lenOpt ? this.fs.getFileStatus(filename).getLen() : lenOpt.getValue();
                    file = this.openFile(this.fs, filename, bufSize, len);
                } else {
                    len = null == lenOpt ? Long.MAX_VALUE : lenOpt.getValue();
                    file = streamOpt.getValue();
                }
                StartOption startOpt = (StartOption)Options.getOption(StartOption.class, (Object[])opts);
                long start = startOpt == null ? 0L : startOpt.getValue();
                OnlyHeaderOption headerOnly = (OnlyHeaderOption)Options.getOption(OnlyHeaderOption.class, (Object[])opts);
                this.initialize(filename, file, start, len, conf, headerOnly != null);
            }
            catch (RemoteException re) {
                log.error((Object)("Failed creating a WAL Reader: " + re.getMessage()));
                if (this.fs != null) {
                    try {
                        this.fs.close();
                    }
                    catch (Throwable t) {
                        log.error((Object)"Error closing FileSystem", t);
                    }
                }
                throw re;
            }
        }

        public static Option file(Path value) {
            return new FileOption(value);
        }

        public static Option stream(FSDataInputStream value) {
            return new InputStreamOption(value);
        }

        public static Option start(long value) {
            return new StartOption(value);
        }

        public static Option length(long value) {
            return new LengthOption(value);
        }

        public static Option bufferSize(int value) {
            return new BufferSizeOption(value);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        private void initialize(Path filename, FSDataInputStream in, long start, long length, Configuration conf, boolean tempReader) throws IOException {
            if (in == null) {
                throw new IllegalArgumentException("in == null");
            }
            this.filename = filename == null ? "<unknown>" : filename.toString();
            this.in = in;
            this.conf = conf;
            boolean succeeded = false;
            try {
                this.seek(start);
                this.end = this.in.getPos() + length;
                if (this.end < length) {
                    this.end = Long.MAX_VALUE;
                }
                this.init(tempReader);
                return;
            }
            catch (Throwable throwable) {
                if (succeeded) throw throwable;
                IOUtils.cleanup((Log)log, (Closeable[])new Closeable[]{this.in});
                throw throwable;
            }
        }

        protected FSDataInputStream openFile(FileSystem fs, Path file, int bufferSize, long length) throws IOException {
            return fs.open(file, bufferSize);
        }

        private void init(boolean tempReader) throws IOException {
            byte[] versionBlock = new byte[VERSION.length];
            this.in.readFully(versionBlock);
            if (versionBlock[0] != VERSION[0] || versionBlock[1] != VERSION[1] || versionBlock[2] != VERSION[2]) {
                throw new IOException(this + " not a WALFile");
            }
            this.version = versionBlock[3];
            if (this.version > VERSION[3]) {
                throw new VersionMismatchException(VERSION[3], this.version);
            }
            this.in.readFully(this.sync);
            this.headerEnd = this.in.getPos();
            if (!tempReader) {
                this.valBuffer = new DataInputBuffer();
                this.valIn = this.valBuffer;
                SerializationFactory serializationFactory = new SerializationFactory(this.conf);
                this.keyDeserializer = this.getDeserializer(serializationFactory, WALEntry.class);
                if (this.keyDeserializer == null) {
                    String errorMsg = String.format(deserErrorFmt, "Key", WALEntry.class.getCanonicalName(), "io.serializations");
                    throw new IOException(errorMsg);
                }
                this.keyDeserializer.open((InputStream)this.valBuffer);
                this.valDeserializer = this.getDeserializer(serializationFactory, WALEntry.class);
                if (this.valDeserializer == null) {
                    String errorMsg = String.format(deserErrorFmt, "Value", WALEntry.class.getCanonicalName(), "io.serializations");
                    throw new IOException(errorMsg);
                }
                this.valDeserializer.open((InputStream)this.valIn);
            }
        }

        private <T> Deserializer<T> getDeserializer(SerializationFactory sf, Class<T> c) {
            return sf.getDeserializer(c);
        }

        private byte[] getSync() {
            return this.sync;
        }

        @Override
        public synchronized void close() throws IOException {
            try {
                if (this.keyDeserializer != null) {
                    this.keyDeserializer.close();
                }
                if (this.valDeserializer != null) {
                    this.valDeserializer.close();
                }
                this.in.close();
            }
            finally {
                try {
                    this.fs.close();
                }
                catch (Throwable t) {
                    log.error((Object)"Unable to close FileSystem", t);
                }
            }
        }

        private byte getVersion() {
            return this.version;
        }

        Configuration getConf() {
            return this.conf;
        }

        private synchronized void seekToCurrentValue() throws IOException {
            this.valBuffer.reset();
        }

        public synchronized void getCurrentValue(Writable val) throws IOException {
            if (val instanceof Configurable) {
                ((Configurable)val).setConf(this.conf);
            }
            this.seekToCurrentValue();
            val.readFields((DataInput)this.valIn);
            if (this.valIn.read() > 0) {
                log.info((Object)("available bytes: " + this.valIn.available()));
                throw new IOException(val + " read " + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read " + (this.valBuffer.getLength() - this.keyLength));
            }
        }

        public synchronized WALEntry getCurrentValue(WALEntry val) throws IOException {
            if (val instanceof Configurable) {
                ((Configurable)val).setConf(this.conf);
            }
            this.seekToCurrentValue();
            val = this.deserializeValue(val);
            if (this.valIn.read() > 0) {
                log.info((Object)("available bytes: " + this.valIn.available()));
                throw new IOException(val + " read " + (this.valBuffer.getPosition() - this.keyLength) + " bytes, should read " + (this.valBuffer.getLength() - this.keyLength));
            }
            return val;
        }

        private WALEntry deserializeValue(WALEntry val) throws IOException {
            return (WALEntry)this.valDeserializer.deserialize((Object)val);
        }

        private synchronized int readRecordLength() throws IOException {
            if (this.in.getPos() >= this.end) {
                return -1;
            }
            int length = this.in.readInt();
            if (this.sync != null && length == -1) {
                this.in.readFully(this.syncCheck);
                if (!Arrays.equals(this.sync, this.syncCheck)) {
                    throw new CorruptWalFileException("File is corrupt!");
                }
                this.syncSeen = true;
                if (this.in.getPos() >= this.end) {
                    return -1;
                }
                length = this.in.readInt();
            } else {
                this.syncSeen = false;
            }
            return length;
        }

        public synchronized boolean next(Writable key) throws IOException {
            if (key.getClass() != WALEntry.class) {
                throw new IOException("wrong key class: " + key.getClass().getName() + " is not " + WALEntry.class);
            }
            this.outBuf.reset();
            this.keyLength = this.next(this.outBuf);
            if (this.keyLength < 0) {
                return false;
            }
            this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
            key.readFields((DataInput)this.valBuffer);
            this.valBuffer.mark(0);
            if (this.valBuffer.getPosition() != this.keyLength) {
                throw new IOException(key + " read " + this.valBuffer.getPosition() + " bytes, should read " + this.keyLength);
            }
            return true;
        }

        public synchronized boolean next(Writable key, Writable val) throws IOException {
            if (val.getClass() != WALEntry.class) {
                throw new IOException("wrong value class: " + val + " is not " + WALEntry.class);
            }
            boolean more = this.next(key);
            if (more) {
                this.getCurrentValue(val);
            }
            return more;
        }

        synchronized int next(DataOutputBuffer buffer) throws IOException {
            try {
                int length = this.readRecordLength();
                if (length == -1) {
                    return -1;
                }
                int keyLength = this.in.readInt();
                buffer.write((DataInput)this.in, length);
                return keyLength;
            }
            catch (ChecksumException e) {
                this.handleChecksumException(e);
                return this.next(buffer);
            }
        }

        public synchronized WALEntry next(WALEntry key) throws IOException {
            this.outBuf.reset();
            this.keyLength = this.next(this.outBuf);
            if (this.keyLength < 0) {
                return null;
            }
            this.valBuffer.reset(this.outBuf.getData(), this.outBuf.getLength());
            key = this.deserializeKey(key);
            this.valBuffer.mark(0);
            if (this.valBuffer.getPosition() != this.keyLength) {
                throw new IOException(key + " read " + this.valBuffer.getPosition() + " bytes, should read " + this.keyLength);
            }
            return key;
        }

        private WALEntry deserializeKey(WALEntry key) throws IOException {
            return (WALEntry)this.keyDeserializer.deserialize((Object)key);
        }

        private void handleChecksumException(ChecksumException e) throws IOException {
            if (!this.conf.getBoolean("io.skip.checksum.errors", false)) {
                throw e;
            }
            log.warn((Object)("Bad checksum at " + this.getPosition() + ". Skipping entries."));
            this.sync(this.getPosition() + (long)this.conf.getInt("io.bytes.per.checksum", 512));
        }

        synchronized void ignoreSync() {
            this.sync = null;
        }

        public synchronized void seek(long position) throws IOException {
            this.in.seek(position);
        }

        public void seekToFirstRecord() throws IOException {
            this.in.seek(this.headerEnd);
        }

        public synchronized void sync(long position) throws IOException {
            if (position + 20L >= this.end) {
                this.seek(this.end);
                return;
            }
            if (position < this.headerEnd) {
                this.in.seek(this.headerEnd);
                this.syncSeen = true;
                return;
            }
            try {
                this.seek(position + 4L);
                this.in.readFully(this.syncCheck);
                int syncLen = this.sync.length;
                int i = 0;
                while (this.in.getPos() < this.end) {
                    int j;
                    for (j = 0; j < syncLen && this.sync[j] == this.syncCheck[(i + j) % syncLen]; ++j) {
                    }
                    if (j == syncLen) {
                        this.in.seek(this.in.getPos() - 20L);
                        return;
                    }
                    this.syncCheck[i % syncLen] = this.in.readByte();
                    ++i;
                }
            }
            catch (ChecksumException e) {
                this.handleChecksumException(e);
            }
        }

        public synchronized boolean syncSeen() {
            return this.syncSeen;
        }

        public synchronized long getPosition() throws IOException {
            return this.in.getPos();
        }

        public String toString() {
            return this.filename;
        }

        private static class OnlyHeaderOption
        extends Options.BooleanOption
        implements Option {
            private OnlyHeaderOption() {
                super(true);
            }
        }

        private static class BufferSizeOption
        extends Options.IntegerOption
        implements Option {
            private BufferSizeOption(int value) {
                super(value);
            }
        }

        private static class LengthOption
        extends Options.LongOption
        implements Option {
            private LengthOption(long value) {
                super(value);
            }
        }

        private static class StartOption
        extends Options.LongOption
        implements Option {
            private StartOption(long value) {
                super(value);
            }
        }

        private static class InputStreamOption
        extends Options.FSDataInputStreamOption
        implements Option {
            private InputStreamOption(FSDataInputStream value) {
                super(value);
            }
        }

        private static class FileOption
        extends Options.PathOption
        implements Option {
            private FileOption(Path value) {
                super(value);
            }
        }

        public static interface Option {
        }
    }

    public static class Writer
    implements Closeable,
    Syncable {
        protected Serializer<WALEntry> keySerializer;
        protected Serializer<WALEntry> valSerializer;
        boolean ownOutputStream = true;
        long lastSyncPos;
        byte[] sync;
        private FileSystem fs;
        private FSDataOutputStream out;
        private DataOutputBuffer buffer = new DataOutputBuffer();
        private boolean appendMode;

        Writer(HdfsSinkConnectorConfig connectorConfig, Option ... opts) throws IOException {
            try {
                MessageDigest digester = MessageDigest.getInstance("MD5");
                long time = Time.now();
                digester.update((new UID() + "@" + time).getBytes(Charsets.UTF_8));
                this.sync = digester.digest();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
            Configuration conf = connectorConfig.getHadoopConfiguration();
            BlockSizeOption blockSizeOption = (BlockSizeOption)Options.getOption(BlockSizeOption.class, (Object[])opts);
            BufferSizeOption bufferSizeOption = (BufferSizeOption)Options.getOption(BufferSizeOption.class, (Object[])opts);
            ReplicationOption replicationOption = (ReplicationOption)Options.getOption(ReplicationOption.class, (Object[])opts);
            FileOption fileOption = (FileOption)Options.getOption(FileOption.class, (Object[])opts);
            AppendIfExistsOption appendIfExistsOption = (AppendIfExistsOption)Options.getOption(AppendIfExistsOption.class, (Object[])opts);
            StreamOption streamOption = (StreamOption)Options.getOption(StreamOption.class, (Object[])opts);
            if (fileOption == null == (streamOption == null)) {
                throw new IllegalArgumentException("file or stream must be specified");
            }
            if (fileOption == null && (blockSizeOption != null || bufferSizeOption != null || replicationOption != null)) {
                throw new IllegalArgumentException("file modifier options not compatible with stream");
            }
            boolean ownStream = fileOption != null;
            try {
                FSDataOutputStream out;
                if (ownStream) {
                    long blockSize;
                    Path p = fileOption.getValue();
                    this.fs = FileSystem.newInstance((URI)p.toUri(), (Configuration)conf);
                    int bufferSize = bufferSizeOption == null ? WALFile.getBufferSize(conf) : bufferSizeOption.getValue();
                    short replication = replicationOption == null ? this.fs.getDefaultReplication(p) : (short)replicationOption.getValue();
                    long l = blockSize = blockSizeOption == null ? this.fs.getDefaultBlockSize(p) : blockSizeOption.getValue();
                    if (appendIfExistsOption != null && appendIfExistsOption.getValue() && this.fs.exists(p) && this.hasIntactVersionHeader(p, this.fs)) {
                        try (Reader reader = new Reader(connectorConfig.getHadoopConfiguration(), Reader.file(p), new Reader.OnlyHeaderOption());){
                            if (reader.getVersion() != VERSION[3]) {
                                throw new VersionMismatchException(VERSION[3], reader.getVersion());
                            }
                            this.sync = reader.getSync();
                        }
                        out = this.fs.append(p, bufferSize);
                        this.appendMode = true;
                    } else {
                        out = this.fs.create(p, true, bufferSize, replication, blockSize);
                    }
                } else {
                    out = streamOption.getValue();
                }
                this.init(connectorConfig, out, ownStream);
            }
            catch (Exception re) {
                log.warn((Object)("Failed creating a WAL Writer: " + re.getMessage()));
                if (this.fs != null) {
                    try {
                        this.fs.close();
                    }
                    catch (Throwable t) {
                        log.error((Object)"Could not close filesystem", t);
                    }
                }
                throw re;
            }
        }

        private boolean hasIntactVersionHeader(Path p, FileSystem fs) throws IOException {
            boolean result;
            FileStatus[] statuses = fs.listStatus(p);
            if (statuses.length != 1) {
                throw new ConnectException("Expected exactly one log for WAL file " + p);
            }
            boolean bl = result = statuses[0].getLen() >= (long)VERSION.length;
            if (!result) {
                log.warn((Object)("Failed to read version header from WAL file " + p));
            }
            return result;
        }

        public static Option file(Path value) {
            return new FileOption(value);
        }

        public static Option bufferSize(int value) {
            return new BufferSizeOption(value);
        }

        public static Option stream(FSDataOutputStream value) {
            return new StreamOption(value);
        }

        public static Option replication(short value) {
            return new ReplicationOption(value);
        }

        public static Option appendIfExists(boolean value) {
            return new AppendIfExistsOption(value);
        }

        public static Option blockSize(long value) {
            return new BlockSizeOption(value);
        }

        void init(HdfsSinkConnectorConfig connectorConfig, FSDataOutputStream out, boolean ownStream) throws IOException {
            Configuration conf = connectorConfig.getHadoopConfiguration();
            this.out = out;
            this.ownOutputStream = ownStream;
            SerializationFactory serializationFactory = new SerializationFactory(conf);
            this.keySerializer = serializationFactory.getSerializer(WALEntry.class);
            if (this.keySerializer == null) {
                String errorMsg = String.format(deserErrorFmt, "Key", WALEntry.class.getCanonicalName(), "io.serializations");
                throw new IOException(errorMsg);
            }
            this.keySerializer.open((OutputStream)this.buffer);
            this.valSerializer = serializationFactory.getSerializer(WALEntry.class);
            if (this.valSerializer == null) {
                String errorMsg = String.format(deserErrorFmt, "Value", WALEntry.class.getCanonicalName(), "io.serializations");
                throw new IOException(errorMsg);
            }
            this.valSerializer.open((OutputStream)this.buffer);
            if (this.appendMode) {
                this.sync();
            } else {
                this.writeFileHeader();
            }
        }

        public synchronized void append(WALEntry key, WALEntry val) throws IOException {
            this.buffer.reset();
            this.keySerializer.serialize((Object)key);
            int keyLength = this.buffer.getLength();
            if (keyLength < 0) {
                throw new IOException("negative length keys not allowed: " + key);
            }
            this.valSerializer.serialize((Object)val);
            this.checkAndWriteSync();
            this.out.writeInt(this.buffer.getLength());
            this.out.writeInt(keyLength);
            this.out.write(this.buffer.getData(), 0, this.buffer.getLength());
        }

        public synchronized long getLength() throws IOException {
            return this.out.getPos();
        }

        private synchronized void checkAndWriteSync() throws IOException {
            if (this.sync != null && this.out.getPos() >= this.lastSyncPos + 2000L) {
                this.sync();
            }
        }

        private void writeFileHeader() throws IOException {
            this.out.write(VERSION);
            this.out.write(this.sync);
            this.out.flush();
        }

        @Override
        public synchronized void close() throws IOException {
            try {
                this.keySerializer.close();
                this.valSerializer.close();
                if (this.out != null) {
                    if (this.ownOutputStream) {
                        this.out.close();
                    } else {
                        this.out.flush();
                    }
                }
            }
            finally {
                if (this.fs != null) {
                    try {
                        this.fs.close();
                    }
                    catch (Throwable t) {
                        log.error((Object)"Could not close FileSystem", t);
                    }
                }
                this.out = null;
            }
        }

        public void sync() throws IOException {
            if (this.sync != null && this.lastSyncPos != this.out.getPos()) {
                this.out.writeInt(-1);
                this.out.write(this.sync);
                this.lastSyncPos = this.out.getPos();
            }
        }

        public void hflush() throws IOException {
            if (this.out != null) {
                this.out.hflush();
            }
        }

        public void hsync() throws IOException {
            if (this.out != null) {
                this.out.hsync();
            }
        }

        static class AppendIfExistsOption
        extends Options.BooleanOption
        implements Option {
            AppendIfExistsOption(boolean value) {
                super(value);
            }
        }

        static class ReplicationOption
        extends Options.IntegerOption
        implements Option {
            ReplicationOption(int value) {
                super(value);
            }
        }

        static class BlockSizeOption
        extends Options.LongOption
        implements Option {
            BlockSizeOption(long value) {
                super(value);
            }
        }

        static class BufferSizeOption
        extends Options.IntegerOption
        implements Option {
            BufferSizeOption(int value) {
                super(value);
            }
        }

        static class StreamOption
        extends Options.FSDataOutputStreamOption
        implements Option {
            StreamOption(FSDataOutputStream stream) {
                super(stream);
            }
        }

        static class FileOption
        extends Options.PathOption
        implements Option {
            FileOption(Path path) {
                super(path);
            }
        }

        public static interface Option {
        }
    }
}

