반응형
# 설치
pip install pymodbus
# 슬레이브 코드(데이터 응답 제공)
import asyncio
import logging
from examples import helper
from pymodbus import __version__ as pymodbus_version
from pymodbus.datastore import (
ModbusSequentialDataBlock,
ModbusServerContext,
ModbusSlaveContext,
ModbusSparseDataBlock,
)
from pymodbus.device import ModbusDeviceIdentification
from pymodbus.server import (
StartAsyncSerialServer,
StartAsyncTcpServer,
StartAsyncTlsServer,
StartAsyncUdpServer,
)
logging.basicConfig()
_logger = logging.getLogger(__file__)
_logger.setLevel(logging.INFO)
def setup_server(description=None, context=None, cmdline=None):
"""서버 설정 실행."""
args = helper.get_commandline(server=True, description=description, cmdline=cmdline)
if context:
args.context = context
if not args.context:
_logger.info("### 데이터 저장소 생성")
# 데이터 저장소는 초기화된 주소에만 응답합니다.
# 데이터 블록을 0x00에서 0xFF로 초기화하면 0x100 요청은
# 유효하지 않은 주소 예외로 응답합니다.
# 이는 많은 장치에서 나타나는 행동입니다 (하지만 모두 아닙니다)
if args.store == "sequential":
# 연속되는 갭 없는 블록 사용
datablock = ModbusSequentialDataBlock(0x00, [11] * 100)
elif args.store == "sparse":
# 연속되지 않는 데이터 블록 사용(간격 있음)
datablock = ModbusSparseDataBlock({0x00: 0, 0x05: 1})
elif args.store == "factory":
# 대신 데이터 블록을 초기화하는 팩토리 메서드 사용
# 또는 주소 범위의 0x00에서 0x00으로 초기화되도록 그냥 전달
datablock = ModbusSequentialDataBlock.create()
if args.slaves:
# 서버는 다른 슬레이브 ID에 대해 다른 슬레이브 컨텍스트로 응답할 수 있는
# 서버 컨텍스트를 활용합니다.
# 기본적으로 모든 슬레이브 ID에 대해 동일한 컨텍스트를 반환합니다.
# (브로드캐스트 모드)
# 그러나 single 플래그를 False로 설정하고 슬레이브 ID에서 컨텍스트 매핑
# 딕셔너리를 제공하여이를 오버로드 할 수 있습니다.
#
# 슬레이브 컨텍스트는 주소(0-7)에 대한 요청이 주소(0-7)에 매핑되도록
# 초기화될 수도 있으며 기본값은 False입니다.
# 이는 명세서 4.4의 섹션을 기반으로 하므로 주소(0-7)은 (1-8)에 매핑됩니다.
context = {
0x01: ModbusSlaveContext(
di=datablock,
co=datablock,
hr=datablock,
ir=datablock,
),
0x02: ModbusSlaveContext(
di=datablock,
co=datablock,
hr=datablock,
ir=datablock,
),
0x03: ModbusSlaveContext(
di=datablock,
co=datablock,
hr=datablock,
ir=datablock,
zero_mode=True,
),
}
single = False
else:
context = ModbusSlaveContext(
di=datablock, co=datablock, hr=datablock, ir=datablock
)
single = True
# 데이터 저장소 구축
args.context = ModbusServerContext(slaves=context, single=single)
# ----------------------------------------------------------------------- #
# 서버 정보 초기화
# ----------------------------------------------------------------------- #
# 이를 설정하지 않거나 어떤 필드도 설정하지 않으면 빈 문자열로 기본 설정됩니다.
# ----------------------------------------------------------------------- #
args.identity = ModbusDeviceIdentification(
info_name={
"VendorName": "Pymodbus",
"ProductCode": "PM",
"VendorUrl": "https://github.com/pymodbus-dev/pymodbus/",
"ProductName": "Pymodbus Server",
"ModelName": "Pymodbus Server",
"MajorMinorRevision": pymodbus_version,
}
)
return args
async def run_async_server(args):
"""서버 실행."""
txt = f"### 비동기 서버 시작, {args.port} 포트에서 대기 중 - {args.comm}"
_logger.info(txt)
if args.comm == "tcp":
address = ("localhost", 502)
server = await StartAsyncTcpServer(
context=args.context, # 데이터 저장소
identity=args.identity, # 서버 식별
address=address, # 리스닝 주소
framer=args.framer, # 사용할 프레이머 전략
)
elif args.comm == "udp":
address = (
args.host if args.host else "127.0.0.1",
args.port if args.port else None,
)
server = await StartAsyncUdpServer(
context=args.context, # 데이터 저장소
identity=args.identity, # 서버 식별
address=address, # 리스닝 주소
framer=args.framer, # 사용할 프레이머 전략
)
elif args.comm == "serial":
server = await StartAsyncSerialServer(
context=args.context, # 데이터 저장소
identity=args.identity, # 서버 식별
port=args.port, # 시리얼 포트
framer=args.framer, # 사용할 프레이머 전략
baudrate=args.baudrate, # 시리얼 장치의 사용할 보유율
)
elif args.comm == "tls":
address = ("localhost", 503)
server = await StartAsyncTlsServer(
context=args.context, # 데이터 저장소
host="localhost", # 연결할 TCP 주소 정의
identity=args.identity, # 서버 식별
address=address, # 리스닝 주소
framer=args.framer, # 사용할 프레이머 전략
certfile=helper.get_certificate(
"crt"
), # TLS에 대한 인증서 파일 경로 (sslctx가 None인 경우 사용됨)
keyfile=helper.get_certificate(
"key"
), # TLS에 대한 키 파일 경로 (sslctx가 None인 경우 사용됨)
)
return server
async def async_helper():
"""설정 및 실행 결합."""
_logger.info("시작 중...")
run_args = setup_server(description="비동기 서버 실행.")
await run_async_server(run_args)
if __name__ == "__main__":
asyncio.run(async_helper(), debug=True) # pragma: no cover
* 참조(helper.py 와 예제) : https://github.com/pymodbus-dev/pymodbus/tree/dev/examples
# Master READ 코드
import asyncio
from pymodbus import pymodbus_apply_logging_config
# 다양한 클라이언트 구현을 가져옵니다.
from pymodbus.client.tcp import AsyncModbusTcpClient as AsyncModbusTcpClient
from pymodbus.exceptions import ModbusException
from pymodbus.transaction import ModbusSocketFramer
async def read_data_periodically(client, interval):
while True:
try:
# 홀딩 레지스터에서 데이터 읽기 (필요에 따라 매개변수를 조정할 수 있음)
response = await client.read_holding_registers(address=0, count=10, unit=1)
if not response.isError():
print("데이터 읽기:", response.registers)
else:
print("데이터 읽기 오류:", response)
except ModbusException as exc:
print(f"라이브러리에서 ModbusException({exc})를 수신했습니다.")
# 데이터를 다시 읽기 전에 지정된 간격만큼 대기
await asyncio.sleep(interval)
async def run_async_simple_client(host, port, interval=5):
"""비동기 클라이언트 실행."""
# 디버깅 활성화
pymodbus_apply_logging_config("DEBUG")
client = AsyncModbusTcpClient(host, port=port, framer=ModbusSocketFramer)
try:
await client.connect()
assert client.connected
# 주기적으로 데이터 읽기 시작
await read_data_periodically(client, interval)
except Exception as e:
print(f"오류: {e}")
finally:
client.close()
if __name__ == "__main__":
asyncio.run(run_async_simple_client("127.0.0.1", 502, interval=5), debug=True)
# Master WRITE 코드
#!/usr/bin/env python3
"""Pymodbus 비동기 클라이언트 예제.
단일 스레드 동기 클라이언트의 예제입니다.
사용법: simple_client_async.py
모든 옵션은 코드에서 적절하게 수정해야 합니다.
대응하는 서버는 다음과 같이 시작되어야 합니다:
python3 server_sync.py
"""
import asyncio
from pymodbus import pymodbus_apply_logging_config
# --------------------------------------------------------------------------- #
# 다양한 클라이언트 구현을 가져옵니다.
# --------------------------------------------------------------------------- #
from pymodbus.client import (
AsyncModbusSerialClient,
AsyncModbusTcpClient,
AsyncModbusTlsClient,
AsyncModbusUdpClient,
)
from pymodbus.exceptions import ModbusException
from pymodbus.pdu import ExceptionResponse
from pymodbus.transaction import (
# ModbusAsciiFramer,
# ModbusBinaryFramer,
ModbusRtuFramer,
ModbusSocketFramer,
ModbusTlsFramer,
)
async def 비동기_단순_클라이언트_실행(comm, 호스트, 포트, framer=ModbusSocketFramer):
"""비동기 클라이언트 실행."""
# 디버깅 활성화
pymodbus_apply_logging_config("DEBUG")
print("클라이언트 생성")
if comm == "tcp":
client = AsyncModbusTcpClient(
호스트,
port=포트,
framer=framer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# source_address=("localhost", 0),
)
elif comm == "udp":
client = AsyncModbusUdpClient(
호스트,
port=포트,
framer=ModbusSocketFramer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# source_address=None,
)
elif comm == "serial":
client = AsyncModbusSerialClient(
포트,
framer=ModbusRtuFramer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
baudrate=9600,
bytesize=8,
parity="N",
stopbits=1,
# handle_local_echo=False,
)
elif comm == "tls":
client = AsyncModbusTlsClient(
호스트,
port=포트,
framer=ModbusTlsFramer,
# timeout=10,
# retries=3,
# retry_on_empty=False,
# close_comm_on_error=False,
# strict=True,
# sslctx=sslctx,
certfile="../examples/certificates/pymodbus.crt",
keyfile="../examples/certificates/pymodbus.key",
# password="none",
server_hostname="localhost",
)
else: # pragma no cover
print(f"알 수 없는 클라이언트 {comm} 선택됨")
return
print("서버에 연결")
await client.connect()
# 클라이언트가 연결되었는지 테스트
assert client.connected
print("데이터 가져오고 확인")
try:
# client_calls.py에서 모든 호출 참조
rr = await client.write_register(3, 1234)
# write_register(a, b)
# a슬레이브(장치)에 b값
except ModbusException as exc: # pragma no cover
print(f"라이브러리에서 ModbusException({exc})를 수신했습니다.")
client.close()
return
if rr.isError(): # pragma no cover
print(f"Modbus 라이브러리 오류({rr})를 수신했습니다.")
client.close()
return
if isinstance(rr, ExceptionResponse): # pragma no cover
print(f"Modbus 라이브러리 예외를 수신했습니다. ({rr})")
# 이것은 파이썬 예외가 아니라 유효한 모드버스 메시지입니다.
client.close()
print("연결 종료")
client.close()
if __name__ == "__main__":
asyncio.run(
비동기_단순_클라이언트_실행("tcp", "192.168.19.101", 5020), debug=True
) # pragma: no cover
728x90
'Programming > 파이썬(python) - 라이브러리' 카테고리의 다른 글
파이썬(python)/ 안면 인식 라이브러리 dlib (0) | 2023.09.28 |
---|---|
파이썬(python)/ TCP-IP 통신 사용 (0) | 2023.09.27 |
파이썬(python)/ MSSQL 연동 사용법 (0) | 2023.09.27 |
파이썬(python)/ Pyqt5 기본 문법 (0) | 2023.09.26 |
파이썬(python)/ 플라스크(flask) 배포 (0) | 2023.09.19 |