Tag Archives: Amazon Simple Storage Service (S3)

Micro-frontend Architectures on AWS

Post Syndicated from Bryant Bost original https://aws.amazon.com/blogs/architecture/micro-frontend-architectures-on-aws/

A microservice architecture is characterized by independent services that are focused on a specific business function and maintained by small, self-contained teams. Microservice architectures are used frequently for web applications developed on AWS, and for good reason. They offer many well-known benefits such as development agility, technological freedom, targeted deployments, and more. Despite the popularity of microservices, many frontend applications are still built in a monolithic style. For example, they have one large code base that interacts with all backend microservices, and is maintained by a large team of developers.

Monolith Frontend

Figure 1. Microservice backend with monolith frontend

What is a Micro-frontend?

The micro-frontend architecture introduces microservice development principles to frontend applications. In a micro-frontend architecture, development teams independently build and deploy “child” frontend applications. These applications are combined by a “parent” frontend application that acts as a container to retrieve, display, and integrate the various child applications. In this parent/child model, the user interacts with what appears to be a single application. In reality, they are interacting with several independent applications, published by different teams.

Micro Frontend

Figure 2. Microservice backend with micro-frontend

Micro-frontend Benefits

Compared to a monolith frontend, a micro-frontend offers the following benefits:

  • Independent artifacts: A core tenet of microservice development is that artifacts can be deployed independently, and this remains true for micro-frontends. In a micro-frontend architecture, teams should be able to independently deploy their frontend applications with minimal impact to other services. Those changes will be reflected by the parent application.
  • Autonomous teams: Each team is the expert in its own domain. For example, the billing service team members have specialized knowledge. This includes the data models, the business requirements, the API calls, and user interactions associated with the billing service. This knowledge allows the team to develop the billing frontend faster than a larger, less specialized team.
  • Flexible technology choices: Autonomy allows each team to make technology choices that are independent from other teams. For instance, the billing service team could develop their micro-frontend using Vue.js and the profile service team could develop their frontend using Angular.
  • Scalable development: Micro-frontend development teams are smaller and are able to operate without disrupting other teams. This allows us to quickly scale development by spinning up new teams to deliver additional frontend functionality via child applications.
  • Easier maintenance: Keeping frontend repositories small and specialized allows them to be more easily understood, and this simplifies long-term maintenance and testing. For instance, if you want to change an interaction on a monolith frontend, you must isolate the location and dependencies of the feature within the context of a large codebase. This type of operation is greatly simplified when dealing with the smaller codebases associated with micro-frontends.

Micro-frontend Challenges

Conversely, a micro-frontend presents the following challenges:

  • Parent/child integration: A micro-frontend introduces the task of ensuring the parent application displays the child application with the same consistency and performance expected from a monolith application. This point is discussed further in the next section.
  • Operational overhead: Instead of managing a single frontend application, a micro-frontend application involves creating and managing separate infrastructure for all teams.
  • Consistent user experience: In order to maintain a consistent user experience, the child applications must use the same UI components, CSS libraries, interactions, error handling, and more. Maintaining consistency in the user experience can be difficult for child applications that are at different stages in the development lifecycle.

Building Micro-frontends

The most difficult challenge with the micro-frontend architecture pattern is integrating child applications with the parent application. Prioritizing the user experience is critical for any frontend application. In the context of micro-frontends, this means ensuring a user can seamlessly navigate from one child application to another inside the parent application. We want to avoid disruptive behavior such as page refreshes or multiple logins. At its most basic definition, parent/child integration involves the parent application dynamically retrieving and rendering child applications when the parent app is loaded. Rendering the child application depends on how the child application was built, and this can be done in a number of ways. Two of the most popular methods of parent/child integration are:

  1. Building each child application as a web component.
  2. Importing each child application as an independent module. Each module either declares a function to render itself or is dynamically imported by the parent application (such as with module federation).

Registering child apps as web components:

<html>
    <head>
        <script src="https://shipping.example.com/shipping-service.js"></script>
        <script src="https://profile.example.com/profile-service.js"></script>
        <script src="https://billing.example.com/billing-service.js"></script>
        <title>Parent Application</title>
    </head>
    <body>
        <shipping-service></shipping-service>
        <profile-service></profile-service>
        <billing-service></billing-service>
    </body>
</html>

Registering child apps as modules:

<html>
    <head>
        <script src="https://shipping.example.com/shipping-service.js"></script>
        <script src="https://profile.example.com/profile-service.js"></script>
        <script src="https://billing.example.com/billing-service.js"></script>
        <title>Parent Application</title>
    </head>
    <body>
    </body>
    <script>
        // Load and render the child applications from their JS bundles.
    </script>
</html>

The following diagram shows an example micro-frontend architecture built on AWS.

AWS Micro Frontend

Figure 3. Micro-frontend architecture on AWS

In this example, each service team is running a separate, identical stack to build their application. They use the AWS Developer Tools and deploy the application to Amazon Simple Storage Service (S3) with Amazon CloudFront. The CI/CD pipelines use shared components such as CSS libraries, API wrappers, or custom modules stored in AWS CodeArtifact. This helps drive consistency across parent and child applications.

When you retrieve the parent application, it should prompt you to log in to an identity provider and retrieve JWTs. In this example, the identity provider is an Amazon Cognito User Pool. After a successful login, the parent application retrieves the child applications from CloudFront and renders them inside the parent application. Alternatively, the parent application can elect to render the child applications on demand, when you navigate to a specific route. The child applications should not require you to log in again to the Amazon Cognito user pool. They should be configured to use the JWT obtained by the parent app or silently retrieve a new JWT from Amazon Cognito.

Conclusion

Micro-frontend architectures introduce many of the familiar benefits of microservice development to frontend applications. A micro-frontend architecture also simplifies the process of building complex frontend applications by allowing you to manage small, independent components.

Integrating Datadog data with AWS using Amazon AppFlow for intelligent monitoring

Post Syndicated from Gopalakrishnan Ramaswamy original https://aws.amazon.com/blogs/big-data/integrating-datadog-data-with-aws-using-amazon-appflow-for-intelligent-monitoring/

Infrastructure and operation teams are often challenged with getting a full view into their IT environments to do monitoring and troubleshooting. New monitoring technologies are needed to provide an integrated view of all components of an IT infrastructure and application system.

Datadog provides intelligent application and service monitoring by bringing together data from servers, databases, containers, and third-party services in the form of a software as a service (SaaS) offering. It provides operations and development professionals the ability to measure application and infrastructure performance, visualize metrics with the help of a unified dashboard and create alerts and notifications.

Amazon AppFlow is a fully managed service that provides integration capabilities by enabling you to transfer data between SaaS applications like Datadog, Salesforce, Marketo, and Slack and AWS services like Amazon Simple Storage Service (Amazon S3) and Amazon Redshift. It provides capabilities to transform, filter, and validate data to generate enriched and usable data in a few easy steps.

In this post, I walk you through the process of extracting log data from Datadog using Amazon AppFlow, storing it in Amazon S3, and querying it with Amazon Athena.

Solution overview

The following diagram shows the flow of our solution.

The Datadog Agent is lightweight software that can be installed on many different platforms, either directly or as a containerized version. It collects events and metrics from hosts and sends them to Datadog. Amazon AppFlow extracts the log data from Datadog and stores it in Amazon S3, which is then queried using Athena.

To implement the solution, you complete the following steps:

  1. Install and configure the Datadog Agent.
  2. Create a new Datadog application key.
  3. Create an Amazon AppFlow connection for Datadog.
  4. Create a flow in Amazon AppFlow.
  5. Run the flow and query the data.

Prerequisites

The walkthrough requires the following:

  • An AWS account
  • A Datadog account

Installing and configuring the Datadog Agent

The Datadog Agent is lightweight software installed on your hosts. With additional setup, the Agent can report live processes, logs, and traces. The Agent needs an API key, which is used to associate the Agent’s data with your organization. Complete the following steps to install and configure the Datadog Agent:

  1. Create a Datadog account if you haven’t already.
  2. Log in to your account.
  3. Under Integrations, choose APIs.
  4. Copy the API key.
  5. Download the Datadog Agent software for the selected platform.
  6. Install the Agent on the hosts using the API key you copied.

Collecting logs is disabled by default in Datadog Agent. To enable Agent log collection and configure a custom log collection, perform the following steps on your host:

  1. Update the Datadog Agent’s main configuration file (datadog.yaml) with the following code:
    logs_enabled: true

In Windows, this file is in C:\ProgramData\Datadog.

  2. Create a custom log collection by customizing the conf.yaml file.

For example, in Windows this file would be in the path C:\ProgramData\Datadog\conf.d\win32_event_log.d. The following code is a sample entry in the conf.yaml file that enables collection of Windows security events:

logs:
  - type: windows_event
    channel_path: Security
    source: Security
    service: windowsOS
    sourcecategory: windowsevent

Getting the Datadog application key

Application keys, in conjunction with your organization's API key, give you full access to Datadog's programmatic API. Application keys are associated with the user account that created them. The application key is used to log all requests made to the API. Get your application key with the following steps:

  1. Log in to your Datadog account.
  2. Under Integrations, choose APIs.
  3. Expand Application Keys.
  4. For Application key name, enter a name.
  5. Choose Create Application key.
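
These two keys are what Amazon AppFlow uses to connect to Datadog in the next step. If you want to verify the keys before configuring the connection, you can call Datadog's API directly. The following Python sketch is illustrative only: it assumes the api.datadoghq.com endpoint (adjust it for your Datadog site) and the standard DD-API-KEY and DD-APPLICATION-KEY request headers, and the validate endpoint checks only the API key.

import requests

DD_SITE = "https://api.datadoghq.com"  # adjust if your organization uses a different Datadog site

headers = {
    "DD-API-KEY": "<your_api_key>",          # organization API key copied earlier
    "DD-APPLICATION-KEY": "<your_app_key>",  # application key created above
}

# Validate the API key; a 200 response confirms the key is accepted.
response = requests.get(f"{DD_SITE}/api/v1/validate", headers=headers)
print(response.status_code, response.json())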

Creating an Amazon AppFlow connection for Datadog

A connection defines the source or destination to use in a flow. To create a new connection for Datadog, complete the following steps:

  1. On the Amazon AppFlow console, in the navigation pane, choose Connections. 
  2. For Connectors, choose Datadog.
  3. Choose Create Connection.
  4. For API key and Application Key, enter the keys procured from the previous steps.
  5. For Connection Name, enter a name; for example, myappflowconnection.
  6. Choose Connect.

Creating a flow in Amazon AppFlow

After you create the data connection, you can create a flow that uses the connection and defines the destination, data mapping, transformation, and filters.

Creating an S3 bucket

Create an S3 bucket as your Amazon AppFlow transfer destination.

  1. On the Amazon S3 console, choose Create bucket.
  2. Enter a name for your bucket; for example, mydatadoglogbucket.
  3. Ensure that Block all public access is selected.
  4. Enable bucket versioning and encryption (optional).
  5. Choose Create bucket.
  6. Enable Amazon S3 server access logging (optional).
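
If you prefer to script the bucket creation, the following Boto3 sketch mirrors the console steps above. The bucket name and Region are placeholders; versioning, encryption, and server access logging remain optional.

import boto3

s3 = boto3.client("s3", region_name="us-east-1")  # adjust the Region
bucket = "mydatadoglogbucket"                     # bucket names must be globally unique

# Create the bucket (outside us-east-1, also pass a CreateBucketConfiguration with LocationConstraint).
s3.create_bucket(Bucket=bucket)

# Block all public access, matching the console setting above.
s3.put_public_access_block(
    Bucket=bucket,
    PublicAccessBlockConfiguration={
        "BlockPublicAcls": True,
        "IgnorePublicAcls": True,
        "BlockPublicPolicy": True,
        "RestrictPublicBuckets": True,
    },
)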

Configuring the flow source

After you create the Datadog connection and the S3 bucket, complete the following steps to create a flow:

  1. On the Amazon AppFlow console, in the navigation pane, choose Flows.
  2. Choose Create flow.
  3. For Flow name, enter a name for your flow; for example mydatadogflow.
  4. For Source name, choose Datadog.
  5. For Choose Datadog connection, choose the connection created earlier.
  6. For Choose Datadog object, choose Logs.

Choosing a destination

In the Destination details section, provide the following information:

  1. For Destination name, choose Amazon S3.
  2. For Bucket details, choose the name of the S3 bucket created earlier.

This step creates a folder with the flow name you specified within the bucket to store the logs.

Additional settings

You can provide additional settings for the data format (JSON, CSV, Parquet), data transfer preference, filename preference, flow trigger, and transfer mode. Leave all the settings at their defaults:

  • For Data format preference, choose JSON format.
  • For Data transfer preference, choose No aggregation.
  • For Filename preference, choose No timestamp.
  • For Folder structure preference, choose No timestamped folder.

Adding a flow trigger

Flows can be run on a schedule, based on an event or on demand. For this post, we choose Run on demand.

Mapping data fields

You can map manually or using a CSV file. This determines how data is transferred from source to destination. You can apply transformations like concatenation, masking, and truncation to the mappings.

  1. In the Map data fields section, for Mapping method, choose Manually map fields.
  2. For Source field name, choose Map all fields directly.
  3. Choose Next.

Validation

You can add validation to perform certain actions based on conditions on field values.

  1. In the Validations section, for Field name choose Content.
  2. For Condition, choose Values are missing or null.
  3. For Action, choose Ignore record.

Filters

Filters specify which records to transfer. You can add multiple filters with criteria. For the Datadog data source, it's mandatory to specify filters for Date_Range and Query. The formats for specifying the filter query for metrics and logs are different.

  1. In the Add filters section, for Field name, choose Date_Range.
  2. For Condition, choose is between.
  3. For Criterion 1 and Criterion 2, enter start and end dates for log collection.
  4. Choose Add filter.
  5. For your second filter, for Field name, choose Query.
  6. For Condition, enter host:<yourhostname> AND service:(windowsOS OR LinuxOS).
  7. Choose Save.

The service names specified in the filter should have Datadog logs enabled (refer to the earlier step when you installed and configured the Datadog Agent).

The following are some examples of the filter Query for metrics:

  • load.1{*} by {host}
  • avg:system.cpu.idle{*}
  • avg:system.cpu.system{*}
  • avg:system.cpu.user{*}
  • avg:system.cpu.guest{*}
  • avg:system.cpu.user{host:yourhostname}

The following are some examples of the filter Query for logs:

  • service:servicename
  • host:myhostname
  • host:hostname1 AND service:(servicename1 OR servicename2) 

Running the Flow and querying the data

If a flow is based on a trigger, you can activate or deactivate it. If it’s on demand, it must be run each time data needs to be transferred. When you run the flow, the logs or metrics are pulled into files residing in Amazon S3. The data is in the form of a nested JSON in this example. Use AWS Glue and Athena to create a schema and query the log data.
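
Because we chose the on-demand trigger, the flow can also be started programmatically. The following Boto3 sketch assumes the flow and bucket names used earlier and omits error handling.

import boto3

appflow = boto3.client("appflow")
s3 = boto3.client("s3")

# Start an on-demand run of the flow created earlier.
run = appflow.start_flow(flowName="mydatadogflow")
print("Execution started:", run["executionId"])

# After the run completes, the transferred JSON files appear under the flow's folder.
listing = s3.list_objects_v2(Bucket="mydatadoglogbucket", Prefix="mydatadogflow/")
for obj in listing.get("Contents", []):
    print(obj["Key"], obj["Size"])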

Querying data with Athena

When the Datadog data is in AWS, there are a host of possibilities to store, process, integrate with other data sources, and perform advanced analytics. One such method is to use Athena to query the data directly from Amazon S3.

  1. On the AWS Glue console, in the navigation pane, choose Databases.
  2. Choose Add database.
  3. For Database name, enter a name such as mydatadoglogdb.
  4. Choose Create.
  5. In the navigation pane, choose Crawlers.
  6. Choose Add Crawler.
  7. For Crawler name, enter a name, such as mylogcrawler.
  8. Choose Next.
  9. For Crawler source type, select Data stores.
  10. Choose Next.
  11. In the Add a data store section, choose S3 for the data store.
  12. Enter the path to the S3 folder that has the log files; for example s3://mydatadoglogbucket/logfolder/.
  13. In the Choose an IAM role section, select Create an IAM role and provide a name.
  14. For Frequency select Run on demand.
  15. In the Configure the crawler’s output section, for Database, select the database created previously.
  16. Choose Next.
  17. Review and choose Finish.
  18. When the crawler’s status changes to Active, select it and choose Run Crawler.

When the crawler finishes running, it creates the tables and populates them with data based on the schema it infers from the JSON log files.
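
If you prefer to script this setup instead of using the console, the following Boto3 sketch creates and starts an equivalent crawler. The database, crawler name, and S3 path reuse the names from the steps above; the IAM role name is a placeholder for a role with access to the log bucket.

import boto3

glue = boto3.client("glue")

# Create the database that holds the crawled tables.
glue.create_database(DatabaseInput={"Name": "mydatadoglogdb"})

# Create a crawler that infers the schema of the JSON log files.
glue.create_crawler(
    Name="mylogcrawler",
    Role="AWSGlueServiceRole-datadoglogs",  # placeholder IAM role
    DatabaseName="mydatadoglogdb",
    Targets={"S3Targets": [{"Path": "s3://mydatadoglogbucket/logfolder/"}]},
)

# Run on demand; poll get_crawler until it returns to the READY state.
glue.start_crawler(Name="mylogcrawler")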

  1. On the Athena console, choose Settings.
  2. Select an S3 bucket and folder where Athena results are stored.
  3. In the Athena query window, enter the following query:
    select * 
    from mydatadoglogdb.samplelogfile
    where content.attributes.level = 'Information'
    

  4. Choose Run Query.

This sample query gets all the log entries where the level is Information. We're traversing a nested JSON object in the Athena query, simply with dot notation.
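
You can run the same query outside the console through the Athena API. The following Boto3 sketch assumes the database and table created by the crawler and an S3 output location that you own.

import time
import boto3

athena = boto3.client("athena")

query = """
select *
from mydatadoglogdb.samplelogfile
where content.attributes.level = 'Information'
"""

execution = athena.start_query_execution(
    QueryString=query,
    ResultConfiguration={"OutputLocation": "s3://mydatadoglogbucket/athena-results/"},  # placeholder output location
)
query_id = execution["QueryExecutionId"]

# Wait for the query to finish, then print the first page of results.
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    for row in athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])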

Summary

In this post, I demonstrated how we can bring Datadog data into AWS. Doing so opens a host of opportunities to use the tools available in AWS to drive advanced analytics and monitoring while integrating with data from other sources.

With Amazon AppFlow, you can integrate applications in a few minutes, transfer data at massive scale, and enrich the data as it flows, using mapping, merging, masking, filtering, and validation. For more information about integrating SaaS applications and AWS, see Amazon AppFlow.


About the Author

Gopalakrishnan Ramaswamy is a Solutions Architect at AWS based out of India with extensive background in database, analytics, and machine learning. He helps customers of all sizes solve complex challenges by providing solutions using AWS products and services. Outside of work, he likes the outdoors, physical activities and spending time with friends and family.

Querying a Vertica data source in Amazon Athena using the Athena Federated Query SDK

Post Syndicated from Kelly Ragan original https://aws.amazon.com/blogs/big-data/querying-a-vertica-data-source-in-amazon-athena-using-the-athena-federated-query-sdk/

The ability to query data and perform ad hoc analysis across multiple platforms and data stores with a single tool brings immense value to the big data analytical arena. As organizations build out data lakes with increasing volumes of data, there is a growing need to combine that data with large amounts of data in other data stores. As the variety of data increases, it becomes paramount to have a query tool to bridge two or more data stores with a single query.

Even though data lakes became popular for analytic workloads recently, it’s not uncommon to have data warehouses in addition to data lakes for various reporting and business intelligence (BI) use cases. It becomes imperative to be able to seamlessly query the data stored in the data warehouse and the data lake. To address this issue, Amazon Athena has released a feature called Athena Federated Query. Athena is an interactive query service provided by AWS that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Vertica is a columnar MPP database platform that can be deployed in the cloud or on premises, and supports exabyte scale data warehouses. With Athena Federated Query and the Vertica connector, you can now run analytical queries over a data warehouse on Vertica and a data lake in Amazon S3.

Athena Federated Query includes pre-built connectors to a variety of AWS services and databases, as well as an SDK to build custom connectors to other databases and data stores. With this feature, federated queries can pull data from a data lake in an S3 bucket and from an external data source, and then combine it into a single result set in Athena. These connectors are an extension of the Athena query engine, which translates content between Athena and the external data source. Pre-built connectors exist for Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), and Amazon Relational Database Service (Amazon RDS), as well as a JDBC connector for Amazon Redshift, MySQL, and PostgreSQL. For other types of relational databases, you can use the Athena Federated Query SDK to create a custom connector.

In this post, we demonstrate how to deploy the custom connector between Athena and a Vertica database built using the Athena Federated Query SDK. After deploying the custom connector, we demonstrate issuing federated queries and moving data from Vertica to a data lake using CREATE TABLE AS (CTAS) with a federated query.

AWS services used in the solution

The Athena Federated Query SDK is an open-source framework to build custom connectors, and comes with a connector publish tool that deploys the connector executables in an application to the AWS Serverless Application Repository. The Athena Federated Query uses an AWS Lambda function that in turn uses the application deployed to the AWS Serverless Application Repository.

A custom connector is composed of a Lambda function that utilizes three components:

  • MetadataHandler – An interface that exposes metadata information of schemas, tables, and columns from the underlying data store to Athena
  • RecordHandler – An interface that provides hooks to read data from the external source and share it with the Athena query engine in Apache Arrow columnar format
  • CompositeHandler – Manages running the MetadataHandler and the RecordHandler together in a single Lambda function

The Lambda function connects to the external data store using an appropriate connection protocol and sends the parsed SQL statement. In the case of Vertica, it is JDBC. The RecordHandler processes the result set produced by the external data store and passes the rows to Athena for final processing. Multiple Lambda functions are called by Athena depending on the Lambda concurrency settings to read the result set in parallel. A spill bucket is used to handle a large dataset that exceeds the Lambda server’s capacity to process the result set.

The JDBC connection established by the Lambda function to the external database is used to send the parsed SQL statement and retrieve the result set rows from the external database. This scenario works well in terms of bandwidth for smaller databases and result sets. However, you might have Vertica deployments with petabyte or exabyte data warehouses. Typical queries return result sets on the order of 10, 20, 30 gigabytes, or more. Due to the bandwidth issue with a JDBC connection, the solution presented in this post modifies the Athena Federated Query SDK to implement a different route for the transmission of large result sets from Vertica to the Athena server for final processing.

The alternate solution utilizes the Vertica EXPORT command as a wrapper around the parsed SQL statement. You can use the EXPORT command to write a result set for a SQL statement directly to an S3 bucket using Vertica’s highly parallelized write to Amazon S3 using partitioning. This solution modifies the SDK to allow Athena to read the result set in the S3 bucket, determine the number of partitions, and call subsequent Lambda functions to parallelize the read of the result set. This produces an efficient way to move a multi-gigabyte result set from Vertica to Athena with parallelized writes from Vertica to Amazon S3 and parallelized reads from Amazon S3 to Athena. When connecting to the Vertica database, the SDK uses AWS Secrets Manager to retrieve a user ID and password for a service account on the Vertica database.
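
The following Boto3 sketch shows the kind of Secrets Manager lookup the connector performs before opening the JDBC connection. The secret name Vertica-cluster1 and the JSON shape of the secret value are assumptions for illustration.

import json
import boto3

secrets = boto3.client("secretsmanager")

# Secrets used by the connector are prefixed with "Vertica-".
secret = secrets.get_secret_value(SecretId="Vertica-cluster1")  # placeholder secret name
credentials = json.loads(secret["SecretString"])

# The connector substitutes these values into the JDBC connection string.
print(credentials["username"], "********")  # assumed key names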

Solution architecture

The following diagram shows the solution architecture for the Vertica custom connector when deployed to AWS.

The connector components are as follows:

  1. A user issues a federated SQL query in Athena against a table in Vertica.
  2. Athena parses the query and calls a Lambda function.
  3. The Lambda function makes a call to Secrets Manager to get the user ID and password for connecting to Vertica.
  4. The connector sends an EXPORT statement wrapper with the embedded SQL statement to Vertica through the JDBC connection. For example, see the following code:
    EXPORT TO PARQUET (directory = 's3://<bucket_name>/<folder_name>', 
    Compression='Snappy', fileSizeMB=64) OVER() as   
    SELECT  
    ORDER_ID,  
    ITEM,  
    CUSTOMER_ID,
    ORDERED_DATE
    FROM SCHEMA1.ORDERS  
    WHERE CUSTOMER_ID = 2;
    

  5. Vertica processes the SQL query and writes the result set to the S3 bucket specified in the EXPORT command. Vertica parallelizes the write to S3 bucket based on the fileSizeMB parameter into as many partitions as needed for the result set.
  6. Athena calls a Lambda function to scan the S3 bucket in order to determine the number of files to read for the result set.
  7. Athena invokes multiple Lambda functions depending on the number of partitions using Amazon S3 Select. This allows Athena to parallelize the read of the S3 files.
  8. Athena combines the result set returned from Vertica with data scanned from the data lake, and returns the combined result set to the user.

Prerequisites

Before you get started, make sure you have the following prerequisites:

  • Amazon EC2 IAM role permissions – The AWS Identity and Access Management (IAM) role of the Amazon Elastic Compute Cloud (Amazon EC2) machines hosting the Vertica database must be given write permissions to the VerticaExport S3 bucket, which is created when deploying the connector.
  • Secrets Manager – The Vertica connection credentials are stored in Secrets Manager. The secret name is prefixed with Vertica– and the secret value is the connection credentials.
  • Lambda IAM role permissions – When the Lambda function is deployed to the AWS Serverless Application Repository, it creates a custom IAM role for the function to run. The custom role has the following IAM permissions in order to successfully perform the read and write functions associated with the MetadataHandler and RecordHandler:
    • AWSLambdaBasicExecutionRole
    • AWSLambdaVPCAccessExecutionRole
    • For Secrets Manager, GetSecretValue for secrets with a prefix given in SecretNameOrPrefix
    • For Amazon S3, list, read, and write permissions for SpillBucket and ExportBucket, and list permissions for all S3 buckets
    • For Athena, GetQueryExecution

Demonstration tables

To demonstrate the Athena Vertica connector capabilities, we use the following components:

  • A Vertica database running in our AWS environment.
  • A Vertica table called orders containing details of customer orders.
  • An Athena table called customer, which has an S3 bucket as a data source. This table contains information regarding customers.

The following screenshot shows the details of the customer table in Amazon S3.

The following screenshot shows the details of the orders table in Vertica.

Setting up the Athena Vertica connector project

To set up your connector project, complete the following steps:

  1. Create an S3 bucket in your AWS account. This is the bucket where the result set from Vertica is exported.
  2. Create another S3 bucket in your AWS account. This is the bucket where the code for the connector is stored and retrieved.
  3. Grant the IAM role of the EC2 machines hosting the Vertica database read and write permissions to the S3 result set bucket, allowing Vertica to export data to the bucket.
  4. Clone the GitHub repo in your local folder.
  5. Open the project in your preferred IDE.
  6. From the athena-query-federation directory, run mvn clean install.
  7. From the athena-vertica directory, run mvn clean install.
  8. From the athena-vertica directory, run ../tools/publish.sh <s3_code_bucket_name> athena-vertica [region] to publish the connector to your private AWS Serverless Application Repository.
  9. Upon successful completion of the script, the connector’s serverless application is published to the AWS Serverless Application Repository.

Deploying the connector

To deploy your connector, complete the following steps:

  1. On the AWS Serverless Application Repository console, choose Published Applications.
  2. On the Private Applications tab, select Show apps that create custom IAM roles or resource policies in order to see deployed applications.
  3. Choose the VerticaAthenaConnector serverless app.
  4. For AthenaCatalogName, enter the name of the connector Lambda function used when querying the Vertica tables (avc).
  5. For SecretNameOrPrefix, enter the prefix used to store the Vertica credentials in Secrets Manager (the default is Vertica-).
  6. For SpillBucket, enter the S3 bucket name where data is spilled in case the result set data volume crosses a certain limit (test-spill-bucket).
  7. For VerticaExportBucket, enter the S3 bucket where the result set from Vertica is exported (test-export-bucket).
  8. For VpcId, enter your VPC ID.

  9. For SpillPrefix, enter athena-spill.
  10. For SubnetIds, enter your subnet IDs.
  11. For VerticaConnectionString, enter the connection string of the Vertica database in the following format:
    jdbc:vertica://<host_name>:<port>/<database>?user=${vertica-username}&password=${vertica-password}
    

    where vertica-username and vertica-password are the secret names of the Vertica user credentials stored in AWS Secrets Manager.

  12. Select I acknowledge that this app creates custom IAM roles.

  13. Choose Deploy.

Upon successful deployment, a Lambda function with the name given for AthenaCatalogName is deployed in your AWS environment. We use this function to issue federated queries to Vertica. The connector is now deployed and ready to use.

Using the connector

On the Athena console, you can query Vertica tables as shown in the following code. The value for <lambda_function> corresponds to the function you created in the previous section.

SELECT  
ORDER_ID,   
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
ORDER BY ORDER_ID DESC

In this example, we named the function as avc. The following screenshot shows our query results.

This demonstrates that the newly deployed connector read the user-requested columns and the Vertica source table, wrapped an EXPORT statement around the SQL statement, and ran it in Vertica. The results of this query were exported to the specified S3 bucket (test-export-bucket) in Parquet format. The connector then invoked multiple Lambda functions to read the data from the S3 bucket using Amazon S3 Select and displayed it on the Athena console. Note that currently the connector exports Vertica timestamp and timestamptz data types as a varchar data type. Therefore, we need to use the date_parse(string, format) function to convert the timestamp columns into the correct data type.

We can also create an Athena table using CTAS with the result set of the Vertica query using the following query:

CREATE TABLE default.vertica_customers_table AS (
SELECT  
ORDER_ID,
CUSTOMER_ID,
PRODUCT_ID,
PRODUCT_NAME,
ORDER_DATE
FROM "lambda:<lambda_function>".SCHEMA1.ORDERS  
WHERE CUSTOMER_ID <= 3461
);

We can then use the newly created table to query the data as shown in the following screenshot.

In addition, we can also query and join the customer data in Amazon S3 and orders data in Vertica using the following sample query:

WITH
customer_data AS (
  SELECT
    CUSTOMER_NAME,
    CUSTOMER_ID
  FROM default.customer
),
orders_data AS (
  SELECT
    ORDER_ID,
    PRODUCT_NAME,
    CUSTOMER_ID
  FROM "lambda:<lambda_function>".schema1.orders
)
SELECT a.CUSTOMER_ID, b.ORDER_ID, b.PRODUCT_NAME
FROM customer_data a
INNER JOIN orders_data b
ON a.CUSTOMER_ID = b.CUSTOMER_ID
WHERE lower(b.PRODUCT_NAME) like 'pencil'
ORDER BY b.ORDER_ID DESC

This query joins the orders data in Vertica with customer data in the S3 bucket in the customer_id column and displays the results on the Athena console.

This demonstrates the ease of performing analytics across multiple platforms and data stores.

Conclusion

In this post, we introduced the Athena Vertica connector, its solution architecture, and demonstrated how to deploy the connector using the Athena Federated Query SDK. We saw how to run SQL queries on the Vertica data source. We also learned that we can use the connector to perform extract, transform, and load operations on the data in the Vertica tables and Amazon S3, enabling us to perform faster and better analytics across multiple platforms and data sources.

For more information about Athena Federated Query, see the GitHub repo.

Special Acknowledgement

Special acknowledgement goes to the Intuit Data Engineering staff Denise McInerney – Data Architect, Sanjay Rane – Group Engineering Manager – Data, and Kannan Nagarajan – Database Architect. They helped design, review, and support the development of the custom connector and architecture.


About the Authors

Kelly Ragan is a Senior Data Architect, Strategic Accounts Team, AWS Professional Services. He helps customers solve big data problems and wrestle with large-scale data warehouses. In his spare time, he enjoys snow skiing, bicycling, and camping in the Pacific Northwest.

Rohit Masur is an Associate Big Data Consultant, Data and Analytics Team, AWS Professional Services. He helps customers architect and implement solutions on AWS to get business value out of data. In his spare time, he enjoys reading books, going on long walks, and exploring new hiking trails in the Bay Area.

Automating AWS service logs table creation and querying them with Amazon Athena

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automating-aws-service-logs-table-creation-and-querying-them-with-amazon-athena/

I was working with a customer who was just getting started using AWS, and they wanted to understand how to query their AWS service logs that were being delivered to Amazon Simple Storage Service (Amazon S3). I introduced them to Amazon Athena, a serverless, interactive query service that allows you to easily analyze data in Amazon S3 and other sources. Together, we used Athena to query service logs, and were able to create tables for AWS CloudTrail logs, Amazon S3 access logs, and VPC flow logs. As I was walking the customer through the documentation and creating tables and partitions for each service log in Athena, I thought there had to be an easier and faster way to allow customers to query their logs in Amazon S3, which is the focus of this post.

This post demonstrates how to use AWS CloudFormation to automatically create AWS service log tables, partitions, and example queries in Athena. We also use the SQL query editor in Athena to query the AWS service log tables that AWS CloudFormation created.

Athena best practices

This solution is appropriate for ad hoc use and queries the raw log files. These raw files can range from compressed JSON to uncompressed text formats, depending on how they were configured to be sent to Amazon S3. If you need to query over hundreds of GBs or TBs of data per day in Amazon S3, performing ETL on your raw files and transforming them to a columnar file format like Apache Parquet can lead to increased performance and cost savings. You can save on your Amazon S3 storage costs by using snappy compression for Parquet files stored in Amazon S3. To learn more about Athena best practices, see Top 10 Performance Tuning Tips for Amazon Athena.

Table partition strategies

There are a few important considerations when deciding how to define your table partitions. Mainly you should ask: what types of queries will I be writing against my data in Amazon S3? Do I only need to query data for that day and for a single account, or do I need to query across months of data and multiple accounts? In this post, we talk about how to query across a single, partitioned account.

By partitioning data, you can restrict the amount of data scanned per query, thereby improving performance and reducing cost. When creating a table schema in Athena, you set the location of where the files reside in Amazon S3, and you can also define how the table is partitioned. The location is a bucket path that leads to the desired files. If you query a partitioned table and specify the partition in the WHERE clause, Athena scans the data only for that partition. For more information, see Table Location in Amazon S3 and Partitioning Data. You can then define partitions in Athena that map to the data residing in Amazon S3.

Let’s look at an example to see how defining a location and partitioning our table can improve performance and reduce costs. In the following tree diagram, we’ve outlined what the bucket path may look like as logs are delivered to your S3 bucket, starting from the bucket name and going all the way down to the day.

Outlined in red is where we set the location for our table schema, and Athena then scans everything after the CloudTrail folder. We then outlined our partitions in blue. This is where we can specify the granularity of our queries. In this case, we partition our table down to the day, which is very granular because we can tell Athena exactly where to look for our data. This is also the most performant and cost-effective option because it results in scanning only the required data and nothing else.

If you have to query multiple accounts and Regions, you should back off the location to AWSLogs and then create a non-partitioned CloudTrail table. This allows you to write queries across all your accounts and Regions, but the trade-off is that your queries take much longer and are more expensive due to Athena having to scan all the data that comes after AWSLogs every query. However, querying multiple accounts is beyond the scope of this post.

Prerequisites

Before you get started, you should have the following prerequisites:

  • Service logs already being delivered to Amazon S3
  • An AWS account with access to your service logs

Deploying the automated solution in your AWS account

The following steps walk you through deploying a CloudFormation template that creates saved queries for you to run (Create Table, Create Partition, and example queries for each service log).

  1. Choose Launch Stack:

  2. Choose Next.
  3. For Stack name, enter a name for your stack.

You don’t need to have every AWS service log that the template asks for. If you don’t have CloudFront logs for example, you can leave the PathParameter as is. If you need CloudFront logs in the future, you can simply update the Create Table statement with the correct Amazon S3 location in Athena.

  4. For each service log table you want to create, follow these steps:
  • Replace <_BUCKET_NAME> with the name of your S3 bucket that holds each AWS service log. You can use the same bucket name if it’s used to hold more than one type of service log.
  • Replace <Prefix> with your own folder prefix in Amazon S3. If you don’t have a prefix, make sure to remove it from the path parameters.
  • Replace <ACCOUNT-ID> and <REGION> with the desired account ID and Region.
  5. Choose Next.
  6. Enter any tags you wish to assign to the stack.
  7. Choose Next.
  8. Verify the parameters are correct and choose Create stack at the bottom.

Verify the stack has been created successfully. The stack takes about 1 minute to create the resources.

Querying your tables

You’re now ready to start querying your service logs.

  1. On the Athena console, on the Saved queries tab, search for the service log you want to interact with.

  2. Choose Create Table – CloudTrail Logs to run the SQL statement in the Athena query editor.

Make sure the location for Amazon S3 is correct in your SQL statement and verify you have the correct database selected.

  3. Choose Run query or press Tab+Enter to run the query.

The table cloudtrail_logs is created in the selected database. You can repeat this process to create other service log tables.

For partitioned tables like cloudtrail_logs, you must add partitions to your table before querying.

  1. On the Saved queries tab, choose Create Partition – CloudTrail.
  2. Update the Region, year, month, and day you want to partition. Choose Run query or press Tab+Enter to run the query.

After you run the query, you have successfully added a partition to your cloudtrail_logs table. Let’s look at some of the example queries we can run now.

  1. On the Saved queries tab, choose Query – CloudTrail Logs.

This is a base template included to begin querying your CloudTrail logs.

  2. Highlight the query and choose Run query.

You can see the base query template uses the WHERE clause to leverage partitions that have been loaded.

Let’s say we have a spike in API calls from AWS Lambda and we want to see the users that the calls were coming from in a specific time range as well as the count for each user. Our query looks like the following code:

SELECT useridentity.sessioncontext.sessionissuer.username as "User",
       count(eventname) as "Lambda API Calls"
FROM cloudtrail_logs
WHERE eventsource = 'lambda.amazonaws.com'
       AND eventtime BETWEEN '2020-11-24T18:00:00Z' AND '2020-11-24T21:00:00Z' 
group by useridentity.sessioncontext.sessionissuer.username
order by count(eventname) desc

Or if we wanted to check our S3 Access Logs to make sure only authorized users are accessing certain prefixes:

SELECT *
FROM s3_access_logs
WHERE key='prefix/images/example.jpg'
        AND requester != 'arn:aws:iam::accountid:user/username'

Cost of solution and cleaning up

Deploying the CloudFormation template doesn’t cost anything. You’re only charged for the amount of data scanned by Athena. Remember to use the best practices we discussed earlier when querying your data in Amazon S3. For more pricing information, see Amazon Athena pricing and Amazon S3 pricing.

To clean up the resources that were created, delete the CloudFormation stack you created earlier. This also deletes the saved queries in Athena.
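
Cleanup can also be scripted. A minimal Boto3 sketch, using a hypothetical stack name, follows.

import boto3

cloudformation = boto3.client("cloudformation")
stack_name = "athena-service-log-queries"  # placeholder stack name

# Deleting the stack also removes the saved queries that the template created in Athena.
cloudformation.delete_stack(StackName=stack_name)
cloudformation.get_waiter("stack_delete_complete").wait(StackName=stack_name)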

Summary

In this post, we discussed how we can use AWS CloudFormation to easily create AWS service log tables, partitions, and starter queries in Athena by entering bucket paths as parameters. We used CloudTrail and Amazon S3 access logs as examples, but you can replicate these steps for other service logs that you may need to query by visiting the Saved queries tab in Athena. Feel free to check out the video as well, where I go over how we store logs in Amazon S3 and then give a quick demo on how to deploy the solution.

For more information about service logs, see Easily query AWS service logs using Amazon Athena.


About the Author

Michael Hamilton is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife, kids, and a 2-year-old German shepherd.

AWS PrivateLink for Amazon S3 is Now Generally Available

Post Syndicated from Martin Beeby original https://aws.amazon.com/blogs/aws/aws-privatelink-for-amazon-s3-now-available/

At AWS re:Invent, we pre-announced that AWS PrivateLink for Amazon S3 was coming soon, and soon has arrived — this new feature is now generally available. AWS PrivateLink provides private connectivity between Amazon Simple Storage Service (S3) and on-premises resources using private IPs from your virtual network.

Way back in 2015, S3 was the first service to add a VPC endpoint; these endpoints provide a secure connection to S3 that does not require a gateway or NAT instances. Our customers welcomed this new flexibility but also told us they needed to access S3 from on-premises applications privately over secure connections provided by AWS Direct Connect or AWS VPN.

Our customers are very resourceful, and by setting up proxy servers with private IP addresses in their Amazon Virtual Private Clouds and using gateway endpoints for S3, they found a way to solve this problem. While this solution works, proxy servers typically constrain performance, add additional points of failure, and increase operational complexity.

We looked at how we could solve this problem for our customers without these drawbacks and PrivateLink for S3 is the result.

With this feature you can now access S3 directly as a private endpoint within your secure, virtual network using a new interface VPC endpoint in your Virtual Private Cloud. This extends the functionality of existing gateway endpoints by enabling you to access S3 using private IP addresses. API requests and HTTPS requests to S3 from your on-premises applications are automatically directed through interface endpoints, which connect to S3 securely and privately through PrivateLink.

Interface endpoints simplify your network architecture when connecting to S3 from on-premises applications by eliminating the need to configure firewall rules or an internet gateway. You can also gain additional visibility into network traffic with the ability to capture and monitor flow logs in your VPC. Additionally, you can set security groups and access control policies on your interface endpoints.
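
You can create an interface endpoint for S3 from the console, the AWS CLI, or the SDK. The following Boto3 sketch is illustrative only; the VPC, subnet, and security group IDs are placeholders, and the service name follows the com.amazonaws.<region>.s3 pattern.

import boto3

ec2 = boto3.client("ec2", region_name="us-east-1")

response = ec2.create_vpc_endpoint(
    VpcEndpointType="Interface",
    VpcId="vpc-0123456789abcdef0",             # placeholder VPC ID
    ServiceName="com.amazonaws.us-east-1.s3",  # S3 interface endpoint service name
    SubnetIds=["subnet-0123456789abcdef0"],    # placeholder subnet
    SecurityGroupIds=["sg-0123456789abcdef0"], # placeholder security group
)
print(response["VpcEndpoint"]["VpcEndpointId"])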

Available Now

PrivateLink for S3 is available in all AWS Regions. AWS PrivateLink is available at a low per-GB charge for data processed and a low hourly charge for interface VPC endpoints. We hope you enjoy using this new feature and look forward to receiving your feedback. To learn more, check out the PrivateLink for S3 documentation.

Try out AWS PrivateLink for Amazon S3 today, and happy storing.

— Martin

Building a cost efficient, petabyte-scale lake house with Amazon S3 lifecycle rules and Amazon Redshift Spectrum: Part 2

Post Syndicated from Cristian Gavazzeni original https://aws.amazon.com/blogs/big-data/part-2-building-a-cost-efficient-petabyte-scale-lake-house-with-amazon-s3-lifecycle-rules-and-amazon-redshift-spectrum/

In part 1 of this series, we demonstrated building an end-to-end data lifecycle management system integrated with a data lake house implemented on Amazon Simple Storage Service (Amazon S3) with Amazon Redshift and Amazon Redshift Spectrum. In this post, we address the ongoing operation of the solution we built.

Data ageing process after a month (ongoing)

Let’s assume a month has elapsed since walking through the use case in the last post, and old historical data was classified and tiered according to policy. You now need to enter the new monthly data generated into the lifecycle pipeline as follows:

  • June 2020 data – Produced and consolidated into Amazon Redshift local tables
  • December 2019 data – Migrated to Amazon S3
  • June 2019 data – Migrated from Amazon S3 to S3-IA
  • March 2019 data – Migrated from S3-IA to Glacier

The first step required is to increase the ageing counter of all the Parquet files (both the midterm and shortterm prefixes), using aws s3api get-object-tagging to check the current value and aws s3api put-object-tagging to increase it by 1. This can be cumbersome if you have many objects, but you can automate it with AWS CLI scripts or SDKs like Boto3 (Python).

The following is a simple Python script you can use to check the current tag settings for all keys in the prefix extract_shortterm:

from boto3 import client
import re

conn = client('s3')

def printtags(mybucket, myprefix):
    # Print the current value of the ageing tag for every Parquet object in the prefix.
    for key in conn.list_objects(Bucket=mybucket, Prefix=myprefix)['Contents']:
        if key['Key'].endswith('.parquet'):
            tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet']
            stringtag = re.findall(r"\d+", str(tagset))
            tagvalue = int(stringtag[0])
            print(key['Key'], "ageing = ", tagvalue)
    return

# Set the bucket and prefix parameters below according to your environment.
printtags('rs-lakehouse-blog-post', 'extract_shortterm/')

This second Python script lists all current tag settings for all keys in the prefix extract_shortterm, increases the ageing by 1, and lists the keys and new tag values. If other tags were added to these objects prior to this step, this new tag overwrites the entire tagSet. The set object tagging operation is not an append, but a completely new PUT.

from boto3 import client
import re

conn = client('s3')

def updateTags(mybucket, myprefix):
    # Read the current ageing tag of every Parquet object in the prefix and increase it by 1.
    for key in conn.list_objects(Bucket=mybucket, Prefix=myprefix)['Contents']:
        if key['Key'].endswith('.parquet'):
            tagset = conn.get_object_tagging(Bucket=mybucket, Key=key['Key'])['TagSet']
            stringtag = re.findall(r"\d+", str(tagset))
            tagvalue = int(stringtag[0])
            print(key['Key'], "Current ageing = ", tagvalue)
            tagvalue = tagvalue + 1
            # put_object_tagging replaces the whole TagSet; it does not append.
            conn.put_object_tagging(Bucket=mybucket, Key=key['Key'], Tagging={'TagSet': [{'Key': 'ageing', 'Value': str(tagvalue)}]})
    return

# Set the bucket and prefix parameters below according to your environment.
updateTags('rs-lakehouse-blog-post', 'extract_shortterm/')

To run the pipeline described before, you need to perform the following:

  1. Unload the December 2019 data.
  2. Apply the tag ageing to 6.
  3. Add the new Parquet file as a new partition to the external table taxispectrum.taxi_archive.

For the relevant syntax, see part 1 of this series. You can automate these tasks using an AWS Lambda function on a monthly schedule.
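
A hedged sketch of such a Lambda function follows. It uses the Amazon Redshift Data API to run the UNLOAD and the partition DDL; the cluster identifier, database, secret ARN, S3 prefix, and the UNLOAD source table are placeholders, and the SQL should be replaced with the actual statements from part 1. The Boto3 tagging script shown earlier covers the ageing update.

import boto3

redshift_data = boto3.client("redshift-data")

def lambda_handler(event, context):
    """Monthly ageing step: unload the oldest local month and register it in Spectrum."""
    common = {
        "ClusterIdentifier": "redshift-cluster-1",  # placeholder cluster
        "Database": "dev",                          # placeholder database
        "SecretArn": "arn:aws:secretsmanager:<region>:<account>:secret:<redshift-secret>",  # placeholder
    }

    # 1. Unload the month that crossed the local-retention threshold
    #    (replace with the UNLOAD statement from part 1).
    redshift_data.execute_statement(
        Sql="UNLOAD ('select * from <december_2019_local_table>') "
            "TO 's3://rs-lakehouse-blog-post/<midterm_prefix>/2019-12/' "
            "IAM_ROLE '<your_spectrum_role_arn>' FORMAT AS PARQUET;",
        **common,
    )

    # 2. Register the new Parquet prefix as a partition of the external table.
    redshift_data.execute_statement(
        Sql="ALTER TABLE taxispectrum.taxi_archive "
            "ADD PARTITION (yearmonth='2019-12') "
            "LOCATION 's3://rs-lakehouse-blog-post/<midterm_prefix>/2019-12/';",
        **common,
    )

You can then schedule the function with a monthly Amazon EventBridge rule.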

Check the results with a query to the external table and don’t forget to remove unloaded items from the Amazon Redshift local table, as you did in part 1 of this series.

In this use case, we know exactly the mapping of June 2019 to the Amazon S3 key name because we used a specific naming convention. If your use case is different, you can use the two pseudo-columns automatically created in every Amazon Redshift external table: $path and $size. See the following code:

select pickup, 
    "$path", 
    "$size" 
from taxispectrum.taxi_archive 
where pickup between '2019-06-01 00:00:00' and '2019-06-30 23:59:59' 
limit 10
;

 The following screenshot shows our results.

We’re migrating the March 2019 Parquet file to Glacier, so you should remove the related partition from the AWS Glue Data Catalog:

ALTER TABLE taxispectrum.taxi_archive
DROP PARTITION (yearmonth='2019-03')
;

Right to be forgotten

One of the pillar rules of GDPR is the “right to be forgotten” rule—the ability for a customer or employee to request deletion of any personal data.

Implementing this feature for external tables on Amazon Redshift requires a different approach than for local tables, because external tables don’t support delete and update operations.

You can implement two different approaches.

In and out

In this first approach, you copy the external table to a temporary Amazon Redshift table, delete the specific rows, unload the temporary table to Amazon S3 and overwrite the key name, and drop the temporary (internal table).

Let’s assume that the drivers in the dataset are identified with column pulocid. We want to delete all records related to a driver identified with pulocid 129 and who worked between October 2019 and November 2019.

  1. With the following code (run from Amazon Redshift), you can identify every single row matched with a specific Parquet file:
    select pickup,
        dropoff,
        pulocid,
        "$path"
    from taxispectrum.taxi_shortterm
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;

The following screenshot shows our results.

  2. When checking the applied tags, note the value associated with the ageing tag, or save the output of the following command in a temporary JSON file:
    aws s3api get-object-tagging \
    --bucket rs-lakehouse-blog-post \
    --key extract_shortterm/green_tripdata_2019-10000.parquet > \
    /tmp/oldtag.json

  3. Create two temporary tables, one for each of the two Parquet files matching the query (October and November):
    create table temporaryoct (like greentaxi);

  4. Copy the Parquet file to the local table, using the format as parquet attribute:
    copy temporaryoct
    from 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
    format as parquet
    ;

  5. Delete the records matching the "right to be forgotten" request criteria:
    delete from temporaryoct 
    where pulocid = 129 and pickup between '2019-10-01 00:00:00' and '2019-11-30 23:59:59'
    ;

  6. Overwrite the Parquet file with the UNLOAD command (note the allowoverwrite option):
    unload ('select * from temporaryoct')
    to 's3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
    parquet parallel off allowoverwrite
    ;

  7. Drop the temporary table:
    drop table temporaryoct;

In more complex use cases, user data might span multiple months, and our approach might not be effective. In these cases, using Spark to process and rewrite the Parquet could be a better and faster solution.

In other use cases, the number of records to be deleted could be a majority. If so, as an alternative to the delete and unload steps, you could use CREATE EXTERNAL TABLE AS (CTAS). CTAS creates an external table based on the column definition from a query and writes the results of that query on Amazon S3.

Edit your own

The second option is to use an external editor to access the Amazon S3 file and remove specific records. You could use a Spark script with the following steps:

  1. Create a DataFrame.
  2. Import a Parquet file in memory.
  3. Remove records matching your criteria.
  4. Overwrite the same Amazon S3 key with the new data.
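
The following PySpark sketch illustrates these steps under stated assumptions: the bucket, key, and filter criteria reuse the earlier example, and the cleaned data is written to a staging prefix first (a placeholder) because Spark can't safely overwrite the same path it is reading from.

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("right-to-be-forgotten").getOrCreate()

source = "s3://rs-lakehouse-blog-post/extract_shortterm/green_tripdata_2019-10000.parquet"
staging = "s3://rs-lakehouse-blog-post/extract_shortterm_staging/green_tripdata_2019-10/"  # placeholder prefix

# 1-2. Create a DataFrame by importing the Parquet file in memory.
df = spark.read.parquet(source)

# 3. Remove the records matching the "right to be forgotten" request.
cleaned = df.filter(
    ~((F.col("pulocid") == 129) &
      F.col("pickup").between("2019-10-01 00:00:00", "2019-11-30 23:59:59"))
)

# 4. Write the cleaned data to the staging prefix, then replace the original key
#    (for example with aws s3 mv) so readers never see a partially written file.
cleaned.write.mode("overwrite").parquet(staging)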

Building a simple data ageing dashboard

Sometimes data temperature is very predictable and based on ageing, but in some cases, especially when data originates from and is accessed by different entities, it’s not easy to build a model that fits the best storage transition strategy. For these scenarios, you can use Amazon S3 analytics storage class analysis and Amazon S3 access logs.

Storage class analysis observes the infrequent access patterns of a filtered set of data over a period of time. You can use the analysis results to help you improve your lifecycle policies. You can configure storage class analysis to analyze all the objects in a bucket, or configure filters to group objects together for analysis by a common prefix (objects that have names that begin with a common string), by object tags, or by both prefix and tags. Filtering by object groups is the best way to benefit from storage class analysis.
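
If you want to script this configuration, the following Boto3 sketch sets up a storage class analysis filtered on the extract_shortterm/ prefix and exports the daily results to the logs bucket created in the next steps; the configuration ID and export prefix are arbitrary names chosen for this example.

    import boto3

    s3 = boto3.client('s3')

    s3.put_bucket_analytics_configuration(
        Bucket='rs-lakehouse-blog-post',
        Id='shortterm-analysis',                     # arbitrary configuration ID
        AnalyticsConfiguration={
            'Id': 'shortterm-analysis',
            'Filter': {'Prefix': 'extract_shortterm/'},  # analyze only this object group
            'StorageClassAnalysis': {
                'DataExport': {
                    'OutputSchemaVersion': 'V_1',
                    'Destination': {
                        'S3BucketDestination': {
                            'Format': 'CSV',
                            'Bucket': 'arn:aws:s3:::rs-lakehouse-blog-post-logs',
                            'Prefix': 'storage-class-analysis/'   # example export prefix
                        }
                    }
                }
            }
        }
    )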

To achieve a better understanding of how data is accessed (and who accessed it, and when) and build a custom tiering strategy, you can use Amazon S3 access logs. This feature doesn’t have any additional costs, but log retention incurs Amazon S3 storage costs. You first define a recipient to store the logs (a Boto3 sketch of this setup follows the console steps below).

  1. Create a new bucket named rs-lakehouse-blog-post-logs.
  2. To set up S3 Server access logging on the source bucket rs-lakehouse-blog-post on the Amazon S3 console, on the Properties tab, choose Server access logging.
  3. For Target bucket, enter rs-lakehouse-blog-post-logs.
  4. For Target prefix, leave blank.
  5. Choose Save.
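
If you prefer to script the logging setup instead of using the console, a minimal Boto3 equivalent looks like the following. It assumes the target bucket already grants the S3 log delivery service permission to write, which the console configures for you.

    import boto3

    s3 = boto3.client('s3')

    s3.put_bucket_logging(
        Bucket='rs-lakehouse-blog-post',                 # source bucket
        BucketLoggingStatus={
            'LoggingEnabled': {
                'TargetBucket': 'rs-lakehouse-blog-post-logs',
                'TargetPrefix': ''                       # no prefix, as in the console steps
            }
        }
    )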

Let’s assume that after a few days of activity, you want to discover how users and applications accessed the data.

  1. On the Amazon Redshift console, create an external table to map the Amazon S3 access logs:
    CREATE EXTERNAL TABLE taxispectrum.s3accesslogs(
        BucketOwner                   varchar(256), 
        Bucket                        varchar(256), 
        RequestDateTime               varchar(256), 
        RemoteIP                      varchar(256), 
        Requester                     varchar(256), 
        RequestID                     varchar(256), 
        Operation                     varchar(256), 
        Key                           varchar(256), 
        RequestURI_operation          varchar(256),
        RequestURI_key                varchar(256),
        RequestURI_httpProtoversion   varchar(256),
        HTTPstatus                    varchar(256), 
        ErrorCode                     varchar(256), 
        BytesSent                     varchar(256), 
        ObjectSize                    varchar(256), 
        TotalTime                     varchar(256), 
        TurnAroundTime                varchar(256), 
        Referrer                      varchar(256), 
        UserAgent                     varchar(256), 
        VersionId                     varchar(256)
    )
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
    WITH SERDEPROPERTIES (
        'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
    )
    STORED AS TEXTFILE
    LOCATION 's3://rs-lakehouse-blog-post-logs/'
    ;

  2. Check the AWS Identity and Access Management (IAM) policy S3-Lakehouse-Policy created in part 1 and add the following two lines to the JSON definition file:
    "arn:aws:s3:::rs-lakehouse-blog-post-logs",
    "arn:aws:s3:::rs-lakehouse-blog-post-logs/*"

  3. Query a few columns:
    select bucket, 
        key, 
        requestdatetime, 
        remoteip, 
        operation, 
        useragent 
    from taxispectrum.s3accesslogs
    ;

The following screenshot shows our results.

This report is a starting point for both auditing purposes and for analyzing access patterns. As a next step, you could use a business intelligence (BI) visualization solution like Amazon QuickSight and create a dataset in order to create a dashboard showing the most and least accessed files.
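
As a starting point for such a dashboard, the following Boto3 sketch uses the Amazon Redshift Data API to rank objects by the number of GET requests recorded in the access logs. The cluster identifier matches this post, while the database name and user are assumptions you should adapt to your environment.

    import time
    import boto3

    client = boto3.client('redshift-data')

    sql = """
    select key, count(*) as gets
    from taxispectrum.s3accesslogs
    where operation = 'REST.GET.OBJECT'
    group by key
    order by gets desc
    limit 20;
    """

    # Database name and user are assumptions; adapt them to your cluster
    resp = client.execute_statement(
        ClusterIdentifier='redshift-cluster-1',
        Database='dev',
        DbUser='awsuser',
        Sql=sql
    )

    # Poll until the statement finishes, then print the most accessed keys
    while client.describe_statement(Id=resp['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
        time.sleep(1)

    result = client.get_statement_result(Id=resp['Id'])
    for row in result['Records']:
        print(row[0]['stringValue'], row[1]['longValue'])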

Cleaning up

To clean up your resources after walking through this post, complete the following steps:

  1. Delete the Amazon Redshift cluster without the final cluster snapshot:
    aws redshift delete-cluster --cluster-identifier redshift-cluster-1 --skip-final-cluster-snapshot

  2. Delete the schema and table defined in AWS Glue:
    aws glue delete-table \
        --database-name blogdb \
        --name taxi_archive

    aws glue delete-table \
        --database-name blogdb \
        --name s3accesslogs

    aws glue delete-database --name blogdb

  3. Delete the S3 buckets and all their content:
    aws s3 rb s3://rs-lakehouse-blog-post --force
    aws s3 rb s3://rs-lakehouse-blog-post-logs --force

Conclusion

In the first post in this series, we demonstrated how to implement a data lifecycle system for a lake house using Amazon Redshift, Redshift Spectrum, and Amazon S3 lifecycle rules. In this post, we focused on how to operationalize the solution with automation scripts (with the AWS Boto3 library for Python) and S3 Server access logs.


About the Authors

Cristian Gavazzeni is a senior solution architect at Amazon Web Services. He has more than 20 years of experience as a pre-sales consultant focusing on Data Management, Infrastructure and Security. During his spare time he likes eating Japanese food and travelling abroad with only fly and drive bookings.

 

 

Francesco MarelliFrancesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

Building a cost efficient, petabyte-scale lake house with Amazon S3 lifecycle rules and Amazon Redshift Spectrum: Part 1

Post Syndicated from Cristian Gavazzeni original https://aws.amazon.com/blogs/big-data/part-1-building-a-cost-efficient-petabyte-scale-lake-house-with-amazon-s3-lifecycle-rules-and-amazon-redshift-spectrum/

The continuous growth of data volumes combined with requirements to implement long-term retention (typically due to specific industry regulations) puts pressure on the storage costs of data warehouse solutions, even for cloud native data warehouse services such as Amazon Redshift. The introduction of the new Amazon Redshift RA3 node types helped in decoupling compute from storage growth. Integration points provided by Amazon Redshift Spectrum, Amazon Simple Storage Service (Amazon S3) storage classes, and other Amazon S3 features allow for compliance of retention policies while keeping costs under control.

An enterprise customer in Italy asked the AWS team to recommend best practices on how to implement a data journey solution for sales data; the objective of part 1 of this series is to provide step-by-step instructions and best practices on how to build an end-to-end data lifecycle management system integrated with a data lake house implemented on Amazon S3 with Amazon Redshift. In part 2, we show some additional best practices to operate the solution: implementing a sustainable monthly ageing process, using Amazon Redshift local tables to troubleshoot common issues, and using Amazon S3 access logs to analyze data access patterns.

Amazon Redshift and Redshift Spectrum

At re:Invent 2019, AWS announced new Amazon Redshift RA3 nodes. Even though this introduced new levels of cost efficiency in the cloud data warehouse, we faced customer cases where the data volume to be kept is an order of magnitude higher due to specific regulations that require historical data to be kept for up to 10–12 years or more. In addition, this historical cold data must be accessed by other services and applications external to Amazon Redshift (such as Amazon SageMaker for AI and machine learning (ML) training jobs), and occasionally it needs to be queried jointly with Amazon Redshift hot data. In these situations, Redshift Spectrum is a great fit because, among other factors, you can use it in conjunction with Amazon S3 storage classes to further improve TCO.

Redshift Spectrum allows you to query data that resides in S3 buckets using the application code and logic already in place for data warehouse tables, and potentially perform joins and unions of Amazon Redshift local tables and data on Amazon S3.

Redshift Spectrum uses a fleet of compute nodes managed by AWS that increases system scalability. To use it, we need to define at least an external schema and an external table (unless an external schema and external database are already defined in the AWS Glue Data Catalog). Data definition language (DDL) statements used to define an external table include a location attribute to address S3 buckets and prefixes containing the dataset, which could be in common file formats like ORC, Parquet, AVRO, CSV, JSON, or plain text. Compressed and columnar file formats like Apache Parquet are preferred because they provide less storage usage and better performance.

For a data catalog, we could use AWS Glue or an external hive metastore. For this post, we use AWS Glue.

S3 Lifecycle rules

Amazon S3 storage classes include S3 Standard, S3-IA, S3 One-Zone, S3 Intelligent-Tiering, S3 Glacier, and S3 Glacier Deep Archive. For our use case, we need to keep data accessible for queries for 5 years and with high durability, so we consider only S3 Standard and S3-IA for this time frame, and S3 Glacier only for long term (5–12 years). Data access to S3 Glacier requires data retrieval in the range of minutes (if using expedited retrieval) and this can’t be matched with the ability to query data. You can adopt Glacier for very cold data if you implement a manual process to first restore the Glacier archive to a temporary S3 bucket, and then query this data defined via an external table.

S3 Glacier Select allows you to query data directly in S3 Glacier, but it only supports uncompressed CSV files. Because the objective of this post is to propose a cost-efficient solution, we didn’t consider it. If for any reason you have constraints that require storing data in CSV format (instead of compressed formats like Parquet), Glacier Select might also be a good fit.

Excluding retrieval costs, the cost for storage for S3-IA is typically around 45% cheaper than S3 Standard, and S3 Glacier is 68% cheaper than S3-IA. For updated pricing information, see Amazon S3 pricing.

We don’t use S3 Intelligent Tiering because it bases storage transition on the last access time, and this resets every time we need to query the data. We use the S3 Lifecycle rules that are based either on creation time or prefix or tag matching, which is consistent regardless of data access patterns.

Simulated use case and retention policy

For our use case, we need to implement the data retention strategy for trip records outlined in the following table.

Corporate Rule | Dataset Start | Dataset End | Data Storage | Engine
Last 6 months in Redshift Spectrum | December 2019 | May 2020 | Amazon Redshift local tables | Amazon Redshift
Months 6–11 in Amazon S3 | June 2019 | November 2019 | S3 Standard | Redshift Spectrum
Months 12–14 in S3-IA | March 2019 | May 2019 | S3-IA | Redshift Spectrum
After month 15 | January 2019 | February 2019 | Glacier | N/A

For this post, we create a new table in a new Amazon Redshift cluster and load a public dataset. We use the New York City Taxi and Limousine Commission (TLC) Trip Record Data because it provides the required historical depth.

We use the Green Taxi Trip Records, based on monthly CSV files containing 20 columns with fields like vendor ID, pickup time, drop-off time, fare, and other information. 

Preparing the dataset and setting up the Amazon Redshift environment

As a first step, we create an AWS Identity and Access Management (IAM) role for Redshift Spectrum. This is required to allow Amazon Redshift to access Amazon S3 for querying and loading data, and also to allow access to the AWS Glue Data Catalog whenever we create, modify, or delete an external table.

  1. Create a role named BlogSpectrumRole.
  2. Edit the following two JSON files, which contain the IAM policies for the bucket and prefix used in this post, adjust them as needed, and attach the policies to the role you created (a Boto3 sketch that scripts these steps follows the policy definitions):
    S3-Lakehouse-Policy.JSON

    	{
    	    "Version": "2012-10-17",
    	    "Statement": [
    	        {
    	            "Sid": "VisualEditor0",
    	            "Effect": "Allow",
    	            "Action": "s3:*",
    	            "Resource": [
    	                "arn:aws:s3:::rs-lakehouse-blog-post",
    	                "arn:aws:s3:::rs-lakehouse-blog-post/extract_longterm/*",
    	                "arn:aws:s3:::rs-lakehouse-blog-post/extract_midterm/*",
    	                "arn:aws:s3:::rs-lakehouse-blog-post/extract_shortterm/*",
    	    "arn:aws:s3:::rs-lakehouse-blog-post/accesslogs/*"
    	            ]
    	        }
    	    ]
    	}

    Glue-Lakehouse-Policy.JSON

    {
    	"Version": "2012-10-17",
    	"Statement": [
    		{
    			"Sid": "VisualEditor0",
    			"Effect": "Allow",
    			"Action": [
    				"glue:CreateDatabase",
    				"glue:DeleteDatabase",
    				"glue:GetDatabase",
    				"glue:GetDatabases",
    				"glue:UpdateDatabase",
    				"glue:CreateTable",
    				"glue:DeleteTable",
    				"glue:BatchDeleteTable",
    				"glue:UpdateTable",
    				"glue:GetTable",
    				"glue:GetTables",
    				"glue:BatchCreatePartition",
    				"glue:CreatePartition",
    				"glue:DeletePartition",
    				"glue:BatchDeletePartition",
    				"glue:UpdatePartition",
    				"glue:GetPartition",
    				"glue:GetPartitions",
    				"glue:BatchGetPartition"
    				],
    			"Resource": [
    				"*"
    			]
    		}
    	]
    }
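
As an alternative to creating the role in the console, the following Boto3 sketch creates BlogSpectrumRole with a trust policy for Amazon Redshift and attaches the two policies above as inline policies. It assumes the policy documents are saved locally under the file names shown.

    import json
    import boto3

    iam = boto3.client('iam')

    # Trust policy that lets Amazon Redshift assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "redshift.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }

    iam.create_role(
        RoleName='BlogSpectrumRole',
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )

    # Attach the two policy documents shown above as inline policies
    for policy_name, file_name in [
        ('S3-Lakehouse-Policy', 'S3-Lakehouse-Policy.JSON'),
        ('Glue-Lakehouse-Policy', 'Glue-Lakehouse-Policy.JSON'),
    ]:
        with open(file_name) as f:
            iam.put_role_policy(
                RoleName='BlogSpectrumRole',
                PolicyName=policy_name,
                PolicyDocument=f.read()
            )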

Now you create a single-node Amazon Redshift cluster based on a DC2.large instance type, attaching the newly created IAM role BlogSpectrumRole.

  1. On the Amazon Redshift console, choose Create cluster.
  2. Keep the default cluster identifier redshift-cluster-1.
  3. Choose the node type DC2.large.
  4. Set the configuration to single node.
  5. Keep the default dbname and ports.
  6. Set a primary user password.
  7. Choose Create cluster.
  8. After the cluster is configured, check the attached IAM role on the Properties tab for the cluster.
  9. Take note of the IAM role ARN, because you use it to create external tables.

Copying data into Amazon Redshift

To connect to Amazon Redshift, you can use a free client like SQL Workbench/J or use the AWS console embedded query editor with the previously created credentials.

  1. Create a table according to the dataset schema using the following DDL statement:
    	create table greentaxi(
    	vendor_id integer,
    	pickup timestamp,
    	dropoff timestamp,
    	storeandfwd char(2),
    	ratecodeid integer,
    	pulocid integer,
    	dolocid integer,
    	passenger_count integer,
    	trip_dist real,
    	fare_amount real,
    	extra real,
    	mta_tax real,
    	tip_amount real,
    	toll_amount real,
    	ehail_fee real,
    	improve_surch real,
    	total_amount real,
    	pay_type integer,
    	trip_type integer,
    	congest_surch real
    );

The most efficient method to load data into Amazon Redshift is the COPY command, because it uses the distributed architecture (each slice can ingest one file at the same time).

  1. Load the 2020 data from January to June into the Amazon Redshift table with the following command, and repeat it with the green_tripdata_2019 prefix to load the 2019 files (replace the IAM role value with the one you created earlier):
    copy greentaxi from 's3://nyc-tlc/trip data/green_tripdata_2020' 
    	iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' 
    	delimiter ',' 
    	CSV 
    	dateformat 'auto'
    	region 'us-east-1' 
    ignoreheader as 1;

Now the greentaxi table includes all records starting from January 2019 to June 2020. You’re now ready to leverage Redshift Spectrum and S3 storage classes to save costs.

Extracting data from Amazon Redshift

You perform the next steps using the AWS Command Line Interface (AWS CLI). For download and installation instructions, see Installing, updating, and uninstalling the AWS CLI version 2.

Use the aws configure command to set the access key and secret access key of your IAM user and the AWS Region of your Amazon Redshift cluster (the same Region as your S3 buckets).

In this section, we evaluate two different use cases:

  • New customer – As a new customer, you don’t have much Amazon Redshift old data, and want to extract only the oldest monthly data and apply a lifecycle policy based on creation date. Storage tiering only affects future data and is fully automated.
  • Old customer – In this use case, you come from multiple years of data growth, and need to move existing Amazon Redshift data to different storage classes. In addition, you want a fully automated solution but with the ability to override and decide what and when to transition data between S3 storage classes. This requirement is due to many factors, like the GDPR rule “right to be forgotten.” You may need to edit historical data to remove specific customer records, which changes the file creation date. For this reason, you need S3 Lifecycle rules based on tagging instead of creation date.

New customer use case

The UNLOAD command uses the result of an embedded SQL query to extract data from Amazon Redshift to Amazon S3, producing different file formats such as CSV, text, and Parquet. To extract data from January 2019, complete the following steps:

  1. Create a destination bucket like the following:
    aws s3 mb s3://rs-lakehouse-blog-post

  2. Create a folder named archive in the destination bucket rs-lakehouse-blog-post:
    aws s3api put-object --bucket rs-lakehouse-blog-post --key archive/

  3. Use the following SQL code to implement the UNLOAD statement. Note that the date values require doubled single quotes because the SELECT statement is itself quoted inside the UNLOAD command:
    unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
    to 's3://rs-lakehouse-blog-post/archive/5to12years_taxi'
    iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'; 

    4. You can perform a check with the AWS CLI:
      aws s3 ls s3://rs-lakehouse-blog-post/archive/
      2020-10-12 14:49:51          0 
      2020-11-04 09:51:00   34792620 5to12years_taxi0000_part_00
      2020-11-04 09:51:00   34792738 5to12years_taxi0001_part_00

    The output shows that the UNLOAD statement generated two files of 33 MB each. By default, UNLOAD generates at least one file for each slice in the Amazon Redshift cluster. My cluster is a single node with DC2 type instances with two slices. This default file format is text, which is not storage optimized.

    To simplify the process, you create a single file for each month so that you can later apply lifecycle rules to each file. In real-world scenarios, extracting data with a single file isn’t the best practice in terms of performance optimization. This is just to simplify the process for the purpose of this post.

    5. Create your files with the following code:
      unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
      to 's3://rs-lakehouse-blog-post/archive/green_tripdata_2019-01'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    The output of the UNLOAD commands is a single file (per month) in Parquet format, which takes 80% less space than the previous unload. This is important to save costs related to both Amazon S3 and Glacier, but also for costs associated with Redshift Spectrum queries, which are billed by the amount of data scanned.

    6. You can check how efficient Parquet is compared to text format:
      aws s3 ls s3://rs-lakehouse-blog-post/archive/

      2020-08-12 17:17:42   14523090 green_tripdata_2019-01000.parquet 

    7. Clean up the previous text files:
      aws s3 rm s3://rs-lakehouse-blog-post/ --recursive \
      --exclude "*" --include "archive/5to12*"

      The next step is creating a lifecycle rule based on creation date to automate the migration to S3-IA after 12 months and to Glacier after 15 months. The proposed policy name is 12IA-15Glacier and it’s filtered on the prefix archive/.

    8. Create a JSON file named lifecycle.json containing the lifecycle policy definition:
      {
          "Rules": [
              {
                  "ID": "12IA-15Glacier",
                  "Filter": {
                      "Prefix": "archive"
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 365,
                          "StorageClass": "STANDARD_IA"
                      },
                      {
                          "Days": 548,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

    9. Run the following command to send the JSON file to Amazon S3:
      aws s3api put-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post \
      --lifecycle-configuration file://lifecycle.json

    This lifecycle policy migrates all keys in the archive prefix from Amazon S3 to S3-IA after 12 months and from S3-IA to Glacier after 15 months. For example, if today were 2020-09-12, and you unload the 2020-03 data to Amazon S3, by 2021-09-12, this 2020-03 data is automatically migrated to S3-IA.

    If using this basic use case, you can skip the partition steps in the section Defining the external schema and external tables.

    Old customer use case

    In this use case, you extract data with different ageing in the same time frame. You extract all data from January 2019 to February 2019 and, because we assume that you aren’t using this data, archive it to S3 Glacier.

    Data from March 2019 to May 2019 is migrated as an external table on S3-IA, and data from June 2019 to November 2019 is migrated as an external table to S3 Standard. With this approach, you comply with customer long-term retention policies and regulations, and reduce TCO.

    You implement the retention strategy described in the Simulated use case and retention policy section.

    1. Create a destination bucket (if you also walked through the first use case, use a different bucket):
      aws s3 mb s3://rs-lakehouse-blog-post

    2. Create three folders named extract_longterm, extract_midterm, and extract_shortterm in the destination bucket rs-lakehouse-blog-post. The following code is the syntax for creating the extract_longterm folder:
      aws s3api put-object --bucket rs-lakehouse-blog-post --key extract_longterm/

    3. Extract the data:
      unload ('select * from greentaxi where pickup between ''2019-01-01 00:00:00'' and ''2019-01-31 23:59:59''')
      	to 's3://rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-01'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    4. Repeat these steps for the February 2019 time frame.

    Managing data ageing with Amazon S3 storage classes and lifecycle policies

    In this section, you manage your data with storage classes and lifecycle policies.

    1. Migrate your keys in Parquet format to Amazon Glacier:
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-01000.parquet \
      --storage-class GLACIER \
      --bucket rs-lakehouse-blog-post \
      --key extract_longterm/green_tripdata_2019-01000.parquet
      	 
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_longterm/green_tripdata_2019-02000.parquet \
      --storage-class GLACIER \
      --bucket rs-lakehouse-blog-post \
      --key extract_longterm/green_tripdata_2019-02000.parquet

    2. Extract the data from March 2019 to May 2019 (months 12–15) and migrate them to S3-IA. The following code is for March:
      unload ('select * from greentaxi where pickup between ''2019-03-01 00:00:00'' and ''2019-03-31 23:59:59''')
      to 's3://rs-lakehouse-blog-post/extract_midterm/03/green_tripdata_2019-03'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    3. Repeat the previous step for April and May.
    4. Migrate all three months to S3-IA using the same process as before. The following code is for March:
      aws s3api copy-object \
      --copy-source rs-lakehouse-blog-post/extract_midterm/03/green_tripdata_2019-03000.parquet \
      --storage-class STANDARD_IA \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet

    5. Do the same for the other two months.
    6. Check the newly applied storage class with the following AWS CLI command:
      aws s3api head-object \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet
       
      {
          "AcceptRanges": "bytes",
          "LastModified": "2020-10-12T13:47:32+00:00",
          "ContentLength": 14087514,
          "ETag": "\"15bf39e6b3f32b10ef589d75e0988ce6\"",
          "ContentType": "application/x-www-form-urlencoded; charset=utf-8",
          "Metadata": {},
          "StorageClass": "STANDARD_IA"
      }

    In the next step, you tag every monthly file with a tag key named ageing whose value is set to the number of months elapsed from the origin date.

    1. Set March to 14, April to 13, and May to 12:
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/03/green_tripdata_2019-03000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "14"} ]}'
      	 
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/04/green_tripdata_2019-04000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "13"} ]}'
      	 
      aws s3api put-object-tagging \
      --bucket rs-lakehouse-blog-post \
      --key extract_midterm/05/green_tripdata_2019-05000.parquet \
      --tagging '{"TagSet": [{ "Key": "ageing", "Value": "12"} ]}'

    In this set of three objects, the oldest file has the tag ageing set to value 14, and the newest is set to 12. In the second post in this series, you discover how to manage the ageing tag as it increases month by month.
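
    A minimal Boto3 sketch of that monthly update could look like the following; it simply increments the ageing tag value on every Parquet object under the extract_midterm/ prefix, and assumes the tag was already set as shown above.

      import boto3

      s3 = boto3.client('s3')
      bucket = 'rs-lakehouse-blog-post'

      # Increment the ageing tag on every Parquet object under the prefix
      paginator = s3.get_paginator('list_objects_v2')
      for page in paginator.paginate(Bucket=bucket, Prefix='extract_midterm/'):
          for obj in page.get('Contents', []):
              key = obj['Key']
              if not key.endswith('.parquet'):
                  continue
              tags = s3.get_object_tagging(Bucket=bucket, Key=key)['TagSet']
              for tag in tags:
                  if tag['Key'] == 'ageing':
                      tag['Value'] = str(int(tag['Value']) + 1)
              s3.put_object_tagging(Bucket=bucket, Key=key, Tagging={'TagSet': tags})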

    The next step is to create a lifecycle rule based on this specific tag in order to automate the migration to Glacier at month 15. The proposed policy name is 15IAtoGlacier and the definition limits the scope to only objects with the tag ageing set to 15 in the specific bucket.

    1. Create a JSON file named lifecycle.json containing the lifecycle policy definition:
      {
          "Rules": [
              {
                  "ID": "15IAtoGlacier",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "15"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 1,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

      
      

    2. Run the following command to send the JSON file to Amazon S3:
      aws s3api put-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post \
      --lifecycle-configuration file://lifecycle.json 

    This lifecycle policy migrates all objects with the tag ageing set to 15 from S3-IA to Glacier.

    Though I described this process as automating the migration, I actually want to control the process from the application level using the self-managed tag mechanism. I use this approach because otherwise, the transition is based on file creation date, and the objective is to be able to delete, update, or create a new file whenever needed (for example, to delete parts of records in order to comply to the GDPR “right to be forgotten” rule).

    Now you extract all data from June 2019 to November 2019 (7–11 months old) and keep it in Amazon S3 with a lifecycle policy to automatically migrate to S3-IA after ageing 12 months, using the same process as described. These six new objects also inherit the rule created previously to migrate to Glacier after 15 months. Finally, you set the ageing tag as described before.

    Use the extract_shortterm prefix for these unload operations.

    1. Unload June 2019 with the following code:
      unload ('select * from greentaxi where pickup between ''2019-06-01 00:00:00'' and ''2019-06-30 23:59:59''')
      to 's3://rs-lakehouse-blog-post/extract_shortterm/06/green_tripdata_2019-06'
      iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;

    2. Use the same logic for the remaining months up to October.
    3. For November, see the following code:
      	unload ('select * from greentaxi where pickup between ''2019-11-01 00:00:00'' and ''2019-11-30 23:59:59''')
      	to 's3://rs-lakehouse-blog-post/extract_shortterm/11/green_tripdata_2019-11'
      	iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole' parquet parallel off;
      
      aws s3 ls --recursive s3://rs-lakehouse-blog-post/extract_shortterm/
      2020-10-12 14:52:11          0 extract_shortterm/
      2020-10-12 18:45:42          0 extract_shortterm/06/
      2020-10-12 18:46:49   10889436 extract_shortterm/06/green_tripdata_2019-06000.parquet
      2020-10-12 18:45:53          0 extract_shortterm/07/
      2020-10-12 18:47:03   10759747 extract_shortterm/07/green_tripdata_2019-07000.parquet
      2020-10-12 18:45:58          0 extract_shortterm/08/
      2020-10-12 18:47:24    9947793 extract_shortterm/08/green_tripdata_2019-08000.parquet
      2020-10-12 18:46:03          0 extract_shortterm/09/
      2020-10-12 18:47:45   10302432 extract_shortterm/09/green_tripdata_2019-09000.parquet
      2020-10-12 18:46:08          0 extract_shortterm/10/
      2020-10-12 18:48:00   10659857 extract_shortterm/10/green_tripdata_2019-10000.parquet
      2020-10-12 18:46:11          0 extract_shortterm/11/
      2020-10-12 18:48:14   10247201 extract_shortterm/11/green_tripdata_2019-11000.parquet

      aws s3 ls --recursive s3://rs-lakehouse-blog-post/extract_longterm/

      2020-10-12 14:49:51          0 extract_longterm/
      2020-10-12 14:56:38   14403710 extract_longterm/green_tripdata_2019-01000.parquet
      2020-10-12 15:30:14   13454341 extract_longterm/green_tripdata_2019-02000.parquet

    4. Apply the tag ageing with range 11 to 6 (June 2019 to November 2019), using either the AWS CLI or console if you prefer.
    5. Create a new lifecycle rule named 12S3toS3IA, which transitions from Amazon S3 to S3-IA.
    6. With the AWS CLI, create a JSON file that includes both the previously defined rule 15IAtoGlacier and the new 12S3toS3IA rule, because the put-bucket-lifecycle-configuration command overwrites the current configuration (there is no incremental approach) with the new policy definition file (JSON). The following code is the new lifecycle.json:
      {
          "Rules": [
              {
                  "ID": "12S3toS3IA",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "12"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 30,
                          "StorageClass": "STANDARD_IA"
                      }
                  ]
              },
              {
                  "ID": "15IAtoGlacier",
                  "Filter": {
                      "Tag": {
                          "Key": "ageing",
                          "Value": "15"
                      }
                  },
                  "Status": "Enabled",
                  "Transitions": [
                      {
                          "Days": 1,
                          "StorageClass": "GLACIER"
                      }
                  ]
              }
          ]
      }

    7. Check the applied policies with the following command:
      aws s3api get-bucket-lifecycle-configuration \
      --bucket rs-lakehouse-blog-post

    The output is a single JSON document in stdout that merges the 15IAtoGlacier and 12S3toS3IA rules.

    Defining the external schema and external tables

    Before deleting the records you extracted from Amazon Redshift with the UNLOAD command, we define the external schema and external tables to enable Redshift Spectrum queries for these Parquet files.

    1. Enter the following code to create your schema:
      create external schema taxispectrum
      	from data catalog
      	database 'blogdb'
      	iam_role 'arn:aws:iam::123456789012:role/BlogSpectrumRole'
      create external database if not exists;

    2. Create the external table taxi_archive in the taxispectrum external schema. If you’re walking through the new customer use case, replace the prefix extract_midterm with archive:
      create external table taxispectrum.taxi_archive(
      	vendor_id integer,
      	pickup timestamp,
      	dropoff timestamp,
      	storeandfwd char(2),
      	ratecodeid integer,
      	pulocid integer,
      	dolocid integer,
      	passenger_count integer,
      	trip_dist real,
      	fare_amount real,
      	extra real,
      	mta_tax real,
      	tip_amount real,
      	toll_amount real,
      	ehail_fee real,
      	improve_surch real,
      	total_amount real,
      	pay_type integer,
      	trip_type integer,
      	congest_surch real)
      	partitioned by (yearmonth char(7))
      	stored as parquet
      location 's3://rs-lakehouse-blog-post/extract_midterm/';

    3. Add the six files stored in Amazon S3 and three files stored in S3-IA as partitions (if you’re walking through the new customer use case, you can skip the following partitioning steps). The following code shows March and April:
      ALTER TABLE taxispectrum.taxi_archive
      ADD PARTITION (yearmonth='2019-03') 
      LOCATION 's3://rs-lakehouse-blog-post/extract_midterm/03/';
      ALTER TABLE taxispectrum.taxi_archive
      ADD PARTITION (yearmonth='2019-04') 
      LOCATION 's3://rs-lakehouse-blog-post/extract_midterm/04/';

    4. Continue this process up to December 2019, using extract_shortterm instead of extract_midterm (a Boto3 sketch that scripts these ALTER TABLE statements appears after the following steps).
    5. Check the table isn’t empty with the following SQL statement:
      select count(*) from taxispectrum.taxi_archive;

    You get the number of entries in this external table.

    6. Optionally, you can check the partitions mapped to this table with a query to the Amazon Redshift system view:
      select * from svv_external_partitions;

    7. Run a SELECT command using partitioning in order to optimize costs related to Redshift Spectrum scanning:

      select * from taxispectrum.taxi_archive where yearmonth='2019-11' and fare_amount > 20;

    Redshift Spectrum scans only the partitions matching the yearmonth filter.
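
    As referenced in the earlier step, the following Boto3 sketch uses the Amazon Redshift Data API to add all the monthly partitions in one pass. The cluster identifier, bucket, and prefixes match this post, while the database name and user are assumptions you should adapt to your environment.

      import boto3

      client = boto3.client('redshift-data')

      partitions = {
          'extract_midterm': ['2019-03', '2019-04', '2019-05'],
          'extract_shortterm': ['2019-06', '2019-07', '2019-08',
                                '2019-09', '2019-10', '2019-11'],
      }

      for prefix, months in partitions.items():
          for yearmonth in months:
              month = yearmonth.split('-')[1]
              sql = (
                  f"alter table taxispectrum.taxi_archive "
                  f"add partition (yearmonth='{yearmonth}') "
                  f"location 's3://rs-lakehouse-blog-post/{prefix}/{month}/';"
              )
              # Database name and user are assumptions; adapt them to your cluster
              client.execute_statement(
                  ClusterIdentifier='redshift-cluster-1',
                  Database='dev',
                  DbUser='awsuser',
                  Sql=sql
              )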

    The final step is cleaning all the records extracted from the Amazon Redshift local tables:

    delete from public.greentaxi where pickup between '2019-01-01 00:00:00' and '2019-11-30 23:59:59';

    Conclusion

    We demonstrated how to extract historical data from Amazon Redshift and implement an archive strategy with Redshift Spectrum and Amazon S3 storage classes. In addition, we showed how to optimize Redshift Spectrum scans with partitioning.

    In the next post in this series, we show how to operate this solution day by day, especially for the old customer use case, and share some best practices.


    About the Authors

    Cristian Gavazzeni is a senior solution architect at Amazon Web Services. He has more than 20 years of experience as a pre-sales consultant focusing on Data Management, Infrastructure and Security. During his spare time he likes eating Japanese food and travelling abroad with only fly and drive bookings.

     

     

    Francesco MarelliFrancesco Marelli is a senior solutions architect at Amazon Web Services. He has lived and worked in London for 10 years, after that he has worked in Italy, Switzerland and other countries in EMEA. He is specialized in the design and implementation of Analytics, Data Management and Big Data systems, mainly for Enterprise and FSI customers. Francesco also has a strong experience in systems integration and design and implementation of web applications. He loves sharing his professional knowledge, collecting vinyl records and playing bass.

    Building a Controlled Environment Agriculture Platform

    Post Syndicated from Ashu Joshi original https://aws.amazon.com/blogs/architecture/building-a-controlled-environment-agriculture-platform/

    This post was co-written by Michael Wirig, Software Engineering Manager at Grōv Technologies.

    A substantial percentage of the world’s habitable land is used for livestock farming for dairy and meat production. The dairy industry has leveraged technology to gain insights that have led to drastic improvements and are continuing to accelerate. A gallon of milk in 2017 involved 30% less water, 21% less land, a 19% smaller carbon footprint, and 20% less manure than it did in 2007 (US Dairy, 2019). By focusing on smarter water usage and sustainable land usage, livestock farming can grow to provide sustainable and nutrient-dense food for consumers and livestock alike.

    Grōv Technologies (Grōv) has pioneered the Olympus Tower Farm, a fully automated Controlled Environment Agriculture (CEA) system. Unique amongst vertical farming startups, Grōv is growing cattle feed to improve the sustainable use of land for livestock farming while increasing the economic margins for dairy and beef producers.

    The challenges of CEA

    The set of growing conditions for a CEA is called a “recipe,” which is a combination of ingredients like temperature, humidity, light, carbon dioxide levels, and water. The optimal recipe is dynamic and is sensitive to its ingredients. Crops must be monitored in near-real time, and CEAs should be able to self-correct in order to maintain the recipe. To build a system with these capabilities requires answers to the following questions:

    • What parameters are needed to measure for indoor cattle feed production?
    • What sensors enable the accuracy and price trade-offs at scale?
    • Where do you place the sensors to ensure a consistent crop?
    • How do you correlate the data from sensors to the nutrient value?

    To progress from a passively monitored system to a self-correcting, autonomous one, the CEA platform also needs to address:

    • How to maintain optimum crop conditions
    • How the system can learn and adapt to new seed varieties
    • How to communicate key business drivers such as yield and dry matter percentage

    Grōv partnered with AWS Professional Services (AWS ProServe) to build a digital CEA platform addressing the challenges posed above.

    Olympus Tower - Grov Technologies

    Tower automation and edge platform

    The Olympus Tower is instrumented for measuring recipe ingredients by combining the mechanical, electrical, and domain expertise of the Grōv team with the IoT edge and sensor expertise of the AWS ProServe team. The teams identified a primary set of features such as height, weight, and evenness of the growth to be measured at multiple stages within the Tower. Sensors were also added to measure secondary features such as water level, water pH, temperature, humidity, and carbon dioxide.

    The teams designed and developed a purpose-built modular and industrial sensor station. Each sensor station has sensors for direct measurement of the features identified. The sensor stations are extended to support indirect measurement of features using a combination of Computer Vision and Machine Learning (CV/ML).

    The trays with the growing cattle feed circulate through the Olympus Tower. A growth cycle starts on a tray with seeding, circulates through the tower over the cycle, and returns to the starting position to be harvested. The sensor station at the seeding location on the Olympus Tower tags each new growth cycle in a tray with a unique “Grow ID.” As trays pass by, each sensor station in the Tower collects the feature data. The firmware, jointly developed for the sensor station, uses AWS IoT SDK to stream the sensor data along with the Grow ID and metadata that’s specific to the sensor station. This information is sent every five minutes to an on-site edge gateway powered by AWS IoT Greengrass. Dedicated AWS Lambda functions manage the lifecycle of the Grow IDs and the sensor data processing on the edge.

    The Grōv team developed AWS Greengrass Lambda functions running at the edge to ingest critical metrics from the operation automation software running the Olympus Towers. This information provides the ability to not just monitor the operational efficiency, but to provide the hooks to control the feedback loop.

    The two sources of data were augmented with site-level data by installing sensor stations at the building level or site level to capture environmental data such as weather and energy consumption of the Towers.

    All three sources of data are streamed to AWS IoT Greengrass and are processed by AWS Lambda functions. The edge software also fuses the data and correlates all categories of data together. This enables two major actions for the Grōv team – operational capability in real-time at the edge and enhanced data streamed into the cloud.

    Grov Technologies - Architecture

    Cloud pipeline/platform: analytics and visualization

    As the data is streamed to AWS IoT Core via AWS IoT Greengrass, AWS IoT rules are used to route ingested data for storage in Amazon Simple Storage Service (Amazon S3) and Amazon DynamoDB. The data pipeline also includes Amazon Kinesis Data Streams for batching and additional processing on the incoming data.

    A ReactJS-based dashboard application is powered using Amazon API Gateway and AWS Lambda functions to report relevant metrics such as daily yield and machine uptime.

    A data pipeline is deployed to analyze data using Amazon QuickSight. AWS Glue is used to create a dataset from the data stored in Amazon S3. Amazon Athena is used to query the dataset to make it available to Amazon QuickSight. This provides the extended Grōv tech team of research scientists the ability to perform a series of what-if analyses on the data coming in from the Tower Systems beyond what is available in the react-based dashboard.

    Data pipeline - Grov Technologies

    Completing the data-driven loop

    Now that the data has been collected from all sources and stored in a data lake architecture, the Grōv CEA platform has established a strong foundation for harnessing the insights and delivering the customer outcomes using machine learning.

    The integrated and fused data from the edge (sourced from the Olympus Tower instrumentation, Olympus automation software data, and site-level data) is correlated with the lab analysis performed by the Grōv Research Center (GRC). Harvest samples are routinely collected and sent to the lab, which performs wet chemistry and microbiological analysis. The lab results for these sample trays are associated with the corresponding sensor data through their Grow IDs. This serves as a mechanism for labeling and correlating the recipe data with the parameters used by dairy and beef producers: dry matter percentage, micro and macronutrients, and the presence of mycotoxins.

    Grōv has chosen Amazon SageMaker to build a machine learning pipeline on its comprehensive data set, which will enable fine tuning the growing protocols in near real-time. Historical data collection unlocks machine learning use cases for future detection of anomalous sensors readings and sensor health monitoring, as well.

    Because the solution is flexible, the Grōv team plans to integrate data from animal studies on their health and feed efficiency into the CEA platform. Machine learning on the data from animal studies will enhance the tuning of recipe ingredients that impact the animals’ health. This will give the farmer an unprecedented view of the impact of feed nutrition on the end product and consumer.

    Conclusion

    Grōv Technologies and AWS ProServe have built a strong foundation for an extensible and scalable architecture for a CEA platform that will nourish animals for better health and yield, produce healthier foods, and enable continued research into dairy production, rumination, and animal health to empower sustainable farming practices.

    Dream11’s journey to building their Data Highway on AWS

    Post Syndicated from Pradip Thoke original https://aws.amazon.com/blogs/big-data/dream11s-journey-to-building-their-data-highway-on-aws/

    This is a guest post co-authored by Pradip Thoke of Dream11. In their own words, “Dream11, the flagship brand of Dream Sports, is India’s biggest fantasy sports platform, with more than 100 million users. We have infused the latest technologies of analytics, machine learning, social networks, and media technologies to enhance our users’ experience. Dream11 is the epitome of the Indian sports technology revolution.”

    Since inception, Dream11 has been a data-driven sports technology brand. The systems that power Dream11, including their transactional data warehouse, run on AWS. As Dream11 hosts fantasy sports contests that are joined by millions of Indian sports fans, they have large volumes of transactional data that is organized in a well-defined Amazon Redshift data warehouse. Previously they were using 3rd party services to collect, analyze and build models over user interaction data combined with transactional data. Although this approach was convenient, it presented certain critical issues:

    • The approach wasn’t conducive to 360-degree user analytics. Dream11’s user interactions data wasn’t present on the cloud, where the rest of Dream11’s infrastructure and data were present (AWS, in this case). To get a complete picture of a user’s experience and journey, the user’s interaction data (client events) needs to be analyzed alongside their transactional data (server events). This is known as 360-degree user analytics.
    • It wasn’t possible to get accurate user journey funnel reports. Currently, there are limitations with every tool available on the market with respect to identifying and mapping a given user’s actions across multiple platforms (on the web, iOS, or Android), as well as multiple related apps. This use case is specifically important if your company is a parent to other companies.
    • The statistics on user behavior that Dream11 was getting weren’t as accurate as they wanted. Some of the popular services they were using for web & mobile analytics use the technique of sampling to be able to deal with high volumes of data. Although this is a well-regarded technique to deal with high volumes of data and provides reasonable accuracy in multiple cases, Dream11 wanted statistics to be as accurate as possible.
    • The analytics wasn’t real-time. Dream11 experiences intense use by their users just before and during the real-life sports matches, so real-time and near-real-time analytics is very critical for them. This need wasn’t sufficiently met by the plethora of services they were using.
    • Their approach was leading to high cost for custom analytics for Dream11’s user interactions data, consisting of hundreds of event types. Serverless query engines typically charge by the amount of data scanned and so it can get very expensive if events data isn’t organized properly in separate tables in a data lake to enable selective access.

    All these concerns and needs led Dream11 to conclude that they needed their own centralized 360-degree analytics platform. Therefore, they embarked on the Data Highway project on AWS.

    This project has additional advantages. It is increasingly becoming important to store and process data securely. Having everything in-house can help Dream11 with data security and data privacy objectives. The platform enables 360-degree customer analytics, which further allows Dream11 to do intelligent user segmentation in-house and share only those segments (without exposing underlying transactional or interactions data) with third-party messaging service providers. 

    Design goals

    Dream11 had the following design goals for the project:

    • The system should be easy to maintain and should be able to handle a very high volume of data, consisting of billions of events and terabytes of data daily.
    • The cost should be low and should be pay-as-you-go.
    • Dream11’s web and mobile developers regularly create new types of events to capture new types of interactions. Whenever they add new types of events, they should be immediately available in the system for analytics, and their statistics should immediately reflect in relevant dashboards and reports without any human intervention.
    • Certain types of statistics (such as concurrency) should be available in real-time and near-real time—within 5 minutes or less.
    • Dream11 should be able to use custom logic to calculate key statistics. The analytics should be accurate—no more sampling.
    • The data for various events should be neatly organized in separate tables and analytics-friendly file formats.
    • Although Dream11 will have a common data lake, they shouldn’t be constrained to use a single analytics engine for all types of analytics. Different types of analytics engines excel for different types of queries.
    • The Product Management team should have access to views they commonly use in their decision-making process, such as funnels and user flow diagrams.
    • The system should be extensible by adding lanes in the system. Lanes allow you to reuse your basic setup without mixing events data for different business units. It also potentially allows you to study user behavior across different apps.
    • The system should be able to build 360-degree user profiles.
    • The system should provide alerting on important changes to key business metrics.
    • Last but not the least, the system should be secure and reliable with 6 nines of availability guarantee.

    Data Highway architecture

    In less than 3 months, Dream11’s data team built a system that met all the aforementioned goals. The following diagram shows the high-level architecture.

    For this project, they used a combination of AWS services and open-source components, which the rest of this post describes along with the various design choices and trade-offs made by Dream11’s data engineers.

    Event ingestion, segregation, and organization

    Dream11 has several hundred event types. These events have common attributes and specific attributes. The following diagram shows the logical structure of these events.

    When the front end receives an event, it saves fields up to common attributes into a message and posts it to Kafka_AllEvents_CommonAttributes. This Kafka topic is the source for the following systems:

    • Apache HBase on Amazon EMR – Provides real-time concurrency analytics
    • Apache Druid – Provides near real-time dimensional analytics
    • Amazon Redshift – Provides session analytics

    The front end also saves events, as they are, into Kafka_AllEvents_AllAttributes. These events are further picked up by Apache NiFi, which forwards them to their respective topics. Apache NiFi supports data routing, transformation, and system mediation logic using powerful and scalable directed graphs. Data is transformed and published to Kafka by using a combination of RouteOnAttribute and JoltTransformJSON processors (to parse JSON). Apache NiFi basically reads event names and posts to the Kafka topic with matching names. If Kafka doesn’t have a topic with that name, it creates a new topic with that name. You can configure your Kafka brokers to auto-create a topic when a message is received for a non-existent topic.

    The following diagram illustrates the Amazon S3 sink connector per Kafka topic.

    The following diagram summarizes the overall design of the system for event ingestion, segregation, and organization.

     

    Storage, cataloging, ETL, and scheduling

    In this section, we discuss how Dream11 updates their AWS Glue Data Catalog, performs extract, transform, and load (ETL) jobs with Amazon EMR Presto, and uses Apache Airflow for schedule management.

    Updating the AWS Glue Data Catalog with metadata for the target table

    The AWS Glue Data Catalog provides a unified metadata repository across a variety of data sources and data formats. It provides out-of-the-box integration with Amazon S3, Amazon Relational Database Service (Amazon RDS), Amazon Redshift, Amazon Redshift Spectrum, Athena, Amazon EMR, and any application compatible with the Apache Hive metastore. You can create your table definitions one time and query across engines. For more information, see FAQ: Upgrading to the AWS Glue Data Catalog.

    Because this Data Catalog is accessible from multiple services that were going to be used for the Data Highway project, Dream11 decided to use it to register all the table definitions.

    Registering tables with the AWS Glue Data Catalog is easy. You can use an AWS Glue crawler. It can infer schema from files in Amazon S3 and register a table in the Data Catalog. It works quite well, but Dream11 needed additional actions, such as automatically configuring Kafka Amazon S3 sink connectors. Therefore, they developed two Python based crawlers.

    The first Python based crawler runs every 2 hours and looks up Kafka topics. If it finds a new topic, it configures a Kafka Amazon S3 connector sink to dump its data to Amazon S3 every 30 minutes in JSON Gzip format. It also registers a table with Glue Data Catalog so that users can query the JSON data directly, if needed. 

    The second Python based crawler runs once a day and registers a corresponding table for each new table created that day to hold flattened data (Parquet, Snappy). It infers schemas and registers tables with the Data Catalog using its Table API. It adds customization needed by the Dream11 team to the metadata. It then creates Amazon EMR Presto ETL jobs to convert JSON, Gzip data to Parquet, Snappy, and registers them with Apache Airflow to run every 24 hours.

    ETL with Amazon EMR Presto

    Dream11 has a multi node, long-running, multi-purpose EMR cluster. They decided to run scheduled ETL jobs on it for the Data Highway project.

    ETL for an event table involves a simple SELECT FROM -> INSERT INTO command to convert JSON (Gzip) to Parquet (Snappy). Converted data takes up to 70% less space and results in a 10-times improvement in Athena query performance. ETL happens once a day. Tables are partitioned by day.

    Data received on the Kafka_AllEvents_CommonAttributes topic is loaded into Amazon Redshift. ETL involves SELECT FROM -> INSERT INTO to convert JSON (Gzip) to CSV, followed by an Amazon Redshift COPY.

    Apache Airflow for schedule management

    Apache Airflow is an open-source tool for authoring and orchestrating big data workflows. With Apache Airflow, data engineers define direct acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. Workflows are designed as a DAG that groups tasks that run independently. The DAG keeps track of the relationships and dependencies between tasks.

    Dream11 uses Apache Airflow to schedule Python scripts and a few hundred ETL jobs on Amazon EMR Presto to convert JSON (Gzip) data for a few hundred event types to Parquet (Snappy) format, and to convert JSON data containing common attributes for all events to CSV before loading it into Amazon Redshift. For more information, see Orchestrate big data workflows with Apache Airflow, Genie, and Amazon EMR: Part 1.

    The following diagram shows the workflow to connect Apache Airflow to Amazon EMR.

    The following diagram summarizes the overall design of the system for storage, cataloging, ETL, and scheduling.

    Real-time and near-real-time analytics

    In this section, we discuss the real-time and near-real-time analytics performed on Dream11’s data.

    Concurrency analytics with Apache Druid

    Apache Druid is an OLAP-style data store. It computes facts and metrics against various dimensions while data is being loaded. This avoids the need to compute results when a query is run.

    Dream11’s web and mobile events are loaded from the Kafka_AllEvents_CommonAttributes topic to Apache Druid with the help of the Apache Druid Kafka indexing service. Dream11 has a dashboard with different granularity levels and dimensions such as app version, org, and other dimensions present in the common event attributes list.
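
    Loading the topic into Druid is driven by a Kafka ingestion supervisor spec submitted to the Druid Overlord. The following is a minimal sketch; the Overlord endpoint is hypothetical, and the spec file is assumed to define the datasource, dimensions (app version, org, and so on), and granularities:

    import json
    import requests

    # Hypothetical Druid Overlord endpoint exposing the Kafka indexing service supervisor API.
    OVERLORD_URL = "http://druid-overlord.internal:8090/druid/indexer/v1/supervisor"

    def submit_kafka_supervisor(spec_path: str) -> dict:
        """Submit a supervisor spec so Druid continuously ingests Kafka_AllEvents_CommonAttributes."""
        with open(spec_path) as f:
            spec = json.load(f)  # defines the Kafka topic, dimensions, and granularities
        resp = requests.post(OVERLORD_URL, json=spec, timeout=30)
        resp.raise_for_status()
        return resp.json()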

    Finding active users with Amazon EMR HBase

    Dream11 also needs to identify individual active users at any given time or during a given window. This is required by other downstream teams such as the Data Science team and Digital User Engagement team.

    With the help of a Java consumer, they push all events from the Kafka_AllEvents_CommonAttributes topic to HBase on an EMR cluster, keeping just the required user dimensions. They can query the data in HBase with SQL syntax through the Apache Phoenix interface.
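
    For example, a downstream service could look up active users with a query like the following sketch. It assumes the Phoenix Query Server is enabled on the EMR cluster and uses the phoenixdb Python driver; the endpoint, table, and column names are illustrative:

    import phoenixdb  # Python driver for the Apache Phoenix Query Server; library choice is an assumption

    def active_users(since_epoch_ms: int) -> list:
        """Return users with at least one event after the given timestamp, queried from HBase via Phoenix."""
        conn = phoenixdb.connect("http://emr-master.internal:8765/", autocommit=True)  # hypothetical endpoint
        cur = conn.cursor()
        cur.execute(
            "SELECT DISTINCT user_id FROM user_events WHERE event_ts >= ?",  # illustrative table and columns
            [since_epoch_ms],
        )
        return [row[0] for row in cur.fetchall()]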

    Session analytics with Amazon Redshift

    Dream11 maintains their transactional data warehouse on a multi-node Amazon Redshift cluster. Amazon Redshift allows them to run complex SQL queries efficiently, so it would have been a natural choice for event analytics across hundreds of event types. However, in Dream11's case, event data grows very rapidly, which would mean a lot of data in Amazon Redshift, and this data loses its value quickly over time (relatively speaking) compared with transactional data. Therefore, they decided to do only session analytics in Amazon Redshift, to benefit from its complex SQL query capabilities, and to analyze individual events with Athena (which we discuss in the next section).

    Data received on Kafka_AllEvents_CommonAttributes is loaded into Amazon S3 every 30 minutes by the associated Kafka sink connector. This data is in JSON format with Gzip compression. Every 24 hours, a job runs on Amazon EMR Presto that flattens this data into CSV format, and the data is loaded into Amazon Redshift with the COPY command. The data is loaded first into a staging table, and data in the staging table is aggregated to get session data. Amazon Redshift already holds transactional data in other tables, which, combined with the session data, allows Dream11 to perform 360-degree user analytics. They can now easily segment users based on their interaction and transaction data, and then run campaigns for those users with the help of messaging platforms.
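
    A condensed sketch of that load-and-aggregate step, using psycopg2 with illustrative table, bucket, and IAM role names:

    import os
    import psycopg2  # standard PostgreSQL driver, commonly used to connect to Amazon Redshift

    def load_sessions(day: str) -> None:
        """COPY the flattened common-attributes CSV into a staging table, then aggregate it into sessions."""
        conn = psycopg2.connect(
            host="redshift-cluster.internal",  # hypothetical cluster endpoint
            port=5439,
            dbname="analytics",
            user="etl_user",
            password=os.environ["REDSHIFT_PASSWORD"],
        )
        with conn, conn.cursor() as cur:
            cur.execute(f"""
                COPY staging_common_events
                FROM 's3://data-highway/common-attributes/csv/day={day}/'
                IAM_ROLE 'arn:aws:iam::123456789012:role/redshift-copy-role'
                CSV GZIP;
            """)
            cur.execute("""
                INSERT INTO user_sessions (user_id, session_id, session_start, session_end, event_count)
                SELECT user_id, session_id, MIN(event_ts), MAX(event_ts), COUNT(*)
                FROM staging_common_events
                GROUP BY user_id, session_id;
            """)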

    Event analytics with Athena

    Dream11 uses Athena to analyze the data in Amazon S3. Athena is an interactive query service that makes it easy to analyze data in Amazon S3 using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. It made perfect sense to organize data for hundreds of event tables in Amazon S3 and analyze them with Athena on demand.

    With Athena, you’re charged based on the amount of data scanned by each query. You can get significant cost savings and performance gains by compressing, partitioning, or converting your data to a columnar format, because each of those operations reduces the amount of data that Athena needs to scan to run a query. For more information, see Top 10 Performance Tuning Tips for Amazon Athena.

    As discussed before, Dream11 has registered hundreds of tables for events data in JSON format, and a similar number of tables for events data in Parquet format, with the AWS Glue Data Catalog. They observed a 10-times performance gain and an 80% savings in space on converting the data to Parquet. Data in Amazon S3 can be queried directly through the Athena UI with SQL queries. The other option they use is connecting to Athena using a JDBC driver from Looker and from their custom Java UI for the Data Aware project.
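
    A scheduled report can also start such a query programmatically. The following boto3 sketch uses illustrative database, table, and output bucket names:

    import boto3

    athena = boto3.client("athena", region_name="ap-south-1")  # Region is an assumption

    def run_funnel_query(day: str) -> str:
        """Start an Athena query against a partitioned Parquet events table and return the execution ID."""
        response = athena.start_query_execution(
            QueryString=(
                "SELECT event_name, COUNT(DISTINCT user_id) AS users "
                "FROM all_events_parquet "  # illustrative table registered in the Glue Data Catalog
                f"WHERE day = '{day}' "     # partition pruning keeps scanned bytes, and cost, low
                "GROUP BY event_name"
            ),
            QueryExecutionContext={"Database": "data_highway"},  # hypothetical database name
            ResultConfiguration={"OutputLocation": "s3://data-highway-athena-results/"},
        )
        return response["QueryExecutionId"]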

    Athena helps Dream11 produce funnel analytics and user path analytics reports and visualizations.

     The following diagram summarizes the overall design of the system for real-time and near-real-time analytics and visualization.

     

    Conclusion

    This architecture has enabled Dream11 to achieve all the design goals they set out with. Results of analytics for real-time requirements are available with millisecond latency, and the system costs 40% less than the previous system. Analytics is performed on all the data without sampling, so results are accurate and reliable. All the data and analytics engines are within Dream11's AWS account, improving data security and privacy.

    As of this writing, the system handles 14 TB of data per day and it has served 80 million requests per minute at peak during Dream11 IPL 2020.

    Doing all their analytics in-house on AWS has not just improved speed, accuracy, and data security; it has also enabled newer possibilities. Now Dream11 has a 360-degree view of their users. They can study their users' progress across multiple platforms: web, Android, and iOS. This new system is enabling novel applications of machine learning, digital user engagement, and social media technologies at Dream11.


    About the Authors

    Pradip Thoke is an AVP of Data Engineering at Dream11 and leads their Data Engineering team. The team involved in this implementation includes Vikas Gite, Salman Dhariwala, Naincy Suman, Lavanya Pulijala, Ruturaj Bhokre, Dhanraj Gaikwad, Vishal Verma, Hitesh Bansal, Sandesh Shingare, Renu Yadav, Yash Anand, Akshay Rochwani, Alokh P, Sunaim and Nandeesh Bijoor.

     

    Girish Patil is a Principal Architect AI, Big Data, India Scale Apps for Amazon.

    How FanDuel Group secures personally identifiable information in a data lake using AWS Lake Formation

    Post Syndicated from Damian Grech original https://aws.amazon.com/blogs/big-data/how-fanduel-group-secures-personally-identifiable-information-in-a-data-lake-using-aws-lake-formation/

    This post is co-written with Damian Grech from FanDuel

    FanDuel Group is an innovative sports-tech entertainment company that is changing the way consumers engage with their favorite sports, teams, and leagues. The premier gaming destination in the US, FanDuel Group consists of a portfolio of leading brands across gaming, sports betting, daily fantasy sports, advance-deposit wagering, and TV/media, including FanDuel, Betfair US, and TVG. FanDuel Group has a presence across 50 states and over 8.5 million customers. The company is based in New York with offices in California, New Jersey, Florida, Oregon, and Scotland. FanDuel Group is a subsidiary of Flutter Entertainment plc, the world’s largest sports betting and gaming operator with a portfolio of globally recognized brands and a constituent of the FTSE 100 index of the London Stock Exchange.

    In this post, we discuss how FanDuel used AWS Lake Formation and Amazon Redshift Spectrum to restrict access to personally identifiable information (PII) in their data lake.

    The challenge

    In 2018, a series of mergers led to the creation of FanDuel Group, and the combined data engineering team found themselves operating three data warehouses running on Amazon Redshift. The team decided to create a single new platform to replace the three separate warehouses, consisting of a data warehouse containing the core business data model and a data lake to catalog and hold all other types of data. FanDuel's vision was to create a unified data platform that served their data requirements. This included the ability to ingest and organize real-time and batch datasets, and to secure and govern PII.

    Because the end-users of the existing data warehouses were familiar with Amazon Redshift, it was critical that they be able to access the data lake using Amazon Redshift. Other important architecture considerations included a simplified user experience, the ability to scale to huge data volumes, and a robust security model to provision relevant data to analysts and data scientists.

    To accomplish this vision, FanDuel decided to modernize the data platform and introduce Amazon Simple Storage Service (Amazon S3)-based data lakes. A data lake is a logical construct that allows data to be stored in its native format using open data formats. With a data lake architecture, FanDuel can enable data analysts to analyze large volumes of data without significant modeling. Data lakes also allow FanDuel to store structured and unstructured data.

    Some of the data to be stored in the data lake was customer PII, so access to this category of data needed to be carefully restricted to only employees who required access to perform their job functions. To address these security challenges, FanDuel first tested a tag-based approach on Amazon S3 to restrict access to the PII data. The idea was to write two copies of every dataset, one with PII and one without, apply tags to the files containing PII, and secure those files using AWS Identity and Access Management (IAM) policies. This approach was complex and needed 100–200 hours of development time for every data source that was ingested.

    Solution overview

    FanDuel decided to use Lake Formation and Redshift Spectrum to solve this challenge. The following architectural diagram shows how FanDuel secured their data lake.

    The solution includes the following steps:

    1. The FanDuel team registered the S3 location in Lake Formation.

    After the location is registered, Lake Formation takes control of the data lake, thereby eliminating the need to set up complicated policies in IAM.

    2. FanDuel built AWS Glue ETL jobs to extract data from sources, including MySQL databases and flat files. They used AWS Glue to cleanse and transform raw data to form refined datasets stored in Parquet-formatted files. They also used AWS Glue crawlers to register the cleansed datasets in the Data Catalog.
    3. The team used Lake Formation to set up column-based permissions using two roles (a scripted example appears after the GRANT statements below):
      1. LimitedPIIAnalyst – Granted access to all columns. Only analysts who needed access to PII data were assigned this role.
      2. NonPIIAnalyst – Granted access to non-PII columns. By default, analysts using the data lake were assigned this role.
    4. FanDuel created two external schemas using Redshift Spectrum: one using the NonPIIAnalyst role, and one using the LimitedPIIAnalyst role. The following code is an example of the DDL that uses the roles that were set up in Lake Formation:
      CREATE EXTERNAL SCHEMA nonpii_data_lake FROM DATA CATALOG
      DATABASE 'fanduel_data_lake' REGION 'us-east-1'
      IAM_ROLE 'arn:aws:iam::123456789012:role/NonPIIAnalyst';
      
      CREATE EXTERNAL SCHEMA limitedpii_data_lake FROM DATA CATALOG
      DATABASE 'fanduel_data_lake' REGION 'us-east-1'
      IAM_ROLE 'arn:aws:iam::123456789012:role/LimitedPIIAnalyst';
      

    FanDuel could already manage access permissions by adding or removing users from a group in Amazon Redshift, so they already had a group consisting of only the analysts who should be permitted access to PII. The following code grants this group access to the limitedpii_data_lake schema, which effectively means only this group can query the data lake using the LimitedPIIAnalyst role:

    GRANT USAGE ON SCHEMA nonpii_data_lake TO base_group;
    GRANT SELECT ON ALL TABLES IN SCHEMA nonpii_data_lake TO base_group;
    GRANT USAGE ON SCHEMA limitedpii_data_lake TO pii_permitted_group;
    GRANT SELECT ON ALL TABLES IN SCHEMA limitedpii_data_lake TO pii_permitted_group;
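
    The Lake Formation column-level grants behind these roles can also be applied programmatically. The following boto3 sketch is illustrative only; the role ARN, database, table, and excluded column names are assumptions:

    import boto3

    lf = boto3.client("lakeformation", region_name="us-east-1")

    # Grant the non-PII role SELECT on everything except the PII columns;
    # LimitedPIIAnalyst would be granted the table without exclusions.
    lf.grant_permissions(
        Principal={"DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/NonPIIAnalyst"},
        Resource={
            "TableWithColumns": {
                "DatabaseName": "fanduel_data_lake",
                "Name": "customers",
                "ColumnWildcard": {"ExcludedColumnNames": ["email", "date_of_birth", "address"]},
            }
        },
        Permissions=["SELECT"],
    )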
    

    Benefits

    The ability to extend queries to the data lake with Redshift Spectrum and have column-level access control provides superior control over the S3 tag-based permissions approach that was originally considered. This architecture provided the following benefits for FanDuel:

    • FanDuel could offer new capabilities to data analysts. For example, data analysts could quickly access raw data with PII and combine it with existing data in Amazon Redshift. Lake Formation provided a single view for monitoring the data access patterns.
    • Lake Formation column-level access control allowed them to secure PII data, which otherwise would have taken a complex S3 tag-based approach. This saved 100–200 hours of development time for every new data source and data footprint, because the original approach required creating two files (one with PII and another without PII), tagging files, and setting up permissions based on tags.
    • The ability to extend access from Amazon Redshift to the data lake with appropriate access control has allowed FanDuel to reduce data stored in Amazon Redshift.

    Conclusion

    FanDuel will leverage its new data platform to ingest additional data sources with real-time data so analysts and data scientists can gain insights and improve customer experience.

    Questions or feedback? Send an email to [email protected].


    About the Authors

    Damian Grech is a Data Engineering Senior Manager at FanDuel. Damian has over 15 years of experience in software delivery and has worked with organizations ranging from large enterprises to start-ups at their infant stages. In his spare time, you can find him either experimenting in the kitchen or trailing the Scottish Highlands.

     

     

    Shiv Narayanan is Global Business Development Manager for Data Lakes and Analytics solutions at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern data platforms. Shiv loves music, travel, food and trying out new tech.

     

     

     

    Sidhanth Muralidhar is a Senior Technical Account Manager at Amazon Web Services. He works with large enterprise customers who run their workloads on AWS. He is passionate about working with customers and helping them in their cloud journey. In his spare time, he loves to play and watch football.

     

     

     

     

     

     

    Ingesting Jira data into Amazon S3

    Post Syndicated from Vishwa Gupta original https://aws.amazon.com/blogs/big-data/ingesting-jira-data-into-amazon-s3/

    Consolidating data from a work management tool like Jira and integrating this data with other data sources like ServiceNow, GitHub, Jenkins, and Time Entry Systems enables end-to-end visibility of different aspects of the software development lifecycle and helps keep your projects on schedule and within budget.

    Amazon Simple Storage Service (Amazon S3) is an object storage service that offers industry-leading scalability, performance, security, and data availability. Many of our customers choose to build their data lakes on Amazon S3. They find the flexible, pay-as-you-go, cloud model ideal when dealing with vast amounts of heterogeneous data.

    This post discusses some of the use cases for ingesting Jira data into an Amazon S3 data lake, the ingestion data flow, and a conceptional approach to ingesting data. We also provide the relevant Python code.

    Use cases

    Business use cases for Jira data ingestion range from proactive project monitoring and detection and resolution of project effort and cost variances to identifying non-compliance with SDLC processes. In this section, we provide an illustrative, but not exhaustive, list of use cases and their benefits.

    Cognitive project monitoring

    Cognitive project monitoring use cases include the following:

    • Automated analytics – You can prevent and reduce project schedule and budget variances by proactively monitoring metrics by combining data from Jira, GitHub, Jenkins, and Time Entry Systems.
    • Automated status reporting – You can use Amazon SageMaker machine learning (ML) models to derive prescriptive metrics by looking at data across various sources. This could reduce a project manager’s time spent stitching data and generating reports, and provide a holistic view of project-tracking metrics.

    Automated project compliance and governance

    You can analyze user behavior to detect potentially suspicious patterns by building a baseline of user activity. You create this based on primary data from HR (such as role, location, and work hours) and IT infrastructure (such as an assigned asset’s IP address).

    Possible business outcomes include the following:

    • Proactively identify user IDs and passwords being shared with other users
    • Detect insider threats, such as abnormal login times, unauthorized access to Jira, and incorrect access permissions in Jira, GitHub, Jenkins, or Time Entry Systems
    • Identify compromised accounts based on frequent logins from unassigned assets or unusual successive authentications
    • Identify theft of corporate IPs based on unusual printing volume, printing project-related documents, and emailing organization-related documents and code to external accounts

    Accelerated migration from Jira to another project management product

    You can also use AWS Glue and its Data Catalog metadata to map between two products. This could accelerate your data migration.

    Overview of solution

    One of the most common approaches to ingest data from Jira into AWS is to create a Python module, which is used in AWS Glue or AWS Lambda. The following diagram shows the high-level approach for an end-to-end solution. In this solution, an AWS Glue development endpoint and its associated SageMaker Jupyter notebook instance are used to create the Jira Python module, providing a notebook experience with interactive testing and debugging. The scope of this post is limited to the following steps:

    • Setting up access for Jira
    • Using the Python model with AWS Lambda or AWS Glue
    • Incrementally pulling changed data from Jira with JQL (Jira Query Language)
    • Ingesting data to the AWS serverless data lake

    Ingesting data from Jira into Amazon S3

    The Jira server exposes data using REST APIs and open authorization (OAuth) authentication methods. It uses a three-legged OAuth approach (also called the OAuth dance) to acquire access to the resources served by the APIs. For more information about the following steps, see OAuth for REST APIs.

    Generating an RSA public/private key pair

    Consumer key and consumer secret details are required for interacting with API endpoints. You store the details inside encrypted SSM parameters.

    To use macOS or Linux, run the following OpenSSL commands in the terminal (anywhere in the file system):

    openssl genrsa -out jira_privatekey.pem 1024
    openssl req -newkey rsa:1024 -x509 -key jira_privatekey.pem -out jira_publickey.cer -days 365
    openssl pkcs8 -topk8 -nocrypt -in jira_privatekey.pem -out jira_privatekey.pcks8
    openssl x509 -pubkey -noout -in jira_publickey.cer > jira_publickey.pem

    To use Windows, download OpenSSL and run it using the path to the bin folder. Create a new environment variable named OPENSSL_CONF with the value "path_to"\openssl.cnf. Run the commands as admin:

    "path_to_openssl"\bin\openssl genrsa -out jira_privatekey.pem 1024
    "path_to_openssl"\bin\openssl req -newkey rsa:1024 -x509 -key jira_privatekey.pem -out jira_publickey.cer -days 365
    "path_to_openssl"\bin\openssl pkcs8 -topk8 -nocrypt -in jira_privatekey.pem -out jira_privatekey.pcks8
    "path_to_openssl"\bin\openssl x509 -pubkey -noout -in jira_publickey.cer > jira_publickey.pem
    

    Configuring a REST API-based consumer in Jira

    For full instructions on configuring your REST API-based consumer, see Step 2: Configure your client application as an OAuth consumer in OAuth for REST APIs. Be sure to complete the following steps:

    1. In the Link applications section, select Create incoming link.
    2. For Public key, enter the public key you created earlier.

    Performing the OAuth dance

    In this step, you go through the process of getting the access token from the resource so the consumer can access the resource.

    1. Create the following parameters in the AWS Systems Manager Parameter Store:
      1. jira_access_private_key – Stores the private key in AWS Systems Manager as a parameter.
      2. jira_access_urls – Stores the URLs used to access Jira. These URLs are constructed from the display URL defined in Jira by appending the following paths:
        • request_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/request-token
        • access_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/access-token
        • authorize_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize
        • data_url – https://jiratoawss3.atlassian.net/rest/api/2/search
      3. jira_access_secrets – Stores secrets to access Jira. Initially, only two values are present in this SSM parameter; it’s updated later with access_token. You need the following two parameters to start:
        1. consumer_key
        2. consumer_secret
    2. Download the notebook file and upload it to the SageMaker notebook instance of the AWS Glue development endpoint:
      1. Set up an AWS Glue development endpoint:
        1. On the AWS Glue console, choose Dev endpoints, then choose Add endpoint.
        2. Specify an endpoint name, such as demo-endpoint.
        3. Choose an IAM role with permissions similar to the IAM role that you use to run AWS Glue ETL jobs. For more information, see Create an IAM Role for AWS Glue. Choose Next.
        4. In Networking, leave Skip networking information selected, and choose Next.
        5. In SSH Public Key, enter a public key generated by an SSH key generator program, such as ssh-keygen (do not use an Amazon EC2 key pair). The generated public key is imported into your development endpoint. Save the corresponding private key so you can later connect to the development endpoint using SSH. Choose Next. For more information, see ssh-keygen on Wikipedia.
      2. When the status of the AWS Glue development endpoint is Ready, set up a SageMaker notebook within the development endpoint.
      3. When the notebook status shows Ready, open the notebook and upload the downloaded notebook file.

    You now run the following cells.

    1. Install and import dependent modules with the following code:
      !pip install tlslite
      !pip install oauth2
      
      import urllib.parse
      import oauth2 as oauth
      from tlslite.utils import keyfactory
      import json
      import sys
      import os
      import base64
      import boto3
      from boto3.dynamodb.conditions import Key, Attr
      import datetime
      import logging
      import pprint
      import time
      from pytz import timezone
      
      logger = logging.getLogger()
      logger.setLevel(logging.INFO)
      

    2. Create an SSM client to connect parameters defined in Systems Manager (update the Region if it’s different than us-east-1):
      ssm = boto3.client("ssm", region_name='us-east-1')

    3. Define the signature class to sign the Jira REST API requests:
      class SignatureMethod_RSA_SHA1(oauth.SignatureMethod):
          name = 'RSA-SHA1'
      
          def signing_base(self, request, consumer, token):
              if not hasattr(request, 'normalized_url') or request.normalized_url is None:
                  raise ValueError("Base URL for request is not set.")
      
              sig = (
                  oauth.escape(request.method),
                  oauth.escape(request.normalized_url),
                  oauth.escape(request.get_normalized_parameters()),
              )
      
              key = '%s&' % oauth.escape(consumer.secret)
              if token:
                  key += oauth.escape(token.secret)
              raw = '&'.join(sig)
              return key, raw
      
          def sign(self, request, consumer, token):
      
              key, raw = self.signing_base(request, consumer, token)
      
              # SSM support to fetch private key
              ssm_param = ssm.get_parameter(Name='jira_access_private_key', WithDecryption=True)
              jira_private_key_str = ssm_param['Parameter']['Value']
      
              privateKeyString = jira_private_key_str.strip()
      
              privatekey = keyfactory.parsePrivateKey(privateKeyString)
      
              # Used encode() to convert to bytes
              signature = privatekey.hashAndSign(raw.encode())
              return base64.b64encode(signature) 
      

    4. Get the consumer_key and consumer_secret from the SSM parameter that you defined earlier:
      jira_secrets = json.loads(ssm.get_parameter(Name='jira_access_secrets', WithDecryption=True)['Parameter']['Value'])
      jira_secrets
      
      consumer_key = jira_secrets["consumer_key"]
      consumer_key
      
      consumer_secret = jira_secrets["consumer_secret"]
      consumer_secret

    5. Define the URLs for request_token_url, access_token_url, and authorize_url:
      request_token_url = 'input_here'
      access_token_url = 'input_here'
      authorize_url = 'input_here'
      

      These URLs were defined when setting up the REST API endpoint in Jira. They include the following components:

      • request_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/request-token
      • access_token_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/access-token
      • authorize_url – https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize
    6. Generate your request token:
      # Create Consumer using consumer_key and consumer_secret
      consumer = oauth.Consumer(consumer_key, consumer_secret)
      
      # Use Consumer to create oauth client
      client = oauth.Client(consumer)
      
      # Add Signature Method to the client
      client.set_signature_method(SignatureMethod_RSA_SHA1())
      
      # Get response from request token URL using the client
      resp, content = client.request(request_token_url, "POST")
      
      # Convert the content received from previous step into a Dictionary
      request_token = dict(urllib.parse.parse_qsl(content))
      
      # request token has two components oauth_token and oauth_token_secret
      request_token
      

    You only need to do this one time every five years (the default setting in Jira).

    The following is an example request token value:

    b'oauth_token=oFUFV5cqOuoWycnaCXYrkcioHuRw2TbV&oauth_token_secret=CzhMoEsozCV3xFZ179YQoLzRu4DYQHlR'

    The following are example values after the URL parse and converted to dict:

    • {b'oauth_token': b'oFUFV5cqOuoWycnaCXYrkcioHuRw2TbV ',
    • b'oauth_token_secret': b'CzhMoEsozCV3xFZ179YQoLzRu4DYQHlR'}
    7. Manually approve the request token by opening the following URL in a browser:
      authorize_url + '?oauth_token=' + request_token[b'oauth_token'].decode()

    An example value of the final authorize URL is:

    • https://jiratoawss3.atlassian.net/plugins/servlet/oauth/authorize?oauth_token=wYLlIxmcsnZTHgTy2ZpUmBakqzmqSbww.

    When you go to the URL in your output, you see the following screenshot.

    8. Use the approved request token to generate an access token:
      # Create an oauth token using components of request token
      token = oauth.Token(request_token[b'oauth_token'], request_token[b'oauth_token_secret'])
      
      # Use Consumer and token to create oauth client
      client = oauth.Client(consumer, token)
      
      # Add Signature Method to the client
      client.set_signature_method(SignatureMethod_RSA_SHA1())
      
      # Get response from access token URL using the client
      access_token_resp, access_token_content = client.request(access_token_url, "POST")
      
      access_token_content

    An example access token is:

    b'oauth_token=Ym3UDrs1iYnLUZ1t0TkT1PinfJNN3RLj&oauth_token_secret=FYQfGjLLhbCJg3DXZFaKsE6wsURVfebN&oauth_expires_in=157680000&oauth_session_handle=BulouCOypjssDS3GzeY7Ldi30h0ERWDo&oauth_authorization_expires_in=160272000'

    The following are example access token values after URL parse and converted to dict:

    • {b'oauth_token': b'Ym3UDrs1iYnLUZ1t0TkT1PinfJNN3RLj',
    • b'oauth_token_secret': b'FYQfGjLLhbCJg3DXZFaKsE6wsURVfebN',
    • b'oauth_expires_in': b'157680000',
    • b'oauth_session_handle': b'BulouCOypjssDS3GzeY7Ldi30h0ERWDo',
    • b'oauth_authorization_expires_in': b'160272000'}

    oauth_authorization_expires_in states when the token expires (in seconds), which is generally 5 years.

    9. Update the access_token key in the SSM parameter jira_access_secrets with the value of access_token_content.

    This access token is valid for 5 years (the oauth_expires_in key of access_token_content states when the token expires, in seconds). Rotating the access token depends on your organization's security policy and is out of scope for this post.

    Using the access token, querying data from Jira, and storing data in Amazon S3

    The following are the important points in this step:

    • The Jira REST API returns 50 records at a time, but gives a total record count, which is used to paginate through the result set.
    • The Jira REST API endpoint needs to be updated with JQL filters. JQL allows you to only pick changed records.
    • Data returned from the REST API endpoint is serialized to Amazon S3. The Python code batches the records from Jira pages and commits after every four pages (which is configurable) have been fetched from Jira.
    • JQL is appended to the data_url (defined earlier). When pulling data from Jira, it's good practice to do a one-time bulk load and then pull incremental loads by maintaining the last data pull date in an Amazon DynamoDB table (see the pagination sketch after this list). The following screenshot shows an example of tracking dates in DynamoDB by project. Because it's an hourly batch, last_ingest_date is rounded up to the hour.
    • The key attributes of JQL used for constructing JQL and pulling data from Jira are:
      • project – Loop through the projects from DynamoDB and pull data for one project at a time.
      • updated – Last update date for Jira story or task. Data pull from Jira is based on this date.
        • For bulk loads, updated is less than or equal to the batch run date, rounded up to the hour.
        • For incremental loads, updated is greater than or equal to last_ingest_date from DynamoDB, and less than or equal to the batch run date, rounded up to the hour.
        • To use date in JQL, we need to parse the date before we use it.
      • startAt – Jira generally paginates the results every 50 records. This attribute is used to loop through the complete data. For example, if a project has 500 records and page size is 50 records, this attribute is incremented by page size in every iteration, and it takes 10 iterations to get complete data.
      • maxResults – This is the page size set up in Jira (maximum number of records Jira returns in every API call).
    • Use the provided notebook to perform the OAuth dance. The sample code pulls data from Jira based on the approach you specified earlier. The purpose of this code is to accelerate implementing data ingestion from Jira.
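
    The following sketch condenses those points into one loop. It assumes client is an OAuth client built from the consumer and the stored access token (as in the notebook), and that the bucket, key prefix, and page-batch size shown here are placeholders:

    import gzip
    import json
    import urllib.parse

    import boto3

    s3 = boto3.client("s3")

    def pull_project(client, data_url, project, last_ingest_date, batch_date, bucket):
        """Page through issues updated in the window and write batches of pages to Amazon S3."""
        jql = (
            f'project = "{project}" AND updated >= "{last_ingest_date}" '
            f'AND updated <= "{batch_date}" ORDER BY updated ASC'
        )
        start_at, max_results, page, batch = 0, 50, 0, []
        while True:
            url = f"{data_url}?jql={urllib.parse.quote(jql)}&startAt={start_at}&maxResults={max_results}"
            resp, content = client.request(url, "GET")
            payload = json.loads(content)
            batch.extend(payload["issues"])
            page += 1
            done = start_at + max_results >= payload["total"]
            if page % 4 == 0 or done:  # commit every four pages, or at the end
                key = f"jira/raw/{project}/start_at_{start_at}.json.gz"
                s3.put_object(Bucket=bucket, Key=key, Body=gzip.compress(json.dumps(batch).encode()))
                batch = []
            if done:
                break
            start_at += max_results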

    Cleaning up

    To avoid incurring future charges, delete the resources set up as part of this post:

    • AWS Glue Development Endpoint
    • DynamoDB table
    • Systems Manager parameters
    • S3 bucket

    Next steps

    To extend the usability scope of Jira data in S3 buckets, you can crawl the location to create AWS Glue Data Catalog database tables. Registering the locations with AWS Lake Formation helps simplify permission management and allows you to implement fine-grained access control. You can also use Amazon Athena, Amazon Redshift, Amazon SageMaker, and Amazon QuickSight for data analysis, ML, and reporting services.

    Conclusion

    This post aims to simplify and accelerate the steps to ingest Jira data into Amazon S3. The solution includes Jira configuration, performing the three-legged OAuth dance, JQL-based attributes for data selection, and Python-based data extraction into Amazon S3.

    If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS Glue forum.


    About the Authors

    Vishwa Gupta is a Data and ML Engineer with AWS Professional Services Intelligence Practice. He helps customers implement big data and analytics platform and solutions. Outside of work, he enjoys spending time with family, traveling, and playing badminton.

     

     

     

    Sreeram Thoom is a Data Architect at Amazon Web Services.

     

     

     

     

     

    Working with Lambda layers and extensions in container images

    Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/working-with-lambda-layers-and-extensions-in-container-images/

    In this post, I explain how to use AWS Lambda layers and extensions with Lambda functions packaged and deployed as container images.

    Previously, Lambda functions were packaged only as .zip archives. This includes functions created in the AWS Management Console. You can now also package and deploy Lambda functions as container images.

    You can use familiar container tooling such as the Docker CLI with a Dockerfile to build, test, and tag images locally. Lambda functions built using container images can be up to 10 GB in size. You push images to an Amazon Elastic Container Registry (ECR) repository, a managed AWS container image registry service. You create your Lambda function, specifying the source code as the ECR image URL from the registry.

    Lambda container image support

    Lambda functions packaged as container images do not support adding Lambda layers to the function configuration. However, there are a number of solutions to use the functionality of Lambda layers with container images. You take on the responsibility for packaging your preferred runtimes and dependencies as part of the container image during the build process.

    Understanding how Lambda layers and extensions work as .zip archives

    If you deploy function code using a .zip archive, you can use Lambda layers as a distribution mechanism for libraries, custom runtimes, and other function dependencies.

    When you include one or more layers in a function, during initialization, the contents of each layer are extracted in order to the /opt directory in the function execution environment. Each runtime then looks for libraries in a different location under /opt, depending on the language. You can include up to five layers per function, which count towards the unzipped deployment package size limit of 250 MB. Layers are automatically set as private, but they can be shared with other AWS accounts, or shared publicly.

    Lambda Extensions are a way to augment your Lambda functions and are deployed as Lambda layers. You can use Lambda Extensions to integrate functions with your preferred monitoring, observability, security, and governance tools. You can choose from a broad set of tools provided by AWS, AWS Lambda Ready Partners, and AWS Partners, or create your own Lambda Extensions. For more information, see “Introducing AWS Lambda Extensions – In preview.”

    Extensions can run in either of two modes, internal and external. An external extension runs as an independent process in the execution environment. They can start before the runtime process, and can continue after the function invocation is fully processed. Internal extensions run as part of the runtime process, in-process with your code.

    Lambda searches the /opt/extensions directory and starts initializing any extensions found. Extensions must be executable as binaries or scripts. As the function code directory is read-only, extensions cannot modify function code.

    It helps to understand that Lambda layers and extensions are just files copied into specific file paths in the execution environment during the function initialization. The files are read-only in the execution environment.

    Understanding container images with Lambda

    A container image is a packaged template built from a Dockerfile. The image is assembled or built from commands in the Dockerfile, starting from a parent or base image, or from scratch. Each command then creates a new layer in the image, which is stacked in order on top of the previous layer. Once built from the packaged template, a container image is immutable and read-only.

    For Lambda, a container image includes the base operating system, the runtime, any Lambda extensions, your application code, and its dependencies. Lambda provides a set of open-source base images that you can use to build your container image. Lambda uses the image to construct the execution environment during function initialization. You can use the AWS Serverless Application Model (AWS SAM) CLI or native container tools such as the Docker CLI to build and test container images locally.

    Using Lambda layers in container images

    Container layers are added to a container image, similar to how Lambda layers are added to a .zip archive function.

    There are a number of ways to use container image layering to add the functionality of Lambda layers to your Lambda function container images.

    Use a container image version of a Lambda layer

    A Lambda layer publisher may have a container image format equivalent of a Lambda layer. To maintain the same file path as Lambda layers, the published container images must have the equivalent files located in the /opt directory. An image containing an extension must include the files in the /opt/extensions directory.

    An example Lambda function, packaged as a .zip archive, is created with two layers. One layer contains shared libraries, and the other layer is a Lambda extension from an AWS Partner.

    aws lambda create-function --region us-east-1 --function-name my-function \  
        --role arn:aws:iam::123456789012:role/lambda-role \
        --layers \
            "arn:aws:lambda:us-east-1:123456789012:layer:shared-lib-layer:1" \
            "arn:aws:lambda:us-east-1:987654321987:extensions-layer:1" \
        …
    

    The corresponding Dockerfile syntax for a function packaged as a container image includes the following lines. These pull the container image versions of the Lambda layers and copy them into the function image. The shared library image is pulled from ECR and the extension image is pulled from Docker Hub.

    FROM public.ecr.aws/myrepo/shared-lib-layer:1 AS shared-lib-layer
    # Layer code
    WORKDIR /opt
    COPY --from=shared-lib-layer /opt/ .
    
    FROM aws-partner/extensions-layer:1 as extensions-layer
    # Extension  code
    WORKDIR /opt/extensions
    COPY --from=extensions-layer /opt/extensions/ .
    

    Copy the contents of a Lambda layer into a container image

    You can use existing Lambda layers, and copy the contents of the layers into the function container image /opt directory during docker build.

    You need to build a Dockerfile that includes the AWS Command Line Interface to copy the layer files from Amazon S3.

    The Dockerfile to add two layers into a single image includes the following lines to copy the Lambda layer contents.

    FROM alpine:latest
    
    ARG AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION:-"us-east-1"}
    ARG AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID:-""}
    ARG AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY:-""}
    ENV AWS_DEFAULT_REGION=${AWS_DEFAULT_REGION}
    ENV AWS_ACCESS_KEY_ID=${AWS_ACCESS_KEY_ID}
    ENV AWS_SECRET_ACCESS_KEY=${AWS_SECRET_ACCESS_KEY}
    
    RUN apk add aws-cli curl unzip
    
    RUN mkdir -p /opt
    
    RUN curl $(aws lambda get-layer-version-by-arn --arn arn:aws:lambda:us-east-1:1234567890123:layer:shared-lib-layer:1 --query 'Content.Location' --output text) --output layer.zip
    RUN unzip layer.zip -d /opt
    RUN rm layer.zip
    
    RUN curl $(aws lambda get-layer-version-by-arn --arn arn:aws:lambda:us-east-1:987654321987:layer:extensions-layer:1 --query 'Content.Location' --output text) --output layer.zip
    RUN unzip layer.zip -d /opt
    RUN rm layer.zip
    

    To run the AWS CLI, specify your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY, and include the required AWS_DEFAULT_REGION as command-line arguments.

    docker build . -t layer-image1:latest \
    --build-arg AWS_DEFAULT_REGION=us-east-1 \
    --build-arg AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE \
    --build-arg AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
    

    This creates a container image containing the existing Lambda layer and extension files. This can be pushed to ECR and used in a function.

    Build a container image from a Lambda layer

    You can repackage and publish Lambda layer file content as container images. Creating separate container images for different layers allows you to add them to multiple functions, and share them in a similar way as Lambda layers.

    You can create a separate container image containing the files from a single layer, or combine the files from multiple layers into a single image. If you create separate container images for layer files, you then add these images into your function image.

    There are two ways to manage language code dependencies. You can pre-build the dependencies and copy the files into the container image, or build the dependencies during docker build.

    In this example, I migrate an existing Python application, comprising a Lambda function and extension, from a .zip archive to separate function and extension container images. The extension writes logs to S3.

    You can choose how to store images in repositories. You can either push both images to the same ECR repository with different image tags, or push to different repositories. In this example, I use separate ECR repositories.

    To set up the example, visit the GitHub repo and follow the instructions in the README.md file.

    The existing example extension uses a makefile to install boto3 using pip install with a requirements.txt file. This is migrated to the docker build process. I must add a Python runtime to be able to run pip install as part of the build process. I use python:3.8-alpine as a minimal base image.

    I create separate Dockerfiles for the function and extension. The extension Dockerfile contains the following lines.

    FROM python:3.8-alpine AS installer
    #Layer Code
    COPY extensionssrc /opt/
    COPY extensionssrc/requirements.txt /opt/
    RUN pip install -r /opt/requirements.txt -t /opt/extensions/lib
    
    FROM scratch AS base
    WORKDIR /opt/extensions
    COPY --from=installer /opt/extensions .
    

    I build, tag, login, and push the extension container image to an existing ECR repository.

    docker build -t log-extension-image:latest  .
    docker tag log-extension-image:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-image:latest
    aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789012.dkr.ecr.us-east-1.amazonaws.com
    docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-image:latest
    

    The function Dockerfile contains the following lines, which add the files from the previously created extension image to the function image. There is no need to run pip install for the function as it does not require any additional dependencies.

    FROM 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-image:latest AS layer
    FROM public.ecr.aws/lambda/python:3.8
    # Layer code
    WORKDIR /opt
    COPY --from=layer /opt/ .
    # Function code
    WORKDIR /var/task
    COPY app.py .
    CMD ["app.lambda_handler"]
    

    I build, tag, and push the function container image to a separate existing ECR repository. This creates an immutable image of the Lambda function.

    docker build -t log-extension-function:latest  .
    docker tag log-extension-function:latest 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-function:latest
    docker push 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-function:latest
    

    The function requires a unique S3 bucket to store the logs files, which I create in the S3 console. I create a Lambda function from the ECR repository image, and specify the bucket name as a Lambda environment variable.

    aws lambda create-function --region us-east-1  --function-name log-extension-function \
    --package-type Image --code ImageUri=123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-function:latest \
    --role "arn:aws:iam:: 123456789012:role/lambda-role" \
    --environment  "Variables": {"S3_BUCKET_NAME": "s3-logs-extension-demo-logextensionsbucket-us-east-1"}
    

    For subsequent extension code changes, I need to update both the extension and function images. If only the function code changes, I need to update the function image. I push the function image as the :latest image to ECR. I then update the function code deployment to use the updated :latest ECR image.

    aws lambda update-function-code --function-name log-extension-function --image-uri 123456789012.dkr.ecr.us-east-1.amazonaws.com/log-extension-function:latest

    Using custom runtimes with container images

    With .zip archive functions, custom runtimes are added using Lambda layers. With container images, you no longer need to copy in Lambda layer code for custom runtimes.

    You can build your own custom runtime images starting with AWS provided base images for custom runtimes. You can add your preferred runtime, dependencies, and code to these images. To communicate with Lambda, the image must implement the Lambda Runtime API. We provide Lambda runtime interface clients for all supported runtimes, or you can implement your own for additional runtimes.

    Running extensions in container images

    A Lambda extension running in a function packaged as a container image works in the same way as a .zip archive function. You build a function container image including the extension files, or adding an extension image layer. Lambda looks for any external extensions in the /opt/extensions directory and starts initializing them. Extensions must be executable as binaries or scripts.

    Internal extensions modify the Lambda runtime startup behavior using language-specific environment variables, or wrapper scripts. For language-specific environment variables, you can set the following environment variables in your function configuration to augment the runtime command line.

    • JAVA_TOOL_OPTIONS (Java Corretto 8 and 11)
    • NODE_OPTIONS (Node.js 10 and 12)
    • DOTNET_STARTUP_HOOKS (.NET Core 3.1)

    An example Lambda environment variable for JAVA_TOOL_OPTIONS:

    -javaagent:"/opt/ExampleAgent-0.0.jar"

    Wrapper scripts delegate the runtime start-up to a script. The script can inject and alter arguments, set environment variables, or capture metrics, errors, and other diagnostic information. The following runtimes support wrapper scripts: Node.js 10 and 12, Python 3.8, Ruby 2.7, Java 8 and 11, and .NET Core 3.1

    You specify the script by setting the value of the AWS_LAMBDA_EXEC_WRAPPER environment variable as the file system path of an executable binary or script, for example:

    /opt/wrapper_script

    Conclusion

    You can now package and deploy Lambda functions as container images in addition to .zip archives. Lambda functions packaged as container images do not directly support adding Lambda layers to the function configuration as .zip archives do.

    In this post, I show a number of solutions to use the functionality of Lambda layers and extensions with container images, including example Dockerfiles.

    I show how to migrate an existing Lambda function and extension from a .zip archive to separate function and extension container images. Follow the instructions in the README.md file in the GitHub repository.

    For more serverless learning resources, visit https://serverlessland.com.

    Amazon S3 Update – Strong Read-After-Write Consistency

    Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/amazon-s3-update-strong-read-after-write-consistency/

    When we launched S3 back in 2006, I discussed its virtually unlimited capacity (“…easily store any number of blocks…”), the fact that it was designed to provide 99.99% availability, and that it offered durable storage, with data transparently stored in multiple locations. Since that launch, our customers have used S3 in an amazingly diverse set of ways: backup and restore, data archiving, enterprise applications, web sites, big data, and (at last count) over 10,000 data lakes.

    One of the more interesting (and sometimes a bit confusing) aspects of S3 and other large-scale distributed systems is commonly known as eventual consistency. In a nutshell, after a call to an S3 API function such as PUT that stores or modifies data, there’s a small time window where the data has been accepted and durably stored, but not yet visible to all GET or LIST requests. Here’s how I see it:

    This aspect of S3 can become very challenging for big data workloads (many of which use Amazon EMR) and for data lakes, both of which require access to the most recent data immediately after a write. To help customers run big data workloads in the cloud, Amazon EMR built EMRFS Consistent View and open source Hadoop developers built S3Guard, which provided a layer of strong consistency for these applications.

    S3 is Now Strongly Consistent
    After that overly-long introduction, I am ready to share some good news!

    Effective immediately, all S3 GET, PUT, and LIST operations, as well as operations that change object tags, ACLs, or metadata, are now strongly consistent. What you write is what you will read, and the results of a LIST will be an accurate reflection of what’s in the bucket. This applies to all existing and new S3 objects, works in all regions, and is available to you at no extra charge! There’s no impact on performance, you can update an object hundreds of times per second if you’d like, and there are no global dependencies.

    This improvement is great for data lakes, but other types of applications will also benefit. Because S3 now has strong consistency, migration of on-premises workloads and storage to AWS should now be easier than ever before.

    We’ve been working with the Amazon EMR team and developers in the open-source community to ensure that customers can take advantage of this update with their big data workloads. As a result, you no longer need to use EMRFS Consistent View or S3Guard, further reducing the cost of running big data workloads in AWS.

    A Word From Dropbox
    Long-time AWS customer Dropbox recently migrated a 34 PB analytics data lake from on-premises Hadoop clusters to S3. Watch this video to learn more about strong consistency and how it has allowed Dropbox to simplify their data lake:

    Jeff;

     

     

    New – Amazon S3 Replication Adds Support for Multiple Destination Buckets

    Post Syndicated from Marcia Villalba original https://aws.amazon.com/blogs/aws/new-amazon-s3-replication-adds-support-for-multiple-destination-buckets/

    Amazon Simple Storage Service (S3) supports many types of replication, including S3 Same-Region Replication (SRR), which launched in 2019, and S3 Cross-Region Replication (CRR), which has been around since 2015. Today, we are happy to announce S3 Replication support for multiple destination buckets. S3 Replication now gives you the ability to replicate data from one source bucket to multiple destination buckets. With S3 Replication (multi-destination), you can replicate data within the same AWS Region using S3 SRR, across different AWS Regions by using S3 CRR, or a combination of both.

    Before this launch, if you needed to have multiple copies of your data in different S3 buckets, you had to build your own S3 replication service by monitoring S3 events, identifying created objects, and using AWS Lambda functions to copy objects to each destination bucket.

    This launch removes the need for you to develop your own solutions to replicate the data across multiple destinations. You can use the flexibility of S3 Replication (multi-destination) to store multiple copies of your data in different storage classes, with different encryption types, or across different accounts depending on its intended use. Additionally, when replicating to multiple destinations, you can use CloudWatch metrics to track replication progress for each region pair.

    S3 Replication (multi-destination) is an extension to S3 Replication, and it supports all existing S3 Replication features like Replication Time Control (RTC) and delete marker replication. If you need a predictable replication time backed by a Service Level Agreement, you can use RTC to replicate objects in less than 15 minutes.

    How to Get Started With S3 Replication (multi-destination)
    In order to get S3 Replication working, all the buckets involved in the replication (source and destinations) must have bucket versioning enabled.

    To setup S3 Replication (multi-destination), you need to define replication rules. You can create a new rule in the bucket Management page, under Replication Rules.

    Screenshot of adding a rule

    When creating a new replication rule, one very important step is to set up permissions for replication, as S3 will need to replicate objects on your behalf. To do that, you can follow the instructions available in the S3 documentation page.

    To create the replication rule, just follow the steps in the console. You can specify to which objects of the bucket this rule applies, the destination bucket, if you want to change the storage class of the replicated objects and many other preferences for your replicated objects.
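
    You can also define the same rules programmatically. The following boto3 sketch replicates one source bucket to two destinations; the bucket names, IAM role ARN, and rule details are illustrative:

    import boto3

    s3 = boto3.client("s3")

    s3.put_bucket_replication(
        Bucket="my-source-bucket",
        ReplicationConfiguration={
            "Role": "arn:aws:iam::123456789012:role/s3-replication-role",
            "Rules": [
                {
                    "ID": "replicate-to-us-west-2",
                    "Priority": 1,
                    "Status": "Enabled",
                    "Filter": {"Prefix": ""},  # replicate the whole bucket
                    "DeleteMarkerReplication": {"Status": "Enabled"},
                    "Destination": {"Bucket": "arn:aws:s3:::my-destination-bucket-west"},
                },
                {
                    "ID": "replicate-to-eu-west-1",
                    "Priority": 2,
                    "Status": "Enabled",
                    "Filter": {"Prefix": ""},
                    "DeleteMarkerReplication": {"Status": "Disabled"},
                    "Destination": {
                        "Bucket": "arn:aws:s3:::my-destination-bucket-eu",
                        "StorageClass": "STANDARD_IA",  # replicated copies can use a different storage class
                    },
                },
            ],
        },
    )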

    Screenshot configuring the replication rule

    One thing to keep in mind when activating a rule is that replication starts for all new objects added to the bucket from that moment. Objects uploaded to the bucket before the rule was created need to be copied using one-time operations such as S3 Batch Operations or S3 copy.

    If you want to monitor the progress of your replication using CloudWatch metrics, don’t forget to click the Replication metrics and notifications checkbox.

    Screenshot of configuring replication rules metrics

    Now that we support multiple destinations for replication, rule priorities are used when there are two or more rules with the same destination. When that happens, the rule with the highest priority will be applied. For the same destination bucket, a lower priority rule will not be applied when the replication configuration has two or more rules with overlapping scope. If there are two or more rules with the same scope and different destinations, both rules will be applied.

    You can see a summary of all your rules in the Replication rules listing under the bucket Management page.

    Screenshot of replication rules listing

    Monitoring Replication
    When you have all the rules configured, you can start uploading objects to the source bucket and monitor how they get replicated in all the different destinations.

    To know the replication status of an object in the source bucket, you can see the Replication status in the object Details. The status types are:

    • COMPLETED: The replication was successful in all the destinations.
    • PENDING: The replication is still in progress.
    • FAILED: The replication failed to replicate in at least one of the destinations. When there is a failure in replication, the only way to fix it is by uploading the object again.

    screenshot of object metadata

    For replicated objects, you will see the REPLICA status under the Replication status.

    You can also use CloudWatch metrics to monitor the replication. First, you need to enable metrics for each of the rules. And then in the bucket Metrics, you can choose which rules you want to see the metrics of and see the charts for each of them; the metrics are also available in the CloudWatch console.

    Screenshot of replication metrics
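    As a hedged sketch of reading those metrics programmatically, you can query CloudWatch directly with Boto3. The metric and dimension names below are assumptions based on the S3 replication metrics; verify them against what your CloudWatch console shows for your rules.

    import boto3
    from datetime import datetime, timedelta

    cloudwatch = boto3.client('cloudwatch')

    # Replication latency for one rule over the last hour.
    # Bucket names and the rule ID are placeholders.
    stats = cloudwatch.get_metric_statistics(
        Namespace='AWS/S3',
        MetricName='ReplicationLatency',
        Dimensions=[
            {'Name': 'SourceBucket', 'Value': 'my-source-bucket'},
            {'Name': 'DestinationBucket', 'Value': 'my-destination-bucket-usw2'},
            {'Name': 'RuleId', 'Value': 'replicate-to-us-west-2'},
        ],
        StartTime=datetime.utcnow() - timedelta(hours=1),
        EndTime=datetime.utcnow(),
        Period=300,
        Statistics=['Maximum'],
    )
    print(stats['Datapoints'])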

    Availability
    S3 Replication (multi-destination) is available today in all AWS Regions. To get started, you can use the AWS Management Console, SDKs, S3 API, or AWS CloudFormation to create replication rules from one source bucket to multiple destination buckets.

    Pricing for S3 Replication (multi-destination) applies for each rule. For pricing information, please visit the Amazon S3 pricing page.

    For more information about this new feature, visit the S3 Replication page.

    Marcia

     

    Serving Content Using a Fully Managed Reverse Proxy Architecture in AWS

    Post Syndicated from Leonardo Machado original https://aws.amazon.com/blogs/architecture/serving-content-using-fully-managed-reverse-proxy-architecture/

    With the trends to autonomous teams and microservice style architectures, web frontend tiers are challenged to become more flexible and integrate different components with independent architectures and technology stacks. Two scenarios are prominent:

    • Micro-Frontends, where there is a single page application and components within this page are owned by different teams
    • Web portals, where there is a landing page and subsections of the web presence are owned by different teams. In the following, we will refer to these as components as well.

    What these scenarios have in common is that they consist of loosely coupled components that are seamlessly hidden from the end user behind a common interface. Often, a reverse proxy serves content from one single entry domain but retrieves the content from different origins. In the example in Figure 1 (below), we want to address one specific domain name and, depending on the path prefix, retrieve the content from an on-premises webserver, from a webserver running on Amazon Elastic Compute Cloud (Amazon EC2), or from Amazon S3 Static Hosting, represented in the figure by the prefixes /hotels, /pets, and /cars, respectively. If we forward the path to the webserver without the path prefix, the component does not know what prefix it runs under, and the prefix can be changed at any time without impacting the component, making the component context-unaware.


    Figure 1: Architecture, AWS Amplify Console

    Some common requirements to these approaches are:

    • Components should be technology-agnostic, each component should be able to choose the technology stack independently.
    • Each component can be maintained by a dedicated autonomous team without depending on other teams.
    • All components are served from the same domain name. This can have implications for search engine optimization, for example.
    • Components should be unaware of the context in which they are used.

    The traditional approach would be to run a reverse proxy tier with rewrite rules to different origins. In this post we look into managed alternatives in AWS that take away the heavy lifting of running and scaling the proxy infrastructure.

    Note: AWS Application Load Balancer can be used as a reverse proxy, but it only supports static targets (fixed IP addresses), not dynamic targets (domain names). Thus, we do not consider it here.

    AWS Amplify Console

    The AWS Amplify Console provides a Git-based workflow for hosting fullstack serverless web apps with continuous deployment. Amplify Console also offers a rewrites and redirects feature, which can be used for forwarding incoming requests with different path patterns to different origins (see Figure 2).


    Figure 2: Dashboard, AWS Amplify Console (rewrites and redirects feature)

    Note: In Figure 2, <*> stands for a wildcard that matches any pattern. Target addresses must be HTTPS (no HTTP allowed).

    This architectural option is the simplest to set up and manage, and it is the best approach for teams looking for the least management effort. AWS Amplify Console offers a simple interface for easily mapping incoming patterns to target addresses. It also makes it easy to serve additional static content if needed. However, configuration options are limited, and more complex scenarios cannot be implemented.

    If you want to rewrite paths to remove the path prefix, you can accomplish this by using the wildcard pattern. The source address would contain the path prefix, but the target address would omit the prefix as seen in Figure 2.
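    The same rules can also be managed outside the console. The following Boto3 sketch updates an app's rewrite rules; the app ID and origin URLs are hypothetical, and a status of "200" configures a rewrite (proxy) rather than a redirect.

    import boto3

    amplify = boto3.client('amplify')

    # Hypothetical Amplify app ID and origins. Each <*> captures the remainder
    # of the path, so the /hotels, /pets, and /cars prefixes are dropped before
    # the request reaches the origin.
    amplify.update_app(
        appId='d111abcdef',
        customRules=[
            {'source': '/hotels/<*>', 'target': 'https://hotels.example.com/<*>', 'status': '200'},
            {'source': '/pets/<*>', 'target': 'https://pets.example.com/<*>', 'status': '200'},
            {'source': '/cars/<*>', 'target': 'https://cars.example.com/<*>', 'status': '200'},
        ],
    )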

    When comparing pricing with the other approaches, it is important to consider the outgoing traffic. With higher volumes, this can get expensive.

    Amazon API Gateway

    Amazon API Gateway is a fully managed service that makes it easy for developers to create, publish, maintain, monitor, and secure APIs at any scale. API Gateway’s REST API type allows users to setup HTTP proxy integrations, which can be used for forwarding incoming requests with different path patterns to different origin servers according to the API specifications (Figure 3).


    Figure 3: Dashboard, Amazon API Gateway (HTTP proxy integration)

    Note: In Figure 3, {proxy+} and {proxy} stand for the same wildcard pattern.

    API Gateway, in comparison to Amplify Console, is better suited when you are looking for a higher degree of customization. API Gateway offers multiple customization and monitoring features, such as custom gateway responses and dashboard monitoring.

    Similar to Amplify Console, API Gateway provides a feature to rewrite paths and thus remove context from the path using the {proxy} wildcard.
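    As a rough sketch of wiring this up programmatically, you could create a greedy {proxy+} resource and an HTTP_PROXY integration with Boto3. The API ID, parent resource ID, and origin URL below are placeholders.

    import boto3

    apigw = boto3.client('apigateway')

    api_id = 'a1b2c3d4e5'          # placeholder REST API ID
    hotels_parent_id = 'abc123'    # placeholder ID of the /hotels resource

    # Greedy path variable that matches everything under /hotels.
    proxy_id = apigw.create_resource(
        restApiId=api_id, parentId=hotels_parent_id, pathPart='{proxy+}'
    )['id']

    apigw.put_method(
        restApiId=api_id,
        resourceId=proxy_id,
        httpMethod='ANY',
        authorizationType='NONE',
        requestParameters={'method.request.path.proxy': True},
    )

    # Forward to the origin without the /hotels prefix by mapping {proxy}.
    apigw.put_integration(
        restApiId=api_id,
        resourceId=proxy_id,
        httpMethod='ANY',
        type='HTTP_PROXY',
        integrationHttpMethod='ANY',
        uri='https://hotels.example.com/{proxy}',
        requestParameters={'integration.request.path.proxy': 'method.request.path.proxy'},
    )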

    API Gateway REST API pricing is based on the number of API calls as well as any external data transfers. External data transfers are charged at the EC2 data transfer rate.

    Note: The HTTP integration type in API Gateway REST APIs does not support forwarding trailing slashes. If this is needed for your application, consider other integration types such as AWS Lambda integration or AWS service integration.

    Amazon CloudFront and AWS Lambda@Edge

    Amazon CloudFront is a fast content delivery network (CDN) service that securely delivers data, videos, applications, and APIs to customers globally with low latency and high transfer speeds. CloudFront is able to route incoming requests with different path patterns to different origins or origin groups by configuring its cache behavior rules (Figure 4).


    Figure 4: Dashboard, CloudFront (Cache Behavior)

    Additionally, Amazon CloudFront allows for integration with AWS Lambda@Edge functions. Lambda@Edge runs your code in response to events generated by CloudFront. In this scenario, we can use Lambda@Edge to change the path pattern before forwarding a request to the origin, thus removing the context. For details, see this re:Invent session.
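    A minimal sketch of such a Lambda@Edge function, written in Python and attached to the origin request event, could look like the following. It strips the first path segment (for example /hotels) before the request is forwarded, keeping the component context-unaware.

    import re

    def handler(event, context):
        # CloudFront origin request event structure.
        request = event['Records'][0]['cf']['request']
        # Remove the leading path segment, e.g. /hotels/rooms -> /rooms.
        request['uri'] = re.sub(r'^/[^/]+', '', request['uri']) or '/'
        return request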

    This approach offers the most control over caching behavior and customization. Being able to add your own custom code through a Lambda function adds an entirely new range of possibilities when processing your request. This enables you to do everything from simple HTTP request and response processing at the edge to more advanced functionality, such as website security, real-time image transformation, intelligent bot mitigation, and search engine optimization.

    Amazon CloudFront is charged by request and by Lambda@Edge invocation. Outbound data traffic is charged at the CloudFront regional data transfer out rates.

    Conclusion

    With AWS Amplify Console, Amazon API Gateway, and Amazon CloudFront, we have seen three approaches to implement a reverse proxy pattern using managed services from AWS. The easiest approach to start with is AWS Amplify Console. If you run into more complex scenarios, consider API Gateway. For the most flexibility, and when data traffic cost becomes a factor, look into Amazon CloudFront with Lambda@Edge.

    Fast and Cost-Effective Image Manipulation with Serverless Image Handler

    Post Syndicated from Ajay Swamy original https://aws.amazon.com/blogs/architecture/fast-and-cost-effective-image-manipulation-with-serverless-image-handler/

    As a modern company, you most likely have both a web-based and a mobile app platform to provide content to customers who view it on a range of devices. This means you need to store multiple versions of images, depending on the device. The resulting image management can be a headache, as storing and maintaining those versions is expensive and cumbersome.

    Serverless Image Handler (SIH) is an AWS Solution Implementation you use to store a single version of every image featured in your content, while dynamically delivering different versions at runtime based on your end user’s device. The solution simplifies code, saves on storage costs, and is ideal for use with web applications and mobile apps. SIH features include the ability to resize images, change background colors, apply formatting, and add watermarks.

    Architecture overview

    The SIH solution uses an AWS CloudFormation template to deploy within minutes, and it is intended for those of you who have multiple image assets and need an option to dynamically change or manipulate customer-facing images. SIH deploys best-in-class AWS services such as Amazon CloudFront, Amazon API Gateway, and AWS Lambda functions, and it connects to your Amazon Simple Storage Service (Amazon S3) bucket for storage.

    Deploying this solution with the default parameters builds the following environment in AWS Cloud:

    SIH: Environment in AWS Cloud

    SIH uses the following AWS services:

    • Amazon CloudFront to quickly and securely deliver images to your end users at scale
    • AWS Lambda to run code for image manipulation without the need for provisioning or managing servers (thereby reducing costs and overhead)
    • Your Amazon S3 bucket for storage of your image assets
    • AWS Secrets Manager to support the signing of image URLs so that image access is protected

    How does Serverless Image Handler work?

    When an HTTP request is received from a customer device, it is passed from CloudFront to API Gateway, and then forwarded to the Lambda function for processing. If the image is cached by CloudFront because of an earlier request, CloudFront will return the cached image instead of forwarding the request to the API Gateway. This reduces latency and eliminates the cost of reprocessing the image.

    Requests that are not cached are passed to the API Gateway, and the entire request is forwarded to the Lambda function. The Lambda function retrieves the original image from your Amazon S3 bucket and uses Sharp (the open source image processing software) to return a modified version of the image to the API Gateway. SIH also utilizes Thumbor to apply dynamic filters on the fly. Additionally, the solution generates a CloudFront domain name that supports caching in CloudFront. The newly manipulated image is now cached at CloudFront for easy access and retrieval. The end-to-end request and response can be secured by using the solution’s signed URL feature via AWS Secrets Manager, which allows you to prevent unauthorized use of your proprietary images.

    Lastly, SIH uses Amazon Rekognition for face detection in images submitted for smart cropping, allowing for easy cropping for specific content and image needs.

    Code example of image manipulation

    Please refer to the SIH implementation guide to quickly set up and use SIH. Using Node.js, you can create an image request as illustrated below. The code block specifies the image location as myImageBucket and specifies an edit of grayscale: true to change the image to grayscale.

    const imageRequest = JSON.stringify({
        bucket: "myImageBucket",
        key: "myImage.jpg",
        edits: {
            grayscale: true
        }
    });

    const url = `${CloudFrontUrl}/${Buffer.from(imageRequest).toString('base64')}`;
    

    With the generated URL, SIH can serve the grayscale image.

    Conclusion

    If you’re looking for a fast and cost-effective solution for image management, Serverless Image Handler provides a great way to manipulate and serve images on the fly with speed and security. Learn more about SIH and watch the accompanying Solving with AWS Solutions video below.

    Introducing Amazon S3 Storage Lens – Organization-wide Visibility Into Object Storage

    Post Syndicated from Martin Beeby original https://aws.amazon.com/blogs/aws/s3-storage-lens/

    When starting out in the cloud, a customer’s storage requirements might consist of a handful of S3 buckets, but as they grow, migrate more applications and realize the power of the cloud, things can become more complicated. A customer may have tens or even hundreds of accounts and have multiple S3 buckets across numerous AWS Regions. Customers managing these sorts of environments have told us that they find it difficult to understand how storage is used across their organization, optimize their costs, and improve security posture.

    Drawing from more than 14 years of experience helping customers optimize their storage, the S3 team has built a new feature called Amazon S3 Storage Lens. This is the first cloud storage analytics solution to give you organization-wide visibility into object storage, with point-in-time metrics and trend lines as well as actionable recommendations. All these things combined will help you discover anomalies, identify cost efficiencies and apply data protection best practices.

    With S3 Storage Lens, you can understand, analyze, and optimize storage with 29+ usage and activity metrics and interactive dashboards to aggregate data for your entire organization, specific accounts, Regions, buckets, or prefixes. All of this data is accessible in the S3 Management Console or as raw data in an S3 bucket.

    Every Customer Gets a Default Dashboard

    S3 Storage Lens includes an interactive dashboard which you can find in the S3 console. The dashboard gives you the ability to filter and drill down into your metrics to really understand how your storage is being used. The metrics are organized into categories like data protection and cost efficiency, to allow you to easily find relevant metrics.

    For ease of use, all customers receive a default dashboard. If you are like many customers, this may be the only dashboard that you need, but you can make changes if you want to. For example, you could configure the dashboard to export the data daily to an S3 bucket for analysis with another tool (Amazon QuickSight, Amazon Athena, Amazon Redshift, etc.) or you could upgrade to receive advanced metrics and recommendations.

    Creating a Dashboard
    You can also create your own dashboards from scratch. To do this, I head over to the S3 console and click the Dashboards menu item inside the Storage Lens section, and then click the Create dashboard button.

    Screenshot of the console

    I give my dashboard the name s3-lens-demo and select a home Region. The home Region is where the metrics data for your dashboard will be stored. I choose to enable the dashboard, meaning that it will be updated daily with new metrics.

    A dashboard can analyze storage across accounts, Regions, buckets, and prefixes. I choose to include buckets from all accounts in my organization and across all Regions in the Dashboard scope section.

    S3 Storage Lens has two tiers: Free Metrics, which is free of charge, automatically available for all S3 customers, and contains 15 usage-related metrics; and Advanced metrics and recommendations, which has an additional charge but includes all 29 usage and activity metrics, 15-month data retention, and contextual recommendations. For this demo, I select Advanced metrics and recommendations.

    Screenshot of Management Console

    Finally, I can configure the dashboard metrics to be exported daily to a specific S3 bucket. The metrics can be exported to either CSV or Apache Parquet format for further analysis outside of the console.
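    If you would rather create the configuration programmatically, the S3 Control API exposes the same options. The following Boto3 sketch uses a placeholder account ID and export bucket and enables a daily CSV export; verify the configuration structure against the S3 Storage Lens API reference before using it.

    import boto3

    s3control = boto3.client('s3control')
    account_id = '111122223333'  # placeholder account ID

    s3control.put_storage_lens_configuration(
        ConfigId='s3-lens-demo',
        AccountId=account_id,
        StorageLensConfiguration={
            'Id': 's3-lens-demo',
            'IsEnabled': True,
            'AccountLevel': {'BucketLevel': {}},
            'DataExport': {
                'S3BucketDestination': {
                    'Format': 'CSV',
                    'OutputSchemaVersion': 'V_1',
                    'AccountId': account_id,
                    'Arn': 'arn:aws:s3:::my-storage-lens-export-bucket',  # placeholder bucket
                    'Prefix': 'storage-lens-exports/',
                }
            },
        },
    )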

    An alert pops up to tell me that my dashboard has been created, but it can take up to 48 hours to generate my initial metrics.

    What does a Dashboard Show?

    Once my dashboard has been created, I can start to explore the data. I can filter by Accounts, Regions, Storage classes, Buckets, and Prefixes at the top of the dashboard.

    The next section is a snapshot of metrics such as Total storage and Object count, with a trendline that shows each metric over the last 30 days and a percentage change. By default, the % change column shows the Day/day percentage change, but I can choose to compare Week/week or Month/month instead.

    I can toggle between different Metric groups by selecting either Summary, Cost efficiency, Data protection, or Activity.

    Some of these metrics are pretty typical, like total storage and object counts, and you can already find them in a few places in the S3 console and in Amazon CloudWatch. But in S3 Storage Lens, you can see these metrics aggregated across your organization or account, or at the prefix level, which was not previously possible.

    There are some other metrics you might not expect, like metrics that pertain to S3 feature utilization. For example, we can break out the percentage of objects that use encryption, or the number of objects that are noncurrent versions. These metrics help you understand how your storage is configured, allow you to identify discrepancies, and then drill in for details.

    The dashboard provides contextual recommendations alongside your metrics to indicate actions you can take based on the metric, for example ways to improve cost efficiency, or apply data protection best practices. Any recommendations are shown in the Recommendation column. A few days ago I took the screenshot below which shows a recommendation on one of my dashboards that suggests I should check my buckets’ default encryption configuration.

    The dashboard trends and distribution section allows me to compare two metrics over time in more detail. Here I have selected Total storage as my Primary metric and Object Count as my Secondary metric.

    These two metrics are now plotted on a graph, and I can select a date range to view the trend over time.

    The dashboard also shows me those two metrics and how they are distributed across Storage class and Regions.

    I can click on any value in this graph and Drill down to filter the entire dashboard on that value, or select Analyze by to navigate to a new dashboard view for that dimension.

    The last section of the dashboard allows me to perform a Top N analysis of a metric over a date range, where N is between 1 and 25. In the example below, I have selected the top 3 items in descending order for the Total storage metric.

    I can then see the top three accounts (note: there are only two accounts in my organization) and the Total storage metric for each account.

    It also shows the top 3 Regions for the Total storage metric, and I can see that 51.15% of my data is stored in US East (N. Virginia).

    Lastly, the dashboard contains information about the top 3 buckets and prefixes and the associated trends.

    As I have shown, S3 Storage Lens delivers more than 29 individual metrics on S3 storage usage and activity for all accounts in your organization. These metrics are available in the S3 console to visualize storage usage and activity trends in a dashboard, with contextual recommendations that make it easy to take immediate action. In addition to the dashboard in the S3 console, you can export metrics in CSV or Parquet format to an S3 bucket of your choice for further analysis with other tools including Amazon QuickSight, Amazon Athena, or Amazon Redshift to name a few.

    Video Walkthrough

    If you would like a more in-depth look at S3 Storage Lens, the team has recorded the following video to explain how this new feature works.

    Available Now

    S3 Storage Lens is available in all commercial AWS Regions. You can use S3 Storage Lens with the Amazon S3 API, CLI, or in the S3 Console. For pricing information regarding S3 Storage Lens advanced metrics and recommendations, check out the Amazon S3 pricing page. If you’d like to dive a little deeper, then you should check out the documentation or the S3 Storage Lens webpage.

    Happy Storing

    — Martin

     

    Creating a source to Lakehouse data replication pipe using Apache Hudi, AWS Glue, AWS DMS, and Amazon Redshift

    Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/creating-a-source-to-lakehouse-data-replication-pipe-using-apache-hudi-aws-glue-aws-dms-and-amazon-redshift/

    Most customers have their applications backed by various SQL and NoSQL systems, both on premises and in the cloud. Because the data lives in various independent systems, customers struggle to derive meaningful information by combining data from all of these sources. Hence, customers create data lakes to bring their data into a single place.

    Typically, a replication tool such as AWS Database Migration Service (AWS DMS) can replicate the data from your source systems to Amazon Simple Storage Service (Amazon S3). When the data is in Amazon S3, customers process it based on their requirements. A typical requirement is to sync the data in Amazon S3 with the updates on the source systems. Although it’s easy to apply updates on a relational database management system (RDBMS) that backs an online source application, it’s tough to apply this change data capture (CDC) process on your data lakes. Apache Hudi is a good way to solve this problem. Currently, you can use Hudi on Amazon EMR to create Hudi tables.

    In this post, we use Apache Hudi to create tables in the AWS Glue Data Catalog using AWS Glue jobs. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. This post enables you to take advantage of the serverless architecture of AWS Glue while upserting data in your data lake, hassle-free.

    To write to Hudi tables using AWS Glue jobs, we use a JAR file created using open-source Apache Hudi. This JAR file is used as a dependency in the AWS Glue jobs created through the AWS CloudFormation template provided in this post. Steps to create the JAR file are included in the appendix.

    The following diagram illustrates the architecture the CloudFormation template implements.

    Prerequisites

    The CloudFormation template requires you to select an Amazon Elastic Compute Cloud (Amazon EC2) key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to get to the Aurora cluster that lives in the private subnet. Make sure you have a key in the Region where you deploy the template. If you don’t have one, you can create a new key pair.

    Solution overview

    The following are the high-level implementation steps:

    1. Create a CloudFormation stack using the provided template.
    2. Connect to the Amazon Aurora cluster used as a source for this post.
    3. Run InitLoad_TestStep1.sql, in the source Amazon Aurora cluster, to create a schema and a table.

    AWS DMS replicates the data from the Aurora cluster to the raw S3 bucket. AWS DMS supports a variety of sources.
    The CloudFormation stack creates an AWS Glue job (HudiJob) that is scheduled to run at a frequency set in the ScheduleToRunGlueJob parameter of the CloudFormation stack. This job reads the data from the raw S3 bucket, writes to the Curated S3 bucket, and creates a Hudi table in the Data Catalog. The job also creates an Amazon Redshift external schema in the Amazon Redshift cluster created by the CloudFormation stack.

    1. You can now query the Hudi table in Amazon Athena or Amazon Redshift. Visit Creating external tables for data managed in Apache Hudi or Considerations and Limitations to query Apache Hudi datasets in Amazon Athena for details.
    2. Run IncrementalUpdatesAndInserts_TestStep2.sql on the source Aurora cluster.

    This incremental data is also replicated to the raw S3 bucket through AWS DMS. HudiJob picks up the incremental data, using AWS Glue bookmarks, and applies it to the Hudi table created earlier.

    1. You can now query the changed data.

    Creating your CloudFormation stack

    Click on the Launch Stack button to get started and provide the following parameters:

    • VpcCIDR: CIDR range for the VPC.
    • PrivateSubnet1CIDR: CIDR range for the first private subnet.
    • PrivateSubnet2CIDR: CIDR range for the second private subnet.
    • PublicSubnetCIDR: CIDR range for the public subnet.
    • AuroraDBMasterUserPassword: Primary user password for the Aurora cluster.
    • RedshiftDWMasterUserPassword: Primary user password for the Amazon Redshift data warehouse.
    • KeyName: The EC2 key pair to be configured in the EC2 instance on the public subnet. This EC2 instance is used to get to the Aurora cluster in the private subnet. Select the value from the dropdown.
    • ClientIPCIDR: Your IP address in CIDR notation. The CloudFormation template creates a security group rule that grants ingress on port 22 to this IP address. On a Mac, you can run the following command to get your IP address: curl ipecho.net/plain ; echo /32
    • EC2ImageId: The image ID used to create the EC2 instance in the public subnet to be a jump box to connect to the source Aurora cluster. If you supply your own image ID, the template uses it to create the EC2 instance.
    • HudiStorageType: Used by the AWS Glue job to determine whether to create a CoW or MoR storage type table. Enter MoR if you want to create MoR storage type tables.
    • ScheduleToRunGlueJob: The AWS Glue job runs on a schedule to pick up the new files and load them to the curated bucket. This parameter sets the schedule of the job.
    • DMSBatchUnloadIntervalInSecs: AWS DMS batches the inputs from the source and loads the output to the raw bucket. This parameter defines the frequency at which the data is loaded to the raw bucket.
    • GlueJobDPUs: The number of DPUs that are assigned to the two AWS Glue jobs.

    To simplify running the template, your account is given permissions on the key used to encrypt the resources in the CloudFormation template. You can restrict that to the role if desired.

    Granting Lake Formation permissions

    AWS Lake Formation enables customers to set up fine-grained access control for their data lake. Detailed steps to set up AWS Lake Formation can be found here.

    Setting up AWS Lake Formation is out of scope for this post. However, if you have Lake Formation configured in the Region where you’re deploying this template, grant Create database permission to the LakeHouseExecuteGlueHudiJobRole role after the CloudFormation stack is successfully created.

    This will ensure that you don’t get the following error while running your AWS Glue job.

    org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Insufficient Lake Formation permission(s) on global_temp

    Similarly, grant Describe permission to the LakeHouseExecuteGlueHudiJobRole role on the default database.

    This will ensure that you don’t get the following error while running your AWS Glue job.

    AnalysisException: 'java.lang.RuntimeException: MetaException(message:Unable to verify existence of default database: com.amazonaws.services.glue.model.AccessDeniedException: Insufficient Lake Formation permission(s) on default (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException;

    Connecting to source Aurora cluster

    To connect to source Aurora cluster using SQL Workbench, complete the following steps:

    1. On SQL Workbench, under File, choose Connect window.

    1. Choose Manage Drivers.

    1. Choose PostgreSQL.
    2. For Library, use the driver JAR file.
    3. For Classname, enter org.postgresql.Driver.
    4. For Sample URL, enter jdbc:postgresql://host:port/name_of_database.

    1. Click the Create a new connection profile button.
    2. For Driver, choose your new PostgreSQL driver.
    3. For URL, enter lakehouse_source_db after port/.
    4. For Username, enter postgres.
    5. For Password, enter the same password that you used for the AuroraDBMasterUserPassword parameter while creating the CloudFormation stack.
    6. Choose SSH.
    7. On the Outputs tab of your CloudFormation stack, copy the IP address next to PublicIPOfEC2InstanceForTunnel and enter it for SSH hostname.
    8. For SSH port, enter 22.
    9. For Username, enter ec2-user.
    10. For Private key file, enter the private key for the public key chosen in the KeyName parameter of the CloudFormation stack.
    11. For Local port, enter any available local port number.
    12. On the Outputs tab of your stack, copy the value next to EndpointOfAuroraCluster and enter it for DB hostname.
    13. For DB port, enter 5432.
    14. Select Rewrite JDBC URL.


    Checking the Rewrite JDBC URL checkbox will automatically feed in the value of host and port in the URL text box as shown below.

    1. Test the connection and make sure that you get a message that the connection was successful.

     

    Troubleshooting

    Complete the following steps if you receive this message: Could not initialize SSH tunnel: java.net.ConnectException: Operation timed out (Connection timed out)

    1. Go to your CloudFormation stack and search for LakeHouseSecurityGroup under Resources.
    2. Choose the link in the Physical ID.

    1. Select your security group.
    2. From the Actions menu, choose Edit inbound rules.

    1. Look for the rule with the description: Rule to allow connection from the SQL client to the EC2 instance used as jump box for SSH tunnel
    2. From the Source menu, choose My IP.
    3. Choose Save rules.

    1. Test the connection from your SQL Workbench again and make sure that you get a successful message.

    Running the initial load script

    You’re now ready to run the InitLoad_TestStep1.sql script to create some test data.

    1. Open InitLoad_TestStep1.sql in your SQL client and run it.

    The output shows that 11 statements have been run.

    AWS DMS replicates these inserts to your raw S3 bucket at the frequency set in the DMSBatchUnloadIntervalInSecs parameter of your CloudFormation stack.

    1. On the AWS DMS console, choose the lakehouse-aurora-src-to-raw-s3-tgt task:
    2. On the Table statistics tab, you should see the seven full load rows of employee_details have been replicated.

    The lakehouse-aurora-src-to-raw-s3-tgt replication task has the following table mapping with transformation to add a schema name and a table name as additional columns:

    {
       "rules":[
          {
             "rule-type":"selection",
             "rule-id":"1",
             "rule-name":"1",
             "object-locator":{
                "schema-name":"human_resources",
                "table-name":"%"
             },
             "rule-action":"include",
             "filters":[
                
             ]
          },
          {
             "rule-type":"transformation",
             "rule-id":"2",
             "rule-name":"2",
             "rule-target":"column",
             "object-locator":{
                "schema-name":"%",
                "table-name":"%"
             },
             "rule-action":"add-column",
             "value":"schema_name",
             "expression":"$SCHEMA_NAME_VAR",
             "data-type":{
                "type":"string",
                "length":50
             }
          },
          {
             "rule-type":"transformation",
             "rule-id":"3",
             "rule-name":"3",
             "rule-target":"column",
             "object-locator":{
                "schema-name":"%",
                "table-name":"%"
             },
             "rule-action":"add-column",
             "value":"table_name",
             "expression":"$TABLE_NAME_VAR",
             "data-type":{
                "type":"string",
                "length":50
             }
          }
       ]
    }

    These settings put the name of the source schema and table as two additional columns in the output Parquet file of AWS DMS.
    These columns are used in the AWS Glue HudiJob to find out the tables that have new inserts, updates, or deletes.

    1. On the Resources tab of the CloudFormation stack, locate RawS3Bucket.
    2. Choose the Physical ID link.

    1. Navigate to human_resources/employee_details.

    The LOAD00000001.parquet file is created under human_resources/employee_details. (The name of your raw bucket is different from the following screenshot).

    You can also see the time of creation of this file. You should have at least one successful run of the AWS Glue job (HudiJob) after this time for the Hudi table to be created. The AWS Glue job is configured to load this data into the curated bucket at the frequency set in the ScheduleToRunGlueJob parameter of your CloudFormation stack. The default is 5 minutes.

    AWS Glue job HudiJob

    The following code is the script for HudiJob:

    import sys
    import os
    import json
    
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark.sql.functions import concat, col, lit, to_timestamp
    
    from awsglue.utils import getResolvedOptions
    from awsglue.context import GlueContext
    from awsglue.job import Job
    from awsglue.dynamicframe import DynamicFrame
    
    import boto3
    from botocore.exceptions import ClientError
    
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    spark = SparkSession.builder.config('spark.serializer','org.apache.spark.serializer.KryoSerializer').getOrCreate()
    glueContext = GlueContext(spark.sparkContext)
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    logger = glueContext.get_logger()
    
    logger.info('Initialization.')
    glueClient = boto3.client('glue')
    ssmClient = boto3.client('ssm')
    redshiftDataClient = boto3.client('redshift-data')
    
    logger.info('Fetching configuration.')
    region = os.environ['AWS_DEFAULT_REGION']
    
    curatedS3BucketName = ssmClient.get_parameter(Name='lakehouse-curated-s3-bucket-name')['Parameter']['Value']
    rawS3BucketName = ssmClient.get_parameter(Name='lakehouse-raw-s3-bucket-name')['Parameter']['Value']
    hudiStorageType = ssmClient.get_parameter(Name='lakehouse-hudi-storage-type')['Parameter']['Value']
    
    dropColumnList = ['db','table_name','Op']
    
    logger.info('Getting list of schema.tables that have changed.')
    changeTableListDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://'+rawS3BucketName], 'groupFiles': 'inPartition', 'recurse':True}, format = 'parquet', format_options={}, transformation_ctx = 'changeTableListDyf')
    
    logger.info('Processing starts.')
    if(changeTableListDyf.count() > 0):
        logger.info('Got new files to process.')
        changeTableList = changeTableListDyf.toDF().select('schema_name','table_name').distinct().rdd.map(lambda row : row.asDict()).collect()
    
        for dbName in set([d['schema_name'] for d in changeTableList]):
            spark.sql('CREATE DATABASE IF NOT EXISTS ' + dbName)
            redshiftDataClient.execute_statement(ClusterIdentifier='lakehouse-redshift-cluster', Database='lakehouse_dw', DbUser='rs_admin', Sql='CREATE EXTERNAL SCHEMA IF NOT EXISTS ' + dbName + ' FROM DATA CATALOG DATABASE \'' + dbName + '\' REGION \'' + region + '\' IAM_ROLE \'' + boto3.client('iam').get_role(RoleName='LakeHouseRedshiftGlueAccessRole')['Role']['Arn'] + '\'')
    
        for i in changeTableList:
            logger.info('Looping for ' + i['schema_name'] + '.' + i['table_name'])
            dbName = i['schema_name']
            tableNameCatalogCheck = ''
            tableName = i['table_name']
            if(hudiStorageType == 'MoR'):
                tableNameCatalogCheck = i['table_name'] + '_ro' #Assumption is that if _ro table exists then _rt table will also exist. Hence we are checking only for _ro.
            else:
                tableNameCatalogCheck = i['table_name'] #The default config in the CF template is CoW. So assumption is that if the user hasn't explicitly requested to create MoR storage type table then we will create CoW tables. Again, if the user overwrites the config with any value other than 'MoR' we will create CoW storage type tables.
            isTableExists = False
            isPrimaryKey = False
            isPartitionKey = False
            primaryKey = ''
            partitionKey = ''
            try:
                glueClient.get_table(DatabaseName=dbName,Name=tableNameCatalogCheck)
                isTableExists = True
                logger.info(dbName + '.' + tableNameCatalogCheck + ' exists.')
            except ClientError as e:
                if e.response['Error']['Code'] == 'EntityNotFoundException':
                    isTableExists = False
                    logger.info(dbName + '.' + tableNameCatalogCheck + ' does not exist. Table will be created.')
            try:
                table_config = json.loads(ssmClient.get_parameter(Name='lakehouse-table-' + dbName + '.' + tableName)['Parameter']['Value'])
                try:
                    primaryKey = table_config['primaryKey']
                    isPrimaryKey = True
                    logger.info('Primary key:' + primaryKey)
                except KeyError as e:
                    isPrimaryKey = False
                    logger.info('Primary key not found. An append only glueparquet table will be created.')
                try:
                    partitionKey = table_config['partitionKey']
                    isPartitionKey = True
                    logger.info('Partition key:' + partitionKey)
                except KeyError as e:
                    isPartitionKey = False
                    logger.info('Partition key not found. Partitions will not be created.')
            except ClientError as e:    
                if e.response['Error']['Code'] == 'ParameterNotFound':
                    isPrimaryKey = False
                    isPartitionKey = False
                    logger.info('Config for ' + dbName + '.' + tableName + ' not found in parameter store. Non partitioned append only table will be created.')
    
            inputDyf = glueContext.create_dynamic_frame_from_options(connection_type = 's3', connection_options = {'paths': ['s3://' + rawS3BucketName + '/' + dbName + '/' + tableName], 'groupFiles': 'none', 'recurse':True}, format = 'parquet',transformation_ctx = tableName)
            
            inputDf = inputDyf.toDF().withColumn('update_ts_dms',to_timestamp(col('update_ts_dms')))
            
            targetPath = 's3://' + curatedS3BucketName + '/' + dbName + '/' + tableName
    
            morConfig = {'hoodie.datasource.write.storage.type': 'MERGE_ON_READ', 'hoodie.compact.inline': 'false', 'hoodie.compact.inline.max.delta.commits': 20, 'hoodie.parquet.small.file.limit': 0}
    
            commonConfig = {'className' : 'org.apache.hudi', 'hoodie.datasource.hive_sync.use_jdbc':'false', 'hoodie.datasource.write.precombine.field': 'update_ts_dms', 'hoodie.datasource.write.recordkey.field': primaryKey, 'hoodie.table.name': tableName, 'hoodie.consistency.check.enabled': 'true', 'hoodie.datasource.hive_sync.database': dbName, 'hoodie.datasource.hive_sync.table': tableName, 'hoodie.datasource.hive_sync.enable': 'true'}
    
            partitionDataConfig = {'hoodie.datasource.write.partitionpath.field': partitionKey, 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor', 'hoodie.datasource.hive_sync.partition_fields': partitionKey}
                         
            unpartitionDataConfig = {'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.NonPartitionedExtractor', 'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator'}
            
            incrementalConfig = {'hoodie.upsert.shuffle.parallelism': 20, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS', 'hoodie.cleaner.commits.retained': 10}
            
            initLoadConfig = {'hoodie.bulkinsert.shuffle.parallelism': 3, 'hoodie.datasource.write.operation': 'bulk_insert'}
            
            deleteDataConfig = {'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload'}
    
            if(hudiStorageType == 'MoR'):
                commonConfig = {**commonConfig, **morConfig}
                logger.info('MoR config appended to commonConfig.')
            
            combinedConf = {}
    
            if(isPrimaryKey):
                logger.info('Going the Hudi way.')
                if(isTableExists):
                    logger.info('Incremental load.')
                    outputDf = inputDf.filter("Op != 'D'").drop(*dropColumnList)
                    if outputDf.count() > 0:
                        logger.info('Upserting data.')
                        if (isPartitionKey):
                            logger.info('Writing to partitioned Hudi table.')
                            outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                            combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}
                            outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                        else:
                            logger.info('Writing to unpartitioned Hudi table.')
                            combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig}
                            outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                    outputDf_deleted = inputDf.filter("Op = 'D'").drop(*dropColumnList)
                    if outputDf_deleted.count() > 0:
                        logger.info('Some data got deleted.')
                        if (isPartitionKey):
                            logger.info('Deleting from partitioned Hudi table.')
                            outputDf_deleted = outputDf_deleted.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                            combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig, **deleteDataConfig}
                            outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                        else:
                            logger.info('Deleting from unpartitioned Hudi table.')
                            combinedConf = {**commonConfig, **unpartitionDataConfig, **incrementalConfig, **deleteDataConfig}
                            outputDf_deleted.write.format('org.apache.hudi').options(**combinedConf).mode('Append').save(targetPath)
                else:
                    outputDf = inputDf.drop(*dropColumnList)
                    if outputDf.count() > 0:
                        logger.info('Inital load.')
                        if (isPartitionKey):
                            logger.info('Writing to partitioned Hudi table.')
                            outputDf = outputDf.withColumn(partitionKey,concat(lit(partitionKey+'='),col(partitionKey)))
                            combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}
                            outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
                        else:
                            logger.info('Writing to unpartitioned Hudi table.')
                            combinedConf = {**commonConfig, **unpartitionDataConfig, **initLoadConfig}
                            outputDf.write.format('org.apache.hudi').options(**combinedConf).mode('Overwrite').save(targetPath)
            else:
                if (isPartitionKey):
                    logger.info('Writing to partitioned glueparquet table.')
                    sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE', partitionKeys=[partitionKey])
                else:
                    logger.info('Writing to unpartitioned glueparquet table.')
                    sink = glueContext.getSink(connection_type = 's3', path= targetPath, enableUpdateCatalog = True, updateBehavior = 'UPDATE_IN_DATABASE')
                sink.setFormat('glueparquet')
                sink.setCatalogInfo(catalogDatabase = dbName, catalogTableName = tableName)
                outputDyf = DynamicFrame.fromDF(inputDf.drop(*dropColumnList), glueContext, 'outputDyf')
                sink.writeFrame(outputDyf)
    
    job.commit()

    Hudi tables need a primary key to perform upserts. Hudi tables can also be partitioned based on a certain key. We get the names of the primary key and the partition key from AWS Systems Manager Parameter Store.

    The HudiJob script looks for an AWS Systems Manager Parameter with the naming format lakehouse-table-<schema_name>.<table_name>. It builds the parameter name from the schema_name and table_name columns, added by AWS DMS, and reads the primary key and the partition key for the Hudi table from the parameter value.

    The CloudFormation template creates the lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, as shown on the Resources tab.

    If you choose the Physical ID link, you can locate the value of the AWS Systems Manager Parameter. The parameter has the value {"primaryKey": "emp_no", "partitionKey": "department"}.

    Because of the value in the lakehouse-table-human_resources.employee_details AWS Systems Manager Parameter, the AWS Glue script creates a human_resources.employee_details Hudi table partitioned on the department column for the employee_details table created in the source using the InitLoad_TestStep1.sql script. The HudiJob also uses the emp_no column as the primary key for upserts.
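    For reference, the same parameter could be created with Boto3 as shown in the following sketch; the values match the sample employee_details table used in this post.

    import json
    import boto3

    ssm = boto3.client('ssm')

    # Parameter name follows the lakehouse-table-<schema_name>.<table_name> convention.
    ssm.put_parameter(
        Name='lakehouse-table-human_resources.employee_details',
        Type='String',
        Overwrite=True,
        Value=json.dumps({'primaryKey': 'emp_no', 'partitionKey': 'department'}),
    )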

    If you reuse this CloudFormation template and create your own table, you have to create an associated AWS Systems Manager Parameter with the naming convention lakehouse-table-<schema_name>.<table_name>. Keep in mind the following:

    • If you don’t create a parameter, the script creates an unpartitioned glueparquet append-only table.
    • If you create a parameter that only has the primaryKey part in the value, the script creates an unpartitioned Hudi table.
    • If you create a parameter that only has the partitionKey part in the value, the script creates a partitioned glueparquet append-only table.

    If you have too many tables to replicate, you can also store the primary key and partition key configuration in Amazon DynamoDB or Amazon S3 and change the code accordingly.

    In the InitLoad_TestStep1.sql script, the replica identity for the human_resources.employee_details table is set to full. This makes sure that AWS DMS transfers the full delete record to Amazon S3. Having this delete record is important for the HudiJob script to delete the record from the Hudi table. A full delete record from AWS DMS for the human_resources.employee_details table looks like the following:

    { "Op": "D", "update_ts_dms": "2020-10-25 07:57:48.589284", "emp_no": 3, "name": "Jeff", "department": "Finance", "city": "Tokyo", "salary": 55000, "schema_name": "human_resources", "table_name": "employee_details"}

    The schema_name and table_name columns are added by AWS DMS because of the task configuration shared previously. update_ts_dms has been set as the value for the TimestampColumnName S3 setting in the AWS DMS S3 endpoint. Op is added by AWS DMS for CDC and indicates the source DB operation in the migrated S3 data.

    We also set spark.serializer in the script. This setting is required for Hudi.

    In the HudiJob script, you can also find a few Python dicts that store various Hudi configuration properties. These configurations are just for demo purposes; you have to adjust them based on your workload. For more information about Hudi configurations, see Configurations.

    HudiJob is scheduled to run every 5 minutes by default. The frequency is set by the ScheduleToRunGlueJob parameter of the CloudFormation template. Make sure that you successfully run HudiJob at least one time after the source data lands in the raw S3 bucket. The screenshot in Step 6 of the Running the initial load script section confirms that AWS DMS put the LOAD00000001.parquet file in the raw bucket at 11:54:41 AM, and the following screenshot confirms that the job execution started at 11:55 AM.

    The job creates a Hudi table in the AWS Glue Data Catalog (see the following screenshot). The table is partitioned on the department column.

    Granting AWS Lake Formation permissions

    If you have AWS Lake Formation enabled, make sure that you grant Select permission on the human_resources.employee_details table to the role/user used to run Athena query. Similarly, you also have to grant Select permission on the human_resources.employee_details table to the LakeHouseRedshiftGlueAccessRole role so you can query human_resources.employee_details in Amazon Redshift.

    Grant Drop permission on the human_resources database to LakeHouseExecuteLambdaFnsRole so that the template can delete the database when you delete the template. Also, the CloudFormation template does not roll back any AWS Lake Formation grants or changes that are manually applied.

    Granting access to KMS key

    The curated S3 bucket is encrypted by lakehouse-key, which is an AWS Key Management Service (AWS KMS) customer managed key created by AWS CloudFormation template.

    To run the query in Athena, you have to add the ARN of the role/user used to run the Athena query in the Allow use of the key section in the key policy.

    This will ensure that you don’t get com.amazonaws.services.s3.model.AmazonS3Exception: Access Denied (Service: Amazon S3; Status Code: 403; Error Code: AccessDenied; error while running your Athena query.

    You might not have to execute the above KMS policy change if you have kept the default of granting access to the AWS account and the role/user used to run Athena query has the necessary KMS related policies attached to it.

    Confirming job completion

    When HudiJob is complete, you can see the files in the curated bucket.

    1. On the Resources tab, search for CuratedS3Bucket.
    2. Choose the Physical ID link.

    The following screenshot shows the timestamp on the initial load.

    1. Navigate to the department=Finance prefix and select the Parquet file.
    2. Choose Select from.
    1. For File format, select Parquet.
    2. Choose Show file preview.

    You can see the value of the timestamp in the update_ts_dms column.

    Querying the Hudi table

    You can now query your data in Amazon Athena or Amazon Redshift.

    Querying in Amazon Athena

    Query the human_resources.employee_details table in Amazon Athena with the following code:

    SELECT emp_no,
             name,
             city,
             salary,
             department,
             from_unixtime(update_ts_dms/1000000,'America/Los_Angeles') update_ts_dms_LA,
             from_unixtime(update_ts_dms/1000000,'UTC') update_ts_dms_UTC         
    FROM "human_resources"."employee_details"
    ORDER BY emp_no

    The timestamp for all the records matches the timestamp in the update_ts_dms column in the earlier screenshot.

    Querying in Redshift Spectrum

    You can query your table in Redshift Spectrum; see Creating external tables for data managed in Apache Hudi for details about Apache Hudi support in Amazon Redshift.

    1. On the Amazon Redshift console, locate lakehouse-redshift-cluster.
    2. Choose Query cluster.

    1. For Database name, enter lakehouse_dw.
    2. For Database user, enter rs_admin.
    3. For Database password, enter the password that you used for the RedshiftDWMasterUserPassword parameter in the CloudFormation template.

    1. Enter the following query for the human_resources.employee_details table:
      SELECT emp_no,
               name,
               city,
               salary,
               department,
               (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' AT TIME ZONE 'america/los_angeles' update_ts_dms_LA,
               (TIMESTAMP 'epoch' + update_ts_dms/1000000 * interval '1 second') AT TIME ZONE 'utc' update_ts_dms_UTC
      FROM human_resources.employee_details
      ORDER BY emp_no 

    The following screenshot shows the query output.

    Running the incremental load script

    We now run the IncrementalUpdatesAndInserts_TestStep2.sql script. The output shows that 6 statements were run.

    AWS DMS now shows that it has replicated the new incremental changes. The changes are replicated at a frequency set in DMSBatchUnloadIntervalInSecs parameter of the CloudFormation stack.

    This creates another Parquet file in the raw S3 bucket.

    The incremental updates are loaded into the Hudi table according to the chosen frequency to run the job (the ScheduleToRunGlueJob parameter). The HudiJob script uses job bookmarks to find the incremental load, so it only processes the new files brought in through AWS DMS.

    Confirming job completion

    Make sure that HudiJob runs successfully at least one time after the incremental file arrives in the raw bucket. The previous screenshot shows that the incremental file arrived in the raw bucket at 1:18:38 PM and the following screenshot shows that the job started at 1:20 PM.

    Querying the changed data

    You can now check the table in Athena and Amazon Redshift. Both results show that emp_no 3 is deleted, 8 and 9 have been added, and 2 and 5 have been updated.

    The following screenshot shows the results in Athena.

    The following screenshot shows the results in Redshift Spectrum.

    AWS Glue Job HudiMoRCompactionJob

    The CloudFormation template also deploys the AWS Glue job HudiMoRCompactionJob. This job is not scheduled; you only use it if you choose the MoR storage type. To execute the pipe for MoR storage type instead of CoW storage type, delete the CloudFormation stack and create it again. After creation, replace CoW in lakehouse-hudi-storage-type AWS Systems Manager Parameter with MoR.

    If you use the MoR storage type, the incremental updates are stored in log files. You can’t see the updates in the _ro (read optimized) view, but you can see them in the _rt view. The Amazon Athena documentation and Amazon Redshift documentation give more details about support and considerations for Apache Hudi.

    To see the incremental data in the _ro view, run the HudiMoRCompactionJob job. For more information about Hudi storage types and views, see Hudi Dataset Storage Types and Storage Types & Views. The following code is an example of the CLI command used to run HudiMoRCompactionJob job:

    aws glue start-job-run --job-name HudiMoRCompactionJob --arguments="--DB_NAME=human_resources","--TABLE_NAME=employee_details","--IS_PARTITIONED=true"

    You can decide on the frequency of running this job. You don’t have to run the job immediately after the HudiJob. You should run this job when you want the data to be available in the _ro view. You have to pass the schema name and the table name to this script so it knows the table to compact.
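    If you want to run the compaction on a schedule instead of on demand, one option is a scheduled AWS Glue trigger. The following Boto3 sketch uses an illustrative six-hour schedule and the sample table from this post; adjust both to match when you need the data visible in the _ro view.

    import boto3

    glue = boto3.client('glue')

    glue.create_trigger(
        Name='HudiMoRCompactionTrigger',
        Type='SCHEDULED',
        Schedule='cron(0 */6 * * ? *)',  # every 6 hours; pick your own frequency
        StartOnCreation=True,
        Actions=[
            {
                'JobName': 'HudiMoRCompactionJob',
                'Arguments': {
                    '--DB_NAME': 'human_resources',
                    '--TABLE_NAME': 'employee_details',
                    '--IS_PARTITIONED': 'true',
                },
            }
        ],
    )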

    Additional considerations

    The JAR file we use in this post has not been tested for AWS Glue streaming jobs. Additionally, there are some hardcoded Hudi options in the HudiJob script. These options are set for the sample table that we create for this post. Update the options based on your workload. 

    Conclusion

    In this post, we created AWS Glue 2.0 jobs that moved the source upserts and deletes into Hudi tables. The code creates tables in the AWS Glue Data Catalog and updates partitions so you don’t have to run the crawlers to update them.

    This post simplified your LakeHouse code base by giving you the benefits of Apache Hudi along with serverless AWS Glue. We also showed how to create a source-to-LakeHouse replication system using AWS Glue, AWS DMS, and Amazon Redshift with minimal overhead.


    Appendix

    We can write to Hudi tables because of the hudi-spark.jar file that we downloaded to our DependentJarsAndTempS3Bucket S3 bucket with the CloudFormation template. The path to this file is added as a dependency in both the AWS Glue jobs. This file is based on open-source Hudi. To create the JAR file, complete the following steps:

    1. Get Hudi 0.5.3 and unzip it using the following code:
      wget https://github.com/apache/hudi/archive/release-0.5.3.zip -O hudi-release-0.5.3.zip
      unzip hudi-release-0.5.3.zip

    2. Edit Hudi pom.xml:
      vi hudi-release-0.5.3/pom.xml

      1. Remove the following code to make the build process faster:
        <module>packaging/hudi-hadoop-mr-bundle</module>
        <module>packaging/hudi-hive-bundle</module>
        <module>packaging/hudi-presto-bundle</module>
        <module>packaging/hudi-utilities-bundle</module>
        <module>packaging/hudi-timeline-server-bundle</module>
        <module>docker/hoodie/hadoop</module>
        <module>hudi-integ-test</module>

      2. Change the versions of all three dependencies of httpcomponents to 4.4.1. The following is the original code:
        <!-- Httpcomponents -->
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>fluent-hc</artifactId>
                <version>4.3.2</version>
              </dependency>
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.3.2</version>
              </dependency>
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.3.6</version>
              </dependency>

        The following is the replacement code:

        <!-- Httpcomponents -->
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>fluent-hc</artifactId>
                <version>4.4.1</version>
              </dependency>
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.4.1</version>
              </dependency>
              <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.4.1</version>
              </dependency>

    3. Build the JAR file:
      mvn clean package -DskipTests -DskipITs -f <Full path of the hudi-release-0.5.3 dir>

    4. You can now get the JAR from the following location:
    hudi-release-0.5.3/packaging/hudi-spark-bundle/target/hudi-spark-bundle_2.11-0.5.3-rc2.jar

    The other JAR dependency used in the AWS Glue jobs is spark-avro_2.11-2.4.4.jar.
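    The CloudFormation template already attaches both JARs to the jobs. If you were wiring this up yourself, the dependency would be passed through the --extra-jars special parameter; the following is a sketch with placeholder bucket, role, and script names:

    import boto3

    glue = boto3.client('glue')
    # Sketch only: the stack created by this post already configures the jobs.
    glue.create_job(
        Name='HudiJobExample',                       # placeholder job name
        Role='MyGlueServiceRole',                    # placeholder IAM role
        GlueVersion='2.0',
        Command={
            'Name': 'glueetl',
            'ScriptLocation': 's3://my-scripts-bucket/HudiJob.py',  # placeholder
            'PythonVersion': '3'
        },
        DefaultArguments={
            '--extra-jars': 's3://my-dependent-jars-bucket/hudi-spark-bundle_2.11-0.5.3-rc2.jar,'
                            's3://my-dependent-jars-bucket/spark-avro_2.11-2.4.4.jar',
            '--enable-glue-datacatalog': 'true'
        }
    )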


    About the Author

    Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey at AWS, Vishal helped customers implement BI, DW, and data lake projects in the US and Australia.

    Getting started with RPA using AWS Step Functions and Amazon Textract

    Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/getting-started-with-rpa-using-aws-step-functions-and-amazon-textract/

    This post is courtesy of Joe Tringali, Solutions Architect.

    Many organizations use robotic process automation (RPA) to automate labor-intensive, back-office workflow processes. RPA software bots can handle many of these activities. However, RPA workflows often still contain repetitive manual tasks that must be done by humans, such as viewing invoices to find payment details.

    AWS Step Functions is a serverless function orchestrator and workflow automation tool. Amazon Textract is a fully managed machine learning service that automatically extracts text and data from scanned documents. Combining these services, you can create an RPA bot to automate the workflow and enable employees to handle more complex tasks.

    In this post, I show how you can use Step Functions and Amazon Textract to build a workflow that enables the processing of invoices. Download the code for this solution from https://github.com/aws-samples/aws-step-functions-rpa.

    Overview

    The following serverless architecture can process scanned invoices in PDF or image formats and submit the payment information to a database.

    Example architecture
    To implement this architecture, I use single-purpose Lambda functions and Step Functions to build the workflow:

    1. Invoices are scanned and loaded into an Amazon Simple Storage Service (S3) bucket.
    2. The loading of an invoice into Amazon S3 triggers an AWS Lambda function to be invoked.
    3. The Lambda function starts an asynchronous Amazon Textract job to analyze the text and data of the scanned invoice (a sketch of this step follows the list).
    4. The Amazon Textract job publishes a completion notification message with a status of “SUCCEEDED” or “FAILED” to an Amazon Simple Notification Service (SNS) topic.
    5. SNS sends the message to an Amazon Simple Queue Service (SQS) queue that is subscribed to the SNS topic.
    6. The message in the SQS queue triggers another Lambda function.
    7. The Lambda function initiates a Step Functions state machine to process the results of the Amazon Textract job.
    8. For an Amazon Textract job that completes successfully, a Lambda function saves the document analysis into an Amazon S3 bucket.
    9. The loading of the document analysis to Amazon S3 triggers another Lambda function.
    10. The Lambda function retrieves the text and data of the scanned invoice to find the payment information. It writes an item to an Amazon DynamoDB table with a status indicating if the invoice can be processed.
    11. If the DynamoDB item contains the payment information, another Lambda function is invoked.
    12. The Lambda function archives the processed invoice into another S3 bucket.
    13. If the DynamoDB item does not contain the payment information, a message is published to an Amazon SNS topic requesting that the invoice be reviewed.
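    As a rough sketch of step 3, the Lambda function triggered by the S3 upload might start the asynchronous analysis along the following lines. The environment variable names and the notification role are placeholders, not necessarily what the sample repo uses.

    import os
    import urllib.parse

    import boto3

    textract = boto3.client('textract')

    def handler(event, context):
        # Triggered by the S3 ObjectCreated event for a scanned invoice.
        record = event['Records'][0]
        bucket = record['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(record['s3']['object']['key'])

        # Start an asynchronous job; Amazon Textract publishes a SUCCEEDED or
        # FAILED notification to the SNS topic when the analysis finishes.
        response = textract.start_document_analysis(
            DocumentLocation={'S3Object': {'Bucket': bucket, 'Name': key}},
            FeatureTypes=['FORMS', 'TABLES'],
            NotificationChannel={
                'SNSTopicArn': os.environ['TEXTRACT_SNS_TOPIC_ARN'],   # placeholder
                'RoleArn': os.environ['TEXTRACT_PUBLISH_ROLE_ARN']     # placeholder
            }
        )
        return response['JobId']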

    Amazon Textract can extract information from the various invoice images and associate labels with the data. You must then handle the various labels that different invoices may associate with the payee name, due date, and payment amount.

    Determining payee name, due date and payment amount

    After the document analysis has been saved to S3, a Lambda function retrieves the text and data of the scanned invoice to find the information needed for payment. However, invoices can use a variety of labels for the same piece of data, such as a payment’s due date.

    In the example invoices included with this blog, the payment’s due date is associated with the labels “Pay On or Before”, “Payment Due Date” and “Payment Due”. Payment amounts can also have different labels, such as “Total Due”, “New Balance Total”, “Total Current Charges”, and “Please Pay”. To address this, I use a series of helper functions in the app.py file in the process_document_analysis folder of the GitHub repo.

    In app.py, there is the following get_kv_map helper function:

    def get_kv_map(blocks):
        key_map = {}
        value_map = {}
        block_map = {}
        for block in blocks:
            block_id = block['Id']
            block_map[block_id] = block
            if block['BlockType'] == "KEY_VALUE_SET":
                if 'KEY' in block['EntityTypes']:
                    key_map[block_id] = block
                else:
                    value_map[block_id] = block
        return key_map, value_map, block_map
    

    The get_kv_map function is invoked by the Lambda function handler. It iterates over the “Blocks” element of the document analysis produced by Amazon Textract to create dictionaries of keys (labels) and values (data) associated with each block identified by Amazon Textract. It then invokes the following get_kv_relationship helper function:

    def get_kv_relationship(key_map, value_map, block_map):
        kvs = {}
        for block_id, key_block in key_map.items():
            value_block = find_value_block(key_block, value_map)
            key = get_text(key_block, block_map)
            val = get_text(value_block, block_map)
            kvs[key] = val
        return kvs
    

    The get_kv_relationship function merges the key and value dictionaries produced by the get_kv_map function into a single Python dictionary, where the labels are the keys and the invoice’s data are the values. The handler then invokes the following get_line_list helper function:

    def get_line_list(blocks):
        line_list = []
        for block in blocks:
            if block['BlockType'] == "LINE":
                if 'Text' in block: 
                    line_list.append(block["Text"])
        return line_list
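
    The get_kv_relationship function above relies on find_value_block and get_text helpers that are defined elsewhere in app.py but not shown in this post. A minimal sketch of what they typically look like, based on the standard Amazon Textract forms sample (the versions in the repo may differ):

    def find_value_block(key_block, value_map):
        # Follow the KEY block's VALUE relationship to its corresponding VALUE block.
        value_block = None
        for relationship in key_block.get('Relationships', []):
            if relationship['Type'] == 'VALUE':
                for value_id in relationship['Ids']:
                    value_block = value_map[value_id]
        return value_block

    def get_text(result, blocks_map):
        # Concatenate the WORD (and selected checkbox) children of a block.
        text = ''
        if result is None:
            return text
        for relationship in result.get('Relationships', []):
            if relationship['Type'] == 'CHILD':
                for child_id in relationship['Ids']:
                    word = blocks_map[child_id]
                    if word['BlockType'] == 'WORD':
                        text += word['Text'] + ' '
                    elif word['BlockType'] == 'SELECTION_ELEMENT' and word.get('SelectionStatus') == 'SELECTED':
                        text += 'X '
        return text.strip()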
    
    

    Extracting payee names is more complex because the data may not be labeled. The payee may often differ from the entity sending the invoice. With the Amazon Textract analysis in a format more easily consumable by Python, I use the following get_payee_name helper function to parse and extract the payee:

    def get_payee_name(lines):
        payee_name = ""
        payable_to = "payable to"
        payee_lines = [line for line in lines if payable_to in line.lower()]
        if len(payee_lines) > 0:
            payee_line = payee_lines[0]
            payee_line = payee_line.strip()
            pos = payee_line.lower().find(payable_to)
            if pos > -1:
                payee_line = payee_line[pos + len(payable_to):]
                if payee_line[0:1] == ':':
                    payee_line = payee_line[1:]
                payee_name = payee_line.strip()
        return payee_name
    
    

    The get_amount helper function searches the key value dictionary produced by the get_kv_relationship function to retrieve the payment amount:

    def get_amount(kvs, lines):
        amount = None
        amounts = [search_value(kvs, amount_tag) for amount_tag in amount_tags if search_value(kvs, amount_tag) is not None]
        if len(amounts) > 0:
            amount = amounts[0]
        else:
            for idx, line in enumerate(lines):
                if line.lower() in amount_tags:
                    amount = lines[idx + 1]
                    break
        if amount is not None:
            amount = amount.strip()
            if amount[0:1] == '$':
                amount = amount[1:]
        return amount
    

    The amount_tags variable contains a list of possible labels associated with the payment amount:

    amount_tags = ["total due", "new balance total", "total current charges", "please pay"]

    Similarly, the get_due_date helper function searches the key value dictionary produced by the get_kv_relationship function to retrieve the payment due date:

    def get_due_date(kvs):
        due_date = None
        due_dates = [search_value(kvs, due_date_tag) for due_date_tag in due_date_tags if search_value(kvs, due_date_tag) is not None]
        if len(due_dates) > 0:
            due_date = due_dates[0]
        if due_date is not None:
            date_parts = due_date.split('/')
            if len(date_parts) == 3:
                due_date = datetime(int(date_parts[2]), int(date_parts[0]), int(date_parts[1])).isoformat()
            else:
                date_parts = [date_part for date_part in re.split("\s+|,", due_date) if len(date_part) > 0]
                if len(date_parts) == 3:
                    datetime_object = datetime.strptime(date_parts[0], "%b")
                    month_number = datetime_object.month
                    due_date = datetime(int(date_parts[2]), int(month_number), int(date_parts[1])).isoformat()
        else:
            due_date = datetime.now().isoformat()
        return due_date
    

    The due_date_tags variable contains a list of possible labels associated with the payment due date:

    due_date_tags = ["pay on or before", "payment due date", "payment due"]

    If all the elements required to issue a payment are found, the Lambda function adds an item to the DynamoDB table with a status attribute of “Approved for Payment”. If it cannot determine the value of one or more required elements, it adds an item to the DynamoDB table with a status attribute of “Pending Review”.
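    A minimal sketch of that write is shown below; the table name, key, and attribute names are placeholders rather than the repo’s actual values.

    import os

    import boto3

    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(os.environ.get('INVOICES_TABLE', 'invoices'))  # placeholder table name

    def save_invoice(invoice_id, payee_name, due_date, amount):
        # Mark the invoice approved only when every required element was extracted.
        complete = all([payee_name, due_date, amount])
        table.put_item(Item={
            'id': invoice_id,                  # placeholder key attribute
            'payee_name': payee_name or '',
            'due_date': due_date or '',
            'amount': amount or '',
            'status': 'Approved for Payment' if complete else 'Pending Review'
        })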

    Payment Processing

    If the item in the DynamoDB table is marked “Approved for Payment”, the processed invoice is archived. If the item’s status attribute is marked “Pending Review”, an SNS message is published to an SNS Pending Review topic. You can subscribe to this topic so that you can add additional labels to the Python code for determining payment due dates and payment amounts.

    Note that the Lambda functions are single-purpose functions, and all workflow logic is contained in the Step Functions state machine. This diagram shows the various tasks (states) of a successful workflow.

    State machine workflow

    For more information about this solution, download the code from the GitHub repo (https://github.com/aws-samples/aws-step-functions-rpa).

    Prerequisites

    Before deploying the solution, you must install the following prerequisites:

    1. Python.
    2. AWS Command Line Interface (AWS CLI) – for instructions, see Installing the AWS CLI.
    3. AWS Serverless Application Model Command Line Interface (AWS SAM CLI) – for instructions, see Installing the AWS SAM CLI.

    Deploying the solution

    The solution creates the following S3 buckets with names suffixed by your AWS account ID to prevent a global namespace collision of your S3 bucket names:

    • scanned-invoices-<YOUR AWS ACCOUNT ID>
    • invoice-analyses-<YOUR AWS ACCOUNT ID>
    • processed-invoices-<YOUR AWS ACCOUNT ID>

    The following steps deploy the example solution in your AWS account. The solution deploys several components including a Step Functions state machine, Lambda functions, S3 buckets, a DynamoDB table for payment information, and SNS topics.

    AWS CloudFormation requires an S3 bucket and stack name for deploying the solution. To deploy:

    1. Download code from GitHub repo (https://github.com/aws-samples/aws-step-functions-rpa).
    2. Run the following command to build the artifacts locally on your workstation:
      sam build
    3. Run the following command to create a CloudFormation stack and deploy your resources:
      sam deploy --guided --capabilities CAPABILITY_NAMED_IAM

    Monitor the progress and wait for the completion of the stack creation process from the AWS CloudFormation console before proceeding.

    Testing the solution

    To test the solution, upload the PDF test invoices to the S3 bucket named scanned-invoices-<YOUR AWS ACCOUNT ID>.

    A Step Functions state machine with the name <YOUR STACK NAME>-ProcessedScannedInvoiceWorkflow runs the workflow. Amazon Textract document analyses are stored in the S3 bucket named invoice-analyses-<YOUR AWS ACCOUNT ID>, and processed invoices are stored in the S3 bucket named processed-invoices-<YOUR AWS ACCOUNT ID>. Processed payments are found in the DynamoDB table named <YOUR STACK NAME>-invoices.

    You can monitor the status of the workflows from the Step Functions console. Upon completion of the workflow executions, review the items added to DynamoDB from the Amazon DynamoDB console.

    Cleanup

    To avoid ongoing charges for any resources you created in this blog post, delete the stack:

    1. Empty the three S3 buckets created during deployment using the S3 console:
      – scanned-invoices-<YOUR AWS ACCOUNT ID>
      – invoice-analyses-<YOUR AWS ACCOUNT ID>
      – processed-invoices-<YOUR AWS ACCOUNT ID>
    2. Delete the CloudFormation stack created during deployment using the CloudFormation console.

    Conclusion

    In this post, I showed you how to use a Step Functions state machine and Amazon Textract to automatically extract data from a scanned invoice. This eliminates the need for a person to perform the manual step of reviewing an invoice to find payment information to be fed into a backend system. By replacing the manual steps of a workflow with automation, an organization can free up their human workforce to handle more value-added tasks.

    To learn more, visit AWS Step Functions and Amazon Textract. For more serverless learning resources, visit https://serverlessland.com.

     

    Using AWS Lambda extensions to send logs to custom destinations

    Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/using-aws-lambda-extensions-to-send-logs-to-custom-destinations/

    You can now send logs from AWS Lambda functions directly to a destination of your choice using AWS Lambda Extensions. Lambda Extensions are a new way for monitoring, observability, security, and governance tools to easily integrate with AWS Lambda. For more information, see “Introducing AWS Lambda Extensions – In preview”.

    To help you troubleshoot failures in Lambda functions, AWS Lambda automatically captures and streams logs to Amazon CloudWatch Logs. This stream contains the logs that your function code and extensions generate, in addition to logs the Lambda service generates as part of the function invocation.

    Previously, to send logs to a custom destination, you typically had to configure and operate a CloudWatch Logs subscription, with a separate Lambda function forwarding the logs to the destination of your choice.

    Logging tools, running as Lambda extensions, can now receive log streams directly from within the Lambda execution environment, and send them to any destination. This makes it even easier for you to use your preferred extensions for diagnostics.

    Today, you can use extensions to send logs to Coralogix, Datadog, Honeycomb, Lumigo, New Relic, and Sumo Logic.

    Overview

    To receive logs, extensions subscribe using the new Lambda Logs API.

    Lambda Logs API

    The Lambda service then streams the logs directly to the extension. The extension can then process, filter, and route them to any preferred destination. Lambda still sends the logs to CloudWatch Logs.

    You deploy extensions, including ones that use the Logs API, as Lambda layers, with the AWS Management Console and AWS Command Line Interface (AWS CLI). You can also use infrastructure as code tools such as AWS CloudFormation, the AWS Serverless Application Model (AWS SAM), Serverless Framework, and Terraform.

    Logging extensions from AWS Lambda Ready Partners and AWS Partners available at launch

    Today, you can use logging extensions with the following tools:

    • The Datadog extension now makes it easier than ever to collect your serverless application logs for visualization, analysis, and archival. Paired with Datadog’s AWS integration, end-to-end distributed tracing, and real-time enhanced AWS Lambda metrics, you can proactively detect and resolve serverless issues at any scale.
    • Lumigo provides monitoring and debugging for modern cloud applications. With the open source extension from Lumigo, you can send Lambda function logs directly to an S3 bucket, unlocking new post processing use cases.
    • New Relic enables you to efficiently monitor, troubleshoot, and optimize your Lambda functions. New Relic’s extension allows you to send your Lambda service platform logs directly to New Relic’s unified observability platform, allowing you to quickly visualize data with minimal latency and cost.
    • Coralogix is a log analytics and cloud security platform that empowers thousands of companies to improve security and accelerate software delivery, allowing you to get deep insights without paying for the noise. Coralogix can now read Lambda function logs and metrics directly, without using CloudWatch or S3, reducing the latency and cost of observability.
    • Honeycomb is a powerful observability tool that helps you debug your entire production app stack. Honeycomb’s extension decreases the overhead, latency, and cost of sending events to the Honeycomb service, while increasing reliability.
    • The Sumo Logic extension enables you to get instant visibility into the health and performance of your mission-critical applications using AWS Lambda. With this extension and Sumo Logic’s continuous intelligence platform, you can now ensure that all your Lambda functions are running as expected, by analyzing function, platform, and extension logs to quickly identify and remediate errors and exceptions.

    You can also build and use your own logging extensions to integrate your organization’s tooling.

    Showing a logging extension to send logs directly to S3

    This demo shows an example of using a simple logging extension to send logs to Amazon Simple Storage Service (S3).

    To set up the example, visit the GitHub repo and follow the instructions in the README.md file.

    The example extension runs a local HTTP endpoint listening for HTTP POST events. Lambda delivers log batches to this endpoint. The example creates an S3 bucket to store the logs. A Lambda function is configured with an environment variable to specify the S3 bucket name. Lambda streams the logs to the extension. The extension copies the logs to the S3 bucket.
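    The repo’s logs_api_http_extension.py implements this endpoint; purely to illustrate its shape, a stripped-down receiver might look like the following sketch. The bucket environment variable, port, and key naming are placeholders, and error handling is omitted.

    import json
    import os
    import time
    from http.server import BaseHTTPRequestHandler, HTTPServer

    import boto3

    s3 = boto3.client('s3')
    BUCKET = os.environ['LOGS_S3_BUCKET']  # placeholder variable name

    class LogsHandler(BaseHTTPRequestHandler):
        def do_POST(self):
            # Lambda delivers each batch of log records as a JSON array in the body.
            length = int(self.headers.get('Content-Length', 0))
            batch = json.loads(self.rfile.read(length))
            key = 'lambda-logs/' + time.strftime('%Y/%m/%d/') + str(time.time_ns()) + '.json'
            s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(batch).encode('utf-8'))
            self.send_response(200)
            self.end_headers()

    if __name__ == '__main__':
        # The extension advertises this address when it subscribes via the Logs API.
        HTTPServer(('0.0.0.0', 8080), LogsHandler).serve_forever()

    In a working extension, a listener like this runs in a background thread while the main loop registers with the Extensions API and polls for INVOKE and SHUTDOWN events.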

    Lambda environment variable specifying S3 bucket

    The extension uses the Extensions API to register for INVOKE and SHUTDOWN events. The extension, using the Logs API, then subscribes to receive platform and function logs, but not extension logs.

    As the example is an asynchronous system, logs for one invoke may be processed during the next invocation. Logs for the last invoke may be processed during the SHUTDOWN event.

    When you test the function from the Lambda console, Lambda sends logs to CloudWatch Logs. The log stream shows logs from the platform, function, and extension.

    Lambda logs visible in CloudWatch Logs

    The logging extension also receives the log stream directly from Lambda, and copies the logs to S3.

    Browsing to the S3 bucket, you can see the log files.

    S3 bucket containing copied logs

    Downloading the file shows the log lines. The log contains the same platform and function logs, but not the extension logs, as specified during the subscription.

    [{'time': '2020-11-12T14:55:06.560Z', 'type': 'platform.start', 'record': {'requestId': '49e64413-fd42-47ef-b130-6fd16f30148d', 'version': '$LATEST'}},
    {'time': '2020-11-12T14:55:06.774Z', 'type': 'platform.logsSubscription', 'record': {'name': 'logs_api_http_extension.py', 'state': 'Subscribed', 'types': ['platform', 'function']}},
    {'time': '2020-11-12T14:55:06.774Z', 'type': 'platform.extension', 'record': {'name': 'logs_api_http_extension.py', 'state': 'Ready', 'events': ['INVOKE', 'SHUTDOWN']}},
    {'time': '2020-11-12T14:55:06.776Z', 'type': 'function', 'record': 'Function: Logging something which logging extension will send to S3\n'},
    {'time': '2020-11-12T14:55:06.780Z', 'type': 'platform.end', 'record': {'requestId': '49e64413-fd42-47ef-b130-6fd16f30148d'}},
    {'time': '2020-11-12T14:55:06.780Z', 'type': 'platform.report', 'record': {'requestId': '49e64413-fd42-47ef-b130-6fd16f30148d', 'metrics': {'durationMs': 4.96, 'billedDurationMs': 100, 'memorySizeMB': 128, 'maxMemoryUsedMB': 87, 'initDurationMs': 792.41}, 'tracing': {'type': 'X-Amzn-Trace-Id', 'value': 'Root=1-5fad4cc9-70259536495de84a2a6282cd;Parent=67286c49275ac0ad;Sampled=1'}}}]
    

    Lambda has sent specific logs directly to the subscribed extension. The extension has then copied them directly to S3.

    For more example logging extensions, see the GitHub repository.

    How do extensions receive logs?

    Extensions start a local listener endpoint to receive the logs using one of the following protocols:

    1. TCP – Logs are delivered to a TCP port in Newline delimited JSON format (NDJSON).
    2. HTTP – Logs are delivered to a local HTTP endpoint through PUT or POST, as an array of records in JSON format, to http://sandbox:${PORT}/${PATH}. The $PATH parameter is optional.

    AWS recommends using an HTTP endpoint over TCP because HTTP tracks successful delivery of the log messages to the local endpoint that the extension sets up.

    Once the endpoint is running, extensions use the Logs API to subscribe to any of three different logs streams:

    • Function logs that are generated by the Lambda function.
    • Lambda service platform logs (such as the START, END, and REPORT logs in CloudWatch Logs).
    • Extension logs that are generated by extension code.

    The Lambda service then sends logs to endpoint subscribers inside of the execution environment only.

    Even if an extension subscribes to one or more log streams, Lambda continues to send all logs to CloudWatch.
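    As a sketch of what such a subscription request can look like in Python (based on the preview Logs API; confirm the endpoint version, headers, and buffering fields against the Logs API documentation before relying on them):

    import json
    import os
    import urllib.request

    RUNTIME_API = os.environ['AWS_LAMBDA_RUNTIME_API']

    def subscribe_to_logs(extension_id, port=8080):
        # Subscribe an already-registered extension to platform and function logs,
        # delivered as JSON batches to the local HTTP endpoint it is listening on.
        body = json.dumps({
            'types': ['platform', 'function'],
            'buffering': {'maxItems': 1000, 'maxBytes': 262144, 'timeoutMs': 100},
            'destination': {'protocol': 'HTTP', 'URI': 'http://sandbox:%d/logs' % port}
        }).encode('utf-8')
        request = urllib.request.Request(
            'http://%s/2020-08-15/logs' % RUNTIME_API,
            data=body,
            method='PUT',
            headers={
                'Lambda-Extension-Identifier': extension_id,
                'Content-Type': 'application/json'
            }
        )
        return urllib.request.urlopen(request).read()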

    Performance considerations

    Extensions share resources with the function, such as CPU, memory, disk storage, and environment variables. They also share permissions, using the same AWS Identity and Access Management (IAM) role as the function.

    Log subscriptions consume memory resources as each subscription opens a new memory buffer to store the logs. This memory usage counts towards memory consumed within the Lambda execution environment.

    For more information on resources, security and performance with extensions, see “Introducing AWS Lambda Extensions – In preview”.

    What happens if Lambda cannot deliver logs to an extension?

    The Lambda service stores logs before sending to CloudWatch Logs and any subscribed extensions. If Lambda cannot deliver logs to the extension, it automatically retries with backoff. If the log subscriber crashes, Lambda restarts the execution environment. The logs extension re-subscribes, and continues to receive logs.

    When using an HTTP endpoint, Lambda continues to deliver logs from the last acknowledged delivery. With TCP, the extension may lose logs if an extension or the execution environment fails.

    The Lambda service buffers logs in memory before delivery. The buffer size is proportional to the buffering configuration used in the subscription request. If an extension cannot process the incoming logs quickly enough, the buffer fills up. To reduce the likelihood of an out of memory event due to a slow extension, the Lambda service drops records and adds a platform.logsDropped log record to the affected extension to indicate the number of dropped records.

    Disabling logging to CloudWatch Logs

    Lambda continues to send logs to CloudWatch Logs even if extensions subscribe to the logs stream.

    To disable logging to CloudWatch Logs for a particular function, you can amend the Lambda execution role to remove access to CloudWatch Logs.

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Deny",
                "Action": [
                    "logs:CreateLogGroup",
                    "logs:CreateLogStream",
                    "logs:PutLogEvents"
                ],
                "Resource": [
                    "arn:aws:logs:*:*:*"
                ]
            }
        ]
    }
    

    Logs are no longer delivered to CloudWatch Logs for functions using this role, but are still streamed to subscribed extensions. You are no longer billed for CloudWatch logging for these functions.

    Pricing

    Logging extensions, like other extensions, share the same billing model as Lambda functions. When using Lambda functions with extensions, you pay for requests served and the combined compute time used to run your code and all extensions, in 100 ms increments. To learn more about the billing for extensions, visit the Lambda FAQs page.

    Conclusion

    Lambda extensions enable you to extend the Lambda service to more easily integrate with your favorite tools for monitoring, observability, security, and governance.

    Extensions can now subscribe to receive log streams directly from the Lambda service, in addition to CloudWatch Logs. Today, you can install a number of available logging extensions from AWS Lambda Ready Partners and AWS Partners. Extensions make it easier to use your existing tools with your serverless applications.

    To try the S3 demo logging extension, follow the instructions in the README.md file in the GitHub repository.

    Extensions are now available in preview in all commercial regions other than the China regions.

    For more serverless learning resources, visit https://serverlessland.com.