
I have this multithreading script, which operates on a data set. Each thread gets a chunk of the data set, iterates over its data frame, and calls an API (MS Graph user create). What I have seen is that my script tends to get stuck when it is almost finished. I am running this on a Linux Ubuntu server with 8 vCPUs, but this happens only when the total dataset size is in the millions (it takes around 9-10 hours for 2 million records).

This is the first time I am writing a long-running script, and I would like an opinion on whether I am doing things correctly.

Specifically:

  1. Is my code the reason the script hangs?
  2. Have I done the multithreading correctly? Have I created and waited for the threads correctly?

UPDATE: Even after applying the answers below, the threads still seem to get stuck at the end.

import pandas as pd
import requests
import sys
import os
import time
import threading
import logging
import string
import secrets
import random
##### ----- Logging Setup -------
logging.basicConfig(filename="pylogs.log", format='%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p')
# Creating an object
logger = logging.getLogger()
# Setting the threshold of logger to DEBUG
logger.setLevel(logging.ERROR)

#####------ Function Definitions -------
# generates a random password of the given length
def generateRandomPassword(lengthOfPassword):
    # minimal implementation so the script runs; the original logic was elided here
    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(lengthOfPassword))

# the most important function

def createAccounts(splitData, threadID):
    batchProgress = 0
    batch_size = splitData.shape[0]
    for row in splitData.itertuples():
        try:
            headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
            randomLength = [8, 9, 12, 13, 16]
            passwordLength = random.choice(randomLength)
            password = generateRandomPassword(passwordLength)  # will be generated randomly - for debugging purposes
            batchProgress += 1
            post_request_body = {
                "accountEnabled": True,
                "displayName": row[5],
                "givenName": row[3],
                "surname": row[4],
                "mobilePhone": row[1],
                "mail": row[2],
                "passwordProfile": {
                    "password": password,
                    "forceChangePasswordNextSignIn": False
                },
                "state": "",
                "identities": [
                    {
                        "signInType": "emailAddress",
                        "issuer": tenantName,
                        "issuerAssignedId": row[2]
                    }
                ]
            }
            # only add the phone number if it exists - it must be 1 to 64 characters long, so it cannot be empty
            if len(row[4]):
                post_request_body["identities"].append({"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": row[1]})
            responseFromApi = requests.post(graph_api_create, headers=headers, json=post_request_body)
            status = responseFromApi.status_code
            if status == 201:  # success
                id = responseFromApi.json().get("id")
                print(f" {status} | {batchProgress} / {batch_size} | Success {id}")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Success'
            elif status == 429:  # throttling issues
                print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
                time.sleep(150)
            elif status == 401:  # token expiry
                print(f"  Thread {threadID} | Token Expired. Getting it back !")
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Token Expired'
                getRefreshToken()
            else:  # any other error
                try:
                    msg = responseFromApi.json().get("error").get("message")
                except Exception as e:
                    msg = f"Error {e}"
                errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^{msg}'
                print(f" {status} | {batchProgress} / {batch_size} | {msg}  {row[2]}")
            logger.error(errorDict)
        except Exception as e:
            # check for refresh token errors; `status` may not be set yet at this point
            errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Exception_{e}'
            logger.error(errorDict)
            print(f" EXCEPTION | {batchProgress} / {batch_size} | Error  {row[2]}")
    print(f"Thread {threadID} completed ! {batchProgress} / {batch_size}")
###### ------ Main Script ------
if __name__ == "__main__":
    # get file name and appid from command line arguments
    storageFileName = sys.argv[1]
    appId = sys.argv[2]
    # setup credentials
    bigFilePath = f"./{storageFileName}"
    createUserUrl = "https://graph.microsoft.com/v1.0/users"
    B2C_Tenant_Name = "tenantName"
    tenantName =  B2C_Tenant_Name + ".onmicrosoft.com"
    applicationID = appId
    accessSecret = "" # will be taken from command line in future revisions
    token_api_body = {    
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
        "client_Id" : applicationID,
        "client_secret": accessSecret
    }
    # Get initial access token from MS
    print("Connecting to MS Graph API")
    token_api = "https://login.microsoftonline.com/"+tenantName+"/oauth2/v2.0/token"
    try:
        responseFromApi = requests.post(token_api, data=token_api_body)
        responseJson = responseFromApi.json()
        print(f"Token API Success ! Expires in {responseJson.get('expires_in')} seconds")
    except Exception as e:
        print(f"ERROR | Token auth failed : {e}")
        sys.exit(1)  # responseFromApi would be unbound below, so abort here
    # if we get the token proceed else abort
    if(responseFromApi.status_code == 200):
        migrationData = pd.read_csv(bigFilePath)
        print(" We got the data from Storage !", migrationData.shape[0])

        global access_token
        access_token = responseJson.get('access_token')
        graph_api_create = "https://graph.microsoft.com/v1.0/users"
        dataSetSize = migrationData.shape[0]

        partitions = 50 # No of partitions # will be taken from command line in future revisions
        size = int(dataSetSize/partitions) # No of rows per file
        remainder = dataSetSize%partitions
        print(f"Data Set Size : {dataSetSize} | Per file size = {size} | Total Files = {partitions} |  Remainder: {remainder} | Start...... \n")
        ##### ------- Dataset partitioning.
        datasets = []
        range_val = partitions + 1 if remainder !=0 else partitions
        for partition in range(range_val):
            if(partition == partitions):
                df = migrationData[size*partition:dataSetSize]
            else:
                df = migrationData[size*partition:size*(partition+1)]
            datasets.append(df)
        number_of_threads = len(datasets)
        start_time = time.time()    
        spawned_threads = []
######## ---- Threads are spawned ! here --------
        for i in range(number_of_threads): # spawn threads
            t = threading.Thread(target=createAccounts, args=(datasets[i], i))
            t.start()
            spawned_threads.append(t)
        number_spawned = len(spawned_threads)  
        print(f"Started {number_spawned} threads !")   
       ###### ---- Threads are joined here ---------
        for thread in spawned_threads: # let the script wait for thread execution
            thread.join()
        print(f"Done! It took {time.time() - start_time}s to execute")  # time check
    #### ------ Retry Mechanism -----
        print("RETRYING....... !")
        os.system(f'python3 retry.py pylogs.log {appId}')
    else:
        print(f"Token Missing ! API response {responseJson}")
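Since the question is whether the threads hang in my code, one way to find out where they are stuck is to register a faulthandler signal handler near the top of the script, so that sending the process a signal dumps every thread's current stack without stopping it (a diagnostic sketch; the choice of SIGUSR1 is mine):

```python
import faulthandler
import signal
import sys

# After this, `kill -USR1 <pid>` makes the process print every thread's
# current stack trace to stderr, without interrupting the script.
faulthandler.register(signal.SIGUSR1, file=sys.stderr, all_threads=True)
```

When the script appears stuck, the dump shows whether the remaining threads are sleeping in `time.sleep(150)`, blocked in a socket read inside `requests`, or something else.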



  • You'd be better off using multiprocessing or concurrent.futures than rolling your own thread pool and splitting logic. Commented Aug 29, 2022 at 6:51

2 Answers


Here's a refactoring of your code to use the standard library multiprocessing.pool.ThreadPool, for simplicity.

Naturally I couldn't have tested it, since I don't have your data, but the basic idea should work. I removed the logging and retry machinery, since I couldn't really see why you'd need it (but feel free to add it back); instead, this retries each row whenever the problem appears to be transient.

import random
import sys
import time
from multiprocessing.pool import ThreadPool

import pandas as pd
import requests

sess = requests.Session()

# globals filled in by `main`
tenantName = None
access_token = None



def submit_user_create(row):
    headers = {"Content-Type": "application/json", "Authorization": "Bearer " + access_token}
    randomLength = [8, 9, 12, 13, 16]
    passwordLength = random.choice(randomLength)
    password = generateRandomPassword(passwordLength)  # will be generated randomly - for debugging purpose
    post_request_body = {
        "accountEnabled": True,
        "displayName": row[5],
        "givenName": row[3],
        "surname": row[4],
        "mobilePhone": row[1],
        "mail": row[2],
        "passwordProfile": {"password": password, "forceChangePasswordNextSignIn": False},
        "state": "",
        "identities": [{"signInType": "emailAddress", "issuer": tenantName, "issuerAssignedId": row[2]}],
    }
    # if phone number exists then only add - since phone number needs to have length between 1 and 64, cannot leave empty
    if len(row[4]):
        post_request_body["identities"].append({"signInType": "phoneNumber", "issuer": tenantName, "issuerAssignedId": row[1]})
    return sess.post("https://graph.microsoft.com/v1.0/users", headers=headers, json=post_request_body)


def get_access_token(tenantName, applicationID, accessSecret):
    token_api_body = {
        "grant_type": "client_credentials",
        "scope": "https://graph.microsoft.com/.default",
        "client_Id": applicationID,
        "client_secret": accessSecret,
    }
    token_api = f"https://login.microsoftonline.com/{tenantName}/oauth2/v2.0/token"
    resp = sess.post(token_api, data=token_api_body)
    if resp.status_code != 200:
        raise RuntimeError(f"Token Missing ! API response {resp.content}")
    json = resp.json()
    print(f"Token API Success ! Expires in {json.get('expires_in')} seconds")
    return json["access_token"]


def process_row(row):
    while True:
        response = submit_user_create(row)
        status = response.status_code

        if status == 201:  # success
            id = response.json().get("id")
            print(f"Success {id}")
            return True

        if status == 429:  # throttling issues
            print("Throttled by server ! Sleeping for 150 seconds")
            time.sleep(150)
            continue

        if status == 401:  # token expiry?
            print("Token Expired. Getting it back !")
            getRefreshToken()  # TODO
            continue

        try:
            msg = response.json().get("error").get("message")
        except Exception as e:
            msg = f"Error {e}"
        print(f" {status} | {msg}  {row[2]}")
        return False


def main():
    global tenantName, access_token
    # get file name and appid from command line arguments
    bigFilePath = sys.argv[1]
    appId = sys.argv[2]
    # setup credentials
    B2C_Tenant_Name = "tenantName"
    tenantName = f"{B2C_Tenant_Name}.onmicrosoft.com"
    accessSecret = ""  # will be taken from command line in future revisions
    access_token = get_access_token(tenantName, appId, accessSecret)
    migrationData = pd.read_csv(bigFilePath)
    start_time = time.time()
    with ThreadPool(10) as pool:
        for i, result in enumerate(pool.imap_unordered(process_row, migrationData.itertuples()), 1):
            progress = i / len(migrationData) * 100
            print(f"{i} / {len(migrationData)} | {progress:.2f}% | {time.time() - start_time:.2f} seconds")

    print(f"Done! It took {time.time() - start_time}s to execute")


if __name__ == "__main__":
    main()

11 Comments

Hi @AKX, thank you for this. It seems very clean. I just want to ask: what happens when you return False in the exception path? Does that thread die?
No, the thread doesn't die. The workers are only killed after the pool is closed.
requests.exceptions.ConnectionError: ('Connection aborted.', OSError("(104, 'ECONNRESET')")) is what I'm getting with 300 threads. So for 2-3 million records, what number of threads do you suggest?
Far, far fewer. You do realize you're trying to make 300 concurrent connections to the Graph API from the same machine with that? Little wonder there are connection errors. The quota for writing users seems to be 3000 requests per 150 seconds anyway.
Oh, by the way, you should be able to eke out some more performance by sharing a single requests.Session(); using requests.post() makes Requests reconnect and negotiate TLS, etc. for each request.
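Given the quota of roughly 3000 write requests per 150 seconds mentioned above (about 20 requests per second), a small token-bucket limiter shared by all workers keeps any pool size under the throttle threshold. A sketch (the RateLimiter class is my addition, not part of either script):

```python
import threading
import time

class RateLimiter:
    """Simple token bucket: allows up to `rate` acquisitions per second,
    shared safely across threads."""

    def __init__(self, rate):
        self.rate = rate
        self.tokens = rate          # start with a full bucket
        self.last = time.monotonic()
        self.lock = threading.Lock()

    def acquire(self):
        while True:
            with self.lock:
                now = time.monotonic()
                # refill tokens based on elapsed time, capped at the bucket size
                self.tokens = min(self.rate, self.tokens + (now - self.last) * self.rate)
                self.last = now
                if self.tokens >= 1:
                    self.tokens -= 1
                    return
                wait = (1 - self.tokens) / self.rate
            time.sleep(wait)  # sleep outside the lock so other threads can refill too

# e.g. limiter = RateLimiter(20); call limiter.acquire() before each sess.post(...)
```

Because the limiter is shared, the effective request rate stays the same no matter how many worker threads sit in the pool.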

Unfair use of MS Graph

Due to possible throttling by the server, usage of the MS Graph resource might be unfair between threads. I use "fair" in the resource-starvation sense.

elif(status == 429): #throttling issues
    print(f"  Thread {threadID} | Throttled by server ! Sleeping for 150 seconds")
    errorDict = f'{row[1]}^{row[2]}^{row[3]}^{row[4]}^{row[5]}^{row[6]}^{row[7]}^{row[8]}^{row[9]}^{row[10]}{row[11]}{row[12]}{row[13]}^Throttled'
    time.sleep(150)

One thread making a million calls can receive a disproportionate number of 429 responses, each followed by a 150-second penalty. This sleep doesn't stop the other threads from making calls and achieving forward progress, though.

This would result in one thread lagging far behind the others and giving the appearance of being stuck.
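One way to soften that penalty: a 429 response from Graph usually carries a Retry-After header saying exactly how long to wait, so the fixed 150-second sleep can be replaced by honoring the header. A sketch (the helper name and the 30-second fallback are my choices):

```python
import time  # used in the suggested call site below

def throttle_delay(response, fallback=30):
    """Return how long to sleep after a 429, preferring the server's
    Retry-After header (in seconds) over a fixed fallback."""
    retry_after = response.headers.get("Retry-After")
    if retry_after is not None:
        try:
            return max(float(retry_after), 0)
        except ValueError:
            pass  # Retry-After can also be an HTTP date; ignore that form here
    return fallback

# usage: if status == 429: time.sleep(throttle_delay(responseFromApi))
```

That way a thread sleeps only as long as the server asks, instead of a flat 150 seconds per 429.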

17 Comments

Hi @SargeATM, this sleep stops this thread from making API calls, but since throttling is at the application level in MS Graph, the other threads will also receive a 429 and all threads stop (those that make a call within the 150-second window).
@AbhayChandramouli do logs confirm that assumption?
Do you suggest that if I remove the sleep, I would not get this issue?
Yes Sarge, logs do confirm.
Otherwise, a script rewrite is necessary, with a proper worker pool and queues: threads request work, get 100 posts at a time, and request more when that is done. When I do this type of setup properly, it's about 6-7 hours of boilerplate, including application, thread, queue, and work management, with good logging and clean init, boot, start, stop, restart, shutdown, and terminate worker lifecycle management.
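The worker-pool-and-queue setup described in that comment can be sketched with the standard library; here counting rows stands in for the actual API posts, and the None sentinel per worker is one common shutdown convention:

```python
import queue
import threading

def worker(work_queue, results):
    # Each worker pulls one batch at a time; a None sentinel means shut down.
    while True:
        batch = work_queue.get()
        if batch is None:
            work_queue.task_done()
            break
        results.append(len(batch))  # stand-in for posting each row to the API
        work_queue.task_done()

def run_pool(batches, num_workers=4):
    work_queue = queue.Queue()
    results = []
    threads = [threading.Thread(target=worker, args=(work_queue, results))
               for _ in range(num_workers)]
    for t in threads:
        t.start()
    for batch in batches:
        work_queue.put(batch)
    for _ in threads:
        work_queue.put(None)  # one sentinel per worker so all of them exit
    for t in threads:
        t.join()
    return results
```

Because workers pull small batches instead of owning a fixed 1/50th of the dataset, a throttled worker simply falls behind on the shared queue rather than leaving one giant chunk unfinished at the end.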
