/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.coordinator.helper;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import it.unimi.dsi.fastutil.objects.Object2LongMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.druid.client.indexing.ClientCompactQuery;
import org.apache.druid.client.indexing.ClientCompactQueryTuningConfig;
import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.TaskPayloadResponse;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorStats;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.DruidCoordinatorRuntimeParams;
import org.apache.druid.server.coordinator.helper.CompactionSegmentIterator;
import org.apache.druid.server.coordinator.helper.CompactionSegmentSearchPolicy;
import org.apache.druid.server.coordinator.helper.DruidCoordinatorHelper;
import org.apache.druid.server.coordinator.helper.NewestSegmentFirstPolicy;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.VersionedIntervalTimeline;
import org.joda.time.Interval;

public class DruidCoordinatorSegmentCompactor
implements DruidCoordinatorHelper {
    static final String COMPACT_TASK_COUNT = "compactTaskCount";
    static final String SEGMENT_SIZE_WAIT_COMPACT = "segmentSizeWaitCompact";
    private static final String COMPACT_TASK_TYPE = "compact";
    private static final String STORE_COMPACTION_STATE_KEY = "storeCompactionState";
    private static final Logger LOG = new Logger(DruidCoordinatorSegmentCompactor.class);
    private final CompactionSegmentSearchPolicy policy;
    private final IndexingServiceClient indexingServiceClient;
    private Object2LongMap<String> remainingSegmentSizeBytes;

    @Inject
    public DruidCoordinatorSegmentCompactor(ObjectMapper objectMapper, IndexingServiceClient indexingServiceClient) {
        this.policy = new NewestSegmentFirstPolicy(objectMapper);
        this.indexingServiceClient = indexingServiceClient;
    }

    @Override
    public DruidCoordinatorRuntimeParams run(DruidCoordinatorRuntimeParams params) {
        LOG.info("Run coordinator segment compactor", new Object[0]);
        CoordinatorCompactionConfig dynamicConfig = params.getCoordinatorCompactionConfig();
        CoordinatorStats stats = new CoordinatorStats();
        if (dynamicConfig.getMaxCompactionTaskSlots() > 0) {
            Map<String, VersionedIntervalTimeline<String, DataSegment>> dataSources = params.getUsedSegmentsTimelinesPerDataSource();
            List<DataSourceCompactionConfig> compactionConfigList = dynamicConfig.getCompactionConfigs();
            if (compactionConfigList != null && !compactionConfigList.isEmpty()) {
                Map<String, DataSourceCompactionConfig> compactionConfigs = compactionConfigList.stream().collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
                List<TaskStatusPlus> compactTasks = DruidCoordinatorSegmentCompactor.filterNonCompactTasks(this.indexingServiceClient.getActiveTasks());
                HashMap<String, List<Interval>> compactTaskIntervals = new HashMap<String, List<Interval>>(compactionConfigList.size());
                int numEstimatedNonCompleteCompactionTasks = 0;
                for (TaskStatusPlus status : compactTasks) {
                    TaskPayloadResponse response = this.indexingServiceClient.getTaskPayload(status.getId());
                    if (response == null) {
                        throw new ISE("WTH? got a null paylord from overlord for task[%s]", new Object[]{status.getId()});
                    }
                    if (COMPACT_TASK_TYPE.equals(response.getPayload().getType())) {
                        ClientCompactQuery compactQuery = (ClientCompactQuery)response.getPayload();
                        Interval interval = compactQuery.getIoConfig().getInputSpec().getInterval();
                        compactTaskIntervals.computeIfAbsent(status.getDataSource(), k -> new ArrayList()).add(interval);
                        int numSubTasks = this.findNumMaxConcurrentSubTasks(compactQuery.getTuningConfig());
                        numEstimatedNonCompleteCompactionTasks += numSubTasks + 1;
                        continue;
                    }
                    throw new ISE("WTH? task[%s] is not a compactTask?", new Object[]{status.getId()});
                }
                CompactionSegmentIterator iterator = this.policy.reset(compactionConfigs, dataSources, compactTaskIntervals);
                int compactionTaskCapacity = (int)Math.min((double)this.indexingServiceClient.getTotalWorkerCapacity() * dynamicConfig.getCompactionTaskSlotRatio(), (double)dynamicConfig.getMaxCompactionTaskSlots());
                int numAvailableCompactionTaskSlots = numEstimatedNonCompleteCompactionTasks > 0 ? Math.max(0, compactionTaskCapacity - numEstimatedNonCompleteCompactionTasks) : Math.max(1, compactionTaskCapacity);
                LOG.info("Found [%d] available task slots for compaction out of [%d] max compaction task capacity", new Object[]{numAvailableCompactionTaskSlots, compactionTaskCapacity});
                if (numAvailableCompactionTaskSlots > 0) {
                    stats.accumulate(this.doRun(compactionConfigs, numAvailableCompactionTaskSlots, iterator));
                } else {
                    stats.accumulate(this.makeStats(0, iterator));
                }
            } else {
                LOG.info("compactionConfig is empty. Skip.", new Object[0]);
            }
        } else {
            LOG.info("maxCompactionTaskSlots was set to 0. Skip compaction", new Object[0]);
        }
        return params.buildFromExisting().withCoordinatorStats(stats).build();
    }

    private int findNumMaxConcurrentSubTasks(@Nullable ClientCompactQueryTuningConfig tuningConfig) {
        if (tuningConfig != null && tuningConfig.getMaxNumConcurrentSubTasks() != null) {
            return tuningConfig.getMaxNumConcurrentSubTasks();
        }
        return 0;
    }

    private static List<TaskStatusPlus> filterNonCompactTasks(List<TaskStatusPlus> taskStatuses) {
        return taskStatuses.stream().filter(status -> {
            String taskType = status.getType();
            return taskType == null || COMPACT_TASK_TYPE.equals(taskType);
        }).collect(Collectors.toList());
    }

    private CoordinatorStats doRun(Map<String, DataSourceCompactionConfig> compactionConfigs, int numAvailableCompactionTaskSlots, CompactionSegmentIterator iterator) {
        int numSubmittedTasks;
        DataSourceCompactionConfig config;
        for (numSubmittedTasks = 0; iterator.hasNext() && numSubmittedTasks < numAvailableCompactionTaskSlots; numSubmittedTasks += this.findNumMaxConcurrentSubTasks(config.getTuningConfig()) + 1) {
            List segmentsToCompact = (List)iterator.next();
            if (!segmentsToCompact.isEmpty()) {
                String dataSourceName = ((DataSegment)segmentsToCompact.get(0)).getDataSource();
                config = compactionConfigs.get(dataSourceName);
                String taskId = this.indexingServiceClient.compactSegments(segmentsToCompact, config.getTaskPriority(), ClientCompactQueryTuningConfig.from(config.getTuningConfig(), config.getMaxRowsPerSegment()), this.newAutoCompactionContext(config.getTaskContext()));
                LOG.info("Submitted a compactTask[%s] for segments %s", new Object[]{taskId, Iterables.transform((Iterable)segmentsToCompact, DataSegment::getId)});
                continue;
            }
            throw new ISE("segmentsToCompact is empty?", new Object[0]);
        }
        return this.makeStats(numSubmittedTasks, iterator);
    }

    private Map<String, Object> newAutoCompactionContext(@Nullable Map<String, Object> configuredContext) {
        HashMap<String, Object> newContext = configuredContext == null ? new HashMap<String, Object>() : new HashMap<String, Object>(configuredContext);
        newContext.put(STORE_COMPACTION_STATE_KEY, true);
        return newContext;
    }

    private CoordinatorStats makeStats(int numCompactionTasks, CompactionSegmentIterator iterator) {
        CoordinatorStats stats = new CoordinatorStats();
        stats.addToGlobalStat(COMPACT_TASK_COUNT, numCompactionTasks);
        this.remainingSegmentSizeBytes = iterator.remainingSegmentSizeBytes();
        iterator.remainingSegmentSizeBytes().object2LongEntrySet().fastForEach(entry -> {
            String dataSource = (String)entry.getKey();
            long numSegmentsWaitCompact = entry.getLongValue();
            stats.addToDataSourceStat(SEGMENT_SIZE_WAIT_COMPACT, dataSource, numSegmentsWaitCompact);
        });
        return stats;
    }

    public long getRemainingSegmentSizeBytes(String dataSource) {
        return this.remainingSegmentSizeBytes.getLong((Object)dataSource);
    }
}

