/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.filecache;

import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileCache {
    private static final Logger LOG = LoggerFactory.getLogger(FileCache.class);
    private final Object lock = new Object();
    private final Map<JobID, Map<String, Future<Path>>> entries;
    private final Map<JobID, Set<ExecutionAttemptID>> jobRefHolders;
    private final ScheduledExecutorService executorService;
    private final File[] storageDirectories;
    private final Thread shutdownHook;
    private int nextDirectory;
    private final PermanentBlobService blobService;
    private final long cleanupInterval;

    public FileCache(String[] tempDirectories, PermanentBlobService blobService) throws IOException {
        this(tempDirectories, blobService, Executors.newScheduledThreadPool(10, new ExecutorThreadFactory("flink-file-cache")), 5000L);
    }

    @VisibleForTesting
    FileCache(String[] tempDirectories, PermanentBlobService blobService, ScheduledExecutorService executorService, long cleanupInterval) throws IOException {
        Preconditions.checkNotNull((Object)tempDirectories);
        this.cleanupInterval = cleanupInterval;
        this.storageDirectories = new File[tempDirectories.length];
        for (int i = 0; i < tempDirectories.length; ++i) {
            String cacheDirName = "flink-dist-cache-" + UUID.randomUUID().toString();
            this.storageDirectories[i] = new File(tempDirectories[i], cacheDirName);
            String path = this.storageDirectories[i].getAbsolutePath();
            if (!this.storageDirectories[i].mkdirs()) {
                LOG.error("User file cache cannot create directory " + path);
                for (int k = 0; k < i; ++k) {
                    if (this.storageDirectories[k].delete()) continue;
                    LOG.warn("User file cache cannot remove prior directory " + this.storageDirectories[k].getAbsolutePath());
                }
                throw new IOException("File cache cannot create temp storage directory: " + path);
            }
            LOG.info("User file cache uses directory " + path);
        }
        this.shutdownHook = FileCache.createShutdownHook(this, LOG);
        this.entries = new HashMap<JobID, Map<String, Future<Path>>>();
        this.jobRefHolders = new HashMap<JobID, Set<ExecutionAttemptID>>();
        this.executorService = executorService;
        this.blobService = blobService;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void shutdown() {
        Object object = this.lock;
        synchronized (object) {
            ScheduledExecutorService es = this.executorService;
            if (es != null) {
                es.shutdown();
                try {
                    es.awaitTermination(this.cleanupInterval, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            this.entries.clear();
            this.jobRefHolders.clear();
            for (File dir : this.storageDirectories) {
                try {
                    FileUtils.deleteDirectory((File)dir);
                }
                catch (IOException e) {
                    LOG.error("File cache could not properly clean up storage directory.");
                }
            }
            ShutdownHookUtil.removeShutdownHook((Thread)this.shutdownHook, (String)this.getClass().getSimpleName(), (Logger)LOG);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Future<Path> createTmpFile(String name, DistributedCache.DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            Map jobEntries = this.entries.computeIfAbsent(jobID, k -> new HashMap());
            Set refHolders = this.jobRefHolders.computeIfAbsent(jobID, id -> new HashSet());
            refHolders.add(executionId);
            Future fileEntry = (Future)jobEntries.get(name);
            if (fileEntry != null) {
                return fileEntry;
            }
            File tempDirToUse = new File(this.storageDirectories[this.nextDirectory++], jobID.toString());
            if (this.nextDirectory >= this.storageDirectories.length) {
                this.nextDirectory = 0;
            }
            Callable<Path> cp = entry.blobKey != null ? new CopyFromBlobProcess(entry, jobID, this.blobService, new Path(tempDirToUse.getAbsolutePath())) : new CopyFromDFSProcess(entry, new Path(tempDirToUse.getAbsolutePath()));
            FutureTask<Path> copyTask = new FutureTask<Path>(cp);
            this.executorService.submit(copyTask);
            jobEntries.put(name, copyTask);
            return copyTask;
        }
    }

    private static Thread createShutdownHook(FileCache cache, Logger logger) {
        return ShutdownHookUtil.addShutdownHook(cache::shutdown, (String)FileCache.class.getSimpleName(), (Logger)logger);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void releaseJob(JobID jobId, ExecutionAttemptID executionId) {
        Preconditions.checkNotNull((Object)jobId);
        Object object = this.lock;
        synchronized (object) {
            Set<ExecutionAttemptID> jobRefCounter = this.jobRefHolders.get(jobId);
            if (jobRefCounter == null || jobRefCounter.isEmpty()) {
                return;
            }
            jobRefCounter.remove((Object)executionId);
            if (jobRefCounter.isEmpty()) {
                this.executorService.schedule(new DeleteProcess(jobId), this.cleanupInterval, TimeUnit.MILLISECONDS);
            }
        }
    }

    @VisibleForTesting
    class DeleteProcess
    implements Runnable {
        private final JobID jobID;

        DeleteProcess(JobID jobID) {
            this.jobID = jobID;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            try {
                Object object = FileCache.this.lock;
                synchronized (object) {
                    Set jobRefs = (Set)FileCache.this.jobRefHolders.get(this.jobID);
                    if (jobRefs != null && jobRefs.isEmpty()) {
                        for (Future fileFuture : ((Map)FileCache.this.entries.get(this.jobID)).values()) {
                            fileFuture.cancel(true);
                        }
                        FileCache.this.entries.remove(this.jobID);
                        FileCache.this.jobRefHolders.remove(this.jobID);
                        for (File storageDirectory : FileCache.this.storageDirectories) {
                            File tempDir = new File(storageDirectory, this.jobID.toString());
                            FileUtils.deleteDirectory((File)tempDir);
                        }
                    }
                }
            }
            catch (IOException e) {
                LOG.error("Could not delete file from local file cache.", (Throwable)e);
            }
        }
    }

    private static class CopyFromDFSProcess
    implements Callable<Path> {
        private final Path filePath;
        private final Path cachedPath;
        private boolean executable;

        public CopyFromDFSProcess(DistributedCache.DistributedCacheEntry e, Path cachedPath) {
            this.filePath = new Path(e.filePath);
            this.executable = e.isExecutable;
            String sourceFile = e.filePath;
            int posOfSep = sourceFile.lastIndexOf("/");
            if (posOfSep > 0) {
                sourceFile = sourceFile.substring(posOfSep + 1);
            }
            this.cachedPath = new Path(cachedPath, sourceFile);
        }

        @Override
        public Path call() throws IOException {
            FileUtils.copy((Path)this.filePath, (Path)this.cachedPath, (boolean)this.executable);
            return this.cachedPath;
        }
    }

    private static class CopyFromBlobProcess
    implements Callable<Path> {
        private final PermanentBlobKey blobKey;
        private final Path target;
        private final boolean isDirectory;
        private final boolean isExecutable;
        private final JobID jobID;
        private final PermanentBlobService blobService;

        CopyFromBlobProcess(DistributedCache.DistributedCacheEntry e, JobID jobID, PermanentBlobService blobService, Path target) throws Exception {
            this.isExecutable = e.isExecutable;
            this.isDirectory = e.isZipped;
            this.jobID = jobID;
            this.blobService = blobService;
            this.blobKey = (PermanentBlobKey)InstantiationUtil.deserializeObject((byte[])e.blobKey, (ClassLoader)Thread.currentThread().getContextClassLoader());
            this.target = target;
        }

        @Override
        public Path call() throws IOException {
            File file = this.blobService.getFile(this.jobID, this.blobKey);
            if (this.isDirectory) {
                Path directory = FileUtils.expandDirectory((Path)new Path(file.getAbsolutePath()), (Path)this.target);
                return directory;
            }
            file.setExecutable(this.isExecutable);
            return Path.fromLocalFile((File)file);
        }
    }
}

