/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.api.client.impl;

import io.confluent.ksql.api.client.ColumnType;
import io.confluent.ksql.api.client.Row;
import io.confluent.ksql.api.client.StreamedQueryResult;
import io.confluent.ksql.api.client.impl.PollableSubscriber;
import io.confluent.ksql.reactive.BufferedPublisher;
import io.vertx.core.Context;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import java.time.Duration;
import java.util.List;
import org.reactivestreams.Subscriber;

public class StreamedQueryResultImpl
extends BufferedPublisher<Row>
implements StreamedQueryResult {
    private static final Logger log = LoggerFactory.getLogger(StreamedQueryResultImpl.class);
    private final String queryId;
    private final List<String> columnNames;
    private final List<ColumnType> columnTypes;
    private final PollableSubscriber pollableSubscriber;
    private volatile boolean polling;
    private boolean subscribing;

    StreamedQueryResultImpl(Context context, String queryId, List<String> columnNames, List<ColumnType> columnTypes) {
        super(context);
        this.queryId = queryId;
        this.columnNames = columnNames;
        this.columnTypes = columnTypes;
        this.pollableSubscriber = new PollableSubscriber(this.ctx, this::handleErrorWhilePolling);
    }

    @Override
    public List<String> columnNames() {
        return this.columnNames;
    }

    @Override
    public List<ColumnType> columnTypes() {
        return this.columnTypes;
    }

    @Override
    public String queryID() {
        return this.queryId;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void subscribe(Subscriber<? super Row> subscriber) {
        if (this.polling) {
            throw new IllegalStateException("Cannot set subscriber if polling");
        }
        StreamedQueryResultImpl streamedQueryResultImpl = this;
        synchronized (streamedQueryResultImpl) {
            this.subscribing = true;
            super.subscribe(subscriber);
        }
    }

    @Override
    public Row poll() {
        return this.poll(Duration.ZERO);
    }

    @Override
    public Row poll(Duration timeout) {
        return this.poll(timeout, null);
    }

    private synchronized Row poll(Duration timeout, Runnable callback) {
        if (this.subscribing) {
            throw new IllegalStateException("Cannot poll if subscriber has been set");
        }
        if (this.isFailed()) {
            throw new IllegalStateException("Cannot poll on StreamedQueryResult that has failed. Check logs for failure reason.");
        }
        if (callback != null) {
            callback.run();
        }
        if (!this.polling) {
            this.subscribe((Subscriber<? super Row>)this.pollableSubscriber);
            this.subscribing = false;
            this.polling = true;
        }
        return this.pollableSubscriber.poll(timeout);
    }

    @Override
    public boolean isComplete() {
        return super.isComplete();
    }

    @Override
    public boolean isFailed() {
        return super.isFailed();
    }

    public void handleError(Exception e) {
        this.sendError(e);
    }

    private void handleErrorWhilePolling(Throwable t) {
        log.error((Object)("Unexpected error while polling: " + t));
    }

    public static Row pollWithCallback(StreamedQueryResult queryResult, Runnable callback) {
        if (!(queryResult instanceof StreamedQueryResultImpl)) {
            throw new IllegalArgumentException("Can only poll with callback on StreamedQueryResultImpl");
        }
        StreamedQueryResultImpl streamedQueryResult = (StreamedQueryResultImpl)queryResult;
        return streamedQueryResult.poll(Duration.ZERO, callback);
    }
}

