Python 和三菱FX5U 進行通訊
三菱FX5U 支援SLMP 協議,可利用Python 的Socket 通訊按照三菱的協議格式進行通訊,上位機作為客戶端,PLC作為伺服器
"""
author: alai
date:2018/05/01
"""
import socket
import time
import xlwt,xlrd,struct
from xlutils.copy import copy
import sys
import re,os,sys
# Module-level status string. The code visible in this section never reads
# it (the only mention is a commented-out ``global data`` in TcpThread.run).
data = "ok"
class TcpThread(QThread):
    """Worker thread that exchanges ASCII request frames with a PLC over TCP.

    The thread runs an endless state machine (see :meth:`tcpClient`) that is
    steered by two module-level globals:

    * ``glb_tag``  -- current state (0 poll, 1 acknowledge, 2 read, 3 write).
      It is reset to 0 when the loop starts; values 2 and 3 are never set
      from state 0/1 here, so they are presumably requested by other code
      (e.g. the GUI) -- confirm against the callers.
    * ``glb_data`` -- text describing the register to read (``D<number>``)
      or write (``<number>=<value>``); it is only read by this class.

    NOTE(review): ``QThread`` and ``pyqtSignal`` are not imported in the part
    of the file visible here; they must be provided by a PyQt import.
    """

    # Emitted in state 1 with the raw reply of the last polling read.
    trigger = pyqtSignal(str)
    # Emitted in state 2 with the raw reply of an on-demand register read.
    trigger1 = pyqtSignal(str)

    def __init__(self, parent=None):
        # The original spelled this ``__int__``, so it was never used as the
        # constructor.  ``parent`` keeps the inherited call signature working.
        super(TcpThread, self).__init__(parent)

    def doConnect(self):
        """Read ``IP_Address.txt`` and open a TCP connection to the PLC.

        The settings file is expected to contain ``IP:``, ``PORT:``,
        ``BUFFSIZE:``, ``ADDRESS:`` and ``LENGTH:`` entries, one per line.

        Returns:
            tuple: ``(sock, BUFFSIZE, ADDRESS, LENGTH)`` where ``sock`` is the
            socket object (possibly unconnected if the connection attempt
            failed), ``BUFFSIZE`` is an int and ``ADDRESS`` / ``LENGTH`` are
            the strings appended verbatim to the polling request frame.

        Raises:
            OSError: if the settings file cannot be opened.
            ValueError: if ``PORT`` or ``BUFFSIZE`` is missing or not numeric.
        """
        with open('IP_Address.txt', 'r') as fn:
            s1 = fn.read()
        # Strip stray whitespace / '\r' so it cannot end up in the host name
        # or inside the request frame.
        host = ''.join(re.findall(r'IP:(.*)', s1)).strip()  # server IP as a string
        port = int(''.join(re.findall(r'PORT:(.*)', s1)))  # TCP port number
        BUFFSIZE = int(''.join(re.findall(r'BUFFSIZE:(.*)', s1)))  # recv() size
        ADDRESS = ''.join(re.findall(r'ADDRESS:(.*)', s1)).strip()  # start register digits
        LENGTH = ''.join(re.findall(r'LENGTH:(.*)', s1)).strip()  # number-of-points digits
        print(host, port, ADDRESS, LENGTH)
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((host, port))
            print("connect is ok")
        except Exception:
            # Deliberately best-effort: the unconnected socket is still
            # returned, the caller's next send fails and triggers a retry.
            print('fail to setup socket connection')
        return sock, BUFFSIZE, ADDRESS, LENGTH

    def _reconnect(self, sock):
        """Close ``sock``, wait one second and return a fresh ``doConnect()`` tuple."""
        sock.close()
        time.sleep(1)
        return self.doConnect()

    def _exchange(self, sock, frame, buffsize, settle):
        """Send ``frame``, wait ``settle`` seconds and return the decoded reply.

        Returns:
            str or None: the reply text, or ``None`` when sending/receiving
            failed or the peer closed the connection; the caller is then
            expected to reconnect and retry.
        """
        try:
            sock.sendall(frame.encode())
            time.sleep(settle)
            raw = sock.recv(buffsize)
            if not raw:
                # recv() returning b'' means the peer closed the connection.
                raise ConnectionError("connection closed by peer")
            return raw.decode()
        except OSError:
            print("\r\nsocket error,do reconnect ")
        except Exception:
            print("other error")
        return None

    def tcpClient(self):
        """Run the request/response state machine forever.

        States (value of the global ``glb_tag``):

        * 0 -- poll ``ADDRESS``/``LENGTH`` from the settings file until
          characters 22..25 of the reply equal ``"0001"``, then go to 1.
        * 1 -- emit ``trigger`` with the polled reply and send the fixed
          write frame until characters 16..17 of the reply equal ``"04"``,
          then go back to 0.
        * 2 -- read 0x0010 points from the ``D`` register named in
          ``glb_data``, emit ``trigger1`` with the reply, go back to 0.
        * 3 -- write the ``<register>=<value>`` pair from ``glb_data``,
          then go to 2 to read the register back.

        If an exchange fails, the connection is re-established and the same
        state is retried, so a pending request is not lost.  (In state 1 this
        means ``trigger`` is emitted again on the retry, just as it already
        is when the acknowledge reply does not match.)
        """
        global glb_tag
        global glb_data
        glb_tag = 0
        sockLocal, BUFFSIZE, ADDRESS, LENGTH = self.doConnect()
        str1 = ""
        while True:
            # State 0: keep polling.
            if glb_tag == 0:
                self.data1 = '500000FF03FF000018000004010000D*00'
                self.data = self.data1 + ADDRESS + LENGTH
                reply = self._exchange(sockLocal, self.data, BUFFSIZE, 0.5)
                if reply is None:
                    sockLocal, BUFFSIZE, ADDRESS, LENGTH = self._reconnect(sockLocal)
                    continue
                str1 = reply
                if str1[22:26] == "0001":
                    glb_tag = 1
            # State 1: publish the polled data, then write and wait for the result.
            elif glb_tag == 1:
                # Emit the signal (connected to the data display elsewhere).
                self.trigger.emit(str1)
                self.data1 = '500000FF03FF00001C000014010000D*00499900010001'
                reply = self._exchange(sockLocal, self.data1, BUFFSIZE, 0.3)
                if reply is None:
                    sockLocal, BUFFSIZE, ADDRESS, LENGTH = self._reconnect(sockLocal)
                    continue
                str1 = reply
                # Stop writing once the reply carries the expected length field.
                if str1[16:18] == "04":
                    glb_tag = 0
            # State 2: on-demand PLC register read.
            elif glb_tag == 2:
                self.data1 = '500000FF03FF000018000004010000D*00'
                # Extract the register digits and left-pad them to four characters.
                self.data2 = "".join(re.findall(r"[Dd](\d+)", glb_data))
                self.data3 = self.data2.zfill(4)
                self.data = self.data1 + self.data3 + "0010"
                reply = self._exchange(sockLocal, self.data, BUFFSIZE, 0.3)
                if reply is None:
                    sockLocal, BUFFSIZE, ADDRESS, LENGTH = self._reconnect(sockLocal)
                    continue
                glb_tag = 0
                str1 = reply
                self.trigger1.emit(str1)
            # State 3: PLC register write.
            elif glb_tag == 3:
                self.data1 = '500000FF03FF00001C000014010000D*00'
                # Register digits before '=' and decimal value after it,
                # each left-padded to four characters.
                self.data2 = "".join(re.findall(r"(\d+)=", glb_data)).zfill(4)
                self.data3 = "".join(re.findall(r"=(\d+)", glb_data)).zfill(4)
                # Value as lowercase hex without the '0x' prefix, padded to four digits.
                # NOTE(review): values above 65535 yield more than four digits,
                # and the hex letters are lowercase -- confirm both are
                # acceptable to the PLC.
                self.data4 = format(int(self.data3), "x").zfill(4)
                self.data = self.data1 + self.data2 + "0001" + self.data4
                reply = self._exchange(sockLocal, self.data, BUFFSIZE, 0.3)
                if reply is None:
                    sockLocal, BUFFSIZE, ADDRESS, LENGTH = self._reconnect(sockLocal)
                    continue
                str1 = reply
                # Read the register back so the display shows the new value.
                glb_tag = 2
            else:
                # Unknown state: idle briefly instead of spinning at full speed.
                time.sleep(0.1)

    def run(self):
        """Thread entry point: start the communication loop."""
        self.tcpClient()