I am working on a project that involves controlling multiple Android devices simultaneously from a Python script. For now I am testing with Android Studio emulators before moving to real devices, and I am having trouble getting multithreading to work with them. I’ve looked at Appium and Selenium server, but I have not found clear documentation or a straightforward workflow that matches my needs.
I tried using ADB and Appium, but I could only control one phone at a time.
This has stalled my progress, and I welcome any suggestions that would simplify controlling several Android emulators at once from Python. The emulators should not all perform the same tasks; each one should follow its own procedure defined by the Python script. I’m open to completely changing the workflow I tried to implement.
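To make the goal concrete, here is a minimal sketch of what "a different procedure per device" could look like: each device serial is mapped to its own task function, and the tasks run in parallel threads. The serials and the task functions (`login_flow`, `browse_flow`, `run_all`) are hypothetical placeholders; real tasks would drive a device instead of returning a string.

```python
from concurrent.futures import ThreadPoolExecutor


# Hypothetical per-device tasks; real ones would send commands to the device.
def login_flow(serial):
    return f"{serial}: login flow done"


def browse_flow(serial):
    return f"{serial}: browse flow done"


# Hypothetical mapping of emulator serial -> task to run on that emulator.
TASKS = {
    "emulator-5554": login_flow,
    "emulator-5556": browse_flow,
}


def run_all(tasks):
    """Run each device's task in its own thread and collect results by serial."""
    # max(1, ...) avoids a ValueError when the mapping is empty
    with ThreadPoolExecutor(max_workers=max(1, len(tasks))) as executor:
        futures = {
            serial: executor.submit(task, serial)
            for serial, task in tasks.items()
        }
        # result() re-raises any exception that occurred in the worker thread
        return {serial: future.result() for serial, future in futures.items()}


if __name__ == "__main__":
    for outcome in run_all(TASKS).values():
        print(outcome)
```

Using `submit` with one future per serial, rather than `map`, keeps each result tied to the device it came from and surfaces worker exceptions when `result()` is called.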
Below is my attempt to retrieve connected devices using ADB and an initial try to launch Appium servers for each device:
import subprocess

# adb_path (path to the adb executable) is defined elsewhere in my script

# Function to get connected devices
def get_connected_devices():
    devices_configs = []
    try:
        output = subprocess.check_output([adb_path, 'devices']).decode('utf-8')
        # Skip the "List of devices attached" header line
        for line in output.strip().splitlines()[1:]:
            parts = line.split('\t')
            # Ignore blank lines and devices that are offline or unauthorized
            if len(parts) < 2 or parts[1].strip() != 'device':
                continue
            ud_id = parts[0]
            os_version = (
                subprocess.check_output(
                    [adb_path, '-s', ud_id, 'shell', 'getprop', 'ro.build.version.release']
                )
                .decode('utf-8')
                .strip()
            )
            platform = "Android"
            # Give each device its own ports, stored as integers
            system_port = 8200 + len(devices_configs)
            chrome_driver_port = 8100 + len(devices_configs)
            device_details = {
                "device": ud_id,
                "os_version": os_version,
                "ud_id": ud_id,
                "platform": platform,
                "systemPort": system_port,
                "chromedriverPort": chrome_driver_port,
            }
            devices_configs.append(device_details)
    except subprocess.CalledProcessError as e:
        print(f"Error executing ADB command: {e}")
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
    # Always return a list, even after an error
    return devices_configs
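# Retrieve connected devices
connected_devices = get_connected_devices()

# Start one Appium server per device, each on its own port
processes = []
servers = []
for i in range(len(connected_devices)):
    port = 4724 + i
    # "start" is a Windows cmd built-in, so this needs shell=True
    cmd = ["start", "appium", "-p", str(port)]
    process = subprocess.Popen(cmd, shell=True)
    processes.append(process)
    # URL passed to webdriver.Remote later (Appium 1.x also needs the /wd/hub suffix)
    servers.append(f"http://127.0.0.1:{port}")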
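The device discovery and port assignment above can also be written as two small pure functions, which makes them checkable without any emulator running. This is only a sketch: `parse_adb_devices` and `plan_ports` are hypothetical helper names, the sample text imitates typical `adb devices` output, and the base ports simply mirror the values used above.

```python
def parse_adb_devices(output):
    """Return the serials of devices whose state is 'device' (ready to use)."""
    serials = []
    for line in output.splitlines():
        parts = line.split()
        # Device lines have exactly two fields: "<serial> <state>".
        # The header line and blank lines do not, so they are skipped.
        if len(parts) == 2 and parts[1] == "device":
            serials.append(parts[0])
    return serials


def plan_ports(serials, appium_base=4724, system_base=8200):
    """Give every device its own Appium server port and systemPort."""
    return [
        {"udid": serial, "appium_port": appium_base + i, "system_port": system_base + i}
        for i, serial in enumerate(serials)
    ]


if __name__ == "__main__":
    # Made-up sample in the shape printed by `adb devices`
    sample = (
        "List of devices attached\n"
        "emulator-5554\tdevice\n"
        "emulator-5556\toffline\n"
        "emulator-5558\tdevice\n"
    )
    for plan in plan_ports(parse_adb_devices(sample)):
        print(plan)
```

Filtering on the `device` state keeps offline or unauthorized entries from receiving a port and a session they cannot use.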
Even with one server per device, I could not get Appium to use a separate port for each device; it kept using the same port for all of them:
import concurrent.futures
from appium import webdriver
from appium.options.android import UiAutomator2Options

# Function to start Appium session for each device
# (defined before it is used; APP_PATH is set elsewhere in my script)
def start_appium_session(device_config_server_tuple):
    device_config, server = device_config_server_tuple
    capabilities = {
        'platformName': device_config['platform'],
        'platformVersion': device_config['os_version'],
        'deviceName': device_config['ud_id'],
        'systemPort': device_config['systemPort'],
        'app': APP_PATH,
    }
    driver = webdriver.Remote(server, options=UiAutomator2Options().load_capabilities(capabilities))
    return driver

# Pair each connected device with a server
device_server_pairs = list(zip(connected_devices, servers))

# Start Appium sessions using threads
# list() consumes the results so that exceptions raised in the workers show up here
with concurrent.futures.ThreadPoolExecutor() as executor:
    drivers = list(executor.map(start_appium_session, device_server_pairs))
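A possible cause, not verified against this setup, is that the capabilities above never name a device: the UiAutomator2 driver picks the device from `udid`, and when `udid` is unset it falls back to the first connected device, while `deviceName` does not select anything on Android. A minimal sketch of per-device capabilities follows; `build_capabilities` is a hypothetical helper, and the device entries and app path are made-up placeholders. The capability names themselves (`appium:udid`, `appium:systemPort`, `appium:automationName`) are the ones documented for the UiAutomator2 driver.

```python
def build_capabilities(device_config, app_path):
    """Build W3C-style capabilities that pin one session to one device."""
    return {
        "platformName": "Android",
        "appium:automationName": "UiAutomator2",
        # udid selects the device; without it the first connected device is used
        "appium:udid": device_config["ud_id"],
        "appium:platformVersion": device_config["os_version"],
        # Must be unique for every parallel session on the same host
        "appium:systemPort": int(device_config["systemPort"]),
        "appium:app": app_path,
    }


if __name__ == "__main__":
    # Made-up entries in the shape returned by get_connected_devices()
    devices = [
        {"ud_id": "emulator-5554", "os_version": "13", "systemPort": 8200},
        {"ud_id": "emulator-5556", "os_version": "13", "systemPort": 8201},
    ]
    for device in devices:
        print(build_capabilities(device, "/path/to/app.apk"))
```

A dictionary built this way could be passed to `UiAutomator2Options().load_capabilities(...)` in place of the one in `start_appium_session`.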
I appreciate any insights or improvements you could provide.