/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.hdfs;

import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.FileUtils;
import io.confluent.connect.hdfs.Format;
import io.confluent.connect.hdfs.HdfsSinkConnectorConfig;
import io.confluent.connect.hdfs.RecordWriterProvider;
import io.confluent.connect.hdfs.SchemaFileReader;
import io.confluent.connect.hdfs.TopicPartitionWriter;
import io.confluent.connect.hdfs.filter.TopicCommittedFileFilter;
import io.confluent.connect.hdfs.hive.HiveMetaStore;
import io.confluent.connect.hdfs.hive.HiveUtil;
import io.confluent.connect.hdfs.partitioner.Partitioner;
import io.confluent.connect.hdfs.storage.HdfsStorage;
import io.confluent.connect.hdfs.storage.Storage;
import io.confluent.connect.storage.StorageFactory;
import io.confluent.connect.storage.format.RecordWriter;
import io.confluent.connect.storage.hive.HiveFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataWriter {
    private static final Logger log = LoggerFactory.getLogger(DataWriter.class);
    private static final Time SYSTEM_TIME = new SystemTime();
    private final Time time;
    private final Map<TopicPartition, TopicPartitionWriter> topicPartitionWriters;
    private HdfsStorage storage;
    private HashMap<String, String> logDirs;
    private HashMap<String, String> topicDirs;
    private Format format;
    private RecordWriterProvider writerProvider;
    private io.confluent.connect.storage.format.RecordWriterProvider<HdfsSinkConnectorConfig> newWriterProvider;
    private io.confluent.connect.storage.format.SchemaFileReader<HdfsSinkConnectorConfig, Path> schemaFileReader;
    private io.confluent.connect.storage.format.Format<HdfsSinkConnectorConfig, Path> newFormat;
    private Partitioner partitioner;
    private HdfsSinkConnectorConfig connectorConfig;
    private AvroData avroData;
    private SinkTaskContext context;
    private ExecutorService executorService;
    private String hiveDatabase;
    private HiveMetaStore hiveMetaStore;
    private HiveUtil hive;
    private Queue<Future<Void>> hiveUpdateFutures;
    private Thread ticketRenewThread;
    private volatile boolean isRunning;

    public DataWriter(HdfsSinkConnectorConfig connectorConfig, SinkTaskContext context, AvroData avroData) {
        this(connectorConfig, context, avroData, SYSTEM_TIME);
    }

    public DataWriter(HdfsSinkConnectorConfig config, SinkTaskContext context, AvroData avroData, Time time) {
        this.time = time;
        this.connectorConfig = config;
        this.avroData = avroData;
        this.context = context;
        this.topicDirs = new HashMap();
        this.logDirs = new HashMap();
        this.topicPartitionWriters = new HashMap<TopicPartition, TopicPartitionWriter>();
        try {
            this.partitioner = this.newPartitioner(config);
        }
        catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new ConnectException(String.format("Unable to initialize partitioner: %s", e.getMessage()), (Throwable)e);
        }
        System.setProperty("hadoop.home.dir", config.hadoopHome());
        log.info("Hadoop configuration directory {}", (Object)config.hadoopConfDir());
        Configuration hadoopConfiguration = config.getHadoopConfiguration();
        if (!config.hadoopConfDir().equals("")) {
            hadoopConfiguration.addResource(new Path(config.hadoopConfDir() + "/core-site.xml"));
            hadoopConfiguration.addResource(new Path(config.hadoopConfDir() + "/hdfs-site.xml"));
        }
        hadoopConfiguration.setBoolean("fs.automatic.close", false);
        if (config.kerberosAuthentication()) {
            this.configureKerberosAuthentication(hadoopConfiguration);
        }
        Class<? extends HdfsStorage> storageClass = config.storageClass();
        this.storage = (HdfsStorage)StorageFactory.createStorage(storageClass, HdfsSinkConnectorConfig.class, (Object)((Object)config), (String)this.connectorConfig.url());
        for (TopicPartition tp : context.assignment()) {
            String topicDir = this.connectorConfig.getTopicsDirFromTopic(tp.topic());
            String logDir = this.connectorConfig.getLogsDirFromTopic(tp.topic());
            this.topicDirs.put(tp.topic(), topicDir);
            this.logDirs.put(tp.topic(), logDir);
            this.createDir(topicDir);
            this.createDir(topicDir + "/+tmp/");
            this.createDir(logDir);
        }
        try {
            try {
                Class formatClass = config.getClass("format.class");
                this.newFormat = (io.confluent.connect.storage.format.Format)formatClass.getConstructor(HdfsStorage.class).newInstance(this.storage);
                this.newWriterProvider = this.newFormat.getRecordWriterProvider();
                this.schemaFileReader = this.newFormat.getSchemaFileReader();
            }
            catch (NoSuchMethodException e) {
                Class formatClass = config.getClass("format.class");
                this.format = (Format)formatClass.getConstructor(new Class[0]).newInstance(new Object[0]);
                this.writerProvider = this.format.getRecordWriterProvider();
                final SchemaFileReader oldReader = this.format.getSchemaFileReader(avroData);
                this.schemaFileReader = new io.confluent.connect.storage.format.SchemaFileReader<HdfsSinkConnectorConfig, Path>(){

                    public Schema getSchema(HdfsSinkConnectorConfig hdfsSinkConnectorConfig, Path path) {
                        try {
                            return oldReader.getSchema(hdfsSinkConnectorConfig.getHadoopConfiguration(), path);
                        }
                        catch (IOException e) {
                            throw new ConnectException("Failed to get schema", (Throwable)e);
                        }
                    }

                    public Iterator<Object> iterator() {
                        throw new UnsupportedOperationException();
                    }

                    public boolean hasNext() {
                        throw new UnsupportedOperationException();
                    }

                    public Object next() {
                        throw new UnsupportedOperationException();
                    }

                    public void remove() {
                        throw new UnsupportedOperationException();
                    }

                    public void close() throws IOException {
                    }
                };
            }
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new ConnectException("Reflection exception: ", (Throwable)e);
        }
        if (this.connectorConfig.hiveIntegrationEnabled()) {
            this.initializeHiveServices(hadoopConfiguration);
        }
        this.initializeTopicPartitionWriters(context.assignment());
    }

    private void configureKerberosAuthentication(Configuration hadoopConfiguration) {
        SecurityUtil.setAuthenticationMethod((UserGroupInformation.AuthenticationMethod)UserGroupInformation.AuthenticationMethod.KERBEROS, (Configuration)hadoopConfiguration);
        if (this.connectorConfig.connectHdfsPrincipal() == null || this.connectorConfig.connectHdfsKeytab() == null) {
            throw new ConfigException("Hadoop is using Kerberos for authentication, you need to provide both a connect principal and the path to the keytab of the principal.");
        }
        hadoopConfiguration.set("hadoop.security.authentication", "kerberos");
        hadoopConfiguration.set("hadoop.security.authorization", "true");
        try {
            String hostname = InetAddress.getLocalHost().getCanonicalHostName();
            String namenodePrincipal = SecurityUtil.getServerPrincipal((String)this.connectorConfig.hdfsNamenodePrincipal(), (String)hostname);
            if (hadoopConfiguration.get("dfs.namenode.kerberos.principal") == null) {
                hadoopConfiguration.set("dfs.namenode.kerberos.principal", namenodePrincipal);
            }
            log.info("Hadoop namenode principal: {}", (Object)hadoopConfiguration.get("dfs.namenode.kerberos.principal"));
            UserGroupInformation.setConfiguration((Configuration)hadoopConfiguration);
            String principal = SecurityUtil.getServerPrincipal((String)this.connectorConfig.connectHdfsPrincipal(), (String)hostname);
            UserGroupInformation.loginUserFromKeytab((String)principal, (String)this.connectorConfig.connectHdfsKeytab());
            UserGroupInformation ugi = UserGroupInformation.getLoginUser();
            log.info("Login as: " + ugi.getUserName());
            this.isRunning = true;
            this.ticketRenewThread = new Thread(() -> this.renewKerberosTicket(ugi));
        }
        catch (UnknownHostException e) {
            throw new ConnectException(String.format("Could not resolve local hostname for Kerberos authentication: %s", e.getMessage()), (Throwable)e);
        }
        catch (IOException e) {
            throw new ConnectException(String.format("Could not authenticate with Kerberos: %s", e.getMessage()), (Throwable)e);
        }
        log.info("Starting the Kerberos ticket renew thread with period {} ms.", (Object)this.connectorConfig.kerberosTicketRenewPeriodMs());
        this.ticketRenewThread.start();
    }

    private void initializeHiveServices(Configuration hadoopConfiguration) {
        this.hiveDatabase = this.connectorConfig.hiveDatabase();
        this.hiveMetaStore = new HiveMetaStore(hadoopConfiguration, this.connectorConfig);
        if (this.format != null) {
            this.hive = this.format.getHiveUtil(this.connectorConfig, this.hiveMetaStore);
        } else if (this.newFormat != null) {
            final io.confluent.connect.storage.hive.HiveUtil newHiveUtil = ((HiveFactory)this.newFormat.getHiveFactory()).createHiveUtil((AbstractConfig)this.connectorConfig, (io.confluent.connect.storage.hive.HiveMetaStore)this.hiveMetaStore);
            this.hive = new HiveUtil(this.connectorConfig, this.hiveMetaStore){

                @Override
                public void createTable(String database, String tableName, Schema schema, Partitioner partitioner, String topic) {
                    newHiveUtil.createTable(database, tableName, schema, (io.confluent.connect.storage.partitioner.Partitioner)partitioner, topic);
                    log.debug("Created Hive table {}", (Object)tableName);
                }

                public void alterSchema(String database, String tableName, Schema schema) {
                    newHiveUtil.alterSchema(database, tableName, schema);
                    log.debug("Altered Hive table {}", (Object)tableName);
                }
            };
        } else {
            throw new ConnectException("One of old or new format classes must be provided");
        }
        this.executorService = Executors.newSingleThreadExecutor();
        this.hiveUpdateFutures = new LinkedList<Future<Void>>();
    }

    private void initializeTopicPartitionWriters(Set<TopicPartition> assignment) {
        for (TopicPartition tp : assignment) {
            TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(tp, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, this.hiveMetaStore, this.hive, this.schemaFileReader, this.executorService, this.hiveUpdateFutures, this.time, this.connectorConfig.getHiveTableName(tp.topic()));
            this.topicPartitionWriters.put(tp, topicPartitionWriter);
        }
    }

    public void write(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            String topic = record.topic();
            int partition = record.kafkaPartition();
            TopicPartition tp = new TopicPartition(topic, partition);
            this.topicPartitionWriters.get(tp).buffer(record);
        }
        if (this.connectorConfig.hiveIntegrationEnabled()) {
            Iterator iterator = this.hiveUpdateFutures.iterator();
            while (iterator.hasNext()) {
                try {
                    Future future = (Future)iterator.next();
                    if (!future.isDone()) break;
                    future.get();
                    iterator.remove();
                }
                catch (ExecutionException e) {
                    throw new RuntimeException(e);
                }
                catch (InterruptedException e) {
                }
            }
        }
        for (TopicPartition tp : this.topicPartitionWriters.keySet()) {
            this.topicPartitionWriters.get(tp).write();
        }
    }

    public void recover(TopicPartition tp) {
        this.topicPartitionWriters.get(tp).recover();
    }

    public void syncWithHive() throws ConnectException {
        HashSet<String> topics = new HashSet<String>();
        for (TopicPartition tp : this.topicPartitionWriters.keySet()) {
            topics.add(tp.topic());
        }
        try {
            for (String topic : topics) {
                FileStatus[] statuses;
                TopicCommittedFileFilter filter;
                String topicDir = FileUtils.topicDirectory(this.connectorConfig.url(), this.topicDirs.get(topic), topic);
                FileStatus fileStatusWithMaxOffset = FileUtils.fileStatusWithMaxOffset(this.storage, new Path(topicDir), filter = new TopicCommittedFileFilter(topic));
                if (fileStatusWithMaxOffset == null) continue;
                Path path = fileStatusWithMaxOffset.getPath();
                Schema latestSchema = this.schemaFileReader.getSchema((Object)this.connectorConfig, (Object)path);
                String hiveTableName = this.connectorConfig.getHiveTableName(topic);
                this.hive.createTable(this.hiveDatabase, hiveTableName, latestSchema, this.partitioner, topic);
                List partitions = this.hiveMetaStore.listPartitions(this.hiveDatabase, hiveTableName, (short)-1);
                for (FileStatus status : statuses = FileUtils.getDirectories(this.storage, new Path(topicDir))) {
                    String location = status.getPath().toString();
                    if (partitions.contains(location)) continue;
                    String partitionValue = this.getPartitionValue(location);
                    this.hiveMetaStore.addPartition(this.hiveDatabase, hiveTableName, partitionValue);
                }
            }
        }
        catch (IOException e) {
            throw new ConnectException((Throwable)e);
        }
    }

    public void open(Collection<TopicPartition> partitions) {
        for (TopicPartition tp : partitions) {
            TopicPartitionWriter topicPartitionWriter = new TopicPartitionWriter(tp, this.storage, this.writerProvider, this.newWriterProvider, this.partitioner, this.connectorConfig, this.context, this.avroData, this.hiveMetaStore, this.hive, this.schemaFileReader, this.executorService, this.hiveUpdateFutures, this.time, this.connectorConfig.getHiveTableName(tp.topic()));
            this.topicPartitionWriters.put(tp, topicPartitionWriter);
            this.recover(tp);
        }
    }

    public void close() {
        for (TopicPartitionWriter writer : this.topicPartitionWriters.values()) {
            try {
                if (writer == null) continue;
                writer.close();
            }
            catch (ConnectException e) {
                log.warn("Unable to close writer for topic partition {}: ", (Object)writer.topicPartition(), (Object)e);
            }
        }
        this.topicPartitionWriters.clear();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() {
        if (this.executorService != null) {
            boolean terminated = false;
            try {
                log.info("Shutting down Hive executor service.");
                this.executorService.shutdown();
                long shutDownTimeout = this.connectorConfig.getLong("shutdown.timeout.ms");
                log.info("Awaiting termination.");
                terminated = this.executorService.awaitTermination(shutDownTimeout, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException interruptedException) {
                // empty catch block
            }
            if (!terminated) {
                log.warn("Unclean Hive executor service shutdown, you probably need to sync with Hive next time you start the connector");
                this.executorService.shutdownNow();
            }
        }
        this.storage.close();
        if (this.ticketRenewThread != null) {
            DataWriter dataWriter = this;
            synchronized (dataWriter) {
                this.isRunning = false;
                this.notifyAll();
            }
        }
    }

    public Partitioner getPartitioner() {
        return this.partitioner;
    }

    public Map<TopicPartition, Long> getCommittedOffsets() {
        HashMap<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
        log.debug("Writer looking for last offsets for topic partitions {}", this.topicPartitionWriters.keySet());
        for (TopicPartition tp : this.topicPartitionWriters.keySet()) {
            long committedOffset = this.topicPartitionWriters.get(tp).offset();
            log.debug("Writer found last offset {} for topic partition {}", (Object)committedOffset, (Object)tp);
            if (committedOffset < 0L) continue;
            offsets.put(tp, committedOffset);
        }
        return offsets;
    }

    public TopicPartitionWriter getBucketWriter(TopicPartition tp) {
        return this.topicPartitionWriters.get(tp);
    }

    public Storage getStorage() {
        return this.storage;
    }

    Map<String, RecordWriter> getWriters(TopicPartition tp) {
        return this.topicPartitionWriters.get(tp).getWriters();
    }

    public Map<String, String> getTempFileNames(TopicPartition tp) {
        TopicPartitionWriter topicPartitionWriter = this.topicPartitionWriters.get(tp);
        return topicPartitionWriter.getTempFiles();
    }

    private void createDir(String dir) {
        String path = this.connectorConfig.url() + "/" + dir;
        if (!this.storage.exists(path)) {
            log.trace("Creating directory {}", (Object)path);
            this.storage.create(path);
        }
    }

    private Partitioner newPartitioner(HdfsSinkConnectorConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Partitioner partitioner;
        try {
            Class partitionerClass = config.getClass("partitioner.class");
            partitioner = (Partitioner)partitionerClass.newInstance();
        }
        catch (ClassCastException e) {
            Class partitionerClass = config.getClass("partitioner.class");
            partitioner = new PartitionerWrapper((io.confluent.connect.storage.partitioner.Partitioner<FieldSchema>)((io.confluent.connect.storage.partitioner.Partitioner)partitionerClass.newInstance()));
        }
        partitioner.configure(new HashMap<String, Object>(config.plainValues()));
        return partitioner;
    }

    private String getPartitionValue(String path) {
        String[] parts = path.split("/");
        StringBuilder sb = new StringBuilder();
        sb.append("/");
        for (int i = 3; i < parts.length; ++i) {
            sb.append(parts[i]);
            sb.append("/");
        }
        return sb.toString();
    }

    private Partitioner createPartitioner(HdfsSinkConnectorConfig config) throws ClassNotFoundException, IllegalAccessException, InstantiationException {
        Class<?> partitionerClasss = Class.forName(config.getString("partitioner.class"));
        Map<String, Object> map = this.copyConfig(config);
        Partitioner partitioner = (Partitioner)partitionerClasss.newInstance();
        partitioner.configure(map);
        return partitioner;
    }

    private Map<String, Object> copyConfig(HdfsSinkConnectorConfig config) {
        HashMap<String, Object> map = new HashMap<String, Object>();
        map.put("partition.field.name", config.getString("partition.field.name"));
        map.put("partition.duration.ms", config.getLong("partition.duration.ms"));
        map.put("path.format", config.getString("path.format"));
        map.put("locale", config.getString("locale"));
        map.put("timezone", config.getString("timezone"));
        return map;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void renewKerberosTicket(UserGroupInformation ugi) {
        DataWriter dataWriter = this;
        synchronized (dataWriter) {
            while (this.isRunning) {
                try {
                    this.wait(this.connectorConfig.kerberosTicketRenewPeriodMs());
                    if (!this.isRunning) continue;
                    log.debug("Attempting re-login from keytab for user: {}", (Object)ugi.getUserName());
                    ugi.reloginFromKeytab();
                }
                catch (IOException e) {
                    log.error("Error renewing the ticket", (Throwable)e);
                }
                catch (InterruptedException interruptedException) {}
            }
            return;
        }
    }

    public static class PartitionerWrapper
    implements Partitioner {
        public final io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner;

        public PartitionerWrapper(io.confluent.connect.storage.partitioner.Partitioner<FieldSchema> partitioner) {
            this.partitioner = partitioner;
        }

        @Override
        public void configure(Map<String, Object> config) {
            this.partitioner.configure(config);
        }

        @Override
        public String encodePartition(SinkRecord sinkRecord) {
            return this.partitioner.encodePartition(sinkRecord);
        }

        @Override
        public String generatePartitionedPath(String topic, String encodedPartition) {
            return this.partitioner.generatePartitionedPath(topic, encodedPartition);
        }

        @Override
        public List<FieldSchema> partitionFields() {
            return this.partitioner.partitionFields();
        }
    }
}

