/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.util.Collection;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamAggProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
 * Processor supplier whose processors forward every record they receive downstream
 * unchanged, and whose {@link #view()} reads values from the state store named
 * {@code storeName}.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class KTablePassThrough<K, V> implements KTableProcessorSupplier<K, V, V> {
    // Raw element type is kept on purpose: narrowing it would change the constructor
    // signature that existing callers compile against.
    private final Collection<KStreamAggProcessorSupplier> parents;
    private final String storeName;

    /**
     * @param parents   upstream suppliers that are told to send old values when
     *                  {@link #enableSendingOldValues(boolean)} is called
     * @param storeName name of the state store the value getter looks up
     */
    KTablePassThrough(Collection<KStreamAggProcessorSupplier> parents, String storeName) {
        this.parents = parents;
        this.storeName = storeName;
    }

    /**
     * @return a new processor that forwards each record as-is
     */
    @Override
    public Processor<K, Change<V>> get() {
        return new KTablePassThroughProcessor();
    }

    /**
     * Enables sending of old values on every parent supplier.
     *
     * @param forceMaterialization not read by this implementation
     * @return always {@code true}
     */
    @Override
    public boolean enableSendingOldValues(boolean forceMaterialization) {
        for (KStreamAggProcessorSupplier parent : parents) {
            parent.enableSendingOldValues();
        }
        return true;
    }

    /**
     * @return a getter supplier backed by the single store named {@code storeName}
     */
    @Override
    public KTableValueGetterSupplier<K, V> view() {
        return new KTableValueGetterSupplier<K, V>() {

            @Override
            public KTableValueGetter<K, V> get() {
                return new KTablePassThroughValueGetter();
            }

            @Override
            public String[] storeNames() {
                return new String[]{storeName};
            }
        };
    }

    /**
     * Value getter that reads directly from the store named {@code storeName}.
     * {@link #init(ProcessorContext)} must be called before {@link #get(Object)};
     * until then {@code store} is {@code null}.
     */
    private class KTablePassThroughValueGetter implements KTableValueGetter<K, V> {
        private TimestampedKeyValueStore<K, V> store;

        /**
         * Looks up the backing store from the given context.
         *
         * @param context processor context used to resolve the store by name
         */
        @SuppressWarnings("unchecked")
        @Override
        public void init(ProcessorContext context) {
            // Unchecked by necessity: the store's type arguments are erased at runtime,
            // so only the TimestampedKeyValueStore part of the cast is actually verified.
            store = (TimestampedKeyValueStore<K, V>) context.getStateStore(storeName);
        }

        /**
         * @param key key to look up
         * @return whatever the backing store returns for {@code key}
         */
        @Override
        public ValueAndTimestamp<V> get(K key) {
            return store.get(key);
        }
    }

    /**
     * Processor that forwards each record to downstream processors without modifying it.
     */
    private class KTablePassThroughProcessor extends AbstractProcessor<K, Change<V>> {
        @Override
        public void process(K key, Change<V> value) {
            context().forward(key, value);
        }
    }
}

