prefect-gcp helps you leverage the capabilities of Google Cloud Platform (GCP) in your workflows.
For example, you can run flows on Vertex AI or Cloud Run, read data from and write data to BigQuery and Cloud Storage, and retrieve secrets with Secret Manager.
Use the Python code below, replacing the placeholders with your information.
```python
from prefect_gcp import GcpCredentials

# replace this PLACEHOLDER dict with your own service account info
service_account_info = {
  "type": "service_account",
  "project_id": "PROJECT_ID",
  "private_key_id": "KEY_ID",
  "private_key": "-----BEGIN PRIVATE KEY-----\nPRIVATE_KEY\n-----END PRIVATE KEY-----\n",
  "client_email": "SERVICE_ACCOUNT_EMAIL",
  "client_id": "CLIENT_ID",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://accounts.google.com/o/oauth2/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/SERVICE_ACCOUNT_EMAIL"
}

GcpCredentials(
    service_account_info=service_account_info
).save("BLOCK-NAME-PLACEHOLDER")
```
service_account_info vs service_account_file
The advantage of using service_account_info, instead of service_account_file, is that it is accessible across containers.
If service_account_file is used, the provided file path must be available in the container executing the flow.
Alternatively, you can authenticate to GCP without storing credentials in a block.
See the Third-party Secrets Guide for an analogous example that uses AWS Secrets Manager and Snowflake.
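As a minimal sketch of block-free authentication, and assuming Application Default Credentials (ADC) are already configured in the environment where the flow runs (for example via `gcloud auth application-default login` or an attached service account), `GcpCredentials` can be created without embedding any secrets:

```python
from prefect_gcp import GcpCredentials

# A sketch, assuming Application Default Credentials are available in the
# runtime environment. With neither service_account_info nor
# service_account_file supplied, credentials are inferred from the environment
# rather than stored in a block.
gcp_credentials = GcpCredentials(project="PROJECT_ID")
storage_client = gcp_credentials.get_cloud_storage_client()
```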
See the Google Cloud Run Worker Guide for a walkthrough of using Google Cloud Run to run workflows with a hybrid work pool.
If you're using Prefect Cloud, Google Cloud Run push work pools provide all the benefits of Google Cloud Run along with a quick setup and no worker needed.
Read data from and write to Google BigQuery within your Prefect flows.
Be sure to install prefect-gcp with the BigQuery extra.
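Assuming you install from PyPI with pip, that typically looks like:

```bash
pip install "prefect-gcp[bigquery]"
```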
This code creates a new dataset in BigQuery, defines a table, inserts rows, and fetches data from the table:
```python
from prefect import flow
from prefect_gcp.bigquery import GcpCredentials, BigQueryWarehouse


@flow
def bigquery_flow():
    all_rows = []
    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

    client = gcp_credentials.get_bigquery_client()
    client.create_dataset("test_example", exists_ok=True)

    with BigQueryWarehouse(gcp_credentials=gcp_credentials) as warehouse:
        warehouse.execute(
            "CREATE TABLE IF NOT EXISTS test_example.customers (name STRING, address STRING);"
        )
        warehouse.execute_many(
            "INSERT INTO test_example.customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Marvin", "address": "Highway 42"},
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = warehouse.fetch_many("SELECT * FROM test_example.customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.extend(new_rows)
    return all_rows


if __name__ == "__main__":
    bigquery_flow()
```
Be sure to install prefect-gcp with the Cloud Storage extra.
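As with BigQuery, assuming a pip install from PyPI, that typically looks like:

```bash
pip install "prefect-gcp[cloud_storage]"
```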
The code below uses prefect_gcp to upload a file to a Google Cloud Storage bucket and download the same file under a different file name.
```python
from pathlib import Path

from prefect import flow
from prefect_gcp import GcpCredentials, GcsBucket


@flow
def cloud_storage_flow():
    # create a dummy file to upload
    file_path = Path("test-example.txt")
    file_path.write_text("Hello, Prefect!")

    gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
    gcs_bucket = GcsBucket(
        bucket="BUCKET-NAME-PLACEHOLDER",
        gcp_credentials=gcp_credentials,
    )

    gcs_bucket_path = gcs_bucket.upload_from_path(file_path)
    downloaded_file_path = gcs_bucket.download_object_to_path(
        gcs_bucket_path, "downloaded-test-example.txt"
    )
    return downloaded_file_path.read_text()


if __name__ == "__main__":
    cloud_storage_flow()
```
Upload and download directories
GcsBucket supports uploading and downloading entire directories. To view examples, check out the Examples Catalog!
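Below is a rough sketch of how that can look, assuming a local directory named `my-folder` exists and the referenced credentials block and bucket are valid; see the Examples Catalog for the authoritative examples:

```python
from prefect_gcp import GcpCredentials, GcsBucket

gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")
gcs_bucket = GcsBucket(
    bucket="BUCKET-NAME-PLACEHOLDER",
    gcp_credentials=gcp_credentials,
)

# upload the contents of the local "my-folder" directory to "my-folder/" in the bucket
gcs_bucket.upload_from_folder("my-folder", to_folder="my-folder")

# download those objects into a different local directory
gcs_bucket.download_folder_to_path("my-folder", "downloaded-my-folder")
```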
Access Google credentials or clients from GcpCredentials
You can instantiate a Google Cloud client, such as bigquery.Client.
Note that a GcpCredentials object is NOT a valid input to the underlying BigQuery client; use the get_credentials_from_service_account method to retrieve and pass a google.auth.Credentials object.
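For example, a minimal sketch (assuming the credentials block from above has been saved and the google-cloud-bigquery client library is installed):

```python
from google.cloud import bigquery
from prefect_gcp import GcpCredentials

gcp_credentials = GcpCredentials.load("BLOCK-NAME-PLACEHOLDER")

# retrieve a google.auth.Credentials object and pass it to the client directly
google_auth_credentials = gcp_credentials.get_credentials_from_service_account()
client = bigquery.Client(
    credentials=google_auth_credentials,
    project=gcp_credentials.project,  # assumes the block's project field is set
)
```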