/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;

import java.util.Objects;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.CombinedKey;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionResponseWrapper;
import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.ValueAndTimestamp;

/**
 * Supplies processors that answer foreign-key join subscriptions.
 *
 * <p>Each incoming record is keyed by a {@link CombinedKey} (foreign key + primary key) and carries a
 * {@link SubscriptionWrapper}. The processor looks up the current foreign-side value for the
 * record's foreign key and, depending on the wrapper's instruction, forwards a
 * {@link SubscriptionResponseWrapper} keyed by the primary key.
 *
 * @param <K>  type of the primary key
 * @param <KO> type of the foreign key
 * @param <VO> type of the foreign-side value
 */
public class SubscriptionJoinForeignProcessorSupplier<K, KO, VO>
implements ProcessorSupplier<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> {
    /**
     * Highest {@link SubscriptionWrapper} version this processor accepts; anything newer is rejected.
     * NOTE(review): presumably mirrors the wrapper's own current-version constant - confirm and
     * reference that constant directly if it is accessible.
     */
    private static final int MAX_SUPPORTED_WRAPPER_VERSION = 1;

    private final KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier;

    /**
     * @param foreignValueGetterSupplier source of the getter used to read foreign-side values by foreign key
     */
    public SubscriptionJoinForeignProcessorSupplier(KTableValueGetterSupplier<KO, VO> foreignValueGetterSupplier) {
        this.foreignValueGetterSupplier = foreignValueGetterSupplier;
    }

    /**
     * Creates a new processor instance with its own foreign-value getter (obtained in {@code init}).
     *
     * @return a processor that turns subscription records into subscription responses
     */
    @Override
    public Processor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>> get() {
        return new ContextualProcessor<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>, K, SubscriptionResponseWrapper<VO>>() {
            private KTableValueGetter<KO, VO> foreignValues;

            @Override
            public void init(final ProcessorContext<K, SubscriptionResponseWrapper<VO>> context) {
                super.init(context);
                foreignValues = foreignValueGetterSupplier.get();
                foreignValues.init(context);
            }

            /**
             * Handles one subscription record.
             *
             * @throws NullPointerException        if the key, the value, or the value's {@code newValue} is null
             * @throws UnsupportedVersionException if the wrapper's version exceeds the supported maximum
             * @throws IllegalStateException       if the wrapper carries an instruction not handled here
             */
            @Override
            public void process(final Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> record) {
                Objects.requireNonNull(record.key(), "This processor should never see a null key.");
                Objects.requireNonNull(record.value(), "This processor should never see a null value.");
                final ValueAndTimestamp<SubscriptionWrapper<K>> valueAndTimestamp = record.value().newValue;
                Objects.requireNonNull(valueAndTimestamp, "This processor should never see a null newValue.");
                final SubscriptionWrapper<K> value = valueAndTimestamp.value();
                if (value.getVersion() > MAX_SUPPORTED_WRAPPER_VERSION) {
                    throw new UnsupportedVersionException("SubscriptionWrapper is of an incompatible version.");
                }

                // The lookup happens before dispatching on the instruction, so it runs for every
                // instruction, including those that end up forwarding nothing.
                final ValueAndTimestamp<VO> foreignValueAndTime = foreignValues.get(record.key().getForeignKey());

                // The response carries the later of the two timestamps; with no foreign value
                // present, the subscription's own timestamp is used.
                final long resultTimestamp = foreignValueAndTime == null
                    ? valueAndTimestamp.timestamp()
                    : Math.max(valueAndTimestamp.timestamp(), foreignValueAndTime.timestamp());

                switch (value.getInstruction()) {
                    case DELETE_KEY_AND_PROPAGATE:
                        // Always respond with a null foreign value, whatever the lookup returned.
                        forwardResponse(record, value, null, resultTimestamp);
                        break;
                    case PROPAGATE_NULL_IF_NO_FK_VAL_AVAILABLE:
                        // Respond unconditionally; the foreign value is null when the lookup found nothing.
                        forwardResponse(
                            record,
                            value,
                            foreignValueAndTime == null ? null : foreignValueAndTime.value(),
                            resultTimestamp);
                        break;
                    case PROPAGATE_ONLY_IF_FK_VAL_AVAILABLE:
                        // Respond only when the lookup found a foreign value.
                        if (foreignValueAndTime != null) {
                            forwardResponse(record, value, foreignValueAndTime.value(), resultTimestamp);
                        }
                        break;
                    case DELETE_KEY_NO_PROPAGATE:
                        // Nothing is forwarded for this instruction.
                        break;
                    default:
                        throw new IllegalStateException("Unhandled instruction: " + value.getInstruction());
                }
            }

            /**
             * Forwards a response keyed by the record's primary key, carrying the subscription's
             * hash and primary partition together with the given foreign value and timestamp.
             */
            private void forwardResponse(final Record<CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> record,
                                         final SubscriptionWrapper<K> subscription,
                                         final VO foreignValue,
                                         final long timestamp) {
                final SubscriptionResponseWrapper<VO> response =
                    new SubscriptionResponseWrapper<VO>(subscription.getHash(), foreignValue, subscription.getPrimaryPartition());
                context().forward(
                    record.withKey(record.key().getPrimaryKey())
                        .withValue(response)
                        .withTimestamp(timestamp));
            }
        };
    }
}

