/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.spark.io;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.Queue;
import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableDuration;
import org.joda.time.ReadableInstant;

public final class CreateStream<@UnknownKeyFor T>
extends PTransform<PBegin, PCollection<T>> {
    public static final @UnknownKeyFor @NonNull @Initialized String TRANSFORM_URN = "beam:transform:spark:createstream:v1";
    private final @UnknownKeyFor @NonNull @Initialized Duration batchDuration;
    private final @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized TimestampedValue<T>>> batches = new ArrayDeque<Iterable<TimestampedValue<T>>>();
    private final @UnknownKeyFor @NonNull @Initialized Deque<@UnknownKeyFor @NonNull @Initialized GlobalWatermarkHolder.SparkWatermarks> times = new ArrayDeque<GlobalWatermarkHolder.SparkWatermarks>();
    private final @UnknownKeyFor @NonNull @Initialized Coder<T> coder;
    private @UnknownKeyFor @NonNull @Initialized Instant initialSystemTime;
    private final @UnknownKeyFor @NonNull @Initialized boolean forceWatermarkSync;
    private @UnknownKeyFor @NonNull @Initialized Instant lowWatermark = BoundedWindow.TIMESTAMP_MIN_VALUE;

    private CreateStream(@UnknownKeyFor @NonNull @Initialized Duration batchDuration, @UnknownKeyFor @NonNull @Initialized Instant initialSystemTime, @UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized boolean forceWatermarkSync) {
        this.batchDuration = batchDuration;
        this.initialSystemTime = initialSystemTime;
        this.coder = coder;
        this.forceWatermarkSync = forceWatermarkSync;
    }

    public static <T> @UnknownKeyFor @NonNull @Initialized CreateStream<T> of(@UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized Duration batchDuration, @UnknownKeyFor @NonNull @Initialized boolean forceWatermarkSync) {
        return new CreateStream<T>(batchDuration, new Instant(0L), coder, forceWatermarkSync);
    }

    public static <T> @UnknownKeyFor @NonNull @Initialized CreateStream<T> of(@UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized Duration batchDuration) {
        return CreateStream.of(coder, batchDuration, true);
    }

    @SafeVarargs
    public final @UnknownKeyFor @NonNull @Initialized CreateStream<T> nextBatch(TimestampedValue<T> ... batchElements) {
        for (TimestampedValue<T> timestampedValue : batchElements) {
            Preconditions.checkArgument((boolean)timestampedValue.getTimestamp().isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), (String)"Elements must have timestamps before %s. Got: %s", (Object)BoundedWindow.TIMESTAMP_MAX_VALUE, (Object)timestampedValue.getTimestamp());
        }
        this.batches.offer(Arrays.asList(batchElements));
        return this;
    }

    @SafeVarargs
    public final @UnknownKeyFor @NonNull @Initialized CreateStream<T> nextBatch(T ... batchElements) {
        ArrayList timestamped = Lists.newArrayListWithCapacity((int)batchElements.length);
        for (T element : batchElements) {
            timestamped.add(TimestampedValue.atMinimumTimestamp(element));
        }
        this.batches.offer(timestamped);
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized CreateStream<T> emptyBatch() {
        this.batches.offer(Collections.emptyList());
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized CreateStream<T> initialSystemTimeAt(@UnknownKeyFor @NonNull @Initialized Instant initialSystemTime) {
        this.initialSystemTime = initialSystemTime;
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized CreateStream<T> advanceWatermarkForNextBatch(@UnknownKeyFor @NonNull @Initialized Instant newWatermark) {
        Preconditions.checkArgument((!newWatermark.isBefore((ReadableInstant)this.lowWatermark) ? 1 : 0) != 0, (Object)"The watermark is not allowed to decrease!");
        Preconditions.checkArgument((boolean)newWatermark.isBefore((ReadableInstant)BoundedWindow.TIMESTAMP_MAX_VALUE), (String)"The Watermark cannot progress beyond the maximum. Got: %s. Maximum: %s", (Object)newWatermark, (Object)BoundedWindow.TIMESTAMP_MAX_VALUE);
        return this.advance(newWatermark);
    }

    public @UnknownKeyFor @NonNull @Initialized CreateStream<T> advanceNextBatchWatermarkToInfinity() {
        return this.advance(BoundedWindow.TIMESTAMP_MAX_VALUE);
    }

    private @UnknownKeyFor @NonNull @Initialized CreateStream<T> advance(@UnknownKeyFor @NonNull @Initialized Instant newWatermark) {
        Instant currentSynchronizedProcessingTime = this.times.peekLast() == null ? this.initialSystemTime : this.times.peekLast().getSynchronizedProcessingTime();
        Instant nextSynchronizedProcessingTime = currentSynchronizedProcessingTime.plus((ReadableDuration)this.batchDuration);
        Preconditions.checkArgument((boolean)nextSynchronizedProcessingTime.isAfter((ReadableInstant)currentSynchronizedProcessingTime), (Object)"Synchronized processing time must always advance.");
        this.times.offer(new GlobalWatermarkHolder.SparkWatermarks(this.lowWatermark, newWatermark, nextSynchronizedProcessingTime));
        this.lowWatermark = newWatermark;
        return this;
    }

    public @UnknownKeyFor @NonNull @Initialized long getBatchDuration() {
        return this.batchDuration.getMillis();
    }

    public @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized Iterable<@UnknownKeyFor @NonNull @Initialized TimestampedValue<T>>> getBatches() {
        return this.batches;
    }

    public @UnknownKeyFor @NonNull @Initialized Queue<@UnknownKeyFor @NonNull @Initialized GlobalWatermarkHolder.SparkWatermarks> getTimes() {
        return this.times;
    }

    public @UnknownKeyFor @NonNull @Initialized boolean isForceWatermarkSync() {
        return this.forceWatermarkSync;
    }

    public @UnknownKeyFor @NonNull @Initialized PCollection<T> expand(@UnknownKeyFor @NonNull @Initialized PBegin input) {
        return PCollection.createPrimitiveOutputInternal((Pipeline)input.getPipeline(), (WindowingStrategy)WindowingStrategy.globalDefault(), (PCollection.IsBounded)PCollection.IsBounded.UNBOUNDED, this.coder);
    }
}

