from datetime import datetime, timedelta
from google.cloud import logging

def check_worker_logs(event_uuid, dataflow_project, dataflow_job, timeframe_mins=30):
    # Start of the lookback window as an RFC 3339 UTC timestamp
    start_time = (datetime.utcnow() - timedelta(minutes=timeframe_mins)).isoformat() + "Z"
    print(start_time)

    # Create the Cloud Logging client
    try:
        client = logging.Client()
    except Exception as e:
        print(f"Failed to connect to Google Cloud Logging: {e}")
        return

    # Filter for all Dataflow worker logs of this job in the given timeframe
    worker_log_filter = (
        f'resource.labels.project_id="{dataflow_project}" AND '
        f'resource.labels.job_name="{dataflow_job}" AND '
        f'timestamp >= "{start_time}" AND '
        f'resource.type="dataflow_step" AND '
        f'log_id("dataflow.googleapis.com/worker")'
    )

    print(worker_log_filter)

    # Fetch the matching worker log entries; any() stops at the first entry
    worker_logs = client.list_entries(filter_=worker_log_filter)
    if not any(worker_logs):
        print("No worker logs found.")

I’m trying to fetch Dataflow worker logs using the Python google-cloud-logging library.

  • I’m successfully connected to the correct GCP project.
  • The log filter I’m using is correct: when I paste the same filter into the Logs Explorer, it returns the expected worker logs.
  • However, when I use the exact same filter in my Python code, no logs are returned.
  • I also updated the google-cloud-logging package to the latest version, but it still doesn’t work.

Here’s what I’ve observed:

  • If I remove certain parts of the filter (job_name, resource.type, and log_id), the code does return logs from my GCP project, but only audit logs of type type.googleapis.com/google.cloud.audit.AuditLog.
  • So it seems like the connection and basic logging setup are working, but filtering for Dataflow worker logs via the API is failing.
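Since dropping some clauses brings entries back, one way to find the clause that empties the result is to add the clauses back one at a time and count matches for each prefix. This is a rough sketch, assuming the caller supplies `count_matches`, a hypothetical function that runs a filter against the API and returns how many entries came back; `cumulative_filters` and `first_empty_clause` are likewise illustrative names.

```python
from typing import Callable, List, Tuple


def cumulative_filters(clauses: List[str]) -> List[str]:
    """Return filters that add one clause at a time, joined with explicit AND."""
    return [" AND ".join(clauses[: i + 1]) for i in range(len(clauses))]


def first_empty_clause(clauses: List[str],
                       count_matches: Callable[[str], int]) -> Tuple[int, str]:
    """Return (index, clause) of the first clause whose addition yields zero matches.

    count_matches is a hypothetical caller-supplied function that runs the filter
    (for example via client.list_entries) and returns the number of entries found.
    Returns (-1, "") if every prefix still matches at least one entry.
    """
    for i, log_filter in enumerate(cumulative_filters(clauses)):
        if count_matches(log_filter) == 0:
            return i, clauses[i]
    return -1, ""
```

With google-cloud-logging, `count_matches` could be something like `lambda f: sum(1 for _ in itertools.islice(client.list_entries(filter_=f), 10))`, which counts at most 10 entries per prefix so each probe stays cheap.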

Has anyone encountered this issue before? Is there any subtle difference between how the Logs Explorer and the logging API interpret filters?

Any help would be appreciated.

  • Your question is not a minimal repro, which makes it more difficult for us to help. What (a) identity and (b) project is being used by the code? Please demonstrate (perhaps using gcloud logging read) that the same identity (or a similarly permitted identity) returns logs with gcloud logging read "${FILTER}" --project=${PROJECT} but does not with the Python code. Commented Jul 10 at 22:22
  • AND is implicit in Cloud Logging queries and may be omitted. Commented Jul 10 at 22:25
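Along the lines of the first comment, a sketch that makes the queried project explicit instead of relying on the environment's default. It assumes google-cloud-logging is installed and that Application Default Credentials resolve to an identity allowed to read logs in that project; `resource_names_for` and `fetch_worker_entries` are hypothetical helper names. The key point is that when `resource_names` is omitted, `list_entries` searches only `client.project`, and a `resource.labels.project_id` clause inside the filter narrows results but does not change which project is searched.

```python
import itertools
from typing import List


def resource_names_for(project_id: str) -> List[str]:
    """Resource names that scope a list_entries call to a single project."""
    return [f"projects/{project_id}"]


def fetch_worker_entries(project_id: str, log_filter: str, limit: int = 5) -> list:
    """Run log_filter against project_id explicitly and return up to `limit` entries.

    Assumes google-cloud-logging is installed and credentials can read logs
    in project_id.
    """
    # Imported lazily so resource_names_for() works without the library installed.
    from google.cloud import logging

    client = logging.Client(project=project_id)
    # Print the project the client is bound to, to compare with the filter.
    print(f"Client project: {client.project}")
    entries = client.list_entries(
        resource_names=resource_names_for(project_id),
        filter_=log_filter,
    )
    # The iterator fetches pages lazily; islice stops after `limit` entries.
    return list(itertools.islice(entries, limit))
```

Running this with the same filter that works in the Logs Explorer, and comparing the printed project with the one selected in the console, is a quick way to rule a project mismatch in or out.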

1 Answer

from datetime import datetime, timedelta, timezone
from google.cloud import logging
from google.api_core import exceptions

def check_worker_logs(dataflow_project: str, dataflow_job: str, timeframe_mins: int = 30) -> bool:
    """
    Checks if any Dataflow worker logs exist for a given job within a timeframe.

    Args:
        dataflow_project: The Google Cloud project ID.
        dataflow_job: The name of the Dataflow job.
        timeframe_mins: The timeframe in minutes to look back for logs.

    Returns:
        True if worker logs are found, False otherwise.
    """
    try:
        # Use timezone-aware datetime object for clarity
        start_time = datetime.now(timezone.utc) - timedelta(minutes=timeframe_mins)
        
        # Format for the API query
        start_time_str = start_time.isoformat()

        client = logging.Client(project=dataflow_project)

        # Define the log filter
        worker_log_filter = (
            f'resource.type="dataflow_step" AND '
            f'resource.labels.job_name="{dataflow_job}" AND '
            f'logName="projects/{dataflow_project}/logs/dataflow.googleapis.com%2Fworker" AND '
            f'timestamp >= "{start_time_str}"'
        )
        
        print(f"Querying for logs with filter: {worker_log_filter}")

        # list_entries returns a lazy iterator; page_size=1 asks for one entry per API page.
        log_entries = client.list_entries(filter_=worker_log_filter, page_size=1)

        # any() stops as soon as the iterator yields its first entry
        if any(log_entries):
            print("Success: Worker logs found.")
            return True
        else:
            print("No worker logs found for the specified job and timeframe.")
            return False

    except exceptions.GoogleAPICallError as e:
        print(f"An API error occurred: {e}")
        return False
    except Exception as e:
        print(f"An unexpected error occurred: {e}")
        return False

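This works for me when I use logName in place of the log_id() function. Note that the client is also created with project=dataflow_project, so the query is scoped to the Dataflow project rather than to the default project of the environment.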
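In that filter, the logName value is the project path followed by the log ID with its `/` URL-encoded as `%2F`. A minimal sketch of building it, plus the same filter written one clause per line with no explicit AND (the comments on the question note that AND is implicit). `worker_log_name` and `build_worker_log_filter` are hypothetical names, and the optional `now` parameter exists only to make the output reproducible.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional
from urllib.parse import quote


def worker_log_name(project_id: str, log_id: str = "dataflow.googleapis.com/worker") -> str:
    """Full logName for a log ID; quote(safe="") encodes "/" as %2F."""
    return f"projects/{project_id}/logs/{quote(log_id, safe='')}"


def build_worker_log_filter(project_id: str, job_name: str, timeframe_mins: int = 30,
                            now: Optional[datetime] = None) -> str:
    """One clause per line; adjacent clauses are combined with an implicit AND."""
    now = now or datetime.now(timezone.utc)
    # Normalize to UTC so the trailing "Z" in the timestamp is correct.
    start = (now - timedelta(minutes=timeframe_mins)).astimezone(timezone.utc)
    start_str = start.strftime("%Y-%m-%dT%H:%M:%SZ")
    clauses = [
        'resource.type="dataflow_step"',
        f'resource.labels.job_name="{job_name}"',
        f'logName="{worker_log_name(project_id)}"',
        f'timestamp>="{start_str}"',
    ]
    return "\n".join(clauses)
```

The resulting string can be passed directly as `filter_=` to `client.list_entries`, or pasted into the Logs Explorer to compare results side by side.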
