/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.common.utils.Time;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.DateTimeUtils;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.OldRecordWriterWrapper;
import io.confluent.connect.hdfs.RecordWriterProvider;
import io.confluent.connect.hdfs.errors.HiveMetaStoreException;
import io.confluent.connect.hdfs.filter.TopicPartitionCommittedFileFilter;
import io.confluent.connect.hdfs.hive.HiveMetaStore;
import io.confluent.connect.hdfs.hive.HiveUtil;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.format.SchemaFileReader;
import io.confluent.connect.storage.partitioner.TimeBasedPartitioner;
import io.confluent.connect.storage.partitioner.TimestampExtractor;
import io.confluent.connect.storage.schema.StorageSchemaCompatibility;
import io.confluent.connect.storage.wal.FilePathOffset;
import io.confluent.connect.storage.wal.WAL;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.IllegalWorkerStateException;
import org.apache.kafka.connect.errors.SchemaProjectorException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TopicPartitionWriter {
    private static final Logger log = LoggerFactory.getLogger(TopicPartitionWriter.class);
    // Fallback extractor used when the partitioner does not supply one (see the constructor).
    private static final TimestampExtractor WALLCLOCK = new TimeBasedPartitioner.WallclockTimestampExtractor();
    private final io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider;
    // java.lang.String format pattern ("%0<width>d") built from "filename.offset.zero.pad.width".
    private final String zeroPadOffsetFormat;
    private final boolean hiveIntegration;
    private final Time time;
    private final HdfsStorage storage;
    private final WAL wal;
    // Encoded partition -> temp file name handed out by getTempFile().
    private final Map<String, String> tempFiles;
    // Encoded partition -> currently open writer; entries are removed when the temp file is closed.
    private final Map<String, RecordWriter> writers;
    private final TopicPartition tp;
    private final Partitioner partitioner;
    private final TimestampExtractor timestampExtractor;
    // True when timestampExtractor is a WallclockTimestampExtractor (or subclass).
    private final boolean isWallclockBased;
    private final String url;
    private final String topicsDir;
    // Current position in the recovery/write state machine (see State).
    private State state;
    // Records queued by buffer() and drained by write().
    private final Queue<SinkRecord> buffer;
    // Set once resetOffsets() has run; guards applyWAL() and resetOffsets() against re-running.
    private boolean recovered;
    private final SinkTaskContext context;
    // Records written since the last commit; incremented in writeRecord(), zeroed on commit.
    private int recordCounter;
    private final int flushSize;
    private final long rotateIntervalMs;
    // Reference timestamp for interval-based rotation; may be null.
    private Long lastRotate;
    private final long rotateScheduleIntervalMs;
    // Wall-clock time (ms) of the next scheduled rotation; only maintained when rotateScheduleIntervalMs > 0.
    private long nextScheduledRotate;
    private final RecordWriterProvider writerProvider;
    private final HdfsSinkConnectorConfig connectorConfig;
    private final AvroData avroData;
    // Temp files already appended to the WAL; cleared at the start of commitFile().
    private final Set<String> appended;
    // Next offset to read for this partition; -1 until it is known.
    private long offset;
    // Encoded partition -> Kafka offset of the first / last record written to its current temp file.
    private final Map<String, Long> startOffsets;
    private final Map<String, Long> endOffsets;
    // Retry back-off read from "retry.backoff.ms".
    private final long timeoutMs;
    // Time (ms) of the last failure recorded by write(); -1 initially.
    private long failureTime;
    private final StorageSchemaCompatibility compatibility;
    private Schema currentSchema;
    private final String extension;
    // Only set when rotateScheduleIntervalMs > 0; null otherwise.
    private final DateTimeZone timeZone;
    // Only set when hiveIntegration is enabled; null otherwise.
    private final String hiveDatabase;
    private final HiveMetaStore hiveMetaStore;
    private final SchemaFileReader<HdfsSinkConnectorConfig, Path> schemaFileReader;
    private final HiveUtil hive;
    private final ExecutorService executorService;
    // Futures of submitted Hive tasks; this class only appends to the queue.
    private final Queue<Future<Void>> hiveUpdateFutures;
    // Encoded partitions for which addHivePartition() has already been submitted.
    private final Set<String> hivePartitions;
    private final String hiveTableName;

    /**
     * Convenience constructor that delegates to the full constructor with {@code null} for the
     * Hive metastore, Hive util, schema file reader, executor service and Hive future queue,
     * and with the topic name as the Hive table name.
     */
    public TopicPartitionWriter(
            TopicPartition tp,
            HdfsStorage storage,
            RecordWriterProvider writerProvider,
            io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider,
            Partitioner partitioner,
            HdfsSinkConnectorConfig connectorConfig,
            SinkTaskContext context,
            AvroData avroData,
            Time time) {
        this(
                tp,
                storage,
                writerProvider,
                newWriterProvider,
                partitioner,
                connectorConfig,
                context,
                avroData,
                null,
                null,
                null,
                null,
                null,
                time,
                tp.topic());
    }

    /**
     * Creates a writer for a single topic partition.
     *
     * <p>Exactly one of {@code writerProvider} / {@code newWriterProvider} needs to be non-null;
     * when both are given the old-style {@code writerProvider} decides the file extension.
     *
     * @throws ConnectException if neither record writer provider is supplied
     */
    public TopicPartitionWriter(
            TopicPartition tp,
            HdfsStorage storage,
            RecordWriterProvider writerProvider,
            io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider,
            Partitioner partitioner,
            HdfsSinkConnectorConfig config,
            SinkTaskContext context,
            AvroData avroData,
            HiveMetaStore hiveMetaStore,
            HiveUtil hive,
            SchemaFileReader<HdfsSinkConnectorConfig, Path> schemaFileReader,
            ExecutorService executorService,
            Queue<Future<Void>> hiveUpdateFutures,
            Time time,
            String hiveTableName) {
        // Plain collaborators handed in by the caller.
        this.tp = tp;
        this.time = time;
        this.context = context;
        this.storage = storage;
        this.avroData = avroData;
        this.partitioner = partitioner;
        this.writerProvider = writerProvider;
        this.newWriterProvider = newWriterProvider;
        this.hiveTableName = hiveTableName;

        // Use the extractor of a wrapped time-based partitioner if there is one, else wall clock.
        TimestampExtractor partitionerExtractor = null;
        if (partitioner instanceof DataWriter.PartitionerWrapper) {
            io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> inner =
                    ((DataWriter.PartitionerWrapper) partitioner).partitioner;
            if (TimeBasedPartitioner.class.isAssignableFrom(inner.getClass())) {
                partitionerExtractor = ((TimeBasedPartitioner) inner).getTimestampExtractor();
            }
        }
        if (partitionerExtractor == null) {
            partitionerExtractor = WALLCLOCK;
        }
        this.timestampExtractor = partitionerExtractor;
        this.isWallclockBased = TimeBasedPartitioner.WallclockTimestampExtractor.class
                .isAssignableFrom(this.timestampExtractor.getClass());

        // Storage- and config-derived settings.
        this.url = storage.url();
        this.connectorConfig = storage.conf();
        this.schemaFileReader = schemaFileReader;
        this.topicsDir = config.getTopicsDirFromTopic(tp.topic());
        this.flushSize = config.getInt("flush.size");
        this.rotateIntervalMs = config.getLong("rotate.interval.ms");
        this.rotateScheduleIntervalMs = config.getLong("rotate.schedule.interval.ms");
        this.timeoutMs = config.getLong("retry.backoff.ms");
        this.compatibility = StorageSchemaCompatibility.getCompatibility(config.getString("schema.compatibility"));
        this.wal = storage.wal(config.getLogsDirFromTopic(tp.topic()), tp);

        // Mutable bookkeeping; the state machine starts at the beginning of recovery.
        this.buffer = new LinkedList<SinkRecord>();
        this.writers = new HashMap<String, RecordWriter>();
        this.tempFiles = new HashMap<String, String>();
        this.appended = new HashSet<String>();
        this.startOffsets = new HashMap<String, Long>();
        this.endOffsets = new HashMap<String, Long>();
        this.state = State.RECOVERY_STARTED;
        this.failureTime = -1L;
        this.offset = -1L;

        final String fileExtension;
        if (writerProvider != null) {
            fileExtension = writerProvider.getExtension();
        } else if (newWriterProvider != null) {
            fileExtension = newWriterProvider.getExtension();
        } else {
            throw new ConnectException("Invalid state: either old or new RecordWriterProvider must be provided");
        }
        this.extension = fileExtension;
        this.zeroPadOffsetFormat = "%0" + config.getInt("filename.offset.zero.pad.width") + "d";

        // Hive wiring; the database name is only looked up when integration is enabled.
        this.hiveIntegration = config.getBoolean("hive.integration");
        if (this.hiveIntegration) {
            this.hiveDatabase = config.getString("hive.database");
        } else {
            this.hiveDatabase = null;
        }
        this.hiveMetaStore = hiveMetaStore;
        this.hive = hive;
        this.executorService = executorService;
        this.hiveUpdateFutures = hiveUpdateFutures;
        this.hivePartitions = new HashSet<String>();

        // The time zone is only needed for scheduled rotation.
        if (this.rotateScheduleIntervalMs > 0L) {
            this.timeZone = DateTimeZone.forID(config.getString("timezone"));
        } else {
            this.timeZone = null;
        }
        this.updateRotationTimers(null);
    }

    /**
     * Runs the recovery state machine from the current state up to {@code WRITE_STARTED}.
     *
     * <p>Each case deliberately falls through to the next one, so recovery resumes at whichever
     * step failed on a previous attempt.
     *
     * @return {@code false} if a {@link ConnectException} interrupted recovery (a retry timeout
     *     is then requested on the task context), {@code true} otherwise
     */
    public boolean recover() {
        try {
            switch (state) {
                case RECOVERY_STARTED:
                    log.info("Started recovery for topic partition {}", tp);
                    pause();
                    nextState();
                    // fall through
                case RECOVERY_PARTITION_PAUSED:
                    log.debug("Start recovery state: Apply WAL for topic partition {}", tp);
                    applyWAL();
                    nextState();
                    // fall through
                case WAL_APPLIED:
                    log.debug("Start recovery state: Reset Offsets for topic partition {}", tp);
                    resetOffsets();
                    nextState();
                    // fall through
                case OFFSET_RESET:
                    log.debug("Start recovery state: Truncate WAL for topic partition {}", tp);
                    truncateWAL();
                    nextState();
                    // fall through
                case WAL_TRUNCATED:
                    log.debug("Start recovery state: Resume for topic partition {}", tp);
                    resume();
                    nextState();
                    log.info("Finished recovery for topic partition {}", tp);
                    break;
                default:
                    log.error("{} is not a valid state to perform recovery for topic partition {}.", state, tp);
                    break;
            }
            return true;
        } catch (ConnectException e) {
            log.error("Recovery failed at state {}", state, e);
            setRetryTimeout(timeoutMs);
            return false;
        }
    }

    /**
     * Resets the rotation timers after a rotation (or on start-up / recovery).
     *
     * <p>{@code lastRotate} becomes the current wall-clock time when the extractor is
     * wall-clock based; otherwise it becomes the timestamp extracted from
     * {@code currentRecord}, or {@code null} when no record is available. When scheduled
     * rotation is enabled, {@code nextScheduledRotate} is recomputed from the current time.
     *
     * @param currentRecord the record that triggered the rotation; may be {@code null}
     */
    private void updateRotationTimers(SinkRecord currentRecord) {
        long now = this.time.milliseconds();
        // Assign lastRotate on every path. The wall-clock branch used to land in an unused
        // local, leaving lastRotate null and disabling interval-based rotation.
        if (this.isWallclockBased) {
            this.lastRotate = Long.valueOf(now);
        } else if (currentRecord != null) {
            this.lastRotate = this.timestampExtractor.extract((ConnectRecord)currentRecord);
        } else {
            this.lastRotate = null;
        }
        if (log.isDebugEnabled() && this.rotateIntervalMs > 0L) {
            log.debug("Update last rotation timer. Next rotation for {} will be in {}ms", (Object)this.tp, (Object)this.rotateIntervalMs);
        }
        if (this.rotateScheduleIntervalMs > 0L) {
            this.nextScheduledRotate = DateTimeUtils.getNextTimeAdjustedByDay(now, this.rotateScheduleIntervalMs, this.timeZone);
            if (log.isDebugEnabled()) {
                log.debug("Update scheduled rotation timer. Next rotation for {} will be at {}", (Object)this.tp, (Object)new DateTime(this.nextScheduledRotate).withZone(this.timeZone).toString());
            }
        }
    }

    /*
     * Unable to fully structure code
     */
    public void write() {
        now = this.time.milliseconds();
        currentRecord = null;
        if (this.failureTime > 0L && now - this.failureTime < this.timeoutMs) {
            return;
        }
        if (this.state.compareTo(State.WRITE_STARTED) < 0) {
            success = this.recover();
            if (!success) {
                return;
            }
            this.updateRotationTimers(null);
        }
        block21: while (!this.buffer.isEmpty()) {
            try {
                switch (1.$SwitchMap$io$confluent$connect$hdfs$TopicPartitionWriter$State[this.state.ordinal()]) {
                    case 6: {
                        this.pause();
                        this.nextState();
                    }
                    case 7: {
                        if (this.currentSchema == null && this.compatibility != StorageSchemaCompatibility.NONE && this.offset != -1L && (fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(this.storage, new Path(topicDir = FileUtils.topicDirectory(this.url, this.topicsDir, this.tp.topic())), filter = new TopicPartitionCommittedFileFilter(this.tp))) != null) {
                            this.currentSchema = this.schemaFileReader.getSchema((Object)this.connectorConfig, (Object)fileStatusWithMaxOffset.getPath());
                        }
                        currentRecord = record = this.buffer.peek();
                        valueSchema = record.valueSchema();
                        if ((this.recordCounter > 0 || this.currentSchema != null || valueSchema == null) && !this.compatibility.shouldChangeSchema((ConnectRecord)record, null, this.currentSchema)) ** GOTO lbl29
                        this.currentSchema = valueSchema;
                        if (this.hiveIntegration) {
                            this.createHiveTable();
                            this.alterHiveSchema();
                        }
                        if (this.recordCounter <= 0) continue block21;
                        this.nextState();
                        ** GOTO lbl38
lbl29:
                        // 1 sources

                        if (this.shouldRotateAndMaybeUpdateTimers(currentRecord, now)) {
                            TopicPartitionWriter.log.info("Starting commit and rotation for topic partition {} with start offsets {} and end offsets {}", new Object[]{this.tp, this.startOffsets, this.endOffsets});
                            this.nextState();
                        } else {
                            projectedRecord = this.compatibility.project(record, null, this.currentSchema);
                            this.writeRecord(projectedRecord);
                            this.buffer.poll();
                            continue block21;
                        }
                    }
lbl38:
                    // 3 sources

                    case 8: {
                        this.updateRotationTimers(currentRecord);
                        this.closeTempFile();
                        this.nextState();
                    }
                    case 9: {
                        this.appendToWAL();
                        this.nextState();
                    }
                    case 10: {
                        this.commitFile();
                        this.nextState();
                    }
                    case 11: {
                        this.setState(State.WRITE_PARTITION_PAUSED);
                        continue block21;
                    }
                }
                TopicPartitionWriter.log.error("{} is not a valid state to write record for topic partition {}.", (Object)this.state, (Object)this.tp);
            }
            catch (HiveMetaStoreException | IllegalWorkerStateException | SchemaProjectorException e) {
                throw new RuntimeException((Throwable)e);
            }
            catch (ConnectException e) {
                TopicPartitionWriter.log.error("Exception on topic partition {}: ", (Object)this.tp, (Object)e);
                this.failureTime = this.time.milliseconds();
                this.setRetryTimeout(this.timeoutMs);
                break;
            }
        }
        if (this.buffer.isEmpty()) {
            try {
                switch (1.$SwitchMap$io$confluent$connect$hdfs$TopicPartitionWriter$State[this.state.ordinal()]) {
                    case 6: {
                        this.pause();
                        this.nextState();
                    }
                    case 7: {
                        if (this.recordCounter == 0) break;
                        if (!this.shouldRotateAndMaybeUpdateTimers(currentRecord, now)) break;
                        TopicPartitionWriter.log.info("committing files after waiting for rotateIntervalMs time but less than flush.size records available.");
                        this.nextState();
                    }
                    case 8: {
                        this.updateRotationTimers(currentRecord);
                        this.closeTempFile();
                        this.nextState();
                    }
                    case 9: {
                        this.appendToWAL();
                        this.nextState();
                    }
                    case 10: {
                        this.commitFile();
                        this.nextState();
                    }
                    case 11: {
                        break;
                    }
                    default: {
                        TopicPartitionWriter.log.error("{} is not a valid state to empty batch for topic partition {}.", (Object)this.state, (Object)this.tp);
                    }
                }
            }
            catch (ConnectException e) {
                TopicPartitionWriter.log.error("Exception on topic partition {}: ", (Object)this.tp, (Object)e);
                this.failureTime = this.time.milliseconds();
                this.setRetryTimeout(this.timeoutMs);
                return;
            }
            this.resume();
            this.state = State.WRITE_STARTED;
        }
    }

    /**
     * Closes and deletes all in-progress temp files, closes the WAL and clears offset tracking.
     *
     * <p>Failures while closing or deleting a temp file are logged and otherwise ignored so
     * that the remaining files and the WAL are still handled.
     *
     * @throws ConnectException if closing the WAL failed; the original exception is attached
     *     as the cause
     */
    public void close() throws ConnectException {
        log.debug("Closing TopicPartitionWriter {}", this.tp);
        for (String encodedPartition : this.tempFiles.keySet()) {
            log.debug("Discarding in progress tempfile {} for {} {}", this.tempFiles.get(encodedPartition), this.tp, encodedPartition);
            try {
                this.closeTempFile(encodedPartition);
            }
            catch (ConnectException e) {
                log.error("Error closing temp file {} for {} {} when closing TopicPartitionWriter:", this.tempFiles.get(encodedPartition), this.tp, encodedPartition, e);
            }
            try {
                this.deleteTempFile(encodedPartition);
            }
            catch (ConnectException e) {
                log.error("Error deleting temp file {} for {} {} when closing TopicPartitionWriter:", this.tempFiles.get(encodedPartition), this.tp, encodedPartition, e);
            }
        }
        this.writers.clear();
        ConnectException walFailure = null;
        try {
            this.wal.close();
        }
        catch (ConnectException e) {
            log.error("Error closing {}.", this.wal.getLogFile(), e);
            walFailure = e;
        }
        this.startOffsets.clear();
        this.endOffsets.clear();
        if (walFailure != null) {
            // Same message as before, but keep the cause so the stack trace is not lost.
            throw new ConnectException("Error closing writer: " + walFailure.getMessage() + "\n", walFailure);
        }
    }

    /**
     * Queues a record so that a later {@link #write()} call can process it.
     *
     * @param sinkRecord the record to enqueue
     */
    public void buffer(SinkRecord sinkRecord) {
        long recordOffset = sinkRecord.kafkaOffset();
        log.trace("Buffering record with offset {}", recordOffset);
        buffer.add(sinkRecord);
    }

    /**
     * @return the next offset tracked for this partition, or {@code -1} if none is known yet
     */
    public long offset() {
        return offset;
    }

    /**
     * @return the topic partition this writer was created for
     */
    public TopicPartition topicPartition() {
        return tp;
    }

    /**
     * @return the live (not copied) map of open writers keyed by encoded partition
     */
    Map<String, RecordWriter> getWriters() {
        return writers;
    }

    /**
     * @return the live (not copied) map of temp file names keyed by encoded partition
     */
    public Map<String, String> getTempFiles() {
        return tempFiles;
    }

    /**
     * Asks the partitioner for the partitioned path of this topic and the given encoded partition.
     */
    private String getDirectory(String encodedPartition) {
        String topic = tp.topic();
        return partitioner.generatePartitionedPath(topic, encodedPartition);
    }

    /** Advances the state machine by one step (see {@code State.next()}). */
    private void nextState() {
        State following = state.next();
        state = following;
    }

    /**
     * Jumps the state machine directly to the given state.
     *
     * @param state the state to switch to
     */
    private void setState(State state) {
        this.state = state;
    }

    /**
     * Decides whether the current files should be rotated.
     *
     * <p>Rotation is due when any of these holds: the interval since {@code lastRotate} has
     * elapsed, the scheduled rotation time has been reached, or {@code flushSize} records have
     * been written. With a record-based extractor, {@code lastRotate} is seeded from the
     * current record if it has not been set yet.
     *
     * @param currentRecord the record under consideration; may be {@code null}
     * @param now current wall-clock time in milliseconds
     * @return {@code true} if any rotation criterion is met
     */
    private boolean shouldRotateAndMaybeUpdateTimers(SinkRecord currentRecord, long now) {
        Long currentTimestamp = null;
        if (isWallclockBased) {
            currentTimestamp = now;
        } else if (currentRecord != null) {
            currentTimestamp = timestampExtractor.extract((ConnectRecord) currentRecord);
            if (lastRotate == null) {
                lastRotate = currentTimestamp;
            }
        }

        boolean periodicRotation = false;
        if (rotateIntervalMs > 0L && currentTimestamp != null && lastRotate != null) {
            periodicRotation = currentTimestamp - lastRotate >= rotateIntervalMs;
        }
        boolean scheduledRotation = rotateScheduleIntervalMs > 0L && now >= nextScheduledRotate;
        boolean messageSizeRotation = recordCounter >= flushSize;

        log.trace("Should apply periodic time-based rotation (rotateIntervalMs: '{}', lastRotate: '{}', timestamp: '{}')? {}", rotateIntervalMs, lastRotate, currentTimestamp, periodicRotation);
        log.trace("Should apply scheduled rotation: (rotateScheduleIntervalMs: '{}', nextScheduledRotate: '{}', now: '{}')? {}", rotateScheduleIntervalMs, nextScheduledRotate, now, scheduledRotation);
        log.trace("Should apply size-based rotation (count {} >= flush size {})? {}", recordCounter, flushSize, messageSizeRotation);

        if (periodicRotation) {
            return true;
        }
        return scheduledRotation || messageSizeRotation;
    }

    /**
     * Determines the next offset to read and stores it in {@code offset}.
     *
     * <p>The latest WAL entry is preferred. If the WAL yields nothing, the committed file with
     * the highest offset in the topic directory is used. If neither exists, {@code offset} is
     * left unchanged.
     */
    private void readOffset() {
        FilePathOffset walEntry = wal.extractLatestOffset();
        if (walEntry == null) {
            log.debug("Could not use WAL approach for recovering offsets, searching for latest offsets on HDFS.");
            String topicPath = FileUtils.topicDirectory(url, topicsDir, tp.topic());
            TopicPartitionCommittedFileFilter committedFilter = new TopicPartitionCommittedFileFilter(tp);
            FileStatus newestCommitted = FileUtils.fileStatusWithMaxOffset(storage, new Path(topicPath), committedFilter);
            if (newestCommitted == null) {
                return;
            }
            long committedToHdfs = FileUtils.extractOffset(newestCommitted.getPath().getName());
            log.trace("Last committed offset based on filenames: {}", committedToHdfs);
            offset = committedToHdfs + 1L;
            log.trace("Next offset to read: {}", offset);
        } else {
            long committedInWal = walEntry.getOffset();
            log.trace("Last committed offset based on WAL: {}", committedInWal);
            offset = committedInWal + 1L;
            log.trace("Next offset to read: {}", offset);
        }
    }

    /** Asks the task context to pause consumption of this topic partition. */
    private void pause() {
        context.pause(tp);
    }

    /** Asks the task context to resume consumption of this topic partition. */
    private void resume() {
        context.resume(tp);
    }

    /**
     * Returns the writer for an encoded partition, creating and registering it on first use.
     *
     * <p>The old-style provider is preferred when present. When a new writer is created and
     * Hive integration is enabled, the partition is also submitted to Hive once.
     *
     * @param record the record about to be written (only used by the old-style provider)
     * @param encodedPartition the partition key the writer belongs to
     * @return the existing or newly created writer
     * @throws ConnectException if no provider is configured or the writer cannot be created
     */
    private RecordWriter getWriter(SinkRecord record, String encodedPartition) throws ConnectException {
        if (writers.containsKey(encodedPartition)) {
            return writers.get(encodedPartition);
        }

        String tempFile = getTempFile(encodedPartition);
        final RecordWriter created;
        try {
            if (writerProvider != null) {
                created = new OldRecordWriterWrapper(writerProvider.getRecordWriter(connectorConfig.getHadoopConfiguration(), tempFile, record, avroData));
            } else if (newWriterProvider != null) {
                created = newWriterProvider.getRecordWriter(connectorConfig, tempFile);
            } else {
                throw new ConnectException("Invalid state: either old or new RecordWriterProvider must be provided");
            }
        } catch (IOException e) {
            throw new ConnectException("Couldn't create RecordWriter", e);
        }

        writers.put(encodedPartition, created);
        boolean partitionUnknownToHive = hiveIntegration && !hivePartitions.contains(encodedPartition);
        if (partitionUnknownToHive) {
            addHivePartition(encodedPartition);
            hivePartitions.add(encodedPartition);
        }
        return created;
    }

    /**
     * Returns the temp file name for an encoded partition, generating one under the
     * {@code /+tmp/} directory and remembering it on first use.
     */
    private String getTempFile(String encodedPartition) {
        if (tempFiles.containsKey(encodedPartition)) {
            return tempFiles.get(encodedPartition);
        }
        String tmpDirectory = "/+tmp/" + getDirectory(encodedPartition);
        String generated = FileUtils.tempFileName(url, topicsDir, tmpDirectory, extension);
        tempFiles.put(encodedPartition, generated);
        return generated;
    }

    /** Applies the WAL, unless recovery has already completed once for this writer. */
    private void applyWAL() throws ConnectException {
        if (recovered) {
            return;
        }
        wal.apply();
    }

    /** Delegates to {@code WAL.truncate()}. */
    private void truncateWAL() throws ConnectException {
        wal.truncate();
    }

    /**
     * Reads the recovered offset and, if it is positive, rewinds the task context to it.
     * Runs at most once per writer; later calls are no-ops.
     */
    private void resetOffsets() throws ConnectException {
        if (recovered) {
            return;
        }
        readOffset();
        if (offset <= 0L) {
            log.debug("Resetting offset for {} based upon existing consumer group offsets or, if there are none, the consumer's 'auto.offset.reset' value.", tp);
        } else {
            log.debug("Resetting offset for {} to {}", tp, offset);
            context.offset(tp, offset);
        }
        recovered = true;
    }

    /**
     * Writes one record to the temp file of its encoded partition and updates the per-partition
     * start/end offsets and the record counter.
     *
     * @param record the (already projected) record to write
     */
    private void writeRecord(SinkRecord record) {
        // Remember the first offset seen if none was recovered.
        if (offset == -1L) {
            offset = record.kafkaOffset();
        }

        String encodedPartition = partitioner.encodePartition(record);
        getWriter(record, encodedPartition).write(record);

        // Only the first record of a file defines its start offset; every record moves the end.
        if (!startOffsets.containsKey(encodedPartition)) {
            startOffsets.put(encodedPartition, record.kafkaOffset());
        }
        endOffsets.put(encodedPartition, record.kafkaOffset());
        recordCounter++;
    }

    /**
     * Unregisters the writer of the given encoded partition and closes it, if one is open.
     */
    private void closeTempFile(String encodedPartition) {
        RecordWriter open = writers.remove(encodedPartition);
        if (open == null) {
            return;
        }
        open.close();
    }

    /**
     * Closes the writers of all temp files.
     *
     * <p>If any close fails, the batch is abandoned: every temp file is deleted (best effort),
     * the start/end offsets are dropped, the buffer is cleared, the task context is rewound to
     * {@code offset}, the record counter is reset, and the last close failure is rethrown.
     *
     * @throws ConnectException the last failure raised while closing a temp file
     */
    private void closeTempFile() {
        ConnectException connectException = null;
        for (String encodedPartition : this.tempFiles.keySet()) {
            try {
                this.closeTempFile(encodedPartition);
            }
            catch (ConnectException e) {
                connectException = e;
                // Log the exception itself: only the last failure is rethrown below, so
                // earlier ones would otherwise be lost without a trace.
                log.error("Failed to close temporary file for partition {}. The connector will attempt to rewrite the temporary file.", (Object)encodedPartition, (Object)e);
            }
        }
        if (connectException == null) {
            return;
        }
        for (String encodedPartition : this.tempFiles.keySet()) {
            try {
                this.deleteTempFile(encodedPartition);
            }
            catch (ConnectException e) {
                log.error("Failed to delete tmp file {}", (Object)this.tempFiles.get(encodedPartition), (Object)e);
            }
            this.startOffsets.remove(encodedPartition);
            this.endOffsets.remove(encodedPartition);
        }
        // Clearing once is enough; at least one temp file exists whenever a close failed.
        this.buffer.clear();
        log.debug("Resetting offset for {} to {}", (Object)this.tp, (Object)this.offset);
        this.context.offset(this.tp, this.offset);
        this.recordCounter = 0;
        throw connectException;
    }

    /**
     * Appends the temp-file to committed-file mapping of one encoded partition to the WAL.
     *
     * <p>Nothing happens when the temp file was already appended in this rotation or when no
     * records were written for the partition (no start offset recorded).
     */
    private void appendToWAL(String encodedPartition) {
        String tempFile = tempFiles.get(encodedPartition);
        boolean alreadyAppended = appended.contains(tempFile);
        if (alreadyAppended || !startOffsets.containsKey(encodedPartition)) {
            return;
        }

        long firstOffset = startOffsets.get(encodedPartition);
        long lastOffset = endOffsets.get(encodedPartition);
        String committedFile = FileUtils.committedFileName(
                url, topicsDir, getDirectory(encodedPartition), tp, firstOffset, lastOffset, extension, zeroPadOffsetFormat);

        wal.append(tempFile, committedFile);
        appended.add(tempFile);
    }

    /**
     * Appends every pending temp file to the WAL, bracketed by the begin and end markers.
     */
    private void appendToWAL() {
        beginAppend();
        tempFiles.keySet().forEach(this::appendToWAL);
        endAppend();
    }

    /**
     * Writes the {@code "BEGIN"} marker to the WAL unless it is recorded in {@code appended}.
     */
    private void beginAppend() {
        // NOTE(review): nothing in this class adds "BEGIN" to `appended`, so this guard looks
        // like it never short-circuits here; confirm whether that is intended.
        if (appended.contains("BEGIN")) {
            return;
        }
        wal.append("BEGIN", "");
    }

    /**
     * Writes the {@code "END"} marker to the WAL unless it is recorded in {@code appended}.
     */
    private void endAppend() {
        // NOTE(review): nothing in this class adds "END" to `appended`, so this guard looks
        // like it never short-circuits here; confirm whether that is intended.
        if (appended.contains("END")) {
            return;
        }
        wal.append("END", "");
    }

    /**
     * Commits the temp files of all encoded partitions and advances {@code offset} to one past
     * the highest end offset that was committed, if any.
     */
    private void commitFile() {
        log.debug("Committing files");
        appended.clear();

        long latestCommitted = -1L;
        for (String encodedPartition : tempFiles.keySet()) {
            long committedEnd = commitFile(encodedPartition);
            if (committedEnd > latestCommitted) {
                latestCommitted = committedEnd;
            }
        }

        if (latestCommitted > -1L) {
            offset = latestCommitted + 1L;
        }
    }

    /**
     * Commits the temp file of one encoded partition to its final location.
     *
     * <p>The target directory is created if it does not exist. After the commit, the
     * partition's start/end offsets are dropped and the record counter is reset.
     *
     * @param encodedPartition the partition whose temp file should be committed
     * @return the end offset of the committed file, or {@code -1} if the partition had no
     *     recorded start offset and nothing was committed
     */
    private long commitFile(String encodedPartition) {
        if (!startOffsets.containsKey(encodedPartition)) {
            return -1L;
        }
        log.debug("Committing file for partition {}", encodedPartition);

        long firstOffset = startOffsets.get(encodedPartition);
        long lastOffset = endOffsets.get(encodedPartition);
        String tempFile = tempFiles.get(encodedPartition);
        String partitionDirectory = getDirectory(encodedPartition);
        String committedFile = FileUtils.committedFileName(
                url, topicsDir, partitionDirectory, tp, firstOffset, lastOffset, extension, zeroPadOffsetFormat);

        String targetDirectory = FileUtils.directoryName(url, topicsDir, partitionDirectory);
        boolean directoryMissing = !storage.exists(targetDirectory);
        if (directoryMissing) {
            storage.create(targetDirectory);
        }
        storage.commit(tempFile, committedFile);

        startOffsets.remove(encodedPartition);
        endOffsets.remove(encodedPartition);
        recordCounter = 0;
        log.info("Committed {} for {}", committedFile, tp);
        return lastOffset;
    }

    /** Deletes the temp file registered for the given encoded partition from storage. */
    private void deleteTempFile(String encodedPartition) {
        String tempFile = tempFiles.get(encodedPartition);
        storage.delete(tempFile);
    }

    /**
     * Passes the given timeout to the task context.
     *
     * @param timeoutMs timeout in milliseconds
     */
    private void setRetryTimeout(long timeoutMs) {
        context.timeout(timeoutMs);
    }

    /**
     * Submits a task to the executor service that creates the Hive table, and queues its
     * future. Any error thrown by the task is logged inside the task and not propagated.
     */
    private void createHiveTable() {
        hiveUpdateFutures.add(executorService.submit(() -> {
            try {
                hive.createTable(hiveDatabase, hiveTableName, currentSchema, partitioner, tp.topic());
            } catch (Throwable e) {
                log.error("Creating Hive table threw unexpected error", e);
            }
            return null;
        }));
    }

    /**
     * Submits a task to the executor service that alters the Hive table schema, and queues its
     * future. Any error thrown by the task is logged inside the task and not propagated.
     */
    private void alterHiveSchema() {
        hiveUpdateFutures.add(executorService.submit(() -> {
            try {
                hive.alterSchema(hiveDatabase, hiveTableName, currentSchema);
            } catch (Throwable e) {
                log.error("Altering Hive schema threw unexpected error", e);
            }
            return null;
        }));
    }

    /**
     * Submits a task to the executor service that adds a partition to the Hive metastore, and
     * queues its future. Any error thrown by the task is logged inside the task and not
     * propagated.
     *
     * @param location the partition location passed to the metastore
     */
    private void addHivePartition(String location) {
        hiveUpdateFutures.add(executorService.submit(() -> {
            try {
                hiveMetaStore.addPartition(hiveDatabase, hiveTableName, location);
            } catch (Throwable e) {
                log.error("Adding Hive partition threw unexpected error", e);
            }
            return null;
        }));
    }

    /**
     * States of the recovery and write state machines, in the order they are traversed.
     * The declaration order matters: {@link #next()} and ordinal comparisons depend on it.
     */
    private static enum State {
        RECOVERY_STARTED,
        RECOVERY_PARTITION_PAUSED,
        WAL_APPLIED,
        OFFSET_RESET,
        WAL_TRUNCATED,
        WRITE_STARTED,
        WRITE_PARTITION_PAUSED,
        SHOULD_ROTATE,
        TEMP_FILE_CLOSED,
        WAL_APPENDED,
        FILE_COMMITTED;

        // Cached once so next() does not clone the values array on every call.
        private static final State[] VALS = State.values();

        /**
         * @return the state declared after this one, wrapping from the last back to the first
         */
        public State next() {
            int following = (ordinal() + 1) % VALS.length;
            return VALS[following];
        }
    }
}

