/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.tests;

import junit.framework.Assert;
import junit.framework.TestCase;
import junit.textui.TestRunner;
import org.apache.log4j.Logger;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import org.jgroups.View;
import org.jgroups.util.Util;

public class ChannelTrio
extends TestCase {
    /** Channels used by the tests; every test method creates and connects the ones it needs. */
    private Channel channel1 = null;
    private Channel channel2 = null;
    private Channel channel3 = null;
    /** Debug switch; not referenced anywhere else in this class. */
    static final boolean DEBUG = false;
    // A plain class literal replaces the decompiler's lazy class$ lookup here: that expression
    // read class$org$jgroups$tests$ChannelTrio by its simple name before the field's declaration
    // below, which the compiler rejects as an illegal forward reference.
    static Logger logger = Logger.getLogger(ChannelTrio.class);
    /** Group name every channel of every test connects to. */
    String channelName = "ChannelLog4jTest";
    /** Value exposed through getProtocol()/setProtocol(); the tests in this class never read it. */
    String protocol = null;
    /** Cache slot used by the class$ lookup in main(). */
    static /* synthetic */ Class class$org$jgroups$tests$ChannelTrio;

    /**
     * Creates the test case and hands the given name to the {@code TestCase} constructor.
     *
     * @param Name_ name passed through to the superclass
     */
    public ChannelTrio(String Name_) {
        super(Name_);
    }

    /**
     * Returns the protocol string last stored with {@code setProtocol}.
     *
     * @return the stored protocol value, or {@code null} if none was set
     */
    public String getProtocol() {
        return protocol;
    }

    /**
     * Stores the protocol string later returned by {@code getProtocol}.
     *
     * @param proto value to remember; may be {@code null}
     */
    public void setProtocol(String proto) {
        protocol = proto;
    }

    /** Prepares no shared fixture; each test method creates and connects its own channels. */
    public void setUp() {
    }

    /**
     * Closes every channel the finished test left in the fields and clears the fields.
     *
     * The tests close their channels only on the success path, so a failed assertion would
     * otherwise leave channels connected to the group name shared by all tests. The tests
     * themselves already call {@code close()} twice on the same channel, so closing an
     * already-closed channel here is expected to be harmless.
     */
    public void tearDown() {
        Channel[] leftovers = new Channel[]{this.channel1, this.channel2, this.channel3};
        this.channel1 = null;
        this.channel2 = null;
        this.channel3 = null;
        for (int i = 0; i < leftovers.length; ++i) {
            if (leftovers[i] == null) {
                continue;
            }
            try {
                leftovers[i].close();
            }
            catch (RuntimeException ex) {
                // Keep going so that one misbehaving channel cannot prevent closing the others.
                logger.error("Problem closing channel #" + (i + 1), ex);
            }
        }
    }

    /**
     * Sends 10000 messages from a third channel and verifies that one reader thread on each
     * of the two other channels counts exactly that many messages and then terminates.
     *
     * Any exception during the run is logged and reported as a test failure that names the
     * exception. The channels are closed in a {@code finally} block so they are released
     * even when an assertion fails.
     */
    public void testLargeInsertion() {
        int nitems = 10000;
        logger.info("start testLargeInsertion");
        try {
            logger.info("Inserting " + nitems + " elements");
            this.channel1 = new JChannel();
            this.channel1.connect(this.channelName);
            ReadItems rthread1 = new ReadItems(this.channel1, 0, nitems);
            rthread1.start();
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
            ReadItems rthread2 = new ReadItems(this.channel2, 0, nitems);
            rthread2.start();
            this.channel3 = new JChannel();
            this.channel3.connect(this.channelName);
            long start = System.currentTimeMillis();
            for (int i = 0; i < nitems; ++i) {
                this.channel3.send(new Message(null, null, ("Msg #" + i).getBytes()));
            }
            // Both readers stop by themselves once they have counted nitems messages.
            rthread1.join();
            rthread2.join();
            long stop = System.currentTimeMillis();
            logger.info("Took " + (stop - start) + " msecs");
            assertEquals(nitems, rthread1.getNum_items());
            assertEquals(nitems, rthread2.getNum_items());
            assertFalse(rthread1.isAlive());
            assertFalse(rthread2.isAlive());
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
            fail("testLargeInsertion aborted by " + ex);
        }
        finally {
            // Assertion failures are not Exceptions, so they bypass the catch above; closing
            // here keeps the channels from staying connected after a failed run.
            if (this.channel1 != null) {
                this.channel1.close();
                this.channel1 = null;
            }
            if (this.channel2 != null) {
                this.channel2.close();
                this.channel2 = null;
            }
            if (this.channel3 != null) {
                this.channel3.close();
                this.channel3 = null;
            }
        }
        logger.info("end testLargeInsertion");
    }

    /**
     * Starts ten timed-receive threads on channel1 and ten on channel2, then sends single
     * elements from channel3 and checks how many threads of each group have terminated.
     *
     * Sequence: two elements are sent and two terminated threads are expected per group;
     * channel1 is disconnected and all of group one must terminate while group two stays at
     * two; two more elements are sent and two further group-two threads must terminate;
     * finally channel2 is closed and all of group two must terminate.
     *
     * A failure while creating the channels now fails the test immediately instead of being
     * logged and followed by a run against half-initialised state. channel1, which the
     * scenario only disconnects, is closed at the end together with channel3.
     */
    public void testBarrierWithTimeOut() {
        logger.info("start testBarrierWithTimeOut");
        RemoveOneItemWithTimeout[] removersGroupOne = new RemoveOneItemWithTimeout[10];
        RemoveOneItemWithTimeout[] removersGroupTwo = new RemoveOneItemWithTimeout[10];
        long timeout = 200L;
        try {
            this.channel1 = new JChannel();
            this.channel1.connect(this.channelName);
            for (int i = 0; i < removersGroupOne.length; ++i) {
                removersGroupOne[i] = new RemoveOneItemWithTimeout(this.channel1, i, timeout);
                removersGroupOne[i].start();
            }
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
            for (int i = 0; i < removersGroupTwo.length; ++i) {
                removersGroupTwo[i] = new RemoveOneItemWithTimeout(this.channel2, i, timeout);
                removersGroupTwo[i].start();
            }
            this.channel3 = new JChannel();
            this.channel3.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error("Problem", e);
            fail("could not set up the channels for testBarrierWithTimeOut: " + e);
        }

        // Phase 1: two elements, two terminated threads expected in each group.
        Util.sleep(5000L);
        sendTimedElement(99L);
        Util.sleep(5000L);
        sendTimedElement(100L);
        Util.sleep(1000L);
        int num_dead = countTerminatedRemovers(removersGroupOne, "removersGroupOne #");
        int num_deadTwo = countTerminatedRemovers(removersGroupTwo, "removersGroupTwo #");
        assertEquals(2, num_dead);
        assertEquals(2, num_deadTwo);

        // Phase 2: disconnecting channel1 must end every thread of group one ...
        this.channel1.disconnect();
        Util.sleep(2000L);
        joinRemovers(removersGroupOne, "Waiting for remover Group One # ");
        num_dead = countTerminatedRemovers(removersGroupOne, "remover Group One #");
        assertEquals(removersGroupOne.length, num_dead);

        // ... while group two is unaffected.
        Util.sleep(2000L);
        logger.info("though Group One stopped, Group Two shall continue");
        num_dead = countTerminatedRemovers(removersGroupTwo, "removersGroupTwo #");
        assertEquals("Readers thread from Group Two stop that should'nt", 2, num_dead);

        // Phase 3: two more elements, two more terminated threads expected in group two.
        sendTimedElement(101L);
        Util.sleep(5000L);
        sendTimedElement(102L);
        Util.sleep(5000L);
        logger.info("Checking only 4 Group Two threads should have stop");
        num_dead = countTerminatedRemovers(removersGroupTwo, "removersGroupTwo #");
        assertEquals(2, num_dead - num_deadTwo);

        // Phase 4: closing channel2 must end the rest of group two.
        this.channel2.close();
        this.channel2 = null;
        Util.sleep(2000L);
        joinRemovers(removersGroupTwo, "Waiting for removersGroupTwo ");
        num_dead = countTerminatedRemovers(removersGroupTwo, "removersGroupTwo #");
        assertEquals(removersGroupTwo.length, num_dead);

        this.channel3.close();
        // channel1 was only disconnected above; close it as well so it is not left open.
        this.channel1.close();
        this.channel1 = null;
        logger.info("end testBarrierWithTimeOut");
    }

    /** Logs and sends one Long element on channel3; a send failure is logged, not rethrown. */
    private void sendTimedElement(long value) {
        logger.info("-- adding element " + value);
        try {
            this.channel3.send(null, null, new Long(value));
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
    }

    /** Logs the alive/terminated state of each thread and returns how many have terminated. */
    private int countTerminatedRemovers(Thread[] removers, String label) {
        int terminated = 0;
        for (int i = 0; i < removers.length; ++i) {
            logger.info(label + i + " is " + (removers[i].isAlive() ? "alive" : "terminated"));
            if (!removers[i].isAlive()) {
                ++terminated;
            }
        }
        return terminated;
    }

    /** Waits without time limit for each thread; an interrupt is logged and the loop moves on. */
    private void joinRemovers(Thread[] removers, String waitingPrefix) {
        for (int i = 0; i < removers.length; ++i) {
            try {
                logger.debug(waitingPrefix + i + " to join");
                removers[i].join();
            }
            catch (InterruptedException e) {
                logger.error("Thread joining() interrupted", e);
            }
        }
    }

    /**
     * Runs ten single-message readers on channel1 and, later, ten on channel3, while
     * channel2 sends four messages, and checks after each step how many readers of each
     * group have terminated.
     *
     * Expected counts: after "Msg #2" two readers of group one and one of group two are
     * done; after "Msg #4" four of group one; after closing channel1 all of group one and
     * three of group two; after closing channel2 and channel3 all of group two.
     * Exceptions from creating channels or sending are logged and the test carries on.
     */
    public void testBarrier() {
        logger.info("start testBarrier");
        ReadItems[] groupOne = new ReadItems[10];
        ReadItems[] groupTwo = new ReadItems[10];

        try {
            this.channel1 = new JChannel();
            this.channel1.connect(this.channelName);
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        for (int k = 0; k < groupOne.length; ++k) {
            groupOne[k] = new ReadItems(this.channel1, k, 1);
            groupOne[k].start();
        }

        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(1000L);
        logger.info("-- adding Msg #1");
        try {
            this.channel2.send(new Message(null, null, "Msg #1".getBytes()));
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(1000L);

        try {
            this.channel3 = new JChannel();
            this.channel3.connect(this.channelName);
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(5000L);
        // Group two starts only after "Msg #1" has been sent.
        for (int k = 0; k < groupTwo.length; ++k) {
            groupTwo[k] = new ReadItems(this.channel3, k, 1);
            groupTwo[k].start();
        }

        logger.info("-- adding Msg #2");
        try {
            this.channel2.send(new Message(null, null, "Msg #2".getBytes()));
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(2000L);
        int finishedOne = tallyFinishedReaders(groupOne, "removersGroupOne #");
        int finishedTwoAfterMsg2 = tallyFinishedReaders(groupTwo, "removersGroupTwo #");
        assertEquals(2, finishedOne);
        assertEquals(1, finishedTwoAfterMsg2);

        // Both sends share one try block: if the first one fails the second is skipped.
        try {
            logger.info("-- adding Msg #3");
            this.channel2.send(new Message(null, null, "Msg #3".getBytes()));
            logger.info("-- adding Msg #4");
            this.channel2.send(new Message(null, null, "Msg #4".getBytes()));
        }
        catch (Exception ex) {
            logger.error("Problem", ex);
        }
        Util.sleep(2000L);
        finishedOne = tallyFinishedReaders(groupOne, "removersGroupOne #");
        assertEquals(4, finishedOne);

        this.channel1.close();
        joinReadersBriefly(groupOne);
        finishedOne = tallyFinishedReaders(groupOne, "remover #");
        assertEquals(groupOne.length, finishedOne);

        int finishedTwo = tallyFinishedReaders(groupTwo, "remover #");
        assertEquals(finishedTwoAfterMsg2 + 2, finishedTwo);

        this.channel2.close();
        this.channel2 = null;
        this.channel3.close();
        this.channel3 = null;
        joinReadersBriefly(groupTwo);
        finishedTwo = tallyFinishedReaders(groupTwo, "remover Group Two #");
        assertEquals(groupTwo.length, finishedTwo);
        logger.info("stop testBarrier");
    }

    /** Logs each reader's alive/terminated state and returns the number of terminated ones. */
    private int tallyFinishedReaders(Thread[] readers, String label) {
        int finished = 0;
        for (int k = 0; k < readers.length; ++k) {
            logger.info(label + k + " is " + (readers[k].isAlive() ? "alive" : "terminated"));
            if (readers[k].isAlive()) {
                continue;
            }
            ++finished;
        }
        return finished;
    }

    /** Gives each reader up to one second to finish; an interrupt is logged and skipped. */
    private void joinReadersBriefly(Thread[] readers) {
        for (int k = 0; k < readers.length; ++k) {
            try {
                readers[k].join(1000L);
            }
            catch (InterruptedException e) {
                logger.error("Thread joining() interrupted", e);
            }
        }
    }

    /**
     * Lets ten writer threads send on channel3 for about ten seconds while ten reader
     * threads on channel1 and ten on channel2 receive, then asserts that the readers
     * together counted twice as many messages as the writers sent.
     *
     * Each writer and reader stores its count in its slot of a shared array when its loop
     * ends. The readers end once channel1 and channel2 have been closed, and the test polls
     * until none of them is alive. Exceptions while creating channels are logged only.
     */
    public void testMultipleWriterMultipleReader() {
        logger.info("start testMultipleWriterMultipleReader");
        int nWriters = 10;
        int nReaders = 10;
        Writer[] adders = new Writer[nWriters];
        Reader[] readersOne = new Reader[nReaders];
        Reader[] readersTwo = new Reader[nReaders];
        int[] writes = new int[nWriters];
        int[] readsOne = new int[nReaders];
        int[] readsTwo = new int[nReaders];

        try {
            this.channel2 = new JChannel();
            this.channel2.connect(this.channelName);
            this.channel1 = new JChannel();
            this.channel1.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error("Problem", e);
        }
        for (int r = 0; r < readersOne.length; ++r) {
            readersOne[r] = new Reader(this.channel1, r, readsOne);
            readersOne[r].start();
        }
        for (int r = 0; r < readersTwo.length; ++r) {
            readersTwo[r] = new Reader(this.channel2, r, readsTwo);
            readersTwo[r].start();
        }
        Util.sleep(2000L);

        try {
            this.channel3 = new JChannel();
            this.channel3.connect(this.channelName);
        }
        catch (Exception e) {
            logger.error("Problem", e);
        }
        for (int w = 0; w < adders.length; ++w) {
            adders[w] = new Writer(this.channel3, w, writes);
            adders[w].start();
        }
        Util.sleep(10000L);

        for (int w = 0; w < adders.length; ++w) {
            adders[w].stopThread();
        }
        Util.sleep(1000L);
        for (int w = 0; w < adders.length; ++w) {
            try {
                logger.debug("Waiting for Writer thread " + w + " to join");
                adders[w].join(1000L);
                logger.info("adder #" + w + " is " + (adders[w].isAlive() ? "alive" : "terminated"));
                adders[w] = null;
            }
            catch (InterruptedException e) {
                logger.error("Thread joining() interrupted", e);
            }
        }
        Util.sleep(5000L);

        // Closing the channels is what makes the blocked readers leave their loops.
        this.channel2.close();
        this.channel1.close();
        waitUntilReadersStop(readersOne, readsOne, "ReaderGroupOne", "reader One #");
        waitUntilReadersStop(readersTwo, readsTwo, "ReaderGroupTwo", "reader Two #");

        int total_writes = sumOfCounts(writes);
        int total_reads = sumOfCounts(readsOne) + sumOfCounts(readsTwo);
        logger.info("Total writes:" + total_writes);
        logger.info("Total reads:" + total_reads);
        assertEquals(2 * total_writes, total_reads);

        this.channel1.close();
        this.channel2.close();
        this.channel3.close();
        logger.info("end testMultipleWriterMultipleReader");
    }

    /** Repeats two-second pauses plus one-second joins until no reader of the group is alive. */
    private void waitUntilReadersStop(Thread[] readers, int[] reads, String groupTag, String label) {
        boolean allStopped;
        do {
            allStopped = true;
            Util.sleep(2000L);
            for (int r = 0; r < readers.length; ++r) {
                try {
                    logger.debug("Waiting for " + groupTag + " thread " + r + " to join");
                    readers[r].join(1000L);
                    if (readers[r].isAlive()) {
                        allStopped = false;
                        logger.info(label + r + ' ' + reads[r] + " read items");
                    }
                    logger.info(label + r + " is " + (readers[r].isAlive() ? "alive" : "terminated"));
                }
                catch (InterruptedException e) {
                    logger.error("Thread joining() interrupted", e);
                }
            }
        } while (!allStopped);
    }

    /** Adds up all entries of a per-thread counter array. */
    private static int sumOfCounts(int[] counts) {
        int total = 0;
        for (int k = 0; k < counts.length; ++k) {
            total += counts[k];
        }
        return total;
    }

    /**
     * Runs this test class with the JUnit text runner.
     *
     * @param args ignored; the runner is always given this class's name as its only argument
     */
    public static void main(String[] args) {
        // A class literal replaces the decompiler's hand-expanded lazy class$ lookup.
        String[] testCaseName = new String[]{ChannelTrio.class.getName()};
        TestRunner.main(testCaseName);
    }

    /**
     * Looks up a class by name, turning a failed lookup into an unchecked error.
     *
     * @param x0 fully qualified class name
     * @return the class found by {@code Class.forName}
     * @throws NoClassDefFoundError if the class cannot be found; it carries the message of
     *         the underlying {@code ClassNotFoundException} and has that exception as cause
     */
    static /* synthetic */ Class class$(String x0) {
        try {
            return Class.forName(x0);
        }
        catch (ClassNotFoundException x1) {
            // NoClassDefFoundError has no cause-taking constructor, so attach it afterwards.
            NoClassDefFoundError error = new NoClassDefFoundError(x1.getMessage());
            error.initCause(x1);
            throw error;
        }
    }

    class Reader
    extends Thread {
        int rank;
        int num_reads = 0;
        int[] reads = null;
        boolean running = true;
        Channel channel = null;

        Reader(Channel channel, int i, int[] reads) {
            super("Reader thread #" + i);
            this.rank = i;
            this.reads = reads;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            Message msg = null;
            while (this.running) {
                try {
                    Object obj = this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Reader thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    msg = (Message)obj;
                    Long retval = (Long)msg.getObject();
                    logger.debug((Object)("Reader thread #" + this.rank + ": received " + retval));
                    ++this.num_reads;
                    Assert.assertNotNull((Object)retval);
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.running = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.running = false;
                }
                catch (ChannelClosedException e) {
                    this.running = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Reader thread #" + this.rank + ": problem"), (Throwable)e);
                }
            }
            this.reads[this.rank] = this.num_reads;
        }

        void stopThread() {
            this.running = false;
        }
    }

    /**
     * Daemon thread that keeps sending the current time as a {@code Long} on a channel,
     * pausing a random time (up to the 50 passed to {@code Util.sleepRandom}) after each
     * send. When its loop ends it stores the number of sends in its slot of the shared
     * {@code writes} array.
     */
    class Writer
    extends Thread {
        int rank = 0;
        int num_writes = 0;
        // Cleared by stopThread() from the thread that drives the test while run() polls it
        // on this thread, so it has to be volatile for the change to be reliably seen.
        volatile boolean running = true;
        int[] writes = null;
        Channel channel = null;

        /**
         * @param channel channel to send on
         * @param i       rank of this writer; also its index into {@code writes}
         * @param writes  shared array that receives the final send count
         */
        Writer(Channel channel, int i, int[] writes) {
            super("Writer thread #" + i);
            this.rank = i;
            this.writes = writes;
            this.setDaemon(true);
            this.channel = channel;
        }

        /**
         * Sends until the flag is cleared or the channel reports a {@code ChannelException}.
         * Any other throwable is logged at debug level and the loop continues.
         */
        public void run() {
            while (this.running) {
                try {
                    this.channel.send(null, null, new Long(System.currentTimeMillis()));
                    Util.sleepRandom(50L);
                    // Counted only after the pause, so a failed send or pause is not counted.
                    ++this.num_writes;
                }
                catch (ChannelException closed) {
                    this.running = false;
                }
                catch (Throwable t) {
                    logger.debug("ChannelTest.Writer.run(): exception=" + t, t);
                }
            }
            this.writes[this.rank] = this.num_writes;
        }

        /** Asks the send loop to end after its current iteration. */
        void stopThread() {
            this.running = false;
        }
    }

    class RemoveOneItemWithTimeout
    extends Thread {
        Long retval = null;
        int rank = 0;
        long timeout = 0L;
        Channel channel = null;

        RemoveOneItemWithTimeout(Channel channel, int rank, long timeout) {
            super("RemoveOneItemWithTimeout thread #" + rank);
            this.rank = rank;
            this.timeout = timeout;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            boolean finished = false;
            while (!finished) {
                try {
                    Object obj = this.channel.receive(this.timeout);
                    if (obj != null) {
                        if (obj instanceof View) {
                            logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                            continue;
                        }
                        if (!(obj instanceof Message)) continue;
                        Message msg = (Message)obj;
                        this.retval = (Long)msg.getObject();
                        finished = true;
                        logger.debug((Object)("Thread #" + this.rank + " received :" + this.retval));
                        continue;
                    }
                    logger.debug((Object)("Thread #" + this.rank + ": channel read NULL"));
                }
                catch (ChannelNotConnectedException conn) {
                    finished = true;
                }
                catch (TimeoutException e) {
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    finished = true;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + " problem"), (Throwable)e);
                    finished = true;
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class AddOneItem
    extends Thread {
        Long retval = null;
        int rank = 0;
        int iteration = 0;
        Channel channel = null;

        AddOneItem(Channel channel, int rank, int iteration) {
            super("AddOneItem thread #" + rank);
            this.rank = rank;
            this.iteration = iteration;
            this.setDaemon(true);
            this.channel = channel;
        }

        public void run() {
            try {
                for (int i = 0; i < this.iteration; ++i) {
                    this.channel.send(null, null, new Long(this.rank));
                    logger.debug((Object)("Thread #" + this.rank + " added element (" + this.rank + ')'));
                    Util.sleepRandom(100L);
                }
            }
            catch (ChannelException ex) {
                logger.error((Object)("Thread #" + this.rank + ": channel was closed"), (Throwable)ex);
            }
        }
    }

    class RemoveOneItem
    extends Thread {
        private boolean looping = true;
        int rank;
        Long retval = null;

        public RemoveOneItem(int rank) {
            super("RemoveOneItem thread #" + rank);
            this.rank = rank;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = ChannelTrio.this.channel1.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    this.looping = false;
                    this.retval = (Long)msg.getObject();
                    logger.debug((Object)("Thread #" + this.rank + ": received " + this.retval));
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel time out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                }
            }
        }

        Long getRetval() {
            return this.retval;
        }
    }

    class ReadItems
    extends Thread {
        private boolean looping = true;
        int num_items = 0;
        int max = 0;
        int rank;
        Channel channel;

        public ReadItems(Channel channel, int rank, int num) {
            super("ReadItems thread #" + rank);
            this.rank = rank;
            this.max = num;
            this.channel = channel;
            this.setDaemon(true);
        }

        public void stopLooping() {
            this.looping = false;
        }

        public void run() {
            while (this.looping) {
                try {
                    Object obj = this.channel.receive(0L);
                    if (obj instanceof View) {
                        logger.info((Object)("Thread #" + this.rank + ":--> NEW VIEW: " + obj));
                        continue;
                    }
                    if (!(obj instanceof Message)) continue;
                    Message msg = (Message)obj;
                    ++this.num_items;
                    if (this.num_items < this.max) continue;
                    this.looping = false;
                }
                catch (ChannelNotConnectedException conn) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)conn);
                    this.looping = false;
                }
                catch (TimeoutException e) {
                    logger.error((Object)("Thread #" + this.rank + ": channel timed out but should'nt have..."), (Throwable)e);
                    this.looping = false;
                }
                catch (ChannelClosedException e) {
                    logger.debug((Object)("Thread #" + this.rank + ": channel closed"), (Throwable)e);
                    this.looping = false;
                }
                catch (Exception e) {
                    logger.error((Object)("Thread #" + this.rank + ": problem"), (Throwable)e);
                    this.looping = false;
                }
            }
        }

        public int getNum_items() {
            return this.num_items;
        }
    }
}

