Thursday, December 31, 2020

AWS CDK (Python) Create Step Functions for Orchestrating Lambda and Glue Tasks

 AWS Step Functions is a serverless function orchestrator that makes it easy to sequence AWS Lambda functions and multiple AWS services into business-critical applications. Through its visual interface, you can create and run a series of checkpointed and event-driven workflows that maintain the application state. The output of one step acts as an input to the next. Each step in your application executes in order, as defined by your business logic.

Orchestrating a series of individual serverless applications, managing retries, and debugging failures can be challenging, and the difficulty grows as your distributed applications become more complex. With its built-in operational controls, Step Functions manages sequencing, error handling, retry logic, and state, removing a significant operational burden from your team.
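To make the retry and error-handling point concrete, here is a minimal sketch of how a single task state might declare both in the Amazon States Language, built as a plain Python dictionary. The activity ARN, the helper name task_with_retry, and the retry numbers are illustrative assumptions, not values taken from this project.

```python
import json

# Hypothetical activity ARN, used only for illustration.
ACTIVITY_ARN = "arn:aws:states:us-east-1:123456789012:activity:SubmitJob"


def task_with_retry(resource: str, next_state: str, fallback_state: str) -> dict:
    """Build an Amazon States Language Task state with Retry and Catch rules."""
    return {
        "Type": "Task",
        "Resource": resource,
        # Retry a failed task up to 3 times, doubling the wait each time.
        "Retry": [
            {
                "ErrorEquals": ["States.TaskFailed"],
                "IntervalSeconds": 2,
                "MaxAttempts": 3,
                "BackoffRate": 2.0,
            }
        ],
        # If the retries are exhausted, or any other error occurs, go to the fallback.
        "Catch": [{"ErrorEquals": ["States.ALL"], "Next": fallback_state}],
        "Next": next_state,
    }


state = task_with_retry(ACTIVITY_ARN, "Wait X Seconds", "Job Failed")
print(json.dumps(state, indent=2))
```

The service applies these rules itself, so the worker code behind the task does not need its own retry loop.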



Setup

The cdk.json file tells the CDK Toolkit how to execute your app.

This project is set up like a standard Python project. The initialization process also creates a virtualenv within this project, stored under the .env directory. To create the virtualenv it assumes that there is a python3 (or python for Windows) executable in your path with access to the venv package. If for any reason the automatic creation of the virtualenv fails, you can create the virtualenv manually.

To manually create a virtualenv on macOS and Linux:

$ python3 -m venv .env

After the init process completes and the virtualenv is created, you can use the following step to activate your virtualenv.

$ source .env/bin/activate

If you are on a Windows platform, you would activate the virtualenv like this:

% .env\Scripts\activate.bat

Once the virtualenv is activated, you can install the required dependencies.

$ pip install -r requirements.txt

At this point you can now synthesize the CloudFormation template for this code.

$ cdk synth

To add additional dependencies, for example other CDK libraries, just add them to your setup.py file and rerun the pip install -r requirements.txt command.

This post shows how to create an AWS Step Functions state machine with the Python language bindings for the CDK.


app.py

# These imports target AWS CDK v1, where the core module is aws_cdk.core.
from aws_cdk import (
    aws_stepfunctions as sfn,
    aws_stepfunctions_tasks as sfn_tasks,
    core,
)


class JobPollerStack(core.Stack):
    def __init__(self, app: core.App, id: str, **kwargs) -> None:
        super().__init__(app, id, **kwargs)

        submit_job_activity = sfn.Activity(
            self, "SubmitJob"
        )
        check_job_activity = sfn.Activity(
            self, "CheckJob"
        )
        do_mapping_activity1 = sfn.Activity(
            self, "MapJob1"
        )
        do_mapping_activity2 = sfn.Activity(
            self, "MapJob2"
        )

        submit_job = sfn.Task(
            self, "Submit Job",
            task=sfn_tasks.InvokeActivity(submit_job_activity),
            result_path="$.guid",
        )

        task1 = sfn.Task(
            self, "Task 1 in Mapping",
            task=sfn_tasks.InvokeActivity(do_mapping_activity1),
            result_path="$.guid",
        )

        task2 = sfn.Task(
            self, "Task 2 in Mapping",
            task=sfn_tasks.InvokeActivity(do_mapping_activity2),
            result_path="$.guid",
        )

        wait_x = sfn.Wait(
            self, "Wait X Seconds",
            time=sfn.WaitTime.seconds_path('$.wait_time'),
        )
        get_status = sfn.Task(
            self, "Get Job Status",
            task=sfn_tasks.InvokeActivity(check_job_activity),
            input_path="$.guid",
            result_path="$.status",
        )
        is_complete = sfn.Choice(
            self, "Job Complete?"
        )
        job_failed = sfn.Fail(
            self, "Job Failed",
            cause="AWS Batch Job Failed",
            error="DescribeJob returned FAILED"
        )
        final_status = sfn.Task(
            self, "Get Final Job Status",
            task=sfn_tasks.InvokeActivity(check_job_activity),
            input_path="$.guid",
        )

        definition_map = task1.next(task2)

        process_map = sfn.Map(
            self, "Process_map",
            max_concurrency=10
        ).iterator(definition_map)

        definition = submit_job \
            .next(process_map) \
            .next(wait_x) \
            .next(get_status) \
            .next(is_complete
                  .when(sfn.Condition.string_equals(
                    "$.status", "FAILED"), job_failed)
                  .when(sfn.Condition.string_equals(
                    "$.status", "SUCCEEDED"), final_status)
                  .otherwise(wait_x))

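        sfn.StateMachine(
            self, "StateMachine",
            definition=definition,
            timeout=core.Duration.seconds(30),
        )


# Instantiate the app and the stack so that "cdk synth" has something to synthesize.
# The stack ID "JobPollerStack" is an assumed name; choose your own.
app = core.App()
JobPollerStack(app, "JobPollerStack")
app.synth()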
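The control flow that the definition chain encodes can be traced in plain Python. The sketch below is only a local simulation under stated assumptions: run_job_poller, check_status, and max_polls are hypothetical names (max_polls loosely stands in for the state machine timeout), and no AWS calls are made. The state names match the ones used in app.py.

```python
from typing import Callable, List


def run_job_poller(check_status: Callable[[str], str], guid: str,
                   max_polls: int = 10) -> List[str]:
    """Return the list of states visited by the submit / wait / poll / choice loop."""
    visited = ["Submit Job", "Process_map"]
    for _ in range(max_polls):
        visited.append("Wait X Seconds")
        visited.append("Get Job Status")
        status = check_status(guid)
        visited.append("Job Complete?")
        if status == "FAILED":
            visited.append("Job Failed")
            return visited
        if status == "SUCCEEDED":
            visited.append("Get Final Job Status")
            return visited
        # Any other status corresponds to .otherwise(wait_x): loop back and wait again.
    raise TimeoutError("job did not finish within max_polls polls")


# Simulated job that reports RUNNING twice and then SUCCEEDED.
statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
trace = run_job_poller(lambda guid: next(statuses), guid="job-123")
print(" -> ".join(trace))
```

The loop back to the wait state is what makes this a poller: the Choice state only leaves the cycle when the status is FAILED or SUCCEEDED.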


cdk.json
{
    "app": "python3 app.py"
}
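As a rough illustration of what the "app" setting means, the sketch below reads a cdk.json document and splits the command into its program and arguments. The helper name app_command is hypothetical, and this is not how the CDK Toolkit is implemented internally; it only shows which command the setting describes.

```python
import json
import shlex
from typing import List


def app_command(cdk_json_text: str) -> List[str]:
    """Return the "app" entry of a cdk.json document, split into argv form."""
    config = json.loads(cdk_json_text)
    return shlex.split(config["app"])


example = '{"app": "python3 app.py"}'
print(app_command(example))
```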

Useful Commands

  • cdk ls: list all stacks in the app
  • cdk synth: emit the synthesized CloudFormation template
  • cdk deploy: deploy this stack to your default AWS account/region
  • cdk diff: compare the deployed stack with the current state
  • cdk docs: open the CDK documentation

