/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.azure.servicebus;

import com.azure.core.util.BinaryData;
import com.azure.messaging.servicebus.ServiceBusSenderAsyncClient;
import java.io.File;
import java.io.InputStream;
import java.nio.file.Path;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.StreamSupport;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.TypeConverter;
import org.apache.camel.component.azure.servicebus.ServiceBusConfiguration;
import org.apache.camel.component.azure.servicebus.ServiceBusConfigurationOptionsProxy;
import org.apache.camel.component.azure.servicebus.ServiceBusEndpoint;
import org.apache.camel.component.azure.servicebus.ServiceBusProducerOperationDefinition;
import org.apache.camel.component.azure.servicebus.client.ServiceBusClientFactory;
import org.apache.camel.component.azure.servicebus.client.ServiceBusSenderAsyncClientWrapper;
import org.apache.camel.component.azure.servicebus.operations.ServiceBusSenderOperations;
import org.apache.camel.support.DefaultAsyncProducer;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Mono;

/**
 * Asynchronous Camel producer that publishes exchange bodies to Azure Service Bus.
 *
 * <p>Each {@link ServiceBusProducerOperationDefinition} this producer supports is registered in a
 * dispatch table at construction time. {@link #process(Exchange, AsyncCallback)} resolves the
 * operation for the exchange, runs it, and completes the {@link AsyncCallback} once the reactive
 * send pipeline terminates.
 */
public class ServiceBusProducer extends DefaultAsyncProducer {
    private static final Logger LOG = LoggerFactory.getLogger(ServiceBusProducer.class);

    /** Name of the message header read as the application-properties map passed along with each send. */
    private static final String APPLICATION_PROPERTIES_HEADER = "CamelAzureServiceBusApplicationProperties";

    /** Dispatch table: operation -> handler that performs it for a given exchange. */
    private final Map<ServiceBusProducerOperationDefinition, BiConsumer<Exchange, AsyncCallback>> operationsToExecute
            = new EnumMap<>(ServiceBusProducerOperationDefinition.class);

    private ServiceBusSenderAsyncClientWrapper senderClientWrapper;
    private ServiceBusConfigurationOptionsProxy configurationOptionsProxy;
    private ServiceBusSenderOperations serviceBusSenderOperations;

    /**
     * Creates the producer and registers the handlers for the supported operations.
     *
     * @param endpoint the endpoint this producer belongs to
     */
    public ServiceBusProducer(Endpoint endpoint) {
        super(endpoint);
        bind(ServiceBusProducerOperationDefinition.sendMessages, sendMessages());
        bind(ServiceBusProducerOperationDefinition.scheduleMessages, scheduleMessages());
    }

    /** Builds the options proxy around the endpoint configuration. */
    @Override
    protected void doInit() throws Exception {
        super.doInit();
        configurationOptionsProxy = new ServiceBusConfigurationOptionsProxy(getConfiguration());
    }

    /**
     * Resolves the sender client (the one supplied in the configuration if present, otherwise a
     * newly created one) and wires up the wrapper and the send operations around it.
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();
        ServiceBusSenderAsyncClient configuredClient = getConfiguration().getSenderAsyncClient();
        ServiceBusSenderAsyncClient senderClient = configuredClient != null
                ? configuredClient
                : ServiceBusClientFactory.createServiceBusSenderAsyncClient(getConfiguration());
        senderClientWrapper = new ServiceBusSenderAsyncClientWrapper(senderClient);
        serviceBusSenderOperations = new ServiceBusSenderOperations(senderClientWrapper);
    }

    /**
     * Runs the operation selected for the exchange.
     *
     * @param exchange the exchange whose message body is to be sent
     * @param callback completed with {@code done(false)} by the reactive pipeline, or with
     *                 {@code done(true)} here when the operation fails before subscription
     * @return {@code false} when completion is left to the reactive pipeline, {@code true} when
     *         the exchange was failed and completed inside this call
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback callback) {
        try {
            invokeOperation(configurationOptionsProxy.getServiceBusProducerOperationDefinition(exchange), exchange,
                    callback);
            return false;
        } catch (Exception e) {
            // Failure happened on the calling thread: report it and complete synchronously.
            exchange.setException(e);
            callback.done(true);
            return true;
        }
    }

    /** Closes the sender wrapper (if one was created) and then stops the parent producer. */
    @Override
    protected void doStop() throws Exception {
        try {
            if (senderClientWrapper != null) {
                senderClientWrapper.close();
            }
        } finally {
            // Always let the parent shut down, even when closing the client fails.
            super.doStop();
        }
    }

    @Override
    public ServiceBusEndpoint getEndpoint() {
        return (ServiceBusEndpoint) super.getEndpoint();
    }

    /**
     * @return the configuration of the owning endpoint
     */
    public ServiceBusConfiguration getConfiguration() {
        return getEndpoint().getConfiguration();
    }

    /** Registers {@code fn} as the handler for {@code operation}. */
    private void bind(ServiceBusProducerOperationDefinition operation, BiConsumer<Exchange, AsyncCallback> fn) {
        operationsToExecute.put(operation, fn);
    }

    /**
     * Looks up and runs the handler for {@code operation}, defaulting to {@code sendMessages}
     * when no operation was resolved.
     *
     * @throws RuntimeCamelException if no handler is registered for the operation
     */
    private void invokeOperation(
            ServiceBusProducerOperationDefinition operation, Exchange exchange, AsyncCallback callback) {
        ServiceBusProducerOperationDefinition operationToInvoke
                = ObjectHelper.isEmpty(operation) ? ServiceBusProducerOperationDefinition.sendMessages : operation;
        BiConsumer<Exchange, AsyncCallback> fnToInvoke = operationsToExecute.get(operationToInvoke);
        if (fnToInvoke == null) {
            throw new RuntimeCamelException("Operation not supported. Value: " + operationToInvoke);
        }
        fnToInvoke.accept(exchange, callback);
    }

    /**
     * Handler for the {@code sendMessages} operation: an {@link Iterable} body is converted
     * element by element and sent as a list, any other body is converted and sent as one payload.
     */
    private BiConsumer<Exchange, AsyncCallback> sendMessages() {
        return (exchange, callback) -> {
            Object inputBody = exchange.getMessage().getBody();
            Map<String, Object> applicationProperties = applicationPropertiesOf(exchange);

            Mono<Void> sendMessageAsync;
            if (inputBody instanceof Iterable<?> iterableBody) {
                List<?> convertedBodies = convertBodyToList(iterableBody);
                sendMessageAsync = serviceBusSenderOperations.sendMessages(
                        convertedBodies,
                        configurationOptionsProxy.getServiceBusTransactionContext(exchange),
                        applicationProperties);
            } else {
                Object convertedBody = convertSingleBody(exchange, inputBody);
                sendMessageAsync = serviceBusSenderOperations.sendMessages(
                        convertedBody,
                        configurationOptionsProxy.getServiceBusTransactionContext(exchange),
                        applicationProperties);
            }

            subscribeToMono(sendMessageAsync, exchange, noop -> { }, callback);
        };
    }

    /**
     * Handler for the {@code scheduleMessages} operation. Body handling mirrors
     * {@link #sendMessages()}; the value emitted by the scheduling call replaces the message body.
     */
    private BiConsumer<Exchange, AsyncCallback> scheduleMessages() {
        return (exchange, callback) -> {
            Object inputBody = exchange.getMessage().getBody();
            Map<String, Object> applicationProperties = applicationPropertiesOf(exchange);

            Mono<List<Long>> scheduleMessagesAsync;
            if (inputBody instanceof Iterable<?> iterableBody) {
                List<?> convertedBodies = convertBodyToList(iterableBody);
                scheduleMessagesAsync = serviceBusSenderOperations.scheduleMessages(
                        convertedBodies,
                        configurationOptionsProxy.getScheduledEnqueueTime(exchange),
                        configurationOptionsProxy.getServiceBusTransactionContext(exchange),
                        applicationProperties);
            } else {
                Object convertedBody = convertSingleBody(exchange, inputBody);
                scheduleMessagesAsync = serviceBusSenderOperations.scheduleMessages(
                        convertedBody,
                        configurationOptionsProxy.getScheduledEnqueueTime(exchange),
                        configurationOptionsProxy.getServiceBusTransactionContext(exchange),
                        applicationProperties);
            }

            subscribeToMono(scheduleMessagesAsync, exchange,
                    sequenceNumbers -> exchange.getMessage().setBody(sequenceNumbers), callback);
        };
    }

    /** Reads the application-properties header from the exchange message; may be {@code null}. */
    @SuppressWarnings("unchecked") // the header is requested as a raw Map and consumed as Map<String, Object>
    private static Map<String, Object> applicationPropertiesOf(Exchange exchange) {
        return exchange.getMessage().getHeader(APPLICATION_PROPERTIES_HEADER, Map.class);
    }

    /**
     * Converts a non-iterable body: {@link BinaryData} is passed through untouched, otherwise the
     * body becomes {@link BinaryData} in binary mode or a {@link String} in text mode.
     */
    private Object convertSingleBody(Exchange exchange, Object inputBody) {
        if (inputBody instanceof BinaryData) {
            return inputBody;
        }
        if (getConfiguration().isBinary()) {
            return convertBodyToBinary(exchange);
        }
        return exchange.getMessage().getBody(String.class);
    }

    /** Converts every element of {@code inputBody} with {@link #convertMessageBody(Object)}. */
    private List<?> convertBodyToList(Iterable<?> inputBody) {
        return StreamSupport.stream(inputBody.spliterator(), false)
                .map(this::convertMessageBody)
                .toList();
    }

    /**
     * Converts the exchange body to {@link BinaryData}: streams and files are wrapped directly,
     * anything else is first converted to {@code byte[]} through the message.
     */
    private Object convertBodyToBinary(Exchange exchange) {
        BinaryData wrapped = streamOrFileToBinaryData(exchange.getMessage().getBody());
        if (wrapped != null) {
            return wrapped;
        }
        return BinaryData.fromBytes(exchange.getMessage().getBody(byte[].class));
    }

    /**
     * Converts one element of an iterable body. {@link BinaryData} is passed through; in binary
     * mode streams and files become {@link BinaryData} and everything else is converted to
     * {@code byte[]}; in text mode the element is converted to {@link String}.
     */
    private Object convertMessageBody(Object inputBody) {
        if (inputBody instanceof BinaryData) {
            return inputBody;
        }
        TypeConverter typeConverter = getEndpoint().getCamelContext().getTypeConverter();
        if (!getConfiguration().isBinary()) {
            return typeConverter.convertTo(String.class, inputBody);
        }
        BinaryData wrapped = streamOrFileToBinaryData(inputBody);
        if (wrapped != null) {
            return wrapped;
        }
        return typeConverter.convertTo(byte[].class, inputBody);
    }

    /**
     * Wraps an {@link InputStream}, {@link Path} or {@link File} body as {@link BinaryData}.
     *
     * @return the wrapped data, or {@code null} when {@code body} is none of those types
     */
    private static BinaryData streamOrFileToBinaryData(Object body) {
        if (body instanceof InputStream stream) {
            return BinaryData.fromStream(stream);
        }
        if (body instanceof Path path) {
            return BinaryData.fromFile(path);
        }
        if (body instanceof File file) {
            return BinaryData.fromFile(file.toPath());
        }
        return null;
    }

    /**
     * Subscribes to {@code inputMono}, handing any emitted value to {@code resultsCallback} and
     * completing {@code callback} with {@code done(false)} on both the error and completion signals.
     * On error the throwable is stored on the exchange first.
     */
    private <T> void subscribeToMono(
            Mono<T> inputMono, Exchange exchange, Consumer<T> resultsCallback, AsyncCallback callback) {
        inputMono.subscribe(resultsCallback, error -> {
            // Pass the throwable as the trailing argument so the stack trace is logged too.
            LOG.debug("Error processing async exchange with error: {}", error.getMessage(), error);
            exchange.setException(error);
            callback.done(false);
        }, () -> {
            LOG.trace("All events with exchange have been sent successfully.");
            callback.done(false);
        });
    }
}

