/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources.v2;

import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.ProducerPool;
import io.confluent.kafkarest.RecordMetadataOrException;
import io.confluent.kafkarest.Utils;
import io.confluent.kafkarest.entities.AvroProduceRecord;
import io.confluent.kafkarest.entities.BinaryProduceRecord;
import io.confluent.kafkarest.entities.EmbeddedFormat;
import io.confluent.kafkarest.entities.JsonProduceRecord;
import io.confluent.kafkarest.entities.Partition;
import io.confluent.kafkarest.entities.PartitionOffset;
import io.confluent.kafkarest.entities.PartitionProduceRequest;
import io.confluent.kafkarest.entities.ProduceRecord;
import io.confluent.kafkarest.entities.ProduceResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffsetResponse;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.List;
import java.util.Vector;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/topics/{topic}/partitions")
@Produces(value={"application/vnd.kafka.binary.v2+json; qs=0.1", "application/vnd.kafka.avro.v2+json; qs=0.1", "application/vnd.kafka.v2+json; qs=0.9"})
@Consumes(value={"application/vnd.kafka.v2+json"})
public final class PartitionsResource {
    private static final Logger log = LoggerFactory.getLogger(PartitionsResource.class);
    private final KafkaRestContext ctx;

    public PartitionsResource(KafkaRestContext ctx) {
        this.ctx = ctx;
    }

    @GET
    @PerformanceMetric(value="partitions.list+v2")
    public List<Partition> list(@PathParam(value="topic") String topic) throws Exception {
        this.checkTopicExists(topic);
        return this.ctx.getAdminClientWrapper().getTopicPartitions(topic);
    }

    @GET
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.get+v2")
    public Partition getPartition(@PathParam(value="topic") String topic, @PathParam(value="partition") int partition) throws Exception {
        this.checkTopicExists(topic);
        Partition part = this.ctx.getAdminClientWrapper().getTopicPartition(topic, partition);
        if (part == null) {
            throw Errors.partitionNotFoundException();
        }
        return part;
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-binary+v2")
    @Consumes(value={"application/vnd.kafka.binary.v2+json"})
    public void produceBinary(@Suspended AsyncResponse asyncResponse, @PathParam(value="topic") String topic, @PathParam(value="partition") int partition, @Valid @NotNull PartitionProduceRequest<BinaryProduceRecord> request) throws Exception {
        this.produce(asyncResponse, topic, partition, EmbeddedFormat.BINARY, request);
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-json+v2")
    @Consumes(value={"application/vnd.kafka.json.v2+json"})
    public void produceJson(@Suspended AsyncResponse asyncResponse, @PathParam(value="topic") String topic, @PathParam(value="partition") int partition, @Valid @NotNull PartitionProduceRequest<JsonProduceRecord> request) throws Exception {
        this.produce(asyncResponse, topic, partition, EmbeddedFormat.JSON, request);
    }

    @POST
    @Path(value="/{partition}")
    @PerformanceMetric(value="partition.produce-avro+v2")
    @Consumes(value={"application/vnd.kafka.avro.v2+json"})
    public void produceAvro(@Suspended AsyncResponse asyncResponse, @PathParam(value="topic") String topic, @PathParam(value="partition") int partition, @Valid @NotNull PartitionProduceRequest<AvroProduceRecord> request) throws Exception {
        boolean hasKeys = false;
        boolean hasValues = false;
        for (AvroProduceRecord rec : request.getRecords()) {
            hasKeys = hasKeys || !rec.getJsonKey().isNull();
            hasValues = hasValues || !rec.getJsonValue().isNull();
        }
        if (hasKeys && request.getKeySchema() == null && request.getKeySchemaId() == null) {
            throw Errors.keySchemaMissingException();
        }
        if (hasValues && request.getValueSchema() == null && request.getValueSchemaId() == null) {
            throw Errors.valueSchemaMissingException();
        }
        this.produce(asyncResponse, topic, partition, EmbeddedFormat.AVRO, request);
    }

    protected <K, V, R extends ProduceRecord<K, V>> void produce(final AsyncResponse asyncResponse, String topic, int partition, EmbeddedFormat format, PartitionProduceRequest<R> request) throws Exception {
        if (this.topicExists(topic) && !this.ctx.getAdminClientWrapper().partitionExists(topic, partition)) {
            throw Errors.partitionNotFoundException();
        }
        log.trace("Executing topic produce request id={} topic={} partition={} format={} request={}", new Object[]{asyncResponse, topic, partition, format, request});
        this.ctx.getProducerPool().produce(topic, partition, format, request, request.getRecords(), new ProducerPool.ProduceRequestCallback(){

            @Override
            public void onCompletion(Integer keySchemaId, Integer valueSchemaId, List<RecordMetadataOrException> results) {
                ProduceResponse response = new ProduceResponse();
                Vector<PartitionOffset> offsets = new Vector<PartitionOffset>();
                for (RecordMetadataOrException result : results) {
                    if (result.getException() != null) {
                        int errorCode = Utils.errorCodeFromProducerException(result.getException());
                        String errorMessage = result.getException().getMessage();
                        offsets.add(new PartitionOffset(null, null, errorCode, errorMessage));
                        continue;
                    }
                    offsets.add(new PartitionOffset(result.getRecordMetadata().partition(), result.getRecordMetadata().offset(), null, null));
                }
                response.setOffsets(offsets);
                response.setKeySchemaId(keySchemaId);
                response.setValueSchemaId(valueSchemaId);
                log.trace("Completed topic produce request id={} response={}", (Object)asyncResponse, (Object)response);
                Response.Status requestStatus = Utils.produceRequestStatus(response);
                asyncResponse.resume((Object)Response.status((Response.Status)requestStatus).entity((Object)response).build());
            }
        });
    }

    @GET
    @Path(value="/{partition}/offsets")
    public TopicPartitionOffsetResponse getOffsets(@PathParam(value="topic") String topic, @PathParam(value="partition") int partition) throws Exception {
        this.checkTopicExists(topic);
        this.checkPartitionExists(topic, partition);
        return new TopicPartitionOffsetResponse(this.getBeginningOffset(topic, partition), this.getEndOffset(topic, partition));
    }

    private long getBeginningOffset(String topic, int partition) {
        return this.ctx.getKafkaConsumerManager().getBeginningOffset(topic, partition);
    }

    private long getEndOffset(String topic, int partition) {
        return this.ctx.getKafkaConsumerManager().getEndOffset(topic, partition);
    }

    private void checkTopicExists(String topic) throws Exception {
        if (!this.topicExists(topic)) {
            throw Errors.topicNotFoundException();
        }
    }

    private boolean topicExists(String topic) throws Exception {
        return this.ctx.getAdminClientWrapper().topicExists(topic);
    }

    private void checkPartitionExists(String topic, int partition) throws Exception {
        if (!this.ctx.getAdminClientWrapper().partitionExists(topic, partition)) {
            throw Errors.partitionNotFoundException();
        }
    }
}

