/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.DataWriter;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.Version;
import io.confluent.connect.storage.schema.StorageSchemaCompatibility;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.joda.time.DateTimeZone;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HdfsSinkTask
extends SinkTask {
    private static final Logger log = LoggerFactory.getLogger(HdfsSinkTask.class);
    private DataWriter hdfsWriter;
    private AvroData avroData;
    private String taskId;
    private String connectorName;
    private String connectorNameAndTaskId;

    public String version() {
        return Version.getVersion();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(Map<String, String> props) {
        this.connectorName = props.get("name");
        this.taskId = props.get("task.id");
        this.connectorNameAndTaskId = String.format("%s-%s", this.connectorName, this.taskId);
        log.info("Starting HDFS Sink Task {}", (Object)this.connectorNameAndTaskId);
        try {
            StorageSchemaCompatibility compatibility;
            HdfsSinkConnectorConfig connectorConfig = new HdfsSinkConnectorConfig(props);
            boolean hiveIntegration = connectorConfig.getBoolean("hive.integration");
            if (hiveIntegration && (compatibility = StorageSchemaCompatibility.getCompatibility((String)connectorConfig.getString("schema.compatibility"))) == StorageSchemaCompatibility.NONE) {
                throw new ConfigException("Hive Integration requires schema compatibility to be BACKWARD, FORWARD or FULL");
            }
            if (connectorConfig.getLong("rotate.schedule.interval.ms") > 0L) {
                String timeZoneString = connectorConfig.getString("timezone");
                if (timeZoneString.equals("")) {
                    throw new ConfigException("timezone", (Object)timeZoneString, "Timezone cannot be empty when using scheduled file rotation.");
                }
                DateTimeZone.forID((String)timeZoneString);
            }
            this.avroData = new AvroData(connectorConfig.avroDataConfig());
            this.hdfsWriter = new DataWriter(connectorConfig, this.context, this.avroData);
            this.recover(this.context.assignment());
            if (hiveIntegration) {
                this.syncWithHive();
            }
        }
        catch (ConfigException e) {
            throw new ConnectException("Couldn't start HdfsSinkConnector due to configuration error.", (Throwable)e);
        }
        catch (ConnectException e) {
            log.error("Couldn't start HdfsSinkConnector:", (Throwable)e);
            log.info("Shutting down HdfsSinkConnector.");
            if (this.hdfsWriter != null) {
                try {
                    try {
                        log.debug("Closing data writer due to task start failure.");
                        this.hdfsWriter.close();
                    }
                    finally {
                        log.debug("Stopping data writer due to task start failure.");
                        this.hdfsWriter.stop();
                    }
                }
                catch (Throwable t) {
                    log.debug("Error closing and stopping data writer: {}", (Object)t.getMessage(), (Object)t);
                }
            }
            throw e;
        }
        log.info("The connector relies on offsets in the WAL files, if these are not present it uses the filenames in HDFS. In both cases the connector commits offsets to Connect to enable monitoring progress of the HDFS connector. Upon startup, the HDFS Connector restores offsets from the WAL log files, if these are not present it uses the filenames in HDFS. In the absence of files in HDFS, the connector will attempt to find offsets for its consumer group in the '__consumer_offsets' topic. If offsets are not found, the consumer will rely on the reset policy specified in the 'consumer.auto.offset.reset' property to start exporting data to HDFS.");
    }

    public void put(Collection<SinkRecord> records) throws ConnectException {
        log.debug("Read {} records from Kafka", (Object)records.size());
        try {
            this.hdfsWriter.write(records);
        }
        catch (ConnectException e) {
            throw new ConnectException((Throwable)e);
        }
    }

    public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        HashMap<TopicPartition, OffsetAndMetadata> result = new HashMap<TopicPartition, OffsetAndMetadata>();
        for (Map.Entry<TopicPartition, Long> entry : this.hdfsWriter.getCommittedOffsets().entrySet()) {
            log.debug("Found last committed offset {} for topic partition {}", (Object)entry.getValue(), (Object)entry.getKey());
            result.put(entry.getKey(), new OffsetAndMetadata(entry.getValue().longValue()));
        }
        log.debug("Returning committed offsets {}", result);
        return result;
    }

    public void open(Collection<TopicPartition> partitions) {
        log.debug("Opening HDFS Sink Task {}", (Object)this.connectorNameAndTaskId);
        this.hdfsWriter.open(partitions);
    }

    public void close(Collection<TopicPartition> partitions) {
        log.debug("Closing HDFS Sink Task {}", (Object)this.connectorNameAndTaskId);
        if (this.hdfsWriter != null) {
            this.hdfsWriter.close();
        }
    }

    public void stop() throws ConnectException {
        log.info("Stopping HDFS Sink Task {}", (Object)this.connectorNameAndTaskId);
        if (this.hdfsWriter != null) {
            this.hdfsWriter.stop();
        }
    }

    private void recover(Set<TopicPartition> assignment) {
        for (TopicPartition tp : assignment) {
            this.hdfsWriter.recover(tp);
        }
    }

    private void syncWithHive() throws ConnectException {
        this.hdfsWriter.syncWithHive();
    }

    public AvroData getAvroData() {
        return this.avroData;
    }
}

