I am working on two scripts, one that works on a laptop and another that works on a rasberry pi. It should handle the main menu on the laptop, send it to the pi which decodes it and writes it to a csv. That part works. Next it should run a cognitive test where 15 images are displayed in a random order, when the laptop presses y or n it should advance and the key pressed should be sent to the pi to log it. When I run it, the laptop script gets stuck at "sent key" print statement and the pi gets stuck "waiting for a response" over and over again.
I'm out of ideas for how to get it to show more than 1 or 2 images.
I tried using 2 ports, I tried using 1 port. I added print statements everywhere to pin point where it stops working. Something to note is that the key reading should only start once the user information has been sent.
I am working on two scripts, one that works on a laptop and another that works on a rasberry pi. It should handle the main menu on the laptop, send it to the pi which decodes it and writes it to a csv. That part works. Next it should run a cognitive test where 15 images are displayed in a random order, when the laptop presses y or n it should advance and the key pressed should be sent to the pi to log it. When I run it, the laptop script gets stuck at "sent key" print statement and the pi gets stuck "waiting for a response" over and over again.
Here's the laptop script (the pi script is below):
from PIL import Image
import os
import socket
import time
from pynput import keyboard
from nicegui import app, ui
import random
HOST = "100.120.18.53" # The server's hostname or IP address
PORT = 65433 # The port used by the server for the main menu in particular
content = ui.column()
# Function to send user information
def send_info(participant_information):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall(participant_information.encode('utf-8'))
data = s.recv(1024)
return data.decode('utf-8')
except ConnectionRefusedError:
print("Connection refused. Retrying...")
time.sleep(1)
return send_info(participant_information)
# Function to save user information
def save_user_info(sex, height, activity, number, append=False):
if sex == "Female":
participant_sex = "1"
elif sex == "Male":
participant_sex = "0"
if activity == 'Drunk':
participant_activity = "1"
else:
participant_activity = "0"
participant_height = str(height)
participant_number = str(number)
participant_info = f"S{participant_sex}H{participant_height}A{participant_activity}N{participant_number}"
print(f"User Info Saved as {participant_info}")
send_info(participant_info)
# Function for user input interface in NiceGUI
def user_info():
with ui.column():
ui.label("Enter the Participant's Information").classes('text-2xl font-bold')
number = ui.input("Participant's Number").classes("w-64")
sex = ui.radio(["Male", "Female"]).classes("w-64")
height = ui.input("Participant's Height in 3 digits [cm]").classes("w-64")
ui.label("Select the activity:").classes('text-lg')
activity = ui.select(['Drunk', 'Sober'], value=None)
def submit():
print(f"Submit button clicked! S{sex.value} H{height.value} A{activity.value} N{number.value}")
save_user_info(sex.value, height.value, activity.value, number.value)
ui.notify("User Info Saved!", color="green")
# After saving user info, run the cognitive test code
run_cognitive_test()
ui.button("Save and Continue", on_click=submit).classes("text-lg bg-blue-500 text-white p-2 rounded-lg")
# Cognitive test variables and logic
HOST_COGNITIVE = "100.120.18.53"
PORT_COGNITIVE = 65432
image_numbers = list(range(0, 17)) #image 0 is explination, others are answers corresponding to the participant images
image_paths = [
# fr"/Users/test/Documents/HITS/Cognitive/Cognitive Proctor Images/cognitive_page_{num}.png" # Triss
# fr"C:\Users\richy\Downloads\cognitive\images\cognitive_page_{num}.png" # Richard
fr"C:/Users/chane/Desktop/HITS/HITS/Cognitive/Cognitive Proctor Images/cognitive_page_{num}.png" # Chanel
for num in image_numbers
]
opened_image = None
started = False
ended = False
completed = False
def send_keystroke(key):
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((HOST, PORT))
s.sendall(key.encode('utf-8'))
data = s.recv(1024)
return data.decode('utf-8')
except ConnectionRefusedError:
print("Connection refused. Retrying...")
time.sleep(1)
return send_keystroke(key)
def show_image(image_path):
global opened_image
if opened_image:
opened_image.close()
opened_image = Image.open(image_path)
opened_image.show()
def on_press(key):
global started, ended, completed
print(f"starting cases started = {started}, ended = {ended}, completed = {completed}") # it stops here the second time this runs
try:
if key.char == 's' and not started:
started = True
print("Test started.")
# Show the first image
response = send_keystroke(key.char)
print(f"Received image number: {response}")
show_image(image_paths[int(response)])
print("came back from show image for instructions")
elif (key.char == 'y' or key.char == 'n') and started:
# Send the 'y' or 'n' response to the server
print("sent key:", key)
response = send_keystroke(key.char)
print(f"received key response {response}")
if (response == 'end'):
print("Test Complete")
ended = True
completed = True
else:
print(f"Received image number: {response}")
print(f"Next image: {image_numbers[response]}")
show_image(image_paths[int(response)]) # Show the next randomized image
elif key.char == 'e':
print("Exiting program.")
ended = True
completed = False
# Allows the user to exit the test if 'e' is pressed, false flag indicates incomplete test
else:
print("Invalid Input")
except AttributeError:
pass
print ("exiting cases")
return 0
# Run cognitive test once user info is submitted
def run_cognitive_test():
global started, image_numbers, completed
print("Press 's' to start the randomized image sequence.")
print("Press 'y' or 'n' to open the next image after starting.")
print("Press 'e' to exit the program.")
# Show the explanation image first (cognitive_page_0)
show_image(image_paths[0])
while(completed == False):
# Start listening for key events
print("waiting for keypress")
with keyboard.Listener(on_press=on_press) as listener:
listener.join()
print("loop exited")
with content:
user_info()
ui.run()
Here's the pi script:
from PIL import Image
import os
import random
import time
import csv
import socket
HOST = "100.120.18.53" # Server's hostname or IP address
PORT_MAIN = 65433 # Port used by the main menu server
# PORT_COGNITIVE = 65432 # Port used by the cognitive test server
session_ended = False # Flag to indicate if the session has ended
csv_directory = "/home/hits/Documents/GitHub/HITS/csv_files" # Folder to save CSV files
# Function to decode the message into user info
def decode_message(message):
"""Decode the message SbHnnnAbNnnnnn into its components."""
print(message)
sex = "Female" if message[1] == "1" else "Male" # Sx -> 1 is female, 0 is male
height = int(message[3:6]) # Hnnn -> height in cm
activity = "Drunk" if message[7] == "1" else "Sober" # Ax -> 1 is drunk, 0 is sober
participant_number = int(message[9:len(message)]) # b -> participant number
return sex, height, activity, participant_number
# Function to append cognitive data starting at column F
def append_cognitive_data(file_path, cognitive_data):
"""Append cognitive data to the existing user CSV file, starting at column F."""
with open(file_path, "a", newline="") as csvfile:
writer = csv.writer(csvfile)
# Add the cognitive data starting from column F
writer.writerow(cognitive_data)
# Function to open and display an image
opened_image = None # Initialize the variable at the global level
def show_image(image_path):
global opened_image # Use the global opened_image variable
if opened_image:
opened_image.close() # Close the previous image if it exists
opened_image = Image.open(image_path)
opened_image.show()
# Function to handle client connections for the main menu
def handle_main_menu_client(conn_main, addr):
global session_ended
with conn_main:
print(f"Connected by {addr} (Main Menu)")
while True:
data = conn_main.recv(1024)
if not data:
break
message = data.decode('utf-8')
if session_ended:
continue
sex, height, activity, participant_number = decode_message(message)
# Generate the CSV filename based on the message and save it in the csv_directory
file_name = f"{message}.csv"
file_name = file_name[:255].replace(":", "_").replace(" ", "_")
file_path = os.path.join(csv_directory, file_name) # Full path to the CSV file
# Ensure the directory exists
os.makedirs(csv_directory, exist_ok=True)
# Open file in append mode without checking size first
with open(file_path, "a", newline="") as csvfile:
writer = csv.writer(csvfile)
if os.path.getsize(file_path) == 0:
writer.writerow(["Sex", "Height (cm)", "Activity", "Participant Number", "Timestamp", "Image Number", "Color", "Word", "Response", "Time Taken"])
log_data = [sex, height, activity, participant_number, time.time()]
writer.writerow(log_data)
# Send back the received message as the response
conn_main.sendall(message.encode('utf-8')) # Send the received message back to the proctor
return file_path # Return the file path for use in the cognitive test
# Function to handle client connections for the cognitive test
def handle_cognitive_test(conn_main, file_path):
global session_ended
image_numbers = list(range(1, 17)) # Assuming 16 images
random.shuffle(image_numbers)
# Defining attributes for each image
image_colour = {
1: "blue", 2: "green", 3: "red", 4: "yellow", 5: "blue", 6: "green", 7: "red", 8: "yellow", 9: "blue", 10: "green",
11: "red", 12: "yellow", 13: "blue", 14: "green", 15: "red", 16: "yellow"}
image_word = {
1: "red", 2: "red", 3: "red", 4: "red", 5: "blue", 6: "blue", 7: "blue", 8: "blue", 9: "yellow", 10: "yellow",
11: "yellow", 12: "yellow", 13: "green", 14: "green", 15: "green", 16: "green"}
# List of image paths based on shuffled numbers
image_paths = [
fr"/home/hits/Documents/GitHub/HITS/Cognitive/Cognitive Participant Images/page_{num}.png"
for num in image_numbers
]
current_index = 0 # Tracks the current image index
start_time = time.time() # Start timing
session_ended = False # Flag to indicate if the session has ended
with conn_main:
print(f"Connected by {addr_main}")
while True:
print("top of while true")
data_available = False
while not data_available:
data = conn_main.recv(1024)
print("waiting for data")
if data:
data_available = True
print("data received")
#if not data: # this is the way it should be done but the new way is more informative
#print("not data break trip")
#break
key = data.decode('utf-8')
# Check if session has ended
if session_ended == True:
print("Session ended.")
break
end_time = time.time()
time_taken = end_time - start_time
print(f"Key Received: {key}") # Debug print
if key == "s":
# Show the current image
response = str(image_numbers[current_index])
show_image(image_paths[current_index])
start_time = time.time() # Reset start time for the next image
current_index += 1
elif key in ["y", "n"]:
if current_index < len(image_numbers):
response = str(image_numbers[current_index])
# Show next image
show_image(image_paths[current_index])
start_time = time.time() # Reset start time for the next image
current_index += 1
else:
# End the test when all images are shown
session_ended = True
print("Test completed.")
response = "end"
conn_main.sendall(response.encode('utf-8'))
break # Exit the loop after all images are shown
else:
print(f"Invalid key pressed: {key}")
print("logging data")
# Log the data after each keystroke and image
image_num = image_numbers[current_index - 1]
colour = image_colour[image_num]
word = image_word[image_num]
# Additional data logging here (cognitive data)
cognitive_data = [image_num, colour, word, key, time_taken]
append_cognitive_data(file_path, cognitive_data)
print("finished logging")
conn_main.sendall(response.encode('utf-8'))
print("sent response")
# Main server code
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s_main:
s_main.bind((HOST, PORT_MAIN))
s_main.listen()
print(f"Server listening on {HOST}:{PORT_MAIN}")
while True:
conn_main, addr_main = s_main.accept() # Accept main menu connection
file_path = handle_main_menu_client(conn_main, addr_main) # Handle main menu client and get file path
conn_main, addr_main = s_main.accept() # Accept cognitive test connection
handle_cognitive_test(conn_main, file_path) # Handle cognitive test client with file path
print("finished calling handle_cognitive_test")
I'm out of ideas for how to get it to show more than 1 or 2 images.