/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.mapred.tether;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import org.apache.avro.ipc.HttpServer;
import org.apache.avro.ipc.Responder;
import org.apache.avro.ipc.SaslSocketServer;
import org.apache.avro.ipc.Server;
import org.apache.avro.ipc.specific.SpecificResponder;
import org.apache.avro.mapred.tether.InputProtocol;
import org.apache.avro.mapred.tether.TaskType;
import org.apache.avro.mapred.tether.TetherTask;
import org.apache.avro.mapred.tether.TetheredProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TetherTaskRunner
implements InputProtocol {
    static final Logger LOG = LoggerFactory.getLogger(TetherTaskRunner.class);
    private Server inputServer;
    private TetherTask task;
    private TetheredProcess.Protocol proto;

    public TetherTaskRunner(TetherTask task) throws IOException {
        this.task = task;
        String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
        if (protocol == null) {
            throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
        }
        if ((protocol = protocol.trim().toLowerCase()).equals("http")) {
            LOG.info("Use HTTP protocol");
            this.proto = TetheredProcess.Protocol.HTTP;
        } else if (protocol.equals("sasl")) {
            LOG.info("Use SASL protocol");
            this.proto = TetheredProcess.Protocol.SASL;
        } else {
            throw new RuntimeException("AVRO_TETHER_PROTOCOL=" + protocol + " but this protocol is unsupported");
        }
        InetSocketAddress iaddress = new InetSocketAddress(0);
        switch (this.proto) {
            case SASL: {
                this.inputServer = new SaslSocketServer((Responder)new SpecificResponder(InputProtocol.class, (Object)this), (SocketAddress)iaddress);
                LOG.info("Started SaslSocketServer on port:" + iaddress.getPort());
                break;
            }
            case HTTP: {
                this.inputServer = new HttpServer((Responder)new SpecificResponder(InputProtocol.class, (Object)this), iaddress.getPort());
                LOG.info("Started HttpServer on port:" + iaddress.getPort());
            }
        }
        this.inputServer.start();
        task.open(this.inputServer.getPort());
    }

    public void configure(TaskType taskType, String inSchema, String outSchema) {
        LOG.info("got configure");
        this.task.configure(taskType, inSchema, outSchema);
    }

    public synchronized void input(ByteBuffer data, long count) {
        this.task.input(data, count);
    }

    public void partitions(int partitions) {
        this.task.partitions(partitions);
    }

    public void abort() {
        LOG.info("got abort");
        this.close();
    }

    public synchronized void complete() {
        LOG.info("got input complete");
        this.task.complete();
    }

    public void join() throws InterruptedException {
        LOG.info("TetherTaskRunner: Start join.");
        this.inputServer.join();
        LOG.info("TetherTaskRunner: Finish join.");
    }

    private void close() {
        LOG.info("Closing the task");
        this.task.close();
        LOG.info("Finished closing the task.");
        if (this.inputServer != null) {
            this.inputServer.close();
        }
    }
}

