/*
 * Decompiled with CFR 0.152.
 */
package io.axoniq.axonserver.connector.event.transformation.impl;

import io.axoniq.axonserver.connector.event.transformation.Appender;
import io.axoniq.axonserver.connector.event.transformation.impl.EventTransformationService;
import io.axoniq.axonserver.connector.event.transformation.impl.TransformationStreamCompletedException;
import io.axoniq.axonserver.grpc.event.Event;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

/**
 * {@link Appender} that forwards delete/replace operations to a
 * {@link EventTransformationService.TransformationStream}, tagging each operation with an
 * increasing sequence number and tracking the highest sequence whose operation has finished.
 *
 * <p>Once {@link #complete()} has been requested and the highest finished sequence equals the
 * highest issued sequence, the underlying stream is completed and the future returned by
 * {@link #complete()} is resolved with that sequence.
 */
public class TransformationStreamAppender implements Appender {
    /** Highest sequence number handed out to an operation so far. */
    private final AtomicLong sequence;
    /** Highest sequence number whose operation has finished successfully. */
    private final AtomicLong ackSequence;
    private final EventTransformationService.TransformationStream transformationStream;
    /** Source future: resolved once completion was requested and every issued sequence is acknowledged. */
    private final CompletableFuture<Long> allAcknowledged = new CompletableFuture<>();
    /** Dependent of {@link #allAcknowledged}: completes the stream first, then yields the last sequence. */
    private final CompletableFuture<Long> completion;
    /** {@code null} until completion is requested (or the server ends the stream); then holds {@link #completion}. */
    private final AtomicReference<CompletableFuture<Long>> completeFuture = new AtomicReference<>();

    /**
     * Creates an appender that continues numbering after {@code currentSequence}.
     *
     * @param transformationStream the stream operations are sent to
     * @param currentSequence      the last sequence number already in use; the first operation sent by this
     *                             appender gets {@code currentSequence + 1}
     */
    TransformationStreamAppender(EventTransformationService.TransformationStream transformationStream, long currentSequence) {
        this.transformationStream = transformationStream;
        this.sequence = new AtomicLong(currentSequence);
        // Start the acknowledged counter at the same point as the issued counter, so that completing
        // without having appended anything resolves immediately instead of waiting forever.
        this.ackSequence = new AtomicLong(currentSequence);
        // The stream must be completed by completing the SOURCE future; completing the dependent
        // future directly would skip this function entirely.
        this.completion = this.allAcknowledged.thenApply(lastSequence -> {
            transformationStream.complete();
            return lastSequence;
        });
        // Registered last so every field is initialised should the callback fire right away.
        this.transformationStream.onCompletedByServer(this::completedByServer);
    }

    /**
     * Sends a delete operation for the event at {@code token}.
     *
     * @param token the token of the event to delete
     * @return a future resolving to this appender once the stream's future for the operation has finished
     * @throws TransformationStreamCompletedException if this appender has already completed normally
     * @throws java.util.concurrent.CompletionException if this appender was completed with an error
     */
    @Override
    public CompletableFuture<Appender> deleteEvent(long token) {
        this.checkCompleted();
        long seq = this.sequence.incrementAndGet();
        return this.transformationStream().deleteEvent(token, seq)
                .thenRun(() -> this.acceptAck(seq))
                .thenApply(unused -> this);
    }

    /**
     * Sends a replace operation for the event at {@code token}.
     *
     * @param token       the token of the event to replace
     * @param replacement the event to put in its place
     * @return a future resolving to this appender once the stream's future for the operation has finished
     * @throws TransformationStreamCompletedException if this appender has already completed normally
     * @throws java.util.concurrent.CompletionException if this appender was completed with an error
     */
    @Override
    public CompletableFuture<Appender> replaceEvent(long token, Event replacement) {
        this.checkCompleted();
        long seq = this.sequence.incrementAndGet();
        return this.transformationStream().replaceEvent(token, replacement, seq)
                .thenRun(() -> this.acceptAck(seq))
                .thenApply(unused -> this);
    }

    /**
     * Rejects new operations once this appender is finished. Operations are still accepted after
     * {@link #complete()} was requested for as long as the completion future is not yet done.
     */
    private void checkCompleted() {
        CompletableFuture<Long> completed = this.completeFuture.get();
        if (completed == null) {
            return;
        }
        // Must be tested before isDone(): isDone() is also true for exceptional completion, which
        // would otherwise hide the actual failure from the caller.
        if (completed.isCompletedExceptionally()) {
            completed.join(); // rethrows the failure
        }
        if (completed.isDone()) {
            throw new TransformationStreamCompletedException();
        }
    }

    /**
     * Requests completion of this appender.
     *
     * @return a future that resolves with the last issued sequence once every issued operation has
     *         been acknowledged and the underlying stream has been completed, or fails with the
     *         error reported through the stream's completed-by-server callback
     */
    public CompletableFuture<Long> complete() {
        this.completeFuture.compareAndSet(null, this.completion);
        this.checkComplete();
        return this.completeFuture.get();
    }

    private EventTransformationService.TransformationStream transformationStream() {
        return this.transformationStream;
    }

    /** Records that the operation with sequence {@code ackSeq} finished; the counter never moves backwards. */
    private void acceptAck(long ackSeq) {
        this.ackSequence.updateAndGet(current -> Math.max(current, ackSeq));
        this.checkComplete();
    }

    /** Resolves the completion when it was requested and no issued operation is still outstanding. */
    private void checkComplete() {
        if (this.completeFuture.get() == null) {
            return;
        }
        long issued = this.sequence.get();
        if (issued == this.ackSequence.get()) {
            // Completing the source future runs the stream completion and then resolves `completion`.
            this.allAcknowledged.complete(issued);
        }
    }

    /** Callback registered on the stream: fails the completion future with {@code error}. */
    private void completedByServer(Throwable error) {
        this.completeFuture.compareAndSet(null, this.completion);
        // Fail the exposed future directly so callers observe the error itself rather than a wrapper ...
        this.completion.completeExceptionally(error);
        // ... and fail the source too, so a late acknowledgement can no longer complete the stream.
        this.allAcknowledged.completeExceptionally(error);
    }
}

