본문 바로가기

언어/Python

PyQt serial terminal

출처: https://iosoft.blog/2019/04/30/pyqt-serial-terminal/

 

PyQt serial terminal

A simple demonstration of threading in PyQt I do a lot of work with serial comms; TeraTerm is an excellent serial terminal, but sometimes a customised application is required, for example when deal…

iosoft.blog

A simple demonstration of threading in PyQt

I do a lot of work with serial comms; TeraTerm is an excellent serial terminal, but sometimes a customised application is required, for example when dealing with binary data.

The following blog describes a simple Windows & Linux serial terminal, that can be adapted to handle special protocols; it also demonstrates the creation of Python threads, and can serve as a basis for other multi-threaded applications.

It won’t win any awards for style, but could come in handy next time you encounter an obscure serial protocol.

Compatibility

My code is compatible with Python 2.7 and 3.x, PyQt v4 and v5, running on Windows or Linux. This necessitates some rather clunky inclusions at the top of the file:

try: from PyQt4 import QtGui, QtCore from PyQt4.QtGui import QTextEdit, QWidget, QApplication, QVBoxLayout except: from PyQt5 import QtGui, QtCore from PyQt5.QtWidgets import QTextEdit, QWidget, QApplication, QVBoxLayout try: import Queue except: import queue as Queue

There is also an issue with the different ways Python 2 and 3 handle serial data; older versions assume that the data type is an ASCII string, while in later versions it is a binary type.

This can lead to all sorts of problems (such as unwanted exceptions) so I’ve included specific functions to ensure that all outgoing data is converted from the internal representation (which could be Unicode) to a string of bytes, and incoming data is converted from the external type (data bytes or string) to a string type:

# Convert a string to bytes (for Python 3) def str_bytes(s): return s.encode('latin-1') # Convert bytes to string (if Python 3) def bytes_str(d): return d if type(d) is str else "".join([chr(b) for b in d])

You’ll need to install pySerial by the usual methods (e.g. ‘pip install’).

Using threading

The key problem with serial communications is the time it takes; the slower the data rate, the longer the delay before anything happens. Without threading (or any other mitigation scheme) the User Interface (UI) will lock up after each command, waiting for the response. At best, this makes the application appear sluggish and unresponsive; at worst, it can appear to have failed, waiting for a response that never comes.

Threading allows the UI to carry on interacting with the user, while simultaneously keeping the serial link alive. Creating a new thread is really easy in Python; you just subclass QThread, and instantiate it:

class SerialThread(QtCore.QThread): def __init__(self, portname, baudrate): QtCore.QThread.__init__(self) self.portname, self.baudrate = portname, baudrate ... class MyWidget(QWidget): ... self.serth = SerialThread(portname, baudrate)

I have chosen to supply the serial parameters (port name & baud rate) when the thread object is instantiated, so they are available when the thread is started; this is done by calling the start() method, which will call a run() method in the QThread class:

# In serial thread: def run(self): print("Opening %s at %u baud" % (self.portname, self.baudrate)) ... # In UI thread: self.serth.start()

The run() method needs to keep running in a perpetual loop, but it must be possible to terminate that loop when the program exits – Python won’t do it automatically. I use a global variable, that can be set false to terminate, e.g. in pseudocode:

def run(self): [starting: open serial port] while self.running: [check for incoming characters] [check for outgoing characters] [finished: close serial port]

When the application is closing, it terminates the thread by setting the boolean variable false, then (very importantly) waits for the thread to finish its execution:

def closeEvent(self, event): self.serth.running = False self.serth.wait()

Transmitting

The user will be entering keystrokes in the UI thread, and it is tempting to call the serial transmit function from that thread, but this isn’t a good idea; it is better to pass the keystrokes across to the serial thread for transmission, and we need a thread-safe method of doing this. That means we can’t just use a global shared string variable; Python does a lot of behind-the-scenes processing that could lead to an unpredictable result. Instead, we’ll use a First In First Out (FIFO) queue:

# In UI thread.. txq = Queue.Queue() ... txq.put(s) # Add string to queue ... # In serial thread.. if not txq.empty(): txd = txq.get() # Get string from queue

So the serial thread polls the transmit queue for any data, outputting it to the serial port.

Receiving

We could use the same technique for received data; the serial thread could add it to a queue that is polled by the UI thread, and somehow trigger a UI redraw when the new data arrives, but I prefer to use a signal; the data is attached to that signal, and is received by a UI function that has registered a connection. The signal has to be in a class definition, and must specify the type of data that will be attached:

class MyWidget(QWidget): text_update = QtCore.pyqtSignal(str)

The signal is connected to a function that will process the data:

self.text_update.connect(self.append_text)

So now the serial thread just has to generate a signal when new data is received:

self.text_update.emit(text)

This technique would be quite adequate, but I do like having the output from all my ‘print’ function calls redirected to the same window; it makes for cleaner error reporting when things go wrong, rather than having a separate console with error messages. This is done by redirecting stdout to my widget, and adding write() and flush() handlers:

class MyWidget(QWidget): text_update = QtCore.pyqtSignal(str) def __init__(self, *args): ... self.text_update.connect(self.append_text) sys.stdout = self ... def write(self, text): self.text_update.emit(text) def flush(self): pass ... def append_text(self, text):   [add text to UI display]

So now, every time I make a print() call, a signal is sent to my append_text function, where the display is updated. The use of a signal means that I can still call print() from any thread, without fear of strange cross-threading problems.

Polling in serial thread

The serial thread is just polling for incoming and outgoing characters, and if there are none, the processor will execute the ‘while’ loop really quickly. In the absence of any delays, it will consume a lot of CPU time just checking for things that don’t exist. This may appear harmless, but it is quite alarming for the user if the CPU fan starts spinning rapidly whenever your application is running, and a laptop user won’t be happy if you needlessly drain their battery by performing pointless tasks. So we need to add some harmless time-wasting to the polling loop, by frequently returning control to the operating system. This can be done by calling the ‘sleep’ function, but we still want the software to be responsive when some serial data actually arrives. A suitable compromise is to use a serial read-function with a timeout, so the software ‘blocks’ (i.e. stalls) until either some characters are received, or there is a timeout:

self.ser = serial.Serial(self.portname, self.baudrate, timeout=SER_TIMEOUT) ... s = self.ser.read(self.ser.in_waiting or 1)

In case you are unfamiliar with the usage, the ‘or’ function returns the left-hand side if it is true (non-zero), otherwise the right-hand side. So every read attempt is at least 1 character, or more characters if they are available. If none are present, the read function waits until the timeout, so when the serial line is idle, most time will be spent waiting in the operating system.

User Interface

This has been kept as simple as possible, with just a text box as a main widget. One minor complication is that as standard, the text box (which is actually a QTextEdit control) will capture and display all keystrokes, so we need to subclass it to intercept the keys, and call a handler function that adds them to the serial transmit queue. I didn’t want to burden the text box with this functionality, so put the handler in its parent, which is the main widget:

class MyTextBox(QTextEdit): def __init__(self, *args): QTextEdit.__init__(self, *args) def keyPressEvent(self, event): self.parent().keypress_handler(event)

The keystroke handler in the main widget gets the character from the key event, and checks for a ctrl-V ‘paste’ request; I’ve included this feature because I find it useful to cut-and-paste frequently-used serial commands from a document, rather than re-typing them every time.

# In main widget: def keypress_handler(self, event): k = event.key() s = RETURN_CHAR if k==QtCore.Qt.Key_Return else event.text() if len(s)>0 and s[0]==PASTE_CHAR: cb = QApplication.clipboard() self.serth.ser_out(cb.text()) else: self.serth.ser_out(s)

Interestingly, with PyQt5 you can cut-and-paste full 8-bit data (i.e. bytes with the high bit set), but this doesn’t seem to work in PyQt4, which only accepts the usual ASCII character set.

I haven’t included any menu options, you have to specify the COM port on the command-line using the -c option, and baud rate using -b. There is also a -x option to display incoming data in hexadecimal, for example:

Windows: python pyqt_serialterm.py -c com2 -b 9600 -x Linux: python pyqt_serialterm.py -c /dev/ttyUSB0 -b 115200

On Linux, if you want non-root access to a serial port, you will generally need to add your username to the ‘dialout’ group:

sudo usermod -a -G dialout $USER

..then log out & back in.

Source code

# Simple PyQT serial terminal v0.09 from iosoft.blog
 
from PyQt5 import QtGui, QtCore
from PyQt5.QtWidgets import QTextEdit, QWidget, QApplication, QVBoxLayout
try:
    import Queue
except:
    import queue as Queue
import sys, time, serial
 
WIN_WIDTH, WIN_HEIGHT = 684, 400    # Window size
SER_TIMEOUT = 0.1                   # Timeout for serial Rx
RETURN_CHAR = "\n"                  # Char to be sent when Enter key pressed
PASTE_CHAR  = "\x16"                # Ctrl code for clipboard paste
baudrate    = 115200                # Default baud rate
portname    = "COM1"                # Default port name
hexmode     = False                 # Flag to enable hex display
 
# Convert a string to bytes
def str_bytes(s):
    return s.encode('latin-1')
     
# Convert bytes to string
def bytes_str(d):
    return d if type(d) is str else "".join([chr(b) for b in d])
     
# Return hexadecimal values of data
def hexdump(data):
    return " ".join(["%02X" % ord(b) for b in data])
 
# Return a string with high-bit chars replaced by hex values
def textdump(data):
    return "".join(["[%02X]" % ord(b) if b>'\x7e' else b for b in data])
     
# Display incoming serial data
def display(s):
    if not hexmode:
        sys.stdout.write(textdump(str(s)))
    else:
        sys.stdout.write(hexdump(s) + ' ')
 
# Custom text box, catching keystrokes
class MyTextBox(QTextEdit):
    def __init__(self, *args): 
        QTextEdit.__init__(self, *args)
         
    def keyPressEvent(self, event):     # Send keypress to parent's handler
        self.parent().keypress_handler(event)
             
# Main widget            
class MyWidget(QWidget):
    text_update = QtCore.pyqtSignal(str)
         
    def __init__(self, *args): 
        QWidget.__init__(self, *args)
        self.textbox = MyTextBox()              # Create custom text box
        font = QtGui.QFont()
        font.setFamily("Courier New")           # Monospaced font
        font.setPointSize(10)
        self.textbox.setFont(font)
        layout = QVBoxLayout()
        layout.addWidget(self.textbox)
        self.setLayout(layout)
        self.resize(WIN_WIDTH, WIN_HEIGHT)      # Set window size
        self.text_update.connect(self.append_text)      # Connect text update to handler
        sys.stdout = self                               # Redirect sys.stdout to self
        self.serth = SerialThread(portname, baudrate)   # Start serial thread
        self.serth.start()
         
    def write(self, text):                      # Handle sys.stdout.write: update display
        self.text_update.emit(text)             # Send signal to synchronise call with main thread
         
    def flush(self):                            # Handle sys.stdout.flush: do nothing
        pass
 
    def append_text(self, text):                # Text display update handler
        cur = self.textbox.textCursor()
        cur.movePosition(QtGui.QTextCursor.End) # Move cursor to end of text
        s = str(text)
        while s:
            head,sep,s = s.partition("\n")      # Split line at LF
            cur.insertText(head)                # Insert text at cursor
            if sep:                             # New line if LF
                cur.insertBlock()
        self.textbox.setTextCursor(cur)         # Update visible cursor
 
    def keypress_handler(self, event):          # Handle keypress from text box
        k = event.key()
        s = RETURN_CHAR if k==QtCore.Qt.Key_Return else event.text()
        if len(s)>0 and s[0]==PASTE_CHAR:       # Detect ctrl-V paste
            cb = QApplication.clipboard() 
            self.serth.ser_out(cb.text())       # Send paste string to serial driver
        else:
            self.serth.ser_out(s)               # ..or send keystroke
     
    def closeEvent(self, event):                # Window closing
        self.serth.running = False              # Wait until serial thread terminates
        self.serth.wait()
         
# Thread to handle incoming & outgoing serial data
class SerialThread(QtCore.QThread):
    def __init__(self, portname, baudrate): # Initialise with serial port details
        QtCore.QThread.__init__(self)
        self.portname, self.baudrate = portname, baudrate
        self.txq = Queue.Queue()
        self.running = True
 
    def ser_out(self, s):                   # Write outgoing data to serial port if open
        self.txq.put(s)                     # ..using a queue to sync with reader thread
         
    def ser_in(self, s):                    # Write incoming serial data to screen
        display(s)
         
    def run(self):                          # Run serial reader thread
        print("Opening %s at %u baud %s" % (self.portname, self.baudrate,
              "(hex display)" if hexmode else ""))
        try:
            self.ser = serial.Serial(self.portname, self.baudrate, timeout=SER_TIMEOUT)
            time.sleep(SER_TIMEOUT*1.2)
            self.ser.flushInput()
        except:
            self.ser = None
        if not self.ser:
            print("Can't open port")
            self.running = False
        while self.running:
            s = self.ser.read(self.ser.in_waiting or 1)
            if s:                                       # Get data from serial port
                self.ser_in(bytes_str(s))               # ..and convert to string
            if not self.txq.empty():
                txd = str(self.txq.get())               # If Tx data in queue, write to serial port
                self.ser.write(str_bytes(txd))
        if self.ser:                                    # Close serial port when thread finished
            self.ser.close()
            self.ser = None
             
if __name__ == "__main__":
    app = QApplication(sys.argv) 
    opt = err = None
    for arg in sys.argv[1:]:                # Process command-line options
        if len(arg)==2 and arg[0]=="-":
            opt = arg.lower()
            if opt == '-x':                 # -X: display incoming data in hex
                hexmode = True
                opt = None
        else:
            if opt == '-b':                 # -B num: baud rate, e.g. '9600'
                try:
                    baudrate = int(arg)
                except:
                    err = "Invalid baudrate '%s'" % arg
            elif opt == '-c':               # -C port: serial port name, e.g. 'COM1'
                portname = arg
    if err:
        print(err)
        sys.exit(1)
    w = MyWidget()
    w.setWindowTitle('PyQT Serial Terminal ' + VERSION)
    w.show() 
    sys.exit(app.exec_())
         
# EOF