/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.driver.internal.cursor;

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.neo4j.bolt.connection.BoltProtocolVersion;
import org.neo4j.bolt.connection.message.Message;
import org.neo4j.bolt.connection.message.Messages;
import org.neo4j.bolt.connection.summary.RunSummary;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Logger;
import org.neo4j.driver.Logging;
import org.neo4j.driver.Query;
import org.neo4j.driver.Record;
import org.neo4j.driver.Value;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.Neo4jException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.DatabaseBookmark;
import org.neo4j.driver.internal.GqlStatusError;
import org.neo4j.driver.internal.InternalRecord;
import org.neo4j.driver.internal.adaptedbolt.DriverBoltConnection;
import org.neo4j.driver.internal.adaptedbolt.DriverResponseHandler;
import org.neo4j.driver.internal.adaptedbolt.summary.DiscardSummary;
import org.neo4j.driver.internal.adaptedbolt.summary.PullSummary;
import org.neo4j.driver.internal.cursor.AbstractRecordStateResponseHandler;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.observation.NoopObservation;
import org.neo4j.driver.internal.observation.Observation;
import org.neo4j.driver.internal.types.InternalTypeSystem;
import org.neo4j.driver.internal.util.ErrorUtil;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.internal.util.MetadataExtractor;
import org.neo4j.driver.summary.GqlStatusObject;
import org.neo4j.driver.summary.ResultSummary;

public class RxResultCursorImpl
extends AbstractRecordStateResponseHandler
implements RxResultCursor,
DriverResponseHandler {
    private static final MetadataExtractor METADATA_EXTRACTOR = new MetadataExtractor("t_last");
    private static final ClientException IGNORED_ERROR = new ClientException(GqlStatusError.UNKNOWN.getStatus(), GqlStatusError.UNKNOWN.getStatusDescription("A message has been ignored during result streaming."), "N/A", "A message has been ignored during result streaming.", GqlStatusError.DIAGNOSTIC_RECORD, null);
    private static final Runnable NOOP_RUNNABLE = () -> {};
    private static final BiConsumer<Record, Throwable> NOOP_CONSUMER = (record, throwable) -> {};
    private static final RunSummary EMPTY_RUN_SUMMARY = new RunSummary(){

        public long queryId() {
            return -1L;
        }

        public List<String> keys() {
            return List.of();
        }

        public long resultAvailableAfter() {
            return -1L;
        }

        public Optional<String> databaseName() {
            return Optional.empty();
        }
    };
    private final Logger log;
    private final DriverBoltConnection boltConnection;
    private final Query query;
    private final RunSummary runSummary;
    private final Throwable runError;
    private final Consumer<DatabaseBookmark> bookmarkConsumer;
    private final boolean closeOnSummary;
    private final CompletableFuture<ResultSummary> summaryFuture = new CompletableFuture();
    private final CompletableFuture<Void> consumedFuture = new CompletableFuture();
    private final boolean legacyNotifications;
    private State state = State.READY;
    private boolean discardPending;
    private boolean runErrorExposed;
    private boolean summaryExposed;
    private BiConsumer<Record, Throwable> recordConsumer;
    private long outstandingDemand;
    private PullSummary pullSummary;
    private DiscardSummary discardSummary;
    private Throwable error;
    private Observation recordsObservation;

    public RxResultCursorImpl(DriverBoltConnection boltConnection, Query query, RunSummary runSummary, Throwable runError, Consumer<DatabaseBookmark> bookmarkConsumer, boolean closeOnSummary, Logging logging) {
        this.boltConnection = Objects.requireNonNull(boltConnection);
        this.legacyNotifications = new BoltProtocolVersion(5, 5).compareTo(boltConnection.protocolVersion()) > 0;
        this.query = query;
        this.runSummary = runError == null ? runSummary : EMPTY_RUN_SUMMARY;
        this.runError = runError;
        this.bookmarkConsumer = bookmarkConsumer;
        this.closeOnSummary = closeOnSummary;
        this.log = logging.getLog(this.getClass());
        this.log.trace("[%d] New instance (runError=%s)", this.hashCode(), this.throwableName(runError));
    }

    @Override
    public synchronized Throwable getRunError() {
        this.log.trace("[%d] Run error explicitly retrieved (value=%s)", this.hashCode(), this.throwableName(this.runError));
        this.runErrorExposed = true;
        return this.runError;
    }

    @Override
    public List<String> keys() {
        return this.runSummary.keys();
    }

    @Override
    public CompletionStage<Void> consumed() {
        return this.consumedFuture;
    }

    @Override
    public boolean isDone() {
        return this.summaryFuture.isDone();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void installRecordConsumer(BiConsumer<Record, Throwable> recordConsumer, Observation observation) {
        Objects.requireNonNull(recordConsumer);
        if (this.summaryExposed) {
            throw ErrorUtil.newResultConsumedError();
        }
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            if (this.recordConsumer == null) {
                this.recordConsumer = this.safeRecordConsumer(recordConsumer);
                this.recordsObservation = Objects.requireNonNull(observation);
                this.log.trace("[%d] Record consumer installed", this.hashCode());
                if (this.runError != null) {
                    this.handleError(this.runError);
                    runnable = this::onComplete;
                }
            } else {
                this.log.warn("[%d] Only one record consumer is supported, this request will be ignored", this.hashCode());
            }
        }
        runnable.run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void request(long n) {
        if (n > 0L) {
            Runnable runnable = NOOP_RUNNABLE;
            RxResultCursorImpl rxResultCursorImpl = this;
            synchronized (rxResultCursorImpl) {
                this.updateRecordState(AbstractRecordStateResponseHandler.RecordState.NO_RECORD);
                this.log.trace("[%d] %d records requested in %s state", new Object[]{this.hashCode(), n, this.state});
                switch (this.state.ordinal()) {
                    case 0: {
                        long request = this.appendDemand(n);
                        this.state = State.STREAMING;
                        runnable = () -> this.boltConnection.writeAndFlush((DriverResponseHandler)this, (Message)Messages.pull((long)this.runSummary.queryId(), (long)request), this.recordsObservation).whenComplete((ignored, throwable) -> {
                            if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                                this.handleError((Throwable)throwable);
                                this.onComplete();
                            }
                        });
                        break;
                    }
                    case 1: {
                        this.appendDemand(n);
                        break;
                    }
                }
            }
            runnable.run();
        } else {
            this.log.warn("[%d] %d records requested, negative amounts are ignored", this.hashCode(), n);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void cancel() {
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] Cancellation requested in %s state", new Object[]{this.hashCode(), this.state});
            switch (this.state.ordinal()) {
                case 0: {
                    runnable = this.setupDiscardRunnable(this.recordsObservation);
                    break;
                }
                case 1: {
                    this.discardPending = true;
                    break;
                }
            }
        }
        runnable.run();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletionStage<ResultSummary> summaryAsync(Observation observation) {
        Runnable runnable = NOOP_RUNNABLE;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] Summary requested in %s state", new Object[]{this.hashCode(), this.state});
            if (this.summaryExposed) {
                return this.summaryFuture;
            }
            this.summaryExposed = true;
            switch (this.state.ordinal()) {
                case 2: 
                case 3: 
                case 4: {
                    break;
                }
                case 0: {
                    if (this.runError != null && this.recordConsumer == null) {
                        this.handleError(this.runError);
                        runnable = this::onComplete;
                        break;
                    }
                    runnable = this.setupDiscardRunnable(observation);
                    break;
                }
                case 1: {
                    this.discardPending = true;
                }
            }
        }
        runnable.run();
        return this.summaryFuture;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public CompletionStage<Void> rollback() {
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] Rolling back unpublished result %s state", new Object[]{this.hashCode(), this.state});
            switch (this.state.ordinal()) {
                case 0: {
                    this.state = State.SUCCEEDED;
                    break;
                }
                case 1: 
                case 2: {
                    return this.summaryFuture.thenApply(ignored -> null);
                }
                case 3: 
                case 4: {
                    return CompletableFuture.completedFuture(null);
                }
            }
        }
        final CompletableFuture resetFuture = new CompletableFuture();
        this.boltConnection.writeAndFlush(new DriverResponseHandler(){
            Throwable throwable = null;

            @Override
            public void onError(Throwable throwable) {
                this.throwable = Futures.completionExceptionCause(throwable);
            }

            @Override
            public void onComplete() {
                if (this.throwable != null) {
                    resetFuture.completeExceptionally(this.throwable);
                } else {
                    resetFuture.complete(null);
                }
            }
        }, (Message)Messages.reset(), (Observation)NoopObservation.getInstance()).whenComplete((ignored, throwable) -> {
            if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                resetFuture.completeExceptionally((Throwable)throwable);
            }
        });
        return ((CompletableFuture)((CompletableFuture)resetFuture.thenCompose(ignored -> this.boltConnection.close())).whenComplete((ignored, throwable) -> this.completeSummaryFuture(null, null))).exceptionally(throwable -> null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onComplete() {
        Runnable runnable;
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.log.trace("[%d] onComplete", this.hashCode());
            runnable = this.error != null ? this.setupCompletionRunnableWithError(this.error) : (this.pullSummary != null ? this.setupCompletionRunnableWithPullSummary() : (this.discardSummary != null ? this.setupCompletionRunnableWithSummaryMetadata(this.discardSummary.metadata()) : () -> this.log.trace("[%d] onComplete resulted in no action", this.hashCode())));
        }
        runnable.run();
    }

    @Override
    public synchronized void onError(Throwable throwable) {
        this.log.trace("[%d] onError", this.hashCode());
        this.handleError(throwable);
    }

    @Override
    public synchronized void onIgnored() {
        this.log.trace("[%d] onIgnored", this.hashCode());
        this.handleError(IGNORED_ERROR);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void onRecord(List<Value> fields) {
        this.log.trace("[%d] onRecord", this.hashCode());
        RxResultCursorImpl rxResultCursorImpl = this;
        synchronized (rxResultCursorImpl) {
            this.updateRecordState(AbstractRecordStateResponseHandler.RecordState.HAD_RECORD);
            this.decrementDemand();
        }
        InternalRecord record = new InternalRecord(this.runSummary.keys(), fields);
        this.recordConsumer.accept(record, null);
    }

    @Override
    public synchronized void onPullSummary(PullSummary summary) {
        this.log.trace("[%d] onPullSummary", this.hashCode());
        this.pullSummary = summary;
    }

    @Override
    public synchronized void onDiscardSummary(DiscardSummary summary) {
        this.log.trace("[%d] onDiscardSummary", this.hashCode());
        this.discardSummary = summary;
    }

    @Override
    public synchronized CompletionStage<Throwable> discardAllFailureAsync(Observation parentObservation) {
        this.log.trace("[%d] Discard all requested", this.hashCode());
        boolean summaryExposed = this.summaryExposed;
        boolean runErrorExposed = this.runErrorExposed;
        return this.summaryAsync(parentObservation).thenApply(ignored -> null).exceptionally(throwable -> runErrorExposed || summaryExposed ? null : throwable);
    }

    @Override
    public synchronized CompletionStage<Throwable> pullAllFailureAsync(Observation parentObservation) {
        boolean unfinishedState;
        this.log.trace("[%d] Pull all failure requested", this.hashCode());
        switch (this.state.ordinal()) {
            default: {
                throw new IncompatibleClassChangeError();
            }
            case 0: 
            case 1: 
            case 2: {
                boolean bl = true;
                break;
            }
            case 3: 
            case 4: {
                boolean bl = unfinishedState = false;
            }
        }
        if (this.recordConsumer != null && unfinishedState) {
            return CompletableFuture.completedFuture(new TransactionNestingException("You cannot run another query or begin a new transaction in the same session before you've fully consumed the previous run result."));
        }
        return this.discardAllFailureAsync(parentObservation);
    }

    private synchronized long appendDemand(long n) {
        if (n == Long.MAX_VALUE) {
            this.outstandingDemand = -1L;
        } else {
            try {
                this.outstandingDemand = Math.addExact(this.outstandingDemand, n);
            }
            catch (ArithmeticException ex) {
                this.outstandingDemand = -1L;
            }
        }
        this.log.trace("[%d] Appended demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
        return this.outstandingDemand;
    }

    private synchronized long getDemand() {
        this.log.trace("[%d] Get demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
        return this.outstandingDemand;
    }

    private synchronized void decrementDemand() {
        if (this.outstandingDemand > 0L) {
            --this.outstandingDemand;
        }
        this.log.trace("[%d] Decremented demand, outstanding is %d", this.hashCode(), this.outstandingDemand);
    }

    private synchronized Runnable setupDiscardRunnable(Observation observation) {
        this.state = State.DISCARDING;
        return () -> this.boltConnection.writeAndFlush((DriverResponseHandler)this, (Message)Messages.discard((long)this.runSummary.queryId(), (long)-1L), observation).whenComplete((ignored, throwable) -> {
            if ((throwable = Futures.completionExceptionCause(throwable)) != null) {
                this.handleError((Throwable)throwable);
                this.onComplete();
            }
        });
    }

    private synchronized Runnable setupCompletionRunnableWithPullSummary() {
        this.log.trace("[%d] Setting up completion with pull summary (hasMore=%b)", this.hashCode(), this.pullSummary.hasMore());
        Runnable runnable = NOOP_RUNNABLE;
        if (this.pullSummary.hasMore()) {
            this.pullSummary = null;
            if (this.discardPending) {
                this.discardPending = false;
                this.state = State.DISCARDING;
                runnable = () -> this.boltConnection.writeAndFlush((DriverResponseHandler)this, (Message)Messages.discard((long)this.runSummary.queryId(), (long)-1L), (Observation)NoopObservation.getInstance()).whenComplete((ignored, flushThrowable) -> {
                    Throwable error = Futures.completionExceptionCause(flushThrowable);
                    if (error != null) {
                        this.handleError(error);
                        this.onComplete();
                    }
                });
            } else {
                long demand = this.getDemand();
                if (demand != 0L) {
                    this.state = State.STREAMING;
                    runnable = () -> this.boltConnection.writeAndFlush((DriverResponseHandler)this, (Message)Messages.pull((long)this.runSummary.queryId(), (long)(demand > 0L ? demand : -1L)), this.recordsObservation).whenComplete((ignored, flushThrowable) -> {
                        Throwable error = Futures.completionExceptionCause(flushThrowable);
                        if (error != null) {
                            this.handleError(error);
                            this.onComplete();
                        }
                    });
                } else {
                    this.state = State.READY;
                }
            }
        } else {
            runnable = this.setupCompletionRunnableWithSummaryMetadata(this.pullSummary.metadata());
        }
        return runnable;
    }

    private synchronized Runnable setupCompletionRunnableWithSummaryMetadata(Map<String, Value> metadata) {
        this.log.trace("[%d] Setting up completion with summary metadata", this.hashCode());
        Runnable runnable = NOOP_RUNNABLE;
        ResultSummary resultSummary = null;
        try {
            resultSummary = this.resultSummary(metadata, this.generateGqlStatusObject(this.runSummary.keys()));
            this.state = State.SUCCEEDED;
        }
        catch (Throwable summaryThrowable) {
            this.handleError(summaryThrowable);
        }
        if (resultSummary != null) {
            Optional<DatabaseBookmark> bookmarkOpt = RxResultCursorImpl.databaseBookmark(metadata);
            bookmarkOpt.ifPresent(this.bookmarkConsumer);
            Runnable completeRunnable = this.setupSummaryAndRecordCompletionRunnable(resultSummary, null);
            runnable = () -> this.closeBoltConnection(completeRunnable);
        } else {
            runnable = this::onComplete;
        }
        return runnable;
    }

    private ResultSummary resultSummary(Map<String, Value> metadata, GqlStatusObject gqlStatusObject) {
        return METADATA_EXTRACTOR.extractSummary(this.query, this.boltConnection, this.runSummary.resultAvailableAfter(), metadata, this.legacyNotifications, gqlStatusObject);
    }

    private static Optional<DatabaseBookmark> databaseBookmark(Map<String, Value> metadata) {
        String bookmarkStr;
        DatabaseBookmark databaseBookmark = null;
        Value bookmarkValue = metadata.get("bookmark");
        if (bookmarkValue != null && !bookmarkValue.isNull() && bookmarkValue.hasType(InternalTypeSystem.TYPE_SYSTEM.STRING()) && !(bookmarkStr = bookmarkValue.asString()).isEmpty()) {
            databaseBookmark = new DatabaseBookmark(null, Bookmark.from(bookmarkStr));
        }
        return Optional.ofNullable(databaseBookmark);
    }

    private synchronized Runnable setupCompletionRunnableWithError(Throwable throwable) {
        this.log.trace("[%d] Setting up completion with error %s", this.hashCode(), this.throwableName(throwable));
        ResultSummary summary = null;
        try {
            summary = this.resultSummary(Collections.emptyMap(), null);
        }
        catch (Throwable summaryThrowable) {
            this.log.error(String.format("[%d] Failed to parse summary", this.hashCode()), summaryThrowable);
        }
        Runnable completeRunnable = this.setupSummaryAndRecordCompletionRunnable(summary, throwable);
        return () -> this.closeBoltConnection(completeRunnable);
    }

    private void closeBoltConnection(Runnable runnable) {
        CompletionStage<Object> closeStage = CompletableFuture.completedStage(null);
        if (this.closeOnSummary) {
            closeStage = closeStage.thenCompose(ignored -> this.boltConnection.close());
        }
        closeStage.whenComplete((ignored, closeThrowable) -> {
            if (this.log.isTraceEnabled() && closeThrowable != null) {
                this.log.error(String.format("[%d] Failed to close connection", this.hashCode()), Futures.completionExceptionCause(closeThrowable));
            }
            runnable.run();
        });
    }

    private synchronized void handleError(Throwable throwable) {
        if (this.log.isTraceEnabled()) {
            this.log.error(String.format("[%d] handleError", this.hashCode()), throwable);
        }
        this.state = State.FAILED;
        throwable = Futures.completionExceptionCause(throwable);
        if (this.error == null) {
            this.error = throwable;
        } else {
            if (throwable == IGNORED_ERROR) {
                return;
            }
            if (this.error == IGNORED_ERROR || this.error instanceof Neo4jException && !(throwable instanceof Neo4jException)) {
                this.error = throwable;
            }
        }
    }

    private synchronized Runnable setupSummaryAndRecordCompletionRunnable(ResultSummary summary, Throwable throwable) {
        BiConsumer<Record, Throwable> recordConsumerRef = this.recordConsumer;
        this.recordConsumer = NOOP_CONSUMER;
        return () -> {
            if (throwable != null) {
                if (recordConsumerRef != null && recordConsumerRef != NOOP_CONSUMER) {
                    this.completeSummaryFuture(summary, null);
                    recordConsumerRef.accept(null, throwable);
                } else {
                    this.completeSummaryFuture(null, throwable);
                }
            } else {
                this.completeSummaryFuture(summary, null);
                if (recordConsumerRef != null) {
                    recordConsumerRef.accept(null, null);
                }
            }
        };
    }

    private void completeSummaryFuture(ResultSummary summary, Throwable throwable) {
        throwable = Futures.completionExceptionCause(throwable);
        this.log.trace("[%d] Completing summary future (summary=%s, throwable=%s)", this.hashCode(), this.hash(summary), this.throwableName(throwable));
        if (throwable != null) {
            this.consumedFuture.completeExceptionally(throwable);
            this.summaryFuture.completeExceptionally(throwable);
        } else {
            this.consumedFuture.complete(null);
            this.summaryFuture.complete(summary);
        }
    }

    private BiConsumer<Record, Throwable> safeRecordConsumer(BiConsumer<Record, Throwable> recordConsumer) {
        return (record, throwable) -> {
            try {
                recordConsumer.accept((Record)record, (Throwable)throwable);
                this.log.trace("[%d] Record consumer notified with (record=%s, throwable=%s)", this.hashCode(), this.hash(record), this.throwableName((Throwable)throwable));
            }
            catch (Throwable unexpectedThrowable) {
                this.log.error(String.format("[%d] Record consumer threw an error when notified with (record=%s, throwable=%s), this will be ignored", this.hashCode(), this.hash(record), this.throwableName((Throwable)throwable)), unexpectedThrowable);
            }
        };
    }

    private String hash(Object object) {
        return object == null ? "null" : String.valueOf(object.hashCode());
    }

    private String throwableName(Throwable throwable) {
        return throwable == null ? "null" : throwable.getClass().getCanonicalName();
    }

    private static enum State {
        READY,
        STREAMING,
        DISCARDING,
        FAILED,
        SUCCEEDED;

    }
}

