Tag Archives: Amazon Athena

Amazon Personalize customer outreach on your ecommerce platform

Post Syndicated from Sridhar Chevendra original https://aws.amazon.com/blogs/architecture/amazon-personalize-customer-outreach-on-your-ecommerce-platform/

In the past, brick-and-mortar retailers leveraged native marketing and advertisement channels to engage with consumers. They have promoted their products and services through TV commercials, and magazine and newspaper ads. Many of them have started using social media and digital advertisements. Although marketing approaches are beginning to modernize and expand to digital channels, businesses still depend on expensive marketing agencies and inefficient manual processes to measure campaign effectiveness and understand buyer behavior. The recent pandemic has forced many retailers to take their businesses online. Those who are ready to embrace these changes have embarked on a technological and digital transformation to connect to their customers. As a result, they have begun to see greater business success compared to their peers.

Digitizing a business can be a daunting task, due to lack of expertise and high infrastructure costs. By using Amazon Web Services (AWS), retailers are able to quickly deploy their products and services online with minimal overhead. They don’t have to manage their own infrastructure. With AWS, retailers have no upfront costs, have minimal operational overhead, and have access to enterprise-level capabilities that scale elastically, based on their customers’ demands. Retailers can gain a greater understanding of customers’ shopping behaviors and personal preferences. Then, they are able to conduct effective marketing and advertisement campaigns, and develop and measure customer outreach. This results in increased satisfaction, higher retention, and greater customer loyalty. With AWS you can manage your supply chain and directly influence your bottom line.

Building a personalized shopping experience

Let’s dive into the components involved in building this experience. The first step in a retailer’s digital transformation journey is to create an ecommerce platform for their customers. This platform enables the organization to capture their customers’ actions, also referred to as ‘events’. Some examples of events are clicking on the shopping site to browse product categories, searching for a particular product, adding an item to the shopping cart, and purchasing a product. Each of these events gives the organization information about their customer’s intent, which is invaluable in creating a personalized experience for that customer. For instance, if a customer is browsing the “baby products” category, it indicates their interest in that category even if a purchase is not made. These insights are typically difficult to capture in an in-store experience. Online shopping makes gaining this knowledge much more straightforward and scalable.

The proposed solution outlines the use of AWS services to create a digital experience for a retailer and consumers. The three key areas are: 1) capturing customer interactions, 2) making real-time recommendations using AWS managed Artificial Intelligence/Machine Learning (AI/ML) services, and 3) creating an analytics platform to detect patterns and adjust customer outreach campaigns. Figure 1 illustrates the solution architecture.

Digital shopping experience architecture

Figure 1. Digital shopping experience architecture

For this use case, let’s assume that you are the owner of a local pizzeria, and you manage deliveries through an ecommerce platform like Shopify or WooCommerce. We will walk you through how to best serve your customer with a personalized experience based on their preferences.

The proposed solution consists of the following components:

  1. Data collection
  2. Promotion campaigns
  3. Recommendation engine
  4. Data analytics
  5. Customer reachability

Let’s explore each of these components separately.

Data collection with Amazon Kinesis Data Streams

When a customer uses your web/mobile application to order a pizza, the application captures their activity as click-stream ‘events’. These events provide valuable insights about your customers’ behavior. You can use these insights to understand the trends and browsing pattern of prospects who visited your web/mobile app, and use the data collected for creating promotion campaigns. As your business scales, you’ll need a durable system to preserve these events against system failures, and scale based on unpredictable traffic on your platform.

Amazon Kinesis is a Multi-AZ, managed streaming service that provides resiliency, scalability, and durability to capture an unlimited number of events without any additional operational overhead. Using Kinesis producers (Kinesis Agent, Kinesis Producer Library, and the Kinesis API), you can configure applications to capture your customer activity. You can ingest these events from the frontend, and then publish them to Amazon Kinesis Data Streams.

Let us start by setting up Amazon Kinesis Data Streams to capture the real-time sales transactions from the online channels like a portal or mobile app. For this blog post, we have used the Kaggle’s public data set as a reference. Figure 2 illustrates a snapshot of sample data to build personalized recommendations for a customer.

Sample sales transaction data

Figure 2. Sample sales transaction data

Promotion campaigns with AWS Lambda

One way to increase customer conversion is by offering discounts. When the customer adds a pizza to their cart, you want to make sure they are receiving the best deal. Let’s assume that by adding an additional item, your customer will receive the best possible discount. Just by knowing the total cost of added items to the cart, you can provide these relevant promotions to this customer.

For this scenario, the AWS Lambda service polls the Amazon Kinesis Data Streams to read all the events in the stream. It then matches the events based on your criteria of items in the cart. In turn, these events will be processed by the Lambda function. The Lambda function will read your up-to-date promotions stored in Amazon DynamoDB. As an option, caching recent or most popular promotions will improve your application response time, as well as improve the customer experience on your platform. Amazon DynamoDB DAX is an integrated caching for DynamoDB that caches the most recent or popular promotions or items.

For example, when the customer added the items to their shopping cart, Lambda will send promotion details to them based on the purchase amount. This can be for free shipping or discount of a certain percentage. Figure 3 illustrates the snapshot of sample promotions.

Promotions table in DynamoDB

Figure 3. Promotions table in DynamoDB

Recommendations engine with Amazon Personalize

In addition to sharing these promotions with your customer, you may also want to share the recommended add-ons. In order to understand your customer preferences, you must gather historical datasets to determine patterns and generate relevant recommendations. Since web activity consists of millions of events, this would be a daunting task for humans to review, determine the patterns, and make recommendations. And since user preferences change, you need a system that can use all this volume of data and provide accurate predictions.

Amazon Personalize is a managed AI/ML service that will help you to train an ML model based on datasets. It provides an inference point for real-time recommendations prior to having ML experience. Based on the datasets, Amazon Personalize also provides recipes to generate recommendations. As your customers interact on the ecommerce platform, your frontend application calls Amazon Personalize inference endpoints. It then retrieves a set of personalized recommendations based on your customer preferences.

Here is the sample Python code to display the list of available recommenders, and associated recommendations.

import boto3
import json
client = boto3.client('personalize')

# Connect to the personalize runtime for the customer recommendations

recomm_endpoint = boto3.client('personalize-runtime')
response = recomm_endpoint.get_recommendations(itemId='79323P',
  recommenderArn='arn:aws:personalize:us-east-1::recommender/my-items',
  numResults=5)

print(json.dumps(response['itemList'], indent=2))

[
  {
    "itemId": "79323W"
  },
  {
    "itemId": "79323GR"
  },
  {
    "itemId": "79323LP"
  },
  {
  "itemId": "79323B"
  },
  {
    "itemId": "79323G"
  }
]

You can use Amazon Kinesis Data Firehose to read the data near real time from the Amazon Kinesis Data Streams collected the data from the front-end applications. Then you can store this data in Amazon Simple Storage Service (S3). Amazon S3 is peta-byte scale storage help you scale and acts as a repository and single source of truth. We use S3 data as seed data to build a personalized recommendation engine using Amazon Personalize. As your customers interact on the ecommerce platform, call the Amazon Personalize inference endpoint to make personalized recommendations based on user preferences.

Customer reachability with Amazon Pinpoint

If a customer adds products to their cart but never checks out, you may want to send them a reminder. You can set up an email to suggest they re-order after a period of time after their first order. Or you may want to send them promotions based on their preferences. And as your customers’ behavior changes, you probably want to adapt your messaging accordingly.

Your customer may have a communication preference, such as phone, email, SMS, or in-app notifications. If an order has an issue, you can inform the customer as soon as possible using their preferred method of communication, and perhaps follow it up with a discount.

Amazon Pinpoint is a flexible and scalable outbound and inbound marketing communications service. You can add users to Audience Segments, create reusable content templates integrated with Amazon Personalize, and run scheduled campaigns. With Amazon Pinpoint journeys, you can send action or time-based notifications to your users.

The following workflow shown in Figure 4, illustrates customer communication workflow for promotion. A journey is created for a cohort of college students: a “Free Drink” promotion is offered with a new order. You can send this promotion over email. If the student opens the email, you can immediately send them a push notification reminding them to place an order. But if they didn’t open this email, you could wait three days, and follow up with a text message.

Promotion workflow in Amazon Pinpoint

Figure 4. Promotion workflow in Amazon Pinpoint

Data analytics with Amazon Athena and Amazon QuickSight

To understand the effectiveness of your campaigns, you can use S3 data as a source for Amazon Athena. Athena is an interactive query service that analyzes data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

There are different ways to create visualizations in Amazon QuickSight. For instance, you can use Amazon S3 as a data lake. One option is to import your data into SPICE (Super-fast, Parallel, In-memory Calculation Engine) to provide high performance and concurrency. You can also create a direct connection to the underlying data source. For this use case, we choose to import to SPICE, which provides faster visualization in a production setup. Schedule consistent refreshes to help ensure that dashboards are referring to the most current data.

Once your data is imported to your SPICE, review QuickSight’s visualization dashboard. Here, you’ll be able to choose from a wide variety of charts and tables, while adding interactive features like drill downs and filters.

The process following illustrates how to create a customer outreach strategy using ZIP codes, and allocate budgets to the marketing campaigns accordingly. First, we use this sample SQL command that we ran in Athena to query for top 10 pizza providers. The results are shown in Figure 5.

SELECT name, count(*) as total_count FROM "analyticsdemodb"."fooddatauswest2"
group by name
order by total_count desc
limit 10

Athena query results for top 10 pizza providers

Figure 5. Athena query results for top 10 pizza providers

Second, here is the sample SQL command that we ran in Athena to find Total pizza counts by postal code (ZIP code). Figure 6 shows a visualization to help create customer outreach strategy per ZIP codes and budget the marketing campaigns accordingly.

SELECT postalcode, count(*) as total_count FROM "analyticsdemodb"."fooddatauswest2"
where postalcode is not null
group by postalcode
order by total_count desc limit 50;

QuickSight visualization showing pizza orders by zip codes

Figure 6. QuickSight visualization showing pizza orders by zip codes

Conclusion

AWS enables you to build an ecommerce platform and scale your existing business with minimal operational overhead and no upfront costs. You can augment your ecommerce platform by building personalized recommendations and effective marketing campaigns based on your customer needs. The solution approach provided in the blog will help organizations build re-usable architecture pattern and personalization using AWS managed services.

Visualize Amazon S3 data using Amazon Athena and Amazon Managed Grafana

Post Syndicated from Pedro Sola Pimentel original https://aws.amazon.com/blogs/big-data/visualize-amazon-s3-data-using-amazon-athena-and-amazon-managed-grafana/

Grafana is a popular open-source analytics platform that you can employ to create, explore, and share your data through flexible dashboards. Its use cases include application and IoT device monitoring, and visualization of operational and business data, among others. You can create your dashboard with your own datasets or publicly available datasets related to your industry.

In November 2021, the AWS team together with Grafana Labs announced the Amazon Athena data source plugin for Grafana. The feature allows you to visualize information on a Grafana dashboard using data stored in Amazon Simple Storage Service (Amazon S3) buckets, with help from Amazon Athena, a serverless interactive query service. In addition, you can provision Grafana dashboards using Amazon Managed Grafana, a fully managed service for open-source Grafana and Enterprise Grafana.

In this post, we show how you can create and configure a dashboard in Amazon Managed Grafana that queries data stored on Amazon S3 using Athena.

Solution overview

The following diagram is the architecture of the solution.

Architecture diagram

The solution is comprised of a Grafana dashboard, created in Amazon Managed Grafana, populated with data queried using Athena. Athena runs queries against data stored in Amazon S3 using standard SQL. Athena integrates with the AWS Glue Data Catalog, a metadata store for data in Amazon S3, which includes information such as the table schema.

To implement this solution, you complete the following high-level steps:

  1. Create and configure an Athena workgroup.
  2. Configure the dataset in Athena.
  3. Create and configure a Grafana workspace.
  4. Create a Grafana dashboard.

Create and configure an Athena workgroup

By default, the AWS Identity and Access Management (IAM) role used by Amazon Managed Grafana has the AmazonGrafanaAthenaAccess IAM policy attached. This policy gives the Grafana workspace access to query all Athena databases and tables. More importantly, it gives the service access to read data written to S3 buckets with the prefix grafana-athena-query-results-. In order for Grafana to be able to read the Athena query results, you have two options:

In this post, we go with the first option. To do that, complete the following steps:

  1. Create an S3 bucket named grafana-athena-query-results-<name>. Replace <name> with a unique name of your choice.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Choose Create workgroup.
  4. Under Workgroup name, enter a unique name of your choice.
  5. For Query result configuration, choose Browse S3.
  6. Select the bucket you created and choose Choose.
  7. For Tags, choose Add new tag.
  8. Add a tag with the key GrafanaDataSource and the value true.
  9. Choose Create workgroup.

It’s important that you add the tag described in steps 7–8. If the tag isn’t present, the workgroup won’t be accessible by Amazon Managed Grafana.

For more information about the Athena query results location, refer to Working with query results, recent queries, and output files.

Configure the dataset in Athena

For this post, we use the NOAA Global Historical Climatology Network Daily (GHCN-D) dataset, from the National Oceanic and Atmospheric Administration (NOAA) agency. The dataset is available in the Registry of Open Data on AWS, a registry that exists to help people discover and share datasets.

The GHCN-D dataset contains meteorological elements such as daily maximum and minimum temperatures. It’s a composite of climate records from numerous locations—some locations contain more than 175 years recorded.

The GHCN-D data is in CSV format and is stored in a public S3 bucket (s3://noaa-ghcn-pds/). You access the data through Athena. To start using Athena, you need to create a database:

  1. On the Athena console, choose Query editor in the navigation pane.
  2. Choose the workgroup, created in the previous step, on the top right menu.
  3. To create a database named mydatabase, enter the following statement:
CREATE DATABASE mydatabase
  1. Choose Run.
  2. From the Database list on the left, choose mydatabase to make it your current database.

Now that you have a database, you can create a table in the AWS Glue Data Catalog to start querying the GHCN-D dataset.

  1. In the Athena query editor, run the following query:
CREATE EXTERNAL TABLE `noaa_ghcn_pds`(
  `id` string, 
  `year_date` string, 
  `element` string, 
  `data_value` string, 
  `m_flag` string, 
  `q_flag` string, 
  `s_flag` string, 
  `obs_time` string
)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
WITH SERDEPROPERTIES ('separatorChar'=',')
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 's3://noaa-ghcn-pds/csv/'
TBLPROPERTIES ('classification'='csv')

After that, the table noaa_ghcn_pds should appear under the list of tables for your database. In the preceding statement, we define columns based on the GHCN-D data structure. For a full description of the variables and data structure, refer to the dataset’s readme file.

With the database and the table configured, you can start running SQL queries against the entire dataset. For the purpose of this post, you create a second table containing a subset of the data: the maximum temperatures of one weather station located in Retiro Park (or simply El Retiro), one of the largest parks of the city of Madrid, Spain. The identification of the station is SP000003195 and the element of interest is TMAX.

  1. Run the following statement on the Athena console to create the second table:
CREATE TABLE madrid_tmax WITH (format = 'PARQUET') AS
SELECT CAST(data_value AS real) / 10 AS t_max,
  CAST(
    SUBSTR(year_date, 1, 4) || '-' || SUBSTR(year_date, 5, 2) || '-' || SUBSTR(year_date, 7, 2) AS date
  ) AS iso_date
FROM "noaa_ghcn_pds"
WHERE id = 'SP000003195'
  AND element = 'TMAX'

After that, the table madrid_tmax should appear under the list of tables for your database. Note that in the preceding statement, the temperature value is divided by 10. That’s because temperatures are originally recorded in tenths of Celsius degrees. We also adjust the date format. Both adjustments make the consumption of the data easier.

Unlike the noaa_ghcn_pds table, the madrid_tmax table isn’t linked with the original dataset. That means its data won’t reflect updates made to the GHCN-D dataset. Instead, it holds a snapshot of the moment of its creation. That may not be ideal in certain scenarios, but is acceptable here.

Create and configure a Grafana workspace

The next step is to provision and configure a Grafana workspace and assign a user to the workspace.

Create your workspace

In this post, we use the AWS Single Sign-On (AWS SSO) option to set up the users. You can skip this step if you already have a Grafana workspace.

  1. On the Amazon Managed Grafana console, choose Create Workspace.
  2. Give your workspace a name, and optionally a description.
  3. Choose Next.
  4. Select AWS IAM Identity Center (successor to AWS SSO).
  5. For Permission type, choose Service Managed and choose Next.
  6. For Account access, select Current account.
  7. For Data sources, select Amazon Athena and choose Next.
  8. Review the details and choose Create workspace.

This starts the creation of the Grafana workspace.

Create a user and assign it to the workspace

The last step of the configuration is to create a user to access the Grafana dashboard. Complete the following steps:

  1. Create a user for your AWS SSO identity store if you don’t have one already.
  2. On the Amazon Managed Grafana console, choose All workspaces in the navigation pane.
  3. Choose your Grafana workspace to open the workspace details.
  4. On the Authentication tab, choose Assign new user or group.
  5. Select the user you created and choose Assign users and groups.
  6. Change the user type by selecting the user and on the Action menu, choose Make admin.

Create a Grafana dashboard

Now that you have Athena and Amazon Managed Grafana configured, create a Grafana dashboard with data fetched from Amazon S3 using Athena. Complete the following steps:

  1. On the Amazon Managed Grafana console, choose All workspaces in the navigation pane.
  2. Choose the Grafana workspace URL link.
  3. Log in with the user you assigned in the previous step.
  4. In the navigation pane, choose the lower AWS icon (there are two) and then choose Athena on the AWS services tab.
  5. Choose the Region, database, and workgroup used previously, then choose Add 1 data source.
  6. Under Provisioned data sources, choose Go to settings on the newly created data source.
  7. Select Default and then choose Save & test.
  8. In the navigation pane, hover over the plus sign and then choose Dashboard to create a new dashboard.
  9. Choose Add a new panel.
  10. In the query pane, enter the following query:
select iso_date as time, t_max from madrid_tmax where $__dateFilter(iso_date) order by iso_date
  1. Choose Apply.
  2. Change the time range on the top right corner.

For example, if you change to Last 2 years, you should see something similar to the following screenshot.

Temperature visualization

Now that you’re able to populate your Grafana dashboard with data fetched from Amazon S3 using Athena, you can experiment with different visualizations and configurations. Grafana provides lots of options, and you can adjust your dashboard to your preferences, as shown in the following example screenshot of daily maximum temperatures.

Temperature visualization - colorful

As you can see in this visualization, Madrid can get really hot on the summer!

For more information on how to customize Grafana visualizations, refer to Visualization panels.

Clean up

If you followed the instructions in this post in your own AWS account, don’t forget to clean up the created resources to avoid further charges.

Conclusion

In this post, you learned how to use Amazon Managed Grafana in conjunction with Athena to query data stored in an S3 bucket. As an example, we used a subset of the GHCN-D dataset, available in the Registry of Open Data on AWS.

Check out Amazon Managed Grafana and start creating other dashboards using your own data or other publicly available datasets stored in Amazon S3.


About the authors

Pedro Pimentel is a Prototyping Architect working on the AWS Cloud Engineering and Prototyping team, based in Brazil. He works with AWS customers to innovate using new technologies and services. In his spare time, Pedro enjoys traveling and cycling.

Rafael Werneck is a Senior Prototyping Architect at AWS Cloud Engineering and Prototyping, based in Brazil. Previously, he worked as a Software Development Engineer on Amazon.com.br and Amazon RDS Performance Insights.

Set up federated access to Amazon Athena for Microsoft AD FS users using AWS Lake Formation and a JDBC client

Post Syndicated from Mostafa Safipour original https://aws.amazon.com/blogs/big-data/set-up-federated-access-to-amazon-athena-for-microsoft-ad-fs-users-using-aws-lake-formation-and-a-jdbc-client/

Tens of thousands of AWS customers choose Amazon Simple Storage Service (Amazon S3) as their data lake to run big data analytics, interactive queries, high-performance computing, and artificial intelligence (AI) and machine learning (ML) applications to gain business insights from their data. On top of these data lakes, you can use AWS Lake Formation to ingest, clean, catalog, transform, and help secure your data and make it available for analysis and ML. Once you have setup your data lake, you can use Amazon Athena which is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL.

With Lake Formation, you can configure and manage fine-grained access control to new or existing databases, tables, and columns defined in the AWS Glue Data Catalog for data stored in Amazon S3. After you set access permissions using Lake Formation, you can use analytics services such as Amazon Athena, Amazon Redshift, and Amazon EMR without needing to configure policies for each service.

Many of our customers use Microsoft Active Directory Federation Services (AD FS) as their identity provider (IdP) while using cloud-based services. In this post, we provide a step-by-step walkthrough of configuring AD FS as the IdP for SAML-based authentication with Athena to query data stored in Amazon S3, with access permissions defined using Lake Formation. This enables end-users to log in to their SQL client using Active Directory credentials and access data with fine-grained access permissions.

Solution overview

To build the solution, we start by establishing trust between AD FS and your AWS account. With this trust in place, AD users can federate into AWS using their AD credentials and assume permissions of an AWS Identity and Access Management (IAM) role to access AWS resources such as the Athena API.

To create this trust, you add AD FS as a SAML provider into your AWS account and create an IAM role that federated users can assume. On the AD FS side, you add AWS as a relying party and write SAML claim rules to send the right user attributes to AWS (specifically Lake Formation) for authorization purposes.

The steps in this post are structured into the following sections:

  1. Set up an IAM SAML provider and role.
  2. Configure AD FS.
  3. Create Active Directory users and groups.
  4. Create a database and tables in the data lake.
  5. Set up the Lake Formation permission model.
  6. Set up a SQL client with JDBC connection.
  7. Verify access permissions.

The following diagram provides an overview of the solution architecture.

The flow for the federated authentication process is as follows:

  1. The SQL client which has been configured with Active Directory credentials sends an authentication request to AD FS.
  2. AD FS authenticates the user using Active Directory credentials, and returns a SAML assertion.
  3. The client makes a call to Lake Formation, which initiates an internal call with AWS Security Token Service (AWS STS) to assume a role with SAML for the client.
  4. Lake Formation returns temporary AWS credentials with permissions of the defined IAM role to the client.
  5. The client uses the temporary AWS credentials to call the Athena API StartQueryExecution.
  6. Athena retrieves the table and associated metadata from the AWS Glue Data Catalog.
  7. On behalf of the user, Athena requests access to the data from Lake Formation (GetDataAccess). Lake Formation assumes the IAM role associated with the data lake location and returns temporary credentials.
  8. Athena uses the temporary credentials to retrieve data objects from Amazon S3.
  9. Athena returns the results to the client based on the defined access permissions.

For our use case, we use two sample tables:

  • LINEORDER – A fact table containing orders
  • CUSTOMER – A dimension table containing customer information including Personally Identifiable Information (PII) columns (c_name, c_phone, c_address)

We also have data consumer users who are members of the following teams:

  • CustomerOps – Can see both orders and customer information, including PII attributes of the customer
  • Finance – Can see orders for analytics and aggregation purposes but only non-PII attributes of the customer

To demonstrate this use case, we create two users called CustomerOpsUser and FinanceUser and three AD groups for different access patterns: data-customer (customer information access excluding PII attributes), data-customer-pii (full customer information access including PII attributes), and data-order (order information access). By adding the users to these three groups, we can grant the right level of access to different tables and columns.

Prerequisites

To follow along with this walkthrough, you must meet the following prerequisites:

Set up an IAM SAML provider and role

To set up your SAML provider, complete the following steps:

  1. In the IAM console, choose Identity providers in the navigation pane.
  2. Choose Add provider.
  3. For Provider Type, choose SAML.
  4. For Provider Name, enter adfs-saml-provider.
  5. For Metadata Document, download your AD FS server’s federation XML file by entering the following address in a browser with access to the AD FS server:
    https://<adfs-server-name>/FederationMetadata/2007-06/FederationMetadata.xml

  6. Upload the file to AWS by choosing Choose file.
  7. Choose Add provider to finish.

Now you’re ready to create a new IAM role.

  1. In the navigation pane, choose Roles.
  2. Choose Create role.
  3. For the type of trusted entity, choose SAML 2.0 federation.
  4. For SAML provider, choose the provider you created (adfs-saml-provider).
  5. Choose Allow programmatic and AWS Management Console access.
  6. The Attribute and Value fields should automatically populate with SAML:aud and https://signin.aws.amazon.com/saml.
  7. Choose Next:Permissions.
  8. Add the necessary IAM permissions to this role. For this post, attach AthenaFullAccess.

If the Amazon S3 location for your Athena query results doesn’t start with aws-athena-query-results, add another policy to allow users write query results into your Amazon S3 location. For more information, see Specifying a Query Result Location Using the Athena Console and Writing IAM Policies: How to Grant Access to an Amazon S3 Bucket.

  1. Leave the defaults in the next steps and for Role name, enter adfs-data-access.
  2. Choose Create role.
  3. Take note of the SAML provider and IAM role names to use in later steps when creating the trust between the AWS account and AD FS.

Configure AD FS

SAML-based federation has two participant parties: the IdP (Active Directory) and the relying party (AWS), which is the service or application that wants to use authentication from the IdP.

To configure AD FS, you first add a relying party trust, then you configure SAML claim rules for the relying party. Claim rules are the way that AD FS forms a SAML assertion sent to a relying party. The SAML assertion states that the information about the AD user is true, and that it has authenticated the user.

Add a relying party trust

To create your relying party in AD FS, complete the following steps:

  1. Log in to the AD FS server.
  2. On the Start menu, open ServerManger.
  3. On the Tools menu, choose the AD FS Management console.
  4. Under Trust Relationships in the navigation pane, choose Relying Party Trusts.
  5. Choose Add Relying Party Trust.
  6. Choose Start.
  7. Select Import data about the relying party published online or on a local network and enter the URL https://signin.aws.amazon.com/static/saml-metadata.xml.

The metadata XML file is a standard SAML metadata document that describes AWS as a relying party.

  1. Choose Next.
  2. For Display name, enter a name for your relying party.
  3. Choose Next.
  4. Select I do not want to configure multi-factor authentication.

For increased security, we recommend that you configure multi-factor authentication to help protect your AWS resources. We don’t enable multi-factor authentication for this post because we’re using a sample dataset.

  1. Choose Next.
  2. Select Permit all users to access this relying party and choose Next.

This allows all users in Active Directory to use AD FS with AWS as a relying party. You should consider your security requirements and adjust this configuration accordingly.

  1. Finish creating your relying party.

Configure SAML claim rules for the relying party

You create two sets of claim rules in this post. The first set (rules 1–4) contains AD FS claim rules that are required to assume an IAM role based on AD group membership. These are the rules that you also create if you want to establish federated access to the AWS Management Console. The second set (rules 5–6) are claim rules that are required for Lake Formation fine-grained access control.

To create AD FS claim rules, complete the following steps:

  1. On the AD FS Management console, find the relying party you created in the previous step.
  2. Right-click the relying party and choose Edit Claim Rules.
  3. Choose Add Rule and create your six new rules.
  4. Create claim rule 1, called NameID:
    1. For Rule template, use Transform an Incoming Claim.
    2. For Incoming claim type, choose Windows account name.
    3. For Outgoing claim type, choose Name ID.
    4. For Outgoing name ID format, choose Persistent Identifier.
    5. Select Pass through all claim values.
  5. Create claim rule 2, called RoleSessionName:
    1. For Rule template, use Send LDAP Attribute as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute E-Mail-Addresses and outgoing claim type https://aws.amazon.com/SAML/Attributes/RoleSessionName.
  6. Create claim rule 3, called Get AD Groups:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://schemas.microsoft.com/ws/2008/06/identity/claims/windowsaccountname", Issuer == "AD AUTHORITY"]
      => add(store = "Active Directory", types = ("http://temp/variable"), query = ";tokenGroups;{0}", param = c.Value);

  7. Create claim rule 4, called Roles:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code (enter your account number and name of the SAML provider you created earlier):
      c:[Type == "http://temp/variable", Value =~ "(?i)^aws-"]
      => issue(Type = "https://aws.amazon.com/SAML/Attributes/Role", Value = RegExReplace(c.Value, "aws-", "arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/<adfs-saml-provider>,arn:aws:iam::<AWS ACCOUNT NUMBER>:role/"));

Claim rules 5 and 6 allow Lake Formation to make authorization decisions based on user name or the AD group membership of the user.

  1. Create claim rule 5, called LF-UserName, which passes the user name and SAML assertion to Lake Formation:
    1. For Rule template, use Send LDAP Attributes as Claims.
    2. For Attribute store, choose Active Directory.
    3. For Mapping of LDAP attributes to outgoing claim types, add the attribute User-Principal-Name and outgoing claim type https://lakeformation.amazon.com/SAML/Attributes/Username.
  2. Create claim rule 6, called LF-Groups, which passes data and analytics-related AD groups that the user is a member of, along with the SAML assertion to Lake Formation:
    1. For Rule template, use Send Claims Using a Custom Rule.
    2. For Custom rule, enter the following code:
      c:[Type == "http://temp/variable", Value =~ "(?i)^data-"]
      => issue(Type = "https://lakeformation.amazon.com/SAML/Attributes/Groups", Value = c.Value);

The preceding rule snippet filters AD group names starting with data-. This is an arbitrary naming convention; you can adopt your preferred naming convention for AD groups that are related to data lake access.

Create Active Directory users and groups

In this section, we create two AD users and required AD groups to demonstrate varying levels of access to the data.

Create users

You create two AD users: FinanceUser and CustomerOpsUser. Each user corresponds to an individual who is a member of the Finance or Customer business units. The following table summarizes the details of each user.

 

FinanceUser CustomerOpsUser
First Name FinanceUser CustomerOpsUser
User logon name [email protected] [email protected]
Email [email protected] [email protected]

To create your users, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. In the navigation pane, choose Users.
  3. On the tool bar, choose the Create user icon.
  4. For First name, enter FinanceUser.
  5. For Full name, enter FinanceUser.
  6. For User logon name, enter [email protected].
  7. Choose Next.
  8. Enter a password and deselect User must change password at next logon.

We choose this option for simplicity, but in real-world scenarios, newly created users must change their password for security reasons.

  1. Choose Next.
  2. In Active Directory Users and Computers, choose the user name.
  3. For Email, enter [email protected].

Adding an email is mandatory because it’s used as the RoleSessionName value in the SAML assertion.

  1. Choose OK.
  2. Repeat these steps to create CustomerOpsUser.

Create AD groups to represent data access patterns

Create the following AD groups to represent three different access patterns and also the ability to assume an IAM role:

  • data-customer – Members have access to non-PII columns of the customer table
  • data-customer-pii – Members have access to all columns of the customer table, including PII columns
  • data-order – Members have access to the lineorder table
  • aws-adfs-data-access – Members assume the adfs-data-access IAM role when logging in to AWS

To create the groups, complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. On the tool bar, choose the Create new group icon.
  3. For Group name¸ enter data-customer.
  4. For Group scope, select Global.
  5. For Group type¸ select Security.
  6. Choose OK.
  7. Repeat these steps to create the remaining groups.

Add users to appropriate groups

Now you add your newly created users to their appropriate groups, as detailed in the following table.

User Group Membership Description
CustomerOpsUser data-customer-pii
data-order
aws-adfs-data-access
Sees all customer information including PII and their orders
FinanceUser data-customer
data-order
aws-adfs-data-access
Sees only non-PII customer data and orders

Complete the following steps:

  1. On the Server Manager Dashboard, on the Tools menu, choose Active Directory Users and Computers.
  2. Choose the user FinanceUser.
  3. On the Member Of tab, choose Add.
  4. Add the appropriate groups.
  5. Repeat these steps for CustomerOpsUser.

Create a database and tables in the data lake

In this step, you copy data files to an S3 bucket in your AWS account by running the following AWS Command Line Interface (AWS CLI) commands. For more information on how to set up the AWS CLI, refer to Configuration Basics.

These commands copy the files that contain data for customer and lineorder tables. Replace <BUCKET NAME> with the name of an S3 bucket in your AWS account.

aws s3 sync s3://awssampledb/load/ s3://<BUCKET NAME>/customer/ \
--exclude "*" --include "customer-fw.tbl-00*" --exclude "*.bak"

aws s3api copy-object --copy-source awssampledb/load/lo/lineorder-single.tbl000.gz \
--key lineorder/lineorder-single.tbl000.gz --bucket <BUCKET NAME> \
--tagging-directive REPLACE

For this post, we use the default settings for storing data and logging access requests within Amazon S3. You can enhance the security of your sensitive data with the following methods:

  • Implement encryption at rest using AWS Key Management Service (AWS KMS) and customer managed encryption keys
  • Use AWS CloudTrail and audit logging
  • Restrict access to AWS resources based on the least privilege principle

Additionally, Lake Formation is integrated with CloudTrail, a service that provides a record of actions taken by a user, role, or AWS service in Lake Formation. CloudTrail captures all Lake Formation API calls as events and is enabled by default when you create a new AWS account. When activity occurs in Lake Formation, that activity is recorded as a CloudTrail event along with other AWS service events in event history. For audit and access monitoring purposes, all federated user logins are logged via CloudTrail under the AssumeRoleWithSAML event name. You can also view specific user activity based on their user name in CloudTrail.

To create a database and tables in the Data Catalog, open the query editor on the Athena console and enter the following DDL statements. Replace <BUCKET NAME> with the name of the S3 bucket in your account.

CREATE DATABASE salesdata;
CREATE EXTERNAL TABLE salesdata.customer
(
    c_custkey VARCHAR(10),
    c_name VARCHAR(25),
    c_address VARCHAR(25),
    c_city VARCHAR(10),
    c_nation VARCHAR(15),
    c_region VARCHAR(12),
    c_phone VARCHAR(15),
    c_mktsegment VARCHAR(10)
)
-- The data files contain fixed width columns hence using RegExSerDe
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
    "input.regex" = "(.{10})(.{25})(.{25})(.{10})(.{15})(.{12})(.{15})(.{10})"
)
LOCATION 's3://<BUCKET NAME>/customer/';

CREATE EXTERNAL TABLE salesdata.lineorder(
  `lo_orderkey` int, 
  `lo_linenumber` int, 
  `lo_custkey` int, 
  `lo_partkey` int, 
  `lo_suppkey` int, 
  `lo_orderdate` int, 
  `lo_orderpriority` varchar(15), 
  `lo_shippriority` varchar(1), 
  `lo_quantity` int, 
  `lo_extendedprice` int, 
  `lo_ordertotalprice` int, 
  `lo_discount` int, 
  `lo_revenue` int, 
  `lo_supplycost` int, 
  `lo_tax` int, 
  `lo_commitdate` int, 
  `lo_shipmode` varchar(10))
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY '|' 
LOCATION 's3://<BUCKET NAME>/lineorder/';

Verify that tables are created and you can see the data:

SELECT * FROM "salesdata"."customer" limit 10;
SELECT * FROM "salesdata"."lineorder" limit 10;

Set up the Lake Formation permission model

Lake Formation uses a combination of Lake Formation permissions and IAM permissions to achieve fine-grained access control. The recommended approach includes the following:

  • Coarse-grained IAM permissions – These apply to the IAM role that users assume when running queries in Athena. IAM permissions control access to Lake Formation, AWS Glue, and Athena APIs.
  • Fine-grained Lake Formation grants – These control access to Data Catalog resources, Amazon S3 locations, and the underlying data at those locations. With these grants, you can give access to specific tables or only columns that contain specific data values.

Configure IAM role permissions

Earlier in the walkthrough, you created the IAM role adfs-data-access and attached the AWS managed IAM policy AthenaFullAccess to it. This policy has all the permissions required for the purposes of this post.

For more information, see the Data Analyst Permissions section in Lake Formation Personas and IAM Permissions Reference.

Register an S3 bucket as a data lake location

The mechanism to govern access to an Amazon S3 location using Lake Formation is to register a data lake location. Complete the following steps:

  1. On the Lake Formation console, choose Data lake locations.
  2. Choose Register location.
  3. For Amazon S3 path, choose Browse and locate your bucket.
  4. For IAM role, choose AWSServiceRoleForLakeFormationDataAccess.

In this step, you specify an IAM service-linked role, which Lake Formation assumes when it grants temporary credentials to integrated AWS services that access the data in this location. This role and its permissions are managed by Lake Formation and can’t be changed by IAM principals.

  1. Choose Register location.

Configure data permissions

Now that you have registered the Amazon S3 path, you can give AD groups appropriate permissions to access tables and columns in the salesdata database. The following table summarizes the new permissions.

Database and Table AD Group Name Table Permissions Data Permissions
salesdata.customer data-customer Select c_city, c_custkey, c_mktsegment, c_nation, and c_region
salesdata.customer data-customer-pii Select All data access
salesdata.lineorder data-order Select All data access
  1. On the Lake Formation console, choose Tables in the navigation pane.
  2. Filter tables by the salesdata database.
  3. Select the customer table and on the Actions menu, choose View permissions.

You should see following existing permissions. These entries allow the current data lake administrator to access the table and all its columns.

  1. To add new permissions, select the table and on the Actions menu, choose Grant.
  2. Select SAML user and groups.
  3. For SAML and Amazon QuickSight users and groups, enter arn:aws:iam::<AWS ACCOUNT NUMBER>:saml-provider/adfs-saml-provider:group/data-customer.

To get this value, get the ARN of the SAML provider from the IAM console and append :group/data-customer to the end of it.

  1. Select Named data catalog resources.
  2. For Databases, choose the salesdata database.
  3. For Tables, choose the customer table.
  4. For Table permissions, select Select.
  5. For Data permissions, select Column-based access.
  6. For Select columns, add the columns c_city, c_custkey, c_mktsegment, c_nation, and c_region.
  7. Choose Grant.

You have now allowed members of the AD group data-customer to have access to columns of the customer table that don’t include PII.

  1. Repeat these steps for the customer table and data-customer-pii group with all data access.
  2. Repeat these steps for the lineorder table and data-order group with all data access.

Set up a SQL client with JDBC connection and verify permissions

In this post, we use SQL Workbench to access Athena through AD authentication and verify the Lake Formation permissions you created in the previous section.

Prepare the SQL client

To set up the SQL client, complete the following steps:

  1. Download and extract the Lake Formation-compatible Athena JDBC driver with AWS SDK (2.0.14 or later version) from Using Athena with the JDBC Driver.
  2. Go to the SQL Workbench/J website and download the latest stable package.
  3. Install SQL Workbench/J on your client computer.
  4. In SQL Workbench, on the File menu, choose Manage Drivers.
  5. Choose the New driver icon.
  6. For Name, enter Athena JDBC Driver.
  7. For Library, browse to and choose the Simba Athena JDBC .jar file that you just downloaded.
  8. Choose OK.

You’re now ready to create connections in SQL Workbench for your users.

Create connections in SQL Workbench

To create your connections, complete the following steps:

  1. On the File menu, choose Connect.
  2. Enter the name Athena-FinanceUser.
  3. For Driver, choose the Simba Athena JDBC driver.
  4. For URL, enter the following code (replace the placeholders with actual values from your setup and remove the line breaks to make a single line connection string):
jdbc:awsathena://AwsRegion=<AWS Region Name e.g. ap-southeast-2>;
S3OutputLocation=s3://<Athena Query Result Bucket Name>/jdbc;
plugin_name=com.simba.athena.iamsupport.plugin.AdfsCredentialsProvider;
idp_host=<adfs-server-name e.g. adfs.company.com>;
idp_port=443;
preferred_role=<ARN of the role created in step1 e.g. arn>;
[email protected]<Domain Name e.g. company.com>;
password=<password>;
SSL_Insecure=true;
LakeFormationEnabled=true;

For this post, we used a self-signed certificate with AD FS. This certificate is not trusted by the client, therefore authentication doesn’t succeed. This is why the SSL_Insecure attribute is set to true to allow authentication despite the self-signed certificate. In real-world setups, you would use valid trusted certificates and can remove the SSL_Insecure attribute.

  1. Create a new SQL workbench profile named Athena-CustomerOpsUser and repeat the earlier steps with CustomerOpsUser in the connection URL string.
  2. To test the connections, choose Test for each user, and confirm that the connection succeeds.

Verify access permissions

Now we can verify permissions for FinanceUser. In the SQL Workbench Statement window, run the following SQL SELECT statement:

SELECT * FROM "salesdata"."lineorder" limit 10;
SELECT * FROM "salesdata"."customer" limit 10;

Verify that only non-PII columns are returned from the customer table.

As you see in the preceding screenshots, FinanceUser only has access to non-PII columns of the customer table and full access to (all columns) of the lineorder table. This allows FinanceUser, for example, to run aggregate and summary queries based on market segment or location of customers without having access to their personal information.

Run a similar query for CustomerOpsUser. You should be able to see all columns, including columns containing PII, in the customer table.

Conclusion

This post demonstrated how to configure your data lake permissions using Lake Formation for AD users and groups. We configured AD FS 3.0 on your Active Directory and used it as an IdP to federate into AWS using SAML. This post also showed how you can integrate your Athena JDBC driver to AD FS and use your AD credentials directly to connect to Athena.

Integrating your Active Directory with the Athena JDBC driver gives you the flexibility to access Athena from business intelligence tools you’re already familiar with to analyze the data in your Amazon S3 data lake. This enables you to have a consistent central permission model that is managed through AD users and their group memberships.


About the Authors

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

Praveen Kumar is a Specialist Solution Architect at AWS with expertise in designing, building, and implementing modern data and analytics platforms using cloud-native services. His areas of interests are serverless technology, streaming applications, and modern cloud data warehouses.

How NerdWallet uses AWS and Apache Hudi to build a serverless, real-time analytics platform

Post Syndicated from Kevin Chun original https://aws.amazon.com/blogs/big-data/how-nerdwallet-uses-aws-and-apache-hudi-to-build-a-serverless-real-time-analytics-platform/

This is a guest post by Kevin Chun, Staff Software Engineer in Core Engineering at NerdWallet.

NerdWallet’s mission is to provide clarity for all of life’s financial decisions. This covers a diverse set of topics: from choosing the right credit card, to managing your spending, to finding the best personal loan, to refinancing your mortgage. As a result, NerdWallet offers powerful capabilities that span across numerous domains, such as credit monitoring and alerting, dashboards for tracking net worth and cash flow, machine learning (ML)-driven recommendations, and many more for millions of users.

To build a cohesive and performant experience for our users, we need to be able to use large volumes of varying user data sourced by multiple independent teams. This requires a strong data culture along with a set of data infrastructure and self-serve tooling that enables creativity and collaboration.

In this post, we outline a use case that demonstrates how NerdWallet is scaling its data ecosystem by building a serverless pipeline that enables streaming data from across the company. We iterated on two different architectures. We explain the challenges we ran into with the initial design and the benefits we achieved by using Apache Hudi and additional AWS services in the second design.

Problem statement

NerdWallet captures a sizable amount of spending data. This data is used to build helpful dashboards and actionable insights for users. The data is stored in an Amazon Aurora cluster. Even though the Aurora cluster works well as an Online Transaction Processing (OLTP) engine, it’s not suitable for large, complex Online Analytical Processing (OLAP) queries. As a result, we can’t expose direct database access to analysts and data engineers. The data owners have to solve requests with new data derivations on read replicas. As the data volume and the diversity of data consumers and requests grow, this process gets more difficult to maintain. In addition, data scientists mostly require data files access from an object store like Amazon Simple Storage Service (Amazon S3).

We decided to explore alternatives where all consumers can independently fulfill their own data requests safely and scalably using open-standard tooling and protocols. Drawing inspiration from the data mesh paradigm, we designed a data lake based on Amazon S3 that decouples data producers from consumers while providing a self-serve, security-compliant, and scalable set of tooling that is easy to provision.

Initial design

The following diagram illustrates the architecture of the initial design.

The design included the following key components:

  1. We chose AWS Data Migration Service (AWS DMS) because it’s a managed service that facilitates the movement of data from various data stores such as relational and NoSQL databases into Amazon S3. AWS DMS allows one-time migration and ongoing replication with change data capture (CDC) to keep the source and target data stores in sync.
  2. We chose Amazon S3 as the foundation for our data lake because of its scalability, durability, and flexibility. You can seamlessly increase storage from gigabytes to petabytes, paying only for what you use. It’s designed to provide 11 9s of durability. It supports structured, semi-structured, and unstructured data, and has native integration with a broad portfolio of AWS services.
  3. AWS Glue is a fully managed data integration service. AWS Glue makes it easier to categorize, clean, transform, and reliably transfer data between different data stores.
  4. Amazon Athena is a serverless interactive query engine that makes it easy to analyze data directly in Amazon S3 using standard SQL. Athena scales automatically—running queries in parallel—so results are fast, even with large datasets, high concurrency, and complex queries.

This architecture works fine with small testing datasets. However, the team quickly ran into complications with the production datasets at scale.

Challenges

The team encountered the following challenges:

  • Long batch processing time and complexed transformation logic – A single run of the Spark batch job took 2–3 hours to complete, and we ended up getting a fairly large AWS bill when testing against billions of records. The core problem was that we had to reconstruct the latest state and rewrite the entire set of records per partition for every job run, even if the incremental changes were a single record of the partition. When we scaled that to thousands of unique transactions per second, we quickly saw the degradation in transformation performance.
  • Increased complexity with a large number of clients – This workload contained millions of clients, and one common query pattern was to filter by single client ID. There were numerous optimizations that we were forced to tack on, such as predicate pushdowns, tuning the Parquet file size, using a bucketed partition scheme, and more. As more data owners adopted this architecture, we would have to customize each of these optimizations for their data models and consumer query patterns.
  • Limited extendibility for real-time use cases – This batch extract, transform, and load (ETL) architecture wasn’t going to scale to handle hourly updates of thousands of records upserts per second. In addition, it would be challenging for the data platform team to keep up with the diverse real-time analytical needs. Incremental queries, time-travel queries, improved latency, and so on would require heavy investment over a long period of time. Improving on this issue would open up possibilities like near-real-time ML inference and event-based alerting.

With all these limitations of the initial design, we decided to go all-in on a real incremental processing framework.

Solution

The following diagram illustrates our updated design. To support real-time use cases, we added Amazon Kinesis Data Streams, AWS Lambda, Amazon Kinesis Data Firehose and Amazon Simple Notification Service (Amazon SNS) into the architecture.

The updated components are as follows:

  1. Amazon Kinesis Data Streams is a serverless streaming data service that makes it easy to capture, process, and store data streams. We set up a Kinesis data stream as a target for AWS DMS. The data stream collects the CDC logs.
  2. We use a Lambda function to transform the CDC records. We apply schema validation and data enrichment at the record level in the Lambda function. The transformed results are published to a second Kinesis data stream for the data lake consumption and an Amazon SNS topic so that changes can be fanned out to various downstream systems.
  3. Downstream systems can subscribe to the Amazon SNS topic and take real-time actions (within seconds) based on the CDC logs. This can support use cases like anomaly detection and event-based alerting.
  4. To solve the problem of long batch processing time, we use Apache Hudi file format to store the data and perform streaming ETL using AWS Glue streaming jobs. Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and data pipeline development. Hudi allows you to build streaming data lakes with incremental data pipelines, with support for transactions, record-level updates, and deletes on data stored in data lakes. Hudi integrates well with various AWS analytics services such as AWS Glue, Amazon EMR, and Athena, which makes it a straightforward extension of our previous architecture. While Apache Hudi solves the record-level update and delete challenges, AWS Glue streaming jobs convert the long-running batch transformations into low-latency micro-batch transformations. We use the AWS Glue Connector for Apache Hudi to import the Apache Hudi dependencies in the AWS Glue streaming job and write transformed data to Amazon S3 continuously. Hudi does all the heavy lifting of record-level upserts, while we simply configure the writer and transform the data into Hudi Copy-on-Write table type. With Hudi on AWS Glue streaming jobs, we reduce the data freshness latency for our core datasets from hours to under 15 minutes.
  5. To solve the partition challenges for high cardinality UUIDs, we use the bucketing technique. Bucketing groups data based on specific columns together within a single partition. These columns are known as bucket keys. When you group related data together into a single bucket (a file within a partition), you significantly reduce the amount of data scanned by Athena, thereby improving query performance and reducing cost. Our existing queries are filtered on the user ID already, so we significantly improve the performance of our Athena usage without having to rewrite queries by using bucketed user IDs as the partition scheme. For example, the following code shows total spending per user in specific categories:
    SELECT ID, SUM(AMOUNT) SPENDING
    FROM "{{DATABASE}}"."{{TABLE}}"
    WHERE CATEGORY IN (
    'ENTERTAINMENT',
    'SOME_OTHER_CATEGORY')
    AND ID_BUCKET ='{{ID_BUCKET}}'
    GROUP BY ID;

  1. Our data scientist team can access the dataset and perform ML model training using Amazon SageMaker.
  2. We maintain a copy of the raw CDC logs in Amazon S3 via Amazon Kinesis Data Firehose.

Conclusion

In the end, we landed on a serverless stream processing architecture that can scale to thousands of writes per second within minutes of freshness on our data lakes. We’ve rolled out to our first high-volume team! At our current scale, the Hudi job is processing roughly 1.75 MiB per second per AWS Glue worker, which can automatically scale up and down (thanks to AWS Glue auto scaling). We’ve also observed an outstanding improvement of end-to-end freshness at less than 5 minutes due to Hudi’s incremental upserts vs. our first attempt.

With Hudi on Amazon S3, we’ve built a high-leverage foundation to personalize our users’ experiences. Teams that own data can now share their data across the organization with reliability and performance characteristics built into a cookie-cutter solution. This enables our data consumers to build more sophisticated signals to provide clarity for all of life’s financial decisions.

We hope that this post will inspire your organization to build a real-time analytics platform using serverless technologies to accelerate your business goals.


About the authors

Kevin Chun is a Staff Software Engineer in Core Engineering at NerdWallet. He builds data infrastructure and tooling to help NerdWallet provide clarity for all of life’s financial decisions.

Dylan Qu is a Specialist Solutions Architect focused on big data and analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Use Amazon Athena parameterized queries to provide data as a service

Post Syndicated from Blayze Stefaniak original https://aws.amazon.com/blogs/big-data/use-amazon-athena-parameterized-queries-to-provide-data-as-a-service/

Amazon Athena now provides you more flexibility to use parameterized queries for any query you send to Athena, and we recommend you use them as the best practice for your Athena queries moving forward so you benefit from the security, reusability, and simplicity they offer. In a previous post, Improve reusability and security using Amazon Athena parameterized queries, we explained how parameterized queries with prepared statements provide reusability of queries, protection against SQL injection, and masking of query strings from AWS CloudTrail events. In this post, we explain how you can run Athena parameterized queries using the ExecutionParameters property in your StartQueryExecution requests. We provide a sample application you can reference for using parameterized queries, with and without prepared statements. Athena parameterized queries can be integrated into many data driven applications, and we walk you through a sample data as a service application to see how parameterized queries can plug in.

Customers tell us they are finding new ways to make effective use of their data assets by providing data as a service (DaaS). In this post, we share a sample architecture using parameterized queries applied in the form of a DaaS application. This is helpful for many types of organizations, whether you’re working with an enterprise making data available to other lines of business, a regulator making reports available to your industry, a company monetizing your data assets, an independent software vendor (ISV) enabling your applications’ tenants to query their data when they need it, or trying to share data at scale in other ways. In DaaS applications, you can provide predefined queries to run against your governed datasets with values your users input. You can expand your DaaS application to break away from monolithic data infrastructure by treating data as a product (DaaP) and providing a distribution of datasets, which have distinct domain-specific data pipelines. You can authorize these datasets to consumers in your DaaS application permissions. You can use Athena parameterized queries as a way to predefine your queries, which you can use to run queries across your datasets, and serve as a layer of protection for your DaaS applications. This post first describes how parameterized queries work, then applies parameterized queries in the form of a DaaS application.

Feature overview

In any query you send to Athena, you can use positional parameters declared by a question mark (?) in your query string, then declare values as execution parameters sequentially in your StartQueryExecution request. You can use execution parameters with your existing prepared statements and also with any SQL queries in Athena. You can still take advantage of the reusability and security benefits of parameterized queries, and using execution parameters also masks your query’s parameters when viewing recent queries in Athena. You can also change from building SQL query strings manually to using execution parameters; this allows you to run parameterized queries without needing to first create prepared statements. For more information on using execution parameters, refer to StartQueryExecution.

Previously, you could only run parameterized queries by first creating prepared statements in your Athena workgroup, then running parameterized queries while passing variables into an EXECUTE SQL statement with the USING clause. You are no longer required to create and maintain prepared statements across all of your Athena workgroups to take advantage of parameterization. This is helpful if you run the same queries across multiple workgroups or otherwise do not need the prepared statements feature.

You can continue to use Athena workgroups to isolate, implement individual cost constraints, and track query-related metrics for tenants within your multi-tenant application. For example, your DaaS application’s customers can run the same queries against your dataset with separate workgroups. For more information on Athena workgroups, refer to Using workgroups for running queries.

Changing your code to use parameterized queries

Changing your existing code to use parameterized queries is a small change which will have an immediate positive impact. Previously, you were required to build your query string value manually using environment variables as parameter placeholders. Manipulating the query string can be burdensome and has an inherent risk for injecting undesired values or SQL fragments (such as SQL operators), regardless of intent. You can now replace variables in your query string with a question mark (?), and declare your variable values sequentially with the ExecutionParameters option. By doing so, you take advantage of the security benefits of parameterized queries, and your queries are less complicated to author and maintain. The syntax change is shown in the following code, using the AWS Command Line Interface (AWS CLI) as an example.

Previously, running queries against Athena without execution parameters:

aws athena start-query-execution \
--query-string "SELECT * FROM table WHERE x = $ARG1 AND y = $ARG2 AND z = $ARG3" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup

Now, running parameterized queries against Athena with execution parameters:

aws athena start-query-execution \
--query-string "SELECT * FROM table WHERE x = ? AND y = ? AND z = ?" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup \
--execution-parameters $ARG1 $ARG2 $ARG3

The following is an example of a command that creates a prepared statement in your Athena workgroup. To learn more about creating prepared statements, refer to Querying with prepared statements.

aws athena start-query-execution \
--query-string "PREPARE my-prepared-statement FROM SELECT * FROM table WHERE x = ? AND y = ? AND z = ?" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup

Previously, running parameterized queries against prepared statements without execution parameters:

aws athena start-query-execution \
--query-string "EXECUTE my-prepared-statement USING $ARG1, $ARG2, $ARG3“ \
--query-execution-context "Database"="default" \
--work-group myWorkGroup

Now, running parameterized queries against prepared statements with execution parameters:

aws athena start-query-execution \
--query-string "EXECUTE my-prepared-statement" \
--query-execution-context "Database"="default" \
--work-group myWorkGroup \
--execution-parameters $ARG1 $ARG2 $ARG3

Sample architecture

The purpose of this sample architecture is to apply the ExecutionParameters feature when running Athena queries, with and without prepared statements. This is not intended to be a DaaS solution for use with your production data.

This sample architecture exhibits a DaaS application with a user interface (UI) that presents three Athena parameterized queries written against the public Amazon.com customer reviews dataset. The following figure depicts this workflow when a user submits a query to Athena. This example uses AWS Amplify to host a front-end application. The application calls an Amazon API Gateway HTTP API, which invokes AWS Lambda functions to authenticate requests, fetch the Athena prepared statements and named queries, and run the parameterized queries against Athena. The Lambda function uses the name of the Athena workgroup, statement name, statement type (prepared statement or not), and a list of query parameters input by the user. Athena queries data in an Amazon Simple Storage Service (Amazon S3), bucket which is cataloged in AWS Glue, and presents results to the user on the DaaS application UI.

Diagram showing the process of using a sample DaaS web application. Web Application Users use an Amplify application to run Athena parameterized queries. The application sends HTTP requests to API Gateway. API Gateway authenticates incoming requests with a Lambda function. API Gateway processes the request to start the query against Athena. Athena uses Glue Data Catalog and queries data from an S3 bucket. The query results are stored in an S3 bucket, and presented to the Web Application Users.

End-users of the DaaS application UI can run only parameterized queries against Athena. The DaaS application UI demonstrates two ways to run parameterized queries with execution parameters: with and without prepared statements. In both cases, the Lambda function submits the query, waits for the query to complete, and provides the results that match the query parameters. The following figure depicts the DaaS application UI.

Screenshot of the application divided into two sections, one for querying prepared statements another without prepared statements. Both sections include a Workgroup name selector, statement selector, statement description, statement SQL query string, input fields to enter parameter arguments, and a button to launch the query. Selected on the screenshot is the sample workgroup created by the CloudFormation template, and a count of reviews in a given product category sample query statement. User entered ‘Video_Games’ as the product category.

You may want your users to have the ability to list all Athena prepared statements within your Athena workgroup, select a statement, input arguments, and run the query; on the left side of the DaaS application UI, you use an EXECUTE statement to query the data lake with an Athena prepared statement. You may have several reporting queries maintained in your code base. In this case, your users select a statement, input arguments, and run the query. On the right side of the DaaS application UI, you use a SELECT statement to use Athena parameterized queries without prepared statements.

Prerequisites

This post uses the following AWS services to demonstrate a DaaS architecture pattern that uses Athena to query the Amazon.com customer reviews dataset:

This post assumes you have the following:

Deploy the CloudFormation stack

In this section, you deploy a CloudFormation template that creates the following resources:

  • AWS Glue Data Catalog database
  • AWS Glue Data Catalog table
  • An Athena workgroup
  • Three Athena prepared statements
  • Three Athena named queries
  • The API Gateway HTTP API
  • The Lambda execution role for Athena queries
  • The Lambda execution role for API Gateway HTTP API authorization
  • Five Lambda functions:
    • Update the AWS Glue Data Catalog
    • Authorize API Gateway requests
    • Submit Athena queries
    • List Athena prepared statements
    • List Athena named queries

Note that this CloudFormation template was tested in AWS Regions ap-southeast-2, ca-central-1, eu-west-2, us-east-1, us-east-2, and us-west-2. Note that deploying this into your AWS account will incur cost. Steps for cleaning up the resources are included later in this post.

To deploy the CloudFormation stack, follow these steps:

  1. Navigate to this post’s GitHub repository.
  2. Clone the repository or copy the CloudFormation template athena-parameterized-queries.yaml.
  3. On the AWS CloudFormation console, choose Create stack.
  4. Select Upload a template file and choose Choose file.
  5. Upload athena-parameterized-queries.yaml, then choose Next.
  6. On the Specify stack details page, enter the stack name athena-parameterized-queries.
  7. On the same page, there are two parameters:
    1. For S3QueryResultsBucketName, enter the S3 bucket name in your AWS account and in the same AWS Region as where you’re running your CloudFormation stack. (For this post, we use the bucket name value, like my-bucket).
    2. For APIPassphrase, enter a passphrase to authenticate API requests. You use this later.
  8. Choose Next.
  9. On the Configure stack options page, choose Next.
  10. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names, and choose Create stack.

The script takes less than two minutes to run and change to a CREATE_COMPLETE state. If you deploy the stack twice in the same AWS account and Region, some resources may already exist, and the process fails with a message indicating the resource already exists in another template.

  1. On the Outputs tab, copy the APIEndpoint value to use later.

For least-privilege authorization for deployment of the CloudFormation template, you can create an AWS CloudFormation service role with the following IAM policy actions. To do this, you must create an IAM policy and IAM role, and choose this role when configuring stack options. You need to replace the values for ${Partition}, ${AccountId}, and ${Region} with your own values; for more information on these values, refer to Pseudo parameters reference.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "IAM",
            "Effect": "Allow",
            "Action": [
                "iam:GetRole",
                "iam:UntagRole",
                "iam:TagRole",
                "iam:CreateRole",
                "iam:DeleteRole",
                "iam:PassRole",
                "iam:GetRolePolicy",
                "iam:PutRolePolicy",
                "iam:AttachRolePolicy",
                "iam:TagPolicy",
                "iam:DeleteRolePolicy",
                "iam:DetachRolePolicy",
                "iam:UntagPolicy"
            ],
            "Resource": [
                "arn:${Partition}:iam::${AccountId}:role/LambdaAthenaExecutionRole-athena-parameterized-queries",
                "arn:${Partition}:iam::${AccountId}:role/service-role/LambdaAthenaExecutionRole-athena-parameterized-queries",
                "arn:${Partition}:iam::${AccountId}:role/service-role/LambdaAuthorizerExecutionRole-athena-parameterized-queries",
                "arn:${Partition}:iam::${AccountId}:role/LambdaAuthorizerExecutionRole-athena-parameterized-queries"
            ]
        },
        {
            "Sid": "LAMBDA",
            "Effect": "Allow",
            "Action": [
                "lambda:CreateFunction",
                "lambda:GetFunction",
                "lambda:InvokeFunction",
                "lambda:AddPermission",
                "lambda:DeleteFunction",
                "lambda:RemovePermission",
                "lambda:UpdateFunctionConfiguration"
            ],
            "Resource": [
                "arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaRepairFunction-athena-parameterized-queries",
                "arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAthenaFunction-athena-parameterized-queries",
                "arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAuthorizerFunction-athena-parameterized-queries",
                "arn:${Partition}:lambda:${Region}:${AccountId}:function:GetPrepStatements-athena-parameterized-queries",
                "arn:${Partition}:lambda:${Region}:${AccountId}:function:GetNamedQueries-athena-parameterized-queries"
            ]
        },
        {
            "Sid": "ATHENA",
            "Effect": "Allow",
            "Action": [
                "athena:GetWorkGroup",
                "athena:CreateWorkGroup",
                "athena:DeleteWorkGroup",
                "athena:DeleteNamedQuery",
                "athena:CreateNamedQuery",
                "athena:CreatePreparedStatement",
                "athena:DeletePreparedStatement",
                "athena:GetPreparedStatement"
            ],
            "Resource": [
                "arn:${Partition}:athena:${Region}:${AccountId}:workgroup/ParameterizedStatementsWG"
            ]
        },
        {
            "Sid": "GLUE",
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase",
                "glue:DeleteDatabase",
                "glue:CreateTable",
                "glue:DeleteTable"
            ],
            "Resource": [
                "arn:${Partition}:glue:${Region}:${AccountId}:catalog",
                "arn:${Partition}:glue:${Region}:${AccountId}:database/athena_prepared_statements",
                "arn:${Partition}:glue:${Region}:${AccountId}:table/athena_prepared_statements/*",
                "arn:${Partition}:glue:${Region}:${AccountId}:userDefinedFunction/athena_prepared_statements/*"
            ]
        },
        {
            "Sid": "APIGATEWAY",
            "Effect": "Allow",
            "Action": [
                "apigateway:DELETE",
                "apigateway:PUT",
                "apigateway:PATCH",
                "apigateway:POST",
                "apigateway:TagResource",
                "apigateway:UntagResource"
            ],
            "Resource": [
                "arn:${Partition}:apigateway:${Region}::/apis/*/integrations*",
                "arn:${Partition}:apigateway:${Region}::/apis/*/stages*",
                "arn:${Partition}:apigateway:${Region}::/apis/*/authorizers*",
                "arn:${Partition}:apigateway:${Region}::/apis/*/routes*",
                "arn:${Partition}:apigateway:${Region}::/tags/arn%3Aaws%3Aapigateway%3A${Region}%3A%3A%2Fv2%2Fapis%2F*"
            ]
        },
        {
            "Sid": "APIGATEWAYMANAGEAPI",
            "Effect": "Allow",
            "Action": [
                "apigateway:DELETE",
                "apigateway:PUT",
                "apigateway:PATCH",
                "apigateway:POST",
                "apigateway:GET"
            ],
            "Resource": [
                "arn:${Partition}:apigateway:${Region}::/apis"
            ],
            "Condition": {
                "StringEquals": {
                    "apigateway:Request/ApiName": "AthenaAPI-athena-parameterized-queries"
                }
            }
        },
        {
            "Sid": "APIGATEWAYMANAGEAPI2",
            "Effect": "Allow",
            "Action": [
                "apigateway:DELETE",
                "apigateway:PUT",
                "apigateway:PATCH",
                "apigateway:POST",
                "apigateway:GET"
            ],
            "Resource": [
                "arn:${Partition}:apigateway:${Region}::/apis/*"
            ],
            "Condition": {
                "StringEquals": {
                    "apigateway:Resource/ApiName": "AthenaAPI-athena-parameterized-queries"
                }
            }
        },
        {
            "Sid": "APIGATEWAYGET",
            "Effect": "Allow",
            "Action": [
                "apigateway:GET"
            ],
            "Resource": [
                "arn:${Partition}:apigateway:${Region}::/apis/*"
            ]
        },
        {
            "Sid": "LAMBDALAYER",
            "Effect": "Allow",
            "Action": [
                "lambda:GetLayerVersion"
            ],
            "Resource": [
                "arn:${Partition}:lambda:*:280475519630:layer:boto3-1_24*"
            ]
        }
    ]
}

After you create the CloudFormation stack, you use the AWS management console to deploy an Amplify application and view the Lambda functions. The following is the scoped-down IAM policy that you can attach to an IAM user or role to perform these operations:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "AmplifyCreateApp",
            "Effect": "Allow",
            "Action": [
                "amplify:CreateBranch",
                "amplify:StartDeployment",
                "amplify:CreateDeployment",
                "amplify:CreateApp",
                "amplify:StartJob"
            ],
            "Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
        },
        {
            "Sid": "AmplifyList",
            "Effect": "Allow",
            "Action": "amplify:List*",
            "Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
        },
        {
            "Sid": "AmplifyGet",
            "Effect": "Allow",
            "Action": "amplify:GetJob",
            "Resource": "arn:${Partition}:amplify:${Region}:${AccountId}:apps/*"
        },
        {
            "Sid": "LambdaList",
            "Effect": "Allow",
            "Action": [
                "lambda:GetAccountSettings",
                "lambda:ListFunctions"
            ],
            "Resource": "*"
        },
        {
            "Sid": "LambdaFunction",
            "Effect": "Allow",
            "Action": [
                "lambda:GetFunction"
            ],
            "Resource": "arn:${Partition}:lambda:${Region}:${AccountId}:function:LambdaAthenaFunction-athena-parameterized-queries"
        }
    ]
}

Note that you need the following IAM policy when deploying your Amplify application to set a global password, and when cleaning up your resources to delete the Amplify application. Remember to replace ${AppARN} with the ARN of the Amplify application. You can find the ARN after creating the Amplify app on the General tab in the App Settings section of the Amplify console.

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Sid": "UpdateAndDeleteAmplifyApp",
           "Effect": "Allow",
            "Action": [
                "amplify:DeleteApp",
                "amplify:UpdateApp"
            ],
           "Resource": "${AppARN}"
       }
   ]
}

Deploy the Amplify application

In this section, you deploy your Amplify application.

  1. In the cloned repository, open web-application/.env in a text editor.
  2. Set AWS_API_ENDPOINT as the APIEndpoint value from the CloudFormation stack Outputs For example: AWS_API_ENDPOINT="https://123456abcd.execute-api.your-region.amazonaws.com".
  3. Set API_AUTH_CODE as the value you input as the CloudFormation stack’s APIPassphrase parameter argument. For example: API_AUTH_CODE="YOUR_PASSPHRASE".
  4. Navigate to the web-application/ directory and run npm install.
  5. Run npm run build to compile distribution assets.
  6. On the Amplify console, choose All apps.
  7. Choose New app.
  8. Select Host web app, select Deploy without Git provider, then choose Continue.
  9. For App name, enter Athena Parameterized Queries App.
  10. For Environment name¸ you don’t need to enter a value.
  11. Select Drag and Drop.
  12. Locate the dist/ directory inside web-application/, drag it into the window and drop it. Ensure you drag the entire directory, not the files within it.Screen shot depicting dragging and dropping the "dist/" directory into Amplify's drag and drop area for the Athena Parameterized Queries App.
  13. Choose Save and deploy to deploy the web application on Amplify.

This step takes less than a minute to complete.

  1. Under App settings, choose Access control, then choose Manage access.
  2. Select Apply a global password, then enter values for Username and Password.

You use these credentials to access your Amplify application.

Access your Amplify application and run queries

In this section, you use the Amplify application to run Athena parameterized queries against the Amazon.com customer reviews dataset. The left side of the application shows how you can run parameterized queries using Athena prepared statements. The right side of the application shows how you can run parameterized queries without prepared statements, such as if the queries are written in your code. The sample in this post uses named queries within the Athena workgroup. For more information about named queries, refer to NamedQuery.

  1. Open the Amplify web application link located under Domain. For example: https://dev123.abcd12345xyz.amplifyapp.com/.
  2. In the Sign in prompt, enter the user name and password you provided as the Amplify application global password.
  3. For Workgroup Name, choose the ParameterizedStatementsWG workgroup.
  4. Choose a statement example on the Prepared Statement or SQL Statement drop-down menu.

Selecting a statement displays a description about the query, including examples of parameters you can try with this statement, and the original SQL query string. SQL parameters of type string must be surrounded by single quotes, for example: 'your_string_value'.

  1. Enter your query parameters.

The following figure shows an example of the parameters to input for the product_helpful_reviews prepared statement.

Screenshot of the Athena prepared statements window in the DaaS application. The sample workgroup created by the CloudFormation template is selected. A sample query is selected, which retrieves customer reviews for a given product id based on the review's star rating and count of helpful votes. The user entered ‘BT00DDVMVQ’ as the product id value, 4 as the star rating value, and 10 as the value for minimum count of helpful votes.

  1. Choose Run Query to send the query request to the API endpoint.

After the query runs, the sample application presents the results in a table format, as depicted in the following screenshot. This is one of many ways to present results, and your application can display results in the format which makes the most sense for your users. The complete query workflow is depicted in the previous architecture diagram.

Screenshot of the sample application's query results rendered in a table format. The table has columns for product_id, product_title, star_rating, helpful_votes, review_headline, and review_body. The query returned two results, which are 4 star reviews for the Amazon Smile eGift Card.

Using execution parameters with the AWS SDK for Python (Boto3)

In this section, you inspect the Lambda function code for using the StartQueryExecution API with and without prepared statements.

  1. On the Lambda console, choose Functions.
  2. Navigate to the LambdaAthenaFunction-athena-parameterized-queries function.
  3. Choose the Code Source window.

Examples of passing parameters to the Athena StartQueryExecution API using the AWS SDK for Python (Boto3) begin on lines 39 and 49. Note the ExecutionParameters option on lines 45 and 55.

The following code uses execution parameters with Athena prepared statements:

response = athena.start_query_execution(
    QueryString=f'EXECUTE {statement}', # Example: "EXECUTE prepared_statement_name"
    WorkGroup=workgroup,
    QueryExecutionContext={
        'Database': 'athena_prepared_statements'
    },
    ExecutionParameters=input_parameters
)

The following code uses execution parameters without Athena prepared statements:

response = athena.start_query_execution(
    QueryString=statement, # Example: "SELECT * FROM TABLE WHERE parameter_name = ?"
    WorkGroup=workgroup,
    QueryExecutionContext={
        'Database': 'athena_prepared_statements'
    },
    ExecutionParameters=input_parameters
)

Clean up

In this post, you created several components, which generate cost. To avoid incurring future charges, remove the resources with the following steps:

  1. Delete the S3 bucket’s results prefix created after you ran a query on your workgroup.

With the default template, the prefix is named <S3QueryResultsBucketName>/athena-results. Use caution in this step. Unless you are using versioning on your S3 bucket, deleting S3 objects can’t be undone.

  1. On the Amplify console, select the app to delete and on the Actions menu, choose Delete app, then confirm.
  2. On the AWS CloudFormation console, select the stack to delete, choose Delete, and confirm.

Conclusion

In this post, we showed how you can build a DaaS application using Athena parameterized queries. The StartQueryExecution API in Athena now supports execution parameters, which allows you to run any Athena query as a parameterized query. You can decouple your execution parameters from your query strings, and use parameterized queries without being limited to the Athena workgroups where you have created prepared statements. You can take advantage of the security benefits Athena offers with parameterized queries, and developers no longer need to build query strings manually. In this post, you learned how to use execution parameters, and you deployed a DaaS reference architecture to see how parameterized queries can be applied.

You can get started with Athena parameterized queries by using the Athena console, the AWS CLI, or the AWS SDK. To learn more about Athena, refer to the Amazon Athena User Guide.

Thanks for reading this post! If you have questions about Athena prepared statements and parameterized queries, don’t hesitate to leave a comment.


About the Authors

Blayze Stefaniak is a Senior Solutions Architect for the Technical Strategist Program supporting Executive Customer Programs in AWS Marketing. He has experience working across industries including healthcare, automotive, and public sector. He is passionate about breaking down complex situations into something practical and actionable. In his spare time, you can find Blayze listening to Star Wars audiobooks, trying to make his dogs laugh, and probably talking on mute.

Daniel Tatarkin is a Solutions Architect at Amazon Web Services (AWS) supporting Federal Financial organizations. He is passionate about big data analytics and serverless technologies. Outside of work, he enjoys learning about personal finance, coffee, and trying out new programming languages for fun.

Matt Boyd is a Senior Solutions Architect at AWS working with federal financial organizations. He is passionate about effective cloud management and governance, as well as data governance strategies. When he’s not working, he enjoys running, weight lifting, and teaching his elementary-age son ethical hacking skills.

Analyzing Amazon SES event data with AWS Analytics Services

Post Syndicated from Oscar Mendoza original https://aws.amazon.com/blogs/messaging-and-targeting/analyzing-amazon-ses-event-data-with-aws-analytics-services/

In this post, we will walk through using AWS Services, such as, Amazon Kinesis Firehose, Amazon Athena and Amazon QuickSight to monitor Amazon SES email sending events with the granularity and level of detail required to get insights from your customers engage with the emails you send.

Nowadays, email Marketers rely on internal applications to create their campaigns or any communications requirements, such us newsletters or promotional content. From those activities, they need to collect as much information as possible to analyze and improve their pipeline to get better interaction with the customers. Data such us bounces, rejections, success reception, delivery delays, complaints or open rate can be a powerful tool to understand the customers. Usually applications work with high-level data points without detailed logging or granular information that could help improve even better the effectiveness of their campaigns.

Amazon Simple Email Service (SES) is a smart tool for companies that wants a cost-effective, flexible, and scalable email service solution to easily integrate with their own products. Amazon SES provides methods to control your sending activity with built-in integration with Amazon CloudWatch Metrics and also provides a mechanism to collect the email sending events data.

In this post, we propose an architecture and step-by-step guide to track your email sending activities at a granular level, where you can configure several types of email sending events, including sends, deliveries, opens, clicks, bounces, complaints, rejections, rendering failures, and delivery delays. We will use the configuration set feature of Amazon SES to send detailed logging to our analytics services to store, query and create dashboards for a detailed view.

Overview of solution

This architecture uses Amazon SES built-in features and AWS analytics services to provide a quick and cost-effective solution to address your mail tracking requirements. The following services will be implemented or configured:

The following diagram shows the architecture of the solution:

Serverless Architecture to Analyze Amazon SES events

Figure 1. Serverless Architecture to Analyze Amazon SES events

The flow of the events starts when a customer uses Amazon SES to send an email. Each of those send events will be capture by the configuration set feature and forward the events to a Kinesis Firehose delivery stream to buffer and store those events on an Amazon S3 bucket.

After storing the events, it will be required to create a database and table schema and store it on AWS Glue Data Catalog in order for Amazon Athena to be able to properly query those events on S3. Finally, we will use Amazon QuickSight to create interactive dashboard to search and visualize all your sending activity with an email level of detailed.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Walkthrough

Step 1: Use AWS CloudFormation to deploy some additional prerequisites

You can get started with our sample AWS CloudFormation template that includes some prerequisites. This template creates an Amazon S3 Bucket, an IAM role needed to access from Amazon SES to Amazon Kinesis Data Firehose.

To download the CloudFormation template, run one of the following commands, depending on your operating system:

In Windows:

curl https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml -o SES-Blog-PreRequisites.yml

In MacOS

wget https://raw.githubusercontent.com/aws-samples/amazon-ses-analytics-blog/main/SES-Blog-PreRequisites.yml

To deploy the template, use the following AWS CLI command:

aws cloudformation deploy --template-file ./SES-Blog-PreRequisites.yml --stack-name ses-dashboard-prerequisites --capabilities CAPABILITY_NAMED_IAM

After the template finishes creating resources, you see the IAM Service role and the Delivery Stream on the stack Outputs tab. You are going to use these resources in the following steps.

IAM Service role and Delivery Stream created by CloudFormation template

Figure 2. CloudFormation template outputs

Step 2: Creating a configuration set in SES and setting the default configuration set for a verified identity

SES can track the number of send, delivery, open, click, bounce, and complaint events for each email you send. You can use event publishing to send information about these events to other AWS service. In this case we are going to send the events to Kinesis Firehose. To do this, a configuration set is required.

To create a configuration set, complete the following steps:

  1. On the AWS Console, choose the Amazon Simple Email Service.
  2. Choose Configuration sets.
  3. Click on Create set.

    Create a configuration set in Amazon SES

    Figure 3. Amazon SES Create Configuration Set

  4. Set a Configuration set name.
  5. Leave the other configurations by default.

    Write a name for your configuration set

    Figure 4. Configuration Set Name

  6. Once the configuration set is created, select Event destinations

    Configuration set created successfully

    Figure 5. Configuration set created successfully

  7. Click on Add destination
  8. Select the event types you would like to analyze and then click on next.

    Sending Events to analyze

    Figure 6. Sending Events to analyze

  9. Select Amazon Kinesis Data Firehose as the destination, choose the delivery stream and the IAM role created previously, click on next and in the review page, click on Add destination.

    Destination for Amazon SES sending events

    Figure 7. Destination for Amazon SES sending events

  10. Once you have created the configuration set and added the event destination, you can define the Default configuration set for the verified identity (domain or email address). In the SES console, choose Verified identities.

    Amazon SES Verified Identity

    Figure 8 Amazon SES Verified Identity

  11. Choose the verified identity from which you want to collect events and select Configuration set. Click on Edit.

    Edit Configuration Set for Verified Identity

    Figure 9. Edit Configuration Set for Verified Identity

  12. Click on the checkbox Assign a default configuration set and choose the configuration set created previously.

    Assign default configuration set

    Figure 10. Assign default configuration set

  13. Once you have completed the previous steps, your events will be sent to Amazon S3. Due to the buffer’s configuration on the Kinesis Delivery Stream, the data will be loaded every 5 minutes or every 5 MiB to Amazon S3. You can check the structure created on the bucket and see json logs with SES events data.

    Amazon S3 bucket structure

    Figure 11. Amazon S3 bucket structure

Step 3: Using Amazon Athena to query the SES event logs

Amazon SES publishes email sending event records to Amazon Kinesis Data Firehose in JSON format. The top-level JSON object contains an eventType string, a mail object, and either a Bounce, Complaint, Delivery, Send, Reject, Open, Click, Rendering Failure, or DeliveryDelay object, depending on the type of event.

  1. In order to simplify the analysis of email sending events, create the sesmaster table by running the following script in Amazon Athena. Don’t forget to change the location in the following script with your own bucket containing the data of email sending events.
    CREATE EXTERNAL TABLE sesmaster (
    
    eventType string,
    
    complaint struct<arrivaldate:string,
    complainedrecipients:array<struct<emailaddress:string>>,
    complaintfeedbacktype:string,
    feedbackid:string,
    `timestamp`:string,
    useragent:string>,
    
    bounce struct<bouncedrecipients:array<struct<action:string,
    diagnosticcode:string,
    emailaddress:string,
    status:string>>,
    bouncesubtype:string,
    bouncetype:string,
    feedbackid:string,
    reportingmta:string,
    `timestamp`:string>,
    
    mail struct<`timestamp`:string,
    source:string,
    sourceArn:string,
    sendingAccountId:string,
    messageId:string,
    destination:string,
    headersTruncated:boolean,
    headers:array<struct<name:string,
    value:string>>,
    commonHeaders:struct<`from`:array<string>,
    to:array<string>,
    messageId:string,
    subject:string>,
    tags:struct<ses_configurationset:string,
    ses_source_ip:string,
    ses_outgoing_ip:string,
    ses_from_domain:string,
    ses_caller_identity:string> >,
    
    send string,
    
    delivery struct<processingtimemillis:int,
    recipients:array<string>,
    reportingmta:string,
    smtpresponse:string,
    `timestamp`:string>,
    
    open struct<ipaddress:string,
    `timestamp`:string,
    userAgent:string>,
    
    reject struct<reason:string>,
    
    click struct<ipAddress:string,
    `timestamp`:string,
    userAgent:string,
    link:string>
    
    ) 
    ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    "mapping.ses_configurationset"="ses:configuration-set" , "mapping.ses_source_ip"="ses:source-ip" , 
    "mapping.ses_from_domain"="ses:from-domain" , "mapping.ses_caller_identity"="ses:caller-identity" , 
    "mapping.ses_outgoing_ip"="ses:outgoing-ip" ) LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/'

    The sesmaster table uses the org.openx.data.jsonserde.JsonSerDe SerDe library to deserialize the JSON data.

    We have leveraged the support for JSON arrays and maps and the support for nested data structures. Those features ease the process of preparation and visualization of data.

    In the sesmaster table, the following mappings were applied to avoid errors due to name of JSON fields containing colons.

    • “mapping.ses_configurationset”=”ses:configuration-set”
    • “mapping.ses_source_ip”=”ses:source-ip”
    • “mapping.ses_from_domain”=”ses:from-domain”
    • “mapping.ses_caller_identity”=”ses:caller-identity” “mapping.ses_outgoing_ip”=”ses:outgoing-ip”
  2. Once the sesmaster table is ready, it is a good strategy to create curated views of its data. The first view called vwSESMaster contains all the records of email sending events and all the fields which are unique on each event. Create the vwSESMaster view by running the following script in Amazon Athena.
    CREATE OR REPLACE VIEW vwSESMaster AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , delivery.processingtimemillis as deliveryprocessingtimemillis
    , delivery.reportingmta as deliveryreportingmta
    , delivery.smtpresponse as deliverysmtpresponse
    , delivery.timestamp as deliverytimestamp
    , delivery.recipients[1] as deliveryrecipient
    , open.ipaddress as openipaddress
    , open.timestamp as opentimestamp
    , open.userAgent as openuseragent
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , click.ipAddress as clickipaddress
    , click.timestamp as clicktimestamp
    , click.userAgent as clickuseragent
    , click.link as clicklink
    , complaint.timestamp as complainttimestamp
    , complaint.userAgent as complaintuseragent
    , complaint.complaintFeedbackType as complaintcomplaintfeedbacktype
    , complaint.arrivalDate as complaintarrivaldate
    , reject.reason as rejectreason
    FROM
    sesmaster

    The sesmaster table contains some fields which are represented by nested arrays, so it is necessary to flatten them into multiples rows. Following you can see the event types and the fields which need to be flatten.

    • Event type SEND: field mail.commonHeaders
    • Event type BOUNCE: field bounce.bouncedrecipients
    • Event type COMPLAINT: field complaint.complainedrecipients

    To flatten those arrays into multiple rows, we used the CROSS JOIN in conjunction with the UNNEST operator using the following strategy for all the three events:

    • Create a temporal view with the mail.messageID and the field to be flattened.
    • Create another temporal view with the array flattened into multiple rows.
    • Create the final view joining the sesmaster table with the second temporal view by event type and mail.messageID.

    To create those views, follow the next steps.

  3. Run the following scripts in Amazon Athena to flat the mail.commonHeaders array in the SEND event type
    CREATE OR REPLACE VIEW vwSendMailTmpSendTo AS 
    SELECT
    mail.messageId as messageid
    , mail.commonHeaders.to as recipients
    FROM
    sesmaster
    WHERE 
    eventtype='Send'
    
    CREATE OR REPLACE VIEW vwsendmailrecipients AS 
    SELECT
    messageid
    , recipient
    FROM
    ("vwSendMailTmpSendTo"
    CROSS JOIN UNNEST(recipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwSentMails AS
    SELECT 
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , dest.recipient as mailto
    FROM
    sesmaster as sm
    ,vwsendmailrecipients as dest
    WHERE
    sm.eventtype = 'Send'
    and sm.mail.messageid = dest.messageid
  4. Run the following scripts in Amazon Athena to flat the bounce.bouncedrecipients array in the BOUNCE event type
    CREATE OR REPLACE VIEW vwbouncemailtmprecipients AS 
    SELECT
    mail.messageId as messageid
    , bounce.bouncedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Bounce')
    
    CREATE OR REPLACE VIEW vwbouncemailrecipients AS 
    SELECT
    messageid
    , recipient.action
    , recipient.diagnosticcode
    , recipient.emailaddress
    FROM
    (vwbouncemailtmprecipients
    CROSS JOIN UNNEST(bouncedrecipients) t (recipient))
    
    CREATE OR REPLACE VIEW vwBouncedMails AS
    SELECT
    eventtype as eventtype
    , mail.messageId as mailmessageid
    , mail.timestamp as mailtimestamp
    , mail.source as mailsource
    , mail.sendingAccountId as mailsendingAccountId
    , mail.commonHeaders.subject as mailsubject
    , mail.tags.ses_configurationset as mailses_configurationset
    , mail.tags.ses_source_ip as mailses_source_ip
    , mail.tags.ses_from_domain as mailses_from_domain
    , mail.tags.ses_outgoing_ip as mailses_outgoing_ip
    , bounce.bounceType as bouncebounceType
    , bounce.bouncesubtype as bouncebouncesubtype
    , bounce.feedbackid as bouncefeedbackid
    , bounce.timestamp as bouncetimestamp
    , bounce.reportingMTA as bouncereportingmta
    , bd.action as bounceaction
    , bd.diagnosticcode as bouncediagnosticcode
    , bd.emailaddress as bounceemailaddress
    FROM
    sesmaster as sm
    ,vwbouncemailrecipients as bd
    WHERE
    sm.eventtype = 'Bounce'
    and sm.mail.messageid = bd.messageid
    
  5. Run the following scripts in Amazon Athena to flat the complaint.complainedrecipients array in the COMPLAINT event type
    CREATE OR REPLACE VIEW vwcomplainttmprecipients AS 
    SELECT
    mail.messageId as messageid
    , complaint.complainedrecipients
    FROM
    sesmaster
    WHERE (eventtype = 'Complaint')
    
    CREATE OR REPLACE VIEW vwcomplainedrecipients AS 
    SELECT
    messageid
    , recipient.emailaddress
    FROM
    (vwcomplainttmprecipients 
    CROSS JOIN UNNEST(complainedrecipients) t (recipient))
    

    At the end we have one table and four views which can be used in Amazon QuickSight to analyze email sending events:

    • Table sesmaster
    • View vwSESMaster
    • View vwSentMails
    • View vwBouncedMails
    • View vwComplainedemails

Step 4: Analyze and visualize data with Amazon QuickSight

 In this blog post, we use Amazon QuickSight to analyze and to visualize email sending events from the sesmaster and the four curated views created previously. Amazon QuickSight can directly access data through Athena. Its pay-per-session pricing enables you to put analytical insights into the hands of everyone in your organization.

Let’s set this up together. We first need to select our table and our views to create new data sources in Athena and then we use these data sources to populate the visualization. We are creating just an example of visualization. Feel free to create your own visualization based on your information needs.

Before we can use the data in Amazon QuickSight, we need to first grant access to the underlying S3 bucket. If you haven’t done so already for other analyses, see our documentation on how to do so.

  1. On the Amazon QuickSight home page, choose Datasets from the menu on the left side, then choose New dataset from the upper-right corner, set and pick Athena as data source. In the following dialog box, give the data source a descriptive name and choose Create data source.

    Create New Athena Data Source

    Figure 12. Create New Athena Data Source

  2. In the following dialog box, select the Catalog and the Database containing your sesmaster and curated views. Let’s select the sesmaster table in order to create some basic Key Performance Indicators. Select the table sesmaster and click on the Select

    Select Sesmaster Table

    Figure 13. Select Sesmaster Table

  3. Our sesmaster table now is a data source for Amazon QuickSight and we can turn to visualizing the data.

    QuickSight Visualize Data

    Figure 14. QuickSight Visualize Data

  4. You can see the list fields on the left. The canvas on the right is still empty. Before we populate it with data, let’s select Key Performance Indicator from the available visual types.

    QuickSight Visual Types

    Figure 15. QuickSight Visual Types

  5. To populate the graph, drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the field send onto the value well and use count as aggregation.

    Add Send field to visualization

    Figure 16. Add Send field to visualization

  6. Add another visual from the left-upper side and select Key Performance Indicator as visual type.
    Add a new visual

    Figure 17. Add a new visual

    Key Performance Indicator Visual Type

    Figure 18. Key Performance Indicator Visual Type

  7. Put the field Delivery onto the value well and use count as aggregation.

    Add Delivery Field to visualization

    Figure 19. Add Delivery Field to visualization

  8. Repeat the same procedure, (steps 1 to 4) to count the number of Open, Click, Bounce, Complaint and Reject Events. At the end, you should see something similar to the following visualization. After resizing and rearranging the visuals, you should get an analysis like the shown in the image below.

    Preview of Key Performance Indicators

    Figure 20. Preview of Key Performance Indicators

  9. Let´s add another dataset by clicking the pencil on the right of the current Dataset.

    Add a New Dataset

    Figure 21. Add a New Dataset

  10. On the following dialog box, select Add Dataset.

    Add a New Dataset

    Figure 22. Add a New Dataset

  11. Select the view called vwsesmaster and click Select.
    Add vwsesmaster dataset

    Figure 23. Add vwsesmaster dataset

    Now you can see all the available fields of the vwsesmaster view.

    New fields from vwsesmaster dataset

    Figure 24. New fields from vwsesmaster dataset

  12. Let’s create a new visual and select the Table visual type.

    QuickSight Visual Types

    Figure 25. QuickSight Visual Types

  13. Drag and drop the fields from the field list on the left onto their respective destinations. In our case, we put the fields eventtype, mailmessageid, and mailsubject onto the Group By well, but you can add as many fields as you need.

    Add eventtype, mailmessageid and mailsubject fields

    Figure 26. Add eventtype, mailmessageid and mailsubject fields

  14. Now let’s create a filter for this visual in order to filter by type of event. Be sure you select the table and then click on Filter on the left menu.

    Add a Filter

    Figure 27. Add a Filter

  15. Click on Create One and select the field eventtype on the popup window. Now select the eventtype filter to see the following options.

    Create eventtype filter

    Figure 28. Create eventtype filter

  16. Click on the dots on the right of the eventtype filter and select Add to Sheet.

    Add filter to sheet

    Figure 29. Add filter to sheet

  17. Leave all the default values, scroll down and select Apply

    Apply filters with default values

    Figure 30. Apply filters with default values

  18. Now you can filter the vwsesmaster view by eventtype.

    Filter vwsesmasterview by eventtype

    Figure 31. Filter vwsesmasterview by eventtype

  19. You can continue customizing your visualization with all the available data in the sesmaster table, the vwsesmaster view and even add more datasets to include data from the vwSentMails, vwBouncedMails, and vwComplainedemails views. Below, you can see some other visualizations created from those views.
    Final visualization 1

    Figure 32. Final visualization 1

    Final visualization 2

    Figure 33. Final visualization 2

    Final visualization 3

    Figure 34. Final visualization 3

Clean up

To avoid ongoing charges, clean up the resources you created as part of this post:

  1. Delete the visualizations created in Amazon Quicksight.
  2. Unsubscribe from Amazon QuickSight if you are not using it for other projects.
  3. Delete the views and tables created in Amazon Athena.
  4. Delete the Amazon SES configuration set.
  5. Delete the Amazon SES events stored in S3.
  6. Delete the CloudFormation stack in order to delete the Amazon Kinesis Delivery Stream.

Conclusion

In this blog we showed how you can use AWS native services and features to quickly create an email tracking solution based on Amazon SES events to have a more detailed view on your sending activities. This solution uses a full serverless architecture without having to manage the underlying infrastructure and giving you the flexibility to use the solution for small, medium or intense Amazon SES usage, without having to take care of any servers.

We showed you some samples of dashboards and analysis that can be built for most of customers requirements, but of course you can evolve this solution and customize it according to your needs, adding or removing charts, filters or events to the dashboard. Please refer to the following documentation for the available Amazon SES Events, their structure and also how to create analysis and dashboards on Amazon QuickSight:

From a performance and cost efficiency perspective there are still several configurations that can be done to improve the solution, for example using a columnar file formant like parquet, compressing with snappy or setting your S3 partition strategy according to your email sending usage. Another improvement could be importing data into SPICE to read data in Amazon Quicksight. Using SPICE results in the data being loaded from Athena only once, until it is either manually refreshed or automatically refreshed using a schedule.

You can use this walkthrough to configure your first SES dashboard and start visualizing events detail. You can adjust the services described in this blog according to your company requirements.

About the authors

Oscar Mendoza AWS Solutions Architect Oscar Mendoza is a Solutions Architect at AWS based in Bogotá, Colombia. Oscar works with our customers to provide guidance in architectural best practices and to build Well Architected solutions on the AWS platform. He enjoys spending time with his family and his dog and playing music.
Luis Eduardo Torres AWS Solutions Architect Luis Eduardo Torres is a Solutions Architect at AWS based in Bogotá, Colombia. He helps companies to build their business using the AWS cloud platform. He has a great interest in Analytics and has been leading the Analytics track of AWS Podcast in Spanish.
Santiago Benavidez AWS Solutions Architect Santiago Benavídez is a Solutions Architect at AWS based in Buenos Aires, Argentina, with more than 13 years of experience in IT, currently helping DNB/ISV customers to achieve their business goals using the breadth and depth of AWS services, designing highly available, resilient and cost-effective architectures.

Building a low-code speech “you know” counter using AWS Step Functions

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/building-a-low-code-speech-you-know-counter-using-aws-step-functions/

This post is written by Doug Toppin, Software Development Engineer, and Kishore Dhamodaran, Solutions Architect.

In public speaking, filler phrases can distract the audience and reduce the value and impact of what you are telling them. Reviewing recordings of presentations can be helpful to determine whether presenters are using filler phrases. Instead of manually reviewing prior recordings, automation can process media files and perform a speech-to-text function. That text can then be processed to report on the use of filler phrases.

This blog explains how to use AWS Step Functions, Amazon EventBridge, Amazon Transcribe and Amazon Athena to report on the use of the common phrase “you know” in media files. These services can automate and reduce the time required to find the use of filler phrases.

Step Functions can automate and chain together multiple activities and other Amazon services. Amazon Transcribe is a speech to text service that uses media files as input and produces textual transcripts from them. Athena is an interactive query service that makes it easier to analyze data in Amazon S3 using standard SQL. Athena enables the use of standard SQL to query data in S3.

This blog shows a low-code, configuration driven approach to implementing this solution. Low-code means writing little or no custom software to perform a function. Instead, you use a configuration drive approach using service integrations where state machine tasks call AWS services using existing SDKs, APIs, or interfaces. A configuration driven approach in this example is using Step Functions’ Amazon States Language (ASL) to tie actions together rather than writing traditional code. This requires fewer details for data management and error handling combined with a visual user interface for composing the workflow. As the actions and logic are clearly defined with the visual workflow, this reduces maintenance.

Solution overview

The following diagram shows the solution architecture.

SolutionOverview

Solution Overview

  1. You upload a media file to an Amazon S3 Media bucket.
  2. The media file upload to S3 triggers an EventBridge rule.
  3. The EventBridge rule starts the Step Functions state machine execution.
  4. The state machine invokes Amazon Transcribe to process the media file.
  5. The transcription output file is stored in the Amazon S3 Transcript bucket.
  6. The state machine invokes Athena to query the textual transcript for the filler phrase. This uses the AWS Glue table to describe the format of the transcription results file.
  7. The filler phrase count determined by Athena is returned and stored in the Amazon S3 Results bucket.

Prerequisites

  1. An AWS account and an AWS user or role with sufficient permissions to create the necessary resources.
  2. Access to the following AWS services: Step Functions, Amazon Transcribe, Athena, and Amazon S3.
  3. Latest version of the AWS Serverless Application Model (AWS SAM) CLI, which helps developers create and manage serverless applications in the AWS Cloud.
  4. Test media files (for example, the Official AWS Podcast).

Example walkthrough

  1. Clone the GitHub repository to your local machine.
  2. git clone https://github.com/aws-samples/aws-stepfunctions-examples.git
  3. Deploy the resources using AWS SAM. The deploy command processes the AWS SAM template file to create the necessary resources in AWS. Choose you-know as the stack name and the AWS Region that you want to deploy your solution to.
  4. cd aws-stepfunctions-examples/sam/app-low-code-you-know-counter/
    sam deploy --guided

Use the default parameters or replace with different values if necessary. For example, to get counts of a different filler phrase, replace the FillerPhrase parameter.

GlueDatabaseYouKnowP Name of the AWS Glue database to create.
AthenaTableName Name of the AWS Glue table that is used by Athena to query the results.
FillerPhrase The filler phrase to check.
AthenaQueryPreparedStatementName Name of the Athena prepared statement used to run SQL queries on.
AthenaWorkgroup Athena workgroup to use
AthenaDataCatalog The data source for running the Athena queries
SAM Deploy

SAM Deploy

Running the filler phrase counter

  1. Navigate to the Amazon S3 console and upload an mp3 or mp4 podcast recording to the bucket named bucket-{account number}-{Region}-you-know-media.
  2. Navigate to the Step Functions console. Choose the running state machine, and monitor the execution of the transcription state machine.
  3. State Machine Execution

    State Machine Execution

  4. When the execution completes successfully, select the QueryExecutionSuccess task to examine the output and see the filler phrase count.
  5. State Machine Output

    State Machine Output

  6. Amazon Transcribe produces the transcript text of the media file. You can examine the output in the Results bucket. Using the S3 console, navigate to the bucket, choose the file matching the media file name and use ‘Query with S3 Select’ to view the content.
  7. If the transcription job does not execute, the state machine reports the failure and exits.
  8. State Machine Fail

    State Machine Fail

Exploring the state machine

The state machine orchestrates the transcription processing:

State Machine Explore

State Machine Explore

The StartTranscriptionJob task starts the transcription job. The Wait state adds a 60-second delay before checking the status of the transcription job. Until the status of the job changes to FAILED or COMPLETED, the choice state continues.

When the job successfully completes, the AthenaStartQueryExecutionUsingPreparedStatement task starts the Athena query, and stores the results in the S3 results bucket. The AthenaGetQueryResults task retrieves the count from the resultset.

The TranscribeMediaBucket holds the media files to be uploaded. The configuration sends the upload notification event to EventBridge:

      
   NotificationConfiguration:
     EventBridgeConfiguration:
       EventBridgeEnabled: true
	  

The TranscribeResultsBucket has an associated policy to provide access to Amazon Transcribe. Athena stores the output from the queries performed by the state machine in the AthenaQueryResultsBucket .

When a media upload occurs, the YouKnowTranscribeStateMachine uses Step Functions’ native event integration to trigger an EventBridge rule. This contains an event object similar to:

{
  "version": "0",
  "id": "99a0cb40-4b26-7d74-dc59-c837f5346ac6",
  "detail-type": "Object Created",
  "source": "aws.s3",
  "account": "012345678901",
  "time": "2022-05-19T22:21:10Z",
  "region": "us-east-2",
  "resources": [
    "arn:aws:s3:::bucket-012345678901-us-east-2-you-know-media"
  ],
  "detail": {
    "version": "0",
    "bucket": {
      "name": "bucket-012345678901-us-east-2-you-know-media"
    },
    "object": {
      "key": "Podcase_Episode.m4a",
      "size": 202329,
      "etag": "624fce93a981f97d85025e8432e24f48",
      "sequencer": "006286C2D604D7A390"
    },
    "request-id": "B4DA7RD214V1QG3W",
    "requester": "012345678901",
    "source-ip-address": "172.0.0.1",
    "reason": "PutObject"
  }
}

The state machine allows you to prepare parameters and use the direct SDK integrations to start the transcription job by calling the Amazon Transcribe service’s API. This integration means you don’t have to write custom code to perform this function. The event triggering the state machine execution contains the uploaded media file location.


  StartTranscriptionJob:
	Type: Task
	Comment: Start a transcribe job on the provided media file
	Parameters:
	  Media:
		MediaFileUri.$: States.Format('s3://{}/{}', $.detail.bucket.name, $.detail.object.key)
	  TranscriptionJobName.$: "$.detail.object.key"
	  IdentifyLanguage: true
	  OutputBucketName: !Ref TranscribeResultsBucket
	Resource: !Sub 'arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:aws-sdk:transcribe:startTranscriptionJob'

The SDK uses aws-sdk:transcribe:getTranscriptionJob to get the status of the job.


  GetTranscriptionJob:
	Type: Task
	Comment: Retrieve the status of an Amazon Transcribe job
	Parameters:
	  TranscriptionJobName.$: "$.TranscriptionJob.TranscriptionJobName"
	Resource: !Sub 'arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:aws-sdk:transcribe:getTranscriptionJob'
	Next: TranscriptionJobStatus

The state machine uses a polling loop with a delay to check the status of the transcription job.


  TranscriptionJobStatus:
	Type: Choice
	Choices:
	- Variable: "$.TranscriptionJob.TranscriptionJobStatus"
	  StringEquals: COMPLETED
	  Next: AthenaStartQueryExecutionUsingPreparedStatement
	- Variable: "$.TranscriptionJob.TranscriptionJobStatus"
	  StringEquals: FAILED
	  Next: Failed
	Default: Wait

When the transcription job completes successfully, the filler phrase counting process begins.

An Athena prepared statement performs the query with the transcription job name as a runtime parameter. The AWS SDK starts the query and the state machine execution pauses, waiting for the results to return before progressing to the next state:

athena:startQueryExecution.sync

When the query completes, Step Functions uses the SDK integration to retrieve the results using athena:getQueryResults:

athena:getQueryResults

It creates an Athena prepared statement to pass the transcription jobname as a parameter for the query execution:

  ResultsQueryPreparedStatement:
    Type: AWS::Athena::PreparedStatement
    Properties:
      Description: Create a statement that allows the use of a parameter for specifying an Amazon Transcribe job name in the Athena query
      QueryStatement: !Sub >-
        select cardinality(regexp_extract_all(results.transcripts[1].transcript, '${FillerPhrase}')) AS item_count from "${GlueDatabaseYouKnow}"."${AthenaTableName}" where jobname like ?
      StatementName: !Ref AthenaQueryPreparedStatementName
      WorkGroup: !Ref AthenaWorkgroup

There are several opportunities to enhance this tool. For example, adding support for multiple filler phrases. You could build a larger application to upload media and retrieve the results. You could take advantage of Amazon Transcribe’s real-time transcription API to display the results while a presentation is in progress to provide immediate feedback to the presenter.

Cleaning up

  1. Navigate to the Amazon Transcribe console. Choose Transcription jobs in the left pane, select the jobs created by this example, and choose Delete.
  2. Cleanup Delete

    Cleanup Delete

  3. Navigate to the S3 console. In the Find buckets by name search bar, enter “you-know”. This shows the list of buckets created for this example. Choose each of the radio buttons next to the bucket individually and choose Empty.
  4. Cleanup S3

    Cleanup S3

  5. Use the following command to delete the stack, and confirm the stack deletion.
  6. sam delete

Conclusion

Low-code applications can increase developer efficiency by reducing the amount of custom code required to build solutions. They can also enable non-developer roles to create automation to perform business functions by providing drag-and-drop style user interfaces.

This post shows how a low-code approach can build a tool chain using AWS services. The example processes media files to produce text transcripts and count the use of filler phrases in those transcripts. It shows how to process EventBridge data and how to invoke Amazon Transcribe and Athena using Step Functions state machines.

For more serverless learning resources, visit Serverless Land.

Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue

Post Syndicated from Kishore Dhamodaran original https://aws.amazon.com/blogs/big-data/build-an-apache-iceberg-data-lake-using-amazon-athena-amazon-emr-and-aws-glue/

Most businesses store their critical data in a data lake, where you can bring data from various sources to a centralized storage. The data is processed by specialized big data compute engines, such as Amazon Athena for interactive queries, Amazon EMR for Apache Spark applications, Amazon SageMaker for machine learning, and Amazon QuickSight for data visualization.

Apache Iceberg is an open-source table format for data stored in data lakes. It is optimized for data access patterns in Amazon Simple Storage Service (Amazon S3) cloud object storage. Iceberg helps data engineers tackle complex challenges in data lakes such as managing continuously evolving datasets while maintaining query performance. Iceberg allows you to do the following:

  • Maintain transactional consistency where files can be added, removed, or modified atomically with full read isolation and multiple concurrent writes
  • Implement full schema evolution to process safe table schema updates as the table data evolves
  • Organize tables into flexible partition layouts with partition evolution, enabling updates to partition schemes as queries and data volume changes without relying on physical directories
  • Perform row-level update and delete operations to satisfy new regulatory requirements such as the General Data Protection Regulation (GDPR)
  • Provide versioned tables and support time travel queries to query historical data and verify changes between updates
  • Roll back tables to prior versions to return tables to a known good state in case of any issues

In 2021, AWS teams contributed the Apache Iceberg integration with the AWS Glue Data Catalog to open source, which enables you to use open-source compute engines like Apache Spark with Iceberg on AWS Glue. In 2022, Amazon Athena announced support of Iceberg and Amazon EMR added support of Iceberg starting with version 6.5.0.

In this post, we show you how to use Amazon EMR Spark to create an Iceberg table, load sample books review data, and use Athena to query, perform schema evolution, row-level update and delete, and time travel, all coordinated through the AWS Glue Data Catalog.

Solution overview

We use the Amazon Customer Reviews public dataset as our source data. The dataset contains data files in Apache Parquet format on Amazon S3. We load all the book-related Amazon review data as an Iceberg table to demonstrate the advantages of using the Iceberg table format on top of raw Parquet files. The following diagram illustrates our solution architecture.

Architecture that shows the flow from Amazon EMR loading data into Amazon S3, and queried by Amazon Athena through AWS Glue Data Catalog.

To set up and test this solution, we complete the following high-level steps:

  1. Create an S3 bucket.
  2. Create an EMR cluster.
  3. Create an EMR notebook.
  4. Configure a Spark session.
  5. Load data into the Iceberg table.
  6. Query the data in Athena.
  7. Perform a row-level update in Athena.
  8. Perform a schema evolution in Athena.
  9. Perform time travel in Athena.
  10. Consume Iceberg data across Amazon EMR and Athena.

Prerequisites

To follow along with this walkthrough, you must have the following:

  • An AWS Account with a role that has sufficient access to provision the required resources.

Create an S3 bucket

To create an S3 bucket that holds your Iceberg data, complete the following steps:

  1. On the Amazon S3 console, choose Buckets in the navigation pane.
  2. Choose Create bucket.
  3. For Bucket name, enter a name (for this post, we enter aws-lake-house-iceberg-blog-demo).

Because S3 bucket names are globally unique, choose a different name when you create your bucket.

  1. For AWS Region, choose your preferred Region (for this post, we use us-east-1).

Create a new Amazon S3 bucket. Choose us-east-1 as region

  1. Complete the remaining steps to create your bucket.
  2. If this is the first time that you’re using Athena to run queries, create another globally unique S3 bucket to hold your Athena query output.

Create an EMR cluster

Now we’re ready to start an EMR cluster to run Iceberg jobs using Spark.

  1. On the Amazon EMR console, choose Create cluster.
  2. Choose Advanced options.
  3. For Software Configuration, choose your Amazon EMR release version.

Iceberg requires release 6.5.0 and above.

  1. Select JupyterEnterpriseGateway and Spark as the software to install.
  2. For Edit software settings, select Enter configuration and enter [{"classification":"iceberg-defaults","properties":{"iceberg.enabled":true}}].
  3. Leave other settings at their default and choose Next.

Choose Amazon EMR release 6.6.0 and JupyterEnterpriseGateway and Spark. Enter configuration information.

  1. You can change the hardware used by the Amazon EMR cluster in this step. In this demo, we use the default setting.
  2. Choose Next.
  3. For Cluster name, enter Iceberg Spark Cluster.
  4. Leave the remaining settings unchanged and choose Next.

Provide Iceberg Spark Cluster as the Cluster name

  1. You can configure security settings such as adding an EC2 key pair to access your EMR cluster locally. In this demo, we use the default setting.
  2. Choose Create cluster.

You’re redirected to the cluster detail page, where you wait for the EMR cluster to transition from Starting to Waiting.

Create an EMR notebook

When the cluster is active and in the Waiting state, we’re ready to run Spark programs in the cluster. For this demo, we use an EMR notebook to run Spark commands.

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Choose Create notebook.
  3. For Notebook name, enter a name (for this post, we enter iceberg-spark-notebook).
  4. For Cluster, select Choose an existing cluster and choose Iceberg Spark Cluster.
  5. For AWS service role, choose Create a new role to create EMR_Notebook_DefaultRole or choose a different role to access resources in the notebook.
  6. Choose Create notebook.

Create an Amazon EMR notebook. Use EMR_Notebooks_DefaultRole

You’re redirected to the notebook detail page.

  1. Choose Open in JupyterLab next to your notebook.
  2. Choose to create a new notebook.
  3. Under Notebook, choose Spark.

Choose Spark from the options provided in the Launcher

Configure a Spark session

In your notebook, run the following code:

%%configure -f
{
  "conf": {
    "spark.sql.catalog.demo": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.demo.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
    "spark.sql.catalog.demo.warehouse": "s3://<your-iceberg-blog-demo-bucket>",
    "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"
  }
}

This sets the following Spark session configurations:

  • spark.sql.catalog.demo – Registers a Spark catalog named demo, which uses the Iceberg Spark catalog plugin
  • spark.sql.catalog.demo.catalog-impl – The demo Spark catalog uses AWS Glue as the physical catalog to store Iceberg database and table information
  • spark.sql.catalog.demo.warehouse – The demo Spark catalog stores all Iceberg metadata and data files under the root path s3://<your-iceberg-blog-demo-bucket>
  • spark.sql.extensions – Adds support to Iceberg Spark SQL extensions, which allows you to run Iceberg Spark procedures and some Iceberg-only SQL commands (you use this in a later step)

Load data into the Iceberg table

In our Spark session, run the following commands to load data:

// create a database in AWS Glue named reviews if not exist
spark.sql("CREATE DATABASE IF NOT EXISTS demo.reviews")

// load reviews related to books
val book_reviews_location = "s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet"
val book_reviews = spark.read.parquet(book_reviews_location)

// write book reviews data to an Iceberg v2 table
book_reviews.writeTo("demo.reviews.book_reviews").tableProperty("format-version", "2").createOrReplace()

Iceberg format v2 is needed to support row-level updates and deletes. See Format Versioning for more details.

It may take up to 15 minutes for the commands to complete. When it’s complete, you should be able to see the table on the AWS Glue console, under the reviews database, with the table_type property shown as ICEBERG.

Shows the table properties for book_reviews table

The table schema is inferred from the source Parquet data files. You can also create the table with a specific schema before loading data using Spark SQL, Athena SQL, or Iceberg Java and Python SDKs.

Query in Athena

Navigate to the Athena console and choose Query editor. If this is your first time using the Athena query editor, you need to configure to use the S3 bucket you created earlier to store the query results.

The table book_reviews is available for querying. Run the following query:

SELECT * FROM reviews.book_reviews LIMIT 5;

The following screenshot shows the first five records from the table being displayed.

Amazon Athena query the first 5 rows and show the results

Perform a row-level update in Athena

In the next few steps, let’s focus on a record in the table with review ID RZDVOUQG1GBG7. Currently, it has no total votes when we run the following query:

SELECT total_votes FROM reviews.book_reviews 
WHERE review_id = 'RZDVOUQG1GBG7'

Query total_votes for a particular review which shows a value of 0

Let’s update the total_votes value to 2 using the following query:

UPDATE reviews.book_reviews
SET total_votes = 2
WHERE review_id = 'RZDVOUQG1GBG7'

Update query to set the total_votes for the previous review_id to 2

After your update command runs successfully, run the below query and note the updated result showing a total of two votes:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Athena enforces ACID transaction guarantee for all the write operations against an Iceberg table. This is done through the Iceberg format’s optimistic locking specification. When concurrent attempts are made to update the same record, a commit conflict occurs. In this scenario, Athena displays a transaction conflict error, as shown in the following screenshot.

Concurrent updates causes a failure. This shows the TRANSACTION_CONFLICT error during this scenario.

Delete queries work in a similar way; see DELETE for more details.

Perform a schema evolution in Athena

Suppose the review suddenly goes viral and gets 10 billion votes:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Based on the AWS Glue table information, the total_votes is an integer column. If you try to update a value of 10 billion, which is greater than the maximum allowed integer value, you get an error reporting a type mismatch.

Updating to a very large value greater than maximum allowed integer value results in an error

Iceberg supports most schema evolution features as metadata-only operations, which don’t require a table rewrite. This includes add, drop, rename, reorder column, and promote column types. To solve this issue, you can change the integer column total_votes to a BIGINT type by running the following DDL:

ALTER TABLE reviews.book_reviews
CHANGE COLUMN total_votes total_votes BIGINT;

You can now update the value successfully:

UPDATE reviews.book_reviews
SET total_votes = 10000000000
WHERE review_id = 'RZDVOUQG1GBG7'

Querying the record now gives us the expected result in BIGINT:

SELECT total_votes FROM reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'

Perform time travel in Athena

In Iceberg, the transaction history is retained, and each transaction commit creates a new version. You can perform time travel to look at a historical version of a table. In Athena, you can use the following syntax to travel to a time that is after when the first version was committed:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Query an earlier snapshot using time travel feature

Consume Iceberg data across Amazon EMR and Athena

One of the most important features of a data lake is for different systems to seamlessly work together through the Iceberg open-source protocol. After all the operations are performed in Athena, let’s go back to Amazon EMR and confirm that Amazon EMR Spark can consume the updated data.

First, run the same Spark SQL and see if you get the same result for the review used in the example:

val select_votes = """SELECT total_votes FROM demo.reviews.book_reviews
WHERE review_id = 'RZDVOUQG1GBG7'"""

spark.sql(select_votes).show()

Spark shows 10 billion total votes for the review.

Shows the latest value of total_votes when querying using the Amazon EMR notebook

Check the transaction history of the operation in Athena through Spark Iceberg’s history system table:

val select_history = "SELECT * FROM demo.reviews.book_reviews.history"

spark.sql(select_history).show()

This shows three transactions corresponding to the two updates you ran in Athena.

Shows snapshots corresponding to the two updates you ran in Athena

Iceberg offers a variety of Spark procedures to optimize the table. For example, you can run an expire_snapshots procedure to remove old snapshots, and free up storage space in Amazon S3:

import java.util.Calendar
import java.text.SimpleDateFormat

val now = Calendar.getInstance().getTime()
val form = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
val now_formatted = form.format(now.getTime())
val procedure = s"""CALL demo.system.expire_snapshots(
  table => 'reviews.book_reviews',
  older_than => TIMESTAMP '$now_formatted',
  retain_last => 1)"""

spark.sql(procedure)

Note that, after running this procedure, time travel can no longer be performed against expired snapshots.

Examine the history system table again and notice that it shows you only the most recent snapshot.

Running the following query in Athena results in an error “No table snapshot found before timestamp…” as older snapshots were deleted, and you are no longer able to time travel to the older snapshot:

SELECT total_votes FROM reviews.book_reviews
FOR SYSTEM_TIME AS OF localtimestamp + interval '-20' minute
WHERE review_id = 'RZDVOUQG1GBG7'

Clean up

To avoid incurring ongoing costs, complete the following steps to clean up your resources:

  1. Run the following code in your notebook to drop the AWS Glue table and database:
// DROP the table 
spark.sql("DROP TABLE demo.reviews.book_reviews") 
// DROP the database 
spark.sql("DROP DATABASE demo.reviews")
  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Select the notebook iceberg-spark-notebook and choose Delete.
  3. Choose Clusters in the navigation pane.
  4. Select the cluster Iceberg Spark Cluster and choose Terminate.
  5. Delete the S3 buckets and any other resources that you created as part of the prerequisites for this post.

Conclusion

In this post, we showed you an example of using Amazon S3, AWS Glue, Amazon EMR, and Athena to build an Iceberg data lake on AWS. An Iceberg table can seamlessly work across two popular compute engines, and you can take advantage of both to design your customized data production and consumption use cases.

With AWS Glue, Amazon EMR, and Athena, you can already use many features through AWS integrations, such as SageMaker Athena integration for machine learning, or QuickSight Athena integration for dashboard and reporting. AWS Glue also offers the Iceberg connector, which you can use to author and run Iceberg data pipelines.

In addition, Iceberg supports a variety of other open-source compute engines that you can choose from. For example, you can use Apache Flink on Amazon EMR for streaming and change data capture (CDC) use cases. The strong transaction guarantee and efficient row-level update, delete, time travel, and schema evolution experience offered by Iceberg offers a sound foundation and infinite possibilities for users to unlock the power of big data.


About the Authors

Kishore Dhamodaran is a Senior Solutions Architect at AWS. Kishore helps strategic customers with their cloud enterprise strategy and migration journey, leveraging his years of industry and cloud experience.

Jack Ye is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Mohit Mehta is a Principal Architect at AWS with expertise in AI/ML and data analytics. He holds 12 AWS certifications and is passionate about helping customers implement cloud enterprise strategies for digital transformation. In his free time, he trains for marathons and plans hikes across major peaks around the world.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Jared Keating is a Senior Cloud Consultant with AWS Professional Services. Jared assists customers with their cloud infrastructure, compliance, and automation requirements, drawing from his 20+ years of IT experience.

Optimize Federated Query Performance using EXPLAIN and EXPLAIN ANALYZE in Amazon Athena

Post Syndicated from Nishchai JM original https://aws.amazon.com/blogs/big-data/optimize-federated-query-performance-using-explain-and-explain-analyze-in-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. In 2019, Athena added support for federated queries to run SQL queries across data stored in relational, non-relational, object, and custom data sources.

In 2021, Athena added support for the EXPLAIN statement, which can help you understand and improve the efficiency of your queries. The EXPLAIN statement provides a detailed breakdown of a query’s run plan. You can analyze the plan to identify and reduce query complexity and improve its runtime. You can also use EXPLAIN to validate SQL syntax prior to running the query. Doing so helps prevent errors that would have occurred while running the query.

Athena also added EXPLAIN ANALYZE, which displays the computational cost of your queries alongside their run plans. Administrators can benefit from using EXPLAIN ANALYZE because it provides a scanned data count, which helps you reduce financial impact due to user queries and apply optimizations for better cost control.

In this post, we demonstrate how to use and interpret EXPLAIN and EXPLAIN ANALYZE statements to improve Athena query performance when querying multiple data sources.

Solution overview

To demonstrate using EXPLAIN and EXPLAIN ANALYZE statements, we use the following services and resources:

Athena uses the AWS Glue Data Catalog to store and retrieve table metadata for the Amazon S3 data in your AWS account. The table metadata lets the Athena query engine know how to find, read, and process the data that you want to query. We use Athena data source connectors to connect to data sources external to Amazon S3.

Prerequisites

To deploy the CloudFormation template, you must have the following:

Provision resources with AWS CloudFormation

To deploy the CloudFormation template, complete the following steps:

  1. Choose Launch Stack:

  1. Follow the prompts on the AWS CloudFormation console to create the stack.
  2. Note the key-value pairs on the stack’s Outputs tab.

You use these values when configuring the Athena data source connectors.

The CloudFormation template creates the following resources:

  • S3 buckets to store data and act as temporary spill buckets for Lambda
  • AWS Glue Data Catalog tables for the data in the S3 buckets
  • A DynamoDB table and Amazon RDS for MySQL tables, which are used to join multiple tables from different sources
  • A VPC, subnets, and endpoints, which are needed for Amazon RDS for MySQL and DynamoDB

The following figure shows the high-level data model for the data load.

Create the DynamoDB data source connector

To create the DynamoDB connector for Athena, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select Amazon DynamoDB.
  4. Choose Next.

  1. For Data source name, enter DDB.

  1. For Lambda function, choose Create Lambda function.

This opens a new tab in your browser.

  1. For Application name, enter AthenaDynamoDBConnector.
  2. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  3. For AthenaCatalogName, enter dynamodb-lambda-func.
  4. Leave the remaining values at their defaults.
  5. Select I acknowledge that this app creates custom IAM roles and resource policies.
  6. Choose Deploy.

You’re returned to the Connect data sources section on the Athena console.

  1. Choose the refresh icon next to Lambda function.
  2. Choose the Lambda function you just created (dynamodb-lambda-func).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. If you haven’t already set up the Athena query results location, choose View settings on the Athena query editor page.

  1. Choose Manage.
  2. For Location of query result, browse to the S3 bucket specified for the Athena spill bucket in the CloudFormation template.
  3. Add Athena-query to the S3 path.
  4. Choose Save.

  1. In the Athena query editor, for Data source, choose DDB.
  2. For Database, choose default.

You can now explore the schema for the sportseventinfo table; the data is the same in DynamoDB.

  1. Choose the options icon for the sportseventinfo table and choose Preview Table.

Create the Amazon RDS for MySQL data source connector

Now let’s create the connector for Amazon RDS for MySQL.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL.
  4. Choose Next.

  1. For Data source name, enter MySQL.

  1. For Lambda function, choose Create Lambda function.

  1. For Application name, enter AthenaMySQLConnector.
  2. For SecretNamePrefix, enter AthenaMySQLFederation.
  3. For SpillBucket, enter the value from the CloudFormation stack for AthenaSpillBucket.
  4. For DefaultConnectionString, enter the value from the CloudFormation stack for MySQLConnection.
  5. For LambdaFunctionName, enter mysql-lambda-func.
  6. For SecurityGroupIds, enter the value from the CloudFormation stack for RDSSecurityGroup.
  7. For SubnetIds, enter the value from the CloudFormation stack for RDSSubnets.
  8. Select I acknowledge that this app creates custom IAM roles and resource policies.
  9. Choose Deploy.

  1. On the Lambda console, open the function you created (mysql-lambda-func).
  2. On the Configuration tab, under Environment variables, choose Edit.

  1. Choose Add environment variable.
  2. Enter a new key-value pair:
    • For Key, enter MYSQL_connection_string.
    • For Value, enter the value from the CloudFormation stack for MySQLConnection.
  3. Choose Save.

  1. Return to the Connect data sources section on the Athena console.
  2. Choose the refresh icon next to Lambda function.
  3. Choose the Lambda function you created (mysql-lamdba-function).

  1. Choose Next.
  2. Review the settings and choose Create data source.
  3. In the Athena query editor, for Data Source, choose MYSQL.
  4. For Database, choose sportsdata.

  1. Choose the options icon by the tables and choose Preview Table to examine the data and schema.

In the following sections, we demonstrate different ways to optimize our queries.

Optimal join order using EXPLAIN plan

A join is a basic SQL operation to query data on multiple tables using relations on matching columns. Join operations affect how much data is read from a table, how much data is transferred to the intermediate stages through networks, and how much memory is needed to build up a hash table to facilitate a join.

If you have multiple join operations and these join tables aren’t in the correct order, you may experience performance issues. To demonstrate this, we use the following tables from difference sources and join them in a certain order. Then we observe the query runtime and improve performance by using the EXPLAIN feature from Athena, which provides some suggestions for optimizing the query.

The CloudFormation template you ran earlier loaded data into the following services:

AWS Storage Table Name Number of Rows
Amazon DynamoDB sportseventinfo 657
Amazon S3 person 7,025,585
Amazon S3 ticketinfo 2,488

Let’s construct a query to find all those who participated in the event by type of tickets. The query runtime with the following join took approximately 7 mins to complete:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

Now let’s use EXPLAIN on the query to see its run plan. We use the same query as before, but add explain (TYPE DISTRIBUTED):

EXPLAIN (TYPE DISTRIBUTED)
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."person" p, 
"AwsDataCatalog"."athenablog"."ticketinfo" t 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output

Notice the cross-join in Fragment 1. The joins are converted to a Cartesian product for each table, where every record in a table is compared to every record in another table. Therefore, this query takes a significant amount of time to complete.

To optimize our query, we can rewrite it by reordering the joining tables as sportseventinfo first, ticketinfo second, and person last. The reason for this is because the WHERE clause, which is being converted to a JOIN ON clause during the query plan stage, doesn’t have the join relationship between the person table and sportseventinfo table. Therefore, the query plan generator converted the join type to cross-joins (a Cartesian product), which less efficient. Reordering the tables aligns the WHERE clause to the INNER JOIN type, which satisfies the JOIN ON clause and runtime is reduced from 7 minutes to 10 seconds.

The code for our optimized query is as follows:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE 
t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following is the EXPLAIN output of our query after reordering the join clause:

EXPLAIN (TYPE DISTRIBUTED) 
SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"DDB"."default"."sportseventinfo" e, 
"AwsDataCatalog"."athenablog"."ticketinfo" t, 
"AwsDataCatalog"."athenablog"."person" p 
WHERE t.sporting_event_id = cast(e.eventid as double) 
AND t.ticketholder_id = p.id

The following screenshot shows our output.

The cross-join changed to INNER JOIN with join on columns (eventid, id, ticketholder_id), which results in the query running faster. Joins between the ticketinfo and person tables converted to the PARTITION distribution type, where both left and right tables are hash-partitioned across all worker nodes due to the size of the person table. The join between the sportseventinfo table and ticketinfo are converted to the REPLICATED distribution type, where one table is hash-partitioned across all worker nodes and the other table is replicated to all worker nodes to perform the join operation.

For more information about how to analyze these results, refer to Understanding Athena EXPLAIN statement results.

As a best practice, we recommend having a JOIN statement along with an ON clause, as shown in the following code:

SELECT t.id AS ticket_id, 
e.eventid, 
p.first_name 
FROM 
"AwsDataCatalog"."athenablog"."person" p 
JOIN "AwsDataCatalog"."athenablog"."ticketinfo" t ON t.ticketholder_id = p.id 
JOIN "ddb"."default"."sportseventinfo" e ON t.sporting_event_id = cast(e.eventid as double)

Also as a best practice when you join two tables, specify the larger table on the left side of join and the smaller table on the right side of the join. Athena distributes the table on the right to worker nodes, and then streams the table on the left to do the join. If the table on the right is smaller, then less memory is used and the query runs faster.

In the following sections, we present examples of how to optimize pushdowns for filter predicates and projection filter operations for the Athena data source using EXPLAIN ANALYZE.

Pushdown optimization for the Athena connector for Amazon RDS for MySQL

A pushdown is an optimization to improve the performance of a SQL query by moving its processing as close to the data as possible. Pushdowns can drastically reduce SQL statement processing time by filtering data before transferring it over the network and filtering data before loading it into memory. The Athena connector for Amazon RDS for MySQL supports pushdowns for filter predicates and projection pushdowns.

The following table summarizes the services and tables we use to demonstrate a pushdown using Aurora MySQL.

Table Name Number of Rows Size in KB
player_partitioned 5,157 318.86
sport_team_partitioned 62 5.32

We use the following query as an example of a filtering predicate and projection filter:

SELECT full_name,
name 
FROM "sportsdata"."player_partitioned" a 
JOIN "sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

This query selects the players and their team based on their ID. It serves as an example of both filter operations in the WHERE clause and projection because it selects only two columns.

We use EXPLAIN ANALYZE to get the cost for the running this query:

EXPLAIN ANALYZE 
SELECT full_name,
name 
FROM "MYSQL"."sportsdata"."player_partitioned" a 
JOIN "MYSQL"."sportsdata"."sport_team_partitioned" b ON a.sport_team_id=b.id 
WHERE a.id='1.0'

The following screenshot shows the output in Fragment 2 for the table player_partitioned, in which we observe that the connector has a successful pushdown filter on the source side, so it tries to scan only one record out of the 5,157 records in the table. The output also shows that the query scan has only two columns (full_name as the projection column and sport_team_id and the join column), and uses SELECT and JOIN, which indicates the projection pushdown is successful. This helps reduce the data scan when using Athena data source connectors.

Now let’s look at the conditions in which a filter predicate pushdown doesn’t work with Athena connectors.

LIKE statement in filter predicates

We start with the following example query to demonstrate using the LIKE statement in filter predicates:

SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

We then add EXPLAIN ANALYZE:

EXPLAIN ANALYZE 
SELECT * 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

The EXPLAIN ANALYZE output shows that the query performs the table scan (scanning the table player_partitioned, which contains 5,157 records) for all the records even though the WHERE clause only has 30 records matching the condition %Aar%. Therefore, the data scan shows the complete table size even with the WHERE clause.

We can optimize the same query by selecting only the required columns:

EXPLAIN ANALYZE 
SELECT sport_team_id,
full_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name LIKE '%Aar%'

From the EXPLAIN ANALYZE output, we can observe that the connector supports the projection filter pushdown, because we select only two columns. This brought the data scan size down to half of the table size.

OR statement in filter predicates

We start with the following query to demonstrate using the OR statement in filter predicates:

SELECT id,
first_name 
FROM "MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

We use EXPLAIN ANALYZE with the preceding query as follows:

EXPLAIN ANALYZE 
SELECT * 
FROM 
"MYSQL"."sportsdata"."player_partitioned" 
WHERE first_name = 'Aaron' OR id ='1.0'

Similar to the LIKE statement, the following output shows that query scanned the table instead of pushing down to only the records that matched the WHERE clause. This query outputs only 16 records, but the data scan indicates a complete scan.

Pushdown optimization for the Athena connector for DynamoDB

For our example using the DynamoDB connector, we use the following data:

Table Number of Rows Size in KB
sportseventinfo 657 85.75

Let’s test the filter predicate and project filter operation for our DynamoDB table using the following query. This query tries to get all the events and sports for a given location. We use EXPLAIN ANALYZE for the query as follows:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE Location = 'Chase Field'

The output of EXPLAIN ANALYZE shows that the filter predicate retrieved only 21 records, and the project filter selected only two columns to push down to the source. Therefore, the data scan for this query is less than the table size.

Now let’s see where filter predicate pushdown doesn’t work. In the WHERE clause, if you apply the TRIM() function to the Location column and then filter, predicate pushdown optimization doesn’t apply, but we still see the projection filter optimization, which does apply. See the following code:

EXPLAIN ANALYZE 
SELECT EventId,
Sport 
FROM "DDB"."default"."sportseventinfo" 
WHERE trim(Location) = 'Chase Field'

The output of EXPLAIN ANALYZE for this query shows that the query scans all the rows but is still limited to only two columns, which shows that the filter predicate doesn’t work when the TRIM function is applied.

We’ve seen from the preceding examples that the Athena data source connector for Amazon RDS for MySQL and DynamoDB do support filter predicates and projection predicates for pushdown optimization, but we also saw that operations such as LIKE, OR, and TRIM when used in the filter predicate don’t support pushdowns to the source. Therefore, if you encounter unexplained charges in your federated Athena query, we recommend using EXPLAIN ANALYZE with the query and determine whether your Athena connector supports the pushdown operation or not.

Please note that running EXPLAIN ANALYZE incurs cost because it scans the data.

Conclusion

In this post, we showcased how to use EXPLAIN and EXPLAIN ANALYZE to analyze Athena SQL queries for data sources on AWS S3 and Athena federated SQL query for data source like DynamoDB and Amazon RDS for MySQL. You can use this as an example to optimize queries which would also result in cost savings.


About the Authors

Nishchai JM is an Analytics Specialist Solutions Architect at Amazon Web services. He specializes in building Big-data applications and help customer to modernize their applications on Cloud. He thinks Data is new oil and spends most of his time in deriving insights out of the Data.

Varad Ram is Senior Solutions Architect in Amazon Web Services. He likes to help customers adopt to cloud technologies and is particularly interested in artificial intelligence. He believes deep learning will power future technology growth. In his spare time, he like to be outdoor with his daughter and son.

Build a multilingual dashboard with Amazon Athena and Amazon QuickSight

Post Syndicated from Francesco Marelli original https://aws.amazon.com/blogs/big-data/build-a-multilingual-dashboard-with-amazon-athena-and-amazon-quicksight/

Amazon QuickSight is a serverless business intelligence (BI) service used by organizations of any size to make better data-driven decisions. QuickSight dashboards can also be embedded into SaaS apps and web portals to provide interactive dashboards, natural language query or data analysis capabilities to app users seamlessly. The QuickSight Demo Central contains many dashboards, feature showcase and tips and tricks that you can use; in the QuickSight Embedded Analytics Developer Portal you can find details on how to embed dashboards in your applications.

The QuickSight user interface currently supports 15 languages that you can choose on a per-user basis. The language selected for the user interface localizes all text generated by QuickSight with respect to UI components and isn’t applied to the data displayed in the dashboards.

This post describes how to create multilingual dashboards at the data level by creating new columns that contain the translated text and providing a language selection parameter and associated control to display data in the selected language in a QuickSight dashboard. You can create new columns with the translated text in several ways; in this post we create new columns using Amazon Athena user-defined functions implemented in the GitHub project sample Amazon Athena UDFs for text translation and analytics using Amazon Comprehend and Amazon Translate. This approach makes it easy to automatically create columns with translated text using neural machine translation provided by Amazon Translate.

Solution overview

The following diagram illustrates the architecture of this solution.

Architecture

For this post, we use the sample SaaS-Sales.csv dataset and follow these steps:

  1. Copy the dataset to a bucket in Amazon Simple Storage Service (Amazon S3).
  2. Use Athena to define a database and table to read the CSV file.
  3. Create a new table in Parquet format with the columns with the translated text.
  4. Create a new dataset in QuickSight.
  5. Create the parameter and control to select the language.
  6. Create dynamic multilingual calculated fields.
  7. Create an analysis with calculated multilingual calculated fields.
  8. Publish the multilingual dashboard.
  9. Create parametric headers and titles for visuals for use in an embedded dashboard.

An alternative approach might be to directly upload the CSV dataset to QuickSight and create the new columns with translated text as QuickSight calculated fields, for example using the ifelse() conditional function to directly assign the translated values.

Prerequisites

To follow the steps in this post, you need to have an AWS account with an active QuickSight Standard Edition or Enterprise Edition subscription.

Copy the dataset to a bucket in Amazon S3

Use the AWS Command Line Interface (AWS CLI) to create the S3 bucket qs-ml-blog-data and copy the dataset under the prefix saas-sales in your AWS account. You must follow the bucket naming rules to create your bucket. See the following code:

$ MY_BUCKET=qs-ml-blog-data
$ PREFIX=saas-sales

$ aws s3 mb s3://${MY_BUCKET}/

$ aws s3 cp \
    "s3://ee-assets-prod-us-east-1/modules/337d5d05acc64a6fa37bcba6b921071c/v1/SaaS-Sales.csv" \
    "s3://${MY_BUCKET}/${PREFIX}/SaaS-Sales.csv" 

Define a database and table to read the CSV file

Use the Athena query editor to create the database qs_ml_blog_db:

CREATE DATABASE IF NOT EXISTS qs_ml_blog_db;

Then create the new table qs_ml_blog_db.saas_sales:

CREATE EXTERNAL TABLE IF NOT EXISTS qs_ml_blog_db.saas_sales (
  row_id bigint, 
  order_id string, 
  order_date string, 
  date_key bigint, 
  contact_name string, 
  country_en string, 
  city_en string, 
  region string, 
  subregion string, 
  customer string, 
  customer_id bigint, 
  industry_en string, 
  segment string, 
  product string, 
  license string, 
  sales double, 
  quantity bigint, 
  discount double, 
  profit double)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<MY_BUCKET>/saas-sales/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',', 
  'skip.header.line.count'='1', 
  'typeOfData'='file')

Create a new table in Parquet format with the columns with the translated text

We want to translate the columns country_en, city_en, and industry_en to German, Spanish, and Italian. To do this in a scalable and flexible way, we use the GitHub project sample Amazon Athena UDFs for text translation and analytics using Amazon Comprehend and Amazon Translate.

After you set up the user-defined functions following the instructions in the GitHub repo, run the following SQL query in Athena to create the new table qs_ml_blog_db.saas_sales_ml with the translated columns using the translate_text user-defined function and some other minor changes:

CREATE TABLE qs_ml_blog_db.saas_sales_ml WITH (
    format = 'PARQUET',
    parquet_compression = 'SNAPPY',
    external_location = 's3://<MY_BUCKET>/saas-sales-ml/'
) AS 
USING EXTERNAL FUNCTION translate_text(text_col VARCHAR, sourcelang VARCHAR, targetlang VARCHAR, terminologyname VARCHAR) RETURNS VARCHAR LAMBDA 'textanalytics-udf'
SELECT 
row_id,
order_id,
date_parse("order_date",'%m/%d/%Y') as order_date,
date_key,
contact_name,
country_en,
translate_text(country_en, 'en', 'de', NULL) as country_de,
translate_text(country_en, 'en', 'es', NULL) as country_es,
translate_text(country_en, 'en', 'it', NULL) as country_it,
city_en,
translate_text(city_en, 'en', 'de', NULL) as city_de,
translate_text(city_en, 'en', 'es', NULL) as city_es,
translate_text(city_en, 'en', 'it', NULL) as city_it,
region,
subregion,
customer,
customer_id,
industry_en,
translate_text(industry_en, 'en', 'de', NULL) as industry_de,
translate_text(industry_en, 'en', 'es', NULL) as industry_es,
translate_text(industry_en, 'en', 'it', NULL) as industry_it,
segment,
product,
license,
sales,
quantity,
discount,
profit
FROM qs_ml_blog_db.saas_sales
;

Run three simple queries, one per column, to check the generation of the new columns with the translation was successful. We include a screenshot after each query showing its results.

SELECT 
distinct(country_en),
country_de,
country_es,
country_it
FROM qs_ml_blog_db.saas_sales_ml 
ORDER BY country_en
limit 10
;

Original and translated values for column Country

SELECT 
distinct(city_en),
city_de,
city_es,
city_it
FROM qs_ml_blog_db.saas_sales_ml 
ORDER BY city_en
limit 10
;

Original and translated values for column City

SELECT 
distinct(industry_en),
industry_de,
industry_es,
industry_it
FROM qs_ml_blog_db.saas_sales_ml 
ORDER BY industry_en
limit 10
;

Original and translated values for column Industry

Now you can use the new table saas_sales_ml as input to create a dataset in QuickSight.

Create a dataset in QuickSight

To create your dataset in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose Create a dataset.
  3. Choose Athena.
  4. For Data source name¸ enter athena_primary.
  5. For Athena workgroup¸ choose primary.
  6. Choose Create data source.
    New Athena data source
  7. Select the saas_sales_ml table previously created and choose Select.
    Choose your table
  8. Choose to import the table to SPICE and choose Visualize to start creating the new dashboard.
    Finish dataset creation

In the analysis section, you receive a message that informs you that the table was successfully imported to SPICE.

SPICE import complete

Create the parameter and control to select the language

To create the parameter and associate the control that you use to select the language for the dashboard, complete the following steps:

  1. In the analysis section, choose Parameters and Create one.
  2. For Name, enter Language.
  3. For Data type, choose String.
  4. For Values, select Single value.
  5. For Static default value, enter English.
  6. Choose Create.
    Create new parameter
  7. To connect the parameter to a control, choose Control.
    Connect parameter to control
  8. For Display name, choose Language.
  9. For Style, choose Dropdown.
  10. For Values, select Specific values.
  11. For Define specific values, enter English, German, Italian, and French (one value per line).
  12. Select Hide Select all option from the control values if the parameter has a default configured.
  13. Choose Add.
    Define control properties

The control is now available, linked to the parameter and displayed in the Controls section of the current sheet in the analysis.

Language control preview

Create dynamic multilingual calculated fields

You’re now ready to create the calculated fields whose value will change based on the currently selected language.

  1. In the menu bar, choose Add and choose Add calculated field.
    Add calculated field
  2. Use the ifelse conditional function to evaluate the value of the Language parameter and select the correct column in the dataset to assign the value to the calculated field.
  3. Create the Country calculated field using the following expression:
    ifelse(
        ${Language} = 'English', {country_en},
        ${Language} = 'German', {country_de},
        ${Language} = 'Italian', {country_it},
        ${Language} = 'Spanish', {country_es},
        {country_en}
    )

  4. Choose Save.
    Calculated field definition in Amazon QuickSight
  5. Repeat the process for the City calculated field:
    ifelse(
        ${Language} = 'English', {city_en},
        ${Language} = 'German', {city_de},
        ${Language} = 'Italian', {city_it},
        ${Language} = 'Spanish', {city_es},
        {city_en}
    )

  6. Repeat the process for the Industry calculated field:
    ifelse(
        ${Language} = 'English', {industry_en},
        ${Language} = 'German', {industry_de},
        ${Language} = 'Italian', {industry_it},
        ${Language} = 'Spanish', {industry_es},
        {industry_en}
    )

The calculated fields are now available and ready to use in the analysis.

Calculated fields available in analysis

Create an analysis with calculated multilingual calculated fields

Create an analysis with two donut charts and a pivot table that use the three multilingual fields. In the subtitle of the visuals, use the string Language: <<$Language>> to display the currently selected language. The following screenshot shows our analysis.

Analysis with Language control - English

If you choose a new language from the Language control, the visuals adapt accordingly. The following screenshot shows the analysis in Italian.

Analysis with Language control - Italian

You’re now ready to publish the analysis as a dashboard.

Publish the multilingual dashboard

In the menu bar, choose Share and Publish dashboard.

Publish dashboard menu

Publish the new dashboard as “Multilingual dashboard,” leave the advanced publish options at their default values, and choose Publish dashboard.

Publish dashboard with name

The dashboard is now ready.

Published dashboard

We can take the multilingual features one step further by embedding the dashboard and controlling the parameters in the external page using the Amazon QuickSight Embedding SDK.

Create parametric headers and titles for visuals for use in an embedded dashboard

When embedding an QuickSight dashboard, the locale and parameters’ values can be set programmatically from JavaScript. This can be useful to set default values and change the settings for localization and the default data language. The following steps show how to use these features by modifying the dashboard we have created so far, embedding it in an HTML page, and using the Amazon QuickSight Embedding SDK to dynamically set the value of parameters used to display titles, legends, headers, and more in translated text. The full code for the HTML page is also provided in the appendix of this post.

Create new parameters for the titles and the headers of the visuals in the analysis, the sheet name, visuals legends, and control labels as per the following table.

Name Data type Values Static default value
city String Single value City
country String Single value Country
donut01title String Single value Sales by Country
donut02title String Single value Quantity by Industry
industry String Single value Industry
Language String Single value English
languagecontrollabel String Single value Language
pivottitle String Single value Sales by Country, City and Industy
sales String Single value Sales
sheet001name String Single value Summary View

The parameters are now available on the Parameters menu.

You can now use the parameters inside each sheet title, visual title, legend title, column header, axis label, and more in your analysis. The following screenshots provide examples that illustrate how to insert these parameters into each title.

First, we insert the sheet name.

Then we add the language control name.

We edit the donut charts’ titles.

Donut chart title

We also add the donut charts’ legend titles.

Donut chart legend title

In the following screenshot, we specify the pivot table row names.

Pivot table row names

We also specify the pivot table column names.

Pivot table column names

Publish the analysis to a new dashboard and follow the steps in the post Embed interactive dashboards in your apps and portals in minutes with Amazon QuickSight’s new 1-click embedding feature to embed the dashboard in an HTML page hosted in your website or web application.

The example HTML page provided in the appendix of this post contains one control to switch among the four languages you created in the dataset in the previous sections with the option to automatically sync the QuickSight UI locale when changing the language, and one control to independently change the UI locale as required.

The following screenshots provide some examples of combinations of data language and QuickSight UI locale.

The following is an example of English data language with the English QuickSight UI locale.

Embedded dashboard with English language and English locale

The following is an example of Italian data language with the synchronized Italian QuickSight UI locale.

Embedded dashboard with Italian language and synced Italian locale

The following screenshot shows German data language with the Japanese QuickSight UI locale.

Embedded dashboard with German language and Japanese locale

Conclusion

This post demonstrated how to automatically translate data using machine learning and build a multilingual dashboard with Athena, QuickSight, and Amazon Translate, and how to add advanced multilingual features with QuickSight embedded dashboards. You can use the same approach to display different values for dimensions as well as metrics depending on the values of one or more parameters.

QuickSight provides a 30-day free trial subscription for four users; you can get started immediately. You can learn more and ask questions about QuickSight in the Amazon QuickSight Community.

Appendix: Embedded dashboard host page

The full code for the HTML page is as follows:

<!DOCTYPE html>
<html>
    <head>
        <title>Amazon QuickSight Multilingual Embedded Dashboard</title>
        <script src="https://unpkg.com/[email protected]/dist/quicksight-embedding-js-sdk.min.js"></script>
        <script type="text/javascript">

            var url = "https://<<YOUR_AMAZON_QUICKSIGHT_REGION>>.quicksight.aws.amazon.com/sn/embed/share/accounts/<<YOUR_AWS_ACCOUNT_ID>>/dashboards/<<DASHBOARD_ID>>?directory_alias=<<YOUR_AMAZON_QUICKSIGHT_ACCOUNT_NAME>>"
            var defaultLanguageOptions = 'en_US'
            var dashboard

            var trns = {
                en_US: {
                    locale: "en-US",
                    language: "English",
                    languagecontrollabel: "Language",
                    sheet001name: "Summary View",
                    sales: "Sales",
                    country: "Country",
                    city: "City",
                    industry: "Industry",
                    quantity: "Quantity",
                    by: "by",
                    and: "and"
                },
                de_DE: {
                    locale: "de-DE",
                    language: "German",
                    languagecontrollabel: "Sprache",
                    sheet001name: "Zusammenfassende Ansicht",
                    sales: "Umsätze",
                    country: "Land",
                    city: "Stadt",
                    industry: "Industrie",
                    quantity: "Anzahl",
                    by: "von",
                    and: "und"
                },
                it_IT: {
                    locale: "it-IT",
                    language: "Italian",
                    languagecontrollabel: "Lingua",
                    sheet001name: "Prospetto Riassuntivo",
                    sales: "Vendite",
                    country: "Paese",
                    city: "Città",
                    industry: "Settore",
                    quantity: "Quantità",
                    by: "per",
                    and: "e"
                },
                es_ES: {
                    locale: "es-ES",
                    language: "Spanish",
                    languagecontrollabel: "Idioma",
                    sheet001name: "Vista de Resumen",
                    sales: "Ventas",
                    country: "Paìs",
                    city: "Ciudad",
                    industry: "Industria",
                    quantity: "Cantidad",
                    by: "por",
                    and: "y"
                }
            }

            function setLanguageParameters(l){

                return {
                            Language: trns[l]['language'],
                            languagecontrollabel: trns[l]['languagecontrollabel'],
                            sheet001name: trns[l]['sheet001name'],
                            donut01title: trns[l]['sales']+" "+trns[l]['by']+" "+trns[l]['country'],
                            donut02title: trns[l]['quantity']+" "+trns[l]['by']+" "+trns[l]['industry'],
                            pivottitle: trns[l]['sales']+" "+trns[l]['by']+" "+trns[l]['country']+", "+trns[l]['city']+" "+trns[l]['and']+" "+trns[l]['industry'],
                            sales: trns[l]['sales'],
                            country: trns[l]['country'],
                            city: trns[l]['city'],
                            industry: trns[l]['industry'],
                        }
            }

            function embedDashboard(lOpts, forceLocale) {

                var languageOptions = defaultLanguageOptions
                if (lOpts) languageOptions = lOpts

                var containerDiv = document.getElementById("embeddingContainer");
                containerDiv.innerHTML = ''

                parameters = setLanguageParameters(languageOptions)

                if(!forceLocale) locale = trns[languageOptions]['locale']
                else locale = forceLocale

                var options = {
                    url: url,
                    container: containerDiv,
                    parameters: parameters,
                    scrolling: "no",
                    height: "AutoFit",
                    loadingHeight: "930px",
                    width: "1024px",
                    locale: locale
                };

                dashboard = QuickSightEmbedding.embedDashboard(options);
            }

            function onLangChange(langSel) {

                var l = langSel.value

                if(!document.getElementById("changeLocale").checked){
                    dashboard.setParameters(setLanguageParameters(l))
                }
                else {
                    var selLocale = document.getElementById("locale")
                    selLocale.value = trns[l]['locale']
                    embedDashboard(l)
                }
            }

            function onLocaleChange(obj) {

                var locl = obj.value
                var lang = document.getElementById("lang").value

                document.getElementById("changeLocale").checked = false
                embedDashboard(lang,locl)

            }

            function onSyncLocaleChange(obj){

                if(obj.checked){
                    var selLocale = document.getElementById('locale')
                    var selLang = document.getElementById('lang').value
                    selLocale.value = trns[selLang]['locale']
                    embedDashboard(selLang, trns[selLang]['locale'])
                }            
            }

        </script>
    </head>

    <body onload="embedDashboard()">

        <div style="text-align: center; width: 1024px;">
            <h2>Amazon QuickSight Multilingual Embedded Dashboard</h2>

            <span>
                <label for="lang">Language</label>
                <select id="lang" name="lang" onchange="onLangChange(this)">
                    <option value="en_US" selected>English</option>
                    <option value="de_DE">German</option>
                    <option value="it_IT">Italian</option>
                    <option value="es_ES">Spanish</option>
                </select>
            </span>

            &nbsp;-&nbsp;
            
            <span>
                <label for="changeLocale">Sync UI Locale with Language</label>
                <input type="checkbox" id="changeLocale" name="changeLocale" onchange="onSyncLocaleChange(this)">
            </span>

            &nbsp;|&nbsp;

            <span>
                <label for="locale">QuickSight UI Locale</label>
                <select id="locale" name="locale" onchange="onLocaleChange(this)">
                    <option value="en-US" selected>English</option>
                    <option value="da-DK">Dansk</option>
                    <option value="de-DE">Deutsch</option>
                    <option value="ja-JP">日本語</option>
                    <option value="es-ES">Español</option>
                    <option value="fr-FR">Français</option>
                    <option value="it-IT">Italiano</option>
                    <option value="nl-NL">Nederlands</option>
                    <option value="nb-NO">Norsk</option>
                    <option value="pt-BR">Português</option>
                    <option value="fi-FI">Suomi</option>
                    <option value="sv-SE">Svenska</option>
                    <option value="ko-KR">한국어</option>
                    <option value="zh-CN">中文 (简体)</option>
                    <option value="zh-TW">中文 (繁體)</option>            
                </select>
            </span>
        </div>

        <div id="embeddingContainer"></div>

    </body>

</html>

About the Author

Author Francesco MarelliFrancesco Marelli is a principal solutions architect at Amazon Web Services. He is specialized in the design and implementation of analytics, data management, and big data systems. Francesco also has a strong experience in systems integration and design and implementation of applications. He is passionate about music, collecting vinyl records, and playing bass.

A serverless operational data lake for retail with AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, and Amazon QuickSight

Post Syndicated from Gandhi Raketla original https://aws.amazon.com/blogs/big-data/a-serverless-operational-data-lake-for-retail-with-aws-glue-amazon-kinesis-data-streams-amazon-dynamodb-and-amazon-quicksight/

Do you want to reduce stockouts at stores? Do you want to improve order delivery timelines? Do you want to provide your customers with accurate product availability, down to the millisecond? A retail operational data lake can help you transform the customer experience by providing deeper insights into a variety of operational aspects of your supply chain.

In this post, we demonstrate how to create a serverless operational data lake using AWS services, including AWS Glue, Amazon Kinesis Data Streams, Amazon DynamoDB, Amazon Athena, and Amazon QuickSight.

Retail operations is a critical functional area that gives retailers a competitive edge. An efficient retail operation can optimize the supply chain for a better customer experience and cost reduction. An optimized retail operation can reduce frequent stockouts and delayed shipments, and provide accurate inventory and order details. Today, a retailer’s channels aren’t just store and web—they include mobile apps, chatbots, connected devices, and social media channels. The data is both structured and unstructured. This coupled with multiple fulfillment options like buy online and pick up at store, ship from store, or ship from distribution centers, which increases the complexity of retail operations.

Most retailers use a centralized order management system (OMS) for managing orders, inventory, shipments, payments, and other operational aspects. These legacy OMSs are unable to scale in response to the rapid changes in retail business models. The enterprise applications that are key for efficient and smooth retail operations rely on a central OMS. Applications for ecommerce, warehouse management, call centers, and mobile all require an OMS to get order status, inventory positions of different items, shipment status, and more. Another challenge with legacy OMSs is they’re not designed to handle unstructured data like weather data and IoT data that could impact inventory and order fulfillment. A legacy OMS that can’t scale prohibits you from implementing new business models that could transform your customer experience.

A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. An operational data lake addresses this challenge by providing easy access to structured and unstructured operational data in real time from various enterprise systems. You can store your data as is, without having to first structure the data, and run different types of analytics—from dashboards and visualizations to big data processing, real-time analytics, and machine learning (ML)—to guide better decisions. This can ease the burden on OMSs that can instead focus on order orchestration and management.

Solution overview

In this post, we create an end-to-end pipeline to ingest, store, process, analyze, and visualize operational data like orders, inventory, and shipment updates. We use the following AWS services as key components:

  • Kinesis Data Streams to ingest all operational data in real time from various systems
  • DynamoDB, Amazon Aurora, and Amazon Simple Storage Service (Amazon S3) to store the data
  • AWS Glue DataBrew to clean and transform the data
  • AWS Glue crawlers to catalog the data
  • Athena to query the processed data
  • A QuickSight dashboard that provides insights into various operational metrics

The following diagram illustrates the solution architecture.

The data pipeline consists of stages to ingest, store, process, analyze, and finally visualize the data, which we discuss in more detail in the following sections.

Data ingestion

Orders and inventory data is ingested in real time from multiple sources like web applications, mobile apps, and connected devices into Kinesis Data Streams. Kinesis Data Streams is a massively scalable and durable real-time data streaming service. Kinesis Data Streams can continuously capture gigabytes of data per second from hundreds of thousands of sources, such as web applications, database events, inventory transactions, and payment transactions. Frontend systems like ecommerce applications and mobile apps ingest the order data as soon as items are added to a cart or an order is created. The OMS ingests orders when the order status changes. OMSs, stores, and third-party suppliers ingest inventory updates into the data stream.

To simulate orders, an AWS Lambda function is triggered by a scheduled Amazon CloudWatch event every minute to ingest orders to a data stream. This function simulates the typical order management system lifecycle (order created, scheduled, released, shipped, and delivered). Similarly, a second Lambda function is triggered by a CloudWatch event to generate inventory updates. This function simulates different inventory updates such as purchase orders created from systems like the OMS or third-party suppliers. In a production environment, this data would come from frontend applications and a centralized order management system.

Data storage

There are two types of data: hot and cold data. Hot data is consumed by frontend applications like web applications, mobile apps, and connected devices. The following are some example use cases for hot data:

  • When a customer is browsing products, the real-time availability of the item must be displayed
  • Customers interacting with Alexa to know the status of the order
  • A call center agent interacting with a customer needs to know the status of the customer order or its shipment details

The systems, APIs, and devices that consume this data need the data within seconds or milliseconds of the transactions.

Cold data is used for long-term analytics like orders over a period of time, orders by channel, top 10 items by number of orders, or planned vs. available inventory by item, warehouse, or store.

For this solution, we store orders hot data in DynamoDB. DynamoDB is a fully managed NoSQL database that delivers single-digit millisecond performance at any scale. A Lambda function processes records in the Kinesis data stream and stores it in a DynamoDB table.

Inventory hot data is stored in an Amazon Aurora MySQL-Compatible Edition database. Inventory is transactional data that requires high consistency so that customers aren’t over-promised or under-promised when they place orders. Aurora MySQL is fully managed database that is up to five times faster than standard MySQL databases and three times faster than standard PostgreSQL databases. It provides the security, availability, and reliability of commercial databases at a tenth of the cost.

Amazon S3 is object storage built to store and retrieve any amount of data from anywhere. It’s a simple storage service that offers industry-leading durability, availability, performance, security, and virtually unlimited scalability at very low cost. Order and inventory cold data is stored in Amazon S3.

Amazon Kinesis Data Firehose reads the data from the Kinesis data stream and stores it in Amazon S3. Kinesis Data Firehose is the easiest way to load streaming data into data stores and analytics tools. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, Amazon OpenSearch Service, and Splunk, enabling near-real-time analytics.

Data processing

The data processing stage involves cleaning, preparing, and transforming the data to help downstream analytics applications easily query the data. Each frontend system might have a different data format. In the data processing stage, data is cleaned and converted into a common canonical form.

For this solution, we use DataBrew to clean and convert orders into a common canonical form. DataBrew is a visual data preparation tool that makes it easy for data analysts and data scientists to prepare data with an interactive, point-and-click visual interface without writing code. DataBrew provides over 250 built-in transformations to combine, pivot, and transpose the data without writing code. The cleaning and transformation steps in DataBrew are called recipes. A scheduled DataBrew job applies the recipes to the data in an S3 bucket and stores the output in a different bucket.

AWS Glue crawlers can access data stores, extract metadata, and create table definitions in the AWS Glue Data Catalog. You can schedule a crawler to crawl the transformed data and create or update the Data Catalog. The AWS Glue Data Catalog is your persistent metadata store. It’s a managed service that lets you store, annotate, and share metadata in the AWS Cloud in the same way you would in an Apache Hive metastore. We use crawlers to populate the Data Catalog with tables.

Data analysis

We can query orders and inventory data from S3 buckets using Athena. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Views are created in Athena that can be consumed by business intelligence (BI) services like QuickSight.

Data visualization

We generate dashboards using QuickSight. QuickSight is a scalable, serverless, embeddable BI service powered by ML and built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

QuickSight also has features to forecast orders, detect anomalies in the order, and provide ML-powered insights. We can create analyses such as orders over a period of time, orders split by channel, top 10 locations for orders, or order fulfillment timelines (the time it took from order creation to order delivery).

Walkthrough overview

To implement this solution, you complete the following high-level steps:

  1. Create solution resources using AWS CloudFormation.
  2. Connect to the inventory database.
  3. Load the inventory database with tables.
  4. Create a VPC endpoint using Amazon Virtual Private Cloud (Amazon VPC).
  5. Create gateway endpoints for Amazon S3 on the default VPC.
  6. Enable CloudWatch rules via Amazon EventBridge to ingest the data.
  7. Transform the data using AWS Glue.
  8. Visualize the data with QuickSight.

Prerequisites

Complete the following prerequisite steps:

  1. Create AWS account if you don’t have done already.
  2. Sign up for QuickSight if you’ve never used QuickSight in this account before. To use the forecast ability in QuickSight, sign up for the Enterprise Edition.

Create resources with AWS CloudFormation

To launch the provided CloudFormation template, complete the following steps:

  1. Choose Launch Stack:
  2. Choose Next.
  3. For Stack name, enter a name.
  4. Provide the following parameters:
    1. The name of the S3 bucket that holds all the data for the data lake.
    2. The name of the database that holds the inventory tables.
    3. The database user name.
    4. The database password.
  5. Enter any tags you want to assign to the stack and choose Next.
  6. Select the acknowledgement check boxes and choose Create stack.

The stack takes 5–10 minutes to complete.

On the AWS CloudFormation console, you can navigate to the stack’s Outputs tab to review the resources you created.

If you open the S3 bucket you created, you can observe its folder structure. The stack creates sample order data for the last 7 days.

Connect to the inventory database

To connect to your database in the query editor, complete the following steps:

  1. On the Amazon RDS console, choose the Region you deployed the stack in.
  2. In the navigation pane, choose Query Editor.

    If you haven’t connected to this database before, the Connect to database page opens.
  3. For Database instance or cluster, choose your database.
  4. For Database username, choose Connect with a Secrets Manager ARN.
    The database user name and password provided during stack creation are stored in AWS Secrets Manager. Alternatively, you can choose Add new database credentials and enter the database user name and password you provided when creating the stack.
  5. For Secrets Manager ARN, enter the value for the key InventorySecretManager from the CloudFormation stack outputs.
  6. Optionally, enter the name of your database.
  7. Choose Connect to database.

Load the inventory database with tables

Enter the following DDL statement in the query editor and choose Run:

CREATE TABLE INVENTORY (
    ItemID varchar(25) NOT NULL,
    ShipNode varchar(25) NOT NULL,
    SupplyType varchar(25) NOT NULL,
    SupplyDemandType varchar(25) NOT NULL,
    ItemName varchar(25),
    UOM varchar(10),
    Quantity int(11) NOT NULL,
    ETA varchar(25)	 ,
    UpdatedDate DATE,
    PRIMARY KEY (ItemID,ShipNode,SupplyType)
);

Create a VPC endpoint

To create your VPC endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for rds and choose the service name ending with rds-data.
  6. For VPC, choose the default VPC.
  7. Leave the remaining settings at their default and choose Create endpoint.

Create a gateway endpoint for Amazon S3

To create your gateway endpoint, complete the following steps:

  1. On the Amazon VPC console, choose VPC Dashboard.
  2. Choose Endpoints in the navigation pane.
  3. Choose Create Endpoint.
  4. For Service category, select AWS services.
  5. For Service name, search for S3 and choose the service name with type Gateway.
  6. For VPC, choose the default VPC.
  7. For Configure route tables, select the default route table.
  8. Leave the remaining settings at their default and choose Create endpoint.

Wait for both the gateway endpoint and VPC endpoint status to change to Available.

Enable CloudWatch rules to ingest the data

We created two CloudWatch rules via the CloudFormation template to ingest the order and inventory data to Kinesis Data Streams. To enable the rules via EventBridge, complete the following steps:

  1. On the CloudWatch console, under Events in the navigation pane, choose Rules.
  2. Make sure you’re in the Region where you created the stack.
  3. Choose Go to Amazon EventBridge.
  4. Select the rule Ingest-Inventory-Update-Schedule-Rule and choose Enable.
  5. Select the rule Ingest-Order-Schedule-Rule and choose Enable.

After 5–10 minutes, the Lambda functions start ingesting orders and inventory updates to their respective streams. You can check the S3 buckets orders-landing-zone and inventory-landing-zone to confirm that the data is being populated.

Perform data transformation

Our CloudFormation stack included a DataBrew project, a DataBrew job that runs every 5 minutes, and two AWS Glue crawlers. To perform data transformation using our AWS Glue resources, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose the project OrderDataTransform.

    You can review the project and its recipe on this page.
  3. In the navigation pane, choose Jobs.
  4. Review the job status to confirm it’s complete.
  5. On the AWS Glue console, choose Crawlers in the navigation pane.
    The crawlers crawl the transformed data and update the Data Catalog.
  6. Review the status of the two crawlers, which run every 15 minutes.
  7. Choose Tables in the navigation pane to view the two tables the crawlers created.
    If you don’t see these tables, you can run the crawlers manually to create them.

    You can query the data in the tables with Athena.
  8. On the Athena console, choose Query editor.
    If you haven’t created a query result location, you’re prompted to do that first.
  9. Choose View settings or choose the Settings tab.
  10. Choose Manage.
  11. Select the S3 bucket to store the results and choose Choose.
  12. Choose Query editor in the navigation pane.
  13. Choose either table (right-click) and choose Preview Table to view the table contents.

Visualize the data

If you have never used QuickSight in this account before, complete the prerequisite step to sign up for QuickSight. To use the ML capabilities of QuickSight (such as forecasting) sign up for the Enterprise Edition using the steps in this documentation.

While signing up for QuickSight, make sure to use the same region where you created the CloudFormation stack.

Grant QuickSight permissions

To visualize your data, you must first grant relevant permissions to QuickSight to access your data.

  1. On the QuickSight console, on the Admin drop-down menu, choose Manage QuickSight.
  2. In the navigation pane, choose Security & permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. Select Amazon Athena.
  5. Select Amazon S3 to edit QuickSight access to your S3 buckets.
  6. Select the bucket you specified during stack creation (for this post, operational-datalake).
  7. Choose Finish.
  8. Choose Save.

Prepare the datasets

To prepare your datasets, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena.
  4. For Data source name, enter retail-analysis.
  5. Choose Validate connection.
  6. After your connection is validated, choose Create data source.
  7. For Database, choose orderdatalake.
  8. For Tables, select orders_clean.
  9. Choose Edit/Preview data.
  10. For Query mode, select SPICE.
    SPICE (Super-fast, Parallel, In-memory Calculation Engine) is the robust in-memory engine that QuickSight uses.
  11. Choose the orderdatetime field (right-click), choose Change data type, and choose Date.
  12. Enter the date format as MM/dd/yyyy HH:mm:ss.
  13. Choose Validate and Update.
  14. Change the data types of the following fields to QuickSight geospatial data types:
    1. billingaddress.zipcode – Postcode
    2. billingaddress.city – City
    3. billingaddress.country – Country
    4. billingaddress.state – State
    5. shippingaddress.zipcode – Postcode
    6. shippingaddress.city – City
    7. shippingaddress.country – Country
    8. shippingaddress.state – State
  15. Choose Save & publish.
  16. Choose Cancel to exit this page.

    Let’s create another dataset for the Athena table inventory_landing_zone.
  17. Follow steps 1–7 to create a new dataset. For Table selection, choose inventory_landing_zone.
  18. Choose Edit/Preview data.
  19. For Query mode, select SPICE.
  20. Choose Save & publish.
  21. Choose Cancel to exit this page.

    Both datasets should now be listed on the Datasets page.
  22. Choose each dataset and choose Refresh now.
  23. Select Full refresh and choose Refresh.

To set up a scheduled refresh, choose Schedule a refresh and provide your schedule details.

Create an analysis

To create an analysis in QuickSight, complete the following steps:

  1. On the QuickSight console, choose Analyses in the navigation pane.
  2. Choose New analysis.
  3. Choose the orders_clean dataset.
  4. Choose Create analysis.
  5. To adjust the theme, choose Themes in the navigation pane, choose your preferred theme, and choose Apply.
  6. Name the analysis retail-analysis.

Add visualizations to the analysis

Let’s start creating visualizations. The first visualization shows orders created over time.

  1. Choose the empty graph on the dashboard and for Visual type¸ choose the line chart.
    For more information about visual types, see Visual types in Amazon QuickSight.
  2. Under Field wells, drag orderdatetime to X axis and ordernumber to Value.
  3. Set ordernumber to Aggregate: Count distinct.

    Now we can filter these orders by Created status.
  4. Choose Filter in the navigation pane and choose Create one.
  5. Search for and choose status.
  6. Choose the status filter you just created.
  7. Select Created from the filter list and choose Apply.
  8. Choose the graph (right-click) and choose Add forecast.
    The forecasting ability is only available in the Enterprise Edition. QuickSight uses a built-in version of the Random Cut Forest (RCF) algorithm. For more information, refer to Understanding the ML algorithm used by Amazon QuickSight.
  9. Leave the settings as default and choose Apply.
  10. Rename the visualization to “Orders Created Over Time.”

If the forecast is applied successfully, the visualization shows the expected number of orders as well as upper and lower bounds.

If you get the following error message, allow for the data to accumulate for a few days before adding the forecast.

Let’s create a visualization on orders by location.

  1. On the Add menu, choose Add visual.
  2. Choose the points on map visual type.
  3. Under Field wells, drag shippingaddress.zipcode to Geospatial and ordernumber to Size.
  4. Change ordernumber to Aggregate: Count distinct.

    You should now see a map indicating the orders by location.
  5. Rename the visualization accordingly.

    Next, we create a drill-down visualization on the inventory count.
  6. Choose the pencil icon.
  7. Choose Add dataset.
  8. Select the inventory_landing_zone dataset and choose Select.
  9. Choose the inventory_landing_zone dataset.
  10. Add the vertical bar chart visual type.
  11. Under Field wells, drag itemname, shipnode, and invtype to X axis, and quantity to Value.
  12. Make sure that quantity is set to Sum.

    The following screenshot shows an example visualization of order inventory.
  13. To determine how many face masks were shipped out from each ship node, choose Face Masks (right-click) and choose Drill down to shipnode.
  14. You can drill down even further to invtype to see how many face masks in a specific ship node are in which status.

The following screenshot shows this drilled-down inventory count.

As a next step, you can create a QuickSight dashboard from the analysis you created. For instructions, refer to Tutorial: Create an Amazon QuickSight dashboard.

Clean up

To avoid any ongoing charges, on the AWS CloudFormation console, select the stack you created and choose Delete. This deletes all the created resources. On the stack’s Events tab, you can track the progress of the deletion, and wait for the stack status to change to DELETE_COMPLETE.

The Amazon EventBridge rules generate orders and inventory data every 15 minutes, to avoid generating huge amount of data, please ensure to delete the stack after testing the blog.

If the deletion of any resources fails, ensure that you delete them manually. For deleting Amazon QuickSight datasets, you can follow these instructions. You can delete the QuickSight Analysis using these steps. For deleting the QuickSight subscription and closing the account, you can follow these instructions.

Conclusion

In this post, we showed you how to use AWS analytics and storage services to build a serverless operational data lake. Kinesis Data Streams lets you ingest large volumes of data, and DataBrew lets you cleanse and transform the data visually. We also showed you how to analyze and visualize the order and inventory data using AWS Glue, Athena, and QuickSight. For more information and resources for data lakes on AWS, visit Analytics on AWS.


About the Authors

Gandhi Raketla is a Senior Solutions Architect for AWS. He works with AWS customers and partners on cloud adoption, as well as architecting solutions that help customers foster agility and innovation. He specializes in the AWS data analytics domain.

Sindhura Palakodety is a Solutions Architect at AWS. She is passionate about helping customers build enterprise-scale Well-Architected solutions on the AWS Cloud and specializes in the containers and data analytics domains.

Visualize MongoDB data from Amazon QuickSight using Amazon Athena Federated Query

Post Syndicated from Soujanya Konka original https://aws.amazon.com/blogs/big-data/visualize-mongodb-data-from-amazon-quicksight-using-amazon-athena-federated-query/

In this post, you will learn how to use Amazon Athena Federated Query to connect a MongoDB database to Amazon QuickSight in order to build dashboards and visualizations.

Amazon Athena is a serverless interactive query service, based on Presto, that provides full ANSI SQL support to query a variety of standard data formats, including CSV, JSON, ORC, Avro, and Parquet, that are stored on Amazon Simple Storage Service (Amazon S3). For data that isn’t stored on Amazon S3, you can use Athena Federated Query to query the data in place or build pipelines that extract data from multiple data sources and store it in Amazon S3. With Athena Federated Query, you can run SQL queries across data that is stored in relational, non-relational, object, and custom data sources.

MongoDB is a popular NoSQL database option for websites and API endpoints. You can choose to deploy MongoDB as a self-hosted or fully-managed database. Databases are a popular choice for UI applications for managing user profiles, product catalogs, profile views, clickstream events, events from a connected device, and so on. QuickSight is a serverless business analytics service with built-in machine learning (ML) capabilities that can automatically look for patterns and outliers, and has the flexibility to embed dashboards in applications for a data-driven experience. You can also use QuickSight Q to allow users to ask questions using natural language and find answers to business questions immediately.

Overview of Athena Federated Query

Athena Federated Query uses data source connectors that run on AWS Lambda to run federated queries to other data sources. Prebuilt data source connectors are available for native stores, like Amazon Timestream, Amazon CloudWatch Logs, Amazon DynamoDB, and external sources like Vertica and SAP Hana. You can also write a connector by using the Athena Query Federation SDK. You can customize Athena’s prebuilt connectors for your own use, or modify a copy of the source code to create your own AWS Serverless Application Repository package.

Solution overview

The following architecture diagram shows the components of the Athena Federated Query MongoDB connector. It contains the following components:

  • A virtual private cloud (VPC) configured with public and private subnets across three Availability Zones.
  • A MongoDB cluster with customizable Amazon Elastic Block Store (Amazon EBS) storage deployed in private subnets and NAT gateways in a public subnet for outbound internet connectivity for MongoDB instances.
  • Bastion hosts in an auto scaling group with Elastic IP addresses to allow inbound SSH access.
  • An AWS Identity and Access Management (IAM) MongoDBnode role with Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3 permissions.
  • Security groups to enable communication within the VPC.
  • Lambda functions deployed in a private subnet accessing S3 buckets. Athena invokes the Lambda function, which in turn fetches the data from MongoDB and maps the response back to Athena.
  • AWS Secrets Manager through a VPC endpoint.

Prerequisites

To implement the solution, you need the following:

  • An AWS account to access AWS services.
  • An IAM user with permission to CreateRole, ListRoles, GetPolicy, and AttachRolePolicy.
  • An IAM user with an access key and secret key to configure an integrated development environment (IDE).
  • A MongoDB database. You can deploy a hosted MongoDB on Amazon EC2 or MongoDB Atlas in a VPC.
  • If you don’t have a QuickSight subscription configured, sign up for one. You can access the QuickSight free trial as part of the AWS Free Tier option.
  • A new secret in Secrets Manager to store your MongoDB user name and password.
  • Data loaded into your MongoDB database. For this example, we used an airline dataset. Load the sample data either from the MongoDB command line or the MongoDB Atlas user interface, if using MongoDB Atlas.

Configure a Lambda connector

The first step in the deployment is to set up the connector environment. Athena uses data source connectors that run on Lambda to run federated queries. To connect with MongoDB, use the Amazon Athena DocumentDB Connector, which also works with any endpoint that is compatible with MongoDB.

To configure a Lambda connector, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. To view a published list of data sources for Athena, select Amazon DocumentDB.
  3. Choose Next.
  4. In the Data source details section, give your data source a unique name; for example, ds_mongo.
    This will be the connection name that appears under Data sources for Athena.
  5. Choose Create Lambda function.
    This launches the Create function page in Lambda. The connector is deployed by using AWS Serverless Application Repository.
  6. For SecretNameOrPrefix, enter mongo.
  7. For SpillBucket, enter spl-mongo-athena-test.
  8. For AthenaCatalogName, enter us-west-mongo-cat.
  9. For DocDBConnectionString (the MongoDB connection), enter the following:
    mongodb://${docdb_instance_1_creds}@replace_with_mongodb_private_ip:27017/?authSource=admin&readPreference=secondaryPreferred&retryWrites=false; 

  10. For SecurityGroupIds, choose the security group that you want to associate with the function. Make sure that the security group of the MongoDB instance allows traffic from the Lambda function.
  11. For SpillPrefix, enter athena-spill.
  12. For Subnetids, enter the subnet IDs of subnets with MongoDB instances.
    In this case, LambdaMemory and LambdaTimeout have been set to the maximum values, but these can vary depending on the query run and memory requirements. SpillBucket is an S3 bucket in your account to store data that exceeds the Lambda function response size limits.
  13. Keep the rest as defaults.
  14. Select the acknowledgement check box choose Deploy.
    The connection function is launched based on the given parameters.
  15. Create a VPC endpoint to allow the Lambda function to access Amazon S3 through an endpoint.
    This is for the spill bucket. The spill bucket is a staging area for copying the results of the queries that are performed on MongoDB via Athena federation. This is so that the Lambda function in the VPC can access Amazon S3.
  16. Go back to the Athena console.
  17. Under Connection details, for Lambda function, choose the newly created Lambda function.
  18. Choose Next.

  19. To verify the connection, on the Athena console, choose Data sources, then choose ds_mongo.
    Associated databases from the connection should be listed.

    You should now be able to query the datasets from the Athena query editor by using SQL.
  20. In the query editor, for Data Source, choose ds_mongo.

Athena federates the query using the connector, which invokes the Lambda function. Then the query is performed by the function on MongoDB, and the query results are translated back to Athena. The following is a sample query that was performed on the airlines dataset.

Create a dataset on QuickSight to read the data from MongoDB

Before you launch QuickSight for the first time in an AWS account, you must set up an account. For instructions, see Signing in to Amazon QuickSight.

After the initial setup, you can create a dataset with Athena as the source. The QuickSight service role needs permission to invoke the Lambda function that connects MongoDB. The aws-quicksight-service-role-v0 service role is automatically created with the QuickSight account.

To create a dataset in QuickSight, complete the following steps:

  1. On the IAM console, in the navigation pane, choose Roles.
  2. Search for the role aws-quicksight-service-role-v0 and add the permission Lambda _fullaccess.
    In an organization, there could be different data stores based on data load and consumption patterns. Examples include catalog or manual data that is associated with products in a MongoDB or key-value index store, transactions or sales data in a SQL database, and images or video clips that are associated with the product in an object store.
    In this case, an airlines table from MongoDB is joined with a flat file that contains information on the airports.
  3. Use the QuickSight cross-data store feature to join data from different sources on common fields.
  4. We then update the data types for our geographic fields like fields like city, country, latitude, and longitude so we can build maps later.
  5. You can also create calculated fields while preparing your dataset, which allows you to reuse them in other QuickSight analyses.

With a few clicks, you should be able to create a dashboard with the published dataset. For instance, you can plot your data on a map, show trends in a line chart, and add autonarratives from the list of Suggested Insights to create the analysis shown in the following screenshot.

Clean up

Make sure to clean up your resources to avoid resource spend and associated costs. You need to delete the EC2 instances with MongoDB. In the case of MongoDB Atlas, you can delete the databases and tables. Delete the Athena data source ds_mongo and unsubscribe your QuickSight account from the Manage QuickSight admin page.

Conclusion

With QuickSight and Athena Federated Query, organizations can access additional data sources beyond those already supported by QuickSight. If you have data in sources other than Amazon S3, you can use Athena Federated Query to analyze the data in place or build pipelines that extract and store data in Amazon S3. Athena now also supports cross-account federated queries to enable teams of analysts, data scientists, and data engineers to query data stored in other AWS accounts. Try connecting to proprietary data formats and sources, or build new user-defined functions, with the Athena Query Federation SDK.


About the Author

Soujanya Konka is a Solutions Architect and Analytics specialist at AWS, focused on helping customers build their ideas on cloud. Expertise in design and implementation of business information systems and Data warehousing solutions. Before joining AWS, Soujanya has had stints with companies such as HSBC, Cognizant.

Nilesh Parekh is a Partner Solution Architect with ISV India segment. Nilesh help assist partner to review and remediate their workload running on AWS based on the AWS Well-Architected and Foundational Technical Review best practices. He also helps assist partners on Application Modernizations and delivering POCs.

Analyze Amazon Ion datasets using Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/analyze-amazon-ion-datasets-using-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon Ion is a richly typed, self-describing, hierarchical data serialization format offering interchangeable binary and text representations. The text format extends JSON (meaning all JSON files are valid Ion files), and is easy to read and author, supporting rapid prototyping. The binary representation is efficient to store, transmit, and skip-scan parse. The rich type system provides unambiguous semantics for long-term preservation of data that can survive multiple generations of software evolution.

Athena now supports querying and writing data in Ion format. The Ion format is currently used by internal Amazon teams, by external services such as Amazon Quantum Ledger Database (Amazon QLDB) and Amazon DynamoDB (which can be exported into Ion), and in the open-source SQL query language PartiQL.

In this post, we discuss use cases and the unique features Ion offers, followed by examples of querying Ion with Athena. For demonstration purposes, we use the transformed version of the City Lots San Francisco dataset.

Features of Ion

In this section, we discuss some of the unique features that Ion offers:

  • Type system
  • Dual format
  • Efficiency gains
  • Skip scanning

Type system

Ion extends JSON, adding support for more precise data types to improve interpretability, simplify processing, and avoid rounding errors. These high precision numeric types are essential for financial services, where fractions of a cent on every transaction add up. Data types that are added are arbitrary-size integers, binary floating-point numbers, infinite-precision decimals, timestamps, CLOBS, and BLOBS.

Dual format

Users can be presented with a familiar text-based representation while benefiting from the performance efficiencies of a binary format. The interoperability between the two formats enables you to rapidly discover, digest, and interpret data in a familiar JSON-like representation, while underlying applications benefit from a reduction in storage, memory, network bandwidth, and latency from the binary format. This means you can write plain text queries that run against both text-based and binary-based Ion. You can rewrite parts of your data in text-based Ion when you need human readable data during development and switch to binary in production.

When debugging a process, the ability for systems engineers to locate data and understand it as quickly as possible is vital. Ion provides mechanisms to move between binary and a text-based representation, optimizing for both the human and the machine. Athena supports querying and writing data in both of these Ion formats. The following is an example Ion text document taken from the transformed version of the citylots dataset:

{ "type": "Feature"
, "properties": { "MAPBLKLOT": "0004002"
                 ,"BLKLOT": "0004002"
                 ,"BLOCK_NUM": "0004"
                 , "LOT_NUM": "002"
                 , "FROM_ST": "0"
                 , "TO_ST": "0"
                 , "STREET": "UNKNOWN"
                 , "ST_TYPE": null
                 , "ODD_EVEN": "E" }
, "geometry": { "type": "Polygon"
               , "coordinates": [ [ [ -122.415701204606876, 37.808327252671461, 0.0 ],
                                    [ -122.415760743593196, 37.808630700240904, 0.0 ],
                                    [ -122.413787891332404, 37.808566801319841, 0.0 ],
                                    [ -122.415701204606876, 37.808327252671461, 0.0 ] ] ] } }

Efficiency gains

Binary-encoded Ion reduces file size by moving repeated values, such as field names, into a symbol table. Symbol tables reduce CPU and read latency by limiting the validation of character encoding to the single instance of the value in the symbol table.

For example, a company that operates at Amazon’s scale can produce large volumes of application logs. When compressing Ion and JSON logs, we noticed approximately 35% less CPU time to compress the log, which produced an average of roughly 26% smaller files. Log files are critical when needed but costly to retain, so the reduction in file sizes combined with the read performance gains from symbol tables helps when handling these logs. The following is an example of file size reduction with the citylots JSON dataset when converted to Ion binary with GZIP and ZSTD compression:

77MB    citylots.ion
 17MB    citylots.ion.gz
 15MB    citylots.ion.zst
181MB    citylots.json
 22MB    citylots.json.gz
 18MB    citylots.json.zst

Skip-scanning

In a textual format, every byte must be read and interpreted, but because Ion’s binary format is a TLV (type-length-value) encoding, an application may skip over elements that aren’t needed. This reduces query and application processing costs correlated with the proportion of unexamined fields.

For example, forensic analysis of application log data involves reading large volumes of data where only a fraction of the data is needed for diagnosis. In these scenarios, skip-scanning allows the binary Ion reader to move past irrelevant fields without the cost of reading the element stored within a field. This results in users experiencing lower resource usage and quicker response times.

Query Ion datasets using Athena

Athena now supports querying and creating Ion-formatted datasets via an Ion-specific SerDe, which in conjunction with IonInputFormat and IonOutputFormat allows you to read and write valid Ion data. Deserialization allows you to run SELECT queries on the Ion data so that it can be queried to gain insights. Serialization through CTAS or INSERT INTO queries allows you to copy datasets from existing tables’ values or generate new data in the Ion format.

The interchangeable nature of Ion text and Ion binary means that Athena can read datasets that contain both types of files. Because Ion is a superset of JSON, a table using the Ion SerDe can also include JSON files. Unlike the JSON SerDe, where every new line character indicates a new row, the Ion SerDe uses a combination of closing brackets and new line characters to determine new rows. This means that if each JSON record in your source documents isn’t on a single line, these files can now be read in Athena via the Ion SerDe.

Create external tables

Athena supports querying Ion-based datasets by defining AWS Glue tables with the user-defined metadata. Let’s start with an example of creating an external table for a dataset stored in Ion text. The following is a sample row from the citylots dataset:

{
    type:"Feature",
    properties:{
        mapblklot:"0579021",
        blklot:"0579024",
        block_num:"0579",
        lot_num:"024",
        from_st:"2160",
        to_st:"2160",
        street:"PACIFIC",
        st_type:"AVE",
        odd_even:"E"
    },
    geometry:{
        type:"Polygon",coordinates:[[[-122.4308798855922, ...]]]
    }
}

To create an external table that has its data stored in Ion, you have two syntactic options.

First, you can specify STORED AS ION. This is a more concise method, and is best used for simple cases, when no additional properties are required. See the following code:

CREATE EXTERNAL TABLE city_lots_ion1 (
  type STRING, 
  properties struct<
    mapblklot:string,
    blklot:string,
    block_num:string,
    lot_num:string,
    from_st:string,
    to_st:string,
    street:string,
    st_type:string,
    odd_even:string>, 
  geometry struct<
    type:string,
    coordinates:array<array<array<decimal(18,15)>>>,
    multi_coordinates:array<array<array<array<decimal(18,15)>>>>>
)
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Alternatively, you can explicitly specify the Ion classpaths in ROW FORMAT SERDE, INPUTFORMAT, and OUTPUTFORMAT. Unlike the first method, you can specify a SERDEPROPERTIES clause here. In our example DDL, we added a SerDe property that allows values that are outside of the Hive data type ranges to overflow rather than fail the query:

CREATE EXTERNAL TABLE city_lots_ion2(
  type STRING, 
  properties struct<
    mapblklot:string,
    blklot:string,
    block_num:string,
    lot_num:string,
    from_st:string,
    to_st:string,
    street:string,
    st_type:string,
    odd_even:string>, 
  geometry struct<
    type:string,
    coordinates:array<array<array<decimal(18,15)>>>,
    multi_coordinates:array<array<array<array<decimal(18,15)>>>>>
)
ROW FORMAT SERDE 
  'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.fail_on_overflow'='false'
 )
STORED AS INPUTFORMAT 
  'com.amazon.ionhiveserde.formats.IonInputFormat' 
OUTPUTFORMAT 
  'com.amazon.ionhiveserde.formats.IonOutputFormat'
LOCATION
  's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Athena converts STORED AS ION into the explicit classpaths, so both tables look similar in the metastore. If we look in AWS Glue, we see both tables we just created have the same input format, output format, and SerDe serialization library.

Now that our table is created, we can run standard SELECT queries on the city_lots_ion table. Let’s run a query that specifies the block_num from our example row of Ion data to verify that we can read from the table:

-- QUERY
SELECT * FROM city_lots_ion1 WHERE properties.block_num='0579';

The following screenshot shows our results.

Use path extraction to read from specific fields

Athena supports further customization of how data is interpreted via SerDe properties. To specify these, you can add a WITH SERDEPROPERTIES clause, which is a subfield of the ROW FORMAT SERDE field.

In some situations, we may only care about some parts of the information. Let’s suppose we don’t want any of the geometry info from the citylots dataset, and only need a few of the fields in properties. One solution is to specify a search path using the path extractor SerDe property:

-- Path Extractor property
ion.<column>.path_extractor = <search path>

Path extractors are search paths that Athena uses to map the table columns to locations in the individual document. Full information on what can be done with path extractors is available on GitHub, but for our example, we focus on creating simple paths that use the names of each field as an index. In this case, the search path takes the form of a space-delimited set of indexes (and wraps it with parentheses) that indicate the location of each desired piece of information. We map the search paths to table columns by using the path extractor property.

By default, Athena builds path extractors dynamically based on column names unless overridden. This means that when we run our SELECT query on our city_lots_ion1 table, Athena builds the following search paths:

Default Extractors generated by Athena for city_lots_ion1.
-- Extracts the 'type' field to the 'type' column
    'ion.type.path_extractor' = '(type)'

-- Extracts the 'properties' field to the 'properties' column
    'ion.properties.path_extractor' = '(properties)'

-- Extracts the 'geometry' field to the 'geometry' column
    'ion.geometry.path_extractor' = '(geometry)'

Assuming we only care about the block and lot information from the properties struct, and the geometry type from the geometry struct, we can build search paths that map the desired fields from the row of data to table columns. First let’s build the search paths:

(properties mapblklot) - Search path for the mapblklot field in the properties struct
(properties blklot) - Search path for the blklot field in the properties struct
(properties block_num) - Search path for the block_num field in the properties struct
(properties lot_num) - Search path for the lot_num field in the properties struct
(geometry type) - Search path for the type field in the geometry struct

Now let’s map these search paths to table columns using the path extractor SerDe property. Because the search paths specify where to look for data, we are able to flatten and rename our datasets to better serve our purpose. For this example, let’s rename the mapblklot field to map_block_lot, blklot to block_lot, and the geometry type to shape:

 'ion.map_block_lot.path_extractor' = '(properties mapblklot)'
 'ion.block_lot.path_extractor' = '(properties blklot)'
 'ion.block_num.path_extractor' = '(properties block_num)'
 'ion.lot_num.path_extractor' = '(properties lot_num)'
 'ion.shape.path_extractor' = '(geometry type)'

Let’s put all of this together and create the city_blocks table:

CREATE EXTERNAL TABLE city_blocks (
    map_block_lot STRING,
    block_lot STRING,
    block_num STRING,
    lot_num STRING,
    shape STRING
)
ROW FORMAT SERDE
 'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.map_block_lot.path_extractor' = '(properties mapblklot)',
 'ion.block_lot.path_extractor' = '(properties blklot)', 
 'ion.block_num.path_extractor' = '(properties block_num)',
 'ion.lot_num.path_extractor' = '(properties lot_num)',
 'ion.shape.path_extractor' = '(geometry type)'
 )
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Now we can run a select query on the city_blocks table, and see the results:

-- Select Query
SELECT * FROM city_blocks WHERE block_num='0579';

Utilizing search paths in this way enables skip-scan parsing when reading from Ion binary files, which allows Athena to skip over the unneeded fields and reduces the overall time it takes to run the query.

Use CTAS and UNLOAD for data transformation

Athena supports CREATE TABLE AS SELECT (CTAS), which creates a new table in Athena from the results of a SELECT statement from another query. Athena also supports UNLOAD, which writes query results to Amazon S3 from a SELECT statement to the specified data format.

Both CTAS and UNLOAD have a property to specify a format and a compression type. This allows you to easily convert Ion datasets to other data formats, such as Parquet or ORC, and vice versa, without needing to set up a complex extract, transform, and load (ETL) job. This is beneficial for situations when you want to transform your data, or know you will run repeated queries on a subset of your data and want to use some of the benefits inherent to columnar formats. Combining it with path extractors is especially helpful, because we’re only storing the data that we need in the new format.

Let’s use CTAS to convert our city_blocks table from Ion to Parquet, and compress it via GZIP. Because we have path extractors set up for the city_blocks table, we only need to convert a small portion of the original dataset:

CREATE TABLE city_blocks_parquet_gzip
WITH (format = 'PARQUET', write_compression='GZIP')
AS SELECT * FROM city_blocks;

We can now run queries against the city_block_parquet_gzip table, and should see the same result. To test this out, let’s run the same SELECT query we ran before on the Parquet table:

SELECT * FROM city_blocks_parquet_gzip WHERE block_num='0579';

When converting tables from another format to Ion, Athena supports the following compression codecs: ZSTD, BZIP2, GZIP, SNAPPY, and NONE. In addition to adding Ion as a new format for CTAS, we added the ion_encoding property, which allows you to choose whether the output files are created in Ion text or Ion binary. This allows for serialization of data from other formats back into Ion.

Let’s convert the original city_lots JSON file back to Ion, but this time we specify that we want to use ZSTD compression and a binary encoding.

The JSON file can be found at following location: s3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_json/

Because Ion is a superset of JSON, we can use the Ion SerDe to read this file:

CREATE EXTERNAL TABLE city_blocks_json_ion_serde (
    map_block_lot STRING,
    block_lot STRING,
    block_num STRING,
    lot_num STRING,
    shape STRING
)
ROW FORMAT SERDE
'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
'ion.map_block_lot.path_extractor' = '(properties mapblklot)',
'ion.block_lot.path_extractor' = '(properties blklot)',
'ion.block_num.path_extractor' = '(properties block_num)',
'ion.lot_num.path_extractor' = '(properties lot_num)',
'ion.shape.path_extractor' = '(geometry type)'
)
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_json/'

Now let’s copy this table into our desired Ion binary form:

CREATE TABLE city_blocks_ion_zstd
WITH (format = 'ION', write_compression='ZSTD', ion_encoding='BINARY')
AS SELECT * FROM city_blocks_parquet_gzip

Finally, let’s run our verification SELECT statement to verify everything was created properly:

SELECT * FROM city_blocks_ion_zstd WHERE block_num='0579'; 

Use UNLOAD to store Ion data in Amazon S3

Sometimes we just want to reformat the data and don’t need to store the additional metadata to query the table. In this case, we can use UNLOAD, which stores the results of the query in the specified format in an S3 bucket.

Let’s test it out, using UNLOAD to convert the drivers_names table from Ion to ORC, compress it via ZLIB, and store it to an S3 bucket:

UNLOAD (SELECT * FROM city_blocks_ion_zstd WHERE block_num='0579') 
TO 's3://<your-s3-bucket>/athena-ion-blog/unload/orc_zlib/'
WITH (format = 'ORC', compression='ZLIB')

When you check in Amazon S3, you can find a new file in the ORC format.

Conclusion

This post talked about the new feature in Athena that allows you to query and create Ion datasets using standard SQL. We discussed use cases and unique features of the Ion format like type system, dual formats (Ion text and Ion binary), efficiency gains, and skip-scanning. You can get started with querying an Ion dataset stored in Amazon S3 by simply creating a table in Athena, and also converting existing datasets to Ion format and vice versa using CTAS and UNLOAD statements.

To learn more about querying Ion using Athena, refer to Amazon Ion Hive SerDe.

References


About the Authors

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Jacob Stein works on the Amazon Athena team as a Software Development Engineer. He led the project to add support for Ion in Athena. He loves working on technical problems unique to internet scale data, and is passionate about developing scalable solutions for distributed systems.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Pete Ford is a Sr. Technical Program Manager at Amazon.

Simplify your ETL and ML pipelines using the Amazon Athena UNLOAD feature

Post Syndicated from Dylan Qu original https://aws.amazon.com/blogs/big-data/simplify-your-etl-and-ml-pipelines-using-the-amazon-athena-unload-feature/

Many organizations prefer SQL for data preparation because they already have developers for extract, transform, and load (ETL) jobs and analysts preparing data for machine learning (ML) who understand and write SQL queries. Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

By default, Athena automatically writes SELECT query results in CSV format to Amazon S3. However, you might often have to write SELECT query results in non-CSV files such as JSON, Parquet, and ORC for various use cases. In this post, we walk you through the UNLOAD statement in Athena and how it helps you implement several use cases, along with code snippets that you can use.

Athena UNLOAD overview

CSV is the only output format used by the Athena SELECT query, but you can use UNLOAD to write the output of a SELECT query to the formats and compression that UNLOAD supports. When you use UNLOAD in a SELECT query statement, it writes the results into Amazon S3 in specified data formats of Apache Parquet, ORC, Apache Avro, TEXTFILE, and JSON.

Although you can use the CTAS statement to output data in formats other than CSV, those statements also require the creation of a table in Athena. The UNLOAD statement is useful when you want to output the results of a SELECT query in a non-CSV format but don’t require the associated table. For example, a downstream application might require the results of a SELECT query to be in JSON format, and Parquet or ORC might provide a performance advantage over CSV if you intend to use the results of the SELECT query for additional analysis.

In this post, we walk you through the following use cases for the UNLOAD feature:

  • Compress Athena query results to reduce storage costs and speed up performance for downstream consumers
  • Store query results in JSON file format for specific downstream consumers
  • Feed downstream Amazon SageMaker ML models that require files as input
  • Simplify ETL pipelines with AWS Step Functions without creating a table

Use case 1: Compress Athena query results

When you’re using Athena to process and create large volumes of data, storage costs can increase significantly if you don’t compress the data. Furthermore, uncompressed formats like CSV and JSON require you to store and transfer a large number of files across the network, which can increase IOPS and network costs. To reduce costs and improve downstream big data processing application performance such as Spark applications, a best practice is to store Athena output into compressed columnar compressed file formats such as ORC and Parquet.

You can use the UNLOAD statement in your Athena SQL statement to create compressed ORC and Parquet file formats. In this example, we use a 3 TB TPC-DS dataset to find all items returned between a store and a website. The following query joins the four tables: item, store_returns, web_returns, and customer_address:

UNLOAD (
		select *
		from store_returns, item, web_returns, customer_address
		where sr_item_sk = i_item_sk and
		wr_item_sk = i_item_sk and
		wr_refunded_addr_sk = ca_address_sk
	) to 's3://your-bucket/temp/athenaunload/usecase1/' with (
		format = 'PARQUET',
		compression = 'SNAPPY',
		partitioned_by = ARRAY['ca_location_type']
		
	)

The resulting query output when Snappy compressed and stored in Parquet format resulted in a 62 GB dataset. The same output in a non-compressed CSV format resulted in a 248 GB dataset. The Snappy compressed Parquet format yielded a 75% smaller storage size, thereby saving storage costs and resulting in faster performance.

Use case 2: Store query results in JSON file format

Some downstream systems to Athena such as web applications or third-party systems require the data formats to be in JSON format. The JSON file format is a text-based, self-describing representation of structured data that is based on key-value pairs. It’s lightweight, and is widely used as a data transfer mechanism by different services, tools, and technologies. In these use cases, the UNLOAD statement with the parameter format value of JSON can unload files in JSON file format to Amazon S3.

The following SQL extracts the returns data for a specific customer within a specific data range against the 3 TB catalog_returns table and stores it to Amazon S3 in JSON format:

UNLOAD (
		select cr_returned_date_sk, cr_returning_customer_sk, cr_catalog_page_sk, cr_net_loss
		from catalog_returns
		where cr_returned_date_sk = 2450821 and cr_returning_customer_sk = 11026691
	) to 's3://your-bucket/temp/athenaunload/usecase2/' with (
		format = 'JSON', compression = 'NONE'
	)

By default, Athena uses Gzip for JSON and TEXTFILE formats. You can set the compression to NONE to store the UNLOAD result without any compression. The query result is stored as the following JSON file:

{"cr_returned_date_sk":2450821,"cr_returning_customer_sk":11026691,"cr_catalog_page_sk":20.8,"cr_net_loss":53.31}

The query result can now be consumed by a downstream web application.

Use case 3: Feed downstream ML models

Analysts and data scientists rely on Athena for ad hoc SQL queries, data discovery, and analysis. They often like to quickly create derived columns such as aggregates or other features. These need to be written as files in Amazon S3 so a downstream ML model can directly read the files without having to rely on a table.

You can also parametrize queries using Athena prepared statements that are repetitive. Using the UNLOAD statement in a prepared statement provides the self-service capability to less technical users or analysts and data scientists to export files needed for their downstream analysis without having to write queries.

In the following example, we create derived columns and feature engineer for a downstream SageMaker ML model that predicts the best discount for catalog items in future promotions. We derive averages for quantity, list price, discount, and sales price for promotional items sold in stores where the promotion is not offered by mail or a special event. Then we restrict the results to a specific gender, marital, and educational status. We use the following query:

UNLOAD(
		Select i_item_id, 
	        avg(ss_quantity) avg_sales,
	        avg(ss_list_price) avg_list_price,
	        avg(ss_coupon_amt) avg_coupon_amt,
	        avg(ss_sales_price) avg_sales_price 
	 from store_sales, customer_demographics, date_dim, item, promotion
	 where cast(ss_sold_date_sk AS int) = d_date_sk and
	       ss_item_sk = i_item_sk and
	       ss_cdemo_sk = cd_demo_sk and
	       ss_promo_sk = p_promo_sk and
	       cd_gender = 'M' and 
	       cd_marital_status = 'M' and
	       cd_education_status = '4 yr Degree' and
	       (p_channel_email = 'N' or p_channel_event = 'N') and
	       d_year = 2001 
	 group by i_item_id
	 order by i_item_id
	) to 's3://your-bucket/temp/athenaunload/usecase3/' with (
		format = 'PARQUET',compression = 'SNAPPY'
	)

The output is written as Parquet files in Amazon S3 for a downstream SageMaker model training job to consume.

Use case 4: Simplify ETL pipelines with Step Functions

Step Functions is integrated with the Athena console to facilitate building workflows that include Athena queries and data processing operations. This helps you create repeatable and scalable data processing pipelines as part of a larger business application and visualize the workflows on the Athena console.

In this use case, we provide an example query result in Parquet format for downstream consumption. In this example, the raw data is in TSV format and gets ingested on a daily basis. We use the Athena UNLOAD statement to convert the data into Parquet format. After that, we send the location of the Parquet file as an Amazon Simple Notification Service (Amazon SNS) notification. Downstream applications can be notified via SNS to take further actions. One common example is to initiate a Lambda function that uploads the Athena transformation result into Amazon Redshift.

The following diagram illustrates the ETL workflow.

The workflow includes the following steps:

  1. Start an AWS Glue crawler pointing to the raw S3 bucket. The crawler updates the metadata of the raw table with new files and partitions.
  2. Invoke a Lambda function to clean up the previous UNLOAD result. This step is required because UNLOAD doesn’t write data to the specified location if the location already has data in it (UNLOAD doesn’t overwrite existing data). To reuse a bucket location as a destination for UNLOAD, delete the data in the bucket location, and then run the query again. Another common pattern is to UNLOAD data to a new partition with incremental data processing.
  3. Start an Athena UNLOAD query to convert the raw data into Parquet.
  4. Send a notification to downstream data consumers when the file is updated.

Set up resources with AWS CloudFormation

To prepare for querying both data sources, launch the provided AWS CloudFormation template:

Keep all the provided parameters and choose Create stack.

The CloudFormation template creates the following resources:

  • An Athena workgroup etl-workgroup, which holds the Athena UNLOAD queries.
  • A data lake S3 bucket that holds the raw table. We use the Amazon Customer Reviews Dataset in this post.
  • An Athena output S3 bucket that holds the UNLOAD result and query metadata.
  • An AWS Glue database.
  • An AWS Glue crawler pointing to the data lake S3 bucket.
  • A LoadDataBucket Lambda function to load the Amazon Customer Reviews raw data into the S3 bucket.
  • A CleanUpS3Folder Lambda function to clean up previous Athena UNLOAD result.
  • An SNS topic to notify downstream systems when the UNLOAD is complete.

When the stack is fully deployed, navigate to the Outputs tab of the stack on the AWS CloudFormation console and note the value of the following resources:

  • AthenaWorkgroup
  • AthenaOutputBucket
  • CleanUpS3FolderLambda
  • GlueCrawler
  • SNSTopic

Build a Step Functions workflow

We use the Athena Workflows feature to build the ETL pipeline.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under Create Athena jobs with Step Functions workflows, for Query large datasets, choose Get started.
  3. Choose Create your own workflow.
  4. Choose Continue.

The following is a screenshot of the default workflow. Compare the default workflow against the earlier ETL workflow we described. The default workflow doesn’t contain a Lambda function invocation and has an additional GetQueryResult step.

Next, we add a Lambda Invoke step.

  1. Search for Lambda Invoke in the search bar.
  2. Choose the Lambda:Invoke step and drag it to above the Athena: StartQueryExecution step.
  3. Choose the Athena:GetQueryResults step (right-click) and choose Delete state.

  4. Now the workflow aligns with the earlier design.
  5. Choose the step Glue: StartCrawler.
  6. In the Configuration section, under API Parameters, enter the following JSON (provide the AWS Glue crawler name from the CloudFormation stack output):
    {
      "Name": "GlueCrawler"
    }

  7. Choose the step Glue: GetCrawler.
  8. In the Configuration section, under API Parameters, enter the following JSON:
    {
      "Name": "GlueCrawler"
    }

  9. Choose the step Lambda: Invoke.
  10. In the Configuration section, under API Parameters, for Function name, choose the function -CleanUpS3FolderLambda-.
  11. In the Payload section, enter the following JSON (include the Athena output bucket from the stack output):
    {
      "bucket_name": “AthenaOutputBucket”,
      "prefix": "parquet/"
    }

  12. Choose the step Athena: StartQueryExecution.
  13. In the right Configuration section, under API Parameters, enter the following JSON (provide the Athena output bucket and workgroup name):
    {
      "QueryString": "UNLOAD (SELECT * FROM \"athena_unload_blog\".\"reviews\" )  TO 's3://AthenaOutputBucket/parquet' WITH (format = 'PARQUET',compression = 'SNAPPY')",
      "WorkGroup": “AthenaWorkgroup”
    }

Notice the Wait for task to complete check box is selected. This pauses the workflow while the Athena query is running.

  1. Choose the step SNS: Publish.
  2. In the Configuration section, under API Parameters, for Topic, pick the SNSTopic created by the CloudFormation template.
  3. In the Message section, enter the following JSON to pass the data manifest file location to the downstream consumer:
    {
      "Input.$": "$.QueryExecution.Statistics.DataManifestLocation"
    }

For more information, refer to the GetQueryExecution response syntax.

  1. Choose Next.
  2. Review the generated code and choose Next.
  3. In the Permissions section, choose Create new role.
  4. Review the auto-generated permissions and choose Create state machine.
  5. In the Add additional permissions to your new execution role section, choose Edit role in IAM.
  6. Add permissions and choose Attach policies.
  7. Search for the AWSGlueConsoleFullAccess managed policy and attach it.

This policy grants full access to AWS Glue resources when using the AWS Management console. Generate a policy based on access activity in production following the least privilege principle.

Test the workflow

Next, we test out the Step Functions workflow.

  1. On the Athena console, under Jobs in the navigation pane, choose Workflows.
  2. Under State machines, choose the workflow we just created.
  3. Choose Execute, then choose Start execution to start the workflow.
  4. Wait until the workflow completes, then verify there are UNLOAD Parquet files in the bucket AthenaOutputBucket.

Clean up

To help prevent unwanted charges to your AWS account, delete the AWS resources that you used in this post.

  1. On the Amazon S3 console, choose the -athena-unload-data-lake bucket.
  2. Select all files and folders and choose Delete.
  3. Enter permanently delete as directed and choose Delete objects.
  4. Repeat these steps to remove all files and folders in the -athena-unload-output bucket.
  5. On the AWS CloudFormation console, delete the stack you created.
  6. Wait for the stack status to change to DELETE_COMPLETE.

Conclusion

In this post, we introduced the UNLOAD statement in Athena with some common use cases. We demonstrated how to compress Athena query results to reduce storage costs and improve performance, store query results in JSON file format, feed downstream ML models, and create and visualize ETL pipelines with Step Functions without creating a table.

To learn more, refer to the Athena UNLOAD documentation and Visualizing AWS Step Functions workflows from the Amazon Athena console.


About the Authors

Dylan Qu is a Specialist Solutions Architect focused on Big Data & Analytics with Amazon Web Services. He helps customers architect and build highly scalable, performant, and secure cloud-based solutions on AWS.

Harsha Tadiparthi is a Principal Solutions Architect focused on providing analytics and AI/ML strategies and solution designs to customers.

Detecting data drift using Amazon SageMaker

Post Syndicated from Shibu Nair original https://aws.amazon.com/blogs/architecture/detecting-data-drift-using-amazon-sagemaker/

As companies continue to embrace the cloud and digital transformation, they use historical data in order to identify trends and insights. This data is foundational to power tools, such as data analytics and machine learning (ML), in order to achieve high quality results.

This is a time where major disruptions are not only lasting longer, but also happening more frequently, as discussed in a McKinsey article on risk and resilience. Any disruption—a pandemic, hurricane, or even blocked sailing routes—has a major impact on the patterns of data and can create anomalous behavior.

ML models are dependent on data insights to help plan and support production-ready applications. With any disruptions, data drift can occur. Data drift is unexpected and undocumented changes to data structure, semantics, and/or infrastructure. If there is data drift, the model performance will degrade and no longer provide an accurate guidance. To mitigate the effects of the disruption, data drift needs to be detected and the ML models quickly trained and adjusted accordingly.

This blog post explains how to approach changing data patterns in the age of disruption and how to mitigate its effects on ML models. We also discuss the steps of building a feedback loop to capture the request data in the production environment and create a data pipeline to store the data for profiling and baselining. Then, we explain how Amazon SageMaker Clarify can help detect data drift.

How to detect data drift

There are three stages to detecting data drift: data quality monitoring, model quality monitoring, and drift evaluation (see Figure 1).

Stages in detecting data drift

Figure 1. Stages in detecting data drift

Data quality monitoring establishes a profile of the input data during model training, and then continuously compares incoming data with the profile. Deviations in the data profile signal a drift in the input data.

You can also detect drift through model quality monitoring, which requires capturing actual values that can be compared with the predictions. For example, using weekly demand forecasting, you can compare the forecast quantities one week later with the actual demand. Some use cases can require extra steps to collect actual values. For example, product recommendations may require you to ask a selected group of consumers for their feedback to the recommendation.

SageMaker Clarify provides insights into your trained models, including importance of model features and any biases towards certain segments of the input data. Changes of these attributes between re-trained models also signal drift. Drift evaluation constitutes the monitoring data and mechanisms to detect changes and triggering consequent actions. With Amazon CloudWatch, you can define rules and thresholds that prompt drift notifications.

Figure 2 illustrates a basic architecture with the data sources for training and production (on the left) and the observed data concerning drift (on the right). You can use Amazon SageMaker Data Wrangler, a visual data preparation tool, to clean and normalize your input data for your ML task. You can store the features that you defined for your models in the Amazon SageMaker Feature Store, a fully managed, purpose-built repository to store, update, retrieve, and share ML features.

The white, rectangular boxes in the architecture diagram represent the tasks for detecting data and model drift. You can integrate those tasks into your ML workflow with Amazon SageMaker Pipelines.

Basic architecture on how data drift is detected using Amazon SageMaker

Figure 2. Basic architecture on how data drift is detected using Amazon SageMaker

The drift observation data can be captured in tabular format, such as comma-separated values or Parquet, on Amazon Simple Storage Service (S3) and analyzed with Amazon Athena and Amazon QuickSight.

How to build a feedback loop

The baselining task establishes a data profile from training data. It uses Amazon SageMaker Model Monitor and runs before training or re-training the model. The baseline profile is stored on Amazon S3 to be referenced by the data drift monitoring job.

The data drift monitoring task continuously profiles the input data, compares it with baseline, and the results are captured in CloudWatch. This tasks runs on its own computation resources using Deequ, which checks that the monitoring job does not slow down your ML inference flow and scales with the data. The frequency of running this task can be adjusted to control cost, which can depend on how rapidly you anticipate that the data may change.

The model quality monitoring task computes model performance metrics from actuals and predicted values. The origin of these data points depends on the use case. Demand forecasting use cases naturally capture actuals that can be used to validate past predictions. Other use cases can require extra steps to acquire ground-truth data.

CloudWatch is a monitoring and observability service with which you can define rules to act on deviation in model performance or data drift. With CloudWatch, you can setup alerts to users via e-mail or SMS, and it can automatically start the ML model re-training process.

Run the baseline task on your updated data set before re-training your model. Use the SageMaker model registry to catalog your ML models for production, manage model versions, and control the associate training metrics.

Gaining insight into data and models

SageMaker Clarify provides greater visibility into your training data and models, helping identify and limit bias and explain predictions. For example, the trained models may consider some features more strongly than others when generating predictions. Compare the feature importance and bias between model-provided versions for a better understanding of the changes.

Conclusion

As companies continue to use data analytics and ML to inform daily activity, data drift may become a more common occurrence. Recognizing that drift can have a direct impact on models and production-ready applications, it is important to architect to identify potential data drift and avoid downgrading the models and negatively impacting results. Failure to capture changes in data can result in loss of process confidence, downgraded model accuracy, or a bottom-line impact to the business.

Query 10 new data sources with Amazon Athena

Post Syndicated from Scott Rigney original https://aws.amazon.com/blogs/big-data/query-10-new-data-sources-with-amazon-athena/

When we first launched Amazon Athena, our mission was to make it simple to query data stored in Amazon Simple Storage Service (Amazon S3). Athena customers found it easy to get started and develop analytics on petabyte-scale data lakes, but told us they needed to join their Amazon S3 data with data stored elsewhere. We added connectors to sources including Amazon DynamoDB and Amazon Redshift to give data analysts, data engineers, and data scientists the ability to run SQL queries on data stored in databases running on-premises or in the cloud alongside data stored in Amazon S3.

Today, thousands of AWS customers from nearly every industry use Athena federated queries to surface insights and make data-driven decisions from siloed enterprise data—using a single AWS service and SQL dialect.

We’re excited to expand your ability to derive insights from more of your data with today’s release of 10 new data source connectors, which include some of the most widely used data stores on the market.

New data sources for Athena

You can now use Athena to query and surface insights from 10 new data sources:

  • SAP HANA (Express Edition)
  • Teradata
  • Cloudera
  • Hortonworks
  • Snowflake
  • Microsoft SQL Server
  • Oracle
  • Azure Data Lake Storage (ADLS) Gen2
  • Azure Synapse
  • Google BigQuery

Today’s release greatly expands the number of data sources supported by Athena. For a complete list of supported data sources, see Using Athena Data Source Connectors.

To coincide with this release, we enhanced the Athena console to help you browse available sources and connect to your data in fewer steps. You can now search, sort, and filter the available connectors on the console, and then follow the guided setup wizard to connect to your data.

Just as before, we’ve open-sourced the new connectors to invite contributions from the developer community. For more information, see Writing a Data Source Connector Using the Athena Query Federation SDK.

Connect the dots in your analytics strategy with Athena

With the breadth of data storage options available today, it’s common for data-driven organizations to choose a data store that meets the requirements of specific use cases and applications. Although this flexibility is ideal for architects and developers, it can add complexity for analysts, data scientists, and data engineers, which prevents them from accessing the data they need. To get around this, many users resort to workarounds that often involve learning new programming languages and database concepts or building data pipelines to prepare the data before it can be analyzed. Athena helps cut through this complexity with support for over 25 data sources and its simple-to-use, pay-as-you-go, serverless design.

With Athena, you can use your existing SQL knowledge to extract insights from a wide range of data sources without learning a new language, developing scripts to extract (and duplicate) data, or managing infrastructure. Athena allows you to do the following:

  • Run on-demand analysis on data spread across multiple cloud providers and systems of record using a single tool and single SQL dialect
  • Visualize data in business intelligence applications that use Athena to perform complex, multi-source joins
  • Design self-service extract, transform, and load (ETL) pipelines and event-based data processing workflows with Athena’s integration with AWS Step Functions
  • Unify diverse data sources to produce rich input features for machine learning model training workflows
  • Develop user-facing data-as-a-product applications that surface insights across data mesh architectures
  • Support analytics use cases while your organization migrates on-premises sources to the AWS Cloud

Get started with Athena’s data source connectors

To get started with federated queries for Athena, on the Athena console, choose Data Sources in the navigation pane, choose a data source, and follow the guided setup experience to configure your connector. After the connection is established and the source is registered with Athena, you can query the data via the Athena console, API, AWS SDK, and compatible third-party applications. To learn more, see Using Amazon Athena Federated Query and Writing Federated Queries.

You can also share a data source connection with team members, allowing them to use their own AWS account to query the data without setting up a duplicate connector. To learn more, see Enabling Cross-Account Federated Queries.

Conclusion

We encourage you to evaluate Athena and federated queries on your next analytics project. For help getting started, we recommend the following resources:


About the Authors

Scott Rigney is a Senior Technical Product Manager with Amazon Web Services (AWS) and works with the Amazon Athena team based out of Arlington, Virginia. He is passionate about building analytics products that enable enterprises to make data-driven decisions.

Jean-Louis Castro-Malaspina is a Senior Product Marketing Manager with Amazon Web Services (AWS) based in Hershey, Pennsylvania. He enjoys highlighting how customers use Analytics and Amazon Athena to unlock innovation. Outside of work, Jean-Louis enjoys spending time with his wife and daughter, running, and following international soccer.

Suresh_90Suresh Akena is a Principal WW GTM Leader for Amazon Athena. He works with the startups, enterprise and strategic customers to provide leadership on large scale data strategies including migration to AWS platform, big data and analytics and ML initiatives and help them to optimize and improve time to market for data driven applications when using AWS.

Enhance analytics with Google Trends data using AWS Glue, Amazon Athena, and Amazon QuickSight

Post Syndicated from Drew Philip original https://aws.amazon.com/blogs/big-data/enhance-analytics-with-google-trends-data-using-aws-glue-amazon-athena-and-amazon-quicksight/

In today’s market, business success often lies in the ability to glean accurate insights and predictions from data. However, data scientists and analysts often find that the data they have at their disposal isn’t enough to help them make accurate predictions for their use cases. A variety of factors might alter an outcome and should be taken into account when making a prediction model. Google Trends is an available option, presenting a broad source of data that reflects global trends more comprehensively. This can help enrich a dataset to yield a better model.

You can use Google Trends data for a variety of analytical use cases. For example, you can use it to learn about how your products or brands are faring among targeted audiences. You can also use it to monitor competitors and see how well they’re performing against your brand.

In this post, we shows how to get Google Trends data programmatically, integrate it into a data pipeline, and use it to analyze data, using Amazon Simple Storage Service (Amazon S3), AWS Glue, Amazon Athena, and Amazon QuickSight. We use an example dataset of movies and TV shows and demonstrate how to get the search queries from Google Trends to analyze the popularity of movies and TV shows.

Solution overview

The following diagram shows a high-level architecture of the solution using Amazon S3, AWS Glue, the Google Trends API, Athena, and QuickSight.

The solution consists of the following components:

  1. Amazon S3 – The storage layer that stores the list of topics for which Google Trends data has to be gathered. It also stores the results returned by Google Trends.
  2. AWS Glue – The serverless data integration service that calls Google Trends for the list of topics to get the search results, aggregates the data, and loads it to Amazon S3.
  3. Athena – The query engine that allows you to query the data stored in Amazon S3. You can use it for supporting one-time SQL queries on Google Trends data and for building dashboards using tools like QuickSight.
  4. QuickSight – The reporting tool used for building visualizations.

In the following sections, we walk through the steps to set up the environment, download the libraries, create and run the AWS Glue job, and explore the data.

Set up your environment

Complete the following steps to set up your environment:

  1. Create an S3 bucket where you upload the list of movies and TV shows. For this post, we use a Netflix Movies and TV Shows public dataset from Kaggle.
  2. Create an AWS Identity and Access Management (IAM) service role that allows AWS Glue to read and write data to the S3 buckets you just created.
  3. Create a new QuickSight account with the admin/author role and access granted to Athena and Amazon S3.

Download the external libraries and dependencies for the AWS Glue Job

The AWS Glue job needs the following two external Python libraries: pytrends and awswrangler. pytrends is a library that provides a simple interface for automating the downloading of reports from Google Trends. awswrangler is a library provided by AWS to integrate data between a Pandas DataFrame and AWS repositories like Amazon S3.

Download the following .whl files for the libraries and upload them to Amazon S3:

Create and configure an AWS Glue job

To set up your AWS Glue job, complete the following steps:

  1. On the AWS Glue console, under ETL in the navigation pane, choose Jobs – New.
  2. For Create job, select Python Shell script editor.
  3. For Options, select Create a new script with boilerplate code.
  4. Choose Create.
  5. On the Script tab, enter the following script, replacing the source and target buckets with your bucket names:
    # Import external library TrendReq needed to connect to Google Trends API and library awswrangler to read/write from pandas to Amazon S3.
    
    from pytrends.request import TrendReq
    pytrend = TrendReq(hl='en-US', tz=360, timeout=10) 
    import pandas as pd
    import awswrangler as wr
    
    # Function get_gtrend, accepts a list of terms as input, calls Google Trends API for each term to get the search trends 
    def get_gtrend(terms):
      trends =[]
      for term in terms:
    # Normalizing the data using popular movie Titanic as baseline to get trends over time.
        pytrend.build_payload(kw_list=["Titanic",term.lower()])
        df = pytrend.interest_over_time()
        df["google_trend"] = round((df[term.lower()] /df['Titanic']) *100)
        
    # Transforming and filtering trends results to align with Analytics use case
        df_trend = df.loc[df.index >= "2018-1-1", "google_trend"].resample(rule="M").max().to_frame()
        df_trend["movie"] = term
        trends.append(df_trend.reset_index())
    
    # Last step in function to concatenate the results for each term and return an aggregated dataset 
      concat_df = pd.concat(trends)
      return concat_df
    
    def main():
      
    # Change the bucket and prefix name to Amazon S3 location where movie titles file from Kaggle has been downloaded. 
      source_bucket = "source_bucket"
      source_prefix = "source_prefix"
    
    # Awswrangler method s3.read_csv is called to load the titles from S3 location into a DataFrame and convert it to a list.
      df = wr.s3.read_csv(f's3://{source_bucket}/{source_prefix}/')
      movies = df['title'].head(20).values.tolist()
    
    #  Call the get_trends function and pass the list of movies as an input. Pandas dataframe is returned with trend data for movies.
      df = get_gtrend(terms=movies)
    
    # Change the prefix name to location where you want to store results. 
      target_bucket = "target_bucket" 
      target_prefix = "target_prefix" 
    
    # Use awswrangler to save pandas dataframe to Amazon S3. 
      wr.s3.to_csv(df,f's3://{target_bucket}/{target_prefix}/trends.csv',index= False)
    
    
    # Invoke the main function
    main()

  6. On the Job details tab, for Name, enter the name of the AWS Glue job.
  7. For IAM Role, choose the role that you created earlier with permissions to run the job and access Amazon S3.
  8. For Type, enter Python Shell to run the Python code.
  9. For Python Version, specify the Python version as Python 3.6.
  10. For Data processing units, choose 1 DPU.
  11. For Number of retries, enter .
  12. Expand Advanced properties and under Libraries, enter the location of the S3 bucket where the pytrends and awswrangler files were downloaded.
  13. Choose Save to save the job.

Run the AWS Glue job

Navigate to the AWS Glue console and run the AWS Glue job you created. When the job is complete, a CSV file with the Google Trends values is created in the target S3 bucket with the prefix specified in the main() function. In the next step, we create an AWS Glue table referring to the target bucket and prefix to allow queries to be run against the Google Trends data.

Create an AWS Glue table on the Google Trends data

In this step, we create a table in the AWS Glue Data Catalog using Athena. The table is created on top of the Google Trends data saved in the target S3 bucket.

In the Athena query editor, select default as the database and enter the following DDL command to create a table named trends. Replace the target bucket and prefix with your own values.

CREATE EXTERNAL TABLE `trends`(
  `date` date, 
  `google_trend` double, 
  `title` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<< target_bucket >>/<<target_prefix >>/'
TBLPROPERTIES (
  'has_encrypted_data'='false', 
  'skip.header.line.count'='1')

This table has three columns:

  • date – The time dimension for aggregating the data. In this example, the time period is monthly.
  • google_trend – The count of Google Trends values normalized on a scale of 0–100.
  • title – The name of the movie or TV show.

Query the data using Athena

Now you can run one-time queries to find the popularity of movies and TV shows.

In the first example, we find the top 10 most popular movies and TV shows for November 2021. In the Athena query editor, enter the following SQL command to query the trends table created in the previous step:

select title,google_trend
from trends 
where date = date_parse('2021-11-30','%Y-%m-%d')
order by google_trend desc
limit 10

In the following example, we find the top 10 most popular movies and TV shows that have grown most in popularity in 2021 until November 30. In the Athena query editor, enter the following SQL command to query the trends table:

select  title,max(google_trend)-min(google_trend) trend_diff
from trends
where date between date_parse('2021-01-31','%Y-%m-%d') and date_parse('2021-11-30','%Y-%m-%d')
group by title
order by 2 desc
limit 10

Build a dashboard to visualize the data using QuickSight

We can use QuickSight to build a dashboard on the data downloaded from Google Trends to identify top movies and TV shows. Complete the following steps:

  1. Sign in to your QuickSight account.
  2. On the QuickSight console, choose Datasets and choose New dataset.
  3. Choose Athena as your data source.
  4. For Data source name, enter a name.
  5. For Athena workgroup, choose [primary].
  6. Choose Create data source.
  7. For Database, choose default.
  8. For Tables, select the trends table.
  9. Choose Select.
  10. Select Directly query your data.
  11. Choose Visualize.

For the first visual, we create a bar chart of the top movies or TV shows by title sorted in ascending order of aggregated Google Trends values.

  1. Choose the horizontal bar chart visual type.
  2. For Y axis, choose title.
  3. For Value, choose google_trend (Average).

Next, we create a time series plot of Google Trends count by month for titles.

  1. Add a new visual and choose the autograph visual type.
  2. For X axis, choose date.
  3. For Value, choose google_trend (Sum).
  4. For Color¸ choose title.

Clean up

To avoid incurring future charges, delete the resources you created for AWS Glue, Amazon S3, IAM, and QuickSight.

  1. AWS Glue Catalog table
    • On the AWS Glue console, choose Tables under Databases in the navigation pane.
    • Select the AWS Glue Data Catalog table that you created.
    • On the Actions drop-down menu, choose Delete.
    • Choose Delete to confirm.
  2. AWS Glue Job
    • Choose Jobs in the navigation pane.
    • Select the AWS Glue job you created.
    • On the Actions drop-down menu, choose Delete.
  3. S3 bucket
    • On the Amazon S3 console, choose Buckets in navigation pane.
    • Choose the bucket you created.
    • Choose Empty and enter your bucket name.
    • Choose Confirm.
    • Choose Delete and enter your bucket name.
    • Choose Delete bucket.
  4. IAM Role
    • On the IAM console, choose Roles in navigation pane.
    • Choose the role you attached to AWS Glue job.
    • Choose Delete role.
    • Choose Yes.
  5. Amazon QuickSight
    • If you created a QuickSight user for trying out this blog and do not want to retain that access, please ask your QuickSight admin to delete your user.
    • If you created the QuickSight account itself just for trying this blog and no longer want to retain it, use following steps to delete it.
    • Choose your user name on the application bar, and then choose Manage QuickSight
    • Choose Account settings.
    • Choose Delete Account.

You can only have one QuickSight account active for each AWS account. Make sure that other users aren’t using QuickSight before you delete the account.

Conclusion

Integrating external data sources such as Google Trends via AWS Glue, Athena, and QuickSight can help you enrich your datasets to yield greater insights. You can use it in a data science context when the model is under-fit and requires more relevant data in order to make better predictions. In this post, we used movies as an example, but the solution extends to a wide breadth of industries, such as products in a retail context or commodities in a finance context. If the simple inventory histories or the transaction dates are available, you may find little correlation to future demand or prices. But with an integrated data pipeline using external data, new relationships in the dataset make the model more reliable.

In a business context, whether your team wants to test out a machine learning (ML) proof of concept more quickly or have limited access to pertinent data, Google Trends integration is a relatively quick way to enrich your data for the purposes of ML and data insights.

You can also extend this concept to other third-party datasets, such as social media sentiment, as your team’s expertise grows and your ML and analytics operations mature. Integrating external datasets such as Google Trends is just one part of the feature and data engineering process, but it’s a great place to start and, in our experience, most often leads to better models that businesses can innovate from.


About the Authors

Drew Philip is a Sr. Solutions Architect with AWS Private Equity. He has held senior
technical leadership positions within key AWS partners such as Microsoft, Oracle, and
Rackspace. Drew focuses on applied engineering that leverages AI-enabled digital innovation and development, application modernization, resiliency and operational excellence for workloads at scale in the public and private sector. He sits on the board of Calvin University’s computer science department and is a contributing member of the AWS Machine Learning Technical Focus Community.

Gautam Prothia is a Senior Solution Architect within AWS dedicated to Strategic Accounts. Gautam has more than 15+ years of experience designing and implementing large-scale data management and analytical solutions. He has worked with many clients across industries to help them modernize their data platforms on the cloud.

Simon Zamarin is an AI/ML Solutions Architect whose main focus is helping customers extract value from their data assets. In his spare time, Simon enjoys spending time with family, reading sci-fi, and working on various DIY house projects.

How Net at Work built an email threat report system on AWS

Post Syndicated from Florian Mair original https://aws.amazon.com/blogs/architecture/how-net-at-work-built-an-email-threat-report-system-on-aws/

Emails are often used as an entry point for malicious software like trojan horses, rootkits, or encryption-based ransomware. The NoSpamProxy offering developed by Net at Work tackles this threat, providing secure and confidential email communication.

A subservice of NoSpamProxy called 32guards is responsible for threat reports of inbound and outbound emails. With the increasing number of NoSpamProxy customers, 32guards was found to have several limitations. 32guards was previously built on a relational database. But with the growth in traffic, this database was not able to keep up with storage demands and expected query performance. Further, the relational database schema was limiting the possibilities of complex pattern detections, due to performance limitations. The NoSpamProxy team decided to rearchitect the service based on the Lake House approach.

The goal was to move away from a one-size-fits-all approach for data analytics and integrate a data lake with purpose-built data stores, unified governance, and smooth data movement.

This post shows how Net at Work modernized their 32guards service, from a relational database to a fully serverless analytics solution. With adoption of the Well-Architected Analytics Lens best practices and the use of fully managed services, the 32guards team was able to build a production-ready application within six weeks.

Architecture for email threat reports and analytics

This section gives a walkthrough of the solution’s architecture, as illustrated in Figure 1.

Figure 1. 32guards threat reports architecture

Figure 1. 32guards threat reports architecture

1. The entry point is an Amazon API Gateway, which receives email metadata in JSON format from the NoSpamProxy fleet. The message contains information about the email in general, email attachments, and URLs in the email. As an example, a subset of the data is presented in JSON as follows:

{
  ...
  "Attachments": [
    {
      "Sha256Hash": "69FB43BD7CCFD79E162B638596402AD1144DD5D762DEC7433111FC88EDD483FE",
      "Classification": 0,
      "Filename": "test.ods.tar.gz",
      "DetectedMimeType": "application/tar+gzip",
      "Size": 5895
    }
  ],
  "Urls": [
    {
      "Url": "http://www.aarhhie.work/",
      "Classification": 0,
    },        {
      "Url": "http://www.netatwork.de/",
      "Classification": 0,
    },
    {
      "Url": "http://aws.amazon.com/",
      "Classification": 0,
    }
  ]
}

2. This JSON message is forwarded to an AWS Lambda function (called “frontend”), which takes care of the further downstream processing. There are two activities the Lambda function initiates:

  • Forwarding the record for real-time analysis/storage
  • Generating a threat report based on the information derived from the data stored in the indicators of compromises (IOCs) Amazon DynamoDB table

IOCs are patterns within the email metadata that are used to determine if emails are safe or not. For example, this could be for a suspicious file attachment or domain.

Threat report for suspicious emails

In the preceding JSON message, the attachments and URLs have been classified with “0” by the email service itself, which indicates that none of them look suspicious. The frontend Lambda function uses the vast number of IOCs stored in the DynamoDB table and heuristics to determine any potential threats within the email. The use of DynamoDB enables fast lookup times to generate a threat report. For the example, the response to the API Gateway in step 2 looks like this:

{
  "ReportedOnUtc": "2021-10-14T14:33:34.5070945Z",
  "Reason": "realtimeSuspiciousOrganisationalDomain",
  "Identifier": "aarhhie.work",
  ...
}

This threat report shows that the top-level domain “aarhiie.work” has been detected as suspicious. The report is used to determine further actions for the email, such as blocking.

Real-time data processing

3. In the real-time analytics flow, the frontend Lambda function ingests email metadata into a data stream using Amazon Kinesis Data Streams. This is a massively scalable, serverless, and durable real-time data streaming service. Compared to a queue, streaming storage permits more than one consumer of the same data.

4. The first consumer is an Apache Flink application running in Amazon Kinesis Data Analytics. This application generates statistical metrics (for example, occurrences of the top-level domain “.work”). The output is stored in Apache Parquet format on Amazon S3. Parquet is a columnar storage format for row-based files like csv.

The second consumer of the streaming data is Amazon Kinesis Data Firehose. Kinesis Data Firehose is a fully managed solution to reliably load streaming data into data lakes, data stores, and analytics services. Within the 32guards service, Kinesis Data Firehose is used to store all email metadata into Amazon S3. The data is stored in Apache Parquet format, which makes queries more time and cost efficient.

IOC detection

Now that we have shown how data is ingested and threat reports are generated to respond quickly to requests, let’s look at how the IOCs are updated. These IOCs are used for generating the threat report within the “frontend” Lambda function. As attack vectors are changing over time, quickly analyzing the data for new threats, is crucial to provide high-quality reports to the NoSpamProxy service.

The incoming email metadata is stored every few minutes in Amazon S3 by Kinesis Data Firehose. To query data directly in Amazon S3, Amazon Athena is used. Athena is a serverless query service that analyzes data stored in Amazon S3, by using standard SQL syntax.

5. To be able to query data in S3, Amazon Athena uses the AWS Glue Data Catalog, which contains the structure of the email metadata stored in the data lake. The data structure is derived from the data itself using AWS Glue Crawlers. Other external downstream processing services like business intelligence applications, also use Amazon Athena to consume the data.

6. Athena queries are initiated on a predefined schedule to update or generate new IOCs. The results of these queries are stored in the DynamoDB table to enable fast lookup times for the “frontend” Lambda.

Conclusion

In this blog post, we showed how Net at Work modernized their 32guards service within their NoSpamProxy product. The previous architecture used a relational database to ingest and store email metadata. This database was running into performance and storage issues, and must be redesigned into a more performant and scalable architecture.

Amazon S3 is used as the storage layer, which can scale up to exabytes of data. With Amazon Athena as the query engine, there is no need to operate a high-performance database cluster, as compute and storage is separated. By using Amazon Kinesis Data Streams and Amazon Kinesis Data Analytics, valuable insight can be generated in real time, and acted upon more quickly.

As a serverless, fully managed solution, the 32guards service has a lower-cost footprint of as much as 50% and requires less maintenance. By moving away from a relational database model, the query runtimes decrease significantly. You can now conduct analyses that have not been feasible before.

Interested in the NoSpamProxy? Read more about NoSpamProxy or sign up for a free trial.

Looking for more architecture content? AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Insights for CTOs: Part 3 – Growing your business with modern data capabilities

Post Syndicated from Syed Jaffry original https://aws.amazon.com/blogs/architecture/insights-for-ctos-part-3-growing-your-business-with-modern-data-capabilities/

This post was co-wrtiten with Jonathan Hwang, head of Foundation Data Analytics at Zendesk.


In my role as a Senior Solutions Architect, I have spoken to chief technology officers (CTOs) and executive leadership of large enterprises like big banks, software as a service (SaaS) businesses, mid-sized enterprises, and startups.

In this 6-part series, I share insights gained from various CTOs and engineering leaders during their cloud adoption journeys at their respective organizations. I have taken these lessons and summarized architecture best practices to help you build and operate applications successfully in the cloud. This series also covers building and operating cloud applications, security, cloud financial management, modern data and artificial intelligence (AI), cloud operating models, and strategies for cloud migration.

In Part 3, I’ve collaborated with the head of Foundation Analytics at Zendesk, Jonathan Hwang, to show how Zendesk incrementally scaled their data and analytics capabilities to effectively use the insights they collect from customer interactions. Read how Zendesk built a modern data architecture using Amazon Simple Storage Service (Amazon S3) for storage, Apache Hudi for row-level data processing, and AWS Lake Formation for fine-grained access control.

Why Zendesk needed to build and scale their data platform

Zendesk is a customer service platform that connects over 100,000 brands with hundreds of millions of customers via telephone, chat, email, messaging, social channels, communities, review sites, and help centers. They use data from these channels to make informed business decisions and create new and updated products.

In 2014, Zendesk’s data team built the first version of their big data platform in their own data center using Apache Hadoop for incubating their machine learning (ML) initiative. With that, they launched Answer Bot and Zendesk Benchmark report. These products were so successful they soon overwhelmed the limited compute resources available in the data center. By the end of 2017, it was clear Zendesk needed to move to the cloud to modernize and scale their data capabilities.

Incrementally modernizing data capabilities

Zendesk built and scaled their workload to use data lakes on AWS, but soon encountered new architecture challenges:

  • The General Data Protection Regulation (GDPR) “right to be forgotten” rule made it difficult and costly to maintain data lakes, because deleting a small piece of data required reprocessing large datasets.
  • Security and governance was harder to manage when data lake scaled to a larger number of users.

The following sections show you how Zendesk is addressing GDPR rules by evolving from plain Apache Parquet files on Amazon S3 to Hudi datasets on Amazon S3 to enable row level inserts/updates/deletes. To address security and governance, Zendesk is migrating to AWS Lake Formation centralized security for fine-grained access control at scale.

Zendesk’s data platform

Figure 1 shows Zendesk’s current data platform. It consists of three data pipelines: “Data Hub,” “Data Lake,” and “Self Service.”

Zendesk data pipelines

Figure 1. Zendesk data pipelines

Data Lake pipelines

The Data Lake and Data Hub pipelines cover the entire lifecycle of the data from ingestion to consumption.

The Data Lake pipelines consolidate the data from Zendesk’s highly distributed databases into a data lake for analysis.

Zendesk uses Amazon Database Migration Service (AWS DMS) for change data capture (CDC) from over 1,800 Amazon Aurora MySQL databases in eight AWS Regions. It detects transaction changes and applies them to the data lake using Amazon EMR and Hudi.

Zendesk ticket data consists of over 10 billion events and petabytes of data. The data lake files in Amazon S3 are transformed and stored in Apache Hudi format and registered on the AWS Glue catalog to be available as data lake tables for analytics querying and consumption via Amazon Athena.

Data Hub pipelines

The Data Hub pipelines focus on real-time events and streaming analytics use cases with Apache Kafka. Any application at Zendesk can publish events to a global Kafka message bus. Apache Flink ingests these events into Amazon S3.

The Data Hub provides high-quality business data that is highly available and scalable.

Self-managed pipeline

The self-managed pipelines empower product engineering teams to use the data lake for those use cases that don’t fit into our standard integration patterns. All internal Zendesk product engineering teams can use standard tools such as Amazon EMR, Amazon S3, Athena, and AWS Glue to publish their own analytics dataset and share them with other teams.

A notable example of this is Zendesk’s fraud detection engineering team. They publish their fraud detection data and findings through our self-manage data lake platform and use Amazon QuickSight for visualization.

You need fine-grained security and compliance

Data lakes can accelerate growth through faster decision making and product innovation. However, they can also bring new security and compliance challenges:

  • Visibility and auditability. Who has access to what data? What level of access do people have and how/when and who is accessing it?
  • Fine-grained access control. How do you define and enforce least privilege access to subsets of data at scale without creating bottlenecks or key person/team dependencies?

Lake Formation helps address these concerns by auditing data access and offering row- and column-level security and a delegated access control model to create data stewards for self-managed security and governance.

Zendesk used Lake Formation to build a fine-grained access control model that uses row-level security. It detects personally identifiable information (PII) while scaling the data lake for self-managed consumption.

Some Zendesk customers opt out of having their data included in ML or market research. Zendesk uses Lake Formation to apply row-level security to filter out records associated with a list of customer accounts who have opted out of queries. They also help data lake users understand which data lake tables contain PII by automatically detecting and tagging columns in the data catalog using AWS Glue’s PII detection algorithm.

The value of real-time data processing

When you process and consume data closer to the time of its creation, you can make faster decisions. Streaming analytics design patterns, implemented using services like Amazon Managed Streaming for Apache Kafka (Amazon MSK) or Amazon Kinesis, create an enterprise event bus to exchange data between heterogeneous applications in near real time.

For example, it is common to use streaming to augment the traditional database CDC ingestion into the data lake with additional streaming ingestion of application events. CDC is a common data ingestion pattern, but the information can be too low level. This requires application context to be reconstructed in the data lake and business logic to be duplicated in two places, inside the application and in the data lake processing layer. This creates a risk of semantic misrepresentation of the application context.

Zendesk faced this challenge with their CDC data lake ingestion from their Aurora clusters. They created an enterprise event bus built with Apache Kafka to augment their CDC with higher-level application domain events to be exchanged directly between heterogeneous applications.

Zendesk’s streaming architecture

A CDC database ticket table schema can sometimes contain unnecessary and complex attributes that are application specific and do not capture the domain model of the ticket. This makes it hard for downstream consumers to understand and use the data. A ticket domain object may span several database tables when modeled in third normal form, which makes querying for analysts difficult downstream. This is also a brittle integration method because downstream data consumers can easily be impacted when the application logic changes, which makes it hard to derive a common data view.

To move towards event-based communication between microservices, Zendesk created the Platform Data Architecture (PDA) project, which uses a standard object model to represent a higher level, semantic view of their application data. Standard objects are domain objects designed for cross-domain communication and do not suffer from the lower level fragmented scope of database CDC. Ultimately, Zendesk aims to transition their data architecture from a collection of isolated products and data silos into a cohesive unified data platform.

An application view of Zendesk’s streaming architecture

Figure 2. An application view of Zendesk’s streaming architecture

Figure 3 shows how all Zendesk products and users integrate through common standard objects and standard events within the Data Hub. Applications publish and consume standard objects and events to/from the event bus.

For example, a complete ticket standard object will be published to the message bus whenever it is created, updated, or changed. On the consumption side, these events get used by product teams to enable platform capabilities such as search, data export, analytics, and reporting dashboards.

Summary

As Zendesk’s business grew, their data lake evolved from simple Parquet files on Amazon S3 to a modern Hudi-based incrementally updateable data lake. Now, their original coarse-grained IAM security policies use fine-grained access control with Lake Formation.

We have repeatedly seen this incremental architecture evolution achieve success because it reduces the business risk associated with the change and provides sufficient time for your team to learn and evaluate cloud operations and managed services.

Looking for more architecture content? AWS Architecture Center provides reference architecture diagrams, vetted architecture solutions, Well-Architected best practices, patterns, icons, and more!

Other posts in this series