Tag Archives: Analytics

Build the next generation, cross-account, event-driven data pipeline orchestration product

Post Syndicated from Maria Guerra original https://aws.amazon.com/blogs/big-data/build-the-next-generation-cross-account-event-driven-data-pipeline-orchestration-product/

This is a guest post by Mehdi Bendriss, Mohamad Shaker, and Arvid Reiche from Scout24.

At Scout24 SE, we love data pipelines, with over 700 pipelines running daily in production, spread across over 100 AWS accounts. As we democratize data and our data platform tooling, each team can create, maintain, and run their own data pipelines in their own AWS account. This freedom and flexibility is required to build scalable organizations. However, it’s full of pitfalls. With no rules in place, chaos is inevitable.

We took a long road to get here. We’ve been developing our own custom data platform since 2015, building most tools ourselves. Since 2016, we have been running our self-developed legacy data pipeline orchestration tool.

The motivation to invest a year of work into a new solution was driven by two factors:

  • Lack of transparency on data lineage, especially dependency and availability of data
  • Little room to implement governance

As a technical platform, our target user base for our tooling includes data engineers, data analysts, data scientists, and software engineers. We share the vision that anyone with relevant business context and minimal technical skills can create, deploy, and maintain a data pipeline.

In this context, in 2015 we created the predecessor of our new tool, which allows users to describe their pipeline in a YAML file as a list of steps. It worked well for a while, but we faced many problems along the way, notably:

  • Our product couldn’t trigger pipelines based on the status of other pipelines, only on the presence of _SUCCESS files in Amazon Simple Storage Service (Amazon S3), which we detected through periodic polling. In complex organizations, data jobs often have strong dependencies on other work streams.
  • Given the previous point, most pipelines could only be scheduled based on a rough estimate of when their parent pipelines might finish. This led to cascaded failures when the parents failed or didn’t finish on time.
  • When a pipeline fails and gets fixed, then manually redeployed, all its dependent pipelines must be rerun manually. This means that the data producer bears the responsibility of notifying every single team downstream.

Having data and tooling democratized without the ability to provide insights into which jobs, data, and dependencies exist diminishes synergies within the company, leading to silos and problems in resource allocation. It became clear that we needed a successor for this product that would give more flexibility to the end-user, lower computing costs, and no infrastructure management overhead.

In this post, we describe, through a hypothetical case study, the constraints under which the new solution should perform, the end-user experience, and the detailed architecture of the solution.

Case study

Our case study looks at the following teams:

  • The core-data-availability team has a data pipeline named listings that runs every day at 3:00 AM on the AWS account Account A, and produces on Amazon S3 an aggregate of the listings events published on the platform on the previous day.
  • The search team has a data pipeline named searches that runs every day at 5:00 AM on the AWS account Account B, and exports to Amazon S3 the list of search events that happened on the previous day.
  • The rent-journey team wants to measure a metric referred to as X; they create a pipeline named pipeline-X that runs daily on the AWS account Account C, and relies on the data of both previous pipelines. pipeline-X should only run daily, and only after both the listings and searches pipelines succeed.

User experience

We provide users with a CLI tool that we call DataMario (relating to its predecessor DataWario), and which allows users to do the following:

  • Set up their AWS account with the necessary infrastructure needed to run our solution
  • Bootstrap and manage their data pipeline projects (creating, deploying, deleting, and so on)

When creating a new project with the CLI, we generate a pipeline.yaml file, which every project is required to have. This file describes the pipeline steps and how they should be triggered, the alerting, the type of instances and clusters on which the pipeline runs, and more.

In addition to the pipeline.yaml file, we allow advanced users with very niche and custom needs to create their pipeline definition entirely using a TypeScript API we provide them, which allows them to use the whole collection of constructs in the AWS Cloud Development Kit (AWS CDK) library.

For the sake of simplicity, we focus on the triggering of pipelines and the alerting in this post, along with the definition of pipelines through pipeline.yaml.

The listings and searches pipelines are each triggered by a scheduling rule, which the team defines in the pipeline.yaml file as follows (shown here for listings, which runs at 3:00 AM):

trigger: 
    schedule: 
        hour: 3

pipeline-X is triggered depending on the success of both the listings and searches pipelines. The team defines this dependency relationship in the project’s pipeline.yaml file as follows:

trigger: 
    executions: 
        allOf: 
            - name: listings 
              account: Account_A_ID 
              status: 
                  - SUCCESS 
            - name: searches 
              account: Account_B_ID 
              status: 
                  - SUCCESS

The executions block can define a complex set of relationships by combining the allOf and anyOf blocks, along with a logical operator (operator: OR / AND), which allows mixing the allOf and anyOf blocks. We focus on the most basic use case in this post.
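To make the semantics of the executions block more concrete, the following TypeScript sketch evaluates such a trigger against the latest known status of each upstream pipeline. It is an illustration only, not DataMario's actual code: the type names, the "account/name" lookup key, and the choice of AND as the default operator are all assumptions.

```typescript
// Hypothetical types mirroring the `executions` block in pipeline.yaml.
interface ParentCondition {
  name: string;     // upstream pipeline name
  account: string;  // AWS account ID of the upstream pipeline
  status: string[]; // accepted statuses, e.g. ["SUCCESS"]
}

interface ExecutionsTrigger {
  allOf?: ParentCondition[];
  anyOf?: ParentCondition[];
  operator?: "AND" | "OR"; // how the two blocks combine (assumed default: AND)
}

// Latest known status per upstream pipeline, keyed by "account/name" (assumed key format).
type LatestStatuses = Record<string, string>;

function conditionMet(c: ParentCondition, latest: LatestStatuses): boolean {
  const current = latest[c.account + "/" + c.name];
  return current !== undefined && c.status.indexOf(current) !== -1;
}

function shouldTrigger(t: ExecutionsTrigger, latest: LatestStatuses): boolean {
  const results: boolean[] = [];
  if (t.allOf && t.allOf.length > 0) {
    results.push(t.allOf.every((c) => conditionMet(c, latest)));
  }
  if (t.anyOf && t.anyOf.length > 0) {
    results.push(t.anyOf.some((c) => conditionMet(c, latest)));
  }
  if (results.length === 0) {
    return false; // no conditions defined, so nothing can fire the trigger
  }
  return t.operator === "OR" ? results.some((r) => r) : results.every((r) => r);
}

// The pipeline-X trigger from the example above.
const pipelineXTrigger: ExecutionsTrigger = {
  allOf: [
    { name: "listings", account: "Account_A_ID", status: ["SUCCESS"] },
    { name: "searches", account: "Account_B_ID", status: ["SUCCESS"] },
  ],
};

const onlyListingsDone = shouldTrigger(pipelineXTrigger, {
  "Account_A_ID/listings": "SUCCESS",
});
const bothDone = shouldTrigger(pipelineXTrigger, {
  "Account_A_ID/listings": "SUCCESS",
  "Account_B_ID/searches": "SUCCESS",
});
console.log(onlyListingsDone, bothDone); // false true
```

With only listings finished, the trigger stays closed; it opens once searches has succeeded as well.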

Accounts setup

To support alerting, logging, and dependencies management, our solution has components that must be pre-deployed in two types of accounts:

  • A central AWS account – This is managed by the Data Platform team and contains the following:
    • A central data pipeline Amazon EventBridge bus receiving all the run status changes of AWS Step Functions workflows running in user accounts
    • An AWS Lambda function logging the Step Functions workflow run changes in an Amazon DynamoDB table to verify if any downstream pipelines should be triggered based on the current event and previous run status changes log
    • A Slack alerting service to send alerts to the Slack channels specified by users
    • A trigger management service that broadcasts triggering events to the downstream buses in the user accounts
  • All AWS user accounts using the service – These accounts contain the following:
    • A data pipeline EventBridge bus that receives Step Functions workflow run status changes forwarded from the central EventBridge bus
    • An S3 bucket to store data pipeline artifacts, along with their logs
    • Resources needed to run Amazon EMR clusters, like security groups, AWS Identity and Access Management (IAM) roles, and more

With the provided CLI, users can set up their account by running the following code:

$ dpc setup-user-account

Solution overview

The following diagram illustrates the architecture of the cross-account, event-driven pipeline orchestration product.

In this post, we use the colored and numbered squares to reference components in the architecture diagram. For example, the green square with label 3 refers to the default EventBridge bus component.

Deployment flow

This section is illustrated with the orange squares in the architecture diagram.

A user can create a project consisting of one or more data pipelines using our CLI tool as follows:

$ dpc create-project -n 'project-name'

The created project contains several components that allow the user to create and deploy data pipelines, which are defined in .yaml files (as explained earlier in the User experience section).

The workflow of deploying a data pipeline such as listings in Account A is as follows:

  • Deploy listings by running the command dpc deploy in the root folder of the project. An AWS CDK stack with all required resources is automatically generated.
  • The previous stack is deployed as an AWS CloudFormation template.
  • The stack uses custom resources to perform some actions, such as storing information needed for alerting and pipeline dependency management.
  • Two Lambda functions are triggered: one to store the mapping pipeline_name/slack_channels used for alerting in a DynamoDB table, and another to store the mapping between the deployed pipeline and its triggers (the other pipelines whose runs should trigger the current one).
  • To decouple alerting and dependency management services from the other components of the solution, we use Amazon API Gateway for two components:
    • The Slack API.
    • The dependency management API.
  • All calls to both APIs are traced in Amazon CloudWatch log groups and handled by two Lambda functions:
    • The Slack channel publisher Lambda function, used to store the mapping pipeline_name/slack_channels in a DynamoDB table.
    • The dependencies publisher Lambda function, used to store the pipelines dependencies (the mapping pipeline_name/parents) in a DynamoDB table.

Pipeline trigger flow

This is an event-driven mechanism that ensures that data pipelines are triggered as requested by the user, either following a schedule or a list of fulfilled upstream conditions, such as a group of pipelines succeeding or failing.

This flow relies heavily on EventBridge buses and rules, specifically two types of rules:

  • Scheduling rules.
  • Step Functions event-based rules, with a payload matching the set of statuses of all the parents of a given pipeline. For example, the rule for pipeline-X indicates the set of parent statuses for which pipeline-X should be triggered.
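For reference, Step Functions publishes run status changes to EventBridge with the source aws.states and the detail type Step Functions Execution Status Change. A rule that listens for them could use a pattern like the one in the following sketch. The matcher is a simplified stand-in for EventBridge's own matching, limited to exact values, and the interface names and the ARN are made up for illustration.

```typescript
// Fields this sketch reads from a Step Functions execution status change event.
interface SfnStatusEvent {
  source: string;
  "detail-type": string;
  detail: { stateMachineArn: string; status: string };
}

// Simplified event pattern: each field lists the values it accepts.
interface StatusChangePattern {
  source: string[];
  "detail-type": string[];
  detail: { status: string[] };
}

// Pattern a rule could use to catch every run status change.
const exportStatusChanges: StatusChangePattern = {
  source: ["aws.states"],
  "detail-type": ["Step Functions Execution Status Change"],
  detail: { status: ["RUNNING", "SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"] },
};

// Minimal matcher covering only the exact-value matching used above.
function matchesPattern(pattern: StatusChangePattern, evt: SfnStatusEvent): boolean {
  return (
    pattern.source.indexOf(evt.source) !== -1 &&
    pattern["detail-type"].indexOf(evt["detail-type"]) !== -1 &&
    pattern.detail.status.indexOf(evt.detail.status) !== -1
  );
}

// Hypothetical event for the listings workflow (the ARN is made up).
const listingsSucceeded: SfnStatusEvent = {
  source: "aws.states",
  "detail-type": "Step Functions Execution Status Change",
  detail: {
    stateMachineArn: "arn:aws:states:eu-west-1:111111111111:stateMachine:listings",
    status: "SUCCEEDED",
  },
};

const forwarded = matchesPattern(exportStatusChanges, listingsSucceeded);
console.log(forwarded); // true
```

Narrowing detail.status to a single value, such as ["FAILED"], would restrict the rule to the status a downstream pipeline cares about.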

Scheduling

This section is illustrated with the black squares in the architecture diagram.

The listings pipeline running on Account A is set to run every day at 3:00 AM. The deployment of this pipeline creates an EventBridge rule and a Step Functions workflow for running the pipeline:

  • The EventBridge rule is of type schedule and is created on the default bus (this is the EventBridge bus responsible for listening to native AWS events—this distinction is important to avoid confusion when introducing the other buses). This rule has two main components:
    • A cron-like notation to describe the frequency at which it runs: 0 3 * * ? *.
    • The target, which is the Step Functions workflow describing the workflow of the listings pipeline.
  • The listings Step Functions workflow runs immediately when the rule gets triggered. (The same happens for the searches pipeline.)

Each user account has a default EventBridge bus, which listens to the default AWS events (such as the run of any Lambda function) and scheduled rules.
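As an illustration of how a schedule block from pipeline.yaml could map to the cron notation above, consider the following sketch. The ScheduleTrigger shape and the optional minute field are assumptions (the post only shows hour), and the function name is hypothetical. Note that EventBridge rules evaluate cron expressions in UTC.

```typescript
// Hypothetical shape of the `trigger.schedule` block; only `hour` appears in the post.
interface ScheduleTrigger {
  hour: number;
  minute?: number; // assumed optional field, defaults to 0
}

// Builds an EventBridge schedule expression for a daily run.
// Cron fields: minutes, hours, day-of-month, month, day-of-week, year.
function toDailyCron(s: ScheduleTrigger): string {
  const minute = s.minute === undefined ? 0 : s.minute;
  if (s.hour < 0 || s.hour > 23 || minute < 0 || minute > 59) {
    throw new Error("hour must be 0-23 and minute 0-59");
  }
  return "cron(" + minute + " " + s.hour + " * * ? *)";
}

const listingsCron = toDailyCron({ hour: 3 });
const searchesCron = toDailyCron({ hour: 5 });
console.log(listingsCron); // cron(0 3 * * ? *)
console.log(searchesCron); // cron(0 5 * * ? *)
```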

Dependency management

This section is illustrated with the green squares in the architecture diagram. The current flow starts after the Step Functions workflow (black square 2) starts, as explained in the previous section.

As a reminder, pipeline-X is triggered when both the listings and searches pipelines are successful. We focus on the listings pipeline for this post, but the same applies to the searches pipeline.

The overall idea is to notify every downstream pipeline that depends on the listings pipeline, in any AWS account, of each change in its status, with the notification passing through the central orchestration account.

It’s then logical that the following flow gets triggered multiple times per pipeline (Step Functions workflow) run, as its status changes from RUNNING to either SUCCEEDED, FAILED, TIMED_OUT, or ABORTED, because downstream pipelines could be listening for any of those status change events. The steps are as follows:

  • The event of the Step Functions workflow starting is listened to by the default bus of Account A.
  • The rule export-events-to-central-bus, which specifically listens to the Step Functions workflow run status change events, is then triggered.
  • The rule forwards the event to the central bus on the central account.
  • The event is then caught by the rule trigger-events-manager.
  • This rule triggers a Lambda function.
  • The function gets the list of all child pipelines that depend on the current run status of listings.
  • The current run is inserted in the run log Amazon Relational Database Service (Amazon RDS) table, following the schema sfn-listings, time (timestamp), status (SUCCEEDED, FAILED, and so on). You can query the run log RDS table to evaluate the running preconditions of all child pipelines and get all those that qualify for triggering.
  • A triggering event is broadcast in the central bus for each of those eligible children.
  • Those events get broadcast to all accounts through the export rules—including Account C, which is of interest in our case.
  • The default EventBridge bus on Account C receives the broadcast event.
  • The EventBridge rule gets triggered if the event content matches the expected payload of the rule (notably that both pipelines have a SUCCEEDED status).
  • If the payload is valid, the rule triggers the Step Functions workflow pipeline-X and triggers the workflow to provision resources (which we discuss later in this post).
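The central part of this flow (logging the run, then finding the children that qualify) can be sketched in memory as follows. This is a simplified illustration rather than the production Lambda function: the TriggerManager class and the Dependencies shape are hypothetical, an array stands in for the run log table, and the sketch compares only the latest status of each parent, ignoring which day the run belongs to.

```typescript
// One row of the run log, following the schema from the post: pipeline, time, status.
interface RunLogEntry {
  pipeline: string; // e.g. "sfn-listings"
  time: number;     // timestamp in epoch milliseconds
  status: string;   // RUNNING, SUCCEEDED, FAILED, TIMED_OUT, or ABORTED
}

// Hypothetical dependency registry: child pipeline -> required status per parent.
type Dependencies = Record<string, Record<string, string>>;

class TriggerManager {
  private runLog: RunLogEntry[] = [];

  constructor(private deps: Dependencies) {}

  // Most recent status recorded for a pipeline, or undefined if it never ran.
  private latestStatus(pipeline: string): string | undefined {
    let latest: RunLogEntry | undefined;
    for (const entry of this.runLog) {
      if (entry.pipeline === pipeline && (latest === undefined || entry.time >= latest.time)) {
        latest = entry;
      }
    }
    return latest ? latest.status : undefined;
  }

  // Records a status change and returns the children that now qualify for triggering.
  handle(entry: RunLogEntry): string[] {
    this.runLog.push(entry);
    return Object.keys(this.deps).filter((child) => {
      const parents = this.deps[child];
      const dependsOnCurrent = parents[entry.pipeline] !== undefined;
      return (
        dependsOnCurrent &&
        Object.keys(parents).every((p) => this.latestStatus(p) === parents[p])
      );
    });
  }
}

const manager = new TriggerManager({
  "pipeline-X": { "sfn-listings": "SUCCEEDED", "sfn-searches": "SUCCEEDED" },
});
const afterListings = manager.handle({ pipeline: "sfn-listings", time: 1, status: "SUCCEEDED" });
const afterSearches = manager.handle({ pipeline: "sfn-searches", time: 2, status: "SUCCEEDED" });
console.log(afterListings.length, afterSearches); // no children first, then pipeline-X
```

A real implementation would also need to scope the check to the current scheduling period, so that yesterday's SUCCEEDED status of one parent cannot satisfy today's precondition.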

Alerting

This section is illustrated with the gray squares in the architecture diagram.

Many teams handle alerting differently across the organization, such as Slack alerting messages, email alerts, and OpsGenie alerts.

We decided to allow users to choose their preferred methods of alerting, giving them the flexibility to choose what kind of alerts to receive:

  • At the step level – Tracking the entire run of the pipeline
  • At the pipeline level – When it fails, or when it finishes with a SUCCESS or FAILED status

During the deployment of the pipeline, a new Amazon Simple Notification Service (Amazon SNS) topic gets created with the subscriptions matching the targets specified by the user (URL for OpsGenie, Lambda for Slack or email).

The following code is an example of what it looks like in the user’s pipeline.yaml:

notifications:
    type: FULL_EXECUTION
    targets:
        - channel: SLACK
          addresses:
               - data-pipeline-alerts
        - channel: EMAIL
          addresses:
               - [email protected]

The alerting flow includes the following steps:

  1. As the pipeline (Step Functions workflow) starts (black square 2 in the diagram), the run gets logged into CloudWatch Logs in a log group corresponding to the name of the pipeline (for example, listings).
  2. Depending on the user preference, all the run steps or events may get logged or not thanks to a subscription filter whose target is the execution-tracker-lambda Lambda function. The function gets called anytime a new event gets published in CloudWatch.
  3. This Lambda function parses and formats the message, then publishes it to the SNS topic.
  4. For the email and OpsGenie flows, the flow stops here. For posting the alert message on Slack, the Slack API caller Lambda function gets called with the formatted event payload.
  5. The function then publishes the message to the /messages endpoint of the Slack API Gateway.
  6. The Lambda function behind this endpoint runs, and posts the message in the corresponding Slack channel and under the right Slack thread (if applicable).
  7. The function retrieves the secret Slack REST API key from AWS Secrets Manager.
  8. It retrieves the Slack channels in which the alert should be posted.
  9. It retrieves the root message of the run, if any, so that subsequent messages get posted under the current run thread on Slack.
  10. It posts the message on Slack.
  11. If this is the first message for this run, it stores the mapping with the DB schema execution/slack_message_id to initiate a thread for future messages related to the same run.
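The threading logic in the last steps can be sketched as follows. This is a minimal illustration under stated assumptions: the SlackAlerter class and the PostMessage signature are hypothetical, in-memory objects replace the DynamoDB tables, and the Slack call is injected as a function so that the sketch runs without network access. In the real Slack Web API, replying in a thread is done by passing the root message's timestamp as thread_ts to chat.postMessage.

```typescript
// Hypothetical Slack client: posts a message and returns the ID Slack assigns to it.
// When threadId is given, the message is posted as a reply in that thread.
type PostMessage = (channel: string, text: string, threadId: string | undefined) => string;

class SlackAlerter {
  // Mapping execution -> root slack_message_id, kept per channel (a table in the post).
  private rootMessages: Record<string, string> = {};

  constructor(
    private channelsByPipeline: Record<string, string[]>,
    private post: PostMessage
  ) {}

  alert(pipeline: string, executionId: string, text: string): void {
    const channels = this.channelsByPipeline[pipeline] || [];
    for (const channel of channels) {
      const key = executionId + "#" + channel;
      const root = this.rootMessages[key]; // undefined for the first message of a run
      const messageId = this.post(channel, text, root);
      if (root === undefined) {
        // First message of this run: remember it so later messages join its thread.
        this.rootMessages[key] = messageId;
      }
    }
  }
}

// Fake poster that records calls instead of contacting Slack.
const sent: Array<{ channel: string; text: string; threadId: string | undefined }> = [];
let nextId = 0;
const fakePost: PostMessage = (channel, text, threadId) => {
  sent.push({ channel: channel, text: text, threadId: threadId });
  nextId += 1;
  return "msg-" + nextId;
};

const alerter = new SlackAlerter({ listings: ["data-pipeline-alerts"] }, fakePost);
alerter.alert("listings", "run-1", "listings started");
alerter.alert("listings", "run-1", "listings SUCCEEDED");
console.log(sent[0].threadId, sent[1].threadId); // undefined msg-1
```

The first message of a run starts the thread, and every later message of the same run is posted under it.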

Resource provisioning

This section is illustrated with the light blue squares in the architecture diagram.

To run a data pipeline, we need to provision an EMR cluster, which in turn requires some information like Hive metastore credentials, as shown in the workflow. The workflow steps are as follows:

  • Trigger the Step Functions workflow listings on schedule.
  • Run the listings workflow.
  • Provision an EMR cluster.
  • Use a custom resource to decrypt the Hive metastore password to be used in Spark jobs relying on central Hive tables or views.

End-user experience

After all preconditions are fulfilled (both the listings and searches pipelines succeeded), the pipeline-X workflow runs as shown in the following diagram.

As shown in the diagram, the pipeline description (as a sequence of steps) defined by the user in the pipeline.yaml is represented by the orange block.

The steps before and after this orange section are automatically generated by our product, so users don’t have to take care of provisioning and freeing compute resources. In short, the CLI tool we provide our users synthesizes the user’s pipeline definition in the pipeline.yaml and generates the corresponding DAG.
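As a rough sketch of that synthesis, the following function wraps the user-defined steps with generated steps for provisioning and freeing compute resources. The step names, the PipelineStep type, and the linear ordering are assumptions for illustration; the actual product generates a Step Functions workflow through AWS CDK.

```typescript
interface PipelineStep {
  name: string;
  kind: "generated" | "user"; // who defined the step
}

// Wraps user steps with generated steps (the generated step names are hypothetical).
function synthesize(userSteps: string[]): PipelineStep[] {
  if (userSteps.length === 0) {
    throw new Error("a pipeline needs at least one step");
  }
  const generated = (name: string): PipelineStep => ({ name: name, kind: "generated" });
  const user = userSteps.map((name): PipelineStep => ({ name: name, kind: "user" }));
  return [generated("provision-emr-cluster")].concat(user, [
    generated("terminate-emr-cluster"),
  ]);
}

const pipelineXSteps = synthesize(["join-listings-and-searches", "compute-metric-x"]);
console.log(pipelineXSteps.map((s) => s.name).join(" -> "));
// provision-emr-cluster -> join-listings-and-searches -> compute-metric-x -> terminate-emr-cluster
```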

Additional considerations and next steps

We tried to stay consistent and stick to one programming language for the creation of this product. We chose TypeScript, which played well with AWS CDK, the infrastructure as code (IaC) framework that we used to build the infrastructure of the product.

Similarly, we chose TypeScript for building the business logic of our Lambda functions, and of the CLI tool (using Oclif) we provide for our users.

As demonstrated in this post, EventBridge is a powerful service for event-driven architectures, and it plays a central and important role in our products. As for its limitations, we found that pairing Lambda and EventBridge could fulfill all our current needs and granted a high level of customization that allowed us to be creative in the features we wanted to serve our users.

Needless to say, we plan to keep developing the product, and have a multitude of ideas, notably:

  • Extend the list of core resources on which workloads run (currently only Amazon EMR) by adding other compute services, such as Amazon Elastic Compute Cloud (Amazon EC2)
  • Use the Constructs Hub to allow users in the organization to develop custom steps to be used in all data pipelines (we currently only offer Spark and shell steps, which suffice in most cases)
  • Use the stored metadata regarding pipeline dependencies for data lineage, to have an overview of the overall health of the data pipelines in the organization, and more

Conclusion

This architecture and product brought many benefits. It allows us to:

  • Have a more robust and clear dependency management of data pipelines at Scout24.
  • Save on compute costs by no longer scheduling pipelines based on a rough estimate of when their predecessors usually finish. By shifting to an event-driven paradigm, no pipeline gets started unless all its prerequisites are fulfilled.
  • Track our pipelines granularly and in real time on a step level.
  • Provide more flexible and alternative business logic by exposing multiple event types that downstream pipelines can listen to. For example, a fallback downstream pipeline might be run in case of a parent pipeline failure.
  • Reduce the cross-team communication overhead in case of failures or stopped runs by increasing the transparency of the whole pipelines’ dependency landscape.
  • Avoid manually restarting pipelines after an upstream pipeline is fixed.
  • Have an overview of all jobs that run.
  • Support the creation of a performance culture characterized by accountability.

We have big plans for this product. We will use DataMario to implement granular data lineage, observability, and governance. It’s a key piece of infrastructure in our strategy to scale data engineering and analytics at Scout24.

We will make DataMario open source towards the end of 2022. This is in line with our strategy to promote our approach to a solution on a self-built, scalable data platform. And with our next steps, we hope to extend this list of benefits and ease the pain in other companies solving similar challenges.

Thank you for reading.


About the authors

Mehdi Bendriss is a Senior Data / Data Platform Engineer with an MSc in Computer Science and over 9 years of experience in software, ML, and data and data platform engineering, designing and building large-scale data and data platform products.

Mohamad Shaker is a Senior Data / Data Platform Engineer, with over 9 years of experience in software and data engineering, designing and building large-scale data and data platform products that enable users to access, explore, and utilize their data to build great data products.

Arvid Reiche is a Data Platform Leader, with over 9 years of experience in data, building a data platform that scales and serves the needs of the users.

Marco Salazar is a Solutions Architect working with Digital Native customers in the DACH region with over 5 years of experience building and delivering end-to-end, high-impact, cloud native solutions on AWS for Enterprise and Sports customers across EMEA. He currently focuses on enabling customers to define technology strategies on AWS for the short- and long-term that allow them to achieve their desired business objectives, specializing in Data and Analytics engagements. In his free time, Marco enjoys building side-projects involving mobile/web apps, microcontrollers & IoT, and most recently wearable technologies.

Your guide to AWS Analytics at re:Invent 2022

Post Syndicated from Imtiaz Sayed original https://aws.amazon.com/blogs/big-data/your-guide-to-aws-analytics-at-reinvent-2022/

Join the global cloud community at AWS re:Invent this year to meet, get inspired, and rethink what’s possible!

Reserved seating is available for registered attendees to secure seats in the sessions of their choice. You can reserve a seat in your favorite sessions by signing in to the attendee portal and navigating to Event Sessions. If you can’t make it in person, you can get a free online pass to watch live keynotes and leadership sessions by registering for virtual-only access. This curated attendee guide helps data and analytics enthusiasts manage their schedule*, as well as navigate the AWS analytics and business intelligence tracks to get the best out of re:Invent.

For additional session details, visit the AWS Analytics splash page.

#AWSanalytics, #awsfordata, #reinvent22

Keynotes

KEY002 | Adam Selipsky (CEO, Amazon Web Services) | Tuesday, November 29 | 8:30 AM – 10:30 AM

Join Adam Selipsky, CEO of Amazon Web Services, as he looks at the ways that forward-thinking builders are transforming industries and even our future, powered by AWS.

KEY003 | Swami Sivasubramanian (Vice President, AWS Data and Machine Learning) | Wednesday, November 30 | 8:30 AM – 10:30 AM

Join Swami Sivasubramanian, Vice President of AWS Data and Machine Learning, as he reveals the latest AWS innovations that can help you transform your company’s data into meaningful insights and actions for your business.

Leadership sessions

ANT203-L | Unlock the value of your data with AWS analytics | G2 Krishnamoorthy, VP of AWS Analytics | Wednesday, November 30 | 2:30 PM – 3:30 PM

G2 addresses the current state of analytics on AWS, covers the latest service innovations around data, and highlights customer successes with AWS analytics. Also, learn from organizations like FINRA and more who have turned to AWS for their digital transformation journey.

Breakout sessions

AWS re:Invent breakout sessions are lecture-style, one-hour sessions delivered by AWS experts, customers, and partners.

Monday, Nov 28

  • 10:00 AM – 11:00 AM | ANT326 | How BMW, Intuit, and Morningstar are transforming with AWS and Amazon Athena
  • 11:30 AM – 12:30 PM | ANT206 | What’s new in Amazon OpenSearch Service
  • 12:15 PM – 1:15 PM | ANT337 | Migrating to Amazon EMR to reduce costs and simplify operations
  • 1:00 PM – 2:00 PM | ANT341 | How Riot Games processes 20 TB of analytics data daily on AWS
  • 1:45 PM – 2:45 PM | BSI207 | Get clarity on your data in seconds with Amazon QuickSight Q
  • 3:15 PM – 4:15 PM | ANT302 | What’s new with Amazon EMR
  • 4:45 PM – 5:45 PM | ANT335 | How Disney Parks uses AWS Glue to replace thousands of Hadoop jobs
  • 5:30 PM – 6:30 PM | ANT220 | Using Amazon AppFlow to break down data silos for analytics and ML

Tuesday, Nov 29

  • 11:00 AM – 12:00 PM | ANT301 | Democratizing your organization’s data analytics experience
  • 12:15 PM – 1:15 PM | ANT334 | Simplify and accelerate data integration and ETL modernization with AWS Glue
  • 1:15 PM – 2:15 PM | ANT205 | Achieving your modern data architecture
  • 2:00 PM – 3:00 PM | BSI201 | Reinvent how you derive value from your data with Amazon QuickSight
  • 2:45 PM – 3:45 PM | ANT339 | How Samsung modernized architecture for real-time analytics
  • 3:30 PM – 4:30 PM | ANT204 | Enabling agility with data governance on AWS
  • 5:00 PM – 6:00 PM | ANT338 | Scaling data processing with Amazon EMR at the speed of market volatility
  • 5:45 PM – 6:45 PM | ANT325 | Simplify running Apache Spark and Hive apps with Amazon EMR Serverless

Wednesday, Nov 30

  • 10:00 AM – 11:00 AM | ANT212 | How JPMC and LexisNexis modernize analytics with Amazon Redshift
  • 10:00 AM – 11:00 AM | ANT209 | Build interactive analytics applications
  • 10:45 AM – 11:45 AM | ANT218 | Leveling up computer vision and artificial intelligence development
  • 11:30 AM – 12:30 PM | ANT340 | How Sony Orchard accelerated innovation with Amazon MSK
  • 1:00 PM – 2:00 PM | ANT201 | What’s new with Amazon Redshift
  • 2:30 PM – 3:30 PM | BSI202 | Migrate to cloud-native business analytics with Amazon QuickSight
  • 4:45 PM – 5:45 PM | ANT324 | Modernize your data warehouse
  • 5:30 PM – 6:30 PM | ANT317 | Self-service analytics with Amazon Redshift Serverless

Thursday, Dec 1

  • 12:30 PM – 1:30 PM | ANT207 | What’s new in AWS streaming
  • 12:30 PM – 1:30 PM | BSI203 | Differentiate your apps with Amazon QuickSight embedded analytics
  • 1:15 PM – 2:15 PM | ANT336 | Building data mesh architectures on AWS
  • 2:00 PM – 3:00 PM | ANT342 | How Poshmark accelerates growth via real-time analytics and personalization
  • 3:30 PM – 4:30 PM | ANT219 | Dow Jones and 3M: Observability with Amazon OpenSearch Service

Friday, Dec 2

  • 8:30 AM – 9:30 AM | ANT311 | Building security operations with Amazon OpenSearch Service

Chalk talks

Chalk talks are a one-hour, highly interactive content format with a small audience. Each begins with a short lecture delivered by an AWS expert, followed by a Q&A session with the audience.

Monday, Nov 28 Tuesday, Nov 29 Wednesday, Nov 30 Thursday, Dec 1 Friday, Dec 2

12:15 PM – 1:15 PM

ANT303 | Security and data access controls in Amazon EMR

11:00 AM – 12:00 PM

ANT318 [Repeat] | Build event-based microservices with AWS streaming services

9:15 AM – 10:15 AM

ANT320 [Repeat] | Get better price performance in cloud data warehousing with Amazon Redshift

11:45 AM – 12:45 PM

ANT329 | Turn data to insights in seconds with secure and reliable Amazon Redshift

9:15 AM – 10:15 AM

ANT314 [Repeat] | Why and how to migrate to Amazon OpenSearch Service

12:15 PM – 1:15 PM

BSI401 | Insightful dashboards through advanced calculations with QuickSight

11:45 AM – 12:45 PM

BSI302 | Deploy your BI assets at scale to thousands with Amazon QuickSight

10:45 AM – 11:45 AM

ANT330 [Repeat] | Run Apache Spark on Kubernetes with Amazon EMR on Amazon EKS

1:15 PM – 2:15 PM

ANT401 | Ingest machine-generated data at scale with Amazon OpenSearch Service

10:00 AM – 11:00 AM

ANT322 [Repeat] | Simplifying ETL migration and data integration with AWS Glue

1:00 PM – 2:00 PM

ANT323 [Repeat] | Break through data silos with Amazon Redshift

1:15 PM – 2:15 PM

ANT327 | Modernize your analytics architecture with Amazon Athena

12:15 PM – 1:15 PM

ANT323 [Repeat] | Break through data silos with Amazon Redshift

2:00 PM – 3:00 PM

ANT333 [Repeat] | Build a serverless data streaming workload with Amazon Kinesis

..

1:45 PM – 2:45 PM

ANT319 | Democratizing ML for data analysts

2:45 PM – 3:45 PM

ANT320 [Repeat] | Get better price performance in cloud data warehousing with Amazon Redshift

4:00 PM – 5:00 PM

ANT314 [Repeat] | Why and how to migrate to Amazon OpenSearch Service

2:00 PM – 3:00 PM

ANT330 [Repeat] | Run Apache Spark on Kubernetes with Amazon EMR on Amazon EKS

.

1:45 PM – 2:45 PM

ANT322 [Repeat] | Simplifying ETL migration and data integration with AWS Glue

2:45 PM – 3:45 PM

BSI301 | Architecting multi-tenancy for your apps with Amazon QuickSight

4:45 PM – 5:45 PM

ANT333 [Repeat] | Build a serverless data streaming workload with Amazon Kinesis

. .

5:30 PM – 6:30 PM

ANT315 | Optimizing Amazon OpenSearch Service domains for scale and cost

4:15 PM – 5:15 PM

ANT304 | Run serverless Spark workloads with AWS analytics

4:45 PM – 5:45 PM

ANT331 | Understanding TCO for different Amazon EMR deployment models

. .
.

5:00 PM – 6:00 PM

ANT328 | Build transactional data lakes using open-table formats in Amazon Athena

4:45 PM – 5:45 PM

ANT321 | What’s new in AWS Lake Formation

. .
. .

7:00 PM – 8:00 PM

ANT318 [Repeat] | Build event-based microservices with AWS streaming services

. .

Builders’ sessions

These are one-hour small-group sessions with up to nine attendees per table and one AWS expert. Each builders’ session begins with a short explanation or demonstration of what you’re going to build. Once the demonstration is complete, bring your laptop to experiment and build with the AWS expert.

Monday, Nov 28 Tuesday, Nov 29 Wednesday, Nov 30 Thursday, Dec 1 Friday, Dec 2
………………………….

11:00 AM – 12:00 PM

ANT402 | Human vs. machine: Amazon Redshift ML inferences

1:00 PM – 2:00 PM

ANT332 | Build a data pipeline using Apache Airflow and Amazon EMR Serverless

11:00 AM – 12:00 PM

ANT316 [Repeat] | How to build dashboards for machine-generated data

………………………
. .

7:00 PM – 8:00 PM

ANT316 [Repeat] | How to build dashboards for machine-generated data

. .

Workshops

Workshops are two-hour interactive sessions where you work in teams or individually to solve problems using AWS services. Each workshop starts with a short lecture, and the rest of the time is spent working the problem. Bring your laptop to build along with AWS experts.

Monday, Nov 28 Tuesday, Nov 29 Wednesday, Nov 30 Thursday, Dec 1 Friday, Dec 2

10:00 AM – 12:00 PM

ANT306 [Repeat] | Beyond monitoring: Observability with operational analytics

11:45 AM – 1:45 PM

ANT313 | Using Apache Spark for data science and ML workflows with Amazon EMR

8:30 AM – 10:30 AM

ANT307 | Improve search relevance with ML in Amazon OpenSearch Service

11:00 AM – 1:00 PM

ANT403 | Event detection with Amazon MSK and Amazon Kinesis Data Analytics

8:30 AM – 10:30 AM

ANT309 [Repeat] | Build analytics applications using Apache Spark with Amazon EMR Serverless

4:00 PM – 6:00 PM

ANT309 [Repeat] | Build analytics applications using Apache Spark with Amazon EMR Serverless

2:45 PM – 4:45 PM

ANT310 [Repeat] | Build a data mesh with AWS Lake Formation and AWS Glue

12:15 PM – 2:15 PM

ANT306 [Repeat] | Beyond monitoring: Observability with operational analytics

11:45 AM – 1:45 PM

BSI205 | Build stunning customized dashboards with Amazon QuickSight

.
. .

12:15 PM – 2:15 PM

ANT312 | Near real-time ML inferences with Amazon Redshift

2:45 PM – 4:45 PM

ANT308 | Seamless data sharing using Amazon

.
. .

5:30 PM – 7:30 PM

ANT310 [Repeat] | Build a data mesh with AWS Lake Formation and AWS Glue

. .
. .

5:30 PM – 7:30 PM

BSI303 | Seamlessly embed analytics into your apps with Amazon QuickSight

. .

* All schedules are in the Pacific time zone (PST).

AWS Analytics & Business Intelligence kiosks

Join us at the AWS Analytics Kiosk in the AWS Village at the Expo. Dive deep into AWS Analytics with AWS subject matter experts, see the latest demos, ask questions, or just drop by to socially connect with your peers.


About the author

Imtiaz (Taz) Sayed is the WW Tech Leader for Analytics at AWS. He enjoys engaging with the community on all things data and analytics. He can be reached via LinkedIn.

Retain more for less with tiered storage for Amazon MSK

Post Syndicated from Masudur Rahaman Sayem original https://aws.amazon.com/blogs/big-data/retain-more-for-less-with-tiered-storage-for-amazon-msk/

Organizations are adopting Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK) to capture and analyze data in real time. Amazon MSK allows you to build and run production applications on Apache Kafka without needing Kafka infrastructure management expertise or having to deal with the complex overheads associated with running Apache Kafka on your own. With increasing maturity, customers seek to build sophisticated use cases that combine aspects of real-time and batch processing. For instance, you may want to train machine learning (ML) models based on historic data and then use these models to do real-time inferencing. Or you may want to be able to recompute previous results when the application logic changes, for example, when a new KPI is added to a streaming analytics application or when a bug that caused incorrect output is fixed. These use cases often require storing data for several weeks, months, or even years.

Apache Kafka is well positioned to support these kinds of use cases. Data is retained in the Kafka cluster as long as required by configuring the retention policy. In this way, the most recent data can be processed in real time for low-latency use cases while historic data remains accessible in the cluster and can be processed in a batch fashion.

However, retaining data in a Kafka cluster can become expensive because storage and compute are tightly coupled in a cluster. To scale storage, you need to add more brokers. But adding more brokers with the sole purpose of increasing the storage squanders the rest of the compute resources like CPU and memory. Also, a large cluster with more nodes adds operational complexity with a longer time to recover and rebalance when a broker fails. To avoid that operational complexity and higher cost, you can move your data to Amazon Simple Storage Service (Amazon S3) for long-term access, and use the cost-effective storage classes in Amazon S3 to optimize your overall storage cost. This solves the cost challenge, but now you have to build and maintain the part of the architecture that moves data to a different data store. You also need to build different data processing logic using different APIs for consuming data (Kafka API for streaming, Amazon S3 API for historic reads).

Today, we’re announcing Amazon MSK tiered storage, which brings a virtually unlimited and low-cost storage tier for Amazon MSK, making it simpler and cost-effective for developers to build streaming data applications. Since the launch of Amazon MSK in 2019, we have enabled capabilities such as vertical scaling and automatic scaling of broker storage so you can operate your Kafka workloads in a cost-effective way. Earlier this year, we launched provisioned throughput which enables seamlessly scaling I/O without having to provision additional brokers. Tiered storage makes it even more cost-effective for you to run Kafka workloads. You can now store data in Apache Kafka without worrying about limits. You can effectively balance your performance and costs by using the performance-optimized primary storage for real-time data and the new low-cost tier for the historical data. With a few clicks, you can move streaming data into a lower-cost tier to store data and only pay for what you use.

Tiered storage frees you from making hard trade-offs between supporting the data retention needs of your application teams and the operational complexity that comes with it. This enables you to use the same code to process both real-time and historical data to minimize redundant workflows and simplify architectures. With Amazon MSK tiered storage, you can implement a Kappa architecture – a streaming-first software architecture deployment pattern – to use the same data processing pipeline for correctness and completeness of data over a much longer time horizon for business analysis.

How Amazon MSK tiered storage works

Let’s look at how tiered storage works for Amazon MSK. Apache Kafka stores data in files called log segments. As each segment completes, based on the segment size configured at cluster or topic level, it’s copied to the low-cost storage tier. Data is held in performance-optimized storage for a specified retention time, or up to a specified size, and then deleted. There is a separate time and size limit setting for the low-cost storage, which must be longer than the performance-optimized storage tier. If clients request data from segments stored in the low-cost tier, the broker reads the data from it and serves the data in the same way as if it were being served from the performance-optimized storage. The APIs and existing clients work with minimal changes. When your application starts reading data from the low-cost tier, you can expect an increase in read latency for the first few bytes. As you start reading the remaining data sequentially from the low-cost tier, you can expect latencies that are similar to the primary storage tier. With tiered storage, you pay for the amount of data you store and the amount of data you retrieve.

For a pricing example, consider a workload with an ingestion rate of 15 MB/s and a replication factor of 3, where you want to retain data in your Kafka cluster for 7 days. Without tiered storage, this workload requires 6x m5.large brokers with 32.4 TB of EBS storage, which costs $4,755. With tiered storage, local retention of 4 hours, and overall data retention of 7 days, the same workload requires 3x m5.large brokers with 0.8 TB of EBS storage and 9 TB of tiered storage, which costs $1,584. If you want to read all the historic data at once, it costs $13 ($0.0015 per GB retrieval cost). In this example, tiered storage saves around 66% of your overall cost.
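The storage volumes in this example can be roughly reproduced from the ingestion rate. The following is a back-of-the-envelope sketch, not the pricing calculator: it assumes decimal units (1 TB = 1,000,000 MB) and a constant ingest rate, and the helper name written_tb is made up for illustration. The dollar amounts are the ones quoted above.

```python
# Figures quoted in the pricing example above.
INGEST_MB_PER_S = 15
REPLICATION_FACTOR = 3
COST_WITHOUT_TIERING_USD = 4755
COST_WITH_TIERING_USD = 1584
TIERED_STORAGE_TB = 9
RETRIEVAL_USD_PER_GB = 0.0015


def written_tb(ingest_mb_per_s, seconds, copies=1):
    """Decimal terabytes written in `seconds` at a constant ingest rate."""
    return ingest_mb_per_s * seconds * copies / 1_000_000


seven_days = 7 * 24 * 3600
four_hours = 4 * 3600

# One copy of 7 days of data (assumption: the low-cost tier holds one copy).
tiered_tb = written_tb(INGEST_MB_PER_S, seven_days)
# Three replicas on broker storage for 7 days and for 4 hours of retention.
ebs_7d_tb = written_tb(INGEST_MB_PER_S, seven_days, REPLICATION_FACTOR)
ebs_4h_tb = written_tb(INGEST_MB_PER_S, four_hours, REPLICATION_FACTOR)

# Reading the whole tiered volume once, and the relative saving.
full_read_usd = TIERED_STORAGE_TB * 1000 * RETRIEVAL_USD_PER_GB
saving = 1 - COST_WITH_TIERING_USD / COST_WITHOUT_TIERING_USD

print(f"tiered: {tiered_tb:.2f} TB, EBS 7d: {ebs_7d_tb:.1f} TB, EBS 4h: {ebs_4h_tb:.2f} TB")
print(f"full read: ${full_read_usd:.2f}, saving: {saving:.1%}")
```

The raw volumes come out at about 9.1 TB for a single copy, 27.2 TB for three replicas over 7 days, and 0.65 TB for three replicas over 4 hours. The quoted 32.4 TB and 0.8 TB of EBS storage are larger, presumably because they include headroom; that reading is an assumption, as the post doesn't state the sizing rule.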

Get started using Amazon MSK tiered storage

To enable tiered storage on your existing cluster, upgrade your MSK cluster to Kafka version 2.8.2.tiered and then choose Tiered storage and EBS storage as your cluster storage mode on the Amazon MSK console.

After tiered storage is enabled on the cluster level, run the following command to enable tiered storage on an existing topic. In this example, you’re enabling tiered storage on a topic called msk-ts-topic with 7 days’ retention (local.retention.ms=604800000) for the local high-performance storage tier, setting 180 days’ retention (retention.ms=15552000000) to retain the data in the low-cost storage tier, and updating the log segment size to 48 MB:

bin/kafka-configs.sh --bootstrap-server $bsrv --alter --entity-type topics --entity-name msk-ts-topic --add-config 'remote.storage.enable=true, local.retention.ms=604800000, retention.ms=15552000000, segment.bytes=50331648'
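The millisecond and byte values passed to --add-config are easy to mistype, so it can help to derive them. The following sketch builds the config string from durations and a segment size in MiB; the function names to_ms and tiered_topic_config are illustrative and not part of any Kafka tooling. Note that 180 days is exactly 15552000000 ms.

```python
from datetime import timedelta


def to_ms(delta: timedelta) -> int:
    """Convert a duration to whole milliseconds."""
    return delta // timedelta(milliseconds=1)


def tiered_topic_config(local_retention, total_retention, segment_mib):
    """Build the value for kafka-configs.sh --add-config.

    total_retention (retention.ms) must be longer than
    local_retention (local.retention.ms).
    """
    if total_retention <= local_retention:
        raise ValueError("total retention must exceed local retention")
    settings = {
        "remote.storage.enable": "true",
        "local.retention.ms": to_ms(local_retention),
        "retention.ms": to_ms(total_retention),
        "segment.bytes": segment_mib * 1024 * 1024,
    }
    return ",".join(f"{key}={value}" for key, value in settings.items())


print(tiered_topic_config(timedelta(days=7), timedelta(days=180), 48))
```

The check on the two retention values mirrors the rule described earlier: the limit for the low-cost tier has to be longer than the limit for the performance-optimized tier.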

Availability and pricing

Amazon MSK tiered storage is available in all AWS Regions where Amazon MSK is available, excluding the AWS China and AWS GovCloud Regions. This low-cost storage tier scales to virtually unlimited storage and requires no upfront provisioning. You pay only for the volume of data retained and retrieved in the low-cost tier.

For more information about this feature and its pricing, see the Amazon MSK developer guide and Amazon MSK pricing page. For finding the right sizing for your cluster, see the best practices page.

Summary

With Amazon MSK tiered storage, you don’t need to provision storage for the low-cost tier or manage the infrastructure. Tiered storage enables you to scale to virtually unlimited storage. You can access data in the low-cost tier using the same clients you currently use to read data from the high-performance primary storage tier. Apache Kafka’s consumer API, streams API, and connectors consume data from both tiers without changes. You can modify the retention limits on the low-cost storage tier in the same way that you modify the retention limits on the high-performance storage.

Enable tiered storage on your MSK clusters today to retain data longer at a lower cost.


About the Author

Masudur Rahaman Sayem is a Streaming Architect at AWS. He works with AWS customers globally to design and build data streaming architecture to solve real-world business problems. He is passionate about distributed systems. He also likes to read, especially classic comic books.

Measure the adoption of your Amazon QuickSight dashboards and view your BI portfolio in a single pane of glass

Post Syndicated from Maitri Brahmbhatt original https://aws.amazon.com/blogs/big-data/measure-the-adoption-of-your-amazon-quicksight-dashboards-and-view-your-bi-portfolio-in-a-single-pane-of-glass/

Amazon QuickSight is a fully managed, cloud-native business intelligence (BI) service. If you plan to deploy enterprise-grade QuickSight dashboards, measuring user adoption and usage patterns is an important ingredient for the success of your BI investment. For example, knowing the usage patterns like geo location, department, and job role can help you fine-tune your dashboards to the right audience. Furthermore, to improve the return on investment of your BI portfolio, you can use dashboard usage data to reduce license costs by identifying inactive QuickSight authors.

In this post, we introduce the latest Admin Console, an AWS packaged solution that you can easily deploy and use to create a usage and inventory dashboard for your QuickSight assets. The Admin Console helps identify usage patterns of individual users and dashboards. It can also help you track which dashboards and groups you have or need access to, and what you can do with that access, by providing more details on QuickSight group and user permissions and activities and QuickSight asset (dashboards, analyses, and datasets) permissions. With timely access to interactive usage metrics, the Admin Console can help BI leaders and administrators make a cost-efficient plan for dashboard improvements. Another common use case of this dashboard is to provide a centralized repository of the QuickSight assets. QuickSight artifacts consist of multiple types of assets (dashboards, analyses, datasets, and more) with dependencies between them. Having a single repository to view all assets and their dependencies can be an important element in your enterprise data dictionary.

This post demonstrates how to build the Admin Console using a serverless data pipeline. With basic AWS knowledge, you can create this solution in your own environment within an hour. Alternatively, you can dive deep into the source code to meet your specific needs.

Admin Console dashboard

The following animation displays the contents of our demo dashboard.

The Admin Console dashboard includes six sheets:

  • Landing Page – Provides drill-down into each detailed tab.
  • User Analysis – Provides detailed analysis of the user behavior and identifies active and inactive users and authors.
  • Dashboard Analysis – Shows the most commonly viewed dashboards.
  • Assets Access Permissions – Provides information on permissions applied to each asset, such as dashboard, analysis, datasets, data source, and themes.
  • Data Dictionary – Provides information on the relationships between each of your assets, such as which analysis was used to build each dashboard, and which datasets and data sources are being used in each analysis. It also provides details on each dataset, including schema name, table name, columns, and more.
  • Overview – Provides instructions on how to use the dashboard.

You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

Let’s look at Forwood Safety, an innovative, values-driven company with a laser focus on fatality prevention. An early adopter of QuickSight, they collaborated with AWS to deploy this solution to collect BI application usage insights.

“Our engineers love this admin console solution,” says Faye Crompton, Leader of Analytics and Benchmarking at Forwood. “It helps us to understand how users analyze critical control learnings by helping us to quickly identify the most frequently visited dashboards in Forwood’s self-service analytics and reporting tool, FAST.”

Solution overview

The following diagram illustrates the workflow of the solution.

The workflow involves the following steps:

  1. The AWS Lambda function Data_Prepare is scheduled to run hourly. This function calls QuickSight APIs to get the QuickSight namespace, group, user, and asset access permissions information.
  2. The Lambda function Dataset_Info is scheduled to run hourly. This function calls QuickSight APIs to get dashboard, analysis, dataset, and data source information.
  3. Both the functions save the results to an Amazon Simple Storage Service (Amazon S3) bucket.
  4. AWS CloudTrail logs are stored in an S3 bucket.
  5. Based on the file in Amazon S3 that contains user-group information, dataset information, QuickSight assets access permissions information, as well as dashboard views and user login events from the CloudTrail logs, five Amazon Athena tables are created. Optionally, the BI engineer can combine these tables with employee information tables to display human resource information of the users.
  6. Four QuickSight datasets fetch the data from the Athena tables created in Step 5 and import them into SPICE. Then, based on these datasets, a QuickSight dashboard is created.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Create solution resources

We can create all the resources needed for this dashboard using three CloudFormation templates: one for Lambda functions, one for Athena tables, and one for QuickSight objects.

CloudFormation template for Lambda functions

This template creates the Lambda functions data_prepare and dataset_info.

  • Choose Launch Stack and follow the steps to create these resources.

After the stack creation is successful, you have two Lambda functions, data_prepare and dataset_info, and one S3 bucket named admin-console[AWS-account-ID]. You can verify if the Lambda function can run successfully and if the group_membership, object_access, datasets_info, and data_dictionary folders are created in the S3 bucket under admin-console[AWS-account-ID]/monitoring/quicksight/, as shown in the following screenshots.

The Data_Prepare Lambda function is scheduled to run hourly with the CloudWatch Events rule admin-console-every-hour. This function calls the QuickSight Assets APIs to get QuickSight users, assets, and the access permissions information. Finally, this function creates two files, group_membership.csv and object_access.csv, and saves these files to an S3 bucket.

The Dataset_Info Lambda function is scheduled to run hourly and calls the QuickSight Assets APIs to get datasets, schemas, tables, and fields (columns) information. Then this function creates two files, datasets_info.csv and data_dictionary.csv, and saves these files to an S3 bucket.
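The post doesn't include the Lambda source, so the following is only a sketch of how a function like Data_Prepare could assemble group_membership.csv. The boto3 QuickSight operations list_namespaces, list_groups, and list_group_memberships exist with the parameters shown; the column layout (namespace, group, user) and the helper names list_all and group_membership_csv are assumptions. The client is passed in so the logic can be tried without AWS credentials.

```python
import csv
import io


def list_all(call, result_key, **kwargs):
    """Collect every item from a paginated QuickSight list_* call."""
    items = []
    while True:
        page = call(**kwargs)
        items.extend(page.get(result_key, []))
        token = page.get("NextToken")
        if not token:
            return items
        kwargs["NextToken"] = token


def group_membership_csv(qs, account_id):
    """Return CSV text with one row per (namespace, group, user).

    `qs` is a QuickSight client, e.g. boto3.client("quicksight").
    The three-column layout is an assumption for illustration.
    """
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    namespaces = list_all(qs.list_namespaces, "Namespaces", AwsAccountId=account_id)
    for namespace in (ns["Name"] for ns in namespaces):
        groups = list_all(
            qs.list_groups, "GroupList",
            AwsAccountId=account_id, Namespace=namespace,
        )
        for group in (g["GroupName"] for g in groups):
            members = list_all(
                qs.list_group_memberships, "GroupMemberList",
                GroupName=group, AwsAccountId=account_id, Namespace=namespace,
            )
            for member in members:
                writer.writerow([namespace, group, member["MemberName"]])
    return buffer.getvalue()
```

In a Lambda function, the returned text could then be written to the bucket with the S3 client's put_object call, under the monitoring/quicksight/group_membership/ prefix mentioned earlier.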

  •  Create a CloudTrail log if you don’t already have one and note down the S3 bucket name of the log files for future use.
  •  Note down all the resources created from the previous steps. If the S3 bucket name for the CloudTrail log from step 2 is different from the one in step 1’s output, use the S3 bucket from step 2.

The following table summarizes the keys and values you use when creating the Athena tables with the next CloudFormation stack.

Key | Value | Description
cloudtraillog | s3://cloudtrail-awslogs-[aws-account-id]-do-not-delete/AWSLogs/[aws-account-id]/CloudTrail | The Amazon S3 location of the CloudTrail log
cloudtraillogtablename | cloudtrail_logs | The table name of the CloudTrail log
groupmembership | s3://admin-console[aws-account-id]/monitoring/quicksight/group_membership | The Amazon S3 location of group_membership.csv
objectaccess | s3://admin-console[aws-account-id]/monitoring/quicksight/object_access | The Amazon S3 location of object_access.csv
dataset info | s3://admin-console[aws-account-id]/monitoring/quicksight/datasets_info | The Amazon S3 location of datasets_info.csv
datadict | s3://admin-console[aws-account-id]/monitoring/quicksight/data_dictionary | The Amazon S3 location of data_dictionary.csv

CloudFormation template for Athena tables

To create your Athena tables, complete the following steps:

  • Download the following JSON file.
  • Edit the file and replace the corresponding fields with the keys and values you noted in the previous section.

For example, search for the groupmembership keyword.

Then replace the location value with the Amazon S3 location for the groupmembership folder.

  • Create Athena tables by deploying this edited file as a CloudFormation template. For instructions, refer to Get started.

After a successful deployment, you have a database called admin-console created in AwsDataCatalog in Athena and five tables in the database: cloudtrail_logs, group_membership, object_access, datasets_info, and data_dict.

  • Confirm the tables via the Athena console.

The following screenshot shows sample data of the group_membership table.

The following screenshot shows sample data of the object_access table.

For instructions on building an Athena table with CloudTrail events, see Amazon QuickSight Now Supports Audit Logging with AWS CloudTrail. For this post, we create the table cloudtrail_logs in the default database.

  • After all five tables are created in Athena, go to the security permissions on the QuickSight console to enable bucket access for s3://admin-console[AWS-account-ID] and s3://cloudtrail-awslogs-[aws-account-id]-do-not-delete.
  • Enable Athena access under Security & Permissions.

Now QuickSight can access all five tables through Athena.

CloudFormation template for QuickSight objects

To create the QuickSight objects, complete the following steps:

  • Get the QuickSight admin user’s ARN by running the following command in the AWS Command Line Interface (AWS CLI):
    aws quicksight describe-user --aws-account-id [aws-account-id] --namespace default --user-name [admin-user-name]

    For example: arn:aws:quicksight:us-east-1:12345678910:user/default/admin/xyz.

  • Choose Launch Stack to create the QuickSight datasets and dashboard:

  • Provide the ARN you noted earlier.

After a successful deployment, four datasets named Admin-Console-Group-Membership, Admin-Console-dataset-info, Admin-Console-Object-Access, and Admin-Console-CFN-Main are created and you have the dashboard named admin-console-dashboard. If modifying the dashboard is preferred, use the dashboard save-as option, then recreate the analysis, make modifications, and publish a new dashboard.

  • Set your preferred SPICE refresh schedule for the four SPICE datasets, and share the dashboard in your organization as needed.
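If you script the deployment, the describe-user lookup from the first step can also be done with boto3. This is a minimal sketch: describe_user and its AwsAccountId, Namespace, and UserName parameters are part of the boto3 QuickSight client, while the wrapper name quicksight_user_arn is made up here. The client is passed in as an argument.

```python
def quicksight_user_arn(qs, account_id, user_name, namespace="default"):
    """Return the ARN of a QuickSight user.

    `qs` is a QuickSight client, e.g. boto3.client("quicksight").
    """
    response = qs.describe_user(
        AwsAccountId=account_id,
        Namespace=namespace,
        UserName=user_name,
    )
    return response["User"]["Arn"]
```

For example, quicksight_user_arn(boto3.client("quicksight"), "[aws-account-id]", "[admin-user-name]") returns the value to provide to the CloudFormation stack.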

Dashboard demo

The following screenshot shows the Admin Console Landing page.

The following screenshot shows the User Analysis sheet.

The following screenshot shows the Dashboards Analysis sheet.

The following screenshot shows the Access Permissions sheet.

The following screenshot shows the Data Dictionary sheet.

The following screenshot shows the Overview sheet.

You can interactively play with the sample dashboard in the following Interactive Dashboard Demo.

You can reference the public template of the preceding dashboard in create-template, create-analysis, and create-dashboard API calls to create this dashboard and analysis in your account. The public template of this dashboard with the template ARN is 'TemplateArn': 'arn:aws:quicksight:us-east-1:889399602426:template/admin-console'.
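As a sketch of what such a call could look like, the following function builds the keyword arguments for the boto3 QuickSight create_dashboard operation with the public template as the source entity. The SourceEntity, SourceTemplate, and DataSetReferences structure follows the QuickSight API; the dataset placeholder names are not given in this post, so the mapping passed in is hypothetical and should be taken from the template itself (for example, with describe-template). The function name dashboard_request is also illustrative.

```python
TEMPLATE_ARN = "arn:aws:quicksight:us-east-1:889399602426:template/admin-console"


def dashboard_request(account_id, dashboard_id, name, dataset_arns,
                      template_arn=TEMPLATE_ARN):
    """Build keyword arguments for QuickSight create_dashboard.

    dataset_arns maps each placeholder name defined in the template
    to the ARN of a dataset in your account (names are assumptions).
    """
    references = [
        {"DataSetPlaceholder": placeholder, "DataSetArn": arn}
        for placeholder, arn in sorted(dataset_arns.items())
    ]
    return {
        "AwsAccountId": account_id,
        "DashboardId": dashboard_id,
        "Name": name,
        "SourceEntity": {
            "SourceTemplate": {
                "DataSetReferences": references,
                "Arn": template_arn,
            }
        },
    }
```

The result can be passed on as boto3.client("quicksight").create_dashboard(**request). Building the request separately keeps the placeholder mapping easy to review before anything is created.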

Tips and tricks

Here are some advanced tips and tricks for building an Admin Console dashboard to analyze usage metrics. The following steps are based on the dataset admin_console. You can apply the same logic to create the calculated fields to analyze user login activities.

  • Create parameters – For example, we can create a parameter called InActivityMonths, as in the following screenshot. Similarly, we can create other parameters such as InActivityDays, Start Date, and End Date.

  • Create controls based on the parameters – In the following screenshot, we create controls based on the start and end date.

  • Create calculated fields – For instance, we can create a calculated field to detect the active or inactive status of QuickSight authors. If the time span between the latest view dashboard activity and now is greater than or equal to the number defined in the Inactivity Months control, the author status is Inactive. The following screenshot shows the relevant code. According to the end-user’s requirements, we can define several calculated fields to perform the analysis.

  • Create visuals – For example, we create an insight to display the top three dashboard views by reader and a visual to display the authors of these dashboards.

  • Add URL actions – You can add a URL action to define some extra features to email inactive authors or check details of users.

The following sample code defines the action to email inactive authors:

mailto:<<email>>?subject=Alert to inactive author! &body=Hi, <<username>>, any author without activity for more than a month will be deleted. Please log in to your QuickSight account to continue accessing and building analyses and dashboards!
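The inactivity rule behind the calculated field and the mailto action can be prototyped outside QuickSight. The following Python sketch is not the QuickSight calculated-field expression; it assumes "months" means whole calendar months between the latest dashboard view and today, and the function names are made up for illustration. The mailto helper percent-encodes the subject and body so that spaces and punctuation survive in the link.

```python
from datetime import date
from urllib.parse import quote


def months_between(earlier: date, later: date) -> int:
    """Whole calendar months elapsed from `earlier` to `later`."""
    months = (later.year - earlier.year) * 12 + (later.month - earlier.month)
    if later.day < earlier.day:
        months -= 1  # the last month is not complete yet
    return max(months, 0)


def author_status(last_view: date, today: date, inactivity_months: int) -> str:
    """Inactive when the gap reaches the Inactivity Months threshold."""
    if months_between(last_view, today) >= inactivity_months:
        return "Inactive"
    return "Active"


def mailto_link(email: str, username: str) -> str:
    """Build a mailto URL like the URL action above, percent-encoded."""
    subject = "Alert to inactive author!"
    body = (
        f"Hi, {username}, any author without activity for more than a month "
        "will be deleted. Please log in to your QuickSight account to "
        "continue accessing and building analyses and dashboards!"
    )
    return f"mailto:{email}?subject={quote(subject)}&body={quote(body)}"
```

With a threshold of 2 months, an author whose last view was on January 15 is still Active on March 14 and becomes Inactive on March 15.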

Clean up

To avoid incurring future charges, delete all the resources you created with the CloudFormation templates.

Conclusion

This post discussed how BI administrators can use QuickSight, CloudTrail, and other AWS services to create a centralized view to analyze QuickSight usage metrics. We also presented a serverless data pipeline to support the Admin Console dashboard.

If you would like to have a demo, please email us.

Appendix

We can perform some additional sophisticated analysis to collect advanced usage metrics. For example, Forwood Safety raised a unique request to analyze the readers who log in but don’t view any dashboards (see the following code). This helps their clients identify and prevent wasted reader session fees. Leadership teams value the ability to minimize uneconomical user activity.

CREATE OR REPLACE VIEW "loginwithoutviewdashboard" AS
with login as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventname = 'AssumeRoleWithSAML'
GROUP BY  1,2,3),
dashboard as
(SELECT COALESCE("useridentity"."username", "split_part"("useridentity"."arn", '/', 3)) AS "user_name", awsregion,
date_parse(eventtime, '%Y-%m-%dT%H:%i:%sZ') AS event_time
FROM cloudtrail_logs
WHERE
eventsource = 'quicksight.amazonaws.com'
AND
eventname = 'GetDashboard'
GROUP BY  1,2,3),
users as
(select "namespace",
"group",
"user" as user_name,
(case
when "group" in ('quicksight-fed-bi-developer', 'quicksight-fed-bi-admin')
then 'Author'
else 'Reader'
end)
as author_status
from "group_membership" )
-- Keep reader logins with no GetDashboard event in the 30 minutes after the login.
select l.*
from login as l
join users as u
on l.user_name=u.user_name
left join dashboard as d
on l.user_name=d.user_name
and l.awsregion=d.awsregion
and d.event_time>=l.event_time
and d.event_time<(l.event_time + interval '30' minute)
where d.user_name is null
and u.author_status='Reader'
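The intent of this view, finding reader logins that are not followed by a dashboard view, can be checked on a handful of sample events before running it against CloudTrail data. The following Python sketch assumes a 30-minute window after each login, taken from the interval used in the query; the function name and the tuple layout (user_name, awsregion, event_time) are illustrative.

```python
from datetime import datetime, timedelta

SESSION_WINDOW = timedelta(minutes=30)


def logins_without_views(logins, views, window=SESSION_WINDOW):
    """Return logins with no dashboard view in the window after the login.

    Both inputs are sequences of (user_name, awsregion, event_time).
    """
    unmatched = []
    for user, region, login_time in logins:
        viewed = any(
            view_user == user
            and view_region == region
            and login_time <= view_time < login_time + window
            for view_user, view_region, view_time in views
        )
        if not viewed:
            unmatched.append((user, region, login_time))
    return unmatched


logins = [
    ("alice", "us-east-1", datetime(2022, 11, 1, 9, 0)),
    ("bob", "us-east-1", datetime(2022, 11, 1, 9, 0)),
]
views = [("alice", "us-east-1", datetime(2022, 11, 1, 9, 10))]
print(logins_without_views(logins, views))
```

Here only the login by bob is reported, because alice viewed a dashboard 10 minutes after logging in.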

About the Authors

Ying Wang is a Manager of Software Development Engineer. She has 12 years of expertise in data analytics and science. She assisted customers with enterprise data architecture solutions to scale their data analytics in the cloud during her time as a data architect. Currently, she helps customers unlock the power of data with QuickSight by delivering new features from the engineering side.

Ian Liao is a Senior Data Visualization Architect at AWS Professional Services. Before AWS, Ian spent years building startups in data and analytics. Now he enjoys helping customers scale their data applications on the cloud.

Maitri Brahmbhatt is a Business Intelligence Engineer at AWS. She helps customers and partners leverage their data to gain insights into their business and make data driven decisions by developing QuickSight dashboards.

Simplify data analysis and collaboration with SQL Notebooks in Amazon Redshift Query Editor V2.0

Post Syndicated from Ranjan Burman original https://aws.amazon.com/blogs/big-data/simplify-data-analysis-and-collaboration-with-sql-notebooks-in-amazon-redshift-query-editor-v2-0/

Amazon Redshift Query Editor V2.0 is a web-based analyst workbench that you can use to author and run queries on your Amazon Redshift data warehouse. You can visualize query results with charts, and explore, share, and collaborate on data with your teams in SQL through a common interface.

With SQL Notebooks, Amazon Redshift Query Editor V2.0 simplifies organizing, documenting, and sharing of data analysis with SQL queries. The notebook interface enables users such as data analysts, data scientists, and data engineers to author SQL code more easily, organizing multiple SQL queries and annotations on a single document. You can also collaborate with your team members by sharing notebooks. With SQL Notebooks, you can visualize the query results using charts. SQL Notebooks support provides an alternative way to embed all queries required for a complete data analysis in a single document using SQL cells. Query Editor V2.0 simplifies development of SQL notebooks with query versioning and export/import features. You can use the built-in version history feature to track changes in your SQL and markdown cells. With the export/import feature, you can easily move your notebooks from development to production accounts or share with team members cross-Region and cross-account.

In this post, we demonstrate how to use SQL Notebooks using Query Editor V2.0 and walk you through some of the new features.

Use cases for SQL Notebooks

Customers want to use SQL notebooks when they want reusable SQL code with multiple SQL statements and annotations or documentation. For example:

  • A data analyst might have several SQL queries that create temporary tables and run in sequence to derive insights. They might also perform visual analysis of the results.
  • A data scientist might create a notebook that creates some training data, creates a model, tests the model, and runs sample predictions.
  • A data engineer might have a script to create schema and tables, load sample data, and run test queries.

Solution overview

For this post, we use the Global Database of Events, Language, and Tone (GDELT) dataset, which monitors news across the world, and the data is stored for every second of every day. This information is freely available as part of the Registry of Open Data on AWS.

For our use case, a data scientist wants to perform unsupervised learning with Amazon Redshift ML by creating a machine learning (ML) model, and then generate insights from the dataset, create multiple versions of the notebook, visualize using charts, and share the notebook with other team members.

Prerequisites

To use the SQL Notebooks feature, you must add a policy for SQL Notebooks to a principal—an AWS Identity and Access Management (IAM) user or role—that already has one of the Query Editor V2.0 managed policies. For more information, see Accessing the query editor V2.0.

Import the sample notebook

To import the sample SQL notebook in Query Editor V2.0, complete the following steps:

  1. Download the sample SQL notebook.
  2. On the Amazon Redshift console, choose Query Editor V2 in the navigation pane. Query Editor V2.0 opens in a new browser tab.
  3. To connect to a database, choose the cluster or workgroup name.
  4. If prompted, enter your connection parameters. For more information about different authentication methods, refer to Connecting to an Amazon Redshift database.
  5. When you’re connected to the database, choose Notebooks in the navigation pane.
  6. Choose Import to use the SQL notebook downloaded in the first step.
    After the notebook is imported successfully, it will be available under My notebooks.
  7. To open the notebook, right-click on the notebook and choose Open notebook, or double-click on the notebook.

Perform data analysis

Let’s explore how you can run different queries from the SQL notebook cells for your data analysis.

  1. Let’s start by creating the table.
  2. Next, we load data into the table using the COPY command. Before running the COPY command in the notebook, you need to have a default IAM role attached to your Amazon Redshift cluster, or replace the default keyword with the ARN of an IAM role attached to the Amazon Redshift cluster:
    COPY gdelt_data FROM 's3://gdelt-open-data/events/1979.csv'
    region 'us-east-1' iam_role 'arn:aws:iam::<account-id>:role/<role-name>' csv delimiter '\t';

    For more information, refer to Creating an IAM role as default in Amazon Redshift.

    Before we create the ML model, let’s examine the training data.

  3. Before you run the cell to create the ML model, replace the <your-amazon-s3-bucket-name> with the S3 bucket of your account to store intermediate results.
  4. Create the ML model.
  5. To check the status of the model, run the notebook cell Show status of the model. The model is ready when the Model State key value is READY.
  6. Let’s identify the clusters associated with each GlobalEventId.
  7. Let’s get insights into the data points assigned to one of the clusters.

In the preceding screenshot, we can observe the data points assigned to the clusters. We see clusters of events corresponding to interactions between the US and China (probably due to the establishment of diplomatic relations), between the US and RUS (probably corresponding to the SALT II Treaty), and those involving Iran (probably corresponding to the Iranian Revolution).

To add text and format the appearance to provide context and additional information for your data analysis tasks, you can add a markdown cell. For example, in our sample notebook, we have provided a description about the query in the markdown cells to make it simpler to understand. For more information on markdown cells, refer to Markdown Cells.

To run all the queries in the SQL notebook at once, choose Run all.

Add new SQL and markdown cells

To add new SQL queries or markdown cells, complete the following steps:

  1. After you open the SQL notebook, hover over the cell and choose Insert SQL to add a SQL cell or Insert markdown to add a markdown cell.
  2. The new cell is added before the cell you selected.
  3. You can also move the new cell after a specific cell by choosing the up or down icon.

Visualize notebook results using charts

Now that you can run the SQL notebook cell and get the results, you can display a graphic visualization of the results by using the chart option in Query Editor V2.0.

Let’s run the following query to get more insights into the data points assigned to one of the clusters and visualize the results using charts.

To visualize the query results, configure a chart on the Results tab. Choose actor2name for the X-axis and totalarticles for the Y-axis dropdown. By default, the graph type is a bar chart.

Charts can be plotted in every cell, and each cell can have multiple result tables, but only one of them can have a chart. For more information about working with charts in Query Editor V2.0, refer to Visualizing query results.

Versioning in SQL Notebooks

Version control enables easier collaboration with your peers and reduces the risks of any mistakes. You can create multiple versions of the same SQL notebook by using the Save version option in Query Editor V2.0.

  1. In the navigation pane, choose Notebooks.
  2. Choose the SQL notebook that you want to open.
  3. Choose the options menu (three dots) and choose Save version.

    SQL Notebooks creates the new version and displays a message that the version has been created successfully.

    Now we can view the version history of the notebook.
  4. Choose the SQL notebook for which you created the version (right-click) and choose Version history.

    You can see a list of all the versions of the SQL notebook.
  5. To revert to a specific version of the notebook, choose the version you want and choose Revert to version.
  6. To create a new notebook from a version, choose the version you want and choose Create a new notebook from the version.

Duplicate the SQL notebook

While working with your peers, you might need to share your notebook but still continue making changes to it. To avoid any impact on the shared version, you can duplicate the notebook and keep working on your changes in the duplicate copy.

  1. In the navigation pane, choose Notebooks.
  2. Open the SQL notebook.
  3. Choose the options menu (three dots) and choose Duplicate.
  4. Provide the duplicate notebook name.
  5. Choose Duplicate.

Share notebooks

You often need to collaborate with other teams, for example to share the queries for integration testing, deploy the queries from dev to the production account, and more. You can achieve this by sharing the notebook with your team.

A team is a set of users who collaborate and share Query Editor V2.0 resources. An administrator can create a team by adding a tag to an IAM role.

Before you start sharing your notebook with your team, make sure that you have the principal tag sqlworkbench-team set to the same value as the rest of your team members in your account. For example, an administrator might set the value to accounting-team for everyone in the accounting department. To create a team and tag, refer to Permissions required to use the query editor v2.0.

To share a SQL notebook with a team in the same account, complete the following steps:

  1. Open the SQL notebook you want to share.
  2. Choose the options menu (three dots) and choose Share with my team.

Notebooks that are shared to the team can be seen on the Shared to my team tab of the notebooks panel, and notebooks that are shared by the user can be seen on the Shared by me tab.

You can also use the export/import feature for other use cases. For example, developers can deploy notebooks from lower environments to production, or customers can provide a SaaS solution by sharing notebooks with their end-users in different accounts or Regions. Complete the following steps to export and import SQL notebooks:

  1. Open the SQL notebook you want to share.
  2. Choose the options menu (three dots) and choose Export. SQL Notebooks saves the notebook to your local desktop as a .ipynb file.
  3. Import the notebook into another account or Region.

Run parameterized queries in a SQL notebook

Database users often need to pass different parameter values to a query at runtime. You can achieve this in SQL Notebooks by using parameterized queries. A parameter is defined in the query as ${parameter_name}, and when the query is run, the editor prompts you to set a value for the parameter.

Let’s look at the following example, in which we pass the events_cluster parameter.

  1. Insert a SQL cell in the SQL notebook and add the following SQL query:
    select news_monitoring_cluster ( AvgTone, EventCode, NumArticles, Actor1Geo_Lat, Actor1Geo_Long, Actor2Geo_Lat, Actor2Geo_Long ) as events_cluster, eventcode, actor1name, actor2name, sum(numarticles) as totalarticles
    from gdelt_data
    where events_cluster = ${events_cluster}
    and actor1name <> ' ' and actor2name <> ' '
    group by 1,2,3,4
    order by 5 desc

  2. When prompted, enter a value for the events_cluster parameter (for this post, we set the value to 4).
  3. Choose Run now to run the query.

The following screenshot shows the query results with the events_cluster parameter value set to 4.
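To make the ${parameter_name} placeholder syntax concrete, here is a minimal sketch of the same prompt-and-substitute idea in Python. It is only an illustration: Query Editor V2.0 performs the substitution itself, and nothing here reflects how it is implemented. The shortened query text and the `render` helper are hypothetical. Python's `string.Template` happens to use the same `${name}` notation.

```python
from string import Template

# Hypothetical, shortened query text; ${events_cluster} is the placeholder.
QUERY = Template(
    "select eventcode, actor1name, actor2name, sum(numarticles) as totalarticles\n"
    "from gdelt_data\n"
    "where events_cluster = ${events_cluster}\n"
    "group by 1, 2, 3"
)


def render(template: Template, **params: int) -> str:
    """Fill every ${name} placeholder.

    Template.substitute raises KeyError when a placeholder has no value,
    which mirrors the editor refusing to run until a value is supplied.
    Values are restricted to integers here because pasting arbitrary
    strings into SQL text is unsafe.
    """
    for name, value in params.items():
        if not isinstance(value, int):
            raise TypeError(f"parameter {name!r} must be an integer")
    return template.substitute(**params)
```

Calling `render(QUERY, events_cluster=4)` returns the query text with the placeholder replaced by 4, which corresponds to the value entered at the prompt in this post.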

Conclusion

In this post, we introduced SQL Notebooks using the Amazon Redshift Query Editor V2.0. We used a sample notebook to demonstrate how it simplifies data analysis tasks for a data scientist and how you can collaborate using notebooks with your team.


About the Authors

Ranjan Burman is an Analytics Specialist Solutions Architect at AWS. He specializes in Amazon Redshift and helps customers build scalable analytical solutions. He has more than 15 years of experience in different database and data warehousing technologies. He is passionate about automating and solving customer problems with the use of cloud solutions.

Erol Murtezaoglu, a Technical Product Manager at AWS, is an inquisitive and enthusiastic thinker with a drive for self-improvement and learning. He has a strong and proven technical background in software development and architecture, balanced with a drive to deliver commercially successful products. Erol highly values the process of understanding customer needs and problems in order to deliver solutions that exceed expectations.

Cansu Aksu is a Frontend Engineer at AWS. She has several years of experience in building user interfaces that simplify complex actions and contribute to a seamless customer experience. In her career at AWS, she has worked on different aspects of web application development, including front end, backend, and application security.

Andrei Marchenko is a Full Stack Software Development Engineer at AWS. He works to bring notebooks to life on all fronts—from the initial requirements to code deployment, from the database design to the end-user experience. He uses a holistic approach to deliver the best experience to customers.

Debu Panda is a Senior Manager, Product Management at AWS. He is an industry leader in analytics, application platform, and database technologies, and has more than 25 years of experience in the IT world. Debu has published numerous articles on analytics, enterprise Java, and databases and has presented at multiple conferences such as re:Invent, Oracle Open World, and Java One. He is lead author of EJB 3 in Action (Manning Publications 2007, 2014) and Middleware Management (Packt, 2009).

How The Mill Adventure enabled data-driven decision-making in iGaming using Amazon QuickSight

Post Syndicated from Deepak Singh original https://aws.amazon.com/blogs/big-data/how-the-mill-adventure-enabled-data-driven-decision-making-in-igaming-using-amazon-quicksight/

This post is co-written with Darren Demicoli from The Mill Adventure.

The Mill Adventure is an iGaming industry enabler offering customizable turnkey solutions to B2B partners and custom branding enablement for its B2C partners. They provide a complete gaming platform, including licenses and operations, for rapid deployment and success in iGaming, and are committed to improving the iGaming experience by being a differentiator through innovation. The Mill Adventure already provides its services to a number of iGaming brands and seeks to continuously grow through the ranks of the industry.

In this post, we show how The Mill Adventure is helping its partners answer business-critical iGaming questions by building a data analytics application with a modern data strategy on AWS. This approach has led to high-velocity innovation while lowering the total operating cost.

With a gross market revenue exceeding $70 billion and a global player base of around 3 billion players (per a recent imarc Market Overview 2022-2027), the iGaming industry has, without a doubt, been booming over the past few years. This presents a lucrative opportunity to an ever-growing list of businesses seeking to tap into the market and attract a bigger share of the audience. Needless to say, staying competitive in this somewhat saturated market is extremely challenging. Making data-driven decisions is critical to the growth and success of iGaming businesses.

Business challenges

Gaming companies typically generate a massive amount of data, which could potentially enable meaningful insights and answer business-critical questions. Some of the critical and common business challenges in the iGaming industry are:

  • What impacts the brand’s turnover—its new players, retained players, or a mix of both?
  • How to assess the effectiveness of a marketing campaign? Should a campaign be reinstated? Which games to promote via campaigns?
  • Which affiliates drive quality players that have better conversion rates? Which paid traffic channels should be discontinued?
  • For how long does the typical player stay active within a brand? What is the lifetime deposit from a player?
  • How to improve the registration to first deposit processes? What are the most pressing issues impacting player conversion?

Though sufficient data was captured, The Mill Adventure found two key challenges in their ability to generate actionable insights:

  • Lack of analysis-ready datasets (as opposed to raw, unusable data formats)
  • Lack of timely access to business-critical data

For example, The Mill Adventure generates over 50 GB of data daily. Its partners have access to this data. However, due to the data being in a raw form, they find it of little value in answering their business-critical questions. This affects their decision-making processes.

To address these challenges, The Mill Adventure chose to build a modern data platform on AWS that was not only capable of providing timely and meaningful business insights for the iGaming industry, but also efficiently manageable, low-cost, scalable, and secure.

Modern data architecture

The Mill Adventure wanted to build a data analytics platform using a modern data strategy that would grow as the company grows. Key tenets of this modern data strategy are:

  • Build a modern business application and store data in the cloud
  • Unify data from different application sources into a common data lake, preferably in its native format or in an open file format
  • Innovate using analytics and machine learning, with an overarching need to meet security and governance compliance requirements

A modern data architecture on AWS applies these tenets. Two key features that form the basic foundation of a modern data architecture on AWS are serverless and microservices.

The Mill Adventure solution

The Mill Adventure built a serverless iGaming data analytics platform that allows its partners to have quick and easy access to a dashboard with data visualizations driven by the varied sources of gaming data, including real-time streaming data. With this platform, stakeholders can use data to devise strategies and plan for future growth based on past performance, evaluate outcomes, and respond to market events with more agility. Having the capability to access insightful information in a timely manner and respond promptly has a substantial impact on the turnover and revenue of the business.

A serverless iGaming platform on AWS

In building the iGaming platform, The Mill Adventure was quick to recognize the benefits of having a serverless microservice infrastructure. We wanted to spend time on innovating and building new applications, not managing infrastructure. AWS services such as Amazon API Gateway, AWS Lambda, Amazon DynamoDB, Amazon Kinesis Data Streams, Amazon Simple Storage Service (Amazon S3), Amazon Athena, and Amazon QuickSight are at the core of this data platform solution. Moving to AWS serverless services has saved time, reduced cost, and improved productivity. A microservice architecture has enabled us to accelerate time to value, increase innovation speed, and reduce the need to re-platform, refactor, and rearchitect in the future.

The following diagram illustrates the data flow from the gaming platform to QuickSight.

The data flow includes the following steps:

  1. As players access the gaming portal, associated business functions such as gaming activity, payment, bonus, accounts management, and session management capture the relevant player actions.
  2. Each business function has a corresponding Lambda-based microservice that handles the ingestion of the data from that business function. For example, the Session service handles player session management. The Payment service handles player funds, including deposits and withdrawals from player wallets. Each microservice stores data locally in DynamoDB and manages the create, read, update, and delete (CRUD) tasks for the data. For event sourcing implementation details, see How The Mill Adventure Implemented Event Sourcing at Scale Using DynamoDB.
  3. Data records resulting from the CRUD outputs are written in real time to Kinesis Data Streams, which forms the primary data source for the analytics dashboards of the platform.
  4. Amazon S3 forms the underlying storage for data in Kinesis Data Streams and forms the internal real-time data lake containing raw data.
  5. The raw data is transformed and optimized through custom-built extract, transform, and load (ETL) pipelines and stored in a different S3 bucket in the data lake.
  6. Both raw and processed data are immediately available for querying via Athena and QuickSight.

Raw data is transformed, optimized, and stored as processed data using an hourly data pipeline to meet analytics and business intelligence needs. The following figure shows an example of record counts and the size of the data being written into Kinesis Data Streams, which eventually needs to be processed from the data lake.

These data pipeline jobs can be broadly classified into six main stages:

  • Cleanup – Filtering out invalid records
  • Deduplicate – Removing duplicate data records
  • Aggregate at various levels – Grouping data at various aggregation levels of interest (such as per player, per session, or per hour or day)
  • Optimize – Writing files to Amazon S3 in optimized Parquet format
  • Report – Triggering connectors with updated data (such as updates to affiliate providers and compliance)
  • Ingest – Triggering an event to ingest data in QuickSight for analytics and visualizations
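
As a rough illustration of the first three stages (cleanup, deduplicate, aggregate), the following is a minimal Python sketch. The record fields `event_id`, `player_id`, and `amount` and the validity rules are assumptions made for this example and are not taken from The Mill Adventure’s pipeline. The optimize, report, and ingest stages depend on Amazon S3, external connectors, and QuickSight, so they are not sketched here.

```python
from collections import defaultdict


def cleanup(records):
    """Cleanup: drop records with missing IDs or a non-numeric or negative amount.

    The validity rules are hypothetical; a real pipeline defines its own.
    """
    return [
        r for r in records
        if r.get("event_id")
        and r.get("player_id")
        and isinstance(r.get("amount"), (int, float))
        and r["amount"] >= 0
    ]


def deduplicate(records):
    """Deduplicate: keep only the first record seen for each event_id."""
    seen, unique = set(), []
    for r in records:
        if r["event_id"] not in seen:
            seen.add(r["event_id"])
            unique.append(r)
    return unique


def aggregate_per_player(records):
    """Aggregate: total amount per player (one of several possible levels)."""
    totals = defaultdict(float)
    for r in records:
        totals[r["player_id"]] += r["amount"]
    return dict(totals)


def run_pipeline(records):
    """Chain the three stages in the order listed above."""
    return aggregate_per_player(deduplicate(cleanup(records)))
```

Keeping each stage as a separate function that takes and returns a list of records makes it straightforward to test a stage in isolation or to add another aggregation level (per session, per hour) next to `aggregate_per_player`.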

The output of this data pipeline is two-fold:

  • A transformed data lake that is designed and optimized for fast query performance
  • A refreshed view of data for all QuickSight dashboards and analyses

Cultivating a data-driven mindset with QuickSight

The Mill Adventure’s partners access their data securely via QuickSight datasets. These datasets are purposefully curated views on top of the transformed data lake. Each partner can access and visualize their data immediately. With QuickSight, partners can build useful dashboards without having deep technical knowledge or familiarity with the internal structure of the data. This approach significantly reduces the time and effort required and speeds up access to valuable gaming insights for business decision-making.

The Mill Adventure also provides each partner with a set of readily available dashboards. These dashboards are built on the years of experience that The Mill Adventure has in the iGaming industry, cover the most common business intelligence requirements, and jumpstart a data-driven mindset.

In the following sections, we provide a high-level overview of some of The Mill Adventure iGaming dashboard features and how these are used to meet the iGaming business analytics needs.

Key performance indicators

This analysis provides a comprehensive set of iGaming key performance indicators (KPIs) across different functional areas, including but not limited to payment activity (deposits and withdrawals), game activity (bets, gross game wins, return to player) and conversion metrics (active customers, active players, unique depositing customers, newly registered customers, new depositing customers, first-time depositors). These are presented concisely in both a quantitative view and in more visual forms.

In the following example KPI report, presenting different iGaming metrics for key periods and for the brand’s lifetime lets us identify the overall performance of the brand.

Affiliates analysis

This analysis presents metrics related to the activity generated by players acquired through affiliates. Affiliates usually account for a large share of the traffic driven to gaming sites, and such a report helps identify the most effective affiliates. It shows performance trends per affiliate and compares them across affiliates. By combining data from multiple sources via QuickSight cross-data source joins, affiliate provider-related data such as earnings and clicks can be presented together with other key gaming platform metrics. By having these metrics broken down by affiliate, we can determine which affiliates contribute the most to the brand, as shown in the following example figure.

Cohort analysis

Cohort analyses track the progression of KPIs (such as average deposits) over a period of time for groups of players after their first deposit day. In the following figure, the average deposits per user (ADPU) is presented for players registering in different quarters within the last 2 years. By moving horizontally along each row on the graph, we can see how the ADPU changes for successive quarters for the same group of players. In the following example, the ADPU decreases substantially, indicating higher player churn.

We can use cohort analyses to calculate the churn rate (rate of players who become inactive). Additionally, by averaging the ADPU figures from this analysis, you can extract the lifetime value (LTV) of the ADPU. This shows the average amount that players can be expected to deposit over their lifetime with the brand.
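
The cohort figures described above can be sketched in a few lines of Python. This is a minimal illustration under stated assumptions, not the calculation used in the dashboards: ADPU is taken as the deposits in a period divided by the number of players in the cohort, and churn as the share of cohort players with no deposit in that period. The tuple layout `(player_id, cohort, period, amount)` is hypothetical.

```python
from collections import defaultdict


def cohort_table(deposits):
    """Return {cohort: {period: (adpu, churn)}}.

    deposits: iterable of (player_id, cohort, period, amount), where
    period counts quarters since the cohort started (0, 1, 2, ...).
    Assumptions: cohort size is the number of distinct players ever seen
    in the cohort; churn is the share of them with no deposit in a period.
    """
    players = defaultdict(set)    # cohort -> all players in the cohort
    totals = defaultdict(float)   # (cohort, period) -> sum of deposits
    active = defaultdict(set)     # (cohort, period) -> depositing players
    for player, cohort, period, amount in deposits:
        players[cohort].add(player)
        totals[(cohort, period)] += amount
        active[(cohort, period)].add(player)

    table = defaultdict(dict)
    for (cohort, period), total in totals.items():
        size = len(players[cohort])
        adpu = total / size
        churn = 1 - len(active[(cohort, period)]) / size
        table[cohort][period] = (adpu, churn)
    return dict(table)
```

Reading one cohort’s entries in period order corresponds to moving horizontally along a row of the cohort chart: a falling ADPU together with a rising churn value is the pattern described in the example above.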

Player onboarding journey

Player onboarding is not a single-step process. In particular, jurisdictional requirements impose a number of compliance checks that need to be fulfilled at various stages of the registration flow. All these, plus other steps along the registration (such as email verification), could pose potential pitfalls for players, leading them to fail to complete registration. Showing these steps in QuickSight funnel visuals helps identify such issues and pinpoint any bottlenecks in such flows, as shown in the following example. Additionally, Sankey visuals are used to monitor player movement across registration steps, identifying steps that need to be optimized.

Campaign outcome analysis

Bonus campaigns are a valuable promotional technique used to reward players and boost engagement. Campaigns can drive turnover and revenue, but there is always an inherent cost associated. It’s critical to assess the performance of campaigns and determine the net outcome. We have built a specific analysis to simplify the task of evaluating these promotions. A number of key metrics related to players activated by campaigns are available. These include both monetary incentives for game activity and deposits and other details related to player demographics (such as country, age group, gender, and channel). Individual campaign performance is analyzed and high-performing campaigns are identified.

In the following example, the figure on the left shows a time series distribution of deposits coming from campaigns in comparison to the global ones. The figure on the right shows a geographic plot of players activated from selected campaigns.

Demographics distribution analysis

Brands may seek to improve player engagement and retention by tailoring their content for their player base. They need to collect and understand information about their players’ demographics. Players’ demographic distribution varies from brand to brand, and the outcome of actions taken on different brands will vary due to this distribution. Keeping an eye on this demographic (age, country, gender) distribution helps shape a brand strategy in the way that best suits the player base and helps choose the promotions that appeal most to its audience.

Through visuals such as the following example, it’s possible to quickly analyze the distribution of the selected metric along different demographic categories.

In addition, grouping players by the number of days since registration indicates which players are making a higher contribution to revenue, whether it is existing players or newly registered players. In the following figure, we can see that players who registered in the last 3 months continually account for the highest contribution to deposits. In addition, the proportion of deposits coming from the other two bands of players isn’t increasing, indicating an issue with player retention.

Compliance and responsible gaming

The Mill Adventure treats player protection with the utmost priority. Each iGaming regulated market has its own rules that need to be followed by the gaming operators. These include a number of compliance reports that need to be regularly sent to authorities in the respective jurisdictions. This process was simplified for new brands by creating a common reports template and automating the report creation in QuickSight. This helps new B2B brands meet these reporting requirements quickly and with minimal effort.

In addition, a number of control reports highlighting different areas of player protection are in place. As shown in the following example, responsible gaming reports such as those outlining player behavior deviations help identify accounts with problematic gambling patterns.

Players whose gaming pattern varies from the identified norm are flagged for inspection. This is useful to identify players who may need intervention.

Assessing game play and releases

It’s important to measure the performance and popularity of new games post release. Metrics such as unique player participation and player stakes are monitored during the initial days after the release, as shown in the following figures.

Not only does this help evaluate the overall player engagement, but it can also give a clear indication of how these games will perform in the future. By identifying popular games, a brand may choose to focus marketing campaigns on those games, and therefore ensure that it’s promoting games that appeal to its player base.

As shown in these example dashboards, we can use QuickSight to design and create business analytics insights of the iGaming data. This helps us answer real-life business-critical questions and take measurable actions using these insights.

Conclusion

In the iGaming industry, decisions not backed up by data are like an attempt to hit the bullseye blindfolded. With QuickSight, The Mill Adventure empowers its B2B partners and customers to harness data in a timely and convenient manner and support decision-making with winning strategies. Ultimately, in addition to gaining a competitive edge in maximizing revenue opportunities, improved decision-making will also lead to enhanced player experiences.

Reach out to The Mill Adventure and kick-start your iGaming journey today.

Explore the rich set of out-of-the-box Amazon QuickSight ML Insights. Amazon QuickSight Q enables dashboards with natural language querying capabilities. For more information and resources on how to get started with a free trial, visit Amazon QuickSight.


About the authors

Darren Demicoli is a Senior DevOps and Business Intelligence Engineer at The Mill Adventure. He has worked in different roles in technical infrastructure, software development, and database administration and has been building solutions for the iGaming sector for the past few years. Outside work, he enjoys travelling, exploring good food, and spending time with his family.

Padmaja Suren is a Technical Business Development Manager serving the Public Sector Field Community in Market Intelligence on Analytics. She has 20+ years of experience in building scalable data platforms using a variety of technologies. At AWS she has served as Specialist Solution Architect on services such as Database, Analytics and QuickSight. Prior to AWS, she implemented successful data and BI initiatives for diverse industry sectors in her capacity as Datawarehouse and BI Architect. She dedicates her free time to her passion project SanghWE, which delivers psychosocial education for sexual trauma survivors to heal and recover.

Deepak Singh is a Solution Architect at AWS with specialization in business intelligence and analytics. Deepak has worked across a number of industry verticals such as Finance, Healthcare, Utilities, Retail, and High Tech. Throughout his career, he has focused on solving complex business problems to help customers achieve impactful business outcomes using applied intelligence solutions and services.

Using mobile sensor data to encourage safer driving

Post Syndicated from Grab Tech original https://engineering.grab.com/using-mobile-sensor-data-to-encourage-safer-driving

“Telematics”, a cross between the words telecommunications and informatics, was coined in the late 1970s to refer to the use of communication technologies in facilitating exchange of information. In the modern day, such technologies may include cloud platforms, mobile networks, and wireless transmissions (e.g., Bluetooth). Although the term originally had a more general scope, telematics is now specifically used to refer to vehicle telematics, where details of vehicle movements are tracked for use cases such as driving safety, driver profiling, fleet optimisation, and productivity improvements.

We previously published an article on how Grab uses telematics to improve driver safety. In this blog post, we dive deeper into how telematics technology is used at Grab to encourage safer driving for our driver and delivery partners.

Background

At Grab, the safety of our users and their experience on our platform is our highest priority. By encouraging safer driving habits from our driver and delivery partners, road traffic accidents can be minimised, potentially reducing property damage, injuries, and even fatalities. Safe driving also helps ensure smoother rides and a more pleasant experience for consumers using our platform.

To encourage safer driving, we should:

  1. Have a data-driven approach to understand how our driver and delivery partners are driving.
  2. Help partners better understand how to improve their driving by summarising key driving history into a personalised Driving Safety Report.

Understanding driving behaviour

One of the most direct forms of driving assessment is consumer feedback or complaints. However, the frequency and coverage of this feedback are low, as it only applies to transport verticals like JustGrab or GrabBike and not to delivery verticals like GrabFood or GrabExpress. Plus, most driver partners tend not to receive any driving-related feedback (whether positive or negative), even for the transport verticals.

A more comprehensive method of assessing driving behaviour is to use the driving data collected during Grab bookings. To make sense of these data, we focus on selected driving manoeuvres (e.g., braking, acceleration, cornering, speeding) and detect the number of instances where our data shows unsafe driving in each of these areas.

We acknowledge that the detected instances may be subject to errors and may not provide the complete picture of what’s happening on the ground (e.g., partners may be forced to do an emergency brake due to someone swerving into their lane).

To address this, we have incorporated several fail-safe checks into our detection logic to minimise erroneous detection. Also, any assessment of driving behaviour will be based on an aggregation of these unsafe driving instances over a large amount of driving data. For example, individual harsh braking instances may be inconclusive, but if a driver partner displays multiple counts consistently across many bookings, it is likely that the partner is used to unsafe driving practices like tailgating or is distracted while driving.

Telematics for detecting unsafe driving

For Grab to consistently ensure our consumers’ safety, we need to proactively detect unsafe driving behaviour before an accident occurs. However, it is not feasible for someone to be with our driver and delivery partners all the time to observe their driving behaviour. We should leverage sensor data to monitor driving behaviour at scale.

Traditionally, a specialised “black box” inertial measurement unit (IMU) equipped with sensors such as accelerometers, gyroscopes, and GPS needs to be installed in alignment with the vehicle to directly measure vehicular acceleration and speed. In this manner, it would be straightforward to detect unsafe driving instances using this data. Unfortunately, the cost of purchasing and installing such devices for all our partners is prohibitively high and it would be hard to scale.

Instead, we can leverage a device that all partners already have: their mobile phone. Modern smartphones already contain similar sensors to those in IMUs and data can be collected through the telematics SDK. More details on telematics data collection can be found in a recently published Grab tech blog article [1].

It’s important to note that telematics data are collected at a sufficiently high sampling frequency (much more than 1 Hz) to minimise inaccuracies in detecting unsafe driving instances characterised by sharp acceleration impulses.

Processing mobile sensor data to detect unsafe driving

Unlike data from specialised IMUs installed in vehicles, mobile sensor data pose additional challenges for detecting unsafe driving.

Accounting for orientation: Phone vs. vehicle

The phone is usually in a different orientation compared to the vehicle. Strictly speaking, the phone accelerometer sensor measures the accelerations of the phone and not the vehicle acceleration. To infer vehicle acceleration from phone sensor data, we developed a customised processing algorithm optimised specifically for Grab’s data.

First, the orientation offset of the phone with respect to the vehicle is defined using Euler angles: roll, pitch and yaw. In data windows with no net acceleration of the vehicle (e.g., no braking, turning motion), the only acceleration measured by the accelerometer is gravitational acceleration. Roll and pitch angles can then be determined through trigonometric manipulation. The complete triaxial accelerations of the phone are then rotated to the horizontal plane and the yaw angle is determined by principal component analysis (PCA).

An assumption here is that there will be sufficient braking and acceleration manoeuvring for PCA to determine the correct forward direction. The Euler angles are re-estimated periodically to account for any movement of the phone during the trip. Finally, the raw phone accelerations are rotated to the vehicle orientation through a matrix multiplication with the rotation matrix derived from the Euler angles (see Figure 1).
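
The reorientation steps can be sketched as follows. This is a simplified illustration under our own assumptions, not Grab’s production algorithm: the gravity vector is taken as the mean accelerometer reading over a window with no net vehicle acceleration, the axis conventions are one common choice for tilt sensing, and the function names are hypothetical. For two-dimensional data, the first principal component has a closed form, so no linear algebra library is needed.

```python
import math


def roll_pitch_from_gravity(gravity):
    """Roll and pitch (radians) from a reading that contains only gravity."""
    gx, gy, gz = gravity
    roll = math.atan2(gy, gz)
    pitch = math.atan2(-gx, math.hypot(gy, gz))
    return roll, pitch


def level(sample, roll, pitch):
    """Rotate one phone-frame sample so that gravity lies along +z."""
    x, y, z = sample
    # Rotation about the x-axis removes the y component of gravity.
    y1 = math.cos(roll) * y - math.sin(roll) * z
    z1 = math.sin(roll) * y + math.cos(roll) * z
    # Rotation about the y-axis removes the x component of gravity.
    x2 = math.cos(pitch) * x + math.sin(pitch) * z1
    z2 = -math.sin(pitch) * x + math.cos(pitch) * z1
    return x2, y1, z2


def yaw_from_pca(horizontal):
    """Angle of the first principal component of 2D points (closed form)."""
    n = len(horizontal)
    mx = sum(p[0] for p in horizontal) / n
    my = sum(p[1] for p in horizontal) / n
    sxx = sum((p[0] - mx) ** 2 for p in horizontal)
    syy = sum((p[1] - my) ** 2 for p in horizontal)
    sxy = sum((p[0] - mx) * (p[1] - my) for p in horizontal)
    return 0.5 * math.atan2(2.0 * sxy, sxx - syy)


def to_vehicle_frame(samples, gravity):
    """Return (longitudinal, lateral, vertical) tuples for phone samples."""
    roll, pitch = roll_pitch_from_gravity(gravity)
    levelled = [level(sample, roll, pitch) for sample in samples]
    yaw = yaw_from_pca([(x, y) for x, y, _ in levelled])
    cos_yaw, sin_yaw = math.cos(yaw), math.sin(yaw)
    return [
        (cos_yaw * x + sin_yaw * y, -sin_yaw * x + cos_yaw * y, z)
        for x, y, z in levelled
    ]
```

PCA yields an axis rather than a direction, so the sign of the longitudinal component (forward versus backward) still has to be resolved separately; that step is not covered in this sketch.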

Figure 1: Inference of vehicle acceleration from the phone sensor data. Smartphone and car images modified from designs found in Freepik.com.

Handling variations in data quality

Our processing algorithm is optimised to be highly robust and to handle the large variations in data quality that are expected from bookings on the Grab platform. There are many reported methods for processing mobile data to reorientate telematics data for four-wheel vehicles [2, 3].

However, with the prevalent use of motorcycles on our platform, especially for delivery verticals, we observed that data collected from two-wheel vehicles tends to be noisier due to differences in phone stability and vehicular vibrations. Data noise can be exacerbated if partners hold the phone in their hand or place it in their pockets while driving.

In addition, we expect a wide variation in data quality and sensor availability across phone models, from older, low-end models to the newest flagship models. One example of the robustness of our algorithm is that it uses different strategies for different degrees of data noise: a simple low-pass filter is used for low noise data, while more complex variational decomposition and Kalman filter approaches are used for high noise data.
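
To illustrate the idea of switching strategy by noise level, here is a minimal Python sketch. It is not Grab’s algorithm: the noise proxy, the cutoff, and the filter parameters are hypothetical values chosen for the example, the Kalman filter is a scalar one with a random-walk state model, and variational decomposition is left out.

```python
import statistics


def low_pass(signal, alpha=0.2):
    """First-order (exponential) low-pass filter; smaller alpha smooths more."""
    out, prev = [], signal[0]
    for x in signal:
        prev = prev + alpha * (x - prev)
        out.append(prev)
    return out


def kalman_1d(signal, process_var=1e-2, measurement_var=1.0):
    """Scalar Kalman filter assuming the true value follows a random walk."""
    estimate, error = signal[0], 1.0
    out = []
    for z in signal:
        error += process_var                       # predict step
        gain = error / (error + measurement_var)   # Kalman gain
        estimate += gain * (z - estimate)          # update with measurement z
        error *= 1.0 - gain
        out.append(estimate)
    return out


def noise_level(signal):
    """Crude noise proxy: spread of sample-to-sample differences.

    Requires at least two samples.
    """
    diffs = [b - a for a, b in zip(signal, signal[1:])]
    return statistics.pstdev(diffs)


def denoise(signal, noise_cutoff=0.5):
    """Pick the lighter filter for clean data, the heavier one otherwise."""
    if noise_level(signal) < noise_cutoff:
        return low_pass(signal)
    return kalman_1d(signal)
```

The point of the dispatcher is that a clean signal is not over-smoothed, which would blunt the sharp acceleration impulses the detection relies on, while a noisy signal gets stronger filtering.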

Detecting behaviour anomalies with thresholds

Once the vehicular accelerations are inferred, we can use a thresholding approach (see Figure 2) to detect unsafe driving instances.

For unsafe acceleration and braking, a peak finding algorithm is used to detect acceleration peaks beyond a threshold in the longitudinal (forward/backward) direction. For unsafe cornering, older and lower-end phones are usually not equipped with gyroscope sensors, so we instead look for peaks of lateral (sideways) acceleration, which constitutes the centripetal acceleration during the turn, beyond a threshold. GPS bearing data, which coarsely measures the orientation of the vehicle, is then used to confirm that a cornering instance, and not a lane change, is being detected. The thresholds selected are fine-tuned on Grab’s data using initial values based on published literature [4] and other sources.
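A thresholded peak search of this kind might look like the sketch below. It is a simplified stand-in rather than the production detector: the sign convention (positive means forward acceleration), the threshold value, and the function name are all assumptions.

```python
def find_unsafe_peaks(longitudinal, threshold):
    """Return (index, kind) for local peaks of |acceleration| above threshold.

    Assumes positive values are forward acceleration and negative values
    are braking. threshold is in the same units as the signal.
    """
    magnitudes = [abs(value) for value in longitudinal]
    peaks = []
    for i in range(1, len(magnitudes) - 1):
        current = magnitudes[i]
        # A plateau is reported once, at its first sample.
        is_peak = current > magnitudes[i - 1] and current >= magnitudes[i + 1]
        if is_peak and current > threshold:
            kind = "acceleration" if longitudinal[i] > 0 else "braking"
            peaks.append((i, kind))
    return peaks
```

The same routine could be applied to the lateral axis for cornering, with the GPS bearing check described above applied afterwards to each candidate peak.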

To reduce false positive detection, no unsafe driving instances will be flagged when:

  1. Large discrepancies are observed between speeds derived from integrating the longitudinal (forward/backward) acceleration and speeds directly measured by the GPS sensor.
  2. Large phone motions are detected. For example, when the phone falls to the seat from the dashboard, accelerations recorded on the phone sensor will deviate significantly from the vehicle accelerations.
  3. GPS speed is very low before and after the unsafe driving instance is detected. This check is limited to data collected from motorcycles, which are usually used by delivery partners. It implies that the partner is walking and not in a vehicle. For example, a GrabFood delivery partner may be collecting the food from the merchant partner on foot, so no unsafe driving instances should be detected.

Figure 2: Animation showing unsafe driving detection by thresholding. Dotted lines in acceleration charts indicate selected thresholds. Map tiles by stamen design.

Detecting speeding instances from GPS speeds and map data

To define speeding along a stretch of road, we used a rule-based method by comparing raw speeds from GPS pings with speeding thresholds for that road. Although GPS speeds are generally accurate (subject to minimal GPS errors), we need to take more precautions to ensure the right speeding thresholds are determined.

These thresholds are set using known speed limits from available map data or hourly aggregated speed statistics where speed limits are not available. The coverage and accuracy of known speed limits are continuously being improved by our in-house mapping initiatives and validated comprehensively by the respective local ground teams in selected cities.

Aggregating GPS pings from Grab driver and delivery partners can be a helpful proxy to actual speed limits by defining speeding violations as outliers from socially acceptable speeds derived from partners collectively. To reliably compute aggregated speed statistics, a representative speed profile for each stretch of road must first be inferred from raw GPS pings (see Figure 3).

As ping sampling intervals are fixed, more pings tend to be recorded for slower speeds. To correct the bias in the speed profile, we reweigh ping counts by using speed values as weights. Furthermore, to minimise distortions in the speed profile from vehicles driving at lower-than-expected speeds due to high traffic volumes, only pings from free-flowing traffic are used when inferring the speed profile.

Free-flowing traffic is defined by speeds higher than the median speed on each defined road category (e.g., small residential roads, normal primary roads, large expressways). To ensure extremely high speeds are flagged regardless of the speed of other drivers, maximum threshold values for aggregated speeds are set for each road category using heuristics based on the maximum known speed limit of that road category.
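The reweighting and capping logic above can be sketched as follows. This is an illustration only: the use of a weighted 85th percentile as the aggregated statistic is our assumption (the post does not name the statistic), and the function and parameter names are hypothetical.

```python
def weighted_percentile(values, weights, q):
    """Smallest value whose cumulative weight share reaches q (0 < q <= 1)."""
    pairs = sorted(zip(values, weights))
    total = sum(weight for _, weight in pairs)
    cumulative = 0.0
    for value, weight in pairs:
        cumulative += weight
        if cumulative >= q * total:
            return value
    return pairs[-1][0]


def aggregated_speed_threshold(ping_speeds, category_median, category_cap, q=0.85):
    """Derive a speeding threshold for one road segment from raw ping speeds.

    category_median: median speed of the road category (free-flow cut-off).
    category_cap: maximum allowed threshold for the road category.
    q: percentile of the speed profile to use (assumed value).
    """
    # Keep only free-flowing traffic.
    free_flow = [speed for speed in ping_speeds if speed > category_median]
    if not free_flow:
        return category_cap
    # Weight each ping by its speed to undo the over-sampling of slow vehicles.
    threshold = weighted_percentile(free_flow, free_flow, q)
    return min(threshold, category_cap)
```

Weighting by speed works because, with a fixed sampling interval, the number of pings a vehicle leaves on a stretch of road is inversely proportional to its speed.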

Figure 3: Steps to infer a representative speed profile for computing aggregated speed statistics.

Besides a representative speed profile, hourly aggregation should also include data from a sufficient number of unique drivers depending on speed variability. To obtain enough data, hourly aggregations are performed on the same day of the week over multiple weeks. This way, we have a comprehensive time-specific speed profile that accounts for traffic quality (e.g., peak hour traffic, traffic differences between weekdays/weekends) and driving conditions (e.g., visibility difference between day/night).

When detecting speeding violations, the GPS pings used are snapped-to-road, while stationary pings, pings with unrealistic speeds, and pings with low GPS accuracy (e.g., when the vehicle is in a tunnel) are excluded. A speeding violation is defined as a sequence of consecutive GPS pings that exceed the speeding threshold. The following checks were put in place to minimise erroneous flagging of speeding violations:

  1. Removal of duplicated (or stale) GPS pings.
  2. Sufficient speed buffer given to take into account GPS errors.
  3. Sustained speeding for a prolonged period of time is required to exclude transient speeding events (e.g., during lane change).
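The three checks above can be combined in a single pass over the pings, as in this sketch. The buffer and minimum duration are placeholder values, duplicates are detected only by identical timestamps, and the function name is hypothetical.

```python
def find_speeding_violations(pings, threshold, buffer=5.0, min_duration=10.0):
    """Return (start_time, end_time) of sustained speeding sequences.

    pings: list of (timestamp_seconds, speed) tuples sorted by time.
    buffer: tolerance added to the threshold to absorb GPS errors (assumed value).
    min_duration: minimum length in seconds for a violation (assumed value).
    """
    violations = []
    run_start = None
    run_end = None
    previous_time = None
    for timestamp, speed in pings:
        if timestamp == previous_time:
            continue  # duplicated (stale) ping
        previous_time = timestamp
        if speed > threshold + buffer:
            if run_start is None:
                run_start = timestamp
            run_end = timestamp
        else:
            # The run of consecutive speeding pings has ended.
            if run_start is not None and run_end - run_start >= min_duration:
                violations.append((run_start, run_end))
            run_start = None
    if run_start is not None and run_end - run_start >= min_duration:
        violations.append((run_start, run_end))
    return violations
```

In the usage below, the single ping at t=25 is above the threshold but is not reported, because it is a transient event rather than sustained speeding:

```python
pings = [(0, 50), (5, 70), (5, 70), (10, 72), (15, 71), (20, 55), (25, 80), (30, 50)]
find_speeding_violations(pings, threshold=60)
```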

Driving safety report

The driving safety report is a platform safety product that driver and delivery partners can access via their driver profile page on the Grab Driver Application (see Figure 4). It is updated daily and aims to create awareness regarding driving habits by summarising key information from the processed data into a personalised report that can be easily consumed.

Individual reports of each driving manoeuvre (e.g., braking, acceleration, cornering and speeding) are available for daily and weekly views. Partners can also get more detailed information of each individual instance such as when these unsafe driving instances were detected.

Figure 4: Driving safety report for driver and delivery partners using four wheel vehicles. a) Actionable insights feature circled by red dotted lines. b) Daily view of various unsafe driving instances where more details of each instance can be viewed by tapping on “See details”.

Actionable insights

Besides compiling the instances of unsafe driving in a report to create awareness, we are also using these data to provide some actionable recommendations for our partners to improve their driving.

With unsafe driving feedback from consumers and reported road traffic accident data from our platform, we also train machine learning models to identify patterns in the detected unsafe driving instances and estimate the likelihood of partners receiving unsafe driving feedback or getting into accidents. One use case is to compute a safe driving score that equates a four-wheel partner’s driving behaviour to a numerical value where a higher score indicates a safer driver.

Additionally, we use Shapley additive explanation (SHAP) approaches to determine which driving manoeuvre contributes the most to increasing the likelihood of partners receiving unsafe driving feedback or getting into accidents. This information is included as an actionable insight in the driving safety report and helps partners to identify the key area to improve their driving.

What’s next?

At the moment, Grab performs telematics processing and unsafe driving detections after the trip and updates the report the next day. One of the biggest improvements would be to share this information with partners faster. We are actively working on developing a real-time processing algorithm that addresses this and also satisfies the robustness requirements, so that partners are made aware immediately after an unsafe driving instance is detected.

Besides detecting typical unsafe driving manoeuvres, we are also exploring other use cases for mobile sensor data in road safety such as detection of poor road conditions, counterflow driving against traffic, and phone usage leading to distracted driving.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

References

  1. Burhan, W. (2022). How telematics helps Grab to improve safety. Grab Tech Blog. https://engineering.grab.com/telematics-at-grab 

  2. Mohan, P., Padmanabhan, V.N. and Ramjee, R. (2008). Nericell: rich monitoring of road and traffic conditions using mobile smartphones. SenSys ’08: Proceedings of the 6th ACM conference on Embedded network sensor systems, 312-336. https://doi.org/10.1145/1460412.1460444

  3. Sentiance (2016). Driving behavior modeling using smart phone sensor data. Sentiance Blog. https://sentiance.com/2016/02/11/driving-behavior-modeling-using-smart-phone-sensor-data/ 

  4. Yarlagadda, J. and Pawar, D.S. (2022). Heterogeneity in the Driver Behavior: An Exploratory Study Using Real-Time Driving Data. Journal of Advanced Transportation. vol. 2022, Article ID 4509071. https://doi.org/10.1155/2022/4509071 

Doing data preparation using on-premises PostgreSQL databases with AWS Glue DataBrew

Post Syndicated from John Espenhahn original https://aws.amazon.com/blogs/big-data/doing-data-preparation-using-on-premises-postgresql-databases-with-aws-glue-databrew/

Today, with AWS Glue DataBrew, data analysts and data scientists can easily access and visually explore any amount of data across their organization directly from their Amazon Simple Storage Service (Amazon S3) data lake, Amazon Redshift data warehouse, and Amazon Aurora and Amazon Relational Database Service (Amazon RDS) databases. Customers can choose from over 250 built-in functions to combine, pivot, and transpose the data without writing code.

Now, with added support for JDBC-accessible databases, DataBrew also supports additional data stores including PostgreSQL, MySQL, Oracle, and Microsoft SQL Server. In this blog post, we use DataBrew to clean data from an on-premise database and store the cleaned data in an Amazon S3 data lake.

Solution Overview

I will configure an existing subnet in an Amazon VPC for use with DataBrew, then configure DataBrew to securely connect to an existing on-premise database and run a data preparation job.

Components

  1. You should have an AWS account with a Virtual Private Cloud (Amazon VPC). DataBrew will connect to your database from this VPC.
  2. You should have a subnet within your Amazon VPC. In this blog, this subnet will be configured for use with DataBrew.
  3. You should have an on-premise database with data to be cleaned with DataBrew.
  4. I assume you have a VPN connection between your Amazon VPC and on-premise network to enable secure connections between them. I’ve implemented a VPN tunnel using AWS Site-to-Site VPN. You may choose to Simulate Site-to-Site VPN Customer Gateways Using strongSwan.
  5. This guide will walk through creation of a DataBrew dataset, project, and job.
  6. DataBrew requires access to Amazon S3 and AWS Glue. This guide will walk through creating VPC endpoints to enable private connections between your VPC and these AWS services for DataBrew to use.
  7. To establish network connectivity, DataBrew will provision an Amazon VPC elastic network interface in the VPC you specify. This blog will cover securing this network interface with a security group.

Prerequisites

To complete this blog, you should have the following prerequisites:

Additionally, you will need to have enabled access to your on-premise network from the subnet in the Amazon VPC. If you haven’t enabled it already, you can Simulate Site-to-Site VPN Customer Gateways Using strongSwan, or you can enable access by completing the AWS Site-to-Site VPN getting started guide.

If you are unsure if you have enabled access from your VPC subnet to your on-premise database, you can test access by running the AWS Systems Manager automation AWSSupport-SetupIPMonitoringFromVPC. From the User Guide, choose Run this Automation. In the Systems Manager console, under Input Parameters, you will need to enter the Amazon VPC subnet ID for SubnetId and the IP address of your on-premise host for TargetIPs. Then choose Execute. Once the automation completes, locate the Outputs section and open the URL linked under createCloudWatchDashboard.Output. From that dashboard, confirm from the Ping log group that pings are successful. If they are not, you will need to investigate. A useful resource for this is How do I troubleshoot instance connection timeout errors in Amazon VPC.

Step 1: Configure the Amazon VPC

Ensure your Amazon VPC has DNS Support and DNS Hostnames enabled. You can verify this by selecting your VPC in the Amazon VPC console and checking the details for DNS hostnames and DNS resolution. If they are disabled, they can be enabled by choosing Actions then the corresponding Edit option.

On-premise or hybrid DNS is also supported, but requires additional setup. See Other considerations at the end of this post for more.
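If you prefer to check the two VPC DNS settings programmatically rather than in the console, a sketch along these lines may help. It assumes you pass in a boto3 EC2 client (for example `boto3.client("ec2")`); the function name and the VPC ID are placeholders.

```python
def vpc_dns_settings(ec2_client, vpc_id):
    """Return the DNS support and DNS hostnames flags for a VPC.

    ec2_client is expected to behave like a boto3 EC2 client.
    describe_vpc_attribute accepts one attribute per call.
    """
    support = ec2_client.describe_vpc_attribute(
        VpcId=vpc_id, Attribute="enableDnsSupport"
    )
    hostnames = ec2_client.describe_vpc_attribute(
        VpcId=vpc_id, Attribute="enableDnsHostnames"
    )
    return {
        "enableDnsSupport": support["EnableDnsSupport"]["Value"],
        "enableDnsHostnames": hostnames["EnableDnsHostnames"]["Value"],
    }
```

Both values should be `True` before you continue with the next step.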

Step 2: Configure the Amazon VPC Subnet

Your subnet must have access to Amazon S3 and AWS Glue services. I will add VPC endpoints for Amazon S3 and AWS Glue services to keep my traffic within the AWS network.

  1. To add the VPC endpoint for Amazon S3, open the Amazon VPC console at https://console.aws.amazon.com/vpc/.
  2. In the navigation pane, choose Endpoints, Create Endpoint.
  3. Filter by “s3”.
  4. Choose the service where the Type column indicates Gateway.
  5. Select the route tables associated with the subnet to be used with DataBrew.
  6. Choose Create endpoint.
  7. To add the VPC endpoint for AWS Glue, again choose Create Endpoint.
  8. Filter by “glue”.
  9. Choose the service where the Type column indicates Interface.
  10. Select the subnet to be used with DataBrew and the security group to associate with the endpoint.
  11. Choose Create endpoint.
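The console steps above can also be scripted. The following is a rough sketch, assuming a boto3 EC2 client is passed in; all IDs and the Region are placeholders, and enabling private DNS on the interface endpoint is my assumption. Note that a gateway endpoint is attached to route tables, while an interface endpoint is attached to subnets and security groups.

```python
def endpoint_requests(vpc_id, region, route_table_ids, subnet_ids, security_group_ids):
    """Build create_vpc_endpoint arguments for Amazon S3 and AWS Glue."""
    s3_gateway = {
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.s3",
        "VpcEndpointType": "Gateway",
        "RouteTableIds": list(route_table_ids),
    }
    glue_interface = {
        "VpcId": vpc_id,
        "ServiceName": f"com.amazonaws.{region}.glue",
        "VpcEndpointType": "Interface",
        "SubnetIds": list(subnet_ids),
        "SecurityGroupIds": list(security_group_ids),
        "PrivateDnsEnabled": True,  # assumption: resolve the default Glue hostname privately
    }
    return [s3_gateway, glue_interface]


def create_endpoints(ec2_client, requests):
    """Create each endpoint; ec2_client is expected to behave like a boto3 EC2 client."""
    return [ec2_client.create_vpc_endpoint(**request) for request in requests]
```

Building the request dictionaries separately makes it easy to review them before any resources are created.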

Step 3: Configure Network ACL

By default, network ACLs allow all inbound and outbound traffic. If you have customized your network ACL, ensure inbound return traffic from and outbound traffic to your on-premise network, Amazon S3, and AWS Glue are allowed.

  1. From the Amazon VPC console, choose Subnets.
  2. Choose the subnet you are using with DataBrew.
  3. From the Details tab, choose the Network ACL link.
  4. Validate your inbound and outbound rules, updating your rules to allow the required traffic if needed. The screenshot below shows the default rules I am using.

Step 4: Configure the VPC security group

To provide connectivity to your VPC, DataBrew will create an Elastic Network Interface (ENI) in the VPC subnet you specify. DataBrew attaches the security group you specify to the ENI to limit network access. This security group must have a self-referential rule to allow all inbound TCP traffic from itself. This will block access from unspecified sources. I will be using the default security group, which has the following configuration.

Your security group must allow outbound traffic to itself, Amazon S3, AWS Glue, and your on-premise network. I’ll be using the default security group, which allows all outbound traffic.

Optionally, you may wish to explicitly restrict outbound traffic to only your on-premise network, Amazon S3, and AWS Glue. To do so, remove the All TCP outbound rule. Ensure your security group has a self-referential rule to allow all outbound TCP traffic to itself. Allow traffic to your on-premise network by specifying the CIDR block associated with your network. In my case, it is 10.196.0.0/16. Allow traffic to Amazon S3 with the AWS-managed S3 prefix list, which includes the set of CIDR blocks for Amazon S3. Allow traffic to the AWS Glue VPC endpoint by associating the same security group with the AWS Glue VPC endpoint created above from the Amazon VPC console.

An example of what these scoped-down outbound rules may look like:
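As a rough sketch, the same scoped-down rules can be written in the `IpPermissions` structure that the EC2 API uses for security group rules. The security group ID and prefix list ID below are placeholders, and the helper names are hypothetical. The rules allow all TCP ports, as described above; you could narrow them further to specific ports if you wish.

```python
import ipaddress


def host_in_network(host_ip, cidr):
    """Check that a host address falls inside a CIDR block."""
    return ipaddress.ip_address(host_ip) in ipaddress.ip_network(cidr)


def scoped_egress_rules(security_group_id, on_premise_cidr, s3_prefix_list_id):
    """Build outbound rules for the security group itself, on-premise, and Amazon S3."""
    ipaddress.ip_network(on_premise_cidr)  # raises ValueError for a malformed CIDR

    def all_tcp(target):
        rule = {"IpProtocol": "tcp", "FromPort": 0, "ToPort": 65535}
        rule.update(target)
        return rule

    return [
        # Self-referential rule; also covers the AWS Glue endpoint when it
        # is associated with the same security group.
        all_tcp({"UserIdGroupPairs": [{"GroupId": security_group_id}]}),
        # On-premise network.
        all_tcp({"IpRanges": [{"CidrIp": on_premise_cidr}]}),
        # Amazon S3 through the AWS-managed prefix list.
        all_tcp({"PrefixListIds": [{"PrefixListId": s3_prefix_list_id}]}),
    ]
```

The returned list can be passed as `IpPermissions` to the boto3 EC2 client method `authorize_security_group_egress`. `host_in_network("10.196.0.20", "10.196.0.0/16")` is a quick way to confirm that your database host is covered by the CIDR block you allow.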

Ensure your on-premise network security rules allow traffic from your Amazon VPC subnet’s CIDR block.

Step 5: Create database credentials

Following best practices, I will be creating a database user with scoped down permissions for use with DataBrew.

  1. Connect to your database. In my case with psql -h 10.196.0.20
  2. Create a user, which I’ll call postgresql, with read-only access to the table that will be used with DataBrew. My table is called demo in database postgres. I’ll do this by executing the following queries:
    CREATE USER postgresql WITH PASSWORD '****';
    GRANT CONNECT ON DATABASE postgres TO postgresql;
    GRANT USAGE ON SCHEMA public TO postgresql;
    REVOKE CREATE ON SCHEMA public FROM postgresql;
    GRANT SELECT ON demo TO postgresql;

Step 6: Create DataBrew project

  1. From the AWS Glue DataBrew console, choose Create project.
  2. Enter a Project name.
  3. Under Select a dataset choose New dataset.
  4. Enter a Dataset name.
  5. Under Connect to new dataset choose JDBC.
  6. Choose Add JDBC connection.
  7. Enter a Connection name; I use my-connection.
  8. Choose Enter JDBC details.
  9. Choose Database type, in my case PostgreSQL.
  10. For Host/endpoint, enter your host’s private IP address.
  11. Enter your Database name, Database user, and Database password.
  12. Choose your VPC, and the Subnet and Security Group you configured above.
  13. Review “Additional configurations”, where you can optionally configure the following:

    1. If you are using a recent database version, such as MySQL 8, you may need to provide a custom JDBC driver. For more information, see the Developer Guide.
    2. DataBrew can be set to fail the connection to your database if it is unable to connect over SSL. Additionally, DataBrew provides default certificates for establishing SSL connections. If you obtained a certificate from a third-party issuer, or the default certificates provided by DataBrew do not meet your requirements, you can provide your own. DataBrew handles only X.509 certificates. The certificate must be DER-encoded and supplied in base64-encoded PEM format.
  14. Choose Create connection at the bottom of the modal.
  15. Choose the newly created connection by clicking on its name.
  16. Enter the name of the table within your database you want to bring into DataBrew.
  17. Under the Permissions header, choose Create new IAM role from the dropdown and enter a role suffix.
  18. Choose Create project. This opens the project view. After one to two minutes you will be able to work with your data. If the connection fails, see How do I troubleshoot instance connection timeout errors in Amazon VPC.
  19. Start by applying some simple transforms. I’m dropping some columns that are not needed in my data lake. To do so, from the action bar I choose COLUMN, then Delete.
  20. This opens the side-bar where I choose the column to delete, and choose Apply.

Step 7: Create DataBrew job

Once I’ve got a few transforms added to my project’s recipe, I will run a job to execute the recipe against my full dataset, and store the result in my Amazon S3 bucket.

  1. Choose Create job from the top of the project grid view.
  2. On the job page, provide a Job name and S3 output location.
  3. Under the header Permissions, choose Create new IAM role. This will create a new scoped down IAM role with the permissions required to execute your job.
  4. Finally, choose Create and run job. Once the job completes, you can view the output in Amazon S3.
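The job can also be created and started from code instead of the console. This is a minimal sketch, assuming a boto3 DataBrew client is passed in (for example `boto3.client("databrew")`) and that the IAM role already exists; the function name and all argument values are placeholders, and CSV is an assumed output format.

```python
def create_and_run_job(databrew_client, job_name, project_name, role_arn, bucket, key_prefix):
    """Create a recipe job for an existing project and start one run.

    databrew_client is expected to behave like a boto3 DataBrew client.
    Returns the run ID reported by the service.
    """
    databrew_client.create_recipe_job(
        Name=job_name,
        ProjectName=project_name,
        RoleArn=role_arn,
        Outputs=[
            {
                "Location": {"Bucket": bucket, "Key": key_prefix},
                "Format": "CSV",  # assumed output format
            }
        ],
    )
    run = databrew_client.start_job_run(Name=job_name)
    return run["RunId"]
```

Unlike the console flow, this sketch does not create the IAM role for you, so the role must already grant DataBrew access to the output location.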

Cleanup

From the DataBrew console, delete your Job, Project, Recipe, and Dataset.

If you executed the Systems Manager automation to test access, under the Systems Manager console, choose CloudWatch Dashboard. Select the dashboard created by the automation. Choose Terminate test. Then choose Execute.

Other considerations

AWS Glue DataBrew’s networking requirements are similar to those of AWS Glue ETL jobs. The following sections summarize some of the more advanced networking considerations. For more details on AWS Glue ETL, see How to access and analyze on-premises data stores using AWS Glue by Rajeev Meharwal.

DNS

If you are using AWS VPC provided DNS, ensure you have enabled DnsHostnames and DnsSupport for your VPC. For more information, see DNS support in your VPC.

If you have configured a custom DNS server with your AWS VPC, you must implement forward and reverse lookups for Amazon EC2 private DNS hostnames. For more information, see Amazon DNS server. Alternatively, set up hybrid DNS resolution to resolve both on-premise DNS servers and the VPC provided DNS. For implementation details, see the following AWS Security Blog posts:

Joining or unioning multiple databases

If you are joining a database dataset into your project, the database must be accessible from the project dataset’s subnet.

For example, suppose you have completed the setup above using Private Subnet 1, and you have another Amazon RDS database in Private Subnet 2 in the same VPC, as shown below. You will want a local route in the route table associated with Subnet 1. You will also need to ensure the security group attached to your Amazon RDS database allows inbound traffic from your DataBrew security group.

If your Amazon RDS database is in a different VPC than the one you are using with DataBrew, you will need to set up VPC peering.


About the Authors

John Espenhahn is a Software Engineer working on the AWS Glue DataBrew service. He has also worked on the Amazon Kendra user experience as a part of Database, Analytics & AI AWS consoles. He is passionate about technology and building in the analytics space.

Nitya Sheth is a Software Engineer working on the AWS Glue DataBrew service. He has also worked on the AWS Synthetics service as well as on user experience implementations for Database, Analytics & AI AWS consoles. In his free time, he divides his time between exploring new hiking places and new books.

Simplify semi-structured nested JSON data analysis with AWS Glue DataBrew and Amazon QuickSight

Post Syndicated from Sriharsh Adari original https://aws.amazon.com/blogs/big-data/simplify-semi-structured-nested-json-data-analysis-with-aws-glue-databrew-and-amazon-quicksight/

As the industry grows with more data volume, big data analytics is becoming a common requirement in data analytics and machine learning (ML) use cases. Data comes from many different sources in structured, semi-structured, and unstructured formats. For semi-structured data, one of the most common lightweight file formats is JSON. However, due to the complex nature of data, JSON often includes nested key-value structures. Analysts may want a simpler graphical user interface to conduct data analysis and profiling.

To support these requirements, AWS Glue DataBrew offers an easy visual data preparation tool with over 350 pre-built transformations. You can use DataBrew to analyze complex nested JSON files that would otherwise require days or weeks writing hand-coded transformations. You can then use Amazon QuickSight for data analysis and visualization.

In this post, we demonstrate how to configure DataBrew to work with nested JSON objects and use QuickSight for data visualization.

Solution overview

To implement our solution, we create a DataBrew project and DataBrew job for unnesting data. We profile the unnested data in DataBrew and analyze the data in QuickSight. The following diagram illustrates the architecture of this solution.

Prerequisites

Before you get started, make sure you have the following prerequisites:

Prepare the data

To illustrate the DataBrew functionality to support data analysis for nested JSON files, we use a publicly available sample customer order details nested JSON dataset.

Complete the following steps to prepare your data:

  1. Sign in to the AWS Management Console.
  2. Browse to the publicly available datasets on the Amazon S3 console.
  3. Select the first dataset (customer_1.json) and choose Download to save the files on your local machine.
  4. Repeat this step to download all three JSON files.

    You can view the sample data from your local machine using any text editor, as shown in the following screenshot.
  5. Create input and output S3 buckets with subfolders nestedjson and outputjson to capture data.
  6. Choose Upload and upload the three JSON files to the nestedjson folder.

Create a DataBrew project

To create your Amazon S3 connection, complete the following steps:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Choose Create project.
  3. For Project name, enter Glue-DataBew-NestedJSON-Blog.
  4. Select New dataset.
  5. For Dataset name, enter Glue-DataBew-NestedJSON-Dataset.
  6. For Enter your source from S3, enter the path to the nestedjson folder.
  7. Choose Select the entire folder to select all the files.
  8. Under Additional configurations, select JSON as the file type, then select JSON document.
  9. In the Permissions section, choose Choose existing IAM role if you have one available, or choose Create new IAM role.
  10. Choose Create project.
  11. Skip the preview steps and wait for the project to be ready.
    As shown in the following screenshot, the three JSON files were uploaded to the S3 bucket, so three rows of customer order details are loaded.
    The orders column contains nested data. We can use the DataBrew Nest-unnest transform to flatten it into columns and rows.
  12. Choose the menu icon (three dots) and choose Nest-unnest.
  13. Depending on the nesting, either choose Unnest to columns or Unnest to rows. In this blog post, we choose Unnest to columns to flatten the example JSON file.

    Repeat this step until all the nested JSON data is flattened. This creates the AWS Glue DataBrew recipe shown below.
  14. Choose Apply.

    DataBrew automatically creates the required recipe steps with updated column values.
  15. Choose Create job.
  16. For Job name, enter Glue-DataBew-NestedJSON-job.
  17. For S3 location, enter the path to the outputjson folder.
  18. In the Permissions section, for Role name, choose the role you created earlier.
  19. Choose Create and run job.

On the Jobs page, you can choose the job to view its run history, details, and data lineage.
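To make the effect of Unnest to columns more concrete, the following sketch flattens a nested record in plain Python. It only illustrates the idea: the sample record is made up, and the dotted column names are our own convention, not necessarily the names DataBrew generates.

```python
def unnest_to_columns(record, parent_key="", separator="."):
    """Flatten nested dictionaries and lists into a single-level dictionary.

    Nested keys are joined with the separator; list items are addressed
    by their index. Lists nested directly inside lists are kept as-is.
    """
    flat = {}
    for key, value in record.items():
        column = f"{parent_key}{separator}{key}" if parent_key else key
        if isinstance(value, dict):
            flat.update(unnest_to_columns(value, column, separator))
        elif isinstance(value, list):
            for index, item in enumerate(value):
                indexed = f"{column}{separator}{index}"
                if isinstance(item, dict):
                    flat.update(unnest_to_columns(item, indexed, separator))
                else:
                    flat[indexed] = item
        else:
            flat[column] = value
    return flat
```

For example, a record with an `orders` object that holds a list of items turns into one row whose columns are named after the full path to each value:

```python
record = {"customer_id": 1, "orders": {"id": "A1", "items": [{"sku": "x", "qty": 2}]}}
unnest_to_columns(record)
```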

Profile the metadata with DataBrew

After you have a flattened file in the S3 output bucket, you can use DataBrew to carry out the data analysis and profiling for the flattened file. Complete the following steps:

  1. On the Datasets page, choose Connect new datasets.
  2. Provide your dataset details and choose Create dataset.
  3. Choose the newly added data source, then choose the Data profile overview tab.
  4. Enter the name of the job and the S3 path to save the output.
  5. Choose Create and run job.

The job takes around two minutes to complete and display all the updated information. You can explore the data further on the Data profile overview and Column statistics tabs.

Visualize the data in QuickSight

After you have the output file generated by DataBrew in the S3 output bucket, you can use QuickSight to query the JSON data. QuickSight is a scalable, serverless, embeddable, ML-powered business intelligence (BI) service built for the cloud. QuickSight lets you easily create and publish interactive BI dashboards that include ML-powered insights. QuickSight dashboards can be accessed from any device, and seamlessly embedded into your applications, portals, and websites.

Launch QuickSight

On the console, enter quicksight into the search bar and choose QuickSight.

You’re presented with the QuickSight welcome page. If you haven’t signed up for QuickSight, you may have to complete the signup wizard. For more information, refer to Signing up for an Amazon QuickSight subscription.

After you have signed up, QuickSight presents a “Welcome wizard.” You can view the short tutorial, or you can close it.

Grant Amazon S3 access

To grant Amazon S3 access, complete the following steps:

  1. On the QuickSight console, choose your user name, choose Manage QuickSight, then choose Security & permissions.
  2. Choose Add or remove.
  3. Locate Amazon S3 in the list. Choose one of the following:
    1. If the check box is clear, select Amazon S3.
    2. If the check box is already selected, choose Details, then choose Select S3 buckets.
  4. Choose the buckets that you want to access from QuickSight, then choose Select.
  5. Choose Update.
  6. If you changed your Region during the first step of this process, change it back to the Region that you want to use.

Create a dataset

Now that you have QuickSight up and running, you can create your dataset. Complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.

    QuickSight supports several data sources. For a complete list, refer to Supported data sources.
  3. For your data source, choose S3.

    The S3 import requires a data source name and a manifest file.
  4. On your machine, use a text editor to create a manifest file called BlogGlueDataBrew.manifest using the following structure (provide the name of your output bucket):
    {
        "fileLocations": [
            {
                "URIPrefixes": [
                "s3://<output bucket>/outputjson/"
                ]
            }
        ],
        "globalUploadSettings": {
            "format": "CSV",
            "delimiter": ","
        }
    }

    The manifest file points to the folder that you created earlier as part of your DataBrew project. For more information, refer to Supported formats for Amazon S3 manifest files.

  5. Select Upload and navigate to the manifest file to upload it.
  6. Choose Connect to upload data into SPICE, which is an in-memory database built into QuickSight to achieve fast performance.
  7. Choose Visualize.

You can now create visuals by adding different fields.
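If you would rather generate the manifest file from step 4 than type it by hand, a short script like the following can do it. This is a sketch: the bucket name is a placeholder you supply, and the function names are our own.

```python
import json


def build_manifest(output_bucket, prefix="outputjson/"):
    """Build a QuickSight S3 manifest that points at the DataBrew output folder."""
    return {
        "fileLocations": [
            {"URIPrefixes": [f"s3://{output_bucket}/{prefix}"]}
        ],
        "globalUploadSettings": {"format": "CSV", "delimiter": ","},
    }


def write_manifest(path, output_bucket):
    """Write the manifest as indented JSON to the given local path."""
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(build_manifest(output_bucket), handle, indent=4)
```

Calling `write_manifest("BlogGlueDataBrew.manifest", "<output bucket>")` with your own bucket name produces a file you can upload in step 5.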

To learn more about authoring dashboards in QuickSight, check out the QuickSight Author Workshop.

Clean up

Complete the following steps to avoid incurring future charges:

  1. On the DataBrew console, choose Projects in the navigation pane.
  2. Select the project you created and on the Actions menu, choose Delete.
  3. Choose Jobs in the navigation pane.
  4. Select the job you created and on the Actions menu, choose Delete.
  5. Choose Recipes in the navigation pane.
  6. Select the recipe you created and on the Actions menu, choose Delete.
  7. On the QuickSight dashboard, choose your user name on the application bar, then choose Manage QuickSight.
  8. Choose Account settings, then choose Delete account.
  9. Choose Delete account.
  10. Enter confirm and choose Delete account.

Conclusion

This post walked you through the steps to configure DataBrew to work with nested JSON objects and use QuickSight for data visualization. We used Glue DataBrew to unnest our JSON file and profile the data, and then used QuickSight to create dashboards and visualizations for further analysis.

You can use this solution for your own use cases when you need to unnest complex semi-structured JSON files without writing code. If you have comments or feedback, please leave them in the comments section.


About the authors

Sriharsh Adari is a Senior Solutions Architect at Amazon Web Services (AWS), where he helps customers work backwards from business outcomes to develop innovative solutions on AWS. Over the years, he has helped multiple customers on data platform transformations across industry verticals. His core areas of expertise include Technology Strategy, Data Analytics, and Data Science. In his spare time, he enjoys playing sports, binge-watching TV shows, and playing Tabla.

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Amogh Gaikwad is a Solutions Developer at Amazon Web Services. He helps global customers build and deploy AI/ML solutions. His work is mainly focused on computer vision and NLP use cases and on helping customers optimize their AI/ML workloads for sustainability. Amogh received his master’s in Computer Science specializing in Machine Learning.

Ingest VPC flow logs into Splunk using Amazon Kinesis Data Firehose

Post Syndicated from Ranjit Kalidasan original https://aws.amazon.com/blogs/big-data/ingest-vpc-flow-logs-into-splunk-using-amazon-kinesis-data-firehose/

In September 2017, during the annual Splunk.conf, Splunk and AWS jointly announced Amazon Kinesis Data Firehose integration to support Splunk Enterprise and Splunk Cloud as a delivery destination. This native integration between Splunk Enterprise, Splunk Cloud, and Kinesis Data Firehose is designed to make AWS data ingestion setup seamless, while offering a secure and fault-tolerant delivery mechanism. We want to enable you to monitor and analyze machine data from any source and use it to deliver operational intelligence and optimize IT, security, and business performance.

With Kinesis Data Firehose, you can use a fully managed, reliable, and scalable service to stream data to Splunk. In September 2022, AWS announced a new Amazon Virtual Private Cloud (Amazon VPC) feature that enables you to create VPC flow logs that send flow log data directly to Kinesis Data Firehose as a destination. Previously, you could send VPC flow logs to either Amazon CloudWatch Logs or Amazon Simple Storage Service (Amazon S3) before they were ingested by other AWS or Partner tools. In this post, we show you how to use this feature to set up VPC flow logs for ingesting into Splunk using Kinesis Data Firehose.

Overview of solution

We deploy the following architecture to ingest data into Splunk.

We create a VPC flow log in an existing VPC to send the flow log data to a Kinesis Data Firehose delivery stream. This delivery stream has an AWS Lambda function enabled for data transformation and has destination settings to point to the Splunk endpoint along with an HTTP Event Collector (HEC) token.

Prerequisites

Before you begin, ensure that you have the following prerequisites:

  • AWS account – If you don’t have an AWS account, you can create one. For more information, see Setting Up for Amazon Kinesis Data Firehose.
  • Splunk AWS Add-on – Ensure you install the Splunk AWS Add-on app from Splunkbase in your Splunk deployment. This app provides the required source types and event types mapping to AWS machine data.
  • HEC token – In your Splunk deployment, set up an HEC token with the source type aws:cloudwatchlogs:vpcflow.

Create the transformation Lambda function

Integrating VPC flow logs with Kinesis Data Firehose requires a Lambda function to transform the flow log records. The data that VPC flow logs sends to the delivery stream is encoded as JSON records. However, Splunk expects this as raw flow log data. Therefore, when you create the delivery stream, you enable data transformation and configure a Lambda function to transform the flow log data to raw format. Kinesis Data Firehose then sends the data in raw format to Splunk.

You can deploy this transformation Lambda function as a serverless application from the Lambda serverless app repository on the Lambda console. The name of this application is splunk-firehose-flowlogs-processor.

After it’s deployed, you can see a Lambda function and an AWS Identity and Access Management (IAM) role getting deployed on the console. Note the physical ID of the Lambda function; you use this when you create the Firehose delivery stream in the next step.
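To make the transformation step more concrete, the following is a minimal Python sketch of a Kinesis Data Firehose transformation handler. It is not the code of the splunk-firehose-flowlogs-processor application. It assumes that each incoming record is a base64-encoded JSON object whose message field holds the raw flow log line; that field name is an assumption you should verify against your own records. The request and response shapes (recordId, result, data) follow the Kinesis Data Firehose data transformation contract.

```python
import base64
import json


def lambda_handler(event, context):
    """Turn JSON-wrapped flow log records into raw flow log lines."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            # Assumption: the raw flow log line is carried in a "message" field.
            raw_line = payload["message"] + "\n"
            output.append({
                "recordId": record["recordId"],
                "result": "Ok",
                "data": base64.b64encode(raw_line.encode("utf-8")).decode("utf-8"),
            })
        except (ValueError, KeyError, TypeError):
            # Return the record unchanged and mark it as failed so that
            # Firehose can route it to the configured backup location.
            output.append({
                "recordId": record["recordId"],
                "result": "ProcessingFailed",
                "data": record["data"],
            })
    return {"records": output}
```

Every recordId received must appear in the response, which is why failed records are returned with the ProcessingFailed result instead of being skipped.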

Create a Kinesis Data Firehose delivery stream

In this step, you create a Kinesis Data Firehose delivery stream to receive the VPC flow log data and deliver that data to Splunk.

  1. On the Kinesis Data Firehose console, create a new delivery stream.
  2. For Source, choose Direct PUT.
  3. For Destination, choose Splunk.
  4. For Delivery stream name, enter a name (for example, VPCtoSplunkStream).
  5. In the Transform records section, for Data transformation, select Enabled.
  6. For AWS Lambda function, choose Browse.
  7. Select the function you created earlier by looking for the physical ID.
  8. Choose Choose.
  9. In the Destination settings section, for Splunk cluster endpoint, enter your endpoint. If you’re using a Splunk Cloud endpoint, refer to Configure Amazon Kinesis Firehose to send data to the Splunk platform for different Splunk cluster endpoint values.
  10. For Splunk endpoint type, select Raw endpoint.
  11. For Authentication token, enter the value of the Splunk HEC token that you created as a prerequisite.
  12. In the Backup settings section, for Source record backup in Amazon S3, select Failed events only so you only save the data that fails to be ingested into Splunk.
  13. For S3 backup bucket, enter the path to an S3 bucket.
  14. Complete creating your delivery stream.

The creation process may take a few minutes to complete.
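If you prefer to script the console steps above, the following sketch builds the request that mirrors them. It only assembles a dictionary and calls no AWS API. The key names follow the Kinesis Data Firehose CreateDeliveryStream API as we understand it, so verify them against the API reference before use. The function name and all argument values are hypothetical.

```python
def build_splunk_stream_request(stream_name, hec_endpoint, hec_token,
                                lambda_arn, backup_bucket_arn, firehose_role_arn):
    """Assemble CreateDeliveryStream parameters matching the console choices."""
    return {
        "DeliveryStreamName": stream_name,
        # Source: Direct PUT
        "DeliveryStreamType": "DirectPut",
        "SplunkDestinationConfiguration": {
            "HECEndpoint": hec_endpoint,
            # Splunk endpoint type: Raw endpoint
            "HECEndpointType": "Raw",
            "HECToken": hec_token,
            # Back up only the events that fail to reach Splunk
            "S3BackupMode": "FailedEventsOnly",
            "S3Configuration": {
                "RoleARN": firehose_role_arn,
                "BucketARN": backup_bucket_arn,
            },
            # Data transformation: enabled, using the Lambda function
            "ProcessingConfiguration": {
                "Enabled": True,
                "Processors": [{
                    "Type": "Lambda",
                    "Parameters": [
                        {"ParameterName": "LambdaArn", "ParameterValue": lambda_arn},
                    ],
                }],
            },
        },
    }
```

With a recent version of boto3 installed, the resulting dictionary could be passed as keyword arguments to the Firehose client's create_delivery_stream method.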

Create a VPC flow log

In this final step, you create a VPC flow log with Kinesis Data Firehose as destination type.

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC for which to create the flow log.
  3. On the Actions menu, choose Create flow log.
  4. Provide the required settings for Filter:
    1. If you want to filter the flow logs, select Accept traffic or Reject traffic.
    2. Select All if you need all the information sent to Splunk.
  5. For Maximum aggregation interval, select a suitable interval for your use case. Select the minimum setting of 1 minute interval if you need the flow log data to be available for near-real-time analysis in Splunk.
  6. For Destination, select Send to Kinesis Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs. If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.
  7. For Log record format, if you leave it at AWS default format, the flow logs are sent as version 2 format. Alternatively, you can specify which fields you need to be captured and sent to Splunk. For more information on log format and available fields, refer to Flow log records.
  8. Review all the parameters and create the flow log. Within a few minutes, you should be able to see the data in Splunk.
  9. Open your Splunk console and navigate to the Search tab of the Search & Reporting app.
  10. Run the following SPL query to look at sample VPC flow log records:
    index=<index name> sourcetype="aws:cloudwatchlogs:vpcflow"
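When you keep the AWS default format, each raw flow log line that reaches Splunk contains the 14 space-separated version 2 fields. The following sketch shows one way to split such a line into named fields, for example to spot-check records outside Splunk. The function name and the snake_case field names are our own choices; the field order follows the default flow log record format.

```python
# Field order of the default (version 2) flow log record format.
DEFAULT_FIELDS = [
    "version", "account_id", "interface_id", "srcaddr", "dstaddr",
    "srcport", "dstport", "protocol", "packets", "bytes",
    "start", "end", "action", "log_status",
]
INT_FIELDS = {"version", "srcport", "dstport", "protocol",
              "packets", "bytes", "start", "end"}


def parse_flow_log_line(line):
    """Parse one default-format flow log line into a dictionary."""
    parts = line.split()
    if len(parts) != len(DEFAULT_FIELDS):
        raise ValueError(
            f"expected {len(DEFAULT_FIELDS)} fields, got {len(parts)}")
    record = dict(zip(DEFAULT_FIELDS, parts))
    for name in INT_FIELDS:
        # A "-" marks a field with no data (for example, NODATA records).
        if record[name] != "-":
            record[name] = int(record[name])
    return record
```

If you choose a custom log record format instead, the field list has to be adjusted to match the fields you selected.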

Clean up

To avoid incurring future charges, delete the resources you created in the following order:

  1. Delete the VPC flow log.
  2. Delete the Kinesis Data Firehose delivery stream.
  3. Delete the serverless application to delete the transformation Lambda function.
  4. If you created a new VPC and new resources in the VPC, then delete the resources and VPC.

Conclusion

You can use VPC flow log data in multiple Splunk solutions, like the Splunk App for AWS Security Dashboards for traffic analysis or Splunk Security Essentials, which uses the data to provide deeper insights into the security posture of your AWS environment. Using Kinesis Data Firehose to send VPC flow log data into Splunk provides many benefits. This managed service can automatically scale to meet the data demand and provide near-real-time data analysis. Try out this new quick and hassle-free way of sending your VPC flow logs to Splunk Enterprise or Splunk Cloud Platform using Kinesis Data Firehose.

You can deploy this solution today on your AWS account by following the Kinesis Data Firehose Immersion Day Lab for Splunk.


About the authors

Ranjit Kalidasan is a Senior Solutions Architect with Amazon Web Services based in Boston, Massachusetts. He is a Partner Solutions Architect who helps security ISV partners co-build and co-market solutions with AWS. He brings over 20 years of experience in information technology, helping global customers implement complex solutions for security and analytics. You can connect with Ranjit on LinkedIn.

Introducing runtime roles for Amazon EMR steps: Use IAM roles and AWS Lake Formation for access control with Amazon EMR

Post Syndicated from Stefano Sandona original https://aws.amazon.com/blogs/big-data/introducing-runtime-roles-for-amazon-emr-steps-use-iam-roles-and-aws-lake-formation-for-access-control-with-amazon-emr/

You can use the Amazon EMR Steps API to submit Apache Hive, Apache Spark, and other types of applications to an EMR cluster. You can invoke the Steps API using Apache Airflow, AWS Step Functions, the AWS Command Line Interface (AWS CLI), all the AWS SDKs, and the AWS Management Console. Jobs submitted with the Steps API use the Amazon Elastic Compute Cloud (Amazon EC2) instance profile to access AWS resources such as Amazon Simple Storage Service (Amazon S3) buckets, AWS Glue tables, and Amazon DynamoDB tables from the cluster.

Previously, if a step needed access to a specific S3 bucket and another step needed access to a specific DynamoDB table, the AWS Identity and Access Management (IAM) policy attached to the instance profile had to allow access to both the S3 bucket and the DynamoDB table. This meant that the IAM policies you assigned to the instance profile had to contain a union of all the permissions for every step that ran on an EMR cluster.

We’re happy to introduce runtime roles for EMR steps. A runtime role is an IAM role that you associate with an EMR step, and jobs use this role to access AWS resources. With runtime roles for EMR steps, you can now specify different IAM roles for the Spark and the Hive jobs, thereby scoping down access at a job level. This allows you to simplify access controls on a single EMR cluster that is shared between multiple tenants, wherein each tenant can be easily isolated using IAM roles.

The ability to specify an IAM role with a job is also available on Amazon EMR on EKS and Amazon EMR Serverless. You can also use AWS Lake Formation to apply table- and column-level permissions for Apache Hive and Apache Spark jobs that are submitted with EMR steps. For more information, refer to Configure runtime roles for Amazon EMR steps.

In this post, we dive deeper into runtime roles for EMR steps, helping you understand how the various pieces work together, and how each step is isolated on an EMR cluster.

Solution overview

In this post, we walk through the following:

  1. Create an EMR cluster enabled to use the new role-based access control with EMR steps.
  2. Create two IAM roles with different permissions in terms of the Amazon S3 data and Lake Formation tables they can access.
  3. Allow the IAM principal submitting the EMR steps to use these two IAM roles.
  4. See how EMR steps running with the same code and trying to access the same data have different permissions based on the runtime role specified at submission time.
  5. See how to monitor and control actions using source identity propagation.

Set up EMR cluster security configuration

Amazon EMR security configurations simplify applying consistent security, authorization, and authentication options across your clusters. You can create a security configuration on the Amazon EMR console or via the AWS CLI or AWS SDK. When you attach a security configuration to a cluster, Amazon EMR applies the settings in the security configuration to the cluster. You can attach a security configuration to multiple clusters at creation time, but you can’t apply it to a cluster that is already running.

To enable runtime roles for EMR steps, we have to create a security configuration as shown in the following code and enable the runtime roles property (configured via EnableApplicationScopedIAMRole). In addition to the runtime roles, we’re enabling propagation of the source identity (configured via PropagateSourceIdentity) and support for Lake Formation (configured via LakeFormationConfiguration). The source identity is a mechanism to monitor and control actions taken with assumed roles. Enabling Propagate source identity allows you to audit actions performed using the runtime role. Lake Formation is an AWS service to securely manage a data lake, which includes defining and enforcing central access control policies for your data lake.

Create a file called step-runtime-roles-sec-cfg.json with the following content:

{
    "AuthorizationConfiguration": {
        "IAMConfiguration": {
            "EnableApplicationScopedIAMRole": true,
            "ApplicationScopedIAMRoleConfiguration": 
                {
                    "PropagateSourceIdentity": true
                }
        },
        "LakeFormationConfiguration": {
            "AuthorizedSessionTagValue": "Amazon EMR"
        }
    }
}

Create the Amazon EMR security configuration:

aws emr create-security-configuration \
--name 'iamconfig-with-iam-lf' \
--security-configuration file://step-runtime-roles-sec-cfg.json

You can also do the same on the AWS Management Console:

  1. On the Amazon EMR console, choose Security configurations in the navigation pane.
  2. Choose Create.
  3. For Security configuration name, enter a name.
  4. For Security configuration setup options, select Choose custom settings.
  5. For IAM role for applications, select Runtime role.
  6. Select Propagate source identity to audit actions performed using the runtime role.
  7. For Fine-grained access control, select AWS Lake Formation.
  8. Complete the security configuration.

The security configuration appears in your security configuration list. You can also see that the authorization mechanism listed here is the runtime role instead of the instance profile.

Launch the cluster

Now we launch an EMR cluster and specify the security configuration we created. For more information, refer to Specify a security configuration for a cluster.

The following code provides the AWS CLI command for launching an EMR cluster with the appropriate security configuration. Note that this cluster is launched on the default VPC and public subnet with the default IAM roles. In addition, the cluster is launched with one primary and one core instance of the specified instance type. For more details on how to customize the launch parameters, refer to create-cluster.

If the default EMR roles EMR_EC2_DefaultRole and EMR_DefaultRole don’t exist in IAM in your account (for example, because this is the first time you’re launching an EMR cluster with them), use the following command to create them before launching the cluster:

aws emr create-default-roles

Create the cluster with the following code:

#Change with your Key Pair
KEYPAIR="<MY_KEYPAIR>"
INSTANCE_TYPE="r4.4xlarge"
#Change with your Security Configuration Name
SECURITY_CONFIG="iamconfig-with-iam-lf"
#Change with your S3 log URI
LOG_URI="s3://mybucket/logs/"

aws emr create-cluster \
--name "iam-passthrough-cluster" \
--release-label emr-6.7.0 \
--use-default-roles \
--security-configuration $SECURITY_CONFIG \
--ec2-attributes KeyName=$KEYPAIR \
--instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=$INSTANCE_TYPE  InstanceGroupType=CORE,InstanceCount=1,InstanceType=$INSTANCE_TYPE \
--applications Name=Spark Name=Hadoop Name=Hive \
--log-uri $LOG_URI

When the cluster is fully provisioned (Waiting state), let’s try to run a step on it with runtime roles for EMR steps enabled:

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Example",
            "Args": [
              "spark-submit",
              "--class",
              "org.apache.spark.examples.SparkPi",
              "/usr/lib/spark/examples/jars/spark-examples.jar",
              "5"
            ]
        }]'

After launching the command, we receive the following as output:

An error occurred (ValidationException) when calling the AddJobFlowSteps operation: Runtime roles are required for this cluster. Please specify the role using the ExecutionRoleArn parameter.

The request was rejected because we didn’t provide a runtime role. In the next section, we set up two IAM roles with different permissions and use them as the runtime roles for EMR steps.

Set up IAM roles as runtime roles

Any IAM role that you want to use as a runtime role for EMR steps must have a trust policy that allows the EMR cluster’s EC2 instance profile to assume it. In our setup, we’re using the default IAM role EMR_EC2_DefaultRole as the instance profile role. In addition, we create two IAM roles called test-emr-demo1 and test-emr-demo2 that we use as runtime roles for EMR steps.

The following code is the trust policy for both of the IAM roles, which lets the EMR cluster’s EC2 instance profile role, EMR_EC2_DefaultRole, assume these roles and set the source identity and LakeFormationAuthorizedCaller tag on the role sessions. The TagSession permission is needed so that Amazon EMR can authorize to Lake Formation. The SetSourceIdentity statement is needed for the propagate source identity feature.

Create a file called trust-policy.json with the following content (replace 123456789012 with your AWS account ID):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/EMR_EC2_DefaultRole"
            },
            "Action": "sts:AssumeRole"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/EMR_EC2_DefaultRole"
            },
            "Action": "sts:SetSourceIdentity"
        },
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:role/EMR_EC2_DefaultRole"
            },
            "Action": "sts:TagSession",
            "Condition": {
                "StringEquals": {
                    "aws:RequestTag/LakeFormationAuthorizedCaller": "Amazon EMR"
                }
            }
        }
    ]
}

Use that policy to create the two IAM roles, test-emr-demo1 and test-emr-demo2:

aws iam create-role \
--role-name test-emr-demo1 \
--assume-role-policy-document file://trust-policy.json

aws iam create-role \
--role-name test-emr-demo2 \
--assume-role-policy-document file://trust-policy.json
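If you create runtime roles in several accounts, editing the account ID by hand becomes error-prone. The following sketch generates the same trust policy from parameters. The function name and its defaults are illustrative; the three statements mirror the trust-policy.json content shown earlier in this section.

```python
import json


def build_trust_policy(account_id,
                       instance_role="EMR_EC2_DefaultRole",
                       session_tag_value="Amazon EMR"):
    """Return the runtime role trust policy as a Python dictionary."""
    role_arn = f"arn:aws:iam::{account_id}:role/{instance_role}"

    def statement(action, condition=None):
        stmt = {
            "Effect": "Allow",
            "Principal": {"AWS": role_arn},
            "Action": action,
        }
        if condition is not None:
            stmt["Condition"] = condition
        return stmt

    return {
        "Version": "2012-10-17",
        "Statement": [
            # Lets the instance profile role assume the runtime role
            statement("sts:AssumeRole"),
            # Needed for source identity propagation
            statement("sts:SetSourceIdentity"),
            # Needed so that Amazon EMR can authorize to Lake Formation
            statement("sts:TagSession", {
                "StringEquals": {
                    "aws:RequestTag/LakeFormationAuthorizedCaller": session_tag_value
                }
            }),
        ],
    }


# Serialized form, ready to be written to trust-policy.json
policy_json = json.dumps(build_trust_policy("123456789012"), indent=4)
```

Writing policy_json to trust-policy.json produces a file that can be used with the create-role commands above.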

Set up permissions for the principal submitting the EMR steps with runtime roles

The IAM principal submitting the EMR steps needs to have permissions to invoke the AddJobFlowSteps API. In addition, you can use the Condition key elasticmapreduce:ExecutionRoleArn to control access to specific IAM roles. For example, the following policy allows the IAM principal to only use IAM roles test-emr-demo1 and test-emr-demo2 as the runtime roles for EMR steps.

  1. Create the job-submitter-policy.json file with the following content (replace 123456789012 with your AWS account ID):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AddStepsWithSpecificExecRoleArn",
                "Effect": "Allow",
                "Action": [
                    "elasticmapreduce:AddJobFlowSteps"
                ],
                "Resource": "*",
                "Condition": {
                    "StringEquals": {
                        "elasticmapreduce:ExecutionRoleArn": [
                            "arn:aws:iam::123456789012:role/test-emr-demo1",
                            "arn:aws:iam::123456789012:role/test-emr-demo2"
                        ]
                    }
                }
            },
            {
                "Sid": "EMRDescribeCluster",
                "Effect": "Allow",
                "Action": [
                    "elasticmapreduce:DescribeCluster"
                ],
                "Resource": "*"
            }
        ]
    }

  2. Create the IAM policy with the following code:
    aws iam create-policy \
    --policy-name emr-runtime-roles-submitter-policy \
    --policy-document file://job-submitter-policy.json

  3. Assign this policy to the IAM principal (IAM user or IAM role) you’re going to use to submit the EMR steps (replace 123456789012 with your AWS account ID and replace john with the IAM user you use to submit your EMR steps):
    aws iam attach-user-policy \
    --user-name john \
    --policy-arn "arn:aws:iam::123456789012:policy/emr-runtime-roles-submitter-policy"

IAM user john can now submit steps using arn:aws:iam::123456789012:role/test-emr-demo1 and arn:aws:iam::123456789012:role/test-emr-demo2 as the step runtime roles.
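To double-check which runtime roles a submitter policy permits before attaching it, you can inspect the policy document locally. The following sketch is a simplified illustration and does not replicate IAM policy evaluation: it only reads Allow statements that list the elasticmapreduce:AddJobFlowSteps action literally and collects the ARNs under the StringEquals condition on elasticmapreduce:ExecutionRoleArn. The function name is hypothetical.

```python
CONDITION_KEY = "elasticmapreduce:ExecutionRoleArn"
ADD_STEPS_ACTION = "elasticmapreduce:AddJobFlowSteps"


def as_list(value):
    """IAM allows a single string or a list; normalize to a list."""
    return [value] if isinstance(value, str) else list(value)


def allowed_runtime_roles(policy):
    """Collect the runtime role ARNs named in a submitter policy."""
    roles = set()
    for stmt in policy.get("Statement", []):
        if stmt.get("Effect") != "Allow":
            continue
        if ADD_STEPS_ACTION not in as_list(stmt.get("Action", [])):
            continue
        string_equals = stmt.get("Condition", {}).get("StringEquals", {})
        roles.update(as_list(string_equals.get(CONDITION_KEY, [])))
    return roles
```

For the job-submitter-policy.json content above, the function returns the ARNs of test-emr-demo1 and test-emr-demo2 and nothing else.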

Use runtime roles with EMR steps

We now prepare our setup to show runtime roles for EMR steps in action.

Set up Amazon S3

To prepare your Amazon S3 data, complete the following steps:

  1. Create a CSV file called test.csv with the following content:
    1,a,1a
    2,b,2b

  2. Upload the file to Amazon S3 in three different locations:
    #Change this with your bucket name
    BUCKET_NAME="emr-steps-roles-new-us-east-1"
    
    aws s3 cp test.csv s3://${BUCKET_NAME}/demo1/
    aws s3 cp test.csv s3://${BUCKET_NAME}/demo2/
    aws s3 cp test.csv s3://${BUCKET_NAME}/nondemo/

    For our initial test, we use a PySpark application called test.py with the following contents:

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("my app").enableHiveSupport().getOrCreate()
    
    #Change this with your bucket name
    BUCKET_NAME="emr-steps-roles-new-us-east-1"
    
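    # Each read is wrapped separately so that one denied prefix doesn't stop the others.
    # We catch Exception instead of using a bare except, so that SystemExit and
    # KeyboardInterrupt still propagate.
    try:
      spark.read.csv("s3://" + BUCKET_NAME + "/demo1/test.csv").show()
      print("Accessed demo1")
    except Exception:
      print("Could not access demo1")
    
    try:
      spark.read.csv("s3://" + BUCKET_NAME + "/demo2/test.csv").show()
      print("Accessed demo2")
    except Exception:
      print("Could not access demo2")
    
    try:
      spark.read.csv("s3://" + BUCKET_NAME + "/nondemo/test.csv").show()
      print("Accessed nondemo")
    except Exception:
      print("Could not access nondemo")
    spark.stop()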

    In the script, we’re trying to access the CSV file present under three different prefixes in the test bucket.

  3. Upload the Spark application inside the same S3 bucket where we placed the test.csv file but in a different location:
    #Change this with your bucket name
    BUCKET_NAME="emr-steps-roles-new-us-east-1"
    aws s3 cp test.py s3://${BUCKET_NAME}/scripts/

Set up runtime role permissions

To show how runtime roles for EMR steps work, we assign different IAM permissions for Amazon S3 access to the roles we created. The following table summarizes the grants we provide to each role (emr-steps-roles-new-us-east-1 is the bucket you configured in the previous section).

| S3 locations \ IAM Roles | test-emr-demo1 | test-emr-demo2 |
| --- | --- | --- |
| s3://emr-steps-roles-new-us-east-1/* | No Access | No Access |
| s3://emr-steps-roles-new-us-east-1/demo1/* | Full Access | No Access |
| s3://emr-steps-roles-new-us-east-1/demo2/* | No Access | Full Access |
| s3://emr-steps-roles-new-us-east-1/scripts/* | Read Access | Read Access |

  1. Create the file demo1-policy.json with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:*"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/demo1",
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/demo1/*"
                ]                    
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/scripts",
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/scripts/*"
                ]                    
            }
        ]
    }

  2. Create the file demo2-policy.json with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:*"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/demo2",
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/demo2/*"
                ]                    
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:Get*"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/scripts",
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/scripts/*"
                ]                    
            }
        ]
    }

  3. Create our IAM policies:
    aws iam create-policy \
    --policy-name test-emr-demo1-policy \
    --policy-document file://demo1-policy.json
    
    aws iam create-policy \
    --policy-name test-emr-demo2-policy \
    --policy-document file://demo2-policy.json

  4. Attach the related policy to each role (replace 123456789012 with your AWS account ID):
    aws iam attach-role-policy \
    --role-name test-emr-demo1 \
    --policy-arn "arn:aws:iam::123456789012:policy/test-emr-demo1-policy"
    
    aws iam attach-role-policy \
    --role-name test-emr-demo2 \
    --policy-arn "arn:aws:iam::123456789012:policy/test-emr-demo2-policy"

    To use runtime roles with Amazon EMR steps, we need to add the following policy to our EMR cluster’s EC2 instance profile (in this example EMR_EC2_DefaultRole). With this policy, the underlying EC2 instances for the EMR cluster can assume the runtime role and apply a tag to that runtime role.

  5. Create the file runtime-roles-policy.json with the following content (replace 123456789012 with your AWS account ID):
    {
        "Version": "2012-10-17",
        "Statement": [{
                "Sid": "AllowRuntimeRoleUsage",
                "Effect": "Allow",
                "Action": [
                    "sts:AssumeRole",
                    "sts:TagSession",
                    "sts:SetSourceIdentity"
                ],
                "Resource": [
                    "arn:aws:iam::123456789012:role/test-emr-demo1",
                    "arn:aws:iam::123456789012:role/test-emr-demo2"
                ]
            }
        ]
    }

  6. Create the IAM policy:
    aws iam create-policy \
    --policy-name emr-runtime-roles-policy \
    --policy-document file://runtime-roles-policy.json

  7. Assign the created policy to the EMR cluster’s EC2 instance profile, in this example EMR_EC2_DefaultRole:
    aws iam attach-role-policy \
    --role-name EMR_EC2_DefaultRole \
    --policy-arn "arn:aws:iam::123456789012:policy/emr-runtime-roles-policy"
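The demo1-policy.json and demo2-policy.json files differ only in the data prefix. If you need more runtime roles, a small generator avoids copy-and-paste mistakes. The following sketch reproduces the same policy structure from parameters; the function name and the scripts_prefix default are our own choices.

```python
import json


def build_runtime_role_policy(bucket, data_prefix, scripts_prefix="scripts"):
    """Full access to one data prefix, read access to the scripts prefix."""

    def prefix_arns(prefix):
        # Cover both the prefix itself and every object below it
        return [
            f"arn:aws:s3:::{bucket}/{prefix}",
            f"arn:aws:s3:::{bucket}/{prefix}/*",
        ]

    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:*"],
                "Resource": prefix_arns(data_prefix),
            },
            {
                "Effect": "Allow",
                "Action": ["s3:Get*"],
                "Resource": prefix_arns(scripts_prefix),
            },
        ],
    }


# Same content as demo1-policy.json for the example bucket
demo1_json = json.dumps(
    build_runtime_role_policy("emr-steps-roles-new-us-east-1", "demo1"), indent=4)
```

Calling the function with "demo2" as the data prefix yields the content of demo2-policy.json.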

Test permissions with runtime roles

We’re now ready to perform our first test. We run the test.py script, previously uploaded to Amazon S3, two times as Spark steps: first using the test-emr-demo1 role and then using the test-emr-demo2 role as the runtime roles.

To run an EMR step specifying a runtime role, you need the latest version of the AWS CLI. For more details about updating the AWS CLI, refer to Installing or updating the latest version of the AWS CLI.

Let’s submit a step specifying test-emr-demo1 as the runtime role:

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
#Change with your AWS Account ID
ACCOUNT_ID=123456789012
#Change with your Bucket name
BUCKET_NAME=emr-steps-roles-new-us-east-1

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Example",
            "Args": [
              "spark-submit",
              "s3://'"${BUCKET_NAME}"'/scripts/test.py"
            ]
        }]' \
--execution-role-arn arn:aws:iam::${ACCOUNT_ID}:role/test-emr-demo1
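If you submit steps from Python instead of the AWS CLI, the same request can be expressed as keyword arguments for the AddJobFlowSteps API. The following sketch only builds the request and calls no AWS API. The helper name is hypothetical, and the ExecutionRoleArn parameter requires an SDK version recent enough to support runtime roles.

```python
def build_add_steps_request(cluster_id, account_id, bucket_name, role_name):
    """Build AddJobFlowSteps parameters for a Spark step with a runtime role."""
    return {
        "JobFlowId": cluster_id,
        "Steps": [{
            "Name": "Spark Example",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": [
                    "spark-submit",
                    f"s3://{bucket_name}/scripts/test.py",
                ],
            },
        }],
        # The runtime role that the step uses to access AWS resources
        "ExecutionRoleArn": f"arn:aws:iam::{account_id}:role/{role_name}",
    }


request = build_add_steps_request(
    "j-XXXXXXXXXXXXX", "123456789012",
    "emr-steps-roles-new-us-east-1", "test-emr-demo1")
```

With boto3 installed, the request could be submitted with boto3.client("emr").add_job_flow_steps(**request).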

This command returns an EMR step ID. To check our step output logs, we can proceed in two different ways:

  • From the Amazon EMR console – On the Steps tab, choose the View logs link related to the specific step ID and select stdout.
  • From Amazon S3 – While launching our cluster, we configured an S3 location for logging. We can find our step logs under ${LOG_URI}<clusterID>/steps/<stepID>/stdout.gz.

The logs could take a couple of minutes to populate after the step is marked as Completed.

The following is the output of the EMR step with test-emr-demo1 as the runtime role:

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  a| 1a|
|  2|  b| 2b|
+---+---+---+

Accessed demo1
Could not access demo2
Could not access nondemo

As we can see, only the demo1 folder was accessible by our application.
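When you run many steps, reading each log by hand gets tedious. The following sketch summarizes the content of a downloaded stdout.gz file by looking for the "Accessed" and "Could not access" messages that test.py prints. The function name is hypothetical, and the sketch assumes you have already fetched the file's bytes from Amazon S3.

```python
import gzip

ACCESSED = "Accessed "
DENIED = "Could not access "


def summarize_access(stdout_gz_bytes):
    """Map each location named in the step output to True or False."""
    text = gzip.decompress(stdout_gz_bytes).decode("utf-8", errors="replace")
    results = {}
    for line in text.splitlines():
        line = line.strip()
        if line.startswith(ACCESSED):
            results[line[len(ACCESSED):]] = True
        elif line.startswith(DENIED):
            results[line[len(DENIED):]] = False
    return results
```

For the step that ran with test-emr-demo1, the summary marks demo1 as accessible and demo2 and nondemo as not accessible.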

Diving deeper into the step stderr logs, we can see that the related YARN application application_1656350436159_0017 was launched with the user 6GC64F33KUW4Q2JY6LKR7UAHWETKKXYL. We can confirm this by connecting to the EMR primary instance using SSH and using the YARN CLI:

[hadoop@ip-172-31-63-203]$ yarn application -status application_1656350436159_0017
...
Application-Id : application_1656350436159_0017
Application-Name : my app
Application-Type : SPARK
User : 6GC64F33KUW4Q2JY6LKR7UAHWETKKXYL
Queue : default
Application Priority : 0
...

Please note that in your case, the YARN application ID and the user will be different.

Now we submit the same script again as a new EMR step, but this time with the role test-emr-demo2 as the runtime role:

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
#Change with your AWS Account ID
ACCOUNT_ID=123456789012
#Change with your Bucket name
BUCKET_NAME=emr-steps-roles-new-us-east-1

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Example",
            "Args": [
              "spark-submit",
              "s3://'"${BUCKET_NAME}"'/scripts/test.py"
            ]
        }]' \
--execution-role-arn arn:aws:iam::${ACCOUNT_ID}:role/test-emr-demo2

The following is the output of the EMR step with test-emr-demo2 as the runtime role:

Could not access demo1
+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
|  1|  a| 1a|
|  2|  b| 2b|
+---+---+---+

Accessed demo2
Could not access nondemo

As we can see, only the demo2 folder was accessible by our application.

Diving deeper into the step stderr logs, we can see that the related YARN application application_1656350436159_0018 was launched with a different user 7T2ORHE6Z4Q7PHLN725C2CVWILZWYOLE. We can confirm this by using the YARN CLI:

[hadoop@ip-172-31-63-203]$ yarn application -status application_1656350436159_0018
...
Application-Id : application_1656350436159_0018
Application-Name : my app
Application-Type : SPARK
User : 7T2ORHE6Z4Q7PHLN725C2CVWILZWYOLE
Queue : default
Application Priority : 0
...

Each step could access only the CSV file allowed by its runtime role: the first step could access only s3://emr-steps-roles-new-us-east-1/demo1/test.csv, and the second step could access only s3://emr-steps-roles-new-us-east-1/demo2/test.csv. In addition, we observed that Amazon EMR created a unique user for each step and used that user to run the job. Note that both roles need at least read access to the S3 location where the step scripts are located (for example, s3://emr-steps-roles-new-us-east-1/scripts/test.py).
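To confirm the per-step user isolation without comparing outputs by eye, you can parse the text printed by yarn application -status. The following sketch extracts the "Key : Value" pairs from such output; the function name is hypothetical, and the sketch assumes the layout shown in the excerpts above.

```python
def parse_yarn_status(output):
    """Extract "Key : Value" pairs from yarn application -status output."""
    fields = {}
    for line in output.splitlines():
        key, separator, value = line.partition(" : ")
        if separator:  # skip lines such as "..." that carry no pair
            fields[key.strip()] = value.strip()
    return fields


first = parse_yarn_status("""
Application-Id : application_1656350436159_0017
Application-Name : my app
User : 6GC64F33KUW4Q2JY6LKR7UAHWETKKXYL
""")
second = parse_yarn_status("""
Application-Id : application_1656350436159_0018
Application-Name : my app
User : 7T2ORHE6Z4Q7PHLN725C2CVWILZWYOLE
""")

# Two steps with different runtime roles run under different users
different_users = first["User"] != second["User"]
```

Here different_users evaluates to True for the two applications shown above, because each one ran under its own user.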

Now that we have seen how runtime roles for EMR steps work, let’s look at how we can use Lake Formation to apply fine-grained access controls with EMR steps.

Use Lake Formation-based access control with EMR steps

You can use Lake Formation to apply table- and column-level permissions with Apache Spark and Apache Hive jobs submitted as EMR steps. First, the data lake admin in Lake Formation needs to register Amazon EMR as the AuthorizedSessionTagValue to enforce Lake Formation permissions on EMR. Lake Formation uses this session tag to authorize callers and provide access to the data lake. The Amazon EMR value is referenced inside the step-runtime-roles-sec-cfg.json file we used earlier when we created the EMR security configuration, and inside the trust-policy.json file we used to create the two runtime roles test-emr-demo1 and test-emr-demo2.

We can do so on the Lake Formation console in the External data filtering section (replace 123456789012 with your AWS account ID).

On the IAM runtime roles’ trust policy, we already have the sts:TagSession permission with the condition "aws:RequestTag/LakeFormationAuthorizedCaller": "Amazon EMR". So we’re ready to proceed.

To demonstrate how Lake Formation works with EMR steps, we create one database named entities with two tables named users and products, and we assign in Lake Formation the grants summarized in the following table.

IAM Roles \ Tables | entities (DB): users (Table)       | entities (DB): products (Table)
test-emr-demo1     | Full Read Access                   | No Access
test-emr-demo2     | Read Access on Columns: uid, state | Full Read Access
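
To make the expected outcome of these grants concrete, the following is a minimal Python sketch that models the table above as plain data. It is only an illustration of the intended result, not a Lake Formation API: the GRANTS structure and the visible_columns helper are hypothetical names, and the column lists are taken from the table definitions used later in this post.

```python
# Illustrative model of the grants table; this does not call Lake Formation.
TABLE_COLUMNS = {
    "users": ["uid", "name", "surname", "state", "city", "address"],
    "products": ["product_id", "name", "category"],
}

# None means "all columns"; a table that is absent means "no access".
GRANTS = {
    "test-emr-demo1": {"users": None},
    "test-emr-demo2": {"users": ["uid", "state"], "products": None},
}


def visible_columns(role, table):
    """Return the columns a role is expected to read, or [] if it has no access."""
    role_grants = GRANTS.get(role, {})
    if table not in role_grants:
        return []
    granted = role_grants[table]
    return list(TABLE_COLUMNS[table]) if granted is None else list(granted)
```

For example, visible_columns("test-emr-demo2", "users") returns only uid and state, and visible_columns("test-emr-demo1", "products") returns an empty list.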

Prepare Amazon S3 files

We first prepare our Amazon S3 files.

  1. Create the users.csv file with the following content:
    00005678,john,pike,england,london,Hidden Road 78
    00009039,paolo,rossi,italy,milan,Via degli Alberi 56A
    00009057,july,finn,germany,berlin,Green Road 90

  2. Create the products.csv file with the following content:
    P0000789,Bike2000,Sport
    P0000567,CoverToCover,Smartphone
    P0005677,Whiteboard X786,Home

  3. Upload these files to Amazon S3 in two different locations:
    #Change this with your bucket name
    BUCKET_NAME="emr-steps-roles-new-us-east-1"
    
    aws s3 cp users.csv s3://${BUCKET_NAME}/entities-database/users/
    aws s3 cp products.csv s3://${BUCKET_NAME}/entities-database/products/

Prepare the database and tables

We can create our entities database by using the AWS Glue APIs.

  1. Create the entities-db.json file with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "DatabaseInput": {
            "Name": "entities",
            "LocationUri": "s3://emr-steps-roles-new-us-east-1/entities-database/",
            "CreateTableDefaultPermissions": []
        }
    }

  2. With a Lake Formation admin user, run the following command to create our database:
    aws glue create-database \
    --cli-input-json file://entities-db.json

    We also use the AWS Glue APIs to create the tables users and products.

  3. Create the users-table.json file with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "TableInput": {
            "Name": "users",
            "StorageDescriptor": {
                "Columns": [{
                        "Name": "uid",
                        "Type": "string"
                    },
                    {
                        "Name": "name",
                        "Type": "string"
                    },
                    {
                        "Name": "surname",
                        "Type": "string"
                    },
                    {
                        "Name": "state",
                        "Type": "string"
                    },
                    {
                        "Name": "city",
                        "Type": "string"
                    },
                    {
                        "Name": "address",
                        "Type": "string"
                    }
                ],
                "Location": "s3://emr-steps-roles-new-us-east-1/entities-database/users/",
                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                "Compressed": false,
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                    "Parameters": {
                        "field.delim": ",",
                        "serialization.format": ","
                    }
                }
            },
            "TableType": "EXTERNAL_TABLE",
            "Parameters": {
                "EXTERNAL": "TRUE"
            }
        }
    }

  4. Create the products-table.json file with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "TableInput": {
            "Name": "products",
            "StorageDescriptor": {
                "Columns": [{
                        "Name": "product_id",
                        "Type": "string"
                    },
                    {
                        "Name": "name",
                        "Type": "string"
                    },
                    {
                        "Name": "category",
                        "Type": "string"
                    }
                ],
                "Location": "s3://emr-steps-roles-new-us-east-1/entities-database/products/",
                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                "Compressed": false,
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                    "Parameters": {
                        "field.delim": ",",
                        "serialization.format": ","
                    }
                }
            },
            "TableType": "EXTERNAL_TABLE",
            "Parameters": {
                "EXTERNAL": "TRUE"
            }
        }
    }

  5. With a Lake Formation admin user, create our tables with the following commands:
    aws glue create-table \
        --database-name entities \
        --cli-input-json file://users-table.json
        
    aws glue create-table \
        --database-name entities \
        --cli-input-json file://products-table.json
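
The two table definition files differ only in the table name, the column list, and the S3 location. If you prefer to generate them instead of writing them by hand, the following Python sketch shows one way to do it. The helper names build_table_input and write_table_file are our own (hypothetical), and we assume every column is a string stored under entities-database/<table name>/, as in the files above.

```python
import json


def build_table_input(name, columns, bucket):
    """Build a Glue TableInput payload for a comma-delimited external table."""
    return {
        "TableInput": {
            "Name": name,
            "StorageDescriptor": {
                # Every column is typed as string, as in the files above.
                "Columns": [{"Name": col, "Type": "string"} for col in columns],
                "Location": f"s3://{bucket}/entities-database/{name}/",
                "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
                "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
                "Compressed": False,
                "SerdeInfo": {
                    "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                    "Parameters": {"field.delim": ",", "serialization.format": ","},
                },
            },
            "TableType": "EXTERNAL_TABLE",
            "Parameters": {"EXTERNAL": "TRUE"},
        }
    }


def write_table_file(path, name, columns, bucket):
    """Write the payload to a file usable with --cli-input-json."""
    with open(path, "w") as handle:
        json.dump(build_table_input(name, columns, bucket), handle, indent=4)
```

Calling write_table_file("users-table.json", "users", ["uid", "name", "surname", "state", "city", "address"], "emr-steps-roles-new-us-east-1") should produce a file equivalent to the one in step 3.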

Set up the Lake Formation data lake locations

To access our table data in Amazon S3, Lake Formation needs read/write access to it. To achieve that, we have to register the Amazon S3 locations where our data resides and specify which IAM role Lake Formation uses to obtain credentials for each location.

Let’s create our IAM role for the data access.

  1. Create a file called trust-policy-data-access-role.json with the following content:
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "",
                "Effect": "Allow",
                "Principal": {
                    "Service": "lakeformation.amazonaws.com"
                },
                "Action": "sts:AssumeRole"
            }
        ]
    }

  2. Use the policy to create the IAM role emr-demo-lf-data-access-role:
    aws iam create-role \
    --role-name emr-demo-lf-data-access-role \
    --assume-role-policy-document file://trust-policy-data-access-role.json

  3. Create the file data-access-role-policy.json with the following content (substitute emr-steps-roles-new-us-east-1 with your bucket name):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:*"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/entities-database",
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1/entities-database/*"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::emr-steps-roles-new-us-east-1"
                ]
            }
        ]
    }

  4. Create our IAM policy:
    aws iam create-policy \
    --policy-name data-access-role-policy \
    --policy-document file://data-access-role-policy.json

  5. Assign to our emr-demo-lf-data-access-role the created policy (replace 123456789012 with your AWS account ID):
    aws iam attach-role-policy \
    --role-name emr-demo-lf-data-access-role \
    --policy-arn "arn:aws:iam::123456789012:policy/data-access-role-policy"

    We can now register our data location in Lake Formation.

  6. On the Lake Formation console, choose Data lake locations in the navigation pane.
  7. Here we can register our S3 location containing data for our two tables and choose the created emr-demo-lf-data-access-role IAM role, which has read/write access to that location.

For more details about adding an Amazon S3 location to your data lake and configuring your IAM data access roles, refer to Adding an Amazon S3 location to your data lake.
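
The console registration in steps 6 and 7 can also be scripted. The following Python sketch only builds the request parameters; we assume you pass them to the register_resource operation of a boto3 Lake Formation client while signed in as a Lake Formation admin. The helper name build_register_request is hypothetical, and the defaults mirror the prefix and role name used in this walkthrough.

```python
def build_register_request(bucket, account_id,
                           prefix="entities-database",
                           role_name="emr-demo-lf-data-access-role"):
    """Build keyword arguments for Lake Formation's register_resource call."""
    return {
        # The S3 prefix that holds the data for both tables.
        "ResourceArn": f"arn:aws:s3:::{bucket}/{prefix}",
        # Use our own data access role rather than the service-linked role.
        "UseServiceLinkedRole": False,
        "RoleArn": f"arn:aws:iam::{account_id}:role/{role_name}",
    }
```

With boto3 installed and credentials configured, the call would look like boto3.client("lakeformation").register_resource(**build_register_request("emr-steps-roles-new-us-east-1", "123456789012")).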

Enforce Lake Formation permissions

To be sure we’re using Lake Formation permissions, we should confirm that we don’t have any grants set up for the principal IAMAllowedPrincipals. The IAMAllowedPrincipals group includes any IAM users and roles that are allowed access to your Data Catalog resources by your IAM policies, and it’s used to maintain backward compatibility with AWS Glue.

To confirm Lake Formation permissions are enforced, navigate to the Lake Formation console and choose Data lake permissions in the navigation pane. Filter permissions by “Database”:“entities” and remove all the permissions given to the principal IAMAllowedPrincipals.

For more details on IAMAllowedPrincipals and backward compatibility with AWS Glue, refer to Changing the default security settings for your data lake.
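
If you want to check for leftover IAMAllowedPrincipals grants programmatically, the following Python sketch filters a list of permission entries. We assume the entries have the shape of the PrincipalResourcePermissions items returned by the Lake Formation list-permissions operation, where this group appears with the principal identifier IAM_ALLOWED_PRINCIPALS. The function name is hypothetical.

```python
IAM_ALLOWED_PRINCIPALS = "IAM_ALLOWED_PRINCIPALS"


def iam_allowed_principal_grants(entries, database):
    """Return entries granting IAMAllowedPrincipals access to a database or its tables."""
    matches = []
    for entry in entries:
        principal = entry.get("Principal", {}).get("DataLakePrincipalIdentifier")
        if principal != IAM_ALLOWED_PRINCIPALS:
            continue
        resource = entry.get("Resource", {})
        # A grant can target the database itself or a table inside it.
        database_names = {
            resource.get("Database", {}).get("Name"),
            resource.get("Table", {}).get("DatabaseName"),
            resource.get("TableWithColumns", {}).get("DatabaseName"),
        }
        if database in database_names:
            matches.append(entry)
    return matches
```

An empty result for the entities database suggests that only Lake Formation grants apply to it.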

Configure AWS Glue and Lake Formation grants for IAM runtime roles

To allow our IAM runtime roles to properly interact with Lake Formation, we should grant them the lakeformation:GetDataAccess and glue:Get* IAM permissions (the policy we create in this section also allows glue:Create* and glue:Update*).

Lake Formation permissions control access to Data Catalog resources, Amazon S3 locations, and the underlying data at those locations. IAM permissions control access to the Lake Formation and AWS Glue APIs and resources. Therefore, although you might have the Lake Formation permission to access a table in the Data Catalog (SELECT), your operation fails if you don’t have the IAM permission on the glue:Get* API.

For more details about Lake Formation access control, refer to Lake Formation access control overview.
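
The two layers can be pictured as a logical AND. The following Python sketch is a deliberately simplified model of that idea, not the actual IAM or Lake Formation evaluation logic: it ignores Deny statements, conditions, resources, and case-insensitive action matching. The function names are hypothetical.

```python
from fnmatch import fnmatchcase


def iam_allows(allowed_actions, action):
    """True if any allowed action pattern (for example, glue:Get*) matches the action."""
    return any(fnmatchcase(action, pattern) for pattern in allowed_actions)


def can_read_table(allowed_actions, lake_formation_permissions):
    """A read needs the IAM API permissions and the Lake Formation SELECT grant."""
    needed_apis = ["glue:GetTable", "lakeformation:GetDataAccess"]
    has_iam = all(iam_allows(allowed_actions, api) for api in needed_apis)
    return has_iam and "SELECT" in lake_formation_permissions
```

In this model, a role with SELECT on a table but without glue:Get* is denied, and so is a role with both IAM permissions but no SELECT grant.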

  1. Create the emr-runtime-roles-lake-formation-policy.json file with the following content:
    {
        "Version": "2012-10-17",
        "Statement": {
            "Sid": "LakeFormationManagedAccess",
            "Effect": "Allow",
            "Action": [
                "lakeformation:GetDataAccess",
                "glue:Get*",
                "glue:Create*",
                "glue:Update*"
            ],
            "Resource": "*"
        }
    }

  2. Create the related IAM policy:
    aws iam create-policy \
    --policy-name emr-runtime-roles-lake-formation-policy \
    --policy-document file://emr-runtime-roles-lake-formation-policy.json

  3. Assign this policy to both IAM runtime roles (replace 123456789012 with your AWS account ID):
    aws iam attach-role-policy \
    --role-name test-emr-demo1 \
    --policy-arn "arn:aws:iam::123456789012:policy/emr-runtime-roles-lake-formation-policy"
    
    aws iam attach-role-policy \
    --role-name test-emr-demo2 \
    --policy-arn "arn:aws:iam::123456789012:policy/emr-runtime-roles-lake-formation-policy"

Set up Lake Formation permissions

We now set up the permission in Lake Formation for the two runtime roles.

  1. Create the file users-grants-test-emr-demo1.json with the following content to grant SELECT access to all columns in the entities.users table to test-emr-demo1:
    {
        "Principal": {
            "DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/test-emr-demo1"
        },
        "Resource": {
            "Table": {
                "DatabaseName": "entities",
                "Name": "users"
            }
        },
        "Permissions": [
            "SELECT"
        ]
    }

  2. Create the file users-grants-test-emr-demo2.json with the following content to grant SELECT access to the uid and state columns in the entities.users table to test-emr-demo2:
    {
        "Principal": {
            "DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/test-emr-demo2"
        },
        "Resource": {
            "TableWithColumns": {
                "DatabaseName": "entities",
                "Name": "users",
                "ColumnNames": ["uid", "state"]
            }
        },
        "Permissions": [
            "SELECT"
        ]
    }

  3. Create the file products-grants-test-emr-demo2.json with the following content to grant SELECT access to all columns in the entities.products table to test-emr-demo2:
    {
        "Principal": {
            "DataLakePrincipalIdentifier": "arn:aws:iam::123456789012:role/test-emr-demo2"
        },
        "Resource": {
            "Table": {
                "DatabaseName": "entities",
                "Name": "products"
            }
        },
        "Permissions": [
            "SELECT"
        ]
    }

  4. Let’s set up our permissions in Lake Formation:
    aws lakeformation grant-permissions \
    --cli-input-json file://users-grants-test-emr-demo1.json
    
    aws lakeformation grant-permissions \
    --cli-input-json file://users-grants-test-emr-demo2.json
    
    aws lakeformation grant-permissions \
    --cli-input-json file://products-grants-test-emr-demo2.json

  5. Check the permissions we defined on the Lake Formation console on the Data lake permissions page by filtering by “Database”:“entities”.
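
The three grant files follow the same pattern: a Table resource for full read access and a TableWithColumns resource for column-level access. As a sketch, the following Python helper (build_grant is a hypothetical name) generates the same payloads from a role name, a table name, and an optional column list.

```python
import json


def build_grant(account_id, role, table, columns=None, database="entities"):
    """Build a grant-permissions payload giving SELECT on a table or on selected columns."""
    if columns:
        # Column-level grant: only the listed columns are readable.
        resource = {
            "TableWithColumns": {
                "DatabaseName": database,
                "Name": table,
                "ColumnNames": list(columns),
            }
        }
    else:
        # Table-level grant: all columns are readable.
        resource = {"Table": {"DatabaseName": database, "Name": table}}
    return {
        "Principal": {
            "DataLakePrincipalIdentifier": f"arn:aws:iam::{account_id}:role/{role}"
        },
        "Resource": resource,
        "Permissions": ["SELECT"],
    }


def write_grant_file(path, grant):
    """Write the payload to a file usable with --cli-input-json."""
    with open(path, "w") as handle:
        json.dump(grant, handle, indent=4)
```

For example, build_grant("123456789012", "test-emr-demo2", "users", ["uid", "state"]) reproduces the content of users-grants-test-emr-demo2.json.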

Test Lake Formation permissions with runtime roles

For our test, we use a PySpark application called test-lake-formation.py with the following content:


from pyspark.sql import SparkSession
 
spark = SparkSession.builder.appName("Pyspark - TEST IAM RBAC with LF").enableHiveSupport().getOrCreate()

try:
    print("== select * from entities.users limit 3 ==\n")
    spark.sql("select * from entities.users limit 3").show()
except Exception as e:
    print(e)

try:
    print("== select * from entities.products limit 3 ==\n")
    spark.sql("select * from entities.products limit 3").show()
except Exception as e:
    print(e)

spark.stop()

In the script, we’re trying to access the tables users and products. Let’s upload our Spark application in the same S3 bucket that we used earlier:

#Change this with your bucket name
BUCKET_NAME="emr-steps-roles-new-us-east-1"

aws s3 cp test-lake-formation.py s3://${BUCKET_NAME}/scripts/

We’re now ready to perform our test. We run the test-lake-formation.py script first using the test-emr-demo1 role and then using the test-emr-demo2 role as the runtime roles.

Let’s submit a step specifying test-emr-demo1 as the runtime role:

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
#Change with your AWS Account ID
ACCOUNT_ID=123456789012
#Change with your Bucket name
BUCKET_NAME=emr-steps-roles-new-us-east-1

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Lake Formation Example",
            "Args": [
              "spark-submit",
              "s3://'"${BUCKET_NAME}"'/scripts/test-lake-formation.py"
            ]
        }]' \
--execution-role-arn arn:aws:iam::${ACCOUNT_ID}:role/test-emr-demo1

The following is the output of the EMR step with test-emr-demo1 as the runtime role:

== select * from entities.users limit 3 ==

+--------+-----+-------+-------+------+--------------------+
|     uid| name|surname|  state|  city|             address|
+--------+-----+-------+-------+------+--------------------+
|00005678| john|   pike|england|london|      Hidden Road 78|
|00009039|paolo|  rossi|  italy| milan|Via degli Alberi 56A|
|00009057| july|   finn|germany|berlin|       Green Road 90|
+--------+-----+-------+-------+------+--------------------+

== select * from entities.products limit 3 ==

Insufficient Lake Formation permission(s) on products (...)

As we can see, our application was only able to access the users table.

Submit the same script again as a new EMR step, but this time with the role test-emr-demo2 as the runtime role:

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
#Change with your AWS Account ID
ACCOUNT_ID=123456789012
#Change with your Bucket name
BUCKET_NAME=emr-steps-roles-new-us-east-1

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Lake Formation Example",
            "Args": [
              "spark-submit",
              "s3://'"${BUCKET_NAME}"'/scripts/test-lake-formation.py"
            ]
        }]' \
--execution-role-arn arn:aws:iam::${ACCOUNT_ID}:role/test-emr-demo2

The following is the output of the EMR step with test-emr-demo2 as the runtime role:

== select * from entities.users limit 3 ==

+--------+-------+
|     uid|  state|
+--------+-------+
|00005678|england|
|00009039|  italy|
|00009057|germany|
+--------+-------+

== select * from entities.products limit 3 ==

+----------+---------------+----------+
|product_id|           name|  category|
+----------+---------------+----------+
|  P0000789|       Bike2000|     Sport|
|  P0000567|   CoverToCover|Smartphone|
|  P0005677|Whiteboard X786|      Home|
+----------+---------------+----------+

As we can see, our application was able to access a subset of columns for the users table and all the columns for the products table.

We can conclude that the permissions while accessing the Data Catalog are being enforced based on the runtime role used with the EMR step.
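
The two submissions above differ only in the runtime role. If you submit steps from Python rather than the AWS CLI, the request could be built as in the following sketch. It only assembles the parameters; we assume you pass them to the add_job_flow_steps operation of a boto3 EMR client, which accepts an ExecutionRoleArn parameter for the runtime role. The helper name build_add_steps_request is hypothetical.

```python
def build_add_steps_request(cluster_id, account_id, bucket, runtime_role,
                            script="scripts/test-lake-formation.py"):
    """Build keyword arguments for submitting one Spark step with a runtime role."""
    return {
        "JobFlowId": cluster_id,
        "Steps": [{
            "Name": "Spark Lake Formation Example",
            "ActionOnFailure": "CONTINUE",
            "HadoopJarStep": {
                "Jar": "command-runner.jar",
                "Args": ["spark-submit", f"s3://{bucket}/{script}"],
            },
        }],
        # The runtime role decides which Lake Formation grants apply to the step.
        "ExecutionRoleArn": f"arn:aws:iam::{account_id}:role/{runtime_role}",
    }
```

Building the request once per role, for example for test-emr-demo1 and test-emr-demo2, keeps the step definition identical so that any difference in the output comes from the role alone.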

Audit using the source identity

The source identity is a mechanism to monitor and control actions taken with assumed roles. The Propagate source identity feature similarly allows you to monitor and control actions taken using runtime roles by the jobs submitted with EMR steps.

We already granted EMR_EC2_DefaultRole the "sts:SetSourceIdentity" permission on our two runtime roles. Also, both runtime roles allow EMR_EC2_DefaultRole to call SetSourceIdentity in their trust policy. So we’re ready to proceed.

We now see the Propagate source identity feature in action with a simple example.

Configure the IAM role that is assumed to submit the EMR steps

We configure the IAM role job-submitter-1, which is assumed specifying the source identity and which is used to submit the EMR steps. In this example, we allow the IAM user paul to assume this role and set the source identity. Please note you can use any IAM principal here.

  1. Create a file called trust-policy-2.json with the following content (replace 123456789012 with your AWS account ID):
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::123456789012:user/paul"
                },
                "Action": "sts:AssumeRole"
            },
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::123456789012:user/paul"
                },
                "Action": "sts:SetSourceIdentity"
            }
        ]
    }

  2. Use it as the trust policy to create the IAM role job-submitter-1:
    aws iam create-role \
    --role-name job-submitter-1 \
    --assume-role-policy-document file://trust-policy-2.json

    We now use the same emr-runtime-roles-submitter-policy policy we defined before to allow the role to submit EMR steps using the test-emr-demo1 and test-emr-demo2 runtime roles.

  3. Assign this policy to the IAM role job-submitter-1 (replace 123456789012 with your AWS account ID):
    aws iam attach-role-policy \
    --role-name job-submitter-1 \
    --policy-arn "arn:aws:iam::123456789012:policy/emr-runtime-roles-submitter-policy"

Test the source identity with AWS CloudTrail

To show how propagation of source identity works with Amazon EMR, we generate a role session with the source identity test-ad-user.

With the IAM user paul (or with the IAM principal you configured), we first perform the impersonation (replace 123456789012 with your AWS account ID):

aws sts assume-role \
--role-arn arn:aws:iam::123456789012:role/job-submitter-1 \
--role-session-name demotest \
--source-identity test-ad-user

The following code is the output received:

{
"Credentials": {
    "SecretAccessKey": "<SECRET_ACCESS_KEY>",
    "SessionToken": "<SESSION_TOKEN>",
    "Expiration": "<EXPIRATION_TIME>",
    "AccessKeyId": "<ACCESS_KEY_ID>"
},
"AssumedRoleUser": {
    "AssumedRoleId": "AROAUVT2HQ3......:demotest",
    "Arn": "arn:aws:sts::123456789012:assumed-role/test-emr-role/demotest"
},
"SourceIdentity": "test-ad-user"
}

We use the temporary AWS security credentials of the role session to submit an EMR step along with the runtime role test-emr-demo1:

export AWS_ACCESS_KEY_ID="<ACCESS_KEY_ID>"
export AWS_SECRET_ACCESS_KEY="<SECRET_ACCESS_KEY>"
export AWS_SESSION_TOKEN="<SESSION_TOKEN>" 

#Change with your EMR cluster ID
CLUSTER_ID=j-XXXXXXXXXXXXX
#Change with your AWS Account ID
ACCOUNT_ID=123456789012
#Change with your Bucket name
BUCKET_NAME=emr-steps-roles-new-us-east-1

aws emr add-steps \
--cluster-id $CLUSTER_ID \
--steps '[{
            "Type": "CUSTOM_JAR",
            "ActionOnFailure": "CONTINUE",
            "Jar": "command-runner.jar",
            "Name": "Spark Lake Formation Example",
            "Args": [
              "spark-submit",
              "s3://'"${BUCKET_NAME}"'/scripts/test-lake-formation.py"
            ]
        }]' \
--execution-role-arn arn:aws:iam::${ACCOUNT_ID}:role/test-emr-demo1
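
The three export lines above take their values from the Credentials block of the assume-role output. As a sketch, the following Python helpers derive them from the JSON text instead of copying the values by hand. The function names are hypothetical, and we assume the output has the shape shown earlier.

```python
import json


def credentials_to_env(assume_role_output):
    """Map the Credentials block of the assume-role output to environment variable names."""
    credentials = json.loads(assume_role_output)["Credentials"]
    return {
        "AWS_ACCESS_KEY_ID": credentials["AccessKeyId"],
        "AWS_SECRET_ACCESS_KEY": credentials["SecretAccessKey"],
        "AWS_SESSION_TOKEN": credentials["SessionToken"],
    }


def export_lines(env):
    """Render the variables as shell export statements."""
    return [f'export {name}="{value}"' for name, value in env.items()]
```

Because the result contains secrets, treat the generated lines like the credentials themselves and avoid writing them to logs or shared files.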

In a few minutes, we can see events appearing in the AWS CloudTrail log file. We can see all the AWS APIs that the jobs invoked using the runtime role. In the following snippet, we can see that the step performed the sts:AssumeRole and lakeformation:GetDataAccess actions. It’s worth noting how the source identity test-ad-user has been preserved in the events.
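
If you export the CloudTrail records as JSON, you can pick out the calls made on behalf of a given source identity with a small filter such as the following Python sketch. It relies on an assumption about the record layout: that the source identity appears under userIdentity.sessionContext.sourceIdentity, or under requestParameters.sourceIdentity for the AssumeRole call itself. Verify this against your own events. The function name is hypothetical.

```python
def events_for_source_identity(events, source_identity):
    """Return (eventSource, eventName) pairs for records carrying the source identity."""
    found = []
    for event in events:
        session = (event.get("userIdentity") or {}).get("sessionContext") or {}
        requested = event.get("requestParameters") or {}
        candidates = (session.get("sourceIdentity"), requested.get("sourceIdentity"))
        if source_identity in candidates:
            found.append((event.get("eventSource"), event.get("eventName")))
    return found
```

For the test in this section, filtering on test-ad-user should surface the sts:AssumeRole and lakeformation:GetDataAccess calls mentioned above.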

Clean up

You can now delete the EMR cluster you created.

  1. On the Amazon EMR console, choose Clusters in the navigation pane.
  2. Select the cluster iam-passthrough-cluster, then choose Terminate.
  3. Choose Terminate again to confirm.

Alternatively, you can terminate the cluster by using the AWS CLI with the following command (replace the EMR cluster ID with the one returned by the previously run aws emr create-cluster command):

aws emr terminate-clusters --cluster-ids j-3KVXXXXXXX7UG

Conclusion

In this post, we discussed how you can control data access on Amazon EMR on EC2 clusters by using runtime roles with EMR steps. We discussed how the feature works, how you can use Lake Formation to apply fine-grained access controls, and how to monitor and control actions using a source identity. To learn more about this feature, refer to Configure runtime roles for Amazon EMR steps.


About the authors

Stefano Sandona is an Analytics Specialist Solution Architect with AWS. He loves data, distributed systems and security. He helps customers around the world architecting their data platforms. He has a strong focus on Amazon EMR and all the security aspects around it.

Sharad Kala is a senior engineer at AWS working with the EMR team. He focuses on the security aspects of the applications running on EMR. He has a keen interest in working and learning about distributed systems.

Get started with Apache Hudi using AWS Glue by implementing key design concepts – Part 1

Post Syndicated from Amit Maindola original https://aws.amazon.com/blogs/big-data/part-1-get-started-with-apache-hudi-using-aws-glue-by-implementing-key-design-concepts/

Many organizations build data lakes on Amazon Simple Storage Service (Amazon S3) using a modern architecture for a scalable and cost-effective solution. Open-source storage formats like Parquet and Avro are commonly used, and data is stored in these formats as immutable files. As the data lake expands to additional use cases, some remain very difficult to implement, such as CDC (change data capture), time travel (querying point-in-time data), deletion of data required by privacy regulations, concurrent writes, and consistent handling of small files.

Apache Hudi is an open-source transactional data lake framework that greatly simplifies incremental data processing and streaming data ingestion. However, organizations new to data lakes may struggle to adopt Apache Hudi due to unfamiliarity with the technology and lack of internal expertise.

In this post, we show how to get started with Apache Hudi, focusing on the Hudi CoW (Copy on Write) table type on AWS using AWS Glue, and implementing key design concepts for different use cases. We expect readers to have a basic understanding of data lakes, AWS Glue, and Amazon S3. We walk you through common batch data ingestion use cases with actual test results using a TPC-DS dataset to show how the design decisions can influence the outcome.

Apache Hudi key concepts

Before diving deep into the design concepts, let’s review the key concepts of Apache Hudi, which are important to understand before you make design decisions.

Hudi table and query types

Hudi supports two table types: Copy on Write (CoW) and Merge on Read (MoR). You have to choose the table type in advance, which influences the performance of read and write operations.

The difference in performance depends on the volume of data, operations, file size, and other factors. For more information, refer to Table & Query Types.

When you use the CoW table type, committed data is implicitly compacted, meaning it’s updated to columnar file format during the write operation. With the MoR table type, data isn’t compacted with every commit. As a result, for the MoR table type, compacted data lives in columnar storage (Parquet) and deltas are stored in a log (Avro) raw format until compaction merges the changes into the columnar file format. Hudi supports snapshot, incremental, and read-optimized queries for Hudi tables, and the output of the result depends on the query type.

Indexing

Indexing is another key concept for the design. Hudi provides efficient upserts and deletes with fast indexing for both CoW and MoR tables. For CoW tables, indexing enables fast upsert and delete operations by avoiding the need to join against the entire dataset to determine which files to rewrite. For MoR, this design allows Hudi to bound the number of records any given base file needs to be merged against. Specifically, a given base file needs to be merged only against updates for records that are part of that base file. In contrast, designs without an indexing component could end up having to merge all the base files against all incoming update and delete records.
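
To illustrate why an index limits the work, the following Python sketch is a toy model, not Hudi’s implementation: it treats the index as a plain dictionary from record key to file ID and groups incoming records by the base file that already contains their key. All names are hypothetical.

```python
from collections import defaultdict


def route_updates(index, updates):
    """Group incoming records by the base file that already holds their key.

    `index` maps record key -> file ID. Records whose key is not indexed are
    returned separately as new inserts.
    """
    by_file = defaultdict(list)
    inserts = []
    for record in updates:
        file_id = index.get(record["key"])
        if file_id is None:
            inserts.append(record)
        else:
            by_file[file_id].append(record)
    return dict(by_file), inserts
```

Only the files that appear in the first result need to be rewritten (CoW) or merged against (MoR); every other base file is left untouched.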

Solution overview

The following diagram describes the high-level architecture for our solution. We ingest the TPC-DS (store_sales) dataset from the source S3 bucket in CSV format and write it to the target S3 bucket using AWS Glue in Hudi format. We can query the Hudi tables on Amazon S3 using Amazon Athena and AWS Glue Studio Notebooks.

The following diagram illustrates the relationships between our tables.

For our post, we use the following tables from the TPC-DS dataset: one fact table, store_sales, and the dimension tables store, item, and date_dim. The following table summarizes the table row counts.

Table Approximate Row Counts
store_sales 2.8 billion
store 1,000
item 300,000
date_dim 73,000

Set up the environment

After you sign in to your test AWS account, launch the provided AWS CloudFormation template by choosing Launch Stack:

This template configures the following resources:

  • AWS Glue jobs hudi_bulk_insert, hudi_upsert_cow, and hudi_bulk_insert_dim. We use these jobs for the use cases covered in this post.
  • An S3 bucket to store the output of the AWS Glue job runs.
  • AWS Identity and Access Management (IAM) roles and policies with appropriate permissions.

Before you run the AWS Glue jobs, you need to subscribe to the AWS Glue Apache Hudi Connector (latest version: 0.10.1). The connector is available on AWS Marketplace. Follow the connector installation and activation process from the AWS Marketplace link, or refer to Process Apache Hudi, Delta Lake, Apache Iceberg datasets at scale, part 1: AWS Glue Studio Notebook to set it up.

After you create the Hudi connection, add the connector name to all the AWS Glue scripts under Advanced properties.

Bulk insert job

To run the bulk insert job, choose the job hudi_bulk_insert on the AWS Glue console.

The job parameters as shown in the following screenshot are added as part of the CloudFormation stack setup. You can use different values to create CoW partitioned tables with different bulk insert options.

The parameters are as follows:

  • HUDI_DB_NAME – The database in the AWS Glue Data Catalog where the catalog table is created.
  • HUDI_INIT_SORT_OPTION – The options for bulk_insert include GLOBAL_SORT, which is the default. Other options include NONE and PARTITION_SORT.
  • HUDI_TABLE_NAME – The table name prefix that you want to use to identify the table created. In the code, we append the sort option to the name you specify in this parameter.
  • OUTPUT_BUCKET – The S3 bucket created through the CloudFormation stack where the Hudi table datasets are written. The bucket name format is <account number><bucket name>. The bucket name is the one given while creating the CloudFormation stack.
  • CATEGORY_ID – The default for this parameter is ALL, which processes all categories of test data in a single AWS Glue job. To test parallel writes to the same table, change the parameter value to one of the categories 3, 5, or 8 in the dataset, using a different category for each parallel AWS Glue job.

Upsert job for the CoW table

To run the upsert job, choose the job hudi_upsert_cow on the AWS Glue console.

The following job parameters are added as part of the CloudFormation stack setup. You can run upsert and delete operations on CoW partitioned tables with different bulk insert options based on the values provided for these parameters.

  • OUTPUT-BUCKET – The same value as the previous job parameter.
  • HUDI_TABLE_NAME – The name of the table created in your AWS Glue Data Catalog.
  • HUDI_DB_NAME – The same value as the previous job parameter. The default value is Default.

Bulk insert job for the Dimension tables

To test the queries on the CoW tables, the fact table that is created using the bulk insert operation needs supplemental dimensional tables. This AWS Glue job has to be run before you can test the TPC queries provided later in this post. To run this job, choose hudi_bulk_insert_dim on the AWS Glue console and use the parameters shown in the following screenshot.

The parameters are as follows:

  • OUTPUT-BUCKET – The same value as the previous job parameter.
  • HUDI_INIT_SORT_OPTION – The options for bulk_insert include GLOBAL_SORT, which is the default. Other available options are NONE and PARTITION_SORT.
  • HUDI_DB_NAME – The Hudi database name. The default value is Default.

Hudi design considerations

In this section, we walk you through a few use cases to demonstrate the difference in the outcome for different settings and operations.

Data migration use case

In Apache Hudi, you ingest the data into CoW or MoR table types using either insert, upsert, or bulk insert operations. Data migration initiatives often involve one-time initial loads into the target datastore, and we recommend using the bulk insert operation for initial loads.

The bulk insert option provides the same semantics as insert, while implementing a sort-based data writing algorithm, which can scale very well for several hundred TBs of initial load. However, bulk insert only does a best-effort job at sizing files, rather than guaranteeing file sizes like inserts and upserts do. Also, the primary keys aren’t sorted during a regular insert, so we don’t advise using insert for the initial data load. By default, a Bloom index is created for the table, which enables faster lookups for upsert and delete operations.

Bulk insert has the following three sort options, which have different outcomes.

  • GLOBAL_SORT – Sorts the record key for the entire dataset before writing.
  • PARTITION_SORT – Applies only to partitioned tables. In this option, the record key is sorted within each partition, and the insert time is faster than the default sort.
  • NONE – Doesn’t sort data before writing.
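The three sort options can be expressed as Hudi write options. The following is a minimal sketch, not the contents of the hudi_bulk_insert script: the helper name and the column names are placeholders we made up for illustration, and we assume the sort mode is passed through the Hudi setting hoodie.bulkinsert.sort.mode.

```python
# Sketch: Hudi write options for an initial bulk insert with a chosen sort mode.
VALID_SORT_MODES = {"GLOBAL_SORT", "PARTITION_SORT", "NONE"}


def bulk_insert_options(table_name, record_key, partition_field,
                        precombine_field, sort_mode="GLOBAL_SORT"):
    """Return Hudi datasource options for a CoW bulk insert (hypothetical helper)."""
    if sort_mode not in VALID_SORT_MODES:
        raise ValueError(f"Unknown sort mode: {sort_mode}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": "COPY_ON_WRITE",
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
        "hoodie.bulkinsert.sort.mode": sort_mode,
    }


# The column names below are placeholders, not the ones used in the post's job.
opts = bulk_insert_options("store_sales", "record_id", "year_month",
                           "updated_at", sort_mode="PARTITION_SORT")

# With a Spark DataFrame `df`, the options would typically be applied like this:
# df.write.format("hudi").options(**opts).mode("overwrite").save("s3://<bucket>/<prefix>/")
```

Keeping the sort mode as a single argument makes it easy to rerun the same load with each option and compare the results.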

For testing the bulk insert with the three sort options, we use the following AWS Glue job configuration, which is part of the script hudi_bulk_insert:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 200
  • Input file: TPC-DS/2.13/1TB/store_sales
  • Input file format: CSV (TPC-DS)
  • Number of input files: 1,431
  • Number of rows in the input dataset: Approximately 2.8 billion

The following charts illustrate the behavior of the bulk insert operations with GLOBAL_SORT, PARTITION_SORT, and NONE as sort options for a CoW table. The statistics in the charts are created by using an average of 10 bulk insert operation runs for each sort option.

Because bulk insert does a best-effort job to pack the data in files, you see a different number of files created with different sort options.

We can observe the following:

  • Bulk insert with GLOBAL_SORT creates the fewest files, because Hudi tries to create optimally sized files. However, it takes the most time.
  • Bulk insert with NONE as the sort option has the fastest write time, but results in a greater number of files.
  • Bulk insert with PARTITION_SORT also has a faster write time compared to GLOBAL_SORT, but also results in a greater number of files.

Based on these results, although GLOBAL_SORT takes more time to ingest the data, it creates a smaller number of files, which has better upsert and read performance.

The following diagrams illustrate the Spark run plans for the bulk_insert operation using various sort options.

The first shows the Spark run plan for bulk_insert when the sort option is PARTITION_SORT.

The next is the Spark run plan for bulk_insert when the sort option is NONE.

The last is the Spark run plan for bulk_insert when the sort option is GLOBAL_SORT.

The Spark run plan for bulk_insert with GLOBAL_SORT involves shuffling of data to create optimal sized files. For the other two sort options, data shuffling isn’t involved. As a result, bulk_insert with GLOBAL_SORT takes more time compared to the other sort options.

To test the bulk insert with various bulk insert sort data options on a partitioned table, modify the Hudi AWS Glue job (hudi_bulk_insert) parameter --HUDI_INIT_SORT_OPTION.

We change the parameter --HUDI_INIT_SORT_OPTION to PARTITION_SORT or NONE to test the bulk insert with different data sort options. You need to run the job hudi_bulk_insert_dim, which loads the rest of the tables needed to test the SQL queries.

Now, look at the query performance difference between these three options. For query runtime, we ran two TPC-DS queries (q52.sql and q53.sql, as shown in the following query snippets) in an interactive session in an AWS Glue Studio notebook, using the following notebook configuration to compare the results.

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 50

Before executing the following queries, replace the table names in the queries with the tables you generate in your account.

q52

SELECT
  dt.d_year,
  item.i_brand_id brand_id,
  item.i_brand brand,
  sum(ss_ext_sales_price) ext_price
FROM date_dim dt, store_sales, item
WHERE dt.d_date_sk = store_sales.ss_sold_date_sk
  AND store_sales.ss_item_sk = item.i_item_sk
  AND item.i_manager_id = 1
  AND dt.d_moy = 11
  AND dt.d_year = 2000
GROUP BY dt.d_year, item.i_brand, item.i_brand_id
ORDER BY dt.d_year, ext_price DESC, brand_id
LIMIT 100

q53

SELECT *
FROM
  (SELECT
    i_manufact_id,
    sum(ss_sales_price) sum_sales,
    avg(sum(ss_sales_price))
    OVER (PARTITION BY i_manufact_id) avg_quarterly_sales
  FROM item, store_sales, date_dim, store
  WHERE ss_item_sk = i_item_sk AND
    ss_sold_date_sk = d_date_sk AND
    ss_store_sk = s_store_sk AND
    d_month_seq IN (1200, 1200 + 1, 1200 + 2, 1200 + 3, 1200 + 4, 1200 + 5, 1200 + 6,
                          1200 + 7, 1200 + 8, 1200 + 9, 1200 + 10, 1200 + 11) AND
    ((i_category IN ('Books', 'Children', 'Electronics') AND

As you can see in the following chart, the GLOBAL_SORT table outperforms NONE and PARTITION_SORT because the bulk insert operation created fewer files.

Ongoing replication use case

For ongoing replication, updates and deletes usually come from transactional databases. As you saw in the previous section, the bulk operation with GLOBAL_SORT took the most time and the operation with NONE took the least time. When you anticipate a higher volume of updates and deletes on an ongoing basis, the sort option is critical for your write performance.

To illustrate the ongoing replication using Apache Hudi upsert and delete operations, we tested using the following configuration:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 100

To test the upsert and delete operations, we use the store_sales CoW table, which was created using the bulk insert operation in the previous section with all three sort options. We make the following changes:

  • Insert data into a new partition (month 1 and year 2004) using the existing data from month 1 of year 2002 with a new primary key; total of 32,164,890 records
  • Update the ss_list_price column by $1 for the existing partition (month 1 and year 2003); total of 5,997,571 records
  • Delete month 5 data for year 2001; total of 26,997,957 records
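The changes above map onto Hudi write operations. As a rough sketch (not the hudi_upsert_cow script itself), the write options for an upsert and a delete differ only in the operation value. The helper name and column names are hypothetical.

```python
# Sketch: Hudi write options for the ongoing-replication operations.
SUPPORTED_OPERATIONS = {"upsert", "delete"}


def replication_options(table_name, record_key, partition_field,
                        precombine_field, operation):
    """Return Hudi datasource options for an upsert or delete (hypothetical helper)."""
    if operation not in SUPPORTED_OPERATIONS:
        raise ValueError(f"Unsupported operation: {operation}")
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": operation,
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.precombine.field": precombine_field,
    }


# Placeholder column names; replace them with the keys of your own table.
upsert_opts = replication_options("store_sales", "record_id", "year_month",
                                  "updated_at", "upsert")
delete_opts = replication_options("store_sales", "record_id", "year_month",
                                  "updated_at", "delete")

# Changes to an existing table are typically written in append mode, for example:
# changed_df.write.format("hudi").options(**upsert_opts).mode("append").save(table_path)
```

For a delete, the DataFrame only needs to identify the records to remove; Hudi looks them up through the table index.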

The following chart illustrates the runtimes for the upsert operation for the CoW table with different sort options used during the bulk insert.

As you can see from the test run, the runtime of the upsert is higher for NONE and PARTITION_SORT CoW tables. The Bloom index, which is created by default during the bulk insert operation, enables faster lookup for upsert and delete operations.

To test the upsert and delete operations on a CoW table for tables with different data sort options, modify the AWS Glue job (hudi_upsert_cow) parameter HUDI_TABLE_NAME to the desired table, as shown in the following screenshot.

For workloads where updates are performed on the most recent partitions, a Bloom index works fine. For workloads where the update volume is lower but the updates are spread across partitions, a simple index is more efficient. You can specify the index type while creating the Hudi table by using the parameter hoodie.index.type. Both the Bloom index and simple index enforce uniqueness of table keys within a partition. If you need uniqueness of keys for the entire table, you must create a global Bloom index or global simple index based on the update workloads.
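The guidance above can be written down as a small decision helper. This is only a sketch of the rule of thumb in this section: the function name and its two flags are our own invention, and the returned strings are the hoodie.index.type values for the four index types mentioned.

```python
# Sketch: pick a value for hoodie.index.type from two workload traits.
def choose_index_type(updates_hit_recent_partitions, unique_across_table):
    """Return BLOOM, SIMPLE, GLOBAL_BLOOM, or GLOBAL_SIMPLE (hypothetical helper)."""
    # Bloom suits updates concentrated in recent partitions;
    # simple suits lower-volume updates spread across partitions.
    base = "BLOOM" if updates_hit_recent_partitions else "SIMPLE"
    # Global variants enforce key uniqueness across the whole table.
    return f"GLOBAL_{base}" if unique_across_table else base


index_options = {"hoodie.index.type": choose_index_type(
    updates_hit_recent_partitions=False, unique_across_table=True)}
```

The resulting dictionary can be merged into the other Hudi write options when the table is created.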

Multi-tenant partitioned design use case

In this section, we cover Hudi optimistic concurrency using a multi-tenant table design, where each tenant’s data is stored in a separate table partition. In a real-world scenario, you may encounter a business need to process different tenant data simultaneously, such as a strict SLA to make the data available for downstream consumption as quickly as possible. Without Hudi optimistic concurrency, you can’t have concurrent writes to the same Hudi table. In such a scenario, you can speed up the data writes using Hudi optimistic concurrency when each job operates on a different table dataset. In our multi-tenant table design using Hudi optimistic concurrency, you can run concurrent jobs, where each job writes data to a separate table partition.

For AWS Glue, you can implement Hudi optimistic concurrency using an Amazon DynamoDB lock provider, which was introduced with Apache Hudi 0.10.0. The initial bulk insert script has all the configurations needed to allow multiple writes. The role being used for AWS Glue needs to have DynamoDB permissions added to make it work. For more information about concurrency control and alternatives for lock providers, refer to Concurrency Control.
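To make the lock provider setup concrete, the following sketch collects the required parameters from the appendix of this post into one options dictionary. The helper name and the lock table name are placeholders, and the endpoint is derived from the Region purely for illustration.

```python
# Sketch: Hudi options for optimistic concurrency with a DynamoDB lock provider.
def dynamodb_lock_options(lock_table, hudi_table, region="us-east-1"):
    """Return the multi-writer options listed in the appendix (hypothetical helper)."""
    return {
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.cleaner.policy.failed.writes": "LAZY",
        "hoodie.write.lock.provider":
            "org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider",
        "hoodie.write.lock.dynamodb.table": lock_table,
        # The partition key must uniquely identify the Hudi table.
        "hoodie.write.lock.dynamodb.partition_key": hudi_table,
        "hoodie.write.lock.dynamodb.region": region,
        "hoodie.write.lock.dynamodb.billing_mode": "PAY_PER_REQUEST",
        "hoodie.write.lock.dynamodb.endpoint_url": f"dynamodb.{region}.amazonaws.com",
    }


# "hudi-lock-table" is a placeholder name for the DynamoDB locks table.
lock_opts = dynamodb_lock_options("hudi-lock-table", "store_sales")
```

These options are merged with the regular write options of every job that writes to the same Hudi table, so all writers agree on the same lock.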

To simulate concurrent writes, we presume your tenant is based on the category field from the TPC-DS test dataset and accordingly partitioned based on the category ID field (i_category_id). Let’s modify the script hudi_bulk_insert to run an initial load for different categories. You need to configure your AWS Glue job to run concurrently based on the Maximum concurrency parameter, located under the advanced properties. We describe the Hudi configuration parameters that are needed in the appendix at the end of this post.

The TPC-DS dataset includes data from years 1998–2003. We use i_category_id as the tenant ID. The following screenshot shows the distribution of data for multiple tenants (i_category_id). In our testing, we load the data for i_category_id values 3, 5, and 8.

The AWS Glue job hudi_bulk_insert is designed to insert data into specific partitions based on the parameter CATEGORY_ID. If you haven’t already run the bulk insert job for the dimension tables, run the job hudi_bulk_insert_dim, which loads the rest of the tables needed to test the SQL queries.

Now we run three concurrent jobs, each with respective values 3, 5, and 8 to simulate concurrent writes for multiple tenants. The following screenshot illustrates the AWS Glue job parameter to modify for CATEGORY_ID.
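If you prefer to start the three runs programmatically rather than from the console, a sketch along the following lines could work. It assumes the job reads the tenant from a --CATEGORY_ID job argument, as described above, and that the job’s Maximum concurrency setting allows three simultaneous runs. The helper names are hypothetical.

```python
# Sketch: build one StartJobRun request per tenant, then submit them with boto3.
def job_run_requests(job_name, category_ids):
    """Return keyword arguments for glue.start_job_run, one per category."""
    return [
        {"JobName": job_name, "Arguments": {"--CATEGORY_ID": str(category_id)}}
        for category_id in category_ids
    ]


def start_all(run_requests):
    """Submit every request and return the job run IDs (requires AWS credentials)."""
    import boto3  # imported lazily so the request builder works without AWS access

    glue = boto3.client("glue")
    return [glue.start_job_run(**request)["JobRunId"] for request in run_requests]


run_requests = job_run_requests("hudi_bulk_insert", [3, 5, 8])
# run_ids = start_all(run_requests)  # uncomment to actually start the jobs
```

Building the requests separately from submitting them lets you review the arguments before any job starts.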

We used the following AWS Glue job configuration for each of the three parallel AWS Glue jobs:

  • AWS Glue version: 3.0
  • AWS Glue worker type: G.1X
  • Number of AWS Glue workers: 100
  • Input file: TPC-DS/2.13/1TB/store_sales
  • Input file format: CSV (TPC-DS)

The following screenshot shows all three concurrent jobs started around the same time for three categories, which loaded 867 million rows (50.1 GB of data) into the store_sales table. We used the GLOBAL_SORT option for all three concurrent AWS Glue jobs.

The following screenshot shows the data from the Hudi table where all three concurrent writers inserted data into different partitions, which is illustrated by different colors. All the AWS Glue jobs were run in US Central Time zone (UTC -5). The _hoodie_commit_time is in UTC.

The first two results highlighted in blue correspond to the AWS Glue job CATEGORY_ID = 3, which had the start time of 09/27/2022 21:23:39 US CST (09/28/2022 02:23:39 UTC).

The next two results highlighted in green correspond to the AWS Glue job CATEGORY_ID = 8, which had the start time of 09/27/2022 21:23:50 US CST (09/28/2022 02:23:50 UTC).

The last two results highlighted in green correspond to the AWS Glue job CATEGORY_ID = 5, which had the start time of 09/27/2022 21:23:44 US CST (09/28/2022 02:23:44 UTC).

The sample data from the Hudi table has _hoodie_commit_time values corresponding to the AWS Glue job run times.
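To check the correspondence yourself, you can convert the local job start times to UTC. The following sketch uses the fixed UTC-5 offset stated above and the three start times from this section; the function name is our own.

```python
from datetime import datetime, timedelta, timezone

# The jobs ran at UTC-5, as stated in the post.
LOCAL_TZ = timezone(timedelta(hours=-5))
TIME_FORMAT = "%m/%d/%Y %H:%M:%S"


def to_utc(local_time):
    """Convert a local 'MM/DD/YYYY HH:MM:SS' string to the same format in UTC."""
    local = datetime.strptime(local_time, TIME_FORMAT).replace(tzinfo=LOCAL_TZ)
    return local.astimezone(timezone.utc).strftime(TIME_FORMAT + " UTC")


# Job start times keyed by CATEGORY_ID, copied from the text above.
local_starts = {3: "09/27/2022 21:23:39", 8: "09/27/2022 21:23:50", 5: "09/27/2022 21:23:44"}
utc_starts = {category: to_utc(start) for category, start in local_starts.items()}
```

The converted values match the UTC times quoted for each job, which is the time zone used by _hoodie_commit_time.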

As you can see, we were able to load data into multiple partitions of the same Hudi table concurrently using Hudi optimistic concurrency.

Key findings

As the results show, bulk_insert with GLOBAL_SORT scales well for loading TBs of data in the initial load process. This option is recommended for use cases that require frequent changes after a large migration. Also, when query performance is critical in your use case, we recommend the GLOBAL_SORT option because of the smaller number of files being created with this option.

PARTITION_SORT has better performance for data load compared to GLOBAL_SORT, but it generates a significantly larger number of files, which negatively impacts query performance. You can use this option when the query involves a lot of joins between partitioned tables on record key columns.

The NONE option doesn’t sort the data, but it’s useful when you need the fastest initial load time and require minimal updates, with the added capability of supporting record changes.

Clean up

When you’re done with this exercise, complete the following steps to delete your resources and stop incurring costs:

  1. On the Amazon S3 console, empty the buckets created by the CloudFormation stack.
  2. On the CloudFormation console, select your stack and choose Delete.

This cleans up all the resources created by the stack.

Conclusion

In this post, we covered some of the Hudi concepts that are important for design decisions. We used AWS Glue and the TPC-DS dataset to collect the results of different use cases for comparison. You can learn from the use cases covered in this post to make the key design decisions, particularly when you’re at the early stage of Apache Hudi adoption. You can go through the steps in this post to start a proof of concept using AWS Glue and Apache Hudi.

Appendix

The following table summarizes the Hudi configuration parameters that are needed.

| Configuration | Value | Description | Required |
| --- | --- | --- | --- |
| hoodie.write.concurrency.mode | optimistic_concurrency_control | Property to turn on optimistic concurrency control. | Yes |
| hoodie.cleaner.policy.failed.writes | LAZY | Cleaning policy for failed writes. LAZY is required when optimistic concurrency control is turned on. | Yes |
| hoodie.write.lock.provider | org.apache.hudi.client.transaction.lock.DynamoDBBasedLockProvider | Lock provider implementation to use. | Yes |
| hoodie.write.lock.dynamodb.table | <String> | The DynamoDB table name to use for acquiring locks. If the table doesn’t exist, it will be created. You can use the same table across all your Hudi jobs operating on the same or different tables. | Yes |
| hoodie.write.lock.dynamodb.partition_key | <String> | The string value to be used for the locks table partition key attribute. It must be a string that uniquely identifies a Hudi table, such as the Hudi table name. | Yes: ‘tablename’ |
| hoodie.write.lock.dynamodb.region | <String> | The AWS Region in which the DynamoDB locks table exists, or must be created. | Yes (default: us-east-1) |
| hoodie.write.lock.dynamodb.billing_mode | <String> | The DynamoDB billing mode to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | Yes (default: PAY_PER_REQUEST) |
| hoodie.write.lock.dynamodb.endpoint_url | <String> | The DynamoDB URL for the Region where you’re creating the table. | Yes: dynamodb.us-east-1.amazonaws.com |
| hoodie.write.lock.dynamodb.read_capacity | <Integer> | The DynamoDB read capacity to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | No (default: 20) |
| hoodie.write.lock.dynamodb.write_capacity | <Integer> | The DynamoDB write capacity to be used for the locks table while creating. If the table already exists, then this doesn’t have an effect. | No (default: 10) |

About the Authors

Amit Maindola is a Data Architect focused on big data and analytics at Amazon Web Services. He helps customers in their digital transformation journey and enables them to build highly scalable, robust, and secure cloud-based analytical solutions on AWS to gain timely insights and make critical business decisions.

Srinivas Kandi is a Data Architect with focus on data lake and analytics at Amazon Web Services. He helps customers to deploy data analytics solutions in AWS to enable them with prescriptive and predictive analytics.

Mitesh Patel is a Principal Solutions Architect at AWS. His main area of depth is application and data modernization. He helps customers to build scalable, secure and cost effective solutions in AWS.

Build incremental crawls of data lakes with existing Glue catalog tables

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/build-incremental-crawls-of-data-lakes-with-existing-glue-catalog-tables/

AWS Glue includes crawlers, a capability that makes discovering datasets simpler by scanning data in Amazon Simple Storage Service (Amazon S3) and relational databases, extracting their schema, and automatically populating the AWS Glue Data Catalog, which keeps the metadata current. This reduces the time to insight by making newly ingested data quickly available for analysis with your preferred analytics and machine learning (ML) tools.

Previously, you could reduce crawler cost by using Amazon S3 Event Notifications to incrementally crawl changes on Data Catalog tables created by a crawler. Today, we’re extending this support to crawling and updating Data Catalog tables that are created by non-crawler methods, such as using data pipelines. This crawler feature can be useful for several use cases, such as the following:

  • You currently have a data pipeline to create AWS Glue Data Catalog tables and want to offload detection of partition information from the data pipeline to a scheduled crawler
  • You have an S3 bucket with event notifications enabled and want to continuously catalog new changes and prevent creation of new tables in case of ill-formatted files that break the partition detection
  • You have manually created Data Catalog tables and want to run incremental crawls on new file additions instead of running full crawls due to long crawl times

To accomplish incremental crawling, you can configure Amazon S3 Event Notifications to be sent to an Amazon Simple Queue Service (Amazon SQS) queue. You can then use the SQS queue as a source to identify changes and can schedule or run an AWS Glue crawler with Data Catalog tables as a target. With each run of the crawler, the SQS queue is inspected for new events. If no new events are found, the crawler stops. If events are found in the queue, the crawler inspects their respective folders, processes through built-in classifiers (for CSV, JSON, AVRO, XML, and so on), and determines the changes. The crawler then updates the Data Catalog with new information, such as newly added or deleted partitions or columns. This feature reduces the cost and time to crawl large and frequently changing Amazon S3 data.
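To illustrate the idea of using queue events to narrow down what needs to be crawled, the following sketch extracts the changed folders from a batch of SQS message bodies. It is not the crawler’s actual implementation: the function name is hypothetical, and we assume the standard S3 event notification JSON, optionally wrapped in an SNS envelope.

```python
import json
from urllib.parse import unquote_plus


def changed_folders(sqs_bodies):
    """Return the sorted S3 folders referenced by a batch of event messages."""
    folders = set()
    for body in sqs_bodies:
        payload = json.loads(body)
        # When S3 publishes to SNS and SNS forwards to SQS, the S3 event is a
        # JSON string inside the "Message" field of the SNS envelope.
        if "Message" in payload:
            payload = json.loads(payload["Message"])
        # Messages without "Records" (such as test events) are skipped.
        for record in payload.get("Records", []):
            bucket = record["s3"]["bucket"]["name"]
            # Object keys are URL-encoded in S3 event notifications.
            key = unquote_plus(record["s3"]["object"]["key"])
            prefix = key.rsplit("/", 1)[0] if "/" in key else ""
            folders.add(f"s3://{bucket}/{prefix}/" if prefix else f"s3://{bucket}/")
    return sorted(folders)


# A hand-written sample event for a hypothetical bucket named "my-bucket".
sample_event = {"Records": [{"s3": {
    "bucket": {"name": "my-bucket"},
    "object": {"key": "year%3D2020/Parking_Tags_Data_2020.000.csv"}}}]}
sample_bodies = [json.dumps({"Type": "Notification", "Message": json.dumps(sample_event)})]
folders = changed_folders(sample_bodies)
```

Only the folders returned here would need to be listed and classified, which is why an event-based crawl can be much cheaper than a full crawl.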

This post shows how to create an AWS Glue crawler that supports Amazon S3 event notification on existing Data Catalog tables using the new crawler UI and an AWS CloudFormation template.

Overview of solution

To demonstrate how the new AWS Glue crawler performs incremental updates, we use the Toronto parking tickets dataset, specifically data about parking tickets issued in the city of Toronto in 2019 and 2020. The goal is to create a manual dataset as well as its associated metadata tables in AWS Glue, followed by an event-based crawler that detects and implements changes to the manually created datasets and catalogs.

As mentioned before, instead of crawling all the subfolders on Amazon S3, we use an Amazon S3 event-based approach. This helps improve the crawl time by using Amazon S3 events to identify the changes between two crawls by listing all the files from the subfolder that triggered the event instead of listing the full Amazon S3 target. To accomplish this, we create an S3 bucket, an event-based crawler, an Amazon Simple Notification Service (Amazon SNS) topic, and an SQS queue. The following diagram illustrates our solution architecture.

Prerequisites

For this walkthrough, you should have the following prerequisites:

If the AWS account you use to follow this post uses Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1:
  2. For Stack name, enter a name for your stack.
  3. For paramBucketName, enter a name for your S3 bucket (with your account number).
  4. Choose Next.
  5. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  6. Choose Create stack.

Wait for the stack formation to finish provisioning the requisite resources. When you see the CREATE_COMPLETE status, you can proceed to the next steps.

Additionally, note down the ARN of the SQS queue to use at a later point.

Query your Data Catalog

Next, we use Amazon Athena to confirm that the manual tables have been created in the Data Catalog, as part of the CloudFormation template.

  1. On the Athena console, choose Launch query editor.
  2. For Data source, choose AwsDataCatalog.
  3. For Database, choose torontoparking.

    The tickets table should appear in the Tables section.

    Now you can query the table to see its contents.
  4. You can write your own query, or choose Preview Table on the options menu.

    This writes a simple SQL query to show us the first 10 rows.
  5. Choose Run to run the query.

As we can see in the query results, the database and table for 2019 parking ticket data have been created and partitioned.

Create the Amazon S3 event crawler

The next step is to create the crawler that detects and crawls only on incrementally updated tables.

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. For Name, enter a name.
  4. Choose Next.

    Now we need to select the data source for the crawler.
  5. Select Yes to indicate that our data is already mapped to our AWS Glue Data Catalog.
  6. Choose Add tables.
  7. For Database, choose torontoparking and for Tables, choose tickets.
  8. Select Crawl based on events.
  9. For Include SQS ARN, enter the ARN you saved from the CloudFormation stack outputs.
  10. Choose Confirm.

    You should now see the table populated under Glue tables, with the parameter set as Recrawl by event.
  11. Choose Next.
  12. For Existing IAM role, choose the IAM role created by the CloudFormation template (GlueCrawlerTableRole).
  13. Choose Next.
  14. For Frequency, choose On demand.

    You also have the option of choosing a schedule on which the crawler will run regularly.
  15. Choose Next.
  16. Review the configurations and choose Create crawler.

    Now that the crawler has been created, we add the 2020 ticketing data to our S3 bucket so that we can test our new crawler. For this step, we use the AWS Command Line Interface (AWS CLI).
  17. To add this data, use the following command:
    aws s3 cp s3://aws-bigdata-blog/artifacts/gluenewcrawlerui2/source/year=2020/Parking_Tags_Data_2020.000.csv s3://glue-table-crawler-blog-<YOURACCOUNTNUMBER>/year=2020/Parking_Tags_Data_2020.000.csv

After successful completion of this command, your S3 bucket should contain the 2020 ticketing data and your crawler is ready to run. The terminal should return the following:

copy: s3://aws-bigdata-blog/artifacts/gluenewcrawlerui2/source/year=2020/Parking_Tags_Data_2020.000.csv to s3://glue-table-crawler-blog-<YOURACCOUNTNUMBER>/year=2020/Parking_Tags_Data_2020.000.csv

Run the crawler and verify the updates

Now that the new folder has been created, we run the crawler to detect the changes in the table and partitions.

  1. Navigate to your crawler on the AWS Glue console and choose Run crawler.

    After running the crawler, you should see that it added the 2020 data to the tickets table.
  2. On the Athena console, we can ensure that the Data Catalog has been updated by adding a where year = 2020 filter to the query.
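If you want to run the same check from code, a sketch like the following builds the request for the Athena StartQueryExecution API. The helper name and the query result location are placeholders; the database and table names are the ones used in this post.

```python
# Sketch: build an Athena query request that filters the tickets table by year.
def athena_query_request(year, output_location,
                         database="torontoparking", table="tickets"):
    """Return keyword arguments for athena.start_query_execution (hypothetical helper)."""
    year = int(year)  # keeps the value numeric before it is placed in the SQL text
    return {
        "QueryString": f"SELECT * FROM {table} WHERE year = {year} LIMIT 10",
        "QueryExecutionContext": {"Database": database, "Catalog": "AwsDataCatalog"},
        "ResultConfiguration": {"OutputLocation": output_location},
    }


# The output location is a placeholder; use a bucket that you own.
request = athena_query_request(2020, "s3://<your-query-results-bucket>/athena/")

# With AWS credentials configured, the query could be submitted like this:
# import boto3
# boto3.client("athena").start_query_execution(**request)
```

If the query returns rows, the crawler has registered the year=2020 partition in the Data Catalog.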

AWS CLI option

You can also create the crawler using the AWS CLI. For more information, refer to create-crawler.
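The same request can be sketched in Python for use with boto3. Treat the field names as our understanding of the AWS Glue CreateCrawler API and verify them against the create-crawler reference before relying on them. The crawler name and queue ARN are placeholders, and the schema change policy is an assumption rather than a setting taken from this post.

```python
# Sketch: build a CreateCrawler request for an event-based crawl of catalog tables.
def event_crawler_request(name, role, database, tables, queue_arn):
    """Return keyword arguments for glue.create_crawler (hypothetical helper)."""
    return {
        "Name": name,
        "Role": role,
        "Targets": {
            "CatalogTargets": [{
                "DatabaseName": database,
                "Tables": list(tables),
                # The SQS queue that receives the S3 event notifications.
                "EventQueueArn": queue_arn,
            }]
        },
        # Crawl only what the queued events point to.
        "RecrawlPolicy": {"RecrawlBehavior": "CRAWL_EVENT_MODE"},
        # Assumption: log deletions instead of removing catalog objects.
        "SchemaChangePolicy": {
            "UpdateBehavior": "UPDATE_IN_DATABASE",
            "DeleteBehavior": "LOG",
        },
    }


crawler_request = event_crawler_request(
    name="tickets-event-crawler",  # placeholder crawler name
    role="GlueCrawlerTableRole",
    database="torontoparking",
    tables=["tickets"],
    queue_arn="arn:aws:sqs:us-east-1:111122223333:example-queue",  # placeholder ARN
)

# With AWS credentials configured, the crawler could be created like this:
# import boto3
# boto3.client("glue").create_crawler(**crawler_request)
```

The request mirrors the console choices made earlier: a Data Catalog table as the target, the SQS queue ARN, and crawling based on events.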

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the CloudFormation stack, S3 bucket, AWS Glue crawler, AWS Glue database, and AWS Glue table.

Conclusion

You can use AWS Glue crawlers to discover datasets, extract schema information, and populate the AWS Glue Data Catalog. In this post, we provided a CloudFormation template to set up AWS Glue crawlers to use Amazon S3 event notifications on existing Data Catalog tables, which reduces the time and cost needed to incrementally process table data updates in the Data Catalog.

With this feature, incremental crawling can now be offloaded from data pipelines to the scheduled AWS Glue crawler, reducing cost. This alleviates the need for full crawls, thereby reducing crawl times and Data Processing Units (DPUs) required to run the crawler. This is especially useful for customers that have S3 buckets with event notifications enabled and want to continuously catalog new changes.

To learn more about this feature, refer to Accelerating crawls using Amazon S3 event notifications.

Special thanks to everyone who contributed to this crawler feature launch: Theo Xu, Jessica Cheng, Arvin Mohanty, and Joseph Barlan.


About the authors

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he has over a decade of experience in data management, helping customers around the globe address their business and technical needs.

Aayzed Tanweer is a Solutions Architect working with startup customers in the FinTech space, with a special focus on analytics services. Originally hailing from Toronto, he recently moved to New York City, where he enjoys eating his way through the city and exploring its many peculiar nooks and crannies.

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Code versioning using AWS Glue Studio and GitHub

Post Syndicated from Leonardo Gomez original https://aws.amazon.com/blogs/big-data/code-versioning-using-aws-glue-studio-and-github/

AWS Glue now offers integration with Git, an open-source version control system widely used across the developer community. Thanks to this integration, you can incorporate your existing DevOps practices on AWS Glue jobs. AWS Glue is a serverless data integration service that helps you create jobs based on Apache Spark or Python to perform extract, transform, and load (ETL) tasks on datasets of almost any size.

Git integration in AWS Glue works for all AWS Glue job types, both visual and code-based. It offers built-in integration with both GitHub and AWS CodeCommit, and makes it easier to use automation tools like Jenkins and AWS CodeDeploy to deploy AWS Glue jobs. AWS Glue Studio’s visual editor now also supports parameterizing data sources and targets for transparent deployments between environments.

Overview of solution

To demonstrate how to integrate AWS Glue Studio with a code hosting platform for version control and collaboration, we use the Toronto parking tickets dataset, specifically the data about parking tickets issued in the city of Toronto in 2019. The goal is to create a job to filter parking tickets based on a specific category and push the code to a GitHub repo for version control. After the job is uploaded on the repository, we make some changes to the code and pull the changes back to the AWS Glue job.

Prerequisites

For this walkthrough, you should have the following prerequisites:

If the AWS account you use to follow this post uses AWS Lake Formation to manage permissions on the AWS Glue Data Catalog, make sure that you log in as a user with access to create databases and tables. For more information, refer to Implicit Lake Formation permissions.

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1:
  2. Under Parameters, for paramBucketName, enter a name for your S3 bucket (include your account number).
  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.
  5. Wait until the creation of the stack is complete, as shown on the AWS CloudFormation console.

Launching this stack creates AWS resources. You need the following resources from the Outputs tab for the next steps:

  • CFNGlueRole – The IAM role to run AWS Glue jobs
  • S3Bucket – The name of the S3 bucket to store solution-related files
  • CFNDatabaseBlog – The AWS Glue database to store the table related to this post
  • CFNTableTickets – The AWS Glue table to use as part of the sample job

Configure the GitHub repository

We use GitHub as the source control system for this post. In order to use it, you need a GitHub account. After the account is created, you need to create the following components:

  • GitHub repository – Create a repository and name it glue-ver-blog. For instructions, refer to Create a repo.
  • Branch – Create a branch and name it develop. For instructions, refer to Managing branches.
  • Personal access token – For instructions, refer to Creating a personal access token. Make sure to keep the personal access token handy because you use it in later steps.

Create an AWS Glue Studio job

Now that the infrastructure is set up, let’s author an AWS Glue job in our account. Complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Select Visual job with blank canvas and choose Create.
  3. Enter a name for the job using the title editor. For example, aws-glue-git-demo-job.
  4. On the Visual tab, choose Source and then choose AWS Glue Data Catalog.

  5. For Database, choose torontoparking and for Table, choose tickets.
  6. Choose Transform and then Filter.
  7. Add a filter by infraction_description and set the value to PARK ON PRIVATE PROPERTY.
  8. Choose Target and then choose Amazon S3.
  9. For Format, choose Parquet.
  10. For S3 Target Location, enter s3://glue-version-blog-<YOURACCOUNTNUMBER>/output/.
  11. For Data Catalog update options, select Do not update the Data Catalog.
  12. Go to the Script tab to verify that a script has been generated.
  13. Go to the Job Details tab to make sure that the role GlueBlogRole is selected and leave everything else with the default values.

    Because the catalog table names in the production and development environment may be different, AWS Glue Studio now allows you to parameterize visual jobs. To do so, perform the following steps:
  14. On the Job details tab, scroll to the Job parameters section under Advanced properties.
  15. Create the --source.database.name parameter and set the value to torontoparking.
  16. Create the --source.table.name parameter and set the value to tickets.
  17. Go to the Visual tab and choose the AWS Glue Data Catalog node. Notice that under each of the database and table selection options is a new expandable section called Use runtime parameters.
  18. The runtime parameters are auto-populated with the parameters previously created. Choose Apply to apply the default values for these parameters.
  19. Go to the Script tab to review the script. AWS Glue Studio code generation automatically picks up the parameters to resolve and then makes the appropriate references in the script so that the parameters can be used.
    Now the job is ready to be pushed into the develop branch of our version control system.
  20. On the Version Control tab, for Version control system, choose GitHub.
  21. For Personal access token, enter your GitHub token.
  22. For Repository owner, enter the owner of your GitHub account.
  23. In the Repository configuration section, for Repository, choose glue-ver-blog.
  24. For Branch, choose develop.
  25. For Folder, leave it blank.
  26. Choose Save to save the job.
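To show what resolving the job parameters created in the steps above amounts to, the following sketch parses them from a command line. Inside an AWS Glue job, scripts commonly use the getResolvedOptions helper from awsglue.utils for this purpose; the stand-in function below is our own, written so that the example runs outside of AWS Glue.

```python
# Sketch: resolve "--name value" job parameters from an argument list.
def resolve_job_parameters(argv, names):
    """Return a dict with the value of each requested parameter (hypothetical helper)."""
    resolved = {}
    # Stop one token early so that argv[i + 1] always exists.
    for i, token in enumerate(argv[:-1]):
        if token.startswith("--") and token[2:] in names:
            resolved[token[2:]] = argv[i + 1]
    missing = [name for name in names if name not in resolved]
    if missing:
        raise KeyError(f"Missing job parameters: {missing}")
    return resolved


# Simulated command line with the two parameters created in the job details.
argv = ["script.py",
        "--source.database.name", "torontoparking",
        "--source.table.name", "tickets"]
params = resolve_job_parameters(argv, ["source.database.name", "source.table.name"])
```

Because the database and table names arrive as parameters, the same job definition can point at different catalog tables in development and production.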

Push to the repository

Now the job can be pushed to the remote repository.

  1. On the Actions menu, choose Push to repository.
  2. Choose Confirm to confirm the operation.

    After the operation succeeds, the page reloads to reflect the latest information from the version control system. A notification shows the latest available commit and links you to the commit on GitHub.
  3. Choose the commit link to go to the repository on GitHub.

You have successfully created your first commit to GitHub from AWS Glue Studio!

Pull from the repository

Now that we have committed the AWS Glue job to GitHub, it’s time to see how we can pull changes using AWS Glue Studio. For this demo, we make a small modification in our example job using the GitHub UI and then pull the changes using AWS Glue Studio.

  1. On GitHub, choose the develop branch.
  2. Choose the aws-glue-git-demo-job folder.
  3. Choose the aws-glue-git-demo-job.json file.
  4. Choose the edit icon.
  5. Set the MaxRetries parameter to 1.
  6. Choose Commit changes.
  7. Return to the AWS Glue console and on the Actions menu, choose Pull from repository.
  8. Choose Confirm.

Notice that the commit ID has changed.

On the Job details tab, you can see that the value for Number of retries is 1.
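
The edit made through the GitHub UI amounts to changing one value in the job definition file. As a rough sketch, the same change could be scripted as follows. The JSON shown here is a hypothetical, heavily trimmed stand-in for aws-glue-git-demo-job.json; only the MaxRetries key comes from the steps above.

```python
import json

# Hypothetical, trimmed job definition; the real file contains many more fields.
job_json = '{"name": "aws-glue-git-demo-job", "MaxRetries": 0}'


def set_max_retries(text, retries):
    """Parse a job definition, set MaxRetries, and return the updated JSON text."""
    job = json.loads(text)
    job["MaxRetries"] = retries
    return json.dumps(job, indent=2)


updated = set_max_retries(job_json, 1)
print(updated)
```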

Clean up

To avoid incurring future charges, and to clean up unused roles and policies, delete the resources you created: the datasets, CloudFormation stack, S3 bucket, AWS Glue job, AWS Glue database, and AWS Glue table.

Conclusion

This post showed how to integrate AWS Glue with GitHub, but this is only the beginning—now you can use the most popular functionalities offered by Git.

To learn more and get started using the AWS Glue Studio Git integration, refer to Configuring Git integration in AWS Glue.


About the authors

Leonardo Gómez is a Senior Analytics Specialist Solutions Architect at AWS. Based in Toronto, Canada, he has over a decade of experience in data management, helping customers around the globe address their business and technical needs.

Daiyan Alamgir is a Principal Frontend Engineer on AWS Glue based in New York. He leads the AWS Glue UI team and is focused on building interactive web-based applications for data analysts and engineers to address their data integration use cases.

Upgrade to Athena engine version 3 to increase query performance and access more analytics features

Post Syndicated from Blayze Stefaniak original https://aws.amazon.com/blogs/big-data/upgrade-to-athena-engine-version-3-to-increase-query-performance-and-access-more-analytics-features/

Customers tell us they want to have stronger performance and lower costs for their data analytics applications and workloads. Customers also want to use AWS as a platform that hosts managed versions of their favorite open-source projects, which will frequently adopt the latest features from the open-source communities. With Amazon Athena engine version 3, we continue to increase performance, provide new features and now deliver better currency with the Trino and Presto projects.

Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run. Customers such as Orca Security, the Agentless Cloud Security Platform, are already realizing the benefits of using Athena engine version 3 with Apache Iceberg.

“At Orca Security, we are excited about the launch of Athena engine version 3,” says Arie Teter, VP R&D at Orca Security. “With Athena engine version 3, we will be able to query our massive petabyte-scale data lake more efficiently and at a lower cost. We are especially excited about being able to leverage all the latest Trino features with Athena’s new engine in order to deliver our customers the best-of-breed, ML-driven anomaly detection solution.”

In this post, we discuss benefits of Athena engine version 3, performance benchmark results for different table formats and information about upgrading to engine version 3.

New features, more often

One of the most exciting aspects of engine version 3 is its new continuous integration approach to open source software management that will improve currency with the Trino and PrestoDB projects. This approach enables Athena to deliver increased performance and new features at an even faster pace.

At AWS, we are committed to bringing the value of open source to our customers and providing contributions to open source communities. The Athena development team is actively contributing bug fixes and security, scalability, performance, and feature enhancements back to these open-source code bases, so anyone using Trino, PrestoDB and Apache Iceberg can benefit from the team’s contributions. For more information on AWS’s commitment to the open-source community, refer to Open source at AWS.

Athena engine version 3 incorporates over 50 new SQL functions, and 30 new features from the open-source Trino project. For example, Athena engine version 3 supports T-Digest functions that can be used to approximate rank-based statistics with high accuracy, new Geospatial functions to run optimized Geospatial queries, and new query syntaxes such as MATCH_RECOGNIZE for identifying data patterns in applications such as fraud detection and sensor data analysis.

Athena engine version 3 also gives you more AWS-specific features. For example, we have worked closely with the AWS Glue Data Catalog team to improve Athena’s metadata retrieval time, which we explain in the section “Faster query planning with AWS Glue Data Catalog” below.

For more information about what’s new in Athena engine version 3, refer to the Athena engine version 3 Improvements and new features.

Faster runtime, lower cost

Last year, we shared benchmark testing on Athena engine version 2 using TPC-DS benchmark queries at 3 TB scale and observed that query performance improved by three times and cost decreased by 70% as a result of reduced scanned data. These improvements have been a combination of enhancements developed by Athena and AWS engineering teams as well as contributions from the PrestoDB and Trino open-source communities.

The new engine version 3 will allow Athena to continue delivering performance improvements at a rapid pace. We performed benchmark testing on engine version 3 using TPC-DS benchmark queries at 3 TB scale, and observed a 20% query performance improvement when compared to the latest release of engine version 2. Athena engine version 3 includes performance improvements across operators, clauses, and decoders, such as joins involving comparisons with the <, <=, >, and >= operators; queries that contain JOIN, UNION, UNNEST, or GROUP BY clauses; and queries using an IN predicate with a short list of constants. Athena engine version 3 also provides query execution improvements that reduce the amount of data scanned, which gives you additional performance gains. With Athena, you are charged based on the amount of data scanned by each query, so this also translates to lower costs. For more information, refer to Amazon Athena pricing.

Faster query planning with AWS Glue Data Catalog

Athena engine version 3 provides better integration with the AWS Glue Data Catalog to improve query planning performance by up to ten times. Query planning is the process of listing the instructions the query engine will follow in order to run a query. During query planning, Athena uses the AWS Glue API to retrieve information such as table and partition metadata and column statistics. As the number of tables increases, the number of calls to the AWS Glue API for metadata also increases, which results in additional query latency. In engine version 3, we reduced this AWS Glue API overhead and thus brought down the overall query planning time. For smaller datasets and datasets with a large number of tables, the total runtime is reduced significantly because query planning time makes up a higher percentage of the total runtime.

Figure 1 below charts the top 10 queries from the TPC-DS benchmark with the most performance improvement from engine version 2 to engine version 3 based on the Amazon CloudWatch metric for total runtime. Each query involves joining multiple tables with complex predicates.

Faster query runtime with Apache Iceberg integration

Athena engine version 3 provides better integration with the Apache Iceberg table format. Features such as Iceberg’s hidden partitioning now augment Athena optimizations such as partition pruning and dynamic filtering to reduce data scanned and improve query performance in Athena engine version 3. You do not need to maintain partition columns or even understand the physical table layout to load data into a table and achieve good query performance.

We performed TPC-DS benchmark testing by loading data into the Apache Iceberg table format, with hidden partitions configured, and compared the performance between Athena engine version 2 and 3. Figure 2 below is a chart of the top 10 query improvements, which all include complex predicates. The top query, query 52, has five WHERE predicates and two GROUP BY operations. Compared to engine version 2, the query runs thirteen times faster with sixteen times less data scanned on engine version 3.

Upgrading to Athena engine version 3

To use Athena engine version 3, you can create a new workgroup, or configure an existing workgroup, and select the recommended Athena engine version 3. Any Athena workgroup can upgrade from engine version 2 to engine version 3 without interruption in your ability to submit queries. For more information and instructions for changing your Athena engine version, refer to Changing Athena engine versions.
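
If you prefer to script the change instead of using the console, the Athena UpdateWorkGroup API accepts an engine version setting. The following is a minimal sketch that only builds the request, so it runs without AWS credentials; the workgroup name my-test-workgroup is a hypothetical placeholder, and you should confirm the accepted engine version strings in the Athena API reference before relying on them.

```python
def engine_upgrade_request(workgroup, engine_version="Athena engine version 3"):
    """Build keyword arguments for an Athena UpdateWorkGroup call."""
    return {
        "WorkGroup": workgroup,
        "ConfigurationUpdates": {
            "EngineVersion": {"SelectedEngineVersion": engine_version}
        },
    }


request = engine_upgrade_request("my-test-workgroup")  # hypothetical workgroup name
print(request)

# With boto3 installed and credentials configured, the request could be sent as:
#   import boto3
#   boto3.client("athena").update_work_group(**request)
```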

Athena engine version 3 has feature parity with all major features from Athena engine version 2. There are no changes required by you to use features like dynamic partition pruning, Apache Iceberg and Apache Hudi table formats, AWS Lake Formation governed tables integration, and Athena Federated Query in engine version 3. For more information on Athena features, refer to Amazon Athena features, and the Amazon Athena User Guide.

Athena engine version 3 includes additional improvements to support ANSI SQL compliance. This results in some changes to syntax, data processing, and timestamps that may cause errors when running the same queries in the new engine version. For information about error messages, causes, and suggested solutions, refer to Athena engine version 3 Limitations, Breaking changes, Data processing changes, and Timestamp changes.

To make sure that your Athena engine version upgrade goes smoothly, we recommend the following practices to facilitate your upgrade process. After you have confirmed your query behavior works as you expect, you can safely upgrade your existing Athena workgroups.

  • Review the Athena engine version 3 Limitations and Breaking changes and update any affected queries.
  • Test in pre-production to validate and qualify your queries against Athena engine version 3 by creating a test workgroup or upgrading an existing pre-production environment. For example, you can create a new test workgroup running engine version 3 to run integration tests from your pre-production or staging environment, and monitor for failures or performance regressions. For information about CloudWatch metrics and dimensions published by Athena, refer to Monitoring Athena queries with CloudWatch metrics.
  • Upgrade each query based on metrics to test your queries against an Athena engine version 3 workgroup. For example, you can create a new workgroup with engine version 3 alongside your existing engine version 2 workgroup. You can send a small percentage of queries to the engine version 3 workgroup, monitor for failures or performance regressions, then increase the number of queries if they’re successful and performant. Repeat until all your queries have been migrated to Athena engine version 3.
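
One way to send a small, stable percentage of queries to the engine version 3 workgroup is to hash a query identifier into 100 buckets. The sketch below illustrates that idea only; the workgroup names and the choice of SHA-256 bucketing are assumptions, not an Athena feature.

```python
import hashlib


def choose_workgroup(query_id, v3_percent, v2="wg-engine-v2", v3="wg-engine-v3"):
    """Deterministically route roughly v3_percent of query IDs to the v3 workgroup."""
    digest = hashlib.sha256(query_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:4], "big") % 100  # bucket in 0..99
    return v3 if bucket < v3_percent else v2


# Route 10% of 1,000 hypothetical query IDs to the engine version 3 workgroup.
query_ids = [f"query-{i}" for i in range(1000)]
routed_to_v3 = sum(choose_workgroup(q, 10) == "wg-engine-v3" for q in query_ids)
print(f"{routed_to_v3} of {len(query_ids)} queries routed to engine version 3")
```

Because the bucket depends only on the query ID, a query that was routed to engine version 3 at 10% stays there when you raise the percentage, which makes regressions easier to trace.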

With our simplified automatic engine upgrade process, you can configure existing workgroups to be automatically upgraded to engine version 3 without requiring manual review or intervention. The upgrade behavior is as follows:

  • If Query engine version is set to Automatic, your workgroup will remain on engine version 2 pending the automatic upgrade, and Athena will choose when to upgrade the workgroup to engine version 3. Before upgrading a workgroup, we perform a set of validation tests to confirm that its queries perform correctly and efficiently on engine version 3. Because our validation is performed on a best effort basis, we recommend you perform your own validation testing to ensure all queries run as expected.
  • If Query engine version is set to Manual, you will have the ability to select your version. The default choice is set to engine version 3, with the ability to toggle to engine version 2.

Conclusion

This post discussed Athena engine version 3 benefits, performance benchmark results, and how you can start using engine version 3 today with minimal work required. You can get started with Athena engine version 3 by using the Athena Console, the AWS CLI, or the AWS SDK. To learn more about Athena, refer to the Amazon Athena User Guide.

Thanks for reading this post! If you have questions on Athena engine version 3, don’t hesitate to leave a comment in the comments section.


About the authors

Blayze Stefaniak is a Senior Solutions Architect for the Technical Strategist Program supporting Executive Customer Programs in AWS Marketing. He has experience working across industries including healthcare, automotive, and public sector. He is passionate about breaking down complex situations into something practical and actionable. In his spare time, you can find Blayze listening to Star Wars audiobooks, trying to make his dogs laugh, and probably talking on mute.

Daniel Chen is a Senior Product Manager at Amazon Web Services (AWS) Athena. He has experience in the banking and capital markets sector of the financial services industry and works closely with enterprise customers building data lakes and analytical applications on the AWS platform. In his spare time, he loves playing tennis and ping pong.

Theo Tolv is a Senior Big Data Architect in the Athena team. He’s worked with small and big data for most of his career and often hangs out on Stack Overflow answering questions about Athena.

Jack Ye is a software engineer of the Athena Data Lake and Storage team. He is an Apache Iceberg Committer and PMC member.

Improve federated queries with predicate pushdown in Amazon Athena

Post Syndicated from Rohit Bansal original https://aws.amazon.com/blogs/big-data/improve-federated-queries-with-predicate-pushdown-in-amazon-athena/

In modern data architectures, it’s common to store data in multiple data sources. However, organizations embracing this approach still need insights from their data and require technologies that help them break down data silos. Amazon Athena is an interactive query service that makes it easy to analyze structured, unstructured, and semi-structured data stored in Amazon Simple Storage Service (Amazon S3) in addition to relational, non-relational, object, and custom data sources through its query federation capabilities. Athena is serverless, so there’s no infrastructure to manage, and you only pay for the queries that you run.

Organizations building a modern data architecture want to query data in-place from purpose-built data stores without building complex extract, transform, and load (ETL) pipelines. Athena’s federated query feature allows organizations to achieve this and makes it easy to:

  • Create reports and dashboards from data stored in relational, non-relational, object, and custom data sources
  • Run on-demand analysis on data spread across multiple systems of record using a single tool and single SQL dialect
  • Join multiple data sources together to produce new input features for machine learning model training workflows

However, when querying and joining huge amounts of data from different data stores, it’s important for queries to run quickly, at low cost, and without impacting source systems. Predicate pushdown is supported by many query engines and is a technique that can drastically reduce query processing time by filtering data at the source early in the processing workflow. In this post, you’ll learn how predicate pushdown improves query performance and how you can validate when Athena applies predicate pushdown to federated queries.

Benefits of predicate pushdown

The key benefits of predicate pushdown are as follows:

  • Improved query runtime
  • Reduced network traffic between Athena and the data source
  • Reduced load on the remote data source
  • Reduced cost resulting from reduced data scans

Let’s explore a real-world scenario to understand when predicate pushdown is applied to federated queries in Athena.

Solution overview

Imagine a hypothetical ecommerce company with data stored in Amazon Redshift, Amazon DynamoDB, and Amazon Aurora MySQL.

Record counts for these tables are as follows.

Data Store      | Table Name    | Number of Records | Description
Amazon Redshift | Catalog_Sales | 4.3 billion       | Current and historical sales data fact table
Amazon Redshift | Date_dim      | 73,000            | Date dimension table
DynamoDB        | Part          | 20,000            | Real-time parts and inventory data
DynamoDB        | Partsupp      | 80,000            | Real-time parts and supplier data
Aurora MySQL    | Supplier      | 1,000             | Latest supplier transactions
Aurora MySQL    | Customer      | 15,000            | Latest customer transactions

Our requirement is to query these sources individually and join the data to track pricing and supplier information and compare recent data with historical data using SQL queries with various filters applied. We’ll use Athena federated queries to query and join data from these sources to meet this requirement.

The following diagram depicts how Athena federated queries use data source connectors run as Lambda functions to query data stored in sources other than Amazon S3.

When a federated query is submitted against a data source, Athena invokes the data source connector to determine how to read the requested table and identify filter predicates in the WHERE clause of the query that can be pushed down to the source. Applicable filters are automatically pushed down by Athena and have the effect of omitting unnecessary rows early in the query processing workflow and improving overall query execution time.
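
To make the effect concrete, here is a toy simulation of the difference between filtering after the transfer and filtering at the source. It is only an illustration of the concept, not how the Athena connectors are implemented; the scan_source function and the 1,000-row in-memory table are made up for the example.

```python
def scan_source(rows, predicate=None):
    """Simulate a connector read: return the rows the source sends to the engine."""
    if predicate is None:
        return list(rows)
    return [row for row in rows if predicate(row)]


def in_first_week(row):
    """Filter from the WHERE clause: date keys for the first week of Jan 1998."""
    return 2450815 <= row["cs_sold_date_sk"] <= 2450822


# Toy stand-in for catalog_sales: 1,000 rows with consecutive date keys.
catalog_sales = [
    {"cs_sold_date_sk": 2450000 + i, "cs_order_number": i} for i in range(1000)
]

# Without pushdown: every row crosses the network, then the engine filters.
transferred_all = scan_source(catalog_sales)
result_without_pushdown = [row for row in transferred_all if in_first_week(row)]

# With pushdown: the source applies the filter and sends only matching rows.
transferred_filtered = scan_source(catalog_sales, in_first_week)

print(len(transferred_all), len(transferred_filtered))
```

Both paths produce the same result set; the only difference is how many rows leave the source.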

Let’s explore three use cases to demonstrate predicate pushdown for our ecommerce company using each of these services.

Prerequisites

As a prerequisite, review Using Amazon Athena Federated Query to know more about Athena federated queries and how to deploy these data source connectors.

Use case 1: Amazon Redshift

In our first scenario, we run an Athena federated query on Amazon Redshift by joining its Catalog_sales and Date_dim tables. We do this to show the number of sales orders grouped by order date. The following query gets the information required and takes approximately 14 seconds scanning approximately 43 MB of data:

SELECT "d_date" AS Order_date,
     count(1) AS Total_Orders
 FROM "lambda:redshift"."order_schema"."catalog_sales" l,
     "lambda:redshift"."order_schema"."date_dim" d
 WHERE l.cs_sold_date_sk = d_date_sk
     and cs_sold_date_sk between 2450815 and 2450822 --Date keys for first week of Jan 1998
 GROUP BY "d_date"
 order by "d_date" 
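
The numeric date keys in the WHERE clause can be checked against calendar dates. The sketch below assumes the keys are Julian day numbers, which is the convention of the TPC-DS date_dim table and is consistent with the comment in the query; the helper names are hypothetical.

```python
from datetime import date

# Offset between a Julian day number and Python's proleptic Gregorian ordinal.
JULIAN_DAY_OFFSET = 1721425


def date_key_to_date(date_sk):
    """Convert a Julian-day-style date key to a calendar date."""
    return date.fromordinal(date_sk - JULIAN_DAY_OFFSET)


def date_to_date_key(day):
    """Convert a calendar date to a Julian-day-style date key."""
    return day.toordinal() + JULIAN_DAY_OFFSET


start = date_key_to_date(2450815)
end = date_key_to_date(2450822)
print(start.isoformat(), end.isoformat())
```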

Athena pushes the following filters to the source for processing:

  • cs_sold_date_sk between 2450815 and 2450822 for the Catalog_Sales table in Amazon Redshift.
  • d_date_sk between 2450815 and 2450822; because of the join l.cs_sold_date_sk=d_date_sk in the query, the Date_dim table is also filtered at the source, and only filtered data is moved from Amazon Redshift to Athena.
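
The second filter is not written in the query; it follows from the first filter and the equi-join. A simplified sketch of that inference is shown below. It is an illustration of the reasoning only, under the assumption that range filters can be copied across equality joins, and does not reflect Athena's actual planner code.

```python
def propagate_range_filters(filters, equi_joins):
    """Copy range filters across equality joins until nothing new can be derived.

    filters: {column: (low, high)}; equi_joins: list of (column_a, column_b).
    """
    derived = dict(filters)
    changed = True
    while changed:
        changed = False
        for col_a, col_b in equi_joins:
            for src, dst in ((col_a, col_b), (col_b, col_a)):
                if src in derived and dst not in derived:
                    derived[dst] = derived[src]
                    changed = True
    return derived


filters = {"cs_sold_date_sk": (2450815, 2450822)}
joins = [("cs_sold_date_sk", "d_date_sk")]
derived = propagate_range_filters(filters, joins)
print(derived)
```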

Let’s analyze the query plan using the recently released visual explain tool to confirm the filter predicates are pushed to the data source:

As shown above (only displaying the relevant part of the visual explain plan), because of the predicate pushdown, the Catalog_sales and Date_dim tables have filters applied at the source. Athena processes only the resulting filtered data.

Using the Athena console, we can see query processing details using the recently released query stats to interactively explore processing details with predicate pushdown at the query stage:

Displaying only the relevant query processing stages, the Catalog_sales table has approximately 4.3 billion records, and Date_dim has approximately 73,000 records in Amazon Redshift. Only 11 million records from Catalog_sales (Stage 4) and 8 records from Date_dim (Stage 5) are passed from the source to Athena, because the predicate pushdown pushes query filter conditions to the data sources. This filters out unneeded records at the source, and only brings the required rows to Athena.

Using predicate pushdown resulted in scanning 99.75% less data from Catalog_sales and 99.99% less data from Date_dim. This results in a faster query runtime and lower cost.
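
The percentages follow from the record counts above. As a quick check, the arithmetic can be reproduced with the rounded counts quoted in the text; because those inputs are approximate, the results land close to, rather than exactly on, the cited figures.

```python
def percent_reduction(total_rows, transferred_rows):
    """Percentage of rows that never leave the source thanks to pushdown."""
    return 100.0 * (1 - transferred_rows / total_rows)


# Rounded counts taken from the text.
catalog_sales_reduction = percent_reduction(4_300_000_000, 11_000_000)
date_dim_reduction = percent_reduction(73_000, 8)

print(f"Catalog_sales: {catalog_sales_reduction:.2f}% fewer rows transferred")
print(f"Date_dim: {date_dim_reduction:.2f}% fewer rows transferred")
```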

Use case 2: Amazon Redshift and Aurora MySQL

In our second use case, we run an Athena federated query on Aurora MySQL and Amazon Redshift data stores. This query joins the Catalog_sales and Date_dim tables in Amazon Redshift with the Customer table in the Aurora MySQL database to get the total number of orders with the total amount spent by each customer for the first week in January 1998 for the market segment of AUTOMOBILE. The following query gets the information required and takes approximately 35 seconds scanning approximately 337 MB of data:

SELECT  cs_bill_customer_sk Customer_id ,"d_date" Order_Date 
 ,count("cs_order_number") Total_Orders ,sum(l.cs_net_paid_inc_ship_tax) AS Total_Amount
 FROM "lambda:mysql".sales.customer c,"lambda:redshift"."order_schema"."catalog_sales" l
 ,"lambda:redshift"."order_schema"."date_dim" d
 WHERE c_mktsegment = 'AUTOMOBILE'
 AND c_custkey = cs_bill_customer_sk
 AND l.cs_sold_date_sk=d_date_sk 
 AND cs_sold_date_sk between 2450815 and 2450822 --Date keys for first week of Jan 1998
 GROUP BY cs_bill_customer_sk,"d_date"  
 ORDER BY cs_bill_customer_sk,"d_date"

Athena pushes the following filters to the data sources for processing:

  • cs_sold_date_sk between 2450815 and 2450822 for the Catalog_Sales table in Amazon Redshift.
  • d_date_sk between 2450815 and 2450822; because of the join l.cs_sold_date_sk=d_date_sk in the query, the Date_dim table is also filtered at the source (Amazon Redshift) and only filtered data is moved from Amazon Redshift to Athena.
  • c_mktsegment = 'AUTOMOBILE' for the Customer table in the Aurora MySQL database.

Now let’s consult the visual explain plan for this query to show the predicate pushdown to the source for processing:

As shown above (only displaying the relevant part of the visual explain plan), because of the predicate pushdown, Catalog_sales and Date_dim have the query filter applied at the source (Amazon Redshift), and the customer table has the market segment AUTOMOBILE filter applied at the source (Aurora MySQL). This brings only the filtered data to Athena.

As before, we can see query processing details using the recently released query stats to interactively explore processing details with predicate pushdown at the query stage:

Displaying only the relevant query processing stages, Catalog_sales has 4.3 billion records, Date_Dim has 73,000 records in Amazon Redshift, and Customer has 15,000 records in Aurora MySQL. Only 11 million records from Catalog_sales (Stage 6), 8 records from Date_dim (Stage 7), and 3,000 records from Customer (Stage 5) are passed from the respective sources to Athena because the predicate pushdown pushes query filter conditions to the data sources. This filters out unneeded records at the source and only brings the required rows to Athena.

Here, predicate pushdown resulted in scanning 99.75% less data from Catalog_sales, 99.99% less data from Date_dim, and 79.91% less data from Customer. This results in a faster query runtime and reduced cost.

Use case 3: Amazon Redshift, Aurora MySQL, and DynamoDB

For our third use case, we run an Athena federated query on Aurora MySQL, Amazon Redshift, and DynamoDB data stores. This query joins the Part and Partsupp tables in DynamoDB, the Catalog_sales and Date_dim tables in Amazon Redshift, and the Supplier and Customer tables in Aurora MySQL to get the quantities available at each supplier for orders with the highest revenue during the first week of January 1998 for the market segment of AUTOMOBILE and parts manufactured by Manufacturer#1.

The following query gets the information required and takes approximately 33 seconds scanning approximately 428 MB of data in Athena:

SELECT "d_date" Order_Date 
     ,c_mktsegment
     ,"cs_order_number"
     ,l.cs_item_sk Part_Key
     ,p.p_name Part_Name
     ,s.s_name Supplier_Name
     ,ps.ps_availqty Supplier_Avail_Qty
     ,l.cs_quantity Order_Qty
     ,l.cs_net_paid_inc_ship_tax Order_Total
 FROM "lambda:dynamo".default.part p, 
     "lambda:mysql".sales.supplier s, 
     "lambda:redshift"."order_schema"."catalog_sales" l, 
     "lambda:dynamo".default.partsupp ps, 
     "lambda:mysql".sales.customer c,
     "lambda:redshift"."order_schema"."date_dim" d
 WHERE 
     c_custkey = cs_bill_customer_sk
     AND l.cs_sold_date_sk=d_date_sk 
     AND c.c_mktsegment = 'AUTOMOBILE'
     AND cs_sold_date_sk between 2450815 and 2450822 --Date keys for first week of Jan 1998
     AND p.p_partkey=ps.ps_partkey
     AND s.s_suppkey=ps.ps_suppkey
     AND p.p_partkey=l.cs_item_sk
     AND p.p_mfgr='Manufacturer#1'

Athena pushes the following filters to the data sources for processing:

  • cs_sold_date_sk between 2450815 and 2450822 for the Catalog_Sales table in Amazon Redshift.
  • d_date_sk between 2450815 and 2450822; because of the join l.cs_sold_date_sk=d_date_sk in the query, the Date_dim table is also filtered at the source and only filtered data is moved from Amazon Redshift to Athena.
  • c_mktsegment = 'AUTOMOBILE' for the Customer table in the Aurora MySQL database.
  • p.p_mfgr='Manufacturer#1' for the Part table in DynamoDB.

Now let’s run the explain plan for this query to confirm predicates are pushed down to the source for processing:

As shown above (displaying only the relevant part of the plan), because of the predicate pushdown, Catalog_sales and Date_dim have the query filter applied at the source (Amazon Redshift), the Customer table has the market segment AUTOMOBILE filter applied at the source (Aurora MySQL), and the Part table has the part manufactured by Manufacturer#1 filter applied at the source (DynamoDB).

We can analyze query processing details using the recently released query stats to interactively explore processing details with predicate pushdown at the query stage:

Displaying only the relevant processing stages, Catalog_sales has 4.3 billion records, Date_Dim has 73,000 records in Amazon Redshift, Customer has 15,000 records in Aurora MySQL, and Part has 20,000 records in DynamoDB. Only 11 million records from Catalog_sales (Stage 5), 8 records from Date_dim (Stage 9), 3,000 records from Customer (Stage 8), and 4,000 records from Part (Stage 4) are passed from their respective sources to Athena, because the predicate pushdown pushes query filter conditions to the data sources. This filters out unneeded records at the source, and only brings the required rows from the sources to Athena.

Considerations for predicate pushdown

When using Athena to query your data sources, consider the following:

  • Depending on the data source, data source connector, and query complexity, Athena can push filter predicates to the source for processing. The following are some of the sources Athena supports predicate pushdown with:
  • Athena also performs predicate pushdown on data stored in an S3 data lake. And, with predicate pushdown for supported sources, you can join all your data sources in one query and achieve fast query performance.
  • You can use the recently released query stats as well as EXPLAIN and EXPLAIN ANALYZE on your queries to confirm predicates are pushed down to the source.
  • Queries may not have predicates pushed to the source if the query’s WHERE clause uses Athena-specific functions (for example, WHERE log2(col)<10).

Conclusion

In this post, we demonstrated three federated query scenarios on Aurora MySQL, Amazon Redshift, and DynamoDB to show how predicate pushdown improves federated query performance and reduces cost and how you can validate when predicate pushdown occurs. If the federated data source supports parallel scans, then predicate pushdown makes it possible to achieve performance that is close to the performance of Athena queries on data stored in Amazon S3. You can utilize the patterns and recommendations outlined in this post when querying supported data sources to improve overall query performance and minimize data scanned.


About the authors

Rohit Bansal is an Analytics Specialist Solutions Architect at AWS. He has nearly two decades of experience helping customers modernize their data platforms. He is passionate about helping customers build scalable, cost-effective data and analytics solutions in the cloud. In his spare time, he enjoys spending time with his family, travel, and road cycling.

Ruchir Tripathi is a Senior Analytics Solutions Architect aligned to Global Financial Services at AWS. He is passionate about helping enterprises build scalable, performant, and cost-effective solutions in the cloud. Prior to joining AWS, Ruchir worked with major financial institutions. He is based out of the New York office.

Ingest streaming data to Apache Hudi tables using AWS Glue and Apache Hudi DeltaStreamer

Post Syndicated from Vishal Pathak original https://aws.amazon.com/blogs/big-data/ingest-streaming-data-to-apache-hudi-tables-using-aws-glue-and-apache-hudi-deltastreamer/

In today’s world with technology modernization, the need for near-real-time streaming use cases has increased exponentially. Many customers are continuously consuming data from different sources, including databases, applications, IoT devices, and sensors. Organizations may need to ingest that streaming data into data lakes built on Amazon Simple Storage Service (Amazon S3). You may also need to achieve analytics and machine learning (ML) use cases in near-real time. To ensure consistent results in those near-real-time streaming use cases, incremental data ingestion and atomicity, consistency, isolation, and durability (ACID) properties on data lakes have been a common ask.

To address such use cases, one approach is to use Apache Hudi and its DeltaStreamer utility. Apache Hudi is an open-source data management framework designed for data lakes. It simplifies incremental data processing by enabling ACID transactions and record-level inserts, updates, and deletes of streaming ingestion on data lakes built on top of Amazon S3. Hudi is integrated with well-known open-source big data analytics frameworks, such as Apache Spark, Apache Hive, Presto, and Trino, as well as with various AWS analytics services like AWS Glue, Amazon EMR, Amazon Athena, and Amazon Redshift. The DeltaStreamer utility provides an easy way to ingest streaming data from sources like Apache Kafka into your data lake.
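
To illustrate what record-level upserts mean in practice, the following toy sketch applies two batches to an in-memory table. It is not Hudi code and ignores storage, transactions, and deletes; it only mimics the idea of a record key plus an ordering field where the latest value wins, similar in spirit to Hudi's record key and precombine field. The field names product_id and updated_at are assumptions.

```python
def upsert(table, records, key_field="product_id", ordering_field="updated_at"):
    """Insert new records and update existing ones; the newest ordering value wins."""
    for record in records:
        key = record[key_field]
        current = table.get(key)
        if current is None or record[ordering_field] >= current[ordering_field]:
            table[key] = record
    return table


table = {}

# First batch: two inserts.
upsert(table, [
    {"product_id": 1, "quantity": 10, "updated_at": "2022-10-01T00:00:00"},
    {"product_id": 2, "quantity": 5, "updated_at": "2022-10-01T00:00:00"},
])

# Second batch: one update, and one late-arriving older record that is ignored.
upsert(table, [
    {"product_id": 1, "quantity": 7, "updated_at": "2022-10-02T00:00:00"},
    {"product_id": 2, "quantity": 99, "updated_at": "2022-09-01T00:00:00"},
])

print(table)
```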

This post describes how to run the DeltaStreamer utility on AWS Glue to read streaming data from Amazon Managed Streaming for Apache Kafka (Amazon MSK) and ingest the data into S3 data lakes. AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, ML, and application development. With AWS Glue, you can create Spark, Spark Streaming, and Python shell jobs to extract, transform, and load (ETL) data. You can create AWS Glue Spark streaming ETL jobs using either Scala or PySpark that run continuously, consuming data from Amazon MSK, Apache Kafka, and Amazon Kinesis Data Streams and writing it to your target.

Solution overview

To demonstrate the DeltaStreamer utility, we use fictional product data that represents product inventory including product name, category, quantity, and last updated timestamp. Let’s assume we stream the data from data sources to an MSK topic. Now we want to ingest this data coming from the MSK topic into Amazon S3 so that we can run Athena queries to analyze business trends in near-real time.
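
For orientation, a single inventory event of this kind might be serialized as JSON before being sent to the topic. The sketch below is only an assumption about the record shape: the field names and sample values are hypothetical and are not taken from the sample data used in this post.

```python
import json


def make_product_event(product_id, name, category, quantity, updated_at):
    """Serialize one product inventory record as UTF-8 encoded JSON."""
    record = {
        "product_id": product_id,
        "product_name": name,
        "category": category,
        "quantity": quantity,
        "updated_at": updated_at,
    }
    return json.dumps(record).encode("utf-8")


# Hypothetical sample values.
event = make_product_event(101, "Desk lamp", "Lighting", 25, "2022-10-01T12:00:00Z")
print(event.decode("utf-8"))
```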

The following diagram provides the overall architecture of the solution described in this post.

To simulate application traffic, we use Amazon Elastic Compute Cloud (Amazon EC2) to send sample data to an MSK topic. Amazon MSK is a fully managed service that makes it easy to build and run applications that use Apache Kafka to process streaming data. To consume the streaming data from Amazon MSK, we set up an AWS Glue streaming ETL job that uses the Apache Hudi Connector 0.10.1 for AWS Glue 3.0, with the DeltaStreamer utility to write the ingested data to Amazon S3. The Apache Hudi Connector 0.9.0 for AWS Glue 3.0 also supports the DeltaStreamer utility.

As the data is being ingested, the AWS Glue streaming job writes the data into the Amazon S3 base path. The data in Amazon S3 is cataloged using the AWS Glue Data Catalog. We then use Athena, which is an interactive query service, to query and analyze the data using standard SQL.

Prerequisites

We use an AWS CloudFormation template to provision some resources for our solution. The template requires you to select an EC2 key pair. This key is configured on an EC2 instance that lives in the public subnet. We use this EC2 instance to ingest data to the MSK cluster running in a private subnet. Make sure you have a key in the AWS Region where you deploy the template. If you don’t have one, you can create a new key pair.

Create the Apache Hudi connection

To add the Apache Hudi Connector for AWS Glue, complete the following steps:

  1. On the AWS Glue Studio console, choose Connectors.
  2. Choose Go to AWS Marketplace.
  3. Search for and choose Apache Hudi Connector for AWS Glue.
  4. Choose Continue to Subscribe.
  5. Review the terms and conditions, then choose Accept Terms.

    After you accept the terms, it takes some time to process the request.
    When the subscription is complete, you see the Effective date populated next to the product.
  6. Choose Continue to Configuration.
  7. For Fulfillment option, choose Glue 3.0.
  8. For Software version, choose 0.10.1.
  9. Choose Continue to Launch.
  10. Choose Usage instructions, and then choose Activate the Glue connector from AWS Glue Studio.

    You’re redirected to AWS Glue Studio.
  11. For Name, enter Hudi-Glue-Connector.
  12. Choose Create connection and activate connector.

A message appears that the connection was successfully created. Verify that the connection is visible on the AWS Glue Studio console.

Launch the CloudFormation stack

For this post, we provide a CloudFormation template to create the following resources:

  • VPC, subnets, security groups, and VPC endpoints
  • AWS Identity and Access Management (IAM) roles and policies with required permissions
  • An EC2 instance running in a public subnet within the VPC, with Kafka 2.6.2 (the Scala 2.12 build) installed and with the JSON files for the initial and incremental source data loads
  • An Amazon MSK server running in a private subnet within the VPC
  • An AWS Glue Streaming DeltaStreamer job to consume the incoming data from the Kafka topic and write it to Amazon S3
  • Two S3 buckets: one stores code and config files, and the other is the target for the AWS Glue streaming DeltaStreamer job

To create the resources, complete the following steps:

  1. Choose Launch Stack:
  2. For Stack name, enter hudi-deltastreamer-glue-blog.
  3. For ClientIPCIDR, enter the IP address of your client that you use to connect to the EC2 instance.
  4. For HudiConnectionName, enter the AWS Glue connection you created earlier (Hudi-Glue-Connector).
  5. For KeyName, choose the name of the EC2 key pair that you created as a prerequisite.
  6. For VpcCIDR, leave as is.
  7. Choose Next.
  8. Choose Next.
  9. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  10. Choose Create stack.

After the CloudFormation template is complete and the resources are created, the Outputs tab shows the following information:

  • HudiDeltastreamerGlueJob – The AWS Glue streaming job name
  • MSKCluster – The MSK cluster ARN
  • PublicIPOfEC2InstanceForTunnel – The public IP of the EC2 instance for tunnel
  • TargetS3Bucket – The S3 bucket name

Create a topic in the MSK cluster

Next, SSH to Amazon EC2 using the key pair you created and run the following commands:

  1. SSH to the EC2 instance as ec2-user:
    ssh -i <KeyName> ec2-user@<PublicIPOfEC2InstanceForTunnel>

    You can get the KeyName value on the Parameters tab and the public IP of the EC2 instance for tunnel on the Outputs tab of the CloudFormation stack.

  2. For the next command, retrieve the bootstrap server endpoint of the MSK cluster by navigating to msk-source-cluster on the Amazon MSK console and choosing View client information.
  3. Run the following command to create the topic hudi-deltastream-demo in the MSK cluster:
    ./kafka_2.12-2.6.2/bin/kafka-topics.sh --create \
    --topic hudi-deltastream-demo \
    --bootstrap-server "<replace text with value under private endpoint on MSK>" \
    --partitions 1 \
    --replication-factor 2 \
    --command-config ./config_file.txt

  4. Ingest the initial data from the deltastreamer_initial_load.json file into the Kafka topic:
    ./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
    --broker-list "<replace text with value under private endpoint on MSK>" \
    --topic hudi-deltastream-demo \
    --producer.config ./config_file.txt < deltastreamer_initial_load.json

The following is the schema of a record ingested into the Kafka topic:

{
  "type":"record",
  "name":"products",
  "fields":[{
     "name": "id",
     "type": "int"
  }, {
     "name": "category",
     "type": "string"
  }, {
     "name": "ts",
     "type": "string"
  },{
     "name": "name",
     "type": "string"
  },{
     "name": "quantity",
     "type": "int"
  }
]}

The schema uses the following parameters:

  • id – The product ID
  • category – The product category
  • ts – The timestamp when the record was inserted or last updated
  • name – The product name
  • quantity – The available quantity of the product in the inventory

The following code gives an example of a record:

{
    "id": 1, 
    "category": "Apparel", 
    "ts": "2022-01-02 10:29:00", 
    "name": "ABC shirt", 
    "quantity": 4
}
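Before sending records to the topic, it can help to check them against the schema. The following is a minimal sketch, assuming records arrive as JSON text and that only the int and string types from the schema above are needed. The validate_record helper is a hypothetical name for illustration and is not part of Hudi or Kafka.

```python
import json

# Avro primitive type name -> Python type, limited to the types used in the schema above
AVRO_TO_PYTHON = {"int": int, "string": str}

SCHEMA = json.loads("""
{"type": "record", "name": "products", "fields": [
  {"name": "id", "type": "int"},
  {"name": "category", "type": "string"},
  {"name": "ts", "type": "string"},
  {"name": "name", "type": "string"},
  {"name": "quantity", "type": "int"}
]}
""")


def validate_record(record, schema=SCHEMA):
    """Hypothetical helper: return a list of problems (empty if the record matches)."""
    problems = []
    for field in schema["fields"]:
        name = field["name"]
        expected = AVRO_TO_PYTHON[field["type"]]
        if name not in record:
            problems.append(f"missing field: {name}")
        # bool is a subclass of int in Python, so reject it explicitly
        elif isinstance(record[name], bool) or not isinstance(record[name], expected):
            problems.append(f"{name}: expected {field['type']}")
    return problems


record = json.loads(
    '{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00", '
    '"name": "ABC shirt", "quantity": 4}'
)
print(validate_record(record))
```

A check like this is only a convenience on the producer side; the schema file that DeltaStreamer reads remains the source of truth.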

Start the AWS Glue streaming job

To start the AWS Glue streaming job, complete the following steps:

  1. On the AWS Glue Studio console, find the job with the value for HudiDeltastreamerGlueJob.
  2. Choose the job to review the script and job details.
  3. On the Job details tab, replace the value of the --KAFKA_BOOTSTRAP_SERVERS key with the Amazon MSK bootstrap server’s private endpoint.
  4. Choose Save to save the job settings.
  5. Choose Run to start the job.

When the AWS Glue streaming job runs, the records from the MSK topic are consumed and written to the target S3 bucket created by AWS CloudFormation. To find the bucket name, check the stack’s Outputs tab for the TargetS3Bucket key value.

The data in Amazon S3 is stored in Parquet file format. In this example, the data written to Amazon S3 isn’t partitioned, but you can enable partitioning by specifying hoodie.datasource.write.partitionpath.field=<column_name> as the partition field and setting hoodie.datasource.write.hive_style_partitioning to True in the Hudi configuration property in the AWS Glue job script.

In this post, we write the data to a non-partitioned table, so we set the following two Hudi configurations:

  • hoodie.datasource.hive_sync.partition_extractor_class is set to org.apache.hudi.hive.NonPartitionedExtractor
  • hoodie.datasource.write.keygenerator.class is set to org.apache.hudi.keygen.NonpartitionedKeyGenerator
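The choice between the two layouts can be sketched as a small helper that returns the relevant Hudi properties. This is a sketch only: hudi_partition_options is a hypothetical function name, it uses just the property names mentioned in this post, and a partitioned table typically needs further Hive sync settings that this post doesn't cover.

```python
def hudi_partition_options(partition_field=None):
    """Hypothetical helper: return the Hudi properties discussed in this post."""
    if partition_field is None:
        # Non-partitioned table, as used in this post
        return {
            "hoodie.datasource.hive_sync.partition_extractor_class":
                "org.apache.hudi.hive.NonPartitionedExtractor",
            "hoodie.datasource.write.keygenerator.class":
                "org.apache.hudi.keygen.NonpartitionedKeyGenerator",
        }
    # Partitioned table with Hive-style partition paths (for example, category=Apparel)
    return {
        "hoodie.datasource.write.partitionpath.field": partition_field,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }


for key, value in hudi_partition_options().items():
    print(f"{key}={value}")
```

Calling hudi_partition_options("category") would return the two partitioning properties instead, with category as the partition field.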

DeltaStreamer options and configuration

DeltaStreamer has multiple options available; the following are the options set in the AWS Glue streaming job used in this post:

  • continuous – DeltaStreamer runs in continuous mode, repeating the source-fetch, transform, and write cycle in a loop instead of exiting after one run.
  • enable-hive-sync – Enables table sync to the Apache Hive Metastore.
  • schemaprovider-class – Defines the class for the schema provider to attach schemas to the input and target table data.
  • source-class – Defines the source class to read data and has many built-in options.
  • source-ordering-field – The field used to break ties between records with the same key in input data. Defaults to ts (the Unix timestamp of record).
  • target-base-path – Defines the path for the target Hudi table.
  • table-type – Indicates the Hudi storage type to use. In this post, it’s set to COPY_ON_WRITE.
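To make the options above concrete, the following sketch assembles them into an argument list of the kind passed to DeltaStreamer. The deltastreamer_args helper and the bucket name are hypothetical, and the two class names are our assumption for a JSON Kafka source with a file-based schema; the job script created by the CloudFormation template may differ. A real invocation also needs further options that the list above doesn't cover.

```python
def deltastreamer_args(target_base_path, ordering_field="ts", table_type="COPY_ON_WRITE"):
    """Hypothetical helper: build the DeltaStreamer options listed in this post."""
    return [
        "--continuous",
        "--enable-hive-sync",
        # Assumed class names; check the job script for the ones actually used
        "--schemaprovider-class", "org.apache.hudi.utilities.schema.FilebasedSchemaProvider",
        "--source-class", "org.apache.hudi.utilities.sources.JsonKafkaSource",
        "--source-ordering-field", ordering_field,
        "--target-base-path", target_base_path,
        "--table-type", table_type,
    ]


# "my-target-bucket" is a placeholder for the TargetS3Bucket stack output
args = deltastreamer_args("s3://my-target-bucket/product_table/")
print(" ".join(args))
```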

The following are some of the important DeltaStreamer configuration properties set in the AWS Glue streaming job:

# Schema provider props (change to absolute path based on your installation)
hoodie.deltastreamer.schemaprovider.source.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=s3://" + args("CONFIG_BUCKET") + "/artifacts/hudi-deltastreamer-glue/config/schema.avsc

# Kafka Source
hoodie.deltastreamer.source.kafka.topic=hudi-deltastream-demo

#Kafka props
bootstrap.servers=args("KAFKA_BOOTSTRAP_SERVERS")
auto.offset.reset=earliest
security.protocol=SSL

The configuration contains the following details:

  • hoodie.deltastreamer.schemaprovider.source.schema.file – The schema of the source record
  • hoodie.deltastreamer.schemaprovider.target.schema.file – The schema for the target record
  • hoodie.deltastreamer.source.kafka.topic – The source MSK topic name
  • bootstrap.servers – The Amazon MSK bootstrap server’s private endpoint
  • auto.offset.reset – The consumer’s behavior when there is no committed position or when an offset is out of range
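The property listing above contains string concatenation from the job script, which fills in the config bucket and bootstrap servers from job arguments. The following sketch shows the same substitution in isolation. The render_deltastreamer_props helper, the bucket name, and the broker endpoint are hypothetical placeholders; the property names and fixed values are the ones shown in this post.

```python
def render_deltastreamer_props(config_bucket, bootstrap_servers,
                               topic="hudi-deltastream-demo"):
    """Hypothetical helper: render the properties shown above as key=value lines."""
    schema_file = (
        f"s3://{config_bucket}/artifacts/hudi-deltastreamer-glue/config/schema.avsc"
    )
    props = {
        # Source and target share the same schema file in this post
        "hoodie.deltastreamer.schemaprovider.source.schema.file": schema_file,
        "hoodie.deltastreamer.schemaprovider.target.schema.file": schema_file,
        "hoodie.deltastreamer.source.kafka.topic": topic,
        "bootstrap.servers": bootstrap_servers,
        "auto.offset.reset": "earliest",
        "security.protocol": "SSL",
    }
    return "\n".join(f"{key}={value}" for key, value in props.items())


# Placeholder values; use your own config bucket and MSK private endpoint
print(render_deltastreamer_props("my-config-bucket", "b-1.example.internal:9094"))
```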

Hudi configuration

The following are some of the important Hudi configuration options, which enable us to achieve in-place updates for the generated schema:

  • hoodie.datasource.write.recordkey.field – The record key field. This is the unique identifier of a record in Hudi.
  • hoodie.datasource.write.precombine.field – When two records have the same record key value, Apache Hudi picks the one with the largest value for the pre-combined field.
  • hoodie.datasource.write.operation – The operation on the Hudi dataset. Possible values include UPSERT, INSERT, and BULK_INSERT.
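To see how the record key and the pre-combine field work together, consider the following toy model of an upsert. It is not Hudi's implementation: the upsert function is a hypothetical in-memory stand-in, it assumes id is the record key and ts is the pre-combine field, and it relies on the ts strings sorting chronologically because they share the same fixed-width format. The records are taken from this post.

```python
def upsert(table, incoming, record_key="id", precombine="ts"):
    """Toy upsert: keep, per record key, the record with the largest precombine value."""
    merged = dict(table)
    for rec in incoming:
        key = rec[record_key]
        current = merged.get(key)
        # Ties keep the incoming record here, an arbitrary choice for this sketch
        if current is None or rec[precombine] >= current[precombine]:
            merged[key] = rec
    return merged


table = {
    1: {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:29:00",
        "name": "ABC shirt", "quantity": 4},
}
incoming = [
    {"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00",
     "name": "ABC shirt", "quantity": 2},
    {"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00",
     "name": "JKL Lip gloss", "quantity": 7},
]

result = upsert(table, incoming)
for rec in result.values():
    print(rec["id"], rec["name"], rec["quantity"])
```

In this model, the later record for ID 1 replaces the earlier one in place, and ID 5 is inserted as a new record. A record for ID 1 with an older ts would be ignored.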

AWS Glue Data Catalog table

The AWS Glue job creates a Hudi table in the Data Catalog mapped to the Hudi dataset on Amazon S3. Because the hoodie.datasource.hive_sync.table configuration parameter is set to product_table, the table appears as product_table under the default database in the Data Catalog.

The following screenshot shows the Hudi table column names in the Data Catalog.

Query the data using Athena

With the Hudi datasets available in Amazon S3, you can query the data using Athena. Let’s use the following query:

SELECT * FROM "default"."product_table";

The following screenshot shows the query output. The table product_table has four records from the initial ingestion: two records for the category Apparel, one for Cosmetics, and one for Footwear.

Load incremental data into the Kafka topic

Now suppose that the store sold some quantity of apparel and footwear and added a new product to its inventory, as shown in the following code. The store sold two items of product ID 1 (Apparel) and one item of product ID 3 (Footwear). The store also added a new product, with product ID 5, in the Cosmetics category.

{"id": 1, "category": "Apparel", "ts": "2022-01-02 10:45:00", "name": "ABC shirt", "quantity": 2}
{"id": 3, "category": "Footwear", "ts": "2022-01-02 10:50:00", "name": "DEF shoes", "quantity": 5}
{"id": 5, "category": "Cosmetics", "ts": "2022-01-02 10:55:00", "name": "JKL Lip gloss", "quantity": 7}

Let’s ingest the incremental data from the deltastreamer_incr_load.json file to the Kafka topic and query the data from Athena:

./kafka_2.12-2.6.2/bin/kafka-console-producer.sh \
--broker-list "<replace text with value under private endpoint on MSK>" \
--topic hudi-deltastream-demo \
--producer.config ./config_file.txt < deltastreamer_incr_load.json

Within a few seconds, you should see a new Parquet file created in the target S3 bucket under the product_table prefix. The following is the screenshot from Athena after the incremental data ingestion showing the latest updates.

Additional considerations

There are some hard-coded Hudi options in the AWS Glue Streaming job scripts. These options are set for the sample table that we created for this post, so update the options based on your workload.

Clean up

To avoid incurring future charges, delete the CloudFormation stack, which deletes all the underlying resources created by this post, except for the product_table table created in the default database. Manually delete the product_table table under the default database from the Data Catalog.

Conclusion

In this post, we illustrated how you can add the Apache Hudi Connector for AWS Glue and perform streaming ingestion into an S3 data lake using Apache Hudi DeltaStreamer with AWS Glue. You can use the Apache Hudi Connector for AWS Glue to create a serverless streaming pipeline using AWS Glue streaming jobs with the DeltaStreamer utility to ingest data from Kafka. We demonstrated this by reading the latest updated data using Athena in near-real time.

As always, AWS welcomes feedback. If you have any comments or questions on this post, please share them in the comments.


About the authors

Vishal Pathak is a Data Lab Solutions Architect at AWS. Vishal works with customers on their use cases, architects solutions to solve their business problems, and helps them build scalable prototypes. Prior to his journey in AWS, Vishal helped customers implement business intelligence, data warehouse, and data lake projects in the US and Australia.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He enjoys learning different use cases from customers and sharing knowledge about big data technologies with the wider community.

Anand Prakash is a Senior Solutions Architect at AWS Data Lab. Anand focuses on helping customers design and build AI/ML, data analytics, and database solutions to accelerate their path to production.

Manage your Amazon QuickSight datasets more efficiently with the new user interface

Post Syndicated from Arturo Duarte original https://aws.amazon.com/blogs/big-data/manage-your-amazon-quicksight-datasets-more-efficiently-with-the-new-user-interface/

Amazon QuickSight has launched a new user interface for dataset management. Previously, the dataset management experience was a popup dialog modal with limited space, and all functionality was displayed in this one small modal. The new dataset management experience replaces the existing popup dialog with a full-page experience, providing a clearer breakdown of a dataset’s properties.

In this post, we walk through the end-to-end dataset management user experience.

Access the new UI

To get started, choose Datasets in the navigation pane on the QuickSight console, and choose any dataset that you want to manage.

When you choose a dataset, you see the full-page dataset management UI. This new UI is divided into four main tabs: Summary, Refresh, Permissions, and Usage.

Use case overview

Let’s consider a fictional company, AnyCompany. They have used QuickSight for a long time and now have a large number of datasets that have to be managed. Among the datasets they use, they have a combination of Direct Query and SPICE modes. They need a unified view of each dataset, with details related to permissions, refreshes, and usage. Additionally, they need to be able to schedule when they want to refresh the data and have a history of all the successful and failed attempts of these updates.

The Summary tab

As a data analyst at AnyCompany, you need to review details about your datasets. You can find several options by navigating to the Summary tab.

The About section shows whether the dataset is stored in SPICE or uses Direct Query. If the dataset is stored in SPICE, you can also see the size of the dataset. If the dataset uses Direct Query, you can choose Set alert schedule to set up a schedule for when alerts on dashboards should be evaluated.

Specify the time zone, whether the schedule repeats daily or hourly, and the start time.

To continue exploring the dataset, choose a new dataset that is stored in SPICE. In the Refresh section, you can verify the status of the SPICE dataset and the last successful refresh date.

Under Access Settings, you can see details about how many owners and viewers this dataset has and also the options to enable row-level and column-level security.

To add row-level security to this dataset, choose Set up under Row-level security.

Under User-based rules, select the permissions dataset with rules to restrict access for each user or group.

To apply column-level security, choose Set up under Column-level security.

Select the columns to be restricted and choose Next.

Choose who can access the restricted columns and choose Apply.

In the Sources section on the Summary tab, a list of data sources is displayed to show the ones used in this dataset. In the following example, we can see the sources SaaS Sales 2022.csv and SaaS-Sales-MonthlySummary.

You also need to identify where (analyses, dashboards, or other datasets) each dataset is being used, to determine whether you can eliminate unused ones.

To verify this, you just have to look at the Usage section (more details are on the Usage tab).

It’s also possible to go to the data prep interface by choosing Edit dataset or duplicate it by opening the drop-down menu.

You can also directly create a new analysis with this dataset, or choose Use in dataset to take advantage of the dataset-as-a-source capability. When you use this option, any data preparation that the parent dataset contains, such as joins or calculated fields, is kept. You can add further preparation to the data in the new child datasets, such as joining new data and filtering data. You can also set up your own data refresh schedule for the child dataset and track the dashboards and analyses that use it. The advantages include central management of datasets, less dataset management effort, predefined key metrics, and the flexibility to customize data.

The Refresh tab

At AnyCompany, you also need to refresh the latest data for your datasets. To achieve this, you have two different options.

You can choose Refresh now to manually get the latest records in the dataset.

You can also choose Add new schedule to create a refresh schedule and not worry about running it manually in the future. You can set the time zone, start time, and frequency.

There are two types of scheduled refresh: full refresh and incremental refresh. Full refresh will completely reload the whole dataset, while incremental refresh only updates a specified small portion of your dataset. Using incremental refresh enables you to access the most recent insights much sooner.

To set up the incremental refresh, complete the following steps:

  1. Choose Refresh Now.
  2. For Refresh type, choose Incremental refresh.
  3. If this is your first incremental refresh on the dataset, choose Configure.
  4. On the Configure incremental refresh page, do the following:
    1. For Date column, choose a date column that you want to base the look-back window on.
    2. For Window size, enter a number for size, and then choose an amount of time that you want to look back for changes. You can choose to refresh changes to the data that occurred within a specified number of hours, days, or weeks from now. For example, you can choose to refresh changes to the data that occurred within two weeks of the current date.
  5. Choose Submit.
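The look-back window configured in the preceding steps can be pictured with a short sketch. This is a conceptual illustration only, not how QuickSight implements incremental refresh. The rows_in_lookback_window helper, the column names, and the sample rows are hypothetical.

```python
from datetime import datetime, timedelta


def rows_in_lookback_window(rows, date_column, window, now):
    """Hypothetical helper: return rows whose date falls within the look-back window."""
    cutoff = now - window
    return [row for row in rows if cutoff <= row[date_column] <= now]


# Hypothetical sample data: one row older than two weeks, one more recent
now = datetime(2022, 6, 15)
rows = [
    {"order_id": 1, "order_date": datetime(2022, 5, 20)},
    {"order_id": 2, "order_date": datetime(2022, 6, 10)},
]

# A two-week window, as in the example above
recent = rows_in_lookback_window(rows, "order_date", timedelta(weeks=2), now)
print([row["order_id"] for row in recent])
```

Only rows inside the window are candidates for the incremental refresh, which is why it touches a small portion of the dataset compared with a full refresh.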

There are two main sections on the Refresh tab: Schedules and History. Under Schedules, you can see details about the scheduled refreshes of the dataset. There is also the option to edit and delete the schedule.

In the History section, you can see details about the past refreshes, such as status, duration, skipped rows, ingested rows, dataset rows, and refresh type.

The Permissions tab

On the Permissions tab, you can manage the settings and permissions for users and groups that access the dataset.

As the dataset owner at AnyCompany, you need to manage access to the datasets and add users and groups. To do so, simply choose Add users & groups.

Choose the specific user or group to provide access to this dataset.

Review the list of users and groups that have access to the dataset as well as their level of permission (viewer or owner). You can also revoke access for users or groups.

The Usage tab

It’s not always easy for AnyCompany to determine whether or not a dataset is being used by users or in other assets such as analyses or dashboards.

To answer this kind of question, you can easily review the information on the Usage tab. Here you can review the list of analyses and dashboards where the dataset is being used (choose the name of an analysis or dashboard to view the actual asset).

Under the Users column, you can get the details about who is using this analysis or dashboard.

Conclusion

In this post, we introduced the new user interface of the dataset management page on the QuickSight console. This new user interface simplifies the administration and use of datasets by having everything organized and centralized. This will primarily help authors and administrators quickly manage their datasets, while also contributing to a better QuickSight navigation experience. The new user interface is now generally available in all supported QuickSight Regions.

We look forward to your feedback and stories on how you use the new dataset management interface for your business needs.


About the Authors

Arturo Duarte is a Partner Solutions Architect focused on Amazon QuickSight at Amazon Web Services. He works with EMEA APN Partners to help develop their data and analytics practices with enterprise and mission-critical solutions for their end customers.

Emily Zhu is a Senior Product Manager at Amazon QuickSight, AWS’s cloud-native, fully managed SaaS BI service. She leads the development of the QuickSight analytics and query capability. Before joining AWS, she worked in the Amazon Prime Air drone delivery program and the Boeing company as senior strategist for several years. Emily is passionate about the potential of cloud-based BI solutions and looks forward to helping customers advance in their data-driven strategy making.