/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources.v2;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.entities.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.ConsumerSeekToOffsetRequest;
import io.confluent.kafkarest.entities.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.v2.AvroKafkaConsumerState;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.rest.annotations.PerformanceMetric;
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;

@Path(value="/consumers")
@Produces(value={"application/vnd.kafka.binary.v2+json; qs=0.1", "application/vnd.kafka.avro.v2+json; qs=0.1", "application/vnd.kafka.json.v2+json; qs=0.1", "application/vnd.kafka.v2+json; qs=0.9"})
@Consumes(value={"application/vnd.kafka.binary.v2+json", "application/vnd.kafka.avro.v2+json", "application/vnd.kafka.json.v2+json", "application/vnd.kafka.v2+json"})
public class ConsumersResource {
    private final KafkaRestContext ctx;

    public ConsumersResource(KafkaRestContext ctx) {
        this.ctx = ctx;
    }

    @POST
    @Valid
    @Path(value="/{group}")
    @PerformanceMetric(value="consumer.create+v2")
    public CreateConsumerInstanceResponse createGroup(@Context UriInfo uriInfo, @PathParam(value="group") String group, @Valid ConsumerInstanceConfig config) {
        if (config == null) {
            config = new ConsumerInstanceConfig();
        }
        String instanceId = this.ctx.getKafkaConsumerManager().createConsumer(group, config);
        String instanceBaseUri = UriUtils.absoluteUriBuilder(this.ctx.getConfig(), uriInfo).path("instances").path(instanceId).build(new Object[0]).toString();
        return new CreateConsumerInstanceResponse(instanceId, instanceBaseUri);
    }

    @DELETE
    @Path(value="/{group}/instances/{instance}")
    @PerformanceMetric(value="consumer.delete+v2")
    public void deleteGroup(@PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        this.ctx.getKafkaConsumerManager().deleteConsumer(group, instance);
    }

    @POST
    @Path(value="/{group}/instances/{instance}/subscription")
    @PerformanceMetric(value="consumer.subscribe+v2")
    public void subscribe(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid @NotNull ConsumerSubscriptionRecord subscription) {
        try {
            this.ctx.getKafkaConsumerManager().subscribe(group, instance, subscription);
        }
        catch (IllegalStateException e) {
            throw Errors.illegalStateException((Throwable)e);
        }
    }

    @GET
    @Path(value="/{group}/instances/{instance}/subscription")
    @PerformanceMetric(value="consumer.subscription+v2")
    public ConsumerSubscriptionResponse subscription(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        return this.ctx.getKafkaConsumerManager().subscription(group, instance);
    }

    @DELETE
    @Path(value="/{group}/instances/{instance}/subscription")
    @PerformanceMetric(value="consumer.unsubscribe+v2")
    public void unsubscribe(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        this.ctx.getKafkaConsumerManager().unsubscribe(group, instance);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/records")
    @PerformanceMetric(value="consumer.records.read-binary+v2")
    @Produces(value={"application/vnd.kafka.binary.v2+json", "application/vnd.kafka.v2+json; qs=0.9"})
    public void readRecordBinary(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @QueryParam(value="timeout") @DefaultValue(value="-1") long timeout, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readRecords(asyncResponse, group, instance, timeout, maxBytes, BinaryKafkaConsumerState.class);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/records")
    @PerformanceMetric(value="consumer.records.read-json+v2")
    @Produces(value={"application/vnd.kafka.json.v2+json; qs=0.1"})
    public void readRecordJson(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @QueryParam(value="timeout") @DefaultValue(value="-1") long timeout, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readRecords(asyncResponse, group, instance, timeout, maxBytes, JsonKafkaConsumerState.class);
    }

    @GET
    @Path(value="/{group}/instances/{instance}/records")
    @PerformanceMetric(value="consumer.records.read-avro+v2")
    @Produces(value={"application/vnd.kafka.avro.v2+json; qs=0.1"})
    public void readRecordAvro(@Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @QueryParam(value="timeout") @DefaultValue(value="-1") long timeout, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes) {
        this.readRecords(asyncResponse, group, instance, timeout, maxBytes, AvroKafkaConsumerState.class);
    }

    @POST
    @Path(value="/{group}/instances/{instance}/offsets")
    @PerformanceMetric(value="consumer.commit-offsets+v2")
    public void commitOffsets(final @Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @QueryParam(value="async") @DefaultValue(value="false") String async, @Valid ConsumerOffsetCommitRequest offsetCommitRequest) {
        this.ctx.getKafkaConsumerManager().commitOffsets(group, instance, async, offsetCommitRequest, new KafkaConsumerManager.CommitCallback(){

            @Override
            public void onCompletion(List<TopicPartitionOffset> offsets, Exception e) {
                if (e != null) {
                    asyncResponse.resume((Throwable)e);
                } else {
                    asyncResponse.resume(offsets);
                }
            }
        });
    }

    @GET
    @Path(value="/{group}/instances/{instance}/offsets")
    @PerformanceMetric(value="consumer.committed-offsets+v2")
    public ConsumerCommittedResponse committedOffsets(@PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid ConsumerCommittedRequest request) {
        if (request == null) {
            throw Errors.partitionNotFoundException();
        }
        return this.ctx.getKafkaConsumerManager().committed(group, instance, request);
    }

    @POST
    @Path(value="/{group}/instances/{instance}/positions/beginning")
    @PerformanceMetric(value="consumer.seek-to-beginning+v2")
    public void seekToBeginning(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid @NotNull ConsumerSeekToRequest seekToRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seekToBeginning(group, instance, seekToRequest);
        }
        catch (IllegalStateException e) {
            throw Errors.illegalStateException((Throwable)e);
        }
    }

    @POST
    @Path(value="/{group}/instances/{instance}/positions/end")
    @PerformanceMetric(value="consumer.seek-to-end+v2")
    public void seekToEnd(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid @NotNull ConsumerSeekToRequest seekToRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seekToEnd(group, instance, seekToRequest);
        }
        catch (IllegalStateException e) {
            throw Errors.illegalStateException((Throwable)e);
        }
    }

    @POST
    @Path(value="/{group}/instances/{instance}/positions")
    @PerformanceMetric(value="consumer.seek-to-offset+v2")
    public void seekToOffset(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid @NotNull ConsumerSeekToOffsetRequest seekToOffsetRequest) {
        try {
            this.ctx.getKafkaConsumerManager().seekToOffset(group, instance, seekToOffsetRequest);
        }
        catch (IllegalStateException e) {
            throw Errors.illegalStateException((Throwable)e);
        }
    }

    @POST
    @Path(value="/{group}/instances/{instance}/assignments")
    @PerformanceMetric(value="consumer.assign+v2")
    public void assign(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @Valid @NotNull ConsumerAssignmentRequest assignmentRequest) {
        try {
            this.ctx.getKafkaConsumerManager().assign(group, instance, assignmentRequest);
        }
        catch (IllegalStateException e) {
            throw Errors.illegalStateException((Throwable)e);
        }
    }

    @GET
    @Path(value="/{group}/instances/{instance}/assignments")
    @PerformanceMetric(value="consumer.assignment+v2")
    public ConsumerAssignmentResponse assignment(@Context UriInfo uriInfo, @PathParam(value="group") String group, @PathParam(value="instance") String instance) {
        return this.ctx.getKafkaConsumerManager().assignment(group, instance);
    }

    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(final @Suspended AsyncResponse asyncResponse, @PathParam(value="group") String group, @PathParam(value="instance") String instance, @QueryParam(value="timeout") @DefaultValue(value="-1") long timeout, @QueryParam(value="max_bytes") @DefaultValue(value="-1") long maxBytes, Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType) {
        maxBytes = maxBytes <= 0L ? Long.MAX_VALUE : maxBytes;
        this.ctx.getKafkaConsumerManager().readRecords(group, instance, consumerStateType, timeout, maxBytes, new ConsumerReadCallback<ClientKeyT, ClientValueT>(){

            public void onCompletion(List<? extends ConsumerRecord<ClientKeyT, ClientValueT>> records, Exception e) {
                if (e != null) {
                    asyncResponse.resume((Throwable)e);
                } else {
                    asyncResponse.resume(records);
                }
            }
        });
    }
}

