/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.rpc.control;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.proto.BitControl;
import org.apache.drill.exec.proto.CoordinationProtos;
import org.apache.drill.exec.proto.GeneralRPCProtos;
import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.rpc.BasicClient;
import org.apache.drill.exec.rpc.Response;
import org.apache.drill.exec.rpc.RpcCommand;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.exec.rpc.RpcOutcomeListener;
import org.apache.drill.exec.rpc.control.ControlConnection;
import org.apache.drill.exec.rpc.control.ControlConnectionConfig;
import org.apache.drill.exec.rpc.control.ControlConnectionManager;
import org.apache.drill.exec.rpc.control.ControlTunnel;
import org.apache.drill.exec.work.batch.ControlMessageHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ControlConnectionManager} whose remote endpoint is the local endpoint itself
 * (the same {@code localEndpoint} is passed to the superclass for both roles).
 *
 * <p>Instead of sending a command through a control client, {@link #runCommand(RpcCommand)}
 * invokes the configured {@link ControlMessageHandler} directly and reports the result to the
 * command's {@link RpcOutcomeListener}. Creating a client is therefore unsupported.
 *
 * <p>Buffer ownership inside this class: {@code convertToByteBuf} only copies its inputs and
 * never releases them; each call site releases the input buffers exactly once in a
 * {@code finally} block.
 */
public class LocalControlConnectionManager
extends ControlConnectionManager {
    private static final Logger logger = LoggerFactory.getLogger(LocalControlConnectionManager.class);
    private final ControlConnectionConfig config;

    /**
     * @param config        supplies the message handler and the buffer allocator used by this manager
     * @param localEndpoint endpoint of this node; used as both endpoints of the connection
     */
    public LocalControlConnectionManager(ControlConnectionConfig config, CoordinationProtos.DrillbitEndpoint localEndpoint) {
        super(localEndpoint, localEndpoint);
        this.config = config;
    }

    /**
     * Not supported: commands are dispatched in-process by {@link #runCommand(RpcCommand)}.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    protected BasicClient<?, ControlConnection, BitControl.BitControlHandshake, ?> getNewClient() {
        throw new UnsupportedOperationException("LocalControlConnectionManager doesn't support creating a control client");
    }

    /**
     * Executes {@code cmd} against the local {@link ControlMessageHandler} and notifies the
     * command's outcome listener of the result.
     *
     * <p>The switch labels are the numeric values returned by {@code cmd.getRpcType().getNumber()};
     * each case is annotated with the handler operation it invokes. Unsupported values are
     * reported to the listener as a failed {@link RpcException}.
     *
     * @param cmd the control command to execute; it is cast to the {@link ControlTunnel} command
     *            type expected for its RPC type
     * @throws UnsupportedOperationException if a custom-message command (type 17) is neither a
     *         {@code CustomMessageSender} nor a {@code SyncCustomMessageSender}
     */
    @Override
    public void runCommand(RpcCommand cmd) {
        int rpcType = cmd.getRpcType().getNumber();
        ControlMessageHandler messageHandler = this.config.getMessageHandler();
        switch (rpcType) {
            // Cancel a fragment.
            case 6: {
                ControlTunnel.SignalFragment signalFragment = (ControlTunnel.SignalFragment)cmd;
                RpcOutcomeListener outcomeListener = signalFragment.getOutcomeListener();
                GeneralRPCProtos.Ack ackResponse = messageHandler.cancelFragment(signalFragment.getMessage());
                outcomeListener.success(ackResponse, null);
                break;
            }
            // Custom message carrying optional data buffers.
            case 17: {
                this.runCustomCommand(cmd, messageHandler);
                break;
            }
            // Receiving fragment finished.
            case 7: {
                ControlTunnel.ReceiverFinished receiverFinished = (ControlTunnel.ReceiverFinished)cmd;
                RpcOutcomeListener outcomeListener = receiverFinished.getOutcomeListener();
                GeneralRPCProtos.Ack ackResponse = messageHandler.receivingFragmentFinished(receiverFinished.getMessage());
                outcomeListener.success(ackResponse, null);
                break;
            }
            // Fragment status.
            case 8: {
                ControlTunnel.SendFragmentStatus fragmentStatus = (ControlTunnel.SendFragmentStatus)cmd;
                RpcOutcomeListener outcomeListener = fragmentStatus.getOutcomeListener();
                GeneralRPCProtos.Ack ackResponse = messageHandler.requestFragmentStatus(fragmentStatus.getMessage());
                outcomeListener.success(ackResponse, null);
                break;
            }
            // Cancel a query.
            case 15: {
                ControlTunnel.CancelQuery cancelQuery = (ControlTunnel.CancelQuery)cmd;
                RpcOutcomeListener outcomeListener = cancelQuery.getOutcomeListener();
                GeneralRPCProtos.Ack ackResponse = messageHandler.requestQueryCancel(cancelQuery.getMessage());
                outcomeListener.success(ackResponse, null);
                break;
            }
            // Initialize fragment(s); the handler may fail with an RpcException.
            case 3: {
                ControlTunnel.SendFragment sendFragment = (ControlTunnel.SendFragment)cmd;
                RpcOutcomeListener outcomeListener = sendFragment.getOutcomeListener();
                try {
                    GeneralRPCProtos.Ack ackResponse = messageHandler.initializeFragment(sendFragment.getMessage());
                    outcomeListener.success(ackResponse, null);
                }
                catch (RpcException ex) {
                    outcomeListener.failed(ex);
                }
                break;
            }
            // Query status / profile; the handler may fail with an RpcException.
            case 10: {
                ControlTunnel.RequestProfile requestProfile = (ControlTunnel.RequestProfile)cmd;
                RpcOutcomeListener outcomeListener = requestProfile.getOutcomeListener();
                try {
                    UserBitShared.QueryProfile profile = messageHandler.requestQueryStatus(requestProfile.getMessage());
                    outcomeListener.success(profile, null);
                }
                catch (RpcException ex) {
                    outcomeListener.failed(ex);
                }
                break;
            }
            // Resume a fragment.
            case 16: {
                ControlTunnel.SignalFragment signalFragment = (ControlTunnel.SignalFragment)cmd;
                RpcOutcomeListener outcomeListener = signalFragment.getOutcomeListener();
                GeneralRPCProtos.Ack ackResponse = messageHandler.resumeFragment(signalFragment.getMessage());
                outcomeListener.success(ackResponse, null);
                break;
            }
            default: {
                RpcException rpcException = new RpcException(String.format("Unsupported control request type %s received on LocalControlConnectionManager", rpcType));
                cmd.getOutcomeListener().failed(rpcException);
            }
        }
    }

    /**
     * Handles a custom-message command: copies the request bodies into one buffer, passes it to
     * the handler registry, copies the response bodies into one buffer and hands that buffer to
     * the outcome listener.
     *
     * <p>Every buffer is released exactly once: the request bodies after they were copied, the
     * response bodies after they were copied, and the copied request buffer when handling ends
     * (normally or not). The copied response buffer is passed to the listener and not released here.
     *
     * @param cmd            the custom-message command
     * @param messageHandler handler whose registry processes the message
     * @throws UnsupportedOperationException if {@code cmd} is not a known custom sender type
     */
    private void runCustomCommand(RpcCommand cmd, ControlMessageHandler messageHandler) {
        DrillBuf reqDrillBuff;
        RpcOutcomeListener outcomeListener;
        ByteBuf[] dataBodies;
        if (cmd instanceof ControlTunnel.CustomMessageSender) {
            dataBodies = ((ControlTunnel.CustomMessageSender)cmd).getDataBodies();
            outcomeListener = ((ControlTunnel.CustomMessageSender)cmd).getOutcomeListener();
        } else if (cmd instanceof ControlTunnel.SyncCustomMessageSender) {
            dataBodies = ((ControlTunnel.SyncCustomMessageSender)cmd).getDataBodies();
            outcomeListener = ((ControlTunnel.SyncCustomMessageSender)cmd).getOutcomeListener();
        } else {
            throw new UnsupportedOperationException("Unknown Custom Type control message received");
        }
        try {
            reqDrillBuff = this.convertToByteBuf(dataBodies);
        }
        catch (Exception ex) {
            outcomeListener.failed(new RpcException("Failed to allocate memory while sending request in LocalControlConnectionManager#convertToByteBuff", ex));
            return;
        }
        finally {
            // The bodies have been copied (or the copy failed); either way they are done with.
            this.releaseByteBuf(dataBodies);
        }
        try {
            DrillBuf responseBuffer;
            BitControl.CustomMessage message = (BitControl.CustomMessage)cmd.getMessage();
            Response response = messageHandler.getHandlerRegistry().handle(message, reqDrillBuff);
            try {
                responseBuffer = this.convertToByteBuf(response.dBodies);
            }
            catch (Exception ex) {
                outcomeListener.failed(new RpcException("Failed to allocate memory while sending response in LocalControlConnectionManager#convertToByteBuff", ex));
                // reqDrillBuff is released by the enclosing finally; releasing it here as well
                // would drop its reference count twice.
                return;
            }
            finally {
                this.releaseByteBuf(response.dBodies);
            }
            outcomeListener.success((BitControl.CustomMessage)response.pBody, responseBuffer);
        }
        catch (RpcException ex) {
            cmd.getOutcomeListener().failed(ex);
        }
        finally {
            this.releaseByteBuf(reqDrillBuff);
        }
    }

    /**
     * Copies the readable bytes of every buffer in {@code byteBuffArray} into one newly
     * allocated {@link DrillBuf}, in array order.
     *
     * <p>The input buffers are neither released nor have their indices moved; releasing them is
     * the caller's job. If copying fails after allocation, the new buffer is released before the
     * exception propagates.
     *
     * @param byteBuffArray buffers to copy, or {@code null}
     * @return a new buffer holding all readable bytes, or {@code null} if the input was {@code null}
     * @throws OutOfMemoryException if the allocator cannot provide the buffer
     */
    private DrillBuf convertToByteBuf(ByteBuf[] byteBuffArray) throws OutOfMemoryException {
        if (byteBuffArray == null) {
            return null;
        }
        int bytesToCopy = 0;
        for (ByteBuf b : byteBuffArray) {
            bytesToCopy += b.readableBytes();
        }
        DrillBuf drillBuff = this.config.getAllocator().buffer(bytesToCopy);
        try {
            for (ByteBuf b : byteBuffArray) {
                // Readable bytes start at readerIndex(), which is not necessarily 0.
                drillBuff.writeBytes(b, b.readerIndex(), b.readableBytes());
            }
        }
        catch (RuntimeException ex) {
            // Nobody else holds a reference to the partially filled buffer yet.
            drillBuff.release();
            throw ex;
        }
        return drillBuff;
    }

    /**
     * Releases every buffer in the array once; does nothing for a {@code null} array.
     *
     * @param byteBuffArray buffers to release, or {@code null}
     */
    private void releaseByteBuf(ByteBuf[] byteBuffArray) {
        if (byteBuffArray != null) {
            for (ByteBuf b : byteBuffArray) {
                b.release();
            }
        }
    }

    /**
     * Releases the buffer once; does nothing for {@code null}.
     *
     * @param byteBuf buffer to release, or {@code null}
     */
    private void releaseByteBuf(ByteBuf byteBuf) {
        if (byteBuf != null) {
            byteBuf.release();
        }
    }
}

