/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.fn.data;

import java.io.IOException;
import java.io.OutputStream;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.data.BeamFnDataBufferingOutboundObserver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.grpc.v1p26p0.io.grpc.stub.StreamObserver;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BeamFnDataSizeBasedBufferingOutboundObserver<@UnknownKeyFor T>
implements BeamFnDataBufferingOutboundObserver<T> {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(BeamFnDataSizeBasedBufferingOutboundObserver.class);
    private @UnknownKeyFor @NonNull @Initialized long byteCounter;
    private @UnknownKeyFor @NonNull @Initialized long counter;
    private @UnknownKeyFor @NonNull @Initialized boolean closed;
    private final @UnknownKeyFor @NonNull @Initialized int sizeLimit;
    private final @UnknownKeyFor @NonNull @Initialized Coder<T> coder;
    private final @UnknownKeyFor @NonNull @Initialized LogicalEndpoint outputLocation;
    private final @UnknownKeyFor @NonNull @Initialized StreamObserver<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements> outboundObserver;
    private final // Could not load outer class - annotation placement on inner may be incorrect
    @UnknownKeyFor @NonNull @Initialized ByteString.Output bufferedElements;

    BeamFnDataSizeBasedBufferingOutboundObserver(@UnknownKeyFor @NonNull @Initialized int sizeLimit, @UnknownKeyFor @NonNull @Initialized LogicalEndpoint outputLocation, @UnknownKeyFor @NonNull @Initialized Coder<T> coder, @UnknownKeyFor @NonNull @Initialized StreamObserver<// Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements> outboundObserver) {
        this.sizeLimit = sizeLimit;
        this.outputLocation = outputLocation;
        this.coder = coder;
        this.outboundObserver = outboundObserver;
        this.bufferedElements = ByteString.newOutput();
        this.closed = false;
    }

    @Override
    public void close() throws @UnknownKeyFor @NonNull @Initialized Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        BeamFnApi.Elements.Builder elements = this.convertBufferForTransmission();
        if (this.outputLocation.isTimer()) {
            elements.addTimersBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setTimerFamilyId(this.outputLocation.getTimerFamilyId()).setIsLast(true);
        } else {
            elements.addDataBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setIsLast(true);
        }
        LOG.debug("Closing stream for instruction {} and transform {} having transmitted {} values {} bytes", new Object[]{this.outputLocation.getInstructionId(), this.outputLocation.getTransformId(), this.counter, this.byteCounter});
        this.outboundObserver.onNext((Object)elements.build());
    }

    @Override
    public void flush() throws @UnknownKeyFor @NonNull @Initialized IOException {
        if (this.bufferedElements.size() > 0) {
            this.outboundObserver.onNext((Object)this.convertBufferForTransmission().build());
        }
    }

    @Override
    public void accept(T t) throws @UnknownKeyFor @NonNull @Initialized IOException {
        if (this.closed) {
            throw new IllegalStateException("Already closed.");
        }
        this.coder.encode(t, (OutputStream)this.bufferedElements);
        ++this.counter;
        if (this.bufferedElements.size() >= this.sizeLimit) {
            this.flush();
        }
    }

    private // Could not load outer class - annotation placement on inner may be incorrect
     @UnknownKeyFor @NonNull @Initialized BeamFnApi.Elements.Builder convertBufferForTransmission() {
        BeamFnApi.Elements.Builder elements = BeamFnApi.Elements.newBuilder();
        if (this.bufferedElements.size() == 0) {
            return elements;
        }
        if (this.outputLocation.isTimer()) {
            elements.addTimersBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setTimerFamilyId(this.outputLocation.getTimerFamilyId()).setTimers(this.bufferedElements.toByteString());
        } else {
            elements.addDataBuilder().setInstructionId(this.outputLocation.getInstructionId()).setTransformId(this.outputLocation.getTransformId()).setData(this.bufferedElements.toByteString());
        }
        this.byteCounter += (long)this.bufferedElements.size();
        this.bufferedElements.reset();
        return elements;
    }
}

