Tag Archives: Analytics

Build a data lake using Amazon Kinesis Data Streams for Amazon DynamoDB and Apache Hudi

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/build-a-data-lake-using-amazon-kinesis-data-streams-for-amazon-dynamodb-and-apache-hudi/

Amazon DynamoDB helps you capture high-velocity data such as clickstream data to form customized user profiles and online order transaction data to develop customer order fulfillment applications, improve customer satisfaction, and get insights into sales revenue to create a promotional offer for the customer. It’s essential to store these data points in a centralized data lake, which can be transformed, analyzed, and combined with diverse organizational datasets to derive meaningful insights and make predictions.

A popular use case in order management is receiving, tracking, and fulfilling customer orders. The order management process begins when an order is placed and ends when the customer receives their package. When storing high-velocity order transaction data in DynamoDB, you can use Amazon Kinesis streaming to extract data and store it in a centralized data lake built on Amazon Simple Storage Service (Amazon S3).

Amazon Kinesis Data Streams for DynamoDB helps you to publish item-level changes in any DynamoDB table to a Kinesis data stream of your choice. Additionally, you can take advantage of this feature for use cases that require longer data retention on the stream and fan out to multiple concurrent stream readers. You also can integrate with Amazon Kinesis Data Analytics or Amazon Kinesis Data Firehose to publish data to downstream destinations such as Amazon Elasticsearch Service (Amazon ES), Amazon Redshift, or Amazon S3.

In this post, you use Kinesis Data Streams for DynamoDB and take advantage of managed streaming delivery of DynamoDB data to a Kinesis data stream by enabling the Kinesis streaming connection from the Amazon DynamoDB console. To process DynamoDB events from Kinesis, you have multiple options: Amazon Kinesis Client Library (KCL) applications, AWS Lambda, Kinesis Data Analytics for Apache Flink, and Kinesis Data Firehose. In this post, you use Kinesis Data Firehose to save the raw data in the S3 data lake and Apache Hudi to batch process the data.

Architecture

The following diagram illustrates the order processing system architecture.

In this architecture, users buy products in online retail shops and internally create an order transaction stored in DynamoDB. The order transaction data is ingested to the data lake and stored in the raw data layer. To achieve this, you enable Kinesis Data Streams for DynamoDB and use Kinesis Data Firehose to store data in Amazon S3. You use Lambda to transform the data from the delivery stream to remove unwanted data and finally store it in Parquet format. Next, you batch process the raw data and store it back in the Hudi dataset in the S3 data lake. You can then use Amazon Athena to do sales analysis. You build this entire data pipeline in a serverless manner.
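
The post does not show the transformation function itself, so the following is only a minimal sketch of the record envelope that a Kinesis Data Firehose transformation function works with: each incoming record carries a recordId and Base64-encoded data, and each returned record carries the same recordId, a result of Ok, Dropped, or ProcessingFailed, and Base64-encoded data. The object name, the case classes, and the keepNonDeletes rule are hypothetical and exist only for illustration. The string match on eventName is deliberately crude and assumes compact JSON.

```scala
import java.nio.charset.StandardCharsets.UTF_8
import java.util.Base64
import scala.util.{Failure, Success, Try}

object FirehoseTransformSketch {
  // One record as Firehose hands it to the transformation function (data is Base64).
  final case class InRecord(recordId: String, data: String)
  // One record as the function hands it back to Firehose.
  final case class OutRecord(recordId: String, result: String, data: String)

  // Hypothetical cleaning rule: skip DynamoDB REMOVE events, pass everything else through.
  def keepNonDeletes(json: String): Option[String] =
    if (json.contains("\"eventName\":\"REMOVE\"")) None else Some(json)

  // Decode each record, apply the cleaning rule, and re-encode the result.
  def transform(records: Seq[InRecord], clean: String => Option[String]): Seq[OutRecord] =
    records.map { r =>
      Try(new String(Base64.getDecoder.decode(r.data), UTF_8)).map(clean) match {
        case Success(Some(cleaned)) =>
          val encoded = Base64.getEncoder.encodeToString(cleaned.getBytes(UTF_8))
          OutRecord(r.recordId, "Ok", encoded)
        case Success(None) =>
          // The rule rejected the record, so Firehose should not deliver it.
          OutRecord(r.recordId, "Dropped", r.data)
        case Failure(_) =>
          // Decoding or cleaning threw, so report the record as failed.
          OutRecord(r.recordId, "ProcessingFailed", r.data)
      }
    }
}
```

Passing the cleaning rule in as a function keeps the envelope handling separate from the business logic, so the rule that removes unwanted data can change without touching the Firehose contract.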

Prerequisites

Complete the following steps to create AWS resources to build a data pipeline as mentioned in the architecture. For this post, we use the AWS Region us-west-1.

  1. On the Amazon Elastic Compute Cloud (Amazon EC2) console, create a key pair.
  2. Download the data files, Amazon EMR cluster, and Athena DDL code from GitHub.
  3. Deploy the necessary AWS resources using the provided AWS CloudFormation template.
  4. For Stack name, enter a stack name of your choice.
  5. For Keypair name, choose a key pair.

A key pair is required to connect to the EMR cluster nodes. For more information, see Use an Amazon EC2 Key Pair for SSH Credentials.

  6. Keep the remaining default parameters.
  7. Acknowledge that AWS CloudFormation might create AWS Identity and Access Management (IAM) resources.

For more information about IAM, see Resources to learn more about IAM.

  8. Choose Create stack.

You can check the Resources tab for the stack after the stack is created.

The following table summarizes the resources that you created, which you use to build the data pipeline and analysis.

Logical ID | Physical ID | Type
DeliveryPolicy | kines-Deli-* | AWS::IAM::Policy
DeliveryRole | kinesis-hudi-DeliveryRole-* | AWS::IAM::Role
Deliverystream | kinesis-hudi-Deliverystream-* | AWS::KinesisFirehose::DeliveryStream
DynamoDBTable | order_transaction_* | AWS::DynamoDB::Table
EMRClusterServiceRole | kinesis-hudi-EMRClusterServiceRole-* | AWS::IAM::Role
EmrInstanceProfile | kinesis-hudi-EmrInstanceProfile-* | AWS::IAM::InstanceProfile
EmrInstanceRole | kinesis-hudi-EmrInstanceRole-* | AWS::IAM::Role
GlueDatabase | gluedatabase-* | AWS::Glue::Database
GlueTable | gluetable-* | AWS::Glue::Table
InputKinesisStream | order-data-stream-* | AWS::Kinesis::Stream
InternetGateway | igw-* | AWS::EC2::InternetGateway
InternetGatewayAttachment | kines-Inter-* | AWS::EC2::VPCGatewayAttachment
MyEmrCluster | | AWS::EMR::Cluster
ProcessLambdaExecutionRole | kinesis-hudi-ProcessLambdaExecutionRole-* | AWS::IAM::Role
ProcessLambdaFunction | kinesis-hudi-ProcessLambdaFunction-* | AWS::Lambda::Function
ProcessedS3Bucket | kinesis-hudi-processeds3bucket-* | AWS::S3::Bucket
PublicRouteTable | | AWS::EC2::RouteTable
PublicSubnet1 | | AWS::EC2::Subnet
PublicSubnet1RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation
PublicSubnet2 | | AWS::EC2::Subnet
PublicSubnet2RouteTableAssociation | | AWS::EC2::SubnetRouteTableAssociation
RawS3Bucket | kinesis-hudi-raws3bucket-* | AWS::S3::Bucket
S3Bucket | kinesis-hudi-s3bucket-* | AWS::S3::Bucket
SourceS3Bucket | kinesis-hudi-sources3bucket-* | AWS::S3::Bucket
VPC | vpc-* | AWS::EC2::VPC

Enable Kinesis streaming for DynamoDB

AWS recently launched Kinesis Data Streams for DynamoDB so you can send data from DynamoDB to Kinesis data streams. You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to enable this feature.

To enable this feature from the console, complete the following steps:

  1. On the DynamoDB console, choose the table you created in the CloudFormation stack earlier (it begins with the prefix order_transaction_).
  2. On the Overview tab, choose Manage streaming to Kinesis.
  3. Choose your input stream (it starts with order-data-stream-).
  4. Choose Enable.
  5. Choose Close.
  6. Make sure that stream enabled is set to Yes.

Populate the sales order transaction dataset

To replicate a real-life use case, you need an online retail application. For this post, you upload raw data files to the S3 bucket and use a Lambda function to load the data into DynamoDB. You can download the order data CSV files from the AWS Sample GitHub repository. Complete the following steps to load the data into DynamoDB:

  1. On the Amazon S3 console, choose the bucket <stack-name>-sources3bucket-*.
  2. Choose Upload.
  3. Choose Add files.
  4. Choose the order_data_09_02_2020.csv and order_data_10_02_2020.csv files.
  5. Choose Upload.
  6. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  7. Choose Test.
  8. For Event template, enter an event name.
  9. Choose Create.
  10. Choose Test.

This runs the Lambda function and loads the CSV file order_data_09_02_2020.csv to the DynamoDB table.

  11. Wait until the message appears that the function ran successfully.

You can now view the data on the DynamoDB console, in the details page for your table.

Because you enabled the Kinesis data stream in the DynamoDB table, it starts streaming the data to Amazon S3. You can check the data by viewing the bucket on the Amazon S3 console. The following screenshot shows that a Parquet file is under the prefix in the bucket.

Use Apache Hudi with Amazon EMR

Now it’s time to process the streaming data using Hudi.

  1. Log in to the Amazon EMR leader node.

You can use the key pair you chose in the security options to SSH into the leader node.

  2. Use the following bash command to start the Spark shell to use it with Apache Hudi:
spark-shell --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.hive.convertMetastoreParquet=false" --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

The Amazon EMR instance looks like the following screenshot.

  3. You can use the following Scala code to import the order transaction data from the S3 data lake to a Hudi dataset using the copy-on-write storage type. Change inputDataPath to match the file path in <stack-name>-raws3bucket-* in your environment, and replace the bucket name in hudiTablePath with <stack-name>-processeds3bucket-*.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor

//Set up various input values as variables
val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/15/"
val hudiTableName = "order_hudi_cow"
val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName

// Set up our Hudi Data Source Options
val hudiOptions = Map[String,String](
    // order_id identifies a record. A Map keeps only the last value for a
    // repeated key, so the record key option must appear exactly once.
    DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
    DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date",
    HoodieWriteConfig.TABLE_NAME -> hudiTableName,
    DataSourceWriteOptions.OPERATION_OPT_KEY ->
        DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL,
    // time_stamp decides which version wins when two records share a key
    DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp",
    DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
    DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
    DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date",
    DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
        classOf[MultiPartKeysValueExtractor].getName)

// Read data from S3 and create a DataFrame with Partition and Record Key
val inputDF = spark.read.format("parquet").load(inputDataPath)

// Write data into the Hudi dataset
inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)

For more information about DataSourceWriteOptions, see Work with a Hudi Dataset.

  4. In the Spark shell, you can now count the records in the DataFrame that you wrote to the Apache Hudi dataset:
scala> inputDF.count()
res1: Long = 1000

You can check the processed Apache Hudi dataset in the S3 data lake via the Amazon S3 console. The following screenshot shows the prefix order_hudi_cow in <stack-name>-processeds3bucket-*.

When you navigate into the order_hudi_cow prefix, you find the partitions of the Hudi dataset, which use the transaction_date key, with one prefix for each date in the dataset.

Let’s analyze the data stored in Amazon S3 using Athena.

Analyze the data with Athena

To analyze your data, complete the following steps:

  1. On the Athena console, create the database order_db using the following command:
create database order_db;

You use this database to create all the Athena tables.

  1. Create your table using the following command (replace the S3 bucket name with the <stack-name>-processeds3bucket-* bucket created in your environment):
    CREATE EXTERNAL TABLE order_transaction_cow (
      `_hoodie_commit_time` string,
      `_hoodie_commit_seqno` string,
      `_hoodie_record_key` string,
      `_hoodie_partition_path` string,
      `_hoodie_file_name` string,
      `order_id` string,
      `item_id` string,
      `customer_id` string,
      `product` string,
      `amount` decimal(3,1),
      `currency` string,
      `time_stamp` string
      )
      PARTITIONED BY ( 
      `transaction_date` string)
    ROW FORMAT SERDE
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    STORED AS INPUTFORMAT
      'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    OUTPUTFORMAT
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow'

  2. Add partitions by running the following query on the Athena console:
    ALTER TABLE order_transaction_cow ADD
    PARTITION (transaction_date = '2020-09-02') LOCATION 's3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/order_hudi_cow/2020-09-02/';

  3. Check the total number of records in the Hudi dataset with the following query:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

It should return a single row with a count of 1,000.
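
The partition step above registers a single date. If your dataset spans several dates, you could generate the statement rather than type each clause by hand. The following is a minimal sketch that only builds the SQL text and does not call Athena. The object and function names are hypothetical, and the bucket and second date in the usage comment are placeholders, not values from this walkthrough.

```scala
object PartitionDdlSketch {
  // Builds one ALTER TABLE statement that registers a partition for each date.
  // tableLocation is the S3 path of the Hudi table, with or without a trailing slash.
  def addPartitions(table: String, tableLocation: String, dates: Seq[String]): String = {
    val base = tableLocation.stripSuffix("/")
    val clauses = dates.map { d =>
      s"  PARTITION (transaction_date = '$d') LOCATION '$base/$d/'"
    }
    (s"ALTER TABLE $table ADD IF NOT EXISTS" +: clauses).mkString("\n") + ";"
  }

  def main(args: Array[String]): Unit = {
    // Placeholder bucket and dates; replace them with the values in your environment.
    println(addPartitions(
      "order_transaction_cow",
      "s3://<your-processed-bucket>/order_hudi_cow",
      Seq("2020-09-02", "2020-09-03")))
  }
}
```

The IF NOT EXISTS clause makes the statement safe to rerun after new dates arrive, because partitions that are already registered are skipped.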

Now check the record that you want to update.

  1. Run the following query on the Athena console:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The output should look like the following screenshot. Note down the value of product and amount.

Analyze the change data capture

Now let’s test the change data capture (CDC) in streaming. Let’s take an example where the customer changed an existing order. We load the order_data_10_02_2020.csv file, where order_id 3801 has a different product and amount.

To test the CDC feature, complete the following steps:

  1. On the Lambda console, choose the function <stack-name>-CsvToDDBLambdaFunction-*.
  2. In the Environment variables section, choose Edit.
  3. For key, enter order_data_10_02_2020.csv.
  4. Choose Save.
  5. Choose Test to run the function again, this time with the new file.

After the function runs, you can see that another prefix has been created in <stack-name>-raws3bucket-*.

  1. In Amazon EMR, run the following code in the Spark shell prompt to update the data (change inputDataPath to the file path in <stack-name>-raws3bucket-* and hudiTablePath to <stack-name>-processeds3bucket-*):
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.functions._
    import org.apache.hudi.DataSourceWriteOptions
    import org.apache.hudi.config.HoodieWriteConfig
    import org.apache.hudi.hive.MultiPartKeysValueExtractor
    
    //Set up various input values as variables
    val inputDataPath = "s3://kinesis-hudi-raws3bucket-1p6nszvqd9awz/2021/02/01/18/"
    val hudiTableName = "order_hudi_cow"
    val hudiTablePath = "s3://kinesis-hudi-processeds3bucket-yfc6843vmq1o/" + hudiTableName
    
    // Set up our Hudi Data Source Options
    val hudiOptions = Map[String,String](
        // order_id identifies a record. A Map keeps only the last value for a
        // repeated key, so the record key option must appear exactly once.
        DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "order_id",
        DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "transaction_date",
        HoodieWriteConfig.TABLE_NAME -> hudiTableName,
        // Upsert replaces existing records that have the same key instead of
        // adding new rows, which is what applying a changed order requires
        DataSourceWriteOptions.OPERATION_OPT_KEY ->
            DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL,
        DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "time_stamp",
        DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
        DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> hudiTableName,
        DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "transaction_date",
        DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY ->
            classOf[MultiPartKeysValueExtractor].getName)
    
    // Read data from S3 and create a DataFrame with Partition and Record Key
    val inputDF = spark.read.format("parquet").load(inputDataPath)
    
    // Write data into the Hudi dataset
    inputDF.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save(hudiTablePath)
    

  2. Run the following query on the Athena console to confirm that the total number of records is still 1,000:
    SELECT count(*) FROM "order_db"."order_transaction_cow";

  3. Run the following query on the Athena console to test for the update:
SELECT * FROM "order_db"."order_transaction_cow"
where order_id ='3801'
and item_id ='23'
and transaction_date ='2020-09-02';

The following screenshot shows that the product and amount values for the same order are updated.
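
To see why the record count stays the same while the values change, it can help to model the write as a keyed merge. The following is a simplified, in-memory sketch of that idea and not Hudi's actual implementation: records in the incoming batch are first reduced to one per record key by keeping the largest precombine value, and the survivors then replace stored records with the same key. The Order class, its field names, and the sample values in the usage note are hypothetical, and timestamps are compared as plain strings.

```scala
object UpsertSketch {
  final case class Order(orderId: String, product: String, amount: BigDecimal, timeStamp: String)

  // Keep one record per key: the one with the greatest precombine value (timeStamp).
  def precombine(batch: Seq[Order]): Map[String, Order] =
    batch.groupBy(_.orderId).map { case (key, records) => key -> records.maxBy(_.timeStamp) }

  // Incoming records replace stored records with the same key; new keys are added.
  def upsert(table: Map[String, Order], batch: Seq[Order]): Map[String, Order] =
    table ++ precombine(batch)
}
```

In this model, upserting a changed version of order 3801 into a table that already holds order 3801 leaves the table size unchanged and swaps in the new product and amount, which mirrors what the two Athena queries show.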

In a production workload, you can trigger the updates on a schedule or by S3 modification events. A fully automated data lake makes sure your business analysts are always viewing the latest available data.
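
A scheduled job needs to know which raw-data prefix to read. The input paths used earlier (for example, .../2021/02/01/15/) follow a year/month/day/hour layout, so a job could derive inputDataPath from the hour it is processing. The following is a minimal sketch under the assumption that the delivery stream writes with that UTC-based hourly prefix and no custom prefix. The object and function names are hypothetical.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object RawPrefixSketch {
  // Hourly layout in UTC, matching paths such as 2021/02/01/15/.
  private val hourly =
    DateTimeFormatter.ofPattern("yyyy/MM/dd/HH").withZone(ZoneOffset.UTC)

  // Returns the S3 path that holds the raw files delivered during the given hour.
  def inputPathFor(bucket: String, hour: Instant): String =
    s"s3://$bucket/${hourly.format(hour)}/"
}
```

For example, a job that runs shortly after each hour could pass the previous hour's timestamp to inputPathFor and use the result as inputDataPath in the Hudi write shown earlier.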

Clean up the resources

To avoid incurring future charges, follow these steps to remove the example resources:

  1. If you created the EMR cluster with the CloudFormation template, delete the resources you created earlier in the Prerequisites section by deleting the CloudFormation stack.
  2. Stop the cluster via the Amazon EMR console, if you launched the EMR cluster manually.
  3. Empty all the relevant buckets via the Amazon S3 console.

Conclusion

You can build an end-to-end serverless data lake to get real-time insights from DynamoDB by using Kinesis Data Streams—all without writing any complex code. It allows your team to focus on solving business problems by getting useful insights immediately. Application developers have various use cases for moving data quickly through an analytics pipeline, and you can make this happen by enabling Kinesis Data Streams for DynamoDB.

If this post helps you or inspires you to solve a problem, we would love to hear about it! The code for this solution is available in the GitHub repository for you to use and extend. Contributions are always welcome!


About the Authors

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to guide enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

Saurabh Shrivastava is a solutions architect leader and analytics/ML specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Dylan Qu is an AWS solutions architect responsible for providing architectural guidance across the full AWS stack with a focus on data analytics, AI/ML, and DevOps.

Amazon EMR 2020 year in review

Post Syndicated from Abhishek Sinha original https://aws.amazon.com/blogs/big-data/amazon-emr-2020-year-in-review/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto at scale. Amazon EMR automates the provisioning and scaling of these frameworks, and delivers high performance at low cost with optimized runtimes and support for a wide range of Amazon Elastic Compute Cloud (Amazon EC2) instance types and Amazon Elastic Kubernetes Service (Amazon EKS) clusters. Amazon EMR makes it easy for data engineers and data scientists to develop, visualize, and debug data science applications with Amazon EMR Studio (preview) and Amazon EMR Notebooks.

You can hear customers describe how they use Amazon EMR in the following 2020 AWS re:Invent sessions:

You can also find more information in the following posts:

Throughout 2020, we worked to deliver better Amazon EMR performance at a lower price, and to make Amazon EMR easier to manage and use for big data analytics within your Lake House Architecture. This post summarizes the key improvements during the year and provides links to additional information.

Differentiated engine performance

Amazon EMR simplifies building and operating big data environments and applications. You can launch an EMR cluster in minutes. You don’t need to worry about infrastructure provisioning, cluster setup, configuration, or tuning. Amazon EMR takes care of these tasks, allowing you to focus your teams on developing differentiated big data applications. In addition to eliminating the need for you to build and manage your own infrastructure to run big data applications, Amazon EMR gives you better performance than simply using open-source distributions, and provides 100% API compatibility. This means you can run your workloads faster without changing any code.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Spark that is active by default. We first introduced the EMR runtime for Apache Spark in Amazon EMR release 5.28.0 in November 2019, and used queries based on the TPC-DS benchmark to measure the performance improvement over open-source Spark 2.4. Those results showed considerable improvement: the geometric mean in query execution time was 2.4 times faster and the total query runtime was 3.2 times faster. As discussed in Turbocharging Query Execution on Amazon EMR at AWS re:Invent 2020, we’ve continued to improve the runtime, and our latest results show that Amazon EMR 5.30 is three times faster than without the runtime, which means you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions. For more information, see How Drop used the EMR runtime for Apache Spark to halve costs and get results 5.4 times faster.
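
The two figures quoted above measure different things, which is why they differ. The geometric mean weights every query equally, whereas the total runtime is dominated by the longest queries. The following is a small sketch of both calculations. The timings in the usage note are made up for illustration and are not benchmark data.

```scala
object SpeedupSketch {
  // Geometric mean of the per-query speedups (baseline time / optimized time).
  def geoMeanSpeedup(baseline: Seq[Double], optimized: Seq[Double]): Double = {
    val logs = baseline.zip(optimized).map { case (b, o) => math.log(b / o) }
    math.exp(logs.sum / logs.size)
  }

  // Speedup of the whole run: total baseline time / total optimized time.
  def totalSpeedup(baseline: Seq[Double], optimized: Seq[Double]): Double =
    baseline.sum / optimized.sum
}
```

With hypothetical baseline timings of 10, 100, and 1,000 seconds and optimized timings of 5, 50, and 250 seconds, the per-query speedups are 2, 2, and 4. The geometric mean is about 2.52, while the total speedup is about 3.64, because the longest query improved the most.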

We’ve also improved Hive and PrestoDB performance. In April 2020, we announced support for Hive Low Latency Analytical Processing (LLAP) as a YARN service starting with Amazon EMR 6.0. Our tests show that Apache Hive is two times faster with Hive LLAP on Amazon EMR 6.0. You can choose to use Hive LLAP or dynamically allocated containers. In May 2020, we introduced the Amazon EMR runtime for PrestoDB in Amazon EMR 5.30. Our most recent tests based on TPC-DS benchmark queries compare Amazon EMR 5.31, which uses the runtime, to Amazon EMR 5.29, which does not. The geometric mean in query execution time is 2.6 times faster with Amazon EMR 5.31 and the runtime for PrestoDB.

Simpler incremental data processing

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework used for simplifying incremental data processing and data pipeline development. You can use it to perform record-level inserts, updates, and deletes in Amazon Simple Storage Service (Amazon S3) data lakes, thereby simplifying building change data capture (CDC) pipelines. With this capability, you can comply with data privacy regulations and simplify data ingestion pipelines that deal with late-arriving or updated records from sources like streaming inputs and CDC from transactional systems. Apache Hudi integrates with open-source big data analytics frameworks like Apache Spark, Apache Hive, and Presto, and allows you to maintain data in Amazon S3 or HDFS in open formats like Apache Parquet and Apache Avro.

We first supported Apache Hudi starting with Amazon EMR release 5.28 in November 2019. In June 2020, Apache Hudi graduated from incubator with release 0.6.0, which we support with Amazon EMR releases 5.31.0, 6.2.0, and higher. The Amazon EMR team collaborated with the Apache Hudi community to create a new bootstrap operation, which allows you to use Hudi with your existing Parquet datasets without needing to rewrite the dataset. This bootstrap operation accelerates the process of creating a new Apache Hudi dataset from existing datasets—in our tests using a 1 TB Parquet dataset on Amazon S3, the bootstrap performed five times faster than bulk insert.

Also in June 2020, starting with Amazon EMR release 5.30.0, we added support for the HoodieDeltaStreamer utility, which provides an easy way to ingest data from many sources, including AWS Database Migration Service (AWS DMS). With this integration, you can now ingest data from upstream relational databases to your S3 data lakes in a seamless, efficient, and continuous manner. For more information, see Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Amazon Athena and Amazon Redshift Spectrum added support for querying Apache Hudi datasets in S3-based data lakes: Athena in July 2020 and Redshift Spectrum in September. Now, you can query the latest snapshot of Apache Hudi Copy-on-Write (CoW) datasets from both Athena and Redshift Spectrum, even while you continue to use Apache Hudi support in Amazon EMR to make changes to the dataset.

Differentiated instance performance

In addition to providing better software performance with Amazon EMR runtimes, we offer more instance options than any other cloud provider, allowing you to choose the instance that gives you the best performance and cost for your workload. You choose what types of EC2 instances to provision in your cluster (standard, high memory, high CPU, high I/O) based on your application’s requirements, and fully customize your cluster to suit your requirements.

In December 2020, we announced that Amazon EMR now supports M6g, C6g, and R6g instances with versions 6.1.0, 5.31.0 and later, which enables you to use instances powered by AWS Graviton2 processors. Graviton2 processors are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon EC2. Although your performance benefit will vary based on the unique characteristics of your workloads, our tests based on the TPC-DS 3 TB benchmark showed that the EMR runtime for Apache Spark provides up to 15% improved performance and up to 30% lower costs on Graviton2 instances relative to equivalent previous generation instances.

Easier cluster optimization

We’ve also made it easier to optimize your EMR clusters. In July 2020, we introduced Amazon EMR Managed Scaling, a new feature that automatically resizes your EMR clusters for best performance at the lowest possible cost. EMR Managed Scaling eliminates the need to predict workload patterns in advance or write custom automatic scaling rules that depend on an in-depth understanding of the application framework (for example, Apache Spark or Apache Hive). Instead, you specify the minimum and maximum compute resource limits for your clusters, and Amazon EMR constantly monitors key metrics based on the workload and optimizes the cluster size for best resource utilization. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs by 20–60% and optimizing cluster capacity for best performance.

EMR Managed Scaling is supported for Apache Spark, Apache Hive, and YARN-based workloads on Amazon EMR versions 5.30.1 and above. EMR Managed Scaling supports EMR instance fleets, enabling you to seamlessly scale Spot Instances, On-Demand Instances, and instances that are part of a Savings Plan, all within the same cluster. You can take advantage of Managed Scaling and instance fleets to provision the cluster capacity that has the lowest chance of getting interrupted, for the lowest cost.

In October 2020, we announced Amazon EMR support for the capacity-optimized allocation strategy for provisioning EC2 Spot Instances. The capacity-optimized allocation strategy automatically makes the most efficient use of available spare capacity while still taking advantage of the steep discounts offered by Spot Instances. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This provides Amazon EMR with more options in choosing the optimal pools to launch Spot Instances from in order to decrease chances of Spot interruptions, and increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.

For more information, see How Nielsen built a multi-petabyte data platform using Amazon EMR and Contextual targeting and ad tech migration best practices.

Workload consolidation

Previously, you had to choose between using fully managed Amazon EMR on Amazon EC2 or self-managing Apache Spark on Amazon EKS. When you use Amazon EMR on Amazon EC2, you can choose from a wide range of EC2 instance types to meet price and performance requirements, but you can’t run multiple versions of Apache Spark or other applications on a cluster, and you can’t use unused capacity for non-Amazon EMR applications. When you self-manage Apache Spark on Amazon EKS, you have to do the heavy lifting of installing, managing, and optimizing Apache Spark to run on Kubernetes, and you don’t get the benefit of optimized runtimes in Amazon EMR.

You no longer have to choose. In December 2020, we announced the general availability of Amazon EMR on Amazon EKS, a new deployment option for Amazon EMR that allows you to run fully managed open-source big data frameworks on Amazon EKS. If you already use Amazon EMR, you can now consolidate Amazon EMR-based applications with other Kubernetes-based applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management using common Amazon EKS tools. If you currently self-manage big data frameworks on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and take advantage of the optimized Amazon EMR runtimes to deliver better performance at lower cost.

Amazon EMR on EKS enables your team to collaborate more efficiently. You can run applications on a common pool of resources without having to provision infrastructure, and co-locate multiple Amazon EMR versions on a single Amazon EKS cluster to rapidly test and verify new Amazon EMR versions and the included open-source frameworks. You can improve developer productivity with faster cluster startup times because Amazon EMR application containers on existing Amazon EKS cluster instances start within 15 seconds, whereas creating new clusters of EC2 instances can take several minutes. You can use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to programmatically author, schedule, and monitor workflows, and use EMR Studio (preview) to develop, visualize, and debug applications. We discuss Amazon MWAA and EMR Studio more in the next section.

For more information, see Run Spark on Kubernetes with Amazon EMR on Amazon EKS and Amazon EMR on EKS Development Guide.

Higher developer productivity

Of course, your goal with Amazon EMR is not only to achieve the best price performance for your big data analytics workloads, but also to deliver new insights that help you run your business.

In November 2020, we announced Amazon MWAA, a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines. Airflow workflows retrieve input from sources like Amazon S3 using Athena queries, perform transformations on EMR clusters, and can use the resulting data to train machine learning (ML) models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

At AWS re:Invent 2020, we introduced the preview of EMR Studio, a new notebook-first integrated development environment (IDE) experience with Amazon EMR. EMR Studio makes it easy for data scientists to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. It provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. You can install custom Python libraries or Jupyter kernels required for your applications directly to your EMR clusters, and can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers. EMR Studio uses AWS Single Sign-On (AWS SSO), enabling you to log in directly with your corporate credentials without signing in to the AWS Management Console.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can create cluster templates in AWS Service Catalog to simplify running jobs for your data scientists and data engineers, and can take advantage of EMR clusters running on Amazon EC2, Amazon EKS, or both. For example, you might reuse existing EC2 instances in your shared Kubernetes cluster to enable fast startup time for development work and ad hoc analysis, and use EMR clusters on Amazon EC2 to ensure the best performance for frequently run, long-running workloads.

To learn more, see Introducing a new notebook-first IDE experience with Amazon EMR and Amazon EMR Studio.

Unified governance

At AWS, we recommend you use a Lake House Architecture to modernize your data and analytics infrastructure in the cloud. A Lake House Architecture acknowledges the idea that taking a one-size-fits-all approach to analytics eventually leads to compromises. It’s not simply about integrating a data lake with a data warehouse, but rather about integrating a data lake, data warehouse, and purpose-built analytics services, and enabling unified governance and easy data movement. For more information about this approach, see Harness the power of your data with AWS Analytics by Rahul Pathak, and his AWS re:Invent 2020 analytics leadership session.

As shown in the following diagram, Amazon EMR is one element in a Lake House Architecture on AWS, along with Amazon S3, Amazon Redshift, and more.

One of the most important pieces of a modern analytics architecture is the ability for you to authorize, manage, and audit access to data. AWS gives you the fine-grained access control and governance you need to manage access to data across a data lake and purpose-built data stores and analytics services from a single point of control.

In October 2020, we announced the general availability of Amazon EMR integration with AWS Lake Formation. By integrating Amazon EMR with AWS Lake Formation, you can enhance data access control on multi-tenant EMR clusters by managing Amazon S3 data access at the level of databases, tables, and columns. This feature also enables SAML-based single sign-on to EMR Notebooks and Apache Zeppelin, and simplifies the authentication for organizations using Active Directory Federation Services (ADFS). With this integration, you have a single place to manage data access for Amazon EMR, along with the other AWS analytics services shown in the preceding diagram. At AWS re:Invent 2020, we announced the preview of row-level security for Lake Formation, which makes it even easier to control access for all the people and applications that need to share data.

In January 2021, we introduced Amazon EMR integration with Apache Ranger. Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka. Starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects.

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

To learn more, see Integrate Amazon EMR with AWS Lake Formation and Integrate Amazon EMR with Apache Ranger.

Jumpstart your migration to Amazon EMR

Building a modern data platform using the Lake House Architecture enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools like Amazon EMR. Migrating your big data and ML to AWS and Amazon EMR offers many advantages over on-premises deployments. These include separation of compute and storage, increased agility, resilient and persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. We can help you design, deploy, and architect your analytics application workloads in AWS and help you migrate your big data and applications.

The AWS Well-Architected Framework helps you understand the pros and cons of decisions you make while building systems on AWS. By using the framework, you learn architectural best practices for designing and operating reliable, secure, efficient, and cost-effective systems in the cloud, and ways to consistently measure your architectures against best practices and identify areas for improvement. In May 2020, we announced the Analytics Lens for the AWS Well-Architected Framework, which offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. We believe that having well-architected systems greatly increases the likelihood of business success.

To move to Amazon EMR, you can download the Amazon EMR migration guide to follow step-by-step instructions, get guidance on key design decisions, and learn best practices. You can also request an Amazon EMR Migration Workshop, a virtual workshop to jumpstart your Apache Hadoop/Spark migration to Amazon EMR. You can also learn how AWS partners have helped customers migrate to Amazon EMR in Mactores’s Seagate case study, Cloudwick’s on-premises to AWS Cloud migration to drive cost efficiency, and DNM’s global analytics platform for the cinema industry.


About the Authors

Abhishek Sinha is a Principal Product Manager at Amazon Web Services.

Al MS is a product manager for Amazon EMR at Amazon Web Services.

BJ Haberkorn is principal product marketing manager for analytics at Amazon Web Services. BJ has worked previously on voice technology including Amazon Alexa, real time communications systems, and processor design. He holds BS and MS degrees in electrical engineering from the University of Virginia.

Effective data lakes using AWS Lake Formation, Part 1: Getting started with governed tables

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/part-1-effective-data-lakes-using-aws-lake-formation-part-1-getting-started-with-governed-tables/

Thousands of customers are building their data lakes on Amazon Simple Storage Service (Amazon S3). You can use AWS Lake Formation to build your data lakes easily—in a matter of days as opposed to months. However, there are still some difficult challenges to address with your data lakes:

  • Supporting streaming updates and deletes in your data lakes, for example, database replication, and supporting privacy regulations such as GDPR and CCPA
  • Achieving fine-grained secure sharing not only with table- or column-level access control, but with row-level access control
  • Optimizing the layout of various tables and files on Amazon S3 to improve analytics performance

We announced Lake Formation transactions, row-level security, and acceleration for preview at AWS re:Invent 2020. These capabilities are available via new, open, and public update and access APIs for data lakes. These APIs extend the governance capabilities of Lake Formation with row-level security, and provide transaction semantics on data lakes.

In this series of posts, we provide step-by-step instructions for using these new Lake Formation features. In this post, we focus on the first step: setting up governed tables.

Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, sign up for the preview. You need to be approved for the preview to gain access to these features.

Governed Table

The Data Catalog supports a new type of metadata table: the governed table. Governed tables are unique to Lake Formation. They are a new Amazon S3 table type that supports atomic, consistent, isolated, and durable (ACID) transactions. Lake Formation transactions simplify ETL script and workflow development, and allow multiple users to concurrently and reliably insert, delete, and modify rows across multiple governed tables. Lake Formation automatically compacts and optimizes storage of governed tables in the background to improve query performance. When you create a table, you can specify whether or not the table is governed.

Setting up resources with AWS CloudFormation

In this post, I demonstrate how you can create a new governed table using existing data on Amazon S3. We use the Amazon Customer Reviews Dataset, which is stored in a public S3 bucket as sample data. You don’t need to copy the data to your bucket or worry about Amazon S3 storage costs. You can just set up a governed table pointing to this existing public data to see how it works.

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. If you prefer setting up resources on the AWS Management Console rather than AWS CloudFormation, see the instructions in the appendix at the end of this post.

The CloudFormation template generates the following resources:

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatalakeAdminUserName and DatalakeAdminUserPassword, enter the IAM user name and password for the data lake admin user.
  5. For DataAnalystUserName and DataAnalystUserPassword, enter the IAM user name and password for the data analyst user.
  6. For DatabaseName, leave as the default.
  7. Choose Next.
  8. On the next page, choose Next.
  9. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  10. Choose Create.

Stack creation can take up to 2 minutes.

Setting up a governed table

Now you can create and configure your first governed table in AWS Lake Formation.

Creating a governed table

To create your governed table, complete the following steps:

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Choose Tables.
  3. Choose Create table.
  4. For Name, enter amazon_reviews_governed.
  5. For Database, enter lakeformation_tutorial_amazon_reviews.
  6. Select Enable governed data access and management.
  7. Select Enable row based permissions.

    1. For Data is located in, choose Specified path in another account.
    2. Enter the path s3://amazon-reviews-pds/parquet/.
    3. For Classification, choose PARQUET.
    4. Choose Upload Schema.
    5. Enter the following JSON array into the text box:
[
    {
        "Name": "marketplace",
        "Type": "string"
    },
    {
        "Name": "customer_id",
        "Type": "string"
    },
    {
        "Name": "review_id",
        "Type": "string"
    },
    {
        "Name": "product_id",
        "Type": "string"
    },
    {
        "Name": "product_parent",
        "Type": "string"
    },
    {
        "Name": "product_title",
        "Type": "string"
    },
    {
        "Name": "star_rating",
        "Type": "int"
    },
    {
        "Name": "helpful_votes",
        "Type": "int"
    },
    {
        "Name": "total_votes",
        "Type": "int"
    },
    {
        "Name": "vine",
        "Type": "string"
    },
    {
        "Name": "verified_purchase",
        "Type": "string"
    },
    {
        "Name": "review_headline",
        "Type": "string"
    },
    {
        "Name": "review_body",
        "Type": "string"
    },
    {
        "Name": "review_date",
        "Type": "bigint"
    },
    {
        "Name": "year",
        "Type": "int"
    }
]
  1. Choose Upload.
  2. Choose Add column.
  3. For Column name, enter product_category.
  4. For Data type, choose String.
  5. Select Partition Key.
  6. Choose Add.
  7. Choose Submit.
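
If you prefer to generate the schema array rather than type it by hand, the following Python snippet is a minimal sketch of one way to do it. The column names and types are copied from the JSON array above; the helper name build_schema is hypothetical and not part of any AWS tooling.

```python
import json

# Column names and types copied from the schema array shown in this post.
COLUMNS = [
    ("marketplace", "string"),
    ("customer_id", "string"),
    ("review_id", "string"),
    ("product_id", "string"),
    ("product_parent", "string"),
    ("product_title", "string"),
    ("star_rating", "int"),
    ("helpful_votes", "int"),
    ("total_votes", "int"),
    ("vine", "string"),
    ("verified_purchase", "string"),
    ("review_headline", "string"),
    ("review_body", "string"),
    ("review_date", "bigint"),
    ("year", "int"),
]


def build_schema(columns):
    """Return a list of {"Name", "Type"} objects in the shape used above."""
    return [{"Name": name, "Type": col_type} for name, col_type in columns]


schema = build_schema(COLUMNS)
# Paste the printed JSON into the Upload Schema text box.
print(json.dumps(schema, indent=4))
```

The partition column product_category is intentionally left out, because the steps above add it separately as a partition key.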

Now you can see that the new governed table has been created.

When you choose the table name, you can see the details of the governed table, and you can also see Governance: Enabled in this view. It means that it’s a Lake Formation governed table. If you have other existing tables, it should show as Governance: Disabled because the tables are not governed tables.

You can also see lakeformation.aso.status: true under Table properties. It means that automatic compaction is enabled for this table. For this post, we use a read-only table and don’t utilize automatic compaction. To disable the automatic compaction, complete the following steps:

  1. Choose Edit table.
  2. Deselect Automatic compaction.
  3. Choose Save.

Currently, no data and no partitions are registered to this governed table. In the next step, we register existing S3 objects to the governed table using Lake Formation manifest APIs.

Even if you locate your data in the table location of the governed table, the data isn’t recognized yet. To make the governed table aware of the data, you need to make a Lake Formation API call, or use an AWS Glue job with Lake Formation transactions.

Configuring Lake Formation permissions

You need to grant Lake Formation permissions for your governed table. Complete the following steps:

Table-level permissions

  1. Sign in to the Lake Formation console in us-east-1 Region using the DatalakeAdmin1 user.
  2. Under Permissions, choose Data permissions.
  3. Under Data permission, choose Grant.
  4. For Database, choose lakeformation_tutorial_amazon_reviews.
  5. For Table, choose amazon_reviews_governed.
  6. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name> and the user DatalakeAdmin1.
  7. Select Table permissions.
  8. Under Table permissions, select Alter, Insert, Drop, Delete, Select, and Describe.
  9. Choose Grant.
  10. Under Data permission, choose Grant.
  11. For Database, choose lakeformation_tutorial_amazon_reviews.
  12. For Table, choose amazon_reviews_governed.
  13. For IAM users and roles, choose the user DataAnalyst1.
  14. Under Table permissions, select Select and Describe.
  15. Choose Grant.
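
The table-level grants above can also be expressed programmatically. The following is a rough sketch, assuming you use the GrantPermissions API through the boto3 Lake Formation client (grant_permissions). It only builds the request arguments and does not call AWS. The account ID 111122223333 and the helper name table_grant_request are hypothetical, and the row-level permissions of the preview are not covered here.

```python
def table_grant_request(principal_arn, database, table, permissions):
    """Build keyword arguments for a table-level Lake Formation grant.

    The returned dict is shaped for boto3's
    lakeformation.grant_permissions(**request); nothing is sent to AWS here.
    """
    return {
        "Principal": {"DataLakePrincipalIdentifier": principal_arn},
        "Resource": {"Table": {"DatabaseName": database, "Name": table}},
        "Permissions": list(permissions),
    }


DATABASE = "lakeformation_tutorial_amazon_reviews"
TABLE = "amazon_reviews_governed"

# Hypothetical account ID; replace it with your own.
analyst_grant = table_grant_request(
    "arn:aws:iam::111122223333:user/DataAnalyst1",
    DATABASE,
    TABLE,
    ["SELECT", "DESCRIBE"],
)
admin_grant = table_grant_request(
    "arn:aws:iam::111122223333:user/DatalakeAdmin1",
    DATABASE,
    TABLE,
    ["ALTER", "INSERT", "DROP", "DELETE", "SELECT", "DESCRIBE"],
)
print(analyst_grant)
```

With credentials configured, each request could then be passed as boto3.client("lakeformation").grant_permissions(**analyst_grant).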

Row-level permissions

  1. Under Permissions, choose Data permissions.
  2. Under Data permission, choose Grant.
  3. For Database, choose lakeformation_tutorial_amazon_reviews.
  4. For Table, choose amazon_reviews_governed.
  5. For IAM users and roles, choose the role LFRegisterLocationServiceRole-<CloudFormation stack name> and the users DatalakeAdmin1 and DataAnalyst1.
  6. Select Row-based permissions.
  7. For Filter name, enter allowAll.
  8. For Choose filter type, select Allow access to all rows.
  9. Choose Grant.

Adding table objects into the governed table

To register S3 objects to a governed table, you need to call the UpdateTableObjects API for the objects. You can call it using the AWS Command Line Interface (AWS CLI), an AWS SDK, or the AWS Glue ETL library (which calls the API implicitly). For this post, we use the AWS CLI to explain the behavior at the API level. If you don’t have the AWS CLI, see Installing, updating, and uninstalling the AWS CLI. You also need to install the service model file provided in the Lake Formation preview program. Run the following commands using the DatalakeAdmin1 user’s credentials (or an IAM role or user with sufficient permissions).

First, begin a new transaction with the BeginTransaction API:

$ aws lakeformation-preview begin-transaction
{
    "TransactionId": "7e5d506a757f32252ae3402a10191b13bfd1d7aa1c26a099d4a1911241589b8f"
}

Now you can register any files in the table location. For this post, we choose one sample partition, product_category=Camera, from the amazon-reviews-pds bucket, and choose one file under this partition. The Uri, ETag, and Size values are required in the following steps, so copy them.

$ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Camera/
2018-04-09 15:37:05   65386769 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   65619234 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:06   64564669 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65148225 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65227429 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:07   65269357 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65595867 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:08   65012056 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   65137504 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
2018-04-09 15:37:09   64992488 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

$ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Camera/part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
{
    "AcceptRanges": "bytes",
    "LastModified": "Mon, 09 Apr 2018 06:37:07 GMT",
    "ContentLength": 65227429,
    "ETag": "\"980669fcf6ccf31d2d686b9cccdd45e3-8\"",
    "ContentType": "binary/octet-stream",
    "Metadata": {}
}

Create a new file named write-operations1.json and enter the following JSON: (replace Uri, ETag, and Size with the values you copied.)

[
    {
        "AddObject": {
            "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
            "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
            "Size": 65386769,
            "PartitionValues": [
                "Camera"
            ]
        }
    }
]
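
As an illustration of how the write operations file is put together, the following Python sketch builds the same AddObject entry from a URI, an ETag, and a size. It assumes that partition values can be read from the Hive-style key=value segments of the object path and that the surrounding double quotes returned by head-object are stripped from the ETag, as in the JSON above. The function names are hypothetical.

```python
import json
from urllib.parse import urlparse


def partition_values(uri):
    """Extract values from Hive-style key=value path segments of an S3 URI."""
    path = urlparse(uri).path
    return [seg.split("=", 1)[1] for seg in path.split("/") if "=" in seg]


def add_object_operation(uri, etag, size):
    """Build one AddObject write operation in the shape shown above."""
    return {
        "AddObject": {
            "Uri": uri,
            # head-object returns the ETag wrapped in double quotes.
            "ETag": etag.strip('"'),
            "Size": size,
            "PartitionValues": partition_values(uri),
        }
    }


uri = (
    "s3://amazon-reviews-pds/parquet/product_category=Camera/"
    "part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet"
)
operations = [
    add_object_operation(uri, '"d4c25c40f33071620fb31cf0346ed2ec-8"', 65386769)
]

# Save the printed JSON as write-operations1.json.
print(json.dumps(operations, indent=4))
```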

Let’s register an existing object in the bucket to the governed table by making an UpdateTableObjects API call using the write-operations1.json file you created. (Replace <transaction-id> with the transaction ID returned by the begin-transaction command.)

$ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations1.json

Note the current date and time right after making the UpdateTableObjects API call. We use this timestamp for time travel queries later.
$ date -u
Tue Feb  2 12:12:00 UTC 2021

You can verify the change before committing the transaction by making the GetTableObjects API call with the same transaction ID. (Replace <transaction-id> with the transaction ID returned by the begin-transaction command.)

$ aws lakeformation-preview get-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id>
{
    "Objects": [
        {
            "PartitionValues": [
                "Camera"
            ],
            "Objects": [
                {
                    "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                    "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
                    "Size": 65386769
                }
            ]
        }
    ]
}
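
To check a longer response at a glance, you could summarize it per partition. The following Python sketch parses a response shaped like the output above and counts files and bytes for each partition. The sample string is the response shown in this post; the function name summarize_objects is hypothetical.

```python
import json


def summarize_objects(response):
    """Return {partition values: {"files": n, "bytes": total}} for a response."""
    summary = {}
    for partition in response.get("Objects", []):
        key = tuple(partition.get("PartitionValues", []))
        objects = partition.get("Objects", [])
        summary[key] = {
            "files": len(objects),
            "bytes": sum(obj["Size"] for obj in objects),
        }
    return summary


# Response copied from the get-table-objects output shown above.
sample = json.loads("""
{
    "Objects": [
        {
            "PartitionValues": ["Camera"],
            "Objects": [
                {
                    "Uri": "s3://amazon-reviews-pds/parquet/product_category=Camera/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                    "ETag": "d4c25c40f33071620fb31cf0346ed2ec-8",
                    "Size": 65386769
                }
            ]
        }
    ]
}
""")

summary = summarize_objects(sample)
print(summary)
```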

To make this data available for other transactions, you need to call the CommitTransaction API: (replace <transaction-id> with the transaction id you got in begin-transaction command.)

$ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>
After running the preceding command, you can see the partition on the Lake Formation console.

Let’s add one more partition to this table. To keep the example short, we add only one file per partition and only two partitions in total. For actual usage, you need to add all the files under all the partitions that you need.

Add the partition with the following commands:

  1. Call the BeginTransaction API to start another Lake Formation transaction:
    $ aws lakeformation-preview begin-transaction
    {
         "TransactionId": "d70c60e859e832b312668723cf48c1b84ef9109c5dbf6e9dbe8834c481c0ec81"
    }

  2. List Amazon S3 objects located on amazon-reviews-pds bucket to choose another sample file:
    $ aws s3 ls s3://amazon-reviews-pds/parquet/product_category=Books/
    2018-04-09 15:35:58 1094842361 part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:35:59 1093295804 part-00001-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095643518 part-00002-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1095218865 part-00003-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:00 1094787237 part-00004-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:33 1094302491 part-00005-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1094565655 part-00006-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1095288096 part-00007-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1092058864 part-00008-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    2018-04-09 15:36:35 1093613569 part-00009-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet

  3. Call the HeadObject API against one sample file to copy its ETag and Size:
    $ aws s3api head-object --bucket amazon-reviews-pds --key parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet
    {
         "AcceptRanges": "bytes",
         "LastModified": "Mon, 09 Apr 2018 06:35:58 GMT",
         "ContentLength": 1094842361,
         "ETag": "\"9805c2c9a0459ccf337e01dc727f8efc-131\"",
         "ContentType": "binary/octet-stream",
         "Metadata": {}
    }

  4. Create a new file named write-operations2.json and enter the following JSON: (Replace Uri, ETag, and Size with the values you copied.)
    [
        {
            "AddObject": {
                "Uri": "s3://amazon-reviews-pds/parquet/product_category=Books/part-00000-495c48e6-96d6-4650-aa65-3c36a3516ddd.c000.snappy.parquet",
                "ETag": "9805c2c9a0459ccf337e01dc727f8efc-131",
                "Size": 1094842361,
                "PartitionValues": [
                    "Books"
                ]
            }
        }
    ]

  5. Call the UpdateTableObjects API using write-operations2.json: (replace <transaction-id> with the transaction id you got in begin-transaction command.)
    $ aws lakeformation-preview update-table-objects --database-name lakeformation_tutorial_amazon_reviews --table-name amazon_reviews_governed --transaction-id <transaction-id> --write-operations file://./write-operations2.json

  6. Call the CommitTransaction API: (replace <transaction-id> with the transaction id you got in begin-transaction command.)

    $ aws lakeformation-preview commit-transaction --transaction-id <transaction-id>

Now the two partitions are visible on the Lake Formation console.
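
The begin, update, and commit sequence used for both partitions can be scripted. The following Python sketch only assembles the AWS CLI argument lists used in this post and prints them; it doesn't run anything. It assumes the lakeformation-preview service model is installed as described earlier. The helper names and the placeholder transaction ID TXID are hypothetical.

```python
import shlex

# CLI namespace used throughout this post (requires the preview service model).
CLI = ["aws", "lakeformation-preview"]


def begin_transaction_cmd():
    return CLI + ["begin-transaction"]


def update_table_objects_cmd(database, table, transaction_id, operations_file):
    return CLI + [
        "update-table-objects",
        "--database-name", database,
        "--table-name", table,
        "--transaction-id", transaction_id,
        "--write-operations", f"file://{operations_file}",
    ]


def commit_transaction_cmd(transaction_id):
    return CLI + ["commit-transaction", "--transaction-id", transaction_id]


commands = [
    begin_transaction_cmd(),
    update_table_objects_cmd(
        "lakeformation_tutorial_amazon_reviews",
        "amazon_reviews_governed",
        "TXID",  # placeholder for the ID returned by begin-transaction
        "./write-operations2.json",
    ),
    commit_transaction_cmd("TXID"),
]
for cmd in commands:
    print(shlex.join(cmd))
```

In a real script, you would run the first command with subprocess.run, read TransactionId from its JSON output, and pass that value to the other two commands.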

Querying the governed table using Amazon Athena

Now your governed table is ready! Let’s start querying the governed table using Amazon Athena. Sign in to the Athena console in the us-east-1 Region using the DataAnalyst1 user.

If it’s your first time running queries on Athena, you need to configure a query result location. For more information, see Specifying a Query Result Location.

To utilize Lake Formation preview features, you need to create a special workgroup named AmazonAthenaLakeFormationPreview, and join the workgroup. For more information, see Managing Workgroups.

Running a simple query

Sign in to the Athena console in us-east-1 Region using the DataAnalyst1 user. First, let’s preview 10 records stored in a governed table:

SELECT * 
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
LIMIT 10

The following screenshot shows the query results.

Running an analytic query

Next, let’s run an analytic query with aggregation for simulating real-world use cases:

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed 
GROUP BY product_category

The following screenshot shows the results. This query returned the total number of reviews and average rating per product category.

Running an analytic query with time travel

Each governed table maintains a versioned manifest of the Amazon S3 objects that it comprises. You can use previous versions of the manifest for time travel queries. Your queries against governed tables in Athena can include a timestamp to indicate that you want to discover the state of the data at a particular date and time.

To submit a time travel query in Athena, add a WHERE clause that sets the column __asOfDate to the epoch time (long integer) representation of the required date and time. Let’s run the time travel query. (Replace <epoch-milliseconds> with the timestamp you noted right after making the first UpdateTableObjects call. To convert it to epoch milliseconds, see the commands after the query results in this post.)

SELECT product_category, count(*) as TotalReviews, avg(star_rating) as AverageRating
FROM lakeformation.lakeformation_tutorial_amazon_reviews.amazon_reviews_governed
WHERE __asOfDate = <epoch-milliseconds>
GROUP BY product_category

The following screenshot shows the query results. The result includes only the record for product_category=Camera. This is because the file under product_category=Books was added after the timestamp specified in the time travel column __asOfDate (1612267920000 ms = 2021/02/02 12:12:00 UTC).

To get the epoch time from the command line, you can run the following commands.

The following command is for Linux (GNU date command):

$ echo $(($(date -u -d '2021/02/02 12:12:00' +%s%N)/1000000)) 
1612267920000

The following command is for macOS (BSD date command):

$ echo $(($(date -u -j -f "%Y/%m/%d %T" "2021/02/02 12:12:00" +'%s * 1000 + %-N / 1000000')))
1612267920000
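
If you would rather not depend on a particular date implementation, the same conversion can be done in Python. This is a minimal sketch that assumes the timestamp is in UTC and written in the same format as the commands above; the function name epoch_millis is hypothetical.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def epoch_millis(timestamp, fmt="%Y/%m/%d %H:%M:%S"):
    """Convert a UTC timestamp string to epoch milliseconds."""
    dt = datetime.strptime(timestamp, fmt).replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


millis = epoch_millis("2021/02/02 12:12:00")
print(millis)  # 1612267920000
```

The printed value is what you substitute for <epoch-milliseconds> in the time travel query.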

Cleaning up

Now to the final step, cleaning up the resources.

  1. Delete the CloudFormation stack. The governed table you created is automatically deleted with the stack.
  2. Delete the Athena workgroup AmazonAthenaLakeFormationPreview.

Conclusion

In this blog post, we explained how to create a Lake Formation governed table with existing data in an AWS public dataset. In addition, we explained how to query against governed tables and how to run time travel queries for governed tables. With Lake Formation governed tables, you can achieve transactions, row-level security, and query acceleration. In Part 2 of this series, we show you how to create a governed table for streaming data sources and demonstrate how Lake Formation transactions work.

Lake Formation transactions, row-level security, and acceleration are currently available for preview in the US East (N. Virginia) AWS Region. To get early access to these capabilities, please sign up for the preview.


Appendix: Setting up resources via the console

When following the steps in this section, use the Region us-east-1 because as of this writing, this Lake Formation preview feature is available only in us-east-1.

Configuring IAM roles and IAM users

First, you need to set up two IAM roles: one for AWS Glue ETL jobs and another for the Lake Formation data lake location.

IAM policies

To create your policies, complete the following steps:

  1. On the IAM console, create a new Policy for Amazon S3.
  2. Save the policy as S3DataLakePolicy as follows:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::amazon-reviews-pds"
                ]
            }
        ]
    }

  3. Create a new IAM policy named LFLocationPolicy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:GetTableObjects",
                    "lakeformation:UpdateTableObjects"
                ],
                "Resource": "*"
            }
        ]
    }

  4. Create a new IAM policy named LFQueryPolicy with the following statements:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "LFPreview1",
                "Effect": "Allow",
                "Action": "execute-api:Invoke",
                "Resource": "arn:aws:execute-api:*:*:*/*/POST/reportStatus"
            },
            {
                "Sid": "LFPreview2",
                "Effect": "Allow",
                "Action": [
                    "lakeformation:BeginTransaction",
                    "lakeformation:CommitTransaction",
                    "lakeformation:AbortTransaction",
                    "lakeformation:ExtendTransaction",
                    "lakeformation:PlanQuery",
                    "lakeformation:GetTableObjects",
                    "lakeformation:GetQueryState",
                    "lakeformation:GetWorkUnits",
                    "lakeformation:Execute"
                ],
                "Resource": "*"
            }
        ]
    }
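
If you adapt these policies to your own data lake bucket, generating the S3 policy document can help avoid copy-and-paste mistakes in the resource ARNs. The following Python sketch reproduces the S3DataLakePolicy statements above for a given bucket name. The function name s3_data_lake_policy is hypothetical.

```python
import json


def s3_data_lake_policy(bucket):
    """Build the S3DataLakePolicy document shown above for one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Object-level actions apply to keys inside the bucket.
                "Effect": "Allow",
                "Action": ["s3:PutObject", "s3:GetObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],
            },
            {
                # ListBucket applies to the bucket itself.
                "Effect": "Allow",
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
        ],
    }


policy = s3_data_lake_policy("amazon-reviews-pds")
print(json.dumps(policy, indent=4))
```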

IAM role for AWS Lake Formation

To create your IAM role for the Lake Formation data lake location, complete the following steps:

  1. Create a new Lake Formation role called LFRegisterLocationServiceRole with a Lake Formation trust relationship:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "lakeformation.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

  2. Attach the customer managed policies S3DataLakePolicy and LFLocationPolicy you created in the previous step.

This role is used to register locations with Lake Formation, which in turn performs credential vending for Athena at query time.

IAM users

To create your users, complete the following steps:

  1. Create an IAM user named DatalakeAdmin.
  2. Attach the following AWS managed policies:
    1. AWSLakeFormationDataAdmin
    2. AmazonAthenaFullAccess
    3. IAMReadOnlyAccess
  3. Attach the customer managed policy LFQueryPolicy.
  4. Create an IAM user named DataAnalyst that can use Athena to query data.
  5. Attach the AWS managed policy AmazonAthenaFullAccess.
  6. Attach the customer managed policy LFQueryPolicy.

Configuring Lake Formation

If you’re new to Lake Formation, complete the following steps to get started with AWS Lake Formation:

  1. On the Lake Formation console, under Permissions, choose Admins and database creators.
  2. In the Data lake administrators section, choose Grant.
  3. For IAM users and roles, choose your IAM user DatalakeAdmin.
  4. Choose Save.
  5. In the Database creators section, choose Grant.
  6. For IAM users and roles, choose the LFRegisterLocationServiceRole.
  7. Select Create Database.
  8. Choose Grant.
  9. Under Register and ingest, choose Data lake locations.
  10. Choose Register location.
  11. For Amazon S3 path, enter your Amazon S3 path to the bucket where your data is stored. This needs to be the same bucket you listed in LFLocationPolicy. Lake Formation uses this role to vend temporary Amazon S3 credentials to query services that need read/write access to the bucket and all prefixes under it.
  12. For IAM role, choose the LFRegisterLocationServiceRole.
  13. Choose Register location.
  14. Under Data catalog, choose Settings.
  15. Make sure that both check boxes for Use only IAM access control for new databases and Use only IAM access control for new tables in new databases are deselected.
  16. Under Data catalog, choose Databases.
  17. Choose Create database.
  18. Select Database.
  19. For Name, enter lakeformation_tutorial_amazon_reviews.
  20. Choose Create database.

About the Author

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue & Lake Formation. His passion is for implementing software artifacts for building data lakes more effectively and easily. During his spare time, he loves to spend time with his family, especially hunting bugs—not software bugs, but bugs like butterflies, pill bugs, snails, and grasshoppers.

Analyzing Freshdesk data using Amazon EventBridge and Amazon Athena

Post Syndicated from Benjamin Smith original https://aws.amazon.com/blogs/compute/analyzing-freshdesk-data-using-amazon-eventbridge-and-amazon-athena/

This post is written by Shashi Shankar, Application Architect, Shared Delivery Teams

Freshdesk is an omnichannel customer service platform by Freshworks. It provides automation services to help speed up customer support processes.

The Freshworks connector to Amazon EventBridge allows real-time streaming of Freshdesk events with minimal configuration and setup. This integration provides real-time insights into customer support operations without the operational overhead of provisioning and maintaining any servers.

In this blog post, I walk through a serverless approach to ingest and analyze Freshdesk data. This solution uses EventBridge, Amazon Kinesis Data Firehose, Amazon S3, and Amazon Athena. I also look at examples of customer service questions that can be answered using this approach.

The following diagram shows a high-level architecture of the proposed solution:

  1. When a Freshdesk ticket is updated or created, the Freshworks connector pushes event data to the Amazon EventBridge partner event bus.
  2. A rule on the partner event bus pushes the event data to Kinesis Data Firehose.
  3. Kinesis Data Firehose batches data before sending to S3. An AWS Lambda function transforms the data by appending a newline to each record before sending.
  4. Kinesis Data Firehose delivers the batch of records to S3.
  5. Athena is used to query relevant data from S3 using standard SQL.
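
The transformation in step 3 can be sketched as follows. This is a minimal illustration, not the function shipped in the sample repository: it assumes the standard Kinesis Data Firehose data transformation contract, in which each incoming record carries a recordId and base64-encoded data, and each returned record carries the same recordId, a result status, and re-encoded data. The sample event is hypothetical.

```python
import base64
import json


def lambda_handler(event, context):
    """Append a newline to every Firehose record so that each JSON event
    lands on its own line in the delivered S3 object."""
    output = []
    for record in event["records"]:
        payload = base64.b64decode(record["data"]).decode("utf-8")
        if not payload.endswith("\n"):
            payload += "\n"
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(payload.encode("utf-8")).decode("utf-8"),
        })
    return {"records": output}


# Local usage example with a hypothetical single-record event
sample_event = {
    "records": [{
        "recordId": "1",
        "data": base64.b64encode(json.dumps({"id": "abc"}).encode("utf-8")).decode("utf-8"),
    }]
}
transformed = lambda_handler(sample_event, None)
```

Without the newline, consecutive JSON events would be concatenated on a single line in the S3 object, which line-oriented JSON readers such as the Athena JSON SerDe cannot split into rows.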

The walkthrough shows you how to:

  1. Add the EventBridge app to a Freshdesk account.
  2. Configure a Freshworks partner event bus in EventBridge.
  3. Deploy a Kinesis Data Firehose stream, a Lambda function, and an S3 bucket.
  4. Set up a custom rule on the event bus to push data to Kinesis Data Firehose.
  5. Generate sample Freshdesk data to validate the ingestion process.
  6. Set up a table in Athena to query the S3 bucket.
  7. Query and analyze data.

Pre-requisites

  • A Freshdesk account (which can be created here).
  • An AWS account.
  • AWS Serverless Application Model (AWS SAM CLI), installed and configured.

Adding the Amazon EventBridge app to a Freshdesk account

  1. Log in to your Freshdesk account and navigate to Admin > Helpdesk Productivity > Apps. Search for EventBridge:
  2. Choose the Amazon EventBridge icon and choose Install.
  • Enter your AWS account number in the AWS Account ID field.
  • Enter “OnTicketCreate”, “OnTicketUpdate” in the Events field.
  • Enter the AWS Region to send the Freshdesk events in the Region field. This walkthrough uses the us-east-1 Region.

Configuring a Freshworks partner event bus in EventBridge

Once the previous step is completed, a partner event source is automatically created in the EventBridge console. Copy the partner event source name to your clipboard.

  1. Clone the GitHub repo and deploy the AWS SAM template:
    git clone https://github.com/aws-samples/amazon-eventbridge-freshdesk-example.git
    cd ./amazon-eventbridge-freshdesk-example
    sam deploy --guided
  2. For PartnerEventSource, enter the partner event source name copied from the previous step.
  3. For S3BucketName, enter a name for the S3 bucket that stores the Freshdesk ticket event data.

The AWS SAM template creates an association between the partner event source and event bus:

    Type: AWS::Events::EventBus
    Properties:
      EventSourceName: !Ref PartnerEventSource
      Name: !Ref PartnerEventSource

The template creates a Kinesis Data Firehose delivery stream, Lambda function, and S3 bucket to process and store the events from Freshdesk tickets. It also adds a rule to the custom event bus with the Kinesis Data Firehose stream as the target:

  PushToFirehoseRule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: Test Freshdesk Events Rule
      EventBusName: !Ref PartnerEventSource
      EventPattern:
        account: [!Ref AWS::AccountId]
      Name: freshdeskeventrule
      State: ENABLED
      Targets:
        - Arn:
            Fn::GetAtt:
              - "FirehoseDeliveryStream"
              - "Arn"
          Id: "idfreshdeskeventrule"
          RoleArn: !GetAtt EventRuleTargetIamRole.Arn

  EventRuleTargetIamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: ""
            Effect: "Allow"
            Principal:
              Service:
                - "events.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: Invoke_Firehose
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "firehose:PutRecord"
                  - "firehose:PutRecordBatch"
                Resource:
                  - !GetAtt FirehoseDeliveryStream.Arn
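
To see why the rule above forwards every Freshdesk event, it can help to model how an event pattern is evaluated. The following is a simplified sketch, not EventBridge's implementation: it only handles exact matching, where each pattern leaf is a list of allowed values and nested objects are matched recursively. Content filters (prefix, numeric, and so on) and array-valued event fields are ignored. The account ID and the sample event are hypothetical.

```python
def matches(pattern, event):
    """Return True if every field in the pattern matches the event.

    Simplified: a pattern leaf is a list of allowed literal values and a
    nested dict is matched recursively.
    """
    for key, expected in pattern.items():
        if key not in event:
            return False
        actual = event[key]
        if isinstance(expected, dict):
            if not isinstance(actual, dict) or not matches(expected, actual):
                return False
        elif actual not in expected:
            return False
    return True


rule_pattern = {"account": ["111122223333"]}  # hypothetical account ID
event = {
    "account": "111122223333",
    "detail-type": "onTicketCreate",
    "detail": {"ticket": {"id": 7}},
}
forwarded = matches(rule_pattern, event)
```

Because the pattern constrains only the account field, any event delivered to this account's partner event bus matches, regardless of its detail-type.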

Generating sample Freshdesk data to validate the ingestion process

To generate sample Freshdesk data, log in to the Freshdesk account and browse to the “Tickets” screen as shown:

Follow the steps to simulate two customer service operations:

  1. Create a ticket of type “Refund”: choose the New button and enter the details:
  2. Update an existing ticket and change the priority to “Urgent”.
  3. Within a few minutes of updating the ticket, the data is pushed via the Freshworks connector to the S3 bucket created using the AWS SAM template. To verify this, browse to the S3 bucket and see that a new object with the ticket data is created:

You can also use the S3 Select option under object actions to view the raw JSON data that is sent from the partner system. You are now ready to analyze the data using Athena.

Setting up a table in Athena to query the S3 bucket

If you are familiar with Apache Hive, you may find creating tables in Athena familiar. You can create tables by writing the DDL statement in the query editor or by using the wizard or JDBC driver. To create a table in Athena:

  1. Copy and paste the following DDL statement in the Athena query editor to create a Freshdesk events table. For this example, the table is created in the default database.
  2. Replace S3_Bucket_Name in the following query with the name of the S3 bucket created by deploying the previous AWS SAM template:
CREATE EXTERNAL TABLE `freshdeskevents`(
  `id` string COMMENT 'from deserializer', 
  `detail-type` string COMMENT 'from deserializer', 
  `source` string COMMENT 'from deserializer', 
  `account` string COMMENT 'from deserializer', 
  `time` string COMMENT 'from deserializer', 
  `region` string COMMENT 'from deserializer', 
  `detail` struct<ticket:struct<subject:string,description:string,is_description_truncated:boolean,description_text:string,is_description_text_truncated:boolean,due_by:string,fr_due_by:string,fr_escalated:boolean,is_escalated:boolean,fwd_emails:array<string>,reply_cc_emails:array<string>,email_config_id:string,id:int,group_id:bigint,product_id:string,company_id:string,requester_id:bigint,responder_id:bigint,status:int,priority:int,type:string,tags:array<string>,spam:boolean,source:int,tweet_id:string,cc_emails:array<string>,to_emails:string,created_at:string,updated_at:string,attachments:array<string>,custom_fields:string,changes:struct<responder_id:array<bigint>,ticket_type:array<string>,status:array<int>,status_details:array<struct<id:int,name:string>>,group_id:array<bigint>>>,requester:struct<id:bigint,name:string,email:string,mobile:string,phone:string,language:string,created_at:string>> COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'paths'='account,detail,detail-type,id,region,resources,source,time,version') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION  's3://S3_Bucket_Name/'

The table is created on the data stored in S3 and is ready to be queried. Note that table freshdeskevents points at the bucket s3://S3_Bucket_Name/. As more data is added to the bucket, the table automatically grows, providing a near-real-time data analysis experience.

Querying and analyzing data

You can use the following examples to get started with querying the Athena table.

  1. To get all the events data, run:
SELECT * FROM default.freshdeskevents  limit 10

The preceding output has a detail column containing the details related to the ticket. Tickets can be filtered on nested notations to build more insightful queries. Also, the detail-type column provides classification of tickets as new (onTicketCreate) vs updated (onTicketUpdate).

  2. To show new tickets created today with the type “Refund”:
SELECT detail.ticket.subject,detail.ticket.description_text, detail.ticket.type  FROM default.freshdeskevents
where detail.ticket.type = 'Refund' and "detail-type" = 'onTicketCreate' and date(from_iso8601_timestamp(time)) = date(current_date)
  3. To show all tickets with an “Urgent” priority that are not assigned to an agent:
SELECT "detail-type", detail.ticket.responder_id,detail.ticket.priority, detail.ticket.subject, detail.ticket.type  FROM default.freshdeskevents
where detail.ticket.responder_id is null and detail.ticket.priority = 4
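
For a quick local check of a downloaded S3 object, the same filter can be applied to the newline-delimited JSON directly. This is a minimal sketch that assumes the field names from the table definition above and the priority code 4 for “Urgent” used in the preceding query; the two sample events are hypothetical.

```python
import json

# Two hypothetical events, one per line, shaped like the table's columns
sample_lines = "\n".join([
    json.dumps({
        "detail-type": "onTicketCreate",
        "detail": {"ticket": {"subject": "Refund request", "priority": 4,
                              "responder_id": None, "type": "Refund"}},
    }),
    json.dumps({
        "detail-type": "onTicketUpdate",
        "detail": {"ticket": {"subject": "Login issue", "priority": 2,
                              "responder_id": 42, "type": "Incident"}},
    }),
])


def urgent_unassigned(lines):
    """Yield subjects of tickets with priority 4 and no responder."""
    for line in lines.splitlines():
        if not line.strip():
            continue  # skip blank lines between records
        ticket = json.loads(line)["detail"]["ticket"]
        if ticket.get("priority") == 4 and ticket.get("responder_id") is None:
            yield ticket["subject"]


subjects = list(urgent_unassigned(sample_lines))
```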

Conclusion

In this blog post, you learn how to configure a Freshworks partner event source from the Freshdesk console. Once a partner event source is configured, an AWS SAM template is deployed that creates an event bus associated with the partner event source. A Kinesis Data Firehose delivery stream, Lambda function, and S3 bucket are used to ingest Freshdesk’s ticket events data for analysis. An EventBridge rule is configured to route the event data to Kinesis Data Firehose, which delivers it to the S3 bucket.

Once event data starts flowing into the S3 bucket, an Amazon Athena table is created to run queries and analyze the ticket events data. Alternative customer service data analysis use cases can be built on the architecture shown in this blog.

To learn more about other partner integrations and the native capabilities of EventBridge, visit the AWS Compute Blog.

Monitor data quality in your data lake using PyDeequ and AWS Glue

Post Syndicated from Joan Aoanan original https://aws.amazon.com/blogs/big-data/monitor-data-quality-in-your-data-lake-using-pydeequ-and-aws-glue/

In our previous post, we introduced PyDeequ, an open-source Python wrapper over Deequ, which enables you to write unit tests on your data to ensure data quality. The use case we ran through was on static, historical data, but most datasets are dynamic, so how can you quantify how your data is changing and detect anomalous changes over time?

At Amazon, we’ve leveraged PyDeequ on AWS Glue to address this problem. AWS Glue is a serverless data integration service that allows you to easily prepare and combine your data for analytics, machine learning (ML), and application development. AWS Glue enables data engineers to build extract, transform, and load (ETL) workflows with ease. By using PyDeequ with AWS Glue, you can create a metrics repository on your data and check for anomalous changes over time inside your ETL workflows. In this post, we share this design pattern with you.

Use cases of PyDeequ on AWS Glue include:

  • Identifying and counting mismatched schema items and then immediately correcting them
  • Reviewing your incoming data with standard or custom, predefined analytics before storing it for big data validation
  • Tracking changes in data distribution by using a data quality metric file
  • Immediately identifying and creating useful constraints based on data distribution

The post describes the implementation process and provides a step-by-step tutorial of tracking changes in data quality. It walks you through an example of transforming a large dataset to identify the seasonality of the trends over time. Next, you create, store, and load a metrics repository using PyDeequ, which allows you to persist your analysis over time. Finally, you create an alert that notifies you when a data point is outside the forecasted range.

Where are the Anomalies?

It can be difficult to immediately find anomalies within your incoming data stream over time. PyDeequ makes it easier to identify changes in data distribution by creating a metrics repository. The repository allows you to store and load a variety of anomaly checks to compare current and past metric values. For this post, you learn about the Holt Winters anomaly detection strategy, one of the various anomaly detection strategies that PyDeequ provides. The Holt Winters model forecasts future datasets based on a repeated periodical pattern (seasonality), a trend (slope), and the average between two corresponding time points.
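
To make the three components concrete, the following is a minimal pure-Python sketch of additive Holt Winters smoothing with a simple anomaly band. It is an illustration only, not PyDeequ's or Deequ's implementation: the smoothing parameters, the initialization from the first two seasons, and the 1.96 standard deviation band are all assumptions chosen for the example, and the series is synthetic.

```python
from statistics import mean, pstdev


def holt_winters_additive(series, period, alpha=0.3, beta=0.1, gamma=0.1):
    """Fit additive Holt Winters on `series` (at least two full periods).

    Returns (forecast_fn, residuals), where forecast_fn(h) predicts the value
    h steps after the end of the series.
    """
    if len(series) < 2 * period:
        raise ValueError("need at least two full seasonal periods")
    first, second = series[:period], series[period:2 * period]
    level = mean(first)                            # starting level
    trend = (mean(second) - mean(first)) / period  # average change per step
    seasonal = [y - level for y in first]          # offset per seasonal slot
    residuals = []
    for t in range(period, len(series)):
        y = series[t]
        season = seasonal[t % period]
        residuals.append(y - (level + trend + season))  # one-step-ahead error
        new_level = alpha * (y - season) + (1 - alpha) * (level + trend)
        trend = beta * (new_level - level) + (1 - beta) * trend
        seasonal[t % period] = gamma * (y - new_level) + (1 - gamma) * season
        level = new_level

    n = len(series)

    def forecast(h):
        return level + h * trend + seasonal[(n + h - 1) % period]

    return forecast, residuals


def is_anomaly(series, actual, period, z=1.96):
    """Flag `actual` if it falls outside forecast +/- z * residual std dev."""
    forecast, residuals = holt_winters_additive(series, period)
    expected = forecast(1)
    band = z * pstdev(residuals)
    return abs(actual - expected) > band, expected


# Synthetic monthly series: two years with a holiday peak in months 11 and 12
history = [100, 102, 98, 101, 99, 103, 97, 100, 102, 98, 150, 200,
           104, 105, 101, 106, 102, 107, 100, 104, 106, 101, 156, 207]
flagged, expected = is_anomaly(history, 500, period=12)
```

The sketch needs two full seasons (24 monthly points) before it can forecast the next month, which mirrors the reason the tutorial later trains on 2013–2014 before checking 2015.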

You can apply the Holt Winters method in many different use cases, such as the following:

  • Business problem – Identifying a shift in the demand of a product
  • Data pattern – Input data deviates from trend and seasonality
  • Business analysis – Detecting changes in profits over time

To demonstrate this anomaly detection strategy, you use the AWS Customer Reviews Dataset, a collection of over 130 million reviews written in the Amazon.com marketplace from 1995–2015. Specifically, you narrow down the dataset to focus on the total votes in the jewelry subset from 2013–2015. A graph of this data shows a tight correlation and seasonality with more engagement throughout the winter holidays. However, by 2015, the correlation deviates.

The following graph illustrates February 2015 as divergent from the previous years, with nearly 30% more engagement in votes.

How can we detect events like this in new data?

With PyDeequ, you can easily identify anomalies without any visuals. February 2015 is outside the calculated forecast range; therefore, PyDeequ flags the data point as anomalous. This post demonstrates using PyDeequ’s anomaly detection to get email notifications for anomalous events, which look like the following screenshot.

Solution architecture

With Amazon Athena and an AWS Glue crawler, you can create an AWS Glue Data Catalog to access the Amazon Simple Storage Service (Amazon S3) data source. This allows the data to be easily queried for usage downstream. You can use an Amazon SageMaker notebook with a configured AWS Glue development endpoint to interact with your AWS Glue ETL jobs. We configure our AWS Glue ETL jobs to use PyDeequ to store results in Amazon S3, and use Amazon Simple Notification Service (Amazon SNS) to notify administrators of any anomalies.

The following diagram illustrates this architecture.

Solution overview

To implement this solution, you complete the following high-level steps:

  1. Create an SNS topic.
  2. Upload PyDeequ and Deequ to Amazon S3.
  3. Create an AWS Identity and Access Management (IAM) role for AWS Glue.
  4. Crawl, query, and create your dataset.
  5. Transform the dataset into a table.
  6. Create an AWS Glue development endpoint.
  7. Create a SageMaker notebook to interface with the endpoint.
  8. Create a new AWS Glue session.
  9. Extract the table.
  10. Transform the table.
  11. Use PyDeequ to detect anomalous data points.

Create an SNS topic

Complete the following steps to create your SNS topic:

  1. On the Amazon SNS console, choose Topics.
  2. Choose Create topic.
  3. For Type, choose Standard.
  4. For Name, enter jewelry_hw.
  5. For Display name, enter Holt Winters Anomaly Example.
  6. Choose Create Topic.
  7. On the details page for the topic you just created, under Subscription, choose Create subscription.
  8. For Protocol, choose Email.
  9. For Endpoint, enter the email you want to receive the notification.
  10. Choose Create subscription. An email is sent to the entered endpoint.
  11. Open the email message and choose Confirm subscription.

Upload PyDeequ and Deequ to Amazon S3

In this step, you create an S3 bucket and upload PyDeequ and Deequ.

  1. On the Amazon S3 console, create a new bucket. We reference it as <__YOUR_BUCKET__> throughout this post.
  2. Inside your bucket, create a folder called dependencies.
  3. Download the deequ-1.0.3.jar file.
  4. Create a .zip file for PyDeequ by compressing the folder that contains the __init__.py file.
  5. Upload the Deequ and PyDeequ files to your dependencies folder.

If you’re on a *nix operating system and have the AWS Command Line Interface (AWS CLI) configured, you can use the following code:

$ wget https://repo1.maven.org/maven2/com/amazon/deequ/deequ/1.0.3/deequ-1.0.3.jar 
$ git clone https://github.com/awslabs/python-deequ.git
$ cd python-deequ && zip -r ../pydeequ.zip pydeequ && cd ../
$ aws s3 cp deequ-1.0.3.jar s3://<__YOUR_BUCKET__>/dependencies/
$ aws s3 cp pydeequ.zip s3://<__YOUR_BUCKET__>/dependencies/

Create an IAM role for AWS Glue

You now create an IAM role for AWS Glue and attach the required policies.

  1. On the IAM console, choose Roles.
  2. Choose Create a role.
  3. For Trusted entity, choose AWS Service.
  4. For Use case, choose Glue.
  5. Choose Next.
  6. Add the following policies to the role:
    1. AWSGlueServiceRole
    2. AWSGlueConsoleSageMakerNotebookFullAccess
  7. Add an inline policy to the role with the following JSON code.

Be sure to replace the resource values in the code. If you’re unsure what your Athena query output location is in Amazon S3, you can find it on the Settings tab on the Athena console.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:Put*",
                "s3:Get*",
                "s3:Create*",
                "s3:Delete*"
            ],
            "Resource": [
                "arn:aws:s3:::<__YOUR_BUCKET__>/*",
                "arn:aws:s3:::<__ATHENA_QUERY_OUTPUTS_BUCKET__>/*",
                "arn:aws:s3:::amazon-reviews-pds/parquet/*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": "sns:Publish",
            "Resource": "arn:aws:sns:*:*:jewelry_hw"
        }
    ]
}

Crawl, query, and create the dataset

First, you use an AWS Glue crawler to add the AWS Customer Reviews Dataset to the Data Catalog.

  1. On the Athena console, choose Connect Data Source.
  2. For Choose where your data is located, select Query data in Amazon S3.
  3. For Choose a metadata catalog, select AWS Glue data catalog.
  4. Choose Set up a crawler in AWS Glue to retrieve schema information automatically.
  5. Choose Connect to AWS Glue.
  6. For Crawler Name, enter jewelry_dataset_crawler.
  7. Choose Next.
  8. Choose Next again.
  9. For Crawler Source Type, choose Data stores.
  10. For Repeat crawls of S3 data stores, choose Crawl all folders.
  11. Choose Next.
  12. For Choose a data store, choose S3.
  13. For Crawl data in, select Specified path in another account.
  14. For Include path, enter: s3://amazon-reviews-pds/parquet/.
  15. Choose Next.
  16. In the Choose an IAM role section, select Choose an existing IAM role.
  17. Choose the IAM role we created earlier.
  18. Choose Next.
  19. Under Frequency, choose Run on Demand.

Alternatively, to test incoming data in the Data Catalog, you can change the frequency of the crawler.

  20. Choose Next.
  21. For Database, choose Add Database and enter jewelry_db.
  22. Choose Next.
  23. Review the crawler properties and choose Finish.
  24. Run the data crawler.

Transform the dataset into a table

Next, we transform the AWS Customer Reviews Dataset into a table with Athena.

  1. On the Athena console, under Database, choose the jewelry_db database.

The table parquet(Partitioned) should be listed under Tables. If the database doesn’t show up, choose the refresh icon above Connect data source.

Now let’s create a second table from this dataset. This table includes three columns (total_votes, year, and review_date) and keeps only the rows where the product category is Jewelry and the marketplace is US. We use US as a filter to closely match holiday seasonal trends.

  2. Enter the following query:
    /*Athena jewelry dataset*/
    CREATE TABLE jewelry_db.jewelry_dataset
    WITH (
    format='PARQUET'
    ) AS
    SELECT total_votes, year,
    Date_FORMAT(review_date,
    '%Y-%c-01') AS review_date
    FROM parquet
    WHERE product_category = 'Jewelry' AND marketplace = 'US'
    ORDER BY review_date DESC

  3. Choose Run Query.

Under Tables, a new data table has been added called jewelry_dataset.

Create an AWS Glue development endpoint

To create your AWS Glue development endpoint, complete the following steps:

  1. On the AWS Glue console, choose Dev Endpoints.
  2. Choose Add endpoint.
  3. For Development endpoint name, enter jewelry_hw_example.
  4. In the IAM role section, select Choose an existing IAM role and choose the IAM role we created earlier.
  5. Under Python Library Path, choose the folder icon to navigate to the pydeequ.zip file in your S3 bucket.
  6. Under Dependent Jars Path, choose the folder icon to select the deequ-1.0.3.jar file in your S3 bucket.
  7. For AWS Glue Version, choose Spark 2.4, Python 3 (Glue Version 1.0).
  8. Choose Next.
  9. Review your settings and choose Finish.

Create a SageMaker notebook to interface with our endpoint

You’re redirected to the dev endpoint page. Under Provisioning Status, it currently says Provisioning. Wait until that changes to Ready. This may take more than 5 minutes.

  1. On the AWS Glue console, choose Notebooks.
  2. Choose Create notebook.
  3. For Notebook name, enter jewelry-hw.
  4. For Attach to development endpoint, choose jewelry_hw_example.
  5. Select Create an IAM Role.
  6. For IAM role, enter a name for your role.
  7. Choose Create notebook.

Now we can do our data analysis! You can walk through the following sections in your newly created SageMaker notebook.

Create an AWS Glue session

To create your AWS Glue session, complete the following steps:

  1. In your SageMaker notebook instance, choose New.
  2. Choose Sparkmagic (PySpark).

This creates a new notebook for you with a Sparkmagic (PySpark) kernel.

  3. Create an AWS Glue session using the following code:
    import sys
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    
    session = glueContext.spark_session
    
    # UPDATE ME:
    topic_arn = "<__SNS_TOPIC_ARN__>"
    s3_bucket = "<__S3_BUCKET_NAME__>"
    region = "<__REGION_YOUR_DEV_ENDPOINT_AND_SNS_TOPIC_ARE_IN__>"

Extract the table

You extract the data table jewelry_dataset and turn it into a DataFrame so that it can be used with PyDeequ. Next, you call the dropDuplicates method, which returns a copy of the DataFrame with any duplicate rows removed. See the following code:

jewelry_dyf = glueContext.create_dynamic_frame.from_catalog(database="jewelry_db", table_name="jewelry_dataset")
jewelry_df = jewelry_dyf.toDF()
# dropDuplicates() returns a new DataFrame; jewelry_df itself is left unchanged
jewelry_df.dropDuplicates()

The following screenshot shows your output.

Transform the table

We can further simplify the jewelry_df table by using the date_format method to change the review_date column to show only the year and month. Afterwards, we can filter jewelry_df2 by year and select only the two columns needed. See the following code:

import pyspark.sql.functions as f

jewelry_df2 = jewelry_df.withColumn('review_date', f.date_format('review_date', 'yyyy/M'))\
.orderBy('review_date', ascending = False)

df_2013 = jewelry_df2.filter("year ='2013'").select("review_date","total_votes")
df_2014 = jewelry_df2.filter("year ='2014'").select("review_date","total_votes")
df_2015 = jewelry_df2.filter("year ='2015'").select("review_date","total_votes")

We can use df_2013.show(10) to preview what our data table looks like before running it through PyDeequ. The following screenshot shows our output.
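
The two date transformations (the Athena format string '%Y-%c-01' and the Spark pattern 'yyyy/M') both produce month values without zero padding, which is why the later loops build keys such as 2013/1 rather than 2013/01. The following standalone sketch mimics both steps with the Python standard library; the helper names are hypothetical and exist only for this illustration.

```python
from datetime import date


def athena_month_bucket(d):
    """Mimic date_format(review_date, '%Y-%c-01'): %c is the month without zero padding."""
    return f"{d.year}-{d.month}-01"


def spark_month_key(d):
    """Mimic date_format(review_date, 'yyyy/M'): M is the month without zero padding."""
    return f"{d.year}/{d.month}"


review = date(2015, 2, 17)  # hypothetical review date
bucket = athena_month_bucket(review)
key = spark_month_key(review)
```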

Use PyDeequ to detect anomalous data points

For this post, we demonstrate detecting anomalous data points with the FileSystemMetricsRepository class. A metrics repository is stored in JSON format to be used as a data quality report over time in Amazon S3, HDFS, or in memory. The variable s3_write_path is where you want your JSON file to be stored within Amazon S3. See the following code:

s3_write_path = f"s3://{s3_bucket}/tmp/holt_winters_tutorial.json"
import pydeequ
from pydeequ.repository import *
metricsRepository = FileSystemMetricsRepository(session,s3_write_path)

We now load the 2013–2014 dataset into metrics.

If your dataset is collected monthly and follows an annual seasonal trend, use the MetricInterval.Monthly and SeriesSeasonality.Yearly settings. This selection requires you to collect at least 25 data points. The initial 24 data points are monthly values from 2013–2014, which we use to create the Holt Winters model. The values in 2015 are the forecasted points, which could contain an anomalous value.

As shown in the following code, we create a for loop that iterates over each month of 2013 and 2014. We use year and month to create a date string that later helps us query that month’s values. The filter method allows us to create a df DataFrame that contains the total_votes values by month (for this post, the first iteration is a table of values from January 2013).

Next, each set of metrics that we computed needs to be indexed by a ResultKey, which contains a timestamp and supports arbitrary tags in the form of key-value pairs.
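
Conceptually, a metrics repository is a collection of metric values, each stored under a key made of a timestamp and a set of tags. The following toy class is a hypothetical stand-in that shows only this indexing idea. It does not reproduce PyDeequ's classes or its JSON storage format, and the metric values are made up.

```python
class InMemoryMetricsRepository:
    """Hypothetical stand-in for a metrics repository: a list of keyed results."""

    def __init__(self):
        self._entries = []

    def save(self, timestamp_ms, tags, metrics):
        """Store one set of metrics under a (timestamp, tags) key."""
        self._entries.append({
            "timestamp": timestamp_ms,
            "tags": dict(tags),
            "metrics": dict(metrics),
        })

    def load(self, **tag_filter):
        """Return entries whose tags contain every requested pair, oldest first."""
        hits = [e for e in self._entries
                if all(e["tags"].get(k) == v for k, v in tag_filter.items())]
        return sorted(hits, key=lambda e: e["timestamp"])


repo = InMemoryMetricsRepository()
repo.save(1000, {"tag": "2013/1"}, {"Sum(total_votes)": 1234.0})  # made-up value
repo.save(2000, {"tag": "2013/2"}, {"Sum(total_votes)": 1180.0})  # made-up value
january = repo.load(tag="2013/1")
```

Ordering the stored entries by timestamp is what lets an anomaly check compare a new metric value against the history that precedes it.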

Finally, we create a VerificationSuite. We make PyDeequ write and store our metrics in Amazon S3 by adding the useRepository and saveOrAppendResult methods. Then we add Holt Winters with a Sum analyzer to calculate monthly total_votes. See the following code:

from pydeequ.verification import *

for year in ['2013','2014']:
    for month in range(1,13):
        date = f"\'{year}/{month}\'"
        # Filter the full frame so that both training years (2013 and 2014) are covered
        df = jewelry_df2.filter(f"review_date = {date}")

        key_tags = {'tag':  date}
        result_key_2013 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

        jewelry_result = VerificationSuite(session).onData(df)\
            .useRepository(metricsRepository) \
            .saveOrAppendResult(result_key_2013) \
            .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
            .run()

Great! We have created the trend for the Holt Winters algorithm. Now it’s time to detect any anomalies within 2015.

Create another Holt Winters anomaly check similar to the one for the 2013–2014 dataset, but iterate only through August (because the dataset ends in August 2015). For each month, we check for an anomaly using jewelry_result.status. If the status isn’t Success, an anomaly has been detected. Collect the constraint_message to see the error value. Use publish to create an SNS notification. Include the TopicArn of the topic that we created in Amazon SNS, a Message, a Subject, and MessageAttributes. If an anomaly has been detected, break out of the loop. See the following code:

# Use AWS SNS 
import boto3 
import json

# Topic for AWS SNS 
snsClient = boto3.client('sns', region_name = region)

for month in range(1,9):
    date = "\'2015" +'/'+str(month)+"\'"
    df = df_2015.filter("review_date =" + date)
    key_tags = {'tag':  date}
    result_key_2015 = ResultKey(session, ResultKey.current_milli_time(), key_tags)

    jewelry_result = VerificationSuite(session).onData(df)\
        .useRepository(metricsRepository) \
        .saveOrAppendResult(result_key_2015) \
        .addAnomalyCheck(HoltWinters(MetricInterval.Monthly, SeriesSeasonality.Yearly), Sum('total_votes'))\
        .run()
    
    df = VerificationResult.checkResultsAsDataFrame(session, jewelry_result)
    
    if (jewelry_result.status != "Success"):
        print("Anomaly for total_votes has been detected")
        print(date)
        message = df.select("constraint_message").collect()
        response = snsClient.publish(TopicArn = topic_arn,
                             Message = "anomaly detected in data frame: \n" + json.dumps(message),
                             Subject = "Anomaly Detected in the jewelry dataset:"+ date,
                             MessageAttributes = {"TransactionType":
                                            {"DataType": "String.Array", "StringValue": "Anomaly Detected in Glue"}})
        break

After completing this tutorial, you should receive an email notification stating an anomaly has been detected for February 2015. This coincides with our hypothesis that PyDeequ will flag the same anomaly from the graph!
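
If you want to inspect or unit test the notification before sending it, one option is to build the publish arguments in a separate helper. The following is a sketch under stated assumptions: the helper name, the example ARN, and the example constraint message are hypothetical, the attribute uses the plain String data type, and the subject is trimmed because Amazon SNS requires subjects shorter than 100 characters.

```python
import json


def build_sns_publish_args(topic_arn, date_label, constraint_messages):
    """Build keyword arguments for an SNS publish call."""
    subject = f"Anomaly detected in the jewelry dataset: {date_label}"
    return {
        "TopicArn": topic_arn,
        "Subject": subject[:99],  # SNS subjects must be shorter than 100 characters
        "Message": "Anomaly detected in data frame:\n"
                   + json.dumps([str(m) for m in constraint_messages]),
        "MessageAttributes": {
            "TransactionType": {
                "DataType": "String",
                "StringValue": "Anomaly Detected in Glue",
            }
        },
    }


args = build_sns_publish_args(
    "arn:aws:sns:us-east-1:111122223333:jewelry_hw",  # hypothetical ARN
    "2015/2",
    ["Value: 123.0 does not meet the constraint requirement!"],  # hypothetical message
)
```

In the notebook, the result could then be passed to the client as snsClient.publish(**args).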

More on using AWS Glue and PyDeequ

This post shows how you can start exploring anomaly detection with PyDeequ. This simple tutorial is just the beginning of what you can do with AWS Glue. To add to this tutorial, you can create a time-based schedule for jobs and crawlers to run every time a dataset is appended.

Alternatively, you can use the different modules provided by PyDeequ and its tutorials, or the use case examples provided at the beginning of this post to further understand the dataset.

Resource cleanup

Clean up the resources created in this post when you’re finished:

Conclusion

This post demonstrates the basics of detecting anomalies using PyDeequ and AWS Glue. Anomaly detection relies on the metrics repository file. This repository can easily be stored within Amazon S3, HDFS, or in memory as a JSON object for future test usage and AWS Glue ETL jobs. In addition to AWS Glue, PyDeequ can function within Amazon EMR and SageMaker in order to best handle the needs of your data pipeline.

This approach allows you to improve the quality and your own knowledge of your dataset. You can also apply this tool to a variety of business scenarios. The contents of this tutorial are for demonstration purposes and not production workloads. Be sure to follow security best practices for handling data at rest and in transit when you adapt PyDeequ into your workflows.


About the Authors

Joan Aoanan is a ProServe Consultant at AWS. With her B.S. Mathematics-Computer Science degree from Gonzaga University, she is interested in integrating her interests in math and science with technology.

Veronika Megler, PhD, is Principal Data Scientist for Amazon.com Consumer Packaging. Until recently she was the Principal Data Scientist for AWS Professional Services. She enjoys adapting innovative big data, AI, and ML technologies to help companies solve new problems, and to solve old problems more efficiently and effectively. Her work has lately been focused more heavily on economic impacts of ML models and exploring causality.

Calvin Wang is a Data Scientist at AWS AI/ML. He holds a B.S. in Computer Science from UC Santa Barbara and loves using machine learning to build cool stuff.

Run usage analytics on Amazon QuickSight using AWS CloudTrail

Post Syndicated from Sunil Salunkhe original https://aws.amazon.com/blogs/big-data/run-usage-analytics-on-amazon-quicksight-using-aws-cloudtrail/

Amazon QuickSight is a cloud-native BI service that allows end users to create and publish dashboards in minutes, without provisioning any servers or requiring complex licensing. You can view these dashboards on the QuickSight product console or embed them into applications and websites. After you deploy a dashboard, it’s important to assess how it and other assets are being adopted, accessed, and used across various departments or customers.

In this post, we use a QuickSight dashboard to present the following insights:

  • Most viewed and accessed dashboards
  • Most updated dashboards and analyses
  • Most popular datasets
  • Active users vs. idle users
  • Idle authors
  • Unused datasets (wasted SPICE capacity)

You can use these insights to reduce costs and create operational efficiencies in a deployment. The following diagram illustrates this architecture.


Solution components

The following table summarizes the AWS services and resources that this solution uses.

Resource Type | Name | Purpose
AWS CloudTrail logs | CloudTrailMultiAccount | Capture all API calls for all AWS services across all AWS Regions for this account. You can use AWS Organizations to consolidate trails across multiple AWS accounts.
AWS Glue crawler | QSCloudTrailLogsCrawler, QSProcessedDataCrawler | Ensures that all CloudTrail data is crawled periodically and that partitions are updated in the AWS Glue Data Catalog.
AWS Glue ETL job | QuickSightCloudTrailProcessing | Reads catalogued data from the crawler, processes, transforms, and stores it in an S3 output bucket.
AWS Lambda function | ExtractQSMetadata_func | Extracts event data using the AWS SDK for Python, Boto3. The event data is enriched with QuickSight metadata objects like user, analysis, datasets, and dashboards.
Amazon Simple Storage Service (Amazon S3) | CloudTrailLogsBucket, QuickSight-BIonBI-processed | One bucket stores CloudTrail data. The other stores processed data.
Amazon QuickSight | Quicksight_BI_On_BO_Analysis | Visualizes the processed data.

Solution walkthrough

AWS CloudTrail is a service that enables governance, compliance, operational auditing, and risk auditing of your AWS account. You can use CloudTrail to log, continuously monitor, and retain account activity related to actions across your AWS infrastructure. You can define a trail to collect API actions across all AWS Regions. Although we have enabled a trail for all Regions in our solution, the dashboard shows the data for a single Region only.

After you enable CloudTrail, it starts capturing all API actions and then, at 15-minute intervals, delivers logs in JSON format to a configured Amazon Simple Storage Service (Amazon S3) bucket. Before the logs are made available to our ad hoc query engine, Amazon Athena, they must be parsed, transformed, and processed by the AWS Glue crawler and ETL job.


The AWS Glue crawler and the AWS Glue ETL job handle this. The crawler crawls through the data every day and populates new partitions in the Data Catalog. The data is then available as a table on the Athena console for processing by the AWS Glue ETL job. The ETL job, QuickSightCloudtrail_GlueJob.txt, filters the logs and processes only those events where the event source is QuickSight (for example, eventSource = 'quicksight.amazonaws.com').
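To make the filtering step concrete, here is a minimal Python sketch of the same idea applied to a single CloudTrail log file. It is not the Glue job from this post: the function name quicksight_events and the sample record are hypothetical, and the projection covers only a few of the output fields. It assumes the standard CloudTrail file layout, a JSON object with a top-level Records array.

```python
import json

QUICKSIGHT_SOURCE = "quicksight.amazonaws.com"


def quicksight_events(log_text):
    """Return flattened QuickSight events from one CloudTrail log file.

    Hypothetical helper: keeps only records whose eventSource is QuickSight
    and projects a few fields used later for reporting.
    """
    records = json.loads(log_text).get("Records", [])
    rows = []
    for rec in records:
        if rec.get("eventSource") != QUICKSIGHT_SOURCE:
            continue  # skip events from every other AWS service
        identity = rec.get("userIdentity", {})
        rows.append({
            "eventtime": rec.get("eventTime"),
            "eventname": rec.get("eventName"),
            "awsregion": rec.get("awsRegion"),
            "accountid": identity.get("accountId"),
            "username": identity.get("userName"),
            # Partition value in YYYYMMDD form, derived from the timestamp.
            "date": (rec.get("eventTime") or "")[:10].replace("-", ""),
        })
    return rows


# Made-up sample log with one QuickSight event and one Amazon S3 event.
sample_log = json.dumps({"Records": [
    {"eventSource": "quicksight.amazonaws.com", "eventName": "GetDashboard",
     "eventTime": "2021-01-13T16:55:36Z", "awsRegion": "us-east-1",
     "userIdentity": {"accountId": "111122223333", "userName": "alice"}},
    {"eventSource": "s3.amazonaws.com", "eventName": "GetObject",
     "eventTime": "2021-01-13T16:56:00Z", "awsRegion": "us-east-1",
     "userIdentity": {"accountId": "111122223333"}},
]})
rows = quicksight_events(sample_log)
```

Only the first record survives the filter. In the actual solution, the equivalent logic runs inside the AWS Glue ETL job and writes Parquet output.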


The following screenshot shows the sample JSON for the QuickSight API calls.


The job processes those events and creates a Parquet file. The following table summarizes the file’s data points.

Table: Quicksightlogs
Field Name | Data Type
eventtime | Datetime
eventname | String
awsregion | String
accountid | String
username | String
analysisname | String
Date | Date

The processed data is stored in an S3 folder at s3://<BucketName>/processedlogs/. To optimize performance when querying this data and connecting it to QuickSight for visualization, these logs are partitioned by the date field. For this reason, we recommend that you configure the AWS Glue crawler to detect the new data and partitions and update the Data Catalog for subsequent analysis. We have configured the crawler to run once a day.

We need to enrich this log data with metadata from QuickSight, such as a list of analyses, users, and datasets. This metadata can be extracted using describe_analysis, describe_user, and describe_data_set in the AWS SDK for Python.

We provide an AWS Lambda function that is ideal for this extraction. We configured it to be triggered once a day through Amazon EventBridge. The extracted metadata is stored in the S3 folder at s3://<BucketName>/metadata/.
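The following is a minimal sketch of what such an extraction could look like, not the Lambda function provided with this post. The function name extract_metadata and its parameters are hypothetical. It assumes you already know the analysis IDs, dataset IDs, and user names to describe, and that client is a Boto3 QuickSight client (for example, boto3.client("quicksight")). The output keys mirror the field names used by the Athena views later in this post.

```python
def extract_metadata(client, account_id, analysis_ids, dataset_ids,
                     user_names, namespace="default"):
    """Collect QuickSight metadata with the describe_* calls.

    `client` is expected to behave like boto3.client("quicksight").
    The function name and the output layout are illustrative assumptions.
    """
    analysis = []
    for analysis_id in analysis_ids:
        detail = client.describe_analysis(
            AwsAccountId=account_id, AnalysisId=analysis_id)["Analysis"]
        analysis.append({"analysisid": detail["AnalysisId"],
                         "analysisname": detail["Name"]})

    datasets = []
    for dataset_id in dataset_ids:
        detail = client.describe_data_set(
            AwsAccountId=account_id, DataSetId=dataset_id)["DataSet"]
        datasets.append({"datasetname": detail["Name"],
                         "importmode": detail["ImportMode"]})

    users = []
    for user_name in user_names:
        detail = client.describe_user(
            AwsAccountId=account_id, Namespace=namespace,
            UserName=user_name)["User"]
        users.append({"username": detail["UserName"],
                      "role": detail["Role"],
                      "active": detail["Active"]})

    return {"analysis": analysis, "datasets": datasets, "users": users}
```

A Lambda handler could serialize the returned dictionary with json.dumps and write it under s3://<BucketName>/metadata/ with the Amazon S3 put_object call. Taking the client as an argument keeps the function easy to exercise with a stub.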

Now that we have processed logs and metadata for enrichment, we need to prepare the data visualization in QuickSight. Athena allows us to build views that can be imported into QuickSight as datasets.

We build the following views based on the tables populated by the Lambda function and the ETL job:

CREATE VIEW vw_quicksight_bionbi 
AS 
  SELECT Date_parse(eventtime, '%Y-%m-%dT%H:%i:%SZ') AS "Event Time", 
         eventname  AS "Event Name", 
         awsregion  AS "AWS Region", 
         accountid  AS "Account ID", 
         username   AS "User Name", 
         analysisname AS "Analysis Name", 
         dashboardname AS "Dashboard Name", 
         Date_parse(date, '%Y%m%d') AS "Event Date" 
  FROM   "quicksightbionbi"."quicksightoutput_aggregatedoutput" 

CREATE VIEW vw_users 
AS 
  SELECT usr.username "User Name", 
         usr.role     AS "Role", 
         usr.active   AS "Active" 
  FROM   (quicksightbionbi.users 
          CROSS JOIN Unnest("users") t (usr)) 

CREATE VIEW vw_analysis 
AS 
  SELECT aly.analysisname "Analysis Name", 
         aly.analysisid   AS "Analysis ID" 
  FROM   (quicksightbionbi.analysis 
          CROSS JOIN Unnest("analysis") t (aly)) 

CREATE VIEW vw_analysisdatasets 
AS 
  SELECT alyds.analysesname "Analysis Name", 
         alyds.analysisid   AS "Analysis ID", 
         alyds.datasetid    AS "Dataset ID", 
         alyds.datasetname  AS "Dataset Name" 
  FROM   (quicksightbionbi.analysisdatasets 
          CROSS JOIN Unnest("analysisdatasets") t (alyds)) 

CREATE VIEW vw_datasets 
AS 
  SELECT ds.datasetname AS "Dataset Name", 
         ds.importmode  AS "Import Mode" 
  FROM   (quicksightbionbi.datasets 
          CROSS JOIN Unnest("datasets") t (ds))

QuickSight visualization

Follow these steps to connect the prepared data with QuickSight and start building the BI visualization.

  1. Sign in to the AWS Management Console and open the QuickSight console.

You can set up QuickSight access for end users through SSO providers such as AWS Single Sign-On (AWS SSO), Okta, Ping, and Azure AD so they don’t need to open the console.

  2. On the QuickSight console, choose Datasets.
  3. Choose New dataset to create a dataset for our analysis.

  4. For Create a Data Set, choose Athena.

In the previous steps, we prepared all our data in the form of Athena views.

  5. Configure permissions for QuickSight to access AWS services, including Athena and its S3 buckets. For information, see Accessing Data Sources.

  6. For Data source name, enter QuickSightBIbBI.
  7. Choose Create data source.

  8. On Choose your table, for Database, choose quicksightbionbi.
  9. For Tables, select vw_quicksight_bionbi.
  10. Choose Select.

  11. For Finish data set creation, there are two options to choose from:
    1. Import to SPICE for quicker analytics – Built from the ground up for the cloud, SPICE uses a combination of columnar storage, in-memory technologies enabled through the latest hardware innovations, and machine code generation to run interactive queries on large datasets and get rapid responses. We use this option for this post.
    2. Directly query your data – You can connect to the data source in real time, but if the query is expected to return large results, this option might slow down the dashboard refresh.
  12. Choose Visualize to complete the data source creation process.

Now you can build your visualization sheets. QuickSight refreshes the data source first. You can also schedule a periodic refresh of your data source.

The following screenshot shows some examples of visualizations we built from the data source.

This dashboard presents us with two main areas for cost optimization:

  • Usage analysis – We can see how analyses and dashboards are being consumed by users. This area highlights cost-saving opportunities by identifying datasets that have not been used in any analysis for the last 90 days but still hold a major chunk of SPICE capacity.
  • Account governance – Because author subscriptions are charged on a fixed fee basis, it’s important to monitor if they are actively used. The dashboard helps us identify idle authors for the last 60 days.
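Both checks above come down to comparing a last-activity timestamp against a cutoff. The dashboard does this visually; the following minimal Python sketch shows the same rule in code. The function name idle_since and the sample data are hypothetical, and the last-activity timestamps are assumed to come from the processed CloudTrail logs.

```python
from datetime import datetime, timedelta


def idle_since(last_seen, now, days):
    """Return the names whose last activity is older than `days` days.

    `last_seen` maps a name (an author or a dataset) to the datetime of its
    most recent recorded event, or None if no event was ever recorded.
    """
    cutoff = now - timedelta(days=days)
    return sorted(name for name, seen in last_seen.items()
                  if seen is None or seen < cutoff)


# Made-up activity data for illustration only.
now = datetime(2021, 3, 1)
authors = {"alice": datetime(2021, 2, 20),
           "bob": datetime(2020, 12, 1),
           "carol": None}
datasets = {"orders": datetime(2021, 2, 1),
            "legacy_sales": datetime(2020, 10, 15)}

idle_authors = idle_since(authors, now, days=60)      # idle for 60+ days
unused_datasets = idle_since(datasets, now, days=90)  # unused for 90+ days
```

With this sample data, bob and carol are flagged as idle authors and legacy_sales as an unused dataset.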

Based on the information in the dashboard, we could do the following to save costs:

Conclusion

In this post, we showed how you can use CloudTrail logs to review the use of QuickSight objects, including analyses, dashboards, datasets, and users. You can use the information available in the dashboards to save money on storage and subscriptions, understand the maturity of QuickSight adoption, and more.


About the Author

Sunil Salunkhe is a Senior Solution Architect working with Strategic Accounts on their vision to leverage the cloud to drive aggressive growth strategies. He practices customer obsession by solving their complex challenges in all aspects of the cloud journey, including scale, security, and reliability. When not working, he enjoys playing cricket and cycling with his wife and son.

Retaining data streams up to one year with Amazon Kinesis Data Streams

Post Syndicated from Nihar Sheth original https://aws.amazon.com/blogs/big-data/retaining-data-streams-up-to-one-year-with-amazon-kinesis-data-streams/

Streaming data is used extensively for use cases like sharing data between applications, streaming ETL (extract, transform, and load), real-time analytics, processing data from internet of things (IoT) devices, application monitoring, fraud detection, live leaderboards, and more. Typically, data streams are stored for short durations of time before being loaded into a permanent data store like a data lake or analytics service.

Additional use cases are becoming more prevalent that may require you to retain data in streams for longer periods of time. For example, compliance programs like HIPAA and FedRAMP may require you to store raw data for more than a few days or weeks, or you may want to backtest machine learning (ML) algorithms with historical data that may be several months old.

A challenge arises when you want to process historical data and newly arriving data streams. This requires complex logic to access your data lake and your data stream store, or two sets of code—one to process data from your data lake and one to process your new data streams.

Amazon Kinesis Data Streams solves this challenge by storing your data streams up to 1 year with long-term retention. You can use the same Kinesis Data Streams code base to process both historical and newly arriving data streams, and continue to use features like enhanced fan-out to read large data volumes at very high throughput.

In this post, we describe how long-term retention enables new use cases by bridging real-time and historical data processing. We also demonstrate how you can reduce the time to retrieve 30 days of data from a data stream by an order of magnitude using Kinesis Data Streams enhanced fan-out.

Simple setup, no resource provisioning

Kinesis Data Streams durably stores all data stream records in a shard, an append-only log ordered by arrival time. The time period from when a record is added to when it’s no longer accessible is called the retention period. A Kinesis data stream stores records for 24 hours by default, and you can extend this up to 365 days (8,760 hours). Applications can start reading data at any point in the retention period in the exact order in which the data stream is stored. Shards enable these applications to process data in parallel and at low latency.

You can select a preset retention period or define a custom retention period in days or hours using the Kinesis Data Streams console, as in the following screenshot.


The default retention period is 24 hours and covers scenarios where intermittent lags in processing need to catch up with the real-time data. You can extend retention up to 7 days to reprocess slightly aged data to resolve potential downstream data losses. You can also use long-term retention to store data for more than 7 days and up to 365 days to reprocess historical data for use cases like algorithm backtesting, data store backfills, and auditing. For more information, see Changing the Data Retention Period.

Similarly, you can use the following AWS Command Line Interface (AWS CLI) command to set the retention period in hours (the following code sets it to 9 days, or 216 hours):

aws kinesis increase-stream-retention-period \
    --stream-name samplestream \
    --retention-period-hours 216
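If you prefer the AWS SDK for Python, the same change could be sketched as follows. The helper name set_retention_days is hypothetical, and client is assumed to be a Boto3 Kinesis client (for example, boto3.client("kinesis")). The increase_stream_retention_period call only raises the retention period; lowering it requires decrease_stream_retention_period instead.

```python
def set_retention_days(client, stream_name, days):
    """Raise a stream's retention period to `days` days and return the hours.

    `client` is expected to behave like boto3.client("kinesis").
    The helper name and the day-based interface are illustrative assumptions.
    """
    if not 1 <= days <= 365:
        raise ValueError("retention must be between 1 and 365 days")
    hours = days * 24  # the API takes hours, for example 9 days -> 216 hours
    client.increase_stream_retention_period(
        StreamName=stream_name, RetentionPeriodHours=hours)
    return hours
```

For example, set_retention_days(client, "samplestream", 9) requests the same 216 hours as the CLI command above.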

Read new and historical data, no code changes necessary

All the data captured in the stream is stored in a durable, encrypted, and secure manner for the specified retention period, up to a maximum of 1 year. You can store any amount of data, retrieve it by specifying a start position, and read sequentially using the familiar GetRecords and SubscribeToShard APIs. The start position can be the sequence number of a data record in a shard or a timestamp. This enables you to use the same code to process older data. You can set up multiple consuming applications to start processing data at different points in the data stream.
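As a rough illustration of reading from a timestamp, the following Python sketch uses the shared-throughput GetRecords path with an AT_TIMESTAMP shard iterator. The function name read_from_timestamp, the max_batches safety limit, and the START value are hypothetical; client is assumed to be a Boto3 Kinesis client. A production consumer would typically use the KCL or an enhanced fan-out subscription rather than a hand-written loop.

```python
from datetime import datetime, timezone


def read_from_timestamp(client, stream_name, shard_id, start, max_batches=10):
    """Read records from one shard, starting at the timestamp `start`.

    `client` is expected to behave like boto3.client("kinesis").
    Stops after `max_batches` calls, at the end of the shard, or once the
    reader has caught up with the tip of the stream.
    """
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start,
    )["ShardIterator"]

    records = []
    for _ in range(max_batches):
        if iterator is None:
            break  # the shard was closed and fully read
        batch = client.get_records(ShardIterator=iterator, Limit=1000)
        records.extend(batch["Records"])
        iterator = batch.get("NextShardIterator")
        if batch.get("MillisBehindLatest") == 0:
            break  # caught up with the newest data
    return records


# Hypothetical start position: replay everything since January 1, 2021 (UTC).
START = datetime(2021, 1, 1, tzinfo=timezone.utc)
```

The same function serves both historical and recent data; only the start argument changes.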

Speed up data reads using enhanced fan-out consumers

Kinesis Data Streams provides two models to consume data: shared throughput consumer and enhanced fan-out (EFO) consumer. In the shared throughput consumer model, all the consuming applications share 2 MB/s per shard read throughput and a 5 transactions per second (TPS) quota. In the enhanced fan-out model, each consumer gets a dedicated read throughput of 2 MB/s per shard. Because it uses an HTTP/2 data retrieval API, there is no longer a limit of 5 TPS. You can attach up to 20 EFO consumers to a single stream and read data at a total rate of 40 MB/s per shard. Because each consumer gets dedicated read throughput, processing one doesn’t impact another. So you can attach new consumers to process old data without worrying about the performance of the existing consumer processing real-time data. For example, you can retrain an ML model in an ad hoc fashion without impacting real-time workflows.

You can add and remove EFO consumers at any time and avoid paying for over-provisioned resources. For example, when backtesting, you can register EFO consumers before the test and remove them after completion. You’re only charged for resources used during the test. Also, you can use EFO consumers to accelerate the speed of processing. Each consuming application can process different parts of streams across the retention period to process all the data in parallel, thereby dramatically reducing the total processing time.
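One way to make sure temporary consumers are always removed is to wrap registration in a context manager. The following is a minimal sketch under that assumption; the name temporary_efo_consumers is hypothetical, and client is assumed to be a Boto3 Kinesis client. Note that a newly registered consumer takes a short time to become active before it can subscribe to shards, which this sketch does not wait for.

```python
from contextlib import contextmanager


@contextmanager
def temporary_efo_consumers(client, stream_arn, names):
    """Register EFO consumers for the duration of a `with` block.

    `client` is expected to behave like boto3.client("kinesis").
    Consumers are deregistered on exit, even if the block raises.
    """
    arns = []
    try:
        for name in names:
            response = client.register_stream_consumer(
                StreamARN=stream_arn, ConsumerName=name)
            arns.append(response["Consumer"]["ConsumerARN"])
        yield arns
    finally:
        for arn in arns:
            client.deregister_stream_consumer(ConsumerARN=arn)
```

A backtest could then run inside `with temporary_efo_consumers(client, stream_arn, names) as arns:` so that the consumers exist only for the duration of the test.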

Clickstream pipeline use case

Let’s look at a clickstream use case to see how this works for an existing streaming pipeline like the one in the following diagram.


This pipeline takes clickstream data and creates an alert every time a user leaves your ecommerce site without purchasing the items in their cart. A simple pipeline like this is a great way to start with stream processing, but soon you may want to implement a recommendation system based on user activity on your website and mobile app. To do this, you need to gather historical data in your existing data stream and send it to Amazon Simple Storage Service (Amazon S3) so it can be used for training a recommendation ML model. This scenario illustrates a key benefit of enabling long-term retention: it gives you the flexibility to “go back in time” and replay the existing data in your stream to generate new analytics that you may not have considered when you initially set up the streaming pipeline.

Let’s say you enabled 30 days of retention on your Kinesis data stream. After you train your ML model, you can set up a new streaming pipeline that generates recommendations by calling an inference endpoint hosted on Amazon SageMaker based on the trained ML model. The following diagram illustrates the final state of this architecture.


You can efficiently and quickly consume the existing data in the stream and write it to Amazon S3 so it can be used for training your ML model. The following diagram illustrates the architecture of this intermediate pipeline to generate training data.


You may wonder, why read from Kinesis Data Streams and write to Amazon S3? Why not write to Amazon S3 directly without enabling long-term retention? First, ingesting into Kinesis Data Streams with long-term retention enabled gives you the flexibility to generate additional streaming analytics as time passes. Second, this gives you the flexibility to filter and transform the data being read from Kinesis Data Streams before generating analytics or writing to Amazon S3. Lastly, you can use this approach to render analytics onto other systems besides Amazon S3, such as Amazon Elasticsearch Service (Amazon ES) using the Elasticsearch sink for Apache Flink.

Keep in mind that we only use this pipeline to bootstrap our second, long-lived pipeline that does recommendations, but this is an important step and we need a way to do this efficiently. Although there are multiple options for consuming data from Kinesis Data Streams, Amazon Kinesis Data Analytics for Apache Flink provides an elegant way to attach multiple EFO consumers in the same consuming application.

You can find more information at the official Apache Flink website, and about Kinesis Data Analytics for Apache Flink in the Kinesis Data Analytics developer guide. Apache Flink has a number of connectors, like the recently released FlinkKinesisConsumer, which supports enhanced fan-out for consuming from Kinesis Data Streams, or the Streaming File Sink to write to Amazon S3 from your Apache Flink application.

Accelerating data consumption

For the sake of simplicity, let’s use just one shard in our data stream, ingest data at the maximum rate of 1 MB/s, and specify a retention period of 30 days. To bootstrap our new analytics, reading the full 30 days of data with one EFO consumer at 2 MB/s could take up to 15 days to load this data into Amazon S3. However, you can make this up to 20 times faster by using 20 EFO consumers at the same time, each reading from a different point in the stream at 2 MB/s. The following diagram illustrates the architecture of multiple EFO consumers reading from multiple time slices.


This gives us a total of 40 MB/s in consumption capacity as opposed to 2 MB/s per shard with just one EFO consumer, reducing the overall time by 95%. In most use cases, this combination of Kinesis Data Analytics and EFO allows you to process 30 days of data in hours, instead of days.
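The arithmetic behind these numbers, and the way the retention window can be cut into time slices, can be sketched in a few lines of Python. The function names replay_hours and time_slices are hypothetical, and the calculation assumes a single shard, a constant ingest rate, and consumers that sustain the full 2 MB/s.

```python
from datetime import datetime, timedelta


def replay_hours(retention_days, ingest_mb_s, consumers, read_mb_s=2.0):
    """Hours needed to read a full retention window from one shard."""
    stored_mb = retention_days * 24 * 3600 * ingest_mb_s
    return stored_mb / (consumers * read_mb_s) / 3600


def time_slices(start, end, count):
    """Split [start, end) into `count` equal, contiguous time slices."""
    step = (end - start) / count
    return [(start + i * step, start + (i + 1) * step) for i in range(count)]


one_consumer = replay_hours(30, 1.0, consumers=1)      # 360 hours = 15 days
twenty_consumers = replay_hours(30, 1.0, consumers=20)  # 18 hours

# Hypothetical 30-day window; each consumer starts at the beginning of its slice.
window_end = datetime(2021, 1, 31)
slices = time_slices(window_end - timedelta(days=30), window_end, 20)
```

Each of the 20 slices covers 1.5 days of data, and going from 360 hours to 18 hours is the 95% reduction mentioned above.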

A point of clarification regarding our approach: When all 20 consumers are finished reading past their respective endpoints in the stream, we stop the Apache Flink application. You can do this by raising an exception when all 20 consumers finish reading their respective time slices—effectively stopping the application. The following diagram illustrates the time savings we get from using 20 EFO consumers.


For more information about implementing this approach, see the GitHub repo.

Pricing

An additional cost is associated with long-term retention (from 7–365 days) and EFO consumers. For more information, see Amazon Kinesis Data Streams pricing. Because you can register EFO consumers on demand, you pay only for the limited time you used all 20 consumers to load data, resulting in faster loads. It’s important to point out that you pay roughly the same amount to consume a fixed volume of data from the stream with 20 EFO consumers as you do with 1 EFO consumer because of the shorter duration required when using 20 consumers. 
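To see why the cost is roughly the same, consider a simplified model in which EFO charges have two parts: consumer-shard hours and data retrieved. The following Python sketch uses that model. The function name efo_replay_cost is hypothetical and the two unit prices are placeholders, not actual prices; see the pricing page for current figures. The sketch also ignores the long-term retention charges, which don’t depend on the number of consumers.

```python
def efo_replay_cost(consumers, hours, shards, data_gb,
                    price_per_consumer_shard_hour, price_per_gb):
    """Simplified EFO cost: consumer-shard hours plus data retrieved."""
    consumer_shard_hours = consumers * shards * hours
    return (consumer_shard_hours * price_per_consumer_shard_hour
            + data_gb * price_per_gb)


# 30 days of data ingested at 1 MB/s is about 2,592 GB.
DATA_GB = 30 * 24 * 3600 * 1 / 1000

# Placeholder unit prices, for illustration only.
HOURLY, PER_GB = 0.02, 0.01

one_consumer_cost = efo_replay_cost(1, 360, 1, DATA_GB, HOURLY, PER_GB)
twenty_consumer_cost = efo_replay_cost(20, 18, 1, DATA_GB, HOURLY, PER_GB)
```

One consumer for 360 hours and 20 consumers for 18 hours both add up to 360 consumer-shard hours, and the volume of data retrieved is identical, so the two totals match in this model.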

Summary

In this post, we discussed long-term retention use cases of Kinesis Data Streams, how to increase the retention of a data stream, and related feature enhancements with Kinesis Data Streams APIs and KCL. We took a deep dive into the Apache Flink-based enhanced fan-out consumer approach to replay long-term data quickly. We shared open-source code based on this approach so you can easily implement your use cases using Kinesis Data Streams long-term retention.

You should use long-term retention if you’re planning to develop ML systems, generate customer behavior insights, or have compliance requirements for retaining raw data for more than 7 days. We would love to hear about your use cases with the long-term retention feature. Please submit your feedback to [email protected].


About the Authors

Nihar Sheth is a Senior Product Manager on the Amazon Kinesis Data Streams team at Amazon Web Services. He is passionate about developing intuitive product experiences that solve complex customer problems and enable customers to achieve their business goals. Outside of work, he is focusing on hiking 200 miles of beautiful PNW trails with his son in 2021.

 

 

Karthi Thyagarajan is a Solutions Architect on the Amazon Kinesis Team focusing on all things streaming and he enjoys helping customers tackle distributed systems challenges.

 

 

 

 

Sai Maddali is a Sr. Product Manager – Tech at Amazon Web Services, where he works on Amazon Kinesis Data Streams. He is passionate about understanding customer needs and using technology to deliver services that empower customers to build innovative applications. Besides work, he enjoys traveling, cooking, and running.

 

 

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

Building an administrative console in Amazon QuickSight to analyze usage metrics

Post Syndicated from Ying Wang original https://aws.amazon.com/blogs/big-data/building-an-administrative-console-in-amazon-quicksight-to-analyze-usage-metrics/

Given the scalability of Amazon QuickSight to hundreds and thousands of users, a common use case is to monitor QuickSight group and user activities, analyze the utilization of dashboards, and identify usage patterns of an individual user and dashboard. With timely access to interactive usage metrics, business intelligence (BI) administrators and data team leads can efficiently plan for stakeholder engagement and dashboard improvements. For example, you can remove inactive authors to reduce license cost, as well as analyze dashboard popularity to understand user acceptance and stickiness.

This post demonstrates how to build an administrative console dashboard and serverless data pipeline. We combine QuickSight APIs with AWS CloudTrail logs to create the datasets to collect comprehensive information of user behavior and QuickSight asset usage patterns.

This post provides a detailed workflow that covers the data pipeline, sample Python code, and a sample dashboard of this administrative console. With the guidance of this post, you can configure this administrative console in your own environment.

Let’s look at Forwood Safety, an innovative, values-driven company with a laser focus on fatality prevention. An early adopter of QuickSight, they have collaborated with AWS to deploy this solution to collect BI application usage insights.

“Our engineers love this admin console solution,” says Faye Crompton, Leader of Analytics and Benchmarking at Forwood. “It helps us to understand how users analyze critical control learnings by helping us to quickly identify the most frequently visited dashboards in Forwood’s self-service analytics and reporting tool, FAST.”

Solution overview

The following diagram illustrates the workflow of the solution.


The workflow involves the following steps:

  1. The AWS Lambda function Data_Prepare is scheduled to run hourly. This function calls QuickSight APIs to get QuickSight namespace, group, user, and assets access permissions information and saves the results to an Amazon Simple Storage Service (Amazon S3) bucket.
  2. CloudTrail logs are stored in an S3 bucket.
  3. Three Amazon Athena tables and several views are created from the file in Amazon S3 that contains user-group information, the file that contains QuickSight assets access permissions information, and the view dashboard and user login events in the CloudTrail logs. Optionally, the BI engineer can combine these tables with employee information tables to display human resource information of the users.
  4. Two QuickSight datasets fetch the data in the Athena tables created in Step 3 through SPICE mode. Then, based on these datasets, a QuickSight dashboard is created.

Prerequisites

For this walkthrough, you should have the following prerequisites:

  • An AWS account
  • Access to the following AWS services:
    • Amazon QuickSight
    • Amazon Athena
    • AWS Lambda
    • Amazon S3
  • Basic knowledge of Python
  • Optionally, Security Assertion Markup Language 2.0 (SAML 2.0) or OpenID Connect (OIDC) single sign-on (SSO) configured for QuickSight access

Creating resources

Create your resources by launching the following AWS CloudFormation stack:

After the stack creation is successful, you have one Amazon CloudWatch Events rule, one Lambda function, one S3 bucket, and the corresponding AWS Identity and Access Management (IAM) policies.

To create the resources in a Region other than us-east-1, download the Lambda function.

Creating Athena tables

The Data_Prepare Lambda function is scheduled to run hourly with the CloudWatch Events rule admin-console-every-hour. This function calls the QuickSight APIs list_namespaces, list_users, list_user_groups, list_dashboards, list_data_sets, list_data_sources, list_analyses, list_themes, describe_data_set_permissions, describe_dashboard_permissions, describe_data_source_permissions, describe_analysis_permissions, and describe_theme_permissions to get QuickSight users and assets access permissions information. Finally, this function creates two files, group_membership.csv and object_access.csv, and saves these files to an S3 bucket.
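To give a feel for how the group membership file could be produced, here is a minimal Python sketch. It is not the Data_Prepare function itself: the names _pages and group_membership_csv are hypothetical, and client is assumed to be a Boto3 QuickSight client. The column order (namespace, group, user) follows the group_membership table defined in the next step. Users that belong to no group are omitted in this sketch.

```python
import csv
import io


def _pages(call, key, **kwargs):
    """Yield items from a paginated list call by following NextToken."""
    while True:
        response = call(**kwargs)
        yield from response.get(key, [])
        token = response.get("NextToken")
        if not token:
            return
        kwargs["NextToken"] = token


def group_membership_csv(client, account_id):
    """Return CSV text with one (namespace, group, user) row per membership.

    `client` is expected to behave like boto3.client("quicksight").
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for ns in _pages(client.list_namespaces, "Namespaces",
                     AwsAccountId=account_id):
        for user in _pages(client.list_users, "UserList",
                           AwsAccountId=account_id, Namespace=ns["Name"]):
            for group in _pages(client.list_user_groups, "GroupList",
                                AwsAccountId=account_id,
                                Namespace=ns["Name"],
                                UserName=user["UserName"]):
                writer.writerow(
                    [ns["Name"], group["GroupName"], user["UserName"]])
    return buffer.getvalue()
```

The returned text could then be uploaded as group_membership.csv with the Amazon S3 put_object call.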

Run the following SQL queries, one at a time, to create two Athena tables (group_membership and object_access):

CREATE EXTERNAL TABLE `group_membership`(
`namespace` string,   
`group` string, 
`user` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/group_membership/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file');

CREATE EXTERNAL TABLE `object_access`(
`aws_region` string,   
`object_type` string, 
`object_name` string,
`object_id` string,
`principal_type` string,
`principal_name` string,
`namespace` string,
`permissions` string
)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://admin-console<aws_account_id>/monitoring/quicksight/object_access/'
TBLPROPERTIES (
  'areColumnsQuoted'='false', 
  'classification'='csv', 
  'columnsOrdered'='true', 
  'compressionType'='none', 
  'delimiter'=',',
  'typeOfData'='file');

The following screenshot is sample data of the group_membership table.


The following screenshot is sample data of the object_access table.


For instructions on building an Athena table with CloudTrail events, see Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail. For this post, we create the table cloudtrail_logs in the default database.

Creating views in Athena

Now we have the tables ready in Athena and can run SQL queries against them to generate some views to analyze the usage metrics of dashboards and users.

Create a view of a user’s role status with the following code:

CREATE OR REPLACE VIEW users AS
-- "group" is a reserved word in Athena queries, so the column names are double-quoted
(SELECT "namespace",
        "group",
        "user",
        (CASE
           WHEN "group" IN ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin')
           THEN 'Author'
           ELSE 'Reader'
         END) AS author_status
 FROM "group_membership");

Create a view of GetDashboard events that happened in the last 3 months with the following code:

CREATE OR REPLACE VIEW getdashboard AS
(SELECT
   "useridentity"."type",
   "split_part"("useridentity"."sessioncontext"."sessionissuer"."arn", '/', 2) AS "assumed_role",
   COALESCE("useridentity"."username",
            "concat"("split_part"("useridentity"."arn", '/', 2), '/',
                     "split_part"("useridentity"."arn", '/', 3))) AS "user_name",
   awsregion,
   "split_part"("split_part"("serviceeventdetails", 'dashboardName":', 2), ',', 1) AS dashboard_name,
   "split_part"("split_part"("split_part"("split_part"("serviceeventdetails", 'dashboardId":', 2), ',', 1), 'dashboard/', 2), '"}', 1) AS dashboardId,
   date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time,
   max(date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) AS latest_event_time
 FROM cloudtrail_logs
 WHERE eventsource = 'quicksight.amazonaws.com'
   AND eventname = 'GetDashboard'
   AND DATE_TRUNC('day', date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ')) > cast(current_date - interval '3' month AS date)
 GROUP BY 1, 2, 3, 4, 5, 6, 7)

In the preceding query, the conditions defined in the WHERE clause fetch only the records of QuickSight GetDashboard events.
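The nested split_part calls in the preceding query can be hard to read. The following Python sketch emulates them on a made-up serviceeventdetails string to show what each expression extracts. The helper names split_part and dashboard_fields are hypothetical, and the sample string assumes the details are stored as compact JSON text with no spaces after the colons.

```python
def split_part(text, delimiter, index):
    """Emulate Presto's split_part: 1-based field, None if out of range."""
    parts = text.split(delimiter)
    return parts[index - 1] if index <= len(parts) else None


def dashboard_fields(details):
    """Apply the same nested split_part logic as the getdashboard view."""
    name = split_part(split_part(details, 'dashboardName":', 2), ",", 1)
    dashboard_id = split_part(
        split_part(
            split_part(split_part(details, 'dashboardId":', 2), ",", 1),
            "dashboard/", 2),
        '"}', 1)
    return name, dashboard_id


# Made-up sample in the assumed compact JSON form.
details = (
    '{"eventRequestDetails":{"dashboardId":'
    '"arn:aws:quicksight:us-east-1:111122223333:dashboard/abc-123"},'
    '"eventResponseDetails":{"dashboardDetails":{"dashboardName":"Admin Console",'
    '"dashboardId":"arn:aws:quicksight:us-east-1:111122223333:dashboard/abc-123"}}}'
)
name, dashboard_id = dashboard_fields(details)
```

On this sample, the dashboard ID comes out as abc-123, while the dashboard name keeps its surrounding double quotes. Because the name is cut at the first comma, a dashboard name that contains a comma would be truncated by this approach.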

How can we design queries to fetch records of other events? We can review the CloudTrail logs to look for the information. For example, let’s look at the sample GetDashboard CloudTrail event:

{
    "userIdentity": {
        "type": "AssumedRole",
        "principalId": "<principal_id>:<user_name>",
        "arn": "arn:aws:sts::<aws_account_id>:assumed-role/<IAM_role_name>/<user_name>",
        "accountId": "<aws_account_id>",
        "sessionContext": {
            "sessionIssuer": {
                "type": "Role",
                "principalId": "<principal_id>",
                …
            }
        }
    },
    "eventTime": "2021-01-13T16:55:36Z",
    "eventSource": "quicksight.amazonaws.com",
    "eventName": "GetDashboard",
    "awsRegion": "us-east-1",
    "eventID": "a599c8be-003f-46b7-a40f-2319efb6b87a",
    "readOnly": true,
    "eventType": "AwsServiceEvent",
    "serviceEventDetails": {
        "eventRequestDetails": {
            "dashboardId": "arn:aws:quicksight:us-east-1:<aws_account_id>:dashboard/<dashboard_id>"
        },
        "eventResponseDetails": {
            "dashboardDetails": {
                "dashboardName": "Admin Console",
                "dashboardId": "arn:aws:quicksight:us-east-1:<aws_account_id>:dashboard/<dashboard_id>",
                "analysisIdList": [
                    "arn:aws:quicksight:us-east-1:<aws_account_id>:analysis/<analysis_id>"
                ]
            }
        }
    }
}

With eventSource = "quicksight.amazonaws.com" and eventName = "GetDashboard", we can get all the QuickSight dashboard view events.
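
To make the filter concrete, the following is a minimal Python sketch that applies the same two conditions to a parsed CloudTrail event and pulls out the dashboard name and ID. The trimmed-down sample event, the account ID 111122223333, the dashboard ID abc-123, and the function names are assumptions for illustration. In Athena, serviceeventdetails is a string and is parsed with split_part instead.

```python
import json

# Hypothetical, trimmed-down event with the same shape as the sample event.
SAMPLE_EVENT = json.loads("""
{
  "eventTime": "2021-01-13T16:55:36Z",
  "eventSource": "quicksight.amazonaws.com",
  "eventName": "GetDashboard",
  "awsRegion": "us-east-1",
  "serviceEventDetails": {
    "eventResponseDetails": {
      "dashboardDetails": {
        "dashboardName": "Admin Console",
        "dashboardId": "arn:aws:quicksight:us-east-1:111122223333:dashboard/abc-123"
      }
    }
  }
}
""")


def is_dashboard_view(event):
    """Same conditions as the WHERE clause: QuickSight source, GetDashboard name."""
    return (
        event.get("eventSource") == "quicksight.amazonaws.com"
        and event.get("eventName") == "GetDashboard"
    )


def dashboard_summary(event):
    """Extract the fields the getdashboard view exposes for one event."""
    details = event["serviceEventDetails"]["eventResponseDetails"]["dashboardDetails"]
    return {
        "dashboard_name": details["dashboardName"],
        # Keep only the part of the ARN after "dashboard/".
        "dashboard_id": details["dashboardId"].split("dashboard/", 1)[1],
        "event_time": event["eventTime"],
        "awsregion": event["awsRegion"],
    }


if is_dashboard_view(SAMPLE_EVENT):
    print(dashboard_summary(SAMPLE_EVENT))
```

Changing the eventName comparison is all it takes to target a different event type.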

Similarly, we can define the condition as eventname = 'AssumeRoleWithSAML' to fetch the user login events. (This solution assumes that the users log in to their QuickSight account with identity federation through SAML.) For more information about querying CloudTrail logs to monitor other interesting user behaviors, see Using administrative dashboards for a centralized view of Amazon QuickSight objects.

Furthermore, we can join with employee information tables to get a QuickSight user’s human resources information.

Finally, we can generate a view called admin_console with QuickSight group and user information, assets information, CloudTrail logs, and, optionally, employee information. The following screenshot shows an example preview.

Creating datasets

With the Athena views ready, we can build some QuickSight datasets. We can load the view called admin_console to build a SPICE dataset called admin_console and schedule this dataset to be refreshed hourly. Optionally, you can create a similar dataset called admin_console_login_events with the Athena table based on eventname = 'AssumeRoleWithSAML' to analyze QuickSight user login events. Depending on the usage metrics requirements in your organization, you can create other datasets to serve different requests.

Building dashboards

Now we can build a QuickSight dashboard as the administrative console to analyze usage metrics. The following steps are based on the dataset admin_console. The schema of the optional dataset admin_console_login_events is the same as admin_console. You can apply the same logic to create the calculated fields to analyze user login activities.

  1. Create parameters.

For example, we can create a parameter called InActivityMonths, as in the following screenshot. Similarly, we can create other parameters such as InActivityDays, Start Date, and End Date.

  1. Create controls based on the parameters.

  1. Create calculated fields.

For instance, we can create a calculated field to detect the active or inactive status of QuickSight authors. If the time span between the latest view dashboard activity and now is greater than or equal to the number defined in the Inactivity Months control, the author status is Inactive. The following screenshot shows the relevant code.

Depending on end users' requirements, we can define several calculated fields to perform the analysis.
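
The inactivity rule can be sketched outside QuickSight as follows. This is a minimal Python illustration, not the calculated field itself: the function names are hypothetical, and counting whole calendar months this way is an assumption that may differ from how QuickSight computes date differences.

```python
from datetime import datetime


def months_between(earlier, later):
    """Whole months elapsed from earlier to later, taking the day of month into account."""
    months = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    if later.day < earlier.day:
        months -= 1  # the last month has not fully elapsed yet
    return months


def author_status(latest_event_time, now, inactivity_months):
    """Inactive when the gap since the latest dashboard view reaches the threshold."""
    if months_between(latest_event_time, now) >= inactivity_months:
        return "Inactive"
    return "Active"


last_view = datetime(2021, 1, 13, 16, 55, 36)
print(author_status(last_view, datetime(2021, 4, 13), inactivity_months=3))
print(author_status(last_view, datetime(2021, 4, 12), inactivity_months=3))
```

Here inactivity_months plays the role of the InActivityMonths parameter.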

  1. Create visuals.

For example, we create an insight to display the top three dashboards viewed by readers and a visual to display the authors of these dashboards.

  1. Add URL actions to define extra features, such as emailing inactive authors or checking user details.

The following sample code defines the action to email inactive authors:

mailto:<<email>>?subject=Alert to inactive author! &body=Hi, <<username>>, any author without activity for more than a month will be deleted. Please log in to your QuickSight account to continue accessing and building analyses and dashboards!
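
In a QuickSight URL action, the <<email>> and <<username>> placeholders are filled in from the selected row. If you generate a similar link outside QuickSight, the subject and body should be percent-encoded. The following Python sketch shows one way to do that; the function name and the sample address are assumptions for illustration.

```python
from urllib.parse import quote


def inactive_author_mailto(email, username):
    """Build a mailto link with a percent-encoded subject and body."""
    subject = "Alert to inactive author!"
    body = (
        f"Hi, {username}, any author without activity for more than a month "
        "will be deleted. Please log in to your QuickSight account to continue "
        "accessing and building analyses and dashboards!"
    )
    return f"mailto:{email}?subject={quote(subject)}&body={quote(body)}"


print(inactive_author_mailto("jane@example.com", "jane"))
```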

The following screenshots show an example dashboard that you can make using our data.

The following is the administrative console landing page. On this page, we provide an overview, terminology explanations, and thumbnails of the other two tabs.

The following screenshots show the User Analysis tab.

The following screenshots show the Dashboards Analysis tab.

You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

You can reference the public template of the preceding dashboard in create-template, create-analysis, and create-dashboard API calls to create this dashboard and analysis in your account. The template ARN of this public template is 'TemplateArn': 'arn:aws:quicksight:us-east-1:889399602426:template/admin-console'.
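
As a rough sketch of how the template ARN fits into a create-dashboard call, the following Python builds the request parameters as a plain dictionary. The account ID, dashboard ID, dataset ARN, and the placeholder name admin_console are assumptions; the placeholder names must match the ones defined in the template, which this post doesn't list.

```python
TEMPLATE_ARN = "arn:aws:quicksight:us-east-1:889399602426:template/admin-console"


def build_create_dashboard_request(account_id, dashboard_id, name, dataset_refs):
    """Build create-dashboard parameters that use the public template as the source.

    dataset_refs maps each template placeholder name to a dataset ARN in your account.
    """
    return {
        "AwsAccountId": account_id,
        "DashboardId": dashboard_id,
        "Name": name,
        "SourceEntity": {
            "SourceTemplate": {
                "Arn": TEMPLATE_ARN,
                "DataSetReferences": [
                    {"DataSetPlaceholder": placeholder, "DataSetArn": arn}
                    for placeholder, arn in dataset_refs.items()
                ],
            }
        },
    }


request = build_create_dashboard_request(
    account_id="111122223333",      # hypothetical account ID
    dashboard_id="admin-console",   # hypothetical dashboard ID
    name="Admin Console",
    dataset_refs={
        # hypothetical placeholder name and dataset ARN
        "admin_console": "arn:aws:quicksight:us-east-1:111122223333:dataset/admin_console",
    },
)
print(request["SourceEntity"]["SourceTemplate"]["Arn"])
```

With the AWS SDK for Python installed and credentials configured, a dictionary like this could be passed as keyword arguments to the QuickSight client's create_dashboard method.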

Additional usage metrics

Additionally, we can perform more complex analysis to collect advanced usage metrics. For example, Forwood Safety raised a unique request to analyze the readers who log in but don't perform any dashboard view actions (see the following code). This helps their clients identify and prevent wasted reader session fees. Leadership teams value the ability to minimize uneconomical user activity.

CREATE OR REPLACE VIEW "loginwithoutviewdashboard" AS
with login as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventname = 'AssumeRoleWithSAML'
GROUP BY  1,2,3),
dashboard as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventsource = 'quicksight.amazonaws.com'
AND
eventname = 'GetDashboard'
GROUP BY  1,2,3),
users as 
(select Namespace,
"Group",
"User" as user_name,
(case
when "Group" in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin')
then 'Author'
else 'Reader'
end)
as author_status
from "group_membership" )
-- Keep reader logins that have no GetDashboard event in the same region
-- within 30 minutes after the login (the window direction is an assumption)
select l.* 
from login as l 
join users as u 
on l.user_name=u.user_name 
left join dashboard as d 
on l.user_name=d.user_name 
and 
l.awsregion=d.awsregion 
and 
d.event_time>=l.event_time 
and 
d.event_time<(l.event_time + interval '30' minute ) 
where d.user_name is null 
and 
u.author_status='Reader'
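
The intent of this analysis can also be sketched in plain Python. The following is a minimal illustration under one assumption: a login counts as unused when no GetDashboard event for the same user and Region follows within 30 minutes. The function name, the tuple layout, and the sample users are hypothetical.

```python
from datetime import datetime, timedelta


def logins_without_view(logins, views, window=timedelta(minutes=30)):
    """Return the logins that are not followed by a dashboard view inside the window.

    logins and views are lists of (user_name, awsregion, event_time) tuples.
    """
    unused = []
    for user, region, login_time in logins:
        viewed = any(
            v_user == user
            and v_region == region
            and login_time <= v_time < login_time + window
            for v_user, v_region, v_time in views
        )
        if not viewed:
            unused.append((user, region, login_time))
    return unused


t0 = datetime(2021, 1, 13, 16, 0)
logins = [("alice", "us-east-1", t0), ("bob", "us-east-1", t0)]
views = [
    ("alice", "us-east-1", t0 + timedelta(minutes=5)),   # inside the window
    ("bob", "us-east-1", t0 + timedelta(minutes=45)),    # outside the window
]
print(logins_without_view(logins, views))
```

Restricting the input to readers, as the view does with author_status, would be a filter on the logins list before calling the function.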

Cleaning up

To avoid incurring future charges, delete the resources you created with the CloudFormation template.

Conclusion

This post discussed how BI administrators can use QuickSight, CloudTrail, and other AWS services to create a centralized view to analyze QuickSight usage metrics. We also presented a serverless data pipeline to support the administrative console dashboard.

You can request a demo of this administrative console to try for yourself.


About the Authors

Ying Wang is a Data Visualization Engineer with the Data & Analytics Global Specialty Practice in AWS Professional Services.

Jill Florant manages Customer Success for the Amazon QuickSight Service team.

How the Yahoo! JAPAN Smart Devices Team is improving voice user interfaces with Amazon QuickSight business intelligence

Post Syndicated from Kazuhide Fujita original https://aws.amazon.com/blogs/big-data/how-the-yahoo-japan-smart-devices-team-is-improving-voice-user-interfaces-with-amazon-quicksight-business-intelligence/

This is a guest blog post by Kazuhide Fujita, Product Manager at Yahoo! JAPAN.

Yahoo! JAPAN is a large internet search and media company, and its web portal is one of the most commonly used websites in Japan. Our smart devices team is responsible for building and improving Yahoo! JAPAN apps for voice user interfaces (VUI) such as Amazon Alexa and Google Assistant. We see VUI as a market that will grow exponentially in the future, and we want to be ready to lead the consumer experience with such devices. In this post, I discuss how we’re using Amazon QuickSight business intelligence (BI) to help our product teams improve these services.

Enhanced access to insights at lower cost

To continuously improve our services, we use data to understand how consumers are interacting with the software and to identify growth trends. However, the data we get directly from smart device makers is limited. So, we built our own log system to capture more granular data, such as the types of commands customers are using, the time of day they use the application, and how frequently they use it.

Early on, we used Amazon Elasticsearch Service (Amazon ES) and Kibana to analyze data. Although this solution was very capable, it came at a higher price point than we were targeting. Another option was to export data directly to Microsoft Excel for ad hoc analysis. However, this was very time consuming and limited us to working with extracts of historical data rather than the latest information.

We decided to look for a solution that would suit the full spectrum of our needs while being cost-effective for our specific use case. While we were searching, our data team made the decision to standardize on a data lake architecture using Amazon Simple Storage Service (Amazon S3) and Amazon Athena. This approach provided a high level of flexibility and scalability. To visualize our data, it made sense to use QuickSight, the serverless BI solution with pay-per-session pricing on AWS.

Unifying data to understand customers better

This system has proven to be a good fit for our needs. The data lake allows us to accumulate different types of data from monitoring many KPIs and VUI products. For example, we might want to know the number of active users over a given period, and then drill down into how active those users were in the 2 weeks from when they registered. The data lake makes this possible. It’s easy to maintain even though the data is very diverse. For aggregating and performing calculations on the data, we use Athena because it provides optimal performance for complex queries thanks to the distributed computing model.

For ad hoc analysis, dashboards, and reporting, QuickSight connects seamlessly to our data lake. QuickSight makes it easy to view trends in customer behavior such as the time of usage, method of interaction, typical settings, and so on. The following screenshot shows a sample dashboard in QuickSight.

For example, the default wake word for Alexa-powered devices is to say the name of the voice assistant: “Hey, Alexa.” However, Japanese customers may prefer to say “ohayō,” which means “good morning” in Japanese. Which setting customers prefer could be an important trend for us to know when we configure our offerings. With QuickSight, it’s easy to compare trends for this type of behavior across other user characteristics.

This is only one small example of the kinds of insights we glean by using QuickSight. Another use case is regarding initiatives to increase product usage through marketing or incentives. We can track the outcome of these programs using QuickSight by tracking whether they result in an uptick in usage relative to the communications we send out.

The freedom to focus on what matters to the business

One of the big advantages of using QuickSight and other AWS services is that we don’t have to worry about maintaining on-premises systems for our data lake and analytics. It’s easy to manage and we can focus on gaining insights and improving our products—not running data center infrastructure. Building our end-to-end data-to-insights pipeline on AWS ensures that we can easily apply security and governance policies to all our data.

Overall, QuickSight provides us with the flexibility to analyze all kinds of data quickly, so we can aim to be the market leader in the VUI marketplace. We’re excited to see what the future holds for this powerful tool—and to apply the knowledge we gain to improving our services.


About the Author

Kazuhide Fujita is the Skill Team Product Manager, Smart Device Division, at Yahoo Japan Corporation.

Implementing multi-tenant patterns in Amazon Redshift using data sharing

Post Syndicated from Rajesh Francis original https://aws.amazon.com/blogs/big-data/implementing-multi-tenant-patterns-in-amazon-redshift-using-data-sharing/

Software service providers offer subscription-based analytics capabilities in the cloud with Analytics as a Service (AaaS), and increasingly customers are turning to AaaS for business insights. A multi-tenant storage strategy allows the service providers to build a cost-effective architecture to meet increasing demand.

Multi-tenancy means a single instance of software and its supporting infrastructure is shared to serve multiple customers. For example, a software service provider could generate data that is housed in a single data warehouse cluster, but accessed securely by multiple customers. This storage strategy offers an opportunity to centralize management of data, simplify ETL processes, and optimize costs. However, service providers have to constantly balance between cost and providing a better user experience for their customers.

With the new data sharing feature, you can use Amazon Redshift to scale and meet both objectives: managing costs by simplifying storage and ETL pipelines, while still providing consistent performance to customers. You can ingest data into a cluster designated as a producer cluster, and share this live data with one or more consumer clusters. Clusters accessing this shared data are isolated from each other, so the performance of a producer cluster isn’t impacted by workloads on consumer clusters. This enables consumer clusters to get consistent performance based on their individual compute capacity.

In this post, we focus on various AaaS patterns, and discuss how you can use data sharing in a multi-tenant architecture to scale for virtually unlimited users. We discuss detailed steps to use data sharing with different storage strategies.

Multi-tenant storage patterns

Multi-tenant storage patterns help simplify the architecture and long-term maintenance of the analytics platform. In a multi-tenant strategy, data is stored centrally in a single cluster for all tenants, enabling simplification of the ETL ingestion pipeline and data management. In the previously published whitepaper SaaS Storage Strategies, various models of storage and benefits are covered for a single cluster scenario.

The three strategies you can choose from are:

  • Pool model – Data is stored in a single database schema for all tenants, and a new column (tenant_id) is used to scope and control access to individual tenant data. Access to the multi-tenant data is controlled using views built on the tables.
  • Bridge model – Storage and access to data for each tenant is controlled at individual schema level in the same database.
  • Silo model – Storage and access control to data for each tenant is maintained in separate databases.

The following diagram illustrates the architecture of these multi-tenant storage strategies.

In the following sections, we discuss how to implement these multi-tenant strategies using the Amazon Redshift data sharing feature with a multi-cluster architecture.

Scaling your multi-tenant architecture using data sharing

AaaS providers implementing multi-tenant architectures were previously limited to resources of a single cluster to meet the compute and concurrency requirements of users across all the tenants. As the number of tenants increased, you could either turn on concurrency scaling or create additional clusters. However, the addition of new clusters means additional ingestion pipelines and increased operational overhead.

With data sharing in Amazon Redshift, you can easily and securely share data across clusters. Data ingested into the producer cluster is shared with one or more consumer clusters, which allows total separation of ETL and BI workloads. Several consumer clusters can read data from the managed storage of a producer cluster. This enables instant, granular, and high-performance access without data copies and movement. Workloads accessing shared data are isolated from each other and the producer. You can distribute workloads across multiple clusters while simplifying and consolidating the ETL ingestion pipeline into one main producer cluster, providing optimal price for performance.

Consumer clusters can in turn be producers for the data sets they own. You can optimize costs even further by co-locating multiple tenants on the same consumer cluster. For instance, you can group low-volume tier 3 tenants into a single consumer cluster to provide a lower-cost offering, while high-volume tier 1 tenants get their own isolated compute clusters. Consumer clusters can be created in the same account as the producer or in a different AWS account. This lets you bill consumer clusters separately: you can charge back to the business group that uses a consumer cluster, or even allow your customers to use their own Amazon Redshift cluster in their account so that they pay for usage of the consumer cluster. The following diagram shows the difference in ETL and consumer access patterns in a multi-tenant architecture using data sharing versus a single cluster approach without data sharing.

Multi-tenant architecture with data sharing compared to single cluster approach

Creating a multi-tenant architecture for an AaaS solution

For this post, we use a simple data model with a fact and a dimension table to demonstrate how to leverage data sharing to design a scalable multi-tenant AaaS solution. We cover detailed steps involved for each storage strategy using this data model. The tables are as follows:

  • Customer – dimension table containing customer details
  • Sales – fact table containing sales transactions

We use two Amazon Redshift ra3.4xlarge clusters with two nodes each, and designate one cluster as the producer and the other as the consumer.

The high-level steps involved in enabling data sharing across clusters are as follows:

  1. Create a data share in the producer cluster and assign database objects to the data share.
  2. From the producer cluster, grant usage on the data share to consumer clusters, identified by namespace or AWS account.
  3. From the consumer cluster, create an external database using the data share from the producer.
  4. Query the tables in the data share through the external shared database in the consumer cluster. Grant access to other users to access this shared database and objects.
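
The four steps above map onto a small set of SQL statements, which the later sections run one at a time. As a compact overview, the following Python sketch assembles them for a given share. The function name and the sample namespace values are assumptions, and plain string formatting is used only for illustration; don't build SQL this way from untrusted input.

```python
def data_share_statements(share, schema, objects, consumer_ns, producer_ns, local_db):
    """Return the statements to run on the producer and on the consumer, in order."""
    producer = [
        # Step 1: create the data share and assign database objects to it.
        f"CREATE DATASHARE {share};",
        f"ALTER DATASHARE {share} ADD SCHEMA {schema};",
        *[f"ALTER DATASHARE {share} ADD TABLE {schema}.{obj};" for obj in objects],
        # Step 2: grant usage to the consumer cluster, identified by namespace.
        f"GRANT USAGE ON DATASHARE {share} TO NAMESPACE '{consumer_ns}';",
    ]
    consumer = [
        # Step 3: create a database from the producer's data share.
        f"CREATE DATABASE {local_db} FROM DATASHARE {share} OF NAMESPACE '{producer_ns}';",
        # Step 4: query shared objects with database.schema.object notation.
        *[f"SELECT * FROM {local_db}.{schema}.{obj};" for obj in objects],
    ]
    return {"producer": producer, "consumer": consumer}


statements = data_share_statements(
    share="salesshare",
    schema="sales",
    objects=["v_customer", "v_customersales"],
    consumer_ns="<consumercluster1_namespace>",
    producer_ns="<producercluster_namespace>",
    local_db="sales_db",
)
for side, sql_list in statements.items():
    print(f"-- run on the {side} cluster")
    print("\n".join(sql_list))
```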

Creating producer and consumer Amazon Redshift clusters

Let's start by creating two Amazon Redshift ra3.4xlarge clusters with two nodes each, one for the producer and the other for the consumer.

  1. On the Amazon Redshift console, create two clusters of RA3 instance type, and name them ds-producer and ds-consumer-c1, respectively.
  1. Next, log in to Amazon Redshift using the query editor. You can also use a SQL client tool like DBeaver, SQL Workbench, or Aginity Workbench. For configuration information, see Connecting to an Amazon Redshift cluster using SQL client tools.

Get the cluster namespace of the producer and consumer clusters from the console. We will use the namespaces to create the tenant table and to create and access the data shares. You can also get the cluster namespaces by logging into each of the clusters and executing the SELECT CURRENT_NAMESPACE statement in the query editor.

Replace the placeholders producercluster_namespace, consumercluster1_namespace, and consumercluster2_namespace with the corresponding namespaces wherever they are referenced in the code sections.
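
Because the scripts in this post carry placeholders in angle brackets, a quick check before running them can catch a placeholder that was left in place. The following Python sketch is one hypothetical way to do that; the helper names and the sample GUID are assumptions, and the check relies on the namespace being a GUID.

```python
import re
import uuid

# Matches placeholders such as <producercluster_namespace>.
PLACEHOLDER = re.compile(r"<\w+>")


def unresolved_placeholders(sql):
    """Return any angle-bracket placeholders still present in a script."""
    return PLACEHOLDER.findall(sql)


def looks_like_namespace(value):
    """Rough check that a value parses as a GUID."""
    try:
        uuid.UUID(value)
        return True
    except ValueError:
        return False


script = "CREATE DATABASE sales_db FROM DATASHARE salesshare OF NAMESPACE '<producercluster_namespace>';"
print(unresolved_placeholders(script))
print(looks_like_namespace("0f8fad5b-d9cb-469f-a165-70867728950e"))  # made-up GUID
```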

The following screenshot shows the namespace on the Amazon Redshift console.

Now that we have the clusters created, we will go through the detailed steps for the three models. First, we will cover the Pool model, followed by Bridge model and finally the Silo model.

Pool model

The pool model represents an all-in, multi-tenant model where all tenants share the same storage constructs. It provides the most benefit in simplifying the AaaS solution.

With this model, data storage is centralized in one cluster database, and data is stored for all tenants in the same set of data models. To scope and control access to tenant data, we introduce a column (tenant_id) that serves as a unique identifier for each tenant.

Security management to prevent cross-tenant access is one of the main aspects to address with the pool model. We can implement row-level security and provide secure access to the data by creating database views, and set application-level policies by creating groups with specific access and assigning users to the groups. The following diagram illustrates the pool model architecture.

To create a multi-tenant solution using the pool model, you create data shares for the pool model in the producer cluster, and share data with the consumer cluster. We provide more detail on these steps in the following sections.

Creating data shares for the pool model in the producer cluster

To create data shares for the pool model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and run the following script.

Note that we have a tenant table to store unique identifiers for each tenant or consumer (tenant).

We add a column (tenant_id) to the sales and customer tables to uniquely identify tenant data. This tenant_id references the tenant_id in the tenant table to uniquely identify the tenant and consumer records. See the following code:

/**********************************************/
/*       datasharing datasetup pool model     */
/**********************************************/

-- Create Schema
create schema sales;

CREATE TABLE IF NOT EXISTS sales.tenant ( 
t_tenantid int8 not null,
t_name varchar(50) not null,
t_namespace varchar(50),
t_account varchar(16)
)
DISTSTYLE AUTO
SORTKEY AUTO;

-- Create tables for multi-tenant sales schema
drop table if exists sales.customer;
CREATE TABLE IF NOT EXISTS sales.customer(
  c_tenantid int8 not null,
  c_custid int8 not null,
  c_name varchar(25) not null,
  c_region varchar(40) not null,
  Primary Key(c_tenantid, c_custid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;

CREATE TABLE IF NOT EXISTS sales.sales (
  s_tenantid int8 not null,
  s_orderid int8 not null,
  s_custid int8 not null,
  s_totalprice numeric(12,2) not null,
  s_orderdate date not null,
  Primary Key(s_tenantid, s_orderid)
) 
DISTSTYLE AUTO
SORTKEY AUTO;
  1. Set up the tenant table with the details for each consumer cluster, and ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use INSERT statements:
    -- Ingest data 
    insert into sales.tenant values
    (0, 'primary', '<producercluster_namespace>',''),
    (1, 'tenant1', '<consumercluster1_namespace>',''),
    (2, 'tenant2', '<consumercluster2_namespace>','');
    
    insert into sales.customer values
    (1, 1, 'Customer 1', 'NorthEast'),
    (1, 2, 'Customer 2', 'SouthEast'),
    (2, 1, 'Customer 3', 'NorthWest'),
    (2, 2, 'Customer 4', 'SouthEast');
    
    truncate table sales.sales;
    insert into sales.sales values
    (1, 1, 1, 2434.33, '2020-11-21'),
    (1, 2, 2, 54.90, '2020-5-5'),
    (1, 3, 2, 9678.99, '2020-3-8'),
    (2, 1, 2, 452.24, '2020-1-23'),
    (2, 2, 1, 76523.10, '2020-11-3'),
    (2, 3, 1, 6745.20, '2020-10-01');
    
    select count(*) from sales.tenant;
    select count(*) from sales.customer;
    select count(*) from sales.sales;

Securing data on the producer cluster by restricting access

In the pool model, no external user has direct access to underlying tables. All access is restricted using views.
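
To illustrate the idea of restricting access through views, the following self-contained Python sketch reproduces the filtering pattern with SQLite from the standard library. SQLite has no current_namespace function, so a one-row session_context table stands in for it; that table, the namespace values ns-tenant1 and ns-tenant2, and the function name are assumptions made only for this sketch.

```python
import sqlite3


def tenant_view_rows(namespace):
    """Return the customer names visible through the view for one namespace."""
    con = sqlite3.connect(":memory:")
    con.executescript("""
        CREATE TABLE tenant (t_tenantid INTEGER, t_name TEXT, t_namespace TEXT);
        CREATE TABLE customer (c_tenantid INTEGER, c_custid INTEGER,
                               c_name TEXT, c_region TEXT);
        -- Stand-in for Amazon Redshift's current_namespace (assumption).
        CREATE TABLE session_context (current_namespace TEXT);

        INSERT INTO tenant VALUES
            (1, 'tenant1', 'ns-tenant1'),
            (2, 'tenant2', 'ns-tenant2');
        INSERT INTO customer VALUES
            (1, 1, 'Customer 1', 'NorthEast'),
            (1, 2, 'Customer 2', 'SouthEast'),
            (2, 1, 'Customer 3', 'NorthWest'),
            (2, 2, 'Customer 4', 'SouthEast');

        -- The view exposes only rows whose tenant matches the caller's namespace.
        CREATE VIEW v_customer AS
            SELECT c.c_name, c.c_region
            FROM customer c
            JOIN tenant t ON c.c_tenantid = t.t_tenantid
            JOIN session_context s ON t.t_namespace = s.current_namespace;
    """)
    con.execute("INSERT INTO session_context VALUES (?)", (namespace,))
    rows = con.execute("SELECT c_name FROM v_customer ORDER BY c_name").fetchall()
    con.close()
    return [row[0] for row in rows]


print(tenant_view_rows("ns-tenant1"))
print(tenant_view_rows("ns-tenant2"))
```

A namespace that isn't registered in the tenant table sees no rows at all, which is the behavior the pool model relies on to prevent cross-tenant access.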

  1. Create a view for each of the fact and dimension tables to include a condition to filter records from the consumer tenant’s namespace. In our example, we create v_customersales to combine sales fact and customer dimension tables with a restrictive filter for t_namespace = current_namespace. See the following code:
    /**********************************************/
    /* We will create late binding views          */
    /* but materialized views could also be used  */
    /**********************************************/
    
    create or replace view sales.v_customer as
    select * 
    from sales.customer c, sales.tenant t
    where c.c_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_sales as
    select * 
    from sales.sales s, sales.tenant t
    where s.s_tenantid = t.t_tenantid 
    and t.t_namespace  = current_namespace;
    
    create or replace view sales.v_customersales as 
    select c_tenantid, c_name, c_region, 
    	date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week", 
    	date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month", 
    	date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
    	date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
    	date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom", 
    	t.t_namespace
    from sales.tenant t, sales.customer c, sales.sales s
    where t.t_tenantid = c.c_tenantid 
    and c.c_tenantid = s.s_tenantid 
    and c.c_custid = s.s_custid 
    and t.t_namespace = current_namespace 
    WITH NO SCHEMA BINDING;
    
    select * from sales.v_customersales;
          
    

Now that we have database objects created in the producer cluster, we can share the data with the consumer clusters.

Sharing data with the consumer cluster

To share data with the consumer cluster, complete the following steps:

  1. Create a data share for the sales data:
    /***************************************************/
    /* Create Datashare and add objects to the share    */
    /****************************************************/
    CREATE DATASHARE salesshare;
    

  1. Enter the following code to alter the data share, add the sales schema to be shared with the consumer clusters, and add the views in the sales schema that you want to share with the consumer cluster:
    /************************************************************/
    /* Add objects at desired granularities: schemas, tables,   */
    /* views include materialized, and SQL UDFs                 */
    /************************************************************/
    ALTER DATASHARE salesshare ADD SCHEMA sales;  -- New addition to create SCHEMA first
    
    /*For pool model, we share only the views and not tables */
    ALTER DATASHARE SalesShare ADD TABLE sales.v_customer;
    ALTER DATASHARE SalesShare ADD TABLE sales.v_customersales;
    

For the pool model, we share only the views with the consumer cluster and not the tables. The ALTER statement ADD TABLE is used to add both views and tables.

  1. Grant usage on the sales data share to the namespace of the BI consumer cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster. See the following code:
    /********************************************************************/
    /* Grant access to consumer clusters                                */
    /* login to Consumer BI Cluster and get the Namespace from          */
    /* the Redshift console or using SELECT CURRENT_NAMESPACE           */
    /********************************************************************/
    SELECT CURRENT_NAMESPACE;
    
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE salesshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long
    GRANT USAGE ON DATASHARE salesshare TO ACCOUNT 'Consumer_AWSAccount';
    

  1. View data shares that are shared from the producer cluster:
    SELECT * FROM SVV_DATASHARES;

The following screenshot shows the output.

You can also see the data shares and their detailed objects and consumers using the following commands:

SHOW DATASHARES;
DESC DATASHARE salesshare;
select * from SVV_DATASHARE_OBJECTS;
select * from SVV_DATASHARE_CONSUMERS;

Viewing and querying data shares for the pool model from the consumer cluster

To view and query data shares from the consumer cluster, complete the following steps:

  1. Log in to the consumer cluster as an admin user and view the data share objects:
    /**********************************************************/
    /* Login to Consumer cluster as awsuser:                  */
    /* View datashares and create local database for querying */
    /**********************************************************/
    select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the results.

  1. Create a new database from the data share of the producer cluster:
    /**********************************************************/
    /* Create a local database and schema reference           */
    /**********************************************************/
    CREATE DATABASE sales_db FROM DATASHARE salesshare
    OF NAMESPACE '<producercluster_namespace>';

  1. Optionally, you can create an external schema in the consumer cluster pointing to the schema in the database of the producer cluster.

Creating a local external schema in the consumer cluster allows schema-level access controls within the consumer cluster, and uses a two-part notation when referencing shared data objects (localschema.table; vs. external_db.producerschema.table). See the following code:

/*********************************************************/
/* Create External Schema - Optional                     */
/* reason for schema: give specific access to schema     */
/* using shared alias get access to a secondary database */
/*********************************************************/
CREATE EXTERNAL SCHEMA sales_schema 
FROM REDSHIFT DATABASE 'sales_db' SCHEMA 'sales';
  1. Now you can query the shared data from the producer cluster by using the three-part notation database.schema.object:
    select * from sales_db.sales.v_customer;
    select * from sales_db.sales.v_customersales;

  1. From the tenant1 consumer cluster, you can view the databases that are accessible to tenant1:
    select * from SVV_REDSHIFT_DATABASES;
    

The following screenshot shows the results.

Creating local consumer users and controlling access

You can control access to users in your consumer cluster by creating users and groups, and assigning access to the data share objects.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create tenant1_group, grant usage on the local database sales_db and schema sales_schema to the group, and assign the user tenant1_user to the tenant1_group:
    /********************************************************/
    /* Consumers can create own users and assign privileges */
    /* Create tenant1_group and assign privileges to read   */
    /* sales_db and the sales_schema                        */
    /* Create tenant1_user in tenant1_group                 */
    /********************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE sales_db TO GROUP tenant1_group;
    GRANT USAGE ON SCHEMA sales_schema TO GROUP tenant1_group;
    

  1. Now, log in as tenant1_user to consumer cluster 1 and select data from the views v_customer and v_customersales:
    /*******************************************************/
    /* select from view returns only sales records related */
    /* to Consumer A namespace                             */
    /*******************************************************/
    select * from sales_db.sales.v_customer;
    select * from sales_db.sales.v_customersales;
    

You should see only the data relevant to tenant 1 and not the data that is associated with tenant 2.

Creating materialized views to optimize performance

Consumer clusters can have their own database objects that are local to the consumer. You can also create materialized views on the datashare objects and control when to refresh the dataset for your consumers. This provides another level of isolation from the producer cluster, and ensures that consumer queries run against the consumer's local dataset.

  1. Log in as an admin user on consumer cluster 1 and enter the following code to create a materialized view for customersales. This creates a local materialized view that can be periodically refreshed from the consumer cluster:
    /*******************************************************/
    /* Create materialized view in consumer cluster        */
    /*******************************************************/
    -- The local schema tenant1_sales must already exist in the consumer cluster
    create MATERIALIZED view tenant1_sales.mv_customersales as
    select c_tenantid, c_name, c_region,
        date_part(w, to_date(s_orderdate,'YYYY-MM-DD')) as "week",
        date_part(mon, to_date(s_orderdate,'YYYY-MM-DD')) as "month",
        date_part(dow, to_date(s_orderdate,'YYYY-MM-DD')) as "dow",
        date_part(yr, to_date(s_orderdate,'YYYY-MM-DD')) as "year",
        date_part(d, to_date(s_orderdate,'YYYY-MM-DD')) as "dom",
        t.t_namespace
    -- Shared objects are referenced with three-part notation (database.schema.table)
    from sales_db.sales.tenant t, sales_db.sales.customer c, sales_db.sales.sales s
    where t.t_tenantid = c.c_tenantid
    and c.c_tenantid = s.s_tenantid
    and c.c_custid = s.s_custid
    and t.t_namespace = current_namespace;

    select top 100 * from tenant1_sales.mv_customersales;

    REFRESH MATERIALIZED VIEW tenant1_sales.mv_customersales;

With the preceding steps, we demonstrated how you can control access to the tenant data in the same datastore using views. We also reviewed how data shares help efficiently share data between producer and consumer clusters with transaction consistency, and how a local materialized view can further isolate your BI workloads for your customers and provide a consistent, performant user experience. In the next section, we discuss the bridge model.

Bridge model

In the bridge model, data for each tenant is stored in its own schema in a database and contains a similar set of tables. Data shares are created for each schema and shared with the corresponding consumer. This is an appealing balance between the silo and pool models, providing both data isolation and ETL consolidation. With Amazon Redshift, you can create up to 9,900 schemas. For more information, see Quotas and limits in Amazon Redshift.

With data sharing, separate consumer clusters can be provisioned to use the same managed storage from the producer cluster. Consumer clusters have all the capabilities of a producer cluster, and can in turn be producer clusters for data objects they own. Consumers can’t share data that is already shared with them. Without data sharing, queries from all customers are directed to a single cluster. The following diagram illustrates the bridge model.

To create a multi-tenant architecture using the bridge model, complete the steps in the following sections.

Creating database schemas and tables for the bridge model in the producer cluster

As we did in the pool model, the first step is to create the database schema and tables. We log in to the producer cluster as an admin user and create separate schemas for each tenant. For our post, we create two schemas, tenant1 and tenant2, to store data for two tenants.

  1. Log in to the producer cluster as the admin user.
  1. Use the script below to create two schemas, tenant1 and tenant2, and create tables for customer dimension and sales facts under each of the two schemas. See the following code:
    /****************************************/
    /* Bridge -  Data Model */
    /****************************************/
    -- Create schemas tenant1 and tenant2
    create schema tenant1;
    create schema tenant2;
    
    -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
      
    
    -- Create tables for tenant2
    CREATE TABLE IF NOT EXISTS tenant2.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant2.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    

  1. Ingest data into the customer dimension and sales fact tables. Using the COPY command is the recommended way to ingest data into Amazon Redshift, but for illustration purposes, we use the INSERT statement:
    -- ingest data for tenant1
    -- ingest customer data
    insert into tenant1.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
    -- ingest sales data
    insert into tenant1.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    
    select count(*) from tenant1.customer;
    select count(*) from tenant1.sales;
    
    
    -- ingest data for tenant2
    -- ingest customer data
    insert into tenant2.customer values
    (1, 'Customer 3', 'NorthWest'),
    (2, 'Customer 4', 'SouthEast');
    
    -- ingest sales data
    truncate table tenant2.sales;
    insert into tenant2.sales values
    (1, 2, 452.24, '2020-1-23'),
    (2, 1, 76523.10, '2020-11-3'),
    (3, 1, 6745.20, '2020-10-01');
    
    
    select count(*) from tenant2.customer;
    select count(*) from tenant2.sales;
    

Creating data shares and granting usage to the consumer cluster

In the following code, we create two data shares, tenant1share and tenant2share, to share the database objects under the two schemas to the respective consumer clusters.

  1. Create two datashares tenant1share and tenant2share to share the database objects under the two schemas to the respective consumer clusters.
    /******************************************************************/
    /*   Create Datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1share;
    CREATE DATASHARE tenant2share;

  1. Alter each datashare to add the schema for the respective tenant to be shared with the consumer cluster:
    ALTER DATASHARE tenant1share ADD SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD SCHEMA tenant2;

  1. Alter each datashare to add all tables in the schema to be shared with the consumer cluster:
    ALTER DATASHARE tenant1share ADD ALL TABLES IN SCHEMA tenant1;
    ALTER DATASHARE tenant2share ADD ALL TABLES IN SCHEMA tenant2;
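
When the number of tenants grows, typing these statements by hand becomes error-prone. The following is a minimal Python sketch, not part of the original walkthrough, that generates the same three producer-side statements for each tenant schema. The function name `bridge_datashare_statements` is hypothetical, and the `<schema>share` naming simply mirrors the tenant1share and tenant2share names used in this post. It only builds SQL strings; running them against the cluster is left to the client of your choice.

```python
import re

# Conservative check for unquoted SQL identifiers (an assumption of this sketch,
# used to avoid building statements from unexpected input).
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def bridge_datashare_statements(tenant_schemas):
    """Return producer-side SQL that creates one datashare per tenant schema."""
    statements = []
    for schema in tenant_schemas:
        if not _IDENTIFIER.match(schema):
            raise ValueError(f"not a plain identifier: {schema!r}")
        share = f"{schema}share"  # tenant1 -> tenant1share, as in the post
        statements.append(f"CREATE DATASHARE {share};")
        statements.append(f"ALTER DATASHARE {share} ADD SCHEMA {schema};")
        statements.append(
            f"ALTER DATASHARE {share} ADD ALL TABLES IN SCHEMA {schema};"
        )
    return statements


if __name__ == "__main__":
    for statement in bridge_datashare_statements(["tenant1", "tenant2"]):
        print(statement)
```

Generating the statements from a list of schema names keeps the naming consistent across tenants, which matters later when each share is granted to its own consumer namespace.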

Getting the namespace of the first consumer cluster

  1. Log in to the consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant1 schema for Tenant1 BI Cluster */
    /* login to tenant1 BI Cluster and get the Namespace 
     * or get the Namespace from the Redshift console */
    SELECT CURRENT_NAMESPACE;

 

  1. Grant usage on the data share for the first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster:
    -- Grant usage on the datashare to the first consumer cluster
    -- Namespace refers to the namespace GUID of the consumer cluster 
    GRANT USAGE ON DATASHARE tenant1share TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant1share TO ACCOUNT '<Consumer_AWSAccount>';

Getting the namespace of the second consumer cluster

  1. Log in to the second consumer cluster and get the namespace from the console or by running the select current_namespace command:
    /* Grant access to tenant2 schema for Tenant2 BI Cluster */
    /* login to tenant2 BI Cluster and get the Namespace     */
    SELECT CURRENT_NAMESPACE;
    

  1. Grant usage on the data share for the second tenant to the namespace of the second consumer cluster you just got from the previous step:
    -- Grant usage on the datashare to the second consumer cluster
    GRANT USAGE ON DATASHARE tenant2share TO NAMESPACE '<consumercluster2_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant2share TO ACCOUNT '<Consumer_AWSAccount>';

  1. To view data shares from the producer cluster, enter the following code:
    /******************************************************************/
    /*   View Datashares created, Datashare objects and consumers     */
    /******************************************************************/
    select * from SVV_DATASHARES;
    select * from SVV_DATASHARE_OBJECTS;
    select * from SVV_DATASHARE_CONSUMERS;

The following screenshot shows the commands in the query editor.

The following screenshot shows the query results.

Accessing data using the consumer cluster from the data share

To access data using the consumer cluster, complete the following steps:

  1. Log in to the first consumer cluster ds-consumer-c1, as an admin user.
  1. View the data shares and data share objects that are available to the consumer cluster by using the system views:
    /**********************************************************/
    /* Consumer cluster as adminuser                          */
    /* List the shares available and review contents for each */
    /**********************************************************/
    -- You can view datashares associated with the cluster
    -- using either of the two commands
    SHOW DATASHARES;
    select * from SVV_DATASHARES;

The following screenshot shows the query results.

    --View objects shared in inbound share for consumer
    select * from SVV_DATASHARE_OBJECTS;

The following screenshot shows the query results.

    --View namespace or clusters granted usage to a datashare
    select * from svv_datashare_consumers;

  1. Create a local database in the first consumer cluster from the data share. An external schema, created in a later step, lets you provide controlled access to a specific schema:
    /*******************************************************/
    /* Create a local database and schema reference        */
    /* to the share objects                                */
    /*******************************************************/
    CREATE DATABASE tenant1_db FROM DATASHARE tenant1share
    OF NAMESPACE '<producercluster_namespace>';

  1. Query the database tables using the three-part notation database.schema.table:
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;

  1. Optionally, you can create an external schema.

There are two reasons to create an external schema: either to enable two-part notation access to the tables from the consumer cluster, or to provide restricted access to the specific schemas for selected users, when multiple schemas are shared from the producer cluster. See the following code for our external schema:

    /* Create External Schema */
    CREATE EXTERNAL SCHEMA tenant1_schema FROM REDSHIFT DATABASE 'tenant1_db' SCHEMA 'tenant1';

  1. If you created the external schema, you can use the following two-part notation to query the database tables:
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;

  1. You can view the shared databases by querying the SVV_REDSHIFT_DATABASES view:
    select * from SVV_REDSHIFT_DATABASES;

The following screenshot shows the query results.

Creating consumer users for managing access

Still logged in as an admin user to the consumer cluster, you can create other users who have access to the database objects.

  1. Create users and groups, and assign users and object privileges to the groups with the following code:
    /*******************************************************/
    /* Consumer can create own users and assign privileges */
    /* Create tenant1_user and assign privileges to        */
    /* read datashare from tenant1 schema                  */
    /*******************************************************/
    create group tenant1_group;
    create user tenant1_user password 'Redshift#123!' in group tenant1_group;
    
    GRANT USAGE ON DATABASE tenant1_db TO tenant1_user;
    GRANT USAGE ON SCHEMA tenant1_schema TO GROUP tenant1_group;
    

Now tenant1_user can log in and query the shared tables from the tenant schema.

  1. Log in to the consumer cluster as tenant1_user and query the tables:
    /************************************************************/
    /* Consumer cluster as tenant1_user: query the shared       */
    /* tables using the three-part notation.                    */
    /************************************************************/
    
    select * from tenant1_db.tenant1.customer;
    select * from tenant1_db.tenant1.sales;
    
    
    /************************************************************/
    /* If you have created an external schema,                  */
    /* you can use the two-part notation to query tables.       */
    /************************************************************/
    
    select * from tenant1_schema.customer;
    select * from tenant1_schema.sales;
    

Revoking access to a data share (optional)

  1. At any point, if you want to revoke access to the data share, you can use the REVOKE USAGE command:
    /*************************************************************/
    /* To revoke access at any time use the REVOKE USAGE command */
    /*************************************************************/
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    REVOKE USAGE ON DATASHARE Salesshare FROM NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long
    REVOKE USAGE ON DATASHARE Salesshare FROM ACCOUNT '<Consumer_AWSAccount>';
    

Silo model

The third option is to store data for each tenant in separate databases within a cluster. If you need your data isolated from other tenants, you can use the silo model. Each database may have distinct data models, monitoring, management, and security footprints.

Amazon Redshift supports cross-database queries, which allow you to simplify data organization. You can store common or granular datasets used across all tenants in a centralized database, and use the cross-database query capability to join relevant data for each tenant.

The steps to create a data share in a silo model are similar to those for a bridge model; however, unlike a bridge model (where a data share is created for each schema), the silo model has a data share created for each database. The following diagram illustrates the architecture of the silo model.

Creating data shares for the silo model in the producer cluster

To create data shares for the silo model in the producer cluster, complete the following steps:

  1. Log in to the producer cluster as an admin user and create separate databases for each tenant:
    /*****************************************************/
    /** Silo Model – Create databases for the 2 tenants **/
    /*****************************************************/
    create database tenant1_silodb;
    
    create database tenant2_silodb;
    

  1. Log in again to the producer cluster with the database name and user ID for the database that you want to share (tenant1_silodb) and create the schema and tables:
    /***********************************************************/
    /* login to tenant1_silodb, create the schema and tables   */
    /***********************************************************/
    create schema tenant1_siloschema;
    
    -- Create tables for tenant1
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.customer (
      c_custid int8 not null ,
      c_name varchar(25) not null,
      c_region varchar(40) not null,
      Primary Key(c_custid)
    ) diststyle ALL sortkey(c_custid);
    
    CREATE TABLE IF NOT EXISTS tenant1_silodb.tenant1_siloschema.sales (
      s_orderid int8 not null,
      s_custid int8 not null,
      s_totalprice numeric(12,2) not null,
      s_orderdate date not null,
      Primary Key(s_orderid)
    ) distkey(s_orderid) sortkey(s_orderdate, s_orderid) ;
    
    insert into tenant1_siloschema.customer values
    (1, 'Customer 1', 'NorthEast'),
    (2, 'Customer 2', 'SouthEast');
    
    truncate table tenant1_siloschema.sales;
    insert into tenant1_siloschema.sales values
    (1, 1, 2434.33, '2020-11-21'),
    (2, 2, 54.90, '2020-5-5'),
    (3, 2, 9678.99, '2020-3-8');
    

  1. Create a data share with a name for the first tenant (for example, tenant1_silodbshare):
    /******************************************************************/
    /*   Create datashare and add database objects to the datashare   */
    /******************************************************************/
    CREATE DATASHARE tenant1_silodbshare;
    

  1. Run Alter datashare commands to add the schemas to be shared with the consumer cluster and add all tables in the schemas to be shared with the consumer cluster:
    ALTER DATASHARE tenant1_silodbshare ADD SCHEMA tenant1_siloschema;
    ALTER DATASHARE tenant1_silodbshare ADD ALL TABLES IN SCHEMA tenant1_siloschema;

  1. Grant usage on the data share for first tenant to the namespace of the BI cluster. You can get the namespace of the BI cluster from the console or using the SELECT CURRENT_NAMESPACE statement in the BI cluster:
    --Namespace refers to the namespace GUID of the consumer cluster in the account
    GRANT USAGE ON DATASHARE tenant1_silodbshare TO NAMESPACE '<consumercluster1_namespace>';
    --Account numbers are 12 digit long (optional)
    --GRANT USAGE ON DATASHARE tenant1_silodbshare TO ACCOUNT '<AWS-Account>';
    

Viewing and querying data shares for the silo model from the consumer cluster

To view and query your data shares, complete the following steps:

  1. Log in to the consumer cluster as an admin user.
  2. Create a new database from the data share of the producer cluster:
    CREATE DATABASE tenant1_silodb FROM DATASHARE tenant1_silodbshare
    OF NAMESPACE '<producercluster_namespace>';

Now you can start querying the shared data from the producer cluster by using the three-part notation database.schema.table. If you created an external schema, then you can also use the two-part notation to query the tables.

  1. Query the data with the following code:
    select * from tenant1_silodb.tenant1_siloschema.customer;
    select * from tenant1_silodb.tenant1_siloschema.sales;
    

  1. Optionally, you can create an external schema pointing to the schema in the database of the producer cluster. This allows you to query shared tables using a two-part notation. See the following code:
    CREATE EXTERNAL SCHEMA tenant1_siloschema FROM REDSHIFT DATABASE 'tenant1_silodb' SCHEMA 'tenant1_siloschema';
    
    --With this Schema, you can access using two-part notation to select from data share tables
    select * from tenant1_siloschema.customer;
    select * from tenant1_siloschema.sales;
    

  1. You can repeat the same steps for tenant2 to share the tenant2 database with tenant2. You can also control access to users in your consumer cluster by creating users and groups, and assigning access to the data share objects.
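
Because the consumer-side steps are identical for every tenant, they can be templated. The following Python sketch is an illustration under stated assumptions rather than part of the original procedure: the function name `consumer_setup_statements` and its parameters are hypothetical, and the namespace check assumes the 36-character GUID form that SELECT CURRENT_NAMESPACE returns. It builds the CREATE DATABASE statement and, optionally, the CREATE EXTERNAL SCHEMA statement shown in the preceding steps.

```python
import re

# Conservative checks (assumptions of this sketch) so that only plain
# identifiers and GUID-shaped namespaces end up in the generated SQL.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
_NAMESPACE = re.compile(r"^[0-9a-fA-F-]{36}$")


def consumer_setup_statements(local_db, datashare, producer_namespace,
                              producer_schema, local_schema=None):
    """Return consumer-side SQL for a shared database and an optional external schema."""
    for name in (local_db, datashare, producer_schema):
        if not _IDENTIFIER.match(name):
            raise ValueError(f"not a plain identifier: {name!r}")
    if local_schema is not None and not _IDENTIFIER.match(local_schema):
        raise ValueError(f"not a plain identifier: {local_schema!r}")
    if not _NAMESPACE.match(producer_namespace):
        raise ValueError("producer namespace should be a 36-character GUID")

    statements = [
        f"CREATE DATABASE {local_db} FROM DATASHARE {datashare} "
        f"OF NAMESPACE '{producer_namespace}';"
    ]
    if local_schema is not None:
        # Optional: enables two-part notation (local_schema.table) on the consumer.
        statements.append(
            f"CREATE EXTERNAL SCHEMA {local_schema} "
            f"FROM REDSHIFT DATABASE '{local_db}' SCHEMA '{producer_schema}';"
        )
    return statements


if __name__ == "__main__":
    # The GUID below is a placeholder, not a real namespace.
    for statement in consumer_setup_statements(
        "tenant2_silodb", "tenant2_silodbshare",
        "11111111-2222-3333-4444-555555555555",
        "tenant2_siloschema", local_schema="tenant2_siloschema",
    ):
        print(statement)
```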

System views to view data shares

We have introduced new system tables and views to easily identify the data shares and related objects. You can use three different groups of system views to view the data share objects:

  • Views starting with SVV_DATASHARE – contain details of data shares and the objects in a data share.

| View name | Purpose |
| --- | --- |
| SVV_DATASHARES | View a list of data shares created on the cluster and data shares shared with the cluster |
| SVV_DATASHARE_OBJECTS | View a list of objects in all data shares created on the cluster or shared with the cluster |
| SVV_DATASHARE_CONSUMERS | View a list of consumers for data shares created on the cluster |

  • Views starting with SVV_REDSHIFT – contain details on both local and remote Redshift databases.

| View name | Purpose |
| --- | --- |
| SVV_REDSHIFT_DATABASES | List of all databases that a user has access to |
| SVV_REDSHIFT_SCHEMAS | List of all schemas that a user has access to |
| SVV_REDSHIFT_TABLES | List of all tables that a user has access to |
| SVV_REDSHIFT_COLUMNS | List of all columns that a user has access to |
| SVV_REDSHIFT_FUNCTIONS | List of all functions that a user has access to |

  • Views starting with SVV_ALL – contain local and remote databases, external schemas including Spectrum and federated query, and external schema references to shared data. If you create external schemas in the consumer cluster, you need to use the SVV_ALL views to look at the objects.

| View name | Purpose |
| --- | --- |
| SVV_ALL_SCHEMAS | Union of the list of all schemas from the SVV_REDSHIFT_SCHEMAS view and the consolidated list of all external tables and schemas that a user has access to |
| SVV_ALL_TABLES | List of all tables that a user has access to |
| SVV_ALL_COLUMNS | List of all columns that a user has access to |
| SVV_ALL_FUNCTIONS | List of all functions that a user has access to |

Considerations for choosing a storage strategy

You can adopt a storage strategy or choose a hybrid approach based on business, technical, and operational requirements. Before deciding on a strategy, consider the quotas and limits for various objects in Amazon Redshift, and the number of databases per cluster or number of schemas per database to check if it meets your requirements. The following table summarizes these considerations.

| Consideration | Pool | Bridge | Silo |
| --- | --- | --- | --- |
| Separation of tenant data | Views | Schema | Database |
| ETL pipeline complexity | Low | Low | Medium |
| Limits | 100,000 tables (RA3 – 4x, 16x large clusters) | 9,900 schemas per database | 60 databases per cluster |
| Chargeback to consumer accounts | Yes | Yes | Yes |
| Scalability | High | High | High |
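
As a rough aid for applying the limits in this table, the following Python sketch checks which models can hold a given number of tenants. It is a simplification under several assumptions: the function name `feasible_models` is hypothetical, the limits are copied from the table and should be verified against the current Amazon Redshift quotas, the table limit is treated as applying across the whole cluster, and system objects that count toward the quotas are ignored.

```python
# Limits copied from the table in this post; check current Redshift quotas
# before relying on them.
MAX_TABLES = 100_000              # RA3 4xlarge and 16xlarge clusters
MAX_SCHEMAS_PER_DATABASE = 9_900
MAX_DATABASES_PER_CLUSTER = 60


def feasible_models(tenant_count, tables_per_tenant):
    """Return the storage models whose object limits can hold all tenants."""
    feasible = []

    # Pool: all tenants share one set of tables, separated by views.
    if tables_per_tenant <= MAX_TABLES:
        feasible.append("pool")

    # Bridge and silo both create a full set of tables per tenant
    # (assumption: the table limit is cluster-wide).
    total_tables = tenant_count * tables_per_tenant

    # Bridge: one schema per tenant in a single database.
    if tenant_count <= MAX_SCHEMAS_PER_DATABASE and total_tables <= MAX_TABLES:
        feasible.append("bridge")

    # Silo: one database per tenant.
    if tenant_count <= MAX_DATABASES_PER_CLUSTER and total_tables <= MAX_TABLES:
        feasible.append("silo")

    return feasible


if __name__ == "__main__":
    print(feasible_models(tenant_count=500, tables_per_tenant=2))
```

The check covers object limits only. Isolation requirements and ETL complexity, the other rows of the table, still need to be weighed separately.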

Conclusion

In this post, we discussed how you can use the new data sharing feature of Amazon Redshift to implement an AaaS solution with a multi-tenant architecture while meeting SLAs for consumers using separate Amazon Redshift clusters. We demonstrated three types of models providing various levels of isolation for the tenant data. We compared and contrasted the models and provided guidance on when to choose an implementation model. We encourage you to try the data sharing feature to build your AaaS or software as a service (SaaS) solutions.


About the Authors

Rajesh Francis is a Sr. Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and works with customers to build scalable Analytic solutions.

 

 

 

Neeraja Rentachintala is a Principal Product Manager with Amazon Redshift. Neeraja is a seasoned Product Management and GTM leader, bringing over 20 years of experience in product vision, strategy and leadership roles in data products and platforms. Neeraja delivered products in analytics, databases, data Integration, application integration, AI/Machine Learning, large scale distributed systems across On-Premise and Cloud, serving Fortune 500 companies as part of ventures including MapR (acquired by HPE), Microsoft SQL Server, Oracle, Informatica and Expedia.com.

 

Jeetesh Srivastva is a Sr. Analytics specialist solutions architect at AWS. He specializes in Amazon Redshift and works with customers to implement scalable solutions leveraging Redshift and other AWS Analytic services. He has worked to deliver on premises and cloud based analytic solutions for customers in banking & finance and hospitality industry verticals.

Querying a Vertica data source in Amazon Athena using the Athena Federated Query SDK

Post Syndicated from Kelly Ragan original https://aws.amazon.com/blogs/big-data/querying-a-vertica-data-source-in-amazon-athena-using-the-athena-federated-query-sdk/

The ability to query data and perform ad hoc analysis across multiple platforms and data stores with a single tool brings immense value to the big data analytical arena. As organizations build out data lakes with increasing volumes of data, there is a growing need to combine that data with large amounts of data in other data stores. As the variety of data increases, it becomes paramount to have a query tool to bridge two or more data stores with a single query.

Even though data lakes became popular for analytic workloads recently, it’s not uncommon to have data warehouses in addition to data lakes for various reporting and business intelligence (BI) use cases. It becomes imperative to be able to seamlessly query the data stored in the data warehouse and the data lake. To address this issue, Amazon Athena has released a feature called Athena Federated Query. Athena is an interactive query service provided by AWS that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Vertica is a columnar MPP database platform that can be deployed in the cloud or on premises, and supports exabyte scale data warehouses. With Athena Federated Query and the Vertica connector, you can now run analytical queries over a data warehouse on Vertica and a data lake in Amazon S3.

Athena Federated Query includes pre-built connectors to a variety of AWS services and databases, as well as an SDK to build custom connectors to other databases and data stores. With this feature, federated queries can pull data from a data lake in an S3 bucket and from an external data source, and then combine it into a single result set in Athena. These connectors are an extension of the Athena query engine, which translates content between Athena and the external data source. Pre-built connectors exist for Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as a JDBC connector for Amazon Redshift, MySQL, and PostgreSQL. For other types of relational databases, you can use the Athena Federated Query SDK to create a custom connector.

In this post, we demonstrate how to deploy the custom connector between Athena and a Vertica database built using the Athena Federated Query SDK. After deploying the custom connector, we demonstrate issuing federated queries and moving data from Vertica to a data lake using CREATE TABLE AS (CTAS) with a federated query.

AWS services used in the solution

The Athena Federated Query SDK is an open-source framework to build custom connectors, and comes with a connector publish tool that deploys the connector executables in an application to the AWS Serverless Application Repository. Athena Federated Query uses an AWS Lambda function that in turn uses the application deployed to the AWS Serverless Application Repository.

A custom connector is composed of a Lambda function that utilizes three components:

  • MetadataHandler – An interface that exposes metadata information of schemas, tables, and columns from the underlying data store to Athena
  • RecordHandler – An interface that provides hooks to read data from the external source and share it with the Athena query engine in Apache Arrow columnar format
  • CompositeHandler – A handler that manages the running of the MetadataHandler and RecordHandler

The Lambda function connects to the external data store using an appropriate connection protocol and sends the parsed SQL statement. In the case of Vertica, it is JDBC. The RecordHandler processes the result set produced by the external data store and passes the rows to Athena for final processing. Multiple Lambda functions are called by Athena depending on the Lambda concurrency settings to read the result set in parallel. A spill bucket is used to handle a large dataset that exceeds the Lambda server’s capacity to process the result set.

The JDBC connection established by the Lambda function to the external database is used to send the parsed SQL statement and retrieve the result set rows from the external database. This scenario works well in terms of bandwidth for smaller databases and result sets. However, you might have Vertica deployments with petabyte or exabyte data warehouses. Typical queries return result sets on the order of 10, 20, 30 gigabytes, or more. Due to the bandwidth issue with a JDBC connection, the solution presented in this post modifies the Athena Federated Query SDK to implement a different route for the transmission of large result sets from Vertica to the Athena server for final processing.

The alternate solution utilizes the Vertica EXPORT command as a wrapper around the parsed SQL statement. You can use the EXPORT command to write a result set for a SQL statement directly to an S3 bucket using Vertica’s highly parallelized write to Amazon S3 using partitioning. This solution modifies the SDK to allow Athena to read the result set in the S3 bucket, determine the number of partitions, and call subsequent Lambda functions to parallelize the read of the result set. This produces an efficient way to move a multi-gigabyte result set from Vertica to Athena with parallelized writes from Vertica to Amazon S3 and parallelized reads from Amazon S3 to Athena. When connecting to the Vertica database, the SDK uses AWS Secrets Manager to retrieve a user ID and password for a service account on the Vertica database.
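
To make the parallel read concrete, the following Python sketch fans out one reader per exported file and collects the results in order. It is a local illustration only and not the connector's implementation: `read_partition` is a hypothetical stand-in for a Lambda invocation that reads one file, the file names are invented, and a thread pool takes the place of concurrent Lambda functions.

```python
from concurrent.futures import ThreadPoolExecutor


def read_partition(key):
    """Stand-in for one Lambda invocation that reads one exported file (hypothetical)."""
    return f"rows from {key}"


def read_result_set(exported_keys, max_workers=4):
    """Read every exported file in parallel and return the results in input order."""
    if not exported_keys:
        return []
    workers = min(max_workers, len(exported_keys))
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # map() preserves the order of exported_keys in its results.
        return list(pool.map(read_partition, exported_keys))


if __name__ == "__main__":
    keys = ["export/part-0.parquet", "export/part-1.parquet"]  # hypothetical file names
    for chunk in read_result_set(keys):
        print(chunk)
```

The number of files written by the export determines how many readers can run at once, which is why the file size chosen for the export affects read parallelism.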

Solution architecture

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.

The connector components are as follows:

  1. A user issues a federated SQL query in Athena against a table in Vertica.
  2. Athena parses the query and calls a Lambda function.
  3. The Lambda function makes a call to Secrets Manager to get the user ID and password for connecting to Vertica.
  4. The connector sends an EXPORT statement wrapper with the embedded SQL statement to Vertica through the JDBC connection. For example, see the following code:
    EXPORT TO PARQUET (directory = 's3://<bucket_name>/<folder_name>',
    Compression='Snappy', fileSizeMB=64) OVER() AS
    SELECT  
    ORDER_ID,  
    ITEM,  
    CUSTOMER_ID,
    ORDERED_DATE
    FROM SCHEMA1.ORDERS  
    WHERE CUSTOMER_ID = 2;
    

  5. Vertica processes the SQL query and writes the result set to the S3 bucket specified in the EXPORT command. Vertica parallelizes the write to the S3 bucket based on the fileSizeMB parameter, splitting the result set into as many partitions as needed.
  6. Athena calls a Lambda function to scan the S3 bucket in order to determine the number of files to read for the result set.
  7. Depending on the number of partitions, Athena invokes multiple Lambda functions, which read the files using Amazon S3 Select. This allows Athena to parallelize the read of the S3 files.
  8. Athena combines the result set returned from Vertica with data scanned from the data lake, and returns the combined result set to the user.
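
To make step 4 more concrete, the following TypeScript sketch shows one way an EXPORT wrapper could be built around a SELECT statement. It is an illustration only, not the connector's actual code: the names wrapInExport and ExportOptions, the default values, and the folder name query-123 are assumptions made for this example.

```typescript
// Hypothetical helper: wraps a SELECT statement in a Vertica EXPORT TO PARQUET
// statement, mirroring the example shown in step 4.
interface ExportOptions {
  bucket: string;       // export bucket name (assumed for this example)
  folder: string;       // folder that receives this query's files (assumed unique per query)
  compression?: string; // defaults to Snappy, as in the example above
  fileSizeMB?: number;  // target file size; influences how many files Vertica writes
}

function wrapInExport(selectSql: string, opts: ExportOptions): string {
  const { bucket, folder, compression = "Snappy", fileSizeMB = 64 } = opts;
  // Strip a trailing semicolon so the wrapped statement ends exactly once.
  const inner = selectSql.trim().replace(/;\s*$/, "");
  return (
    `EXPORT TO PARQUET (directory = 's3://${bucket}/${folder}', ` +
    `Compression='${compression}', fileSizeMB=${fileSizeMB}) OVER() AS\n` +
    `${inner};`
  );
}

const wrapped = wrapInExport(
  "SELECT ORDER_ID, ITEM FROM SCHEMA1.ORDERS WHERE CUSTOMER_ID = 2;",
  { bucket: "test-export-bucket", folder: "query-123" }
);
console.log(wrapped);
```

Keeping the wrapper a pure string transformation makes it easy to unit test without a Vertica connection.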

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Amazon EC2 IAM role permissions – The AWS Identity and Access Management (IAM) role of the Amazon Elastic Compute Cloud (Amazon EC2) machines hosting the Vertica database must be given write permissions to the VerticaExport S3 bucket, which is created when deploying the connector.
  • Secrets Manager – The Vertica connection credentials are stored in Secrets Manager. The secret name is prefixed with Vertica- and the secret value is the connection credentials.
  • Lambda IAM role permissions – When the Lambda function is deployed to the AWS Serverless Application Repository, it creates a custom IAM role for the function to run. The custom role has the following IAM permissions in order to successfully perform the read and write functions associated with the MetadataHandler and RecordHandler:
    • AWSLambdaBasicExecutionRole
    • AWSLambdaVPCAccessExecutionRole
    • For Secrets Manager, GetSecretValue for secrets with a prefix given in SecretNameOrPrefix
    • For Amazon S3, list, read, and write permissions for SpillBucket and ExportBucket, and list permissions for all S3 buckets
    • For Athena, GetQueryExecution

Demonstration tables

To demonstrate the Athena Vertica connector capabilities, we use the following components:

  • A Vertica database running in our AWS environment.
  • A Vertica table called orders containing details of customer orders.
  • An Athena table called customer, which has an S3 bucket as a data source. This table contains information regarding customers.

The following screenshot shows the details of the customer table in Amazon S3.

The following screenshot shows the details of the orders table in Vertica.

Setting up the Athena Vertica connector project

To set up your connector project, complete the following steps:

  1. Create an S3 bucket in your AWS account. This is the bucket where the result set from Vertica is exported.
  2. Create another S3 bucket in your AWS account. This is the bucket where the code for the connector is stored and retrieved.
  3. Grant the IAM role of the EC2 machines hosting the Vertica database read and write permissions to the S3 result set bucket, allowing Vertica to export data to the bucket.
  4. Clone the GitHub repo in your local folder.
  5. Open the project in your preferred IDE.
  6. From the athena-query-federation directory, run mvn clean install.
  7. From the athena-vertica directory, run mvn clean install.
  8. From the athena-vertica directory, run ../tools/publish.sh <s3_code_bucket_name> athena-vertica [region] to publish the connector to your private AWS Serverless Application Repository.
  9. Upon successful completion of the script, the connector’s serverless application is published to the AWS Serverless Application Repository.

Deploying the connector

To deploy your connector, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Published Applications.
  2. On the Private Applications tab, select Show apps that create custom IAM roles or resource policies to see your deployed applications.
  3. Choose the VerticaAthenaConnector serverless app.
  4. For AthenaCatalogName, enter the name of the connector Lambda function used when querying the Vertica tables (avc).
  5. For SecretNameOrPrefix, enter the prefix used to store the Vertica credentials in Secrets Manager (the default is Vertica-).
  6. For SpillBucket, enter the S3 bucket name where data is spilled in case the result set data volume crosses a certain limit (test-spill-bucket).
  7. For VerticaExportBucket, enter the S3 bucket where the result set from Vertica is exported (test-export-bucket).
  8. For VpcId, enter your VPC ID.

  9. For SpillPrefix, enter athena-spill.
  10. For SubnetIds, enter your subnet IDs.
  11. For VerticaConnectionString, enter the connection string of the Vertica database in the following format:
    jdbc:vertica://<host_name>:<port>/<database>?user=${vertica-username}&password=${vertica-password}
    

    Here, vertica-username and vertica-password are the names of the secrets in AWS Secrets Manager that store the Vertica user credentials.

  12. Select I acknowledge that this app creates custom IAM roles.
  13. Choose Deploy.

Upon successful deployment, a Lambda function with the name given for AthenaCatalogName is deployed in your AWS environment. We use this function to issue federated queries to Vertica. The connector is now deployed and ready to use.
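
The ${vertica-username} and ${vertica-password} placeholders in the connection string format can be thought of as a simple template substitution. The following TypeScript sketch illustrates the idea under stated assumptions: the function name resolveConnectionString, the host name, the port, and the in-memory secrets map are hypothetical, and the deployed connector's real lookup against Secrets Manager may work differently.

```typescript
// Hypothetical sketch, not the connector's actual implementation:
// replace each ${name} placeholder with the value of the secret called `name`.
function resolveConnectionString(
  template: string,
  secrets: Record<string, string>
): string {
  return template.replace(
    /\$\{([^}]+)\}/g,
    (_match: string, secretName: string): string => {
      const value: string | undefined = secrets[secretName];
      if (value === undefined) {
        throw new Error("No secret found for placeholder: " + secretName);
      }
      return value;
    }
  );
}

// Stand-in for values that would come from Secrets Manager (assumed for illustration).
const secrets: Record<string, string> = {
  "vertica-username": "svc_athena",
  "vertica-password": "example-password",
};

const resolved = resolveConnectionString(
  "jdbc:vertica://vertica.example.internal:5433/sales" +
    "?user=${vertica-username}&password=${vertica-password}",
  secrets
);
console.log(resolved);
```

Failing loudly on an unknown placeholder is a deliberate choice in this sketch: a silently unresolved placeholder would otherwise surface later as a confusing authentication error.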

Using the connector

On the Athena console, you can query Vertica tables as shown in the following code. The value for <lambda_function> corresponds to the function you created in the previous section.

SELECT  
ORDER_ID,   
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
ORDER BY ORDER_ID DESC

In this example, we named the function avc. The following screenshot shows our query results.

This demonstrates that the newly deployed connector read the user-requested columns and the Vertica source table, wrapped an EXPORT statement around the SQL statement, and ran it in Vertica. The results of this query were exported to the specified S3 bucket (test-export-bucket) in Parquet format. The connector then invoked multiple Lambda functions to read the data from the S3 bucket using Amazon S3 Select and displayed it on the Athena console. Note that the connector currently exports Vertica timestamp and timestamptz data types as a varchar data type. Therefore, we need to use the date_parse(string, format) function to convert the timestamp columns into the correct data type.
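
As a rough illustration of the date_parse conversion, the following TypeScript sketch generates a SELECT list in which chosen columns are wrapped in date_parse. The helper name selectListWithParsedTimestamps is hypothetical, and both the format string '%Y-%m-%d %H:%i:%s' and the choice of ORDER_DATE as a timestamp column are assumptions; check how the exported values actually look before relying on either.

```typescript
// Hypothetical helper: builds a SELECT list in which the given columns are
// converted from varchar back to timestamp with Athena's date_parse function.
function selectListWithParsedTimestamps(
  columns: string[],
  timestampColumns: string[],
  format: string = "%Y-%m-%d %H:%i:%s" // assumed export format; verify against your data
): string {
  const toParse = new Set(timestampColumns.map((c) => c.toUpperCase()));
  return columns
    .map((col) =>
      toParse.has(col.toUpperCase())
        ? "date_parse(" + col + ", '" + format + "') AS " + col
        : col
    )
    .join(",\n  ");
}

const selectList = selectListWithParsedTimestamps(
  ["ORDER_ID", "CUSTOMER_ID", "ORDER_DATE"],
  ["ORDER_DATE"]
);
const query =
  "SELECT\n  " +
  selectList +
  '\nFROM "lambda:avc".SCHEMA1.ORDERS\nWHERE CUSTOMER_ID <= 3461';
console.log(query);
```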

We can also create an Athena table using CTAS with the result set of the Vertica query using the following query:

CREATE TABLE default.vertica_customers_table AS (
SELECT  
ORDER_ID,
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
);

We can then use the newly created table to query the data as shown in the following screenshot.

In addition, we can also query and join the customer data in Amazon S3 and orders data in Vertica using the following sample query:

WITH   
customer_data AS (  
  SELECT   
   	CUSTOMER_NAME,  
   	CUSTOMER_ID  
    
  FROM default.customer
  ),  
orders_data AS (  
  SELECT    
   	ORDER_ID,  
   	PRODUCT_NAME,  
   	CUSTOMER_ID   
  FROM "lambda:<lambda_function>".schema1.orders  
  )  
SELECT a.CUSTOMER_ID, b.ORDER_ID, b.PRODUCT_NAME
FROM customer_data a 
INNER JOIN orders_data b
ON a.customer_id = b.customer_id
WHERE lower(b.PRODUCT_NAME) like 'pencil'
ORDER BY b.ORDER_ID DESC

This query joins the orders data in Vertica with customer data in the S3 bucket on the customer_id column and displays the results on the Athena console.

This demonstrates the ease of performing analytics across multiple platforms and data stores.

Conclusion

In this post, we introduced the Athena Vertica connector and its solution architecture, and demonstrated how to deploy the connector using the Athena Federated Query SDK. We saw how to run SQL queries on the Vertica data source. We also learned that we can use the connector to perform extract, transform, and load operations on the data in the Vertica tables and Amazon S3, enabling us to perform faster and better analytics across multiple platforms and data sources.

For more information about Athena Federated Query, see the GitHub repo.

Special Acknowledgement

Special acknowledgement goes to the Intuit Data Engineering staff Denise McInerney – Data Architect, Sanjay Rane – Group Engineering Manager – Data, and Kannan Nagarajan – Database Architect. They helped design, review, and support the development of the custom connector and architecture.


About the Authors

Kelly Ragan is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services. He helps customers solve big data problems and wrestle with large-scale data warehouses. In his spare time, he enjoys snow skiing, bicycling, and camping in the Pacific Northwest.

Rohit Masur is an Associate Big Data Consultant, Data and Analytics Team, AWS Professional Services. He helps customers architect and implement solutions on AWS to get business value out of data. In his spare time, he enjoys reading books, going on long walks, and exploring new hiking trails in the Bay Area.

Automating AWS service logs table creation and querying them with Amazon Athena

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automating-aws-service-logs-table-creation-and-querying-them-with-amazon-athena/

I was working with a customer who was just getting started using AWS, and they wanted to understand how to query their AWS service logs that were being delivered to Amazon Simple Storage Service (Amazon S3). I introduced them to Amazon Athena, a serverless, interactive query service that allows you to easily analyze data in Amazon S3 and other sources. Together, we used Athena to query service logs, and were able to create tables for AWS CloudTrail logs, Amazon S3 access logs, and VPC flow logs. As I was walking the customer through the documentation and creating tables and partitions for each service log in Athena, I thought there had to be an easier and faster way to allow customers to query their logs in Amazon S3, which is the focus of this post.

This post demonstrates how to use AWS CloudFormation to automatically create AWS service log tables, partitions, and example queries in Athena. We also use the SQL query editor in Athena to query the AWS service log tables that AWS CloudFormation created.

Athena best practices

This solution is appropriate for ad hoc use and queries the raw log files. These raw files can range from compressed JSON to uncompressed text formats, depending on how they were configured to be sent to Amazon S3. If you need to query over hundreds of GBs or TBs of data per day in Amazon S3, performing ETL on your raw files and transforming them to a columnar file format like Apache Parquet can lead to increased performance and cost savings. You can save on your Amazon S3 storage costs by using snappy compression for Parquet files stored in Amazon S3. To learn more about Athena best practices, see Top 10 Performance Tuning Tips for Amazon Athena.

Table partition strategies

There are a few important considerations when deciding how to define your table partitions. Mainly you should ask: what types of queries will I be writing against my data in Amazon S3? Do I only need to query data for that day and for a single account, or do I need to query across months of data and multiple accounts? In this post, we talk about how to query across a single, partitioned account.

By partitioning data, you can restrict the amount of data scanned per query, thereby improving performance and reducing cost. When creating a table schema in Athena, you set the location of where the files reside in Amazon S3, and you can also define how the table is partitioned. The location is a bucket path that leads to the desired files. If you query a partitioned table and specify the partition in the WHERE clause, Athena scans the data only for that partition. For more information, see Table Location in Amazon S3 and Partitioning Data. You can then define partitions in Athena that map to the data residing in Amazon S3.

Let’s look at an example to see how defining a location and partitioning our table can improve performance and reduce costs. In the following tree diagram, we’ve outlined what the bucket path may look like as logs are delivered to your S3 bucket, starting from the bucket name and going all the way down to the day.

Outlined in red is where we set the location for our table schema, and Athena then scans everything after the CloudTrail folder. We then outlined our partitions in blue. This is where we can specify the granularity of our queries. In this case, we partition our table down to the day, which is very granular because we can tell Athena exactly where to look for our data. This is also the most performant and cost-effective option because it results in scanning only the required data and nothing else.

If you have to query multiple accounts and Regions, you should back off the location to AWSLogs and then create a non-partitioned CloudTrail table. This allows you to write queries across all your accounts and Regions, but the trade-off is that your queries take much longer and are more expensive because Athena has to scan all the data that comes after AWSLogs on every query. However, querying multiple accounts is beyond the scope of this post.
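
To make the relationship between the table location and the day-level partitions concrete, the following TypeScript sketch builds both from the same inputs. It is a minimal illustration, assuming a bucket with no extra prefix, partition columns named region, year, month, and day, and a table called cloudtrail_logs; the bucket name and account ID are placeholders, and the saved queries created by the template may differ in detail.

```typescript
// Hypothetical description of one day-level CloudTrail partition.
interface CloudTrailPartition {
  bucket: string;
  accountId: string;
  region: string;
  year: string;  // for example "2020"
  month: string; // zero-padded, for example "11"
  day: string;   // zero-padded, for example "24"
}

// Table location: Athena scans everything under the CloudTrail folder.
function tableLocation(p: CloudTrailPartition): string {
  return `s3://${p.bucket}/AWSLogs/${p.accountId}/CloudTrail/`;
}

// Partition statement: narrows a query down to a single Region and day.
function addPartitionStatement(table: string, p: CloudTrailPartition): string {
  const location = `${tableLocation(p)}${p.region}/${p.year}/${p.month}/${p.day}/`;
  return (
    `ALTER TABLE ${table} ADD IF NOT EXISTS PARTITION ` +
    `(region='${p.region}', year='${p.year}', month='${p.month}', day='${p.day}') ` +
    `LOCATION '${location}'`
  );
}

const partitionSql = addPartitionStatement("cloudtrail_logs", {
  bucket: "my-log-bucket",
  accountId: "111122223333",
  region: "us-east-1",
  year: "2020",
  month: "11",
  day: "24",
});
console.log(partitionSql);
```

A query that filters on these four partition columns in its WHERE clause then only scans the objects under that one day's folder.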

Prerequisites

Before you get started, you should have the following prerequisites:

  • Service logs already being delivered to Amazon S3
  • An AWS account with access to your service logs

Deploying the automated solution in your AWS account

The following steps walk you through deploying a CloudFormation template that creates saved queries for you to run (Create Table, Create Partition, and example queries for each service log).

  1. Choose Launch Stack:

  2. Choose Next.
  3. For Stack name, enter a name for your stack.

You don’t need to have every AWS service log that the template asks for. If you don’t have CloudFront logs, for example, you can leave the PathParameter as is. If you need CloudFront logs in the future, you can simply update the Create Table statement with the correct Amazon S3 location in Athena.

  4. For each service log table you want to create, follow the steps below:
  • Replace <_BUCKET_NAME> with the name of your S3 bucket that holds each AWS service log. You can use the same bucket name if it’s used to hold more than one type of service log.
  • Replace <Prefix> with your own folder prefix in Amazon S3. If you don’t have a prefix, make sure to remove it from the path parameters.
  • Replace <ACCOUNT-ID> and <REGION> with the desired account ID and Region.

  5. Choose Next.
  6. Enter any tags you wish to assign to the stack.
  7. Choose Next.
  8. Verify that the parameters are correct and choose Create stack at the bottom.

Verify the stack has been created successfully. The stack takes about 1 minute to create the resources.

Querying your tables

You’re now ready to start querying your service logs.

  1. On the Athena console, on the Saved queries tab, search for the service log you want to interact with.
  2. Choose Create Table – CloudTrail Logs to open the SQL statement in the Athena query editor.

Make sure the location for Amazon S3 is correct in your SQL statement and verify you have the correct database selected.

  3. Choose Run query or press Ctrl+Enter to run the query.

The table cloudtrail_logs is created in the selected database. You can repeat this process to create other service log tables.

For partitioned tables like cloudtrail_logs, you must add partitions to your table before querying.

  4. On the Saved queries tab, choose Create Partition – CloudTrail.
  5. Update the Region, year, month, and day you want to partition. Choose Run query or press Ctrl+Enter to run the query.

After you run the query, you have successfully added a partition to your cloudtrail_logs table. Let’s look at some of the example queries we can run now.

  6. On the Saved queries tab, choose Query – CloudTrail Logs.

This is a base template included to begin querying your CloudTrail logs.

  7. Highlight the query and choose Run query.

You can see the base query template uses the WHERE clause to leverage partitions that have been loaded.

Let’s say we have a spike in API calls from AWS Lambda and we want to see the users that the calls were coming from in a specific time range as well as the count for each user. Our query looks like the following code:

SELECT useridentity.sessioncontext.sessionissuer.username as "User",
       count(eventname) as "Lambda API Calls"
FROM cloudtrail_logs
WHERE eventsource = 'lambda.amazonaws.com'
       AND eventtime BETWEEN '2020-11-24T18:00:00Z' AND '2020-11-24T21:00:00Z' 
group by useridentity.sessioncontext.sessionissuer.username
order by count(eventname) desc

Or, if we want to check our S3 access logs to make sure only authorized users are accessing certain prefixes, we can use the following query:

SELECT *
FROM s3_access_logs
WHERE key='prefix/images/example.jpg'
        AND requester != 'arn:aws:iam::accountid:user/username'

Cost of solution and cleaning up

Deploying the CloudFormation template doesn’t cost anything. You’re only charged for the amount of data scanned by Athena. Remember to use the best practices we discussed earlier when querying your data in Amazon S3. For more pricing information, see Amazon Athena pricing and Amazon S3 pricing.

To clean up the resources that were created, delete the CloudFormation stack you created earlier. This also deletes the saved queries in Athena.

Summary

In this post, we discussed how we can use AWS CloudFormation to easily create AWS service log tables, partitions, and starter queries in Athena by entering bucket paths as parameters. We used CloudTrail and Amazon S3 access logs as examples, but you can replicate these steps for other service logs that you may need to query by visiting the Saved queries tab in Athena. Feel free to check out the video as well, where I go over how we store logs in Amazon S3 and then give a quick demo on how to deploy the solution.

For more information about service logs, see Easily query AWS service logs using Amazon Athena.


About the Author

Michael Hamilton is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife, kids, and a 2-year-old German shepherd.

Building Product Intelligence Platform with Cloudflare Workers

Post Syndicated from Robert Cepa original https://blog.cloudflare.com/building-product-intelligence-platform-with-cloudflare-workers/

“You can only improve what you can measure.”

We try to make Cloudflare’s onboarding experience as accessible as possible. For this reason, many customers are able to set up Cloudflare, configure their accounts and products, and discover additional products entirely on their own in our dashboard. Our Customer Onboarding team builds the dashboard experiences that make this possible.

The Onboarding team is data-driven, so we use data to validate our ideas. Rather than shipping the implementation of some idea right away, we run A/B tests with a small percentage of our customers. The results of these tests tell us what we should do with our idea next – either ship it to everyone, try to improve it (and run the test again), or discard it. This practice helps us hedge our efforts so we don’t waste time on an idea that isn’t fruitful, and it gives us a reliable way to gather more information about the needs of our customers. We use a third-party analytics tool to produce data for these A/B tests. This tool helps us collect and analyse data about how our customers interact with the experiences that we build.

The onboarding experience in the dashboard is just one of many places where our customers interact with Cloudflare. Other Product teams, the Customer Success team, and the Marketing team build their own experiences in the dashboard and beyond, and they use the analytics tools that best suit their needs. Each of those teams has different goals, but we all have one thing in common – we want to understand our customers.

For example, knowing how our customers interact with campaigns and emails that Marketing teams build can help us on the Onboarding team to build a better, more personalized onboarding experience. Similarly, understanding how our customers interact with the onboarding experience in the dashboard can help our Marketing team to create more personalized emails and campaigns.

Using multiple third-party analytics tools across multiple teams created many challenges related to data integrity, security, privacy, and performance. In this blog post, we talk about how we used Cloudflare Workers to build our product intelligence platform to overcome these challenges, serving hundreds of millions of requests per month from over 200 cities around the world, close to our customers, all without having to configure and maintain infrastructure.

Motivation: Data integrity, security, privacy, and performance

In the past, teams at Cloudflare used third-party scripts provided by analytics platforms like Google Analytics and Heap to measure user behavior. These scripts presented multiple challenges:

Data Integrity

In the product analytics world, an “event” is any user interaction with the product. Because we were using third-party scripts to send event data to varied analytics destinations, it was hard to make sure that the event data were consistent across all these destinations. In our case, our analytics tools categorized event data in different ways, creating confusion for our teams. For example, if a Cloudflare customer purchased our Workers product, Heap would send an event named “Purchase Workers”, while Google Analytics called it “Product Purchase Success” with a data attribute label: “workers”. Nobody trusted this data, so they sought out more reliable sources, such as billing databases.

Security and privacy

Third-party analytics vendors use third-party scripts to track end-user behavior. We take security and data privacy very seriously, and these scripts pose risks to us and our customers. They are hard to audit, and make it hard to ensure they don’t send data we don’t want them to send. They also change over time, and can be buggy, inefficient, and hard to test.

Performance

We want to give Cloudflare dashboard users a highly performant experience, but third-party scripts can cause slowdowns. For example, they can have a significant size because they try to do a lot of things automatically. Having to load and parse too much JavaScript can extend page load and render times, delay user interaction, and drain more battery. They can also fire too many network requests to multiple servers.

Vendor lock-in

Cloudflare’s dashboard codebase is massive, and hardcoded tracking calls tied to a specific analytics vendor make that vendor difficult to replace. Moreover, adding a new vendor would require significant effort to add that vendor’s tracking calls everywhere, and would have a negative impact on performance on the frontend.

We wanted to solve these problems by creating a system that would decouple what we measure from how we measure it. The requirements were:

  • Unified API: a single API that all clients adhere to regardless of the vendor they primarily use. Engineers don’t need to understand how analytics vendors work and what data they require.
  • Secure and compliant: we fully own and control the code, protecting our customers from vulnerabilities in third-party code. We fully control how our data is measured, distributed, and stored.
  • Performant: lightweight, fast, and non-blocking on the frontend. Move as much logic as we can to the backend.
  • Flexible: ability to add/replace/remove vendors with relatively small effort on the backend, and no effort on the frontend.

We chose to use Cloudflare Workers, which deploys serverless code on the edge across the globe, as our backend infrastructure. Workers offers the following advantages:

  • Nimbleness through serverless development: Our team is small, and analytics wasn’t our primary focus at the time, so we wanted to create something quickly without having to worry about setting up and maintaining the infrastructure. With Workers, we never have to look at things like system health status, or load balancing and scaling, or how fast it is across the world. Everything is included in the package, and works really well.
  • JavaScript support: Since we work on user experiences, we are mostly UI-engineering focused and use React+TypeScript every day. Our team can write frontend and backend code in the same language, which reduces cognitive load.
  • Dogfooding opportunities: We help to test Workers at scale, which makes the product stronger.

Iteration #1: Sparrow and Trace Worker

Our analytics platform has gone through multiple iterations. The first version had two components – a JavaScript SDK called Sparrow, and a corresponding worker we call Trace.

Sparrow

The Sparrow SDK turns various data about product events into a consistent format, so internal users don’t need to understand API requirements further down the data pipeline.

Sparrow has two main functions:

// tracks page visits
sparrow.pageview(pathname: string);

// tracks user interaction
sparrow.track(event: string, properties: Record<string, any>);

The pageview function can be run whenever a page loads in an application, which allows us to track where users navigate.

The track function is more generic. We can send any event name with any metadata. For example, the event name can be “purchase product” with properties: { product: “workers” }.

Both functions create a JSON object with the following interface and send it to the Trace Worker, which forwards it along to various analytics platforms:

{
  event: string,
  deviceId: string,
  userId?: string,
  properties: Record<string, any>
}
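
To illustrate how both functions can funnel into the same payload shape, here is a minimal TypeScript sketch of a Sparrow-like client. It is not Sparrow’s real implementation: createSparrow, the injected send callback, and the convention that page views use the event name “pageview” with a pathname property are assumptions made for this example.

```typescript
// Payload shape taken from the interface above.
interface SparrowEvent {
  event: string;
  deviceId: string;
  userId?: string;
  properties: Record<string, unknown>;
}

// Hypothetical transport: in a browser this could POST the payload to the worker.
type SendFn = (payload: SparrowEvent) => void;

function createSparrow(deviceId: string, send: SendFn, userId?: string) {
  const build = (
    event: string,
    properties: Record<string, unknown>
  ): SparrowEvent => ({
    event,
    deviceId,
    ...(userId !== undefined ? { userId } : {}),
    properties,
  });

  return {
    // tracks page visits (assumed convention: event name "pageview")
    pageview(pathname: string): void {
      send(build("pageview", { pathname }));
    },
    // tracks user interaction
    track(event: string, properties: Record<string, unknown> = {}): void {
      send(build(event, properties));
    },
  };
}

// Usage: collect payloads in memory instead of sending them over the network.
const sent: SparrowEvent[] = [];
const sparrow = createSparrow("device-1", (p) => sent.push(p), "user-42");
sparrow.pageview("/workers");
sparrow.track("purchase product", { product: "workers" });
console.log(JSON.stringify(sent, null, 2));
```

Injecting the transport keeps the payload-building logic testable without a network, which fits the goal of keeping the client thin and moving vendor-specific logic to the backend.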

Trace Worker

The Trace Worker receives event data from Sparrow, checks payload correctness to make sure the request came from valid sources, and fans out the data to all connected analytics providers. The following diagram shows the pipeline.

Any third-party vendor can be added to Trace Worker, as long as that vendor provides a REST API. How the data are parsed, transformed, and sent to those APIs is implemented by us in our custom functions we call trackers. Trackers aim to replicate the behavior of third-party scripts provided by these vendors. Why are we doing this when we can just use third-party scripts? The main reasons are security and data privacy.

  • We use allowlists to explicitly define event names and event properties that can be sent further upstream. This helps us to prevent sending potentially sensitive information from cookies, URL query parameters, or data payloads. Every event and data property that is not in the allowlist is ignored.
  • On top of that, all allowed properties are sanitized by our internal data scrubber.
  • Always HTTPS: Some third-party scripts still use non-secure HTTP protocol. Trace Worker runs on HTTPS, and we make sure that outgoing requests are also using HTTPS.
  • We fully control the code, which means there are no surprises – the code cannot update without us knowing it.
  • Because the logic lives in the worker, our customers are not exposed to unnecessary client-side risks from using eval or document.write.
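
The allowlist idea from the first bullet can be sketched in a few lines of TypeScript. This is an illustration under assumptions rather than our production code: the allowlist contents, the IncomingEvent type, and the applyAllowlist function are hypothetical, and the real pipeline also runs allowed values through a data scrubber, which is omitted here.

```typescript
type Properties = Record<string, unknown>;

interface IncomingEvent {
  event: string;
  properties: Properties;
}

// Hypothetical allowlist: event name -> property names that may go upstream.
const ALLOWLIST = new Map<string, ReadonlySet<string>>([
  ["pageview", new Set(["pathname"])],
  ["purchase product", new Set(["product"])],
]);

// Returns the event with only allowed properties, or null if the event
// itself is not on the allowlist and should be ignored.
function applyAllowlist(input: IncomingEvent): IncomingEvent | null {
  const allowed = ALLOWLIST.get(input.event);
  if (allowed === undefined) {
    return null;
  }
  const properties: Properties = {};
  Object.keys(input.properties).forEach((key) => {
    if (allowed.has(key)) {
      properties[key] = input.properties[key];
    }
  });
  return { event: input.event, properties };
}

const kept = applyAllowlist({
  event: "purchase product",
  properties: { product: "workers", email: "someone@example.com" },
});
const dropped = applyAllowlist({ event: "debug dump", properties: { cookie: "abc" } });
console.log(kept, dropped);
```

Because anything not explicitly listed is dropped, a new property added on the frontend cannot reach a vendor until someone consciously adds it to the allowlist.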

Another benefit is performance – because most of our analytics framework’s logic lives in the worker, there’s less JavaScript we need to send to the client, which means faster load times! SparrowJS on its own is super lightweight.

Here’s the simplified implementation of Trace Worker:

import trackers from './trackers';
import { generateContext } from './utils';
import { sanitize } from './sanitizer';

addEventListener('fetch', event => {
  event.respondWith(handle(event));
});

async function handle(event: FetchEvent) {
  try {
    // scrub the incoming payload before it goes any further
    const payload = sanitize(await event.request.json());

    const context = await generateContext(event);

    // fan out to every tracker without delaying the response
    event.waitUntil(Promise.allSettled(trackers.map(tracker => tracker[payload.event === 'pageview' ? 'pageview' : 'event'](payload, context))));

    return new Response('OK', { status: 200, statusText: 'OK' });
  } catch (err) {
    // logging
    return new Response('Something went wrong', {
      status: 500,
      statusText: 'Internal Server Error'
    });
  }
}

trackers is an array of tracking functions for each third-party vendor. Under the hood, they transform the incoming requests from Sparrow and send them to each vendor’s REST API. For example, this is a simplified implementation of the Google Analytics tracker, which transforms Sparrow payloads to adhere to the Measurement Protocol:

const URL = 'https://www.google-analytics.com/collect';

export async function event(event: TrackingEvent, context: Context) {
  return fetch(URL, {
    method: 'POST',
    body: new URLSearchParams({
      ...createCommonParams(context),
      t: 'event',
      ea: context.data.event,
      ec: context.data.properties.category || 'Uncategorized Event',
      el: context.data.properties.label,
    }).toString()
  });
}

export async function pageview(event: TrackingEvent, context: Context) {
  return fetch(URL, {
    method: 'POST',
    body: new URLSearchParams({
      ...createCommonParams(context),
      t: 'pageview',
      dp: context.data.event
    }).toString()
  });
}

function createCommonParams(context: Context) {
  return {
    tid: context.gaId,
    v: '1',
    cid: context.data.deviceId,
    uid: context.data.userId,
    ...context.data.properties
  }
}

Similarly, the Heap tracker implements its own transformation for https://heapanalytics.com/api/track.
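
The fan-out in Trace Worker relies on Promise.allSettled, so one failing vendor does not stop delivery to the others. The following self-contained TypeScript sketch shows that behavior with stand-in trackers. The Tracker interface with a vendor name, the fanOut function, and the fake vendors are assumptions for this example, not the real tracker modules.

```typescript
interface TrackingEvent {
  event: string;
  properties: Record<string, unknown>;
}

// Hypothetical tracker shape: one send function per vendor.
interface Tracker {
  vendor: string;
  send(e: TrackingEvent): Promise<void>;
}

// Sends the event to every tracker and reports which vendors failed.
// Promise.allSettled waits for all of them, even when some reject.
async function fanOut(trackers: Tracker[], e: TrackingEvent): Promise<string[]> {
  const results = await Promise.allSettled(trackers.map((t) => t.send(e)));
  const failedVendors: string[] = [];
  trackers.forEach((t, i) => {
    const result = results[i];
    if (result !== undefined && result.status === "rejected") {
      failedVendors.push(t.vendor);
    }
  });
  return failedVendors;
}

// Stand-in trackers: two succeed, one simulates a vendor outage.
const delivered: string[] = [];
const demoTrackers: Tracker[] = [
  { vendor: "vendor-a", send: async (e) => { delivered.push("vendor-a:" + e.event); } },
  { vendor: "vendor-b", send: async () => { throw new Error("vendor-b is down"); } },
  { vendor: "vendor-c", send: async (e) => { delivered.push("vendor-c:" + e.event); } },
];

const fanOutResult = fanOut(demoTrackers, {
  event: "purchase product",
  properties: { product: "workers" },
});
fanOutResult.then((failed) => console.log("failed:", failed, "delivered:", delivered));
```

Returning the list of failed vendors, rather than discarding the settled results, gives a logger something concrete to report. Note that a vendor that accepts a request but silently drops it would still count as a success here.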

As you may have noticed, Trace Worker is not your typical service worker. There is no origin service – Trace Worker is the service, except it runs everywhere in the network.

Problem: Nobody (still) trusts the data

Iteration #1 of our data analytics platform worked well for a while, but as more product teams used Sparrow to run their own analyses, we started getting reports of data not looking right. The reports were along the lines of:

  • “Google Analytics underreports Heap by x percent…”
  • “Product purchases are not consistent with DB…”
  • “Signup conversion dropped by x percent, but we don’t think that’s actually happening…”

At the same time, we added another vendor – Amplitude, which made these problems even more complicated, because we now had three systems out of sync.

Due to the distributed nature of our analytics platform, we had a lot of potential breaking points. To find a solution, we needed to answer questions like:

Dashboard/Sparrow problems

  • Are product teams using Sparrow correctly?
  • Do we have any hard redirects that cause request cancellation? Should we try Beacon API and see what changes?
  • How does Trace Worker respond?

Trace Worker problems

  • Are we not catching some exceptions?
  • Are we exceeding runtime limits?
  • Are we hitting firewall/DDoS protection?

Third-party vendor problems

  • Are they silently rejecting or not storing some payloads sent from Trace Worker? Google Analytics always responds with 200 OK to any request. How are other vendors handling requests?
  • Are they having internal issues? How can we know, since these systems are black boxes?
  • Can we recover any lost data?

If we release Trace Worker and our event volume takes a nosedive, it’s a pretty strong clue that it’s caused by us and we should roll back.

But what if nothing unusual happens for a while, and then page views drop by 20%? 10%? 5%? Is it us, or third-party vendors, or just a nice sunny day in Europe and so people are not sitting behind their desks? As we had no source of truth to compare these data against, this was impossible to answer.

Regardless, we knew we had to get serious about observability before we could even begin asking these questions. At the time, there was no wrangler tail or Workers analytics (there are now!). Also, even though we used Sentry, an app monitoring platform, our logger was a very basic wrapper around Sentry’s REST API, because there was no full-blown Sentry SDK for the Workers runtime – the existing SDKs used globals, causing race conditions in Workers.

The goals were:

1. Get better at diagnosing our own problems – catch every exception in Trace Worker and every non-200 HTTP response from third-party vendors, and log it to some visible source, with some helpful stack-traces and other metadata like request headers and body.

2. Be able to isolate problems that may be happening outside of our codebase – have a single source of truth for all incoming/outgoing requests which we can query and compare against data in our third-party tools. This would help us discover dropped requests that weren’t represented with an error state.

Iteration #2: New Sentry SDK for Workers

To meet these goals, we implemented and open-sourced a new Sentry SDK for Workers called toucan-js. Toucan adheres to the Sentry unified API guidelines, so the interface is familiar from other SDKs (node/browser). It currently supports capturing errors and messages with stack-traces that can be enhanced with source maps, breadcrumbs, request data/headers/cookies, tags, and extra metadata.

Since we replaced our simple Sentry logger with toucan-js, every single log started having:

  • Full request payload and some allowed headers
  • Stack-trace with source maps
  • Response status code and body (if applicable)

If anything goes wrong, we have all the information we need to reproduce and fix it – request body, headers, stack-trace, and all necessary context.

At the same time, we started sending cloned requests to the /debug/collect endpoint in Google Analytics Tracker that, combined with Sentry alerts, helped us find many dropped requests due to schema adherence problems such as “The value provided for parameter ‘cid’ is invalid.”
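
As an illustration of how such validation responses can be turned into alerts, the sketch below pulls the error descriptions out of a /debug/collect response. The response shape (hitParsingResult, valid, parserMessage, messageType, description) is our understanding of the Measurement Protocol validation server and should be treated as an assumption; collectValidationErrors is a hypothetical helper, not part of Trace Worker.

```typescript
// Assumed shape of a Measurement Protocol validation response.
interface ParserMessage {
  messageType: string;
  description: string;
  parameter?: string;
}

interface HitParsingResult {
  valid: boolean;
  hit?: string;
  parserMessage?: ParserMessage[];
}

interface DebugCollectResponse {
  hitParsingResult?: HitParsingResult[];
}

// Hypothetical helper: returns the description of every ERROR message
// attached to an invalid hit, ready to be logged.
function collectValidationErrors(response: DebugCollectResponse): string[] {
  const errors: string[] = [];
  for (const hit of response.hitParsingResult || []) {
    if (hit.valid) {
      continue;
    }
    for (const message of hit.parserMessage || []) {
      if (message.messageType === 'ERROR') {
        errors.push(message.description);
      }
    }
  }
  return errors;
}
```

Each returned description could then be passed to toucan.captureMessage so that schema problems show up next to the other Sentry alerts.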

Iteration #3: The single source of truth

Better Sentry logs helped us with major drifts, but the data were still slightly off. While we observed some intermittent HTTP errors in Sentry, when we compared the number of these alerts with the differences between analytics platforms, the numbers didn’t add up.

Due to this uncertainty, we decided to own the data layer, and create our own database – the single source of truth of all incoming payloads sent from Sparrow to Trace Worker before any transformation.

In order for us to trust the data in this ‘single source of truth’ database, the database needed to receive Sparrow payloads from outside of Trace Worker, preferably from a system that sits right in front of it, with minimal logic, that changes rarely, and that is highly available. Ideally, this system was to do three things – grab the incoming request payload, log it, and forward it to Trace Worker. These payloads should be logged raw, untouched, corresponding to whatever is sent from clients (SparrowJS).

The nice thing about this solution is that even if Trace Worker gets a bad release, we will not lose any data. Another strong case for us owning the data is that incidents in third-party vendors will not affect us anymore, because the solution will open the door for backfilling of dropped requests.

We considered Workers KV — Cloudflare’s low latency key-value store hosted at the network edge — for our storage needs, but being able to query the data was really important for us, because we wanted to diagnose complex problems quickly and select the data based on some property. For this reason, we went in a different direction.

Google BigQuery was our storage solution

We decided to use Google BigQuery for our ‘single source of truth’ database because:

  • It was designed for big data
  • It lets us use SQL to query what we need
  • We can use REST API in our new system to send the logs

Of course, Google BigQuery is a columnar database. How would we use it to store JSON data?

The first option was to write some kind of transformer that would map every object property to a column, but that was against our requirement of a system with minimal logic. The set of allowed characters we could use to name a column was also limited, so we wouldn’t be able to map column names back to original properties.

Due to these limitations, we decided to store raw JSON strings, and use JSON functions to build views on top of these data.

First, we created a partitioned-by-day table called raw with the following schema:

Field name | Type
eventId | STRING
timestamp | TIMESTAMP
data | STRING

Sparrow’s payloads are stored in the data field as stringified JSON.

We don’t run queries against this table directly. Instead, we built a view called raw_normalized that looks something like this:

select
  json_extract_scalar(data, '$.event') as event,
  json_extract_scalar(data, '$.deviceId') as deviceId,
  json_extract_scalar(data, '$.userId') as userId,
  json_extract_scalar(data, '$.properties.category') as category,
  json_extract_scalar(data, '$.properties.productName') as productName
from raw;

With this setup, we can write complex SQL queries while retaining the original JSON values. To demonstrate on a simple example, when we insert a row with data being:

{
  "event": "purchase product",
  "deviceId": "desktop1",
  "userId": "michelle1",
  "properties": { "category": "billing", "productName": "workers" }
}

and then run:

select * from raw_normalized where event = 'purchase product';

we get:

event | deviceId | userId | category | productName
purchase product | desktop1 | michelle1 | billing | workers
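
To make the behavior of the view more concrete, here is a small TypeScript sketch that mimics what json_extract_scalar does for simple dotted paths such as $.properties.category. It is an illustration only: extractScalar is a hypothetical helper, it ignores array indexing and bracket notation, and it is not how BigQuery implements the function.

```typescript
// Hypothetical helper that mimics json_extract_scalar for simple "$.a.b" paths.
// Returns the scalar as a string, or null when the path is missing,
// the value is not a scalar, or the input is not valid JSON.
function extractScalar(data: string, path: string): string | null {
  if (path.slice(0, 2) !== '$.') {
    return null;
  }
  let current: unknown;
  try {
    current = JSON.parse(data);
  } catch (err) {
    return null;
  }
  for (const key of path.slice(2).split('.')) {
    if (typeof current !== 'object' || current === null || Array.isArray(current)) {
      return null;
    }
    current = (current as Record<string, unknown>)[key];
  }
  if (typeof current === 'string') {
    return current;
  }
  if (typeof current === 'number' || typeof current === 'boolean') {
    return String(current);
  }
  // Objects, arrays, null, and missing keys are not scalars.
  return null;
}
```

Applied to the example row, the paths used by the view return the same values as the query result, while a path that points at a nested object (such as $.properties) yields null.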

We had our data layer prepared. But how to actually push the data into BigQuery?

Dispatcher Worker

We created another worker, the Dispatcher, that sits in front of Trace Worker! As we said earlier, the sole purpose of Dispatcher Worker is to:

  1. Read the incoming request body
  2. Send it to BigQuery
  3. Forward the incoming request to Trace Worker

The architecture changed to:

Building Product Intelligence Platform with Cloudflare Workers

Here’s a simplified implementation:

import Toucan from 'toucan-js';
import { BigQueryClient } from './bigquery';
// HttpError is a small helper (not shown in this post) that wraps a non-OK
// Response; the import path here is an assumption.
import { HttpError } from './http-error';

// Created once, outside the event listener, so the client can reuse its access token
const bigQuery = new BigQueryClient({
  serviceAccountEmail: SERVICE_ACCOUNT_EMAIL,
  serviceAccountSecret: SERVICE_ACCOUNT_SECRET,
  projectId: PROJECT_ID,
  datasetId: DATASET_ID
});

addEventListener('fetch', event => {
  const toucan = new Toucan({ dsn: DSN, event });

  // do the work without blocking the response
  event.waitUntil(dispatch(event, bigQuery, toucan));

  event.respondWith(new Response('OK', {
    status: 200,
    statusText: 'OK'
  }));
});

async function dispatch(event: FetchEvent, bigQuery: BigQueryClient, toucan: Toucan) {
  try {
    // Original request to be sent to Trace Worker
    const requestOriginal = event.request;

    // We clone the request here to allow multiple uses of Body
    const requestClone = requestOriginal.clone();

    // read the request payload
    const payload = await requestClone.text();

    // create a timestamp
    const timestamp = Date.now();

    // send to BQ
    const bigQueryResponse = await bigQuery.insertRow({ timestamp, json: payload });

    // log failed inserts
    if (!bigQueryResponse.ok) {
      toucan.captureException(await HttpError.fromResponse(bigQueryResponse));
    }

    // send to trace worker
    const traceResponse = await fetch(TRACE_WORKER_URL, requestOriginal);

    // log failed forwards
    if (!traceResponse.ok) {
      toucan.captureException(await HttpError.fromResponse(traceResponse));
    }
  } catch (err) {
    toucan.captureException(err);
  }
}

BigQueryClient is a lightweight SDK we implemented to be able to send data to BigQuery. Internally, it builds a request and sends it to Google Cloud Platform using their Stream API. We won’t go into details, but we want to briefly cover how we handle authentication.
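
For readers who want a rough idea of what such a request might contain, the sketch below builds the URL and body for BigQuery's tabledata.insertAll REST method. It is not the actual BigQueryClient: buildInsertAllRequest and its parameter names are hypothetical, and sending the timestamp as seconds since the epoch and reusing eventId as the insertId are assumptions.

```typescript
interface TableTarget {
  projectId: string;
  datasetId: string;
  tableId: string;
}

interface RawRow {
  eventId: string;
  timestampMs: number; // for example, the value of Date.now()
  payload: string;     // raw, untouched request body
}

interface InsertAllRequest {
  url: string;
  body: {
    rows: { insertId: string; json: { eventId: string; timestamp: number; data: string } }[];
  };
}

// Hypothetical helper: builds the URL and JSON body of a streaming insert
// for a table with the columns eventId, timestamp, and data.
function buildInsertAllRequest(target: TableTarget, row: RawRow): InsertAllRequest {
  const url =
    'https://bigquery.googleapis.com/bigquery/v2' +
    `/projects/${encodeURIComponent(target.projectId)}` +
    `/datasets/${encodeURIComponent(target.datasetId)}` +
    `/tables/${encodeURIComponent(target.tableId)}/insertAll`;
  return {
    url,
    body: {
      rows: [
        {
          // insertId lets BigQuery de-duplicate retried inserts on a best-effort basis.
          insertId: row.eventId,
          json: {
            eventId: row.eventId,
            // Assumption: the TIMESTAMP column is sent as seconds since the epoch.
            timestamp: row.timestampMs / 1000,
            // The payload stays a string so the stored data is exactly what the client sent.
            data: row.payload,
          },
        },
      ],
    },
  };
}
```

The body would be sent with a POST request carrying an Authorization: Bearer header that holds the access token described next.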

Google Cloud APIs use the OAuth 2.0 protocol for authenticating both user accounts and service accounts. In short, the protocol involves building a signed JWT (JSON Web Token), sending it to the Google Authorization Server to obtain an access token, and sending that access token with all requests to the GCP API.

We tried a few libraries to help us build that JWT (such as jsonwebtoken), but they were too slow due to their RSA implementation, and we were hitting runtime limits. So we implemented our own JWT builder using SubtleCrypto, which is a web standard that is also implemented in Cloudflare Workers!

JSON Web Tokens consist of 3 parts:

  1. Header
  2. Body
  3. Signature

First, we build the header:

const tokenHeader = base64UrlEncode(JSON.stringify({
  alg: 'RS256',
  typ: 'JWT'
}));

Then we build the token body, and concatenate with the header to build the token base:

// iat and exp must be whole seconds since the epoch
const nowSeconds = Math.floor(Date.now() / 1000);
const tokenTtl = 3600;
const expire = nowSeconds + tokenTtl;

const tokenBody = base64UrlEncode(JSON.stringify({
  iss: SERVICE_ACCOUNT_EMAIL,
  scope: 'https://www.googleapis.com/auth/bigquery.insertdata',
  aud: 'https://www.googleapis.com/oauth2/v4/token',
  exp: expire,
  iat: nowSeconds
}));
 
const tokenBase = `${tokenHeader}.${tokenBody}`;

All that’s left is signing the token base:

const signature = base64UrlEncode(
  arrayBufferToString(
    await crypto.subtle.sign(
      {
        name: 'RSASSA-PKCS1-v1_5',
        hash: { name: 'SHA-256' }
      },
      // must already be a CryptoKey (for example, imported with crypto.subtle.importKey)
      SERVICE_ACCOUNT_JWK,
      new TextEncoder().encode(tokenBase)
    )
  )
);
 
const jwt = `${tokenBase}.${signature}`;
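
The snippets above rely on two helpers, base64UrlEncode and arrayBufferToString, that are not shown in this post. The following is one possible sketch of them, assuming that base64UrlEncode receives a binary string (one character per byte) and produces unpadded, URL-safe Base64 as JWTs require. The real helpers may look different.

```typescript
// URL-safe Base64 alphabet: '-' and '_' replace '+' and '/'.
const BASE64URL_ALPHABET =
  'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_';

// Encodes a binary string (one character per byte) as unpadded base64url.
function base64UrlEncode(binary: string): string {
  let output = '';
  for (let i = 0; i < binary.length; i += 3) {
    const hasSecond = i + 1 < binary.length;
    const hasThird = i + 2 < binary.length;
    const b0 = binary.charCodeAt(i) & 0xff;
    const b1 = hasSecond ? binary.charCodeAt(i + 1) & 0xff : 0;
    const b2 = hasThird ? binary.charCodeAt(i + 2) & 0xff : 0;
    // Every group of 3 bytes becomes 4 characters; a short final group emits fewer.
    output += BASE64URL_ALPHABET.charAt(b0 >> 2);
    output += BASE64URL_ALPHABET.charAt(((b0 & 0x03) << 4) | (b1 >> 4));
    if (hasSecond) {
      output += BASE64URL_ALPHABET.charAt(((b1 & 0x0f) << 2) | (b2 >> 6));
    }
    if (hasThird) {
      output += BASE64URL_ALPHABET.charAt(b2 & 0x3f);
    }
  }
  return output;
}

// Converts the ArrayBuffer returned by crypto.subtle.sign into a binary string.
function arrayBufferToString(buffer: ArrayBuffer): string {
  let binary = '';
  new Uint8Array(buffer).forEach((byte) => {
    binary += String.fromCharCode(byte);
  });
  return binary;
}
```

Note that this sketch masks every character to a single byte, so it is only suitable for ASCII JSON (such as the header and body above) and for signature bytes.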

Once we have the JWT, our SDK sends a request to the Authorization Server to retrieve the access token:

const token = await (await fetch('https://www.googleapis.com/oauth2/v4/token', {
  method: 'POST',
  body: 'grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=' + jwt,
  headers: { 'content-type': 'application/x-www-form-urlencoded' }
})).json();

The access token doesn’t need to be requested with every FetchEvent – it can be reused until it expires. Caching the token helps with performance, because RSA signing is costly.

BigQueryClient stores the access token in a global variable, so every request handled by the same isolate can reuse it. In fact, that’s the reason we initialize BigQueryClient outside of addEventListener. The SDK manages the token internally and handles the OAuth2 ceremony for the clients – the first call of insertRow generates a JWT to retrieve and store an access token, but subsequent calls of insertRow use the access token from memory.
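
The caching logic could look roughly like the sketch below. AccessTokenCache is a hypothetical class, not the code inside BigQueryClient; it assumes the token response carries an access_token together with an expires_in value in seconds, and it treats a token as expired a little early (skewSeconds) so that a request never goes out with a token that is about to lapse.

```typescript
// Hypothetical in-memory cache for an OAuth access token.
class AccessTokenCache {
  private value: string | undefined = undefined;
  private expiresAt = 0; // seconds since the epoch
  private readonly skewSeconds: number;

  constructor(skewSeconds: number = 60) {
    this.skewSeconds = skewSeconds;
  }

  // Returns the cached token, or undefined if there is none or it is (almost) expired.
  get(nowSeconds: number): string | undefined {
    if (this.value === undefined) {
      return undefined;
    }
    if (nowSeconds >= this.expiresAt - this.skewSeconds) {
      return undefined;
    }
    return this.value;
  }

  // Stores a freshly issued token together with its lifetime.
  set(value: string, expiresInSeconds: number, nowSeconds: number): void {
    this.value = value;
    this.expiresAt = nowSeconds + expiresInSeconds;
  }
}

// Declared at module scope so it survives between requests within one isolate.
const tokenCache = new AccessTokenCache();
```

A client would call tokenCache.get first and only build a JWT and call the Authorization Server when it returns undefined, storing the result with tokenCache.set.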

With all the pieces put together, this is the state of our analytics pipeline today.

Future work: Risk management

Logging all data gives us great visibility and makes debugging easier. We now have a clear picture of where in the pipeline the problems are, and we have all possible information to fix them. We can react to problems pretty well, but we would like to get better at preventing problems in production before they happen.

We currently have two environments: staging and production. Our staging environment is behind Cloudflare Access, only accessible to Cloudflare employees. When we merge our changes, the CI pipeline automatically deploys them to the staging environment where we can test these changes before they get to production.

While our staging environment helps us with catching catastrophic errors early, it’s not too great for finding errors that may cause partial data drops, because the staging traffic is generally very low, which makes it harder to spot changes in data patterns. From this perspective, our releases are very risky.

To reduce these risks, we need to test our changes with production traffic. We can achieve that with a variant of the blue-green deployment approach, ensuring we have two production environments that are as identical as possible. We could use another Worker, sitting in front of Blue and Green Dispatcher Workers, receiving live traffic and duplicating all requests to the blue/green variants. One of these variants, green for example, would be using live code and live data sources, and the other one, blue, would be in the final stage of testing, with our new code, logging to the staging data sources. Once we validate the data in the staging data sources, we can easily cut over by switching data sources between green and blue.

Future work: Durability

Outages of third-party vendors don’t affect us anymore, since we essentially own the data layer and can backfill dropped events once a vendor is fixed and back online.

This presents us with a question: How can we automate this with Workers?

How can we make sure that everything in our source of truth will eventually end up in the analytics tools?

Can we implement a true distributed and durable streaming platform like Kafka at the edge?

We think we can! More about this next time.

Using AWS DevOps Tools to model and provision AWS Glue workflows

Post Syndicated from Nuatu Tseggai original https://aws.amazon.com/blogs/devops/provision-codepipeline-glue-workflows/

This post provides a step-by-step guide on how to model and provision AWS Glue workflows utilizing a DevOps principle known as infrastructure as code (IaC) that emphasizes the use of templates, source control, and automation. The cloud resources in this solution are defined within AWS CloudFormation templates and provisioned with automation features provided by AWS CodePipeline and AWS CodeBuild. These AWS DevOps tools are flexible, interchangeable, and well suited for automating the deployment of AWS Glue workflows into different environments such as dev, test, and production, which typically reside in separate AWS accounts and Regions.

AWS Glue workflows allow you to manage dependencies between multiple components that interoperate within an end-to-end ETL data pipeline by grouping together a set of related jobs, crawlers, and triggers into one logical run unit. Many customers using AWS Glue workflows start by defining the pipeline using the AWS Management Console and then move on to monitoring and troubleshooting using either the console, AWS APIs, or the AWS Command Line Interface (AWS CLI).

Solution overview

The solution uses COVID-19 datasets. For more information on these datasets, see the public data lake for analysis of COVID-19 data, which contains a centralized repository of freely available and up-to-date curated datasets made available by the AWS Data Lake team.

Because the primary focus of this solution is to showcase how to model and provision AWS Glue workflows using AWS CloudFormation and CodePipeline, we don’t spend much time describing intricate transform capabilities that can be performed in AWS Glue jobs. As shown in the Python scripts, the business logic is optimized for readability and extensibility so you can easily home in on the functions that aggregate data based on monthly and quarterly time periods.

The ETL pipeline reads the source COVID-19 datasets directly and writes only the aggregated data to your S3 bucket.

The solution exposes the datasets in the following tables:

Table Name | Description | Dataset location | Provider
countrycode | Lookup table for country codes | s3://covid19-lake/static-datasets/csv/countrycode/ | Rearc
countypopulation | Lookup table for the population of each county | s3://covid19-lake/static-datasets/csv/CountyPopulation/ | Rearc
state_abv | Lookup table for US state abbreviations | s3://covid19-lake/static-datasets/json/state-abv/ | Rearc
rearc_covid_19_nyt_data_in_usa_us_counties | Data on COVID-19 cases at US county level | s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-counties/ | Rearc
rearc_covid_19_nyt_data_in_usa_us_states | Data on COVID-19 cases at US state level | s3://covid19-lake/rearc-covid-19-nyt-data-in-usa/csv/us-states/ | Rearc
rearc_covid_19_testing_data_states_daily | Data on COVID-19 cases at US state level | s3://covid19-lake/rearc-covid-19-testing-data/csv/states_daily/ | Rearc
rearc_covid_19_testing_data_us_daily | US total test daily trend | s3://covid19-lake/rearc-covid-19-testing-data/csv/us_daily/ | Rearc
rearc_covid_19_testing_data_us_total_latest | US total tests | s3://covid19-lake/rearc-covid-19-testing-data/csv/us-total-latest/ | Rearc
rearc_covid_19_world_cases_deaths_testing | World total tests | s3://covid19-lake/rearc-covid-19-world-cases-deaths-testing/ | Rearc
rearc_usa_hospital_beds | Hospital beds and their utilization in the US | s3://covid19-lake/rearc-usa-hospital-beds/ | Rearc
world_cases_deaths_aggregates | Monthly and quarterly aggregate of the world | s3://<your-S3-bucket-name>/covid19/world-cases-deaths-aggregates/ | Aggregate

Prerequisites

This post assumes you have the following:

  • Access to an AWS account
  • The AWS CLI (optional)
  • Permissions to create a CloudFormation stack
  • Permissions to create AWS resources, such as AWS Identity and Access Management (IAM) roles, Amazon Simple Storage Service (Amazon S3) buckets, and various other resources
  • General familiarity with AWS Glue resources (triggers, crawlers, and jobs)

Architecture

The CloudFormation template glue-workflow-stack.yml defines all the AWS Glue resources shown in the following diagram.

architecture diagram showing ETL process

Figure: AWS Glue workflow architecture diagram

Modeling the AWS Glue workflow using AWS CloudFormation

Let’s start by exploring the template used to model the AWS Glue workflow: glue-workflow-stack.yml

We focus on two resources in the following snippet:

  • AWS::Glue::Workflow
  • AWS::Glue::Trigger

From a logical perspective, a workflow contains one or more triggers that are responsible for invoking crawlers and jobs. Building a workflow starts with defining the crawlers and jobs as resources within the template and then associating them with triggers.

Defining the workflow

This is where the definition of the workflow starts. In the following snippet, we specify the type as AWS::Glue::Workflow and the property Name as a reference to the parameter GlueWorkflowName.

Parameters:
  GlueWorkflowName:
    Type: String
    Description: Glue workflow that tracks all triggers, jobs, crawlers as a single entity
    Default: Covid_19

Resources:
  Covid19Workflow:
    Type: AWS::Glue::Workflow
    Properties: 
      Description: Glue workflow that tracks specified triggers, jobs, and crawlers as a single entity
      Name: !Ref GlueWorkflowName

Defining the triggers

This is where we define each trigger and associate it with the workflow. In the following snippet, we specify the property WorkflowName on each trigger as a reference to the logical ID Covid19Workflow.

These triggers allow us to create a chain of dependent jobs and crawlers as specified by the properties Actions and Predicate.

The trigger t_Start utilizes a type of SCHEDULED, which means that it starts at a defined time (in our case, one time a day at 8:00 AM UTC). Every time it runs, it starts the job with the logical ID Covid19WorkflowStarted.

The trigger t_GroupA utilizes a type of CONDITIONAL, which means that it starts when the resources specified within the property Predicate have reached a specific state (when the list of Conditions specified equals SUCCEEDED). Every time t_GroupA runs, it starts the crawlers with the logical IDs CountyPopulation and Countrycode, per the Actions property containing a list of actions.

  TriggerJobCovid19WorkflowStart:
    Type: AWS::Glue::Trigger
    Properties:
      Name: t_Start
      Type: SCHEDULED
      Schedule: cron(0 8 * * ? *) # Runs once a day at 8 AM UTC
      StartOnCreation: true
      WorkflowName: !Ref Covid19Workflow
      Actions:
        - JobName: !Ref Covid19WorkflowStarted

  TriggerCrawlersGroupA:
    Type: AWS::Glue::Trigger
    Properties:
      Name: t_GroupA
      Type: CONDITIONAL
      StartOnCreation: true
      WorkflowName: !Ref Covid19Workflow
      Actions:
        - CrawlerName: !Ref CountyPopulation
        - CrawlerName: !Ref Countrycode
      Predicate:
        Conditions:
          - JobName: !Ref Covid19WorkflowStarted
            LogicalOperator: EQUALS
            State: SUCCEEDED
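
To illustrate how a CONDITIONAL trigger decides whether to start its actions, here is a simplified TypeScript model of the predicate check. It is a sketch for building intuition, not the AWS Glue implementation: shouldFire and the type names are hypothetical, only the EQUALS operator is modeled, and the optional Logical setting (AND or ANY) is assumed to default to AND.

```typescript
type JobState = 'SUCCEEDED' | 'FAILED' | 'STOPPED' | 'TIMEOUT' | 'RUNNING';

// One entry of Predicate.Conditions (LogicalOperator is always EQUALS in this model).
interface Condition {
  jobName: string;
  state: JobState;
}

interface Predicate {
  logical?: 'AND' | 'ANY';
  conditions: Condition[];
}

// Hypothetical helper: returns true when the observed job states satisfy the predicate.
function shouldFire(
  predicate: Predicate,
  observed: Record<string, JobState | undefined>,
): boolean {
  if (predicate.conditions.length === 0) {
    return false;
  }
  const matches = predicate.conditions.map(
    (condition) => observed[condition.jobName] === condition.state,
  );
  return predicate.logical === 'ANY'
    ? matches.some((matched) => matched)
    : matches.every((matched) => matched);
}
```

For t_GroupA, the predicate holds a single condition on Covid19WorkflowStarted, so in this model the crawlers start only after that job reports SUCCEEDED.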

Provisioning the AWS Glue workflow using CodePipeline

Now let’s explore the template used to provision the CodePipeline resources: codepipeline-stack.yml

This template defines an S3 bucket that is used as the source action for the pipeline. Any time source code is uploaded to this bucket, AWS CloudTrail logs the event, which is detected by an Amazon CloudWatch Events rule configured to start running the pipeline in CodePipeline. The pipeline orchestrates CodeBuild to get the source code and provision the workflow.

For more information on any of the available source actions that you can use with CodePipeline, such as Amazon S3, AWS CodeCommit, Amazon Elastic Container Registry (Amazon ECR), GitHub, GitHub Enterprise Server, GitHub Enterprise Cloud, or Bitbucket, see Start a pipeline execution in CodePipeline.

We start by deploying the stack that sets up the CodePipeline resources. This stack can be deployed in any Region where CodePipeline and AWS Glue are available. For more information, see AWS Regional Services.

Cloning the GitHub repo

Clone the GitHub repo with the following command:

$ git clone https://github.com/aws-samples/provision-codepipeline-glue-workflows.git

Deploying the CodePipeline stack

Deploy the CodePipeline stack with the following command:

$ aws cloudformation deploy \
--stack-name codepipeline-covid19 \
--template-file cloudformation/codepipeline-stack.yml \
--capabilities CAPABILITY_NAMED_IAM \
--no-fail-on-empty-changeset \
--region <AWS_REGION>

When the deployment is complete, you can view the pipeline that was provisioned on the CodePipeline console.

CodePipeline console showing the deploy pipeline in failed state

Figure: CodePipeline console

The preceding screenshot shows that the pipeline failed. This is because we haven’t uploaded the source code yet.

In the following steps, we zip and upload the source code, which triggers another (successful) run of the pipeline.

Zipping the source code

Zip the source code containing the AWS Glue scripts, CloudFormation templates, and buildspec files with the following command:

$ zip -r source.zip . -x images/\* *.history* *.git* *.DS_Store*

You can omit *.DS_Store* from the preceding command if you are not a Mac user.

Uploading the source code

Upload the source code with the following command:

$ aws s3 cp source.zip s3://covid19-codepipeline-source-<AWS_ACCOUNT_ID>-<AWS_REGION>

Make sure to provide your account ID and Region in the preceding command. For example, if your AWS account ID is 111111111111 and you’re using Region us-west-2, use the following command:

$ aws s3 cp source.zip s3://covid19-codepipeline-source-111111111111-us-west-2

Now that the source code has been uploaded, view the pipeline again to see it in action.

CodePipeline console showing the deploy pipeline in success state

Figure: CodePipeline console displaying stage “Deploy” in-progress

Choose Details within the Deploy stage to see the build logs.

CodeBuild console displaying build logs

Figure: CodeBuild console displaying build logs

To change any of the commands that run within the Deploy stage, edit deploy-glue-workflow-stack.yml.

Try uploading the source code a few more times. Each time it’s uploaded, CodePipeline starts and runs another deploy of the workflow stack. If nothing has changed in the source code, AWS CloudFormation automatically determines that the stack is already up to date. If something has changed in the source code, AWS CloudFormation automatically determines that the stack needs to be updated and proceeds to run the change set.

Viewing the provisioned workflow, triggers, jobs, and crawlers

To view your workflows on the AWS Glue console, in the navigation pane, under ETL, choose Workflows.

Glue console showing workflows

Figure: Navigate to Workflows

To view your triggers, in the navigation pane, under ETL, choose Triggers.

Glue console showing triggers

Figure: Navigate to Triggers

To view your crawlers, under Data Catalog, choose Crawlers.

Glue console showing crawlers

Figure: Navigate to Crawlers

To view your jobs, under ETL, choose Jobs.

Glue console showing jobs

Figure: Navigate to Jobs

Running the workflow

The workflow runs automatically at 8:00 AM UTC. To start the workflow manually, you can use either the AWS CLI or the AWS Glue console.

To start the workflow with the AWS CLI, enter the following command:

$ aws glue start-workflow-run --name Covid_19 --region <AWS_REGION>

To start the workflow on the AWS Glue console, on the Workflows page, select your workflow and choose Run on the Actions menu.

Glue console run workflow

Figure: AWS Glue console start workflow run

To view the run details of the workflow, choose the workflow on the AWS Glue console and choose View run details on the History tab.

Glue console view run details of a workflow

Figure: View run details

The following screenshot shows a visual representation of the workflow as a graph with your run details.

Glue console showing visual representation of the workflow as a graph.

Figure: AWS Glue console displaying details of successful workflow run

Cleaning up

To avoid additional charges, delete the stack created by the CloudFormation template and the contents of the buckets you created.

1. Delete the contents of the covid19-dataset bucket with the following command:

$ aws s3 rm s3://covid19-dataset-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

2. Delete your workflow stack with the following command:

$ aws cloudformation delete-stack --stack-name glue-covid19 --region <AWS_REGION>

To delete the contents of the covid19-codepipeline-source bucket, it’s simplest to use the Amazon S3 console because it makes it easy to delete multiple versions of the object at once.

3. Navigate to the S3 bucket named covid19-codepipeline-source-<AWS_ACCOUNT_ID>-<AWS_REGION>.

4. Choose List versions.

5. Select all the files to delete.

6. Choose Delete and follow the prompts to permanently delete all the objects.

S3 console delete all object versions

Figure: AWS S3 console delete all object versions

7. Delete the contents of the covid19-codepipeline-artifacts bucket:

$ aws s3 rm s3://covid19-codepipeline-artifacts-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

8. Delete the contents of the covid19-cloudtrail-logs bucket:

$ aws s3 rm s3://covid19-cloudtrail-logs-<AWS_ACCOUNT_ID>-<AWS_REGION> --recursive

9. Delete the pipeline stack:

$ aws cloudformation delete-stack --stack-name codepipeline-covid19 --region <AWS_REGION>

Conclusion

In this post, we stepped through how to use AWS DevOps tooling to model and provision an AWS Glue workflow that orchestrates an end-to-end ETL pipeline on a real-world dataset.

You can download the source code and template from this GitHub repository and adapt it as you see fit for your data pipeline use cases. Feel free to leave comments letting us know about the architectures you build for your environment. To learn more about building ETL pipelines with AWS Glue, see the AWS Glue Developer Guide and the AWS Data Analytics learning path.

About the Authors

Nuatu Tseggai

Nuatu Tseggai is a Cloud Infrastructure Architect at Amazon Web Services. He enjoys working with customers to design and build event-driven distributed systems that span multiple services.

Suvojit Dasgupta

Suvojit Dasgupta is a Sr. Customer Data Architect at Amazon Web Services. He works with customers to design and build complex data solutions on AWS.

How EMX reduced data pipeline costs by 85% with Amazon Athena

Post Syndicated from Gary Bouton original https://aws.amazon.com/blogs/big-data/how-emx-reduced-data-pipeline-costs-by-85-with-amazon-athena/

This is a guest blog post by Gary Bouton and Louis Ashner from EMX. In their own words, “ENGINE Media Exchange (EMX) is a leading marketing technology company, leveraging a patented, end-to-end tech stack purpose-built to meet the demands of today’s digital marketplace. The company creates both open- and closed-loop solutions designed to unify advertisers, platforms, and publishers across digital media channels—including advanced TV, video, display, search, and social.”

While recognized as an independent solutions provider for the digital media landscape, EMX also serves as the technology and programmatic division for its parent, ENGINE—a global data-driven marketing company serving advertising’s most recognized brands.

In the past, we used typical legacy data warehouse solutions for our data pipeline. We needed massive clusters to house all our raw data as well as advanced pipelines to import that data into the final database. We then needed to query the raw data clusters to aggregate the data and move it into a separate cluster for more frequent querying. This process was not only time consuming but also quite expensive, because legacy database clusters large enough to house that much data weren’t cheap.

Then came Amazon Athena, which allowed us to not only simplify our pipeline, but also save on costs significantly. We were able to simply route the raw data straight to Amazon Simple Storage Service (Amazon S3) at minimal storage costs, then query the data with Athena to move aggregate data into small Amazon Redshift clusters for querying more frequently. Athena’s querying is not only quick, but the query costs are mere pennies when the table is set up correctly with partitions and the query utilizes them properly. Additionally, we can increase our data retention time, because the cost of storing data in Amazon S3 is significantly cheaper than an ever-growing legacy data warehousing solution.

In short, Athena allowed us to simplify our data pipeline while saving 85% on data storage costs at the same time.

This post discusses the following:

  • Why EMX digital chose Athena for its backend ETL workflow
  • How EMX manages Athena performance and run time
  • How EMX continues to scale Athena with new products and create coherent workflows
  • The benefits of this solution for EMX

Advantages of a robust backend ETL workflow when dealing with fast and furious big data

For most companies, data is an ever-growing problem. The volume, velocity, variety, and veracity of its availability can be performance limiting and a financial burden.

Data is very important to us at EMX; we process over 450,000 requests per second, which we clean, audit, and deliver for reporting and optimizations to keep our clients informed and current in an ever-changing ad space.

To do this, we have to have a backend system that is robust, on time, available, and cost effective in order to meet the demands of our split-second decision-making.

Why Athena is the right tool for EMX

As detailed below, Athena’s pay-per-query pricing model, performance and reliability at scale, and ease of use made it the right tool for us.

  • Scale – EMX processes over 2 TB of raw and sometimes unstructured data every hour for reporting and optimizations. Because Athena runs these jobs without us managing cluster optimizations directly, the team can focus on research, product development ideas, and ad hoc tasks instead of estimating the processing and computational power needed to complete jobs in time.
  • Cost – Cost per query is at least four times cheaper than other backend ETL tools, and its on-demand nature means we only pay for what we use. We’re no longer paying ever-increasing costs to keep up a system that isn’t being used. The feedback of cost per query through Athena also allows us to tune and optimize our logic, not only to reduce that cost further but also to test new ways to split and run our production ETL jobs.
  • Resilience – We have thrown everything but the kitchen sink at Athena while building out our production pipeline, and were impressed by how rarely the service failed. Even though we don’t directly own the resources behind Athena, it has always had high availability. In instances where availability was hampered, Athena has made it easy and straightforward to add workflow hooks that retry failed jobs when a queue becomes available.
  • Ease of use – Unlike most competitor offerings, Athena works out of the box. It’s very easily customized using the Athena GUI, or you can build your own roles, rules, database structures, and projections. The documentation for tuning AWS performance with Presto is very easy and straightforward, making it a small learning curve for any new user.
  • Data transformations – Athena’s robust Presto query language allows us to perform regex, quartile, and percentile statistics without resorting to an outside transformation step in Python or other languages. Going further, using window functions inside those same queries allows us to do some of the heavy mathematical lifting we would have needed to do outside of the backend process, thus saving cost and time. With Athena, these extra vital steps see no difference in cost or performance to our backend pipeline and allow us to condense complicated parts into one step.

Why we continue to grow with Athena

We continue to grow with Athena for the following reasons:

  • Future scale – Athena and its team keep improving and adding resources that support our ever-growing data needs, which have increased by 200% since Athena’s implementation. This has served as the bedrock to our backend solutions.
  • Improvements – The Sales and Engineering team at AWS has always been open to feedback and has turned that into better error reporting, work groups for Athena, changes in policy, and workload management through roles. This has allowed us to split Athena resources with workgroups from production-level work to running ad hoc jobs in future updates.
  • Cost is king – Every dollar we have saved through Athena has been put into products to make Athena better for us. Using Athena has allowed us to improve our front-end delivery products—from building our own workflows right into Athena, to taking time to work with the right compression for Athena ingestion, and even offloading more work that would have gone to a traditional ETL box. Cost for us is not just dollars but the time it takes to manage; that time saved is allowing us to be on the bleeding edge in development of new tools to deliver the data Athena helps us serve.

The following sections detail how EMX uses Athena to build, manage, and orchestrate its backend ELT work with minimal coding and maintenance.

Solution architecture

The following diagram shows the architecture EMX uses.

How we use Athena

Our custom scripts stream batched data each minute from auction servers directly to raw S3 buckets. The data is dropped in a .gzip format to datetime-partitioned S3 buckets. This partition structure helps us limit our Athena query scan. For example, the partitioned buckets look like the following screenshot.

When the data has reached these partitioned buckets, EMX uses Apache Airflow to schedule various jobs across Athena. The following screenshot shows our DAG for our most-used pipeline.

Before beginning to run Athena queries, we run two checks on our data in Amazon S3:

  • Check if all the expected data has arrived and is in the bucket.
  • Check logic match rules and clean illegal fields in the data.

On the success of both tasks, we start adding the latest partition to an Athena table.
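As a minimal sketch of this step (not EMX's actual code), the statement that registers the latest partition could be built as follows. The table name campaigns_stream comes from the example later in this post; the partition column dt, the value format, and the bucket name are assumptions made for illustration.

```python
from datetime import datetime


def add_partition_sql(table: str, bucket: str, hour: datetime) -> str:
    """Build an ALTER TABLE statement that registers one hourly partition."""
    dt = hour.strftime("%Y-%m-%d-%H")      # hypothetical partition value format
    location = f"s3://{bucket}/dt={dt}/"   # hypothetical prefix layout
    return (
        f"ALTER TABLE {table} ADD IF NOT EXISTS "
        f"PARTITION (dt = '{dt}') LOCATION '{location}'"
    )


# Example: the partition for 14:00 on 3 November 2020 in a made-up bucket.
print(add_partition_sql("campaigns_stream", "example-raw-bucket", datetime(2020, 11, 3, 14)))
```

Using IF NOT EXISTS keeps the statement safe to rerun when a scheduled task is retried.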

When the partition is added to the table, we start running the query. The query status is polled every 10 seconds to get the latest status on the query performance until completion. The query returns the status as success, failed, or canceled. Depending on what query status is returned, further tasks are then forked.
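The polling loop described here can be sketched as follows. This is an illustration rather than our production task: it assumes an Athena client object such as the one returned by boto3.client("athena"), and relies on the state names that the Athena API reports (SUCCEEDED, FAILED, and CANCELLED are terminal; QUEUED and RUNNING are not).

```python
import time

TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_query(athena, query_id, poll_seconds=10, sleep=time.sleep):
    """Poll Athena until the query reaches a terminal state and return that state.

    `athena` is expected to behave like boto3.client("athena").
    """
    while True:
        response = athena.get_query_execution(QueryExecutionId=query_id)
        state = response["QueryExecution"]["Status"]["State"]
        if state in TERMINAL_STATES:
            return state
        sleep(poll_seconds)  # still QUEUED or RUNNING: wait and ask again
```

The returned state is what a workflow would branch on to decide which downstream task runs next.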

At times, we have noticed queries fail with the error Query resource exhausted at this scale, which usually goes away on triggered retries. For the same reason, we have a retry mechanism in place on the execute_athena_sql task. If the retry fails, it alerts the team and data is copied over to a debug bucket for further investigation. If it succeeds, it moves ahead with further transformation.
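A retry wrapper along these lines illustrates the idea. It is a generic sketch, not the execute_athena_sql task itself: run_query and on_give_up are hypothetical callables standing in for "submit the query and wait for its final state" and "alert the team and copy data to the debug bucket".

```python
def run_with_retries(run_query, max_attempts=3, on_give_up=None):
    """Call run_query() until it returns "SUCCEEDED" or attempts run out.

    Returns a (final_state, attempts_used) tuple.
    """
    state = None
    for attempt in range(1, max_attempts + 1):
        state = run_query()
        if state == "SUCCEEDED":
            return state, attempt
    if on_give_up is not None:
        on_give_up(state)  # for example: alert the team, copy data to a debug bucket
    return state, max_attempts
```

In Apache Airflow, the same behavior is usually configured on the task itself through its retries and retry_delay arguments rather than written by hand.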

For further transformation, we get the output of the Athena query back in Amazon S3, and then we add the business rules to enrich the data in Amazon S3.

Based on the pipeline logic, the data is then copied from Amazon S3 to different data stores, one of them being Amazon Redshift.

The last step is to clean up the metadata that was generated by the Athena query.

The following is an example from one of our pipelines. This Athena table is projected on top of the partitioned buckets, and the table is also partitioned by datetime so that the table can be read off the data directly when it’s ready. The following screenshot is what the sample table campaigns_stream looks like, which reads the data from the aforementioned bucket.

As soon as our scheduled jobs are triggered, the job runs data checks, data matches, and complex SQL queries on this table using the latest date partition, which was loaded in the last DAG task, which limits the data scan and reduces costs. The results are generated and pushed to the S3 reporting bucket to be picked up by other processes. The results can be generated in different formats like CSV, Apache Avro, and Apache Parquet using the CTAS or INSERT INTO command.

For example, running the following simple count query for each domain scans approximately 1.65 TB of data and gives back the results in less than 600 seconds, without needing us to set up or manage any infrastructure.

When the query is complete and the output files in the S3 reporting bucket are ready, they’re picked up by our DAG and pushed into data storage like Amazon Redshift.
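To make the CTAS approach mentioned earlier more concrete, here is a hedged sketch of how such a statement could be assembled. It is not the query from our pipeline: the column names domain and dt, the target table name, and the output location are all hypothetical.

```python
def ctas_count_by_domain(source_table, target_table, output_location,
                         partition_value, fmt="PARQUET"):
    """Build a CTAS statement that writes per-domain counts for one partition."""
    return (
        f"CREATE TABLE {target_table} "
        f"WITH (format = '{fmt}', external_location = '{output_location}') AS "
        f"SELECT domain, COUNT(*) AS requests "
        f"FROM {source_table} "
        f"WHERE dt = '{partition_value}' "  # restricting to one partition limits the scan
        f"GROUP BY domain"
    )


print(ctas_count_by_domain(
    "campaigns_stream",
    "domain_counts_tmp",                      # hypothetical target table
    "s3://example-reporting-bucket/counts/",  # hypothetical output location
    "2020-11-03-14",
))
```

The WHERE clause on the partition column is what keeps the amount of data scanned, and therefore the cost, under control.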

Optimization on Athena

By default, Athena has a soft limit of 20 active DML queries, a category that includes CTAS. When we have multiple jobs running in parallel, we may hit that limit, delaying our time-sensitive pipelines and jobs. To overcome this, we allocated a fixed time window in each hour for our most critical pipelines, and other jobs with lower priority are run later.

For example, our production pipelines get priority 1 – with window minute 0 to minute 15 of every hour. We’re aware that we can request a limit increase from AWS, but we instead decided to use this opportunity to improve the resilience and robustness of our system.
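A simple way to picture this scheduling rule is a lookup from priority to an allowed minute window. The sketch below is illustrative only: the priority 1 window matches the description above, while the priority 2 window and the choice to treat windows as half-open (start inclusive, end exclusive) are assumptions.

```python
from datetime import datetime

# Priority 1 runs in minutes 0-15 of each hour, as described above.
# The priority 2 window is a made-up example for lower-priority jobs.
WINDOWS = {1: (0, 15), 2: (15, 60)}


def may_start(priority: int, now: datetime) -> bool:
    """Return True if a job of this priority is allowed to start at `now`."""
    start, end = WINDOWS[priority]
    return start <= now.minute < end


print(may_start(1, datetime(2020, 11, 3, 9, 5)))   # inside the priority 1 window
print(may_start(1, datetime(2020, 11, 3, 9, 20)))  # outside the priority 1 window
```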

Conclusion

“Build, don’t buy” has been EMX’s motto. It drives our innovation forward, much like Athena continues to be able to solve all the questions we ask of it. We build boutique and large-scale solutions for our advertising clients, which require a malleable and robust ETL backend that takes the work and cost to a manageable level. We built an ever-scaling, cost-effective, and highly available ETL backend with Athena.

Our successes with Athena are shown through both time and cost savings, including:

  • 30% of the time used on maintenance of a traditional ETL structure is now moved into Athena improvement, which sees improved feedback in reduced costs that we can pass on to our clients
  • Four times less cost per query than competitors has allowed us to put money into different tools for storage and modeling, giving us more room to drive revenue for our clients at less cost
  • 10 times less technical debt in Athena setup, research, staging, and production, which goes back into other future-thinking projects

What we can do with data is only limited by the time we need in herding, cleaning, and delivering this data for insights and development. Since throwing 100% of our ETL backend systems into Athena, we have increased product delivery and systems optimization four-fold in just one year. Athena and the Athena team continue to grow with us even as our data needs begin to soar exponentially, adding more tools that reduce workflows, management, and job distribution in the AWS ecosystem itself. This synergy between EMX and Athena has resulted in increased cooperation and more business with us and our growing lists of clients.

Our “Why” is building the tools for the future, and Athena personifies our “Why” in delivering what EMX is about: scale, on time, and delivery of data optimized for the modern era.


About the Authors

Gary Bouton is VP of Data Engineering at ENGINE Media Exchange and leads their Data Engineering and Data Science Product teams. Pipeline implementation is led by Director of Data Pipeline Rahul Gupta, Senior Engineer Nader S. Gharawi, and Data Science Engineer Raghav Gupta. Data model implementation is led by Senior Data Scientist Gabrielle Agrocostea and Data Scientist Heena Otia.

 

Louis Ashner is EVP of Technology at ENGINE Media Exchange. He has a passion for making the Internet faster, and is an ad-tech pioneer with more than 10 years of experience working with digital advertising, including real-time bidding and programmatic advertising. His 9 patents in networking optimization and data caching are used to power EMX’s proprietary ad exchange.

Data monetization and customer experience optimization using telco data assets: Part 1

Post Syndicated from Vikas Omer original https://aws.amazon.com/blogs/big-data/part-1-data-monetization-and-customer-experience-optimization-using-telco-data-assets/

The landscape of the telecommunications industry is changing rapidly. For telecom service providers (TSPs), revenue from core voice and data services continues to shrink due to regulatory pressure and emerging OTT players that offer an attractive alternative. Despite increasing demand from customers for bandwidth, speed, and efficiency, TSPs are finding that ROI from implementing new access technologies like 5G is insubstantial.

To overcome the risk of being relegated to a utility or dumb pipe, TSPs today are looking to diversify, adopting alternative business models to generate new revenue streams.

In recent times, adopting customer experience (CX) and data monetization initiatives has been a key theme across all industries. Although many Tier-1 TSPs are leading this transformation by using new technologies to improve CX and improve profitability, many TSPs have yet to embark on this challenging but rewarding journey.

Building and implementing a CX management and data monetization strategy

Data monetization is often misunderstood as making dollars by selling data, but what it really means is driving revenue by increasing the top line or the bottom line through the use of data assets. The resulting value can be tangible or intangible, and the monetization can be internal or external.

According to Gartner, most data and analytics leaders are looking to increase investments in business intelligence (BI) and analytics (see the following study results).

The preceding visualization is from “The 2019 CIO Agenda: Securing a New Foundation for Digital Business”, published October 15, 2018.

Although the external monetization opportunities are limited due to strict regulations, a plethora of opportunities exist for TSPs to monetize data both internally (regulated but much less compared to external) and externally via a marketplace (highly regulated). If TSPs can shift their mindsets from selling data to focus on using data insights for monetization and improving CX, they can adopt a significant number of use cases to realize an immediate positive impact.

Tapping and utilizing insights around customer behavior acts like a Swiss Army Knife for businesses. You can use these insights to drive CX, hyper-personalization and localization, micro-segmentation, subscriber retention, loyalty and rewards programs, network planning and optimization, internal and external data monetization, and more. The following are some use cases that can be driven using CX and data monetization strategies:

  • Segmentation/micro-segmentation (cross-sell, up-sell, targeted advertising, enhanced market locator); for example:
    • Identify targets for consuming baby products or up-selling a kids-related TV channel
    • Identify females in the age range of 18-35 to target for high-end beauty products or apparel

You can build hundreds of such segments.

  • Personalized loyalty and reward programs (incentivize customers with what they like). For example, movie tickets or discounts for a movie lover, or food coupons and deals for a food lover.
  • CX-driven network optimization (allocate more resources to streaming hotspots with high-value customers).
  • Identifying potential partners for joint promotions. For example, bundling device offers with a music app subscription.
  • Hyper-personalization. For example, personalized recommendations for on-portal apps and websites.
  • Next best action and next best offer. For example, intelligent bundling and packaging of offerings.

Challenges with driving CX and data monetization

In this digital era, TSPs consider data analytics a strategic pillar in their quest to evolve into a true data-driven organization. Although many TSPs are harnessing the power of data to drive and improve CX, there are technological gaps and challenges to baseline and formulate internal and external data monetization strategies. Some of these challenges include:

  • Non-overlapping technology investments for CX and data monetization due to misaligned business and IT initiatives
  • Huge CAPEX requirements to process massive volumes of data
  • Inability to unearth hidden insights due to siloed data initiatives
  • Inability to marry various datasets together due to missing pieces around data standardization techniques
  • Lack of user-friendly tools and techniques to discover, ingest, process, correlate, analyze, and consume the data
  • Inability to experiment and innovate with agility and low cost

In this two-part series, I demonstrate a working solution with an AWS CloudFormation template for how a TSP can use existing data assets to generate new revenue streams and improve and personalize CX using AWS services. I also include key pieces of information around data standardization, baselining an analytics data model to marry different datasets in the data warehouse, self-service analytics, metadata search, and media dictionary framework.

In this post, you deploy the stack using a CloudFormation template and follow simple steps to transform, enrich, and bring multiple datasets together so that they can be correlated and queried.

In part 2, you learn how advanced business users can query enriched data and derive meaningful insights using Amazon Redshift and Amazon Redshift Spectrum or Amazon Athena, enable self-service analytics for business users, and publish ready-made dashboards via Amazon QuickSight.

Solution overview

The main ingredient of this solution is Packet Switch (PS) probe data embedded with a deep packet inspection (DPI) engine, which can reveal a lot of information about user interests and usage behavior. This data is transformed and enriched with DPI media and device dictionaries, along with other standard telco transformations to deduce insights, profile and micro-segment subscribers. Enriched data is made available along with other transformed dimensional attributes (CRM, subscriptions, media, carrier, device and network configuration management) for rich slicing and dicing.

For example, the following QuickSight visualizations depict a use case to identify music lovers ages 18-55 with Apple devices. You can also generate micro-segments by capturing the top X subscribers by consumption or adding KPIs like recency and frequency.

The following diagram illustrates the workflow of the solution.

For this post, AWS CloudFormation sets up the required folder structure in Amazon Simple Storage Service (Amazon S3) and provides sample data and dictionary files. Most of the data included with the CloudFormation template is dummy data. It consists of the following:

  • CRM
  • Subscription and subscription mapping
  • Network 3G & 4G configuration management
  • Operator PLMN
  • DPI and device dictionary
  • PS probe data

Descriptions of all the input datasets and attributes are available with AWS Glue Data Catalog tables and as part of Amazon Redshift metadata for all tables in Amazon Redshift.

The workflow for this post includes the following steps:

  1. Catalog all the files in the AWS Glue Data Catalog using the following AWS Glue data crawlers:
    1. DPI data crawler (to crawl incoming PS probe DPI data)
    2. Dimension data crawler (to crawl all dimension data)
  2. Update attribute descriptions in the Data Catalog (this step is optional).
  3. Create Amazon Redshift schema, tables, procedures, and metadata using an AWS Lambda function.
  4. Process each data source file using separate AWS Glue Spark jobs. These jobs enrich, transform, and apply business filtering rules before ingesting data into an Amazon Redshift cluster.
  5. Trigger Amazon Redshift hourly and daily aggregation procedures using Lambda functions to aggregate data from the raw table into hourly and daily tables.

Part 2 includes the following steps:

  1. Catalog the processed raw, aggregate, and dimension data in the Data Catalog using the DPI processed data crawler.
  2. Interactively query data directly from Amazon S3 using Amazon Athena.
  3. Enable self-service analytics using QuickSight to prepare and publish insights based on data residing in the Amazon Redshift cluster.

The workflow can change depending on the complexity of the environment and your use case, but the fundamental idea remains the same. For example, your use case could be processing PS probe DPI data in real time rather than in batch mode, keeping hot data in Amazon Redshift, storing cold and historical data on Amazon S3, or archiving data in Amazon S3 Glacier for regulatory compliance. Amazon S3 offers several storage classes designed for different use cases. You can move the data among these different classes based on Amazon S3 lifecycle properties. For more information, see Amazon S3 Storage Classes.

Prerequisites

For this walkthrough, you should have the following prerequisites:

For more information about AWS Regions and where AWS services are available, see Region Table.

Creating your resources with AWS CloudFormation

To get started, create your resources with the following CloudFormation stack.

  1. Click the Launch Stack button below:
  2. Leave the parameters at their default, with the following exceptions:
    1. Enter RedshiftPassword and S3BucketNameParameter parameters, which aren’t populated by default.
    2. An Amazon S3 bucket name is globally unique, so enter a unique bucket name for S3BucketNameParameter.

The following screenshot shows the parameters for our use case.

  1. Choose Next.
  2. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  3. Choose Create stack.
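If you prefer to script the deployment instead of using the console, the steps above map onto the arguments of the CloudFormation CreateStack API. The following is a sketch under stated assumptions: the stack name and template URL are placeholders you must supply (the real template URL sits behind the Launch Stack button and isn't reproduced here), and only the two parameters named above are set.

```python
def build_create_stack_args(stack_name, template_url, redshift_password, bucket_name):
    """Assemble keyword arguments for the CloudFormation create_stack call."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,  # placeholder: supply the template location yourself
        "Parameters": [
            {"ParameterKey": "RedshiftPassword", "ParameterValue": redshift_password},
            {"ParameterKey": "S3BucketNameParameter", "ParameterValue": bucket_name},
        ],
        # Scripted equivalent of acknowledging "IAM resources with custom names".
        "Capabilities": ["CAPABILITY_NAMED_IAM"],
    }


args = build_create_stack_args(
    "telco-cx-demo",                               # hypothetical stack name
    "https://example.com/path/to/template.yaml",   # hypothetical URL
    "use-a-secret-store-not-a-literal",
    "my-globally-unique-bucket-name",
)
print(sorted(args))
```

With boto3 installed and credentials configured, the dictionary could then be passed as boto3.client("cloudformation").create_stack(**args).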

It takes approximately 10 minutes to deploy the stack. For more information about the key resources deployed through the stack, see Data Monetization and Customer Experience (CX) Optimization using telco data assets: Amazon CloudFormation stack details. You can view all the resources on the AWS CloudFormation console. For instructions, see Viewing AWS CloudFormation stack data and resources on the AWS Management Console.

The CloudFormation stack we provide in this post serves as a baseline and is not a production-grade solution.

Building a Data Catalog using AWS Glue

You start by discovering sample data stored on Amazon S3 through an AWS Glue crawler. For more information, see Populating the AWS Glue Data Catalog. To catalog data, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose Crawlers.
  2. Select DPIRawDataCrawler and choose Run crawler.
  3. Select DimensionDataCrawler and choose Run crawler.
  4. Wait for the crawlers to show the status Stopping.

The tables added against the DimensionDataCrawler and DPIRawDataCrawler crawlers should show 9 and 1, respectively.

  1. In the navigation pane, choose Tables.
  2. Verify the following 10 tables are created under the cemdm database:
    • d_crm_demographics
    • d_device
    • d_dpi_dictionary
    • d_network_cm_3g
    • d_network_cm_4g
    • d_operator_plmn
    • d_tac
    • d_tariff_plan
    • d_tariff_plan_desc
    • raw_dpi_incoming
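The crawler steps above can also be scripted. The sketch below assumes a client that behaves like boto3.client("glue") and that the crawlers carry exactly the names shown in the console steps (a deployed stack may add prefixes or suffixes). It waits for the READY state, which is the state a crawler returns to once it has finished stopping.

```python
import time


def run_crawlers(glue, names, poll_seconds=15, sleep=time.sleep):
    """Start each crawler, then wait until all of them are back in the READY state."""
    for name in names:
        glue.start_crawler(Name=name)
    pending = set(names)
    while pending:
        for name in sorted(pending):
            state = glue.get_crawler(Name=name)["Crawler"]["State"]
            if state == "READY":  # other states are RUNNING and STOPPING
                pending.discard(name)
        if pending:
            sleep(poll_seconds)


# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   run_crawlers(boto3.client("glue"), ["DPIRawDataCrawler", "DimensionDataCrawler"])
```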

Updating attribute descriptions in the Data Catalog

The AWS Glue Data Catalog has a comment field to store the metadata under each table in the AWS Glue database. Anybody who has access to this database can easily understand attributes coming from different data sources through metadata provided in the comment field. The CloudFormation stack includes a CSV file that contains a description of all the attributes from the source files. This file is used to update the comment field for all the Data Catalog tables this stack deployed. This step is not mandatory to proceed with the workflow. However, if you want to update the comment field against each table, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the GlueCatalogUpdate function.
  3. Configure a test event by choosing Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which implies that the Data Catalog attribute description is complete.

Attributes of the table under the Data Catalog database should now have descriptions in the Comment column. For example, the following screenshot shows the d_operator_plmn table.

Creating Amazon Redshift schema, tables, procedures, and metadata

To create schema, tables, procedures, and metadata in Amazon Redshift, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the RedshiftDDLCreation function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that the schema, tables, procedures, and metadata generation is complete.

Running AWS Glue ETL jobs

AWS Glue provides the serverless, scalable, and distributed processing capability to transform and enrich your datasets. To run AWS Glue extract, transform, and load (ETL) jobs, complete the following steps:

  1. On the AWS Glue console, in the navigation pane, choose Jobs.
  2. Select the following jobs (one at a time) and choose Run job from the Action menu:
    • d_customer_demographics
    • d_device
    • d_dpi_dictionary
    • d_location
    • d_operator_plmn
    • d_tac
    • d_tariff_plan
    • d_tariff_plan_desc
    • f_dpi_enrichment

You can run all these jobs in parallel.

All dimension data jobs should finish successfully within 3 minutes, and the fact data enrichment job should finish within 5 minutes.

  1. Verify the jobs are complete by selecting each job and checking Run status on the History tab.
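Because the jobs can run in parallel, a script can start them all and then poll for completion. This is a minimal sketch, assuming a client that behaves like boto3.client("glue") and that the deployed job names match the list above exactly.

```python
import time

JOB_NAMES = [
    "d_customer_demographics", "d_device", "d_dpi_dictionary", "d_location",
    "d_operator_plmn", "d_tac", "d_tariff_plan", "d_tariff_plan_desc",
    "f_dpi_enrichment",
]
FINISHED = {"SUCCEEDED", "FAILED", "STOPPED", "TIMEOUT", "ERROR"}


def run_jobs(glue, job_names, poll_seconds=30, sleep=time.sleep):
    """Start every job, then poll until each run reaches a finished state."""
    run_ids = {name: glue.start_job_run(JobName=name)["JobRunId"] for name in job_names}
    results = {}
    while len(results) < len(run_ids):
        for name, run_id in run_ids.items():
            if name in results:
                continue
            run = glue.get_job_run(JobName=name, RunId=run_id)
            state = run["JobRun"]["JobRunState"]
            if state in FINISHED:
                results[name] = state
        if len(results) < len(run_ids):
            sleep(poll_seconds)
    return results  # maps job name to its final state
```

Checking that every value in the returned dictionary is SUCCEEDED corresponds to checking Run status on the History tab.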

Aggregating hourly and daily DPI data in Amazon Redshift

To aggregate hourly and daily sample data in Amazon Redshift using Lambda functions, complete the following steps:

  1. On the Lambda console, in the navigation pane, choose Functions.
  2. Choose the RedshiftDPIHourlyAgg function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that hourly aggregation is complete.

  1. In the navigation pane, choose Functions.
  2. Choose the RedshiftDPIDailyAgg function.
  3. Choose Configure test events.
  4. For Event name, enter Test.
  5. Choose Create.
  6. Choose Test.

You should see a message that the test succeeded, which means that daily aggregation is complete.
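The console test runs above amount to invoking each function synchronously with an empty event. As a rough sketch, assuming a client that behaves like boto3.client("lambda") and that the deployed function names match the ones shown (a stack may add prefixes or suffixes), the hourly and daily aggregations could be triggered in order like this:

```python
import json


def invoke_and_check(lambda_client, function_name, event=None):
    """Invoke a Lambda function synchronously and raise if it reports an error."""
    response = lambda_client.invoke(
        FunctionName=function_name,
        InvocationType="RequestResponse",  # wait for the result, like the Test button
        Payload=json.dumps(event or {}).encode("utf-8"),
    )
    if response.get("FunctionError"):
        raise RuntimeError(f"{function_name} failed: {response['FunctionError']}")
    return response["StatusCode"]


def run_aggregations(lambda_client):
    """Run the hourly aggregation first, then the daily one."""
    names = ("RedshiftDPIHourlyAgg", "RedshiftDPIDailyAgg")
    return [invoke_and_check(lambda_client, name) for name in names]
```

The same helper applies to the GlueCatalogUpdate and RedshiftDDLCreation functions used earlier in this walkthrough.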

Both hourly and daily Lambda functions are hardcoded with the date and hour to aggregate the sample data. To make them generic, there are a few commented lines of code that need to be uncommented and a few lines to be commented. Both functions are also equipped with offset parameters to decide how far back in time you want to do the aggregations. However, this isn’t required for this walkthrough.
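To illustrate what an offset parameter does, here is a hypothetical helper that turns "now" and an offset into the date and hour to aggregate. The function name, the parameter name, and the return shape are assumptions made for this sketch; they are not taken from the deployed Lambda code.

```python
from datetime import datetime, timedelta


def hour_to_aggregate(now: datetime, offset_hours: int = 1) -> tuple:
    """Return (date string, hour) for the hour that lies offset_hours before `now`."""
    target = now.replace(minute=0, second=0, microsecond=0) - timedelta(hours=offset_hours)
    return target.strftime("%Y-%m-%d"), target.hour


# With an offset of 1, a run at 00:20 aggregates hour 23 of the previous day.
print(hour_to_aggregate(datetime(2020, 11, 3, 0, 20), offset_hours=1))
```

Computing the target from the current time rather than hardcoding it is what makes a function like this safe to run on a schedule.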

You can schedule these functions with CloudWatch. However, this is not required for this walkthrough.
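As a sketch of what such a schedule might look like, the helpers below build rule definitions using the six-field cron syntax of CloudWatch Events (minutes, hours, day of month, month, day of week, year). The rule names and the chosen minutes are arbitrary examples.

```python
def hourly_rule(name, minute=5):
    """Rule definition that fires once an hour at the given minute (UTC)."""
    return {"Name": name, "ScheduleExpression": f"cron({minute} * * * ? *)"}


def daily_rule(name, hour=0, minute=15):
    """Rule definition that fires once a day at the given time (UTC)."""
    return {"Name": name, "ScheduleExpression": f"cron({minute} {hour} * * ? *)"}


print(hourly_rule("dpi-hourly-agg"))  # hypothetical rule name
print(daily_rule("dpi-daily-agg"))    # hypothetical rule name
```

Each dictionary can be passed to the put_rule call of boto3.client("events"). You would then attach the Lambda function with put_targets and grant the rule permission to invoke it.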

So far, we have completed the following:

  1. Deployed the CloudFormation stack.
  2. Cataloged sample raw data by running DimensionDataCrawler and DPIRawDataCrawler AWS Glue crawlers.
  3. Updated attribute descriptions in the AWS Glue Data Catalog by running the GlueCatalogUpdate Lambda function.
  4. Created Amazon Redshift schema, tables, stored procedures, and metadata through the RedshiftDDLCreation Lambda function.
  5. Ran all AWS Glue ETL jobs to transform raw data and load it into their respective Amazon Redshift tables.
  6. Aggregated hourly and daily data from enriched raw data into hourly and daily Amazon Redshift tables by running the RedshiftDPIHourlyAgg and RedshiftDPIDailyAgg Lambda functions.

Cleaning up

If you don’t plan to proceed to part 2 of this series, and want to avoid incurring future charges, delete the resources you created by deleting the CloudFormation stack.

Conclusion

In this post, I demonstrated how you can easily transform, enrich, and bring multiple telco datasets together in an Amazon Redshift data warehouse cluster. You can correlate these datasets to produce multi-dimensional insights from several angles, like subscriber, network, device, subscription, roaming, and more.

In part 2 of this series, I demonstrate how you can enable data analysts, scientists, and advanced business users to query data from Amazon Redshift or Amazon S3 directly.

As always, AWS welcomes feedback. This is a wide space to explore, so reach out to us if you want a deep dive into building this solution and more on AWS. Please submit comments or questions in the comments section.


About the Author

Vikas Omer is an analytics specialist solutions architect at Amazon Web Services. Vikas has a strong background in analytics, customer experience management (CEM), and data monetization, with over 11 years of experience in the telecommunications industry globally. With six AWS Certifications, including Analytics Specialty, he is a trusted analytics advocate to AWS customers and partners. He loves traveling, meeting customers, and helping them become successful in what they do.

Validate, evolve, and control schemas in Amazon MSK and Amazon Kinesis Data Streams with AWS Glue Schema Registry

Post Syndicated from Brian Likosar original https://aws.amazon.com/blogs/big-data/validate-evolve-and-control-schemas-in-amazon-msk-and-amazon-kinesis-data-streams-with-aws-glue-schema-registry/

Data streaming technologies like Apache Kafka and Amazon Kinesis Data Streams capture and distribute data generated by thousands or millions of applications, websites, or machines. These technologies serve as a highly available transport layer that decouples the data-producing applications from data processors. However, the sheer number of applications producing, processing, routing, and consuming data can make it hard to coordinate and evolve data schemas, like adding or removing a data field, without introducing data quality issues and downstream application failures. Developers often build complex tools, write custom code, or rely on documentation, change management, and Wikis to protect against schema changes. This is quite error prone because it relies too heavily on human oversight. A common solution with data streaming technologies is a schema registry that provides for validation of schema changes to allow for safe evolution as business needs adjust over time.

AWS Glue Schema Registry, a serverless feature of AWS Glue, enables you to validate and reliably evolve streaming data against Apache Avro schemas at no additional charge. Through Apache-licensed serializers and deserializers, the Glue Schema Registry integrates with Java applications developed for Apache Kafka, Amazon Managed Streaming for Apache Kafka (Amazon MSK), Kinesis Data Streams, Apache Flink, Amazon Kinesis Data Analytics for Apache Flink, and AWS Lambda.

This post explains the benefits of using the Glue Schema Registry and provides examples of how to use it with both Apache Kafka and Kinesis Data Streams.

With the Glue Schema Registry, you can eliminate defensive coding and cross-team coordination, improve data quality, reduce downstream application failures, and use a registry that is integrated across multiple AWS services. Each schema can be versioned within the guardrails of a compatibility mode, providing developers the flexibility to reliably evolve schemas. Additionally, the Glue Schema Registry can serialize data into a compressed format, helping you save on data transfer and storage costs.

Although there are many ways to leverage the Glue Schema Registry (including using the API to build your own integrations), in this post, we show two use cases. The Schema Registry is a free feature that can significantly improve data quality and developer productivity. If you use Avro schemas, you should be using the Schema Registry to supplement your solutions built on Apache Kafka (including Amazon MSK) or Kinesis Data Streams. The following diagram illustrates this architecture.

AWS Glue Schema Registry features

Glue Schema Registry has the following features:

  • Schema discovery – When a producer registers a schema change, metadata can be applied as a key-value pair to provide searchable information for administrators or developers. This metadata can indicate the original source of the data (source=MSK_west), the team name to contact (owner=DataEngineering), or AWS tags (environment=Production). You could potentially encrypt a field in your data on the producing client and use metadata to specify to potential consumer clients which public key fingerprint to use for decryption.
  • Schema compatibility – The versioning of each schema is governed by a compatibility mode. If a new version of a schema is requested to be registered that breaks the specified compatibility mode, the request fails and an exception is thrown. Compatibility checks enable developers building downstream applications to have a bounded set of scenarios to build applications against, which helps to prepare for the changes without issue. Commonly used modes are FORWARD, BACKWARD, and FULL. For more information about mode definitions, see Schema Versioning and Compatibility.
  • Schema validation – Glue Schema Registry serializers work to validate that the schema used during data production is compatible. If it isn’t, the data producer receives an exception from the serializer. This ensures that potentially breaking changes are found earlier in development cycles, and can also help prevent unintentional schema changes due to human error.
  • Auto-registration of schemas – If configured to do so, the producer of data can auto-register schema changes as they flow in the data stream. This is especially useful for use cases where the source of the data is change data capture from a database.
  • IAM support – Thanks to integrated AWS Identity and Access Management (IAM) support, only authorized producers can change certain schemas. Furthermore, only those consumers authorized to read the schema can do so. Schema changes are typically performed deliberately and with care, so it’s important to use IAM to control who performs these changes. Additionally, access control to schemas is important in situations where you might have sensitive information included in the schema definition itself. In the examples that follow, IAM roles are inferred via the AWS SDK for Java, so they are inherited from the Amazon Elastic Compute Cloud (Amazon EC2) instance’s role that the application runs in. IAM roles can also be applied to any other AWS service that could contain this code, such as containers or Lambda functions.
  • Integrations and other support – The provided serializers and deserializers are currently for Java clients using Apache Avro for data serialization. The GitHub repo also contains support for Apache Kafka Streams, Apache Kafka Connect, and Apache Flink—all licensed using the Apache License 2.0. We’re already working on additional language and data serialization support, but we need your feedback on what you’d like to see next.
  • Secondary deserializer – If you have already registered schemas in another schema registry, there’s an option for specifying a secondary deserializer when performing schema lookups. This allows for migrations from other schema registries without having to start anew. If the schema ID being used isn’t known to the Glue Schema Registry, it’s looked for in the secondary deserializer.
  • Compression – Using the Avro format already reduces message size due to its compact, binary format. Using a schema registry can further reduce data payload by no longer needing to send and receive schemas with each message. Glue Schema Registry libraries also provide an option for zlib compression, which can reduce data requirements even further by compressing the payload of the message. This varies by use case, but compression can reduce the size of the message significantly.
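
To make the compression point concrete, here is a minimal sketch of a zlib round trip using only the JDK's java.util.zip classes. It is not the Glue Schema Registry library's own code path, and the class name ZlibPayloadSketch and the sample payload are assumptions made for illustration; the actual savings depend entirely on your data.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class ZlibPayloadSketch {

    // Compress bytes into the zlib format using the JDK's Deflater.
    static byte[] compress(byte[] input) {
        Deflater deflater = new Deflater();
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        while (!deflater.finished()) {
            int written = deflater.deflate(buffer);
            out.write(buffer, 0, written);
        }
        deflater.end();
        return out.toByteArray();
    }

    // Restore the original bytes; a corrupt or truncated stream raises an unchecked exception.
    static byte[] decompress(byte[] compressed) {
        Inflater inflater = new Inflater();
        inflater.setInput(compressed);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buffer = new byte[1024];
        try {
            while (!inflater.finished()) {
                int read = inflater.inflate(buffer);
                if (read == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalStateException("Truncated or unsupported zlib stream");
                }
                out.write(buffer, 0, read);
            }
        } catch (DataFormatException e) {
            throw new IllegalStateException("Corrupt zlib stream", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    // Hypothetical payload: 200 similar text records standing in for a message body.
    static byte[] samplePayload() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 200; i++) {
            sb.append("first_name=Ada;last_name=Lovelace;id=").append(i).append('\n');
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] original = samplePayload();
        byte[] compressed = compress(original);
        byte[] restored = decompress(compressed);
        System.out.println("original bytes:   " + original.length);
        System.out.println("compressed bytes: " + compressed.length);
        System.out.println("round trip ok:    " + Arrays.equals(original, restored));
    }
}
```

Repetitive payloads such as this one shrink considerably, while small or already compact messages may gain little, which is why compression is an option rather than a default.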

Example schema

For this post, we use the following schema to begin each of our use cases:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"}
 ]
}

Using AWS Glue Schema Registry with Amazon MSK and Apache Kafka

You can use the following Apache Kafka producer code to produce Apache Avro formatted messages to a topic with the preceding schema:

package com.amazon.gsrkafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringSerializer;
import com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import software.amazon.awssdk.services.glue.model.Compatibility;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import java.io.File;
import java.io.IOException;
import java.util.Properties;

public class gsrkafka {
    private static final Properties properties = new Properties();
    private static final String topic = "test";

    public static void main(final String[] args) throws IOException {
        // Set the default synchronous HTTP client to UrlConnectionHttpClient
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Keys are plain strings; only the value is Avro-encoded through the Schema Registry
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, AWSKafkaAvroSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "customer");
        // The serializers default to BACKWARD, so FULL is declared explicitly
        properties.put(AWSSchemaRegistryConstants.COMPATIBILITY_SETTING, Compatibility.FULL);
        properties.put(AWSSchemaRegistryConstants.SCHEMA_AUTO_REGISTRATION_SETTING, true);
        Schema schema_customer = new Parser().parse(new File("Customer.avsc"));
        GenericRecord customer = new GenericData.Record(schema_customer);
        // full_name exists only in the evolved schema shown later in this post;
        // putting a field that the schema does not define throws an AvroRuntimeException
        final boolean hasFullName = schema_customer.getField("full_name") != null;
        final String[][] names = {
            {"Ada", "Lovelace"},
            {"Sue", "Black"},
            {"Anita", "Borg"},
            {"Grace", "Hopper"},
            {"Neha", "Narkhede"}
        };

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            // The record wraps the same GenericRecord, which is updated before each send
            final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>(topic, customer);
            for (final String[] name : names) {
                customer.put("first_name", name[0]);
                customer.put("last_name", name[1]);
                if (hasFullName) {
                    customer.put("full_name", name[0] + " " + name[1]);
                }
                producer.send(record);
                System.out.println("Sent message");
                Thread.sleep(1000L);
            }
            producer.flush();
            System.out.println("Successfully produced " + names.length + " messages to a topic called " + topic);
        } catch (final InterruptedException | SerializationException e) {
            e.printStackTrace();
        }
    }
}

Use the following Apache Kafka consumer code to look up the schema information while consuming from a topic to learn the schema details:

package com.amazon.gsrkafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.StringDeserializer;
import com.amazonaws.services.schemaregistry.deserializers.avro.AWSKafkaAvroDeserializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;
import com.amazonaws.services.schemaregistry.utils.AvroRecordType;
import org.apache.avro.generic.GenericRecord;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class gsrkafka {
    private static final Properties properties = new Properties();
    private static final String topic = "test";

    public static void main(final String[] args) throws IOException {
        // Set the default synchronous HTTP client to UrlConnectionHttpClient
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "gsr-client");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Keys are plain strings; only the value is decoded through the Schema Registry
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, AWSKafkaAvroDeserializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "us-east-2");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "liko-schema-registry");
        properties.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());

        try (final KafkaConsumer<String, GenericRecord> consumer = new KafkaConsumer<>(properties)) {
            consumer.subscribe(Collections.singletonList(topic));
            // Poll until the process is interrupted
            while (true) {
                final ConsumerRecords<String, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
                for (final ConsumerRecord<String, GenericRecord> record : records) {
                    final GenericRecord value = record.value();
                    System.out.println("Received message: value = " + value);
                }
            }
        } catch (final SerializationException e) {
            e.printStackTrace();
        }
    }
}

Using AWS Glue Schema Registry with Kinesis Data Streams

You can use the following Kinesis Producer Library (KPL) code to publish messages in Apache Avro format to a Kinesis data stream with the preceding schema:

private static final String SCHEMA_DEFINITION = "{\"namespace\": \"Customer.avro\",\n"
        + " \"type\": \"record\",\n"
        + " \"name\": \"Customer\",\n"
        + " \"fields\": [\n"
        + " {\"name\": \"first_name\", \"type\": \"string\"},\n"
        + " {\"name\": \"last_name\", \"type\": \"string\"}\n"
        + " ]\n"
        + "}";

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion("us-west-1");

// [Optional] Configuration for Schema Registry
GlueSchemaRegistryConfiguration schemaRegistryConfig =
        new GlueSchemaRegistryConfiguration("us-west-1");
schemaRegistryConfig.setCompression(true);
config.setGlueSchemaRegistryConfiguration(schemaRegistryConfig);
// Optional configuration ends

final KinesisProducer producer = new KinesisProducer(config);

final ByteBuffer data = getDataToSend();

// Fully qualified to avoid a clash with org.apache.avro.Schema
com.amazonaws.services.schemaregistry.common.Schema gsrSchema =
        new com.amazonaws.services.schemaregistry.common.Schema(SCHEMA_DEFINITION, DataFormat.AVRO.toString(), "demoSchema");

ListenableFuture<UserRecordResult> f = producer.addUserRecord(
        config.getStreamName(), TIMESTAMP, Utils.randomExplicitHashKey(), data, gsrSchema);

// Encode one Customer record as Avro binary using the schema above
private static ByteBuffer getDataToSend() throws IOException {
    org.apache.avro.Schema avroSchema =
            new org.apache.avro.Schema.Parser().parse(SCHEMA_DEFINITION);

    // Field names must match the schema; unknown names are rejected by GenericData.Record
    GenericRecord customer = new GenericData.Record(avroSchema);
    customer.put("first_name", "Ada");
    customer.put("last_name", "Lovelace");

    ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
    Encoder encoder = EncoderFactory.get().directBinaryEncoder(outBytes, null);
    new GenericDatumWriter<>(avroSchema).write(customer, encoder);
    encoder.flush();
    return ByteBuffer.wrap(outBytes.toByteArray());
}

On the consumer side, you can use the Kinesis Client Library (KCL) (v2.3 or later) to look up schema information while retrieving messages from a Kinesis data stream:

GlueSchemaRegistryConfiguration schemaRegistryConfig =
        new GlueSchemaRegistryConfiguration(this.region.toString());

GlueSchemaRegistryDeserializer glueSchemaRegistryDeserializer =
        new GlueSchemaRegistryDeserializerImpl(DefaultCredentialsProvider.builder().build(), schemaRegistryConfig);

// Attach the deserializer so that each retrieved record carries its schema
RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(new PollingConfig(streamName, kinesisClient));
retrievalConfig.glueSchemaRegistryDeserializer(glueSchemaRegistryDeserializer);

Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig
);

public void processRecords(ProcessRecordsInput processRecordsInput) {
    MDC.put(SHARD_ID_MDC_KEY, shardId);
    try {
        log.info("Processing {} record(s)", processRecordsInput.records().size());
        processRecordsInput.records()
                .forEach(r ->
                        log.info("Processed record pk: {} -- Seq: {} : data {} with schema: {}",
                                r.partitionKey(), r.sequenceNumber(), recordToAvroObj(r).toString(), r.schema()));
    } catch (Throwable t) {
        log.error("Caught throwable while processing records. Aborting.", t);
        Runtime.getRuntime().halt(1);
    } finally {
        MDC.remove(SHARD_ID_MDC_KEY);
    }
}

// Decode the record payload with the schema that the deserializer attached to it
private GenericRecord recordToAvroObj(KinesisClientRecord r) {
    byte[] data = new byte[r.data().remaining()];
    r.data().get(data, 0, data.length);
    org.apache.avro.Schema schema = new org.apache.avro.Schema.Parser().parse(r.schema().getSchemaDefinition());
    DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);

    BinaryDecoder binaryDecoder = DecoderFactory.get().binaryDecoder(data, 0, data.length, null);
    try {
        return datumReader.read(null, binaryDecoder);
    } catch (IOException e) {
        // Rethrown unchecked so the method can be called from the forEach lambda above
        throw new UncheckedIOException(e);
    }
}

Example of schema evolution

As a producer, let’s say you want to add an additional field to our schema:

{
 "namespace": "Customer.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
 {"name": "first_name", "type": "string"},
 {"name": "last_name", "type": "string"},
 {"name": "full_name", "type": ["null", "string"], "default": null}
]
}

Regardless of whether you’re following the Apache Kafka or Kinesis Data Streams example, you can use the previously provided producer code to publish new messages using this new schema version with the full_name field. This is simply a concatenation of first_name and last_name.

This schema change added an optional field (full_name), which is indicated by the type field having an option of null in addition to string with a default of null. In adding this optional field, we’ve created a schema evolution. This qualifies as a FORWARD compatible change because the producer has modified the schema and the consumer can read without updating its version of the schema. It’s a good practice to provide a default for a given field. This allows for its eventual removal if necessary. If it’s removed by the producer, the consumer uses the default that it knew for that field from before the removal.

This change is also a BACKWARD compatible change, because if the consumer changes the schema it expects to receive, it can use that default to fill in the value for the field it isn’t receiving. By being both FORWARD and BACKWARD compatible, it is therefore a FULL compatible change. The Glue Schema Registry serializers default to BACKWARD compatible, so we have to add a line declaring it as FULL.
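
The reasoning above can be sketched as a small check. This is a simplified model, not the rule set that the Glue Schema Registry or Avro actually applies: it looks only at field names and whether a field has a default, and ignores types entirely. The class and method names (CompatibilitySketch, canRead, and so on) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class CompatibilitySketch {

    // Simplified rule: a reader can decode a writer's data when every field the
    // reader expects is either present in the writer's schema or has a default
    // in the reader's schema. Each map goes from field name to "has a default".
    static boolean canRead(Map<String, Boolean> reader, Map<String, Boolean> writer) {
        for (Map.Entry<String, Boolean> field : reader.entrySet()) {
            boolean presentInWriter = writer.containsKey(field.getKey());
            boolean hasDefault = field.getValue();
            if (!presentInWriter && !hasDefault) {
                return false;
            }
        }
        return true;
    }

    // BACKWARD: a consumer on the new schema can read data written with the old one.
    static boolean isBackward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(newSchema, oldSchema);
    }

    // FORWARD: a consumer still on the old schema can read data written with the new one.
    static boolean isForward(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return canRead(oldSchema, newSchema);
    }

    // FULL: both directions hold.
    static boolean isFull(Map<String, Boolean> oldSchema, Map<String, Boolean> newSchema) {
        return isBackward(oldSchema, newSchema) && isForward(oldSchema, newSchema);
    }

    // The original Customer schema: two required fields without defaults.
    static Map<String, Boolean> customerV1() {
        Map<String, Boolean> fields = new LinkedHashMap<>();
        fields.put("first_name", false);
        fields.put("last_name", false);
        return fields;
    }

    // The evolved schema; the flag controls whether full_name carries a default.
    static Map<String, Boolean> customerV2(boolean fullNameHasDefault) {
        Map<String, Boolean> fields = customerV1();
        fields.put("full_name", fullNameHasDefault);
        return fields;
    }

    public static void main(String[] args) {
        Map<String, Boolean> v1 = customerV1();
        System.out.println("full_name with default is FULL: " + isFull(v1, customerV2(true)));
        System.out.println("full_name without default is BACKWARD: " + isBackward(v1, customerV2(false)));
        System.out.println("full_name without default is FORWARD: " + isForward(v1, customerV2(false)));
    }
}
```

In this model, adding full_name with a default passes both checks, while adding it without a default remains FORWARD compatible only, because a consumer on the new schema would have nothing to fill in when reading older messages.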

In looking at the full option set, you may also find FORWARD_ALL, BACKWARD_ALL, and FULL_ALL. These variants check a new version against all previous versions of the schema rather than only the most recent one. They typically come into play when you want to change data types for a field whose name you don’t change. The most commonly observed compatibility mode is BACKWARD, which is why it’s the default.

As a consumer application, however, you don’t want to have to recompile your application to handle the addition of a new field. If you want to reference the customer by full name, that’s your choice in your app instead of being forced to consume the new field and use it. When you consume the new messages you’ve just produced, your application doesn’t crash or have problems, because it’s still using the prior version of the schema, and that schema change is compatible with your application. To experience this in action, run the consumer code in one window and don’t interrupt it. As you run the producer application again, this time with messages following the new schema, you can still see output without issue, thanks to the Glue Schema Registry.
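
To picture why the consumer keeps working, the following sketch models what a reader sees when its schema differs from the writer's. It uses plain maps instead of Avro records, and the names (ReaderProjectionSketch, project) are hypothetical; real Avro schema resolution also handles types, aliases, and unions, which are left out here.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ReaderProjectionSketch {

    // Build the record a reader sees: keep the fields it knows, fill missing ones
    // from its defaults, and drop fields that its schema does not define.
    static Map<String, Object> project(Map<String, Object> written,
                                       List<String> readerFields,
                                       Map<String, Object> readerDefaults) {
        Map<String, Object> seen = new LinkedHashMap<>();
        for (String field : readerFields) {
            if (written.containsKey(field)) {
                seen.put(field, written.get(field));
            } else if (readerDefaults.containsKey(field)) {
                seen.put(field, readerDefaults.get(field)); // the default may be null
            } else {
                throw new IllegalArgumentException("No value or default for field: " + field);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // A message produced with the evolved schema, read by a consumer on the old schema
        Map<String, Object> newMessage = new LinkedHashMap<>();
        newMessage.put("first_name", "Ada");
        newMessage.put("last_name", "Lovelace");
        newMessage.put("full_name", "Ada Lovelace");
        List<String> oldReader = Arrays.asList("first_name", "last_name");
        System.out.println(project(newMessage, oldReader, new HashMap<>()));
        // prints {first_name=Ada, last_name=Lovelace}

        // A message produced with the old schema, read by a consumer on the evolved schema
        Map<String, Object> oldMessage = new LinkedHashMap<>();
        oldMessage.put("first_name", "Grace");
        oldMessage.put("last_name", "Hopper");
        List<String> newReader = Arrays.asList("first_name", "last_name", "full_name");
        Map<String, Object> newDefaults = new HashMap<>();
        newDefaults.put("full_name", null);
        System.out.println(project(oldMessage, newReader, newDefaults));
        // prints {first_name=Grace, last_name=Hopper, full_name=null}
    }
}
```

The first call mirrors the experiment described above: the running consumer never sees full_name, so nothing needs to be recompiled.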

Conclusion

In this post, we discussed the benefits of using the Glue Schema Registry to register, validate, and evolve schemas for data streams as business needs change. We also provided examples of how to use Glue Schema Registry with Apache Kafka and Kinesis Data Streams.

For more information and to get started, see AWS Glue Schema Registry.


About the Authors

Brian Likosar is a Senior Streaming Specialist Solutions Architect at Amazon Web Services. Brian loves helping customers capture value from real-time streaming architectures, because he knows life doesn’t happen in batch. He’s a big fan of open-source collaboration, theme parks, and live music.

Larry Heathcote is a Senior Product Marketing Manager at Amazon Web Services for data streaming and analytics. Larry is passionate about seeing the results of data-driven insights on business outcomes. He enjoys walking his Samoyed Sasha in the mornings so she can look for squirrels to bark at.

The best new features for data analysts in Amazon Redshift in 2020

Post Syndicated from Helen Anderson original https://aws.amazon.com/blogs/big-data/the-best-new-features-for-data-analysts-in-amazon-redshift-in-2020/

This is a guest post by Helen Anderson, data analyst and AWS Data Hero

Every year, the Amazon Redshift team launches new and exciting features, and 2020 was no exception. New features to improve the data warehouse service and add interoperability with other AWS services were rolling out all year.

I am part of a team that for the past 3 years has used Amazon Redshift to store source tables from systems around the organization and usage data from our software as a service (SaaS) product. Amazon Redshift is our one source of truth. We use it to prepare operational reports that support the business and for ad hoc queries when numbers are needed quickly.

When AWS re:Invent comes around, I look forward to the new features, enhancements, and functionality that make things easier for analysts. If you haven’t tried Amazon Redshift in a while, or even if you’re a longtime user, these new capabilities are designed with analysts in mind to make it easier to analyze data at scale.

Amazon Redshift ML

The newly launched preview of Amazon Redshift ML lets data analysts use Amazon SageMaker over datasets in Amazon Redshift to solve business problems without the need for a data scientist to create custom models.

As a data analyst myself, I find this one of the most interesting announcements to come out of re:Invent 2020. Analysts generally use SQL to query data and present insights, but they don’t often do data science as well. Now there is no need to wait for a data scientist or learn a new language to create predictive models.

For information about what you need to get started with Amazon Redshift ML, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.


Federated queries

As analysts, we often have to join datasets that aren’t in the same format and sometimes aren’t ready for use in the same place. By using federated queries to access data in other databases or Amazon Simple Storage Service (Amazon S3), you don’t need to wait for a data engineer or ETL process to move data around.

re:Invent 2019 featured some interesting talks from Amazon Redshift customers who were tackling this problem. Now that federated queries over operational databases like Amazon RDS for PostgreSQL and Amazon Aurora PostgreSQL are generally available and querying Amazon RDS for MySQL and Amazon Aurora MySQL is in preview, I’m excited to hear more.

For a step-by-step example to help you get started, see Build a Simplified ETL and Live Data Query Solution Using Redshift Federated Query.

SUPER data type

Another problem we face as analysts is that the data we need isn’t always in rows and columns. The new SUPER data type makes JSON data easy to use natively in Amazon Redshift with PartiQL.

PartiQL is a SQL-compatible query language that helps analysts get up and running quickly with structured and semistructured data, so you can unnest and query using JOINs and aggregates. This is really exciting for those who deal with data coming from applications that store data in JSON or unstructured formats.

For use cases and a quickstart, see Ingesting and querying semistructured data in Amazon Redshift (preview).

Partner console integration

The preview of the native console integration with partners announced at AWS re:Invent 2020 will also make data analysis quicker and easier. Although analysts might not be doing the ETL work themselves, this new release makes it easier to move data from platforms like Salesforce, Google Analytics, and Facebook Ads into Amazon Redshift.

Matillion, Sisense, Segment, Etleap, and Fivetran are launch partners, with other partners coming soon. If you’re an Amazon Redshift partner and would like to integrate into the console, contact [email protected].

RA3 nodes with managed storage

Previously, when you added Amazon Redshift nodes to a cluster, both storage and compute were scaled up. This all changed with the 2019 announcement of RA3 nodes, which let you scale storage and compute independently.

In 2020, the Amazon Redshift team introduced RA3.xlplus nodes, which offer even more compute sizing options to address a broader set of workload requirements.

AQUA for Amazon Redshift

As analysts, we want our queries to run quickly so we can spend more time empowering the users of our insights and less time watching data slowly return. AQUA, the Advanced Query Accelerator for Amazon Redshift, tackles this problem at an infrastructure level by bringing the stored data closer to the compute power.

This hardware-accelerated cache enables Amazon Redshift to run up to 10 times faster as it scales out and processes data in parallel across many nodes. Each node accelerates compression, encryption, and data processing tasks like scans, aggregates, and filtering. Analysts should still try their best to write efficient code, but the power of AQUA will speed up the return of results considerably.

AQUA is available on Amazon Redshift RA3 instances at no additional cost. To get started with AQUA, sign up for the preview.

The following diagram shows Amazon Redshift architecture with an AQUA layer.


Figure 1: Amazon Redshift architecture with AQUA layer

Automated performance tuning

For analysts who haven’t used sort and distribution keys, the learning curve can be steep. A table created with the wrong keys can mean results take much longer to return.

Automatic table optimization tackles this problem by using machine learning to select the best keys and tune the physical design of tables. Letting Amazon Redshift determine how to improve cluster performance reduces manual effort.

Summary

These are just some of the Amazon Redshift announcements made in 2020 to help analysts get query results faster. Some of these features help you get access to the data you need, whether it’s in Amazon Redshift or somewhere else. Others are under-the-hood enhancements that make things run smoothly with less manual effort.

For more information about these announcements and a complete list of new features, see What’s New in Amazon Redshift.


About the Author

Helen Anderson is a Data Analyst based in Wellington, New Zealand. She is well known in the data community for writing beginner-friendly blog posts, teaching, and mentoring those who are new to tech. As a woman in tech and a career switcher, Helen is particularly interested in inspiring those who are underrepresented in the industry.

How FanDuel Group secures personally identifiable information in a data lake using AWS Lake Formation

Post Syndicated from Damian Grech original https://aws.amazon.com/blogs/big-data/how-fanduel-group-secures-personally-identifiable-information-in-a-data-lake-using-aws-lake-formation/

This post is co-written with Damian Grech from FanDuel

FanDuel Group is an innovative sports-tech entertainment company that is changing the way consumers engage with their favorite sports, teams, and leagues. The premier gaming destination in the US, FanDuel Group consists of a portfolio of leading brands across gaming, sports betting, daily fantasy sports, advance-deposit wagering, and TV/media, including FanDuel, Betfair US, and TVG. FanDuel Group has a presence across 50 states and over 8.5 million customers. The company is based in New York with offices in California, New Jersey, Florida, Oregon, and Scotland. FanDuel Group is a subsidiary of Flutter Entertainment plc, the world’s largest sports betting and gaming operator with a portfolio of globally recognized brands and a constituent of the FTSE 100 index of the London Stock Exchange.

In this post, we discuss how FanDuel used AWS Lake Formation and Amazon Redshift Spectrum to restrict access to personally identifiable information (PII) in their data lake.

The challenge

In 2018, a series of mergers led to the creation of FanDuel Group, and the combined data engineering team found themselves operating three data warehouses running on Amazon Redshift. The team decided to create a new single platform to replace the three separate warehouses, consisting of a data warehouse containing the core business data model and a data lake to catalog and hold all other types of data. FanDuel’s vision was to create a unified data platform that served their data requirements. This included the ability to ingest and organize real-time and batch datasets, and secure and govern PII.

Because the end-users of the existing data warehouses were familiar with Amazon Redshift, it was critical that they be able to access the data lake using Amazon Redshift. Other important architecture considerations included a simplified user experience, the ability to scale to huge data volumes, and a robust security model to provision relevant data to analysts and data scientists.

To accomplish the vision, FanDuel decided to modernize the data platform and introduce Amazon Simple Storage Service (Amazon S3)-based data lakes. Data lakes are a logical construct that allows data to be stored in its native format using open data formats. With a data lake architecture, FanDuel can enable data analysts to analyze large volumes of data without significant modeling. Also, data lakes allow FanDuel to store structured and unstructured data.

Some of the data to be stored in the data lake was customer PII, so access to this category of data needed to be carefully restricted to only employees who required access to perform their job functions. To address these security challenges, FanDuel first tested out a tag-based approach on Amazon S3 to restrict access to the PII data. The idea was to write two copies of each dataset (one with PII and another without PII), apply tags to the files where PII is stored, and secure those files using AWS Identity and Access Management (IAM) policies. This approach was complex and needed 100–200 hours of development time for every data source that was ingested.

Solution overview

FanDuel decided to use Lake Formation and Redshift Spectrum to solve this challenge. The following architectural diagram shows how FanDuel secured their data lake.

The solution includes the following steps:

  1. The FanDuel team registered the S3 location in Lake Formation. After the location is registered, Lake Formation takes control of the data lake, thereby eliminating the need to set up complicated policies in IAM.
  2. FanDuel built AWS Glue ETL jobs to extract data from sources, including MySQL databases and flat files. They used AWS Glue to cleanse and transform raw data to form refined datasets stored in Parquet-formatted files. They also used AWS Glue crawlers to register the cleansed datasets in the Data Catalog.
  3. The team used Lake Formation to set up column-based permissions using two roles:
    1. LimitedPIIAnalyst – Granted access to all columns. Only analysts who needed access to PII data were assigned this role.
    2. NonPIIAnalyst – Granted access to non-PII columns. By default, analysts using the data lake were assigned this role.
  4. FanDuel created two external schemas using Redshift Spectrum: one using the NonPIIAnalyst role, and one using the LimitedPIIAnalyst role. The following code is an example of the DDL that uses the roles that were set up in Lake Formation:
    CREATE EXTERNAL SCHEMA nonpii_data_lake FROM DATA CATALOG
    DATABASE 'fanduel_data_lake' REGION 'us-east-1'
    IAM_ROLE 'arn:aws:iam::123456789012:role/NonPIIAnalyst';
    
    CREATE EXTERNAL SCHEMA limitedpii_data_lake FROM DATA CATALOG
    DATABASE 'fanduel_data_lake' REGION 'us-east-1'
    IAM_ROLE 'arn:aws:iam::123456789012:role/LimitedPIIAnalyst';
    

FanDuel could already manage access permissions by adding or removing users from a group in Amazon Redshift, so they already had a group consisting of only the analysts who should be permitted access to PII. The following code grants this group access to the limitedpii_data_lake schema, which effectively means only this group can query the data lake using the LimitedPIIAnalyst role:

GRANT USAGE ON SCHEMA nonpii_data_lake TO base_group;
GRANT SELECT ON ALL TABLES IN SCHEMA nonpii_data_lake TO base_group;
GRANT USAGE ON SCHEMA limitedpii_data_lake TO pii_permitted_group;
GRANT SELECT ON ALL TABLES IN SCHEMA limitedpii_data_lake TO pii_permitted_group;
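
The effect of the two roles can be modeled in a few lines. The following is only a sketch of the idea of column-level filtering, not how Lake Formation or Redshift Spectrum enforce it; the column names and the class ColumnAccessSketch are hypothetical, because the post does not list the real table layout.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

class ColumnAccessSketch {

    // Hypothetical columns; the post does not list FanDuel's real table layout.
    static final Set<String> ALL_COLUMNS =
            new LinkedHashSet<>(Arrays.asList("customer_id", "email", "date_of_birth", "signup_date"));
    static final Set<String> PII_COLUMNS =
            new LinkedHashSet<>(Arrays.asList("email", "date_of_birth"));

    // Columns a role may read, mirroring the two roles described above.
    static Set<String> visibleColumns(String role) {
        Set<String> visible = new LinkedHashSet<>(ALL_COLUMNS);
        if ("LimitedPIIAnalyst".equals(role)) {
            return visible;
        }
        if ("NonPIIAnalyst".equals(role)) {
            visible.removeAll(PII_COLUMNS);
            return visible;
        }
        throw new IllegalArgumentException("Unknown role: " + role);
    }

    // Drop every column that the role is not allowed to see.
    static Map<String, Object> filterRow(Map<String, Object> row, String role) {
        Set<String> visible = visibleColumns(role);
        Map<String, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, Object> cell : row.entrySet()) {
            if (visible.contains(cell.getKey())) {
                filtered.put(cell.getKey(), cell.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("customer_id", 42);
        row.put("email", "ada@example.com");
        row.put("date_of_birth", "1815-12-10");
        row.put("signup_date", "2020-01-15");
        System.out.println("NonPIIAnalyst sees:     " + filterRow(row, "NonPIIAnalyst").keySet());
        System.out.println("LimitedPIIAnalyst sees: " + filterRow(row, "LimitedPIIAnalyst").keySet());
    }
}
```

Because the filtering is attached to the role rather than to copies of the data, a single stored dataset serves both groups, which is the property that removed the need for the two-file approach.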

Benefits

The ability to extend queries to the data lake with Redshift Spectrum and have column-level access control provides finer-grained control than the S3 tag-based permissions approach that was originally considered. This architecture provided the following benefits for FanDuel:

  • FanDuel could offer new capabilities to data analysts. For example, data analysts could quickly access raw data with PII and combine it with existing data in Amazon Redshift. Lake Formation provided a single view for monitoring the data access patterns.
  • Lake Formation column-level access control allowed them to secure PII data, which otherwise would have taken a complex S3 tag-based approach. This saved 100–200 hours of development time for every new data source and data footprint, because the original approach required creating two files (one with PII and another without PII), tagging files, and setting up permissions based on tags.
  • The ability to extend access from Amazon Redshift to the data lake with appropriate access control has allowed FanDuel to reduce data stored in Amazon Redshift.

Conclusion

FanDuel will leverage its new data platform to ingest additional data sources with real-time data so analysts and data scientists can gain insights and improve customer experience.

Questions or feedback? Send an email to [email protected].


About the Authors

Damian Grech is a Data Engineering Senior Manager at FanDuel. Damian has over 15 years of experience in software delivery and has worked with organizations ranging from large enterprises to start-ups at their infant stages. In his spare time, you can find him either experimenting in the kitchen or trailing the Scottish Highlands.

Shiv Narayanan is Global Business Development Manager for Data Lakes and Analytics solutions at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern data platforms. Shiv loves music, travel, food and trying out new tech.

Sidhanth Muralidhar is a Senior Technical Account Manager at Amazon Web Services. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them in their cloud journey. In his spare time, he loves to play and watch football.

Mythbusting the Analytics Journey

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/mythbusting-the-analytics-journey-58d692ea707e

Part of our series on who works in Analytics at Netflix — and what the role entails

by Alex Diamond

This Q&A aims to mythbust some common misconceptions about succeeding in analytics at a big tech company.

This isn’t your typical recruiting story. I wasn’t actively looking for a new job and Netflix was the only place I applied. I didn’t know anyone who worked there and just submitted my resume through the Jobs page 🤷🏼‍♀️ . I wasn’t even entirely sure what the right role fit would be and originally applied for a different position, before being redirected to the Analytics Engineer role. So if you find yourself in a similar situation, don’t be discouraged!

How did you come to Netflix?

Movies and TV have always been one of my primary sources of joy. I distinctly remember being a teenager, perching my laptop on the edge of the kitchen table to “borrow” my neighbor’s WiFi (back in the days before passwords 👵🏻), and streaming my favorite Netflix show. I felt a little bit of ✨magic✨ come through the screen each time, and that always stuck with me. So when I saw the opportunity to actually contribute in some way to making the content I loved, I jumped at it. Working in Studio Data Science & Engineering (“Studio DSE”) was basically a dream come true.

Not only did I find the subject matter interesting, but the Netflix culture seemed to align with how I do my best work. I liked the idea of Freedom and Responsibility, especially if it meant having autonomy to execute projects all the way from inception through completion. Another major point of interest for me was working with “stunning colleagues”, from whom I could continue to learn and grow.

What was your path to working with data?

My road-to-data was more of a stumbling-into-data. I went to an alternative high school for at-risk students and had major gaps in my formal education (not exactly a head start). I then enrolled at a local public college at 16. When it was time to pick a major, I was struggling in every subject except one: Math. I completed a combined math bachelor’s + master’s program, but without any professional guidance, networking, or internships, I was entirely lost. I had the piece of paper, but what next? I held plenty of jobs as a student, but now I needed a career.

A visual representation of all the jobs I had in high school and college: From pizza, to gourmet rice krispie treats, to clothing retail, to doors and locks

After receiving a grand total of *zero* interviews from sending out my resume, the natural next step was…more school. I entered a PhD program in Computer Science and shortly thereafter discovered I really liked the coding aspects more than the theory. So I earned the honor of being a PhD dropout.

A visual representation of all the hats I’ve worn

And here’s where things started to click! I used my newfound Python and SQL skills to land an entry-level Business Intelligence Analyst position at a company called Big Ass Fans. They make — you guessed it — very large industrial ventilation fans. I was given the opportunity to branch out and learn new skills to tackle any problem in front of me, aka my “becoming useful” phase. Within a few months I’d picked up BI tools, predictive modeling, and data ingestion/ETL. After a few years of wearing many different proverbial hats, I put them all to use in the Analytics Engineer role here. And ever since, Netflix has been a place where I can do my best work, put to use the skills I’ve gathered over the years, and grow in new ways.

What does an ordinary day look like?

On the Studio DSE team, our work is focused on aiding the movie-making process for our Netflix Originals, leading all the way up to a title’s launch on the service. Despite the affinity for TV and movies that brought me here, I didn’t actually know very much about how they got made. But over time, and by asking lots of questions, I’ve picked up the industry lingo! (Can you guess what “DOOD” stands for?)

My main stakeholders are members of our Studio team. They’re experts on the production process and an invaluable resource for me, sharing their expertise and providing context when I don’t know what something means. True to the “people over process” philosophy, we adapt alongside our stakeholders’ needs throughout the production process. That means the work products don’t always fit what you might imagine a traditional Analytics Engineer builds — if such a thing even exists!

A typical production lifecycle

On an ordinary day, my time is generally split evenly across:

  • 🤝📢 Speaking with stakeholders to understand their primary needs
  • 🐱💻 Writing code (SQL, Python)
  • 📊📈 Building visual outputs (Tableau, memos, scrappy web apps)
  • 🤯✍️ Brainstorming and vision planning for future work

Some days have more of one than the others, but variety is the spice of life! The one constant is that my day always starts with a ridiculous amount of coffee. And that it later continues with even more coffee. ☕☕☕

What advice would you give to someone just starting their career in data?

🐾 Dip your toes in things. As you try new things, your interests will evolve and you’ll pick up skills across a broad span of subject areas. The first time I tried building the front-end for a small web app, it wasn’t very pretty. But it piqued my interest and after a few times it started to become second nature.

💪 Find your strengths and weaknesses. You don’t have to be an expert in everything. Just knowing when to reach out for guidance on something allows you to uplevel your skills in that area over time. My weakness is statistics: I can use it when needed but it’s just not a subject that comes naturally to me. I own that about myself and lean on my stats-loving peers when needed.

🌸 Look for roles that allow you to grow. As you grow in your career, you’ll provide impact to the business in ways you didn’t even expect. As a business intelligence analyst, I gained data science skills. And in my current Analytics Engineer role, I’ve picked up a lot of product management and strategic thinking experience.

☝️ One Last Thing

I started off my career with the vague notion of, “I guess I want to be a data scientist?” But what that’s meant in practice has really varied depending on the needs of each job and project. It’s ok if you don’t have it all figured out. Be excited to try new things, lean into strengths, and don’t be afraid of your weaknesses — own them.

If this post resonates with you and you’d like to explore opportunities with Netflix, check out our analytics site, search open roles, and learn about our culture. You can also find more stories like this here.


Mythbusting the Analytics Journey was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.