|
@@ -32,6 +32,8 @@ import org.apache.commons.beanutils.BeanUtils;
|
|
|
import javax.script.ScriptEngineManager;
|
|
|
import javax.script.ScriptException;
|
|
|
import java.util.Map;
|
|
|
+import java.util.concurrent.*;
|
|
|
+import java.util.function.Consumer;
|
|
|
|
|
|
@Slf4j
|
|
|
@Data
|
|
@@ -50,6 +52,10 @@ public class DeviceMessageHandler implements IMessageHandler {
|
|
|
|
|
|
private final DeviceRouter deviceRouter;
|
|
|
|
|
|
+ private final ExecutorService executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE,
|
|
|
+ 60L, TimeUnit.SECONDS,
|
|
|
+ new LinkedBlockingQueue<>());
|
|
|
+
|
|
|
@SneakyThrows
|
|
|
public DeviceMessageHandler(DeviceComponentManager deviceComponentManager,
|
|
|
IDeviceComponent component,
|
|
@@ -67,64 +73,78 @@ public class DeviceMessageHandler implements IMessageHandler {
|
|
|
scriptObj = engine.eval(String.format("new (function () {\n%s})()", script));
|
|
|
}
|
|
|
|
|
|
- public ReceiveResult onReceive(Map<String, Object> head, String type, String msg) {
|
|
|
- try {
|
|
|
- ScriptObjectMirror result = (ScriptObjectMirror) invokeMethod("onReceive", head, type, msg);
|
|
|
- log.info("onReceive script result:{}", JsonUtil.toJsonString(result));
|
|
|
- Object rstType = result.get("type");
|
|
|
- if (rstType == null) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- //取脚本执行后返回的数据
|
|
|
- Object data = JsonUtil.toObject((ScriptObjectMirror) result.get("data"));
|
|
|
- if (!(data instanceof Map)) {
|
|
|
- throw new BizException("script result data is incorrect");
|
|
|
- }
|
|
|
-
|
|
|
- Map<String, Object> dataMap = (Map) data;
|
|
|
- //获取动作数据
|
|
|
- Action action = getAction(result.get("action"));
|
|
|
+ public void onReceive(Map<String, Object> head, String type, String msg) {
|
|
|
+ onReceive(head, type, msg, (r) -> {
|
|
|
+ });
|
|
|
+ }
|
|
|
|
|
|
- if ("register".equals(rstType)) {
|
|
|
- //注册数据
|
|
|
- RegisterInfo regInfo = RegisterInfo.from(dataMap);
|
|
|
- if (regInfo == null) {
|
|
|
- return null;
|
|
|
+ public void onReceive(Map<String, Object> head, String type, String msg, Consumer<ReceiveResult> onResult) {
|
|
|
+ executorService.submit(() -> {
|
|
|
+ try {
|
|
|
+ ScriptObjectMirror result = (ScriptObjectMirror) invokeMethod("onReceive", head, type, msg);
|
|
|
+ log.info("onReceive script result:{}", JsonUtil.toJsonString(result));
|
|
|
+ Object rstType = result.get("type");
|
|
|
+ if (rstType == null) {
|
|
|
+ onResult.accept(null);
|
|
|
+ return;
|
|
|
}
|
|
|
- doRegister(regInfo);
|
|
|
- doAction(action);
|
|
|
- return new ReceiveResult(regInfo.getProductKey(), regInfo.getDeviceName(), regInfo);
|
|
|
- } else if ("auth".equals(rstType)) {
|
|
|
- //设备认证
|
|
|
- AuthInfo authInfo = new AuthInfo();
|
|
|
- BeanUtils.populate(authInfo, dataMap);
|
|
|
- doAuth(authInfo);
|
|
|
- doAction(action);
|
|
|
- return new ReceiveResult(authInfo.getProductKey(), authInfo.getDeviceName(), authInfo);
|
|
|
- } else if ("state".equals(rstType)) {
|
|
|
- //设备状态变更
|
|
|
- DeviceState state = DeviceState.from(dataMap);
|
|
|
- if (state == null) {
|
|
|
- return null;
|
|
|
+ //取脚本执行后返回的数据
|
|
|
+ Object data = JsonUtil.toObject((ScriptObjectMirror) result.get("data"));
|
|
|
+ if (!(data instanceof Map)) {
|
|
|
+ throw new BizException("script result data is incorrect");
|
|
|
}
|
|
|
- doStateChange(state);
|
|
|
- doAction(action);
|
|
|
- return new ReceiveResult(state.getProductKey(), state.getDeviceName(), state);
|
|
|
- } else if ("report".equals(rstType)) {
|
|
|
- //上报数据
|
|
|
- DeviceMessage message = new DeviceMessage();
|
|
|
- BeanUtils.populate(message, dataMap);
|
|
|
- doReport(message);
|
|
|
- doAction(action);
|
|
|
- return new ReceiveResult(message.getProductKey(), message.getDeviceName(), message);
|
|
|
- }
|
|
|
|
|
|
- } catch (BizException e) {
|
|
|
- throw e;
|
|
|
- } catch (Throwable e) {
|
|
|
- throw new BizException("receive component message error", e);
|
|
|
- }
|
|
|
- return null;
|
|
|
+ Map<String, Object> dataMap = (Map) data;
|
|
|
+ //获取动作数据
|
|
|
+ Action action = getAction(result.get("action"));
|
|
|
+
|
|
|
+ if ("register".equals(rstType)) {
|
|
|
+ //注册数据
|
|
|
+ RegisterInfo regInfo = RegisterInfo.from(dataMap);
|
|
|
+ if (regInfo == null) {
|
|
|
+ onResult.accept(null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ doRegister(regInfo);
|
|
|
+ doAction(action);
|
|
|
+ onResult.accept(new ReceiveResult(regInfo.getProductKey(), regInfo.getDeviceName(), regInfo));
|
|
|
+ return;
|
|
|
+ } else if ("auth".equals(rstType)) {
|
|
|
+ //设备认证
|
|
|
+ AuthInfo authInfo = new AuthInfo();
|
|
|
+ BeanUtils.populate(authInfo, dataMap);
|
|
|
+ doAuth(authInfo);
|
|
|
+ doAction(action);
|
|
|
+ onResult.accept(new ReceiveResult(authInfo.getProductKey(), authInfo.getDeviceName(), authInfo));
|
|
|
+ return;
|
|
|
+ } else if ("state".equals(rstType)) {
|
|
|
+ //设备状态变更
|
|
|
+ DeviceState state = DeviceState.from(dataMap);
|
|
|
+ if (state == null) {
|
|
|
+ onResult.accept(null);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ doStateChange(state);
|
|
|
+ doAction(action);
|
|
|
+ onResult.accept(new ReceiveResult(state.getProductKey(), state.getDeviceName(), state));
|
|
|
+ return;
|
|
|
+ } else if ("report".equals(rstType)) {
|
|
|
+ //上报数据
|
|
|
+ DeviceMessage message = new DeviceMessage();
|
|
|
+ BeanUtils.populate(message, dataMap);
|
|
|
+ doReport(message);
|
|
|
+ doAction(action);
|
|
|
+ onResult.accept(new ReceiveResult(message.getProductKey(), message.getDeviceName(), message));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (BizException e) {
|
|
|
+ throw e;
|
|
|
+ } catch (Throwable e) {
|
|
|
+ throw new BizException("receive component message error", e);
|
|
|
+ }
|
|
|
+ onResult.accept(null);
|
|
|
+ });
|
|
|
}
|
|
|
|
|
|
private void doRegister(RegisterInfo reg) throws ScriptException, NoSuchMethodException {
|