0

I'm getting this error when I try to use my front-end to upload a file/files in my AWS SAM Local.
Is there a known issue in AWS SAM LOCAL?

2025-08-20 11:11:24,064 | UnicodeDecodeError while processing HTTP request: 'utf-8' codec can't decode byte 0xc4 in position 682: invalid continuation byte 2025-08-20 11:11:24,074 | Lambda execution failed ()

I'M PARSING MY DATA THROUGH THIS FUNCTION. When I do a Postman call, the api can successfully insert the file upload

I have tried using requests-toolbelt for parsing,

def parse_multipart(event):
    """
    Parse multipart/form-data requests from API Gateway / Lambda.
    Returns:
      - form_data: dict of text fields
      - files_data: list of dicts with 'filename' and 'content' (bytes)
    """
    form_data = {}
    files_data = []

    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    content_type = headers.get("content-type", "")
    if not content_type.startswith("multipart/form-data"):
        return form_data, files_data

    # Decode body (handle base64 if API Gateway sends binary)
    body_bytes = base64.b64decode(event['body']) if event.get('isBase64Encoded') else event['body'].encode('utf-8')
    multipart_data = decoder.MultipartDecoder(body_bytes, content_type)

    for part in multipart_data.parts:
        disposition = part.headers.get(b"Content-Disposition", b"")
        if b"filename=" in disposition:
            # File field
            try:
                filename = disposition.split(b'filename=')[1].strip(b'"').decode("utf-8")
            except UnicodeDecodeError:
                filename = disposition.split(b'filename=')[1].strip(b'"').decode("utf-8", "replace")
            files_data.append({
                "filename": filename,
                "content": part.content
            })
        elif b"name=" in disposition:
            # Text field
            try:
                name = disposition.split(b'name=')[1].strip(b'"').decode("utf-8")
            except UnicodeDecodeError:
                name = disposition.split(b'name=')[1].strip(b'"').decode("utf-8", "replace")
            try:
                value = part.content.decode("utf-8")
            except UnicodeDecodeError:
                value = part.content.decode("utf-8", "replace")
            form_data[name] = value

    return form_data, files_data

This is my create function

try:
        req_headers = event.get("headers", {})
        content_type = req_headers.get("Content-Type") or req_headers.get("content-type", "")

        body = {}
        files_data = [] # 🆕 This will now be a list of files
        body_from_event = event.get("body", "")

        # Check for an empty request body early to prevent parsing errors
        if not body_from_event:
            return format_response(400, message="Bad Request", errors={"body": "Request body is empty."})

        if content_type.startswith("multipart/form-data"):
            form_data_parts, files_data = parse_multipart(event) # 🆕 Now returns a list
            try:
                json_payload_str = form_data_parts.get("body", "{}")
                body = json.loads(json_payload_str)
            except json.JSONDecodeError as e:
                return format_response(400, message="Validation Error", errors={"body": f"Failed to parse JSON body from multipart form: {str(e)}"})
        elif content_type.startswith("application/json"):
            try:
                body = json.loads(body_from_event)
            except json.JSONDecodeError as e:
                return format_response(400, message="Bad Request", errors={"body": "Failed to parse JSON from request body: " + str(e)})
        else:
            return format_response(400, message="Unsupported Content-Type")

        # Get user email from the API Gateway event's authorizer context
        user_email = event['requestContext']['authorizer']['principalId']
        if not user_email:
            return format_response(401, message="Missing email in token payload")

        # Validate required fields
        required_fields = [
            "company_name", "tin", "invoice_number",
            "transaction_date", "items", "payee",
            "payee_account", "approver"
        ]
        missing_fields = [f for f in required_fields if f not in body]
        if missing_fields:
            return format_response(400, message="Validation Error", errors={"missing_fields": missing_fields})

        # --- LOGIC FOR GENERATING REFERENCE_ID ---
        now = datetime.utcnow()
        current_year = now.year
        current_month = now.month
        prefix = f"{current_month:02d}{current_year}"

        response = INVOICE_TABLE.scan(
            ProjectionExpression="reference_id",
        )
        items = response.get("Items", [])

        latest_number = 0
        for item in items:
            ref_id = item.get("reference_id")
            if ref_id and ref_id.startswith(prefix):
                try:
                    number_part = int(ref_id.split("-")[1])
                    if number_part > latest_number:
                        latest_number = number_part
                except (IndexError, ValueError):
                    pass

        new_number = latest_number + 1
        new_ref_id = f"{prefix}-{new_number:03d}"

        body["reference_id"] = new_ref_id
        # --- END OF LOGIC ---

        # 🆕 Handle file uploads with new naming convention
        file_urls = []
        if files_data:
            for i, file_part in enumerate(files_data):
                file_obj = BytesIO(file_part["content"])
                file_extension = file_part["filename"].split(".")[-1]
                file_key = f"invoices/{new_ref_id}-{i+1}.{file_extension}"
                S3.upload_fileobj(file_obj, BUCKET_NAME, file_key)
                file_urls.append(f"/{BUCKET_NAME}/{file_key}")
        
        # 🆕 Store the list of URLs in the body
        body["file_urls"] = file_urls or ["no-file-uploaded"]

        encoder = get_employee(user_email)
        if not encoder:
            return format_response(403, message=f"Employee record not found for the logged-in user: {user_email}")

        payee_email = body.get("payee")
        approver_email = body.get("approver")
        approver = get_employee(approver_email)

        if not approver:
            return format_response(400, message="Validation Error", errors={"approver": f"Approver {approver_email} not found"})

        approver_roles = list(approver.get("access_role", set()))

        if "approver" not in approver_roles:
            return format_response(400, message="Validation Error", errors={"approver": f"Selected approver is not marked as an approver. Roles found: {approver_roles}"})

        items_raw = body.get("items")
        if not items_raw:
             return format_response(400, message="Validation Error", errors={"items": "Items field is missing or empty."})

        if isinstance(items_raw, str):
            try:
                items = json.loads(items_raw)
            except json.JSONDecodeError as e:
                return format_response(400, message="Validation Error", errors={"items": f"Failed to parse items JSON: {str(e)}"})
        else:
            items = items_raw

        if not isinstance(items, list):
            return format_response(400, message="Validation Error", errors={"items": "Items must be a list"})

        for idx, item in enumerate(items):
            missing_item_fields = [f for f in ["particulars", "project_class", "account", "vatable", "amount"] if f not in item]
            if missing_item_fields:
                return format_response(400, message="Validation Error", errors={f"item_{idx}": f"Missing fields: {missing_item_fields}"})

        invoice_data = {
            "reference_id": new_ref_id,
            "company_name": body["company_name"],
            "tin": body["tin"],
            "invoice_number": body["invoice_number"],
            "transaction_date": body["transaction_date"],
            "items": items,
            "encoder": encoder.get("email"),
            "payee": payee_email,
            "payee_account": body["payee_account"],
            "approver": approver.get("email"),
            "file_urls": body["file_urls"], # 🆕 Use the list of URLs
            "encoding_date": datetime.utcnow().isoformat(),
            "status": "Pending",
            "remarks": body.get("remarks", "")
        }

        INVOICE_TABLE.put_item(Item=invoice_data)
        return format_response(201, message="Invoice created successfully", data=invoice_data)

    except Exception as e:
        return format_response(500, message="Internal Server Error", errors={"exception": str(e)})

sample payload from the web browser

this is what I'm sending from the front-end

------WebKitFormBoundaryoUmuXNO54JuBC1vu Content-Disposition: form-data; name="body"
{"company_name":"Black Cloud Corporation","tin":"123-456-789-001","invoice_number":"INV-2024-001","transaction_date":"2025-08-20","items":[{"particulars":"asd","project_class":"sada","account":"","vatable":true,"amount":1222}],"encoder":"[email protected]","payee":"[email protected]","payee_account":"1234567890","approver":"[email protected]","remarks":"sadfa"} ------WebKitFormBoundaryoUmuXNO54JuBC1vu Content-Disposition: form-data; name="file_urls"; filename="Dead_1.pdf" Content-Type: application/pdf
------WebKitFormBoundaryoUmuXNO54JuBC1vu--

0

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.