/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BinaryProtoLookupService;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.ProducerImpl;
import org.apache.pulsar.client.impl.ProducerResponse;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Queues;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.unix.Errors;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.io.netty.util.concurrent.Promise;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.shade.org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ClientCnx
extends PulsarHandler {
    protected final Authentication authentication;
    private State state;
    private final ConcurrentLongHashMap<CompletableFuture<ProducerResponse>> pendingRequests = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<CompletableFuture<BinaryProtoLookupService.LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap(16, 1);
    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<BinaryProtoLookupService.LookupDataResult>>>> waitingLookupRequests;
    private final ConcurrentLongHashMap<CompletableFuture<PulsarApi.MessageIdData>> pendingGetLastMessageIdRequests = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<CompletableFuture<Optional<SchemaInfo>>> pendingGetSchemaRequests = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap(16, 1);
    private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap(16, 1);
    private final CompletableFuture<Void> connectionFuture = new CompletableFuture();
    private final ConcurrentLinkedQueue<RequestTime> requestTimeoutQueue = new ConcurrentLinkedQueue();
    private final Semaphore pendingLookupRequestSemaphore;
    private final Semaphore maxLookupRequestSemaphore;
    private final EventLoopGroup eventLoopGroup;
    private static final AtomicIntegerFieldUpdater<ClientCnx> NUMBER_OF_REJECTED_REQUESTS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ClientCnx.class, "numberOfRejectRequests");
    private volatile int numberOfRejectRequests = 0;
    private static int maxMessageSize = 0x500000;
    private final int maxNumberOfRejectedRequestPerConnection;
    private final int rejectedRequestResetTimeSec = 60;
    private final int protocolVersion;
    private final long operationTimeoutMs;
    protected String proxyToTargetBrokerAddress = null;
    protected String remoteHostName = null;
    private boolean isTlsHostnameVerificationEnable;
    private static final DefaultHostnameVerifier HOSTNAME_VERIFIER = new DefaultHostnameVerifier();
    private ScheduledFuture<?> timeoutTask;
    protected AuthenticationDataProvider authenticationDataProvider;
    private static final Logger log = LoggerFactory.getLogger(ClientCnx.class);

    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup) {
        this(conf, eventLoopGroup, Commands.getCurrentProtocolVersion());
    }

    public ClientCnx(ClientConfigurationData conf, EventLoopGroup eventLoopGroup, int protocolVersion) {
        super(conf.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
        Preconditions.checkArgument(conf.getMaxLookupRequest() > conf.getConcurrentLookupRequest());
        this.pendingLookupRequestSemaphore = new Semaphore(conf.getConcurrentLookupRequest(), false);
        this.maxLookupRequestSemaphore = new Semaphore(conf.getMaxLookupRequest() - conf.getConcurrentLookupRequest(), false);
        this.waitingLookupRequests = Queues.newConcurrentLinkedQueue();
        this.authentication = conf.getAuthentication();
        this.eventLoopGroup = eventLoopGroup;
        this.maxNumberOfRejectedRequestPerConnection = conf.getMaxNumberOfRejectedRequestPerConnection();
        this.operationTimeoutMs = conf.getOperationTimeoutMs();
        this.state = State.None;
        this.isTlsHostnameVerificationEnable = conf.isTlsHostnameVerificationEnable();
        this.protocolVersion = protocolVersion;
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        this.timeoutTask = this.eventLoopGroup.scheduleAtFixedRate(() -> this.checkRequestTimeout(), this.operationTimeoutMs, this.operationTimeoutMs, TimeUnit.MILLISECONDS);
        if (this.proxyToTargetBrokerAddress == null) {
            if (log.isDebugEnabled()) {
                log.debug("{} Connected to broker", (Object)ctx.channel());
            }
        } else {
            log.info("{} Connected through proxy to target broker at {}", (Object)ctx.channel(), (Object)this.proxyToTargetBrokerAddress);
        }
        ctx.writeAndFlush(this.newConnectCommand()).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)future -> {
            if (future.isSuccess()) {
                if (log.isDebugEnabled()) {
                    log.debug("Complete: {}", (Object)future.isSuccess());
                }
                this.state = State.SentConnectFrame;
            } else {
                log.warn("Error during handshake", future.cause());
                ctx.close();
            }
        }));
    }

    protected ByteBuf newConnectCommand() throws Exception {
        this.authenticationDataProvider = this.authentication.getAuthData(this.remoteHostName);
        AuthData authData = this.authenticationDataProvider.authenticate(AuthData.of((byte[])AuthData.INIT_AUTH_DATA));
        return Commands.newConnect(this.authentication.getAuthMethodName(), authData, this.protocolVersion, PulsarVersion.getVersion(), this.proxyToTargetBrokerAddress, null, null, null);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        log.info("{} Disconnected", (Object)ctx.channel());
        if (!this.connectionFuture.isDone()) {
            this.connectionFuture.completeExceptionally(new PulsarClientException("Connection already closed"));
        }
        PulsarClientException e = new PulsarClientException("Disconnected from server at " + ctx.channel().remoteAddress());
        this.pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.waitingLookupRequests.forEach(pair -> ((CompletableFuture)((Pair)pair.getRight()).getRight()).completeExceptionally(e));
        this.pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.pendingGetSchemaRequests.forEach((key, future) -> future.completeExceptionally(e));
        this.producers.forEach((id, producer) -> producer.connectionClosed(this));
        this.consumers.forEach((id, consumer) -> consumer.connectionClosed(this));
        this.pendingRequests.clear();
        this.pendingLookupRequests.clear();
        this.waitingLookupRequests.clear();
        this.pendingGetLastMessageIdRequests.clear();
        this.pendingGetTopicsRequests.clear();
        this.producers.clear();
        this.consumers.clear();
        this.timeoutTask.cancel(true);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.state != State.Failed) {
            log.warn("[{}] Got exception {} : {}", new Object[]{this.remoteAddress, cause.getClass().getSimpleName(), cause.getMessage(), ClientCnx.isKnownException(cause) ? null : cause});
            this.state = State.Failed;
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got exception: {}", new Object[]{this.remoteAddress, cause.getMessage(), cause});
        }
        ctx.close();
    }

    public static boolean isKnownException(Throwable t) {
        return t instanceof Errors.NativeIoException || t instanceof ClosedChannelException;
    }

    @Override
    protected void handleConnected(PulsarApi.CommandConnected connected) {
        if (this.isTlsHostnameVerificationEnable && this.remoteHostName != null && !this.verifyTlsHostName(this.remoteHostName, this.ctx)) {
            log.warn("[{}] Failed to verify hostname of {}", (Object)this.ctx.channel(), (Object)this.remoteHostName);
            this.ctx.close();
            return;
        }
        Preconditions.checkArgument(this.state == State.SentConnectFrame || this.state == State.Connecting);
        if (connected.hasMaxMessageSize()) {
            if (log.isDebugEnabled()) {
                log.debug("{} Connection has max message size setting, replace old frameDecoder with server frame size {}", (Object)this.ctx.channel(), (Object)connected.getMaxMessageSize());
            }
            maxMessageSize = connected.getMaxMessageSize();
            this.ctx.pipeline().replace("frameDecoder", "newFrameDecoder", (ChannelHandler)new LengthFieldBasedFrameDecoder(connected.getMaxMessageSize() + 10240, 0, 4, 0, 4));
        }
        if (log.isDebugEnabled()) {
            log.debug("{} Connection is ready", (Object)this.ctx.channel());
        }
        this.remoteEndpointProtocolVersion = connected.getProtocolVersion();
        this.connectionFuture.complete(null);
        this.state = State.Ready;
    }

    @Override
    protected void handleAuthChallenge(PulsarApi.CommandAuthChallenge authChallenge) {
        Preconditions.checkArgument(authChallenge.hasChallenge());
        Preconditions.checkArgument(authChallenge.getChallenge().hasAuthData() && authChallenge.getChallenge().hasAuthData());
        try {
            AuthData authData = this.authenticationDataProvider.authenticate(AuthData.of((byte[])authChallenge.getChallenge().getAuthData().toByteArray()));
            Preconditions.checkState(!authData.isComplete());
            ByteBuf request = Commands.newAuthResponse(this.authentication.getAuthMethodName(), authData, this.protocolVersion, PulsarVersion.getVersion());
            if (log.isDebugEnabled()) {
                log.debug("{} Mutual auth {}", (Object)this.ctx.channel(), (Object)this.authentication.getAuthMethodName());
            }
            this.ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    log.warn("{} Failed to send request for mutual auth to broker: {}", (Object)this.ctx.channel(), (Object)writeFuture.cause().getMessage());
                    this.connectionFuture.completeExceptionally(writeFuture.cause());
                }
            }));
            this.state = State.Connecting;
        }
        catch (Exception e) {
            log.error("{} Error mutual verify: {}", (Object)this.ctx.channel(), (Object)e);
            this.connectionFuture.completeExceptionally(e);
            return;
        }
    }

    @Override
    protected void handleSendReceipt(PulsarApi.CommandSendReceipt sendReceipt) {
        Preconditions.checkArgument(this.state == State.Ready);
        long producerId = sendReceipt.getProducerId();
        long sequenceId = sendReceipt.getSequenceId();
        long ledgerId = -1L;
        long entryId = -1L;
        if (sendReceipt.hasMessageId()) {
            ledgerId = sendReceipt.getMessageId().getLedgerId();
            entryId = sendReceipt.getMessageId().getEntryId();
        }
        if (ledgerId == -1L && entryId == -1L) {
            log.warn("[{}] Message has been dropped for non-persistent topic producer-id {}-{}", new Object[]{this.ctx.channel(), producerId, sequenceId});
        }
        if (log.isDebugEnabled()) {
            log.debug("{} Got receipt for producer: {} -- msg: {} -- id: {}:{}", new Object[]{this.ctx.channel(), producerId, sequenceId, ledgerId, entryId});
        }
        this.producers.get(producerId).ackReceived(this, sequenceId, ledgerId, entryId);
    }

    @Override
    protected void handleMessage(PulsarApi.CommandMessage cmdMessage, ByteBuf headersAndPayload) {
        ConsumerImpl<?> consumer;
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received a message from the server: {}", (Object)this.ctx.channel(), (Object)cmdMessage);
        }
        if ((consumer = this.consumers.get(cmdMessage.getConsumerId())) != null) {
            consumer.messageReceived(cmdMessage.getMessageId(), cmdMessage.getRedeliveryCount(), headersAndPayload, this);
        }
    }

    @Override
    protected void handleActiveConsumerChange(PulsarApi.CommandActiveConsumerChange change) {
        ConsumerImpl<?> consumer;
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received a consumer group change message from the server : {}", (Object)this.ctx.channel(), (Object)change);
        }
        if ((consumer = this.consumers.get(change.getConsumerId())) != null) {
            consumer.activeConsumerChanged(change.getIsActive());
        }
    }

    @Override
    protected void handleSuccess(PulsarApi.CommandSuccess success) {
        long requestId;
        CompletableFuture<ProducerResponse> requestFuture;
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received success response from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
        if ((requestFuture = this.pendingRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(null);
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    @Override
    protected void handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdResponse success) {
        long requestId;
        CompletableFuture<PulsarApi.MessageIdData> requestFuture;
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received success GetLastMessageId response from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
        if ((requestFuture = this.pendingGetLastMessageIdRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(success.getLastMessageId());
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    @Override
    protected void handleProducerSuccess(PulsarApi.CommandProducerSuccess success) {
        long requestId;
        CompletableFuture<ProducerResponse> requestFuture;
        Preconditions.checkArgument(this.state == State.Ready);
        if (log.isDebugEnabled()) {
            log.debug("{} Received producer success response from server: {} - producer-name: {}", new Object[]{this.ctx.channel(), success.getRequestId(), success.getProducerName()});
        }
        if ((requestFuture = this.pendingRequests.remove(requestId = success.getRequestId())) != null) {
            requestFuture.complete(new ProducerResponse(success.getProducerName(), success.getLastSequenceId(), success.getSchemaVersion().toByteArray()));
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    @Override
    protected void handleLookupResponse(PulsarApi.CommandLookupTopicResponse lookupResult) {
        long requestId;
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture;
        if (log.isDebugEnabled()) {
            log.debug("Received Broker lookup response: {}", (Object)lookupResult.getResponse());
        }
        if ((requestFuture = this.getAndRemovePendingLookupRequest(requestId = lookupResult.getRequestId())) != null) {
            if (requestFuture.isCompletedExceptionally()) {
                if (log.isDebugEnabled()) {
                    log.debug("{} Request {} already timed-out", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
                }
                return;
            }
            if (!lookupResult.hasResponse() || PulsarApi.CommandLookupTopicResponse.LookupType.Failed.equals((Object)lookupResult.getResponse())) {
                if (lookupResult.hasError()) {
                    this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
                    requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
                } else {
                    requestFuture.completeExceptionally((Throwable)new PulsarClientException.LookupException("Empty lookup response"));
                }
            } else {
                requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult));
            }
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
        }
    }

    @Override
    protected void handlePartitionResponse(PulsarApi.CommandPartitionedTopicMetadataResponse lookupResult) {
        long requestId;
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> requestFuture;
        if (log.isDebugEnabled()) {
            log.debug("Received Broker Partition response: {}", (Object)lookupResult.getPartitions());
        }
        if ((requestFuture = this.getAndRemovePendingLookupRequest(requestId = lookupResult.getRequestId())) != null) {
            if (requestFuture.isCompletedExceptionally()) {
                if (log.isDebugEnabled()) {
                    log.debug("{} Request {} already timed-out", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
                }
                return;
            }
            if (!lookupResult.hasResponse() || PulsarApi.CommandPartitionedTopicMetadataResponse.LookupType.Failed.equals((Object)lookupResult.getResponse())) {
                if (lookupResult.hasError()) {
                    this.checkServerError(lookupResult.getError(), lookupResult.getMessage());
                    requestFuture.completeExceptionally(this.getPulsarClientException(lookupResult.getError(), lookupResult.getMessage()));
                } else {
                    requestFuture.completeExceptionally((Throwable)new PulsarClientException.LookupException("Empty lookup response"));
                }
            } else {
                requestFuture.complete(new BinaryProtoLookupService.LookupDataResult(lookupResult.getPartitions()));
            }
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)lookupResult.getRequestId());
        }
    }

    @Override
    protected void handleReachedEndOfTopic(PulsarApi.CommandReachedEndOfTopic commandReachedEndOfTopic) {
        long consumerId = commandReachedEndOfTopic.getConsumerId();
        log.info("[{}] Broker notification reached the end of topic: {}", (Object)this.remoteAddress, (Object)consumerId);
        ConsumerImpl<?> consumer = this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.setTerminated();
        }
    }

    private void addPendingLookupRequests(long requestId, CompletableFuture<BinaryProtoLookupService.LookupDataResult> future) {
        this.pendingLookupRequests.put(requestId, future);
        this.eventLoopGroup.schedule(() -> {
            if (!future.isDone()) {
                future.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(requestId + " lookup request timedout after ms " + this.operationTimeoutMs));
            }
        }, this.operationTimeoutMs, TimeUnit.MILLISECONDS);
    }

    private CompletableFuture<BinaryProtoLookupService.LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> result = this.pendingLookupRequests.remove(requestId);
        if (result != null) {
            Pair<Long, Pair<ByteBuf, CompletableFuture<BinaryProtoLookupService.LookupDataResult>>> firstOneWaiting = this.waitingLookupRequests.poll();
            if (firstOneWaiting != null) {
                this.maxLookupRequestSemaphore.release();
                this.eventLoopGroup.submit(() -> {
                    long newId = (Long)firstOneWaiting.getLeft();
                    CompletableFuture newFuture = (CompletableFuture)((Pair)firstOneWaiting.getRight()).getRight();
                    this.addPendingLookupRequests(newId, newFuture);
                    this.ctx.writeAndFlush(((Pair)firstOneWaiting.getRight()).getLeft()).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
                        if (!writeFuture.isSuccess()) {
                            log.warn("{} Failed to send request {} to broker: {}", new Object[]{this.ctx.channel(), newId, writeFuture.cause().getMessage()});
                            this.getAndRemovePendingLookupRequest(newId);
                            newFuture.completeExceptionally(writeFuture.cause());
                        }
                    }));
                });
            } else {
                this.pendingLookupRequestSemaphore.release();
            }
        }
        return result;
    }

    @Override
    protected void handleSendError(PulsarApi.CommandSendError sendError) {
        log.warn("{} Received send error from server: {} : {}", new Object[]{this.ctx.channel(), sendError.getError(), sendError.getMessage()});
        long producerId = sendError.getProducerId();
        long sequenceId = sendError.getSequenceId();
        switch (sendError.getError()) {
            case ChecksumError: {
                this.producers.get(producerId).recoverChecksumError(this, sequenceId);
                break;
            }
            case TopicTerminatedError: {
                this.producers.get(producerId).terminated(this);
                break;
            }
            default: {
                this.ctx.close();
            }
        }
    }

    @Override
    protected void handleError(PulsarApi.CommandError error) {
        CompletableFuture<ProducerResponse> requestFuture;
        Preconditions.checkArgument(this.state == State.Ready);
        log.warn("{} Received error from server: {}", (Object)this.ctx.channel(), (Object)error.getMessage());
        long requestId = error.getRequestId();
        if (error.getError() == PulsarApi.ServerError.ProducerBlockedQuotaExceededError) {
            log.warn("{} Producer creation has been blocked because backlog quota exceeded for producer topic", (Object)this.ctx.channel());
        }
        if ((requestFuture = this.pendingRequests.remove(requestId)) != null) {
            requestFuture.completeExceptionally(this.getPulsarClientException(error.getError(), error.getMessage()));
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)error.getRequestId());
        }
    }

    @Override
    protected void handleCloseProducer(PulsarApi.CommandCloseProducer closeProducer) {
        log.info("[{}] Broker notification of Closed producer: {}", (Object)this.remoteAddress, (Object)closeProducer.getProducerId());
        long producerId = closeProducer.getProducerId();
        ProducerImpl<?> producer = this.producers.get(producerId);
        if (producer != null) {
            producer.connectionClosed(this);
        } else {
            log.warn("Producer with id {} not found while closing producer ", (Object)producerId);
        }
    }

    @Override
    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer closeConsumer) {
        log.info("[{}] Broker notification of Closed consumer: {}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long consumerId = closeConsumer.getConsumerId();
        ConsumerImpl<?> consumer = this.consumers.get(consumerId);
        if (consumer != null) {
            consumer.connectionClosed(this);
        } else {
            log.warn("Consumer with id {} not found while closing consumer ", (Object)consumerId);
        }
    }

    @Override
    protected boolean isHandshakeCompleted() {
        return this.state == State.Ready;
    }

    public CompletableFuture<BinaryProtoLookupService.LookupDataResult> newLookup(ByteBuf request, long requestId) {
        CompletableFuture<BinaryProtoLookupService.LookupDataResult> future = new CompletableFuture<BinaryProtoLookupService.LookupDataResult>();
        if (this.pendingLookupRequestSemaphore.tryAcquire()) {
            this.addPendingLookupRequests(requestId, future);
            this.ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    log.warn("{} Failed to send request {} to broker: {}", new Object[]{this.ctx.channel(), requestId, writeFuture.cause().getMessage()});
                    this.getAndRemovePendingLookupRequest(requestId);
                    future.completeExceptionally(writeFuture.cause());
                }
            }));
        } else {
            if (log.isDebugEnabled()) {
                log.debug("{} Failed to add lookup-request into pending queue", (Object)requestId);
            }
            if (this.maxLookupRequestSemaphore.tryAcquire()) {
                this.waitingLookupRequests.add(Pair.of(requestId, Pair.of(request, future)));
            } else {
                if (log.isDebugEnabled()) {
                    log.debug("{} Failed to add lookup-request into waiting queue", (Object)requestId);
                }
                future.completeExceptionally((Throwable)new PulsarClientException.TooManyRequestsException(String.format("Requests number out of config: There are {%s} lookup requests outstanding and {%s} requests pending.", this.pendingLookupRequests.size(), this.waitingLookupRequests.size())));
            }
        }
        return future;
    }

    public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
        CompletableFuture<List<String>> future = new CompletableFuture<List<String>>();
        this.pendingGetTopicsRequests.put(requestId, future);
        this.ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send request {} to broker: {}", new Object[]{this.ctx.channel(), requestId, writeFuture.cause().getMessage()});
                this.pendingGetTopicsRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        }));
        return future;
    }

    @Override
    protected void handleGetTopicsOfNamespaceSuccess(PulsarApi.CommandGetTopicsOfNamespaceResponse success) {
        CompletableFuture<List<String>> requestFuture;
        Preconditions.checkArgument(this.state == State.Ready);
        long requestId = success.getRequestId();
        List<String> topics = success.getTopicsList();
        if (log.isDebugEnabled()) {
            log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}", new Object[]{this.ctx.channel(), success.getRequestId(), topics.size()});
        }
        if ((requestFuture = this.pendingGetTopicsRequests.remove(requestId)) != null) {
            requestFuture.complete(topics);
        } else {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)success.getRequestId());
        }
    }

    @Override
    protected void handleGetSchemaResponse(PulsarApi.CommandGetSchemaResponse commandGetSchemaResponse) {
        Preconditions.checkArgument(this.state == State.Ready);
        long requestId = commandGetSchemaResponse.getRequestId();
        CompletableFuture<Optional<SchemaInfo>> future = this.pendingGetSchemaRequests.remove(requestId);
        if (future == null) {
            log.warn("{} Received unknown request id from server: {}", (Object)this.ctx.channel(), (Object)requestId);
            return;
        }
        if (commandGetSchemaResponse.hasErrorCode()) {
            PulsarApi.ServerError rc = commandGetSchemaResponse.getErrorCode();
            if (rc == PulsarApi.ServerError.TopicNotFound) {
                future.complete(Optional.empty());
            } else {
                future.completeExceptionally(this.getPulsarClientException(rc, commandGetSchemaResponse.getErrorMessage()));
            }
        } else {
            future.complete(Optional.of(SchemaInfoUtil.newSchemaInfo(commandGetSchemaResponse.getSchema())));
        }
    }

    Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    ChannelHandlerContext ctx() {
        return this.ctx;
    }

    Channel channel() {
        return this.ctx.channel();
    }

    SocketAddress serverAddrees() {
        return this.remoteAddress;
    }

    CompletableFuture<Void> connectionFuture() {
        return this.connectionFuture;
    }

    CompletableFuture<ProducerResponse> sendRequestWithId(ByteBuf cmd, long requestId) {
        CompletableFuture<ProducerResponse> future = new CompletableFuture<ProducerResponse>();
        this.pendingRequests.put(requestId, future);
        this.ctx.writeAndFlush(cmd).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send request to broker: {}", (Object)this.ctx.channel(), (Object)writeFuture.cause().getMessage());
                this.pendingRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        }));
        this.requestTimeoutQueue.add(new RequestTime(System.currentTimeMillis(), requestId));
        return future;
    }

    public CompletableFuture<PulsarApi.MessageIdData> sendGetLastMessageId(ByteBuf request, long requestId) {
        CompletableFuture<PulsarApi.MessageIdData> future = new CompletableFuture<PulsarApi.MessageIdData>();
        this.pendingGetLastMessageIdRequests.put(requestId, future);
        this.ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send GetLastMessageId request to broker: {}", (Object)this.ctx.channel(), (Object)writeFuture.cause().getMessage());
                this.pendingGetLastMessageIdRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        }));
        return future;
    }

    public CompletableFuture<Optional<SchemaInfo>> sendGetSchema(ByteBuf request, long requestId) {
        CompletableFuture<Optional<SchemaInfo>> future = new CompletableFuture<Optional<SchemaInfo>>();
        this.pendingGetSchemaRequests.put(requestId, future);
        this.ctx.writeAndFlush(request).addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)writeFuture -> {
            if (!writeFuture.isSuccess()) {
                log.warn("{} Failed to send GetSchema request to broker: {}", (Object)this.ctx.channel(), (Object)writeFuture.cause().getMessage());
                this.pendingGetLastMessageIdRequests.remove(requestId);
                future.completeExceptionally(writeFuture.cause());
            }
        }));
        return future;
    }

    private void checkServerError(PulsarApi.ServerError error, String errMsg) {
        if (PulsarApi.ServerError.ServiceNotReady.equals((Object)error)) {
            log.error("{} Close connection because received internal-server error {}", (Object)this.ctx.channel(), (Object)errMsg);
            this.ctx.close();
        } else if (PulsarApi.ServerError.TooManyRequests.equals((Object)error)) {
            long rejectedRequests = NUMBER_OF_REJECTED_REQUESTS_UPDATER.getAndIncrement(this);
            if (rejectedRequests == 0L) {
                this.eventLoopGroup.schedule(() -> NUMBER_OF_REJECTED_REQUESTS_UPDATER.set(this, 0), 60L, TimeUnit.SECONDS);
            } else if (rejectedRequests >= (long)this.maxNumberOfRejectedRequestPerConnection) {
                log.error("{} Close connection because received {} rejected request in {} seconds ", new Object[]{this.ctx.channel(), NUMBER_OF_REJECTED_REQUESTS_UPDATER.get(this), 60});
                this.ctx.close();
            }
        }
    }

    private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
        ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");
        SSLSession sslSession = null;
        if (sslHandler != null) {
            sslSession = ((SslHandler)sslHandler).engine().getSession();
            if (log.isDebugEnabled()) {
                log.debug("Verifying HostName for {}, Cipher {}, Protocols {}", new Object[]{hostname, sslSession.getCipherSuite(), sslSession.getProtocol()});
            }
            return HOSTNAME_VERIFIER.verify(hostname, sslSession);
        }
        return false;
    }

    void registerConsumer(long consumerId, ConsumerImpl<?> consumer) {
        this.consumers.put(consumerId, consumer);
    }

    void registerProducer(long producerId, ProducerImpl<?> producer) {
        this.producers.put(producerId, producer);
    }

    void removeProducer(long producerId) {
        this.producers.remove(producerId);
    }

    void removeConsumer(long consumerId) {
        this.consumers.remove(consumerId);
    }

    void setTargetBroker(InetSocketAddress targetBrokerAddress) {
        this.proxyToTargetBrokerAddress = String.format("%s:%d", targetBrokerAddress.getHostString(), targetBrokerAddress.getPort());
    }

    void setRemoteHostName(String remoteHostName) {
        this.remoteHostName = remoteHostName;
    }

    private PulsarClientException getPulsarClientException(PulsarApi.ServerError error, String errorMsg) {
        switch (error) {
            case AuthenticationError: {
                return new PulsarClientException.AuthenticationException(errorMsg);
            }
            case AuthorizationError: {
                return new PulsarClientException.AuthorizationException(errorMsg);
            }
            case ProducerBusy: {
                return new PulsarClientException.ProducerBusyException(errorMsg);
            }
            case ConsumerBusy: {
                return new PulsarClientException.ConsumerBusyException(errorMsg);
            }
            case MetadataError: {
                return new PulsarClientException.BrokerMetadataException(errorMsg);
            }
            case PersistenceError: {
                return new PulsarClientException.BrokerPersistenceException(errorMsg);
            }
            case ServiceNotReady: {
                return new PulsarClientException.LookupException(errorMsg);
            }
            case TooManyRequests: {
                return new PulsarClientException.TooManyRequestsException(errorMsg);
            }
            case ProducerBlockedQuotaExceededError: {
                return new PulsarClientException.ProducerBlockedQuotaExceededError(errorMsg);
            }
            case ProducerBlockedQuotaExceededException: {
                return new PulsarClientException.ProducerBlockedQuotaExceededException(errorMsg);
            }
            case TopicTerminatedError: {
                return new PulsarClientException.TopicTerminatedException(errorMsg);
            }
            case IncompatibleSchema: {
                return new PulsarClientException.IncompatibleSchemaException(errorMsg);
            }
        }
        return new PulsarClientException(errorMsg);
    }

    @VisibleForTesting
    public void close() {
        if (this.ctx != null) {
            this.ctx.close();
        }
    }

    private void checkRequestTimeout() {
        RequestTime request;
        while (!this.requestTimeoutQueue.isEmpty() && (request = this.requestTimeoutQueue.peek()) != null && System.currentTimeMillis() - request.creationTimeMs >= this.operationTimeoutMs) {
            request = this.requestTimeoutQueue.poll();
            CompletableFuture<ProducerResponse> requestFuture = this.pendingRequests.remove(request.requestId);
            if (requestFuture == null || requestFuture.isDone() || !requestFuture.completeExceptionally((Throwable)new PulsarClientException.TimeoutException(request.requestId + " lookup request timedout after ms " + this.operationTimeoutMs))) continue;
            log.warn("{} request {} timed out after {} ms", new Object[]{this.ctx.channel(), request.requestId, this.operationTimeoutMs});
        }
    }

    public static int getMaxMessageSize() {
        return maxMessageSize;
    }

    static class RequestTime {
        long creationTimeMs;
        long requestId;

        public RequestTime(long creationTime, long requestId) {
            this.creationTimeMs = creationTime;
            this.requestId = requestId;
        }
    }

    static enum State {
        None,
        SentConnectFrame,
        Ready,
        Failed,
        Connecting;

    }
}

