/*
 * Decompiled with CFR 0.152.
 */
package org.apache.qpid.qmf2.tools;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.Connection;
import org.apache.qpid.qmf2.common.QmfCallback;
import org.apache.qpid.qmf2.common.QmfData;
import org.apache.qpid.qmf2.common.QmfEvent;
import org.apache.qpid.qmf2.common.QmfEventListener;
import org.apache.qpid.qmf2.common.QmfException;
import org.apache.qpid.qmf2.common.WorkItem;
import org.apache.qpid.qmf2.console.Agent;
import org.apache.qpid.qmf2.console.Console;
import org.apache.qpid.qmf2.console.EventReceivedWorkItem;
import org.apache.qpid.qmf2.console.QmfConsoleData;
import org.apache.qpid.qmf2.util.ConnectionHelper;
import org.apache.qpid.qmf2.util.GetOpt;

public final class QueueFuse
implements QmfEventListener {
    private static final String _usage = "Usage: QueueFuse [options] [broker-addr]...\n";
    private static final String _description = "Monitors one or more Qpid message brokers for queueThresholdExceeded Events.\n\nIf a queueThresholdExceeded Event occurs messages are purged from the queue,\nin other words this class behaves rather like a fuse 'blowing' if the\nthreshold gets exceeded.\n\nIf no broker-addr is supplied, QueueFuse connects to 'localhost:5672'.\n\n[broker-addr] syntax:\n\n[username/password@] hostname\nip-address [:<port>]\n\nExamples:\n\n$ QueueFuse localhost:5672\n$ QueueFuse 10.1.1.7:10000\n$ QueueFuse guest/guest@broker-host:10000\n";
    private static final String _options = "Options:\n  -h, --help            show this help message and exit\n  -f <filter>, --filter=<filter>\n                        a list of comma separated queue names (regex are\n                        accepted) to protect (default is to protect all).\n  -p <PERCENT>, --purge=<PERCENT>\n                        The percentage of messages to purge when the queue\n                        threshold gets exceeded (default = 20%).\n                        N.B. if this gets set too low the fuse may not blow.\n  --sasl-mechanism=<mech>\n                        SASL mechanism for authentication (e.g. EXTERNAL,\n                        ANONYMOUS, PLAIN, CRAM-MD5, DIGEST-MD5, GSSAPI). SASL\n                        automatically picks the most secure available\n                        mechanism - use this option to override.\n";
    private final String _url;
    private final List<Pattern> _filter;
    private final float _purge;
    private Map<String, QmfConsoleData> _queueCache = new HashMap<String, QmfConsoleData>(50);
    private Console _console;

    public QueueFuse(String url, String connectionOptions, List<Pattern> filter, float purge) {
        System.out.println("QueueFuse Connecting to " + url);
        if (filter.size() > 0) {
            System.out.println("Filter = " + filter);
        }
        this._url = url;
        this._filter = filter;
        this._purge = purge;
        try {
            Connection connection = ConnectionHelper.createConnection((String)url, (String)connectionOptions);
            this._console = new Console((QmfCallback)this);
            this._console.addConnection(connection);
            this.updateQueueCache();
        }
        catch (QmfException qmfe) {
            System.err.println("QmfException " + qmfe.getMessage() + " caught in QueueFuse constructor");
        }
    }

    private void updateQueueCache() {
        this._queueCache.clear();
        List queues = this._console.getObjects("org.apache.qpid.broker", "queue");
        for (QmfConsoleData queue : queues) {
            String queueName = queue.getStringValue("name");
            this._queueCache.put(queueName, queue);
        }
    }

    private void purgeQueue(String queueName, long msgDepth) {
        QmfConsoleData queue = this._queueCache.get(queueName);
        if (queue == null) {
            System.out.printf("%s ERROR QueueFuse.disconnectQueue() %s reference couldn't be found\n", new Date().toString(), queueName);
        } else {
            Map args = (Map)queue.getValue("arguments");
            String policyType = (String)args.get("qpid.policy_type");
            if (policyType != null && policyType.equals("ring")) {
                return;
            }
            try {
                QmfData arguments = new QmfData();
                arguments.setValue("request", (Object)((long)(this._purge * (float)msgDepth)));
                queue.invokeMethod("purge", arguments);
            }
            catch (QmfException e) {
                System.out.println(e.getMessage());
            }
        }
    }

    public void onEvent(WorkItem wi) {
        if (wi instanceof EventReceivedWorkItem) {
            EventReceivedWorkItem item = (EventReceivedWorkItem)wi;
            Agent agent = item.getAgent();
            QmfEvent event = item.getEvent();
            String className = event.getSchemaClassId().getClassName();
            if (className.equals("queueDeclare")) {
                this.updateQueueCache();
            } else if (className.equals("queueThresholdExceeded")) {
                String queueName = event.getStringValue("qName");
                boolean matches = false;
                for (Pattern x : this._filter) {
                    Matcher m = x.matcher(queueName);
                    if (!m.find()) continue;
                    matches = true;
                    break;
                }
                if (this._filter.isEmpty() || matches) {
                    long msgDepth = event.getLongValue("msgDepth");
                    this.purgeQueue(queueName, msgDepth);
                }
            }
        }
    }

    public static void main(String[] args) {
        String logLevel = System.getProperty("amqj.logging.level");
        logLevel = logLevel == null ? "FATAL" : logLevel;
        System.setProperty("amqj.logging.level", logLevel);
        String[] longOpts = new String[]{"help", "filter=", "purge=", "sasl-mechanism="};
        try {
            boolean includeRingQueues = false;
            String connectionOptions = "{reconnect: true}";
            ArrayList<Pattern> filter = new ArrayList<Pattern>();
            float purge = 0.2f;
            GetOpt getopt = new GetOpt(args, "hf:p:", longOpts);
            List optList = getopt.getOptList();
            String[] cargs = new String[]{};
            cargs = getopt.getEncArgs().toArray(cargs);
            for (String[] opt : optList) {
                if (opt[0].equals("-h") || opt[0].equals("--help")) {
                    System.out.println(_usage);
                    System.out.println(_description);
                    System.out.println(_options);
                    System.exit(1);
                    continue;
                }
                if (opt[0].equals("-f") || opt[0].equals("--filter")) {
                    String[] split;
                    for (String s : split = opt[1].split(",")) {
                        Pattern p = Pattern.compile(s);
                        filter.add(p);
                    }
                    continue;
                }
                if (opt[0].equals("-p") || opt[0].equals("--purge")) {
                    int percent = Integer.parseInt(opt[1]);
                    if (percent < 0 || percent > 100) {
                        System.out.println(_usage);
                        System.exit(1);
                    }
                    purge = (float)percent / 100.0f;
                    continue;
                }
                if (!opt[0].equals("--sasl-mechanism")) continue;
                connectionOptions = "{reconnect: true, sasl_mechs: " + opt[1] + "}";
            }
            int nargs = cargs.length;
            if (nargs == 0) {
                cargs = new String[]{"localhost"};
            }
            for (String url : cargs) {
                QueueFuse queueFuse = new QueueFuse(url, connectionOptions, filter, purge);
            }
        }
        catch (IllegalArgumentException e) {
            System.out.println(_usage);
            System.out.println(e.getMessage());
            System.exit(1);
        }
        try {
            Thread.currentThread().join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
    }
}

