/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.fn.harness.state;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.fn.harness.state.BeamFnStateClient;
import org.apache.beam.fn.harness.state.StateFetchingIterators;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterables;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

/**
 * Multimap user state that uses a {@link BeamFnStateClient} to clear and append values and
 * {@link StateFetchingIterators} to read them.
 *
 * <p>Mutations ({@link #put}, {@link #remove}, {@link #clear}) are buffered in memory and are only
 * handed to the state client when {@link #asyncClose()} is called. Reads combine the buffered
 * mutations with the persisted keys/values.
 *
 * <p>Map keys are indexed by {@code mapKeyCoder.structuralValue(key)} so that two keys with the
 * same structural value are treated as the same map key.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <K> type of the map keys
 * @param <V> type of the map values
 */
public class MultimapUserState<K, V> {
    private final Cache<?, ?> cache;
    private final BeamFnStateClient beamFnStateClient;
    private final Coder<K> mapKeyCoder;
    private final Coder<V> valueCoder;
    /** Request template addressing the set of map keys of this state. */
    private final BeamFnApi.StateRequest keysStateRequest;
    /** Request template addressing the values of a single map key; the map key is set per request. */
    private final BeamFnApi.StateRequest userStateRequest;
    private final StateFetchingIterators.CachingStateIterable<K> persistedKeys;
    private boolean isClosed;
    private boolean isCleared;
    // Pending updates, indexed by the structural value of the map key.
    private Map<Object, K> pendingRemoves = Maps.newHashMap();
    private Map<Object, KV<K, List<V>>> pendingAdds = Maps.newHashMap();
    // Value iterables created so far, indexed by the structural value of the map key.
    private Map<Object, KV<K, StateFetchingIterators.CachingStateIterable<V>>> persistedValues = Maps.newHashMap();

    /**
     * Creates the state for the given state key.
     *
     * @param cache cache handed to the fetching iterables for keys and (as a sub-cache) values
     * @param beamFnStateClient client used for all state requests
     * @param instructionId instruction id set on every state request
     * @param stateKey state key; must have its multimap-keys user state set
     * @param mapKeyCoder coder used to encode map keys and to compute their structural value
     * @param valueCoder coder used to encode and decode values
     * @throws IllegalArgumentException if {@code stateKey} has no multimap-keys user state
     */
    public MultimapUserState(Cache<?, ?> cache, BeamFnStateClient beamFnStateClient, String instructionId, BeamFnApi.StateKey stateKey, Coder<K> mapKeyCoder, Coder<V> valueCoder) {
        Preconditions.checkArgument(stateKey.hasMultimapKeysUserState(), "Expected MultimapKeysUserState StateKey but received %s.", stateKey);
        this.cache = cache;
        this.beamFnStateClient = beamFnStateClient;
        this.mapKeyCoder = mapKeyCoder;
        this.valueCoder = valueCoder;
        this.keysStateRequest = BeamFnApi.StateRequest.newBuilder().setInstructionId(instructionId).setStateKey(stateKey).build();
        this.persistedKeys = StateFetchingIterators.readAllAndDecodeStartingFrom(cache, beamFnStateClient, this.keysStateRequest, mapKeyCoder);

        // The per-key request copies transform id, user state id, window and key from the keys
        // state key; only the map key is left to be filled in by createUserStateRequest.
        BeamFnApi.StateKey.MultimapKeysUserState keysUserState = stateKey.getMultimapKeysUserState();
        BeamFnApi.StateRequest.Builder userStateRequestBuilder = BeamFnApi.StateRequest.newBuilder();
        userStateRequestBuilder
            .setInstructionId(instructionId)
            .getStateKeyBuilder()
            .getMultimapUserStateBuilder()
            .setTransformId(keysUserState.getTransformId())
            .setUserStateId(keysUserState.getUserStateId())
            .setWindow(keysUserState.getWindow())
            .setKey(keysUserState.getKey());
        this.userStateRequest = userStateRequestBuilder.build();
    }

    /**
     * Marks the whole multimap as cleared and drops all buffered adds, removes and value iterables.
     *
     * @throws IllegalStateException if this state has been closed
     */
    public void clear() {
        checkNotClosed();
        this.isCleared = true;
        this.persistedValues = Maps.newHashMap();
        this.pendingRemoves = Maps.newHashMap();
        this.pendingAdds = Maps.newHashMap();
    }

    /**
     * Returns the values associated with {@code key}.
     *
     * <p>If the multimap was cleared or the key was removed, only the values added since then are
     * returned; otherwise the persisted values are followed by the pending added values.
     *
     * @param key map key to look up
     * @return the values for {@code key}; never {@code null}
     * @throws IllegalStateException if this state has been closed
     */
    public PrefetchableIterable<V> get(K key) {
        checkNotClosed();
        Object structuralKey = this.mapKeyCoder.structuralValue(key);
        KV<K, List<V>> pendingAddValues = this.pendingAdds.get(structuralKey);

        PrefetchableIterable<V> pendingValues;
        if (pendingAddValues == null) {
            pendingValues = PrefetchableIterables.fromArray();
        } else {
            // The backing list keeps growing on later put() calls, so the view is bounded to the
            // number of values pending right now.
            List<V> added = pendingAddValues.getValue();
            pendingValues = PrefetchableIterables.limit(added, added.size());
        }

        if (this.isCleared || this.pendingRemoves.containsKey(structuralKey)) {
            return pendingValues;
        }
        return PrefetchableIterables.concat(this.getPersistedValues(structuralKey, key), pendingValues);
    }

    /**
     * Returns the map keys, based on the pending adds and removes at the time of this call.
     *
     * <p>If the multimap was cleared, only keys with pending adds are returned. Otherwise the
     * persisted keys that are not pending removal are returned first, followed by the keys with
     * pending adds that were not already produced from the persisted keys.
     *
     * @return the current keys
     * @throws IllegalStateException if this state has been closed
     */
    public PrefetchableIterable<K> keys() {
        checkNotClosed();
        if (this.isCleared) {
            return PrefetchableIterables.concat(this.pendingAddKeys());
        }

        // Snapshots so that later mutations of this state do not affect the returned iterable.
        final HashSet<Object> pendingRemovesNow = new HashSet<>(this.pendingRemoves.keySet());
        final Map<Object, K> pendingAddsNow = new HashMap<>();
        for (Map.Entry<Object, KV<K, List<V>>> entry : this.pendingAdds.entrySet()) {
            pendingAddsNow.put(entry.getKey(), entry.getValue().getKey());
        }

        return new PrefetchableIterables.Default<K>() {

            @Override
            public PrefetchableIterator<K> createIterator() {
                return new PrefetchableIterator<K>() {
                    private final PrefetchableIterator<K> persistedKeysIterator = MultimapUserState.this.persistedKeys.iterator();
                    // Created lazily, once the persisted keys are exhausted.
                    private Iterator<K> pendingAddsNowIterator;
                    // True when nextKey holds a key that has not been handed out by next() yet.
                    private boolean nextKeyReady;
                    private K nextKey;

                    @Override
                    public boolean isReady() {
                        return this.persistedKeysIterator.isReady();
                    }

                    @Override
                    public void prefetch() {
                        if (!this.isReady()) {
                            this.persistedKeysIterator.prefetch();
                        }
                    }

                    @Override
                    public boolean hasNext() {
                        if (this.nextKeyReady) {
                            return true;
                        }

                        while (this.persistedKeysIterator.hasNext()) {
                            this.nextKey = this.persistedKeysIterator.next();
                            Object nextKeyStructuralValue = MultimapUserState.this.mapKeyCoder.structuralValue(this.nextKey);
                            if (pendingRemovesNow.contains(nextKeyStructuralValue)) {
                                continue;
                            }
                            // This key is produced from the persisted keys, so drop it from the
                            // pending adds to avoid producing it a second time below. The map is
                            // shared by all iterators of this iterable; that is fine because every
                            // iterator produces the dropped key from the persisted keys as well.
                            pendingAddsNow.remove(nextKeyStructuralValue);
                            this.nextKeyReady = true;
                            return true;
                        }

                        if (this.pendingAddsNowIterator == null) {
                            this.pendingAddsNowIterator = pendingAddsNow.values().iterator();
                        }
                        if (this.pendingAddsNowIterator.hasNext()) {
                            this.nextKey = this.pendingAddsNowIterator.next();
                            this.nextKeyReady = true;
                            return true;
                        }
                        return false;
                    }

                    @Override
                    public K next() {
                        if (!this.hasNext()) {
                            throw new NoSuchElementException();
                        }
                        this.nextKeyReady = false;
                        return this.nextKey;
                    }
                };
            }
        };
    }

    /**
     * Buffers {@code value} as a pending add for {@code key}.
     *
     * @param key map key
     * @param value value to add
     * @throws IllegalStateException if this state has been closed
     */
    public void put(K key, V value) {
        checkNotClosed();
        Object keyStructuralValue = this.mapKeyCoder.structuralValue(key);
        this.pendingAdds
            .computeIfAbsent(keyStructuralValue, unused -> KV.<K, List<V>>of(key, new ArrayList<>()))
            .getValue()
            .add(value);
    }

    /**
     * Drops any pending adds for {@code key} and, unless the multimap was already cleared, records
     * the key as pending removal.
     *
     * @param key map key to remove
     * @throws IllegalStateException if this state has been closed
     */
    public void remove(K key) {
        checkNotClosed();
        Object keyStructuralValue = this.mapKeyCoder.structuralValue(key);
        this.pendingAdds.remove(keyStructuralValue);
        // After a clear() nothing persisted survives, so there is nothing to remove per key.
        if (!this.isCleared) {
            this.pendingRemoves.put(keyStructuralValue, key);
        }
    }

    /**
     * Closes this state. If there are buffered mutations, issues the corresponding state requests
     * and then updates the cached keys and values to match.
     *
     * @throws IllegalStateException if this state has already been closed
     * @throws Exception declared for callers; see the state client for what may be thrown
     */
    public void asyncClose() throws Exception {
        checkNotClosed();
        this.isClosed = true;
        // No mutations, nothing to write.
        if (!this.isCleared && this.pendingRemoves.isEmpty() && this.pendingAdds.isEmpty()) {
            return;
        }
        this.startStateApiWrites();
        this.updateCache();
    }

    /** Fails with {@link IllegalStateException} if {@link #asyncClose()} has already been called. */
    private void checkNotClosed() {
        Preconditions.checkState(!this.isClosed, "Multimap user state is no longer usable because it is closed for %s", this.keysStateRequest.getStateKey());
    }

    /** Returns a new list holding the original (non-structural) key of every pending add. */
    private List<K> pendingAddKeys() {
        List<K> keys = new ArrayList<>(this.pendingAdds.size());
        for (KV<K, List<V>> pendingAdd : this.pendingAdds.values()) {
            keys.add(pendingAdd.getKey());
        }
        return keys;
    }

    /**
     * Hands the buffered mutations to the state client: first the clear(s), either of the whole
     * multimap or of each removed key, then one append per key with pending adds.
     */
    private void startStateApiWrites() {
        if (this.isCleared) {
            this.beamFnStateClient.handle(this.keysStateRequest.toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()));
        } else {
            for (K removedKey : this.pendingRemoves.values()) {
                BeamFnApi.StateRequest request = this.createUserStateRequest(removedKey);
                this.beamFnStateClient.handle(request.toBuilder().setClear(BeamFnApi.StateClearRequest.getDefaultInstance()));
            }
        }

        for (KV<K, List<V>> pendingAdd : this.pendingAdds.values()) {
            BeamFnApi.StateRequest request = this.createUserStateRequest(pendingAdd.getKey());
            this.beamFnStateClient.handle(request.toBuilder().setAppend(BeamFnApi.StateAppendRequest.newBuilder().setData(this.encodeValues(pendingAdd.getValue()))));
        }
    }

    /** Applies the buffered mutations to the cached key and value iterables. */
    private void updateCache() {
        List<K> pendingAddsKeys = this.pendingAddKeys();
        if (this.isCleared) {
            this.persistedKeys.clearAndAppend(pendingAddsKeys);
            // After a clear the pending adds are the complete contents of each key.
            for (Map.Entry<Object, KV<K, List<V>>> entry : this.pendingAdds.entrySet()) {
                StateFetchingIterators.CachingStateIterable<V> iterable = this.getPersistedValues(entry.getKey(), entry.getValue().getKey());
                iterable.clearAndAppend(entry.getValue().getValue());
            }
        } else {
            this.persistedKeys.remove(this.pendingRemoves.keySet());
            this.persistedKeys.append(pendingAddsKeys);
            // Removed keys have no values left.
            for (Map.Entry<Object, K> removed : this.pendingRemoves.entrySet()) {
                StateFetchingIterators.CachingStateIterable<V> iterable = this.getPersistedValues(removed.getKey(), removed.getValue());
                iterable.clearAndAppend(Collections.emptyList());
            }
            // Only keys that already have a value iterable (read earlier, or just reset by the
            // removal loop above) are extended; other keys are left untouched.
            for (Map.Entry<Object, KV<K, List<V>>> entry : this.pendingAdds.entrySet()) {
                KV<K, StateFetchingIterators.CachingStateIterable<V>> value = this.persistedValues.get(entry.getKey());
                if (value == null) {
                    continue;
                }
                value.getValue().append(entry.getValue().getValue());
            }
        }
    }

    /**
     * Encodes all {@code values} back to back with the value coder.
     *
     * @throws IllegalStateException if encoding fails
     */
    private ByteString encodeValues(Iterable<V> values) {
        try {
            ByteString.Output output = ByteString.newOutput();
            for (V value : values) {
                this.valueCoder.encode(value, output);
            }
            return output.toByteString();
        }
        catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode values for multimap user state id %s.", this.keysStateRequest.getStateKey().getMultimapKeysUserState().getUserStateId()), e);
        }
    }

    /**
     * Builds the state request addressing the values of {@code key} by setting the encoded key as
     * the map key on a copy of the per-key request template.
     *
     * @throws IllegalStateException if encoding the key fails
     */
    private BeamFnApi.StateRequest createUserStateRequest(K key) {
        try {
            ByteString.Output output = ByteString.newOutput();
            this.mapKeyCoder.encode(key, output);
            BeamFnApi.StateRequest.Builder request = this.userStateRequest.toBuilder();
            request.getStateKeyBuilder().getMultimapUserStateBuilder().setMapKey(output.toByteString());
            return request.build();
        }
        catch (IOException e) {
            throw new IllegalStateException(String.format("Failed to encode key for multimap user state id %s.", this.keysStateRequest.getStateKey().getMultimapKeysUserState().getUserStateId()), e);
        }
    }

    /**
     * Returns the value iterable for the given key, creating and remembering it on first use. The
     * iterable uses a sub-cache of {@code cache} keyed by the encoded map key.
     */
    private StateFetchingIterators.CachingStateIterable<V> getPersistedValues(Object structuralKey, K key) {
        return this.persistedValues.computeIfAbsent(structuralKey, unused -> {
            BeamFnApi.StateRequest request = this.createUserStateRequest(key);
            return KV.of(key, StateFetchingIterators.readAllAndDecodeStartingFrom(Caches.subCache(this.cache, "ValuesForKey", request.getStateKey().getMultimapUserState().getMapKey()), this.beamFnStateClient, request, this.valueCoder));
        }).getValue();
    }
}

