Tag Archives: AWS Data Exchange

Query, visualize, and forecast TruFactor web session intelligence with AWS Data Exchange

Post Syndicated from Jay Park original https://aws.amazon.com/blogs/big-data/query-visualize-and-forecast-trufactor-web-session-intelligence-with-aws-data-exchange/

Given the sheer volume of data available, finding the right data set to gain business insights can be a challenge. You can improve your business by having access to a central repository of various data sets to query, visualize, and forecast. With AWS Data Exchange, finding the right data set has become much simpler. As an example, you can use data sets on web session visitation and demographics to help you understand which demographic groups visit your website most frequently. You can then improve your business through machine learning (ML) models and visitation forecasts.

AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. After you subscribe to a data product within AWS Data Exchange, you can use the AWS Data Exchange API, AWS CLI, or the AWS Management Console to load data into Amazon S3 directly. You can then analyze the imported data with a wide variety of AWS services, ranging from analytics to machine learning.

This post showcases TruFactor Intelligence-as-a-Service data on AWS Data Exchange. TruFactor’s anonymization platform and proprietary AI ingests, filters, and transforms more than 85 billion high-quality raw signals daily from wireless carriers, OEMs, and mobile apps into a unified phygital consumer graph across physical and digital dimensions. TruFactor intelligence is application-ready for use within any AWS analytics or ML service to power your models and applications running on AWS, with no additional processing required. Common use cases include the following:

  • Consumer segmentation – Web intelligence on internet browsing behavior in the US provides a complete view of the consumer, including interests, opinions, values, digital behavior, and sentiment, to inform segmentation of your customers and those of your competitors.
  • Customer acquisition or churn campaigns – Internet browsing behavior can identify affinity properties for new prospects as well as switching to competitors’ websites.

This walkthrough uses TruFactor’s Daily Mobile Web Session Index and Daily Demographics by Mobile Web Sessions data sets, which are both available for free subscription through the AWS Data Exchange console. While there are commercial data sets available for purchase in AWS Data Exchange, this post uses trial data sets to showcase the breadth and depth of analytics possible with TruFactor’s intelligence.

This TruFactor intelligence is aggregated on over 3 billion records from telco carrier networks and mobile apps per day, originating from approximately 30 million consistent users, distilled into session-level information that provides a complete view of user digital interests. The accuracy, breadth of data provided, and the persistency of the panel deliver a unified view of consumers that can inform insights or power analytic models or applications on AWS.

These two data sets have applications across verticals such as retail, financial services, and advertising. Common use cases include creating detailed customer segmentation (for example, full DNA maps of consumers based on visits to specific web HTTP hosts), identifying affinity properties, and estimating demand for apps or services. This intelligence is also ideal for identifying trends and changes over time.

Solution overview

The following diagram illustrates the architecture of the solution.

The workflow comprises the following steps:

  1. Subscribe to a data set from AWS Data Exchange and export to Amazon S3
  2. Run an AWS Glue crawler to load product data
  3. Perform queries with Amazon Athena
  4. Visualize the queries and tables with Amazon QuickSight
  5. Run an ETL job with AWS Glue
  6. Create a time series forecast with Amazon Forecast
  7. Visualize the forecasted data with Amazon QuickSight

This post looks at the demographic distributions across various websites and how to use ML to forecast website visitation.

Walkthrough overview

The walkthrough includes the following steps:

  1. Subscribe to a TruFactor data set from the AWS Data Exchange console and export the data set to Amazon S3
  2. Use an AWS Glue crawler to load the product data into an AWS Glue Data Catalog
  3. Use Amazon Athena for SQL querying
  4. Visualize the query views and tables with Amazon QuickSight
  5. Use AWS Glue jobs to extract, transform, and load your data for forecasting with Amazon Forecast
  6. Use Amazon Forecast to create a time series forecast of the transformed data
  7. Visualize the forecasted web visitation data with Amazon QuickSight

You do not have to perform additional processing or manipulation of the TruFactor intelligence for this walkthrough.

The data sets

The TruFactor data sets this post uses are in Parquet format with Snappy compression. The following section provides additional details and schema for each data set.

TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial

The TruFactor Daily Mobile Web Session Index (US – Nationwide) — Trial data set provides aggregate information per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the projected counts from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. For the avoidance of doubt, this data set does not include user-level data.

The following screenshot shows the schema for the mobile web session data set by HTTP host, session time, MB transferred, number of events, sessions, users, and dates.

TruFactor Daily Demographics by Mobile Web Session (US) — Trial

The TruFactor Daily Demographics by Mobile Web Session (US) — Trial data set includes aggregate demographics: a projected distribution of users per HTTP host as a view of the internet browsing behavior in the US. TruFactor generates the data from high-quality packet layer data sourced from mobile carriers that includes the mobile internet traffic originating from a user’s device. TruFactor derives the distribution from observed counts that are filtered for exclusion and anonymized to make sure users cannot be re-identified. It extrapolates values from US Census data using a proprietary algorithm. Demographics include gender, age range, ethnicity, and income range.

The following screenshot shows the partial schema for the demographics by web session data set. The full schema includes the following attributes: HTTP host, age ranges, genders, ethnicity, income ranges, and date.

Prerequisites

To complete this walkthrough successfully, you must have the following resources:

  • An AWS account.
  • Familiarity with AWS core services and concepts.
  • The ability to launch new resources in your account. Some resources may not be eligible for Free Tier usage and might incur costs.
  • Subscription to TruFactor’s Daily Mobile Web Session Index (US – Nationwide) – Trial and Daily Demographics by Mobile Web Session (US) – Trial data sets. For instructions on subscribing to a data set on AWS Data Exchange, see AWS Data Exchange – Find, Subscribe To, and Use Data Products.

Using AWS Data Exchange, Amazon S3, AWS Glue, Amazon Athena, and Amazon QuickSight

This section examines the key demographics of visitors to the top seven e-commerce websites. This information can help you understand which demographic groups are visiting your website most frequently and also help you target ads and cater to certain demographic groups. You use AWS Glue crawlers to crawl your data sets in Amazon S3, populate your AWS Glue Data Catalog, query the AWS Glue Data Catalog using Amazon Athena, and use Amazon QuickSight to visualize the queries.

Step 1: Exporting the data from AWS Data Exchange to Amazon S3

To export your TruFactor data set subscriptions into an Amazon S3 bucket, complete the following steps:

  1. Create an Amazon S3 bucket in your working account. For the purposes of our demo, we have named our S3 bucket trufactor-data-exchange-bucket.
  2. Create two folders within the S3 bucket: web_sess and demo_by_web_sess.

This post uses a trial data set with a sample of 14 days. A paid subscription to TruFactor’s Web Sessions data on AWS Data Exchange includes 6 months of historical data, which refreshes daily.

The following screenshot shows the two folders within the S3 bucket. You are now ready to export the data sets.

  1. On the AWS Data Exchange console, under Subscriptions, locate TruFactor Daily Mobile Web Sessions Index (US – Nationwide) – Trial.
  2. Under Revisions, choose the most recent Revision ID.
  3. Choose all assets except the manifest.json files.
  4. Choose Export to Amazon S3.
  5. In the window that opens, choose the S3 bucket and folder to export the product data into.
    • Export all the assets into the S3 bucket’s web_sess folder.
  6. Repeat the previous steps for the TruFactor Daily Demographics by Mobile Web Sessions (US) – Trial data set, with the following change:
    • Export the assets into the demo_by_web_sess folder in your S3 bucket.
  7. On the bucket's Overview tab, check to make sure you successfully exported the TruFactor data sets. The following screenshot shows that the data sets are partitioned into folders by date. Each folder contains Parquet files of web session data for each day.
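
If you prefer to script this step, the same export can be performed with the AWS Data Exchange API. The following boto3 sketch illustrates the idea; the data set and revision IDs are placeholders that you can look up under your subscription, and the bucket name is the one used in this walkthrough:

    import boto3

    dx = boto3.client('dataexchange')

    # Placeholder IDs -- find these under your entitled data set's revisions
    DATA_SET_ID = '<entitled-data-set-id>'
    REVISION_ID = '<revision-id>'
    BUCKET = 'trufactor-data-exchange-bucket'

    # Build one export destination per asset in the revision, skipping the manifest files
    assets = dx.list_revision_assets(DataSetId=DATA_SET_ID, RevisionId=REVISION_ID)['Assets']
    destinations = [{'AssetId': a['Id'], 'Bucket': BUCKET, 'Key': 'web_sess/' + a['Name']}
                    for a in assets if not a['Name'].endswith('manifest.json')]

    # Create and start an export job that copies the assets into the bucket
    job = dx.create_job(
        Type='EXPORT_ASSETS_TO_S3',
        Details={'ExportAssetsToS3': {
            'DataSetId': DATA_SET_ID,
            'RevisionId': REVISION_ID,
            'AssetDestinations': destinations,
        }}
    )
    dx.start_job(JobId=job['Id'])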

Step 2: Populating your AWS Glue Data Catalog with the TruFactor data sets

Now that you have successfully exported the TruFactor data sets into an Amazon S3 bucket, you create and run an AWS Glue crawler to crawl your Amazon S3 bucket and populate the AWS Glue Data Catalog. Complete the following steps:

  1. On the AWS Glue console, under Data Catalog, choose Crawlers.
  2. Choose Add crawler.
  3. For Crawler name, enter a name; for example, trufactor-data-exchange-crawler.
  4. For Crawler source type, choose Data stores.
  5. Choose Next.
  6. For Choose a data store, choose S3.
  7. For Crawl data in, select Specified path in my account.
  8. For Include path, enter the path for the web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/web_sess.
  9. Choose Next.
  10. Select Yes to Add another data store.
  11. Choose Next.
  12. For Include path, enter the path for the demo_by_web_sess data set folder. The crawler points to the following path: s3://<trufactor-data-exchange-bucket>/demo_by_web_sess.
  13. Choose Next.
  14. In the Choose an IAM role section, select Create an IAM role. This is the role that the AWS Glue crawler and AWS Glue jobs use to access the Amazon S3 bucket and its content.
  15. For IAM role, enter the suffix demo-data-exchange.
  16. Choose Next.
  17. In the schedule section, leave the Frequency with the default Run on Demand.
  18. Choose Next.
  19. In the Output section, choose Add database.
  20. Enter a name for the database; for example, trufactor-db.
  21. Choose Next, then choose Finish. This database contains the tables that the crawler discovers and populates. With these data sets separated into different tables, you join and relationalize the data.
  1. In the Review all steps section, review the crawler settings and choose Finish.
  2. Under Data Catalog, choose Crawlers.
  3. Select the crawler you just created.
  4. Choose Run crawler. The AWS Glue crawler crawls the data sources and populates your AWS Glue Data Catalog. This process can take up to a few minutes. When the crawler is finished, you can see two tables added to your crawler details. See the following screenshot. You can now view your new tables.
  1. Under Databases, choose Tables.
  2. Choose your database.
  3. Choose View the tables. The table names correspond to the Amazon S3 folder directory you used to point your AWS Glue crawler. See the following screenshot.
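
If you prefer to script the crawler setup rather than use the console, a rough boto3 equivalent of the preceding steps might look like the following, using the bucket, role, and database names from this walkthrough:

    import boto3

    glue = boto3.client('glue')

    # Create a crawler that targets both data set folders and writes tables to trufactor-db
    glue.create_crawler(
        Name='trufactor-data-exchange-crawler',
        Role='AWSGlueServiceRole-demo-data-exchange',
        DatabaseName='trufactor-db',
        Targets={'S3Targets': [
            {'Path': 's3://trufactor-data-exchange-bucket/web_sess'},
            {'Path': 's3://trufactor-data-exchange-bucket/demo_by_web_sess'},
        ]},
    )

    # Run the crawler on demand; the web_sess and demo_by_web_sess tables appear when it finishes
    glue.start_crawler(Name='trufactor-data-exchange-crawler')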

Step 3: Querying the data using Amazon Athena

After you populate the AWS Glue Data Catalog with TruFactor’s Mobile Web Session and Demographics data, you can use Amazon Athena to run SQL queries and create views for visualization. Complete the following steps:

  1. On the Amazon Athena console, choose Query Editor.
  2. On the Database drop-down menu, choose the database you created.
  3. To preview one of the tables in Amazon Athena, choose Preview table.
    In the Results section, you should see 10 records from the web_sess table. See the following screenshot. In this next step, you run a query that creates a view of the Web Session Index and Demographics data across a group of e-commerce HTTP hosts. This is broken down by the percentage of users categorized by age and gender, number of users, MB transferred, and number of sessions ordered by date.
  4. Run the following SQL query in Amazon Athena:
    CREATE OR REPLACE VIEW e_commerce_web_sess_data AS 
    SELECT
      "date_parse"("a"."partition_0", '%Y%m%d') "date",
      "a"."http_host",
      "a"."users",
      "a"."mb_transferred",
      "a"."number_of_sessions",
      "b"."18_to_25",
      "b"."26_to_35",
      "b"."36_to_45",
      "b"."46_to_55",
      "b"."56_to_65",
      "b"."66_to_75",
      "b"."76_plus",
      "b"."male",
      "b"."female"
    FROM  
      ((
       SELECT
         "partition_0",
         "http_host",
         "users",
         "mb_transferred",
         "number_of_sessions"
       FROM
         "trufactor-db"."web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  a
    LEFT JOIN (
       SELECT
         "http_host" "http_host_2",
         "partition_0" "partition_2",
         "age_ranges"."18_to_25",
         "age_ranges"."26_to_35",
         "age_ranges"."36_to_45",
         "age_ranges"."46_to_55",
         "age_ranges"."56_to_65",
         "age_ranges"."66_to_75",
         "age_ranges"."76_plus",
         "genders"."male",
         "genders"."female"
       FROM
         "trufactor-db"."demo_by_web_sess"
       WHERE ("http_host" IN ('www.amazon.com', 'www.walmart.com', 'www.ebay.com', 'www.aliexpress.com', 'www.etsy.com', 'www.rakuten.com', 'www.craigslist.com'))
    )  b ON (("a"."http_host" = "b"."http_host_2") AND ("a"."partition_0" = "b"."partition_2")))
    ORDER BY "date" ASC

  5. After you create the view, you can preview it by repeating the above steps for previewing a table. The following screenshot shows the results, which include the number of users, user percentages by age group and gender, and a list of e-commerce hosts listed by date.

Step 4: Visualizing with Amazon QuickSight

After you query your data sets in Amazon Athena, you can use Amazon QuickSight to visualize your results. You must first grant Amazon QuickSight access to the Amazon S3 bucket that holds the TruFactor data sets, which you can do through the Manage QuickSight setting on the Amazon QuickSight console. After you grant access to the Amazon S3 bucket, you visualize the tables and queries with Amazon QuickSight. Complete the following steps:

  1. In the Amazon QuickSight console, choose New Analysis.
  2. Choose New data set.
  3. Choose Athena as the data source.
  4. For Data source name, enter trufactor-data-exchange-source.
  5. From the drop-down menu, choose the database and view you created.
  6. Choose Directly query your data.
  7. Choose Visualize. Because TruFactor intelligence is application-ready, you can gain immediate insights by using Amazon Athena to query and Amazon QuickSight to visualize. This post includes visualizations of the data set for the first two weeks of October 2019. The following graph visualizes the number of users on different HTTP hosts. The following pie charts further filter the HTTP hosts by age range. The following bar chart offers another visualization of users by age range. You could add other fields such as income range, ethnicity, and gender.

Running AWS Glue Jobs and Amazon Forecast

This section discusses how to use AWS Glue jobs to query and export your data set for forecasting with Amazon Forecast. This walkthrough examines users' visitation over 14 days across the top 50 HTTP hosts, ranked by number of users. From there, you forecast users' visitation for these HTTP hosts for the next three days.

Step 1: Creating and running an AWS Glue job

To create and run your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL, choose Jobs.
  2. Choose Add job.
  3. For Name, enter a name for the AWS Glue job; for example, demo-glue-job.
  4. For Type and Glue version, keep the default values.
  5. For This job runs, select A new script to be authored by you.
  6. In the Security configuration, script libraries, and job parameters (optional) section, set the Maximum capacity cluster size to 2. This reduces the cost of running the AWS Glue job. By default, the cluster size is set to 10 Data Processing Units (DPU).
  7. Choose Next.
  8. In the Connections section, keep the default values.
  9. Choose Save job and edit script.
  10. Enter the following code in the script section, and replace YOUR_BUCKET_NAME in the S3 save path near the end of the script with the name of your bucket.
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    db_name = "trufactor-db"
    tbl_name = "web_sess"
    
    # Load the web_sess table from the AWS Glue Data Catalog and convert it to a Spark DataFrame
    web_sess_dyf = glueContext.create_dynamic_frame.from_catalog(database = db_name, table_name = tbl_name, transformation_ctx = "web_sess_dyf")
    web_sess_df = web_sess_dyf.toDF()
    # Register a temporary view so the data can be queried with Spark SQL
    web_sess_df.createOrReplaceTempView("webSessionTable")
    web_sess_sql_df = spark.sql("""
    SELECT to_date(partition_0, 'yyyyMMdd') AS date,
             http_host,
             users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER ( PARTITION By partition_0
        ORDER BY users DESC ) AS rn
        FROM webSessionTable )
    WHERE rn<=50
    ORDER BY date""")
    
    # Keep the top 50 hosts per day and write the result as a single CSV file to S3
    web_sess_sql_df.coalesce(1).write.format("csv").option("header","false").save("s3://YOUR_BUCKET_NAME/amazon_forecast_demo/dataset/sampleset")
    job.commit()

    This code queries the top 50 HTTP hosts for each day, ranked by users' visitation during the first half of October, and returns the date, HTTP host, and users columns. The query results are written to your Amazon S3 bucket in CSV format (Amazon Forecast requires the input files in CSV).

  11. Choose Save and close the AWS Glue job screen. Before you can run the AWS Glue job, you need to modify the IAM role associated with AWS Glue. Currently, the IAM role only has permission to get and put objects in the directories you specified earlier. You need to update the IAM policy to allow permission to get and put objects in all subdirectories of the Amazon S3 bucket.
  12. On the IAM console, choose the role you used for this walkthrough: AWSGlueServiceRole-demo-data-exchange.
  13. In the Summary section for the IAM role, on the Permissions tab, choose the IAM policy associated with the Managed policy.
  14. Choose Edit policy.
  15. Change the view from Visual editor to JSON.
  16. Within this JSON object, under Resource, add another resource into the list of values. The following code is the updated IAM policy:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:PutObject"
                ],
                "Resource": [
                    "arn:aws:s3:::trufactor-data-exchange-bucket/web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/demo_by_web_sess*",
                    "arn:aws:s3:::trufactor-data-exchange-bucket/*"
                ]
            }
        ]
    }

  17. Choose Review policy and Save changes.
  18. On the AWS Glue console, under ETL, choose Jobs. Select the job you created earlier.
  19. From the Action drop-down menu, choose Run job. On the History tab, you can see when the status changes to Succeeded. See the following screenshot. This job can take 15–20 minutes to complete.
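
You can also start and monitor the job programmatically. A minimal boto3 sketch:

    import boto3

    glue = boto3.client('glue')

    # Start the job and check its status; the run is asynchronous
    run = glue.start_job_run(JobName='demo-glue-job')
    state = glue.get_job_run(JobName='demo-glue-job', RunId=run['JobRunId'])['JobRun']['JobRunState']
    print(state)  # RUNNING while in progress, SUCCEEDED when complete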

Step 2: Creating a dataset group, training a predictor, and creating forecasts in Amazon Forecast

To create your dataset group, train a predictor, and create forecasts, complete the following steps:

  1. On the Amazon Forecast console, choose View dataset groups.
  2. Choose Create dataset group.
  3. For Dataset group name, enter a name; for example, users_visitation_sample_dataset_group.
  4. For Forecasting domain, choose Web traffic.
  5. Choose Next.
  6. On the Create target time series dataset page, for Dataset name, enter the name of your dataset; for example, users_visitation_sample_dataset.
  7. For Frequency of your data, choose 1 day.
  8. For Data schema, update the data schema JSON object with the following code:
    {
      "Attributes":[
        {
          "AttributeName": "timestamp",
          "AttributeType": "timestamp"
        },
        {
          "AttributeName": "item_id",
          "AttributeType": "string"
        },
        {
          "AttributeName": "value",
          "AttributeType": "float"
        }
      ]
    }

  9. Choose Next.
  10. On the Import target time series data page, for Dataset import name, enter your dataset name; for example, users_visitation_sample_dataset_import.
  11. For Timestamp format, enter yyyy-MM-dd.
  12. For IAM Role, create a new role and grant Amazon Forecast access to the S3 bucket that you are using for this demo.
  13. For Data Location, use the S3 path that you exported your CSV file to after the AWS Glue job: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/dataset/sampleset.
  14. Review the settings for import target time series data and choose Start import. The process of importing the data can take approximately 10 minutes. When the status changes to Active, you can begin training a predictor.
  1. On the Dashboard page, choose Start next to Predictor training.
  2. On the Train predictor page, for Predictor name, enter a name for the predictor; for example, users_visitation_sample_dataset_predictor.
  3. For Forecast horizon, choose 3.
  4. For Forecast frequency, choose day.
  5. For Algorithm selection, select Manual. If you use the other algorithm option, AutoML, you allow Amazon Forecast to choose the right algorithm based on a pre-defined objective function, which is not necessary for this walkthrough.
  6. For Algorithm, choose Deep_AR_Plus (you use deep learning to forecast users’ visitation across 50 HTTP hosts).
  7. Leave all other options with the default values.
  8. Review the settings and choose Train predictor. The predictor training process can take 20–30 minutes. When the training completes, the status changes to Active. To evaluate the accuracy of the predictor (the ML model), Amazon Forecast splits the input time series data into two data sets: training and test. This process, called backtesting, tests a predictive model on historical data. When it splits the input time series data, it maintains the data's order, which is crucial for time series data. After training, Amazon Forecast calculates the root mean square error (RMSE) and weighted quantile losses to determine how well the predictor performed. For more detailed information about backtesting and predictor metrics, see Evaluating Predictor Accuracy. When the predictor is finished training, you can create a forecast.
  9. On the Dashboard page, under Generate forecasts, choose Start.
  10. For Forecast name, enter a forecast name; for example, users_visitation_sample_forecast.
  11. For Predictor, choose your trained predictor.
  12. For Forecast types, you can enter any quantile values between 0.01 and 0.99, as well as the mean. These represent the probability of satisfying the actual demand. This post enters .50, .90, .99, and mean.
  13. Choose Create a forecast. The forecast creation process can take 15–20 minutes.
  14. When the forecast is complete, choose Forecasts.
    You should see a single forecast. See the following screenshot.
    You can now export the generated forecast to a new folder within your existing Amazon S3 bucket for visualization with Amazon QuickSight.
  1. Choose the newly generated forecast.
  2. Under Exports, choose Create forecast export.
  3. For Export name, enter a name for the export; for example, users_visitation_sample_forecast_export.
  4. For Generated forecast, choose users_visitation_sample_forecast.
  5. For IAM Role, choose the role you created earlier.
  6. For S3 forecast export location, enter the S3 path to store the forecasts: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset.
  7. Choose Create forecast export. The exporting process can take up to 5 minutes. Alternatively, you can visualize the user visitation forecasts for the 50 HTTP hosts directly through the Amazon Forecast console or Query API.
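
The preceding Amazon Forecast console steps can also be driven through the API. The following boto3 sketch mirrors the same flow; the S3 path and role ARN are placeholders, and each create_* call is asynchronous, so in practice you would wait for each resource to become ACTIVE before moving on:

    import boto3

    forecast = boto3.client('forecast')

    # 1. Dataset group and target time series dataset (WEB_TRAFFIC domain, daily frequency)
    dsg = forecast.create_dataset_group(
        DatasetGroupName='users_visitation_sample_dataset_group',
        Domain='WEB_TRAFFIC',
    )
    ds = forecast.create_dataset(
        DatasetName='users_visitation_sample_dataset',
        Domain='WEB_TRAFFIC',
        DatasetType='TARGET_TIME_SERIES',
        DataFrequency='D',
        Schema={'Attributes': [
            {'AttributeName': 'timestamp', 'AttributeType': 'timestamp'},
            {'AttributeName': 'item_id', 'AttributeType': 'string'},
            {'AttributeName': 'value', 'AttributeType': 'float'},
        ]},
    )
    forecast.update_dataset_group(DatasetGroupArn=dsg['DatasetGroupArn'],
                                  DatasetArns=[ds['DatasetArn']])

    # 2. Import the CSV that the AWS Glue job wrote to S3
    forecast.create_dataset_import_job(
        DatasetImportJobName='users_visitation_sample_dataset_import',
        DatasetArn=ds['DatasetArn'],
        DataSource={'S3Config': {
            'Path': 's3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/dataset/sampleset',
            'RoleArn': '<forecast-execution-role-arn>',
        }},
        TimestampFormat='yyyy-MM-dd',
    )

    # 3. Train a DeepAR+ predictor with a three-day horizon, then generate the forecast
    predictor = forecast.create_predictor(
        PredictorName='users_visitation_sample_dataset_predictor',
        AlgorithmArn='arn:aws:forecast:::algorithm/Deep_AR_Plus',
        ForecastHorizon=3,
        PerformAutoML=False,
        InputDataConfig={'DatasetGroupArn': dsg['DatasetGroupArn']},
        FeaturizationConfig={'ForecastFrequency': 'D'},
    )
    forecast.create_forecast(
        ForecastName='users_visitation_sample_forecast',
        PredictorArn=predictor['PredictorArn'],
        ForecastTypes=['0.50', '0.90', '0.99', 'mean'],
    )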

Step 3: Querying a view using Amazon Athena and downloading the forecast file

Before you visualize users’ visitation forecast data, create a view in Amazon Athena for the top 50 HTTP hosts ranked by users’ visitation over 14 days. Complete the following steps:

  1. Run the following query in Amazon Athena:
    CREATE OR REPLACE VIEW "top_50_users" AS
    SELECT date_format(date_parse(partition_0,
             '%Y%m%d'),'%Y-%m-%d') AS "date", http_host, users
    FROM 
        (SELECT partition_0,
             http_host,
             users,
             row_number()
            OVER (PARTITION By partition_0
        ORDER BY  users DESC ) AS rn
        FROM "trufactor-db"."web_sess")
    WHERE rn<=50
    ORDER BY date

    The code queries the top 50 HTTP hosts for each day, ranked by users' visitation and sorted by date.

  2. In the Amazon S3 console, navigate to the S3 bucket and directory holding the files: s3://<trufactor-data-exchange-bucket>/amazon_forecast_demo/forecasts/sampleset. The following screenshot shows three different files inside the folder.
  1. Download the CSV file.

Step 4: Visualizing in Amazon QuickSight

To visualize the data in Amazon QuickSight, complete the following steps:

  1. On the Amazon QuickSight console, choose Manage data.
  2. Choose New data set.
  3. Choose Upload a file.
  4. Upload the CSV file that you downloaded.
  5. On the Confirm file upload settings page, choose Next.
  6. Choose Visualize.
  7. Return to the Amazon QuickSight console and choose Manage data.
  8. Choose New data set for the top 50 HTTP hosts view you queried earlier.
  9. On the Create a Data set page, find the data source you created earlier: trufactor-data-exchange-source.
  10. From the drop-down list, choose the database and view you created.
  11. Choose Directly query your data.
  12. Choose Visualize.
  13. On the new Amazon QuickSight analysis page, choose the pencil icon next to Data set.
  14. Choose Add data set.
  15. Choose the CSV file you uploaded.

You now have a single Amazon QuickSight analysis with multiple data sets to visualize.

The following graphs highlight the historical data for the users’ visitation across 50 HTTP hosts for the first two weeks of October and the mean forecast for users’ visitation for the next three days.

The following graphs highlight the historical data and forecasted P50, P90, and P99 quantile values for www.google.com.

Amazon Forecast makes it easier to get started with machine learning without having to create your own ML models from scratch. You can use this information to anticipate the web traffic for the upcoming week, which can aid in scaling your resources and applications accordingly.

Cleaning up

To avoid incurring future charges, delete the following resources that you created in this walkthrough:

  • The Amazon S3 bucket trufactor-data-exchange-bucket
  • The AWS Glue crawler trufactor-data-exchange-crawler
  • The AWS Glue job demo-glue-job
  • The AWS IAM role AWSGlueServiceRole-demo-data-exchange
  • The AWS Glue database trufactor-db
  • The Amazon QuickSight demo data sets and analysis
  • The following Amazon Forecast resources (in this order) for users_visitation_sample_dataset_group via the console:
    • Existing forecasts under Forecasts
    • Existing predictors under Predictors
    • Existing datasets under Datasets

Conclusion

This walkthrough detailed how to import a data set to Amazon S3 from AWS Data Exchange, use AWS Glue to run crawlers and an ETL job on the data, run SQL queries with Amazon Athena, create a time series forecast of the queried data with Amazon Forecast, and visualize the queried and forecasted data with Amazon QuickSight.

This post used TruFactor Intelligence-as-a-Service, one of the AWS Data Exchange launch partners, to power this walkthrough. TruFactor intelligence on AWS Data Exchange highlighted the ease of loading directly into Amazon S3 and layering advanced AWS services.

For more information about TruFactor and the AWS Data Exchange, see TruFactor on AWS Data Exchange on the TruFactor website. You can subscribe to TruFactor Intelligence directly on AWS Data Exchange or engage with TruFactor directly to identify the right offering from the larger product portfolio of anonymized consumer intelligence.


About the Authors

Jay Park is a solutions architect at AWS.

Ariana Rahgozar is a solutions architect at AWS.

A public data lake for analysis of COVID-19 data

Post Syndicated from AWS Data Lake Team original https://aws.amazon.com/blogs/big-data/a-public-data-lake-for-analysis-of-covid-19-data/

As the COVID-19 pandemic continues to threaten and take lives around the world, we must work together across organizations and scientific disciplines to fight this disease. Innumerable healthcare workers, medical researchers, scientists, and public health officials are already on the front lines caring for patients, searching for therapies, educating the public, and helping to set policy. At AWS, we believe that one way we can help is to provide these experts with the data and tools needed to better understand, track, plan for, and eventually contain and neutralize the virus that causes COVID-19.

Today, we are making a public AWS COVID-19 data lake available – a centralized repository of up-to-date and curated datasets on or related to the spread and characteristics of the novel coronavirus (SARS-CoV-2) and its associated illness, COVID-19. Globally, there are several efforts underway to gather this data, and we are working with partners to make this crucial data freely available and keep it up-to-date. Hosted on the AWS cloud, we have seeded our curated data lake with COVID-19 case tracking data from Johns Hopkins and The New York Times, hospital bed availability from Definitive Healthcare, and over 45,000 research articles about COVID-19 and related coronaviruses from the Allen Institute for AI. We will regularly add to this data lake as other reliable sources make their data publicly available.

The breakthroughs that can win the battle against this disease arrive faster when it’s easy for everyone to access and experiment with this vital information. The AWS COVID-19 data lake allows experimenters to quickly run analyses on the data in place without wasting time extracting and wrangling data from all the available data sources. They can use AWS or third-party tools to perform trend analysis, do keyword search, perform question/answer analysis, build and run machine learning models, or run custom analyses to meet their specific needs. Since every stakeholder in this battle brings their own perspective, users can choose to work with the public data lake, combine it with their own data, or subscribe to the source datasets directly through AWS Data Exchange.

We imagine local health authorities could build dashboards to track infections and collaborate to efficiently deploy vital resources like hospital beds and ventilators. Or epidemiologists could complement their own models and datasets to generate better forecasts of hotspots and trends.

For example, at Chan Zuckerberg Biohub, a nonprofit where leaders in science and technology collaborate to cure, prevent, or manage disease, scientists are using the AWS COVID-19 data lake for new epidemiological insights. “Our team of researchers is now analyzing trends in disease spread, its geography, and time evolution by leveraging datasets from the AWS COVID-19 data lake, combined with our own data, in order to better predict COVID epidemiology,” said Jim Karkanias, Vice President of Data Science and Information Technology at Chan Zuckerberg Biohub.

This post walks you through examples of how to use the AWS COVID-19 data lake for analysis. This data lake is comprised of data in a publicly readable Amazon S3 bucket (s3://covid19-lake). The post shows how to set up the definitions for that data in an AWS Glue Data Catalog to expose it to analytics engines. You can then query the AWS COVID-19 data lake with Amazon Athena, a serverless SQL query engine.

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • Permissions to create an AWS CloudFormation stack
  • Permissions to create AWS Glue resources (catalog databases and tables)

Configuring access to the data using a CloudFormation template

To make the data from the AWS COVID-19 data lake available in the Data Catalog in your AWS account, create a CloudFormation stack using the following template. If you are signed in to your AWS account, the following link fills out most of the stack creation form for you. All you need to do is choose Create stack. For instructions on creating a CloudFormation stack, see Get Started in the AWS CloudFormation documentation.

This template creates a covid-19 database in your Data Catalog and tables that point to the public AWS COVID-19 data lake. You do not need to host the data in your account, and you can rely on AWS to refresh the data as datasets are updated through AWS Data Exchange.

Exploring the data through the Data Catalog in your AWS account

When the CloudFormation stack shows a status of CREATE_COMPLETE, access the Glue Data Catalog to see the tables that the template created. You should see the following tables:

  • Global Coronavirus (COVID-19) Data – Tracks confirmed COVID-19 cases in provinces, states, and countries across the world with a breakdown to the county level in the US.

Table Name | Description | Source | Provider
enigma_jhu | Confirmed COVID-19 cases | Johns Hopkins | Enigma

Table Name | Description | Source | Provider
nytimes_states | Data on COVID-19 cases at US state level | NY Times | Rearc
nytimes_counties | Data on COVID-19 cases at US county level | NY Times | Rearc

Table Name | Description | Source | Provider
covid_testing_states_daily | USA total test daily trend by state | COVID Tracking Project | Rearc
covid_testing_us_daily | USA total test daily trend | COVID Tracking Project | Rearc
covid_testing_us_total | USA total tests | COVID Tracking Project | Rearc

Table Name | Description | Source | Provider
hospital_beds | Hospital beds and their utilization in the US | Definitive Healthcare | Rearc

Table Name | Description | Source/Provider
alleninstitute_metadata | Metadata on papers pulled from the CORD-19 dataset. The sha column indicates the paper ID, which is the file name of the paper in the data lake. | Allen Institute for AI
alleninstitute_comprehend_medical | Results from Amazon Comprehend Medical run against the CORD-19 dataset. | Allen Institute for AI
  • Lookup tables to support visualizations.

Table Name | Description
country_codes | Lookup table for country codes
county_populations | Lookup table for the population for each county based on recent census data
us_state_abbreviations | Lookup table for US state abbreviations

In addition, you can see descriptions of the columns in these tables. For example, the following screenshot shows the metadata of the table containing COVID-19 cases from Johns Hopkins.

Querying data via Amazon Athena

This section demonstrates how to query these tables using Athena. Athena is a serverless interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.

If this is the first time you are using Athena, you must specify a query result location on Amazon S3.

  1. From the drop-down menu, choose the covid-19 database.
  2. Enter your query.

The following query returns the growth of confirmed cases for the past 7 days joined side-by-side with hospital bed availability, broken down by US county:

SELECT 
  cases.fips, 
  admin2 as county, 
  province_state, 
  confirmed,
  growth_count, 
  sum(num_licensed_beds) as num_licensed_beds, 
  sum(num_staffed_beds) as num_staffed_beds, 
  sum(num_icu_beds) as num_icu_beds
FROM 
  "covid-19"."hospital_beds" beds, 
  ( SELECT 
      fips, 
      admin2, 
      province_state, 
      confirmed, 
      last_value(confirmed) over (partition by fips order by last_update) - first_value(confirmed) over (partition by fips order by last_update) as growth_count,
      first_value(last_update) over (partition by fips order by last_update desc) as most_recent,
      last_update
    FROM  
      "covid-19"."enigma_jhu" 
    WHERE 
      from_iso8601_timestamp(last_update) > now() - interval '7' day AND country_region = 'US') cases
WHERE 
  beds.fips = cases.fips AND last_update = most_recent
GROUP BY cases.fips, confirmed, growth_count, admin2, province_state
ORDER BY growth_count desc

The following screenshot shows the results of this query.

Athena also allows you to run these queries through REST APIs, for example, for building your own visualizations. Moreover, Athena is just one of the many engines that you can use on the data lake. For example, you can use Amazon Redshift Spectrum to join lake data with other datasets in your Redshift data warehouse, or use Amazon QuickSight to visualize your datasets.
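
For example, a small boto3 sketch (with a placeholder query result location) that runs a query against the covid-19 database and prints the results might look like the following:

import time
import boto3

athena = boto3.client('athena')

# Start the query; Athena writes the results to the output location you specify
execution = athena.start_query_execution(
    QueryString='SELECT count(*) AS record_count FROM enigma_jhu',
    QueryExecutionContext={'Database': 'covid-19'},
    ResultConfiguration={'OutputLocation': 's3://<your-query-results-bucket>/covid19/'},
)
query_id = execution['QueryExecutionId']

# Poll until the query completes, then print the result rows
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)['QueryExecution']['Status']['State']
    if state in ('SUCCEEDED', 'FAILED', 'CANCELLED'):
        break
    time.sleep(1)

if state == 'SUCCEEDED':
    for row in athena.get_query_results(QueryExecutionId=query_id)['ResultSet']['Rows']:
        print([col.get('VarCharValue') for col in row['Data']])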

We have also created a public Amazon QuickSight dashboard from the COVID-19 case tracking data, testing data, and hospital bed data. You can track daily updates with this dashboard. You can also drill down to see breakdowns by country, province, and county without having to write a line of SQL. The following is a recent screenshot of the dashboard.

CORD-19 research articles

The CORD-19 dataset is a collection of metadata and full-text of research articles about COVID-19, SARS-CoV-2, and related coronaviruses. You can index this data with Amazon Kendra for question/answer exploration, or enrich the data with Amazon Comprehend Medical. We have already done the latter and put it in the table called alleninstitute_comprehend_medical.
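
As a rough illustration of that enrichment step, Amazon Comprehend Medical can extract medical entities from free text with a single API call. The sample text below is made up, and the data lake already includes these results in the alleninstitute_comprehend_medical table:

import boto3

cm = boto3.client('comprehendmedical')

# Extract medical entities (conditions, medications, dosages, and so on) from free text
sample_text = 'Tocilizumab, an IL-6 inhibitor, was administered at 8 mg/kg.'
entities = cm.detect_entities_v2(Text=sample_text)['Entities']
for entity in entities:
    print(entity['Category'], entity['Type'], entity['Text'])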

The alleninstitute_metadata table provides detailed fields for each paper, such as the title, authors, journal, and URL. The alleninstitute_comprehend_medical table contains key medical concepts such as medical condition, medication, dosage, strength, and frequency. With this metadata, you can quickly query over concepts, analyze or aggregate over authors and journals, and locate papers.

Aggregating over journals

Using IL-6 inhibitors is a possible therapy for COVID-19, and clinical trials are underway. To demonstrate how to use these tables, this post presents a use case in which you want to understand which journals discuss IL-6 the most by counting the papers they published. You can do this by running the following query:

SELECT m.journal,
       count(distinct(cm.paper_id)) as paper_count
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
GROUP BY  m.journal
ORDER BY paper_count desc

The following screenshot shows an example of the results. The data provider updates this dataset over time, so your results may look different (here, we notice that the second highest count has no journal information).

Drilling down into papers

To see the URLs and the titles of the papers in one of these journals, you simply query both these tables again. For example, to drill into IL-6 related papers in the Crit Care journal, enter the following query:

SELECT distinct m.url, m.title
FROM "covid-19".alleninstitute_metadata m
JOIN "covid-19".alleninstitute_comprehend_medical cm
    ON (contains(split(m.sha, '; '), cm.paper_id))
WHERE contains(generic_name, 'IL-6')
      AND m.journal = 'Crit Care'

The following screenshot shows an example of the results.

These examples are a few of the innumerable analyses you can run on the public data lake. You incur no additional cost for accessing the AWS COVID-19 data lake beyond the standard charges for the AWS services that you use. For example, if you use Athena, you will incur the costs for running queries and the data storage in the S3 query result location, but incur no costs for accessing the data lake. In addition, if you want this data in raw form, you can subscribe to, download, and stay up-to-date through AWS Data Exchange. We encourage you to try using the public AWS COVID-19 data lake yourself.

Conclusion

Combining our efforts across organizations and scientific disciplines can help us win the fight against the COVID-19 pandemic. With the AWS COVID-19 data lake, anyone can experiment with and analyze curated data related to the disease, as well as share their own data and results. We believe that through an open and collaborative effort that combines data, technology, and science, we can inspire insights and foster breakthroughs necessary to contain, curtail, and ultimately cure COVID-19.

For daily updates on how AWS is addressing the crisis, see Amazon’s COVID-19 blog.

 


About the Authors

The AWS Data Lake Team members are Roy Ben-Alta, Jason Berkowitz, Chris Casey, Patrick Combes, Lucy Friedmann, Fred Lee, Megan Maxwell, Rourke McNamara, Herain Oberoi, Stephen Orban, Brian Ross, Nikki Rouda, Noah Schwartz, Noritaka Sekiyama, Mehul A. Shah, Ben Snively, and Ying Wang.

Collect and distribute high-resolution crypto market data with ECS, S3, Athena, Lambda, and AWS Data Exchange

Post Syndicated from Jared Katz original https://aws.amazon.com/blogs/big-data/collect-and-distribute-high-resolution-crypto-market-data-with-ecs-s3-athena-lambda-and-aws-data-exchange/

This is a guest post by Floating Point Group. In their own words, “Floating Point Group is on a mission to bring institutional-grade trading services to the world of cryptocurrency.”

The need and demand for financial infrastructure designed specifically for trading digital assets may not be obvious. There’s a rather pervasive narrative that these coins and tokens are effectively natively digital counterparts to traditional assets such as currencies, commodities, equities, and fixed income. This narrative often manifests in the form of pithy one-liners recycled by pundits attempting to communicate the value proposition of various projects in the space (such as, “Bitcoin is just a currency with an algorithmically controlled, tamper-proof monetary policy,” or, “Ether is just a commodity like gasoline that you can use to pay for computational work on a global computer.”). Unsurprisingly, we at FPG often hear the question, “What’s so special about cryptocurrencies that they warrant dedicated financial services? Why do we need solutions for problems that have already been solved?”

The truth is that these assets and the widespread public interest surrounding them are entirely unprecedented. The decentralized ledger technology that serves as an immutable record of network transactions, the clever use of proof-of-work algorithms to economically incentivize rational actors to help uphold the security of the network (the proof-of-work concept dates back at least as far as 1993, but it was not until bitcoin that the technology showed potential for widespread adoption), the irreversible nature of transactions that poses unique legal challenges in cases such as human error or extortion, the precariousness of self-custody (third-party custody solutions don’t exactly have track records that inspire trust), the regulatory uncertainties that come with the difficulty of both classifying these assets as well as arbitrating their exchange which must ultimately be reconciled by entities like the IRS, SEC, and CFTC—it is all very new, and very weird. With 24-hour market volume regularly exceeding $100 billion, we decided to direct our focus towards problems related specifically to trading these assets. Granted, crypto trading has undoubtedly matured since the days of bartering for bitcoin in web forums and witnessing 10% price spreads between international exchanges. But there is still a long path ahead.

One major pain point we are aiming to address for institutional traders involves liquidity (or, more precisely, the lack thereof). Simply put, the buying and selling of cryptocurrencies occurs across many different trading venues (exchanges), and liquidity (the offers to buy or sell a certain quantity of an asset at a certain price) continues to become more fragmented as new exchanges emerge. So say you’re trying to buy 100 bitcoins. You must buy from people who are willing to sell. As you take the best (cheapest) offers, you’re left with increasingly expensive offers. By the time you fill your order (in this example, buy all 100 bitcoins), you may have paid a much higher average price than, say, the price you paid for the first bitcoin of your order. This phenomenon is referred to as slippage. One easy way to minimize slippage is by expanding your search for offers. So rather than looking at the offers on just one exchange, look at the offers across hundreds of exchanges. This process, traditionally referred to as smart order routing (SOR), is one of the core services we provide. Our SOR service allows traders to easily submit orders that our system can match against the best offers available across multiple trading venues by actively monitoring liquidity across dozens of exchanges.

Fanning out large orders in search of the best prices is a rather intuitive and widely applicable concept—roughly 75% of equities are purchased and sold via SOR. But the value of such a service for crypto markets is particularly salient: a perpetual cycle of new exchanges surging in popularity while incumbents falter has resulted in a seemingly incessant fragmentation of liquidity across trading venues—yet traders tend to assume an exchange-agnostic mindset, concerned exclusively with finding the best price for a given quantity of an asset.

Access to both real-time and historical market data is essential to the functionality of our SOR service. The highest resolution data we could hope to obtain for a given market would include every trade and every change applied to the order book, effectively allowing us to recreate the state of a market at any given point in time. The updates provided through the WebSocket streams are not sufficient for reconstructing order books. We also need to periodically fetch snapshots of the order books and store those, which we can do using an exchange’s REST API. We can fetch a snapshot and apply the corresponding updates from the streams to “replay” the order book.
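
Conceptually, replaying an order book from a snapshot plus a stream of updates looks something like the following simplified sketch; real feeds also carry sequence numbers that must be checked for gaps, which is discussed later in this post:

def apply_update(book, update):
    """Apply one incremental update to an order book.

    book is {'bids': {price: qty}, 'asks': {price: qty}};
    update is {'bids': [(price, qty), ...], 'asks': [(price, qty), ...]},
    where a quantity of 0 means the price level was removed.
    """
    for side in ('bids', 'asks'):
        for price, qty in update.get(side, []):
            if qty == 0:
                book[side].pop(price, None)   # level removed from the book
            else:
                book[side][price] = qty       # level added or changed

def replay(snapshot, updates):
    """Reconstruct the current book state by applying updates, in order, to a snapshot."""
    book = {'bids': dict(snapshot['bids']), 'asks': dict(snapshot['asks'])}
    for update in updates:
        apply_update(book, update)
    return book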

Fortunately, this data is freely available, because many exchanges offer real-time feeds of market data via WebSocket APIs. We found several third-party vendors selling subscriptions to these data sets, typically in the form of CSV dumps delivered at a weekly or monthly cadence. This presented the question of build vs. buy. Given that we felt capable of building a robust and reliable system for ingesting real-time market data in a relatively short amount of time and at a fraction of the cost of purchasing the data from a vendor, we were already leaning in favor of building. Further investigation made buying look like an increasingly unattractive option. Disclaimers that multiple vendors issued about their inability to guarantee data quality and consistency did not inspire confidence. Inspecting sample data sets revealed that some essential fields provided in the original data streams were missing—fields necessary for achieving our goal of recreating the state of a market at an arbitrary point in time. We also recognized that a weekly or monthly delivery schedule would restrict our ability to explore relatively recent market data.

This post provides a high-level overview of how we ingest and store real-time market data and how we use the AWS Data Exchange API to organize and publish our data sets programmatically. Our system’s functionality extends well beyond data ingestion, normalization, and persistence; we run dedicated services for data validation, caching the most recent trade and order book for every market, computing and storing derivative metrics, and other services that help safeguard data accuracy and minimize the latency of our trading systems.

Data ingestion

The WebSocket streams we connect to for data consumption are often the same APIs responsible for providing real-time updates to an exchange’s trading dashboard.

WebSocket connections transmit data as discrete messages. We can inspect the content of individual messages as they stream into the browser. For example, the following screenshot shows a batch of order book updates.

The updates are expressed as arrays of bids and asks that were either added to the book or removed from it. Client-side code processes each update, resulting in a real-time rendering of the market’s order book. In practice, our data ingestion service (Ingester) does not read a single stream, but rather thousands of different streams, covering various data feeds for all markets across multiple exchanges. All the connections required for such broad coverage and the resulting flood of incoming data raise some obvious concerns about data loss. We’ve taken several measures to mitigate such concerns, including a redundant system design that allows us to spin up an arbitrary number of instances of the Ingester service. Like most of our microservices, Ingester is a Dockerized service run on Amazon ECS and deployed via Terraform.

All these instances consume the same data feeds as each other while a downstream mechanism handles deduplication (this is covered in more detail later in this post). We also set up Amazon CloudWatch alerts to notify us when we detect non-contiguous messages, indicating a gap in the incoming data. The alerts don’t directly mitigate data loss, but they do serve the important function of prompting an investigation.

Ingester builds up separate buffers of incoming messages, split out by data-type/exchange/market. Then, after a fixed time interval, each buffer is flushed into Amazon S3 as a gzipped JSON file. The buffer-flush cycle repeats.
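
A stripped-down sketch of that buffer-flush step is shown below; the bucket name and key layout are illustrative rather than FPG's actual implementation:

import gzip
import json
import boto3

s3 = boto3.client('s3')

def flush_buffer(messages, data_type, exchange, market, batch_minute,
                 bucket='<market-data-bucket>'):
    """Write one buffer of messages to S3 as a gzipped, newline-delimited JSON file."""
    if not messages:
        return
    body = gzip.compress('\n'.join(json.dumps(m) for m in messages).encode('utf-8'))
    # The key is derived from the buffer's identity and time window, which also
    # supports the deduplication scheme described later in this post
    key = '{}/{}/{}/{}.json.gz'.format(data_type, exchange, market, batch_minute)
    s3.put_object(Bucket=bucket, Key=key, Body=body)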

The following screenshot shows a portion of the file content.

This code snippet is a single, pretty-printed JSON record from the file in the screenshot above.

{
   "event_type":"trade",
   "timestamp":1571980320422,
   "ticker_pair":"BTCUSDT",
   "trade_id":194230159,
   "price":"7405.69000000",
   "quantity":"3.20285300",
   "buyer_order_id":730178987,
   "seller_order_id":730178953,
   "trade_timestamp":1571980320417,
   "buyer_market_maker":false,
   "M":true
}

Ingester handles additional functionality, such as applying pre-defined mappings of venue-specific field names to our internal field names. Data normalization is one of many processes necessary to enable our systems to build a holistic understanding of market dynamics.

As with most distributed system designs, our services are written with horizontal scalability as a first-order priority. We took the same approach in designing our data ingestion service, but it has some features that make it a bit different than the archetypical horizontally scalable microservice. The most common motivations for adjusting the number of instances of a given service are load-balancing and throttling throughput. Either your system is experiencing backpressure and a consumer service scales to alleviate that pressure, or the consumer is over-provisioned and you scale down the number of instances for the sake of parsimony. For our data ingestion service, however, our motivation for running multiple instances is to minimize data loss via redundancy. The CPU usage for each instance is independent of instance count, because each instance does identical work.

For example, rather than helping alleviate backpressure by pulling messages from a single queue, each instance of our data ingestion service connects to the same WebSocket streams and performs the same amount of work. Another somewhat unusual and confounding aspect of horizontally scaling our data ingestion service is related to state: we batch records in memory and flush the records to S3 every minute (based on the incoming message’s timestamp, not the system timestamp, because those would be inconsistent). Redundancy is our primary measure for minimizing data loss, but we also need each instance to write the files to S3 in such a way that we don’t end up with duplicate records. Our first thought was that we’d need a mechanism for coordinating activity across the instances, such as maintaining a cache that would allow us to check if a record had already been persisted. But we realized that we could perform this deduplication without any coordination between instances at all. Most of the message streams we consume publish messages with sequence IDs. We can combine the sequence IDs with the incoming message timestamp to achieve our deduplication mechanism: we can deterministically generate the same exact file names containing the exact same data by writing our service code to check that the message added to the batch has the appropriate sequence ID relative to the previous message in the batch and using the timestamp on the incoming message to determine the exact start and end of each batch (we typically get a UNIX timestamp and check when we’ve rolled over to the next clock minute). This allows us to simply rely on a key collision in S3 for deduplication.
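
The following sketch illustrates that idea: because each instance derives the file name purely from the message stream itself (the feed's identity plus the clock minute of the incoming message timestamps), identical batches from redundant instances collide on the same S3 key instead of producing duplicates. Field and function names here are illustrative:

from datetime import datetime, timezone

def batch_key(data_type, exchange, market, message_timestamp_ms):
    """Derive a deterministic S3 key from the batch's identity and the message clock minute."""
    minute = datetime.fromtimestamp(message_timestamp_ms / 1000, tz=timezone.utc)
    minute = minute.replace(second=0, microsecond=0)
    return '{}/{}/{}/{:%Y%m%dT%H%M}.json.gz'.format(data_type, exchange, market, minute)

def should_append(batch, message):
    """Accept a message only if its sequence ID immediately follows the previous one."""
    return not batch or message['sequence_id'] == batch[-1]['sequence_id'] + 1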

AWS suggests a similar solution for a slightly different problem, relating to Amazon Kinesis Data Streams. For more information, see Handling Duplicate Records.

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.

After we store the data, we can perform simple analytics queries on the billions of records we’ve stored in S3 using Amazon Athena, a query service that requires minimal configuration and zero infrastructure overhead. Athena has a concept of partitions (inherited from one of its underlying services, Apache Hive). Partitions are mappings between virtual columns (in our case: pair, year, month, and day) and the S3 directories in which the corresponding data is stored.

S3’s file system is not actually hierarchical. Files are prepended with long key prefixes that are rendered as directories in the AWS console when browsing a bucket’s contents. This has some non-trivial performance consequences when querying or filtering on large data sets.

The following screenshot illustrates a typical directory path.

A well-defined partitioning scheme can drastically reduce query run times and costs by pointing Athena directly at a particular subset of data. Though the ability to perform ad hoc business analytics queries is primarily a convenience, taking time to choose a sane multi-level partitioning scheme for Athena based on some of our most common access patterns seemed worthwhile. A poorly designed partition structure can result in Athena unnecessarily scanning huge swaths of data and ultimately render the service unusable.
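
For example, a partitioned table definition and a partition-pruned query might look like the following; the table and column names are illustrative, and the layout assumes Hive-style key=value prefixes:

CREATE EXTERNAL TABLE trades (
  event_type string,
  trade_id bigint,
  price string,
  quantity string,
  trade_timestamp bigint
)
PARTITIONED BY (pair string, year string, month string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://<market-data-bucket>/trades/';

-- Load the partitions, then query; Athena only scans the S3 prefixes
-- that match the partition filter
MSCK REPAIR TABLE trades;

SELECT count(*) AS trade_count
FROM trades
WHERE pair = 'BTCUSDT'
  AND year = '2019' AND month = '10' AND day = '25';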

Data publication

Our pipeline for transforming thousands of small gzipped JSON files into clean CSVs and loading them into AWS Data Exchange involves three distinct jobs, each expressed as an AWS Lambda function.

Job 1

Job 1 is initiated shortly after midnight UTC by a cron-scheduled CloudWatch event. As mentioned previously, our data ingestion service’s batching mechanism flushes each batch to S3 at a regular time interval, and a timestamp on the incoming message (applied server-side) determines the rollover from one interval to the next, rather than the ingestion service’s system timestamp. In the rare case that a non-trivial amount of time elapses between the consumption of the final message of batch n and the first message of batch n+1, data may still be pending write just after midnight, so we kick off the first Lambda function 20 minutes after midnight UTC to minimize the likelihood of omitting it.

Job 1 formats values for the date and data source into an Athena query template and outputs the query results as a CSV to a specified prefix path in S3. (Every Athena query produces a .metadata file and a CSV file of the query results, though DDL statements do not output a CSV.) This PUT request to S3 triggers an S3 event notification.

We run a full replica data ingestion system as an additional layer of redundancy. Using the coalesce conditional expression, the Athena query in Job 1 merges data from our primary system with the corresponding data from our replica system, and fills in any gaps while deduplicating redundant records.

We experimented fairly extensively with AWS Glue and PySpark for the ETL-related work performed in Job 1. When we realized that we could merge all the small source files into one, join the primary and replica data sets, and sort the results with a single Athena query, we decided to stick with this seemingly simpler and more elegant approach.

The following code shows one of our Athena query templates.
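A simplified version of such a template, with illustrative table and column names, might be:

# Simplified sketch: merge one UTC day of primary and replica rows, dedupe on sequence ID, and sort.
QUERY_TEMPLATE = """
SELECT
    coalesce(p.sequence_id, r.sequence_id) AS sequence_id,
    coalesce(p.ts, r.ts)                   AS ts,
    coalesce(p.price, r.price)             AS price,
    coalesce(p.size, r.size)               AS size
FROM {primary_table} p
FULL OUTER JOIN {replica_table} r
    ON  p.sequence_id = r.sequence_id
    AND p.pair = r.pair
WHERE coalesce(p.pair, r.pair) = '{pair}'
  AND coalesce(p.year, r.year) = '{year}'
  AND coalesce(p.month, r.month) = '{month}'
  AND coalesce(p.day, r.day) = '{day}'
ORDER BY sequence_id
"""

# Job 1 fills in the date and data source before submitting the query to Athena.
query = QUERY_TEMPLATE.format(
    primary_table='market_data.trades',
    replica_table='market_data.trades_replica',
    pair='btc-usd',
    year='2019',
    month='11',
    day='13',
)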

Job 2

Job 2 is triggered by the S3 event notification from Job 1. Job 2 simply copies the query results CSV file to a different key within the same S3 bucket.

The motivation for this step is twofold. First, we cannot dictate the name of an Athena query results CSV file; it is automatically set to the Athena query ID. Second, when adding an S3 object as an asset to an AWS Data Exchange revision, the asset’s name is automatically set to the S3 object’s key. So to dictate how the CSV file name appears in AWS Data Exchange, we must first rename it, which we accomplish by copying it to a specified S3 key.
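A sketch of that copy (bucket, prefixes, and the naming convention are placeholders) might look like this:

import boto3

s3 = boto3.client('s3')


def rename_query_results(bucket, query_results_key, data_source, date_str):
    # Athena names its output after the query ID, e.g. 'athena-results/<query-id>.csv'.
    # Copy it to a key whose final component is the name we want to appear in AWS Data Exchange.
    destination_key = f"data-exchange-staging/{data_source}/{data_source}-{date_str}.csv"
    s3.copy_object(
        Bucket=bucket,
        Key=destination_key,
        CopySource={'Bucket': bucket, 'Key': query_results_key},
    )
    return destination_key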

Job 3

Job 3 handles all work related to AWS Data Exchange and AWS Marketplace Catalog via their respective APIs. We use boto3, AWS’s Python SDK, to interface with these APIs. The AWS Marketplace Catalog API is necessary for adding data set revisions to products that have already been published. For more information, see Tutorial: Adding New Data Set Revisions to a Published Data Product.

Our code explicitly defines mappings with the following structure:

data source / DataSet / Product

The following code shows how we configure relationships between data sources, data sets, and products.
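A simplified version of these mappings, with placeholder IDs and bucket names, and with an interface matching the LambdaS3TriggerMappings lookup used by the wrapper class shown later, might be:

from dataclasses import dataclass, field
from typing import List
import re


@dataclass
class Product:
    id: str  # AWS Marketplace entity ID of the published product


@dataclass
class DataSet:
    id: str
    arn: str
    products: List[Product] = field(default_factory=list)


@dataclass
class DataSource:
    object_name_pattern: str  # expected file naming convention for this data source
    lambda_s3_trigger_target_data_sets: List[DataSet] = field(default_factory=list)

    def validate_object_name(self, obj_name):
        if not re.match(self.object_name_pattern, obj_name):
            raise ValueError(f"Unexpected object name: {obj_name}")


# Keyed by (bucket, prefix); the IDs and ARNs below are placeholders.
LambdaS3TriggerMappings = {
    ('example-staging-bucket', 'data-exchange-staging/binance-trades'): DataSource(
        object_name_pattern=r'^binance-trades-\d{4}-\d{2}-\d{2}\.csv$',
        lambda_s3_trigger_target_data_sets=[
            DataSet(
                id='<data-set-id>',
                arn='arn:aws:dataexchange:us-east-1:<account-id>:data-sets/<data-set-id>',
                products=[Product(id='<marketplace-entity-id>')],
            )
        ],
    ),
}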

Our data sources are typically represented by a trading venue and data type combination (such as Binance trades or CoinbasePro order books). Each new file for a given data source is delivered as a single asset within a single new revision for a particular data set.

An S3 trigger kicks off the Lambda function. The trigger is scoped to a specified prefix that maps to a single data set. The function alias feature of AWS Lambda allows us to define a unique S3 trigger for each data set while reusing the same underlying Lambda function. Job 3 carries out the following steps (steps 1 through 5 use the AWS Data Exchange API, while steps 6 and 7 use the AWS Marketplace Catalog API):

  1. Submits a request to create a new revision for the corresponding data set via CreateRevision.
  2. Adds the file that was responsible for triggering the Lambda function to the newly created revision via CreateJob using the IMPORT_ASSETS_FROM_S3 job type. To submit this job, we need to supply a few values: the S3 bucket and key values for the file are pulled from the Lambda event message, while the RevisionID argument comes from the response to the CreateRevision call in the previous step.
  3. Kicks off the job with StartJob, sourcing the JobID argument from the response to the CreateJob call in the previous step.
  4. Polls the job’s status via GetJob (using the job ID from the response to the StartJob call in the previous step) to check that our file (the asset) was successfully added to the revision.
  5. Finalizes the revision via UpdateRevision.
  6. Requests a description of the marketplace entity using DescribeEntity, passing in the product ID stored in our hardcoded mappings as the EntityId.
  7. Kicks off a ChangeSet via StartChangeSet, passing in the entity identifier and entity type from the DescribeEntity response in the previous step, the revision ARN parsed from the response to our earlier call to CreateRevision as RevisionArn, and the data set ARN as DataSetArn, which we fetch at the start of the code’s runtime using the AWS Data Exchange API’s GetDataSet.

Here’s a thin wrapper class we wrote to carry out the steps detailed above:

from time import sleep
import logging
import json

import boto3

from config import (
    DATA_EXCHANGE_REGION,
    MARKETPLACE_CATALOG_REGION,
    LambdaS3TriggerMappings
)

logger = logging.getLogger()


class CustomDataExchangeClient:
    def __init__(self):
        self._de_client = boto3.client('dataexchange', region_name=DATA_EXCHANGE_REGION)
        self._mc_client = boto3.client('marketplace-catalog', region_name=MARKETPLACE_CATALOG_REGION)
    
    def _get_s3_data_source(self, bucket, prefix):
        return LambdaS3TriggerMappings[(bucket, prefix)]

    # Job State can be one of: WAITING | IN_PROGRESS | ERROR | COMPLETED | CANCELLED | TIMED_OUT
    def _wait_for_de_job_completion(self, job_id):
        while True:
            get_job_resp = self._de_client.get_job(JobId=job_id)
            if get_job_resp['State'] == 'COMPLETED':
                logger.info(f"Job '{job_id}' succeeded:\n\t{get_job_resp}")
                break
            elif get_job_resp['State'] in ('ERROR', 'CANCELLED', 'TIMED_OUT'):
                raise Exception(f"Job '{job_id}' failed:\n\t{get_job_resp}")
            else:
                sleep(5)
                logger.info(f"Still waiting on job {job_id}...")
        return get_job_resp

    # ChangeSet Status can be one of: PREPARING | APPLYING | SUCCEEDED | CANCELLED | FAILED
    def _wait_for_mc_change_set_completion(self, change_set_id):
        while True:
            describe_change_set_resp = self._mc_client.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=change_set_id
                )
            if describe_change_set_resp['Status'] == 'SUCCEEDED':
                logger.info(
                    f"ChangeSet '{change_set_id}' succeeded:\n\t{describe_change_set_resp}"
                )
                break
            elif describe_change_set_resp['Status'] in ('FAILED', 'CANCELLED'):
                raise Exception(
                    f"ChangeSet '{change_set_id}' failed:\n\t{describe_change_set_resp}"
                )
            else:
                sleep(1)
                logger.info(f"Still waiting on ChangeSet {change_set_id}...")
        return describe_change_set_resp

    def process_s3_event(self, s3_event):
        source_bucket = s3_event['Records'][0]['s3']['bucket']['name']
        source_key = s3_event['Records'][0]['s3']['object']['key']
        source_prefix = '/'.join(source_key.split('/')[0:-1])
        s3_data_source = self._get_s3_data_source(source_bucket, source_prefix)
        obj_name = source_key.split('/')[-1]
        
        s3_data_source.validate_object_name(obj_name)
        
        for data_set in s3_data_source.lambda_s3_trigger_target_data_sets:
            # Create revision
            create_revision_resp = self._de_client.create_revision(
                DataSetId=data_set.id,
                Comment=obj_name
            )
            logger.debug(create_revision_resp)
            revision_id = create_revision_resp['Id']
            revision_arn = create_revision_resp['Arn']

            # Create job
            create_job_resp = self._de_client.create_job(
                Type='IMPORT_ASSETS_FROM_S3',
                Details={
                    'ImportAssetsFromS3': {
                      'AssetSources': [
                          {
                              'Bucket': source_bucket,
                              'Key': source_key
                          },
                      ],
                      'DataSetId': data_set.id,
                      'RevisionId': revision_id
                    }
                }
            )
            logger.debug(create_job_resp)

            # Start job
            job_id = create_job_resp['Id']
            start_job_resp = self._de_client.start_job(JobId=job_id)
            logger.debug(start_job_resp)

            # Wait for Data Exchange job completion
            get_job_resp = self._wait_for_de_job_completion(job_id)
            logger.debug(get_job_resp)

            # Finalize revision
            update_revision_resp = self._de_client.update_revision(
                DataSetId=data_set.id,
                RevisionId=revision_id,
                Finalized=True
            )
            logger.debug(update_revision_resp)

            # Ensure revision finalization succeeded
            finalized_status = update_revision_resp['Finalized']
            if finalized_status is not True:
                raise Exception(f"Failed to finalize revision:\n{update_revision_resp}")

            # Publish the new revision to each product associated with the data set
            for product in data_set.products:
                # Describe the AWS Marketplace entity corresponding to the Data Exchange product
                describe_entity_resp = self._mc_client.describe_entity(
                    Catalog='AWSMarketplace',
                    EntityId=product.id
                )
                logger.debug(describe_entity_resp)

                entity_type = describe_entity_resp['EntityType']
                entity_id = describe_entity_resp['EntityIdentifier']

                # Isolate the target data set in the DescribeEntity response
                describe_entity_resp_data_sets = json.loads(describe_entity_resp['Details'])['DataSets']
                describe_entity_resp_data_set = list(
                    filter(lambda ds: ds['DataSetArn'] == data_set.arn, describe_entity_resp_data_sets)
                )
                # We should get the data set of interest in describe_entity_resp and only that data set
                assert len(describe_entity_resp_data_set) == 1

                # Start a ChangeSet to add the newly finalized revision to an existing product
                start_change_set_resp = self._mc_client.start_change_set(
                    Catalog='AWSMarketplace',
                    ChangeSet=[
                        {
                            "ChangeType": "AddRevisions",
                            "Entity": {
                                "Identifier": entity_id,
                                "Type": entity_type
                            },
                            "Details": json.dumps({
                                "DataSetArn": data_set.arn,
                                "RevisionArns": [revision_arn]
                            })
                        }
                    ]
                )
                logger.debug(start_change_set_resp)

                # Wait for the ChangeSet workflow to complete
                change_set_id = start_change_set_resp['ChangeSetId']
                describe_change_set_resp = self._wait_for_mc_change_set_completion(change_set_id)
                logger.debug(describe_change_set_resp)

The following screenshot shows the S3 trigger for Job 3.

The following screenshot shows an example of CloudWatch logs for Job 3.

The following screenshot shows a CloudWatch alarm for Job 3.

Finally, we can verify that our revisions were successfully added to their corresponding data sets and products through the AWS console.

AWS Data Exchange allows you to create private offers for your AWS account IDs, providing a convenient means of checking that revisions show up in each product as expected.

Conclusion

This post demonstrated how you can integrate AWS Data Exchange into an existing data pipeline frictionlessly. We’re pleased to have been invited to participate in the AWS Data Exchange private preview, and even more pleased with the service itself, which has proven to be a sophisticated yet natural extension of our system.

I want to offer special thanks to both Kyle Patsen and Rafic Melhem of the AWS Data Exchange team for generously fielding my questions (and patiently enduring my ramblings) for the better part of the past year. I also want to thank Lucas Adams for helping me design the system discussed in this post and, more importantly, for his unwavering vote of confidence.

If you are interested in learning more about FPG, don’t hesitate to contact us.

 

Publish and update data products dynamically with AWS Data Exchange

Post Syndicated from Akram Chetibi original https://aws.amazon.com/blogs/big-data/publish-and-update-data-products-dynamically-with-aws-data-exchange/

Data is revolutionizing the way organizations of all sizes conduct their business. Companies are increasingly using third-party data to complement their internal data and deliver value for their customers. Third-party data is used across a wide variety of use cases, such as building applications for customers, running analytics workloads to improve business operations and marketing activities, or building predictive models using machine learning (ML) techniques.

However, as data becomes the center of how companies operate, the way data providers deliver data to subscribers has not changed in years. As a data provider, you spend time and effort on undifferentiated heavy lifting to build data delivery and entitlement management mechanisms to serve your customers. Many data providers also rely on traditional sales and delivery channels and are often unable to reach the many customers interested in their data, which leads to slower adoption of their data products.

Enter AWS Data Exchange.

AWS Data Exchange makes it easy to exchange data in the cloud efficiently. In a few minutes, customers can find and subscribe to hundreds of data products from more than 80 qualified data providers across industries, such as Financial Services, Healthcare and Life Sciences, and Consumer and Retail. After subscribing, customers can download a dataset or copy it to Amazon S3 and analyze it with a wide variety of AWS analytics and ML services. AWS Data Exchange gives data providers a secure, transparent, and reliable channel to reach millions of AWS customers. AWS Data Exchange also helps you service your existing customer subscriptions more efficiently and at a lower cost by eliminating the need to build and maintain data delivery, licensing, or billing infrastructure.

Many data providers publish data products that are updated regularly. For example, a stock market data provider may want to publish daily closing prices every day, or a weather forecast data provider may want to provide an updated forecast every week. This post walks through the process of publishing and updating products dynamically on AWS Data Exchange. The post first shows how to publish a new product and make it available to subscribers, which can be done in minutes using the AWS Data Exchange console. The post also reviews a workflow using a Lambda function to automatically update the product by publishing new revisions to its underlying data sets.

Prerequisites

Before you begin, complete the following prerequisites:

  1. You must be a registered provider on AWS Data Exchange. Only eligible and registered providers can publish data products on AWS Data Exchange. Eligible providers must agree to the Terms and Conditions for AWS Marketplace under a valid legal entity domiciled in the United States or a member state of the EU, supply valid banking and taxation identification, and be qualified by the AWS Data Exchange business operations team. For more information, see Providing Data Products on AWS Data Exchange.
  2. The data that you publish must be compliant with the AWS Marketplace Terms and Conditions and the AWS Data Exchange Publishing Guidelines.
  3. You must have the appropriate IAM permissions to use AWS Data Exchange as a provider. For example, you can use the AWSDataExchangeProviderFullAccess managed IAM policy.
  4. You need an S3 bucket for your ready-to-publish data files. For more information, see Create a Bucket and What is Amazon S3?

AWS Data Exchange concepts

Products are the unit of exchange in AWS Data Exchange. A product is a package of data sets that a provider publishes and others subscribe to. The AWS Data Exchange product catalog and AWS Marketplace website both list products. A product can contain one or more data sets, as well as product details, including the product’s name and description, categories, and contact details. The product also contains information related to the product’s offer terms, which are the terms that subscribers agree to when subscribing to a product. These terms include the available pricing and duration options, the data subscription agreement, and the refund policy.

A data set is a dynamic set of file-based data content, versioned using revisions. A revision is a specific version of a data set. Each revision can contain multiple files, called assets, which you import to a revision using an asynchronous workflow called a job. After creating a revision and importing assets into it, you finalize the revision to mark it as ready for publishing; you can then publish it to the data set’s product. For more information, see Working with Data Sets.

The following diagram summarizes the concepts described above and the hierarchy of the different resources.

Publishing a new product to AWS Data Exchange

Before reviewing how to automatically update an existing product, let’s start by setting up and creating a new product. If you already have a published product, skip this section and move on to “Publishing new data files to the product automatically.”

Creating a data set

To publish a product, first create a data set. Complete the following steps:

  1. On the AWS Data Exchange console, under Data sets, choose Create data set.
  2. Enter a Name and Description for the data set and choose Create.

The name of the data set is visible as part of the product details in the catalog; consider using a concise name that enables customers to understand the content of the data set easily. The description is visible to subscribers who have an active subscription to the product; consider including coverage information as well as the features and benefits of the data set.

The following screenshot shows the Create data set section with the name and description. This post entered the name Exchange-A End of Day Prices and the description “End-of-day pricing of all equities listed on Exchange-A. Covers all industries and all equities traded on the exchange (2,000+). This data set contains full history from 1985, and is updated daily with a new file every day around 5pm EST.”

Creating a revision

After creating the data set, but before publishing it in a product, you need to create its first revision. Complete the following steps:

  1. On your data set’s page, choose the Revisions tab.
  2. Choose Create revision.
  3. For Revision settings, enter a brief comment about the data in this revision.
  4. Choose Create. The revision comment is visible to subscribers after they subscribe to your product. The following screenshot shows that this post entered the comment Historical data from January 1st, 1985 to November 13th, 2019. You can import files (assets) to this revision from either an S3 bucket or your computer. This post imports a file from an S3 bucket. It is important to note that, by default, AWS Data Exchange uses the source S3 object’s key as the asset name. The following screenshot shows the example file this post uses.
  5. When the import status is complete, choose Finalize.

Marking a revision as finalized means that it is staged for publishing. You can only publish finalized revisions to subscribers; you can’t modify a revision after publishing it.

Publishing a new product

You are now ready to publish a new product using this data set. Complete the following steps:

  1. On the AWS Data Exchange console, under Publish data, choose Products.
  2. Choose Publish new product.
  3. In Product overview, enter the product details that subscribers can use to identify the product. For information about best practices when populating your product’s details, see Publishing Products. In particular, you may want to consider including links to a data due diligence questionnaire (DDQ), information about the data set file types and schemas, and any other fact sheets. Note that you can use Markdown to include links and format your product description.
  4. Choose Next to proceed to the Add data page, where you can add the data set that you created earlier.
  5. Choose Next to proceed to the Configure the public offer page. This is the page where you configure the offer details for your product, including the available pricing options, the Data Subscription Agreement, and the refund policy. You can also choose whether you would like to enable subscription verification. If you enable subscription verification, prospective subscribers have to fill in information such as their name, company name, email address, and use case before being able to subscribe. The subscription request then appears on your Product Dashboard page, and you have up to 45 days to approve or decline the request. For information about subscription verification, see Subscription Verification for Providers.
  6. Choose Next to review your product. You can preview the product as it will appear in the AWS Data Exchange product catalog. When you are satisfied with your product and offer details, choose Publish the product. Important: Choosing Publish the product publishes your product to the AWS Data Exchange catalog and makes it publicly available to subscribers.

You have now created a new data set, added your first revision to this data set with historical data, finalized the revision, and published a product using this finalized revision. This product is available for subscribers to purchase within a few hours after publishing.

Publishing new data files to the product automatically

Now that the product is available to customers, you need to update the product and continuously publish new revisions to it. In our example, you need to publish new equity prices every day. To do so, set up the following architecture, which automatically picks up any file uploaded to your S3 bucket and publishes it to the product’s data set as part of a new revision. The workflow creates and publishes a new revision for each file uploaded to the S3 bucket.

The workflow is as follows:

  1. You upload a ready-to-publish data file to the S3 bucket to update your data set.
  2. S3 invokes an AWS Lambda function with the S3 API event that contains details about the object. For more information, see Using AWS Lambda with Amazon S3.
  3. The AWS Lambda function creates a new revision under the pre-existing data set and starts a job to import the file.
  4. The AWS Lambda function modifies the pre-existing product to include the new data set revision.
  5. Subscribers can now consume the new revision, which appears as part of their entitled data set.

Building a Lambda function

Now that you published a product with a data set, you have the foundational pieces in place to build a Lambda function that picks up a new data file uploaded to S3 and publishes it as part of that product.

To configure your Lambda function correctly, you first need to record the data set ID and product ID that you created earlier. You can retrieve them from the AWS Data Exchange console. The product ID is available on the product page, which you can access from your Product Dashboard. The data set ID is available on the data set’s page, which you can access from the Data sets page.

Data set page

Product page

Creating an IAM role

To give the Lambda function permission to read from the source S3 bucket, create a revision, upload files to it, and publish it to a product, you need to create an IAM role with the appropriate permissions.

To do so, create an IAM role and attach the following policy to it. Be sure to replace {INSERT-BUCKET-NAME} and {INSERT-ACCOUNTID} with your S3 bucket’s name and your account ID respectively.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3PermissionforGettingDataSet",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::{INSERT-BUCKET-NAME}/*"
        },
        {
            "Sid": "S3DataExchangeServicePermissions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::*aws-data-exchange*"
        },
        {
            "Sid": "DataExchangeAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "dataexchange:CreateRevision",
                "dataexchange:UpdateRevision",
                "dataexchange:CreateJob",
                "dataexchange:StartJob",
                "dataexchange:GetJob"
            ],
            "Resource": "*"
        },
        {
            "Sid": "MarketplaceAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "aws-marketplace:DescribeEntity",
                "aws-marketplace:StartChangeSet",
                "aws-marketplace:DescribeChangeSet"
            ],
            "Resource": "*"
        },
        {
            "Sid": "CreateCloudwatchLogGroup",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:*"
        },
        {
            "Sid": "CloudwatchLogsPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:log-group:*"
        }
    ]
}

For more information, see Creating IAM Roles.

Deploying a Lambda layer

This post uses a Lambda layer that extends the AWS Python SDK (boto3) that is built into the Lambda Python runtime by adding the AWS Data Exchange and AWS Marketplace Catalog API SDKs as of November 13, 2019. You can deploy a sample layer published for this post, but you should use the version of the AWS SDK that matches your needs.

Creating a Lambda function

Now that you created the IAM role and deployed the Lambda layer with your latest SDK, you can create a Lambda function using the following steps:

  1. On the Lambda console, choose Create a function.
  2. In the Create function section, choose Author from scratch.
  3. In the Basic information section, configure your Lambda function with the following information:
    1. For Function name, enter a name of your choice.
    2. For Runtime, choose Python 3.7.
    3. For Permissions, select Use an existing role.
    4. From the Existing role dropdown, select the Lambda role you created earlier.
  4. Choose Create function.

Configuring your Lambda function

You can now configure your Lambda function. You first need to configure the function to be triggered when new files upload to the S3 bucket. Complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, choose Add trigger.
  4. Under Trigger Configuration, choose S3.
  5. From the drop-down, select the bucket you created as a part of the prerequisites.
  6. Under Event type, choose All Object Create Events.
  7. Optionally, choose a Prefix or a Suffix if you want to only publish specific files to your AWS Data Exchange product.
  8. Choose Add.

To confirm your code is running with the appropriate SDK, associate the Lambda layer that you deployed earlier with your Lambda function. As noted previously, this post published a sample layer, but you should use the appropriate version of the AWS SDK that matches your needs.

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, under the function name, choose Layers.
  4. Choose Add a layer.
  5. Under Layer Selection, deselect Select from list of runtime compatible layers.
  6. From the drop-down, choose the layer you deployed earlier.
  7. Choose Add.

You now need to configure the Lambda function’s code. You can copy the following code for the Lambda function. This code programmatically calls the following APIs, which are the same APIs that you performed earlier using the console:

  • CreateRevision creates a new revision.
  • CreateJob and StartJob start importing the file to the revision.
  • GetJob checks the status of the import.
  • UpdateRevision marks the revision as finalized.

To publish an update to the product, the Lambda function uses the AWS Marketplace Catalog API service with the following APIs. To learn more, see the AWS Marketplace Catalog API Reference.

  • DescribeEntity gets the product details.
  • StartChangeSet starts an update.
  • DescribeChangeSet checks the status of the product update.

Complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Select your newly created function.
  3. Scroll down to the Function code section.
  4. Enter the following code:
    import os
    
    #Include the Lambda layer extracted location
    os.environ['AWS_DATA_PATH'] = '/opt/' 
    
    import boto3
    import time
    import datetime
    import json
    
    region = os.environ['AWS_REGION']
    
    try:
        data_set_id = os.environ['DATA_SET_ID']
    except KeyError:
        raise Exception("DATA_SET_ID environment variable must be defined!") 
    
    try:
        product_id = os.environ['PRODUCT_ID']
    except KeyError:
        raise Exception("PRODUCT_ID environment variable must be defined!")
    
    def lambda_handler(event, context):
        # Setup the boto3 clients needed
        dataexchange = boto3.client(
            service_name='dataexchange',
            region_name=region
    
        )
        marketplace_catalog = boto3.client(
            service_name='marketplace-catalog',
            region_name=region
        )
    
        # parse the s3 details from the triggered event
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        object_key = event['Records'][0]['s3']['object']['key']
    
        # CREATE REVISION under the dataset provided as an environment variable
        current_time_for_creating_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        create_revision_response = dataexchange.create_revision(DataSetId=data_set_id,
                                                         Comment='Revision created programmatically on ' + current_time_for_creating_revision)
        revision_id = create_revision_response['Id']
    
        # CREATE JOB under the revision to import file from S3 to DataExchange
        create_job_s3_import = dataexchange.create_job(
            Type='IMPORT_ASSETS_FROM_S3',
            Details={
                'ImportAssetsFromS3': {
                    'DataSetId': data_set_id,
                    'RevisionId': revision_id,
                    'AssetSources': [
                        {
                            'Bucket': bucket_name,
                            'Key': object_key
                        }
    
                    ]
                }
            }
        )
    
        # Filter the ID of the Job from the response
        job_id = create_job_s3_import['Id']
    
        # invoke START JOB on the created job to change it from Waiting to Completed state
        start_created_job = dataexchange.start_job(JobId=job_id)
    
        # GET JOB details to track the state of the job and wait until it reaches COMPLETED state
        job_status = ''
    
        while job_status != 'COMPLETED':
            get_job_status = dataexchange.get_job(JobId=job_id)
            job_status = get_job_status['State']
            print('Job Status ' + job_status)
            
            if job_status=='ERROR' :
                job_errors = get_job_status['Errors']
                raise Exception('JobId: {} failed with error:{}'.format(job_id, job_errors))
            
            time.sleep(.5)
            
        # Finalize revision by invoking UPDATE REVISION
        current_time_for_finalize_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        print(current_time_for_finalize_revision)
        finalize_revision = dataexchange.update_revision(DataSetId=data_set_id, RevisionId=revision_id, Finalized=True,
                                                  Comment='Revision finalized programmatically on ' + current_time_for_finalize_revision)
    
        # New dataset version created and finalized, now let’s add it to an existing product specified as an env variable
    
        # Describe Product details to get the metadata about the product
        describe_entity = marketplace_catalog.describe_entity(Catalog='AWSMarketplace', EntityId=product_id)
    
        # Use the output to pull out producttype, productid and datasetarn for startchangeset call
        entity_type = describe_entity['EntityType']
        entity_id = describe_entity['EntityIdentifier']
        dataset_arn = ((json.loads(describe_entity['Details']))['DataSets'][0]['DataSetArn'])
        revision_arn = create_revision_response['Arn']
     
    
        # StartChangeSet to add the newly finalized revision to an existing product
        start_change_set = marketplace_catalog.start_change_set(
            Catalog='AWSMarketplace',
            ChangeSetName="Adding revision to my Product",
            ChangeSet=[
                {
                    "ChangeType": "AddRevisions",
                    "Entity": {
                        "Identifier": entity_id,
                        "Type": entity_type
                    },
                    "Details": json.dumps({
                        "DataSetArn": dataset_arn,
                        "RevisionArns": [revision_arn]
                    })
                }
            ]
        )
        
        #Filter the changeset id from the response
        changeset_id = start_change_set['ChangeSetId']
    
        # DESCRIBE CHANGESET to get the status of the changeset and wait until it reaches SUCCEEDED state
        change_set_status = ''
    
        while change_set_status != 'SUCCEEDED':
            describe_change_set = marketplace_catalog.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=changeset_id
                )
            change_set_status = describe_change_set['Status']
            print('Change Set Status ' + change_set_status)
    
            if change_set_status=='FAILED' :
                print(describe_change_set)
                failurereason = describe_change_set['FailureDescription']
                raise Exception('ChangeSetID: {} failed with error:\n{}'.format(changeset_id, failurereason))
            time.sleep(1)
            
        return ('Your data has been published successfully')

  5. Scroll down to the Environment variables section.
  6. Set the DATA_SET_ID and PRODUCT_ID variables to the values you retrieved from the console.
  7. Scroll further down to Basic Settings and set the Timeout value to 1 minute.
  8. Choose Save.

When you upload a file to your S3 bucket, the S3 event now triggers the Lambda function, which updates the data set automatically and publishes the new file to your subscribers. Subscribers also receive an Amazon CloudWatch event from AWS Data Exchange, which they can use to automate exporting the data to their own S3 buckets.

Conclusion

AWS Data Exchange provides an easy and convenient way for data providers to exchange data with their customers in a cloud-native, secure, and efficient manner. This post showed you how to publish a new product based on a newly created data set and revision in the AWS Data Exchange console. You also learned how to automatically publish files uploaded to your S3 bucket as new revisions. To learn more, visit AWS Data Exchange.

 


About the Authors

Akram Chetibi is a senior product manager of AWS Data Exchange. Akram joined AWS more than two years ago, and has launched multiple services including AWS Data Exchange and AWS Fargate.


Keerti Shah is a global solutions architect with Amazon Web Services. She enjoys working with Financial Services customers to drive innovation, digitization, and modernization of legacy applications.


Harsha W. Sharma is a global account solutions architect with AWS New York. Harsha joined AWS more than three years ago and works with Global Financial Services customers to design and develop architectures on AWS and support their journey on the cloud.


Find and acquire new data sets and retrieve new updates automatically using AWS Data Exchange

Post Syndicated from Akram Chetibi original https://aws.amazon.com/blogs/big-data/find-and-acquire-new-data-sets-and-retrieve-new-updates-automatically-using-aws-data-exchange/

Customers are doing some amazing things with data, such as improving medicine and tackling climate change. With AWS services, such as AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and Amazon Athena, it is easier than ever to get data insights for your business. But how can you find the right data to fuel these analytics? This is where AWS Data Exchange steps in.

AWS Data Exchange makes it simple to exchange data in the cloud. In a few minutes, you can find and subscribe to hundreds of data products from more than 80 qualified data providers across industries such as Financial Services, Healthcare and Life Sciences, and Consumer and Retail. After subscribing, you can download data sets or copy them to Amazon S3 and analyze them with AWS’s analytics and machine learning services. With AWS Data Exchange, you can subscribe to data products and get access to data sets. Subscribers also access new data set revisions as providers publish new data.

This post uses an example scenario in which you would like to analyze daily treasury maturities in order to understand changes in the economy. We use Rearc’s Daily Treasury Maturities | Federal Reserve Board product, which contains a data set that is updated daily with new data. This post walks through the process, from browsing the catalog and subscribing to the data product to setting up an automation that retrieves new revisions to S3 automatically, making the data readily available to analyze using other AWS services.

Solution overview

The solution has three steps:

  1. Configure your prerequisites: an S3 bucket for your data and IAM permissions for using AWS Data Exchange.
  2. Subscribe to a new data product in AWS Data Exchange.
  3. Set up an automation using Amazon CloudWatch events to retrieve new revisions of subscribed data products in AWS Data Exchange automatically.

Prerequisites

This post assumes you have an S3 bucket to which you export your data sets. For more information, see Create a Bucket.

You also need permissions to use AWS Data Exchange and associated services to subscribe to and export data sets. You can, for example, use the AWS Data Exchange managed policy AWSDataExchangeSubscriberFullAccess, which gives you all the necessary permissions needed to use AWS Data Exchange as a subscriber. For more information, see Identity and Access Management in AWS Data Exchange.

Browsing the catalog and subscribing to data products

Browsing and subscribing to a new data product is straightforward. The first step is to determine what data products you wish to subscribe to. Complete the following steps:

  1. On the AWS Data Exchange console, choose Product catalog. You can search for a term and filter results by provider name and pricing plan.
  2. For Product catalog, enter federal reserve.
  3. Choose Search. You can see multiple data products listed, including a few products by Rearc and Crux Informatics. You can filter the list further by refining the results.
  4. Under Refine results, under Vendors, select Rearc. This post is looking for free product offerings, so it filters the results further.
  5. Under Pricing plans, select Free. The filtered results contain Daily Treasury Maturities | Federal Reserve Board, which you can use for testing. Choosing the product name shows more product details, including its full description, which data sets are included in the product (some products offer multiple data sets in a single subscription), the product’s support contact information, as well as its offer details, such as the data subscription agreement, available pricing options, and the refund policy. See the following screenshot of the product detail page. It is important to understand the offer details you are agreeing to, including the price and Data Subscription Agreement (DSA). A link to view the DSA is under the Usage tab. Read over the DSA; it is a legal agreement that defines the rights to use the data. You need to make sure that the agreement aligns with your intended usage before subscribing.
  6. Choose Continue to subscribe.
  7. Under Complete subscription, for Pricing information, choose a subscription duration and price.
  8. For Renewal settings, choose whether you want to enable auto-renewal when the subscription expires. The following screenshot shows that this post chooses a 12-month subscription with automatic renewal.
  9. Choose Subscribe. The subscription process can take up to a few minutes to complete.

When your subscription is active, it is visible under the Active subscriptions tab of the Subscriptions page. Choose your subscription to view its details, including the data sets included in the subscription. You can also see the Region to which the vendor publishes the data set.

Viewing revisions and exporting assets

Choosing the data set name takes you to the data set page, where you can view revisions under the Revisions tab. The following screenshot shows the list of revisions organized by Revision ID and time created.

Over time, as Rearc updates the data set, you can see multiple revisions listed.

Choosing the latest Revision ID brings up all the files (called assets in AWS Data Exchange) available in the revision. To export the assets, complete the following steps:

  1. Choose the asset to export.
  2. Choose Export to Amazon S3.
  3. Choose an S3 Bucket in the S3 navigation modal.
  4. Choose Export.

AWS Data Exchange starts copying the asset to your S3 bucket. In the console, AWS Data Exchange uses the asset name as an S3 object key. View the export progress in the Jobs list. It progresses through three steps: Waiting, Processing, and Completed.
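If you would rather script the export than use the console, a minimal sketch with boto3 (the IDs and bucket name are placeholders) might look like this:

import time

import boto3

dataexchange = boto3.client('dataexchange', region_name='us-east-1')

data_set_id = '<data-set-id>'    # from the data set page
revision_id = '<revision-id>'    # the revision you want to export
asset_id = '<asset-id>'          # the asset (file) within that revision

# Mirror the console behavior of using the asset name as the S3 object key.
asset_name = dataexchange.get_asset(
    DataSetId=data_set_id, RevisionId=revision_id, AssetId=asset_id
)['Name']

export_job = dataexchange.create_job(
    Type='EXPORT_ASSETS_TO_S3',
    Details={
        'ExportAssetsToS3': {
            'DataSetId': data_set_id,
            'RevisionId': revision_id,
            'AssetDestinations': [
                {'AssetId': asset_id, 'Bucket': 'example-subscriber-bucket', 'Key': asset_name}
            ],
        }
    },
)
dataexchange.start_job(JobId=export_job['Id'])

# Poll until the export job finishes.
while True:
    state = dataexchange.get_job(JobId=export_job['Id'])['State']
    if state in ('COMPLETED', 'ERROR', 'CANCELLED', 'TIMED_OUT'):
        print(f'Export job finished in state {state}')
        break
    time.sleep(5)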

Subscription Verification

AWS Data Exchange also has a feature called Subscription Verification. Subscription Verification allows providers to approve or decline subscription requests before granting subscription to certain products. For products with Subscription Verification enabled, you need to complete a form to share some information with the provider, who has up to 45 days to approve or reject the request. The form includes information such as your contact name, email address, company name, AWS account number, and intended use case for the data. The provider uses this information (and potentially reaches out to you for more information) to decide whether to approve your subscription request. You can view your subscription request status on the Subscriptions page under the Subscription requests tab. To learn more about subscription verification, see Subscription Verification for Subscribers.

Automating the retrieval for new data set revisions

Providers update many products regularly by creating and publishing new revisions to the underlying data sets. For example, the Rearc data product is updated daily. You want your analytics and visualizations to incorporate these new revisions easily. To do so, you need to set up an automation to retrieve the new files stored in newly published revisions.

The following diagram shows the workflow of this process.

Every time a new revision is published, AWS Data Exchange publishes a CloudWatch event sourced from aws.dataexchange. A CloudWatch event rule triggers a Lambda function, which starts an AWS Data Exchange job to export the revision’s assets to a predefined S3 bucket. It is worth noting that because AWS Data Exchange uses the asset name as the default S3 object key when exporting to Amazon S3, and because Rearc publishes a new revision with the same asset name every day, this automation always overwrites the previous day’s file with the new one, allowing you to always refer to the same S3 object, which contains the latest data.
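Conceptually, the Lambda function at the heart of this workflow does something like the following sketch. The event field names (the data set ID in resources and the revision IDs in detail) and the DESTINATION_BUCKET environment variable are assumptions for illustration; verify them against an actual event and the CloudFormation template described below.

import os
import time

import boto3

dataexchange = boto3.client('dataexchange')
DESTINATION_BUCKET = os.environ['DESTINATION_BUCKET']  # assumed to be set by the stack


def lambda_handler(event, context):
    # Assumed event shape: the data set ID arrives in 'resources' and the newly
    # published revision IDs in 'detail'.
    data_set_id = event['resources'][0]
    revision_ids = event['detail']['RevisionIds']

    for revision_id in revision_ids:
        assets = dataexchange.list_revision_assets(
            DataSetId=data_set_id, RevisionId=revision_id
        )['Assets']

        # Export every asset in the revision, using the asset name as the S3 object key
        # so the daily file overwrites the previous day's copy.
        job = dataexchange.create_job(
            Type='EXPORT_ASSETS_TO_S3',
            Details={
                'ExportAssetsToS3': {
                    'DataSetId': data_set_id,
                    'RevisionId': revision_id,
                    'AssetDestinations': [
                        {'AssetId': a['Id'], 'Bucket': DESTINATION_BUCKET, 'Key': a['Name']}
                        for a in assets
                    ],
                }
            },
        )
        dataexchange.start_job(JobId=job['Id'])

        # Wait for the export to finish before moving on to the next revision.
        while dataexchange.get_job(JobId=job['Id'])['State'] not in (
            'COMPLETED', 'ERROR', 'CANCELLED', 'TIMED_OUT'
        ):
            time.sleep(5)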

An AWS CloudFormation template packages this automation. It contains all the necessary resources, including an S3 bucket to store the data, the Lambda function to export the data, its IAM role and policy, and the CloudWatch event rule to trigger the function. Packaging this automation in an AWS CloudFormation template makes it simple to repeat the automation for each data set you subscribe to. You can configure the template using the Data Set ID, which you can retrieve from the data set page that we have seen above.

In this post, we use a Lambda layer that extends the AWS Python SDK (boto3) that is built into the Lambda Python runtime by adding the AWS Data Exchange and AWS Marketplace Catalog API SDKs as of November 13, 2019. This is an example layer published for this post; use the correct version of the AWS SDK for your needs.

Deploying the automation

Before deploying the automation, make sure you are in the Region in which the data set is located. You can find this on the Subscription details page under Data sets.

  1. Click this button to deploy the CloudFormation template in us-east-1 region from the CloudFormation console.

    Alternatively, if you’re using a different region, you can manually create the stack in that region:

    • On the AWS CloudFormation console, choose Create Stack.
    • On the Create stack screen, for Template source, select Amazon S3 URL, and enter this URL in the box:
      https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-DataExchange/DataExchangeDownloadDataSet-v0.5.yaml
  2. On the stack details screen, give the stack a name and paste in the ID of the data set from your subscription. You can retrieve the data set ID from the AWS Data Exchange console’s Subscriptions page. Optionally, you can enter a Revision ID to download an existing revision to the S3 bucket immediately after stack creation. If you leave the Revision ID blank, only revisions published after stack creation are downloaded to the S3 bucket. Choose Next.
  3. On the Configure stack options page, choose Next.
  4. On the Review screen, scroll down, check the three boxes in the Capabilities and transforms section, and choose Create stack.

The stack takes 3–4 minutes to complete. Choose the refresh icon to see the latest status. You can find the created S3 bucket under the Resources tab; this is where new data set revisions are delivered.
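If you prefer to create the stack programmatically instead of through the console, a boto3 sketch along these lines could work; the parameter keys shown are assumptions, so check the template’s Parameters section for the exact names:

import boto3

cloudformation = boto3.client('cloudformation', region_name='us-east-1')

cloudformation.create_stack(
    StackName='dataexchange-rearc-treasury-maturities',
    TemplateURL='https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-DataExchange/DataExchangeDownloadDataSet-v0.5.yaml',
    Parameters=[
        # Parameter keys are assumptions; confirm them against the template before use.
        {'ParameterKey': 'DataSetID', 'ParameterValue': '<data-set-id>'},
        {'ParameterKey': 'RevisionID', 'ParameterValue': ''},  # optional: blank downloads only future revisions
    ],
    # These capabilities correspond to the three acknowledgment boxes in the console.
    Capabilities=['CAPABILITY_IAM', 'CAPABILITY_NAMED_IAM', 'CAPABILITY_AUTO_EXPAND'],
)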

Conclusion

In this post, you have searched and subscribed to a product and deployed the automation needed to automatically export new revisions to Amazon S3. This automation makes the data readily available to catalog and analyze using other AWS services. For example, you can catalog the new data automatically with an AWS Glue crawler, which creates and updates a table in your database with the Rearc data automatically. For more information, see Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs. After cataloging the data, you can run a serverless ETL job to transform it into Parquet, or use it directly as-is from Amazon Athena or Amazon QuickSight.

 


About the Authors

Akram Chetibi is a Senior Product Manager of AWS Data Exchange. Akram joined AWS more than two years ago, and has launched multiple services including AWS Data Exchange and AWS Fargate.


George Seib is an Enterprise Solutions Architect with Amazon Web Services. He helps Financial Services and Enterprise customers cost effectively scale and secure data workloads.