/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs.wal;

import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.wal.CorruptWalFileException;
import io.confluent.connect.hdfs.wal.WAL;
import io.confluent.connect.hdfs.wal.WALEntry;
import io.confluent.connect.hdfs.wal.WALFile;
import io.confluent.connect.storage.wal.FilePathOffset;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.CannotObtainBlockLengthException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link WAL} implementation that keeps its log in a {@link WALFile} accessed through an
 * {@link HdfsStorage}.
 *
 * <p>Each appended record maps a temporary file name (key) to a committed file name (value).
 * Records whose key is {@code BEGIN} or {@code END} delimit a batch; {@link #apply()} commits a
 * batch to storage only once its {@code END} record has been read.
 *
 * <p>The class holds at most one open writer and one open reader at a time; both are released by
 * {@link #close()}. No synchronization is performed by this class.
 */
public class FSWAL implements WAL {
    private static final Logger log = LoggerFactory.getLogger(FSWAL.class);
    /** Suffix appended to the log file name when the log is rotated by {@link #truncate()}. */
    private static final String TRUNCATED_LOG_EXTENSION = ".1";
    /** Key of the record that opens a batch of WAL entries. */
    private static final String BEGIN_MARKER = "BEGIN";
    /** Key of the record that closes a batch of WAL entries. */
    private static final String END_MARKER = "END";
    /** Remote exception class name reported when another client is still creating/holding the file. */
    private static final String ALREADY_BEING_CREATED_EXCEPTION =
            "org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException";
    /** First back-off interval used when the lease cannot be acquired. */
    private static final long INITIAL_SLEEP_INTERVAL_MS = 1000L;
    /** Back-off bound: retries stop once the (doubling) interval reaches this value. */
    private static final long MAX_SLEEP_INTERVAL_MS = 16000L;

    private final HdfsSinkConnectorConfig conf;
    private final HdfsStorage storage;
    private final String logFile;
    protected WALFile.Writer writer = null;
    private WALFile.Reader reader = null;

    /**
     * Creates a WAL for one topic partition. No file is opened here; the writer and reader are
     * created lazily by the methods that need them.
     *
     * @param logsDir   directory name passed to {@code FileUtils.logFileName}
     * @param topicPart topic partition this WAL belongs to
     * @param storage   storage used for existence checks, commits and deletes; also supplies the
     *                  connector configuration and the storage URL
     */
    public FSWAL(String logsDir, TopicPartition topicPart, HdfsStorage storage) throws ConnectException {
        this.storage = storage;
        this.conf = storage.conf();
        this.logFile = FileUtils.logFileName(storage.url(), logsDir, topicPart);
    }

    /**
     * Appends one {@code tempFile -> committedFile} record to the log and syncs it.
     *
     * @param tempFile      name stored as the record key
     * @param committedFile name stored as the record value
     * @throws DataException    if writing or syncing fails; the WAL is closed before rethrowing
     * @throws ConnectException if the lease cannot be acquired (see {@link #acquireLease()})
     */
    public void append(String tempFile, String committedFile) throws ConnectException {
        try {
            this.acquireLease();
            this.writer.append(new WALEntry(tempFile), new WALEntry(committedFile));
            this.writer.hsync();
        }
        catch (IOException e) {
            log.error("Error appending WAL file: {}, {}", this.logFile, e);
            this.close();
            throw new DataException(e);
        }
    }

    /**
     * Ensures a writer on the log file is open, creating one (append mode) if necessary.
     *
     * <p>If creation fails because the file is already being created by someone else, the call
     * sleeps and retries with a doubling back-off (1s, 2s, 4s, 8s) and gives up afterwards.
     *
     * @throws ConnectException if the back-off is exhausted, the thread is interrupted while
     *                          waiting, or a different remote error is reported
     * @throws DataException    if the writer cannot be created because of a local I/O error
     */
    public void acquireLease() throws ConnectException {
        log.debug("Attempting to acquire lease for WAL file: {}", this.logFile);
        long sleepIntervalMs = INITIAL_SLEEP_INTERVAL_MS;
        while (sleepIntervalMs < MAX_SLEEP_INTERVAL_MS) {
            try {
                if (this.writer == null) {
                    this.writer = WALFile.createWriter(
                            this.conf,
                            WALFile.Writer.file(new Path(this.logFile)),
                            WALFile.Writer.appendIfExists(true));
                    log.debug("Successfully acquired lease, {}-{}, file {}",
                            this.conf.name(), this.conf.getTaskId(), this.logFile);
                }
                // Either a writer already existed or one was just created.
                return;
            }
            catch (RemoteException e) {
                // Constant-first comparison: the remote class name may be absent.
                if (!ALREADY_BEING_CREATED_EXCEPTION.equals(e.getClassName())) {
                    throw new ConnectException(e);
                }
                log.warn("Cannot acquire lease on WAL, {}-{}, file {}",
                        this.conf.name(), this.conf.getTaskId(), this.logFile);
                try {
                    Thread.sleep(sleepIntervalMs);
                }
                catch (InterruptedException ie) {
                    // Restore the interrupt flag so callers further up can observe the interruption.
                    Thread.currentThread().interrupt();
                    throw new ConnectException(ie);
                }
            }
            catch (IOException e) {
                throw new DataException(
                        String.format("Error creating writer for log file, %s-%s, file %s",
                                this.conf.name(), this.conf.getTaskId(), this.logFile),
                        e);
            }
            sleepIntervalMs *= 2L;
        }
        throw new ConnectException("Cannot acquire lease after timeout, will retry.");
    }

    /**
     * Replays the log: every batch terminated by an {@code END} record is committed to storage.
     * Does nothing if the log file does not exist.
     *
     * <p>A corrupt log, or one whose block length cannot be obtained, is logged and skipped (the
     * WAL is closed and the method returns normally). Any other I/O error closes the WAL and is
     * rethrown.
     *
     * @throws DataException    on an I/O error other than the two tolerated cases above
     * @throws ConnectException if the lease cannot be acquired
     */
    public void apply() throws ConnectException {
        log.debug("Starting to apply WAL: {}", this.logFile);
        if (!this.storage.exists(this.logFile)) {
            log.debug("WAL file does not exist: {}", this.logFile);
            return;
        }
        this.acquireLease();
        log.debug("Lease acquired");
        try {
            if (this.reader == null) {
                this.reader = this.newWalFileReader(this.logFile);
            }
            this.commitWalEntriesToStorage();
        }
        catch (CorruptWalFileException e) {
            log.error("Error applying WAL file '{}' because it is corrupted: {}", this.logFile, e);
            log.warn("Truncating and skipping corrupt WAL file '{}'.", this.logFile);
            this.close();
        }
        catch (CannotObtainBlockLengthException e) {
            log.error("Error applying WAL file '{}' because the task cannot obtain the block length from HDFS: {}", this.logFile, e);
            log.warn("Truncating and skipping the WAL file '{}'.", this.logFile);
            this.close();
        }
        catch (IOException e) {
            log.error("Error applying WAL file: {}, {}", this.logFile, e);
            this.close();
            throw new DataException(e);
        }
        log.debug("Finished applying WAL: {}", this.logFile);
    }

    /**
     * Reads the remaining records from the current reader and commits each completed batch.
     * {@code BEGIN} discards whatever was collected so far; {@code END} commits the collected
     * entries. Entries collected after the last {@code END} are never committed.
     */
    private void commitWalEntriesToStorage() throws IOException {
        Map<WALEntry, WALEntry> entries = new HashMap<>();
        WALEntry key = new WALEntry();
        WALEntry value = new WALEntry();
        while (this.reader.next(key, value)) {
            String keyName = key.getName();
            if (BEGIN_MARKER.equals(keyName)) {
                entries.clear();
            } else if (END_MARKER.equals(keyName)) {
                this.commitEntriesToStorage(entries);
            } else {
                // key/value are passed to every next() call, so store fresh copies
                // (presumably the reader refills the same instances).
                entries.put(new WALEntry(keyName), new WALEntry(value.getName()));
            }
        }
    }

    /**
     * Commits every {@code tempFile -> committedFile} pair whose committed file does not exist yet.
     */
    private void commitEntriesToStorage(Map<WALEntry, WALEntry> entries) {
        for (Map.Entry<WALEntry, WALEntry> entry : entries.entrySet()) {
            String tempFile = entry.getKey().getName();
            String committedFile = entry.getValue().getName();
            if (!this.storage.exists(committedFile)) {
                this.storage.commit(tempFile, committedFile);
            }
        }
    }

    /**
     * Finds the highest offset among the committed files of the last complete batch in the log.
     * If the current log file yields nothing, the rotated log file (suffix {@code .1}) is tried.
     *
     * @return the latest offset together with its file path, or {@code null} if none could be
     *         determined or an I/O error occurred (the error is logged, not thrown)
     */
    public FilePathOffset extractLatestOffset() {
        String oldWALFile = this.logFile + TRUNCATED_LOG_EXTENSION;
        try {
            FilePathOffset latestOffset = null;
            if (this.storage.exists(this.logFile)) {
                log.trace("Restoring offset from WAL file: {}", this.logFile);
                if (this.reader == null) {
                    this.reader = this.newWalFileReader(this.logFile);
                } else {
                    // Reuse the open reader, but start again from the first record.
                    this.reader.seekToFirstRecord();
                }
                latestOffset = this.getLatestOffsetFromList(this.getLastFilledBlockFromWAL(this.reader));
            }
            if (latestOffset == null && this.storage.exists(oldWALFile)) {
                log.trace("Could not find offset in log file {}. Using {} instead", this.logFile, oldWALFile);
                try (WALFile.Reader oldFileReader = this.newWalFileReader(oldWALFile)) {
                    latestOffset = this.getLatestOffsetFromList(this.getLastFilledBlockFromWAL(oldFileReader));
                }
            }
            return latestOffset;
        }
        catch (IOException e) {
            log.warn("Error restoring offsets from either {} or {} WAL files: {}", this.logFile, oldWALFile, e.getMessage());
            return null;
        }
    }

    /**
     * Scans the reader to its end and returns the committed file names of the last batch that has
     * both a {@code BEGIN} and an {@code END} record and contains at least one entry.
     *
     * @return the file names of that batch, or an empty list if there is no such batch
     */
    private List<String> getLastFilledBlockFromWAL(WALFile.Reader reader) throws IOException {
        List<String> committedFilenames = Collections.emptyList();
        List<String> currentBlock = new ArrayList<>();
        WALEntry key = new WALEntry();
        WALEntry value = new WALEntry();
        boolean entryBlockStarted = false;
        while (reader.next(key, value)) {
            String keyName = key.getName();
            if (BEGIN_MARKER.equals(keyName)) {
                currentBlock.clear();
                entryBlockStarted = true;
            } else if (END_MARKER.equals(keyName)) {
                if (entryBlockStarted && !currentBlock.isEmpty()) {
                    committedFilenames = new ArrayList<>(currentBlock);
                }
                currentBlock.clear();
                entryBlockStarted = false;
            } else if (entryBlockStarted) {
                // Entries outside a BEGIN..END block are ignored.
                currentBlock.add(value.getName());
            }
        }
        if (entryBlockStarted && !currentBlock.isEmpty()) {
            log.warn("The last file block in the WAL is missing an END token");
        }
        return committedFilenames;
    }

    /**
     * Returns the entry with the highest offset among the given file names.
     *
     * @return the highest-offset entry, or {@code null} if the list is empty or no file name
     *         yields a non-negative offset
     */
    private FilePathOffset getLatestOffsetFromList(List<String> committedFileNames) {
        FilePathOffset latestOffsetEntry = null;
        long latestOffset = -1L;
        for (String fileName : committedFileNames) {
            long currentOffset = FSWAL.extractOffsetsFromFilePath(fileName);
            if (currentOffset > latestOffset) {
                latestOffset = currentOffset;
                latestOffsetEntry = new FilePathOffset(latestOffset, fileName);
            }
        }
        return latestOffsetEntry;
    }

    /**
     * Extracts the offset encoded in the last path element of {@code fullPath} using
     * {@code FileUtils.extractOffset}.
     *
     * @param fullPath full path of a committed file; may be {@code null}
     * @return the extracted offset, or {@code -1} if the path is {@code null}, has no file name
     *         element, or the offset cannot be extracted
     */
    static long extractOffsetsFromFilePath(String fullPath) {
        if (fullPath == null) {
            return -1L;
        }
        try {
            // Fully qualified: the simple name Path is bound to org.apache.hadoop.fs.Path in this file.
            java.nio.file.Path fileName = Paths.get(fullPath).getFileName();
            if (fileName == null) {
                // getFileName() is null for paths without name elements (e.g. a root path).
                log.warn("Could not extract offsets from file path {}: {}", fullPath, "path has no file name");
                return -1L;
            }
            return FileUtils.extractOffset(fileName.toString());
        }
        catch (IllegalArgumentException e) {
            log.warn("Could not extract offsets from file path {}: {}", fullPath, e.getMessage());
            return -1L;
        }
    }

    /** Opens a new reader on the given log file using the connector's Hadoop configuration. */
    private WALFile.Reader newWalFileReader(String logFile) throws IOException {
        return new WALFile.Reader(this.conf.getHadoopConfiguration(), WALFile.Reader.file(new Path(logFile)));
    }

    /**
     * Rotates the log: if the log file exists, any previous rotated file is deleted and the
     * current log is committed under the rotated name (suffix {@code .1}). The WAL is closed
     * afterwards in all cases.
     */
    public void truncate() throws ConnectException {
        try {
            if (this.storage.exists(this.logFile)) {
                log.debug("Truncating WAL file: {}", this.logFile);
                String oldLogFile = this.logFile + TRUNCATED_LOG_EXTENSION;
                this.storage.delete(oldLogFile);
                this.storage.commit(this.logFile, oldLogFile);
            }
        }
        finally {
            this.close();
        }
    }

    /**
     * Closes the writer and the reader, if open, and clears both references.
     *
     * <p>Both resources are always attempted: a failure closing the writer no longer prevents the
     * reader from being closed. If both fail, the reader's failure is attached to the writer's as
     * a suppressed exception.
     *
     * @throws DataException if closing either resource fails with an I/O error
     */
    public void close() throws ConnectException {
        log.debug("Closing WAL, {}-{}, file: {}", this.conf.name(), this.conf.getTaskId(), this.logFile);
        IOException failure = null;
        try {
            try {
                if (this.writer != null) {
                    this.writer.close();
                }
            }
            catch (IOException e) {
                failure = e;
            }
            try {
                if (this.reader != null) {
                    this.reader.close();
                }
            }
            catch (IOException e) {
                if (failure == null) {
                    failure = e;
                } else {
                    failure.addSuppressed(e);
                }
            }
        }
        finally {
            // Drop both references even if an unchecked exception escapes one of the closes.
            this.writer = null;
            this.reader = null;
        }
        if (failure != null) {
            throw new DataException("Error closing " + this.logFile, failure);
        }
    }

    /** Returns the full name of the log file backing this WAL. */
    public String getLogFile() {
        return this.logFile;
    }
}

