Need help parameterizing the Dataflow pipeline

Hey everyone,

I'm currently working on scheduling a Dataflow pipeline, and I'm facing a challenge in parameterizing the source file path based on the latest received file in our Google Cloud Storage (GCS) bucket.

Every day, we receive a new source file with the naming convention "SALES_YYYY-MM-DD". Since the date suffix changes daily according to the current date, I need to dynamically select the appropriate file for processing in the pipeline.

I've tried to set up parameters as shown in the screenshot, but I keep encountering an "object not found" error.

Could anyone provide guidance on the correct approach for this situation? I'd greatly appreciate any help or insights you can offer!

Thanks in advance,
Murali

1 REPLY

The "Object not found" error most likely occurs because the date placeholder (%Y-%m-%d) in the Cloud Storage input file path is never expanded: Dataflow treats it as a literal object name, so no matching object exists. The placeholder describes your naming pattern, but you need an additional mechanism to resolve it to the most recent file and pass that concrete path to the pipeline.

The Solution: Orchestration and Dynamic Parameterization

Step 1: Cloud Function Trigger

  • Set up a Google Cloud Function that triggers on new file events in your GCS bucket (the google.storage.object.finalize event, which fires whenever an object is created or overwritten). A sample deploy command is shown below.
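
For a 1st-gen Python Cloud Function, the deployment could look roughly like the following (the function name, bucket, region, and runtime version are placeholders to adapt to your project):

gcloud functions deploy launch_dataflow \
    --runtime python311 \
    --region us-central1 \
    --entry-point launch_dataflow \
    --trigger-resource your_bucket_name \
    --trigger-event google.storage.object.finalize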

Step 2: Find the Latest File

  • Within the Cloud Function, implement logic to:
    • List objects in your GCS bucket with the prefix 'SALES_'.
    • Filter the results to names matching the SALES_YYYY-MM-DD convention and sort them by the time_created property in descending order.
    • Select the most recent file from this sorted list (a sketch of this logic follows the list).
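
A minimal sketch of that selection step, assuming the objects are named exactly SALES_YYYY-MM-DD (with an optional extension); the helper name find_latest_sales_blob is only illustrative:

import re
from google.cloud import storage

SALES_PATTERN = re.compile(r"^SALES_\d{4}-\d{2}-\d{2}")

def find_latest_sales_blob(bucket_name):
    client = storage.Client()
    # The prefix narrows the listing; the regex enforces the exact naming convention.
    candidates = [b for b in client.list_blobs(bucket_name, prefix="SALES_")
                  if SALES_PATTERN.match(b.name)]
    # time_created is set by GCS when the object is finalized.
    return max(candidates, key=lambda b: b.time_created) if candidates else None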

Step 3: Dynamic Parameter in Dataflow Template

  • Modify your Dataflow template to accept an input_file parameter that specifies the path of the file to be processed (a sketch of such a parameter follows below).
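
If the pipeline is a classic template written in Python, the parameter has to be a ValueProvider so its value can be supplied at launch time. A rough sketch, with the option class and parameter name chosen only for illustration:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

class SalesOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # ValueProvider argument: the value is bound when the template is launched.
        parser.add_value_provider_argument(
            '--input_file', type=str,
            help='GCS path of the sales file to process')

def run():
    options = SalesOptions()
    with beam.Pipeline(options=options) as pipeline:
        lines = pipeline | 'ReadSalesFile' >> beam.io.ReadFromText(options.input_file)
        # ... existing transforms and sinks go here ...

Flex Templates, by contrast, accept ordinary pipeline options, so no ValueProvider is needed there.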

Step 4: Launch Dataflow from Cloud Function

  • Use the Dataflow API from within the Cloud Function to launch your Dataflow job, passing the path of the latest file as the input_file parameter (see the example at the end of this reply).

Implementation Notes

  • Cloud Function Runtime: Choose a runtime (e.g., Python 3.x) for which both the GCS and Dataflow client libraries are available.
  • GCS Client Libraries: Use the google-cloud-storage library to list objects in the bucket and read their metadata (such as time_created).
  • Dataflow API: Use the projects.locations.templates.launch method of the Dataflow API (launch_template in the Python client) to start your pipeline with the required parameters.

Example (Python Cloud Function), sketched with the google-cloud-storage and google-cloud-dataflow-client libraries; the bucket name, project, region, template path, and job name below are placeholders:

 
from google.cloud import storage
from google.cloud import dataflow_v1beta3

def launch_dataflow(event, context):
    """Triggered by a GCS finalize event; launches the template on the newest SALES_ file."""
    storage_client = storage.Client()
    bucket = storage_client.bucket('your_bucket_name')
    blobs = list(bucket.list_blobs(prefix='SALES_'))

    if not blobs:  # No objects match the naming convention
        print("No matching files found.")
        return

    # time_created is a UTC datetime set by GCS when the object is finalized.
    latest_blob = max(blobs, key=lambda blob: blob.time_created)
    input_file = f"gs://{bucket.name}/{latest_blob.name}"

    # Launch the classic template (projects.locations.templates.launch).
    # Project, region, template path, and job name are placeholders.
    templates_client = dataflow_v1beta3.TemplatesServiceClient()
    request = dataflow_v1beta3.LaunchTemplateRequest(
        project_id='your-project-id',
        location='us-central1',
        gcs_path='gs://your_bucket_name/templates/your_template',
        launch_parameters=dataflow_v1beta3.LaunchTemplateParameters(
            job_name=f"sales-load-{latest_blob.time_created:%Y%m%d-%H%M%S}",
            parameters={'input_file': input_file},
        ),
    )
    response = templates_client.launch_template(request=request)
    print(f"Launched Dataflow job {response.job.id} for {input_file}")
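
Two practical notes on this setup: the finalize trigger fires for every object written to the bucket, so you may want to inspect event['name'] and return early unless it matches the SALES_ naming convention; and the Cloud Function's service account will typically need permission to launch Dataflow jobs (for example roles/dataflow.developer) plus read access to the bucket (for example roles/storage.objectViewer).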