-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathomron_modbus.py
87 lines (74 loc) · 3.01 KB
/
omron_modbus.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
from pyModbusTCP.client import ModbusClient
from pyModbusTCP import utils
class modBusWriteRead():
    """Small convenience wrapper around a pyModbusTCP ``ModbusClient``.

    The wrapper connects to ``client_host`` on TCP port 502 when it is
    constructed and offers helpers to write and read holding registers,
    optionally decoding the registers that were read as 32-bit floats.
    """

    # Maximum number of registers sent in a single write request. Longer
    # payloads are split into consecutive requests of at most this size.
    MAX_WRITE_REGS = 120

    def __init__(self, client_host):
        """Store the connection settings and connect immediately.

        Args:
            client_host: Host name or IP address of the Modbus/TCP server.
        """
        self.client_host = client_host
        self.client_port = 502
        # Kept for backward compatibility with code that inspects it; write
        # failures are now reported through the return value of
        # ``write_data_reg`` instead of being collected here.
        self.err_list = []
        self.connect()  # the connection is established from here

    def connect(self):
        """Create the underlying client and try to open the connection.

        Returns:
            True when the connection is open afterwards, False otherwise
            (a message is printed in that case).
        """
        self.modbus_c = ModbusClient()
        self.modbus_c.host(self.client_host)
        self.modbus_c.port(self.client_port)
        if self.modbus_c.is_open() or self.modbus_c.open():
            return True
        print("unable to connect to {}:{}".format(self.client_host,
                                                  self.client_port))
        return False

    def _ensure_open(self):
        """Return True if the client is open, opening it first if needed."""
        # Reuse a connection the client reports as open; only call open()
        # when it is closed so that every request does not reconnect.
        return bool(self.modbus_c.is_open() or self.modbus_c.open())

    def write_data_reg(self, address, list):
        """Write ``list`` to consecutive holding registers from ``address``.

        Payloads longer than ``MAX_WRITE_REGS`` are split into several write
        requests; chunk ``k`` is written at
        ``address + k * MAX_WRITE_REGS``. Every chunk is attempted even if an
        earlier one failed. An empty payload sends nothing.

        Args:
            address: Address of the first register to write.
            list: Sequence of register values (the name shadows the builtin
                but is kept so keyword callers keep working).

        Returns:
            True if the connection was available and every chunk was
            accepted, False otherwise.
        """
        values = list  # local alias; avoids using the shadowed builtin name
        if not self._ensure_open():
            return False

        all_written = True
        for chunk_no, chunk in enumerate(self.hazirla_dizi_to_write(values)):
            target_reg = address + chunk_no * self.MAX_WRITE_REGS
            # The client call yields None/False on failure.
            if not self.modbus_c.write_multiple_registers(target_reg, chunk):
                all_written = False
        return all_written

    def hazirla_dizi_to_write(self, d_list):
        """Split ``d_list`` into consecutive chunks of at most 120 values.

        Args:
            d_list: Sequence of register values of any length.

        Returns:
            A list of lists. Every chunk except possibly the last holds
            exactly ``MAX_WRITE_REGS`` items; the last one holds the
            remainder. No empty chunk is ever produced, so an empty input
            gives an empty list.
        """
        size = self.MAX_WRITE_REGS
        return [list(d_list[start:start + size])
                for start in range(0, len(d_list), size)]

    def read_data_reg(self, address, reg_count, read_float=False):
        """Read holding registers as raw 16-bit values or as floats.

        Args:
            address: Address of the first register to read.
            reg_count: Number of 16-bit registers to read.
            read_float: When true, the registers are decoded pairwise into
                floats via ``long_to_float``.

        Returns:
            The list returned by the client (or the decoded float list), or
            None when the connection is unavailable or the read failed.
        """
        if not self._ensure_open():
            return None
        registers = self.modbus_c.read_holding_registers(address, reg_count)
        if registers is None or not read_float:
            return registers
        return self.long_to_float(registers)

    def long_to_float(self, list_16):
        """Decode a list of 16-bit words into IEEE-754 floats.

        The words are reversed before being paired into 32-bit values and
        the decoded floats are reversed again afterwards. The input list is
        not modified.
        NOTE(review): presumably this double reversal exists to match the
        PLC's word order inside each register pair - confirm against the
        device documentation.

        Args:
            list_16: List of 16-bit register values.

        Returns:
            A new list of floats.
        """
        # Work on a reversed copy so the caller's list is left untouched.
        words = list(reversed(list_16))
        floats = [utils.decode_ieee(value)
                  for value in utils.word_list_to_long(words)]
        floats.reverse()
        return floats
# Ad-hoc trial run: executes whenever this module is run or imported. It
# connects to the PLC and prints ten holding registers, starting at address 0,
# as raw 16-bit values.
plc = modBusWriteRead(client_host="192.168.250.3")
deneme = plc.read_data_reg(address=0, reg_count=10, read_float=False)
print(deneme)