This module is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by the ECS worker, which offers enhanced functionality and better performance.
For upgrade instructions, see https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.
Integrations with the Amazon Elastic Container Service.
Examples:
Run a task using ECS Fargate

```python
ECSTask(command=["echo", "hello world"]).run()
```

Run a task using ECS Fargate with a spot container instance

```python
ECSTask(command=["echo", "hello world"], launch_type="FARGATE_SPOT").run()
```

Run a task using ECS with an EC2 container instance

```python
ECSTask(command=["echo", "hello world"], launch_type="EC2").run()
```

Run a task on a specific VPC using ECS Fargate

```python
ECSTask(command=["echo", "hello world"], vpc_id="vpc-01abcdf123456789a").run()
```

Run a task and stream the container's output to the local terminal. Note an
execution role must be provided with permissions: logs:CreateLogStream,
logs:CreateLogGroup, and logs:PutLogEvents.

```python
ECSTask(
    command=["echo", "hello world"],
    stream_output=True,
    execution_role_arn="...",
)
```

Run a task using an existing task definition as a base

```python
ECSTask(command=["echo", "hello world"], task_definition_arn="arn:aws:ecs:...")
```

Run a task with a specific image

```python
ECSTask(command=["echo", "hello world"], image="alpine:latest")
```

Run a task with custom memory and CPU requirements

```python
ECSTask(command=["echo", "hello world"], memory=4096, cpu=2048)
```

Run a task with custom environment variables

```python
ECSTask(command=["echo", "hello $PLANET"], env={"PLANET": "earth"})
```

Run a task in a specific ECS cluster

```python
ECSTask(command=["echo", "hello world"], cluster="my-cluster-name")
```

Run a task with custom VPC subnets

```python
ECSTask(
    command=["echo", "hello world"],
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/subnets",
            "value": ["subnet-80b6fbcd", "subnet-42a6fdgd"],
        },
    ],
)
```

Run a task without a public IP assigned

```python
ECSTask(
    command=["echo", "hello world"],
    vpc_id="vpc-01abcdf123456789a",
    task_customizations=[
        {
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
    ],
)
```

Run a task with custom VPC security groups

```python
ECSTask(
    command=["echo", "hello world"],
    vpc_id="vpc-01abcdf123456789a",
    task_customizations=[
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-d72e9599956a084f5"],
        },
    ],
)
```
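The `task_customizations` examples above are JSON Patch (RFC 6902) operations that are applied to the task run request. As a rough illustration of what an `add` or `replace` operation does to such a request, here is a minimal, standard-library-only sketch. The `apply_patch` helper and the sample `request` dictionary are hypothetical and exist only for this example; they handle plain dictionary paths and are not part of prefect-aws, which represents these patches as `JsonPatch` objects.

```python
import copy


def apply_patch(document: dict, operations: list) -> dict:
    """Apply simplified 'add'/'replace' operations to nested dictionaries.

    Hypothetical helper for illustration only: it ignores list indices,
    pointer escaping, and the other RFC 6902 operations.
    """
    result = copy.deepcopy(document)  # leave the caller's document untouched
    for operation in operations:
        if operation["op"] not in ("add", "replace"):
            raise ValueError(f"unsupported op {operation['op']!r}")
        *parents, leaf = operation["path"].lstrip("/").split("/")
        target = result
        for key in parents:
            target = target[key]  # parent objects must already exist
        if operation["op"] == "replace" and leaf not in target:
            raise KeyError(f"cannot replace missing key {leaf!r}")
        target[leaf] = operation["value"]
    return result


# Hypothetical fragment of a task run request
request = {
    "networkConfiguration": {
        "awsvpcConfiguration": {"subnets": [], "assignPublicIp": "ENABLED"}
    }
}

patched = apply_patch(
    request,
    [
        {
            "op": "replace",
            "path": "/networkConfiguration/awsvpcConfiguration/assignPublicIp",
            "value": "DISABLED",
        },
        {
            "op": "add",
            "path": "/networkConfiguration/awsvpcConfiguration/securityGroups",
            "value": ["sg-d72e9599956a084f5"],
        },
    ],
)
print(patched["networkConfiguration"]["awsvpcConfiguration"])
```

Note that `replace` only succeeds when the key already exists, while `add` creates or overwrites it. That is why the example without a public IP uses `replace` for `assignPublicIp` and the security group example uses `add`.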
Attributes:

| Name | Type | Description |
| --- | --- | --- |
| `aws_credentials` | `AwsCredentials` | The AWS credentials to use to connect to ECS with a default factory of AwsCredentials. |
| `task_definition_arn` | `Optional[str]` | An optional identifier for an existing task definition to use. If fields are set on the ECSTask that conflict with the task definition, a new copy will be registered with the required values. Cannot be used with task_definition. If not provided, Prefect will generate and register a minimal task definition. |
| `task_definition` | `Optional[dict]` | An optional ECS task definition to use. Prefect may set defaults or override fields on this task definition to match other ECSTask fields. Cannot be used with task_definition_arn. If not provided, Prefect will generate and register a minimal task definition. |
| `family` | `Optional[str]` | An optional family for the task definition. If not provided, it will be inferred from the task definition. If the task definition does not have a family, the name will be generated. When flow and deployment metadata is available, the generated name will include their names. Values for this field will be slugified to match AWS character requirements. |
| `image` | `Optional[str]` | An optional image to use for the Prefect container in the task. If this value is not null, it will override the value in the task definition. This value defaults to a Prefect base image matching your local versions. |
| `auto_deregister_task_definition` | `bool` | A boolean that controls whether task definitions created by this block will be deregistered. Existing task definitions linked by ARN will never be deregistered. Deregistering a task definition does not remove it from your AWS account; instead, it will be marked as INACTIVE. |
| `cpu` | `int` | The amount of CPU to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of ECS_DEFAULT_CPU will be used unless present on the task definition. |
| `memory` | `int` | The amount of memory to provide to the ECS task. Valid amounts are specified in the AWS documentation. If not provided, a default value of ECS_DEFAULT_MEMORY will be used unless present on the task definition. |
| `execution_role_arn` | `str` | An execution role to use for the task. This controls the permissions of the task when it is launching. If this value is not null, it will override the value in the task definition. An execution role must be provided to capture logs from the container. |
| `configure_cloudwatch_logs` | `bool` | A boolean that controls whether the Prefect container will be configured to send its output to the AWS CloudWatch logs service. This functionality requires an execution role with permissions to create log streams and groups. |
| `cloudwatch_logs_options` | `Dict[str, str]` | A dictionary of options to pass to the CloudWatch logs configuration. |
| `stream_output` | `bool` | A boolean indicating whether logs will be streamed from the Prefect container to the local console. |
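The logging attributes above interact: both `stream_output` and `configure_cloudwatch_logs` depend on an execution role being available. The sketch below restates that rule as a standalone check, assuming (as the class's validators describe) that `configure_cloudwatch_logs` follows `stream_output` when it is left unset, and that a linked task definition may supply the role. The `resolve_logging_settings` function is hypothetical and is not part of prefect-aws.

```python
from typing import Optional


def resolve_logging_settings(
    stream_output: Optional[bool] = None,
    configure_cloudwatch_logs: Optional[bool] = None,
    execution_role_arn: Optional[str] = None,
    task_definition_arn: Optional[str] = None,
    task_definition: Optional[dict] = None,
) -> bool:
    """Return the effective `configure_cloudwatch_logs` value.

    Hypothetical helper mirroring the documented rules; not a prefect-aws API.
    """
    # When unset, CloudWatch configuration follows `stream_output`.
    if configure_cloudwatch_logs is None:
        configure_cloudwatch_logs = bool(stream_output)

    # The role may come from the field itself or from a task definition.
    role_available = bool(
        execution_role_arn
        or task_definition_arn
        or (task_definition or {}).get("executionRoleArn")
    )
    if configure_cloudwatch_logs and not role_available:
        raise ValueError(
            "An execution role is required to configure CloudWatch logs."
        )
    return configure_cloudwatch_logs


print(resolve_logging_settings(stream_output=True, execution_role_arn="..."))
```

Calling the helper with `stream_output=True` and no role source raises a `ValueError`, which is the situation the attribute descriptions warn about.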
```python
@deprecated_class(
    start_date="Mar 2024",
    help=(
        "Use the ECS worker instead."
        " Refer to the upgrade guide for more information:"
        " https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/."
    ),
)
class ECSTask(Infrastructure):
    """
    Run a command as an ECS task.

    Attributes:
        type: The slug for this task type with a default value of "ecs-task".
        aws_credentials: The AWS credentials to use to connect to ECS with a
            default factory of AwsCredentials.
        task_definition_arn: An optional identifier for an existing task definition
            to use. If fields are set on the ECSTask that conflict with the task
            definition, a new copy will be registered with the required values.
            Cannot be used with task_definition. If not provided, Prefect will
            generate and register a minimal task definition.
        task_definition: An optional ECS task definition to use. Prefect may set
            defaults or override fields on this task definition to match other
            ECSTask fields. Cannot be used with task_definition_arn.
            If not provided, Prefect will generate and register
            a minimal task definition.
        family: An optional family for the task definition. If not provided,
            it will be inferred from the task definition. If the task definition
            does not have a family, the name will be generated. When flow and
            deployment metadata is available, the generated name will include
            their names. Values for this field will be slugified to match
            AWS character requirements.
        image: An optional image to use for the Prefect container in the task.
            If this value is not null, it will override the value in the task
            definition. This value defaults to a Prefect base image matching
            your local versions.
        auto_deregister_task_definition: A boolean that controls if any task
            definitions that are created by this block will be deregistered
            or not. Existing task definitions linked by ARN will never be
            deregistered. Deregistering a task definition does not remove
            it from your AWS account; instead, it will be marked as INACTIVE.
        cpu: The amount of CPU to provide to the ECS task. Valid amounts are
            specified in the AWS documentation. If not provided, a default
            value of ECS_DEFAULT_CPU will be used unless present on
            the task definition.
        memory: The amount of memory to provide to the ECS task.
            Valid amounts are specified in the AWS documentation.
            If not provided, a default value of ECS_DEFAULT_MEMORY
            will be used unless present on the task definition.
        execution_role_arn: An execution role to use for the task.
            This controls the permissions of the task when it is launching.
            If this value is not null, it will override the value in the task
            definition. An execution role must be provided to capture logs
            from the container.
        configure_cloudwatch_logs: A boolean that controls if the Prefect
            container will be configured to send its output to the
            AWS CloudWatch logs service or not. This functionality requires
            an execution role with permissions to create log streams and groups.
        cloudwatch_logs_options: A dictionary of options to pass to
            the CloudWatch logs configuration.
        stream_output: A boolean indicating whether logs will be
            streamed from the Prefect container to the local console.
        launch_type: An optional launch type for the ECS task run infrastructure.
        vpc_id: An optional VPC ID to link the task run to.
            This is only applicable when using the 'awsvpc' network mode
            for your task.
        cluster: An optional ECS cluster to run the task in.
            The ARN or name may be provided. If not provided,
            the default cluster will be used.
        env: A dictionary of environment variables to provide to
            the task run. These variables are set on the
            Prefect container at task runtime.
        task_role_arn: An optional role to attach to the task run.
            This controls the permissions of the task while it is running.
        task_customizations: A list of JSON 6902 patches to apply to the task
            run request. If a string is given, it will be parsed as a
            JSON expression.
        task_start_timeout_seconds: The amount of time to watch for the
            start of the ECS task before marking it as failed. The task must
            enter a RUNNING state to be considered started.
        task_watch_poll_interval: The amount of time to wait between AWS API
            calls while monitoring the state of an ECS task.
    """

    _block_type_slug = "ecs-task"
    _block_type_name = "ECS Task"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png"  # noqa
    _description = "Run a command as an ECS task."  # noqa
    _documentation_url = (
        "https://prefecthq.github.io/prefect-aws/ecs/#prefect_aws.ecs.ECSTask"  # noqa
    )

    type: Literal["ecs-task"] = Field(
        "ecs-task", description="The slug for this task type."
    )

    aws_credentials: AwsCredentials = Field(
        title="AWS Credentials",
        default_factory=AwsCredentials,
        description="The AWS credentials to use to connect to ECS.",
    )

    # Task definition settings
    task_definition_arn: Optional[str] = Field(
        default=None,
        description=(
            "An identifier for an existing task definition to use. If fields are set "
            "on the `ECSTask` that conflict with the task definition, a new copy "
            "will be registered with the required values. "
            "Cannot be used with `task_definition`. If not provided, Prefect will "
            "generate and register a minimal task definition."
        ),
    )
    task_definition: Optional[dict] = Field(
        default=None,
        description=(
            "An ECS task definition to use. Prefect may set defaults or override "
            "fields on this task definition to match other `ECSTask` fields. "
            "Cannot be used with `task_definition_arn`. If not provided, Prefect will "
            "generate and register a minimal task definition."
        ),
    )
    family: Optional[str] = Field(
        default=None,
        description=(
            "A family for the task definition. If not provided, it will be inferred "
            "from the task definition. If the task definition does not have a family, "
            "the name will be generated. When flow and deployment metadata is "
            "available, the generated name will include their names. Values for this "
            "field will be slugified to match AWS character requirements."
        ),
    )
    image: Optional[str] = Field(
        default=None,
        description=(
            "The image to use for the Prefect container in the task. If this value is "
            "not null, it will override the value in the task definition. This value "
            "defaults to a Prefect base image matching your local versions."
        ),
    )
    auto_deregister_task_definition: bool = Field(
        default=True,
        description=(
            "If set, any task definitions that are created by this block will be "
            "deregistered. Existing task definitions linked by ARN will never be "
            "deregistered. Deregistering a task definition does not remove it from "
            "your AWS account, instead it will be marked as INACTIVE."
        ),
    )

    # Mixed task definition / run settings
    cpu: int = Field(
        title="CPU",
        default=None,
        description=(
            "The amount of CPU to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_CPU} will be used unless present on the task definition."
        ),
    )
    memory: int = Field(
        default=None,
        description=(
            "The amount of memory to provide to the ECS task. Valid amounts are "
            "specified in the AWS documentation. If not provided, a default value of "
            f"{ECS_DEFAULT_MEMORY} will be used unless present on the task definition."
        ),
    )
    execution_role_arn: str = Field(
        title="Execution Role ARN",
        default=None,
        description=(
            "An execution role to use for the task. This controls the permissions of "
            "the task when it is launching. If this value is not null, it will "
            "override the value in the task definition. An execution role must be "
            "provided to capture logs from the container."
        ),
    )
    configure_cloudwatch_logs: bool = Field(
        default=None,
        description=(
            "If `True`, the Prefect container will be configured to send its output "
            "to the AWS CloudWatch logs service. This functionality requires an "
            "execution role with logs:CreateLogStream, logs:CreateLogGroup, and "
            "logs:PutLogEvents permissions. The default for this field is `False` "
            "unless `stream_output` is set."
        ),
    )
    cloudwatch_logs_options: Dict[str, str] = Field(
        default_factory=dict,
        description=(
            "When `configure_cloudwatch_logs` is enabled, this setting may be used to "
            "pass additional options to the CloudWatch logs configuration or override "
            "the default options. See the AWS documentation for available options. "
            "https://docs.aws.amazon.com/AmazonECS/latest/developerguide/using_awslogs.html#create_awslogs_logdriver_options."  # noqa
        ),
    )
    stream_output: bool = Field(
        default=None,
        description=(
            "If `True`, logs will be streamed from the Prefect container to the local "
            "console. Unless you have configured AWS CloudWatch logs manually on your "
            "task definition, this requires the same prerequisites outlined in "
            "`configure_cloudwatch_logs`."
        ),
    )

    # Task run settings
    launch_type: Optional[
        Literal["FARGATE", "EC2", "EXTERNAL", "FARGATE_SPOT"]
    ] = Field(
        default="FARGATE",
        description=(
            "The type of ECS task run infrastructure that should be used. Note that"
            " 'FARGATE_SPOT' is not a formal ECS launch type, but we will configure"
            " the proper capacity provider strategy if set here."
        ),
    )
    vpc_id: Optional[str] = Field(
        title="VPC ID",
        default=None,
        description=(
            "The AWS VPC to link the task run to. This is only applicable when using "
            "the 'awsvpc' network mode for your task. FARGATE tasks require this "
            "network mode, but for EC2 tasks the default network mode is 'bridge'. "
            "If using the 'awsvpc' network mode and this field is null, your default "
            "VPC will be used. If no default VPC can be found, the task run will fail."
        ),
    )
    cluster: Optional[str] = Field(
        default=None,
        description=(
            "The ECS cluster to run the task in. The ARN or name may be provided. If "
            "not provided, the default cluster will be used."
        ),
    )
    env: Dict[str, Optional[str]] = Field(
        title="Environment Variables",
        default_factory=dict,
        description=(
            "Environment variables to provide to the task run. These variables are set "
            "on the Prefect container at task runtime. These will not be set on the "
            "task definition."
        ),
    )
    task_role_arn: str = Field(
        title="Task Role ARN",
        default=None,
        description=(
            "A role to attach to the task run. This controls the permissions of the "
            "task while it is running."
        ),
    )
    task_customizations: JsonPatch = Field(
        default_factory=lambda: JsonPatch([]),
        description=(
            "A list of JSON 6902 patches to apply to the task run request. "
            "If a string is given, it will be parsed as a JSON expression."
        ),
    )

    # Execution settings
    task_start_timeout_seconds: int = Field(
        default=120,
        description=(
            "The amount of time to watch for the start of the ECS task "
            "before marking it as failed. The task must enter a RUNNING state to be "
            "considered started."
        ),
    )
    task_watch_poll_interval: float = Field(
        default=5.0,
        description=(
            "The amount of time to wait between AWS API calls while monitoring the "
            "state of an ECS task."
        ),
    )

    @root_validator(pre=True)
    def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
        """
        Streaming output generally requires CloudWatch logs to be configured.

        To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
        defaults to matching the value of `stream_output`.
        """
        configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
        if configure_cloudwatch_logs is None:
            values["configure_cloudwatch_logs"] = values.get("stream_output")
        return values

    @root_validator
    def configure_cloudwatch_logs_requires_execution_role_arn(
        cls, values: dict
    ) -> dict:
        """
        Enforces that an execution role arn is provided (or could be provided by a
        runtime task definition) when configuring logging.
        """
        if (
            values.get("configure_cloudwatch_logs")
            and not values.get("execution_role_arn")
            # Do not raise if they've linked to another task definition or provided
            # it without using our shortcuts
            and not values.get("task_definition_arn")
            and not (values.get("task_definition") or {}).get("executionRoleArn")
        ):
            raise ValueError(
                "An `execution_role_arn` must be provided to use "
                "`configure_cloudwatch_logs` or `stream_output`."
            )
        return values

    @root_validator
    def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
        cls, values: dict
    ) -> dict:
        """
        Enforces that `configure_cloudwatch_logs` is enabled when
        `cloudwatch_logs_options` are provided.
        """
        if values.get("cloudwatch_logs_options") and not values.get(
            "configure_cloudwatch_logs"
        ):
            raise ValueError(
                "`configure_cloudwatch_logs` must be enabled to use "
                "`cloudwatch_logs_options`."
            )
        return values

    @root_validator(pre=True)
    def image_is_required(cls, values: dict) -> dict:
        """
        Enforces that an image is available if image is `None`.
        """
        has_image = bool(values.get("image"))
        has_task_definition_arn = bool(values.get("task_definition_arn"))

        # The image can only be null when the task_definition_arn is set
        if has_image or has_task_definition_arn:
            return values

        prefect_container = (
            get_prefect_container(
                (values.get("task_definition") or {}).get("containerDefinitions", [])
            )
            or {}
        )
        image_in_task_definition = prefect_container.get("image")

        # If a task_definition is given with a prefect container image, use that value
        if image_in_task_definition:
            values["image"] = image_in_task_definition
        # Otherwise, it should default to the Prefect base image
        else:
            values["image"] = get_prefect_image_name()
        return values

    @validator("task_customizations", pre=True)
    def cast_customizations_to_a_json_patch(
        cls, value: Union[List[Dict], JsonPatch, str]
    ) -> JsonPatch:
        """
        Casts lists to JsonPatch instances.
        """
        if isinstance(value, str):
            value = json.loads(value)
        if isinstance(value, list):
            return JsonPatch(value)
        return value  # type: ignore

    class Config:
        """Configuration of pydantic."""

        # Support serialization of the 'JsonPatch' type
        arbitrary_types_allowed = True
        json_encoders = {JsonPatch: lambda p: p.patch}

    def dict(self, *args, **kwargs) -> Dict:
        """
        Convert to a dictionary.
        """
        # Support serialization of the 'JsonPatch' type
        d = super().dict(*args, **kwargs)
        d["task_customizations"] = self.task_customizations.patch
        return d

    def prepare_for_flow_run(
        self: Self,
        flow_run: "FlowRun",
        deployment: Optional["Deployment"] = None,
        flow: Optional["Flow"] = None,
    ) -> Self:
        """
        Return a copy of the block that is prepared to execute a flow run.
        """
        new_family = None

        # Update the family if not specified elsewhere
        if (
            not self.family
            and not self.task_definition_arn
            and not (self.task_definition and self.task_definition.get("family"))
        ):
            if flow and deployment:
                new_family = f"{ECS_DEFAULT_FAMILY}__{flow.name}__{deployment.name}"
            elif flow and not deployment:
                new_family = f"{ECS_DEFAULT_FAMILY}__{flow.name}"
            elif deployment and not flow:
                # This is a weird case and should not be seen in the wild
                new_family = f"{ECS_DEFAULT_FAMILY}__unknown-flow__{deployment.name}"

        new = super().prepare_for_flow_run(flow_run, deployment=deployment, flow=flow)

        if new_family:
            return new.copy(update={"family": new_family})
        else:
            # Avoid an extra copy if not needed
            return new

    @sync_compatible
    async def run(self, task_status: Optional[TaskStatus] = None) -> ECSTaskResult:
        """
        Run the configured task on ECS.
        """
        boto_session, ecs_client = await run_sync_in_worker_thread(
            self._get_session_and_client
        )

        (
            task_arn,
            cluster_arn,
            task_definition,
            is_new_task_definition,
        ) = await run_sync_in_worker_thread(
            self._create_task_and_wait_for_start, boto_session, ecs_client
        )

        # Display a nice message indicating the command and image
        command = self.command or get_prefect_container(
            task_definition["containerDefinitions"]
        ).get("command", [])
        self.logger.info(
            f"{self._log_prefix}: Running command {' '.join(command)!r} "
            f"in container {PREFECT_ECS_CONTAINER_NAME!r} ({self.image})..."
        )

        # The task identifier is "{cluster}::{task}" where we use the configured cluster
        # if set to preserve matching by name rather than arn
        # Note "::" is used despite the Prefect standard being ":" because ARNs contain
        # single colons.
        identifier = (self.cluster if self.cluster else cluster_arn) + "::" + task_arn

        if task_status:
            task_status.started(identifier)

        status_code = await run_sync_in_worker_thread(
            self._watch_task_and_get_exit_code,
            task_arn,
            cluster_arn,
            task_definition,
            is_new_task_definition and self.auto_deregister_task_definition,
            boto_session,
            ecs_client,
        )

        return ECSTaskResult(
            identifier=identifier,
            # If the container does not start the exit code can be null but we must
            # still report a status code. We use a -1 to indicate a special code.
            status_code=status_code if status_code is not None else -1,
        )

    @sync_compatible
    async def kill(self, identifier: str, grace_seconds: int = 30) -> None:
        """
        Kill a task running on ECS.

        Args:
            identifier: A cluster and task arn combination. This should match a value
                yielded by `ECSTask.run`.
        """
        if grace_seconds != 30:
            self.logger.warning(
                f"Kill grace period of {grace_seconds}s requested, but AWS does not "
                "support dynamic grace period configuration so 30s will be used. "
                "See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-agent-config.html for configuration of grace periods."  # noqa
            )
        cluster, task = parse_task_identifier(identifier)
        await run_sync_in_worker_thread(self._stop_task, cluster, task)

    @staticmethod
    def get_corresponding_worker_type() -> str:
        """Return the corresponding worker type for this infrastructure block."""
        return ECSWorker.type

    async def generate_work_pool_base_job_template(self) -> dict:
        """
        Generate a base job template for an ECS work pool with the same
        configuration as this block.

        Returns:
            - dict: a base job template for an ECS work pool
        """
        base_job_template = copy.deepcopy(ECSWorker.get_default_base_job_template())
        for key, value in self.dict(exclude_unset=True, exclude_defaults=True).items():
            if key == "command":
                base_job_template["variables"]["properties"]["command"][
                    "default"
                ] = shlex.join(value)
            elif key in [
                "type",
                "block_type_slug",
                "_block_document_id",
                "_block_document_name",
                "_is_anonymous",
                "task_customizations",
            ]:
                continue
            elif key == "aws_credentials":
                if not self.aws_credentials._block_document_id:
                    raise BlockNotSavedError(
                        "It looks like you are trying to use a block that"
                        " has not been saved. Please call `.save` on your block"
                        " before publishing it as a work pool."
                    )
                base_job_template["variables"]["properties"]["aws_credentials"][
                    "default"
                ] = {
                    "$ref": {
                        "block_document_id": str(
                            self.aws_credentials._block_document_id
                        )
                    }
                }
            elif key == "task_definition":
                base_job_template["job_configuration"]["task_definition"] = value
            elif key in base_job_template["variables"]["properties"]:
                base_job_template["variables"]["properties"][key]["default"] = value
            else:
                self.logger.warning(
                    f"Variable {key!r} is not supported by ECS work pools."
                    " Skipping."
                )

        if self.task_customizations:
            network_config_patches = JsonPatch(
                [
                    patch
                    for patch in self.task_customizations
                    if "networkConfiguration" in patch["path"]
                ]
            )
            minimal_network_config = assemble_document_for_patches(
                network_config_patches
            )
            if minimal_network_config:
                minimal_network_config_with_patches = network_config_patches.apply(
                    minimal_network_config
                )
                base_job_template["variables"]["properties"]["network_configuration"][
                    "default"
                ] = minimal_network_config_with_patches["networkConfiguration"][
                    "awsvpcConfiguration"
                ]
            try:
                base_job_template["job_configuration"][
                    "task_run_request"
                ] = self.task_customizations.apply(
                    base_job_template["job_configuration"]["task_run_request"]
                )
            except JsonPointerException:
                self.logger.warning(
                    "Unable to apply task customizations to the base job template. "
                    "You may need to update the template manually."
                )

        return base_job_template

    def _stop_task(self, cluster: str, task: str) -> None:
        """
        Stop a running ECS task.
        """
        if self.cluster is not None and cluster != self.cluster:
            raise InfrastructureNotAvailable(
                "Cannot stop ECS task: this infrastructure block has access to "
                f"cluster {self.cluster!r} but the task is running in cluster "
                f"{cluster!r}."
            )

        _, ecs_client = self._get_session_and_client()
        try:
            ecs_client.stop_task(cluster=cluster, task=task)
        except Exception as exc:
            # Raise a special exception if the task does not exist
            if "ClusterNotFound" in str(exc):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the cluster {cluster!r} could not be found."
                ) from exc
            if "not find task" in str(exc) or "referenced task was not found" in str(
                exc
            ):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the task {task!r} could not be found in "
                    f"cluster {cluster!r}."
                ) from exc
            if "no registered tasks" in str(exc):
                raise InfrastructureNotFound(
                    f"Cannot stop ECS task: the cluster {cluster!r} has no tasks."
                ) from exc

            # Reraise unknown exceptions
            raise

    @property
    def _log_prefix(self) -> str:
        """
        Internal property for generating a prefix for logs where `name` may be null
        """
        if self.name is not None:
            return f"ECSTask {self.name!r}"
        else:
            return "ECSTask"

    def _get_session_and_client(self) -> Tuple[boto3.Session, _ECSClient]:
        """
        Retrieve a boto3 session and ECS client
        """
        boto_session = self.aws_credentials.get_boto3_session()
        ecs_client = boto_session.client("ecs")
        return boto_session, ecs_client

    def _create_task_and_wait_for_start(
        self, boto_session: boto3.Session, ecs_client: _ECSClient
    ) -> Tuple[str, str, dict, bool]:
        """
        Register the task definition, create the task run, and wait for it to start.

        Returns a tuple of
        - The task ARN
        - The task's cluster ARN
        - The task definition
        - A bool indicating if the task definition is newly registered
        """
        new_task_definition_registered = False
        requested_task_definition = (
            self._retrieve_task_definition(ecs_client, self.task_definition_arn)
            if self.task_definition_arn
            else self.task_definition
        ) or {}
        task_definition_arn = requested_task_definition.get("taskDefinitionArn", None)

        task_definition = self._prepare_task_definition(
            requested_task_definition, region=ecs_client.meta.region_name
        )

        # We must register the task definition if the arn is null or changes were made
        if task_definition != requested_task_definition or not task_definition_arn:
            # Before registering, check if the latest task definition in the family
            # can be used
            latest_task_definition = self._retrieve_latest_task_definition(
                ecs_client, task_definition["family"]
            )
            if self._task_definitions_equal(latest_task_definition, task_definition):
                self.logger.debug(
                    f"{self._log_prefix}: The latest task definition matches the "
                    "required task definition; using that instead of registering a new "
                    "one."
                )
                task_definition_arn = latest_task_definition["taskDefinitionArn"]
            else:
                if task_definition_arn:
                    self.logger.warning(
                        f"{self._log_prefix}: Settings require changes to the linked "
                        "task definition. A new task definition will be registered. "
                        + (
                            "Enable DEBUG level logs to see the difference."
                            if self.logger.level > logging.DEBUG
                            else ""
                        )
                    )
                    self.logger.debug(
                        f"{self._log_prefix}: Diff for requested task definition"
                        + _pretty_diff(requested_task_definition, task_definition)
                    )
                else:
                    self.logger.info(
                        f"{self._log_prefix}: Registering task definition..."
                    )
                    self.logger.debug(
                        "Task definition payload\n" + yaml.dump(task_definition)
                    )

                task_definition_arn = self._register_task_definition(
                    ecs_client, task_definition
                )
                new_task_definition_registered = True

        if task_definition.get("networkMode") == "awsvpc":
            network_config = self._load_vpc_network_config(self.vpc_id, boto_session)
        else:
            network_config = None

        task_run = self._prepare_task_run(
            network_config=network_config,
            task_definition_arn=task_definition_arn,
        )
        self.logger.info(f"{self._log_prefix}: Creating task run...")
        self.logger.debug("Task run payload\n" + yaml.dump(task_run))

        try:
            task = self._run_task(ecs_client, task_run)
            task_arn = task["taskArn"]
            cluster_arn = task["clusterArn"]
        except Exception as exc:
            self._report_task_run_creation_failure(task_run, exc)

        # Raises an exception if the task does not start
        self.logger.info(f"{self._log_prefix}: Waiting for task run to start...")
        self._wait_for_task_start(
            task_arn, cluster_arn, ecs_client, timeout=self.task_start_timeout_seconds
        )

        return task_arn, cluster_arn, task_definition, new_task_definition_registered

    def _watch_task_and_get_exit_code(
        self,
        task_arn: str,
        cluster_arn: str,
        task_definition: dict,
        deregister_task_definition: bool,
        boto_session: boto3.Session,
        ecs_client: _ECSClient,
    ) -> Optional[int]:
        """
        Wait for the task run to complete and retrieve the exit code of the Prefect
        container.
        """
        # Wait for completion and stream logs
        task = self._wait_for_task_finish(
            task_arn, cluster_arn, task_definition, ecs_client, boto_session
        )

        if deregister_task_definition:
            ecs_client.deregister_task_definition(
                taskDefinition=task["taskDefinitionArn"]
            )

        # Check the status code of the Prefect container
        prefect_container = get_prefect_container(task["containers"])
        assert (
            prefect_container is not None
        ), f"'prefect' container missing from task: {task}"
        status_code = prefect_container.get("exitCode")
        self._report_container_status_code(PREFECT_ECS_CONTAINER_NAME, status_code)

        return status_code

    def _task_definitions_equal(self, taskdef_1, taskdef_2) -> bool:
        """
        Compare two task definitions.

        Since one may come from the AWS API and have populated defaults, we do our best
        to homogenize the definitions without changing their meaning.
        """
        if taskdef_1 == taskdef_2:
            return True

        if taskdef_1 is None or taskdef_2 is None:
            return False

        taskdef_1 = copy.deepcopy(taskdef_1)
        taskdef_2 = copy.deepcopy(taskdef_2)

        def _set_aws_defaults(taskdef):
            """Set defaults that AWS would set after registration"""
            container_definitions = taskdef.get("containerDefinitions", [])
            essential = any(
                container.get("essential") for container in container_definitions
            )
            if not essential:
                container_definitions[0].setdefault("essential", True)

            taskdef.setdefault("networkMode", "bridge")

        _set_aws_defaults(taskdef_1)
        _set_aws_defaults(taskdef_2)

        def _drop_empty_keys(dict_):
            """Recursively drop keys with 'empty' values"""
            for key, value in tuple(dict_.items()):
                if not value:
                    dict_.pop(key)
                if isinstance(value, dict):
                    _drop_empty_keys(value)
                if isinstance(value, list):
                    for v in value:
                        if isinstance(v, dict):
                            _drop_empty_keys(v)

        _drop_empty_keys(taskdef_1)
        _drop_empty_keys(taskdef_2)

        # Clear fields that change on registration for comparison
        for field in POST_REGISTRATION_FIELDS:
            taskdef_1.pop(field, None)
            taskdef_2.pop(field, None)

        return taskdef_1 == taskdef_2

    def preview(self) -> str:
        """
        Generate a preview of the task definition and task run that will be sent to AWS.
        """
        preview = ""

        task_definition_arn = self.task_definition_arn or "<registered at runtime>"

        if self.task_definition or not self.task_definition_arn:
            task_definition = self._prepare_task_definition(
                self.task_definition or {},
                region=self.aws_credentials.region_name
                or "<loaded from client at runtime>",
            )
            preview += "---\n# Task definition\n"
            preview += yaml.dump(task_definition)
            preview += "\n"
        else:
            task_definition = None

        if task_definition and task_definition.get("networkMode") == "awsvpc":
            vpc = "the default VPC" if not self.vpc_id else self.vpc_id
            network_config = {
                "awsvpcConfiguration": {
                    "subnets": f"<loaded from {vpc} at runtime>",
                    "assignPublicIp": "ENABLED",
                }
            }
        else:
            network_config = None

        task_run = self._prepare_task_run(network_config, task_definition_arn)
        preview += "---\n# Task run request\n"
        preview += yaml.dump(task_run)

        return preview

    def _report_container_status_code(
        self, name: str, status_code: Optional[int]
    ) -> None:
        """
        Display a log for the given container status code.
        """
        if status_code is None:
            self.logger.error(
                f"{self._log_prefix}: Task exited without reporting an exit status "
                f"for container {name!r}."
            )
        elif status_code == 0:
            self.logger.info(
                f"{self._log_prefix}: Container {name!r} exited successfully."
            )
        else:
            self.logger.warning(
                f"{self._log_prefix}: Container {name!r} exited with non-zero exit "
                f"code {status_code}."
            )

    def _report_task_run_creation_failure(self, task_run: dict, exc: Exception) -> None:
        """
        Wrap common AWS task run creation failures with nicer user-facing messages.
        """
        # AWS generates exception types at runtime so they must be captured a bit
        # differently than normal.
        if "ClusterNotFoundException" in str(exc):
            cluster = task_run.get("cluster", "default")
            raise RuntimeError(
                f"Failed to run ECS task, cluster {cluster!r} not found. "
                "Confirm that the cluster is configured in your region."
            ) from exc
        elif "No Container Instances" in str(exc) and self.launch_type == "EC2":
            cluster = task_run.get("cluster", "default")
            raise RuntimeError(
                f"Failed to run ECS task, cluster {cluster!r} does not appear to "
                "have any container instances associated with it. Confirm that you "
                "have EC2 container instances available."
            ) from exc
        elif (
            "failed to validate logger args" in str(exc)
            and "AccessDeniedException" in str(exc)
            and self.configure_cloudwatch_logs
        ):
            # The field is named `execution_role_arn`; `execution_role` does not exist
            raise RuntimeError(
                "Failed to run ECS task, the attached execution role does not appear "
                "to have sufficient permissions. Ensure that the execution role "
                f"{self.execution_role_arn!r} has permissions logs:CreateLogStream, "
                "logs:CreateLogGroup, and logs:PutLogEvents."
            )
        else:
            raise

    def _watch_task_run(
        self,
        task_arn: str,
        cluster_arn: str,
        ecs_client: _ECSClient,
        current_status: str = "UNKNOWN",
        until_status: str = None,
        timeout: int = None,
    ) -> Generator[None, None, dict]:
        """
        Watches an ECS task run by querying every `task_watch_poll_interval` seconds.
        After each query, the retrieved task is yielded. This function returns when
        the task run reaches a STOPPED status or the provided `until_status`.

        Emits a log each time the status changes.
        """
        last_status = status = current_status
        t0 = time.time()
        while status != until_status:
            tasks = ecs_client.describe_tasks(
                tasks=[task_arn], cluster=cluster_arn, include=["TAGS"]
            )["tasks"]

            if tasks:
                task = tasks[0]

                status = task["lastStatus"]
                if status != last_status:
                    self.logger.info(f"{self._log_prefix}: Status is {status}.")

                yield task

                # No point in continuing if the status is final
                if status == "STOPPED":
                    break

                last_status = status

            else:
                # Intermittently, the task will not be described. We want to respect
                # the watch timeout though.
                self.logger.debug(f"{self._log_prefix}: Task not found.")

            elapsed_time = time.time() - t0
            if timeout is not None and elapsed_time > timeout:
                raise RuntimeError(
                    f"Timed out after {elapsed_time}s while watching task for status "
                    f"{until_status or 'STOPPED'}"
                )
            time.sleep(self.task_watch_poll_interval)

    def _wait_for_task_start(
        self, task_arn: str, cluster_arn: str, ecs_client: _ECSClient, timeout: int
    ) -> dict:
        """
        Waits for an ECS task run to reach a RUNNING status.

        If a STOPPED status is reached instead, an exception is raised indicating the
        reason that the task run did not start.
        """
        for task in self._watch_task_run(
            task_arn, cluster_arn, ecs_client, until_status="RUNNING", timeout=timeout
        ):
            # TODO: It is possible that the task has passed _through_ a RUNNING
            #       status during the polling interval. In this case, there is not an
            #       exception to raise.
            if task["lastStatus"] == "STOPPED":
                code = task.get("stopCode")
                reason = task.get("stoppedReason")
                # Generate a dynamic exception type from the AWS name
                raise type(code, (RuntimeError,), {})(reason)

        return task

    def _wait_for_task_finish(
        self,
        task_arn: str,
        cluster_arn: str,
        task_definition: dict,
        ecs_client: _ECSClient,
        boto_session: boto3.Session,
    ):
        """
        Watch an ECS task until it reaches a STOPPED status.

        If configured, logs from the Prefect container are streamed to stderr.

        Returns a description of the task on completion.
        """
        can_stream_output = False

        if self.stream_output:
            container_def = get_prefect_container(
                task_definition["containerDefinitions"]
            )
            if not container_def:
                self.logger.warning(
                    f"{self._log_prefix}: Prefect container definition not found in "
                    "task definition. Output cannot be streamed."
                )
            elif not container_def.get("logConfiguration"):
                self.logger.warning(
                    f"{self._log_prefix}: Logging configuration not found on task. "
                    "Output cannot be streamed."
                )
            elif not container_def["logConfiguration"].get("logDriver") == "awslogs":
                self.logger.warning(
                    f"{self._log_prefix}: Logging configuration uses unsupported "
                    " driver {container_def['logConfiguration'].get('logDriver')!r}. 
```
""Output cannot be streamed.")else:# Prepare to stream the outputlog_config=container_def["logConfiguration"]["options"]logs_client=boto_session.client("logs")can_stream_output=True# Track the last log timestamp to prevent double displaylast_log_timestamp:Optional[int]=None# Determine the name of the stream as "prefix/container/run-id"stream_name="/".join([log_config["awslogs-stream-prefix"],PREFECT_ECS_CONTAINER_NAME,task_arn.rsplit("/")[-1],])self.logger.info(f"{self._log_prefix}: Streaming output from container "f"{PREFECT_ECS_CONTAINER_NAME!r}...")fortaskinself._watch_task_run(task_arn,cluster_arn,ecs_client,current_status="RUNNING"):ifself.stream_outputandcan_stream_output:# On each poll for task run status, also retrieve available logslast_log_timestamp=self._stream_available_logs(logs_client,log_group=log_config["awslogs-group"],log_stream=stream_name,last_log_timestamp=last_log_timestamp,)returntaskdef_stream_available_logs(self,logs_client:Any,log_group:str,log_stream:str,last_log_timestamp:Optional[int]=None,)->Optional[int]:""" Stream logs from the given log group and stream since the last log timestamp. Will continue on paginated responses until all logs are returned. Returns the last log timestamp which can be used to call this method in the future. 
"""last_log_stream_token="NO-TOKEN"next_log_stream_token=None# AWS will return the same token that we send once the end of the paginated# response is reachedwhilelast_log_stream_token!=next_log_stream_token:last_log_stream_token=next_log_stream_tokenrequest={"logGroupName":log_group,"logStreamName":log_stream,}iflast_log_stream_tokenisnotNone:request["nextToken"]=last_log_stream_tokeniflast_log_timestampisnotNone:# Bump the timestamp by one ms to avoid retrieving the last log againrequest["startTime"]=last_log_timestamp+1try:response=logs_client.get_log_events(**request)exceptException:self.logger.error((f"{self._log_prefix}: Failed to read log events with request "f"{request}"),exc_info=True,)returnlast_log_timestamplog_events=response["events"]forlog_eventinlog_events:# TODO: This doesn't forward to the local logger, which can be# bad for customizing handling and understanding where the# log is coming from, but it avoid nesting logger information# when the content is output from a Prefect logger on the# running infrastructureprint(log_event["message"],file=sys.stderr)if(last_log_timestampisNoneorlog_event["timestamp"]>last_log_timestamp):last_log_timestamp=log_event["timestamp"]next_log_stream_token=response.get("nextForwardToken")ifnotlog_events:# Stop reading pages if there was no databreakreturnlast_log_timestampdef_retrieve_latest_task_definition(self,ecs_client:_ECSClient,task_definition_family:str)->Optional[dict]:try:latest_task_definition=self._retrieve_task_definition(ecs_client,task_definition_family)exceptException:# The family does not exist...returnNonereturnlatest_task_definitiondef_retrieve_task_definition(self,ecs_client:_ECSClient,task_definition_arn:str):""" Retrieve an existing task definition from AWS. 
"""self.logger.info(f"{self._log_prefix}: Retrieving task definition {task_definition_arn!r}...")response=ecs_client.describe_task_definition(taskDefinition=task_definition_arn)returnresponse["taskDefinition"]def_register_task_definition(self,ecs_client:_ECSClient,task_definition:dict)->str:""" Register a new task definition with AWS. """# TODO: Consider including a global cache for this task definition since# registration of task definitions is frequently rate limitedtask_definition_request=copy.deepcopy(task_definition)# We need to remove some fields here if copying an existing task definitionforfieldinPOST_REGISTRATION_FIELDS:task_definition_request.pop(field,None)response=ecs_client.register_task_definition(**task_definition_request)returnresponse["taskDefinition"]["taskDefinitionArn"]def_prepare_task_definition(self,task_definition:dict,region:str)->dict:""" Prepare a task definition by inferring any defaults and merging overrides. """task_definition=copy.deepcopy(task_definition)# Configure the Prefect runtime containertask_definition.setdefault("containerDefinitions",[])container=get_prefect_container(task_definition["containerDefinitions"])ifcontainerisNone:container={"name":PREFECT_ECS_CONTAINER_NAME}task_definition["containerDefinitions"].append(container)ifself.image:container["image"]=self.image# Remove any keys that have been explicitly "unset"unset_keys={keyforkey,valueinself.env.items()ifvalueisNone}foritemintuple(container.get("environment",[])):ifitem["name"]inunset_keys:container["environment"].remove(item)ifself.configure_cloudwatch_logs:container["logConfiguration"]={"logDriver":"awslogs","options":{"awslogs-create-group":"true","awslogs-group":"prefect","awslogs-region":region,"awslogs-stream-prefix":self.nameor"prefect",**self.cloudwatch_logs_options,},}family=self.familyortask_definition.get("family")orECS_DEFAULT_FAMILYtask_definition["family"]=slugify(family,max_length=255,regex_pattern=r"[^a-zA-Z0-9-_]+",)# CPU and memory are required in 
some cases, retrieve the value to usecpu=self.cpuortask_definition.get("cpu")orECS_DEFAULT_CPUmemory=self.memoryortask_definition.get("memory")orECS_DEFAULT_MEMORYifself.launch_type=="FARGATE"orself.launch_type=="FARGATE_SPOT":# Task level memory and cpu are required when using fargatetask_definition["cpu"]=str(cpu)task_definition["memory"]=str(memory)# The FARGATE compatibility is required if it will be used as as launch typerequires_compatibilities=task_definition.setdefault("requiresCompatibilities",[])if"FARGATE"notinrequires_compatibilities:task_definition["requiresCompatibilities"].append("FARGATE")# Only the 'awsvpc' network mode is supported when using FARGATE# However, we will not enforce that here if the user has set itnetwork_mode=task_definition.setdefault("networkMode","awsvpc")ifnetwork_mode!="awsvpc":warnings.warn(f"Found network mode {network_mode!r} which is not compatible with "f"launch type {self.launch_type!r}. Use either the 'EC2' launch ""type or the 'awsvpc' network mode.")elifself.launch_type=="EC2":# Container level memory and cpu are required when using ec2container.setdefault("cpu",int(cpu))container.setdefault("memory",int(memory))ifself.execution_role_arnandnotself.task_definition_arn:task_definition["executionRoleArn"]=self.execution_role_arnifself.configure_cloudwatch_logsandnottask_definition.get("executionRoleArn"):raiseValueError("An execution role arn must be set on the task definition to use ""`configure_cloudwatch_logs` or `stream_logs` but no execution role ""was found on the task definition.")returntask_definitiondef_prepare_task_run_overrides(self)->dict:""" Prepare the 'overrides' payload for a task run request. 
"""overrides={"containerOverrides":[{"name":PREFECT_ECS_CONTAINER_NAME,"environment":[{"name":key,"value":value}forkey,valuein{**self._base_environment(),**self.env,}.items()ifvalueisnotNone],}],}prefect_container_overrides=overrides["containerOverrides"][0]ifself.command:prefect_container_overrides["command"]=self.commandifself.execution_role_arn:overrides["executionRoleArn"]=self.execution_role_arnifself.task_role_arn:overrides["taskRoleArn"]=self.task_role_arnifself.memory:overrides["memory"]=str(self.memory)prefect_container_overrides.setdefault("memory",self.memory)ifself.cpu:overrides["cpu"]=str(self.cpu)prefect_container_overrides.setdefault("cpu",self.cpu)returnoverridesdef_load_vpc_network_config(self,vpc_id:Optional[str],boto_session:boto3.Session)->dict:""" Load settings from a specific VPC or the default VPC and generate a task run request's network configuration. """ec2_client=boto_session.client("ec2")vpc_message="the default VPC"ifnotvpc_idelsef"VPC with ID {vpc_id}"ifnotvpc_id:# Retrieve the default VPCdescribe={"Filters":[{"Name":"isDefault","Values":["true"]}]}else:describe={"VpcIds":[vpc_id]}vpcs=ec2_client.describe_vpcs(**describe)["Vpcs"]ifnotvpcs:help_message=("Pass an explicit `vpc_id` or configure a default VPC."ifnotvpc_idelse"Check that the VPC exists in the current region.")raiseValueError(f"Failed to find {vpc_message}. ""Network configuration cannot be inferred. "+help_message)vpc_id=vpcs[0]["VpcId"]subnets=ec2_client.describe_subnets(Filters=[{"Name":"vpc-id","Values":[vpc_id]}])["Subnets"]ifnotsubnets:raiseValueError(f"Failed to find subnets for {vpc_message}. ""Network configuration cannot be inferred.")return{"awsvpcConfiguration":{"subnets":[s["SubnetId"]forsinsubnets],"assignPublicIp":"ENABLED","securityGroups":[],}}def_prepare_task_run(self,network_config:Optional[dict],task_definition_arn:str,)->dict:""" Prepare a task run request payload. 
"""task_run={"overrides":self._prepare_task_run_overrides(),"tags":[{"key":slugify(key,regex_pattern=_TAG_REGEX,allow_unicode=True,lowercase=False,),"value":slugify(value,regex_pattern=_TAG_REGEX,allow_unicode=True,lowercase=False,),}forkey,valueinself.labels.items()],"taskDefinition":task_definition_arn,}ifself.cluster:task_run["cluster"]=self.clusterifself.launch_type:ifself.launch_type=="FARGATE_SPOT":task_run["capacityProviderStrategy"]=[{"capacityProvider":"FARGATE_SPOT","weight":1}]else:task_run["launchType"]=self.launch_typeifnetwork_config:task_run["networkConfiguration"]=network_configtask_run=self.task_customizations.apply(task_run)returntask_rundef_run_task(self,ecs_client:_ECSClient,task_run:dict):""" Run the task using the ECS client. This is isolated as a separate method for testing purposes. """returnecs_client.run_task(**task_run)["tasks"][0]
class Config:
    """Configuration of pydantic."""

    # Support serialization of the 'JsonPatch' type
    arbitrary_types_allowed = True
    json_encoders = {JsonPatch: lambda p: p.patch}
Enforces that configure_cloudwatch_logs is enabled whenever cloudwatch_logs_options is provided.
Source code in prefect_aws/ecs.py
@root_validator
def cloudwatch_logs_options_requires_configure_cloudwatch_logs(
    cls, values: dict
) -> dict:
    """
    Enforces that `configure_cloudwatch_logs` is enabled whenever
    `cloudwatch_logs_options` is provided.
    """
    if values.get("cloudwatch_logs_options") and not values.get(
        "configure_cloudwatch_logs"
    ):
        raise ValueError(
            "`configure_cloudwatch_logs` must be enabled to use "
            "`cloudwatch_logs_options`."
        )
    return values
@root_validator
def configure_cloudwatch_logs_requires_execution_role_arn(
    cls, values: dict
) -> dict:
    """
    Enforces that an execution role arn is provided (or could be provided by a
    runtime task definition) when configuring logging.
    """
    if (
        values.get("configure_cloudwatch_logs")
        and not values.get("execution_role_arn")
        # Do not raise if they've linked to another task definition or provided
        # it without using our shortcuts
        and not values.get("task_definition_arn")
        and not (values.get("task_definition") or {}).get("executionRoleArn")
    ):
        raise ValueError(
            "An `execution_role_arn` must be provided to use "
            "`configure_cloudwatch_logs` or `stream_output`."
        )
    return values
def dict(self, *args, **kwargs) -> Dict:
    """
    Convert to a dictionary.
    """
    # Support serialization of the 'JsonPatch' type
    d = super().dict(*args, **kwargs)
    d["task_customizations"] = self.task_customizations.patch
    return d
async def generate_work_pool_base_job_template(self) -> dict:
    """
    Generate a base job template for an ECS work pool with the same
    configuration as this block.

    Returns:
        - dict: a base job template for an ECS work pool
    """
    base_job_template = copy.deepcopy(ECSWorker.get_default_base_job_template())
    for key, value in self.dict(exclude_unset=True, exclude_defaults=True).items():
        if key == "command":
            base_job_template["variables"]["properties"]["command"][
                "default"
            ] = shlex.join(value)
        elif key in [
            "type",
            "block_type_slug",
            "_block_document_id",
            "_block_document_name",
            "_is_anonymous",
            "task_customizations",
        ]:
            continue
        elif key == "aws_credentials":
            if not self.aws_credentials._block_document_id:
                raise BlockNotSavedError(
                    "It looks like you are trying to use a block that"
                    " has not been saved. Please call `.save` on your block"
                    " before publishing it as a work pool."
                )
            base_job_template["variables"]["properties"]["aws_credentials"][
                "default"
            ] = {
                "$ref": {
                    "block_document_id": str(
                        self.aws_credentials._block_document_id
                    )
                }
            }
        elif key == "task_definition":
            base_job_template["job_configuration"]["task_definition"] = value
        elif key in base_job_template["variables"]["properties"]:
            base_job_template["variables"]["properties"][key]["default"] = value
        else:
            self.logger.warning(
                f"Variable {key!r} is not supported by ECS work pools."
                " Skipping."
            )

    if self.task_customizations:
        network_config_patches = JsonPatch(
            [
                patch
                for patch in self.task_customizations
                if "networkConfiguration" in patch["path"]
            ]
        )
        minimal_network_config = assemble_document_for_patches(
            network_config_patches
        )
        if minimal_network_config:
            minimal_network_config_with_patches = network_config_patches.apply(
                minimal_network_config
            )
            base_job_template["variables"]["properties"]["network_configuration"][
                "default"
            ] = minimal_network_config_with_patches["networkConfiguration"][
                "awsvpcConfiguration"
            ]
        try:
            base_job_template["job_configuration"]["task_run_request"] = (
                self.task_customizations.apply(
                    base_job_template["job_configuration"]["task_run_request"]
                )
            )
        except JsonPointerException:
            self.logger.warning(
                "Unable to apply task customizations to the base job template. "
                "You may need to update the template manually."
            )

    return base_job_template
@root_validator(pre=True)
def image_is_required(cls, values: dict) -> dict:
    """
    Enforces that an image is available if image is `None`.
    """
    has_image = bool(values.get("image"))
    has_task_definition_arn = bool(values.get("task_definition_arn"))

    # The image can only be null when the task_definition_arn is set
    if has_image or has_task_definition_arn:
        return values

    prefect_container = (
        get_prefect_container(
            (values.get("task_definition") or {}).get("containerDefinitions", [])
        )
        or {}
    )
    image_in_task_definition = prefect_container.get("image")

    # If a task_definition is given with a prefect container image, use that value
    if image_in_task_definition:
        values["image"] = image_in_task_definition
    # Otherwise, it should default to the Prefect base image
    else:
        values["image"] = get_prefect_image_name()
    return values
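The order in which `image_is_required` settles on an image can be summarized as a plain function. This is a sketch under stated assumptions: `resolve_image` is a hypothetical helper, and both `PREFECT_CONTAINER_NAME` and `FALLBACK_IMAGE` are placeholders standing in for the module's container name constant and the result of `get_prefect_image_name()`.

```python
from typing import Optional

# Assumptions: placeholder values, not taken from the library.
PREFECT_CONTAINER_NAME = "prefect"
FALLBACK_IMAGE = "example/prefect-base:latest"


def resolve_image(values: dict) -> Optional[str]:
    """Mirror the precedence: explicit image, task definition ARN, inline definition, fallback."""
    # An explicit image wins; with a task definition ARN the image may stay unset.
    if values.get("image") or values.get("task_definition_arn"):
        return values.get("image")

    containers = (values.get("task_definition") or {}).get("containerDefinitions", [])
    prefect_container = next(
        (c for c in containers if c.get("name") == PREFECT_CONTAINER_NAME), {}
    )
    # Use the image from the inline task definition, else the fallback image.
    return prefect_container.get("image") or FALLBACK_IMAGE


explicit = resolve_image({"image": "alpine:latest"})
from_arn = resolve_image({"task_definition_arn": "arn:aws:ecs:..."})
from_definition = resolve_image(
    {
        "task_definition": {
            "containerDefinitions": [{"name": "prefect", "image": "my-image:1.0"}]
        }
    }
)
fallback = resolve_image({})
```

The only case in which no image is set is when a `task_definition_arn` is given, since the registered definition is then expected to carry the image.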
@sync_compatible
async def kill(self, identifier: str, grace_seconds: int = 30) -> None:
    """
    Kill a task running on ECS.

    Args:
        identifier: A cluster and task arn combination. This should match a value
            yielded by `ECSTask.run`.
    """
    if grace_seconds != 30:
        self.logger.warning(
            f"Kill grace period of {grace_seconds}s requested, but AWS does not "
            "support dynamic grace period configuration so 30s will be used. "
            "See https://docs.aws.amazon.com/AmazonECS/latest/developerguide/ecs-agent-config.html for configuration of grace periods."  # noqa
        )
    cluster, task = parse_task_identifier(identifier)
    await run_sync_in_worker_thread(self._stop_task, cluster, task)
def prepare_for_flow_run(
    self: Self,
    flow_run: "FlowRun",
    deployment: Optional["Deployment"] = None,
    flow: Optional["Flow"] = None,
) -> Self:
    """
    Return a copy of the block that is prepared to execute a flow run.
    """
    new_family = None

    # Update the family if not specified elsewhere
    if (
        not self.family
        and not self.task_definition_arn
        and not (self.task_definition and self.task_definition.get("family"))
    ):
        if flow and deployment:
            new_family = f"{ECS_DEFAULT_FAMILY}__{flow.name}__{deployment.name}"
        elif flow and not deployment:
            new_family = f"{ECS_DEFAULT_FAMILY}__{flow.name}"
        elif deployment and not flow:
            # This is a weird case and should not be seen in the wild
            new_family = f"{ECS_DEFAULT_FAMILY}__unknown-flow__{deployment.name}"

    new = super().prepare_for_flow_run(flow_run, deployment=deployment, flow=flow)

    if new_family:
        return new.copy(update={"family": new_family})
    else:
        # Avoid an extra copy if not needed
        return new
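The family naming used by `prepare_for_flow_run` can be illustrated with plain strings. In this sketch `derive_family` is a hypothetical helper and the value of `ECS_DEFAULT_FAMILY` is an assumed placeholder; the real method only derives a name when no family is set on the block, on the task definition, or through a task definition ARN.

```python
from typing import Optional

ECS_DEFAULT_FAMILY = "prefect"  # Assumption: placeholder for the module constant.


def derive_family(
    flow_name: Optional[str], deployment_name: Optional[str]
) -> Optional[str]:
    """Build a task definition family name from the flow and deployment names."""
    if flow_name and deployment_name:
        return f"{ECS_DEFAULT_FAMILY}__{flow_name}__{deployment_name}"
    if flow_name:
        return f"{ECS_DEFAULT_FAMILY}__{flow_name}"
    if deployment_name:
        # Deployment without a flow: fall back to a marker for the flow name.
        return f"{ECS_DEFAULT_FAMILY}__unknown-flow__{deployment_name}"
    # Neither is known, so the block keeps whatever family it already has.
    return None


both = derive_family("etl", "nightly")
flow_only = derive_family("etl", None)
deployment_only = derive_family(None, "nightly")
neither = derive_family(None, None)
```

The derived name is later passed through `slugify` in `_prepare_task_definition`, so characters outside letters, digits, hyphens, and underscores are not expected to survive in the registered family.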
def preview(self) -> str:
    """
    Generate a preview of the task definition and task run that will be sent to AWS.
    """
    preview = ""

    task_definition_arn = self.task_definition_arn or "<registered at runtime>"

    if self.task_definition or not self.task_definition_arn:
        task_definition = self._prepare_task_definition(
            self.task_definition or {},
            region=self.aws_credentials.region_name
            or "<loaded from client at runtime>",
        )
        preview += "---\n# Task definition\n"
        preview += yaml.dump(task_definition)
        preview += "\n"
    else:
        task_definition = None

    if task_definition and task_definition.get("networkMode") == "awsvpc":
        vpc = "the default VPC" if not self.vpc_id else self.vpc_id
        network_config = {
            "awsvpcConfiguration": {
                "subnets": f"<loaded from {vpc} at runtime>",
                "assignPublicIp": "ENABLED",
            }
        }
    else:
        network_config = None

    task_run = self._prepare_task_run(network_config, task_definition_arn)
    preview += "---\n# Task run request\n"
    preview += yaml.dump(task_run)

    return preview
@sync_compatible
async def run(self, task_status: Optional[TaskStatus] = None) -> ECSTaskResult:
    """
    Run the configured task on ECS.
    """
    boto_session, ecs_client = await run_sync_in_worker_thread(
        self._get_session_and_client
    )

    (
        task_arn,
        cluster_arn,
        task_definition,
        is_new_task_definition,
    ) = await run_sync_in_worker_thread(
        self._create_task_and_wait_for_start, boto_session, ecs_client
    )

    # Display a nice message indicating the command and image
    command = self.command or get_prefect_container(
        task_definition["containerDefinitions"]
    ).get("command", [])
    self.logger.info(
        f"{self._log_prefix}: Running command {' '.join(command)!r} "
        f"in container {PREFECT_ECS_CONTAINER_NAME!r} ({self.image})..."
    )

    # The task identifier is "{cluster}::{task}" where we use the configured cluster
    # if set to preserve matching by name rather than arn
    # Note "::" is used despite the Prefect standard being ":" because ARNs contain
    # single colons.
    identifier = (self.cluster if self.cluster else cluster_arn) + "::" + task_arn

    if task_status:
        task_status.started(identifier)

    status_code = await run_sync_in_worker_thread(
        self._watch_task_and_get_exit_code,
        task_arn,
        cluster_arn,
        task_definition,
        is_new_task_definition and self.auto_deregister_task_definition,
        boto_session,
        ecs_client,
    )

    return ECSTaskResult(
        identifier=identifier,
        # If the container does not start the exit code can be null but we must
        # still report a status code. We use a -1 to indicate a special code.
        status_code=status_code if status_code is not None else -1,
    )
Streaming output generally requires CloudWatch logs to be configured.
To avoid entangled arguments in the simple case, configure_cloudwatch_logs
defaults to matching the value of stream_output.
Source code in prefect_aws/ecs.py
@root_validator(pre=True)
def set_default_configure_cloudwatch_logs(cls, values: dict) -> dict:
    """
    Streaming output generally requires CloudWatch logs to be configured.

    To avoid entangled arguments in the simple case, `configure_cloudwatch_logs`
    defaults to matching the value of `stream_output`.
    """
    configure_cloudwatch_logs = values.get("configure_cloudwatch_logs")
    if configure_cloudwatch_logs is None:
        values["configure_cloudwatch_logs"] = values.get("stream_output")
    return values
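The defaulting rule is easiest to see on bare dictionaries. The snippet below is a minimal sketch that copies the logic into a hypothetical free function, `default_configure_cloudwatch_logs`, so it can run without pydantic or the block class.

```python
def default_configure_cloudwatch_logs(values: dict) -> dict:
    """Fill in configure_cloudwatch_logs from stream_output when it is unset."""
    values = dict(values)  # Work on a copy so the caller's input is untouched.
    if values.get("configure_cloudwatch_logs") is None:
        values["configure_cloudwatch_logs"] = values.get("stream_output")
    return values


# Streaming requested, logging left unset: logging follows stream_output.
follows = default_configure_cloudwatch_logs({"stream_output": True})

# An explicit value is never overridden, even if it disagrees with stream_output.
explicit = default_configure_cloudwatch_logs(
    {"stream_output": True, "configure_cloudwatch_logs": False}
)

# Nothing set at all: the default stays None, which is falsy.
unset = default_configure_cloudwatch_logs({})
```

Because the check is `is None` rather than a truthiness test, an explicit `False` is respected; in that case `_wait_for_task_finish` can only stream if the task definition already carries an `awslogs` configuration.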
Extract a container from a list of containers or container definitions.
If not found, None is returned.
Source code in prefect_aws/ecs.py
def get_container(containers: List[dict], name: str) -> Optional[dict]:
    """
    Extract a container from a list of containers or container definitions.
    If not found, `None` is returned.
    """
    for container in containers:
        if container.get("name") == name:
            return container
    return None
Extract the Prefect container from a list of containers or container definitions.
If not found, None is returned.
Source code in prefect_aws/ecs.py
def get_prefect_container(containers: List[dict]) -> Optional[dict]:
    """
    Extract the Prefect container from a list of containers or container definitions.
    If not found, `None` is returned.
    """
    return get_container(containers, PREFECT_ECS_CONTAINER_NAME)
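A short usage sketch of the lookup follows. `get_container` is reproduced from the listing above so the snippet runs on its own; the container list is made-up sample data shaped like the `containers` entry of a described task, and the value of `PREFECT_ECS_CONTAINER_NAME` is assumed here.

```python
from typing import List, Optional

PREFECT_ECS_CONTAINER_NAME = "prefect"  # Assumption: value of the module constant.


def get_container(containers: List[dict], name: str) -> Optional[dict]:
    """Return the first container whose name matches, or None."""
    for container in containers:
        if container.get("name") == name:
            return container
    return None


# Sample data only; real entries carry many more keys.
containers = [
    {"name": "sidecar", "exitCode": 0},
    {"name": "prefect", "exitCode": 1},
]

prefect_container = get_container(containers, PREFECT_ECS_CONTAINER_NAME)
exit_code = prefect_container.get("exitCode") if prefect_container else None
missing = get_container(containers, "does-not-exist")

# The match is returned by reference, so edits show up in the original list.
prefect_container["image"] = "alpine:latest"
```

Returning the dictionary itself rather than a copy is what lets callers such as `_prepare_task_definition` modify the Prefect container in place.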
Splits identifier into its cluster and task components, e.g.
input "cluster_name::task_arn" outputs ("cluster_name", "task_arn").
Source code in prefect_aws/ecs.py
def parse_task_identifier(identifier: str) -> Tuple[str, str]:
    """
    Splits identifier into its cluster and task components, e.g.
    input "cluster_name::task_arn" outputs ("cluster_name", "task_arn").
    """
    cluster, task = identifier.split("::", maxsplit=1)
    return cluster, task
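The round trip between building and parsing an identifier can be sketched as follows. `parse_task_identifier` is copied from the listing above; `build_task_identifier` is a hypothetical helper that mirrors the string concatenation in `ECSTask.run`, and the ARN is a made-up example.

```python
from typing import Tuple


def build_task_identifier(cluster: str, task_arn: str) -> str:
    """Hypothetical helper: join cluster and task ARN with the "::" separator."""
    return cluster + "::" + task_arn


def parse_task_identifier(identifier: str) -> Tuple[str, str]:
    """Split on the first "::" only, leaving the colons inside the ARN alone."""
    cluster, task = identifier.split("::", maxsplit=1)
    return cluster, task


# Made-up ARN for illustration; note the single colons it contains.
task_arn = "arn:aws:ecs:us-east-1:123456789012:task/my-cluster/0123456789abcdef"
identifier = build_task_identifier("my-cluster", task_arn)
parsed = parse_task_identifier(identifier)
```

An identifier without the `::` separator makes the tuple unpacking fail with a `ValueError`, so values passed to `kill` should come from `run` unchanged.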