0

I am working on two scripts, one that works on a laptop and another that works on a rasberry pi. It should handle the main menu on the laptop, send it to the pi which decodes it and writes it to a csv. That part works. Next it should run a cognitive test where 15 images are displayed in a random order, when the laptop presses y or n it should advance and the key pressed should be sent to the pi to log it. When I run it, the laptop script gets stuck at "sent key" print statement and the pi gets stuck "waiting for a response" over and over again.

I'm out of ideas for how to get it to show more than 1 or 2 images.

I tried using 2 ports, I tried using 1 port. I added print statements everywhere to pin point where it stops working. Something to note is that the key reading should only start once the user information has been sent.

I am working on two scripts, one that works on a laptop and another that works on a rasberry pi. It should handle the main menu on the laptop, send it to the pi which decodes it and writes it to a csv. That part works. Next it should run a cognitive test where 15 images are displayed in a random order, when the laptop presses y or n it should advance and the key pressed should be sent to the pi to log it. When I run it, the laptop script gets stuck at "sent key" print statement and the pi gets stuck "waiting for a response" over and over again.

Here's the laptop script (the pi script is below):

from PIL import Image
import os
import socket
import time
from pynput import keyboard
from nicegui import app, ui
import random

HOST = "100.120.18.53"  # The server's hostname or IP address
PORT = 65433  # The port used by the server for the main menu in particular
content = ui.column()

# Function to send user information
def send_info(participant_information):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((HOST, PORT))
            s.sendall(participant_information.encode('utf-8'))
            data = s.recv(1024)
        return data.decode('utf-8')
    except ConnectionRefusedError:
        print("Connection refused. Retrying...")
        time.sleep(1)
        return send_info(participant_information)

# Function to save user information
def save_user_info(sex, height, activity, number, append=False):
    if sex == "Female":
        participant_sex = "1"
    elif sex == "Male":
        participant_sex = "0"
    if activity == 'Drunk':
        participant_activity = "1"
    else:
        participant_activity = "0"
    participant_height = str(height)
    participant_number = str(number)
    participant_info = f"S{participant_sex}H{participant_height}A{participant_activity}N{participant_number}"
    print(f"User Info Saved as {participant_info}")
    send_info(participant_info)

# Function for user input interface in NiceGUI
def user_info():
    with ui.column():
        ui.label("Enter the Participant's Information").classes('text-2xl font-bold')
        number = ui.input("Participant's Number").classes("w-64")
        sex = ui.radio(["Male", "Female"]).classes("w-64")
        height = ui.input("Participant's Height in 3 digits [cm]").classes("w-64")
        ui.label("Select the activity:").classes('text-lg')
        activity = ui.select(['Drunk', 'Sober'], value=None)

        def submit():
            print(f"Submit button clicked! S{sex.value} H{height.value} A{activity.value} N{number.value}")
            save_user_info(sex.value, height.value, activity.value, number.value)
            ui.notify("User Info Saved!", color="green")
            # After saving user info, run the cognitive test code
            run_cognitive_test()

        ui.button("Save and Continue", on_click=submit).classes("text-lg bg-blue-500 text-white p-2 rounded-lg")

# Cognitive test variables and logic
HOST_COGNITIVE = "100.120.18.53"
PORT_COGNITIVE = 65432

image_numbers = list(range(0, 17)) #image 0 is explination, others are answers corresponding to the participant images

image_paths = [
    # fr"/Users/test/Documents/HITS/Cognitive/Cognitive Proctor Images/cognitive_page_{num}.png" # Triss
    # fr"C:\Users\richy\Downloads\cognitive\images\cognitive_page_{num}.png" # Richard
     fr"C:/Users/chane/Desktop/HITS/HITS/Cognitive/Cognitive Proctor Images/cognitive_page_{num}.png" # Chanel
    for num in image_numbers
]

opened_image = None
started = False
ended = False
completed = False

def send_keystroke(key):
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.connect((HOST, PORT))
            s.sendall(key.encode('utf-8'))
            data = s.recv(1024)
        return data.decode('utf-8')
    except ConnectionRefusedError:
        print("Connection refused. Retrying...")
        time.sleep(1)
        return send_keystroke(key)

def show_image(image_path):
    global opened_image
    if opened_image:
        opened_image.close()
    opened_image = Image.open(image_path)
    opened_image.show()

def on_press(key):
    global started, ended, completed
    print(f"starting cases started = {started}, ended = {ended}, completed = {completed}") # it stops here the second time this runs
    try:
        if key.char == 's' and not started:
            started = True
            print("Test started.")
            # Show the first image
            response = send_keystroke(key.char)
            print(f"Received image number: {response}")
            show_image(image_paths[int(response)])
            print("came back from show image for instructions")
        elif (key.char == 'y' or key.char == 'n') and started:
            # Send the 'y' or 'n' response to the server
            print("sent key:", key)
            response = send_keystroke(key.char)
            print(f"received key response {response}")
            if (response == 'end'):
                print("Test Complete")
                ended = True
                completed = True
            else:
                print(f"Received image number: {response}")
                print(f"Next image: {image_numbers[response]}")
                show_image(image_paths[int(response)])  # Show the next randomized image
        elif key.char == 'e':
            print("Exiting program.")
            ended = True
            completed = False
              # Allows the user to exit the test if 'e' is pressed, false flag indicates incomplete test
        else:
                print("Invalid Input")
                
    except AttributeError:
        pass
    print ("exiting cases")
    return 0


# Run cognitive test once user info is submitted
def run_cognitive_test():
    global started, image_numbers, completed
    print("Press 's' to start the randomized image sequence.")
    print("Press 'y' or 'n' to open the next image after starting.")
    print("Press 'e' to exit the program.")

    # Show the explanation image first (cognitive_page_0)
    show_image(image_paths[0])

    while(completed == False):
        # Start listening for key events
        print("waiting for keypress")
        with keyboard.Listener(on_press=on_press) as listener:
            listener.join()

    print("loop exited")

with content:
    user_info()

ui.run()


Here's the pi script: 

from PIL import Image
import os
import random
import time
import csv
import socket

HOST = "100.120.18.53"  # Server's hostname or IP address
PORT_MAIN = 65433  # Port used by the main menu server
# PORT_COGNITIVE = 65432  # Port used by the cognitive test server
session_ended = False  # Flag to indicate if the session has ended
csv_directory = "/home/hits/Documents/GitHub/HITS/csv_files"  # Folder to save CSV files

# Function to decode the message into user info
def decode_message(message):
    """Decode the message SbHnnnAbNnnnnn into its components."""
    print(message)
    sex = "Female" if message[1] == "1" else "Male"  # Sx -> 1 is female, 0 is male
    height = int(message[3:6])  # Hnnn -> height in cm
    activity = "Drunk" if message[7] == "1" else "Sober"  # Ax -> 1 is drunk, 0 is sober
    participant_number = int(message[9:len(message)])  # b -> participant number
    return sex, height, activity, participant_number

# Function to append cognitive data starting at column F
def append_cognitive_data(file_path, cognitive_data):
    """Append cognitive data to the existing user CSV file, starting at column F."""
    with open(file_path, "a", newline="") as csvfile:
        writer = csv.writer(csvfile)
        # Add the cognitive data starting from column F
        writer.writerow(cognitive_data)

# Function to open and display an image
opened_image = None  # Initialize the variable at the global level

def show_image(image_path):
    global opened_image  # Use the global opened_image variable
    if opened_image:
        opened_image.close()  # Close the previous image if it exists
    opened_image = Image.open(image_path)
    opened_image.show()

# Function to handle client connections for the main menu
def handle_main_menu_client(conn_main, addr):
    global session_ended
    with conn_main:
        print(f"Connected by {addr} (Main Menu)")

        while True:
            data = conn_main.recv(1024)
            if not data:
                break
            message = data.decode('utf-8') 
            if session_ended:
                continue

            sex, height, activity, participant_number = decode_message(message)
            
            # Generate the CSV filename based on the message and save it in the csv_directory
            file_name = f"{message}.csv"
            file_name = file_name[:255].replace(":", "_").replace(" ", "_")
            file_path = os.path.join(csv_directory, file_name)  # Full path to the CSV file

            # Ensure the directory exists
            os.makedirs(csv_directory, exist_ok=True)

            # Open file in append mode without checking size first
            with open(file_path, "a", newline="") as csvfile:
                writer = csv.writer(csvfile)
                if os.path.getsize(file_path) == 0:
                    writer.writerow(["Sex", "Height (cm)", "Activity", "Participant Number", "Timestamp", "Image Number", "Color", "Word", "Response", "Time Taken"])
                log_data = [sex, height, activity, participant_number, time.time()]
                writer.writerow(log_data)

            # Send back the received message as the response
            conn_main.sendall(message.encode('utf-8'))  # Send the received message back to the proctor

            return file_path  # Return the file path for use in the cognitive test

# Function to handle client connections for the cognitive test
def handle_cognitive_test(conn_main, file_path):
    global session_ended
    image_numbers = list(range(1, 17))  # Assuming 16 images
    random.shuffle(image_numbers)

    # Defining attributes for each image
    image_colour = {
        1: "blue", 2: "green", 3: "red", 4: "yellow", 5: "blue", 6: "green", 7: "red", 8: "yellow", 9: "blue", 10: "green", 
        11: "red", 12: "yellow", 13: "blue", 14: "green", 15: "red", 16: "yellow"}
    image_word = {
        1: "red", 2: "red", 3: "red", 4: "red", 5: "blue", 6: "blue", 7: "blue", 8: "blue", 9: "yellow", 10: "yellow", 
        11: "yellow", 12: "yellow", 13: "green", 14: "green", 15: "green", 16: "green"}

    # List of image paths based on shuffled numbers
    image_paths = [
        fr"/home/hits/Documents/GitHub/HITS/Cognitive/Cognitive Participant Images/page_{num}.png"
        for num in image_numbers
    ]

    current_index = 0  # Tracks the current image index
    start_time = time.time()  # Start timing
    session_ended = False  # Flag to indicate if the session has ended

    with conn_main:
        print(f"Connected by {addr_main}")
        while True:
            
            print("top of while true")

            data_available = False

            while not data_available:
                data = conn_main.recv(1024)
                print("waiting for data")
                if data:
                    data_available = True
                    print("data received")

            #if not data: # this is the way it should be done but the new way is more informative
                #print("not data break trip")
                #break

            key = data.decode('utf-8')

            # Check if session has ended
            if session_ended == True:
                print("Session ended.")
                break

            end_time = time.time()
            time_taken = end_time - start_time

            print(f"Key Received: {key}") # Debug print

            if key == "s": 
                # Show the current image
                response = str(image_numbers[current_index])
                show_image(image_paths[current_index])
                start_time = time.time()  # Reset start time for the next image
                current_index += 1
            elif key in ["y", "n"]:
                if current_index < len(image_numbers):
                    response = str(image_numbers[current_index])
                    # Show next image
                    show_image(image_paths[current_index])
                    start_time = time.time()  # Reset start time for the next image
                    current_index += 1
                else:
                    # End the test when all images are shown
                    session_ended = True
                    print("Test completed.")
                    response = "end"
                    conn_main.sendall(response.encode('utf-8'))
                    break  # Exit the loop after all images are shown
            else:
                print(f"Invalid key pressed: {key}")
            
            print("logging data")
            # Log the data after each keystroke and image
            image_num = image_numbers[current_index - 1]
            colour = image_colour[image_num]
            word = image_word[image_num]
            # Additional data logging here (cognitive data)
            cognitive_data = [image_num, colour, word, key, time_taken]
            append_cognitive_data(file_path, cognitive_data)
            print("finished logging")
            conn_main.sendall(response.encode('utf-8')) 
            print("sent response")

# Main server code
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_main:
    s_main.bind((HOST, PORT_MAIN))
    s_main.listen()
    print(f"Server listening on {HOST}:{PORT_MAIN}")
    
    while True:
        conn_main, addr_main = s_main.accept()  # Accept main menu connection
        file_path = handle_main_menu_client(conn_main, addr_main)  # Handle main menu client and get file path
        conn_main, addr_main = s_main.accept()  # Accept cognitive test connection
        handle_cognitive_test(conn_main, file_path)  # Handle cognitive test client with file path
        print("finished calling handle_cognitive_test") 
        


I'm out of ideas for how to get it to show more than 1 or 2 images.

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.