Search code examples
pythonmultithreadingurwid

urwid - output screen for endless loop


I'm trying to make a simple urwid an output screen for an endless loop. It needs to output data coming from another class.

The solution I've found right now is: have a Printer class (a test replacer for the actual outputting class) with a queue attribute. When it needs to display something, it appends it to queue. Then, there is an Interface class - the actual interface - with its own Printer instance. A thread running in parallel with MainLoop checks if queue has items and, if so, outputs them. Since Printer's main function is an infinite loop, it has its own thread too - in this test, it simply outputs "Hello" every few seconds.

Here is the code:

import urwid
import threading
import time

class Interface:
    palette = [
        ('body', 'white', 'black'),
        ('ext', 'white', 'dark blue'),
        ('ext_hi', 'light cyan', 'dark blue', 'bold'),
        ]

    header_text = [
        ('ext_hi', 'ESC'), ':quit        ',
        ('ext_hi', 'UP'), ',', ('ext_hi', 'DOWN'), ':scroll',
        ]

    def __init__(self):
        self.header = urwid.AttrWrap(urwid.Text(self.header_text), 'ext')
        self.flowWalker = urwid.SimpleListWalker([])
        self.body = urwid.ListBox(self.flowWalker)
        self.footer = urwid.AttrWrap(urwid.Edit("Edit:  "), 'ext')
        self.view = urwid.Frame(
            urwid.AttrWrap(self.body, 'body'),
            header = self.header,
            footer = self.footer)
        self.loop = urwid.MainLoop(self.view, self.palette, 
            unhandled_input = self.unhandled_input)
        self.printer = Printer()

    def start(self):
        t1 = threading.Thread(target = self.fill_screen)
        t1.daemon = True
        t2 = threading.Thread(target = self.printer.fill_queue)
        t2.daemon = True
        t1.start()
        t2.start()
        self.loop.run()

    def unhandled_input(self, k):
        if k == 'esc':
            raise urwid.ExitMainLoop()

    def fill_screen(self):  
        while True:
            if self.printer.queue:
                self.flowWalker.append(urwid.Text(('body', self.printer.queue.pop(0))))
                try:
                    self.loop.draw_screen()
                    self.body.set_focus(len(self.flowWalker)-1, 'above')
                except AssertionError: pass

    def to_screen(self, text):
        self.queue.append(text)


class Printer:
    def __init__(self):
        self.message = 'Hello'
        self.queue = []

    def fill_queue(self):
        while 1:
            self.queue.append(self.message)
            time.sleep(2)


if __name__ == '__main__':
    i = Interface()
    i.start()

It works, but it seems way too messy to me and I'm afraid it could end up being some sort of coding horror. Is there a simpler way to accomplish the task?


Solution

  • If you need an external thread, consider the following code. It sets up a queue, starts a "send current time to queue" thread, then runs the main interface. The interface checks the shared queue every now and then, updating itself as necessary.

    When the interface exits, the outer code signals the thread to exit politely.

    source

    import logging, Queue, sys, threading, time
    
    import urwid
    
    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)-4s %(threadName)s %(message)s", 
        datefmt="%H:%M:%S",
        filename='trace.log',
    )
    
    class Interface:
        palette = [
            ('body', 'white', 'black'),
            ('ext', 'white', 'dark blue'),
            ('ext_hi', 'light cyan', 'dark blue', 'bold'),
            ]
    
        header_text = [
            ('ext_hi', 'ESC'), ':quit        ',
            ('ext_hi', 'UP'), ',', ('ext_hi', 'DOWN'), ':scroll',
            ]
    
        def __init__(self, msg_queue):
            self.header = urwid.AttrWrap(urwid.Text(self.header_text), 'ext')
            self.flowWalker = urwid.SimpleListWalker([])
            self.body = urwid.ListBox(self.flowWalker)
            self.footer = urwid.AttrWrap(urwid.Edit("Edit:  "), 'ext')
            self.view = urwid.Frame(
                urwid.AttrWrap(self.body, 'body'),
                header = self.header,
                footer = self.footer)
            self.loop = urwid.MainLoop(self.view, self.palette, 
                unhandled_input = self.unhandled_input)
            self.msg_queue = msg_queue
            self.check_messages(self.loop, None)
    
        def unhandled_input(self, k):
            if k == 'esc':
                raise urwid.ExitMainLoop()
    
        def check_messages(self, loop, *_args):
            """add message to bottom of screen"""
            loop.set_alarm_in(
                sec=0.5,
                callback=self.check_messages,
                )
            try:
                msg = self.msg_queue.get_nowait()
            except Queue.Empty:
                return
            self.flowWalker.append(
                urwid.Text(('body', msg))
                )
            self.body.set_focus(
                len(self.flowWalker)-1, 'above'
                )
    
    def update_time(stop_event, msg_queue):
        """send timestamp to queue every second"""
        logging.info('start')
        while not stop_event.wait(timeout=1.0):
            msg_queue.put( time.strftime('time %X') )
        logging.info('stop')
    
    if __name__ == '__main__':
    
        stop_ev = threading.Event()
        message_q = Queue.Queue()
    
        threading.Thread(
            target=update_time, args=[stop_ev, message_q],
            name='update_time',
        ).start()
    
        logging.info('start')
        Interface(message_q).loop.run()
        logging.info('stop')
    
        # after interface exits, signal threads to exit, wait for them
        logging.info('stopping threads')
    
        stop_ev.set()
        for th in threading.enumerate():
            if th != threading.current_thread():
                th.join()