import codecs
import threading
from tkinter import *
import serial
def GetFromQue(self): #pobieranie z kolejki
while self.RxBuf.empty() == False:
# emb.log('before GetFromQue',self.BytesInQueue)
if self.RxState == self.oenum.LOOKING_FOR_BEG:
self.RxFrame = ''
self.RxFrame = self.RxFrame + chr(self.RxBuf.get())
self.BytesInQueue = self.BytesInQueue - 1
if (ord(self.RxFrame[0]) != 0x06 and ord(self.RxFrame[0]) != 0x15 and ord(self.RxFrame[0]) != 0x02 and ord(
self.RxFrame[0]) != 0x03 and ord(self.RxFrame[0]) != 0x3F):
while self.RxBuf.empty() == False: # trochę niep
a = self.RxBuf.get()
self.BytesInQueue = self.BytesInQueue - 1
emb.log('?', a)
if (a == 0x06 or a == 0x15 or a == 0x3F or a == 0x02 or a == 0x03):
self.RxFrame = ''
self.RxFrame = self.RxFrame + chr(a)
break # tuuuuuuuuuuuuuuuuuuuuuuu
if (ord(self.RxFrame[0]) == 0x06 or ord(self.RxFrame[0]) == 0x15):
self.ReceiveFrame()
# return
if (ord(self.RxFrame[0]) == 0x02 or ord(self.RxFrame[0]) == 0x03 or ord(self.RxFrame[0]) == 0x3F):
self.RxState = self.oenum.LOOKING_FOR_LEN
self.startTimer102(10)
if self.RxState == self.oenum.LOOKING_FOR_LEN:
if self.RxBuf.empty() == False:
self.RxFrame = self.RxFrame + chr(self.RxBuf.get()) # LEN or MS cought
self.BytesInQueue = self.BytesInQueue - 1
if (ord(self.RxFrame[0]) == 0x3F):
self.RxState = self.oenum.LOOKING_FOR_BEG
# self.startTimer102(10)
self.stopTimer102()
self.ReceiveFrame()
# return
else: # a==0x02 or a==0x03:
self.RxState = self.oenum.LOOKING_FOR_CC
self.startTimer102(10)
if self.RxState == self.oenum.LOOKING_FOR_CC:
if self.RxBuf.empty() == False:
self.RxFrame = self.RxFrame + chr(self.RxBuf.get()) # CC cought
self.BytesInQueue = self.BytesInQueue - 1
if (ord(self.RxFrame[1]) != 0):
self.RxState = self.oenum.DATA_COLLECTING
self.startTimer102(10)
self.Data_Collecting_i = ord(self.RxFrame[1])
else:
self.RxState = self.oenum.LOOKING_FOR_CHECK_1BYTE
self.startTimer102(10)
if self.RxState == self.oenum.DATA_COLLECTING:
while self.Data_Collecting_i:
if self.RxBuf.empty() == False:
self.RxFrame = self.RxFrame + chr(self.RxBuf.get()) # CC cought
self.BytesInQueue = self.BytesInQueue - 1
self.Data_Collecting_i = self.Data_Collecting_i - 1
self.startTimer102(10)
# for i in range (0, ord(self.RxFrame[1])):
# if self.RxBuf.empty()==False:
# self.RxFrame=self.RxFrame+chr(self.RxBuf.get())# CC cought
if ((len(self.RxFrame) - 3) == ord(self.RxFrame[1])): # po co
self.RxState = self.oenum.LOOKING_FOR_CHECK_1BYTE # chyba tylko po to
self.startTimer102(10)
if self.RxState == self.oenum.LOOKING_FOR_CHECK_1BYTE:
if self.RxBuf.empty() == False:
self.RxFrame = self.RxFrame + chr(self.RxBuf.get()) # CHECK_1BYTE cought
self.BytesInQueue = self.BytesInQueue - 1
self.RxState = self.oenum.LOOKING_FOR_CHECK_2BYTE
self.startTimer102(10)
if self.RxState == self.oenum.LOOKING_FOR_CHECK_2BYTE:
if self.RxBuf.empty() == False:
self.RxFrame = self.RxFrame + chr(self.RxBuf.get()) # CHECK_2BYTE cought
self.BytesInQueue = self.BytesInQueue - 1
CheckSum = 0x10000
for i in range(1, 3 + ord(self.RxFrame[1])):
CheckSum = CheckSum + ord(self.RxFrame[i])
CheckSum = CheckSum - 0x10000
if ((CheckSum % 256 == ord(self.RxFrame[ord(self.RxFrame[1]) + 3])) and (
int(CheckSum / 256) == ord(self.RxFrame[ord(self.RxFrame[1]) + 4]))):
self.stopTimer102() # tu też co by zdazyc przetwarzac
self.ReceiveFrame()
else:
emb.log('Fr Err') # ,CheckSum%256,'i',self.RxFrame[ord(self.RxFrame[1])+3])
self.RxState = self.oenum.LOOKING_FOR_BEG
self.stopTimer102()
global otworz
def retrieve_input(): #odczyt i konwersja
global ascii_to_hex
inputValue = textBox.get("1.0","end-1c")
asci=(inputValue)
try:
#print(bytearray.fromhex(asci).decode())
ascii_to_hex=str((codecs.encode(asci.encode("ascii"), "hex")))
print(ascii_to_hex[1:])
except:
print("To nie jest kod hexadecymalny")
def serial_ports(): #Wyswietlanie dostepnych portow
global port,result
ports = ['COM%s' % (i + 1) for i in range(256)]
result = []
for port in ports:
try:
s = serial.Serial(port)
s.close()
result.append(port)
except (OSError, serial.SerialException):
pass
return result
def otworz():
global ser
try:
ser = serial.Serial(port=result[0], baudrate=57600, bytesize=serial.EIGHTBITS, parity=serial.PARITY_NONE, timeout=2) # otwarcie portu
if(ser.isOpen()):
ser.setRTS(True)
a=ser.inWaiting()
print("Port otwarto")
else:
print("Port jest już otwarty")
except:
print("Błąd, nie można otworzyć tego portu lub port jest już otwarty")
def read_all(port, chunk_size=200):
"""Read all characters on the serial port and return them."""
if not port.timeout:
raise TypeError('Port needs to have a timeout set!')
read_buffer = b''
while True:
# Read in chunks. Each chunk will wait as long as specified by
# timeout. Increase chunk_size to fail quicker
byte_chunk = port.read(size=chunk_size)
read_buffer += byte_chunk
if not len(byte_chunk) == chunk_size:
break
return read_buffer
global zamknij
def zamknij():
try:
if (ser.isOpen()):
ser.close()
print("Port zamknięto")
except:
print("Port nie jest otwarty")
def worker():
print("Worker")
threads=[]
for i in range(5):
t=threading.Thread(target=worker)
threads.append((t))
t.start()
okno = Tk()
topFrame = Frame(okno)
bottomFrame = Frame(okno)
info = Label(okno, text="")
open_button = Button(okno, text="Otwórz port", fg="green", command=otworz)
close_button = Button(okno, text="Zamknij port", fg="red", command=zamknij)
ports = Label(okno, text=serial_ports())
textBox = Text(okno, height=1, width=50)
textBox.pack()
buttonCommit=Button(okno, height=1, width=10, text="wyslij", command=lambda: retrieve_input())
buttonCommit.pack()
open_button.pack()
close_button.pack()
info.pack()
#entry1.pack()
variable = StringVar(okno)
variable.set("Porty")
w = OptionMenu(okno, variable, serial_ports())
#print(worker())
w.pack()
okno.mainloop()