I'm getting the error below when I use my front-end to upload one or more files to my API running under AWS SAM Local.
Is this a known issue in AWS SAM Local?
2025-08-20 11:11:24,064 | UnicodeDecodeError while processing HTTP request: 'utf-8' codec can't decode byte 0xc4 in position 682: invalid continuation byte
2025-08-20 11:11:24,074 | Lambda execution failed ()
I'm parsing my data through the function below. When I make the same call from Postman, the API successfully inserts the file upload.
I have tried using requests-toolbelt for parsing:
def _disposition_param(disposition, key):
    """Return parameter *key* of a Content-Disposition header value, or None."""
    # Local imports keep this helper self-contained within the snippet.
    from email.message import Message
    from email.utils import collapse_rfc2231_value

    # The stdlib header parser handles quoting, trailing parameters and
    # RFC 2231 encoded values, which naive bytes.split() parsing does not.
    header = Message()
    header["Content-Disposition"] = disposition
    value = header.get_param(key, header="content-disposition")
    if value is None:
        return None
    return collapse_rfc2231_value(value)


def parse_multipart(event):
    """
    Parse multipart/form-data requests from API Gateway / Lambda.

    Args:
        event: Lambda proxy event dict. Reads "headers", "body" and
            "isBase64Encoded".

    Returns:
        - form_data: dict of text fields (field name -> decoded str value)
        - files_data: list of dicts with 'filename' (str) and 'content' (bytes)
        Both are empty when the request is not multipart/form-data or the
        body is missing or empty.

    NOTE(review): a body that is not base64-encoded is re-encoded as UTF-8
    text, so binary uploads presumably only survive intact when the gateway
    delivers them with isBase64Encoded set (binary media types) - confirm
    against the API Gateway / SAM configuration.
    """
    form_data = {}
    files_data = []

    # Header names are case-insensitive and "headers" may be null.
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    content_type = headers.get("content-type") or ""
    # Media types are case-insensitive as well.
    if not content_type.lower().startswith("multipart/form-data"):
        return form_data, files_data

    body = event.get("body")
    if not body:
        # Nothing to parse; avoid failing on a missing or None body.
        return form_data, files_data

    # Decode body (handle base64 if API Gateway sends binary)
    if event.get("isBase64Encoded"):
        body_bytes = base64.b64decode(body)
    else:
        body_bytes = body.encode("utf-8")

    multipart_data = decoder.MultipartDecoder(body_bytes, content_type)
    for part in multipart_data.parts:
        disposition = part.headers.get(b"Content-Disposition", b"")
        if isinstance(disposition, bytes):
            # "replace" keeps malformed header bytes from aborting the parse.
            disposition = disposition.decode("utf-8", "replace")

        filename = _disposition_param(disposition, "filename")
        if filename is not None:
            # File field
            files_data.append({
                "filename": filename,
                "content": part.content,
            })
            continue

        name = _disposition_param(disposition, "name")
        if name is not None:
            # Text field
            form_data[name] = part.content.decode("utf-8", "replace")

    return form_data, files_data
This is my create function:
try:
req_headers = event.get("headers", {})
content_type = req_headers.get("Content-Type") or req_headers.get("content-type", "")
body = {}
files_data = [] # 🆕 This will now be a list of files
body_from_event = event.get("body", "")
# Check for an empty request body early to prevent parsing errors
if not body_from_event:
return format_response(400, message="Bad Request", errors={"body": "Request body is empty."})
if content_type.startswith("multipart/form-data"):
form_data_parts, files_data = parse_multipart(event) # 🆕 Now returns a list
try:
json_payload_str = form_data_parts.get("body", "{}")
body = json.loads(json_payload_str)
except json.JSONDecodeError as e:
return format_response(400, message="Validation Error", errors={"body": f"Failed to parse JSON body from multipart form: {str(e)}"})
elif content_type.startswith("application/json"):
try:
body = json.loads(body_from_event)
except json.JSONDecodeError as e:
return format_response(400, message="Bad Request", errors={"body": "Failed to parse JSON from request body: " + str(e)})
else:
return format_response(400, message="Unsupported Content-Type")
# Get user email from the API Gateway event's authorizer context
user_email = event['requestContext']['authorizer']['principalId']
if not user_email:
return format_response(401, message="Missing email in token payload")
# Validate required fields
required_fields = [
"company_name", "tin", "invoice_number",
"transaction_date", "items", "payee",
"payee_account", "approver"
]
missing_fields = [f for f in required_fields if f not in body]
if missing_fields:
return format_response(400, message="Validation Error", errors={"missing_fields": missing_fields})
# --- LOGIC FOR GENERATING REFERENCE_ID ---
now = datetime.utcnow()
current_year = now.year
current_month = now.month
prefix = f"{current_month:02d}{current_year}"
response = INVOICE_TABLE.scan(
ProjectionExpression="reference_id",
)
items = response.get("Items", [])
latest_number = 0
for item in items:
ref_id = item.get("reference_id")
if ref_id and ref_id.startswith(prefix):
try:
number_part = int(ref_id.split("-")[1])
if number_part > latest_number:
latest_number = number_part
except (IndexError, ValueError):
pass
new_number = latest_number + 1
new_ref_id = f"{prefix}-{new_number:03d}"
body["reference_id"] = new_ref_id
# --- END OF LOGIC ---
# 🆕 Handle file uploads with new naming convention
file_urls = []
if files_data:
for i, file_part in enumerate(files_data):
file_obj = BytesIO(file_part["content"])
file_extension = file_part["filename"].split(".")[-1]
file_key = f"invoices/{new_ref_id}-{i+1}.{file_extension}"
S3.upload_fileobj(file_obj, BUCKET_NAME, file_key)
file_urls.append(f"/{BUCKET_NAME}/{file_key}")
# 🆕 Store the list of URLs in the body
body["file_urls"] = file_urls or ["no-file-uploaded"]
encoder = get_employee(user_email)
if not encoder:
return format_response(403, message=f"Employee record not found for the logged-in user: {user_email}")
payee_email = body.get("payee")
approver_email = body.get("approver")
approver = get_employee(approver_email)
if not approver:
return format_response(400, message="Validation Error", errors={"approver": f"Approver {approver_email} not found"})
approver_roles = list(approver.get("access_role", set()))
if "approver" not in approver_roles:
return format_response(400, message="Validation Error", errors={"approver": f"Selected approver is not marked as an approver. Roles found: {approver_roles}"})
items_raw = body.get("items")
if not items_raw:
return format_response(400, message="Validation Error", errors={"items": "Items field is missing or empty."})
if isinstance(items_raw, str):
try:
items = json.loads(items_raw)
except json.JSONDecodeError as e:
return format_response(400, message="Validation Error", errors={"items": f"Failed to parse items JSON: {str(e)}"})
else:
items = items_raw
if not isinstance(items, list):
return format_response(400, message="Validation Error", errors={"items": "Items must be a list"})
for idx, item in enumerate(items):
missing_item_fields = [f for f in ["particulars", "project_class", "account", "vatable", "amount"] if f not in item]
if missing_item_fields:
return format_response(400, message="Validation Error", errors={f"item_{idx}": f"Missing fields: {missing_item_fields}"})
invoice_data = {
"reference_id": new_ref_id,
"company_name": body["company_name"],
"tin": body["tin"],
"invoice_number": body["invoice_number"],
"transaction_date": body["transaction_date"],
"items": items,
"encoder": encoder.get("email"),
"payee": payee_email,
"payee_account": body["payee_account"],
"approver": approver.get("email"),
"file_urls": body["file_urls"], # 🆕 Use the list of URLs
"encoding_date": datetime.utcnow().isoformat(),
"status": "Pending",
"remarks": body.get("remarks", "")
}
INVOICE_TABLE.put_item(Item=invoice_data)
return format_response(201, message="Invoice created successfully", data=invoice_data)
except Exception as e:
return format_response(500, message="Internal Server Error", errors={"exception": str(e)})
Sample payload from the web browser (this is what I'm sending from the front-end):
------WebKitFormBoundaryoUmuXNO54JuBC1vu Content-Disposition: form-data; name="body"
{"company_name":"Black Cloud Corporation","tin":"123-456-789-001","invoice_number":"INV-2024-001","transaction_date":"2025-08-20","items":[{"particulars":"asd","project_class":"sada","account":"","vatable":true,"amount":1222}],"encoder":"[email protected]","payee":"[email protected]","payee_account":"1234567890","approver":"[email protected]","remarks":"sadfa"} ------WebKitFormBoundaryoUmuXNO54JuBC1vu Content-Disposition: form-data; name="file_urls"; filename="Dead_1.pdf" Content-Type: application/pdf
------WebKitFormBoundaryoUmuXNO54JuBC1vu--