/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources;

import io.confluent.kafkarest.AvroConsumerState;
import io.confluent.kafkarest.BinaryConsumerState;
import io.confluent.kafkarest.ConsumerManager;
import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.ConsumerState;
import io.confluent.kafkarest.JsonConsumerState;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.List;
import javax.validation.Valid;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;

@Path(value="/consumers")
@Produces(value={"application/vnd.kafka.binary.v1+json; qs=0.1", "application/vnd.kafka.avro.v1+json; qs=0.1", "application/vnd.kafka.json.v1+json; qs=0.1", "application/vnd.kafka.v1+json; qs=0.9", "application/vnd.kafka+json; qs=0.8", "application/json; qs=0.5"})
@Consumes(value={"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.avro.v1+json", "application/vnd.kafka.json.v1+json", "application/vnd.kafka.v1+json", "application/vnd.kafka+json", "application/json", "application/octet-stream"})
public class ConsumersResource {
    private final KafkaRestContext ctx;

    public ConsumersResource(KafkaRestContext ctx) {
        this.ctx = ctx;
    }

    @POST
    @Valid
    @Path(value="/{group}")
    @PerformanceMetric(value="consumer.create")
    public CreateConsumerInstanceResponse createGroup(@Context UriInfo uriInfo, @PathParam(value="group") String group, @Valid ConsumerInstanceConfig config) {
        if (config == null) {
            config = new ConsumerInstanceConfig();
        }
        String instanceId = this.ctx.getConsumerManager().createConsumer(group, config);
        String instanceBaseUri = UriUtils.absoluteUriBuilder(this.ctx.getConfig(), uriInfo).path("instances").path(instanceId).build(new Object[0]).toString();
        return new CreateConsumerInstanceResponse(instanceId, instanceBaseUri);
    }

    @POST
    @Path(value="/{group}/instances/{instance}/offsets")
    @PerformanceMetric(value="consumer.commit")
    public void commitOffsets(final @Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        this.ctx.getConsumerManager().commitOffsets(group, instance, new ConsumerManager.CommitCallback(){

            public void onCompletion(List<TopicPartitionOffset> offsets, Exception e) {
                if (e != null) {
                    asyncResponse.resume((Throwable)e);
                } else {
                    asyncResponse.resume(offsets);
                }
            }
        });
    }

    @DELETE
    @Path(value="/{group}/instances/{instance}")
    @PerformanceMetric(value="consumer.delete")
    public void deleteGroup(@PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        this.ctx.getConsumerManager().deleteConsumer(group, instance);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/topics/{topic}")
    @PerformanceMetric(value="consumer.topic.read-binary")
    @Produces(value={"application/vnd.kafka.binary.v1+json", "application/vnd.kafka.v1+json; qs=0.9", "application/vnd.kafka+json; qs=0.8", "application/json; qs=0.5"})
    public void readTopicBinary(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @PathParam(value="topic") String topic, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readTopic(asyncResponse, group, instance, topic, maxBytes, BinaryConsumerState.class);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/topics/{topic}")
    @PerformanceMetric(value="consumer.topic.read-json")
    @Produces(value={"application/vnd.kafka.json.v1+json; qs=0.1"})
    public void readTopicJson(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @PathParam(value="topic") String topic, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readTopic(asyncResponse, group, instance, topic, maxBytes, JsonConsumerState.class);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/topics/{topic}")
    @PerformanceMetric(value="consumer.topic.read-avro")
    @Produces(value={"application/vnd.kafka.avro.v1+json; qs=0.1"})
    public void readTopicAvro(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @PathParam(value="topic") String topic, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readTopic(asyncResponse, group, instance, topic, maxBytes, AvroConsumerState.class);
    }

    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readTopic(final @Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @PathParam(value="topic") String topic, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes, Class<? extends ConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType) {
        maxBytes = maxBytes <= 0L ? Long.MAX_VALUE : maxBytes;
        this.ctx.getConsumerManager().readTopic(group, instance, topic, consumerStateType, maxBytes, new ConsumerReadCallback<ClientKeyT, ClientValueT>(){

            public void onCompletion(List<? extends ConsumerRecord<ClientKeyT, ClientValueT>> records, Exception e) {
                if (e != null) {
                    asyncResponse.resume((Throwable)e);
                } else {
                    asyncResponse.resume(records);
                }
            }
        });
    }
}

