/*
 * Decompiled with CFR 0.152.
 */
package cc.iotkit.comp.mqtt;

import cc.iotkit.common.exception.BizException;
import cc.iotkit.comp.IMessageHandler;
import cc.iotkit.comp.model.ReceiveResult;
import cc.iotkit.comp.mqtt.MqttConfig;
import io.netty.handler.codec.mqtt.MqttConnectReturnCode;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttQoS;
import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.net.PemKeyCertOptions;
import io.vertx.mqtt.MqttAuth;
import io.vertx.mqtt.MqttEndpoint;
import io.vertx.mqtt.MqttServer;
import io.vertx.mqtt.MqttServerOptions;
import io.vertx.mqtt.MqttTopicSubscription;
import io.vertx.mqtt.messages.codes.MqttSubAckReasonCode;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MqttVerticle
extends AbstractVerticle {
    private static final Logger log = LoggerFactory.getLogger(MqttVerticle.class);
    private MqttServer mqttServer;
    private final MqttConfig config;
    private IMessageHandler executor;
    private final Map<String, MqttEndpoint> endpointMap = new HashMap<String, MqttEndpoint>();

    public MqttVerticle(MqttConfig config) {
        this.config = config;
    }

    public void setExecutor(IMessageHandler executor) {
        this.executor = executor;
    }

    @Override
    public void start() throws Exception {
        MqttServerOptions options = new MqttServerOptions().setPort(this.config.getPort());
        if (this.config.isSsl()) {
            options = options.setSsl(true).setKeyCertOptions(new PemKeyCertOptions().setKeyPath(this.config.getSslKey()).setCertPath(this.config.getSslCert()));
        }
        this.mqttServer = MqttServer.create(this.vertx, options);
        this.mqttServer.endpointHandler(endpoint -> {
            log.info("MQTT client:{} request to connect, clean session = {}", (Object)endpoint.clientIdentifier(), (Object)endpoint.isCleanSession());
            MqttAuth auth = endpoint.auth();
            if (auth == null) {
                return;
            }
            String clientId = endpoint.clientIdentifier();
            String authJson = auth.toJson().put("clientid", clientId).toString();
            log.info("MQTT client auth,clientId:{},username:{},password:{}", new Object[]{clientId, auth.getUsername(), auth.getPassword()});
            try {
                this.executor.onReceive(new HashMap(), "auth", authJson, r -> this.endpointMap.put(this.getEndpointKey((ReceiveResult)r), (MqttEndpoint)endpoint));
            }
            catch (Throwable e) {
                log.error("auth failed", e);
                endpoint.reject(MqttConnectReturnCode.CONNECTION_REFUSED_NOT_AUTHORIZED);
                return;
            }
            log.info("MQTT client keep alive timeout = {} ", (Object)endpoint.keepAliveTimeSeconds());
            endpoint.accept(false);
            endpoint.closeHandler(v -> {
                log.warn("client connection closed,clientId:{}", (Object)clientId);
                this.executor.onReceive(new HashMap(), "disconnect", clientId, r -> this.endpointMap.remove(this.getEndpointKey((ReceiveResult)r)));
            }).disconnectMessageHandler(disconnectMessage -> {
                log.info("Received disconnect from client, reason code = {}", (Object)disconnectMessage.code());
                this.executor.onReceive(new HashMap(), "disconnect", clientId, r -> this.endpointMap.remove(this.getEndpointKey((ReceiveResult)r)));
            }).subscribeHandler(subscribe -> {
                ArrayList<MqttSubAckReasonCode> reasonCodes = new ArrayList<MqttSubAckReasonCode>();
                for (MqttTopicSubscription s : subscribe.topicSubscriptions()) {
                    log.info("Subscription for {},with QoS {}", (Object)s.topicName(), (Object)s.qualityOfService());
                    try {
                        HashMap<String, String> head = new HashMap<String, String>();
                        head.put("topic", s.topicName());
                        this.executor.onReceive(head, "subscribe", clientId);
                        reasonCodes.add(MqttSubAckReasonCode.qosGranted(s.qualityOfService()));
                    }
                    catch (Throwable e) {
                        log.error("subscribe failed,topic:" + s.topicName(), e);
                        reasonCodes.add(MqttSubAckReasonCode.NOT_AUTHORIZED);
                    }
                }
                endpoint.subscribeAcknowledge(subscribe.messageId(), reasonCodes, MqttProperties.NO_PROPERTIES);
            }).unsubscribeHandler(unsubscribe -> {
                for (String t : unsubscribe.topics()) {
                    log.info("Unsubscription for {}", (Object)t);
                    try {
                        HashMap<String, String> head = new HashMap<String, String>();
                        head.put("topic", t);
                        this.executor.onReceive(head, "unsubscribe", clientId);
                    }
                    catch (Throwable e) {
                        log.error("unsubscribe failed,topic:" + t, e);
                    }
                }
                endpoint.unsubscribeAcknowledge(unsubscribe.messageId());
            }).publishHandler(message -> {
                String payload = message.payload().toString(Charset.defaultCharset());
                log.info("Received message:{}, with QoS {}", (Object)payload, (Object)message.qosLevel());
                if (StringUtils.isBlank((CharSequence)payload)) {
                    return;
                }
                try {
                    HashMap<String, String> head = new HashMap<String, String>();
                    head.put("topic", message.topicName());
                    this.executor.onReceive(head, "", payload);
                    if (message.qosLevel() == MqttQoS.AT_LEAST_ONCE) {
                        endpoint.publishAcknowledge(message.messageId());
                    } else if (message.qosLevel() == MqttQoS.EXACTLY_ONCE) {
                        endpoint.publishReceived(message.messageId());
                    }
                }
                catch (Throwable e) {
                    log.error("handler message failed,topic:" + message.topicName(), e);
                }
            }).publishReleaseHandler(endpoint::publishComplete);
        }).listen(ar -> {
            if (ar.succeeded()) {
                log.info("MQTT server is listening on port " + ((MqttServer)ar.result()).actualPort());
            } else {
                log.error("Error on starting the server", ar.cause());
            }
        });
    }

    @Override
    public void stop() throws Exception {
        for (MqttEndpoint endpoint : this.endpointMap.values()) {
            this.executor.onReceive(new HashMap(), "disconnect", endpoint.clientIdentifier());
        }
        this.mqttServer.close(voidAsyncResult -> log.info("close mqtt server..."));
    }

    private String getEndpointKey(ReceiveResult result) {
        return this.getEndpointKey(result.getProductKey(), result.getDeviceName());
    }

    private String getEndpointKey(String productKey, String deviceName) {
        return String.format("%s_%s", productKey, deviceName);
    }

    public boolean exist(String productKey, String deviceName) {
        return this.endpointMap.containsKey(this.getEndpointKey(productKey, deviceName));
    }

    public void publish(String productKey, String deviceName, String topic, String msg) {
        MqttEndpoint endpoint = this.endpointMap.get(this.getEndpointKey(productKey, deviceName));
        if (endpoint == null) {
            throw new BizException("endpoint does not exist");
        }
        Future<Integer> result = endpoint.publish(topic, Buffer.buffer(msg), MqttQoS.AT_LEAST_ONCE, false, false);
        result.onFailure(e -> log.error("public topic failed", e));
        result.onSuccess(integer -> log.info("publish success,topic:{},payload:{}", (Object)topic, (Object)msg));
    }
}

