/*
 * Decompiled with CFR 0.152.
 */
package org.apache.camel.component.chronicle.engine;

import java.util.ArrayList;
import java.util.List;
import net.openhft.chronicle.engine.api.map.MapEvent;
import net.openhft.chronicle.engine.api.pubsub.InvalidSubscriberException;
import net.openhft.chronicle.engine.api.pubsub.Subscriber;
import net.openhft.chronicle.engine.api.pubsub.TopicSubscriber;
import net.openhft.chronicle.engine.api.tree.AssetTree;
import net.openhft.chronicle.engine.tree.TopologicalEvent;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.component.chronicle.engine.ChronicleEngineConfiguration;
import org.apache.camel.component.chronicle.engine.ChronicleEngineEndpoint;
import org.apache.camel.component.chronicle.engine.ChronicleEngineMapEventType;
import org.apache.camel.impl.DefaultConsumer;

public class ChronicleEngineConsumer
extends DefaultConsumer {
    private final String path;
    private AssetTree client;

    public ChronicleEngineConsumer(ChronicleEngineEndpoint endpoint, Processor processor) {
        super((Endpoint)endpoint, processor);
        this.path = endpoint.getPath();
    }

    protected void doStart() throws Exception {
        if (this.client != null) {
            throw new IllegalStateException("AssetTree already configured");
        }
        ChronicleEngineEndpoint endpoint = (ChronicleEngineEndpoint)this.getEndpoint();
        ChronicleEngineConfiguration conf = endpoint.getConfiguration();
        this.client = endpoint.createRemoteAssetTree();
        if (conf.isSubscribeMapEvents()) {
            this.client.registerSubscriber(endpoint.getPath(), MapEvent.class, (Subscriber)new EngineMapEventListener(conf.getFilteredMapEvents()));
        }
        if (conf.isSubscribeTopologicalEvents()) {
            this.client.registerSubscriber(endpoint.getPath(), TopologicalEvent.class, (Subscriber)new EngineTopologicalEventListener());
        }
        if (conf.isSubscribeTopicEvents()) {
            this.client.registerTopicSubscriber(endpoint.getPath(), Object.class, Object.class, (TopicSubscriber)new EngineTopicEventListener());
        }
    }

    protected void doStop() throws Exception {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    private class EngineTopicEventListener
    implements TopicSubscriber<Object, Object> {
        private EngineTopicEventListener() {
        }

        public void onMessage(Object topic, Object dataMessage) throws InvalidSubscriberException {
            Exchange exchange = ChronicleEngineConsumer.this.getEndpoint().createExchange();
            Message message = exchange.getIn();
            message.setHeader("ChronicleEnginePath", (Object)ChronicleEngineConsumer.this.path);
            message.setHeader("ChronicleEngineTopic", topic);
            message.setBody(dataMessage);
            try {
                ChronicleEngineConsumer.this.getProcessor().process(exchange);
            }
            catch (Exception e) {
                throw new RuntimeCamelException((Throwable)e);
            }
        }
    }

    private class EngineTopologicalEventListener
    implements Subscriber<TopologicalEvent> {
        private EngineTopologicalEventListener() {
        }

        public void onMessage(TopologicalEvent event) throws InvalidSubscriberException {
            Exchange exchange = ChronicleEngineConsumer.this.getEndpoint().createExchange();
            Message message = exchange.getIn();
            message.setHeader("ChronicleEnginePath", (Object)ChronicleEngineConsumer.this.path);
            message.setHeader("ChronicleEngineAssetName", (Object)event.assetName());
            message.setHeader("ChronicleEngineTopologicalEventName", (Object)event.name());
            message.setHeader("ChronicleEngineTopologicalEventFullName", (Object)event.fullName());
            message.setHeader("ChronicleEngineTopologicalEventAdded", (Object)Boolean.toString(event.added()));
            try {
                ChronicleEngineConsumer.this.getProcessor().process(exchange);
            }
            catch (Exception e) {
                throw new RuntimeCamelException((Throwable)e);
            }
        }
    }

    private class EngineMapEventListener
    implements Subscriber<MapEvent> {
        private List<Class<? extends MapEvent>> filteredEvents = null;

        EngineMapEventListener(String[] events) {
            if (events != null && events.length > 0) {
                this.filteredEvents = new ArrayList<Class<? extends MapEvent>>(events.length);
                for (String event : events) {
                    this.filteredEvents.add(ChronicleEngineMapEventType.getType(event));
                }
            }
        }

        public void onMessage(MapEvent event) throws InvalidSubscriberException {
            if (this.filteredEvents != null && this.filteredEvents.contains(event.getClass())) {
                return;
            }
            Exchange exchange = ChronicleEngineConsumer.this.getEndpoint().createExchange();
            Message message = exchange.getIn();
            message.setHeader("ChronicleEnginePath", (Object)ChronicleEngineConsumer.this.path);
            message.setHeader("ChronicleEngineAssetName", (Object)event.assetName());
            message.setHeader("ChronicleEngineMapEventType", (Object)ChronicleEngineMapEventType.fromEvent(event));
            message.setHeader("ChronicleEngineKey", event.getKey());
            message.setBody(event.getValue());
            if (event.oldValue() != null) {
                message.setHeader("ChronicleEngineOldValue", event.oldValue());
            }
            try {
                ChronicleEngineConsumer.this.getProcessor().process(exchange);
            }
            catch (Exception e) {
                throw new RuntimeCamelException((Throwable)e);
            }
        }
    }
}

