/*
 * Decompiled with CFR 0.152.
 */
package io.druid.segment.realtime.plumber;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.common.guava.ThreadRenamingCallable;
import io.druid.java.util.common.DateTimes;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.concurrent.ScheduledExecutors;
import io.druid.java.util.common.granularity.Granularity;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.java.util.emitter.service.ServiceEmitter;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
import io.druid.segment.realtime.plumber.RealtimePlumber;
import io.druid.segment.realtime.plumber.Sink;
import io.druid.server.coordination.DataSegmentAnnouncer;
import java.util.ArrayList;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

/**
 * A {@link RealtimePlumber} that periodically abandons sinks instead of keeping them around.
 *
 * <p>A recurring "flusher" task looks for sinks whose interval start is older than the
 * rejection policy's current max time minus the configured window period, and schedules each
 * of them to be abandoned once {@code flushDuration} has elapsed.
 *
 * <p>The constructor passes {@code null} for three of the superclass's collaborators.
 * NOTE(review): these are presumably the hand-off related ones; confirm against
 * {@code RealtimePlumber}'s constructor.
 */
public class FlushingPlumber extends RealtimePlumber {
    private static final EmittingLogger log = new EmittingLogger(FlushingPlumber.class);

    private final DataSchema schema;
    private final RealtimeTuningConfig config;
    /** Delay between a sink being selected for flushing and it actually being abandoned. */
    private final Duration flushDuration;
    /** Created lazily in {@link #startJob()}; runs both the flusher task and the delayed abandons. */
    private volatile ScheduledExecutorService flushScheduledExec = null;
    /** Raised by {@link #finishJob()}; polled by the flusher task to end its repetition. */
    private volatile boolean stopped = false;

    public FlushingPlumber(Duration flushDuration, DataSchema schema, RealtimeTuningConfig config, FireDepartmentMetrics metrics, ServiceEmitter emitter, QueryRunnerFactoryConglomerate conglomerate, DataSegmentAnnouncer segmentAnnouncer, ExecutorService queryExecutorService, IndexMerger indexMerger, IndexIO indexIO, Cache cache, CacheConfig cacheConfig, ObjectMapper objectMapper) {
        super(schema, config, metrics, emitter, conglomerate, segmentAnnouncer, queryExecutorService, null, null, null, indexMerger, indexIO, cache, cacheConfig, objectMapper);
        this.flushDuration = flushDuration;
        this.schema = schema;
        this.config = config;
    }

    /**
     * Creates the base directory, starts the executors, restores sinks from disk and starts the
     * recurring flusher task.
     *
     * @return whatever {@code bootstrapSinksFromDisk()} returns
     */
    @Override
    public Object startJob() {
        log.info("Starting job for %s", getSchema().getDataSource());
        computeBaseDir(getSchema()).mkdirs();
        initializeExecutors();
        if (flushScheduledExec == null) {
            flushScheduledExec = Execs.scheduledSingleThreaded("flushing_scheduled_%d");
        }
        Object retVal = bootstrapSinksFromDisk();
        startFlushThread();
        return retVal;
    }

    /**
     * Schedules {@code sink} to be abandoned once {@code flushDuration} has elapsed.
     *
     * @param truncatedTime the key under which the sink is registered
     * @param sink          the sink to abandon
     */
    protected void flushAfterDuration(final long truncatedTime, final Sink sink) {
        // Add the Duration itself: narrowing its millis to int overflows for long flush durations.
        log.info("Abandoning segment %s at %s", sink.getSegment().getIdentifier(), DateTimes.nowUtc().plus(flushDuration));
        ScheduledExecutors.scheduleWithFixedDelay(flushScheduledExec, flushDuration, new Callable<ScheduledExecutors.Signal>() {

            @Override
            public ScheduledExecutors.Signal call() throws Exception {
                log.info("Abandoning segment %s", sink.getSegment().getIdentifier());
                abandonSegment(truncatedTime, sink);
                // One-shot: returning STOP prevents the task from being rescheduled.
                return ScheduledExecutors.Signal.STOP;
            }
        });
    }

    /**
     * Schedules the recurring flusher task. Its first run is at the end of the current segment
     * granularity bucket plus the window period; it then repeats once per bucket length.
     */
    private void startFlushThread() {
        final Granularity segmentGranularity = schema.getGranularitySpec().getSegmentGranularity();
        final DateTime truncatedNow = segmentGranularity.bucketStart(DateTimes.nowUtc());
        final DateTime nextBucketStart = segmentGranularity.increment(truncatedNow);
        final long windowMillis = config.getWindowPeriod().toStandardDuration().getMillis();

        // Computed once so the logged time and the scheduled delay are derived from the same clock read.
        final Duration initialDelay = new Duration(System.currentTimeMillis(), nextBucketStart.getMillis() + windowMillis);
        final Duration rate = new Duration(truncatedNow, nextBucketStart);
        log.info("Expect to run at [%s]", DateTimes.nowUtc().plus(initialDelay));

        String threadName = StringUtils.format("%s-flusher-%d", getSchema().getDataSource(), getConfig().getShardSpec().getPartitionNum());
        ThreadRenamingCallable<ScheduledExecutors.Signal> flusher = new ThreadRenamingCallable<ScheduledExecutors.Signal>(threadName) {

            @Override
            public ScheduledExecutors.Signal doCall() {
                if (stopped) {
                    log.info("Stopping flusher thread");
                    return ScheduledExecutors.Signal.STOP;
                }

                long minTimestamp = segmentGranularity.bucketStart(getRejectionPolicy().getCurrMaxTime().minus(windowMillis)).getMillis();

                // Collect first, then schedule, so scheduling never happens while iterating the sink map.
                ArrayList<Map.Entry<Long, Sink>> sinksToPush = new ArrayList<>();
                for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
                    if (entry.getKey() < minTimestamp) {
                        log.info("Adding entry[%s] to flush.", entry);
                        sinksToPush.add(entry);
                    }
                }
                for (Map.Entry<Long, Sink> entry : sinksToPush) {
                    flushAfterDuration(entry.getKey(), entry.getValue());
                }

                // Re-check: finishJob() may have run while this pass was in progress.
                if (stopped) {
                    log.info("Stopping flusher thread");
                    return ScheduledExecutors.Signal.STOP;
                }
                return ScheduledExecutors.Signal.REPEAT;
            }
        };

        ScheduledExecutors.scheduleAtFixedRate(flushScheduledExec, initialDelay, rate, flusher);
    }

    /**
     * Abandons every remaining sink and shuts down all executors.
     */
    @Override
    public void finishJob() {
        log.info("Stopping job");
        // Raise the flag before tearing anything down so a flusher tick that fires during
        // shutdown bails out instead of submitting new work to an executor that is shutting down.
        stopped = true;

        for (Map.Entry<Long, Sink> entry : getSinks().entrySet()) {
            abandonSegment(entry.getKey(), entry.getValue());
        }
        shutdownExecutors();
        if (flushScheduledExec != null) {
            flushScheduledExec.shutdown();
        }
    }
}

