OPC UA 和 open62541 库简介
- 作者
- 赵曦
- 日期
- 2024/12/31
- 版本
- 2.1
上一篇教程:串口通信模块 ↑
下一篇教程:相机设备 ↓
相关模块: OPC UA 模块
1. 简介
1.1 OPC UA 是什么
OPC UA(全称为 Open Platform Communications Unified Architecture)是一种用于工业和物联网(IoT)应用的开放通信协议和架构。它提供了一种统一的框架,用于在不同设备和系统之间实现数据传输、通信和集成。
OPC UA 的设计目标是建立一种通用的、独立于厂商和平台的通信标准,以实现互操作性和集成性。它提供了一套标准的服务和功能,使不同类型的设备和系统能够相互通信和交换数据,其特点包括:
特点 | 介绍 |
兼容性 | OPC UA 不依赖于特定的硬件、操作系统或网络协议,可以在不同的平台上运行,并与其他通信标准集成 |
安全性 | OPC UA 提供了强大的安全机制,包括身份验证、加密和访问控制,以确保数据和通信的机密性和完整性 |
可扩展 | OPC UA 支持灵活的数据模型和信息建模,可以适应不同应用领域和需求的变化 |
信息建模 | OPC UA 使用统一的信息模型,将数据和功能以标准化的方式表示和描述,使不同系统之间的数据交换更加简化和一致 |
可靠性 | OPC UA 提供了可靠的通信机制,包括消息确认、重试和错误处理,以确保数据的可靠传输 |
1.2 地址空间
在 OPC UA 中,所有的数据都被组织成一个地址空间,地址空间中的每一个元素都被称为一个节点。每个节点都有一个唯一的节点号,在 OPC UA 模块 中表示为 rm::NodeId 。
对象类型节点 rm::ObjectType
提供对象的定义,即对象的抽象,与类相当,且子类可以继承父类的特征,方便模型的扩充。该节点包括对象的各种数据类型,数据的语义,以及控制方式。OPC UA 命名空间 0
中规定了多个基础的对象类型节点。如使用最广的 BaseObjectType(在 RMVL 中表示为 rm::nodeBaseObjectType
),所有对象类型节点都需要继承该节点再进行扩充。在对具体设备建模的过程中,应该将设备组成的各部分分解为不同的对象分别建模,再用引用节点将各部分按照实际设备中的关系相关联,从而得到完整设备的对象类型节点。
对象节点 rm::Object
将对象类型实例化即可得到对象节点,该节点是设备在数字空间的映射。所有对设备数据的访问都能在该模型中访问到对应的数据节点。所有对 设备的控制都转换为方法节点的触发。设备产生的消息在节点对象中将触发对应的事件。
引用类型节点 ReferenceType
引用类型描述了引用的语义,而引用用于定义引用两端的节点之间的关系。最常用的引用类型如 Organizes(在 RMVL 中表示为 rm::nodeOrganizes
),表示节点之间的层级关系,如同文件夹与文件夹内的文件,数据层级复杂的设备,需要通过多种引用类型对设备信息节点之间的关系进行描述。
数据类型节点 rm::DataType
数据类型节点描述了变量节点中变量的数据类型。在 OPC UA 信息模型在命名空间 0
中定义了多种内置的数据类型,包括整型、浮点型、 字符串等多个类型,能对变量的数据进行准确的描述。也可以自定义数据类型,比如描述二维坐标的 2DPoint
等类型,获得更符合数据本身的描述。
- 注解
- 注意:此类节点并不能提供具体的数据构成,只是提供了数据类型的一个描述,因此 RMVL 中的 OPC UA 模块 仅提供内置数据类型。若计划提供数据的构成,比如包含的数据长度等信息,请使用变量类型节点 rm::VariableType 。
变量类型节点 rm::VariableType
该节点提供了对变量节点的定义,是设备中各种数据的抽象。常用引用中的 HasTypeDefinition 引用节点连接数据类型节点,对数据类型进行描述(在 RMVL 中表示为 rm::nodeHasTypeDefinition
)。用 HasProperty 引用节点对数据的语义进行描述(在 RMVL 中表示为 rm::nodeHasProperty
)。也可以使用自定义的数据类型节点对变量的数据进行描述,具有灵活性。
变量节点 rm::Variable 及 rm::DataSourceVariable
该节点是变量类型节点的实例,也是使用的最多的节点。客户端访问设备数据有以下 3 种方式。
访问方式 | 介绍 | 备注 |
直接读写 | 将设备多模态数据写入对应的变量节点,然后客户端读取对应节点内保存的数值 | 如果客户端要获取设备最新的值,需要一直手动去触发对设备数据源的读取请求 |
值回调 | 客户端发起 IO 请求后,服务器在 读取前 和 写入后 分别调用对应的回调函数 | 可以利用此功能在需要访问数据的时候才让服务器更新数据 |
数据源变量节点 | 客户端的读取请求直接重定向到设备的数据源中,即客户端直接从数据源获取数据,变量节点不存储数据 | 缩减了数据先写入变量节点再进行读取的过程,但多个客户端连接访问同一数据时会增大服务器与设备之间的传输负载 |
- 注解
- 前两种访问方式在 OPC UA 模块 中通过 rm::Variable 实现,第三种数据源变量节点在 OPC UA 模块 中通过 rm::DataSourceVariable 实现。
方法节点 rm::Method
方法节点是对设备控制方法在数字模型中的映射。方法节点可以通过服务器或客户端进行调用,然后将会对设备的控制器发送指令,使得设备执行对应的操作。常见的方法节点有:触发视觉采集、电机反转、设备初始化等。
视图节点 rm::View
视图节点可将地址空间中感兴趣的节点提取出来,作为一个子集,视图节点作为该子集的入口,方便客户端浏览。
2. 服务器/客户端
基于服务器/客户端的方式是 OPC UA 最基本的一种通信方式,上文的地址空间在服务器/客户端通信的过程中完全展现出来。下面列举一些 opcua 模块中常用的服务器与客户端通信的内容。
2.1 初始化
服务器
C++
方案一
使用 spin
作为事件循环,并提供全局变量 p_server
用于在信号处理函数中关闭服务器。
#include <csignal>
void onStop(int)
{
if (p_server)
}
int main()
{
signal(SIGINT, onStop);
p_server = &srv;
srv.spin();
}
OPC UA 服务器
定义 server.hpp:128
void shutdown()
停止服务器
定义 server.hpp:186
方案二
使用多线程,主线程负责关闭服务器。
#include <thread>
int main()
{
srv.shutdown();
t.join();
}
方案三
使用 spinOnce
执行单次事件循环,需要自定义主循环,在某个条件下退出循环,并关闭服务器(此时可不必显式调用 shutdown
)
#include <csignal>
bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
while (!stop)
{
srv.spinOnce();
}
srv.shutdown();
}
Python
Python 的信号处理机制依赖于 Python 解释器的 GIL,因此在 Python 中无法通过常规方式结束 spin
(除非直接结束进程),但是可以使用 spinOnce
执行单次事件循环,因此需要自定义主循环。
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
while not stop:
srv.spinOnce()
客户端
C++
#include <rmvl/opcua/client>
int main()
{
}
OPC UA 客户端
定义 client.hpp:118
Python
2.2 变量
在上文 1.2 地址空间 中介绍了变量的 3 种访问方式,这里使用最简单的直接读写的方式。首先在服务器中添加变量节点,后文均采用 while + spinOnce
的方式处理异步事件。
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
srv.addVariableNode(num);
while (!stop)
{
srv.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
OPC UA 变量
定义 variable.hpp:132
std::string browse_name
浏览名称 BrowseName
定义 variable.hpp:255
std::string display_name
展示名称 DisplayName
定义 variable.hpp:264
std::string description
变量的描述
定义 variable.hpp:266
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
"""
RMVL-Python 的 opcua 模块仅支持
- int, float, str, bool
- list[int], list[float]
类型的变量
"""
num.browse_name = "number"
num.display_name = "Number"
num.description = "数字"
svr.addVariableNode(num)
while not stop:
svr.spinOnce()
然后在客户端中直接读取变量节点。
C++
int main()
{
{
ERROR_(
"Failed to read the variable.");
return 0;
}
}
static Tp cast(const rm::Variable &val)
将变量节点转化为指定类型的数据
定义 variable.hpp:200
constexpr bool empty() const
判断变量节点是否为空
定义 variable.hpp:190
#define ERROR_(...)
定义 util.hpp:50
constexpr NodeId nodeObjectsFolder
对象节点:ObjectsFolder 节点 ID
定义 utilities.hpp:174
Python
import rm
node = cli.find("number")
target = cli.read(node)
if target.empty():
print("Failed to read the variable.")
exit(0)
print(target.double())
2.3 方法
在服务器中添加两数之和的方法节点,供客户端调用。
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
int num1 = iargs[0], num2 = iargs[1];
return std::make_pair(true, oargs);
};
srv.addMethodNode(method);
while (!stop)
{
srv.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
OPC UA 方法
定义 method.hpp:48
std::vector< Argument > iargs
传入参数列表
定义 method.hpp:81
std::string description
方法的描述
定义 method.hpp:78
std::string browse_name
浏览名称 BrowseName
定义 method.hpp:66
std::string display_name
展示名称 DisplayName
定义 method.hpp:75
std::vector< Argument > oargs
传出参数列表
定义 method.hpp:84
OPC UA 服务器视图
定义 server.hpp:31
constexpr DataType tpInt32
数据类型:Int32
定义 utilities.hpp:148
std::vector< Variable > Variables
变量列表别名
定义 variable.hpp:302
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
def add(sv, iargs):
num1, num2 = iargs
return True, [oarg]
method.browse_name = "add"
method.display_name = "Add"
method.description = "两数之和"
iarg1.name = "Number 1"
iarg1.type = rm.tp_int
iarg2.name = "Number 2"
iarg2.type = rm.tp_int
method.iargs = [iarg1, iarg2]
oarg.name = "Sum"
oarg.type = rm.tp_int
method.oargs = [oarg]
"""
允许有多个返回值,即 oargs 的长度允许 > 1
"""
svr.addMethodNode(method)
while not stop:
svr.spinOnce()
OPC UA 方法参数信息
定义 method.hpp:29
在客户端调用指定方法。
C++
int main()
{
std::vector<rm::Variable> iargs = {1, 2};
auto [res, oargs] = cli.call("add", iargs);
if (!res)
{
ERROR_(
"Failed to call the method");
return 0;
}
printf("retval = %d\n", oargs.front().cast<int>());
}
Python
import rm
res, oargs = cli.call("add", iargs)
if not res:
print("Failed to call the method")
exit(0)
print(f"retval = {oargs[0].int()}")
2.4 对象
在服务器中添加对象节点:
A
├── B1
│ ├── C1: 3.14
│ └── C2: 666
└── B2
└── C3: "xyz"
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
auto node_a = srv.addObjectNode(a);
srv.addObjectNode(b1, node_a);
srv.addObjectNode(b2, node_a);
while (!stop)
{
srv.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
OPC UA 对象
定义 object.hpp:157
std::string description
对象的描述 - zh-CN
定义 object.hpp:245
std::string display_name
展示名称 DisplayName
定义 object.hpp:243
std::string browse_name
浏览名称 BrowseName
定义 object.hpp:234
void add(const Variable &variable)
添加(额外的)变量节点至 rm::Object 对象中
定义 object.hpp:178
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
a.browse_name = a.description = a.display_name = "A"
node_a = svr.addObjectNode(a)
b1.browse_name = b1.description = b1.display_name = "B1"
c1.browse_name = c1.description = c1.display_name = "C1"
b1.add(c1)
c2.browse_name = c2.description = c2.display_name = "C2"
b1.add(c2)
svr.addObjectNode(b1, node_a)
b2.browse_name = b2.description = b2.display_name = "B2"
c3.browse_name = c3.description = c3.display_name = "C3"
b2.add(c3)
svr.addObjectNode(b2, node_a)
while not stop:
svr.spinOnce()
在客户端寻找 C2
和 C3
并打印。
C++
#include <iostream>
int main()
{
std::cout << c2.
cast<
int>() << std::endl;
auto node_c3 = cli.find("A/B2/C3");
std::cout << c3.
cast<std::string>() << std::endl;
}
Python
import rm
node_c2 = cli.find("A/B1/C2")
c2 = cli.read(node_c2)
print(c2.int())
node_c3 = cli.find("A/B2/C3")
c3 = cli.read(node_c3)
print(c3.str())
2.5 视图
在 nodeObjectsFolder
中先添加 A/num1
、num2
2 个变量节点,并将 num1
和 num2
加入视图,下面的示例演示在 服务器 中创建并添加视图节点。若要在客户端中进行此操作,创建并添加视图节点的步骤基本一致,这里不做展示。需要注意的是,在客户端中创建并添加视图节点,需要提前在服务器中加入对应的(变量、方法、对象……)节点
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
auto node_a = srv.addObjectNode(a);
auto node_num1 = srv.find("A/num1");
auto node_num2 = srv.addVariableNode(num2);
num_view.
add(node_num1, node_num2);
srv.addViewNode(num_view);
while (!stop)
{
srv.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
void add(NodeId_ &&...nds)
添加节点 ID
定义 view.hpp:42
#define uaCreateVariable(val,...)
创建变量,BrowseName、DisplayName、Description 均为变量类型的名称
定义 variable.hpp:297
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
a.browse_name = a.description = a.display_name = "A"
num1.browse_name = num1.description = num1.display_name = "num1"
a.add(num1)
node_a = svr.addObjectNode(a)
node_num1 = svr.find("A/num1")
num2.browse_name = num2.description = num2.display_name = "num2"
node_num2 = svr.addVariableNode(num2)
num_view.add(node_num1, node_num2)
svr.addViewNode(num_view)
while not stop:
svr.spinOnce()
2.6 监视
OPC UA 支持变量节点和事件的监视,下面分别以变量节点和事件的监视为例。
2.6.1 变量监视
首先在服务器中添加待监视的变量节点
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
srv.addVariableNode(num);
while (!stop)
{
srv.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
num.browse_name = "number"
num.display_name = "Number"
num.description = "数字"
svr.addVariableNode(num)
while not stop:
svr.spinOnce()
在客户端 1 中修改变量节点的数据
C++
#include <thread>
using namespace std::chrono_literals;
int main()
{
auto node = cli.find("number");
for (int i = 0; i < 100; ++i)
{
std::this_thread::sleep_for(1s);
bool success = cli.write(node, i + 200);
if (!success)
ERROR_(
"Failed to write data to the variable.");
}
}
Python
import rm
import time
node = cli.find("number")
for i in range(100):
time.sleep(1)
if not success:
print("Failed to write data to the variable.")
然后,在客户端 2 中监视变量节点
C++
int main()
{
auto node = cli.find("number");
int receive_data = value;
printf("Data (n=number) was changed to: %d\n", receive_data);
};
cli.monitor(node, on_change, 5);
cli.spin();
}
OPC UA 客户端视图
定义 client.hpp:31
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
node = cli.find("number")
def on_change(view, value):
receive_data = value.int()
print(f"Data (n=number) was changed to: {receive_data}")
cli.monitor(node, on_change, 5)
while not stop:
cli.spinOnce()
2.6.2 事件监视
事件需要服务器自主触发,在实际应用中,事件的触发可以是设备状态的变化、设备的报警等,例如客户端通过调用方法节点,服务端修改自身状态机状态,任务执行完毕,状态再次发生改变后将触发事件,可以实现同步非阻塞向同步阻塞的转换。例如服务器用于 异步的控制设备启动、关闭 ,客户端通过调用方法节点控制设备启动。
首先在服务器中添加
- 对应的方法节点用于发出启动、关闭指令;
- 实际发出启动、关闭指令的伪代码。
如下所示。
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
enum class OPCUAState
{
NONE,
START,
STOP,
};
int main()
{
OPCUAState mode{};
msg_type_info.
add(
"Result", 0);
if (mode != OPCUAState::NONE)
return {false, {}};
mode = OPCUAState::START;
return {true, {}};
};
if (mode != OPCUAState::NONE)
return {false, {}};
mode = OPCUAState::STOP;
return {true, {}};
};
signal(SIGINT, [](int) { stop = true; });
srv.addEventTypeNode(msg_type_info);
srv.addMethodNode(start_info);
while (!stop)
{
srv.spinOnce();
if (mode == OPCUAState::START)
{
if (true)
{
msg_info.message = "Msg_Start";
msg_info["Result"] = 0;
srv.triggerEvent(msg_info);
mode = OPCUAState::NONE;
}
}
else if (mode == OPCUAState::STOP)
{
if (true)
{
msg_info.message = "Msg_Stop";
msg_info["Result"] = 0;
srv.triggerEvent(msg_info);
mode = OPCUAState::NONE;
}
}
}
}
OPC UA 事件类型
定义 event.hpp:24
std::string display_name
展示名称 DisplayName
定义 event.hpp:72
std::string browse_name
浏览名称 BrowseName
定义 event.hpp:63
void add(const std::string &browse_name, int prop)
添加非默认属性至事件类型中
定义 event.hpp:35
std::string description
对象类型的描述 - zh-CN
定义 event.hpp:74
static Event makeFrom(const EventType &etype)
从事件类型创建新的事件
定义 event.hpp:93
Python
from signal import signal, SIGINT
from enum import Enum
import rm
stop = False
class OPCUAState(Enum):
NONE = 0
START = 1
STOP = 2
mode = OPCUAState.NONE
msg_type_info.browse_name = "msg_type"
msg_type_info.display_name = "MsgType"
msg_type_info.description = "任务执行完成时触发的事件"
msg_type_info.add("Result", 0)
def start_cb(sv, iargs):
global mode
if mode != OPCUAState.NONE:
return False, {}
mode = OPCUAState.START
return True, {}
start_info.browse_name = "start"
start_info.display_name = "Start"
start_info.description = "启动设备"
def stop_cb(sv, iargs):
global mode
if mode != OPCUAState.NONE:
return False, {}
mode = OPCUAState.STOP
return True, {}
stop_info.browse_name = "stop"
stop_info.display_name = "Stop"
stop_info.description = "关闭设备"
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
svr.addEventTypeNode(msg_type_info)
svr.addMethodNode(start_info)
while not stop:
svr.spinOnce()
if mode == OPCUAState.START:
"""
code
"""
if True:
msg_info.message = "Msg_Start"
msg_info["Result"] = 0
svr.triggerEvent(msg_info)
mode = OPCUAState.NONE
elif mode == OPCUAState.STOP:
"""
code
"""
if True:
msg_info.message = "Msg_Stop"
msg_info["Result"] = 0
svr.triggerEvent(msg_info)
mode = OPCUAState.NONE
正常情况下,客户端调用方法节点会立刻返回,如以下代码
C++
int main()
{
auto node = cli.find("start");
auto [res, oargs] = cli.call(node, {});
if (!res)
ERROR_(
"Failed to call the method");
}
Python
import rm
node = cli.find("start")
res, oargs = cli.call(node, [])
if not res:
print("Failed to call the method")
此时返回的结果表示该方法节点是否调用成功,并非任务执行结果,而真实的执行结果在多个事件循环后才能得到。不过服务器在任务执行完成后会触发事件,客户端可以通过监视事件来获取任务执行结果,如以下代码。
C++
class OpcUaController
{
public:
OpcUaController(std::string_view addr) : _cli(addr) {
if (vals[0] == "Msg_Start")
_start_res = (vals[1] == 0);
else if (vals[0] == "Msg_Stop")
_stop_res = (vals[1] == 0);
});
}
bool start()
{
auto [res, oargs] = _cli.call("Start", {});
if (!res)
{
printf("Failed to call start\n");
return false;
}
while (!_start_res.has_value())
_cli.spinOnce();
return _start_res.value();
}
bool stop()
{
auto [res, oargs] = _cli.call("Stop", {});
if (!res)
{
printf("Failed to call stop\n");
return false;
}
while (!_stop_res.has_value())
_cli.spinOnce();
return _stop_res.value();
}
private:
rm::Client _cli;
std::optional<bool> _start_res{};
std::optional<bool> _stop_res{};
};
int main()
{
OpcUaController uactl("opc.tcp://127.0.0.1:4840");
bool val = uactl.start();
printf("Start result: %d\n", val);
val = uactl.stop();
printf("Stop result: %d\n", val);
}
Python
import rm
class OpcUaController:
def __init__(self, addr):
self.__start_res = None
self.__stop_res = None
self.__cli.monitor(["Message", "Result"], self.on_event)
def on_event(self, view, vals):
if vals[0] == "Msg_Start":
self.__start_res = vals[1] == 0
elif vals[0] == "Msg_Stop":
self.__stop_res = vals[1] == 0
def start(self):
res, oargs = self.__cli.call("Start", [])
if not res:
print("Failed to call start")
return False
while self.__start_res is None:
self.__cli.spinOnce()
return self.__start_res
def stop(self):
res, oargs = self.__cli.call("Stop", [])
if not res:
print("Failed to call stop")
return False
while self.__stop_res is None:
self.__cli.spinOnce()
return self.__stop_res
uactl = OpcUaController("opc.tcp://127.0.0.1:4840")
val = uactl.start()
print(f"Start result: {val}")
"""
code
"""
val = uactl.stop()
print(f"Stop result: {val}")
2.7 定时
OPC UA 模块 为服务器和客户端均提供了循环定时器,用于周期性执行任务。下面的示例演示在 服务器 中创建并添加定时器。
C++
#include <csignal>
bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
int times{};
printf("Timer callback, times = %d\n", times);
times++;
});
while (!stop)
srv.spinOnce();
}
OPC UA 服务器定时器
定义 server.hpp:349
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
times = 0
def timer_callback(view):
global times
print(f"Timer callback, times = {times}")
times += 1
while not stop:
svr.spinOnce()
3. 发布/订阅
这是一段来自 open62541 手册中有关 PubSub 的介绍。
在 PubSub 中,参与的 OPC UA 应用程序扮演发布者和订阅者的角色。发布者是数据的来源,而订阅者则使用该数据。PubSub 中的通信是基于消息的。发布者将消息发送到面向消息的中间件,而不知道可能有哪些订阅者(如果有)。同样,订阅者表达对特定类型数据的兴趣,并处理包含此数据的消息,而不知道有哪些发布者。
面向消息的中间件是支持在分布式系统之间发送和接收消息的软件或硬件基础设施。OPC UA PubSub 支持两种不同的面向消息的中间件变体,即 无代理形式 和 基于代理的形式 。在无代理形式中,面向消息的中间件是能够路由基于数据报的消息的网络基础设施。订阅者和发布者使用 UDP 等数据报协议。在基于代理的形式中,消息中间件的核心组件是消息代理。订阅者和发布者使用 AMQP 或 MQTT 等标准消息传递协议与代理进行通信。
这使得 PubSub 适合需要位置独立性和(或)可扩展性的应用程序。
OPC UA 的发布/订阅(PubSub)扩展可实现快速高效的通信。PubSub 扩展与协议无关,可与基于代理的协议(如 MQTT 和 AMQP)或无代理实现(如 UDP 多播)一起使用。
PubSub 的配置模型使用以下组件
typedef enum {
UA_PUBSUB_COMPONENT_CONNECTION,
UA_PUBSUB_COMPONENT_WRITERGROUP,
UA_PUBSUB_COMPONENT_DATASETWRITER,
UA_PUBSUB_COMPONENT_READERGROUP,
UA_PUBSUB_COMPONENT_DATASETREADER
} UA_PubSubComponentEnumType;
open62541 PubSub API 使用以下方案
- 为所需的 PubSub 元素创建配置
- 调用
add[element]
函数并传入配置
add[element]
函数返回内部创建的元素的唯一 UA_NodeId
有关 API 使用的更多详细信息,请查看 PubSub 教程。
3.1 无代理 Pub/Sub
RMVL 提供了基于 UDP
传输协议的 Broker-less 即无代理的发布订阅机制,目前支持 UADP
的消息映射方式,对应的枚举类型是 TransportID::UDP_UADP
。
需要留意的是,OPC UA 的发布订阅模型仍然是建立在 2. 服务器/客户端 模型之上的,此外 OPC UA 模块 的 PubSub 在实现上是继承于 rm::Server 的,因此,RMVL 的发布订阅模型在使用时具备服务器的所有功能,初始化、释放资源等操作与服务器完全一致。
创建发布者
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
auto num_node = pub.addVariableNode(num);
std::vector<rm::PublishedDataSet> pds_list;
pds_list.emplace_back("Number 1", num_node);
pub.publish(pds_list, 50);
while (!stop)
{
pub.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
OPC UA 发布者
定义 publisher.hpp:37
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
pub =
rm.Publisher(rm.TransportID.UDP_UADP,
"DemoNumberPub",
"opc.udp://224.0.0.22:4840")
num.browse_name = "number"
num.display_name = "Number"
num.description = "数字"
num_node = pub.addVariableNode(num)
pub.publish(pds_list, 50)
while not stop:
pub.spinOnce()
待发布的数据集 (PDS)
定义 publisher.hpp:30
创建订阅者
C++
#include <csignal>
#include <thread>
using namespace std::chrono_literals;
static bool stop = false;
int main()
{
signal(SIGINT, [](int) { stop = true; });
auto nodes = sub.subscribe("DemoNumberPub", {meta_data});
while (!stop)
{
auto sub_val = sub.read(nodes.front());
std::printf("Sub value [1] = %f\n", sub_val.cast<double>());
sub.spinOnce();
std::this_thread::sleep_for(10ms);
}
}
OPC UA 订阅者
定义 subscriber.hpp:60
constexpr DataType tpDouble
数据类型:Double
定义 utilities.hpp:153
Python
from signal import signal, SIGINT
import rm
stop = False
def onStop(sig, frame):
global stop
stop = True
signal(SIGINT, onStop)
sub =
rm.Subscriber(rm.TransportID.UDP_UADP,
"DemoNumberSub",
"opc.udp://224.0.0.22:4840", 4841)
"""
也可以通过创建变量对 meta_data 进行初始化,例如以下代码
num = rm.Variable(1.0) # 这个 1.0 只是代表是个 Double 类型的数据
num.browse_name = "Number 1"
meta_data = rm.FieldMetaData.makeFrom(num)
"""
nodes = sub.subscribe("DemoNumberPub", [meta_data])
while not stop:
sub_val = sub.read(nodes[0])
print(f"Sub value [1] = {sub_val.float()}")
sub.spinOnce()
3.2 有代理 Pub/Sub
- 警告
- RMVL 目前暂不支持有代理的发布订阅机制。
4. 使用技巧
以下是 OPC UA 模块 的使用技巧。
4.1 参数加载
OPC UA 模块 中提供了以下几个运行时可调节参数
类型 | 参数名 | 默认值 | 注释 |
bool | SERVER_WAIT | false | 单次处理网络事件时,允许服务器等待最多 50ms |
uint32_t | CONNECT_TIMEOUT | 30000 | 请求连接时,判定为超时的时间,单位 (ms) |
uint32_t | CLIENT_WAIT_TIMEOUT | 10 | 服务器超时响应的时间,单位 (ms) |
double | SAMPLING_INTERVAL | 2 | 服务器监视变量的采样速度,单位 (ms) |
double | PUBLISHING_INTERVAL | 2 | 服务器尝试发布数据变更的期望时间间隔,若数据未变更则不会发布,单位 (ms) |
uint32_t | LIFETIME_COUNT | 100 | 在没有发布任何消息的情况下,订阅请求所期望的能够保持活动状态的最大发布周期数 |
uint32_t | MAX_KEEPALIVE_COUNT | 50 | 在没有任何通知的情况下,订阅请求所期望的服务器应该发送的最大 “保活” 消息数 |
uint32_t | MAX_NOTIFICATIONS | 100 | 服务器应该发送的期望的最大通知数(通知是服务器向客户端报告订阅的变化的方式) |
uint8_t | PRIORITY | 0 | 订阅请求的优先级 |
具体调节方式可参考引言中的 参数管理 部分。
4.2 从 XML 配置 OPC UA
4.2.1 安装 UaModeler
可使用 UaModeler 等软件进行可视化信息模型的建立,构建后可以导出为一个 *.xml
文件,首先先安装 UaModeler。
Windows EXE
- Windows 下可点击此处安装官方版本的 UaModeler 软件。
Python
- 如果有 Python 环境,也可以使用开源的 UaModeler 库,功能与官方软件基本一致。使用之前需要安装
pip3
Python 包管理工具,安装好包管理工具后,可使用以下命令行安装 UaModeler pip3 install opcua-modeler
# Linux 下可以执行以下命令行运行 UaModeler
opcua-modeler
具体安装细节可参考 opcua-modeler on Github 的 README。
4.2.2 可视化配置 OPC UA
对于项目创建或导出等内容,此处不做过多介绍,可参考此博客了解上述内容。
- 注解
- 一般的,定义对象、变量、方法等内容均按照在代码中的顺序进行定义即可,但需要注意的是,添加了方法节点后,还需要在代码中设置该方法节点执行的回调函数,可参见
rm::Server::setMethodNodeCallBack
。
NamespaceArray
的 [1]
的字符串需要更改为 urn:open62541.server.application
4.2.3 生成 *.c/*.h 文件
- 注解
- 以下生成 C/C++ 文件的介绍来自 open62541 nodeset-compiler。
进入 <path-to-open62541>/tools/nodeset_compiler
文件夹,执行以下命令行
# 获取 Opc.Ua.NodeSet2.xml 文件
wget https://files.opcfoundation.org/schemas/UA/1.05/Opc.Ua.NodeSet2.xml
# 将刚刚生成的 XML 文件移动至当前文件夹中,并重命名为 xxx.xml
mv <path-to-xml> ./xxx.xml
# 执行 nodeset_compiler
python3 ./nodeset_compiler.py \
--types-array=UA_TYPES \
--existing Opc.Ua.NodeSet2.xml \
--xml xxx.xml \
myNodeSet # myNodeSet 是要生成的文件名,包含 myNodeSet.h 和 myNodeSet.c,请自行设置
4.3 不占有所有权的 C/S 视图
rm::Server
使用 RAII 进行设计,一个对象占有了服务器的所有权和生命周期,当对象析构时,会自动停止并结束服务器。使用 rm::ServerView
来获取不占有所有权的服务器视图,并进行变量读写、路径搜索的操作,下面用服务器视图的单元测试作为示例。
C++
auto num_node = sv.
find(
"num");
int num = sv.
read(num_node).
cast<
int>();
sv.
write(num_node, iargs[0].cast<int>() + num);
};
srv.addMethodNode(method);
OPC UA 节点 ID
定义 utilities.hpp:42
Variable read(const NodeId &nd) const
从指定的变量节点读数据
NodeId find(std::string_view browse_path, const NodeId &src_nd=nodeObjectsFolder) const noexcept
通过 BrowseName 的路径搜索命名空间 ns 为 1 的节点
bool write(const NodeId &nd, const Variable &val) const
给指定的变量节点写数据
Python
def plus(sv, objnd, iargs):
num_node = sv.find("num")
num = sv.read(num_node).int()
iarg = iargs[0].int()
return True, []
method.browse_name = "plus"
method.display_name = "Input + Number"
method.description = "输入值加数"
iarg.name = "input"
iarg.type = rm.tp_int
iarg.dims = 1
iarg.description = "输入值"
method.iargs = [iarg]
srv.addMethodNode(method)
同样的,客户端也可以使用 rm::ClientView
来获取不占有所有权的客户端视图,进行变量读写、路径搜索的操作,此处不再赘述。
5. 参考内容
[6] UaModeler · FreeOpcUa/opcua-modeler · Github