/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.servicebus;

import com.azure.messaging.servicebus.ServiceBusErrorContext;
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
import com.azure.messaging.servicebus.models.DeadLetterOptions;
import com.azure.messaging.servicebus.models.SubQueue;
import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.component.azure.servicebus.ServiceBusConfiguration;
import org.apache.camel.component.azure.servicebus.ServiceBusEndpoint;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.support.DefaultConsumer;
import org.apache.camel.support.SynchronizationAdapter;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceBusConsumer
extends DefaultConsumer {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusConsumer.class);
    private ServiceBusProcessorClient client;

    public ServiceBusConsumer(ServiceBusEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
    }

    protected void doStart() throws Exception {
        super.doStart();
        LOG.debug("Creating connection to Azure ServiceBus");
        this.client = this.getEndpoint().getServiceBusClientFactory().createServiceBusProcessorClient(this.getConfiguration(), this::processMessage, this::processError);
        this.client.start();
    }

    private void processMessage(ServiceBusReceivedMessageContext messageContext) {
        ServiceBusReceivedMessage message = messageContext.getMessage();
        Exchange exchange = this.createServiceBusExchange(message);
        ConsumerOnCompletion onCompletion = new ConsumerOnCompletion(messageContext);
        exchange.getExchangeExtension().addOnCompletion((Synchronization)onCompletion);
        AsyncCallback cb = this.defaultConsumerCallback(exchange, true);
        this.getAsyncProcessor().process(exchange, cb);
    }

    private void processError(ServiceBusErrorContext errorContext) {
        LOG.error("Error from Service Bus client: {}", (Object)errorContext.getErrorSource(), (Object)errorContext.getException());
    }

    protected void doStop() throws Exception {
        if (this.client != null) {
            this.client.close();
        }
        super.doStop();
    }

    public ServiceBusConfiguration getConfiguration() {
        return this.getEndpoint().getConfiguration();
    }

    public ServiceBusEndpoint getEndpoint() {
        return (ServiceBusEndpoint)super.getEndpoint();
    }

    private Exchange createServiceBusExchange(ServiceBusReceivedMessage receivedMessage) {
        Exchange exchange = this.createExchange(true);
        Message message = exchange.getIn();
        message.setBody((Object)receivedMessage.getBody());
        message.setHeader("CamelAzureServiceBusApplicationProperties", (Object)receivedMessage.getApplicationProperties());
        message.setHeader("CamelAzureServiceBusContentType", (Object)receivedMessage.getContentType());
        message.setHeader("CamelAzureServiceBusMessageId", (Object)receivedMessage.getMessageId());
        message.setHeader("CamelAzureServiceBusCorrelationId", (Object)receivedMessage.getCorrelationId());
        message.setHeader("CamelAzureServiceBusDeadLetterErrorDescription", (Object)receivedMessage.getDeadLetterErrorDescription());
        message.setHeader("CamelAzureServiceBusDeadLetterReason", (Object)receivedMessage.getDeadLetterReason());
        message.setHeader("CamelAzureServiceBusDeadLetterSource", (Object)receivedMessage.getDeadLetterSource());
        message.setHeader("CamelAzureServiceBusDeliveryCount", (Object)receivedMessage.getDeliveryCount());
        message.setHeader("CamelAzureServiceBusScheduledEnqueueTime", (Object)receivedMessage.getScheduledEnqueueTime());
        message.setHeader("CamelAzureServiceBusEnqueuedSequenceNumber", (Object)receivedMessage.getEnqueuedSequenceNumber());
        message.setHeader("CamelAzureServiceBusEnqueuedTime", (Object)receivedMessage.getEnqueuedTime());
        message.setHeader("CamelAzureServiceBusExpiresAt", (Object)receivedMessage.getExpiresAt());
        message.setHeader("CamelAzureServiceBusLockToken", (Object)receivedMessage.getLockToken());
        message.setHeader("CamelAzureServiceBusLockedUntil", (Object)receivedMessage.getLockedUntil());
        message.setHeader("CamelAzureServiceBusPartitionKey", (Object)receivedMessage.getPartitionKey());
        message.setHeader("CamelAzureServiceBusRawAmqpMessage", (Object)receivedMessage.getRawAmqpMessage());
        message.setHeader("CamelAzureServiceBusReplyTo", (Object)receivedMessage.getReplyTo());
        message.setHeader("CamelAzureServiceBusReplyToSessionId", (Object)receivedMessage.getReplyToSessionId());
        message.setHeader("CamelAzureServiceBusSequenceNumber", (Object)receivedMessage.getSequenceNumber());
        message.setHeader("CamelAzureServiceBusSessionId", (Object)receivedMessage.getSessionId());
        message.setHeader("CamelAzureServiceBusSubject", (Object)receivedMessage.getSubject());
        message.setHeader("CamelAzureServiceBusTimeToLive", (Object)receivedMessage.getTimeToLive());
        message.setHeader("CamelAzureServiceBusTo", (Object)receivedMessage.getTo());
        HeaderFilterStrategy headerFilterStrategy = this.getConfiguration().getHeaderFilterStrategy();
        message.getHeaders().putAll(receivedMessage.getApplicationProperties().entrySet().stream().filter(entry -> !headerFilterStrategy.applyFilterToExternalHeaders((String)entry.getKey(), entry.getValue(), exchange)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)));
        return exchange;
    }

    private class ConsumerOnCompletion
    extends SynchronizationAdapter {
        private final ServiceBusReceivedMessageContext messageContext;

        private ConsumerOnCompletion(ServiceBusReceivedMessageContext messageContext) {
            this.messageContext = messageContext;
        }

        public void onComplete(Exchange exchange) {
            super.onComplete(exchange);
            this.messageContext.complete();
        }

        public void onFailure(Exchange exchange) {
            Exception cause = exchange.getException();
            if (cause != null) {
                ServiceBusConsumer.this.getExceptionHandler().handleException("Error during processing exchange.", exchange, (Throwable)cause);
            }
            if (ServiceBusConsumer.this.getConfiguration().isEnableDeadLettering() && (ObjectHelper.isEmpty((Object)ServiceBusConsumer.this.getConfiguration().getSubQueue()) || ObjectHelper.equal((Object)ServiceBusConsumer.this.getConfiguration().getSubQueue(), (Object)SubQueue.NONE))) {
                DeadLetterOptions deadLetterOptions = new DeadLetterOptions();
                if (cause != null) {
                    deadLetterOptions.setDeadLetterReason(String.format("%s: %s", cause.getClass().getName(), cause.getMessage()));
                    deadLetterOptions.setDeadLetterErrorDescription(Arrays.stream(cause.getStackTrace()).map(StackTraceElement::toString).collect(Collectors.joining("\n")));
                    this.messageContext.deadLetter(deadLetterOptions);
                } else {
                    this.messageContext.deadLetter();
                }
            } else {
                this.messageContext.abandon();
            }
        }
    }
}

