Tag Archives: Analytics

Automate Amazon ES synonym file updates

Post Syndicated from Ashwini Rudra original https://aws.amazon.com/blogs/big-data/automate-amazon-es-synonym-file-updates/

Search engines retrieve relevant content from a collection of documents. However, this can be challenging if users don't enter the exact words that the content uses. You need to find the right item from a catalog of products, or the correct provider from a list of service providers, for example. The most common way to specify a query is through a text box. If you enter the wrong terms, you won't match the right items and won't get the best results.

Synonyms enable better search results by mapping multiple words to a single indexable term. In Amazon Elasticsearch Service (Amazon ES), you can provide synonyms for keywords that your application's users may search for. For example, your website may provide medical practitioner searches, and your users may search for "child's doctor" instead of "pediatrician." Mapping the two terms together enables either search term to match documents that contain the term "pediatrician." You achieve this with synonym files: Amazon ES custom packages allow you to upload synonym files that define the synonyms in your catalog. One best practice is to manage the synonyms in Amazon Relational Database Service (Amazon RDS) and then deploy them to your Amazon ES domain. You can automate that deployment with AWS Lambda and Amazon Simple Storage Service (Amazon S3).
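In a synonym file, each line lists terms that should be treated as equivalent; for example, a line such as child's doctor, pediatrician (the comma-separated synonym format that Elasticsearch supports) maps both phrases to the same matches.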

In this post, we discuss an approach using Amazon Aurora and Lambda functions to automate updating synonym files for improved search results.

Overview of solution

Amazon ES is a fully managed service that makes it easy to deploy, secure, and run Elasticsearch cost-effectively and at scale. You can build, monitor, and troubleshoot your applications using the tools you love, at the scale you need. The service supports open-source Elasticsearch API operations, managed Kibana, integration with Logstash, and many AWS services with built-in alerting and SQL querying.

The following diagram shows the solution architecture. One Lambda function pushes files to Amazon S3, and another function distributes the updates to Amazon ES.

Walkthrough overview

For search engineers, the synonym file’s content is usually stored within a database or in a data lake. You may have data in tabular format in Amazon RDS (in this case, we use Amazon Aurora MySQL). When updates to the synonym data table occur, the change triggers a Lambda function that pushes data to Amazon S3. The S3 event triggers a second function, which pushes the synonym file from Amazon S3 to Amazon ES. This architecture automates the entire synonym file update process.

To achieve this architecture, we complete the following high-level steps:

  1. Create a stored procedure to trigger the Lambda function.
  2. Write a Lambda function to verify data changes and push them to Amazon S3.
  3. Write a Lambda function to update the synonym file in Amazon ES.
  4. Test the data flow.

We discuss each step in detail in the next sections.

Prerequisites

Make sure you complete the following prerequisites:

  1. Configure an Amazon ES domain. We use a domain running Elasticsearch version 7.9 for this architecture.
  2. Set up an Aurora MySQL database. For more information, see Configuring your Amazon Aurora DB cluster.

Create a stored procedure to trigger a Lambda function

You can invoke a Lambda function from an Aurora MySQL database cluster using a native function or a stored procedure.

The following script creates an example synonym data table:

CREATE TABLE SynonymsTable (
  SynID int NOT NULL AUTO_INCREMENT,
  Base_Term varchar(255),
  Synonym_1 varchar(255),
  Synonym_2 varchar(255),
  PRIMARY KEY (SynID)
);

You can now populate the table with sample data. To generate sample data in your table, run the following script:

INSERT INTO SynonymsTable (Base_Term, Synonym_1, Synonym_2)
VALUES ('danish', 'croissant', 'pastry');

Create a Lambda function

You can use two different methods to send data from Aurora to Amazon S3: a Lambda function or SELECT INTO OUTFILE S3.

To demonstrate the ease of integrating multiple AWS services, we use a Lambda function that is invoked every time a change that must be tracked occurs in the database table. The function passes the changed data to Amazon S3. First, create an S3 bucket where the Lambda function stores the synonym file.

When you create your function, make sure you grant the right permissions using an AWS Identity and Access Management (IAM) role: the Lambda execution role needs read and write access to the S3 bucket where you store the synonyms.txt file. By default, Lambda creates an execution role with minimal permissions when you create a function on the Lambda console. The following Python code creates or appends to the synonyms.txt file in Amazon S3:

import boto3
from botocore.exceptions import ClientError

s3_resource = boto3.resource('s3')

filename = 'synonyms.txt'
BucketName = '<<provide your bucket name>>'
local_file = '/tmp/synonyms.txt'

def lambda_handler(event, context):
    # build the new synonym line from the record passed in by the stored procedure
    S3_data = ("%s,%s,%s \n") % (event['Base_Term'], event['Synonym_1'], event['Synonym_2'])
    try:
        # download the existing synonym file to local storage
        s3_resource.Bucket(BucketName).download_file(filename, local_file)
    except ClientError as e:
        if e.response['Error']['Code'] == "404":
            # create a new file if it does not exist yet
            s3_resource.meta.client.put_object(Body=S3_data, Bucket=BucketName, Key=filename)
            return
        raise
    # append the new line to the downloaded file and upload it back to S3
    with open(local_file, 'a') as fd:
        fd.write(S3_data)
    s3_resource.meta.client.upload_file(local_file, BucketName, filename)

Note the Amazon Resource Name (ARN) of this Lambda function to use in a later step.

Give Aurora permissions to invoke a Lambda function

To give Aurora permissions to invoke your function, you must attach an IAM role with the appropriate permissions to the cluster. For more information, see Invoking a Lambda function from an Amazon Aurora DB cluster.

When you’re finished, the Aurora database has access to invoke a Lambda function.

Create a stored procedure and a trigger in Aurora

To create the stored procedure, use your MySQL client (for example, MySQL Workbench). Change the ARN in the following code to your Lambda function's ARN before running the procedure:

DROP PROCEDURE IF EXISTS Syn_TO_S3;
DELIMITER ;;
CREATE PROCEDURE Syn_TO_S3 (IN SynID INT, IN Base_Term varchar(255), IN Synonym_1 varchar(255), IN Synonym_2 varchar(255)) LANGUAGE SQL
BEGIN
   CALL mysql.lambda_async('<<Lambda-Function-ARN>>',
    CONCAT('{ "SynID": "', SynID,
    '", "Base_Term": "', Base_Term,
    '", "Synonym_1": "', Synonym_1,
    '", "Synonym_2": "', Synonym_2, '"}')
    );
END
;;
DELIMITER ;

When this stored procedure is called, it invokes the Lambda function you created.

Create a trigger named TR_Synonym_CDC on the table SynonymsTable. When a new record is inserted, this trigger calls the Syn_TO_S3 stored procedure. See the following code:

DROP TRIGGER IF EXISTS TR_Synonym_CDC;
 
DELIMITER ;;
CREATE TRIGGER TR_Synonym_CDC
  AFTER INSERT ON SynonymsTable
  FOR EACH ROW
BEGIN
  SELECT NEW.SynID, NEW.Base_Term, NEW.Synonym_1, NEW.Synonym_2
  INTO @SynID, @Base_Term, @Synonym_1, @Synonym_2;
  CALL  Syn_TO_S3(@SynID, @Base_Term, @Synonym_1, @Synonym_2);
END
;;
DELIMITER ;

Now, whenever a new row is inserted into SynonymsTable, the trigger calls the stored procedure, which in turn invokes the Lambda function.

Verify that data is being sent from the function to Amazon S3 successfully. You may have to insert a few records, depending on the size of your data, before new records appear in Amazon S3.

Update synonyms in Amazon ES when a new synonym file becomes available

Amazon ES lets you upload custom dictionary files (for example, stopwords and synonyms) for use with your cluster. The generic term for these types of files is packages. Before you can associate a package with your domain, you must upload it to an S3 bucket. For instructions on uploading a synonym file for the first time and associating it to an Amazon ES domain, see Uploading packages to Amazon S3 and Importing and Associating packages.
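If you prefer to script the initial upload and association rather than use the console, the following Boto3 sketch shows the general flow; the bucket, key, package, and domain names here are placeholders, not values defined elsewhere in this post.

import boto3

s3 = boto3.client('s3')
es = boto3.client('es')

# upload the synonym file to S3 (bucket and key are placeholders)
s3.upload_file('synonyms.txt', 'my-synonyms-bucket', 'synonyms.txt')

# register the file as an Amazon ES custom package
create_response = es.create_package(
    PackageName='my-synonyms',
    PackageType='TXT-DICTIONARY',
    PackageSource={
        'S3BucketName': 'my-synonyms-bucket',
        'S3Key': 'synonyms.txt'
    }
)
package_id = create_response['PackageDetails']['PackageID']

# associate the package with the domain so indexes can reference it
es.associate_package(PackageID=package_id, DomainName='my-es-domain')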

To update the synonyms (package) when a new version of the synonym file becomes available, we complete the following steps:

  1. Create a Lambda function to update the existing package.
  2. Set up an S3 event notification to trigger the function.

Create a Lambda function to update the existing package

We use a Python-based Lambda function that uses the Boto3 AWS SDK to update the Elasticsearch package. For more information about how to create a Python-based Lambda function, see Building Lambda functions with Python. You need the following information before you start writing the function:

  • The S3 bucket ARN where the new synonym file is written
  • The Amazon ES domain name (available on the Amazon ES console)
  • The package ID of the Elasticsearch package we're updating (available on the Amazon ES console)

You can use the following code for the Lambda function:

import logging
import os
import time

import boto3

# Elasticsearch client
client = boto3.client('es')
# set up logging
logger = logging.getLogger('boto3')
logger.setLevel(logging.INFO)
# fetch from Environment Variable
package_id = os.environ['PACKAGE_ID']
es_domain_nm = os.environ['ES_DOMAIN_NAME']

def lambda_handler(event, context):
    s3_bucket = event["Records"][0]["s3"]["bucket"]["name"]
    s3_key = event["Records"][0]["s3"]["object"]["key"]
    logger.info("bucket: {}, key: {}".format(s3_bucket, s3_key))
    # update package with the new Synonym file.
    up_response = client.update_package(
        PackageID=package_id,
        PackageSource={
            'S3BucketName': s3_bucket,
            'S3Key': s3_key
        },
        CommitMessage='New Version: ' + s3_key
    )
    logger.info('Response from Update_Package: {}'.format(up_response))
    # poll until the package update is complete
    finished = False
    while not finished:
        # describe the package by ID
        desc_response = client.describe_packages(
            Filters=[{
                    'Name': 'PackageID',
                    'Value': [package_id]
                }],
            MaxResults=1
        )
        status = desc_response['PackageDetailsList'][0]['PackageStatus']
        logger.info('Package Status: {}'.format(status))
        # check whether the package status is back to AVAILABLE
        if status == 'AVAILABLE':
            finished = True
            logger.info('Package status is now AVAILABLE. Exiting loop.')
        else:
            # wait briefly before polling again to avoid a tight loop
            time.sleep(5)
    logger.info('Package: {} update is now Complete. Proceed to Associating to ES Domain'.format(package_id))
    # once the package update is completed, re-associate with the ES domain
    # so that the new version is applied to the nodes.
    ap_response = client.associate_package(
        PackageID=package_id,
        DomainName=es_domain_nm
    )
    logger.info('Response from Associate_Package: {}'.format(ap_response))
    return {
        'statusCode': 200,
        'body': 'Custom Package Updated.'
    }

The preceding code requires the PACKAGE_ID and ES_DOMAIN_NAME environment variables to be set to the appropriate values, and an IAM execution role with the appropriate permissions assigned to the Lambda function.

Set up an S3 event notification to trigger the Lambda function

Now we set up event notification (all object create events) for the S3 bucket in which the updated synonym file is uploaded. For more information about how to set up S3 event notifications with Lambda, see Using AWS Lambda with Amazon S3.
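If you want to set up the notification programmatically rather than on the console, a minimal Boto3 sketch might look like the following; the bucket name, function ARN, statement ID, and prefix are placeholders, and the Lambda function also needs a resource-based permission that lets Amazon S3 invoke it (the first call below).

import boto3

s3 = boto3.client('s3')
lambda_client = boto3.client('lambda')

bucket_name = 'my-synonyms-bucket'  # placeholder
function_arn = 'arn:aws:lambda:us-east-1:123456789012:function:update-es-package'  # placeholder

# allow Amazon S3 to invoke the Lambda function
lambda_client.add_permission(
    FunctionName=function_arn,
    StatementId='AllowS3Invoke',
    Action='lambda:InvokeFunction',
    Principal='s3.amazonaws.com',
    SourceArn='arn:aws:s3:::' + bucket_name
)

# invoke the function on all object create events for the synonym file prefix
s3.put_bucket_notification_configuration(
    Bucket=bucket_name,
    NotificationConfiguration={
        'LambdaFunctionConfigurations': [{
            'LambdaFunctionArn': function_arn,
            'Events': ['s3:ObjectCreated:*'],
            'Filter': {'Key': {'FilterRules': [{'Name': 'prefix', 'Value': 'synonyms'}]}}
        }]
    }
)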

Test the solution

To test our solution, let’s consider an Elasticsearch index (es-blog-index-01) that consists of the following documents:

  • tennis shoe
  • hightop
  • croissant
  • ice cream

A synonym file is already associated with the Amazon ES domain via Amazon ES custom packages and the index (es-blog-index-01) has the synonym file in the settings (analyzer, filter, mappings). For more information about how to associate a file to an Amazon ES domain and use it with the index settings, see Importing and associating packages and Using custom packages with Elasticsearch. The synonym file contains the following data:

danish, croissant, pastry
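For reference, the index settings reference the associated package by its package ID in the synonyms_path setting. The following sketch creates such an index with the requests library; the domain endpoint, index name, field name, and package ID (F111111111) are placeholders, and the request must be signed or authenticated according to your domain's access policy.

import requests

endpoint = 'https://my-domain.us-east-1.es.amazonaws.com'  # placeholder

index_body = {
    "settings": {
        "index": {
            "analysis": {
                "analyzer": {
                    "synonym_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["synonym_filter"]
                    }
                },
                "filter": {
                    "synonym_filter": {
                        "type": "synonym",
                        # associated packages are referenced as analyzers/<package ID>
                        "synonyms_path": "analyzers/F111111111",
                        "updateable": True
                    }
                }
            }
        }
    },
    "mappings": {
        "properties": {
            # updateable synonym filters can only be used in search analyzers
            "description": {
                "type": "text",
                "analyzer": "standard",
                "search_analyzer": "synonym_analyzer"
            }
        }
    }
}

response = requests.put(endpoint + '/es-blog-index-01', json=index_body)
print(response.text)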

Test 1: Search with a word present in the synonym file

For our first search, we use a word that is present in the synonym file. The following screenshot shows that searching for “danish” brings up the document croissant based on a synonym match.

Test 2: Search with a synonym not present in the synonym file

Next, we search using a synonym that’s not in the synonym file. In the following screenshot, our search for “gelato” yields no result. The word “gelato” doesn’t match with the document ice cream because no synonym mapping is present for it.

In the next test, we add synonyms for “ice cream” and perform the search again.

Test 3: Add synonyms for “ice cream” and redo the search

To add the synonyms, let’s insert a new record into our database. We can use the following SQL statement:

INSERT INTO SynonymsTable(Base_Term, Synonym_1, Synonym_2)
VALUES ('frozen custard', 'gelato', 'ice cream')

When we search with the word “gelato” again, we get the ice cream document.

This confirms that the synonym addition is applied to the Amazon ES index.

Clean up resources

To avoid ongoing charges to your AWS account, remove the resources you created:

  1. Delete the Amazon ES domain.
  2. Delete the RDS DB instance.
  3. Delete the S3 bucket.
  4. Delete the Lambda functions.

Conclusion

In this post, we implemented a solution using Aurora, Lambda, Amazon S3, and Amazon ES that enables you to update synonyms automatically in Amazon ES. This provides central management for synonyms and ensures your users can obtain accurate search results when synonyms are changed in your source database.


About the Authors

Ashwini Rudra is a Solutions Architect at AWS. He has more than 10 years of experience architecting Windows workloads in on-premises and cloud environments. He is also an AI/ML enthusiast. He helps AWS customers, namely major sports leagues, define their cloud-first digital innovation strategy.

 

 

Arnab Ghosh is a Solutions Architect for AWS in North America helping enterprise customers build resilient and cost-efficient architectures. He has over 13 years of experience in architecting, designing, and developing enterprise applications solving complex business problems.

 

 

Jennifer Ng is an AWS Solutions Architect working with enterprise customers to understand their business requirements and provide solutions that align with their objectives. Her background is in enterprise architecture and web infrastructure, where she has held various implementation and architect roles in the financial services industry.

Build and optimize real-time stream processing pipeline with Amazon Kinesis Data Analytics for Apache Flink, Part 2

Post Syndicated from Amit Chowdhury original https://aws.amazon.com/blogs/big-data/part2-build-and-optimize-real-time-stream-processing-pipeline-with-amazon-kinesis-data-analytics-for-apache-flink/

In Part 1 of this series, you learned how to calibrate your Amazon Kinesis Data Streams stream and the Apache Flink application deployed in Amazon Kinesis Data Analytics, tuning Kinesis Processing Units (KPUs) to achieve higher performance. Although collecting, processing, and analyzing a spiky data stream in real time is crucial, reacting to the spiky data is equally important in many real-life situations, because the value of derived insights diminishes with time.

To build a highly responsive, scalable streaming application, we need to auto scale both Kinesis Data Streams and the Kinesis Data Analytics application based on the incoming data stream. Refer to the blog post on monitoring and automatically scaling your Apache Flink application with Amazon Kinesis Data Analytics to learn how. You can also use the Kinesis Scaling Utility, which is designed to scale Amazon Kinesis Data Streams in the same way that you scale EC2 Auto Scaling groups: up or down by a count or as a percentage of the total fleet, or to an exact number of shards.

In this post, we dive deep into the important metrics that generate meaningful insights about the health and performance of a Kinesis Data Analytics for Apache Flink application. We also walk you through the steps to build a fully automated, scalable, and highly available pipeline that handles scaling in and out for both the Kinesis data stream (based on incoming records) and the Kinesis data analytics application (by calibrating KPUs and parallelism). You use AWS managed services to reduce the operational overhead compared to scaling the streaming application manually. Because this is a continuation of the previous post, make sure to walk through Part 1 as a prerequisite before deploying the automated pipeline code in this post.

Deploy the advanced monitoring and scaling architecture

This section uses an AWS CloudFormation template to build an advanced monitoring dashboard to capture vital metrics data. You also create an advanced scaling environment to auto scale the Kinesis data stream and Kinesis data analytics application, which scales both services in and out depending on the volume of spiky data. Furthermore, we use managed services for better operational efficiency. The template builds the following architecture.

The CloudFormation template includes the following components:

  • An advanced Amazon CloudWatch dashboard
  • Two CloudWatch alarms for scaling your Kinesis Data Analytics for Apache Flink application
  • Two CloudWatch alarms for scaling your Kinesis Data Streams
  • Accompanying auto scaling policy actions in these alarms
  • An Amazon API Gateway endpoint for triggering AWS Lambda
  • A Lambda function responsible for handling the scale-in and scale-out functions

These components work in tandem to monitor the metrics configured in the CloudWatch alarm and respond to metrics accordingly.

To provision your resources, complete the following steps:

  1. Choose Launch Stack (right-click to open a new tab) to run the CloudFormation template. Make sure to choose the us-east-1 (N. Virginia) Region.
  2. Choose Next.
  3. For FlinkApplicationName, enter the name of your application.
  4. For KinesisStreamName, enter the name of your data stream.
  5. Make sure ShardCount is the same as the current shard count of the Kinesis data stream created in Part 1.

This information is available on the Outputs tab of the CloudFormation stack detailed in Part 1.

  6. Choose Next.
  7. Follow the remaining instructions and choose Create stack.

This dashboard should take a few minutes to launch.

  8. When the stack is complete, go to the Outputs tab of the stack on the AWS CloudFormation console and choose the dashboard link.

Metrics deep dive

There are many critical operational metrics to measure when assessing the performance of a running Apache Flink application. This section looks at the essential CloudWatch metrics for Kinesis Data Analytics for Apache Flink applications, what they mean, and what appropriate alarms might be vital indicators for each. Let’s dive into how to monitor your application.

First, let’s look at the running application using our CloudWatch dashboard and point out potential issues with a given Apache Flink application indicated by our CloudWatch metrics.

The Application Health section of this dashboard can help identify fundamental issues with your application that are causing it to be inoperable. Let’s start with the first two cells: Uptime and Downtime. In an ideal state, this is precisely how your application should look—uptime measures the cumulative time in milliseconds that the application has been running without interruption, and downtime measures the time elapsed during an outage.

In an ideal state, your lastCheckpointSize and lastCheckpointDuration metrics should remain relatively stable over time. If you observe an increasing checkpoint size, this can indicate state not being cleared within your application or a memory leak. Similarly, a long or unexpected spike in checkpoint duration can cause backpressure in your application. Monitor these metrics for stability over time.

The resource utilization metrics section gives a glimpse into the resource usage of the running Flink application. In a healthy application, try to keep CPU utilization under 75%. This is the same metric that Kinesis Data Analytics for Apache Flink uses to auto scale your application if you have auto scaling enabled. Also, it's normal to see CPU spikes during application startup or restart. HeapMemoryUtilization measures the memory taken up by the application, on-heap state, and any other operations that may take up memory space.

Let’s now evaluate our Flink application progress. Incoming and outgoing records per second are measured on an application level in this image. You can also measure them on a task or subtask level for finer granularity and visibility into the operators of your application. The ideal state for these depends on the use case, but if it’s a straight read, process, and write action without filtering the records, you can expect to see an equal amount of records in and out per second. If a deviation occurs on either end of these metrics, it’s a good indicator of where the bottleneck is. If numRecordsInPerSecond is lower, the source might be configured to read in less data, or it could be indicative of backpressure on the sink causing a slowdown. If numRecordsOutPerSecond is lower, it could be identifying a slow operator process in the middle of your application.

Next, let's look at InputWatermark, OutputWatermark, and EventTimeLatency. The watermarks indicate the event time with which data is arriving into the data stream. A large difference between the two watermark values could indicate significantly late-arriving data in the stream. Your stream should handle this according to your use case, and EventTimeLatency measures the total latency of the streaming workload as the difference between OutputWatermark and InputWatermark.

The LateRecordsDropped metric measures the number of records dropped due to arriving late. If this number is spiking, there is an issue with data arriving late to the Flink application.

Now let’s dive into Kinesis source and sink metrics. The millisBehindLatest metric shows the time the consumer is behind the head of the stream, which is a good indicator of how far behind the consumer’s current time is. You can measure this metric on an application or a parallelism level—a value of 0 shows that the application is completely caught up with processing records. This is ideal; a higher value means that the application is falling behind. It could indicate that the consumer isn’t tuned to read records efficiently, backpressure, or some slowness in processing. Scale the application accordingly.

The RetriesPerRecord, UserRecordsPending, and BufferingTime metrics come from the Kinesis Producer Library (KPL) and, in this case, refer to our terminal script, which writes to the Kinesis data stream. All applications that use the KPL report these metrics, and they're important to monitor in case of frequent retries or timeouts. The latter two metrics can grow exceedingly large if the data stream is under-provisioned.

Advanced scaling

Let's dive deep into how to scale your Kinesis data analytics application based on the previously discussed metrics. Out of the box, the only way to scale a Kinesis data analytics application automatically is to use the built-in auto scaling feature. This feature monitors your application's CPU usage over time, and if it remains at 75% or above for 15 minutes, it increases your application's overall parallelism. You experience some downtime during this scale-up, and an application developer should take this into account when using the auto scaling feature. It's an excellent and helpful feature of Kinesis Data Analytics for Apache Flink. However, some applications need to scale based on other factors, not just CPU usage. In this section, we look at an external way to scale your application based on IncomingRecords or millisBehindLatest metrics on the source Kinesis data stream.
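To illustrate what such an external scaling action can look like, the following sketch uses the Kinesis Data Analytics v2 API (the same UpdateApplication action shown in Part 1) to raise an application's parallelism; the application name and target parallelism are placeholders, and a production function would also handle scale-in, cooldowns, and error cases.

import boto3

kda = boto3.client('kinesisanalyticsv2')

def scale_out(application_name, target_parallelism):
    # UpdateApplication requires the current application version ID
    app = kda.describe_application(ApplicationName=application_name)
    version_id = app['ApplicationDetail']['ApplicationVersionId']

    # apply the new parallelism; the application briefly updates while this takes effect
    kda.update_application(
        ApplicationName=application_name,
        CurrentApplicationVersionId=version_id,
        ApplicationConfigurationUpdate={
            'FlinkApplicationConfigurationUpdate': {
                'ParallelismConfigurationUpdate': {
                    'ConfigurationTypeUpdate': 'CUSTOM',
                    'ParallelismUpdate': target_parallelism
                }
            }
        }
    )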

To add the functionality of scaling based on other metrics, we utilize Application Auto Scaling to specify our scaling policy and other attributes, such as cooldown periods. We can also take advantage of any auto scaling types—step scaling, target tracking scaling, and schedule-based scaling. The CloudFormation template we launched already created the necessary resources covering step scaling. For a more detailed list, view the Resources tab on the AWS CloudFormation console or view the designer before launching.

Currently, the settings are tuned to the max throughput per KPU, which is ideal for a production workload. Let’s tune this setting down to a lower value to more quickly see results.

  1. On the CloudWatch console, choose Alarms in the navigation pane.
  2. Choose the alarm KDAScaleOutAlarm.

The alarm has been preconfigured for you in CloudWatch to listen for specific metrics breaching a threshold. Optionally, you can adjust the alarm to trigger scale-out or scale-in events as needed.

  3. On the Actions menu, choose Edit.
  4. In the Conditions section, adjust the threshold value as needed.
  5. Choose Update alarm.

You can also use the speedup value in the ProducerCommand found in the outputs of the CloudFormation stack from Part 1 to increase and decrease the data volume per second, replicating real-life scenarios of spiky data. Observe the CloudWatch alarms changing states between OK and In alarm. When an alarm fires, it triggers auto scaling of the Kinesis data stream, adding or removing shards, and the alarms likewise adjust the KPUs allocated to the Kinesis data analytics application.

  1. Navigate back to your Kinesis data analytics application.
  2. On the Details tab, check the Scaling section to see whether this alarm has affected the parallelism.
  3. Alternatively, stop the producer in the terminal.

Turning off the producer should show an inverse effect, causing the application to trigger the KDAScaleInAlarm, and the application parallelism should scale back down in a few minutes.

  4. On the Configuration tab of your data stream, observe the scaling operation of allocated shards.

You can open the Apache Flink dashboard from your Kinesis data analytics application to analyze application performance and troubleshoot by looking at Flink job-level insights, Flink task-level insights, Flink exceptions, and checkpoints. You can also calibrate your application by looking at the Flink dashboard metrics, which give you additional granularity out of the box and are useful for debugging.

Conclusion

In this post, you built a reliable, scalable, and highly available advanced scaling mechanism for streaming applications based on Kinesis Data Analytics for Apache Flink and Kinesis Data Streams. The post also discussed how to auto scale your applications based on a metric other than CPU utilization and explored ways to extend observability of the application with advanced monitoring and error handling. This solution was largely enabled by using managed services, so you didn’t need to spend time provisioning and configuring the underlying infrastructure. The sources of the AWS CloudFormation templates used in this post are available from GitHub for your reference.

You should now have a good understanding of how to build, monitor, and auto scale a real-time streaming application using Amazon Kinesis. You can also calibrate various components based on your application needs and volume of data by applying advanced monitoring and scaling techniques.


About the Authors

 Amit Chowdhury is a Partner Solutions Architect in the Global System Integrator (GSI) team at Amazon Web Services. He helps AWS GSI partners migrate customer workloads to the AWS Cloud, and provides guidance to build, design, and architect scalable, highly available, and secure solutions on AWS by applying AWS recommended best practices. He enjoys spending time with his family, outdoor adventures and traveling.

 

Saurabh Shrivastava is a solutions architect leader and analytics specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Build and optimize a real-time stream processing pipeline with Amazon Kinesis Data Analytics for Apache Flink, Part 1

Post Syndicated from Amit Chowdhury original https://aws.amazon.com/blogs/big-data/part1-build-and-optimize-a-real-time-stream-processing-pipeline-with-amazon-kinesis-data-analytics-for-apache-flink/

In real-time stream processing, it becomes critical to collect, process, and analyze high-velocity real-time data to provide timely insights and react quickly to new information. Streaming data velocity could be unpredictable, and volume could spike based on user demand at a given time of day. Real-time analysis needs to handle the data spike, because any delay in processing streaming data can cause a significant impact on the desired outcome. The value of insights and actions reduces over time, whereas real-time analysis can substantially improve the effectiveness of the analytics application.

A widespread use case is fleet management for vehicles, especially in the autonomous car industry. It’s essential to collect, process, and analyze high-velocity traffic data and react in real time to control and reroute traffic. Real-time stream processing is crucial in many other use cases, such as manufacturing production lines, robotics automation, analyzing high-volume web and application logs, website clickstreams or database event streams, aggregating social media feeds, and tracking financial transactions.

Amazon Kinesis Data Analytics reduces the complexity of building, managing, and integrating Apache Flink applications with other AWS services. Kinesis Data Analytics takes care of everything required to run streaming applications continuously and scales automatically to match your incoming data volume and throughput in a serverless manner.

In this post, you learn the required concepts to implement robust, scalable, and flexible real-time streaming extract, transform, and load (ETL) pipelines with Apache Flink and Kinesis Data Analytics. We demonstrate how to calibrate the Kinesis streaming analytics pipeline to achieve higher performance efficiency and better cost optimization with the right amount of Kinesis Processing Units (KPUs). Estimating the optimal number of KPUs to handle your streaming workload depends on several factors, including the type of stream processing involved. For instance, if you're performing CPU-intensive statistical calculations, your application might need more CPU or memory. On the other hand, if your application is simply enriching records via external API calls as they flow through, you might be I/O bound. In Part 1 of this series, you learn about parameters such as Parallelism and ParallelismPerKPU for KPU calibration. In Part 2, you learn about applying auto scaling to add the right number of KPUs based on streaming data spikes.

For this post, we analyze the telemetry data of a taxi fleet in New York City in real time to optimize fleet operation using Amazon Kinesis Data Analytics for Apache Flink. Kinesis Data Analytics helps process and analyze the data in real time to identify areas currently requesting a high number of taxi rides. The derived insights are visualized on a dashboard for operational teams to inspect.

Architecture

As shown in the following architecture diagram, every taxi in the fleet is capturing information about completed trips. The tracked information includes the pickup and drop-off locations, number of passengers, and generated revenue. This information is ingested into Amazon Kinesis Data Streams as a simple JSON blob. The application reads the timestamp attribute of the stored events and replays them as if they occurred in real time. From there, the data is processed and analyzed by a Flink application, which is deployed to Kinesis Data Analytics for Apache Flink.

The application identifies areas that are currently requesting a high number of taxi rides. The derived insights are finally persisted into Amazon Elasticsearch Service, where they can be accessed and visualized using Kibana.

In this post, you build a fully managed infrastructure that can analyze the data in near-real time—within seconds—while being scalable and highly available. The architecture uses Kinesis Data Streams as a streaming store, Kinesis Data Analytics to run an Apache Flink application in a fully managed environment, and Amazon Elasticsearch Service (Amazon ES) and Kibana for visualization.

Additionally, we discuss basic Apache Flink concepts and common patterns for streaming analytics. We also cover how Kinesis Data Analytics for Apache Flink is different from a self-managed environment and how to effectively operate and monitor streaming architectures. You also calibrate KPUs in Kinesis Data Analytics to improve performance efficiency and cost optimization.

Deploy the real-time streaming and analysis workload

To replicate the real-life scenario, you connect to a preconfigured Amazon Elastic Compute Cloud (Amazon EC2) instance running Linux over SSH. Then you use a Java application to replay a historic set of taxi trips made in NYC stored in objects in Amazon Simple Storage Service (Amazon S3) into the data stream. The Java application is compiled and loaded onto the EC2 instance.

This section uses an AWS CloudFormation template to build a producer client program that sends NYC taxi trip data to our Kinesis data stream. The template creates the following resources:

  • An S3 bucket to house the data resources.
  • A new Kinesis data stream that we use to stream a dataset of NYC taxi trips.
  • An Amazon ES cluster with Kibana integration for displaying dashboard information.
  • A build pipeline and AWS CodeBuild project along with sources for a Flink Kinesis connector application.
  • An EC2 instance for running the producer application that replays data onto the data stream. An Elastic IP is provisioned for the EC2 instance to allow SSH access.
  • A Java application hosted on the EC2 instance, which replays the historic taxi trip dataset from Amazon S3 into the data stream.
  • A Kinesis data analytics application to continuously monitor and analyze data from the connected data stream and run the Apache Flink 1.11 application.
  • The necessary AWS Identity and Access Management (IAM) service roles, security groups, and policies to run and communicate between the resources you create.
  • An Amazon CloudWatch alarm when the application falls behind based on the millisBehindLatest metric.

To provision these resources, complete the following steps:

  1. Choose Launch Stack (right-click to open a new tab) to run the CloudFormation template. Make sure to choose the us-east-1 (N. Virginia) Region.
  2. Choose Next.
  3. For Stack name, enter a name.
  4. For ClientipAddressRange, enter the IP address from which the Kibana dashboard is accessed.

Use http://checkip.amazonaws.com to find your IP address and add /32 at the end.

  5. For SshKeyName, enter the SSH key name for the Linux-based EC2 instance you created.
  6. Choose Next.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and I acknowledge that AWS CloudFormation might require the following capability: CAPABILITY_AUTO_EXPAND.
  8. Choose Create stack.
  9. Wait until the CloudFormation template has been successfully created.

Connect to the new EC2 instance and run the producer client program

In this section, we connect to our new EC2 instance and run the producer client program.

  1. On the AWS CloudFormation console, choose the parent stack that was deployed.
  2. In the Outputs section, locate the AWS Systems Manager Session Manager URL for KinesisReplayInstance.
  3. Choose the Session Manager URL to connect to the EC2 instance.
  4. After the connection has been established, start ingesting events into the Kinesis data stream by running the JAR file that was downloaded to the EC2 instance.

The command with pre-populated parameters is available in the Outputs section of the CloudFormation template for ProducerCommand.

You have now successfully created the basic infrastructure and are ingesting data into the data stream.

  1. On the Kinesis Data Streams console, go to your data stream.
  2. On the Monitoring tab, locate the Incoming Data – Sum metric.

You may need to wait 2–3 minutes and use the refresh button for the monitoring charts to see the metrics.

Visualize the data

To visualize the data, navigate to the Kibana dashboard. The dashboard URL is in the Outputs section of the CloudFormation stack for KibanaDashboard. You can inspect the preloaded dashboard or even create your visualizations. If no data shows up, choose the clock icon and change the timeframe to January 2010 – December 2011.

AWS CloudFormation automatically grants access to the IP address provided during stack creation. However, if you encounter access issues in the Kibana dashboard, modify your Amazon ES domain’s access policy and change your local IP address on the Amazon ES console.

To change your IP address, find and choose the Amazon ES domain that you provisioned. On the Actions menu, choose Modify access policy.

Replace the IP address (for example, 123.123.123.123) with your local IP. If you don’t know your local IP, use http://checkip.amazonaws.com.

Scale the Kinesis data stream to adapt to a spiky data stream

Now that the Kinesis Data Analytics for Apache Flink application is running and sending results to Amazon ES, we can look at operational aspects, such as monitoring and scaling. When you closely inspect the output of the producer application, you can observe that it's experiencing write provisioned throughput exceeded exceptions and can't send data fast enough. If the resources of the Apache Flink application aren't adapted accordingly, particularly for a spiky data stream, the application may fall substantially behind. It may then generate results that are no longer relevant because they're already too old when the overloaded application eventually produces them.

The Kinesis data stream was deliberately under-provisioned so that the Kinesis Replay Java application is completely saturating the data stream. When you closely inspect the output of the Java application, you can notice that the replay lag is continuously increasing. This means that the producer can’t ingest events as quickly as required according to the specified speedup parameter.

In this section, we scale the Kinesis data stream to accommodate the throughput generated by the Java application ingesting events into the data stream. We then observe how the Kinesis Data Analytics for Apache Flink application automatically scales to adapt to the increased throughput.

  1. On the Kinesis Data Streams console, navigate to the stream you created.
  2. In the Shards section, choose Edit.
  3. Double the throughput of the Kinesis data stream by changing the number of open shards to 16.
  4. Choose Save changes to confirm the changes.
  5. While the service doubles the number of shards, and therefore the throughput of the stream, examine the metrics of the data stream on the Monitoring tab.

After a few minutes, you should notice the effect of the scaling operation as the throughput of the stream substantially increases.

Although we scaled Kinesis Data Streams manually here, the Kinesis Scaling Utility is designed to give you the ability to auto scale Amazon Kinesis Data Streams in the same way that you scale EC2 Auto Scaling groups: up or down by a count or as a percentage of the total fleet, or to an exact number of shards. In Part 2, you learn more about auto scaling Kinesis Data Streams based on the incoming data stream.

Calibrate KPUs

Currently, Kinesis Data Analytics scales your application solely based on the underlying CPU usage. However, because not all applications are CPU bound, depending on your needs, you may want to use a different mechanism for sizing your application. In this section, we demonstrate how you can use the millisBehindLatest metric (available when consuming data from a Kinesis data stream) to responsively size your Kinesis data analytics application.

Kinesis Data Analytics provisions capacity in the form of Amazon Kinesis Processing Units (KPUs). One KPU provides you with 1 vCPU and 4 GB memory. The default limit for KPUs for your application is 32. You can also request an increase to this limit in AWS Service Limits.

We recommend that you test your application with production loads to get an accurate estimate of the number of KPUs required for your application. KPUs usage can vary considerably based on your data volume and velocity, code complexity, integrations, and more. This is especially true when using the Apache Flink runtime in Kinesis Data Analytics.

You can configure the parallel run of tasks and allocate resources for Kinesis Data Analytics for Apache Flink to implement scaling. We use the following properties:

  • Parallelism – Use this property to set the default Apache Flink application parallelism. All operators, sources, and sinks run with this parallelism unless overridden in the application code. The default is 1, and the default maximum is 256.
  • ParallelismPerKPU – Use this property to set the number of parallel tasks that can be scheduled per KPU of your application. The default is 1, and the maximum is 8. For applications that have blocking operations (for example, I/O), a higher value of ParallelismPerKPU leads to fuller utilization of KPU resources.

Kinesis Data Analytics calculates the KPUs needed to run your application as Parallelism/ParallelismPerKPU.
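For example, the CreateApplication request that follows sets Parallelism to 4 and ParallelismPerKPU to 4, so the application needs 4 / 4 = 1 KPU for its tasks.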

The following example request for the CreateApplication action sets parallelism when you create an application.

{
   "ApplicationName": "string",
   "RuntimeEnvironment": "FLINK-1_11",
   "ServiceExecutionRole": "arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": {
      "ApplicationCodeConfiguration": {
         "CodeContent": {
            "S3ContentLocation": {
               "BucketARN": "arn:aws:s3:::mybucket",
               "FileKey": "myflink.jar",
               "ObjectVersion": "AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
         "CodeContentType": "ZIPFILE"
      },
      "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}

 

For more examples and instructions for using request blocks with API actions, see Kinesis Data Analytics API Example Code.

The following example request for the UpdateApplication action sets parallelism for an existing application:

{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}

Scale the Kinesis Data Analytics for Apache Flink application

Because you increased the throughput of the Kinesis data stream by doubling the number of shards, more events are sent into the stream. However, as a direct result, more events need to be processed. Now the Kinesis data analytics application becomes overloaded and can no longer keep up with the increased number of incoming events. You can observe this through the millisBehindLatest metric, which is published to CloudWatch.

Kinesis Data Analytics natively supports auto scaling. After a few minutes, you can see the effect of the auto scaling activities in the metrics. The millisBehindLatest metric starts to decrease until it reaches zero, when the processing has caught up with the tip of the Kinesis data stream.

You can calibrate the scaling operation based on your application needs by adjusting the KPU.

  1. On the Kinesis Data Analytics console, navigate to your application.
  2. Under Scaling, choose Configure.
  3. Adjust Parallelism to 6 and Parallelism per KPU to 2.
  4. Choose Update.

The other method you can apply to improve throughput is the AsyncIO function. With AsyncIO, calls to an external system are made asynchronously, so processing continues while other requests are in progress. The two essential parameters when defining an AsyncFunction are Capacity (how many requests are in flight concurrently per parallel subtask) and Timeout (the timeout duration of an individual request to the external data source). It helps if you allocate enough capacity to account for the throughput, but not more than the external data source can handle. For example, an application with a parallelism of 5 and a capacity of 10 sends 50 concurrent requests to your external data source. You can learn more about using the AsyncIO function with Kinesis Data Analytics for Apache Flink on the GitHub repo.

Conclusion

In this post, you built a reliable, scalable, and highly available streaming application based on Apache Flink and Kinesis Data Analytics. You also scaled the different components while ingesting and analyzing thousands of events per second in near-real time. The solution utilizes managed services without having to provision and configure underlying infrastructure. The post also discussed what it takes to auto scale your application based on metrics such as CPU utilization and millisBehindLatest. The sources of the AWS CloudFormation templates used in this post are available from GitHub for your reference.

You now know how to build a real-time streaming application using Kinesis Data Analytics on AWS. You can also calibrate KPUs based on your application needs and volume of data. Check out Part 2 of this post to explore advanced monitoring techniques and auto scale your real-time streaming application, adapting with streaming data.


About the Author

Amit Chowdhury is a Partner Solutions Architect in the Global System Integrator (GSI) team at Amazon Web Services. He helps AWS GSI partners migrate customer workloads to AWS, and provides guidance to build, design, and architect scalable, highly available, and secure solutions on AWS by applying AWS recommended best practices. He enjoys spending time with his family, outdoor adventures and traveling.

 

Saurabh Shrivastava is a solutions architect leader and analytics specialist working with global systems integrators. He works with AWS Partners and customers to provide them with architectural guidance for building scalable architecture in hybrid and AWS environments. He enjoys spending time with his family outdoors and traveling to new destinations to discover new cultures.

Data preparation using Amazon Redshift with AWS Glue DataBrew

Post Syndicated from Dhiraj Thakur original https://aws.amazon.com/blogs/big-data/data-preparation-using-amazon-redshift-with-aws-glue-databrew/

With AWS Glue DataBrew, data analysts and data scientists can easily access and visually explore any amount of data across their organization directly from their Amazon Simple Storage Service (Amazon S3) data lake, Amazon Redshift data warehouse, Amazon Aurora, and other Amazon Relational Database Service (Amazon RDS) databases. You can choose from over 250 built-in functions to merge, pivot, and transpose the data without writing code.

Now, with added support for JDBC-accessible databases, DataBrew also supports additional data stores, including PostgreSQL, MySQL, Oracle, and Microsoft SQL Server. In this post, we use DataBrew to clean data from an Amazon Redshift table, and transform and use different feature engineering techniques to prepare data to build a machine learning (ML) model. Finally, we store the transformed data in an S3 data lake to build the ML model in Amazon SageMaker.

Use case overview

For our use case, we use mock student datasets that contain student details like school, student ID, name, age, student study time, health, country, and marks. The following screenshot shows an example of our data.

For our use case, the data scientist uses this data to build an ML model that predicts a student's score in an upcoming annual exam. However, this raw data requires cleaning and transformation. A data engineer must perform the required data transformation so the data scientist can use the transformed data to build the model in SageMaker.

Solution overview

The following diagram illustrates our solution architecture.

The workflow includes the following steps:

  1. Create a JDBC connection for Amazon Redshift and a DataBrew project.
  2. DataBrew queries sample student performance data from Amazon Redshift and performs the transformation and feature engineering required to prepare the data for building the ML model.
  3. The DataBrew job writes the final output to our S3 output bucket.
  4. The data scientist builds the ML model in SageMaker to predict student marks in an upcoming annual exam.

We cover steps 1–3 in this post.

Prerequisites

To complete this solution, you should have an AWS account.

Prelab setup

Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

For our use case, we use a mock dataset. You can download the DDL and data files from GitHub.

  1. Create the Amazon Redshift cluster to capture the student performance data.
  2. Set up a security group for Amazon Redshift.
  3. Create a schema called student_schema and a table called study_details. You can use the DDL script to create the database objects.
  4. We recommend using the COPY command to load a table in parallel from data files on Amazon S3. However, for this post, you can use study_details.sql to insert the data in the tables.

Create an Amazon Redshift connection

To create your Amazon Redshift connection, complete the following steps:

  1. On the DataBrew console, choose Datasets.
  2. On the Connections tab, choose Create connection.
  3. For Connection name, enter a name (for example, student-db-connection).
  4. For Connection type, select JDBC.
  5. Provide other parameters like the JDBC URL and login credentials.
  6. In the Network options section, choose the VPC, subnet, and security groups of your Amazon Redshift cluster.
  7. Choose Create connection.

Create datasets

To create the datasets, complete the following steps:

  1. On the Datasets page of the DataBrew console, choose Connect new dataset.
  2. For Dataset name, enter a name (for example, student).
  3. For Your JDBC source, choose the connection you created (AwsGlueDatabrew-student-db-connection).
  4. Select the study_details table.
  5. For Enter S3 destination, enter an S3 bucket for Amazon Redshift to store the intermediate result.
  6. Choose Create dataset.

You can also configure a lifecycle rule to automatically clean up old files from the S3 bucket.
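As an example, a minimal Boto3 sketch for such a lifecycle rule might look like the following; the bucket name, prefix, and retention period are placeholders.

import boto3

s3 = boto3.client('s3')

# expire intermediate files under the given prefix after 7 days (values are placeholders)
s3.put_bucket_lifecycle_configuration(
    Bucket='my-databrew-intermediate-bucket',
    LifecycleConfiguration={
        'Rules': [{
            'ID': 'expire-redshift-intermediate-results',
            'Status': 'Enabled',
            'Filter': {'Prefix': 'databrew-intermediate/'},
            'Expiration': {'Days': 7}
        }]
    }
)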

Create a project using the datasets

To create your DataBrew project, complete the following steps:

  1. On the DataBrew console, on the Projects page, choose Create project.
  2. For Project Name, enter student-proj.
  3. For Attached recipe, choose Create new recipe.

The recipe name is populated automatically.

  4. For Select a dataset, select My datasets.
  5. Select the student dataset.
  6. For Role name, choose the AWS Identity and Access Management (IAM) role to be used with DataBrew.
  7. Choose Create project.

You can see a success message along with our Amazon Redshift study_details table with 500 rows.

After the project is opened, a DataBrew interactive session is created. DataBrew retrieves sample data based on your sampling configuration selection.

Create a profiling job

DataBrew helps you evaluate the quality of your data by profiling it to understand data patterns and detect anomalies.

To create your profiling job, complete the following steps:

  1. On the DataBrew console, choose Jobs in the navigation pane.
  2. On the Profile jobs tab, choose Create job.
  3. For Job name, enter student-profile-job.
  4. Choose the student dataset.
  5. Provide the S3 location for job output.
  6. For Role name, choose the role to be used with DataBrew.
  7. Choose Create and run job.

Wait for the job to complete.

  8. Choose the Column statistics tab.

You can see that the age column has some missing values.

You can also see that the study_time_in_hr column has two outliers.

Build a transformation recipe

All ML algorithms use input data to generate outputs. Input data comprises features usually in structured columns. To work properly, the features need to have specific characteristics. This is where feature engineering comes in. In this section, we perform some feature engineering techniques to prepare our dataset to build the model in SageMaker.

Let’s drop the unnecessary columns from our dataset that aren’t required for model building.

  1. Choose Column and choose Delete.
  2. For Source columns, choose the columns school_name, first_name, and last_name.
  3. Choose Apply.

We know from the profiling report that the age value is missing in two records. Let’s fill in the missing value with the median age of other records.

  1. Choose Missing and choose Fill with numeric aggregate.
  2. For Source column, choose age.
  3. For Numeric aggregate, choose Median.
  4. For Apply transform to, select All rows.
  5. Choose Apply.

We know from the profiling report that the study_time_in_hr column has two outliers, which we can remove.

  1. Choose Outliers and choose Remove outliers.
  2. For Source column, choose study_time_in_hr.
  3. Select Z-score outliers.
  4. For Standard deviation threshold, choose 3.
  5. Select Remove outliers.
  6. Under Remove outliers, select All outliers.
  7. Under Outlier removal options, select Delete outliers.
  8. Choose Apply.
  9. Choose Delete rows and choose Apply.

The next step is to convert the categorical value to a numerical value for the gender column.

  1. Choose Mapping and choose Categorical mapping.
  2. For Source column, choose gender.
  3. For Mapping options, select Map top 1 values.
  4. For Map values, select Map values to numeric values.
  5. For M, choose 1.
  6. For Others, choose 2.
  7. For Destination column, enter gender_mapped.
  8. For Apply transform to, select All rows.
  9. Choose Apply.

ML algorithms often can’t work on label data directly, requiring the input variables to be numeric. One-hot encoding is one technique that converts categorical data that doesn’t have an ordinal relationship with each other to numeric data.
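DataBrew applies this transform for us in the steps that follow; purely as a point of reference, the same idea expressed in pandas on a hypothetical sample of the student data looks like this:

import pandas as pd

# hypothetical sample of the student data
df = pd.DataFrame({'student_id': [1, 2, 3], 'health': ['good', 'poor', 'good']})

# one-hot encode the categorical column: one 0/1 column per category value
encoded = pd.get_dummies(df, columns=['health'], prefix='health')
print(encoded)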

To apply one-hot encoding, complete the following steps:

  1. Choose Encode and choose One-hot encode column.
  2. For Source column, choose health.
  3. For Apply transform to, select All rows.
  4. Choose Apply.

The following screenshot shows the full recipe that we applied to our dataset before we can use it to build our model in SageMaker.

Run the DataBrew recipe job on the full data

Now that we have built the recipe, we can create and run a DataBrew recipe job.

  1. On the project details page, choose Create job.
  2. For Job name, enter student-performance.

We use CSV as the output format.

  3. For File type, choose CSV.
  4. For Role name, choose an existing role or create a new one.
  5. Choose Create and run job.
  6. Navigate to the Jobs page and wait for the student-performance job to complete.
  7. Choose the Destination link to navigate to Amazon S3 to access the job output.

Clean up

Delete the following resources that might accrue cost over time:

  • The Amazon Redshift cluster
  • The recipe job student-performance
  • The job output stored in your S3 bucket
  • The IAM roles created as part of projects and jobs
  • The DataBrew project student-proj and its associated recipe student-proj-recipe
  • The DataBrew datasets

Conclusion

In this post, we saw how to create a JDBC connection for an Amazon Redshift data warehouse. We learned how to use this connection to create a DataBrew dataset for an Amazon Redshift table. We also saw how easily we can bring data from Amazon Redshift into DataBrew, seamlessly apply transformations and feature engineering techniques, and run recipe jobs that refresh the transformed data for ML model building in SageMaker.


About the Author

Dhiraj Thakur is a Solutions Architect with Amazon Web Services. He works with AWS customers and partners to provide guidance on enterprise cloud adoption, migration, and strategy. He is passionate about technology and enjoys building and experimenting in the analytics and AI/ML space.

 

 

 

Build a real-time streaming analytics pipeline with the AWS CDK

Post Syndicated from Cody Penta original https://aws.amazon.com/blogs/big-data/build-a-real-time-streaming-analytics-pipeline-with-the-aws-cdk/

A recurring business need is the ability to capture data in near-real time and act on significant events close to the moment they happen. For example, you may want to tap into a data stream and monitor for anomalies that need to be addressed immediately, rather than waiting for a nightly batch. Building these types of solutions from scratch can be complex: you have to configure a cluster of nodes and onboard your application to use the cluster. Not only that, but maintaining, patching, and upgrading these clusters takes valuable time and effort away from business-impacting goals.

In this post, we look at how we can use managed services such as Amazon Kinesis to handle our incoming data streams while AWS handles the undifferentiated heavy lifting of managing the infrastructure, and how we can use the AWS Cloud Development Kit (AWS CDK) to provision, build, and reason about our infrastructure.

Overview of architecture

The following diagram illustrates our real-time streaming data analytics architecture.

real-time streaming data analytics architecture

This architecture has two main modules, a hot and a cold module, both of which build off an Amazon Kinesis Data Streams stream that receives end-user transactions. Our hot module has an Amazon Kinesis Data Analytics app listening in on the stream for any abnormally high values. If an anomaly is detected, Kinesis Data Analytics invokes our AWS Lambda function with the abnormal payload. The function fans out the payload to Amazon Simple Notification Service (Amazon SNS), which notifies anybody subscribed, and stores the abnormal payload into Amazon DynamoDB for later analysis by a custom web application.

Our cold module has an Amazon Kinesis Data Firehose delivery stream that reads the raw data off of our stream, compresses it, and stores it in Amazon Simple Storage Service (Amazon S3) to later run complex analytical queries against our raw data. We use the higher-level abstractions that the AWS CDK provides to help onboard and provision the necessary infrastructure to start processing the stream.

Before we begin, a quick note about the levels of abstraction the AWS CDK provides. The AWS CDK revolves around a fundamental building block called a construct. These constructs have three abstraction levels:

  • L1 – A one-to-one mapping to AWS CloudFormation
  • L2 – An intent-based API
  • L3 – A high-level pattern

You can mix these levels of abstractions, as we see in the upcoming code.

Solution overview

We can accomplish this architecture with a series of brief steps:

  1. Start a new AWS CDK project.
  2. Provision a root Kinesis data stream.
  3. Construct our cold module with Kinesis Data Firehose and Amazon S3.
  4. Construct our hot module with Kinesis Data Analytics, Lambda, and Amazon SNS.
  5. Test the application’s functionality.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Start a new AWS CDK project

We can bootstrap an AWS CDK project by installing the AWS CDK CLI tool through our preferred node dependency manager.

  1. In your preferred terminal, install the AWS CDK CLI tool.
    1. For npm, run npm install -g aws-cdk.
    2. For Yarn, run yarn global add aws-cdk.
  2. Make a project directory with mkdir <project-name>.
  3. Move into the project with cd <project-name>.
  4. With the CLI tool installed, run cdk init app --language [javascript|typescript|java|python|csharp|go].
    1. For this post, we use TypeScript (cdk init app --language typescript).

Following these steps builds the initial structure of your AWS CDK project.

Provision a root Kinesis data stream

Let’s get started with the root stream. We use this stream as a baseline to build our hot and cold modules. For this root stream, we use Kinesis Data Streams because it provides us the capability to capture, process, and store data streams in a reliable and scalable manner. Making this stream with the AWS CDK is quite easy. It’s one line of code:

    // rootStream is the raw Kinesis stream that we build the other modules on top of.
    const rootStream = new kinesis.Stream(this, 'RootStream')

It’s only one line because the AWS CDK has a concept of sensible defaults. If we want to override these defaults, we explicitly pass in a third argument, commonly known as props:

    // rootStream is the raw Kinesis stream that we build the other modules on top of.
    const rootStream = new kinesis.Stream(this, 'RootStream', {
      encryption: StreamEncryption.KMS
    })

Construct a cold module with Kinesis Data Firehose and Amazon S3

Now that our data stream is defined, we can work on our first objective, the cold module. This module intends to capture, buffer, and compress the raw data flowing through this data stream into an S3 bucket. Putting the raw data in Amazon S3 allows us to run a plethora of analytical tools on top of it to build data visualization dashboards or run ad hoc queries.

We use Kinesis Data Firehose to buffer, compress, and load data streams into Amazon S3, which serves as a data store to persist streaming data for later analysis.

In the following AWS CDK code, we plug in Kinesis Data Firehose to our stream and configure it appropriately to load data into Amazon S3. One crucial prerequisite we need to address is that services don’t talk to each other without explicit permission. So, we have to first define the IAM roles our services assume to communicate with each other along with the destination S3 bucket.

    // S3 bucket that will serve as the destination for our raw compressed data
    const rawDataBucket = new s3.Bucket(this, "RawDataBucket", {
      removalPolicy: cdk.RemovalPolicy.DESTROY, // REMOVE FOR PRODUCTION
      autoDeleteObjects: true, // REMOVE FOR PRODUCTION
    })

    const firehoseRole = new iam.Role(this, 'firehoseRole', {
        assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com')
    });

    rootStream.grantRead(firehoseRole)
    rootStream.grant(firehoseRole, 'kinesis:DescribeStream')
    rawDataBucket.grantWrite(firehoseRole)

iam.Role is an L2 construct with a higher-level concept of grants. Grants abstract IAM policies to simple read and write mechanisms with the ability to add individual actions, such as kinesis:DescribeStream, if the default read permissions aren’t enough. The grant family of functions allows us to strike a delicate balance between least privilege and code maintainability. Now that we have the appropriate permission, let’s define our Kinesis Data Firehose delivery stream.

By default, the AWS CDK tries to protect you from deleting valuable data stored in Amazon S3. For development and POC purposes, we override the default with cdk.RemovalPolicy.DESTROY to appropriately clean up leftover S3 buckets:

    const firehoseStreamToS3 = new kinesisfirehose.CfnDeliveryStream(this, "FirehoseStreamToS3", {
      deliveryStreamName: "StreamRawToS3",
      deliveryStreamType: "KinesisStreamAsSource",
      kinesisStreamSourceConfiguration: {
        kinesisStreamArn: rootStream.streamArn,
        roleArn: firehoseRole.roleArn
      },
      s3DestinationConfiguration: {
        bucketArn: rawDataBucket.bucketArn,
        bufferingHints: {
          sizeInMBs: 64,
          intervalInSeconds: 60
        },
        compressionFormat: "GZIP",
        encryptionConfiguration: {
          noEncryptionConfig: "NoEncryption"
        },
    
        prefix: "raw/",
        roleArn: firehoseRole.roleArn
      },
    })

    // Ensures our role is created before we try to create a Kinesis Firehose
    firehoseStreamToS3.node.addDependency(firehoseRole)

The Cfn prefix is a good indication that we’re working with an L1 construct (direct mapping to AWS CloudFormation). Because we’re working at a lower-level API, we should be aware of the following:

  • It’s lengthier because there’s no such thing as sensible defaults
  • We’re passing in Amazon Resource Names (ARNs) instead of resources themselves
  • We have to ensure resources provision in the proper order, hence the addDependency() function call

Because of the differences between working with L1 and L2 constructs, it’s best to minimize interactions between them to avoid confusion. One way of doing so is defining an L2 construct yourself, if the project timeline allows it. A template can be found on GitHub.

A general guideline for deciding when to be explicit about which construct depends on others, as in the preceding example, is to look at where you pass ARNs. ARNs are only available after a resource is provisioned, so you need to ensure that resource is created before it's referenced elsewhere.

That’s it! We’ve constructed our cold pipeline! Now let’s work on the hot module.

Construct a hot module with Kinesis Data Analytics, Amazon SNS, Lambda, and DynamoDB

In the previous section, we constructed a cold pipeline to capture the raw data in its entirety for ad hoc visualizations and analytics. The purpose of the hot module is to listen to the data stream for any abnormal values as data flows through it. If an odd value is detected, we should log it and alert stakeholders. For our use case, we define “abnormal” as an unusually high transaction (over 9000).

Databases, and whatever else appears at the end of an architecture diagram, usually appear first in AWS CDK code, because defining them first allows upstream components to reference downstream values. For example, we need the database's name before we can provision a Lambda function that interacts with that database.

Let’s start provisioning the web app, DynamoDB table, SNS topic, and Lambda function:

    // The DynamoDB table that stores anomalies detected by our kinesis analytic app
    const abnormalsTable = new dynamodb.Table(this, 'AbnormalityTable', {
      partitionKey: { name: 'transactionId', type: dynamodb.AttributeType.STRING },
      sortKey: { name: 'createdAt', type: dynamodb.AttributeType.STRING },
      removalPolicy: cdk.RemovalPolicy.DESTROY // REMOVE FOR PRODUCTION
    })

    // TableViewer is a high level demo construct of a web app that will read and display values from DynamoDB
    const tableViewer = new TableViewer(this, 'TableViewer', {
      title: "Real Time High Transaction Table",
      table: abnormalsTable,
      sortBy: "-createdAt"
    })

    // SNS Topic that alerts anyone subscribed to an anomaly detected by the kinesis analytic application 
    const abnormalNotificationTopic = new sns.Topic(this, 'AbnormalNotification', {
      displayName: 'Abnormal detected topic'
    });
    abnormalNotificationTopic.addSubscription(new snssub.EmailSubscription('[email protected]'))

    // Lambda function that reads output from our kinesis analytic app and fans out to the above SNS and DynamoDB table
    const fanoutLambda = new lambda.Function(this, "LambdaFanoutFunction", {
      runtime: lambda.Runtime.PYTHON_3_8,
      handler: 'fanout.handler',
      code: lambda.Code.fromAsset('lib/src'),
      environment: {
        TABLE_NAME: abnormalsTable.tableName,
        TOPIC_ARN: abnormalNotificationTopic.topicArn
      }
    })
    abnormalNotificationTopic.grantPublish(fanoutLambda)
    abnormalsTable.grantReadWriteData(fanoutLambda)

In the preceding code, we define our DynamoDB table to store the entire abnormal transaction, and a table viewer construct that reads our table and creates a public web app for end-users to consume. We also want to alert operators when an abnormality is detected, which we do by constructing an SNS topic with a subscription to [email protected]. This email could be your team's distribution list. Lastly, we define a Lambda function that serves as the glue between our upcoming Kinesis Data Analytics application, the DynamoDB table, and the SNS topic. The following is the actual code inside the Lambda function:

"""fanout.py reads from kinesis analytic output and fans out to SNS and DynamoDB."""

import base64
import json
import os
from datetime import datetime

import boto3

ddb = boto3.resource("dynamodb")
sns = boto3.client("sns")


def handler(event, context):
    payload = event["records"][0]["data"]
    data_dump = base64.b64decode(payload).decode("utf-8")
    data = json.loads(data_dump)

    table = ddb.Table(os.environ["TABLE_NAME"])

    item = {
        "transactionId": data["transactionId"],
        "name": data["name"],
        "city": data["city"],
        "transaction": data["transaction"],
        "bankId": data["bankId"],
        "createdAt": data["createdAt"],
        "customEnrichment": data["transaction"] + 500,  # Everyone gets an extra $500 woot woot
        "inspectedAt": str(datetime.now())
    }

    # Best effort, Kinesis Analytics Output is "at least once" delivery, meaning this lambda function can be invoked multiple times with the same item
    # We can ensure idempotency with a condition expression
    table.put_item(
        Item=item,
        ConditionExpression="attribute_not_exists(inspectedAt)"
    )
    sns.publish(
        TopicArn=os.environ["TOPIC_ARN"],
        Message=json.dumps(item)
    )

    return {"statusCode": 200, "body": json.dumps(item)}

We don’t have much to code when using managed services such as Kinesis Data Streams, Kinesis Data Firehose, Kinesis Data Analytics, and Amazon SNS. In this case, we take the output of our Kinesis Data Analytics application and simply copy it to our DynamoDB table, and publish a message to our SNS topic. Both follow the standard pattern of initiating a client and calling the appropriate API with the payload. Speaking of the payload, let’s move upstream to the actual Kinesis Data Analytics app. See the following code:

const streamToAnalyticsRole = new iam.Role(this, 'streamToAnalyticsRole', {
      assumedBy: new iam.ServicePrincipal('kinesisanalytics.amazonaws.com')
    });

    streamToAnalyticsRole.addToPolicy(new iam.PolicyStatement({
      resources: [
        fanoutLambda.functionArn,
        rootStream.streamArn,
      ],
      actions: ['kinesis:*', 'lambda:*'] 
    }));

Although we could use the grant* API in this scenario, we wanted to show the alternative in case you come across an L2 construct that doesn't have the grant feature yet. The addToPolicy() function is more familiar to those who have worked with IAM before: you define which resources the trusted entity can act on and which actions it's allowed to take on them. As for the actual Kinesis Data Analytics application, see the following code:

const thresholdDetector = new kinesisanalytics.CfnApplication(this, "KinesisAnalyticsApplication", {
      applicationName: 'abnormality-detector',
      applicationCode: fs.readFileSync(path.join(__dirname, 'src/app.sql')).toString(),
      inputs: [
        {
          namePrefix: "SOURCE_SQL_STREAM",
          kinesisStreamsInput: {
            resourceArn: rootStream.streamArn,
            roleArn: streamToAnalyticsRole.roleArn
          },
          inputParallelism: { count: 1 },
          inputSchema: {
            recordFormat: {
              recordFormatType: "JSON",
              mappingParameters: { jsonMappingParameters: { recordRowPath: "$" } }
            },
            recordEncoding: "UTF-8",
            recordColumns: [
              {
                name: "transactionId",
                mapping: "$.transactionId",
                sqlType: "VARCHAR(64)"
              },
              {
                  name: "name",
                  mapping: "$.name",
                  sqlType: "VARCHAR(64)"
              },
              {
                  name: "age",
                  mapping: "$.age",
                  sqlType: "INTEGER"
              },
              {
                  name: "address",
                  mapping: "$.address",
                  sqlType: "VARCHAR(256)"
              },
              {
                  name: "city",
                  mapping: "$.city",
                  sqlType: "VARCHAR(32)"
              },
              {
                  name: "state",
                  mapping: "$.state",
                  sqlType: "VARCHAR(32)"
              },
              {
                  name: "transaction",
                  mapping: "$.transaction",
                  sqlType: "INTEGER"
              },
              {
                  name: "bankId",
                  mapping: "$.bankId",
                  sqlType: "VARCHAR(32)"
              },
              {
                  name: "createdAt",
                  mapping: "$.createdAt",
                  sqlType: "VARCHAR(32)"
              }
            ]
          }
        }
      ]
    })
    thresholdDetector.node.addDependency(streamToAnalyticsRole)


    const thresholdDetectorOutput = new kinesisanalytics.CfnApplicationOutput(this, 'AnalyticsAppOutput', {
      applicationName: 'abnormality-detector',
      output: {
        name: "DESTINATION_SQL_STREAM",
        lambdaOutput: {
          resourceArn: fanoutLambda.functionArn,
          roleArn: streamToAnalyticsRole.roleArn
        },
        destinationSchema: {
          recordFormatType: "JSON"
        }
      }
    })
    thresholdDetectorOutput.node.addDependency(thresholdDetector)

The AWS CDK code is similar to the way we defined our Kinesis Data Firehose delivery stream because both CfnApplication and CfnApplicationOutput are L1 constructs. There is one subtle difference here, and it's a core benefit of using the AWS CDK even for L1 constructs: for application code, we can read in a file and render it as a string. This mechanism allows us to separate application code from infrastructure code instead of keeping both in a single CloudFormation template file. The following is the SQL code we wrote:

CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
(
    "transactionId"     varchar(64),
    "name"              varchar(64),
    "age"               integer,
    "address"           varchar(256),
    "city"              varchar(32),
    "state"             varchar(32),
    "transaction"       integer,
    "bankId"            varchar(32),
    "createdAt"         varchar(32)
);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM "transactionId", "name", "age", "address", "city", "state", "transaction", "bankId", "createdAt"
    FROM "SOURCE_SQL_STREAM_001"
    WHERE "transaction" > 9000;

That’s it! Now we move on to deployment and testing.

Deploy and test our architecture

To deploy our AWS CDK code, we can open a terminal at the root of the AWS CDK project and run cdk deploy. The AWS CDK outputs a list of security-related changes that you confirm or reject with a yes or no.

When the AWS CDK finishes deploying, it outputs the data stream name and the Amazon CloudFront URL of our web application. Open the CloudFront URL, the Amazon S3 console (specifically the bucket that the AWS CDK app provisioned), and the Python file at scripts/producer.py. The following is the content of that Python file:

"""Producer produces fake data to be inputted into a Kinesis stream."""

import json
import time
import uuid
import random
from datetime import datetime
from pprint import pprint

import boto3

from faker import Faker

# This boots up the kinesis analytic application so you don't have to click "run" on the kinesis analytics console
try:
    kinesisanalytics = boto3.client("kinesisanalyticsv2", region_name="us-east-1")
    kinesisanalytics.start_application(
        ApplicationName="abnormality-detector",
        RunConfiguration={
            'SqlRunConfigurations': [
                {
                    'InputId': '1.1',
                    'InputStartingPositionConfiguration': {
                        'InputStartingPosition': 'NOW'
                    }
                },
            ]
        }
    )
    print("Giving 30 seconds for the kinesis analytics application to boot")
    time.sleep(30)
except kinesisanalytics.exceptions.ResourceInUseException:
    print("Application already running, skipping start up step")

rootStreamName = input("Please enter the stream name that was outputted from cdk deploy - (StreamingSolutionWithCdkStack.RootStreamName): ")
kinesis = boto3.client("kinesis", region_name="us-east-1")
fake = Faker()

# Pre-generate a small pool of bank IDs (SWIFT codes) so that each transaction is assigned to one of them and can later be grouped by bank

banks = []
for _ in range(10):
    banks.append(fake.swift())

while True:
    payload = {
        "transactionId": str(uuid.uuid4()),
        "name": fake.name(),
        "age": fake.random_int(min=18, max=85, step=1),
        "address": fake.address(),
        "city": fake.city(),
        "state": fake.state(),
        "transaction": fake.random_int(min=1000, max=10000, step=1),
        "bankId": banks[random.randrange(0, len(banks))],
        "createdAt": str(datetime.now()),
    }
    response = kinesis.put_record(
        StreamName=rootStreamName, Data=json.dumps(payload), PartitionKey="abc"
    )
    pprint(response)
    time.sleep(1)

The Python script is relatively rudimentary. We take the data stream name as input, construct a Kinesis client, construct a random but realistic payload using the popular faker library, and send that payload to our data stream.

We can run this script with python scripts/producer.py. It boots up our Kinesis Data Analytics application if it hasn't started already and prompts you for the data stream name. After you enter the name and press Enter, you should start seeing Kinesis's responses in your terminal.

Make sure to use python3 instead of python if your default Python command defaults to version 2. You can check your version by entering python --version in your terminal.

Leave the script running until it randomly generates a couple of high transactions. After they’re generated, you can visit the web app’s URL and see table entries for all anomalies there (as in the following screenshot).

By this time, Kinesis Data Firehose has buffered and compressed raw data from the stream and put it in Amazon S3. You can visit your S3 bucket and see your data landing inside the destination path.
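
If you prefer to verify this programmatically rather than through the consoles, the following boto3 sketch scans the DynamoDB table for recorded anomalies and lists the compressed objects under the raw/ prefix. The table and bucket names are placeholders; substitute the physical names from your deployed stack.

    import boto3

    dynamodb = boto3.resource("dynamodb", region_name="us-east-1")
    s3 = boto3.client("s3", region_name="us-east-1")

    # List a few of the anomalies that the fanout Lambda wrote to DynamoDB
    table = dynamodb.Table("REPLACE_WITH_ABNORMALITY_TABLE_NAME")
    for item in table.scan(Limit=10)["Items"]:
        print(item["transactionId"], item["transaction"])

    # Confirm that Kinesis Data Firehose has started landing compressed raw data under raw/
    response = s3.list_objects_v2(Bucket="REPLACE_WITH_RAW_DATA_BUCKET_NAME", Prefix="raw/")
    for obj in response.get("Contents", []):
        print(obj["Key"], obj["Size"])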

Clean up

To clean up any provisioned resources, you can run cdk destroy inside the AWS CDK project and confirm the deletion, and the AWS CDK takes care of cleaning up all the resources.

Conclusion

In this post, we built a real-time application with a secondary cold path that gathers raw data for ad hoc analysis. We used the AWS CDK to provision the core managed services that handle the undifferentiated heavy lifting of a real-time streaming application. We then layered our custom application code on top of this infrastructure to meet our specific needs and tested the flow from end to end.

We covered key code snippets in this post, but if you'd like to see the project in its entirety and deploy the solution yourself, you can visit the AWS GitHub samples repo.


About the Authors

Cody Penta is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in security and CDK and enjoys solving the really difficult problems in the technology world. Off the clock, he loves relaxing in the mountains, coding personal projects, and gaming.

 

 

 

Michael Hamilton is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife, kids, and their German shepherd.

 

Effective data lakes using AWS Lake Formation, Part 4: Implementing cell-level and row-level security

Post Syndicated from Deenbandhu Prasad original https://aws.amazon.com/blogs/big-data/part-4-effective-data-lakes-using-aws-lake-formation-part-4-implementing-cell-level-and-row-level-security/

We announced the preview of AWS Lake Formation transactions, cell-level and row-level security, and acceleration at AWS re:Invent 2020. In Parts 1, 2, and 3 of this series, we explained how to set up governed tables, add streaming and batch data to them, and use ACID transactions. In this post, we focus on cell-level and row-level security and show you how to enforce business needs by restricting access to specific rows.

Effective data lakes using AWS Lake Formation

The goal of modern data lakes is to democratize access to broad datasets to empower data analysts and business users. In these scenarios, data lake security becomes more important than ever. Enterprises want to share their data across groups, departments, and organizations, while balancing their compliance and security needs. A common paradigm used by many enterprises is to restrict data access based on the user profile or the organizations to which the user belongs. Previously, you had to enforce this by duplicating the original data or by creating materialized and non-materialized views of the data based on filtered datasets. However, these solutions break the concept of a single source of truth and result in write amplification, which doubles or triples storage. The large number of copies also increases the management effort because of their complexity.

Lake Formation supports simple row-level security and cell-level security:

  • Basic row-level security allows you to specify filter expressions that limit access to specific rows of a table to a user.
  • Cell-level security builds on row-level security by allowing you to hide or show specific columns along with providing access to specific rows.

In this post, we consider an example use case in which Company A does business across the United States and Canada. They have built an enterprise customer data hub on Amazon Simple Storage Service (Amazon S3), which sources customers from both countries into this central location for company-wide analytics and marketing.

The marketing department of Company A is responsible for organizing promotion campaigns and developing communications content to promote services and product to prospects. The team consumes the data from this enterprise customer data hub to create the targeted campaign.

The marketing team data analysts are divided by country, and the requirement is to restrict analyst access to customer data from their country—analysts from the United States can see only customers from the United States, and analysts from Canada can only access customers from Canada. Additionally, analysts from Canada aren’t allowed to see the date of birth (DoB column) due to local company policy (this restriction is an example of cell-level security).

The dataset used for this post is synthetically generated; the following screenshot shows an example of the data.

Solution overview

Let’s see how you can use the Lake Formation row-level security feature to enforce Company A’s requirements on the data in the S3 data lake. You can apply row-level security to a governed table or to a standard table in Lake Formation. In this post, you apply row-level security on a standard Lake Formation table; you can follow a similar process for a governed table.

We walk through the following high-level steps:

  1. Create a database (lf_rls_blog) and table (customer).
  2. Grant select (row and column) permissions to the users lf-rls-blog-analyst-us and lf-rls-blog-analyst-ca.
  3. Run queries in Amazon Athena as the US and Canada analysts to verify that you only see rows from the user’s respective country (and the appropriate columns).

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • AWS Identity and Access Management (IAM) users, roles, and policies:
    • The three users include lf-rls-blog-manager (data lake administrator), lf-rls-blog-analyst-us (US data analyst), and lf-rls-blog-analyst-ca (Canada data analyst)
  • Lake Formation data lake settings and resources:
    • This includes an S3 bucket with the prefix lf-rowlevel-security-blog-* and the customer data files used in this post

As of this writing, these Lake Formation preview features are available only in us-east-1 and us-west-2. When following the steps in this post, use Region us-east-1. Check the availability of the features in other Regions in the future.

To create your resources, complete the following steps:

  1. Sign in to the AWS CloudFormation console in the us-east-1 Region.
  2. Choose Launch Stack:
  3. Create passwords for the three users.
  4. Review the details on the page and select I acknowledge that AWS CloudFormation might create IAM resources.
  5. Choose Create stack.

Create a database and table

To create your database and table, complete the following steps:

  1. Sign in to the AWS Management Console as the data lake administrator (lf-rls-blog-manager).
  2. On the Lake Formation console, choose Databases in the navigation pane.
  3. Choose Create database.
  4. For Name, enter lf_rls_blog.
  5. If Use only IAM access control for new tables in this database is selected, uncheck it.
  6. Choose Create database.

Next, you create a new data lake table.

  1. In the navigation pane, choose Tables.
  2. Choose Create table.
  3. For Name, enter customer.
  4. For Database, choose the database you just created (lf_rls_blog).
  5. Browse to the customers folder under datafiles in the S3 bucket (starting with lf-rowlevel-security-blog-*) created by the CloudFormation template.
  6. For Classification, select CSV.
  7. For Delimiter, choose Comma:
  8. Choose Upload Schema.
  9. Enter the following JSON code:
    [
      {
        "Name": "customer_id",
        "Type": "bigint"
      },
      {
        "Name": "prefix",
        "Type": "string"
      },
      {
        "Name": "first_Name",
        "Type": "string"
      },
      {
        "Name": "middle_Name",
        "Type": "string"
      },
      {
        "Name": "last_Name",
        "Type": "string"
      },
      {
        "Name": "suffix",
        "Type": "string"
      },
      {
        "Name": "gender",
        "Type": "string"
      },
      {
        "Name": "dob",
        "Type": "string"
      },
      {
        "Name": "phone",
        "Type": "string"
      },
      {
        "Name": "building_number",
        "Type": "bigint"
      },
      {
        "Name": "street_Name",
        "Type": "string"
      },
      {
        "Name": "city",
        "Type": "string"
      },
      {
        "Name": "cust_zip",
        "Type": "bigint"
      },
      {
        "Name": "country",
        "Type": "string"
      }
    ]

  10. Choose Upload.
  11. Choose Submit.

Create data filters and grant permissions

To implement column-level, row-level, and cell-level security, first you create data filters. Then you choose that data filter while granting the select Lake Formation permission on tables. For this use case, you create two data filters: one for the US data analyst and one for the Canada data analyst.

Permitted filter expressions are predicates that obey a subset of the WHERE clause grammar in PartiQL. You can use comparison operators to compare columns with constants. The following are the supported operators:

  • Comparison operators – =, >, <, >=, <=, <>, BETWEEN, IN, LIKE
  • Logical operators – AND, OR
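
This post creates the filters on the Lake Formation console, but you can also script them. The following boto3 sketch assumes the CreateDataCellsFilter API and the parameter shape of the generally available feature; because this capability was in preview at the time of writing, treat the sketch as illustrative only and verify it against the current documentation.

    import boto3

    lakeformation = boto3.client("lakeformation", region_name="us-east-1")
    catalog_id = boto3.client("sts").get_caller_identity()["Account"]

    # US analysts: all columns, only rows where country = 'US'
    lakeformation.create_data_cells_filter(
        TableData={
            "TableCatalogId": catalog_id,
            "DatabaseName": "lf_rls_blog",
            "TableName": "customer",
            "Name": "US Filter",
            "RowFilter": {"FilterExpression": "country='US'"},
            "ColumnWildcard": {"ExcludedColumnNames": []},
        }
    )

    # Canada analysts: exclude the dob column, only rows where country = 'Canada'
    lakeformation.create_data_cells_filter(
        TableData={
            "TableCatalogId": catalog_id,
            "DatabaseName": "lf_rls_blog",
            "TableName": "customer",
            "Name": "Canada Filter",
            "RowFilter": {"FilterExpression": "country='Canada'"},
            "ColumnWildcard": {"ExcludedColumnNames": ["dob"]},
        }
    )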

Let’s first create the data filter for the US analyst.

  1. On the Lake Formation console, choose Data filters in the navigation pane.
  2. Choose Create new filter.
  3. For Data filter name, enter US Filter.
  4. For Target database, choose the lf_rls_blog database.
  5. For Target table, choose the customer table.
  6. For Column-level access, select Access to all columns.
  7. For Row filter expression, enter country='US'.
  8. Choose Create filter.

The US analyst has access to all the columns of US customers only.

Now let’s create a data filter for the Canada analyst.

  1. On the Data filters page, choose Create new filter.
  2. For Data filter name, enter Canada Filter.
  3. For Target database, choose the lf_rls_blog database.
  4. For Target table, choose the customer table.
  5. For Column-level access, select Exclude columns.
  6. For Select columns, choose the dob column.
  7. For Row filter expression, enter country='Canada'.
  8. Choose Create filter.

The Canada analyst now has access to all the columns except dob (date of birth) of Canadian customers only.

Verify both data filters are created by checking the Data filters page.

Now we can grant table and column permissions.

  1. On the Tables page, select the customer table.
  2. On the Actions menu, choose Grant.
  3. For IAM users and roles, choose lf-rls-blog-analyst-us.
  4. Choose Named data catalog resources.
  5. For Databases, choose lf_rls_blog.
  6. For Tables, choose customer.
  7. For Table and column permissions, choose Select.
  8. Under Data permissions, select Advanced cell-level filters.
  9. Select US Filter.
  10. Choose Grant.
  11. Repeat these steps for the lf-rls-blog-analyst-ca user, choosing the lf_rls_blog database and customer table and granting Select permissions.
  12. Select Advanced cell-level filters.
  13. In the Data permissions section, select Canada Filter.
  14. Choose Grant.

Run queries to test permission levels

To utilize Lake Formation preview features in Athena, you need to create a new workgroup named AmazonAthenaLakeFormationPreview and switch to that workgroup before running queries. For more information, see Managing Workgroups. Additionally, for preview you use the lakeformation qualifier for the database and table name, as shown in the following example:

select * from lakeformation.<databasename>.<tablename>
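
If you want to drive the same check from code instead of the Athena query editor, a boto3 sketch looks like the following. It assumes the AmazonAthenaLakeFormationPreview workgroup already exists and uses a placeholder S3 output location.

    import boto3

    athena = boto3.client("athena", region_name="us-east-1")

    # Run the preview query in the dedicated workgroup
    response = athena.start_query_execution(
        QueryString='SELECT * FROM lakeformation."lf_rls_blog"."customer"',
        WorkGroup="AmazonAthenaLakeFormationPreview",
        ResultConfiguration={
            "OutputLocation": "s3://REPLACE_WITH_YOUR_BUCKET/anthenaqueryresults/"
        },
    )
    print(response["QueryExecutionId"])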

Lake Formation implicitly grants all permissions to the table creator. In this use case, lf-rls-blog-manager has SELECT permissions on all rows and columns of the customer table. Let’s first verify permissions for lf-rls-blog-manager by querying the customer table using Athena.

  1. On the Athena console (in Region us-east-1), open the query editor.
  2. Choose set up a query result location in Amazon S3.
  3. Navigate to the S3 bucket starting with lf-rowlevel-security-blog-* and select the folder anthenaqueryresults.
  4. Choose Save.
  5. In the query editor, for Data source, choose AWSDataCatalog.
  6. For Database, choose lf_rls_blog.
  7. Create and switch to the AmazonAthenaLakeFormationPreview workgroup.

You can see the customer table under Tables.

  1. Enter the following query:
    SELECT * FROM lakeformation."lf_rls_blog"."customer"

  2. Choose Run query.

Rows from both countries are displayed to the lf-rls-blog-manager user.

Next, let's verify the permissions of the lf-rls-blog-analyst-us and lf-rls-blog-analyst-ca users on this table.

  1. Sign in to the console as lf-rls-blog-analyst-us.
  2. Repeat the previous steps on the Athena console (in us-east-1) to set up the query result location.
  3. Switch to the AmazonAthenaLakeFormationPreview workgroup.
  4. Run the following query:
    SELECT * FROM lakeformation."lf_rls_blog"."customer"

Only US customers are shown in the result for the US data analyst.

Now you verify the same for the Canada data analyst.

  1. Sign in to the console as lf-rls-blog-analyst-ca.
  2. Repeat the previous steps on the Athena console (in us-east-1) to set up the query result location.
  3. Switch to the AmazonAthenaLakeFormationPreview workgroup.
  4. Run the following query:
    SELECT * FROM lakeformation."lf_rls_blog"."customer"

Only customers from Canada are visible to the Canada data analyst. Furthermore, the lf-rls-blog-analyst-ca user doesn’t have access to the dob column.

Clean up

For the final step, clean up the resources you created:

  1. Sign in to the console as lf-rls-blog-manager.
  2. On the Lake Formation console, choose Databases in the navigation pane.
  3. Select the database lf_rls_blog.
  4. On the Action menu, choose Delete.
  5. Delete the anthenaqueryresults folder from the Amazon S3 bucket with the prefix lf-rowlevel-security-blog-*.
  6. Log in as the user who launched the CloudFormation stack in this post.
  7. On the AWS CloudFormation console, delete the stack lf-rowlevel-security-blog.

When you delete the stack, the resources it created are automatically deleted.

Conclusion

In this post, you learned how to implement fine-grained access control on a data lake table using the new row-level security feature of Lake Formation. This feature allows you to easily enforce privacy regulations or corporate governance data access rules on your data lake.

Learn more about Lake Formation and start using these features to build and secure your data lake on Amazon S3 using Lake Formation today, and leave your thoughts and questions in the comments.


About the Authors

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data platforms on the AWS Cloud. Deenbandhu has helped customers of all sizes implement master data management, data warehouse, and data lake solutions.

 

 

 

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue and AWS Lake Formation. His passion is for implementing software artifacts for building data lakes more effectively and easily. During his spare time, he loves to spend time with his family, especially hunting bugs—not software bugs, but bugs like butterflies, pill bugs, snails, and grasshoppers.

 

Work with semistructured data using Amazon Redshift SUPER

Post Syndicated from Satish Sathiya original https://aws.amazon.com/blogs/big-data/work-with-semistructured-data-using-amazon-redshift-super/

With the new SUPER data type and the PartiQL language, Amazon Redshift expands its data warehouse capabilities to natively ingest, store, transform, and analyze semi-structured data. Semi-structured data (such as weblogs and sensor data) falls under the category of data that doesn't conform to the rigid schema expected in relational databases. It often contains complex values such as arrays and nested structures that are associated with serialization formats such as JSON.

The schema of the JSON can evolve over time according to the business use case. Traditional SQL users who are experienced in handling structured data often find it challenging to deal with semi-structured data sources such as nested JSON documents due to lack of SQL support, the need to learn multiple complex functions, and the need to use third-party tools.

This post is part of a series that talks about ingesting and querying semi-structured data in Amazon Redshift using the SUPER data type.

With the introduction of the SUPER data type, Amazon Redshift provides a rapid and flexible way to ingest JSON data and query it without the need to impose a schema. This means that you don’t need to worry about the schema of the incoming document, and can load it directly into Amazon Redshift without any ETL to flatten the data. The SUPER data type is stored in an efficient binary encoded Amazon Redshift native format.

The SUPER data type can represent the following types of data:

  • An Amazon Redshift scalar value:
    • A null
    • A Boolean
    • Amazon Redshift numbers, such as SMALLINT, INTEGER, BIGINT, DECIMAL, or floating point (such as FLOAT4 or FLOAT8)
    • Amazon Redshift string values, such as VARCHAR and CHAR
  • Complex values:
    • An array of values, including scalar or complex
    • A structure, also known as tuple or object, that is a map of attribute names and values (scalar or complex)

For more information about the SUPER type, see Ingesting and querying semistructured data in Amazon Redshift.

After the semi-structured and nested data is loaded into the SUPER data type, you can run queries on it by using the PartiQL extension of SQL. PartiQL is backward-compatible with SQL. It enables seamless querying of semi-structured and structured data and is used by multiple AWS services, such as Amazon DynamoDB, Amazon Quantum Ledger Database (Amazon QLDB), and AWS Glue Elastic Views. With PartiQL, the query engine can work with schema-less SUPER data that originated in serialization formats such as JSON. Familiar SQL constructs seamlessly combine access to both the classic, tabular SQL data and the semi-structured data in SUPER. You can perform object and array navigation, as well as unnesting, with simple and intuitive extensions to SQL semantics.

For more information about PartiQL, see Announcing PartiQL: One query language for all your data.

Use cases

The SUPER data type is useful when processing and querying semi-structured or nested data such as web logs, data from industrial Internet of Things (IoT) machines and equipment, sensors, genomics, and so on. To explain the different features and functionalities of SUPER, we use sample industrial IoT data from manufacturing.

The following diagram shows the sequence of events in which the data is generated, collected, and finally stored in Amazon Redshift as the SUPER data type.

Manufacturers are embracing cloud solutions with connected machines and factories so they can use data to make data-driven decisions, optimize operations, increase productivity, and improve availability while reducing costs.

As an example, the following diagram depicts a common asset hierarchy of a fictional smart manufacturing company.

This data is typically semi-structured and hierarchical in nature. To find insights in this data using traditional methods and tools, you need to extract, preprocess, load, and transform it into a proper structured format to run typical analytical queries in a data warehouse. The time to insight is delayed by these initial data cleansing and transformation steps, which are typically performed using third-party tools or other AWS services.

Amazon Redshift SUPER handles these use cases by helping manufacturers extract, load, and query (without any transformation) a variety of data sources collected from edge computing and industrial IoT devices. Let’s use this sample dataset to drive our examples to explain the capabilities of Amazon Redshift SUPER.

The following is an example subscription dataset for assets (such as crowder, gauge, and pneumatic cylinder) in the workshop, which collects metrics on different properties (such as air pressure, machine state, finished parts count, and failures). Data on these metrics is generated continuously in a time series fashion and pushed to the AWS Cloud using the AWS IoT SiteWise connector. This specific example collects the Machine State property for an asset. The following sample data called subscriptions.json has scalar columns like topicId and qos, and a nested array called messages, which has metrics on the assets.

The following is another dataset called asset_metadata.json, which describes different assets and their properties, more like a dimension table. In this example, the asset_name is IAH10 Line 1, which is a processing plant line for a specific site.

Load data into SUPER columns

Now that we covered the basics behind Amazon Redshift SUPER and the industrial IoT use case, let’s look at different ways to load this dataset.
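
You can run the COPY and SQL statements in the rest of this post from any SQL client connected to your cluster. If you'd rather script them, the Redshift Data API works as well; the following is a minimal boto3 sketch in which the cluster identifier, database, and user are placeholders.

    import time

    import boto3

    redshift_data = boto3.client("redshift-data", region_name="us-east-1")

    # Submit a statement asynchronously
    response = redshift_data.execute_statement(
        ClusterIdentifier="REPLACE_WITH_CLUSTER_ID",
        Database="dev",
        DbUser="awsuser",
        Sql="SELECT 1;",
    )

    # Wait for the statement to finish, then fetch the result set
    while True:
        status = redshift_data.describe_statement(Id=response["Id"])["Status"]
        if status in ("FINISHED", "FAILED", "ABORTED"):
            break
        time.sleep(1)

    if status == "FINISHED":
        print(redshift_data.get_statement_result(Id=response["Id"])["Records"])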

Copy the JSON document into multiple SUPER data columns

In this use case for handling semi-structured data, the user knows the incoming data’s top-level schema and structure, but some of the columns are semi-structured and nested in structures or arrays. You can choose to shred a JSON document into multiple columns that can be a combination of the SUPER data type and Amazon Redshift scalar types. The following code shows what the table DDL looks like if we translate the subscriptions.json semi-structured schema into a SUPER equivalent:

CREATE TABLE subscription_auto (
topicId INT
,topicFilter VARCHAR
,qos INT
,messages SUPER
);

To load data into this table, specify the auto option along with FORMAT JSON in the COPY command to split the JSON values across multiple columns. The COPY command matches the JSON attributes with column names and allows nested values, such as JSON arrays and objects, to be ingested as SUPER values. Run the following command to ingest data into the subscription_auto table. Replace the AWS Identity and Access Management (IAM) role with your own credentials. The ignorecase option passed along with auto is required only if your JSON attribute names are in CamelCase; in our case, the scalar columns topicId and topicFilter are.

COPY subscription_auto FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions/' 
IAM_ROLE '<< your IAM role >>' 
FORMAT JSON 'auto ignorecase';

A select * from subscription_auto command looks like the following code. The messages SUPER column holds the entire array in this case.

-- A sample output
SELECT * FROM subscription_auto;
[ RECORD 1 ]
topicid     | 1001
topicfilter | $aws/sitewise/asset-models/+/assets/+/properties/+
qos         | 0
messages    | [{"format":"json","topic":"$aws\/sitewise\/asset-models\/8926cf44-14ea-4cd8-a7c6-e61af641dbeb\/assets\/0aaf2aa2-0299-442a-b2ea-ecf3d62f2a2c\/properties\/3ff67d41-bf69-4d57-b461-6f1513e127a4","timestamp":1616381297183,"payload":{"type":"PropertyValueUpdate","payload":{"assetId":"0aaf2aa2-0299-442a-b2ea-ecf3d62f2a2c","propertyId":"3ff67d41-bf69-4d57-b461-6f1513e127a4","values":[{"timestamp":{"timeInSeconds":1616381289,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381292,"offsetInNanos":425000000},"quality":"FAIR","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381286,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381293,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381287,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381290,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381290,"offsetInNanos":925000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381294,"offsetInNanos":425000000},"quality":"FAIR","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381285,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381288,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}}]}}}]

Alternatively, you can specify jsonpaths to load the same data, as in the following code. The jsonpaths option is helpful if you want to load only selective columns from your JSON document. For more information, see COPY from JSON format.

COPY subscription_auto FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions/' 
IAM_ROLE '<< your IAM role >>' 
FORMAT JSON 's3://redshift-downloads/semistructured/super-blog/jsonpaths/subscription_jsonpaths.json';

subscription_jsonpaths.json looks like the following:

{"jsonpaths": [
    "$.topicId",
    "$.topicFilter",
    "$.qos",
    "$.messages"
   ]
}

While we’re in the section about loading, let’s also create the asset_metadata table and load it with relevant data, which we need in our later examples. The asset_metadata table has more information about industry shop floor assets and their properties like asset_name, property_name, and model_id.

CREATE TABLE asset_metadata (
asset_id VARCHAR
,asset_name VARCHAR
,asset_model_id VARCHAR
,asset_property_id VARCHAR
,asset_property_name VARCHAR
,asset_property_data_type VARCHAR
,asset_property_unit VARCHAR
,asset_property_alias VARCHAR
);

COPY asset_metadata FROM 's3://redshift-downloads/semistructured/super-blog/asset_metadata/' 
IAM_ROLE '<< your IAM role >>' 
FORMAT JSON 'auto';

Copy the JSON document into a single SUPER column

You can also load a JSON document into a single SUPER data column. This is typical when the schema of the incoming JSON is unknown and evolving. SUPER’s schema-less ingestion comes to the forefront here, letting you load the data in a flexible fashion.

For this use case, assume that we don't know the names of the columns in subscriptions.json and want to load it into Amazon Redshift. It's as simple as the following code:

CREATE TABLE subscription_noshred (s super);

After the table is created, we can use the COPY command to ingest data from Amazon Simple Storage Service (Amazon S3) into the single SUPER column. The noshred option is required along with FORMAT JSON; it tells the COPY parser not to shred the document but to load it into a single column.

COPY subscription_noshred FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions/' 
IAM_ROLE '<< your IAM role >>' 
FORMAT JSON 'noshred';

After the COPY has successfully ingested the JSON, the subscription_noshred table has a SUPER column s that contains the data of the entire JSON object. The ingested data maintains all the properties of the JSON nested structure but in a SUPER data type.

The following code shows how a select star (*) from subscription_noshred looks; the entire JSON structure is in the SUPER column s:

--A sample output
SELECT * FROM subscription_noshred;
[ RECORD 1 ]
s | {"topicId":1001,"topicFilter":"$aws\/sitewise\/asset-models\/+\/assets\/+\/properties\/+","qos":0,"messages":[{"format":"json","topic":"$aws\/sitewise\/asset-models\/8926cf44-14ea-4cd8-a7c6-e61af641dbeb\/assets\/0aaf2aa2-0299-442a-b2ea-ecf3d62f2a2c\/properties\/3ff67d41-bf69-4d57-b461-6f1513e127a4","timestamp":1616381297183,"payload":{"type":"PropertyValueUpdate","payload":{"assetId":"0aaf2aa2-0299-442a-b2ea-ecf3d62f2a2c","propertyId":"3ff67d41-bf69-4d57-b461-6f1513e127a4","values":[{"timestamp":{"timeInSeconds":1616381289,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381292,"offsetInNanos":425000000},"quality":"FAIR","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381286,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381293,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381287,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381290,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381290,"offsetInNanos":925000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381294,"offsetInNanos":425000000},"quality":"FAIR","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381285,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}},{"timestamp":{"timeInSeconds":1616381288,"offsetInNanos":425000000},"quality":"GOOD","value":{"doubleValue":0}}]}}}]}

Similar to the noshred option, we can also use jsonpaths to load complete documents. This can be useful in cases where we want to extract additional columns, such as distribution and sort keys, while still loading the complete document as a SUPER column. In the following example, we map our first column to the root JSON object, while also mapping a distribution key to the second column, and a sort key to the third column.

subscription_sorted_jsonpaths.json looks like the following:

{"jsonpaths": [
"$",
"$.topicId",
"$.topicFilter"
]
}

CREATE TABLE subscription_sorted (
s super
,topicId INT
,topicFilter VARCHAR
) DISTKEY(topicFilter) SORTKEY(topicId);

COPY subscription_sorted FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions/' 
IAM_ROLE '<< your IAM role >>' 
FORMAT JSON 's3://redshift-downloads/semistructured/super-blog/jsonpaths/subscription_sorted_jsonpaths.json';

Copy data from columnar formats like Parquet and ORC

If your semi-structured or nested data is already available in either Apache Parquet or Apache ORC formats, you can use the COPY command with the SERIALIZETOJSON option to ingest data into Amazon Redshift. The Amazon Redshift table structure should match the number of columns and the column data types of the Parquet or ORC files. Amazon Redshift can replace any Parquet or ORC column, including structure and array types, with SUPER data columns. The following are the COPY examples to load from Parquet and ORC format:

--Parquet 
COPY subscription_auto FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions_parquet/' 
IAM_ROLE '<< your IAM role >>' FORMAT PARQUET SERIALIZETOJSON;

--ORC 
COPY subscription_auto FROM 's3://redshift-downloads/semistructured/super-blog/subscriptions_orc/' 
IAM_ROLE '<< your IAM role >>' FORMAT ORC SERIALIZETOJSON;

Apart from COPY, you can load the same data in columnar format in Amazon S3 using Amazon Redshift Spectrum and the INSERT command. The following example assumes the Redshift Spectrum external schema super_workshop and the external table subscription_parquet is already created and available in the database.

We need to set an important session-level configuration parameter for this to work: SET json_serialization_enable TO true. For more information, see Serializing complex nested JSON. This session-level parameter allows you to query top-level nested collection columns as serialized JSON.

/* -- Set the GUC for JSON serialization -- */ 
SET json_serialization_enable TO true;

INSERT INTO subscription_parquet 
SELECT topicId
,topicFilter
,qos
,JSON_PARSE(messages) FROM super_workshop.subscription_parquet;

As of this writing, we can’t use the SUPER column as such as a distribution or sort key, but if we need to use one of the attributes in a SUPER column as a distribution a sort key and keep the entire SUPER column intact as a separate column, we can use the following code as a workaround (apart from the jsonpaths example described earlier). For example, let’s assume the column format within the array messages needs to be used a distribution key for the Amazon Redshift table:

SET json_serialization_enable TO true;

DROP TABLE IF EXISTS subscription_messages;

CREATE TABLE subscription_messages (
m_format VARCHAR
,m_messages super
) distkey (m_format);

--'format' scalar column is extracted from 'messages' super column 
INSERT INTO subscription_messages 
SELECT element.format::VARCHAR
,supercol FROM (
SELECT JSON_PARSE(messages) AS supercol FROM super_workshop.subscription_parquet
) AS tbl,tbl.supercol element;

Apart from using the Amazon Redshift COPY command, we can also ingest JSON-formatted data into Amazon Redshift using the traditional SQL INSERT command:

--Insert example 
INSERT INTO subscription_auto VALUES (
0000
,'topicFiltervalue'
,0000
,JSON_PARSE('[{"format":"json"},{"topic": "sample topic"},[1,2,3]]')
); 

The JSON_PARSE() function parses the incoming data as properly formatted JSON and converts it into the SUPER data type. Without the JSON_PARSE() function, Amazon Redshift ingests the value into SUPER as a single string instead of as a JSON-formatted value.

Query SUPER columns

Amazon Redshift uses the PartiQL language to offer SQL-compatible access to relational, semi-structured, and nested data. PartiQL’s extensions to SQL are straightforward to understand, treat nested data as first-class citizens, and seamlessly integrate with SQL. The PartiQL syntax uses dot notation and array subscript for path navigation when accessing nested data.

The Amazon Redshift implementation of PartiQL supports dynamic typing for querying semi-structured data. This enables automatic filtering, joining, and aggregation on the combination of structured, semi-structured, and nested datasets. PartiQL allows the FROM clause items to iterate over arrays and use unnest operations.

The following sections focus on different query access patterns that involve navigating the SUPER data type with path and array navigation, unnest, or joins.

Navigation

We may want to find an array element by its index, or we may want to find the positions of certain elements in their arrays, such as the first or last element. We can reference a specific element by using its index within square brackets (the index is 0-based), and within that element we can reference an object or struct by using dot notation.

Let’s use our subscription_auto table to demonstrate the examples. We want to access the first element of the array, and within that we want to know the value of the attribute format:

SELECT messages[0].format FROM subscription_auto;
--A sample output
 format
--------
 "json"
 "json"
 "json"
(3 rows)

Amazon Redshift can also use a table alias as a prefix to the notation. The following example is the same query as the previous example:

SELECT sub.messages[0].format FROM subscription_auto AS sub;
--A sample output
 format
--------
 "json"
 "json"
 "json"
(3 rows)

You can use the dot and bracket notations in all types of queries, such as filtering, join, and aggregation. You can use them in a query in which there are normally column references. The following example uses a SELECT statement that filters results:

SELECT COUNT(*) FROM subscription_auto WHERE messages[0].format IS NOT NULL;
-- A sample output
 count
-------
    11
(1 row)

Unnesting and flattening

To unnest or flatten an array, Amazon Redshift uses the PartiQL syntax to iterate over SUPER arrays by using the FROM clause of a query. We continue with the previous example in the following code, which iterates over the values of the array attribute messages:

SELECT c.*, o FROM subscription_auto c, c.messages AS o WHERE o.topic='sample topic';
-- A sample output
                             messages                 |        o
------------------------------------------------------+--------------------------
[{"format":"json"},{"topic":"sample topic"},[1,2,3]]  | {"topic":"sample topic"}
(1 row)

The preceding query has one extension over standard SQL: the c.messages AS o, where o is the alias for the SUPER array messages and serves to iterate over it. In standard SQL, the FROM clause x (AS) y means “for each tuple y in table x.” Similarly, the FROM clause x (AS) y, if x is a SUPER value, translates to “for each (SUPER) value y in (SUPER) array value x.” The projection list can also use the dot and bracket notation for regular navigation.

When unnesting an array, if we want to get the array subscripts (starting from 0) as well, we can specify AT some_index right after unnest. The following example iterates over both the array values and the array subscripts:

SELECT o, index FROM subscription_auto c, c.messages AS o AT index 
WHERE o.topic='sample topic';

-- A sample output 
       o                  | index
--------------------------+-------
{"topic":"sample topic"}  | 1
(1 row)

If we have an array of arrays, we can do multiple unnestings to iterate into the inner arrays. The following example shows how. Note that non-array expressions (the objects inside c.messages) are ignored during unnesting; only the arrays are unnested.

SELECT messages, array, inner_array_element FROM subscription_auto c, c.messages AS array, array AS inner_array_element WHERE array = json_parse('[1,2,3]');

-- A sample output 
    messages                                          | array   | inner_array_element
------------------------------------------------------+---------+---------------------
[{"format":"json"},{"topic":"sample topic"},[1,2,3]]  | [1,2,3] | 1
[{"format":"json"},{"topic":"sample topic"},[1,2,3]]  | [1,2,3] | 2
[{"format":"json"},{"topic":"sample topic"},[1,2,3]]  | [1,2,3] | 3
(3 rows)

Dynamic typing

With the schema-less nature of semi-structured data, the same attribute within the JSON might have values of different types. For example, asset_id from our example might have initially started with integers, then changed to alphanumeric (string) values because of a business decision, before finally settling on an array type. Amazon Redshift SUPER handles this situation by using dynamic typing. Schema-less SUPER data is processed without the need to statically declare the data types before using them in a query. Dynamic typing is most useful in joins and GROUP BY clauses. Although a deep compare of SUPER columns is possible, we recommend restricting joins and aggregations to leaf-level scalar attributes for optimal performance. The following example uses a SELECT statement that requires no explicit casting of the dot and bracket expressions to the usual Amazon Redshift types:

SELECT messages[0].format,
        messages[0].topic
FROM subscription_auto
WHERE messages[0].payload.payload."assetId" > 0;
 
--A sample output
 format | topic
--------+-------
"json"  | "sample topic" 
(1 row)

When your JSON attribute names are in mixed case or CamelCase, the following session parameter is required to query the data. In the preceding query, assetId is in mixed case. For more information, see SUPER configurations.

SET enable_case_sensitive_identifier to TRUE;

The greater than (>) comparison in this query evaluates to true when messages[0].payload.payload."assetId" is greater than 0. In all other cases, the comparison evaluates to false, including the cases where its arguments have different types. The messages[0].payload.payload."assetId" attribute can potentially contain a string, integer, array, or structure, and Amazon Redshift only knows that it is a SUPER data type. This is where dynamic typing helps in processing the schemaless data by evaluating the types at runtime based on the query. For example, processing the first record of "assetId" may result in an integer, whereas the second record can be a string, and so on. When using an SQL operator or function with dot and bracket expressions that have dynamic types, Amazon Redshift produces results similar to using a standard SQL operator or function with the respective static types. In this example, when the dynamic type of the path expression is an integer, the comparison with the integer 0 is meaningful. Whenever the dynamic type of "assetId" is any data type other than an integer, the comparison returns false.

For queries with joins and aggregations, dynamic typing automatically matches values with different dynamic types without performing a long CASE WHEN analysis to find out what data types may appear. The following code is an example of an aggregation query in which we count the number of topics:

SELECT messages[0].format,
count(messages[0].topic)
FROM subscription_auto WHERE messages[0].payload.payload."assetId" > 'abc' GROUP BY 1;

-- a sample output
 format | count
--------+-------
"json"  |   2
(1 row)

For the next join query example, we unnest and flatten the messages array from subscription_auto and join with the asset_metadata table to get the asset_name and property_name based on the asset_id and property_id, which we use as join keys.

Joins on SUPER columns should preferably be on an extracted path, avoiding a deep compare of the entire nested field, for performance. In the following examples, the join keys are extracted path keys and not the whole array:

SELECT c.topicId
,c.qos
,a.asset_name
,o.payload.payload."assetId"
,a.asset_property_name
,o.payload.payload."propertyId"
FROM subscription_auto AS c
,c.messages AS o -- o is the alias for messages array
,asset_metadata AS a
WHERE o.payload.payload."assetId" = a.asset_id AND o.payload.payload."propertyId" = a.asset_property_id AND o.payload.type = 'PropertyValueUpdate';
 
--A sample output 
 topicid | qos |  asset_name  |                assetId                 | asset_property_name |               propertyId
---------+-----+--------------+----------------------------------------+---------------------+----------------------------------------
    1001 |   0 | IAH10 Line 1 | "0aaf2aa2-0299-442a-b2ea-ecf3d62f2a2c" | stop                | "3ff67d41-bf69-4d57-b461-6f1513e127a4"
(1 row)

The following code is another join query that counts the quality of the metrics collected for a specific asset (in this case IAH10 Line 1) and its property (Machine State), and categorizes the counts based on quality:

SELECT v.quality::VARCHAR
,count(*)
FROM subscription_auto AS c
,c.messages AS o -- o is the alias for messages array
,o.payload.payload.VALUES AS v -- v is the alias for values array 
,asset_metadata AS a 
WHERE o.payload.payload."assetId" = a.asset_id 
AND o.payload.payload."propertyId" = a.asset_property_id 
AND a.asset_name = 'IAH10 Line 1' 
AND a.asset_property_name = 'Machine State' 
GROUP BY v.quality;

-- A sample output 
quality   | count
----------+-------
CRITICAL  | 152 
GOOD      | 2926 
FAIR      | 722

Summary

This post discussed the benefits of the new SUPER data type and use cases in which nested data types can help improve storage efficiency and performance, or simplify analysis. Amazon Redshift SUPER along with PartiQL enables you to write queries over relational and hierarchical data models with ease. The next post in this series will focus on how to speed up frequent queries on SUPER columns using materialized views. Try it out and share your experience!


About the Authors

Satish Sathiya is a Senior Product Engineer at Amazon Redshift. He is an avid big data enthusiast who collaborates with customers around the globe to achieve success and meet their data warehousing and data lake architecture needs.

Runyao Chen is a Software Development Engineer at Amazon Redshift. He is passionate about MPP databases and has worked on SQL language features such as querying semistructured data using SUPER. In his spare time, he enjoys reading and exploring new restaurants.

Cody Cunningham is a Software Development Engineer at Amazon Redshift. He works on Redshift’s data ingestion, implementing new features and performance improvements, and recently worked on the ingestion support for SUPER.

Design patterns for an enterprise data lake using AWS Lake Formation cross-account access

Post Syndicated from Satish Sarapuri original https://aws.amazon.com/blogs/big-data/design-patterns-for-an-enterprise-data-lake-using-aws-lake-formation-cross-account-access/

In this post, we briefly walk through the most common design patterns that enterprises adopt to build lake house solutions that support their business agility in a multi-tenant model. These patterns use the AWS Lake Formation cross-account feature to enable a multi-account strategy in which line of business (LOB) accounts produce and consume data from your data lake.

A modern data platform enables a community-driven approach for customers across various industries, such as manufacturing, retail, insurance, healthcare, and many more, through a flexible, scalable solution to ingest, store, and analyze customer domain-specific data to generate the valuable insights they need to differentiate themselves. Building a data lake on Amazon Simple Storage Service (Amazon S3), together with AWS analytic services, sets you on a path to become a data-driven organization.

Overview of Lake House Architecture on AWS

You can deploy data lakes on AWS to ingest, process, transform, catalog, and consume analytic insights using the AWS suite of analytics services, including Amazon EMR, AWS Glue, Lake Formation, Amazon Athena, Amazon QuickSight, Amazon Redshift, Amazon Elasticsearch Service (Amazon ES), Amazon Relational Database Service (Amazon RDS), Amazon SageMaker, and Amazon S3. These services provide the foundational capabilities to realize your data vision, in support of your business outcomes. You can deploy a common data access and governance framework across your platform stack, which aligns perfectly with our own Lake House Architecture.

Large enterprise customers require a scalable data lake with a unified access enforcement mechanism to support their analytics workload. For this, you want to use a single set of single sign-on (SSO) and AWS Identity and Access Management (IAM) mappings to authenticate individual users, and define a single set of fine-grained access controls across various services. The AWS Lake House Architecture encompasses a single management framework; however, the current platform stack requires that you implement workarounds to meet your security policies without compromising on the ability to drive automation, data proliferation, or scale.

The following diagram illustrates the Lake House architecture.

Lake Formation serves as the central point of enforcement for entitlements, consumption, and governing user access. Furthermore, you may want to minimize data movement (copies) across LOBs and evolve toward data mesh methodologies, which are becoming more and more prominent.

Most typical architectures consist of Amazon S3 for primary storage; AWS Glue and Amazon EMR for data validation, transformation, cataloging, and curation; and Athena, Amazon Redshift, QuickSight, and SageMaker for end users to get insight.

Introduction to Lake Formation

Lake Formation is a fully managed service that makes it easy to build, secure, and manage data lakes. Lake Formation simplifies and automates many of the complex manual steps that are usually required to create data lakes. These steps include collecting, cleansing, moving, and cataloging data, and securely making that data available for analytics and ML.

Lake Formation provides its own permissions model that augments the IAM permissions model. This centrally defined permissions model enables fine-grained access to data stored in data lakes through a simple grant or revoke mechanism, much like a relational database management system (RDBMS). Lake Formation permissions are enforced at the table and column level (row level in preview) across the full portfolio of AWS analytics and ML services, including Athena and Amazon Redshift.

With the new cross-account feature of Lake Formation, you can grant access to other AWS accounts to write and share data to or from the data lake to other LOB producers and consumers with fine-grained access. Data lake data (S3 buckets) and the AWS Glue Data Catalog are encrypted with AWS Key Management Service (AWS KMS) customer master keys (CMKs) for security purposes.

Common lake house design patterns using Lake Formation

A typical lake house infrastructure has three major components:

  • Data producer – Publishes the data into the data lake
  • Data consumer – Consumes that data out from the data lake and runs predictive and business intelligence (BI) insights
  • Data platform – Provides infrastructure and an environment to store data assets in the form of a layer cake such as landing, raw, and curated (conformance) data, and establishes security controls between producers and consumers

Although you can construct a data platform in multiple ways, the most common pattern is a single-account strategy, in which the data producer, data consumer, and data lake infrastructure are all in the same AWS account. There is no consensus on whether a single account or multiple accounts is better in most cases, but because of regulatory, security, and performance trade-offs, we have seen customers adopt a multi-account strategy in which data producers and data consumers are in different accounts and the data lake is operated from a central, shared account.

This raises the question of how to manage the data access controls across the multiple accounts that are part of the data analytics platform, to enable seamless ingestion for producers as well as improved business autonomy and agility for the needs of consumers.

With the general availability of the Lake Formation cross-account feature, the ability to manage data-driven access controls is simplified and offers an RDBMS style of managing data lake assets for producers and consumers.

You can drive your enterprise data platform management using Lake Formation as the central location of control for data access management by following various design patterns that balance your company’s regulatory needs and align with your LOB expectation. The following table summarizes different design patterns.

Design Type    | Lake Formation | Glue Data Catalog | Storage (Amazon S3) | Compute
---------------+----------------+-------------------+---------------------+---------------
Centralized    | Centralized    | Centralized       | Centralized         | De-Centralized
De-Centralized | De-Centralized | Centralized       | De-Centralized      | De-Centralized

We explain each design pattern in more detail, with examples, in the following sections.

Terminology

We use the following terms throughout this post when discussing data lake design patterns:

  • LOB – The line of business, such as inventory, marketing, or manufacturing
  • Enterprise data lake account (EDLA) – A centralized AWS account for data lake storage with a centralized AWS Glue Data Catalog and Lake Formation
  • Producer – The process or application producing data for its LOB
  • Consumer – The consumer of the LOB data via AWS services (such as Athena, AWS Glue, Amazon EMR, Amazon Redshift Spectrum, AWS Lambda, and QuickSight)

Centralized data lake design

In a centralized data lake design pattern, the EDLA is a central place to store all the data in S3 buckets along with a central (enterprise) Data Catalog and Lake Formation. The respective LOB producer and consumer accounts have all the required compute to write and read data in and from the central EDLA data, and required fine-grained access is performed using the Lake Formation cross-account feature. That’s why this architecture pattern (see the following diagram) is called a centralized data lake design pattern.

For this post, we use one LOB as an example, which has an AWS account as a producer account that generates data, which can be from on-premises applications or within an AWS environment. This account uses its compute (in this case, AWS Glue) to write data into its respective AWS Glue database. The database is created in the central EDLA where all S3 data is stored using the database link created with the Lake Formation cross-account feature. The same LOB consumer account consumes data from the central EDLA via Lake Formation to perform advanced analytics using services like AWS Glue, Amazon EMR, Redshift Spectrum, Athena, and QuickSight, using the consumer AWS account compute. The following section provides an example.

Create your database, tables and register S3 locations

In the EDLA, complete the following steps:

  1. Register the EDLA S3 bucket path in Lake Formation.
  2. Create a database called edla_lob_a, which points to the EDLA S3 bucket for LOB-A.

  3. Create a customer table in this edla_lob_a database, which points to the EDLA S3 bucket.

The LOB-A producer account can directly write or update data into tables, and create, update, or delete partitions using the LOB-A producer account compute via the Lake Formation cross-account feature.

You can trigger the table creation process from the LOB-A producer AWS account via Lambda cross-account access.

Grant Lake Formation cross-account access

Grant full access to the LOB-A producer account to write, update, and delete data into the EDLA S3 bucket via AWS Glue tables.
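
You can perform this grant on the Lake Formation console or with the AWS CLI. The following is a sketch of an equivalent CLI grant (the database name edla_lob_a comes from the earlier step; the profile, account ID, and exact permission set are placeholders you should adapt). The grant option is what allows the LOB-A data lake admin to further grant the shared resources to its local IAM principals:

aws lakeformation grant-permissions \
--profile <AWS CLI profile of the EDLA account> \
--principal DataLakePrincipalIdentifier=<LOB-A producer account ID> \
--resource '{"Table": {"DatabaseName": "edla_lob_a", "TableWildcard": {}}}' \
--permissions SELECT INSERT DELETE ALTER DESCRIBE \
--permissions-with-grant-option SELECT INSERT DELETE ALTER DESCRIBE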

If your EDLA and producer accounts are part of the same AWS organization, you should see the accounts on the list. If not, you need to enter the AWS account number manually as an external AWS account.

The following screenshot shows the granted permissions in the EDLA for the LOB-A producer account.


When you grant permissions to another account, Lake Formation creates resource shares in AWS Resource Access Manager (AWS RAM) to authorize all the required IAM layers between the accounts. To validate a share, sign in to the AWS RAM console as the EDLA account and verify the resources are shared.

The first time you create a share, you see three resources:

  • The AWS Glue Data Catalog in the EDLA
  • The database containing the tables you shared
  • The table resource itself

You only need one share per resource, so multiple database shares only require a single Data Catalog share, and multiple table shares within the same database only require a single database share.

For the share to appear in the catalog of the receiving account (in our case the LOB-A account), the AWS RAM admin must accept the share by opening the share on the Shared With Me page and accepting it.

If both accounts are part of the same AWS organization and the organization admin has enabled automatic acceptance on the Settings page of the AWS Organizations console, then this step is unnecessary.
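
If you prefer the AWS CLI to the console for this step, the following sketch lists pending invitations and accepts one (the profile and the invitation ARN are placeholders):

aws ram get-resource-share-invitations \
--profile <AWS CLI profile of the LOB-A producer account>

aws ram accept-resource-share-invitation \
--profile <AWS CLI profile of the LOB-A producer account> \
--resource-share-invitation-arn <resourceShareInvitationArn returned by the previous command>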

If your EDLA Data Catalog is encrypted with a KMS CMK, make sure to add your LOB-A producer account root user as the user for this key, so the LOB-A producer account can easily access the EDLA Data Catalog for read and write permissions with its local IAM KMS policy. Data encryption keys don’t need any additional permissions, because the LOB accounts use the Lake Formation role associated with the registration to access objects in Amazon S3.

When you sign in with the LOB-A producer account to the AWS RAM console, you should see the EDLA shared database details, as in the following screenshot.

Create a database resource link in the LOB-A producer account

Resource links are pointers to the original resource that allow the consuming account to reference the shared resource as if it were local to the account. As a pointer, resource links mean that any changes are instantly reflected in all accounts because they all point to the same resource. No sync is necessary for any of this and no latency occurs between an update and its reflection in any other accounts.
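
Resource links can be created on the Lake Formation console or programmatically. As an illustration only, a resource link like the one created in the next step could also be created with an AWS Glue CreateDatabase call that specifies a TargetDatabase (the profile and EDLA account ID are placeholders):

aws glue create-database \
--profile <AWS CLI profile of the LOB-A producer account> \
--database-input '{"Name": "shared_edla_lob_a", "TargetDatabase": {"CatalogId": "<EDLA account ID>", "DatabaseName": "edla_lob_a"}}'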

  1. Create a resource link to the shared Data Catalog database from the EDLA called shared_edla_lob_a.
  2. Grant full access to the AWS Glue role in the LOB-A producer account for this newly created shared database link from the EDLA so a producer AWS Glue job can create, update, and delete tables and partitions.

You need to perform two grants: one on the database shared link and one on the target to the AWS Glue job role. Granting on the link allows it to be visible to end-users. Data-level permissions are granted on the target itself.

  3. Create an AWS Glue job using this role to create and write data into the EDLA database and S3 bucket location.

The AWS Glue table and S3 data are in a centralized location for this architecture, using the Lake Formation cross-account feature.

This completes the configuration of the LOB-A producer account remotely writing data into the EDLA Data Catalog and S3 bucket. You can create and share the rest of the required tables for this LOB using the Lake Formation cross-account feature.

Because your LOB-A producer created an AWS Glue table and wrote data into the Amazon S3 location of your EDLA, the EDLA admin can access this data and share the LOB-A database and tables to the LOB-A consumer account for further analysis, aggregation, ML, dashboards, and end-user access.

Share the database to the LOB-A consumer account

In the EDLA, you can share the LOB-A AWS Glue database and tables (edla_lob_a, which contains tables created from the LOB-A producer account) to the LOB-A consumer account (in this case, the entire database is shared).

Next, go to the LOB-A consumer account to accept the resource share in AWS RAM.

Accept the shared database in AWS RAM in the LOB-A consumer account

Sign in with the LOB-A consumer account to the AWS RAM console. You should see the EDLA shared database details.

Accept this resource share request so you can create a resource link in the LOB-A consumer account.

Create a database resource link in the LOB-A consumer account

Create a resource link to a shared Data Catalog database from the EDLA as consumer_edla_lob_a.

Now, grant full access to the AWS Glue role in the LOB-A consumer account for this newly created shared database link from the EDLA so the consumer account AWS Glue job can perform SELECT data queries from those tables. You need to perform two grants: one on the database shared link and one on the target to the AWS Glue job role.

A grant on the resource link allows a user to describe (or see) the resource link, which allows them to point engines such as Athena at it for queries. A grant on the target grants permissions to local users on the original resource, which allows them to interact with the metadata of the table and the data behind it. Permissions of DESCRIBE on the resource link and SELECT on the target are the minimum permissions necessary to query and interact with a table in most engines.
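
As an illustration, those two minimum grants could look like the following from the AWS CLI (a sketch; the profile, role ARN, and EDLA account ID are placeholders, and the database names follow the steps above):

# DESCRIBE on the local resource link
aws lakeformation grant-permissions \
--profile <AWS CLI profile of the LOB-A consumer account> \
--principal DataLakePrincipalIdentifier=<ARN of the consumer AWS Glue job role> \
--resource '{"Database": {"Name": "consumer_edla_lob_a"}}' \
--permissions DESCRIBE

# SELECT on the shared target tables in the EDLA catalog
aws lakeformation grant-permissions \
--profile <AWS CLI profile of the LOB-A consumer account> \
--principal DataLakePrincipalIdentifier=<ARN of the consumer AWS Glue job role> \
--resource '{"Table": {"CatalogId": "<EDLA account ID>", "DatabaseName": "edla_lob_a", "TableWildcard": {}}}' \
--permissions SELECT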

Create an AWS Glue job using this role to read tables from the consumer database that is shared from the EDLA and for which S3 data is also stored in the EDLA as a central data lake store. This data is accessed via AWS Glue tables with fine-grained access using the Lake Formation cross-account feature.

This completes the process of granting the LOB-A consumer account remote access to data for further analysis.

This data can be accessed via Athena in the LOB-A consumer account. LOB-A consumers can also access this data using QuickSight, Amazon EMR, and Redshift Spectrum for other use cases.

De-centralized data lake design

In the de-centralized design pattern, each LOB AWS account has its own local compute, AWS Glue Data Catalog, and Lake Formation, along with local S3 buckets for its LOB dataset. The EDLA still hosts a central Data Catalog for all LOB-related databases and tables, as well as a central Lake Formation in which all LOB-related S3 buckets are registered.

EDLA manages all data access (read and write) permissions for AWS Glue databases or tables that are managed in EDLA. It grants the LOB producer account write, update, and delete permissions on the LOB database via the Lake Formation cross-account share. It also grants read permissions to the LOB consumer account. The respective LOB’s local data lake admins grant required access to their local IAM principals.

Refer to the earlier details on how to share databases, tables, and table columns from the EDLA to the producer and consumer accounts using Lake Formation cross-account sharing with AWS RAM and resource links.

Each LOB account (producer or consumer) also has its own local storage, which is registered in the local Lake Formation along with its local Data Catalog. That catalog has a set of databases and tables, which are managed locally in that LOB account by its Lake Formation admins.

Clean up

To avoid incurring future charges, delete the resources that were created as part of this exercise.

Delete the S3 buckets in the following accounts:

  • Producer account
  • EDLA account
  • Consumer account (if any)

Delete the AWS Glue jobs in the following accounts:

  • Producer account
  • Consumer account (if any)

Lake Formation limitations

This solution has the following limitations:

  • The spark-submit action on Amazon EMR is not currently supported
  • AWS Glue Context does not yet support column-level fine-grained permissions granted via Lake Formation

Conclusion

This post describes how you can design enterprise-level data lakes with a multi-account strategy and control fine-grained access to its data using the Lake Formation cross-account feature. This can help your organization build highly scalable, high-performance, and secure data lakes with easy maintenance of its related LOBs’ data in a single AWS account with all access logs and grant details.


About the Authors

Satish Sarapuri is a Data Architect, Data Lake at AWS. He helps enterprise-level customers build high-performance, highly available, cost-effective, resilient, and secure data lakes and analytics platform solutions, which includes streaming and batch ingestions into the data lake. In his spare time, he enjoys spending time with his family and playing tennis.

UmaMaheswari Elangovan is a Principal Data Lake Architect at AWS. She helps enterprise and startup customers adopt AWS data lake and analytic services, and increases awareness on building a data-driven community through scalable, distributed, and reliable data lake infrastructure to serve a wide range of data users, including but not limited to data scientists, data analysts, and business analysts. She also enjoys mentoring young girls and youth in technology by volunteering through nonprofit organizations such as High Tech Kids, Girls Who Code, and many more.

Zach Mitchell is a Sr. Big Data Architect. He works within the product team to enhance understanding between product engineers and their customers while guiding customers through their journey to develop data lakes and other data solutions on AWS analytics services.

Field Notes: Accelerating Data Science with RStudio and Shiny Server on AWS Fargate

Post Syndicated from Chayan Panda original https://aws.amazon.com/blogs/architecture/field-notes-accelerating-data-science-with-rstudio-and-shiny-server-on-aws-fargate/

Data scientists continuously look for ways to accelerate time to value for analytics projects. RStudio Server is a popular integrated development environment (IDE) for R, which is used to render analytics visualizations for faster decision making. These visualizations are traditionally hosted on legacy Unix servers along with Shiny Server to support analytics. In a previous blog post, we provided a solution architecture to run data science use cases for medium to large enterprises across industry verticals.

In this post, we describe and deliver the infrastructure code to run a secure, scalable, and highly available RStudio and Shiny Server installation on AWS. We use these services: AWS Fargate, Amazon Elastic Container Service (Amazon ECS), Amazon Elastic File System (Amazon EFS), AWS DataSync, and Amazon Simple Storage Service (Amazon S3). We then demonstrate a data science use case in RStudio and create an application on Shiny. The use case involves pre-processing a dataset and training a machine learning model in RStudio. The goal is to build a Shiny application that surfaces breast cancer prediction insights against a set of parameters to users.

Overview of solution

We show how to deploy an open-source RStudio Server and Shiny Server in a serverless architecture from an automated deployment pipeline built with AWS Developer Tools. This is illustrated in the diagram that follows. The deployment adheres to best practices by following an AWS multi-account strategy using AWS Organizations.

Figure 1. RStudio/Shiny Open Source Deployment Pipeline on AWS Serverless Infrastructure

Multi-Account Setup

In the preceding architecture, a central development account hosts the development resources. From this account, the deployment pipeline creates the AWS services for RStudio and Shiny, along with the integrated services, in another AWS account. There can be multiple RStudio/Shiny accounts and instances to suit your requirements. You can also host multiple non-production instances of RStudio/Shiny in a single account.

Public URL Domain and Data Feed

The RStudio/Shiny deployment accounts obtain the networking information for the publicly resolvable domain from a central networking account. The data feed for the containers comes from a central data repository account. Users upload data to the S3 buckets in the central data account or configure an automated service like AWS Transfer Family to programmatically upload files. AWS DataSync transfers the uploaded files from Amazon S3 and stores the files on Amazon EFS mount points on the containers. Amazon EFS provides shared, persistent, and elastic storage for the containers.

Security Footprint

We recommend that you configure AWS Shield or AWS Shield Advanced for the networking account and enable Amazon GuardDuty in all accounts. You can also use AWS Config and AWS CloudTrail for monitoring and alerting on security events before deploying the infrastructure code. You should use an outbound filter such as AWS Network Firewall for network traffic destined for the internet. AWS Web Application Firewall (AWS WAF) protects the Amazon Elastic Load Balancers (Amazon ELB). You can restrict access to RStudio and Shiny from only allowed IP ranges using the automated pipeline.

High Availability

You deploy all AWS services in this architecture in one particular AWS Region. The AWS services used are managed services and configured for high availability. Should a service become unavailable, it automatically launches in the same Availability Zone (AZ) or in a different AZ within the same AWS Region. This means if Amazon ECS restarts the container in another AZ, following a failover, the files and data for the container will not be lost as these are stored on Amazon EFS.

Deployment

The infrastructure code provided in this blog creates all resources described in the preceding architecture. The following numbered items refer to Figure 1.

1. We used AWS Cloud Development Kit (AWS CDK) for Python to develop the infrastructure code and stored the code in an AWS CodeCommit repository.
2. AWS CodePipeline integrates the AWS CDK stacks for automated builds. The stacks are divided into four different stages and are organized by AWS service.
3. AWS CodePipeline fetches the container images from public Docker Hub and stores the images into Amazon Elastic Container Registry (Amazon ECR) repositories for cross-account access. The deployment pipeline accesses these images to create the Amazon ECS container on AWS Fargate in the deployment accounts.
4. The build script uses a key from AWS Key Management Service (AWS KMS) to create secrets. These include a RStudio front-end password, public key for bastion containers, and central data account access keys in AWS Secrets Manager. The deployment pipeline uses these secrets to configure the cross-account containers.
5. The central networking account Amazon Route 53 has the pre-configured base public domain. This is done outside the automated pipeline and the base domain info is passed on as a parameter to the deployment pipeline.
6. The central networking account delegates the base public domain to the RStudio deployment accounts via AWS Systems Manager (SSM) Parameter Store.
7. An AWS Lambda function retrieves the delegated Route 53 zone for configuring the RStudio and Shiny sub-domains.
8. AWS Certificate Manager configures encryption in transit by applying HTTPS certificates on the RStudio and Shiny sub-domains.
9. The pipeline configures an Amazon ECS cluster to control the RStudio, Shiny and Bastion containers and to scale up and down the number of containers as needed.
10. The pipeline creates the RStudio container for the instance in a private subnet. The RStudio container is not horizontally scalable in the open-source version of RStudio.
– If you create only one container, the container is configured for multiple front-end users. You need to specify the user names as email IDs in cdk.json.
– Users receive their passwords and RStudio/Shiny URLs via email using Amazon Simple Email Service (Amazon SES).
– You can also create one RStudio container for each data scientist, depending on your compute requirements, by setting the cdk.json parameter individual_containers to true. You can also control the container memory/vCPU using cdk.json.
– Further details are provided in the readme. If your compute requirements exceed Fargate container compute limits, consider using the EC2 launch type of Amazon ECS, which offers a range of Amazon EC2 instance types to fit your compute requirement. You can specify your installation type in cdk.json and choose either the Fargate or EC2 launch type for your RStudio containers.
11. To help you SSH to RStudio and Shiny containers for administration tasks, the pipeline creates a Bastion container in the public subnet. A Security Group restricts access to the bastion container and you can only access it from the IP range you provide in the cdk.json.
12. Shiny containers are horizontally scalable and the pipeline creates the Shiny containers in the private subnet using Fargate launch type of Amazon ECS. You can specify the number of containers you need for Shiny Server in cdk.json.
13. Application Load Balancers route traffic to the containers and perform health checks. The pipeline registers the RStudio and Shiny load balancers with the respective Amazon ECS services.
14. AWS WAF rules are built to provide additional security to RStudio and Shiny endpoints. You can specify approved IPs to restrict access to RStudio and Shiny from only allowed IPs.
15. Users upload files to be analyzed to a central data lake account, either with a manual S3 upload or programmatically using AWS Transfer for SFTP.
16. AWS DataSync transfers files from Amazon S3 to the cross-account Amazon EFS on an hourly schedule.
17. An AWS Lambda function initiates a DataSync transfer on demand, outside of the hourly schedule, for files that require urgent analysis. The bulk of the data transfer is expected to happen on the hourly schedule, with the on-demand trigger used only when necessary.
18. Amazon EFS file systems provide shared, persistent, and elastic storage for the containers. This facilitates the deployment of Shiny apps from RStudio containers using a shared file system. The EFS file systems persist through container recycles.
19. You can create Amazon Athena tables on the central data account S3 buckets for direct interaction using JDBC from the RStudio container. Access keys for cross-account operation are stored in the RStudio container R environment.

Note: It is recommended that you implement short term credential vending for this operation.

The source code for this deployment can be found in the aws-samples GitHub repository.

Prerequisites

To deploy the cdk stacks from the source code, you should have the following prerequisites:

1. Access to four AWS accounts (minimum three) for a basic multi-account deployment.
2. Permission to deploy all AWS services mentioned in the solution overview.
3. Review RStudio and Shiny Open Source Licensing: AGPL v3 (https://www.gnu.org/licenses/agpl-3.0-standalone.html)
4. Basic knowledge of R, RStudio Server, Shiny Server, Linux, AWS Developer Tools (AWS CDK in Python, AWS CodePipeline, AWS CodeCommit), AWS CLI and, the AWS services mentioned in the solution overview
5. Ensure you have a Docker Hub login account; otherwise you might get an error while pulling the container images from Docker Hub with the pipeline – You have reached your pull rate limit. You may increase the limit by authenticating and upgrading: https://www.docker.com/increase-rate-limits.
6. Review the readmes delivered with the code and ensure you understand how the parameters in cdk.json control the deployment and how to prepare your environment to deploy the cdk stacks via the pipeline detailed below.

Installation

Create the AWS accounts to be used for deployment and ensure you have admin permissions access to each account. Typically, the following accounts are required:

Central Development account – this is the account where the AWS Secrets Manager parameters, AWS CodeCommit repository, Amazon ECR repositories, and AWS CodePipeline will be created.
Central Network account – the Route 53 base public domain will be hosted in this account.
RStudio instance account – you can use as many of these accounts as required. This account deploys RStudio and Shiny containers for an instance (dev, test, uat, prod) along with a bastion container and associated services, as described in the solution architecture.
Central Data account – this is the account used for deploying the data lake resources, such as the S3 bucket for storing ingested source files.

1. Install the AWS CLI and create an AWS CLI profile for each account (pipeline, rstudio, network, datalake) so that AWS CDK can be used.
2. Install AWS CDK in Python and bootstrap each account and allow the Central Development account to perform cross-account deployment to all the other accounts.

export CDK_NEW_BOOTSTRAP=1
npx cdk bootstrap --profile <AWS CLI profile of central development account> --cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess aws://<Central Development Account>/<Region>

cdk bootstrap \
--profile <AWS CLI profile of rstudio deployment account> \
--trust <Central Development Account> \
--cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
aws://<RStudio Deployment Account>/<Region>

cdk bootstrap \
--profile <AWS CLI profile of central network account> \
--trust <Central Development Account> \
--cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
aws://<Central Network Account>/<Region>

cdk bootstrap \
--profile <AWS CLI profile of central data account> \
--trust <Central Development Account> \
--cloudformation-execution-policies arn:aws:iam::aws:policy/AdministratorAccess \
aws://<Central Data Account>/<Region>

3. Build the Docker container images in Amazon ECR in the central development account by running the image build pipeline as instructed in the readme.
a. Using the AWS console, create an AWS CodeCommit repository to hold the source code for building the images – for example, rstudio_docker_images.
b. Clone the GitHub repository and move into the image-build folder.
c. Using the CLI – Create a secret to store your DockerHub login details as follows:

aws secretsmanager create-secret --profile <AWS CLI profile of central development account> --name ImportedDockerId --secret-string '{"username":"<dockerhub username>", "password":"<dockerhub password>"}'

d. Pass the repository name (for example, rstudio_docker_images) to the name parameter in cdk.json for the image build pipeline.
e. Pass the account numbers (comma separated) where RStudio instances will be deployed in the cdk.json parameter rstudio_account_ids.
f. Synthesize the image build stack

cdk synth --profile <AWS CLI profile of central development account>

g. Commit the changes (the code you cloned from GitHub) into the AWS CodeCommit repo you created.
h. Deploy the pipeline stack for container image build.

cdk deploy --profile <AWS CLI profile of central development account>

i. Log in to the AWS console in the central development account and navigate to the CodePipeline service. Monitor the pipeline (the pipeline name is the name you provided in the name parameter in cdk.json) and confirm the Docker images build successfully.

4. Move into the rstudio-fargate folder. Provide the comma separated accounts where rstudio/shiny will be deployed in the cdk.json against the parameter rstudio_account_ids.

5. Synthesize the stack Rstudio-Configuration-Stack in the Central Development account.

cdk synth Rstudio-Configuration-Stack --profile <AWS CLI profile of central development account> 

6. Deploy the Rstudio-Configuration-Stack. This stack creates a new KMS CMK to use for creating the secrets with AWS Secrets Manager. The stack outputs the AWS ARN for the KMS key. Note down the ARN and set the parameter “encryption_key_arn” inside cdk.json to this ARN.

cdk deploy Rstudio-Configuration-Stack --profile <AWS CLI profile of rstudio deployment account>

7. Run the script rstudio_config.sh after setting the required cdk.json parameters. Refer readme.

sh ./rstudio_config.sh <AWS CLI profile of the central development account> "arn:aws:kms:<region>:<account ID of the central development account>:key/<key hash>" <AWS CLI profile of central data account> <comma separated AWS CLI profiles of the rstudio deployment accounts>

8. Run the script check_ses_email.sh with comma separated profiles for the rstudio deployment accounts. This checks whether all user emails have been registered with Amazon SES for all the rstudio deployment accounts in the Region before you can deploy RStudio/Shiny.

sh ./check_ses_email.sh <comma separated AWS CLI profiles of the rstudio deployment accounts>

9. Before committing the code into the AWS CodeCommit repository, synthesize the pipeline stack against all the accounts involved in this deployment. This ensures all the necessary context values are populated into the cdk.context.json file and avoids DUMMY values being mapped.

cdk synth --profile <AWS CLI profile of the central development account>
cdk synth --profile <AWS CLI profile of the central network account>
cdk synth --profile <AWS CLI profile of each RStudio deployment account> (repeat for each RStudio deployment account)

10. Deploy the Rstudio Fargate pipeline stack.

cdk deploy --profile <AWS CLI profile of the central development account> Rstudio-Pipeline-Stack

Data Science use case

Now that the installation is complete, we can demonstrate a typical data science use case:

  1. Explore and pre-process a dataset, and train a machine learning model in RStudio.
  2. Build a Shiny application that makes predictions against the trained model to surface insights to dashboard users.

This showcases how to publish a Shiny application from RStudio containers to Shiny containers via a common EFS filesystem.

First, we log in to the RStudio container with the URL from the deployment and clone the accompanying repository using the command line terminal. The ML example is in the ml_example directory. We use the UCI Breast Cancer Wisconsin (Diagnostic) dataset from the mlbench library. Refer to ml_example/breast_cancer_modeling.r.

$ git clone https://github.com/aws-samples/aws-fargate-with-rstudio-open-source.git

Figure 2 – Use the terminal to clone the repository in RStudio IDE.

Let’s open the ml_example/breast_cancer_modeling.r script in the RStudio IDE. The script does the following:

  1. Install and import the required libraries, mainly caret, a popular machine learning library, and mlbench, a collection of ML datasets;
  2. Import the UCI breast cancer dataset, and create an 80/20 split for training and testing (in the Shiny app) purposes;
  3. Perform preprocessing to impute the missing values (shown as NA) in the dataframe and standardize the numeric columns;
  4. Train a stochastic gradient boosting model with cross-validation, with the area under the ROC curve (AUC) as the tuning metric;
  5. Save the testing split, the preprocessing object, and the trained model into the directory where the Shiny app script is located (breast-cancer-prediction).

You can execute the whole script with this command in the console.

> source('~/aws-fargate-with-rstudio-open-source/ml_example/breast_cancer_modeling.r')

We can then inspect the model evaluation in the model object gbmFit.

> gbmFit
Stochastic Gradient Boosting 

560 samples
  9 predictor
  2 classes: 'benign', 'malignant' 

No pre-processing
Resampling: Cross-Validated (10 fold, repeated 10 times) 
Summary of sample sizes: 504, 505, 503, 504, 504, 504, ... 
Resampling results across tuning parameters:

  interaction.depth  n.trees  ROC        Sens       Spec     
  1                   50      0.9916391  0.9716967  0.9304474
  1                  100      0.9917702  0.9700676  0.9330789
  1                  150      0.9911656  0.9689790  0.9305000
  2                   50      0.9922102  0.9708859  0.9351316
  2                  100      0.9917640  0.9681682  0.9346053
  2                  150      0.9910501  0.9662613  0.9361842
  3                   50      0.9922109  0.9689865  0.9381316
  3                  100      0.9919198  0.9684384  0.9360789
  3                  150      0.9912103  0.9673348  0.9345263

If the results are as expected, move on to developing a dashboard and publishing the model for business users to consume the machine learning insights.

In the repository, ml_example/breast-cancer-prediction/app.R has a Shiny application that displays summary statistics and the distribution of the testing data, and an interactive dashboard. This allows users to select data points on the chart and get the machine learning model inference as needed. Users can also modify the threshold to alter the specificity and sensitivity of the prediction. Thanks to the shared EFS file system across the RStudio and Shiny containers, we can publish the Shiny application to /srv/shiny-server with the following shell command.

$ cp -rfv ~/aws-fargate-with-rstudio-open-source/ml_example/breast-cancer-prediction/ /srv/shiny-server/

That’s it. The Shiny application is now on the Shiny containers accessible from the Shiny URL, load balanced by Application Load Balancer. You can slide over the Probability Threshold to test how it changes the total count in the prediction, change the variables for the scatter plot and select data points to test the individual predictions.

Figure 3 – The Shiny Application

Cleaning up

Please follow the readme in the repository to delete the stacks created.

Conclusion

In this blog post, we demonstrated how to deploy a serverless architecture, walked through a data science use case in RStudio Server, and deployed an interactive dashboard in Shiny Server. The solution creates a scalable, secure, and serverless data science environment for the R community that accelerates the data science process. The infrastructure and data science code is available in the GitHub repository.

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

Streaming Amazon DynamoDB data into a centralized data lake

Post Syndicated from Praveen Krishnamoorthy Ravikumar original https://aws.amazon.com/blogs/big-data/streaming-amazon-dynamodb-data-into-a-centralized-data-lake/

For organizations moving towards a serverless microservice approach, Amazon DynamoDB has become a preferred backend database due to its fully managed, multi-Region, multi-active durability with built-in security controls, backup and restore, and in-memory caching for internet-scale applications. Many organizations also want to stream their DynamoDB data into a centralized data lake, which you can then use to derive near-real-time business insights. The data lake provides capabilities to business teams to plug in BI tools for analysis, and to data science teams to train models.

This post demonstrates two common use cases of streaming a DynamoDB table into an Amazon Simple Storage Service (Amazon S3) bucket using Amazon Kinesis Data Streams, AWS Lambda, and Amazon Kinesis Data Firehose via Amazon Virtual Private Cloud (Amazon VPC) endpoints in the same AWS Region. We explore two use cases based on account configurations:

  • DynamoDB and Amazon S3 in same AWS account
  • DynamoDB and Amazon S3 in different AWS accounts

We use the following AWS services:

  • Kinesis Data Streams for DynamoDB – Kinesis Data Streams for DynamoDB captures item-level modifications in any DynamoDB table and replicates them to a Kinesis data stream of your choice. Your applications can access the data stream and view the item-level changes in near-real time. Streaming your DynamoDB data to a data stream enables you to continuously capture and store terabytes of data per hour. Kinesis Data Streams enables you to take advantage of longer data retention time, enhanced fan-out capability to more than two simultaneous consumer applications, and additional audit and security transparency. Kinesis Data Streams also gives you access to other Kinesis services such as Kinesis Data Firehose and Amazon Kinesis Data Analytics. This enables you to build applications to power real-time dashboards, generate alerts, implement dynamic pricing and advertising, and perform sophisticated data analytics, such as applying machine learning (ML) algorithms.
  • Lambda – Lambda lets you run code without provisioning or managing servers. It provides the capability to run code for virtually any type of application or backend service without managing servers or infrastructure. You can set up your code to automatically trigger from other AWS services or call it directly from any web or mobile app.
  • Kinesis Data Firehose – Kinesis Data Firehose helps to reliably load streaming data into data lakes, data stores, and analytics services. It can capture, transform, and deliver streaming data to Amazon S3 and other destinations. It’s a fully managed service that automatically scales to match the throughput of your data and requires no ongoing administration. It can also batch, compress, transform, and encrypt your data streams before loading, which minimizes the amount of storage used and increases security.

Security is the primary focus of our use cases, so the services used in both use cases use server-side encryption at rest and VPC endpoints for securing the data in transit.

Use case 1: DynamoDB and Amazon S3 in same AWS account

In our first use case, our DynamoDB table and S3 bucket are in the same account. We have the following resources:

  • A Kinesis data stream is configured to use 10 shards, but you can change this as needed.
  • A DynamoDB table with Kinesis streaming enabled is a source to the Kinesis data stream, which is configured as a source to a Firehose delivery stream.
  • The Firehose delivery stream is configured to use a Lambda function for record transformation along with data delivery into an S3 bucket. The Firehose delivery stream is configured to batch records for 2 minutes or 1 MiB, whichever occurs first, before delivering the data to Amazon S3. The batch window is configurable for your use case. For more information, see Configure settings.
  • The Lambda function used for this solution transforms the DynamoDB item’s multi-level JSON structure to a single-level JSON structure. It’s configured to run in a private subnet of an Amazon VPC, with no internet access. You can extend the function to support more complex business transformations.

The following diagram illustrates the architecture of the solution.

The architecture uses the DynamoDB feature to capture item-level changes in DynamoDB tables using Kinesis Data Streams. This feature provides capabilities to securely stream incremental updates without any custom code or components.
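
The CloudFormation template in this post enables Kinesis streaming on the table for you. Purely as a reference sketch, the same can be done for an existing table with a single CLI call (the table and stream names match those used later in this walkthrough; the Region and account ID in the stream ARN are placeholders):

aws dynamodb enable-kinesis-streaming-destination \
--table-name blog-srsa-ddb-table \
--stream-arn arn:aws:kinesis:<Region>:<account ID>:stream/blog-srsa-ddb-table-data-stream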

Prerequisites

To implement this architecture, you need the following:

  • An AWS account
  • Admin access to deploy the needed resources

Deploy the solution

In this step, we create a new Amazon VPC along with the rest of the components.

We also create an S3 bucket with the following features:

  • Encryption at rest using CMKs
  • Block Public Access
  • Bucket versioning

You can extend the template to enable additional S3 bucket features as per your requirements.

For this post, we use an AWS CloudFormation template to deploy the resources. As part of best practices, consider organizing resources by lifecycle and ownership as needed.

We use an AWS Key Management Service (AWS KMS) key for server-side encryption to encrypt the data in Kinesis Data Streams, Kinesis Data Firehose, Amazon S3, and DynamoDB.

The Amazon CloudWatch log group data is always encrypted in CloudWatch Logs. If required, you can extend this stack to encrypt log groups using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack in your account:
  2. On the CloudFormation console, accept default values for the parameters.
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After stack creation is complete, note the value of the BucketName output variable from the stack’s Outputs tab. This is the S3 bucket name that is created as part of the stack. We use this value later to test the solution.

Test the solution

To test the solution, we insert a new item and then update the item in the DynamoDB table using AWS CloudShell and the AWS Command Line Interface (AWS CLI). We will also use the AWS Management Console to monitor and verify the solution.

  1. On the CloudShell console, verify that you’re in the same Region as the DynamoDB table (the default is us-east-1).
  2. Enter the following AWS CLI command to insert an item:
    aws dynamodb put-item \ 
    --table-name blog-srsa-ddb-table \ 
    --item '{ "id": {"S": "864732"}, "name": {"S": "Adam"} , "Designation": {"S": "Architect"} }' \ 
    --return-consumed-capacity TOTAL

  3. Enter the following command to update the item (we are updating the Designation from “Architect” to “Senior Architect”):
    aws dynamodb put-item \ 
    --table-name blog-srsa-ddb-table \ 
    --item '{ "id": {"S": "864732"}, "name": {"S": "Adam"} , "Designation": {"S": "Senior Architect"} }' \ 
    --return-consumed-capacity TOTAL

All item-level modifications from the DynamoDB table are sent to a Kinesis data stream (blog-srsa-ddb-table-data-stream), which delivers the data to a Firehose delivery stream (blog-srsa-ddb-table-delivery-stream).

You can monitor the processing of updated records in the Firehose delivery stream on the Monitoring tab of the delivery stream.

You can verify the delivery of the updates to the data lake by checking the objects in the S3 bucket (BucketName value from the stack Outputs tab).

The Firehose delivery stream is configured to write records to Amazon S3 using a custom prefix which is based on the date the records are delivered to the delivery stream. This partitions the delivered records by date which helps improve query performance by limiting the amount of data that query engines need to scan in order to return the results for a specific query. For more information, see Custom Prefixes for Amazon S3 Objects.
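
For example, you can list the delivered objects and inspect one of them from the AWS CLI (a sketch; the exact object keys depend on the delivery stream’s prefix configuration and delivery time):

aws s3 ls s3://<BucketName from the stack Outputs tab>/ --recursive

aws s3 cp s3://<BucketName from the stack Outputs tab>/<key of a delivered object> - | head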

The file is in JSON format. You can verify the data in the following ways:

Use case 2: DynamoDB and Amazon S3 in different AWS accounts

The solution for this use case uses two CloudFormation stacks: the producer stack (deployed in Account A) and the consumer stack (deployed in Account B).

The producer stack (Account A) deploys the following:

  • A Kinesis data stream configured with 10 shards (you can change this as needed).
  • A DynamoDB table with Kinesis streaming enabled, using the data stream as its destination; the data stream in turn is configured as the source of a Firehose delivery stream.
  • A Firehose delivery stream configured to use a Lambda function for record transformation and to deliver data into an S3 bucket in Account B. The delivery stream batches records for 2 minutes or 1 MiB, whichever occurs first, before delivering the data to Amazon S3. The batch window is configurable for your use case.
  • A Lambda function that runs in a private subnet of an Amazon VPC, with no internet access. For this solution, the function transforms the multi-level JSON structure to a single-level JSON structure (a minimal sketch of such a transformation follows this list). You can extend the function to support more complex business transformations.
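The following is a minimal sketch of what such a record-transformation function could look like. It is not the exact code deployed by the stack, and the flattening rule (joining nested keys with an underscore) is an assumption for illustration; the record format follows the standard Kinesis Data Firehose transformation contract.

import base64
import json

def flatten(obj, parent_key="", sep="_"):
    # Flatten a nested JSON object into a single-level dict
    items = {}
    for key, value in obj.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.update(flatten(value, new_key, sep))
        else:
            items[new_key] = value
    return items

def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        payload = json.loads(base64.b64decode(record["data"]))
        flattened = flatten(payload)
        output.append({
            "recordId": record["recordId"],
            "result": "Ok",
            "data": base64.b64encode(
                (json.dumps(flattened) + "\n").encode("utf-8")
            ).decode("utf-8"),
        })
    return {"records": output}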

The consumer stack (Account B) deploys an S3 bucket configured to receive the data from the Firehose delivery stream in Account A.

The following diagram illustrates the architecture of the solution.

The architecture uses the DynamoDB feature to capture item-level changes in DynamoDB tables using Kinesis Data Streams. This feature provides capabilities to securely stream incremental updates without any custom code or components. 

Prerequisites

For this use case, you need the following:

  • Two AWS accounts (for the producer and consumer)
    • If you already deployed the architecture for the first use case and want to use the same account, delete the stack from the previous use case before proceeding with this section
  • Admin access to deploy needed resources

Deploy the components in Account B (consumer)

This step creates an S3 bucket with the following features:

  • Encryption at rest using CMKs
  • Block Public Access
  • Bucket versioning

You can extend the template to enable additional S3 bucket features as needed.

We deploy the resources with a CloudFormation template. As part of best practices, consider organizing resources by lifecycle and ownership as needed.

We use the KMS key for server-side encryption to encrypt the data in Amazon S3.

The CloudWatch log group data is always encrypted in CloudWatch Logs. If required, you can extend the stack to encrypt log group data using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack in your account:
  2. For DDBProducerAccountID, enter Account A’s account ID.
  3. For KMSKeyAlias, the KMS key used for server-side encryption to encrypt the data in Amazon S3 is populated by default.
  4. Choose Create stack.

After stack creation is complete, note the value of the BucketName output variable. We use this value later to test the solution.

Deploy the components in Account A (producer)

In this step, we sign in to the AWS Management Console with Account A to deploy the producer stack. We use the KMS key for server-side encryption to encrypt the data in Kinesis Data Streams, Kinesis Data Firehose, Amazon S3, and DynamoDB. As with other stacks, the CloudWatch log group data is always encrypted in CloudWatch Logs, but you can extend the stack to encrypt log group data using KMS CMKs.

  1. Choose Launch Stack to create a CloudFormation stack in your account:
  2. For ConsumerAccountID, enter the ID of Account B.
  3. For CrossAccountDatalakeBucket, enter the bucket name for Account B, which you created in the previous step.
  4. For ArtifactBucket, the S3 bucket containing the artifacts required for deployment is populated by default.
  5. For KMSKeyAlias, the KMS key used for server-side encryption to encrypt the data in Amazon S3 is populated by default.
  6. For BlogTransformationLambdaFile, the Amazon S3 key for the Lambda function code that performs the Kinesis Data Firehose data transformation is populated by default.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.

Test the solution

To test the solution, we sign in as Account A, insert a new item in the DynamoDB table, and then update that item. Make sure you’re in the same Region as your table.

  1. On the CloudShell console, enter the following AWS CLI command to insert an item:
    aws dynamodb put-item \ 
    --table-name blog-srca-ddb-table \ 
    --item '{ "id": {"S": "864732"}, "name": {"S": "Chris"} , "Designation": {"S": "Senior Consultant"} }' \ 
    --return-consumed-capacity TOTAL

  2. Update the existing item with the following code:
    aws dynamodb put-item \ 
    --table-name blog-srca-ddb-table \ 
    --item '{ "id": {"S": "864732"}, "name": {"S": "Chris"} , "Designation": {"S": "Principal Consultant"} }' \ 
    --return-consumed-capacity TOTAL

  3. Sign out of Account A and sign in as Account B to verify the delivery of records into the data lake.

All item-level modifications from the DynamoDB table are sent to a Kinesis data stream (blog-srca-ddb-table-data-stream), which delivers the data to a Firehose delivery stream (blog-srca-ddb-table-delivery-stream) in Account A.

You can monitor the processing of the updated records on the Monitoring tab of the Firehose delivery stream.

You can verify the delivery of updates to the data lake by checking the objects in the S3 bucket that you created in Account B.

The Firehose delivery stream is configured similarly to the previous use case.

You can verify the data (in JSON format) in the same ways:

  • Download the files
  • Run an AWS Glue crawler to create a table to query in Athena
  • Query the data using Amazon S3 Select

Clean up

To avoid incurring future charges, clean up all the AWS resources that you created using AWS CloudFormation. You can delete these resources on the console or via the AWS CLI. For this post, we walk through the steps using the console.

Clean up resources from use case 1

To clean up the DynamoDB and Amazon S3 resources in the same account, complete the following steps:

  1. On the Amazon S3 console, empty the S3 bucket and remove any previous versions of S3 objects.
  2. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-single-account-stack.

You must delete the Amazon S3 resources before deleting the stack, or the deletion fails.

Clean up resources from use case 2

To clean up the DynamoDB and Amazon S3 resources in different accounts, complete the following steps:

  1. Sign in to Account A.
  2. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-multi-account-stack.
  3. Sign in to Account B.
  4. On the Amazon S3 console, empty the S3 bucket and remove any previous versions of S3 objects.
  5. On the AWS CloudFormation console, delete the stack bdb1040-ddb-lake-multi-account-stack.

Extend the solution

You can extend this solution to stream DynamoDB table data into cross-Region S3 buckets by setting up cross-Region replication (using the Amazon secured private channel) on the bucket where Kinesis Data Firehose delivers the data.

You can also perform a point-in-time initial load of the DynamoDB table into the data lake before setting up DynamoDB Kinesis streams. DynamoDB provides a no-coding required feature to achieve this. For more information, see Export Amazon DynamoDB Table Data to Your Data Lake in Amazon S3, No Code Writing Required.

To extend the usability scope of DynamoDB data in S3 buckets, you can crawl the location to create AWS Glue Data Catalog database tables. Registering the locations with AWS Lake Formation helps simplify permission management and allows you to implement fine-grained access control. You can also use Athena, Amazon Redshift, Amazon SageMaker, and Amazon QuickSight for data analysis, ML, and reporting services.

Conclusion

In this post, we demonstrated two solutions for streaming DynamoDB table data into Amazon S3 to build a data lake using a secured Amazon private channel.

The CloudFormation templates give you an easy way to set up the process, which you can further modify to meet your specific use case needs.

Please let us know if you have comments about this post!


About the Authors

Praveen Krishnamoorthy Ravikumar is a Data Architect with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platforms and solutions.

Abhishek Gupta is a Data and ML Engineer with AWS Professional Services Practice.

Ashok Yoganand Sridharan is a Big Data Consultant with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platforms and solutions.

Increase Apache Kafka’s resiliency with a multi-Region deployment and MirrorMaker 2

Post Syndicated from Anusha Dharmalingam original https://aws.amazon.com/blogs/big-data/increase-apache-kafkas-resiliency-with-a-multi-region-deployment-and-mirrormaker-2/

Customers create business continuity plans and disaster recovery (DR) strategies to maximize resiliency for their applications, because downtime or data loss can result in losing revenue or halting operations. Ultimately, DR planning is all about enabling the business to continue running despite a Regional outage. This post explains how to make Apache Kafka resilient to issues that span more than a single Availability Zone using a multi-Region Apache Kafka architecture. We use Apache Kafka deployed as Amazon Managed Streaming for Apache Kafka (Amazon MSK) clusters in this example, but the same architecture also applies to self-managed Apache Kafka.

Amazon MSK is a fully managed service that makes it easy for you to build and run Apache Kafka to process streaming data. Amazon MSK provides high availability by offering Multi-AZ configurations to distribute brokers across multiple Availability Zones within an AWS Region. A single MSK cluster deployment provides message durability through intra-cluster data replication. Data replication with a replication factor of 3 and a “min-ISR” value of 2, along with the producer setting acks=all, provides the strongest availability guarantees, because it ensures that other brokers in the cluster acknowledge receiving the data before the leader broker responds to the producer. This design provides robust protection against single-broker failure as well as Single-AZ failure. However, if an unlikely issue impacts your applications or infrastructure across more than one Availability Zone, the architecture outlined in this post can help you prepare for, respond to, and recover from it.

For companies that can withstand a longer time to recover (Recovery Time Objective, RTO) but are sensitive to data loss on Amazon MSK (Recovery Point Objective, RPO), backing up data to Amazon Simple Storage Service (Amazon S3) and recovering the data from Amazon S3 is sufficient as a DR plan. However, most streaming use cases rely on the availability of the MSK cluster itself for your business continuity plan, and you may want a lower RTO as well. In these cases, setting up MSK clusters in multiple Regions and configuring them to replicate data from one cluster to another provides the required business resilience and continuity.

MirrorMaker

MirrorMaker is a utility bundled as part of Apache Kafka that helps replicate data between two Kafka clusters. MirrorMaker is essentially a Kafka high-level consumer and producer pair, efficiently moving data from the source cluster to the destination cluster. Use cases for MirrorMaker include aggregating data to a central cluster for analytics, isolating data based on use case or geo-proximity, migrating data from one Kafka cluster to another, and building highly resilient deployments.

In this post, we use MirrorMaker v2 (MM2), which is available as part of Apache Kafka version 2.4 onwards, because it enables us to sync topic properties and also sync offset mappings across clusters. This feature helps us migrate consumers from one cluster to another because the offsets are synced across clusters.

Solution overview

In this post, we dive into the details of how to configure Amazon MSK with cross-Region replication for the DR process. The following diagram illustrates our architecture.

We create two MSK clusters across the primary and secondary Regions (mapping to your chosen Regions), with the primary being active and secondary being passive. We can also extend this solution to an active-active setup. Our Kafka clients interact with the primary Region’s MSK cluster. The Kafka Connect cluster is deployed in the secondary Region’s MSK cluster and hosts the MirrorMaker connectors responsible for replication.

We go through the following steps to show the end-to-end process of setting up the deployment, failing over the clients if a Regional outage occurs, and failing back after the outage:

  1. Set up an MSK cluster in the primary Region.
  2. Set up an MSK cluster in the secondary Region.
  3. Set up connectivity between the two MSK clusters.
  4. Deploy Kafka Connect as containers using AWS Fargate.
  5. Deploy MirrorMaker connectors on the Kafka Connect cluster.
  6. Confirm data is replicated from one Region to another.
  7. Fail over clients to the secondary Region.
  8. Fail back clients to the primary Region.

Step 1: Set up an MSK cluster in the primary Region

To set up an MSK cluster in your primary Region, complete the following steps:

  1. Create an Amazon Virtual Private Cloud (Amazon VPC) in the Region where you want to have your primary MSK cluster.
  2. Create three (or at least two) subnets in the VPC.
  3. Create an MSK cluster using the AWS Command Line Interface (AWS CLI) or the AWS Management Console.

For this post, we use the console. For instructions, see Creating an Amazon MSK Cluster.

  1. Choose the Kafka version as 2.7 or higher.
  2. Pick the broker instance type based on your use case and configuration needs.
  3. Choose the VPC and subnets created to make sure the brokers in your MSK clusters are spread across multiple Availability Zones.
  4. For Encrypt Data in transit, choose TLS encryption between brokers and between client and brokers.
  5. For Authentication, you can choose IAM access control, TLS-based authentication, or username/password authentication.

We use SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) authentication to authenticate Apache Kafka clients using usernames and passwords stored in AWS Secrets Manager. AWS has since launched IAM Access Control, which you could also use for authentication in this solution. For more information about IAM Access Control, see Securing Apache Kafka is easy and familiar with IAM Access Control for Amazon MSK.

  1. Create the secret in Secrets Manager and associate it to the MSK cluster. For instructions, see Username and password authentication with AWS Secrets Manager.

Make sure the secrets are encrypted with a customer managed key via AWS Key Management Service (AWS KMS).
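As an illustration, the following sketch creates such a secret with boto3 and associates it with the cluster. The secret name must begin with the AmazonMSK_ prefix; the KMS key alias, cluster ARN, and credentials are placeholders for illustration.

import json
import boto3

secretsmanager = boto3.client("secretsmanager")
kafka = boto3.client("kafka")

# Amazon MSK requires SASL/SCRAM secret names to start with "AmazonMSK_"
# and the secret must be encrypted with a customer managed KMS key
secret = secretsmanager.create_secret(
    Name="AmazonMSK_primary_cluster_user",
    KmsKeyId="alias/msk-scram-key",  # placeholder key alias
    SecretString=json.dumps({"username": "mm2user", "password": "<strong-password>"}),
)

kafka.batch_associate_scram_secret(
    ClusterArn="arn:aws:kafka:us-east-1:111122223333:cluster/primary-msk/<cluster-id>",  # placeholder ARN
    SecretArnList=[secret["ARN"]],
)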

Step 2: Set up an MSK cluster in the secondary Region

To set up an MSK cluster in our secondary Region, complete the following steps:

  1. Create an MSK cluster in another Region with similar configuration to the first.
  2. Make sure the number of brokers and instance type match what was configured in the primary.

This makes sure the secondary cluster has the same capacity and performance metrics as the primary cluster.

  1. For Encrypt Data in transit, choose TLS encryption between brokers and between client and brokers.
  2. For Authentication, choose the same authentication mechanism as with the cluster in the primary Region.
  3. Create a secret in Secrets Manager and secure with a customer managed KMS key in the Region of the MSK cluster.

Step 3: Set up connectivity between the two MSK clusters

For data to replicate between the two MSK clusters, you need to allow the clusters in different VPCs to communicate with each other, whether the VPCs are in the same or different AWS accounts, or the same or different Regions. You have several options for making resources in either VPC communicate with each other as if they're within the same network.

For more information about these access options, see Accessing an Amazon MSK Cluster.

VPC peering is more suited for environments that have a high degree of trust between the parties that are peering their VPCs. This is because, after a VPC peering connection is established, the resources in either VPC can initiate a connection. You’re responsible for implementing fine-grained network access controls with security groups to make sure that only specific resources intended to be reachable are accessible between the peered VPCs. For our data replication use case, we assume that the two VPCs are trusted and therefore we can use VPC peering connectivity to replicate data between the primary and secondary MSK clusters. For instructions on setting up VPC peering connections between two VPCs across two Regions, see Creating and accepting a VPC peering connection.

When you set up VPC peering, enable DNS resolution support. This allows you to resolve public IPv4 DNS hostnames to private IPv4 addresses when queried from instances in the peer VPC. To enable DNS resolution on VPC peering, you must have the two peering VPCs enabled for DNS hostnames and DNS resolution. This step is important for you to be able to access the MSK cluster using DNS names across the VPCs.
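The following sketch shows how you might enable these settings with boto3; the VPC ID, peering connection ID, and Regions are placeholders, and for an inter-Region peering connection each side updates its own options from its own Region.

import boto3

PRIMARY_REGION = "us-east-1"          # placeholder
PEERING_ID = "pcx-0123456789abcdef0"  # placeholder
PRIMARY_VPC_ID = "vpc-0abc1234def56789a"  # placeholder

ec2_primary = boto3.client("ec2", region_name=PRIMARY_REGION)

# Both VPCs must have DNS support and DNS hostnames enabled
ec2_primary.modify_vpc_attribute(VpcId=PRIMARY_VPC_ID, EnableDnsSupport={"Value": True})
ec2_primary.modify_vpc_attribute(VpcId=PRIMARY_VPC_ID, EnableDnsHostnames={"Value": True})

# Enable DNS resolution over the peering connection (requester side shown;
# repeat for the accepter side from the accepter's Region)
ec2_primary.modify_vpc_peering_connection_options(
    VpcPeeringConnectionId=PEERING_ID,
    RequesterPeeringConnectionOptions={"AllowDnsResolutionFromRemoteVpc": True},
)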

Step 4: Deploy Kafka Connect as containers using AWS Fargate

Kafka Connect is a scalable and reliable framework to stream data between a Kafka cluster and external systems. Connectors in Kafka Connect define where data should be copied to and from. Each connector instance coordinates a set of tasks that copy the data. Connectors and tasks are logical units of work and must be scheduled to run in a process. Kafka Connect calls these processes workers and has two types of workers: standalone and distributed.

Deploying Kafka Connect in a distributed mode provides scalability and automatic fault tolerance for the tasks that are deployed in the worker. In distributed mode, you start many worker processes using the same group ID, and they automatically coordinate to schedule running connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.

Kafka Connect in distributed mode lends itself to be deployed as containers (workers) and scales based on the number of tasks and connectors that are being deployed on Kafka Connect.

Fargate is a serverless compute engine for containers that works with both Amazon Elastic Container Service (Amazon ECS) and Amazon Elastic Kubernetes Service (Amazon EKS). Fargate makes it easy for you to focus on building your applications. Fargate removes the need to provision and manage servers, lets you specify and pay for resources per application, and improves security through application isolation by design.

For replicating data using MirrorMaker, the pattern of remote-consume and local-produce is recommended, so in the simplest source-destination replication pair, you want to deploy your MirrorMaker connectors on Kafka Connect in your destination MSK cluster. This avoids loss of data because data is replicated across Regions. In this step, we build and run a distributed Kafka Connect in a Fargate cluster.

The Docker container for Kafka Connect is available on GitHub. For more details on the Docker container and its content, refer to the README.md file.

  1. Clone the code from GitHub and build the code.
  2. Push the image into a repository in Amazon Elastic Container Registry (Amazon ECR).
  3. Create a Fargate cluster in your secondary Region, in the same VPC as your MSK cluster.
  4. Deploy the Fargate cluster.
  5. Deploy the Kafka Connect containers.

The task definition JSON to deploy Kafka Connect containers is available on GitHub. The JSON file refers to a Docker container that was pushed into Amazon ECR earlier.

  1. Replace the IMAGE_URL string in the JSON file with the actual image from Amazon ECR.
  2. Replace the IAM_ROLE string with the ARN of your AWS Identity and Access Management (IAM) role.

The IAM role for the Amazon ECS task should have permission to interact with MSK clusters, read secrets from Secrets Manager, decrypt the KMS key used to encrypt the secret, read images from Amazon ECR, and write logs to Amazon CloudWatch.

  1. Make sure to update the following environment variables with the appropriate values in the task definition:
    1. BROKERS – The bootstrap servers connection string of the MSK cluster in the secondary Region.
    2. USERNAME – The username that was created as a secret in Secrets Manager and associated with the MSK cluster in the secondary Region.
    3. PASSWORD – The password that was created as a secret in Secrets Manager and associated with the MSK cluster in the secondary Region.
    4. GROUP – The Kafka Connect group ID to register all containers to the same group.
  2. Create a service based on the task definition and deploy at least two tasks on the Fargate cluster.
  3. Wait until the tasks are provisioned and in running status.
  4. Log in from a bastion host or any Amazon Elastic Compute Cloud (Amazon EC2) instance in the VPC that you can log in to (using SSH or AWS Systems Manager Session Manager).

You use this EC2 instance to administer the Kafka Connect cluster. Because you use this host to interact with MSK clusters, you have to download the Apache Kafka binaries (version 2.7 or higher).

Each running Fargate task gets its own elastic network interface (ENI) and IPV4 address, which you can use to connect to the application running on the task. You can view the ENI attachment information for tasks on the Amazon ECS console or with the DescribeTasks API operation.
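If you want to script this lookup instead of using the console, a sketch like the following (cluster name and Region are placeholders) retrieves the private IP addresses of the running tasks:

import boto3

ecs = boto3.client("ecs", region_name="us-west-2")  # secondary Region (assumption)
CLUSTER = "kafka-connect-fargate-cluster"           # placeholder cluster name

task_arns = ecs.list_tasks(cluster=CLUSTER)["taskArns"]
if task_arns:
    tasks = ecs.describe_tasks(cluster=CLUSTER, tasks=task_arns)["tasks"]
    for task in tasks:
        for attachment in task["attachments"]:
            for detail in attachment["details"]:
                if detail["name"] == "privateIPv4Address":
                    print(task["taskArn"], detail["value"])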

  1. Connect to one of the Amazon ECS task IPs and check if the Kafka Connect cluster is up and running (Kafka Connect runs on port 8083):
    curl <ip-address>:8083 | jq .
    
    {
      "version": "2.7.0",
      "commit": "448719dc99a19793",
      "kafka_cluster_id": "J1xVaRK9QW-1eJq3jJvbsQ"
    }

Step 5: Deploy MirrorMaker connectors on the Kafka Connect cluster

MirrorMaker 2 is based on the Kafka Connect framework and runs based on Kafka source connectors. In the Kafka Connect configuration, a source connector reads data from any data repository and writes data into a Kafka cluster, and a sink connector reads data from a Kafka cluster and writes to any data repository.

MM2 creates remote topics, which are replicated topics that refer back to the source cluster topics using an alias. This is handled by a class called the replication policy class; a default class is provided by Apache Kafka.

For example, the following diagram shows TopicA in a source cluster with alias Primary, which gets replicated to a destination cluster with the topic name Primary.TopicA.

In a failover scenario, when you move your Kafka clients from one cluster to another, you have to modify your clients to pick from a different topic as it fails over, or you have to configure them to pick from both these topics to handle the failover scenario. For example, a consumer reading from TopicA in the primary cluster upon failover has to be modified to start reading from Primary.TopicA. Alternatively, the consumers can always be configured to read from both topics.
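For example, a consumer that subscribes to both the local topic and the prefixed remote topic might look like the following sketch, which uses the kafka-python client rather than the sample consumer from the GitHub project; the broker address and credentials are placeholders.

from kafka import KafkaConsumer

# Subscribe to both the local topic and the replicated remote topic
# (the default replication policy prefixes the source cluster alias)
consumer = KafkaConsumer(
    "TopicA",
    "Primary.TopicA",
    bootstrap_servers="b-1.secondary-msk.example.amazonaws.com:9096",  # placeholder
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-512",
    sasl_plain_username="mm2user",      # placeholder
    sasl_plain_password="<password>",   # placeholder
    group_id="mm2TestConsumer1",
    auto_offset_reset="earliest",
)

for message in consumer:
    print(message.topic, message.partition, message.offset, message.value)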

If you want to have the same topic name across your clusters after replication, because you want to minimize changes to your clients, you can use a custom replication policy that overrides MM2’s default behavior of creating remote topics. You can find sample code on GitHub.

For an active-active setup, you have to use Kafka’s default replication policy for creating remote topics with a prefix. Having the same topic names across clusters using a custom replication policy causes an infinite loop of replication.

In this post, we use a custom replication policy with active-passive setup, in which your Kafka clients fail over in a Regional outage scenario and fail back when the outage is over.

To run a successful MirrorMaker 2 deployment, you need several connectors:

  • MirrorSourceConnector – Responsible for replicating data from topics as well as metadata about topics and partitions. This connector reads from the source cluster and writes to the cluster on which Kafka Connect is deployed.
  • HeartBeatConnector – Emits a heartbeat that gets replicated to demonstrate connectivity across clusters. We can use the internal topic heartbeats to verify that the connector is running and the cluster where the connector is running is available.
  • CheckpointConnector – Responsible for emitting checkpoints in the secondary cluster containing offsets for each consumer group in the primary cluster. To do that, it creates an internal topic called <primary-alias>.checkpoints.internal in the secondary cluster. In addition, this connector creates the topic mm2-offset-syncs.<primary-alias>.internal, where it stores consumer offsets translated into the ones that make sense in the other cluster. This is required so that, when clients fail over from the primary cluster to the secondary, they can read messages from the secondary cluster at the correct offset. Prior to Apache Kafka 2.7, MM2 didn’t have a mechanism to sync the offsets for individual consumer groups with the __consumer_offsets internal topic in the secondary cluster. Syncing with __consumer_offsets allows consumers to simply fail over and continue processing messages from the last offset retrieved from __consumer_offsets in the secondary cluster. Consequently, this had to be done outside of MM2 with an asynchronous process utilizing custom code. The following sample project contains code to do this translation. However, Apache Kafka 2.7 released a new feature that synchronizes the translated offsets directly to the __consumer_offsets topic in the cluster, so that when you switch over, you can start from the last known offset. To enable this feature, you need to include the property sync.group.offsets.enabled = true in the connector configuration.

Sample connector configurations for each of these connectors are available on GitHub. The configurations contain SASL/SCRAM-related information to connect to the cluster. Make sure the number of tasks matches the number of partitions in your Kafka topics, which enables the connector to read multiple partitions in parallel. The configuration also uses CustomMM2ReplicationPolicy to make sure the topics are replicated with the same name across clusters. You can remove this line as long as you update the Kafka client to read from topic names with a prefix when using the MSK cluster in the secondary Region.

To deploy these connectors on the Kafka Connect cluster, log back in to the bastion host machine that acts as your administrative console. Make sure the bastion host has an IAM role that has access to the KMS key encrypting your Secrets Manager secrets corresponding to your MSK cluster. Find the IP address of one of the containers running your Kafka Connect cluster.

For instructions on reviewing your connector configuration and deploying it on Kafka Connect, see Configure and start MirrorMaker 2 connectors.
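As a rough illustration of what that deployment looks like, the following sketch submits a MirrorSourceConnector configuration to the Kafka Connect REST API. The configuration values are placeholders, the custom replication policy class name is hypothetical, and the SASL/SCRAM settings are omitted, so treat the GitHub samples as the authoritative reference.

import json
import requests

# IP address of one of the Kafka Connect containers (see the previous step)
CONNECT_URL = "http://<task-ip-address>:8083/connectors"

connector = {
    "name": "mm2-msc",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.alias": "pri",
        "target.cluster.alias": "sec",
        "source.cluster.bootstrap.servers": "<primary-bootstrap-brokers>",
        "target.cluster.bootstrap.servers": "<secondary-bootstrap-brokers>",
        "topics": "TopicA,TopicB",           # placeholder topic list
        "tasks.max": "4",                     # match your partition count
        # Hypothetical fully qualified class name; use the one from the sample repo
        "replication.policy.class": "<package>.CustomMM2ReplicationPolicy",
    },
}

response = requests.post(
    CONNECT_URL,
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
response.raise_for_status()
print(response.json())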

Check the topics that are created in your primary and secondary cluster. The primary MSK cluster should have a new mm2-offset-syncs.sec.internal topic and the secondary MSK cluster should have the heartbeats and pri.checkpoints.internal topics.

Step 6: Confirm the data is replicated from one Region to another

With the connectors up and running on the Kafka Connect cluster, you should now create a topic in your primary cluster and see it replicate to the secondary cluster (with a prefix, if you used the default replication policy).

After the topic replication is configured, you can start producing data into the new topic. You can use the following sample producer code for testing. You can also use a Kafka console producer or your own producer. Make sure the producer can support SASL/SCRAM based connectivity.

If you use the sample producer code, make sure to create a producer.properties file and provide the bootstrap server information of your primary cluster to the BOOTSTRAP_SERVERS_CONFIG property. Then start your producer with the following code:

java -jar KafkaClickstreamClient-1.0-SNAPSHOT.jar -t <topic-name> -pfp <properties_file_path> -nt 8 -rf 300 -sse -ssu <user name> -gsr -grn <glue schema registry name >  -gar > /tmp/producer.log 2>&1 &

The GitHub project has more details on the command line parameters and how to change the rate at which messages are produced.

Confirm the messages produced in the topic in the primary cluster are all flowing to the topic in the destination cluster by checking the message count. For this post, we use kafkacat, which supports SASL/SCRAM to count the messages:

docker run -it --network=host edenhill/kafkacat:1.6.0 -b <bootstrap-servers> -X security.protocol=SASL_SSL -X sasl.mechanism=SCRAM-SHA-512 -X sasl.username=<username>  -X sasl.password=<pwd> -t <topicname> -C -e -q| wc -l

In production environments, if the message counts are large, use your traditional monitoring tool to check the message count, because tools like kafkacat take a long time to consume messages and report on a message count.

Now that you have confirmed the producer is actively producing messages to the topic and that the data is replicated, we can spin up a consumer to consume the messages from the topic. We spin up the consumer in the primary Region, because in an active-passive setup, all activities happen in the primary Region until an outage occurs and you fail over.

You can use the following sample consumer code for testing. You can also use a Kafka console consumer or your own consumer. Make sure that the consumer code can support connectivity with SASL/SCRAM.

For the sample code, make sure to create a consumer.properties file that contains the Amazon MSK bootstrap broker information of the primary cluster for the BOOTSTRAP_SERVERS_CONFIG property. Run the following code to start the consumer:

java -jar KafkaClickstreamConsumer-1.0-SNAPSHOT.jar -t <topic> -pfp <properties file path> -nt 3 -rf 10800 -sse -ssu <username> -src <primary cluster alias> -gsr -grn <glue schema registry name> /tmp/consumer_dest.log 2>&1 &

As explained before, the consumer offsets of this consumer group get replicated to the secondary cluster and synced into the __consumer_offsets topic there. It takes a few minutes for the consumer group offsets to sync from the primary to the secondary, depending on the value of sync.group.offsets.interval.seconds in the checkpoint connector configuration. You can check the consumer group with the following command:

./bin/kafka-consumer-groups.sh --bootstrap-server <bootstrap-url>  --command-config /opt/ssl-user-config.properties --describe --group mm2TestConsumer1

Make sure ssl-user-config.properties contains the connectivity information:

sasl.mechanism=SCRAM-SHA-512
# Configure SASL_SSL if SSL encryption is enabled, otherwise configure SASL_PLAINTEXT
security.protocol=SASL_SSL
sasl.jaas.config=<jaas -config>

The consumer group offsets are now synced up from the primary to the secondary cluster. This helps us fail over clients to the secondary cluster because the consumer started in the primary cluster can start consuming from the secondary cluster and read from where it left off after failover.

Step 7: Fail over clients to the secondary Region

In a DR scenario, if you need to fail clients from your cluster in the primary Region to a secondary Region, follow the steps in this section.

Start by shutting down your consumer in the primary Region. Then start the consumer in the secondary Region by updating the bootstrap server information to point to the secondary MSK cluster. Because the topic name is the same across both Regions, you don’t need to change the consumer client code. This consumer starts consuming messages from the topic even if new messages are still being produced by producers in the primary Region.

Now you can stop the producer in the primary Region (if not already stopped due to a Regional failure) and start it in the secondary Region by updating the bootstrap server information. The consumer in the secondary Region keeps consuming messages from the topic, including the ones now being produced in the secondary Region. After the consumer and producer have failed over, you can delete the MM2 connectors on the Kafka Connect cluster using the HTTP endpoints of the connectors. See the following code:

curl -X DELETE http://<ip-address>:8083/connectors/mm2-msc

This stops all replication activities from the primary cluster to the secondary cluster. Now the MSK cluster in the primary Region is available for upgrade or any other activities.

If a Regional outage is impacting Amazon MSK or Apache Kafka on Amazon EC2, it’s highly probable that the clients, producers, and consumers running on Amazon EC2, Amazon ECS, Amazon EKS, and AWS Lambda are also impacted. In this case, you have to stop the MM2 connectors in the DR cluster because the source cluster isn’t available to replicate from. To recover clients, you can start the consumers on the secondary cluster. The consumers read messages from the topic from where they left off. Start the producers on the secondary cluster and push new messages to the topic in the secondary cluster.

Now that you have successfully failed over from the MSK cluster in the primary Region to the secondary Region, we can see how to fail back after your MSK cluster in the primary Region is ready to be operational.

Step 8: Fail back clients to the primary Region

Check the topics in your MSK cluster in the primary Region. Depending on the activity on the secondary (now primary) cluster during the DR period, you might want to start with fresh data from your secondary cluster. Follow these steps to get your primary cluster synced up with all data required:

  1. Delete all topics (if any) except the __consumer_offsets topic in the primary cluster.
  2. Create a Kafka Connect cluster deploying Fargate containers (as we walked through earlier), with brokers pointing to the MSK cluster in the primary Region.

MirrorSourceConnector can write only to the cluster where Kafka Connect is deployed. Because we want to replicate from the secondary to the primary, we need a Kafka Connect cluster associated to the primary Region.

  1. Deploy the MirrorMaker connectors (similar to what we did earlier) using the configuration samples. This time, make sure the source and target bootstrap broker information is flipped in all the configuration files.

These connectors are responsible for replicating data from the secondary back to the primary. Make sure to list the topic names containing your data in the topics property of the MirrorSourceConnector configuration file. You don’t want to replicate topics created by Kafka Connect and MirrorMaker in the secondary Region, because that creates confusion.

This process starts replication activities from your MSK cluster in the secondary Region to the MSK cluster in the primary Region. This happens in parallel because the producers and consumers are actively writing and reading in the MSK cluster in the secondary Region.

  1. Wait until all the data topics and their messages are replicated from the MSK cluster in the secondary Region to the MSK cluster in the primary Region.

The time it takes depends on the number of messages in the topic.

  1. Check the number of messages in the topics on the MSK cluster in the primary Region.

When the number of messages is close to the number of messages in the MSK cluster in the secondary Region, it’s time to fail back your Kafka clients.

  1. Stop the consumers from the secondary Region one by one and move them to point to the primary cluster.

When a consumer is up and running against the primary cluster, it should be able to continue reading the messages produced by producers still pointing to the MSK cluster in the secondary Region. When all the consumers are healthy, it’s time to fail back the producers as well.

  1. Stop the producers in the secondary Region’s MSK cluster and start them by pointing to the primary Region’s MSK cluster.

To re-enable MirrorMaker replication from the primary to the secondary, you have to stop the MirrorMaker connectors replicating from the secondary to the primary. Because we’re using CustomMM2ReplicationPolicy, which keeps the same topic names across clusters, it’s important to have data replication flowing in only one direction at a time; otherwise, it creates a recursive loop. You have to repeat similar cleanup steps to get the replication flowing back from the primary to the secondary.

Using the default replication policy in MirrorMaker 2

When you use MirrorMaker 2’s default replication policy, it creates topics with a prefix, as explained earlier. This enables you to run two-way replication because MirrorMaker 2 ignores the topics with a prefix when replicating. This is convenient because you don’t have to delete the MirrorMaker connector configuration when moving from one side to the other, which makes failover and failback easier.

Make sure to update your clients to read from topics both with and without prefixes, because they can read from either cluster as part of failover and failback. In addition, if you have a use case for an active-active setup, it’s imperative that you choose MirrorMaker 2’s default replication policy.

Conclusion

In this post, I reviewed how to set up a highly resilient deployment across Regions for an MSK cluster using MirrorMaker 2 deployed on a distributed Kafka Connect cluster in Fargate. You can use this solution to build a data redundancy capability to meet regulatory compliance, business continuity, and DR requirements. With MirrorMaker 2, you can also set up an active-active MSK cluster, enabling clients to consume from an MSK cluster that has geographical proximity.


About the Author

Anusha Dharmalingam is a Solutions Architect at Amazon Web Services, with a passion for Application Development and Big Data solutions. Anusha works with enterprise customers to help them architect, build, and scale applications to achieve their business goals.

Preprocess logs for anomaly detection in Amazon ES

Post Syndicated from Kapil Pendse original https://aws.amazon.com/blogs/big-data/preprocess-logs-for-anomaly-detection-in-amazon-es/

Amazon Elasticsearch Service (Amazon ES) supports real-time anomaly detection, which uses machine learning (ML) to proactively detect anomalies in real-time streaming data. When used to analyze application logs, it can detect anomalies such as unusually high error rates or sudden changes in the number of requests. For example, a sudden increase in the number of food delivery orders from a particular area could be due to weather changes or due to a technical glitch experienced by users from that area. The detection of such an anomaly can facilitate quick investigation and remediation of the situation.

The anomaly detection feature of Amazon ES uses the Random Cut Forest algorithm. This is an unsupervised algorithm that constructs decision trees from numeric input data points in order to detect outliers in the data. These outliers are regarded as anomalies. To detect anomalies in logs, we have to convert the text-based log files into numeric values so that they can be interpreted by this algorithm. In ML terminology, such conversion is commonly referred to as data preprocessing. There are several methods of data preprocessing. In this post, I explain some of these methods that are appropriate for logs.

To implement the methods described in this post, you need a log aggregation pipeline that ingests log files into an Amazon ES domain. For information about ingesting Apache web logs, see Send Apache Web Logs to Amazon Elasticsearch Service with Kinesis Firehose. For a similar method for ingesting and analyzing Amazon Simple Storage Service (Amazon S3) server access logs, see Analyzing Amazon S3 server access logs using Amazon ES.

Now, let’s discuss some data preprocessing methods that we can use when dealing with complex structures within log files.

Log lines to JSON documents

Although they’re text files, usually log files have some structure to the log messages, with one log entry per line. As shown in the following image, a single line in a log file can be parsed and stored in an Amazon ES index as a document with multiple fields. This image is an example of how an entry in an Amazon S3 access log can be converted into a JSON document.

Although you can ingest JSON documents such as the preceding image as is into Amazon ES, some of the text fields require further preprocessing before you can use them for anomaly detection.

Text fields with nominal values

Let’s assume your application receives mostly GET requests and a much smaller number of POST requests. According to an OWASP security recommendation, it’s also advisable to disable TRACE and TRACK request methods because these can be misused for cross-site tracing. If you want to detect when unusual HTTP requests appear in your server logs, or when there is a sudden spike in the number of HTTP requests with methods that are normally a minority, you could do so by using the request_uri or operation fields in the preceding JSON document. These fields contain the HTTP request method, but you have to extract it and convert it into a numeric format that can be used for anomaly detection.

These are fields that have only a handful of different values, and those values don’t have any particular sequential order. If we simply convert HTTP methods to an ordered list of numbers, like GET = 1, POST = 2, and so on, we might confuse the anomaly detection algorithm into thinking that POST is somehow greater than GET, or that GET + GET equals POST. A better way to preprocess such fields is one-hot encoding. The idea is to convert the single text field into multiple binary fields, one for every possible value of the original text field. In our example, the result of this one-hot encoding is a set of nine binary fields. If the value of the field in the original log is HEAD, only the HEAD field in the preprocessed data has value 1, and all other fields are zero. The following table shows some examples.

Original Log Message Preprocessed into multiple one-hot encoded fields
HTTP Request Method GET HEAD POST PUT DELETE CONNECT OPTIONS TRACE PATCH
GET 1 0 0 0 0 0 0 0 0
POST 0 0 1 0 0 0 0 0 0
OPTIONS 0 0 0 0 0 0 1 0 0

The data in these generated fields can then be processed by the Amazon ES anomaly detection feature to detect anomalies when there is a change in the pattern of HTTP requests received by your application, for example an unusually high number of DELETE requests.
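A minimal sketch of this encoding in Python could look like the following; the generated field names match the http_method_* fields in the preprocessed document shown later in this post.

HTTP_METHODS = ("GET", "HEAD", "POST", "PUT", "DELETE",
                "CONNECT", "OPTIONS", "TRACE", "PATCH")

def http_method_one_hot_encoding(method):
    # returns a dict with nine binary fields, one per HTTP request method
    return {"http_method_" + m.lower(): int(method.upper() == m)
            for m in HTTP_METHODS}

# Example: a HEAD request sets only http_method_head to 1
print(http_method_one_hot_encoding("HEAD"))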

Text fields with a large number of nominal values

Many log files contain HTTP response codes, error codes, or some other type of numeric codes. These codes don’t have any particular order, but the number of possible values is quite large. In such cases, one-hot encoding alone isn’t suitable because it can cause an explosion in the number of fields in the preprocessed data.

Take for example the HTTP response codes. The values are unordered, meaning that there is no particular reason for 200 being OK and 400 being Bad Request. 200 + 200 != 400 as far as HTTP response codes go. However, the number of possible values is quite large—more than 60. If we use the one-hot encoding technique, we end up creating more than 60 fields out of this 1 field, and it quickly becomes unmanageable.

However, based on our knowledge of HTTP status codes, we know that these codes are by definition binned into five ranges. Codes in the range 100–199 are informational responses, codes 200–299 indicate successful completion of the request, 300–399 are redirections, 400–499 are client errors, and 500–599 are server errors. We can take advantage of this knowledge and reduce the original values to five values, one for each range (1xx, 2xx, 3xx, 4xx and 5xx). Now this set of five possible values is easier to deal with. The values are purely nominal. Therefore, we can additionally one-hot encode these values as described in the previous section. The result after this binning and one-hot encoding process is something like the following table.

Original Log Message Preprocessed into multiple fields after binning and one-hot encoding
HTTP Response Status Code 1xx 2xx 3xx 4xx 5xx
100 (Continue) 1 0 0 0 0
101 (Switching Protocols) 1 0 0 0 0
200 (OK) 0 1 0 0 0
202 (Accepted) 0 1 0 0 0
301 (Moved Permanently) 0 0 1 0 0
304 (Not Modified) 0 0 1 0 0
400 (Bad Request) 0 0 0 1 0
401 (Unauthorized) 0 0 0 1 0
404 (Not Found) 0 0 0 1 0
500 (Internal Server Error) 0 0 0 0 1
502 (Bad Gateway) 0 0 0 0 1
503 (Service Unavailable) 0 0 0 0 1

This preprocessed data is now suitable for use in anomaly detection. Spikes in 4xx errors or drops in 2xx responses might be especially important to detect.

The following Python code snippet shows how you can bin and one-hot encode HTTP response status codes:

def http_status_bin_one_hot_encoding(http_status):
    # returns one hot encoding based on http response status bin
    # bins are: 1xx, 2xx, 3xx, 4xx, 5xx
    if 100 <= http_status <= 199: # informational responses
        return (1, 0, 0, 0, 0)
    elif 200 <= http_status <= 299: # successful responses
        return (0, 1, 0, 0, 0)
    elif 300 <= http_status <= 399: # redirects
        return (0, 0, 1, 0, 0)
    elif 400 <= http_status <= 499: # client errors
        return (0, 0, 0, 1, 0)
    elif 500 <= http_status <= 599: # server errors
        return (0, 0, 0, 0, 1)
    return (0, 0, 0, 0, 0) # status codes outside the standard ranges

# 'status' and 'timestamp' (used below) are assumed to come from the parsed log line
http_1xx, http_2xx, http_3xx, http_4xx, http_5xx = http_status_bin_one_hot_encoding(status)

log_entry = {
    'timestamp': timestamp,
    'bucket': "somebucket",
    'key': "somekey",
    'operation': "REST.GET.VERSIONING",
    'request_uri': "GET /awsexamplebucket1?versioning HTTP/1.1",
    'status_code': status,
    'http_1xx': http_1xx,
    'http_2xx': http_2xx,
    'http_3xx': http_3xx,
    'http_4xx': http_4xx,
    'http_5xx': http_5xx,
    'error_code': "-",
    'bytes_sent': 113,
    'object_size': 0
}

Text fields with ordinal values

Some text fields in log files contain values that have a relative sequence. For example, a log level field might contain values like TRACE, DEBUG, INFO, WARN, ERROR, and FATAL. This is a sequence of increasing severity of the log message. As shown in the following table, these string values can be converted to numeric values in a way that retains this relative sequence.

Log Level (Original Log Message) Preprocessed Log Level
TRACE 1
DEBUG 2
INFO 3
WARN 4
ERROR 5
FATAL 6
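In code, this mapping can be as simple as a small lookup. The following sketch mirrors the table above; returning 0 for unknown levels is an assumption for illustration.

LOG_LEVELS = {"TRACE": 1, "DEBUG": 2, "INFO": 3, "WARN": 4, "ERROR": 5, "FATAL": 6}

def log_level_ordinal(level):
    # maps a log level string to its ordinal severity; 0 for unknown levels
    return LOG_LEVELS.get(level.upper(), 0)

print(log_level_ordinal("WARN"))  # 4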

IP addresses

Log files often contain IP address fields that can take a very large number of distinct values, and it doesn’t make sense to bin these values together using the method described in the previous section. However, these IP addresses might be of interest from a geolocation perspective. It might be important to detect an anomaly if an application starts getting accessed from an unusual geographic location. If geographic information like country or city code isn’t directly available in the logs, you can get this information by geolocating the IP addresses using third-party services. Effectively, this is a process of binning the large number of IP addresses into a considerably smaller number of country or city codes. Although these country and city codes are still nominal values, they can be used with the cardinality aggregation of Amazon ES.
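As one illustration, the following sketch resolves an IP address to an ISO country code using the geoip2 library with a local GeoLite2 database file; the file path is a placeholder, and mapping the ISO code to a numeric code (like the country_code field in the sample document below) is an additional step left to your pipeline.

import geoip2.database
from geoip2.errors import AddressNotFoundError

# Placeholder path to a local GeoIP database file (for example, MaxMind GeoLite2-Country)
reader = geoip2.database.Reader("/path/to/GeoLite2-Country.mmdb")

def ip_to_country_code(ip_address):
    # returns an ISO country code for the IP, or None if it cannot be resolved
    try:
        return reader.country(ip_address).country.iso_code
    except AddressNotFoundError:
        return None

print(ip_to_country_code("192.0.2.3"))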

After we apply these preprocessing techniques to our example Amazon S3 server access logs, we get the resulting JSON log data:

{
    "bucket_owner": "", //string
    "bucket": "awsexamplebucket1", //string
    "timestamp": "06/Feb/2019:00:00:38 +0000",
    "remote_ip": "192.0.2.3", //string
    "country_code": 100, //numeric field generated during pre-processing
    "requester": "", //string
    "request_id": "3E57427F3EXAMPLE",
    "operation": "REST.GET.VERSIONING",
    "key": "-",
    "request_uri": "GET /awsexamplebucket1?versioning HTTP/1.1",
    "http_method_get": 1, //nine one-hot encoded fields generated during pre-processing
    "http_method_post": 0,
    "http_method_put": 0,
    "http_method_delete": 0,
    "http_method_head": 0,
    "http_method_connect": 0,
    "http_method_options": 0,
    "http_method_trace": 0,
    "http_method_patch": 0,
    "http_status": 200,
    "http_1xx": 0, //five one-hot encoded fields generated during pre-processing
    "http_2xx": 1,
    "http_3xx": 0,
    "http_4xx": 0,
    "http_5xx": 0,
    "error_code": "-",
    "bytes_sent": 113,
    "object_size": "-",
    "total_time": 7,
    "turn_around_time": "-",
    "referer": "-",
    "user_agent": "S3Console/0.4",
    "version_id": "-",
    "host_id": "", //string
    "signature_version": "SigV2",
    "cipher_suite": "ECDHE-RSA-AES128-GCM-SHA256",
    "authentication_type": "AuthHeader",
    "host_header": "awsexamplebucket1.s3.us-west-1.amazonaws.com",
    "tls_version": "TLSV1.1"
}

This data can now be ingested and indexed into an Amazon ES domain. After you set up the log preprocessing pipeline, the next thing to configure is an anomaly detector. Amazon ES anomaly detection allows you to specify up to five features (fields in your data) in a single anomaly detector. This means the anomaly detector can learn patterns in data based on the values of up to five fields.

Aggregations

You must specify an appropriate aggregation function for each feature. This is because the anomaly detector aggregates the values of all documents ingested in each detector interval to produce a single aggregate value, and then that value is used as the input to the algorithm that automatically learns the patterns in data. The following diagram depicts this process.

After you configure the right features and corresponding aggregation functions, the anomaly detector starts to initialize. After processing a sufficient amount of data, the detector enters the running state.

To help you get started with anomaly detection on your own logs, the following table shows the preprocessing techniques and aggregation functions that might make sense for some common log fields.

Log Field Name Preprocessing Aggregation
HTTP response status code One-hot encoding sum
Client IP address IP geolocation to a country or city code cardinality
Log Message Level (INFO, WARN, ERR, FATAL etc.) One-hot encoding sum
Error or Exception names Map to numeric codes, additional binning and one-hot encoding if there are large number of possible values cardinality if using single numeric code field; sum if using one-hot encodings
Object Size / Bytes Sent / Content-Length None, use numeric value itself min, max, average
To monitor general traffic levels, you can use any numeric field like response code or bytes sent to count the number of log entries per detector interval None, use numeric value itself count (value_count) – simply counts the number of documents that have a value for this field
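To tie the features and aggregations together, the following is a hedged sketch of creating a detector through the Open Distro anomaly detection REST API rather than the Kibana UI; the endpoint path and payload shape follow the Open Distro plugin documentation, and the domain endpoint, index name, detector settings, and authentication are placeholders that depend on your domain configuration.

import json
import requests

DOMAIN_ENDPOINT = "https://<your-amazon-es-domain-endpoint>"  # placeholder

detector = {
    "name": "s3-access-log-detector",
    "description": "Detect spikes in 4xx responses",
    "time_field": "timestamp",
    "indices": ["s3-access-logs-*"],  # placeholder index pattern
    "detection_interval": {"period": {"interval": 10, "unit": "Minutes"}},
    "feature_attributes": [
        {
            "feature_name": "sum_http_4xx",
            "feature_enabled": True,
            "aggregation_query": {"sum_http_4xx": {"sum": {"field": "http_4xx"}}},
        }
    ],
}

response = requests.post(
    f"{DOMAIN_ENDPOINT}/_opendistro/_anomaly_detection/detectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(detector),
    auth=("<username>", "<password>"),  # or SigV4 signing, depending on the domain
)
print(response.json())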

Conclusion

IT teams can use the anomaly detection feature of Amazon ES to implement proactive monitoring and alerting for applications and infrastructure logs. Anyone with basic scripting or programming skills should be able to implement the log preprocessing techniques discussed in this post—you don’t need to have in-depth knowledge of ML or data science. The anomaly detection feature is available in Amazon ES domains running Elasticsearch version 7.4 or later. To get started, see Anomaly detection in Amazon Elasticsearch Service.


About the Author

Kapil Pendse is a Senior Solutions Architect with Amazon Web Services (Singapore) and has over 15 years of experience building technology solutions across multiple domains such as cloud computing, embedded systems, and machine learning. In his free time, Kapil likes to bike along Singapore’s coastal parks and enjoys the occasional company of otters.

DOCOMO empowers business units with self-service knowledge access thanks to agile AWS QuickSight business intelligence

Post Syndicated from Daiki Itoh original https://aws.amazon.com/blogs/big-data/docomo-empowers-business-units-with-self-service-knowledge-access-thanks-to-agile-aws-quicksight-business-intelligence/

NTT DOCOMO is the largest telecom company in Japan. It provides innovative, convenient, and secure mobile services that enable customers to realize smarter lives. More than 73 million customers in Japan connect through its advanced wireless networks, including a nationwide LTE network and one of the world’s most progressive LTE Advanced networks. In addition to a wide range of communications-related services, DOCOMO offers value-added Smart Life offerings, including those under its +d initiative. These include the d POINT CLUB customer loyalty point program and d Payment, which enables customers to shop online and make electronic payments using their mobile devices.

All of these services create tremendous amounts of data, providing an opportunity for the company to extract insights that drive business value. To accomplish this goal, the company uses Amazon QuickSight and AWS data technologies to help better understand customers and support sales teams.

Many products, one data team

The company’s data team manages the marketing platform, which includes capturing, analyzing, and reporting on data for DOCOMO Smart Life businesses. For data collection and aggregation, it uses Amazon Redshift as a data warehouse. Amazon Redshift is ideal for storing and querying large amounts of structured data with high performance and reliability.

“With millions of connected customers, we need a highly capable data platform,” says Issei Nishimura, Manager, Marketing Platform Planning Department at DOCOMO. “AWS delivers the right levels of performance.”

In addition to regular reporting from the data warehouse, the company’s business leadership is interested in real-time key performance indicators (KPIs). For d Payment, these include metrics such as active users on a monthly and daily basis. Based on these analyses, leadership can decide how to improve the usage or the sales of each service.

Helping business users access analytics

However, when non-technical decision-makers requested self-service access, the data team had no easy way to provide it—until it decided to adopt QuickSight. QuickSight is a fast, cloud-powered business intelligence service that was easy to deploy and required no on-premises infrastructure.

“Because of native integration with existing AWS services, especially Amazon Redshift, we were able to roll out Amazon QuickSight quickly and easily,” Nishimura says. “In fact, it only took one day to build our first QuickSight dashboards.”

The automated data pipeline starts with Amazon Elastic Compute Cloud (Amazon EC2) to perform extract, transform, and load (ETL) on data, which is then pushed to Amazon Redshift. Amazon SageMaker aggregates and exports it to Amazon Simple Storage Service (Amazon S3), from which QuickSight accesses data for dashboards.

The following is a sample dashboard from NTT DOCOMO. For every marketing campaign, NTT DOCOMO analyzes the number of new unique users of d Payment relative to the number of registrations to the campaign. This allows them to understand how much effect the campaign had on each user category.

With pay-per-session pricing, the company can provide ad hoc data access for line of business decision-makers without capital investment and at low total cost. At the same time, QuickSight can scale to support as many users as needed.

The dashboards can be accessed from any device, providing convenience to the product management teams. It’s easy and intuitive enough for non-technical users—there’s no need for them to write SQL or scripts.

Faster insights to compete in an accelerated marketplace

Previously, it would take a few days to meet requests for ad hoc reports. Now, when people want to check a KPI, they can do it instantly.

“Our team can focus on managing the data warehouse and the regular reporting cadence because the volume of out-of-band requests has been reduced,” Nishimura says.

The solution has had immediate benefits for d Payment sales representatives, who work closely with retailers that use the payment service. These sales representatives want to be able to present relevant KPIs and demographics to the retailers to show trends and improve the services. With QuickSight, the sales team can generate appealing, up-to-date visualizations of the relevant information. They no longer have to spend time building graphics because the QuickSight visualizations are ready to use right away.

Summary

DOCOMO is a data-driven company, using insights to continuously improve its services. AWS enables them to run an enterprise data warehouse that ingests millions of data points with unlimited scale—and provides services such as QuickSight that give non-technical users rapid access to the real-time information they need.

“With these solutions, DOCOMO is breaking down barriers to enable greater use of analytics across the organization,” Nishimura says.


About the Authors

Daiki Itoh is a Business Development Manager for Amazon QuickSight in Japan.

Chirag Dhull is a Principal Product Marketing Manager for Amazon QuickSight.

Amazon Redshift identity federation with multi-factor authentication

Post Syndicated from Manash Deb original https://aws.amazon.com/blogs/big-data/amazon-redshift-identity-federation-with-multi-factor-authentication/

Password-based access control alone is not considered secure enough, and many organizations are adopting multi-factor authentication (MFA) and single sign-on (SSO) as a de facto standard to prevent unauthorized access to systems and data. SSO frees up time and resources for both administrators and end users from the painful process of password-based credential management. MFA capability adds an extra layer of protection, improving organizational security and confidence.

Amazon Redshift is a fast, fully managed cloud data warehouse. Its JDBC/ODBC drivers provide browser-based plugins that help you easily implement SSO with MFA to secure your data warehouse, and directory federation helps you automate and enforce data access policies across the organization.

You can integrate Amazon Redshift with different identity providers (IdPs) like Microsoft Azure Active Directory, Ping, Okta, OneLogin, and more. For more information, see Using a credentials provider plugin. You may already have integrations for federating to AWS using industry-standard Security Assertion Markup Language (SAML) with these IdPs. In this post, we explain the steps to integrate the Amazon Redshift browser-based SAML plugin to add SSO and MFA capability with your federation IdP.

Solution overview

With this integration, users get authenticated to Amazon Redshift using the SSO and MFA credentials of the IdP application, which uses SAML tokens to map the IdP user identity (like login name or email address) as the database user in Amazon Redshift. It can also map users’ directory group memberships to corresponding database groups in Amazon Redshift, which allows you to control authorization grants for database objects in Amazon Redshift.

The following diagram illustrates our solution architecture.

High-level steps for SSO-MFA integration

The following diagram illustrates the authentication flow with the browser SAML plugin.

We complete the following high-level steps to set up the SSO and MFA authentication capability to an Amazon Redshift data warehouse using a browser-based SAML plugin with our IdP:

  1. Create a custom SAML 2.0 application with the IdP with the following configurations:
    1. A redirect URI (for example, http://localhost:7890/redshift/).
    2. MFA capability enabled.
    3. Relevant SAML claim attributes.
    4. Appropriate directory groups and users with the IdP.
  2. Add appropriate AWS Identity and Access Management (IAM) permissions:
    1. Add an IdP.
    2. Add appropriate IAM roles for the IdP.
    3. Use IAM policies to add appropriate permissions to the roles to access the Amazon Redshift cluster.
  3. Set up Amazon Redshift with group-level access control:
    1. Connect to Amazon Redshift using superuser credentials.
    2. Set up appropriate database groups in Amazon Redshift.
    3. Grant access permissions appropriate to relevant groups.
  4. Connect to Amazon Redshift with your JDBC/ODBC SQL client:
    1. Configure connection attributes for the IdP.
    2. Enable browser-based MFA.
    3. Connect to Amazon Redshift.

Create a custom SAML 2.0 application with the IdP

The first step of this setup is to create a SAML 2.0 application with your IdP. The various directory groups and users that need to log in to the Amazon Redshift cluster should have access to this SAML application. Provide an appropriate redirect_uri (for example, http://localhost:7890/redshift/) in the SAML application configuration, so that the IdP can seamlessly redirect SAML responses over HTTPS to this URI, which then allows the Amazon Redshift JDBC/ODBC driver to authenticate and authorize the user.

The following screenshot shows a SAML application configuration with PingOne as the IdP (for more details on PingOne Amazon Redshift SSO federation, see Federating single sign-on access to your Amazon Redshift cluster with PingIdentity).

You need to download the metadata XML file from the provider (as shown in the preceding screenshot) and use it in a later step to create a SAML IdP in IAM.

Next, you can enable MFA for this application so that users are authorized to access Amazon Redshift only after they pass the two-factor authentication with MFA.

The following screenshot shows the MFA configuration settings with PingOne as the IdP.

As part of the IdP application setup, map the following claim attributes so that Amazon Redshift can access them using the SAML response.

  • Role – Namespace: https://aws.amazon.com/SAML/Attributes/Role. Description: aws_iam_role_arn_for_identity_provider, aws_identity_provider_arn. Example value: arn:aws:iam::<account>:role/PingOne_Redshift_SSO_Role,arn:aws:iam::<account>:saml-provider/PingOne
  • RoleSessionName – Namespace: https://aws.amazon.com/SAML/Attributes/RoleSessionName. Description: identification for the user session, which in most cases is the email ID of the user. Example value: email
  • AutoCreate – Namespace: https://redshift.amazon.com/SAML/Attributes/AutoCreate. Description: if this parameter is set, new users authenticated by the IdP are automatically created in Amazon Redshift. Example value: "true"
  • DbUser – Namespace: https://redshift.amazon.com/SAML/Attributes/DbUser. Description: identification for the user session, which in most cases is the email ID of the user. Example value: email
  • DbGroups – Namespace: https://redshift.amazon.com/SAML/Attributes/DbGroups. Description: Amazon Redshift database group names for the user, which in most cases are the same as the directory groups the user belongs to. Example value: data_scientist

The following screenshot is an example of these claim attributes set up for PingOne as IdP.

Apart from setting up the SAML application, you also need to set up appropriate directory groups and users with your IdP, which you will use to grant SSO and MFA access to users for different applications like AWS Single Sign-On and Amazon Redshift application single sign-on.

The following screenshot is an example of this user group set up for PingOne as IdP.

Add appropriate permissions using IAM

After you complete the configuration settings with the IdP, the next step is to configure appropriate permissions in IAM in your AWS account for identity federation using IAM.

The first step is to add an IdP using the SAML metadata XML file downloaded from the IdP’s SAML application you created in the previous step.

After you add the IdP, you need to create an IAM role with that IdP as a trusted entity.

Set the value of the SAML:aud attribute to the same redirect URI defined in your IdP’s SAML application setup (for example, http://localhost:7890/redshift/).
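If you prefer to script these IAM steps instead of using the console, the following is a minimal boto3 sketch that registers the IdP and creates a role trusting it with the SAML:aud condition. The metadata file name, provider name, and role name are placeholders; substitute the values from your own SAML application.

import json
import boto3

iam = boto3.client("iam")

# Register the IdP in IAM using the metadata XML downloaded from the SAML application.
with open("idp-saml-metadata.xml") as f:  # placeholder file name
    saml_metadata = f.read()

provider = iam.create_saml_provider(
    SAMLMetadataDocument=saml_metadata,
    Name="PingOne",  # placeholder provider name
)

# Trust policy: allow federated principals from this IdP to assume the role,
# but only when the SAML audience matches the plugin's redirect URI.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Federated": provider["SAMLProviderArn"]},
            "Action": "sts:AssumeRoleWithSAML",
            "Condition": {
                "StringEquals": {"SAML:aud": "http://localhost:7890/redshift/"}
            },
        }
    ],
}

role = iam.create_role(
    RoleName="PingOne_Redshift_SSO_Role",  # placeholder role name
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)
print(role["Role"]["Arn"])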

Create a new IAM policy with the necessary permissions needed by the users to access Amazon Redshift, and attach it to the IAM role you created earlier. See the following sample policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "redshift:CreateClusterUser",
                "redshift:JoinGroup",
                "redshift:GetClusterCredentials",
                "redshift:ListSchemas",
                "redshift:ListTables",
                "redshift:ListDatabases",
                "redshift:ExecuteQuery",
                "redshift:FetchResults",
                "redshift:CancelQuery",
                "redshift:DescribeClusters",
                "redshift:DescribeQuery",
                "redshift:DescribeTable"
            ],
            "Resource": [
                "arn:aws:redshift:<region>:<account>:cluster:<clusterName>",
                "arn:aws:redshift:<region>:<account>:dbuser:<clusterName>/${redshift:DbUser}",
                "arn:aws:redshift:<region>:<account>:dbname:<clusterName>/${redshift:DbName}",
                "arn:aws:redshift:<region>:<account>:dbgroup:<clusterName>/bi_users_group",
                "arn:aws:redshift:<region>:<account>:dbgroup:<clusterName>/etl_users_group",
                "arn:aws:redshift:<region>:<account>:dbgroup:<clusterName>/analysts_group"
            ]
        }
    ]
}
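To see what these permissions enable at connection time, the following hedged boto3 sketch issues the same GetClusterCredentials call that the JDBC/ODBC driver makes on your behalf after the SAML exchange. The cluster identifier, database name, and user are placeholders.

import boto3

redshift = boto3.client("redshift")

# Exchange the federated identity for temporary database credentials,
# optionally auto-creating the user and joining database groups. This mirrors
# the AutoCreate, DbUser, and DbGroups SAML attributes described earlier.
creds = redshift.get_cluster_credentials(
    ClusterIdentifier="my-redshift-cluster",  # placeholder
    DbName="dev",                             # placeholder
    DbUser="user@example.com",                # value of the DbUser attribute
    AutoCreate=True,
    DbGroups=["bi_users_group"],
    DurationSeconds=900,
)
print(creds["DbUser"], creds["Expiration"])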

You can also use an AWS CloudFormation template to automate this IAM setup by uploading the IdP-specific SAML metadata XML file from the SAML application you created.

The following template takes care of creating the IAM resources required for this setup. You need to enter the following parameters to the template:

  • FederationProviderName – Enter a suitable name for the IAM IdP.
  • FederationXmlS3Location – Enter the Amazon Simple Storage Service (Amazon S3) URI where you uploaded the SAML metadata XML file from your IdP’s SAML application.
  • RedshiftClusterEndpoint – Enter the endpoint URI of your Amazon Redshift cluster. You can get this URI via the Amazon Redshift console. If you have multiple Amazon Redshift clusters, you may need to modify this CloudFormation template to add permissions for all the clusters in your account.

Grant group-level access control with Amazon Redshift

If you haven’t set up an Amazon Redshift cluster yet, see Getting started with Amazon Redshift for a step-by-step guide to create a new cluster in your AWS account.

If you already have an Amazon Redshift cluster, note the primary user credentials for that cluster and connect to it using a SQL client like SQL Workbench/J with the latest Amazon Redshift JDBC driver (the driver bundled with the AWS SDK).

When you’re logged in, you need to set up the appropriate groups in Amazon Redshift. The following example code sets up three database groups for business intelligence (BI) users, analysts, and a cross-user group in Amazon Redshift:

CREATE GROUP bi_users_group;
CREATE GROUP analysts_group;
CREATE GROUP cross_user_group;

You can then set up database objects and appropriate access permissions for them. In the following code, we set up two schemas for analysts and BI users and then grant access on them to the relevant groups:

CREATE SCHEMA IF NOT EXISTS bi_schema;
CREATE SCHEMA IF NOT EXISTS analysts_schema;

GRANT SELECT ON ALL TABLES IN SCHEMA bi_schema TO GROUP bi_users_group, GROUP cross_user_group;
ALTER DEFAULT PRIVILEGES IN SCHEMA bi_schema GRANT SELECT ON TABLES TO GROUP bi_users_group, GROUP cross_user_group;

GRANT SELECT ON ALL TABLES IN SCHEMA analysts_schema TO GROUP analysts_group, GROUP cross_user_group;
ALTER DEFAULT PRIVILEGES IN SCHEMA analysts_schema GRANT SELECT ON TABLES TO GROUP analysts_group, GROUP cross_user_group;

These group-level grants allow federated users to access Amazon Redshift objects based on their associated permissions. As explained earlier in this post, you can map your IdP directory groups to their respective database groups in Amazon Redshift, which allows you to control both authentication and authorization to Amazon Redshift based on the IdP credentials.

However, you may choose to control the authorization part within the database itself instead of relying on IdP directory groups. In this case, you use the IdP only to facilitate system authentication to Amazon Redshift, but for data authorization, you map the users and groups manually using alter group statements, as in the following code:

CREATE USER "exampleuser@example.com" PASSWORD DISABLE;
ALTER GROUP bi_users_group ADD USER "exampleuser@example.com";
ALTER GROUP cross_user_group ADD USER "exampleuser@example.com";

In the preceding example, we create a new user, exampleuser, with the password disabled. Because this user authenticates with the IdP credentials, it doesn’t need a database password. To provide authorization, we add the user to the bi_users_group and cross_user_group groups so that it inherits the permissions granted to those groups and works seamlessly with SSO and MFA federation.

Configure your JDBC/ODBC SQL client to use the browser-based plugin to connect to Amazon Redshift

In this step, you can test Amazon Redshift connectivity through your IdP using a SQL client like SQL Workbench/J.

You need to provide the following configurations in the SQL client.

Property Value
Driver Amazon Redshift
URL jdbc:redshift:iam://<your-redshift-cluster-endpoint>

Additionally, you need to set up the following extended properties.

Property Value
login_url This is the SAML application’s login page
plugin_name com.amazon.redshift.plugin.BrowserSamlCredentialsProvider
idp_response_timeout Number of seconds to allow for SSO authentication to complete before timing out

The following screenshot shows the configurations to connect SQLWorkbench/J client with PingOne as IdP.

The following table summarizes our property values.

Property Value
login_url https://sso.connect.pingidentity.com/sso/sp/initsso?saasid=<your_saas_id>&idpid=<your_idp_id> (This is the SAML application’s SSO URL from your IDP)
plugin_name com.amazon.redshift.plugin.BrowserSamlCredentialsProvider
idp_response_timeout 120

When you choose Test or OK, a new web browser window opens that shows the SAML application’s login page.

If this is the first time you’re logging in to PingOne, and haven’t set up MFA before, you can download and pair the PingID mobile app on iOS or Android.

After the PingID app is installed and paired, it pushes a notification to your phone to approve or deny the MFA authorization. When the MFA succeeds, the browser displays a success message on the redirect page.

After the connection is successful, let’s run a SQL query to confirm that the correct user identification was passed and also confirm that the correct database group was mapped for this SQL user session, based on the user’s directory group. In this case, the user manish was mapped to the bi_users_group directory group in PingOne. We should see the SQL session reflect the corresponding database group in Amazon Redshift.

We were able to successfully accomplish MFA-based SSO identity federation with PingOne using the browser-based plugin that Amazon Redshift provides.

IdP-specific configurations

As mentioned earlier, the first step of this process is to set up SSO for Amazon Redshift with your IdP. The setup steps for that may vary depending on the provider. For more information, see the following resources:

The following videos also cover these details if you want to view them in action:

Conclusion

Amazon Redshift makes it easy to integrate identity federation with your existing third-party identity providers, allowing you to centralize user and access management in a single corporate directory. In this post, we showed how the Amazon Redshift browser-based plugin works with popular SAML-based IdPs to provide an additional security layer with MFA authentication. You can also use the instructions in this post to set up various SAML-based IdPs (like Ping, Okta, JumpCloud, and OneLogin) to implement SSO and MFA with Amazon Redshift.


About the Authors

Manash Deb is a Senior Analytics Specialist Solutions Architect at AWS. He has worked on building end-to-end data-driven solutions in different database and data warehousing technologies for over fifteen years. He loves learning new technologies and solving, automating, and simplifying customer problems with easy-to-use cloud data solutions on AWS.

Manish Vazirani is an Analytics Specialist Solutions Architect at Amazon Web Services.

Monitor your Amazon ES domains with Amazon Elasticsearch Service Monitor

Post Syndicated from Jon Handler original https://aws.amazon.com/blogs/big-data/monitor-your-amazon-es-domains-with-amazon-elasticsearch-service-monitor/

Amazon Elasticsearch Service (Amazon ES) is a fully managed service that you can use to deploy, secure, and run Elasticsearch cost-effectively at scale. The service provides support for open-source Elasticsearch APIs, managed Kibana, and integration with Logstash and other AWS services.

Amazon ES provides a wealth of information about your domain, surfaced through Amazon CloudWatch metrics (for more information, see Instance metrics). Your domain’s dashboard on the AWS Management Console collects key metrics and provides a view of what’s going on with that domain. This view is limited to that single domain, and for a subset of the available metrics. What if you’re running many domains? How can you see all their metrics in one place? You can set CloudWatch alarms at the single domain level, but what about anomaly detection and centralized alerting?

In this post, we detail Amazon Elasticsearch Service Monitor, an open-source monitoring solution for all the domains in your account, across all Regions, backed by a set of AWS CloudFormation templates delivered through the AWS Cloud Development Kit (AWS CDK). The templates deploy an Amazon ES domain in a VPC, an Nginx proxy for Kibana access, and an AWS Lambda function. The function is invoked by CloudWatch Events to pull metrics from all your Amazon ES domains and send them to the previously created monitoring domain for your review.
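To give a sense of what that Lambda function does, the following is a simplified sketch (not the code from the repository) that uses boto3 to pull one CloudWatch metric for every Amazon ES domain in a couple of Regions. The Region list and metric are placeholders, and the step that indexes the results into the monitoring domain is omitted.

import datetime
import boto3

REGIONS = ["us-east-1", "us-west-2"]  # placeholder subset

def fetch_domain_metrics():
    """Pull a sample CloudWatch metric for every Amazon ES domain per Region."""
    account_id = boto3.client("sts").get_caller_identity()["Account"]
    end = datetime.datetime.utcnow()
    start = end - datetime.timedelta(minutes=5)
    results = []
    for region in REGIONS:
        es = boto3.client("es", region_name=region)
        cloudwatch = boto3.client("cloudwatch", region_name=region)
        for domain in es.list_domain_names()["DomainNames"]:
            stats = cloudwatch.get_metric_statistics(
                Namespace="AWS/ES",
                MetricName="CPUUtilization",
                Dimensions=[
                    {"Name": "DomainName", "Value": domain["DomainName"]},
                    {"Name": "ClientId", "Value": account_id},
                ],
                StartTime=start,
                EndTime=end,
                Period=300,
                Statistics=["Average"],
            )
            results.append((region, domain["DomainName"], stats["Datapoints"]))
    # The real function writes these datapoints as documents into the
    # monitoring domain; that indexing step is omitted here.
    return results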

Your Amazon ES monitoring domain is an ideal way to monitor your Amazon ES infrastructure. We provide dashboards at the account and individual domain level. We also provide basic alerts that you can use as a template to build your own alerting solution.

Prerequisites

To bootstrap the solution, you need a few tools in your development environment:

Create and deploy the AWS CDK monitoring tool

Complete the following steps to set up the AWS CDK monitoring tool in your environment. Depending on your operating system, the commands may differ. This walkthrough uses Linux and bash.

Clone the code from the GitHub repo:

# clone the repo
$ git clone https://github.com/aws-samples/amazon-elasticsearch-service-monitor.git
# move to directory
$ cd amazon-elasticsearch-service-monitor

We provide a bash bootstrap script to prepare your environment for running the AWS CDK and deploying the architecture. The bootstrap.sh script is in the amazon-elasticsearch-service-monitor directory. The script creates a Python virtual environment and downloads some further dependencies. It creates an Amazon Elastic Compute Cloud (Amazon EC2) key pair to facilitate accessing Kibana, then adds that key pair to your local SSH setup. Finally, it prompts for an email address where the stack sends alerts. You can edit email_default in the script or enter it at the command line when you run the script. See the following code:

$ bash bootstrap.sh
Collecting astroid==2.4.2
  Using cached astroid-2.4.2-py3-none-any.whl (213 kB)
Collecting attrs==20.3.0
  Using cached attrs-20.3.0-py2.py3-none-any.whl (49 kB)

After the script is complete, enter the Python virtual environment:

$ source .env/bin/activate
(.env) $

Bootstrap the AWS CDK

The AWS CDK creates resources in your AWS account to enable it to track your deployments. You bootstrap the AWS CDK with the bootstrap command:

# bootstrap the cdk
(.env) $ cdk bootstrap aws://yourAccountID/yourRegion

Deploy the architecture

The monitoring_cdk directory collects all the components that enable the AWS CDK to deploy the following architecture.

You can review amazon-elasticsearch-service-monitor/monitoring_cdk/monitoring_cdk_stack.py for further details.

The architecture has the following components:

  • An Amazon Virtual Private Cloud (Amazon VPC) spanning two Amazon EC2 Availability Zones.
  • An Amazon ES cluster with two t3.medium data nodes, one in each Availability Zone, with 100 GB of EBS storage.
  • An Amazon DynamoDB table for tracking the timestamp for the last pull from CloudWatch.
  • A Lambda function to fetch CloudWatch metrics across all Regions and all domains. By default, it fetches the data every 5 minutes, which you can change if needed.
  • An EC2 instance that acts as an SSH tunnel to access Kibana, because our setup is secured and in a VPC.
  • A default Kibana dashboard to visualize metrics across all domains.
  • Default email alerts to the newly launched Amazon ES cluster.
  • An index template and Index State Management (ISM) policy to delete indexes older than 366 days. (You can change this to a different retention period if needed.)
  • A monitoring stack with the option to enable UltraWarm (UW), which is disabled by default. You can change the settings in the monitoring_cdk_stack.py file to enable UW.

The monitoring_cdk_stack.py file contains several constants at the top that let you control the domain configuration, its sizing, and the Regions to monitor. It also specifies the username and password for the admin user of your domain. You should edit and replace those constants with your own values.

For example, the following code indicates which Regions to monitor:

REGIONS_TO_MONITOR='["us-east-1", "us-east-2", "us-west-1", "us-west-2", "af-south-1", "ap-east-1", "ap-south-1", "ap-northeast-1", "ap-northeast-2", "ap-southeast-1", "ap-southeast-2", "ca-central-1", "eu-central-1", "eu-west-1", "eu-west-2", "eu-west-3", "eu-north-1", "eu-south-1", "me-south-1",   "sa-east-1"]'

Run the following command:

(.env)$ cdk deploy

The AWS CDK prompts you to apply security changes; enter y for yes.

After the app is deployed, you get the Kibana URL, user, and password to access Kibana. After you log in, use the following sections to navigate around dashboards and alerts.

After the stack is deployed, you receive an email to confirm the subscription; make sure to confirm the email to start getting the alerts.

Pre-built monitoring dashboards

The monitoring tool comes with pre-built dashboards. To access them, complete the following steps:

  1. Navigate to the IP obtained after deployment.
  2. Log in to Kibana.
    Be sure to use the endpoint you received as an output from the cdk deploy command.
  3. In the navigation pane, choose Dashboard.

The Dashboards page displays the default dashboards.

The Domain Metrics At A glance dashboard gives a 360-degree view of all Amazon ES domains across Regions.

The Domain Overview dashboard gives more detailed metrics for a particular domain, to help you deep dive into issues in a specific domain.

Pre-built alerts

The monitoring framework comes with pre-built alerts, as summarized in the following table. These alerts notify you on key resources like CPU, disk space, and JVM. We also provide alerts for cluster status, snapshot failures, and more. You can use the following alerts as a template to create your own alerts and monitoring for search and indexing latencies and volumes, for example.

Alert Type Frequency
Cluster Health – Red 5 Min
Cluster Index Writes Blocked 5 Min
Automated Snapshot Failure 5 Min
JVM Memory Pressure > 80% 5 Min
CPU Utilization > 80% 15 Min
No Kibana Healthy Nodes 15 Min
Invalid Host Header Requests 15 Min
Cluster Health – Yellow 30 Min
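The stack creates these monitors for you. If you want to script additional monitors as suggested above, the sketch below targets the Open Distro alerting API exposed by the Amazon ES 7.9 domain. The endpoint, credentials, index pattern, query field, and threshold are all assumptions, and the monitor schema can differ between plugin versions, so treat this as a starting point rather than a working configuration.

import json
import requests  # assumes the requests library is available in your environment

ES_ENDPOINT = "https://<monitoring-domain-endpoint>"  # placeholder
AUTH = ("<admin-user>", "<admin-password>")           # credentials from monitoring_cdk_stack.py

# Minimal monitor: every 15 minutes, search a metrics index and trigger if any
# document reports average search latency above a threshold.
monitor = {
    "type": "monitor",
    "name": "High search latency",
    "enabled": True,
    "schedule": {"period": {"interval": 15, "unit": "MINUTES"}},
    "inputs": [
        {
            "search": {
                "indices": ["domain-metrics-*"],  # placeholder index pattern
                "query": {
                    "size": 0,
                    "query": {"range": {"SearchLatency.avg": {"gte": 1000}}},  # placeholder field
                },
            }
        }
    ],
    "triggers": [
        {
            "name": "Latency above threshold",
            "severity": "2",
            "condition": {
                "script": {
                    "source": "ctx.results[0].hits.total.value > 0",
                    "lang": "painless",
                }
            },
            "actions": [],  # attach your notification action (for example, an SNS destination) here
        }
    ],
}

resp = requests.post(
    f"{ES_ENDPOINT}/_opendistro/_alerting/monitors",
    auth=AUTH,
    headers={"Content-Type": "application/json"},
    data=json.dumps(monitor),
)
print(resp.status_code, resp.text)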

Clean up

To clean up the stacks, destroy the monitoring-cdk stack; all other stacks are torn down due to dependencies:

# Enter into python virtual environment
$ source .env/bin/activate
(.env)$ cdk destroy

CloudWatch logs need to be removed separately.

Pricing

Running this solution incurs charges of less than $10 per day for one domain, with an additional $2 per day for each additional domain.

Conclusion

In this post, we discussed Amazon Elasticsearch Service Monitor, an open-source monitoring solution for all the domains in your account, across all Regions. Amazon ES monitoring domains are an ideal way to monitor your Amazon ES infrastructure. Try it out and leave your thoughts in the comments.


About the Authors

Jon Handler (@_searchgeek) is a Principal Solutions Architect at Amazon Web Services based in Palo Alto, CA. Jon works closely with the CloudSearch and Elasticsearch teams, providing help and guidance to a broad range of customers who have search workloads that they want to move to the AWS Cloud. Prior to joining AWS, Jon’s career as a software developer included four years of coding a large-scale, eCommerce search engine.

Prashant Agrawal is a Specialist Solutions Architect at Amazon Web Services based in Seattle, WA. Prashant works closely with the Amazon Elasticsearch Service team, helping customers migrate their workloads to the AWS Cloud. Before joining AWS, Prashant helped various customers use Elasticsearch for their search and analytics use cases.

Hydrate your data lake with SaaS application data using Amazon AppFlow

Post Syndicated from Ninad Phatak original https://aws.amazon.com/blogs/big-data/hydrate-your-data-lake-with-saas-application-data-using-amazon-appflow/

Organizations today want to make data-driven decisions. The data could lie in multiple source systems, such as line of business applications, log files, connected devices, social media, and many more. As organizations adopt software as a service (SaaS) applications, data becomes increasingly fragmented and trapped in different “data islands.” To make decision-making easier, organizations are building data lakes: centralized repositories that allow you to store all your structured and unstructured data at any scale. You can store your data as is, without having to first structure it, and run different types of analytics, from dashboards and visualizations to big data processing, ad hoc analytics, and machine learning (ML), to guide better decisions.

AWS provides services such as AWS Glue, AWS Lake Formation, AWS Database Migration Service (AWS DMS), and many third-party solutions on AWS Marketplace to integrate data from various source systems into the Amazon Simple Storage Service (Amazon S3) data lake. If you’re using SaaS applications like Salesforce, Marketo, Slack, and ServiceNow to run your business, you may need to integrate data from these sources into your data lake. You likely also want to easily integrate these data sources without writing or managing any code. This is precisely where you can use Amazon AppFlow.

Amazon AppFlow is a fully managed integration service that enables you to securely transfer data between SaaS applications like Salesforce, Marketo, Slack, and ServiceNow and AWS services like Amazon S3 and Amazon Redshift. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event in real time, or on demand. You can configure data transformations such as data masking and concatenation of fields as well as validate and filter data (omitting records that don’t fit a criteria) to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and optionally allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats. For a complete list of all the SaaS applications that can be integrated with Amazon AppFlow, see Amazon AppFlow integrations.

In this post, we look at how to integrate data from Salesforce into a data lake and query the data via Amazon Athena. Amazon AppFlow recently announced multiple new capabilities such as availability of APIs and integration with AWS CloudFormation. We take advantage of these new capabilities and deploy the solution using a CloudFormation template.
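Those APIs also make it possible to work with flows outside the console. As a small, hedged boto3 example, the following inspects and runs the flow that the CloudFormation stack in this post creates (named SFDCAccount, described later):

import boto3

appflow = boto3.client("appflow")

# Inspect the flow created by the CloudFormation stack, then run it on demand.
flow = appflow.describe_flow(flowName="SFDCAccount")
print(flow["flowStatus"])

run = appflow.start_flow(flowName="SFDCAccount")
print(run["flowStatus"], run.get("executionId"))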

Solution architecture

The following diagram depicts the architecture of the solution that we deploy using AWS CloudFormation.

As seen in the diagram, we use Amazon AppFlow to integrate data from Salesforce into a data lake on Amazon S3. We then use Athena to query this data with the table definitions residing in the AWS Glue Data Catalog.

Deploy the solution with AWS CloudFormation

We use AWS CloudFormation to deploy the solution components in your AWS account. Choose an AWS Region for deployment where the following services are available:

  • Amazon AppFlow
  • AWS Glue
  • Amazon S3
  • Athena

You need to meet the following prerequisites before deploying the solution:

  • Have a Salesforce account with credentials authorized to pull data using APIs.
  • If you’re deploying the stack in an account using the Lake Formation permission model, validate the following settings:
    • The AWS Identity and Access Management (IAM) user used to deploy the stack is added as a data lake administrator under Lake Formation, or the IAM user used to deploy the stack has IAM privileges to create databases in the AWS Glue Data Catalog.
    • The Data Catalog settings under Lake Formation are configured to use only IAM access control for new databases and new tables in new databases. This makes sure that all access to the newly created databases and tables in the Data Catalog are controlled solely using IAM permissions. The following screenshot shows the Data catalog settings page on the Lake Formation console, where you can set these permissions.

These Lake Formation settings are required so that all permissions to the Data Catalog objects are controlled using IAM only.

Although you need these Lake Formation settings for the CloudFormation stack to deploy properly, in a production setting we recommend you use Lake Formation to govern access to the data in the data lake. For more information about Lake Formation, see What Is AWS Lake Formation?

We now deploy the solution and the following components:

  • An Amazon AppFlow flow to integrate Salesforce account data into Amazon S3
  • An AWS Glue Data Catalog database
  • An AWS Glue crawler to crawl the data pulled into Amazon S3 so that it can be queried using Athena

Before you launch the CloudFormation stack, create a connection to your Salesforce account in Amazon AppFlow:
  1. On the Amazon AppFlow console, on the Connections page, choose Create connection.
  2. For Connection name, enter a name for your connection.
  3. Choose Continue.

You’re redirected to the Salesforce login page, where you enter your Salesforce account credentials.

  1. Enter the appropriate credentials and grant OAuth2 access to the Amazon AppFlow client in the next step, after which a new connector profile is set up in your AWS account.
  2. To deploy the remaining solution components, choose Launch Stack:
  3. For Stack name, enter an appropriate name for the CloudFormation stack.
  4. For Parameters, enter the name of the Salesforce connection you created.
  5. Choose Next.
  6. Follow through the CloudFormation stack creation wizard, leaving the rest of the default values unchanged.
  7. On the final page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  8. Choose Create stack.
  9. Wait for the stack status to change to CREATE_COMPLETE.
  10. On the Outputs tab of the stack, record the name of the S3 bucket.

Run the flow

The CloudFormation stack has deployed a flow named SFDCAccount. Open the flow to see the configuration. The flow has been configured to do the following:

  • Pull the account object from your Salesforce account into an S3 bucket. The flow pulls certain attributes from the object in Parquet format.
  • Mask the last five digits of the phone number associated with the Salesforce account.
  • Build a validation on the Account ID field that ignores the record if the value is NULL.

Make sure that all these attributes pulled by the flow are part of your account object in Salesforce. Make any additional changes that you may want to the flow and save the flow.

  1. Run the flow by choosing Run flow.
  2. When the flow is complete, navigate to the S3 bucket created by the CloudFormation stack to confirm its contents.

The Salesforce account data is stored in Parquet format in the SFDCData/SFDCAccount/ folder in the S3 bucket.

  1. On the AWS Glue console, run the crawler AppFlowGlueCrawler.

This crawler has been created by the CloudFormation stack and is configured to crawl the S3 bucket and create a table in the appflowblogdb database in the Data Catalog.

When the crawler is complete, a table named SFDCAccount exists in the appflowblogdb database.

  1. On the Athena console, run the following query:
    Select * from appflowblogdb.SFDCAccount limit 10;

The output shows the data pulled by the Amazon AppFlow flow into the S3 bucket.

Clean up

When you’re done exploring the solution, complete the following steps to clean up the resources deployed by AWS CloudFormation:

  1. Empty the S3 bucket created by the CloudFormation stack.
  2. Delete the CloudFormation stack.

Conclusion

In this post, we saw how you can easily set up an Amazon AppFlow flow to integrate data from Salesforce into your data lake. Amazon AppFlow allows you to integrate data from many other SaaS applications into your data lake. After the data lands in Amazon S3, you can take it further for downstream processing using services like Amazon EMR and AWS Glue. You can then use the data in the data lake for multiple analytics use cases ranging from dashboards to ad hoc analytics and ML.


About the Authors

Ninad Phatak is a Principal Data Architect at Amazon Development Center India. He specializes in data engineering and data warehousing technologies and helps customers architect their analytics use cases and platforms on AWS.

Vinay Kondapi is Head of Product for Amazon AppFlow. He specializes in application and data integration with SaaS products at AWS.

Build secure encrypted data lakes with AWS Lake Formation

Post Syndicated from Daniela Dorneanu original https://aws.amazon.com/blogs/big-data/build-secure-encrypted-data-lakes-with-aws-lake-formation/

Maintaining customer data privacy, protection against intellectual property loss, and compliance with data protection laws are essential objectives of today’s organizations. To protect data against security threats, vulnerabilities within the organization, malicious software, or cybercrime, organizations are increasingly encrypting their data. Although you can enable server-side encryption in Amazon Simple Storage Service (Amazon S3), you may prefer to manage your own encryption keys. AWS Key Management Service (AWS KMS) makes it easy to create, rotate, and disable cryptographic keys across a wide range of AWS services, including over your data lake in Amazon S3.

AWS Lake Formation is a one-stop service to build and manage your data lake. Among its many features, it allows discovering and cataloging data sources, setting up transformation jobs, configuring fine-grained data access and security policies, and auditing and controlling access from data lake consumers. You can also provide column-level security, which is essential when you want to protect personally identifiable information (PII).

Using AWS KMS with Lake Formation requires several steps, which we discuss in this post. We create a complete solution for processing encrypted data using customer managed keys with Lake Formation, Amazon Athena, AWS Glue, and AWS KMS. We use an S3 bucket registered through Lake Formation, which only accepts encrypted data with customer managed keys. Additionally, we demonstrate how to easily restrict access to PII data for data analysis stakeholders.

To demonstrate the solution, we upload an encrypted document into the S3 bucket and run data transformations using AWS Glue. The processed data is stored back in an encrypted way to Amazon S3. We automated this solution using AWS CloudFormation to have an end-to-end deployment of data lakes supporting encryption.

Solution overview

We use AWS CloudFormation to deploy the data transformation pipeline and explain all the configurations necessary to achieve end-to-end encryption of your data into a data lake.

The following diagram shows a generic infrastructure of a serverless data lake enhanced by encryption. Transformations such as removing duplicated or bad data are required. Afterward, we want to automatically catalog the data to use it with our consumers (through SQL querying, analytics dashboards, or machine learning services).

The reproducible pattern to support customer managed key encryption requires the following steps:

  1. Configure the S3 bucket to use server-side encryption.
  2. Set up a KMS key policy to allow the AWS Identity and Access Management (IAM) role for Lake Formation to use the key for encryption.
  3. Create the AWS Glue security configuration to specify the keys to use for encryption with AWS Glue.

Prerequisites

Before getting started, complete the following prerequisites:

  1. Sign in to the AWS Management Console and choose the US East (N. Virginia) Region for this sample deployment.
  2. Ensure that Lake Formation administrators are set up, and that the default permissions for all newly created databases and tables go through Lake Formation.

Deploy the solution

To deploy the solution, complete the following steps:

  1. On the Lake Formation console, choose Add administrators.
  2. Add your current role and user as an administrator.
  3. In the navigation pane, under Data catalog, select Settings.
  4. Deselect Use only IAM access control for new databases and Use only IAM access control for new tables in new databases.

This makes sure that both IAM and Lake Formation permission modules are used.

  1. Choose Save.
  2. Download the content from the following GitHub repository. The repo should contain the following files:
    • The raw data sample file data.json
    • The AWS Glue script sample script.py
    • The CloudFormation template lakeformation_encryption_demo.yaml
  3. Create an S3 bucket in us-east-1 and upload the AWS Glue script.
  4. Record the script path to use as a parameter for the CloudFormation stack.

You now deploy the CloudFormation stack.

  1. Choose Launch Stack:
  2. Leave the default location for the template and choose Next.
  3. On the Specify stack details page, enter a stack name.
  4. For GlueJobScriptBucketPath, enter the bucket containing the AWS Glue script.
  5. For DataLakeBucket, enter the name of the bucket that the stack creates.
  6. On the Configure stack options page, choose Next.
  7. On the Review page, select the check boxes.
  8. Choose Create stack.

At this point, you have successfully created the resources for the Data Lake solution supporting end-to-end encryption.

The stack deploys an S3 bucket in which you upload the file, and registers that bucket within Lake Formation. An AWS Glue job transforms the data into Parquet format, and an AWS Glue crawler detects the schema of the processed data. Additionally, the stack deploys all the AWS KMS resources, which we describe in detail in the next section.

What is happening in the background?

In this section, we describe the encryption and decryption process in more detail. Specifically, we cover how encrypted data is uploaded to the S3 bucket, and how the AWS Glue security configuration tells AWS Glue jobs and crawlers which KMS key to use.

KMS key

As shown in the following screenshot, the KMS key policy enables access for several IAM roles.

lake-formation-demo-role: Lake Formation is the central service managing access to the data. To enable the Lake Formation service to use the KMS key, we add the IAM role used to register the S3 bucket to Lake Formation to the key policy used within this solution.

demo-lake-formation-glue-job-role: The AWS Glue job role also needs to use the KMS key to encrypt the output data after running the ETL job.

demo-lake-formation-glue-crawler-role: Lastly, the AWS Glue crawler uses the KMS key to decrypt the data and infer the schema of the data.
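To make this concrete, the following boto3 sketch applies a key policy that keeps the account root as key administrator and grants the Lake Formation role the data-key operations it needs. The role ARN, key ID, and exact action list are assumptions based on the roles described above, not the stack's literal policy; the stack also adds similar statements for the AWS Glue job and crawler roles.

import json
import boto3

kms = boto3.client("kms")

key_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            # Keep the account able to administer the key.
            "Sid": "AllowAccountAdministration",
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::<account>:root"},
            "Action": "kms:*",
            "Resource": "*",
        },
        {
            # Let the Lake Formation registration role use the key for data access.
            "Sid": "AllowLakeFormationRole",
            "Effect": "Allow",
            "Principal": {"AWS": "arn:aws:iam::<account>:role/lake-formation-demo-role"},
            "Action": [
                "kms:Encrypt",
                "kms:Decrypt",
                "kms:GenerateDataKey*",
                "kms:DescribeKey",
            ],
            "Resource": "*",
        },
    ],
}

kms.put_key_policy(
    KeyId="<kms-key-id>",  # placeholder
    PolicyName="default",
    Policy=json.dumps(key_policy),
)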

Learn more about registering an S3 location to Lake Formation in the AWS documentation.

Amazon S3 storage uploads only encrypted data

The data lake S3 bucket has a bucket policy enforcing encryption on all the data uploaded to the bucket with the KMS key. This also allows any user to use their own KMS keys to encrypt the data. Additionally, teams within an organization can use different keys when uploading the data, supporting separation of access within an organization.

The following screenshot shows the S3 bucket policy implemented through the CloudFormation stack. The policy denies Amazon S3 Put API calls for objects that aren’t AWS KMS encrypted.
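Although the screenshot isn't reproduced here, the policy follows the common deny-unencrypted-uploads pattern. A minimal boto3 sketch of that pattern follows; the bucket name is a placeholder, and the stack's actual policy may contain additional statements.

import json
import boto3

s3 = boto3.client("s3")

bucket = "<data-lake-bucket>"  # placeholder: the bucket created by the stack

# Deny any PutObject request that does not declare SSE-KMS server-side encryption.
bucket_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DenyUnencryptedUploads",
            "Effect": "Deny",
            "Principal": "*",
            "Action": "s3:PutObject",
            "Resource": f"arn:aws:s3:::{bucket}/*",
            "Condition": {
                "StringNotEquals": {"s3:x-amz-server-side-encryption": "aws:kms"}
            },
        }
    ],
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(bucket_policy))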

AWS Glue security configuration

An AWS Glue security configuration contains the properties needed when you read and write encrypted data. To create and view all AWS security configurations, on the AWS Glue console, choose Security configurations in the navigation pane.

A security configuration was added to the AWS Glue job and the crawler to configure what encryption key AWS Glue should use when running a job or a crawler.
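If you want to create a similar security configuration yourself rather than through the CloudFormation stack, a hedged boto3 sketch looks like the following; the configuration name is made up and the key ARN is a placeholder.

import boto3

glue = boto3.client("glue")

# Tell AWS Glue jobs and crawlers that use this configuration to write S3 output
# with SSE-KMS, and to encrypt CloudWatch logs and job bookmarks as well.
glue.create_security_configuration(
    Name="lakeformation-demo-security-configuration",  # placeholder name
    EncryptionConfiguration={
        "S3Encryption": [
            {"S3EncryptionMode": "SSE-KMS", "KmsKeyArn": "<kms-key-arn>"}
        ],
        "CloudWatchEncryption": {
            "CloudWatchEncryptionMode": "SSE-KMS",
            "KmsKeyArn": "<kms-key-arn>",
        },
        "JobBookmarksEncryption": {
            "JobBookmarksEncryptionMode": "CSE-KMS",
            "KmsKeyArn": "<kms-key-arn>",
        },
    },
)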

Test the solution

In this section, we walk through the steps of the end-to-end encryption pipeline:

  1. Upload sample data to Amazon S3.
  2. Run the AWS Glue job.
  3. Give permissions to the AWS Glue crawler to the Amazon S3 location and run the crawler.
  4. Set up permissions for the new role to query the new table.
  5. Run an Athena query.

Upload sample data to Amazon S3

Use the following command to upload a sample file to Amazon S3:

aws s3 cp data.json s3://<DATA_LAKE_BUCKET_NAME>/raw/ --sse aws:kms --sse-kms-key-id  <LAKE_FORMATION_KMS_DATA_KEY>

For the <LAKE_FORMATION_KMS_DATA_KEY> value, enter the key ID of the KMS key with the alias lakeformation-kms-data-key, which you can find on the AWS KMS console.

In the preceding command, data.json is the file that we upload to Amazon S3, and we specify the prefix raw. While uploading, we provide the KMS key to encrypt the file with this encryption key.
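If you'd rather resolve the key ID programmatically than look it up on the console, a short boto3 sketch using the alias created by the stack:

import boto3

kms = boto3.client("kms")

# Resolve the key ID behind the alias that the CloudFormation stack created.
key = kms.describe_key(KeyId="alias/lakeformation-kms-data-key")
print(key["KeyMetadata"]["KeyId"])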

Run the AWS Glue job

We’re now ready to run our AWS Glue job.

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select the job lake-formation-demo-glue-job.
  3. On the Action menu, choose Run job.

When the job is complete, we should see the processed data in the S3 bucket you configured under the prefix processed. When we check the properties of the output file, we should see that the data is encrypted using the KMS key lakeformation-kms-data-key.

Give permissions to the AWS Glue crawler and run the crawler

We now give permissions to the AWS Glue crawler to access Amazon S3, and then run the crawler.

  1. On the Lake Formation console, under Permissions, choose Data locations.
  2. Choose Grant.
  3. Select My account.
  4. For IAM users and roles, choose demo-lake-formation-glue-crawler-role.
  5. For Storage locations, choose the S3 bucket where your data is stored.
  6. For Registered account location, enter the current account number.
  7. Choose Grant.

This step is required for the crawler to have permissions to the Amazon S3 location where the data to be crawled is stored.

  1. On the AWS Glue console, choose Crawlers.
  2. Select the configured crawler and choose Run crawler.

The crawler infers the schema of the processed data, and a new table is now visible within the database: lakeformation-glue-catalog-db.

This table is also visible on the Lake Formation console.

Set up permissions for the current role to query the table

Next, we configure Athena to have the proper rights to query this newly created table over the encrypted data.

One advantage of using Lake Formation to set up permissions is the ability to restrict access to PII in order to stay compliant and protect the privacy of your customers. For this post, we grant access only to the symbol column of the processed table and restrict access to all other columns.

  1. On the Lake Formation console, under Data catalog¸ choose Tables.
  2. Select the processed table.
  3. On the Actions menu, choose Grant.
  4. Select My account.
  5. For IAM users and roles, choose the current user/role.
  6. For Column-based permissions, choose Include columns.
  7. For Include columns, choose the column symbol.
  8. For Table permissions, select Select.
  9. Choose Grant.

Run an Athena query

We can now query the database with Athena.

  1. On the Athena console, choose the database lakeformation-glue-catalog-db.
  2. Choose the options icon next to the processed table and choose Preview table.
  3. Enter the following query:
    SELECT *
    FROM "lakeformation-glue-catalog-db"."processed" limit 10;

  4. Choose Run query.

The following screenshot shows our output, in which we can see the value of the symbol column. The other columns aren’t visible due to the column-level security configuration.

Further steps

We can also enable encryption at rest for the Athena results, meaning that Athena encrypts the query results in Amazon S3. For more information, see Encrypting Query Results Stored in Amazon S3.

Summary

In this post, we addressed the use case of customers with strict regulatory restrictions that require end-to-end data encryption to comply with their country regulations. Additionally, we set up a data lake to support column-level security to restrict access to PII within tables. We included a step-by-step guide and automated the solution with AWS CloudFormation to deploy it promptly.

If you need any help in building data lakes, please reach out to AWS Professional Services. If you have questions about this post, let us know in the comments section, or start a new thread on the Lake Formation forum.


About the Authors

Daniela Dorneanu is a Data Lake Architect at AWS. As part of Professional Services, Daniela supports customers hands-on to get more value out of their data. Daniela advocates for inclusive and diverse work environments, and she is co-chairing the Software Engineering conference track at the Grace Hopper Celebration, the largest gathering of women in Computing.

Muhammad Shahzad is a Professional Services consultant who enables customers to implement DevOps by explaining principles, delivering automated solutions and integrating best practices in their journey to the cloud.

Improve query performance using AWS Glue partition indexes

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/improve-query-performance-using-aws-glue-partition-indexes/

While creating data lakes on the cloud, the data catalog is crucial to centralize metadata and make the data visible, searchable, and queryable for users. With the recent exponential growth of data volume, it becomes much more important to optimize data layout and maintain the metadata on cloud storage to keep the value of data lakes.

Partitioning has emerged as an important technique for optimizing data layout so that the data can be queried efficiently by a variety of analytic engines. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. Over time, hundreds of thousands of partitions get added to a table, resulting in slow queries. To speed up query processing of highly partitioned tables cataloged in AWS Glue Data Catalog, you can take advantage of AWS Glue partition indexes.

Partition indexes are available for queries in Amazon EMR, Amazon Redshift Spectrum, and AWS Glue extract, transform, and load (ETL) jobs (Spark DataFrame). When partition indexes are enabled on heavily partitioned AWS Glue Data Catalog tables, all these query engines are accelerated. You can add partition indexes to both new tables and existing tables. This post demonstrates how to use partition indexes, and discusses the benefit you can get with partition indexes when working with highly partitioned data.

Partition indexes

AWS Glue partition indexes are an important configuration to reduce overall data transfers and processing, and reduce query processing time. In the AWS Glue Data Catalog, the GetPartitions API is used to fetch the partitions in the table. The API returns partitions that match the expression provided in the request. If no partition indexes are present on the table, all the partitions of the table are loaded, and then filtered using the query expression provided by the user in the GetPartitions request. The query takes more time to run as the number of partitions increases on a table with no indexes. With an index, the GetPartitions request tries to fetch a subset of the partitions instead of loading all the partitions in the table.
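As a concrete illustration, the following boto3 sketch issues such a GetPartitions request with an expression, using the sample database and table that the CloudFormation stack in this post creates:

import boto3

glue = boto3.client("glue")

# Fetch only the partitions matching the expression. With a partition index on
# (year, month, day, hour), the service can use the index instead of scanning
# every partition of the table.
paginator = glue.get_paginator("get_partitions")
pages = paginator.paginate(
    DatabaseName="partition_index",
    TableName="table_with_index",
    Expression="year='2021' and month='04' and day='01'",
)
print(sum(len(page["Partitions"]) for page in pages))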

The following are key benefits of partition indexes:

  • Increased query performance
  • Increased concurrency as a result of fewer GetPartitions API calls
  • Cost savings:
    • Analytic engine cost (query performance is related to the charges in Amazon EMR and AWS Glue ETL)
    • AWS Glue Data Catalog API request cost

Setting up resources with AWS CloudFormation

This post provides an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

The CloudFormation template generates the AWS Glue database and the two Data Catalog tables (table_with_index and table_without_index) that we use in this post.

If you’re using AWS Lake Formation permissions, you need to ensure that the IAM user or role running AWS CloudFormation has the required permissions (to create a database on the Data Catalog).

The tables use sample data located in an Amazon Simple Storage Service (Amazon S3) public bucket. Initially, no partition indexes are configured in these AWS Glue Data Catalog tables.

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseName, leave as the default.
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

Stack creation can take up to 5 minutes. When the stack is completed, you have two Data Catalog tables: table_with_index and table_without_index. Both tables point to the same S3 bucket, and the data is highly partitioned based on the year, month, day, and hour columns for more than 42 years (1980-2021). In total, there are 367,920 partitions, and each partition has one JSON file, data.json. In the following sections, you see how the partition indexes work with these sample tables.

Setting up a partition index on the AWS Glue console

You can create partition indexes at any time. If you want to create a new table with partition indexes, you can make the CreateTable API call with a list of PartitionIndex objects. If you want to add a partition index to an existing table, make the CreatePartitionIndex API call. You can also perform these actions on the AWS Glue console. You can create up to three partition indexes on a table.
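You can also script this step. The following boto3 call is the equivalent of the AWS CLI command shown in Appendix 1 at the end of this post; for a brand-new table, the same structure can instead be passed to CreateTable through the PartitionIndexes parameter.

import boto3

glue = boto3.client("glue")

# Add a partition index on the four partition columns of the existing table.
glue.create_partition_index(
    DatabaseName="partition_index",
    TableName="table_with_index",
    PartitionIndex={
        "Keys": ["year", "month", "day", "hour"],
        "IndexName": "year-month-day-hour",
    },
)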

Let’s configure a new partition index for the table table_with_index we created with the CloudFormation template.

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Partitions and indices.
  4. Choose Add new index.
  5. For Index name, enter year-month-day-hour.
  6. For Selected keys from schema, select year, month, day, and hour.
  7. Choose Add index.

The Status column of the newly created partition index shows the status as Creating. Wait for the partition index to become Active. Index creation takes longer as the number of partitions grows; because this table has 367,920 partitions, the process takes about 1 hour.

Now the partition index is ready for the table table_with_index. You can use this index from various analytic engines when you query against the table. You see default behavior in the table table_without_index because no partition indexes are configured for this table.

You can follow (or skip) any of the following sections based on your interest.

Making a GetPartitions API call with an expression

Before we use the partition index from various query engines, let’s try making the GetPartitions API call using AWS Command Line Interface (AWS CLI) to see the difference. The AWS CLI get-partitions command makes multiple GetPartitions API calls if needed. In this section, we simply use the time command to compare the duration for each table, and use the debug logging to compare the number of API calls for each table.

  1. Run the get-partitions command against the table table_without_index with the expression year='2021' and month='04' and day='01':
    $ time aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    3m57.438s
    user    0m2.872s
    sys    0m0.248s
    

The command took about 4 minutes. Note that you used only three partition columns out of four.

  1. Run the same command with debug logging to get the number of GetPartitions API calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-without-index.log
    $ cat get-partitions-without-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
         737

There were 737 GetPartitions API calls when the partition indexes aren’t used.

  1. Next, run the get-partitions command against table_with_index with the same expression:
    $ time aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    0m2.697s
    user    0m0.442s
    sys    0m0.163s

The command took just 2.7 seconds. You can see how quickly the required partitions were returned.

  1. Run the same command with debug logging to get the number of GetPartitions API calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-with-index.log
    $ cat get-partitions-with-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
           4
    

There were only four GetPartitions API calls when the partition indexes are used.

Querying a table using Apache Spark on Amazon EMR

In this section, we explore querying a table using Apache Spark on Amazon EMR.

  1. Launch a new EMR cluster with Apache Spark.

For instructions, see Setting Up Amazon EMR. You need to specify the AWS Glue Data Catalog as the metastore. In this example, we use the default EMR cluster (release: emr-6.2.0, three m5.xlarge nodes).

  1. Connect to the EMR node using SSH.
  2. Run the spark-sql command on the EMR node to start an interactive shell for Spark SQL:
    $ spark-sql

  3. Run the following SQL against partition_index.table_without_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 35.518 seconds, Fetched 1 row(s)

The query took 35 seconds. Even though you aggregated records only in the specific partition, the query took so long because there are many partitions and the GetPartitions API call takes time.

Now let’s run the same query against table_with_index to see how much benefit the partition index introduces.

  1. Run the following SQL against partition_index.table_with_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 2.247 seconds, Fetched 1 row(s)

The query took just 2 seconds. The reason for the difference in query duration is because the number of GetPartitions calls is smaller because of the partition index.

The following chart shows the granular metrics for query planning time without and with the partition index. The query planning time with the index is far less than that without the index.

For more information about comparing metrics in Apache Spark, see Appendix 2 at the end of this post.

Querying a table using Redshift Spectrum

To query with Redshift Spectrum, complete the following steps:

  1. Launch a new Redshift cluster.

You need to configure an IAM role for the cluster to utilize Redshift Spectrum and the Amazon Redshift query editor. Choose dc2.large, 1 node in this example. You need to launch the cluster in the us-east-1 Region because you need to place your cluster in the same Region as the bucket location.

  1. Connect with the Redshift query editor. For instructions, see Querying a database using the query editor.
  2. Create an external schema for the partition_index database to use it in Redshift Spectrum (replace <your IAM role ARN> with your IAM role ARN):
    create external schema spectrum from data catalog 
    database 'partition_index' 
    iam_role '<your IAM role ARN>'
    create external database if not exists;

  3. Run the following SQL against spectrum_schema.table_without_index:
    SELECT count(*), sum(value) FROM spectrum.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took more than 3 minutes.

  1. Run the following SQL against spectrum_schema.table_with_index:
    SELECT count(*), sum(value) FROM spectrum.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query against the table with the partition index took just 8 seconds, which is much faster than the query against the table without it.

Querying a table using AWS Glue ETL

Let’s launch an AWS Glue development endpoint and an Amazon SageMaker notebook.

  1. On the AWS Glue console, choose Dev endpoints.
  2. Choose Add endpoint.
  3. For Development endpoint name, enter partition-index.
  4. For IAM role, choose your IAM role.

For more information about roles, see Managing Access Permissions for AWS Glue Resources.

  1. For Worker type under Security configuration, script libraries, and job parameters (optional), choose G.1X.
  2. For Number of workers, enter 4.
  3. For Dependent jar path, enter s3://crawler-public/json/serde/json-serde.jar.
  4. Select Use Glue data catalog as the Hive metastore under Catalog options (optional).
  5. Choose Next.
  6. For Networking, leave as is (by default, Skip networking configuration is selected), and choose Next.
  7. For Add an SSH public key (Optional), leave it blank, and choose Next.
  8. Choose Finish.
  9. Wait for the development endpoint partition-index to show as READY.

The endpoint may take up to 10 minutes to be ready.

  1. Select the development endpoint partition-index, and choose Create SageMaker notebook on the Actions menu.
  2. For Notebook name, enter partition-index.
  3. Select Create an IAM role.
  4. For IAM role, enter partition-index.
  5. Choose Create notebook.
  6. Wait for the notebook aws-glue-partition-index to show the status as Ready.

The notebook may take up to 3 minutes to be ready.

  1. Select the notebook aws-glue-partition-index, and choose Open notebook.
  2. Choose Sparkmagic (PySpark) on the New menu.
  3. Enter the following code snippet against table_without_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took 3 minutes.

  1. Enter the following code snippet against partition_index.table_with_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The cell took just 7 seconds. The query against the table with the partition index is much faster than the query against the table without it.

Cleaning up

Now to the final step, cleaning up the resources:

  1. Delete the CloudFormation stack. 
  2. Delete the EMR cluster.
  3. Delete the Amazon Redshift cluster.
  4. Delete the AWS Glue development endpoint and SageMaker notebook.

Conclusion

In this post, we explained how to use partition indexes and how they accelerate queries in various query engines. If you have several million partitions, the performance benefit is even greater. To learn more about partition indexes, see Working with Partition Indexes.


Appendix 1: Setting up a partition index using AWS CLI

If you prefer using the AWS CLI, run the following create-partition-index command to set up a partition index:

$ aws glue create-partition-index --database-name partition_index --table-name table_with_index --partition-index Keys=year,month,day,hour,IndexName=year-month-day-hour

To get the status of the partition index, run the following get-partition-indexes command:

$ aws glue get-partition-indexes --database-name partition_index --table-name table_with_index
{
    "PartitionIndexDescriptorList": [
        {
            "IndexName": "year-month-day-hour",
            "Keys": [
                {
                    "Name": "year",
                    "Type": "string"
                },
                {
                    "Name": "month",
                    "Type": "string"
                },
                {
                    "Name": "day",
                    "Type": "string"
                },
                {
                    "Name": "hour",
                    "Type": "string"
                }
            ],
            "IndexStatus": "CREATING"
        }
    ]
}
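If you prefer Python over the AWS CLI, the same operations are available on the boto3 Glue client. The following minimal sketch mirrors the two CLI commands above:

import boto3

glue = boto3.client("glue")

# Create the partition index (equivalent to create-partition-index in the CLI).
glue.create_partition_index(
    DatabaseName="partition_index",
    TableName="table_with_index",
    PartitionIndex={
        "Keys": ["year", "month", "day", "hour"],
        "IndexName": "year-month-day-hour",
    },
)

# Check the index status (equivalent to get-partition-indexes in the CLI).
response = glue.get_partition_indexes(
    DatabaseName="partition_index",
    TableName="table_with_index",
)
for index in response["PartitionIndexDescriptorList"]:
    print(index["IndexName"], index["IndexStatus"])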

Appendix 2: Comparing breakdown metrics in Apache Spark

If you’re interested in comparing the breakdown metrics for query planning time, you can register a SQL listener with the following Scala code snippet:

spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
  override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
    val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
    println(metricMap.toSeq)
  }
  override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
})

If you use spark-shell, you can register the listener as follows:

$ spark-shell
...
scala> spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
     |   override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
     |     val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
     |     println(metricMap.toSeq)
     |   }
     |   override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
     | })

Then run the same query without using the index to get the breakdown metrics:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,208), (optimization,29002), (analysis,4))
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640632|
+--------+------------------+

In this example, we use the same setup for the EMR cluster (release: emr-6.2.0, three m5.xlarge nodes). The console output includes an additional line:

Vector((planning,208), (optimization,29002), (analysis,4)) 

Apache Spark’s query planning mechanism has three phases: analysis, optimization, and physical planning (shown as just planning). This line means that the query planning took 4 milliseconds in analysis, 29,002 milliseconds in optimization, and 208 milliseconds in physical planning.

Let’s try running the same query using the index:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,7), (optimization,608), (analysis,2))                          
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640634|
+--------+------------------+

The query planning took 2 milliseconds in analysis, 608 milliseconds in optimization, and 7 milliseconds in physical planning.


About the Authors

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue and AWS Lake Formation. He is passionate about big data technology and open source software, and enjoys building and experimenting in the analytics area.

Sachet Saurabh is a Senior Software Development Engineer at AWS Glue and AWS Lake Formation. He is passionate about building fault tolerant and reliable distributed systems at scale.

Vikas Malik is a Software Development Manager at AWS Glue. He enjoys building solutions that solve business problems at scale. In his free time, he likes playing and gardening with his kids and exploring local areas with family.

Build a data quality score card using AWS Glue DataBrew, Amazon Athena, and Amazon QuickSight

Post Syndicated from Nitin Aggarwal original https://aws.amazon.com/blogs/big-data/build-a-data-quality-score-card-using-aws-glue-databrew-amazon-athena-and-amazon-quicksight/

Data quality plays an important role when building an extract, transform, and load (ETL) pipeline for sending data to downstream analytical applications and machine learning (ML) models. The adage “garbage in, garbage out” aptly describes why it’s important to filter out bad data before further processing. Continuously monitoring data quality and comparing it with predefined target metrics helps you comply with your governance frameworks.

In November 2020, AWS announced the general availability of AWS Glue DataBrew, a new visual data preparation tool that helps you clean and normalize data without writing code. This reduces the time it takes to prepare data for analytics and ML by up to 80% compared to traditional approaches to data preparation.

In this post, we walk through a solution in which we apply various business rules to determine the quality of incoming data and separate good and bad records. Furthermore, we publish a data quality score card using Amazon QuickSight and make records available for further analysis.

Use case overview

For our use case, we use a public dataset that is available for download at Synthetic Patient Records with COVID-19. It contains 100,000 synthetic patient records in CSV format. Data hosted within SyntheticMass has been generated by Synthea™, an open-source patient population simulation made available by The MITRE Corporation.

When we unzip the 100k_synthea_covid19_csv.zip file, we see the following CSV files:

  • Allergies.csv
  • Careplans.csv
  • Conditions.csv
  • Devices.csv
  • Encounters.csv
  • Imaging_studies.csv
  • Immunizations.csv
  • Medications.csv
  • Observations.csv
  • Organizations.csv
  • Patients.csv
  • Payer_transitions.csv
  • Payers.csv
  • Procedures.csv
  • Providers.csv
  • Supplies.csv

We perform the data quality checks categorized by the following data quality dimensions:

  • Completeness
  • Consistency
  • Integrity

For our use case, these CSV files are maintained by your organization’s data ingestion team, which uploads the updated CSV file to Amazon Simple Storage Service (Amazon S3) every week. The good and bad records are separated through a series of data preparation steps, and the business team uses the output data to create business intelligence (BI) reports.

Architecture overview

The following architecture uses DataBrew for data preparation and building key performance indicators (KPIs), Amazon Athena for data analysis with standard SQL, and QuickSight for building the data quality score card.

The workflow includes the following steps:

  1. The ingestion team receives CSV files in an S3 input bucket every week.
  2. The DataBrew job scheduled to run every week triggers the recipe job.
  3. DataBrew processes the input files and generates output files that contain additional fields depending on the recipe job logic.
  4. After the output data is written, we create an external table on top of it by creating and running an AWS Glue crawler.
  5. The good and bad records are separated by creating views on top of the external table.
  6. Data analysts can use Athena to analyze good and bad records.
  7. The records can also be separated directly using QuickSight calculated fields.
  8. We use QuickSight to create the data quality score card in the form of a dashboard, which fetches data through Athena.
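Steps 2–4 of this workflow can also be triggered programmatically. The following Python (boto3) sketch is illustrative only; the job name matches the one we create later in this post, and the crawler name is a hypothetical placeholder:

import boto3

databrew = boto3.client("databrew")
glue = boto3.client("glue")

# Start the DataBrew recipe job (step 2); "patient-data-quality" is the job
# we create in the "Create a DataBrew job" section.
databrew.start_job_run(Name="patient-data-quality")

# After the job output lands in the S3 output bucket, run the crawler that
# builds the external table (step 4). The crawler name is a placeholder, and
# in a real pipeline you would wait for the job run to complete first.
glue.start_crawler(Name="blog-output-crawler")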

Prerequisites

Before beginning this tutorial, make sure you have the required permissions to create the resources required as part of the solution.

Additionally, create the S3 input and output buckets to capture the data, and upload the input data into the input bucket.

Create DataBrew datasets

To create a DataBrew dataset for the patient data, complete the following steps:

  1. On the DataBrew console, choose Datasets.
  2. Choose Connect new dataset.
  3. For Dataset name, enter a name (for this post, Patients).
  4. For Enter your source from S3, enter the S3 path of the patients input CSV.
  5. Choose Create Dataset.

Repeat these steps to create datasets for other CSV files, such as encounters, conditions, and so on.

Create a DataBrew project

To create a DataBrew project for the patient data, complete the following steps:

  1. On the DataBrew console, choose Projects.
  2. Choose Create a project.
  3. For Project name, enter a name (for this post, patients-data-quality).
  4. For Select a dataset, select My datasets.
  5. Select the patients dataset.
  6. Under Permissions, for Role name, choose an AWS Identity and Access Management (IAM) role that allows DataBrew to read from your Amazon S3 input location.

You can choose a role if you already created one, or create a new one. For more information, see Adding an IAM role with data resource permissions.

  1. Wait till the dataset is loaded (about 1–2 minutes).
  2. To make a consistency check, choose Birthdate.
  3. On the Create menu, choose Flag column.
  4. Under Create column, for Values to flag, select Custom value.
  5. For Source column, choose BIRTHDATE.
  6. For Values to flag, enter the regular expression (?:(?:18|19|20)[0-9]{2}).
  7. For Flag values as, choose Yes or no.
  8. For Destination column, enter BIRTHDATE_flagged.

The new column BIRTHDATE_FLAGGED now displays Yes for a valid four-digit year within BIRTHDATE.
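As a quick illustration (outside of DataBrew), the following Python snippet applies the same regular expression to a few hypothetical sample values to show which ones would be flagged as containing a valid four-digit year:

import re

# Same pattern as the DataBrew flag column: a four-digit year starting
# with 18, 19, or 20.
pattern = re.compile(r"(?:(?:18|19|20)[0-9]{2})")

for value in ["1985-03-12", "2020-11-01", "85-03-12", ""]:
    flagged = "Yes" if pattern.search(value) else "No"
    print(f"{value!r} -> {flagged}")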

  1. To create a completeness check, repeat the preceding steps to create a DRIVERS_FLAGGED column by choosing the DRIVERS column to mark missing values.
  2. To create an integrity check, choose the JOIN transformation.
  3. Choose the encounters dataset and choose Next.
  4. For Select join type, select Left join.
  5. For Join keys, choose Id for Table A and Patient for Table B.
  6. Under Column list, unselect all columns from Table B except for Patient.
  7. Choose Finish.
  8. Choose the Patient column and create another flag column PATIENTS_FLAG to mark missing values from the Patient column.

For our use case, we created three new columns to demonstrate data quality checks for data quality dimensions in scope (consistency, completeness, and integrity), but you can integrate additional transformations on the same or additional columns as needed.

  1. After you finish applying all your transformations, choose Publish on the recipe.
  2. Enter a description of the recipe version and choose Publish.

Create a DataBrew job

Now that our recipe is ready, we can create a job for it, which runs on the weekly schedule described in the architecture.

  1. On the DataBrew console, choose Jobs.
  2. Choose Create a job.
  3. For Job name, enter a name (for example, patient-data-quality).

Your recipe is already linked to the job.

  1. Under Job output settings, for File type, choose your final storage format (for this post, we choose CSV).
  2. For S3 location, enter your final S3 output bucket path.
  3. For Compression, choose the compression type you want to apply (for this post, we choose None).
  4. For File output storage, select Replace output files for each job run.

We choose this option because our use case is to publish a data quality score card for every new set of data files.

  1. Under Permissions, for Role name, choose your IAM role.
  2. Choose Create and run job.

Create an Athena table

If you’re familiar with Apache Hive, creating tables in Athena will feel familiar. You can create tables by writing a DDL statement in the query editor, or by using the wizard or JDBC driver. To use the query editor, enter the following DDL statement to create a table:

CREATE EXTERNAL TABLE `blog_output`(
  `id` string, 
  `birthdate` string, 
  `birthdate_flagged` string, 
  `deathdate` string, 
  `ssn` string, 
  `drivers` string, 
  `drivers_flagged` string, 
  `passport` string, 
  `prefix` string, 
  `first` string, 
  `last` string, 
  `suffix` string, 
  `maiden` string, 
  `marital` string, 
  `race` string, 
  `ethnicity` string, 
  `gender` string, 
  `birthplace` string, 
  `address` string, 
  `city` string, 
  `state` string, 
  `county` string, 
  `zip` bigint, 
  `lat` double, 
  `lon` double, 
  `healthcare_expenses` double, 
  `healthcare_coverage` double, 
  `patient` string, 
  `patient_flagged` string)
ROW FORMAT DELIMITED 
  FIELDS TERMINATED BY ',' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<your-bucket>/blog_output/';

Let’s validate the table output in Athena by running a simple SELECT query. The following screenshot shows the output.
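You can also run this validation programmatically. The following Python (boto3) sketch is a minimal example; the database name, query, and S3 output location are assumptions that you adjust for your environment:

import time
import boto3

athena = boto3.client("athena")

# Placeholder database and S3 output location; replace with your own values.
query = athena.start_query_execution(
    QueryString="SELECT count(*) FROM blog_output",
    QueryExecutionContext={"Database": "databrew_blog"},
    ResultConfiguration={"OutputLocation": "s3://<your-bucket>/athena-results/"},
)
query_id = query["QueryExecutionId"]

# Poll until the query finishes, then print the returned rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    for row in rows:
        print([col.get("VarCharValue") for col in row["Data"]])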

Create views to filter good and bad records (optional)

To create a good records view, enter the following code:

CREATE OR REPLACE VIEW good_records AS
SELECT * FROM "databrew_blog"."blog_output"
where 
birthdate_flagged = 'Yes' AND
drivers_flagged = 'No' AND
patient_flagged = 'No'

To create a bad records view, enter the following code:

CREATE OR REPLACE VIEW bad_records AS
SELECT * FROM "databrew_blog"."blog_output"
where 
birthdate_flagged = 'No' OR
drivers_flagged = 'Yes' OR 
patient_flagged = 'Yes'

Now you have the ability to query the good and bad records in Athena using these views.

Create a score card using QuickSight

Now let’s complete our final step of the architecture, which is creating a data quality score card through QuickSight by connecting to the Athena table.

  1. On the QuickSight console, choose Athena as your data source.
  2. For Data source name, enter a name.
  3. Choose Create data source.
  4. Choose your catalog and database.
  5. Select the table you have in Athena.
  6. Choose Select.

Now you have created a dataset.

To build the score card, you add calculated fields by editing the dataset blog_output.

  1. Locate your dataset.
  2. Choose Edit dataset.
  3. Choose Add calculated field.
  4. Add the field DQ_Flag with value ifelse({birthdate_flagged} = 'No' OR {drivers_flagged} = 'Yes' OR {patient_flagged} = 'Yes' , 'Invalid', 'Valid').

Similarly, add other calculated fields.

  1. Add the field % Birthdate Invalid Year with value countIf({birthdate_flagged}, {birthdate_flagged} = 'No')/count({birthdate_flagged}).
  2. Add the field % Drivers Missing with value countIf({drivers_flagged}, {drivers_flagged} = 'Yes')/count({drivers_flagged}).
  3. Add the field % Patients missing encounters with value countIf({patient_flagged}, {patient_flagged} = 'Yes')/count({patient_flagged}).
  4. Add the field % Bad records with the value countIf({DQ_Flag}, {DQ_Flag} = 'Invalid')/count({DQ_Flag}).

Now we create the analysis blog_output_analysis.

  1. Change the format of the calculated fields to display the Percent format.
  2. Start adding visuals by choosing Add visual on the + Add menu.

Now you can create a quick report to visualize your data quality score card, as shown in the following screenshot.

If QuickSight is using SPICE storage, you need to refresh the dataset in QuickSight after you receive notification about the completion of the data refresh. If the QuickSight report is running an Athena query for every request, you might see a “table not found” error when data refresh is in progress. We recommend using SPICE storage to get better performance.
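If you want to refresh the SPICE dataset programmatically after each DataBrew job run, you can start an ingestion through the QuickSight API. The following Python (boto3) sketch is a minimal example; the account ID and dataset ID are placeholders:

import uuid
import boto3

quicksight = boto3.client("quicksight")

# Placeholder account and dataset identifiers; replace with your own values.
quicksight.create_ingestion(
    AwsAccountId="111122223333",
    DataSetId="<your-dataset-id>",
    IngestionId=str(uuid.uuid4()),
)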

Cleaning up

To avoid incurring future charges, delete the resources created during this walkthrough.

Conclusion

This post explains how to create a data quality score card using DataBrew, Athena queries, and QuickSight.

This gives you a great starting point for using this solution with your datasets and applying business rules to build a complete data quality framework to monitor issues within your datasets. We encourage you to use various built-in transformations to get the maximum value for your project.


About the Authors

Nitin Aggarwal is a Senior Solutions Architect at AWS, where he helps digital native customers architect data analytics solutions and provides technical guidance on various AWS services. He brings more than 16 years of experience in software engineering and architecture roles for various large-scale enterprises.

Gaurav Sharma is a Solutions Architect at AWS. He works with digital native business customers providing architectural guidance on AWS services.

Vivek Kumar is a Solutions Architect at AWS. He works with digital native business customers providing architectural guidance on AWS services.

How Optus improves broadband and mobile customer experience using the Network Data Analytics platform on AWS

Post Syndicated from Rajagopal Mahendran original https://aws.amazon.com/blogs/big-data/how-optus-improves-broadband-and-mobile-customer-experience-using-the-network-data-analytics-platform-on-aws/

This is a guest blog post co-written by Rajagopal Mahendran, Development Manager at the Optus IT Innovation Team.


Optus is part of The Singtel group, which operates in one of the world’s fastest growing and most dynamic regions, with a presence in 21 countries. Optus provides not only core telecom services, but also an extensive range of digital solutions, including cloud, cybersecurity, and digital advertising to enterprises, as well as entertainment and mobile financial services to millions of consumers. Optus provides mobile communication services to over 10.4 million customers and broadband services to over 1.1 million homes and businesses. In addition, Optus Sport connects close to 1 million fans to Premier League, international football, and fitness content.

In this post, we look at how Optus used Amazon Kinesis to ingest and analyze network related data in a data lake on AWS and improve customer experience and the service planning process.

The challenge

A common challenge for telecommunication providers is to form an accurate, real-time view of quality of service and issues experienced by their customers. Home network and broadband connectivity quality has a significant impact on customer productivity and satisfaction, especially considering the increased reliance on home networks for work, connecting with family and friends, and entertainment during the COVID-19 pandemic.

Additionally, network operations and planning teams often don’t have access to the right data and insights to plan new rollouts and manage their current fleet of devices.

The network analytics platform provides troubleshooting and planning data and insights to Optus teams and their customers in near-real time, which helps reduce mean time to rectify and enhance the customer experience. With the right data and insights, customers have a better experience because instead of starting a support call with a lot of questions, the support staff and the customer have a current and accurate view of the services and the customer’s home network.

Service owner teams within Optus can also use the insights and trends derived from this platform to better plan for the future and provide higher-quality service to customers.

Design considerations

To address this challenge and its requirements, we embarked on a project to transform our current batch collection and processing system to a stream-based, near-real-time processing system, and introduce APIs for insights so that support systems and customer applications can show the latest snapshot of the network and service status.

We had the following functional and non-functional requirements:

  • The new platform must be capable of supporting data capture from future types of customer equipment as well as new ways of ingestion (new protocols and frequency) and new formats of data.
  • It should support multiple consumers of the data (a near-real-time API for support staff and customer applications, as well as operational and business reporting) that generate insights from it. The aim is for the platform to proactively detect issues and generate appropriate alerts for support staff as well as customers.
  • After the data arrives, insights from the data should be ready in the form of an API in a few seconds (5 seconds maximum).
  • The new platform should be resilient enough to continue processing when parts of the infrastructure fail, such as nodes or Availability Zones.
  • It can support an increased number of devices and services as well as more frequent collection from the devices.
  • A small cross-functional team across business and technology will build and run this platform. We need to ensure minimal infrastructure and operational overhead in the long run.
  • The pipeline should be highly available and allow for new deployments with no downtime.

Solution overview

With the goal of the platform and design considerations in mind, we decided to use higher-order services and serverless services from AWS where possible, to avoid unnecessary operational overhead for our team and focus on the core business needs. This includes using the Kinesis family of services for stream ingestion and processing; AWS Lambda for processing; Amazon DynamoDB, Amazon Relational Database Service (Amazon RDS), and Amazon Simple Storage Service (Amazon S3) for data persistence; and AWS Elastic Beanstalk and Amazon API Gateway for application and API serving. The following diagram shows the overall solution.

 

The solution ingests log files from thousands of pieces of customer network equipment (home routers) at predefined intervals. The customer equipment is only capable of sending simple HTTP PUT and POST requests to transfer log files. To receive these files, we use a Java application running in an Auto Scaling group of Amazon Elastic Compute Cloud (Amazon EC2) instances. After some initial checks, the receiver application performs cleansing and formatting, then streams the log files to Amazon Kinesis Data Streams.

We intentionally use a custom receiver application in the ingestion layer to provide flexibility in supporting different devices and file formats.

To understand the rest of the architecture, let’s take a look at the expected insights. The platform produces two types of insights:

  • Individual insights – Questions answered in this category include:
    • How many errors has a particular customer device experienced in the last 15 minutes?
    • What was the last error?
    • How many devices are currently connected at a particular customer home?
    • What’s the transfer/receive rate as captured by a particular customer device?
  • Base insights – Pertaining to a group or the whole user base, questions in this category include:
    • How many customer devices reported service disruption in the past 24 hours?
    • Which device types (models) have experienced the highest number of errors in the past 6 months?
    • After last night’s patch update on a group of devices, have they reported any errors? Was the maintenance successful?

The top lane in the architecture shows the pipeline that generates the individual insights.

 

The event source mapping of the Lambda function is configured to consume records from the Kinesis data stream. This function reads the records, formats, and prepares them based on the insights required. Finally, it stores the results in the Amazon S3 location and also updates a DynamoDB table that maintains a summary and the metadata of the actual data stored in Amazon S3.
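The following Python sketch outlines the general shape of such a consumer function. It is illustrative only, not the production implementation; the bucket name, DynamoDB table name, and payload fields are hypothetical:

import base64
import json
import boto3

s3 = boto3.client("s3")
dynamodb = boto3.resource("dynamodb")
summary_table = dynamodb.Table("device-insights-summary")  # hypothetical table name


def handler(event, context):
    for record in event["Records"]:
        # Kinesis record payloads arrive base64-encoded.
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        device_id = payload["deviceId"]  # hypothetical payload field

        # Persist the formatted record in S3 for later analysis.
        s3.put_object(
            Bucket="network-analytics-individual-insights",  # hypothetical bucket
            Key=f"logs/{device_id}/{record['kinesis']['sequenceNumber']}.json",
            Body=json.dumps(payload),
        )

        # Update the summary item that the insights API reads.
        summary_table.update_item(
            Key={"deviceId": device_id},
            UpdateExpression="SET lastSeen = :ts, lastError = :err",
            ExpressionAttributeValues={
                ":ts": int(record["kinesis"]["approximateArrivalTimestamp"]),
                ":err": payload.get("lastError", "none"),
            },
        )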

To optimize performance, we configured two settings in the Lambda event source mapping (a configuration sketch follows this list):

  • Batch size – The number of records to send to the function in each batch, which helps achieve higher throughput
  • Concurrent batches per shard – Processes multiple batches from the same shard concurrently, which helps with faster processing
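A minimal Python (boto3) sketch of creating an event source mapping with these two settings follows; the stream ARN and function name are placeholders:

import boto3

lambda_client = boto3.client("lambda")

# Placeholder stream ARN and function name; BatchSize and ParallelizationFactor
# correspond to "batch size" and "concurrent batches per shard" above.
lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:ap-southeast-2:111122223333:stream/device-logs",
    FunctionName="individual-insights-processor",
    StartingPosition="LATEST",
    BatchSize=500,
    ParallelizationFactor=10,
)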

Finally, the API is provided via API Gateway and runs on a Spring Boot application that is hosted on Elastic Beanstalk. In the future, we may need to keep state between API calls, which is why we use Elastic Beanstalk instead of a serverless application.

The bottom lane in the architecture is the pipeline that generates base reports.

 

We use Amazon Kinesis Data Analytics, running stateful computation on streaming data, to summarize certain metrics like transfer rates or error rates in given time windows. These summaries are then pushed to an Amazon Aurora database with a data model that’s suitable for dashboarding and reporting purposes.

The insights are then presented in dashboards using a web application running on Elastic Beanstalk.

Lessons learned

Using serverless patterns and higher-order services, in particular Lambda, Kinesis Data Streams, Kinesis Data Analytics, and DynamoDB, provided a lot of flexibility in our architecture and helped us move more towards microservices rather than big monolith batch jobs.

This shift also helped us dramatically decrease our operational and service management overhead. For example, over the last several months since the launch, customers of this platform didn’t experience any service disruption.

This solution also enabled us to adopt more DevOps and agile ways of working, in the sense that a single small team develops and runs the system. This in turn enabled the organization to be more agile and innovative in this domain.

We also discovered some technical tips through the course of development and production that are worth sharing:

Outcomes and benefits

We now have near-real-time visibility into our fixed and mobile network performance as experienced by our customers. In the past, we had only data that arrived in batch mode with a delay, and only from our own network probes and equipment.

With the near-real-time view of the network when changes occur, our operational teams can also carry out upgrades and maintenance across the fleet of customer devices with higher confidence and frequency.

Lastly, our planning teams use these insights to form an accurate, up-to-date performance view of various equipment and services. This leads to higher-quality service for our customers at better prices because our service planning teams are enabled to optimize cost, better negotiate with vendors and service providers, and plan for the future.

Looking ahead

With the network analytics platform in production for several months and stable now, there is demand for more insights and new use cases. For example, we’re looking into a mobile use case to better manage capacity at large-scale events (such as sporting events). The aim is for our teams to be data driven and able to react in near-real time to capacity needs in these events.

Another area of demand is around predictive maintenance: we are looking to introduce machine learning into these pipelines to help drive insights faster and more accurately by using the AWS Machine Learning portfolio of services.


About the authors

Rajagopal Mahendran is a Development Manager at the Optus IT Innovation Team. Mahendran has over 14 years of experience in various organizations delivering enterprise applications from medium-scale to very large-scale using proven to cutting-edge technologies in big data, streaming data applications, mobile, and cloud native applications. His passion is to power innovative ideas using technology for better living. In his spare time, he loves bush walking and swimming.

 

Mostafa Safipour is a Solutions Architect at AWS based out of Sydney. He works with customers to realize business outcomes using technology and AWS. Over the past decade he has helped many large organizations in the ANZ region build their data, digital, and enterprise workloads on AWS.

 

Masudur Rahaman Sayem is a Specialist Solution Architect for Analytics at AWS. He works with AWS customers to provide guidance and technical assistance on data and analytics projects, helping them improve the value of their solutions when using AWS. He is passionate about distributed systems. He also likes to read, especially classic comic books.