/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.LinkedHashSet;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.ValueJoinerWithKey;
import org.apache.kafka.streams.kstream.internals.JoinWindowsInternal;
import org.apache.kafka.streams.kstream.internals.KStreamImplJoin;
import org.apache.kafka.streams.kstream.internals.StreamStreamJoinUtil;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.streams.state.internals.LeftOrRightValue;
import org.apache.kafka.streams.state.internals.TimestampedKeyAndJoinSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

abstract class KStreamKStreamJoin<K, VLeft, VRight, VOut, VThis, VOther>
implements ProcessorSupplier<K, VThis, K, VOut> {
    private static final Logger LOG = LoggerFactory.getLogger(KStreamKStreamJoin.class);
    private final StoreFactory otherWindowStoreFactory;
    private final long joinBeforeMs;
    private final long joinAfterMs;
    private final long joinGraceMs;
    private final boolean enableSpuriousResultFix;
    private final long windowsBeforeMs;
    private final long windowsAfterMs;
    private final boolean outer;
    private final Optional<StoreFactory> outerJoinWindowStoreFactory;
    private final ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner;
    private final KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier;

    KStreamKStreamJoin(JoinWindowsInternal windows, ValueJoinerWithKey<? super K, ? super VThis, ? super VOther, ? extends VOut> joiner, boolean outer, long joinBeforeMs, long joinAfterMs, KStreamImplJoin.TimeTrackerSupplier sharedTimeTrackerSupplier, StoreFactory otherWindowStoreFactory, Optional<StoreFactory> outerJoinWindowStoreFactory) {
        this.otherWindowStoreFactory = otherWindowStoreFactory;
        this.joinBeforeMs = joinBeforeMs;
        this.joinAfterMs = joinAfterMs;
        this.windowsAfterMs = windows.afterMs;
        this.windowsBeforeMs = windows.beforeMs;
        this.joinGraceMs = windows.gracePeriodMs();
        this.enableSpuriousResultFix = windows.spuriousResultFixEnabled();
        this.joiner = joiner;
        this.outer = outer;
        this.outerJoinWindowStoreFactory = outerJoinWindowStoreFactory;
        this.sharedTimeTrackerSupplier = sharedTimeTrackerSupplier;
    }

    @Override
    public Set<StoreBuilder<?>> stores() {
        LinkedHashSet stores = new LinkedHashSet();
        stores.add(new StoreFactory.FactoryWrappingStoreBuilder(this.otherWindowStoreFactory));
        if (this.outerJoinWindowStoreFactory.isPresent() && this.enableSpuriousResultFix) {
            stores.add(new StoreFactory.FactoryWrappingStoreBuilder(this.outerJoinWindowStoreFactory.get()));
        }
        return stores;
    }

    protected abstract class KStreamKStreamJoinProcessor
    extends ContextualProcessor<K, VThis, K, VOut> {
        private WindowStore<K, VOther> otherWindowStore;
        private Sensor droppedRecordsSensor;
        private Optional<KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>>> outerJoinStore = Optional.empty();
        private InternalProcessorContext<K, VOut> internalProcessorContext;
        private KStreamImplJoin.TimeTracker sharedTimeTracker;

        protected KStreamKStreamJoinProcessor() {
        }

        @Override
        public void init(ProcessorContext<K, VOut> context) {
            super.init(context);
            this.internalProcessorContext = (InternalProcessorContext)context;
            StreamsMetricsImpl metrics = (StreamsMetricsImpl)context.metrics();
            this.droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(Thread.currentThread().getName(), context.taskId().toString(), metrics);
            this.otherWindowStore = (WindowStore)context.getStateStore(KStreamKStreamJoin.this.otherWindowStoreFactory.storeName());
            this.sharedTimeTracker = KStreamKStreamJoin.this.sharedTimeTrackerSupplier.get(context.taskId());
            if (KStreamKStreamJoin.this.enableSpuriousResultFix) {
                this.outerJoinStore = KStreamKStreamJoin.this.outerJoinWindowStoreFactory.map(s -> (KeyValueStore)context.getStateStore(s.storeName()));
                this.sharedTimeTracker.setEmitInterval(StreamsConfig.InternalConfig.getLong(context.appConfigs(), "__emit.interval.ms.kstreams.outer.join.spurious.results.fix__", 1000L));
            }
        }

        @Override
        public void process(Record<K, VThis> record) {
            long inputRecordTimestamp = record.timestamp();
            this.sharedTimeTracker.advanceStreamTime(inputRecordTimestamp);
            if (KStreamKStreamJoin.this.outer && record.key() == null && record.value() != null) {
                this.context().forward(record.withValue(KStreamKStreamJoin.this.joiner.apply(record.key(), record.value(), null)));
                return;
            }
            if (StreamStreamJoinUtil.skipRecord(record, LOG, this.droppedRecordsSensor, this.context())) {
                return;
            }
            if (inputRecordTimestamp == this.sharedTimeTracker.streamTime) {
                this.outerJoinStore.ifPresent(store -> this.emitNonJoinedOuterRecords((KeyValueStore)store, record));
            }
            long timeFrom = Math.max(0L, inputRecordTimestamp - KStreamKStreamJoin.this.joinBeforeMs);
            long timeTo = Math.max(0L, inputRecordTimestamp + KStreamKStreamJoin.this.joinAfterMs);
            try (WindowStoreIterator iter = this.otherWindowStore.fetch(record.key(), timeFrom, timeTo);){
                boolean needOuterJoin = KStreamKStreamJoin.this.outer && !iter.hasNext();
                iter.forEachRemaining(otherRecord -> this.emitInnerJoin(record, (KeyValue)otherRecord, inputRecordTimestamp));
                if (needOuterJoin) {
                    if (!this.outerJoinStore.isPresent() || timeTo < this.sharedTimeTracker.streamTime) {
                        this.context().forward(record.withValue(KStreamKStreamJoin.this.joiner.apply(record.key(), record.value(), null)));
                    } else {
                        this.sharedTimeTracker.updatedMinTime(inputRecordTimestamp);
                        this.putInOuterJoinStore(record);
                    }
                }
            }
        }

        protected abstract TimestampedKeyAndJoinSide<K> makeThisKey(K var1, long var2);

        protected abstract LeftOrRightValue<VLeft, VRight> makeThisValue(VThis var1);

        protected abstract TimestampedKeyAndJoinSide<K> makeOtherKey(K var1, long var2);

        protected abstract VThis thisValue(LeftOrRightValue<? extends VLeft, ? extends VRight> var1);

        protected abstract VOther otherValue(LeftOrRightValue<? extends VLeft, ? extends VRight> var1);

        private void emitNonJoinedOuterRecords(KeyValueStore<TimestampedKeyAndJoinSide<K>, LeftOrRightValue<VLeft, VRight>> store, Record<K, VThis> record) {
            if (this.sharedTimeTracker.minTime + KStreamKStreamJoin.this.joinAfterMs + KStreamKStreamJoin.this.joinGraceMs >= this.sharedTimeTracker.streamTime) {
                return;
            }
            if (this.internalProcessorContext.currentSystemTimeMs() < this.sharedTimeTracker.nextTimeToEmit) {
                return;
            }
            this.sharedTimeTracker.nextTimeToEmit = this.internalProcessorContext.currentSystemTimeMs();
            this.sharedTimeTracker.advanceNextTimeToEmit();
            this.sharedTimeTracker.minTime = Long.MAX_VALUE;
            try (KeyValueIterator it = store.all();){
                TimestampedKeyAndJoinSide prevKey = null;
                boolean outerJoinLeftWindowOpen = false;
                boolean outerJoinRightWindowOpen = false;
                while (it.hasNext()) {
                    KeyValue nextKeyValue = (KeyValue)it.next();
                    TimestampedKeyAndJoinSide timestampedKeyAndJoinSide = (TimestampedKeyAndJoinSide)nextKeyValue.key;
                    this.sharedTimeTracker.minTime = timestampedKeyAndJoinSide.timestamp();
                    if (outerJoinLeftWindowOpen && outerJoinRightWindowOpen) break;
                    if (this.isOuterJoinWindowOpen(timestampedKeyAndJoinSide)) {
                        if (timestampedKeyAndJoinSide.isLeftSide()) {
                            outerJoinLeftWindowOpen = true;
                            continue;
                        }
                        outerJoinRightWindowOpen = true;
                        continue;
                    }
                    LeftOrRightValue leftOrRightValue = (LeftOrRightValue)nextKeyValue.value;
                    this.forwardNonJoinedOuterRecords(record, timestampedKeyAndJoinSide, leftOrRightValue);
                    if (prevKey != null && !prevKey.equals(timestampedKeyAndJoinSide)) {
                        store.put(prevKey, null);
                    }
                    prevKey = timestampedKeyAndJoinSide;
                }
                if (prevKey != null) {
                    store.put(prevKey, null);
                }
            }
        }

        private void forwardNonJoinedOuterRecords(Record<K, VThis> record, TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide, LeftOrRightValue<VLeft, VRight> leftOrRightValue) {
            Object key = timestampedKeyAndJoinSide.key();
            long timestamp = timestampedKeyAndJoinSide.timestamp();
            Object thisValue = this.thisValue(leftOrRightValue);
            Object otherValue = this.otherValue(leftOrRightValue);
            Object nullJoinedValue = KStreamKStreamJoin.this.joiner.apply(key, thisValue, otherValue);
            this.context().forward(record.withKey(key).withValue(nullJoinedValue).withTimestamp(timestamp));
        }

        private boolean isOuterJoinWindowOpen(TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
            long outerJoinLookBackTimeMs = this.getOuterJoinLookBackTimeMs(timestampedKeyAndJoinSide);
            return this.sharedTimeTracker.minTime + outerJoinLookBackTimeMs + KStreamKStreamJoin.this.joinGraceMs >= this.sharedTimeTracker.streamTime;
        }

        private long getOuterJoinLookBackTimeMs(TimestampedKeyAndJoinSide<K> timestampedKeyAndJoinSide) {
            if (timestampedKeyAndJoinSide.isLeftSide()) {
                return KStreamKStreamJoin.this.windowsAfterMs;
            }
            return KStreamKStreamJoin.this.windowsBeforeMs;
        }

        private void emitInnerJoin(Record<K, VThis> thisRecord, KeyValue<Long, VOther> otherRecord, long inputRecordTimestamp) {
            this.outerJoinStore.ifPresent(store -> {
                TimestampedKeyAndJoinSide otherKey = this.makeOtherKey(thisRecord.key(), (Long)otherRecord.key);
                store.putIfAbsent(otherKey, null);
            });
            this.context().forward(thisRecord.withValue(KStreamKStreamJoin.this.joiner.apply(thisRecord.key(), thisRecord.value(), otherRecord.value)).withTimestamp(Math.max(inputRecordTimestamp, (Long)otherRecord.key)));
        }

        private void putInOuterJoinStore(Record<K, VThis> thisRecord) {
            this.outerJoinStore.ifPresent(store -> {
                TimestampedKeyAndJoinSide thisKey = this.makeThisKey(thisRecord.key(), thisRecord.timestamp());
                LeftOrRightValue thisValue = this.makeThisValue(thisRecord.value());
                store.put(thisKey, thisValue);
            });
        }

        @Override
        public void close() {
            KStreamKStreamJoin.this.sharedTimeTrackerSupplier.remove(this.context().taskId());
        }
    }
}

