/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.repackaged.direct_java.runners.fnexecution.control;

import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespaces;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.core.construction.Timer;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.OutputReceiverFactory;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.ProcessBundleDescriptors;
import org.apache.beam.repackaged.direct_java.runners.fnexecution.control.StageBundleFactory;
import org.apache.beam.repackaged.direct_java.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link OutputReceiverFactory} whose receivers turn timer elements into
 * {@link TimerInternals.TimerData} and hand them to a caller-supplied consumer.
 *
 * <p>At construction time every {@link ProcessBundleDescriptors.TimerSpec} exposed by the stage's
 * process bundle descriptor is indexed by its output collection id, so that {@link #create(String)}
 * can find the spec belonging to a given output PCollection.
 */
public class TimerReceiverFactory
implements OutputReceiverFactory {
    private static final Logger LOG = LoggerFactory.getLogger(TimerReceiverFactory.class);
    /** Timer specs keyed by {@code TimerSpec.outputCollectionId()}. */
    private final Map<String, ProcessBundleDescriptors.TimerSpec> timerOutputIdToSpecMap = new HashMap<>();
    /** Invoked once per (received element, window) pair with the derived timer data. */
    private final BiConsumer<WindowedValue, TimerInternals.TimerData> timerDataConsumer;
    /** Coder passed to {@link StateNamespaces#window} when building the per-window namespace. */
    private final Coder windowCoder;

    /**
     * Builds the factory and indexes all timer specs of the given stage.
     *
     * @param stageBundleFactory source of the process bundle descriptor whose timer specs are indexed
     * @param timerDataConsumer callback receiving the original element together with each
     *     {@link TimerInternals.TimerData} derived from it
     * @param windowCoder coder used to create the window state namespace of each timer
     */
    public TimerReceiverFactory(StageBundleFactory stageBundleFactory, BiConsumer<WindowedValue, TimerInternals.TimerData> timerDataConsumer, Coder windowCoder) {
        // The descriptor groups timer specs in nested maps; only the specs themselves matter here,
        // re-keyed by their output collection id.
        for (Map<String, ProcessBundleDescriptors.TimerSpec> transformTimerMap : stageBundleFactory.getProcessBundleDescriptor().getTimerSpecs().values()) {
            for (ProcessBundleDescriptors.TimerSpec timerSpec : transformTimerMap.values()) {
                this.timerOutputIdToSpecMap.put(timerSpec.outputCollectionId(), timerSpec);
            }
        }
        this.timerDataConsumer = timerDataConsumer;
        this.windowCoder = windowCoder;
    }

    /**
     * Creates a receiver for the timer output identified by {@code pCollectionId}.
     *
     * <p>Each received element is expected to be a {@link WindowedValue} whose value is a
     * {@link KV} carrying a non-null {@link Timer}. For every window of the element one
     * {@link TimerInternals.TimerData} is built (timer id = the spec's input collection id,
     * namespace = the window, timestamp = the timer's timestamp, time domain = the spec's time
     * domain) and passed to the consumer together with the element.
     *
     * <p>The spec lookup is not validated here: a receiver is returned even for an id that has no
     * registered timer spec, and it only fails, with a {@link NullPointerException} naming the id,
     * once it has to process a window of a received element.
     *
     * @param pCollectionId id of the timer output PCollection
     * @return a receiver converting timer elements into timer data
     */
    @Override
    public <OutputT> FnDataReceiver<OutputT> create(String pCollectionId) {
        final ProcessBundleDescriptors.TimerSpec timerSpec = this.timerOutputIdToSpecMap.get(pCollectionId);
        return receivedElement -> {
            WindowedValue windowedValue = (WindowedValue) receivedElement;
            Timer timer = Preconditions.checkNotNull((Timer) ((KV) windowedValue.getValue()).getValue(), "Received null Timer from SDK harness: %s", receivedElement);
            LOG.debug("Timer received: {} {}", pCollectionId, timer);
            for (Object window : windowedValue.getWindows()) {
                StateNamespace namespace = StateNamespaces.window(this.windowCoder, (BoundedWindow) window);
                // Fail with a descriptive message instead of a bare NullPointerException when this
                // receiver was created for a PCollection id without a registered timer spec.
                ProcessBundleDescriptors.TimerSpec spec = Preconditions.checkNotNull(timerSpec, "No timer spec registered for output PCollection: %s", pCollectionId);
                TimeDomain timeDomain = spec.getTimerSpec().getTimeDomain();
                String timerId = spec.inputCollectionId();
                TimerInternals.TimerData timerData = TimerInternals.TimerData.of(timerId, namespace, timer.getTimestamp(), timeDomain);
                this.timerDataConsumer.accept(windowedValue, timerData);
            }
        };
    }
}

