/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.api.writer;

import java.io.IOException;
import java.util.Optional;
import java.util.Random;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
import org.apache.flink.runtime.io.network.api.writer.ChannelSelector;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.RoundRobinChannelSelector;
import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.XORShiftRandom;

/**
 * Serializes records and hands the resulting buffers to the subpartitions ("channels") of a
 * {@link ResultPartitionWriter}.
 *
 * <p>Every channel owns one {@link RecordSerializer} and at most one in-progress
 * {@link BufferBuilder}. New buffer builders are requested (blocking) from the target
 * partition's buffer provider whenever a serializer reports that it needs more space.
 *
 * <p>The class contains no synchronization of its own.
 *
 * @param <T> type of the records written by this writer
 */
public class RecordWriter<T extends IOReadableWritable> {
    /** Partition that receives the buffer consumers produced by this writer. */
    protected final ResultPartitionWriter targetPartition;

    /** Decides which channel(s) a record passed to {@link #emit} is sent to. */
    private final ChannelSelector<T> channelSelector;

    /** Number of subpartitions of {@link #targetPartition}, read once at construction. */
    private final int numChannels;

    /** One serializer per channel, indexed by channel number. */
    private final RecordSerializer<T>[] serializers;

    /** The buffer builder currently being filled for each channel, or empty if there is none. */
    private final Optional<BufferBuilder>[] bufferBuilders;

    /** Source of random channel indices for {@link #randomEmit}. */
    private final Random rng = new XORShiftRandom();

    /** When {@code true}, the target partition is flushed after every emitted record or event. */
    private final boolean flushAlways;

    /** Counts bytes of finished buffers; a standalone counter until {@link #setMetricGroup} is called. */
    private Counter numBytesOut = new SimpleCounter();

    /** Counts finished buffers; a standalone counter until {@link #setMetricGroup} is called. */
    private Counter numBuffersOut = new SimpleCounter();

    /**
     * Creates a writer that uses a {@link RoundRobinChannelSelector} and does not flush after
     * every record.
     *
     * @param writer partition to write to
     */
    public RecordWriter(ResultPartitionWriter writer) {
        this(writer, new RoundRobinChannelSelector<T>());
    }

    /**
     * Creates a writer with the given channel selector that does not flush after every record.
     *
     * @param writer partition to write to
     * @param channelSelector selector consulted by {@link #emit}
     */
    public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector) {
        this(writer, channelSelector, false);
    }

    /**
     * Creates a writer and sets up one serializer and one (initially empty) buffer builder slot
     * per subpartition of {@code writer}.
     *
     * @param writer partition to write to
     * @param channelSelector selector consulted by {@link #emit}
     * @param flushAlways whether to flush the target partition after every record or event
     */
    public RecordWriter(ResultPartitionWriter writer, ChannelSelector<T> channelSelector, boolean flushAlways) {
        this.flushAlways = flushAlways;
        this.targetPartition = writer;
        this.channelSelector = channelSelector;
        this.numChannels = writer.getNumberOfSubpartitions();

        // Generic arrays cannot be created directly, hence the raw array types.
        this.serializers = new SpanningRecordSerializer[numChannels];
        this.bufferBuilders = new Optional[numChannels];
        for (int channel = 0; channel < numChannels; channel++) {
            serializers[channel] = new SpanningRecordSerializer<T>();
            bufferBuilders[channel] = Optional.empty();
        }
    }

    /**
     * Sends the record to every channel chosen by the channel selector.
     *
     * @param record record to write
     * @throws IOException propagated from serialization or the target partition
     * @throws InterruptedException propagated from the blocking buffer request
     */
    public void emit(T record) throws IOException, InterruptedException {
        for (int selectedChannel : channelSelector.selectChannels(record, numChannels)) {
            sendToTarget(record, selectedChannel);
        }
    }

    /**
     * Sends the record to all channels, in ascending channel order.
     *
     * @param record record to write
     * @throws IOException propagated from serialization or the target partition
     * @throws InterruptedException propagated from the blocking buffer request
     */
    public void broadcastEmit(T record) throws IOException, InterruptedException {
        for (int channel = 0; channel < numChannels; channel++) {
            sendToTarget(record, channel);
        }
    }

    /**
     * Sends the record to a single channel picked by the internal random number generator.
     *
     * @param record record to write
     * @throws IOException propagated from serialization or the target partition
     * @throws InterruptedException propagated from the blocking buffer request
     */
    public void randomEmit(T record) throws IOException, InterruptedException {
        int randomChannel = rng.nextInt(numChannels);
        sendToTarget(record, randomChannel);
    }

    /**
     * Serializes the record into the given channel, requesting further buffer builders for as
     * long as the serializer reports a full buffer, and flushes the channel afterwards if
     * {@code flushAlways} is set.
     */
    private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
        final RecordSerializer<T> serializer = serializers[targetChannel];
        RecordSerializer.SerializationResult result = serializer.addRecord(record);

        while (result.isFullBuffer()) {
            // A buffer was finished and the record is complete: stop here instead of
            // requesting another buffer that would not be needed for this record.
            if (tryFinishCurrentBufferBuilder(targetChannel, serializer) && result.isFullRecord()) {
                break;
            }
            BufferBuilder nextBuilder = requestNewBufferBuilder(targetChannel);
            result = serializer.continueWritingWithNextBufferBuilder(nextBuilder);
        }

        Preconditions.checkState(!serializer.hasSerializedData(), "All data should be written at once");

        if (flushAlways) {
            targetPartition.flush(targetChannel);
        }
    }

    /**
     * Serializes the event once and adds a copy of the resulting buffer consumer to every
     * channel, finishing each channel's in-progress buffer builder first. Flushes all channels
     * afterwards if {@code flushAlways} is set.
     *
     * @param event event to send to all channels
     * @throws IOException propagated from event serialization or the target partition
     */
    public void broadcastEvent(AbstractEvent event) throws IOException {
        // The original consumer is closed by try-with-resources; channels only receive copies.
        try (BufferConsumer eventBufferConsumer = EventSerializer.toBufferConsumer(event)) {
            for (int channel = 0; channel < numChannels; channel++) {
                tryFinishCurrentBufferBuilder(channel, serializers[channel]);
                targetPartition.addBufferConsumer(eventBufferConsumer.copy(), channel);
            }

            if (flushAlways) {
                flushAll();
            }
        }
    }

    /** Flushes all subpartitions of the target partition. */
    public void flushAll() {
        targetPartition.flushAll();
    }

    /**
     * For every channel, finishes the in-progress buffer builder (if any) and clears the
     * channel's serializer. The metric counters are not updated by this method.
     */
    public void clearBuffers() {
        for (int channel = 0; channel < numChannels; channel++) {
            closeBufferBuilder(channel);
            serializers[channel].clear();
        }
    }

    /**
     * Replaces the byte and buffer counters with the ones provided by the given metric group.
     *
     * @param metrics metric group supplying the output counters
     */
    public void setMetricGroup(TaskIOMetricGroup metrics) {
        numBytesOut = metrics.getNumBytesOutCounter();
        numBuffersOut = metrics.getNumBuffersOutCounter();
    }

    /**
     * Finishes the channel's in-progress buffer builder, if there is one, updates the byte and
     * buffer counters and clears the serializer.
     *
     * @return {@code true} if a buffer builder was finished, {@code false} if the channel had none
     */
    private boolean tryFinishCurrentBufferBuilder(int targetChannel, RecordSerializer<T> serializer) {
        final Optional<BufferBuilder> current = bufferBuilders[targetChannel];
        if (!current.isPresent()) {
            return false;
        }

        final BufferBuilder finishedBuilder = current.get();
        // Release the slot before finishing, so it is already empty should finish() fail.
        bufferBuilders[targetChannel] = Optional.empty();

        numBytesOut.inc(finishedBuilder.finish());
        numBuffersOut.inc();
        serializer.clear();
        return true;
    }

    /**
     * Requests a new buffer builder (blocking), stores it as the channel's in-progress builder
     * and registers its buffer consumer with the target partition.
     *
     * @return the newly requested buffer builder
     * @throws IllegalStateException presumably, via {@code checkState}, if the channel still has
     *     an in-progress buffer builder
     */
    private BufferBuilder requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
        Preconditions.checkState(!bufferBuilders[targetChannel].isPresent());

        final BufferBuilder freshBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
        bufferBuilders[targetChannel] = Optional.of(freshBuilder);
        targetPartition.addBufferConsumer(freshBuilder.createBufferConsumer(), targetChannel);
        return freshBuilder;
    }

    /** Finishes and drops the channel's in-progress buffer builder, if any, without touching metrics. */
    private void closeBufferBuilder(int targetChannel) {
        final Optional<BufferBuilder> current = bufferBuilders[targetChannel];
        if (!current.isPresent()) {
            return;
        }
        current.get().finish();
        bufferBuilders[targetChannel] = Optional.empty();
    }
}

