/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.data;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.drill.exec.exception.FragmentSetupException;
import org.apache.drill.exec.proto.BitData;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.rpc.RequestHandler;
import org.apache.drill.exec.rpc.ResponseSender;
import org.apache.drill.exec.rpc.RpcBus;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.control.WorkEventBus;
import org.apache.drill.exec.rpc.data.AckSender;
import org.apache.drill.exec.rpc.data.DataServerConnection;
import org.apache.drill.exec.rpc.data.IncomingDataBatch;
import org.apache.drill.exec.work.WorkManager;
import org.apache.drill.exec.work.filter.RuntimeFilterWritable;
import org.apache.drill.exec.work.fragment.FragmentManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class DataServerRequestHandler
implements RequestHandler<DataServerConnection> {
    private static final Logger logger = LoggerFactory.getLogger(DataServerRequestHandler.class);
    private final WorkEventBus workBus;
    private final WorkManager.WorkerBee bee;

    public DataServerRequestHandler(WorkEventBus workBus, WorkManager.WorkerBee bee) {
        this.workBus = workBus;
        this.bee = bee;
    }

    @Override
    public void handle(DataServerConnection connection, int rpcType, ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        switch (rpcType) {
            case 5: {
                this.handleRuntimeFilterRequest(pBody, dBody, sender);
                break;
            }
            case 3: {
                this.handleRecordBatchRequest(pBody, dBody, sender);
                break;
            }
            default: {
                throw new RpcException("Not yet supported.");
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleRecordBatchRequest(ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        BitData.FragmentRecordBatch fragmentBatch = RpcBus.get(pBody, BitData.FragmentRecordBatch.PARSER);
        AckSender ack = new AckSender(sender);
        ack.increment();
        try {
            IncomingDataBatch batch = new IncomingDataBatch(fragmentBatch, (DrillBuf)dBody, ack);
            int targetCount = fragmentBatch.getReceivingMinorFragmentIdCount();
            int firstOwner = ThreadLocalRandom.current().nextInt(targetCount);
            this.submit(batch, firstOwner, targetCount);
            this.submit(batch, 0, firstOwner);
        }
        catch (IOException | FragmentSetupException e) {
            logger.error("Failure while getting fragment manager. {}", (Object)QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(), fragmentBatch.getReceivingMajorFragmentId(), fragmentBatch.getReceivingMinorFragmentIdList()), (Object)e);
            ack.clear();
            ack.sendFail();
        }
        finally {
            ack.sendOk();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void handleRuntimeFilterRequest(ByteBuf pBody, ByteBuf dBody, ResponseSender sender) throws RpcException {
        BitData.RuntimeFilterBDef runtimeFilterBDef = RpcBus.get(pBody, BitData.RuntimeFilterBDef.PARSER);
        if (dBody == null) {
            return;
        }
        List<Integer> bfSizeInBytes = runtimeFilterBDef.getBloomFilterSizeInBytesList();
        int boomFilterNum = bfSizeInBytes.size();
        DrillBuf data = (DrillBuf)dBody;
        DrillBuf[] bufs = new DrillBuf[boomFilterNum];
        int index = 0;
        for (int i = 0; i < boomFilterNum; ++i) {
            int length = bfSizeInBytes.get(i);
            bufs[i] = data.slice(index, length);
            index += length;
        }
        RuntimeFilterWritable runtimeFilterWritable = new RuntimeFilterWritable(runtimeFilterBDef, bufs);
        AckSender ackSender = new AckSender(sender);
        ackSender.increment();
        try {
            this.bee.receiveRuntimeFilter(runtimeFilterWritable);
        }
        catch (Exception e) {
            logger.error("error to solve received runtime filter, {}", (Object)QueryIdHelper.getQueryId(runtimeFilterBDef.getQueryId()), (Object)e);
            ackSender.clear();
            ackSender.sendFail();
        }
        finally {
            ackSender.sendOk();
        }
    }

    private void submit(IncomingDataBatch batch, int minorStart, int minorStopExclusive) throws FragmentSetupException, IOException {
        for (int minor = minorStart; minor < minorStopExclusive; ++minor) {
            boolean canRun;
            FragmentManager manager = this.workBus.getFragmentManager(DataServerRequestHandler.getHandle(batch.getHeader(), minor));
            if (manager == null || !(canRun = manager.handle(batch))) continue;
            this.bee.startFragmentPendingRemote(manager);
        }
    }

    private static ExecProtos.FragmentHandle getHandle(BitData.FragmentRecordBatch batch, int index) {
        return ExecProtos.FragmentHandle.newBuilder().setQueryId(batch.getQueryId()).setMajorFragmentId(batch.getReceivingMajorFragmentId()).setMinorFragmentId(batch.getReceivingMinorFragmentId(index)).build();
    }
}

