/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.AbstractStream;
import org.apache.kafka.streams.kstream.internals.FullTimeWindowedSerde;
import org.apache.kafka.streams.kstream.internals.GroupedStreamAggregateBuilder;
import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
import org.apache.kafka.streams.kstream.internals.KStreamWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KStreamWindowReduce;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.kstream.internals.graph.StreamsGraphNode;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.WindowStore;

/**
 * {@link TimeWindowedKStream} implementation that turns windowed {@code count},
 * {@code aggregate} and {@code reduce} calls into {@link KTable}s keyed by
 * {@link Windowed} keys.
 *
 * <p>Each operation resolves its {@link Materialized} settings, creates a window
 * {@link StoreBuilder} via {@link #materialize(MaterializedInternal)} and delegates the
 * actual table construction to the supplied {@link GroupedStreamAggregateBuilder}.
 *
 * @param <K> key type of the grouped stream
 * @param <V> value type of the grouped stream
 * @param <W> window type produced by the configured {@link Windows}
 */
public class TimeWindowedKStreamImpl<K, V, W extends Window>
extends AbstractStream<K, V>
implements TimeWindowedKStream<K, V> {
    /** Name prefix used for count/aggregate operations (store-name generation and aggregate builder). */
    private static final String AGGREGATE_NAME = "KSTREAM-AGGREGATE-";
    /** Name prefix used for reduce operations (store-name generation and aggregate builder). */
    private static final String REDUCE_NAME = "KSTREAM-REDUCE-";

    private final Windows<W> windows;
    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;

    /**
     * Creates the windowed stream view.
     *
     * @param windows          window definition; must not be {@code null}
     * @param builder          internal builder, forwarded to {@link AbstractStream}
     * @param sourceNodes      source node names, forwarded to {@link AbstractStream}
     * @param name             name of this stream, forwarded to {@link AbstractStream}
     * @param keySerde         key serde, forwarded to {@link AbstractStream}
     * @param valSerde         value serde, forwarded to {@link AbstractStream}
     * @param aggregateBuilder builder used to construct the resulting tables
     * @param streamsGraphNode graph node, forwarded to {@link AbstractStream}
     * @throws NullPointerException if {@code windows} is {@code null}
     */
    TimeWindowedKStreamImpl(Windows<W> windows, InternalStreamsBuilder builder, Set<String> sourceNodes, String name, Serde<K> keySerde, Serde<V> valSerde, GroupedStreamAggregateBuilder<K, V> aggregateBuilder, StreamsGraphNode streamsGraphNode) {
        super(name, keySerde, valSerde, sourceNodes, streamsGraphNode, builder);
        this.windows = Objects.requireNonNull(windows, "windows can't be null");
        this.aggregateBuilder = aggregateBuilder;
    }

    /**
     * Counts records per key and window, materializing with this stream's key serde
     * and a {@code Long} value serde.
     *
     * @return table of per-window counts
     */
    @Override
    public KTable<Windowed<K>, Long> count() {
        return this.doCount(Materialized.with(this.keySerde, Serdes.Long()));
    }

    /**
     * Counts records per key and window using the given materialization settings.
     *
     * @param materialized materialization settings; must not be {@code null}
     * @return table of per-window counts
     * @throws NullPointerException if {@code materialized} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, Long> count(Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(materialized, "materialized can't be null");
        // The name returned here is discarded; doCount() generates the store name that is actually used.
        // NOTE(review): this looks like it deliberately consumes one generated-name index so that
        // topology naming stays stable -- confirm before removing.
        if (new MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>>(materialized).storeName() == null) {
            this.builder.newStoreName(AGGREGATE_NAME);
        }
        return this.doCount(materialized);
    }

    /**
     * Shared implementation of both {@code count} overloads: fills in a store name and
     * missing serdes, then builds the table with the aggregate builder's count
     * initializer and count aggregator.
     */
    private KTable<Windowed<K>, Long> doCount(Materialized<K, Long, WindowStore<Bytes, byte[]>> materialized) {
        MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>> resolved = new MaterializedInternal<K, Long, WindowStore<Bytes, byte[]>>(materialized);
        resolved.generateStoreNameIfNeeded(this.builder, AGGREGATE_NAME);
        if (resolved.keySerde() == null) {
            resolved.withKeySerde(this.keySerde);
        }
        if (resolved.valueSerde() == null) {
            resolved.withValueSerde(Serdes.Long());
        }
        return this.aggregateBuilder.build(
                AGGREGATE_NAME,
                this.materialize(resolved),
                new KStreamWindowAggregate<K, V, Long, W>(this.windows, resolved.storeName(), this.aggregateBuilder.countInitializer, this.aggregateBuilder.countAggregator),
                resolved.isQueryable(),
                this.windowedKeySerde(resolved.keySerde()),
                resolved.valueSerde());
    }

    /**
     * Aggregates records per key and window, materializing with this stream's key serde
     * and no explicit value serde.
     *
     * @param initializer supplies the initial aggregate; must not be {@code null}
     * @param aggregator  folds a record into the aggregate; must not be {@code null}
     * @param <VR>        aggregate value type
     * @return table of per-window aggregates
     * @throws NullPointerException if {@code initializer} or {@code aggregator} is {@code null}
     */
    @Override
    public <VR> KTable<Windowed<K>, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator) {
        return this.aggregate(initializer, aggregator, Materialized.with(this.keySerde, null));
    }

    /**
     * Aggregates records per key and window using the given materialization settings.
     * A missing key serde falls back to this stream's key serde; the value serde is
     * used as provided.
     *
     * @param initializer  supplies the initial aggregate; must not be {@code null}
     * @param aggregator   folds a record into the aggregate; must not be {@code null}
     * @param materialized materialization settings; must not be {@code null}
     * @param <VR>         aggregate value type
     * @return table of per-window aggregates
     * @throws NullPointerException if any argument is {@code null}
     */
    @Override
    public <VR> KTable<Windowed<K>, VR> aggregate(Initializer<VR> initializer, Aggregator<? super K, ? super V, VR> aggregator, Materialized<K, VR, WindowStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(initializer, "initializer can't be null");
        Objects.requireNonNull(aggregator, "aggregator can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> resolved = new MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>>(materialized);
        resolved.generateStoreNameIfNeeded(this.builder, AGGREGATE_NAME);
        if (resolved.keySerde() == null) {
            resolved.withKeySerde(this.keySerde);
        }
        return this.aggregateBuilder.build(
                AGGREGATE_NAME,
                this.materialize(resolved),
                new KStreamWindowAggregate<K, V, VR, W>(this.windows, resolved.storeName(), initializer, aggregator),
                resolved.isQueryable(),
                this.windowedKeySerde(resolved.keySerde()),
                resolved.valueSerde());
    }

    /**
     * Reduces records per key and window, materializing with this stream's key and
     * value serdes.
     *
     * @param reducer combines two values; must not be {@code null}
     * @return table of per-window reduced values
     * @throws NullPointerException if {@code reducer} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, V> reduce(Reducer<V> reducer) {
        return this.reduce(reducer, Materialized.with(this.keySerde, this.valSerde));
    }

    /**
     * Reduces records per key and window using the given materialization settings.
     * Missing key/value serdes fall back to this stream's serdes.
     *
     * @param reducer      combines two values; must not be {@code null}
     * @param materialized materialization settings; must not be {@code null}
     * @return table of per-window reduced values
     * @throws NullPointerException if {@code reducer} or {@code materialized} is {@code null}
     */
    @Override
    public KTable<Windowed<K>, V> reduce(Reducer<V> reducer, Materialized<K, V, WindowStore<Bytes, byte[]>> materialized) {
        Objects.requireNonNull(reducer, "reducer can't be null");
        Objects.requireNonNull(materialized, "materialized can't be null");
        MaterializedInternal<K, V, WindowStore<Bytes, byte[]>> resolved = new MaterializedInternal<K, V, WindowStore<Bytes, byte[]>>(materialized);
        resolved.generateStoreNameIfNeeded(this.builder, REDUCE_NAME);
        if (resolved.keySerde() == null) {
            resolved.withKeySerde(this.keySerde);
        }
        if (resolved.valueSerde() == null) {
            resolved.withValueSerde(this.valSerde);
        }
        return this.aggregateBuilder.build(
                REDUCE_NAME,
                this.materialize(resolved),
                new KStreamWindowReduce<>(this.windows, resolved.storeName(), reducer),
                resolved.isQueryable(),
                this.windowedKeySerde(resolved.keySerde()),
                resolved.valueSerde());
    }

    /**
     * Wraps a key serde into a {@link FullTimeWindowedSerde} sized to this stream's windows.
     *
     * @param keySerde inner key serde, may be {@code null}
     * @return the windowed serde, or {@code null} when {@code keySerde} is {@code null}
     */
    private FullTimeWindowedSerde<K> windowedKeySerde(Serde<K> keySerde) {
        if (keySerde == null) {
            return null;
        }
        return new FullTimeWindowedSerde<K>(keySerde, this.windows.size());
    }

    /**
     * Rejects a retention period that is shorter than window size plus grace period.
     *
     * @param retentionMs retention period in milliseconds
     * @throws IllegalArgumentException if {@code size + grace > retentionMs}
     */
    private void requireSufficientRetention(long retentionMs) {
        long sizeMs = this.windows.size();
        long graceMs = this.windows.gracePeriodMs();
        // NOTE(review): sizeMs + graceMs is plain long addition and could overflow for extreme values.
        if (sizeMs + graceMs > retentionMs) {
            throw new IllegalArgumentException("The retention period of the window store " + this.name + " must be no smaller than its window size plus the grace period. Got size=[" + sizeMs + "], grace=[" + graceMs + "], retention=[" + retentionMs + "]");
        }
    }

    /**
     * Creates the window store builder for the given materialization settings.
     *
     * <p>If no store supplier was provided, a persistent window store is created. Its
     * retention comes from {@code materialized.retention()} when set, otherwise from
     * {@code windows.maintainMs()}; either way it is validated against window size plus
     * grace period. Logging and caching are then applied as configured.
     *
     * @param materialized resolved materialization settings
     * @param <VR>         value type of the store
     * @return configured window store builder
     * @throws IllegalArgumentException if the effective retention is smaller than window
     *                                  size plus grace period (only checked when no
     *                                  supplier was provided)
     */
    private <VR> StoreBuilder<WindowStore<K, VR>> materialize(MaterializedInternal<K, VR, WindowStore<Bytes, byte[]>> materialized) {
        WindowBytesStoreSupplier supplier = (WindowBytesStoreSupplier)materialized.storeSupplier();
        if (supplier == null) {
            if (materialized.retention() != null) {
                long retentionPeriod = materialized.retention().toMillis();
                this.requireSufficientRetention(retentionPeriod);
                supplier = Stores.persistentWindowStore(materialized.storeName(), Duration.ofMillis(retentionPeriod), Duration.ofMillis(this.windows.size()), false);
            } else {
                long maintainMs = this.windows.maintainMs();
                this.requireSufficientRetention(maintainMs);
                supplier = Stores.persistentWindowStore(materialized.storeName(), maintainMs, this.windows.segments, this.windows.size(), false);
            }
        }
        // Named storeBuilder (not builder) so the inherited this.builder field is not shadowed.
        StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(supplier, materialized.keySerde(), materialized.valueSerde());
        if (materialized.loggingEnabled()) {
            storeBuilder.withLoggingEnabled(materialized.logConfig());
        } else {
            storeBuilder.withLoggingDisabled();
        }
        if (materialized.cachingEnabled()) {
            storeBuilder.withCachingEnabled();
        }
        return storeBuilder;
    }
}

