/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BinaryProtoLookupService
implements LookupService {
    private final PulsarClientImpl client;
    protected volatile InetSocketAddress serviceAddress;
    private final boolean useTls;
    private final ExecutorService executor;
    private static final Logger log = LoggerFactory.getLogger(BinaryProtoLookupService.class);

    public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor) throws PulsarClientException {
        this.client = client;
        this.useTls = useTls;
        this.executor = executor;
        this.updateServiceUrl(serviceUrl);
    }

    @Override
    public void updateServiceUrl(String serviceUrl) throws PulsarClientException {
        try {
            URI uri = new URI(serviceUrl);
            this.serviceAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
        }
        catch (Exception e) {
            log.error("Invalid service-url {} provided {}", new Object[]{serviceUrl, e.getMessage(), e});
            throw new PulsarClientException.InvalidServiceURL(e);
        }
    }

    @Override
    public CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> getBroker(TopicName topicName) {
        return this.findBroker(this.serviceAddress, false, topicName);
    }

    @Override
    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) {
        return this.getPartitionedTopicMetadata(this.serviceAddress, topicName);
    }

    private CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> findBroker(InetSocketAddress socketAddress, boolean authoritative, TopicName topicName) {
        CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>> addressFuture = new CompletableFuture<Pair<InetSocketAddress, InetSocketAddress>>();
        ((CompletableFuture)this.client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newLookup(topicName.toString(), authoritative, requestId);
            ((CompletableFuture)clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
                URI uri = null;
                try {
                    if (this.useTls) {
                        uri = new URI(lookupDataResult.brokerUrlTls);
                    } else {
                        String serviceUrl = lookupDataResult.brokerUrl;
                        uri = new URI(serviceUrl);
                    }
                    InetSocketAddress responseBrokerAddress = InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
                    if (lookupDataResult.redirect) {
                        ((CompletableFuture)this.findBroker(responseBrokerAddress, lookupDataResult.authoritative, topicName).thenAccept(addressPair -> addressFuture.complete((Pair<InetSocketAddress, InetSocketAddress>)addressPair))).exceptionally(lookupException -> {
                            log.warn("[{}] lookup failed : {}", new Object[]{topicName.toString(), lookupException.getMessage(), lookupException});
                            addressFuture.completeExceptionally((Throwable)lookupException);
                            return null;
                        });
                    } else if (lookupDataResult.proxyThroughServiceUrl) {
                        addressFuture.complete(Pair.of(responseBrokerAddress, this.serviceAddress));
                    } else {
                        addressFuture.complete(Pair.of(responseBrokerAddress, responseBrokerAddress));
                    }
                }
                catch (Exception parseUrlException) {
                    log.warn("[{}] invalid url {} : {}", new Object[]{topicName.toString(), uri, parseUrlException.getMessage(), parseUrlException});
                    addressFuture.completeExceptionally(parseUrlException);
                }
            })).exceptionally(sendException -> {
                log.warn("[{}] failed to send lookup request : {}", new Object[]{topicName.toString(), sendException.getMessage(), sendException instanceof ClosedChannelException ? null : sendException});
                addressFuture.completeExceptionally((Throwable)sendException);
                return null;
            });
        })).exceptionally(connectionException -> {
            addressFuture.completeExceptionally((Throwable)connectionException);
            return null;
        });
        return addressFuture;
    }

    private CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(InetSocketAddress socketAddress, TopicName topicName) {
        CompletableFuture<PartitionedTopicMetadata> partitionFuture = new CompletableFuture<PartitionedTopicMetadata>();
        ((CompletableFuture)this.client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newPartitionMetadataRequest(topicName.toString(), requestId);
            ((CompletableFuture)clientCnx.newLookup(request, requestId).thenAccept(lookupDataResult -> {
                try {
                    partitionFuture.complete(new PartitionedTopicMetadata(lookupDataResult.partitions));
                }
                catch (Exception e) {
                    partitionFuture.completeExceptionally(new PulsarClientException.LookupException(String.format("Failed to parse partition-response redirect=%s , partitions with %s", lookupDataResult.redirect, lookupDataResult.partitions, e.getMessage())));
                }
            })).exceptionally(e -> {
                log.warn("[{}] failed to get Partitioned metadata : {}", new Object[]{topicName.toString(), e.getCause().getMessage(), e});
                partitionFuture.completeExceptionally((Throwable)e);
                return null;
            });
        })).exceptionally(connectionException -> {
            partitionFuture.completeExceptionally((Throwable)connectionException);
            return null;
        });
        return partitionFuture;
    }

    @Override
    public CompletableFuture<Optional<SchemaInfo>> getSchema(TopicName topicName) {
        return this.client.getCnxPool().getConnection(this.serviceAddress).thenCompose(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newGetSchema(requestId, topicName.toString(), Optional.empty());
            return clientCnx.sendGetSchema(request, requestId);
        });
    }

    @Override
    public String getServiceUrl() {
        return this.serviceAddress.toString();
    }

    @Override
    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace, PulsarApi.CommandGetTopicsOfNamespace.Mode mode) {
        CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();
        AtomicLong opTimeoutMs = new AtomicLong(this.client.getConfiguration().getOperationTimeoutMs());
        Backoff backoff = new Backoff(100L, TimeUnit.MILLISECONDS, opTimeoutMs.get() * 2L, TimeUnit.MILLISECONDS, 0L, TimeUnit.MILLISECONDS);
        this.getTopicsUnderNamespace(this.serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture, mode);
        return topicsFuture;
    }

    private void getTopicsUnderNamespace(InetSocketAddress socketAddress, NamespaceName namespace, Backoff backoff, AtomicLong remainingTime, CompletableFuture<List<String>> topicsFuture, PulsarApi.CommandGetTopicsOfNamespace.Mode mode) {
        ((CompletableFuture)this.client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
            long requestId = this.client.newRequestId();
            ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(namespace.toString(), requestId, mode);
            ((CompletableFuture)clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> {
                if (log.isDebugEnabled()) {
                    log.debug("[namespace: {}] Success get topics list in request: {}", (Object)namespace.toString(), (Object)requestId);
                }
                ArrayList result = Lists.newArrayList();
                topicsList.forEach(topic -> {
                    String filtered = TopicName.get(topic).getPartitionedTopicName();
                    if (!result.contains(filtered)) {
                        result.add(filtered);
                    }
                });
                topicsFuture.complete(result);
            })).exceptionally(e -> {
                topicsFuture.completeExceptionally((Throwable)e);
                return null;
            });
        })).exceptionally(e -> {
            long nextDelay = Math.min(backoff.next(), remainingTime.get());
            if (nextDelay <= 0L) {
                topicsFuture.completeExceptionally(new PulsarClientException.TimeoutException("Could not getTopicsUnderNamespace within configured timeout."));
                return null;
            }
            ((ScheduledExecutorService)this.executor).schedule(() -> {
                log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms", (Object)namespace, (Object)nextDelay);
                remainingTime.addAndGet(-nextDelay);
                this.getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture, mode);
            }, nextDelay, TimeUnit.MILLISECONDS);
            return null;
        });
    }

    @Override
    public void close() throws Exception {
    }

    public static class LookupDataResult {
        public final String brokerUrl;
        public final String brokerUrlTls;
        public final int partitions;
        public final boolean authoritative;
        public final boolean proxyThroughServiceUrl;
        public final boolean redirect;

        public LookupDataResult(PulsarApi.CommandLookupTopicResponse result) {
            this.brokerUrl = result.getBrokerServiceUrl();
            this.brokerUrlTls = result.getBrokerServiceUrlTls();
            this.authoritative = result.getAuthoritative();
            this.redirect = result.getResponse() == PulsarApi.CommandLookupTopicResponse.LookupType.Redirect;
            this.proxyThroughServiceUrl = result.getProxyThroughServiceUrl();
            this.partitions = -1;
        }

        public LookupDataResult(int partitions) {
            this.partitions = partitions;
            this.brokerUrl = null;
            this.brokerUrlTls = null;
            this.authoritative = false;
            this.proxyThroughServiceUrl = false;
            this.redirect = false;
        }
    }
}

