I have a multithreaded script that operates on a data set. Each thread gets a chunk of the data set, iterates over its data frame, and calls an API (MS Graph Create) for every row. What I have seen is that my script tends to get stuck when it is almost finished. I am running this on an Ubuntu Linux server with 8 vCPUs, but it only happens when the total data set size is in the millions (around 9-10 hours for 2 million records).
I am writing a long-running script for the first time and would like an opinion on whether I am doing things correctly. Specifically:
- Is my code the reason the script hangs?
- Have I done the multithreading correctly? Have I created the threads, and waited for them to finish, correctly?
UPDATE: Even after applying the answers below, the threads still seem to get stuck at the end.
import pandas as pd
import sys
import os
import logging
import string
import secrets
import random
import requests   # used for all Graph API calls below
import threading  # manual thread management
import time       # throttling sleeps and timing
##### ----- Logging Setup -------
logging.basicConfig(filename="pylogs.log", format='%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
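# the logging module is itself thread-safe, so the worker threads below can call logger.error() concurrently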
# Creating an object
logger = logging.getLogger()
# Setting the threshold of logger to DEBUG
logger.setLevel(logging.ERROR)
#####------ Function Definitions -------
# generates random password
def generateRandomPassword(lengthOfPassword):
    # letters, digits and punctuation, chosen with the cryptographically secure secrets module
    alphabet = string.ascii_letters + string.digits + string.punctuation
    return ''.join(secrets.choice(alphabet) for _ in range(lengthOfPassword))
# the most important function
#
def createAccounts(splitData, threadID):
batchProgress = 0
batch_size = splitData.shape[0]
for row in splitData.itertuples():
try:
headers = {"Content-Type": "application/json", "Authorization":"Bearer "+access_token}
randomLength = [8,9,12,13,16]
passwordLength = random.choice(randomLength)
password = generateRandomPassword(passwordLength) # will be generated randomly - for debugging purpose
batchProgress+=1
post_request_body = {
"accountEnabled": True,
"displayName": row[5],
"givenName": row[3],
"surname": row[4],
"mobilePhone": row[1],
"mail": row[2],
"passwordProfile" : {
"password": password,
"forceChangePasswordNextSignIn": False
},
"state":"",
"identities": [
{
"signInType": "emailAddress",
"issuer": tenantName,
"issuerAssignedId": row[2]
}
]
}
# if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty
            if(len(row[1])):  # check the phone number itself (row[1]); row[4] is the surname
post_request_body["identities"].append({"signInType": "phoneNumber","issuer": tenantName,"issuerAssignedId": row[1]})
responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
status = responseFromApi.status_code
if(status == 201): #success
id = responseFromApi.json().get("id")
print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Success'
elif(status == 429): #throttling issues
print(f" Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
time.sleep(150)
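                # note: after the sleep this row is not retried; it is only logged as 'Throttled' below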
elif(status == 401): #token expiry
print(f" Thread {threadID} | Token Expired. Getting it back !")
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
getRefreshToken()
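                # getRefreshToken() presumably refreshes the shared access_token; several
                # threads can hit a 401 at once and refresh concurrently, and this row is
                # not retried with the new token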
else: #any other error
msg = ""
try:
msg = responseFromApi.json().get("error").get("message")
except Exception as e:
msg = f"Error {e}"
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
logger.error(errorDict)
except Exception as e:
# check for refresh token errors
errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
logger.error(errorDict)
msg = " Error "
print(f" {status} | {batchProgress} / {batch_size} | {msg} {row[2]}")
print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
batchProgress = 0
###### ------ Main Script ------
if __name__ == "__main__":
# get file name and appid from command line arguments
storageFileName = sys.argv[1]
appId = sys.argv[2]
# setup credentials
bigFilePath = f"./{storageFileName}"
B2C_Tenant_Name = "tenantName"
tenantName = B2C_Tenant_Name + ".onmicrosoft.com"
applicationID = appId
accessSecret = "" # will be taken from command line in future revisions
token_api_body = {
"grant_type": "client_credentials",
"scope": "https://graph.microsoft.com/.default",
"client_Id" : applicationID,
"client_secret": accessSecret
}
# Get initial access token from MS
print("Connecting to MS Graph API")
token_api = "https://login.microsoftonline.com/"+tenantName+"/oauth2/v2.0/token"
    try:
        responseFromApi = requests.post(token_api, data=token_api_body)
        responseJson = responseFromApi.json()
        print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
    except Exception as e:
        print(f"ERROR | Token auth failed: {e}")
        sys.exit(1)  # responseFromApi/responseJson would be undefined below, so abort
# if we get the token proceed else abort
if(responseFromApi.status_code == 200):
migrationData = pd.read_csv(bigFilePath)
print(" We got the data from Storage !", migrationData.shape[0])
        access_token = responseJson.get('access_token')  # module level, so visible to all threads
graph_api_create = "https://graph.microsoft.com/v1.0/users"
dataSetSize = migrationData.shape[0]
partitions = 50 # No of partitions # will be taken from command line in future revisions
        size = dataSetSize // partitions # No of rows per partition
        remainder = dataSetSize % partitions
        print(f"Data Set Size : {dataSetSize} | Rows per partition = {size} | Partitions = {partitions} | Remainder : {remainder} | Start...... \n")
        ##### ------- Dataset partitioning.
datasets = []
range_val = partitions + 1 if remainder !=0 else partitions
for partition in range(range_val):
if(partition == partitions):
df = migrationData[size*partition:dataSetSize]
else:
df = migrationData[size*partition:size*(partition+1)]
datasets.append(df)
number_of_threads = len(datasets)
start_time = time.time()
spawned_threads = []
        ######## ---- Threads are spawned here --------
for i in range(number_of_threads): # spawn threads
t = threading.Thread(target=createAccounts, args=(datasets[i], i))
t.start()
spawned_threads.append(t)
number_spawned = len(spawned_threads)
print(f"Started {number_spawned} threads !")
        ###### - Threads are joined (waited on) here ---------
for thread in spawned_threads: # let the script wait for thread execution
thread.join()
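        # join() blocks until the thread's target returns; if any request above hangs
        # with no timeout, its thread never returns and the script appears stuck here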
print(f"Done! It took {time.time() - start_time}s to execute") # time check
#### ------ Retry Mechanism -----
print("RETRYING....... !")
os.system(f'python3 retry.py pylogs.log {appId}')
else:
print(f"Token Missing ! API response {responseJson}")```
One of the suggestions was to use `multiprocessing` or `concurrent.futures` rather than rolling my own thread pool and splitting the data manually.
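For reference, here is a minimal sketch of that suggestion using `concurrent.futures.ThreadPoolExecutor`, reusing `createAccounts` and the `datasets` list from the script above (the worker count is my assumption, mirroring the 50 partitions):

```
from concurrent.futures import ThreadPoolExecutor, as_completed

# assumption: 50 workers to mirror the 50 partitions used above
with ThreadPoolExecutor(max_workers=50) as executor:
    futures = {executor.submit(createAccounts, datasets[i], i): i
               for i in range(len(datasets))}
    for future in as_completed(futures):
        batch = futures[future]
        try:
            future.result()  # re-raises anything that escaped the worker
            print(f"Batch {batch} finished")
        except Exception as e:
            print(f"Batch {batch} died with: {e}")
```

`as_completed()` reports batches as they finish, which would also make it obvious which chunk is still outstanding when a run appears stuck.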