/*
 * Decompiled with CFR 0.152.
 */
package org.apache.activemq.artemis.core.protocol.mqtt;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnection;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTInterceptor;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolHandler;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTRoutingHandler;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTStateManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MQTTProtocolManager
extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRoutingHandler>
implements NotificationListener {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
    private ActiveMQServer server;
    private final List<MQTTInterceptor> incomingInterceptors = new ArrayList<MQTTInterceptor>();
    private final List<MQTTInterceptor> outgoingInterceptors = new ArrayList<MQTTInterceptor>();
    private int defaultMqttSessionExpiryInterval = -1;
    private int topicAliasMaximum = MQTTUtil.DEFAULT_TOPIC_ALIAS_MAX;
    private int receiveMaximum = MQTTUtil.DEFAULT_RECEIVE_MAXIMUM;
    private int serverKeepAlive = 60;
    private int maximumPacketSize = 0xFFFFFFF;
    private boolean closeMqttConnectionOnPublishAuthorizationFailure = true;
    private boolean allowLinkStealing = true;
    private final MQTTRoutingHandler routingHandler;
    private MQTTStateManager sessionStateManager;

    MQTTProtocolManager(final ActiveMQServer server, List<BaseInterceptor> incomingInterceptors, List<BaseInterceptor> outgoingInterceptors) throws Exception {
        this.server = server;
        this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
        server.getManagementService().addNotificationListener((NotificationListener)this);
        this.routingHandler = new MQTTRoutingHandler(server);
        this.sessionStateManager = MQTTStateManager.getInstance(server);
        server.registerActivateCallback((ActivateCallback)new CleaningActivateCallback(){

            public void deActivate() {
                MQTTStateManager.removeInstance(server);
                MQTTProtocolManager.this.sessionStateManager = null;
            }
        });
    }

    public int getDefaultMqttSessionExpiryInterval() {
        return this.defaultMqttSessionExpiryInterval;
    }

    public MQTTProtocolManager setDefaultMqttSessionExpiryInterval(int sessionExpiryInterval) {
        this.defaultMqttSessionExpiryInterval = sessionExpiryInterval;
        return this;
    }

    public int getTopicAliasMaximum() {
        return this.topicAliasMaximum;
    }

    public MQTTProtocolManager setTopicAliasMaximum(int topicAliasMaximum) {
        this.topicAliasMaximum = topicAliasMaximum;
        return this;
    }

    public int getReceiveMaximum() {
        return this.receiveMaximum;
    }

    public MQTTProtocolManager setReceiveMaximum(int receiveMaximum) {
        this.receiveMaximum = receiveMaximum;
        return this;
    }

    public int getMaximumPacketSize() {
        return this.maximumPacketSize;
    }

    public MQTTProtocolManager setMaximumPacketSize(int maximumPacketSize) {
        this.maximumPacketSize = maximumPacketSize;
        return this;
    }

    public int getServerKeepAlive() {
        return this.serverKeepAlive;
    }

    public MQTTProtocolManager setServerKeepAlive(int serverKeepAlive) {
        this.serverKeepAlive = serverKeepAlive;
        return this;
    }

    public boolean isCloseMqttConnectionOnPublishAuthorizationFailure() {
        return this.closeMqttConnectionOnPublishAuthorizationFailure;
    }

    public void setCloseMqttConnectionOnPublishAuthorizationFailure(boolean closeMqttConnectionOnPublishAuthorizationFailure) {
        this.closeMqttConnectionOnPublishAuthorizationFailure = closeMqttConnectionOnPublishAuthorizationFailure;
    }

    public boolean isAllowLinkStealing() {
        return this.allowLinkStealing;
    }

    public void setAllowLinkStealing(boolean allowLinkStealing) {
        this.allowLinkStealing = allowLinkStealing;
    }

    public void onNotification(Notification notification) {
        if (!(notification.getType() instanceof CoreNotificationType)) {
            return;
        }
        CoreNotificationType type = (CoreNotificationType)notification.getType();
        if (type != CoreNotificationType.SESSION_CREATED) {
            return;
        }
        TypedProperties props = notification.getProperties();
        SimpleString protocolName = props.getSimpleStringProperty(ManagementHelper.HDR_PROTOCOL_NAME);
        if (protocolName == null || !protocolName.toString().startsWith("MQTT")) {
            return;
        }
        int distance = props.getIntProperty(ManagementHelper.HDR_DISTANCE);
        if (distance > 0) {
            String clientId = props.getSimpleStringProperty(ManagementHelper.HDR_CLIENT_ID).toString();
            MQTTConnection mqttConnection = this.sessionStateManager.getConnectedClients().get(clientId);
            if (mqttConnection != null) {
                mqttConnection.destroy();
            }
        }
    }

    public ProtocolManagerFactory getFactory() {
        return new MQTTProtocolManagerFactory();
    }

    public void updateInterceptors(List incoming, List outgoing) {
        this.incomingInterceptors.clear();
        this.incomingInterceptors.addAll(this.getFactory().filterInterceptors(incoming));
        this.outgoingInterceptors.clear();
        this.outgoingInterceptors.addAll(this.getFactory().filterInterceptors(outgoing));
    }

    public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) {
        try {
            MQTTConnection mqttConnection = new MQTTConnection(connection);
            ConnectionEntry entry = new ConnectionEntry((RemotingConnection)mqttConnection, null, System.currentTimeMillis(), this.getServerKeepAlive() == -1 || this.getServerKeepAlive() == 0 ? -1L : (long)this.getServerKeepAlive() * 1500L);
            NettyServerConnection nettyConnection = (NettyServerConnection)connection;
            MQTTProtocolHandler protocolHandler = (MQTTProtocolHandler)nettyConnection.getChannel().pipeline().get(MQTTProtocolHandler.class);
            protocolHandler.setConnection(mqttConnection, entry);
            return entry;
        }
        catch (Exception e) {
            logger.error("Error creating connection entry", (Throwable)e);
            return null;
        }
    }

    public boolean acceptsNoHandshake() {
        return false;
    }

    public void handleBuffer(RemotingConnection connection, ActiveMQBuffer buffer) {
        connection.bufferReceived(connection.getID(), buffer);
    }

    public void addChannelHandlers(ChannelPipeline pipeline) {
        pipeline.addLast(new ChannelHandler[]{MqttEncoder.INSTANCE});
        pipeline.addLast(new ChannelHandler[]{new MqttDecoder(0xFFFFFFF)});
        pipeline.addLast(new ChannelHandler[]{new MQTTProtocolHandler(this.server, this)});
    }

    public boolean isProtocol(byte[] array) {
        ByteBuf buf = Unpooled.wrappedBuffer((byte[])array);
        if (this.readByte(buf) != 16 || !this.validateRemainingLength(buf) || this.readByte(buf) != 0) {
            return false;
        }
        byte b = this.readByte(buf);
        return b != 4 && b != 6 || this.readByte(buf) == 77 && this.readByte(buf) == 81;
    }

    byte readByte(ByteBuf buf) {
        byte b = buf.readByte();
        if (logger.isTraceEnabled()) {
            logger.trace(String.format("%8s", Integer.toBinaryString(b & 0xFF)).replace(' ', '0'));
        }
        return b;
    }

    private boolean validateRemainingLength(ByteBuf buffer) {
        int msb = -128;
        for (int i = 0; i < 4; i = (int)((byte)(i + 1))) {
            if ((this.readByte(buffer) & msb) == msb) continue;
            return true;
        }
        return false;
    }

    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }

    public List<String> websocketSubprotocolIdentifiers() {
        return websocketRegistryNames;
    }

    public MQTTRoutingHandler getRoutingHandler() {
        return this.routingHandler;
    }

    public String invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
        return super.invokeInterceptors(this.incomingInterceptors, (Object)mqttMessage, (RemotingConnection)connection);
    }

    public String invokeOutgoing(MqttMessage mqttMessage, MQTTConnection connection) {
        return super.invokeInterceptors(this.outgoingInterceptors, (Object)mqttMessage, (RemotingConnection)connection);
    }

    public MQTTStateManager getStateManager() {
        return this.sessionStateManager;
    }
}

