/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.nar.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.net.SocketFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.nifi.annotation.behavior.RequiresInstanceClassLoading;
import org.apache.nifi.hadoop.SecurityUtil;
import org.apache.nifi.nar.NarProvider;
import org.apache.nifi.nar.NarProviderInitializationContext;
import org.apache.nifi.nar.hadoop.util.ExtensionFilter;
import org.apache.nifi.processors.hadoop.ExtendedConfiguration;
import org.apache.nifi.processors.hadoop.HdfsResources;
import org.apache.nifi.security.krb.KerberosKeytabUser;
import org.apache.nifi.security.krb.KerberosPasswordUser;
import org.apache.nifi.security.krb.KerberosUser;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RequiresInstanceClassLoading(cloneAncestorResources=true)
public class HDFSNarProvider
implements NarProvider {
    private static final Logger LOGGER = LoggerFactory.getLogger(HDFSNarProvider.class);
    private static final String RESOURCES_PARAMETER = "resources";
    private static final String SOURCE_DIRECTORY_PARAMETER = "source.directory";
    private static final String STORAGE_LOCATION = "storage.location";
    private static final String KERBEROS_PRINCIPAL_PARAMETER = "kerberos.principal";
    private static final String KERBEROS_KEYTAB_PARAMETER = "kerberos.keytab";
    private static final String KERBEROS_PASSWORD_PARAMETER = "kerberos.password";
    private static final String NAR_EXTENSION = "nar";
    private static final String DELIMITER = "/";
    private static final int BUFFER_SIZE_DEFAULT = 4096;
    private static final Object RESOURCES_LOCK = new Object();
    private static final String STORAGE_LOCATION_PROPERTY = "fs.defaultFS";
    private volatile List<String> resources = null;
    private volatile Path sourceDirectory = null;
    private volatile String storageLocation = null;
    private volatile NarProviderInitializationContext context;
    private volatile boolean initialized = false;

    public void initialize(NarProviderInitializationContext context) {
        this.resources = Arrays.stream(((String)Objects.requireNonNull(context.getProperties().get(RESOURCES_PARAMETER))).split(",")).map(s -> s.trim()).filter(s -> !s.isEmpty()).collect(Collectors.toList());
        if (this.resources.isEmpty()) {
            throw new IllegalArgumentException("At least one HDFS configuration resource is necessary");
        }
        String sourceDirectory = (String)context.getProperties().get(SOURCE_DIRECTORY_PARAMETER);
        if (sourceDirectory == null || sourceDirectory.isEmpty()) {
            throw new IllegalArgumentException("Provider needs the source directory to be set");
        }
        this.sourceDirectory = new Path(sourceDirectory);
        this.storageLocation = (String)context.getProperties().get(STORAGE_LOCATION);
        this.context = context;
        this.initialized = true;
    }

    public Collection<String> listNars() throws IOException {
        if (!this.initialized) {
            LOGGER.error("Provider is not initialized");
        }
        HdfsResources hdfsResources = this.getHdfsResources();
        try {
            FileStatus[] fileStatuses = (FileStatus[])hdfsResources.getUserGroupInformation().doAs(() -> hdfsResources.getFileSystem().listStatus(this.sourceDirectory, (PathFilter)new ExtensionFilter(NAR_EXTENSION)));
            List<String> result = Arrays.stream(fileStatuses).filter(fileStatus -> fileStatus.isFile()).map(fileStatus -> fileStatus.getPath().getName()).collect(Collectors.toList());
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("The following NARs were found: " + String.join((CharSequence)", ", result));
            }
            return result;
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Provider cannot list NARs", e);
        }
    }

    public InputStream fetchNarContents(String location) throws IOException {
        if (!this.initialized) {
            LOGGER.error("Provider is not initialized");
        }
        Path path = this.getNarLocation(location);
        HdfsResources hdfsResources = this.getHdfsResources();
        try {
            if (((Boolean)hdfsResources.getUserGroupInformation().doAs(() -> !hdfsResources.getFileSystem().exists(path))).booleanValue()) {
                throw new IOException("Provider cannot find " + location);
            }
            return (InputStream)hdfsResources.getUserGroupInformation().doAs(() -> hdfsResources.getFileSystem().open(path, 4096));
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IOException("Error during acquiring file", e);
        }
    }

    private Path getNarLocation(String location) {
        String result = this.sourceDirectory.toString();
        if (!result.endsWith(DELIMITER)) {
            result = result + DELIMITER;
        }
        return new Path(result + location);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private HdfsResources getHdfsResources() throws IOException {
        FileSystem fs;
        UserGroupInformation ugi;
        KerberosKeytabUser kerberosUser;
        ExtendedConfiguration config = new ExtendedConfiguration(LOGGER);
        config.setClassLoader(Thread.currentThread().getContextClassLoader());
        for (String resource : this.resources) {
            config.addResource(new Path(resource));
        }
        if (this.storageLocation != null) {
            config.set(STORAGE_LOCATION_PROPERTY, this.storageLocation);
        }
        this.checkHdfsUriForTimeout((Configuration)config);
        String disableCacheName = String.format("fs.%s.impl.disable.cache", FileSystem.getDefaultUri((Configuration)config).getScheme());
        config.set(disableCacheName, "true");
        Object object = RESOURCES_LOCK;
        synchronized (object) {
            if (SecurityUtil.isSecurityEnabled((Configuration)config)) {
                String principal = (String)this.context.getProperties().get(KERBEROS_PRINCIPAL_PARAMETER);
                String keyTab = (String)this.context.getProperties().get(KERBEROS_KEYTAB_PARAMETER);
                String password = (String)this.context.getProperties().get(KERBEROS_PASSWORD_PARAMETER);
                if (keyTab != null) {
                    kerberosUser = new KerberosKeytabUser(principal, keyTab);
                } else if (password != null) {
                    kerberosUser = new KerberosPasswordUser(principal, password);
                } else {
                    throw new IOException("Unable to authenticate with Kerberos, no keytab or password was provided");
                }
                ugi = SecurityUtil.getUgiForKerberosUser((Configuration)config, (KerberosUser)kerberosUser);
            } else {
                config.set("ipc.client.fallback-to-simple-auth-allowed", "true");
                config.set("hadoop.security.authentication", "simple");
                ugi = SecurityUtil.loginSimple((Configuration)config);
                kerberosUser = null;
            }
            fs = this.getFileSystemAsUser((Configuration)config, ugi);
        }
        LOGGER.debug("resetHDFSResources UGI [{}], KerberosUser [{}]", new Object[]{ugi, kerberosUser});
        Path workingDir = fs.getWorkingDirectory();
        LOGGER.debug("Initialized a new HDFS File System with working dir: {} default block size: {} default replication: {} config: {}", new Object[]{workingDir, fs.getDefaultBlockSize(workingDir), fs.getDefaultReplication(workingDir), config.toString()});
        if (!fs.exists(this.sourceDirectory)) {
            throw new IllegalArgumentException("Source directory is not existing");
        }
        return new HdfsResources((Configuration)config, fs, ugi, (KerberosUser)kerberosUser);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void checkHdfsUriForTimeout(Configuration config) throws IOException {
        URI hdfsUri = FileSystem.getDefaultUri((Configuration)config);
        String address = hdfsUri.getAuthority();
        int port = hdfsUri.getPort();
        if (address == null || address.isEmpty() || port < 0) {
            return;
        }
        InetSocketAddress namenode = NetUtils.createSocketAddr((String)address, (int)port);
        SocketFactory socketFactory = NetUtils.getDefaultSocketFactory((Configuration)config);
        Socket socket = null;
        try {
            socket = socketFactory.createSocket();
            NetUtils.connect((Socket)socket, (SocketAddress)namenode, (int)1000);
        }
        finally {
            IOUtils.closeQuietly((Socket)socket);
        }
    }

    private FileSystem getFileSystemAsUser(final Configuration config, UserGroupInformation ugi) throws IOException {
        try {
            return (FileSystem)ugi.doAs((PrivilegedExceptionAction)new PrivilegedExceptionAction<FileSystem>(){

                @Override
                public FileSystem run() throws Exception {
                    return FileSystem.get((Configuration)config);
                }
            });
        }
        catch (InterruptedException e) {
            throw new IOException("Unable to create file system: " + e.getMessage(), e);
        }
    }
}

