这篇文章里介绍了一些常用的OPCUA库,其中有个库叫opcua-asyncio,使用异步io来提高效率,对OPCUA的实现比较齐全,client和server都可以做。地址是https://github.com/FreeOpcUa/opcua-asyncio

安装opcua-asyncio,如下,

pip install asyncua

本文主要讲述如何使用这个库编写一个简单的Client来和Server进行通信。Server基于open62541-1.2.2版本实现。


一 Server代码

一段经典的server代码,如何编译运行不再赘述,详细可以参考之前的系列文章,

// server.c

/* This work is licensed under a Creative Commons CCZero 1.0 Universal License.
 * See http://creativecommons.org/publicdomain/zero/1.0/ for more information. */

#include "open62541.h"

#include <signal.h>
#include <stdlib.h>

UA_Boolean running = true;

static void stopHandler(int sign) 
{
    UA_LOG_INFO(UA_Log_Stdout, UA_LOGCATEGORY_SERVER, "received ctrl-c");
    running = false;
}

int main(void) 
{
    signal(SIGINT, stopHandler);
    signal(SIGTERM, stopHandler);

    UA_Server *server = UA_Server_new();
    UA_ServerConfig_setDefault(UA_Server_getConfig(server));
    UA_StatusCode retval = UA_Server_run(server, &running);
    
    UA_Server_delete(server);
    
    return retval == UA_STATUSCODE_GOOD ? EXIT_SUCCESS : EXIT_FAILURE;
}

二 Client

opcua-asyncio设计的很好,代码也简洁易懂,因为采用了异步io,很多地方需要写async和await,习惯了就好,

'''
client.py,获取server的系统时间
'''
import asyncio
from asyncua import Client


async def main():
    async with Client(url='opc.tcp://192.168.131.129:4840') as client: # ip地址改成自己的
        node = client.get_node("ns=0;i=2258") # 系统时间的node id
        value = await node.read_value()
        print('server time: {}'.format(value))


if __name__ == '__main__':
    asyncio.run(main())

使用时注意把ip地址改成自己的。无需编译,直接执行即可,

python client.py

最后打印如下,可以看出得到的时间很直观,不像open62541那样还要转换一下,
在这里插入图片描述
PS:这是UTC时间,换算成自己电脑时间需要加上所在时区,也就是加8


三 总结

可以看出opcua-asyncio很简洁,且简单易用,代价就是效率低,和C差距很大,不过作为原型验证则没有问题,或者允许低效的情况下也可以使用。

Logo

GitCode 天启AI是一款由 GitCode 团队打造的智能助手,基于先进的LLM(大语言模型)与多智能体 Agent 技术构建,致力于为用户提供高效、智能、多模态的创作与开发支持。它不仅支持自然语言对话,还具备处理文件、生成 PPT、撰写分析报告、开发 Web 应用等多项能力,真正做到“一句话,让 Al帮你完成复杂任务”。

更多推荐