Tag Archives: AWS Glue

Create a serverless event-driven workflow to ingest and process Microsoft data with AWS Glue and Amazon EventBridge

Post Syndicated from Venkata Sistla original https://aws.amazon.com/blogs/big-data/create-a-serverless-event-driven-workflow-to-ingest-and-process-microsoft-data-with-aws-glue-and-amazon-eventbridge/

Microsoft SharePoint is a document management system for storing files, organizing documents, and sharing and editing documents in collaboration with others. Your organization may want to ingest SharePoint data into your data lake, combine the SharePoint data with other data that’s available in the data lake, and use it for reporting and analytics purposes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning, and application development. AWS Glue provides all the capabilities needed for data integration so that you can start analyzing your data and putting it to use in minutes instead of months.

Organizations often manage their data on SharePoint in the form of files and lists, and you can use this data for easier discovery, better auditing, and compliance. SharePoint as a data source is not a typical relational database; the data is mostly semi-structured, which is why it's often difficult to join SharePoint data with other relational data sources. This post shows how to ingest and process SharePoint lists and files with AWS Glue and Amazon EventBridge, which enables you to join them with other data that is available in your data lake. We use SharePoint REST APIs with a standard Open Data Protocol (OData) syntax. OData advocates a standard way of implementing REST APIs that allows for SQL-like querying capabilities. OData helps you focus on your business logic while building RESTful APIs without having to worry about the various approaches to define request and response headers, query options, and so on.
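To give a concrete sense of the OData syntax, the following is a minimal Python sketch of a SharePoint REST call using the requests library. The site URL, list title, and access token are placeholders rather than values from this solution, and the query options shown ($select, $filter, $top) are only examples.

import requests

# Placeholder values; the site URL, list title, and OAuth2 access token
# come from your own SharePoint tenant configuration.
site_url = "https://yourtenant.sharepoint.com/sites/yoursite"
list_title = "WineQualityFeedback"
access_token = "<oauth2-access-token>"

# OData query options: select specific columns, filter rows, and cap the result size.
endpoint = (
    f"{site_url}/_api/web/lists/getbytitle('{list_title}')/items"
    "?$select=Id,Title,Comments,Created"
    "&$filter=Created ge datetime'2021-09-01T00:00:00Z'"
    "&$top=500"
)

response = requests.get(
    endpoint,
    headers={
        "Accept": "application/json;odata=verbose",
        "Authorization": f"Bearer {access_token}",
    },
)
response.raise_for_status()
items = response.json()["d"]["results"]
print(f"Fetched {len(items)} list items")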

AWS Glue event-driven workflows

Unlike a traditional relational database, SharePoint data may or may not change frequently, and it’s difficult to predict the frequency at which your SharePoint server generates new data, which makes it difficult to plan and schedule data processing pipelines efficiently. Running data processing frequently can be expensive, whereas scheduling pipelines to run infrequently can deliver cold data. Similarly, triggering pipelines from an external process can increase complexity, cost, and job startup time.

AWS Glue supports event-driven workflows, a capability that lets developers start AWS Glue workflows based on events delivered by EventBridge. The main reason to choose EventBridge in this architecture is because it allows you to process events, update the target tables, and make information available to consume in near-real time. Because frequency of data change in SharePoint is unpredictable, using EventBridge to capture events as they arrive enables you to run the data processing pipeline only when new data is available.

To get started, you simply create a new AWS Glue trigger of type EVENT and place it as the first trigger in your workflow. You can optionally specify a batching condition. Without event batching, the AWS Glue workflow is triggered every time an EventBridge rule matches, which may result in multiple concurrent workflows running. AWS Glue protects you by setting default limits that restrict the number of concurrent runs of a workflow. You can increase the required limits by opening a support case. Event batching allows you to configure the number of events to buffer or the maximum elapsed time before firing the particular trigger. When the batching condition is met, a workflow run is started. For example, you can trigger your workflow when 100 files are uploaded in Amazon Simple Storage Service (Amazon S3) or 5 minutes after the first upload. We recommend configuring event batching to avoid too many concurrent workflows, and optimize resource usage and cost.
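The following is a minimal boto3 sketch of creating such an EVENT trigger with a batching condition. The trigger, workflow, and job names are placeholders; in this solution, the trigger is created for you by the CloudFormation template described later.

import boto3

glue = boto3.client("glue")

# Create an EVENT trigger as the first trigger of an existing workflow.
# The batching condition starts a workflow run after 100 events are buffered,
# or 900 seconds after the first event arrives, whichever comes first.
glue.create_trigger(
    Name="s3-event-trigger",                  # placeholder trigger name
    WorkflowName="sharepoint-etl-workflow",   # placeholder workflow name
    Type="EVENT",
    EventBatchingCondition={
        "BatchSize": 100,
        "BatchWindow": 900,  # seconds
    },
    Actions=[
        {"JobName": "convert-csv-to-parquet"},  # placeholder first job in the workflow
    ],
)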

To illustrate this solution better, consider the following use case for a wine manufacturing and distribution company that operates across multiple countries. They currently host all their transactional system’s data on a data lake in Amazon S3. They also use SharePoint lists to capture feedback and comments on wine quality and composition from their suppliers and other stakeholders. The supply chain team wants to join their transactional data with the wine quality comments in SharePoint data to improve their wine quality and manage their production issues better. They want to capture those comments from the SharePoint server within an hour and publish that data to a wine quality dashboard in Amazon QuickSight. With an event-driven approach to ingest and process their SharePoint data, the supply chain team can consume the data in less than an hour.

Overview of solution

In this post, we walk through a solution to set up an AWS Glue job to ingest SharePoint lists and files into an S3 bucket and an AWS Glue workflow that listens to S3 PutObject data events captured by AWS CloudTrail. This workflow is configured with an event-based trigger to run when an AWS Glue ingest job adds new files into the S3 bucket. The following diagram illustrates the architecture.

To make it simple to deploy, we captured the entire solution in an AWS CloudFormation template that enables you to automatically ingest SharePoint data into Amazon S3. SharePoint uses ClientID and TenantID credentials for authentication and uses OAuth2 for authorization.

The template helps you perform the following steps:

  1. Create an AWS Glue Python shell job to make the REST API call to the SharePoint server and ingest files or lists into Amazon S3.
  2. Create an AWS Glue workflow with a starting trigger of EVENT type.
  3. Configure CloudTrail to log data events, such as PutObject API calls.
  4. Create a rule in EventBridge to forward the PutObject API events to AWS Glue when they’re emitted by CloudTrail.
  5. Add an AWS Glue event-driven workflow as a target to the EventBridge rule. The workflow gets triggered when the SharePoint ingest AWS Glue job adds new files to the S3 bucket.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Configure SharePoint server authentication details

Before launching the CloudFormation stack, you need to set up your SharePoint server authentication details, namely, TenantID, Tenant, ClientID, ClientSecret, and the SharePoint URL in AWS Systems Manager Parameter Store of the account you’re deploying in. This makes sure that no authentication details are stored in the code and they’re fetched in real time from Parameter Store when the solution is running.

To create your AWS Systems Manager parameters, complete the following steps:

  1. On the Systems Manager console, under Application Management in the navigation pane, choose Parameter Store.
  2. Choose Create Parameter.
  3. For Name, enter the parameter name /DataLake/GlueIngest/SharePoint/tenant.
  4. Leave the type as string.
  5. Enter your SharePoint tenant detail into the value field.
  6. Choose Create parameter.
  7. Repeat these steps to create the following parameters:
    1. /DataLake/GlueIngest/SharePoint/tenant
    2. /DataLake/GlueIngest/SharePoint/tenant_id
    3. /DataLake/GlueIngest/SharePoint/client_id/list
    4. /DataLake/GlueIngest/SharePoint/client_secret/list
    5. /DataLake/GlueIngest/SharePoint/client_id/file
    6. /DataLake/GlueIngest/SharePoint/client_secret/file
    7. /DataLake/GlueIngest/SharePoint/url/list
    8. /DataLake/GlueIngest/SharePoint/url/file
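At run time, the ingest job can read these values from Parameter Store instead of hard-coding them. The following is a minimal boto3 sketch that uses the parameter names above; the helper function is illustrative and not part of the deployed job script.

import boto3

ssm = boto3.client("ssm")

def get_param(name, decrypt=False):
    # WithDecryption only matters for SecureString parameters; the parameters
    # in this walkthrough are plain String values.
    return ssm.get_parameter(Name=name, WithDecryption=decrypt)["Parameter"]["Value"]

tenant = get_param("/DataLake/GlueIngest/SharePoint/tenant")
tenant_id = get_param("/DataLake/GlueIngest/SharePoint/tenant_id")
client_id = get_param("/DataLake/GlueIngest/SharePoint/client_id/list")
client_secret = get_param("/DataLake/GlueIngest/SharePoint/client_secret/list")
sharepoint_url = get_param("/DataLake/GlueIngest/SharePoint/url/list")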

Deploy the solution with AWS CloudFormation

For a quick start of this solution, you can deploy the provided CloudFormation stack. This creates all the required resources in your account.

The CloudFormation template generates the following resources:

  • S3 bucket – Stores data, CloudTrail logs, job scripts, and any temporary files generated during the AWS Glue extract, transform, and load (ETL) job run.
  • CloudTrail trail with S3 data events enabled – Enables EventBridge to receive PutObject API call data in a specific bucket.
  • AWS Glue Job – A Python shell job that fetches the data from the SharePoint server.
  • AWS Glue workflow – A data processing pipeline composed of a crawler, jobs, and triggers. This workflow converts uploaded data files into Apache Parquet format.
  • AWS Glue database – The AWS Glue Data Catalog database that holds the tables created in this walkthrough.
  • AWS Glue table – The Data Catalog table representing the Parquet files being converted by the workflow.
  • AWS Lambda function – The AWS Lambda function is used as an AWS CloudFormation custom resource to copy job scripts from an AWS Glue-managed GitHub repository and an AWS Big Data blog S3 bucket to your S3 bucket.
  • IAM roles and policies – We use the following AWS Identity and Access Management (IAM) roles:
    • LambdaExecutionRole – Runs the Lambda function that has permission to upload the job scripts to the S3 bucket.
    • GlueServiceRole – Runs the AWS Glue job that has permission to download the script, read data from the source, and write data to the destination after conversion.
    • EventBridgeGlueExecutionRole – Has permissions to invoke the NotifyEvent API for an AWS Glue workflow.
    • IngestGlueRole – Runs the AWS Glue job that has permission to ingest data into the S3 bucket.

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For pS3BucketName, enter the unique name of your new S3 bucket.
  5. Leave pWorkflowName and pDatabaseName as the default.

  6. For pDatasetName, enter the SharePoint list name or file name you want to ingest.
  7. Choose Next.

  8. On the next page, choose Next.
  9. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  10. Choose Create.

It takes a few minutes for the stack creation to complete; you can follow the progress on the Events tab.

You can run the ingest AWS Glue job either on a schedule or on demand. As the job successfully finishes and ingests data into the raw prefix of the S3 bucket, the AWS Glue workflow runs and transforms the ingested raw CSV files into Parquet files and loads them into the transformed prefix.
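As a rough illustration of what the transformation step does (this is a sketch, not the exact script deployed by the template), a Glue Spark job can convert the raw CSV files to Parquet along the following lines. The bucket name is a placeholder.

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Placeholder paths following the raw and transformed prefixes used in this solution.
raw_path = "s3://<bucket_name>/data/SharePoint/tablename_raw/"
transformed_path = "s3://<bucket_name>/data/SharePoint/tablename_transformed/"

# Read the ingested CSV files and write them back out as Parquet.
df = spark.read.option("header", "true").csv(raw_path)
df.write.mode("append").parquet(transformed_path)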

Review the EventBridge rule

The CloudFormation template created an EventBridge rule to forward S3 PutObject API events to AWS Glue. Let’s review the configuration of the EventBridge rule:

  1. On the EventBridge console, under Events, choose Rules.
  2. Choose the rule s3_file_upload_trigger_rule-<CloudFormation-stack-name>.
  3. Review the information in the Event pattern section.

The event pattern shows that this rule is triggered when an S3 object is uploaded to s3://<bucket_name>/data/SharePoint/tablename_raw/. CloudTrail captures the PutObject API calls made and relays them as events to EventBridge.
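For reference, the following boto3 sketch shows an equivalent event pattern definition. In the deployed solution, the rule is created for you by CloudFormation; the rule name and bucket name below are placeholders.

import json
import boto3

events = boto3.client("events")

# Match CloudTrail-delivered PutObject calls on the raw prefix of the solution bucket.
event_pattern = {
    "source": ["aws.s3"],
    "detail-type": ["AWS API Call via CloudTrail"],
    "detail": {
        "eventSource": ["s3.amazonaws.com"],
        "eventName": ["PutObject"],
        "requestParameters": {
            "bucketName": ["<bucket_name>"],
            "key": [{"prefix": "data/SharePoint/tablename_raw/"}],
        },
    },
}

events.put_rule(
    Name="s3_file_upload_trigger_rule-example",  # placeholder rule name
    EventPattern=json.dumps(event_pattern),
    State="ENABLED",
)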

  4. In the Targets section, you can verify that this EventBridge rule is configured with an AWS Glue workflow as a target.

Run the ingest AWS Glue job and verify the AWS Glue workflow is triggered successfully

To test the workflow, we run the ingest-glue-job-SharePoint-file job using the following steps:

  1. On the AWS Glue console, select the ingest-glue-job-SharePoint-file job.

  2. On the Action menu, choose Run job.

  3. Choose the History tab and wait until the job succeeds.

You can now see the CSV files in the raw prefix of your S3 bucket.

Now the workflow should be triggered.

  4. On the AWS Glue console, validate that your workflow is in the RUNNING state.

  5. Choose the workflow to view the run details.
  6. On the History tab of the workflow, choose the current or most recent workflow run.
  7. Choose View run details.

When the workflow run status changes to Completed, let’s check the converted files in your S3 bucket.

  1. Switch to the Amazon S3 console, and navigate to your bucket.

You can see the Parquet files under s3://<bucket_name>/data/SharePoint/tablename_transformed/.

Congratulations! Your workflow ran successfully based on S3 events triggered by uploading files to your bucket. You can verify everything works as expected by running a query against the generated table using Amazon Athena.

Sample wine dataset

Let’s analyze a sample red wine dataset. The following screenshot shows a SharePoint list that contains various readings that relate to the characteristics of the wine and an associated wine category. This is populated by various wine tasters from multiple countries.

The following screenshot shows a supplier dataset from the data lake with wine categories ordered per supplier.

We process the red wine dataset using this solution and use Athena to query the red wine data and supplier data where wine quality is greater than or equal to 7.

We can visualize the processed dataset using QuickSight.

Clean up

To avoid incurring unnecessary charges, you can use the AWS CloudFormation console to delete the stack that you deployed. This removes all the resources you created when deploying the solution.

Conclusion

Event-driven architectures provide access to near-real-time information and help you make business decisions on fresh data. In this post, we demonstrated how to ingest and process SharePoint data using AWS serverless services like AWS Glue and EventBridge. We saw how to configure a rule in EventBridge to forward events to AWS Glue. You can use this pattern for your analytical use cases, such as joining SharePoint data with other data in your lake to generate insights, or auditing SharePoint data for compliance requirements.


About the Author

Venkata Sistla is a Big Data & Analytics Consultant on the AWS Professional Services team. He specializes in building data processing capabilities and helping customers remove constraints that prevent them from leveraging their data to develop business insights.

Introducing Amazon S3 shuffle in AWS Glue

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/introducing-amazon-s3-shuffle-in-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, which is an open-source, distributed processing system for your data integration tasks and big data workloads. Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple partitions so that you can execute different transformations in parallel.

Shuffling is an important step in a Spark job whenever data is rearranged between partitions. The groupByKey(), reduceByKey(), join(), and distinct() transformations are some examples of wide transformations that can cause a shuffle. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is often constrained by the available local disk capacity or by data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there is not enough disk space left on the executor and the job cannot recover.

This post introduces a new Spark shuffle manager available in AWS Glue that disaggregates Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle and spill files. Using Amazon S3 for Spark shuffle storage lets you run data-intensive workloads much more reliably.

Understanding the shuffle operation in AWS Glue

Spark creates physical plans for running your workflow, called Directed Acyclic Graphs (DAGs). The DAG represents a series of transformations on your dataset, each resulting in a new immutable RDD. All of the transformations in Spark are lazy, in that they are not computed until an action is called to generate results. There are two types of transformations:

  • Narrow transformation – Such as map, filter, union, and mapPartitions, where each input partition contributes to only one output partition.
  • Wide transformation – Such as join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions.

In Spark, a shuffle occurs whenever data is rearranged between partitions. This is required because the wide transformation needs information from other partitions in order to complete its processing. Spark gathers the required data from each partition and combines it into a new partition. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. With AWS Glue, workers write shuffle data on local disk volumes attached to the AWS Glue workers.

In addition to shuffle writes, Spark uses local disk to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time the worker spills it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after the worker has spilled it.

Challenges

Spark uses local disk for storing intermediate shuffle and shuffle spills. This introduces the following key challenges:

  • Hitting local storage limits – If you have a Spark job that computes transformations over a large amount of data and results in too much spill, too much shuffle, or both, the job might fail with a java.io.IOException: No space left on device exception when the underlying storage fills up.
  • Co-location of storage with executors – If an executor is lost, its shuffle files are lost as well. This leads to several task and stage retries, as Spark tries to recompute stages in order to recover lost shuffle data. Spark natively provides an external shuffle service that lets it store shuffle data independent of the life of executors. But the shuffle service itself becomes a point of failure and must always be up in order to serve shuffle data. Additionally, shuffles are still stored on local disk, which might run out of space for a large job.

To illustrate one of the preceding scenarios, let’s use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join, group by, and union all. Let’s run the following query with 10 AWS Glue G.1X workers. For the G.1X worker type, each worker maps to 1 DPU (data processing unit) and 1 executor, so 10 G.1X workers account for a total of 640 GB of disk space. See the following SQL query:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the AWS Glue job run details from the Apache Spark web UI:

The job runs for about 1 hour and 25 minutes, then we start observing task failures. Spark ends up stopping the stage and canceling the job when the task retries also fail.

The following screenshots show the aggregated metrics for the failed stage, as well as how much data is spilled to disk by individual executors:

As seen in the Shuffle Write metric from the above Spark UI screenshot, all 10 workers shuffle over 50 GB of data. Further writes aren’t allowed, and tasks start failing with a “No space left on device” error.

The remaining storage is occupied by data that is spilled to disk, as seen in the Shuffle Spill (Disk) metric from the above Spark UI screenshot. This failed job is a classic example of a data-intensive transformation where Spark is both shuffling and spilling to disk when executor memory is filled.

Solution overview

We have various methods for overcoming the disk space error:

  • Scale out – Increase the number of workers, which incurs additional cost. However, scaling out might not always work, especially if your data is heavily skewed on a few keys. Fixing skew requires considerable modifications to your Spark application logic.
  • Increase shuffle partitions – Increasing the shuffle partitions can sometimes help overcome space errors. However, this might not always work, and therefore is unreliable.
  • Disaggregate compute and storage – This approach offers the advantage of not only scaling storage for large shuffles, but also added reliability in the event of node failures, because shuffle data is stored independently. The following are a few implementations of this disaggregated approach:
    • Dedicated intermediate storage cluster – In this approach, you use an additional fleet of shuffle services to serve intermediate shuffle. It has several advantages, such as merging shuffle files and sequential I/O, but it introduces an overhead of fleet maintenance from both operations, as well as a cost standpoint. For examples of this approach, see Cosco: An Efficient Facebook-Scale Shuffle Service and Zeus: Uber’s Highly Scalable and Distributed Shuffle as a Service.
    • Serverless storage – AWS Glue implements a different approach in which you utilize Amazon S3, a cost-effective managed and serverless storage, to store intermediate shuffle data. This design does not depend upon a dedicated daemon, such as shuffle service, to preserve shuffle files. This lets you elastically scale your Spark job without the overhead of running, operating, and maintaining additional storage or compute nodes.

With AWS Glue 2.0, you can now use Amazon S3 to store Spark shuffle and spill data. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data-intensive workloads reliably.

The following diagram illustrates how Spark map tasks write the shuffle and spill files to the given Amazon S3 shuffle bucket. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle bucket.

Use Amazon S3 to store shuffle and spill data

The following job parameters enable and tune Spark to use S3 buckets for storing shuffle and spill data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

  • --write-shuffle-files-to-s3 (value: TRUE) – This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
  • --write-shuffle-spills-to-s3 (value: TRUE) – This is an optional flag that lets you offload spill files to S3 buckets, which provides additional resiliency to your Spark job. This is only required for large workloads that spill a lot of data to disk. This flag is disabled by default.
  • --conf spark.shuffle.glue.s3ShuffleBucket=s3://<shuffle-bucket> – This is also optional, and it specifies the S3 bucket where we write the shuffle files. By default, we use --TempDir/shuffle-data.
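For example, you can pass these parameters as job arguments when starting a job with the AWS SDK. The following is a minimal boto3 sketch; the job name and shuffle bucket are placeholders.

import boto3

glue = boto3.client("glue")

# Start an AWS Glue 2.0 job with Amazon S3 shuffle and spill enabled.
# If --conf is omitted, shuffle files default to --TempDir/shuffle-data.
glue.start_job_run(
    JobName="tpcds-q80-job",  # placeholder job name
    Arguments={
        "--write-shuffle-files-to-s3": "true",
        "--write-shuffle-spills-to-s3": "true",
        "--conf": "spark.shuffle.glue.s3ShuffleBucket=s3://<shuffle-bucket>/",
    },
)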

You can also use the AWS Glue Studio console to enable Amazon S3 based shuffle or spill. You can choose the preceding properties from pre-populated options in the Job parameters section.

Results

Let’s run the same q80.sql query with Amazon S3 shuffle enabled. We can view the shuffle files stored in the S3 bucket in the following format:

shuffle_<jobid>_<mapperid>_<reducerid>.data/index

Two kinds of files are created:

  • Data – Stores the shuffle output of the current task
  • Index – Stores the classification information of the data in the data file by storing partition offsets

The following screenshots show example shuffle directories and shuffle files:

The following screenshot shows the aggregated metrics from the Spark UI:

The following are a few key highlights:

  • q80.sql, which previously failed after 1 hour and 25 minutes with only 13 of 18 stages complete, finished successfully in about 2 hours and 53 minutes, completing all 18 stages.
  • We were able to shuffle 479.7 GB of data without worrying about storage limits.
  • Additional workers aren’t required to scale storage, which provides substantial cost savings.

Considerations and best practices

Keep in mind the following best practices when considering this solution:

  • This feature is recommended when you want to ensure the reliable execution of data-intensive workloads that create a large amount of shuffle or spill data. In our experiments with TPC-DS queries, writing and reading shuffle files from Amazon S3 was marginally slower than using local disk. S3 shuffle performance is impacted by the number and size of shuffle files. For example, S3 could be slower for reads than local storage if you have a large number of small shuffle files or partitions in your Spark application.
  • You can use this feature if your job frequently suffers from No space left on device issues.
  • You can use this feature if your job frequently suffers fetch failure issues (org.apache.spark.shuffle.MetadataFetchFailedException).
  • You can use this feature if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.glue.s3ShuffleBucket) to clean up old shuffle data; a sketch of such a policy follows this list.
  • At the time of writing, this feature is available on AWS Glue 2.0 and Spark 2.4.3.
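The following is a minimal boto3 sketch of such a lifecycle rule. The bucket name, prefix, and retention period are placeholders that you should adjust to your own shuffle bucket layout and job cadence.

import boto3

s3 = boto3.client("s3")

# Expire shuffle objects after 7 days so old shuffle data doesn't accumulate.
s3.put_bucket_lifecycle_configuration(
    Bucket="<shuffle-bucket>",  # placeholder bucket name
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "expire-old-shuffle-data",
                "Filter": {"Prefix": "shuffle-data/"},  # placeholder prefix
                "Status": "Enabled",
                "Expiration": {"Days": 7},
            }
        ]
    },
)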

Conclusion

This post discussed how we can independently scale storage in AWS Glue without adding additional workers. With this feature, you can expect jobs that are processing terabytes of data to run much more reliably. Happy shuffling!


About the Authors

Anubhav Awasthi is a Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and everything and anything to do with the data.

Mohit Saxena is a Software Engineering Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.

Now Available: Updated guidance on the Data Analytics Lens for AWS Well-Architected Framework

Post Syndicated from Wallace Printz original https://aws.amazon.com/blogs/big-data/now-available-updated-guidance-on-the-data-analytics-lens-for-aws-well-architected-framework/

Nearly all businesses today require some form of data analytics processing, from auditing user access to generating sales reports. For all your analytics needs, the Data Analytics Lens for AWS Well-Architected Framework provides prescriptive guidance to help you assess your workloads and identify best practices aligned to the AWS Well-Architected Pillars: Operational Excellence, Security, Reliability, Performance Efficiency, and Cost Optimization. Today, we’re pleased to announce a completely revised and updated version of the Data Analytics Lens whitepaper.

Self-assess with Well-Architected design principles

The updated version of the Data Analytics Lens whitepaper has been revised to provide guidance to CxOs as well as all data personas. Within each of the five Well-Architected Pillars, we provide top-level design principles that help CxOs quickly identify areas of focus for their teams, along with fundamental rules that analytics workload designers should follow. Each design principle is followed by a series of questions and best practices that architects and system designers can use to perform self-assessments. Additionally, the Data Analytics Lens includes suggestions that prescriptively explain the steps to implement best practices, which is useful for implementation teams.

For example, the Security Pillar design principle “Control data access” works with the best practice to build user identity solutions that uniquely identify people and systems. The associated suggestion for this best practice is to centralize workforce identities, which details how to use this principle and includes links to more documentation on the suggestion.

“Building Data Analytics platform or workloads is one of the complex architecture patterns. It involves multi-layered approach such as Data Ingestion, Data Landing, Transformation Layer, Analytical/Insight and Reporting. Choices of technology and service for each of these layers are wide. The AWS Well-Architected Analytics Lens helps us to design and validate with great confidence against each of the pillars. Now Cognizant Architects can perform assessments using the Data Analytics Lens to validate and help build secure, scalable and innovative data solutions for customers.”

– Supriyo Chakraborty, Principal Architect & Head of Data Engineering Guild, Cognizant Germany
– Somasundaram Janavikulam, Cloud Enterprise Architect & Well Architected Partner Program Lead, Cognizant

In addition to performing your own assessment, AWS can provide a guided experience through reviewing your workload with a Well-Architected Framework Review engagement. For customers building data analytics workloads with AWS Professional Services, our teams of Data Architects can perform assessments using the Data Analytics Lens during the project engagements. This provides you with an objective assessment of your workloads and guidance on future improvements. The integration is available now for customers of the AWS Data Lake launch offering, with additional Data Analytics offerings coming in 2022. Reach out to your AWS Account Team if you’d like to know more about these guided Reviews.

Updated architectural patterns and scenarios

In this version of the Data Analytics Lens, we have also revised the discussion of data analytics patterns and scenarios to keep up with the industry and modern data analytics practices. Each scenario includes sections on characteristics that help you plan when developing systems for that scenario, a reference architecture to visualize and explain how the components work together, and configuration notes to help you properly configure your solution.

This version covers the following topics:

  • Building a modern data architecture (formerly Lake House Architecture)
  • Organize around data domains by delivering data as a product using a data mesh
  • Efficiently and securely provide batch data processing
  • Use streaming ingest and stream processing for real-time workloads
  • Build operational analytics systems to improve business processes and performance
  • Provide data visualization securely and cost-effectively at scale

As a change from the first release, the machine learning and tenant analytics scenarios have been moved to the separate Machine Learning Lens whitepaper and SaaS Lens whitepaper.

Conclusion

We expect this updated version will provide better guidance to validate your existing architectures, as well as recommendations for any gaps that are identified.

For more information about building your own Well-Architected systems using the Data Analytics Lens, see the Data Analytics Lens whitepaper.

Special thanks to everyone across the AWS Solution Architecture and Data Analytics communities who contributed. These contributions encompassed diverse perspectives, expertise, and experiences in developing the new AWS Well-Architected Data Analytics Lens.


About the Authors

Wallace Printz is a Senior Solutions Architect based in Austin, Texas. He helps customers across Texas transform their businesses in the cloud. He has a background in semiconductors, R&D, and machine learning.

Indira Balakrishnan is a Senior Solutions Architect in the AWS Analytics Specialist SA Team. She is passionate about helping customers build cloud-based analytics solutions to solve their business problems using data-driven decisions. Outside of work, she volunteers at her kids’ activities and spends time with her family.

How Parametric Built Audit Surveillance using AWS Data Lake Architecture

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/how-parametric-built-audit-surveillance-using-aws-data-lake-architecture/

Parametric Portfolio Associates (Parametric), a wholly owned subsidiary of Morgan Stanley, is a registered investment adviser. Parametric provides investment advisory services to individual and institutional investors around the world. Parametric manages over 100,000 client portfolios with assets under management exceeding $400B (as of 9/30/21).

As a registered investment adviser, Parametric is subject to numerous regulatory requirements. The Parametric Compliance team conducts regular reviews on the firm’s portfolio management activities. To accomplish this, the organization needs both active and archived audit data to be readily available.

Parametric’s on-premises data lake solution was based on an MS-SQL server, and they used an Apache Hadoop platform for data storage, data management, and analytics. Significant gaps existed with the on-premises solution, which complicated audit processes. They were spending a large amount of effort on system maintenance, operational management, and software version upgrades, which required expensive consulting services and made it challenging to keep up with maintenance windows. This limited their agility and also impacted their ability to derive more insights and value from their data. In an environment of rapid growth, adoption of more sophisticated analytics tools and processes has been slower to evolve.

In this blog post, we show how Parametric implemented their Audit Surveillance Data Lake on AWS with purpose-built, fully managed analytics services. With this solution, Parametric was able to respond to various audit requests within hours rather than days or weeks. This resulted in 5x cost savings at the current data volume, and the new system can seamlessly support 10x data growth.

Audit surveillance platform

The Parametric data management office (DMO) was previously running their data workloads using an on-premises data lake, which ran on the Hortonworks data platform of Apache Hadoop. This platform wasn’t up to date, and Parametric’s hardware was reaching end-of-life. Parametric was faced with a decision to either reinvest in their on-premises infrastructure or modernize their infrastructure using a modern data analytics platform on AWS. After doing a detailed cost/benefit analysis, the DMO calculated a 5x cost savings by using AWS. They decided to move forward and modernize with AWS due to these cost benefits, in addition to elasticity and security features.

The PPA compliance team asked the DMO to provide an enterprise data service to consume data from a data lake. This data was destined for downstream applications and ad-hoc data querying capabilities. It was accessed via standard JDBC tools and user-friendly business intelligence dashboards. The goal was to ensure that seven years of audit data would be readily available.

The DMO team worked with AWS to conceptualize an audit surveillance data platform architecture and help accelerate the implementation. They attended a series of AWS Immersion Days focusing on AWS fundamentals, data lakes, DevOps, Amazon EMR, and serverless architectures. They later took part in a four-day AWS Data Lab with AWS SMEs to create a data lake. The first use case in this Lab was creating the Audit Surveillance system on AWS.

Audit surveillance architecture on AWS

The following diagram shows the Audit Surveillance data lake architecture on AWS by using AWS purpose-built analytics services.

Figure 1. Audit Surveillance data lake architecture diagram

Architecture flow

  1. User personas: As a first step, the DMO team identified three user personas for the Audit Surveillance system on AWS.
    • Data service compliance users who would like to consume audit surveillance data from the data lake into their respective applications through an enterprise data service.
    • Business users who would like to create business intelligence dashboards using a BI tool to audit data for compliance needs.
    • Compliance IT users who would like to perform ad-hoc queries on the data lake to perform analytics using an interactive query tool.
  2. Data ingestion: Data is ingested into Amazon Simple Storage Service (S3) from different on-premises data sources by using AWS Lake Formation blueprints. AWS Lake Formation provides workflows that define the data source and schedule to import data into the data lake. It is a container for AWS Glue crawlers, jobs, and triggers that are used to orchestrate the process to load and update the data lake.
  3. Data storage: Parametric used Amazon S3 as the data storage layer to build the Audit Surveillance data lake, because it offers 11 nines of durability and 99.99% availability. The existing Hadoop storage was replaced with Amazon S3. The DMO team created a drop zone (raw), an analytics zone (transformed), and a curated zone (enriched) as the storage layers for their data lake on AWS.
  4. Data cataloging: AWS Glue Data Catalog was the central catalog used to store and manage metadata for all datasets hosted in the Audit Surveillance data lake. The existing Hadoop metadata store was replaced with AWS Glue Data Catalog. AWS services such as AWS Glue, Amazon EMR, and Amazon Athena, natively integrate with AWS Glue Data Catalog.
  5. Data processing: Amazon EMR and AWS Glue process the raw data and place it into the analytics zone (transformed) and curated zone (enriched) S3 buckets. Amazon EMR was used for big data processing and AWS Glue for standard ETL processes. AWS Lambda and AWS Step Functions were used to initiate monitoring and ETL processes.
  6. Data consumption: After Audit Surveillance data was transformed and enriched, the data was consumed by various personas within the firm as follows:
    • AWS Lambda and Amazon API Gateway were used to support consumption for data service compliance users.
    • Amazon QuickSight was used to create business intelligence dashboards for compliance business users.
    • Amazon Athena was used to query transformed and enriched data for compliance IT users.
  7. Security: AWS Key Management Service (AWS KMS) customer managed keys were used for encryption at rest, and TLS for encryption in transit. Access to the encryption keys is controlled using AWS Identity and Access Management (IAM) and is monitored through detailed audit trails in AWS CloudTrail. Amazon CloudWatch was used for monitoring, and thresholds were created to determine when to send alerts.
  8. Governance: AWS IAM roles were attached to compliance users, which permitted the administrator to grant access. Access was only given to approved users or programs that went through authentication and authorization through AWS SSO. Access is logged, and permissions can be granted or denied by the administrator. AWS Lake Formation is used for fine-grained access control to grant or revoke permissions at the database, table, or column level (see the sketch after this list).
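As an illustration of the column-level controls mentioned in the governance step, the following boto3 sketch grants SELECT on selected columns of a table to a role. The database, table, column names, and role ARN are placeholders, not Parametric's actual configuration.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant SELECT on specific columns of an audit table to a compliance analyst role.
lakeformation.grant_permissions(
    Principal={
        "DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/compliance-analyst"  # placeholder
    },
    Resource={
        "TableWithColumns": {
            "DatabaseName": "audit_surveillance",  # placeholder database
            "Name": "portfolio_activity",          # placeholder table
            "ColumnNames": ["trade_id", "trade_date", "portfolio_id"],  # placeholder columns
        }
    },
    Permissions=["SELECT"],
)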

Conclusion

The Parametric DMO team successfully replaced their on-premises Audit Surveillance Data Lake. They now have a modern, flexible, highly available, and scalable data platform on AWS, with purpose-built analytics services.

This change resulted in a 5x cost savings, and provides for a 10x data growth. There are now fast responses to internal and external audit requests (hours rather than days or weeks). This migration has given the company access to a wider breadth of AWS analytics services, which offers greater flexibility and options.

Maintaining the on-premises data lake would have required significant investment in both hardware upgrade costs and annual licensing and upgrade vendor consulting fees. Parametric’s decision to migrate their on-premises data lake has yielded proven cost benefits. And it has introduced new functions, service, and capabilities that were previously unavailable to Parametric DMO.

You may also achieve similar efficiencies and increase scalability by migrating on-premises data platforms into AWS. Read more and get started on building Data Lakes on AWS.

Accelerate large-scale data migration validation using PyDeequ

Post Syndicated from Mahendar Gajula original https://aws.amazon.com/blogs/big-data/accelerate-large-scale-data-migration-validation-using-pydeequ/

Many enterprises are migrating their on-premises data stores to the AWS Cloud. During data migration, a key requirement is to validate all the data that has been moved from on premises to the cloud. This data validation is a critical step and if not done correctly, may result in the failure of the entire project. However, developing custom solutions to determine migration accuracy by comparing the data between the source and target can often be time-consuming.

In this post, we walk through a step-by-step process to validate large datasets after migration using PyDeequ. PyDeequ is an open-source Python wrapper over Deequ (an open-source tool developed and used at Amazon). Deequ is written in Scala, whereas PyDeequ allows you to use its data quality and testing capabilities from Python and PySpark.

Prerequisites

Before getting started, make sure you have the following prerequisites:

Solution overview

This solution uses the following services:

  • Amazon RDS for MySQL as the database engine for the source database.
  • Amazon Simple Storage Service (Amazon S3) or Hadoop Distributed File System (HDFS) as the target.
  • Amazon EMR to run the PySpark script. We use PyDeequ to validate data between MySQL and the corresponding Parquet files present in the target.
  • AWS Glue to catalog the technical table, which stores the result of the PyDeequ job.
  • Amazon Athena to query the output table to verify the results.

We use profilers, one of the metrics computation components of PyDeequ, to analyze each column in the given dataset and calculate statistics like completeness, approximate distinct values, and data types.

The following diagram illustrates the solution architecture.

In this example, you have four tables in your on-premises database that you want to migrate: tbl_books, tbl_sales, tbl_venue, and tbl_category.

Deploy the solution

To make it easy for you to get started, we created an AWS CloudFormation template that automatically configures and deploys the solution for you.

The CloudFormation stack performs the following actions:

  • Launches and configures Amazon RDS for MySQL as a source database
  • Launches Secrets Manager for storing the credentials for accessing the source database
  • Launches an EMR cluster, creates and loads the database and tables on the source database, and imports the open-source library for PyDeequ at the EMR primary node
  • Runs the Spark ingestion process from Amazon EMR, connecting to the source database, and extracts data to Amazon S3 in Parquet format

To deploy the solution, complete the following steps:

  1. Choose Launch Stack to launch the CloudFormation template.

The template is launched in the US East (N. Virginia) Region by default.

  2. On the Select Template page, keep the default URL for the CloudFormation template, then choose Next.
  3. On the Specify Details page, provide values for the parameters that require input (see the following screenshot).
  4. Choose Next.
  5. Choose Next again.
  6. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  7. Choose Create stack.

You can view the stack outputs on the AWS Management Console or by using the following AWS Command Line Interface (AWS CLI) command:

aws cloudformation describe-stacks --stack-name <stack-name> --region us-east-1 --query Stacks[0].Outputs

It takes approximately 20–30 minutes for the deployment to complete. When the stack is complete, you should see the following resources launched and available in your account:

  • DQBlogBucket – The S3 bucket that stores the migration accuracy results for the AWS Glue Data Catalog table
  • EMRCluster – The EMR cluster to run the PyDeequ validation process
  • SecretRDSInstanceAttachment – Secrets Manager for securely accessing the source database
  • SourceDBRds – The source database (Amazon RDS)

When the EMR cluster is launched, it runs the following steps as part of the post-cluster launch:

  • DQsetupStep – Installs the Deequ JAR file and MySQL connector. This step also installs Boto3 and the PyDeequ library. It also downloads sample data files to use in the next step.
  • SparkDBLoad – Runs the initial data load to the MySQL database or table. This step creates the test environment that we use for data validation purposes. When this step is complete, we have four tables with data on MySQL and respective data files in Parquet format on HDFS in Amazon EMR.

When the Amazon EMR step SparkDBLoad is complete, we verify the data records in the source tables. You can connect to the source database using your preferred SQL editor. For more details, see Connecting to a DB instance running the MySQL database engine.

The following screenshot is a preview of sample data from the source table MyDatabase.tbl_books.

Validate data with PyDeequ

Now the test environment is ready and we can perform data validation using PyDeequ.

  1. Use Secure Shell (SSH) to connect to the primary node.
  2. Run the following Spark command, which performs data validation and persists the results to an AWS Glue Data Catalog table (db_deequ.db_migration_validation_result):
spark-submit --jars deequ-1.0.5.jar pydeequ_validation.py 

The script and JAR file are already available on your primary node if you used the CloudFormation template. The PySpark script computes PyDeequ metrics on the source MySQL table data and target Parquet files in Amazon S3. The metrics currently calculated as part of this example are as follows:

  • Completeness to measure fraction of not null values in a column
  • Approximate number of distinct values
  • Data type of column

If required, we can compute more metrics for each column. To see the complete list of supported metrics, see the PyDeequ package on GitHub.

The output metrics from the source and target are then compared using a PySpark DataFrame.

When that step is complete, the PySpark script creates the AWS Glue table db_deequ.db_migration_validation_result in your account, and you can query this table from Athena to verify the migration accuracy.

Verify data validation results with Athena

You can use Athena to check the overall data validation summary of all the tables. The following query shows you the aggregated data output. It lists all the tables you validated using PyDeequ and how many columns match between the source and target.

select table_name,
max(src_count) as "source rows",
max(tgt_count) as "target rows",
count(*) as "total columns",
sum(case when status='Match' then 1 else 0 end) as "matching columns",
sum(case when status<>'Match' then 1 else 0 end) as "non-matching columns"
from  "db_deequ"."db_migration_validation_result"
group by table_name;

The following screenshot shows our results.

Because all your columns match, you can have high confidence that the data has been exported correctly.

You can also check the data validation report for any table. The following query gives detailed information about any specific table metrics captured as part of PyDeequ validation:

select col_name,src_datatype,tgt_datatype,src_completeness,tgt_completeness,src_approx_distinct_values,tgt_approx_distinct_values,status 
from "db_deequ"."db_migration_validation_result"
where table_name='tbl_sales'

The following screenshot shows the query results. The last column status is the validation result for the columns in the table.

Clean up

To avoid incurring additional charges, complete the following steps to clean up your resources when you’re done with the solution:

  1. Delete the AWS Glue database and table db_deequ.db_migration_validation_result.
  2. Delete the prefixes and objects you created from the bucket dqblogbucket-${AWS::AccountId}.
  3. Delete the CloudFormation stack, which removes your additional resources.

Customize the solution

The solution consists of two parts:

  • Data extraction from the source database
  • Data validation using PyDeequ

In this section, we discuss ways to customize the solution based on your needs.

Data extraction from the source database

Depending on your data volume, there are multiple ways to extract data from on-premises database sources to AWS. One recommended service is AWS Database Migration Service (AWS DMS). You can also use AWS Glue, Spark on Amazon EMR, and other services.

In this post, we use PySpark to connect to the source database using a JDBC connection and extract the data into HDFS using an EMR cluster.

The primary reason is that we’re already using Amazon EMR for PyDeequ, and we can use the same EMR cluster for data extraction.

In the CloudFormation template, the Amazon EMR step SparkDBLoad runs the PySpark script blogstep3.py. This PySpark script uses Secrets Manager and a Spark JDBC connection to extract data from the source to the target.

Data validation using PyDeequ

In this post, we use ColumnProfilerRunner from the pydeequ.profiles package for metrics computation. The source data is from the database using a JDBC connection, and the target data is from data files in HDFS and Amazon S3.

To create a DataFrame with metrics information for the source data, use the following code:

from pyspark.sql import Row
from pyspark.sql.functions import split
from pydeequ.profiles import ColumnProfilerRunner

# Read the source table over JDBC; spark is the SparkSession, and sqlurl, tbl_name,
# user, and pwd are assumed to be defined earlier in the script (fetched from Secrets Manager).
df_readsrc = spark.read.format('jdbc').option('url',sqlurl).option('dbtable',tbl_name).option('user',user).option('password',pwd).load()

# Profile every column of the source DataFrame.
result_rds = ColumnProfilerRunner(spark).onData(df_readsrc).run()

# Collect one comma-separated string per column: name, completeness,
# approximate distinct values, and inferred data type.
a = []
for col, profile in result_rds.profiles.items():
    a.append(col + "," + str(profile.completeness) + "," + str(profile.approximateNumDistinctValues) + "," + str(profile.dataType))

# Convert the collected strings into a DataFrame and split them into typed columns.
rdd1 = spark.sparkContext.parallelize(a)
row_rdd = rdd1.map(lambda x: Row(x))
df = spark.createDataFrame(row_rdd, ['column'])

finalDataset = df.select(split(df.column, ",")).rdd.flatMap(lambda x: x).toDF(schema=['column','completeness','approx_distinct_values','inferred_datatype'])

Similarly, the metrics are computed for the target (the data files).

You can create a temporary view from the DataFrame to use in the next step for metrics comparison.
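For example, assuming finalDataset holds the source metrics and a similarly built DataFrame (here called finalDataset_target, a hypothetical name) holds the target metrics, you can register both as temporary views with the names used in the comparison query:

# Register the source and target metrics DataFrames as temporary views
# so they can be joined in the comparison query that follows.
finalDataset.createOrReplaceTempView("vw_Source")
finalDataset_target.createOrReplaceTempView("vw_Target")  # hypothetical name for the target metrics DataFrame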

After we have both the source (vw_Source) and target (vw_Target) available, we use the following query in Spark to generate the output result:

df_result = spark.sql("select t1.table_name as table_name,t1.column as col_name,t1.inferred_datatype as src_datatype,t2.inferred_datatype as tgt_datatype,t1.completeness as src_completeness,t2.completeness as tgt_completeness,t1.approx_distinct_values as src_approx_distinct_values,t2.approx_distinct_values as tgt_approx_distinct_values,t1.count as src_count,t2.count as tgt_count,case when t1.inferred_datatype=t2.inferred_datatype and t1.completeness=t2.completeness and t1.approx_distinct_values=t2.approx_distinct_values and t1.count=t2.count then 'Match' else 'No Match' end as status from vw_Source t1 left outer join vw_Target t2 on t1.column = t2.column")

The generated result is stored in the db_deequ.db_migration_validation_result table in the Data Catalog.

If you used the CloudFormation template, the entire PyDeequ code used in this post is available at the path /home/hadoop/pydeequ_validation.py in the EMR cluster.

You can modify the script to include or exclude tables as per your requirements.

Conclusion

This post showed you how you can use PyDeequ to accelerate the post-migration data validation process. PyDeequ helps you calculate metrics at the column level. You can also use more PyDeequ components like constraint verification to build a custom data validation framework.

For more use cases on Deequ, check out the following:


About the Authors

Mahendar Gajula is a Sr. Data Architect at AWS. He works with AWS customers in their journey to the cloud with a focus on Big data, Data Lakes, Data warehouse and AI/ML projects. In his spare time, he enjoys playing tennis and spending time with his family.

Nitin Srivastava is a Data & Analytics consultant at Amazon Web Services. He has more than a decade of datawarehouse experience along with designing and implementing large scale Big Data and Analytics solutions. He works with customers to deliver the next generation big data analytics platform using AWS technologies.

Stream data from relational databases to Amazon Redshift with upserts using AWS Glue streaming jobs

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/stream-data-from-relational-databases-to-amazon-redshift-with-upserts-using-aws-glue-streaming-jobs/

Traditionally, read replicas of relational databases are often used as a data source for non-online transactions of web applications such as reporting, business analysis, ad hoc queries, operational excellence, and customer services. Due to the exponential growth of data volume, it became common practice to replace such read replicas with data warehouses or data lakes to have better scalability and performance. In most real-world use cases, it’s important to replicate the data from a source relational database to the target in real time. Change data capture (CDC) is one of the most common design patterns to capture the changes made in the source database and relay them to other data stores.

AWS offers a broad selection of purpose-built databases for your needs. For analytic workloads such as reporting, business analysis, and ad hoc queries, Amazon Redshift is a powerful option. With Amazon Redshift, you can query and combine exabytes of structured and semi-structured data across your data warehouse, operational database, and data lake using standard SQL.

To achieve CDC from Amazon Relational Database Service (Amazon RDS) or other relational databases to Amazon Redshift, the simplest solution is to create an AWS Database Migration Service (AWS DMS) task from the database to Amazon Redshift. This approach works well for simple data replication. To have more flexibility to denormalize, transform, and enrich the data, we recommend using Amazon Kinesis Data Streams and AWS Glue streaming jobs between AWS DMS tasks and Amazon Redshift. This post demonstrates how this second approach works in a customer scenario.

Example use case

For our example use case, we have a database that stores data of a fictional organization that holds sports events. We have three dimension tables: sport_event, ticket, and customer, and one fact table: ticket_activity. The table sport_event stores sport type (such as baseball or football), date, and location. The table ticket stores seat level, location, and ticket policy for the target sport event. The table customer stores individual customer names, email addresses, and phone numbers, which are sensitive information. When a customer buys a ticket, the activity (e.g. who purchased the ticket) is recorded in the table ticket_activity. One record is inserted into the table ticket_activity every time a customer buys a ticket, so new records are being ingested into this fact table continuously. The records ingested into the table ticket_activity are only updated when needed, when an administrator maintains the data.

We assume a persona, a data analyst, who is responsible for analyzing trends of the sports activity from this continuous data in real time. To use Amazon Redshift as a primary data mart, the data analyst needs to enrich and clean the data so that users like business analysts can understand and utilize the data easily.

The following are examples of the data in each table.

The following is the dimension table sport_event.

event_id  sport_type  start_date  location
35        Baseball    9/1/2021    Seattle, US
36        Baseball    9/18/2021   New York, US
37        Football    10/5/2021   San Francisco, US

The following is the dimension table ticket (the field event_id is the foreign key for the field event_id in the table sport_event).

ticket_id  event_id  seat_level  seat_location  ticket_price
1315       35        Standard    S-1            100
1316       36        Standard    S-2            100
1317       37        Premium     P-1            300

The following is the dimension table customer.

customer_id  name            email              phone
222          Teresa Stein    [email protected]    +1-296-605-8486
223          Caleb Houston   [email protected]    087-237-9316x2670
224          Raymond Turner  [email protected]    +1-786-503-2802x2357

The following is the fact table ticket_activity (the field purchased_by is the foreign key for the field customer_id in the table customer).

ticket_id  purchased_by  created_at  updated_at
1315       222           8/15/2021   8/15/2021
1316       223           8/30/2021   8/30/2021
1317       224           8/31/2021   8/31/2021

To make the data easy to analyze, the data analyst wants a single table that includes all the information instead of joining all four tables every time they run an analysis. They also want to mask the field phone_number and tokenize the field email_address because they contain sensitive information. To meet these requirements, we merge the four tables into one table and denormalize, tokenize, and mask the data.
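
The tokenization and masking can be expressed with standard Spark SQL functions. The following is a simplified sketch; the DataFrame name joined_df and the exact masking pattern are assumptions:

from pyspark.sql.functions import sha2, regexp_replace, col

# Tokenize the email address with a SHA-256 hash and mask every digit in the
# phone number while keeping separators such as '+', '-', and 'x'.
masked_df = (joined_df
    .withColumn("email_address", sha2(col("email_address"), 256))
    .withColumn("phone_number", regexp_replace(col("phone_number"), "[0-9]", "*")))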

The following is the destination table for analysis, sport_event_activity.

ticket_id event_id sport_type start_date location seat_level seat_location ticket_price purchased_by name email_address phone_number created_at updated_at
1315 35 Baseball 9/1/2021 Seattle, USA Standard S-1 100 222 Teresa Stein 990d081b6a420d04fbe07dc822918c7ec3506b12cd7318df7eb3af6a8e8e0fd6 +*-***-***-**** 8/15/2021 8/15/2021
1316 36 Baseball 9/18/2021 New York, USA Standard S-2 100 223 Caleb Houston c196e9e58d1b9978e76953ffe0ee3ce206bf4b88e26a71d810735f0a2eb6186e ***-***-****x**** 8/30/2021 8/30/2021
1317 37 Football 10/5/2021 San Francisco, US Premium P-1 300 224 Raymond Turner 885ff2b56effa0efa10afec064e1c27d1cce297d9199a9d5da48e39df9816668 +*-***-***-****x**** 8/31/2021 8/31/2021

Solution overview

The following diagram depicts the architecture of the solution that we deploy using AWS CloudFormation.

We use an AWS DMS task to capture the changes in the source RDS instance, Kinesis Data Streams as a destination of the AWS DMS task CDC replication, and an AWS Glue streaming job to read changed records from Kinesis Data Streams and perform an upsert into the Amazon Redshift cluster. In the AWS Glue streaming job, we enrich the sports-event records.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs.

The CloudFormation template generates the following resources:

  • An Amazon RDS database instance (source).
  • An AWS DMS replication instance, used to replicate the table ticket_activity to Kinesis Data Streams.
  • A Kinesis data stream.
  • An Amazon Redshift cluster (destination).
  • An AWS Glue streaming job, which reads from Kinesis Data Streams and the RDS database instance, denormalizes, masks, and tokenizes the data, and upserts the records into the Amazon Redshift cluster.
  • Three AWS Glue Python shell jobs:
    • rds-ingest-data-initial-<CloudFormation Stack name> creates four source tables on Amazon RDS and ingests the initial data into the tables sport_event, ticket, and customer. Sample data is generated at random by the Faker library.
    • rds-ingest-data-incremental-<CloudFormation Stack name> ingests new ticket activity data into the source table ticket_activity on Amazon RDS continuously. This job simulates customer activity.
    • rds-upsert-data-<CloudFormation Stack name> upserts specific records in the source table ticket_activity on Amazon RDS. This job simulates administrator activity.
  • AWS Identity and Access Management (IAM) users and policies.
  • An Amazon VPC, a public subnet, two private subnets, an internet gateway, a NAT gateway, and route tables.
    • We use private subnets for the RDS database instance, AWS DMS replication instance, and Amazon Redshift cluster.
    • We use the NAT gateway to have reachability to pypi.org to use MySQL Connector for Python from the AWS Glue Python shell jobs. It also provides reachability to Kinesis Data Streams and an Amazon Simple Storage Service (Amazon S3) API endpoint.

The following diagram illustrates this architecture.

To set up these resources, you must have the following prerequisites:

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For S3BucketName, enter the name of your new S3 bucket.
  5. For VPCCIDR, enter the CIDR IP address range that doesn’t conflict with your existing networks.
  6. For PublicSubnetCIDR, enter the CIDR IP address range within the CIDR you gave in VPCCIDR.
  7. For PrivateSubnetACIDR and PrivateSubnetBCIDR, enter the CIDR IP address range within the CIDR you gave for VPCCIDR.
  8. For SubnetAzA and SubnetAzB, choose the subnets you want to use.
  9. For DatabaseUserName, enter your database user name.
  10. For DatabaseUserPassword, enter your database user password.
  11. Choose Next.
  12. On the next page, choose Next.
  13. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  14. Choose Create stack.

Stack creation can take about 20 minutes.

Ingest new records

In this section, we walk you through the steps to ingest new records.

Set up an initial source table

To set up an initial source table in Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Select the job rds-ingest-data-initial-<CloudFormation stack name>.
  3. On the Actions menu, choose Run job.
  4. Wait for the Run status to show as SUCCEEDED.

This AWS Glue job creates the source tables on the RDS database instance.

Start data ingestion to the source table on Amazon RDS

To start data ingestion to the source table on Amazon RDS, complete the following steps:

  1. On the AWS Glue console, choose Triggers.
  2. Select the trigger periodical-trigger-<CloudFormation stack name>.
  3. On the Actions menu, choose Activate trigger.
  4. Choose Enable.

This trigger runs the job rds-ingest-data-incremental-<CloudFormation stack name> to ingest one record every minute.

Start data ingestion to Kinesis Data Streams

To start data ingestion from Amazon RDS to Kinesis Data Streams, complete the following steps:

  1. On the AWS DMS console, choose Database migration tasks.
  2. Select the task rds-to-kinesis-<CloudFormation stack name> .
  3. On the Actions menu, choose Restart/Resume.
  4. Wait for the Status to show as Load complete, replication ongoing.

The AWS DMS replication task ingests data from Amazon RDS to Kinesis Data Streams continuously.

Start data ingestion to Amazon Redshift

Next, to start data ingestion from Kinesis Data Streams to Amazon Redshift, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Select the job streaming-cdc-kinesis2redshift-<CloudFormation stack name>.
  3. On the Actions menu, choose Run job.
  4. Choose Run job again.

This AWS Glue streaming job is implemented based on the guidelines in Updating and inserting new data. It performs the following actions:

  • Creates a staging table on the Amazon Redshift cluster using the Amazon Redshift Data API
  • Reads from Kinesis Data Streams, and creates a DataFrame with filtering only INSERT and UPDATE records
  • Reads from three dimension tables on the RDS database instance
  • Denormalizes, masks, and tokenizes the data
  • Writes into a staging table on the Amazon Redshift cluster
  • Merges the staging table into the destination table (see the sketch after this list)
  • Drops the staging table
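
These staging and merge steps follow the common Amazon Redshift upsert pattern. The following is a minimal sketch of the merge issued through the Amazon Redshift Data API; the staging table name is an assumption, and the cluster, database, and user values are placeholders:

import boto3

redshift_data = boto3.client("redshift-data")

# Simplified merge: delete the rows being replaced, insert the staged rows,
# then drop the staging table. Identifiers below are placeholders.
merge_sql = """
BEGIN;
DELETE FROM sport_event_activity
USING stage_sport_event_activity
WHERE sport_event_activity.ticket_id = stage_sport_event_activity.ticket_id;
INSERT INTO sport_event_activity SELECT * FROM stage_sport_event_activity;
DROP TABLE stage_sport_event_activity;
END;
"""

redshift_data.execute_statement(
    ClusterIdentifier="cdc-sample-<CloudFormation stack name>",
    Database="dev",
    DbUser="dbmaster",
    Sql=merge_sql,
)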

After about 2 minutes from starting the job, the data should be ingested into the Amazon Redshift cluster.

Validate the ingested data

To validate the ingested data in the Amazon Redshift cluster, complete the following steps:

  1. On the Amazon Redshift console, choose EDITOR in the navigation pane.
  2. Choose Connect to database.
  3. For Connection, choose Create a new connection.
  4. For Authentication, choose Temporary credentials.
  5. For Cluster, choose the Amazon Redshift cluster cdc-sample-<CloudFormation stack name>.
  6. For Database name, enter dev.
  7. For Database user, enter the user that was specified in the CloudFormation template (for example, dbmaster).
  8. Choose Connect.
  9. Enter the query SELECT * FROM sport_event_activity and choose Run.

Now you can see the ingested records in the table sport_event_activity on the Amazon Redshift cluster. Let’s note the value of ticket_id from one of the records. For this post, we choose 1317 as an example.

Update existing records

Your Amazon Redshift cluster now has the latest data ingested from the tables on the source RDS database instance. Let’s update the data in the source table ticket_activity on the RDS database instance to see that the updated records are replicated to the Amazon Redshift cluster side.

The CloudFormation template creates another AWS Glue job. This job upserts records with specific IDs in the source table ticket_activity. To upsert the records in the source table, complete the following steps:

  1. On the AWS Glue console, choose Jobs.
  2. Choose the job rds-upsert-data-<CloudFormation stack name>.
  3. On the Actions menu, choose Edit job.
  4. Under Security configuration, script libraries, and job parameters (optional), for Job parameters, update the following parameters:
    1. For Key, enter --ticket_id_to_be_updated.
    2. For Value, replace 1 with one of the ticket IDs you observed on the Amazon Redshift console.
  5. Choose Save.
  6. Choose the job rds-upsert-data-<CloudFormation stack name>.
  7. On the Actions menu, choose Run job.
  8. Choose Run job.

This AWS Glue Python shell job simulates the administrator activity of maintaining a ticket purchase record. It updates a record in the source table ticket_activity on the RDS database instance using the ticket ID passed in the job argument --ticket_id_to_be_updated. It automatically selects one customer, updates the field purchased_by with that customer ID, and updates the field updated_at with the current timestamp.
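
Conceptually, the update issued by this job is equivalent to the following sketch using MySQL Connector for Python; the connection details and the selected customer ID are placeholders:

import mysql.connector

# Placeholder values; the real job reads these from job arguments and the customer table.
ticket_id_to_be_updated = 1317
new_customer_id = 222

conn = mysql.connector.connect(host="<rds-endpoint>", user="<db-user>",
                               password="<db-password>", database="<database-name>")
cur = conn.cursor()

# Update the ticket activity record identified by --ticket_id_to_be_updated.
cur.execute(
    "UPDATE ticket_activity SET purchased_by = %s, updated_at = NOW() WHERE ticket_id = %s",
    (new_customer_id, ticket_id_to_be_updated),
)
conn.commit()
cur.close()
conn.close()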

To validate the ingested data in the Amazon Redshift cluster, run the same query SELECT * FROM sport_event_activity. You can filter the record with the ticket_id value you noted earlier.

According to the rows returned by the query, the record with ticket_id=1317 has been updated. The field updated_at has been updated from 2021-08-16 06:05:01 to 2021-08-16 06:53:52, and the field purchased_by has been updated from 449 to 14. From this result, you can see that this record has been successfully updated on the Amazon Redshift cluster side as well. You can also choose Queries in the left pane to see past query runs.

Clean up

Now to the final step, cleaning up the resources.

  1. Stop the AWS DMS replication task rds-to-kinesis-<CloudFormation stack name>.
  2. Stop the AWS Glue streaming job streaming-cdc-kinesis2redshift-<CloudFormation stack name>.
  3. Delete the CloudFormation stack.

Conclusion

In this post, we demonstrated how you can stream data—not only new records, but also updated records from relational databases—to Amazon Redshift. With this approach, you can easily achieve upsert use cases on Amazon Redshift clusters. In the AWS Glue streaming job, we demonstrated the common technique to denormalize, mask, and tokenize data for real-world use cases.


About the Authors

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys collaborating with different teams to deliver results like this post. In his spare time, he enjoys playing video games with his family.

Roman Gavrilov is an Engineering Manager at AWS Glue. He has over a decade of experience building scalable Big Data and Event-Driven solutions. His team works on Glue Streaming ETL to allow near real time data preparation and enrichment for machine learning and analytics.

Swiftly Search Metadata with an Amazon S3 Serverless Architecture

Post Syndicated from Jiwan Panjiker original https://aws.amazon.com/blogs/architecture/swiftly-search-metadata-with-an-amazon-s3-serverless-architecture/

As you increase the number of objects in Amazon Simple Storage Service (Amazon S3), you’ll need the ability to search through them and quickly find the information you need.

In this blog post, we offer you a cost-effective solution that uses a serverless architecture to search through your metadata. Using a serverless architecture helps you reduce operational costs because you only pay for what you use.

Our solution is built with Amazon S3 event notifications, AWS Lambda, AWS Glue Catalog, and Amazon Athena. These services allow you to search thousands of objects in an S3 bucket by filenames, object metadata, and object keys. This solution maintains an index in an Apache Parquet file, which optimizes Athena queries to search Amazon S3 metadata. Using this approach makes it straightforward to run queries as needed without the need to ingest data or manage any servers.

Using Athena to search Amazon S3 objects

Amazon S3 stores and retrieves objects for a range of use cases, such as data lakes, websites, cloud-native applications, backups, archive, machine learning, and analytics. When you have an S3 bucket with thousands of files in it, how do you search for and find what you need? The object search box within the Amazon S3 user interface allows you to search by prefix, or you can search using Amazon S3 API’s LIST operation, which only returns 1,000 objects at a time.

A common solution to this issue is to build an external index and search for Amazon S3 objects using the external index. The Indexing Metadata in Amazon Elasticsearch Service Using AWS Lambda and Python and Building and Maintaining an Amazon S3 Metadata Index without Servers blog posts show you how to build this solution with Amazon OpenSearch Service or Amazon DynamoDB.

Our solution stores the external index in Amazon S3 and uses Athena to search the index. Athena makes it straightforward to search Amazon S3 objects without the need to manage servers or introduce another data repository. In the next section, we’ll talk about a few use cases where you can apply this solution.

Metadata search use cases

When your clients upload files to their S3 buckets, you’ll sometimes need to verify the files that were uploaded. You must validate whether you have received all the required information, including metadata such as customer identifier, category, received date, etc. The following examples will make use of this metadata:

  • Searching for a file from a specific date (or) date range
  • Finding all objects uploaded by a given customer identifier
  • Reviewing files for a particular category
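
Once the metadata index table described later in this post is in place, these lookups become simple Athena queries. The following sketch uses the AWS Data Wrangler (awswrangler) library; the table name and column names are assumptions that should match your own metadata schema:

import awswrangler as wr

# Hypothetical index table and columns; adjust to your metadata schema.
df = wr.athena.read_sql_query(
    sql="""
        SELECT object_key, customer_id, category, received_date
        FROM s3_metadata_index
        WHERE customer_id = 'customer-123'
          AND received_date BETWEEN date '2021-01-01' AND date '2021-01-31'
    """,
    database="example_database",
)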

The next sections outline how to build a serverless architecture to apply to use cases like these.

Building a serverless file metadata search on AWS

Let’s go through layers that are involved in our serverless architecture solution:

  1. Data ingestion: Set object metadata when objects are uploaded into Amazon S3. This layer uploads objects using the Amazon S3 console, AWS SDK, REST API, and AWS CLI.
  2. Data processing: Integrate Amazon S3 event notifications with Lambda to process S3 events. The AWS Data Wrangler library within Lambda will transform and build the metadata index file.
  3. Data catalog: Use AWS Glue Data Catalog as a central repository to store table definition and add/update business-relevant attributes of the metadata index. AWS Data Wrangler API creates the Apache Parquet files to maintain the AWS Glue Catalog.
  4. Metadata search: Define tables for your metadata and run queries using standard SQL with Athena to get started faster.

Reference architecture

Figure 1 illustrates our approach to implementing serverless file metadata search, which consists of the following steps:

  1. When you create new objects/files in an S3 bucket, the source bucket is configured with Amazon S3 Event Notification events (put, post, copy, etc.). Amazon S3 events provide the metadata information required for further processing and building the metadata index file on the destination bucket.
  2. The S3 event is sent to a Lambda function with necessary permissions on Amazon S3 using a resource-based policy. The Lambda function processes the event with metadata and converts it into an Apache Parquet file, which is then written into a target bucket. The AWS Data Wrangler API transforms and builds the metadata index file. The Lambda layer configures AWS Data Wrangler library for necessary transformations.
  3. AWS Data Wrangler also creates and stores metadata in the AWS Glue Data Catalog. DataFrames are then written into target S3 buckets in Apache Parquet format. The AWS Glue Data Catalog is then updated with necessary metadata. The following example code snippet writes into an example table with columns for year, month, and date for an S3 object.

wr.s3.to_parquet(
    df=df,
    path=path,
    dataset=True,
    mode="append",
    partition_cols=["year", "month", "date"],
    database="example_database",
    table="example_table",
)

  4. With the AWS Glue Data Catalog built, Athena will use AWS Glue crawlers to automatically infer the schemas and partitions of the metadata search index. Athena makes it easy to run interactive SQL queries directly against Amazon S3 by using the schema-on-read approach.
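
Putting steps 2 and 3 together, a Lambda handler along the following lines could build and append the index rows; the bucket path, database, table, and metadata keys are assumptions:

import urllib.parse

import boto3
import pandas as pd
import awswrangler as wr

s3 = boto3.client("s3")

def handler(event, context):
    rows = []
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        head = s3.head_object(Bucket=bucket, Key=key)
        rows.append({
            "bucket": bucket,
            "object_key": key,
            "size_bytes": head["ContentLength"],
            # User-defined object metadata (x-amz-meta-*), such as customer id or category.
            **head.get("Metadata", {}),
            "year": record["eventTime"][0:4],
            "month": record["eventTime"][5:7],
            "date": record["eventTime"][8:10],
        })

    df = pd.DataFrame(rows)
    # Append the new index rows as Parquet and keep the Glue Data Catalog table up to date.
    wr.s3.to_parquet(df=df, path="s3://example-index-bucket/index/", dataset=True,
                     mode="append", partition_cols=["year", "month", "date"],
                     database="example_database", table="example_table")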

 


Figure 1. Serverless S3 metadata search

Athena charges based on the amount of data scanned per query. Storing the data in columnar format and partitioning it reduces cost and improves performance. Figure 2 provides a sample metadata query result from Athena.


Figure 2. Athena sample metadata query results

Conclusion

This blog post shows you how to create a robust metadata index using serverless components. This solution allows you to search files in an S3 bucket by filenames, metadata, and keys.

We showed you how to set up Amazon S3 Event Notifications, Lambda, AWS Glue Catalog, and Athena. You can use this approach to maintain an index in an Apache Parquet file, store it in Amazon S3, and use Athena queries to search S3 metadata.

Our solution requires minimal administration effort. It does not require administration and maintenance of Amazon Elastic Compute Cloud (Amazon EC2) instances, DynamoDB tables, or Amazon OpenSearch Service clusters. Amazon S3 provides scalable storage, high durability, and availability at a low cost. Plus, this solution does not require in-depth knowledge of AWS services. When not in use, it will only incur cost for Amazon S3 and possibly for AWS Glue Data Catalog storage. When needed, this solution will scale out effortlessly.

Ready to get started? Read more and get started on building Amazon S3 Serverless file metadata search:

Build operational metrics for your enterprise AWS Glue Data Catalog at scale

Post Syndicated from Sachin Thakkar original https://aws.amazon.com/blogs/big-data/build-operational-metrics-for-your-enterprise-aws-glue-data-catalog-at-scale/

Over the last several years, enterprises have accumulated massive amounts of data. Data volumes have increased at an unprecedented rate, exploding from terabytes to petabytes and sometimes exabytes of data. Increasingly, many enterprises are building highly scalable, available, secure, and flexible data lakes on AWS that can handle extremely large datasets. After data lakes are productionized, to measure the efficacy of the data lake and communicate the gaps or accomplishments to the business groups, enterprise data teams need tools to extract operational insights from the data lake. Those insights help answer key questions such as:

  • The last time a table was updated
  • The total table count in each database
  • The projected growth of a given table
  • The most frequently queried table vs. least queried tables

In this post, I walk you through a solution to build an operational metrics dashboard (like the following screenshot) for your enterprise AWS Glue Data Catalog on AWS.

Solution overview

This post shows you how to collect metadata information from your data lake’s AWS Glue Data Catalog resources (databases and tables) and build an operational dashboard on this data.

The following diagram illustrates the overall solution architecture and steps.

The steps are as follows:

  1. A data collector Python program runs on a schedule and collects metadata details about databases and tables from the enterprise Data Catalog.
  2. The following key data attributes are collected for each table and database in your AWS Glue Data Catalog.
Table Data                       Database Data
TableName                        DatabaseName
DatabaseName                     CreateTime
Owner                            SharedResource
CreateTime                       SharedResourceOwner
UpdateTime                       SharedResourceDatabaseName
LastAccessTime                   Location
TableType                        Description
Retention
CreatedBy
IsRegisteredWithLakeFormation
Location
SizeInMBOnS3
TotalFilesonS3
  3. The program reads each table’s file location and computes the number of files on Amazon Simple Storage Service (Amazon S3) and the size in MB (see the sketch after this list).
  4. All the data for the tables and databases is stored in an S3 bucket for downstream analysis. The program runs every day and creates new files partitioned by year, month, and day on Amazon S3.
  5. We crawl the data created in Step 4 using an AWS Glue crawler.
  6. The crawler creates an external database and tables for our generated dataset for downstream analysis.
  7. We can query the extracted data with Amazon Athena.
  8. We use Amazon QuickSight to build our operational metrics dashboard and gain insights into our data lake content and usage.
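
As an illustration of step 3, the file count and size for a table’s Amazon S3 location can be computed with a paginated listing. The following is a simplified sketch:

import boto3

def s3_stats(location):
    """Return (file_count, size_in_mb) for an s3://bucket/prefix table location."""
    bucket, _, prefix = location.replace("s3://", "").partition("/")
    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")

    total_files, total_bytes = 0, 0
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get("Contents", []):
            total_files += 1
            total_bytes += obj["Size"]
    return total_files, round(total_bytes / (1024 * 1024), 2)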

For simplicity, this program crawls and collects data from the Data Catalog for the us-east-1 Region only.

Walkthrough overview

The walkthrough includes the following steps:

  1. Configure your dataset.
  2. Deploy the core solution resources with an AWS CloudFormation template, and set up and trigger the AWS Glue job.
  3. Crawl the metadata dataset and create external tables in the Data Catalog.
  4. Build a view and query the data through Athena.
  5. Set up and import data into QuickSight to create an operational metrics dashboard for the Data Catalog.

Configure your dataset

We use the AWS COVID-19 data lake for analysis. This data lake is comprised of data in a publicly readable S3 bucket.

To make the data from the AWS COVID-19 data lake available in your AWS account, create a CloudFormation stack using the following template. If you’re signed in to your AWS account, the following link fills out most of the stack creation form for you. Make sure to change the Region to us-east-1. For instructions on creating a CloudFormation stack, see Get started.

This template creates a COVID-19 database in your Data Catalog and tables that point to the public AWS COVID-19 data lake. You don’t need to host the data in your account, and you can rely on AWS to refresh the data as datasets are updated through AWS Data Exchange.

For more information about the COVID-19 dataset, see A public data lake for analysis of COVID-19 data.

Your environment may already have existing datasets in the Data Catalog. The program collects the aforementioned attributes for those datasets as well, which can be used for analysis.

Deploy your resources

To make it easier to get started, we created a CloudFormation template that automatically sets up a few key components of the solution:

  • An AWS Glue job (Python program) that is triggered based on a schedule
  • The AWS Identity and Access Management (IAM) role required by the AWS Glue job so the job can collect and store details about databases and tables in the Data Catalog
  • A new S3 bucket for the AWS Glue job to store the data files
  • A new database in the Data Catalog for storing our metrics data tables

The source code for the AWS Glue job and the CloudFormation template are available in the GitHub repo.

You must first download the AWS Glue Python code from GitHub and upload it to an existing S3 bucket. The path of this file needs to be provided when running the CloudFormation stack.

  1. Launch the stack:
  2. Provide values for your parameters as shown in the following screenshot.

After the stack is deployed successfully, you can check the resources created on the stack’s Resources tab.

You can verify and check the AWS Glue job setup and trigger, which is scheduled as per your specified time.

Now that we have verified that the stack is successfully set up, we can run our AWS Glue job manually and collect key attributes for our analysis.

  1. On the AWS Glue console, choose AWS Glue Studio in the navigation pane.
  2. In AWS Glue Studio, choose Jobs, select the DataCollector job, and choose Run job.

The AWS Glue job collects data and stores it in the S3 bucket created for us through AWS CloudFormation. The job creates separate folders for database and table data, as shown in the following screenshot.

Crawl and set up external tables for the metrics data

Follow these steps to create tables in the database by using AWS Glue crawlers on the data stored on Amazon S3. Note that the database has been created for us using the CloudFormation stack.

  1. On the AWS Glue console, under Databases in the navigation pane, choose Tables.
  2. Choose Add tables.
  3. Choose Add tables using a crawler.
  4. Enter a name for the crawler and choose Next.
  5. For Add crawler, choose Create source type.
  6. Specify the crawler source type by choosing Data stores and choose Next.
  7. In the Add a data store section, for Choose a data store, choose S3.
  8. For Crawl data in, select Specified path.
  9. For Include path, enter the path to the tables folder generated by the AWS Glue job: s3://<data bucket created using CFN>/datalake/tables/.
  10. When asked if you want to create another data store, select No and then choose Next.
  11. On the Choose an IAM Role page, select Choose an Existing IAM Role.
  12. For IAM role, choose the IAM role created through the CloudFormation stack.
  13. Choose Next.
  14. On the Output page, for Database, choose the AWS Glue database you created earlier.
  15. Choose Next.
  16. Review your selections and choose Finish.
  17. Select the crawler you just created and choose Run crawler.

The crawler should take only a few minutes to complete. While it’s running, status messages may appear, informing you that the system is attempting to run the crawler and then is actually running the crawler. You can choose the refresh icon to check on the current status of the crawler.

  1. In the navigation pane, choose Tables.

The table called tables, which was created by the crawler, should be listed.

Query data with Athena

This section demonstrates how to query these tables using Athena. Athena is a serverless, interactive query service that makes it easy to analyze the data in the AWS COVID-19 data lake. Athena supports SQL, a common language that data analysts use for analyzing structured data. To query the data, complete the following steps:

  1. Sign in to the Athena console.
  2. If this is your first time using Athena, you must specify a query result location on Amazon S3.
  3. On the drop-down menu, choose the datalake360db database.
  4. Enter your queries and explore the datasets.

Set up and import data into QuickSight and create an operational metrics dashboard

Set up QuickSight before you import the dataset, and make sure that you have at least 512 MB of SPICE capacity. For more information, see Managing SPICE Capacity.

Before proceeding, make sure your QuickSight account has IAM permissions to access Athena (see Authorizing Connections to Amazon Athena) and Amazon S3.

Let’s first create our datasets.

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Choose Athena from the list of data sources.
  4. For Data source name, enter a name.
  5. For Database, choose the database that you set up in the previous step (datalake360db).
  6. For Tables, select databases.
  7. Finish creating your dataset.
  8. Repeat the same steps to create a tables dataset.

Now you edit the databases dataset.

  1. From the datasets list, choose the databases dataset.
  2. Choose Edit dataset.
  3. Change the createtime field type from string to date.
  4. Enter the date format as yy/MM/dd HH:mm:ss.
  5. Choose Update.
  6. Similarly, change the tables dataset fields createtime, updatetime, and lastaccessedtime to the date type.
  7. Choose Save and Publish to save the changes to the dataset.

Next, we add calculated fields for the count of databases and tables.

  1. For the tables dataset, choose Add calculation.
  2. Add the calculated field tablesCount as distinct_count({tablename}).
  3. Similarly, add a new calculated field databasesCount as distinct_count({databasename}).

Now let’s create a new analysis.

  1. In the navigation pane, choose Analysis.
  2. Choose the tables dataset.
  3. Choose Create analysis.

Let’s create our first visual for the count of number of databases and tables in our data lake Data Catalog.

  1. Create a new visual and add databasesCount from the fields list.

This provides us with a count of databases in our Data Catalog.

  1. Similarly, add a visual to display the total number of tables using the tablesCount field.

Let’s create second visual for the total number of files on Amazon S3 and total storage size on Amazon S3.

  1. Similar to the previous step, we add a new visual and choose the totalfilesons3 and sizeinmbons3 fields to display Amazon S3-related storage details.

Let’s create another visual to check which are the least used datasets.

  1. Add a visual using the LastAccessTime data element.

Finally, let’s create one more visual to check if databases are shared resources from different accounts.

  1. Select the databases dataset.
  2. We create a table visual type and add databasename, sharedresource, and description fields.

Now you have an idea of what types of visuals are possible using this data. The following screenshot is one example of a finished dashboard.

Clean up

To avoid ongoing charges, delete the CloudFormation stacks and output files in Amazon S3 that you created during deployment. You have to delete the data in the S3 buckets before you can delete the buckets.

Conclusion

In this post, we showed how you can set up an operational metrics dashboard for your Data Catalog. We set up our program to collect key data elements about our tables and databases from the AWS Glue Data Catalog. We then used this dataset to build our operational metrics dashboard and gained insights on our data lake.


About the Authors

Sachin Thakkar is a Senior Solutions Architect at Amazon Web Services, working with a leading Global System Integrator (GSI). He brings over 22 years of experience as an IT Architect and as Technology Consultant for large institutions. His focus area is on Data & Analytics. Sachin provides architectural guidance and supports the GSI partner in building strategic industry solutions on AWS

Designing a High-volume Streaming Data Ingestion Platform Natively on AWS

Post Syndicated from Soonam Jose original https://aws.amazon.com/blogs/architecture/designing-a-high-volume-streaming-data-ingestion-platform-natively-on-aws/

The total global data storage is projected to exceed 200 zettabytes by 2025. This exponential growth of data demands increased vigilance against cybercrimes. Emerging cybersecurity trends include increasing service attacks, ransomware, and critical infrastructure threats. Businesses are changing how they approach cybersecurity and are looking for new ways to tackle these threats. In the past, they have relied on internal IT or engaged a managed security services provider (MSSP) to monitor and prevent unauthorized access and attacks.

An end-to-end analytics solution should ingest and process log data streaming from various computing and IoT devices. It can then make processed data available to analytics systems users in near-real-time. However, the sheer volume of data in the future makes this difficult to address in a reliable and cost-effective manner.

In this blog post, we present three approaches for a high-volume log data ingestion and processing platform natively on Amazon Web Services (AWS). We also compare the pros and cons of each. We’ll discuss factors to consider when evaluating the different options and their associated flexibility, to take full advantage of AWS. We will showcase a fictional use case for a top MSSP who ingests high volumes of logs from security devices to cloud. This MSSP also performs downstream analytics and threat detection modeling.

The options we present here have a log collection platform (LCP) on-premises. It collects logs from security devices and sensors, performs necessary translations and tokenization, and pushes compressed log files to the processing tier on cloud. The collection platform can also be modernized to have the IoT-enabled devices send logs to AWS IoT services. This will push the data to Amazon Kinesis, a managed service for collecting and analyzing streaming data.

Approach 1: Amazon Kinesis for log ingestion and format conversion

Figure 1 illustrates a comprehensive solution that uses managed and serverless services on AWS.

Figure 1. Amazon Kinesis for log ingestion and format conversion

1. LCP will invoke a scalable producer application for Amazon Kinesis Data Streams running on AWS Fargate behind an Application Load Balancer. The producer application will use the Amazon Kinesis Producer Library (KPL). KPL aggregates and batches data records to make ingestion into the data stream more efficient. The application may provide compressed records to the KPL to have it manage object compression.

The application can be set up as an HTTP endpoint that receives log files and processes them using KPL. Customer ID sent as part of an HTTP request header can be used to maintain affinity. The application can run in a Docker container, which is orchestrated by Amazon ECS on AWS Fargate. A target tracking scaling policy can manage the number of parallel running data ingestion containers to manage scalability of the ingestion process.

2. Amazon Kinesis Scaling Utility can be used to scale data streams up or down by a count, or as a percentage of the total fleet. The scaling utility archive file can be imported as a library to AWS Lambda. It will automatically manage the number of shards in the stream based on the observed PUT or GET rate of the stream. The combination of customer ID and security device ID may be used to define the partition key.

3. Records uploaded to the stream by the producer application will be consumed by Lambda. It will perform gateway transformations (required by all downstream consumers) and the normalization of record format. Any additional consumer level transformations may be handled separately, associated with respective consumers.

A combination of batch window and batch size configurations can improve efficiency of function invocations. Batch windows are the maximum amount of time in seconds to gather records before invoking the function. Batch size is the number of records to send to the function in each batch. The Lambda function will throttle sending records to Amazon Kinesis Data Firehose. Error handling will be accomplished via retries with a smaller batch size, with number of retries limited as appropriate. It will discard records that are too old.

An Amazon Simple Queue Service (SQS) queue can be configured as a failed-event destination for further offline analysis. A Lambda function can read from the error SQS queue to do basic checks and determine appropriate follow-up actions. This can be an initiated email for additional investigation or a command to discard the message.
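
The batching, retry, and failure-handling behavior described for this step maps to standard Lambda event source mapping settings for Kinesis. The following boto3 sketch shows representative values; the ARNs and numbers are placeholders:

import boto3

lambda_client = boto3.client("lambda")

lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-east-1:111122223333:stream/example-log-stream",
    FunctionName="example-gateway-transform",
    StartingPosition="LATEST",
    BatchSize=500,                      # records per invocation
    MaximumBatchingWindowInSeconds=30,  # wait up to 30 seconds to fill a batch
    MaximumRetryAttempts=3,             # retry failed batches a limited number of times
    MaximumRecordAgeInSeconds=3600,     # discard records that are too old
    BisectBatchOnFunctionError=True,    # retry with smaller batches on error
    DestinationConfig={                 # route failed-batch metadata to SQS for offline analysis
        "OnFailure": {
            "Destination": "arn:aws:sqs:us-east-1:111122223333:example-failed-events"
        }
    },
)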

4. Output of transformations by Lambda will be saved to the short term (hot) storage Amazon S3 bucket via Kinesis Data Firehose. This can efficiently handle Parquet format conversion required by downstream analytics applications. Kinesis Data Firehose delivery streams will be created per customer and configured with associated AWS Glue Data Catalog table, to perform parquet format conversion.

5. AWS Glue jobs will be used to consolidate and write larger files to the long term (cold) storage bucket.

6. The data in the cold storage bucket will be accessed by internal SOC analysts for threat detection and mitigation.

7. The data in cold storage buckets will also be accessed by end customers via dashboards in Amazon QuickSight.

8. This architecture also provides additional options to modernize streaming analytics using Amazon Kinesis Data Analytics or AWS Glue streaming jobs as appropriate.

While this architecture proposes a fully managed, end-to-end solution, the sheer volume of log messages may drive up the total cost of the solution. This is especially true for Kinesis Data Streams and Kinesis Data Firehose costs.

Approach 2: Containerized application on AWS Fargate for ingestion and Amazon Kinesis for format conversion

An alternative approach shown in Figure 2 replaces the gateway Kinesis Data Streams and transformations, with a containerized application on Fargate. Conversion to Parquet format and writing to the S3 bucket is still handled by Kinesis Data Firehose.

Figure 2. Containerized application for ingestion and Amazon Kinesis for format conversion

1. LCP will upload log files to a raw storage bucket in Amazon S3.

2. A Lambda function will process Event Notifications from the raw data storage bucket. It can insert Amazon S3 object pointers to a Kinesis Data Stream partitioned by Customer ID and Device ID.

3. The producer application will retrieve the Event Notifications from the Data Stream and retrieve corresponding log files from S3. It will perform initial aggregations and transformations, and output to Kinesis Data Firehose. The application can run in a Docker container that is orchestrated by Amazon ECS on Fargate. A target tracking scaling policy can manage the number of parallel running data ingestion containers, to manage scalability of the ingestion process. ECS cluster capacity can be scaled up or down based on Amazon CloudWatch alarms.

4. Kinesis Data Firehose converts to Parquet format, zips the data, and persists to a short-term storage bucket in S3. This is backed by Glue Data Catalog.

Steps 5, 6 and 7 perform consolidation and availability of the processed data to downstream consumers, as in the previous approach.

This option uses the built-in capabilities of Kinesis Data Firehose to transform to Parquet format and deliver to Amazon S3. Note that the costs associated with the service may still be prohibitive for larger data volumes.

Approach 3: Containerized application on AWS Fargate for ingestion and format conversion

Figure 3 uses a containerized application running on Fargate for both gateway transformations. This app also provides conversion to Parquet format before writing the files to a short term (hot) storage bucket. All the other steps are the same as in option 2.

Figure 3. Containerized application for ingestion and format conversion

This option offers the least expensive way to transform, aggregate, and enrich the incoming log records, as well as convert them to Parquet format. But it comes with additional overhead for custom development of format conversion, checkpointing, error handling, and application management. Evaluate based on your business needs and workflow.

Conclusion

In this post, we discussed multiple approaches to design a platform on AWS to ingest and process high-volume security log records. We compared the pros and cons for each option. Amazon Kinesis is a fully managed and scalable service that helps easily collect, process, and analyze video and data streams in real time. A solution primarily based on Kinesis may become cost prohibitive due to large data volumes. Consider alternate approaches that use containerized applications on AWS Fargate. The trade-off would be the ability for custom development versus application management overhead.

To improve your security log analysis solution, explore one of the approaches we illustrate and customize as appropriate to fit your unique needs.

How Amazon Transportation Service enabled near-real-time event analytics at petabyte scale using AWS Glue with Apache Hudi

Post Syndicated from Madhavan Sriram original https://aws.amazon.com/blogs/big-data/how-amazon-transportation-service-enabled-near-real-time-event-analytics-at-petabyte-scale-using-aws-glue-with-apache-hudi/

This post is co-written with Madhavan Sriram and Diego Menin from Amazon Transportation Services (ATS).

The transportation and logistics industry covers a wide range of services, such as multi-modal transportation, warehousing, fulfillment, freight forwarding, and delivery. At Amazon Transportation Service (ATS), the lifecycle of each shipment is digitally tracked and appended with tens of tracking updates on average. Those tracking updates are vital to kick off events through the shipment operational and billing lifecycle, including delay identification and route optimization. They are also the base for the customer and consumer tracking experience through the different touchpoints.

In this post, we discuss how ATS enabled near-real-time event analytics at petabyte scale using Apache Hudi tables created by AWS Glue Spark jobs.

ATS was looking for ways to securely and cost-efficiently manage and derive analytical insights over petabyte-sized datasets, with data coming in from different sources at different paces, and stored over different storage solutions. You can gain deeper and richer insights when you bring together all your relevant data of all structures and types, from all sources, to analyze.

One of the main challenges that our data engineering team at ATS faced was bringing together all the data arriving in real time and building a holistic view for our customers and partners. The majority of the orders placed through Amazon, one of the world’s largest online retailers, are operationalized by ATS for transportation and logistics. ATS provides the business with accurate and timely package delivery. ATS operations generate data at petabyte scale, so having the data available at their fingertips provides innumerable opportunities to improve operations through data-driven decision-making.

Apache Hudi is an open-source data management framework used to simplify incremental data processing and data pipeline development. This framework more efficiently manages business requirements like data lifecycles and improves data quality. Hudi enables you to manage data at the record level in Amazon Simple Storage Service (Amazon S3) data lakes to simplify change data capture (CDC) and streaming data ingestion at petabyte scale, and helps handle data privacy use cases requiring record-level updates and deletes.

Solution overview

One of the biggest challenges ATS faced was handling data at petabyte scale with the need for constant inserts, updates, and deletes with minimal time delay, which reflects real business scenarios and package movement to downstream data consumers.

Their traditional data warehouses couldn’t scale to the size of the data nor the frequency of data ingestion. They needed to scale to hundreds of GBs of data across multiple data ingestion sources in order to derive near-real-time data for downstream consumers to use for data analytics that powered business-critical reports, dashboards, and visualizations. The data is also used for training machine learning models with overall service level agreements (SLAs) of 15 minutes for data ingestion and processing.

In this post, we show how we ingest data in real time on the order of hundreds of GBs per hour and run inserts, updates, and deletes on a petabyte-scale data lake using Apache Hudi tables loaded using AWS Glue Spark jobs and other AWS serverless services, including AWS Lambda, Amazon Kinesis Data Firehose, and Amazon DynamoDB. AWS ProServe, working closely with ATS, built a data lake comprising Apache Hudi tables on Amazon S3 created and populated using AWS Glue. A data pipeline was created that supports inserts, updates, and deletes at petabyte scale on the Apache Hudi tables using AWS Glue. To support real-time ingestion, ATS also implemented a real-time data ingestion pipeline based on Kinesis Data Firehose, DynamoDB, and Amazon DynamoDB Streams.

To tackle the challenges we discussed, we decided to follow the “Divide et Impera” approach, and define two separate workstreams:

  • Stream-based – We ingested data from four different data sources and 11 datasets, and performed some initial data transformation and joins steps, honoring a time window that may vary from 3 hours to 2 weeks across all workloads. The event rate might go up to thousands of events per second, and events might have duplicates, arrive late, or not be in the correct order. Our objective was to understand in real time the transit status of a given package or truck, capture the current status of ATS operations in real time, and extend the current stream-based solution to offload and supplement the current extract, transform, and load (ETL) solution, based on Amazon Redshift.
  • Data lake – We wanted the ability to store petabytes of data and allow for merges between historical data (petabytes) with newly ingested data. The data retention policy extends to up to 5 years, which brings increased costs and reduces performance significantly. Our team requires access to near-real-time data (less than 15 minutes) from stream-based ingestion, with full GDPR compliance. Our objective was to merge stream-based ingested data files to derive a holistic view of the dataset at a certain point in time, with an SLA of under 15 minutes. Data lineage capabilities would also be nice to have.

Stream-based solution

The following diagram illustrates the architecture of our stream-based solution.

The flow of the solution is as follows:

  1. Data is ingested from various sources in separate Firehose data streams, collected for up to 15 minutes and stored in S3 buckets.
  2. Upon the arrival of every new file in Amazon S3, a Lambda function is triggered to insert data into a DynamoDB table associated with a specific data source or datasets.
  3. With DynamoDB Streams, we trigger a second Lambda function that aggregates data in real time across the different DynamoDB tables by performing real-time DynamoDB table lookups. The ETL window is enforced using DynamoDB item TTL, so data is automatically deleted from the table after the TTL period expires.
  4. After it’s transformed, data is collected in Amazon S3 passing through a Firehose delivery stream and is ready to be ingested into our data lake.

The solution allows us to do the following:

  • Ingest data in parallel, in real time, and at the desired scale from all the data sources
  • Scale on demand, and with minimal human operational overhead; this is achieved using an AWS Serverless technology stack
  • Implement our desired time window on a per-item base, reducing costs and the total amount of data stored
  • Implement ETL using Lambda functions in Python, thereby providing a tighter grasp over expressing the business logic
  • Access data on Amazon S3 before it’s ingested into our data lake, and allow customers and partners to consume data in raw format if needed

The data present in Amazon S3 represents the starting point for a seamless data lake integration.

Data lake ingestion

Moving into our data lake, the following diagram illustrates our architecture for data lake ingestion.

The core implementation in this architecture is the AWS Glue Spark ingestion job for the Hudi table; it represents the entry point for the incremental data processing pipeline.

The AWS Glue Spark job runs with a concurrency of 1 and contains the logic for upsert and delete, applied sequentially on the Hudi table. Sequencing the delete after the upsert ensures that deletes are applied after upserts and that data consistency is maintained even if the job is rerun.
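
As a simplified illustration of the delete step (the upsert logic is shown later in this section), the records to be removed are written with the Hudi delete operation; the DataFrame delete_df, which holds only the keys to delete, is an assumption:

# Sketch of the delete step applied after the upsert; delete_df is assumed to
# contain only the records that must be removed from the Hudi table.
(delete_df.write.format("hudi")
    .option("hoodie.table.name", config['hudi_table_name'])
    .option("hoodie.datasource.write.recordkey.field", config["primary_key"])
    .option("hoodie.datasource.write.partitionpath.field", config["partition_keys"])
    .option("hoodie.datasource.write.precombine.field", config["sort_key"])
    .option("hoodie.datasource.write.operation", "delete")
    .mode("append")
    .save(config['target']))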

To use Apache Hudi v0.7 on AWS Glue jobs using PySpark, we imported the following libraries in the AWS Glue jobs, extracted locally from the master node of Amazon EMR:

  • hudi-spark-bundle_2.11-0.7.0-amzn-1.jar
  • spark-avro_2.11-2.4.7-amzn-1.jar

We recommend using AWS Glue 3.0 with the Hudi 0.9.0 connector rather than importing the Hudi v0.7 JAR files from Amazon EMR, for more seamless integration and access to additional capabilities and features.

Before we insert data into the Hudi table, we prepare it for the push. To optimize for incremental merge, we take a fixed lookup window based on business use case considerations. We start by reading historical data in a given time window. See the following code:

# HUDI DATA READ
read_options = {
  'hoodie.datasource.query.type': 'snapshot'
}


# HUDI DATAFRAME  created  from target Hudi Table on S3
hudi_df = spark. \
  read. \
  format("hudi"). \
  options(**read_options). \
  load(config['target'] + "////*")

# Read Historical data set, load(basePath) use "/partitionKey=partitionValue" folder structure for Spark auto partition discovery

# input_df is the INCREMENT DATAFRAME created from incrementally ingested data on S3
input_df = spark.read.format("csv").options(header='true').load(config['incremental_path'])



window_year, window_month, window_day = year_month_day_window()
window_filter_clause = "year >= {} AND month >= {} and day >= {}".format(window_year, window_month, window_day)

# We merge it with the incoming newly available data: 

# Data from Hudi Table on S3, because our use case is global, id is unique else id + partitionPath = unique.
hudi_s3_df = hudi_df.select(col("node_id"),col("container_label"),col(config['sort_key'])).filter(window_filter_clause)

# Perform a left outer join between new data (input_df) and data present in S3 Hudi. (hudi_s3_df)
hudi_join_df = input_df.alias("incomingData").join(hudi_s3_df.alias("S3HudiData"), (input_df.node_id == hudi_s3_df.node_id) & (input_df.container_label == hudi_s3_df.container_label), "leftouter")

# As it's a left outer join, there might be new records that aren't present in S3 Hudi.

hudi_new_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNull()).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds"))

# As it's a Left Outer join, Select the records where input_df.last_update_time > hudi_s3_df.last_update_time. 

hudi_updated_df = hudi_join_df.filter(col("S3HudiData.last_update_row_epoch_seconds").isNotNull() & (col("incomingData.last_update_row_epoch_seconds") > col("S3HudiData.last_update_row_epoch_seconds"))).drop(col("S3HudiData.node_id")).drop(col("S3HudiData.container_label")).drop(col("S3HudiData.last_update_row_epoch_seconds"))
hudi_final_df = hudi_new_df.union(hudi_updated_df)

#  After we prepare the data to be pushed in the Hudi table, we implement the Hudi table update using the following code:

(hudi_final_df.write.format(HUDI_FORMAT)
.option(TABLE_NAME, config['hudi_table_name'])
.option(RECORDKEY_FIELD_OPT_KEY, config["primary_key"])
.option(PARTITIONPATH_FIELD_OPT_KEY,config["partition_keys"])
.option(KEYGENERATOR_CLASS_OPT_KEY, COMPLEX_KEYGENERATOR_CLASS_OPT_VAL)
.option(PRECOMBINE_FIELD_OPT_KEY, config["sort_key"])
.option(OPERATION_OPT_KEY, UPSERT_OPERATION_OPT_VAL)
.option(UPSERT_PARALLELISM, 1500)
.option('hoodie.payload.ordering.field',config["sort_key"])
.option(PAYLOAD_CLASS_OPT_KEY,'org.apache.hudi.common.model.DefaultHoodieRecordPayload')
.option(HIVE_PARTITION_FIELDS_OPT_KEY, config["partition_keys"])
.option(HIVE_DATABASE_OPT_KEY,config['hudi_database'])
.option(HIVE_TABLE_OPT_KEY,config['hudi_table_name'])
.option(HIVE_SYNC_ENABLED_OPT_KEY,"true")
.option(HIVE_JDBC_SYNC,"false")
.option(HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY, MULTIPART_KEYS_EXTRACTOR_CLASS_OPT_VAL)
.option('hoodie.datasource.write.hive_style_partitioning', 'true')
# To switch to Global Bloom index, set the following configuration as GLOBAL_BLOOM.
.option('hoodie.index.type', 'GLOBAL_SIMPLE') 
.option('hoodie.simple.index.update.partition.path', 'true')
.option('hoodie.global.simple.index.parallelism', '500')
.mode("append")
.save(config['target']))

In the preceding code, config is a dictionary that includes all the Apache Hudi configurations. The AWS Glue Data Catalog is automatically synched after Hudi table creation, as part of the Glue job, reflecting the Amazon S3 partition structure. We can now query the data using Amazon Athena or Amazon Redshift Spectrum.

To comply with our strict internal ingestion SLA, we had to dedicate special attention to employing the right Hudi indexes and defining the right table type. For the latter, we analyzed the type of workload. Due to the analytic nature of the datasets and use case, we identified that the right configuration would be to use a COPY_ON_WRITE table, even if that was a compromise on the write performances but enhanced read performance.

For the former, we went through an experimentation phase. We started with a GLOBAL_BLOOM index, identifying an initial non-linear pattern for data writing performances.

Given the randomness and the time window specified for the input data, we have encountered a significant number of false positives, leading to reading the entire dataset back for comparison. Moreover, GLOBAL_BLOOM keeps increasing linearly corresponding to the data size, whereas GLOBAL_SIMPLE doesn’t bring this overhead (with a fixed lookup window) as can be observed in the diagram.

The graph plots the total time taken by the AWS Glue Hudi job over days as incoming data is merged with the historical data using GLOBAL_BLOOM. The upper half shows the non-linear increase in time observed when the same data was merged consecutively over days; the lower half shows a linear increase with a steep slope when new incoming data was merged with the historical data.

GLOBAL_BLOOM wasn’t appropriate for our use case as the historical data spanned back to 5 years, and Glue job will not be able to meet the SLA demands. At this point, we investigated GLOBAL_SIMPLE indexes, reaching the expected performance patterns.

Our data lake solution allows us to do the following:

  • Ingest data files into a petabyte-scale data lake, with a 15-minute ingestion SLA from the moment we receive the data
  • Read the data at petabyte scale by leveraging Amazon S3 partitions (created by AWS Glue jobs and mapped to the Hudi partition logic) and faster lookups by using Hudi indexes
  • Use Hudi data lineage capabilities
  • Reduce costs for data storage, infrastructure maintenance, and development
  • Manage data governance using AWS Lake Formation, which allows partners and customers to query the data using their own tools, while allowing ATS to retain control over our data

Conclusion

In this post, we highlighted how ATS built a real-time, fully serverless data ingestion platform that scales to thousands of events per second and merges them with petabyte-sized historical data stored in a data lake in near-real time.

We built a petabyte-scale data lake solution based on Apache Hudi and AWS Glue that allows us to share our data within 15 minutes from ingestion with our partners and consumers, while retaining complete control over our data and automatically offloading costs for data consumption. This provides linear performance as data grows over time.

About Amazon Transportation Service

Amazon Transportation Service (ATS) is the middle mile of Amazon’s transportation network, connecting the fulfillment centers at one end and the delivery stations and post offices at the other end. We enable packages ordered and packed at fulfillment centers to traverse the European continent and reach the final delivery station, which handles the house-to-house delivery.


About the Authors

Madhavan Sriram is a Manager, Data Science who comes with a wide experience across multiple enterprise organisations in the space of Big Data and Machine Learning technologies. He currently leads the Data Technology and Products team within Amazon Transportation Services (ATS) and builds data-intensive products for the transportation network within Amazon. In his free time, Madhavan enjoys photography and poetry.

Diego Menin is a Senior Data Engineer within the Data Technology and Products team. He brings wide experience across startups and enterprises, with deep AWS expertise in developing scalable cloud-based data and analytics products. Within Amazon, he is the architect of Amazon’s transportation data lake and works heavily on streaming data and integration mechanisms with downstream applications through the data lake.

Gabriele Cacciola is a Senior Data Architect working for the Professional Service team with Amazon Web Services. Coming from a solid Startup experience, he currently helps enterprise customers across EMEA implement their ideas, innovate using the latest tech and build scalable data and analytics solutions to make critical business decisions. In his free time, Gabriele enjoys football and cooking.

Kunal Gautam is a Senior Big Data Architect at Amazon Web Services. Having experience in building his own startup and working alongside enterprises, he brings a unique perspective to getting people, business, and technology to work in tandem for customers. He is passionate about helping customers in their digital transformation journey and enables them to build scalable data and advanced analytics solutions to gain timely insights and make critical business decisions. In his spare time, Kunal enjoys marathons, tech meetups, and meditation retreats.

Field Notes: How to Build an AWS Glue Workflow using the AWS Cloud Development Kit

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/architecture/field-notes-how-to-build-an-aws-glue-workflow-using-the-aws-cloud-development-kit/

Many customers use AWS Glue workflows to build and orchestrate their ETL (extract-transform-load) pipelines directly in the AWS Glue console using the visual tool to author workflows. This can be time-consuming, harder to version control, and error-prone due to manual configurations when compared to managing your workflows as code. To improve your operational excellence, consider deploying the entire AWS Glue ETL pipeline using the AWS Cloud Development Kit (AWS CDK).

In this blog post, you will learn how to build an AWS Glue workflow using Amazon Simple Storage Service (Amazon S3), various components of AWS Glue, AWS Secrets Manager, Amazon Redshift, and the AWS CDK.

Architecture overview

In this architecture, you will use the AWS CDK to deploy your data sources, ETL scripts, AWS Glue workflow components, and an Amazon Redshift cluster for analyzing the transformed data.

AWS Glue workflow architecture

Figure 1. AWS Glue workflow architecture

It is common for customers to pre-aggregate data before sending it downstream to analytical engines, like Amazon Redshift, because table joins and aggregations are computationally expensive. The AWS Glue workflow will join COVID-19 case data and COVID-19 hiring data on their date columns in order to run correlation analysis on the final dataset. The datasets may seem arbitrary, but we wanted to offer a way to better understand the impacts COVID-19 had on jobs in the United States. The takeaway here is to use this as a blueprint for automating the deployment of data analytic pipelines for the data of interest to your business.
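To make the correlation step concrete, here is a minimal PySpark sketch of joining two pre-aggregated datasets on a date column and computing a Pearson correlation. The bucket paths and column names (new_cases, new_hires) are assumptions for illustration; the actual job scripts live in the cloned repository.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("covid-hiring-correlation").getOrCreate()

# Hypothetical pre-aggregated datasets that share a "date" column
cases = spark.read.parquet("s3://example-bucket/processed/covid_cases/")
hiring = spark.read.parquet("s3://example-bucket/processed/hiring/")

# Join on the date column and compute the Pearson correlation between the two measures
joined = cases.join(hiring, on="date", how="inner").select("date", "new_cases", "new_hires")
print(joined.stat.corr("new_cases", "new_hires"))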

After the AWS CDK application is deployed, it will begin creating all of the resources required to build the complete workflow. When it completes, the components in the architecture will be created, and the AWS Glue workflow will be ready to start. In this blog post, you start workflows manually, but they can be configured to start on a scheduled time or from a workflow trigger.

The workflow is programmed to dynamically pull the raw data from the Registry of Open Data on AWS, where you can find the COVID-19 case data and the hiring data.

Prerequisites

This blog post uses an AWS CDK stack written in TypeScript and AWS Glue jobs written in Python. Follow the instructions in the AWS CDK Getting Started guide to set up your environment, before you proceed to deployment.

In addition to setting up your environment, you need to clone the Git repository, which contains the AWS CDK scripts and Python ETL scripts used by AWS Glue. The ETL scripts will be deployed to Amazon S3 by the AWS CDK stack as assets, and referenced by the AWS Glue jobs as part of the AWS Glue Workflow.

You should have the following prerequisites:

Deployment

After you have cloned the repository, navigate to the glue-cdk-blog/lib folder and open the blog-glue-workflow-stack.ts file. This is the AWS CDK script used to deploy all necessary resources to build your AWS Glue workflow. The blog-redshift-vpc-stack.ts contains the necessary resources to deploy the Amazon Redshift cluster, connections, and permissions. The glue-cdk-blog/lib/assets folder also contains the AWS Glue job scripts. These files are uploaded to Amazon S3 by the AWS CDK when you bootstrap.

You won’t review the individual lines of code in the script in this blog post, but if you are unfamiliar with any of the AWS CDK level 1 or level 2 constructs used in the sample, you can review what each construct does with the AWS CDK documentation. Familiarize yourself with the script you cloned and anticipate what resources will be deployed. Then, deploy both stacks and verify your initial findings.

After your environment is configured, and the packages and modules installed, deploy the AWS CDK stack and assets in two commands.

  1. Bootstrap the AWS CDK stack to create an S3 bucket in the predefined account that will contain the assets.

cdk bootstrap

  2. Deploy the AWS CDK stacks.

cdk deploy --all

Verify that both of these commands have completed successfully, and remediate any failures returned. Upon successful completion, you’re ready to start the AWS Glue workflow that was just created. You can find the AWS CDK commands reference in the AWS CDK Toolkit commands documentation, and help with Troubleshooting common AWS CDK issues you may encounter.

Walkthrough

Prior to initiating the AWS Glue workflow, explore the resources the AWS CDK stacks just deployed to your account.

  1. Log in to the AWS Management Console and the AWS CDK account.
  2. Navigate to Amazon S3 in the AWS console (you should see an S3 bucket with the name prefix of cdktoolkit-stagingbucket-xxxxxxxxxxxx).
  3. Review the objects stored in the bucket in the assets folder. These are the .py files used by your AWS Glue jobs. They were uploaded to the bucket when you issued the AWS CDK bootstrap command, and referenced within the AWS CDK script as the scripts to use for the AWS Glue jobs. When retrieving data from multiple sources, you cannot always control the naming convention of the sourced files. To solve this and create better standardization, you will use a job within the AWS Glue workflow to copy these scripts to another folder and rename them with a more meaningful name.
  4. Navigate to Amazon Redshift in the AWS console and verify your new cluster. You can use the Amazon Redshift Query Editor within the console to connect to the cluster and see that you have an empty database called db-covid-hiring. The Amazon Redshift cluster and networking resources were created by the redshift_vpc_stack which are listed here:
    • VPC, subnet and security group for Amazon Redshift
    • Secrets Manager secret
    • AWS Glue connection and S3 endpoint
    • Amazon Redshift cluster
  5. Navigate to AWS Glue in the AWS console and review the following new resources created by the workflow_stack CDK stack:
    • Two crawlers to crawl the data in S3
    • Three AWS Glue jobs used within the AWS Glue workflow
    • Five triggers to initiate AWS Glue jobs and crawlers
    • One AWS Glue workflow to manage the ETL orchestration
  6. All of these resources could have been deployed within a single stack, but this is intended to be a simple example of how to share resources across multiple stacks. The AWS Identity and Access Management (IAM) role that AWS Glue uses to run the ETL jobs in the workflow_stack is also used by Secrets Manager for Amazon Redshift in the redshift_vpc_stack. Inspect the /bin/blog-glue-workflow-stack.ts file to further understand cross-stack resource sharing.

By performing these steps, you have deployed all of the AWS Glue resources necessary to perform common ETL tasks. You then combined the resources to create an orchestration of tasks using an AWS Glue workflow. All of this was done using IaC with AWS CDK. Your workflow should look like Figure 2.

AWS Glue console showing the workflow created by the CDK

Figure 2. AWS Glue console showing the workflow created by the CDK

As mentioned earlier, you could have started your workflow using a scheduled cron trigger, but you initiated the workflow manually so you had time to review the resources the workflow_stack CDK deployed, prior to initiation of the workflow. Now that you have reviewed the resources, validate your workflow by initiating it and verifying it runs successfully.

  1. From within the AWS Glue console, select Workflows under ETL.
  2. Select the workflow named glue-workflow, and then select Run from the actions listbox.
  3. You can verify the status of the workflow by viewing the run details under the History tab.
  4. Your job will take approximately 15 minutes to successfully complete, and your history should look like Figure 3.
AWS Glue console showing the workflow as completed after the run

Figure 3. AWS Glue console showing the workflow as completed after the run

The workflow performs the following tasks:

  1. Prepares the ETL scripts by copying the files in the S3 asset bucket to a new folder and renaming them with more relevant names.
  2. Initiates a crawler to crawl the raw source CSV data and add tables to the AWS Glue Data Catalog.
  3. Runs a Python script to perform ETL tasks on the .csv files and convert them to Parquet files (a minimal sketch of this step follows the list).
  4. Crawls the Parquet files and adds them to the AWS Glue Data Catalog.
  5. Loads the Parquet files into a DynamicFrame and runs an Amazon Redshift COPY command to load the data into the Amazon Redshift database.
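The following is a minimal sketch of what such a CSV-to-Parquet conversion job can look like in an AWS Glue Python script. The database, table, and bucket names are placeholders, not the names deployed by the CDK stacks; the actual scripts are in the cloned repository's assets folder.

import sys
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read a raw CSV table that the first crawler added to the Data Catalog (placeholder names)
raw = glue_context.create_dynamic_frame.from_catalog(
    database="covid_hiring_db",
    table_name="raw_covid_cases_csv",
)

# Write the data back to S3 as Parquet for the second crawler to pick up
glue_context.write_dynamic_frame.from_options(
    frame=raw,
    connection_type="s3",
    connection_options={"path": "s3://example-bucket/processed/covid_cases/"},
    format="parquet",
)
job.commit()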

After the workflow completes, you can query and perform analytics on the data that was populated in Amazon Redshift. Open the Amazon Redshift Query Editor and run a simple SELECT statement to query the covid_hiring_table, which contains the joined COVID-19 case data and hiring data (see Figure 4).

Amazon Redshift query editor showing the data that the workflow loaded into the Redshift tables

Figure 4. Amazon Redshift query editor showing the data that the workflow loaded into the Redshift tables

Cleaning up

Some resources, like S3 buckets and Amazon DynamoDB tables, must be manually emptied and deleted through the console to be fully removed. To clean up the deployment, delete all objects in the AWS CDK asset bucket in Amazon S3 by using the AWS console to empty the bucket, and then run cdk destroy --all to delete the resources the AWS CDK stacks created in your account. Finally, if you don’t plan on using AWS CloudFormation assets in this account in the future, you will need to delete the AWS CDK asset stack within the CloudFormation console to remove the AWS CDK asset bucket.

Conclusion

In this blog post, you learned how to automate the deployment of AWS Glue workflows using the AWS CDK. This further enhances your continuous integration and delivery (CI/CD) data pipelines by automating the deployment of the ETL jobs and AWS Glue workflow orchestration, providing an efficient, fast, and repeatable way to build and deploy AWS Glue workflows at scale.

Although AWS CDK primarily supports level 1 constructs for most AWS Glue resources, new constructs are added continually. See the AWS CDK API Reference for updates, prior to authoring your stacks, for AWS Glue level 2 construct support. You can find the code used in this blog post in this GitHub repository, and the AWS CDK in TypeScript reference to the AWS CDK namespace.

We hope this blog post helps enrich your work through the skills gained in automating the creation of AWS Glue workflows, enabling you to quickly build and deploy your own ETL pipelines and run analytical models that power your business.

Field Notes provides hands-on technical guidance from AWS Solutions Architects, consultants, and technical account managers, based on their experiences in the field solving real-world business problems for customers.

How NortonLifelock built a serverless architecture for real-time analysis of their VPN usage metrics

Post Syndicated from Madhu Nunna original https://aws.amazon.com/blogs/big-data/how-nortonlifelock-built-a-serverless-architecture-for-real-time-analysis-of-their-vpn-usage-metrics/

This post presents a reference architecture and optimization strategies for building serverless data analytics solutions on AWS using Amazon Kinesis Data Analytics. In addition, this post shows the design approach that the engineering team at NortonLifeLock took to build out an operational analytics platform that processes usage data for their VPN services, consuming petabytes of data across the globe on a daily basis.

NortonLifeLock is a global cybersecurity and internet privacy company that offers services to millions of customers for device security, and identity and online privacy for home and family. NortonLifeLock believes the digital world is only truly empowering when people are confident in their online security. NortonLifeLock has been an AWS customer since 2014.

For any organization, the value of operational data and metrics decreases with time. This lost value can equate to lost revenue and wasted resources. Real-time streaming analytics helps capture this value and provide new insights that can create new business opportunities.

AWS offers a rich set of services that you can use to provide real-time insights and historical trends. These services include managed Hadoop infrastructure services on Amazon EMR as well as serverless options such as Kinesis Data Analytics and AWS Glue.

Amazon EMR also supports multiple programming options for capturing business logic, such as Spark Streaming, Apache Flink, and SQL.

As a customer, it’s important to understand organizational capabilities, project timelines, business requirements, and AWS service best practices in order to define an optimal architecture from performance, cost, security, reliability, and operational excellence perspectives (the five pillars of the AWS Well-Architected Framework).

NortonLifeLock is taking a methodical approach to real-time analytics on AWS while using serverless technology to deliver on key business drivers such as time to market and total cost of ownership. In addition to NortonLifeLock’s implementation, this post provides key lessons learned and best practices for rapid development of real-time analytics workloads.

Business problem

NortonLifeLock offers a VPN product as a freemium service to users. Therefore, they need to enforce usage limits in real time to stop freemium users from using the service when their usage is over the limit. The challenge for NortonLifeLock is to do this in a reliable and affordable fashion.

NortonLifeLock runs its VPN infrastructure in almost all AWS Regions. Migrating to AWS from smaller hosting vendors has greatly improved user experience and VPN edge server performance, including a reduction in connection latency, time to connect and connection errors, faster upload and download speed, and more stability and uptime for VPN edge servers.

VPN usage data is collected by VPN edge servers and uploaded to backend stats servers every minute and persisted in backend databases. The usage information serves multiple purposes:

  • Displaying how much data a device has consumed for the past 30 days.
  • Enforcing usage limits on freemium accounts. When a user exhausts their free quota, that user is unable to connect through VPN until the next free cycle.
  • Analyzing usage data by the internal business intelligence (BI) team based on time, marketing campaigns, and account types, and using this data to predict future growth, ability to retain users, and more.

Design challenge

NortonLifeLock had the following design challenges:

  • The solution must be able to simultaneously satisfy both real-time and batch analysis.
  • The solution must be economical. NortonLifeLock VPN has hundreds of thousands of concurrent users, and if a user’s usage information is persisted as it comes in, it results in tens of thousands of reads and writes per second and tens of thousands of dollars a month in database costs.

Solution overview

NortonLifeLock decided to split storage into two parts by storing usage data in Amazon DynamoDB for real-time access and in Amazon Simple Storage Service (Amazon S3) for analysis, which addresses real-time enforcement and BI needs. Kinesis Data Analytics aggregates and loads data to Amazon S3 and DynamoDB. With Amazon Kinesis Data Streams and AWS Lambda as consumers of Kinesis Data Analytics, the implementation of user and device-level aggregations was simplified.

To keep costs down, user usage data was aggregated by the hour and persisted in DynamoDB. This spread hundreds of thousands of writes over an hour and reduced DynamoDB cost by 30 times.

Although increasing aggregation might not be an option for other problem domains, it’s acceptable in this case because it’s not necessary to be precise to the minute for user usage, and it’s acceptable to calculate and enforce the usage limit every hour.

The following diagram illustrates the high-level architecture. The solution is broken into three logical parts:

  • End-users – Real-time queries from devices to display current usage information (how much data is used daily)
  • Business analysts – Query historical usage information through Amazon Athena to extract business insights
  • Usage limit enforcement – Usage data ingestion and aggregation in real time

The solution has the following workflow:

  1. A VPN edge server collects usage data and sends it to the backend service through an Application Load Balancer.
  2. A single usage data record sent by the VPN edge server contains usage data for many users. A stats splitter splits the message into individual usage stats per user and forwards the message to Kinesis Data Streams.
  3. Usage data is consumed by both the legacy stats processor and the new Apache Flink application developed and deployed on Kinesis Data Analytics.
  4. The Apache Flink application carries out the following tasks:
    1. Aggregate device usage data hourly and send the aggregated result to Amazon S3 and the outgoing Kinesis data stream, which is picked up by a Lambda function that persists the usage data in DynamoDB.
    2. Aggregate device usage data daily and send the aggregated result to Amazon S3.
    3. Aggregate account usage data hourly and forward the aggregated results to the outgoing data stream, which is picked up by a Lambda function that checks if account usage is over the limit for that account. If account usage is over the limit, the function forwards the account information to another Lambda function, via Amazon Simple Queue Service (Amazon SQS), to cut off access on that account.

Design journey

NortonLifeLock needed a solution that was capable of real-time streaming and batch analytics. Kinesis Data Analytics fits this requirement because of the following key features:

  • Real-time streaming and batch analytics for data aggregation
  • Fully managed with a pay-as-you-go model
  • Auto scaling

NortonLifeLock needed Kinesis Data Analytics to do the following:

  • Aggregate customer usage data per device hourly and send results to Kinesis Data Streams (ultimately to DynamoDB) and the data lake (Amazon S3)
  • Aggregate customer usage data per account hourly and send results to Kinesis Data Streams (ultimately to DynamoDB and Lambda, which enforces usage limit)
  • Aggregate customer usage data per device daily and send results to the data lake (Amazon S3)

The legacy system processes usage data from an incoming Kinesis data stream, and they plan to use Kinesis Data Analytics to consume and process production data from the same stream. As such, NortonLifeLock started with SQL applications on Kinesis Data Analytics.

First attempt: Kinesis Data Analytics for SQL

Kinesis Data Analytics with SQL provides a high-level SQL-based abstraction for real-time stream processing and analytics. It’s configuration-driven and very simple to get started with. NortonLifeLock was able to create a prototype from scratch, get to production, and process the production load in less than 2 weeks. The solution met 90% of the requirements, and there were alternatives for the remaining 10%.

However, they started to receive “read limit exceeded” alerts from the source data stream, and the legacy application was read throttled. With Amazon Support’s help, they traced the issues to the drastic reversal of the Kinesis Data Analytics MillisBehindLatest metric in Kinesis record processing. This was correlated to the Kinesis Data Analytics auto scaling events and application restarts, as illustrated by the following diagram. The highlighted areas show the correlation between spikes due to autoscaling and reversal of MillisBehindLatest metrics.

Here’s what happened:

  • Kinesis Data Analytics for SQL scaled up KPU due to load automatically, and the Kinesis Data Analytics application was restarted (part of scaling up).
  • Kinesis Data Analytics for SQL supports the at least once delivery model and uses checkpoints to ensure no data loss. But it doesn’t support taking a snapshot and restoring from the snapshot after a restart. For more details, see Delivery Model for Persisting Application Output to an External Destination.
  • When the Kinesis Data Analytics for SQL application was restarted, it needed to reprocess data from the beginning of the aggregation window, resulting in a very large number of duplicate records, which led to a dramatic increase in the Kinesis Data Analytics MillisBehindLatest metric.
  • To catch up with incoming data, Kinesis Data Analytics started re-reading from the Kinesis data stream, which led to over-consumption of read throughput and the legacy application being throttled.

In summary, Kinesis Data Analytics for SQL’s reprocessing of duplicate records on restarts, the lack of any other means to eliminate duplicates, and the limited ability to control auto scaling led to this issue.

Although they found Kinesis Data Analytics for SQL easy to get started, these limitations demanded other alternatives. NortonLifeLock reached out to the Kinesis Data Analytics team and discussed the following options:

  • Option 1 – AWS was planning to release a new service, Kinesis Data Analytics Studio for SQL, Python, and Scala, which addresses these limitations. But this service was still a few months away (this service is now available, launched May 27, 2021).
  • Option 2 – The alternative was to switch to Kinesis Data Analytics for Apache Flink, which also provides the necessary tools to address all their requirements.

Second attempt: Kinesis Data Analytics for Apache Flink

Apache Flink has a comparatively steep learning curve (we used Java for streaming analytics instead of SQL), and it took about 4 weeks to build the same prototype, deploy it to Kinesis Data Analytics, and test the application in production. NortonLifeLock had to overcome a few hurdles, which we document in this section along with the lessons learned.

Challenge 1: Too many writes to outgoing Kinesis data stream

The first thing they noticed was that the write threshold on the outgoing Kinesis data stream was greatly exceeded. Kinesis Data Analytics was attempting to write 10 times the amount of expected data to the data stream, with 95% of data throttled.

After a lengthy investigation, it turned out that having too much parallelism in the Kinesis Data Analytics application led to this issue. They had followed default recommendations and set parallelism to 12, and it scaled up to 16. This means that every hour, 16 separate threads were attempting to write to the destination data stream simultaneously, leading to massive contention and throttled writes. These threads attempted to retry continuously until all records were written to the data stream. This resulted in 10 times the amount of data processing attempted, even though only one tenth of the writes eventually succeeded.

The solution was to reduce parallelism to 4 and disable auto scaling. In the preceding diagram, the percentage of throttled records dropped to 0 from 95% after they reduced parallelism to 4 in the Kinesis Data Analytics application. This also greatly improved KPU utilization and reduced Kinesis Data Analytics cost from $50 a day to $8 a day.
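For reference, parallelism and auto scaling for a Kinesis Data Analytics for Apache Flink application can be changed through the kinesisanalyticsv2 API; the following boto3 sketch shows the general shape of such an update. The application name is hypothetical, and this is not NortonLifeLock's actual tooling.

import boto3

kda = boto3.client("kinesisanalyticsv2")
app = kda.describe_application(ApplicationName="vpn-usage-aggregator")  # hypothetical name

# Pin parallelism to 4 and turn off auto scaling for the Flink application
kda.update_application(
    ApplicationName="vpn-usage-aggregator",
    CurrentApplicationVersionId=app["ApplicationDetail"]["ApplicationVersionId"],
    ApplicationConfigurationUpdate={
        "FlinkApplicationConfigurationUpdate": {
            "ParallelismConfigurationUpdate": {
                "ConfigurationTypeUpdate": "CUSTOM",
                "ParallelismUpdate": 4,
                "AutoScalingEnabledUpdate": False,
            }
        }
    },
)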

Challenge 2: Use Kinesis Data Analytics sink aggregation

After tuning parallelism, they still noticed occasional throttling by Kinesis Data Streams because of the number of records being written, not record size. To overcome this, they turned on Kinesis Data Analytics sink aggregation to reduce the number of records being written to the data stream, and the result was dramatic. They were able to reduce the number of writes by 1,000 times.

Challenge 3: Handle Kinesis Data Analytics Flink restarts and the resulting duplicate records

Kinesis Data Analytics applications restart because of auto scaling or recovery from application or task manager crashes. When this happens, Kinesis Data Analytics saves a snapshot before shutdown and automatically reloads the latest snapshot and picks up where the work was left off. Kinesis Data Analytics also saves a checkpoint every minute so no data is lost, guaranteeing exactly-once processing.

However, when the Kinesis Data Analytics application shuts down in the middle of sending results to Kinesis Data Streams, it doesn’t guarantee exactly-once data delivery. In fact, Flink only guarantees at-least-once delivery to the Kinesis Data Analytics sink, meaning that a record is sent at least once, which leads to duplicate records being sent when Kinesis Data Analytics is restarted.

How were duplicate records handled in the outgoing data stream?

Because duplicate records aren’t handled by Kinesis Data Analytics when sinks do not have exactly-once semantics, the downstream application must deal with the duplicate records. The first question you should ask is whether it’s necessary to deal with the duplicate records. Maybe it’s acceptable to tolerate duplicate records in your application? This, however, is not an option for NortonLifeLock, because no user wants to have their available usage taken twice within the same hour. So, logic had to be built in the application to handle duplicate usage records.

To deal with duplicate records, you can employ a strategy in which the application saves an update timestamp along with the user’s latest usage. When a record comes in, the application reads existing daily usage and compares the update timestamp against the current time. If the difference is less than a configured window (50 minutes if the aggregation window is 60 minutes), the application ignores the new record because it’s a duplicate. It’s acceptable for the application to potentially undercount vs. overcount user usage.
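A minimal sketch of that strategy, assuming a DynamoDB table keyed by user ID with hypothetical attribute names (daily_usage, last_update_epoch), might look like the following; the production Lambda function is more involved.

import time
import boto3

dynamodb = boto3.resource("dynamodb")
usage_table = dynamodb.Table("vpn-user-usage")  # hypothetical table name
DEDUP_WINDOW_SECONDS = 50 * 60  # 50 minutes for a 60-minute aggregation window

def apply_usage_record(user_id, usage_bytes):
    """Apply an hourly usage record unless one for this window was already applied."""
    now = int(time.time())
    item = usage_table.get_item(Key={"user_id": user_id}).get("Item")
    if item and now - int(item.get("last_update_epoch", 0)) < DEDUP_WINDOW_SECONDS:
        return  # likely a duplicate emitted after a restart; ignore it
    usage_table.update_item(
        Key={"user_id": user_id},
        UpdateExpression="ADD daily_usage :u SET last_update_epoch = :t",
        ExpressionAttributeValues={":u": usage_bytes, ":t": now},
    )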

How were duplicate records handled in the outgoing S3 bucket?

Kinesis Data Analytics writes temporary files in Amazon S3 before finalizing and removing them. When Kinesis Data Analytics restarts, it attempts to write new S3 files, and may leave behind temporary S3 files because of the restart. Because Athena ignores all temporary S3 files, no further action is needed. If your BI tools take temporary S3 files into consideration, you have to configure the Amazon S3 lifecycle policy to clean up temporary S3 files after a certain time.

Conclusion

NortonLifeLock has been successfully running a Kinesis Data Analytics application in production since May 2021. It provides several key benefits. VPN users can now keep track of their usage in near-real time. BI analysts can get timely insights that are used for targeted sales and marketing campaigns, and for upselling features and services. VPN usage limits are enforced in near-real time, thereby optimizing the network resources. NortonLifeLock is saving tens of thousands of dollars each month with this real-time streaming analytics solution. And this telemetry solution is able to keep up with petabytes of data flowing through their global VPN service, which is seeing double-digit monthly growth.

To learn more about Kinesis Data Analytics and getting started with serverless streaming solutions on AWS, please see the Developer Guide for Studio, the easiest way to build Apache Flink applications in SQL, Python, or Scala in a notebook interface.


About the Authors

Lei Gu has 25 years of software development experience and is the architect for three key Norton products: Norton Secure Backup, VPN, and Norton Family. He is passionate about cloud transformation and most recently spoke about moving from Cassandra to Amazon DynamoDB at AWS re:Invent 2019. Check out his LinkedIn profile at https://www.linkedin.com/in/leigu/.

Madhu Nunna is a Sr. Solutions Architect at AWS, with over 20 years of experience in networks and cloud, with the last two years focused on AWS Cloud. He is passionate about Analytics and AI/ML. Outside of work, he enjoys hiking and reading books on philosophy, economics, history, astronomy and biology.

Hybrid Cloud Architectures Using Self-hosted Apache Kafka and AWS Glue

Post Syndicated from Brandon Rubadou original https://aws.amazon.com/blogs/architecture/hybrid-cloud-architectures-using-self-hosted-apache-kafka-and-aws-glue/

Using analytics to gain insights from a variety of datasets is key to successful transformation. There are many options to consider to realize the full value and potential of our data in a hybrid cloud infrastructure. A common practice is to route data produced on premises to a central repository or data lake, where it can be consumed by multiple applications.

You can use an Apache Kafka cluster for data movement from on-premises to the data lake, using Amazon Simple Storage Service (Amazon S3). But you must either replicate the topics onto a cloud cluster, or develop a custom connector to read and copy the topics to Amazon S3. This presents a challenge for many customers.

This blog presents another option: an architectural solution leveraging AWS Glue.

Kafka and ETL processing

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. You can use Kafka clusters as a system to move data between systems. Producers typically publish data (or push) to a Kafka topic, where an application can consume it. Consumers are usually custom applications that feed data into respective target applications. These targets can be a data warehouse, an Amazon OpenSearch Service cluster, or others.

AWS Glue offers the ability to create jobs that will extract, transform, and load (ETL) data. This allows you to consume from many sources, such as from Apache Kafka, Amazon Kinesis Data Streams, or Amazon Managed Streaming for Apache Kafka (Amazon MSK). The jobs cleanse and transform the data, and then load the results into Amazon S3 data lakes or JDBC data stores.

Hybrid solution and architecture design

In most cases, the first step in building a responsive and manageable architecture is to review the data itself. For example, if we are processing insurance policy data from a financial organization, our data may contain fields that identify customer data. These can be account ID, an insurance claim identifier, and the dollar amount of the specific claim. Glue provides the ability to change any of these field types into the expected data lake schema type for processing.

Figure 1. Data flow - Source to data lake target

Figure 1. Data flow – Source to data lake target

Next, AWS Glue must be configured to connect to the on-premises Kafka server (see Figure 1). Private and secure connectivity to the on-premises environment can be established via AWS Direct Connect or a VPN solution. Traffic from the Amazon Virtual Private Cloud (Amazon VPC) is allowed to access the cluster directly. You can do this by creating a three-step streaming ETL job:

  1. Create a Glue connection to the on-premises Kafka source
  2. Create a Data Catalog table
  3. Create an ETL job, which saves to an S3 data lake

Configuring AWS Glue

  1. Create a connection. Using AWS Glue, create a secure SSL connection in the Data Catalog using the predefined Kafka connection type. Enter the hostname of the on-premises cluster and use the custom-managed certificate option for additional security. If you are in a development environment, you are required to generate a self-signed SSL certificate. Use your Kafka SSL endpoint to connect to Glue. (AWS Glue also supports client authentication for Apache Kafka streams.) A boto3 sketch of this connection setup follows the list.
  2. Specify a security group. To allow AWS Glue to communicate between its components, specify a security group with a self-referencing inbound rule for all TCP ports. By creating this rule, you can restrict the source to the same security group in the Amazon VPC. Ensure you check the default security group for your VPC, as it could have a preconfigured self-referencing inbound rule for ALL traffic.
  3. Create the Data Catalog. Glue can auto-create the data schema. Since it’s a simple flat file, use the schema detection function of Glue. Set up the Kafka topic and refer to the connection.
  4. Define the job properties. Create the AWS Identity and Access Management (IAM) role to allow Glue to connect to S3 data. Select an S3 bucket and format. In this case, we use CSV and enable schema detection.
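As referenced in step 1, the following is a minimal boto3 sketch of creating such a Kafka connection programmatically. The bootstrap server, certificate location, subnet, and security group values are placeholders; in the console-based walkthrough above, these are entered interactively.

import boto3

glue = boto3.client("glue")
glue.create_connection(
    ConnectionInput={
        "Name": "onprem-kafka-ssl",  # hypothetical connection name
        "ConnectionType": "KAFKA",
        "ConnectionProperties": {
            "KAFKA_BOOTSTRAP_SERVERS": "kafka.example.internal:9094",
            "KAFKA_SSL_ENABLED": "true",
            "KAFKA_CUSTOM_CERT": "s3://example-bucket/certs/kafka-ca.pem",
        },
        "PhysicalConnectionRequirements": {
            "SubnetId": "subnet-0123456789abcdef0",
            "SecurityGroupIdList": ["sg-0123456789abcdef0"],  # with the self-referencing rule
            "AvailabilityZone": "us-east-1a",
        },
    }
)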

The Glue job can be scheduled, initiated manually, or by using an event driven architecture. Note that Glue does not yet support the “test connection” option within the console. Make sure you set the “Job Timeout” and enter a duration in minutes because the default value is blank.

When the job runs, it pulls the latest topics from the source on-premises Kafka cluster. Glue supports checkpoints to ensure that all source data is processed. By default, AWS Glue processes and writes out data in 100-second windows. This allows data to be processed efficiently and permits aggregations to be performed on data arriving later. You can modify this window size to increase timeliness or aggregation accuracy. AWS Glue streaming jobs use checkpoints rather than job bookmarks to track the data that has been read. AWS Glue bills hourly for streaming ETL jobs only while they are running.

Now that the connection is complete and the job is created, we can format the source data needed for the data lake. AWS Glue offers a set of built-in transforms that you can use to process your data using your ETL script. The transformed data is then placed in S3, where it can be leveraged as part of a larger data lake environment.
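To illustrate the shape of such a streaming job, here is a minimal AWS Glue streaming ETL sketch that reads from the catalog table backed by the Kafka connection and writes Parquet to the data lake in 100-second windows. The database, table, and S3 paths are assumptions, and a production script would add the transforms described above.

import sys
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the streaming source via the Data Catalog table backed by the Kafka connection
stream_df = glue_context.create_data_frame.from_catalog(
    database="kafka_db",            # placeholder
    table_name="policy_events",     # placeholder
    additional_options={"startingOffsets": "latest"},
)

def process_batch(batch_df, batch_id):
    if batch_df.count() > 0:
        dyf = DynamicFrame.fromDF(batch_df, glue_context, "events")
        # Built-in transforms (mappings, filters) would be applied here
        glue_context.write_dynamic_frame.from_options(
            frame=dyf,
            connection_type="s3",
            connection_options={"path": "s3://example-data-lake/raw/policies/"},
            format="parquet",
        )

# Process micro-batches in 100-second windows with checkpointing
glue_context.forEachBatch(
    frame=stream_df,
    batch_function=process_batch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://example-data-lake/checkpoints/policies/",
    },
)
job.commit()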

Many additional steps can be taken to render even more value from the information. For example, one team may choose to use a business intelligence tool like Amazon QuickSight to visualize and embed the data into an internal dashboard. Another team may want to use event driven architectures to notify financial analysts and initiate downstream actions when specific types of data are discovered. There are endless opportunities that should be determined by the business needs.

Summary

In this blog post, we have given an overview of an architecture that provides hybrid cloud data integration and analytics capability. Once the data is transformed and hosted in the S3 data lake, we can provide secure, reliable access to gain valuable insights. This solution allows for a variety of different producers and consumers, with the ability to handle increasing volumes of data.

AWS Glue along with Apache Kafka will ensure that your on-premises workloads are tightly integrated with your larger data lake solution.

If you have questions, post your thoughts in the comments section.

For further reading:

Emerging Solutions for Operations Research on AWS

Post Syndicated from Randy DeFauw original https://aws.amazon.com/blogs/architecture/emerging-solutions-for-operations-research-on-aws/

Operations research (OR) uses mathematical and analytical tools to arrive at optimal solutions for complex business problems like workforce scheduling. The mathematical techniques used to solve these problems, such as linear programming and mixed-integer programming, require the use of optimization software (solvers).  There are several popular and powerful solvers available, ranging from commercial options like IBM CPLEX to open-source packages like ORTools. While these solvers incorporate decades of algorithmic expertise and can solve large and complex problems effectively, they have some scalability limitations.

In this post, we’ll describe three alternatives that you can consider for solving OR problems (see Figure 1). None of these are as general purpose as traditional solvers, but they should be on your “emerging technologies” radar.

Figure 1. OR optimization options

Figure 1. OR optimization options

These include:

  1. A traditional solver running on a compute platform
  2. Reinforcement and machine learning (ML) algorithms running on Amazon SageMaker
  3. A quantum computing algorithm running on Amazon Braket. Experiments are collected in Amazon DynamoDB and the results are visualized in Amazon Elasticsearch Service.

A reference problem and solution

Let’s start with a reference problem and solve it with a traditional solver. We’ll tackle an inventory management issue (see Figure 2). We have a sales depot that supplies products for local sales outlets. For the depot’s Region, there are seven weeks of historical sales data for each product. We also know how much each product costs and for how much it can be sold. Finally, we know the overall weekly capacity of the depot. This depends on logistical constraints like the size of the warehouse and transportation availability. This scenario is loosely based on the Grupo Bimbo retailer’s Kaggle competition and dataset.

Figure 2. Sales depot inventory management scenario

Figure 2. Sales depot inventory management scenario

Our job is to place an inventory order to restock our sales depot each week. We quantify our work through a reward function. We want to maximize our revenue:

revenue = (sale price * number of units sold)

(Note that the sample dataset does not include cost of goods sold, only sale price.)

We use these constraints:

total units sold <= depot capacity
0 <= quantity sold of any given item <= forecasted demand for that item

There are many possible solutions to this problem. Using ORTools, we get an average reward (profit) of about $5,700, in about 1,000 simulations.
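As a rough illustration of how such a weekly order can be formulated, the following OR-Tools sketch maximizes revenue subject to the capacity and demand constraints above. The prices, forecasts, and capacity are made-up numbers, not the Grupo Bimbo dataset used in the post.

from ortools.linear_solver import pywraplp

# Made-up per-product sale prices and demand forecasts for one week
prices = [4.5, 7.2, 3.1, 9.9]
forecast = [120, 80, 200, 40]
depot_capacity = 300  # total units the depot can take this week

solver = pywraplp.Solver.CreateSolver("SCIP")

# Decision variables: units of each product to stock, bounded by forecasted demand
qty = [solver.IntVar(0, forecast[i], f"qty_{i}") for i in range(len(prices))]

# Constraint: total units sold <= depot capacity
solver.Add(solver.Sum(qty) <= depot_capacity)

# Objective: revenue = sale price * number of units sold
solver.Maximize(solver.Sum(prices[i] * qty[i] for i in range(len(prices))))

if solver.Solve() == pywraplp.Solver.OPTIMAL:
    print("revenue:", solver.Objective().Value())
    print("order:", [q.solution_value() for q in qty])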

We can make the scenario slightly more realistic by acknowledging that our sales forecasts are not perfect. After we get the solution from the solver, we can penalize the reward (profit) by subtracting the cost of unsold goods. With this approach, we get a reward of about $2,450.

Solving OR problems with reinforcement learning

An alternative approach to the traditional solver is reinforcement learning (RL). RL is a field of ML that handles problems where the right answer is not immediately known, like playing a game of chess. RL fits our sales depot scenario, because we don’t know how well we will do until after we place the order and are able to view a week of sales activity.

Our sales depot problem resembles a knapsack problem. This is a common OR pattern where we want to fill a container (in this case, our sales depot) with as many items as possible until capacity is reached. Each item has a value (sales price) and a weight (cost). In RL we have to translate this into an observation space, an action space, a state, and a reward (see Figure 3).

The observation space is what our purchasing agent sees. This includes our depot capacity, the sales price, and the forecasted demand. The action space is what our agent can do. In the simplest case, it’s the number of each item to order for the depot, each week. The state is what the agent sees right now, and we model that as the sales results from last week. Finally, the reward function is our profit equation.

One important distinction between OR solvers and RL is that we can’t easily enforce hard constraints in RL. We can limit the amount of an individual product we purchase each week, but we can’t enforce an overall limit on the number of items purchased. We may exceed the capacity of our depot. The simplest way to handle that is to enforce a penalty. There are more sophisticated techniques available, such as interpreting our action as the percentage of budget to spend on each item. But let’s illustrate the simple case here.
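The following is a toy Gym environment sketch of that translation, with the capacity constraint handled as a reward penalty as just described. The demand model, penalty weight, and observation layout are assumptions for illustration and do not reproduce the post's actual RLlib setup.

import numpy as np
import gym
from gym import spaces

class SalesDepotEnv(gym.Env):
    """Toy weekly restocking environment; numbers and demand model are illustrative only."""

    def __init__(self, prices, depot_capacity, max_order=200):
        super().__init__()
        self.prices = np.array(prices, dtype=np.float32)
        self.capacity = float(depot_capacity)
        self.max_order = max_order
        n = len(prices)
        # Action: units of each product to order this week
        self.action_space = spaces.Box(low=0.0, high=max_order, shape=(n,), dtype=np.float32)
        # Observation: last week's sales per product plus the depot capacity
        self.observation_space = spaces.Box(low=0.0, high=np.inf, shape=(n + 1,), dtype=np.float32)
        self.last_sales = np.zeros(n, dtype=np.float32)

    def reset(self):
        self.last_sales = np.zeros_like(self.prices)
        return np.append(self.last_sales, self.capacity)

    def step(self, action):
        order = np.clip(action, 0, self.max_order)
        demand = np.random.poisson(lam=50, size=len(self.prices))  # stand-in for real demand
        sold = np.minimum(order, demand)
        reward = float(np.dot(self.prices, sold))
        # Soft capacity constraint: penalize ordering beyond the depot capacity
        reward -= 10.0 * max(0.0, float(order.sum() - self.capacity))
        self.last_sales = sold.astype(np.float32)
        return np.append(self.last_sales, self.capacity), reward, False, {}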

Using an RL algorithm from the Ray RLLib package, our reward was $7,000 on average, including penalties for ordering too much of any given item.

Figure 3. Translating OR problem to RL

Figure 3. Translating OR problem to RL

Solving OR problems with machine learning

It’s possible to model a knapsack problem using ML rather than RL in some cases, and there are simple reference implementations available. The design assumes that we know, or can accurately estimate, the reward for a given week. With our simple scenario, we can compute the reward using estimates of future sales. We can use this in a custom loss function to train a neural network.
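A minimal sketch of that idea, assuming PyTorch and made-up prices, forecasts, and capacity, is shown below: the network outputs order fractions and the loss is the negative estimated profit plus a capacity penalty. This is illustrative only and is not the reference implementation mentioned above.

import torch
import torch.nn as nn

# Made-up per-product prices, demand forecasts, and depot capacity
prices = torch.tensor([4.5, 7.2, 3.1, 9.9])
forecast = torch.tensor([120.0, 80.0, 200.0, 40.0])
capacity = 300.0

# The network maps (prices, forecasts) to the fraction of forecasted demand to stock
model = nn.Sequential(nn.Linear(8, 32), nn.ReLU(), nn.Linear(32, 4), nn.Sigmoid())
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)
features = torch.cat([prices, forecast]).unsqueeze(0)

for _ in range(500):
    optimizer.zero_grad()
    order = model(features).squeeze(0) * forecast
    revenue = (prices * order).sum()                  # estimated reward for the week
    overflow = torch.relu(order.sum() - capacity)     # capacity violation
    loss = -revenue + 10.0 * overflow                 # custom loss: negative reward plus penalty
    loss.backward()
    optimizer.step()

print(model(features).squeeze(0) * forecast)  # suggested order quantities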

Solving OR problems with quantum computing

Quantum computers are fundamentally different from the computers most of us use. The appeal of quantum computers is that they can tackle some types of problems much more efficiently than standard computers. Quantum computers can, in theory, solve prime number factoring for decryption orders of magnitude faster than a standard computer. But they are still in their infancy and limited in the size of problem they can handle, due to hardware limitations.

D-Wave Systems, which makes some of the types of quantum computers available through Amazon Braket, has a solver called QBSolv. QBSolv works on a specific type of optimization problem called quadratic unconstrained binary optimization (QUBO). It breaks large problems into smaller pieces that a quantum computer can handle. There is a reference pattern for translating a knapsack problem to a QUBO problem.
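To give a sense of what that translation involves, the following sketch encodes a tiny knapsack instance as a QUBO matrix using the standard penalty method with binary slack variables, and brute-forces it as a sanity check. It is a generic illustration under made-up numbers, not the reference pattern itself; the resulting matrix is what would be handed to QBSolv or a Braket sampler.

import itertools
import numpy as np

# Tiny illustrative knapsack: item values, weights, and capacity
values = np.array([10, 13, 7])
weights = np.array([3, 4, 2])
capacity = 6
penalty = int(values.sum()) + 1  # large enough that violating capacity is never optimal

n = len(values)
num_slack = int(np.ceil(np.log2(capacity + 1)))  # slack bits encode unused capacity
w = np.concatenate([weights, [2 ** k for k in range(num_slack)]])
size = n + num_slack
Q = np.zeros((size, size))

# Objective: we minimize z^T Q z, so item values enter with a negative sign
for i in range(n):
    Q[i, i] -= values[i]

# Penalty term for (sum(w_i * z_i) - capacity)^2, expanded into QUBO coefficients
for i in range(size):
    Q[i, i] += penalty * (w[i] ** 2 - 2 * capacity * w[i])
    for j in range(i + 1, size):
        Q[i, j] += 2 * penalty * w[i] * w[j]

# Brute-force the tiny instance to sanity-check the encoding
best = min(itertools.product([0, 1], repeat=size), key=lambda z: np.array(z) @ Q @ np.array(z))
print("selected items:", best[:n])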

Running the sales depot problem through QBSolv on Amazon Braket and using a subset of the data, I was able to obtain a reward of $900. When I tried to run on the full dataset, I was not able to complete the decomposition step, likely due to a hardware limitation.

Conclusion

In this blog post, I reviewed OR problems and traditional OR solvers, and then discussed three alternative approaches: RL, ML, and quantum computing. Each of these alternatives has drawbacks, and none is a general-purpose replacement for traditional OR solvers.

However, RL and ML are potentially more scalable because you can train those solutions on a cluster of machines, rather than running an OR solver on a single machine. RL agents can also learn from experience, giving them flexibility to handle scenarios that may be difficult to incorporate into an OR solver. Quantum computing solutions are promising but the current state of the art for quantum computers limits their application to small-scale problems at the moment. All of these alternatives can potentially derive a solution more quickly than an OR solver.

Further Reading:

How MOIA built a fully automated GDPR compliant data lake using AWS Lake Formation, AWS Glue, and AWS CodePipeline

Post Syndicated from Leonardo Pêpe original https://aws.amazon.com/blogs/big-data/how-moia-built-a-fully-automated-gdpr-compliant-data-lake-using-aws-lake-formation-aws-glue-and-aws-codepipeline/

This is a guest blog post co-written by Leonardo Pêpe, a Data Engineer at MOIA.

MOIA is an independent company of the Volkswagen Group with locations in Berlin and Hamburg, and operates its own ride pooling services in Hamburg and Hanover. The company was founded in 2016 and develops mobility services independently or in partnership with cities and existing transport systems. MOIA’s focus is on ride pooling and the holistic development of the software and hardware for it. In October 2017, MOIA started a pilot project in Hanover to test a ride pooling service, which was brought into public operation in July 2018. MOIA covers the entire value chain in the area of ride pooling. MOIA has developed a ridesharing system to avoid individual car traffic and use the road infrastructure more efficiently.

In this post, we discuss how MOIA uses AWS Lake Formation, AWS Glue, and AWS CodePipeline to store and process gigabytes of data on a daily basis to serve 20 different teams with individual user data access and implement fine-grained control of the data lake to comply with General Data Protection Regulation (GDPR) guidelines. This involves controlling access to data at a granular level. The solution enables MOIA’s fast pace of innovation to automatically adapt user permissions to new tables and datasets as they become available.

Background

Each MOIA vehicle can carry six passengers. Customers interact with the MOIA app to book a trip, cancel a trip, and give feedback. The highly distributed system prepares multiple offers to reach their destination with different pickup points and prices. Customers select an option and are picked up from their chosen location. All interactions between the customers and the app, as well as all the interactions between internal components and systems (the backend’s and vehicle’s IoT components), are sent to MOIA’s data lake.

Data from the vehicle, app, and backend must be centralized to keep the trips that are planned and implemented in sync, and to collect passenger feedback. To provide different pricing and routing options, MOIA needed centralized data. MOIA decided to build and secure its Amazon Simple Storage Service (Amazon S3) based data lake using AWS Lake Formation.

Different MOIA teams, including data analysts, data scientists, and data engineers, need to access centralized data from different sources for the development and operations of the application workloads. It’s a legal requirement to control the access to and format of the data for these different teams. The app development team needs to understand customer feedback in an anonymized way, pricing-related data must be accessed only by the business analytics team, vehicle data is meant to be used only by the vehicle maintenance team, and the routing team needs access to customer location and destination.

Architecture overview

The following diagram illustrates the MOIA solution architecture.

The solution has the following components:

  1. Data input – The apps, backend, and IoT devices continuously stream event messages with nested information in JSON format via Amazon Kinesis Data Streams.
  2. Data transformation – When the raw data is received, the data pipeline orchestrator (Apache Airflow) starts the data transformation jobs to meet requirements for respective data formats. Amazon EMR, AWS Lambda, or AWS Fargate cleans, transforms, and loads the events in the S3 data lake.
  3. Persistence layer – After the data is transformed and normalized, AWS Glue crawlers classify the data to determine the format, schema, and associated properties. They group data into tables or partitions and write metadata to the AWS Glue Data Catalog based on the structured partitions created in Amazon S3. The data is written on Amazon S3 partitioned by its event type and timestamp so it can be easily accessed via Amazon Athena and the correct permissions can be applied using Lake Formation. The crawlers are run at a fixed interval in accordance with the batch jobs, so no manual runs are required.
  4. Governance layer – Access on the data lake is managed using the Lake Formation governance layer. MOIA uses Lake Formation to centrally define security, governance, and auditing policies in one place. These policies are consistently implemented, which eliminates the need to manually configure them across security services like AWS Identity and Access Management (IAM), AWS Key Management Service (AWS KMS), storage services like Amazon S3, or analytics and machine learning (ML) services like Amazon Redshift, Athena, Amazon EMR for Apache Spark. Lake Formation reduces the effort in configuring policies across services and provides consistent assistance for GDPR requirements and compliance. When a user makes a request to access Data Catalog resources or underlying data from the data lake, for the request to succeed, it must pass permission checks by both IAM and Lake Formation. Lake Formation permissions control access to Data Catalog resources, Amazon S3 locations, and the underlying data at those locations. User permissions to the data on the table, row, or column level are defined in Lake Formation, so that users only access the data if they’re authorized.
  5. Data access layer – After the flattened and structured data is available to be accessed by the data analysts, scientists, and engineers they can use Amazon Redshift or Athena to perform SQL analysis, or AWS Data Wrangler and Apache Spark to perform programmatic analysis and build ML use cases.
  6. Infrastructure orchestration and data pipeline – After the data is ingested and transformed, the infrastructure orchestration layer (CodePipeline and Lake Formation) creates or updates the governance layer with the proper and predefined permissions periodically every hour (see more about the automation in the governance layer later in this post).

Governance challenge

MOIA wants to continuously evolve their ML models for routing, demand prediction, and business models. This requires MOIA to constantly review and update models; therefore, power users such as data administrators and engineers frequently redesign the table schemas in the AWS Glue Data Catalog as part of the data engineering workflow. This highly dynamic metadata transformation requires an equally dynamic governance layer pipeline that can assign the right user permissions to all tables and adapt to these changes transparently, without disruptions to end users. Due to GDPR requirements, there is no room for error, and manual work is required to assign the right, GDPR-compliant permissions on the tables in a data lake with many terabytes of data. Without automation, many developers are needed for administration; this adds human error into the workflows, which is not acceptable. Manual administration and access management isn’t a scalable solution. MOIA needed to innovate faster while remaining GDPR compliant with a small team of developers.

Data schema and data structure often changes at MOIA, resulting in new tables being created in the data lake. To guarantee that new tables inherit the permissions granted to the same group of users who already have access to the entire database, MOIA uses an automated process that grants Lake Formation permission to newly created tables, columns, and databases, as described in the next section.

Solution overview

The following diagram illustrates the continuous deployment loop using AWS CloudFormation.

The workflow contains the following steps:

  1. MOIA data scientists and engineers create or modify new tables to evolve the business and ML models. This results in new tables or changes in the existing schema of the table.
  2. The data pipeline is orchestrated via Apache Airflow, which controls the whole cycle of ingestion, cleansing, and data transformation using Spark jobs (as shown in Step 1). When the data is in its desired format and state, Apache Airflow triggers the AWS Glue crawler job to discover the data schema in the S3 bucket and add the new partitions to the Data Catalog. Apache Airflow also triggers CodePipeline to assign the right permissions on all the tables.
  3. Apache Airflow triggers CodePipeline every hour, which uses MOIA’s libraries to discover the new tables, columns, and databases, and grants Lake Formation permissions in the AWS CloudFormation template for all tables, including new and modified. This orchestration layer ensures Lake Formation permissions are granted. Without this step, nobody can access new tables, and new data isn’t visible to data scientists, business analysts, and others who need it.
  4. MOIA uses Stacker and Troposphere to identify all the tables, assign the tables the right permissions, and deploy the CloudFormation stack. Troposphere is the infrastructure as code (IaC) framework that renders the CloudFormation templates, and stacker helps with the parameterization and deployment of the templates on AWS. Stacker also helps produce reusable templates in the form of blueprints as pure Python code, which can be easily parameterized using YAML files. The Python code uses Boto3 clients to look up the Data Catalog and search for databases and tables.
  5. The stacker blueprint libraries developed by MOIA (which internally use Boto3 and troposphere) are used to discover newly created databases or tables in the Data Catalog.
  6. A permissions configuration file allows MOIA to predefine data lake tables (which can be a wildcard indicating all tables available in one database) and the specific personas who have access to these tables. These roles are split into different categories called lake personas, and have specific access levels. Example lake personas include data domain engineers, data domain technical users, data administrators, and data analysts. Users and their permissions are defined in the file. In case access to personally identifiable information (PII) data is granted, the GDPR officer ensures that a record of processing activities is in place and the usage of personal data is documented according to the law.
  7. MOIA uses stacker to read the predefined permissions file, and uses the customized stacker blueprints library containing the logic to assign permissions to lake personas for each of the tables discovered in Step 5.
  8. The discovered and modified tables are matched with predefined permissions in Step 6 and Step 7. Stacker uses YAML format as the input for the CloudFormation template parameters to define users and data access permissions. These parameters are related to the user’s roles and define access to the tables. Based on this, MOIA creates a customized stacker that creates AWS CloudFormation resources dynamically. The following are the example stacks in AWS CloudFormation:
    1. data-lake-domain-database – Holds all resources that are relevant during the creation of the database, which includes lake permissions at the database level that allow the domain database admin personas to perform operations in the database.
    2. data-lake-domain-crawler – Scans Amazon S3 constantly to discover and create new tables and updates.
    3. data-lake-lake-table-permissions – Uses Boto3 combined with troposphere to list the available tables and grant lake permissions to the domain roles that access it.

This way, new CloudFormation templates are created, containing permissions for new or modified tables. This process guarantees a fully automated governance layer for the data lake. The generated CloudFormation template contains Lake Formation permission resources for each table, database, or column. The process of managing Lake Formation permissions on Data Catalog databases, tables, and columns is simplified by granting Data Catalog permissions using the Lake Formation tag-based access control method. The advantage of generating a CloudFormation template is auditability. When the new version of the CloudFormation stack is prepared with an access control set on new or modified tables, administrators can compare that stack with the older version to discover newly prepared and modified tables. MOIA can view the differences via the AWS CloudFormation console before deploying the new stack.
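
A simplified sketch of what such a blueprint does is shown below. It uses Boto3 to discover the tables of a Data Catalog database and renders one AWS::LakeFormation::Permissions resource per table into a plain CloudFormation template. The database name, role ARN, and permission set are placeholders for illustration; this is not MOIA's actual stacker blueprint code.

import json
import boto3

glue = boto3.client("glue")

def discover_tables(database_name):
    """Return all table names registered in the given Data Catalog database."""
    paginator = glue.get_paginator("get_tables")
    tables = []
    for page in paginator.paginate(DatabaseName=database_name):
        tables.extend(t["Name"] for t in page["TableList"])
    return tables

def render_permissions_template(database_name, principal_arn, permissions=("SELECT",)):
    """Render one AWS::LakeFormation::Permissions resource per discovered table."""
    resources = {}
    for i, table in enumerate(discover_tables(database_name)):
        resources[f"TablePermission{i}"] = {
            "Type": "AWS::LakeFormation::Permissions",
            "Properties": {
                "DataLakePrincipal": {"DataLakePrincipalIdentifier": principal_arn},
                "Resource": {"TableResource": {"DatabaseName": database_name, "Name": table}},
                "Permissions": list(permissions),
            },
        }
    return json.dumps({"Resources": resources}, indent=2)

# Example: grant SELECT on every discovered table to a hypothetical analyst role.
print(render_permissions_template(
    "example_domain_db", "arn:aws:iam::111122223333:role/data-analyst"))

Because the template is regenerated on every run, a table that appears in the Data Catalog automatically shows up as a new permission resource in the next stack version, which is what makes the diff reviewable before deployment.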

  9. Either the CloudFormation stack is deployed in the account with the right permissions for all users, or a stack deployment failure notice is sent to the developers via a Slack channel (Step 10). This ensures visibility into the deployment process and into failed permission assignment pipelines.
  10. Whenever the pipeline fails, MOIA receives a notification via Slack so the engineers can promptly fix the errors. This loop is very important to guarantee that whenever the schema changes, those changes are reflected in the data lake without manual intervention.
  11. Following this automated pipeline, all the tables are assigned the right permissions.

Benefits

This solution delivers the following benefits:

  • Automated enforcement of data access permissions allows people to access the data without manual interventions in a scalable way. With automation, 1,000 hours of manual work are saved every year.
  • The GDPR team performs the assessment internally every month, provides ongoing guidance, and approves each change in permissions. This audit trail for records of processing activities is automated with the help of Lake Formation (otherwise it would need a dedicated human resource).
  • The automated workflow of permission assignment is integrated into existing CI/CD processes, resulting in faster onboarding of new teams, features, and dataset releases. An average of 48 releases are done each month (including major and minor version releases and new event types). Onboarding and forming new internal teams is now much easier.
  • This solution enables you to create a data lake using IaC processes that are easy and GDPR-compliant.

Conclusion

MOIA has created scalable, automated, and versioned permissions with a GDPR-supported, governed data lake using Lake Formation. This solution helps them bring new features and models to market faster, and reduces administrative and repetitive tasks. MOIA can focus on an average of 48 releases every month, contributing to a great customer experience and new data insights.


About the Authors

Leonardo Pêpe is a Data Engineer at MOIA. With a strong background in infrastructure and application support and operations, he is immersed in the DevOps philosophy. He’s helping MOIA build automated solutions for its data platform and enabling the teams to be more data-driven and agile. Outside of MOIA, Leonardo enjoys nature, Jiu-Jitsu and martial arts, and explores the good of life with his family.

 

Sushant Dhamnekar is a Solutions Architect at AWS. As a trusted advisor, Sushant helps automotive customers to build highly scalable, flexible, and resilient cloud architectures, and helps them follow the best practices around advanced cloud-based solutions. Outside of work, Sushant enjoys hiking, food, travel, and CrossFit workouts.

 

 

Shiv Narayanan is Global Business Development Manager for Data Lakes and Analytics solutions at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern data platforms. Shiv loves music, travel, food and trying out new tech.

Create a custom Amazon S3 Storage Lens metrics dashboard using Amazon QuickSight

Post Syndicated from Jignesh Gohel original https://aws.amazon.com/blogs/big-data/create-amazon-s3-storage-lens-metrics-dashboard-amazon-quicksight/

Companies use Amazon Simple Storage Service (Amazon S3) for its flexibility, durability, scalability, and ability to perform many things besides storing data. This has led to an exponential rise in the usage of S3 buckets across numerous AWS Regions, across tens or even hundreds of AWS accounts. To optimize costs and analyze security posture, Amazon S3 Storage Lens provides a single view of object storage usage and activity across your entire Amazon S3 storage. S3 Storage Lens includes an embedded dashboard to understand, analyze, and optimize storage with over 29 usage and activity metrics, aggregated for your entire organization, with drill-downs for specific accounts, Regions, buckets, or prefixes. In addition to being accessible in a dashboard on the Amazon S3 console, the raw data can also be scheduled for export to an S3 bucket.

For most customers, the S3 Storage Lens dashboard will cover all your needs. However, you may require specialized views of your S3 Storage Lens metrics, including combining data across multiple AWS accounts, or with external data sources. For such cases, you can use Amazon QuickSight, which is a scalable, serverless, embeddable, machine learning (ML)-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights.

In this post, you learn how to use QuickSight to create simple customized dashboards to visualize S3 Storage Lens metrics. Specifically, this solution demonstrates two customization options:

  • Combining S3 Storage Lens metrics with external sources and being able to filter and visualize the metrics based on one or multiple accounts
  • Restricting users to view Amazon S3 metrics data only for specific accounts

You can further customize these dashboards based on your needs.

Solution architecture

The following diagram shows the high-level architecture of this solution. In addition to S3 Storage Lens and QuickSight, we use other AWS Serverless services like AWS Glue and Amazon Athena.

Solution Architecture for Amazon S3 Storage Lens custom metrics

The data flow includes the following steps:

  1. S3 Storage Lens collects the S3 metrics and exports them daily to a user-defined S3 bucket. Note that first we need to activate S3 Storage Lens from the Amazon S3 console and configure it to export the file either in CSV or Apache Parquet format.
  2. An AWS Glue crawler scans the data from the S3 bucket and populates the AWS Glue Data Catalog with tables. It automatically infers schema, format, and data types from the S3 bucket.
  3. You can schedule the crawler to run at regular intervals to keep metadata, table definitions, and schemas in sync with data in the S3 bucket. It automatically detects new partitions in Amazon S3 and adds the partition’s metadata to the AWS Glue table.
  4. Athena performs the following actions:
    • Uses the table populated by the crawler in Data Catalog to fetch the schema.
    • Queries and analyzes the data in Amazon S3 directly using standard SQL.
  5. QuickSight performs the following actions:
    • Uses the Athena connector to import the Amazon S3 metrics data.
    • Fetches the external data from a custom CSV file.

To demonstrate this, we have a sample CSV file that contains the mapping of AWS account numbers to team names owning these accounts. QuickSight combines these datasets using the data source join feature.

  1. When the combined data is available in QuickSight, users can create custom analysis and dashboards, apply appropriate QuickSight permissions, and share dashboards with other users.

At a high level, this solution requires you to complete the following steps:

  1. Enable S3 Storage Lens in your organization’s payer account or designate a member account. For instructions to have a member account as a delegated administrator, see Enabling a delegated administrator account for Amazon S3 Storage Lens.
  2. Set up an AWS Glue crawler, which populates the Data Catalog to query S3 Storage Lens data using Athena.
  3. Use QuickSight to import data (using the Athena connector) and create custom visualizations and dashboards that can be shared across multiple QuickSight users or groups.

Enable and configure the S3 Storage Lens dashboard

S3 Storage Lens includes an interactive dashboard available on the Amazon S3 console. It provides organization-wide visibility into object storage usage and activity trends, and makes actionable recommendations to improve cost-efficiency and apply data protection best practices. First you need to activate S3 Storage Lens via the Amazon S3 console. After it's enabled, you can access an interactive dashboard containing preconfigured views to visualize storage usage and activity trends, with contextual recommendations. Most importantly, it also provides the ability to export metrics in CSV or Parquet format to an S3 bucket of your choice for further use. We use this export metrics feature in our solution. The following steps provide details on how you can enable this feature in your account.

  1. On the Amazon S3 console, under Storage Lens in the navigation pane, choose Dashboards.
  2. Choose Create dashboard.

Create S3 Storage Dashboard

  1. Provide the appropriate details in the Create dashboard section.
    • Make sure to select Include all accounts in your organization, Include Regions, and Include all Regions.

S3 Storage Lens Dashboard Configure

S3 Storage Lens has two tiers: Free metrics, which is free of charge, automatically available for all Amazon S3 customers, and contains 15 usage-related metrics; and Advanced metrics and recommendations, which has an additional charge, but includes all 29 usage and activity metrics with 15-month data retention, and contextual recommendations. For this solution, we select Free metrics. If you need additional metrics, you may select Advanced metrics.

  1. For Metrics export, select Enable.
  2. For Choose an output format, select Apache Parquet.
  3. For Destination bucket, select This account.
  4. For Destination, enter your S3 bucket path.

S3 Storage Lens Metrics Export Configuration

We highly recommend following security best practices for the S3 bucket you use, along with the server-side encryption available with the export. You can use an Amazon S3 key (SSE-S3) or an AWS Key Management Service key (SSE-KMS) as the encryption key type.

  1. Choose Create dashboard.

The data population process can take up to 48 hours. Proceed to the next steps only after the dashboard is available.

Set up the AWS Glue crawler

AWS Glue is a serverless, fully managed extract, transform, and load (ETL) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores and data streams. AWS Glue consists of a central metadata repository known as the AWS Glue Data Catalog, an ETL engine, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. We can use AWS Glue to discover data, transform it, and make it available for search and querying. The AWS Glue Data Catalog is an index to the location, schema, and runtime metrics of your data. Athena uses this metadata definition to query data available in Amazon S3 using simple SQL statements.

The AWS Glue crawler populates the Data Catalog with tables from various sources, including Amazon S3. When the crawler runs, it classifies data to determine the format, schema, and associated properties of the raw data, performs grouping of data into tables or partitions, and writes metadata to the Data Catalog. You can configure the crawler to run at specific intervals to make sure the Data Catalog is in sync with the underlying source data.

For our solution, we use these services to catalog and query exported S3 Storage Lens metrics data. You can create the crawler via the AWS Glue console, but for the purposes of this example we provide an AWS CloudFormation template that deploys the required AWS resources and creates a CloudFormation stack with three AWS resources in your AWS account.

When you create your stack with the CloudFormation template, provide the following information:

  • AWS Glue database name
  • AWS Glue crawler name
  • S3 URL path pointing to the reports folder where S3 Storage Lens has exported metrics data. For example, s3://[Name of the bucket]/StorageLens/o-lcpjprs6wq/s3-storage-lense-parquet-v1/V_1/reports/.

After the stack is complete, navigate to the AWS Glue console and confirm that a new crawler job is listed on the Crawlers page. When the crawler runs for the first time, it creates the table reports in the Data Catalog. The Data Catalog may need to be periodically refreshed, so this job is configured to run every day at midnight to sync the data. You can change this configuration to your desired schedule.
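
The CloudFormation template provisions the crawler for you; the equivalent Boto3 calls look roughly like the following sketch, where the crawler name, role, database, and S3 path are placeholders you would replace with the stack parameters above.

import boto3

glue = boto3.client("glue")

# Create a crawler over the S3 Storage Lens export location (all values are placeholders).
glue.create_crawler(
    Name="s3-storage-lens-crawler",
    Role="AWSGlueServiceRole-StorageLens",
    DatabaseName="s3_storage_lens_db",
    Targets={"S3Targets": [{"Path": "s3://example-bucket/StorageLens/reports/"}]},
    Schedule="cron(0 0 * * ? *)",  # run every day at midnight UTC
)

# Run it once immediately instead of waiting for the schedule.
glue.start_crawler(Name="s3-storage-lens-crawler")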

After the crawler job runs, we can confirm that the data is accessible using the following query in Athena (make sure to run this query in the database provided in the CloudFormation template):

select * from reports limit 10

Running this query should return results similar to the following screenshot.

Query Results

Create a QuickSight dashboard

When the data is available to access using Athena, we can use QuickSight to create customized analytics and publish dashboards across multiple users. This process involves creating a new QuickSight dataset, creating the analysis using this dataset, creating the dashboard, and configuring user permissions and security.

To get started, you must be signed in to QuickSight using the same payer account. If you’re signing into QuickSight for the first time, you’re prompted to complete the initial signup process (for example, choosing QuickSight Enterprise Edition). You’re also required to provide QuickSight access to your S3 bucket and Athena. For instructions on adding permissions, see Insufficient Permissions When Using Athena with Amazon QuickSight.

  1. In the QuickSight navigation pane, choose Datasets.
  2. Choose New dataset and select Athena.

QuickSight Create Dataset

  1. For Data source name, enter a name.
  2. Choose Create data source.

QuickSight create Athena Dataset

  1. For Catalog, choose AwsDataCatalog.
  2. For Database, choose the AWS Glue database that contains the table for S3 Storage Lens.
  3. For Tables, select your table (for this post, reports).

QuickSight table selection

  1. Choose Edit dataset and choose the query mode SPICE.
  2. Change the format of report_date and dt to Date.
  3. Choose Save.

We can use the cross data source join feature in QuickSight to connect external data sources to the S3 Storage Lens dataset. For this example, let’s say we want to visualize the number of S3 buckets mapped to the internal teams. This data is external to S3 Storage Lens and stored in a CSV file, which contains the mapping between the account numbers and internal team names.

Account to Team Mapping

  1. To import this data into our existing dataset, choose Add data.

QuickSight add external data

  1. Choose Upload a file to import the CSV file to the dataset.

QuickSight Upload External File

We’re redirected to the join configuration screen. From here, we can configure the join type and join clauses to connect these two data sources. For more information about the cross data source join functionality, see Joining across data sources on Amazon QuickSight.

  1. For this example, we need to perform the left join on columns aws_account_number (from the reports table) and Account (from the Account-to-Team-mapping table). This left join returns all records from the reports table and matching records from Account-to-Team-mapping.
  2. Choose Apply after selecting this configuration.

QuickSight DataSet Join

  1. Choose Save & visualize.

From here, you can create various analyses and visualizations on the imported datasets. For instructions on creating visualizations, see Creating an Amazon QuickSight Visual. We provide a sample template you can use to get the basic dashboard. This dashboard provides metrics for total Amazon S3 storage size, object count, S3 bucket by internal team, and more. It also allows authorized users to filter the metrics based on accounts and report dates. This is a simple report that can be further customized based on your needs.

Quicksight Final Dashboard

S3 Storage Lens's IAM security policies don't apply to data imported into QuickSight. So before you share this dashboard with anyone, you might want to restrict access according to the security requirements and business role of each user. For a comprehensive set of security features, see AWS Security in Amazon QuickSight. For implementation examples, see Applying row-level and column-level security on Amazon QuickSight dashboards. In our example, instead of all users having access to view S3 Storage Lens data for all accounts, you might want to restrict user access to only specific accounts.

QuickSight provides a feature called row-level security that can restrict user access to only a subset of table rows (or records). You can base the selection of these subsets of rows on filter conditions defined on specific columns.

For our current example, we want to allow user access to view the Amazon S3 metrics dashboard only for a few accounts. For this, we can use the column aws_account_number as filter criteria with account number values. We can implement this by creating a CSV file with columns named UserName and aws_account_number, and adding the rows for users and a list of account numbers (comma-separated). In the following example file, we have added a sample value for the user awslabs-qs-1 with a specific account. This means that user awslabs-qs-1 can only see the rows (or records) that match with the corresponding aws_account_number values specified in the permission CSV.

QuickSight Permissions file
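
For reference, a minimal version of such a permissions file, using a placeholder account number, could look like the following:

UserName,aws_account_number
awslabs-qs-1,111122223333

To map a user to more than one account, list the account numbers comma-separated in the aws_account_number field, as described above.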

For instructions on applying a permission rule file, see Using Row-Level Security (RLS) to Restrict Access to a Dataset.

You can further customize this QuickSight analysis to produce additional visualizations, apply additional permissions, and publish it to enterprise users and groups with various levels of security.

Conclusion

Combining the knowledge in S3 Storage Lens metrics with other custom data enables you to discover anomalies and identify cost-efficiencies across accounts. In this post, we used serverless components to build a workflow to use this data for real-time visualization. You can use this workflow to scale up and design an enterprise-level solution with a multi-account strategy and control fine-grained access to its data using the QuickSight row-level security feature.


About the Authors

Jignesh Gohel is a Technical Account Manager at AWS. In this role, he provides advocacy and strategic technical guidance to help plan and build solutions using best practices, and proactively keep customers’ AWS environments operationally healthy. He is passionate about building modular and scalable enterprise systems on AWS using serverless technologies. Besides work, Jignesh enjoys spending time with family and friends, traveling and exploring the latest technology trends.

 

Suman Koduri is a Global Category Lead for the Data & Analytics category in AWS Marketplace. He is focused on business development activities to further expand the presence and success of Data & Analytics ISVs in AWS Marketplace. In this role, he leads the scaling and evolution of new and existing ISVs, as well as field enablement and strategic customer advisement for the same. In his spare time, he loves running half marathons and riding his motorcycle.

How to Accelerate Building a Lake House Architecture with AWS Glue

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/how-to-accelerate-building-a-lake-house-architecture-with-aws-glue/

Customers are building databases, data warehouses, and data lake solutions in isolation from each other, each having its own separate data ingestion, storage, management, and governance layers. Often these disjointed efforts to build separate data stores end up creating data silos, data integration complexities, excessive data movement, and data consistency issues. These issues are preventing customers from getting deeper insights. To overcome these issues and easily move data around, a Lake House approach on AWS was introduced.

In this blog post, we illustrate the AWS Glue integration components that you can use to accelerate building a Lake House architecture on AWS. We will also discuss how to derive persona-centric insights from your Lake House using AWS Glue.

Components of the AWS Glue integration system

AWS Glue is a serverless data integration service that facilitates the discovery, preparation, and combination of data. It can be used for analytics, machine learning, and application development. AWS Glue provides all of the capabilities needed for data integration. So you can start analyzing your data and putting it to use in minutes, rather than months.

The following diagram illustrates the various components of the AWS Glue integration system.

Figure 1. AWS Glue integration components

Connect – AWS Glue allows you to connect to various data sources anywhere

Glue connector: AWS Glue provides built-in support for the most commonly used data stores. You can connect to Amazon Redshift, Amazon RDS, Amazon Aurora, Microsoft SQL Server, MySQL, MongoDB, or PostgreSQL using JDBC connections. AWS Glue also allows you to use custom JDBC drivers in your extract, transform, and load (ETL) jobs. For data stores that are not natively supported, such as SaaS applications, you can use connectors. You can also subscribe to several connectors offered in the AWS Marketplace.

Glue crawlers: You can use a crawler to populate the AWS Glue Data Catalog with tables. A crawler can crawl multiple data stores in a single pass. Upon completion, the crawler creates or updates one or more tables in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these Data Catalog tables as sources and targets.

Catalog – AWS Glue simplifies data discovery and governance

Glue Data Catalog: The Data Catalog serves as the central metadata catalog for the entire data landscape.

Glue Schema Registry: The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. With AWS Glue Schema Registry, you can manage and enforce schemas on your data streaming applications.
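
As a rough illustration of the Schema Registry API (the registry name, schema name, and Avro definition below are hypothetical), registering a schema through Boto3 might look like the following:

import json
import boto3

glue = boto3.client("glue")

# Create a registry and register the first version of an Avro schema (names are placeholders).
glue.create_registry(RegistryName="streaming-events", Description="Schemas for event streams")

avro_schema = {
    "type": "record",
    "name": "RideEvent",
    "fields": [
        {"name": "ride_id", "type": "string"},
        {"name": "event_time", "type": "long"},
    ],
}

glue.create_schema(
    RegistryId={"RegistryName": "streaming-events"},
    SchemaName="ride-event",
    DataFormat="AVRO",
    Compatibility="BACKWARD",  # reject producer changes that would break existing consumers
    SchemaDefinition=json.dumps(avro_schema),
)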

Data quality – AWS Glue helps you author and monitor data quality rules

Glue DataBrew: AWS Glue DataBrew allows data scientists and data analysts to clean and normalize data. You can use a visual interface, reducing the time it takes to prepare data by up to 80%. With Glue DataBrew, you can visualize, clean, and normalize data directly from your data lake, data warehouses, and databases.

Curate data: You can use either an AWS Glue development endpoint or AWS Glue Studio to curate your data.

An AWS Glue development endpoint is an environment that you can use to develop and test your AWS Glue scripts. You can choose either an Amazon SageMaker notebook or an Apache Zeppelin notebook as the environment.

AWS Glue Studio is a new visual interface for AWS Glue that supports extract-transform-and-load (ETL) developers. You can author, run, and monitor AWS Glue ETL jobs. You can now use a visual interface to compose jobs that move and transform data, and run them on AWS Glue.

AWS Data Exchange makes it easy for AWS customers to securely exchange and use third-party data in AWS. This is for data providers who want to structure their data across multiple datasets or enrich their products with additional data. You can publish additional datasets to your products using the AWS Data Exchange.

Deequ is an open-source data quality library developed internally at Amazon. It provides multiple features such as automatic constraint suggestions and verification, metrics computation, and data profiling.

Build a Lake House architecture faster, using AWS Glue

Figure 2 illustrates how you can build a Lake House using AWS Glue components.

Figure 2. Building Lake House architectures with AWS Glue

The architecture flow follows these general steps:

  1. Glue crawlers scan the data from various data sources and populate the Data Catalog for your Lake House.
  2. The Data Catalog serves as the central metadata catalog for the entire data landscape.
  3. Once data is cataloged, fine-grained access control is applied to the tables through AWS Lake Formation.
  4. Curate your data with business and data quality rules by using Glue Studio, Glue development endpoints, or Glue DataBrew. Place transformed data in a curated Amazon S3 zone for purpose-built analytics downstream.
  5. Facilitate data movement with AWS Glue to and from your data lake, databases, and data warehouse by using Glue connections. Use AWS Glue Elastic Views to replicate the data across the Lake House.
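
For step 3 in this flow, a minimal sketch of granting fine-grained access with Lake Formation through Boto3 could look like the following; the database, table, columns, and role ARN are placeholders.

import boto3

lakeformation = boto3.client("lakeformation")

# Grant column-level SELECT on one curated table to a hypothetical analyst role.
lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::111122223333:role/analyst"},
    Resource={
        "TableWithColumns": {
            "DatabaseName": "curated_db",
            "Name": "sales",
            "ColumnNames": ["order_id", "order_date", "amount"],
        }
    },
    Permissions=["SELECT"],
)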

Derive persona-centric insights from your Lake House using AWS Glue

Many organizations want to gather observations from increasingly larger volumes of acquired data. These insights help them make data-driven decisions with speed and agility. They must use a central data lake, a ring of purpose-built data services, and data warehouses based on persona or job function.

Figure 3 illustrates the Lake House inside-out data movement with AWS Glue DataBrew, Amazon Athena, Amazon Redshift, and Amazon QuickSight to perform persona-centric data analytics.

Figure 3. Lake House persona-centric data analytics using AWS Glue

This shows how Lake House components serve various personas in an organization:

  1. Data ingestion: Data is ingested to Amazon Simple Storage Service (S3) from different sources.
  2. Data processing: Data curators and data scientists use DataBrew to validate, clean, and enrich the data. Amazon Athena is also used to run ad hoc queries to analyze the data in the lake. The transformation is shared with data engineers to set up batch processing.
  3. Batch data processing: Data engineers or developers set up batch jobs in AWS Glue and AWS Glue DataBrew. Jobs can be initiated by an event, or can be scheduled to run periodically.
  4. Data analytics: Data and business analysts can now analyze the prepared dataset in Amazon Redshift or in Amazon S3 using Athena.
  5. Data visualizations: Business analysts can create visuals in QuickSight. Data curators can enrich data from multiple sources. Admins can enforce security and data governance. Developers can embed QuickSight dashboards in applications.

Conclusion

Using a Lake House architecture will help you get persona-centric insights quickly from all of your data based on user role or job function. In this blog post, we describe several AWS Glue components and AWS purpose-built services that you can use to build Lake House architectures on AWS. We have also presented persona-centric Lake House analytics architecture using AWS Glue, to help you derive insights from your Lake House.

Read more and get started on building Lake House Architectures on AWS.

How MEDHOST’s cardiac risk prediction successfully leveraged AWS analytic services

Post Syndicated from Pandian Velayutham original https://aws.amazon.com/blogs/big-data/how-medhosts-cardiac-risk-prediction-successfully-leveraged-aws-analytic-services/

MEDHOST has been providing products and services to healthcare facilities of all types and sizes for over 35 years. Today, more than 1,000 healthcare facilities are partnering with MEDHOST and enhancing their patient care and operational excellence with its integrated clinical and financial EHR solutions. MEDHOST also offers a comprehensive Emergency Department Information System with business and reporting tools. Since 2013, MEDHOST's cloud solutions have been utilizing Amazon Web Services (AWS) infrastructure, data sources, and computing power to solve complex healthcare business cases.

MEDHOST can utilize the data available in the cloud to provide value-added solutions for hospitals solving complex problems, like predicting sepsis, cardiac risk, and length of stay (LOS), as well as reducing re-admission rates. This requires a solid data lake foundation and an elastic data pipeline to keep up with multi-terabyte data from thousands of hospitals. MEDHOST invested a significant amount of time evaluating numerous vendors to determine the best solution for its data needs. Ultimately, MEDHOST designed and implemented machine learning/artificial intelligence capabilities by leveraging AWS Data Lab and an end-to-end data lake platform that enables a variety of use cases such as data warehousing for analytics and reporting.

Getting started

MEDHOST’s initial objectives in evaluating vendors were to:

  • Build a low-cost data lake solution to provide cardiac risk prediction for patients based on health records
  • Provide an analytical solution for hospital staff to improve operational efficiency
  • Implement a proof of concept to extend to other machine learning/artificial intelligence solutions

The AWS team proposed AWS Data Lab to architect, develop, and test a solution to meet these objectives. The collaborative relationship between AWS and MEDHOST, AWS's continuous innovation, excellent support, and technical solution architects helped MEDHOST select AWS over other vendors and products. AWS Data Lab's well-structured engagement helped MEDHOST define clear, measurable success criteria that drove the implementation of the cardiac risk prediction and analytical solution platform. The MEDHOST team consisted of architects, builders, and subject matter experts (SMEs). By connecting MEDHOST experts directly to AWS technical experts, the MEDHOST team gained a quick understanding of industry best practices and available services, allowing the MEDHOST team to achieve most of the success criteria at the end of a four-day design session. MEDHOST is now in the process of moving this work from its lower to upper environment to make the solution available for its customers.

Solution

For this solution, MEDHOST and AWS built a layered pipeline consisting of ingestion, processing, storage, analytics, machine learning, and reinforcement components. The following diagram illustrates the Proof of Concept (POC) that was implemented during the four-day AWS Data Lab engagement.

Ingestion layer

The ingestion layer is responsible for moving data from hospital production databases to the landing zone of the pipeline.

The hospital data was stored in an Amazon RDS for PostgreSQL instance and moved to the landing zone of the data lake using AWS Database Migration Service (DMS). DMS made migrating databases to the cloud simple and secure. Using its ongoing replication feature, MEDHOST and AWS implemented change data capture (CDC) quickly and efficiently so the MEDHOST team could spend more time focusing on the most interesting parts of the pipeline.
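
The shape of such an ongoing replication task is sketched below; all ARNs and table selection rules are placeholders rather than MEDHOST's actual configuration.

import json
import boto3

dms = boto3.client("dms")

# Replicate a hospital schema with full load plus ongoing change data capture (CDC).
table_mappings = {
    "rules": [{
        "rule-type": "selection",
        "rule-id": "1",
        "rule-name": "include-hospital-schema",
        "object-locator": {"schema-name": "hospital", "table-name": "%"},
        "rule-action": "include",
    }]
}

task = dms.create_replication_task(
    ReplicationTaskIdentifier="hospital-to-landing-zone",
    SourceEndpointArn="arn:aws:dms:us-east-1:111122223333:endpoint:SOURCE",
    TargetEndpointArn="arn:aws:dms:us-east-1:111122223333:endpoint:TARGET",
    ReplicationInstanceArn="arn:aws:dms:us-east-1:111122223333:rep:INSTANCE",
    MigrationType="full-load-and-cdc",
    TableMappings=json.dumps(table_mappings),
)

dms.start_replication_task(
    ReplicationTaskArn=task["ReplicationTask"]["ReplicationTaskArn"],
    StartReplicationTaskType="start-replication",
)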

Processing layer

The processing layer was responsible for performing extract, transform, and load (ETL) on the data to curate it for subsequent uses.

MEDHOST used AWS Glue within its data pipeline for crawling its data layers and performing ETL tasks. The hospital data copied from RDS to Amazon S3 was cleaned, curated, enriched, denormalized, and stored in Parquet format to act as the heart of the MEDHOST data lake and a single source of truth to serve any further data needs. During the four-day Data Lab, MEDHOST and AWS targeted two needs: powering MEDHOST's data warehouse used for analytics and feeding training data to the machine learning prediction model. Data curation is a critical task that requires an SME, and there were multiple challenges along the way. AWS Glue's serverless nature, along with the SME's support during the Data Lab, made developing the required transformations cost-efficient and uncomplicated. Scaling and cluster management was addressed by the service, which allowed the developers to focus on cleaning data coming from homogenous hospital sources and translating the business logic to code.
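
In outline, such a curation job follows the PySpark pattern below; the catalog database, table, and output path are placeholders, and the real MEDHOST transformations are considerably more involved.

import sys
from awsglue.transforms import DropNullFields
from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# Read the raw landing-zone table that the crawler registered in the Data Catalog.
raw = glue_context.create_dynamic_frame.from_catalog(
    database="hospital_landing_db", table_name="patient_events")

# Stand-in cleaning step: drop null fields, then deduplicate rows with Spark.
deduped = DropNullFields.apply(frame=raw).toDF().dropDuplicates()
curated = DynamicFrame.fromDF(deduped, glue_context, "curated")

# Store the curated zone in Parquet so Athena and Redshift Spectrum can query it cheaply.
glue_context.write_dynamic_frame.from_options(
    frame=curated,
    connection_type="s3",
    connection_options={"path": "s3://example-curated-bucket/patient_events/"},
    format="parquet",
)
job.commit()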

Storage layer

The storage layer provided low-cost, secure, and efficient storage infrastructure.

MEDHOST used Amazon S3 as a core component of its data lake. AWS DMS migration tasks saved data to S3 in CSV format. Crawling the data with AWS Glue made this landing zone data queryable and available for further processing. The initial AWS Glue ETL job stored the Parquet-formatted data in the data lake and its curated zone bucket. MEDHOST also used S3 to store the CSV-formatted data set that will be used to train, test, and validate its machine learning prediction model.

Analytics layer

The analytics layer gave MEDHOST pipeline reporting and dashboarding capabilities.

The data was in Parquet format and partitioned in the curation zone bucket populated by the processing layer. This made querying with Amazon Athena or Amazon Redshift Spectrum fast and cost efficient.

From the Amazon Redshift cluster, MEDHOST created external tables that were used as staging tables for the MEDHOST data warehouse and implemented UPSERT logic to merge new data into its production tables. To showcase the reporting potential unlocked by the MEDHOST analytics layer, a connection was made from the Redshift cluster to Amazon QuickSight. Within minutes, MEDHOST was able to create interactive analytics dashboards with filtering and drill-down capabilities, such as a chart that showed the number of confirmed disease cases per US state.

Machine learning layer

The machine learning layer used MEDHOST’s existing data sets to train its cardiac risk prediction model and make it accessible via an endpoint.

Before getting into the Data Lab, the MEDHOST team was not intimately familiar with machine learning. AWS Data Lab architects helped MEDHOST quickly understand machine learning concepts and select a model appropriate for its use case. MEDHOST selected XGBoost as its model because cardiac risk prediction is a regression problem. MEDHOST's well-architected data lake enabled it to quickly generate training, testing, and validation data sets using AWS Glue.

Amazon SageMaker abstracted the underlying complexity of setting up infrastructure for machine learning. With a few clicks, MEDHOST started a Jupyter notebook and coded the components leading to fitting and deploying its machine learning prediction model. Finally, MEDHOST created the endpoint for the model and ran REST calls to validate the endpoint and trained model. As a result, MEDHOST achieved the goal of predicting cardiac risk. Additionally, with Amazon QuickSight's SageMaker integration, AWS made it easy to use SageMaker models directly in visualizations. QuickSight can call the model's endpoint, send the input data to it, and put the inference results into the existing QuickSight data sets. This capability made it easy to display the results of the models directly in the dashboards. Read more about QuickSight's SageMaker integration here.
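
In outline, training and deploying a built-in XGBoost model with the SageMaker Python SDK follows the sketch below; the role, bucket paths, and hyperparameters are illustrative placeholders, not MEDHOST's actual model configuration.

import sagemaker
from sagemaker import image_uris
from sagemaker.estimator import Estimator
from sagemaker.inputs import TrainingInput

session = sagemaker.Session()
role = "arn:aws:iam::111122223333:role/SageMakerExecutionRole"  # placeholder role

# Built-in XGBoost container image for the current Region.
container = image_uris.retrieve("xgboost", session.boto_region_name, version="1.5-1")

estimator = Estimator(
    image_uri=container,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    output_path="s3://example-bucket/cardiac-risk/model/",
    sagemaker_session=session,
)
estimator.set_hyperparameters(objective="reg:squarederror", num_round=100)

# Train on the CSV data sets produced by the AWS Glue pipeline (placeholder paths).
estimator.fit({
    "train": TrainingInput("s3://example-bucket/cardiac-risk/train/", content_type="text/csv"),
    "validation": TrainingInput("s3://example-bucket/cardiac-risk/validation/", content_type="text/csv"),
})

# Host the trained model behind a real-time endpoint for REST inference calls.
predictor = estimator.deploy(initial_instance_count=1, instance_type="ml.m5.large")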

Reinforcement layer

Finally, the reinforcement layer guaranteed that the results of the MEDHOST model were captured and processed to improve performance of the model.

The MEDHOST team went beyond the original goal and created an inference microservice to interact with the endpoint for prediction, enabled abstracting of the machine learning endpoint with the well-defined domain REST endpoint, and added a standard security layer to the MEDHOST application.

When there is a real-time call from the facility, the inference microservice gets an inference from the SageMaker endpoint. Records containing input and inference data are fed to the data pipeline again. MEDHOST used Amazon Kinesis Data Streams to push records in real time. However, because retraining the machine learning model does not need to happen in real time, Amazon Kinesis Data Firehose enabled MEDHOST to micro-batch records and efficiently save them to the landing zone bucket so that the data could be reprocessed.
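
Conceptually, the write back into the pipeline can be as small as the following sketch; the stream name and record shape are placeholders, and the Kinesis Data Firehose delivery stream that micro-batches records into the landing zone bucket is configured separately.

import json
import boto3

kinesis = boto3.client("kinesis")

def publish_inference(patient_id, features, risk_score):
    """Push the model input and its inference back into the data pipeline."""
    record = {"patient_id": patient_id, "features": features, "cardiac_risk": risk_score}
    kinesis.put_record(
        StreamName="cardiac-inference-stream",  # placeholder stream name
        Data=json.dumps(record).encode("utf-8"),
        PartitionKey=patient_id,
    )

publish_inference("patient-123", {"age": 64, "bp_systolic": 151}, 0.82)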

Conclusion

Collaborating with AWS Data Lab enabled MEDHOST to:

  • Store a single source of truth with a low-cost storage solution (data lake)
  • Build a complete data pipeline for a low-cost data analytics solution
  • Create almost production-ready code for cardiac risk prediction

The MEDHOST team learned many concepts related to data analytics and machine learning within four days. AWS Data Lab truly helped MEDHOST deliver results in an accelerated manner.


About the Authors

Pandian Velayutham is the Director of Engineering at MEDHOST. His team is responsible for delivering cloud solutions, integration and interoperability, and business analytics solutions. MEDHOST utilizes a modern technology stack to provide innovative solutions to its customers. Pandian Velayutham is a technology evangelist and public cloud technology speaker.

 

 

 

 

George Komninos is a Data Lab Solutions Architect at AWS. He helps customers convert their ideas to a production-ready data product. Before AWS, he spent 3 years as a data engineer in the Alexa Information domain. Outside of work, George is a football fan and supports the greatest team in the world, Olympiacos Piraeus.

Simplify data discovery for business users by adding data descriptions in the AWS Glue Data Catalog

Post Syndicated from Karim Hammouda original https://aws.amazon.com/blogs/big-data/simplify-data-discovery-for-business-users-by-adding-data-descriptions-in-the-aws-glue-data-catalog/

In this post, we discuss how to use the AWS Glue Data Catalog to simplify the process of adding data descriptions and allow data analysts to access, search, and discover this cataloged metadata with BI tools.

In this solution, we use the AWS Glue Data Catalog to break down the silos between cross-functional data producer teams, sometimes also known as domain data experts, and business-focused consumer teams that author business intelligence (BI) reports and dashboards.

Data democratization and the need for self-service BI

To be able to extract insights and get value out of organizational-wide data assets, data consumers like data analysts need to understand the meaning of existing data assets. They rely on data platform engineers to perform such data discovery tasks on their behalf.

Although data platform engineers can programmatically extract and obtain some technical and operational metadata, such as database and table names and sizes, column schemas, and keys, this metadata is primarily used for organizing and manipulating data inside the data lake. They still rely on source data domain experts to gain more knowledge about the meaning of the data, its business context, and classification. It becomes more challenging when data domain experts tend to prioritize operational-critical requests and delay the analytical-related ones.

Such a cyclical dependency, as illustrated in the following figure, can delay the organizational strategic vision for implementing a self-service data analytics platform to reduce the time of the data-to-insights process.

Solution overview

The Data Catalog fundamentally holds basic information about the actual data stored in various data sources, including but not limited to Amazon Simple Storage Service (Amazon S3), Amazon Relational Database Service (Amazon RDS), and Amazon Redshift. Information like data location, format, and columns schema can be automatically discovered and stored as tables, where each table specifies a single data store.

Throughout this post, we see how we can use the Data Catalog to make it easy for domain experts to add data descriptions, and for data analysts to access this metadata with BI tools.

First, we use the comment field in Data Catalog schema tables to store data descriptions with more business meaning. Comment fields aren’t like the other schema table fields (such as column name, data type, and partition key), which are typically populated automatically by an AWS Glue crawler.

We also use Amazon AI/ML capabilities to initially identify the description of each data entity. One way to do that is by using the Amazon Comprehend text analysis API. When we provide a sample of values for each data entity type, Amazon Comprehend natural language processing (NLP) models can identify a standard range of data classification, and we can use this as a description for identified data entities.
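
A minimal sketch of this first detection pass is shown below; the sample value is made up, and the full solution batches column samples inside an AWS Glue job, as described later in this post.

import boto3

comprehend = boto3.client("comprehend")

# Ask Comprehend whether a sample value for a column looks like a known PII type.
sample_value = "00:1B:44:11:3A:B7"  # hypothetical sample drawn from the Aid column
response = comprehend.detect_pii_entities(Text=sample_value, LanguageCode="en")

for entity in response["Entities"]:
    # Each entity carries a type (for example, MAC_ADDRESS) and a confidence score
    # that we can reuse as an initial data description for the column.
    print(entity["Type"], round(entity["Score"], 3))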

Next, because we need to identify entities unique to our domain or organization, we can use custom named entity recognition (NER) in Amazon Comprehend to add more metadata that is related to our business domain. One way to train custom NER models is to use Amazon SageMaker Ground Truth; for more information, see Developing NER models with Amazon SageMaker Ground Truth and Amazon Comprehend.

For this post, we use a dataset that has a table schema defined as per TPC-DS, and was generated using a data generator developed as part of AWS Analytics Reference Architecture code samples.

In this example, the Amazon Comprehend API recognizes PII-related fields like Aid as a MAC address, while non-PII-related fields like Estatus aren't recognized. Therefore, the user enters a custom description manually, and we use the custom NER to automatically populate those fields, as shown in the following diagram.

 

After we add data meanings, we need to expose all the metadata captured in the Data Catalog to various data consumers. This can be done in two different ways:

We can also use the latter method to expose the Data Catalog to BI authors who compose data analyses and dashboards using Amazon QuickSight, so we use the second method for this post.

We do this by defining an Athena dataset that queries the information_schema and allows BI authors to use the QuickSight capability of text search filter to search and discover data using its business meaning (see the following diagram).

Solution details

The core part of this solution is done using AWS Glue jobs. We use two AWS Glue jobs, which are responsible for calling Amazon Comprehend APIs and updating the AWS Glue Data Catalog with added data descriptions accordingly.

The first job (Glue_Comprehend_Job) performs the first stage of detection using the Amazon Comprehend Detect PII API, and the second job (Glue_Comprehend_Custom) uses Amazon Comprehend custom entity recognition for entities labeled by domain experts. The following diagram illustrates this workflow.

We describe the details of each stage in the upcoming sections.

You can integrate this workflow into your existing data processing pipeline, which might be orchestrated with AWS services like AWS Step Functions, Amazon Managed Workflows for Apache Airflow (Amazon MWAA), AWS Glue workflows, or any third-party orchestrator.

The workflow can complement AWS Glue crawler functionality and inherit the same logic for scheduling and running crawlers. On the other end, we can query the updated Data Catalog with data descriptions via Athena (see the following diagram).

To show an end-to-end implementation of this solution, we adopted a choreography-based architecture with additional AWS Lambda helper functions, which communicate between AWS services, triggering the AWS Glue crawler and AWS Glue jobs.

Stage-one: Enrich the Data Catalog with a standard built-in Amazon Comprehend entity detector

To get started, choose Launch Stack to launch a CloudFormation stack.

Define a unique S3 bucket name, and on the CloudFormation console, accept the default values for the parameters.

This CloudFormation stack consists of the following:

  • An AWS Identity and Access Management (IAM) role called Lambda-S3-Glue-comprehend.
  • An S3 bucket with a bucket name that can be defined based on preference.
  • A Lambda function called trigger_data_cataloging. This function is automatically triggered when any CSV file is uploaded to the folder row_data inside our S3 bucket. Then it creates an AWS Glue database if one doesn’t exist, and creates and runs an AWS Glue crawler called glue_crawler_comprehend.
  • An AWS Glue job called Glue_Comprehend_Job, which calls Amazon Comprehend APIs and updates the AWS Glue Data Catalog table accordingly.
  • A Lambda function called Glue_comprehend_workflow, which is triggered when the AWS Glue Crawler successfully finishes and calls the AWS Glue job Glue_Comprehend_Job.

To test the solution, create a prefix called row_data under the S3 bucket created from the CF stack, then upload the customer dataset sample to the prefix.

The first Lambda function is triggered to run the subsequent AWS Glue crawler and AWS Glue job to get data descriptions using Amazon Comprehend, and it updates the comment section of the dataset created in the AWS Glue Data Catalog.

Stage-two: Use Amazon Comprehend custom entity recognition

Amazon Comprehend was able to detect some of the entity types within the customer sample dataset. However, for the remaining undetected fields, we can get help from a domain data expert to label a sample dataset using Ground Truth. Then we use the labeled data output to train a custom NER model and rerun the AWS Glue job to update the comment column with a customized data description.

Train an Amazon Comprehend custom entity recognition model

One way to train Amazon Comprehend custom entity recognizers is to get augmented manifest information using Ground Truth to label the data. Ground Truth has a built-in NER task for creating labeling jobs so domain experts can identify entities in text. To learn more about how to create the job, see Named Entity Recognition.

As an example, we tagged three entity labels: customer information ID, current level of education, and customer credit rating. The domain experts get a web interface like the one shown in the following screenshot to label the dataset.

We can use the output of the labeling job to train an Amazon Comprehend custom entity recognition model using the augmented manifest.

The augmented manifest option requires a minimum of 1,000 custom entity recognition samples. Another option is to use a CSV file that contains the annotations of the entity lists for the training dataset. The required format depends on the type of CSV file that we provide. In this post, we use the CSV entity lists option with two sample files:

To create the training job, we can use the Amazon Comprehend console, the AWS Command Line Interface (AWS CLI), or the Amazon Comprehend API. For this post, we use the API to programmatically create a training Lambda function using the AWS SDK for Python, as shown on GitHub.
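
As a rough outline, the API call made by that function looks like the following sketch; the recognizer name, role ARN, S3 locations, and entity type names are placeholders (the actual code is in the GitHub repo).

import boto3

comprehend = boto3.client("comprehend")

response = comprehend.create_entity_recognizer(
    RecognizerName="data-catalog-custom-ner",
    LanguageCode="en",
    DataAccessRoleArn="arn:aws:iam::111122223333:role/ComprehendDataAccess",  # placeholder
    InputDataConfig={
        # Hypothetical entity type names for the three labels mentioned above.
        "EntityTypes": [
            {"Type": "CUSTOMER_ID"},
            {"Type": "EDUCATION_LEVEL"},
            {"Type": "CREDIT_RATING"},
        ],
        "Documents": {"S3Uri": "s3://example-bucket/ner/training-documents.csv"},
        "EntityList": {"S3Uri": "s3://example-bucket/ner/entity-list.csv"},
    },
)

# Poll until training finishes, then note the recognizer ARN for the analysis job.
recognizer_arn = response["EntityRecognizerArn"]
status = comprehend.describe_entity_recognizer(
    EntityRecognizerArn=recognizer_arn)["EntityRecognizerProperties"]["Status"]
print(recognizer_arn, status)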

The training process can take approximately 15 minutes. When the training process is complete, choose the recognizer and make a note of the recognizer ARN, which we use in the next step.

Run custom entity recognition inference

When the training job is complete, create an Amazon Comprehend analysis job using the console or APIs as shown on GitHub.

The process takes approximately 10 minutes, and again we need to make a note of the output job file.

Create an AWS Glue job to update the Data Catalog

Now that we have the Amazon Comprehend inference output, we can use the following AWS CLI command to create an AWS Glue job that updates the Data Catalog Comment fields for this dataset with customized data descriptions.

Download the AWS Glue job script from the GitHub repo, upload to the S3 bucket created from the CF Stack in stage-1, and run the following AWS CLI command:

aws glue create-job \
--name "Glue_Comprehend_Job_custom_entity" \
--role "Lambda-S3-Glue-comprehend" \
--command '{"Name" : "pythonshell", "ScriptLocation" : "s3://<Your S3 bucket>/glue_comprehend_workflow_custom.py","PythonVersion":"3"}' \
--default-arguments '{"--extra-py-files": "s3://aws-bigdata-blog/artifacts/simplify-data-discovery-for-business-users/blog/python/library/boto3-1.17.70-py2.py3-none-any.whl" }'

After you create the AWS Glue job, edit the job script and update the bucket and key name variables with the output data location of the Amazon Comprehend analysis jobs and run the AWS Glue job. See the following code:

bucket ="<Bucket Name>"
key = "comprehend_output/<Random number>output/output.tar.gz"

When the job is complete, it updates the Data Catalog with customized data descriptions.
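
At its core, writing a description back into a Comment field comes down to a get_table/update_table round trip, roughly as sketched below; the database, table, and descriptions are placeholders, and the full job script is in the GitHub repo.

import boto3

glue = boto3.client("glue")

def set_column_comments(database, table_name, descriptions):
    """Copy detected descriptions into the Comment field of each matching column."""
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]

    for column in table["StorageDescriptor"]["Columns"]:
        if column["Name"] in descriptions:
            column["Comment"] = descriptions[column["Name"]]

    # update_table accepts only TableInput fields, so strip the read-only metadata.
    read_only = ("DatabaseName", "CreateTime", "UpdateTime", "CreatedBy",
                 "IsRegisteredWithLakeFormation", "CatalogId", "VersionId")
    table_input = {k: v for k, v in table.items() if k not in read_only}

    glue.update_table(DatabaseName=database, TableInput=table_input)

# Hypothetical output of the Amazon Comprehend analysis, keyed by column name.
set_column_comments("comprehend_demo_db", "row_data_row_data",
                    {"aid": "MAC address", "crating": "customer credit rating"})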

Expose Data Catalog data to data consumers for search and discovery

Data consumers that prefer using SQL can use Athena to run queries against the information_schema.columns table, which includes the comment field of the Data Catalog. See the following code:

SELECT table_catalog,
         table_schema,
         table_name,
         column_name,
         data_type,
         comment
FROM information_schema.columns
WHERE comment LIKE '%customer%'
AND table_name = 'row_data_row_data'

The following screenshot shows our query results.

The query searches all schema columns that might have any data meanings that contain customer; it returns crating, which contains customer in the comment field.

BI authors can use text search instead of SQL to search for data meanings of data stored in an S3 data lake. This can be done by setting up a visual layer on top of Athena inside QuickSight.

QuickSight is a scalable, serverless, embeddable, machine learning (ML)-powered BI tool that is deeply integrated with other AWS services.

BI development in QuickSight is organized as a stack of datasets, analyses, and dashboards. We start by defining a dataset from a list of various integrated data sources. On top of this dataset, we can design multiple analyses to uncover hidden insights and trends in the dataset. Finally, we can publish these analyses as dashboards, which is the consumable form that can be shared and viewed across different business lines and stakeholders.

We want to help the BI authors while designing analyses to get a better knowledge of the datasets they’re working on. To do so, we first need to connect to the data source where the metadata is stored, in this case the Athena table information_schema.columns, so we create a dataset to act as a Data Catalog view inside QuickSight.

QuickSight offers different modes of querying data sources, which is decided as part of the dataset creation process. The first mode is called direct query, in which the fetching query runs directly against the external data source. The second mode is a caching layer called QuickSight Super-fast Parallel In-memory Calculation Engine (SPICE), which improves performance when data is shared and retrieved by various BI authors. In this mode, the data is stored locally and can be reused multiple times, instead of running queries against the data source every time the data needs to be retrieved. However, as with all caching solutions, you must take data volume limits into consideration while choosing datasets to be stored in SPICE.

In our case, we choose to keep the Data Catalog dataset in SPICE, because the volume of the dataset is relatively small and won’t consume a lot of SPICE resources. However, we need to decide if we want to refresh the data cached in SPICE. The answer depends on how frequently the data schema and Data Catalog change, but in any case we can use the built-in scheduling within QuickSight to refresh SPICE at the desired interval. For information about triggering a refresh in an event-based manner, see Event-driven refresh of SPICE datasets in Amazon QuickSight.
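
For reference, triggering a SPICE refresh programmatically (for example, from an event) is a single API call, roughly as follows; the account ID and dataset ID are placeholders.

import uuid
import boto3

quicksight = boto3.client("quicksight")

# Kick off a SPICE refresh (ingestion) for the Data Catalog dataset.
quicksight.create_ingestion(
    AwsAccountId="111122223333",          # placeholder account ID
    DataSetId="data-catalog-dataset-id",  # placeholder dataset ID
    IngestionId=str(uuid.uuid4()),        # any unique identifier for this refresh
)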

After we create the Data Catalog view as a dataset inside QuickSight stored in SPICE, we can use row-level security to restrict access to this dataset. Each BI author can view metadata only for the columns their privileges allow.

Next, we see how we can allow BI authors to search through data descriptions populated in the comment field of the Data Catalog dataset. QuickSight offers features like filters, parameters, and controls to add more flexibility into QuickSight analyses and dashboards.

Finally, we use the QuickSight capability to add more than one dataset within an analysis view to allow BI authors to switch between the metadata for the dataset and the actual dataset. This allows the BI authors to self-serve, reducing dependency on data platform engineers to decide which columns they should use in their analyses.

To set up a simple Data Catalog search and discovery inside QuickSight, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. For New data sources, choose Amazon Athena.
  4. Name the dataset Data Catalog.
  5. Choose Create data source.
  6. For Choose your table, choose Use custom SQL.
  7. For Enter custom SQL query, name the query Data Catalog Query.
  8. Enter the following query:
SELECT * FROM information_schema.columns
  1. Choose Confirm query.
  2. Select Import to QuickSight SPICE for quicker analytics.
  3. Choose Visualize.

Next, we design an analysis on the dataset we just created to access the Data Catalog through Athena.

When we choose Visualize, we’re redirected to the QuickSight workspace to start designing our analysis.

  1. Under Visual types, choose Table.
  2. Under Fields list, add table_name, column_name, and comment to the Values field well.

Next, we use the filter control feature to allow users to perform text search for data descriptions.

  1. In the navigation pane, choose Filter.
  2. Choose the plus sign (+) to access the Create a new filter list.
  3. On the list of columns, choose comment to be the filter column.
  4. From the options menu (…) on the filter, choose Add to sheet.

We should be able to see a new control being added into our analysis to allow users to search the comment field.

Now we can start a text search for data descriptions that contain customer, where QuickSight shows the list of fields matching the search criteria and provides table and column names accordingly.

Alternatively, we can use parameters to be associated with the filter control if needed, for example to connect one dashboard to another. For more information, see the GitHub repo.

Finally, BI authors can switch between the metadata view that we just created and the actual Athena table view (row_all_row_data), assuming it’s already imported (if not, we can use the same steps from earlier to import the new dataset).

  1. In the navigation pane, choose Visualize.
  2. Choose the pen icon to add, edit, replace, or remove datasets.
  3. Choose Add dataset.
  4. Add row_all_row_data.
  5. Choose Select.

BI authors can now switch between data and metadata datasets.

They now have a metadata view along with the actual data view, so they can better understand the meaning of each column in the dataset they’re working on, and they can read any comment that can be passed from other teams within the organization without needing to do this manually.

Conclusion

In this post, we showed how to build a quick workflow using AWS Glue and Amazon AI/ML services to complement the AWS Glue crawler functionality. You can integrate this workflow into a typical AWS Glue data cataloging and processing pipeline to achieve alignment between cross-functional teams by simplifying and automating the process of adding data descriptions in the Data Catalog. This is an important step in data discovery, and the topic will be covered more in upcoming posts.

This solution is also a step towards implementing data privacy and protection regimes such as the Health Insurance Portability and Accountability Act (HIPAA) and General Data Protection Regulation (GDPR) by identifying sensitive data types like PII and enforcing access policies.

You can find the source code from this post on GitHub and use it to build your own solution. For more information about NER models, see Developing NER models with Amazon SageMaker Ground Truth and Amazon Comprehend.


About the Authors

Karim Hammouda is a Specialist Solutions Architect for Analytics at AWS with a passion for data integration, data analysis, and BI. He works with AWS customers to design and build analytics solutions that contribute to their business growth. In his free time, he likes to watch TV documentaries and play video games with his son.

 

 

Ahmed Raafat is a Senior Solutions Architect at Amazon Web Services, with a passion for machine learning solutions. Ahmed acts as a trusted advisor for many AWS enterprise customers to support and accelerate their cloud journey.