All posts by Saurabh Bhutyani

Use generative AI with Amazon EMR, Amazon Bedrock, and English SDK for Apache Spark to unlock insights

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/use-generative-ai-with-amazon-emr-amazon-bedrock-and-english-sdk-for-apache-spark-to-unlock-insights/

In this era of big data, organizations worldwide are constantly searching for innovative ways to extract value and insights from their vast datasets. Apache Spark offers the scalability and speed needed to process large amounts of data efficiently.

Amazon EMR is the industry-leading cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning (ML) using open source frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR is the best place to run Apache Spark. You can quickly and effortlessly create managed Spark clusters from the AWS Management Console, AWS Command Line Interface (AWS CLI), or Amazon EMR API. You can also use additional Amazon EMR features, including fast Amazon Simple Storage Service (Amazon S3) connectivity using the Amazon EMR File System (EMRFS), integration with the Amazon EC2 Spot market and the AWS Glue Data Catalog, and EMR Managed Scaling to add or remove instances from your cluster. Amazon EMR Studio is an integrated development environment (IDE) that makes it straightforward for data scientists and data engineers to develop, visualize, and debug data engineering and data science applications written in R, Python, Scala, and PySpark. EMR Studio provides fully managed Jupyter notebooks, and tools like Spark UI and YARN Timeline Service to simplify debugging.

To unlock the potential hidden within these data troves, it’s essential to go beyond traditional analytics. Enter generative AI, a cutting-edge technology that combines ML with creativity to generate human-like text, art, and even code. Amazon Bedrock is the most straightforward way to build and scale generative AI applications with foundation models (FMs). Amazon Bedrock is a fully managed service that makes FMs from Amazon and leading AI companies available through an API. You can quickly experiment with a variety of FMs in the playground and use a single API for inference regardless of the model you choose, giving you the flexibility to use FMs from different providers and stay up to date with the latest model versions with minimal code changes.

In this post, we explore how you can supercharge your data analytics with generative AI using Amazon EMR, Amazon Bedrock, and the pyspark-ai library. The pyspark-ai library is an English SDK for Apache Spark: it takes instructions in plain English and compiles them into PySpark objects such as DataFrames. This makes it straightforward to work with Spark, allowing you to focus on extracting value from your data.

Solution overview

The following diagram illustrates the architecture for using generative AI with Amazon EMR and Amazon Bedrock.

EMR Studio is a web-based IDE for fully managed Jupyter notebooks that run on EMR clusters. We interact with EMR Studio Workspaces connected to a running EMR cluster and run the notebook provided as part of this post. We use the New York City Taxi data to garner insights into various taxi rides taken by users. We ask the questions in natural language on top of the data loaded in Spark DataFrame. The pyspark-ai library then uses the Amazon Titan Text FM from Amazon Bedrock to create a SQL query based on the natural language question. The pyspark-ai library takes the SQL query, runs it using Spark SQL, and provides results back to the user.

In this solution, you can create and configure the required resources in your AWS account with an AWS CloudFormation template. The template creates the AWS Glue database and tables, S3 bucket, VPC, and other AWS Identity and Access Management (IAM) resources that are used in the solution.

The template is designed to demonstrate how to use EMR Studio with the pyspark-ai package and Amazon Bedrock, and is not intended for production use without modification. Additionally, the template uses the us-east-1 Region and may not work in other Regions without modification. The template creates resources that incur costs while they are in use. Follow the cleanup steps at the end of this post to delete the resources and avoid unnecessary charges.

Prerequisites

Before you launch the CloudFormation stack, ensure you have the following:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS CLI, and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation
  • The Titan Text G1 – Express model is currently in preview, so you need to have preview access to use it as part of this post

Create resources with AWS CloudFormation

The CloudFormation template creates the following AWS resources:

  • A VPC with private and public subnets, route tables, and a NAT gateway to use with EMR Studio.
  • An EMR cluster with Python 3.9 installed. We are using a bootstrap action to install Python 3.9 and other relevant packages like pyspark-ai and Amazon Bedrock dependencies. (For more information, refer to the bootstrap script.)
  • An S3 bucket for the EMR Studio Workspace and notebook storage.
  • IAM roles and policies for EMR Studio setup, Amazon Bedrock access, and running notebooks

To get started, complete the following steps:

  1. Choose Launch Stack.
  2. Select I acknowledge that this template may create IAM resources.

The CloudFormation stack takes approximately 20–30 minutes to complete. You can monitor its progress on the AWS CloudFormation console. When its status reads CREATE_COMPLETE, your AWS account will have the resources necessary to implement this solution.
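If you'd rather track the stack from code than from the console, a minimal boto3 sketch follows (the stack name is a placeholder; use the name you chose when launching the stack):

import boto3

# Placeholder stack name -- substitute the name you chose when launching the stack
STACK_NAME = "emr-sparkai-stack"

cfn = boto3.client("cloudformation", region_name="us-east-1")
status = cfn.describe_stacks(StackName=STACK_NAME)["Stacks"][0]["StackStatus"]
print(f"{STACK_NAME}: {status}")  # Expect CREATE_COMPLETE when provisioning has finished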

Create EMR Studio

Now you can create an EMR Studio and Workspace to work with the notebook code. Complete the following steps:

  1. On the EMR Studio console, choose Create Studio.
  2. Enter the Studio Name as GenAI-EMR-Studio and provide a description.
  3. In the Networking and security section, specify the following:
    • For VPC, choose the VPC you created as part of the CloudFormation stack that you deployed. Get the VPC ID using the CloudFormation outputs for the VPCID key.
    • For Subnets, choose all four subnets.
    • For Security and access, select Custom security group.
    • For Cluster/endpoint security group, choose EMRSparkAI-Cluster-Endpoint-SG.
    • For Workspace security group, choose EMRSparkAI-Workspace-SG.
  4. In the Studio service role section, specify the following:
    • For Authentication, select AWS Identity and Access Management (IAM).
    • For AWS IAM service role, choose EMRSparkAI-StudioServiceRole.
  5. In the Workspace storage section, browse and choose the S3 bucket for storage starting with emr-sparkai-<account-id>.
  6. Choose Create Studio.
  7. When the EMR Studio is created, choose the link under Studio Access URL to access the Studio.
  8. When you’re in the Studio, choose Create workspace.
  9. Add emr-genai as the name for the Workspace and choose Create workspace.
  10. When the Workspace is created, choose its name to launch the Workspace (make sure you’ve disabled any pop-up blockers).

Big data analytics using Apache Spark with Amazon EMR and generative AI

Now that we have completed the required setup, we can start performing big data analytics using Apache Spark with Amazon EMR and generative AI.

As a first step, we load a notebook that has the required code and examples to work with the use case. We use the NYC Taxi dataset, which contains details about taxi rides.

  1. Download the notebook file NYTaxi.ipynb and upload it to your Workspace by choosing the upload icon.
  2. After the notebook is imported, open the notebook and choose PySpark as the kernel.

By default, PySpark AI uses OpenAI’s GPT-4 as its LLM, but you can also plug in models from Amazon Bedrock, Amazon SageMaker JumpStart, and other third-party models. For this post, we show how to integrate the Amazon Bedrock Titan model for SQL query generation and run it with Apache Spark in Amazon EMR.

  1. To get started with the notebook, you need to associate the Workspace to a compute layer. To do so, choose the Compute icon in the navigation pane and choose the EMR cluster created by the CloudFormation stack.
  2. Configure the Python parameters to use the updated Python 3.9 package with Amazon EMR:
    %%configure -f
    {
    "conf": {
    "spark.executorEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9",
    "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "/usr/local/python3.9.18/bin/python3.9"
    }
    }

  3. Import the necessary libraries:
    from pyspark_ai import SparkAI
    from pyspark.sql import SparkSession
    from langchain.chat_models import ChatOpenAI
    from langchain.llms.bedrock import Bedrock
    import boto3
    import os

  4. After the libraries are imported, you can define the LLM model from Amazon Bedrock. In this case, we use amazon.titan-text-express-v1. You need to enter the Region and Amazon Bedrock endpoint URL based on your preview access for the Titan Text G1 – Express model.
    boto3_bedrock = boto3.client('bedrock-runtime', '<region>', endpoint_url='<bedrock endpoint url>')
    llm = Bedrock(
    model_id="amazon.titan-text-express-v1",
    client=boto3_bedrock)

  5. Connect Spark AI to the Amazon Bedrock LLM model for SQL query generation based on questions in natural language:
    #Connecting Spark AI to the Bedrock Titan LLM
    spark_ai = SparkAI(llm = llm, verbose=False)
    spark_ai.activate()

Here, we have initialized Spark AI with verbose=False; you can also set verbose=True to see more details.

Now you can read the NYC Taxi data in a Spark DataFrame and use the power of generative AI in Spark.
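The exact load step depends on how you expose the dataset to Spark; the following is a minimal sketch that assumes the data is registered as an AWS Glue table (the database and table names here are hypothetical placeholders, so substitute the ones created in your account):

# Hypothetical Glue database and table names -- substitute the ones in your account
taxi_records = spark.sql("SELECT * FROM nyc_taxi_db.taxi_records")
taxi_records.printSchema()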

  1. For example, you can ask how many records are in the dataset:
    taxi_records.ai.transform("count the number of records in this dataset").show()

We get the following response:

> Entering new AgentExecutor chain...
Thought: I need to count the number of records in the table.
Action: query_validation
Action Input: SELECT count(*) FROM spark_ai_temp_view_ee3325
Observation: OK
Thought: I now know the final answer.
Final Answer: SELECT count(*) FROM spark_ai_temp_view_ee3325
> Finished chain.
+----------+
| count(1)|
+----------+
|2870781820|
+----------+

Spark AI internally uses LangChain and SQL chain, which hide the complexity from end-users working with queries in Spark.

The notebook has a few more example scenarios to explore the power of generative AI with Apache Spark and Amazon EMR.
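For instance, the same pattern works for other natural language questions. The prompt below is purely illustrative, and the column names it relies on (trip_distance, passenger_count) are assumptions about the dataset schema:

# Illustrative prompt -- results depend on the columns present in your dataset
taxi_records.ai.transform("what is the average trip_distance by passenger_count").show()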

Clean up

Empty the contents of the S3 bucket emr-sparkai-<account-id>, delete the EMR Studio Workspace created as part of this post, and then delete the CloudFormation stack that you deployed.

Conclusion

This post showed how you can supercharge your big data analytics with the help of Apache Spark on Amazon EMR and Amazon Bedrock. The PySpark AI package allows you to derive meaningful insights from your data. It helps reduce development and analysis time, eliminating the need to write queries manually and allowing you to focus on your business use case.


About the Authors

Saurabh Bhutyani is a Principal Analytics Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running generative AI use cases, scalable analytics solutions and data mesh architectures using AWS services like Amazon Bedrock, Amazon SageMaker, Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Harsh Vardhan is an AWS Senior Solutions Architect, specializing in analytics. He has over 8 years of experience working in the field of big data and data science. He is passionate about helping customers adopt best practices and discover insights from their data.

Extend your data mesh with Amazon Athena and federated views

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/extend-your-data-mesh-with-amazon-athena-and-federated-views/

Amazon Athena is a serverless, interactive analytics service built on the Trino, PrestoDB, and Apache Spark open-source frameworks. You can use Athena to run SQL queries on petabytes of data stored on Amazon Simple Storage Service (Amazon S3) in widely used formats such as Parquet and open-table formats like Apache Iceberg, Apache Hudi, and Delta Lake. However, Athena also allows you to query data stored in 30 different data sources—in addition to Amazon S3—including relational, non-relational, and object stores running on premises or in other cloud environments.

In Athena, we refer to queries on non-Amazon S3 data sources as federated queries. These queries run on the underlying database, which means you can analyze the data without learning a new query language and without the need for separate extract, transform, and load (ETL) scripts to extract, duplicate, and prepare data for analysis.

Recently, Athena added support for creating and querying views on federated data sources to bring greater flexibility and ease of use to use cases such as interactive analysis and business intelligence reporting. Athena also updated its data connectors with optimizations that improve performance and reduce cost when querying federated data sources. The updated connectors use dynamic filtering and an expanded set of predicate pushdown optimizations to perform more operations in the underlying data source rather than in Athena. As a result, you get faster queries with less data scanned, especially on tables with millions to billions of rows of data.

In this post, we show how to create and query views on federated data sources in a data mesh architecture featuring data producers and consumers.

The term data mesh refers to a data architecture with decentralized data ownership. A data mesh provides domain-oriented teams with the data they need, emphasizes self-service, and promotes the notion of purpose-built data products. In a data mesh, data producers expose datasets to the organization and data consumers subscribe to and consume the data products created by producers. By distributing data ownership to cross-functional teams, a data mesh can foster a culture of collaboration, invention, and agility around data.

Let’s dive into the solution.

Solution overview

For this post, imagine a hypothetical ecommerce company that uses multiple data sources, each playing a different role:

  • In an S3 data lake, ecommerce records are stored in a table named Lineitems
  • Amazon ElastiCache for Redis stores Nations and ActiveOrders data, ensuring ultra-fast reads of operational data by downstream ecommerce systems
  • On Amazon Relational Database Service (Amazon RDS), MySQL is used to store data like email addresses and shipping addresses in the Orders, Customer, and Suppliers tables
  • For flexibility and low-latency reads and writes, an Amazon DynamoDB table holds Part and Partsupp data

We want to query these data sources in a data mesh design. In the following sections, we set up Athena data source connectors for MySQL, DynamoDB, and Redis, and then run queries that perform complex joins across these data sources. The following diagram depicts our data architecture.

As you proceed with this solution, note that you will create AWS resources in your account. We have provided you with an AWS CloudFormation template that defines and configures the required resources, including the sample MySQL database, S3 tables, Redis store, and DynamoDB table. The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, and other AWS Identity and Access Management (IAM) resources that are used in the solution.

The template is designed to demonstrate how to use federated views in Athena, and is not intended for production use without modification. Additionally, the template uses the us-east-1 Region and will not work in other Regions without modification. The template creates resources that incur costs while they are in use. Follow the cleanup steps at the end of this post to delete the resources and avoid unnecessary charges.

Prerequisites

Before you launch the CloudFormation stack, ensure you have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

Create resources with AWS CloudFormation

To get started, complete the following steps:

  1. Choose Launch Stack.
  2. Select I acknowledge that this template may create IAM resources.

The CloudFormation stack takes approximately 20–30 minutes to complete. You can monitor its progress on the AWS CloudFormation console. When the status reads CREATE_COMPLETE, your AWS account will have the resources necessary to implement this solution.

Deploy connectors and connect to data sources

With our resources provisioned, we can begin to connect the dots in our data mesh. Let’s start by connecting the data sources created by the CloudFormation stack with Athena.

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. For Data sources, select MySQL, then choose Next.
  4. For Data source name, enter a name, such as mysql. The Athena connector for MySQL is an AWS Lambda function that was created for you by the CloudFormation template.
  5. For Connection details, choose Select or enter a Lambda function.
  6. Choose mysql, then choose Next.
  7. Review the information and choose Create data source.
  8. Return to the Data sources page and choose mysql.
  9. On the connector details page, choose the link under Lambda function to access the Lambda console and inspect the function associated with this connector.
  10. Return to the Athena query editor.
  11. For Data source, choose mysql.
  12. For Database, choose the sales database.
  13. For Tables, you should see a listing of MySQL tables that are ready for you to query.
  14. Repeat these steps to set up the connectors for DynamoDB and Redis.

After all four data sources are configured, we can see the data sources on the Data source drop-down menu. All other databases and tables, like the lineitem table, which is stored on Amazon S3, are defined in the AWS Glue Data Catalog and can be accessed by choosing AwsDataCatalog as the data source.

Analyze data with Athena

With our data sources configured, we are ready to start running queries and using federated views in a data mesh architecture. Let’s start by trying to find out how much profit was made on a given line of parts, broken out by supplier nation and year.

For such a query, we need to calculate, for each nation and year, the profit for parts ordered in each year that were filled by a supplier in each nation. Profit is defined as the sum of [(l_extendedprice*(1-l_discount)) - (ps_supplycost * l_quantity)] for all line items describing parts in the specified line.

Answering this question requires querying all four data sources—MySQL, DynamoDB, Redis, and Amazon S3—and is accomplished with the following SQL:

SELECT 
    n_name nation,
	year(CAST(o_orderdate AS date)) as o_year,
	((l_extendedprice * (1 - l_discount)) - (CAST(ps_supplycost AS double) * l_quantity)) as amount
FROM
    awsdatacatalog.data_lake.lineitem,
    dynamo.default.part,
    dynamo.default.partsupp,
    mysql.sales.supplier,
    mysql.sales.orders,
    redis.redis.nation
WHERE 
    ((s_suppkey = l_suppkey)
    AND (ps_suppkey = l_suppkey)
	AND (ps_partkey = l_partkey)
	AND (p_partkey = l_partkey)
	AND (o_orderkey = l_orderkey)
	AND (s_nationkey = CAST(Regexp_extract(_key_, '.*-(.*)', 1) AS int)))

Running this query on the Athena console produces the following result.

This query is fairly complex: it involves multiple joins and requires special knowledge of the correct way to calculate profit metrics that other end-users may not possess.

To simplify the analysis experience for those users, we can hide this complexity behind a view. For more information on using views with federated data sources, see Querying federated views.

Use the following query to create the view in the data_lake database under the AwsDataCatalog data source:

CREATE OR REPLACE VIEW "data_lake"."federated_view" AS
SELECT 
    n_name nation,
	year(CAST(o_orderdate AS date)) as o_year,
	((l_extendedprice * (1 - l_discount)) - (CAST(ps_supplycost AS double) * l_quantity)) as amount
FROM
    awsdatacatalog.data_lake.lineitem,
    dynamo.default.part,
    dynamo.default.partsupp,
    mysql.sales.supplier,
    mysql.sales.orders,
    redis.redis.nation
WHERE 
    ((s_suppkey = l_suppkey)
    AND (ps_suppkey = l_suppkey)
	AND (ps_partkey = l_partkey)
	AND (p_partkey = l_partkey)
	AND (o_orderkey = l_orderkey)
	AND (s_nationkey = CAST(Regexp_extract(_key_, '.*-(.*)', 1) AS int)))

Next, run a simple SELECT query to validate that the view was created successfully:

SELECT * FROM federated_view LIMIT 10

The result should be similar to our previous query.

With our view in place, we can perform new analyses to answer questions that would be challenging without the view due to the complex query syntax that would be required. For example, we can find the total profit by nation:

SELECT nation, sum(amount) AS total
from federated_view
GROUP BY nation 
ORDER BY nation ASC

Your results should resemble the following screenshot.

As you can see, the federated view makes it simpler for end-users to run queries on this data. Users are free to query a view of the data, defined by a knowledgeable data producer, rather than having to first acquire expertise in each underlying data source. Because Athena federated queries are processed where the data is stored, this approach avoids duplicating data from the source system, saving valuable time and cost.
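If you want to run the same analysis programmatically instead of in the query editor, the following boto3 sketch shows one way to do it (the S3 output location is a placeholder; use any path your role can write to):

import time
import boto3

athena = boto3.client("athena", region_name="us-east-1")

# Run the aggregation against the federated view; the output location is a placeholder
query_id = athena.start_query_execution(
    QueryString="SELECT nation, sum(amount) AS total FROM federated_view GROUP BY nation ORDER BY nation ASC",
    QueryExecutionContext={"Catalog": "AwsDataCatalog", "Database": "data_lake"},
    ResultConfiguration={"OutputLocation": "s3://<your-results-bucket>/athena-results/"},
)["QueryExecutionId"]

# Poll until the query reaches a terminal state
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)
print(state)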

Use federated views in a multi-user model

So far, we have satisfied one of the principles of a data mesh: we created a data product (federated view) that is decoupled from its originating source and is available for on-demand analysis by consumers.

Next, we take our data mesh a step further by using federated views in a multi-user model. To keep it simple, assume we have one producer account, the account we used to create our four data sources and federated view, and one consumer account. Using the producer account, we give the consumer account permission to query the federated view from the consumer account.

The following figure depicts this setup and our simplified data mesh architecture.

Follow these steps to share the connectors and AWS Glue Data Catalog resources from the producer, which includes our federated view, with the consumer account:

  1. Share the data sources mysql, redis, dynamo, and data_lake with the consumer account. For instructions, refer to Sharing a data source in Account A with Account B. Note that Account A represents the producer and Account B represents the consumer. Make sure you use the same data source names from earlier when sharing data. This is necessary for the federated view to work in a cross-account model.
  2. Next, share the producer account’s AWS Glue Data Catalog with the consumer account by following the steps in Cross-account access to AWS Glue data catalogs. For the data source name, use shared_federated_catalog.
  3. Switch to the consumer account, navigate to the Athena console, and verify that you see federated_view listed under Views in the shared_federated_catalog Data Catalog and data_lake database.
  4. Next, run a sample query on the shared view to see the query results.

Clean up

To clean up the resources created for this post, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code. Make sure you run this command on the correct bucket.
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive
  3. On the AWS CloudFormation console or the AWS CLI, delete the stack athena-federated-view-blog.

Summary

In this post, we demonstrated the functionality of Athena federated views. We created a view spanning four different federated data sources and ran queries against it. We also saw how federated views could be extended to a multi-user data mesh and ran queries from a consumer account.

To take advantage of federated views, ensure you are using Athena engine version 3 and upgrade your data source connectors to the latest version available. For information on how to upgrade a connector, see Updating a data source connector.


About the Authors

Saurabh Bhutyani is a Principal Big Data Specialist Solutions Architect at AWS. He is passionate about new technologies. He joined AWS in 2019 and works with customers to provide architectural guidance for running scalable analytics solutions and data mesh architectures using AWS analytics services like Amazon EMR, Amazon Athena, AWS Glue, AWS Lake Formation, and Amazon DataZone.

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Reduce costs and increase resource utilization of Apache Spark jobs on Kubernetes with Amazon EMR on Amazon EKS

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/reduce-costs-and-increase-resource-utilization-of-apache-spark-jobs-on-kubernetes-with-amazon-emr-on-amazon-eks/

Amazon EMR on Amazon EKS is a deployment option for Amazon EMR that allows you to run Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS). If you run open-source Apache Spark on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and run Apache Spark up to three times faster. If you already use Amazon EMR, you can run Amazon EMR-based Apache Spark applications with other types of applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management.

Earlier this year, we launched support for pod templates in Amazon EMR on Amazon EKS to make it simpler to run Spark jobs on shared Amazon EKS clusters. A pod is a group of one or more containers, with shared storage and network resources, and a specification for how to run the containers. Pod templates are specifications that determine how each pod runs.

When you submit analytics jobs to a virtual cluster on Amazon EMR on EKS, Amazon EKS schedules the pods to run the jobs. Your Amazon EKS cluster may have multiple node groups and instance types attached to it, and these pods could get scheduled on any of those Amazon Elastic Compute Cloud (Amazon EC2) instances. Organizations today need better resource utilization, the ability to run jobs on specific instances based on instance type, disk size, disk IOPS, and more, and ways to control costs when multiple teams submit jobs to a virtual cluster on Amazon EMR on EKS.

In this post, we look at support in Amazon EMR on EKS for Spark’s pod template feature and how to use that for resource isolation and controlling costs.

Pod templates have many use cases:

  • Cost reduction – To reduce costs, you can schedule Spark driver pods to run on EC2 On-Demand Instances while scheduling Spark executor pods to run on EC2 Spot Instances.
  • Resource utilization – To increase resource utilization, you can support multiple teams running their workloads on the same Amazon EKS cluster. Each team gets a designated Amazon EC2 node group to run their workloads on. You can use pod templates to enforce scheduling on the relevant node groups.
  • Logging and monitoring – To improve monitoring, you can run a separate sidecar container to forward logs to your existing monitoring application.
  • Initialization – To run initialization steps, you can run a separate init container that is run before the Spark main container starts. You can have your init container run initialization steps, such as downloading dependencies or generating input data. Then the Spark main container consumes the data.

Prerequisites

To follow along with the walkthrough, ensure that you have the following resources created:

Solution overview

We look at a common use case in organizations where multiple teams want to submit jobs and need resource isolation and cost reduction. In this post, we simulate two teams trying to submit jobs to the Amazon EMR on EKS cluster and see how to isolate the resources between them when running jobs. We also look at cost reduction by running the Spark driver on EC2 On-Demand Instances and the Spark executors on EC2 Spot Instances. The following diagram illustrates this architecture.

To implement the solution, we complete the following high-level steps:

  1. Create an Amazon EKS cluster.
  2. Create an Amazon EMR virtual cluster.
  3. Set up IAM roles.
  4. Create pod templates.
  5. Submit Spark jobs.

Create an Amazon EKS cluster

To create your Amazon EKS cluster, complete the following steps:

  1. Create a new file (create-cluster.yaml) with the following contents:
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: blog-eks-cluster
  region: us-east-1

managedNodeGroups:
  # On-Demand nodegroup for job submitter/controller
 - name: controller
   instanceTypes: ["m5.xlarge", "m5a.xlarge","m5d.xlarge"]
   desiredCapacity: 1

  # On-Demand nodegroup for team-1 Spark driver
 - name: team-1-driver
   labels: { team: team-1-spark-driver }
   taints: [{ key: team-1, value: general-purpose, effect: NoSchedule }]
   instanceTypes: ["m5.xlarge", "m5a.xlarge","m5d.xlarge"]
   desiredCapacity: 3

  # Spot nodegroup for team-1 Spark executors
 - name: team-1-executor
   labels: { team: team-1-spark-executor }
   taints: [{ key: team-1, value: general-purpose, effect: NoSchedule }]
   instanceTypes: ["m5.2xlarge", "m5a.2xlarge", "m5d.2xlarge"]
   desiredCapacity: 5
   spot: true

  # On-Demand nodegroup for team-2 Spark driver
 - name: team-2-driver
   labels: { team: team-2-spark-driver }
   taints: [{ key: team-2, value: compute-intensive , effect: NoSchedule }]
   instanceTypes: ["c5.xlarge", "c5a.xlarge", "c5d.xlarge"]
   desiredCapacity: 3

  # Spot nodegroup for team-2 Spark executors
 - name: team-2-executor
   labels: { team: team-2-spark-executor }
   taints: [{ key: team-2, value: compute-intensive , effect: NoSchedule }]
   instanceTypes: ["c5.2xlarge","c5a.2xlarge","c5d.2xlarge"]
   spot: true
   desiredCapacity: 5
  2. Install the AWS CLI.

You can use version 1.18.157 or later, or version 2.0.56 or later. The following command is for Linux OS:

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

For other operating systems, see Installing, updating, and uninstalling the AWS CLI version.

  3. Install eksctl (you must have eksctl version 0.34.0 or later):
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
sudo mv /tmp/eksctl /usr/local/bin
eksctl version
  4. Install kubectl:
curl -o kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.8/2020-09-18/bin/linux/amd64/kubectl
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin
  5. Create the Amazon EKS cluster using the create-cluster.yaml config file created in earlier steps:
eksctl create cluster -f create-cluster.yaml

This step launches an Amazon EKS cluster with five managed node groups: an On-Demand node group for the job controller, plus two node groups each for team-1 and team-2, where the Spark driver node group uses EC2 On-Demand capacity and the Spark executor node group uses EC2 Spot capacity.

After the Amazon EKS cluster is created, run the following command to check the node groups:

eksctl get nodegroups --cluster blog-eks-cluster

You should see a response similar to the following screenshot.

Create an Amazon EMR virtual cluster

We launch the EMR virtual cluster in the default namespace:

eksctl create iamidentitymapping \
    --cluster blog-eks-cluster \
    --namespace default \
    --service-name "emr-containers"
aws emr-containers create-virtual-cluster \
--name blog-emr-on-eks-cluster \
--container-provider '{"id": "blog-eks-cluster","type": "EKS","info": {"eksInfo": {"namespace": "default"}} }'

The command creates an EMR virtual cluster in the Amazon EKS default namespace and outputs the virtual cluster ID:

{
    "id": "me9zfn2lbt241wxhxg81gjlxb",
    "name": "blog-emr-on-eks-cluster",
    "arn": "arn:aws:emr-containers:us-east-1:xxxx:/virtualclusters/me9zfn2lbt241wxhxg81gjlxb"
}

Note the ID of the EMR virtual cluster to use to run the jobs.
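If you need to look the ID up again later, you can also retrieve it programmatically; the following boto3 sketch lists running virtual clusters and prints the one created above:

import boto3

emr_containers = boto3.client("emr-containers", region_name="us-east-1")

# Print the ID of the virtual cluster created above
for vc in emr_containers.list_virtual_clusters(states=["RUNNING"])["virtualClusters"]:
    if vc["name"] == "blog-emr-on-eks-cluster":
        print(vc["id"])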

Set up an IAM role

In this step, we create an Amazon EMR Spark job execution role with the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
               "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
} 

Navigate to the IAM console to create the role. Let’s call the role EMR_EKS_Job_Execution_Role. For more information, see Creating IAM roles and Creating IAM Policies.

Set up the trust policy for the role with the following command:

aws emr-containers update-role-trust-policy \
       --cluster-name blog-eks-cluster \
       --namespace default \
       --role-name EMR_EKS_Job_Execution_Role

Enable IAM roles for service accounts (IRSA) on the Amazon EKS cluster:

eksctl utils associate-iam-oidc-provider --cluster blog-eks-cluster --approve

Create pod templates with node selectors and taints

In this step, we create pod templates for the Team-1 Spark driver pods and Spark executor pods, and templates for the Team-2 Spark driver pods and Spark executor pods.

Run the following command to view the nodes corresponding to team-1 for the label team=team-1-spark-driver:

$ kubectl get nodes --selector team=team-1-spark-driver
NAME                              STATUS   ROLES    AGE    VERSION
ip-192-168-123-197.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-25-114.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-91-201.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464

Similarly, you can view the nodes corresponding to team-1 for the label team=team-1-spark-executor. You can repeat the same commands to view the nodes corresponding to team-2 by changing the role labels:

$ kubectl get nodes --selector team=team-1-spark-executor
NAME                             STATUS   ROLES    AGE    VERSION
ip-192-168-100-76.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-5-196.ec2.internal    Ready    <none>   67m    v1.20.4-eks-6b7464
ip-192-168-52-131.ec2.internal   Ready    <none>   78m    v1.20.4-eks-6b7464
ip-192-168-58-137.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-70-68.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464

You can constrain a pod so that it can only run on particular set of nodes. There are several ways to do this and the recommended approaches all use label selectors to facilitate the selection. In some circumstances, you may want to control which node the pod deploys to; for example, to ensure that a pod ends up on a machine with an SSD attached to it, or to co-locate pods from two different services that communicate a lot into the same availability zone.

nodeSelector is the simplest recommended form of node selection constraint. nodeSelector is a field of PodSpec. It specifies a map of key-value pairs. For the pod to be eligible to run on a node, the node must have each of the indicated key-value pairs as labels.

Taints are used to repel pods from specific nodes. Taints and tolerations work together to ensure that pods aren’t scheduled onto inappropriate nodes. One or more taints are applied to a node; this marks that the node shouldn’t accept any pods that don’t tolerate the taints. Amazon EKS supports configuring Kubernetes taints through managed node groups. Taints and tolerations are a flexible way to steer pods away from nodes or evict pods that shouldn’t be running. One common use case is dedicated nodes: if you want to dedicate a set of nodes, such as GPU instances, for exclusive use by a particular group of users, you can add a taint to those nodes and then add a corresponding toleration to their pods.

nodeSelector provides a simple way to attract pods to nodes with particular labels, whereas taints are used to repel pods from specific nodes. You can apply taints to a team’s node group and use pod templates to apply a corresponding toleration to their workload. This ensures that only the designated team can schedule jobs onto their node group: the label directs the workload to the team’s designated node group, and the toleration allows it to be scheduled despite the taint. During the Amazon EKS cluster creation, we provided taints for each of the managed node groups. We create pod templates that specify both nodeSelector and tolerations to schedule work to a team’s node group.

  1. Create a new file team-1-driver-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-driver
  tolerations:
  - key: "team-1"
    operator: "Equal"
    value: "general-purpose"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Here, we specify nodeSelector as team: team-1-spark-driver. This makes sure that Spark driver pods are running on nodes created as part of node group team-1-spark-driver, which we created for Team-1. At the same time, we have a toleration for nodes tainted as team-1.

  2. Create a new file team-1-executor-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-executor
  tolerations:
  - key: "team-1"
    operator: "Equal"
    value: "general-purpose"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-executor

Here, we specify nodeSelector as team: team-1-spark-executor. This makes sure that Spark executor pods are running on nodes created as part of node group team-1-spark-executor, which we created for Team-1. At the same time, we have a toleration for nodes tainted as team-1.

  3. Create a new file team-2-driver-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-2-spark-driver
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Here, we specify nodeSelector as team: team-2-spark-driver. This makes sure that Spark driver pods are running on nodes created as part of node group team-2-spark-driver, which we created for Team-2. At the same time, we have a toleration for nodes tainted as team-2.

  4. Create a new file team-2-executor-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-2-spark-executor
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-executor

Here, we specify nodeSelector as team: team-2-spark-executor. This makes sure that Spark executor pods are running on nodes created as part of node group team-2-spark-executor, which we created for Team-2. At the same time, we have a toleration for nodes tainted as team-2.

Save the preceding pod template files to your S3 bucket or refer to them using the following links:

  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-driver-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-executor-template.yaml

Submit Spark jobs

In this step, we submit the Spark jobs and observe the output.

Substitute the values of the EMR virtual cluster ID, EMR_EKS_Job_Execution_Role ARN, and S3_Bucket:

export EMR_EKS_CLUSTER_ID=<<EMR virtual cluster id>>
export EMR_EKS_EXECUTION_ARN=<<EMR_EKS_Job_Execution_Role ARN>>
export S3_BUCKET=<<S3_Bucket>>

Submit the Spark job:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

After submitting the job, run the following command to check if the Spark driver and executor pods are created and running:

kubectl get pods

You should see output similar to the following:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uf8fut3g6o6-m4nfz 3/3 Running 0 25s
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 14s

Let’s check the pods deployed on team-1’s On-Demand Instances:

$ for n in $(kubectl get nodes -l team=team-1-spark-driver --no-headers | cut -d " " -f1); do echo "Pods on instance ${n}:";kubectl get pods -n default --no-headers --field-selector spec.nodeName=${n} ; echo ; done

Pods on instance ip-192-168-27-110.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-59-167.ec2.internal:
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 29s

Pods on instance ip-192-168-73-223.ec2.internal:
No resources found in default namespace.

Let’s check the pods deployed on team-1’s Spot Instances:

$ for n in $(kubectl get nodes -l team=team-1-spark-executor --no-headers | cut -d " " -f1); do echo "Pods on instance ${n}:";kubectl get pods -n default --no-headers --field-selector spec.nodeName=${n} ; echo ; done

Pods on instance ip-192-168-108-90.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-39-31.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-87-75.ec2.internal:
pythonpi-1623711779860-exec-3 0/2 Running 0 27s
pythonpi-1623711779937-exec-4 0/2 Running 0 26s

Pods on instance ip-192-168-88-145.ec2.internal:
pythonpi-1623711779748-exec-2 0/2 Running 0 27s

Pods on instance ip-192-168-92-149.ec2.internal:
pythonpi-1623711779097-exec-1 0/2 Running 0 28s
pythonpi-1623711780071-exec-5 0/2 Running 0 27s

When the executor pods are running, you should see output similar to the following:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uf8fut3g6o6-m4nfz 3/3 Running 0 56s
pythonpi-1623712009087-exec-1 0/2 Running 0 2s
pythonpi-1623712009603-exec-2 0/2 Running 0 2s
pythonpi-1623712009735-exec-3 0/2 Running 0 2s
pythonpi-1623712009833-exec-4 0/2 Running 0 2s
pythonpi-1623712009945-exec-5 0/2 Running 0 1s
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 45s

To check the status of the jobs on the Amazon EMR console, choose the cluster on the Virtual Clusters page. You can also check the Spark History Server by choosing View logs.

When the job is complete, go to Amazon CloudWatch Logs and check the output by choosing the log (/emr-containers/jobs/<<xxx-driver>>/stdout) on the Log groups page. You should see output similar to the following screenshot.
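If you prefer to pull the driver logs from code instead of the CloudWatch console, a minimal boto3 sketch follows; the log group name and stream prefix match the monitoringConfiguration used in the start-job-run call above:

import boto3

logs = boto3.client("logs", region_name="us-east-1")

# Fetch recent events from the job log group configured in monitoringConfiguration
response = logs.filter_log_events(
    logGroupName="/emr-containers/jobs",
    logStreamNamePrefix="blog-emr-eks",
    limit=50,
)
for event in response["events"]:
    print(event["message"])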

Now submit the Spark job as team-2 and specify the pod template files pointing to team-2’s driver and executor pod specifications and observe where the pods are created:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

We can check the status of the job on the Amazon EMR console and also by checking the CloudWatch logs.

Now, let’s run a use case where Team-1 doesn’t specify the correct toleration in the Spark driver’s pod template. We use the following pod template, which selects nodes with the label team: team-1-spark-driver but only tolerates the team-2 taint. Because team-1’s driver nodes are tainted with team-1, no node satisfies both conditions and we should see a scheduling error.

apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-driver
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Submit the Spark job using this new pod template:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template-negative.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

Run the following command to check the status of the Spark driver pod:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uott6onmu8p-7t64m 3/3 Running 0 36s
spark-00000002uott6onmu8p-driver 0/2 Pending 0 25s

Let’s describe the driver pod to check the details. You should notice a failed event similar to Warning FailedScheduling 28s (x3 over 31s) default-scheduler 0/17 nodes are available: 8 node(s) had taint {team-1: general-purpose}, that the pod didn't tolerate, 9 node(s) didn't match Pod's node affinity

$ kubectl describe pod <<driver-pod-name>>
Name:           spark-00000002uott6onmu8p-driver
Namespace:      default
......
QoS Class:       Burstable
Node-Selectors:  team=team-1-spark-driver
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
                 team-2=compute-intensive:NoSchedule
Events:
  Type     Reason            Age                From               Message
  ----     ------            ----               ----               -------
  Warning  FailedScheduling  28s (x3 over 31s)  default-scheduler  0/17 nodes are available: 8 node(s) had taint {team-1: general-purpose}, that the pod didn't tolerate, 9 node(s) didn't match Pod's node affinity..

This shows that if the pod template doesn’t include the right toleration, the pod can’t tolerate the taint on the nodes and therefore isn’t scheduled on them.

Clean up

Don’t forget to clean up the resources you created to avoid any unnecessary charges.

  1. Delete all the virtual clusters that you created:
#List all the virtual cluster ids
aws emr-containers list-virtual-clusters
#Delete virtual cluster by passing virtual cluster id
aws emr-containers delete-virtual-cluster --id <virtual-cluster-id>
  2. Delete the Amazon EKS cluster:
eksctl delete cluster blog-eks-cluster
  3. Delete the EMR_EKS_Job_Execution_Role role and policies.

Summary

In this post, we saw how to create an Amazon EKS cluster, configure Amazon EKS managed node groups, create an EMR virtual cluster on Amazon EKS, and submit Spark jobs. With pod templates, we saw how to manage resource isolation between various teams when submitting jobs and also learned how to reduce cost by running Spark driver pods on EC2 On-Demand Instances and Spark executor pods on EC2 Spot Instances.

To get started with pod templates, try out the Amazon EMR on EKS workshop.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Migrating data from Google BigQuery to Amazon S3 using AWS Glue custom connectors

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/migrating-data-from-google-bigquery-to-amazon-s3-using-aws-glue-custom-connectors/

In today’s connected world, it’s common to have data sitting in various data sources in a variety of formats. Even though data is a critical component of decision making, for many organizations this data is spread across multiple public clouds. Organizations are looking for tools that make it easy to ingest data from these myriad data sources and be able to customize the data ingestion to meet their needs.

AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. AWS Glue provides all the capabilities needed for data integration, so analysis can be done in minutes instead of weeks or months. AWS Glue custom connectors are a new capability in AWS Glue and AWS Glue Studio that makes it easy for you to transfer data from SaaS applications and custom data sources to your data lake in Amazon S3. With just a few clicks, you can search for and select connectors from AWS Marketplace and begin your data preparation workflow in minutes. You can also build custom connectors and share them across teams, and integrate open source Spark connectors and Athena federated query connectors into your data preparation workflows. The AWS Glue Connector for Google BigQuery allows you to migrate data across clouds from Google BigQuery to Amazon Simple Storage Service (Amazon S3). AWS Glue Studio is a new graphical interface that makes it easy to create, run, and monitor ETL jobs in AWS Glue. You can visually compose data transformation workflows and seamlessly run them on AWS Glue’s Apache Spark-based serverless ETL engine.

In this post, we focus on using AWS Glue Studio to query BigQuery tables and save the data into Amazon Simple Storage Service (Amazon S3) in Parquet format, and then query it using Amazon Athena. To query BigQuery tables in AWS Glue, we use the new AWS Glue Connector for Google BigQuery from AWS Marketplace.

Solution overview

The following architecture diagram shows how AWS Glue connects to Google BigQuery for data ingestion.

Prerequisites

Before getting started, make sure you have the following:

  • An account in Google Cloud, specifically a service account that has permissions to Google BigQuery
  • An AWS Identity and Access Management (IAM) user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI)
    • The IAM user also needs permissions to create an IAM role and policies

Configuring your Google account

We create a secret in AWS Secrets Manager to store the Google service account file contents as a base64-encoded string.

  1. Download the service account credentials JSON file from Google Cloud.

For base64 encoding, you can use an online utility or a system command. On Linux and macOS, you can run base64 <<service_account_json_file>> to print the file contents as a base64-encoded string.

  2. On the Secrets Manager console, choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. Enter your key as credentials and the value as the base64-encoded string.
  5. Leave the rest of the options at their default.
  6. Choose Next.

  7. Give a name to the secret bigquery_credentials.
  8. Follow through the rest of the steps to store the secret.

For more information, see Tutorial: Creating and retrieving a secret.
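If you'd rather create the secret from code than from the console, the following boto3 sketch stores the base64-encoded credentials under the credentials key (the file name is a placeholder for the service account JSON you downloaded):

import base64
import json
import boto3

# Placeholder file name -- the service account JSON downloaded from Google Cloud
with open("service_account.json", "rb") as f:
    encoded = base64.b64encode(f.read()).decode("utf-8")

secretsmanager = boto3.client("secretsmanager", region_name="us-east-1")
secretsmanager.create_secret(
    Name="bigquery_credentials",
    SecretString=json.dumps({"credentials": encoded}),
)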

Creating an IAM role for AWS Glue

The next step is to create an IAM role with the necessary permissions for the AWS Glue job. Attach the following AWS managed policies to the role:

Create and attach a policy to allow reading the secret from Secrets Manager and write access to the S3 bucket.

The following sample policy is used for the AWS Glue job in this post. Always make sure to scope down the policies before using them in a production environment. Provide your secret ARN for the bigquery_credentials secret you created earlier and the S3 bucket for saving data from BigQuery:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "GetDescribeSecret",
            "Effect": "Allow",
            "Action": [
                "secretsmanager:GetResourcePolicy",
                "secretsmanager:GetSecretValue",
                "secretsmanager:DescribeSecret",
                "secretsmanager:ListSecretVersionIds"
            ],
            "Resource": "arn:aws:secretsmanager::<<account_id>>:secret:<<your_secret_id>>"
        },
        {
            "Sid": "S3Policy",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketLocation",
                "s3:ListBucket",
                "s3:GetBucketAcl",
                "s3:GetObject",
                "s3:PutObject",
                "s3:DeleteObject"
            ],
            "Resource": [
                "arn:aws:s3:::<<your_s3_bucket>>",
                "arn:aws:s3:::<<your_s3_bucket>>/*"
            ]
        }
    ]
}

Subscribing to the AWS Glue Connector for Google BigQuery

To subscribe to the connector, complete the following steps:

  1. Navigate to the AWS Glue Connector for Google BigQuery on AWS Marketplace.
  2. Choose Continue to Subscribe.
  3. Review the terms and conditions, pricing, and other details.
  4. Choose Continue to Configuration.
  5. For Delivery Method, choose your delivery method.
  6. For Software Version, choose your software version.
  7. Choose Continue to Launch.
  8. Under Usage instructions, choose Activate the Glue connector in AWS Glue Studio.

You’re redirected to AWS Glue Studio.

  9. For Name, enter a name for your connection (for example, bigquery).
  10. Optionally, choose a VPC, subnet, and security group.
  11. For AWS Secret, choose bigquery_credentials.
  12. Choose Create connection.

A message appears that the connection was successfully created, and the connection is now visible on the AWS Glue Studio console.

Creating the ETL job in AWS Glue Studio

  1. On the AWS Glue Studio console, choose Jobs.
  2. For Source, choose BigQuery.
  3. For Target, choose S3.
  4. Choose Create.
  5. Choose the ApplyMapping node and delete it.
  6. Choose the BigQuery node.
  7. For Connection, choose bigquery.
  8. Expand Connection options.
  9. Choose Add new option.
  10. Add the following key-value pairs:
    1. Key: parentProject, Value: <<google_project_name>>
    2. Key: table, Value: bigquery-public-data.covid19_open_data.covid19_open_data
  11. Choose the S3 bucket node.
  12. Choose the format and compression type.
  13. Specify the S3 target location.
  14. Choose Job details.
  15. For Name, enter BigQuery_S3.
  16. For IAM Role, choose the role you created.
  17. For Type, choose Spark.
  18. For Glue version, choose Glue 2.0 – Supports Spark 2.4, Scala 2, Python 3.
  19. Leave the rest of the options as defaults.
  20. Choose Save.
  21. To run the job, choose the Run Job button.
  22. When the job run succeeds, check the S3 bucket for data.

In this job, we use the connector to read data from the BigQuery public dataset for COVID-19. For more information, see Apache Spark SQL connector for Google BigQuery (Beta) on GitHub.

The code reads the covid19 table into an AWS Glue DynamicFrame and writes the data to Amazon S3.

Querying the data

You can now use an AWS Glue crawler to crawl the data in the S3 bucket. The crawler creates a table named covid, which you can then query in Athena. The following screenshot shows our query results.
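
As a quick sanity check before building anything on top of the table, you can run a couple of simple queries. The following is a minimal sketch that assumes the crawler registered the table as covid in the default database and that a country_name column exists as in the public dataset's schema; adjust the names to match what the crawler actually created:

    SELECT * FROM default.covid LIMIT 10;

    -- Record count per location (column name assumed from the public dataset schema)
    SELECT country_name, count(*) AS records
    FROM default.covid
    GROUP BY country_name
    ORDER BY records DESC
    LIMIT 10;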

Pricing considerations

There might be egress charges from Google Cloud for migrating data out of Google BigQuery into Amazon S3. Review and calculate this cost before moving your data.

AWS Glue 2.0 charges $0.44 per DPU-hour, billed per second, with a 1-minute minimum for Spark ETL jobs. An Apache Spark job run in AWS Glue requires a minimum of 2 DPUs. By default, AWS Glue allocates 10 DPUs to each Apache Spark job. Modify the number of workers based on your job requirements. For more information, see AWS Glue pricing.
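
As a rough illustration, a job that runs for 10 minutes with the default 10 DPUs consumes about 1.67 DPU-hours, which at $0.44 per DPU-hour comes to roughly $0.73 for that run, before any BigQuery egress charges.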

Conclusion

In this post, we learned how to easily use AWS Glue ETL to connect to BigQuery tables and migrate the data into Amazon S3, and then query the data immediately with Athena. With AWS Glue, you can significantly reduce the cost, complexity, and time spent creating ETL jobs. AWS Glue is serverless, so there is no infrastructure to set up or manage. You pay only for the resources consumed while your jobs are running.

For more information about AWS Glue ETL jobs, see Simplify data pipelines with AWS Glue automatic code generation and workflows and Making ETL easier with AWS Glue Studio.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

Accessing and visualizing data from multiple data sources with Amazon Athena and Amazon QuickSight

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/accessing-and-visualizing-data-from-multiple-data-sources-with-amazon-athena-and-amazon-quicksight/

Amazon Athena now supports federated query, a feature that allows you to query data in sources other than Amazon Simple Storage Service (Amazon S3). You can use federated queries in Athena to query the data in place or build pipelines that extract data from multiple data sources and store them in Amazon S3. With Athena Federated Query, you can run SQL queries across data stored in relational, non-relational, object, and custom data sources. Athena queries including federated queries can be run from the Athena console, a JDBC or ODBC connection, the Athena API, the Athena CLI, the AWS SDK, or AWS Tools for Windows PowerShell.

The goal for this post is to discuss how we can use different connectors to run federated queries with complex joins across different data sources with Athena and visualize the data with Amazon QuickSight.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of the Athena query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB (with MongoDB compatibility), Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source in the cloud or on premises that is accessible from Lambda.
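
To make this concrete, the following is a hedged sketch of what a federated query can look like once connectors are registered under catalog names; the catalog, database, table, and column names follow the TPCH-style workshop layout and are assumptions for illustration only:

    -- Join an HBase-backed table with a DynamoDB-backed table through their catalogs
    SELECT p.p_name, l.l_quantity
    FROM "lambda:hbase".default.lineitem l
    JOIN "lambda:dynamo".default.part p
        ON l.l_partkey = p.p_partkey
    LIMIT 10;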

Prerequisites

Before getting started, make sure you have completed the prerequisites from the first post in this series, Extracting and joining data from multiple data sources with Athena Federated Query.

Configuring your data source connectors

After you deploy your CloudFormation stack, follow the instructions in the post Extracting and joining data from multiple data sources with Athena Federated Query to configure various Athena data source connectors for HBase on Amazon EMR, DynamoDB, ElastiCache for Redis, and Amazon Aurora MySQL.

You can run Athena federated queries in the AmazonAthenaPreviewFunctionality workgroup created as part of the CloudFormation stack or you could run them in the primary workgroup or other workgroups as long as you’re running with Athena engine version 2. As of this writing, Athena Federated Query is generally available in the Asia Pacific (Mumbai), Asia Pacific (Tokyo), Europe (Ireland), US East (N. Virginia), US East (Ohio), US West (N. California), and US West (Oregon) Regions. If you’re running in other Regions, use the AmazonAthenaPreviewFunctionality workgroup.

For information about changing your workgroup to Athena engine version 2, see Changing Athena Engine Versions.

Configuring QuickSight

The next step is to configure QuickSight to use these connectors to query data and visualize with QuickSight.

  1. On the AWS Management Console, navigate to QuickSight.
  2. If you’re not signed up for QuickSight, you’re prompted with the option to sign up. Follow the steps to sign up to use QuickSight.
  3. After you log in to QuickSight, choose Manage QuickSight under your account.
  4. In the navigation pane, choose Security & permissions.
  5. Under QuickSight access to AWS services, choose Add or remove.

A page appears for enabling QuickSight access to AWS services.

  6. Choose Athena.
  7. In the pop-up window, choose Next.
  8. On the S3 tab, select the necessary S3 buckets. For this post, I select the athena-federation-workshop-<account_id> bucket and another one that stores my Athena query results.
  9. For each bucket, also select Write permission for Athena Workgroup.
  10. On the Lambda tab, select the Lambda functions corresponding to the Athena federated connectors that Athena federated queries use. If you followed the post Extracting and joining data from multiple data sources with Athena Federated Query when configuring your Athena federated connectors, you can select dynamo, hbase, mysql, and redis.

For information about registering a data source in Athena, see the appendix in this post.

  11. Choose Finish.
  12. Choose Update.
  13. On the QuickSight console, choose New analysis.
  14. Choose New dataset.
  15. For Datasets, choose Athena.
  16. For Data source name, enter Athena-federation.
  17. For Athena workgroup, choose primary.
  18. Choose Create data source.

As stated earlier, you can use the AmazonAthenaPreviewFunctionality workgroup or another workgroup as long as you’re running Athena engine version 2 in a supported Region.

  19. For Catalog, choose the catalog that you created for your Athena federated connector.

For information about creating and registering a data source in Athena, see the appendix in this post.

  20. For this post, I choose the dynamo catalog, which does a federation to the Athena DynamoDB connector.

I can now see the database and tables listed in QuickSight.

  21. Choose Edit/Preview data to see the data.
  22. Choose Save & Visualize to start using this data for creating visualizations in QuickSight.
  23. To do a join with another Athena data source, choose Add data and select the catalog and table.
  24. Choose the join link between the two datasets and choose the appropriate join configuration.
  25. Choose Apply.

You should be able to see the joined data.

Running a query in QuickSight

Now we use the custom SQL option in QuickSight to run a complex query with multiple Athena federated data sources.

  1. On the QuickSight console, choose New analysis.
  2. Choose New dataset.
  3. For Datasets, choose Athena.
  4. For Data source name, enter Athena-federation.
  5. For the workgroup, choose primary.
  6. Choose Create data source.
  7. Choose Use custom SQL.
  8. Enter the query for ProfitBySupplierNation.
  9. Choose Edit/Preview data.

Under Query mode, you have the option to view your query in either SPICE or direct query. SPICE is the QuickSight Super-fast, Parallel, In-memory Calculation Engine. It’s engineered to rapidly perform advanced calculations and serve data. Using SPICE can save time and money because your analytical queries process faster, you don’t need to wait for a direct query to process, and you can reuse data stored in SPICE multiple times without incurring additional costs. You also can refresh data in SPICE on a recurring basis as needed or on demand. For more information about refresh options, see Refreshing Data.

With direct query, QuickSight doesn’t use SPICE data and sends the query every time to Athena to get the data.

  10. Select SPICE.
  11. Choose Apply.
  12. Choose Save & visualize.
  13. On the Visualize page, under Fields list, choose nation and sum_profit.

QuickSight automatically chooses the best visualization type based on the selected fields. You can change the visual type based on your requirement. The following screenshot shows a pie chart for Sum_profit grouped by Nation.

You can add more datasets using Athena federated queries and create dashboards. The following screenshot is an example of a visual analysis over various datasets that were added as part of this post.

When your analysis is ready, you can choose Share to create a dashboard and share it within your organization.

Summary

QuickSight is a powerful visualization tool, and with Athena federated queries, you can run analysis and build dashboards on various data sources like DynamoDB, HBase on Amazon EMR, and many more. You can also easily join relational, non-relational, and custom object stores in Athena queries and use them with QuickSight to create visualizations and dashboards.

For more information about Athena Federated Query, see Using Amazon Athena Federated Query and Query any data source with Amazon Athena’s new federated query.


Appendix

To register a data source in Athena, complete the following steps:

  1. On the Athena console, choose Data sources.
  2. Choose Connect data source.
  3. Select Query a data source.
  4. For Choose a data source, select a data source (for this post, I select Redis).
  5. Choose Next.
  6. For Lambda function, choose your function.

For this post, I use the redis Lambda function, which I configured as part of configuring the Athena federated connector in the post Extracting and joining data from multiple data sources with Athena Federated Query.

  7. For Catalog name, enter a name (for example, redis).

The catalog name you specify here is the one that is displayed in QuickSight when selecting Lambda functions for access.

  8. Choose Connect.

When the data source is registered, it’s available in the Data source drop-down list on the Athena console.


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation. In his free time, he likes to watch movies and spend time with his family.

 

 

 

Redacting sensitive information with user-defined functions in Amazon Athena

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/redacting-sensitive-information-with-user-defined-functions-in-amazon-athena/

Amazon Athena now supports user-defined functions (UDFs) in Preview, a feature that enables you to write custom scalar functions and invoke them in SQL queries. Although Athena provides built-in functions, UDFs enable you to perform custom processing such as compressing and decompressing data, redacting sensitive data, or applying customized decryption. You can write your UDFs in Java using the Athena Query Federation SDK. When a UDF is used in a SQL query submitted to Athena, it’s invoked and run on AWS Lambda. You can use UDFs in both SELECT and FILTER clauses of a SQL query, and invoke multiple UDFs in the same query. Athena UDF functionality is available in Preview mode in the US East (N. Virginia) Region.

This blog post covers the basic functionality of Athena UDFs, writing your own UDF, building and publishing your UDF to the AWS Serverless Application Repository, configuring the UDF connector application, and using the UDF in Athena queries. You can implement use cases involving sensitive data, such as personally identifiable information (PII), credit card numbers, or Social Security numbers (SSNs). In this post, we deploy a Redact UDF and use it to mask sensitive information.

Prerequisites

Before creating your development environment, you must have several prerequisites in place.

To set up the development environment and address these prerequisites, deploy the CloudFormation template in the first part of this series, Extracting and joining data from multiple data sources with Athena Federated Query. The post provides instructions on building the required test environment and resources using the CloudFormation template.

Creating your IDE

After you deploy the CloudFormation template, you need to create the required AWS resources. To create the development environment to build and deploy the UDF, we use an AWS Cloud9 IDE. On the AWS Cloud9 console, locate your environment and choose Open IDE.

Resizing the AWS Cloud9 environment

The AWS Cloud9 IDE comes with a default 10 GB disk space, which can fill quickly when setting up the development environment, so you should resize it.

  1. Run the following command in the AWS Cloud9 IDE terminal to get the resize script:
    curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-workshop/scripts/cloud9_resize.sh > cloud9_resize.sh

  2. Run the script by issuing the following command on the terminal to resize the disk to 20 GB:
    sh cloud9_resize.sh 20

  3. Check the free space on the disk with the following code:
    df -h

You should see something like the following screenshot.

Setting up the IDE

Next, you clone the SDK and prepare your IDE.

  1. Make sure that Git is installed on your system by entering the following code:
    sudo yum install git -y

  2. To install the Athena Query Federation SDK, enter the following command at the command line to clone the SDK repository. This repository includes the SDK, examples, and a suite of data source connectors.
    git clone https://github.com/awslabs/aws-athena-query-federation.git

If you’re working on a development machine that already has Apache Maven, the AWS CLI, and the AWS Serverless Application Model build tool installed, you can skip this step.

  3. From the root of the aws-athena-query-federation directory that you created when you cloned the repository, run the prepare_dev_env.sh script that prepares your development environment:
    cd aws-athena-query-federation	
    
    sudo chown ec2-user:ec2-user ~/.profile
    
    ./tools/prepare_dev_env.sh

This script requires manual inputs to run (press Enter as needed during the setup steps when prompted). You can edit this script to remove the manual inputs if you want to automate the setup entirely.

  4. Update your shell to source new variables created by the installation process or restart your terminal session:
    source ~/.profile

  5. Run the following code from the athena-federation-sdk directory within the GitHub project you checked out earlier:

Adding the UDF code and publishing the connector

In this section, you add your UDF function, build the JAR file, and deploy the connector.

  1. In the AWS Cloud9 IDE, expand the aws-athena-query-federation project and navigate to the AthenaUDFHandler.java file.
  2. Choose the file (double-click) to open it for editing.

Now we add the UDF code for a String Redact function, which redacts a string to show only the last four characters. You can use this UDF function to mask sensitive information.

  3. Enter the following code:
    /**
     * Redact a string to show only the last 4 characters.
     *
     * @param input the string to redact
     * @return redacted string
     */
    public String redact(String input)
    {
        // Replace every character except the last 4 with 'x'
        String redactedString = new StringBuilder(input)
                .replace(0, input.length() - 4,
                        new String(new char[input.length() - 4]).replace("\0", "x"))
                .toString();
        return redactedString;
    }
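
Note that this implementation assumes the input is at least four characters long; shorter or null values would cause it to throw an exception, so you may want to add a guard for such inputs in your own version.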

You can also copy the modified code with the following command (which must be run from the aws-athena-query-federation directory):

curl https://aws-data-analytics-workshops.s3.amazonaws.com/athena-workshop/scripts/AthenaUDFHandler.java > athena-udfs/src/main/java/com/amazonaws/athena/connectors/udfs/AthenaUDFHandler.java

After copying the file, you can open it in the AWS Cloud9 IDE to see its contents.

  4. To build the JAR file, save the file and run mvn clean install to build your project:
    cd ~/environment/aws-athena-query-federation/athena-udfs/
    
    mvn clean install

After it successfully builds, a JAR file named artifactId-version.jar is created in the target folder of your project, where artifactId is the name you provided in the Maven project (for example, athena-udfs).

  5. From the athena-udfs directory, run the following code to publish the connector to your private AWS Serverless Application Repository. The S3_BUCKET_NAME in the command is the Amazon Simple Storage Service (Amazon S3) location where a copy of the connector’s code is stored for the AWS Serverless Application Repository to retrieve it.
../tools/publish.sh S3_BUCKET_NAME athena-udfs

This allows users with relevant permission levels to deploy instances of the connector via a one-click form.

When the connector is published successfully, it looks like the following screenshot.

To see AthenaUserDefinedFunctions, choose the link shown in the terminal after the publish is successful or navigate to the AWS Serverless Application Repository by choosing Available Applications, Private applications.

Setting up the UDF connector

Now that the UDF connector code is published, we can install the UDF connector to use with Athena.

  1. Choose the AthenaUserDefinedFunctions application listed on the Private applications section in the AWS Serverless Application Repository.
  2. For Application name, leave it as the default name AthenaUserDefinedFunctions.
  3. For SecretNameorPrefix, enter a secret name if you have already saved it in AWS Secrets Manager; otherwise, enter database-*.
  4. For LambdaFunctionName, enter customudf.
  5. Leave the remaining fields as default.
  6. Select I acknowledge that this app creates custom IAM roles.
  7. Choose Deploy.

Querying with UDF in Athena

Now that the UDF connector code is deployed, we can run Athena queries that use the UDF.

If you ran the CloudFormation template from Part 1 of this blog series, the AmazonAthenaPreviewFunctionality workgroup was already created. If not, choose Create Workgroup on the Athena console, create a workgroup named AmazonAthenaPreviewFunctionality, and set up your query result location in Amazon S3.

To proceed, make sure you are in the workgroup AmazonAthenaPreviewFunctionality. If not, choose the workgroup AmazonAthenaPreviewFunctionality and choose Switch workgroup.

You can now run a query to use the Redact UDF to mask sensitive information from PII columns. To show the comparison, we have included the PII column and masked data as part of the query results. If you ran the CloudFormation template from Part 1 of this series, you can navigate to the Saved Queries on the Athena console and choose RedactUdfCustomerAddress.

The following screenshot shows your query.
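
If you don’t have the saved query available, the general shape of such a query is sketched below. This is an illustration only: the catalog, table, and column names are assumptions based on the TPCH layout used in this series, and the syntax shown is the current USING EXTERNAL FUNCTION form, which may differ slightly from the preview-era syntax in the saved query:

    USING EXTERNAL FUNCTION redact(value VARCHAR) RETURNS VARCHAR LAMBDA 'customudf'
    SELECT c_name,
           redact(c_name)    AS redact_name,
           c_phone,
           redact(c_phone)   AS redact_phone,
           c_address,
           redact(c_address) AS redact_address
    FROM "lambda:mysql".sales.customer
    LIMIT 10;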

After the query runs, you should see results like the following screenshot. The redact_name, redact_phone, and redact_address columns only show the last four characters.

Cleaning up

To clean up the resources created as part of your CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty and delete the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
     aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  3. Use the AWS CloudFormation console or AWS CLI to delete the stacks Athena-Federation-Workshop and serverlessrepo-AthenaUserDefinedFunctions.

Summary

In this post, you learned about Athena user-defined functions, how to create your own UDF, and how to deploy it to a private AWS Serverless Application Repository. You also learned how to configure the UDF and use it in your Athena queries. In the next post of this series, we discuss and demonstrate how to use a machine learning (ML) anomaly detection model developed on Amazon SageMaker and use that model in Athena queries to invoke an ML inference function to detect anomaly values in our orders dataset.


About the Authors


Saurabh Bhutyani is a Senior Big Data specialist solutions architect at Amazon Web Services. He is an early adopter of open source Big Data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

 

 


Amir Basirat is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a Big Data specialist for different technology companies. He also has a PhD in computer science, where his research was primarily focused on large-scale distributed computing and neural networks.

Extracting and joining data from multiple data sources with Athena Federated Query

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/extracting-and-joining-data-from-multiple-data-sources-with-athena-federated-query/

With modern day architectures, it’s common to have data sitting in various data sources. We need proper tools and technologies across those sources to create meaningful insights from stored data. Amazon Athena is primarily used as an interactive query service that makes it easy to analyze unstructured, semi-structured, and structured data stored in Amazon Simple Storage Service (Amazon S3) using standard SQL. With the federated query functionality in Athena, you can now run SQL queries across data stored in relational, non-relational, object, and custom data sources and store the results back in Amazon S3 for further analysis.

The goals for this series of posts are to discuss how we can configure different connectors to run federated queries with complex joins across different data sources, how to configure a user-defined function for redacting sensitive information when running Athena queries, and how we can use machine learning (ML) inference to detect anomalies in datasets to help developers, big data architects, data engineers, and business analysts in their daily operational routines.

Athena Federated Query

Athena uses data source connectors that run on AWS Lambda to run federated queries. A data source connector is a piece of code that translates between your target data source and Athena. You can think of a connector as an extension of Athena’s query engine. Prebuilt Athena data source connectors exist for data sources like Amazon CloudWatch Logs, Amazon DynamoDB, Amazon DocumentDB, Amazon Elasticsearch Service (Amazon ES), Amazon ElastiCache for Redis, and JDBC-compliant relational data sources such as MySQL, PostgreSQL, and Amazon Redshift under the Apache 2.0 license. You can also use the Athena Query Federation SDK to write custom connectors. After you deploy data source connectors, the connector is associated with a catalog name that you can specify in your SQL queries. You can combine SQL statements from multiple catalogs and span multiple data sources with a single query.

When a query is submitted against a data source, Athena invokes the corresponding connector to identify parts of the tables that need to be read, manages parallelism, and pushes down filter predicates. Based on the user submitting the query, connectors can provide or restrict access to specific data elements. Connectors use Apache Arrow as the format for returning data requested in a query, which enables connectors to be implemented in languages such as C, C++, Java, Python, and Rust. Because connectors run in Lambda, you can use them to access data from any data source on the cloud or on premises that is accessible from Lambda.

The first post of this series discusses how to configure Athena Federated Query connectors and use them to run federated queries for data residing in HBase on Amazon EMR, Amazon Aurora MySQL, DynamoDB, and ElastiCache for Redis databases.

Test data

To demonstrate Athena federation capabilities, we use the TPCH sample dataset. TPCH is a decision support benchmark and has broad industry-wide relevance. This benchmark illustrates decision support systems that examine large volumes of data, run queries with a high degree of complexity, and give answers to critical business questions. For our use case, imagine a hypothetical ecommerce company with the following architecture:

  • Lineitem processing records are stored in HBase on Amazon EMR to meet the requirement for a write-optimized data store with a high transaction rate and long-term durability
  • ElastiCache for Redis stores the Nations and ActiveOrders tables so that the processing engine can get fast access to them
  • An Aurora MySQL database stores the Orders, Customer, and Suppliers account data, such as email addresses and shipping addresses
  • DynamoDB hosts the Part and Partsupp data, because DynamoDB offers high flexibility and high performance

The following diagram shows a schematic view of the TPCH tables and their associated data stores.

Building a test environment using AWS CloudFormation

Before following along with this post, you need to create the required AWS resources in your account. To do this, we have provided you with an AWS CloudFormation template to create a stack that contains the required resources: the sample TPCH database on Amazon Relational Database Service (Amazon RDS), HBase on Amazon EMR, Amazon ElastiCache for Redis, and DynamoDB.

The template also creates the AWS Glue database and tables, S3 bucket, Amazon S3 VPC endpoint, AWS Glue VPC endpoint, Athena named queries, AWS Cloud9 IDE, an Amazon SageMaker notebook instance, and other AWS Identity and Access Management (IAM) resources that we use to implement the federated query, user-defined functions (UDFs), and ML inference functions.

This template is designed only to show how you can use Athena Federated Query, UDFs, and ML inference. This setup isn’t intended for production use without modification. Additionally, the template is created for use in the us-east-1 Region, and doesn’t work in other Regions.

Before launching the stack, you must have the following prerequisites:

  • An AWS account that provides access to AWS services
  • An IAM user with an access key and secret key to configure the AWS Command Line Interface (AWS CLI), and permissions to create an IAM role, IAM policies, and stacks in AWS CloudFormation

To create your resources, complete the following steps:

  1. Choose Launch Stack.
  2. Select I acknowledge that this template may create IAM resources.

This template creates resources that incur costs while they remain in use. Follow the cleanup steps at the end of this post to delete and clean up the resources to avoid any unnecessary charges.

  3. When the CloudFormation template is complete, record the outputs listed on the Outputs tab on the AWS CloudFormation console.

The CloudFormation stack takes approximately 20–30 minutes to complete. Check the AWS CloudFormation console and wait for the status CREATE_COMPLETE.

When stack creation is complete, your AWS account has all the required resources to implement this solution.

  4. On the Outputs tab of the Athena-Federation-Workshop stack, capture the following:
    1. S3Bucket
    2. Subnets
    3. WorkshopSecurityGroup
    4. EMRSecurityGroup
    5. HbaseConnectionString
    6. RDSConnectionString

You need all this information when setting up connectors.

  5. When the stacks are complete, check the status of the Amazon EMR steps on the Amazon EMR console.

It can take up to 15 minutes for this step to complete.

Deploying connectors and connecting to data sources

Preparing to create federated queries is a two-part process: deploying a Lambda function data source connector, and connecting the Lambda function to a data source. In the first part, you give the Lambda function a name that you can later choose on the Athena console. In the second part, you give the connector a name that you can reference in your SQL queries.

We want to query different data sources, so in the following sections we set up Lambda connectors for HBase on Amazon EMR, Aurora MySQL, DynamoDB, and Redis before we start creating complex joins across data sources using Athena federated queries. The following diagram shows the architecture of our environment.

Installing the Athena JDBC connector for Aurora MySQL

The Athena JDBC connector supports the following databases:

  • MySQL
  • PostgreSQL
  • Amazon Redshift

To install the Athena JDBC connector for Aurora MySQL, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaJdbcConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name, AthenaJdbcConnector.
    2. SecretNamePrefix – Enter AthenaJdbcFederation.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. DefaultConnectionString – Enter the RDSConnectionString value from the AWS CloudFormation outputs.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaFunctionName – Enter mysql.
    7. LambdaMemory – Leave it as the default value 3008.
    8. LambdaTimeout – Leave it as the default value 900.
    9. SecurityGroupIds – Enter the WorkshopSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Change the default value to athena-spill/jdbc.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena JDBC connector for Aurora MySQL; you can refer to this Lambda function in your queries as lambda:mysql.

For more information about the Athena JDBC connector, see the GitHub repo.

Installing the Athena DynamoDB connector

To install the Athena DynamoDB connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaDynamoDBConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaDynamoDBConnector.
    2. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    3. AthenaCatalogName – Enter dynamo.
    4. DisableSpillEncryption – Leave it as the default value false.
    5. LambdaMemory – Leave it as the default value 3008.
    6. LambdaTimeout – Leave it as the default value 900.
    7. SpillPrefix – Enter athena-spill-dynamo.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys Athena DynamoDB connector; you can refer to this Lambda function in your queries as lambda:dynamo.

For more information about the Athena DynamoDB connector, see the GitHub repo.

Installing the Athena HBase connector

To install the Athena HBase connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaHBaseConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaHBaseConnector
    2. SecretNamePrefix – Enter hbase-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter hbase.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. DefaultConnectionString – Enter the HbaseConnectionString value from the AWS CloudFormation outputs.
    7. LambdaMemory – Leave it as the default value of 3008.
    8. LambdaTimeout – Leave it as the default value of 900.
    9. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    10. SpillPrefix – Enter athena-spill-hbase.
    11. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena HBase connector; you can refer to this Lambda function in your queries as lambda:hbase.

For more information about the Athena HBase connector, see the GitHub repo.

Installing the Athena Redis connector

To install the Athena Redis connector, complete the following steps:

  1. In your AWS account, search for serverless application repository.
  2. Choose Available applications.
  3. Make sure that Show apps that create custom IAM roles or resource policies is selected.
  4. Search for athena federation.
  5. Locate and choose AthenaRedisConnector.
  6. Provide the following values:
    1. Application name – Leave it as default name AthenaRedisConnector.
    2. SecretNameOrPrefix – Enter redis-*.
    3. SpillBucket – Enter the S3Bucket value from the AWS CloudFormation outputs.
    4. AthenaCatalogName – Enter redis.
    5. DisableSpillEncryption – Leave it as the default value false.
    6. LambdaMemory – Leave it as the default value 3008.
    7. LambdaTimeout – Leave it as the default value 900.
    8. SecurityGroupIds – Enter the EMRSecurityGroup value from the AWS CloudFormation outputs.
    9. SpillPrefix – Enter athena-spill-redis.
    10. SubnetIds – Enter the Subnets value from the AWS CloudFormation outputs.
  7. Select I acknowledge that this app creates custom IAM roles.
  8. Choose Deploy.

This deploys the Athena Redis connector; you can refer to this Lambda function in your queries as lambda:redis.

For more information about the Athena Redis connector, see the GitHub repo.

Redis database and tables with the AWS Glue Data Catalog

Because Redis doesn’t have a schema of its own, the Redis connector can’t infer the columns or data type from Redis. The Redis connector needs an AWS Glue database and tables to be set up so it can associate the data to the schema. The CloudFormation template creates the necessary Redis database and tables in the Data Catalog. You can confirm this on the AWS Glue console.

Running federated queries

Now that the connectors are deployed, we can run Athena queries that use those connectors.

  1. On the Athena console, choose Get Started.
  2. Make sure you’re in the workgroup AmazonAthenaPreviewFunctionality. If not, choose Workgroups, select AmazonAthenaPreviewFunctionality, and choose Switch Workgroup.

On the Saved Queries tab, you can see a list of pre-populated queries to test.

The Sources saved query tests your Athena connector functionality for each data source, and you can make sure that you can extract data from each data source before running more complex queries involving different data sources.

  3. Highlight the first query up to the semicolon and choose Run query.

After successfully testing connections to each data source, you can proceed with running more complex queries, such as:

  • FetchActiveOrderInfo
  • ProfitBySupplierNationByYr
  • OrdersRevenueDateAndShipPrio
  • ShippedLineitemsPricingReport
  • SuppliersWhoKeptOrdersWaiting

If you see an error on the HBase query like the following, try rerunning it and it should resolve the issue.

GENERIC_USER_ERROR: Encountered an exception[java.lang.RuntimeException] from your LambdaFunction[hbase] executed in context[retrieving meta-data] with message[org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location for replica 0]

As an example of the advanced queries, the SuppliersWhoKeptOrdersWaiting query identifies suppliers whose product was part of a multi-supplier order (with current status of F) and they didn’t ship the required parts on time. This query uses multiple data sources: Aurora MySQL and HBase on Amazon EMR. As shown in the following screenshot, the query extracts data from the supplier table on Aurora MySQL, the lineitem table on HBase, and the orders tables on Aurora MySQL. The results are returned in 7.13 seconds.
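
For reference, a condensed sketch of this kind of cross-source join is shown below. The column names follow the standard TPCH schema, but the catalog and schema qualifiers are assumptions for illustration, and the actual saved query is more elaborate:

    SELECT s.s_name, count(*) AS numwait
    FROM "lambda:mysql".sales.supplier s
    JOIN "lambda:hbase".default.lineitem l
        ON s.s_suppkey = l.l_suppkey
    JOIN "lambda:mysql".sales.orders o
        ON o.o_orderkey = l.l_orderkey
    WHERE o.o_orderstatus = 'F'
        AND l.l_receiptdate > l.l_commitdate
    GROUP BY s.s_name
    ORDER BY numwait DESC
    LIMIT 10;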

Cleaning up

To clean up the resources created as part of our CloudFormation template, complete the following steps:

  1. On the Amazon S3 console, empty the bucket athena-federation-workshop-<account-id>.
  2. If you’re using the AWS CLI, delete the objects in the athena-federation-workshop-<account-id> bucket with the following code (make sure you’re running this command on the correct bucket):
    aws s3 rm s3://athena-federation-workshop-<account-id> --recursive

  3. On the AWS CloudFormation console, delete all the connectors so they’re no longer attached to the elastic network interface (ENI) of the VPC. Alternatively, go to each connector and deselect the VPC so it’s no longer attached to the VPC created by AWS CloudFormation.
  4. On the Amazon SageMaker console, delete any endpoints you created as part of the ML inference.
  5. On the Athena console, delete the AmazonAthenaPreviewFunctionality workgroup.
  6. On the AWS CloudFormation console or the AWS CLI, delete the stack Athena-Federation-Workshop.

Summary

In this post, we demonstrated the functionality of Athena federated queries by creating multiple different connectors and running federated queries against multiple data sources. In the next post, we show you how you can use the Athena Federation SDK to deploy your UDF and invoke it to redact sensitive information in your Athena queries.


About the Authors

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

 

 

 

Amir Basirat is a Big Data Specialist Solutions Architect at Amazon Web Services, focused on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation, where he helps customers craft distributed analytics applications on the AWS platform. Prior to his AWS Cloud journey, he worked as a big data specialist for different technology companies. He also has a PhD in computer science, where his research primarily focused on large-scale distributed computing and neural networks.