As the volume and complexity of your data processing pipelines increase, you can simplify the overall process by decomposing it into a series of smaller tasks and coordinating the execution of these tasks as part of a workflow. To do so, many developers and data engineers use Apache Airflow, a platform created by the community to programmatically author, schedule, and monitor workflows. With Airflow you can manage workflows as scripts, monitor them via the user interface (UI), and extend their functionality through a set of powerful plugins. However, manually installing, maintaining, and scaling Airflow, while at the same time handling security, authentication, and authorization for its users, takes much of the time you’d rather spend focusing on actual business problems.

For these reasons, I’m happy to announce the availability of Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to execute your extract-transform-load (ETL) jobs and data pipelines.

Airflow workflows retrieve input from sources like Amazon Simple Storage Service (S3) using Amazon Athena queries, perform transformations on Amazon EMR clusters, and can use the resulting data to train machine learning models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.
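For example, a minimal DAG sketch with two dependent tasks could look like this (the DAG and task names here are just an illustration, not part of the demo later in this post):

# A minimal, illustrative DAG: 'load' runs only after 'extract' succeeds.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='hello_dag', start_date=days_ago(1), schedule_interval='@daily') as dag:
    extract = PythonOperator(task_id='extract', python_callable=lambda: print('extracting'))
    load = PythonOperator(task_id='load', python_callable=lambda: print('loading'))
    extract >> load  # run 'load' after 'extract'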

A key benefit of Airflow is its open extensibility through plugins, which allows you to create tasks that interact with AWS or on-premises resources required for your workflows, including AWS Batch, Amazon CloudWatch, Amazon DynamoDB, AWS DataSync, Amazon ECS and AWS Fargate, Amazon Elastic Kubernetes Service (EKS), Amazon Kinesis Firehose, AWS Glue, AWS Lambda, Amazon Redshift, Amazon Simple Queue Service (SQS), and Amazon Simple Notification Service (SNS).
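As a sketch of what a plugin can contain, here is a small custom operator that publishes a message to an SNS topic when its task runs (the operator, plugin, and topic names are made up for illustration; Airflow also ships ready-made AWS operators for many of these services):

# A minimal Airflow 1.10-style custom operator packaged as a plugin (illustrative names).
import boto3
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults


class SNSPublishOperator(BaseOperator):
    """Publishes a message to an Amazon SNS topic when the task runs."""

    @apply_defaults
    def __init__(self, topic_arn, message, **kwargs):
        super().__init__(**kwargs)
        self.topic_arn = topic_arn
        self.message = message

    def execute(self, context):
        sns = boto3.client('sns')
        return sns.publish(TopicArn=self.topic_arn, Message=self.message)


class MyCustomPlugin(AirflowPlugin):
    name = 'my_custom_plugin'
    operators = [SNSPublishOperator]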

To improve observability, Airflow metrics can be published as CloudWatch Metrics, and logs can be sent to CloudWatch Logs. Amazon MWAA provides automatic minor version upgrades and patches by default, with an option to designate a maintenance window in which these upgrades are performed.
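For example, assuming the metrics are published under the AmazonMWAA namespace, a quick boto3 sketch can list what an environment reports:

# List the CloudWatch metrics published by Amazon MWAA environments
# (the 'AmazonMWAA' namespace is my assumption here).
import boto3

cloudwatch = boto3.client('cloudwatch')
for page in cloudwatch.get_paginator('list_metrics').paginate(Namespace='AmazonMWAA'):
    for metric in page['Metrics']:
        print(metric['MetricName'], metric['Dimensions'])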

You can use Amazon MWAA with these three steps:

  1. Create an environment – Each environment contains your Airflow cluster, including your scheduler, workers, and web server.
  2. Upload your DAGs and plugins to S3 – Amazon MWAA loads the code into Airflow automatically (a boto3 sketch of this upload follows this list).
  3. Run your DAGs in Airflow – Run your DAGs from the Airflow UI or command line interface (CLI) and monitor your environment with CloudWatch.
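For step 2, uploading a DAG is a plain S3 upload. A minimal boto3 sketch, with placeholder bucket and file names, looks like this:

# Copy a local DAG file into the environment's dags folder on S3 (placeholder names).
import boto3

s3 = boto3.client('s3')
s3.upload_file('movie_list_dag.py', 'airflow-my-bucket', 'dags/movie_list_dag.py')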

Let’s see how this works in practice!

How to Create an Airflow Environment Using Amazon MWAA
In the Amazon MWAA console, I click on Create environment. I give the environment a name and select the Airflow version to use.

Then, I select the S3 bucket and the folder to load my DAG code. The bucket name must start with airflow-.

Optionally, I can specify a plugins file and a requirements file:

  • The plugins file is a ZIP file containing the plugins used by my DAGs.
  • The requirements file describes the Python dependencies to run my DAGs.

For plugins and requirements, I can select the S3 object version to use. In case the plugins or the requirements I use create a non-recoverable error in my environment, Amazon MWAA will automatically roll back to the previous working version.
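As a sketch of how I could prepare those two files and find the object versions the console asks for (file and bucket names are placeholders, and the bucket needs versioning enabled):

# Package a plugin into plugins.zip, upload it together with requirements.txt,
# and print the S3 version IDs to pick in the console (placeholder names).
import boto3
import zipfile

with zipfile.ZipFile('plugins.zip', 'w') as z:
    z.write('my_custom_plugin.py')

s3 = boto3.client('s3')
for name in ('plugins.zip', 'requirements.txt'):
    with open(name, 'rb') as f:
        response = s3.put_object(Bucket='airflow-my-bucket', Key=name, Body=f)
    print(name, response.get('VersionId'))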


I click Next to configure the advanced settings, starting with networking. Each environment runs in an Amazon Virtual Private Cloud using private subnets in two availability zones. Web server access to the Airflow UI is always protected by a secure login using AWS Identity and Access Management (IAM). However, you can choose to have web server access on a public network so that you can log in over the Internet, or on a private network in your VPC. For simplicity, I select a Public network. I let Amazon MWAA create a new security group with the correct inbound and outbound rules. Optionally, I can add one or more existing security groups to fine-tune the control of inbound and outbound traffic for my environment.

Now, I configure my environment class. Each environment includes a scheduler, a web server, and a worker. Workers automatically scale up and down according to my workload. We give you a suggestion on which class to use based on the number of DAGs, but you can monitor the load on your environment and modify its class at any time.

Encryption is always enabled for data at rest, and while I can select a customized key managed by AWS Key Management Service (KMS), I will instead keep the default key that AWS owns and manages on my behalf.

For monitoring, I publish environment performance to CloudWatch Metrics. This is enabled by default, but I can disable CloudWatch Metrics after launch. For the logs, I can specify the log level and which Airflow components should send their logs to CloudWatch Logs. I leave the default to send only the task logs and use log level INFO.

I can modify the default settings for Airflow configuration options, such as default_task_retries or worker_concurrency. For now, I’m not changing these values.

Finally, and most importantly, I configure the permissions that will be used by my environment to access my DAGs, write logs, and run DAGs accessing other AWS resources. I select Create a new role and click on Create environment. After a few minutes, the new Airflow environment is ready to be used.

Using the Airflow UI
In the Amazon MWAA console, I look for the new environment I just created and click on Open Airflow UI. A new browser window is opened, and I’m authenticated with a secure login via AWS IAM.
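The console handles the login for me, but the same access can be obtained programmatically with the MWAA API. Here’s a sketch, where the environment name is a placeholder and the single sign-on URL format is my reading of the documentation:

# Request a web login token and build the single sign-on URL for the Airflow UI.
import boto3

mwaa = boto3.client('mwaa')
token = mwaa.create_web_login_token(Name='my-airflow-environment')
print('https://{0}/aws_mwaa/aws-console-sso?login=true#{1}'.format(
    token['WebServerHostname'], token['WebToken']))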

There, I look for a DAG that I put on S3 in the movie_list_dag.py file. The DAG is downloading the MovieLens dataset, processing the files on S3 using Amazon Athena, and loading the result to a Redshift cluster, creating the table if missing.

Here’s the full source code of the DAG:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators import HttpSensor, S3KeySensor
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
from io import StringIO
from io import BytesIO
from time import sleep
import csv
import requests
import json
import boto3
import zipfile
import io
s3_bucket_name = 'my-bucket'
s3_key='files/'
redshift_cluster='my-redshift-cluster'
redshift_db='dev'
redshift_dbuser='awsuser'
redshift_table_name='movie_demo'
test_http='https://grouplens.org/datasets/movielens/latest/'
download_http='http://files.grouplens.org/datasets/movielens/ml-latest-small.zip'
athena_db='demo_athena_db'
athena_results='athena-results/'
create_athena_movie_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Movies (
  `movieId` int,
  `title` string,
  `genres` string 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/movies.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_ratings_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Ratings (
  `userId` int,
  `movieId` int,
  `rating` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/ratings.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
create_athena_tags_table_query="""
CREATE EXTERNAL TABLE IF NOT EXISTS Demo_Athena_DB.ML_Latest_Small_Tags (
  `userId` int,
  `movieId` int,
  `tag` int,
  `timestamp` bigint 
)
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe'
WITH SERDEPROPERTIES (
  'serialization.format' = ',',
  'field.delim' = ','
) LOCATION 's3://my-bucket/files/ml-latest-small/tags.csv/ml-latest-small/'
TBLPROPERTIES (
  'has_encrypted_data'='false',
  'skip.header.line.count'='1'
); 
"""
join_tables_athena_query="""
SELECT REPLACE ( m.title , '"' , '' ) as title, r.rating
FROM demo_athena_db.ML_Latest_Small_Movies m
INNER JOIN (SELECT rating, movieId FROM demo_athena_db.ML_Latest_Small_Ratings WHERE rating > 4) r on m.movieId = r.movieId
"""
def download_zip():
    s3c = boto3.client('s3')
    indata = requests.get(download_http)
    n=0
    with zipfile.ZipFile(io.BytesIO(indata.content)) as z:
        zList=z.namelist()
        print(zList)
        for i in zList:
            print(i)
            zfiledata = BytesIO(z.read(i))
            n += 1
            s3c.put_object(Bucket=s3_bucket_name, Key=s3_key+i+'/'+i, Body=zfiledata)
def clean_up_csv_fn(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey=athena_results+"join_athena_tables/"+queryId+".csv"
    print(athenaKey)
    cleanKey=athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    s3c = boto3.client('s3')
    obj = s3c.get_object(Bucket=s3_bucket_name, Key=athenaKey)
    infileStr=obj['Body'].read().decode('utf-8')
    outfileStr=infileStr.replace('"e"', '')
    outfile = StringIO(outfileStr)
    s3c.put_object(Bucket=s3_bucket_name, Key=cleanKey, Body=outfile.getvalue())
def s3_to_redshift(**kwargs):    
    ti = kwargs['task_instance']
    queryId = ti.xcom_pull(key='return_value', task_ids='join_athena_tables' )
    print(queryId)
    athenaKey='s3://'+s3_bucket_name+"/"+athena_results+"join_athena_tables/"+queryId+"_clean.csv"
    print(athenaKey)
    sqlQuery="copy "+redshift_table_name+" from '"+athenaKey+"' iam_role 'arn:aws:iam::163919838948:role/myRedshiftRole' CSV IGNOREHEADER 1;"
    print(sqlQuery)
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql=sqlQuery
    )
    print(resp)
    return "OK"
def create_redshift_table():
    rsd = boto3.client('redshift-data')
    resp = rsd.execute_statement(
        ClusterIdentifier=redshift_cluster,
        Database=redshift_db,
        DbUser=redshift_dbuser,
        Sql="CREATE TABLE IF NOT EXISTS "+redshift_table_name+" (title	character various, score	int);"
    )
    print(resp)
    return "OK"
DEFAULT_ARGS = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False
}
with DAG(
    dag_id='movie-list-dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    start_date=days_ago(2),
    schedule_interval='*/10 * * * *',
    tags=['athena','redshift'],
) as dag:
    check_s3_for_key = S3KeySensor(
        task_id='check_s3_for_key',
        bucket_key=s3_key,
        wildcard_match=True,
        bucket_name=s3_bucket_name,
        s3_conn_id='aws_default',
        timeout=20,
        poke_interval=5,
        dag=dag
    )
    files_to_s3 = PythonOperator(
        task_id="files_to_s3",
        python_callable=download_zip
    )
    create_athena_movie_table = AWSAthenaOperator(task_id="create_athena_movie_table", query=create_athena_movie_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_movie_table')
    create_athena_ratings_table = AWSAthenaOperator(task_id="create_athena_ratings_table", query=create_athena_ratings_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_ratings_table')
    create_athena_tags_table = AWSAthenaOperator(task_id="create_athena_tags_table", query=create_athena_tags_table_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'create_athena_tags_table')
    join_athena_tables = AWSAthenaOperator(task_id="join_athena_tables", query=join_tables_athena_query, database=athena_db, output_location='s3://'+s3_bucket_name+"/"+athena_results+'join_athena_tables')
    create_redshift_table_if_not_exists = PythonOperator(
        task_id="create_redshift_table_if_not_exists",
        python_callable=create_redshift_table
    )
    clean_up_csv = PythonOperator(
        task_id="clean_up_csv",
        python_callable=clean_up_csv_fn,
        provide_context=True     
    )
    transfer_to_redshift = PythonOperator(
        task_id="transfer_to_redshift",
        python_callable=s3_to_redshift,
        provide_context=True     
    )
    check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
    files_to_s3 >> create_athena_ratings_table >> join_athena_tables
    files_to_s3 >> create_athena_tags_table >> join_athena_tables
    files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

In the code, different tasks are created using operators like PythonOperator, for generic Python code, or AWSAthenaOperator, to use the integration with Amazon Athena. To see how these tasks are connected in the workflow, here are the last few lines, which I repeat (without indentation) for simplicity:

check_s3_for_key >> files_to_s3 >> create_athena_movie_table >> join_athena_tables >> clean_up_csv >> transfer_to_redshift
files_to_s3 >> create_athena_ratings_table >> join_athena_tables
files_to_s3 >> create_athena_tags_table >> join_athena_tables
files_to_s3 >> create_redshift_table_if_not_exists >> transfer_to_redshift

The Airflow code is overloading the right shift >> operator in Python to create a dependency, meaning that the task on the left should be executed first, and only then the task on the right (the actual data is shared between tasks via XCom, as the xcom_pull calls in the code show). Looking at the code, this is quite easy to read. Each of the four lines above adds dependencies, and they are all evaluated together to execute the tasks in the right order.
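If you prefer the explicit methods, the bitshift form maps directly to set_downstream and set_upstream. This tiny sketch, with throwaway task names, declares the same dependency both ways:

# a >> b and a.set_downstream(b) declare the same "run b after a" dependency.
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago

with DAG(dag_id='deps_demo', start_date=days_ago(1), schedule_interval=None) as dag:
    a = DummyOperator(task_id='a')
    b = DummyOperator(task_id='b')
    a >> b                 # b runs after a
    # a.set_downstream(b)  # equivalent, written explicitly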

In the Airflow console, I can see a graph view of the DAG to have a clear representation of how tasks are executed:

Available Now
Amazon Managed Workflows for Apache Airflow (MWAA) is available today in US East (Northern Virginia), US West (Oregon), US East (Ohio), Asia Pacific (Singapore), Asia Pacific (Tokyo), Asia Pacific (Sydney), Europe (Ireland), Europe (Frankfurt), and Europe (Stockholm). You can launch a new Amazon MWAA environment from the console, AWS Command Line Interface (CLI), or AWS SDKs. Then, you can develop workflows in Python using Airflow’s ecosystem of integrations.
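For example, here’s a sketch of launching an environment with the AWS SDK for Python; all names, ARNs, and IDs below are placeholders to adapt to your account:

# Create an Amazon MWAA environment programmatically (all values are placeholders).
import boto3

mwaa = boto3.client('mwaa')
mwaa.create_environment(
    Name='my-airflow-environment',
    AirflowVersion='1.10.12',
    SourceBucketArn='arn:aws:s3:::airflow-my-bucket',
    DagS3Path='dags',
    ExecutionRoleArn='arn:aws:iam::123456789012:role/my-mwaa-execution-role',
    NetworkConfiguration={
        'SubnetIds': ['subnet-0123456789abcdef0', 'subnet-0123456789abcdef1'],
        'SecurityGroupIds': ['sg-0123456789abcdef0'],
    },
    WebserverAccessMode='PUBLIC_ONLY',
    EnvironmentClass='mw1.small',
    MaxWorkers=5,
)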

With Amazon MWAA, you pay based on the environment class and the workers you use. For more information, see the pricing page.

Upstream compatibility is a core tenet of Amazon MWAA. Our code changes to the Apache Airflow platform are released back to open source.

With Amazon MWAA you can spend more time building workflows for your engineering and data science tasks, and less time managing and scaling the infrastructure of your Airflow platform.

Learn more about Amazon MWAA and get started today!

Danilo




