/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.windowing.evictors;

import java.util.Iterator;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

/**
 * An {@link Evictor} that drops elements whose timestamp lies too far behind the newest
 * element in the buffer.
 *
 * <p>On eviction the largest timestamp among the buffered elements is taken as the reference
 * point. Every element whose timestamp is less than or equal to
 * {@code maxTimestamp - windowSize} is removed; everything newer is kept.
 *
 * <p>Eviction is skipped entirely when the buffer is empty or when its first element carries no
 * timestamp.
 *
 * @param <W> the type of {@link Window} this evictor is used with
 */
@PublicEvolving
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    /**
     * Span, behind the newest timestamp, for which elements are retained. The {@code of}
     * factories supply this in milliseconds.
     */
    private final long windowSize;

    /** {@code true}: evict in {@link #evictAfter}; {@code false}: evict in {@link #evictBefore}. */
    private final boolean doEvictAfter;

    /**
     * Creates an evictor that performs its eviction in {@link #evictBefore}.
     *
     * @param windowSize retention span behind the newest timestamp (presumably milliseconds, to
     *     match the {@code of} factories)
     */
    public TimeEvictor(long windowSize) {
        this(windowSize, false);
    }

    /**
     * Creates an evictor with an explicit choice of eviction phase.
     *
     * @param windowSize retention span behind the newest timestamp (presumably milliseconds, to
     *     match the {@code of} factories)
     * @param doEvictAfter {@code true} to evict in {@link #evictAfter}, {@code false} to evict in
     *     {@link #evictBefore}
     */
    public TimeEvictor(long windowSize, boolean doEvictAfter) {
        this.windowSize = windowSize;
        this.doEvictAfter = doEvictAfter;
    }

    /**
     * Evicts stale elements, but only when this evictor was not configured to evict afterwards.
     *
     * @param elements the buffered elements; stale ones are removed through the iterator
     * @param size unused by this evictor
     * @param window unused by this evictor
     * @param ctx unused by this evictor
     */
    @Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {
        if (doEvictAfter) {
            return;
        }
        evictOlderThanWindow(elements);
    }

    /**
     * Evicts stale elements, but only when this evictor was configured to evict afterwards.
     *
     * @param elements the buffered elements; stale ones are removed through the iterator
     * @param size unused by this evictor
     * @param window unused by this evictor
     * @param ctx unused by this evictor
     */
    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, Evictor.EvictorContext ctx) {
        if (!doEvictAfter) {
            return;
        }
        evictOlderThanWindow(elements);
    }

    /**
     * Removes every element whose timestamp is at or below {@code maxTimestamp - windowSize}.
     * Does nothing when the buffer is empty or its first element has no timestamp.
     */
    private void evictOlderThanWindow(Iterable<TimestampedValue<Object>> elements) {
        if (!hasTimestamp(elements)) {
            return;
        }
        // The cutoff is fixed from the newest timestamp before any element is removed.
        final long cutoff = getMaxTimestamp(elements) - windowSize;
        for (Iterator<TimestampedValue<Object>> it = elements.iterator(); it.hasNext(); ) {
            if (it.next().getTimestamp() <= cutoff) {
                it.remove();
            }
        }
    }

    /**
     * Reports whether the first element carries a timestamp; only the first element is inspected.
     *
     * @return {@code false} for an empty buffer, otherwise the first element's
     *     {@code hasTimestamp()} result
     */
    private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
        final Iterator<TimestampedValue<Object>> first = elements.iterator();
        return first.hasNext() && first.next().hasTimestamp();
    }

    /**
     * Finds the largest timestamp among the given elements.
     *
     * @return the maximum timestamp, or {@link Long#MIN_VALUE} when there are no elements
     */
    private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
        long newest = Long.MIN_VALUE;
        for (Iterator<TimestampedValue<Object>> it = elements.iterator(); it.hasNext(); ) {
            newest = Math.max(newest, it.next().getTimestamp());
        }
        return newest;
    }

    /** Returns a short description containing only the configured window size. */
    @Override
    public String toString() {
        return "TimeEvictor(" + windowSize + ")";
    }

    /**
     * Exposes the configured window size.
     *
     * @return the retention span passed at construction
     */
    @VisibleForTesting
    public long getWindowSize() {
        return windowSize;
    }

    /**
     * Creates an evictor that keeps elements for the given span and evicts in
     * {@link #evictBefore}.
     *
     * @param windowSize the retention span; converted to milliseconds
     * @param <W> the type of {@link Window} the evictor is used with
     * @return a new {@code TimeEvictor}
     */
    public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
        final long millis = windowSize.toMilliseconds();
        return new TimeEvictor<>(millis);
    }

    /**
     * Creates an evictor that keeps elements for the given span, with an explicit choice of
     * eviction phase.
     *
     * @param windowSize the retention span; converted to milliseconds
     * @param doEvictAfter {@code true} to evict in {@link #evictAfter}, {@code false} to evict in
     *     {@link #evictBefore}
     * @param <W> the type of {@link Window} the evictor is used with
     * @return a new {@code TimeEvictor}
     */
    public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {
        final long millis = windowSize.toMilliseconds();
        return new TimeEvictor<>(millis, doEvictAfter);
    }
}

