分类 eKuiper 下的文章

本文使用 Base64,OPENCV,Numpy 和 Image 组件进行数据读取转换。

图片格式

一般在边缘计算端进行推理,不会使用一张一张图片,而是采用视频流读帧的方式。

二进制文件格式

file_bytes = "binary file" // 初始化图片二进制
encoded = base64.decodebytes(file_bytes.encode("ascii")) // 转换成base64
img = Image.open(io.BytesIO(encoded)).resize((width, height)) // 打开图片
input_data = np.array(img) // 转换成 numpy 
image = cv2.cvtColor(input_data, cv2.COLOR_BGR2RGB) // OPENCV 读取图片

图片(二进制)-->base64-->Image-->numpy array-->opencv

Base64 编码格式

如果是 Base64 格式输入,处理方法一样,但是,在输入流的地方,要注意 source 采用 json 编码。

// 代码通用
// 输入格式: {"image": "/9j/4AAQSk....X//2Q=="}

图片(json base64)-->Image-->numpy array-->opencv

eKuiper source 读取源(输入输入)

eKuiper 定义 source 唯一区别就是 format: binary/json,假设都是采用 mqtt broker 作为图片输入。

Binary 二进制格式

输入源定义:

{
   "Name" : "binaryimage",
   "Options" : {
      "confKey" : "edgex_mqtt_broker",
      "datasource" : "image",
      "format" : "binary",
      "type" : "mqtt"
   },
   "Statement" : null,
   "StreamFields" : null,
   "StreamType" : 0
}

输入图片示例代码, GOLANG,这里的 payload 直接是 ReadFile,二进制:

package main

import (
        "fmt"
        "io/ioutil"
        "time"

        mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
        const TOPIC = "binaryimage"

        images := []string{
                "parrot.jpg",
                "owl.jpg",
                "out.jpg",
                // 其他你需要的图像
        }
        opts := mqtt.NewClientOptions().AddBroker("tcp://192.168.123.184:1883")
        client := mqtt.NewClient(opts)
        if token := client.Connect(); token.Wait() && token.Error() != nil {
                panic(token.Error())
        }
        for _, image := range images {
                fmt.Println("Publishing " + image)
                payload, err := ioutil.ReadFile(image)
                if err != nil {
                        fmt.Println(err)
                        continue
                }
                if token := client.Publish(TOPIC, 0, false, payload); token.Wait() && token.Error() != nil {
                        fmt.Println(token.Error())
                } else {
                        fmt.Println("Published " + image)
                }
                time.Sleep(1 * time.Second)
        }
        client.Disconnect(0)
}

Json 格式

输入源定义:

{
   "Name" : "jsonimage",
   "Options" : {
      "confKey" : "edgex_mqtt_broker",
      "datasource" : "image",
      "format" : "json",
      "type" : "mqtt"
   },
   "Statement" : null,
   "StreamFields" : null,
   "StreamType" : 0
}

输入图片为 base64 编码后的结果,json 格式如下,发送到 mqtt broker 即可:

image.json
{
  "image" : "/9j/4AAQSkZJRgABAQEA8ADwAAD/7Ukit.....ucen55+KEpqqGC7pTdx3X//2Q=="
}

eKuiper rule 规则定义

labelImageRknn 函数是由 pyairknn 插件(portable)来实现的逻辑处理。

Json 输入格式处理

这里的 FROM jsonimage 来自上面定义的 source,其他根据自身需要修改即可。labelImageRknn(image) 为 MQTT broker 输出内容作为 rule 的输入,这个有些难以理解,多试几遍就明白了。

{
   "actions" : [
      {
         "mqtt" : {
            "bufferLength" : 1024,
            "enableCache" : false,
            "format" : "json",
            "insecureSkipVerify" : false,
            "omitIfEmpty" : false,
            "protocolVersion" : "3.1.1",
            "qos" : 0,
            "resourceId" : "edgex_mqtt_broker",
            "runAsync" : false,
            "sendSingle" : true,
            "server" : "tcp://edgex-mqtt-broker:1883",
            "topic" : "label"
         }
      }
   ],
   "id" : "rule-mqtt-image",
   "options" : {
      "bufferLength" : 1024,
      "checkpointInterval" : 300000,
      "concurrency" : 1,
      "isEventTime" : false,
      "lateTolerance" : 1000,
      "qos" : 0,
      "restartStrategy" : {
         "attempts" : 0,
         "delay" : 1000,
         "jitter" : 0.1,
         "maxDelay" : 30000,
         "multiplier" : 2
      },
      "sendError" : true,
      "sendMetaToSink" : false
   },
   "sql" : "SELECT labelImageRknn(image) as label FROM jsonimage"
}

二进制输入格式处理

与 Json 格式处理差别只有 labelImageRknn(image) --> labelImageRknn(self),pyairknn 逻辑并没有变化。

{
    "sql" : "SELECT labelImageRknn(self) as label FROM jsonimage"
}

pyairknn 核心处理模块

ekuiper 插件 portable

参照官方 pyai 插件,编写特定格式文件一堆,改几个字符就可以。

实现原理

主要逻辑分析如下几个方面:

初始化 rknn

  • load model;
  • 检查是否符合 RV1126 格式;
  • model 载入时间,如果太长,就需要用整机来预编译model;
  • init runtime,不出错即可。

尤其要注意,这些过程要在 5 秒内完成,因为 eKuiper 超时时间是 5 秒,目前版本还不能对开发者可见修改。这里写一个函数 load_rknn_mode() 来实现清晰的代码。

class LabelImageFunc(Function):

    def __init__(self):
        load_rknn_model()
        pass

    def validate(self, args: List[Any]):
        if len(args) != 1:
            return "invalid arg length"
        return ""

    def exec(self, args: List[Any], ctx: Context):
        logging.debug("executing label")
        return label(args[0])

    def is_aggregate(self):
        return False

核心推理逻辑: label 函数

  • 注意这里的 return top5 格式就是返回给 rule 里面的结果,不支持动态解析,要做好准备;
  • 中间业务逻辑,就是前面说的载入图片到 numpy;
  • 然后给 rknn 模块进行 inference;
  • 最后对推理结果进行格式化处理;
def label(file_bytes):
    logging.info('image object detecting...')

    encoded = base64.decodebytes(file_bytes.encode("ascii"))
    input_data = np.array(Image.open(io.BytesIO(encoded)))

    image = cv2.cvtColor(input_data, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))
    t0 = time.time()
    outputs = rknn_lite.inference(inputs=[image])
    print('rknn inference time: {:.3f}ms'.format((time.time() - t0) * 1000))
    show_outputs(outputs)

    output = outputs[0][0]
    output_sorted = sorted(output, reverse=True)
    labels = load_labels(LABELS)
    top5 = []
    for i in range(5):
        value = output_sorted[i]
        index = np.where(output == value)
        for j in range(len(index)):
            if (i + j) >= 5:
                break
            if value > 0:
                for k in range(len(index[j])):
                    top5.append({"index": labels[index[j][k]], "value": float(value)})
            else:
                top5.append({"index": "-1", "value": 0.0})
    return top5

最终效果

eKuiper 日志打印

如果有错也会在这里出现,便于查看。可以打开 DEBUG 日志,更加详细。

time="2023-02-17 13:16:23" level=info msg="Start source mqttimage instance 0 successfully" file="node/source_node.go:138" rule=rule-mqtt-image
time="2023-02-17 13:16:23" level=info msg="new subscription for topic image, reqId is rule-mqtt-image_mqttimage_0" file="mqtt/mqtt_wrapper.go:220" rule=rule-mqtt-image
time="2023-02-17 13:16:23" level=info msg="Successfully subscribed to topic image." file="source/mqtt_source.go:111" rule=rule-mqtt-image
I:root:image object detecting...
rknn inference time: 7.212ms
mobilenet_v1
-----TOP 5-----
[ 9 83]: 0.2010498046875
[ 9 83]: 0.2010498046875
2023-02-17 13:16:26,316 - /kuiper/plugins/portable/pyairknn/label.py[line:68] - I: image object detecting...

Mqtt 输出

测试阶段,使用 mqtt broker 可以实时看到结果,方便调试。

{
  "label" : [
    {
      "index" : "84:草原鸡、草原松鸡、草原鸡",
      "value" : 0.2236328125
    },
    {
      "index" : "9:母鸡",
      "value" : 0.2010498046875
    },
    {
      "index" : "83:皱褶松鸡、鹧鸪、Bonasa umbellus",
      "value" : 0.2010498046875
    },
    {
      "index" : "9:母鸡",
      "value" : 0.2010498046875
    },
    {
      "index" : "83:皱褶松鸡、鹧鸪、Bonasa umbellus",
      "value" : 0.2010498046875
    },
    {
      "index" : "139:大鸨",
      "value" : 0.10638427734375
    },
    {
      "index" : "87:鹧鸪",
      "value" : 0.0626220703125
    }
  ]
}

YiCLOUD IoT 平台输出

还可以通过脚本或插件方式,将结果输出到 IoT 平台,这里以 YiCLOUD 平台为例。

2023-02-17T14:18:55.png