/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state.changelog.inmemory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.changelog.inmemory.InMemoryStateChangelogHandle;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link StateChangelogWriter} that keeps every appended change on the heap.
 *
 * <p>Changes are grouped by key group; inside a key group they are ordered by the
 * {@link SequenceNumber} assigned at append time. Sequence numbers come from a single counter
 * shared by all key groups, so they are unique and strictly increasing across the whole writer.
 *
 * <p>The class performs no synchronization and must be confined to a single thread.
 */
@NotThreadSafe
class InMemoryStateChangelogWriter
implements StateChangelogWriter<InMemoryStateChangelogHandle> {
    private static final Logger LOG = LoggerFactory.getLogger(InMemoryStateChangelogWriter.class);

    /** Appended changes: key group -> (sequence number -> serialized change). */
    private final Map<Integer, NavigableMap<SequenceNumber, byte[]>> changesByKeyGroup = new HashMap<>();

    /** Sequence number of the most recently appended change; {@code 0} until the first append. */
    private long sqn = 0L;

    /** Set by {@link #close()}; once set, further appends are rejected. */
    private boolean closed;

    InMemoryStateChangelogWriter() {
    }

    /**
     * Stores a change for the given key group under the next sequence number.
     *
     * <p>The array is stored by reference, not copied.
     *
     * @param keyGroup key group the change belongs to
     * @param value serialized change, must not be {@code null}
     * @throws IllegalStateException if this writer has been closed
     * @throws NullPointerException if {@code value} is {@code null}
     */
    @Override
    public void append(int keyGroup, byte[] value) {
        Preconditions.checkState(!this.closed, "LogWriter is closed");
        // Fail with a descriptive NPE instead of the implicit one from value.length below.
        Preconditions.checkNotNull(value, "value");
        LOG.trace("append, keyGroup={}, {} bytes", keyGroup, value.length);
        this.changesByKeyGroup
                .computeIfAbsent(keyGroup, unused -> new TreeMap<>())
                .put(SequenceNumber.of(++this.sqn), value);
    }

    /**
     * Returns the sequence number given to the most recent {@link #append(int, byte[])} call,
     * or sequence number {@code 0} if nothing has been appended yet.
     */
    @Override
    public SequenceNumber lastAppendedSequenceNumber() {
        return SequenceNumber.of(this.sqn);
    }

    /**
     * Builds a handle over the changes whose sequence number is greater than or equal to
     * {@code from}. Nothing is written anywhere; the returned future is already completed.
     *
     * @param from lowest sequence number (inclusive) to include, must not be {@code null}
     * @return a completed future holding the handle
     * @throws NullPointerException if {@code from} is {@code null}
     */
    @Override
    public CompletableFuture<InMemoryStateChangelogHandle> persist(SequenceNumber from) {
        // Validate before logging so that an invalid call leaves no misleading log line.
        Preconditions.checkNotNull(from);
        LOG.debug("Persist after {}", from);
        return CompletableFuture.completedFuture(new InMemoryStateChangelogHandle(this.collectChanges(from)));
    }

    /**
     * Copies, per key group, the changes with a sequence number greater than or equal to
     * {@code after} into a new list, in ascending sequence-number order.
     *
     * <p>The lists are snapshots: later appends or truncation do not affect them. The byte arrays
     * themselves are shared with this writer.
     */
    private Map<Integer, List<byte[]>> collectChanges(SequenceNumber after) {
        return this.changesByKeyGroup.entrySet().stream()
                .collect(Collectors.toMap(
                        Map.Entry::getKey,
                        kv -> new ArrayList<>(kv.getValue().tailMap(after, true).values())));
    }

    /**
     * Marks this writer as closed so that subsequent appends fail.
     *
     * @throws IllegalStateException if the writer is already closed
     */
    @Override
    public void close() {
        Preconditions.checkState(!this.closed, "LogWriter is already closed");
        this.closed = true;
    }

    /**
     * Discards all retained changes with a sequence number strictly lower than {@code before},
     * releasing the memory they occupy. Handles created earlier by
     * {@link #persist(SequenceNumber)} hold their own lists and are not affected.
     *
     * <p>NOTE(review): the exclusive upper bound is taken from the parameter name; confirm
     * against the {@code StateChangelogWriter#truncate} contract.
     *
     * @param before exclusive upper bound of the sequence numbers to discard
     */
    @Override
    public void truncate(SequenceNumber before) {
        // headMap returns a live view, so clearing it removes the entries from the backing map.
        this.changesByKeyGroup.forEach((keyGroup, changes) -> changes.headMap(before, false).clear());
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void confirm(SequenceNumber from, SequenceNumber to) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this implementation.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void reset(SequenceNumber from, SequenceNumber to) {
        throw new UnsupportedOperationException();
    }
}

