0

I have a script that initiates two classes (control of a led strip and temp/hum sensor). Each class runs a while loop that can be terminated with signal_handler() which basically calls sys.exit(0). I was thinking about handling the exit of the main program with signal_handler() as I did for the classes themselves. However, when I try to CTRL + C out of the script, the program exits with error (see below the code) and the lights program doesn't exit properly (i.e., lights are still on when they should be off if exiting gracefully).

import threading
from light_controller import LightController
from thermometer import Thermometer
import signal

def signal_handler():
    print("\nhouse.py terminated with Ctrl+C.")
    if l_thread.is_alive():
        l_thread.join()
    if t_thread.is_alive():
        t_thread.join()
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

lights = LightController()
temp = Thermometer()

t_thread = threading.Thread(target = temp.run)
t_thread.daemon = True
t_thread.start()

l_thread = threading.Thread(target = lights.run)
l_thread.daemon = True
l_thread.start()

Thermometer() terminated with Ctrl+C.
Exception ignored in: <module 'threading' from '/usr/lib/python3.7/threading.py'>
Traceback (most recent call last):
  File "/usr/lib/python3.7/threading.py", line 1281, in _shutdown
    t.join()
  File "/usr/lib/python3.7/threading.py", line 1032, in join
    self._wait_for_tstate_lock()
  File "/usr/lib/python3.7/threading.py", line 1048, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
  File "/home/pi/Desktop/house/thermometer.py", line 51, in signal_handler
    sys.exit(0)

My take is that this is happening because I have the signal_handler() replicated in the two classes and the main program. Both classes will run infinite loops and might be used by themselves, so I rather keep the signal_handler() inside each of the two classes. I'm not sure if it's possible to actually keep it like this. I also don't know if sys.exit() is actually the way to get out without causing errors down the line. I am OK with using a different exit method for the main program house.py instead of CTRL+C.

Update

Thank you for the spellcheck!

Here's the code for the classes.

thermometer.py

from luma.core.interface.serial import i2c
from luma.core.render import canvas
from luma.oled.device import ssd1306, ssd1325, ssd1331, sh1106
from luma.core.error import DeviceNotFoundError
import os
import time
import signal
import sys
import socket
from PIL import ImageFont, ImageDraw


# adafruit 
import board
import busio
from adafruit_htu21d import HTU21D


class Thermometer(object):
    """docstring for Thermometer"""
    def __init__(self):
        super(Thermometer, self).__init__()
        # TODO: Check for pixelmix.ttf in folder
        self.drawfont = "pixelmix.ttf"
        self.sleep_secs = 30
        try:
            signal.signal(signal.SIGINT, self.signal_handler)
            self.serial = i2c(port=1, address=0x3C)
            self.oled_device = ssd1306(self.serial, rotate=0)
        except DeviceNotFoundError:
            print("I2C mini OLED display not found.")
            sys.exit(1)
        try:
            # Create library object using our Bus I2C port
            #self.i2c_port = busio.I2C(board.SCL, board.SDA)
            #self.temp_sensor = HTU21D(self.i2c_port)
            print("Running temp in debug mode")
        except ValueError:
            print("Temperature sensor not found")
            sys.exit(1)

    def getIP(self):
        s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        s.connect(("8.8.8.8", 80))
        ip = s.getsockname()[0]
        s.close()
        return ip

    def signal_handler(self, sig, frame):
            print("\nThermometer() terminated with Ctrl+C.")
            sys.exit(0)

    def run(self):
        try:
            while True:
                # Measure things
                temp_value = 25
                hum_value = 50
                #temp_value = round(self.temp_sensor.temperature, 1)
                #hum_value = round(self.temp_sensor.relative_humidity, 1)
                # Display results
                with canvas(self.oled_device) as draw:
                    draw.rectangle(self.oled_device.bounding_box, outline="white", fill="black")
                    font = ImageFont.truetype(self.drawfont, 10)
                    ip = self.getIP()
                    draw.text((5, 5), "IP: " + ip, fill="white", font=font)
                    font = ImageFont.truetype(self.drawfont, 12)
                    draw.text((5, 20), f"T: {temp_value} C", fill="white", font=font)
                    draw.text((5, 40), f"H: {hum_value}%", fill="white", font=font)
                # TODO ADD SAVING Here
                time.sleep(self.sleep_secs)
        except SystemExit:
            print("Exiting...")
            sys.exit(0)
        except:
            print("Unexpected error:", sys.exc_info()[0])
            sys.exit(2)

if __name__ == '__main__':
    thermo = Thermometer()
    thermo.run()

light_controller.py

import RPi.GPIO as GPIO
import time
import signal
import datetime
import sys

class LightController(object):
    """docstring for LightController"""
    def __init__(self):
        super(LightController, self).__init__()
        signal.signal(signal.SIGTERM, self.safe_exit)
        signal.signal(signal.SIGHUP, self.safe_exit)
        signal.signal(signal.SIGINT, self.safe_exit)
        self.red_pin = 9
        self.green_pin = 11
        # might be white pin if hooking up a white LED here
        self.blue_pin = 10
        GPIO.setmode(GPIO.BCM)
        GPIO.setwarnings(False)
        GPIO.setup(self.red_pin, GPIO.OUT)
        GPIO.setup(self.green_pin, GPIO.OUT)
        GPIO.setup(self.blue_pin, GPIO.OUT)

        self.pwm_red = GPIO.PWM(self.red_pin, 500)  # We need to activate PWM on LED so we can dim, use 1000 Hz 
        self.pwm_green = GPIO.PWM(self.green_pin, 500)  
        self.pwm_blue  = GPIO.PWM(self.blue_pin, 500)
        # Start PWM at 0% duty cycle (off)
        self.pwm_red.start(0)
        self.pwm_green.start(0)
        self.pwm_blue.start(0)

        self.pin_zip = zip([self.red_pin, self.green_pin, self.blue_pin], 
            [self.pwm_red, self.pwm_green, self.pwm_blue])

        # Config lights on-off cycle here
        self.lights_on = 7
        self.lights_off = 19
        
        print(f"Initalizing LightController with lights_on: {self.lights_on}h & lights_off: {self.lights_off}h")
        print("------------------------------")

    def change_intensity(self, pwm_object, intensity):
        pwm_object.ChangeDutyCycle(intensity)

    def run(self):
        while True:
            #for pin, pwm_object in self.pin_zip:
            #   pwm_object.ChangeDutyCycle(100)
            #   time.sleep(10)
            #   pwm_object.ChangeDutyCycle(20)
            #   time.sleep(10)
            #   pwm_object.ChangeDutyCycle(0)
            current_hour = datetime.datetime.now().hour
            # evaluate between
            if self.lights_on <= current_hour <= self.lights_off:
                self.pwm_blue.ChangeDutyCycle(100)
            else:
                self.pwm_blue.ChangeDutyCycle(0)
            # run this once a second
            time.sleep(1)

    # ------- Safe Exit ---------- #
    def safe_exit(self, signum, frame):
        print("\nLightController() terminated with Ctrl+C.")
        sys.exit(0)

if __name__ == '__main__':
    controller = LightController()
    controller.run()


5
  • You might want to change the spelling of deamon to daemon first, to see if that makes a difference... :) Commented May 31, 2022 at 13:57
  • Either way: please also show your thermometer and light_controller code. In general you'd have your main thread signal the other threads to stop their infinite loops. Commented May 31, 2022 at 13:59
  • @AKX Thank you for the comments, I guess now it goes directly to segmentation fault. I might need to hook up a while True to keep the classes running in the main program Commented May 31, 2022 at 14:05
  • Right – considering there's no need for those classes to have internal loops of their own, I'd really recommend switching to a while True: thermometer.step(); light_controller.step(); time.sleep(1) sort of thing in your main program. Commented May 31, 2022 at 14:07
  • When you say "no need for them to have internal loops on their own", wouldn't I need that to be able to just call python3 thermometer.py ? Commented May 31, 2022 at 14:09

1 Answer 1

1

Option 1: Threading is hard

To expand on what I mean with "no internal loops" – threading is hard, so let's do something else instead.

  1. I've added __enter__ and __exit__ to the Thermometer and LightController classes here; this makes them usable as context managers (i.e. with the with block). This is useful when you have objects that "own" other resources; in this case, the thermometer owns the serial device and the light controller touches GPIO.
  2. Then, instead of each class having .run(), where they'd stay forever, let's have the "outer" program control that: it runs in a forever while loop, and asks each "device" to do its thing before waiting for a second again. (You could also use the stdlib sched module to have the classes register functions to run at different intervals, or be otherwise clever if the different classes happen to need different check intervals.)
  3. Since there are no threads, there's no need to set up signal handlers either; a ctrl+c in the program bubbles up a KeyboardInterrupt exception like regular, and the with blocks' __exit__ handlers get their chance of cleaning up.
class Thermometer:
    def __enter__(self):
        self.serial = ...
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # TODO: Cleanup the i2c/ssd devices
        pass

    def step(self):
        """ Measure and draw things """
        # Measure things...
        # Draw things...


class LightController:
    def __enter__(self):
        GPIO.setmode(...)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # TODO: cleanup GPIO
        pass

    def step(self):
        current_hour = datetime.datetime.now().hour
        # etc...


def main():
    with LightController() as lights, Thermometer() as temp:
        while True:
            lights.step()
            temp.step()
            time.sleep(1)


if __name__ == '__main__':
    main()

Option 2: Threading is hard but let's do it anyway

Another option, to have your threads cooperate and shut down when you want to, is to use an Event to control their internal loops.

The idea here is that instead of time.sleep() in the loops, you have Event.wait() doing the waiting, since it accepts an optional timeout to hang around for to wait for the event being set (or not). In fact, on some OSes, time.sleep() is implemented as having the thread wait on an anonymous event.

When you want the threads to quit, you set the stop event, and they'll finish up what they're doing.

I've also packaged this concept up into a "DeviceThread" here for convenience's sake.

import threading
import time


class DeviceThread(threading.Thread):
    interval = 1

    def __init__(self, stop_event):
        super().__init__(name=self.__class__.__name__)
        self.stop_event = stop_event

    def step(self):
        pass

    def initialize(self):
        pass

    def cleanup(self):
        pass

    def run(self):
        try:
            self.initialize()
            while not self.stop_event.wait(self.interval):
                self.step()
        finally:
            self.cleanup()


class ThermometerThread(DeviceThread):
    def initialize(self):
        self.serial = ...

    def cleanup(self):
        ...  # close serial port

    def step(self):
        ...  # measure and draw


def main():
    stop_event = threading.Event()
    threads = [ThermometerThread(stop_event)]
    for thread in threads:
        thread.start()
    try:
        while True:
            # Nothing to do in the main thread...
            time.sleep(1)
    except KeyboardInterrupt:
        print("Caught keyboard interrupt, stopping threads")
        stop_event.set()
    for thread in threads:
        print(f"Waiting for {thread.name} to stop")
        thread.join()
Sign up to request clarification or add additional context in comments.

3 Comments

Thank you I think I see your point, I will reconsider the code. And get back to accept the answer
You're welcome, @MatiasAndina! Please see my edit though, if you would like to keep on using threads :)
I ended up going with option 1 because it was the fastest way to get a working example, now the exit works as expected with no errors and everything shutting down. I have to add data saving and I guess the handling of the exit will be important to trigger save on __exit__() but I think this option should work fine :)

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.