/*
 * Decompiled with CFR 0.152.
 */
package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.QueryLogger;
import io.asyncer.r2dbc.mysql.client.FluxExchangeable;
import io.asyncer.r2dbc.mysql.message.client.ClientMessage;
import io.asyncer.r2dbc.mysql.message.client.LocalInfileResponse;
import io.asyncer.r2dbc.mysql.message.server.CompleteMessage;
import io.asyncer.r2dbc.mysql.message.server.ErrorMessage;
import io.asyncer.r2dbc.mysql.message.server.LocalInfileRequest;
import io.asyncer.r2dbc.mysql.message.server.ServerMessage;
import java.util.Queue;
import org.jetbrains.annotations.Nullable;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.SynchronousSink;
import reactor.util.concurrent.Queues;

abstract class BaseFluxExchangeable
extends FluxExchangeable<ServerMessage> {
    protected final Sinks.Many<ClientMessage> requests = Sinks.many().unicast().onBackpressureBuffer((Queue)Queues.one().get());

    BaseFluxExchangeable() {
    }

    public final void subscribe(CoreSubscriber<? super ClientMessage> actual) {
        this.requests.asFlux().subscribe(actual);
        this.tryNextOrComplete(null);
    }

    @Override
    public final void accept(ServerMessage message, SynchronousSink<ServerMessage> sink) {
        if (message instanceof ErrorMessage) {
            sink.next((Object)((ErrorMessage)message).offendedBy(this.offendingSql()));
            sink.complete();
        } else if (message instanceof LocalInfileRequest) {
            LocalInfileRequest request = (LocalInfileRequest)message;
            String path = request.getPath();
            QueryLogger.logLocalInfile(path);
            this.requests.emitNext((Object)new LocalInfileResponse(path, sink), Sinks.EmitFailureHandler.FAIL_FAST);
        } else {
            sink.next((Object)message);
            if (message instanceof CompleteMessage && ((CompleteMessage)message).isDone()) {
                this.tryNextOrComplete(sink);
            }
        }
    }

    protected abstract void tryNextOrComplete(@Nullable SynchronousSink<ServerMessage> var1);

    protected abstract String offendingSql();
}

