/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.common.utils.AbstractPerformanceTest;
import io.confluent.common.utils.PerformanceStats;
import io.confluent.kafkarest.entities.BinaryTopicProduceRecord;
import io.confluent.kafkarest.entities.TopicProduceRecord;
import io.confluent.kafkarest.entities.TopicProduceRequest;
import io.confluent.rest.entities.ErrorMessage;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Arrays;

/**
 * Command-line load generator that repeatedly POSTs one pre-serialized produce
 * request to {@code <rest_url>/topics/<topic_name>} and reports every attempt to
 * the {@link PerformanceStats.Callback} supplied by {@link AbstractPerformanceTest}.
 *
 * <p>The request body is built once in the constructor and re-sent unchanged on
 * every iteration, so the measured cost is dominated by the HTTP round trip
 * rather than by JSON serialization on the client side.
 *
 * <p>NOTE(review): instances share a single scratch {@link #buffer} between
 * iterations; this looks like it assumes {@code doIteration} is never invoked
 * concurrently on the same instance - confirm against the superclass.
 */
public class ProducerPerformance
extends AbstractPerformanceTest {
    /** Number of HTTP requests (iterations) to issue before {@link #finished(int)} reports true. */
    long iterations;
    /** Target request rate used by {@link #runningFast(int, float)}. */
    long iterationsPerSec;
    /** Number of records carried by each request. */
    int recordsPerIteration;
    /** Payload bytes accounted per request ({@code recordsPerIteration * recordSize}). */
    long bytesPerIteration;
    /** Full URL that each request is POSTed to. */
    String targetUrl;
    /** Decimal string form of {@code requestEntity.length}, sent as the Content-Length header value. */
    String requestEntityLength;
    /** Serialized JSON request body, reused for every iteration. */
    byte[] requestEntity;
    /** Scratch buffer used to drain successful response bodies. */
    byte[] buffer;
    /** JSON mapper used to serialize the request once and to parse error responses. */
    private final ObjectMapper jsonDeserializer = new ObjectMapper();

    /**
     * Entry point.
     *
     * @param args {@code rest_url topic_name num_records record_size batch_size target_records_sec}
     * @throws Exception if an argument is not a valid integer or the test cannot be set up
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 6) {
            System.out.println("Usage: java " + ProducerPerformance.class.getName() + " rest_url topic_name num_records record_size batch_size target_records_sec");
            System.exit(1);
        }
        String baseUrl = args[0];
        String topic = args[1];
        int numRecords = Integer.parseInt(args[2]);
        int recordSize = Integer.parseInt(args[3]);
        int batchSize = Integer.parseInt(args[4]);
        // batch_size is used as a divisor below; reject values that would raise
        // ArithmeticException (0) or yield a negative iteration count (< 0).
        if (batchSize <= 0) {
            System.out.println("batch_size must be a positive integer, got " + batchSize);
            System.exit(1);
        }
        // Both the record count and the record rate are converted to per-request units.
        int throughput = Integer.parseInt(args[5]) / batchSize;
        ProducerPerformance perf = new ProducerPerformance(baseUrl, topic, numRecords / batchSize, batchSize, throughput, recordSize);
        perf.run(throughput);
    }

    /**
     * Builds the request body that will be sent on every iteration.
     *
     * @param baseUrl             base URL of the REST endpoint; {@code "/topics/" + topic} is appended
     * @param topic               topic name appended to the URL
     * @param iterations          number of requests to send
     * @param recordsPerIteration number of identical records placed in each request
     * @param iterationsPerSec    target request rate
     * @param recordSize          size in bytes of each record payload (every byte is set to 1)
     * @throws Exception if the request cannot be serialized to JSON
     */
    public ProducerPerformance(String baseUrl, String topic, long iterations, int recordsPerIteration, long iterationsPerSec, int recordSize) throws Exception {
        super(iterations * (long)recordsPerIteration);
        this.iterations = iterations;
        this.iterationsPerSec = iterationsPerSec;
        this.recordsPerIteration = recordsPerIteration;
        // Widen before multiplying: an int * int product silently wraps for large batches.
        this.bytesPerIteration = (long)recordsPerIteration * (long)recordSize;
        this.targetUrl = baseUrl + "/topics/" + topic;
        byte[] payload = new byte[recordSize];
        Arrays.fill(payload, (byte)1);
        BinaryTopicProduceRecord record = new BinaryTopicProduceRecord(payload);
        // Every slot references the same record instance; only the serialized form matters.
        Object[] records = new TopicProduceRecord[recordsPerIteration];
        Arrays.fill(records, record);
        TopicProduceRequest<Object> request = new TopicProduceRequest<Object>();
        request.setRecords(Arrays.asList(records));
        // Reuse the instance mapper (field initializers have already run at this point)
        // instead of constructing a second ObjectMapper.
        this.requestEntity = this.jsonDeserializer.writeValueAsBytes(request);
        this.requestEntityLength = Integer.toString(this.requestEntity.length);
        this.buffer = new byte[0x100000];
    }

    /**
     * Sends one produce request and drains the response.
     *
     * <p>Any failure (connection error, HTTP status &gt;= 400, unreadable response) is
     * printed to standard error and does not abort the run. The callback is invoked
     * after every attempt, whether or not it succeeded, exactly as before.
     *
     * @param cb callback notified with the record and byte counts of this iteration
     */
    protected void doIteration(PerformanceStats.Callback cb) {
        HttpURLConnection connection = null;
        try {
            connection = (HttpURLConnection)new URL(this.targetUrl).openConnection();
            connection.setRequestMethod("POST");
            connection.setRequestProperty("Content-Type", "application/vnd.kafka.v2+json");
            connection.setRequestProperty("Content-Length", this.requestEntityLength);
            connection.setUseCaches(false);
            connection.setDoInput(true);
            connection.setDoOutput(true);
            // try-with-resources closes the stream even when write() fails midway.
            try (OutputStream os = connection.getOutputStream()) {
                os.write(this.requestEntity);
                os.flush();
            }
            int responseStatus = connection.getResponseCode();
            if (responseStatus >= 400) {
                throw new RuntimeException(String.format("Unexpected HTTP error status %d: %s", responseStatus, this.readErrorMessage(connection)));
            }
            try (InputStream is = connection.getInputStream()) {
                // Read and discard the body so the whole response is consumed.
                while (is.read(this.buffer) > 0) {
                }
            }
        }
        catch (Exception e) {
            // Best effort: a failed request is reported but must not stop the benchmark.
            e.printStackTrace();
        }
        finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
        cb.onCompletion(this.recordsPerIteration, this.bytesPerIteration);
    }

    /**
     * Extracts a human-readable message from the error body of a failed response.
     *
     * <p>Never throws: a missing or unparseable body is described in the returned
     * text so that the caller can still report the HTTP status code.
     *
     * @param connection connection whose response status indicated an error
     * @return the server-supplied message, or a description of why none is available
     */
    private String readErrorMessage(HttpURLConnection connection) {
        InputStream errorStream = connection.getErrorStream();
        // getErrorStream() returns null when the server sent no error body.
        if (errorStream == null) {
            return "(no error body returned)";
        }
        try (InputStream es = errorStream) {
            ErrorMessage errorMessage = (ErrorMessage)this.jsonDeserializer.readValue(es, ErrorMessage.class);
            return errorMessage == null ? "(empty error body)" : errorMessage.getMessage();
        }
        catch (Exception e) {
            return "(unreadable error body: " + e + ")";
        }
    }

    /**
     * @param iteration zero-based count of iterations already performed
     * @return {@code true} once the configured number of iterations has been reached
     */
    protected boolean finished(int iteration) {
        return (long)iteration >= this.iterations;
    }

    /**
     * @param iteration count of iterations already performed
     * @param elapsed   elapsed time; compared against a per-second rate, so presumably seconds - confirm against the superclass
     * @return {@code true} when the observed request rate exceeds the target rate
     */
    protected boolean runningFast(int iteration, float elapsed) {
        return (float)iteration / elapsed > (float)this.iterationsPerSec;
    }
}

