All posts by Behram Irani

Query data in Amazon OpenSearch Service using SQL from Amazon Athena

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/query-data-in-amazon-opensearch-service-using-sql-from-amazon-athena/

Amazon Athena is an interactive, serverless query service that makes it easy to query data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) is a fully managed service for the open-source, distributed OpenSearch search and analytics suite (derived from Elasticsearch), allowing you to run OpenSearch or legacy Elasticsearch clusters at scale without having to manage hardware provisioning, software installation, patching, backups, and so on.

Although both services have their own use cases, there might be situations when you want to run queries that combine data in Amazon S3 with data in an Amazon OpenSearch Service cluster. In such cases, federated queries in Athena are a good option: they let you combine data from multiple data sources and analyze it in a single query. The federated query feature works through data source connectors built for different data sources, and AWS provides a data source connector that lets Athena query OpenSearch Service clusters.

This post demonstrates how to query data in Amazon OpenSearch Service and Amazon S3 in a single query. We use the data made available in the public COVID-19 data lake, documented as part of the post A public data lake for analysis of COVID-19 data.

In particular, we use the following two datasets:

  1. alleninstitute_metadata: Metadata on papers pulled from the COVID-19 Open Research Dataset (CORD-19). The sha column indicates the paper ID, which is the file name of the paper in the data lake. This dataset is stored in Amazon OpenSearch Service because it contains the column abstract, which you can search on.
  2. alleninstitute_comprehend_medical: Results containing annotations obtained by running the papers in the preceding dataset through Amazon Comprehend Medical. This is accessed from its public storage at s3://covid19-lake/alleninstitute/CORD19/comprehendmedical/comprehend_medical.json.

Data flow when combining data from Amazon OpenSearch Service and Amazon S3

The data source connectors are implemented as AWS Lambda functions. When a user issues a query that combines data from Amazon OpenSearch Service and Amazon S3, Athena refers to the AWS Glue Data Catalog metadata to look up the table definitions. For the table whose data is in Amazon S3, Athena fetches the data from Amazon S3 directly. For the tables that are in Amazon OpenSearch Service, Athena invokes the Lambda function (part of the data source connector application) to read the data from Amazon OpenSearch Service. Depending on the amount of data, Athena can invoke this function multiple times in parallel for the same query to speed up reads.

The following diagram illustrates this data flow.

Set up the two data sources using AWS CloudFormation

To prepare for querying both data sources, launch the AWS CloudFormation template by choosing Launch Stack in the original post. All you need to do after that is choose Create stack.

To run the CloudFormation stack, you need to be logged in to an AWS account with permissions to do the following:

  • Create a CloudFormation stack
  • Create an Identity and Access Management (IAM) role
  • Create a Lambda function, assign an IAM role to it, and invoke it
  • Launch an OpenSearch Service cluster
  • Create AWS Glue databases and tables
  • Create an S3 bucket

For instructions on creating a CloudFormation stack, see Get started.

For more information about controlling permissions and access for these services, see the security documentation for each service.

The CloudFormation template creates the following:

  • A table in the AWS Glue Data Catalog named alleninstitute_comprehend_medical that points to the S3 location s3://covid19-lake/alleninstitute/CORD19/comprehendmedical/comprehend_medical.json. This contains the results extracted from the CORD-19 data using the natural language processing service Amazon Comprehend Medical.
  • An S3 bucket with the name athena-es-connector-spill-bucket- followed by the first few characters from the stack ID to keep the bucket name unique.
  • An OpenSearch Service cluster named es-alleninstitute-data, with two instances and an access policy that allows an IAM role to access the cluster.
  • An IAM role to access the OpenSearch Service cluster.
  • A Lambda function that contains Python code to read the metadata of the papers along with the abstract (a minimal sketch of this approach follows this list). This data is available as JSON at s3://covid19-lake/alleninstitute/CORD19/json/metadata/. For this post, we load just one of the four JSON files available.
  • A custom resource that invokes the Lambda function to load the data into the OpenSearch Service cluster.
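
The loader Lambda is created for you by the stack, so you don't need to write it yourself; the following is only a minimal sketch of the general approach, assuming the opensearch-py client and SigV4 request signing with the role the stack creates. The domain endpoint, Region, and source file name are placeholders, not the stack's actual values.

import json

import boto3
from opensearchpy import OpenSearch, RequestsHttpConnection, helpers
from requests_aws4auth import AWS4Auth

# Placeholders: the real values come from the CloudFormation stack.
DOMAIN_ENDPOINT = "search-es-alleninstitute-data-xxxxxxxx.us-east-1.es.amazonaws.com"
REGION = "us-east-1"
INDEX_NAME = "alleninstitute_metadata"
SOURCE_BUCKET = "covid19-lake"
SOURCE_KEY = "alleninstitute/CORD19/json/metadata/part-00000.json"  # hypothetical file name

def handler(event, context):
    # Sign OpenSearch requests with the credentials of the function's IAM role.
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       REGION, "es", session_token=credentials.token)

    client = OpenSearch(
        hosts=[{"host": DOMAIN_ENDPOINT, "port": 443}],
        http_auth=awsauth, use_ssl=True, verify_certs=True,
        connection_class=RequestsHttpConnection,
    )

    # The metadata file is JSON Lines: one research paper per line.
    body = boto3.client("s3").get_object(Bucket=SOURCE_BUCKET, Key=SOURCE_KEY)["Body"]
    actions = (
        {"_index": INDEX_NAME, "_source": json.loads(line)}
        for line in body.iter_lines() if line
    )

    # Bulk-index every paper's metadata, including the abstract, into the index.
    helpers.bulk(client, actions)
    return {"status": "loaded"}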

The stack can take 15–30 minutes to complete.

When the stack is fully deployed, navigate to the Outputs tab of the stack and note the name of the S3 bucket created (the value for SpillBucket).

For the rest of the steps, you need permissions to deploy applications from the AWS Serverless Application Repository, create Athena data sources, and run Athena queries.

Deploy the Amazon Athena OpenSearch connector

When the OpenSearch Service domain (with an index containing the metadata related to the COVID-19 research papers) and the AWS Glue table pointing to the Amazon Comprehend Medical output data are ready, you can deploy the Amazon Athena OpenSearch connector using the AWS Serverless Application Repository.

  1. On the AWS Serverless Application Repository console, choose Available applications.
  2. Search for Athena Elasticsearch and select Show apps that create custom IAM roles or resource policies.
  3. Choose AthenaElasticsearchConnector.

You’re redirected to the application screen.

  4. Scroll down to the Application settings section.
  5. For AthenaCatalogName, enter a name (for this post, we use es-connector).

This is the name of both the application and the Lambda function that connects to Amazon OpenSearch Service every time you run a query from Athena. For more details about all the parameters, refer to the connector’s GitHub page.

  6. For SpillBucket, enter the name you noted in the previous section when we deployed the CloudFormation stack (it begins with athena-es-connector-spill-bucket).
  7. Leave all other settings as default.
  8. Select I acknowledge that this app creates custom IAM roles.
  9. Choose Deploy.

In a few seconds, you’re redirected to the Applications page. You can see the status of your deployment on the Deployments tab. The deployment takes 1–2 minutes to complete.

Create a new data source in Athena

Now that the connector application has been deployed, it’s time to set up the OpenSearch Service domain to show as a catalog on Athena.

  1. On the Athena console, navigate to the cord19 database.

The database contains the table alleninstitute_comprehend_medical, which was created as part of the CloudFormation template. This refers to the data sitting in Amazon S3 at s3://covid19-lake/alleninstitute/CORD19/comprehendmedical/.

  2. Choose Data sources in the navigation pane.
  3. Choose Connect data source.
  4. Select Custom data source.
  5. For Data source name, enter a name (for example, es-cord19-catalog).
  6. Select Use an existing Lambda function and choose es-connector from the drop-down menu.
  7. Choose Connect data source.
  8. Choose Next.
  9. For Lambda function, choose es-connector.
  10. Choose Connect.

A new catalog es-cord19-catalog should now be available, as in the following screenshot.

  11. On the Query editor tab, for Data source, choose es-cord19-catalog.

You can now query this data source from Athena.

Query OpenSearch Service domains from Athena

When you choose the es-cord19-catalog data source, the Lambda function (which was part of the connector application that we deployed) gets invoked and fetches the details about the domain and the index. The OpenSearch Service domain shows up as a database, and the index is shown as a table. You can also query the table with the following query:

select count(*) from "es-cord19-catalog"."es-alleninstitute-data".alleninstitute_metadata

Now you can join data from both Amazon OpenSearch Service and Amazon S3 with queries, such as the following:

select es.title, es.url from 
"es-cord19-catalog"."es-alleninstitute-data".alleninstitute_metadata es
    inner join
AwsDataCatalog.cord19.alleninstitute_comprehend_medical s3
    on es.sha = s3.paper_id
WHERE 
    array_join(s3.dx_name, ',') like '%infectious disease%'

The preceding query gets the title and the URL of all the research papers where the diagnosis was related to infectious diseases.

The following screenshot shows the query results.
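
If you prefer to run these queries programmatically instead of from the Athena console, the following is a minimal boto3 sketch. The results bucket is a placeholder you must replace, and the primary workgroup and 2-second polling interval are assumptions.

import time

import boto3

athena = boto3.client("athena")

QUERY = """
SELECT es.title, es.url
FROM "es-cord19-catalog"."es-alleninstitute-data".alleninstitute_metadata es
INNER JOIN AwsDataCatalog.cord19.alleninstitute_comprehend_medical s3
    ON es.sha = s3.paper_id
WHERE array_join(s3.dx_name, ',') LIKE '%infectious disease%'
"""

# Start the federated query; Athena invokes the es-connector Lambda behind the scenes.
query_id = athena.start_query_execution(
    QueryString=QUERY,
    WorkGroup="primary",
    ResultConfiguration={"OutputLocation": "s3://MY-RESULTS-BUCKET/athena/"},  # placeholder
)["QueryExecutionId"]

# Poll until the query reaches a terminal state.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

# Page through the results; the very first row holds the column headers.
if state == "SUCCEEDED":
    for page in athena.get_paginator("get_query_results").paginate(QueryExecutionId=query_id):
        for row in page["ResultSet"]["Rows"]:
            print([col.get("VarCharValue") for col in row["Data"]])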

Clean up

To clean up the resources created as part of this post, complete the following steps:

  1. On the Amazon S3 console, locate and select your S3 bucket (the same bucket you noted from the CloudFormation stack).
  2. Choose Empty.

You can also achieve this by running the following command from a command line:

aws s3 rm s3://athena-es-connector-spill-bucket-f7eb2cb0 --recursive

  3. On the AWS CloudFormation console, delete the stack you created.
  4. Delete the stack created for the Amazon Athena OpenSearch connector application. The default name is serverlessrepo-AthenaElasticsearchConnector.
  5. On the Athena console, delete the es-cord19-catalog data source.

You can also delete the data source with the following command:

aws athena delete-data-catalog --name "es-cord19-catalog"
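
Alternatively, the following minimal boto3 sketch performs both the bucket cleanup and the data source deletion; the spill bucket suffix shown is a placeholder, so use the name you noted from your stack outputs.

import boto3

SPILL_BUCKET = "athena-es-connector-spill-bucket-f7eb2cb0"  # placeholder suffix

# Empty the spill bucket so the CloudFormation stacks can be deleted cleanly.
boto3.resource("s3").Bucket(SPILL_BUCKET).objects.all().delete()

# Remove the Athena data source that points at the es-connector Lambda function.
boto3.client("athena").delete_data_catalog(Name="es-cord19-catalog")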

Conclusion

In this post, we saw how to combine data from OpenSearch Service clusters with other data sources like Amazon S3 to run federated queries. You can apply this solution to other use cases, such as combining AWS CloudTrail logs loaded into OpenSearch Service clusters with VPC flow logs data in Amazon S3 to analyze unusual network traffic, or combining product reviews data in Amazon OpenSearch Service with product data in Amazon S3 or other data sources. You can also pull data from Amazon OpenSearch Service and create an AWS Glue table out of it using a CTAS query in Athena.

To learn more about the Amazon Athena OpenSearch connector and its other configuration options, see the GitHub repo.

To learn more about query federation in Athena, refer to Using Amazon Athena Federated Query or Query any data source with Amazon Athena’s new federated query.


About the Authors

Behram Irani, Sr Analytics Solutions Architect

Madhav Vishnubhatta, Sr Technical Account Manager

Build and orchestrate ETL pipelines using Amazon Athena and AWS Step Functions

Post Syndicated from Behram Irani original https://aws.amazon.com/blogs/big-data/build-and-orchestrate-etl-pipelines-using-amazon-athena-and-aws-step-functions/

Extract, transform, and load (ETL) is the process of reading source data, applying transformation rules to this data, and loading it into the target structures. ETL is performed for various reasons. Sometimes ETL helps align source data to target data structures, whereas other times ETL is done to derive business value by cleansing, standardizing, combining, aggregating, and enriching datasets. You can perform ETL in multiple ways; the most popular choices are:

  • Programmatic ETL using Apache Spark. Amazon EMR and AWS Glue both support this model.
  • SQL ETL using Apache Hive or PrestoDB/Trino. Amazon EMR supports both these tools.
  • Third-party ETL products.

Many organizations prefer the SQL ETL option because they already have developers who understand and write SQL queries. However, these developers want to focus on writing queries and not worry about setting up and managing the underlying infrastructure.

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

This post explores how you can use Athena to create ETL pipelines and how you can orchestrate these pipelines using AWS Step Functions.

Architecture overview

The following diagram illustrates our architecture.

The source data first gets ingested into an S3 bucket, which preserves the data as is. You can ingest this data into Amazon S3 in multiple ways, for example with AWS Database Migration Service (AWS DMS) or Amazon Kinesis, as discussed later in this post.

After the source data is in Amazon S3 and assuming that it has a fixed structure, you can either run an AWS Glue crawler to automatically generate the schema or you can provide the DDL as part of your ETL pipeline. An AWS Glue crawler is the primary method used by most AWS Glue users. You can use a crawler to populate the AWS Glue Data Catalog with tables. A crawler can crawl multiple data stores in a single run. Upon completion, the crawler creates or updates one or more tables in your Data Catalog. Athena uses this catalog to run queries against the tables.
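
The pipeline in this post provides the DDL itself, but if you prefer the crawler route, the following is a minimal boto3 sketch that creates and starts a crawler over a raw data prefix. The crawler name, IAM role ARN, database, and S3 path are placeholders, not values defined elsewhere in this post.

import boto3

glue = boto3.client("glue")

# Create a crawler that catalogs the raw CSV files under the given prefix.
glue.create_crawler(
    Name="raw-data-crawler",                                   # hypothetical name
    Role="arn:aws:iam::123456789012:role/GlueCrawlerRole",     # placeholder role ARN
    DatabaseName="my_raw_db",                                  # placeholder database
    Targets={"S3Targets": [{"Path": "s3://MY-BUCKET/raw-data/"}]},
)

# Run the crawler; when it finishes, the table it creates is queryable from Athena.
glue.start_crawler(Name="raw-data-crawler")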

After the raw data is cataloged, the source-to-target transformation is done through a series of Athena Create Table as Select (CTAS) and INSERT INTO statements. The transformed data is loaded into another S3 bucket. The files are also partitioned and converted into Parquet format to optimize performance and cost.

Prepare your data

For this post, we use the NYC taxi public dataset. It contains data for trips taken by taxis and for-hire vehicles in New York City, organized in CSV files by month starting from 2009. For our ETL pipeline, we use two files containing yellow taxi data: one for demonstrating the initial table creation and loading this table using CTAS, and the other for demonstrating the ongoing data inserts into this table using the INSERT INTO statement. We also use a lookup file to demonstrate join, transformation, and aggregation in this ETL pipeline.

  1. Create a new S3 bucket with a unique name in your account.

You use this bucket to copy the raw data from the NYC taxi public dataset and store the data processed by Athena ETL.

  2. Create the S3 prefixes athena, nyctaxidata/data, nyctaxidata/lookup, nyctaxidata/optimized-data, and nyctaxidata/optimized-data-lookup inside this newly created bucket.

These prefixes are used in the Step Functions code provided later in this post.

  3. Copy the yellow taxi data files from the nyc-tlc public bucket described in the NYC taxi public dataset registry for January and February 2020 into the nyctaxidata/data prefix of the S3 bucket you created in your account.
  4. Copy the lookup file into the nyctaxidata/lookup prefix you created (see the sketch after these steps if you prefer to script the copies).
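
If you prefer to script these copy steps, the following is a minimal boto3 sketch. The source object keys are assumptions about the public dataset's layout, so verify them against the NYC taxi public dataset registry entry, and replace MY-BUCKET with the bucket you created.

import boto3

s3 = boto3.resource("s3")
DEST_BUCKET = "MY-BUCKET"  # replace with the bucket you created

# Source keys are assumptions; check the dataset registry entry for the exact names.
copies = [
    ("trip data/yellow_tripdata_2020-01.csv", "nyctaxidata/data/yellow_tripdata_2020-01.csv"),
    ("trip data/yellow_tripdata_2020-02.csv", "nyctaxidata/data/yellow_tripdata_2020-02.csv"),
    ("misc/taxi_zone_lookup.csv", "nyctaxidata/lookup/taxi_zone_lookup.csv"),
]

for source_key, dest_key in copies:
    # Server-side copy from the public nyc-tlc bucket into your prefixes; copying an
    # object also creates the prefix, so no separate folder-creation step is needed.
    s3.Bucket(DEST_BUCKET).copy({"Bucket": "nyc-tlc", "Key": source_key}, dest_key)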

Create an ETL pipeline using Athena integration with Step Functions

Step Functions is a low-code visual workflow service used to orchestrate AWS services, automate business processes, and build serverless applications. Through its visual interface, you can create and run a series of checkpointed and event-driven workflows that maintain the application state. The output of one step acts as an input to the next. Each step in your application runs in order, as defined by your business logic.

The Step Functions service integration with Athena enables you to use Step Functions to start and stop query runs, and get query results.

For the ETL pipeline in this post, we keep the flow simple; however, you can build a complex flow using different features of Step Functions.

The flow of the pipeline is as follows:

  1. Create a database if it doesn’t already exist in the Data Catalog. Athena by default uses the Data Catalog as its metastore.
  2. If no tables exist in this database, take the following actions:
    1. Create the table for the raw yellow taxi data and the raw table for the lookup data.
    2. Use CTAS to create the target tables and use the raw tables created in the previous step as input in the SELECT statement. CTAS also partitions the target table by year and month, and creates optimized Parquet files in the target S3 bucket.
    3. Use a view to demonstrate the join and aggregation parts of ETL.
  3. If any tables exist in this database, iterate through the list of all the remaining CSV files and process them by using the INSERT INTO statement.

Different use cases may make the ETL pipeline quite complex. You may be getting continuous data from the source, either with AWS DMS in batch or CDC mode, or with Kinesis in streaming mode. This requires mechanisms to process all such files during a particular window and mark them as complete so that the next time the pipeline runs, it processes only the newly arrived files. Instead of manually adding DDL to the pipeline, you can add AWS Glue crawler steps in the Step Functions pipeline to create a schema for the raw data; and instead of a view to aggregate data, you may have to create a separate table to keep the results ready for consumption. Also, many use cases get change data as part of the feed, which needs to be merged with the target datasets. Extra steps in the Step Functions pipeline are required to process such data on a case-by-case basis.

The following code for the Step Functions pipeline covers the preceding flow. For more details on how to get started with Step Functions, refer to the tutorials. Replace the S3 bucket names with the unique bucket name you created in your account.

{
  "Comment": "An example of using Athena to query logs, get query results and send results through notification.",
  "StartAt": "Create Glue DB",
  "States": {
    "Create Glue DB": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE DATABASE if not exists nyctaxidb",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Table Lookup"
    },
    "Run Table Lookup": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "show tables in nyctaxidb",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Get lookup query results"
    },
    "Get lookup query results": {
      "Resource": "arn:aws:states:::athena:getQueryResults",
      "Parameters": {
        "QueryExecutionId.$": "$.QueryExecution.QueryExecutionId"
      },
      "Type": "Task",
      "Next": "ChoiceStateFirstRun"
    },
    "ChoiceStateFirstRun": {
      "Comment": "Based on the input table name, a choice is made for moving to the next step.",
      "Type": "Choice",
      "Choices": [
        {
          "Not": {
            "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue",
            "IsPresent": true
          },
          "Next": "Run Create data Table Query"
        },
        {
          "Variable": "$.ResultSet.Rows[0].Data[0].VarCharValue",
          "IsPresent": true,
          "Next": "Check All Tables"
        }
      ],
      "Default": "Check All Tables"
    },
    "Run Create data Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.yellowtaxi_data_csv(  vendorid bigint,   tpep_pickup_datetime string,   tpep_dropoff_datetime string,   passenger_count bigint,   trip_distance double,   ratecodeid bigint,   store_and_fwd_flag string,   pulocationid bigint,   dolocationid bigint,   payment_type bigint,   fare_amount double,   extra double,   mta_tax double,   tip_amount double,   tolls_amount double,   improvement_surcharge double,   total_amount double,   congestion_surcharge double) ROW FORMAT DELIMITED   FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT   'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION  's3://MY-BUCKET/nyctaxidata/data/' TBLPROPERTIES (  'skip.header.line.count'='1')",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create lookup Table Query"
    },
    "Run Create lookup Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE EXTERNAL TABLE nyctaxidb.nyctaxi_lookup_csv(  locationid bigint,   borough string,   zone string,   service_zone string,   latitude double,   longitude double)ROW FORMAT DELIMITED   FIELDS TERMINATED BY ',' STORED AS INPUTFORMAT   'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION  's3://MY-BUCKET/nyctaxidata/lookup/' TBLPROPERTIES (  'skip.header.line.count'='1')",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create Parquet data Table Query"
    },
    "Run Create Parquet data Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE  table if not exists nyctaxidb.yellowtaxi_data_parquet WITH (format='PARQUET',parquet_compression='SNAPPY',partitioned_by=array['pickup_year','pickup_month'],external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data/') AS SELECT vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '01'",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create Parquet lookup Table Query"
    },
    "Run Create Parquet lookup Table Query": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "CREATE table if not exists nyctaxidb.nyctaxi_lookup_parquet WITH (format='PARQUET',parquet_compression='SNAPPY', external_location = 's3://MY-BUCKET/nyctaxidata/optimized-data-lookup/') AS SELECT locationid, borough, zone , service_zone , latitude ,longitude  FROM nyctaxidb.nyctaxi_lookup_csv",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "Next": "Run Create View"
    },
    "Run Create View": {
      "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
      "Parameters": {
        "QueryString": "create or replace view nyctaxidb.yellowtaxi_data_vw as select a.*,lkup.* from (select  datatab.pulocationid pickup_location ,pickup_month, pickup_year, sum(cast(datatab.total_amount AS decimal(10, 2))) AS sum_fare , sum(cast(datatab.trip_distance AS decimal(10, 2))) AS sum_trip_distance , count(*) AS countrec   FROM nyctaxidb.yellowtaxi_data_parquet datatab WHERE datatab.pulocationid is NOT null  GROUP BY  datatab.pulocationid, pickup_month, pickup_year) a , nyctaxidb.nyctaxi_lookup_parquet lkup where lkup.locationid = a.pickup_location",
        "WorkGroup": "primary",
        "ResultConfiguration": {
          "OutputLocation": "s3://MY-BUCKET/athena/"
        }
      },
      "Type": "Task",
      "End": true
    },
    "Check All Tables": {
      "Type": "Map",
      "InputPath": "$.ResultSet",
      "ItemsPath": "$.Rows",
      "MaxConcurrency": 0,
      "Iterator": {
        "StartAt": "CheckTable",
        "States": {
          "CheckTable": {
            "Type": "Choice",
            "Choices": [
              {
                "Variable": "$.Data[0].VarCharValue",
                "StringMatches": "*data_csv",
                "Next": "passstep"
              },
              {
                "Variable": "$.Data[0].VarCharValue",
                "StringMatches": "*data_parquet",
                "Next": "Insert New Parquet Data"
              }
            ],
            "Default": "passstep"
          },
          "Insert New Parquet Data": {
            "Resource": "arn:aws:states:::athena:startQueryExecution.sync",
            "Parameters": {
              "QueryString": "INSERT INTO nyctaxidb.yellowtaxi_data_parquet select vendorid,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,ratecodeid,store_and_fwd_flag,pulocationid,dolocationid,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,payment_type,substr(\"tpep_pickup_datetime\",1,4) pickup_year, substr(\"tpep_pickup_datetime\",6,2) AS pickup_month FROM nyctaxidb.yellowtaxi_data_csv where substr(\"tpep_pickup_datetime\",1,4) = '2020' and substr(\"tpep_pickup_datetime\",6,2) = '02'",
              "WorkGroup": "primary",
              "ResultConfiguration": {
                "OutputLocation": "s3://MY-BUCKET/athena/"
              }
            },
            "Type": "Task",
            "End": true
          },
          "passstep": {
            "Type": "Pass",
            "Result": "NA",
            "End": true
          }
        }
      },
      "End": true
    }
  }
}

The first time we run this pipeline, it follows the CTAS path and creates the aggregation view.

The second time we run it, it follows the INSERT INTO statement path to add new data into the existing tables.
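
To create and run this state machine programmatically instead of through the Step Functions console, the following is a minimal boto3 sketch. It assumes you saved the JSON definition above to a local file and that you already have an IAM role for Step Functions with permission to run Athena queries and write to the S3 output location; the file name, state machine name, and role ARN are placeholders.

import boto3

sfn = boto3.client("stepfunctions")

# The state machine definition is the JSON shown above, saved locally.
with open("athena_etl_pipeline.json") as f:
    definition = f.read()

state_machine = sfn.create_state_machine(
    name="athena-etl-pipeline",                                        # hypothetical name
    definition=definition,
    roleArn="arn:aws:iam::123456789012:role/StepFunctionsAthenaRole",  # placeholder role ARN
)

# Each execution runs the CTAS-or-INSERT flow described above.
execution = sfn.start_execution(stateMachineArn=state_machine["stateMachineArn"])
print(execution["executionArn"])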

When to use this pattern

You should use this pattern when the raw data is structured and the metadata can easily be added to the catalog.

Because Athena charges are calculated by the amount of data scanned, this pattern is best suited for datasets that aren’t very large and need continuous processing.

The pattern is also well suited for converting raw data into columnar formats like Parquet or ORC, aggregating a large number of small files into larger files, or partitioning and bucketing your datasets.

Conclusion

In this post, we showed how to use Step Functions to orchestrate an ETL pipeline in Athena using CTAS and INSERT INTO statements.

As next steps to enhance this pipeline, consider the following:

  • Create an ingestion pipeline that continuously puts data in the raw S3 bucket at regular intervals
  • Add an AWS Glue crawler step in the pipeline to automatically create the raw schema
  • Add extra steps to identify change data and merge this data with the target
  • Add error handling and notification mechanisms in the pipeline
  • Schedule the pipeline using Amazon EventBridge to run at regular intervals (see the sketch following this list)
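
For the scheduling step, the following is a minimal boto3 sketch that triggers the state machine once a day with an EventBridge rule. The rule name, state machine ARN, and the role (which needs states:StartExecution on the state machine) are placeholders.

import boto3

events = boto3.client("events")

# Create (or update) a rule that fires once a day.
events.put_rule(
    Name="athena-etl-daily",          # hypothetical rule name
    ScheduleExpression="rate(1 day)",
    State="ENABLED",
)

# Point the rule at the Step Functions state machine that runs the ETL pipeline.
events.put_targets(
    Rule="athena-etl-daily",
    Targets=[{
        "Id": "athena-etl-pipeline",
        "Arn": "arn:aws:states:us-east-1:123456789012:stateMachine:athena-etl-pipeline",  # placeholder
        "RoleArn": "arn:aws:iam::123456789012:role/EventBridgeInvokeStepFunctions",       # placeholder
    }],
)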

About the Authors

Behram Irani, Sr Analytics Solutions Architect

Dipankar Kushari, Sr Analytics Solutions Architect

Rahul Sonawane, Principal Analytics Solutions Architect