/*
 * Decompiled with CFR 0.152.
 */
package io.r2dbc.h2.codecs;

import io.r2dbc.spi.Blob;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.Arrays;
import org.h2.value.Value;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.SynchronousSink;
import reactor.core.scheduler.Schedulers;

class ValueLobBlob
implements Blob {
    private final Value lobDb;
    private SynchronousSink<ByteBuffer> valueLobHandlerSink;

    ValueLobBlob(Value value) {
        this.lobDb = value;
    }

    public Flux<ByteBuffer> stream() {
        return Flux.generate(() -> ((Value)this.lobDb).getInputStream(), (source, sink) -> {
            this.valueLobHandlerSink = sink;
            try {
                byte[] data = new byte[1024];
                int readBytes = source.read(data);
                if (readBytes == -1) {
                    sink.complete();
                    return source;
                }
                sink.next((Object)this.wrap(data, readBytes));
            }
            catch (IOException e) {
                sink.error((Throwable)e);
            }
            return source;
        }, source -> {
            try {
                source.close();
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).subscribeOn(Schedulers.boundedElastic()).cancelOn(Schedulers.boundedElastic());
    }

    public Publisher<Void> discard() {
        return Mono.fromRunnable(() -> this.valueLobHandlerSink.complete()).then();
    }

    ByteBuffer wrap(byte[] data, int readBytes) {
        if (readBytes < data.length) {
            return ByteBuffer.wrap(Arrays.copyOfRange(data, 0, readBytes));
        }
        return ByteBuffer.wrap(data, 0, readBytes);
    }
}

