Tag Archives: AWS Big Data

Run usage analytics on Amazon QuickSight using AWS CloudTrail

Post Syndicated from Sunil Salunkhe original https://aws.amazon.com/blogs/big-data/run-usage-analytics-on-amazon-quicksight-using-aws-cloudtrail/

Amazon QuickSight is a cloud-native BI service that allows end users to create and publish dashboards in minutes, without provisioning any servers or requiring complex licensing. You can view these dashboards on the QuickSight product console or embed them into applications and websites. After you deploy dashboards, it’s important to assess how they and other assets are being adopted, accessed, and used across various departments or customers.

In this post, we use a QuickSight dashboard to present the following insights:

  • Most viewed and accessed dashboards
  • Most updated dashboards and analyses
  • Most popular datasets
  • Active users vs. idle users
  • Idle authors
  • Unused datasets (wasted SPICE capacity)

You can use these insights to reduce costs and create operational efficiencies in a deployment. The following diagram illustrates this architecture.


Solution components

The following table summarizes the AWS services and resources that this solution uses.

Resource Type | Name | Purpose
AWS CloudTrail logs | CloudTrailMultiAccount | Captures all API calls for all AWS services across all AWS Regions for this account. You can use AWS Organizations to consolidate trails across multiple AWS accounts.
AWS Glue crawlers | QSCloudTrailLogsCrawler, QSProcessedDataCrawler | Ensure that all CloudTrail data is crawled periodically and that partitions are updated in the AWS Glue Data Catalog.
AWS Glue ETL job | QuickSightCloudTrailProcessing | Reads catalogued data from the crawler, processes, transforms, and stores it in an S3 output bucket.
AWS Lambda function | ExtractQSMetadata_func | Extracts event data using the AWS SDK for Python (Boto3). The event data is enriched with QuickSight metadata objects like users, analyses, datasets, and dashboards.
Amazon Simple Storage Service (Amazon S3) | CloudTrailLogsBucket, QuickSight-BIonBI-processed | One bucket stores CloudTrail data; the other stores processed data.
Amazon QuickSight | Quicksight_BI_On_BO_Analysis | Visualizes the processed data.

 Solution walkthrough

AWS CloudTrail is a service that enables governance, compliance, operational auditing, and risk auditing of your AWS account. You can use CloudTrail to log, continuously monitor, and retain account activity related to actions across your AWS infrastructure. You can define a trail to collect API actions across all AWS Regions. Although we have enabled a trail for all Regions in our solution, the dashboard shows the data for a single Region only.

After you enable CloudTrail, it starts capturing all API actions and then, at 15-minute intervals, delivers logs in JSON format to a configured Amazon Simple Storage Service (Amazon S3) bucket. Before the logs are made available to our ad hoc query engine, Amazon Athena, they must be parsed, transformed, and processed by the AWS Glue crawler and ETL job.


This is handled by the AWS Glue crawler and the AWS Glue ETL job. The AWS Glue crawler crawls through the data every day and populates new partitions in the Data Catalog. The data is later made available as a table on the Athena console for processing by the AWS Glue ETL job. The ETL job QuickSightCloudtrail_GlueJob.txt filters the logs and processes only those events whose event source is QuickSight (that is, eventSource = 'quicksight.amazonaws.com').
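The ETL script itself isn’t reproduced in this post, but a minimal PySpark sketch of that filtering step might look like the following. The Data Catalog database name cloudtraildb and table name cloudtrail_logs are assumptions, and the sketch presumes the crawler exposes each CloudTrail event as a row with an eventsource column:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.transforms import Filter
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ['JOB_NAME'])
glue_context = GlueContext(SparkContext())
job = Job(glue_context)
job.init(args['JOB_NAME'], args)

# Read the CloudTrail table that the crawler catalogued (names are assumptions)
logs = glue_context.create_dynamic_frame.from_catalog(
    database='cloudtraildb',
    table_name='cloudtrail_logs')

# Keep only QuickSight events
quicksight_events = Filter.apply(
    frame=logs,
    f=lambda row: row['eventsource'] == 'quicksight.amazonaws.com')

# Write the filtered events to the processed-data location as Parquet
glue_context.write_dynamic_frame.from_options(
    frame=quicksight_events,
    connection_type='s3',
    connection_options={'path': 's3://<BucketName>/processedlogs/'},
    format='parquet')

job.commit()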


The following screenshot shows the sample JSON for the QuickSight API calls.


The job processes those events and creates a Parquet file. The following table summarizes the file’s data points.

Quicksightlogs

Field Name | Data Type
eventtime | Datetime
eventname | String
awsregion | String
accountid | String
username | String
analysisname | String
Date | Date

The processed data is stored in an S3 folder at s3://<BucketName>/processedlogs/. For performance optimization during querying and connecting this data to QuickSight for visualization, these logs are partitioned by the date field. For this reason, we recommend that you configure the AWS Glue crawler to detect the new data and partitions and update the Data Catalog for subsequent analysis. We have configured the crawler to run once a day.

We need to enrich this log data with metadata from QuickSight, such as a list of analyses, users, and datasets. This metadata can be extracted using describe_analysis, describe_user, and describe_data_set in the AWS SDK for Python (Boto3).

We provide an AWS Lambda function that is ideal for this extraction. We configured it to be triggered once a day through Amazon EventBridge. The extracted metadata is stored in the S3 folder at s3://<BucketName>/metadata/.
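The Lambda code ships with the solution; purely as an illustration, a simplified Boto3 sketch of how such a function might list QuickSight users and drop the result into the metadata folder could look like this. The account ID is a placeholder, and pagination of the other describe/list calls is omitted:

import csv
import io
import boto3

ACCOUNT_ID = '123456789012'   # placeholder AWS account ID
BUCKET = '<BucketName>'       # same bucket that holds the processed logs

quicksight = boto3.client('quicksight')
s3 = boto3.resource('s3')

def lambda_handler(event, context):
    # Page through all QuickSight users in the default namespace
    users, next_token = [], None
    while True:
        kwargs = {'AwsAccountId': ACCOUNT_ID, 'Namespace': 'default'}
        if next_token:
            kwargs['NextToken'] = next_token
        response = quicksight.list_users(**kwargs)
        users.extend(response['UserList'])
        next_token = response.get('NextToken')
        if not next_token:
            break

    # Write user name, role, and active flag as CSV to the metadata folder
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['username', 'role', 'active'])
    for user in users:
        writer.writerow([user['UserName'], user['Role'], user['Active']])
    s3.Object(BUCKET, 'metadata/users.csv').put(Body=buffer.getvalue())

    return {'user_count': len(users)}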

Now that we have processed logs and metadata for enrichment, we need to prepare the data visualization in QuickSight. Athena allows us to build views that can be imported into QuickSight as datasets.

We build the following views based on the tables populated by the Lambda function and the ETL job:

CREATE VIEW vw_quicksight_bionbi 
AS 
  SELECT Date_parse(eventtime, '%Y-%m-%dT%H:%i:%SZ') AS "Event Time", 
         eventname  AS "Event Name", 
         awsregion  AS "AWS Region", 
         accountid  AS "Account ID", 
         username   AS "User Name", 
         analysisname AS "Analysis Name", 
         dashboardname AS "Dashboard Name", 
         Date_parse(date, '%Y%m%d') AS "Event Date" 
  FROM   "quicksightbionbi"."quicksightoutput_aggregatedoutput" 

CREATE VIEW vw_users 
AS 
  SELECT usr.username "User Name", 
         usr.role     AS "Role", 
         usr.active   AS "Active" 
  FROM   (quicksightbionbi.users 
          CROSS JOIN Unnest("users") t (usr)) 

CREATE VIEW vw_analysis 
AS 
  SELECT aly.analysisname "Analysis Name", 
         aly.analysisid   AS "Analysis ID" 
  FROM   (quicksightbionbi.analysis 
          CROSS JOIN Unnest("analysis") t (aly)) 

CREATE VIEW vw_analysisdatasets 
AS 
  SELECT alyds.analysesname "Analysis Name", 
         alyds.analysisid   AS "Analysis ID", 
         alyds.datasetid    AS "Dataset ID", 
         alyds.datasetname  AS "Dataset Name" 
  FROM   (quicksightbionbi.analysisdatasets 
          CROSS JOIN Unnest("analysisdatasets") t (alyds)) 

CREATE VIEW vw_datasets 
AS 
  SELECT ds.datasetname AS "Dataset Name", 
         ds.importmode  AS "Import Mode" 
  FROM   (quicksightbionbi.datasets 
          CROSS JOIN Unnest("datasets") t (ds))

QuickSight visualization

Follow these steps to connect the prepared data with QuickSight and start building the BI visualization.

  1. Sign in to the AWS Management Console and open the QuickSight console.

You can set up QuickSight access for end users through SSO providers such as AWS Single Sign-On (AWS SSO), Okta, Ping, and Azure AD so they don’t need to open the console.


  2. On the QuickSight console, choose Datasets.
  3. Choose New dataset to create a dataset for our analysis.


  4. For Create a Data Set, choose Athena.

In the previous steps, we prepared all our data in the form of Athena views.

  5. Configure permission for QuickSight to access AWS services, including Athena and its S3 buckets. For information, see Accessing Data Sources.


  6. For Data source name, enter QuickSightBIbBI.
  7. Choose Create data source.


  8. On Choose your table, for Database, choose quicksightbionbi.
  9. For Tables, select vw_quicksight_bionbi.
  10. Choose Select.


  11. For Finish data set creation, there are two options to choose from:
    1. Import to SPICE for quicker analytics – Built from the ground up for the cloud, SPICE uses a combination of columnar storage, in-memory technologies enabled through the latest hardware innovations, and machine code generation to run interactive queries on large datasets and get rapid responses. We use this option for this post.
    2. Directly query your data – You can connect to the data source in real time, but if the data query is expected to bring bulky results, this option might slow down the dashboard refresh.
  12. Choose Visualize to complete the data source creation process.


Now you can build your visualization sheets. QuickSight refreshes the data source first. You can also schedule a periodic refresh of your data source.


The following screenshot shows some examples of visualizations we built from the data source.


 

This dashboard presents us with two main areas for cost optimization:

  • Usage analysis – We can see how analyses and dashboards are being consumed by users. This area highlights the opportunity for cost savings by looking at datasets that haven’t been used in any analysis for the last 90 days but are still holding a major chunk of SPICE capacity.
  • Account governance – Because author subscriptions are charged on a fixed fee basis, it’s important to monitor if they are actively used. The dashboard helps us identify idle authors for the last 60 days.

Based on the information in the dashboard, we could, for example, remove unused datasets to reclaim SPICE capacity and follow up with (or remove) idle authors to reduce subscription costs.

Conclusion

In this post, we showed how you can use CloudTrail logs to review the use of QuickSight objects, including analyses, dashboards, datasets, and users. You can use the information available in these dashboards to save money on storage and subscriptions, understand the maturity of QuickSight adoption, and more.


About the Author

Sunil Salunkhe is a Senior Solutions Architect working with Strategic Accounts on their vision to leverage the cloud to drive aggressive growth strategies. He practices customer obsession by solving their complex challenges in all aspects of the cloud journey, including scale, security, and reliability. While not working, he enjoys playing cricket and cycling with his wife and son.

Retaining data streams up to one year with Amazon Kinesis Data Streams

Post Syndicated from Nihar Sheth original https://aws.amazon.com/blogs/big-data/retaining-data-streams-up-to-one-year-with-amazon-kinesis-data-streams/

Streaming data is used extensively for use cases like sharing data between applications, streaming ETL (extract, transform, and load), real-time analytics, processing data from internet of things (IoT) devices, application monitoring, fraud detection, live leaderboards, and more. Typically, data streams are stored for short durations of time before being loaded into a permanent data store like a data lake or analytics service.

Additional use cases are becoming more prevalent that may require you to retain data in streams for longer periods of time. For example, compliance programs like HIPAA and FedRAMP may require you to store raw data for more than a few days or weeks, or you may want to backtest machine learning (ML) algorithms with historical data that may be several months old.

A challenge arises when you want to process historical data and newly arriving data streams. This requires complex logic to access your data lake and your data stream store, or two sets of code—one to process data from your data lake and one to process your new data streams.

Amazon Kinesis Data Streams solves this challenge by storing your data streams up to 1 year with long-term retention. You can use the same Kinesis Data Streams code base to process both historical and newly arriving data streams, and continue to use features like enhanced fan-out to read large data volumes at very high throughput.

In this post, we describe how long-term retention enables new use cases by bridging real-time and historical data processing. We also demonstrate how you can reduce the time to retrieve 30 days of data from a data stream by an order of magnitude using Kinesis Data Streams enhanced fan-out.

Simple setup, no resource provisioning

Kinesis Data Streams durably stores all data stream records in a shard, an append-only log ordered by arrival time. The time period from when a record is added to when it’s no longer accessible is called the retention period. A Kinesis data stream stores records for 24 hours by default, up to 365 days (8,760 hours). Applications can start reading data at any point in the retention period in the exact order in which the data stream is stored. Shards enable these applications to process data in parallel and at low latency.

You can select a preset retention period or define a custom retention period in days or hours using the Kinesis Data Streams console, as in the following screenshot.


The default retention period is 24 hours and covers scenarios where intermittent lags in processing need to catch up with the real-time data. You can extend retention up to 7 days to reprocess slightly aged data to resolve potential downstream data losses. You can also use long-term retention to store data for more than 7 days and up to 365 days to reprocess historical data for use cases like algorithm backtesting, data store backfills, and auditing. For more information, see Changing the Data Retention Period.

Similarly, you can use the following AWS Command Line Interface (AWS CLI) command to set the retention period in hours (the following code sets it to 9 days, or 216 hours):

aws kinesis increase-stream-retention-period \
    --stream-name samplestream \
    --retention-period-hours 216

Read new and historical data, no code changes necessary

All the data captured in the stream is stored in a durable, encrypted, and secure manner for the specified retention period up to a maximum of 1 year. You can store any amount of data, retrieve it by specifying a start position, and read sequentially using the familiar GetRecords and SubscribeToShard APIs. The start position can be the sequence number of a data record in a shard or a timestamp. This enables you to use the same code to process older data. You can set up multiple consuming applications to start processing data at different points in the data stream.
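For example, a consumer that needs to replay older data can request a shard iterator at a timestamp with the AWS SDK for Python (Boto3). This is only a sketch; it assumes a stream named samplestream with a single shard and a retention period of at least 30 days:

from datetime import datetime, timedelta, timezone
import boto3

kinesis = boto3.client('kinesis')

# Start reading 30 days in the past (requires retention >= 30 days on the stream)
start_time = datetime.now(timezone.utc) - timedelta(days=30)

iterator = kinesis.get_shard_iterator(
    StreamName='samplestream',          # placeholder stream name
    ShardId='shardId-000000000000',     # placeholder shard ID
    ShardIteratorType='AT_TIMESTAMP',
    Timestamp=start_time)['ShardIterator']

# Read a batch of records; keep calling get_records with NextShardIterator to continue
response = kinesis.get_records(ShardIterator=iterator, Limit=1000)
for record in response['Records']:
    print(record['SequenceNumber'], record['ApproximateArrivalTimestamp'])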

Speed up data reads using enhanced fan-out consumers

Kinesis Data Streams provides two types of models to consume data: shared throughput consumer and enhanced fan-out (EFO) consumer. In the shared throughput consumer model, all the consuming applications share 2 MB/s per shard read throughput and a 5 transactions per second (TPS) quota. In the enhanced fan-out model, each consumer gets a dedicated read throughput of 2 MB/s per shard. Because it uses an HTTP/2 data retrieval API, there is no longer a limit of 5 TPS. You can attach up to 20 EFO consumers to a single stream and read data at a total rate of 40 MB/s per shard. Because each consumer gets dedicated read throughput, processing one doesn’t impact another. So you can attach new consumers to process old data without worrying about the performance of the existing consumer processing real-time data. For example, you can retrain an ML model in an ad hoc fashion without impacting real-time workflows.

You can add and remove EFO consumers at any time and avoid paying for over-provisioned resources. For example, when backtesting, you can register EFO consumers before the test and remove them after completion. You’re only charged for resources used during the test. Also, you can use EFO consumers to accelerate the speed of processing. Each consuming application can process different parts of streams across the retention period to process all the data in parallel, thereby dramatically reducing the total processing time.
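As a sketch of that on-demand pattern with Boto3 (the stream ARN and consumer name below are placeholders):

import boto3

kinesis = boto3.client('kinesis')

stream_arn = 'arn:aws:kinesis:us-east-1:123456789012:stream/samplestream'  # placeholder

# Register a dedicated-throughput (EFO) consumer just before the backtest
consumer = kinesis.register_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='backtest-consumer-1')['Consumer']
print(consumer['ConsumerStatus'])  # CREATING until the consumer becomes ACTIVE

# ... run the backtest ...

# Deregister the consumer when the test is complete to stop incurring EFO charges
kinesis.deregister_stream_consumer(
    StreamARN=stream_arn,
    ConsumerName='backtest-consumer-1')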

Clickstream pipeline use case

Let’s look at a clickstream use case to see how this works for an existing streaming pipeline like the one in the following diagram.


This pipeline takes clickstream data and creates an alert every time a user leaves your ecommerce site without purchasing the items in their cart. A simple pipeline like this is a great way to start with stream processing, but soon you may want to implement a recommendation system based on user activity on your website and mobile app. To do this, you need to gather historical data in your existing data stream and send it to Amazon Simple Storage Service (Amazon S3) so it can be used for training a recommendation ML model. This scenario illustrates a key benefit of enabling long-term retention: it gives you the flexibility to “go back in time” and replay the existing data in your stream to generate new analytics that you may not have considered when you initially set up the streaming pipeline.

Let’s say you enabled 30 days of retention on your Kinesis data stream. After you train your ML model, you can set up a new streaming pipeline that generates recommendations by calling an inference endpoint hosted on Amazon SageMaker based on the trained ML model. The following diagram illustrates the final state of this architecture.


You can efficiently and quickly consume the existing data in the stream and write it to Amazon S3 so it can be used for training your ML model. The following diagram illustrates the architecture of this intermediate pipeline to generate training data.


You may wonder, why read from Kinesis Data Streams and write to Amazon S3? Why not write to Amazon S3 directly without enabling long-term retention? First, ingesting into Kinesis Data Streams with long-term retention enabled gives you the flexibility to generate additional streaming analytics as time passes. Second, this gives you the flexibility to filter and transform the data being read from Kinesis Data Streams before generating analytics or writing to Amazon S3. Lastly, you can use this approach to render analytics onto other systems besides Amazon S3, such as Amazon Elasticsearch Service (Amazon ES) using the Elasticsearch sink for Apache Flink.

Keep in mind that we only use this pipeline to bootstrap our second, long-lived pipeline that does recommendations, but this is an important step and we need a way to do this efficiently. Although there are multiple options for consuming data from Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink provides an elegant way to attach multiple EFO consumers in the same consuming application.

You can find more information at the official Apache Flink website, and about Kinesis Data Analytics for Apache Flink in the Kinesis Data Analytics developer guide. Apache Flink has a number of connectors, like the recently released FlinkKinesisConsumer, which supports enhanced fan-out for consuming from Kinesis Data Streams, or the Streaming File Sink to write to Amazon S3 from your Apache Flink application.

Accelerating data consumption

For the sake of simplicity, let’s use just one shard in our data stream, ingest data at the maximum rate of 1 MB/s, and specify a retention period of 30 days. To bootstrap our new analytics, reading the full amount of data over 30 days with one EFO consumer at 2 MB/s could potentially take up to 15 days to load this data into Amazon S3. However, you can accelerate this to 20 times faster using 20 EFO consumers at the same time, each reading from different points in the stream at 2 MB/s. The following diagram illustrates the architecture of multiple EFO consumers reading from multiple time slices.


This gives us a total of 40 MB/s in consumption capacity as opposed to 2 MB/s per shard with just one EFO consumer, reducing the overall time by 95%. In most use cases, this combination of Kinesis Data Analytics and EFO allows you to process 30 days of data in hours, instead of days.
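As a quick back-of-the-envelope check of those numbers (assuming a constant 1 MB/s ingest rate and fully parallel, non-overlapping time slices):

# 30 days of data ingested at 1 MB/s
total_mb = 30 * 24 * 60 * 60 * 1                       # ~2,592,000 MB (~2.6 TB)

# One EFO consumer reads at 2 MB/s
one_consumer_days = total_mb / 2 / 60 / 60 / 24         # ~15 days

# 20 EFO consumers, each reading a different time slice at 2 MB/s
twenty_consumer_hours = total_mb / (20 * 2) / 60 / 60   # ~18 hours (about a 95% reduction)

print(one_consumer_days, twenty_consumer_hours)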

A point of clarification regarding our approach: When all 20 consumers are finished reading past their respective endpoints in the stream, we stop the Apache Flink application. You can do this by raising an exception when all 20 consumers finish reading their respective time slices—effectively stopping the application. The following diagram illustrates the time savings we get from using 20 EFO consumers.


For more information about implementing this approach, see the GitHub repo.

Pricing

An additional cost is associated with long-term retention (from 7–365 days) and EFO consumers. For more information, see Amazon Kinesis Data Streams pricing. Because you can register EFO consumers on demand, you pay only for the limited time you used all 20 consumers to load data, resulting in faster loads. It’s important to point out that you pay roughly the same amount to consume a fixed volume of data from the stream with 20 EFO consumers as you do with 1 EFO consumer because of the shorter duration required when using 20 consumers. 

Summary

In this post, we discussed long-term retention use cases of Kinesis Data Streams, how to increase the retention of a data stream, and related feature enhancements to the Kinesis Data Streams APIs and the Kinesis Client Library (KCL). We took a deep dive into the Apache Flink-based enhanced fan-out consumer approach to replay long-term data quickly. We shared open-source code based on this approach so you can easily implement your use cases using Kinesis Data Streams long-term retention.

You should use long-term retention if you’re planning to develop ML systems, generate customer behavior insights, or have compliance requirements for retaining raw data for more than 7 days. We would love to hear about your use cases with the long-term retention feature. Please submit your feedback to [email protected].


About the Authors

Nihar Sheth is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals. Outside of work, he is focusing on hiking 200 miles of beautiful PNW trails with his son in 2021.

 

 

Karthi Thyagarajan is a Solutions Architect on the Amazon Kinesis Team focusing on all things streaming and he enjoys helping customers tackle distributed systems challenges.

 

 

 

 

Sai Maddali is a Sr. Product Manager – Tech at Amazon Web Services, where he works on Amazon Kinesis Data Streams. He is passionate about understanding customer needs and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog, using Amazon AppFlow and storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.


The Datadog Agent is lightweight software that can be installed on many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Log in to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

On Windows, this file is in C:\ProgramData\Datadog.

  2. Create custom log collection by customizing the conf.yaml file.

For example, on Windows this file is in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

The application keys in conjunction with your organization’s API key give you full access to Datadog’s programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Log in to your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.


Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).

Configuring the flow source

After you create the Datadog agent and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.


Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.


This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger, and transfer mode. Leave all settings at their defaults:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criteria. For the Datadog data source, it’s mandatory to specify filters for Date_Range and Query. The formats for specifying the filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose Query.
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.


The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.
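Because we configured the flow to run on demand, each transfer can also be started programmatically rather than from the console. A minimal Boto3 sketch, assuming the flow name used earlier:

import boto3

appflow = boto3.client('appflow')

# Start an on-demand run of the flow created earlier
response = appflow.start_flow(flowName='mydatadogflow')
print(response['flowStatus'], response.get('executionId'))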

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We’re traversing a nested JSON object in the Athena query simply with dot notation.

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advanced analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minutes, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Building an administrative console in Amazon QuickSight to analyze usage metrics

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/building-an-administrative-console-in-amazon-quicksight-to-analyze-usage-metrics/

Given the scalability of Amazon QuickSight to hundreds and thousands of users, a common use case is to monitor QuickSight group and user activities, analyze the utilization of dashboards, and identify usage patterns of an individual user and dashboard. With timely access to interactive usage metrics, business intelligence (BI) administrators and data team leads can efficiently plan for stakeholder engagement and dashboard improvements. For example, you can remove inactive authors to reduce license cost, as well as analyze dashboard popularity to understand user acceptance and stickiness.

This post demonstrates how to build an administrative console dashboard and serverless data pipeline. We combine QuickSight APIs with AWS CloudTrail logs to create the datasets to collect comprehensive information of user behavior and QuickSight asset usage patterns.

This post provides a detailed workflow that covers the data pipeline, sample Python code, and a sample dashboard of this administrative console. With the guidance of this post, you can configure this administrative console in your own environment.

Let’s look at Forwood Safety, an innovative, values-driven company with a laser focus on fatality prevention. An early adopter of QuickSight, they have collaborated with AWS to deploy this solution to collect BI application usage insights.

“Our engineers love this admin console solution,” says Faye Crompton, Leader of Analytics and Benchmarking at Forwood. “It helps us to understand how users analyze critical control learnings by helping us to quickly identify the most frequently visited dashboards in Forwood’s self-service analytics and reporting tool, FAST.”

Solution overview

The following diagram illustrates the workflow of the solution.


The workflow involves the following steps:

  1. The AWS Lambda function Data_Prepare is scheduled to run hourly. This function calls QuickSight APIs to get QuickSight namespace, group, user, and assets access permissions information and saves the results to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. CloudTrail logs are stored in S3 bucket.
  3. Based on the files in Amazon S3 that contain user-group information and QuickSight asset access permissions, as well as the dashboard view and user login events in the CloudTrail logs, three Amazon Athena tables and several views are created. Optionally, the BI engineer can combine these tables with employee information tables to display human resources information about the users.
  4. Two QuickSight datasets fetch the data in the Athena tables created in Step 3 through SPICE mode. Then, based on these datasets, a QuickSight dashboard is created.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Access to the following AWS services:
    • Amazon QuickSight
    • Amazon Athena
    • AWS Lambda
    • Amazon S3
  • Basic knowledge of Python
  • Optionally, Security Assertion Markup Language 2.0 (SAML 2.0) or OpenID Connect (OIDC) single sign-on (SSO) configured for QuickSight access

Creating resources

Create your resources by launching the following AWS CloudFormation stack:

After the stack creation is successful, you have one Amazon CloudWatch Events rule, one Lambda function, one S3 bucket, and the corresponding AWS Identity and Access Management (IAM) policies.

To create the resources in a Region other than us-east-1, download the Lambda function.

Creating Athena tables

The Data_Prepare Lambda function is scheduled to run hourly with the CloudWatch Events rule admin-console-every-hour. This function calls the QuickSight APIs list_namespaces, list_users, list_user_groups, list_dashboards, list_datasets, list_datasources, list_analyses, list_themes, describe_data_set_permissions, describe_dashboard_permissions, describe_data_source_permissions, describe_analysis_permissions, and describe_theme_permissions to get QuickSight users and assets access permissions information. Finally, this function creates two files, group_membership.csv and object_access.csv, and saves these files to an S3 bucket.
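The complete function is deployed by the CloudFormation stack; purely as an illustration, a simplified Boto3 sketch of the part that writes group_membership.csv might look like the following (the account ID and bucket name are placeholders, and pagination and error handling are omitted):

import csv
import io
import boto3

ACCOUNT_ID = '123456789012'             # placeholder account ID
BUCKET = 'admin-console123456789012'    # placeholder bucket name

quicksight = boto3.client('quicksight')
s3 = boto3.resource('s3')

def build_group_membership():
    rows = []
    namespaces = quicksight.list_namespaces(AwsAccountId=ACCOUNT_ID)['Namespaces']
    for ns in namespaces:
        groups = quicksight.list_groups(
            AwsAccountId=ACCOUNT_ID, Namespace=ns['Name'])['GroupList']
        for group in groups:
            members = quicksight.list_group_memberships(
                GroupName=group['GroupName'],
                AwsAccountId=ACCOUNT_ID,
                Namespace=ns['Name'])['GroupMemberList']
            for member in members:
                rows.append([ns['Name'], group['GroupName'], member['MemberName']])

    # Write one CSV row per namespace/group/user combination
    buffer = io.StringIO()
    csv.writer(buffer).writerows(rows)
    s3.Object(BUCKET,
              'monitoring/quicksight/group_membership/group_membership.csv') \
      .put(Body=buffer.getvalue())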

Run the following SQL query to create two Athena tables (group_membership and object_access):

CREATE EXTERNAL TABLE `group_membership`(
`namespace` string,   
`group` string, 
`user` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/group_membership/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file')

CREATE EXTERNAL TABLE `object_access`(
`aws_region` string,   
`object_type` string, 
`object_name` string,
`object_id` string,
`principal_type` string,
`principal_name` string,
`namespace` string,
`permissions` string
)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT   'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/object_access/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file')

The following screenshot is sample data of the group_membership table.


The following screenshot is sample data of the object_access table.


For instructions on building an Athena table with CloudTrail events, see Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail. For this post, we create the table cloudtrail_logs in the default database.

Creating views in Athena

Now we have the tables ready in Athena and can run SQL queries against them to generate some views to analyze the usage metrics of dashboards and users.

Create a view of a user’s role status with the following code:

CREATE OR REPLACE VIEW users AS
(select Namespace,
 Group,
 User,
(case 
when Group in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin') 
then 'Author' 
else 'Reader' 
end) 
as author_status
from "group_membership" );

Create a view of GetDashboard events that happened in the last 3 months with the following code:

CREATE OR REPLACE VIEW getdashboard AS 
(SELECT 
"useridentity"."type",
"split_part"("useridentity"."sessioncontext"."sessionissuer"."arn", '/', 2) AS "assumed_role",
COALESCE("useridentity"."username", "concat"("split_part"("useridentity"."arn", '/', 2), '/', "split_part"("useridentity"."arn", '/', 3))) AS "user_name",
awsregion,
"split_part"("split_part"("serviceeventdetails", 'dashboardName":', 2), ',', 1) AS dashboard_name,
"split_part"("split_part"("split_part"("split_part"("serviceeventdetails", 'dashboardId":', 2), ',', 1), 'dashboard/', 2), '"}', 1) AS dashboardId,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time,
max(date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) AS latest_event_time
FROM cloudtrail_logs
WHERE 
eventsource = 'quicksight.amazonaws.com' 
AND
eventname = 'GetDashboard' 
AND
DATE_TRUNC('day', date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) > cast(current_date - interval '3' month AS date)
GROUP BY  1,2,3,4,5,6,7)

In the preceding query, the conditions defined in the where clause only fetch the records of GetDashboard events of QuickSight.

How can we design queries to fetch records of other events? We can review the CloudTrail logs to look for the information. For example, let’s look at the sample GetDashboard CloudTrail event:

{
    "userIdentity": {
        "type": "AssumedRole",
        "principalId": "<principal_id>: <user_name>",
        "arn": "arn:aws:sts:: <aws_account_id>:assumed-role/<IAM_role_ name>/<user_name>",
        "accountId": "<aws_account_id>",
        "sessionContext": {
            "sessionIssuer": {
                "type": "Role",
                "principalId": "<principal_id>",
                …
            }
        }
    },
    "eventTime": "2021-01-13T16:55:36Z",
    "eventSource": "quicksight.amazonaws.com",
    "eventName": "GetDashboard",
    "awsRegion": "us-east-1",
    "eventID": "a599c8be-003f-46b7-a40f-2319efb6b87a",
    "readOnly": true,
    "eventType": "AwsServiceEvent",
    "serviceEventDetails": {
        "eventRequestDetails": {
            "dashboardId": "arn:aws:quicksight:us-east-1: <aws_account_id>:dashboard/<dashboard_id>"
        },
        "eventResponseDetails": {
            "dashboardDetails": {
                "dashboardName": "Admin Console",
                "dashboardId": "arn:aws:quicksight:us-east-1: <aws_account_id>:dashboard/<dashboard_id>",
                "analysisIdList": [
                    "arn:aws:quicksight:us-east-1: <aws_account_id>:analysis/<analysis_id>"
            }
        }
    }
}

With eventSource = "quicksight.amazonaws.com" and eventName = "GetDashboard", we can get all the QuickSight dashboard view events.

Similarly, we can define the condition as eventname = 'AssumeRoleWithSAML' to fetch the user login events. (This solution assumes that the users log in to their QuickSight account with identity federation through SAML.) For more information about querying CloudTrail logs to monitor other interesting user behaviors, see Using administrative dashboards for a centralized view of Amazon QuickSight objects.

Furthermore, we can join with employee information tables to get a QuickSight user’s human resources information.

Finally, we can generate a view called admin_console with QuickSight group and user information, assets information, CloudTrail logs, and, optionally, employee information. The following screenshot shows an example preview.


Creating datasets

With the Athena views ready, we can build some QuickSight datasets. We can load the view called admin_console to build a SPICE dataset called admin_console and schedule this dataset to be refreshed hourly. Optionally, you can create a similar dataset called admin_console_login_events with the Athena table based on eventname = 'AssumeRoleWithSAML' to analyze QuickSight user login events. According to the usage metrics requirements in your organization, you can create other datasets to serve different requests.

Building dashboards

Now we can build a QuickSight dashboard as the administrative console to analyze usage metrics. The following steps are based on the dataset admin_console. The schema of the optional dataset admin_console_login_events is the same as admin_console. You can apply the same logic to create the calculated fields to analyze user login activities.

  1. Create parameters.

For example, we can create a parameter called InActivityMonths, as in the following screenshot. Similarly, we can create other parameters such as InActivityDays, Start Date, and End Date.

  2. Create controls based on the parameters.


  3. Create calculated fields.

For instance, we can create a calculated field to detect the active or inactive status of QuickSight authors. If the time span between the latest dashboard view activity and now is greater than or equal to the number defined in the Inactivity Months control, the author status is Inactive. The following screenshot shows the relevant code.


According to the end user’s requirements, we can define several calculated fields to perform the analysis.

  4. Create visuals.

For example, we create an insight to display the top three dashboards viewed by readers and a visual to display the authors of these dashboards.


  5. We can add a URL action to define extra features, such as emailing inactive authors or checking the details of users.


The following sample code defines the action to email inactive authors:

mailto:<<email>>?subject=Alert to inactive author! &body=Hi, <<username>>, any author without activity for more than a month will be deleted. Please log in to your QuickSight account to continue accessing and building analyses and dashboards!

The following screenshots show an example dashboard that you can make using our data.

The following is the administrative console landing page. We provide the overview, terminology explanations, and thumbnails of the other two tabs on this page.


The following screenshots show the User Analysis tab.


The following screenshots show the Dashboards Analysis tab.


You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

You can reference the public template of the preceding dashboard in create-template, create-analysis, and create-dashboard API calls to create this dashboard and analysis in your account. The public template of this dashboard is identified by the template ARN 'arn:aws:quicksight:us-east-1:889399602426:template/admin-console'.
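As a hedged sketch of that last call with Boto3 (the account ID, dashboard ID, dataset ARN, and dataset placeholder name are assumptions that must match your environment and the template definition):

import boto3

quicksight = boto3.client('quicksight', region_name='us-east-1')

ACCOUNT_ID = '123456789012'   # placeholder account ID

response = quicksight.create_dashboard(
    AwsAccountId=ACCOUNT_ID,
    DashboardId='admin-console',
    Name='Admin Console',
    SourceEntity={
        'SourceTemplate': {
            'Arn': 'arn:aws:quicksight:us-east-1:889399602426:template/admin-console',
            'DataSetReferences': [
                {
                    # The placeholder name must match the one defined in the template
                    'DataSetPlaceholder': 'admin_console',
                    'DataSetArn': 'arn:aws:quicksight:us-east-1:123456789012:dataset/<dataset_id>'
                }
            ]
        }
    })
print(response['Status'], response['DashboardId'])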

Additional usage metrics

Additionally, we can perform some complicated analysis to collect advanced usage metrics. For example, Forwood Safety raised a unique request to analyze the readers who log in but don’t view any dashboards (see the following code). This helps their clients identify and prevent any waste of reader session fees. Leadership teams value the ability to minimize uneconomical user activity.

CREATE OR REPLACE VIEW "loginwithoutviewdashboard" AS
with login as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventname = 'AssumeRoleWithSAML'
GROUP BY  1,2,3),
dashboard as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventsource = 'quicksight.amazonaws.com'
AND
eventname = 'GetDashboard'
GROUP BY  1,2,3),
users as 
(select Namespace,
Group,
User,
(case
when Group in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin')
then 'Author'
else 'Reader'
end)
as author_status
from "group_membership" )
select l.* 
from login as l 
join dashboard as d 
join users as u 
on l.user_name=d.user_name 
and 
l.awsregion=d.awsregion 
and 
l.user_name=u.user_name
where d.event_time>(l.event_time + interval '30' minute ) 
and 
d.event_time<l.event_time 
and 
u.author_status='Reader'

Cleaning up

To avoid incurring future charges, delete the resources you created with the CloudFormation template.

Conclusion

This post discussed how BI administrators can use QuickSight, CloudTrail, and other AWS services to create a centralized view to analyze QuickSight usage metrics. We also presented a serverless data pipeline to support the administrative console dashboard.

You can request a demo of this administrative console to try for yourself.


About the Authors

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.

 

 

 

Jill Florant manages Customer Success for the Amazon QuickSight Service team.

Create a custom data connector to Slack’s Member Analytics API in Amazon QuickSight with Amazon Athena Federated Query

Post Syndicated from Pablo Redondo Sanchez original https://aws.amazon.com/blogs/big-data/create-a-custom-data-connector-to-slacks-member-analytics-api-in-amazon-quicksight-with-amazon-athena-federated-query/

Amazon QuickSight recently added support for Amazon Athena Federated Query, which allows you to query data in place from various data sources. With this capability, QuickSight can extend support to query additional data sources like Amazon CloudWatch Logs, Amazon DynamoDB, and Amazon DocumentDB (with MongoDB compatibility) via their existing Amazon Athena data source. You can also use the Athena Query Federation SDK to write custom connectors and query any source accessible with a Java API, whether it is relational, non-relational, object, or a custom data endpoint.

A common analytics use case is to access data from a REST API endpoint and blend it with information from other sources. In this post, I walk you through the process of setting up a custom federated query connector in Athena to query data from a REST API endpoint and build a QuickSight dashboard that blends data from the REST API endpoint with other data sources.

To illustrate this use case, we work with Slack, the makers of a leading channel-based messaging platform, to test their Member Analytics API, which can help our mock company, Example Corp, understand Slack adoption and member engagement across different teams.

How the Slack Member Analytics API works

The following diagram illustrates the Slack Member Analytics API.


The Slack Member Analytics API is a REST API endpoint available for Slack Enterprise Grid customers. Authorized users and services can access the member usage stats dataset via the admin.analytics.getFile endpoint of the Slack Web API. The data consists of a newline-delimited JSON file with daily Slack activity stats at the member level. A record looks like the following code:

{ 
    "enterprise_id":"AAAAAAA",
    "date":"2020-11-10",
    "user_id":"U01ERHY4589",
    "email_address":"[email protected]",
    "is_guest":false,
    "is_billable_seat":false,
    "is_active":true,
    "is_active_ios":false,
    "is_active_android":false,
    "is_active_desktop":true,
    "reactions_added_count":3,
    "messages_posted_count":10, 
    "channel_messages_posted_count":0,
    "files_added_count":0
}

To request data, you must provide a date argument in the format of YYYY-MM-DD, a type argument with the value member, and an OAuth bearer token as the header. The response is a compressed (.gzip) JSON file with data for the requested date. See the following sample request:

curl -X GET -H "Authorization: Bearer xoxp-..." "https://slack.com/api/admin.analytics.getFile?date=2020-09-01&type=member" > data.gzip
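The same request can be made from Python. The following sketch (the token value is a placeholder) decompresses the response and parses the newline-delimited JSON records:

import gzip
import json
import requests

TOKEN = 'xoxp-...'   # bearer token obtained through the OAuth flow (placeholder)

response = requests.get(
    'https://slack.com/api/admin.analytics.getFile',
    headers={'Authorization': 'Bearer ' + TOKEN},
    params={'date': '2020-09-01', 'type': 'member'})

# On success the endpoint returns a gzip-compressed, newline-delimited JSON file;
# on failure it returns a plain JSON error body instead, so gzip.decompress would raise.
records = [json.loads(line)
           for line in gzip.decompress(response.content).decode('utf-8').splitlines()
           if line]
print(len(records), 'member records for 2020-09-01')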

Building the solution for Example Corp

For our use case, Example Corp has recently purchased Slack for 1,000 users and as the Collaboration team onboards new teams to Slack, they want to measure Slack adoption and engagement within each new team. If they see low adoption or engagement within a group at the company, they can work with that group to understand why they aren’t using Slack and provide education and support, as needed.

Example Corp wants to provide analysts access to the Slack member usage stats to run ad hoc queries in place (directly from the source) without maintaining a new extract, transform, and load (ETL) pipeline. They use the QuickSight cross data source join feature to blend their Slack usage stats with their HR dataset.

To achieve this, Example Corp implements the following steps:

  1. Authorize the custom federated query connector with Slack to access the Member Analytics API.
  2. Develop and deploy a custom federated query connector in the Example Corp AWS account.
  3. Create a dataset in the Example Corp QuickSight environment that reads Slack member usage data for the last 30 days and blends it with an HR dataset.
  4. Create a QuickSight dashboard that shows usage trends of provisioned vs. active users.

Example Corp program managers can now monitor Slack engagement using their QuickSight dashboard (see the following screenshot).


The following diagram illustrates the overall architecture of the solution.


The following sections describe the components in detail and provide sample code to implement the solution in your environment.

Authorizing the custom federated query connector to access the Slack Analytics API

Data REST API endpoints typically have an authentication mechanism such as standard HTTP authentication or a bearer token. In the case of the Slack Web API, a bearer token is required on every request. The Slack Member Analytics API uses an OAuth protocol to authorize applications’ read access to data from an organization’s Slack environment.

To perform the OAuth handshake, Example Corp deploys a custom web application on Amazon Elastic Compute Cloud (Amazon EC2) and registers it as a new Slack application. When it’s deployed, Example Corp Slack admins can access the web application UI to authenticate with Slack and authorize read access to the custom federated query connector. After successful authentication, the custom web application stores the bearer token as a secret in AWS Secrets Manager. Only the custom application server and the federated query connector have access to this secret.

The following is an architecture diagram and brief description of the OAuth authorization workflow between Slack.com and the custom web application. As a prerequisite, you need to register your custom application with Slack.

  1. The Slack admin accesses the custom application UI from their browser and chooses Add to Slack to begin the authorization process.
  2. The custom application redirects the admin to Slack.com to authenticate and authorize the client with admin.analytics:read access for the Example Corp Slack Enterprise Grid.
  3. Slack.com redirects the admin back to the custom application UI, passing a temporary authorization code in the request.
  4. On the backend, the custom application retrieves Slack client secrets from a Secrets Manager secret. The Slack client secrets are obtained during the Slack application registration.
  5. The custom application server makes a request for a bearer token to the Slack API, passing both the temporary authorization code and the Slack client secrets.
  6. If both the temporary authorization code and the client secrets are valid, then the Slack API returns a bearer token to the custom application server.
  7. The custom application saves the bearer token in the Secrets Manager secret.
  8. Finally, the application sends a confirmation of successful authorization to the admin.

Slack admins can revoke access to the application from the organization’s console at any time.

You can find the source code and detailed instructions to deploy this sample OAuth web application in the GitHub repo. When the authorization workflow is complete, you can pause or stop the resources running the web application. Going forward, the federated query connector accesses the token from Secrets Manager.
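
To make the token exchange more concrete, here is a minimal Python sketch of steps 5 through 7 of the workflow above. It is a simplified stand-in for the sample web application, not a replacement for it; the secret names are assumptions, and the field that carries the user token in the oauth.v2.access response depends on the scopes you request.

# Simplified sketch of the token exchange (steps 5 through 7 above). Secret
# names and response handling are assumptions to adapt to your registration.
import json

import boto3
import requests

secrets = boto3.client("secretsmanager")

def exchange_code_for_token(temp_code):
    # Read the Slack client credentials saved during application registration.
    creds = json.loads(
        secrets.get_secret_value(SecretId="slack/client-credentials")["SecretString"]
    )
    # Trade the temporary authorization code for a bearer token.
    resp = requests.post(
        "https://slack.com/api/oauth.v2.access",
        data={
            "client_id": creds["client_id"],
            "client_secret": creds["client_secret"],
            "code": temp_code,
        },
        timeout=30,
    ).json()
    if not resp.get("ok"):
        raise RuntimeError(f"Slack OAuth error: {resp.get('error')}")
    # For a user token with admin.analytics:read, the token is typically
    # returned under authed_user; adjust to the shape of your response.
    token = resp.get("authed_user", {}).get("access_token") or resp.get("access_token")
    # Store the bearer token so only the connector (and this app) can read it.
    secrets.put_secret_value(SecretId="slack/analytics-bearer-token", SecretString=token)
    return token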

Deploying the custom federated query connector

When the OAuth workflow is complete, we can deploy the custom federated query connector in the Example Corp AWS environment. For Example Corp, we develop a custom AWS Lambda function using the Athena Query Federation Java SDK and a Java HTTP client to connect with the Slack Member Analytics REST API. Finally, we register it as a new data source within Athena.

The following is a diagram of how the custom connector workflow operates.

The workflow includes the following steps:

  1. Users submit a query to Athena, for example: select * from <catalog_name>.slackanalytics.member_analytics where date='2020-11-10', where <catalog_name> is the name specified when creating the Athena data source.
  2. Athena compiles the query and runs the Lambda function to retrieve the Slack authorization token from Secrets Manager and determine the number of partitions based on the query predicates (where clause).
  3. The Slack Member Analytics Connector partitions the data by date and runs a Lambda function for each partition (date) specified in the query. For example, if the predicate is WHERE date IN ('2020-11-10', '2020-11-12'), Athena runs two instances of the Lambda function. When no dates are specified in the where clause, the connector gets data for the last 30 days.
  4. Each instance of the Lambda function makes a request to the Slack Member API to retrieve data for each day.
  5. Finally, Athena performs any aggregation and computation specified in the query and returns the results to the client.

You can deploy this sample Slack Member Analytics Lambda function in your AWS environment via AWS CloudFormation with the following template. If you want to modify and build the connector from scratch, you can find the source code and instructions in the GitHub repo.

After the Lambda function has been deployed, create a new data source in Athena. For step-by-step instructions, see Deploying a Connector and Connecting to a Data Source.

  1. On the Athena console, in the query editor, choose Connect data source.
  2. Select All other data sources.
  3. Point your catalog to your new Lambda function.

You should be able to browse your new catalog from the Athena console and query the Slack Member Analytics API using SQL.
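
You can also submit these queries programmatically. The following Boto3 sketch runs an aggregation against the connector-backed catalog and prints the result rows; the catalog name (the data source name you chose, slackanalytics_catalog in the later example), database name, S3 output location, and column types are assumptions to adjust for your deployment.

# Hedged sketch of querying the connector-backed catalog via the Athena API.
import time

import boto3

athena = boto3.client("athena")

query = """
SELECT date, count(DISTINCT user_id) AS active_users
FROM slackanalytics.member_analytics
WHERE date = '2020-11-10' AND is_active = true
GROUP BY date
"""

execution = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={
        "Catalog": "slackanalytics_catalog",  # data source name chosen in Athena
        "Database": "slackanalytics",
    },
    ResultConfiguration={"OutputLocation": "s3://my-athena-results/"},  # placeholder bucket
)
query_id = execution["QueryExecutionId"]

# Poll until the query finishes, then print the result rows (first row is the header).
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])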

Creating a dataset that reads Slack member usage data and blends it with an HR dataset

As a prerequisite to query the Slack Member Analytics API from QuickSight, we must provide the proper permission for QuickSight to access the federated query data source in Athena. We do this directly from the QuickSight admin UI following these steps:

  1. As an admin, on the Admin menu, choose Manage QuickSight.
  2. Under Security & Permissions, choose QuickSight access to AWS services.
  3. Choose Add or Remove services.
  4. Select Athena.
  5. Choose Next when prompted to set the Amazon Simple Storage Service (Amazon S3) bucket and Lambda function permissions.

QuickSight browses the Athena catalogs and displays any Lambda functions associated with your account. If you don’t see a Lambda function, it means you haven’t mapped a data source within Athena.

  6. Select the function.
  7. Choose Finish.

When the Example Corp QuickSight environment has the proper permissions, analysts can query the Slack Analytics Member API using their existing Athena data source. For instructions on creating your own dataset, see Creating a Dataset Using Amazon Athena Data.

The custom connector appears as a new Catalog, Database, and Tables option.

  1. In QuickSight, on the Datasets page, choose New dataset.
  2. Choose Athena as your data source.
  3. Choose Create dataset.
  4. Choose your table or, for this use case, choose Use custom SQL.

For this analysis, we write a custom SQL query that gets member activity since the start of the current month:

SELECT date,
       is_active,
       email_address,
       messages_posted_count
FROM   slackanalytics_catalog.slackanalytics.member_analytics
WHERE  date >= date_format(date_trunc('month',current_date),'%Y-%m-%d')

With the QuickSight cross data source join feature, analysts can enrich the Slack member stats with their HR info. For this use case, we imported a local HR_dataset.csv file containing the list of subscribed users with their respective Example Corp department, and joined them via the employee_email field.

The result is a dataset with Slack activity by employee and department. We’ve also updated the date field from a String type to a Date type using the QuickSight Data Prep page to take advantage of additional visualization features with Date type fields.

Creating a QuickSight dashboard that shows usage trends of provisioned vs. active users

Example Corp Analysts want to visualize the trend of provisioned users vs. active users and understand Slack adoption by department. To support these visualizations, we created the following calculated fields within our QuickSight analysis:

  • active – distinct_countIf(employee, {is_active}='true')
  • provisioned – distinct_count(employee)

You can also create these calculated fields when you create your dataset. This way, you can reuse them in other QuickSight analyses. 

We use QuickSight narrative insights, a line chart, a bar chart, and a pivot table with conditional formatting to create the following analysis.

From this analysis, Example Corp can see that the adoption trend is positive; however, there is an adoption gap within the Marketing team. The program managers can engage the Marketing department leads and focus their training resources to improve their adoption.

This dashboard can now be published to stakeholders within the organization as needed—either within the QuickSight app or embedded within existing enterprise applications. 

Conclusion

With the recent integration of QuickSight and Athena Federated Query, organizations can access additional data sources beyond those already supported by QuickSight. Analysts can leverage QuickSight capabilities to analyze and build dashboards that blend data from a variety of data sources, and with the Athena Query Federation SDK, you can build custom connectors to access relational, non-relational, object, and custom data endpoints using standard SQL.

To get started, try the lab Athena Deploying Custom Connector.


About the Author

Pablo Redondo Sanchez is a Senior Solutions Architect at Amazon Web Services. He is a data enthusiast and works with customers to help them achieve better insights and faster outcomes from their data analytics workflows. In his spare time, Pablo enjoys woodworking and spending time outdoors with his family in Northern California.

 

 

 

Getting started with Trace Analytics in Amazon Elasticsearch Service

Post Syndicated from Jeff Wright original https://aws.amazon.com/blogs/big-data/getting-started-with-trace-analytics-in-amazon-elasticsearch-service/

Trace Analytics is now available for Amazon Elasticsearch Service (Amazon ES) domains running version 7.9 or later. Developers and IT Ops teams can use this feature to troubleshoot performance and availability issues in their distributed applications. It provides end-to-end insights that are not possible with traditional methods of collecting logs and metrics from each component and service individually.

This feature provides a mechanism to ingest OpenTelemetry-standard trace data to be visualized and explored in Kibana. Trace Analytics introduces two new components that fit into the OpenTelemetry and Amazon ES ecosystems:

  • Data Prepper: A server-side application that collects telemetry data and transforms it for Amazon ES.
  • Trace Analytics Kibana plugin: A plugin that provides at-a-glance visibility into your application performance and the ability to drill down on individual traces. The plugin relies on trace data collected and transformed by Data Prepper.

Here is a component overview:

Applications are instrumented with OpenTelemetry, which emits trace data to OpenTelemetry Collectors. Collectors can be run as agents on Amazon EC2, as sidecars for Amazon ECS, or as sidecars or DaemonSets for Amazon EKS. They are configured to export traces to Data Prepper, which transforms the data and writes it to Amazon ES. The Trace Analytics Kibana plugin can then be used to visualize and detect problems in your distributed applications.
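
As an illustration of the application side, the following Python sketch (assuming recent opentelemetry-sdk and opentelemetry-exporter-otlp packages) creates a span and exports it over OTLP/gRPC to a Collector listening on its default port; the service name and Collector endpoint are placeholders for your environment.

# Minimal tracing sketch: create one span and export it to a local Collector.
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

provider = TracerProvider(resource=Resource.create({"service.name": "checkout-service"}))
provider.add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="localhost:4317", insecure=True))
)
trace.set_tracer_provider(provider)

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("process-order"):
    # Application work happens here; the span is batched and sent to the
    # Collector, which forwards it to Data Prepper and on to Amazon ES.
    pass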

OpenTelemetry is a Cloud Native Computing Foundation (CNCF) project that aims to define an open standard for the collection of telemetry data. Using an OpenTelemetry Collector in your service environment allows you to ingest trace data from other projects such as Jaeger and Zipkin. As of version 0.7.1, Data Prepper is an alpha release. It is a monolithic, vertically scaling component. Work on the next version, which will support more features including horizontal scaling, is underway.

In this blog post, we cover:

  • Launching Data Prepper to send trace data to your Amazon ES domain.
  • Configuring an OpenTelemetry Collector to send trace data to Data Prepper.
  • Exploring the Kibana Trace Analytics plugin using a sample application.

Prerequisites

To get started, you need:

  • An Amazon ES domain running version 7.9 or later.
    • An IAM role for EC2 that has been added to the domain’s access policy. For information, see Create an IAM role in the Amazon EC2 User Guide for Linux Instances.
  • This CloudFormation template, which you use in the walkthrough. Be sure to download it now.
  • An SSH key pair to be deployed to a new EC2 instance.

Deploy to EC2 with CloudFormation

Use the CloudFormation template to deploy Data Prepper to EC2.

  1. Open the AWS CloudFormation console, and choose Create stack.
  2. In Specify template, choose Upload a template file, and then upload the CloudFormation template.
  3. All fields on the Specify stack details page are required. Although you can use the defaults for most fields, enter your values for the following:
    • AmazonEsEndpoint
    • AmazonEsRegion
    • AmazonEsSubnetId (if your Amazon ES domain is in a VPC)
    • IAMRole
    • KeyName

The InstanceType parameter allows you to specify the size of the EC2 instance that will be created. For recommendations on instance sizing by workload, see Right Sizing: Provisioning Instances to Match Workloads, and the Scaling and Tuning guide of the Data Prepper repository.

It should take about three minutes to provision the stack. Data Prepper starts during the CloudFormation deployment. To view output logs, use SSH to connect to the EC2 host and then inspect the /var/log/data-prepper.out file.

Configure OpenTelemetry Collector

Now that Data Prepper is running on an EC2 instance, you can send trace data to it by running an OpenTelemetry Collector in your service environment. For information about installation, see Getting Started in the OpenTelemetry documentation. Make sure that the Collector is configured with an exporter that points to the address of the Data Prepper host. The following otel-collector-config.yaml example receives data from various sources and exports it to Data Prepper.

receivers:
  jaeger:
    protocols:
      grpc:
  otlp:
    protocols:
      grpc:
  zipkin:

exporters:
  otlp/data-prepper:
    endpoint: <data-prepper-address>:21890
    insecure: true

service:
  pipelines:
    traces:
      receivers: [jaeger, otlp, zipkin]
      exporters: [otlp/data-prepper]

Be sure to allow traffic to port 21890 on the EC2 instance. You can do this by adding an inbound rule to the instance’s security group.
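
If you manage that rule from code, a Boto3 sketch like the following opens the port; the security group ID and source CIDR are placeholders, and you should scope the source range to where your Collectors run.

# Hedged sketch: open TCP 21890 on the Data Prepper instance's security group.
import boto3

ec2 = boto3.client("ec2")

ec2.authorize_security_group_ingress(
    GroupId="sg-0123456789abcdef0",  # placeholder: security group of the EC2 instance
    IpPermissions=[
        {
            "IpProtocol": "tcp",
            "FromPort": 21890,
            "ToPort": 21890,
            "IpRanges": [{"CidrIp": "10.0.0.0/16", "Description": "OTel Collectors"}],
        }
    ],
)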

Explore the Trace Analytics Kibana plugin by using a sample application

If you don’t have an OpenTelemetry Collector running and would like to send sample data to your Data Prepper instance to try out the trace analytics dashboard, you can quickly set up an instance of the Jaeger Hot R.O.D. application on the EC2 instance with Docker Compose. Our setup script creates three containers on the EC2 instance:

  • Jaeger Hot R.O.D.: The example application to generate trace data.
  • Jaeger Agent: A network daemon that batches trace spans and sends them to the Collector.
  • OpenTelemetry Collector: A vendor-agnostic executable capable of receiving, processing, and exporting telemetry data.

Although your application, the OpenTelemetry Collectors, and Data Prepper instances typically wouldn’t reside on the same host in a real production environment, for simplicity and cost, we use one EC2 instance.

To start the sample application

  1. Use SSH to connect to the EC2 instance using the private key specified in the CloudFormation stack.
    1. When connecting, add a tunnel to port 8080 (the Hot R.O.D. container accepts connections from localhost only). You can do this by adding -L 8080:localhost:8080 to your SSH command.
  2. Download the setup script by running:
    wget https://raw.githubusercontent.com/opendistro-for-elasticsearch/data-prepper/master/examples/aws/jaeger-hotrod-on-ec2/setup-jaeger-hotrod.sh

  3. Run the script with sh setup-jaeger-hotrod.sh.
  4. Visit http://localhost:8080/ to access the Hot R.O.D. dashboard and start sending trace data!

Figure 2: Hot R.O.D. Rides on Demand

  5. After generating sample data with the Hot R.O.D. application, navigate to your Kibana dashboard and from the left navigation pane, choose Trace Analytics. The Dashboard view groups traces together by HTTP method and path so that you can see the average latency, error rate, and trends associated with an operation.

Figure 3: Dashboard page

  6. For a more focused view, choose Traces to drill down into a specific trace.

Figure 4: Traces page

  7. Choose Services to view all services in the application and an interactive map that shows how the various services connect to each other.

Figure 5: Services page

Conclusion

Trace Analytics adds to the existing log analytics capabilities of Amazon ES, enabling developers to isolate sources of performance problems and diagnose root causes in their distributed applications. We encourage you to start sending your trace data to Amazon ES so you can benefit from Trace Analytics today.


About the Authors

Jeff Wright is a Software Development Engineer at Amazon Web Services where he works on the Search Services team. His interests are designing and building robust, scalable distributed applications. Jeff is a contributor to Open Distro for Elasticsearch.

 

 

Kowshik Nagarajaan is a Software Development Engineer at Amazon Web Services where he works on the Search Services team. His interests are building and automating distributed analytics applications. Kowshik is a contributor to Open Distro for Elasticsearch.

 

 

Anush Krishnamurthy is an Engineering Manager working on the Search Services team at Amazon Web Services.

How the Yahoo! JAPAN Smart Devices Team is improving voice user interfaces with Amazon QuickSight business intelligence

Post Syndicated from Kazuhide Fujita original https://aws.amazon.com/blogs/big-data/how-the-yahoo-japan-smart-devices-team-is-improving-voice-user-interfaces-with-amazon-quicksight-business-intelligence/

This is a guest blog post by Kazuhide Fujita, Product Manager at Yahoo! JAPAN.

Yahoo! JAPAN is a large internet search and media company, with Yahoo! JAPAN’s web portal being one of the most commonly used websites in Japan. Our smart devices team is responsible for building and improving Yahoo! JAPAN apps for voice user interfaces (VUI) such as Amazon Alexa and Google Assistant. We see VUI as a market that will grow exponentially in the future, and we want to be ready to lead the consumer experience with such devices. In this post, I discuss how we’re using Amazon QuickSight business intelligence (BI) to help our product teams improve these services.

Enhanced access to insights at lower cost

To continuously improve our services, we use data to understand how consumers are interacting with the software and to identify growth trends. However, the data we get directly from smart device makers is limited. So, we built our own log system to capture more granular data, such as the types of commands customers are using, the time of day they use the application, and how frequently they use it.

Early on, we used Amazon Elasticsearch Service (Amazon ES) and Kibana to analyze data. Although this solution was very capable, it came at a higher price point than we were targeting. Another option was to export data directly to Microsoft Excel for ad hoc analysis. However, this was very time consuming and limited us to working with extracts of historical data rather than the latest information.

We decided to look for a solution that would suit the full spectrum of our needs while being cost-effective for our specific use case. While we were searching, our data team made the decision to standardize on a data lake architecture using Amazon Simple Storage Service (Amazon S3) and Amazon Athena. This approach provided a high level of flexibility and scalability. To visualize our data, it made sense to use QuickSight, the serverless BI solution with pay-per-session pricing on AWS.

Unifying data to understand customers better

This system has proven to be a good fit for our needs. The data lake allows us to accumulate different types of data from monitoring many KPIs and VUI products. For example, we might want to know the number of active users over a given period, and then drill down into how active those users were in the 2 weeks from when they registered. The data lake makes this possible. It’s easy to maintain even though the data is very diverse. For aggregating and performing calculations on the data, we use Athena because it provides optimal performance for complex queries thanks to the distributed computing model.

For ad hoc analysis, dashboards, and reporting, QuickSight connects seamlessly to our data lake. QuickSight makes it easy to view trends in customer behavior such as the time of usage, method of interaction, typical settings, and so on. The following screenshot shows a sample dashboard in QuickSight.

For example, the default wake word for Alexa-powered devices is to say the name of the voice assistant: “Hey, Alexa.” However, Japanese customers may prefer to say “ohayō,” which means “good morning” in Japanese. Which setting customers prefer could be an important trend for us to know when we configure our offerings. With QuickSight, it’s easy to compare trends for this type of behavior across other user characteristics.

This is only one small example of the kinds of insights we glean by using QuickSight. Another use case is regarding initiatives to increase product usage through marketing or incentives. We can track the outcome of these programs using QuickSight by tracking whether they result in an uptick in usage relative to the communications we send out.

The freedom to focus on what matters to the business

One of the big advantages of using QuickSight and other AWS services is that we don’t have to worry about maintaining on-premises systems for our data lake and analytics. It’s easy to manage and we can focus on gaining insights and improving our products—not running data center infrastructure. Building our end-to-end data-to-insights pipeline on AWS ensures that we can easily apply security and governance policies to all our data.

Overall, QuickSight provides us with the flexibility to analyze all kinds of data quickly, so we can aim to be the market leader in the VUI marketplace. We’re excited to see what the future holds for this powerful tool—and to apply the knowledge we gain to improving our services.


About the Author

Kazuhide Fujita is the Skill Team Product Manager, Smart Device Division, at Yahoo Japan Corporation.

Implementing multi-tenant patterns in Amazon Redshift using data sharing

Post Syndicated from Rajesh Francis original https://aws.amazon.com/blogs/big-data/implementing-multi-tenant-patterns-in-amazon-redshift-using-data-sharing/

Software service providers offer subscription-based analytics capabilities in the cloud with Analytics as a Service (AaaS), and increasingly customers are turning to AaaS for business insights. A multi-tenant storage strategy allows the service providers to build a cost-effective architecture to meet increasing demand.

Multi-tenancy means a single instance of software and its supporting infrastructure is shared to serve multiple customers. For example, a software service provider could generate data that is housed in a single data warehouse cluster, but accessed securely by multiple customers. This storage strategy offers an opportunity to centralize management of data, simplify ETL processes, and optimize costs. However, service providers have to constantly balance between cost and providing a better user experience for their customers.

With the new data sharing feature, you can use Amazon Redshift to scale and meet both objectives of managing costs by simplifying storage and ETL pipelines while still providing consistent performance to customers.  You can ingest data into a cluster designated as a producer cluster, and share this live data with one or more consumer clusters. Clusters accessing this shared data are isolated from each other, therefore performance of a producer cluster isn’t impacted by workloads on consumer clusters. This enables consuming clusters to get consistent performance based on individual compute capacity.

In this post, we focus on various AaaS patterns, and discuss how you can use data sharing in a multi-tenant architecture to scale for virtually unlimited users. We discuss detailed steps to use data sharing with different storage strategies.

Multi-tenant storage patterns

Multi-tenant storage patterns help simplify the architecture and long-term maintenance of the analytics platform. In a multi-tenant strategy, data is stored centrally in a single cluster for all tenants, enabling simplification of the ETL ingestion pipeline and data management. In the previously published whitepaper SaaS Storage Strategies, various models of storage and benefits are covered for a single cluster scenario.

The three strategies you can choose from are:

  • Pool model – Data is stored in a single database schema for all tenants, and a new column (tenant_id) is used to scope and control access to individual tenant data. Access to the multi-tenant data is controlled using views built on the tables.
  • Bridge model – Storage and access to data for each tenant is controlled at individual schema level in the same database.
  • Silo model – Storage and access control to data for each tenant is maintained in separate databases.

The following diagram illustrates the architecture of these multi-tenant storage strategies.

In the following sections, we discuss how these multi-tenant strategies can be implemented using the Amazon Redshift data sharing feature with a multi-cluster architecture.

Scaling your multi-tenant architecture using data sharing

AaaS providers implementing multi-tenant architectures were previously limited to resources of a single cluster to meet the compute and concurrency requirements of users across all the tenants. As the number of tenants increased, you could either turn on concurrency scaling or create additional clusters. However, the addition of new clusters means additional ingestion pipelines and increased operational overhead.

With data sharing in Amazon Redshift, you can easily and securely share data across clusters. Data ingested into the producer cluster is shared with one or more consumer clusters, which allows total separation of ETL and BI workloads. Several consumer clusters can read data from the managed storage of a producer cluster. This enables instant, granular, and high-performance access without data copies and movement. Workloads accessing shared data are isolated from each other and the producer. You can distribute workloads across multiple clusters while simplifying and consolidating the ETL ingestion pipeline into one main producer cluster, providing optimal price for performance.

Consumer clusters can in turn be producers for the data sets they own. Customers can optimize costs even further by collocating multiple tenants on the same consumer cluster. For instance, you can group low-volume tier 3 tenants into a single consumer cluster to provide a lower-cost offering, while high-volume tier 1 tenants get their own isolated compute clusters. Consumer clusters can be created in the same account as the producer or in a different AWS account. With this, you can have separate billing for the consumer clusters, where you can charge back to the business group that uses the consumer cluster, or even allow your customers to use their own Redshift cluster in their account, so they pay for usage of the consumer cluster. The following diagram shows the difference in ETL and consumer access patterns in a multi-tenant architecture using data sharing versus a single cluster approach without data sharing.

Multi-tenant architecture with data sharing compared to single cluster approach

Creating a multi-tenant architecture for an AaaS solution

For this post, we use a simple data model with a fact and a dimension table to demonstrate how to leverage data sharing to design a scalable multi-tenant AaaS solution. We cover detailed steps involved for each storage strategy using this data model. The tables are as follows:

  • Customer – dimension table containing customer details
  • Sales – fact table containing sales transactions

We use two Amazon Redshift ra3.4xlarge clusters, with two nodes each, and designate one cluster as the producer and the other as the consumer.

The high-level steps involved in enabling data sharing across clusters are as follows:

  1. Create a data share in the producer cluster and assign database objects to the data share.
  2. From the producer cluster, grant usage on the data share to consumer clusters, identified by namespace or AWS account.
  3. From the consumer cluster, create a database from the data share of the producer cluster.
  4. Query the tables in the data share through the external shared database in the consumer cluster. Grant access to other users to access this shared database and objects.
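
These high-level steps can also be scripted instead of run in the query editor. The following Boto3 sketch issues the same SQL through the Amazon Redshift Data API; the cluster identifiers, database, user, and namespace GUIDs are placeholders, and the detailed SQL for each model follows in the next sections.

# Hedged sketch: drive the data sharing setup through the Redshift Data API.
import time

import boto3

rsd = boto3.client("redshift-data")

def run_sql(cluster_id, sql, database="dev", db_user="awsuser"):
    """Submit one statement and block until it finishes."""
    stmt_id = rsd.execute_statement(
        ClusterIdentifier=cluster_id, Database=database, DbUser=db_user, Sql=sql
    )["Id"]
    while True:
        status = rsd.describe_statement(Id=stmt_id)["Status"]
        if status in ("FINISHED", "FAILED", "ABORTED"):
            break
        time.sleep(1)
    if status != "FINISHED":
        raise RuntimeError(f"Statement did not finish: {sql}")

consumer_namespace = "00000000-0000-0000-0000-000000000000"  # placeholder GUID
producer_namespace = "11111111-1111-1111-1111-111111111111"  # placeholder GUID

# Steps 1 and 2: create the data share on the producer, add objects, grant usage.
run_sql("ds-producer", "CREATE DATASHARE salesshare;")
run_sql("ds-producer", "ALTER DATASHARE salesshare ADD SCHEMA sales;")
run_sql("ds-producer", f"GRANT USAGE ON DATASHARE salesshare TO NAMESPACE '{consumer_namespace}';")

# Step 3: on the consumer, create a local database from the shared objects.
run_sql(
    "ds-consumer-c1",
    f"CREATE DATABASE sales_db FROM DATASHARE salesshare OF NAMESPACE '{producer_namespace}';",
)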

Creating producer and consumer Amazon Redshift clusters

Let us start by creating two Amazon Redshift ra3.4xlarge clusters with two nodes each, one for the producer and the other for the consumer.

  1. On the Amazon Redshift console, create two clusters of the RA3 instance type, and name them ds-producer and ds-consumer-c1, respectively.
  2. Next, log in to Amazon Redshift using the query editor. You can also use a SQL client tool like DBeaver, SQL Workbench, or Aginity Workbench. For configuration information, see Connecting to an Amazon Redshift cluster using SQL client tools.

Get the cluster namespace of the producer and consumer clusters from the console. We will use the namespaces to create the tenant table and to create and access the data shares. You can also get the cluster namespaces by logging into each of the clusters and executing the SELECT CURRENT_NAMESPACE statement in the query editor.

Be sure to replace the corresponding namespaces in the code sections wherever producercluster_namespace, consumercluster1_namespace, and consumercluster2_namespace are referenced.

The following screenshot shows the namespace on the Amazon Redshift console.
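
If you prefer to retrieve the namespaces programmatically, the following Boto3 sketch reads them from the ClusterNamespaceArn field returned by describe_clusters; the cluster identifiers match the ones created earlier, and the ARN parsing is an assumption to verify against your output.

# Hedged sketch: look up the namespace GUID of each cluster instead of using the console.
import boto3

redshift = boto3.client("redshift")

def get_namespace(cluster_id):
    cluster = redshift.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
    # The namespace ARN ends with the namespace GUID, e.g.
    # arn:aws:redshift:us-east-1:123456789012:namespace:<guid>
    return cluster["ClusterNamespaceArn"].split(":")[-1]

for cid in ("ds-producer", "ds-consumer-c1"):
    print(cid, get_namespace(cid))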

Now that we have the clusters created, we will go through the detailed steps for the three models. First, we cover the Pool model, followed by the Bridge model, and finally the Silo model.

Pool model

The pool model represents an all-in, multi-tenant model where all tenants share the same storage constructs and provides the most benefit in simplifying the AaaS solution.

With this model, data storage is centralized in one cluster database, and data is stored for all tenants in the same set of data models. To scope and control access to tenant data, we introduce a column (tenant_id) that serves as a unique identifier for each tenant.

Security management to prevent cross-tenant access is one of the main aspects to address with the pool model. We can implement row-level security and provide secure access to the data by creating database views and setting application-level policies by creating groups with specific access and assigning users to the groups. The following diagram illustrates the pool model architecture.

To create a multi-tenant solution using the pool model, you create data shares for the pool model in the producer cluster, and share data with the consumer cluster. We provide more detail on these steps in the following sections.

Creating data shares for the pool model in the producer cluster

To create data shares for the pool model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and run the following script.

Note that we have a tenant table to store unique identifiers for each tenant or consumer (tenant).

We add a column (tenant_id) to the sales and customer tables to uniquely identify tenant data. This tenant_id references the tenant_id in the tenant table to uniquely identify the tenant and consumer records. See the following code:

/**********************************************/
/*       datasharing datasetup pool model     */
/**********************************************/

 -- Create Schema
create schema sales;

CREATE TABLE IF NOT EXISTS sales.tenant ( 
t_tenantid int8 not null,
t_name varchar(50) not null,
t_namespace varchar(50),
t_account varchar(16)
)
DISTSTYLE AUTO
SORTKEY AUTO;

 -- Create tables for multi-tenant sales schema
drop table if exists sales.customer;
CREATE TABLE IF NOT EXISTS sales.customer(
  c_tenantid int8 not null,
  c_custid int8 not null,
  c_name varchar(25) not null,
  c_region varchar(40) not null,
  Primary Key(c_tenantid, c_custid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;

CREATE TABLE IF NOT EXISTS sales.sales (
  s_tenantid int8 not null,
  s_orderid int8 not null,
  s_custid int8 not null,
  s_totalprice numeric(12,2) not null,
  s_orderdate date not null,
  Primary Key(s_tenantid, s_orderid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;
  2. Set up the tenant table with the details for each consumer cluster, and ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use INSERT statements:
     -- Ingest data 
    insert into sales.tenant values
    (0, 'primary', '<producercluster_namespace>',''),
    (1, 'tenant1', '<consumercluster1_namespace>',''),
    (2, 'tenant2', '<consumercluster2_namespace>','');
    
    insert into sales.customer values
    (1, 1, 'Customer 1', 'NorthEast'),
    (1, 2, 'Customer 2', 'SouthEast'),
    (2, 1, 'Customer 3', 'NorthWest'),
    (2, 2, 'Customer 4', 'SouthEast');
    
    truncate table sales.sales;
    insert into sales.sales values
    (1, 1, 1, 2434.33, '2020-11-21'),
    (1, 2, 2, 54.90, '2020-5-5'),
    (1, 3, 2, 9678.99, '2020-3-8'),
    (2, 1, 2, 452.24, '2020-1-23'),
    (2, 2, 1, 76523.10, '2020-11-3'),
    (2, 3, 1, 6745.20, '2020-10-01');
    
    select count(*) from sales.tenant;
    select count(*) from sales.customer;
    select count(*) from sales.sales;

Securing data on the producer cluster by restricting access

In the pool model, no external user has direct access to underlying tables. All access is restricted using views.

  1. Create a view for each of the fact and dimension tables to include a condition to filter records from the consumer tenant’s namespace. In our example, we create v_customersales to combine sales fact and customer dimension tables with a restrictive filter for tenant.namespace = current_namespace. See the following code:
    /**********************************************/
    /* We will create late binding views          */
    /* but materialized views could also be used  */
    /**********************************************/
    
    create or replace view sales.v_customer as
    select * 
    from sales.customer c, sales.tenant t
    where c.c_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_sales as
    select * 
    from sales.sales s, sales.tenant t
    where s.s_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_customersales as 
    select c_tenantid, c_name, c_region, 
    	date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week", 
    	date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month", 
    	date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
    	date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
    	date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom", 
    	t.t_namespace
    from sales.tenant t, sales.customer c, sales.sales s
    where t.t_tenantid = c.c_tenantid 
    and c.c_tenantid = s.s_tenantid 
    and c.c_custid = s.s_custid 
    and t.t_namespace = current_namespace 
    WITH NO SCHEMA BINDING;
    
    select * from sales.v_customersales;
          
    

Now that we have database objects created in the producer cluster, we can share the data with the consumer clusters.

Sharing data with the consumer cluster

To share data with the consumer cluster, complete the following steps:

  1. Create a data share for the sales data:
    /***************************************************/
    /* Create Datashare and add objects to the share    */
    /****************************************************/
    CREATE DATASHARE salesshare;
    

  2. Enter the following code to alter the data share, add the sales schema to be shared with the consumer clusters, and add all tables in the sales schema to be shared with the consumer cluster:
    /************************************************************/
    /* Add objects at desired granularities: schemas, tables,   */
    /* views include materialized, and SQL UDFs                 */
    /************************************************************/
    ALTER DATASHARE salesshare ADD SCHEMA sales;  -- New addition to create SCHEMA first
    
    /*For pool model, we share only the views and not tables */
    ALTER DATASHARE salesshare ADD TABLE sales.v_customer;
    ALTER DATASHARE salesshare ADD TABLE sales.v_customersales;
    

For the pool model, we share only the views with the consumer cluster and not the tables. The ALTER statement ADD TABLE is used to add both views and tables.

  3. Grant usage on the sales data share to the namespace of the BI consumer cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster. See the following code:
    /********************************************************************/
    /* Grant access to consumer clusters                                */
    /* login to Consumer BI Cluster and get the Namespace from          */
    /* the Redshift console or using SELECT CURRENT_NAMESPACE           */
    /********************************************************************/
    SELECT CURRENT_NAMESPACE;
    
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE salesshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long
    GRANT USAGE ON DATASHARE salesshare TO ACCOUNT 'Consumer_AWSAccount';
    

  4. View data shares that are shared from the producer cluster:
    SELECT * FROM SVV_DATASHARES;

The following screenshot shows the output.

You can also see the data shares and their detailed objects and consumers using the following commands:

SHOW DATASHARES;
DESC DATASHARE salesshare;
select * from SVV_DATASHARE_OBJECTS;
select * from SVV_DATASHARE_CONSUMERS;

Viewing and querying data shares for the pool model from the consumer cluster

To view and query data shares from the consumer cluster, complete the following steps:

  1. Log in to the consumer cluster as an admin user and view the data share objects:
    /**********************************************************/
    /* Login to Consumer cluster as awsuser:                  */
    /* View datashares and create local database for querying */
    /**********************************************************/
    select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the results.

  2. Create a new database from the data share of the producer cluster:
    /**********************************************************/
    /* Create a local database and schema reference           */
    /**********************************************************/
    CREATE DATABASE sales_db FROM DATASHARE salesshare
    OF NAMESPACE '<producercluster_namespace>';

  3. Optionally, you can create an external schema in the consumer cluster pointing to the schema in the database of the producer cluster.

Creating a local external schema in the consumer cluster allows schema-level access controls within the consumer cluster, and uses a two-part notation when referencing shared data objects (localschema.table vs. external_db.producerschema.table). See the following code:

/*********************************************************/
/* Create External Schema - Optional                     */
/* reason for schema: give specific access to schema     */
/* using shared alias get access to a secondary database */
/*********************************************************/
CREATE EXTERNAL SCHEMA sales_schema 
FROM REDSHIFT DATABASE 'sales_db' SCHEMA 'sales';
  4. Now you can query the shared data from the producer cluster by using the three-part notation database.schema.table:
    select * from sales_db.sales.customer;
    select * from sales_db.sales.v_customersales;

  5. From the tenant1 consumer cluster, you can view the databases that are accessible to tenant1:
    select * from SVV_REDSHIFT_DATABASES;
    

The following screenshot shows the results.

Creating local consumer users and controlling access

You can control access to users in your consumer cluster by creating users and groups, and assigning access to the data share objects.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create tenant1_group, grant usage on the local database sales_db and schema sales_schema to the group, and assign the user tenant1_user to the tenant1_group:
    /********************************************************/
    /* Consumers can create own users and assign privileges */
    /* Create tenant1_group and assign privileges to read   */
    /* sales_db and the sales_schema                        */
    /* Create tenant1_user in tenant1_group                 */
    /********************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE sales_db TO tenant1_group;
    GRANT USAGE ON SCHEMA sales_schema TO GROUP tenant1_group;
    

  2. Now, log in as tenant1_user to consumer cluster 1 and select data from the views v_customer and v_customersales:
    /*******************************************************/
    /* select from view returns only sales records related */
    /* to Consumer A namespace                             */
    /*******************************************************/
    select * from sales_db.sales.v_customer;
    select * from sales_db.sales.v_customersales;
    

You should see only the data relevant to tenant 1 and not the data that is associated with tenant 2.

 

Creating materialized views to optimize performance

Consumer clusters can have their own database objects, which are local to the consumer. You can also create materialized views on the data share objects and control when to refresh the dataset for your consumers. This provides another level of isolation from the producer cluster, and ensures the consumer clusters query their local dataset.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create a materialized view for customersales. This will create a local view that can be periodically refreshed from the consumer cluster.

 

/*******************************************************/
/* Create materialized view in consumer cluster        */
/*******************************************************/
-- Create a local schema to hold the materialized view
create schema if not exists tenant1_sales;

create MATERIALIZED view tenant1_sales.mv_customersales as 
select c_tenantid, c_name, c_region, 
	date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week", 
	date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month", 
	date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
	date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
	date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom", 
	t.t_namespace
from sales_db.sales.tenant t, sales_db.sales.customer c, sales_db.sales.sales s
where t.t_tenantid = c.c_tenantid 
and c.c_tenantid = s.s_tenantid 
and c.c_custid = s.s_custid 
and t.t_namespace = current_namespace;

select * from tenant1_sales.mv_customersales limit 100;

REFRESH MATERIALIZED VIEW tenant1_sales.mv_customersales;

 

With the preceding steps, we have demonstrated how you can control access to the tenant data in the same datastore using views. We also reviewed how data shares help efficiently share data between producer and consumer clusters with transaction consistency. We also saw how a local materialized view can be created to further isolate your BI workloads for your customers and provide a consistent, performant user experience. In the next section we will discuss the Bridge model.

Bridge model

In the bridge model, data for each tenant is stored in its own schema in a database and contains a similar set of tables. Data shares are created for each schema and shared with the corresponding consumer. This is an appealing balance between the silo and pool models, providing both data isolation and ETL consolidation. With Amazon Redshift, you can create up to 9,900 schemas. For more information, see Quotas and limits in Amazon Redshift.

With data sharing, separate consumer clusters can be provisioned to use the same managed storage from the producer cluster. Consumer clusters have all the capabilities of a producer cluster, and can in turn be producer clusters for data objects they own. Consumers can’t share data that is already shared with them. Without data sharing, queries from all customers are directed to a single cluster. The following diagram illustrates the bridge model.

To create a multi-tenant architecture using bridge model, complete the steps in the following sections.

Creating database schemas and tables for the bridge model in the producer cluster

As we did in the pool model, the first step is to create the database schema and tables. We log in to the producer cluster as an admin user and create separate schemas for each tenant. For our post, we create two schemas, tenant1 and tenant2, to store data for two tenants.

  1. Log in to the producer cluster as the admin user.
  2. Use the script below to create two schemas, tenant1 and tenant2, and create tables for customer dimension and sales facts under each of the two schemas. See the following code:
    /****************************************/
    /* Bridge -  Data Model */
    /****************************************/
     -- Create schemas tenant1 and tenant2
    create schema tenant1;
    create schema tenant2;
    
     -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
      
    
     -- Create tables for tenant2
    CREATE TABLE IF NOT EXISTS tenant2.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant2.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    

  3. Ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use the INSERT statement:
     -- ingest data for tenant1
     -- ingest customer data
    insert into tenant1.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
     -- ingest sales data
    insert into tenant1.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    
    select count(*) from tenant1.customer;
    select count(*) from tenant1.sales;
    
    
     -- ingest data for tenant2
     -- ingest customer data
    insert into tenant2.customer values
    (1, 'Customer 3', 'NorthWest'),
    (2, 'Customer 4', 'SouthEast');
    
     -- ingest sales data
    truncate table tenant2.sales;
    insert into tenant2.sales values
    (1, 2, 452.24, '2020-1-23'),
    (2, 1, 76523.10, '2020-11-3'),
    (3, 1, 6745.20, '2020-10-01');
    
    
    select count(*) from tenant2.customer;
    select count(*) from tenant2.sales;
    

Creating data shares and granting usage to the consumer cluster

In the following code, we create two data shares, tenant1share and tenant2share, to share the database objects under the two schemas to the respective consumer clusters.

  1. Create the two data shares, tenant1share and tenant2share:
    /******************************************************************/
    /*   Create Datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1share;
    CREATE DATASHARE tenant2share;

  2. Alter each data share and add the schema for the respective tenant to be shared with the consumer cluster:
    ALTER DATASHARE tenant1share ADD SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD SCHEMA tenant2;

  3. Alter each data share and add all tables in the schema to be shared with the consumer cluster:
    ALTER DATASHARE tenant1share ADD ALL TABLES IN SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD ALL TABLES IN SCHEMA tenant2;

Getting the namespace of the first consumer cluster

  1. Log in to the consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant1 schema for Tenant1 BI Cluster */
    /* login to tenant1 BI Cluster and get the Namespace 
     * or get the Namespace from the Redshift console */
    SELECT CURRENT_NAMESPACE;

 

  2. Grant usage on the data share for the first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster:
     -- Grant usage on the datashare to the first consumer cluster
     -- Namespace refers to the namespace GUID of the consumer cluster
    GRANT USAGE ON DATASHARE tenant1share TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant1share TO ACCOUNT '<Consumer_AWSAccount>';

Getting the namespace of the second consumer cluster

  1. Log in to the second consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant2 schema for Tenant2 BI Cluster */
    /* login to tenant2 BI Cluster and get the Namespace     */
    SELECT CURRENT_NAMESPACE;
    

  2. Grant usage on the data share for the second tenant to the namespace of the second consumer cluster you just got from the previous step:
     -- Grant usage on the datashare to the second consumer cluster
    GRANT USAGE ON DATASHARE tenant2share TO NAMESPACE '<consumercluster2_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant2share TO ACCOUNT '<Consumer_AWSAccount>';

  3. To view data shares from the producer cluster, enter the following code:
    /******************************************************************/
    /*   View Datashares created, Datashare objects and consumers     */
    /******************************************************************/
    select * from SVV_DATASHARES;
    select * from SVV_DATASHARE_OBJECTS;
    select * from SVV_DATASHARE_CONSUMERS;

The following screenshot shows the commands in the query editor.

The following screenshot shows the query results.

Accessing data using the consumer cluster from the data share

To access data using the consumer cluster, complete the following steps:

  1. Log in to the first consumer cluster, ds-consumer-c1, as an admin user.
  2. View data share objects from the SVV_DATASHARE_OBJECTS system view:
    /*****************************************************/
    /* Consumer cluster as adminuser                           */
    /* List the shares available and review contents for each  */
    /************************************************************/
     -- You can view datashare objects associated with the cluster
     -- using either of the two commands
    SHOW DATASHARES;
    select * from SVV_DATASHARES;

The following screenshot shows the query results.

--View objects shared in inbound share for consumer
select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the query results.

--View namespace or clusters granted usage to a datashare
select * from svv_datashare_consumers;
  3. Create a local database in the first consumer cluster from the data share:
    /*******************************************************/
    /* Create a local database and schema reference        */
    /* to the share objects                                */
    /*******************************************************/
    CREATE DATABASE tenant1_db FROM DATASHARE tenant1share
    OF NAMESPACE '<producercluster_namespace>';

  4. Query the database tables using the three-part notation db.tenant.table:
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;

  5. Optionally, you can create an external schema.

There are two reasons to create an external schema: either to enable two-part notation access to the tables from the consumer cluster, or to provide restricted access to the specific schemas for selected users, when multiple schemas are shared from the producer cluster. See the following code for our external schema:

/* Create External Schema */
CREATE EXTERNAL SCHEMA tenant1_schema FROM REDSHIFT DATABASE 'tenant1_db' SCHEMA 'tenant1';
  6. If you created the external schema, you can use the following two-part notation to query the database tables:
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;

  7. You can view the shared databases by querying the SVV_REDSHIFT_DATABASES table:
    select * from SVV_REDSHIFT_DATABASES;

The following screenshot shows the query results.

Creating consumer users for managing access

Still logged in as an admin user to the consumer cluster, you can create other users who have access to the database objects.

  1. Create users and groups, and assign users and object privileges to the groups with the following code:
    /*******************************************************/
    /* Consumer can create own users and assign privileges */
    /* Create tenant1_user and assign privileges to        */
    /* read datashare from tenant1 schema                  */
    /*******************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE tenant1_db TO tenant1_user;
    GRANT USAGE ON SCHEMA tenant1_schema TO GROUP tenant1_group;
    

Now tenant1_user can log in and query the shared tables from the tenant1 schema.

  2. Log in to the consumer cluster as tenant1_user and query the tables:
    /************************************************************/
    /* Consumer cluster as tenant1_user - query the shared     */
    /* tables using three-part notation:                       */
    /************************************************************/
    
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;
    
    
    /************************************************************/
    /* If you have created an external schema,                  */
    /* you can use the two-part notation to query tables.       */
    /************************************************************/
    
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;
    

Revoking access to a data share (optional)

  1. At any point, if you want to revoke access to the data share, you can use the REVOKE USAGE command:
    /*************************************************************/
    /* To revoke access at any time use the REVOKE USAGE command */
    /*************************************************************/
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    REVOKE USAGE ON DATASHARE Salesshare FROM NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digits long
    REVOKE USAGE ON DATASHARE Salesshare FROM ACCOUNT '<Consumer_AWSAccount>';
    

Silo model

The third option is to store the data for each tenant in a separate database within a cluster. If your data needs to be isolated from other tenants, you can use the silo model, in which each database can have distinct data models, monitoring, management, and security footprints.

Amazon Redshift supports cross-database queries, which allow you to simplify data organization. You can store common or granular datasets used across all tenants in a centralized database, and use the cross-database query capability to join relevant data for each tenant.
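For example, a cross-database query run from within one tenant's database might look like the following sketch. The database, schema, and table names here (including common_db, shared_schema, and region_lookup) are hypothetical and only illustrate the three-part notation:

-- Run while connected to a tenant database: join tenant data with a
-- shared lookup table kept in a hypothetical centralized database
select c.c_custid, c.c_name, r.region_manager
from tenant1_schema.customer c
join common_db.shared_schema.region_lookup r
  on r.region_name = c.c_region;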

The steps to create a data share in the silo model are similar to the bridge model; however, unlike the bridge model (where a data share is created for each schema), the silo model has a data share created for each database. The following diagram illustrates the architecture of the silo model.

Creating data shares for the silo model in the producer cluster

To create data shares for the silo model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and create separate databases for each tenant:
    /*****************************************************/
    /** Silo Model – Create databases for the 2 tenants **/
    /*****************************************************/
    create database tenant1_silodb;
    
    create database tenant2_silodb;
    

  2. Log in again to the producer cluster, connecting to the database that you want to share (tenant1_silodb), and create the schema and tables:
    /***********************************************************/
    /* Log in to tenant1_silodb and create the schema and tables */
    /***********************************************************/
    create schema tenant1_siloschema;
    
     -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
    insert into tenant1_siloschema.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
    truncate table tenant1_siloschema.sales;
    insert into tenant1_siloschema.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    

  3. Create a data share with a name for the first tenant (for example, tenant1_silodbshare):
    /******************************************************************/
    /*   Create datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1_silodbshare;
    

  4. Run ALTER DATASHARE commands to add the schema, and all the tables in the schema, to be shared with the consumer cluster:
    ALTER DATASHARE tenant1_silodbshare ADD SCHEMA tenant1_siloschema;
    ALTER DATASHARE tenant1_silodbshare ADD ALL TABLES IN SCHEMA tenant1_siloschema;

  5. Grant usage on the data share for the first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or by running the SELECT CURRENT_NAMESPACE statement in the BI cluster:
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE tenant1_silodbshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digits long (optional)
    --GRANT USAGE ON DATASHARE tenant1_silodbshare TO ACCOUNT '<AWS-Account>';
    

Viewing and querying data shares for the silo model from the consumer cluster

To view and query your data shares, complete the following steps:

  1. Log in to the consumer cluster as an admin user.
  2. Create a new database from the data share of the producer cluster:
    CREATE DATABASE tenant1_silodb FROM DATASHARE tenant1_silodbshare
    OF NAMESPACE '<producercluster_namespace>';

Now you can start querying the shared data from the producer cluster by using the three-part notation database.schema.table. If you created an external schema, you can also use two-part notation to query the tables.

  3. Query the data with the following code:
    select * from tenant1_silodb.tenant1_siloschema.customer;
    select * from tenant1_silodb.tenant1_siloschema.sales;
    

  4. Optionally, you can create an external schema pointing to the schema in the database of the producer cluster. This allows you to query the shared tables using two-part notation. See the following code:
    CREATE EXTERNAL SCHEMA tenant1_siloschema FROM REDSHIFT DATABASE 'tenant1_silodb' SCHEMA 'tenant1_siloschema';
    
    --With this Schema, you can access using two-part notation to select from data share tables
    select * from tenant1_siloschema.customer;
    select * from tenant1_siloschema.sales;
    

  5. You can repeat the same steps for tenant2 to share the tenant2 database with the second tenant. You can also control access for users in your consumer cluster by creating users and groups and assigning access to the data share objects, as shown in the sketch after this list.
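The following is a minimal sketch of how a consumer cluster admin might grant a second tenant's users access to the shared objects. The tenant2 database, schema, group, and user names simply mirror the tenant1 example and are assumptions for illustration only:

/* Assumes tenant2_silodb was created from the tenant2 data share and    */
/* tenant2_siloschema is an external schema created as in the step above */
create group tenant2_group;
create user tenant2_user password 'Redshift#456!' in group tenant2_group;

GRANT USAGE ON DATABASE tenant2_silodb TO tenant2_user;
GRANT USAGE ON SCHEMA tenant2_siloschema TO GROUP tenant2_group;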

System views to view data shares

We have introduced new system tables and views to easily identify the data shares and related objects. You can use three different groups of system views to view the data share objects:

  • Views starting with SVV_DATASHARES – contain details of data shares and the objects in a data share:
    • SVV_DATASHARES – View a list of data shares created on the cluster and data shares shared with the cluster
    • SVV_DATASHARE_OBJECTS – View a list of objects in all data shares created on the cluster or shared with the cluster
    • SVV_DATASHARE_CONSUMERS – View a list of consumers for data shares created on the cluster
  • Views starting with SVV_REDSHIFT – contain details on both local and remote Redshift databases:
    • SVV_REDSHIFT_DATABASES – List of all databases that a user has access to
    • SVV_REDSHIFT_SCHEMAS – List of all schemas that a user has access to
    • SVV_REDSHIFT_TABLES – List of all tables that a user has access to
    • SVV_REDSHIFT_COLUMNS – List of all columns that a user has access to
    • SVV_REDSHIFT_FUNCTIONS – List of all functions that a user has access to
  • Views starting with SVV_ALL – contain local and remote databases, external schemas (including Spectrum and federated query), and external schema references to shared data. If you create external schemas in the consumer cluster, you need to use the SVV_ALL views to look at the objects (see the example query after this list):
    • SVV_ALL_SCHEMAS – Union of the list of all schemas from the SVV_REDSHIFT_SCHEMAS view and a consolidated list of all external tables and schemas that a user has access to
    • SVV_ALL_TABLES – List of all tables that a user has access to
    • SVV_ALL_COLUMNS – List of all columns that a user has access to
    • SVV_ALL_FUNCTIONS – List of all functions that a user has access to
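As a minimal example, a consumer cluster user could run queries along the following lines to confirm which shared objects are visible to them; the tenant1_db filter value is an assumption based on the database created earlier in this post:

-- List the data shares this cluster can see (SHOW DATASHARES; works as well)
select * from svv_datashares;

-- List every table the current user can access, including tables reached
-- through external schemas, filtered to the shared database created earlier
select * from svv_all_tables
where database_name = 'tenant1_db';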

Considerations for choosing a storage strategy

You can adopt a storage strategy or choose a hybrid approach based on business, technical, and operational requirements. Before deciding on a strategy, consider the quotas and limits for various objects in Amazon Redshift, and the number of databases per cluster or number of schemas per database, to check if it meets your requirements. The following list summarizes these considerations for the pool, bridge, and silo models.

  • Separation of tenant data – Pool: views; Bridge: schemas; Silo: databases
  • ETL pipeline complexity – Pool: low; Bridge: low; Silo: medium
  • Limits – Pool: 100,000 tables (RA3 4xlarge and 16xlarge clusters); Bridge: 9,900 schemas per database; Silo: 60 databases per cluster
  • Chargeback to consumer accounts – Pool: yes; Bridge: yes; Silo: yes
  • Scalability – Pool: high; Bridge: high; Silo: high

Conclusion

In this post, we discussed how you can use the new data sharing feature of Amazon Redshift to implement an AaaS solution with a multi-tenant architecture while meeting SLAs for consumers using separate Amazon Redshift clusters. We demonstrated three types of models providing various levels of isolation for the tenant data. We compared and contrasted the models and provided guidance on when to choose an implementation model. We encourage you to try the data sharing feature to build your AaaS or software as a service (SaaS) solutions.


About the Authors

Rajesh Francis is a Sr. Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build scalable Analytic solutions.

 

 

 

Neeraja Rentachintala is a Principal Product Manager with Amazon Redshift. Neeraja is a seasoned product management and GTM leader, bringing over 20 years of experience in product vision, strategy, and leadership roles in data products and platforms. Neeraja has delivered products in analytics, databases, data integration, application integration, AI/machine learning, and large-scale distributed systems across on-premises and cloud environments, serving Fortune 500 companies as part of ventures including MapR (acquired by HPE), Microsoft SQL Server, Oracle, Informatica, and Expedia.com.

 

Jeetesh Srivastva is a Sr. Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to implement scalable solutions leveraging Redshift and other AWS Analytics services. He has worked to deliver on-premises and cloud-based analytics solutions for customers in the banking and finance and hospitality industry verticals.

Querying a Vertica data source in Amazon Athena using the Athena Federated Query SDK

Post Syndicated from Kelly Ragan original https://aws.amazon.com/blogs/big-data/querying-a-vertica-data-source-in-amazon-athena-using-the-athena-federated-query-sdk/

The ability to query data and perform ad hoc analysis across multiple platforms and data stores with a single tool brings immense value to the big data analytical arena. As organizations build out data lakes with increasing volumes of data, there is a growing need to combine that data with large amounts of data in other data stores. As the variety of data increases, it becomes paramount to have a query tool to bridge two or more data stores with a single query.

Even though data lakes became popular for analytic workloads recently, it’s not uncommon to have data warehouses in addition to data lakes for various reporting and business intelligence (BI) use cases. It becomes imperative to be able to seamlessly query the data stored in the data warehouse and the data lake. To address this issue, Amazon Athena has released a feature called Athena Federated Query. Athena is an interactive query service provided by AWS that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Vertica is a columnar MPP database platform that can be deployed in the cloud or on premises, and supports exabyte scale data warehouses. With Athena Federated Query and the Vertica connector, you can now run analytical queries over a data warehouse on Vertica and a data lake in Amazon S3.

Athena Federated Query includes pre-built connectors to a variety of AWS services and databases, as well as an SDK to build custom connectors to other databases and data stores. With this feature, federated queries can pull data from a data lake in an S3 bucket and from an external data source, and then combine it into a single result set in Athena. These connectors are an extension of the Athena query engine, which translates content between Athena and the external data source. Pre-built connectors exist for Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as a JDBC connector for Amazon Redshift, MySQL, and PostgreSQL. For other types of relational databases, you can use the Athena Federated Query SDK to create a custom connector.

In this post, we demonstrate how to deploy the custom connector between Athena and a Vertica database built using the Athena Federated Query SDK. After deploying the custom connector, we demonstrate issuing federated queries and moving data from Vertica to a data lake using CREATE TABLE AS (CTAS) with a federated query.

AWS services used in the solution

The Athena Federated Query SDK is an open-source framework to build custom connectors, and comes with a connector publish tool that deploys the connector executables in an application to the AWS Serverless Application Repository. The Athena Federated Query uses an AWS Lambda function that in turn uses the application deployed to the AWS Serverless Application Repository.

A custom connector is composed of a Lambda function that utilizes three components:

  • MetadataHandler – An interface that exposes metadata information of schemas, tables, and columns from the underlying data store to Athena
  • RecordHandler – An interface that provides hooks to read data from the external source and share it with the Athena query engine in Apache Arrow columnar format
  • CompositeHandler – Manages running the MetadataHandler and RecordHandler together

The Lambda function connects to the external data store using an appropriate connection protocol and sends the parsed SQL statement. In the case of Vertica, it is JDBC. The RecordHandler processes the result set produced by the external data store and passes the rows to Athena for final processing. Multiple Lambda functions are called by Athena depending on the Lambda concurrency settings to read the result set in parallel. A spill bucket is used to handle a large dataset that exceeds the Lambda server’s capacity to process the result set.

The JDBC connection established by the Lambda function to the external database is used to send the parsed SQL statement and retrieve the result set rows from the external database. This scenario works well in terms of bandwidth for smaller databases and result sets. However, you might have Vertica deployments with petabyte or exabyte data warehouses. Typical queries return result sets on the order of 10, 20, 30 gigabytes, or more. Due to the bandwidth issue with a JDBC connection, the solution presented in this post modifies the Athena Federated Query SDK to implement a different route for the transmission of large result sets from Vertica to the Athena server for final processing.

The alternate solution utilizes the Vertica EXPORT command as a wrapper around the parsed SQL statement. You can use the EXPORT command to write the result set of a SQL statement directly to an S3 bucket, taking advantage of Vertica's highly parallelized, partitioned writes to Amazon S3. This solution modifies the SDK to allow Athena to read the result set in the S3 bucket, determine the number of partitions, and call subsequent Lambda functions to parallelize the read of the result set. This produces an efficient way to move a multi-gigabyte result set from Vertica to Athena with parallelized writes from Vertica to Amazon S3 and parallelized reads from Amazon S3 to Athena. When connecting to the Vertica database, the SDK uses AWS Secrets Manager to retrieve a user ID and password for a service account on the Vertica database.

Solution architecture

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.

The connector components are as follows:

  1. A user issues a federated SQL query in Athena against a table in Vertica.
  2. Athena parses the query and calls a Lambda function.
  3. The Lambda function makes a call to Secrets Manager to get the user ID and password for connecting to Vertica.
  4. The connector sends an EXPORT statement wrapper with the embedded SQL statement to Vertica through the JDBC connection. For example, see the following code:
    EXPORT TO PARQUET (directory = 's3://<bucket_name>/<folder_name>',
    Compression='Snappy', fileSizeMB=64) OVER() as
    SELECT  
    ORDER_ID,  
    ITEM,  
    CUSTOMER_ID,
    ORDERED_DATE
    FROM SCHEMA1.ORDERS  
    WHERE CUSTOMER_ID = 2;
    

  5. Vertica processes the SQL query and writes the result set to the S3 bucket specified in the EXPORT command. Vertica parallelizes the write to S3 bucket based on the fileSizeMB parameter into as many partitions as needed for the result set.
  6. Athena calls a Lambda function to scan the S3 bucket in order to determine the number of files to read for the result set.
  7. Athena invokes multiple Lambda functions depending on the number of partitions using Amazon S3 Select. This allows Athena to parallelize the read of the S3 files.
  8. Athena combines the result set returned from Vertica with data scanned from the data lake, and returns the combined result set to the user.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Amazon EC2 IAM role permissions – The AWS Identity and Access Management (IAM) role of the Amazon Elastic Compute Cloud (Amazon EC2) machines hosting the Vertica database must be given write permissions to the VerticaExport S3 bucket, which is created when deploying the connector.
  • Secrets Manager – The Vertica connection credentials are stored in Secrets Manager. The secret name is prefixed with Vertica- and the secret value is the connection credentials.
  • Lambda IAM role permissions – When the Lambda function is deployed to the AWS Serverless Application Repository, it creates a custom IAM role for the function to run. The custom role has the following IAM permissions in order to successfully perform the read and write functions associated with the MetadataHandler and RecordHandler:
    • AWSLambdaBasicExecutionRole
    • AWSLambdaVPCAccessExecutionRole
    • For Secrets Manager, GetSecretValue for secrets with a prefix given in SecretNameOrPrefix
    • For Amazon S3, list, read, and write permissions for SpillBucket and ExportBucket, and list permissions for all S3 buckets
    • For Athena, GetQueryExecution

Demonstration tables

To demonstrate the Athena Vertica connector capabilities, we use the following components:

  • A Vertica database running in our AWS environment.
  • A Vertica table called orders containing details of customer orders.
  • An Athena table called customer, which has an S3 bucket as a data source. This table contains information regarding customers.

The following screenshot shows the details of the customer table in Amazon S3.

The following screenshot shows the details of the orders table in Vertica.

Setting up the Athena Vertica connector project

To set up your connector project, complete the following steps:

  1. Create an S3 bucket in your AWS account. This is the bucket where the result set from Vertica is exported.
  2. Create another S3 bucket in your AWS account. This is the bucket where the code for the connector is stored and retrieved.
  3. Grant the IAM role of the EC2 machines hosting the Vertica database read and write permissions to the S3 result set bucket, allowing Vertica to export data to the bucket.
  4. Clone the GitHub repo in your local folder.
  5. Open the project in your preferred IDE.
  6. From the athena-query-federation directory, run mvn clean install.
  7. From the athena-vertica directory, run mvn clean install.
  8. From the athena-vertica directory, run ../tools/publish.sh <s3_code_bucket_name> athena-vertica [region] to publish the connector to your private AWS Serverless Application Repository.
  9. Upon successful completion of the script, the connector’s serverless application is published to the AWS Serverless Application Repository.

Deploying the connector

To deploy your connector, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Published Applications.
  2. On the Private applications tab, select Show apps that create custom IAM roles or resource policies to see the deployed applications.
  3. Choose the VerticaAthenaConnector serverless app.
  4. For AthenaCatalogName, enter the name of the connector Lambda function used when querying the Vertica tables (avc).
  5. For SecretNameOrPrefix, enter the prefix used to store the Vertica credentials in Secrets Manager (the default is Vertica-).
  6. For SpillBucket, enter the S3 bucket name where data is spilled in case the result set data volume crosses a certain limit (test-spill-bucket).
  7. For VerticaExportBucket, enter the S3 bucket where the result set from Vertica is exported (test-export-bucket).
  8. For VpcId, enter your VPC ID.
  9. For SpillPrefix, enter athena-spill.
  10. For SubnetIds, enter your subnet IDs.
  11. For VerticaConnectionString, enter the connection string of the Vertica database in the following format:
    jdbc:vertica://<host_name>:<port>/<database>?user=${vertica-username}&password=${vertica-password}
    

    Where vertica-username and vertica-password are the secret names of the Vertica user credentials stored in AWS Secrets Manager.

  12. Select I acknowledge that this app creates custom IAM roles.
  13. Choose Deploy.

Upon successful deployment, a Lambda function with the name given for AthenaCatalogName is deployed in your AWS environment. We use this function to issue federated queries to Vertica. The connector is now deployed and ready to use.

Using the connector

On the Athena console, you can query Vertica tables as shown in the following code. The value for <lambda_function> corresponds to the function you created in the previous section.

SELECT  
ORDER_ID,   
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
ORDER BY ORDER_ID DESC

In this example, we named the function avc. The following screenshot shows our query results.

This demonstrates that the newly deployed connector read the user-requested columns and the Vertica source table, wrapped an EXPORT statement around the SQL statement, and ran it in Vertica. The results of this query were exported to the specified S3 bucket (test-export-bucket) in Parquet format. The connector then invoked multiple Lambda functions to read the data from the S3 bucket using Amazon S3 Select and displayed it on the Athena console. Note that the connector currently exports the Vertica timestamp and timestamptz data types as a varchar data type. Therefore, we need to use the date_parse(string, format) function to convert the timestamp columns into the correct data type.
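As a minimal sketch, assuming a hypothetical ORDER_TS column that the connector returns as a varchar in the form 2020-11-24 18:30:00, the conversion in Athena might look like the following; the column name and format string are illustrative:

SELECT
ORDER_ID,
date_parse(ORDER_TS, '%Y-%m-%d %H:%i:%s') AS ORDER_TS_PARSED
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS
WHERE CUSTOMER_ID <= 3461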

We can also create an Athena table using CTAS with the result set of the Vertica query using the following query:

CREATE TABLE default.vertica_customers_table AS (
SELECT  
ORDER_ID,
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
);

We can then use the newly created table to query the data as shown in the following screenshot.

In addition, we can also query and join the customer data in Amazon S3 and orders data in Vertica using the following sample query:

WITH   
customer_data AS (  
  SELECT   
   	CUSTOMER_NAME,  
   	CUSTOMER_ID  
    
  FROM default.customer
  ),  
orders_data AS (  
  SELECT    
   	ORDER_ID,  
   	PRODUCT_NAME,  
   	CUSTOMER_ID   
  FROM "lambda:<lambda_function>".schema1.orders  
  )  
SELECT a.CUSTOMER_ID, b.ORDER_ID, b.PRODUCT_NAME
FROM customer_data a 
INNER JOIN orders_data b
ON a.customer_id = b.customer_id
WHERE lower(b.PRODUCT_NAME) like 'pencil'
ORDER BY b.ORDER_ID DESC

This query joins the orders data in Vertica with the customer data in the S3 bucket on the customer_id column and displays the results on the Athena console.

This demonstrates the ease of performing analytics across multiple platforms and data stores.

Conclusion

In this post, we introduced the Athena Vertica connector, its solution architecture, and demonstrated how to deploy the connector using the Athena Federated Query SDK. We saw how to run SQL queries on the Vertica data source. We also learned that we can use the connector to perform extract, transform, and load operations on the data in the Vertica tables and Amazon S3, enabling us to perform faster and better analytics across multiple platforms and data sources.

For more information about Athena Federated Query, see the GitHub repo.

Special Acknowledgement

Special acknowledgement goes to the Intuit Data Engineering staff Denise McInerney – Data Architect, Sanjay Rane – Group Engineering Manager – Data, and Kannan Nagarajan – Database Architect. They helped design, review, and support the development of the custom connector and architecture.


About the Authors

Kelly Ragan is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services. He helps customers solve big data problems and wrestle with large-scale data warehouses. In his spare time, he enjoys snow skiing, bicycling, and camping in the Pacific Northwest.

 

 

Rohit Masur is an Associate Big Data Consultant, Data and Analytics Team, AWS Professional Services. He helps customers architect and implement solutions on AWS to get business value out of data. In his spare time, he enjoys reading books, going on long walks, and exploring new hiking trails in the Bay Area.

Automating AWS service logs table creation and querying them with Amazon Athena

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automating-aws-service-logs-table-creation-and-querying-them-with-amazon-athena/

I was working with a customer who was just getting started using AWS, and they wanted to understand how to query their AWS service logs that were being delivered to Amazon Simple Storage Service (Amazon S3). I introduced them to Amazon Athena, a serverless, interactive query service that allows you to easily analyze data in Amazon S3 and other sources. Together, we used Athena to query service logs, and were able to create tables for AWS CloudTrail logs, Amazon S3 access logs, and VPC flow logs. As I was walking the customer through the documentation and creating tables and partitions for each service log in Athena, I thought there had to be an easier and faster way to allow customers to query their logs in Amazon S3, which is the focus of this post.

This post demonstrates how to use AWS CloudFormation to automatically create AWS service log tables, partitions, and example queries in Athena. We also use the SQL query editor in Athena to query the AWS service log tables that AWS CloudFormation created.

Athena best practices

This solution is appropriate for ad hoc use and queries the raw log files. These raw files can range from compressed JSON to uncompressed text formats, depending on how they were configured to be sent to Amazon S3. If you need to query over hundreds of GBs or TBs of data per day in Amazon S3, performing ETL on your raw files and transforming them to a columnar file format like Apache Parquet can lead to increased performance and cost savings. You can save on your Amazon S3 storage costs by using snappy compression for Parquet files stored in Amazon S3. To learn more about Athena best practices, see Top 10 Performance Tuning Tips for Amazon Athena.
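For instance, a minimal sketch of converting raw CloudTrail logs to Parquet with Snappy compression using CTAS might look like the following; the target location, column list, and partition columns are assumptions based on the cloudtrail_logs table created later in this post:

CREATE TABLE cloudtrail_logs_parquet
WITH (
  format = 'PARQUET',
  parquet_compression = 'SNAPPY',
  external_location = 's3://<BUCKET_NAME>/cloudtrail-parquet/',
  partitioned_by = ARRAY['region', 'year', 'month', 'day']
) AS
SELECT eventtime, eventsource, eventname, useridentity, region, year, month, day
FROM cloudtrail_logs
WHERE year = '2020' AND month = '11';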

Table partition strategies

There are a few important considerations when deciding how to define your table partitions. Mainly you should ask: what types of queries will I be writing against my data in Amazon S3? Do I only need to query data for that day and for a single account, or do I need to query across months of data and multiple accounts? In this post, we talk about how to query across a single, partitioned account.

By partitioning data, you can restrict the amount of data scanned per query, thereby improving performance and reducing cost. When creating a table schema in Athena, you set the location of where the files reside in Amazon S3, and you can also define how the table is partitioned. The location is a bucket path that leads to the desired files. If you query a partitioned table and specify the partition in the WHERE clause, Athena scans the data only for that partition. For more information, see Table Location in Amazon S3 and Partitioning Data. You can then define partitions in Athena that map to the data residing in Amazon S3.

Let’s look at an example to see how defining a location and partitioning our table can improve performance and reduce costs. In the following tree diagram, we’ve outlined what the bucket path may look like as logs are delivered to your S3 bucket, starting from the bucket name and going all the way down to the day.

Outlined in red is where we set the location for our table schema, and Athena then scans everything after the CloudTrail folder. We then outlined our partitions in blue. This is where we can specify the granularity of our queries. In this case, we partition our table down to the day, which is very granular because we can tell Athena exactly where to look for our data. This is also the most performant and cost-effective option because it results in scanning only the required data and nothing else.
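As a sketch of what such a partition-pruned query might look like, the following statement names a single partition in the WHERE clause; the partition column names (region, year, month, day) are assumptions matching the bucket layout described above:

-- Athena scans only the partition named in the WHERE clause
SELECT eventname, count(*) AS calls
FROM cloudtrail_logs
WHERE region = 'us-east-1'
  AND year = '2020'
  AND month = '11'
  AND day = '24'
GROUP BY eventname
ORDER BY calls DESC;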

If you have to query multiple accounts and Regions, you should back off the location to AWSLogs and then create a non-partitioned CloudTrail table. This allows you to write queries across all your accounts and Regions, but the trade-off is that your queries take much longer and are more expensive due to Athena having to scan all the data that comes after AWSLogs every query. However, querying multiple accounts is beyond the scope of this post.

Prerequisites

Before you get started, you should have the following prerequisites:

  • Service logs already being delivered to Amazon S3
  • An AWS account with access to your service logs

Deploying the automated solution in your AWS account

The following steps walk you through deploying a CloudFormation template that creates saved queries for you to run (Create Table, Create Partition, and example queries for each service log).

  1. Choose Launch Stack:

  2. Choose Next.
  3. For Stack name, enter a name for your stack.

You don’t need to have every AWS service log that the template asks for. If you don’t have CloudFront logs for example, you can leave the PathParameter as is. If you need CloudFront logs in the future, you can simply update the Create Table statement with the correct Amazon S3 location in Athena.

  4. For each service log table you want to create, follow these steps:
  • Replace <_BUCKET_NAME> with the name of your S3 bucket that holds each AWS service log. You can use the same bucket name if it’s used to hold more than one type of service log.
  • Replace <Prefix> with your own folder prefix in Amazon S3. If you don’t have a prefix, make sure to remove it from the path parameters.
  • Replace <ACCOUNT-ID> and <REGION> with the desired account and Region.

  5. Choose Next.
  6. Enter any tags you wish to assign to the stack.
  7. Choose Next.
  8. Verify parameters are correct and choose Create stack at the bottom.

Verify the stack has been created successfully. The stack takes about 1 minute to create the resources.

Querying your tables

You’re now ready to start querying your service logs.

  1. On the Athena console, on the Saved queries tab, search for the service log you want to interact with.

  2. Choose Create Table – CloudTrail Logs to run the SQL statement in the Athena query editor.

Make sure the location for Amazon S3 is correct in your SQL statement and verify you have the correct database selected.

  3. Choose Run query or press Tab+Enter to run the query.

The table cloudtrail_logs is created in the selected database. You can repeat this process to create other service log tables.

For partitioned tables like cloudtrail_logs, you must add partitions to your table before querying.
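The saved Create Partition query generated by the template handles this for you. Conceptually, it runs a statement along the following lines; the bucket path, prefix, and partition values shown here are illustrative assumptions:

ALTER TABLE cloudtrail_logs ADD IF NOT EXISTS
PARTITION (region = 'us-east-1', year = '2020', month = '11', day = '24')
LOCATION 's3://<BUCKET_NAME>/<Prefix>/AWSLogs/<ACCOUNT-ID>/CloudTrail/us-east-1/2020/11/24/';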

  4. On the Saved queries tab, choose Create Partition – CloudTrail.
  5. Update the Region, year, month, and day you want to partition, then choose Run query or press Tab+Enter to run the query.

After you run the query, you have successfully added a partition to your cloudtrail_logs table. Let’s look at some of the example queries we can run now.

  6. On the Saved queries tab, choose Query – CloudTrail Logs.

This is a base template included to begin querying your CloudTrail logs.

  7. Highlight the query and choose Run query.

You can see the base query template uses the WHERE clause to leverage partitions that have been loaded.

Let’s say we have a spike in API calls from AWS Lambda and we want to see the users that the calls were coming from in a specific time range as well as the count for each user. Our query looks like the following code:

SELECT useridentity.sessioncontext.sessionissuer.username as "User",
       count(eventname) as "Lambda API Calls"
FROM cloudtrail_logs
WHERE eventsource = 'lambda.amazonaws.com'
       AND eventtime BETWEEN '2020-11-24T18:00:00Z' AND '2020-11-24T21:00:00Z' 
group by useridentity.sessioncontext.sessionissuer.username
order by count(eventname) desc

Or if we wanted to check our S3 Access Logs to make sure only authorized users are accessing certain prefixes:

SELECT *
FROM s3_access_logs
WHERE key='prefix/images/example.jpg'
        AND requester != 'arn:aws:iam::accountid:user/username'

Cost of solution and cleaning up

Deploying the CloudFormation template doesn’t cost anything. You’re only charged for the amount of data scanned by Athena. Remember to use the best practices we discussed earlier when querying your data in Amazon S3. For more pricing information, see Amazon Athena pricing and Amazon S3 pricing.

To clean up the resources that were created, delete the CloudFormation stack you created earlier. This also deletes the saved queries in Athena.

Summary

In this post, we discussed how we can use AWS CloudFormation to easily create AWS service log tables, partitions, and starter queries in Athena by entering bucket paths as parameters. We used CloudTrail and Amazon S3 access logs as examples, but you can replicate these steps for other service logs that you may need to query by visiting the Saved queries tab in Athena. Feel free to check out the video as well, where I go over how we store logs in Amazon S3 and then give a quick demo on how to deploy the solution.

For more information about service logs, see Easily query AWS service logs using Amazon Athena.


About the Author

Michael Hamilton is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife, kids, and a 2-year-old German shepherd.

How Baqend built a real-time web analytics platform using Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Wolfram Wingerath original https://aws.amazon.com/blogs/big-data/how-baqend-built-a-real-time-web-analytics-platform-using-amazon-kinesis-data-analytics-for-apache-flink/

This is a customer post written by the engineers from German startup Baqend and the AWS EMEA Prototyping Labs team.

Baqend is one of the fastest-growing software as a service (SaaS) startups in Germany, serving over 5,000 business customers with more than 100 million monthly users and over €2 billion in revenue per year. Baqend's main product is Speed Kit, a one-click solution to accelerate ecommerce websites. By rerouting a portion of the web traffic through Speed Kit's caching infrastructure, it achieves a typical performance boost of 1.5–3 times.

To measure the impact of Speed Kit and confirm its uplift to Baqend’s customers, we maintain several dashboards that display the technical and business performance improvements achieved by Speed Kit. This requires complex aggregations of tracking data collected during A/B tests on our customers’ websites.

The Challenge: Real-time analytics and reporting at scale

One of the key issues with our legacy solution for monitoring and reporting was the amount of data it needed to process. The raw tracking data from all users was batched through various systems, which resulted in processing delays of up to 24 hours for some analytics jobs. This impacted our operations monitoring and sales activities negatively, because our customers sometimes couldn't analyze the impact of deployment changes until the next day. Furthermore, our legacy reporting service lacked any support for custom visualization development.

This post shows you how we transformed our batch-based analytics process into a continuous complex event-processing pipeline, which is managed by Amazon Kinesis Data Analytics for Apache Flink. The new solution exhibits less than a minute of end-to-end latency from data ingestion to visual output in the dashboard.

The key topics presented in this post are the solution architecture, the continuous aggregation pipeline built with Kinesis Data Analytics for Apache Flink, and continuous reporting with Kibana.

Solution overview and key components

Following a remote planning phase in which we defined our requirements and laid out the basic design, we built the solution during an on-site prototyping engagement with AWS over the course of 4 weeks in early 2020 in Hamburg. Seven team members from Baqend and AWS EMEA Prototyping Labs implemented the following architecture.

The workflow includes the following steps:

  1. The performance tracking data is streamed by Speed Kit Amazon Elastic Compute Cloud (Amazon EC2) instances.
  2. This data goes into an Amazon Kinesis Data Streams data stream.
  3. This data stream is consumed by a Kinesis Data Analytics for Apache Flink application.
  4. The data is ingested into Amazon ES.
  5. This streaming application relies on AWS Secrets Manager to store and access the credentials for Elasticsearch with basic HTTP authentication.
  6. An Nginx proxy server application hosted on EC2 instances in multiple public subnets and Availability Zones redirects the user requests to Kibana with Amazon Cognito authentication (for more information, see How do I use an NGINX proxy to access Kibana from outside a VPC that’s using Amazon Cognito authentication?).
  7. The Apache Flink application also uses Amazon DynamoDB as a backend for long-living external states required for certain operations (covered later in this post).
  8. The streaming application also delivers the raw and intermediate data outputs to an Amazon Simple Storage Service (Amazon S3) bucket to enable historical data analysis and operational troubleshooting with Amazon Athena.

Although the prototyping engagement also covered other aspects, we focus on the Kinesis Data Analytics application in the following sections of this post.

Continuous aggregation with Kinesis Data Analytics

We need to collect all kinds of technical data points on every page load of a website visitor. Details on the individual page impressions (PI) help us analyze web performance for the websites of our customers. Speed Kit provides a performance tracking functionality that collects data within the browser of every website visitor and sends it to our analytics backend.

Aggregating page impressions

Intuitively, there should be only one data beacon for any given PI because the data could be aggregated in the browser before it’s sent to our backend. In practice, however, Speed Kit sends several data beacons during the page load to minimize the possibility of any data loss, as shown in the following figure.

For example, static information such as the target URL or the current time can be sent away as soon as the navigation starts (navigation beacon), whereas certain measurements can’t be sent until very late in the load process, like the time it took to load the entire page (load beacon). Certain events may even occur minutes after the page load, or not at all (for example, user interaction with the page or JavaScript errors) and are therefore handled via dedicated and optional transmissions (event beacons). These beacons need to be correlated in our analytical backend later on.

Aggregating session data

Because some of the most interesting metrics are computed on the level of user sessions, aggregating all data beacons for the individual PIs isn’t enough to analyze web performance. For instance, the user engagement metrics are often quantified by the number of pages visited in one sitting (session length) or the share of users that left on the very first page (bounce rate).

Aggregating relevant information may even involve identifying and removing duplicates, as illustrated in the following figure.

Suppose the user first checks out the landing page and immediately leaves (Session 1), and then comes back later to browse through some products and buy some blue shoes (Session 2), and finally returns after a few hours to reload the order confirmation page and browse some more products (Session 3). Because Session 3 starts with a reload of the order confirmation page, tracking data on the order that was completed in Session 2 is transmitted a second time, resulting in a potentially duplicated count of the completed orders. Therefore, our analytical backend needs to identify the duplicated tracking information as such and ignore it for further analysis. To enable this, we persistently store a salted hash of every order ID and simply have the aggregation pipeline drop the tracking data on any order that has already been written to the external key value store (see the diagram in the following section).

Anatomy of the streaming application

The following diagram shows our event processing pipeline from raw data collection to the storage of aggregation results.

The workflow is as follows:

  1. The first step is tracking the data within the browsers of the end users.
  2. The data is sent to Kinesis Data Streams for consumption through a custom stateful Apache Flink process function within a Kinesis Data Analytics application.
  3. Raw data beacons are initially normalized and invalid data beacons are delivered to Amazon S3 via side outputs to facilitate later analysis of all data that has been sorted out.
  4. As mentioned earlier, we use a DynamoDB table to run a deduplication rule over all incoming order data (confirmation pages) using the DynamoDB Transactions API. We also use another DynamoDB table to identify bot traffic by storing the user agent strings that have consistently been associated with suspicious behavior (because they belong to web crawlers). Finally, the stream of cleaned tracking beacons is processed in stateful window aggregation steps for storage.
  5. We aggregate all beacons referring to the same PI and write them off to our data lake on Amazon S3 to enable offline analysis with Athena.
  6. Furthermore, we compile the tracking beacon stream into 1-minute summaries containing both PI and session data for storage via Amazon ES to enable efficient reporting with Kibana.

State storage and application management

Most of the application state for the streaming application is held in the built-in RocksDB state backend with incremental checkpointing. This default built-in state storage mechanism depends on a 50 GB storage limit provided for each Kinesis Processing Unit (KPU) allocated to a Kinesis Data Analytics application. On the other hand, we used DynamoDB tables to store the state permanently for unique conversions and user agent strings in order to decouple historical state for these two data types from Apache Flink application management and to keep the checkpointing duration and size under control. Using DynamoDB for these two use cases helps to control the overhead for creating and restoring checkpoints and thereby controls the application startup time.

Workload distribution and scalability

As of February 2021, our processing pipeline handles over 2.8 billion tracking beacons per month, which corresponds to more than 500 million individual PIs from over 140 million user sessions and more than 100 million unique users. Achieving this scale requires even distribution of both processing and storage load across all stream partitions. Therefore, we use randomly generated session IDs as a partitioning key for the input Kinesis data stream and throughout most of the remaining sections of our pipeline.

In the presence of certain anomalies such as heavy bot traffic, a load skew may occur regardless, which may impair overall throughput or even crash the entire application in extreme cases. We monitor the number of incoming and outgoing records (to derive the current buffer size) for the individual Apache Flink operators in every stream partition to identify issues with the load distribution quickly and generate alert notifications via multiple channels (such as Slack and email) if the measurements for different stream partitions diverge significantly. For convenience, we further visualize custom Amazon CloudWatch metrics in a Grafana dashboard.

Event processing, delivery semantics, and fault tolerance

The application restarts and downtime (such as during and after application deployment) can be handled seamlessly by using Apache Flink’s event time processing semantics as generated output is independent of the wall-clock time of the processing nodes. All processing is based on monotonically increasing ingestion timestamps to eliminate the possibility of late arrivers. While our data cleaning procedure identifies the invalid records, it never drops any data items from the stream, but instead it only attaches information on the detected issue to the data item in question. This approach enables us to analyze the frequency and distribution of every problem in our aggregation pipeline by using the same Kibana dashboard.

Even though the data ingestion to Amazon ES provides at-least-once delivery guarantees by default, we managed to achieve exactly-once delivery guarantees from the source Kinesis data stream to the Elasticsearch index by generating document identifiers in a deterministic fashion. Therefore, the data stream can be replayed safely because the existing data records are overwritten on re-insertion into the Elasticsearch index.

Data retention and multi-tenancy in Amazon ES

We store pre-aggregated data at the minute level in Amazon ES to make sure our Kibana dashboard remains responsive even when analyzing a scope of weeks or months. As illustrated in the following figure, the Apache Flink application summarizes the raw tracking data along different dimensions (browser, device, test group, and aggregation time in minutes) before writing it to Elasticsearch.

The Elasticsearch documents are composed of bucketed histogram data for performance timers such as the First Contentful Paint (FCP) instead of the actual timer values. Running queries over these aggregates instead of the raw data minimizes query run costs significantly: traffic-heavy customers may have tens of millions of raw tracking beacons in a single week, whereas the number of 1-minute buckets is several orders of magnitudes lower (for small and large customers alike). We observe over 5 times more PIs and 30 times more raw beacons than aggregates stored in Elasticsearch across all of our customers.

We store the data for different customers in separate indexes generated for a fixed temporal rolling period by the Apache Flink Elasticsearch Sink Connector. We also implemented customer-specific retention policies in Amazon ES by deleting the old indexes as required. Our deployment is multi-tenant so that our customers can receive fine-grained access only to their own data stored in the indexes created for them.

Kibana for continuous reporting

We used Kibana to build our dashboards because it provides powerful and easy-to-create built-in visualizations and virtually boundless flexibility through custom Vega chart visualizations. Kibana also works well in combination with Elasticsearch indexes, thereby facilitating the role-based access management that enables us to provide individual customers access to the data in our multi-tenant dashboard.

Easy data exploration

The following illustration shows one of the standard visualizations in Kibana that we use to understand the distribution of device types and browsers used by website visitors.

Real-time histogram visualization

Illustrating the distribution of performance metrics requires a custom visualization. The following custom Vega histogram chart uses the concrete performance metric LCP to illustrate how Speed Kit improves the webpage load time.

In comparison with the vanilla website where page loads are almost never faster than 2 seconds (pink area), Speed Kit-accelerated end users experience comparatively faster and even sub-second level load times (blue area).

Because our main business revolves around accelerating our customers’ websites, visualizing the actual uplift is critical for all developers (to debug performance and identify issues quickly) as well as our customers (highlighting the value of our product). With the continuous aggregation and reporting solution outlined in this post, we were able to satisfy all these requirements in a scalable and fully managed fashion.

Conclusion and future directions

In this post, we shared our journey from a high-volume batch analytics solution to a continuous aggregation pipeline using Kinesis Data Analytics for Apache Flink. Key aspects are:

  • End-to-end processing time is reduced from 24 hours to sub-minute latency.
  • We implemented a fully functional prototype within 4 weeks. The AWS Prototyping team enabled us to build our system on a multitude of managed AWS services.
  • The system was used with production load after 8 weeks.
  • The new system based on the Kinesis Data Analytics for Apache Flink application exhibits extreme scalability, handling with ease workloads that were infeasible for the old system. As of February 2021, our system processes more than 500 million page loads from over 100 million unique users every month.
  • Elasticsearch and Kibana with customized Vega visualizations provides flexible and continuously updating dashboards for all our customers.

Additional Resources

For more details on the challenges and solutions discussed in this article, we recommend the following resources:

We would be glad to get feedback on our work, so please drop us a line in case of any remaining questions!


About the Authors

Wolfram “Wolle” Wingerath heads the data engineering team that is responsible for developing and operating Baqend’s infrastructure for analytics and reporting.

 

 

 

Florian Bücklers is Baqend’s Chief Technology Officer and therefore responsible for coordinating between the different teams for front-end and backend development, DevOps, onboarding, and data engineering.

 

Benjamin Wollmer develops data-intensive systems at Baqend, but he is also doing his PhD at the University of Hamburg and therefore likes to read and write about related topics.

 

 

Stephan Succo is one of the core developers of Baqend’s continuous analytics pipeline.

 

Jörn Domnik is a Senior Software Engineer at Baqend with a focus on backend development and reliability engineering.

 

 

 

As a DevOps engineer, Virginia Amberg monitors cluster health and keeps all systems running smoothly at Baqend.

 

 

As a Principal Prototyping Engagement Manager in AWS, Markus Bestehorn is responsible for building business-critical prototypes with AWS customers and is a specialist for IoT and machine learning.

 

 

 

As a Data Prototyping Architect in AWS, Anil Sener builds prototypes on big data analytics, data streaming, and machine learning, which accelerates the production journey on the AWS Cloud for top EMEA customers.

 

 

As B2B Strategic Account Manager for Startups at AWS, Daniel Zäeh works with customers to make their ideas come true and helps them grow, by connecting tech and business.

Building AWS Data Lake visualizations with Amazon Athena and Tableau

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/building-aws-data-lake-visualizations-with-amazon-athena-and-tableau/

Amazon Athena is an interactive query service that makes it easy to analyze data in a data lake using standard SQL. One of the key elements of Athena is that you only pay for the queries you run. This is an attractive feature because there is no hardware to set up, manage, or maintain.

You can query Athena with SQL or by using data visualization tools such as Amazon QuickSight, Tableau, or other third-party options. QuickSight is a cloud-native business intelligence (BI) service that you can use to visually analyze data and share interactive dashboards with all users in your organization. QuickSight is fully managed and serverless, requires no client downloads for dashboard creation, and has a pay-per-session pricing model that allows you to pay for dashboard consumption with a maximum charge of $5.00 per reader per month. The combination of QuickSight and Athena allows you to rapidly deploy dashboards and BI to tens of thousands of users, while only paying for actual usage, and not worrying about server deployment or management. Tableau allows you to similarly share dashboards with readers when utilizing Tableau servers. This post demonstrates how you can connect to Athena from a Tableau desktop, create a dashboard that queries data from Athena, and publish to a Tableau server for broader distribution.

Integration of Tableau with Athena as a data source is gaining in popularity, and many customers prefer to create Tableau dashboards on Athena. Performance of the dashboard usually varies based on many factors, such as number of fields and views, data size, network bandwidth, Athena query runtime, dashboard rendering time, connection type, and location of the Tableau server (on premises or AWS). We walk you through the required configuration to integrate Tableau with Athena, best practices, and Athena runtime analysis.

Solution overview

In this solution, we build an end-to-end Tableau dashboard using Athena as a data source for data analysts. The dashboard has two views:

  • Student’s study time based on age group and gender
  • Geo location of the students

The dashboard is helpful for analyzing students’ performance in class and their health conditions.

You walk through the following steps:

  1. Configure Tableau to connect to Athena.
  2. Connect Tableau Desktop to Athena to build the dashboard.
  3. Create a Tableau dashboard and publish to Tableau server.
  4. Analyze the Tableau dashboard.

We also review best practices and design patterns of Tableau development with Athena.

The following diagram illustrates the architecture of this solution.

Prerequisites

You need the following prerequisites before you can proceed with this solution:

Configuring Tableau to connect to Athena

Athena connects to Tableau via an Athena JDBC driver. Complete the following configuration steps:

  1. Install the appropriate version of 64-bit Java. A minimum of JDK 7.0 (Java 1.7) is required.
  2. Download the JDBC driver (.jar) file that matches your version of the JDK.
  3. Move the downloaded .jar file to the following location, based on your operating system:
    1. For Windows, use C:\Program Files\Tableau\Drivers.
    2. For Mac, use ~/Library/Tableau/Drivers location.

Setting up Athena

For this use case, you create an Athena table called student that points to a student-db.csv file in an S3 bucket. Additionally, you create the view student_view on top of the student table. You build the Tableau dashboard using this view. You expose only a subset of columns from the student table in the view.

You can download the Athena DDL (student.sql and student_view.sql) and the data file student-db.csv from the GitHub repo.

  1. On the Amazon S3 console, upload the student-db.csv file in the S3 bucket you created as a prerequisite.

  2. On the Athena console, use the following DDL statement in the query editor to create your studentdb database:
    CREATE DATABASE studentdb;

For more information, see Creating Databases in Athena.

  3. Choose the studentdb database and use the following DDL statement to create the student table (provide the name of your S3 bucket):
    CREATE EXTERNAL TABLE student(
      `school` string, 
      `country` string, 
      `sex` string, 
      `age` string, 
      `studytime` int, 
      `failures` int, 
      `preschool` string, 
      `higher` string, 
      `remotestudy` string, 
      `health` string)
    ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY ',' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<your_bucket_name>/'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'skip.header.line.count'='1', 
      'transient_lastDdlTime'='1595149168')

  4. Use the following DDL statement to create the student_view view. This creates an Athena view with a limited number of fields to build the Tableau dashboard.
    CREATE OR REPLACE VIEW student_view AS 
    SELECT
      "school"
    , "country"
    , "sex"
    , "age"
    , "health"
    , "studytime"
    ,"failures"
    FROM
      student

Now if you query student_view on the Athena console with a select * SQL statement, you can see the following output.
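
For example, a quick validation query such as the following (a minimal sketch; the LIMIT value is arbitrary) confirms that the view returns only the columns you exposed:

    SELECT *
    FROM studentdb.student_view
    LIMIT 10;
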
Connecting Tableau Desktop to Athena

Athena connects to Tableau via a JDBC driver. With the Amazon Athena connector, you can quickly and directly connect Tableau to your Amazon S3 data for fast discovery and analysis, with drag-and-drop ease. Tableau Desktop is used to create worksheets, dashboards, and stories by connecting to data sources, which can be files or servers.

In this section, you connect Tableau Desktop to Athena.

  1. Open Tableau Desktop.
  2. On the navigation pane, choose More.
  3. Choose Amazon Athena.

  4. For Server, enter athena.<region>.amazonaws.com. 

Use the Region that you’re using to set up the Athena table and view. For more information, see Amazon Athena endpoints and quotas.

  5. For Port, enter 443.
  6. For S3 Staging Directory, enter the path of the Amazon S3 location where you want to store query results.

The path is available on the Settings page on the Athena console, under Query result location.

 

  7. For Access Key ID and Secret Access Key, enter the appropriate values.

After you sign in, you create a dashboard by selecting your database and available table or view.

  8. Choose student_view.
  9. Drag and drop this view as a data source.

  10. Create the worksheet country-wise, as per the configuration in the following screenshot.

  11. Create another worksheet called age-wise.

  12. On the Dashboard menu, choose New Dashboard.
  13. Drag and drop country-wise and age-wise.

You have created a Tableau dashboard successfully, your data is in Athena, and you’re ready to share it with the rest of your organization by publishing the dashboard. Before you publish, you need to configure the plan to refresh the Athena data sources used by the Tableau dashboard.

Two options are available:

  • Live connection – A Tableau live connection offers real-time updates, with any changes in the data source reflecting immediately in Tableau.
  • Data extract – Data extracts are snapshots of data loaded into system memory to be quickly recalled for visualization. Extracts are likely to be much faster than live connections, especially in complex visualizations with large datasets, filters, calculations, and so on. For more information, see Refresh Extracts.
  14. On the Server menu, choose Publish Workbook.

After you share the link with end-users, they can view the dashboard.

Each Tableau dashboard run or refresh creates a query in Athena for each visualization. To view the queries, choose the History tab on the Athena console.

The following code is the query generated from the age-wise visualization:

SELECT "student_view"."age" AS "age",
  "student_view"."sex" AS "sex",
  SUM("student_view"."studytime") AS "sum:studytime:ok"
FROM "studentdb"."student_view" "student_view"
GROUP BY "student_view"."age",
  "student_view"."sex"

The following code is the query generated from the country-wise visualization:

SELECT "student_view"."country" AS "country",
  SUM("student_view"."studytime") AS "sum:studytime:ok"
FROM "studentdb"."student_view" "student_view"
GROUP BY "student_view"."country"

Analyzing the Athena query runtime

The Tableau dashboard is a collection of several views and visualizations. The dashboard runtime depends on many factors, including:

  • Database query execution time
  • Data transfer time
  • Dashboard visualization rendering time
  • Number of visualizations, filters, data volume, total number of fields, number of rows, KPI calculation complexity, custom scripts runtime, and so on.
  • Number of concurrent users
  • Workload and Tableau Server sizing

Each Tableau dashboard visualization generates an Athena query. Each query that runs is known as a query execution and has a unique identifier known as the query ID or query execution ID. You can identify Tableau dashboard queries on the Athena console using these query IDs, which you can find in the driver logs. For more information, see Enable Driver Logging for Amazon Athena Using a .properties File.

You can further check the query runtime on the Athena console. On the History tab, search for the query with the query ID. The following screenshot shows the search results for the student age query.

The following screenshot shows the search results for the student country query.

In this use case, you have two queries from two visualizations. Both queries start at the same time, and the query runtime is 1.22 seconds.

Best practices on Athena to Tableau integration

The following best practices may be useful as you build Tableau dashboards on Athena:

  • There may be use cases where you want to create complex queries as views by joining multiple tables in Athena, and then use those views in Tableau to build the dashboard. Queries against these views can take a while to complete because runtime depends on the underlying complexity of the view, such as the number of joins and filters. This isn’t ideal for live database connections, but it works well in a data extract model where you can refresh the data in Tableau on a schedule.
  • By using views and extracts, you can also minimize Athena costs. You only run the query one time during extraction and then publish the extract to Tableau. This means you can be efficient in leveraging the Tableau Hyper engine while minimizing your costs in Athena.
  • Tableau data extract provides performance benefits, but there are situations where you need live data. In this case, data extract isn’t an option, and you can choose Tableau live connection.
  • You can partition your dataset. Partitioning divides your table into parts and keeps the related data together based on column values such as date, country, and region. Partitioning restricts the amount of data scanned by each query, thereby improving performance and reducing cost (see the partitioned CTAS sketch after the comparison table below).
  • Rather than querying the CSV files directly in Athena, you can write the data to Amazon S3 as Apache Parquet or Apache ORC files, which are optimized columnar formats ideal for analytic queries. For more information about performance tuning, see Top 10 Performance Tuning Tips for Amazon Athena.
  • You can convert your existing data to Parquet or ORC using Apache Spark or Apache Hive on Amazon EMR or AWS Glue. For more information, see Analyzing Data in S3 using Amazon Athena. See also the following resources:
  • You can also use the Athena CREATE TABLE AS SELECT (CTAS) feature to convert data to Parquet format. The following query converts the student CSV data to Parquet and creates a student_parquet table (provide the S3 bucket name where you want to store the Parquet files):
    CREATE TABLE studentdb.student_parquet
        WITH (
              format = 'PARQUET',
              parquet_compression = 'SNAPPY',
              external_location = 's3://<BUCKET_NAME>/parquet_files'
        ) AS SELECT * FROM student

The following table compares query data scanning and run times between the two Athena tables.

Table | Query | Data Scanned | Runtime
student | SELECT * FROM "studentdb"."student"; | 23.17 KB | 0.63 seconds
student_parquet | SELECT * FROM "studentdb"."student_parquet"; | 1.93 KB | 0.49 seconds
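
Building on the partitioning best practice above, you can combine CTAS with the partitioned_by table property to produce a partitioned Parquet copy of the data. The following is a minimal sketch: the table name student_parquet_partitioned and the S3 path are placeholders, and in an Athena CTAS statement the partition column (country here) must be listed last in the SELECT:

    CREATE TABLE studentdb.student_parquet_partitioned
    WITH (
          format = 'PARQUET',
          parquet_compression = 'SNAPPY',
          external_location = 's3://<BUCKET_NAME>/parquet_partitioned/',
          partitioned_by = ARRAY['country']
    ) AS SELECT school, sex, age, studytime, failures, preschool, higher, remotestudy, health, country
    FROM student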

The following best practices may be useful as you deploy Tableau Server on AWS:

  • Choose the right Amazon Elastic Compute Cloud (Amazon EC2) instance type based on your workload. A total of 8 CPU cores (16 AWS vCPUs) and 64 GB RAM are recommended for a single production EC2 instance. Typical EC2 instance types for Tableau Server are c5.4xlarge, m5.4xlarge, and r5.4xlarge.
  • You should test your workload to check any performance bottleneck due to CPU. You can use a different set of Tableau dashboards to check the performance and add more CPU as required.
  • EC2 instances with insufficient RAM may cancel out the benefits of a high-end CPU. Choose instances with 64 GB of RAM or more for production workloads. Although it’s important to choose an instance with sufficient CPU, running Tableau Server on instances starved for RAM may lead to degraded performance.
  • Tableau Server has several processes and components, including a database (PostgreSQL) that stores the system’s metadata. Tableau Server needs a high level of disk throughput in order to perform well, and it’s recommended to use Amazon Elastic Block Store (Amazon EBS) SSD (gp2) or provisioned IOPS volumes. Magnetic disks are not recommended. Provision a minimum 30–50 GB volume for the operating system and over 100 GB for Tableau Server.

For more information, see Optimizing Performance of Tableau Server on AWS.

If you have Tableau client and server deployments, you can use Athena to directly connect to data lakes in Amazon S3 and pull the latest information on dashboards, either via a live connection or with an extract. QuickSight offers similar capabilities, with the option of direct connection to Athena, or via periodic refresh of data into SPICE. SPICE is a high-performance in-memory data store that natively provides high concurrency and high availability without the need for any server sizing, setup, or management. QuickSight also provides centralized, AWS native IAM-based control over access to Athena, which removes the need to store individual user credentials in client software on individual user machines.

Cleaning up

To avoid incurring future charges, delete the data file from the S3 bucket.

  1. On the Amazon S3 console, choose the bucket where you uploaded student-db.csv.
  2. Select student-db.csv.
  3. On the Actions menu, choose Delete.

Conclusion

In this post, you’ve seen how to connect Tableau to Athena and start gaining insights into data. This post also discussed the best practices when building a Tableau dashboard on Athena.


About the Author

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration and strategy. He is passionate about technology and enjoys building and experimenting in Analytics and AI/ML space.

How EMX reduced data pipeline costs by 85% with Amazon Athena

Post Syndicated from Gary Bouton original https://aws.amazon.com/blogs/big-data/how-emx-reduced-data-pipeline-costs-by-85-with-amazon-athena/

This is a guest blog post by Gary Bouton and Louis Ashner from EMX. In their own words, “ENGINE Media Exchange (EMX) is a leading marketing technology company, leveraging a patented, end-to-end tech stack purpose-built to meet the demands of today’s digital marketplace. The company creates both open- and closed-loop solutions designed to unify advertisers, platforms, and publishers across digital media channels—including advanced TV, video, display, search, and social.”

While recognized as an independent solutions provider for the digital media landscape, EMX also serves as the technology and programmatic division for its parent, ENGINE—a global data-driven marketing company serving advertising’s most recognized brands.

In the past, we used typical legacy data warehouse solutions for our data pipeline. We needed massive clusters to house all our raw data as well as advanced pipelines to import that data into the final database. We then needed to query the raw data clusters to aggregate the data and move it into a separate cluster for more frequent querying. This process was not only time consuming, but also quite expensive, because legacy database clusters large enough to house that much data aren’t cheap.

Then came Amazon Athena, which allowed us to not only simplify our pipeline, but also save on costs significantly. We were able to simply route the raw data straight to Amazon Simple Storage Service (Amazon S3) at minimal storage costs, then query the data with Athena to move aggregate data into small Amazon Redshift clusters for querying more frequently. Athena’s querying is not only quick, but the query costs are mere pennies when the table is set up correctly with partitions and the query utilizes them properly. Additionally, we can increase our data retention time, because the cost of storing data in Amazon S3 is significantly cheaper than an ever-growing legacy data warehousing solution.

In short, Athena allowed us to simplify our data pipeline while saving 85% on data storage costs at the same time.

This post discusses the following:

  • Why EMX digital chose Athena for its backend ETL workflow
  • How EMX manages Athena performance and run time
  • How EMX continues to scale Athena with new products and create coherent workflows
  • The benefits of this solution for EMX

Advantages of a robust backend ETL workflow when dealing with fast and furious big data

For most companies, data is an ever-growing problem. The volume, velocity, variety, and veracity of data can limit performance and become a financial burden.

Data is very important to us at EMX; we process over 450,000 requests per second, which we clean, audit, and deliver for reporting and optimizations to keep our clients informed and current in an ever-changing ad space.

To do this, we have to have a backend system that is robust, on time, available, and cost effective in order to meet the demands of our split-second decision-making.

Why Athena is the right tool for EMX

As detailed below, Athena’s pay-per-query pricing model, performance and reliability at scale, and ease of use made it the right tool for us.

  • Scale – EMX processes over 2 TB of raw and sometimes unstructured data every hour for reporting and optimizations. Running these jobs without managing cluster optimizations directly lets the team focus on research, product development ideas, and ad hoc tasks, and frees us from having to estimate the processing and computational power needed to complete jobs in time.
  • Cost – Cost per query is at least four times cheaper than other backend ETL tools, and its on-demand nature means we only pay for what we use. We’re no longer losing increasing costs by keeping up a system that isn’t being used. The feedback of cost per query through Athena also allows us to tune and optimize our logic, to not only reduce that cost further but test new ways to split into and run our production ETL jobs.
  • Resilience – We have thrown everything but the kitchen sink at Athena while building out our production pipeline, and we were impressed by the lack of failures from the service. Even though we don’t directly own the resources behind Athena’s cloud solution, it has always had high availability. In instances where availability was hampered, Athena made it easy and straightforward to add workflow hooks that retry failed jobs when a queue becomes available.
  • Ease of use – Unlike most competitor offerings, Athena works out of the box. It’s very easily customized using the Athena GUI, or you can build your own roles, rules, database structures, and projections. The documentation for tuning AWS performance with Presto is very easy and straightforward, making it a small learning curve for any new user.
  • Data transformations – Athena’s robust Presto query language allows us to perform regex, quartile, and percentile statistics without resorting to an outside transformation step in Python or other languages. Going further, using window functions inside those same queries allows us to do some of the heavy mathematical lifting we would have needed to do outside of the backend process, thus saving cost and time. With Athena, these extra vital steps see no difference in cost or performance to our backend pipeline and allow us to condense complicated parts into one step.

Why we continue to grow with Athena

We continue to grow with Athena for the following reasons:

  • Future scale – Athena and its team keep improving and adding resources that support our ever-growing data needs, which have increased by 200% since Athena’s implementation. This has served as the bedrock to our backend solutions.
  • Improvements – The Sales and Engineering team at AWS has always been open to feedback and has turned that into better error reporting, work groups for Athena, changes in policy, and workload management through roles. This has allowed us to split Athena resources with workgroups from production-level work to running ad hoc jobs in future updates.
  • Cost is king – Every dollar we have saved through Athena has been put into products to make Athena better for us. Using Athena has allowed us to improve our front-end delivery products—from building our own workflows right into Athena, to taking time to work with the right compression for Athena ingestion, and even offloading more work that would have gone to a traditional ETL box. Cost for us is not just dollars but the time it takes to manage; that time saved is allowing us to be on the bleeding edge in development of new tools to deliver the data Athena helps us serve.

The following sections detail how EMX uses Athena to build, manage, and orchestrate its backend ELT work with minimal coding and maintenance.

Solution architecture

The following diagram shows the architecture EMX uses.

How we use Athena

Our custom scripts stream batched data each minute from auction servers directly to raw S3 buckets. The data is dropped in a .gzip format to datetime-partitioned S3 buckets. This partition structure helps us limit our Athena query scan. For example, the partitioned buckets look like the following screenshot.

When the data has reached these partitioned buckets, EMX uses Apache Airflow to schedule various jobs across Athena. The following screenshot shows our DAG for our most-used pipeline.

Before beginning to run Athena queries, we run two checks on our data in Amazon S3:

  • Check if all the expected data has arrived and is in the bucket.
  • Check logic match rules and clean illegal fields in the data.

On the success of both tasks, we start adding the latest partition to an Athena table.
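
The exact statement depends on how the table is partitioned, but adding the latest partition typically looks like the following sketch (using the sample campaigns_stream table described later in this post; the partition key, values, and S3 prefix are illustrative placeholders, not our actual layout):

    ALTER TABLE campaigns_stream ADD IF NOT EXISTS
    PARTITION (dt = '2021-01-01-00')
    LOCATION 's3://<raw-data-bucket>/campaigns/2021/01/01/00/';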

When the partition is added to the table, we start running the query. The query status is polled every 10 seconds to get the latest status on the query performance until completion. The query returns the status as success, failed, or canceled. Depending on what query status is returned, further tasks are then forked.

At times, we have noticed queries fail with the error Query resource exhausted at this scale, which usually goes away on triggered retries. For the same reason, we have a retry mechanism in place on the execute_athena_sql task. If the retry fails, it alerts the team and data is copied over to a debug bucket for further investigation. If it succeeds, it moves ahead with further transformation.

For further transformation, we get the output of the Athena query back in Amazon S3, and then we add the business rules to enrich the data in Amazon S3.

Based on the pipeline logic, the data is then copied from Amazon S3 to different data stores, one of them being Amazon Redshift.

The last step is to clean up the metadata that was generated by the Athena query.

The following is an example from one of our pipelines. The Athena table is projected on top of the partitioned buckets and is itself partitioned by datetime, so the data can be read from the table directly as soon as it’s ready. The following screenshot shows what the sample table campaigns_stream looks like; it reads the data from the aforementioned bucket.

As soon as our scheduled jobs are triggered, they run data checks, data matches, and complex SQL queries on this table against the latest date partition (loaded in the last DAG task), which limits the data scanned and reduces costs. The results are pushed to the S3 reporting bucket to be picked up by other processes. The results can be generated in different formats, such as CSV, Apache Avro, and Apache Parquet, using the CTAS or INSERT INTO command.
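
For example, a CTAS statement along the following lines writes an aggregated result set to the reporting bucket in Parquet format (the database, table, and column names here are illustrative placeholders rather than our actual schema):

    CREATE TABLE reporting.daily_campaign_summary
    WITH (
          format = 'PARQUET',
          external_location = 's3://<reporting-bucket>/daily_campaign_summary/'
    ) AS
    SELECT dt, campaign_id, count(*) AS impressions
    FROM campaigns_stream
    WHERE dt = '2021-01-01-00'
    GROUP BY dt, campaign_id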

For example, running a simple count query for each domain scans approximately 1.65 TB of data and returns the results in less than 600 seconds, without needing us to set up or manage any infrastructure.

When the query is complete and the output files in the S3 reporting bucket are ready, they’re picked up by our DAG and pushed into data storage like Amazon Redshift.

Optimization on Athena

By default, Athena has a soft limit of 20 active DML queries (including CTAS). When we have multiple jobs running in parallel, we may hit that limit, delaying our time-sensitive pipelines and jobs. To overcome this, we allocated a fixed time window in each hour to our most critical pipelines, and other jobs with lower priority are run later.

For example, our production pipelines get priority 1, with a window from minute 0 to minute 15 of every hour. We’re aware that we can request a limit increase from AWS, but we instead decided to use this opportunity to improve the resilience and robustness of our system.

Conclusion

“Build, don’t buy” has been EMX’s motto. It drives our innovation forward, much like Athena continues to be able to solve all the questions we ask of it. We build boutique and large-scale solutions for our advertising clients, which require a malleable and robust ETL backend that takes the work and cost to a manageable level. We built an ever-scaling, cost-effective, and highly available ETL backend with Athena.

Our successes with Athena are shown through both time and cost savings, including:

  • 30% of the time previously spent maintaining a traditional ETL structure now goes into improving our Athena workloads, which feeds back into reduced costs that we can pass on to our clients
  • Four times less cost per query than competitors has allowed us to put money into different tools for storage and modeling, giving even more entropy to driving more revenue for our clients and less cost
  • 10 times less technical debt in Athena setup, research, staging, and production, which goes back into other future-thinking projects

What we can do with data is only limited by the time we need in herding, cleaning, and delivering this data for insights and development. Since throwing 100% of our ETL backend systems into Athena, we have increased product delivery and systems optimization four-fold in only a quick short year. Athena and the Athena team continue to grow with us even as our data needs begin to soar exponentially, adding more tools that reduce workflows, management, and job distribution in the AWS ecosystem itself. This entropy between EMX and Athena has resulted in increased cooperation and more business with us and our growing lists of clients.

Our “Why” is building the tools for the future, and Athena personifies our “Why” in delivering what EMX is about: scale, on time, and delivery of data optimized for the modern era.


About the Authors

Gary Bouton is VP of Data Engineering at ENGINE Media Exchange and leads their Data Engineering and Data Science Product teams. Pipeline implementation is led by Director of Data Pipeline Rahul Gupta, Senior Engineer Nader S. Gharawi, and Data Science Engineer Raghav Gupta. Data model implementation is led by Senior Data Scientist Gabrielle Agrocostea and Data Scientist Heena Otia.

 

Louis Ashner is EVP of Technology at ENGINE Media Exchange. He has a passion for making the Internet faster, and is an ad-tech pioneer with more than 10 years of experience working with digital advertising, including real-time bidding and programmatic advertising. His 9 patents in networking optimization and data caching are used to power EMX’s proprietary ad exchange.

Detecting anomalous values by invoking the Amazon Athena machine learning inference function

Post Syndicated from Amir Basirat original https://aws.amazon.com/blogs/big-data/detecting-anomalous-values-by-invoking-the-amazon-athena-machine-learning-inference-function/

Amazon Athena has released a new feature that allows you to easily invoke machine learning (ML) models for inference directly from your SQL queries. Inference is the stage in which a trained model is used to predict values for new samples; it comprises a forward pass similar to training, but unlike training it doesn’t include a backward pass to compute the error and update weights. It’s usually the production phase, where you deploy your model to predict real-world data. Using ML models in SQL queries makes complex tasks such as anomaly detection, customer cohort analysis, and sales predictions as simple as invoking a function in a SQL query.

In this post, we show you how to use Athena ML to run a federated query that uses Amazon SageMaker inference to detect an anomalous value in your result set.

Solution overview

To use ML with Athena (Preview), you define an ML with Athena function with the USING FUNCTION clause. The function points to the Amazon SageMaker model endpoint that you want to use and specifies the variable names and data types to pass to the model. Subsequent clauses in the query reference the function to pass values to the model. The model runs inference based on the values that the query passes and returns inference results.
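
The following is a minimal sketch of what such a query can look like while the feature is in preview (the function name, the orders_per_day table, and the column names are illustrative, and the endpoint placeholder must be replaced with the endpoint your Amazon SageMaker deployment generates):

    USING FUNCTION detect_anomaly(order_count INT)
        RETURNS DOUBLE TYPE SAGEMAKER_INVOKE_ENDPOINT
        WITH (sagemaker_endpoint = '<your-rcf-endpoint-name>')
    SELECT order_date, order_count, detect_anomaly(order_count) AS anomaly_score
    FROM orders_per_day
    ORDER BY anomaly_score DESC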

You can use more than a dozen built-in ML algorithms provided by Amazon SageMaker, train your own models, or find and subscribe to model packages from AWS Marketplace and deploy on Amazon SageMaker hosting services. No additional setup is required. You can invoke these ML models in your SQL queries from the Athena console, Athena APIs, and through the Athena JDBC driver.

To detect anomalous values, we use the Random Cut Forest (RCF) algorithm, which is an unsupervised algorithm for detecting anomalous data points within a dataset.

Prerequisites

This post continues the work done in this blog. You need to follow steps in that post to run the AWS CloudFormation template before proceeding with this post. No additional setup is required.

As part of the CloudFormation stack that you run to build the environment, we create a new AWS Identity and Access Management (IAM) role that Amazon SageMaker uses to run an Athena query to generate our training dataset, train a new model, and deploy that model to an Amazon SageMaker endpoint. To perform these tasks, our IAM role should have AmazonSageMakerFullAccess, AmazonAthenaFullAccess, and AmazonS3FullAccess managed policies. In a production setting, you should scope down the AmazonS3FullAccess policy to include only the Amazon Simple Storage Service (Amazon S3) buckets that you require for training your model.

Additionally, we create a new Amazon SageMaker notebook instance using an ml.m4.xlarge instance type. We use the ARN of the IAM role for Amazon SageMaker as the IAM role that this notebook uses when interacting with other AWS services.

Uploading and launching the Jupyter notebook

To upload and launch your Jupyter notebook, complete the following steps:

  1. On the Amazon SageMaker console, choose Notebook Instances.

You can see a workshop notebook instance of size ml.m4.xlarge, which you created when you deployed the CloudFormation stack.

  2. Select the instance and choose Open Jupyter.
  3. Download the Jupyter notebook file that we provide as part of this post.
  4. Upload the file to Jupyter.
  5. Choose the file and open the Python code so you can go through it step by step.

Running the Python code

You now run the Jupyter notebook Python code on the console, starting from the first cell.

Make sure to update the S3 bucket defined in the second cell of the notebook by replacing the bucket name with your S3 athena-federation-workshop-******** bucket, which you created when deploying the CloudFormation template. This bucket name in your account is globally unique, and we use this bucket to store our training data and model.

In the third cell, we call a federated query against the orders table on the Aurora MySQL database using the lambda:mysql connector that we defined and used in the previous post. This query generates a training dataset for number of orders per day.

After running the fourth cell and waiting for a few seconds, you should see the training dataset.

When you build, train, and deploy your ML model on Amazon SageMaker, you normally have a model training phase and a deployment phase. At the end of your deployment, Amazon SageMaker provides you with an endpoint that your client application can interact with to input data and get the inference response back. This endpoint is what we use in our SQL query to call the ML function for inference.

In the fifth cell, we train an RCF model to detect anomalies and we deploy the model to an Amazon SageMaker endpoint that our application or Athena query can call. This part can take up to 10 minutes before the training job is complete, after which you get a generated Amazon SageMaker endpoint. Record this endpoint name; we need this in our Athena federated query.

Running an Athena ML query

On the Athena console, check your workgroup and make sure that you’re switched to the AmazonAthenaPreviewFunctionality workgroup. This workgroup enables Athena ML capabilities for your query while this functionality is in preview.

Run the saved query DetectAnamolyInOrdersData after replacing the endpoint name with the one that you generated from your Amazon SageMaker notebook run.

Amazon SageMaker RCF is an unsupervised algorithm for detecting anomalous data points within a dataset. These are observations that are distinguishable from well-structured or patterned data. In the preceding results, the RCF algorithm assigns an anomaly score to each data point. Low score values indicate that the data point is considered normal; high values indicate the presence of an anomaly. The definitions of low and high depend on the application, but common practice suggests that scores beyond three standard deviations from the mean score are considered anomalous.
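
If you persist the scored results to a table (or wrap the scoring query in a CTE), you can express a three-standard-deviation cutoff in plain SQL. The following sketch assumes a hypothetical table order_anomaly_scores with order_date, order_count, and score columns:

    SELECT order_date, order_count, score
    FROM order_anomaly_scores
    WHERE score > (
        SELECT avg(score) + 3 * stddev(score)
        FROM order_anomaly_scores
    )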

Cleaning up

When you finish experimenting with the features as part of this post, remember to clean up all the AWS resources that you created using AWS CloudFormation and during the setup.

  1. On the Amazon S3 console, empty the S3 bucket the CloudFormation template created. AWS CloudFormation can only delete the bucket if it’s empty.
  2. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, you can go to each connector and deselect the VPC so it’s no longer attached to the VPC that AWS CloudFormation created.
  3. On the Amazon SageMaker console, delete any endpoints you created as part of this post.
  4. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.

Conclusion

In this post, you learned about Athena support for invoking ML inference model for detecting anomalous values using the RCF algorithm that was developed on Amazon SageMaker. We demonstrated how to deploy your ML model one time on Amazon SageMaker to enable anyone in your organization to run your models any number of times for inference. Additionally, if you run Athena federated queries with this feature, then you can run inference on data in any data source.


About the Authors

Amir Basirat is a Big Data specialist solutions architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a Big Data specialist for different technology companies. He also has a PhD in computer science, where his research was focused on large-scale distributed computing and neural networks.

 

Saurabh Bhutyani is a Senior Big Data specialist solutions architect at Amazon Web Services. He is an early adopter of open source Big Data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule running EMR notebooks with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by AWS CloudWatch Events.

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We then use this environment to run an EMR notebook example that performs data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.

We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using a AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region as you launched the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.

The following table describes the parameters.

Parameter | Description | Default Value
Stack name | Enter a meaningful name for the stack. We use MWAAEmrNBDemo for this example. Replace it with your own value. | None
AirflowBucketName | Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment, and the name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post. You must replace it with your own value for this field. | None
EnvironmentName | An MWAA environment name that is prefixed to resource names. All the resources created by this template are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post. Replace it with your own value for this field. | mwaa-
PrivateSubnet1CIDR | The IP range (CIDR notation) for the private subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.20.0/24
PrivateSubnet2CIDR | The IP range (CIDR notation) for the private subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.21.0/24
PublicSubnet1CIDR | The IP range (CIDR notation) for the public subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.10.0/24
PublicSubnet2CIDR | The IP range (CIDR notation) for the public subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.11.0/24
VpcCIDR | The IP range (CIDR notation) for the VPC being created. For more information, see AWS CloudFormation VPC stack specifications. | 10.192.0.0/16

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.

  2. Enter the parameter values from the preceding table.
  3. Review the details in the Capabilities section and select the check boxes acknowledging that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create Stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, on the Resources tab, you can find the resources being created in this CloudFormation stack. Now, we’re ready to run our example.

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

The following diagram illustrates the workflow. As a user, you first create the DAG file that describes how to run the analytics jobs and upload it to the dags folder under the specified S3 bucket. The DAG can then be triggered in the Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR Notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.

Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.

find_best_sellers.ipynb is a Python script that does analysis on the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purposes only, we rank sellers simply by the sum of review star ratings from verified purchases.
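
To give a rough idea of the kind of Hive/Spark SQL logic the notebook runs, the following sketch computes the top 20 products per category by summed star rating from verified purchases. The table name amazon_reviews_parquet and the column names are assumed from the public dataset’s Parquet layout and are not taken from the notebook itself:

    SELECT *
    FROM (
        SELECT product_category,
               product_id,
               product_title,
               SUM(star_rating) AS total_star_rating,
               ROW_NUMBER() OVER (PARTITION BY product_category
                                  ORDER BY SUM(star_rating) DESC) AS category_rank
        FROM amazon_reviews_parquet
        WHERE verified_purchase = 'Y'
          AND review_date BETWEEN '2015-08-25' AND '2015-08-31'
          AND product_category IN ('Apparel', 'Automotive', 'Baby', 'Beauty', 'Books')
        GROUP BY product_category, product_id, product_title
    ) ranked
    WHERE category_rank <= 20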

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

In the last line of the first cell, we set OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/" as the default value for that input parameter. Replace it with your own value for the output location. You can also supply a different value for this parameter in the Airflow Variables later.

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary and two m5.xlarge core nodes on release version 6.2.0 with Spark, Hive, Livy and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from time import sleep
from datetime import datetime
import boto3, time
from builtins import range
from pprint import pprint
from airflow.operators.sensors import BaseSensorOperator
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils import apply_defaults
from airflow.utils.dates import days_ago

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

The very last line of the DAG code explains how the tasks are linked in the orchestration workflow. It’s overloading the right shift >> operator to create a dependency, meaning that the task on the left should be run first, and the output passed to the task on the right.

Instead of hard-coding the variables in the DAG code, we supply these variables by importing a JSON file in the Airflow UI before actually running the DAG. This way, we can also update the variables without having to update the DAG code, which would require updating the DAG file in Amazon S3. We walk you through how to do so in the later steps. The VARIABLES section of the DAG is repeated here:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
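If a variable might not have been imported yet, Variable.get also accepts a default_var fallback (and a deserialize_json flag for JSON-valued variables); a small hedged example, where the default date is just an illustration:

from airflow.models import Variable

# Fall back to a default when the variable isn't defined in the Airflow metadata store
FROM_DATE = Variable.get('FROM_DATE', default_var='2015-08-25')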

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

To use this JSON code, you need to replace all the variable values (subnet and S3 paths) with the actual values.

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.


  2. Choose Open Airflow UI.
  3. Log in as an authenticated user.


Next, we import the JSON file for the variables into Airflow UI.

As we mentioned earlier, we want to supply the variable values for our DAG definition later upon triggering the DAG in Airflow UI instead of hard-coding the values.

  4. On the Admin menu, choose Variables.
  5. Choose Browse.
  6. Choose the variables.json file that you created.
  7. Choose Import Variables.

For more information about importing variables, see Variables.

  8. Run the following command in the same directory as the test_dag.py file to upload the DAG file to the dags folder under the S3 bucket specified for the Airflow environment. Replace <your_airflow_bucket_name> with the S3 bucket name that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.

  9. Trigger the DAG by turning it to On.


  10. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.


  11. Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON-formatted configuration before activating the DAG.


You now get an email when a failure happens on any of the tasks. You can also configure email notifications for retries.
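One common way to enable these notifications is through the DAG's default_args; the following is a hedged sketch rather than the exact configuration used in this post, and the email address is a placeholder (Amazon MWAA also needs a working SMTP or Amazon SES setup for mail to be delivered):

default_args = {
    'email': ['data-team@example.com'],  # placeholder address
    'email_on_failure': True,            # notify when a task fails
    'email_on_retry': True,              # notify when a task is retried
    'retries': 1,
}

with DAG('test_dag', description='test dag', default_args=default_args,
         schedule_interval='0 * * * *', start_date=datetime(2020, 3, 30),
         catchup=False) as dag:
    ...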

  12. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.


  13. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.


On the Gantt tab, we can see the time distribution of all the tasks of our workflow.


As specified in our DAG definition, the EMR cluster is stopped when the workflow is complete.

Because we use the cron expression 0 * * * * as the schedule interval for our workflow, the DAG runs every hour while it's switched to On. Switch it to Off if you don't want it to run again.

  14. On the Amazon S3 console, view the result of our notebook job in the S3 folder.


For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.


Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how to use the Amazon EMR Notebooks API and orchestration services such as Amazon MWAA to build ETL pipelines. It demonstrated how to set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

 

 

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

How Edmunds GovTech unifies data and analytics data for municipalities with Amazon QuickSight

Post Syndicated from Edmunds GovTech original https://aws.amazon.com/blogs/big-data/how-edmunds-govtech-unifies-data-and-analytics-data-for-municipalities-with-amazon-quicksight/

This is a guest post from an Amazon QuickSight customer, Edmunds GovTech

Over the past 30 years, Edmunds GovTech has grown to provide enterprise resource planning (ERP) solutions to thousands of East Coast municipalities. We also serve cities and towns in 25 other states. In this blog, I’ll talk about how we used Amazon QuickSight embedded business intelligence (BI) to quickly bring powerful dashboards to our on-premises and cloud-based customers.

Unifying insights

Our customers rely on our suite of solutions to manage finances, personnel, revenue, and municipal management activities such as permits, land management, business licensing, and fleet maintenance. They can access a wide variety of reports and data analysis tools tailored to the needs of users in finance, operations, and other departments. Recent acquisitions have also added new capabilities to our offering, each with its own set of reporting tools.

These reports serve specialist users well. However, we wanted to add the ability to aggregate and visualize information in one easy-to-consume service. Time-starved executives, boards, and decision-makers needed a better way to gain key insights into spending trends and implement better cost and cash management strategies. They strive to better achieve the financial goals of their municipalities without having to spend time running reports in different areas of the solution.

Production-ready in record time

With this vision in place, our primary directive was speed to market, with the aim of releasing a production-ready solution in just 4 months. We carefully evaluated our priorities and functional requirements and ultimately decided that outsourcing infrastructure management was key. QuickSight, a fully managed, cloud-native BI service from AWS, was the only option that allowed us to deliver so quickly.

Just as importantly, our professional services team saves an extensive amount of time when implementing the solution and training customers. That means immediate value for the customer and more time for our professional services team to spend on other activities, increasing profitability. We sell the embedded dashboard service as a subscription-based add-on, so customers can easily purchase and use it.

Flexible and future-proof

Although many of our customers use traditional client/server configurations in their own data centers, our cloud-hosted solution is becoming increasingly popular, especially with increasing numbers of remote workers. We’re also developing a software as a service (SaaS) version of our suite and continue to acquire other vendors to add functionality. All these factors mean our QuickSight dashboard service needs to be platform-agnostic. It must work with any source application, whether in AWS or on premises.

We accomplished this using Amazon Simple Queue Service (Amazon SQS) and Amazon Simple Storage Service (Amazon S3). The source application emits events about finance accounts, vendors, and yearly budgets using Amazon SQS, with Amazon S3 available to ingest large messages that exceed the limits of Amazon SQS. We rely on AWS Lambda serverless functions to handle the ingestion and routing of the messages. Each customer has an individual reporting store, separate from the database of the source system.

This system transforms data from the customer’s system into a format that is normalized for QuickSight reporting. By pointing QuickSight at these schemas, we enable it to report on that data. The customer dashboard is embedded in the ERP application, so the customer doesn’t need to go to the QuickSight website to access it.

Any source application that can adhere to the messaging format can be reported on. The source system is responsible for the number-crunching, so any customizations the customer has applied are reflected in the reports.

The following diagram illustrates this architecture.


The high-level architecture is as follows:

  1. The source application sends a JSON message to the SQS queue. If the message is too large, the payload is added to an S3 bucket and the message contains a reference to it. The source can be any application, as long as it produces messages adhering to the predefined JSON schema.
  2. A Lambda consumer ingests a batch of messages, validates the payloads, and transforms them from JSON into the tenant's MySQL Aurora reporting database, which uses a star schema. The consumer can ingest small messages directly from the SQS event or retrieve large messages from Amazon S3 (see the sketch after this list).
  3. The QuickSight namespace for the tenant contains a dashboard created from a master template that points to the appropriate reporting database.
  4. The source application requests the dashboard on the user's behalf. The dashboard is embedded within the source application UI.
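The following is a minimal sketch of such a Lambda consumer; the s3_reference field and event_type key are hypothetical names for the large-message pointer and payload convention, and the actual validation and star-schema load logic are not shown in this post:

import json
import boto3

s3 = boto3.client('s3')

def handler(event, context):
    # Iterate over the batch of SQS records delivered to the function
    for record in event['Records']:
        payload = json.loads(record['body'])
        # Hypothetical convention: oversized messages carry an S3 pointer instead of the full body
        if 's3_reference' in payload:
            obj = s3.get_object(Bucket=payload['s3_reference']['bucket'],
                                Key=payload['s3_reference']['key'])
            payload = json.loads(obj['Body'].read())
        # Validation and the load into the tenant's reporting database would go here
        print('Processing message:', payload.get('event_type', 'unknown'))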

Because the system relies on Lambda functions, it’s a modern, decoupled architecture that is inherently future-proof and scalable. We don’t have to manage cloud or on-premises servers, and we only pay for what clients actually use.

Additionally, we were able to build a user interface that makes it easy to deploy new customers with just a few clicks. We use the installer to create the infrastructure for new clients using the AWS Command Line Interface (AWS CLI). The customer simply pushes a button from the source system to push data to the dashboard. They can be using the dashboard in less than an hour from start to finish.

Continuously increasing customer value

QuickSight has rolled out a lot of new features just in the short time we’ve been using it, and we’re already taking advantage of them. For example, QuickSight now remembers filter selections by user, so that the choices a user makes are remembered from session to session. That saves our customers time and effort and helps them get the information they need faster.

Embedded authoring is another significant feature that we’re looking forward to rolling out soon. As of this writing, we manage and maintain reporting templates for customers and push them out to clients using the AWS CLI. With the new embedded authoring capability of QuickSight, customers will be able to explore data in a self-service manner, perform ad hoc analysis, and create new dashboards. This will greatly increase the utility of the service while maintaining ease of use for customers and simplicity of management for our team. We’re also adopting the new namespace functionality to help customers maintain data separation from others in our multi-tenant solution.

Together today and tomorrow

Working with AWS has been a great experience. Our account representative has always been available for questions and feedback, which helped us succeed especially on such an accelerated timeframe. In addition to bringing QuickSight to our customers, we value the relationship we’ve developed with AWS and look forward to building on it as we move forward with our cloud solutions. Partnering with AWS has led to many benefits across our entire organization.

Marketing and sales teams in our organization are leading client demos with the QuickSight dashboard because it looks great and works seamlessly, and it’s something a lot of customers have been asking for. For department heads, executives, and other leaders, the ability to quickly visualize current and historical budget information is huge. They can also show their boards the information they need in a very easy-to-consume way. By giving customers one place to go for a high-level strategic view across their municipality, we’re helping them make better decisions and ultimately serve their constituents more effectively.


About the Author

Thomas Mancini is the VP, Concept Development at Edmunds GovTech.

Data monetization and customer experience optimization using telco data assets: Part 2

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/part-2-data-monetization-and-customer-experience-optimization-using-telco-data-assets/

Part 1 of this series explains the importance of building and implementing a customer experience (CX) management and data monetization strategy for telecom service providers (TSPs), and the major challenges driving these initiatives. It also includes an AWS CloudFormation template to set up a demonstration of the solution using AWS services. It covers transforming and enriching multiple datasets, and offers information about data standardization, baselining an analytics data model to marry different datasets like deep packet inspection (DPI) engine embedded Packet Switch (PS) probe, CRM, subscriptions, media, carrier, device, and network configuration management in the data warehouse with AWS Glue, AWS Lambda, and Amazon Redshift.

In this post, I demonstrate how you can enable data analysts, scientists, and advanced business users to query data from Amazon Redshift or Amazon Simple Storage Service (Amazon S3) directly. I also demonstrate configuring a simple drag-and-drop interface for self-service analytics so you can prepare and publish insights based on enriched data stored in Amazon Redshift or Amazon S3 through Amazon QuickSight.

Solution overview

The following diagram illustrates the workflow of the solution.

In part 1 of this series, we discuss the overall workflow. In this post, we focus on the following steps:

  1. Catalog the processed raw, aggregate, and dimension data in the AWS Glue Data Catalog using the DPI processed data crawler.
  2. Interactively query data directly from Amazon S3 using Amazon Athena and visualize in QuickSight.
  3. Enable self-service analytics using QuickSight to prepare and publish insights based on data residing in the Amazon Redshift cluster.

Querying data using Amazon Redshift

After creating your Amazon Redshift cluster, you can immediately run queries by using the query editor on the Amazon Redshift console. Complete the following steps:

  1. On the Amazon Redshift console, in the navigation pane, choose Clusters.

A cluster with the identifier <redshift database name>-<cloudformation stack> should be present. For this example, the cluster is cemdm-telco.

  2. Choose Editor.
  3. Enter the required credentials to connect to the Amazon Redshift query editor. (Database name, Database user, and Database password are the ones you entered while creating the CloudFormation stack.)

  4. Choose Connect to database.

Upon successful authentication, you’re directed to the query editor.

  5. Run a few queries to check if data is in the tables.

In the following code, <table-name> is the Amazon Redshift table name:

select count(1) from cemdm.<table-name>;

The following query extracts the unique subscriber count by age group for users with Apple devices browsing retail domain websites or apps in or around shopping malls. You can also extract the list of subscribers and micro-segment them by consumption (total data volume) or by adding KPIs like recency and frequency.

select 
  dcd.age_range, 
  count(distinct f.customer_id)as "Unique Subs Count"
from 
  cemdm.f_daily_dpi f
inner join cemdm.d_customer_demographics dcd on f.customer_id = dcd.customer_id
inner join cemdm.d_tac dt on f.tac_code = dt.tac_sid
inner join cemdm.d_device dd on dt.device_sid = dd.device_sid
inner join cemdm.d_dpi_dictionary ddd on f.protocol_id = ddd.app_id
inner join cemdm.d_location dl on f.location_id = dl.location_id
where 
  dd.device_manufacturer = 'Apple' 
and ddd.media_category = 'Retail' 
and location_tier_4 ilike '%mall%'
group by 1 
order by 2 desc;

The following screenshot shows the output.
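As a hedged sketch of the micro-segmentation idea mentioned above, the following query buckets subscribers by how many days they were active, using only the customer_id and date_id columns of the daily fact table (the thresholds are arbitrary examples):

select
  f.customer_id,
  count(distinct f.date_id) as active_days,
  case
    when count(distinct f.date_id) >= 25 then 'heavy'
    when count(distinct f.date_id) >= 10 then 'regular'
    else 'occasional'
  end as frequency_segment
from cemdm.f_daily_dpi f
group by 1
order by 2 desc;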

Unloading processed and enriched data from Amazon Redshift to Amazon S3

Amazon Redshift also includes Amazon Redshift Spectrum, which allows you to directly run SQL queries against exabytes of unstructured data in Amazon S3 data lakes. No loading or transformation is required, and you can use open data formats, including Avro, CSV, Ion, JSON, ORC, and Parquet. Amazon Redshift Spectrum automatically scales query compute capacity based on the data being retrieved, so queries against Amazon S3 run quickly, regardless of dataset size.

Amazon Redshift Spectrum gives you the freedom to store your data where you want, in the format you want, and have it available for processing when you need it. This is particularly helpful if you need to offload cold or historical data on Amazon Redshift to Amazon S3 in open data format. You can still access this data through Amazon Redshift via Amazon Redshift Spectrum plus any other application.

TSP data assets also include a lot of unstructured event data. This data is transient, and only valuable for a short amount of time. Therefore, you can leave it on Amazon S3 and access it from Amazon Redshift directly through Amazon Redshift Spectrum. You can use a lake house architecture approach, where hot, mostly static, and corporate data is in the warehouse, and the events data is in the data lake.

Alternatively, you can analyze data on Amazon S3 using Athena.

  1. Use the queries in the following table (in the Unload Statement column) in the Amazon Redshift query editor to unload data from Amazon Redshift to Amazon S3. For instructions, see Unloading data to Amazon S3. Provide the following information:
    • <aws-stack-name> – The name of the CloudFormation stack
    • <aws-region> – The Region in which you deployed the stack (for example, us-east-1)
    • <s3-bucket-name> – The bucket that you created while deploying the stack
    • <aws-account-id> – The AWS account ID in which you deployed the stack
    • <table-name> – The name of the Amazon Redshift table
Amazon Redshift Table | Unload Statement

f_raw_dpi

f_hourly_dpi

unload ('select * from  cemdm.<table-name>') 
       to 's3://<s3-bucket-name>/dpi/processed/<table-name>/' 
       iam_role 'arn:aws:iam::<aws-account-id>:role/RedshiftBasicCustom-<aws-region>-<aws-stack-name>' 
       ALLOWOVERWRITE
       PARQUET 
       PARTITION BY (date_id, hour_id);

f_daily_dpi
unload ('select * from  cemdm.<table-name>') 
       to 's3://<s3-bucket-name>/dpi/processed/f_daily_dpi/' 
       iam_role 'arn:aws:iam::<aws-account-id>:role/RedshiftBasicCustom-<aws-region>-<aws-stack-name>' 
       ALLOWOVERWRITE
       PARQUET 
       PARTITION BY (date_id);

d_customer_demographics

d_device

d_dpi_dictionary

d_location

d_operator_plmn

d_tac

d_tariff_plan

d_tariff_plan_desc

unload ('select * from  cemdm.<table-name>') 
   to 's3://<s3-bucket-name>/dpi/processed/<table-name>/' 
       iam_role 'arn:aws:iam::<aws-account-id>:role/RedshiftBasicCustom-<aws-region>-<aws-stack-name>' 
       ALLOWOVERWRITE
       PARQUET;

Alternatively, you can copy the Amazon Redshift AWS Identity and Access Management (IAM) role ARN to unload data to Amazon S3 from the console under the cluster’s properties.

  2. Verify that the data has been unloaded to Amazon S3 under <s3-bucket-name>/dpi/processed/.
  3. On the AWS Glue console, in the navigation pane, choose Crawlers.
  4. Select DPIProcessedDataCrawler.
  5. Choose Run crawler.

  6. Wait for the crawler to show the status Stopping.

The number of tables added by the DPIProcessedDataCrawler crawler should show as 11.

  7. Under Databases, choose Tables.
  8. Verify the following 11 tables are created under the cemdm database:
    • processed_f_raw_dpi
    • processed_f_hourly_dpi
    • processed_f_daily_dpi
    • processed_d_customer_demographics
    • processed_d_device
    • processed_d_dpi_dictionary
    • processed_d_location
    • processed_d_operator_plmn
    • processed_d_tac
    • processed_d_tariff_plan
    • processed_d_tariff_plan_desc

Visualizing data using QuickSight

QuickSight is a business analytics service you can use to build visualizations, perform one-time analysis, and get business insights from your data. For more information, see What Is Amazon QuickSight?

To connect QuickSight to Amazon Redshift as your data source, complete the following steps:

  1. Create a private connection from Amazon QuickSight to an Amazon Redshift cluster.

These steps involve using one of the private subnets that the CloudFormation stack already created. Use the private subnet that isn't used by the Amazon Redshift cluster for your QuickSight connection.

QuickSight provides out-of-the-box integration with Amazon Redshift, making it simple to query and visualize your Redshift data. For more information, see Creating a Dataset from an Autodiscovered Amazon Redshift Cluster or Amazon RDS Instance.

  2. For Schema, choose cemdm.
  3. For Tables, select f_daily_dpi.
  4. Choose Edit/Preview data.

  5. Add data and prepare the following table relationships on the Data Prep page. Use the information provided to create the relationships between the different tables:
Table A Name | Table A Attribute | Join Type | Table B Name | Table B Attribute
f_daily_dpi | customer_id | LEFT | d_tariff_plan | customer_id
f_daily_dpi | tac_code | INNER | d_tac | tac_sid
f_daily_dpi | sgsn_plmn_sid | INNER | d_operator_plmn | plmn_sid
f_daily_dpi | location_id | LEFT | d_location | location_id
f_daily_dpi | protocol_id | INNER | d_dpi_dictionary | app_id
f_daily_dpi | customer_id | LEFT | d_customer_demographics | customer_id
d_tariff_plan | tariff_plan_id | INNER | d_tariff_plan_desc | tariff_plan_id
d_tac | device_sid | INNER | d_device | device_sid

You can join d_operator_plmn with sgsn_plmn_sid and home_plmn_sid, but because the sample data only contains home subscriber data, a second join of f_raw_dpi data with d_operator_plmn on home_plmn_sid and plmn_sid is not present in the given relationship of tables.

The following screenshot shows the table relationships.

  6. Name your analysis CEMDM.
  7. Choose Save & visualize.

The following screenshots demonstrate a few QuickSight analyses created from the dataset we created. For more information about creating analyses in QuickSight, see Working with Analyses. You can divide all analyses across all the available attributes. We use the use case from part 1 of this series.

The following screenshot shows visualizations of user demographics on the Demographics tab.

The following screenshot shows visualizations of user interest on the Interest Analysis tab.

The following screenshot shows visualizations of user locations on the Location tab.

The following screenshot shows visualizations of device information on the Device tab.

The following screenshot shows visualizations of subscription information on the Subscriptions tab.

The following screenshot shows visualizations of roaming users on the Roaming tab.

The following screenshot shows visualizations on the Sub Details tab. You can drill down to subscriber-level details from any dashboard across any dimension or apply global-level filters to narrow down the desired segment.

You can also build these reports using Athena as a data connector. QuickSight provides out-of-the-box integration with Athena, which lets you run SQL queries on top of the metadata in your AWS Glue Data Catalog. For more information, see Creating a Dataset Using Amazon Athena Data.

You can also use Amazon Redshift metadata as a business glossary and visualize it using QuickSight with the following custom SQL:

SELECT * FROM (
  select 
    n.nspname as "Schema",c1.relname as "Table Name", c.attname as "Column Name", 'Attribute' as "Type",
    c.attnum as "Ordinal Position",typnotnull as "Is Not Null",typdefault as "Default Value", t.typname as "Data Type",
    split_part(d.description,'|',1) as "Category", 
    split_part(d.description,'|',2) as "Source",
    split_part(d.description,'|',3) as "Transient/Derived",
    split_part(d.description,'|',4) as "Is PII",
    split_part(d.description,'|',5) as "Is Business Sensitive",
    split_part(d.description,'|',6) as "Description"  
  from pg_catalog.pg_attribute c
  inner join pg_class c1 on c.attrelid=c1.oid
  inner JOIN pg_type t on t.oid=c.atttypid
  inner join pg_catalog.pg_namespace n on c1.relnamespace=n.oid
  inner join pg_catalog.pg_description d on d.objoid=c1.oid AND c.attnum = d.objsubid
  where n.nspname='cemdm' and c.attnum > 0
  UNION ALL
  select 
    pn.nspname as "Schema",pc.relname "Table Name",null as "Column Name", 'Table' as "Type", 
    null as "Ordinal Position",null as "Is Not Null",null as "Default Value",null as "Data Type",
    split_part(pd.description,'|',1) as "Category", 
    split_part(pd.description,'|',2) as "Source",
    split_part(pd.description,'|',3) as "Transient/Derived",
    split_part(pd.description,'|',4) as "Is PII",
    split_part(pd.description,'|',5) as "Is Business Sensitive",
    split_part(pd.description,'|',6) as "Description"
  from pg_catalog.pg_description pd 
  inner join pg_class pc on pd.objoid = pc.oid
  inner join pg_catalog.pg_namespace pn on pc.relnamespace = pn.oid
  where pn.nspname = 'cemdm' and pd.objsubid = 0
) x
order by "Table Name", nvl("Ordinal Position",0);

The following screenshot shows a sample visualization which you can build on QuickSight.

For more information about running custom Amazon Redshift SQL using Amazon QuickSight, see Using the Query Editor.

QuickSight allows you to create a template from an existing analysis. You can use the resulting template to create a dashboard. For more information, see Evolve your analytics with Amazon QuickSight’s new APIs and theming capabilities. You can also embed QuickSight dashboards into your own apps, websites, and wikis without the need to provision and manage users (readers) in QuickSight. For more information, see New in Amazon QuickSight – session capacity pricing for large scale deployments, embedding in public websites, and developer portal for embedded analytics.
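For reference, the following is a hedged sketch of fetching an embed URL with the AWS SDK for Python; the Region, account ID, and dashboard ID are placeholders, and anonymous embedding assumes session capacity pricing is enabled on the account:

import boto3

qs = boto3.client('quicksight', region_name='us-east-1')  # example Region

response = qs.get_dashboard_embed_url(
    AwsAccountId='123456789012',   # placeholder account ID
    DashboardId='<dashboard-id>',  # placeholder dashboard ID
    IdentityType='ANONYMOUS',
    Namespace='default',
    SessionLifetimeInMinutes=60
)
print(response['EmbedUrl'])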

Cleaning up

To avoid incurring future charges, delete the resources you created. Manually delete anything created outside of the CloudFormation stack and then the stack itself.

Conclusion

In this post, I demonstrated how data analysts, data scientists, and advanced business users can easily query multiple data sources and generate actionable insights including user interest profiles, segments, and micro-segments. Downstream systems like campaign management systems, customer care portals, and customer-facing applications; internal teams like retention, marketing, CX, and network; and workloads like machine learning can greatly benefit from the insights generated from this solution. You can automate these insights and integrate them with northbound systems, and trigger them based on a schedule or an event.

I also demonstrated how business users are empowered with self-service analytics to help them perform data exploration and publish ready-made insights in the form of dashboards. You can also create stories to drive data-heavy conversations based on enriched data stored in Amazon Redshift or Amazon S3.

Perceiving customer behavior across multiple touchpoints is the key for any business to thrive. And the essence of this solution is to capitalize on data and drive CX and monetization initiatives holistically across your organization. This framework allows you to accelerate your journey towards improving CX and generating new revenue streams by using existing data assets.

You can progressively augment this solution by adding additional data sources to evolve into a customer data platform hosting 360° profiles of individual subscribers correlated from multiple data sources. This solution can further support new and existing marketing, partnerships, loyalty, retention, network planning, and network optimization initiatives to drive revenue growth and improve profitability while keeping subscribers happy and loyal. It also helps you define an organization-wide standard for data visualization, self-service analytics, metadata discovery, and data marketplace.

For more ways to expand this solution, consider the following services:

  • AWS Data Exchange makes it easy to find, subscribe to, and use third-party data in the cloud. You can merge it with in-house data assets to span existing insights across multiple domains.
  • Amazon Pinpoint is a flexible and scalable outbound and inbound marketing communications service. You can connect with customers over channels like email, SMS, push, or voice. You can segment and micro-segment your campaign audience for the right customer and personalize your messages with the right content.

As always, AWS welcomes feedback. This is a wide-open space to explore, so reach out to us if you want to dive deep into understanding how you can build this solution and more on AWS. Please submit comments or questions in the comments section.


About the Author

Vikas Omer is an analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM) and data monetization, with over 11 years of experience in the telecommunications industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Building a cost efficient, petabyte-scale lake house with Amazon S3 lifecycle rules and Amazon Redshift Spectrum: Part 2

Post Syndicated from Cristian Gavazzeni original https://aws.amazon.com/blogs/big-data/part-2-building-a-cost-efficient-petabyte-scale-lake-house-with-amazon-s3-lifecycle-rules-and-amazon-redshift-spectrum/

In part 1 of this series, we demonstrated building an end-to-end data lifecycle management system integrated with a data lake house implemented on Amazon Simple Storage Service (Amazon S3) with Amazon Redshift and Amazon Redshift Spectrum. In this post, we address the ongoing operation of the solution we built.

Data ageing process after a month (ongoing)

Let’s assume a month has elapsed since walking through the use case in the last post, and old historical data was classified and tiered according to policy. You now need to enter the new monthly data generated into the lifecycle pipeline as follows:

  • June 2020 data – Produced and consolidated into Amazon Redshift local tables
  • December 2019 data – Migrated to Amazon S3
  • June 2019 data – Migrated from Amazon S3 to S3-IA
  • March 2019 data – Migrated from S3-IA to Glacier

The first step required is to increase the ageing counter of all the Parquet files (both midterm and shortterm prefixes), using aws s3api get-object-tagging to check the current value and increasing it by 1 with aws s3api put-object-tagging. This can be cumbersome if you have many objects, but you can automate it with AWS CLI scripts or SDKs like Boto3 (Python).
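For a single object, the equivalent AWS CLI calls look like the following; this is a hedged example that reuses the bucket and key names appearing later in this post, the value 7 is just an example of the incremented counter, and put-object-tagging replaces the whole tag set:

aws s3api get-object-tagging \
    --bucket rs-lakehouse-blog-post \
    --key extract_shortterm/green_tripdata_2019-10000.parquet

aws s3api put-object-tagging \
    --bucket rs-lakehouse-blog-post \
    --key extract_shortterm/green_tripdata_2019-10000.parquet \
    --tagging 'TagSet=[{Key=ageing,Value=7}]'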

The following is a simple Python script you can use to check the current tag settings for all keys in the prefix extract_shortterm:

from boto3 import client 
import re 
conn = client('s3') 
def printtags(mybucket, myprefix): 
    # Print the current ageing tag value for every Parquet object under the prefix
    for key in conn.list_objects(Bucket = mybucket, Prefix = myprefix)['Contents']: 
        if key['Key'].endswith('.parquet'): 
            tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet'] 
            stringa = str(tagset) 
            stringtag = (re.findall("\d+", stringa)) 
            tagvalue = int(stringtag[0]) 
            print((key['Key']), "ageing = ", tagvalue) 
    return 
#below set parameters bucket and prefix accordingly with your env 
printtags('rs-lakehouse-blog-post', 'extract_shortterm/')

This second Python script lists the current tag settings for all keys in the prefix extract_shortterm, increases the ageing tag by 1, and prints the keys with their new tag values. If other tags were added to these objects prior to this step, this new tag set overwrites the entire TagSet. The set object tagging operation is not an append, but a completely new PUT.

from boto3 import client 
import re
conn = client('s3') 
def updateTags(mybucket, myprefix): 
    # Increment the ageing tag of every Parquet object under the prefix
    for key in conn.list_objects(Bucket = mybucket, Prefix = myprefix)['Contents']: 
        if key['Key'].endswith('.parquet'): 
            tagset = conn.get_object_tagging(Bucket = mybucket, Key=key['Key'])['TagSet'] 
            stringa = str(tagset) 
            stringtag = (re.findall("\d+", stringa)) 
            tagvalue = int(stringtag[0]) 
            print((key['Key']), "Current ageing = ", tagvalue) 
            tagvalue = tagvalue+1 
            put_tags_response = conn.put_object_tagging(Bucket=mybucket, Key = key['Key'], Tagging = {'TagSet': [ { 'Key': 'ageing', 'Value': str(tagvalue) }, ] } ) 
    return 
#below set parameters bucket and prefix accordingly with your env 
updateTags('rs-lakehouse-blog-post', 'extract_shortterm/')
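Because put_object_tagging replaces the entire TagSet, a variant like the following preserves any other tags that may already be on an object; this is a hedged sketch that reuses the conn client from the script above:

def update_ageing_preserving_tags(mybucket, mykey):
    # Read the full tag set, bump only the ageing tag, and write everything back
    tagset = conn.get_object_tagging(Bucket=mybucket, Key=mykey)['TagSet']
    for tag in tagset:
        if tag['Key'] == 'ageing':
            tag['Value'] = str(int(tag['Value']) + 1)
    conn.put_object_tagging(Bucket=mybucket, Key=mykey, Tagging={'TagSet': tagset})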

To run the pipeline described before, you need to perform the following:

  1. Unload the December 2019 data.
  2. Apply the ageing tag with value 6.
  3. Add the new Parquet file as a new partition to the external table taxispectrum.taxi_archive.

For the relevant syntax, see part 1. You can automate these tasks using an AWS Lambda function on a monthly schedule.

Check the results with a query to the external table and don’t forget to remove unloaded items from the Amazon Redshift local table, as you did in part 1 of this series.

In this use case, we know exactly the mapping of June 2019 to the Amazon S3 key name because we used a specific naming convention. If your use case is different, you can use the two pseudo-columns automatically created in every Amazon Redshift external table: $path and $size. See the following code:

select pickup, 
    "$path", 
    "$size" 
from taxispectrum.taxi_archive 
where pickup between '2019-06-01 00:00:00' and '2019-06-30 23:59:59' 
limit 10
;

 The following screenshot shows our results.


We’re migrating the March 2019 Parquet file to Glacier, so you should remove the related partition from the AWS Glue Data Catalog:

ALTER TABLE taxispectrum.taxi_archive
DROP PARTITION (yearmonth='2019-03')
;

Right to be forgotten

One of the pillar rules of GDPR is the “right to be forgotten” rule—the ability for a customer or employee to request deletion of any personal data.

Implementing this feature for external tables on Amazon Redshift requires a different approach than for local tables, because external tables don’t support delete and update operations.

You can implement two different approaches.

In and out

In this first approach, you copy the external table to a temporary Amazon Redshift table, delete the specific rows, unload the temporary table to Amazon S3 overwriting the original key name, and drop the temporary (internal) table.

Let’s assume that the drivers in the dataset are identified with column pulocid. We want to delete all records related to a driver identified with pulocid 129 and who worked between October 2019 and November 2019.

  1. With the following code (run from Amazon Redshift), you can identify every single row and the specific Parquet file it is matched with:
    select pickup,
    	dropoff,
    	pulocid,
    	"$path"
    from taxispectrum.taxi_shortterm
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;

The following screenshot shows our results.


  2. When checking the applied tags, note the value associated with the ageing tag, or save the output of the following command in a temporary JSON file:
    aws s3api get-object-tagging \
    --bucket rs-lakehouse-blog-post \
    --key extract_shortterm/green_tripdata_2019-10000.parquet > \
    /tmp/oldtag.json

  3. Create a temporary table for each of the two Parquet files matching the query (October and November); the following steps show the October file:
    create table temporaryoct (like greentaxi);

  4. Copy the Parquet file to the local table, using the format as parquet attribute:
    copy temporaryoct
    from 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
    format as parquet
    ;

  5. Delete the records matching the "right to be forgotten" request criteria:
    delete from temporaryoct 
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;

  6. Overwrite the Parquet file with the UNLOAD command (note the allowoverwrite option):
    unload ('select * from temporaryoct') 
    to 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet' 
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' 
    parquet parallel off allowoverwrite
    ;

  7. Drop the temporary table:
    drop table temporaryoct;

In more complex use cases, user data might span multiple months, and our approach might not be effective. In these cases, using Spark to process and rewrite the Parquet could be a better and faster solution.

In other use cases, the number of records to be deleted could be a majority. If so, as an alternative to the delete and unload steps, you could use CREATE EXTERNAL TABLE AS (CTAS). CTAS creates an external table based on the column definition from a query and writes the results of that query on Amazon S3.

Edit your own

The second option is to use an external editor to access the Amazon S3 file and remove specific records. You could use a Spark script with the following steps:

  1. Create a DataFrame.
  2. Import a Parquet file in memory.
  3. Remove records matching your criteria.
  4. Overwrite the same Amazon S3 key with the new data.
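The following is a minimal PySpark sketch of these steps, reusing the file and filter criteria from the previous example; writing to a staging prefix before copying the result back over the original key is an assumption of this sketch, because overwriting the exact object being read can be unsafe:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("right-to-be-forgotten").getOrCreate()

source_key = "s3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet"
staging_prefix = "s3://rs-lakehouse-blog-post/extract_shortterm_staging/"  # example staging location

df = spark.read.parquet(source_key)
# Keep every record except the ones matching the "right to be forgotten" request
cleaned = df.filter(~((F.col("pulocid") == 129) &
                      (F.col("pickup").between("2019-10-01 00:00:00", "2019-11-30 23:59:59"))))
cleaned.write.mode("overwrite").parquet(staging_prefix)
# Copy the staged output back over the original key (for example with aws s3 cp) when done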

Building a simple data ageing dashboard

Sometimes data temperature is very predictable and based on ageing, but in some cases, especially when data is originated and accessed from different entities, it’s not easy to build a model to fit the best storage transition strategy. For these scenarios, you can use Amazon S3 analytics storage class analysis and Amazon S3 access logs.

Storage class analysis observes data access patterns for a filtered set of data over a period of time. You can use the analysis results to help you improve your lifecycle policies. You can configure storage class analysis to analyze all the objects in a bucket, or configure filters to group objects together for analysis by a common prefix (objects that have names that begin with a common string), by object tags, or by both prefix and tags. Filtering by object groups is the best way to benefit from storage class analysis.

To achieve a better understanding of how data is accessed (and who accessed it, and when) and build a custom tiering strategy, you can use Amazon S3 access logs. This feature doesn’t have any additional costs, but log retention incurs Amazon S3 storage costs. You first define a recipient to store the logs.

  1. Create a new bucket named rs-lakehouse-blog-post-logs.
  2. To set up S3 Server access logging on the source bucket rs-lakehouse-blog-post on the Amazon S3 console, on the Properties tab, choose Server access logging.
  3. For Target bucket, enter rs-lakehouse-blog-post-logs.
  4. For Target prefix, leave blank.
  5. Choose Save.


Let’s assume that after a few days of activity, you want to discover how users and applications accessed the data.

  1. On the Amazon Redshift console, create an external table to map the Amazon S3 access logs:
    CREATE EXTERNAL TABLE taxispectrum.s3accesslogs(
        BucketOwner                   varchar(256), 
        Bucket                        varchar(256), 
        RequestDateTime               varchar(256), 
        RemoteIP                      varchar(256), 
        Requester                     varchar(256), 
        RequestID                     varchar(256), 
        Operation                     varchar(256), 
        Key                           varchar(256), 
        RequestURI_operation          varchar(256),
        RequestURI_key                varchar(256),
        RequestURI_httpProtoversion   varchar(256),
        HTTPstatus                    varchar(256), 
        ErrorCode                     varchar(256), 
        BytesSent                     varchar(256), 
        ObjectSize                    varchar(256), 
        TotalTime                     varchar(256), 
        TurnAroundTime                varchar(256), 
        Referrer                      varchar(256), 
        UserAgent                     varchar(256), 
        VersionId                     varchar(256)
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
        'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
    )
    STORED AS TEXTFILE
    LOCATION 's3://rs-lakehouse-blog-post-logs/'
    ;

  2. Check the AWS Identity and Access Management (IAM) policy S3-Lakehouse-Policy created in part 1 and add the following two lines to the JSON definition file:
    "arn:aws:s3:::rs-lakehouse-blog-post-logs",
    "arn:aws:s3:::rs-lakehouse-blog-post-logs/*"

  3. Query a few columns:
    select bucket, 
        key, 
        requestdatetime, 
        remoteip, 
        operation, 
        useragent 
    from taxispectrum.s3accesslogs
    ;

The following screenshot shows our results.


This report is a starting point for both auditing purposes and for analyzing access patterns. As a next step, you could use a business intelligence (BI) visualization solution like Amazon QuickSight and create a dataset in order to create a dashboard showing the most and least accessed files.

Cleaning up

To clean up your resources after walking through this post, complete the following steps:

  1. Delete the Amazon Redshift cluster without the final cluster snapshot:
    aws redshift delete-cluster --cluster-identifier redshift-cluster-1 --skip-final-cluster-snapshot

  2. Delete the schema and table defined in AWS Glue:
    aws glue delete-table \
        --database-name blogdb \
        --name taxi_archive 
        
    aws glue delete-table \
        --database-name blogdb \
        --name s3accesslogs 
        
    aws glue delete-database --name blogdb

  3. Delete the S3 buckets and all their content:
    aws s3 rb s3://rs-lakehouse-blog-post --force
    aws s3 rb s3://rs-lakehouse-blog-post-logs --force 

Conclusion

In the first post in this series, we demonstrated how to implement a data lifecycle system for a lake house using Amazon Redshift, Redshift Spectrum, and Amazon S3 lifecycle rules. In this post, we focused on how to operationalize the solution with automation scripts (with the AWS Boto3 library for Python) and S3 Server access logs.


About the Authors

Cristian Gavazzeni is a senior solution architect at Amazon Web Services. He has more than 20 years of experience as a pre-sales consultant focusing on Data Management, Infrastructure and Security. During his spare time he likes eating Japanese food and travelling abroad with only fly and drive bookings.

 

 

Francesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after which he worked in Italy, Switzerland, and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has strong experience in systems integration and the design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records, and playing bass.

Centrally tracking dashboard lineage, permissions, and more with Amazon QuickSight administrative dashboards

Post Syndicated from Jesse Gebhardt original https://aws.amazon.com/blogs/big-data/centrally-tracking-dashboard-lineage-permissions-and-more-with-amazon-quicksight-administrative-dashboards/

This post is co-written with Shawn Koupal, an Enterprise Analytics IT Architect at Best Western International, Inc.

A common ask from Amazon QuickSight administrators is to understand the lineage of a given dashboard (what analysis it is built from, what datasets are used in the analysis, and what data sources those datasets use). QuickSight APIs allow us to capture the metadata from each asset and build a complete picture of the linkages between them. As a QuickSight administrator, you can build a dashboard that displays the lineage from dashboard to data source, along with the permissions for each asset type. It can be helpful to see all permissions assigned to each of your assets as well as the relationships between them, all in one place.

Solution overview

In this solution, you build an end-to-end data pipeline using QuickSight to ingest data from an AWS Glue table.

The following diagram illustrates the architecture of the solution.

You can invoke the QuickSight APIs via the AWS Software Development Kit (AWS SDK) or the AWS Command Line Interface (AWS CLI). For this post, we use the AWS SDK.

The solution starts with an AWS Lambda function that calls the QuickSight list APIs (list_data_sources, list_data_sets, list_analyses, list_templates, and list_dashboards) depending on the event message to build lists of assets in chunks of 100, which are iterated through by a second Lambda function. The reason for splitting the work into two functions is to work around the 15-minute time limit in Lambda. You can schedule the Lambda function to run on each asset type based on an event rule trigger. See the following code:

import boto3
import os
import time
import datetime
import json
AWS_ACCOUNT_ID=os.environ['AWS_ACCOUNT_ID']
AWS_REGION=os.environ['AWS_REGION']
QS_S3_BUCKET=os.environ['QS_S3_BUCKET']
DownloaderFunctionName=os.environ['DownloaderFunctionName']
client = boto3.client('quicksight', region_name=AWS_REGION)
lambda_client = boto3.client('lambda')
def invoke_downloader(iteration, apicall, list_results):
    apicall=apicall.replace("list_data_sources","datasource").replace("list_data_sets","dataset").replace("list_analyses","analysis").replace("list_dashboards","dashboard").replace("list_templates","template")
    msg = {"Iteration": iteration, "api": apicall, "Objects":  list_results }
    invoke_response = lambda_client.invoke(FunctionName=DownloaderFunctionName,
                                           InvocationType='Event',
                                           Payload=json.dumps(msg, default=datetime_handler))
def datetime_handler(x):
    if isinstance(x, datetime.datetime):
        return x.isoformat()
    raise TypeError("Unknown type")
def file_cleanup(apicall):
    #Replace the apicall with the S3 folder name
    object_type=apicall.replace("list_data_sources","datasource").replace("list_data_sets","dataset").replace("list_analyses","analysis").replace("list_dashboards","dashboard").replace("list_templates","template")
    
    s3_path='quicksight_lineage/'+object_type+'/'
    s3_path2='quicksight_lineage/'+object_type+'_permissions/'
    fileprefix="QuickSight_"+object_type
    botoSession = boto3.Session (region_name = 'us-west-2')
    s3_session = botoSession.resource('s3')
    bucket = s3_session.Bucket(QS_S3_BUCKET)
    #Delete Any files with prefix in s3_path and s3_path2
    bucket.objects.filter(Prefix=s3_path+fileprefix).delete()
    bucket.objects.filter(Prefix=s3_path2+fileprefix).delete()
def lambda_handler(event, context):
    if event == {}:
        #Call All APIs assests 
        apicall_list=['list_data_sources','list_data_sets','list_analyses','list_dashboards','list_templates']
    elif  event["api"] == 'datasource':
        apicall_list=['list_data_sources']
    elif event["api"] == 'dataset':
        apicall_list=['list_data_sets']
    elif event["api"] == 'analysis':
        apicall_list=['list_analyses']
    elif event["api"] == 'dashboard':
        apicall_list=['list_dashboards']
    elif event["api"] == 'template':
        apicall_list=['list_templates']
    else:
        print("[WARN] Exception: Invalid Event Type.")
        return
    for apicall in apicall_list: 
        try:
            #Clean up files from previous run
            file_cleanup(apicall)
            #Reset variables for each apicall
            iteration=0
            user_token = None
            list_results={}
            while True:
                iteration+=1
                print("Calling ",apicall, iteration)
                
                if user_token is None:
                    exec("""results=client."""+apicall+"""(AwsAccountId='"""+AWS_ACCOUNT_ID+"""', MaxResults=100);""",globals(), list_results)
                else:
                    exec("""results=client."""+apicall+"""(AwsAccountId='"""+AWS_ACCOUNT_ID+"""', MaxResults=100,NextToken='"""+user_token+"""');""",globals(), list_results)
                invoke_downloader(iteration, apicall, list_results["results"])
                user_token=list_results["results"]["NextToken"]
                print(user_token)
        except KeyError:
            print("NextToken not found.")
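The event passed by the scheduling rule selects which asset type to process. Based on the handler's checks above, a rule could supply a constant input such as the following (an empty object {} processes all asset types):

{ "api": "dashboard" }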

The second Lambda function consumes the list of assets from the event parameter from the first function and uses the QuickSight describe APIs (describe_datasource, describe_dataset, describe_analysis, describe_template, and describe_dashboard). The details of each QuickSight asset are written to CSV files in an Amazon Simple Storage Service (Amazon S3) bucket in groups of 100. Because the first function calls the second function in parallel, it’s recommended to set the reserved concurrency to 2 in the second Lambda function to avoid throttling errors (if you use the AWS CloudFormation template provided later in this post, this is automatically configured for you). See the following code:

import boto3
import os
import time
import datetime
import json

AWS_ACCOUNT_ID=os.environ['AWS_ACCOUNT_ID']
AWS_REGION=os.environ['AWS_REGION']
QS_S3_BUCKET=os.environ['QS_S3_BUCKET']

client = boto3.client('quicksight', region_name=AWS_REGION)
lambda_client = boto3.client('lambda')
s3 = boto3.client('s3')

def process_dashboards(list_dashboard,iteration,object_type):
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("DashboardId,Name,SourceEntityArn,VersionCreatedTime,VersionNumber,CreatedTime,DataSetArns,LastPublishedTime,LastUpdatedTime" + '\n')

    for dashboard in list_dashboard["DashboardSummaryList"]:
        dashboard_desc= client.describe_dashboard(AwsAccountId=AWS_ACCOUNT_ID,DashboardId=dashboard["DashboardId"])
        
        source_entity_arn = dashboard_desc["Dashboard"]["Version"]["SourceEntityArn"]
        version_created_time = dashboard_desc["Dashboard"]["Version"]["CreatedTime"].isoformat()
        version_number = str(dashboard_desc["Dashboard"]["Version"]["VersionNumber"])
        created_time = dashboard_desc["Dashboard"]["CreatedTime"].isoformat()

        last_published_time = dashboard_desc["Dashboard"]["LastPublishedTime"].isoformat()
        last_updated_time = dashboard_desc["Dashboard"]["LastUpdatedTime"].isoformat()
        try:
            for arn in dashboard_desc["Dashboard"]["Version"]["DataSetArns"]:
                f.write(dashboard["DashboardId"]+',"'+ dashboard["Name"] + '",' + source_entity_arn + ',' + version_created_time + ',' + version_number + ','   + created_time + ','+ arn + ',' + last_published_time + ',' + last_updated_time +'\n')
        except Exception as e:
            print(e)
            dataset_arn=''
            f.write(dashboard["DashboardId"]+',"'+ dashboard["Name"] + '",' + source_entity_arn + ',' + version_created_time + ',' + version_number + ','   + created_time + ','+ dataset_arn + ',' + last_published_time + ',' + last_updated_time +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            
        

def process_dashboards_permissions(list_dashboard,iteration,object_type):

    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("DashboardId,Name,Principal,Permission,Iteration" + '\n')
	
    for dashboard in list_dashboard["DashboardSummaryList"]:

        try:
            list_permissions = client.describe_dashboard_permissions(AwsAccountId=AWS_ACCOUNT_ID,DashboardId=dashboard["DashboardId"])
        except:
            print("Error Listing Permissions for:"+dashboard["DashboardId"])
            continue

        for permission in list_permissions["Permissions"]:
            #If Action includes delete operation then principal has co-owner permissions
            if "quicksight:DeleteDashboard" in permission["Actions"]:
                action = "Co-Owner"
            else:
                action = "View"

            f.write(dashboard["DashboardId"]+',"'+ dashboard["Name"] + '",' + permission["Principal"] +  ',' + action +  ',' + iteration +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)      


def process_analysis(list_analyses,iteration,object_type):
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("AnalysisId,Name,AnalysisArn,CreatedTime,LastUpdatedTime,DataSetArn,Iteration" + '\n')

    for analysis in list_analyses["AnalysisSummaryList"]:
        #Call describe_analysis
        analysis_desc= client.describe_analysis(AwsAccountId=AWS_ACCOUNT_ID,AnalysisId=analysis["AnalysisId"])

        analysis_arn = analysis_desc["Analysis"]["Arn"]
        created_time = analysis_desc["Analysis"]["CreatedTime"].isoformat()
        last_updated_time = analysis_desc["Analysis"]["LastUpdatedTime"].isoformat()

        try:
            for arn in analysis_desc["Analysis"]["DataSetArns"]:
                f.write(analysis["AnalysisId"]+',"'+ analysis["Name"] + '",' + analysis_arn + ',' + created_time + ','  + last_updated_time + ',' + arn + ',' + iteration  +'\n')
        except Exception as e:
            print(e)
            dataset_arn=''
            f.write(analysis["AnalysisId"]+',"'+ analysis["Name"] + '",' + analysis_arn + ',' + created_time + ','  + last_updated_time + ',' + dataset_arn  + ',' + iteration +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            

        
def process_analysis_permissions(list_analyses,iteration,object_type):

    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("AnalysisId,Name,Principal,Permission,Iteration" + '\n')
	
    for analysis in list_analyses["AnalysisSummaryList"]:

        try:
            list_permissions = client.describe_analysis_permissions(AwsAccountId=AWS_ACCOUNT_ID,AnalysisId=analysis["AnalysisId"])
        except:
            print("Error Listing Permissions for:"+analysis["AnalysisId"])
            continue
        for permission in list_permissions["Permissions"]:
            #If Action includes delete operation then principal has co-owner permissions
            if "quicksight:DeleteAnalysis" in permission["Actions"]:
                action = "Co-Owner"
            else:
                action = "View"

            f.write(analysis["AnalysisId"]+',"'+ analysis["Name"] + '",' + permission["Principal"] +  ',' + action +  ',' + iteration +'\n')
    
    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)      


def process_templates(list_templates,iteration,object_type):
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("TemplateId,Name,TemplateArn,CreatedTime,LastUpdatedTime,SourceEntityArn,VersionNumber,Iteration" + '\n')

    for template in list_templates["TemplateSummaryList"]:
        #Call describe_template
        template_desc= client.describe_template(AwsAccountId=AWS_ACCOUNT_ID,TemplateId=template["TemplateId"])

        template_arn = template_desc["Template"]["Arn"]
        created_time = template_desc["Template"]["CreatedTime"].isoformat()
        last_updated_time = template_desc["Template"]["LastUpdatedTime"].isoformat()
        source_entity_arn = template_desc["Template"]["Version"]["SourceEntityArn"]
        version_number = str(template_desc["Template"]["Version"]["VersionNumber"])
        f.write(template["TemplateId"]+',"'+ template["Name"] + '",' + template_arn + ',' + created_time + ','  + last_updated_time + ',' + source_entity_arn + ',' + version_number +  ',' + iteration  +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            

        
def process_templates_permissions(list_templates,iteration,object_type):

    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("TemplateId,Name,Principal,Permission,Iteration" + '\n')
	
    for template in list_templates["TemplateSummaryList"]:

        try:
            list_permissions = client.describe_template_permissions(AwsAccountId=AWS_ACCOUNT_ID,TemplateId=template["TemplateId"])
        except:
            print("Error Listing Permissions for:"+template["TemplateId"])
            continue
        for permission in list_permissions["Permissions"]:
            #If Action includes delete operation then principal has co-owner permissions
            if "quicksight:DeleteTemplate" in permission["Actions"]:
                action = "Co-Owner"
            else:
                action = "View"

            f.write(template["TemplateId"]+',"'+ template["Name"] + '",' + permission["Principal"] +  ',' + action +  ',' + iteration +'\n')
    
    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)      


def process_datasources(list_data_sources,iteration,object_type):
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("DataSourceId,DataSourceArn,Name,Type,LastUpdatedTime,CreatedTime,Status,ErrorInfo,Iteration" + '\n')

    global datasource_list
    datasource_list=[]
    for datasource in list_data_sources["DataSources"]:
        datasource_id=datasource["DataSourceId"]
        name=datasource["Name"]
        datasource_type=datasource["Type"]
        try:
            status=datasource["Status"]
        except:
            status=''
        CreatedTime=str(datasource["CreatedTime"])
        LastUpdatedTime=str(datasource["LastUpdatedTime"])
        try:
            ErrorInfo="Type: "+datasource["ErrorInfo"]["Type"]+" Message: "+datasource["ErrorInfo"]["Message"]
        except:
            ErrorInfo="Null"

        f.write( datasource_id + ',' + datasource["Arn"] + ',"' + name + '",'  + datasource_type + ',' + LastUpdatedTime+ ',' + CreatedTime + ',' + status + ',' + ErrorInfo+ ',' + iteration +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            


def process_datasources_permissions(list_data_sources,iteration,object_type):
    
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write("DataSourceID,Name,Principal,Permission,Iteration" + '\n')

    for datasource in list_data_sources["DataSources"]:
        try:
            list_permissions = client.describe_data_source_permissions(AwsAccountId=AWS_ACCOUNT_ID,DataSourceId=datasource["DataSourceId"])
        except:
            print("Error Listing Permissions for:"+datasource["DataSourceId"])
            continue
        for permission in list_permissions["Permissions"]:
            #If Action includes delete operation then principal has co-owner permissions
            if "quicksight:DeleteDataSource" in permission["Actions"]:
                action = "Co-Owner"
            else:
                action = "View"
                
            f.write(datasource["DataSourceId"]+',"'+ datasource["Name"] + '",' + permission["Principal"] +  ',' + action +  ',' + iteration +'\n')
    
    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            
    

def process_datasets(list_datasets,iteration,object_type):

    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    #CSV Header
    f.write('DatasetId,DataSetArn,Name,SpiceSize,ImportMode,LastUpdatedTime,CreatedTime,DataSourceArn,DataSourceName,DataSourceType,Source,Columns,Iteration' + '\n')
    
    for dataset in list_datasets["DataSetSummaries"]:
        
        try:
            response= client.describe_data_set(AwsAccountId=AWS_ACCOUNT_ID,DataSetId=dataset["DataSetId"])
        except Exception as e:
            print("Dataset ID: ", dataset["DataSetId"], e)
            f.write( dataset["DataSetId"] + ',' + dataset["Arn"] + ',"' + dataset["Name"] + '",' + '0' + ',' + dataset["ImportMode"] + ',' + str(dataset["LastUpdatedTime"])+ ','+ str(dataset["CreatedTime"])+ ',' + 'n/a' + ',"' + 'n/a' + '",' +  'n/a'  + ',' +  'n/a' + ',"'  + 'n/a'+ '",' + iteration +'\n')
            continue

        dataset_id=response["DataSet"]["DataSetId"]
        dataset_name=response["DataSet"]["Name"]
        dataset_size=response["DataSet"]["ConsumedSpiceCapacityInBytes"]
        ImportMode=response["DataSet"]["ImportMode"]
        LastUpdatedTime=response["DataSet"]["LastUpdatedTime"].isoformat()
        CreatedTime=response["DataSet"]["CreatedTime"].isoformat()

        #Default in case the physical table map cannot be parsed before Columns is set
        Columns=''
        try:
            for key in response["DataSet"]["PhysicalTableMap"].keys():
                
                if key == 's3PhysicalTable':
                    
                    source='S3Source'
                    DataSourceArn=response["DataSet"]["PhysicalTableMap"]["s3PhysicalTable"]["S3Source"]["DataSourceArn"]
                    Columns=response["DataSet"]["PhysicalTableMap"]["s3PhysicalTable"]["S3Source"]["InputColumns"]
                    #SqlQuery="Null"

                else:

                    try:
                        DataSourceArn=response["DataSet"]["PhysicalTableMap"][key]["RelationalTable"]["DataSourceArn"]
                        Columns=""
                        source="VisualEditor"
                    except:
                        DataSourceArn=response["DataSet"]["PhysicalTableMap"][key]["CustomSql"]["DataSourceArn"]
                        Columns=response["DataSet"]["PhysicalTableMap"][key]["CustomSql"]["Columns"]
                        source="CustomSql"

                DataSourceName=""
                DataSourceType=""
                
                f.write( dataset_id + ',' + dataset["Arn"] + ',"' + dataset_name + '",' + str(dataset_size) + ',' + ImportMode + ',' + LastUpdatedTime+ ','+ CreatedTime+ ',' + DataSourceArn + ',"' + DataSourceName + '",' +  DataSourceType  + ',' +  source + ',"'  + str(Columns) + '",' + iteration +'\n')
                
        except Exception as e:
            print("[DEBUG]: Exception in main write for: " + str(dataset) + " " + str(e))
            f.write( dataset_id  + ',' + dataset["Arn"] +',"' + dataset_name + '",' + str(dataset_size) + ',' + ImportMode + ',' + LastUpdatedTime+ ',' + CreatedTime + ',,,,Unknown,"'  + str(Columns) + '",' + iteration +'\n')

    f.close()
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)


def process_datasets_permissions(list_datasets,iteration,object_type):
    
    filename="QuickSight_"+object_type+"_"+iteration+".csv"
    filePath = os.path.join("/tmp",filename)
    f=open(filePath,"w")
    f.write('DataSetID,Name,Principal,Permission,Iteration'+'\n')

    for dataset in list_datasets["DataSetSummaries"]:
        try:
            list_permissions = client.describe_data_set_permissions(AwsAccountId=AWS_ACCOUNT_ID,DataSetId=dataset["DataSetId"])
        except:
            print("Error Listing Permissions for:"+dataset["DataSetId"])
            continue
        
        for permission in list_permissions["Permissions"]:
            #If Action includes delete operation then principal has co-owner permissions
            if "quicksight:DeleteDataSet" in permission["Actions"]:
                action = "Co-Owner"
            else:
                action = "View"
                
            f.write(dataset["DataSetId"]+',"'+ dataset["Name"] + '",' + permission["Principal"] +  ',' + action+  ',' + iteration +'\n')

    f.close()        
    s3_path='quicksight_lineage/'+object_type+'/'
    s3.upload_file("{}/{}".format("/tmp",  filename), QS_S3_BUCKET, s3_path + filename)            


def lambda_handler(event, context):

    list_objects=event["Objects"]
    iteration=str(event["Iteration"])
    
    print("Iteration: ", iteration)
    print("[INFO]Processing QuickSight:", event["api"] )
    
    if  event["api"] == 'datasource':
        process_datasources(list_objects, iteration, event["api"])
        process_datasources_permissions(list_objects, iteration, event["api"]+'_permissions')
    elif event["api"] == 'dataset':
        process_datasets(list_objects, iteration, event["api"])
        process_datasets_permissions(list_objects, iteration, event["api"]+'_permissions')
    elif event["api"] == 'analysis':
        process_analysis(list_objects, iteration, event["api"])
        process_analysis_permissions(list_objects, iteration, event["api"]+'_permissions')
    elif event["api"] == 'dashboard':
        process_dashboards(list_objects, iteration, event["api"])
        process_dashboards_permissions(list_objects, iteration, event["api"]+'_permissions')
    elif event["api"] == 'template':
        process_templates(list_objects, iteration, event["api"])
        process_templates_permissions(list_objects, iteration, event["api"]+'_permissions')
    else:
        print("[WARN] Exception: Invalid Event Type.")
        return

Afterwards, the S3 bucket has the directory structure under the quicksight_lineage folder as shown in the following screenshot.

You then use AWS Glue to store the metadata of each file in an AWS Glue table, which allows you to query the information from QuickSight using an Amazon Athena or Amazon Redshift Spectrum data source (if you run the CloudFormation stack, the tables are set up for you).
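Because these are standard Data Catalog tables, you can also query them programmatically with Athena. The following is a minimal sketch using Boto3; it assumes the AWS Glue database is named quicksight-lineage (matching the queries later in this post), and the query result location is a placeholder you would replace with your own bucket.

import time
import boto3

athena = boto3.client("athena")

# Placeholders: point the output location at your own Athena query result bucket
DATABASE = "quicksight-lineage"
OUTPUT_LOCATION = "s3://my-athena-query-results/quicksight-lineage/"

query = 'SELECT dashboardid, name, lastpublishedtime FROM "quicksight-lineage"."dashboard" LIMIT 10'

# Start the query and wait for it to reach a terminal state
execution_id = athena.start_query_execution(
    QueryString=query,
    QueryExecutionContext={"Database": DATABASE},
    ResultConfiguration={"OutputLocation": OUTPUT_LOCATION},
)["QueryExecutionId"]

state = "RUNNING"
while state in ("QUEUED", "RUNNING"):
    time.sleep(1)
    state = athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=execution_id)["ResultSet"]["Rows"]
    for row in rows[1:]:  # skip the header row
        print([col.get("VarCharValue") for col in row["Data"]])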

The following diagram shows the tables and relationships.

Walkthrough overview

The workflow includes the following high-level steps:

  1. Deploy the CloudFormation template to build the Lambda functions, AWS Identity and Access Management (IAM) roles, S3 bucket, AWS Glue database, and AWS Glue tables.
  2. Run the Python Lambda functions to build CSV files that contain the QuickSight object details.
  3. Visualize the data in QuickSight. To do so, you must create your data source, dataset, and then analysis.

For this post, we use Athena as the query engine. To use Redshift Spectrum, you must modify the provided queries.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • An IAM user with access to AWS resources used in this solution (CloudFormation, IAM, Amazon S3, AWS Glue, Athena, QuickSight)
  • Athena configured with a query result location
  • QuickSight Enterprise Edition

Creating resources

Create your resources by launching the following CloudFormation stack:

During the stack creation process, you must provide an S3 bucket name in the S3BucketName parameter (AWSAccountNumber is appended to the bucket name provided to make it unique).
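If you prefer to launch the stack from a script rather than the console, a sketch like the following could work with Boto3; the stack name and bucket name are your choice, and the TemplateURL is a placeholder for the template behind the Launch Stack button.

import boto3

cloudformation = boto3.client("cloudformation")

# Placeholders: use the template URL behind the Launch Stack button and your own names
stack_name = "quicksight-lineage"
response = cloudformation.create_stack(
    StackName=stack_name,
    TemplateURL="https://example-bucket.s3.amazonaws.com/quicksight-lineage.template.yaml",
    Parameters=[
        {"ParameterKey": "S3BucketName", "ParameterValue": "my-quicksight-lineage"},
    ],
    # The stack creates IAM roles and policies, so acknowledge the IAM capability
    Capabilities=["CAPABILITY_NAMED_IAM"],
)
print("Creating stack:", response["StackId"])

# Block until the stack finishes creating
cloudformation.get_waiter("stack_create_complete").wait(StackName=stack_name)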

After the stack creation is successful, you have two Lambda functions, two S3 buckets, an AWS Glue database and tables, and the corresponding IAM roles and policies.

Running the Lambda function

To run your Lambda function, complete the following steps:

  1. On the Lambda console, navigate to the QuickSight-Lineage-Dispatcher function.
  2. From the Select a test event menu, choose Configure test events.

  1. Select Create new test event.

You create one test event for all QuickSight assets.

  1. For Event name, enter all.
  2. Enter an empty JSON object ({}).

  1. Choose Test to run the Lambda function and generate CSV files of the assets.

Alternatively, you can create test events for each QuickSight object (Data Source, DataSet, Analysis, Dashboard, and Template) for larger QuickSight environments:

  • Test event DataSource code:
    {
      "api": "datasource"
    }

  • Test event DataSet code:
    {
      "api": "dataset"
    }

  • Test event Analysis code:
    {
      "api": "analysis"
    }

  • Test event Dashboard code:
    {
      "api": "dashboard"
    }

  • Test event Template code:
    {
      "api": "template"
    }
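You can also pass these same payloads to the function from a script instead of console test events. The following is a small sketch with Boto3, assuming the QuickSight-Lineage-Dispatcher function name created by the stack.

import json
import boto3

lambda_client = boto3.client("lambda")

# Invoke the dispatcher once per asset type with the same payloads as the test events
for api in ["datasource", "dataset", "analysis", "dashboard", "template"]:
    response = lambda_client.invoke(
        FunctionName="QuickSight-Lineage-Dispatcher",
        InvocationType="Event",  # asynchronous; use "RequestResponse" to wait for each run
        Payload=json.dumps({"api": api}),
    )
    print(api, response["StatusCode"])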

The following screenshot shows the configuration of a test event for Analysis.

Creating your data source and lineage data set

In this step, you use QuickSight to access the tables in your AWS Glue database.

  1. Log in to QuickSight.
  2. Choose Manage QuickSight.
  3. Choose Security & permissions.
  4. Ensure that access to the S3 bucket (that was created through CloudFormation) is enabled.
  5. Choose New analysis.
  6. Choose New dataset.
  7. For the data source, choose Athena.

  1. For your data source name, enter QuickSight-Lineage.
  2. Choose Create data source.

QuickSight prompts you to select your schema or database.

  1. Choose Use custom SQL.

  1. Update the query name from New custom SQL to QuickSight Lineage.
  2. Enter the following code into the query box:
    select 
       a.analysisid      as analysis_id,
       a.name            as analysis_name,
       a.analysisarn     as analysis_arn,
       date_parse(substr(a.createdtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')     as analysis_createdtime,
       date_parse(substr(a.lastupdatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as analysis_lastupdatedtime,
       a.datasetarn      as analysis_datasetarn,
       r.dashboardid        as dashboard_id,
       r.name               as dashboard_name,
       r.name||' - ID: '||r.dashboardid as dashboard_name_w_id,
       date_parse(substr(r.versioncreatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as dashboard_versioncreatedtime,
       r.versionnumber      as dashboard_versionnumber     ,
       date_parse(substr(r.createdtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')  as dashboard_createdtime,
       date_parse(substr(r.lastpublishedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as dashboard_lastpublishedtime ,
       date_parse(substr(r.lastupdatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as dashboard_lastupdatedtime,
       d.datasetid        as dataset_id,
       d.datasetarn       as dataset_arn,
       d.name             as dataset_name,
       d.spicesize        as dataset_spicesize,
       d.importmode       as dataset_importmode,
       date_parse(substr(d.lastupdatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')  as dataset_lastupdatedtime,
       date_parse(substr(d.createdtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')      as dataset_createdtime,
       d.source           as dataset_source,
       d.columns          as dataset_columns,
       s.datasourceid     as datasource_id,
       s.datasourcearn    as datasource_arn,
       s.name             as datasource_name,
       s.type             as datasource_type,
       date_parse(substr(s.lastupdatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as datasource_lastupdatedtime,
       date_parse(substr(s.createdtime,1,26),'%Y-%m-%dT%H:%i:%s.%f') as datasource_createdtime,
       s.status           as datasource_status,
       s.errorinfo        as datasource_errorinfo,
       t.templateid       as template_id,
       t.name             as template_name,
       t.templatearn      as template_arn,
       date_parse(substr(t.createdtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')      as template_createtime,
       date_parse(substr(t.lastupdatedtime,1,26),'%Y-%m-%dT%H:%i:%s.%f')  as template_lastupdatedtime
    from "quicksight-lineage"."dashboard" r
    left join  "quicksight-lineage"."analysis" a
    on a.analysisarn = r.sourceentityarn and a.datasetarn=r.datasetarns
    left join "quicksight-lineage"."template" t
    on t.templatearn = r.sourceentityarn
    left join  "quicksight-lineage"."dataset" d
    on d.datasetarn = r.datasetarns
    left join  "quicksight-lineage"."datasource" s
    on s.datasourcearn = d.datasourcearn

  1. Choose Confirm query.

  1. Select Import to SPICE for quicker analytics.
  2. Choose Visualize.

In the new analysis, one empty visual is loaded by default.

  1. Change the visual type to pivot table.
  2. Choose (single-click) dashboard_name, analysis_name, template_name, dataset_name, and datasource_name in the Fields list.

You can search for name in the Fields list to make this step easier.

  1. Confirm that all fields were also added to the Rows field well.

If you have assets with duplicate names, it can be helpful to add the corresponding ID columns to the visual, for example, dashboard_id, analysis_id, template_id, dataset_id, and datasource_id.

Visualizing your assets and lineage

You now create five new visuals, one for each asset type (Dashboard, Analysis, Template, Dataset, Data Source), to display the additional columns pulled from the APIs.

  1. From the Add menu, choose Add visual.

  1. For the first new visual, choose the table visual type.
  2. Search for dashboard_ in Field List.
  3. Choose (single-click) all matching columns.

  1. For the second visual, choose the table visual type.
  2. Search for analysis_ in the Field List.
  3. Choose (single-click) all matching columns.
  4. Move the second visual underneath the first visual.
  5. Repeat same steps for template_, dataset_, and datasource_.

Creating your permissions data set

You now create your new data set.

  1. Leave the analysis by choosing the QuickSight logo on the top left.
  2. In the navigation pane, choose Datasets.
  3. Choose New dataset.
  4. Locate and choose the QuickSight-Lineage data source created earlier in the FROM EXISTING DATA SOURCES section.
  5. In the QuickSight Lineage data source window, choose Create data set.
  6. Choose Use custom SQL.

  1. Update the name from New custom SQL to QuickSight Lineage Permissions.
  2. Enter the following code into the query box:
    select distinct 'datasource' as QuickSightObjectType, sp.datasourceid as "QuickSightID",sp.name,
    split_part(principal,':',6) as principal,permission, r.name||' - ID: '||r.dashboardid as dashboard_name_w_id
     from "quicksight-lineage"."datasource_permissions"  sp
     inner join  "quicksight-lineage"."datasource" s
      on s.datasourceid = sp.datasourceid
     left join  "quicksight-lineage"."dataset" d
       on s.datasourcearn = d.datasourcearn
     left join  "quicksight-lineage"."dashboard" r
       on d.datasetarn = r.datasetarns
    union
    select distinct 'dataset' as QuickSightObjectType, dp.datasetid as "QuickSightID",dp.name,
    split_part(principal,':',6) as principal,permission, r.name||' - ID: '||r.dashboardid as dashboard_name_w_id 
     from "quicksight-lineage"."dataset_permissions" dp
     inner join  "quicksight-lineage"."dataset" d
       on d.datasetid = dp.datasetid
     left join  "quicksight-lineage"."dashboard" r
       on d.datasetarn = r.datasetarns
    union
    select distinct 'analysis' as QuickSightObjectType, ap.analysisid as "QuickSightID",ap.name,
    split_part(principal,':',6) as principal,permission, r.name||' - ID: '||r.dashboardid as dashboard_name_w_id  
     from "quicksight-lineage"."analysis_permissions" ap
      inner join  "quicksight-lineage"."analysis" a
       on a.analysisid = ap.analysisid
      left join  "quicksight-lineage"."dashboard" r
       on a.analysisarn = r.sourceentityarn  
    union
    select distinct 'template' as QuickSightObjectType, tp.templateid as "QuickSightID",tp.name,
    split_part(principal,':',6) as principal,permission, r.name||' - ID: '||r.dashboardid as dashboard_name_w_id  
     from "quicksight-lineage"."template_permissions" tp
      inner join  "quicksight-lineage"."template" t
       on t.templateid = tp.templateid
      left join  "quicksight-lineage"."dashboard" r
       on t.templatearn = r.sourceentityarn     
    union
    select distinct 'dashboard' as QuickSightObjectType, dashboardid as "QuickSightID",name,
    split_part(principal,':',6) as principal,permission, name||' - ID: '||dashboardid as dashboard_name_w_id
     from "quicksight-lineage"."dashboard_permissions"

  1. Choose Edit / Preview data.
  2. Choose Apply.
  3. For Query mode, select SPICE.

  1. Choose Save.
  2. Navigate to the Analyses page and open the analysis created earlier.
  3. Choose the pencil icon to add the new dataset.

  1. Choose Add data set.

  1. Select QuickSight Lineage Permissions.
  2. Choose Select.

  1. Make sure the new QuickSight Lineage Permissions dataset is active in the Data set drop-down menu.

Visualizing your permissions

You now add a new visual to display permissions. 

  1. Choose the table visual type.
  2. Choose (single-click) name, principal, and permission in the Field List.
  3. In the navigation pane, choose Filter.
  4. Choose +.
  5. Choose quicksightobjecttype.

  1. Choose the new filter.
  2. Deselect Select all.
  3. Select dashboard.
  4. Choose Apply.

  1. Choose Close.
  2. Move the new permissions visual so it’s to the right of the dashboard visual.

 

  1. On the new permissions visual, choose the menu options (…).
  2. Choose Duplicate visual.
  3. Repeat this step four times.
  4. Modify the quicksightobjecttype filter on each new permission visual so you have one visual for each asset type.
  5. Move the visual to the right of the corresponding asset type visual.

  

Creating parameters for filtering

At this point all the visuals are created; next you need to create a parameter. You can simplify the following steps by using the new simplified filter control creation process. For more information, see Amazon QuickSight adds support for on-sheet filter controls. The following steps still work fine, but to add filter controls to an analysis, you don’t need to create parameters anymore. 

  1. Navigate to the Parameters menu.
  2. Choose Create one
  3. For Name, enter DashboardNameWithID.
  4. Choose Create.

 

  1. Choose Create a new control for a filter or a calculated field.
  2. For Display name, enter Dashboard Name with ID.
  3. For Style, choose Single select drop down.
  4. For Values, select Link to a data set field.
  5. For Select a data set, choose QuickSight Lineage Permissions.
  6. For Select a column, choose dashboard_name_w_id.
  7. Choose Add.

  1. Choose the first visual (Count of Records by Dashboard_name, Template_name, Dataset_name, Datasource_name, and Analysis_name).
  2. Add a filter in the dashboard_name_w_id field.
  3. Choose the newly added filter.
  4. Set the filter scope to All visuals.
  5. For Filter type, choose Custom filter.
  6. Select Use parameters.
  7. From the drop-down menu, choose DashboardNameWithID.
  8. Choose Apply.
  9. Choose Close.

  1. Choose the first permissions visual (Permission, Principal, and Name).
  2. Add a filter in the dashboard_name_w_id field.
  3. Set the filter scope to All visuals.
  4. For Filter type, choose Custom filter.
  5. Select Use parameters.
  6. From the drop-down menu, choose DashboardNameWithID.
  7. Choose Apply.
  8. Choose Close.

The analysis build is complete and can be published as a dashboard.

Creating additional visuals

You can also create additional visuals for different use cases.

Visualizing SPICE usage across all your SPICE datasets

To visualize SPICE usage across your SPICE datasets, complete the following steps:

  1. Use the QuickSight Lineage dataset and choose the donut chart visual.
  2. For Group/Color, add dataset_name.
  3. For Value, add dataset_spicesize.
  4. Change the aggregation of dataset_spicesize to Average because a dataset can be listed multiple times in the data if it's reused across multiple dashboards.

This visual can be useful to track down what is consuming SPICE storage.

Visualizing SPICE refreshes by hour

To visualize SPICE refreshes by hour, complete the following steps:

  1. Use the QuickSight Lineage dataset to create a vertical stacked bar chart.
  2. For X axis, add dataset_lastupdatedtime aggregated by HOUR.
  3. For Value, add dataset_id aggregated by Count distinct.
  4. For Group/Color, add dataset_name.
  5. Create a filter on dataset_importmode equal to SPICE.

This visual can be useful to see when all the SPICE dataset refreshes last occurred. The source data is a snapshot in time, so you need to update the source data by running the Lambda function on a regular basis.
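One way to refresh the snapshot automatically (not covered in the original walkthrough) is a scheduled Amazon EventBridge rule that invokes the dispatcher function daily with the empty payload. The following sketch assumes placeholder ARNs and the function name created by the stack.

import boto3

events = boto3.client("events")
lambda_client = boto3.client("lambda")

# Placeholder function ARN and Region/account; replace with the values from your account
function_name = "QuickSight-Lineage-Dispatcher"
function_arn = "arn:aws:lambda:us-east-1:111122223333:function:QuickSight-Lineage-Dispatcher"
rule_name = "quicksight-lineage-daily-refresh"

# Run the dispatcher once a day
rule_arn = events.put_rule(Name=rule_name, ScheduleExpression="rate(1 day)", State="ENABLED")["RuleArn"]

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName=function_name,
    StatementId=rule_name,
    Action="lambda:InvokeFunction",
    Principal="events.amazonaws.com",
    SourceArn=rule_arn,
)

# Send the empty payload, which processes all QuickSight asset types
events.put_targets(Rule=rule_name, Targets=[{"Id": "dispatcher", "Arn": function_arn, "Input": "{}"}])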

Cleaning up

To avoid incurring future charges, delete the resources you created in this walkthrough by deleting the CloudFormation stack. Also, be sure to delete the analysis and dataset (to free up SPICE usage).

Conclusion

In this post, we showed how to build a QuickSight analysis that tracks the lineage and permissions of your dashboards, analyses, templates, datasets, and data sources. We also created some visuals to display SPICE usage by data set as well as the last refresh time per data set, allowing you to view the health of your SPICE refreshes and to free up SPICE capacity by cleaning up older data sets.

Give this technique of building administrative dashboards from data collected via the QuickSight APIs a try, and share your feedback and questions in the comments.


About the Authors

Jesse Gebhardt is a senior global business development manager focused on analytics. He has spent over 10 years in the Business Intelligence industry. At AWS, he helps customers around the globe gain insight and value from the data they have stored in their data lakes and data warehouses. Jesse lives in sunny Phoenix, and is an amateur electronic music producer.

 

 

Arun Santhosh is a Specialized World Wide Solution Architect for Amazon QuickSight. Arun started his career at IBM as a developer and progressed on to be an Application Architect. Later, he worked as a Technical Architect at Cognizant. Business Intelligence has been his core focus in these prior roles as well.

 

 

Shawn Koupal is an Enterprise Analytics IT Architect at Best Western International, Inc.

 

Developing, testing, and deploying custom connectors for your data stores with AWS Glue

Post Syndicated from Bo Li original https://aws.amazon.com/blogs/big-data/developing-testing-and-deploying-custom-connectors-for-your-data-stores-with-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue already integrates with various popular data stores such as Amazon Redshift, Amazon RDS, MongoDB, and Amazon S3. Organizations continue to evolve and use a variety of data stores that best fit their applications and data requirements. We recently announced general availability of AWS Glue custom connectors, which make it easy to discover and integrate with a variety of additional data sources, such as SaaS applications and your custom data sources. With just a few clicks, you can search for and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. We are also releasing a new framework to develop, validate, and deploy your own custom connectors (bring your own connectors, or BYOC).

In this blog post, we go over three key aspects of AWS Glue custom connectors. First, we introduce the two mechanisms you can use to plug in a custom connector: subscribing to one from AWS Marketplace or bringing your own connector into AWS Glue Spark jobs. Second, we describe the three interfaces, based on Apache Spark DataSource, Amazon Athena Federated Query, and JDBC, which you can use to develop a custom connector with the released Glue Spark runtime. Finally, we go deeper into the development process, and describe how Glue Spark runtime interfaces simplify data integration by offering powerful features that are built in for Glue custom connectors. These features include job bookmarks for incremental loads of your data, at-source data filtering with SQL queries, partitioned execution for data parallelism, data type mapping, advanced Spark and built-in AWS Glue data transformations, integration with AWS Secrets Manager to securely store authentication credentials, and the AWS Glue Data Catalog for storing connections and table metadata. Glue custom connectors are also supported with AWS Glue Studio, which enables visual authoring of your data integration jobs.


This post introduces two mechanisms to use custom connectors with AWS Glue Spark runtime and AWS Glue Studio console. First, we go over the user experience for seamless discovery and subscription to custom connectors developed by AWS Glue partners that are hosted on AWS Marketplace. Next, we go deeper into the five simple steps to develop and test your own connectors with AWS Glue Spark runtime, and deploy them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

AWS Glue custom connectors: AWS Marketplace and BYOC

You can use an AWS Glue connector available on AWS Marketplace or bring your own connector (BYOC) and plug it into AWS Glue Spark runtime. This is in addition to the native connectors available with AWS Glue.

Connectors available on AWS Marketplace

As we make AWS Glue custom connectors generally available today, we have an ecosystem of Glue connectors listed on AWS Marketplace available from different AWS Glue partners, big data architects, and third-party developers. The following posts go into more detail on using some of these connectors for different use cases with AWS Glue:

BYOC connector example

Customers and application developers also need a method to develop connectors for custom data stores. The next section describes the end-to-end process to develop and test a custom connector using the AWS Glue Spark runtime library and interfaces locally.

After testing and validating, you can package and deploy the custom connector using the BYOC workflow in AWS Glue Studio. For instructions on deploying and using the Snowflake connector with AWS Glue jobs as a BYOC custom connector, see Performing data transformations using Snowflake and AWS Glue.

AWS Glue Spark runtime connector interfaces

AWS Glue Spark runtime offers three interfaces to plug in custom connectors built for existing frameworks: the Spark DataSource API, Amazon Athena Data Source API, or Java JDBC API. The following code snippets show how you can plug in these connectors into AWS Glue Spark runtime without any changes.

For connectors subscribed from AWS Marketplace, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-marketplace-connection"}, transformation_ctx = "DataSource0")

For custom connectors developed and deployed with AWS Glue, use the following code:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark|athena|jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-connection"}, transformation_ctx = "DataSource0")

The following list summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Spark DataSource API.

  • DataSourceV2: The base interface for the Spark DataSource v2 API.
  • ReadSupport: A mix-in interface for DataSourceV2 for the connector to provide data reading ability and scan the data from the data source.
  • DataSourceReader: A data source reader that is created by ReadSupport to scan the data from this data source. It also supports reading the actual schema and generating a list of InputPartitions for parallel reads from Spark executors.
  • InputPartition: Each InputPartition is responsible for creating a data reader to read data into one RDD partition. InputPartitions are serialized and sent to executors, then the reader is created on executors to do the actual reading.
  • InputPartitionReader: Responsible for reading data into an RDD partition.

The following list summarizes the interfaces you need to implement for connectivity with AWS Glue Spark runtime using the Athena Data Source API.

MetadataHandler:

  • doGetSplits: Splits up the reads required to scan the requested batch of partitions.
  • doListSchemaNames: Gets the list of schemas (databases) that this source contains.
  • doGetTable: Gets a definition (such as field names, types, and descriptions) of a table.
  • doListTables: Gets the list of tables that this source contains.
  • getPartitions: Gets the partitions that must be read from the request table.

RecordHandler:

  • doReadRecords: Reads the row data associated with the provided split.

The following diagram shows the class structure for the three interfaces and their execution on Spark drivers to read metadata and Spark executors to read data from the underlying datasource. The classes shown in pink are the ones that need to be implemented as part of the connector. Classes shown in green are already implemented as part of the Glue Spark runtime.

Steps to develop a custom connector

In the following sections, we describe how to develop, test, and validate an AWS Glue custom connector. We also show how to deploy the connectors to AWS Glue jobs using the AWS Glue Studio console.

Implementing the solution includes the following five high-level steps:

  1. Download and install AWS Glue Spark runtime, and review sample connectors.
  2. Develop using the required connector interface.
  3. Build, test, and validate your connector locally.
  4. Package and deploy the connector on AWS Glue.
  5. Use AWS Glue Studio to author a Spark application with the connector.

Downloading and installing AWS Glue Spark runtime and reviewing sample connectors

The first step to developing a connector is to install the Glue Spark runtime from Maven and refer to the AWS Glue sample connectors in the AWS Glue GitHub repository.

Developing and testing using the required connector interface

As discussed earlier, you can develop AWS Glue custom connectors with one of the following interfaces:

  • Spark DataSource
  • Athena Federated Query
  • JDBC

In this section, we walk you through each interface.

Spark DataSource interface

We use a simple example to illustrate the development of an AWS Glue custom connector with the Spark DataSource interface. You can also find intermediate and complex examples for developing connectors with more functionality for different data sources.

This solution implements a DataSourceReader that returns predefined data as InputPartitions stored in-memory with a given schema. The following interfaces need to be implemented for DataSourceReader. The DataSourceReader implementation runs on the Spark driver and plans the execution of Spark executors reading the data in InputPartitions:

class Reader implements DataSourceReader {
        public StructType readSchema() { ... }
        
        public List<InputPartition<InternalRow>> planInputPartitions() { ... }
}

The InputPartitions are read in parallel by different Spark executors using the InputPartitionReader implementation, which returns the records in Spark's InternalRow format. The InputPartitionReader is essentially implemented to return an iterator of the records scanned from the underlying data store. See the following code:

class SimpleInputPartitionReader implements InputPartitionReader<InternalRow> {
    public boolean next() { ... }

    public InternalRow get() { ... }

    public void close() throws IOException { ... }
}

The second connector example shows how to use an Amazon S3 client to read the data in CSV format from an S3 bucket and path supplied as reader options. The third connector example shows how to use a JDBC driver to read data from a MySQL source. It also shows how to push down a SQL query to filter records at source and authenticate with the user name and password supplied as reader options.

You can plug connectors based on the Spark DataSource API into AWS Glue Spark runtime as follows. You need to supply the connection_type as custom.spark and an AWS Glue Data Catalog connection containing the reader options, such as user name and password. AWS Glue Spark runtime automatically converts the data source into a Glue DynamicFrame. The following code is an example of plugging in the Elasticsearch Spark connector:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.spark", connection_options = {"path": "test", "es.nodes": "https://search-glue-etl-job-xxx.us-east-1.es.amazonaws.com","es.net.http.auth.user": "user","es.net.http.auth.pass": "pwd","es.port": "443","es.nodes.wan.only": "true" ,"connectionName":"my-custom-es-connection"}, transformation_ctx = "DataSource0")

AWS Glue Studio provides a visual ETL console that can also auto-generate the preceding code to construct a DynamicFrame for a deployed Spark connector (as described later in this post).

Athena Federated Query interface

AWS Glue Spark runtime also supports connectors developed with the Athena connector interface for federated queries. Similar to the Spark DataSource API, it requires implementation of two key handler interfaces: MetadataHandler and RecordHandler.

The MetadataHandler implementation runs on the Spark driver and contains the functions required to compute the schema, tables, and table partitions, and plan the actual scan by splitting the reads of individual partitions into different splits. See the following code:

public class MyMetadataHandler extends MetadataHandler{
       ListSchemasResponse doListSchemaNames(BlockAllocator allocator, ListSchemasRequest request) { … }

       ListTablesResponse doListTables(BlockAllocator allocator, ListTablesRequest request) { … }

       GetTableResponse doGetTable(BlockAllocator allocator, GetTableRequest request) { … }

       void getPartitions(BlockWriter blockWriter, GetTableLayoutRequest request, QueryStatusChecker queryStatusChecker) { … }

       GetSplitsResponse doGetSplits(BlockAllocator allocator, GetSplitsRequest request) { … }
}

The RecordHandler implements the reader to scan the data from the underlying data store associated with the split contained in the ReadRecordsRequest structure.

AWS Glue custom connectors use the Athena RecordHandler interface, but they do not need the BlockSpiller implementation or AWS Lambda to read the data from the underlying data store. Instead, the implementation runs directly inline within each Spark executor to return the records as Apache Arrow column vector batches. See the following code:

public class MyRecordHandler extends RecordHandler{

void readWithConstraint(ConstraintEvaluator constraints, BlockSpiller spiller, ReadRecordsRequest recordsRequest, QueryStatusChecker queryStatusChecker){…}
}

AWS Glue Spark runtime can convert records returned by plugging in an Athena connector to an AWS Glue DynamicFrame as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.athena", connection_options = {"tableName":"table","schemaName":"schema","connectionName":"my-custom-athena-connection"}, transformation_ctx = "DataSource0")

JDBC interface

AWS Glue Spark runtime also allows you to plug in any connector compliant with the JDBC interface. It allows you to pass in any connection option available with the JDBC connector as follows:

Datasource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dbTable":"Account","connectionName":"my-custom-jdbc-connection"}, transformation_ctx = "DataSource0")

Advanced ETL and analytics with AWS Glue

AWS Glue Spark runtime also provides different features supported out of the box with custom connectors to enable advanced extract, transform, and load (ETL) operations.

AWS Glue Studio for visual authoring of ETL jobs

Data type mapping

You can type cast the columns while reading them from the underlying data store itself. For example, a dataTypeMapping of {"INTEGER":"STRING"} casts all integer columns to string while parsing the records and constructing the DynamicFrame. This also helps you cast columns to types of your choice. See the following code:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

Partitioning for parallel reads

AWS Glue allows you to read data in parallel from the data store by partitioning it on a column by specifying the partitionColumn, lowerBound, upperBound, and numPartitions. This allows you to use data parallelism and multiple Spark executors allocated for the Spark application. See the following code, which reads data from Snowflake using four Spark executors in parallel. Data is partitioned across the executors uniformly along the id column in the range [0, 200]:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"upperBound":"200","numPartitions":"4","partitionColumn":"id","lowerBound":"0","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Glue Data Catalog connections

You can encapsulate all your connection properties with Glue Data Catalog connections and supply the connection name as follows. Integration with Glue Data Catalog connections allows you to use the same connection properties across multiple calls in a single Spark application or across different applications. See the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Secrets Manager for credentials

The Data Catalog connection can also contain a secretId corresponding to a secret stored in AWS Secrets Manager that can be used to securely gather authentication and credentials information at runtime. For more details on using a secretId on the AWS Glue Studio console, see Adding connectors to AWS Glue Studio. You can also specify secretId within the ETL script as follows:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"my-connection-snowflake", "secretId": "my-secret-id"}, transformation_ctx = "DataSource0")

The secretId can be used to store credentials for the different authentication mechanisms that your connector supports, such as user name/password, access keys, and OAuth.
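For example, a user name/password secret that such a secretId could reference might be created as follows. This is only a sketch; the secret name matches the snippet above, and the key names are placeholders that depend on what your connector expects.

import json
import boto3

secretsmanager = boto3.client("secretsmanager")

# Placeholder name and keys: the key names your connector reads from the secret
# depend entirely on how the connector is implemented
response = secretsmanager.create_secret(
    Name="my-secret-id",
    SecretString=json.dumps({"username": "snowflake_user", "password": "example-password"}),
)
print(response["ARN"])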

SQL queries at source: Filtering with row predicates and column projections

AWS Glue Spark runtime allows you to push down SQL queries to filter data at source with row predicates and column projections. This allows you to load filtered data faster from data stores that support pushdowns. An example SQL query pushed down to a JDBC data source is SELECT id, name, department FROM department WHERE id < 200. See the following code:

DataSource = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"query":"SELECT id, name, department FROM department WHERE id < 200","connectionName":"my-connection-snowflake"}, transformation_ctx = "DataSource0")

Job bookmarks

AWS Glue job bookmarks allow for incremental loading of data from JDBC sources. They keep track of the last processed record from the data store and process new data records in subsequent AWS Glue job runs. Job bookmarks use the primary key as the default bookmark key if it increases or decreases sequentially. See the following code, which uses a transformation_ctx with job bookmarks enabled for the job:

DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "custom.jdbc", connection_options = {"connectionName":"test-connection-snowflake-jdbc"}, transformation_ctx = "DataSource0")

AWS Glue transformations

AWS Glue offers more than 35 commonly used data transformation operations with DynamicFrames and Spark DataFrames. These transforms allow you to get insights from your data and prepare it for further analytics using hundreds of available Spark SQL functions. These transformations include popular functions for schema manipulation, projecting columns, and performing joins across different data sources; transforming data with map, split, and explode; flattening nested datasets with relationalize and unnest; grouping and aggregating records; and running arbitrary SQL on datasets.
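As a brief illustration (not from the original post), the following sketch applies a couple of these transforms to a DynamicFrame read through a custom JDBC connector; the connection, table, and column names are placeholders, and the script assumes the usual Glue job setup where glueContext and spark already exist.

# Assumes this runs inside an AWS Glue job script where glueContext and spark
# are already initialized; the connection, table, and column names are placeholders.
from awsglue.transforms import ApplyMapping, SelectFields

DataSource0 = glueContext.create_dynamic_frame.from_options(
    connection_type="custom.jdbc",
    connection_options={"dbTable": "Account", "connectionName": "my-custom-jdbc-connection"},
    transformation_ctx="DataSource0",
)

# Rename and cast columns while projecting only the ones we need
Mapped = ApplyMapping.apply(
    frame=DataSource0,
    mappings=[("id", "int", "account_id", "string"), ("name", "string", "account_name", "string")],
    transformation_ctx="Mapped",
)
Selected = SelectFields.apply(frame=Mapped, paths=["account_id", "account_name"], transformation_ctx="Selected")

# Or convert to a Spark DataFrame and run arbitrary Spark SQL
Selected.toDF().createOrReplaceTempView("accounts")
aggregated = spark.sql("SELECT account_name, count(*) AS record_count FROM accounts GROUP BY account_name")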

VPC support for networking

AWS Glue jobs allow you to securely connect to your data stores within a private VPC subnet. You can also enable a NAT (Network Address Translation) gateway within the VPC to access both VPC resources and the public internet.

Building, testing, and validating your connector locally

After developing the connector for your favorite data store with the interface of your choice, you can follow the instructions to build the connector with Maven by running mvn install. This builds the connector and installs the resulting JAR into your local Maven repository. You can now include this JAR in the classpath of your IDE or of the AWS Glue Spark runtime downloaded from Maven.

After you build and import the JAR, you can test it locally by plugging it into AWS Glue Spark runtime and writing a validation test. We provide sample validation tests in the AWS Glue GitHub repository. These cover several scenarios for both local testing and validation on the AWS Glue job system. The following list shows the validation tests, the functionality they test, and the interfaces they apply to (JDBC, Spark, Athena).

  • DataSourceTest: Tests connector connectivity and reading functionality. (JDBC, Spark, Athena)
  • ReadWriteTest: Tests the reading and writing end-to-end workflow. (JDBC, Spark, Athena)
  • CatalogConnectionTest: Tests catalog connection integration. (JDBC, Spark, Athena)
  • DataSchemaTest: Tests the data schema from reading with the connector. (JDBC, Spark, Athena)
  • SecretsManagerTest: Tests Secrets Manager integration. (JDBC, Spark)
  • DataSinkTest: Tests connector connectivity and writing functionality. (JDBC, Spark)
  • ColumnPartitioningTest: Tests connector column partitioning functionality. (JDBC)
  • FilterPredicateTest: Tests connector filter predicate functionality. (JDBC)
  • JDBCUrlTest: Tests connector extra parameters for JDBC URL functionality. (JDBC)
  • DbtableQueryTest: Tests connector dbTable and query option functionality. (JDBC)
  • DataTypeMappingTest: Tests connector custom data type mapping functionality. (JDBC)

Functionality such as AWS Glue job bookmarks that allow incremental loads can be tested on the AWS Glue job system only. We also provide a Python script to run all tests together as a suite on the AWS Glue job system.
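The suite script itself isn't reproduced here, but one way to orchestrate the validation tests as AWS Glue job runs could look like the following Boto3 sketch; the job names are hypothetical (one Glue job per test), and the provided script may work differently.

import time
import boto3

glue = boto3.client("glue")

# Hypothetical job names: one AWS Glue job per validation test deployed from the
# sample connector repository; the actual script in the repository may differ
validation_jobs = ["DataSourceTest", "ReadWriteTest", "CatalogConnectionTest", "DataSchemaTest"]

run_ids = {name: glue.start_job_run(JobName=name)["JobRunId"] for name in validation_jobs}

# Poll until every test run reaches a terminal state
pending = dict(run_ids)
while pending:
    for name, run_id in list(pending.items()):
        state = glue.get_job_run(JobName=name, RunId=run_id)["JobRun"]["JobRunState"]
        if state in ("SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"):
            print(name, state)
            del pending[name]
    if pending:
        time.sleep(30)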

Packaging and deploying the connector on AWS Glue

We now discuss how you can package your connector and deploy it on AWS Glue using the BYOC workflow:

  1. Package the custom connector as a JAR and upload the JAR file to an Amazon S3 bucket in your account (a quick upload sketch follows this list).
  2. Follow the flow to create a custom connector referencing the JAR in Amazon S3 from AWS Glue Studio.
  3. Instantiate a connection for that connector and create an AWS Glue job using it.
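As a quick sketch of step 1 with Boto3 (the local JAR path, bucket, and key are placeholders):

import boto3

s3 = boto3.client("s3")

# Placeholders: the local JAR path produced by mvn install, plus your own bucket and key
s3.upload_file(
    Filename="target/my-custom-connector-1.0.jar",
    Bucket="my-glue-connector-artifacts",
    Key="connectors/my-custom-connector-1.0.jar",
)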

For step-by-step instructions on the BYOC workflow, see Creating custom connectors.

Alternatively, we also provide the scripts and instructions for you to share the connector publicly on AWS Marketplace, either for a price or for free. For instructions on subscribing to the connector, see Subscribing to AWS Marketplace connectors.

Using AWS Glue Studio to author a Spark application

After you create a connection for using a BYOC or AWS Marketplace AWS Glue connector, you can follow the instructions to visually author a Spark ETL application with AWS Glue Studio. These instructions are available in Job Authoring with custom connectors. The following are screenshots from AWS Glue Studio:

Connectors on AWS Marketplace


Visually author Glue jobs using connectors with AWS Glue Studio

Step 1 – Select a connector


Step 2 – Visually author the job using the associated connection

Conclusion

You can use two different mechanisms to use custom connectors with AWS Glue Spark runtime and AWS Glue Studio console. In this post, we discussed the user experience for seamless discovery and subscription to custom connectors, and walked you through developing and testing your own connectors with AWS Glue Spark runtime, and deploying them into your production Apache Spark applications for ETL and analytics workloads that run on AWS Glue.

Build a custom connector yourself or try one on AWS Marketplace with AWS Glue Studio.

If you would like to partner or add a new Glue connector to AWS Marketplace, please reach out to us at [email protected]

Resources

For additional resources, see the following:


About the Authors

Bo Li is a software engineer in AWS Glue and devoted to designing and building end-to-end solutions that address customers' data analytics and processing needs with cloud-based, data-intensive technologies.

 

 

 

Yubo Xu is a Software Development Engineer on the AWS Glue team. His main focus is to improve the stability and efficiency of the Spark runtime for AWS Glue and the ease of connecting to various data sources. Outside of work, he enjoys reading books and hiking the trails in the Bay Area with his dog, Luffy, a one-year-old Shiba Inu.

 

 

Xiaoxi Liu is a software engineer on the AWS Glue team. Her passion is building scalable distributed systems for efficiently managing big data on the cloud, and her concentrations are distributed systems, big data, and cloud computing.

 

 

Mohit Saxena is a Software Development Manager at AWS Glue. His team works on Glue’s Spark runtime to enable new customer use cases for efficiently managing data lakes on AWS and optimize Apache Spark for performance and reliability.