/*
 * Decompiled with CFR 0.152.
 */
package io.milvus.bulkwriter;

import com.google.common.collect.Lists;
import com.google.gson.JsonObject;
import io.milvus.bulkwriter.Buffer;
import io.milvus.bulkwriter.BulkWriter;
import io.milvus.bulkwriter.LocalBulkWriterParam;
import io.milvus.bulkwriter.common.clientenum.BulkFileType;
import io.milvus.param.collection.CollectionSchemaParam;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Bulk writer that flushes buffered rows to files below a local directory.
 *
 * <p>Each instance writes into its own sub-directory of the configured path, named after a
 * random UUID. Every flush runs on a dedicated thread and persists the current buffer into a
 * numbered sub-directory ({@code <localPath>/<uuid>/<flushCount>}).
 *
 * <p>Flush threads are tracked in {@link #workingThread}; that map is read by the calling
 * thread and modified by the flush threads, so it is a concurrent map.
 */
public class LocalBulkWriter
extends BulkWriter
implements AutoCloseable {
    private static final Logger logger = LoggerFactory.getLogger(LocalBulkWriter.class);

    /** Seconds to sleep between checks while waiting for a previous flush thread to finish. */
    private static final long FLUSH_WAIT_SECONDS = 5L;

    /** Root output directory; rewritten by {@link #makeDir()} to include the UUID component. */
    protected String localPath;
    private final String uuid;
    /** Number of flushes started so far; used as the name of the per-flush directory. */
    private int flushCount;
    /** Flush threads that have been started and have not finished yet, keyed by thread name. */
    private final Map<String, Thread> workingThread;
    /** Guards the "buffer full / buffer non-empty, so commit" decision. */
    private final ReentrantLock workingThreadLock;
    /** One entry per successful flush: the files produced by that flush. */
    private final List<List<String>> localFiles;
    private final Map<String, Object> config;

    /**
     * Creates a writer from a parameter object and prepares the output directory.
     *
     * @param bulkWriterParam source of schema, chunk size, file type, local path and config
     * @throws IOException if the output directory cannot be created
     */
    public LocalBulkWriter(LocalBulkWriterParam bulkWriterParam) throws IOException {
        super(bulkWriterParam.getCollectionSchema(), bulkWriterParam.getChunkSize(), bulkWriterParam.getFileType());
        this.localPath = bulkWriterParam.getLocalPath();
        this.uuid = UUID.randomUUID().toString();
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new ConcurrentHashMap<String, Thread>();
        this.localFiles = Lists.newArrayList();
        this.config = bulkWriterParam.getConfig();
        this.makeDir();
    }

    /**
     * Creates a writer from individual settings and prepares the output directory.
     *
     * @param collectionSchema schema passed to the parent writer
     * @param chunkSize        buffer size threshold passed to the parent writer
     * @param fileType         output file type passed to the parent writer
     * @param localPath        directory under which the per-writer UUID directory is created
     * @param config           extra settings copied into the config handed to each flush
     * @throws IOException if the output directory cannot be created
     */
    protected LocalBulkWriter(CollectionSchemaParam collectionSchema, int chunkSize, BulkFileType fileType, String localPath, Map<String, Object> config) throws IOException {
        super(collectionSchema, chunkSize, fileType);
        this.localPath = localPath;
        this.uuid = UUID.randomUUID().toString();
        this.workingThreadLock = new ReentrantLock();
        this.workingThread = new ConcurrentHashMap<String, Thread>();
        this.localFiles = Lists.newArrayList();
        this.config = config;
        this.makeDir();
    }

    /**
     * Appends a row to the buffer and starts an asynchronous flush once the buffer size
     * exceeds the chunk size.
     *
     * @param rowData row to append
     * @throws IOException          propagated from the parent writer
     * @throws InterruptedException if interrupted while waiting for a previous flush
     */
    @Override
    public void appendRow(JsonObject rowData) throws IOException, InterruptedException {
        super.appendRow(rowData);
        this.workingThreadLock.lock();
        try {
            if (super.getBufferSize() > super.getChunkSize()) {
                this.commit(true);
            }
        }
        finally {
            // Release even when commit() is interrupted, otherwise every later caller deadlocks.
            this.workingThreadLock.unlock();
        }
    }

    /**
     * Flushes the current buffer on a new thread.
     *
     * <p>Blocks (polling every {@value #FLUSH_WAIT_SECONDS} seconds) until no earlier flush
     * thread is still registered, then starts a new flush thread. When {@code async} is
     * {@code false} the call also waits for that thread to finish.
     *
     * <p>NOTE(review): with {@code async == true} the flush thread calls
     * {@code super.newBuffer()} concurrently with {@code super.commit(false)} on this thread;
     * confirm the parent class tolerates that ordering.
     *
     * @param async {@code true} to return without waiting for the flush thread
     * @throws InterruptedException if interrupted while sleeping or joining
     */
    @Override
    public void commit(boolean async) throws InterruptedException {
        while (!this.workingThread.isEmpty()) {
            logger.info("Previous flush action is not finished, {} is waiting...", Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(FLUSH_WAIT_SECONDS);
        }
        // Capture the counters before the flush thread swaps the buffer.
        int bufferRowCount = this.getBufferRowCount();
        int bufferSize = this.getBufferSize();
        logger.info("Prepare to flush buffer, row_count: {}, size: {}", bufferRowCount, bufferSize);

        Thread thread = new Thread(() -> this.flush(bufferSize, bufferRowCount));
        logger.info("Flush thread begin, name: {}", thread.getName());
        // Register before start() so the thread's own removal can never precede the insertion.
        this.workingThread.put(thread.getName(), thread);
        thread.start();
        if (!async) {
            logger.info("Wait flush to finish");
            thread.join();
        }
        super.commit(false);
        logger.info("Commit done with async={}", async);
    }

    /**
     * Body of a flush thread: swaps in a new buffer and persists the old one, if it has rows,
     * into the next numbered directory. The thread always unregisters itself from
     * {@link #workingThread}, even on failure, so that later commits are not blocked forever.
     *
     * @param bufferSize     buffer size captured by the committing thread, passed on via config
     * @param bufferRowCount row count captured by the committing thread, passed on via config
     */
    private void flush(Integer bufferSize, Integer bufferRowCount) {
        String threadName = Thread.currentThread().getName();
        try {
            ++this.flushCount;
            Path flushDirPath = Paths.get(this.localPath).resolve(String.valueOf(this.flushCount));
            // A null config is treated as "no extra settings" instead of failing the flush.
            HashMap<String, Object> flushConfig = this.config == null
                    ? new HashMap<String, Object>()
                    : new HashMap<String, Object>(this.config);
            flushConfig.put("bufferSize", bufferSize);
            flushConfig.put("bufferRowCount", bufferRowCount);
            Buffer oldBuffer = super.newBuffer();
            if (oldBuffer.getRowCount() > 0) {
                try {
                    List<String> fileList = oldBuffer.persist(flushDirPath.toString(), flushConfig);
                    this.localFiles.add(fileList);
                    this.callBack(fileList);
                }
                catch (IOException e) {
                    // Pass the exception itself so the stack trace is kept.
                    logger.error("Failed to persist buffer to {}", flushDirPath, e);
                }
            }
        }
        finally {
            this.workingThread.remove(threadName);
            logger.info("Flush thread done, name: {}", threadName);
        }
    }

    /**
     * Hook invoked on the flush thread after a buffer has been persisted. Does nothing here;
     * subclasses may override it.
     *
     * @param fileList files produced by the flush
     */
    protected void callBack(List<String> fileList) {
    }

    /** Returns the directory this writer writes into (including the UUID component). */
    @Override
    protected String getDataPath() {
        return this.localPath;
    }

    /**
     * Returns the files produced so far, one inner list per successful flush. The returned
     * list is the writer's own list, not a copy.
     */
    public List<List<String>> getBatchFiles() {
        return this.localFiles;
    }

    /**
     * Creates {@code <localPath>} and {@code <localPath>/<uuid>}, then points
     * {@link #localPath} at the latter.
     */
    private void makeDir() throws IOException {
        Path path = Paths.get(this.localPath);
        this.createDirIfNotExist(path);
        Path fullPath = path.resolve(this.uuid);
        this.createDirIfNotExist(fullPath);
        this.localPath = fullPath.toString();
    }

    /**
     * Creates the directory (and missing parents), logging the outcome.
     *
     * @throws IOException rethrown after logging when creation fails
     */
    private void createDirIfNotExist(Path path) throws IOException {
        try {
            Files.createDirectories(path);
            logger.info("Data path created: {}", path);
        }
        catch (IOException e) {
            logger.error("Data Path create failed: {}", path);
            throw e;
        }
    }

    /**
     * Flushes any remaining buffered data, waits for all flush threads to finish and removes
     * the output directory if it ended up empty.
     *
     * @throws InterruptedException if interrupted while committing or joining
     */
    protected void exit() throws InterruptedException {
        this.workingThreadLock.lock();
        try {
            if (this.getBufferSize() != null && this.getBufferSize() != 0) {
                this.commit(true);
            }
        }
        finally {
            this.workingThreadLock.unlock();
        }
        // Flush threads remove themselves concurrently; the concurrent map's iterator tolerates
        // that, and each entry keeps the Thread reference it was created with.
        for (Map.Entry<String, Thread> entry : this.workingThread.entrySet()) {
            logger.info("Wait flush thread '{}' to finish", entry.getKey());
            entry.getValue().join();
        }
        this.rmDir();
    }

    /** Deletes the output directory when it exists and is empty; failures are only logged. */
    private void rmDir() {
        try {
            Path path = Paths.get(this.localPath);
            if (Files.exists(path) && this.isDirectoryEmpty(path)) {
                Files.delete(path);
                logger.info("Delete local directory {}", this.localPath);
            }
        }
        catch (IOException e) {
            logger.error("Error while deleting directory: {}", e.getMessage(), e);
        }
    }

    /** Returns {@code true} when the directory has no entries. */
    private boolean isDirectoryEmpty(Path path) throws IOException {
        try (DirectoryStream<Path> dirStream = Files.newDirectoryStream(path)) {
            return !dirStream.iterator().hasNext();
        }
    }

    /** Returns the random UUID that names this writer's output directory. */
    protected String getUUID() {
        return this.uuid;
    }

    /**
     * Runs {@link #exit()} so that buffered rows are flushed and an empty output directory is
     * removed.
     */
    @Override
    public void close() throws Exception {
        logger.info("execute remaining actions to prevent loss of memory data or residual empty directories.");
        this.exit();
        logger.info("LocalBulkWriter done! output local files: {}", this.getBatchFiles());
    }
}

