Hey everyone,
I'm currently working on scheduling a Dataflow pipeline, and I'm facing a challenge in parameterizing the source file path based on the latest received file in our Google Cloud Storage (GCS) bucket.
Every day, we receive a new source file following the naming convention "SALES_YYYY-MM-DD". Because the date suffix changes with each day's file, I need to dynamically select the appropriate file for processing in the pipeline.
I've tried to set up parameters as shown in the screenshot, but I keep encountering an "object not found" error.
Could anyone provide guidance on the correct approach for this situation? I'd greatly appreciate any help or insights you can offer!
Thanks in advance,
Murali
The "Object not found" error likely arises because the wildcard (%Y-%m-%d) used directly within the Cloud Storage input file path doesn't automatically select the latest file. While effective for pattern matching, it requires an additional mechanism to dynamically identify and retrieve the most recent file.
The Solution: Orchestration and Dynamic Parameterization

Step 1: Cloud Function Trigger
Set up a Cloud Function that fires when the pipeline should run, for example on a daily Cloud Scheduler schedule or on a Cloud Storage object-finalize event.

Step 2: Find the Latest File
Inside the function, list the objects in the bucket that match the SALES_ prefix and sort them by the time_created property in descending order; the first entry is the most recent file.

Step 3: Dynamic Parameter in Dataflow Template
Define your Dataflow template with an input_file parameter, which will specify the path to the file to be processed.

Step 4: Launch Dataflow from Cloud Function
Have the function launch the template, passing the latest file's path as the input_file parameter.

Implementation Notes
Use the projects.locations.templates.launch method from the Dataflow API to initiate your pipeline with the necessary parameters (a sketch of this call follows the example below).

Example (Python Cloud Function):
from google.cloud import storage
from google.cloud import dataflow_v1beta3

def launch_dataflow(event, context):
    storage_client = storage.Client()
    bucket = storage_client.bucket('your_bucket_name')
    blobs = list(bucket.list_blobs(prefix='SALES_'))
    if blobs:  # Check if any blobs match the pattern
        # time_created is a UTC timestamp; adjust for time zone if needed
        latest_blob = max(blobs, key=lambda blob: blob.time_created)
        dataflow_client = dataflow_v1beta3.TemplatesServiceClient()
        # Launch the Dataflow job with latest_blob.name as input_file
        # ... (see the launch sketch below)
    else:
        print("No matching files found.")
Additional Considerations:
blob.time_created is a timezone-aware UTC timestamp, so convert it before comparing against a local calendar date (a sketch follows below). Also make sure the Cloud Function's service account has permission to list objects in the bucket and to launch Dataflow jobs.
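A minimal sketch of that time-zone adjustment, assuming Python 3.9+ and an illustrative time zone:

import datetime
from zoneinfo import ZoneInfo

def created_date_local(time_created: datetime.datetime) -> datetime.date:
    # blob.time_created is timezone-aware UTC; convert it before
    # comparing against a local calendar date.
    return time_created.astimezone(ZoneInfo("America/New_York")).date()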