Programming/파이썬(python) - 라이브러리

파이썬(python)/ pymodbus(Modbus) 모드버스

esoog Polaris 2023. 9. 27. 10:02
반응형

# 설치

pip install pymodbus

 

 

 

 

#  슬레이브 코드(데이터 응답 제공)

 

import asyncio
import logging

from examples import helper
from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (
    ModbusSequentialDataBlock,
    ModbusServerContext,
    ModbusSlaveContext,
    ModbusSparseDataBlock,
)
from pymodbus.device import ModbusDeviceIdentification

from pymodbus.server import (
    StartAsyncSerialServer,
    StartAsyncTcpServer,
    StartAsyncTlsServer,
    StartAsyncUdpServer,
)

logging.basicConfig()
_logger = logging.getLogger(__file__)
_logger.setLevel(logging.INFO)

def setup_server(description=None, context=None, cmdline=None):
    """서버 설정 실행."""
    args = helper.get_commandline(server=True, description=description, cmdline=cmdline)
    if context:
        args.context = context
    if not args.context:
        _logger.info("### 데이터 저장소 생성")
        # 데이터 저장소는 초기화된 주소에만 응답합니다.
        # 데이터 블록을 0x00에서 0xFF로 초기화하면 0x100 요청은
        # 유효하지 않은 주소 예외로 응답합니다.
        # 이는 많은 장치에서 나타나는 행동입니다 (하지만 모두 아닙니다)
        if args.store == "sequential":
            # 연속되는 갭 없는 블록 사용
            datablock = ModbusSequentialDataBlock(0x00, [11] * 100)
        elif args.store == "sparse":
            # 연속되지 않는 데이터 블록 사용(간격 있음)
            datablock = ModbusSparseDataBlock({0x00: 0, 0x05: 1})
        elif args.store == "factory":
            # 대신 데이터 블록을 초기화하는 팩토리 메서드 사용
            # 또는 주소 범위의 0x00에서 0x00으로 초기화되도록 그냥 전달
            datablock = ModbusSequentialDataBlock.create()

        if args.slaves:
            # 서버는 다른 슬레이브 ID에 대해 다른 슬레이브 컨텍스트로 응답할 수 있는
            # 서버 컨텍스트를 활용합니다.
            # 기본적으로 모든 슬레이브 ID에 대해 동일한 컨텍스트를 반환합니다.
            # (브로드캐스트 모드)
            # 그러나 single 플래그를 False로 설정하고 슬레이브 ID에서 컨텍스트 매핑
            # 딕셔너리를 제공하여이를 오버로드 할 수 있습니다.
            #
            # 슬레이브 컨텍스트는 주소(0-7)에 대한 요청이 주소(0-7)에 매핑되도록
            # 초기화될 수도 있으며 기본값은 False입니다.
            # 이는 명세서 4.4의 섹션을 기반으로 하므로 주소(0-7)은 (1-8)에 매핑됩니다.
            context = {
                0x01: ModbusSlaveContext(
                    di=datablock,
                    co=datablock,
                    hr=datablock,
                    ir=datablock,
                ),
                0x02: ModbusSlaveContext(
                    di=datablock,
                    co=datablock,
                    hr=datablock,
                    ir=datablock,
                ),
                0x03: ModbusSlaveContext(
                    di=datablock,
                    co=datablock,
                    hr=datablock,
                    ir=datablock,
                    zero_mode=True,
                ),
            }
            single = False
        else:
            context = ModbusSlaveContext(
                di=datablock, co=datablock, hr=datablock, ir=datablock
            )
            single = True

        # 데이터 저장소 구축
        args.context = ModbusServerContext(slaves=context, single=single)

    # ----------------------------------------------------------------------- #
    # 서버 정보 초기화
    # ----------------------------------------------------------------------- #
    # 이를 설정하지 않거나 어떤 필드도 설정하지 않으면 빈 문자열로 기본 설정됩니다.
    # ----------------------------------------------------------------------- #
    args.identity = ModbusDeviceIdentification(
        info_name={
            "VendorName": "Pymodbus",
            "ProductCode": "PM",
            "VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
            "ProductName": "Pymodbus Server",
            "ModelName": "Pymodbus Server",
            "MajorMinorRevision": pymodbus_version,
        }
    )
    return args

async def run_async_server(args):
    """서버 실행."""
    txt = f"### 비동기 서버 시작, {args.port} 포트에서 대기 중 - {args.comm}"
    _logger.info(txt)
    if args.comm == "tcp":
        address = ("localhost", 502)
        server = await StartAsyncTcpServer(
            context=args.context,  # 데이터 저장소
            identity=args.identity,  # 서버 식별
            address=address,  # 리스닝 주소
            framer=args.framer,  # 사용할 프레이머 전략
        )
    elif args.comm == "udp":
        address = (
            args.host if args.host else "127.0.0.1",
            args.port if args.port else None,
        )
        server = await StartAsyncUdpServer(
            context=args.context,  # 데이터 저장소
            identity=args.identity,  # 서버 식별
            address=address,  # 리스닝 주소
            framer=args.framer,  # 사용할 프레이머 전략
        )
    elif args.comm == "serial":
        server = await StartAsyncSerialServer(
            context=args.context,  # 데이터 저장소
            identity=args.identity,  # 서버 식별
            port=args.port,  # 시리얼 포트
            framer=args.framer,  # 사용할 프레이머 전략
            baudrate=args.baudrate,  # 시리얼 장치의 사용할 보유율
        )
    elif args.comm == "tls":
        address = ("localhost", 503)
        server = await StartAsyncTlsServer(
            context=args.context,  # 데이터 저장소
            host="localhost",  # 연결할 TCP 주소 정의
            identity=args.identity,  # 서버 식별
            address=address,  # 리스닝 주소
            framer=args.framer,  # 사용할 프레이머 전략
            certfile=helper.get_certificate(
                "crt"
            ),  # TLS에 대한 인증서 파일 경로 (sslctx가 None인 경우 사용됨)
            keyfile=helper.get_certificate(
                "key"
            ),  # TLS에 대한 키 파일 경로 (sslctx가 None인 경우 사용됨)
        )
    return server

async def async_helper():
    """설정 및 실행 결합."""
    _logger.info("시작 중...")
    run_args = setup_server(description="비동기 서버 실행.")
    await run_async_server(run_args)

if __name__ == "__main__":
    asyncio.run(async_helper(), debug=True)  # pragma: no cover

* 참조(helper.py 와 예제) : https://github.com/pymodbus-dev/pymodbus/tree/dev/examples

 

 

 

# Master READ 코드

 

import asyncio

from pymodbus import pymodbus_apply_logging_config

# 다양한 클라이언트 구현을 가져옵니다.
from pymodbus.client.tcp import AsyncModbusTcpClient as AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
from pymodbus.transaction import ModbusSocketFramer

async def read_data_periodically(client, interval):
    while True:
        try:
            # 홀딩 레지스터에서 데이터 읽기 (필요에 따라 매개변수를 조정할 수 있음)
            response = await client.read_holding_registers(address=0, count=10, unit=1)
            if not response.isError():
                print("데이터 읽기:", response.registers)
            else:
                print("데이터 읽기 오류:", response)
        except ModbusException as exc:
            print(f"라이브러리에서 ModbusException({exc})를 수신했습니다.")

        # 데이터를 다시 읽기 전에 지정된 간격만큼 대기
        await asyncio.sleep(interval)

async def run_async_simple_client(host, port, interval=5):
    """비동기 클라이언트 실행."""

    # 디버깅 활성화
    pymodbus_apply_logging_config("DEBUG")

    client = AsyncModbusTcpClient(host, port=port, framer=ModbusSocketFramer)

    try:
        await client.connect()
        assert client.connected

        # 주기적으로 데이터 읽기 시작
        await read_data_periodically(client, interval)
    except Exception as e:
        print(f"오류: {e}")
    finally:
        client.close()

if __name__ == "__main__":
    asyncio.run(run_async_simple_client("127.0.0.1", 502, interval=5), debug=True)

 

 

# Master WRITE 코드

 

#!/usr/bin/env python3
"""Pymodbus 비동기 클라이언트 예제.

단일 스레드 동기 클라이언트의 예제입니다.

사용법: simple_client_async.py

모든 옵션은 코드에서 적절하게 수정해야 합니다.
대응하는 서버는 다음과 같이 시작되어야 합니다:
    python3 server_sync.py
"""
import asyncio

from pymodbus import pymodbus_apply_logging_config

# --------------------------------------------------------------------------- #
# 다양한 클라이언트 구현을 가져옵니다.
# --------------------------------------------------------------------------- #
from pymodbus.client import (
    AsyncModbusSerialClient,
    AsyncModbusTcpClient,
    AsyncModbusTlsClient,
    AsyncModbusUdpClient,
)
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
from pymodbus.transaction import (
    #    ModbusAsciiFramer,
    #    ModbusBinaryFramer,
    ModbusRtuFramer,
    ModbusSocketFramer,
    ModbusTlsFramer,
)


async def 비동기_단순_클라이언트_실행(comm, 호스트, 포트, framer=ModbusSocketFramer):
    """비동기 클라이언트 실행."""

    # 디버깅 활성화
    pymodbus_apply_logging_config("DEBUG")

    print("클라이언트 생성")
    if comm == "tcp":
        client = AsyncModbusTcpClient(
            호스트,
            port=포트,
            framer=framer,
            # timeout=10,
            # retries=3,
            # retry_on_empty=False,
            # close_comm_on_error=False,
            # strict=True,
            # source_address=("localhost", 0),
        )
    elif comm == "udp":
        client = AsyncModbusUdpClient(
            호스트,
            port=포트,
            framer=ModbusSocketFramer,
            # timeout=10,
            # retries=3,
            # retry_on_empty=False,
            # close_comm_on_error=False,
            # strict=True,
            # source_address=None,
        )
    elif comm == "serial":
        client = AsyncModbusSerialClient(
            포트,
            framer=ModbusRtuFramer,
            # timeout=10,
            # retries=3,
            # retry_on_empty=False,
            # close_comm_on_error=False,
            # strict=True,
            baudrate=9600,
            bytesize=8,
            parity="N",
            stopbits=1,
            # handle_local_echo=False,
        )
    elif comm == "tls":
        client = AsyncModbusTlsClient(
            호스트,
            port=포트,
            framer=ModbusTlsFramer,
            # timeout=10,
            # retries=3,
            # retry_on_empty=False,
            # close_comm_on_error=False,
            # strict=True,
            # sslctx=sslctx,
            certfile="../examples/certificates/pymodbus.crt",
            keyfile="../examples/certificates/pymodbus.key",
            # password="none",
            server_hostname="localhost",
        )
    else:  # pragma no cover
        print(f"알 수 없는 클라이언트 {comm} 선택됨")
        return


    print("서버에 연결")
    await client.connect()
    # 클라이언트가 연결되었는지 테스트
    assert client.connected

    print("데이터 가져오고 확인")
    try:
        # client_calls.py에서 모든 호출 참조
        rr = await client.write_register(3, 1234)
        # write_register(a, b)
        # a슬레이브(장치)에 b값
    except ModbusException as exc:  # pragma no cover
        print(f"라이브러리에서 ModbusException({exc})를 수신했습니다.")
        client.close()
        return
    if rr.isError():  # pragma no cover
        print(f"Modbus 라이브러리 오류({rr})를 수신했습니다.")
        client.close()
        return
    if isinstance(rr, ExceptionResponse):  # pragma no cover
        print(f"Modbus 라이브러리 예외를 수신했습니다. ({rr})")
        # 이것은 파이썬 예외가 아니라 유효한 모드버스 메시지입니다.
        client.close()

    print("연결 종료")
    client.close()


if __name__ == "__main__":
    asyncio.run(
        비동기_단순_클라이언트_실행("tcp", "192.168.19.101", 5020), debug=True
    )  # pragma: no cover

 

728x90