/*
 * Decompiled with CFR 0.152.
 */
package io.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Charsets;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;
import io.druid.client.CacheUtil;
import io.druid.client.DruidServer;
import io.druid.client.ServerView;
import io.druid.client.TimelineServerView;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
import io.druid.guice.annotations.BackgroundCaching;
import io.druid.guice.annotations.Smile;
import io.druid.java.util.common.Intervals;
import io.druid.java.util.common.Pair;
import io.druid.java.util.common.StringUtils;
import io.druid.java.util.common.concurrent.Execs;
import io.druid.java.util.common.guava.BaseSequence;
import io.druid.java.util.common.guava.LazySequence;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.emitter.EmittingLogger;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryContexts;
import io.druid.query.QueryPlus;
import io.druid.query.QueryRunner;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.QueryToolChest;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.query.SegmentDescriptor;
import io.druid.query.aggregation.MetricManipulatorFns;
import io.druid.query.filter.DimFilter;
import io.druid.query.filter.DimFilterUtils;
import io.druid.query.spec.MultipleSpecificSegmentSpec;
import io.druid.query.spec.QuerySegmentSpec;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;
import io.druid.timeline.partition.PartitionHolder;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.codec.binary.Base64;
import org.joda.time.Interval;

public class CachingClusteredClient
implements QuerySegmentWalker {
    private static final EmittingLogger log = new EmittingLogger(CachingClusteredClient.class);
    private final QueryToolChestWarehouse warehouse;
    private final TimelineServerView serverView;
    private final Cache cache;
    private final ObjectMapper objectMapper;
    private final CacheConfig cacheConfig;
    private final ListeningExecutorService backgroundExecutorService;

    @Inject
    public CachingClusteredClient(QueryToolChestWarehouse warehouse, TimelineServerView serverView, Cache cache, @Smile ObjectMapper objectMapper, @BackgroundCaching ExecutorService backgroundExecutorService, CacheConfig cacheConfig) {
        this.warehouse = warehouse;
        this.serverView = serverView;
        this.cache = cache;
        this.objectMapper = objectMapper;
        this.cacheConfig = cacheConfig;
        this.backgroundExecutorService = MoreExecutors.listeningDecorator((ExecutorService)backgroundExecutorService);
        if (cacheConfig.isQueryCacheable("groupBy")) {
            log.warn("Even though groupBy caching is enabled, v2 groupBys will not be cached. Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching.", new Object[0]);
        }
        serverView.registerSegmentCallback(Execs.singleThreaded((String)"CCClient-ServerView-CB-%d"), new ServerView.BaseSegmentCallback(){

            @Override
            public ServerView.CallbackAction segmentRemoved(DruidServerMetadata server, DataSegment segment) {
                CachingClusteredClient.this.cache.close(segment.getIdentifier());
                return ServerView.CallbackAction.CONTINUE;
            }
        });
    }

    public <T> QueryRunner<T> getQueryRunnerForIntervals(Query<T> query, Iterable<Interval> intervals) {
        return new QueryRunner<T>(){

            public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
                return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> timeline);
            }
        };
    }

    private <T> Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext, UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter) {
        return new SpecificQueryRunnable<T>(queryPlus, responseContext).run(timelineConverter);
    }

    public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, final Iterable<SegmentDescriptor> specs) {
        return new QueryRunner<T>(){

            public Sequence<T> run(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
                return CachingClusteredClient.this.run(queryPlus, responseContext, timeline -> {
                    VersionedIntervalTimeline timeline2 = new VersionedIntervalTimeline((Comparator)Ordering.natural());
                    for (SegmentDescriptor spec : specs) {
                        PartitionChunk chunk;
                        PartitionHolder entry = timeline.findEntry(spec.getInterval(), (Object)spec.getVersion());
                        if (entry == null || (chunk = entry.getChunk(spec.getPartitionNumber())) == null) continue;
                        timeline2.add(spec.getInterval(), (Object)spec.getVersion(), chunk);
                    }
                    return timeline2;
                });
            }
        };
    }

    private class CachePopulator {
        private final Cache cache;
        private final ObjectMapper mapper;
        private final Cache.NamedKey key;
        private final ConcurrentLinkedQueue<ListenableFuture<Object>> cacheFutures = new ConcurrentLinkedQueue();

        CachePopulator(Cache cache, ObjectMapper mapper, Cache.NamedKey key) {
            this.cache = cache;
            this.mapper = mapper;
            this.key = key;
        }

        public void populate() {
            Futures.addCallback((ListenableFuture)Futures.allAsList(this.cacheFutures), (FutureCallback)new FutureCallback<List<Object>>(){

                public void onSuccess(List<Object> cacheData) {
                    CacheUtil.populate(CachePopulator.this.cache, CachePopulator.this.mapper, CachePopulator.this.key, cacheData);
                    CachePopulator.this.cacheFutures.clear();
                }

                public void onFailure(Throwable throwable) {
                    log.error(throwable, "Background caching failed", new Object[0]);
                }
            }, (Executor)CachingClusteredClient.this.backgroundExecutorService);
        }
    }

    private static class ServerToSegment
    extends Pair<ServerSelector, SegmentDescriptor> {
        private ServerToSegment(ServerSelector server, SegmentDescriptor segment) {
            super((Object)server, (Object)segment);
        }

        ServerSelector getServer() {
            return (ServerSelector)this.lhs;
        }

        SegmentDescriptor getSegmentDescriptor() {
            return (SegmentDescriptor)this.rhs;
        }
    }

    private class SpecificQueryRunnable<T> {
        private final QueryPlus<T> queryPlus;
        private final Map<String, Object> responseContext;
        private final Query<T> query;
        private final QueryToolChest<T, Query<T>> toolChest;
        @Nullable
        private final CacheStrategy<T, Object, Query<T>> strategy;
        private final boolean useCache;
        private final boolean populateCache;
        private final boolean isBySegment;
        private final int uncoveredIntervalsLimit;
        private final Query<T> downstreamQuery;
        private final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();

        SpecificQueryRunnable(QueryPlus<T> queryPlus, Map<String, Object> responseContext) {
            this.queryPlus = queryPlus;
            this.responseContext = responseContext;
            this.query = queryPlus.getQuery();
            this.toolChest = CachingClusteredClient.this.warehouse.getToolChest(this.query);
            this.strategy = this.toolChest.getCacheStrategy(this.query);
            this.useCache = CacheUtil.useCacheOnBrokers(this.query, this.strategy, CachingClusteredClient.this.cacheConfig);
            this.populateCache = CacheUtil.populateCacheOnBrokers(this.query, this.strategy, CachingClusteredClient.this.cacheConfig);
            this.isBySegment = QueryContexts.isBySegment(this.query);
            this.uncoveredIntervalsLimit = QueryContexts.getUncoveredIntervalsLimit(this.query);
            this.downstreamQuery = this.query.withOverriddenContext(this.makeDownstreamQueryContext());
        }

        private ImmutableMap<String, Object> makeDownstreamQueryContext() {
            ImmutableMap.Builder contextBuilder = new ImmutableMap.Builder();
            int priority = QueryContexts.getPriority(this.query);
            contextBuilder.put((Object)"priority", (Object)priority);
            if (this.populateCache) {
                contextBuilder.put((Object)"populateCache", (Object)false);
                contextBuilder.put((Object)"bySegment", (Object)true);
            }
            return contextBuilder.build();
        }

        Sequence<T> run(UnaryOperator<TimelineLookup<String, ServerSelector>> timelineConverter) {
            TimelineLookup timeline = CachingClusteredClient.this.serverView.getTimeline(this.query.getDataSource());
            if (timeline == null) {
                return Sequences.empty();
            }
            timeline = (TimelineLookup)timelineConverter.apply(timeline);
            if (this.uncoveredIntervalsLimit > 0) {
                this.computeUncoveredIntervals((TimelineLookup<String, ServerSelector>)timeline);
            }
            Set<ServerToSegment> segments = this.computeSegmentsToQuery((TimelineLookup<String, ServerSelector>)timeline);
            byte[] queryCacheKey = this.computeQueryCacheKey();
            if (this.query.getContext().get("If-None-Match") != null) {
                String prevEtag = (String)this.query.getContext().get("If-None-Match");
                String currentEtag = this.computeCurrentEtag(segments, queryCacheKey);
                if (currentEtag != null && currentEtag.equals(prevEtag)) {
                    return Sequences.empty();
                }
            }
            List<Pair<Interval, byte[]>> alreadyCachedResults = this.pruneSegmentsWithCachedResults(queryCacheKey, segments);
            SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer = this.groupSegmentsByServer(segments);
            return new LazySequence(() -> {
                ArrayList<Sequence<T>> sequencesByInterval = new ArrayList<Sequence<T>>(alreadyCachedResults.size() + segmentsByServer.size());
                this.addSequencesFromCache(sequencesByInterval, alreadyCachedResults);
                this.addSequencesFromServer(sequencesByInterval, segmentsByServer);
                return Sequences.simple(sequencesByInterval).flatMerge(seq -> seq, this.query.getResultOrdering());
            });
        }

        private Set<ServerToSegment> computeSegmentsToQuery(TimelineLookup<String, ServerSelector> timeline) {
            List serversLookup = this.toolChest.filterSegments(this.query, this.query.getIntervals().stream().flatMap(i -> timeline.lookup(i).stream()).collect(Collectors.toList()));
            LinkedHashSet segments = Sets.newLinkedHashSet();
            HashMap dimensionRangeCache = Maps.newHashMap();
            for (TimelineObjectHolder holder : serversLookup) {
                Set filteredChunks = DimFilterUtils.filterShards((DimFilter)this.query.getFilter(), (Iterable)holder.getObject(), partitionChunk -> ((ServerSelector)partitionChunk.getObject()).getSegment().getShardSpec(), (Map)dimensionRangeCache);
                for (PartitionChunk chunk : filteredChunks) {
                    ServerSelector server = (ServerSelector)chunk.getObject();
                    SegmentDescriptor segment = new SegmentDescriptor(holder.getInterval(), (String)holder.getVersion(), chunk.getChunkNumber());
                    segments.add(new ServerToSegment(server, segment));
                }
            }
            return segments;
        }

        private void computeUncoveredIntervals(TimelineLookup<String, ServerSelector> timeline) {
            ArrayList<Interval> uncoveredIntervals = new ArrayList<Interval>(this.uncoveredIntervalsLimit);
            boolean uncoveredIntervalsOverflowed = false;
            for (Interval interval : this.query.getIntervals()) {
                List lookup = timeline.lookup(interval);
                long startMillis = interval.getStartMillis();
                long endMillis = interval.getEndMillis();
                for (TimelineObjectHolder holder : lookup) {
                    Interval holderInterval = holder.getInterval();
                    long intervalStart = holderInterval.getStartMillis();
                    if (!uncoveredIntervalsOverflowed && startMillis != intervalStart) {
                        if (this.uncoveredIntervalsLimit > uncoveredIntervals.size()) {
                            uncoveredIntervals.add(Intervals.utc((long)startMillis, (long)intervalStart));
                        } else {
                            uncoveredIntervalsOverflowed = true;
                        }
                    }
                    startMillis = holderInterval.getEndMillis();
                }
                if (uncoveredIntervalsOverflowed || startMillis >= endMillis) continue;
                if (this.uncoveredIntervalsLimit > uncoveredIntervals.size()) {
                    uncoveredIntervals.add(Intervals.utc((long)startMillis, (long)endMillis));
                    continue;
                }
                uncoveredIntervalsOverflowed = true;
            }
            if (!uncoveredIntervals.isEmpty()) {
                this.responseContext.put("uncoveredIntervals", uncoveredIntervals);
                this.responseContext.put("uncoveredIntervalsOverflowed", uncoveredIntervalsOverflowed);
            }
        }

        @Nullable
        private byte[] computeQueryCacheKey() {
            if ((this.populateCache || this.useCache) && !this.isBySegment) {
                assert (this.strategy != null);
                return this.strategy.computeCacheKey(this.query);
            }
            return null;
        }

        @Nullable
        private String computeCurrentEtag(Set<ServerToSegment> segments, @Nullable byte[] queryCacheKey) {
            Hasher hasher = Hashing.sha1().newHasher();
            boolean hasOnlyHistoricalSegments = true;
            for (ServerToSegment p : segments) {
                if (!p.getServer().pick().getServer().segmentReplicatable()) {
                    hasOnlyHistoricalSegments = false;
                    break;
                }
                hasher.putString((CharSequence)p.getServer().getSegment().getIdentifier(), Charsets.UTF_8);
            }
            if (hasOnlyHistoricalSegments) {
                hasher.putBytes(queryCacheKey == null ? this.strategy.computeCacheKey(this.query) : queryCacheKey);
                String currEtag = Base64.encodeBase64String((byte[])hasher.hash().asBytes());
                this.responseContext.put("ETag", currEtag);
                return currEtag;
            }
            return null;
        }

        private List<Pair<Interval, byte[]>> pruneSegmentsWithCachedResults(byte[] queryCacheKey, Set<ServerToSegment> segments) {
            if (queryCacheKey == null) {
                return Collections.emptyList();
            }
            ArrayList alreadyCachedResults = Lists.newArrayList();
            Map<ServerToSegment, Cache.NamedKey> perSegmentCacheKeys = this.computePerSegmentCacheKeys(segments, queryCacheKey);
            Map<Cache.NamedKey, byte[]> cachedValues = this.computeCachedValues(perSegmentCacheKeys);
            perSegmentCacheKeys.forEach((segment, segmentCacheKey) -> {
                Interval segmentQueryInterval = segment.getSegmentDescriptor().getInterval();
                byte[] cachedValue = (byte[])cachedValues.get(segmentCacheKey);
                if (cachedValue != null) {
                    segments.remove(segment);
                    alreadyCachedResults.add(Pair.of((Object)segmentQueryInterval, (Object)cachedValue));
                } else if (this.populateCache) {
                    String segmentIdentifier = segment.getServer().getSegment().getIdentifier();
                    this.addCachePopulator((Cache.NamedKey)segmentCacheKey, segmentIdentifier, segmentQueryInterval);
                }
            });
            return alreadyCachedResults;
        }

        private Map<ServerToSegment, Cache.NamedKey> computePerSegmentCacheKeys(Set<ServerToSegment> segments, byte[] queryCacheKey) {
            LinkedHashMap cacheKeys = Maps.newLinkedHashMap();
            for (ServerToSegment serverToSegment : segments) {
                Cache.NamedKey segmentCacheKey = CacheUtil.computeSegmentCacheKey(serverToSegment.getServer().getSegment().getIdentifier(), serverToSegment.getSegmentDescriptor(), queryCacheKey);
                cacheKeys.put(serverToSegment, segmentCacheKey);
            }
            return cacheKeys;
        }

        private Map<Cache.NamedKey, byte[]> computeCachedValues(Map<ServerToSegment, Cache.NamedKey> cacheKeys) {
            if (this.useCache) {
                return CachingClusteredClient.this.cache.getBulk(Iterables.limit(cacheKeys.values(), (int)CachingClusteredClient.this.cacheConfig.getCacheBulkMergeLimit()));
            }
            return ImmutableMap.of();
        }

        private void addCachePopulator(Cache.NamedKey segmentCacheKey, String segmentIdentifier, Interval segmentQueryInterval) {
            this.cachePopulatorMap.put(StringUtils.format((String)"%s_%s", (Object[])new Object[]{segmentIdentifier, segmentQueryInterval}), new CachePopulator(CachingClusteredClient.this.cache, CachingClusteredClient.this.objectMapper, segmentCacheKey));
        }

        @Nullable
        private CachePopulator getCachePopulator(String segmentId, Interval segmentInterval) {
            return this.cachePopulatorMap.get(StringUtils.format((String)"%s_%s", (Object[])new Object[]{segmentId, segmentInterval}));
        }

        private SortedMap<DruidServer, List<SegmentDescriptor>> groupSegmentsByServer(Set<ServerToSegment> segments) {
            TreeMap serverSegments = Maps.newTreeMap();
            for (ServerToSegment serverToSegment : segments) {
                QueryableDruidServer queryableDruidServer = serverToSegment.getServer().pick();
                if (queryableDruidServer == null) {
                    log.makeAlert("No servers found for SegmentDescriptor[%s] for DataSource[%s]?! How can this be?!", new Object[]{serverToSegment.getSegmentDescriptor(), this.query.getDataSource()}).emit();
                    continue;
                }
                DruidServer server = queryableDruidServer.getServer();
                serverSegments.computeIfAbsent(server, s -> new ArrayList()).add(serverToSegment.getSegmentDescriptor());
            }
            return serverSegments;
        }

        private void addSequencesFromCache(List<Sequence<T>> listOfSequences, List<Pair<Interval, byte[]>> cachedResults) {
            if (this.strategy == null) {
                return;
            }
            Function pullFromCacheFunction = this.strategy.pullFromCache();
            final TypeReference cacheObjectClazz = this.strategy.getCacheObjectClazz();
            for (Pair<Interval, byte[]> cachedResultPair : cachedResults) {
                final byte[] cachedResult = (byte[])cachedResultPair.rhs;
                BaseSequence cachedSequence = new BaseSequence((BaseSequence.IteratorMaker)new BaseSequence.IteratorMaker<Object, Iterator<Object>>(){

                    public Iterator<Object> make() {
                        try {
                            if (cachedResult.length == 0) {
                                return Iterators.emptyIterator();
                            }
                            return CachingClusteredClient.this.objectMapper.readValues(CachingClusteredClient.this.objectMapper.getFactory().createParser(cachedResult), cacheObjectClazz);
                        }
                        catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    }

                    public void cleanup(Iterator<Object> iterFromMake) {
                    }
                });
                listOfSequences.add(Sequences.map((Sequence)cachedSequence, (Function)pullFromCacheFunction));
            }
        }

        private void addSequencesFromServer(List<Sequence<T>> listOfSequences, SortedMap<DruidServer, List<SegmentDescriptor>> segmentsByServer) {
            segmentsByServer.forEach((server, segmentsOfServer) -> {
                QueryRunner serverRunner = CachingClusteredClient.this.serverView.getQueryRunner((DruidServer)server);
                if (serverRunner == null) {
                    log.error("Server[%s] doesn't have a query runner", new Object[]{server});
                    return;
                }
                MultipleSpecificSegmentSpec segmentsOfServerSpec = new MultipleSpecificSegmentSpec(segmentsOfServer);
                Sequence<T> serverResults = this.isBySegment ? this.getBySegmentServerResults(serverRunner, segmentsOfServerSpec) : (!server.segmentReplicatable() || !this.populateCache ? this.getSimpleServerResults(serverRunner, segmentsOfServerSpec) : this.getAndCacheServerResults(serverRunner, segmentsOfServerSpec));
                listOfSequences.add(serverResults);
            });
        }

        private Sequence<T> getBySegmentServerResults(QueryRunner serverRunner, MultipleSpecificSegmentSpec segmentsOfServerSpec) {
            Sequence resultsBySegments = serverRunner.run(this.queryPlus.withQuerySegmentSpec((QuerySegmentSpec)segmentsOfServerSpec), this.responseContext);
            return resultsBySegments.map(result -> result.map(resultsOfSegment -> resultsOfSegment.mapResults(arg_0 -> ((Function)this.toolChest.makePreComputeManipulatorFn(this.query, MetricManipulatorFns.deserializing())).apply(arg_0))));
        }

        private Sequence<T> getSimpleServerResults(QueryRunner serverRunner, MultipleSpecificSegmentSpec segmentsOfServerSpec) {
            return serverRunner.run(this.queryPlus.withQuerySegmentSpec((QuerySegmentSpec)segmentsOfServerSpec), this.responseContext);
        }

        private Sequence<T> getAndCacheServerResults(QueryRunner serverRunner, MultipleSpecificSegmentSpec segmentsOfServerSpec) {
            Sequence resultsBySegments = serverRunner.run(this.queryPlus.withQuery(this.downstreamQuery).withQuerySegmentSpec((QuerySegmentSpec)segmentsOfServerSpec), this.responseContext);
            Function cacheFn = this.strategy.prepareForCache();
            return resultsBySegments.map(result -> {
                BySegmentResultValueClass resultsOfSegment = (BySegmentResultValueClass)result.getValue();
                CachePopulator cachePopulator = this.getCachePopulator(resultsOfSegment.getSegmentId(), resultsOfSegment.getInterval());
                Sequence res = Sequences.simple((Iterable)resultsOfSegment.getResults()).map(r -> {
                    if (cachePopulator != null) {
                        cachePopulator.cacheFutures.add(CachingClusteredClient.this.backgroundExecutorService.submit(() -> cacheFn.apply(r)));
                    }
                    return r;
                }).map(arg_0 -> ((Function)this.toolChest.makePreComputeManipulatorFn(this.downstreamQuery, MetricManipulatorFns.deserializing())).apply(arg_0));
                if (cachePopulator != null) {
                    res = res.withEffect(cachePopulator::populate, (Executor)MoreExecutors.sameThreadExecutor());
                }
                return res;
            }).flatMerge(seq -> seq, this.query.getResultOrdering());
        }
    }
}

