/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.internals.AbstractDualSchemaRocksDBSegmentedBytesStore;
import org.apache.kafka.streams.state.internals.KeyValueIterators;
import org.apache.kafka.streams.state.internals.KeyValueSegment;
import org.apache.kafka.streams.state.internals.KeyValueSegments;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.SegmentIterator;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.streams.state.internals.WindowKeySchema;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocksDBTimeOrderedSegmentedBytesStore
extends AbstractDualSchemaRocksDBSegmentedBytesStore<KeyValueSegment> {
    // Named after this concrete class so that its log output is attributed to it
    // rather than to the abstract superclass.
    private static final Logger LOG = LoggerFactory.getLogger(RocksDBTimeOrderedSegmentedBytesStore.class);

    /**
     * Builds a store whose base key schema is
     * {@link PrefixedWindowKeySchemas.TimeFirstWindowKeySchema} and whose segments are
     * {@link KeyValueSegments}.
     *
     * @param name            store name; also passed to the backing {@link KeyValueSegments}
     * @param metricsScope    metrics scope passed to the superclass and to the segments
     * @param retention       retention value forwarded to {@link KeyValueSegments}
     * @param segmentInterval segment interval forwarded to {@link KeyValueSegments}
     * @param withIndex       when {@code true} a
     *                        {@link PrefixedWindowKeySchemas.KeyFirstWindowKeySchema} is supplied
     *                        as the index schema; otherwise the index schema is an empty
     *                        {@link Optional}
     */
    RocksDBTimeOrderedSegmentedBytesStore(String name, String metricsScope, long retention, long segmentInterval, boolean withIndex) {
        super(
            name,
            metricsScope,
            new PrefixedWindowKeySchemas.TimeFirstWindowKeySchema(),
            withIndex
                ? Optional.of(new PrefixedWindowKeySchemas.KeyFirstWindowKeySchema())
                : Optional.empty(),
            new KeyValueSegments(name, metricsScope, retention, segmentInterval));
    }

    /**
     * Writes {@code value} under the time-first store key derived from the given parts.
     *
     * @param key       record key
     * @param timestamp timestamp encoded into the store key
     * @param seqnum    sequence number encoded into the store key
     * @param value     value to store
     */
    public void put(Bytes key, long timestamp, int seqnum, byte[] value) {
        this.put(PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum), value);
    }

    /**
     * Looks up the value stored under the time-first store key derived from the given parts.
     *
     * @param key       record key
     * @param timestamp timestamp encoded into the store key
     * @param seqnum    sequence number encoded into the store key
     * @return whatever {@code get} returns for the derived store key
     */
    byte[] fetch(Bytes key, long timestamp, int seqnum) {
        Bytes storeKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(key, timestamp, seqnum);
        return this.get(storeKey);
    }

    /**
     * Derives the index entry that corresponds to a base-store entry.
     *
     * <p>The key, timestamp and sequence number are read out of the time-first base key and
     * re-encoded with the key-first schema. The index value is always an empty byte array;
     * {@code baseValue} is not consulted.
     *
     * @param baseKey   time-first encoded base-store key
     * @param baseValue base-store value (unused)
     * @return the key-first index key paired with an empty value
     */
    @Override
    protected KeyValue<Bytes, byte[]> getIndexKeyValue(Bytes baseKey, byte[] baseValue) {
        byte[] rawBaseKey = baseKey.get();
        byte[] recordKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreKeyBytes(rawBaseKey);
        long recordTimestamp = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreTimestamp(rawBaseKey);
        int recordSeqnum = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.extractStoreSequence(rawBaseKey);
        Bytes indexKey = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.toStoreKeyBinary(recordKey, recordTimestamp, recordSeqnum);
        return KeyValue.pair(indexKey, new byte[0]);
    }

    /**
     * Groups a collection of changelog records into one {@link WriteBatch} per target segment.
     *
     * <p>The record keys are read with {@link WindowKeySchema} and re-encoded with the
     * time-first schema for the base store and, when {@code hasIndex()} is true, with the
     * key-first schema for the index.
     *
     * @param records changelog records to convert
     * @return a map from segment to the batch of writes destined for it; records for which
     *         {@code getOrCreateSegmentIfLive} returns {@code null} contribute nothing
     * @throws ProcessorStateException if adding an entry to a batch fails with a
     *                                 {@link RocksDBException}
     */
    @Override
    Map<KeyValueSegment, WriteBatch> getWriteBatches(Collection<ConsumerRecord<byte[], byte[]>> records) {
        // First pass: raise the observed stream time to the largest timestamp in the batch so
        // that every record below is checked against the same, final value.
        for (ConsumerRecord<byte[], byte[]> record : records) {
            long timestamp = WindowKeySchema.extractStoreTimestamp(record.key());
            this.observedStreamTime = Math.max(this.observedStreamTime, timestamp);
        }

        Map<KeyValueSegment, WriteBatch> writeBatchMap = new HashMap<>();
        for (ConsumerRecord<byte[], byte[]> record : records) {
            byte[] changelogKey = record.key();
            long timestamp = WindowKeySchema.extractStoreTimestamp(changelogKey);
            long segmentId = this.segments.segmentId(timestamp);
            KeyValueSegment segment = (KeyValueSegment)this.segments.getOrCreateSegmentIfLive(segmentId, this.context, this.observedStreamTime);
            if (segment == null) {
                // No live segment was returned for this timestamp; nothing to restore.
                continue;
            }
            ChangelogRecordDeserializationHelper.applyChecksAndUpdatePosition(record, this.consistencyEnabled, this.position);
            try {
                WriteBatch batch = writeBatchMap.computeIfAbsent(segment, s -> new WriteBatch());
                if (this.hasIndex()) {
                    byte[] indexKey = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.fromNonPrefixWindowKey(changelogKey);
                    // A null record value stays null for the index entry; otherwise the index
                    // carries an empty value.
                    byte[] indexValue = record.value() == null ? null : new byte[0];
                    segment.addToBatch(new KeyValue<>(indexKey, indexValue), batch);
                }
                byte[] baseKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.fromNonPrefixWindowKey(changelogKey);
                segment.addToBatch(new KeyValue<>(baseKey, record.value()), batch);
            }
            catch (RocksDBException e) {
                throw new ProcessorStateException("Error restoring batch to store " + this.name(), e);
            }
        }
        return writeBatchMap;
    }

    /**
     * Forward single-key fetch; delegates to the four-argument overload with
     * {@code forward = true}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to) {
        final boolean forward = true;
        return fetch(key, from, to, forward);
    }

    /**
     * Backward single-key fetch; delegates to the four-argument overload with
     * {@code forward = false}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetch(Bytes key, long from, long to) {
        final boolean forward = false;
        return fetch(key, from, to, forward);
    }

    /**
     * Fetches the entries of a single key within {@code [from, to]}.
     *
     * <p>Without an index schema the base schema drives the scan directly. With an index
     * schema the scan runs over the index and every hit is resolved against the base store
     * through {@link IndexToBaseStoreIterator}.
     *
     * @param key     key to look up (used as both bounds of the has-next condition)
     * @param from    lower time bound
     * @param to      upper time bound
     * @param forward iteration direction handed to the schema and the segment iterator
     * @return an iterator over the matching entries
     */
    KeyValueIterator<Bytes, byte[]> fetch(Bytes key, long from, long to, boolean forward) {
        // NOTE(review): the raw List / SegmentIterator types look like decompilation leftovers;
        // restore the type arguments once the generic signatures are confirmed.
        if (!this.indexKeySchema.isPresent()) {
            List baseSegments = this.baseKeySchema.segmentsToSearch(this.segments, from, to, forward);
            Bytes baseLower = this.baseKeySchema.lowerRangeFixedSize(key, from);
            Bytes baseUpper = this.baseKeySchema.upperRangeFixedSize(key, to);
            return new SegmentIterator(
                baseSegments.iterator(),
                this.baseKeySchema.hasNextCondition(key, key, from, to, forward),
                baseLower,
                baseUpper,
                forward);
        }

        SegmentedBytesStore.KeySchema indexSchema = (SegmentedBytesStore.KeySchema)this.indexKeySchema.get();
        List indexSegments = indexSchema.segmentsToSearch(this.segments, from, to, forward);
        Bytes indexLower = indexSchema.lowerRangeFixedSize(key, from);
        Bytes indexUpper = indexSchema.upperRangeFixedSize(key, to);
        return new IndexToBaseStoreIterator(new SegmentIterator(
            indexSegments.iterator(),
            indexSchema.hasNextCondition(key, key, from, to, forward),
            indexLower,
            indexUpper,
            forward));
    }

    /**
     * Forward key-range fetch; delegates to the five-argument overload with
     * {@code forward = true}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        final boolean forward = true;
        return fetch(keyFrom, keyTo, from, to, forward);
    }

    /**
     * Backward key-range fetch; delegates to the five-argument overload with
     * {@code forward = false}.
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetch(Bytes keyFrom, Bytes keyTo, long from, long to) {
        final boolean forward = false;
        return fetch(keyFrom, keyTo, from, to, forward);
    }

    /**
     * Fetches the entries whose key lies in {@code [keyFrom, keyTo]} and whose time lies in
     * {@code [from, to]}.
     *
     * <p>If both key bounds are non-null and {@code keyFrom} compares greater than
     * {@code keyTo}, a warning is logged and an empty iterator is returned. Otherwise the
     * scan uses the index schema when one is present (resolving hits through
     * {@link IndexToBaseStoreIterator}) and the base schema when not.
     *
     * @param keyFrom lower key bound, may be {@code null}
     * @param keyTo   upper key bound, may be {@code null}
     * @param from    lower time bound
     * @param to      upper time bound
     * @param forward iteration direction handed to the schema and the segment iterator
     * @return an iterator over the matching entries
     */
    KeyValueIterator<Bytes, byte[]> fetch(Bytes keyFrom, Bytes keyTo, long from, long to, boolean forward) {
        boolean invertedKeyRange = keyFrom != null && keyTo != null && keyFrom.compareTo(keyTo) > 0;
        if (invertedKeyRange) {
            LOG.warn("Returning empty iterator for fetch with invalid key range: from > to. This may be due to range arguments set in the wrong order, or serdes that don't preserve ordering when lexicographically comparing the serialized bytes. Note that the built-in numerical serdes do not follow this for negative numbers");
            return KeyValueIterators.emptyIterator();
        }

        // NOTE(review): the raw List / SegmentIterator types look like decompilation leftovers;
        // restore the type arguments once the generic signatures are confirmed.
        if (!this.indexKeySchema.isPresent()) {
            List baseSegments = this.baseKeySchema.segmentsToSearch(this.segments, from, to, forward);
            Bytes baseLower = this.baseKeySchema.lowerRange(keyFrom, from);
            Bytes baseUpper = this.baseKeySchema.upperRange(keyTo, to);
            return new SegmentIterator(
                baseSegments.iterator(),
                this.baseKeySchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
                baseLower,
                baseUpper,
                forward);
        }

        SegmentedBytesStore.KeySchema indexSchema = (SegmentedBytesStore.KeySchema)this.indexKeySchema.get();
        List indexSegments = indexSchema.segmentsToSearch(this.segments, from, to, forward);
        Bytes indexLower = indexSchema.lowerRange(keyFrom, from);
        Bytes indexUpper = indexSchema.upperRange(keyTo, to);
        return new IndexToBaseStoreIterator(new SegmentIterator(
            indexSegments.iterator(),
            indexSchema.hasNextCondition(keyFrom, keyTo, from, to, forward),
            indexLower,
            indexUpper,
            forward));
    }

    /**
     * Removal by key and timestamp is not offered by this store.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public void remove(Bytes key, long timestamp) {
        final String reason = "Not supported operation";
        throw new UnsupportedOperationException(reason);
    }

    /**
     * Iterates forward over every entry in {@code [timeFrom, timeTo]}, using the base key
     * schema with no key bounds.
     *
     * @param timeFrom lower time bound
     * @param timeTo   upper time bound
     * @return a forward iterator over the segments selected for the time range
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> fetchAll(long timeFrom, long timeTo) {
        final boolean forward = true;
        List segmentsInRange = this.segments.segments(timeFrom, timeTo, forward);
        Bytes lowerBound = this.baseKeySchema.lowerRange(null, timeFrom);
        Bytes upperBound = this.baseKeySchema.upperRange(null, timeTo);
        return new SegmentIterator(
            segmentsInRange.iterator(),
            this.baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, forward),
            lowerBound,
            upperBound,
            forward);
    }

    /**
     * Iterates backward over every entry in {@code [timeFrom, timeTo]}, using the base key
     * schema with no key bounds.
     *
     * @param timeFrom lower time bound
     * @param timeTo   upper time bound
     * @return a backward iterator over the segments selected for the time range
     */
    @Override
    public KeyValueIterator<Bytes, byte[]> backwardFetchAll(long timeFrom, long timeTo) {
        final boolean forward = false;
        List segmentsInRange = this.segments.segments(timeFrom, timeTo, forward);
        Bytes lowerBound = this.baseKeySchema.lowerRange(null, timeFrom);
        Bytes upperBound = this.baseKeySchema.upperRange(null, timeTo);
        return new SegmentIterator(
            segmentsInRange.iterator(),
            this.baseKeySchema.hasNextCondition(null, null, timeFrom, timeTo, forward),
            lowerBound,
            upperBound,
            forward);
    }

    /**
     * Adapts an iterator over key-first index entries into an iterator over base-store
     * entries.
     *
     * <p>Each index key is converted to its time-first base key and looked up through the
     * enclosing store's {@code get}. Index entries whose lookup yields {@code null} are
     * skipped and handed to the enclosing store's {@code removeIndex}.
     */
    private class IndexToBaseStoreIterator implements KeyValueIterator<Bytes, byte[]> {
        private final KeyValueIterator<Bytes, byte[]> indexIterator;
        // Value found by the most recent successful hasNext(); reset to null once next() consumes it.
        private byte[] cachedValue;

        /**
         * @param indexIterator iterator over key-first index entries to wrap
         */
        IndexToBaseStoreIterator(KeyValueIterator<Bytes, byte[]> indexIterator) {
            this.indexIterator = indexIterator;
        }

        /** Closes the wrapped index iterator. */
        @Override
        public void close() {
            this.indexIterator.close();
        }

        /**
         * Returns the base-store key of the next entry without consuming it.
         *
         * @throws NoSuchElementException if no further entry is available
         */
        @Override
        public Bytes peekNextKey() {
            if (!this.hasNext()) {
                throw new NoSuchElementException();
            }
            return this.getBaseKey(this.indexIterator.peekNextKey());
        }

        /**
         * Advances past index entries whose base-store lookup returns {@code null} until one
         * with a value is found, caching that value for {@link #next()}.
         *
         * @return {@code true} if an entry with a base-store value is available
         */
        @Override
        public boolean hasNext() {
            while (this.indexIterator.hasNext()) {
                Bytes indexKey = this.indexIterator.peekNextKey();
                Bytes baseKey = this.getBaseKey(indexKey);
                this.cachedValue = RocksDBTimeOrderedSegmentedBytesStore.this.get(baseKey);
                if (this.cachedValue != null) {
                    return true;
                }
                // The lookup found nothing for this index entry: step over it and ask the
                // enclosing store to remove the index key.
                this.indexIterator.next();
                RocksDBTimeOrderedSegmentedBytesStore.this.removeIndex(indexKey);
            }
            return false;
        }

        /**
         * Returns the next entry as a base-store key paired with its base-store value.
         *
         * @throws NoSuchElementException if no further entry is available
         */
        @Override
        public KeyValue<Bytes, byte[]> next() {
            // hasNext() is only invoked here when no value is already cached.
            if (this.cachedValue == null && !this.hasNext()) {
                throw new NoSuchElementException();
            }
            KeyValue<Bytes, byte[]> indexEntry = this.indexIterator.next();
            byte[] value = this.cachedValue;
            this.cachedValue = null;
            return KeyValue.pair(this.getBaseKey(indexEntry.key), value);
        }

        /** Re-encodes a key-first index key as the corresponding time-first base key. */
        private Bytes getBaseKey(Bytes indexKey) {
            byte[] keyBytes = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.extractStoreKeyBytes(indexKey.get());
            long timestamp = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.extractStoreTimestamp(indexKey.get());
            int seqnum = PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.extractStoreSequence(indexKey.get());
            return PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(keyBytes, timestamp, seqnum);
        }
    }
}

