/*
 * Decompiled with CFR 0.152.
 */
package org.apache.airavata.gfac.monitor.impl.push.amqp;

import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Consumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import org.apache.airavata.common.utils.ServerSettings;
import org.apache.airavata.commons.gfac.type.HostDescription;
import org.apache.airavata.gfac.core.monitor.JobIdentity;
import org.apache.airavata.gfac.core.monitor.MonitorID;
import org.apache.airavata.gfac.core.monitor.state.JobStatusChangeRequest;
import org.apache.airavata.gfac.core.notification.MonitorPublisher;
import org.apache.airavata.gfac.monitor.core.PushMonitor;
import org.apache.airavata.gfac.monitor.exception.AiravataMonitorException;
import org.apache.airavata.gfac.monitor.impl.push.amqp.BasicConsumer;
import org.apache.airavata.gfac.monitor.impl.push.amqp.JSONMessageParser;
import org.apache.airavata.gfac.monitor.util.AMQPConnectionUtil;
import org.apache.airavata.gfac.monitor.util.CommonUtils;
import org.apache.airavata.model.workspace.experiment.JobState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Push-style job monitor that subscribes to job status messages over AMQP.
 *
 * <p>For every {@link MonitorID} taken from the running queue a channel is opened
 * (one per channel id as computed by {@link CommonUtils#getChannelID}), a
 * server-named queue is declared on it and bound to the
 * {@value #EXCHANGE_NAME} exchange. Messages are handed to a {@link BasicConsumer}
 * that is given a private, in-process {@link MonitorPublisher}; this instance is
 * registered as a listener on that publisher, and {@link #unRegisterListener(MonitorID)}
 * is its {@code @Subscribe} handler.
 *
 * <p>NOTE(review): {@code availableChannels} is a plain {@link HashMap} and
 * {@code startRegister} is a plain field; if {@link #run()} and the event handler
 * execute on different threads, access is unsynchronized -- confirm the threading model.
 */
public class AMQPMonitor
extends PushMonitor {
    private static final Logger logger = LoggerFactory.getLogger(AMQPMonitor.class);

    /** Exchange that job queues are bound to and unbound from. */
    private static final String EXCHANGE_NAME = "glue2.computing_activity";

    private Map<String, Channel> availableChannels;
    private MonitorPublisher publisher;
    private MonitorPublisher localPublisher;
    private BlockingQueue<MonitorID> runningQueue;
    private BlockingQueue<MonitorID> finishQueue;
    private String connectionName;
    private String proxyPath;
    private List<String> amqpHosts;
    private boolean startRegister;

    /**
     * Creates an unconfigured monitor; call {@link #initialize(String, String, List)}
     * and the queue/publisher setters before use.
     */
    public AMQPMonitor() {
    }

    /**
     * Creates a fully configured monitor.
     *
     * @param publisher      publisher that receives {@link JobStatusChangeRequest} events
     * @param runningQueue   queue of jobs waiting to be registered by {@link #run()}
     * @param finishQueue    queue searched by {@link #unRegisterListener(MonitorID)}
     * @param proxyPath      passed through to {@link AMQPConnectionUtil#connect}
     * @param connectionName passed through to {@link AMQPConnectionUtil#connect}
     * @param hosts          AMQP hosts passed through to {@link AMQPConnectionUtil#connect}
     */
    public AMQPMonitor(MonitorPublisher publisher, BlockingQueue<MonitorID> runningQueue, BlockingQueue<MonitorID> finishQueue, String proxyPath, String connectionName, List<String> hosts) {
        this.publisher = publisher;
        this.runningQueue = runningQueue;
        this.finishQueue = finishQueue;
        this.configure(proxyPath, connectionName, hosts);
    }

    /**
     * (Re)initializes connection settings, discards the current channel map and
     * installs a fresh local publisher with this instance registered on it.
     *
     * @param proxyPath      passed through to {@link AMQPConnectionUtil#connect}
     * @param connectionName passed through to {@link AMQPConnectionUtil#connect}
     * @param hosts          AMQP hosts passed through to {@link AMQPConnectionUtil#connect}
     */
    public void initialize(String proxyPath, String connectionName, List<String> hosts) {
        this.configure(proxyPath, connectionName, hosts);
    }

    /** Shared setup used by the constructor and {@link #initialize}. */
    private void configure(String proxyPath, String connectionName, List<String> hosts) {
        this.availableChannels = new HashMap<String, Channel>();
        this.connectionName = connectionName;
        this.proxyPath = proxyPath;
        this.amqpHosts = hosts;
        this.localPublisher = new MonitorPublisher(new EventBus());
        this.localPublisher.registerListener(this);
    }

    /**
     * Ensures a consuming channel exists for the channel id of {@code monitorID}.
     *
     * <p>If a channel is already cached for that id nothing is done. Otherwise a
     * new connection and channel are opened, a server-named queue is declared,
     * a {@link BasicConsumer} is attached and the queue is bound to
     * {@value #EXCHANGE_NAME} with the routing key derived from the job's user
     * name and host address. The channel is cached only after all of these
     * steps succeed, so a failed attempt can be retried later.
     *
     * @param monitorID job to start listening for
     * @return {@code true} if a channel is available for the job afterwards,
     *         {@code false} if the channel could not be set up
     * @throws AiravataMonitorException declared by the overridden method; not thrown here
     */
    @Override
    public boolean registerListener(MonitorID monitorID) throws AiravataMonitorException {
        HostDescription host = monitorID.getHost();
        String hostAddress = host.getType().getHostAddress();
        String channelID = CommonUtils.getChannelID(monitorID);
        if (this.availableChannels.get(channelID) != null) {
            return true;
        }
        Connection connection = null;
        try {
            connection = AMQPConnectionUtil.connect(this.amqpHosts, this.connectionName, this.proxyPath);
            if (connection == null) {
                // Defensive: presumably connect() may yield no connection when no host is reachable.
                logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName());
                return false;
            }
            Channel channel = connection.createChannel();
            String queueName = channel.queueDeclare().getQueue();
            BasicConsumer consumer = new BasicConsumer(new JSONMessageParser(), this.localPublisher);
            channel.basicConsume(queueName, true, (Consumer)consumer);
            String filterString = CommonUtils.getRoutingKey(monitorID.getUserName(), hostAddress);
            channel.queueBind(queueName, EXCHANGE_NAME, filterString);
            // Cache only a fully configured channel.
            this.availableChannels.put(channelID, channel);
            logger.info("Using filtering string to monitor: " + filterString);
            return true;
        }
        catch (IOException e) {
            logger.error("Error creating the connection to finishQueue the job:" + monitorID.getUserName(), e);
            // Closing the connection also releases any channel created on it.
            AMQPMonitor.closeQuietly(connection);
            return false;
        }
    }

    /**
     * Registration loop: takes jobs from the running queue and registers a
     * listener for each, then closes every cached channel and its connection
     * once the loop ends.
     *
     * <p>The loop also ends when the thread is interrupted; the interrupt flag
     * is restored before leaving.
     *
     * <p>NOTE(review): the loop condition keeps running while
     * {@code startRegister} is true OR the server is not stopping, so clearing
     * {@code startRegister} alone does not end it -- confirm that {@code ||}
     * (rather than {@code &&}) is intended.
     */
    public void run() {
        this.startRegister = true;
        while (this.startRegister || !ServerSettings.isStopAllThreads()) {
            try {
                MonitorID take = this.runningQueue.take();
                this.registerListener(take);
            }
            catch (AiravataMonitorException e) {
                logger.error("Error registering the listener", e);
            }
            catch (InterruptedException e) {
                // Restore the flag and stop; retrying take() would otherwise swallow the interrupt.
                logger.warn("Interrupted while waiting for jobs to monitor, stopping registration", e);
                Thread.currentThread().interrupt();
                break;
            }
            catch (Exception e) {
                // Boundary catch: one bad job must not kill the registration loop.
                logger.error("Unexpected error while registering the listener", e);
            }
        }
        for (Map.Entry<String, Channel> entry : this.availableChannels.entrySet()) {
            try {
                AMQPMonitor.closeChannelAndConnection(entry.getValue());
            }
            catch (IOException e) {
                logger.error("Error closing the channel for ChannelID:" + entry.getKey(), e);
            }
        }
    }

    /**
     * Handles a status message for a job: finds the matching job in the finish
     * queue, tears down the AMQP subscription when the job has ended and it was
     * the last one for its channel id, then publishes the new status.
     *
     * <p>A queued job matches when its job id ends with the job id carried by
     * {@code monitorID}. When no queued job matches, the message is treated as
     * obsolete and ignored.
     *
     * @param monitorID status message; only its job id and status are read
     * @return {@code true} if the status was published, {@code false} if no
     *         matching job is queued
     * @throws AiravataMonitorException if the channel was already removed or
     *         tearing down the subscription fails
     */
    @Override
    @Subscribe
    public boolean unRegisterListener(MonitorID monitorID) throws AiravataMonitorException {
        MonitorID next = null;
        for (MonitorID candidate : this.finishQueue) {
            if (candidate.getJobID().endsWith(monitorID.getJobID())) {
                next = candidate;
                break;
            }
        }
        if (next == null) {
            logger.error("Job has removed from the queue, old obsolete message recieved");
            return false;
        }
        String channelID = CommonUtils.getChannelID(next);
        if (JobState.FAILED.equals(monitorID.getStatus()) || JobState.COMPLETE.equals(monitorID.getStatus())) {
            this.finishQueue.remove(next);
            if (CommonUtils.isTheLastJobInQueue(this.finishQueue, next)) {
                logger.info("There are no jobs to monitor for common ChannelID:" + channelID + " , so we unsubscribe it" + ", incase new job created we do subscribe again");
                Channel channel = this.availableChannels.get(channelID);
                if (channel == null) {
                    logger.error("Already Unregistered the listener");
                    throw new AiravataMonitorException("Already Unregistered the listener");
                }
                try {
                    AMQPMonitor.unbindAndClose(channel, CommonUtils.getRoutingKey(next));
                }
                catch (IOException e) {
                    logger.error("Error unregistering the listener", e);
                    throw new AiravataMonitorException("Error unregistering the listener");
                }
                finally {
                    // Never keep a torn-down (or half torn-down) channel cached.
                    this.availableChannels.remove(channelID);
                }
            }
        }
        next.setStatus(monitorID.getStatus());
        this.publisher.publish(new JobStatusChangeRequest(next, new JobIdentity(next.getExperimentID(), next.getWorkflowNodeID(), next.getTaskID(), next.getJobID()), next.getStatus()));
        return true;
    }

    /**
     * Unbinds from the exchange and then closes the channel and its connection;
     * the close is attempted even when the unbind fails.
     */
    private static void unbindAndClose(Channel channel, String routingKey) throws IOException {
        try {
            // NOTE(review): queueDeclare() with no arguments declares a new server-named
            // queue, not the one bound in registerListener -- confirm this is intended.
            channel.queueUnbind(channel.queueDeclare().getQueue(), EXCHANGE_NAME, routingKey);
        }
        finally {
            AMQPMonitor.closeChannelAndConnection(channel);
        }
    }

    /** Closes the channel and then its connection, skipping whichever is already closed. */
    private static void closeChannelAndConnection(Channel channel) throws IOException {
        if (channel == null) {
            return;
        }
        Connection connection = channel.getConnection();
        try {
            if (channel.isOpen()) {
                channel.close();
            }
        }
        finally {
            if (connection != null && connection.isOpen()) {
                connection.close();
            }
        }
    }

    /** Best-effort close used on failure paths; problems are logged, not propagated. */
    private static void closeQuietly(Connection connection) {
        if (connection == null || !connection.isOpen()) {
            return;
        }
        try {
            connection.close();
        }
        catch (IOException e) {
            logger.warn("Error closing the AMQP connection", e);
        }
    }

    /**
     * Not implemented: performs no action.
     *
     * @return always {@code false}
     */
    @Override
    public boolean stopRegister() throws AiravataMonitorException {
        return false;
    }

    /** @return the live map of channel id to channel (not a copy) */
    public Map<String, Channel> getAvailableChannels() {
        return this.availableChannels;
    }

    /** @param availableChannels map of channel id to channel to use from now on */
    public void setAvailableChannels(Map<String, Channel> availableChannels) {
        this.availableChannels = availableChannels;
    }

    /** @return publisher that receives job status change requests */
    @Override
    public MonitorPublisher getPublisher() {
        return this.publisher;
    }

    /** @param publisher publisher that receives job status change requests */
    @Override
    public void setPublisher(MonitorPublisher publisher) {
        this.publisher = publisher;
    }

    /** @return queue that {@link #run()} takes jobs from */
    public BlockingQueue<MonitorID> getRunningQueue() {
        return this.runningQueue;
    }

    /** @param runningQueue queue that {@link #run()} takes jobs from */
    public void setRunningQueue(BlockingQueue<MonitorID> runningQueue) {
        this.runningQueue = runningQueue;
    }

    /** @return queue searched when a status message arrives */
    public BlockingQueue<MonitorID> getFinishQueue() {
        return this.finishQueue;
    }

    /** @param finishQueue queue searched when a status message arrives */
    public void setFinishQueue(BlockingQueue<MonitorID> finishQueue) {
        this.finishQueue = finishQueue;
    }

    /** @return proxy path handed to {@link AMQPConnectionUtil#connect} */
    public String getProxyPath() {
        return this.proxyPath;
    }

    /** @param proxyPath proxy path handed to {@link AMQPConnectionUtil#connect} */
    public void setProxyPath(String proxyPath) {
        this.proxyPath = proxyPath;
    }

    /** @return AMQP hosts handed to {@link AMQPConnectionUtil#connect} */
    public List<String> getAmqpHosts() {
        return this.amqpHosts;
    }

    /** @param amqpHosts AMQP hosts handed to {@link AMQPConnectionUtil#connect} */
    public void setAmqpHosts(List<String> amqpHosts) {
        this.amqpHosts = amqpHosts;
    }

    /** @return current value of the flag read by the {@link #run()} loop condition */
    public boolean isStartRegister() {
        return this.startRegister;
    }

    /** @param startRegister new value of the flag read by the {@link #run()} loop condition */
    public void setStartRegister(boolean startRegister) {
        this.startRegister = startRegister;
    }
}

