Tag Archives: AWS Big Data

Publish and update data products dynamically with AWS Data Exchange

Post Syndicated from Akram Chetibi original https://aws.amazon.com/blogs/big-data/publish-and-update-data-products-dynamically-with-aws-data-exchange/

Data is revolutionizing the way organizations of all sizes conduct their business. Companies are increasingly using third-party data to complement their internal data and deliver value for their customers. Third-party data is used across a wide variety of use cases, such as building applications for customers, running analytics workloads to improve business operations and marketing activities, or building predictive models using machine learning (ML) techniques.

However, as data becomes the center of how companies operate, the way data providers deliver data to subscribers has not changed in years. As a data provider, you spend time and effort on undifferentiated heavy lifting to build data delivery and entitlement management mechanisms to serve your customers. Many data providers also rely on traditional sales and delivery channels and are often unable to reach many customers interested in their data, which leads to slower adoption of their data products.

Enter AWS Data Exchange.

AWS Data Exchange makes it easy to exchange data in the cloud efficiently. In a few minutes, customers can find and subscribe to hundreds of data products from more than 80 qualified data providers across industries, such as Financial Services, Healthcare and Life Sciences, and Consumer and Retail. After subscribing, customers can download a dataset or copy it to Amazon S3 and analyze it with a wide variety of AWS analytics and ML services. AWS Data Exchange gives data providers a secure, transparent, and reliable channel to reach millions of AWS customers. AWS Data Exchange also helps you service your existing customer subscriptions more efficiently and at a lower cost by eliminating the need to build and maintain data delivery, licensing, or billing infrastructure.

Many data providers publish data products that are updated regularly. For example, a stock market data provider may want to publish daily closing prices every day, or a weather forecast data provider may want to provide an updated forecast every week. This post walks through the process of publishing and updating products dynamically on AWS Data Exchange. The post first shows how to publish a new product and make it available to subscribers, which can be done in minutes using the AWS Data Exchange console. The post also reviews a workflow using a Lambda function to automatically update the product by publishing new revisions to its underlying data sets.

Prerequisites

Before you begin, complete the following prerequisites:

  1. You must be a registered provider on AWS Data Exchange. Only eligible and registered providers can publish data products on AWS Data Exchange. Eligible providers must agree to the Terms and Conditions for AWS Marketplace under a valid legal entity domiciled in the United States or a member state of the EU, supply valid banking and taxation identification, and be qualified by the AWS Data Exchange business operations team. For more information, see Providing Data Products on AWS Data Exchange.
  2. The data that you publish must be compliant with the AWS Marketplace Terms and Conditions and the AWS Data Exchange Publishing Guidelines.
  3. You must have the appropriate IAM permissions to use AWS Data Exchange as a provider. For example, you can use the AWSDataExchangeProviderFullAccess managed IAM policy.
  4. You need an S3 bucket for your ready-to-publish data files. For more information, see Create a Bucket and What is Amazon S3?

AWS Data Exchange concepts

Products are the unit of exchange in AWS Data Exchange. A product is a package of data sets that a provider publishes and others subscribe to. The AWS Data Exchange product catalog and AWS Marketplace website both list products. A product can contain one or more data sets, as well as product details, including the product’s name and description, categories, and contact details. The product also contains information related to the product’s offer terms, which are the terms that subscribers agree to when subscribing to a product. These terms include the available pricing and duration options, the data subscription agreement, and the refund policy.

A data set is a dynamic set of file-based data content. Data sets are dynamic and versioned using revisions. A revision is a specific version of a data set. Each revision can contain multiple files called assets, which you can import to a revision using an asynchronous workflow called a job. After creating a revision and importing assets into it, you need to finalize the revision to mark it as ready for publishing, before publishing it into the dataset’s product. For more information, see Working with Data Sets.

The following diagram summarizes the concepts described above and the hierarchy of the different resources.
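
If you prefer to work with the API directly, the same hierarchy maps onto a handful of AWS Data Exchange calls. The following is a minimal boto3 sketch of the data set, revision, job, and asset lifecycle rather than a production workflow; the names, bucket, and key are placeholders:

    import time
    import boto3

    dx = boto3.client('dataexchange', region_name='us-east-1')

    # A data set is the container for your file-based data.
    data_set = dx.create_data_set(
        AssetType='S3_SNAPSHOT',
        Name='Example data set',
        Description='Example description')

    # A revision is one version of the data set.
    revision = dx.create_revision(
        DataSetId=data_set['Id'],
        Comment='Initial revision')

    # A job imports files (assets) from S3 into the revision.
    job = dx.create_job(
        Type='IMPORT_ASSETS_FROM_S3',
        Details={'ImportAssetsFromS3': {
            'DataSetId': data_set['Id'],
            'RevisionId': revision['Id'],
            'AssetSources': [{'Bucket': 'example-bucket', 'Key': 'prices.csv'}]}})
    dx.start_job(JobId=job['Id'])

    # Wait for the import to finish before finalizing the revision.
    while dx.get_job(JobId=job['Id'])['State'] not in ('COMPLETED', 'ERROR'):
        time.sleep(1)

    # Finalizing marks the revision as ready to publish into a product.
    dx.update_revision(
        DataSetId=data_set['Id'],
        RevisionId=revision['Id'],
        Finalized=True)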

Publishing a new product to AWS Data Exchange

Before reviewing how to automatically update an existing product, let’s start by setting up and creating a new product. If you already have a published product, skip this section and move on to “Publishing new data files to the product automatically.”

Creating a dataset

To publish a product, first create a dataset. Complete the following steps:

  1. On the AWS Data Exchange console, under Data sets, choose Create data set.
  2. Enter a Name and Description for the dataset and choose Create.

The name of the data set is visible as part of the product details in the catalog; consider using a concise name that enables customers to understand the content of the data set easily. The description is visible to subscribers who have an active subscription to the product; consider including coverage information as well as the features and benefits of the dataset.

The following screenshot shows the Create data set section with the name and description. This post entered the name Exchange-A End of Day Prices, and the description, End-of-day pricing of all equities listed on Exchange-A. Covers all industries and all equities traded on the exchange (2,000+). This data set contains full history from 1985, and is updated daily with a new file every day around 5pm EST.

Creating a revision

After creating the data set, but before publishing it into a product, you need to create its initial revision. Complete the following steps:

  1. On your data set’s page, choose the Revisions tab.
  2. Choose Create revision.
  3. For Revision settings, enter a brief comment about the data in this revision.
  4. Choose Create. The revision comment is visible to subscribers after they subscribe to your product. The following screenshot shows that this post entered the comment Historical data from January 1st, 1985 to November 13th, 2019. You can choose to import files (assets) to this revision from either an S3 bucket or your computer. This post imports a file from an S3 bucket. It is important to note that, by default, AWS Data Exchange uses the source S3 object’s key as the asset name. The following screenshot shows the example file this post uses.
  5. When the import status is complete, choose Finalize.

Marking a revision as finalized means that it is staged for publishing. You can only publish finalized revisions to subscribers; you can’t modify a revision after publishing it.

Publishing a new product

You are now ready to publish a new product using this data set. Complete the following steps:

  1. On the AWS Data Exchange console, under Publish data, choose Products.
  2. Choose Publish new product.
  3. In Product overview, enter the product details that subscribers can use to identify the product. For information about best practices when populating your product’s details, see Publishing Products. In particular, you may want to consider including links to a data due diligence questionnaire (DDQ), information about the data set file types and schemas, and any other fact sheets. Note that you can use Markdown to include links and format your product description.
  4. Choose Next to proceed to the Add data page, where you can add the data set that you created above.
  5. Choose Next to proceed to the Configure the public offer page. This is where you configure the offer details for your product, including the available pricing options, the Data Subscription Agreement, and the refund policy. You can also choose whether to enable subscription verification. If you enable subscription verification, prospective subscribers have to fill in information such as their name, company name, email address, and use case before they can subscribe. The subscription request then appears on your Product Dashboard page, and you have up to 45 days to approve or decline the request. For more information, see Subscription Verification for Providers.
  6. Choose Next to review your product. You can preview the product as it will appear on the AWS Data Exchange product catalog. When you are satisfied with your product and offer details, choose Publish the product. Important: Choosing Publish the product will publish your product to the AWS Data Exchange catalog and make it publicly available to subscribers.

You have now created a new data set, added your first revision to this data set with historical data, finalized the revision, and published a product using this finalized revision. This product is available for subscribers to purchase within a few hours after publishing.

Publishing new data files to the product automatically

Now that the product is available to customers, you need to update the product and continuously publish new revisions to it. In our example, you need to publish new equity prices every day. To do so, set up the following architecture, which automatically picks up any file uploaded to your S3 bucket and publishes it to the product’s data set as part of a new revision. The workflow creates and publishes a new revision for each file uploaded to the S3 bucket.

The workflow is as follows:

  1. You upload a ready-to-publish data file to the S3 bucket to update your data set.
  2. S3 invokes an AWS Lambda function with the S3 API event that contains details about the object. For more information, see Using AWS Lambda with Amazon S3.
  3. The AWS Lambda function creates a new revision under the pre-existing data set and starts a job to import the file.
  4. The AWS Lambda function modifies the pre-existing product to include the new dataset revision.
  5. Subscribers can now consume the new revision, which appears as part of their entitled data set.

Building a Lambda function

Now that you published a product with a data set, you have the foundational pieces in place to build a Lambda function that picks a new data file uploaded to S3 and publishes it as a part of that product.

To configure your Lambda function correctly, you first need to record the data set ID and product ID that you created earlier. You can retrieve both from the AWS Data Exchange console. The product ID is available on the product page, which you can access from your Product Dashboard. The data set ID is available on the data set’s page, which you can access from the Data sets page.

Data set page

Product page
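
If you manage several data sets, you can also look up the data set ID with the AWS SDK instead of copying it from the console. A minimal sketch; it assumes your credentials and Region are already configured:

    import boto3

    dx = boto3.client('dataexchange')

    # OWNED lists the data sets you have created as a provider.
    for data_set in dx.list_data_sets(Origin='OWNED')['DataSets']:
        print(data_set['Id'], data_set['Name'])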

Creating an IAM role

To give the Lambda function permission to read from the source S3 bucket, create a revision, upload files to it, and publish it to a product, you need to create an IAM role with the appropriate permissions.

To do so, create an IAM role and attach the following policy to it. Be sure to replace {INSERT-BUCKET-NAME} and {INSERT-ACCOUNTID} with your S3 bucket’s name and your account ID respectively.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "S3PermissionforGettingDataSet",
            "Effect": "Allow",
            "Action": "s3:GetObject",
            "Resource": "arn:aws:s3:::{INSERT-BUCKET-NAME}/*"
        },
        {
            "Sid": "S3DataExchangeServicePermissions",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:PutObjectAcl"
            ],
            "Resource": "arn:aws:s3:::*aws-data-exchange*"
        },
        {
            "Sid": "DataExchangeAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "dataexchange:CreateRevision",
                "dataexchange:UpdateRevision",
                "dataexchange:CreateJob",
                "dataexchange:StartJob",
                "dataexchange:GetJob"
            ],
            "Resource": "*"
        },
        {
            "Sid": "MarketplaceAPIPermissions",
            "Effect": "Allow",
            "Action": [
                "aws-marketplace:DescribeEntity",
                "aws-marketplace:StartChangeSet",
                "aws-marketplace:DescribeChangeSet"
            ],
            "Resource": "*"
        },
        {
            "Sid": "CreateCloudwatchLogGroup",
            "Effect": "Allow",
            "Action": "logs:CreateLogGroup",
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:*"
        },
        {
            "Sid": "CloudwatchLogsPermissions",
            "Effect": "Allow",
            "Action": [
                "logs:CreateLogStream",
                "logs:PutLogEvents"
            ],
            "Resource": "arn:aws:logs:us-east-1:{INSERT-ACCOUNTID}:log-group:*"
        }
    ]
}

For more information, see Creating IAM Roles.

Deploying a Lambda layer

This post uses a Lambda layer that extends the AWS Python SDK (boto3) that is built into the Lambda Python runtime by adding the AWS Data Exchange and AWS Marketplace Catalog API SDKs as of November 13, 2019. You can deploy a sample layer published for this post, but you should use the version of the AWS SDK that matches your needs.
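
If you build your own layer rather than deploying the sample, publishing it is a single API call. The following is a minimal sketch; the layer name and zip file are placeholders, and the zip is assumed to contain the updated SDK files laid out so that the function can find them under /opt (which is why the function code below sets AWS_DATA_PATH to /opt/):

    import boto3

    lambda_client = boto3.client('lambda')

    # sdk-layer.zip is a placeholder for your packaged SDK update.
    with open('sdk-layer.zip', 'rb') as f:
        response = lambda_client.publish_layer_version(
            LayerName='dataexchange-sdk-layer',
            Content={'ZipFile': f.read()},
            CompatibleRuntimes=['python3.7'])

    print(response['LayerVersionArn'])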

Creating a Lambda function

Now that you created the IAM role and deployed the Lambda layer with your latest SDK, you can create a Lambda function using the following steps:

  1. On the Lambda console, choose Create a function.
  2. In the Create function section, choose Author from scratch.
  3. In the Basic information section, configure your Lambda function with the following information:
    1. For Function name, enter a name of your choice.
    2. For Runtime, choose Python 3.7.
    3. For Permissions, select Use an existing role.
    4. From the Existing role dropdown, select the Lambda role you created earlier.
  4. Choose Create function.

Configuring your Lambda function

You can now configure your Lambda function. You first need to configure the function to be triggered when new files are uploaded to the S3 bucket. Complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, choose Add trigger.
  4. Under Trigger Configuration, choose S3.
  5. From the drop-down, select the bucket you created as a part of the prerequisites.
  6. Under Event type, choose All Object Create Events.
  7. Optionally, choose a Prefix or a Suffix if you want to only publish specific files to your AWS Data Exchange product.
  8. Choose Add.

To confirm your code is running with the appropriate SDK, associate the Lambda layer that you deployed earlier with your Lambda function. As noted previously, this post published a sample layer, but you should use the appropriate version of the AWS SDK that matches your needs.

  1. On the Lambda console, choose Functions.
  2. Select the newly created function.
  3. On the function configuration page, under the function name, choose Layers.
  4. Choose Add a layer.
  5. Under Layer Selection, deselect Select from list of runtime compatible layers.
  6. From the drop-down, choose the layer you deployed earlier.
  7. Choose Add.

You now need to configure the Lambda function’s code. You can copy the following code for the Lambda function. This code programmatically calls the following APIs, which are the same APIs that you performed earlier using the console:

  • CreateRevision creates a new revision.
  • CreateJob and StartJob start importing the file to the revision.
  • GetJob checks the status of the import.
  • UpdateRevision marks the revision as finalized.

To publish an update to the product, the Lambda function uses the AWS Marketplace Catalog API service with the following APIs. To learn more, see the AWS Marketplace Catalog API Reference.

  • DescribeEntity gets the product details.
  • StartChangeSet starts an update.
  • DescribeChangeSet checks the status of the product update.

Complete the following steps:

  1. On the Lambda console, choose Functions.
  2. Select your newly created function.
  3. Scroll down to the Function code section.
  4. Enter the following code:
    import os
    
    #Include the Lambda layer extracted location
    os.environ['AWS_DATA_PATH'] = '/opt/' 
    
    import boto3
    import time
    import datetime
    import json
    
    region = os.environ['AWS_REGION']
    
    try:
        data_set_id = os.environ['DATA_SET_ID']
    except KeyError:
        raise Exception("DATA_SET_ID environment variable must be defined!") 
    
    try:
        product_id = os.environ['PRODUCT_ID']
    except KeyError:
        raise Exception("PRODUCT_ID environment variable must be defined!")
    
    def lambda_handler(event, context):
        # Setup the boto3 clients needed
        dataexchange = boto3.client(
            service_name='dataexchange',
            region_name=region
    
        )
        marketplace_catalog = boto3.client(
            service_name='marketplace-catalog',
            region_name=region
        )
    
        # parse the s3 details from the triggered event
        bucket_name = event['Records'][0]['s3']['bucket']['name']
        object_key = event['Records'][0]['s3']['object']['key']
    
        # CREATE REVISION under the dataset provided as an environment variable
        current_time_for_creating_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        create_revision_response = dataexchange.create_revision(DataSetId=data_set_id,
                                                         Comment='Revision created programmatically on ' + current_time_for_creating_revision)
        revision_id = create_revision_response['Id']
    
        # CREATE JOB under the revision to import file from S3 to DataExchange
        create_job_s3_import = dataexchange.create_job(
            Type='IMPORT_ASSETS_FROM_S3',
            Details={
                'ImportAssetsFromS3': {
                    'DataSetId': data_set_id,
                    'RevisionId': revision_id,
                    'AssetSources': [
                        {
                            'Bucket': bucket_name,
                            'Key': object_key
                        }
    
                    ]
                }
            }
        )
    
        # Filter the ID of the Job from the response
        job_id = create_job_s3_import['Id']
    
        # invoke START JOB on the created job to change it from Waiting to Completed state
        start_created_job = dataexchange.start_job(JobId=job_id)
    
        # GET JOB details to track the state of the job and wait until it reaches COMPLETED state
        job_status = ''
    
        while job_status != 'COMPLETED':
            get_job_status = dataexchange.get_job(JobId=job_id)
            job_status = get_job_status['State']
            print('Job Status ' + job_status)
            
            if job_status=='ERROR' :
                job_errors = get_job_status['Errors']
                raise Exception('JobId: {} failed with error:{}'.format(job_id, job_errors))
            
            time.sleep(.5)
            
        # Finalize revision by invoking UPDATE REVISION
        current_time_for_finalize_revision = datetime.datetime.utcnow().strftime("%d %B %Y %I:%M%p UTC")
        print(current_time_for_finalize_revision)
        finalize_revision = dataexchange.update_revision(DataSetId=data_set_id, RevisionId=revision_id, Finalized=True,
                                                  Comment='Revision finalized programmatically on ' + current_time_for_finalize_revision)
    
        # New dataset version created and finalized, now let’s add it to an existing product specified as an env variable
    
        # Describe Product details to get the metadata about the product
        describe_entity = marketplace_catalog.describe_entity(Catalog='AWSMarketplace', EntityId=product_id)
    
        # Use the output to pull out producttype, productid and datasetarn for startchangeset call
        entity_type = describe_entity['EntityType']
        entity_id = describe_entity['EntityIdentifier']
        dataset_arn = ((json.loads(describe_entity['Details']))['DataSets'][0]['DataSetArn'])
        revision_arn = create_revision_response['Arn']
     
    
        # StartChangeSet to add the newly finalized revision to an existing product
        start_change_set = marketplace_catalog.start_change_set(
            Catalog='AWSMarketplace',
            ChangeSetName="Adding revision to my Product",
            ChangeSet=[
                {
                    "ChangeType": "AddRevisions",
                    "Entity": {
                        "Identifier": entity_id,
                        "Type": entity_type
                    },
                    "Details": json.dumps({
                        "DataSetArn": dataset_arn,
                        "RevisionArns": [revision_arn]
                    })
                }
            ]
        )
        
        #Filter the changeset id from the response
        changeset_id = start_change_set['ChangeSetId']
    
        # DESCRIBE CHANGESET to get the status of the changeset and wait until it reaches SUCCEEDED state
        change_set_status = ''
    
        while change_set_status != 'SUCCEEDED':
            describe_change_set = marketplace_catalog.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=changeset_id
                )
            change_set_status = describe_change_set['Status']
            print('Change Set Status ' + change_set_status)
    
            if change_set_status=='FAILED' :
                print(describe_change_set)
                failurereason = describe_change_set['FailureDescription']
                raise Exception('ChangeSetID: {} failed with error:\n{}'.format(changeset_id, failurereason))
            time.sleep(1)
            
        return ('Your data has been published successfully')

  5. Scroll down to the Environment variables section.
  6. Set the DATA_SET_ID and PRODUCT_ID variables to the values you retrieved from the console.
  7. Scroll further down to Basic Settings and set the Timeout value to 1 minute.
  8. Choose Save.

When you upload a file to your S3 bucket, the S3 event now triggers the Lambda function, which updates the dataset automatically and publishes the new file to your subscribers. Subscribers also receive an Amazon CloudWatch event from AWS Data Exchange to automate exporting the data to their S3 buckets.

Conclusion

AWS Data Exchange provides data providers with an easy, secure, and efficient way to exchange data with their customers in the cloud. This post showed you how to publish a new product based on a newly created data set and revision in the AWS Data Exchange console. You also learned how to automatically publish files uploaded to your S3 bucket as new revisions. To learn more, visit AWS Data Exchange.

 


About the Authors

Akram Chetibi is a senior product manager of AWS Data Exchange. Akram joined AWS more than two years ago, and has launched multiple services including AWS Data Exchange and AWS Fargate.

Keerti Shah is a global solutions architect with Amazon Web Services. She enjoys working with Financial Services customers to drive innovation, digitization, and modernization of legacy applications.

Harsha W. Sharma is a global account solutions architect with AWS New York. Harsha joined AWS more than three years ago and works with Global Financial Services customers to design and develop architectures on AWS and support their journey on the cloud.

Find and acquire new data sets and retrieve new updates automatically using AWS Data Exchange

Post Syndicated from Akram Chetibi original https://aws.amazon.com/blogs/big-data/find-and-acquire-new-data-sets-and-retrieve-new-updates-automatically-using-aws-data-exchange/

Customers are doing some amazing things with data, such as improving medicine and tackling climate change. With AWS services, such as AWS Glue, Amazon EMR, Amazon SageMaker, Amazon QuickSight, and Amazon Athena, it is easier than ever to get data insights for your business. But how can you find the right data to fuel these analytics? This is where AWS Data Exchange steps in.

AWS Data Exchange makes it simple to exchange data in the cloud. In a few minutes, you can find and subscribe to hundreds of data products from more than 80 qualified data providers across industries such as Financial Services, Healthcare and Life Sciences, and Consumer and Retail. After subscribing, you can download data sets or copy them to Amazon S3 and analyze them with AWS’s analytics and machine learning services. With AWS Data Exchange, you can subscribe to data products and get access to data sets. Subscribers also access new data set revisions as providers publish new data.

This post uses an example scenario in which you would like to analyze daily treasury maturities to understand changes in the economy. We use Rearc’s Daily Treasury Maturities | Federal Reserve Board product, which contains a data set that is updated daily with new data. This post walks through the process, from browsing the catalog and subscribing to the data product to setting up automation that retrieves new revisions to S3 automatically, making the data readily available to analyze using other AWS services.

Solution overview

The solution has three steps:

  1. Configure your prerequisites: an S3 bucket for your data and IAM permissions for using AWS Data Exchange.
  2. Subscribe to a new data product in AWS Data Exchange.
  3. Set up an automation using Amazon CloudWatch events to retrieve new revisions of subscribed data products in AWS Data Exchange automatically.

Prerequisites

This post assumes you have an S3 bucket to which you export your data sets. For more information, see Create a Bucket.

You also need permissions to use AWS Data Exchange and associated services to subscribe to and export data sets. You can, for example, use the AWS Data Exchange managed policy AWSDataExchangeSubscriberFullAccess, which gives you all the necessary permissions needed to use AWS Data Exchange as a subscriber. For more information, see Identity and Access Management in AWS Data Exchange.

Browsing the catalog and subscribing to data products

Browsing and subscribing to a new data product is straightforward. The first step is to determine what data products you wish to subscribe to. Complete the following steps:

  1. On the AWS Data Exchange console, choose Product catalog. You can search for a term and filter results by provider name and pricing plan.
  2. For Product catalog, enter federal reserve.
  3. Choose Search. You can see multiple data products listed, including a few products by Rearc and Crux Informatics. You can narrow these down by refining the results.
  4. Under Refine results, under Vendors, select Rearc. This post is searching for free product offerings, so it filters the results further.
  5. Under Pricing plans, select Free. The filtered results contain Daily Treasury Maturities | Federal Reserve Board, which you can use for testing. Choosing the product name shows more product details, including its full description, which data sets are included in the product (some products offer multiple data sets in a single subscription), the product’s support contact information, and its offer details, such as the data subscription agreement, available pricing options, and the refund policy. See the following screenshot of the product detail page. It is important to understand the offer details you are agreeing to, including the price and Data Subscription Agreement (DSA). A link to view the DSA is under the Usage tab. Read over the DSA; it is a legal agreement that defines the rights to use the data. Make sure that the agreement aligns with your intended usage before subscribing.
  6. Choose Continue to subscribe.
  7. Under Complete subscription, for Pricing information, choose a subscription duration and price.
  8. For Renewal settings, choose whether you want to enable auto-renewal when the subscription expires. The following screenshot shows that this post chooses a subscription for 12 months, and to renew automatically.
  9. Choose Subscribe. The subscription process can take up to a few minutes to complete.

When your subscription is active, it is visible under the Active subscriptions tab of the Subscriptions page. Choose your subscription to view its details, including the data sets included in the subscription. You can also see the Region to which the vendor publishes the data set.
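
You can also list your entitled data sets and their revisions with the AWS SDK. A minimal sketch; it assumes your credentials point at the Region the provider publishes to:

    import boto3

    dx = boto3.client('dataexchange', region_name='us-east-1')

    # ENTITLED lists the data sets you can access through active subscriptions.
    for data_set in dx.list_data_sets(Origin='ENTITLED')['DataSets']:
        revisions = dx.list_data_set_revisions(DataSetId=data_set['Id'])['Revisions']
        print(data_set['Name'], 'revisions:', len(revisions))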

Viewing revisions and exporting assets

When you click on the data set name, you proceed to the data set page. You can view revisions under the Revisions tab. The following screenshot shows the list of revisions organized by Revision ID and time created.

Over time, as Rearc updates the data set, you can see multiple revisions listed.

Choosing the latest Revision ID brings up all the files (called assets in AWS Data Exchange) available in the revision. To export the assets, complete the following steps:

  1. Choose the asset to export.
  2. Choose Export to Amazon S3.
  3. Choose an S3 Bucket in the S3 navigation modal.
  4. Choose Export.

AWS Data Exchange starts copying the asset to your S3 bucket. In the console, AWS Data Exchange uses the asset name as an S3 object key. View the export progress in the Jobs list. It progresses through three steps: Waiting, Processing, and Completed.
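
You can script the same export if you prefer. The following is a minimal sketch, assuming you already know the data set ID and have a destination bucket; both values below are placeholders, and because the job runs asynchronously, you can poll get_job until it reports COMPLETED:

    import boto3

    dx = boto3.client('dataexchange')

    data_set_id = 'EXAMPLE_DATA_SET_ID'    # from the data set page
    bucket = 'example-destination-bucket'  # your S3 bucket

    # Find the most recently created revision and list its assets.
    revisions = dx.list_data_set_revisions(DataSetId=data_set_id)['Revisions']
    latest = max(revisions, key=lambda r: r['CreatedAt'])
    assets = dx.list_revision_assets(
        DataSetId=data_set_id, RevisionId=latest['Id'])['Assets']

    # Export every asset in the revision, using the asset name as the object key,
    # just as the console does.
    job = dx.create_job(
        Type='EXPORT_ASSETS_TO_S3',
        Details={'ExportAssetsToS3': {
            'DataSetId': data_set_id,
            'RevisionId': latest['Id'],
            'AssetDestinations': [
                {'AssetId': a['Id'], 'Bucket': bucket, 'Key': a['Name']}
                for a in assets]}})
    dx.start_job(JobId=job['Id'])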

Subscription Verification

AWS Data Exchange also has a feature called Subscription Verification. Subscription Verification allows providers to approve or decline subscription requests before granting subscription to certain products. For products with Subscription Verification enabled, you need to complete a form to share some information with the provider, who has up to 45 days to approve or reject the request. The form includes information such as your contact name, email address, company name, AWS account number, and intended use case for the data. The provider uses this information (and potentially reaches out to you for more information) to decide whether to approve your subscription request. You can view your subscription request status on the Subscriptions page under the Subscription requests tab. To learn more about subscription verification, see Subscription Verification for Subscribers.

Automating the retrieval for new data set revisions

Providers update many products regularly by creating and publishing new revisions to the underlying data sets. For example, the Rearc data product is updated daily. You want your analytics and visualizations to incorporate these revisions easily. To do so, you need to set up automation to retrieve the new files stored in newly published revisions.

The following diagram shows the workflow of this process.

Every time a new revision is published, AWS Data Exchange publishes a CloudWatch event sourced from aws.dataexchange. A CloudWatch event rule triggers a Lambda function, which starts an AWS Data Exchange job that exports the revision’s assets to a pre-defined S3 bucket. Because AWS Data Exchange uses the asset name as the default S3 object key when exporting to Amazon S3, and because Rearc publishes a new revision with the same asset name every day, this automation always overwrites the previous day’s file with the new file. You can therefore always refer to the same S3 object, which contains the latest data.
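
Outside of the CloudFormation template described below, the event rule itself is small. The following is a hedged sketch; the rule name and function ARN are placeholders, and the detail-type string and resources field are assumptions about the event AWS Data Exchange emits, so verify them against the service documentation:

    import json
    import boto3

    events = boto3.client('events')

    # Match revisions published to the data set you subscribed to.
    # The detail-type below is an assumption; confirm the exact string in the
    # AWS Data Exchange documentation for CloudWatch events.
    events.put_rule(
        Name='dataexchange-new-revision',
        EventPattern=json.dumps({
            'source': ['aws.dataexchange'],
            'detail-type': ['Revision Published To Data Set'],
            'resources': ['EXAMPLE_DATA_SET_ID']}))

    # Point the rule at the Lambda function that runs the export job.
    events.put_targets(
        Rule='dataexchange-new-revision',
        Targets=[{
            'Id': 'export-function',
            'Arn': 'arn:aws:lambda:us-east-1:111122223333:function:ExportNewRevision'}])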

An AWS CloudFormation template packages this automation. It contains all the necessary resources, including an S3 bucket to store the data, the Lambda function to export the data, its IAM role and policy, and the CloudWatch event rule to trigger the function. Packaging this automation in an AWS CloudFormation template makes it simple to repeat the automation for each data set you subscribe to. You can configure the template using the Data Set ID, which you can retrieve from the data set page that we have seen above.

In this post, we use a Lambda layer that extends the AWS Python SDK (boto3) that is built into the Lambda Python runtime by adding the AWS Data Exchange and AWS Marketplace Catalog API SDKs as of November 13, 2019. This is an example layer published for this post; use the correct version of the AWS SDK for your needs.

Deploying the automation

Before deploying the automation, make sure you are in the Region in which the data set is located. You can find this on the Subscription details page under Data sets.

  1. Click this button to deploy the CloudFormation template in the us-east-1 Region from the CloudFormation console.

    Alternatively, if you’re using a different Region, you can manually create the stack in that Region:

    • On the AWS CloudFormation console, choose Create Stack.
    • On the Create stack screen, for Template source, select Amazon S3 URL, and enter this URL in the box:
      https://aws-bigdata-blog.s3.amazonaws.com/artifacts/aws-blog-DataExchange/DataExchangeDownloadDataSet-v0.5.yaml
  2. On the stack details screen, give the stack a name and paste in the ID of the data set from the subscription. You can retrieve the Data Set ID from the AWS Data Exchange console’s Subscriptions page. Optionally, you can enter a Revision ID to download an existing revision to the S3 bucket immediately after stack creation. If you leave the Revision ID blank, only revisions published after stack creation are downloaded to the S3 bucket. Choose Next.
  3. On the Configure stack options page, choose Next.
  4. On the Review screen, scroll down and check the three boxes in the Capabilities and transforms section. Then choose the Create stack button.

The stack takes 3–4 minutes to complete. Choose the refresh icon to see the latest status. You can see the created S3 bucket under the Resources tab. This is where you can see new data set revisions.

Conclusion

In this post, you have searched and subscribed to a product and deployed the automation needed to automatically export new revisions to Amazon S3. This automation makes the data readily available to catalog and analyze using other AWS services. For example, you can catalog the new data automatically with an AWS Glue crawler, which creates and updates a table in your database with the Rearc data automatically. For more information, see Build and automate a serverless data lake using an AWS Glue trigger for the Data Catalog and ETL jobs. After cataloging the data, you can run a serverless ETL job to transform it into Parquet, or use it directly as-is from Amazon Athena or Amazon QuickSight.

 


About the Authors

Akram Chetibi is a Senior Product Manager of AWS Data Exchange. Akram joined AWS more than two years ago, and has launched multiple services including AWS Data Exchange and AWS Fargate.

George Seib is an Enterprise Solutions Architect with Amazon Web Services. He helps Financial Services and Enterprise customers cost effectively scale and secure data workloads.

Enhancing dashboard interactivity with Amazon QuickSight Actions

Post Syndicated from Sahitya Pandiri original https://aws.amazon.com/blogs/big-data/enhancing-dashboard-interactivity-with-amazon-quicksight-actions/

Amazon QuickSight now offers enhanced dashboard interactivity capabilities through QuickSight Actions. QuickSight Actions provide advanced filtering capabilities through single point-and-click actions on dashboards. With Actions, you can link visuals within a dashboard so that selecting a dimensional point on one visual provides you with granular insights on the selected point on other visuals within your dashboard. Therefore, you can start with summaries and dive deep into details of your business metrics, all within the same dashboard sheet. You can define what visuals within your dashboard are interactive and how these interact with each other. As of this writing, QuickSight Actions lets you define two primary actions of interactivity: filter actions and URL actions. URL actions within Amazon QuickSight are not new, but the point of entry to create URL actions is now consolidated with Actions.

You can apply QuickSight Actions to any supported chart that holds at least one dimension. This post provides examples of getting started with Actions, configuring different Actions on a dashboard, and enabling different forms of interactivity for each action configured.

This post uses the following data sets:

B2B Sales
This data set holds order details for a fictitious company ABCDO for 2016 and 2017. The dashboard we build reports on sales metrics by industry, segment, and region as primary dimensions, and also provides granular details for each order purchased.

Product Availability
This data set holds available quantity for every product by ID.

Prerequisites

Before implementing Actions on your Amazon QuickSight dashboards, review how to create and publish dashboards.

Getting started with QuickSight Actions

The following screenshot shows a dashboard built from the above two data sets. It shows sales by category, industry, and region on line 1; segment sales by quarter and industry sales by segment on line 2; total profit, sales, discount, and quantity sold on line 3; an order details pivot on line 4; and a shipping details pivot on line 5.

Before getting started, note the following terminology:

  • Source visual – The visual on which an action is created. Choosing a point on the source visual triggers the action, and the dimensional values chosen are passed as filters to target visuals.
  • Target visual – The visual that is filtered by the dimensional values chosen on the source visual.
  • Activation – You can trigger an action either by selecting it directly (left-click), or selecting from the menu options (right-click).
  • Action type – You can configure two types of actions on a visual: filter actions and URL actions. Filter actions pass select or all dimensions as filters across select or all visuals across the dashboard. URL actions allow you to navigate from the dashboard to an external site or different sheet within the same dashboard with the selected dimension passed along.

Setting up a click/select action

To set up a click/select action, complete the following steps:

  1. Select the source visual on the analysis and choose Actions. The following screenshot shows you this step.
  2. Within Actions, you can either create and configure your actions or use the 1-click action setup. The 1-click setup creates a default action on the visual. As a result, when a point on the visual is selected/clicked, all dimensions on a selected point are passed as filters across all other visuals on the dashboard. Under Quick create, choose Filter same-sheet visuals.

This creates a default action, which you can modify at any time.

After creating the 1-click action, you can select any point on the source visual. The following screenshot shows that the selected point is highlighted and all the underlying dimensions are used as filters on all other visuals on the analysis or dashboard.

In this screenshot, selecting Copper and Diamonds for the year 2017 passes these two as filters to all other visuals.

To verify what filter actions are applied on a particular visual, choose the filter icon on the top right. This shows all the filters applied through filter actions and also regular filters.

Setting up custom filter actions

To perform further analysis on the sales metrics for any segment, configure an action on the Industry sales by Segment visual. To configure a filter action to pass segment as the filter onto the KPI charts, complete the following steps:

  1. Choose the source visual and choose Actions.
  2. Choose Define a custom action.
  3. For Action name, enter a name for your action. You could also add dynamic placeholder values to the name to be more descriptive to your readers by choosing the plus sign next to the Action name. You have four configuration settings. While this post uses values specific to this use case, you can choose other values according to your needs.
  4. For Activation, select Menu option.
  5. For Action type, choose Filter action.
  6. For Filter scope, select Selected fields, segment.
  7. For Target visuals, select Select visuals and check needed target visuals Total quantity, Total discount offered, and Total profit by year.
  8. Choose Save.

To view menu options and choose a filter action, right-click any segment on the visual. This post selects Strategic. The following screenshot shows that all the Strategic points are highlighted and the KPIs are updated.

Adding filter actions on pivot tables

Filter actions on pivot tables provide exceptional flexibility to pass anything from the innermost to the outermost dimensions. Additionally, when a filter action is triggered, the source pivot table is highlighted to show values that are passed as filters to the target visuals.

This post adds click/select filter actions on the Order details pivot table to analyze shipping details for each order. This post also adds two filter actions to the menu options: one to filter by the product name and the other by the customer name.

To create click/select filter actions on order_id, complete the following steps:

  1. Choose the Order details pivot table chart.
  2. Choose Actions.
  3. Choose Define custom action.
  4. For Action name, enter your action name.
  5. For Activation, select Select.
  6. For Filter scope, select Selected fields and check order_id.
  7. For Target visuals, select Select visuals, and check Shipping details.
  8. Choose Save.

The following screenshot shows that when choosing any point on the pivot table, the order pertaining to the point is highlighted and the Shipping details table shows shipping details for the selected order.

You can only create one Selection action on a source visual and any number of menu option actions.

You can now create additional actions in the menu. To create new actions, go to Actions, choose the plus sign, and create these two actions.

After you create the two actions, they are visible in the Actions menu. See the following screenshot.

After setting up these action filters, you can right-click on the visual and trigger these filters from the menu.

The screenshots below show you the menu that appears when you right-click the product name.

The screenshots below show you the menu that appears when you right-click the customer name.

Removing applied filter actions

There are three ways to remove a triggered filter action:

  • On source visuals with filter actions triggered via select or menu options, clear the filter action by choosing the selected point again or a different point.
  • On source visuals with empty spaces within the visual (such as bar chart family or pie chart), clicking within the empty space also clears the filter action selection. This deselection is not applicable to heat maps and tree maps. Deselect points on these charts by choosing an already selected point.
  • On target visuals with a highlighted filter icon, choosing the X on the filter actions displayed removes the filter for this visual and all other visuals the filter is applied to.

Using action filters when the source and target visuals use different data sets

If you use multiple data sets within an analysis, Amazon QuickSight automatically maps fields across these data sets by the field name. For example, product_id is a field in the B2B Sales data set and Product Availability data set. When setting a filter action on product_id from a source visual using the B2B Sales data set, the target visual (the adjacent KPI) showing product quantity availability shows results for the selected product_id from the source visual. This implicit mapping of fields is case sensitive.

You can add the new data set Product Availability to this analysis. For more information, see Adding a Data Set to an Analysis.

Add a KPI chart to show the count of product_id, as shown in the screenshot below.

To show total quantity available of select products, create an action filter on the menu options, as seen in the screenshot below:

To drill down into product quantity availability, choose (right-click) product_id on the order details pivot table. The following screenshot shows the product availability details for the selected product_id.

Creating custom URL actions

Custom URL actions aren’t new to Amazon QuickSight, but the point of entry to create URL actions is now consolidated with Actions. To create a custom URL action on the Order details pivot table to see Google search results on a customer, complete the following steps:

  1. Choose the Order details pivot table.
  2. Choose Actions.
  3. Choose Add a new action.
  4. For Action name, enter an action name.
  5. For Action type, choose URL action.
  6. Enter the URL https://www.google.com/search?q=<<customer_name>>. The <<customer_name>> placeholder is replaced with the value of the customer_name dimension for the data point you select.
  7. Choose New browser tab.
  8. Choose Save.

When you right-click on the pivot table, menu options appear with your two filter actions created previously and the new custom URL action. See the following screenshot.

Conclusion

With Amazon QuickSight, you can create filter actions on any chart type with at least one dimensional value. You can also create URL actions on all chart types, including KPIs and gauge charts, which do not hold dimensional values. As of this writing, QuickSight Actions don’t support selecting multiple data points within the same source visual, cascading filter actions in which you choose one filter action on one visual and a subsequent filter on another visual, or adding custom mappings between different data sets used in the dashboard. You can expect to see these features in future releases.

QuickSight Actions is available on both Enterprise and Standard editions in all QuickSight-supported regions.

 


About the Author

Sahitya Pandiri is a technical program manager with Amazon Web Services. Sahitya has been in product/program management for 6 years now, and has built multiple products in the retail, healthcare, and analytics space.

Secure your data on Amazon EMR using native EBS and per bucket S3 encryption options

Post Syndicated from Duncan Chan original https://aws.amazon.com/blogs/big-data/secure-your-data-on-amazon-emr-using-native-ebs-and-per-bucket-s3-encryption-options/

Data encryption is an effective solution to bolster data security. You can make sure that only authorized users or applications read your sensitive data by encrypting your data and managing access to the encryption key. One of the main reasons that customers from regulated industries such as healthcare and finance choose Amazon EMR is because it provides them with a compliant environment to store and access data securely.

This post provides a detailed walkthrough of two new encryption options to help you secure your EMR cluster that handles sensitive data. The first option is native EBS encryption to encrypt volumes attached to EMR clusters. The second option is Amazon S3 encryption, which allows you to use different encryption modes and customer master keys (CMKs) for individual S3 buckets with Amazon EMR.

Local disk encryption on Amazon EMR

Previously you could only choose Linux Unified Key Setup (LUKS) for at-rest encryption. You now have a choice of using LUKS or native EBS encryption to encrypt EBS volumes attached to an EMR cluster. EBS encryption provides the following benefits:

  • End-to-end encryption – When you enable EBS encryption for Amazon EMR, all data on EBS volumes, including intermediate disk spills from applications and Disk I/O between the nodes and EBS volumes, are encrypted. The snapshots that you take of an encrypted EBS volume are also encrypted and you can move them between AWS Regions as needed.
  • Amazon EMR root volumes encryption – There is no need to create a custom Amazon Linux Image for encrypting root volumes.
  • Easy auditing for encryption – When you use LUKS encryption, though your EBS volumes are encrypted along with any instance store volumes, you still see EBS volumes with a Not Encrypted status when you use an Amazon EC2 API or the EC2 console to check the encryption status. This is because the API doesn’t look into the EMR cluster to check the disk status; your auditors would need to SSH into the cluster to check for disk encryption compliance. However, with EBS encryption, you can check the encryption status from the EC2 console or through an EC2 API call.
  • Transparent encryption – EBS encryption is transparent to any applications running on Amazon EMR and doesn’t require you to modify any code.

Amazon EBS encryption integrates with AWS KMS to provide the encryption keys that protect your data. To use this feature, you have to use a CMK in your account and Region. A CMK gives you control to create and manage the key, including enabling and disabling the key, controlling access, rotating the key, and deleting it. For more information, see Customer Master Keys.
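
If you prefer to script this step, creating the key and giving it an alias takes two calls. A minimal sketch; the alias name is just an example:

    import boto3

    # Create the key in the same Region as your EMR cluster.
    kms = boto3.client('kms', region_name='us-east-1')

    key = kms.create_key(Description='CMK for Amazon EMR EBS encryption')
    key_arn = key['KeyMetadata']['Arn']

    # An alias makes the key easier to find later.
    kms.create_alias(
        AliasName='alias/emr-ebs-encryption',
        TargetKeyId=key['KeyMetadata']['KeyId'])

    print(key_arn)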

Enabling EBS encryption on Amazon EMR

To enable EBS encryption on Amazon EMR, complete the following steps:

  1. Create your CMK in AWS KMS.
    You can do this either through the AWS KMS console, AWS CLI, or the AWS KMS CreateKey API. Create keys in the same Region as your EMR cluster. For more information, see Creating Keys.
  2. Give the Amazon EMR service role and EC2 instance profile permission to use your CMK on your behalf.
    If you are using the EMR_DefaultRole, add the policy with the following steps:

    • Open the AWS KMS console.
    • Choose your AWS Region.
    • Choose the key ID or alias of the CMK you created.
    • On the key details page, under Key Users, choose Add.
    • Choose the Amazon EMR service role. The name of the default role is EMR_DefaultRole.
    • Choose Attach.
    • Choose the Amazon EC2 instance profile. The name of the default role for the instance profile is EMR_EC2_DefaultRole.
    • Choose Attach.
      If you are using a customized policy, add the following code to the service role to allow Amazon EMR to create and use the CMK, with the resource being the CMK ARN:

      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "EmrDiskEncryptionPolicy",
                  "Effect": "Allow",
                  "Action": [
                      "kms:Encrypt",
                      "kms:Decrypt",
                      "kms:ReEncrypt*",
                      "kms:CreateGrant",
                      "kms:GenerateDataKeyWithoutPlaintext",
                      "kms:DescribeKey"
                  ],
                  "Resource": [
                      "arn:aws:kms:region:account-id:key/xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx"
                  ]
              }
          ]
      }

       

  3. Create and configure the Amazon EMR security configuration template. Do this through the console, or use the AWS CLI or SDK (a boto3 sketch follows these steps):
    • Open the Amazon EMR console.
    • Choose Security Configuration.
    • Under Local disk encryption, choose Enable at-rest encryption for local disks.
    • For Key provider type, choose AWS KMS.
    • For AWS KMS customer master key, choose the key ARN of your CMK. This post uses the key ebsEncryption_emr_default_role.
    • Select Encrypt EBS volumes with EBS encryption.
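
The following is a minimal boto3 sketch of the equivalent security configuration. The configuration name and key ARN are placeholders, and depending on your needs you can add an S3EncryptionConfiguration block alongside the local disk settings (a similar sketch appears later in the S3 encryption section):

    import json
    import boto3

    emr = boto3.client('emr', region_name='us-east-1')

    security_configuration = {
        'EncryptionConfiguration': {
            'EnableInTransitEncryption': False,
            'EnableAtRestEncryption': True,
            'AtRestEncryptionConfiguration': {
                'LocalDiskEncryptionConfiguration': {
                    'EncryptionKeyProviderType': 'AwsKms',
                    'AwsKmsKey': 'arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID',
                    # Equivalent to selecting "Encrypt EBS volumes with EBS encryption".
                    'EnableEbsEncryption': True
                }
            }
        }
    }

    emr.create_security_configuration(
        Name='emr-ebs-encryption',
        SecurityConfiguration=json.dumps(security_configuration))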

Default encryption with EC2 vs. Amazon EMR EBS encryption

EC2 has a similar feature called default encryption. With this feature, all EBS volumes in your account are encrypted without exception, using a single CMK that you specify per Region. With EBS encryption from Amazon EMR, you can use a different KMS key per EMR cluster to secure your EBS volumes. You can use both EBS encryption provided by Amazon EMR and default encryption provided by EC2.

In that case, EBS encryption provided by Amazon EMR takes precedence, and the EBS volumes attached to the cluster are encrypted with the CMK that you selected in the security configuration.

S3 encryption

Amazon S3 encryption also works with Amazon EMR File System (EMRFS) objects read from and written to S3. You can use either server-side encryption (SSE) or client-side encryption (CSE) mode to encrypt objects in S3 buckets. The following table summarizes the different encryption modes available for S3 encryption in Amazon EMR.

Encryption mode | Encryption location | Key storage | Key management
SSE-S3 | Server side on Amazon S3 | Amazon S3 | Amazon S3
SSE-KMS | Server side on Amazon S3 | AWS KMS | Choose the AWS managed CMK for Amazon S3 with the alias aws/s3, or create a custom CMK
CSE-KMS | Client side on the EMR cluster | AWS KMS | A custom CMK that you create
CSE-Custom | Client side on the EMR cluster | You | Your own key provider

The encryption choice you make depends on your specific workload requirements. Though SSE-S3 is the most straightforward option that allows you to fully delegate the encryption of S3 objects to Amazon S3 by selecting a check box, SSE-KMS or CSE-KMS are better options that give you granular control over CMKs in KMS by using policies. With AWS KMS, you can see when, where, and by whom your customer managed keys (CMK) were used, because AWS CloudTrail logs API calls for key access and key management. These logs provide you with full audit capabilities for your keys. For more information, see Encryption at Rest for EMRFS Data in Amazon S3.

Encrypting your S3 buckets with different encryption modes and keys

With S3 encryption on Amazon EMR, all the encryption modes use a single CMK by default to encrypt objects in S3. If you have highly sensitive content in specific S3 buckets, you may want to manage the encryption of these buckets separately by using different CMKs or encryption modes for individual buckets. You can accomplish this using the per bucket encryption overrides option in Amazon EMR. To do so, complete the following steps:

  1. Open the Amazon EMR console.
  2. Choose Security Configuration.
  3. Under S3 encryption, select Enable at-rest encryption for EMRFS data in Amazon S3.
  4. For Default encryption mode, choose your encryption mode. This post uses SSE-KMS.
  5. For AWS KMS customer master key, choose your key. The key you provide here encrypts all S3 buckets used with Amazon EMR. This post uses ebsEncryption_emr_default_role.
  6. Choose Per bucket encryption overrides. You can set different encryption modes for different buckets.
  7. For S3 bucket, add your S3 bucket that you want to encrypt differently.
  8. For Encryption mode, choose an encryption mode.
  9. For Encryption materials, enter your CMK.
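
For reference, here is roughly how the same per bucket encryption overrides look when you create the security configuration with the SDK instead of the console. This is a hedged sketch: the bucket name and key ARNs are placeholders, and the Overrides layout reflects the EMR security configuration schema as understood here, so verify the field names against the current Amazon EMR documentation:

    import json
    import boto3

    emr = boto3.client('emr', region_name='us-east-1')

    s3_encryption = {
        # Default mode and key for all buckets used with EMRFS.
        'EncryptionMode': 'SSE-KMS',
        'AwsKmsKey': 'arn:aws:kms:us-east-1:111122223333:key/DEFAULT-KEY-ID',
        # Assumed schema for the per bucket encryption overrides.
        'Overrides': [{
            'BucketName': 'example-sensitive-bucket',
            'EncryptionMode': 'CSE-KMS',
            'AwsKmsKey': 'arn:aws:kms:us-east-1:111122223333:key/SENSITIVE-KEY-ID'
        }]
    }

    emr.create_security_configuration(
        Name='emrfs-per-bucket-encryption',
        SecurityConfiguration=json.dumps({
            'EncryptionConfiguration': {
                'EnableInTransitEncryption': False,
                'EnableAtRestEncryption': True,
                'AtRestEncryptionConfiguration': {
                    'S3EncryptionConfiguration': s3_encryption
                }
            }
        }))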

If you have already enabled default encryption for S3 buckets directly in Amazon S3, you can also choose to bypass the S3 encryption options in the security configuration setting in Amazon EMR. This allows Amazon EMR to delegate encrypting objects in the buckets to Amazon S3, which uses the encryption key specified in the bucket policy to encrypt objects before persisting it on S3.
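
For completeness, this is roughly what enabling default encryption on a bucket looks like with the SDK; the bucket name and key ARN are placeholders:

    import boto3

    s3 = boto3.client('s3')

    # With default encryption set on the bucket, Amazon S3 encrypts new objects
    # with this key even when the EMR security configuration's S3 options are bypassed.
    s3.put_bucket_encryption(
        Bucket='example-sensitive-bucket',
        ServerSideEncryptionConfiguration={
            'Rules': [{
                'ApplyServerSideEncryptionByDefault': {
                    'SSEAlgorithm': 'aws:kms',
                    'KMSMasterKeyID': 'arn:aws:kms:us-east-1:111122223333:key/EXAMPLE-KEY-ID'
                }
            }]
        })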

Summary

This post walked through the native EBS and S3 encryption options available with Amazon EMR to encrypt and secure your data. Please share your feedback on how these optimizations benefit your real-world workloads.

 


About the Author

Duncan Chan is a software development engineer for Amazon EMR. He enjoys learning and working on big data technologies. When he is not working, he will be playing with his dogs.

Amazon QuickSight announces the all-new QuickSight Mobile app

Post Syndicated from Tina Kelleher original https://aws.amazon.com/blogs/big-data/announcing-the-new-mobile-app-for-amazon-quicksight/

AWS is happy to announce the release of QuickSight Mobile for iOS and Android devices. This release is both a major update to the existing iOS app and the launch of a new Android application. The app enables you to securely get insights from your data from anywhere; favorite, browse, and interact with your dashboards; explore your data with drilldowns and filters; stay ahead of the curve via forecasting; get email alerts when unexpected changes happen in your data; and share those insights with colleagues.

To download the QuickSight Mobile app, visit the iOS App Store and Google Play.

The new QuickSight Mobile optimizes the dashboard consumption experience with newly added features and enhanced interactivity. This post walks you through the new mobile experience in detail.

Features and capabilities

Home page

After authenticating with the app, you land on the QuickSight home screen. From here, you have quick access to your favorite dashboards (synced from the browser application) and your most recently viewed dashboards. The recently viewed list displays up to the last 100 dashboards you opened.

The following screenshots show your favorited and recently viewed dashboards.

Dashboards

You can browse through all your dashboards via the Dashboards tab. You can sort the dashboards by name or updated (published) date and change to a list or a grid view. You can also search for dashboards by name. Lastly, you can easily add any dashboard to your favorites by choosing the star icon next to the dashboard’s name.

The following screenshots show dashboards in grid view, the display preferences, and the search tool.

Dashboard view

The dashboard view is optimized for your mobile device by showing the visuals in vertical stack view, with each visual expanded to full width. A multi-sheet dashboard shows the first sheet by default. To navigate across sheets, choose the sheet title and select the sheet name you want to view.

The following screenshots show an opened dashboard with visuals in vertical stack view and how you navigate across different sheets within the dashboard.

To apply filters to visuals, choose the funnel icon to view all available filter controls and options for your dashboard sheet. After you’ve made your selection, choose Apply, and choose Done. You can see the number of filters applied via the small blue tag next to the funnel icon. You can also expand the filter control view and reset a given filter to the default.

The following screenshots show the dashboard filters, the expanded filter pane, and how you select filter values.

Some visuals have a scroll bar, which allows you to customize the range to zoom in on a visual. A long press on a data point on most charts, such as a line or a bar, brings up the context menu. You can focus on this data point, drill up or down if the visual has a drill hierarchy, and exclude the chosen data point.

The following screenshots show interactions with zoom bars and context menu.

To go back to the full view, choose the … icon in the upper right and choose Undo. To exit the dashboard view, choose the back arrow.

If a visual is an anomaly detection widget, you can choose Explore anomalies to open the detailed anomalies page. Here, you can see all of the anomalies detected for the latest period. To perform contribution analysis and uncover the hidden drivers, long-press the anomaly data point; the option appears in the bottom sheet. To find out more about anomaly detection and contribution analysis, see Amazon QuickSight Announces General Availability of ML Insights.

You can expect contribution analysis support in the app to extend beyond anomaly detection to all visuals in the near future.

The following screenshots show the anomaly details and contribution analysis flow.

Visual view

From the dashboard view, choose the expand icon in the upper right corner to expand the desired visual to full-screen. The app supports both portrait and landscape modes in the expanded visual, and you can use the same interactions such as zoom, drill, focus, and exclude.

The following screenshots show expanded visuals in both portrait and landscape modes.

Settings

The Settings tab allows you to change language, Region, and biometric authentication methods. You can also provide feedback by choosing Provide feedback, which initiates an email to our mobile team.

The following screenshot shows the options on the Settings page.

Setup and authentication

The QuickSight Mobile app supports authentication by any provisioned user. You can authenticate by using either your Amazon QuickSight username and password (with MFA, if this option is enabled), or through your own identity provider (IdP). After authentication, you can re-authenticate using biometric authentication such as face ID or touch ID, if supported by your mobile device, or an application-specific PIN.

The following screenshots show the generic sign-in flow.

IdP-initiated login

For IdP-initiated logins, set up your account with identity federation. For more information, see Federate Amazon QuickSight access with Okta. You can federate using the standard mobile federation processes that your IdP provides, either through an app or through the web. This post uses the Okta mobile app. After you federate, you receive a prompt to continue your session either in the mobile web experience or in the QuickSight Mobile app.

Account administrators can set the duration of mobile sessions to be anywhere from 1–30 days through the administrative console. The default session length for new accounts is 30 days.

The following screenshots show IdP-initiated login flow.

Microsoft Active Directory

If your account uses Microsoft Active Directory, you can log in to the app using your current AD credentials. If your account requires MFA, you also need to provide your MFA token to authenticate. For more information, see Can I use AWS Directory Service for Microsoft Active Directory to authenticate users in Amazon QuickSight?

Email Invitations

If you receive an email invitation to Amazon QuickSight, follow the instructions in the email to set your username and password through the website (mobile or desktop). Use these credentials to authenticate with the mobile applications.

IAM Users

If you sign in as an IAM user, you need to provision yourself as an Amazon QuickSight user before authenticating in the mobile application. For more information on provisioning users, see Provisioning Users for Amazon QuickSight.

Summary

This post provided an overview of the key features and functionalities as well as sign-in flows for the new QuickSight Mobile app. The app is available as a free download in Google Play and the iOS App Store. You can expect the app to continue to evolve its features and capabilities to give you the best experience for getting insights from your data while on the go!

If you have any questions or feedback, please leave a comment.

 


About the Authors

Susan Fang is a senior product manager for QuickSight with AWS.


Brian Stein is a software development manager at AWS.


Joining across data sources on Amazon QuickSight

Post Syndicated from Rakshith Dayananda original https://aws.amazon.com/blogs/big-data/joining-across-data-sources-on-amazon-quicksight/

Amazon QuickSight announced the launch of Cross Data Source Join, which allows you to connect to multiple data sources and join data across these sources in Amazon QuickSight directly to create data sets used to build dashboards. For example, you can join transactional data in Amazon Redshift that contains customer IDs with Salesforce tables that contain customer profile data to create an interactive dashboard with order and customer details. You can slice and dice transactional data by various customer dimensional data such as segment, geographic, or demographic without first pulling the data to a single source outside Amazon QuickSight.

With cross data source joins, you can join across all data sources supported by Amazon QuickSight, including file-to-file, file-to-database, and database-to-database joins using the built-in drag-and-drop UI, without heavily relying on the BI and data engineering teams to set up complex and time-consuming ETLs. Whether it is local CSV files, Amazon RDS databases, or JSON objects on an S3 bucket, you can now join any of these data sources together to create data sets.

Finally, you can schedule refreshes as frequently as hourly so that the joined data set stays up to date with the latest information.

Getting started with Cross Data Source Join

The screenshot below shows all data sources you can connect to on QuickSight.

Amazon QuickSight allows you to connect to different data sources. It is common for businesses to have data spread across multiple data sources, depending on their data requirements. For example, you might have your web server logs stored in Amazon S3, customer details in Amazon Redshift tables, and order details in RDS. You may need to build reports from data combined from two or more of these different data sources.

You can accomplish this to some extent by building data pipelines that consolidate multiple data sources into a single one. However, creating these data pipelines duplicates data across AWS services and adds cost in the effort and time required to move the data into a single data source. You would then build Amazon QuickSight data sets from that single data source. With cross data source join available directly in Amazon QuickSight, you can eliminate this problem.

There is no size restriction on your largest source, as long as the post-join table fits within your SPICE capacity for the data set. The remaining tables together must be within 1 GB in size. For example, if your smaller table has 20 numeric columns, it can hold about 5 million rows before exceeding the 1 GB limit.

This post demonstrates how to create data sets from two CSV files and how to join an RDS table with an S3 file. The post uses an example table with orders-related data in one data source, and returns-related data in another data source. The final goal is to create one single data set that contains both orders and returns data.

Prerequisites

Before getting started, download these CSV files to your local machine:

Also, learn how to create, edit, and delete QuickSight data sources in Working with Data Sources in Amazon QuickSight.

Joining multiple CSV files

To join two CSV files, complete the following steps:

  1. Use the orders CSV file downloaded from the S3 bucket above and upload it to QuickSight.
  2. After selecting the orders sheet, go to the edit/preview data page, where your data set details appear.
  3. From the top menu, choose Add data. A pop-up window appears with the option to either switch data sources or upload a new CSV to join with the current data set. The window also shows the existing source.
  4. Choose Upload a file and upload the returns CSV file. After uploading the file, you can see sheet names for both CSVs. The following screenshot shows orders and returns.
  5. Choose the two circles between the files. This step allows you to edit the join configuration between the two CSVs. In the Join configuration section, you can select the join type (inner, left, right, or full) and also select the column on which to apply the join. This post uses an inner join.
  6. For Join type, choose Inner. This post joins the two files on their order ID columns.
  7. For Join clauses, select Order ID in both drop-downs.
  8. Choose Apply.

You now have a data set that contains both orders and returns data from two different CSVs. You can save your data set and continue to create your analysis.

Joining an RDS table with S3

In this example, you have orders data in RDS and returns data as a JSON file in an S3 bucket. To join this data, complete the following steps:

  1. Create a data set in QuickSight from the RDS table. Review Create a Database Data Set and an Analysis to learn how to connect to RDS to create data sets.
  2. Next, go to Manage data and select the RDS data set. This post uses the orders data set.
  3. Choose Edit data set. A page with your data set details appears.
  4. From the top menu, choose Add data. A pop-up window appears with the option to either switch data sources or upload a new CSV to join with the current data set. The window also shows the existing source.
  5. Choose Switch data source. A list appears of the different data sets and their sources.
  6. Choose a data set. This post chooses Returns. You can now see both data sets linked together.
  7. Choose the two spheres between the data sets.
  8. Under Join configuration, choose your desired Join type. This post chooses Inner.
  9. For Join clauses, from the drop-downs of each data set, select the column on which to apply the join. This post chooses order_id for orders and Order ID for Returns.
  10. Choose Apply.

Your new data set contains both orders and returns data from two different data sources. You can save your data set and continue to create your analysis.

Conclusion

This post showed you how to join data from two CSV files and how to join an RDS table with data in S3. You can join data from any two data sources (except IoT) on Amazon QuickSight with this method. Cross data source join capability is now available in both Enterprise and Standard editions in all Amazon QuickSight Regions.

 


About the Author

Rakshith Dayananda is an IT app dev engineer at AWS.


Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 2

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-2/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

In Part 1 of this post series, you learned how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows.

This post guides you through deploying the AWS CloudFormation templates, configuring Genie, and running an example workflow authored in Apache Airflow.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Solution overview

This solution uses an AWS CloudFormation template to create the necessary resources.

Users access the Apache Airflow Web UI and the Genie Web UI via SSH tunnel to the bastion host.

The Apache Airflow deployment uses Amazon ElastiCache for Redis as a Celery backend, Amazon EFS as a mount point to store DAGs, and Amazon RDS PostgreSQL for database services.

Genie uses Apache Zookeeper for leader election, an Amazon S3 bucket to store configurations (binaries, application dependencies, cluster metadata), and Amazon RDS PostgreSQL for database services. Genie submits jobs to an Amazon EMR cluster.

The architecture in this post is for demo purposes. In a production environment, the Apache Airflow and the Genie instances should be part of an Auto Scaling Group. For more information, see Deployment on the Genie Reference Guide.

The following diagram illustrates the solution architecture.

Creating and storing admin passwords in AWS Systems Manager Parameter Store

This solution uses AWS Systems Manager Parameter Store to store the passwords used in the configuration scripts. With AWS Systems Manager Parameter Store, you can create secure string parameters, which are parameters that have a plaintext parameter name and an encrypted parameter value. Parameter Store uses AWS KMS to encrypt and decrypt the parameter values of secure string parameters.

Before deploying the AWS CloudFormation templates, execute the following AWS CLI commands. These commands create AWS Systems Manager Parameter Store parameters to store the passwords for the RDS master user, the Airflow DB administrator, and the Genie DB administrator.

aws ssm put-parameter --name "/GenieStack/RDS/Settings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/AirflowSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region

aws ssm put-parameter --name "/GenieStack/RDS/GenieSettings" --type SecureString --value "ch4ng1ng-s3cr3t" --region Your-AWS-Region
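To confirm that a parameter was stored as expected, you can retrieve it with decryption enabled. This is only a sanity check; note that it prints the plaintext secret to your terminal:

aws ssm get-parameter --name "/GenieStack/RDS/Settings" --with-decryption --region Your-AWS-Region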

Creating an Amazon S3 Bucket for the solution and uploading the solution artifacts to S3

This solution uses Amazon S3 to store all artifacts used in the solution. Before deploying the AWS CloudFormation templates, create an Amazon S3 bucket and download the artifacts required by the solution from this link.

Unzip the artifacts required by the solution and upload the airflow and genie directories to the Amazon S3 bucket you just created. Keep a record of the Amazon S3 root path because you add it as a parameter to the AWS CloudFormation template later.

As an example, the following screenshot uses the root location geniestackbucket.

Use the value of the Amazon S3 Bucket you created for the AWS CloudFormation parameters GenieS3BucketLocation and AirflowBucketLocation.

Deploying the AWS CloudFormation stack

To launch the entire solution, choose Launch Stack.

The template requires the following parameters, grouped by purpose. You can accept the default values for any parameters not listed here. For the full list of parameters, see the AWS CloudFormation template.

Location of the configuration artifacts

  • GenieS3BucketLocation – The S3 bucket with the Genie artifacts and Genie’s installation scripts. For example: geniestackbucket.
  • AirflowBucketLocation – The S3 bucket with the Airflow artifacts. For example: geniestackbucket.

Networking

  • SSHLocation – The IP address range allowed to SSH to the Genie, Apache Zookeeper, and Apache Airflow EC2 instances.

Security

  • BastionKeyName – An existing EC2 key pair to enable SSH access to the bastion host instance.
  • AirflowKeyName – An existing EC2 key pair to enable SSH access to the Apache Airflow instance.
  • ZKKeyName – An existing EC2 key pair to enable SSH access to the Apache Zookeeper instance.
  • GenieKeyName – An existing EC2 key pair to enable SSH access to the Genie instance.
  • EMRKeyName – An existing Amazon EC2 key pair to enable SSH access to the Amazon EMR cluster.

Logging

  • emrLogUri – The S3 location to store Amazon EMR cluster logs. For example: s3://replace-with-your-bucket-name/emrlogs/

Post-deployment steps

To access the Apache Airflow and Genie web interfaces, set up an SSH tunnel and configure a SOCKS proxy for your browser. Complete the following steps:

  1. On the AWS CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the public DNS of the bastion host instance. The following screenshot shows the instance this post uses.
  4. Set up an SSH tunnel to the master node using dynamic port forwarding.
    Instead of using the master public DNS name of your cluster and the username hadoop, which the walkthrough references, use the public DNS of the bastion host instance and replace the user hadoop with the user ec2-user.
  5. Configure the proxy settings to view websites hosted on the master node.
    You do not need to modify any of the steps in the walkthrough.

This process configures a SOCKS proxy management tool that allows you to automatically filter URLs based on text patterns and limit the proxy settings to domains that match the form of the Amazon EC2 instance’s public DNS name.

Accessing the Web UI for Apache Airflow and Genie

To access the Web UI for Apache Airflow and Genie, complete the following steps:

  1. On the CloudFormation console, choose the stack you created.
  2. Choose the Outputs tab.
  3. Find the URLs for the Apache Airflow and Genie web UIs. The following screenshot shows the URLs that this post uses.
  4. Open two tabs in your web browser, one for the Apache Airflow UI and one for the Genie UI.
  5. For the FoxyProxy configuration you set up previously, choose the FoxyProxy icon added to the top right section of your browser and choose Use proxies based on their predefined patterns and priorities. The following screenshot shows the proxy options.
  6. Enter the URL for the Apache Airflow Web UI and for the Genie Web UI on their respective tabs.

You are now ready to run a workflow in this solution.

Preparing application resources

The first step as a platform admin engineer is to prepare the binaries and configurations of the big data applications that the platform supports. In this post, the Amazon EMR clusters use release 5.26.0. Because Amazon EMR release 5.26.0 has Hadoop 2.8.5 and Spark 2.4.3 installed, those are the applications you want to support in the big data platform. If you decide to use a different EMR release, prepare binaries and configurations for those versions. The following sections guide you through the steps to prepare binaries should you wish to use a different EMR release version.

To prepare a Genie application resource, create a YAML file with fields that are sent to Genie in a request to create an application resource.

This file defines metadata information about the application, such as the application name, type, version, tags, the location on S3 of the setup script, and the location of the application binaries. For more information, see Create an Application in the Genie REST API Guide.

Tag structure for application resources

This post uses the following tags for application resources:

  • type – The application type, such as Spark, Hadoop, Hive, Sqoop, or Presto.
  • version – The version of the application, such as 2.8.5 for Hadoop.

The next section shows how the tags are defined in the YAML file for an application resource. You can add an arbitrary number of tags to associate with Genie resources. Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files.

Preparing the Hadoop 2.8.5 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: hadoop-2.8.5
name: hadoop
user: hadoop
status: ACTIVE
description: Hadoop 2.8.5 Application
setupFile: s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/setup.sh
configs: []
version: 2.8.5
type: hadoop
tags:
  - type:hadoop
  - version:2.8.5
dependencies:
  - s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.tar.gz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/hadoop-2.8.5.yml.

NOTE: The following steps are for reference only, should you be completing this manually, rather than using the automation option provided.

The S3 objects referenced by the setupFile and dependencies labels are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download hadoop-2.8.5.tar.gz from https://www.apache.org/dist/hadoop/core/hadoop-2.8.5/.
  2. Upload hadoop-2.8.5.tar.gz to s3://Your_Bucket_Name/genie/applications/hadoop-2.8.5/.

Preparing the Spark 2.4.3 application resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3
name: spark
user: hadoop
status: ACTIVE
description: Spark 2.4.3 Application
setupFile: s3://Your_Bucket_Name/genie/applications/spark-2.4.3/setup.sh
configs: []
version: 2.4.3
type: spark
tags:
  - type:spark
  - version:2.4.3
  - version:2.4
dependencies:
  - s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3-bin-hadoop2.7.tgz

The file is also available directly at s3://Your_Bucket_Name/genie/applications/spark-2.4.3/spark-2.4.3.yml.

NOTE: The following steps are for reference only, should you be completing this manually, rather than using the automation option provided.

The objects in setupFile and dependencies are available in your S3 bucket. For your reference, the steps to prepare the artifacts used by properties setupFile and dependencies are as follows:

  1. Download spark-2.4.3-bin-hadoop2.7.tgz from https://archive.apache.org/dist/spark/spark-2.4.3/ .
  2. Upload spark-2.4.3-bin-hadoop2.7.tgz to s3://Your_Bucket_Name/genie/applications/spark-2.4.3/ .

Because spark-2.4.3-bin-hadoop2.7.tgz uses Hadoop 2.7 and not Hadoop 2.8.5, you need to extract the EMRFS libraries for Hadoop 2.7 from an EMR cluster running Hadoop 2.7 (release 5.11.3). This is already available in your S3 bucket. For reference, the steps to extract the EMRFS libraries are as follows:

  1. Deploy an EMR cluster with release 5.11.3.
  2. Run the following command:
aws s3 cp /usr/share/aws/emr/emrfs/lib/emrfs-hadoop-assembly-2.20.0.jar s3://Your_Bucket_Name/genie/applications/spark-2.4.3/hadoop-2.7/aws/emr/emrfs/lib/

Preparing a command resource

The next step as a platform admin engineer is to prepare the Genie commands that the platform supports.

In this post, the workflows use Apache Spark. This section shows the steps to prepare a command resource of type Apache Spark.

To prepare a Genie command resource, create a YAML file with fields that are sent to Genie in a request to create a command resource.

This file defines metadata information about the command, such as the command name, type, version, tags, the location on S3 of the setup script, and the parameters to use during command execution. For more information, see Create a Command in the Genie REST API Guide.

Tag structure for command resources

This post uses the following tag structure for command resources:

  • type – The command type, for example, spark-submit.
  • version – The version of the command, for example, 2.4.3 for Spark.

The next section shows how the tags are defined in the YAML file for a command resource. Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files.

Preparing the spark-submit command resource

This post provides an automated creation of the YAML file. The following code shows the resulting file details:

id: spark-2.4.3_spark-submit
name: Spark Submit 
user: hadoop 
description: Spark Submit Command 
status: ACTIVE 
setupFile: s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/setup.sh
configs: [] 
executable: ${SPARK_HOME}/bin/spark-submit --master yarn --deploy-mode client 
version: 2.4.3 
tags:
  - type:spark-submit
  - version:2.4.3
checkDelay: 5000

The file is also available at s3://Your_Bucket_Name/genie/commands/spark-2.4.3_spark-submit/spark-2.4.3_spark-submit.yml.

The objects in setupFile are available in your S3 bucket.

Preparing cluster resources

This post also automated the step to prepare cluster resources; it follows a similar process as described previously but applied to cluster resources.

During the startup of the Amazon EMR cluster, a custom script creates a YAML file with the metadata details about the cluster and uploads the file to S3. For more information, see Create a Cluster in the Genie REST API Guide.

The script also extracts all Amazon EMR libraries and uploads them to S3. The next section discusses the process of registering clusters with Genie.

The script is available at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

Tag structure for cluster resources

This post uses the following tag structure for cluster resources:

  • cluster.release – The Amazon EMR release name. For example, emr-5.26.0.
  • cluster.id – The Amazon EMR cluster ID. For example, j-xxxxxxxx.
  • cluster.name – The Amazon EMR cluster name.
  • cluster.role – The role associated with this cluster. For this post, the role is batch. Other possible roles would be ad hoc or Presto, for example.

You can add new tags for a cluster resource or change the values of existing tags by editing s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh.

You could also use other combinations of tags, such as a tag to identify the application lifecycle environment or required custom jars.

Genie also maintains its own tags in addition to the ones the platform admins define, which you can see in the id and name fields of the files. If multiple clusters share the same tag, Genie by default distributes jobs randomly across the clusters associated with that tag. For more information, see Cluster Load Balancing in the Genie Reference Guide.

Registering resources with Genie

Up to this point, all the configuration activities mentioned in the previous sections were already prepared for you.

The following sections show how to register resources with Genie. In this section you will be connecting to the bastion via SSH to run configuration commands.

Registering application resources

To register the application resources you prepared in the previous section, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_application_resources.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the resource information, navigate to the Genie Web UI and choose the Applications tab. See the following screenshot. The screenshot shows two application resources, one for Apache Spark (version 2.4.3) and the other for Apache Hadoop (version 2.8.5).
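Alternatively, you can list the registered application resources from the command line through Genie's REST API. The following call is a sketch that assumes the standard Genie 3 endpoint layout; adjust the host to your deployment:

curl -s http://replace-with-your-genie-server-url:8080/api/v3/applications | python -m json.tool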

Registering commands and associate commands with applications

The next step is to register the Genie command resources with specific applications. For this post, because spark-submit depends on Apache Hadoop and Apache Spark, associate the spark-submit command with both applications.

The order you define for the applications in file genie_register_command_resources_and_associate_applications.py is important. Because Apache Spark depends on Apache Hadoop, the file first references Apache Hadoop and then Apache Spark. See the following code:

commands = [{'command_name' : 'spark-2.4.3_spark-submit', 'applications' : ['hadoop-2.8.5', 'spark-2.4.3']}]

To register the command resources and associate them with the application resources registered in the previous step, SSH into the bastion host and run the following command:

python /tmp/genie_assets/scripts/genie_register_command_resources_and_associate_applications.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

To see the command you registered plus the applications it is linked to, navigate to the Genie Web UI and choose the Commands tab.

The following screenshot shows the command details and the applications it is linked to.

Registering Amazon EMR clusters

As previously mentioned, the Amazon EMR cluster deployed in this solution registers the cluster when the cluster starts via an Amazon EMR step. You can access the script that Amazon EMR clusters use at s3://Your_Bucket_Name/genie/scripts/genie_register_cluster.sh. The script also automates deregistering the cluster from Genie when the cluster terminates.

In the Genie Web UI, choose the Clusters tab. This page shows you the current cluster resources. You can also find the location of the configuration files that were uploaded to the cluster’s S3 location during the registration step.

The following screenshot shows the cluster details and the location of configuration files (yarn-site.xml, core-site.xml, mapred-site.xml).

Linking commands to clusters

You have now registered all applications, commands, and clusters, and associated commands with the applications on which they depend. The final step is to link a command to a specific Amazon EMR cluster that is configured to run it.

Complete the following steps:

  1. SSH into the bastion host.
  2. Open /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py with your preferred text editor.
  3. Look for the following lines in the code:
    # Change cluster_name below
    clusters = [{'cluster_name' : 'j-xxxxxxxx', 'commands' : ['spark-2.4.3_spark-submit']}]
  4. Replace j-xxxxxxxx in the file with the name of your cluster.
    To see the name of the cluster, navigate to the Genie Web UI and choose Clusters.
  5. To link the command to a specific Amazon EMR cluster, run the following command:
    python /tmp/genie_assets/scripts/genie_link_commands_to_clusters.py Replace-With-Your-Bucket-Name Your-AWS-Region http://replace-with-your-genie-server-url:8080

The command is now linked to a cluster.

In the Genie Web UI, choose the Commands tab. This page shows you the current command resources. Select spark-2.4.3_spark-submit and see the clusters associated with the command.

The following screenshot shows the command details and the clusters it is linked to.

You have configured Genie with all resources; it can now receive job requests.

Running an Apache Airflow workflow

It is out of the scope of this post to provide a detailed description of the workflow code and dataset. This section provides details of how Apache Airflow submits jobs to Genie via a GenieOperator that this post provides.

The GenieOperator for Apache Airflow

The GenieOperator allows the data engineer to define the combination of tags to identify the commands and the clusters in which the tasks should run.

In the following code example, the cluster tag is ‘emr.cluster.role:batch‘ and the command tags are ‘type:spark-submit‘ and ‘version:2.4.3‘.

spark_transform_to_parquet_movies = GenieOperator(
    task_id='transform_to_parquet_movies',
    cluster_tags=['emr.cluster.role:batch'],
    command_tags=['type:spark-submit', 'version:2.4.3'],
    command_arguments="transform_to_parquet.py s3://{}/airflow/demo/input/csv/{}  s3://{}/demo/output/parquet/{}/".format(bucket,'movies.csv',bucket,'movies'), dependencies="s3://{}/airflow/dag_artifacts/transforms/transform_to_parquet.py".format(bucket),
    description='Demo Spark Submit Job',
    job_name="Genie Demo Spark Submit Job",
    tags=['transform_to_parquet_movies'],
    xcom_vars=dict(),
    retries=3,
    dag=extraction_dag)

The property command_arguments defines the arguments to the spark-submit command, and dependencies defines the location of the code for the Apache Spark Application (PySpark).

You can find the code for the GenieOperator in the following location: s3://Your_Bucket_Name/airflow/plugins/genie_plugin.py.

One of the arguments to the DAG is the Genie connection ID (genie_conn_id). This connection was created during the automated setup of the Apache Airflow Instance. To see this and other existing connections, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Connections.

The following screenshot shows the connection details.

The Airflow variable s3_location_genie_demo referenced in the DAG was set during the installation process. To see all configured Apache Airflow variables, complete the following steps:

  1. In the Apache Airflow Web UI, choose the Admin menu.
  2. Choose Variables.

The following screenshot shows the Variables page.
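If you have SSH access to the Apache Airflow instance, you can also inspect these values with the Airflow 1.10 CLI; the command syntax differs in other Airflow versions:

airflow connections --list
airflow variables --get s3_location_genie_demo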

Triggering the workflow

You can now trigger the execution of the movie_lens_transfomer_to_parquet DAG. Complete the following steps:

  1. In the Apache Airflow Web UI, choose the DAGs menu.
  2. Next to your DAG, change Off to On.

The following screenshot shows the DAGs page.
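Alternatively, you can unpause and trigger the DAG from the Airflow CLI on the Apache Airflow instance, again assuming Airflow 1.10-style commands and the DAG name used in this post:

airflow unpause movie_lens_transfomer_to_parquet
airflow trigger_dag movie_lens_transfomer_to_parquet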

For this example DAG, this post uses a small subset of the movielens dataset. This dataset is a popular open source dataset, which you can use in exploring data science algorithms. Each dataset file is a comma-separated values (CSV) file with a single header row. All files are available in your solution S3 bucket under s3://Your_Bucket_Name/airflow/demo/input/csv.

movie_lens_transfomer_to_parquet is a simple workflow that triggers a Spark job that converts input files from CSV to Parquet.

The following screenshot shows a graphical representation of the DAG.

In this example DAG, after transform_to_parquet_movies concludes, you can potentially execute four tasks in parallel. Because the DAG concurrency is set to 3, as seen in the following code example, only three tasks can run at the same time.

# Initialize the DAG
# Concurrency --> Number of tasks allowed to run concurrently
extraction_dag = DAG(dag_name,
          default_args=dag_default_args,
          start_date=start_date,
          schedule_interval=schedule_interval,
          concurrency=3,
          max_active_runs=1
          )

Visiting the Genie job UI

The GenieOperator for Apache Airflow submitted the jobs to Genie. To see job details, in the Genie Web UI, choose the Jobs tab. You can see details such as the jobs submitted, their arguments, the cluster each job is running on, and the job status.

The following screenshot shows the Jobs page.

You can now experiment with this architecture by provisioning a new Amazon EMR cluster, registering it with a new value (for example, “production”) for Genie tag “emr.cluster.role”, linking the cluster to a command resource, and updating the tag combination in the GenieOperator used by some of the tasks in the DAG.

Cleaning up

To avoid incurring future charges, delete the resources and the S3 bucket created for this post.

Conclusion

This post showed how to deploy an AWS CloudFormation template that sets up a demo environment for Genie, Apache Airflow, and Amazon EMR. It also demonstrated how to configure Genie and use the GenieOperator for Apache Airflow.

 


About the Authors

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.


Jelez Raditchkov is a practice manager with AWS.


Prasad Alle is a Senior Big Data Consultant with AWS Professional Services. He spends his time leading and building scalable, reliable Big data, Machine learning, Artificial Intelligence and IoT solutions for AWS Enterprise and Strategic customers. His interests extend to various technologies such as Advanced Edge Computing, Machine learning at Edge. In his spare time, he enjoys spending time with his family.

Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1

Post Syndicated from Francisco Oliveira original https://aws.amazon.com/blogs/big-data/orchestrate-big-data-workflows-with-apache-airflow-genie-and-amazon-emr-part-1/

Large enterprises running big data ETL workflows on AWS operate at a scale that services many internal end-users and runs thousands of concurrent pipelines. This, together with a continuous need to update and extend the big data platform to keep up with new frameworks and the latest releases of big data processing frameworks, requires an efficient architecture and organizational structure that both simplifies management of the big data platform and promotes easy access to big data applications.

This post introduces an architecture that helps centralized platform teams maintain a big data platform to service thousands of concurrent ETL workflows, and simplifies the operational tasks required to accomplish that.

Architecture components

At high level, the architecture uses two open source technologies with Amazon EMR to provide a big data platform for ETL workflow authoring, orchestration, and execution. Genie provides a centralized REST API for concurrent big data job submission, dynamic job routing, central configuration management, and abstraction of the Amazon EMR clusters. Apache Airflow provides a platform for job orchestration that allows you to programmatically author, schedule, and monitor complex data pipelines. Amazon EMR provides a managed cluster platform that can run and scale Apache Hadoop, Apache Spark, and other big data frameworks.

The following diagram illustrates the architecture.

Apache Airflow

Apache Airflow is an open source tool for authoring and orchestrating big data workflows.

With Apache Airflow, data engineers define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that are executed independently. The DAG keeps track of the relationships and dependencies between tasks.

An operator defines a template for a single task in the workflow. Airflow provides operators for common tasks, and you can also define custom operators. This post discusses the custom operator (GenieOperator) to submit tasks to Genie.

A task is a parameterized instance of an operator. After an operator is instantiated, it’s referred to as a task. A task instance represents a specific run of a task. A task instance has an associated DAG, task, and point in time.

You can run DAGs and tasks on demand or schedule them to run at a specific time defined as a cron expression in the DAG.
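As an illustration, the following minimal DAG sketch (the DAG and task names are hypothetical) schedules a single placeholder task daily at 06:00 UTC using a cron expression; the GenieOperator discussed in this post plugs into a DAG in the same way:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Minimal example: one placeholder task scheduled with a cron expression.
example_dag = DAG(
    dag_id='example_cron_scheduled_dag',  # hypothetical DAG name
    start_date=datetime(2019, 11, 1),
    schedule_interval='0 6 * * *',  # run every day at 06:00 UTC
    catchup=False,  # do not backfill runs for past schedule intervals
)

placeholder_task = DummyOperator(task_id='placeholder', dag=example_dag)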

For additional details on Apache Airflow, see Concepts in the Apache Airflow documentation.

Genie

Genie is an open source tool by Netflix that provides configuration-management capabilities and dynamic routing of jobs by abstracting access to the underlying Amazon EMR clusters.

Genie provides a REST API to submit jobs from big data applications such as Apache Hadoop MapReduce or Apache Spark. Genie manages the metadata of the underlying clusters and the commands and applications that run in the clusters.

Genie abstracts access to the processing clusters by associating one or more tags with the clusters. You can also associate tags with the metadata details for the applications and commands that the big data platform supports. As Genie receives job submissions for specific tags, it uses a combination of the cluster/command tag to route each job to the correct EMR cluster dynamically.

Genie’s data model

Genie provides a data model to capture the metadata associated with resources in your big data environment.

An application resource is a reusable set of binaries, configuration files, and setup files to install and configure applications supported by the big data platform on the Genie node that submits the jobs to the clusters. When Genie receives a job, the Genie node downloads all dependencies, configuration files, and setup files associated with the applications and stores it in a job working directory. Applications are linked to commands because they represent the binaries and configurations needed before a command runs.

Command resources represent the parameters when using the command line to submit work to a cluster and which applications need to be available on the PATH to run the command. Command resources glue metadata components together. For example, a command resource representing a Hive command would include a hive-site.xml and be associated with a set of application resources that provide the Hive and Hadoop binaries needed to run the command. Moreover, a command resource is linked to the clusters it can run on.

A cluster resource identifies the details of an execution cluster, including connection details, cluster status, tags, and additional properties. A cluster resource can register with Genie during startup and deregister during termination automatically. Clusters are linked to one or more commands that can run in it. After a command is linked to a cluster, Genie can start submitting jobs to the cluster.

Lastly, there are three job resource types: job request, job, and job execution. A job request resource represents the request submission with details to run a job. Based on the parameters submitted in the request, a job resource is created. The job resource captures details such as the command, cluster, and applications associated with the job. Additionally, information on status, start time, and end time is also available on the job resource. A job execution resource provides administrative details so you can understand where the job ran.

For more information, see Data Model on the Genie Reference Guide.

Amazon EMR and Amazon S3

Amazon EMR is a managed cluster platform that simplifies running big data frameworks, such as Apache Hadoop and Apache Spark, on AWS to process and analyze vast amounts of data. For more information, see Overview of Amazon EMR Architecture and Overview of Amazon EMR.

Data is stored in Amazon S3, an object storage service with scalable performance, ease-of-use features, and native encryption and access control capabilities. For more details on S3, see Amazon S3 as the Data Lake Storage Platform.

Architecture deep dive

Two main actors interact with this architecture: platform admin engineers and data engineers.

Platform admin engineers have administrator access to all components. They can add or remove clusters, and configure the applications and the commands that the platform supports.

Data engineers focus on writing big data applications with their preferred frameworks (Apache Spark, Apache Hadoop MR, Apache Sqoop, Apache Hive, Apache Pig, and Presto) and authoring python scripts to represent DAGs.

At high level, the team of platform admin engineers prepares the supported big data applications and its dependencies and registers them with Genie. The team of platform admin engineers launches Amazon EMR clusters that register with Genie during startup.

The team of platform admin engineers associates each Genie metadata resource (applications, commands, and clusters) with Genie tags. For example, you can associate a cluster resource with a tag named environment and the value can be “Production Environment”, “Test Environment”, or “Development Environment”.

Data engineers author workflows as Airflow DAGs and use a custom Airflow Operator—GenieOperator—to submit tasks to Genie. They can use a combination of tags to identify the type of tasks they are running plus where the tasks should run. For example, you might need to run Apache Spark 2.4.3 tasks in the environment identified by the “Production Environment” tag. To do this, set the cluster and command tags in the Airflow GenieOperator as the following code:

(cluster_tags=['emr.cluster.environment:production'],command_tags=['type:spark-submit','ver:2.4.3'])

The following diagram illustrates this architecture.

The workflow, as it corresponds to the numbers in this diagram, is as follows:

  1. A platform admin engineer prepares the binaries and dependencies of the supported applications (Spark-2.4.5, Spark-2.1.0, Hive-2.3.5, etc.). The platform admin engineer also prepares commands (spark-submit, hive). The platform admin engineer registers applications and commands with Genie. Moreover, the platform admin engineer associates commands with applications and links commands to a set of clusters after step 2 (below) is concluded.
  2. Amazon EMR cluster(s) register with Genie during startup.
  3. A data engineer authors Airflow DAGs and uses the Genie tag to reference the environment, application, command or any combination of the above. In the workflow code, the data engineer uses the GenieOperator. The GenieOperator submits jobs to Genie.
  4. A schedule triggers workflow execution or a data engineer manually triggers the workflow execution. The jobs that compose the workflow are submitted to Genie for execution with a set of Genie tags that specify where the job should be run.
  5. The Genie node, working as the client gateway, will set up a working directory with all binaries and dependencies. Genie dynamically routes the jobs to the cluster(s) associated with the provided Genie tags. The Amazon EMR clusters run the jobs.

For details on the authorization and authentication mechanisms supported by Apache Airflow and Genie see Security in the Apache Airflow documentation and Security in the Genie documentation.  This architecture pattern does not expose SSH access to the Amazon EMR clusters. For details on providing different levels of access to data in Amazon S3 through EMR File System (EMRFS), see Configure IAM Roles for EMRFS Requests to Amazon S3.

Use cases enabled by this architecture

The following use cases demonstrate the capabilities this architecture provides.

Managing upgrades and deployments with no downtime and adopting the latest open source release

In a large organization, teams that use the data platform use heterogeneous frameworks and different versions. You can use this architecture to support upgrades with no downtime and offer the latest version of open source frameworks in a short amount of time.

Genie and Amazon EMR are the key components to enable this use case. As the Amazon EMR service team strives to add the latest version of the open source frameworks running on Amazon EMR in a short release cycle, you can keep up with your internal teams’ needs of the latest features of their preferred open source framework.

When a new version of the open source framework is available, you need to test it, add the new supported version and its dependencies to Genie, and move tags in the old cluster to the new one. The new cluster takes new job submissions, and the old cluster concludes jobs it is still running.

Moreover, because Genie centralizes the location of application binaries and its dependencies, upgrading binaries and dependencies in Genie also upgrades any upstream client automatically. Using Genie removes the need for upgrading all upstream clients.

Managing a centralized configuration, job and cluster status, and logging

In a universe of thousands of jobs and multiple clusters, you need to identify where a specific job is running and access logging details quickly. This architecture gives you visibility into jobs running on the data platform, logging of jobs, clusters, and their configurations.

Having programmatic access to the big data platform

This architecture enables a single point of job submissions by using Genie’s REST API. Access to the underlying cluster is abstracted through a set of APIs that enable administration tasks plus submitting jobs to the clusters. A REST API call submits jobs into Genie asynchronously. If accepted, a job-id is returned that you can use to get job status and outputs programmatically via API or web UI. A Genie node sets up the working directory and runs the job on a separate process.

You can also integrate this architecture with continuous integration and continuous delivery (CI/CD) pipelines for big data application and Apache Airflow DAGs.

Enabling scalable client gateways and concurrent job submissions

The Genie node acts as a client gateway (edge node) and can scale horizontally to make sure the client gateway resources used to submit jobs to the data platform meet demand. Moreover, Genie allows the submission of concurrent jobs.

When to use this architecture

This architecture is recommended for organizations that use multiple large, multi-tenant processing clusters instead of transient clusters. It is out of the scope of this post to address when organizations should consider always-on clusters versus transient clusters (you can use an EMR Airflow Operator to spin up Amazon EMR clusters that register with Genie, run a job, and tear them down). You should use Reserved Instances with this architecture. For more information, see Using Reserved Instances.

This architecture is especially recommended for organizations that choose to have a central platform team to administer and maintain a big data platform that supports many internal teams that require thousands of jobs to run concurrently.

This architecture might not make sense for organizations that are not as large or don’t expect to grow to that scale. The benefits of cluster abstraction and centralized configuration management are ideal for bringing structured access to a potentially chaotic environment of thousands of concurrent workflows and hundreds of teams.

This architecture is also recommended for organizations that support a high percentage of multi-hour or overlapping workflows and heterogeneous frameworks (Apache Spark, Apache Hive, Apache Pig, Apache Hadoop MapReduce, Apache Sqoop, or Presto).

If your organization relies solely on Apache Spark and is aligned with the recommendations discussed previously, this architecture might still apply. For organizations that don’t have the scale to justify the need for centralized REST API for job submission, cluster abstraction, dynamic job routing, or centralized configuration management, Apache Livy plus Amazon EMR might be the appropriate option. Genie has its own scalable infrastructure that acts as the edge client. This means that Genie does not compete with Amazon EMR master instance resources, whereas Apache Livy does.

If the majority of your organization’s workflows are a few short-lived jobs, opting for a serverless processing layer, serverless ad hoc querying layer, or using dedicated transient Amazon EMR clusters per workflow might be more appropriate. If the majority of your organization’s workflows are composed of thousands of short-lived jobs, the architecture still applies because it removes the need to spin up and down clusters.

This architecture is recommended for organizations that require full control of the processing platform to optimize component performance. Moreover, this architecture is recommended for organizations that need to enforce centralized governance on their workflows via CI/CD pipelines.

It is out of the scope of this post to evaluate different orchestration options or the benefits of adopting Airflow as the orchestration layer. When considering adopting an architecture, also consider the existing skillset and time to adopt tooling. The open source nature of Genie may allow you to integrate other orchestration tools. Evaluating that route might be an option if you wish to adopt this architecture with another orchestration tool.

Conclusion

This post presented how to use Apache Airflow, Genie, and Amazon EMR to manage big data workflows. The post described the architecture components, the use cases the architecture supports, and when to use it. The second part of this post deploys a demo environment and walks you through the steps to configure Genie and use the GenieOperator for Apache Airflow.

 


About the Author

Francisco Oliveira is a senior big data solutions architect with AWS. He focuses on building big data solutions with open source technology and AWS. In his free time, he likes to try new sports, travel and explore national parks.


Jelez Raditchkov is a practice manager with AWS.

Access and manage data from multiple accounts from a central AWS Lake Formation account

Post Syndicated from Shilpa Mehta original https://aws.amazon.com/blogs/big-data/access-and-manage-data-from-multiple-accounts-from-a-central-aws-lake-formation-account/

This post shows how to access and manage data in multiple accounts from a central AWS Lake Formation account. The walkthrough demonstrates a centralized catalog residing in the master Lake Formation account, with data residing in the different accounts.  The post shows how to grant access permissions from the Lake Formation service to read, write and update the catalog and access data in different accounts.

The post uses two datasets to determine whether there is a correlation between the news generated around the world (gdelt) and the number of reviews that Amazon’s products received (amazonreviews).

Prerequisites

This walkthrough requires three accounts, each with its own S3 buckets, and you need the account numbers of all three.

Setting up the Environment

The three accounts are as follows:

  • Account Products (AP) – This is the account in which the Amazon product reviews are stored. This post deploys the configuration using AWS CloudFormation.
  • Account External (AE) – This account stores the gdelt dataset, which monitors broadcast, print, and web news from around the world in over 100 languages. It identifies the people, locations, organizations, counts, themes, sources, emotions, quotes, images, and events driving global society every second of every day. This post deploys the configuration using AWS CloudFormation.
  • Main Account (MA) – This is the main account, which gathers data from the other two accounts. This post configures Lake Formation in this account. This account has access to the product data and world news account.

The following diagram shows the account architecture.

Account Products

Deploy the following AWS CloudFormation template in the AP.

This creates an S3 bucket called productsaccountcf-bucketname-1pcfoxar1pxp (templateName-bucket-name-random_string) and a cross account bucket policy in the AP. This policy gives the main account root ID access to this bucket.

  • It uses a Lambda function to download the amazonreviews dataset into the new bucket created.
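For reference, a cross-account bucket policy of this kind looks roughly like the following. This is a sketch based on the description above (main account root principal, MA account number 1111111111111); the exact actions granted by the template may differ.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "MainAccountCrossAccountAccess",
      "Effect": "Allow",
      "Principal": { "AWS": "arn:aws:iam::111111111111:root" },
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::productsaccountcf-bucketname-1pcfoxar1pxp",
        "arn:aws:s3:::productsaccountcf-bucketname-1pcfoxar1pxp/*"
      ]
    }
  ]
}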

Make sure you insert the account number of your main account in the datalake-AccountId field. The following screenshot shows that for this post, the MA account number is 1111111111111.

Account External

Deploy the following AWS CloudFormation template in the AE.

This creates an S3 bucket called externalaccountcf-resultbucket-12ecq638afqiq (templateName-bucket-name_random_string) and a cross account bucket policy in the AE. This policy gives the main account access to this bucket. Due to the nature of the dataset, this template also creates tables in the data catalog; instead of using AWS Glue crawlers, it runs Athena queries to define the table structures.

The template executes queries in Amazon Athena to download the gdelt dataset, and to create the metadata of the tables that Lake Formation uses.

Make sure you insert the account number of your main account in the datalakeAccountid field. For this post, the MA account number is 1111111111111. Though the CloudFormation console shows CREATE_COMPLETE, a query is still executing, which you can observe in the Athena console. You can access the Athena console from the AWS Management Console. The query that continues to run creates a new table in the AE with the data in Parquet format so that queries perform better.

The following screenshot shows your query history and status.

You are now ready to go to Lake Formation in your main account and start configuring.

Registering data stores in Lake Formation

Log in to the Lake Formation console in your main account. If this is the first time you are accessing Lake Formation, you need to add administrators to the account. Add the account user you are logged in as.

To add your data lakes, complete the following steps:

  1. On the Lake Formation console, under Register and ingest, choose Data lake locations. The page displays a list of S3 buckets that are marked as data lake storage resources for Lake Formation. A single S3 bucket may act as the repository for many datasets, or you could use separate buckets for separate data sources. This post registers the S3 buckets in the other accounts and creates a master catalog in Lake Formation.
  2. Choose Register location. The following screenshot shows the Data lake locations pane. You can now register both buckets you created in your AP and AE.
  3. In Amazon S3 location, for Amazon S3 path, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp.
  4. For IAM role, keep the default role, which gives Lake Formation the necessary permissions (GetObject, PutObject, DeleteObject, and ListBucket) to use your S3 bucket as a data lake. Alternatively, select a pre-existing IAM role that has the required permissions and is configured with lakeformation.amazonaws.com as a trusted entity.
  5. Choose Register location. The following screenshot shows the Amazon S3 location pane. You now have a storage resource and are ready to register the second bucket.
  6. Repeat the previous steps, but in step 3, register the bucket externalaccountcf-resultbucket-12ecq638afqiq from your AE (888888888888) as s3://externalaccountcf-resultbucket-12ecq638afqiq. If you prefer to script this step, see the sketch after this list.
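If you prefer to register the locations with the AWS SDK instead of the console, the following minimal boto3 sketch mirrors the console steps above. It assumes your credentials belong to the main account and uses the service-linked role, matching the console default; the bucket names are the ones created by the CloudFormation templates in this walkthrough.

import boto3

lakeformation = boto3.client("lakeformation")

# Register both cross-account buckets as data lake storage locations.
for bucket in [
    "productsaccountcf-bucketname-1pcfoxar1pxp",    # Account Products (AP)
    "externalaccountcf-resultbucket-12ecq638afqiq", # Account External (AE)
]:
    lakeformation.register_resource(
        ResourceArn=f"arn:aws:s3:::{bucket}",
        UseServiceLinkedRole=True,  # same default role the console suggests
    )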

Setting up your IAM role

You need an IAM role that allows Lake Formation to create catalog tables of the datasets in the storage locations. Complete the following steps:

  1. On the AWS Management Console, go to IAM and create an IAM role.
  2. Attach AWS Glue and AWS Lambda policies as described here.
  3. Edit the trust relationship for the role with the following policy:
    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Sid": "",
          "Effect": "Allow",
          "Principal": {
            "Service": ["glue.amazonaws.com", "lambda.amazonaws.com"]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

Add an inline policy that gives the role permissions on S3, permissions to execute Athena queries and use AWS Glue, and permissions to publish Amazon CloudWatch Logs, as shown in the following screenshot.

Creating a database

Lake Formation maintains a Hive-compatible data catalog within your data lake. Before you can catalog data within your S3 storage backend or use Lake Formation data importers to push data to S3 (which this post discusses later), you must first create a database within your Lake Formation catalog.

A Lake Formation database is a logical construct to which you can later add tables. Each table contains a mapping to one or more objects in S3 that, collectively, represent that table. Tables also contain basic column metadata such as file format, S3 location, and column definitions. Optionally, you can also define arbitrary key-value pairs for tables and columns to better describe the data and act as queryable attributes for data discovery.

You can create one or more databases and populate their tables either manually in the console, programmatically via the AWS SDKs or AWS CLI, or automatically by defining AWS Glue crawlers.

This post defines two logical databases, amazonreviews and gdelt.

  1. On the Lake Formation console, under Data catalog, choose Databases.
  2. Choose Create database. The following screenshot shows the Databases pane.
  3. For Name, enter amazonreviews.
  4. For Location, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp/amazonreviews.
  5. For Description, enter a brief, meaningful description.
  6. Clear Grant All to Everyone for new tables in this database.
  7. Choose Create database. The following screenshot shows the database details.
    1. Create the gdelt database the same way: for Name, enter gdelt.
    2. For Location, enter s3://externalaccountcf-resultbucket-12ecq638afqiq/gdelt.
    3. For Description, enter a brief, meaningful description like the one shown below.
    4. Clear Grant All to Everyone for new tables in this database. (Alternatively, you can create both databases programmatically, as sketched after this list.)
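As an alternative to the console steps above, the following boto3 sketch creates both databases through the AWS Glue Data Catalog API; the names, locations, and descriptions are the same values used in this walkthrough.

import boto3

glue = boto3.client("glue")

databases = [
    ("amazonreviews", "s3://productsaccountcf-bucketname-1pcfoxar1pxp/amazonreviews"),
    ("gdelt", "s3://externalaccountcf-resultbucket-12ecq638afqiq/gdelt"),
]

for name, location in databases:
    # Each database is a logical container to which tables are added later.
    glue.create_database(
        DatabaseInput={
            "Name": name,
            "LocationUri": location,
            "Description": f"Lake Formation database for the {name} dataset",
        }
    )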

Granting permissions

You now have your databases and need to grant permissions to the role you created in Lake Formation. You need to configure your IAM users and roles as administrators.

  1. On the Lake Formation console, under Permissions, choose Admins and database creators. The following screenshot shows the Admins and database creators page.
  2. Under Permissions, choose Data permissions.
  3. From the Actions menu, choose Grant.
  4. Select your new IAM role.
  5. For Database permissions, choose Create table and Grant all.
  6. Choose Grant. The following screenshot shows the Grant permissions pane.
  7. Repeat the previous steps for both the amazonreviews and gdelt databases. The next step is granting your role permissions to the data lake locations you registered.
  8. Under Permissions, choose Data locations.
  9. Choose Grant.
  10. For IAM users and roles, select your new role. The following screenshot shows the Data locations pane.
  11. For Storage locations, enter s3://productsaccountcf-bucketname-1pcfoxar1pxp.
  12. Choose Grant. The following screenshot shows the Grant permissions pane.
  13. Repeat the steps for the data lake location s3://externalaccountcf-resultbucket-12ecq638afqiq/. (You can also issue these grants programmatically, as sketched below.)
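The same grants can be issued with the Lake Formation API. The following boto3 sketch approximates the console choices above; the role ARN is a hypothetical placeholder, so substitute the ARN of the IAM role you created earlier.

import boto3

lakeformation = boto3.client("lakeformation")
role_arn = "arn:aws:iam::111111111111:role/your-lake-formation-role"  # hypothetical ARN

# Database permissions on both databases (approximates the Create table and Grant all choices).
for database in ["amazonreviews", "gdelt"]:
    lakeformation.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": role_arn},
        Resource={"Database": {"Name": database}},
        Permissions=["CREATE_TABLE", "ALTER", "DROP"],
        PermissionsWithGrantOption=["CREATE_TABLE"],
    )

# Data location permissions on both registered buckets.
for bucket in [
    "productsaccountcf-bucketname-1pcfoxar1pxp",
    "externalaccountcf-resultbucket-12ecq638afqiq",
]:
    lakeformation.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": role_arn},
        Resource={"DataLocation": {"ResourceArn": f"arn:aws:s3:::{bucket}"}},
        Permissions=["DATA_LOCATION_ACCESS"],
    )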

Setting up your main account

Deploy the following CloudFormation stack in the MA.

This stack creates tables in the amazonreviews and gdelt databases.

From the Actions menu, choose Grant. Select the role or users to grant access to, select the two check boxes, and choose Grant.

Querying the data

Now that you have the data in the catalog, you can use Athena from the master account to run queries across the datasets in the different accounts.

Grant table permissions by completing the following steps:

  1. On the Lake Formation console, under Data catalog, choose Tables.
  2. Choose the table to query.
  3. From the Actions menu, choose Grant. The following screenshot shows the Tables pane.
  4. Enter your role or user name.
  5. For Table permissions, select Select.
  6. For Grantable permissions, select Alter, Insert, Drop, Delete, Select, and Grant all. The following screenshot shows the Grant permissions pane.
  7. Repeat the previous steps for the events table in the gdelt database. You are now ready to query the data.
  8. In Tables, select the events table.
  9. From the Actions menu, choose View data. You are taken to the Athena console. The following screenshot shows the Athena console.
  10. On the Query Editor tab, enter SQL queries for the reviews and events tables.

To query the information by date, standardize the date columns and do aggregations by creating views. In sequential order, run the following queries:

CREATE VIEW amznrevw.aggreviews AS
SELECT count(*) as reviewcount, star_rating, verified_purchase, from_iso8601_date(review_date) as reviewdate FROM "amznrevw"."reviews2" group by star_rating, verified_purchase, review_date;

CREATE VIEW gdelt.eventsformatted AS
SELECT from_iso8601_date(substr(cast(day as varchar),1,4) || '-' || substr(cast(day as varchar),5,2)||'-' || substr(cast(day as varchar),7,2)) as eventdate, actor1code,actor1name,actor1countrycode, actor2code,actor2name,actor2countrycode FROM "gdelt"."events" ;

CREATE VIEW gdelt.eventsagregated AS
select count(*) as numevetns, eventdate, (count(distinct actor1code) + count(distinct actor2code)) as numactors from gdelt.eventsformatted group by eventdate;

You are now ready to query. You can determine how many gdelt events occurred during the five days with the most reviews by running the following query:

select eventdate, sum(reviewcount) as totalreviews, sum(numevetns) as totalnumevetns from gdelt.eventsagregated as event, amznrevw.aggreviews as review where event.eventdate = review.reviewdate group by eventdate order by totalreviews desc, totalnumevetns desc limit 5;

The following screenshot shows the query results.

January 3, 2015, had the most reviews but not the most gdelt events (833,890).

You can also discover how many reviews were written during the five days with the most gdelt events by running the following query:

select eventdate, sum(reviewcount) as totalreviews, sum(numevetns) as totalnumevetns from gdelt.eventsagregated as event, amznrevw.aggreviews as review where event.eventdate = review.reviewdate group by eventdate order by totalnumevetns desc, totalreviews desc limit 5;

The following screenshot shows the query results.

January 25, 2012, had 2 million events but only 378 reviews.

Finally, you can check the correlation between the two with the following query:

SELECT corr(reviewcount, numevetns) AS review_event_correlation
FROM gdelt.eventsagregated AS event, amznrevw.aggreviews AS review
WHERE event.eventdate = review.reviewdate;

The following screenshot shows the query results.

As the result shows, there is no meaningful correlation between the two columns.

Conclusion

This post demonstrated how to set up cross-account access of data stores through a central Lake Formation catalog. The solution walked through creating two S3 buckets in external accounts, downloading datasets into those buckets, and giving Lake Formation permission to access the data. You also learned how to govern the data in the data lakes from Lake Formation and how to query the data in the two data lakes using Athena.

 


About the Authors


Shilpa Mehta is a Data Lab solutions architect at AWS.
Shilpa helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab.


Laura Caicedo Camacho is a solutions architect at AWS. She works with customers to help them embrace and adopt the cloud.


Luis Caro Perez is a solutions architect at AWS. He works with our customers to provide guidance and technical assistance on their applications, helping them improve the value of their solutions when using AWS.


Best practices to scale Apache Spark jobs and partition data with AWS Glue

Post Syndicated from Mohit Saxena original https://aws.amazon.com/blogs/big-data/best-practices-to-scale-apache-spark-jobs-and-partition-data-with-aws-glue/

AWS Glue provides a serverless environment to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs. This series of posts discusses best practices to help developers of Apache Spark applications and Glue ETL jobs, big data architects, data engineers, and business analysts scale their data processing jobs running on AWS Glue automatically.

The first post of this series discusses two key AWS Glue capabilities to manage the scaling of data processing jobs. The first allows you to horizontally scale out Apache Spark applications for large splittable datasets. The second allows you to vertically scale up memory-intensive Apache Spark applications with the help of new AWS Glue worker types. The post also shows how to use AWS Glue to scale Apache Spark applications with a large number of small files commonly ingested from streaming applications using Amazon Kinesis Data Firehose. Finally, the post shows how AWS Glue jobs can use the partitioning structure of large datasets in Amazon S3 to provide faster execution times for Apache Spark applications.

Understanding AWS Glue worker types

AWS Glue comes with three worker types to help customers select the configuration that meets their job latency and cost requirements. These workers, also known as Data Processing Units (DPUs), come in Standard, G.1X, and G.2X configurations.

The standard worker configuration allocates 5 GB for Spark driver and executor memory, 512 MB for spark.yarn.executor.memoryOverhead, and 50 GB of attached EBS storage. The G.1X worker allocates 10 GB for driver and executor memory, 2 GB memoryOverhead, and 64 GB of attached EBS storage. The G.2X worker allocates 20 GB for driver and executor memory, 4 GB memoryOverhead, and 128 GB of attached EBS storage.

The compute parallelism (Apache Spark tasks per DPU) available for horizontal scaling is the same regardless of the worker type. For example, both standard and G.1X workers map to 1 DPU, each of which can run eight concurrent tasks. A G.2X worker maps to 2 DPUs, which can run 16 concurrent tasks. As a result, compute-intensive AWS Glue jobs that possess a high degree of data parallelism can benefit from horizontal scaling (more standard or G.1X workers). AWS Glue jobs that need high memory or ample disk space to store intermediate shuffle output can benefit from vertical scaling (more G.1X or G.2X workers).
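For reference, you select the worker type when you create or update a job. The following boto3 sketch creates a vertically scaled job; the job name, IAM role, and script location are hypothetical placeholders rather than values from this post.

import boto3

glue = boto3.client("glue")

# A memory-intensive Spark job that runs on 10 G.2X workers (20 DPUs, up to 160 concurrent tasks).
glue.create_job(
    Name="memory-intensive-etl-job",                            # hypothetical job name
    Role="MyGlueServiceRole",                                   # hypothetical IAM role
    GlueVersion="1.0",
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-script-bucket/etl/job.py",   # hypothetical script path
        "PythonVersion": "3",
    },
    WorkerType="G.2X",
    NumberOfWorkers=10,
)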

Horizontal scaling for splittable datasets

AWS Glue automatically supports file splitting when reading common native formats (such as CSV and JSON) and modern file formats (such as Parquet and ORC) from S3 using AWS Glue DynamicFrames. For more information about DynamicFrames, see Work with partitioned data in AWS Glue.

A file split is a portion of a file that a Spark task can read and process independently on an AWS Glue worker. By default, file splitting is enabled for line-delimited native formats, which allows Apache Spark jobs running on AWS Glue to parallelize computation across multiple executors. AWS Glue jobs that process large splittable datasets with medium (hundreds of megabytes) or large (several gigabytes) file sizes can benefit from horizontal scaling and run faster by adding more AWS Glue workers.

File splitting also benefits block-based compression formats such as bzip2. You can read each compression block on a file split boundary and process them independently. Unsplittable compression formats such as gzip do not benefit from file splitting. To horizontally scale jobs that read unsplittable files or compression formats, prepare the input datasets with multiple medium-sized files.

 

Each file split (the blue square in the figure) is read from S3, deserialized into an AWS Glue DynamicFrame partition, and then processed by an Apache Spark task (the gear icon in the figure). Deserialized partition sizes can be significantly larger than the on-disk 64 MB file split size, especially for highly compressed splittable file formats such as Parquet or large files using unsplittable compression formats such as gzip. Typically, a deserialized partition is not cached in memory, and only constructed when needed due to Apache Spark’s lazy evaluation of transformations, thus not causing any memory pressure on AWS Glue workers. For more information on lazy evaluation, see the RDD Programming Guide on the Apache Spark website.

However, explicitly caching a partition in memory or spilling it out to local disk in an AWS Glue ETL script or Apache Spark application can result in out-of-memory (OOM) or out-of-disk exceptions. AWS Glue can support such use cases by using larger AWS Glue worker types with vertically scaled-up DPU instances for AWS Glue ETL jobs.

Vertical scaling for Apache Spark jobs using larger worker types

A variety of AWS Glue ETL jobs, Apache Spark applications, and new machine learning (ML) Glue transformations supported with AWS Lake Formation have high memory and disk requirements. Running these workloads may put significant memory pressure on the execution engine. This memory pressure can result in job failures because of OOM or out-of-disk space exceptions. You may see exceptions from Yarn about memory and disk space.

Exceeding Yarn memory overhead

Apache Yarn is responsible for allocating cluster resources needed to run your Spark application. An application includes a Spark driver and multiple executor JVMs. In addition to the memory allocation required to run a job for each executor, Yarn also allocates an extra overhead memory to accommodate for JVM overhead, interned strings, and other metadata that the JVM needs. The configuration parameter spark.yarn.executor.memoryOverhead defaults to 10% of the total executor memory. Memory-intensive operations such as joining large tables or processing datasets with a skew in the distribution of specific column values may exceed the memory threshold, and result in the following error message:

18/06/13 16:54:29 ERROR YarnClusterScheduler: Lost executor 1 on ip-xxx:
Container killed by YARN for exceeding memory limits. 5.5 GB of 5.5 GB physical memory used.
Consider boosting spark.yarn.executor.memoryOverhead.

Disk space

Apache Spark uses local disk on Glue workers to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. During the sort or shuffle stages of a job, Spark writes intermediate data to local disk before it can exchange that data between the different workers. Jobs may fail due to the following exception when no disk space remains:

java.io.IOException: No space left on device
UnsafeExternalSorter: Thread 20 spilling sort data of 141.0 MB to disk (90 times so far)

AWS Glue job metrics

Most commonly, this error is the result of a significant skew in the dataset that the job is processing. You can also identify the skew by monitoring the execution timeline of different Apache Spark executors using AWS Glue job metrics. For more information, see Debugging Demanding Stages and Straggler Tasks.

The following AWS Glue job metrics graph shows the execution timeline and memory profile of different executors in an AWS Glue ETL job. One of the executors (the red line) is straggling due to processing of a large partition, and actively consumes memory for the majority of the job’s duration.

With AWS Glue’s Vertical Scaling feature, memory-intensive Apache Spark jobs can use AWS Glue workers with higher memory and larger disk space to help overcome these two common failures. Using AWS Glue job metrics, you can also debug OOM and determine the ideal worker type for your job by inspecting the memory usage of the driver and executors for a running job. For more information, see Debugging OOM Exceptions and Job Abnormalities.

In general, jobs that run memory-intensive operations can benefit from the G.1X worker type, and those that use AWS Glue’s ML transforms or similar ML workloads can benefit from the G.2X worker type.

Apache Spark UI for AWS Glue jobs

You can also use AWS Glue’s support for the Spark UI to inspect and scale your AWS Glue ETL job by visualizing the Directed Acyclic Graph (DAG) of Spark’s execution, monitoring demanding stages and large shuffles, and inspecting Spark SQL query plans. For more information, see Monitoring Jobs Using the Apache Spark Web UI.

The following Spark SQL query plan on the Spark UI shows the DAG for an ETL job that reads two tables from S3, performs an outer-join that results in a Spark shuffle, and writes the result to S3 in Parquet format.

As seen from the plan, the Spark shuffle and subsequent sort operation for the join transformation take the majority of the job execution time. With AWS Glue vertical scaling, each AWS Glue worker co-locates more Spark tasks, thereby reducing the number of data exchanges over the network.

Scaling to handle large numbers of small files

An AWS Glue ETL job might read thousands or millions of files from S3. This is typical for Kinesis Data Firehose or streaming applications writing data into S3. The Apache Spark driver may run out of memory when attempting to read a large number of files. When this happens, you see the following error message:

# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 12039"...

Apache Spark v2.2 can manage approximately 650,000 files on the standard AWS Glue worker type. To handle more files, AWS Glue provides the option to read input files in larger groups per Spark task for each AWS Glue worker. For more information, see Reading Input Files in Larger Groups.

You can reduce the excessive parallelism from the launch of one Apache Spark task to process each file by using AWS Glue file grouping. This method reduces the chances of an OOM exception on the Spark driver. To configure file grouping, you need to set groupFiles and groupSize parameters. The following code example uses AWS Glue DynamicFrame API in an ETL script with these parameters:

dyf = glueContext.create_dynamic_frame_from_options("s3",
    {'paths': ["s3://input-s3-path/"],
    'recurse':True,
    'groupFiles': 'inPartition',
    'groupSize': '1048576'}, 
    format="json")

You can set groupFiles to group files within a Hive-style S3 partition (inPartition) or across S3 partitions (acrossPartition). In most scenarios, grouping within a partition is sufficient to reduce the number of concurrent Spark tasks and the memory footprint of the Spark driver. In benchmarks, AWS Glue ETL jobs configured with the inPartition grouping option were approximately seven times faster than native Apache Spark v2.2 when processing 320,000 small JSON files distributed across 160 different S3 partitions. A large fraction of the time in Apache Spark is spent building an in-memory index while listing S3 files and scheduling a large number of short-running tasks to process each file. With AWS Glue grouping enabled, the benchmark AWS Glue ETL job could process more than 1 million files using the standard AWS Glue worker type.

groupSize is an optional field that allows you to configure the amount of data each Spark task reads and processes as a single AWS Glue DynamicFrame partition. Users can set groupSize if they know the distribution of file sizes before running the job. The groupSize parameter allows you to control the number of AWS Glue DynamicFrame partitions, which also translates into the number of output files. However, setting groupSize too small can result in excessive task parallelism, while setting it too large can under-utilize the cluster.

By default, AWS Glue automatically enables grouping without any manual configuration when the number of input files or task parallelism exceeds a threshold of 50,000. The default value of the groupFiles parameter is inPartition, so that each Spark task only reads files within the same S3 partition. AWS Glue computes the groupSize parameter automatically and configures it to reduce the excessive parallelism, and makes use of the cluster compute resources with sufficient Spark tasks running in parallel.

Partitioning data and pushdown predicates

Partitioning has emerged as an important technique for organizing datasets so that a variety of big data systems can query them efficiently. A hierarchical directory structure organizes the data, based on the distinct values of one or more columns. For example, you can partition your application logs in S3 by date, broken down by year, month, and day. Files corresponding to a single day’s worth of data receive a prefix such as the following:

s3://my_bucket/logs/year=2018/month=01/day=23/

Predicate pushdowns for partition columns

AWS Glue supports pushing down predicates, which define a filter criteria for partition columns populated for a table in the AWS Glue Data Catalog. Instead of reading all the data and filtering results at execution time, you can supply a SQL predicate in the form of a WHERE clause on the partition column. For example, assume the table is partitioned by the year column and run SELECT * FROM table WHERE year = 2019. year represents the partition column and 2019 represents the filter criteria.

AWS Glue lists and reads only the files from S3 partitions that satisfy the predicate and are necessary for processing.

To accomplish this, specify a predicate using the Spark SQL expression language as an additional parameter to the AWS Glue DynamicFrame getCatalogSource method. This predicate can be any SQL expression or user-defined function that evaluates to a Boolean, as long as it uses only the partition columns for filtering.

This example demonstrates this functionality with a dataset of Github events partitioned by year, month, and day. The following code example reads only those S3 partitions related to events that occurred on weekends:

%spark

val partitionPredicate ="date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"

val pushdownEvents = glueContext.getCatalogSource(
   database = "githubarchive_month",
   tableName = "data",
   pushDownPredicate = partitionPredicate).getDynamicFrame()

Here you can use the SparkSQL string concat function to construct a date string. The to_date function converts it to a date object, and the date_format function with the ‘E’ pattern converts the date to a three-character day of the week (for example, Mon or Tue). For more information about these functions, Spark SQL expressions, and user-defined functions in general, see the Spark SQL, DataFrames and Datasets Guide and list of functions on the Apache Spark website.
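For Glue ETL jobs written in Python, a roughly equivalent call uses the push_down_predicate argument of create_dynamic_frame.from_catalog. This is a sketch that assumes an existing GlueContext named glueContext and the same catalog database and table as the Scala example above.

# PySpark sketch: prune partitions before reading, using the same weekend predicate.
partition_predicate = (
    "date_format(to_date(concat(year, '-', month, '-', day)), 'E') in ('Sat', 'Sun')"
)

pushdown_events = glueContext.create_dynamic_frame.from_catalog(
    database="githubarchive_month",
    table_name="data",
    push_down_predicate=partition_predicate,
)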

Pruning AWS Glue Data Catalog partitions provides a significant performance boost for AWS Glue ETL jobs. It reduces the time the Spark query engine needs to list files in S3 and to read and process data at runtime. You can achieve further improvement as you exclude additional partitions by using predicates with higher selectivity.

Partitioning data before and during writes to S3

By default, data is not partitioned when writing out the results from an AWS Glue DynamicFrame—all output files are written at the top level under the specified output path. AWS Glue enables partitioning of DynamicFrame results by passing the partitionKeys option when creating a sink. For example, the following code example writes out the dataset in Parquet format to S3 partitioned by the type column:

%spark

glueContext.getSinkWithFormat(
    connectionType = "s3",
    options = JsonOptions(Map("path" -> "$outpath", "partitionKeys" -> Seq("type"))),
    format = "parquet").writeDynamicFrame(projectedEvents)

In this example, $outpath is a placeholder for the base output path in S3. The partitionKeys parameter corresponds to the names of the columns used to partition the output in S3. When you execute the write operation, it removes the type column from the individual records and encodes it in the directory structure. To demonstrate this, you can list the output path using the following aws s3 ls command from the AWS CLI:

PRE type=CommitCommentEvent/
PRE type=CreateEvent/
PRE type=DeleteEvent/
PRE type=ForkEvent/
PRE type=GollumEvent/
PRE type=IssueCommentEvent/
PRE type=IssuesEvent/
PRE type=MemberEvent/
PRE type=PublicEvent/
PRE type=PullRequestEvent/
PRE type=PullRequestReviewCommentEvent/
PRE type=PushEvent/
PRE type=ReleaseEvent/
PRE type=WatchEvent/

For more information, see aws s3 ls in the AWS CLI Command Reference.

In general, you should select columns for partitionKeys that are of lower cardinality and are most commonly used to filter or group query results. For example, when analyzing AWS CloudTrail logs, it is common to look for events that happened between a range of dates. Therefore, partitioning the CloudTrail data by year, month, and day would improve query performance and reduce the amount of data that you need to scan to return the answer.

The benefit of output partitioning is two-fold. First, it improves execution time for end-user queries. Second, having an appropriate partitioning scheme helps avoid costly Spark shuffle operations in downstream AWS Glue ETL jobs when combining multiple jobs into a data pipeline. For more information, see Working with partitioned data in AWS Glue.

S3 or Hive-style partitions are different from Spark RDD or DynamicFrame partitions. Spark partitioning is related to how Spark or AWS Glue breaks up a large dataset into smaller and more manageable chunks to read and apply transformations in parallel. AWS Glue workers manage this type of partitioning in memory. You can control Spark partitions further by using the repartition or coalesce functions on DynamicFrames at any point during a job’s execution and before data is written to S3. You can set the number of partitions using the repartition function either by explicitly specifying the total number of partitions or by selecting the columns to partition the data.

Repartitioning a dataset by using the repartition or coalesce functions often results in AWS Glue workers exchanging (shuffling) data, which can impact job runtime and increase memory pressure. In contrast, writing data to S3 with Hive-style partitioning does not require any data shuffle and only sorts the data locally on each of the worker nodes. The number of output files in S3 without Hive-style partitioning roughly corresponds to the number of Spark partitions. In contrast, the number of output files in S3 with Hive-style partitioning can vary based on the distribution of partition keys on each AWS Glue worker.
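The following PySpark sketch shows both techniques together; it assumes an existing GlueContext named glueContext and a DynamicFrame named projectedEvents, and the output path is a placeholder.

# Explicitly repartition the DynamicFrame (triggers a shuffle across workers).
repartitioned = projectedEvents.repartition(40)

# Write with Hive-style partitioning on the type column (no shuffle; data is sorted locally).
glueContext.write_dynamic_frame.from_options(
    frame=repartitioned,
    connection_type="s3",
    connection_options={
        "path": "s3://output-s3-path/",   # placeholder output path
        "partitionKeys": ["type"],
    },
    format="parquet",
)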

Conclusion

This post showed how to scale your ETL jobs and Apache Spark applications on AWS Glue for both compute and memory-intensive jobs. AWS Glue enables faster job execution times and efficient memory management by using the parallelism of the dataset and different types of AWS Glue workers. It also helps you overcome the challenges of processing many small files by automatically adjusting the parallelism of the workload and cluster. AWS Glue ETL jobs use the AWS Glue Data Catalog and enable seamless partition pruning using predicate pushdowns. It also allows for efficient partitioning of datasets in S3 for faster queries by downstream Apache Spark applications and other analytics engines such as Amazon Athena and Amazon Redshift. We hope you try out these best practices for your Apache Spark applications on AWS Glue.

The second post in this series will show how to use AWS Glue features to batch process large historical datasets and incrementally process deltas in S3 data lakes. It also demonstrates how to use a custom AWS Glue Parquet writer for faster job execution.

 


About the Author

Mohit Saxena is a technical lead at AWS Glue. His passion is building scalable distributed systems for efficiently managing data on the cloud. He also enjoys watching movies and reading about the latest technology.


Orchestrate Amazon Redshift-Based ETL workflows with AWS Step Functions and AWS Glue

Post Syndicated from Ben Romano original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-redshift-based-etl-workflows-with-aws-step-functions-and-aws-glue/

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud that offers fast query performance using the same SQL-based tools and business intelligence applications that you use today. Many customers also like to use Amazon Redshift as an extract, transform, and load (ETL) engine to use existing SQL developer skillsets, to quickly migrate pre-existing SQL-based ETL scripts, and—because Amazon Redshift is fully ACID-compliant—as an efficient mechanism to merge change data from source data systems.

In this post, I show how to use AWS Step Functions and AWS Glue Python Shell to orchestrate tasks for those Amazon Redshift-based ETL workflows in a completely serverless fashion. AWS Glue Python Shell is a Python runtime environment for running small to medium-sized ETL tasks, such as submitting SQL queries and waiting for a response. Step Functions lets you coordinate multiple AWS services into workflows so you can easily run and monitor a series of ETL tasks. Both AWS Glue Python Shell and Step Functions are serverless, allowing you to automatically run and scale them in response to events you define, rather than requiring you to provision, scale, and manage servers.

While many traditional SQL-based workflows use internal database constructs like triggers and stored procedures, separating workflow orchestration, task, and compute engine components into standalone services allows you to develop, optimize, and even reuse each component independently. So, while this post uses Amazon Redshift as an example, my aim is to more generally show you how to orchestrate any SQL-based ETL.

Prerequisites

If you want to follow along with the examples in this post using your own AWS account, you need a Virtual Private Cloud (VPC) with at least two private subnets that have routes to an S3 VPC endpoint.

If you don’t have a VPC, or are unsure if yours meets these requirements, I provide an AWS CloudFormation template stack you can launch by selecting the following button. Provide a stack name on the first page and leave the default settings for everything else. Wait for the stack to display Create Complete (this should only take a few minutes) before moving on to the other sections.

Scenario

For the examples in this post, I use the Amazon Customer Reviews Dataset to build an ETL workflow that completes the following two tasks, which represent a simple ETL process.

  • Task 1: Move a copy of the dataset containing reviews from the year 2015 and later from S3 to an Amazon Redshift table.
  • Task 2: Generate a set of output files in another Amazon S3 location that identify the “most helpful” reviews by market and product category, allowing an analytics team to glean information about high-quality reviews.

This dataset is publicly available via an Amazon Simple Storage Service (Amazon S3) bucket. Complete the following tasks to get set up.

Solution overview

The following diagram highlights the solution architecture from end to end:

The steps in this process are as follows:

  1. The state machine launches a series of runs of an AWS Glue Python Shell job (more on how and why I use a single job later in this post!) with parameters for retrieving database connection information from AWS Secrets Manager and an .sql file from S3.
  2. Each run of the AWS Glue Python Shell job uses the database connection information to connect to the Amazon Redshift cluster and submit the queries contained in the .sql file.
    1. For Task 1: The cluster uses Amazon Redshift Spectrum to read data from S3 and load it into an Amazon Redshift table. Amazon Redshift Spectrum is commonly used as a means of loading data into Amazon Redshift. (See Step 7 of Twelve Best Practices for Amazon Redshift Spectrum for more information.)
    2. For Task 2: The cluster executes an aggregation query and exports the results to another Amazon S3 location via UNLOAD.
  3. The state machine may send a notification to an Amazon Simple Notification Service (SNS) topic in the case of pipeline failure.
  4. Users can query the data from the cluster and/or retrieve report output files directly from S3.

I include an AWS CloudFormation template to jumpstart the ETL environment so that I can focus this post on the steps dedicated to building the task and orchestration components. The template launches the following resources:

  • Amazon Redshift Cluster
  • Secrets Manager secret for storing Amazon Redshift cluster information and credentials
  • S3 Bucket preloaded with Python scripts and .sql files
  • Identity and Access Management (IAM) Role for AWS Glue Python Shell jobs

See the following resources for how to complete these steps manually:

Be sure to select at least two private subnets and the corresponding VPC, as shown in the following screenshot. If you are using the VPC template from above, the VPC appears as 10.71.0.0/16 and the subnet names are A private and B private.

The stack should take 10-15 minutes to launch. Once it displays Create Complete, you can move on to the next section. Be sure to take note of the Resources tab in the AWS CloudFormation console, shown in the following screenshot, as I refer to these resources throughout the post.

Building with AWS Glue Python Shell

Begin by navigating to AWS Glue in the AWS Management Console.

Making a connection

The Amazon Redshift cluster resides in a VPC, so you first need to create a connection using AWS Glue. Connections contain properties, including VPC networking information, needed to access your data stores. You eventually attach this connection to your Glue Python Shell job so that it can reach your Amazon Redshift cluster.

Select Connections from the menu bar, and then select Add connection. Give your connection a name like blog_rs_connection,  select Amazon Redshift as the Connection type, and then select Next, as shown in the following screenshot.

Under Cluster, enter the name of the cluster that the AWS CloudFormation template launched, i.e., blogstack-redshiftcluster-####. Because the Python code I provide for this blog already handles credential retrieval, the rest of the values around database information you enter here are largely placeholders. The key information you are associating with the connection is networking-related.

Please note that you are not able to test the connection without the correct cluster information.  If you are interested in doing so, note that Database name and Username are auto-populated after selecting the correct cluster, as shown in the following screenshot. Follow the instructions here to retrieve the password information from Secrets Manager to copy into the Password field.

ETL code review

Take a look at the two main Python scripts used in this example:

Pygresql_redshift_common.py is a set of functions that can retrieve cluster connection information and credentials from Secrets Manager, make a connection to the cluster, and submit queries. By retrieving cluster information at runtime via a passed parameter, these functions allow the job to connect to any cluster to which it has access. You can package these functions into a library by following the instructions to create a Python .egg file (already completed as part of the AWS CloudFormation template launch). Note that AWS Glue Python Shell supports several Python libraries natively.

import pg
import boto3
import base64
from botocore.exceptions import ClientError
import json

#uses the Secrets Manager secret name to return connection and credential information
def connection_info(db):

	session = boto3.session.Session()
	client = session.client(
		service_name='secretsmanager'
	)

	get_secret_value_response = client.get_secret_value(SecretId=db)

	if 'SecretString' in get_secret_value_response:
		secret = json.loads(get_secret_value_response['SecretString'])
	else:
		secret = json.loads(base64.b64decode(get_secret_value_response['SecretBinary']))
		
	return secret


#creates a connection to the cluster
def get_connection(db,db_creds):

	con_params = connection_info(db_creds)
	
	rs_conn_string = "host=%s port=%s dbname=%s user=%s password=%s" % (con_params['host'], con_params['port'], db, con_params['username'], con_params['password'])
	rs_conn = pg.connect(dbname=rs_conn_string)
	rs_conn.query("set statement_timeout = 1200000")
	
	return rs_conn


#submits a query to the cluster
def query(con,statement):
    res = con.query(statement)
    return res
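If you want to rebuild the .egg file yourself rather than rely on the one the AWS CloudFormation template already created, a minimal setup.py along these lines works. It assumes a redshift_module directory containing __init__.py and pygresql_redshift_common.py next to the setup file.

# setup.py — minimal packaging sketch for the helper functions above.
from setuptools import setup

setup(
    name="redshift_module",
    version="0.1",
    packages=["redshift_module"],
)

Running python setup.py bdist_egg then produces a file like dist/redshift_module-0.1-py3.6.egg (the exact suffix depends on your Python version), which is the library referenced later in this post.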

The AWS Glue Python Shell job runs rs_query.py when called. It starts by parsing job arguments that are passed at invocation. It uses some of those arguments to retrieve a .sql file from S3, then connects and submits the statements within the file to the cluster using the functions from pygresql_redshift_common.py. So, in addition to connecting to any cluster using the Python library you just packaged, it can also retrieve and run any SQL statement. This means you can manage a single AWS Glue Python Shell job for all of your Amazon Redshift-based ETL by simply passing in parameters on where it should connect and what it should submit to complete each task in your pipeline.

from redshift_module import pygresql_redshift_common as rs_common
import sys
from awsglue.utils import getResolvedOptions
import boto3

#get job args
args = getResolvedOptions(sys.argv,['db','db_creds','bucket','file'])
db = args['db']
db_creds = args['db_creds']
bucket = args['bucket']
file = args['file']

#get sql statements
s3 = boto3.client('s3') 
sqls = s3.get_object(Bucket=bucket, Key=file)['Body'].read().decode('utf-8')
sqls = sqls.split(';')

#get database connection
print('connecting...')
con = rs_common.get_connection(db,db_creds)

#run each sql statement
print("connected...running query...")
results = []
for sql in sqls[:-1]:
    sql = sql + ';'
    result = rs_common.query(con, sql)
    print(result)
    results.append(result)

print(results)

Creating the Glue Python Shell Job

Next, put that code into action:

  1. Navigate to Jobs on the left menu of the AWS Glue console page and from there, select Add job.
  2. Give the job a name like blog_rs_query.
  3. For the IAM role, select the same GlueExecutionRole you previously noted from the Resources section of the AWS CloudFormation console.
  4. For Type, select Python shell, leave Python version as the default of Python 3, and for This job runs select An existing script that you provide.
  5. For S3 path where the script is stored, navigate to the script bucket created by the AWS CloudFormation template (look for ScriptBucket in the Resources), then select the python/rs_query.py file.
  6. Expand the Security configuration, script libraries, and job parameters section to add the Python .egg file with the Amazon Redshift connection library to the Python library path. It is also located in the script bucket, under python/redshift_module-0.1-py3.6.egg.

When all is said and done everything should look as it does in the following screenshot:

Choose Next. Add the connection you created by choosing Select to move it under Required connections. (Recall from the Making a connection section that this gives the job the ability to interact with your VPC.) Choose Save job and edit script to finish, as shown in the following screenshot.

Test driving the Python Shell job

After creating the job, you are taken to the AWS Glue Python Shell IDE. If everything went well, you should see the rs_query.py code. Right now, the Amazon Redshift cluster is sitting there empty, so use the Python code to run the following SQL statements and populate it with tables.

  1. Create an external database (amzreviews).
  2. Create an external table (reviews) through which Amazon Redshift Spectrum can read the source data in S3 (the public reviews dataset). The table is partitioned by product_category because the source files are organized by category, but in general you should partition on frequently filtered columns (see #4).
  3. Add partitions to the external table.
  4. Create an internal table (reviews) local to the Amazon Redshift cluster. product_id works well as a DISTKEY because it has high cardinality, even distribution, and most likely (although not explicitly part of this blog’s scenario) a column that will be used to join with other tables. I choose review_date as a SORTKEY to efficiently filter out review data that is not part of my target query (after 2015). Learn more about how to best choose DISTKEY/SORTKEY as well as additional table design parameters for optimizing performance by reading the Designing Tables documentation.
    CREATE EXTERNAL SCHEMA amzreviews 
    from data catalog
    database 'amzreviews'
    iam_role 'rolearn'
    CREATE EXTERNAL database IF NOT EXISTS;
    
    
    
    CREATE EXTERNAL TABLE amzreviews.reviews(
      marketplace varchar(10), 
      customer_id varchar(15), 
      review_id varchar(15), 
      product_id varchar(25), 
      product_parent varchar(15), 
      product_title varchar(50), 
      star_rating int, 
      helpful_votes int, 
      total_votes int, 
      vine varchar(5), 
      verified_purchase varchar(5), 
      review_headline varchar(25), 
      review_body varchar(1024), 
      review_date date, 
      year int)
    PARTITIONED BY ( 
      product_category varchar(25))
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    LOCATION
      's3://amazon-reviews-pds/parquet/';
      
      
      
    ALTER TABLE amzreviews.reviews ADD
    partition(product_category='Apparel') 
    location 's3://amazon-reviews-pds/parquet/product_category=Apparel/'
    partition(product_category='Automotive') 
    location 's3://amazon-reviews-pds/parquet/product_category=Automotive'
    partition(product_category='Baby') 
    location 's3://amazon-reviews-pds/parquet/product_category=Baby'
    partition(product_category='Beauty') 
    location 's3://amazon-reviews-pds/parquet/product_category=Beauty'
    partition(product_category='Books') 
    location 's3://amazon-reviews-pds/parquet/product_category=Books'
    partition(product_category='Camera') 
    location 's3://amazon-reviews-pds/parquet/product_category=Camera'
    partition(product_category='Grocery') 
    location 's3://amazon-reviews-pds/parquet/product_category=Grocery'
    partition(product_category='Furniture') 
    location 's3://amazon-reviews-pds/parquet/product_category=Furniture'
    partition(product_category='Watches') 
    location 's3://amazon-reviews-pds/parquet/product_category=Watches'
    partition(product_category='Lawn_and_Garden') 
    location 's3://amazon-reviews-pds/parquet/product_category=Lawn_and_Garden';
    
    
    CREATE TABLE reviews(
      marketplace varchar(10),
      customer_id varchar(15), 
      review_id varchar(15), 
      product_id varchar(25) DISTKEY, 
      product_parent varchar(15), 
      product_title varchar(50), 
      star_rating int, 
      helpful_votes int, 
      total_votes int, 
      vine varchar(5), 
      verified_purchase varchar(5), 
      review_date date, 
      year int,
      product_category varchar(25))
      
      SORTKEY (
         review_date
        );

Do this first job run manually so you can see where all of the elements I’ve discussed come into play. Select Run Job at the top of the IDE screen. Expand the Security configuration, script libraries, and job parameters section. This is where you add in the parameters as key-value pairs, as shown in the following screenshot.

Key	Value
--db	reviews
--db_creds	reviewssecret
--bucket	<name of s3 script bucket>
--file	sql/reviewsschema.sql

Select Run job to start it. The job should take a few seconds to complete. You can look for log outputs below the code in the IDE to watch job progress.
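You can also start the same run programmatically. The following boto3 sketch passes the parameters from the preceding table to the job; replace the bucket placeholder with the name of your script bucket.

import boto3

glue = boto3.client("glue")

response = glue.start_job_run(
    JobName="blog_rs_query",
    Arguments={
        "--db": "reviews",
        "--db_creds": "reviewssecret",
        "--bucket": "<name of s3 script bucket>",  # placeholder
        "--file": "sql/reviewsschema.sql",
    },
)
print(response["JobRunId"])  # you can poll the run with get_job_run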

Once the job completes, navigate to Databases in the AWS Glue console and look for the amzreviews database and reviews table, as shown in the following screenshot. If they are there, then everything worked as planned! You can also connect to your Amazon Redshift cluster using the Redshift Query Editor or with your own SQL client tool and look for the local reviews table.

Step Functions Orchestration

Now that you’ve had a chance to run a job manually, it’s time to move onto something more programmatic that is orchestrated by Step Functions.

Launch Template

I provide a third AWS CloudFormation template for kickstarting this process as well. It creates a Step Functions state machine that calls two instances of the AWS Glue Python Shell job you just created to complete the two tasks I outlined at the beginning of this post.

For BucketName, paste the name of the script bucket created in the second AWS CloudFormation stack. For GlueJobName, type in the name of the job you just created. Leave the other information as default, as shown in the following screenshot. Launch the stack and wait for it to display Create Complete—this should take only a couple of minutes—before moving on to the next section.

Working with the Step Functions State Machine

State Machines are made up of a series of steps, allowing you to stitch together services into robust ETL workflows. You can monitor each step of execution as it happens, which means you can identify and fix problems in your ETL workflow quickly, and even automatically.

Take a look at the state machine you just launched to get a better idea. Navigate to Step Functions in the AWS Console and look for a state machine with a name like GlueJobStateMachine-######. Choose Edit to view the state machine configuration, as shown in the following screenshot.

It should look as it does in the following screenshot:

As you can see, state machines are created using JSON templates made up of task definitions and workflow logic. You can run parallel tasks, catch errors, and even pause workflows and wait for manual callback to continue. The example I provide contains two tasks for running the SQL statements that complete the goals I outlined at the beginning of the post:

  1. Load data from S3 using Redshift Spectrum
  2. Transform and write data back to S3

Each task contains basic error handling which, if caught, routes the workflow to an error notification task. This example is a simple one to show you how to build a basic workflow, but you can refer to the Step Functions documentation for an example of more complex workflows to help build a robust ETL pipeline. Step Functions also supports reusing modular components with Nested Workflows.

SQL Review

The state machine will retrieve and run the following SQL statements:

INSERT INTO reviews
SELECT marketplace, customer_id, review_id, product_id, product_parent, product_title, star_rating, helpful_votes, total_votes, vine, verified_purchase, review_date, year, product_category
FROM amzreviews.reviews
WHERE year > 2015;

As I mentioned previously, Amazon Redshift Spectrum is a great way to run ETL using an INSERT INTO statement. This example is a simple load of the data as it is in S3, but keep in mind you can add more complex SQL statements to transform your data prior to loading.

UNLOAD ('SELECT marketplace, product_category, product_title, review_id, helpful_votes, AVG(star_rating) as average_stars FROM reviews GROUP BY marketplace, product_category, product_title, review_id, helpful_votes ORDER BY helpful_votes DESC, average_stars DESC')
TO 's3://bucket/testunload/'
iam_role 'rolearn';

This statement groups reviews by product, ordered by number of helpful votes, and writes to Amazon S3 using UNLOAD.

State Machine execution

Now that everything is in order, start an execution. From the state machine main page select Start an Execution.

Leave the defaults as they are and select Start to begin execution. Once execution begins you are taken to a visual workflow interface where you can follow the execution progress, as shown in the following screenshot.
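You can also kick off an execution from code, which is handy when the pipeline is triggered by a schedule or another event. This boto3 sketch uses a placeholder state machine ARN; copy the real ARN from the state machine details page.

import boto3

sfn = boto3.client("stepfunctions")

execution = sfn.start_execution(
    stateMachineArn="arn:aws:states:us-east-1:111111111111:stateMachine:GlueJobStateMachine-example",  # placeholder ARN
    input="{}",  # empty input, matching the console defaults
)
print(execution["executionArn"])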

Each of the queries takes a few minutes to run. In the meantime, you can watch the Amazon Redshift query logs to track the query progress in real time. These can be found by navigating to Amazon Redshift in the AWS Console, selecting your Amazon Redshift cluster, and then selecting the Queries tab, as shown in the following screenshot.

Once you see COMPLETED for both queries, navigate back to the state machine execution. You should see success for each of the states, as shown in the following screenshot.

Next, navigate to the data bucket in the S3 AWS Console page (refer to the DataBucket in the CloudFormation Resources tab). If all went as planned, you should see a folder named testunload in the bucket with the unloaded data, as shown in the following screenshot.

Inject Failure into Step Functions State Machine

Next, test the error handling component of the state machine by intentionally causing an error. An easy way to do this is to edit the state machine and misspell the name of the Secrets Manager secret in the ReadFilterJob task, as shown in the following screenshot.

If you want the error output sent to you, optionally subscribe to the error notification SNS Topic. Start another state machine execution as you did previously. This time the workflow should take the path toward the NotifyFailure task, as shown in the following screenshot. If you subscribed to the SNS Topic associated with it, you should receive a message shortly thereafter.

The state machine logs will show the error in more detail, as shown in the following screenshot.

Conclusion

In this post I demonstrated how you can orchestrate Amazon Redshift-based ETL using serverless AWS Step Functions and AWS Glue Python Shell jobs. As I mentioned in the introduction, the concepts can also be more generally applied to other SQL-based ETL, so use them to start building your own SQL-based ETL pipelines today!

 


About the Author

Ben Romano is a Data Lab solution architect at AWS. Ben helps our customers architect and build data and analytics prototypes in just four days in the AWS Data Lab.


Install Python libraries on a running cluster with EMR Notebooks

Post Syndicated from Parag Chaudhari original https://aws.amazon.com/blogs/big-data/install-python-libraries-on-a-running-cluster-with-emr-notebooks/

Last year, AWS introduced EMR Notebooks, a managed notebook environment based on the open-source Jupyter notebook application.

This post discusses installing notebook-scoped libraries on a running cluster directly via an EMR Notebook. Before this feature, you had to rely on bootstrap actions or use a custom AMI to install additional libraries that are not pre-packaged with the EMR AMI when you provision the cluster. This post also discusses how to use the pre-installed Python libraries available locally within EMR Notebooks to analyze and plot your results. This capability is useful in scenarios in which you don’t have access to a PyPI repository but need to analyze and visualize a dataset.

Benefits of using notebook-scoped libraries with EMR Notebooks

Notebook-scoped libraries provide you the following benefits:

  • Runtime installation – You can import your favorite Python libraries from PyPI repositories and install them on your remote cluster on the fly when you need them. These libraries are instantly available to your Spark runtime environment. There is no need to restart the notebook session or recreate your cluster.
  • Dependency isolation – The libraries you install using EMR Notebooks are isolated to your notebook session and don’t interfere with bootstrapped cluster libraries or libraries installed from other notebook sessions. These notebook-scoped libraries take precedence over bootstrapped libraries. Multiple notebook users can import their preferred version of the library and use it without dependency clashes on the same cluster.
  • Portable library environment – The library package installation happens from your notebook file. This allows you to recreate the library environment when you switch the notebook to a different cluster by re-executing the notebook code. At the end of the notebook session, the libraries you install through EMR Notebooks are automatically removed from the hosting EMR cluster.

Prerequisites

To use this feature in EMR Notebooks, you need a notebook attached to a cluster running EMR release 5.26.0 or later. The cluster should have access to the public or private PyPI repository from which you want to import the libraries. For more information, see Creating a Notebook.

There are different ways to configure your VPC networking to allow clusters inside the VPC to connect to an external repository. For more information, see Scenarios and Examples in the Amazon VPC User Guide.

Using notebook-scoped libraries

This post demonstrates the notebook-scoped libraries feature of EMR Notebooks by analyzing the publicly available Amazon customer reviews dataset for books. For more information, see Amazon Customer Reviews Dataset on the Registry of Open Data for AWS.

Open your notebook and make sure the kernel is set to PySpark. Run the following command from the notebook cell:

print("Welcome to my EMR Notebook!")

You get the following output:

Output shows newly created spark session.

You can examine the current notebook session configuration by running the following command:

%%info

You get the following output:

Output shows spark session properties which include Python version and properties to enable this new feature.

The notebook session is configured for Python 3 by default (through spark.pyspark.python). If you prefer to use Python 2, reconfigure your notebook session by running the following command from your notebook cell:

%%configure -f { "conf":{ "spark.pyspark.python": "python", 
                "spark.pyspark.virtualenv.enabled": "true", 
                "spark.pyspark.virtualenv.type":"native", 
                "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv" }}

You can also verify the Python version used in your current notebook session by running the following code:

import sys
sys.version

You get the following output:

Output shows Python version 3.6.8

Before starting your analysis, check the libraries that are already available on the cluster. You can do this using the list_packages() PySpark API, which lists all the Python libraries on the cluster. Run the following code:

sc.list_packages()

You get an output similar to the following code, which shows all the available Python 3-compatible packages on your cluster:

Output shows list of Python packages along with their versions.

Load the Amazon customer reviews data for books into a Spark DataFrame with the following code:

df = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')

You are now ready to explore the data. Determine the schema and number of available columns in your dataset with the following code:

# Total columns
print(f'Total Columns: {len(df.dtypes)}')
df.printSchema()

The following code is the output:

Output shows that Spark DataFrame has 15 columns. It also shows the schema of the Spark DataFrame.

This dataset has a total of 15 columns. You can also check the total rows in your dataset by running the following code:

# Total rows
print(f'Total Rows: {df.count():,}')

You get the following output:

Output shows that Spark DataFrame has more than 20 million rows.

Check the total number of books with the following code:

# Total number of books
num_of_books = df.select('product_id').distinct().count()
print(f'Number of Books: {num_of_books:,}')

You get the following output:

Output shows that there are more than 3 million books.

You can also analyze the number of book reviews by year and find the distribution of customer ratings. To do this, import the Pandas library version 0.25.1 and the latest Matplotlib library from the public PyPI repository. Install them on the cluster attached to your notebook using the install_pypi_package API. See the following code:

sc.install_pypi_package("pandas==0.25.1") #Install pandas version 0.25.1 
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository

You get the following output:

Output shows that “pandas” and “matplotlib” packages are successfully installed.

The install_pypi_package PySpark API installs your libraries along with any associated dependencies. By default, it installs the latest version of the library that is compatible with the Python version you are using. You can also install a specific version of the library by specifying the version explicitly, as in the previous Pandas example.

Verify that your imported packages successfully installed by running the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

You can also analyze the trend for the number of reviews provided across multiple years. Use ‘toPandas()’ to convert the Spark data frame to a Pandas data frame, which you can visualize with Matplotlib. See the following code:

# Number of reviews across years
num_of_reviews_by_year = df.groupBy('year').count().orderBy('year').toPandas()

import matplotlib.pyplot as plt
plt.clf()
num_of_reviews_by_year.plot(kind='area', x='year',y='count', rot=70, color='#bc5090', legend=None, figsize=(8,6))
plt.xticks(num_of_reviews_by_year.year)
plt.xlim(1995, 2015)
plt.title('Number of reviews across years')
plt.xlabel('Year')
plt.ylabel('Number of Reviews')

The preceding commands render the plot on the attached EMR cluster. To visualize the plot within your notebook, use %matplot magic. See the following code:

%matplot plt

The following graph shows that the number of reviews provided by customers increased exponentially from 1995 to 2015. Interestingly, 2001, 2002, and 2015 are outliers, when the number of reviews dropped from the previous years.

A line chart showing number of reviews across years.

You can analyze the distribution of star ratings and visualize it using a pie chart. See the following code:

# Distribution of overall star ratings
product_ratings_dist = df.groupBy('star_rating').count().orderBy('count').toPandas()

plt.clf()
labels = [f"Star Rating: {rating}" for rating in product_ratings_dist['star_rating']]
reviews = [num_reviews for num_reviews in product_ratings_dist['count']]
colors = ['#00876c', '#89c079', '#fff392', '#fc9e5a', '#de425b']
fig, ax = plt.subplots(figsize=(8,5))
w,a,b = ax.pie(reviews, autopct='%1.1f%%', colors=colors)
plt.title('Distribution of star ratings for books')
ax.legend(w, labels, title="Star Ratings", loc="center left", bbox_to_anchor=(1, 0, 0.5, 1))

Print the pie chart using %matplot magic and visualize it from your notebook with the following code:

%matplot plt

The following pie chart shows that 80% of users gave a rating of 4 or higher. Approximately 10% of users rated their books 2 or lower. In general, customers are happy about their book purchases from Amazon.

Output shows pie chart depicting distribution of star ratings.

Lastly, use the uninstall_package PySpark API to uninstall the Pandas library that you installed earlier using the install_pypi_package API. This is useful in scenarios in which you want to use a different version of a library that you previously installed using EMR Notebooks. See the following code:

sc.uninstall_package('pandas')

You get the following output:

Output shows that “pandas” package is successfully uninstalled.

Next, run the following code:

sc.list_packages()

You get the following output:

Output shows list of Python packages along with their versions.

After you close the notebook, the Pandas and Matplotlib libraries that you installed on the cluster using the install_pypi_package API are garbage collected and removed from the cluster.

Using local Python libraries in EMR Notebooks

The notebook-scoped libraries discussed previously require your EMR cluster to have access to a PyPI repository. If you cannot connect your EMR cluster to a repository, use the Python libraries pre-packaged with EMR Notebooks to analyze and visualize your results locally within the notebook. Unlike the notebook-scoped libraries, these local libraries are only available to the Python kernel and are not available to the Spark environment on the cluster. To use these local libraries, export your results from your Spark driver on the cluster to your notebook and use the notebook magic to plot your results locally. Because you are using the notebook and not the cluster to analyze and render your plots, the dataset that you export to the notebook has to be small (we recommend less than 100 MB).

To see the list of local libraries, run the following command from the notebook cell:

%%local
conda list

You get a list of all the libraries available in the notebook. Because the list is rather long, this post doesn’t include them.

For this analysis, find out the top 10 children’s books from your book reviews dataset and analyze the star rating distribution for these children’s books.

You can identify the children’s books by using customers’ written reviews with the following code:

kids_books = (
df
.where("lower(review_body) LIKE '%child%' OR lower(review_body) LIKE '%kid%' OR lower(review_body) LIKE '%infant%' OR lower(review_body) LIKE '%baby%'")
.select("customer_id", "product_id", "star_rating", "product_title", "year")
)

Plot the top 10 children’s books by number of customer reviews with the following code:

top_10_book_titles = kids_books.groupBy('product_title') \
                       .count().orderBy('count', ascending=False) \
                       .limit(10)
top_10_book_titles.show(10, False)

You get the following output:

Output shows list of book titles with their corresponding review count.

Analyze the customer rating distribution for these books with the following code:

top_10 = kids_books.groupBy('product_title', 'star_rating') \
           .count().join(top_10_book_titles, ['product_title'], 'leftsemi') \
           .orderBy('count', ascending=False) 
top_10.show(truncate=False)

You get the following output:

Output shows list of book titles along with star ratings and review counts.

To plot these results locally within your notebook, export the data from the Spark driver and cache it in your local notebook as a Pandas DataFrame. To achieve this, first register a temporary table with the following code:

top_10.createOrReplaceTempView("top_10_kids_books")

Use the local SQL magic to extract the data from this table with the following code:

%%sql -o top_10 -n -1
SELECT product_title, star_rating, count from top_10_kids_books
GROUP BY product_title, star_rating, count
ORDER BY count Desc

For more information about these magic commands, see the GitHub repo.

After you execute the code, you get a user-interface to interactively plot your results. The following pie chart shows the distribution of ratings:

Output shows pie chart depicting distribution of star ratings.

You can also plot more complex charts by using the local Matplotlib and Seaborn libraries available with EMR Notebooks. See the following code:

%%local 
%matplotlib inline
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd
sns.set()
plt.clf()
top_10['book_name'] = top_10['product_title'].str.slice(0,30)
colormap = sns.color_palette("hls", 8)
pivot_df = top_10.pivot(index= 'book_name', columns='star_rating', values='count')
pivot_df.plot.barh(stacked=True, color = colormap, figsize=(15,11))
plt.title('Top 10 children books',fontsize=16)
plt.xlabel('Number of reviews',fontsize=14)
plt.ylabel('Book',fontsize=14)
plt.subplots_adjust(bottom=0.2)

You get the following output:

Output shows stacked bar chart for top 10 children books with star ratings and review counts.

Summary

This post showed how to use the notebook-scoped libraries feature of EMR Notebooks to import and install your favorite Python libraries at runtime on your EMR cluster, and use these libraries to enhance your data analysis and visualize your results in rich graphical plots. The post also demonstrated how to use the pre-packaged local Python libraries available in EMR Notebook to analyze and plot your results.

 


About the Author

Parag Chaudhari is a software development engineer at AWS.

 

 

 

Analyze Google Analytics data using Upsolver, Amazon Athena, and Amazon QuickSight

Post Syndicated from Roy Hasson original https://aws.amazon.com/blogs/big-data/analyze-google-analytics-data-using-upsolver-amazon-athena-and-amazon-quicksight/

In this post, we present a solution for analyzing Google Analytics data using Amazon Athena. We’re including a reference architecture built on moving hit-level data from Google Analytics to Amazon S3, performing joins and enrichments, and analyzing and visualizing the data using Amazon Athena and Amazon QuickSight. Upsolver is used for data lake automation and orchestration, enabling customers to get started quickly.

Google Analytics is a popular solution for organizations that want to understand the performance of their web properties and applications. Google Analytics data is collected and aggregated to help users extract insights quickly. This works great for simple analytics. It’s less than ideal, however, when you need to enrich Google Analytics data with other datasets to produce a comprehensive view of the customer journey.

Why analyze Google Analytics data on AWS?

Google Analytics has become the de-facto standard web analytics tool. It is offered for free at lower data volumes and provides tracking, analytics, and reporting.  It enables non-technical users to understand website performance by answering questions such as: where are users coming from? Which pages have the highest conversion rates? Where are users experiencing friction and abandoning their shopping cart?

While these questions can be answered within the Google Analytics UI, there are some limitations, such as:

  • Data sampling: Google Analytics standard edition displays sampled data when running ad hoc queries on time periods that contain more than 500,000 sessions. Large websites can easily exceed this number on a weekly or even daily basis. This can create reliability issues between different reports, as each query can be fed by a different sample of the data.
  • Difficulty integrating with existing AWS stack: Many customers have built or are in the process of building their data and analytics platform on AWS. Customers want to use the AWS analytics and machine learning capabilities with their Google Analytics data to enable new and innovative use cases.
  • Joining with external data sources: Seeing the full picture of a business’ online activity might require combining web traffic data with other sources. Google Analytics does not offer a simple way to either move raw data in or out of the system. Custom dimensions in Google Analytics can be used, but they are limited to 20 for the standard edition and are difficult to use.
  • Multi-dimensional analysis: Google Analytics custom reports and APIs are limited to seven dimensions per query. This limits the depth of analysis and requires various workarounds for more granular slicing and dicing.
  • Lack of alternatives: Google Analytics 360, which allows users to export raw data to Google BigQuery, carries a hefty annual fee. This can be prohibitive for organizations. And even with this upgrade, the native integration is only with BigQuery, which means users still can’t use their existing AWS stack.

Building or buying a new web analytics solution (including cookie-based tracking) is also cost-prohibitive, and can interrupt existing workflows that rely on Google Analytics data.

Customers are looking for a solution to enable their analysts and business users to incorporate Google Analytics data into their existing workflows using familiar AWS tools.

Moving Google Analytics data to AWS: Defining the requirements

To provide an analytics solution with the same or better level of reporting as Google Analytics, we designed our solution around the following tenets:

  1. Analytics with a low technical barrier to entry: Google Analytics is built for business users, and our solution is designed to provide a similar experience. This means that beyond ingesting the data, we want to automate the data engineering work that goes into making the data ready for analysis.  This includes data retention, partitioning, and compression. All of this work must be done under the hood and remain invisible to the user querying the data.
  2. Hit-level data: Google Analytics tracks clickstream activity based on Hits – the lowest level of interaction between a user and a webpage. These hits are then grouped into Sessions – hits within a given time period, and Users – groups of sessions (more details here). The standard Google Analytics API is limited to session and user-based queries, and does not offer any simple way of extracting hit-level data. Our solution, however, does provide access to this granular data.
  3. Unsampled data: By extracting the data from Google Analytics and storing it on Amazon S3, we are able to bypass the 500K sessions limitation. We also have access to unsampled data for any query at any scale.
  4. Data privacy: If sensitive data is stored in Google Analytics, relying on third-party ETL tools can create risks around data privacy, especially in the era of GDPR. Therefore, our solution encrypts data in transit and relies exclusively on processing within the customer’s VPC.

Solution overview

The solution is built on extracting hit-level data and storing it in a data lake architecture on Amazon S3. We then use Amazon Athena and Amazon QuickSight for analytics and reporting. Upsolver, an AWS premier solution provider, is used to automate ingestion, ETL, and data management on S3. Upsolver also orchestrates the entire solution with a simple-to-use graphical user interface. The following diagram shows the high-level architecture of our solution.

Reference architecture showing the flow of data across Google Analytics, Amazon Athena and Amazon QuickSight

Using Upsolver’s GA connector, we extract unsampled, hit-level data from Google Analytics. This data is then automatically ingested according to accepted data lake best practices and stored in an optimized form on Amazon S3. The following best practices are applied to the data (the first two are illustrated in the sketch after the list):

  • Store data in Apache Parquet columnar file format to improve read performance and reduce the amount of data scanned per query.
  • Partition data by event (hit) time rather than by API query time.
  • Perform periodic compaction, by which small files are merged into larger ones, improving performance and optimizing compression.
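
As a rough illustration of the first two practices, the following PySpark sketch writes data as Parquet partitioned by event date. The DataFrame name hits, the event_time column, and the S3 location are hypothetical placeholders; the Upsolver pipeline applies these optimizations for you, so this sketch only shows the pattern.

from pyspark.sql.functions import to_date

# Derive the partition column from the event (hit) time, not the extraction time
hits_partitioned = hits.withColumn("event_date", to_date("event_time"))

# Write columnar Parquet files, partitioned by event date
(hits_partitioned
    .write
    .mode("append")
    .partitionBy("event_date")
    .parquet("s3://your-data-lake-bucket/ga/hits/"))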

Once data is stored on S3, we use Upsolver’s GUI to create structured fact tables from the Google Analytics data. Users can query them using Amazon Athena and Amazon Redshift. Upsolver provides simple-to-use templates to help users quickly create tables from their Google Analytics data. Finally, we use Amazon QuickSight to create interactive dashboards to visualize the data.

The result is a complete view of our Google Analytics data. This view provides the level of self-service analytics that users have grown accustomed to, at any scale, and without the limitations outlined earlier.

Building the solution: Step by step guide

In this section, we walk through the steps to set up the environment, configure Upsolver’s Google Analytics plugin, extract the data, and begin exploring.

Step 1: Installation and permissions

  1. Sign up for Upsolver (can also be done via the AWS Marketplace).
  2. Allow Upsolver access to read data from Google Analytics and add new custom dimensions. Custom dimensions enable Upsolver to read non-sampled hit-level data directly from Google Analytics instead of creating parallel tracking mechanisms that aren’t as trustworthy.
  3. To populate the custom dimensions that were added to Google Analytics, allow Upsolver to run a small JavaScript code on your website. If you’re using GA360, this is not required.

Step 2: Review and clean the raw data

For supported data sources, Upsolver automatically discovers the schema and collects key statistics for every field in the table. Doing so gives users a glimpse into their data.

In the following screenshot, you can see schema-on-read information on the left side, stats per field and value distribution on the right side.

Screen shot of the Upsolver UI showing schema-on-read information on the left side, stats per field and value distribution on the right side

Step 3: Publishing to Amazon Athena

Upsolver comes with four templates for creating tables in your AWS based data lake according to the Google Analytics entity being analyzed:

  • Pageviews – used to analyze user flow and behavior on specific sections of the web property using metrics such as time on page and exit rate.
  • Events – user-defined interactions such as scroll depth and link clicks.
  • Sessions – monitor a specific journey in the web property (all pageviews and events).
  • Users – understand a user’s interaction with the web property or app over time.

All tables are partitioned by event time, which helps improve query performance.

Upsolver users can choose to run the templates as-is, modify them first or create new tables unique to their needs.

The following screenshot shows the schema produced by the Pageviews template:

Screen shot of the Upsolver UI showing the schema produced by the Pageviews template:

The following screenshot shows the Pageviews and Events tables as well as the Amazon Athena views for Sessions and Users generated by the Upsolver templates.

Screenshot showing the Pageviews and Events tables as well as the Athena views for Sessions and Users generated from the Upsolver templates.

The following are a couple example queries you may want to run to extract specific insights:

-- Popular page titles 
SELECT page_title, 
       Count(*) AS num_hits 
FROM   ga_hits_pageviews 
GROUP  BY page_title 
ORDER  BY 2 DESC 
-- User aggregations from hit data 
SELECT user_id, 
       Count(*)                   AS num_hits, 
       Count(DISTINCT session_id) AS num_of_sessions, 
       Sum(session_duration)      AS total_sessions_time 
FROM   ga_hits_pageviews 
GROUP  BY user_id 
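
If you prefer to submit these queries programmatically instead of from the Athena console, the following boto3 sketch shows one way to run the first query. The database name, S3 output location, and Region are placeholder assumptions that you would replace with your own values.

import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Submit the query; Athena writes the results to the S3 output location
response = athena.start_query_execution(
    QueryString=(
        "SELECT page_title, count(*) AS num_hits "
        "FROM ga_hits_pageviews GROUP BY page_title ORDER BY 2 DESC"
    ),
    QueryExecutionContext={"Database": "your_database"},
    ResultConfiguration={"OutputLocation": "s3://your-athena-results-bucket/"},
)
print(response["QueryExecutionId"])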

Step 4: Visualization in Amazon QuickSight

Now that the data has been ingested, cleansed, and written to S3 in a structured manner, we are ready to visualize it with Amazon QuickSight. Start by creating a dashboard to mimic the one provided by Google Analytics. But we don’t need to stop there. We can use QuickSight ML Insights to extract deeper insights from our data. We can also embed Amazon QuickSight visualizations into existing web portals and applications, making insights available to everyone.

Screenshot of QuickSight visualization showing several sections, one with a graph, several others with various statistics

Screen shot of QuickSight showing a global map with usage concentrations marked by bubbles, alongside a pie graph.

Screenshot of QuickSight showing a bar graph, alongside a table with various data values.

Conclusion

With minimal setup, we were able to extract raw hit-level Google Analytics data, prepare it, and store it in a data lake on Amazon S3. Using Upsolver, combined with Amazon Athena and Amazon QuickSight, we built a feature-complete solution for analyzing web traffic collected by Google Analytics on AWS.

Key technical benefits:

  • Schema-on-read means data consumers don’t need to model the data into a table structure, and can instantly understand what their top dimensions are. For example, 85% of my users navigate my website using the Google Chrome browser.
  • Graphical user interface that enables self-service consumption of Google Analytics data.
  • Fast implementation using pre-defined templates that map raw data from Google Analytics to tables in the data lake.
  • Ability to replay historical Google Analytics data stored on Amazon S3.
  • Ability to partition the data on Amazon S3 by hit time, reducing the complexity of handling late-arriving events.
  • Optimize data on Amazon S3 automatically for improved query performance.
  • Automatically manage tables and partitions in AWS Glue Data Catalog.
  • Fully integrated with a suite of AWS native services – Amazon S3, Amazon Athena, Amazon Redshift and Amazon QuickSight.

Now that we have feature parity, we can begin to explore integrating other data sources such as CRM, sales, and customer profile to build a true 360-degree view of the customer.  Furthermore, you can now begin using AWS Machine Learning services to optimize traffic to your websites, forecast demand and personalize the user experience.

We’d love to hear what you think. Please feel free to leave a comment with any feedback or questions you may have.

The content and opinions in this post are those of the third-party author and AWS is not responsible for the content or accuracy of this post.

 


About the Authors


Roy Hasson is the global business development lead of analytics and data lakes at AWS.
He works with customers around the globe to design solutions to meet their data processing, analytics and business intelligence needs. Roy is a big Manchester United fan, cheering his team on and hanging out with his family.

 

 

 

Eran Levy is the director of marketing at Upsolver.

 

 

 

 

Upgrade your resume with the AWS Certified Big Data — Specialty Certification

Post Syndicated from Tina Kelleher original https://aws.amazon.com/blogs/big-data/upgrade-your-resume-with-the-aws-certified-big-data-specialty-certification/

While most cloud computing professionals are aware of the Foundational, Associate, and Professional AWS Certifications, it’s worth mentioning that AWS also offers specialty certifications. Anyone pursuing a career that includes data analysis, data lakes, and data warehouse solutions is a solid candidate to earn the AWS Certified Big Data — Specialty certification.

The AWS Certified Big Data — Specialty certification is a great option to help grow your career. AWS Certification shows prospective employers that you have the technical skills and expertise required to perform complex data analyses using core AWS Big Data services like Amazon EMR, Amazon Redshift, Amazon QuickSight, and more. This certification validates your understanding of data collection, storage, processing, analysis, visualization, and security.

You can learn more about the full range of industry-recognized credentials that AWS offers on the AWS Certification page.

Recommended knowledge and experience

In addition to having a solid passion for cloud computing, it’s recommended that those interested in taking the AWS Certified Big Data — Specialty exam meet the following criteria:

You can find a complete list of recommended knowledge and the exam content covered in the Exam Guide. If you can tick the boxes on each of these criteria, then you’re ready to start preparing for the AWS Certified Big Data — Specialty exam.

Recommended resources

While there are no training completion requirements, AWS offers several options to help you prepare for the exam with best practices and technical skill checks to self-assess your readiness.

In addition to these exam prep resources, you might also find useful information on the Getting Started with Big Data on AWS and Learn to Build on AWS: Big Data pages.

Conclusion

In this post, I provided an overview of the value in earning the AWS Certified Big Data — Specialty certification. I covered the recommended knowledge that is a strong indicator of having reached a level of experience that qualifies you as a solid candidate for this AWS certification. I also provided training resources to help you brush up on your knowledge of AWS Big Data services.

For more information on the training programs offered by AWS, visit the AWS Digital and Classroom Training Overview page. You might also find helpful information on the AWS Training FAQs page.

If you have any feedback or questions, please leave a comment… and good luck on your exam!

 


About the Author

Tina Kelleher is a program manager at AWS.

 

 

 

Secure your Amazon EMR cluster from unintentional network exposure with Block Public Access configuration

Post Syndicated from Vignesh Rajamani original https://aws.amazon.com/blogs/big-data/secure-your-amazon-emr-cluster-from-unintentional-network-exposure-with-block-public-access-configuration/

AWS security groups act as a network firewall that allows you to restrict access to your cluster to only whitelisted IP addresses. Proper management of security group rules is critical to protecting your application and data on the cluster. Amazon EMR strongly recommends creating restrictive security group rules that include the necessary network ports, protocols, and IP addresses based on your application requirements.

While AWS account administrators can protect cloud network security in different ways, a new feature helps them prevent account users from launching clusters with misconfigured security group rules. Misconfiguration can open a broad range of cluster ports to unrestricted traffic from the public internet and expose cluster resources to outside threats.

This post discusses a new account level feature called Block Public Access (BPA) configuration that helps administrators enforce a common public access rule across all of their EMR clusters in a region.

Overview of Block Public Access configuration

BPA configuration is an account-level configuration that helps you centrally manage public network access to EMR clusters in a region. You can enable this configuration in a region and block your account users from launching clusters that allow unrestricted inbound traffic from any public IP address (source set to 0.0.0.0/0 for IPv4 and ::/0 for IPv6) through their ports. Your applications may require specific ports to be open to the internet. In that case, configure these ports (or port ranges) in the BPA configuration as exceptions to allow public access before you launch clusters.

When account users launch clusters in the region where you have enabled BPA configuration, EMR checks the port rules defined in this configuration and compares them with the inbound traffic rules specified in the security groups associated with the clusters. If these security groups have inbound rules that open ports to public IP addresses but you did not configure these ports as exceptions in the BPA configuration, EMR fails the cluster creation and sends an exception to the user.

Enabling BPA configuration from the AWS Management Console

To enable BPA configuration, you need permission to call the PutBlockPublicAccessConfiguration API.

  • Log in to the AWS Management Console. From the console, navigate to the Amazon EMR console.
  • From the navigation panel, choose Block Public Access.
  • Choose Change and select On to enable BPA.

By default, all ports are blocked except port 22 for SSH traffic. To allow more ports for public access, add them as exceptions.

  • Choose Add a port range.

Before launching your cluster, define these exceptions. These should be the only port numbers or ranges in the security group rules that have an inbound source IP address of 0.0.0.0/0 for IPv4 or ::/0 for IPv6.

  • Enter a port number or the range of ports for public access.
  • Choose Save Changes.

Block public access section with the "Change" hyperlink circled in red.

Block public access settings, under Exceptions section +Add a port range circled in red.

For information about configuring BPA using the AWS CLI, see Configure Block Public Access.
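
As a rough sketch of the equivalent programmatic call, the following boto3 snippet enables BPA for the account in a Region and allows only port 22 as an exception. Treat the Region and the port exception as assumptions to adjust for your own requirements.

import boto3

emr = boto3.client("emr", region_name="us-east-1")

# Turn on Block Public Access for this account and Region,
# allowing only port 22 (SSH) as an exception
emr.put_block_public_access_configuration(
    BlockPublicAccessConfiguration={
        "BlockPublicSecurityGroupRules": True,
        "PermittedPublicSecurityGroupRuleRanges": [
            {"MinRange": 22, "MaxRange": 22}
        ],
    }
)

# Verify the current configuration
print(emr.get_block_public_access_configuration())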

Summary

In this post, we discussed a new account-level feature on Amazon EMR called Block Public Access (BPA) configuration that helps administrators manage public access to their EMR clusters. You can enable BPA configuration today and prevent your EMR clusters in a region from being unintentionally exposed to the public network.

 


About the Author

Vignesh Rajamani is a senior product manager for EMR at AWS.

 

Federate Amazon QuickSight access with Okta

Post Syndicated from Loc Trinh original https://aws.amazon.com/blogs/big-data/federate-amazon-quicksight-access-with-okta/

Amazon QuickSight is a fast, cloud-powered business intelligence service that makes it easy to deliver insights to everyone in your organization. As a fully managed service, Amazon QuickSight lets you easily create and publish interactive dashboards that can then be accessed from any device and embedded into your applications, portals, and websites.

Amazon QuickSight supports identity federation through Security Assertion Markup Language 2.0 (SAML 2.0) in both Standard and Enterprise editions. With federation, you can manage users using your enterprise identity provider (IdP) and pass them to Amazon QuickSight at log-in. Such IdPs include Microsoft Active Directory Federation Services, Ping One Federation Server, and Okta.

This post provides step-by-step guidance for how to use Okta to federate access to Amazon QuickSight.

Create an Okta application

Sign in to your Okta admin dashboard. You can create a free Okta Developer Edition account.

  1. From the Okta admin dashboard ribbon, choose Applications.
  2. If you are viewing the Developer Console, switch to Classic UI, as shown in the following screenshot.
  3. Choose Add Application.
  4. Search for Amazon Web Services and choose Add.
  5. Rename Application label to Amazon QuickSight and choose Next.
  6. For Sign-On Options, choose SAML 2.0.
  7. For Default Relay State, type https://quicksight.aws.amazon.com.
  8. Right-click on Identity Provider metadata and choose Save Link As…
  9. Save the XML file to disk and choose Done. You need to use this file in the next steps.

Create a SAML provider in AWS

Open a new window and sign in to the AWS Management Console.

  1. Open the IAM console.
  2. In the navigation pane, choose Identity Providers, Create Provider.
  3. For Provider Type, choose SAML and provide a Provider Name (for example, Okta).
  4. For Metadata Document, upload the XML file from the previous steps.
  5. Choose Next Step, Create.
  6. Locate the IdP that you just created and make note of the Provider ARN.

Create a role for federated users

This section describes the steps for creating an IAM SAML 2.0 federation role. While Okta is used for single sign-on, there are two ways to provision users in Amazon QuickSight:

  • Grant the federation role permission to create new Amazon QuickSight users when a user visits for the first time.
  • Pre-provision Amazon QuickSight users using the API and add users to the appropriate groups. This is preferred for adding users to groups within Amazon QuickSight, because you can provision the user and add them to the groups at the same time.

The following steps demonstrate how to create a federation role with permission to create new Amazon QuickSight users. If you would rather pre-provision Amazon QuickSight users, instructions for using the API are at the end of this post.

  1. Open the IAM console.
  2. In the navigation pane, choose Roles, Create Role, Select type of trusted entity as SAML 2.0 federation.
  3. For SAML provider, select the IdP that you created in the previous steps (Okta).
  4. Select Allow programmatic and AWS Management Console access.
  5. Choose Next: Permissions, Create policy.
  6. In the Create policy window, navigate to the JSON tab and type the following:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "sts:AssumeRoleWithSAML",
                "Resource": "<YOUR SAML IDENTITY PROVIDER ARN>",
                "Condition": {
                    "StringEquals": {
                        "saml:aud": "https://signin.aws.amazon.com/saml"
                    }
                }
            },
            {
                "Action": [
                    "quicksight:CreateReader"
                ],
                "Effect": "Allow",
                "Resource": [
                    "arn:aws:quicksight::<YOUR ACCOUNT ID>:user/${aws:userid}"
                ]
            }
        ]
    }

The IAM policy above grants the federation role permission to self-provision an Amazon QuickSight reader with the quicksight:CreateReader action. Best practice is to grant users in your organization reader access, and then upgrade users from within the application. Instructions for upgrading users are at the end of this post.

If you would rather pre-provision Amazon QuickSight users using the API, do not include any actions in the permission policy.

  1. Choose Review Policy.
  2. For Name, enter a value (for example, QuicksightOktaFederatedPolicy) and choose Create policy.
  3. On the Create role page, choose Refresh and select your new policy.
  4. Choose Next: Tags and Next: Review.
  5. Provide a Role name (for example, QuicksightOktaFederatedRole) and Role description.
  6. Choose Create role.

Create an AWS access key for Okta

To create an access key for Okta, follow these steps.

  1. Open the IAM console.
  2. In the navigation pane, choose Users, Add user.
  3. For User name, enter a value (for example, OktaSSOUser).
  4. For Access type, choose Programmatic access.
  5. Choose Next: Permissions, Attach existing policies directly, and Create policy.
  6. On the Create policy page, navigate to the JSON tab and type the following:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "iam:ListRoles",
                    "iam:ListAccountAliases"
                ],
                "Resource": "*"
            }
        ]
    }

  7. Choose Review Policy.
  8. For a Name, enter a value (for example, OktaListRolesPolicy) and choose Create policy.
  9. On the Create user page, choose Refresh and select your new policy.
  10. Choose Next: Tags, Next: Review, and Create user.
  11. To save your access key and secret key, choose Download .csv.

Configure the Okta application

Return to the window with your Okta admin dashboard.

  1. For Identity Provider ARN (Required only for SAML SSO), provide the ARN (for example, arn:aws:iam::<YOUR ACCOUNT ID>:saml-provider/Okta) of the IdP that you created in previous steps.
  2. Choose Done.
  3. From the Applications dashboard, choose Provisioning.
  4. Choose Configure API Integration.
  5. Select Enable API Integration.
  6. For Access Key and Secret Key, provide the access key and secret key that you downloaded in previous steps.
  7. Choose Test API Credentials, and then choose Save.
  8. From the SETTINGS pane, navigate to To App.
  9. Choose Edit.
  10. Enable Create Users and choose Save.
  11. Choose Assignments, Assign and then select the users or groups to which to grant federated access.
  12. Select the Roles and SAML User Roles to grant to the users, as shown in the following screenshot.
  13. Choose Save and Go Back, Done.

Launch Amazon QuickSight

Log in to your Okta Applications dashboard with a user (if you are using an admin account, switch to user mode) that has been granted federated access. You should see a new application with your label (for example, Amazon QuickSight). Choose the application icon to launch Amazon QuickSight.

You can now manage your users and groups using Okta as your IdP and federate access to Amazon QuickSight.

Pre-provisioning Amazon QuickSight users

The outlined steps demonstrate how to grant users permission to self-provision Amazon QuickSight users when they visit Amazon QuickSight for the first time. If you would rather pre-provision Amazon QuickSight users, you can use the API to create users and groups and then add users to those groups.

  1. To create an Amazon QuickSight user, run the following AWS CLI command. Link the Amazon QuickSight user to your federated Okta username by providing the user-name parameter in the format <role name>\<email> (for example, QuicksightOktaFederatedRole\[email protected]).
    aws quicksight register-user \
        --aws-account-id=<YOUR ACCOUNT ID> \
        --namespace=default \
        --email=<[email protected]> \
        --user-name=<ROLE NAME>\<[email protected]> \
        --identity-type=QUICKSIGHT \
        --user-role=READER

  2. Optionally, create an Amazon QuickSight group.
    aws quicksight create-group \
        --aws-account-id=<YOUR ACCOUNT ID> \
        --namespace=default \
        --group-name="<YOUR GROUP NAME>" \
        --description="<YOUR GROUP DESCRIPTION>"

  3. Add users to groups.
    aws quicksight create-group-membership \
        --aws-account-id=<YOUR ACCOUNT ID> \
        --namespace=default \
        --group-name="<YOUR GROUP NAME>" \
        --member-name="<YOUR MEMBER USER NAME>"

By using the Amazon QuickSight API, you can manage users, groups, and group membership. After they’re created, groups automatically become available for use when modifying permissions to data sets, analyses, or dashboards by typing in the group name instead of a specific user. For other supported group and user management functions, see List of Actions by Function.
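
For example, the following boto3 sketch lists the users and groups in the default namespace so that you can verify the provisioning steps above. The account ID and Region values are placeholders for your own settings.

import boto3

quicksight = boto3.client("quicksight", region_name="us-east-1")
account_id = "<YOUR ACCOUNT ID>"

# List provisioned users in the default namespace
users = quicksight.list_users(AwsAccountId=account_id, Namespace="default")
for user in users["UserList"]:
    print(user["UserName"], user["Role"])

# List groups in the default namespace
groups = quicksight.list_groups(AwsAccountId=account_id, Namespace="default")
for group in groups["GroupList"]:
    print(group["GroupName"])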

Managing users

You can upgrade users between reader and author or admin in the Manage users tab of the Manage QuickSight screen.

  1. In the Amazon QuickSight console, choose your user name in the upper-right corner and choose Manage QuickSight.
  2. In the navigation pane, choose Manage users.
  3. Locate the user to upgrade, and select the role to grant from the Role drop-down menu.

Deep-linking dashboards

Amazon QuickSight dashboards can be shared using the Okta application’s single sign-on URL so that users can be federated directly to specific dashboards.

To deep link to a specific Amazon QuickSight dashboard with single sign-on, first locate the Okta application’s single sign-on URL. This can be found by opening the metadata XML file that you downloaded in the Create an Okta application steps above. The URL is the value of the Location attribute in the md:SingleSignOnService element and ends with /sso/saml.

After you have the Okta application’s single sign-on URL, append ?RelayState= to the end of the URL followed by the URL to your Amazon QuickSight dashboard. For example, your deep link URL might look as follows:

https://my-test-org.okta.com/app/amazon_aws/abcdefg12345XYZ678/sso/saml?RelayState=https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/11111111-abcd-1234-efghi-111111111111

By deep-linking dashboards, you can provide users with a way to use single sign-on and directly access specific dashboards.
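
If you generate many deep links, a small helper like the following can build them for you. It URL-encodes the RelayState value, which is a safe practice even though the unencoded form above typically works; the URLs shown are the same placeholders used earlier.

from urllib.parse import quote

def build_deep_link(okta_sso_url, dashboard_url):
    # Append the dashboard URL as an encoded RelayState query parameter
    return f"{okta_sso_url}?RelayState={quote(dashboard_url, safe='')}"

link = build_deep_link(
    "https://my-test-org.okta.com/app/amazon_aws/abcdefg12345XYZ678/sso/saml",
    "https://us-east-1.quicksight.aws.amazon.com/sn/dashboards/11111111-abcd-1234-efghi-111111111111",
)
print(link)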

Summary

This post provided a step-by-step guide for configuring Okta as your IdP, and using IAM roles to enable single sign-on to Amazon QuickSight. It also showed how users and groups can be managed using the Amazon QuickSight API.

Although this post demonstrated the integration of IAM and Okta, you can replicate this solution using your choice of SAML 2.0 IdPs. For other supported federation options, see Enabling Single Sign-On Access to Amazon QuickSight Using SAML 2.0.

If you have any questions or feedback, please leave a comment.

 


About the Authors

Loc Trinh is a solutions architect at Amazon Web Services.

 

 

 

 

Naresh Gautam is a senior solutions architect at Amazon Web Services.

 

 

 

Create advanced insights using Level Aware Aggregations in Amazon QuickSight

Post Syndicated from Arun Baskar original https://aws.amazon.com/blogs/big-data/create-advanced-insights-using-level-aware-aggregations-in-amazon-quicksight/

Amazon QuickSight recently launched Level Aware Aggregations (LAA), which enables you to perform calculations on your data to derive advanced and meaningful insights. In this blog post, we go through examples of applying these calculations to a sample sales dataset so that you can start using these for your own needs.

What are Level Aware Aggregations?

Level aware aggregations are aggregation calculations that can be computed at a desired level in the overall query evaluation order of QuickSight. Please check this link for details on QuickSight’s Order of Evaluation. Until now, the only types of aggregations possible in QuickSight were display-level aggregations and table calculations.

  • Display-level aggregations are aggregations that are defined by the dimensions and metrics present in the field wells of a QuickSight visual.
  • Table calculations are computed by windowing/rolling-up over the display-level aggregated values of the visual. Hence, by definition, these are calculated after the Display-level aggregations are computed.

With Level Aware Aggregations, QuickSight now allows you to aggregate values before the Display-level aggregation. For more information, please visit the Level Aware Aggregations documentation.

Customer use cases

Distribution of customers by lifetime orders

Customer question: How many customers have made one order, two orders, three orders, and so forth?

In this case, we first want to aggregate the total number of orders made by each customer, then use the output of that as a visual dimension. This isn’t feasible to compute without LAA.

Solution using LAA

1.) Compute the number of orders per customer.

Calculated field name : NumberOrdersPerCustomer

Calculated field expression : countOver({order_id}, [{Customer Id}], PRE_AGG)

This computes the number of orders per customer, before the display-level aggregation of the visual.

2.) Create the visual.

Create the visual with the above field NumberOrdersPerCustomer in the “X-Axis” well of the Field Wells. Add “Count Distinct” of “Customer Id” in the “Value” section of the Field Wells to create a histogram on the number of orders made by customers.

As we can see, there are around 5000 unique customers with one order, around 3500 customers with two orders, and so on.
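
Conceptually, the PRE_AGG calculation in step 1 behaves like a window function applied to the unaggregated rows before the visual’s group-by. The following pandas sketch is only a loose analogy (it is not how QuickSight evaluates the expression) and uses a small, made-up dataset.

import pandas as pd

orders = pd.DataFrame({
    "customer_id": ["c1", "c1", "c2", "c3", "c3", "c3"],
    "order_id":    [1, 2, 3, 4, 5, 6],
})

# PRE_AGG step: count orders per customer on the unaggregated rows
orders["orders_per_customer"] = (
    orders.groupby("customer_id")["order_id"].transform("count")
)

# Display-level step: count distinct customers for each order-count bucket
histogram = orders.groupby("orders_per_customer")["customer_id"].nunique()
print(histogram)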

Filter out customers based on lifetime spends

Customer Question: How do I filter out customers with life-time spend less than $100,000? My visual’s dimension (group by) and metric definitions are independent of total spend per customer.

If the group dimensions of the aforementioned aggregation (spend) are exactly the same as the group dimensions in the field well, the customer can achieve this using the aggregated filters feature. But that’s not always the case. As mentioned in the customer question, the visual’s definition can be different from the filter’s aggregation.

Solution using LAA

1.) Compute sum of sales per customer.

Calculated field name :  salesPerCustomer

Calculated field expression : sumOver(sales,[{customer_id}],PRE_AGG)

PRE_AGG indicates that the computation must occur before display-level aggregation.

2.) Create the visuals.

The visual on the left shows sum of sales per segment and the visual on the right shows the total number of customers. Note that there are no filters applied at this point.

3.) Create the filter on salesPerCustomer. 

Create a filter on top of the above field salesPerCustomer to select items greater than $100,000.

4.) Apply the filter.

The above image shows applying the filter on “salesPerCustomer” greater than $100,000.

With the filter applied, we have excluded the customers whose total spend is less than $100,000, regardless of what we choose to display in the visuals.

Fixed percent of total sales even with filters applied

Customer Question: How much is the contribution of each industry to the entire company’s profit (percent of total)? I don’t want the total to recompute when filters are applied.

The existing table calculation function percentOfTotal isn’t able to solve this problem, since filters on categories are applied before computing the total. Using percentOfTotal would recalculate the total every time filters are applied. We need a solution that doesn’t consider the filtering when computing the total.

Solution using LAA

1.) Compute total sales before filters through a calculated field.

Calculated field name : totalSalesBeforeFilters

Calculated field expression : sumOver(sales,[],PRE_FILTER)

PRE_FILTER indicates that this computation must be done prior to applying filters.

The partition dimension list (second argument) is empty since we want to compute the overall total.

2.) Compute the fixed percent of total sales.

Calculated field name : fixedPercentOfTotal

Calculated field expression : sum(sales) / min(totalSalesBeforeFilters)

Note: totalSalesBeforeFilters is the same for every row of the unaggregated data. Since we want to use it post-aggregation, we are using the aggregation min on top of it. If all values are the same, the max or avg aggregations can be used as well, as they serve the same purpose.

3.) Create the visual.

Add “industry” field to “Rows” well. Add “sales (SUM)” and “fixedPercentOfTotal“ to the ”values“ section. Now, the percent of total metric would remain fixed even if we filter out the data based on any underlying dimension or measure.

The visual shows sales per industry along with percent of total, computed using the table calculation percentOfTotal and using Level Aware Aggregation as described above. Both the percent of total values are currently the same since there aren’t any filters applied.

The visual shows the same metrics but with industries filtered only to 5 of them. As we can see “Percent of total sales” got re-adjusted to represent only the filtered data, whereas “Fixed Percent of total sales” remains the same even after filtering. Both the metrics are valuable customer use cases now feasible through QuickSight.

Compare sales in a category to industry average

Customer question: How do I compare sales in a category to the industry average? I want the industry average to include all categories even after filtering.

Since we want the industry average to stay fixed even with filtering, we need PRE_FILTER aggregation to achieve this.

Solution using LAA

1.) Compute the industry average.

Calculated field name : IndustryAverage

Calculated field expression : avgOver(sumOver(sales,[{category}],PRE_FILTER),[],PRE_FILTER)

We first compute the sum of sales per category and then average it across all categories. It’s important to note here that we first computed a finer level aggregation and fed that into a coarser level aggregation.

2.) Compute the difference from IndustryAverage.

Calculated field name : FixedDifferenceFromIndustryAverage

Calculated field expression : sum(sales) - min(IndustryAverage)

As mentioned in one of the examples above, we use the min aggregation to carry this per-row value through the display-level aggregation.

3.) Create the visual.

Create the visual by adding “Category” in “X axis” field well and SUM(Sales), IndustryAverage and FixedDifferenceFromIndustryAverage as the values in a bar chart.

Visual shows total sales per category, the average across all industries and each category’s difference from average.

This visual shows the same metrics, but with categories filtered to include only 6 of them. As we can see, the industry average remained the same before and after filtering, keeping the difference the same whether you choose to show all categories, some of them, or just one.

Categorize customers based on lifetime spend

Customer question: How do I classify customers based on cumulative sales contribution? I then want to use that classification as my visual’s grouping.

The objective here is to create custom-sized bins to classify the customers. Even though we could do this classification post display-level aggregation, we wouldn’t be able to use it as a dimension/group by in the visual.

Solution using LAA

1.) Compute sales per customer before display-level aggregation.

Calculated field name : salesPerCustomer

Calculated field expression : sumOver({sales amount},[{customer id}],PRE_AGG)

2.) Categorize Customers.

Calculated field name : Customer Category

Calculated field expression : ifelse(salesPerCustomer < 1000, "VERY_LOW", salesPerCustomer < 10000, "LOW", salesPerCustomer < 100000, "MEDIUM", "HIGH")

3.) Create the visual.

Create the visual by adding “Customer Category” to the “Y-axis” field well, “Count Distinct” of “customer id” to the value field well.

The above image shows the number of unique customers per customer category.

Filtering can be done on top of these categories as well to build other relevant visuals, since the categories are tagged before aggregation.

The above image shows the number of unique customers per customer category, split by gender.

Availability

Level aware aggregations are available in both Standard and Enterprise editions, in all supported AWS Regions. For more information, see the Amazon QuickSight documentation.

 


About the Author

Arun Baskar is a software development engineer for QuickSight at Amazon Web Services.

 

 

 

 

Implement perimeter security in EMR using Apache Knox

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/implement-perimeter-security-in-emr-using-apache-knox/

Perimeter security helps secure Apache Hadoop cluster resources for users accessing them from outside the cluster. It enables a single access point for all REST and HTTP interactions with Apache Hadoop clusters and simplifies client interaction with the cluster. For example, client applications must acquire Kerberos tickets using Kinit or SPNEGO before interacting with services on Kerberos-enabled clusters. In this post, we walk through the setup of Apache Knox to enable perimeter security for EMR clusters.

It provides the following benefits:

  • Simplify authentication of various Hadoop services and UIs
  • Hide service-specific URL’s/Ports by acting as a Proxy
  • Enable SSL termination at the perimeter
  • Ease management of published endpoints across multiple clusters

Overview

Apache Knox

Apache Knox provides a gateway to access Hadoop clusters using REST API endpoints. It simplifies client’s interaction with services on the Hadoop cluster by integrating with enterprise identity management solutions and hiding cluster deployment details.

In this post, we run the following setup:

  • Create a virtual private cloud (VPC) based on the Amazon VPC service.
  • Provision an Amazon EC2 Windows instance for the Active Directory domain controller.
  • Create an Amazon EMR security configuration for Kerberos and cross-realm trust.
  • Set up Knox on the EMR master node and enable LDAP authentication.

Visually, we are creating the following resources:

Figure 1: Provisioned infrastructure from CloudFormation

Prerequisites and assumptions

Before getting started, the following prerequisites must be met:

IMPORTANT: The templates use hardcoded user names and passwords, and open security groups. They are not intended for production use without modification.

NOTE:

  • Single VPC has been used to simplify networking
  • CloudFormation templates use hardcoded user names and passwords and open security groups for simplicity.

Implementation

Single-click solution deployment

If you don’t want to set up each component individually, you can use the single-step AWS CloudFormation template. The single-step template is a master template that uses nested stacks (additional templates) to launch and configure all the resources for the solution in one operation.

To launch the entire solution, click the Launch Stack button below, which directs you to the console. Do not change to a different Region, because the template is designed to work only in the US-EAST-1 Region.

This template requires several parameters that you must provide. See the list below, noting the parameters marked with *, for which you have to provide values. The remaining parameters have default values and should not be edited.

  1. Domain Controller Name – DC1
  2. Active Directory domain – awsknox.com
  3. Domain NetBIOS name – AWSKNOX (the NetBIOS name of the domain, up to 15 characters).
  4. Domain admin user – User name for the account to be added as Domain administrator (awsadmin).
  5. Domain admin password * – Password for the domain admin user. Must be at least eight characters containing letters, numbers, and symbols – for example, CheckSum123.
  6. Key pair name * – Name of an existing EC2 key pair to enable access to the domain controller instance.
  7. Instance type – Instance type for the domain controller EC2 instance.
  8. LDAP Bind user name – LDAP Bind user name. Default value is: CN=awsadmin,CN=Users,DC=awsknox,DC=com
  9. EMR Kerberos realm – EMR Kerberos realm name. This is usually the VPC’s domain name in upper case letters, for example, EC2.INTERNAL.
  10. Cross-realm trust password * – Password for cross-realm trust, for example, CheckSum123.
  11. Trusted Active Directory Domain – The Active Directory domain that you want to trust. This is the same as the Active Directory domain name, but in upper case letters. Default value is “AWSKNOX.COM”.
  12. Instance type – Instance type for the EMR cluster. Default: m4.xlarge
  13. Instance count – Number of core instances of the EMR cluster. Default: 2
  14. Allowed IP address – The client IP address that can reach your cluster. Specify an IP address range in CIDR notation (for example, 203.0.113.5/32). By default, only the VPC CIDR (10.0.0.0/16) can reach the cluster. Be sure to add your client IP range so that you can connect to the cluster using SSH.
  15. EMR applications – Comma-separated list of applications to install on the cluster. By default it selects “Hadoop,” “Spark,” “Ganglia,” “Hive,” and “HBase”.
  16. LDAP search base – LDAP search base. Only value is: “CN=Users,DC=awshadoop,DC=com”
  17. LDAP search attribute – Provide the LDAP user search attribute. Only value is: “sAMAccountName”
  18. LDAP user object class – Provide the LDAP user object class value. Only value is: “person”
  19. LDAP group search base – Provide the LDAP group search base value. Only value is: “dc=awshadoop, dc=com”
  20. LDAP group object class – Provide the LDAP group object class. Only value is: “group”
  21. LDAP member attribute – Provide the LDAP member attribute. Only value is: “member”
  22. EMRLogDir * – Provide an Amazon S3 bucket where the EMR logs are stored. Also provide “s3://” as the prefix.
  23. S3 Bucket – Amazon S3 bucket where the artifacts are stored. In this case, all the artifacts are stored in the “aws-bigdata-blog” public S3 bucket. Do not change this value.

Deploying each component individually

If you used the CloudFormation template in the single-step solution, you can skip this section and start from the Accessing the cluster section. This section describes how to use AWS CloudFormation templates to perform each step of the solution separately.

1.     Create and configure an Amazon VPC

In this step, we set up an Amazon VPC, a public subnet, an internet gateway, a route table, and a security group.

In order for you to establish a cross-realm trust between an Amazon EMR Kerberos realm and an Active Directory domain, your Amazon VPC must meet the following requirements:

  • The subnet used for the Amazon EMR cluster must have a CIDR block of fewer than nine digits (for example, 10.0.1.0/24).
  • Both DNS resolution and DNS hostnames must be enabled (set to “yes”).
  • The Active Directory domain controller must be the DNS server for instances in the Amazon VPC (this is configured in the next step).

To launch directly through the console, choose Launch Stack.

2.     Launch and configure an Active Directory domain controller

In this step, you use an AWS CloudFormation template to automatically launch and configure a new Active Directory domain controller and cross-realm trust.

Next, launch a Windows EC2 instance and install and configure an Active Directory domain controller. In addition to launching and configuring an Active Directory domain controller and cross-realm trust, this AWS CloudFormation template also sets the domain controller as the DNS server (name server) for your Amazon VPC.

To launch directly through the console, choose Launch Stack.

3.     Launch and configure EMR cluster with Apache Knox

To launch a Kerberized Amazon EMR cluster, we first must create a security configuration containing the cross-realm trust configuration. For more details, see the blog post Use Kerberos Authentication to Integrate Amazon EMR with Microsoft Active Directory.

In addition to the steps described in that post, this template adds a step to the EMR cluster that creates a Kerberos principal for Knox.

The CloudFormation script also updates several parameters in the core-site.xml, hive-site.xml, hcatalog-webhcat-site.xml, and oozie-site.xml files; you can see these in the create_emr.py script. After the EMR cluster is created, the template runs a shell script as an EMR step. This shell script downloads and installs Knox on the EMR master node and creates a Knox topology file named emr-cluster-top.

To launch directly through the console, choose Launch Stack.

Accessing the cluster

API access to Hadoop Services

One of the main reasons to use Apache Knox is to isolate the Hadoop cluster from direct user connectivity. Below, we demonstrate how you can interact with several Hadoop services, such as WebHDFS, WebHCat, Oozie, HBase, Hive, and YARN, through the Knox endpoint using REST API calls. The REST calls can be made from the EMR cluster or from outside it. However, in a production environment, the EMR cluster's security groups should allow traffic only on Knox's port number, blocking traffic to all other applications (see the sketch below).
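
The following AWS CLI command is a minimal sketch of restricting client access to the Knox port; the security group ID and client CIDR are placeholders for your EMR master security group and client IP range, and 8449 is the Knox port used in this setup. You would pair this with removing or tightening any broader inbound rules on the same security group.

# Allow client traffic only on the Knox gateway port (8449 in this setup)
$ aws ec2 authorize-security-group-ingress \
    --group-id <emr-master-security-group-id> \
    --protocol tcp \
    --port 8449 \
    --cidr <client-cidr>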

For the purposes of this blog, we make the REST calls on the EMR cluster by SSH'ing to the master node of the EMR cluster using the LDAP credentials:

ssh awsadmin@<EMR-Master-Machine-Public-DNS>

Replace <EMR-Master-Machine-Public-DNS> with the public DNS name of the EMR cluster's master node. Find this value in the Outputs tab of the CloudFormation stack you deployed in Step 3 above.

You are prompted for the ‘awsadmin’ LDAP password. Please use the password you selected during the CloudFormation stack creation.

NOTE: In order to connect, your client machine's IP address must fall within the CIDR range specified by the "Allowed IP address" CloudFormation parameter. If you cannot connect to the master node, check that the EMR master instance's security group has a rule that allows traffic from your client. Otherwise, your organization's firewall may be blocking your traffic.

Demonstrating access to the WebHDFS service API:

Here we invoke the LISTSTATUS operation on WebHDFS through the Knox gateway. In our setup, Knox runs on port 8449. The following command returns a directory listing of the HDFS root directory.

curl -ku awsadmin 'https://localhost:8449/gateway/emr-cluster-top/webhdfs/v1/?op=LISTSTATUS'

You can use either "localhost" or the private DNS name of the EMR master node.

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.

Demonstrating access to the Resource Manager service API:

The Resource Manager REST API provides information about the Hadoop cluster, such as cluster status and the applications running on it. Use the following command to get the cluster information.

curl -ikv -u awsadmin -X GET 'https://localhost:8449/gateway/emr-cluster-top/resourcemanager/v1/cluster'

You are prompted for the password. This is the same “Domain admin password” that was passed as the parameter into the CloudFormation stack.

Demonstrating connecting to Hive using Beeline through Apache Knox:

We can use Beeline, a JDBC client tool, to connect to HiveServer2. Here we connect to Hive through Knox.

Use the following command to start the Beeline shell:

$ beeline

Use the following syntax to connect to Hive from Beeline:

!connect jdbc:hive2://<EMR-Master-Machine-Public-DNS>:8449/;transportMode=http;httpPath=gateway/emr-cluster-top/hive;ssl=true;sslTrustStore=/home/knox/knox/data/security/keystores/gateway.jks;trustStorePassword=CheckSum123

NOTE: You must update the <EMR-Master-Machine-Public-DNS> with the public DNS name of the EMR master node.

Demonstrating submitting a Spark job using Apache Livy through Apache Knox

You can use the following command to submit a Spark job to the EMR cluster. In this example, we run the SparkPi program available in spark-examples.jar.

curl -i -k -u awsadmin -X POST --data '{"file": "s3://aws-bigdata-blog/artifacts/aws-blog-emr-knox/spark-examples.jar", "className": "org.apache.spark.examples.SparkPi", "args": ["100"]}' -H "Content-Type: application/json" https://localhost:8449/gateway/emr-cluster-top/livy/v1/batches

You can use either "localhost" or the private DNS name of the EMR master node.

Securely accessing Hadoop Web UIs

In addition to providing API access to Hadoop clusters, Knox also provides a proxying service for Hadoop UIs. The following UIs are available:

  1. Resource Manager: https://<EMRClusterURL>:8449/gateway/emr-cluster-top/yarn/
  2. Ganglia: https://<EMRClusterURL>:8449/gateway/emr-cluster-top/ganglia/
  3. Apache HBase: https://<EMRClusterURL>:8449/gateway/emr-cluster-top/hbase/webui/master-status
  4. WebHDFS: https://<EMRClusterURL>:8449/gateway/emr-cluster-top/hdfs/
  5. Spark History: https://<EMRClusterURL>:8449/gateway/emr-cluster-top/sparkhistory/

On your first visit to any of the UIs above, you are prompted for login credentials. Enter the login user awsadmin and the password you specified as a parameter to your CloudFormation template.

You can now browse the UI as if you were directly connected to the cluster. Below is a sample of the YARN UI:

And the scheduler information in the Yarn UI:

Ganglia:

Spark History UI:

Lastly, the HBase UI. The entire URL to the "master-status" page must be provided.

Troubleshooting

It's not always clear what went wrong when there is an error interacting with Apache Knox. Below are a few troubleshooting steps.

I cannot connect to the UI. I do not get any error codes.

  • Apache Knox may not be running. Check that it's running by logging in to the master node of your cluster and running "ps -ef | grep knox". There should be a process running.
ps -ef | grep knox
Knox 114022 1 0 Aug24 ? 00:04:21 /usr/lib/jvm/java/bin/java -Djava.library.path=/home/knox/knox/ext/native -jar /home/knox/knox/bin/gateway.jar

If the process is not running, start it by running "/home/knox/knox/bin/gateway.sh start" as the knox user (sudo su - knox).
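
On the master node, the restart sequence looks like the following (this assumes the user you logged in as can use sudo):

sudo su - knox
/home/knox/knox/bin/gateway.sh start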

  • Your browser may not have connectivity to the cluster. Even though you may be able to SSH to the cluster, a firewall rule or security group rule may be blocking traffic on the port that Knox is running on. You can route traffic through SSH by building an SSH tunnel and enabling port forwarding, as sketched below.
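
The following is a minimal sketch of such a tunnel; it assumes the Knox port (8449) and the awsadmin LDAP user from this setup, so adjust the user, host, and port for your environment:

ssh -N -L 8449:localhost:8449 awsadmin@<EMR-Master-Machine-Public-DNS>

With the tunnel in place, you can open the UI URLs listed above in your local browser, replacing <EMRClusterURL> with localhost.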

I get an HTTP 400, 404 or 503 code when accessing a UI:

  • Ensure that the URL you are entering is correct. If you do not enter the correct path, Knox returns an HTTP 404.
  • There may be an issue with the routing rules within Apache Knox, so that it does not know how to route the requests. The Knox logs are at INFO level by default and are available in /home/knox/knox/logs/. To change the logging level, edit /home/knox/knox/conf/gateway-log4j.properties and replace the line log4j.logger.org.apache.knox.gateway=INFO with log4j.logger.org.apache.knox.gateway=DEBUG. The DEBUG logs provide much more information, such as how Knox is rewriting URLs, which can show whether Knox is translating URLs correctly.
  • You can use the ldap, knoxcli, and curl commands below to verify that the setup is correct. Run these commands as the knox user.
  • To verify the search base, search attribute, and object class, run the following ldapsearch command:
    ldapsearch -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123' -b 'CN=Users,DC=awsknox,DC=com' -z 5 '(objectClass=person)' sAMAccountName

  • Replace "<Active-Directory-Domain-Private-IP-Address>" with the private IP address of the Active Directory EC2 instance. You can get this IP address from the Outputs of the second CloudFormation template.
  • To verify the values for server host, port, username, and password, run the below ldap command.
    ldapwhoami -h <Active-Directory-Domain-Private-IP-Address> -p 389 -x -D 'CN=awsadmin,CN=Users,DC=awsknox,DC=com' -w 'CheckSum123'

  • Replace "<Active-Directory-Domain-Private-IP-Address>" with the private IP address of the Active Directory EC2 instance. You can get this IP address from the Outputs of the second CloudFormation template.
  • It should return the identity of the bind user.

  • To verify whether the system LDAP bind is successful, run the following command:
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here, "emr-cluster-top" is the topology file that defines the applications that are available and the endpoints that Knox connects to in order to serve each application.
  • The command should return the following output:

“System LDAP Bind successful!”

  • To verify whether LDAP authentication is successful, run the following command.
    /home/knox/knox/bin/knoxcli.sh user-auth-test --cluster emr-cluster-top --u awsadmin --p 'CheckSum123'

  • Here, "emr-cluster-top" is the name of the topology file that we created.
  • The command should return the following output:

“LDAP authentication successful!”

  • Verify whether WebHDFS is reachable directly using the service.
  • First, obtain a valid Kerberos TGT using the kinit command, and then call WebHDFS directly:
    kinit -kt /mnt/var/lib/bigtop_keytabs/knox.keytab knox/<EMR-Master-Machine-Private-DNS>@EC2.INTERNAL
    curl --negotiate -u : http://<EMR-Master-Machine-Private-DNS>:50070/webhdfs/v1/?op=GETHOMEDIRECTORY

  • For example: EMR-Master-Machine-Private-DNS appears in this format: ip-xx-xx-xx-xx.ec2.internal
  • It should return a JSON object containing a "Path" variable with the user's home directory, similar to the sample below.
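
The exact user in the path depends on the principal you authenticated as (here, the knox service principal obtained with kinit); a typical response looks like this:

{"Path":"/user/knox"}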

Cleanup

Delete the CloudFormation stack to clean up all the resources created for this setup. If you used the nested stack, CloudFormation deletes all resources in one operation. If you deployed the templates individually, delete them in the reverse order of creation, deleting the VPC stack last.
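
If you prefer the AWS CLI, you can delete a stack with the following commands; the stack name is a placeholder for the name you chose when launching the template:

$ aws cloudformation delete-stack --stack-name <emr-knox-stack-name>
$ aws cloudformation wait stack-delete-complete --stack-name <emr-knox-stack-name>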

Conclusion

In this post, we went through the setup, configuration, and validation of perimeter security for EMR clusters using Apache Knox. This helps simplify authentication for various Hadoop services. In our next post, we will show you how to integrate Apache Knox and Apache Ranger to enable authorization and audits.

Stay tuned!

 


About the Authors


Varun Rao is an enterprise solutions architect. He works with enterprise customers on their journey to the cloud, with a focus on data strategy and security. In his spare time, he tries to keep up with his 4-year-old.

 

 

 

Mert Hocanin is a big data architect with AWS, covering several products, including EMR, Athena and Managed Blockchain. Prior to working in AWS, he has worked on Amazon.com’s retail business as a Senior Software Development Engineer, building a data lake to process vast amounts of data from all over the company for reporting purposes. When not building and designing data lakes, Mert enjoys traveling and food.

 

 

 

Srikanth Kodali is a Sr. IoT data analytics architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on building IoT data and analytics solutions, helping them improve the value of their solutions when using AWS.

 

 

 

Run Spark applications with Docker using Amazon EMR 6.0.0 (Beta)

Post Syndicated from Paul Codding original https://aws.amazon.com/blogs/big-data/run-spark-applications-with-docker-using-amazon-emr-6-0-0-beta/

The Amazon EMR team is excited to announce the public beta release of EMR 6.0.0 with Spark 2.4.3, Hadoop 3.1.0, Amazon Linux 2, and Amazon Corretto 8. With this beta release, Spark users can use Docker images from Docker Hub and Amazon Elastic Container Registry (Amazon ECR) to define environment and library dependencies. Using Docker, users can easily define their dependencies and use them for individual jobs, avoiding the need to install dependencies on individual cluster hosts.

This post shows you how to use Docker with the EMR release 6.0.0 Beta. You’ll learn how to launch an EMR release 6.0.0 Beta cluster and run Spark jobs using Docker containers from both Docker Hub and Amazon ECR.

Hadoop 3 Docker support

EMR 6.0.0 (Beta) includes Hadoop 3.1.0, which allows the YARN NodeManager to launch containers either directly on the host machine of the cluster, or inside a Docker container. Docker containers provide a custom execution environment in which the application’s code runs isolated from the execution environment of the YARN NodeManager and other applications.

These containers can include special libraries needed by the application, and can even provide different versions of native tools and libraries such as R, Python, and Python libraries. This allows you to easily define the libraries and runtime dependencies that your applications need, using familiar Docker tooling.

Clusters running the EMR 6.0.0 (Beta) release are configured by default to allow YARN applications such as Spark to run using Docker containers. To customize this, use the configuration for Docker support defined in the yarn-site.xml and container-executor.cfg files available in the /etc/hadoop/conf directory. For details on each configuration option and how it is used, see Launching Applications Using Docker Containers.

You can choose to use Docker when submitting a job. On job submission, the following variables are used to specify the Docker runtime and Docker image used:

    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}

When you use Docker containers to execute your YARN applications, YARN downloads the Docker image specified when you submit your job. For YARN to resolve this Docker image, it must be configured with a Docker registry. Options to configure a Docker registry differ based on how you chose to deploy EMR (using either a public or private subnet).

Docker registries

A Docker registry is a storage and distribution system for Docker images. For EMR 6.0.0 (Beta), the following Docker registries can be configured:

  • Docker Hub: A public Docker registry containing over 100,000 popular Docker images.
  • Amazon ECR: A fully-managed Docker container registry that allows you to create your own custom images and host them in a highly available and scalable architecture.

Deployment considerations

Docker registries require network access from each host in the cluster, as each host downloads images from the Docker registry when your YARN application is running on the cluster. How you choose to deploy your EMR cluster (launching it into a public or private subnet) may limit your choice of Docker registry due to network connectivity requirements.

Public subnet

With EMR public subnet clusters, nodes running YARN NodeManager can directly access any registry available over the internet, such as Docker Hub, as shown in the following diagram.

Private Subnet

With EMR private subnet clusters, nodes running YARN NodeManager don’t have direct access to the internet.  Docker images can be hosted in the ECR and accessed through AWS PrivateLink, as shown in the following diagram.

For details on how to use AWS PrivateLink to allow access to ECR in a private subnet scenario, see Setting up AWS PrivateLink for Amazon ECS, and Amazon ECR.

Configuring Docker registries

Docker must be configured to trust the specific registry used to resolve Docker images. The default trust registries are local (private) and centos (on public Docker Hub). You can override docker.trusted.registries in /etc/hadoop/conf/container-executor.cfg to use other public repositories or ECR. To override this configuration, use the EMR Classification API with the container-executor classification key.

The following example shows how to configure the cluster to trust both a public repository (your-public-repo) and an ECR registry (123456789123.dkr.ecr.us-east-1.amazonaws.com). When using ECR, replace this endpoint with your specific ECR endpoint.  When using Docker Hub, please replace this repository name with your actual repository name.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, your-public-repo,123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

To launch an EMR 6.0.0 (Beta) cluster with this configuration using the AWS Command Line Interface (AWS CLI), create a file named container-executor.json with the contents of the preceding JSON configuration.  Then, use the following commands to launch the cluster:

$ export KEYPAIR=<Name of your Amazon EC2 key-pair>
$ export SUBNET_ID=<ID of the subnet to which to deploy the cluster>
$ export INSTANCE_TYPE=<Name of the instance type to use>
$ export REGION=<Region to which to deploy the cluster>

$ aws emr create-cluster \
    --name "EMR-6-Beta Cluster" \
    --region $REGION \
    --release-label emr-6.0.0-beta \
    --applications Name=Hadoop Name=Spark \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=$KEYPAIR,InstanceProfile=EMR_EC2_DefaultRole,SubnetId=$SUBNET_ID \
    --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=$INSTANCE_TYPE InstanceGroupType=CORE,InstanceCount=2,InstanceType=$INSTANCE_TYPE \
    --configuration file://container-executor.json

Using ECR

If you’re new to ECR, follow the instructions in Getting Started with Amazon ECR and verify you have access to ECR from each instance in your EMR cluster.

To access ECR using the docker command, you must first generate credentials. To make sure that YARN can access images from ECR, pass a reference to those generated credentials using the container environment variable YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG.

Run the following command on one of the core nodes to get the login line for your ECR account.

$ aws ecr get-login --region us-east-1 --no-include-email

The get-login command generates the correct Docker CLI command to run to create credentials. Copy and run the output from get-login.

$ sudo docker login -u AWS -p <password> https://<account-id>.dkr.ecr.us-east-1.amazonaws.com

This command generates a config.json file in the /root/.docker folder.  Copy this file to HDFS so that jobs submitted to the cluster can use it to authenticate to ECR.

Execute the commands below to copy the config.json file to your home directory.

$ mkdir -p ~/.docker
$ sudo cp /root/.docker/config.json ~/.docker/config.json
$ sudo chmod 644 ~/.docker/config.json

Execute the commands below to put the config.json in HDFS so it may be used by jobs running on the cluster.

$ hadoop fs -put ~/.docker/config.json /user/hadoop/

At this point, YARN can access ECR as a Docker image registry and pull containers during job execution.
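
Optionally, you can confirm that the credentials file is in HDFS and that a core node can pull an image from your registry; the image name below is a placeholder for an image you have pushed:

$ hadoop fs -ls /user/hadoop/config.json
$ sudo docker pull <account-id>.dkr.ecr.us-east-1.amazonaws.com/<repository>:<tag>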

Using Spark with Docker

With EMR 6.0.0 (Beta), Spark applications can use Docker containers to define their library dependencies, instead of requiring dependencies to be installed on the individual Amazon EC2 instances in the cluster. This integration requires configuration of the Docker registry, and definition of additional parameters when submitting a Spark application.

When the application is submitted, YARN invokes Docker to pull the specified Docker image and run the Spark application inside of a Docker container. This allows you to easily define and isolate dependencies. It reduces the time spent bootstrapping or preparing instances in the EMR cluster with the libraries needed for job execution.

When using Spark with Docker, make sure that you consider the following:

  • The docker package and CLI are only installed on core and task nodes.
  • The spark-submit command should always be run from a master instance on the EMR cluster.
  • The Docker registries used to resolve Docker images must be defined using the Classification API with the container-executor classification key to define additional parameters when launching the cluster:
    • docker.trusted.registries
    • docker.privileged-containers.registries
  • To execute a Spark application in a Docker container, the following configuration options are necessary:
    • YARN_CONTAINER_RUNTIME_TYPE=docker
    • YARN_CONTAINER_RUNTIME_DOCKER_IMAGE={DOCKER_IMAGE_NAME}
  • When using ECR to retrieve Docker images, you must configure the cluster to authenticate itself. To do so, you must use the following configuration option:
    • YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG={DOCKER_CLIENT_CONFIG_PATH_ON_HDFS}
  • Mount the /etc/passwd file into the container so that the user running the job can be identified in the Docker container.
    • YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro
  • Any Docker image used with Spark must have Java installed in the Docker image.

Creating a Docker image

Docker images are created using a Dockerfile, which defines the packages and configuration to include in the image.  The following two example Dockerfiles use PySpark and SparkR.

PySpark Dockerfile

Docker images created from this Dockerfile include Python 3 and the numpy Python package.  This Dockerfile uses Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN yum -y update
RUN yum -y install yum-utils
RUN yum -y groupinstall development

RUN yum list python3*
RUN yum -y install python3 python3-dev python3-pip python3-virtualenv

RUN python -V
RUN python3 -V

ENV PYSPARK_DRIVER_PYTHON python3
ENV PYSPARK_PYTHON python3

RUN pip3 install --upgrade pip
RUN pip3 install numpy pandas

RUN python3 -c "import numpy as np"

SparkR Dockerfile

Docker images created from this Dockerfile include R and the randomForest CRAN package. This Dockerfile includes Amazon Linux 2 and the Amazon Corretto JDK 8.

FROM amazoncorretto:8

RUN java -version

RUN yum -y update
RUN amazon-linux-extras enable R3.4

RUN yum -y install R R-devel openssl-devel
RUN yum -y install curl

#setup R configs
RUN echo "r <- getOption('repos'); r['CRAN'] <- 'http://cran.us.r-project.org'; options(repos = r);" > ~/.Rprofile

RUN Rscript -e "install.packages('randomForest')"

For more information on Dockerfile syntax, see the Dockerfile reference documentation.

Using Docker images from ECR

Amazon Elastic Container Registry (ECR) is a fully-managed Docker container registry that makes it easy for developers to store, manage, and deploy Docker container images. When using ECR, the cluster must be configured to trust your instance of ECR, and you must configure authentication in order for the cluster to use Docker images from ECR.

In this example, our cluster must be created with the following additional configuration, to ensure the ECR registry is trusted. Please replace the 123456789123.dkr.ecr.us-east-1.amazonaws.com endpoint with your ECR endpoint.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,123456789123.dkr.ecr.us-east-1.amazonaws.com",
                "docker.privileged-containers.registries": "local,centos, 123456789123.dkr.ecr.us-east-1.amazonaws.com"
            }
        }
    ]
  }
]

Using PySpark with ECR

This example uses the PySpark Dockerfile. The image will be tagged and uploaded to ECR. Once uploaded, you will run the PySpark job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the PySpark Dockerfile example.

First, create a directory and a Dockerfile for our example.

$ mkdir pyspark

$ vi pyspark/Dockerfile

Paste the contents of the PySpark Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/pyspark-example pyspark/

Create the emr-docker-examples ECR repository for our examples.

$ aws ecr create-repository --repository-name emr-docker-examples

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/pyspark-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example

Use SSH to connect to the master node and prepare a Python script with the filename main.py. Paste the following content into the main.py file and save it.

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("docker-numpy").getOrCreate()
sc = spark.sparkContext

import numpy as np
a = np.arange(15).reshape(3, 5)
print(a)

To submit the job, reference the name of the Docker image. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:pyspark-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--num-executors 2 \
main.py -v

When the job has completed, take note of the YARN application ID, and use the following command to obtain the output of the PySpark job.

$ yarn logs --applicationId application_id | grep -C2 '\[\['
LogLength:55
LogContents:
[[ 0  1  2  3  4]
 [ 5  6  7  8  9]
 [10 11 12 13 14]]

Using SparkR with ECR

This example uses the SparkR Dockerfile. The image will be tagged and uploaded to ECR. Once uploaded, you will run the SparkR job and reference the Docker image from ECR.

After you launch the cluster, use SSH to connect to a core node and run the following commands to build the local Docker image from the SparkR Dockerfile example.

First, create a directory and the Dockerfile for this example.

$ mkdir sparkr

$ vi sparkr/Dockerfile

Paste the contents of the SparkR Dockerfile and run the following commands to build a Docker image.

$ sudo docker build -t local/sparkr-example sparkr/

Tag and upload the locally built image to ECR, replacing 123456789123.dkr.ecr.us-east-1.amazonaws.com with your ECR endpoint.

$ sudo docker tag local/sparkr-example 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ sudo docker push 123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example

Use SSH to connect to the master node and prepare an R script with name sparkR.R. Paste the following contents into the sparkR.R file.

library(SparkR)
sparkR.session(appName = "R with Spark example", sparkConfig = list(spark.some.config.option = "some-value"))

sqlContext <- sparkRSQL.init(spark.sparkContext)
library(randomForest)
# check release notes of randomForest
rfNews()

sparkR.session.stop()

To submit the job, reference the name of the Docker image. Define the additional configuration parameters to make sure that the job execution uses Docker as the runtime. When using ECR, the YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG must reference the config.json file containing the credentials used to authenticate to ECR.

$ DOCKER_IMAGE_NAME=123456789123.dkr.ecr.us-east-1.amazonaws.com/emr-docker-examples:sparkr-example
$ DOCKER_CLIENT_CONFIG=hdfs:///user/hadoop/config.json
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_CLIENT_CONFIG=$DOCKER_CLIENT_CONFIG \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
sparkR.R

When the job has completed, note the YARN application ID, and use the following command to obtain the output of the SparkR job. This example includes a check that the randomForest library is installed and that its version and release notes are available.

$ yarn logs --applicationId application_id | grep -B4 -A10 "Type rfNews"
randomForest 4.6-14
Type rfNews() to see new features/changes/bug fixes.
Wishlist (formerly TODO):

* Implement the new scheme of handling classwt in classification.

* Use more compact storage of proximity matrix.

* Allow case weights by using the weights in sampling?

========================================================================
Changes in 4.6-14:

Using a Docker image from Docker Hub

To use Docker Hub, you must deploy your cluster to a public subnet and configure it to use Docker Hub as a trusted registry. In this example, the cluster needs the following additional configuration to make sure that the your-public-repo repository on Docker Hub is trusted. When using Docker Hub, please replace this repository name with your actual repository. A job-submission sketch follows the configuration below.

[
  {
    "Classification": "container-executor",
    "Configurations": [
        {
            "Classification": "docker",
            "Properties": {
                "docker.trusted.registries": "local,centos,your-public-repo ",
                "docker.privileged-containers.registries": "local,centos,your-public-repo"
            }
        }
    ]
  }
]
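
Job submission then looks much like the ECR example above, except that no Docker client configuration is needed for a public image. The following is a minimal sketch; the image name your-public-repo/spark-example and the script main.py are placeholders for your own image and application:

$ DOCKER_IMAGE_NAME=your-public-repo/spark-example
$ spark-submit --master yarn \
--deploy-mode cluster \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.executorEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_TYPE=docker \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_IMAGE=$DOCKER_IMAGE_NAME \
--conf spark.yarn.appMasterEnv.YARN_CONTAINER_RUNTIME_DOCKER_MOUNTS=/etc/passwd:/etc/passwd:ro \
--num-executors 2 \
main.py -v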

Beta limitations

EMR 6.0.0 (Beta) focuses on helping you get value from using Docker with Spark to simplify dependency management. You can also use EMR 6.0.0 (Beta) to get familiar with Amazon Linux 2, and Amazon Corretto JDK 8.

EMR 6.0.0 (Beta) supports the following applications:

  • Spark 2.4.3
  • Livy 0.6.0
  • ZooKeeper 3.4.14
  • Hadoop 3.1.0

This beta release is supported in the following Regions:

  • US East (N. Virginia)
  • US West (Oregon)

The following EMR features are currently not available with this beta release:

  • Cluster integration with AWS Lake Formation
  • Native encryption of Amazon EBS volumes attached to an EMR cluster

Conclusion

In this post, you learned how to use an EMR 6.0.0 (Beta) cluster to run Spark jobs in Docker containers and integrate with both Docker Hub and ECR. You’ve seen examples of both PySpark and SparkR Dockerfiles.

The EMR team looks forward to hearing about how you’ve used this integration to simplify dependency management in your projects. If you have questions or suggestions, feel free to leave a comment.


About the Authors

Paul Codding is a senior product manager for EMR at Amazon Web Services.

 

 

 

 

Ajay Jadhav is a software development engineer for EMR at Amazon Web Services.

 

 

 

 

Rentao Wu is a software development engineer for EMR at Amazon Web Services.

 

 

 

 

Stephen Wu is a software development engineer for EMR at Amazon Web Services.

 

 

 

 

Extract Oracle OLTP data in real time with GoldenGate and query from Amazon Athena

Post Syndicated from Sreekanth Krishnavajjala original https://aws.amazon.com/blogs/big-data/extract-oracle-oltp-data-in-real-time-with-goldengate-and-query-from-amazon-athena/

This post describes how you can improve performance and reduce costs by offloading reporting workloads from an online transaction processing (OLTP) database to Amazon Athena and Amazon S3. The architecture described allows you to implement a reporting system and have an understanding of the data that you receive by being able to query it on arrival. In this solution:

  • Oracle GoldenGate generates a new row on the target for every change on the source to create Slowly Changing Dimension Type 2 (SCD Type 2) data.
  • Athena allows you to run ad hoc queries on the SCD Type 2 data.

Principles of a modern reporting solution

Advanced database solutions use a set of principles to help them build cost-effective reporting solutions. Some of these principles are:

  • Separate the reporting activity from the OLTP. This approach provides resource isolation and enables databases to scale for their respective workloads.
  • Use query engines running on top of distributed file systems like Hadoop Distributed File System (HDFS) and cloud object stores, such as Amazon S3. The advent of query engines that can run on top of open-source HDFS and cloud object stores further reduces the cost of implementing dedicated reporting systems.

Furthermore, you can use these principles when building reporting solutions:

  • To reduce licensing costs of the commercial databases, move the reporting activity to an open-source database.
  • Use a log-based, real-time, change data capture (CDC), data-integration solution, which can replicate OLTP data from source systems, preferably in real-time mode, and provide a current view of the data. You can enable the data replication between the source and the target reporting systems using database CDC solutions. The transaction log-based CDC solutions capture database changes noninvasively from the source database and replicate them to the target datastore or file systems.

Prerequisites

If you use GoldenGate with Kafka and are considering cloud migration, you can benefit from this post. This post also assumes prior knowledge of GoldenGate and does not detail steps to install and configure GoldenGate. Knowledge of Java and Maven is also assumed. Ensure that a VPC with three subnets is available for manual deployment.

Understanding the architecture of this solution

The following workflow diagram (Figure 1) illustrates the solution that this post describes:

  1. Amazon RDS for Oracle acts as the source.
  2. A GoldenGate CDC solution produces data for Amazon Managed Streaming for Apache Kafka (Amazon MSK). GoldenGate streams the database CDC data to the consumer, and Kafka topics in an MSK cluster receive the data from GoldenGate.
  3. The Apache Flink application running on Amazon EMR consumes the data and sinks it into an S3 bucket.
  4. Athena analyzes the data through queries. You can optionally run queries from Amazon Redshift Spectrum.

Data Pipeline

Figure 1

Amazon MSK is a fully managed service for Apache Kafka that makes it easy to provision Kafka clusters with a few clicks, without the need to provision servers, manage storage, or configure Apache ZooKeeper manually. Kafka is an open-source platform for building real-time streaming data pipelines and applications.

Amazon RDS for Oracle is a fully managed database that frees up your time to focus on application development. It manages time-consuming database administration tasks, including provisioning, backups, software patching, monitoring, and hardware scaling.

GoldenGate is a real-time, log-based, heterogeneous database CDC solution. GoldenGate supports data replication from any supported database to various target databases or big data platforms like Kafka. GoldenGate’s ability to write the transactional data captured from the source in different formats, including delimited text, JSON, and Avro, enables seamless integration with a variety of BI tools. Each row has additional metadata columns including database operation type (Insert/Update/Delete).

Flink is an open-source, stream-processing framework with a distributed streaming dataflow engine for stateful computations over unbounded and bounded data streams. EMR supports Flink, letting you create managed clusters from the AWS Management Console. Flink also supports exactly-once semantics with the checkpointing feature, which is vital to ensure data accuracy when processing database CDC data. You can also use Flink to transform the streaming data row by row or in batches using windowing capabilities.

S3 is an object storage service with high scalability, data availability, security, and performance. You can run big data analytics across your S3 objects with AWS query-in-place services like Athena.

Athena is a serverless query service that makes it easy to query and analyze data in S3. With Athena and S3 as a data source, you define the schema and start querying using standard SQL. There’s no need for complex ETL jobs to prepare your data for analysis, which makes it easy for anyone familiar with SQL skills to analyze large-scale datasets quickly.

The following diagram shows a more detailed view of the data pipeline:

  1. RDS for Oracle runs in a Single-AZ.
  2. GoldenGate runs on an Amazon EC2 instance.
  3. The MSK cluster spans across three Availability Zones.
  4. Kafka topic is set up in MSK.
  5. Flink runs on an EMR Cluster.
  6. Producer Security Group for Oracle DB and GoldenGate instance.
  7. Consumer Security Group for EMR with Flink.
  8. Gateway endpoint for S3 private access.
  9. NAT Gateway to download software components on GoldenGate instance.
  10. S3 bucket and Athena.

For simplicity, this setup uses a single VPC with multiple subnets to deploy resources.

Figure 2

Configuring single-click deployment using AWS CloudFormation

The AWS CloudFormation template included in this post automates the deployment of the end-to-end solution that this blog post describes. The template provisions all required resources including RDS for Oracle, MSK, EMR, S3 bucket, and also adds an EMR step with a JAR file to consume messages from Kafka topic on MSK. Here’s the list of steps to launch the template and test the solution:

  1. Launch the AWS CloudFormation template in the us-east-1 Region.
  2. After successful stack creation, obtain the GoldenGate hub server's public IP address from the Outputs tab of the CloudFormation stack.
  3. Log in to the GoldenGate hub server using the IP address from step 2 as ec2-user, and then switch to the oracle user: sudo su - oracle
  4. Connect to the source RDS for Oracle database using the sqlplus client and provide the password (source): [oracle@<GoldenGate-Hub-Server> ~]$ sqlplus source@<source-DB-alias>
  5. Generate database transactions using SQL statements available in oracle user’s home directory.
    SQL> @s
    
     SQL> @s1
    
     SQL> @s2

  6. Query the STOCK_TRADES table from the Amazon Athena console. After committing transactions on the source database, it takes a few seconds for the changes to become available to Athena for querying. A CLI sketch follows this list.
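
If you prefer the AWS CLI to the console, the following is a minimal sketch of running an equivalent query; the database name (default), the table name (stock_trades), and the results bucket are assumptions to replace with the values from your own setup:

$ aws athena start-query-execution \
    --region us-east-1 \
    --query-string "SELECT * FROM stock_trades LIMIT 10" \
    --query-execution-context Database=default \
    --result-configuration OutputLocation=s3://<your-athena-results-bucket>/

Use aws athena get-query-results --query-execution-id <id> to retrieve the output of the query.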

Manually deploying components

The following steps describe the configurations required to stream Oracle-changed data to MSK and sink it to an S3 bucket using Flink running on EMR. You can then query the S3 bucket using Athena. If you deployed the solution using AWS CloudFormation as described in the previous step, skip to the Testing the solution section.

 

  1. Prepare an RDS source database for CDC using GoldenGate. The RDS source database version is Enterprise Edition 12.1.0.2.14. For instructions on configuring the RDS database, see Using Oracle GoldenGate with Amazon RDS. This post does not consider capturing data definition language (DDL).
  2. Configure an EC2 instance for the GoldenGate hub server. Configure the GoldenGate hub server using the Oracle Linux Server 7.6 (ami-b9c38ad3) image in the us-east-1 Region. The GoldenGate hub server runs the GoldenGate extract process that extracts changes in real time from the database transaction log files. The server also runs a replicat process that publishes database changes to MSK. The GoldenGate hub server requires the following software components:
  • Java JDK 1.8.0 (required for GoldenGate big data adapter).
  • GoldenGate for Oracle (12.3.0.1.4) and GoldenGate for big data adapter (12.3.0.1).
  • Kafka 1.1.1 binaries (required for GoldenGate big data adapter classpath).
  • An IAM role attached to the GoldenGate hub server to allow GoldenGate processes running on the hub server to access the MSK cluster. Use the GoldenGate (12.3.0) documentation to install and configure GoldenGate for the Oracle database. The GoldenGate Integrated Extract parameter file is eora2msk.prm.
    EXTRACT eora2msk
    SETENV (NLSLANG=AL32UTF8)
    
    USERID ggadmin@<source-DB-alias>, password ggadmin
    TRANLOGOPTIONS INTEGRATEDPARAMS (max_sga_size 256)
    EXTTRAIL /u01/app/oracle/product/ogg/dirdat/or
    LOGALLSUPCOLS
    
    TABLE SOURCE.STOCK_TRADES;

    The logallsupcols extract parameter ensures that a full database table row is generated for every DML operation on the source, including updates and deletes.

  3. Create a Kafka cluster using MSK and configure a Kafka topic. You can create the MSK cluster from the AWS Management Console, using the AWS CLI, or through an AWS CloudFormation template.
  • Use the list-clusters command to obtain a ClusterArn and a Zookeeper connection string after creating the cluster. You need this information to configure the GoldenGate big data adapter and Flink consumer. The following code illustrates the commands to run:
    $aws kafka list-clusters --region us-east-1
    {
        "ClusterInfoList": [
            {
                "EncryptionInfo": {
                    "EncryptionAtRest": {
                        "DataVolumeKMSKeyId": "arn:aws:kms:us-east-1:xxxxxxxxxxxx:key/717d53d8-9d08-4bbb-832e-de97fadcaf00"
                    }
                }, 
                "BrokerNodeGroupInfo": {
                    "BrokerAZDistribution": "DEFAULT", 
                    "ClientSubnets": [
                        "subnet-098210ac85a046999", 
                        "subnet-0c4b5ee5ff5ef70f2", 
                        "subnet-076c99d28d4ee87b4"
                    ], 
                    "StorageInfo": {
                        "EbsStorageInfo": {
                            "VolumeSize": 1000
                        }
                    }, 
                    "InstanceType": "kafka.m5.large"
                }, 
                "ClusterName": "mskcluster", 
                "CurrentBrokerSoftwareInfo": {
                    "KafkaVersion": "1.1.1"
                }, 
                "CreationTime": "2019-01-24T04:41:56.493Z", 
                "NumberOfBrokerNodes": 3, 
                "ZookeeperConnectString": "10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181", 
                "State": "ACTIVE", 
                "CurrentVersion": "K13V1IB3VIYZZH", 
                "ClusterArn": "arn:aws:kafka:us-east-1:xxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3", 
                "EnhancedMonitoring": "DEFAULT"
            }
        ]
    }

  • Obtain the IP addresses of the Kafka broker nodes by using the ClusterArn.
    $aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn arn:aws:kafka:us-east-1:xxxxxxxxxxxx:cluster/mskcluster/8920bb38-c227-4bef-9f6c-f5d6b01d2239-3
    {
        "BootstrapBrokerString": "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092"
    }

  • Create a Kafka topic. The solution in this post uses the same name as table name for Kafka topic.
    ./kafka-topics.sh --create --zookeeper 10.0.2.9:2181,10.0.0.4:2181,10.0.3.14:2181 --replication-factor 3 --partitions 1 --topic STOCK_TRADES
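
  • Optionally, once the replicat process (configured in step 5) is running, you can verify that change records arrive on the topic by using the console consumer shipped with the Kafka binaries; the broker list below is the bootstrap broker string obtained earlier in this step:
    ./kafka-console-consumer.sh --bootstrap-server 10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092 --topic STOCK_TRADES --from-beginning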

  4. Provision an EMR cluster with Flink. Create an EMR 5.25 cluster with Flink 1.8.0 (using the advanced options), and enable SSH access to the master node. Create and attach a role to the EMR master node so that Flink consumers can access the Kafka topic in the MSK cluster.
  5. Configure the Oracle GoldenGate big data adapter for Kafka on the GoldenGate hub server. Download and install the Oracle GoldenGate big data adapter (12.3.0.1.0) using the Oracle GoldenGate download link. For more information, see the Oracle GoldenGate 12c (12.3.0.1) installation documentation. The following is the GoldenGate producer property file for Kafka (custom_kafka_producer.properties):
    #Bootstrap broker string obtained from Step 3
    bootstrap.servers= 10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092
    #bootstrap.servers=localhost:9092
    acks=1
    reconnect.backoff.ms=1000
    value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
    # 100KB per partition
    batch.size=16384
    linger.ms=0

    The following is the GoldenGate properties file for Kafka (Kafka.props):

    gg.handlerlist = kafkahandler
    gg.handler.kafkahandler.type=kafka
    gg.handler.kafkahandler.KafkaProducerConfigFile=custom_kafka_producer.properties
    #The following resolves the topic name using the short table name
    #gg.handler.kafkahandler.topicName=SOURCE
    gg.handler.kafkahandler.topicMappingTemplate=${tableName}
    #The following selects the message key using the concatenated primary keys
    gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
    gg.handler.kafkahandler.format=json_row
    #gg.handler.kafkahandler.format=delimitedtext
    #gg.handler.kafkahandler.SchemaTopicName=mySchemaTopic
    #gg.handler.kafkahandler.SchemaTopicName=oratopic
    gg.handler.kafkahandler.BlockingSend =false
    gg.handler.kafkahandler.includeTokens=false
    gg.handler.kafkahandler.mode=op
    goldengate.userexit.writers=javawriter
    javawriter.stats.display=TRUE
    javawriter.stats.full=TRUE
    
    gg.log=log4j
    #gg.log.level=INFO
    gg.log.level=DEBUG
    gg.report.time=30sec
    gg.classpath=dirprm/:/home/oracle/kafka/kafka_2.11-1.1.1/libs/*
    
    javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar

    The following is the GoldenGate replicat parameter file (rkafka.prm):

    REPLICAT rkafka
    -- Trail file for this example is located in "AdapterExamples/trail" directory
    -- Command to add REPLICAT
    -- add replicat rkafka, exttrail AdapterExamples/trail/tr
    TARGETDB LIBFILE libggjava.so SET property=dirprm/kafka.props
    REPORTCOUNT EVERY 1 MINUTES, RATE
    GROUPTRANSOPS 10000
    MAP SOURCE.STOCK_TRADES, TARGET SOURCE.STOCK_TRADES;

  6. Create an S3 bucket and a directory with the table name underneath for Flink to store (sink) the Oracle CDC data.
  7. Configure a Flink consumer to read from the Kafka topic and write the CDC data to an S3 bucket. For instructions on setting up a Flink project using the Maven archetype, see Flink Project Build Setup. The following code example is the pom.xml file, used with the Maven project. For more information, see Getting Started with Maven.
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-quickstart-java</artifactId>
      <version>1.8.0</version>
      <packaging>jar</packaging>
    
      <name>flink-quickstart-java</name>
      <url>http://www.example.com</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <slf4j.version>@slf4j.version@</slf4j.version>
    <log4j.version>@log4j.version@</log4j.version>
        <java.version>1.8</java.version>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
      </properties>
    
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-hadoop-compatibility_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
         <groupId>org.apache.flink</groupId>
         <artifactId>flink-connector-filesystem_2.11</artifactId>
         <version>1.8.0</version>
        </dependency>
    
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.8.0</version>
            <scope>compile</scope>
        </dependency>
         <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-s3-fs-presto</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
       <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-scala_2.11</artifactId>
          <version>1.8.0</version>
        </dependency>
    
        <dependency>
          <groupId>com.typesafe.akka</groupId>
          <artifactId>akka-actor_2.11</artifactId>
          <version>2.4.20</version>
        </dependency>
        <dependency>
           <groupId>com.typesafe.akka</groupId>
           <artifactId>akka-protobuf_2.11</artifactId>
           <version>2.4.20</version>
        </dependency>
</dependencies>
<build>
      <plugins>
         <plugin>
             <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.1</version>
                <executions>
                       <execution>
                          <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                             </goals>
                           <configuration>
                          <artifactSet>
                      <excludes>
    
                             <!-- Excludes here -->
                               </excludes>
    </artifactSet>
                    <filters>
                            <filter>
                                                                                     <artifact>org.apache.flink:*</artifact>
                            </filter>
                       </filters>
                 <transformers>
                                    <!-- add Main-Class to manifest file -->
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <mainClass>flinkconsumer.flinkconsumer</mainClass>
                                    </transformer>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
                                        <resource>reference.conf</resource>
                                    </transformer>
                                </transformers>
                                <relocations>
                                    <relocation>
                                        <pattern>org.codehaus.plexus.util</pattern>
                                        <shadedPattern>org.shaded.plexus.util</shadedPattern>
                                        <excludes>
                                            <exclude>org.codehaus.plexus.util.xml.Xpp3Dom</exclude>
                                            <exclude>org.codehaus.plexus.util.xml.pull.*</exclude>
                                        </excludes>
                                    </relocation>
                                </relocations>
                                <createDependencyReducedPom>false</createDependencyReducedPom>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>

                <!-- Add the main class as a manifest entry -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.5</version>
                    <configuration>
                        <archive>
                            <manifestEntries>
                                <Main-Class>flinkconsumer.flinkconsumer</Main-Class>
                            </manifestEntries>
                        </archive>
                    </configuration>
                </plugin>

                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.7</source>
                        <target>1.7</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>

        <profiles>
            <profile>
                <id>build-jar</id>
                <activation>
                    <activeByDefault>false</activeByDefault>
                </activation>
            </profile>
        </profiles>

    </project>

    Compile the following Java program with mvn clean install to generate the JAR file:

    package flinkconsumer;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.fs.*;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    import java.util.Properties;
    
    public class flinkconsumer{

        public static void main(String[] args) throws Exception {
            // Create the streaming execution environment, enable checkpointing,
            // and point the state backend at HDFS on the EMR cluster
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setBufferTimeout(1000);
            env.enableCheckpointing(5000);
            env.setStateBackend(new FsStateBackend("hdfs://ip-10-0-3-12.ec2.internal:8020/flink/checkpoints"));

            // Kafka consumer configuration; bootstrap.servers points at the Kafka brokers
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.0.3.6:9092,10.0.2.10:9092,10.0.0.5:9092");
            properties.setProperty("group.id", "flink");
            properties.setProperty("client.id", "demo1");

            // Consume the GoldenGate change records from the STOCK_TRADES topic
            DataStream<String> message = env.addSource(new FlinkKafkaConsumer<>("STOCK_TRADES", new SimpleStringSchema(), properties));

            // Write the records to S3; roll the part file after 400 bytes (a small value for demo purposes)
            RollingSink<String> sink = new RollingSink<String>("s3://flink-stream-demo/STOCK_TRADES");
            // sink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HHmm"));
            sink.setBatchSize(400);

            // Pass each record through unchanged (placeholder for any transformation logic)
            message.map(new MapFunction<String, String>() {
                private static final long serialVersionUID = -6867736771747690202L;
                @Override
                public String map(String value) throws Exception {
                    return value;
                }
            }).addSink(sink).setParallelism(1);

            env.execute();
        }
    }

    Log in to the EMR master node as the hadoop user, start Flink, and run the JAR file:

    $ /usr/bin/flink run ./flink-quickstart-java-1.7.0.jar

  5. Create the stock_trades table from the Athena console, pointing LOCATION at the same S3 path that the Flink sink writes to (s3://flink-stream-demo/STOCK_TRADES in the code above). Each JSON document must be on a new line. A sample reporting query against this table follows this step.
    CREATE EXTERNAL TABLE `stock_trades`(
      `trade_id` string COMMENT 'from deserializer', 
      `ticker_symbol` string COMMENT 'from deserializer', 
      `units` int COMMENT 'from deserializer', 
      `unit_price` float COMMENT 'from deserializer', 
      `trade_date` timestamp COMMENT 'from deserializer', 
      `op_type` string COMMENT 'from deserializer')
    ROW FORMAT SERDE 
      'org.openx.data.jsonserde.JsonSerDe' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.IgnoreKeyTextOutputFormat'
    LOCATION
      's3://flink-stream-demo/STOCK_TRADES'
    TBLPROPERTIES (
      'has_encrypted_data'='false', 
      'transient_lastDdlTime'='1561051196')

    For more information, see Hive JSON SerDe.
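
    Because every insert, update, and delete lands in S3 as its own JSON record, reporting queries typically need to collapse the change stream to the latest image per key. The following is a minimal sketch of such a query. It assumes that the GoldenGate Kafka handler writes 'D' in op_type for deletes and that trade_date reflects the order of changes; in practice you would usually order by the change timestamp the handler emits (for example, op_ts) and add that field to the table definition.

    -- Sketch: collapse the CDC stream to the latest image per trade
    -- (assumes op_type = 'D' marks deletes; adjust to your handler's output)
    SELECT trade_id, ticker_symbol, units, unit_price, trade_date
    FROM (
      SELECT trade_id, ticker_symbol, units, unit_price, trade_date, op_type,
             row_number() OVER (PARTITION BY trade_id ORDER BY trade_date DESC) AS rn
      FROM stock_trades
    ) latest
    WHERE rn = 1
      AND op_type <> 'D';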

Testing the solution

To test that the solution works, complete the following steps:

  1. Log in to the source RDS instance from the GoldenGate hub server and perform insert, update, and delete operations on the stock_trades table; for example:
    $sqlplus [email protected]
    SQL> insert into stock_trades values(6,'NEW',29,75,sysdate);
    SQL> update stock_trades set units=999 where trade_id=6;
    SQL> insert into stock_trades values(7,'TEST',30,80,sysdate);
    SQL> insert into stock_trades values(8,'XYZC',20,1800,sysdate);
    SQL> delete from stock_trades where trade_id=7;
    SQL> commit;

  2. Monitor the GoldenGate capture from the source database using the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u02/app/oracle/product/ogg/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate Command Interpreter for Oracle
    Version 12.3.0.1.4 OGGCORE_12.3.0.1.0_PLATFORMS_180415.0359_FBO
    Linux, x64, 64bit (optimized), Oracle 12c on Apr 16 2018 00:53:30
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats eora2msk

  3. Monitor the GoldenGate replicat that delivers to the Kafka topic with the following stats command:
    [[email protected] 12.3.0]$ pwd
    /u03/app/oracle/product/ogg/bdata/12.3.0
    [[email protected] 12.3.0]$ ./ggsci
    
    Oracle GoldenGate for Big Data
    Version 12.3.2.1.1 (Build 005)
    
    Oracle GoldenGate Command Interpreter
    Version 12.3.0.1.2 OGGCORE_OGGADP.12.3.0.1.2_PLATFORMS_180712.2305
    Linux, x64, 64bit (optimized), Generic on Jul 13 2018 00:46:09
    Operating system character set identified as UTF-8.
    
    Copyright (C) 1995, 2018, Oracle and/or its affiliates. All rights reserved.
    
    
    
    GGSCI (ip-10-0-1-170.ec2.internal) 1> stats rkafka

  4. Query the stock_trades table from the Athena console to confirm that the changes from step 1 arrived in S3; a sample verification query follows.
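
    A query along the following lines provides a quick check; the trade_id values correspond to the inserts from step 1, and the op_type values depend on how your GoldenGate Kafka handler is configured. Because trade_id is defined as a string in the DDL, the literals are quoted.

    SELECT trade_id, ticker_symbol, units, unit_price, trade_date, op_type
    FROM stock_trades
    WHERE trade_id IN ('6', '7', '8')
    ORDER BY trade_id, trade_date;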

Summary

This post illustrates how you can offload reporting activity to Athena and S3 to reduce reporting costs and improve OLTP performance on the source database. It serves as a guide for setting up the solution in a staging environment.

Deploying this solution in a production environment may require additional work, such as high availability for the GoldenGate hub servers, different file encoding formats for optimal query performance, and security hardening. You can also achieve similar outcomes with technologies such as AWS Database Migration Service (AWS DMS) instead of GoldenGate for database CDC, and Kafka Connect for the S3 sink.

About the Authors

Sreekanth Krishnavajjala is a solutions architect at Amazon Web Services.

Vinod Kataria is a senior partner solutions architect at Amazon Web Services.