/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flume.api;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.api.AbstractRpcClient;
import org.apache.flume.api.HostInfo;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.util.OrderSelector;
import org.apache.flume.util.RandomOrderSelector;
import org.apache.flume.util.RoundRobinOrderSelector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoadBalancingRpcClient
extends AbstractRpcClient {
    private static final Logger LOGGER = LoggerFactory.getLogger(LoadBalancingRpcClient.class);
    private List<HostInfo> hosts;
    private HostSelector selector;
    private Map<String, RpcClient> clientMap;
    private Properties configurationProperties;
    private volatile boolean isOpen = false;

    @Override
    public void append(Event event) throws EventDeliveryException {
        this.throwIfClosed();
        boolean eventSent = false;
        Iterator<HostInfo> it = this.selector.createHostIterator();
        while (it.hasNext()) {
            HostInfo host = it.next();
            try {
                RpcClient client = this.getClient(host);
                client.append(event);
                eventSent = true;
                break;
            }
            catch (Exception ex) {
                this.selector.informFailure(host);
                LOGGER.warn("Failed to send event to host " + host, (Throwable)ex);
            }
        }
        if (!eventSent) {
            throw new EventDeliveryException("Unable to send event to any host");
        }
    }

    @Override
    public void appendBatch(List<Event> events) throws EventDeliveryException {
        this.throwIfClosed();
        boolean batchSent = false;
        Iterator<HostInfo> it = this.selector.createHostIterator();
        while (it.hasNext()) {
            HostInfo host = it.next();
            try {
                RpcClient client = this.getClient(host);
                client.appendBatch(events);
                batchSent = true;
                break;
            }
            catch (Exception ex) {
                this.selector.informFailure(host);
                LOGGER.warn("Failed to send batch to host " + host, (Throwable)ex);
            }
        }
        if (!batchSent) {
            throw new EventDeliveryException("Unable to send batch to any host");
        }
    }

    @Override
    public boolean isActive() {
        return this.isOpen;
    }

    private void throwIfClosed() throws EventDeliveryException {
        if (!this.isOpen) {
            throw new EventDeliveryException("Rpc Client is closed");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws FlumeException {
        this.isOpen = false;
        LoadBalancingRpcClient loadBalancingRpcClient = this;
        synchronized (loadBalancingRpcClient) {
            Iterator<String> it = this.clientMap.keySet().iterator();
            while (it.hasNext()) {
                String name = it.next();
                RpcClient client = this.clientMap.get(name);
                if (client != null) {
                    try {
                        client.close();
                    }
                    catch (Exception ex) {
                        LOGGER.warn("Failed to close client: " + name, (Throwable)ex);
                    }
                }
                it.remove();
            }
        }
    }

    @Override
    protected void configure(Properties properties) throws FlumeException {
        this.clientMap = new HashMap<String, RpcClient>();
        this.configurationProperties = new Properties();
        this.configurationProperties.putAll((Map<?, ?>)properties);
        this.hosts = HostInfo.getHostInfoList(properties);
        if (this.hosts.size() < 2) {
            throw new FlumeException("At least two hosts are required to use the load balancing RPC client.");
        }
        String lbTypeName = properties.getProperty("host-selector", "ROUND_ROBIN");
        boolean backoff = Boolean.valueOf(properties.getProperty("backoff", String.valueOf(false)));
        String maxBackoffStr = properties.getProperty("maxBackoff");
        long maxBackoff = 0L;
        if (maxBackoffStr != null) {
            maxBackoff = Long.parseLong(maxBackoffStr);
        }
        if (lbTypeName.equalsIgnoreCase("ROUND_ROBIN")) {
            this.selector = new RoundRobinHostSelector(backoff, maxBackoff);
        } else if (lbTypeName.equalsIgnoreCase("RANDOM")) {
            this.selector = new RandomOrderHostSelector(backoff, maxBackoff);
        } else {
            try {
                Class<?> klass = Class.forName(lbTypeName);
                this.selector = (HostSelector)klass.newInstance();
            }
            catch (Exception ex) {
                throw new FlumeException("Unable to instantiate host selector: " + lbTypeName, ex);
            }
        }
        this.selector.setHosts(this.hosts);
        this.isOpen = true;
    }

    private synchronized RpcClient getClient(HostInfo info) throws FlumeException, EventDeliveryException {
        this.throwIfClosed();
        String name = info.getReferenceName();
        RpcClient client = this.clientMap.get(name);
        if (client == null) {
            client = this.createClient(name);
            this.clientMap.put(name, client);
        } else if (!client.isActive()) {
            try {
                client.close();
            }
            catch (Exception ex) {
                LOGGER.warn("Failed to close client for " + info, (Throwable)ex);
            }
            client = this.createClient(name);
            this.clientMap.put(name, client);
        }
        return client;
    }

    private RpcClient createClient(String referenceName) throws FlumeException {
        Properties props = this.getClientConfigurationProperties(referenceName);
        return RpcClientFactory.getInstance(props);
    }

    private Properties getClientConfigurationProperties(String referenceName) {
        Properties props = new Properties();
        props.putAll((Map<?, ?>)this.configurationProperties);
        props.put("client.type", (Object)RpcClientFactory.ClientType.DEFAULT);
        props.put("hosts", referenceName);
        return props;
    }

    private static class RandomOrderHostSelector
    implements HostSelector {
        private OrderSelector<HostInfo> selector;

        RandomOrderHostSelector(boolean backoff, Long maxBackoff) {
            this.selector = new RandomOrderSelector<HostInfo>(backoff);
            if (maxBackoff != 0L) {
                this.selector.setMaxTimeOut(maxBackoff);
            }
        }

        @Override
        public synchronized Iterator<HostInfo> createHostIterator() {
            return this.selector.createIterator();
        }

        @Override
        public synchronized void setHosts(List<HostInfo> hosts) {
            this.selector.setObjects(hosts);
        }

        @Override
        public void informFailure(HostInfo failedHost) {
            this.selector.informFailure(failedHost);
        }
    }

    private static class RoundRobinHostSelector
    implements HostSelector {
        private OrderSelector<HostInfo> selector;

        RoundRobinHostSelector(boolean backoff, long maxBackoff) {
            this.selector = new RoundRobinOrderSelector<HostInfo>(backoff);
            if (maxBackoff != 0L) {
                this.selector.setMaxTimeOut(maxBackoff);
            }
        }

        @Override
        public synchronized Iterator<HostInfo> createHostIterator() {
            return this.selector.createIterator();
        }

        @Override
        public synchronized void setHosts(List<HostInfo> hosts) {
            this.selector.setObjects(hosts);
        }

        @Override
        public synchronized void informFailure(HostInfo failedHost) {
            this.selector.informFailure(failedHost);
        }
    }

    public static interface HostSelector {
        public void setHosts(List<HostInfo> var1);

        public Iterator<HostInfo> createHostIterator();

        public void informFailure(HostInfo var1);
    }
}

