SignalR流 是SignalR库的最新补充,它支持在数据可用时立即向客户端发送碎片,而不是等待所有数据可用。在这篇文章中,我们将建立一个用于婴儿监控的小应用程序,使用SignalR流从 Raspberry Pi 上流传摄像机内容。这个工具也会在使用 认知视觉服务 检测到婴儿哭声时向连接的客户端发送通知。
该工具由以下模块组成。
PiMonitRHub是一个流媒体中心,持有流媒体方法 s tartstream 和 s topstream 。当SignalR客户端调用 startstream
方法时,它会调用相机服务来捕获照片,并通过写入 ChannelWriter
来将其发送到客户端。每当一个对象被写入 ChannelWriter,
中,该对象就会立即被发送到客户端。最后, ChannelWriter
完成,告诉客户端流被 writer.TryComplete
方法关闭。
public class PiMonitRHub : Hub
{
internal static bool _isStreamRunning = false;
private readonly PiCameraService _piCameraService;
public PiMonitRHub(PiCameraService piCameraService)
{
_piCameraService = piCameraService;
}
public ChannelReader<object> StartStream(CancellationToken cancellationToken)
{
var channel = Channel.CreateUnbounded<object>();
_isStreamRunning = true;
_ = WriteItemsAsync(channel.Writer, cancellationToken);
return channel.Reader;
}
private async Task WriteItemsAsync(ChannelWriter<object> writer, CancellationToken cancellationToken)
{
try
{
while (_isStreamRunning)
{
cancellationToken.ThrowIfCancellationRequested();
await writer.WriteAsync(await _piCameraService.CapturePictureAsByteArray());
await Task.Delay(100, cancellationToken);
}
}
catch (Exception ex)
{
writer.TryComplete(ex);
}
writer.TryComplete();
}
public void StopStream()
{
_isStreamRunning = false;
Clients.All.SendAsync("StopStream");
}
}
PiMonitRWorker是一个继承自后台服务的工人服务。每当一个应用程序被启动时,它就会启动新的线程,并在频繁的间隔时间内执行 ExecuteAsync
方法内的逻辑,直到 cancellationtoken
被请求。
internal class PiMonitRWorker : BackgroundService
{
private readonly IHubContext<PiMonitRHub> _piMonitRHub;
private readonly PiCameraService _piCameraService;
private readonly FaceClientCognitiveService _faceClientCognitiveService;
public PiMonitRWorker(IHubContext<PiMonitRHub> piMonitRHub,
PiCameraService piCameraService, FaceClientCognitiveService faceClientCognitiveService)
{
_piMonitRHub = piMonitRHub;
_piCameraService = piCameraService;
_faceClientCognitiveService = faceClientCognitiveService;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
if (!PiMonitRHub._isStreamRunning)
{
var stream = await _piCameraService.CapturePictureAsStream();
if (await _faceClientCognitiveService.IsCryingDetected(stream))
{
await _piMonitRHub.Clients.All.SendAsync("ReceiveNotification", "Baby Crying Detected! You want to start streaming?");
}
}
//Run the background service for every 10 seconds
await Task.Delay(10000);
}
}
}
在这个工作者服务中,它使用相机服务捕捉照片并将其发送到 认知服务API 来检测婴儿的哭声。如果检测到婴儿哭声,通知中心方法将向所有连接的客户端广播通知信息。如果客户端已经在观看流媒体,这个后台服务将不会检测到婴儿哭声,直到用户停止观看流媒体,以避免对用户的重复通知。
微软认知服务API是一个非常强大的API,在几行代码中就能提供人工智能的力量。有各种认知服务API可用。在这个应用程序中,我将使用认知视觉API来检测脸部情绪,看看婴儿是否在哭。这个API将分析给定的照片,以检测、识别人脸并分析情绪脸的属性,如微笑、悲伤等。最重要的是,这项服务有一个免费层级,每分钟可以打20个电话,所以我们可以不用支付任何费用就可以开始。
在Azure门户中注册认知服务后,你将从门户中获得API端点和密钥。
你可以把 Keys
和 EndPointURL
存储到UserSecrets/AppSettings/Azure Key Vault中,这样我们就可以从配置API中访问它。
public class FaceClientCognitiveService
{
private readonly IFaceClient faceClient;
private readonly float scoreLimit = 0.5f;
private readonly ILogger<FaceClientCognitiveService> _logger;
public FaceClientCognitiveService(IConfiguration config, ILogger<FaceClientCognitiveService> logger)
{
_logger = logger;
faceClient = new FaceClient(new ApiKeyServiceClientCredentials(config["SubscriptionKey"]),
new System.Net.Http.DelegatingHandler[] { });
faceClient.Endpoint = config["FaceEndPointURL"];
}
public async Task<bool> IsCryingDetected(Stream stream)
{
IList<FaceAttributeType> faceAttributes = new FaceAttributeType[]
{
FaceAttributeType.Emotion
};
// Call the Face API.
try
{
IList<DetectedFace> faceList = await faceClient.Face.DetectWithStreamAsync(stream, false, false, faceAttributes);
if (faceList.Count > 0)
{
var face = faceList[0];
if (face.FaceAttributes.Emotion.Sadness >= scoreLimit ||
face.FaceAttributes.Emotion.Anger >= scoreLimit ||
face.FaceAttributes.Emotion.Fear >= scoreLimit)
{
_logger.LogInformation($"Crying Detected with the score of {face.FaceAttributes.Emotion.Sadness}");
return true;
}
else
{
_logger.LogInformation($"Crying Not Detected with the score of {face.FaceAttributes.Emotion.Sadness}");
}
}
else
{
_logger.LogInformation("No Face Detected");
}
}
catch (Exception e)
{
_logger.LogError(e.Message);
}
return false;
}
}
FaceClient
。 我的Raspberry Pi运行的是Raspian操作系统,它是基于Linux ARM架构的。相机模块有一个内置的命令行工具,叫做 raspistill
来拍摄图片。然而,我想使用一些C#封装库来从Pi上捕捉图片,并发现了这个奇妙的开源项目,叫做 MMALSharp ,它是Raspberry Pi摄像头的非官方C# API,它支持Mono 4.x和.NET Standard 2.0。
我安装了MMALSharp的nuget包,并在构造函数中启动了单子对象,以便在连续拍摄图片的时候可以重复使用。我还将图片的分辨率设置为640*480,因为默认的分辨率非常高,文件大小也很大。
public class PiCameraService
{
public MMALCamera MMALCamera;
private readonly string picStoragePath = "/home/pi/images/";
private readonly string picExtension = "jpg";
public PiCameraService()
{
MMALCamera = MMALCamera.Instance;
//Setting the Average resolution for reducing the file size
MMALCameraConfig.StillResolution = new Resolution(640, 480);
}
public async Task<byte[]> CapturePictureAsByteArray()
{
var fileName = await CapturePictureAndGetFileName();
string filePath = Path.Join(picStoragePath, $"{fileName}.{picExtension}");
byte[] resultData = await File.ReadAllBytesAsync(filePath);
//Delete the captured picture from PI storage
File.Delete(filePath);
return resultData;
}
public async Task<Stream> CapturePictureAsStream()
{
return new MemoryStream(await CapturePictureAsByteArray());
}
private async Task<string> CapturePictureAndGetFileName()
{
string fileName = null;
using (var imgCaptureHandler = new ImageStreamCaptureHandler(picStoragePath, picExtension))
{
await MMALCamera.TakePicture(imgCaptureHandler, MMALEncoding.JPEG, MMALEncoding.I420);
fileName = imgCaptureHandler.GetFilename();
}
return fileName;
}
}
现在我们已经完成了服务器端应用程序的编码,我们的下一步是将其部署到Raspberry Pi。为了将应用程序发布到PI中,有两种不同的方法可以发布它。
我使用了自包含部署,这样所有的依赖都是部署的一部分。下面的发布命令将生成带有所有依赖项的最终输出。
dotnet publish -r linux-arm
你会在bin文件夹下的linux-arm/publish文件夹中找到最终的输出。我使用网络文件共享来复制文件到Raspberry Pi中。
在所有文件复制完毕后,我通过远程连接连接我的Raspberry Pi,并在终端使用以下命令运行该应用程序。
我决定使用chrome extension作为我的SignalR客户端,因为它支持实时通知,而且,它不需要任何服务器来承载这个应用程序。在这个客户端应用程序中,我有一个后台脚本,它将初始化SignalR与集线器的连接,并在后台运行以接收来自集线器的任何通知。它也有一个弹出窗口,其中有一个开始和停止流媒体的按钮来调用流媒体和查看流媒体输出。
manifest.json将定义此扩展所需的背景脚本、图标和权限。
{
"name": "Pi MonitR Client",
"version": "1.0",
"description": "Real time Streaming from Raspnerry PI using SignalR",
"browser_action": {
"default_popup": "popup.html",
"default_icon": {
"16": "images/16.png",
"32": "images/32.png",
"48": "images/48.png",
"128": "images/128.png"
}
},
"icons": {
"16": "images/16.png",
"32": "images/32.png",
"48": "images/48.png",
"128": "images/128.png"
},
"permissions": [
"tabs",
"notifications",
"http://*/*"
],
"background": {
"persistent": true,
"scripts": [
"signalr.js","background.js"
]
},
"manifest_version": 2,
"web_accessible_resources": [
"images/*.png"
]
}
// The following sample code uses modern ECMAScript 6 features
// that aren't supported in Internet Explorer 11.
// To convert the sample for environments that do not support ECMAScript 6,
// such as Internet Explorer 11, use a transpiler such as
// Babel at http://babeljs.io/.
var __awaiter = (this && this.__awaiter) || function (thisArg, _arguments, P, generator) {
return new (P || (P = Promise))(function (resolve, reject) {
function fulfilled(value) { try { step(generator.next(value)); } catch (e) { reject(e); } }
function rejected(value) { try { step(generator["throw"](value)); } catch (e) { reject(e); } }
function step(result) { result.done ? resolve(result.value) : new P(function (resolve) { resolve(result.value); }).then(fulfilled, rejected); }
step((generator = generator.apply(thisArg, _arguments || [])).next());
});
};
const hubUrl = "http://pi:5000/hubs/piMonitR"
var connection = new signalR.HubConnectionBuilder()
.withUrl(hubUrl, { logger: signalR.LogLevel.Information })
.build();
// We need an async function in order to use await, but we want this code to run immediately,
// so we use an "immediately-executed async function"
(() => __awaiter(this, void 0, void 0, function* () {
try {
yield connection.start();
}
catch (e) {
console.error(e.toString());
}
}))();
connection.on("ReceiveNotification", (message) => {
new Notification(message, {
icon: '48.png',
body: message
});
});
chrome.runtime.onConnect.addListener(function (externalPort) {
externalPort.onDisconnect.addListener(function () {
connection.invoke("StopStream").catch(err => console.error(err.toString()));
});
});
background.js将启动SignalR与集线器和定义的URL连接。我们还需要signalr.js在同一个文件夹中。为了得到signalr.js文件,我们需要安装signalr npm包,并从 node_modules\@aspnet\signalr\dist\browser 文件夹中复制signalr.js。
npm install @aspnet/signalr
这个后台脚本将保持我们的signalR客户端处于活动状态,当它收到来自hub的通知时,它将显示为chrome通知,如下所示。
<!doctype html>
<html>
<head>
<title>Pi MonitR Dashboard</title>
<script src="popup.js" type="text/javascript"></script>
</head>
<body>
<h1>Pi MonitR - Stream Dashboard</h1>
<div>
<input type="button" id="streamStartButton" value="Start Streaming" />
<input type="button" id="streamStopButton" value="Stop Streaming" disabled />
</div>
<ul id="logContent"></ul>
<img id="streamContent" width="700" height="400" src="" />
</body>
</html>
当开始流媒体按钮被点击时,弹出的HTML将显示流媒体内容。当停止流媒体按钮被点击时,它将完成流媒体。
var __awaiter = chrome.extension.getBackgroundPage().__awaiter;
var connection = chrome.extension.getBackgroundPage().connection;
document.addEventListener(‘DOMContentLoaded’, function () {
const streamStartButton = document.getElementById(‘streamStartButton’);
const streamStopButton = document.getElementById(‘streamStopButton’);
const streamContent = document.getElementById(‘streamContent’);
const logContent = document.getElementById(‘logContent’);
streamStartButton.addEventListener(“click”, (event) => __awaiter(this, void 0, void 0, function* () {
streamStartButton.setAttribute(“disabled”, “disabled”);
streamStopButton.removeAttribute(“disabled”);
try {
connection.stream(“StartStream”)
.subscribe({
next: (item) => {
streamContent.src = “data:image/jpg;base64,” + item;
},
complete: () => {
var li = document.createElement(“li”);
li.textContent = “Stream completed”;
logContent.appendChild(li);
},
error: (err) => {
var li = document.createElement(“li”);
li.textContent = err;
logContent.appendChild(li);
},
});
}
catch (e) {
console.error(e.toString());
}
event.preventDefault();
}));
streamStopButton.addEventListener(“click”, function () {
streamStopButton.setAttribute(“disabled”, “disabled”);
streamStartButton.removeAttribute(“disabled”);
connection.invoke(“StopStream”).catch(err => console.error(err.toString()));
event.preventDefault();
});
connection.on(“StopStream”, () => {
var li = document.createElement(“li”);
li.textContent = “stream closed”;
logContent.appendChild(li);
streamStopButton.setAttribute(“disabled”, “disabled”);
streamStartButton.removeAttribute(“disabled”);
});
});
当用户点击开始流媒体按钮时,它将调用流媒体中心方法( StartStream
)并订阅它。每当集线器发送数据时,它就会接收内容,并将该值直接设置为图片的src属性。
streamContent.src = "data:image/jpg;base64," + item;
当用户点击停止流媒体按钮时,客户端调用 StopStream
hub方法,将 _isStreamRunning
属性设置为false,这将完成流媒体。
这是一个有趣的项目。我想试验一下SignalR流,结果如我所料。很快,我们将有更多的新东西出现在SignalR中( IAsyncEnumerable
),这将使许多其他实时场景变得更好。我已经在我的 GitHub 仓库中上传了源代码。
编码愉快!