/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.fnexecution.translation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.SideInputHandler;
import org.apache.beam.runners.core.construction.graph.ExecutableStage;
import org.apache.beam.runners.core.construction.graph.SideInputReference;
import org.apache.beam.runners.fnexecution.state.StateRequestHandlers;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;

public class StreamingSideInputHandlerFactory
implements StateRequestHandlers.SideInputHandlerFactory {
    private final Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection;
    private final SideInputHandler runnerHandler;

    public static StreamingSideInputHandlerFactory forStage(ExecutableStage stage, Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> viewMapping, SideInputHandler runnerHandler) {
        ImmutableMap.Builder sideInputBuilder = ImmutableMap.builder();
        for (SideInputReference sideInput : stage.getSideInputs()) {
            RunnerApi.ExecutableStagePayload.SideInputId sideInputId = RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(sideInput.transform().getId()).setLocalName(sideInput.localName()).build();
            sideInputBuilder.put((Object)sideInputId, (Object)((PCollectionView)Preconditions.checkNotNull(viewMapping.get(sideInputId), (String)"No side input for %s/%s", (Object)sideInputId.getTransformId(), (Object)sideInputId.getLocalName())));
        }
        StreamingSideInputHandlerFactory factory = new StreamingSideInputHandlerFactory((Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>>)sideInputBuilder.build(), runnerHandler);
        return factory;
    }

    private StreamingSideInputHandlerFactory(Map<RunnerApi.ExecutableStagePayload.SideInputId, PCollectionView<?>> sideInputToCollection, SideInputHandler runnerHandler) {
        this.sideInputToCollection = sideInputToCollection;
        this.runnerHandler = runnerHandler;
    }

    @Override
    public <V, W extends BoundedWindow> StateRequestHandlers.IterableSideInputHandler<V, W> forIterableSideInput(String transformId, String sideInputId, final Coder<V> elementCoder, Coder<W> windowCoder) {
        final PCollectionView<?> collectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
        Preconditions.checkArgument((collectionNode != null ? 1 : 0) != 0, (String)"No side input for %s/%s", (Object)transformId, (Object)sideInputId);
        return new StateRequestHandlers.IterableSideInputHandler<V, W>(){

            @Override
            public Iterable<V> get(W window) {
                return (Iterable)Preconditions.checkNotNull((Object)StreamingSideInputHandlerFactory.this.runnerHandler.getIterable(collectionNode, window), (Object)"Element processed by SDK before side input is ready");
            }

            @Override
            public Coder<V> elementCoder() {
                return elementCoder;
            }
        };
    }

    @Override
    public <K, V, W extends BoundedWindow> StateRequestHandlers.MultimapSideInputHandler<K, V, W> forMultimapSideInput(String transformId, String sideInputId, final KvCoder<K, V> elementCoder, Coder<W> windowCoder) {
        final PCollectionView<?> collectionNode = this.sideInputToCollection.get(RunnerApi.ExecutableStagePayload.SideInputId.newBuilder().setTransformId(transformId).setLocalName(sideInputId).build());
        Preconditions.checkArgument((collectionNode != null ? 1 : 0) != 0, (String)"No side input for %s/%s", (Object)transformId, (Object)sideInputId);
        return new StateRequestHandlers.MultimapSideInputHandler<K, V, W>(){

            @Override
            public Iterable<V> get(K key, W window) {
                Iterable values = StreamingSideInputHandlerFactory.this.runnerHandler.getIterable(collectionNode, window);
                Object structuralK = this.keyCoder().structuralValue(key);
                ArrayList<Object> result = new ArrayList<Object>();
                for (KV kv : values) {
                    if (!structuralK.equals(this.keyCoder().structuralValue(kv.getKey()))) continue;
                    result.add(kv.getValue());
                }
                return Collections.unmodifiableList(result);
            }

            @Override
            public Iterable<K> get(W window) {
                Iterable values = StreamingSideInputHandlerFactory.this.runnerHandler.getIterable(collectionNode, window);
                HashMap<Object, Object> result = new HashMap<Object, Object>();
                for (KV kv : values) {
                    result.putIfAbsent(this.keyCoder().structuralValue(kv.getKey()), kv.getKey());
                }
                return Collections.unmodifiableCollection(result.values());
            }

            @Override
            public Coder<K> keyCoder() {
                return elementCoder.getKeyCoder();
            }

            @Override
            public Coder<V> valueCoder() {
                return elementCoder.getValueCoder();
            }
        };
    }
}

