Tag Archives: Amazon Simple Storage Services (S3)

Build an AWS Well-Architected environment with the Analytics Lens

Post Syndicated from Nikki Rouda original https://aws.amazon.com/blogs/big-data/build-an-aws-well-architected-environment-with-the-analytics-lens/

Building a modern data platform on AWS enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools. Yet you may be unsure of how to get started and the impact of certain design decisions. To address the need to provide advice tailored to specific technology and application domains, AWS added the concept of well-architected lenses 2017. AWS now is happy to announce the Analytics Lens for the AWS Well-Architected Framework. This post provides an introduction of its purpose, topics covered, common scenarios, and services included.

The new Analytics Lens offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. The goal is to give you a consistent way to design and evaluate cloud architectures, based on the following five pillars:

  • Operational excellence
  • Security
  • Reliability
  • Performance efficiency
  • Cost optimization

The tool can help you assess the analytics workloads you have deployed in AWS by identifying potential risks and offering suggestions for improvements.

Using the Analytics Lens to address common requirements

The Analytics Lens models both the data architecture at the core of the analytics applications and the application behavior itself. These models are organized into the following six areas, which encompass the vast majority of analytics workloads deployed on AWS:

  1. Data ingestion
  2. Security and governance
  3. Catalog and search
  4. Central storage
  5. Processing and analytics
  6. User access

The following diagram illustrates these areas and their related AWS services.

There are a number of common scenarios where the Analytics Lens applies, such as the following:

  • Building a data lake as the foundation for your data and analytics initiatives
  • Efficient batch data processing at scale
  • Building a platform for streaming ingest and real-time event processing
  • Handling big data processing and streaming
  • Data-preparation operations

Whichever of these scenarios fits your needs, building to the principles of the Analytics Lens in the AWS Well-Architected Framework can help you implement best practices for success.

The Analytics Lens explains when and how to use the core services in the AWS analytics portfolio. These include Amazon Kinesis, Amazon Redshift, Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. It also explains how Amazon Simple Storage Service (Amazon S3) can serve as the storage for your data lake and how to integrate with relevant AWS security services. With reference architectures, best practices advice, and answers to common questions, the Analytics Lens can help you make the right design decisions.

Conclusion

Applying the lens to your existing architectures can validate the stability and efficiency of your design (or provide recommendations to address the gaps that are identified). AWS is committed to the Analytics Lens as a living tool; as the analytics landscape evolves and new AWS services come on line, we’ll update the Analytics Lens appropriately. Our mission will always be to help you design and deploy well-architected applications.

For more information about building your own Well-Architected environment using the Analytics Lens, see the Analytics Lens whitepaper.

Special thanks to the following individuals who contributed to building this resource, among many others who helped with review and implementation: Radhika Ravirala, Laith Al-Saadoon, Wallace Printz, Ujjwal Ratan, and Neil Mukerje.

Are there questions you’d like to see answered in the tool? Share your thoughts and questions in the comments.

 


About the Authors

Nikki Rouda is the principal product marketing manager for data lakes and big data at Amazon Web Services. Nikki has spent 20+ years helping enterprises in 40+ countries develop and implement solutions to their analytics and IT infrastructure challenges. Nikki holds an MBA from the University of Cambridge and an ScB in geophysics and math from Brown University.

 

 


Radhika Ravirala is a specialist solutions architect at Amazon Web Services, where she helps customers craft distributed analytics applications on the AWS platform. Prior to her cloud journey, she worked as a software engineer and designer for technology companies in Silicon Valley.

New – Enhanced Amazon Macie Now Available with Substantially Reduced Pricing

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-enhanced-amazon-macie-now-available/

Amazon Macie is a fully managed service that helps you discover and protect your sensitive data, using machine learning to automatically spot and classify data for you.

Over time, Macie customers told us what they like, and what they didn’t. The service team has worked hard to address this feedback, and today I am very happy to share that we are making available a new, enhanced version of Amazon Macie!

This new version has simplified the pricing plan: you are now charged based on the number of Amazon Simple Storage Service (S3) buckets that are evaluated, and the amount of data processed for sensitive data discovery jobs. The new tiered pricing plan has reduced the price by 80%. With higher volumes, you can reduce your costs by more than 90%.

At the same time, we have introduced many new features:

  • An expanded sensitive data discovery, including updated machine learning models for personally identifiable information (PII) detection, and customer-defined sensitive data types using regular expressions.
  • Multi-account support with AWS Organizations.
  • Full API coverage for programmatic use of the service with AWS SDKs and AWS Command Line Interface (CLI).
  • Expanded regional availability to 17 Regions.
  • A new, simplified free tier and free trial to help you get started and understand your costs.
  • A completely redesigned console and user experience.

Macie is now tightly integrated with S3 in the backend, providing more advantages:

  • Enabling S3 data events in AWS CloudTrail is no longer a requirement, further reducing overall costs.
  • There is now a continual evaluation of all buckets, issuing security findings for any public bucket, unencrypted buckets, and for buckets shared with (or replicated to) an AWS account outside of your Organization.

The anomaly detection features monitoring S3 data access activity previously available in Macie are now in private beta as part of Amazon GuardDuty, and have been enhanced to include deeper capabilities to protect your data in S3.

Enabling Amazon Macie
In the Macie console, I select to Enable Macie. If you use AWS Organizations, you can delegate an AWS account to administer Macie for your Organization.

After it has been enabled, Amazon Macie automatically provides a summary of my S3 buckets in the region, and continually evaluates those buckets to generate actionable security findings for any unencrypted or publicly accessible data, including buckets shared with AWS accounts outside of my Organization.

Below the summary, I see the top findings by type and by S3 bucket. Overall, this page provides a great overview of the status of my S3 buckets.

In the Findings section I have the full list of findings, and I can select them to archive, unarchive, or export them. I can also select one of the findings to see the full information collected by Macie.

Findings can be viewed in the web console and are sent to Amazon CloudWatch Events for easy integration with existing workflow or event management systems, or to be used in combination with AWS Step Functions to take automated remediation actions. This can help meet regulations such as Payment Card Industry Data Security Standard (PCI-DSS), Health Insurance Portability and Accountability Act (HIPAA), General Data Privacy Regulation (GDPR), and California Consumer Protection Act (CCPA).

In the S3 Buckets section, I can search and filter on buckets of interest to create sensitive data discovery jobs across one or multiple buckets to discover sensitive data in objects, and to check encryption status and public accessibility at object level. Jobs can be executed once, or scheduled daily, weekly, or monthly.

For jobs, Amazon Macie automatically tracks changes to the buckets and only evaluates new or modified objects over time. In the additional settings, I can include or exclude objects based on tags, size, file extensions, or last modified date.

To monitor my costs, and the use of the free trial, I look at the Usage section of the console.

Creating Custom Data Identifiers
Amazon Macie supports natively the most common sensitive data types, including personally identifying information (PII) and credential data. You can extend that list with custom data identifiers to discover proprietary or unique sensitive data for your business.

For example, often companies have a specific syntax for their employee IDs. A possible syntax is to have a capital letter, that defines if this is a full-time or a part-time employee, followed by a dash, and then eight numbers. Possible values in this case are F-12345678 or P-87654321.

To create this custom data identifier, I enter a regular expression (regex) to describe the pattern to match:

[A-Z]-\d{8}

To avoid false positives, I ask that the employee keyword is found near the identifier (by default, less than 50 characters apart). I use the Evaluate box to test that this configuration works with sample text, then I select Submit.

Available Now
For Amazon Macie regional availability, please see the AWS Region Table. You can find more information on how the new enhanced Macie in the documentation.

This release of Amazon Macie remains optimized for S3. However, anything you can get into S3, permanently or temporarily, in an object format supported by Macie, can be scanned for sensitive data. This allows you to expand the coverage to data residing outside of S3 by pulling data out of custom applications, databases, and third-party services, temporarily placing it in S3, and using Amazon Macie to identify sensitive data.

For example, we’ve made this even easier with RDS and Aurora now supporting snapshots to S3 in Apache Parquet, which is a format Macie supports. Similarly, in DynamoDB, you can use AWS Glue to export tables to S3 which can then be scanned by Macie. With the new API and SDKs coverage, you can use the new enhanced Amazon Macie as a building block in an automated process exporting data to S3 to discover and protect your sensitive data across multiple sources.

Danilo

Using dynamic Amazon S3 event handling with Amazon EventBridge

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/using-dynamic-amazon-s3-event-handling-with-amazon-eventbridge/

A common pattern in serverless applications is to invoke a Lambda function in response to an event from Amazon S3. For example, you could use this pattern for automating document translation, transcribing audio files, or staging data imports. You can configure this integration in many places, including the AWS Management Console, the AWS CLI, or the AWS Serverless Application Model (SAM).

If you need to fan out notifications, or hold messages in queue, you are also able to route S3 events to Amazon SNS or Amazon SQS. These standard notification mechanisms work well for most applications, and are simple to implement. However, for more complex notification patterns, you can use Amazon EventBridge to route events dynamically. This blog post explores advanced use-cases and how to implement these in your serverless applications.

S3 to EventBridge, using CloudTrail.

To set up the example applications, visit the GitHub repo and follow the instructions in the README.md file. The code uses SAM templates, enabling you to deploy the applications in your own AWS account. This walkthrough creates resources covered in the AWS Free Tier but you may incur cost if you test with large amounts of data.

Integrating S3 events with Lambda via EventBridge

EventBridge consumes S3 events via AWS CloudTrail. A single trail can log events for one or more S3 buckets, and you can configure which data events are recorded. It’s best practice to store CloudTrail log files in a separate S3 bucket. Once this is configured, EventBridge can then receive any event logged in the trail.

The first example in the GitHub repo shows how this can be configured in a SAM template. The application comprises an S3 bucket, a Lambda EventConsumer function, and other required resources. First, the template defines the two buckets:

Resources: 
  SourceBucket: 
    Type: AWS::S3::Bucket
    Properties:
      BucketName: "TheSourceBucket"

  LoggingBucket: 
    Type: AWS::S3::Bucket
    Properties:
      BucketName: "TheLoggingBucket"

Next, an S3 bucket policy grants permissions for CloudTrail to write files to the logging bucket:

  BucketPolicy: 
    Type: AWS::S3::BucketPolicy
    Properties: 
      Bucket: 
        Ref: LoggingBucket
      PolicyDocument: 
        Version: "2012-10-17"
        Statement: 
          - 
            Sid: "AWSCloudTrailAclCheck"
            Effect: "Allow"
            Principal: 
              Service: "cloudtrail.amazonaws.com"
            Action: "s3:GetBucketAcl"
            Resource: 
              !Sub |-
                arn:aws:s3:::${LoggingBucket}
          - 
            Sid: "AWSCloudTrailWrite"
            Effect: "Allow"
            Principal: 
              Service: "cloudtrail.amazonaws.com"
            Action: "s3:PutObject"
            Resource:
              !Sub |-
                arn:aws:s3:::${LoggingBucket}/AWSLogs/${AWS::AccountId}/*
            Condition: 
              StringEquals:
                s3:x-amz-acl: "bucket-owner-full-control"

The template configures the trail and sets the logging bucket. It defines event selectors, which identify the specific events for logging:

  myTrail: 
    Type: AWS::CloudTrail::Trail
    DependsOn: 
      - BucketPolicy
    Properties: 
      TrailName: "MyTrailName"
      S3BucketName: 
        Ref: LoggingBucket
      IsLogging: true
      IsMultiRegionTrail: false
      EventSelectors:
        - DataResources:
          - Type: AWS::S3::Object
            Values:
              - !Sub |-
                arn:aws:s3:::${SourceBucket}/
      IncludeGlobalServiceEvents: false

The SAM template configures a target Lambda function for receiving the events:

  EventConsumerFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: eventConsumer/
      Handler: app.handler
      Runtime: nodejs12.x

Finally, it defines a rule that sets the event pattern and targets. It also grants permission to EventBridge to invoke the Lambda function:

  EventRule: 
    Type: AWS::Events::Rule
    Properties: 
      Description: "EventRule"
      State: "ENABLED"
      EventPattern: 
        source: 
          - "aws.s3"
        detail: 
          eventName: 
            - "PutObject"
          requestParameters:
            bucketName: !Ref SourceBucketName

      Targets: 
        - 
          Arn: 
            Fn::GetAtt: 
              - "EventConsumerFunction"
              - "Arn"
          Id: "EventConsumerFunctionTarget"

  PermissionForEventsToInvokeLambda: 
    Type: AWS::Lambda::Permission
    Properties: 
      FunctionName: 
        Ref: "EventConsumerFunction"
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: 
        Fn::GetAtt: 
          - "EventRule"
          - "Arn"

To deploy this application, follow the instructions in the GitHub repo’s README.file. To test, upload any file to the Source Bucket. This invokes the Lambda function via the EventBridge event, and logs out the event details. Open the CloudWatch Logs console for the deployed Lambda function to view the output.

The event pattern in this example matches on any PutObject event in the Source Bucket. You can also match on any attribute, or combination of attributes, in an S3 event. This makes it possible to identify events by source IP address, object size, time range, or principalId (the user causing the event). With access to the entire S3 event, this enables more granularity on matching events before invoking the target Lambda function.

Consuming events from existing S3 buckets

When deploying S3 and Lambda integrations in SAM templates, you cannot use existing buckets managed outside of the CloudFormation stack. Frequently, it’s useful to deploy serverless applications that integrate with existing S3 buckets. Using the S3-to-EventBridge integration, you can create new applications that receive events from existing buckets.

Consuming events from existing S3 buckets

The second example in the GitHub repo shows how to configure a new application for an existing bucket. This template takes the existing S3 bucket name as a parameter, and generates the CloudTrail trail, EventBridge rule, and required permissions.

Follow this example’s README.md file to deploy the application. To test, upload any file into the existing S3 bucket you selected. This invokes the eventConsumer logging function deployed in the template.

Invoking a single Lambda function from multiple S3 buckets

With EventBridge decoupling the producer and consumer of the events, this also makes it easier to introduce multiple producers. In the third example, the SAM template creates three buckets that invoke the same EventConsumer Lambda function:

Invoking Lambda from multiple S3 buckets

The MultiBucketName parameter is used to create the three buckets with a number appended to the name. First, the CloudTrail EventSelector includes the three buckets in the trail:

  # The CloudTrail trail 
  myTrail: 
    Type: AWS::CloudTrail::Trail
    DependsOn: 
      - BucketPolicy
    Properties: 
      TrailName: "myTrail"
      S3BucketName: 
        Ref: LoggingBucket
      IsLogging: true
      IsMultiRegionTrail: false
      EventSelectors:
        - DataResources:
          - Type: AWS::S3::Object
            Values:
              - !Sub 'arn:aws:s3:::${MultiBucketName}-1/'
              - !Sub 'arn:aws:s3:::${MultiBucketName}-2/'
              - !Sub 'arn:aws:s3:::${MultiBucketName}-3/'
      IncludeGlobalServiceEvents: false

Next, the EventRule includes the three bucket names in the event pattern, so events from any of these buckets can now trigger the rule:

  # EventBridge rule - invokes EventConsumerFunction 
  EventRule: 
    Type: AWS::Events::Rule
    Properties: 
      Description: "EventRule"
      State: "ENABLED"
      EventPattern: 
        source: 
          - "aws.s3"
        detail: 
          eventName: 
            - "PutObject"
          requestParameters:
            bucketName:
              - !Sub '${MultiBucketName}-1'
              - !Sub '${MultiBucketName}-2'
              - !Sub '${MultiBucketName}-3'

It’s also possible to use content-based filtering in event patterns to match dynamically on bucket names. For example, if you have multiple buckets with the prefix myCompanySales, you can create an event pattern to match all of these buckets:

      EventPattern: 
        source: 
          - "aws.s3"
        detail: 
          eventName: 
            - "PutObject"
          requestParameters:
            bucketName:
              - "prefix": "myCompanySales" 

This enables your application to consume events from new buckets created after the application is deployed. With content-based filtering, you can create search patterns that allow greater flexibility in matching events.

Multiple buckets with multiple Lambda functions

In the standard S3 and Lambda integration, a single Lambda function can only be invoked by distinct prefix and suffix patterns in the S3 trigger. This means that the same Lambda function cannot be set as the trigger for PutObject events for the same filetype or prefix. When you need to invoke multiple functions with the same or overlapping prefixes or suffixes, the EventBridge integration can handle this.

EventBridge allows up to five targets per rule, so you can specify up to five separate Lambda functions to receive the event. All five functions are invoked in parallel when the event pattern matches. To use this, add the targets in the rule – no change to the event pattern is required.

In the fourth example, the SAM template configures three buckets and three Lambda functions, all subscribing to the same event pattern.

Multiple buckets with multiple Lambda subscribers

This template takes the existing S3 bucket name as a parameter, and generates the CloudTrail trail, EventBridge rule, and required permissions. The key change to the template is in the EventRule, where now more than one target is defined:

      Targets: 
        - Arn: 
            Fn::GetAtt: 
              - "EventConsumerFunction1"
              - "Arn"
          Id: "EventConsumerFunctionTarget1"
        - Arn: 
            Fn::GetAtt: 
              - "EventConsumerFunction2"
              - "Arn"
          Id: "EventConsumerFunctionTarget2"
        - Arn: 
            Fn::GetAtt: 
              - "EventConsumerFunction3"
              - "Arn"
          Id: "EventConsumerFunctionTarget3"

This approach enables more complex routing of S3 events to Lambda targets. It allows events from multiple S3 buckets with overlapping prefixes and suffixes in object names. It also enables you to route those events to multiple Lambda functions simultaneously.

Conclusion

The standard S3 to Lambda integration enables developers to deploy code that responds to bucket- or object-based events. You can also use SNS or SQS as targets for fanning out or buffering messages from S3. Using Amazon EventBridge, you can employ even more sophisticated routing and filtering of events between S3 and Lambda.

In this blog post, I show how to deploy a basic integration using a SAM template with a single bucket and single Lambda function. I cover how to use existing S3 buckets in your new application deployments, and use EventBridge content filtering in rules to dynamically match bucket events.

Finally, in complex serverless applications, I show how EventBridge completely decouples the producers and consumers. This makes it easy to route events from multiple S3 buckets to multiple Lambda functions. When combined with attribute matching across the entire S3 event object, this allows much more granularity in identifying events before invoking Lambda functions.

To learn more about using decoupled, event-driven architectures in your serverless applications, visit the Amazon EventBridge Learning Path.

Building a Scalable Document Pre-Processing Pipeline

Post Syndicated from Joel Knight original https://aws.amazon.com/blogs/architecture/building-a-scalable-document-pre-processing-pipeline/

In a recent customer engagement, Quantiphi, Inc., a member of the Amazon Web Services Partner Network, built a solution capable of pre-processing tens of millions of PDF documents before sending them for inference by a machine learning (ML) model. While the customer’s use case—and hence the ML model—was very specific to their needs, the pipeline that does the pre-processing of documents is reusable for a wide array of document processing workloads. This post will walk you through the pre-processing pipeline architecture.

Pre-processing pipeline architecture-SM

Architectural goals

Quantiphi established the following goals prior to starting:

  • Loose coupling to enable independent scaling of compute components, flexible selection of compute services, and agility as the customer’s requirements evolved.
  • Work backwards from business requirements when making decisions affecting scale and throughput and not simply because “fastest is best.” Scale components only where it makes sense and for maximum impact.
  •  Log everything at every stage to enable troubleshooting when something goes wrong, provide a detailed audit trail, and facilitate cost optimization exercises by identifying usage and load of every compute component in the architecture.

Document ingestion

The documents are initially stored in a staging bucket in Amazon Simple Storage Service (Amazon S3). The processing pipeline is kicked off when the “trigger” Amazon Lambda function is called. This Lambda function passes parameters such as the name of the staging S3 bucket and the path(s) within the bucket which are to be processed to the “ingestion app.”

The ingestion app is a simple application that runs a web service to enable triggering a batch and lists documents from the S3 bucket path(s) received via the web service. As the app processes the list of documents, it feeds the document path, S3 bucket name, and some additional metadata to the “ingest” Amazon Simple Queue Service (Amazon SQS) queue. The ingestion app also starts the audit trail for the document by writing a record to the Amazon Aurora database. As the document moves downstream, additional records are added to the database. Records are joined together by a unique ID and assigned to each document by the ingestion app and passed along throughout the pipeline.

Chunking the documents

In order to maximize grip and control, the architecture is built to submit single-page files to the ML model. This enables correlating an inference failure to a specific page instead of a whole document (which may be many pages long). It also makes identifying the location of features within the inference results an easier task. Since the documents being processed can have varied sizes, resolutions, and page count, a big part of the pre-processing pipeline is to chunk a document up into its component pages prior to sending it for inference.

The “chunking orchestrator” app repeatedly pulls a message from the ingest queue and retrieves the document named therein from the S3 bucket. The PDF document is then classified along two metrics:

  • File size
  • Number of pages

We use these metrics to determine which chunking queue the document is sent to:

  • Large: Greater than 10MB in size or greater than 10 pages
  • Small: Less than or equal to 10MB and less than or equal to 10 pages
  • Single page: Less than or equal to 10MB and exactly one page

Each of these queues is serviced by an appropriately sized compute service that breaks the document down into smaller pieces, and ultimately, into individual pages.

  • Amazon Elastic Cloud Compute (EC2) processes large documents primarily because of the high memory footprint needed to read large, multi-gigabyte PDF files into memory. The output from these workers are smaller PDF documents that are stored in Amazon S3. The name and location of these smaller documents is submitted to the “small documents” queue.
  • Small documents are processed by a Lambda function that decomposes the document into single pages that are stored in Amazon S3. The name and location of these single page files is sent to the “single page” queue.

The Dead Letter Queues (DLQs) are used to hold messages from their respective size queue which are not successfully processed. If messages start landing in the DLQs, it’s an indication that there is a problem in the pipeline. For example, if messages start landing in the “small” or “single page” DLQ, it could indicate that the Lambda function processing those respective queues has reached its maximum run time.

An Amazon CloudWatch Alarm monitors the depth of each DLQ. Upon seeing DLQ activity, a notification is sent via Amazon Simple Notification Service (Amazon SNS) so an administrator can then investigate and make adjustments such as tuning the sizing thresholds to ensure the Lambda functions can finish before reaching their maximum run time.

In order to ensure no documents are left behind in the active run, there is a failsafe in the form of an Amazon EC2 worker that retrieves and processes messages from the DLQs. This failsafe app breaks a PDF all the way down into individual pages and then does image conversion.

For documents that don’t fall into a DLQ, they make it to the “single page” queue. This queue drives each page through the “image conversion” Lambda function which converts the single page file from PDF to PNG format. These PNG files are stored in Amazon S3.

Sending for inference

At this point, the documents have been chunked up and are ready for inference.

When the single-page image files land in Amazon S3, an S3 Event Notification is fired which places a message in a “converted image” SQS queue which in turn triggers the “model endpoint” Lambda function. This function calls an API endpoint on an Amazon API Gateway that is fronting the Amazon SageMaker inference endpoint. Using API Gateway with SageMaker endpoints avoided throttling during Lambda function execution due to high volumes of concurrent calls to the Amazon SageMaker API. This pattern also resulted in a 2x inference throughput speedup. The Lambda function passes the document’s S3 bucket name and path to the API which in turn passes it to the auto scaling SageMaker endpoint. The function reads the inference results that are passed back from API Gateway and stores them in Amazon Aurora.

The inference results as well as all the telemetry collected as the document was processed can be queried from the Amazon Aurora database to build reports showing number of documents processed, number of documents with failures, and number of documents with or without whatever feature(s) the ML model is trained to look for.

Summary

This architecture is able to take PDF documents that range in size from single page up to thousands of pages or gigabytes in size, pre-process them into single page image files, and then send them for inference by a machine learning model. Once triggered, the pipeline is completely automated and is able to scale to tens of millions of pages per batch.

In keeping with the architectural goals of the project, Amazon SQS is used throughout in order to build a loosely coupled system which promotes agility, scalability, and resiliency. Loose coupling also enables a high degree of grip and control over the system making it easier to respond to changes in business needs as well as focusing tuning efforts for maximum impact. And with every compute component logging everything it does, the system provides a high degree of auditability and introspection which facilitates performance monitoring, and detailed cost optimization.

Creating a scalable serverless import process for Amazon DynamoDB

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/creating-a-scalable-serverless-import-process-for-amazon-dynamodb/

Amazon DynamoDB is a web-scale NoSQL database designed to provide low latency access to data. It’s well suited to many serverless applications as a primary data store, and fits into many common enterprise architectures. In this post, I show how you can import large amounts of data to DynamoDB using a serverless approach. This uses Amazon S3 as a staging area and AWS Lambda for the custom business logic.

This pattern is useful as a general import mechanism into DynamoDB because it separates the challenge of scaling from the data transformation logic. The incoming data is stored in S3 objects, formatted as JSON, CSV, or any custom format your applications produce. The process works whether you import only a few large files or many small files. It takes advantage of parallelization to import data quickly into a DynamoDB table.

Using S3-to-Lambda to import at scale to DynamoDB.

This is useful for applications where upstream services produce transaction information, and can be effective for handling data generated by spiky workloads. Alternatively, it’s also a simple way to migrate from another data source to DynamoDB, especially for large datasets.

In this blog post, I show two different import applications. The first is a direct import into the DynamoDB table. The second explores a more advanced method for smoothing out volume in the import process. The code uses the AWS Serverless Application Model (SAM), enabling you to deploy the application in your own AWS Account. This walkthrough creates resources covered in the AWS Free Tier but you may incur cost for large data imports.

To set up both example applications, visit the GitHub repo and follow the instructions in the README.md file.

Directly importing data from S3 to DynamoDB

The first example application loads data directly from S3 to DynamoDB via a Lambda function. This uses the following architecture:

Architecture for the first example application.

  1. A downstream process creates source import data in JSON format and writes to an S3 bucket.
  2. When the objects are saved, S3 invokes the main Lambda function.
  3. The function reads the S3 object and converts the JSON into the correct format for the DynamoDB table. It uploads this data in batches to the table.

The repo’s SAM template creates a DynamoDB table with a partition key, configured to use on-demand capacity. This mode enables the DynamoDB service to scale appropriately to match the number of writes required by the import process. This means you do not need to manage DynamoDB table capacity, as you would in the standard provisioned mode.

  DDBtable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
      - AttributeName: ID
        AttributeType: S
      KeySchema:
      - AttributeName: ID
        KeyType: HASH
      BillingMode: PAY_PER_REQUEST

The template defines the Lambda function to import the data:

  ImportFunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: importFunction/
      Handler: app.handler
      Runtime: nodejs12.x
      MemorySize: 512
      Environment:
        Variables:
          DDBtable: !Ref DDBtable      
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref DDBtable        
        - S3ReadPolicy:
            BucketName: !Ref InputBucketName
      Events:
        FileUpload:
          Type: S3
          Properties:
            Bucket: !Ref InputS3Bucket
            Events: s3:ObjectCreated:*
            Filter: 
              S3Key:
                Rules:
                  - Name: suffix
                    Value: '.json'            

This uses SAM policy templates to provide write access to the DynamoDB table and read access to S3 bucket. It also defines the event that causes the function invocation from S3, filtering only for new objects with a .json suffix.

Testing the application

  1. Deploy the first application by following the README.md in the GitHub repo, and note the application’s S3 bucket name and DynamoDB table name.
  2. Change into the dataGenerator directory:
    cd ./dataGenerator
  3. Create sample data for testing. The following command creates 10 files of 100 records each:
    node ./app.js 100 10
  4. Upload the sample data into your application’s S3 bucket, replacing your-bucket below with your bucket name:
    aws s3 cp ./data/ s3://your-bucket --recursiveYour console output shows the following, confirming that the sample data is uploaded to S3.Generating and uploading sample data for testing.
  5. After a few seconds, enter this command to show the number of items in the application’s DynamoDB table. Replace your-table with your deployed table name:aws dynamodb scan --table-name your-table --select "COUNT"Your console output shows that 1,000 items are now stored in the DynamoDB and the files are successfully imported.Checking the number of items stored in the DynamoDB.

With on-demand provisioning, the per-table limit of 40,000 write request units still applies. For high volumes or sudden, spiky workloads, DynamoDB may throttle the import when using this approach. Any throttling events appears in the Metrics tab of the table in the DynamoDB service console. Throttling is intended to protect your infrastructure but there are times when you want to process these high volumes. The second application in the repo shows how to address this.

Handling extreme loads and variability in the import process

In this next example, the goal is to smooth out the traffic, so that the load process into DynamoDB is much more consistent. The key service used to achieve this is Amazon SQS, which holds all the items until a loader process stores the data in DynamoDB. The architecture looks like this:

Architecture for the second example application.

  1. A downstream process creates source import data in JSON format and writes to an S3 bucket.
  2. When the objects are saved, S3 invokes a Lambda function that transforms the input and adds these as messages in an Amazon SQS queue.
  3. The Lambda polls the SQS queue and invokes a function to process the messages batches.
  4. The function converts the JSON messages into the correct format for the DynamoDB table. It uploads this data in batches to the table.

Testing the application

In this test, you generate a much larger amount of data using a greater number of S3 objects. The instructions below creates 100,000 sample records, so running this code may incur cost on your AWS bill.

  1. Deploy the second application by following the README.md in the GitHub repo, and note the application’s S3 bucket name and DynamoDB table name.
  2. Change into the dataGenerator directory:
    cd ./dataGenerator
  3. Create sample data for testing. The following command creates 100 files of 1,000 records each:
    node ./app.js 1000 100
  4. Upload the sample data into your application’s S3 bucket, replacing your-bucket below with your deployed bucket name:aws s3 cp ./data/ s3://your-bucket --recursiveThis process takes around 10 minutes to complete with the default configuration in the repo.
  5. From the DynamoDB console, select the application’s table and then choose the Metrics tab. Select the Write capacity graph to zoom into the chart:Using CloudWatch to view WCUs consumed in the uploading process.

The default configuration deliberately slows down the load process to illustrate how it works. Using this approach, the load into the database is much more consistent, consuming between 125-150 write capacity units (WCUs) per minute. This design makes it possible to vary how quickly you load data into the DynamoDB table, depending upon the needs of your use-case.

How this works

In this second application, there are multiple points where the application uses a configuration setting to throttle the flow of data to the next step.

Throttling points in the architecture.

  1. AddToQueue function: this loads data from the source S3 object into SQS in batches of 25 messages. Depending on the size of your source records, you may add more records into a single SQS message, which has a size limit of 256 Kb. You can also compress this message with gzip to further add more records.
  2. Function concurrency: the SAM template sets the Loader function’s concurrency to 1, using the ReservedConcurrentExecutions attribute. In effect, this stops Lambda from scaling this function, which means it keeps fetching the next batch from SQS as soon as processing finishes. The concurrency is a multiplier – as this value is increased, the loading into the DynamoDB table increases proportionately, if there are messages available in SQS. Select a value greater than 1 to use parallelization in the load process.
  3. Loader function: this consumes messages from the SQS queue. The BatchSize configured in the SAM template is set to four messages per invocation. Since each message contains 25 records, this represents 100 records per invocation when the queue has enough messages. You can set a BatchSize value from 1 to 10, so could increase this from the application’s default.

When you combine these settings, it’s possible to dramatically increase the throughput for loading data into DynamoDB. Increasing the load also increases WCUs consumed, which increases cost. Your use case can inform you about the optimal balance between speed and cost. You can make changes, too – it’s simple to make adjustments to meet your requirements.

Additionally, each of the services used has its own service limits. For high production loads, it’s important to understand the quotas set, and whether these are soft or hard limits. If your application requires higher throughput, you can request raising soft limits via an AWS Support Center ticket.

Conclusion

DynamoDB does not offer a native import process and existing solutions may not meet your needs for unplanned, large-scale imports. The AWS Database Migration Service is not serverless, and the AWS Data Pipeline is schedule-based rather than event-based. This solution is designed to provide a fully serverless alternative that responds to incoming data to S3 on-demand.

In this post, I show how you can create a simple import process directly to the DynamoDB table, triggered by objects put into an S3 bucket. This provides a near-real time import process. I also show a more advanced approach to smooth out traffic for high-volume or spiky workloads. This helps create a resilient and consistent data import for DynamoDB.

To learn more, watch this video to see how to deploy and test the DynamoDB importer application.

IAM Access Analyzer flags unintended access to S3 buckets shared through access points

Post Syndicated from Andrea Nedic original https://aws.amazon.com/blogs/security/iam-access-analyzer-flags-unintended-access-to-s3-buckets-shared-through-access-points/

Customers use Amazon Simple Storage Service (S3) buckets to store critical data and manage access to data at scale. With Amazon S3 Access Points, customers can easily manage shared data sets by creating separate access points for individual applications. Access points are unique hostnames attached to a bucket and customers can set distinct permissions using access point policies. To help you identify buckets that can be accessed publicly or from other AWS accounts or organizations, AWS Identity and Access Management (IAM) Access Analyzer mathematically analyzes resource policies. Now, Access Analyzer analyzes access point policies in addition to bucket policies and bucket ACLs. This helps you find unintended access to S3 buckets that use access points. Access Analyzer makes it easier to identify and remediate unintended public, cross-account, or cross-organization sharing of your S3 buckets that use access points. This enables you to restrict bucket access and adhere to the security best practice of least privilege.

In this post, first I review Access Analyzer and how to enable it. Then I walk through an example of how to use Access Analyzer to identify an S3 bucket that is shared through an access point. Finally, I show you how to view Access Analyzer bucket findings in the S3 Management Console.

IAM Access Analyzer overview

Access Analyzer helps you determine which resources can be accessed publicly or from other accounts or organizations. Access Analyzer determines this by mathematically analyzing access control policies attached to resources. This form of analysis, called automated reasoning, applies logic and mathematical inference to determine all possible access paths allowed by a resource policy. This is how IAM Access Analyzer uses provable security to deliver comprehensive findings for unintended bucket access. You can enable Access Analyzer by navigating to the IAM console. From there, select Access Analyzer to create an analyzer for an account or an organization.

How to use IAM Access Analyzer to identify an S3 bucket shared through an access point

Once you’ve created your analyzer, you can view findings for resources that can be accessed publicly or from other AWS accounts or organizations. For your S3 bucket findings, the Shared through column indicates whether a bucket is shared through its S3 bucket policy, one of its access points, or the bucket ACL. Looking at the Shared through column in the image below, we see the first finding is shared through an Access point.

Figure 1: IAM Access Analyzer report of findings for resources shared outside of my account

Figure 1: IAM Access Analyzer report of findings for resources shared outside of my account

If you use access points to manage bucket access and one of your buckets is shared through an access point, you will see the bucket finding indicate ‘Access Point’. In this example, I select the first finding to learn more. In the detail image below, you can see that the Shared through field lists the Amazon Resource Name (arn) of the access point that grants access to the bucket and the details of the resources and principals. If this access wasn’t your intent, you can review the access point details in the S3 console. There you can modify the access point policy to remove access.

Figure 2: IAM Access Analyzer finding details for a bucket shared through an access point

Figure 2: IAM Access Analyzer finding details for a bucket shared through an access point

How to use Access Analyzer for S3 to identify an S3 bucket shared through an access point

You can also view Access Analyzer findings for S3 buckets in the S3 Management Console with Access Analyzer for S3. This view reports S3 buckets that are configured to allow access to anyone on the internet or other AWS accounts. This includes accounts outside of your AWS organization. For each public or shared bucket, Access Analyzer for S3 displays whether the bucket is shared through the bucket policy, access points, or the bucket ACL. In the example below, we see the my-test-public-bucket is set to public access using a Bucket policy and bucket ACL. Additionally, the my-test-bucket is shared access to other AWS accounts using a Bucket policy and one or more access points. After you identify a bucket with unintended access using Access Analyzer for S3, you can Block Public Access to the bucket. Amazon S3 block public access settings override the bucket policies that are applied to the bucket. The settings also override the access point policies applied to the bucket’s access points.

Figure 3: Access Analyzer for S3 findings report in the S3 Management Console

Figure 3: Access Analyzer for S3 findings report in the S3 Management Console

Next steps

To turn on IAM Access Analyzer at no additional cost, head over to the IAM console. IAM Access Analyzer is available in the IAM console and through APIs in all commercial AWS Regions and AWS GovCloud (US). To learn more about IAM Access Analyzer, visit the feature page.

If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on the AWS IAM Forum or contact AWS Support.

Want more AWS Security how-to content, news, and feature announcements? Follow us on Twitter.

Author

Andrea Nedic

Andrea is a Senior Technical Program Manager in the AWS Automated Reasoning Group. She enjoys hearing from customers about how they build on AWS. Outside of work, Andrea likes to ski, dance, and be outdoors. She holds a PhD from Princeton University.

Decoupling larger applications with Amazon EventBridge

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/decoupling-larger-applications-with-amazon-eventbridge/

Many applications start to grow in complexity as they mature, making it harder for developers to maintain code or add new features. This can lead to monolithic applications, where developers must know more about the entire architecture to make changes. Typically, this causes code to become more fragile, and the rate of development slows down.

This blog post shows how you can use an event-based architecture to decouple services and functional areas of applications. It uses the document repository solution as an example, to compare architecture after shifting to an event-based approach. The new architecture offers both greater extensibility and simplicity for developers adding new functionality in the future. It can help alleviate the problems associated with monolithic applications.

The original version of this application uses Amazon S3 event notifications to invoke AWS Lambda functions to index content in the Amazon Elasticsearch Service:

Original document repository application architecture

There are some limitations with this design. First, there is a single source bucket for documents, which may not reflect production usage. Also, while it could be modified to allow new file types for indexing, adding new functionality such as translating documents would require refactoring. And despite having multiple Lambda functions, it’s packaged as a single application, which makes it harder to deploy changes.

The new design uses events to decouple each service used to process incoming S3 objects. It can also use one or more buckets as event sources, which you can change dynamically as needed. Most importantly, it can be easier to introduce changes and new functionality, since the application is no longer deployed as a mono-repo. The new architecture uses this design:

Decoupled architecture

  1. Setup and configuration of AWS resources.
  2. Parser function to filter and reformat S3 events for the application.
  3. Converter functions to operate on distinct file types.
  4. Analyzer functions for interpreting the content of the files.
  5. The Loader function imports the metadata into the Amazon Elasticsearch Service.

The code uses the AWS Serverless Application Model (SAM), enabling you to deploy the application easily in your own AWS account. This walkthrough creates resources covered in the AWS Free Tier but you may incur cost for significant data usage. Additionally, it requires an Amazon Elasticsearch Service domain, which may incur cost on your AWS bill.

The resulting solution is five separate applications, which you deploy in stages. To set up the application, visit the GitHub repo and follow the instructions in the README.md file.

Setup and configuration

The SAM template in the setup directory creates the S3 buckets, and configures AWS CloudTrail to capture put events in these buckets. This is required as EventBridge consumes S3 events via CloudTrail. Now, when any object is stored in any of these S3 buckets, EventBridge receives an event.

This template also creates a customer managed IAM policy that creates read-only access to the source S3 buckets:

  MyManagedPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      ManagedPolicyName: docrepo-s3-read-policy
      PolicyDocument: 
        Version: 2012-10-17
        Statement: 
          - Effect: Allow
            Action:
              - s3:GetObject
              - s3:ListBucket
              - s3:GetBucketLocation
              - s3:GetObjectVersion
              - s3:GetLifecycleConfiguration
            Resource:
              - !Sub 'arn:aws:s3:::${Dept1Bucket}/*'
              - !Sub 'arn:aws:s3:::${Dept2Bucket}/*'
              - !Sub 'arn:aws:s3:::${Dept3Bucket}/*'

This policy can be attached to any Lambda function that must read the contents from one of the S3 buckets. If the pool of source buckets changes in the future, you only need to modify this policy. Any downstream Lambda functions using the policy automatically gain access to the added buckets.

In the second setup application, the Parser service receives those S3 events and reformats the event for downstream services. Specifically, it creates a new attribute for the file type of the S3 object. After you deploy these two templates, creating any objects in the source S3 buckets generates the following event in the default event bus:

Parsing events from Amazon S3

Building the converter processes

This application uses converters to process incoming objects in the S3 buckets. One converter handles one file type. There are two converters required to replicate the original application’s functionality, for pdf and docx files. An EventBridge rule matches incoming events and triggers the appropriate Lambda function to convert the object. This diagram shows abridged input and output events for these functions:

  1. A matching EventBridge rule invokes the relevant converter function. The function converts the source file into raw text.
  2. The text is split into batches of 5,000 characters.
  3. The functions publish the text batches back to EventBridge, using new detail-type and source attributes.

The SAM template specifies the EventBridge rules, the permissions for EventBridge to invoke the Lambda functions, and the processing Lambda functions. The Lambda functions use the customer managed IAM policy created during the setup for read-only access to the originating S3 bucket. Each converter has its own logic for processing file types differently, and can produce different types of events if needed.

The analyzer functions

In this workflow, any file type containing text is analyzed by Amazon Comprehend to detect entities. The AnalyzeText function is invoked by an EventBridge rule. The rule is filtering for the NewTextBatch attribute in an event from docRepo.converters.

Another EventBridge rule triggers the AnalyzeImage function. This is filtering for jpg and jpeg file types where the event source is docRepo.s3. This function uses Amazon Rekognition to identify labels in the images.

Both functions produce new events containing the entities and labels, using new detail-type and source attributes. These events are published back to the default bus on EventBridge:

Analyzers processing events

  1. A matching EventBridge rule invokes the relevant analyzer function. The function uses Amazon ML services to detect labels in images and entities in text.
  2. The functions publish the metadata back to EventBridge, using new detail-type and source attributes.

Loader function

The Loader function is invoked by an EventBridge rule that is filtering for events from the Analyzers functions. This final function receives those events and loads the labels and entities metadata into the Amazon Elasticsearch Service:

Loader function processing events

Choosing between AWS Step Functions and Amazon EventBridge

In this application, there is a sequence of steps to the workflow that could also be handled by AWS Step Functions. Both services can simplify workflows in distributed applications and make it easier to maintain and modify serverless applications. In many cases, it makes sense to use both services for larger enterprise applications with complex business logic.

However, EventBridge enables you to separate processes into independent applications. It also allows other consumers to build custom logic using your events without impacting your application design or performance. In enterprise applications, this makes it much easier to innovate and develop new application features.

Benefits for developers

With the original monolithic application divided into five separate applications, it’s now easier for different teams to work on this project. It’s also easier and safer to deploy changes to a single microservice without needing to deploy the entire application. Developers must only understand their own service rather than the complete architecture of the application.

For example, to add more S3 buckets to the source list, you only need to modify the SAM template in the setup part of the application. The Parser function consumes put events from any number of buckets, and downstream functions consume events via EventBridge. To add a new file type, you only need to add a new converter function. Or to change the indexing provider, you create a new loader function to route the metadata to another service. The services of this application are independent, decoupled by EventBridge, and you can add more producers and consumers as required.

Traditionally, one of the challenges with event-based applications is tracking the format of events. Event schemas are typically hard to manage because any service can produce an event. The schema may also change as developers release new versions of a service. To help solve these issues, EventBridge has a feature called schema discovery that can automate the tracking and management of events in your application.

All the microservices in this application publish with a source attribute of docRepo. If you enable schema discovery, EventBridge quickly identifies these custom event schemas:

Schema discovery in Amazon EventBridge

The schemas are defined in JSON using the OpenAPI Specification. As you develop new features, you can download code bindings directly from these schemas. For type-safe languages, this allows you to use events as objects directly in your applications, helping to accelerate development. To learn more about how to use code bindings and schema discovery, watch this video:

Conclusion

Larger applications can quickly become monoliths. You can use event-based architectures to decouple services within applications, and maintain flexibility as your application grows. Amazon EventBridge is a serverless event bus that can help simplify you architecture, allowing each service to operate independently with no dependence on event consumers.

In this post, I show how to rearchitect the Serverless Document Repository example into five smaller applications orchestrated using events. I explore the benefits of developing applications using this approach, including the ability to make changes more easily. I also show how EventBridge schema discovery can help automate event schema management.

To learn more about how to use Amazon EventBridge to decouple large applications, visit the Amazon EventBridge learning path.

Serving Billions of Ads in Just 100 ms Using Amazon Elasticache for Redis

Post Syndicated from Rodrigo Asensio original https://aws.amazon.com/blogs/architecture/serving-billions-of-ads-with-amazon-elasticache-for-redis/

This post was co-written with Lucas Ceballos, CTO of Smadex

Introduction

Showing ads may seem to be a simple task, but it’s not. Showing the right ad to the right user is an incredibly complex challenge that involves multiple disciplines such as artificial intelligence, data science, and software engineering. Doing it one million times per second with a 100-ms constraint is even harder.

In the ad-tech business, speed and infrastructure costs are the keys to success. The less the final user waits for an ad, the higher the probability of that user clicking on the ad. Doing that while keeping infrastructure costs under control is crucial for business profitability.

About Smadex

Smadex is the leading mobile-first programmatic advertising platform specifically built to deliver best user acquisition performance and complete transparency.

Its state-of-the-art digital signal processing (DSP) technology provides advertisers with the tools they need to achieve their goals and ROI, with measurable results from web forms, post-app install events, store visits, and sales.

Smadex advertising architecture

What does showing ads look like under the hood? At Smadex, our technology works based on the OpenRTB (Real-Time Bidding) protocol.

RTB is a means by which advertising inventory is bought and sold on a per-impression basis, via programmatic instantaneous auction, which is similar to financial markets.

To show ads, we participate in auctions deciding in real time which ad to show and how much to bid trying to optimize the cost of every impression.

High level diagram

  1. The final user browses the publisher’s website or app.
  2. Ad-exchange is called to start a new auction.
  3. Smadex receives the bid request and has to decide which ad to show and how much to offer in just 100 ms (and this is happening one million times per second).
  4. If Smadex won the auction, the ad must be sent and rendered on the publisher’s website or app.
  5. In the end, the user interacts with the ad sending new requests and events to Smadex platform.

Flow of data

As you can see in the previous diagram, showing ads is just one part of the challenge. After the ad is shown, the final user interacts with it in multiple ways, such as clicking it, installing an application, subscribing to a service, etc. This happens during a determined period that we call the “attribution window.” All of those interactions must be tracked and linked to the original bid transaction (using the request_id parameter).

Doing this is complicated: billions of bid transactions must be stored and available so that they can be quickly accessed every time the user interacts with the ad. The longer we store the transactions, the longer we can “wait” for an interaction to take place, and the better for our business and our clients, too.

Detailed diagram

Challenge #1: Cost

The challenge is: What kind of database can store billions of records per day, with at least a 30-day retention capacity (attribution window), be accessed by key-value, and all by spending as little as possible?

The answer is…none! Based on our research, all the available options that met the technical requirements were way out of our budget.

So…how to solve it? Here is when creativity and the combination of different AWS services comes into place.

We started to analyze the time dispersion of the events trying to find some clues. The interesting thing we spotted was that 90% of what we call “post-bid events” (impression, click, install, etc.) happened within one hour after the auction took place.

That means that we can process 90% of post-bid events by storing just one hour of bids.

Under our current workload, in one hour we participate in approximately 3.7 billion auctions generating 100 million bid records of an average 600 bytes each. This adds up to 55 gigabytes per hour, an easier amount of data to process.

Instead of thinking about one single database to store all the bid requests, we decided to split bids into two different categories:

  • Hot Bid: A request that took place within the last hour (small amount and frequently accessed)
  • Cold Bid: A request that took place more than our hour ago (huge amount and infrequently accessed)

Amazon ElastiCache for Redis is the best option to store 55 GB of data in memory, which gives us the ability to query in a key-value way with the lowest possible latency.

Hot Bids flow

Hot Bids flow diagram

  1. Every new bid is a hot bid by definition so it’s going to be stored in the hot bids Redis cluster.
  2. At the moment of the user interaction with the ad, the Smadex tracker component receives an HTTPS notification, including the bid request UUID that originated it.
  3. Based on the date of occurrence extracted from the received UUID, the tracker component can determine if it’s looking for a hot bid or not. If it’s a hot bid, the tracker reads it directly from Redis performing a key-value lookup query.

It’s been easy so far but what to do with the other 29 days and 23 hours we need to store?

Challenge #2: Performance

As we previously mentioned, cold bids are a huge infrequently accessed number of records with only 10% of post-bid events pointing to them. That sounds like a good use case for an inexpensive and slower data store like Amazon S3.

Thanks to the S3 low-cost storage prices combined with the ability to query S3 objects directly using Amazon Athena, we were able to optimize our costs by storing and querying cold bids by implementing a serverless architecture.

Cold Bids Flow

Cold Bids flow diagram

  1. Incoming bids are buffered by Fluentd and flushed to S3 every one minute in JSON format. Every single file flushed to S3 contains all the bids processed by a specific EC2 instance for one minute.
  2. An AWS Lambda function is automatically triggered on every new PutObject event from S3. This function transforms the JSON records to Parquet format and will save it back the S3 bucket, but this time into a specific partition folder based on file creation timestamp.
  3. As seen on the hot bids flow, the tracker component will determine if it’s looking for a hot or a cold bid based on the extracted timestamp of the request UUID. In this case, the cold bid will be retrieved by running an Amazon Athena look-up query leveraging the use of partitions and Parquet format to reduce as much as possible the latency and data that needs to be scanned.

Conclusion

Thanks to this combined approach using different technologies and a variety of AWS services we were able to extend our attribution window from 30 to 90 days while reducing the infrastructure costs by 45%.

 

 

Automating scalable business workflows using minimal code

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/automating-scalable-business-workflows-using-minimal-code/

Organizations frequently have complex workflows embedded in their processes. When a customer places an order, it triggers a workflow. Or when an employee requests vacation time, this starts another set of processes. Managing these at scale can be challenging in traditional applications, which must often manage thousands of separate tasks.

In this blog post, I show how to use a serverless application to build and manage enterprise workflows at scale. This minimal-code solution is highly scalable and flexible, and can be modified easily to meet your needs. This application uses Amazon S3, AWS Lambda, and AWS Step Functions:

Using S3-to-Lambda to trigger Step Functions workflows

AWS Step Functions allows you to represent workflows as a JSON state machine. This service can help remove custom code and convoluted logic from distributed systems, and make it easier to maintain and modify. S3 is a highly scalable service that stores trillions of objects, and Lambda runs custom code in response to events. By combining these services, it’s simple to build resilient workflows with high throughput, triggered by putting objects in S3 buckets.

There are many business use-cases for this approach. For example, you could automatically pay invoices from approved vendors under a threshold amount by reading the invoices stored in S3 using Amazon Textract. Or your application could automatically book consultations for patients emailing their completed authorization forms. Almost any action that is triggered by a document or form is a potential candidate for an automated workflow solution.

To set up the example application, visit the GitHub repo and follow the instructions in the README.md file. The code uses the AWS Serverless Application Model (SAM), enabling you to deploy the application in your own AWS account. This walkthrough creates resources covered in the AWS Free Tier but you may incur cost if you test with large amounts of data.

How the application works

The starting point for this serverless solution is S3. When new objects are stored, this triggers a Lambda function that starts an execution in the Step Functions workflow. Lambda scales to keep pace as more objects are written to the S3 bucket, and Step Functions creates a separate execution for each S3 object. It also manages the state of all the distinct workflows.

Simple Step Functions workflow.

  1. A downstream process stores data in the S3 bucket.
  2. This invokes the Start Execution Lambda function. The function creates a new execution in Step Functions using the S3 object as event data.
  3. The workflow invokes the Decider function. This uses Amazon Rekognition to detect the contents of objects stored in S3.
  4. This function uses environment variables to determine the matching attributes. If the S3 object matches the criteria, it triggers the Match function. Otherwise, the No Match function is invoked.

The application’s SAM template configures the Step Functions state machine as JSON. It also defines an IAM role allowing Step Functions to invoke the Lambda functions. The initial function invoked by S3 is defined to accept the state machine ARN as an environment variable. The template also defines the permissions needed and the S3 trigger:

  StartExecutionFunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: StartExecutionFunction/
      Handler: app.handler
      Runtime: nodejs12.x
      MemorySize: 128
      Environment:
        Variables:
          stateMachineArn: !Ref 'MatcherStateMachine'
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref InputBucketName
        - Statement:
          - Effect: Allow
            Resource: !Ref 'MatcherStateMachine'
            Action:
              - states:*
      Events:
        FileUpload:
          Type: S3
          Properties:
            Bucket: !Ref InputBucket
            Events: s3:ObjectCreated:*

This uses SAM policy templates to provide read access to the S3 bucket. It also defines the event that causes the function invocation from S3, filtering only for new objects with a .json suffix.

The Decider function is the first step of the Step Functions workflow. It uses Amazon Rekognition to detect labels and words from the images provide. The SAM template passes the required labels and words to the function, together with an optional confidence score:

  DeciderFunction:
    Type: AWS::Serverless::Function 
    Properties:
      CodeUri: deciderFunction/
      Handler: app.handler
      Environment:
        Variables:
          requiredWords: "NEW YORK"
          requiredLabels: "Driving License,Person"
          minConfidence: 70

If the requiredLabels environment variable is present, the function’s code calls Amazon Rekognition’s detectLabel method. It then calls the detectText method if the requiredWords environment variable is used:

// The standard Lambda handler
exports.handler = async (event) => {
  return await processDocument(event)
}

// Detect words/labels on document or image
const processDocument = async (event) => {

  // If using a required labels
  if (process.env.requiredLabels) {
    // If no match, return immediately
    if (!await checkRequiredLabels(event)) return 'NoMatch'
  }  

  // If using a required words test
  if (process.env.requiredWords) {
    // If no match, return immediately
    if (!await checkRequiredWords(event)) return 'NoMatch'
  }

  return 'Match'
}

The Decider function returns “Match” or “No match” to the Step Functions workflow. This invokes downstream functions depending on the result. The Match and No Match functions are stubs where you can build the intended functionality in the workflow. This Step Functions workflow is designed generically so you can extend the functionality easily.

Testing the application

Deploy the first application by following the README.md in the GitHub repo, and note the application’s S3 bucket name. There are three test cases:

  • Create a workflow for a matched subject in an image. From photos uploaded to S3, identify which images contain one or more subjects, and invoke the Match path of the workflow.
  • Create a workflow for invoices from a specific vendor. From multiple invoices uploaded, matching those from a vendor, and trigger the Match path of the workflow.
  • Create a workflow for driver licenses issued by a single state. From a collection of drivers licenses, trigger the Match workflow for only a single state.

1. Create a workflow for matched subject in an image

In this example, the application identifies cats in images uploaded to the S3 bucket. The default configuration in the SAM template in the GitHub repo contains the environment variables set for this example:

Environment variables in SAM template.

First, I upload over 20 images of various animals to the S3 bucket:

Uploading files to the S3 bucket.

After navigating to the Step Function console, and selecting the application’s state machine, it shows 24 separate executions, one per image:

Step Functions execution detail.

I select one of these executions, for cat3.jpg. This has followed the MatchFound execution path of the workflow:

MatchFound execution path.

2. Create a workflow for invoices for a specified vendor.

For this example, the application looks for a customer account number and vendor name in invoices uploaded to the S3 bucket. The Decider function uses environment variables to determine the matching keywords. These can be updated by either deploying the SAM template or editing the Lambda function directly.

I modify the SAM template to match the vendor name and account number as follows:

SAM template with vendor information.

Next I upload several different invoices from the local machine to the S3 bucket:

Uploading different files to the S3 bucket.

In the Step Functions console, I select the execution for utility-bill.png. This execution matches the criteria and follows the MatchFound path in the workflow.

MatchFound path in visual workflow.

3. Matching a driver’s license by state

In this example, the application routes based upon the state where a driver’s license is issued. For this test, I use a range of sample images of licenses from DMVs in multiple states.

I modify the SAM template so the Decider function uses both label and word detection. I set “Driving License” and “Person” as required labels. This ensures that Amazon Rekognition identifies a person is in the photo in addition to the document type.

Environment variables in the SAM template.

Next, I upload the driver’s license images to the S3 bucket:

Uploading files to the S3 bucket.

In the Step Functions console, I open the execution for the driver-license-ny.png file, and it has followed the MatchFound path in the workflow:

Execution path for driver's license test.

When I select the execution for the Texas driver’s license, this did not match and has followed the NoMatchFound execution path:

Execution path for NoMatchFound.

Extending the functionality

By triggering Step Functions workflows from S3 PutObject events, this application is highly scalable. As more objects are stored in the S3 bucket, it creates as many executions as needed in the state machine. The custom code only handles the specific logic requirements for a single object and the Lambda service scales up to meet demand.

In these examples, the application uses Amazon Rekognition to analyze specific document types or image contents. You could extend this logic to include value ranges, multiple alternative workflow paths, or include steps to enable human intervention.

Using Step Functions also makes it easy to modify workflows as requirements change. Any incomplete workflows continue on the existing version of the state machine used when they started. As a result, you can add steps without impacting existing code, making it faster to adapt applications to users’ needs.

Conclusion

You can use Step Functions to model many common business workflows with JSON. Combining this powerful workflow management service with the scalability of S3 and Lambda, you can quickly build nuanced solutions that operate at scale.

In this post, I show how you can deploy a simple Step Functions workflow where executions are created by objects stored in an S3 bucket. Using minimal code, it can perform complex workflow routing tasks based on document types and contents. This provides a highly flexible and scalable way to manage common organizational workflow needs.

Translating documents at enterprise scale with serverless

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/translating-documents-at-enterprise-scale-with-serverless/

For organizations operating in multiple countries, helping customers in different languages is an everyday reality. But in many IT systems, data remains static in a single language, making it difficult or impossible for international customers to use. In this blog post, I show how you can automate language translation at scale to solve a number of common enterprise problems.

Many types of data are good targets for translation. For example, product catalog information, for sharing with a geographically broad customer base. Or customer emails and interactions in multiple languages, translating back to a single language for analytics. Or even resource files for mobile applications, to automate string processing into different languages during the build process.

Building the machine learning models for language translation is extraordinarily complex, so fortunately you have a pay-as-you-go service available in Amazon Translate. This can accurately translate text between 54 languages, and automatically detects the source language.

Developing a scalable translation solution for thousands of documents can be challenging using traditional, server-based architecture. Using a serverless approach, this becomes much easier since you can use storage and compute services that scale for you – Amazon S3 and AWS Lambda:

Integrating S3 with Translate via Lambda.

In this post, I show two solutions that provide an event-based architecture for automated translation. In the first, S3 invokes a Lambda function when objects are stored. It immediately reads the file and requests a translation from Amazon Translate. The second solution explores a more advanced method for scaling to large numbers of documents, queuing the requests and tracking their state. The walkthrough creates resources covered in the AWS Free Tier but you may incur costs for usage.

To set up both example applications, visit the GitHub repo and follow the instructions in the README.md file. Both applications use the AWS Serverless Application Model (SAM) to make it easy to deploy in your AWS account.

Translating in near real time

In the first application, the workflow is straightforward. The source text is sent immediately to Translate for processing, and the result is saved back into the S3 bucket. It provides near real-time translation whenever an object is saved. This uses the following architecture:

Architecture for the first example application.

  1. The source text is saved in the Batching S3 bucket.
  2. The S3 put event invokes the Batching Lambda function. Since Translate has a limit of 5,000 characters per request, it slices the contents of the input into parts small enough for processing.
  3. The resulting parts are saved in the Translation S3 bucket.
  4. The S3 put events invoke the Translation function, which scales up concurrently depending on the number of parts.
  5. Amazon Translate returns the translations back to the Lambda function, which saves the results in the Translation bucket.

The repo’s SAM template allows you to specify a list of target languages, as a space-delimited list of supported language codes. In this case, any text uploaded to S3 is translated into French, Spanish, and Italian:

Parameters:
  TargetLanguage:
    Type: String
    Default: 'fr es it'

Testing the application

  1. Deploy the first application by following the README.md in the GitHub repo. Note the application’s S3 Translation and Batching bucket names shown in the output:Output values after SAM deployment.
  2. The testdata directory contains several sample text files. Change into this directory, then upload coffee.txt to the S3 bucket, replacing your-bucket below with your Translation bucket name:
    cd ./testdata/
    aws s3 cp ./coffee.txt s3://your-bucket
  3. The application invokes the translation workflow, and within a couple of seconds you can list the output files in the translations folder:aws s3 ls s3://your-bucket/translations/

    Translations output.

  4. Create an output directory, then download the translations to your local machine to view the contents:
    mkdir output
    aws s3 cp s3://your-bucket/translations/ ./output/ --recursive
    more ./output/coffee-fr.txt
    more ./output/coffee-es.txt
    more ./output/coffee-it.txt
  5. For the next step, translate several text files containing test data. Copy these to the Translation bucket, replacing your-bucket below with your bucket name:aws s3 cp ./ s3://your-bucket --include "*.txt" --exclude "*/*" --recursive
  6. After a few seconds, list the files in the translations folder to see your translated files:aws s3 ls s3://your-bucket/translations/

    Listing the translated files.

  7. Finally, translate a larger file using the batching process. Copy this file to the Batching S3 bucket (replacing your-bucket with this bucket name):
    cd ../testdata-batching/
    aws s3 cp ./your-filename.txt s3://your-bucket
  8. Since this is a larger file, the batching Lambda function breaks it apart into smaller text files in the Translation bucket. List these files in the terminal, together with their translations:
    aws s3 ls s3://your-bucket
    aws s3 ls s3://your-bucket/translations/

    Listing the output files.

In this example, you can translate a reasonable number of text files for a trivial use-case. However, in an enterprise environment where there could be thousands of files in a single bucket, you need a more robust architecture. The second application introduces a more resilient approach.

Scaling up the translation solution

In an enterprise environment, the application must handle long documents and large quantities of documents. Amazon Translate has service limits in place per account – you can request an increase via an AWS Support Center ticket if needed. However, S3 can ingest a large number of objects quickly, so the application should decouple these two services.

The next example uses Amazon SQS for decoupling, and also introduces Amazon DynamoDB for tracking the status of each translation. The new architecture looks like this:

Decoupled translation architecture.

  1. A downstream process saves text objects in the Batching S3 bucket.
  2. The Batching function breaks these files into smaller parts, saving these in the Translation S3 bucket.
  3. When an object is saved in this bucket, this invokes the Add to Queue function. This writes a message to an SQS queue, and logs the item in a DynamoDB table.
  4. The Translation function receives messages from the SQS queue, and requests translations from the Amazon Translate service.
  5. The function updates the item as completed in the DynamoDB table, and stores the output translation in the Results S3 bucket.

Testing the application

This test uses a much larger text document – the text version of the novel War and Peace, which is over 3 million characters long. It’s recommended that you use a shorter piece of text for the walkthrough, between 20-50 kilobytes, to minimize cost on your AWS bill.

  1. Deploy the second application by following the README.md in the GitHub repo, and note the application’s S3 bucket name and DynamoDB table name.
    The output values from the SAM deployment.
  2. Download your text sample and then upload it the Batching bucket. Replace your-bucket with your bucket name and your-text.txt with your text file name:aws s3 cp ./your-text.txt s3://your-bucket/ 
  3. The batching process creates smaller files in the Translation bucket. After a few seconds, list the files in the Translation bucket (replacing your-bucket with your bucket name):aws s3 ls s3://patterns-translations-v2/ --recursive --summarize

    Listing the translated files.

  4. To see the status of the translations, navigate to the DynamoDB console. Select Tables in the left-side menu and then choose the application’s DynamoDB table. Select the Items tab:Listing items in the DynamoDB table.

    This shows each translation file and a status of Queue or Translated.

  5. As translations complete, these appear in the Results bucket:aws s3 ls s3://patterns-results-v2/ --summarize

    Listing the output translations.

How this works

In the second application, the SQS queue acts as a buffer between the Batching process and the Translation process. The Translation Lambda function fetches messages from the SQS queue when they are available, and submits the source file to Amazon Translate. This throttles the overall speed of processing.

There are configuration settings you can change in the SAM template to vary the speed of throughput:

  • Translator function: this consumes messages from the SQS queue. The BatchSize configured in the SAM template is set to one message per invocation. This is equivalent to processing one source file at a time. You can set a BatchSize value from 1 to 10, so could increase this from the application’s default.
  • Function concurrency: the SAM template sets the Loader function’s concurrency to 1, using the ReservedConcurrentExecutions attribute. In effect, this means Lambda can only invoke 1 function at the same time. As a result, it keeps fetching the next batch from SQS as soon as processing finishes. The concurrency is a multiplier – as this value is increased, the translation throughput increases proportionately, if there are messages available in SQS.
  • Amazon Translate limits: the service limits in place are designed to protect you from higher-than-intended usage. If you need higher soft limits, open an AWS Support Center ticket.

Combining these settings, you have considerable control over the speed of processing. The defaults in the sample application are set at the lowest values possible so you can observe the queueing mechanism.

Conclusion

Automated translation using deep learning enables you to make documents available at scale to an international audience. For organizations operating globally, this can improve your user experience and increase customer access to your company’s products and services.

In this post, I show how you can create a serverless application to process large numbers of files stored in S3. The write operation in the S3 bucket triggers the process, and you use SQS to buffer the workload between S3 and the Amazon Translate service. This solution also uses DynamoDB to help track the state of the translated files.

Creating a searchable enterprise document repository

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/creating-a-searchable-enterprise-document-repository/

Enterprise customers frequently have repositories with thousands of documents, images and other media. While these contain valuable information for users, it’s often hard to index and search this content. One challenge is interpreting the data intelligently, and another issue is processing these files at scale.

In this blog post, I show how you can deploy a serverless application that uses machine learning to interpret your documents. The architecture includes a queueing mechanism for handling large volumes, and posting the indexing metadata to an Amazon Elasticsearch domain. This solution is scalable and cost effective, and you can modify the functionality to meet your organization’s requirements.

The overall architecture for a searchable document repository solution.

The application takes unstructured data and applies machine learning to extract context for a search service, in this case Elasticsearch. There are many business uses for this design. For example, this could help Human Resources departments in searching for skills in PDF resumes. It can also be used by Marketing departments or creative groups with a large number of images, providing a simple way to search for items within the images.

As documents and images are added to the S3 bucket, these events invoke AWS Lambda functions. This runs custom code to extract data from the files, and also calls Amazon ML services for interpretation. For example, when adding a resume, the Lambda function extracts text from the PDF file while Amazon Comprehend determines the key phrases and topics in the document. For images, it uses Amazon Rekognition to determine the contents. In both cases, once it identifies the indexing attributes, it saves the data to Elasticsearch.

The code uses the AWS Serverless Application Model (SAM), enabling you to deploy the application easily in your own AWS Account. This walkthrough creates resources covered in the AWS Free Tier but you may incur cost for large data imports. Additionally, it requires an Elasticsearch domain, which may incur cost on your AWS bill.

To set up the example application, visit the GitHub repo and follow the instructions in the README.md file.

Creating an Elasticsearch domain

This application requires an Elasticsearch development domain for testing purposes. To learn more about production configurations, see the Elasticsearch documentation.

To create the test domain:

  1. Navigate to the Amazon Elasticsearch console. Choose Create a new domain.
  2. For Deployment type, choose Development and testing. Choose Next.
  3. In the Configure Domain page:
    1. For Elasticsearch domain name, enter serverless-docrepo.
    2. Change Instance Type to t2.small.elasticsearch.
    3. Leave all the other defaults. Choose Next at the end of the page.
  4. In Network Configuration, choose Public access. This is adequate for a tutorial but in a production use-case, it’s recommended to use VPC access.
  5. Under Access Policy, in the Domain access policy dropdown, choose Custom access policy.
  6. Select IAM ARN and in the Enter principal field, enter your AWS account ID (learn how to find your AWS account ID). In the Select Action dropdown, select Allow.
  7. Under Encryption, leave HTTPS checked. Choose Next.
  8. On the Review page, review your domain configuration, and then choose Confirm.

Your domain is now being configured, and the Domain status shows Loading in the Overview tab.

After creating the Elasticsearch domain, the domain status shows ‘Loading’.

It takes 10-15 minutes to fully configure the domain. Wait until the Domain status shows Active before continuing. When the domain is ready, note the Endpoint address since you need this in the application deployment.

The Endpoint for an Elasticsearch domain.

Deploying the application

After cloning the repo to your local development machine, open a terminal window and change to the cloned directory.

  1. Run the SAM build process to create an installation package, and then deploy the application:
    sam build
    sam deploy --guided
  2. In the guided deployment process, enter unique names for the S3 buckets when prompted. For the ESdomain parameter, enter the Elasticsearch domain Endpoint from the previous section.
  3. After the deployment completes, note the Lambda function ARN shown in the output:
    Lambda function ARN output.
  4. Back in the Elasticsearch console, select the Actions dropdown and choose Modify Access Policy. Paste the Lambda function ARN as an AWS Principal in the JSON, in addition to the root user, as follows:
    Modify access policy for Elasticsearch.
  5. Choose Submit. This grants the Lambda function access to the Elasticsearch domain.

Testing the application

To test the application, you need a few test documents and images with the file types DOCX (Microsoft Word), PDF, and JPG. This walkthrough uses multiple files to illustrate the queuing process.

  1. Navigate to the S3 console and select the Documents bucket from your deployment.
  2. Choose Upload and select your sample PDF or DOCX files:
    Upload files to S3.
  3. Choose Next on the following three pages to complete the upload process. The application now analyzes these documents and adds the indexing information to Elasticsearch.

To query Elasticsearch, first you must generate an Access Key ID and Secret Access Key. For detailed steps, see this documentation on creating security credentials. Next, use Postman to create an HTTP request for the Elasticsearch domain:

  1. Download and install Postman.
  2. From Postman, enter the Elasticsearch endpoint, adding /_search?q=keyword. Replace keyword with a search term.
  3. On the Authorization tab, complete the Access Key ID and Secret Access Key fields with the credentials you created. For Service Name, enter es.Postman query with AWS authorization.
  4. Choose Send. Elasticsearch responds with document results matching your search term.REST response from Elasticsearch.

How this works

This application creates a processing pipeline between the originating Documents bucket and the Elasticsearch domain. Each document type has a custom parser, preparing the content in a Queuing bucket. It uses an Amazon SQS queue to buffer work, which is fetched by a Lambda function and analyzed with Amazon Comprehend. Finally, the indexing metadata is saved in Elasticsearch.

Serverless architecture for text extraction and classification.

  1. Documents and images are saved in the Documents bucket.
  2. Depending upon the file type, this triggers a custom parser Lambda function.
    1. For PDFs and DOCX files, the extracted text is stored in a Staging bucket. If the content is longer than 5,000 characters, it is broken into smaller files by a Lambda function, then saved in the Queued bucket.
    2. For JPG files, the parser uses Amazon Rekognition to detect the contents of the image. The labeling metadata is stored in the Queued bucket.
  3. When items are stored in the Queued bucket, this triggers a Lambda function to add the job to an SQS queue.
  4. The Analyze function is invoked when there are messages in the SQS queue. It uses Amazon Comprehend to find entities in the text. This function then stores the metadata in Elasticsearch.

S3 and Lambda both scale to handle the traffic. The Elasticsearch domain is not serverless, however, so it’s possible to overwhelm this instance with requests. There may be a large number of objects stored in the Documents bucket triggering the workflow, so the application uses SQS couple to smooth out the traffic. When large numbers of objects are processed, you see the Messages Available increase in the SQS queue:

SQS queue buffers messages to smooth out traffic.
For the Lambda function consuming messages from the SQS queue, the BatchSize configured in the SAM template controls the rate of processing. The function continues to fetch messages from the SQS queue while Messages Available is greater than zero. This can be a useful mechanism for protecting downstream services that are not serverless, or simply cannot scale to match the processing rates of the upstream application. In this case, it provides a more consistent flow of indexing data to the Elasticsearch domain.

In a production use-case, you would scale the Elasticsearch domain depending upon the load of search requests, not just the indexing traffic from this application. This tutorial uses a minimal Elasticsearch configuration, but this service is capable of supporting enterprise-scale use-cases.

Conclusion

Enterprise document repositories can be a rich source of information but can be difficult to search and index. In this blog post, I show how you can use a serverless approach to build scalable solution easily. With minimum code, we can use Amazon ML services to create the indexing metadata. By using powerful image recognition and language comprehension capabilities, this makes the metadata more useful and the search solution more accurate.

This also shows how serverless solutions can be used with existing non-serverless infrastructure, like Elasticsearch. By decoupling scalable serverless applications, you can protect downstream services from heavy traffic loads, even as Lambda scales up. Elasticsearch provides a fast, easy way to query your document repository once the serverless application has completed the indexing process.

To learn more about how to use Elasticsearch for production workloads, see the documentation on managing domains.

Converting call center recordings into useful data for analytics

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/converting-call-center-recordings-into-useful-data-for-analytics/

Many businesses operate call centers that record conversations with customers for training or regulatory purposes. These vast collections of audio offer unique opportunities for improving customer service. However, since audio data is mostly unsearchable, it’s usually archived in these systems and never analyzed for insights.

Developing machine learning models for accurately understanding and transcribing speech is also a major challenge. These models require large datasets for optimal performance, along with teams of experts to build and maintain the software. This puts it out of reach for the majority of businesses and organizations. Fortunately, you can use AWS services to handle this difficult problem.

In this blog post, I show how you can use a serverless approach to analyze audio data from your call center. You can clone this application from the GitHub repo and modify to meet your needs. The solution uses Amazon ML services, together with scalable storage, and serverless compute. The example application has the following architecture:

The architecture for the call center audio analyzer.

For call center analysis, this application is useful to determine the types of general topics that customers are calling about. It can also detect the sentiment of the conversation, so if the call is a compliment or a complaint, you could take additional action. When combined with other metadata such as caller location or time of day, this can yield important insights to help you improve customer experience. For example, you might discover there are common service issues in a geography at a certain time of day.

To set up the example application, visit the GitHub repo and follow the instructions in the README.md file.

How the application works

A key part of the serverless solution is Amazon S3, an object store that scales to meet your storage needs. When new objects are stored, this triggers AWS Lambda functions, which scale to keep pace with S3 usage. The application coordinates activities between the S3 bucket and two managed Machine Learning (ML) services, storing the results in an Amazon DynamoDB table.

The ML services used are:

  • Amazon Transcribe, which transcribes audio data into JSON output, using a process called automatic speech recognition. This can understand 31 languages and dialects, and identify different speakers in a customer support call.
  • Amazon Comprehend, which offers sentiment analysis as one of its core features. This service returns an array of scores to estimate the probability that the input text is positive, negative, neutral, or mixed.

Sample application architecture.

  1. A downstream process, such as a call recording system, stores audio data in the application’s S3 bucket.
  2. When the MP3 objects are stored, this triggers the Transcribe function. The function creates a new job in the Amazon Transcribe service.
  3. When the transcription process finishes, Transcribe stores the JSON result in the same S3 bucket.
  4. This JSON object triggers the Sentiment function. The Sentiment function requests a sentiment analysis from the Comprehend service.
  5. After receiving the sentiment scores, this function stores the results in a DynamoDB table.

There is only one bucket used in the application. The two Lambda functions are triggered by the same bucket, using different object suffixes. This is configured in the SAM template, shown here:

  SentimentFunction:
    ...
      Events:
        FileUpload:
          Type: S3
          Properties:
            Bucket: !Ref InputS3Bucket
            Events: s3:ObjectCreated:*
            Filter: 
              S3Key:
                Rules:
                  - Name: suffix
                    Value: '.json'              

  TranscribeFunction:
    ... 
      Events:
        FileUpload:
          Type: S3
          Properties:
            Bucket: !Ref InputS3Bucket
            Events: s3:ObjectCreated:*
            Filter: 
              S3Key:
                Rules:
                  - Name: suffix
                    Value: '.mp3'    

Testing the application

To test the application, you need an MP3 audio file containing spoken text. For example, in my testing, I use audio files of a person reading business reviews representing positive, neutral, and negative experiences.

  1. After cloning the GitHub repo, follow the instructions in the README.md file to deploy the application. Note the name of the S3 bucket output in the deployment.SAM deployment CLI output
  2. Upload your test MP3 files using this command in a terminal, replacing your-bucket-name with the deployed bucket name:aws s3 cp .\ s3://your-bucket-name --recursiveOnce executed, your terminal shows the uploaded media files:

    Uploading sample media files.

  3.  Navigate to the Amazon Transcribe console, and choose Transcription jobs in the left-side menu. The MP3 files you uploaded appear here as separate jobs:Amazon Transcribe jobs in progress
  4. Once the Status column shows all pending job as Complete, navigate to the DynamoDB console.
  5. Choose Tables from the left-side menu and select the table created by the deployment. Choose the Items tab:Sentiment scores in the DynamoDB table
    Each MP3 file appears as a separate item with a sentiment rating and a probability for each sentiment category. It also includes the transcript of the audio.

Handling multiple languages

One of the most useful aspects of serverless architecture is the ability to add functionality easily. For call centers handling multiple languages, ideally you should translate to a common language for sentiment scoring. With this application, it’s easy to add an extra step to the process to translate the transcription language to a base language:

Advanced application architecture

A new Translate Lambda function is invoked by the S3 JSON suffix filter and creates text output in a common base language. The sentiment scoring function is triggered by new objects with the suffix TXT.

In this modified case, when the MP3 audio file is uploaded to S3, you can append the language identifier as metadata to the object. For example, to upload an MP3 with a French language identifier using the AWS CLI:

aws s3 cp .\test-audio-fr.mp3 s3://your-bucket --metadata Content-Language=fr-FR

The first Lambda function passes the language identifier to the Transcribe service. In the Transcribe console, the language appears in the new job:

French transcription job complete

After the job finishes, the JSON output is stored in the same S3 bucket. It shows the transcription from the French language audio:

French transcription output

The new Translate Lambda function passes the transcript value into the Amazon Translate service. This converts the French to English and saves the translation as a text file. The sentiment Lambda function now uses the contents of this text file to generate the sentiment scores.

This approach allows you to accept audio in a wide range of spoken languages but standardize your analytics in one base language.

Developing for extensibility

You might want to take action on phone calls that have a negative sentiment score, or publish scores to other applications in your organization. This architecture makes it simple to extend functionality once DynamoDB saves the sentiment scores. By using DynamoDB Streams, you can invoke a Lambda function each time a record is created or updated in the underlying DynamoDB table:

Adding notifications to the application

In this case, the routing function could trigger an email via Amazon SES where the sentiment score is negative. For example, this could email a manager to follow up with the customer. Alternatively, you may choose to publish all scores and results to any downstream application with Amazon EventBridge. By publishing events to the default event bus, you can allow consuming applications to build new functionality without needing any direct integration.

Deferred execution in Amazon Transcribe

The services used in the example application are all highly scalable and highly available, and can handle significant amounts of data. Amazon Transcribe allows up to 100 concurrent transcription jobs – see the service limits and quotas for more information.

The service also provides a mechanism for deferred execution, which allows you to hold jobs in a queue. When the numbering of executing jobs falls below the concurrent execution limit, the service takes the next job from this queue. This effectively means you can submit any number of jobs to the Transcribe service, and it manages the queue and processing automatically.

To use this feature, there are two additional attributes used in the startTranscriptionJob method of the AWS.TranscribeService object. When added to the Lambda handler in the Transcribe function, the code looks like this:

Deferred execution for Amazon Transcribe

After setting AllowDeferredExecution to true, you must also provide an IAM role ARN in the DataAccessRoleArn attribute. For more information on how to use this feature, see the Transcribe documentation for job execution settings.

Conclusion

In this blog post, I show how to transcribe the content of audio files and calculate a sentiment score. This can be useful for organizations wanting to analyze saved audio for customer calls, webinars, or team meetings.

This solution uses Amazon ML services to handle the audio and text analysis, and serverless services like S3 and Lambda to manage the storage and business logic. The serverless application here can scale to handle large amounts of production data. You can also easily extend the application to provide new functionality, built specifically for your organization’s use-case.

To learn more about building serverless applications at scale, visit the AWS Serverless website.

How Siemens built a fully managed scheduling mechanism for updates on Amazon S3 data lakes

Post Syndicated from Pedro Bento original https://aws.amazon.com/blogs/big-data/how-siemens-built-a-fully-managed-scheduling-mechanism-for-consistent-updates-on-amazon-s3-data-lakes/

Siemens is a global technology leader with more than 370,000 employees and 170 years of experience. To protect Siemens from cybercrime, the Siemens Cyber Defense Center (CDC) continuously monitors Siemens’ networks and assets. To handle the resulting enormous data load, the CDC built a next-generation threat detection and analysis platform called ARGOS. ARGOS is a hybrid-cloud solution that makes heavy use of fully managed AWS services for streaming, big data processing, and machine learning.

Users such as security analysts, data scientists, threat intelligence teams, and incident handlers continuously access data in the ARGOS platform. Further, various automated components update, extend, and remove data to enrich information, improve data quality, enforce PII requirements, or mutate data due to schema evolution or additional data normalization requirements. Keeping the data always available and consistent presents multiple challenges.

While object-based data lakes are highly beneficial from a cost perspective compared to traditional transactional databases in such scenarios, they hardly allow for atomic updates or require highly complex and costly extensions. To overcome this problem, Siemens designed a solution that enables atomic file updates on Amazon S3-based data lakes without compromising query performance and availability.

This post presents this solution, which is an easy-to-use scheduling service for S3 data update tasks. Siemens uses it for multiple purposes, including pseudonymization, anonymization, and removal of sensitive data. This post demonstrates how to use the solution to remove values from a dataset after a predefined amount of time. Adding further data processing tasks is straightforward because the solution has a well-defined architecture and the whole stack consists of fewer than 200 lines of source code. It is solely based on fully managed AWS services and therefore achieves minimal operational overhead.

Architecture overview

This post uses an S3-based data lake with continuous data ingestion and Amazon Athena as query mechanism. The goal is to remove certain values after a predefined time automatically after ingestion. Applications and users consuming the data via Athena are not impacted (for example, they do not observe downtimes or data quality issues like duplication).

The following diagram illustrates the architecture of this solution.

Siemens built the solution with the following services and components:

  1. Scheduling trigger – New data (for example, in JSON format) is continuously uploaded to a S3 bucket.
  2. Task scheduling – As soon as new files land, an AWS Lambda function processes the resulting S3 bucket notification events. As part of the processing, it creates a new item on Amazon DynamoDB that specifies a Time to Live (TTL) and the path to that S3 object.
  3. Task execution trigger – When the TTL expires, the DynamoDB item is deleted from the table and the DynamoDB stream triggers a Lambda function that processes the S3 object at that path.
  4. Task execution – The Lambda function derives meta information (like the relevant S3 path) from the TTL expiration event and processes the S3 object. Finally, the new S3 object replaces the older version.
  5. Data usage – The updated data is available for querying from Athena without further manual processing, and uses S3’s eventual consistency on read operations.

About DynamoDB Streams and TTL

TTL for DynamoDB lets you define when items in a table expire so they can be deleted from the database automatically. TTL comes at no extra cost as a way to reduce storage use and reduce the cost of storing irrelevant data without using provisioned throughput. You can set a timestamp for deletion on a per-item basis, which allows you to limit storage usage to only those records that are relevant, by enabling TTL on a table.

Solution overview

To implement this solution manually, complete the following steps:

  1. Create a DynamoDB table and configure DynamoDB Streams.
  2. Create a Lambda function to insert TTL records.
  3. Configure an S3 event notification on the target bucket.
  4. Create a Lambda function that performs data processing tasks.
  5. Use Athena to query the processed data.

If you want to deploy the solution automatically, you may skip these steps, and use the AWS Cloudformation template provided.

Prerequisites

To complete this walkthrough, you must have the following:

  • An AWS account with access to the AWS Management Console.
  • A role with access to S3, DynamoDB, Lambda, and Athena.

Creating a DynamoDB table and configuring DynamoDB Streams

Start first with the time-based trigger setup. For this, you use S3 notifications, DynamoDB Streams, and a Lambda function to integrate both services. The DynamoDB table stores the items to process after a predefined time.

Complete the following steps:

  1. On the DynamoDB console, create a table.
  2. For Table name, enter objects-to-process.
  3. For Primary key, enter path and choose String.
  4. Select the table and click on Manage TTL next to “Time to live attribute” under table details.
  5. For TTL attribute, enter ttl.
  6. For DynamoDB Streams, choose Enable with view type New and old images.

Note that you can enable DynamoDB TTL on non-numeric attributes, but it only works on numeric attributes.

The DynamoDB TTL is not minute-precise. Expired items are typically deleted within 48 hours of expiration. However, you may experience shorter deviations of only 10–30 minutes from the actual TTL value. For more information, see Time to Live: How It Works.

Creating a Lambda function to insert TTL records

The first Lambda function you create is for scheduling tasks. It receives a S3 notification as input, recreates the S3 path (for example, s3://<bucket>/<key>), and creates a new item on DynamoDB with two attributes: the S3 path and the TTL (in seconds). For more information about a similar S3 notification event structure, see Test the Lambda Function.

To deploy the Lambda function, on the Lambda console, create a function named NotificationFunction with the Python 3.7 runtime and the following code:

import boto3, os, time

# Put here a new parameter for TTL, default 300, 5 minutes
default_ttl = 300

s3_client = boto3.client('s3')
table = boto3.resource('dynamodb').Table('objects-to-process')

def parse_bucket_and_key(s3_notif_event):
    s3_record = s3_notif_event['Records'][0]['s3']
    return s3_record['bucket']['name'], s3_record['object']['key']

def lambda_handler(event, context):
    try:
        bucket_name, key = parse_bucket_and_key(event)
        head_obj = s3_client.head_object(Bucket=bucket_name, Key=key)
        tags = s3_client.get_object_tagging(Bucket=bucket_name, Key=key)
        if(head_obj['ContentLength'] > 0 and len(tags['TagSet']) == 0):
            record_path = f"s3://{bucket_name}/{key}"
            table.put_item(Item={'path': record_path, 'ttl': int(time.time()) + default_ttl})
    except:
        pass # Ignore

Configuring S3 event notifications on the target bucket

You can take advantage of the scalability, security, and performance of S3 by using it as a data lake for storing your datasets. Additionally, you can use S3 event notifications to capture S3-related events, such as the creation or deletion of objects within a bucket. You can forward these events to other AWS services, such as Lambda.

To configure S3 event notifications, complete the following steps:

  1. On the S3 console, create an S3 bucket named data-bucket.
  2. Click on the bucket and go to “Properties” tab.
  3. Under Advanced Settings, choose Events and add a notification.
  4. For Name, enter MyEventNotification.
  5. For Events, select All object create events.
  6. For Prefix, enter dataset/.
  7. For Send to, choose Lambda Function.
  8. For Lambda, choose NotificationFunction.

This configuration restricts the scheduling to events that happen within your previously defined dataset. For more information, see How Do I Enable and Configure Event Notifications for an S3 Bucket?

Creating a Lambda function that performs data processing tasks

You have now created a time-based trigger for the deletion of the record in the DynamoDB table. However, when the system delete occurs and the change is recorded in DynamoDB Streams, no further action is taken. Lambda can poll the stream to detect these change records and trigger a function to process them according to the activity (INSERT, MODIFY, REMOVE).

This post is only concerned with deleted items because it uses the TTL feature of DynamoDB Streams to trigger task executions. Lambda gives you the flexibility to either process the item by itself or to forward the processing effort to somewhere else (such as an AWS Glue job or an Amazon SQS queue).

This post uses Lambda directly to process the S3 objects. The Lambda function performs the following tasks:

  1. Gets the S3 object from the DynamoDB item’s S3 path attribute.
  2. Modifies the object’s data.
  3. Overrides the old S3 object with the updated content and tags the object as processed.

Complete the following steps:

  1. On the Lambda console, create a function named JSONProcessingFunction with Python 3.7 as the runtime and the following code:
    import os, json, boto3
    from functools import partial
    from urllib.parse import urlparse
    
    s3 = boto3.resource('s3')
    
    def parse_bucket_and_key(s3_url_as_string):
        s3_path = urlparse(s3_url_as_string)
        return s3_path.netloc, s3_path.path[1:]
    
    def extract_s3path_from_dynamo_event(event):
        if event["Records"][0]["eventName"] == "REMOVE":
            return event["Records"][0]["dynamodb"]["Keys"]["path"]["S"]
    
    def modify_json(json_dict, column_name, value):
        json_dict[column_name] = value
        return json_dict
        
    def get_obj_contents(bucketname, key):
        obj = s3.Object(bucketname, key)
        return obj.get()['Body'].iter_lines()
    
    clean_column_2_func = partial(modify_json, column_name="file_contents", value="")
    
    def lambda_handler(event, context):
        s3_url_as_string = extract_s3path_from_dynamo_event(event)
        if s3_url_as_string:
            bucket_name, key = parse_bucket_and_key(s3_url_as_string)
            updated_json = "\n".join(map(json.dumps, map(clean_column_2_func, map(json.loads, get_obj_contents(bucket_name, key)))))
            s3.Object(bucket_name, key).put(Body=updated_json, Tagging="PROCESSED=True")
        else:
            print(f"Invalid event: {str(event)}")

  2. On the Lambda function configuration webpage, click on Add trigger.
  3. For Trigger configuration, choose DynamoDB.
  4. For DynamoDB table, choose objects-to-process.
  5. For Batch size, enter 1.
  6. For Batch window, enter 0.
  7. For Starting position, choose Trim horizon.
  8. Select Enable trigger.

You use batch size = 1 because each S3 object represented on the DynamoDB table is typically large. If these files are small, you can use a larger batch size. The batch size is essentially the number of files that your Lambda function processes at a time.

Because any new objects on S3 (in a versioning-enabled bucket) create an object creation event, even if its key already exists, you must make sure that your task schedule Lambda function ignores any object creation events that your task execution function creates. Otherwise, it creates an infinite loop. This post uses tags on S3 objects: when the task execution function processes an object, it adds a processed tag. The task scheduling function ignores those objects in subsequent executions.

Using Athena to query the processed data

The final step is to create a table for Athena to query the data. You can do this manually or by using an AWS Glue crawler that infers the schema directly from the data and automatically creates the table for you. This post uses a crawler because it can handle schema changes and add new partitions automatically. To create this crawler, use the following code:

aws glue create-crawler --name data-crawler \ 
--role <AWSGlueServiceRole-crawler> \
--database-name data_db \
--description 'crawl data bucket!' \
--targets \
"{\
  \"S3Targets\": [\
    {\
      \"Path\": \"s3://<data-bucket>/dataset/\"\
    }\
  ]\
}"

Replace <AWSGlueServiceRole-crawler> and <data-bucket> with the name of your AWSGlueServiceRole and S3 bucket, respectively.

When the crawling process is complete, you can start querying the data. You can use the Athena console to interact with the table while its underlying data is being transparently updated. See the following code:

SELECT * FROM data_db.dataset LIMIT 1000

Automated setup

You can use the following AWS CloudFormation template to create the solution described on this post on your AWS account. To launch the template, choose the following link:

This CloudFormation stack requires the following parameters:

  • Stack name – A meaningful name for the stack, for example, data-updater-solution.
  • Bucket name – The name of the S3 bucket to use for the solution. The stack creation process creates this bucket.
  • Time to Live – The number of seconds to expire items on the DynamoDB table. Referenced S3 objects are processed on item expiration.

Stack creation takes up to a few minutes. Check and refresh the AWS CloudFormation Resources tab to monitor the process while it is running.

When the stack shows the state CREATE_COMPLETE, you can start using the solution.

Testing the solution

To test the solution, download the mock_uploaded_data.json dataset created with the Mockaroo data generator. The use case is a web service in which users can upload files. The goal is to delete those files some predefined time after the upload to reduce storage and query costs. To this end, the provided code looks for the attribute file_contents and replaces its value with an empty string.

You can now upload new data into your data-bucket S3 bucket under the dataset/ prefix. Your NotificationFunction Lambda function processes the resulting bucket notification event for the upload, and a new item appears on your DynamoDB table. Shortly after the predefined TTL time, the JSONProcessingFunction Lambda function processes the data and you can check the resulting changes via an Athena query.

You can also confirm that a S3 object was processed successfully if the DynamoDB item corresponding to this S3 object is no longer present in the DynamoDB table and the S3 object has the processed tag.

Conclusion

This post showed how to automatically re-process objects on S3 after a predefined amount of time by using a simple and fully managed scheduling mechanism. Because you use S3 for storage, you automatically benefit from S3’s eventual consistency model, simply by using identical keys (names) both for the original and processed objects. This way, you avoid query results with duplicate or missing data. Also, incomplete or only partially uploaded objects do not result in data inconsistencies because S3 only creates new object versions for successfully completed file transfers.

You may have previously used Spark to process objects hourly. This requires you to monitor objects that must be processed, to move and process them in a staging area, and to move them back to their actual destination. The main drawback is the final step because, due to Spark’s parallelism nature, files are generated with different names and contents. That prevents direct file replacement in the dataset and leads to downtimes or potential data duplicates when data is queried during a move operation. Additionally, because each copy/delete operation could potentially fail, you have to deal with possible partially processed data manually.

From an operations perspective, AWS serverless services simplify your infrastructure. You can combine the scalability of these services with a pay-as-you-go plan to start with a low-cost POC and scale to production quickly—all with a minimal code base.

Compared to hourly Spark jobs, you could potentially reduce costs by up to 80%, which makes this solution both cheaper and simpler.

Special thanks to Karl Fuchs, Stefan Schmidt, Carlos Rodrigues, João Neves, Eduardo Dixo and Marco Henriques for their valuable feedback on this post’s content.

 


About the Authors

Pedro Completo Bento is a senior big data engineer working at Siemens CDC. He holds a Master in Computer Science from the Instituto Superior Técnico in Lisbon. He started his career as a full-stack developer, specializing later on big data challenges. Working with AWS, he builds highly reliable, performant and scalable systems on the cloud, while keeping the costs at bay. In his free time, he enjoys to play boardgames with his friends.

 

 

Arturo Bayo is a big data consultant at Amazon Web Services. He promotes a data-driven culture in enterprise customers around EMEA, providing specialized guidance on business intelligence and data lake projects while working with AWS customers and partners to build innovative solutions around data and analytics.

 

 

 

 

Reducing custom code by using advanced rules in Amazon EventBridge

Post Syndicated from James Beswick original https://aws.amazon.com/blogs/compute/reducing-custom-code-by-using-advanced-rules-in-amazon-eventbridge/

Amazon EventBridge allows you to route events between AWS services, integrated software as a service (SaaS) applications, and your own applications. Event producers publish events onto an event bus, which uses rules to determine where to send those events. The rules can specify one or more targets, which can be other AWS services or Lambda functions. This model makes it easy to develop scalable, distributed serverless applications by handling event routing and filtering.

EventBridge Content Filtering

EventBridge recently introduced additional content filtering functionality, which creates new possibilities for building sophisticated rules. This blog post explores how to use event patterns to build rules that make this routing process more powerful without needing custom code. I show how this could work with a sample ATM banking application integrating into an AWS service.

Events, rules, and filtering

In EventBridge, an event is simply a JSON structure. It contains some top-level envelope fields, such as the source, event, and timestamp, followed by a detail field containing the body of the event. Events generated from AWS services always contain a number of descriptive fields and are identifiable by the source attribute prefix “aws”.

You can also generate events from your own applications. EventBridge requires specific envelope fields, but otherwise you are free to add additional attributes as needed. A typical event structure for a custom application looks like this:

{
  "Source": string,
  "EventBusName": string,
  "DetailType": string,
  "Detail": string
}

If your application uses nested attributes, you must convert the Detail attribute into a string. In programming languages such as Node.js, you can do this using JSON.stringify to send an event, and JSON.parse when receiving it. For example, for a banking application where an ATM application sends events to EventBridge, a cash withdrawal event may look like this:

{ 
  "Source": "custom.myATMapp",
  "EventBusName": "default",
  "DetailType": "transaction",
  "Detail": "{\"action\":\"withdrawal”,\"amount\":300}"
}

EventBridge rules use event patterns that are JSON structures. These match against the attributes in the events. In the rules, you only specify the fields where you want to apply filtering logic.

To see all events for a single application using an event bus, you can filter by source. Any incoming event with this source matches regardless of the content of other fields. In the ATM application example, a rule that accepts all events looks like this:

{ 
  "source": [ "custom.myATMapp" ]
}

EventBridge examines the incoming event and compares it against this rule. The rule specifies a source value of custom.myATMapp and, as this exists in the event, the pattern matches. It then routes the event to the rule’s targets:

EventBridge rules

The example above shows a static, exact match pattern – the attribute is either present, or it’s not. There are now additional operators available for dynamic matching based on specific comparison conditions. This provides functionality that’s similar to what you use in a SQL where clause for filtering records in a database.

Here is a summary of all the comparison operators available in EventBridge:

ComparisonExampleRule syntax
NullUserID is null“UserID”: [ null ]
EmptyLastName is empty“LastName”: [“”]
EqualsName is “Alice”“Name”: [ “Alice” ]
AndLocation is “New York” and Day is “Monday”“Location”: [ “New York” ], “Day”: [“Monday”]
OrPaymentType is “Credit” or “Debit”“PaymentType”: [ “Credit”, “Debit”]
NotWeather is anything but “Raining”“Weather”: [ { “anything-but”: [ “Raining” ] } ]
Numeric (equals)Price is 100“Price”: [ { “numeric”: [ “=”, 100 ] } ]
Numeric (range)Price is more than 10, and less than or equal to 20“Price”: [ { “numeric”: [ “>”, 10, “<=”, 20 ] } ]
ExistsProductName exists“ProductName”: [ { “exists”: true } ]
Does not existProductName does not exist“ProductName”: [ { “exists”: false } ]
Begins withRegion is in the US“Region”: [ {“prefix”: “us-“ } ]

Filtering events in a custom application

In this example, a bank runs software on a network of ATMs that forwards transactional information to EventBridge. This software sends all events to EventBridge, but downstream systems only want to receive a subset of ATM events:

ATM example application

The events from the ATMs have the following structure:

      {
        "Source": "custom.myATMapp",
        "EventBusName": "default",
        "DetailType": "transaction",
        "Time": "Wed Jan 29 2020 08:03:18 GMT-0500",
        "Detail":{
          "action": "withdrawal",
          "location": "NY-NYC-001",
          "amount": 300,
          "result": "approved",
          "transactionId": "123456",
          "cardPresent": true,
          "partnerBank": "Example Bank",
          "remainingFunds": 722.34
        }
      }

The downstream services can use the event patterns in EventBridge rules to ensure that they only receive specific events.

1. Transactions where the amount is over $300

The following event pattern filters for ATM transactions over $300.

{
  "source": [ "custom.myATMapp" ],
  "detail-type": [ "transaction" ],
  "detail": {
    "amount": [ { "numeric": [ ">", 300 ] } ]
  }
}

2. All ATMs in New York City

The ATM location attribute uses the format state-city-id, so NY-NYC-001 indicates that the machine is located in New York City in New York state. To filter events from only ATMs in the New York City area, I use a prefix in the filter:

{
  "source": [ "custom.myATMapp" ],
  "detail-type": [ "transaction" ],
  "detail": {
    "location": [ { "prefix": "NY-NYC-" } ]
  }
}

3. ATM customers using a third-party bank account

To filter for transactions that show a partnerBank attribute, the following event pattern checks for the existence of this attribute:

{
  "source": [ "custom.myATMapp" ],
  "detail-type": [ "transaction" ],
  "detail": {
    "partnerBank": [ { "exists": true } ]
  }
}

4. Combined filter

I can combine filters in a single event pattern to create use-cases that are more complex. For example, this filters on approved transactions where no partnerBank attribute exists, reporting from any ATM with a location different to NY-NYC-002:

{
  "source": [ "custom.myATMapp" ],
  "detail-type": [ "transaction" ],
  "detail": {
    "result": [ "approved" ],
    "partnerBank": [ { "exists": false } ],
    "location": [ { "anything-but": "NY-NYC-002" }]
  }
}

In each of these cases, EventBridge matches incoming events against the event patterns in these rules. If there is no match, it does not route the event. This eliminates custom code that otherwise exists to filter incoming events and terminate if necessary.

Filtering AWS events to create a custom S3-to-Lambda integration

EventBridge uses a variety of AWS services as native event sources. For other AWS services, such as Amazon S3, it consumes events via AWS CloudTrail. You must first enable CloudTrail logging for the service you want to use with EventBridge. Once enabled, you can filter on any of the attributes available in an AWS event. This allows you to create dynamic, flexible integrations in your event-driven applications.

The standard S3-to-Lambda trigger allows developers to subscribe a Lambda function to an event on a single bucket. Although these events can filter on prefixes and suffixes of object keys in S3, you cannot use multiple configurations that overlap. Beyond the prefix and suffix of the key name, you cannot filter further on any other attributes of the event before invoking the Lambda function. To examine the S3 event further, you must do this within the code in the function itself.

Using EventBridge, you can configure a rule between one or more S3 buckets, and one or more Lambda functions, based upon any of the attributes available. This enables you to create much more granular filters for routing events to downstream consumers. Using a declarative approach results in greater flexibility and less custom code. In this section, I show four use-cases where this could be useful.

S3 to EventBridge

(a) Invoking a single Lambda function from events in multiple buckets

This example uses multiple buckets with a common prefix in the bucket name (for example, buckets with the names “myApp-images”, “myApp-uploads”, and “myApp-archive”). You can use all these buckets as an event source to trigger the same Lambda function. This event pattern matches for all put events in those buckets:

{
  "source": [ "aws.s3" ],
  "detail-type": [ "AWS API Call via CloudTrail" ],
  "detail": {
    "eventSource": [ "s3.amazonaws.com" ],
    "eventName": [ "PutObject" ],
    "requestParameters": {
      "bucketName": [ { "prefix": "myApp-" } ]
    }
  }
}

(b) Invoking multiple consumers as targets

EventBridge allows up to five targets per rule, so you can specify up to five separate Lambda functions to receive the event. All five functions are invoked in parallel when the event pattern matches. To use this, add the targets in the rule – no changes to the event pattern is required.

If you need more than five targets, use Amazon Simple Notification Service (SNS). You can define an SNS topic as the EventBridge rule target, and then fan out from SNS to much larger number of subscribers.

{
  "source": [ "aws.s3" ],
  "detail-type": [ "AWS API Call via CloudTrail" ],
  "detail": {
    "eventSource": [ "s3.amazonaws.com" ],
    "eventName": [ "GetObject" ],
    "userAgent": [ "userAgent" ],

    "requestParameters": {
      "bucketName": [ "mybucket" ]
    }
  }
}

Conclusion

The new content filtering syntax in EventBridge enables precise filtering of events using comparison operators and ranges of values. This allows you to filter declaratively at the event bus rather than filtering downstream using custom code. For custom applications, like the ATM example, it enables you to build precise rules for specific use-cases, reducing the number of calls to targets.

This approach enables you to route events more precisely based upon any of the attributes reported in an event. This makes it easier to handle complex routing at the EventBridge level and reduces the need for custom code across your application.

To learn more about content filtering, see the Amazon EventBridge documentation.

Analyze your Amazon S3 spend using AWS Glue and Amazon Redshift

Post Syndicated from Shayon Sanyal original https://aws.amazon.com/blogs/big-data/analyze-your-amazon-s3-spend-using-aws-glue-and-amazon-redshift/

The AWS Cost & Usage Report (CUR) tracks your AWS usage and provides estimated charges associated with that usage. You can configure this report to present the data at hourly or daily intervals, and it is updated at least one time per day until it is finalized at the end of the billing period. The Cost & Usage Report is delivered automatically to an Amazon S3 bucket that you specify, and you can download it from there directly. You can also integrate the report into Amazon Redshift, query it with Amazon Athena, or upload it to Amazon QuickSight. For more information, see Query and Visualize AWS Cost and Usage Data Using Amazon Athena and Amazon QuickSight.

This post presents a solution that uses AWS Glue Data Catalog and Amazon Redshift to analyze S3 usage and spend by combining the AWS CUR, S3 inventory reports, and S3 server access logs.

Prerequisites

Before you begin, complete the following prerequisites:

  • You need an S3 bucket for your S3 inventory and server access log data files. For more information, see Create a Bucket and What is Amazon S3?
  • You must have the appropriate IAM permissions for Amazon Redshift to be able to access the S3 buckets – for this post, choose two non-restrictive IAM roles (AmazonS3FullAccess and AWSGlueConsoleFullAccess), but restrict your access accordingly for your own scenarios.

Amazon S3 inventory

Amazon S3 inventory is one of the tools S3 provides to help manage your storage. You can use it to audit and report on the replication and encryption status of your objects for business, compliance, and regulatory needs. Amazon S3 inventory provides comma-separated values (CSV), Apache optimized row columnar (ORC), or Apache Parquet output files that list your objects and their corresponding metadata on a daily or weekly basis for a given S3 bucket.

Amazon S3 server access logs

Server access logging provides detailed records for the requests you make to a bucket. Server access logs are useful for many applications, for example in security and access audits. It can also help you learn about your customer base and understand your S3 bill.

AWS Glue

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it simple and cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores. AWS Glue consists of a central metadata repository known as the Data Catalog, a crawler to populate the Data Catalog with tables, an ETL engine that automatically generates Python or Scala code, and a flexible scheduler that handles dependency resolution, job monitoring, and retries. AWS Glue is serverless, so there’s no infrastructure to set up or manage. This post uses AWS Glue to catalog S3 inventory data and server access logs, which makes it available for you to query with Amazon Redshift Spectrum.

Amazon Redshift

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. You can use Amazon Redshift to efficiently query and retrieve structured and semi-structured data from files in S3 without having to load the data into Amazon Redshift native tables. You can create Amazon Redshift external tables by defining the structure for files and registering them as tables in the AWS Glue Data Catalog.

Setting up S3 inventory reports for analysis

This post uses the Parquet file format for its inventory reports and delivers the files daily to S3 buckets. You can select both the frequency of delivery and output file formats under Advanced settings as shown in the screenshot below:

For more information about configuring your S3 inventory, see How Do I Configure Amazon S3 Inventory?

The following diagram shows the data flow for this solution:

Below steps summarize the data flow diagram represented above:

  • S3 Inventory Reports are delivered to an S3 bucket that you configure.
  • The AWS Glue crawler then crawls this S3 bucket and populates the metadata in the AWS Glue Data Catalog.
  • The AWS Glue Data Catalog is then accessible through an external schema in Redshift.
  • The S3 Inventory Reports (available in the AWS Glue Data Catalog) and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The inventory reports are delivered to an S3 bucket. The following screenshot shows the S3 bucket structure for the S3 inventory reports:

There is a data folder in this bucket. This folder contains the Parquet data you want to analyze. The following screenshot shows the content of the folder.

Because these are daily files, there is one file per day.

Configuring an AWS Glue crawler

You can use an AWS Glue crawler to discover this dataset in your S3 bucket and create the table schemas in the Data Catalog. After you create these tables, you can query them directly from Amazon Redshift.

To configure your crawler to read S3 inventory files from your S3 bucket, complete the following steps:

  1. Choose a crawler name.
  2. Choose S3 as the data store and specify the S3 path up to the data
  3. Choose an IAM role to read data from S3 – AmazonS3FullAccess and AWSGlueConsoleFullAccess.
  4. Set a frequency schedule for the crawler to run.
  5. Configure the crawler’s output by selecting a database and adding a prefix (if any).

This post uses the database s3spendanalysis.

The following screenshot shows the completed crawler configuration.

Run this crawler to add tables to your Glue Data Catalog. After the crawler has completed successfully, go to the Tables section on your AWS Glue console to verify the table details and table metadata. The following screenshot shows the table details and table metadata after your AWS Glue crawler has completed successfully:

Creating an external schema

Before you can query the S3 inventory reports, you need to create an external schema (and subsequently, external tables) in Amazon Redshift. An Amazon Redshift external schema references an external database in an external data catalog. Because you are using an AWS Glue Data Catalog as your external catalog, after you create an external schema in Amazon Redshift, you can see all the external tables in your Data Catalog in Amazon Redshift. To create the external schema, enter the following code:

create external schema spectrum_schema from data catalog
database 's3spendanalysis'
iam_role 'arn:aws:iam::<AWS_IAM_ROLE>';

Querying the table

On the Amazon Redshift dashboard, under Query editor, you can see the data table. You can also query the svv_external_schemas system table to verify that your external schema has been created successfully. See the following screenshot.

You can now query the S3 inventory reports directly from Amazon Redshift without having to move the data into Amazon Redshift first. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:

Setting up S3 server access logs for analysis

The following diagram shows the data flow for this solution.

Below steps summarize the data flow diagram represented above:

  • S3 Server Access Logs are delivered to an S3 bucket that you configure.
  • These server access logs are then directly accessible to be queried from Amazon Redshift (note that we’ll be using CREATE EXTERNAL TABLE in Redshift Spectrum for this purpose, explained below).
  • The S3 Server Access Logs and the Cost and Usage Reports (available in another S3 bucket) are now ready to be joined and queried for analysis.

The S3 server access logs are delivered to an S3 bucket. For more information about setting up server access logging, see Amazon S3 Server Access Logging.

The following screenshot shows the S3 bucket structure for the server access logs.

The server access log files consist of a sequence of new-line delimited log records. Each log record represents one request and consists of space-delimited fields. The following code is an example log record:

b8ad5f5cfd3c09418536b47b157851fb7bea4a00486471093a7d765e35a4f8ef s3spendanalysisblog [23/Sep/2018:22:10:52 +0000] 72.21.196.65 arn:aws:iam::<AWS Account #>:user/shayons D5633DAD1063C5CA REST.GET.LIFECYCLE - "GET /s3spendanalysisblog?lifecycle= HTTP/1.1" 404 NoSuchLifecycleConfiguration 332 - 105 - "-" "S3Console/0.4, aws-internal/3 aws-sdk-java/1.11.408 Linux/4.9.119-0.1.ac.277.71.329.metal1.x86_64 OpenJDK_64-Bit_Server_VM/25.181-b13 java/1.8.0_181" -

Creating an external table

You can define the S3 server access logs as an external table. Because you already have an external schema, create an external table using the following code. This post uses RegEx SerDe to create a table that allows you to correctly parse all the fields present in the S3 server access logs. See the following code:

CREATE EXTERNAL TABLE spectrum_schema.s3accesslogs(
BucketOwner                   varchar(256), 
Bucket                        varchar(256), 
RequestDateTime               varchar(256), 
RemoteIP                      varchar(256), 
Requester                     varchar(256), 
RequestID                     varchar(256), 
Operation                     varchar(256), 
Key                           varchar(256), 
RequestURI_operation          varchar(256),
RequestURI_key                varchar(256),
RequestURI_httpProtoversion   varchar(256),
HTTPstatus                    varchar(256), 
ErrorCode                     varchar(256), 
BytesSent                     varchar(256), 
ObjectSize                    varchar(256), 
TotalTime                     varchar(256), 
TurnAroundTime                varchar(256), 
Referrer                      varchar(256), 
UserAgent                     varchar(256), 
VersionId                     varchar(256))
ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.RegexSerDe'
WITH SERDEPROPERTIES (
  'input.regex' = '([^ ]*) ([^ ]*) \\[(.*?)\\] ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) \"([^ ]*) ([^ ]*) ([^ ]*)\" (-|[0-9]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) ([^ ]*) (\"[^\"]*\") ([^ ]*)'
  )
STORED AS TEXTFILE
LOCATION
  's3://s3spendanalysisblog/accesslogs/';

Validating the data

You can validate the external table data in Amazon Redshift. The following screenshot shows how to do this using the Query Editor in the Amazon Redshift console:

You are now ready to analyze the data.

Analyzing the data using Amazon Redshift

In this post, you have a CUR file per day in your S3 bucket. The files themselves are organized in a monthly hierarchy. See the following screenshot.

Each day’s file consists of the following files for CUR data:

  • myCURReport-1.csv.gz – A zipped file of the data itself
  • myCURReport-Manifest.json – A JSON file that contains the metadata for the file
  • myCURReport-RedshiftCommands.sql – Amazon Redshift table creation scripts and a COPY command to create the CUR table from a Redshift manifest file
  • myCURReport-RedshiftManifest.json – The Amazon Redshift manifest file to create the CUR table

Using Amazon Redshift is one of the many ways to carry out this analysis. Amazon Redshift is a fast, fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL and your existing Business Intelligence (BI) tools. Amazon Redshift gives you fast querying capabilities over structured data using familiar SQL-based clients and BI tools using standard ODBC and JDBC connections. Queries are distributed and parallelized across multiple physical resources.

You are now ready to run SQL queries with the Amazon Redshift SQL Query Editor. This post also uses the psql client tool, a terminal-based front end from PostgreSQL, to query the data in the cluster.

To query the data, complete the following steps:

  1. Create a custom schema to contain your tables for analysis. See the following code:
    create schema if not exists redshift_schema;

    You should create your table in a schema other than public to control user access to database objects.

  2. Create a CUR table for the latest month in Amazon Redshift using the CUR SQL file in S3. See the following code:
    create table redshift_schema.AWSBilling201910 (
    identity_LineItemId VARCHAR(256),
    identity_TimeInterval VARCHAR(100),
    bill_InvoiceId VARCHAR(100),
    bill_BillingEntity VARCHAR(10),
    bill_BillType VARCHAR(100),
    bill_PayerAccountId VARCHAR(100),
    bill_BillingPeriodStartDate TIMESTAMPTZ,
    bill_BillingPeriodEndDate TIMESTAMPTZ,
    lineItem_UsageAccountId VARCHAR(100),
    lineItem_LineItemType VARCHAR(100),
    lineItem_UsageStartDate TIMESTAMPTZ,
    lineItem_UsageEndDate TIMESTAMPTZ,
    lineItem_ProductCode VARCHAR(100),
    lineItem_UsageType VARCHAR(100),
    lineItem_Operation VARCHAR(100),
    lineItem_AvailabilityZone VARCHAR(100),
    lineItem_ResourceId VARCHAR(256),
    lineItem_UsageAmount DECIMAL(11,2),
    lineItem_NormalizationFactor VARCHAR(10),
    lineItem_NormalizedUsageAmount DECIMAL(11,2),
    lineItem_CurrencyCode VARCHAR(10),
    lineItem_UnblendedRate DECIMAL(11,2),
    lineItem_UnblendedCost DECIMAL(11,2),
    lineItem_BlendedRate DECIMAL(11,2),
    lineItem_BlendedCost DECIMAL(11,2),
    lineItem_LineItemDescription VARCHAR(100),
    lineItem_TaxType VARCHAR(100),
    lineItem_LegalEntity VARCHAR(100),
    product_ProductName VARCHAR(100),
    product_alarmType VARCHAR(100),
    product_automaticLabel VARCHAR(100),
    product_availability VARCHAR(100),
    product_availabilityZone VARCHAR(100),
    product_clockSpeed VARCHAR(100),
    product_currentGeneration VARCHAR(100),
    product_databaseEngine VARCHAR(100),
    product_dedicatedEbsThroughput VARCHAR(100),
    product_deploymentOption VARCHAR(100),
    product_durability VARCHAR(100),
    product_ecu VARCHAR(100),
    product_edition VARCHAR(100),
    product_engineCode VARCHAR(100),
    product_enhancedNetworkingSupported VARCHAR(100),
    product_eventType VARCHAR(100),
    product_feeCode VARCHAR(100),
    product_feeDescription VARCHAR(100),
    product_fromLocation VARCHAR(100),
    product_fromLocationType VARCHAR(100),
    product_gpu VARCHAR(100),
    product_gpuMemory VARCHAR(100),
    product_group VARCHAR(100),
    product_groupDescription VARCHAR(100),
    product_instanceFamily VARCHAR(100),
    product_instanceType VARCHAR(100),
    product_instanceTypeFamily VARCHAR(100),
    product_io VARCHAR(100),
    product_labelingTaskType VARCHAR(100),
    product_licenseModel VARCHAR(100),
    product_location VARCHAR(100),
    product_locationType VARCHAR(100),
    product_maxThroughputvolume VARCHAR(100),
    product_maxVolumeSize VARCHAR(100),
    product_memory VARCHAR(100),
    product_messageDeliveryFrequency VARCHAR(100),
    product_messageDeliveryOrder VARCHAR(100),
    product_minVolumeSize VARCHAR(100),
    product_networkPerformance VARCHAR(100),
    product_normalizationSizeFactor VARCHAR(100),
    product_operation VARCHAR(100),
    product_physicalCpu VARCHAR(100),
    product_physicalGpu VARCHAR(100),
    product_physicalProcessor VARCHAR(100),
    product_processorArchitecture VARCHAR(100),
    product_processorFeatures VARCHAR(100),
    product_productFamily VARCHAR(100),
    product_protocol VARCHAR(100),
    product_queueType VARCHAR(100),
    product_region VARCHAR(100),
    product_servicecode VARCHAR(100),
    product_servicename VARCHAR(100),
    product_sku VARCHAR(100),
    product_storage VARCHAR(100),
    product_storageClass VARCHAR(100),
    product_storageMedia VARCHAR(100),
    product_subscriptionType VARCHAR(100),
    product_toLocation VARCHAR(100),
    product_toLocationType VARCHAR(100),
    product_transferType VARCHAR(100),
    product_usageFamily VARCHAR(100),
    product_usagetype VARCHAR(100),
    product_vcpu VARCHAR(100),
    product_version VARCHAR(100),
    product_volumeType VARCHAR(100),
    product_workforceType VARCHAR(100),
    pricing_RateId VARCHAR(100),
    pricing_publicOnDemandCost DECIMAL(11,2),
    pricing_publicOnDemandRate DECIMAL(11,2),
    pricing_term VARCHAR(100),
    pricing_unit VARCHAR(100),
    reservation_AmortizedUpfrontCostForUsage DECIMAL(11,2),
    reservation_AmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_EffectiveCost DECIMAL(11,2),
    reservation_EndTime TIMESTAMPTZ,
    reservation_ModificationStatus VARCHAR(100),
    reservation_NormalizedUnitsPerReservation BIGINT,
    reservation_RecurringFeeForUsage DECIMAL(11,2),
    reservation_StartTime TIMESTAMPTZ,
    reservation_SubscriptionId VARCHAR(100),
    reservation_TotalReservedNormalizedUnits BIGINT,
    reservation_TotalReservedUnits BIGINT,
    reservation_UnitsPerReservation BIGINT,
    reservation_UnusedAmortizedUpfrontFeeForBillingPeriod DECIMAL(11,2),
    reservation_UnusedNormalizedUnitQuantity BIGINT,
    reservation_UnusedQuantity BIGINT,
    reservation_UnusedRecurringFee DECIMAL(11,2),
    reservation_UpfrontValue BIGINT
    );

  3. Load the data into Amazon Redshift for the latest month, using the provided CUR Manifest file. See the following code:
    copy AWSBilling201910 from 's3://ss-cur//myCURReport/20191001-20191101/fd76beee-0709-42d5-bcb2-bb45f8ba1aae/myCURReport-RedshiftManifest.json'
    credentials 'arn:aws:iam::<AWS_IAM_ROLE>'
    GZIP CSV IGNOREHEADER 1 TIMEFORMAT 'auto' manifest;

  4. Validate the data loaded in the Amazon Redshift table. See the following code:
    select * from AWSBilling201910
    where lineItem_ProductCode = 'AmazonS3'
    and lineItem_ResourceId = 's3spendanalysisblog' limit 10;

    The following screenshot shows that data has been loaded correctly in the Amazon Redshift table:

Managing database security

You can manage database security in Amazon Redshift by controlling which users have access to which database objects. To make sure your objects are secure, create two groups: FINANCE and ADMIN, with two users in FINANCE and one user in ADMIN. Complete the following steps:

  1. Create the groups where the user accounts are assigned. The following code creates two different user groups:
    create group finance;
    create group admin;

    To view all user groups, query the PG_GROUP system catalog table (you should see finance and admin here):

    select * from pg_group:

  2. Create three database users with different privileges and add them to the groups. See the following code:
    create user finance1 password 'finance1Pass'
    in group finance;
    
    create user finance2 password 'finance2Pass'
    in group finance;
    
    create user admin1 password 'admin1Pass'
    in group admin;

    Validate the users have been successfully created. To view a list of users, query the PG_USER catalog table:

  3. Grant SELECT privileges to the FINANCE group and ALL privileges to the ADMIN group for your table AWSBilling201910 in redshift_schema. See the following code:
    grant select on table redshift_schema.AWSBilling201910 to group finance; 
    grant all on table redshift_schema.AWSBilling201910 to group admin;

    You can verify if you enforced database security correctly. The user finance1 tried to rename the table AWSBilling201910 in redshift_schema, but got a permission denied error message (due to restricted access). The following screenshot shows this scenario and the subsequent error message:

Example S3 inventory analysis

S3 charges split per bucket. The following query identifies the data storage and transfer costs for each separate S3 bucket:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class",
  SUM(CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN "lineitem_usageamount"/1024
    ELSE "lineitem_usageamount"
  END) as "Usage",
  CASE
    WHEN "lineitem_usagetype" like '%Byte%' THEN 'TBs'
    ELSE 'Requests'
  END as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.data b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."storage_class"
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the results of executing the above query:

Costs are split by type of storage (for example, Glacier versus standard storage).

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 storage class (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation,
  b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
    a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.storage_class
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  join spectrum_schema.data b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Server access logs sample analysis queries

S3 access log charges per operation type. The following query identifies the data storage and transfer costs for each separate HTTP operation:

SELECT
  "lineitem_productcode",
  "lineitem_usagetype",
  "lineitem_resourceid",
  b."operation",
  b."httpstatus",
  b."bytessent",
  SUM(CASE
      WHEN "lineitem_usagetype" like '%Byte%'
        THEN "lineitem_usageamount" / 1024
      ELSE "lineitem_usageamount"
      END) as "Usage",
  CASE
  WHEN "lineitem_usagetype" like '%Byte%'
    THEN 'TBs'
  ELSE 'Requests'
  END  as "Usage Units",
  sum("lineitem_blendedcost") as cost
from awsbilling201902 a
  join spectrum_schema.s3accesslogs b
    on a.lineItem_ResourceId = b.bucket
where "product_productname" = 'Amazon Simple Storage Service'
group by
  1, 2, 3, 4, 5, 6
order by
  sum("lineitem_blendedcost") desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 data transfer costs (intra-region and inter-region) by S3 operation and HTTP status (usage amount, unblended cost, blended cost):

SELECT
 lineitem_productcode
 ,product_fromlocation
 ,product_tolocation
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
 AND a.product_productfamily = 'Data Transfer'
GROUP BY
 1,2,3,4,5
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

The following query identifies S3 fee, API request, and storage charges:

SELECT
 lineitem_productcode
 ,product_productfamily
 ,b.operation
 ,b.httpstatus
 ,sum(lineitem_usageamount) usageamount
 ,sum(lineitem_unblendedcost) unblendedcost
 ,sum(lineitem_blendedcost) blendedcost
FROM
awsbilling201902 a
  JOIN spectrum_schema.s3accesslogs b
ON
   a.lineItem_ResourceId = b.bucket
WHERE
 a.lineitem_productcode = 'AmazonS3'
  and a.product_productfamily <> 'Data Transfer'
GROUP BY
 1,2,3,4
ORDER BY
 usageamount desc;

The following screenshot shows the result of executing the above query:

Overall data flow diagram

The following diagram shows the complete data flow for this solution.

Conclusion

AWS Glue makes provides an easy and convenient way to discover data stored in your S3 buckets automatically in a cloud-native, secure, and efficient way. This post demonstrated how to use AWS Glue and Amazon Redshift to analyze your S3 spend using Cost and Usage Reports. You also learned best practices for managing database security in Amazon Redshift through users and groups. Using this framework, you can start analyzing your S3 bucket spend with a few clicks in a matter of minutes on the AWS Management Console!

If you have questions or suggestions, please leave your thoughts in the comments section below.

 


About the Author

 Shayon Sanyal is a Data Architect, Data Lake for Global Financial Services at AWS.

 

 

 

Collect and distribute high-resolution crypto market data with ECS, S3, Athena, Lambda, and AWS Data Exchange

Post Syndicated from Jared Katz original https://aws.amazon.com/blogs/big-data/collect-and-distribute-high-resolution-crypto-market-data-with-ecs-s3-athena-lambda-and-aws-data-exchange/

This is a guest post by Floating Point Group. In their own words, “Floating Point Group is on a mission to bring institutional-grade trading services to the world of cryptocurrency.”

The need and demand for financial infrastructure designed specifically for trading digital assets may not be obvious. There’s a rather pervasive narrative that these coins and tokens are effectively natively digital counterparts to traditional assets such as currencies, commodities, equities, and fixed income. This narrative often manifests in the form of pithy one-liners recycled by pundits attempting to communicate the value proposition of various projects in the space (such as, “Bitcoin is just a currency with an algorithmically controlled, tamper-proof monetary policy,” or, “Ether is just a commodity like gasoline that you can use to pay for computational work on a global computer.”). Unsurprisingly, we at FPG often hear the question, “What’s so special about cryptocurrencies that they warrant dedicated financial services? Why do we need solutions for problems that have already been solved?”

The truth is that these assets and the widespread public interest surrounding them are entirely unprecedented. The decentralized ledger technology that serves as an immutable record of network transactions, the clever use of proof-of-work algorithms to economically incentivize rational actors to help uphold the security of the network (the proof-of-work concept dates back at least as far as 1993, but it was not until bitcoin that the technology showed potential for widespread adoption), the irreversible nature of transactions that poses unique legal challenges in cases such as human error or extortion, the precariousness of self-custody (third-party custody solutions don’t exactly have track records that inspire trust), the regulatory uncertainties that come with the difficulty of both classifying these assets as well as arbitrating their exchange which must ultimately be reconciled by entities like the IRS, SEC, and CFTC—it is all very new, and very weird. With 24-hour market volume regularly exceeding $100 billion, we decided to direct our focus towards problems related specifically to trading these assets. Granted, crypto trading has undoubtedly matured since the days of bartering for bitcoin in web forums and witnessing 10% price spreads between international exchanges. But there is still a long path ahead.

One major pain point we are aiming to address for institutional traders involves liquidity (or, more precisely, the lack thereof). Simply put, the buying and selling of cryptocurrencies occurs across many different trading venues (exchanges), and liquidity (the offers to buy or sell a certain quantity of an asset at a certain price) continues to become more fragmented as new exchanges emerge. So say you’re trying to buy 100 bitcoins. You must buy from people who are willing to sell. As you take the best (cheapest) offers, you’re left with increasingly expensive offers. By the time you fill your order (in this example, buy all 100 bitcoins), you may have paid a much higher average price than, say, the price you paid for the first bitcoin of your order. This phenomenon is referred to as slippage. One easy way to minimize slippage is by expanding your search for offers. So rather than looking at the offers on just one exchange, look at the offers across hundreds of exchanges. This process, traditionally referred to as smart order routing (SOR), is one of the core services we provide. Our SOR service allows traders to easily submit orders that our system can match against the best offers available across multiple trading venues by actively monitoring liquidity across dozens of exchanges.

Fanning out large orders in search of the best prices is a rather intuitive and widely applicable concept—roughly 75% of equities are purchased and sold via SOR. But the value of such a service for crypto markets is particularly salient: a perpetual cycle of new exchanges surging in popularity while incumbents falter has resulted in a seemingly incessant fragmentation of liquidity across trading venues—yet traders tend to assume an exchange-agnostic mindset, concerned exclusively with finding the best price for a given quantity of an asset.

Access to both real-time and historical market data is essential to the functionality of our SOR service. The highest resolution data we could hope to obtain for a given market would include every trade and every change applied to the order book, effectively allowing us to recreate the state of a market at any given point in time. The updates provided through the WebSocket streams are not sufficient for reconstructing order books. We also need to periodically fetch snapshots of the order books and store those, which we can do using an exchange’s REST API. We can fetch a snapshot and apply the corresponding updates from the streams to “replay” the order book.

Fortunately, this data is freely available, because many exchanges offer real-time feeds of market data via WebSocket APIs. We found several third-party vendors selling subscriptions to these data sets, typically in the form of CSV dumps delivered at a weekly or monthly cadence. This presented the question of build vs. buy. Given that we felt capable of building a robust and reliable system for ingesting real-time market data in a relatively short amount of time and at a fraction of the cost of purchasing the data from a vendor, we were already leaning in favor of building. Further investigation made buying look like an increasingly unattractive option. Disclaimers that multiple vendors issued about their inability to guarantee data quality and consistency did not inspire confidence. Inspecting sample data sets revealed that some essential fields provided in the original data streams were missing—fields necessary for achieving our goal of recreating the state of a market at an arbitrary point in time. We also recognized that a weekly or monthly delivery schedule would restrict our ability to explore relatively recent market data.

This post provides a high-level overview of how we ingest and store real-time market data and how we use the AWS Data Exchange API to organize and publish our data sets programmatically. Our system’s functionality extends well beyond data ingestion, normalization, and persistence; we run dedicated services for data validation, caching the most recent trade and order book for every market, computing and storing derivative metrics, and other services that help safeguard data accuracy and minimize the latency of our trading systems.

Data ingestion

The WebSocket streams we connect to for data consumption are often the same APIs responsible for providing real-time updates to an exchange’s trading dashboard.

WebSocket connections transmit data as discrete messages. We can inspect the content of individual messages as they stream into the browser. For example, the following screenshot shows a batch of order book updates.

The updates are expressed as arrays of bids and asks that were either added to the book or removed from it. Client-side code processes each update, resulting in a real-time rendering of the market’s order book. In practice, our data ingestion service (Ingester) does not read a single stream, but rather thousands of different streams, covering various data feeds for all markets across multiple exchanges. All the connections required for such broad coverage and the resulting flood of incoming data raise some obvious concerns about data loss. We’ve taken several measures to mitigate such concerns, including a redundant system design that allows us to spin up an arbitrary number of instances of the Ingester service. Like most of our microservices, Ingester is a Dockerized service run on Amazon ECS and deployed via Terraform.

All these instances consume the same data feeds as each other while a downstream mechanism handles deduplication (this is covered in more detail later in this post). We also set up Amazon CloudWatch alerts to notify us when we detect non-contiguous messages, indicating a gap in the incoming data. The alerts don’t directly mitigate data loss, but they do serve the important function of prompting an investigation.

Ingester builds up separate buffers of incoming messages, split out by data-type/exchange/market. Then, after a fixed time interval, each buffer is flushed into Amazon S3 as a gzipped JSON file. The buffer-flush cycle repeats.

The following screenshot shows a portion of the file content.

This code snippet is a single, pretty-printed JSON record from the file in the screenshot above.

{
   "event_type":"trade",
   "timestamp":1571980320422,
   "ticker_pair":"BTCUSDT",
   "trade_id":194230159,
   "price":"7405.69000000",
   "quantity":"3.20285300",
   "buyer_order_id":730178987,
   "seller_order_id":730178953,
   "trade_timestamp":1571980320417,
   "buyer_market_maker":false,
   "M":true
}

Ingester handles additional functionality, such as applying pre-defined mappings of venue-specific field names to our internal field names. Data normalization is one of many processes necessary to enable our systems to build a holistic understanding of market dynamics.

As with most distributed system designs, our services are written with horizontal scalability as a first-order priority. We took the same approach in designing our data ingestion service, but it has some features that make it a bit different than the archetypical horizontally scalable microservice. The most common motivations for adjusting the number of instances of a given service are load-balancing and throttling throughput. Either your system is experiencing backpressure and a consumer service scales to alleviate that pressure, or the consumer is over-provisioned and you scale down the number of instances for the sake of parsimony. For our data ingestion service, however, our motivation for running multiple instances is to minimize data loss via redundancy. The CPU usage for each instance is independent of instance count, because each instance does identical work.

For example, rather than helping alleviate backpressure by pulling messages from a single queue, each instance of our data ingestion service connects to the same WebSocket streams and performs the same amount of work. Another somewhat unusual and confounding aspect of horizontally scaling our data ingestion service is related to state: we batch records in memory and flush the records to S3 every minute (based on the incoming message’s timestamp, not the system timestamp, because those would be inconsistent). Redundancy is our primary measure for minimizing data loss, but we also need each instance to write the files to S3 in such a way that we don’t end up with duplicate records. Our first thought was that we’d need a mechanism for coordinating activity across the instances, such as maintaining a cache that would allow us to check if a record had already been persisted. But we realized that we could perform this deduplication without any coordination between instances at all. Most of the message streams we consume publish messages with sequence IDs. We can combine the sequence IDs with the incoming message timestamp to achieve our deduplication mechanism: we can deterministically generate the same exact file names containing the exact same data by writing our service code to check that the message added to the batch has the appropriate sequence ID relative to the previous message in the batch and using the timestamp on the incoming message to determine the exact start and end of each batch (we typically get a UNIX timestamp and check when we’ve rolled over to the next clock minute). This allows us to simply rely on a key collision in S3 for deduplication.

AWS suggests a similar solution for a slightly different problem, relating to Amazon Kinesis Data Streams. For more information, see Handling Duplicate Records.

With this scheme, even if records are processed more than one time, the resulting Amazon S3 file has the same name and has the same data. The retries only result in writing the same data to the same file more than one time.

After we store the data, we can perform simple analytics queries on the billions of records we’ve stored in S3 using Amazon Athena, a query service that requires minimal configuration and zero infrastructure overhead. Athena has a concept of partitions (inherited from one of its underlying services, Apache Hive). Partitions are mappings between virtual columns (in our case: pair, year, month, and day) and the S3 directories in which the corresponding data is stored.

S3’s file system is not actually hierarchical. Files are prepended with long key prefixes that are rendered as directories in the AWS console when browsing a bucket’s contents. This has some non-trivial performance consequences when querying or filtering on large data sets.

The following screenshot illustrates a typical directory path.

By pointing Athena directly to a particular subset of data, a well-defined partitioning scheme can drastically reduce query run times and costs. Though the ability the perform ad hoc business analytics queries is primarily a convenience, taking time to choose a sane multi-level partitioning scheme for Athena based on some of our most common access patterns seemed worthwhile. A poorly designed partition structure can result in Athena unnecessarily scanning huge swaths of data and ultimately render the service unusable.

Data publication

Our pipeline for transforming thousands of small gzipped JSON files into clean CSVs and loading them into AWS Data Exchange involves three distinct jobs, each expressed as an AWS Lambda function.

Job 1

Job 1 is initiated shortly after midnight UTC by a cron-scheduled CloudWatch event. As mentioned previously, our data ingestion service’s batching mechanism flushes each batch to S3 at a regular time interval. A timestamp on the incoming message (applied server-side) determines the rollover from one interval to the next, as opposed to the ingestion service’s system timestamp, so in the rare case that a non-trivial amount of time elapses between the consumption of the final message of batch n and the first message of batch n+1, we kick off the first Lambda function 20 minutes after midnight UTC to minimize the likelihood of omitting data pending write.

Job 1 formats values for the date and data source into an Athena query template and outputs the query results as a CSV to a specified prefix path in S3. (Every Athena query produces a .metadata file and a CSV file of the query results, though DDL statements do not output a CSV.) This PUT request to S3 triggers an S3 event notification.

We run a full replica data ingestion system as an additional layer of redundancy. Using the coalesce conditional expression, the Athena query in Job 1 merges data from our primary system with the corresponding data from our replica system, and fills in any gaps while deduplicating redundant records.

We experimented fairly extensively with AWS Glue and PySpark for the ETL-related work performed in Job 1. When we realized that we could merge all the small source files into one, join the primary and replica data sets, and sort the results with a single Athena query, we decided to stick with this seemingly simpler and more elegant approach.

The following code shows one of our Athena query templates.

Job 2

Job 2 is triggered by the S3 event notification from Job 1. Job 2 simply copies the query results CSV file to a different key within the same S3 bucket.

The motivation for this step is twofold. First, we cannot dictate the name of an Athena query results CSV file; it is automatically set to the Athena query ID. Second, when adding an S3 object as an asset to an AWS Data Exchange revision, the asset’s name is automatically set to the S3 object’s key. So to dictate how the CSV file name appears in AWS Data Exchange, we must first rename it, which we accomplish by copying it to a specified S3 key.

Job 3

Job 3 handles all work related to AWS Data Exchange and AWS Marketplace Catalog via their respective APIs. We use boto3, AWS’s Python SDK, to interface with these APIs. The AWS Marketplace Catalog API is necessary for adding data set revisions to products that have already been published. For more information, see Tutorial: Adding New Data Set Revisions to a Published Data Product.

Our code explicitly defines mappings with the following structure:

data source / DataSet / Product

The following code shows how we configure relationships between data sources, data sets, and products.

Our data sources are typically represented by a trading venue and data type combination (such as Binance trades or CoinbasePro order books). Each new file for a given data source is delivered as a single asset within a single new revision for a particular data set.

An S3 trigger kicks off the Lambda function. The trigger is scoped to a specified prefix that maps to a single data set. The function alias feature of AWS Lambda allows us to define the unique S3 triggers for each data set while reusing the same underlying Lambda function. Job 3 carries out the following steps (note that steps 1 through 5 refer to the AWS Data Exchange API while steps 6 and 7 refer to the AWS Marketplace Catalog API):

  1. Submits a request to create a new revision for the corresponding data set via CreateRevision.
  2. Adds the file that was responsible for triggering the Lambda function to the newly created revision via CreateJob using the IMPORT_ASSETS_FROM_S3 job type. To submit this job, we need to supply a few values: the S3 bucket and key values for the file are pulled from the Lambda event message, while the RevisionID argument comes from the response to the CreateRevision call in the previous step.
  3. Kicks off the job with StartJob, sourcing the JobID argument from the response to the CreateJob call in the previous step.
  4. Polls the job’s status via GetJob (using the job ID from the response to the StartJob call in the previous step) to check that our file (the asset) was successfully added to the revision.
  5. Finalizes the revision via UpdateRevision.
  6. Requests a description of the marketplace entity using DescribeEntity, passing in the product ID stored in our hardcoded mappings as the EntityID
  7. Kicks off the entity ChangeSet via StartChangeSet, passing in the entity ID from the previous step, the entity ID from the DescribeEntity response in the previous step as EntityID, the revision ARN parsed from the response to our earlier call to CreateRevision as RevisionArn, and the data set ARN as DataSetArn, which we fetch at the start of the code’s runtime using AWS Data Exchange API’s GetDataSet.

Here’s a thin wrapper class we wrote to carry out the steps detailed above:

from time import sleep
import logging
import json

import boto3

from config import (
    DATA_EXCHANGE_REGION,
    MARKETPLACE_CATALOG_REGION,
    LambdaS3TriggerMappings
)

logger = logging.getLogger()


class CustomDataExchangeClient:
    def __init__(self):
        self._de_client = boto3.client('dataexchange', region_name=DATA_EXCHANGE_REGION)
        self._mc_client = boto3.client('marketplace-catalog', region_name=MARKETPLACE_CATALOG_REGION)
    
    def _get_s3_data_source(self, bucket, prefix):
        return LambdaS3TriggerMappings[(bucket, prefix)]

    # Job State can be one of: WAITING | IN_PROGRESS | ERROR | COMPLETED | CANCELLED | TIMED_OUT
    def _wait_for_de_job_completion(self, job_id):
        while True:
            get_job_resp = self._de_client.get_job(JobId=job_id)
            if get_job_resp['State'] == 'COMPLETED':
                logger.info(f"Job '{job_id}' succeeded:\n\t{get_job_resp}")
                break
            elif get_job_resp['State'] in ('ERROR', 'CANCELLED'):
                raise Exception(f"Job '{job_id}' failed:\n\t{get_job_resp}")
            else:
                sleep(5)
                logger.info(f"Still waiting on job {job_id}...")
        return get_job_resp

    # ChangeSet Status can be one of: PREPARING | APPLYING | SUCCEEDED | CANCELLED | FAILED
    def _wait_for_mc_change_set_completion(self, change_set_id):
        while True:
            describe_change_set_resp = self._mc_client.describe_change_set(
                Catalog='AWSMarketplace',
                ChangeSetId=change_set_id
                )
            if describe_change_set_resp['Status'] == 'SUCCEEDED':
                logger.info(
                    f"ChangeSet '{change_set_id}' succeeded:\n\t{describe_change_set_resp}"
                )
                break
            elif describe_change_set_resp['Status'] in ('FAILED', 'CANCELLED'):
                raise Exception(
                    f"ChangeSet '{change_set_id}' failed:\n\t{describe_change_set_resp}"
                )
            else:
                sleep(1)
                logger.info(f"Still waiting on ChangeSet {change_set_id}...")
        return describe_change_set_resp

    def process_s3_event(self, s3_event):
        source_bucket = s3_event['Records'][0]['s3']['bucket']['name']
        source_key = s3_event['Records'][0]['s3']['object']['key']
        source_prefix = '/'.join(source_key.split('/')[0:-1])
        s3_data_source = self._get_s3_data_source(source_bucket, source_prefix)
        obj_name = source_key.split('/')[-1]
        
        s3_data_source.validate_object_name(obj_name)
        
        for data_set in s3_data_source.lambda_s3_trigger_target_data_sets:
            # Create revision
            create_revision_resp = self._de_client.create_revision(
                DataSetId=data_set.id,
                Comment=obj_name
            )
            logger.debug(create_revision_resp)
            revision_id = create_revision_resp['Id']
            revision_arn = create_revision_resp['Arn']

            # Create job
            create_job_resp = self._de_client.create_job(
                Type='IMPORT_ASSETS_FROM_S3',
                Details={
                    'ImportAssetsFromS3': {
                      'AssetSources': [
                          {
                              'Bucket': source_bucket,
                              'Key': source_key
                          },
                      ],
                      'DataSetId': data_set.id,
                      'RevisionId': revision_id
                    }
                }
            )
            logger.debug(create_job_resp)

            # Start job
            job_id = create_job_resp['Id']
            start_job_resp = self._de_client.start_job(JobId=job_id)
            logger.debug(start_job_resp)

            # Wait for Data Exchange job completion
            get_job_resp = self._wait_for_de_job_completion(job_id)
            logger.debug(get_job_resp)

            # Finalize revision
            update_revision_resp = self._de_client.update_revision(
                DataSetId=data_set.id,
                RevisionId=revision_id,
                Finalized=True
            )
            logger.debug(update_revision_resp)

            # Ensure revision finalization succeeded
            finalized_status = update_revision_resp['Finalized']
            if finalized_status is not True:
                raise Exception(f"Failed to finalize revision:\n{update_revision_resp}")

            # Publish the new revision to each product associated with the data set
            for product in data_set.products:
                # Describe the AWS Marketplace entity corresponding to the Data Exchange product
                describe_entity_resp = self._mc_client.describe_entity(
                    Catalog='AWSMarketplace',
                    EntityId=product.id
                )
                logger.debug(describe_entity_resp)

                entity_type = describe_entity_resp['EntityType']
                entity_id = describe_entity_resp['EntityIdentifier']

                # Isolate the target data set in the DescribeEntity response
                describe_entity_resp_data_sets = json.loads(describe_entity_resp['Details'])['DataSets']
                describe_entity_resp_data_set = list(
                    filter(lambda ds: ds['DataSetArn'] == data_set.arn, describe_entity_resp_data_sets)
                )
                # We should get the data set of interest in describe_entity_resp and only that data set
                assert len(describe_entity_resp_data_set) == 1

                # Start a ChangeSet to add the newly finalized revision to an existing product
                start_change_set_resp = self._mc_client.start_change_set(
                    Catalog='AWSMarketplace',
                    ChangeSet=[
                        {
                            "ChangeType": "AddRevisions",
                            "Entity": {
                                "Identifier": entity_id,
                                "Type": entity_type
                            },
                            "Details": json.dumps({
                                "DataSetArn": data_set.arn,
                                "RevisionArns": [revision_arn]
                            })
                        }
                    ]
                )
                logger.debug(start_change_set_resp)

                # Wait for the ChangeSet workflow to complete
                change_set_id = start_change_set_resp['ChangeSetId']
                describe_change_set_resp = self._wait_for_mc_change_set_completion(change_set_id)
                logger.debug(describe_change_set_resp)

The following screenshot shows the S3 trigger for Job 3.

The following screenshot shows an example of CloudWatch logs for Job 3.

The following screenshot shows a CloudWatch alarm for Job 3.

Finally, we can verify that our revisions were successfully added to their corresponding data sets and products through the AWS console.

AWS Data Exchange allows you to create private offers for your AWS account IDs, providing a convenient means of checking that revisions show up in each product as expected.

Conclusion

This post demonstrated how you can integrate AWS Data Exchange into an existing data pipeline frictionlessly. We’re pleased to have been invited to participate in the AWS Data Exchange private preview, and even more pleased with the service itself, which has proven to be a sophisticated yet natural extension of our system.

I want to offer special thanks to both Kyle Patsen and Rafic Melhem of the AWS Data Exchange team for generously fielding my questions (and patiently enduring my ramblings) for the better part of the past year. I also want to thank Lucas Adams for helping me design the system discussed in this post and, more importantly, for his unwavering vote of confidence.

If you are interested in learning more about FPG, don’t hesitate to contact us.

 

New for Amazon Redshift – Data Lake Export and Federated Query

Post Syndicated from Danilo Poccia original https://aws.amazon.com/blogs/aws/new-for-amazon-redshift-data-lake-export-and-federated-queries/

A data warehouse is a database optimized to analyze relational data coming from transactional systems and line of business applications. Amazon Redshift is a fast, fully managed data warehouse that makes it simple and cost-effective to analyze data using standard SQL and existing Business Intelligence (BI) tools.

To get information from unstructured data that would not fit in a data warehouse, you can build a data lake. A data lake is a centralized repository that allows you to store all your structured and unstructured data at any scale. With a data lake built on Amazon Simple Storage Service (S3), you can easily run big data analytics and use machine learning to gain insights from your semi-structured (such as JSON, XML) and unstructured datasets.

Today, we are launching two new features to help you improve the way you manage your data warehouse and integrate with a data lake:

  • Data Lake Export to unload data from a Redshift cluster to S3 in Apache Parquet format, an efficient open columnar storage format optimized for analytics.
  • Federated Query to be able, from a Redshift cluster, to query across data stored in the cluster, in your S3 data lake, and in one or more Amazon Relational Database Service (RDS) for PostgreSQL and Amazon Aurora PostgreSQL databases.

This architectural diagram gives a quick summary of how these features work and how they can be used together with other AWS services.

Let’s explain the interactions you see in the diagram better, starting from how you can use these features, and the advantages they provide.

Using Redshift Data Lake Export

You can now unload the result of a Redshift query to your S3 data lake in Apache Parquet format. The Parquet format is up to 2x faster to unload and consumes up to 6x less storage in S3, compared to text formats. This enables you to save data transformation and enrichment you have done in Redshift into your S3 data lake in an open format.

You can then analyze the data in your data lake with Redshift Spectrum, a feature of Redshift that allows you to query data directly from files on S3. Or you can use different tools such as Amazon Athena, Amazon EMR, or Amazon SageMaker.

To try this new feature, I create a new cluster from the Redshift console, and follow this tutorial to load sample data that keeps track of sales of musical events across different venues. I want to correlate this data with social media comments on the events stored in my data lake. To understand their relevance, each event should have a way of comparing its relative sales to other events.

Let’s build a query in Redshift to export the data to S3. My data is stored across multiple tables. I need to create a query that gives me a single view of what is going on with sales. I want to join the content of the  sales and date tables, adding information on the gross sales for an event (total_price in the query), and the percentile in terms of all time gross sales compared to all events.

To export the result of the query to S3 in Parquet format, I use the following SQL command:

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/Sales/'
FORMAT AS PARQUET
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

To give Redshift write access to my S3 bucket, I am using an AWS Identity and Access Management (IAM) role. I can see the result of the UNLOAD command using the AWS Command Line Interface (CLI). As expected, the output of the query is exported using the Parquet columnar data format:

$ aws s3 ls s3://MY-BUCKET/DataLake/Sales/
2019-11-25 14:26:56 1638550 0000_part_00.parquet
2019-11-25 14:26:56 1635489 0001_part_00.parquet
2019-11-25 14:26:56 1624418 0002_part_00.parquet
2019-11-25 14:26:56 1646179 0003_part_00.parquet

To optimize access to data, I can specify one or more partition columns so that unloaded data is automatically partitioned into folders in my S3 bucket. For example, I can unload sales data partitioned by year, month, and day. This enables my queries to take advantage of partition pruning and skip scanning irrelevant partitions, improving query performance and minimizing cost.

To use partitioning, I need to add to the previous SQL command the PARTITION BY option, followed by the columns I want to use to partition the data in different directories. In my case, I want to partition the output based on the year and the calendar date (caldate in the query) of the sales.

UNLOAD ('SELECT sales.*, date.*, total_price, percentile
           FROM sales, date,
                (SELECT eventid, total_price, ntile(1000) over(order by total_price desc) / 10.0 as percentile
                   FROM (SELECT eventid, sum(pricepaid) total_price
                           FROM sales
                       GROUP BY eventid)) as percentile_events
          WHERE sales.dateid = date.dateid
            AND percentile_events.eventid = sales.eventid')
TO 's3://MY-BUCKET/DataLake/SalesPartitioned/'
FORMAT AS PARQUET
PARTITION BY (year, caldate)
CREDENTIALS 'aws_iam_role=arn:aws:iam::123412341234:role/myRedshiftRole';

This time, the output of the query is stored in multiple partitions. For example, here’s the content of a folder for a specific year and date:

$ aws s3 ls s3://MY-BUCKET/DataLake/SalesPartitioned/year=2008/caldate=2008-07-20/
2019-11-25 14:36:17 11940 0000_part_00.parquet
2019-11-25 14:36:17 11052 0001_part_00.parquet
2019-11-25 14:36:17 11138 0002_part_00.parquet
2019-11-25 14:36:18 12582 0003_part_00.parquet

Optionally, I can use AWS Glue to set up a Crawler that (on demand or on a schedule) looks for data in my S3 bucket to update the Glue Data Catalog. When the Data Catalog is updated, I can easily query the data using Redshift Spectrum, Athena, or EMR.

The sales data is now ready to be processed together with the unstructured and semi-structured  (JSON, XML, Parquet) data in my data lake. For example, I can now use Apache Spark with EMR, or any Sagemaker built-in algorithm to access the data and get new insights.

Using Redshift Federated Query
You can now also access data in RDS and Aurora PostgreSQL stores directly from your Redshift data warehouse. In this way, you can access data as soon as it is available. Straight from Redshift, you can now perform queries processing data in your data warehouse, transactional databases, and data lake, without requiring ETL jobs to transfer data to the data warehouse.

Redshift leverages its advanced optimization capabilities to push down and distribute a significant portion of the computation directly into the transactional databases, minimizing the amount of data moving over the network.

Using this syntax, you can add an external schema from an RDS or Aurora PostgreSQL database to a Redshift cluster:

CREATE EXTERNAL SCHEMA IF NOT EXISTS online_system
FROM POSTGRES
DATABASE 'online_sales_db' SCHEMA 'online_system'
URI ‘my-hostname' port 5432
IAM_ROLE 'iam-role-arn'
SECRET_ARN 'ssm-secret-arn';

Schema and port are optional here. Schema will default to public if left unspecified and default port for PostgreSQL databases is 5432. Redshift is using AWS Secrets Manager to manage the credentials to connect to the external databases.

With this command, all tables in the external schema are available and can be used by Redshift for any complex SQL query processing data in the cluster or, using Redshift Spectrum, in your S3 data lake.

Coming back to the sales data example I used before, I can now correlate the trends of my historical data of musical events with real-time sales. In this way, I can understand if an event is performing as expected or not, and calibrate my marketing activities without delays.

For example, after I define the online commerce database as the online_system external schema in my Redshift cluster, I can compare previous sales with what is in the online commerce system with this simple query:

SELECT eventid,
       sum(pricepaid) total_price,
       sum(online_pricepaid) online_total_price
  FROM sales, online_system.current_sales
 GROUP BY eventid
 WHERE eventid = online_eventid;

Redshift doesn’t import database or schema catalog in its entirety. When a query is run, it localizes the metadata for the Aurora and RDS tables (and views) that are part of the query. This localized metadata is then used for query compilation and plan generation.

Available Now
Amazon Redshift data lake export is a new tool to improve your data processing pipeline and is supported with Redshift release version 1.0.10480 or later. Refer to the AWS Region Table for Redshift availability, and check the version of your clusters.

The new federation capability in Amazon Redshift is released as a public preview and allows you to bring together data stored in Redshift, S3, and one or more RDS and Aurora PostgreSQL databases. When creating a cluster in the Amazon Redshift management console, you can pick three tracks for maintenance: Current, Trailing, or Preview. Within the Preview track, preview_features should be chosen to participate to the Federated Query public preview. For example:

These features simplify data processing and analytics, giving you more tools to react quickly, and a single point of view for your data. Let me know what you are going to use them for!

Danilo

Easily Manage Shared Data Sets with Amazon S3 Access Points

Post Syndicated from Brandon West original https://aws.amazon.com/blogs/aws/easily-manage-shared-data-sets-with-amazon-s3-access-points/

Storage that is secure, scalable, durable, and highly available is a fundamental component of cloud computing. That’s why Amazon Simple Storage Service (S3) was the first service launched by AWS, back in 2006. It has been a building block of many of the more than 175 services that AWS now offers. As we approach the beginning of a new decade, capabilities like Amazon Redshift, Amazon Athena, Amazon EMR and AWS Lake Formation have made S3 not just a way to store objects but an engine for turning that data into insights. These capabilities mean that access patterns and requirements for the data stored in buckets have evolved.

Today we’re launching a new way to manage data access at scale for shared data sets in S3: Amazon S3 Access Points. S3 Access Points are unique hostnames with dedicated access policies that describe how data can be accessed using that endpoint. Before S3 Access Points, shared access to data meant managing a single policy document on a bucket. These policies could represent hundreds of applications with many differing permissions, making audits, and updates a potential bottleneck affecting many systems.

With S3 Access Points, you can add access points as you add additional applications or teams, keeping your policies specific and easier to manage. A bucket can have multiple access points, and each access point has its own AWS Identity and Access Management (IAM) policy. Access point policies are similar to bucket policies, but associated with the access point. S3 Access Points can also be restricted to only allow access from within a Amazon Virtual Private Cloud. And because each access point has a unique DNS name, you can now address your buckets with any name that is unique within your AWS account and region.

Creating S3 Access Points

Let’s add an access point to a bucket using the S3 Console. You can also create and manage your S3 Access Points using the AWS Command Line Interface (CLI), AWS SDKs, or via the API. I’ve selected a bucket that contains artifacts generated by a AWS Lambda function, and clicked on the access points tab.

Access points tab in S3 Console

Let’s create a new access point. I want to give an IAM user Alice permission to GET and PUT objects with the prefix Alice. I’m going to name this access point alices-access-point. There are options for restricting access to a Virtual Private Cloud, which just requires a Virtual Private Cloud ID. In this, I want to allow access from outside the VPC as well, so after I took this screenshot, I selected Internet and moved onto the next step.

Creating an Access Point

S3 Access Points makes it easy to block public access. I’m going to block all public access to this access point.

Public access settings

And now I can attach my policy. In this policy, our Principal is our user Alice, and the resource is our access point combined with every object with the prefix /Alice. For more examples of the kinds of policies you might want to attach to your S3 Access Points, take a look at the docs.

Creating access point policy

After I create the access point, I can access it by hostname using the format https://[access_point_name]-[accountID].s3-accesspoint.[region].amazonaws.com. Via the SDKs and CLI, I can use it the same way I would use a bucket once I’ve updated to the latest version. For example, assuming I were authenticated as Alice, I could do the following:

$ aws s3api get-object --key /Alice/object.zip --bucket arn:aws:s3:us-east-1:[my-account-id]:alices-access-point download.zip

Access points that are not restricted to VPCs can also be used via the S3 Console.

Things to Know

When it comes to software design, keeping scopes small and focused on a specific task is almost always a good decision. With S3 Access Points, you can customize hostnames and permissions for any user or application that needs access to your shared data set. Let us know how you like this new capability, and happy building!

— Brandon

Continuously monitor unused IAM roles with AWS Config

Post Syndicated from Michael Chan original https://aws.amazon.com/blogs/security/continuously-monitor-unused-iam-roles-aws-config/

Developing in the cloud encourages you to iterate frequently as your applications and resources evolve. You should also apply this iterative approach to the AWS Identity and Access Management (IAM) roles you create. Periodically ensuring that all the resources you’ve created are still being used can reduce operational complexity by eliminating the need to track unnecessary resources. It also improves security: identifying unused IAM roles helps reduce the potential for improper or unintended access to your critical infrastructure and workloads.

The IAM API now provides you with information about when a role has last been used to make an AWS request. In this post, I demonstrate how you can identify inactive roles using role last used information. Additionally, I’ll show you how to implement continuous monitoring of role activity using AWS Config.

AWS services and features used

This solution uses the following services and features:

  • AWS IAM: This service enables you to manage access to AWS services and resources securely. It provides an API to retrieve the timestamp of an IAM role’s last use when making an AWS request, and the region where the request was made.
  • AWS Config: This service allows you to continuously monitor and record your AWS resource configurations. It will periodically trigger your AWS Config rule (see next bullet) and will record compliance status.
  • AWS Config Rule: This resource represents your desired configuration settings for specific AWS resources or for an entire AWS account. This resource will check the compliance status of your AWS resources. You can provide the logic that determines compliance, which enables you to mark IAM roles in use as “compliant” and inactive roles as “non-compliant.”
  • AWS Lambda: This service lets you run code without provisioning or managing servers. Lambda will be used to execute API calls to retrieve role last used information and to provide compliance evaluations to AWS Config.
  • Amazon Simple Storage Service (Amazon S3): This is a highly available and durable object store. You’ll use it to store your Lambda code in .zip format prior to deploying your Lambda function.
  • AWS CloudFormation: This service provides a common language for you to describe and provision all the infrastructure resources in your cloud environment. You’ll use it to provision all the resources described in this solution.

Solution logic

This solution identifies unused IAM roles within your account. First, you’ll identify unused roles based on a time window (last number of days) you set. I use 60 days in my example, but this range is configurable. Second, you’ll use AWS Lambda to process all the roles in your account. Third, you’ll determine if they’re compliant based on their creation time and role last used information. Last, you’ll send your evaluations to AWS Config, which records the results and reports if each role is compliant or not. If not, you can take steps to remediate, such as denying all actions that the role can perform.

Prerequisites

This solution has the following prerequisites:

Solution architecture

 

Figure 1: Solution architecture

Figure 1: Solution architecture

As shown in the diagram, AWS Config (1) executes the AWS Config custom rule daily, and this frequency is configurable (2), which in turn invokes the Lambda function (3). The Lambda function enumerates each role and determines its creation date and role last used timestamp, both of which are provided via IAM’s GetAccountAuthorizationDetails API (4). When the Lambda function has determined the compliance of all your roles, the function returns the compliance results to AWS Config (5). AWS Config retains the history of compliance changes evaluated by the rule. If configured, compliance notifications can be sent to an Amazon Simple Notification Service (Amazon SNS) topic. Compliance status is viewable either in the AWS Management Console or through use of the AWS CLI or AWS SDK.

Deploying the solution

The resources for this solution are deployed through AWS CloudFormation. You must prepare the Lambda function’s source code for packaging before AWS CloudFormation can deploy the complete solution into your account.

Step 1: Prepare the Lambda deployment

First, make sure you’re running a *nix prompt (Linux, Mac, or Windows subsystem for Linux). Follow the commands below to create an empty folder named iam-role-last-used where you’ll place your Lambda source code.


mkdir iam-role-last-used
cd iam-role-last-used
touch lambda_function.py

Note that the directory you create and the code it contains will later be compressed into a .zip file by the AWS CLI’s cloudformation package command. This command also uploads the deployment .zip file to your S3 bucket. The cloudformation deploy command will reference this bucket when deploying the solution.

Next, create a Lambda layer with the latest boto3 package. This ensures that your Lambda function is using an up-to-date boto3 SDK and allows you to control the dependencies in your function’s deployment package. You can do this by following steps 1 through 4 in these directions. Be sure to record the Lambda layer ARN that you create because you will use it later.

Finally, open the lambda_function.py file in your favorite editor or integrated development environment (IDE), and place the following code into the lambda_function.py file:


import boto3
from botocore.exceptions import ClientError
from botocore.config import Config
import datetime
import fnmatch
import json
import os
import re
import logging


logger = logging.getLogger()
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s [%(module)s.%(funcName)s:%(lineno)d] %(message)s", datefmt="%H:%M:%S"
)
logger.setLevel(os.getenv('log_level', logging.INFO))

# Configure boto retries
BOTO_CONFIG = Config(retries=dict(max_attempts=5))

# Define the default resource to report to Config Rules
DEFAULT_RESOURCE_TYPE = 'AWS::IAM::Role'

CONFIG_ROLE_TIMEOUT_SECONDS = 60

# Set to True to get the lambda to assume the Role attached on the Config service (useful for cross-account).
ASSUME_ROLE_MODE = False

# Evaluation strings for Config evaluations
COMPLIANT = 'COMPLIANT'
NON_COMPLIANT = 'NON_COMPLIANT'


# This gets the client after assuming the Config service role either in the same AWS account or cross-account.
def get_client(service, execution_role_arn):
    if not ASSUME_ROLE_MODE:
        return boto3.client(service)
    credentials = get_assume_role_credentials(execution_role_arn)
    return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'],
                        aws_secret_access_key=credentials['SecretAccessKey'],
                        aws_session_token=credentials['SessionToken'],
                        config=BOTO_CONFIG
                        )


def get_assume_role_credentials(execution_role_arn):
    sts_client = boto3.client('sts')
    try:
        assume_role_response = sts_client.assume_role(RoleArn=execution_role_arn,
                                                      RoleSessionName="configLambdaExecution",
                                                      DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS)
        return assume_role_response['Credentials']
    except ClientError as ex:
        if 'AccessDenied' in ex.response['Error']['Code']:
            ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role."
        else:
            ex.response['Error']['Message'] = "InternalError"
            ex.response['Error']['Code'] = "InternalError"
        raise ex


# Validates role pathname whitelist as passed via AWS Config parameters and returns a list of comma separated patterns.
def validate_whitelist(unvalidated_role_pattern_whitelist):
    # Names of users, groups, roles must be alphanumeric, including the following common
    # characters: plus (+), equal (=), comma (,), period (.), at (@), underscore (_), and hyphen (-).

    if not unvalidated_role_pattern_whitelist:
        return None

    regex = re.compile('^[-a-zA-Z0-9+=,[email protected]_/|*]+')
    if regex.search(unvalidated_role_pattern_whitelist):
        raise ValueError("[Error] Provided whitelist has invalid characters")

    return unvalidated_role_pattern_whitelist.split('|')


# This uses Unix filename pattern matching (as opposed to regular expressions), as documented here:
# https://docs.python.org/3.7/library/fnmatch.html.  Please note that if using a wildcard, e.g. "*", you should use
# it sparingly/appropriately.
# If the rolename matches the pattern, then it is whitelisted
def is_whitelisted_role(role_pathname, pattern_list):
    if not pattern_list:
        return False

    # If role_pathname matches pattern, then return True, else False
    # eg. /service-role/aws-codestar-service-role matches pattern /service-role/*
    # https://docs.python.org/3.7/library/fnmatch.html
    for pattern in pattern_list:
        if fnmatch.fnmatch(role_pathname, pattern):
            # whitelisted
            return True

    # not whitelisted
    return False


# Form an evaluation as a dictionary. Suited to report on scheduled rules.  More info here:
#   https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/config.html#ConfigService.Client.put_evaluations
def build_evaluation(resource_id, compliance_type, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None):
    evaluation = {}
    if annotation:
        evaluation['Annotation'] = annotation
    evaluation['ComplianceResourceType'] = resource_type
    evaluation['ComplianceResourceId'] = resource_id
    evaluation['ComplianceType'] = compliance_type
    evaluation['OrderingTimestamp'] = notification_creation_time
    return evaluation


# Determine if any roles were used to make an AWS request
def determine_last_used(role_name, role_last_used, max_age_in_days, notification_creation_time):

    last_used_date = role_last_used.get('LastUsedDate', None)
    used_region = role_last_used.get('Region', None)

    if not last_used_date:
        compliance_result = NON_COMPLIANT
        reason = "No record of usage"
        logger.info(f"NON_COMPLIANT: {role_name} has never been used")
        return build_evaluation(role_name, compliance_result, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=reason)


    days_unused = (datetime.datetime.now() - last_used_date.replace(tzinfo=None)).days

    if days_unused > max_age_in_days:
        compliance_result = NON_COMPLIANT
        reason = f"Was used {days_unused} days ago in {used_region}"
        logger.info(f"NON_COMPLIANT: {role_name} has not been used for {days_unused} days, last use in {used_region}")
        return build_evaluation(role_name, compliance_result, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=reason)

    compliance_result = COMPLIANT
    reason = f"Was used {days_unused} days ago in {used_region}"
    logger.info(f"COMPLIANT: {role_name} used {days_unused} days ago in {used_region}")
    return build_evaluation(role_name, compliance_result, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=reason)


# Returns a list of docts, each of which has authorization details of each role.  More info here:
#   https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/iam.html#IAM.Client.get_account_authorization_details
def get_role_authorization_details(iam_client):

    roles_authorization_details = []
    roles_list = iam_client.get_account_authorization_details(Filter=['Role'])

    while True:
        roles_authorization_details += roles_list['RoleDetailList']
        if 'Marker' in roles_list:
            roles_list = iam_client.get_account_authorization_details(Filter=['Role'], MaxItems=100, Marker=roles_list['Marker'])
        else:
            break

    return roles_authorization_details


# Check the compliance of each role by determining if role last used is > than max_days_for_last_used
def evaluate_compliance(event, context):

    # Initialize our AWS clients
    iam_client = get_client('iam', event["executionRoleArn"])
    config_client = get_client('config', event["executionRoleArn"])

    # List of resource evaluations to return back to AWS Config
    evaluations = []

    # List of dicts of each role's authorization details as returned by boto3
    all_roles = get_role_authorization_details(iam_client)

    # Timestamp of when AWS Config triggered this evaluation
    notification_creation_time = str(json.loads(event['invokingEvent'])['notificationCreationTime'])

    # ruleParameters is received from AWS Config's user-defined parameters
    rule_parameters = json.loads(event["ruleParameters"])

    # Maximum allowed days that a role can be unused, or has been last used for an AWS request
    max_days_for_last_used = int(os.environ.get('max_days_for_last_used', '60'))
    if 'max_days_for_last_used' in rule_parameters:
        max_days_for_last_used = int(rule_parameters['max_days_for_last_used'])

    whitelisted_role_pattern_list = []
    if 'role_whitelist' in rule_parameters:
        whitelisted_role_pattern_list = validate_whitelist(rule_parameters['role_whitelist'])

    # Iterate over all our roles.  If the creation date of a role is <= max_days_for_last_used, it is compliant
    for role in all_roles:

        role_name = role['RoleName']
        role_path = role['Path']
        role_creation_date = role['CreateDate']
        role_last_used = role['RoleLastUsed']
        role_age_in_days = (datetime.datetime.now() - role_creation_date.replace(tzinfo=None)).days

        if is_whitelisted_role(role_path + role_name, whitelisted_role_pattern_list):
            compliance_result = COMPLIANT
            reason = "Role is whitelisted"
            evaluations.append(
                build_evaluation(role_name, compliance_result, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=reason))
            logger.info(f"COMPLIANT: {role} is whitelisted")
            continue

        if role_age_in_days <= max_days_for_last_used:
            compliance_result = COMPLIANT
            reason = f"Role age is {role_age_in_days} days"
            evaluations.append(
                build_evaluation(role_name, compliance_result, notification_creation_time, resource_type=DEFAULT_RESOURCE_TYPE, annotation=reason))
            logger.info(f"COMPLIANT: {role_name} - {role_age_in_days} is newer or equal to {max_days_for_last_used} days")
            continue

        evaluation_result = determine_last_used(role_name, role_last_used, max_days_for_last_used, notification_creation_time)
        evaluations.append(evaluation_result)

    # Iterate over our evaluations 100 at a time, as put_evaluations only accepts a max of 100 evals.
    evaluations_copy = evaluations[:]
    while evaluations_copy:
        config_client.put_evaluations(Evaluations=evaluations_copy[:100], ResultToken=event['resultToken'])
        del evaluations_copy[:100]

Here’s how the above code works. The AWS Config custom rule invokes the Lambda function, calling the evaluate_compliance() method. evaluate_compliance() does the following:

  1. Retrieves information on all roles from IAM using the GetAccountAuthorizationDetails API as mentioned previously. This includes each role’s creation date and role last used timestamp.
  2. Marks each role as compliant if the role name matches one of the patterns in your whitelisted_role_pattern_list. This pattern list is passed to your rule via a user-configurable AWS CloudFormation parameter named RolePatternWhitelist. “Whitelisting roles,” below, provides instructions about how to do this.
  3. Marks each role as compliant if the age of the role in days (role_age_in_days) is less than or equal to the parameter MaxDaysForLastUsed (max_days_for_last_used). This is set via a user-configurable parameter in your CloudFormation stack. You’ll use this parameter to set the time window for how long a role can be inactive.
  4. If neither of the above conditions are met, then determine_last_used() is called, and each role will be marked as non-compliant if days_unused is greater than max_age_in_days.
  5. Finally, evaluate_compliance() calls put_evaluations() against AWS Config to store your evaluations of each role.

Step 2: Deploy the AWS CloudFormation template

Next, create an AWS CloudFormation template file named  iam-role-last-used.yml. This template uses the AWS Serverless Application Model (AWS SAM), which is an extension of CloudFormation. AWS SAM simplifies the deployment so that you don’t have to manually upload your deployment .zip file to your Amazon S3 bucket. To ensure that your template knows the location of your code .zip file, place the file on the same directory level as the iam-role-last-used directory that you created above. Then copy and paste the code below and save it to the iam-role-last-used.yml file.


AWSTemplateFormatVersion: '2010-09-09'
Description: "Creates an AWS Config rule and Lambda to check all roles' last used compliance"
Transform: 'AWS::Serverless-2016-10-31'
Parameters:

  MaxDaysForLastUsed:
    Description: Checks the number of days allowed for a role to not be used before being non-compliant
    Type: Number
    Default: 60
    MaxValue: 365

  NameOfSolution:
    Type: String
    Default: iam-role-last-used
    Description: The name of the solution - used for naming of created resources

  RolePatternWhitelist:
    Description: Pipe separated whitelist of role pathnames using simple pathname matching
    Type: String
    Default: ''
    AllowedPattern: '[-a-zA-Z0-9+=,[email protected]_/|*]+|^$'

  LambdaLayerArn:
    Type: String
    Description: The ARN for the Lambda Layer you will use.
  
Resources:
  LambdaInvokePermission:
    Type: 'AWS::Lambda::Permission'
    DependsOn: CheckRoleLastUsedLambda
    Properties: 
      FunctionName: !GetAtt CheckRoleLastUsedLambda.Arn
      Action: lambda:InvokeFunction
      Principal: config.amazonaws.com
      SourceAccount: !Ref 'AWS::AccountId'

  LambdaExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: !Sub '${NameOfSolution}-${AWS::Region}'
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
        - Effect: Allow
          Principal:
            Service: lambda.amazonaws.com
          Action:
          - sts:AssumeRole
      Path: /
      Policies:
      - PolicyName: !Sub '${NameOfSolution}'
        PolicyDocument:
          Version: '2012-10-17'
          Statement:
          - Effect: Allow
            Action:
            - config:PutEvaluations
            Resource: '*'
          - Effect: Allow
            Action:
            - iam:GetAccountAuthorizationDetails
            Resource: '*'
          - Effect: Allow
            Action:
            - logs:CreateLogStream
            - logs:PutLogEvents
            Resource:
            - !Sub 'arn:${AWS::Partition}:logs:${AWS::Region}:*:log-group:/aws/lambda/${NameOfSolution}:log-stream:*'

  CheckRoleLastUsedLambda:
    Type: 'AWS::Serverless::Function'
    Properties:
      Description: "Checks IAM roles' last used info for AWS Config"
      FunctionName: !Sub '${NameOfSolution}'
      Handler: lambda_function.evaluate_compliance
      MemorySize: 256
      Role: !GetAtt LambdaExecutionRole.Arn
      Runtime: python3.7
      Timeout: 300
      CodeUri: ./iam-role-last-used
      Layers:
      - !Ref LambdaLayerArn

  LambdaLogGroup:
    Type: 'AWS::Logs::LogGroup'
    Properties: 
      LogGroupName: !Sub '/aws/lambda/${NameOfSolution}'
      RetentionInDays: 30

  ConfigCustomRule:
    Type: 'AWS::Config::ConfigRule'
    DependsOn:
    - LambdaInvokePermission
    - LambdaExecutionRole
    Properties:
      ConfigRuleName: !Sub '${NameOfSolution}'
      Description: Checks the number of days that an IAM role has not been used to make a service request. If the number of days exceeds the specified threshold, it is marked as non-compliant.
      InputParameters: !Sub '{"role-whitelist":"${RolePatternWhitelist}","max_days_for_last_used":"${MaxDaysForLastUsed}"}'
      Source: 
        Owner: CUSTOM_LAMBDA
        SourceDetails: 
        - EventSource: aws.config
          MaximumExecutionFrequency: TwentyFour_Hours
          MessageType: ScheduledNotification
        SourceIdentifier: !GetAtt CheckRoleLastUsedLambda.Arn

For your reference, below is a summary of the template.

  • Parameters (these are user-configurable variables):
    • MaxDaysForLastUsed—maximum amount of days allowed for a role that has not been used to make an AWS request before becoming non-compliant
    • NameOfSolution—the name of the solution, used for naming of created resources
    • RolePatternWhitelist—a pipe (“|”) separated whitelist of role pathnames using simple pathname matching (see Whitelisting roles below)
    • LambdaLayerArn—the unique ARN for your Lambda layer
  • Resources (these are the AWS resources that will be created within your account):
    • LambdaInvokePermission—allows AWS Config to invoke your Lambda function
    • LambdaExecutionRole—the role and permissions that Lambda will assume to process your roles. The policies assigned to this role allow you to perform the iam:GetAccountAuthorizationDetails, config:PutEvaluations, logs:CreateLogStream, and logs:PutLogEvents actions. The PutEvaluations action allows you to send evaluation results back to AWS Config. The CreateLogStream and PutLogEvents actions allows you to write the Lambda execution logs to AWS CloudWatch Logs.
    • CheckRoleLastUsedLambda—defines your Lambda function and its attributes
    • LambdaLogGroup—logs from Lambda will be written to this CloudWatch Log Group
    • ConfigCustomRule—defines your custom AWS Config rule and its attributes

With the CloudFormation template you created above, use the AWS CLI’s cloudformation package command to zip the deployment package and upload it to the S3 bucket that you specify, as shown below. Make sure to replace <YOUR S3 BUCKET> with your bucket name only. Do not include the s3:// prefix:


aws cloudformation package --region <YOUR REGION> --template-file iam-role-last-used.yml \
--s3-bucket <YOUR S3 BUCKET> \
--output-template-file iam-role-last-used-transformed.yml

This will create the file iam-role-last-used-transformed.yml, which adds a reference to the S3 bucket and the pathname needed by CloudFormation to deploy your Lambda function.

Finally, deploy the solution into your AWS account using the cloudformation deploy command below. You can provide different values for NameOfSolutionMaxDaysForLastAccess, or RolePatternWhitelist by using the –parameter-overrides option. Otherwise, defaults will be used. These are specified at the top of the AWS Cloudformation template pasted above, under the Parameters section.


aws cloudformation deploy --region <YOUR REGION> --template-file iam-role-last-used-transformed.yml \
--stack-name iam-role-last-used \
--parameter-overrides NameOfSolution='iam-role-last-used' \
MaxDaysForLastUsed=60 \
RolePatternWhitelist='/breakglass-role|/security-*' \
LambdaLayerArn='<YOUR LAMBDA LAYER ARN>' \
--capabilities CAPABILITY_NAMED_IAM

The deployment is complete after the AWS CLI indicates success. This typically takes only a few minutes:


Waiting for changeset to be created..
Waiting for stack create/update to complete
Successfully created/updated stack - iam-role-last-used

Step 3: View your findings

Now that your deployment is complete, you can view your compliance findings by going to the AWS Config console.

  1. Select the same region where you deployed the CloudFormation template.
  2. Select Rules in the left pane, which brings up the current list of rules in your account.
  3. Select the iam-role-last-used rule to view the rule’s details, as shown in Figure 2.

When a successful evaluation is indicated in the Overall rule status field, the compliance evaluation is complete. You may need to wait a few minutes for the function to complete successfully as results may not be available yet. You can periodically refresh your web browser to check for completion.
 

Figure 2: AWS Config rule details

Figure 2: AWS Config custom rule details

After the rule completes its evaluations of your roles, you’ll be able to view your compliance results on the same page. In the screenshot below, you can see that there are multiple non-compliant roles. You can switch between viewing compliant and non-compliant resources by selecting the dropdown menu under Compliance status.
 

Figure 3: Viewing the compliance status

Figure 3: Viewing the compliance status

For more insight, you can hover over the “i” symbol, which provides additional information about the role’s non-compliant status (see Figure 4).
 

Figure 4: Hover over the information icon

Figure 4: Hover over the information icon

Step 4: Export a report of your compliance

Once a successful evaluation has completed, you may want to create an exportable report of compliance. You can use the AWS CLI to programmatically script and automatically generate reports for your application, infrastructure, and security teams. They can use these reports to review non-compliant roles and take action if the role is no longer needed. The AWS CLI command below demonstrates how you can achieve this. Note that the command below encompasses a single line:

aws configservice get-compliance-details-by-config-rule –config-rule-name iam-role-last-used –output text –query ‘EvaluationResults [*].{A:EvaluationResultIdentifier.EvaluationResultQualifier.ResourceId,B:ComplianceType,C:Annotation}’

The output is tab-delimited and will be similar to the lines below. The first column displays the role name. The second column states the compliance status. The last column explains the reason for the compliance status:

AdminRole   COMPLIANT      Was last used in us-west-2 46 days ago
Ec2DevRole  NON_COMPLIANT  No record of usage

Remediation

Now that you have a report of non-compliant roles, you must decide what to do with them. If your teams agree that a role is not necessary, the remediation can be to simply delete the role. If unsure, you can retain the role but deny it from performing any action. You can do this by attaching a new permissions policy that will deny all actions for all resources. Re-enabling the role would be as easy as removing the added policy. Otherwise, if the role is necessary but not frequently used, you can whitelist the role through the method below.

Whitelisting roles

Whitelisted roles will be reported as compliant by the custom rule even if left unused. You might have roles such as a security incident response or a break-glass role that require whitelisting.

The whitelist is supplied via the CloudFormation parameter RolePatternWhitelist and is stored as an AWS Config rule parameter. The syntax uses UNIX filename pattern matching. If you need to specify multiple patterns, you can use the | (pipe) character as a delimiter between each pattern. Each delimited pattern will then be matched against the role name, including the path. For example, if you wish to whitelist the breakglass-role, security-incident-response-role and security-audit-role roles, the whitelist patterns you provide to the AWS CloudFormation template might be:

/breakglass-role|/security-*

Important: The use of wildcards (*) should be used thoughtfully, as they will match anything.

Enhancements

In this walkthrough, I’ve kept the architecture and code simple to make the solution easier to follow. You can further customize the solution through the following enhancements:

Conclusion

In this post, I’ve shown you how to use AWS IAM and AWS Config to implement a detective security control that provides visibility into your IAM roles and their last time of use. I’ve also shown how you can view the results in the AWS Management Console and export them using the AWS CLI. Finally, I’ve presented different options for remediation and a means to whitelist roles that are necessary but infrequently used. These techniques can augment your security and compliance program by preventing unintended access through your IAM roles.

Additional resources

If you have feedback about this blog post, submit comments in the Comments section below. If you have questions about this blog post, start a new thread on the IAM forum or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Michael Chan

Michael Chan

Michael is a Developer Advocate for AWS Identity. Prior to this, he was a Professional Services Consultant who assisted customers with their journey to AWS. He enjoys understanding customer problems and working backwards to provide practical solutions.

Roland AbiHanna

Roland is a Sr. Solutions Architect with Amazon Web Services. He’s focused on helping enterprise customers realize their business needs through cloud solutions, specializing in DevOps and automation. Prior to AWS, Roland ran DevOps for a variety of start-ups in Europe and the Middle East. Outside of work, Roland enjoys hiking and searching for the perfect blend of hops, barley, and water.

S3 Replication Update: Replication SLA, Metrics, and Events

Post Syndicated from Jeff Barr original https://aws.amazon.com/blogs/aws/s3-replication-update-replication-sla-metrics-and-events/

S3 Cross-Region Replication has been around since early 2015 (new Cross-Region Replication for Amazon S3), and Same-Region Replication has been around for a couple of months.

Replication is very easy to set up, and lets you use rules to specify that you want to copy objects from one S3 bucket to another one. The rules can specify replication of the entire bucket, or of a subset based on prefix or tag:

You can use replication to copy critical data within or between AWS regions in order to meet regulatory requirements for geographic redundancy as part of a disaster recover plan, or for other operational reasons. You can copy within a region to aggregate logs, set up test & development environments, and to address compliance requirements.

S3’s replication features have been put to great use: Since the launch in 2015, our customers have replicated trillions of objects and exabytes of data! Today I am happy to be able to tell you that we are making it even more powerful, with the addition of Replication Time Control. This feature builds on the existing rule-driven replication and gives you fine-grained control based on tag or prefix so that you can use Replication Time Control with the data set you specify. Here’s what you get:

Replication SLA – You can now take advantage of a replication SLA to increase the predictability of replication time.

Replication Metrics – You can now monitor the maximum replication time for each rule using new CloudWatch metrics.

Replication Events – You can now use events to track any object replications that deviate from the SLA.

Let’s take a closer look!

New Replication SLA
S3 replicates your objects to the destination bucket, with timing influenced by object size & count, available bandwidth, other traffic to the buckets, and so forth. In situations where you need additional control over replication time, you can use our new Replication Time Control feature, which is designed to perform as follows:

  • Most of the objects will be replicated within seconds.
  • 99% of the objects will be replicated within 5 minutes.
  • 99.99% of the objects will be replicated within 15 minutes.

When you enable this feature, you benefit from the associated Service Level Agreement. The SLA is expressed is terms of a percentage of objects that are expected to be replicated within 15 minutes, and provides for billing credits if the SLA is not met:

  • 99.9% to 98.0% – 10% credit
  • 98.0% to 95.0% – 25% credit
  • 95% to 0% – 100% credit

The billing credit applies to a percentage of the Replication Time Control fee, replication data transfer, S3 requests, and S3 storage charges in the destination for the billing period.

I can enable Replication Time Control when I create a new replication rule, and I can also add it to an existing rule:

Replication begins as soon as I create or update the rule. I can use the Replication Metrics and the Replication Events to monitor compliance.

In addition to the existing charges for S3 requests and data transfer between regions, you will pay an extra per-GB charge to use Replication Time Control; see the S3 Pricing page for more information.

Replication Metrics
Each time I enable Replication Time Control for a rule, S3 starts to publish three new metrics to CloudWatch. They are available in the S3 and CloudWatch Consoles:

I created some large tar files, and uploaded them to my source bucket. I took a quick break, and inspected the metrics. Note that I did my testing before the launch, so don’t get overly concerned with the actual numbers. Also, keep in mind that these metrics are aggregated across the replication for display, and are not a precise indication of per-object SLA compliance.

BytesPendingReplication jumps up right after the upload, and then drops down as the replication takes place:

ReplicationLatency peaks and then quickly drops down to zero after S3 Replication transfers over 37 GB from the United States to Australia with a maximum latency of 8.3 minutes:

And OperationsPendingCount tracks the number of objects to be replicated:

I can also set CloudWatch Alarms on the metrics. For example, I might want to know if I have a replication backlog larger than 75 GB (for this to work as expected, I must set the Missing data treatment to Treat missing data as ignore (maintain the alarm state):

These metrics are billed as CloudWatch Custom Metrics.

Replication Events
Finally, you can track replication issues by setting up events on an SQS queue, SNS topic, or Lambda function. Start at the console’s Events section:

You can use these events to monitor adherence to the SLA. For example, you could store Replication time missed threshold and Replication time completed after threshold events in a database to track occasions where replication took longer than expected. The first event will tell you that the replication is running late, and the second will tell you that it has completed, and how late it was.

To learn more, read about Replication.

Available Now
You can start using these features today in all commercial AWS Regions, excluding the AWS China (Beijing) and AWS China (Ningxia) Regions.

Jeff;

PS – If you want to learn more about how S3 works, be sure to attend the re:Invent session: Beyond Eleven Nines: Lessons from the Amazon S3 Culture of Durability.