/*
 * Decompiled with CFR 0.152.
 */
package org.apache.drill.exec.physical.impl;

import java.util.List;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.AccountingDataTunnel;
import org.apache.drill.exec.ops.ExecutorFragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.ops.RootFragmentContext;
import org.apache.drill.exec.physical.config.SingleSender;
import org.apache.drill.exec.physical.impl.BaseRootExec;
import org.apache.drill.exec.physical.impl.RootCreator;
import org.apache.drill.exec.physical.impl.RootExec;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.FragmentWritableBatch;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.testing.ControlsInjector;
import org.apache.drill.exec.testing.ControlsInjectorFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Creates the root operator for a {@link SingleSender}: an operator that takes
 * the batches produced by its single child and forwards them through a data
 * tunnel to the destination named by the sender configuration.
 */
public class SingleSenderCreator
implements RootCreator<SingleSender> {
    /**
     * Builds the sending root operator for the given configuration.
     *
     * @param context  fragment context; it is cast to {@link RootFragmentContext}
     * @param config   sender configuration (destination and opposite fragment ids)
     * @param children upstream batches; exactly one is expected (checked by an
     *                 assertion only, so the check is inactive without {@code -ea})
     * @return a new {@link SingleSenderRootExec} reading from the single child
     */
    @Override
    public RootExec getRoot(ExecutorFragmentContext context, SingleSender config, List<RecordBatch> children) {
        assert (children != null && children.size() == 1);
        return new SingleSenderRootExec((RootFragmentContext)context, children.iterator().next(), config);
    }

    /**
     * Root operator that pulls batches from one incoming {@link RecordBatch} and
     * sends each of them, followed by one final empty batch, over a data tunnel.
     */
    public static class SingleSenderRootExec
    extends BaseRootExec {
        private static final Logger logger = LoggerFactory.getLogger(SingleSenderRootExec.class);
        private static final ControlsInjector injector = ControlsInjectorFactory.getInjector(SingleSenderRootExec.class);
        /** This fragment's handle with major/minor ids replaced by the configured opposite ids. */
        private final ExecProtos.FragmentHandle oppositeHandle;
        /** Upstream operator whose batches are forwarded. */
        private final RecordBatch incoming;
        /** Tunnel to the configured destination; every batch is sent through it. */
        private final AccountingDataTunnel tunnel;
        /** Handle of the fragment this operator runs in. */
        private final ExecProtos.FragmentHandle handle;
        /** Major fragment id of the opposite (receiving) side, taken from the config. */
        private final int recMajor;
        /** Set by {@link #receivingFragmentFinished}; makes the next {@link #innerNext()} wrap up. */
        private volatile boolean done = false;

        /**
         * Wires the operator to its upstream batch and to the destination tunnel.
         *
         * @param context fragment context supplying the handle, the data tunnel and
         *                the execution controls used for test injection
         * @param batch   upstream batch to read from; must not be {@code null}
         *                (checked by an assertion only)
         * @param config  sender configuration
         * @throws OutOfMemoryException declared by the original signature; presumably
         *                              raised while creating the operator context
         */
        public SingleSenderRootExec(RootFragmentContext context, RecordBatch batch, SingleSender config) throws OutOfMemoryException {
            super(context, context.newOperatorContext(config, null), config);
            this.incoming = batch;
            assert (this.incoming != null);
            this.handle = context.getHandle();
            this.recMajor = config.getOppositeMajorFragmentId();
            this.oppositeHandle = this.handle.toBuilder().setMajorFragmentId(config.getOppositeMajorFragmentId()).setMinorFragmentId(config.getOppositeMinorFragmentId()).build();
            // The tunnel is looked up exactly once; the previous code fetched it twice
            // and discarded the first result.
            this.tunnel = context.getDataTunnel(config.getDestination());
            this.tunnel.setTestInjectionControls(injector, context.getExecutionControls(), logger);
        }

        /**
         * Processes one step: fetches the next upstream outcome and forwards the
         * corresponding batch through the tunnel.
         *
         * <p>If {@link #receivingFragmentFinished} has been called, the upstream is
         * cancelled instead of being read and the step is handled as end of data.
         *
         * @return {@code true} after a data batch was sent, {@code false} after the
         *         final empty batch was sent
         * @throws IllegalStateException if the upstream reports any outcome other
         *                               than {@code NONE}, {@code OK} or {@code OK_NEW_SCHEMA}
         */
        @Override
        public boolean innerNext() {
            RecordBatch.IterOutcome out;
            if (!this.done) {
                out = this.next(this.incoming);
            } else {
                this.incoming.cancel();
                out = RecordBatch.IterOutcome.NONE;
            }
            switch (out) {
                case NONE: {
                    // The final batch always carries a schema; fall back to an empty
                    // one when the upstream never produced a schema.
                    BatchSchema incomingSchema = this.incoming.getSchema();
                    BatchSchema sendSchema = incomingSchema == null ? BatchSchema.newBuilder().build() : incomingSchema;
                    FragmentWritableBatch lastBatch = FragmentWritableBatch.getEmptyLastWithSchema(this.handle.getQueryId(), this.handle.getMajorFragmentId(), this.handle.getMinorFragmentId(), this.recMajor, this.oppositeHandle.getMinorFragmentId(), sendSchema);
                    this.sendAndTrackWait(lastBatch);
                    return false;
                }
                case OK_NEW_SCHEMA:
                case OK: {
                    // Transfer the incoming buffers to this operator's allocator before sending.
                    FragmentWritableBatch batch = new FragmentWritableBatch(false, this.handle.getQueryId(), this.handle.getMajorFragmentId(), this.handle.getMinorFragmentId(), this.recMajor, this.oppositeHandle.getMinorFragmentId(), this.incoming.getWritableBatch().transfer(this.oContext.getAllocator()));
                    this.updateStats(batch);
                    this.sendAndTrackWait(batch);
                    return true;
                }
                default: {
                    throw new IllegalStateException("Unexpected outcome from incoming batch: " + out);
                }
            }
        }

        /**
         * Sends one batch through the tunnel, bracketing the call with the operator
         * stats' wait markers; the wait is stopped even if the send throws.
         */
        private void sendAndTrackWait(FragmentWritableBatch outgoing) {
            this.stats.startWait();
            try {
                this.tunnel.sendRecordBatch(outgoing);
            }
            finally {
                this.stats.stopWait();
            }
        }

        /**
         * Adds the byte count of the given batch to the {@link Metric#BYTES_SENT} stat.
         *
         * @param writableBatch batch about to be sent
         */
        public void updateStats(FragmentWritableBatch writableBatch) {
            this.stats.addLongStat(Metric.BYTES_SENT, writableBatch.getByteCount());
        }

        /**
         * Records that the receiver is finished. The handle argument is not
         * inspected; the next {@link #innerNext()} cancels the upstream and sends
         * the final empty batch.
         *
         * @param handle handle of the finished receiving fragment (unused here)
         */
        @Override
        public void receivingFragmentFinished(ExecProtos.FragmentHandle handle) {
            this.done = true;
        }

        /**
         * Operator metrics reported by this sender. The metric id is the constant's
         * declaration position, so reordering constants changes the ids.
         */
        public static enum Metric implements MetricDef
        {
            BYTES_SENT;


            @Override
            public int metricId() {
                return this.ordinal();
            }
        }
    }
}

