- 快召唤伙伴们来围观吧
- 微博 QQ QQ空间 贴吧
- 文档嵌入链接
- 复制
- 微信扫一扫分享
- 已成功复制到剪贴板
Apache Pulsar 在开源物联网平台ActorCloud 上的应用
展开查看详情
1 .
2 . Apache Pulsar 在开源 物联⽹网平台 ActorCloud 上的应⽤用 PHOTO Frank Wang EMQ X 解决⽅方案总监
3 .EMQ - 全球开源物联⽹网消息引擎领导者
4 .EMQ 为全球 IoT 和 5G 企业提供优质消息产品与服务
5 .ActorCloud 整体功能 ■ 设备管理理与应⽤用使能平台,提供物联⽹网 PaaS全栈服务 ■ 对跨⽹网络、跨协议、跨⾏行行业、跨⼚厂商的 物联⽹网设备提供统⼀一快速接⼊入能⼒力力 ■ 可定制的设备数据可视化、统计数据展 现能⼒力力 ■ 可灵活定制业务规则,提供设备报警、 电⼦子围栏等实⽤用功能 ■ 提供丰富的北北向接⼝口,⽅方便便快捷地对接 ⾏行行业应⽤用
6 .ActorCloud 物联⽹网融合平台 核⼼心功能1 核⼼心功能2 核⼼心功能3 千万级容量量物联⽹网接⼊入服务 物联⽹网⼀一站式 PaaS 云服务 物联⽹网垂直应⽤用SaaS服务 为物联⽹网终端上云建⽴立可靠双向连接通道,具备 完整AEP和DMP能⼒力力,具备三级管控体系:设备级管 具备⾏行行业垂直应⽤用运⾏行行框架,可通过内置⾏行行业模块, 多协议接⼊入、海海量量连接、消息路路由、数据统⼀一存 理理(终端接⼊入、管理理、操控)、业务级管理理(数据模 直接在本平台搭建垂直⾏行行业应⽤用,也⽀支持通过丰富的 储等能⼒力力 型定义、数据可视化) 规则引擎、平台API、SDK等开发接⼝口为第三⽅方应⽤用开 发提供⽀支撑 ⾼高性能 —— 单机百万级并发,单集群千万级并发,毫秒级时延 ⼀一站式平台 —— 解耦终端和应⽤用,让客户回归核⼼心业务专注应⽤用创新 技 产 术 多协议 —— MQTT、CoAP/LwM2M、LoRaWAN、WebSocket、HTTP 品 超⾼高容量量 —— 千万级接⼊入能⼒力力,满⾜足任意规模的⽹网络需求 特 特 ⽹网络透明 —— 2G/3G/4G/NB-IoT 、LAN、PON、Wifi、Bluetooth 任意云部署 —— 能够在所有主流公有云和私有云上部署运⾏行行 征 征 微服务化 —— 以DevOps为理理念,基于微服务和容器器化实现功能松耦合 中⽴立第三⽅方 —— 不不窃取不不利利⽤用客户数据,保障客户信息安全和数据价值
7 .ActorCloud开源版架构 • 连接与数据采集 • 设备管理理 - 产品管理理(数据流、功能点) - 分组管理理、证书管理理 • 应⽤用管理理 - 应⽤用权限可⾃自定义 - 提供丰富的API:设备管理理 API,数据查询 API,告警管理理 API • ⽤用户管理理 - 租户隔离、租户内⽤用户可分权、分域 • 基于 SQL 可扩展业务规则引擎 • ⼤大数据平台对接
8 .Why Pulsar? • Pulsar Function提供了了基础 Window ⽀支持 • 提供了了 API ⽤用于 Function 的增删改查 • 灵活的订阅模式 - ⽆无状态 Function 采⽤用共享订阅,可⽆无限扩展消费能⼒力力 • 存储与计算分离 - 存储可设置 Retain 时间 - 实时数据和历史数据可通过 Presto SQL 查询,⽅方便便离线分 析
9 .ActorCloud 基于 Pulsar 数据处理理架构 { "id": “mailTest", "sql": "SELECT temp FROM sensor WHERE temp > 0", "enabled": true, "actions": [ { "mail": { "title": “温度预警", "content": “温度为${temp}度,及时处理理", "emails": [ "alert@emqx.io] } } ] }
10 .基于Pulsar Functions的 SQL 规则处理理过程 SQL Analyzer Rule REST {sql, actions… } • 解析SQL,得到 topics,windowConfig 调⽤用 Window Rule Function PulsarAdmin • 标准的 Pulsar function,预先 打包到Pulsar • createFunction( topics, config, … ) 创建 • 初始化时根据 user config 中 • SQL, actions 和 window config 作为 的 SQL 创建SQL processor user config传⼊入 • 运⾏行行时接收 Pulsar 输⼊入,由 • Class固定为 windowRuleFunction SQL processor 处理理后,根据 Actions 定义发往各种 Sink
11 .ActorCloud 数据处理理样例例 - 1 • SELECT、FROM、WHERE、HAVING、GROUP、JOIN • 窗⼝口 固定窗⼝口 TUMBLINGWINDOW ( timeunit, duration ) /* 每五分钟上报⼀一批记录 */ SELECT * FROM sensor GROUP BY TUMBLINGWINDOW ('mi', 5) 跳跃窗⼝口 HOPPINGWINDOW ( timeunit , duration , hopsize ) /* 每隔1分钟,将前5分钟的⼀一批记录上报 */ SELECT * FROM sensor GROUP BY HOPPINGWINDOW('mi', 5, 1) 会话窗⼝口 SESSIONWINDOW ( timeunit , duration , timeout ) /* 从上报第⼀一条温度⼤大于26.5数据开始计时:如果下⼀一条上报数据在1分钟内发⽣生,那么将窗⼝口扩展到包含该数据,否则1分钟后关闭窗 ⼝口;如果每⼀一个下⼀一条数据都在1分钟内发⽣生,则等到10分钟后上报这⼀一批数据 */ SELECT * FROM sensor WHERE temp > 26.5 GROUP BY SESSIONWINDOW('mi', 10, 1) 滑动窗⼝口 SLIDINGWINDOW ( timeunit, duration ) /* 在当前上报数据之前的五分钟内上报温度⼩小于10度的次数⼤大于5次触发 */ SELECT data$$temp AS temp FROM sensor WHERE temp < 10 GROUP BY SLIDINGWINDOW('mi', 5) HAVING COUNT(*) > 5
12 .ActorCloud 数据处理理样例例 - 2 电⼦子围栏 /*设备1或者设备2离开圆形围栏时触发*/ SELECT split_part(getMetadataPropertyValue('/+/tenant_id_1/#', 'topic'), '/' ,5) AS device_id FROM "/+/tenant_id_1/#"WHERE NOT(inCircle(data$$lat, data$$lng, 39.9234, 118.3845, 1968)) AND device_id IN (device_id1, device_id2) /*设备1或者设备2进⼊入多边形围栏时触发*/ SELECT split_part(getMetadataPropertyValue('/+/tenant_id_1/#', 'topic'), '/' ,5) AS device_id FROM "/+/tenant_id_1/#"WHERE inPolygon(data$$lat, data$$lng, ‘[[39.944148,116.391279],[39.897416,116.35111], [39.896802,116.495135]]') AND device_id IN (device_id1, device_id2) 列列表展开 /*将消息体中的设备列列表展开后,并且对展开后的记录进⾏行行判断*/ SELECT * FROM topic JOIN LATERAL (SELECT unnest(topic.data$$devices) AS device) AS c WHERE c.device$$data$$mode$$value = 'cold'
13 .测试结果 • ⻘青云北北京3区 • Pulsar:8核、16GB内存 * 2台 • EMQ:8核、8GB * 2台 • 测试⼯工具:XMeter (JMeter + MQTT Plugin) • 消息吞吐量量:60k/s
14 .资源 • Github:https://github.com/actorcloud/ActorCloud • Docs:https://docs.actorcloud.io • Download:https://github.com/actorcloud/ActorCloud/releases • Installation:https://docs.actorcloud.io/zh/installation/base.html • Quick start:https://docs.actorcloud.io/en/getting_started/quick_start.html • Demo:https://demo.actorcloud.io/
15 . 微信公众号 Thanks!