/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.google.pubsub;

import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.model.PubsubMessage;
import com.google.api.services.pubsub.model.PullRequest;
import com.google.api.services.pubsub.model.PullResponse;
import com.google.api.services.pubsub.model.ReceivedMessage;
import java.net.SocketTimeoutException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.component.google.pubsub.GooglePubsubConstants;
import org.apache.camel.component.google.pubsub.GooglePubsubEndpoint;
import org.apache.camel.component.google.pubsub.consumer.ExchangeAckTransaction;
import org.apache.camel.impl.DefaultConsumer;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class GooglePubsubConsumer
extends DefaultConsumer {
    private Logger localLog;
    private final GooglePubsubEndpoint endpoint;
    private final Processor processor;
    private final Synchronization ackStrategy;
    private ExecutorService executor;
    private Pubsub pubsub;

    GooglePubsubConsumer(GooglePubsubEndpoint endpoint, Processor processor) throws Exception {
        super((Endpoint)endpoint, processor);
        this.endpoint = endpoint;
        this.processor = processor;
        this.ackStrategy = new ExchangeAckTransaction(this.endpoint);
        this.pubsub = endpoint.getConnectionFactory().getMultiThreadClient(this.endpoint.getConcurrentConsumers());
        String loggerId = endpoint.getLoggerId();
        if (ObjectHelper.isEmpty((Object)loggerId)) {
            loggerId = ((Object)((Object)this)).getClass().getName();
        }
        this.localLog = LoggerFactory.getLogger((String)loggerId);
    }

    protected void doStart() throws Exception {
        super.doStart();
        this.localLog.info("Starting Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        this.executor = this.endpoint.createExecutor();
        for (int i = 0; i < this.endpoint.getConcurrentConsumers(); ++i) {
            this.executor.submit(new PubsubPoller(i + ""));
        }
    }

    protected void doStop() throws Exception {
        super.doStop();
        this.localLog.info("Stopping Google PubSub consumer for {}/{}", (Object)this.endpoint.getProjectId(), (Object)this.endpoint.getDestinationName());
        if (this.executor != null) {
            if (this.getEndpoint() != null && this.getEndpoint().getCamelContext() != null) {
                this.getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(this.executor);
            } else {
                this.executor.shutdownNow();
            }
        }
        this.executor = null;
    }

    private class PubsubPoller
    implements Runnable {
        private final String subscriptionFullName;
        private final String threadId;

        PubsubPoller(String id) {
            this.subscriptionFullName = String.format("projects/%s/subscriptions/%s", GooglePubsubConsumer.this.endpoint.getProjectId(), GooglePubsubConsumer.this.endpoint.getDestinationName());
            this.threadId = GooglePubsubConsumer.this.endpoint.getDestinationName() + "-Thread " + id;
        }

        @Override
        public void run() {
            if (GooglePubsubConsumer.this.localLog.isDebugEnabled()) {
                GooglePubsubConsumer.this.localLog.debug("Subscribing {} to {}", (Object)this.threadId, (Object)this.subscriptionFullName);
            }
            while (GooglePubsubConsumer.this.isRunAllowed() && !GooglePubsubConsumer.this.isSuspendingOrSuspended()) {
                try {
                    PullResponse pullResponse;
                    PullRequest pullRequest = new PullRequest().setMaxMessages(GooglePubsubConsumer.this.endpoint.getMaxMessagesPerPoll());
                    try {
                        if (GooglePubsubConsumer.this.localLog.isTraceEnabled()) {
                            GooglePubsubConsumer.this.localLog.trace("Polling : {}", (Object)this.threadId);
                        }
                        pullResponse = (PullResponse)GooglePubsubConsumer.this.pubsub.projects().subscriptions().pull(this.subscriptionFullName, pullRequest).execute();
                    }
                    catch (SocketTimeoutException ste) {
                        if (!GooglePubsubConsumer.this.localLog.isTraceEnabled()) continue;
                        GooglePubsubConsumer.this.localLog.trace("Socket timeout : {}", (Object)this.threadId);
                        continue;
                    }
                    if (null == pullResponse.getReceivedMessages()) continue;
                    List receivedMessages = pullResponse.getReceivedMessages();
                    for (ReceivedMessage receivedMessage : receivedMessages) {
                        PubsubMessage pubsubMessage = receivedMessage.getMessage();
                        byte[] body = pubsubMessage.decodeData();
                        if (GooglePubsubConsumer.this.localLog.isTraceEnabled()) {
                            GooglePubsubConsumer.this.localLog.trace("Received message ID : {}", (Object)pubsubMessage.getMessageId());
                        }
                        Exchange exchange = GooglePubsubConsumer.this.endpoint.createExchange();
                        exchange.getIn().setBody((Object)body);
                        exchange.getIn().setHeader("CamelGooglePubsub.MsgAckId", (Object)receivedMessage.getAckId());
                        exchange.getIn().setHeader("CamelGooglePubsub.MessageId", (Object)pubsubMessage.getMessageId());
                        exchange.getIn().setHeader("CamelGooglePubsub.PublishTime", (Object)pubsubMessage.getPublishTime());
                        if (null != receivedMessage.getMessage().getAttributes()) {
                            exchange.getIn().setHeader("CamelGooglePubsub.Attributes", (Object)receivedMessage.getMessage().getAttributes());
                        }
                        if (GooglePubsubConsumer.this.endpoint.getAckMode() != GooglePubsubConstants.AckMode.NONE) {
                            exchange.addOnCompletion(GooglePubsubConsumer.this.ackStrategy);
                        }
                        try {
                            GooglePubsubConsumer.this.processor.process(exchange);
                        }
                        catch (Throwable e) {
                            exchange.setException(e);
                        }
                    }
                }
                catch (Exception e) {
                    GooglePubsubConsumer.this.localLog.error("Failure getting messages from PubSub : ", (Throwable)e);
                }
            }
        }
    }
}

