/*
 * Decompiled with CFR 0.152.
 */
package org.apache.avro.mapred.tether;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import org.apache.avro.Schema;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.ipc.HttpTransceiver;
import org.apache.avro.ipc.SaslSocketTransceiver;
import org.apache.avro.ipc.Transceiver;
import org.apache.avro.ipc.specific.SpecificRequestor;
import org.apache.avro.mapred.tether.OutputProtocol;
import org.apache.avro.mapred.tether.TaskType;
import org.apache.avro.mapred.tether.TetheredProcess;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class TetherTask<IN, MID, OUT> {
    static final Logger LOG = LoggerFactory.getLogger(TetherTask.class);
    private Transceiver clientTransceiver;
    private OutputProtocol outputClient;
    private TaskType taskType;
    private int partitions;
    private DecoderFactory decoderFactory = DecoderFactory.get();
    private BinaryDecoder decoder;
    private SpecificDatumReader<IN> inReader;
    private SpecificDatumReader<MID> midReader;
    private IN inRecord;
    private MID midRecord;
    private MID midRecordSpare;
    private Collector<MID> midCollector;
    private Collector<OUT> outCollector;
    private TetheredProcess.Protocol proto;

    void open(int inputPort) throws IOException {
        String clientPortString = System.getenv("AVRO_TETHER_OUTPUT_PORT");
        String protocol = System.getenv("AVRO_TETHER_PROTOCOL");
        if (clientPortString == null) {
            throw new RuntimeException("AVRO_TETHER_OUTPUT_PORT env var is null");
        }
        int clientPort = Integer.parseInt(clientPortString);
        if (protocol == null) {
            throw new RuntimeException("AVRO_TETHER_PROTOCOL env var is null");
        }
        if ((protocol = protocol.trim().toLowerCase()).equals("http")) {
            this.proto = TetheredProcess.Protocol.HTTP;
        } else if (protocol.equals("sasl")) {
            this.proto = TetheredProcess.Protocol.SASL;
        } else {
            throw new RuntimeException("AVROT_TETHER_PROTOCOL=" + protocol + " but this protocol is unsupported");
        }
        switch (this.proto) {
            case SASL: {
                this.clientTransceiver = new SaslSocketTransceiver((SocketAddress)new InetSocketAddress(clientPort));
                this.outputClient = (OutputProtocol)SpecificRequestor.getClient(OutputProtocol.class, (Transceiver)this.clientTransceiver);
                break;
            }
            case HTTP: {
                this.clientTransceiver = new HttpTransceiver(new URL("http://127.0.0.1:" + clientPort));
                this.outputClient = (OutputProtocol)SpecificRequestor.getClient(OutputProtocol.class, (Transceiver)this.clientTransceiver);
            }
        }
        this.outputClient.configure(inputPort);
    }

    void configure(TaskType taskType, CharSequence inSchemaText, CharSequence outSchemaText) {
        this.taskType = taskType;
        try {
            Schema inSchema = Schema.parse((String)inSchemaText.toString());
            Schema outSchema = Schema.parse((String)outSchemaText.toString());
            switch (taskType) {
                case MAP: {
                    this.inReader = new SpecificDatumReader(inSchema);
                    this.midCollector = new Collector(outSchema);
                    break;
                }
                case REDUCE: {
                    this.midReader = new SpecificDatumReader(inSchema);
                    this.outCollector = new Collector(outSchema);
                }
            }
        }
        catch (Throwable e) {
            this.fail(e.toString());
        }
    }

    void partitions(int partitions) {
        this.partitions = partitions;
    }

    public int partitions() {
        return this.partitions;
    }

    void input(ByteBuffer data, long count) {
        try {
            this.decoder = this.decoderFactory.binaryDecoder(data.array(), this.decoder);
            block6: for (long i = 0L; i < count; ++i) {
                switch (this.taskType) {
                    case MAP: {
                        this.inRecord = this.inReader.read(this.inRecord, (Decoder)this.decoder);
                        this.map(this.inRecord, this.midCollector);
                        continue block6;
                    }
                    case REDUCE: {
                        MID prev = this.midRecord;
                        this.midRecord = this.midReader.read(this.midRecordSpare, (Decoder)this.decoder);
                        if (prev != null && !this.midRecord.equals(prev)) {
                            this.reduceFlush(prev, this.outCollector);
                        }
                        this.reduce(this.midRecord, this.outCollector);
                        this.midRecordSpare = prev;
                    }
                }
            }
        }
        catch (Throwable e) {
            LOG.warn("failing: " + e, e);
            this.fail(e.toString());
        }
    }

    void complete() {
        if (this.taskType == TaskType.REDUCE && this.midRecord != null) {
            try {
                this.reduceFlush(this.midRecord, this.outCollector);
            }
            catch (Throwable e) {
                LOG.warn("failing: " + e, e);
                this.fail(e.toString());
            }
        }
        LOG.info("TetherTask: Sending complete to parent process.");
        this.outputClient.complete();
        LOG.info("TetherTask: Done sending complete to parent process.");
    }

    public abstract void map(IN var1, Collector<MID> var2) throws IOException;

    public abstract void reduce(MID var1, Collector<OUT> var2) throws IOException;

    public abstract void reduceFlush(MID var1, Collector<OUT> var2) throws IOException;

    public void status(String message) {
        this.outputClient.status(message);
    }

    public void count(String group, String name, long amount) {
        this.outputClient.count(group, name, amount);
    }

    public void fail(String message) {
        this.outputClient.fail(message);
        this.close();
    }

    void close() {
        LOG.info("Closing the transciever");
        if (this.clientTransceiver != null) {
            try {
                this.clientTransceiver.close();
            }
            catch (IOException iOException) {
                // empty catch block
            }
        }
    }

    public class Collector<T> {
        private SpecificDatumWriter<T> writer;
        private Buffer buffer = new Buffer();
        private BinaryEncoder encoder = new EncoderFactory().configureBlockSize(512).binaryEncoder((OutputStream)this.buffer, null);

        private Collector(Schema schema) {
            this.writer = new SpecificDatumWriter(schema);
        }

        public void collect(T record) throws IOException {
            this.buffer.reset();
            this.writer.write(record, (Encoder)this.encoder);
            this.encoder.flush();
            TetherTask.this.outputClient.output(this.buffer.data());
        }

        public void collect(T record, int partition) throws IOException {
            this.buffer.reset();
            this.writer.write(record, (Encoder)this.encoder);
            this.encoder.flush();
            TetherTask.this.outputClient.outputPartitioned(partition, this.buffer.data());
        }
    }

    private static class Buffer
    extends ByteArrayOutputStream {
        private Buffer() {
        }

        public ByteBuffer data() {
            return ByteBuffer.wrap(this.buf, 0, this.count);
        }
    }
}

