
Build an Amazon Redshift data warehouse using an Amazon DynamoDB single-table design

Post Syndicated from Altaf Hussain original https://aws.amazon.com/blogs/big-data/build-an-amazon-redshift-data-warehouse-using-an-amazon-dynamodb-single-table-design/

Amazon DynamoDB is a fully managed NoSQL service that delivers single-digit millisecond performance at any scale. It’s used by thousands of customers for mission-critical workloads. Typical use cases for DynamoDB are an ecommerce application handling a high volume of transactions, or a gaming application that needs to maintain scorecards for players and games. In traditional databases, we would model such applications using a normalized data model (entity-relationship diagram). This approach comes with a heavy computational cost in terms of processing and distributing the data across multiple tables while ensuring the system is ACID-compliant at all times, which can negatively impact performance and scalability. If these entities are frequently queried together, it makes sense to store them in a single table in DynamoDB. This is the concept of single-table design. Storing different types of data in a single table allows you to retrieve multiple, heterogeneous item types using a single request. Such requests are relatively straightforward, and usually take the following form:

SELECT * FROM TABLE WHERE Some_Attribute = 'some_value'

In this format, Some_Attribute is the partition key or part of an index.

Nonetheless, many of the same customers using DynamoDB would also like to be able to perform aggregations and ad hoc queries against their data to measure important KPIs that are pertinent to their business. Suppose we have a successful ecommerce application handling a high volume of sales transactions in DynamoDB. A typical ask for this data may be to identify sales trends as well as sales growth on a yearly, monthly, or even daily basis. These types of queries require complex aggregations over a large number of records. A key pillar of AWS’s modern data strategy is the use of purpose-built data stores for specific use cases to achieve performance, cost, and scale. Deriving business insights by identifying year-on-year sales growth is an example of an online analytical processing (OLAP) query. These types of queries are suited for a data warehouse.

The goal of a data warehouse is to enable businesses to analyze their data quickly; this is important because it means they can gain valuable insights in a timely manner. Amazon Redshift is a fully managed, scalable cloud data warehouse. Building a performant data warehouse is non-trivial because the data needs to be highly curated to serve as a reliable and accurate version of the truth.

In this post, we walk through the process of exporting data from a DynamoDB table to Amazon Redshift. We discuss data model design for both NoSQL databases and SQL data warehouses. We begin with a single-table design as an initial state and build a scalable batch extract, load, and transform (ELT) pipeline to restructure the data into a dimensional model for OLAP workloads.

DynamoDB table example

We use an example of a successful ecommerce store allowing registered users to order products from their website. A simple ERD (entity-relationship diagram) for this application has four distinct entities: customers, addresses, orders, and products. For customers, we have information such as their unique user name and email address; for the address entity, we have one or more customer addresses. Orders contain information regarding the order placed, and the products entity provides information about the products placed in an order. As we can see from the following diagram, a customer can place one or more orders, and an order must contain one or more products.

We could store each entity in a separate table in DynamoDB. However, there is no way to retrieve customer details alongside all the orders placed by the customer without making multiple requests to the customer and order tables. This is inefficient from both a cost and performance perspective. A key goal for any efficient application is to retrieve all the required information in a single query request. This ensures fast, consistent performance. So how can we remodel our data to avoid making multiple requests? One option is to use single-table design. Taking advantage of the schema-less nature of DynamoDB, we can store different types of records in a single table in order to handle different access patterns in a single request. We can go further still and store different types of values in the same attribute and use it as a global secondary index (GSI). This is called index overloading.

A typical access pattern we may want to handle in our single table design is to get customer details and all orders placed by the customer.

To accommodate this access pattern, our single-table design looks like the following example.

By restricting the number of addresses associated with a customer, we can store address details as a complex attribute (rather than a separate item) without exceeding the 400 KB item size limit of DynamoDB.

We can add a global secondary index (GSIpk and GSIsk) to capture another access pattern: get order details and all product items placed in an order. We use the following table.

We have used generic attribute names, PK and SK, for our partition key and sort key columns. This is because they hold data from different entities. Furthermore, the values in these columns are prefixed by generic terms such as CUST# and ORD# to help us identify the type of data we have and ensure that the value in PK is unique across all records in the table.
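
To make this access pattern concrete, the following is a minimal boto3 sketch of such a request. The table name and the specific user name are illustrative; the PK attribute and the CUST# prefix follow the design described above.

import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("SalesTable")  # hypothetical table name

# A single Query returns the customer item and all of that customer's order items,
# because they share the same partition key value (CUST#<username>).
response = table.query(
    KeyConditionExpression=Key("PK").eq("CUST#jdoe")
)
for item in response["Items"]:
    print(item)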

A well-designed single table will not only reduce the number of requests for an access pattern, but will service many different access patterns. The challenge comes when we need to ask more complex questions of our data, for example, what was the year-on-year quarterly sales growth by product broken down by country?

The case for a data warehouse

A data warehouse is ideally suited to answer OLAP queries. Built on highly curated structured data, it provides the flexibility and speed to run aggregations across an entire dataset to derive insights.

To house our data, we need to define a data model. An optimal design choice is to use a dimensional model. A dimensional model consists of fact tables and dimension tables. Fact tables store the numeric information about business measures and foreign keys to the dimension tables. Dimension tables store descriptive information about the business facts to help understand and analyze the data better. From a business perspective, a dimensional model with its use of facts and dimensions can present complex business processes in a simple-to-understand manner.

Building a dimensional model

A dimensional model optimizes read performance through efficient joins and filters. Amazon Redshift automatically chooses the best distribution style and sort key based on workload patterns. We build a dimensional model from the single DynamoDB table based on the following star schema.

We have separated each item type into individual tables. We have a single fact table (Orders) containing the business measures price and numberofitems, and foreign keys to the dimension tables. By storing the price of each product in the fact table, we can track price fluctuations in the fact table without continually updating the product dimension. (In a similar vein, the DynamoDB attribute amount is a simple derived measure in our star schema: amount is the summation of product prices per orderid).

By splitting the descriptive content of our single DynamoDB table into multiple Amazon Redshift dimension tables, we can remove redundancy by only holding in each dimension the information pertinent to it. This allows us the flexibility to query the data under different contexts; for example, we may want to know the frequency of customer orders by city or product sales by date. The ability to freely join dimensions and facts when analyzing the data is one of the key benefits of dimensional modeling. It’s also good practice to have a Date dimension to allow us to perform time-based analysis by aggregating the fact by year, month, quarter, and so forth.

This dimensional model will be built in Amazon Redshift. When setting out to build a data warehouse, it’s a common pattern to have a data lake as the source of the data warehouse. The data lake in this context serves a number of important functions:

  • It acts as a central source for multiple applications, not just exclusively for data warehousing purposes. For example, the same dataset could be used to build machine learning (ML) models to identify trends and predict sales.
  • It can store data as is, be it unstructured, semi-structured, or structured. This allows you to explore and analyze the data without committing upfront to what the structure of the data should be.
  • It can be used to offload historical or less-frequently-accessed data, allowing you to manage your compute and storage costs more effectively. In our analytic use case, if we are analyzing quarterly growth rates, we may only need a couple of years’ worth of data; the rest can be unloaded into the data lake.

When querying a data lake, we need to consider user access patterns in order to reduce costs and optimize query performance. This is achieved by partitioning the data. The choice of partition keys will depend on how you query the data. For example, if you query the data by customer or country, then they are good candidates for partition keys; if you query by date, then a date hierarchy can be used to partition the data.

After the data is partitioned, we want to ensure it’s held in the right format for optimal query performance. The recommended choice is to use a columnar format such as Parquet or ORC. Such formats are compressed and store data column-wise, allowing for fast retrieval times, and are parallelizable, allowing for fast load times when moving the data into Amazon Redshift. In our use case, it makes sense to store the data in a data lake with minimal transformation and formatting to enable easy querying and exploration of the dataset. We partition the data by item type (Customer, Order, Product, and so on), and because we want to easily query each entity in order to move the data into our data warehouse, we transform the data into the Parquet format.
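
As an illustration of this partition-and-format step, the following is a minimal PySpark sketch; the bucket paths are placeholders, and the partition column name (type) matches the item-type attribute used later in this post.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("raw-to-datalake").getOrCreate()

# Read the raw DynamoDB export and rewrite it to the data lake as Parquet,
# partitioned by item type so each entity can be queried and loaded separately.
raw_df = spark.read.parquet("s3://raw-bucket/ddb-export/")
(raw_df.write
    .mode("overwrite")
    .partitionBy("type")
    .parquet("s3://datalake-bucket/sales/"))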

Solution overview

The following diagram illustrates the data flow to export data from a DynamoDB table to a data warehouse.

We present a batch ELT solution using AWS Glue for exporting data stored in DynamoDB to an Amazon Simple Storage Service (Amazon S3) data lake and then a data warehouse built in Amazon Redshift. AWS Glue is a fully managed extract, transform, and load (ETL) service that allows you to organize, cleanse, validate, and format data for storage in a data warehouse or data lake.

The solution workflow has the following steps:

  1. Move any existing files from the raw and data lake buckets into corresponding archive buckets to ensure any fresh export from DynamoDB to Amazon S3 isn’t duplicating data.
  2. Begin a new DynamoDB export to the S3 raw layer.
  3. From the raw files, create a data lake partitioned by item type.
  4. Load the data from the data lake to landing tables in Amazon Redshift.
  5. After the data is loaded, we take advantage of the distributed compute capability of Amazon Redshift to transform the data into our dimensional model and populate the data warehouse.

We orchestrate the pipeline using an AWS Step Functions workflow and schedule a daily batch run using Amazon EventBridge.

For simpler DynamoDB table structures, you may consider skipping some of these steps by loading data directly from DynamoDB into Amazon Redshift, or by using the Amazon Redshift COPY command (or auto-copy) to load data from Amazon S3.

Prerequisites

You must have an AWS account with a user who has programmatic access. For setup instructions, refer to AWS security credentials.

Use the AWS CloudFormation template cf_template_ddb-dwh-blog.yaml to launch the following resources:

  • A DynamoDB table with a GSI and point-in-time recovery enabled.
  • An Amazon Redshift cluster (we use two nodes of RA3.4xlarge).
  • Three AWS Glue Data Catalog databases: raw, datalake, and redshift.
  • Five S3 buckets: two for the raw and data lake files, two for their respective archives, and one for the Amazon Athena query results.
  • Two AWS Identity and Access Management (IAM) roles: An AWS Glue role and a Step Functions role with the requisite permissions and access to resources.
  • A JDBC connection to Amazon Redshift.
  • An AWS Lambda function to retrieve the s3-prefix-list-id for your Region. This is required to allow traffic from a VPC to access an AWS service through a gateway VPC endpoint.
  • Download the following files to perform the ELT:
    • The Python script to load sample data into our DynamoDB table: load_dynamodb.py.
    • The AWS Glue Python Spark script to archive the raw and data lake files: archive_job.py.
    • The AWS Glue Spark scripts to extract and load the data from DynamoDB to Amazon Redshift: GlueSparkJobs.zip.
    • The DDL and DML SQL scripts to create the tables and load the data into the data warehouse in Amazon Redshift: SQL Scripts.zip.

Launch the CloudFormation template

AWS CloudFormation allows you to model, provision, and scale your AWS resources by treating infrastructure as code. We use the downloaded CloudFormation template to create a stack (with new resources).

  1. On the AWS CloudFormation console, create a new stack and select Template is ready.
  2. Upload the stack and choose Next.

  1. Enter a name for your stack.
  2. For MasterUserPassword, enter a password.
  3. Optionally, replace the default names for the Amazon Redshift database, DynamoDB table, and MasterUsername (in case these names are already in use).
  4. Review the details and acknowledge that AWS CloudFormation may create IAM resources on your behalf.
  5. Choose Create stack.

Load sample data into a DynamoDB table

To load your sample data into DynamoDB, complete the following steps:

  1. Create an AWS Cloud9 environment with default settings.
  2. Upload the load_dynamodb.py Python script. From the AWS Cloud9 terminal, use the pip install command to install the following packages:
    1. boto3
    2. faker
    3. faker_commerce
    4. numpy
  3. In the Python script, replace all placeholders (capital letters) with the appropriate values and run the following command in the terminal:
python load_dynamodb.py

This command loads the sample data into our single DynamoDB table.

Extract data from DynamoDB

To extract the data from DynamoDB to our S3 data lake, we use the new AWS Glue DynamoDB export connector. Unlike the old connector, the new version uses a snapshot of the DynamoDB table and doesn’t consume read capacity units of your source DynamoDB table. For large DynamoDB tables exceeding 100 GB, the read performance of the new AWS Glue DynamoDB export connector is not only consistent but also significantly faster than the previous version.

To use this new export connector, you need to enable point-in-time recovery (PITR) for the source DynamoDB table in advance. This will take continuous backups of the source table (so be mindful of cost) and ensures that each time the connector invokes an export, the data is fresh. The time it takes to complete an export depends on the size of your table and how uniformly the data is distributed therein. This can range from a few minutes for small tables (up to 10 GiB) to a few hours for larger tables (up to a few terabytes). This is not a concern for our use case because data lakes and data warehouses are typically used to aggregate data at scale and generate daily, weekly, or monthly reports. It’s also worth noting that each export is a full refresh of the data, so in order to build a scalable automated data pipeline, we need to archive the existing files before beginning a fresh export from DynamoDB.
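
For reference, the following is a minimal sketch of how the export connector might be invoked in an AWS Glue PySpark script; the table ARN, bucket, and prefix are placeholders, and this mirrors what AWS Glue Studio generates rather than replacing the steps that follow.

import sys
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())

# Read a point-in-time snapshot through the DynamoDB export connector;
# this consumes no read capacity on the source table.
dyf = glue_context.create_dynamic_frame.from_options(
    connection_type="dynamodb",
    connection_options={
        "dynamodb.export": "ddb",
        "dynamodb.tableArn": "arn:aws:dynamodb:us-east-1:111122223333:table/SalesTable",  # placeholder ARN
        "dynamodb.unnestDDBJson": True,
        "dynamodb.s3.bucket": "raw-bucket",   # bucket that receives the export files
        "dynamodb.s3.prefix": "ddb-export/",
    },
)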

Complete the following steps:

  1. Create an AWS Glue job using the Spark script editor.
  2. Upload the archive_job.py file from GlueSparkJobs.zip.

This job archives the data files into timestamped folders. We run the job concurrently to archive the raw files and the data lake files.

  1. In the Job details section, give the job a name and choose the AWS Glue IAM role created by our CloudFormation template.
  2. Keep all defaults the same and ensure maximum concurrency is set to 2 (under Advanced properties).

Archiving the files provides a backup option in the event of disaster recovery. As such, we can assume that the files will not be accessed frequently and can keep them in the S3 Standard-IA storage class, saving up to 40% on storage costs while still providing rapid access to the files when needed.
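
The archive step itself is provided in archive_job.py; purely to illustrate the storage-class choice, a simple boto3 sketch of copying raw objects into a timestamped archive folder in Standard-IA might look like the following (bucket names are placeholders).

import boto3
from datetime import datetime, timezone

s3 = boto3.client("s3")
timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")

# Copy each raw object into a timestamped folder in the archive bucket,
# storing it in Standard-IA because archived files are accessed infrequently.
paginator = s3.get_paginator("list_objects_v2")
for page in paginator.paginate(Bucket="raw-bucket"):
    for obj in page.get("Contents", []):
        s3.copy_object(
            CopySource={"Bucket": "raw-bucket", "Key": obj["Key"]},
            Bucket="raw-archive-bucket",
            Key=f"{timestamp}/{obj['Key']}",
            StorageClass="STANDARD_IA",
        )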

This job typically runs before each export of data from DynamoDB. After the datasets have been archived, we’re ready to (re)-export the data from our DynamoDB table.

We can use AWS Glue Studio to visually create the jobs needed to extract the data from DynamoDB and load into our Amazon Redshift data warehouse. We demonstrate how to do this by creating an AWS Glue job (called ddb_export_raw_job) using AWS Glue Studio.

  1. In AWS Glue Studio, create a job and select Visual with a blank canvas.
  2. Choose Amazon DynamoDB as the data source.

  1. Choose our DynamoDB table to export from.
  2. Leave all other options as is and finish setting up the source connection.

We then choose Amazon S3 as our target. In the target properties, we can transform the output to a suitable format, apply compression, and specify the S3 location to store our raw data.

  1. Set the following options:
    1. For Format, choose Parquet.
    2. For Compression type, choose Snappy.
    3. For S3 Target Location, enter the path for RawBucket (located on the Outputs tab of the CloudFormation stack).
    4. For Database, choose the value for GlueRawDatabase (from the CloudFormation stack output).
    5. For Table name, enter an appropriate name.

  1. Because our target data warehouse requires data to be in a flat structure, verify that the configuration option dynamodb.unnestDDBJson is set to True on the Script tab.

  1. On the Job details tab, choose the AWS Glue IAM role generated by the CloudFormation template.
  2. Save and run the job.

Depending on the data volumes being exported, this job may take a few minutes to complete.

Because we’ll be adding the table to our AWS Glue Data Catalog, we can explore the output using Athena after the job is complete. Athena is a serverless interactive query service that makes it simple to analyze data directly in Amazon S3 using standard SQL.

  1. In the Athena query editor, choose the raw database.

We can see that the attributes of the Address structure have been unnested and added as additional columns to the table.

  1. After we export the data into the raw bucket, create another job (called raw_to_datalake_job) using AWS Glue Studio (select Visual with a blank canvas) to load the data lake partitioned by item type (customer, order, and product).
  2. Set the source as the AWS Glue Data Catalog raw database and table.

  1. In the ApplyMapping transformation, drop the Address struct because we have already unnested these attributes into our flattened raw table.

  1. Set the target as our S3 data lake.

  1. Choose the AWS Glue IAM role in the job details, then save and run the job.

Now that we have our data lake, we’re ready to build our data warehouse.

Build the dimensional model in Amazon Redshift

The CloudFormation template launches a two-node RA3.4xlarge Amazon Redshift cluster. To build the dimensional model, complete the following steps:

  1. In Amazon Redshift Query Editor V2, connect to your database (default: salesdwh) within the cluster using the database user name and password authentication (MasterUserName and MasterUserPassword from the CloudFormation template).
  2. You may be asked to configure your account if this is your first time using Query Editor V2.
  3. Download the SQL scripts SQL Scripts.zip to create the following schemas and tables (run the scripts in numbered sequence).

In the landing schema:

  • address
  • customer
  • order
  • product

In the staging schema:

  • staging.address
  • staging.address_maxkey
  • staging.addresskey
  • staging.customer
  • staging.customer_maxkey
  • staging.customerkey
  • staging.date
  • staging.date_maxkey
  • staging.datekey
  • staging.order
  • staging.order_maxkey
  • staging.orderkey
  • staging.product
  • staging.product_maxkey
  • staging.productkey

In the dwh schema:

  • dwh.address
  • dwh.customer
  • dwh.order
  • dwh.product

We load the data from our data lake to the landing schema as is.

  1. Use the JDBC connector to Amazon Redshift to build an AWS Glue crawler to add the landing schema to our Data Catalog under the ddb_redshift database.

  1. Create an AWS Glue crawler with the JDBC data source.

  1. Select the JDBC connection you created and choose Next.

  1. Choose the IAM role created by the CloudFormation template and choose Next.

  1. Review your settings before creating the crawler.

The crawler adds the four landing tables in our AWS Glue database ddb_redshift.

  1. In AWS Glue Studio, create four AWS Glue jobs to load the landing tables (these scripts are available to download, and you can use the Spark script editor to upload these scripts individually to create the jobs):
    1. land_order_job
    2. land_product_job
    3. land_customer_job
    4. land_address_job

Each job has the structure as shown in the following screenshot.

  1. Filter the S3 source on the partition column type:
    1. For product, filter on type='product'.
    2. For order, filter on type='order'.
    3. For customer and address, filter on type='customer'.

  1. Set the target for the data flow as the corresponding table in the landing schema in Amazon Redshift.
  2. Use the built-in ApplyMapping transformation in our data pipeline to drop columns and, where necessary, convert the data types to match the target columns.

For more information about built-in transforms available in AWS Glue, refer to AWS Glue PySpark transforms reference.

The mappings for our four jobs are as follows:

  • land_order_job:
    mappings=[
    ("pk", "string", "pk", "string"),
    ("orderid", "string", "orderid", "string"),
    ("numberofitems", "string", "numberofitems", "int"),
    ("orderdate", "string", "orderdate", "timestamp"),
    ]

  • land_product_job:
    mappings=[
    ("orderid", "string", "orderid", "string"),
    ("category", "string", "category", "string"),
    ("price", "string", "price", "decimal"),
    ("productname", "string", "productname", "string"),
    ("productid", "string", "productid", "string"),
    ("color", "string", "color", "string"),
    ]

  • land_address_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]

  • land_customer_job:
    mappings=[
    ("username", "string", "username", "string"),
    ("email", "string", "email", "string"),
    ("fullname", "string", "fullname", "string"),
    ]
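
For reference, a mapping such as the land_product_job one is typically applied in the generated script with the built-in ApplyMapping transform. The following minimal sketch assumes the data lake is cataloged in the datalake database; the catalog table name, pushdown predicate, and transformation context are illustrative.

from awsglue.context import GlueContext
from awsglue.transforms import ApplyMapping
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Read only the product partition from the data lake, then rename and cast
# columns to match the Amazon Redshift landing table.
product_dyf = glue_context.create_dynamic_frame.from_catalog(
    database="datalake",
    table_name="sales",                      # hypothetical catalog table name
    push_down_predicate="type='product'",
)
mapped_dyf = ApplyMapping.apply(
    frame=product_dyf,
    mappings=[
        ("orderid", "string", "orderid", "string"),
        ("category", "string", "category", "string"),
        ("price", "string", "price", "decimal"),
        ("productname", "string", "productname", "string"),
        ("productid", "string", "productid", "string"),
        ("color", "string", "color", "string"),
    ],
    transformation_ctx="applymapping_product",
)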

  1. Choose the AWS Glue IAM role, and under Advanced properties, verify the JDBC connector to Amazon Redshift as a connection.
  2. Save and run each job to load the landing tables in Amazon Redshift.

Populate the data warehouse

From the landing schema, we move the data to the staging layer and apply the necessary transformations. Our dimensional model has a single fact table, the orders table, which is the largest table and as such needs a distribution key. The choice of key depends on how the data is queried and the size of the dimension tables being joined to. If you’re unsure of your query patterns, you can leave the distribution keys and sort keys for your tables unspecified. Amazon Redshift automatically assigns the correct distribution and sort keys based on your queries. This has the advantage that if and when query patterns change over time, Amazon Redshift can automatically update the keys to reflect the change in usage.

In the staging schema, we keep track of existing records based on their business key (the unique identifier for the record). We create key tables to generate a numeric identity column for each table based on the business key. These key tables allow us to implement an incremental transformation of the data into our dimensional model.

CREATE TABLE IF NOT EXISTS staging.productkey ( 
    productkey integer identity(1,1), 
    productid character varying(16383), 
    CONSTRAINT products_pkey PRIMARY KEY(productkey));   

When loading the data, we need to keep track of the latest surrogate key value to ensure that new records are assigned the correct increment. We do this using maxkey tables (pre-populated with zero):

CREATE TABLE IF NOT EXISTS staging.product_maxkey ( 
    productmaxkey integer);

INSERT INTO staging.product_maxkey
select 0;    

We use staging tables to store our incremental load, the structure of which will mirror our final target model in the dwh schema:

---staging tables to load data from data lake 
   
CREATE TABLE IF NOT EXISTS staging.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey));
---dwh tables to load data from staging schema
     
CREATE TABLE IF NOT EXISTS dwh.product ( 
    productkey integer,
    productname character varying(200), 
    color character varying(50), 
    category character varying(100),
    PRIMARY KEY (productkey)); 

Incremental processing in the data warehouse

We load the target data warehouse using stored procedures to perform upserts (deletes and inserts performed in a single transaction):

CREATE OR REPLACE PROCEDURE staging.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

TRUNCATE TABLE staging.order;

--insert new records to get new ids
insert into staging.orderkey
(
orderid
)
select
c.orderid
from landing.order c
LEFT JOIN staging.orderkey i
ON c.orderid=i.orderid
where i.orderid IS NULL;

--update the max key
update staging.order_maxkey
set ordermaxkey = (select max(orderkey) from staging.orderkey);


insert into staging.order
(
orderkey,
customerkey,
productkey,
addresskey,
datekey,
numberofitems,
price
)
select
xid.orderkey,
cid.customerkey,
pid.productkey,
aid.addresskey,
d.datekey,
o.numberofitems,
p.price
from
landing.order o
join staging.orderkey xid on o.orderid=xid.orderid
join landing.customer c on substring(o.pk,6,length(o.pk))=c.username   ---order table needs username
join staging.customerkey cid on cid.username=c.username
join landing.address a on a.username=c.username
join staging.addresskey aid on aid.pk=a.buildingnumber::varchar+'||'+a.postcode  ---maybe change pk to addressid
join staging.datekey d on d.orderdate=o.orderdate
join landing.product p on p.orderid=o.orderid
join staging.productkey pid on pid.productid=p.productid;

COMMIT;

END;
$$;
CREATE OR REPLACE PROCEDURE dwh.load_order() LANGUAGE plpgsql AS $$
DECLARE
BEGIN

---delete old records 
delete from dwh.order
using staging.order as stage
where dwh.order.orderkey=stage.orderkey;

--insert new and modified
insert into dwh.order
(
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey  
)
select
orderkey,
customerkey,  
productkey,
addresskey,
price,
datekey
from staging.order;

COMMIT;
END;
$$;

Use Step Functions to orchestrate the data pipeline

So far, we have stepped through each component in our workflow. We now need to stitch them together to build an automated, idempotent data pipeline. A good orchestration tool must manage failures, retries, parallelization, service integrations, and observability, so developers can focus solely on the business logic. Ideally, the workflow we build is also serverless so there is no operational overhead. Step Functions is an ideal choice to automate our data pipeline. It allows us to integrate the ELT components we have built on AWS Glue and Amazon Redshift and conduct some steps in parallel to optimize performance.

  1. On the Step Functions console, create a new state machine.
  2. Select Write your workflow in code.

  1. Enter the stepfunction_workflow.json code into the definition, replacing all placeholders with the appropriate values:
    1. [REDSHIFT-CLUSTER-IDENTIFIER] – Use the value for ClusterName (from the Outputs tab in the CloudFormation stack).
    2. [REDSHIFT-DATABASE] – Use the value for salesdwh (unless changed, this is the default database in the CloudFormation template).

We use the Step Functions IAM role from the CloudFormation template.

This JSON code generates the following pipeline.

Starting from the top, the workflow contains the following steps:

  1. We archive any existing raw and data lake files.
  2. We add two AWS Glue StartJobRun tasks that run sequentially: first to export the data from DynamoDB to our raw bucket, then from the raw bucket to our data lake.
  3. After that, we parallelize the landing of data from Amazon S3 to Amazon Redshift.
  4. We transform and load the data into our data warehouse using the Amazon Redshift Data API. Because this is asynchronous, we need to check the status of the runs before moving down the pipeline (see the sketch after this list).
  5. After we move the data load from landing to staging, we truncate the landing tables.
  6. We load the dimensions of our target data warehouse (dwh) first, and finally we load our single fact table with its foreign key dependency on the preceding dimension tables.
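
The state machine invokes the Redshift Data API through the Step Functions service integration; the equivalent pattern in boto3, shown here only as a sketch, is to submit the stored procedure call and poll until the asynchronous statement finishes (cluster identifier, database, and user are placeholders).

import time
import boto3

redshift_data = boto3.client("redshift-data")

# Submit the staging load procedure and wait for the asynchronous call to finish.
response = redshift_data.execute_statement(
    ClusterIdentifier="redshift-cluster-identifier",
    Database="salesdwh",
    DbUser="awsuser",
    Sql="CALL staging.load_order();",
)
statement_id = response["Id"]

while True:
    status = redshift_data.describe_statement(Id=statement_id)["Status"]
    if status in ("FINISHED", "FAILED", "ABORTED"):
        break
    time.sleep(5)
print(f"Statement {statement_id} ended with status {status}")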

The following figure illustrates a successful run.

After we set up the workflow, we can use EventBridge to schedule a daily midnight run, where the target is a Step Functions StartExecution API calling our state machine. Under the workflow permissions, choose Create a new role for this schedule and optionally rename it.
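
If you prefer to create the schedule programmatically, a minimal boto3 sketch with EventBridge follows; the state machine ARN and the IAM role that allows EventBridge to start executions are placeholders.

import boto3

events = boto3.client("events")
state_machine_arn = "arn:aws:states:us-east-1:111122223333:stateMachine:ddb-dwh-pipeline"  # placeholder
role_arn = "arn:aws:iam::111122223333:role/eventbridge-stepfunctions-role"                 # placeholder

# Trigger the state machine every day at midnight UTC.
events.put_rule(Name="daily-dwh-load", ScheduleExpression="cron(0 0 * * ? *)")
events.put_targets(
    Rule="daily-dwh-load",
    Targets=[{"Id": "ddb-dwh-pipeline", "Arn": state_machine_arn, "RoleArn": role_arn}],
)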

Query the data warehouse

We can verify the data has been successfully loaded into Amazon Redshift with a query.

After we have the data loaded into Amazon Redshift, we’re ready to answer the query asked at the start of this post: what is the year-on-year quarterly sales growth by product and country? The query looks like the following code (depending on your dataset, you may need to select alternative years and quarters):

with sales2021q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2021q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2021 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  ),
sales2022q2
as
(
  select d.year, d.quarter,a.country,p.category,sum(o.price) as revenue2022q2
  from dwh.order o
  join dwh.date d on o.datekey=d.datekey
  join dwh.product p on o.productkey=p.productkey
  join dwh.address a on a.addresskey=o.addresskey
  where d.year=2022 and d.quarter=2
  group by d.year, d.quarter,a.country,p.category
  )

select a.country,a.category, ((revenue2022q2 - revenue2021q2)/revenue2021q2)*100 as quarteronquartergrowth
from sales2022q2 a
join sales2021q2 b on a.country=b.country and a.category=b.category
order by a.country,a.category

We can visualize the results in Amazon Redshift Query Editor V2 by toggling the chart option and setting Type as Pie, Values as quarteronquartergrowth, and Labels as category.

Cost considerations

We give a brief outline of the indicative costs associated with the key services covered in our solution based on us-east-1 Region pricing using the AWS Pricing Calculator:

  • DynamoDB – With on-demand settings for 1.5 million items (average size of 355 bytes) and associated write and read capacity plus PITR storage, the cost of DynamoDB is approximately $2 per month.
  • AWS Glue DynamoDB export connector – This connector utilizes the DynamoDB export to Amazon S3 feature. This has no hourly cost—you only pay for the gigabytes of data exported to Amazon S3 ($0.11 per GiB).
  • Amazon S3 – You pay for storing objects in your S3 buckets. The rate you’re charged depends on your objects’ size, how long you stored the objects during the month, and the storage class. In our solution, we used S3 Standard for our data lake and S3 Standard – Infrequent Access for archive. Standard-IA storage is $0.0125 per GB/month; Standard storage is $0.023 per GB/month.
  • AWS Glue Jobs – With AWS Glue, you only pay for the time your ETL job takes to run. There are no resources to manage, no upfront costs, and you are not charged for startup or shutdown time. AWS charges you an hourly rate based on the number of Data Processing Units (DPUs) used to run your ETL job. A single DPU provides 4 vCPU and 16 GB of memory. Every one of our nine Spark jobs uses 10 DPUs and has an average runtime of 3 minutes. This gives an approximate cost of $0.29 per job.
  • Amazon Redshift – We provisioned two RA3.4xlarge nodes for our Amazon Redshift cluster. If run on-demand, each node costs $3.26 per hour. If utilized 24/7, our monthly cost would be approximately $4,759.60. You should evaluate your workload to determine what cost savings can be achieved by using Amazon Redshift Serverless or using Amazon Redshift provisioned reserved instances.
  • Step Functions – You are charged based on the number of state transitions required to run your application. Step Functions counts a state transition as each time a step of your workflow is run. You’re charged for the total number of state transitions across all your state machines, including retries. The Step Functions free tier includes 4,000 free state transitions per month. Thereafter, it’s $0.025 per 1,000 state transitions.

Clean up

Remember to delete any resources created through the CloudFormation stack. You first need to manually empty and delete the S3 buckets. Then you can delete the CloudFormation stack using the AWS CloudFormation console or AWS Command Line Interface (AWS CLI). For instructions, refer to Clean up your “hello, world!” application and related resources.

Summary

In this post, we demonstrated how you can export data from DynamoDB to Amazon S3 and Amazon Redshift to perform advanced analytics. We built an automated data pipeline that you can use to perform a batch ELT process that can be scheduled to run daily, weekly, or monthly and can scale to handle very large workloads.

Please leave your feedback or comments in the comments section.


About the Author

Altaf Hussain is an Analytics Specialist Solutions Architect at AWS. He helps customers around the globe design and optimize their big data and data warehousing solutions.


Appendix

To extract the data from DynamoDB and load it into our Amazon Redshift database, we can use the Spark script editor and upload the files from GlueSparkJobs.zip to create each individual job necessary to perform the extract and load. If you choose to do this, remember to update, where appropriate, the account ID and Region placeholders in the scripts. Also, on the Job details tab under Advanced properties, add the Amazon Redshift connection.

Stream VPC Flow Logs to Datadog via Amazon Kinesis Data Firehose

Post Syndicated from Chaitanya Shah original https://aws.amazon.com/blogs/big-data/stream-vpc-flow-logs-to-datadog-via-amazon-kinesis-data-firehose/

It’s common to store the logs generated by your applications and services in various tools. These logs are important for compliance, audits, troubleshooting, security incident responses, meeting security policies, and many other purposes. You can analyze these logs to understand application behavior and usage patterns and make informed decisions.

When running workloads on Amazon Web Services (AWS), you need to analyze Amazon Virtual Private Cloud (Amazon VPC) Flow Logs to track the IP traffic going to and from the network interfaces for the workloads in your VPC. Analyzing VPC flow logs helps you understand how your applications are communicating over the VPC network and serves as a main source of information about the network in your VPC.

You can easily deliver data to supported destinations using the Amazon Kinesis Data Firehose integration with VPC flow logs. Kinesis Data Firehose is a fully managed service for delivering near-real-time streaming data to various destinations for storage and performing near-real-time analytics. With its extensible data transformation capabilities, you can also streamline log processing and log delivery pipelines into a single Kinesis Data Firehose delivery stream. You can perform analytics on VPC flow logs delivered from your VPC using the Kinesis Data Firehose integration with Datadog as a destination.

Datadog is a monitoring and security platform and AWS Partner Network (APN) Advanced Technology Partner with AWS Competencies in AWS Cloud Operations, DevOps, Migration, Security, Networking, Containers, and Microsoft Workloads, along with many others.

Datadog enables you to easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure. You can analyze all your AWS service logs while storing only the ones you need, and generate metrics from aggregated logs to uncover and alert on trends in your AWS services.

In this post, you learn how to integrate VPC flow logs with Kinesis Data Firehose and deliver it to Datadog.

Solution overview

This solution uses native integration of VPC flow logs streaming to Kinesis Data Firehose. We use a Kinesis Data Firehose delivery stream to buffer the streamed VPC flow logs to a Datadog destination endpoint in your Datadog account. You can use these logs with Datadog Log Management and Datadog Cloud SIEM to analyze the health, performance, and security of your cloud resources.

The following diagram illustrates the solution architecture.

We walk you through the following high-level steps:

  1. Link your AWS account with your Datadog account.
  2. Create the Kinesis Data Firehose stream where VPC service streams the flow logs.
  3. Create the VPC flow log subscription to Kinesis Data Firehose.
  4. Visualize VPC flow logs in the Datadog dashboard.

The account ID 123456781234 used in this post is a dummy account. It is used only for demonstration purposes.

Prerequisites

You should have the following prerequisites:

  • An AWS account with access to Amazon VPC, Kinesis Data Firehose, and Amazon S3.
  • A Datadog account and a Datadog API key.
  • An S3 bucket to back up logs that fail delivery to the Datadog endpoint (referenced later in the destination settings).

Link your AWS account with your Datadog account for AWS integration

Follow the instructions provided on the Datadog website for AWS Integration. To configure log archiving and enrich the log data sent from your AWS account with useful context, link the accounts. When you complete the linking setup, proceed to the following step.

Create a Kinesis Data Firehose stream

Now that your Datadog integration with AWS is complete, you can create a Kinesis Data Firehose delivery stream where VPC Flow Logs are streamed by following these steps:

  1. On the Amazon Kinesis console, choose Kinesis Data Firehose in the navigation pane.
  2. Choose Create delivery stream.
  3. Choose Direct PUT as the source.
  4. Set Destination as Datadog.
    Create delivery stream
  1. For Delivery stream name, enter PUT-DATADOG-DEMO.
  2. Keep Data transformation set to Disabled under Transform records.
  3. In Destination settings, for HTTP endpoint URL, choose the desired log’s HTTP endpoint based on your Region and Datadog account configuration.
    Kinesis delivery stream configuration
  4. For API key, enter your Datadog API key.

This allows your delivery stream to publish VPC Flow logs to the Datadog endpoint. API keys are unique to your organization. An API key is required by the Datadog Agent to submit metrics and events to Datadog.

  1. Set Content encoding to GZIP to reduce the size of data transferred.
  2. Set the Retry duration to 60. You can change the Retry duration value if you need to, depending on the request handling capacity of the Datadog endpoint.
    Kinesis destination settings
    Under Buffer hints, Buffer size and Buffer interval are set with default values for Datadog integration.
    Kinesis buffer settings
  1. Under Backup settings, as mentioned in the prerequisites, choose the S3 bucket that you created to store failed logs, and specify a prefix for the backup.
  2. Under the S3 buffer hints section, set Buffer size to 5 and Buffer interval to 300.

You can change the S3 buffer size and interval based on your requirements.

  1. Under S3 compression and encryption, select GZIP for Compression for data records or another compression method of your choice.

Compressing data reduces the required storage space.

  1. Select Disabled for Encryption of the data records. You can enable encryption of the data records to secure access to your logs.
    Kinesis stream backup settings
  1. Optionally, in Advanced settings, select Enable server-side encryption for source records in delivery stream.
    You can use AWS managed keys or a CMK managed by you for the encryption type.
  1. Enable CloudWatch error logging.
  2. Choose Create or update IAM role, which is created by Kinesis Data Firehose as part of this stream.
    Kinesis stream Advanced settings
  1. Choose Next.
  2. Review your settings.
  3. Choose Create delivery stream.
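
If you prefer to script the delivery stream instead of using the console, a minimal boto3 sketch follows. The endpoint URL, API key, role ARN, and bucket ARN are placeholders; use the Datadog log intake endpoint that matches your Datadog site.

import boto3

firehose = boto3.client("firehose")

firehose.create_delivery_stream(
    DeliveryStreamName="PUT-DATADOG-DEMO",
    DeliveryStreamType="DirectPut",
    HttpEndpointDestinationConfiguration={
        "EndpointConfiguration": {
            "Url": "https://aws-kinesis-http-intake.logs.datadoghq.com/v1/input",  # example endpoint
            "Name": "Datadog logs",
            "AccessKey": "YOUR-DATADOG-API-KEY",
        },
        "RequestConfiguration": {"ContentEncoding": "GZIP"},
        "RetryOptions": {"DurationInSeconds": 60},
        "RoleARN": "arn:aws:iam::123456781234:role/firehose-delivery-role",
        "S3BackupMode": "FailedDataOnly",
        "S3Configuration": {
            "RoleARN": "arn:aws:iam::123456781234:role/firehose-delivery-role",
            "BucketARN": "arn:aws:s3:::my-firehose-backup-bucket",
            "BufferingHints": {"SizeInMBs": 5, "IntervalInSeconds": 300},
            "CompressionFormat": "GZIP",
        },
    },
)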

Create a VPC flow logs subscription

Create a VPC flow logs subscription for the Kinesis Data Firehose delivery stream you created in the previous step:

  1. On the Amazon VPC console, choose Your VPCs.
  2. Select the VPC that you want to create the flow log for.
  3. On the Actions menu, choose Create flow log.
    AWS VPCs
  1. Select All to send all flow log records to the Firehose destination.

If you want to filter the flow logs, you could alternatively select Accept or Reject.

  1. For Maximum aggregation interval, select 10 minutes or the minimum setting of 1 minute if you need the flow log data to be available for near-real-time analysis in Datadog.
  2. For Destination, select Send to Kinesis Data Firehose in the same account if the delivery stream is set up on the same account where you create the VPC flow logs.

If you want to send the data to a different account, refer to Publish flow logs to Kinesis Data Firehose.

  1. Choose an option for Log record format:
    1. If you leave Log record format as the AWS default format, the flow logs are sent in the version 2 format.
    2. Alternatively, you can specify custom fields for the flow logs to capture and send to Datadog.

For more information on log format and available fields, refer to Flow log records.

  1. Choose Create flow log.
    Create VPC Flow log
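
The same subscription can be created programmatically; the following boto3 sketch assumes the delivery stream from the previous section and uses a placeholder VPC ID.

import boto3

ec2 = boto3.client("ec2")

ec2.create_flow_logs(
    ResourceType="VPC",
    ResourceIds=["vpc-0123456789abcdef0"],  # placeholder VPC ID
    TrafficType="ALL",
    LogDestinationType="kinesis-data-firehose",
    LogDestination="arn:aws:firehose:us-east-1:123456781234:deliverystream/PUT-DATADOG-DEMO",
    MaxAggregationInterval=60,
)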

Now let’s explore the VPC flow logs in Datadog.

Visualize VPC flow logs in the Datadog dashboard

In the Logs Search option in the navigation pane, filter to source:vpc. The VPC flow logs from your VPC are in the Datadog Log Explorer and are automatically parsed so you can analyze your logs by source, destination, action, or other attributes.

Datadog Logs Dashboard

Clean up

After you test this solution, delete the resources you created (the VPC flow log subscription, the Kinesis Data Firehose delivery stream, and the S3 bucket used for failed-delivery backups) to avoid incurring future charges.

Conclusion

In this post, we walked through a solution of how to integrate VPC flow logs with a Kinesis Data Firehose delivery stream, deliver it to a Datadog destination with no code, and visualize it in a Datadog dashboard. With Datadog, you can easily explore and analyze logs to gain deeper insights into the state of your applications and AWS infrastructure.

Try this new, quick, and hassle-free way of sending your VPC flow logs to a Datadog destination using Kinesis Data Firehose.


About the Author

Chaitanya Shah is a Sr. Technical Account Manager (TAM) with AWS, based out of New York. He has over 22 years of experience working with enterprise customers. He loves to code and actively contributes to the AWS solutions labs to help customers solve complex problems. He provides guidance to AWS customers on best practices for their AWS Cloud migrations. He also specializes in AWS data transfer and the data and analytics domain.

Accelerate data science feature engineering on transactional data lakes using Amazon Athena with Apache Iceberg

Post Syndicated from Vivek Gautam original https://aws.amazon.com/blogs/big-data/accelerate-data-science-feature-engineering-on-transactional-data-lakes-using-amazon-athena-with-apache-iceberg/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) and data sources residing in AWS, on-premises, or other cloud systems using SQL or Python. Athena is built on open-source Trino and Presto engines, and Apache Spark frameworks, with no provisioning or configuration effort required. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Apache Iceberg is an open table format for very large analytic datasets. It manages large collections of files as tables, and it supports modern analytical data lake operations such as record-level insert, update, delete, and time travel queries. Athena supports read, time travel, write, and DDL queries for Apache Iceberg tables that use the Apache Parquet format for data and the AWS Glue Data Catalog for their metastore.

Feature engineering is a process of identifying and transforming raw data (images, text files, videos, and so on), backfilling missing data, and adding one or more meaningful data elements to provide context so a machine learning (ML) model can learn from it. Data labeling is required for various use cases, including forecasting, computer vision, natural language processing, and speech recognition.

Combined with the capabilities of Athena, Apache Iceberg delivers a simplified workflow for data scientists to create new data features without needing to copy or recreate the entire dataset. You can create features using standard SQL on Athena without using any other service for feature engineering. Data scientists can reduce the time spent preparing and copying datasets, and instead focus on data feature engineering, experimentation, and analyzing data at scale.

In this post, we review the benefits of using Athena with the Apache Iceberg open table format and how it simplifies common feature engineering tasks for data scientists. We demonstrate how Athena can convert an existing table in Apache Iceberg format, then add columns, delete columns, and modify the data in the table without recreating or copying the dataset, and use these capabilities to create new features on Apache Iceberg tables.

Solution overview

Data scientists are generally accustomed to working with large datasets. Datasets are usually stored in either JSON, CSV, ORC, or Apache Parquet format, or similar read-optimized formats for fast read performance. Data scientists often create new data features, and backfill such data features with aggregated and ancillary data. Historically, this task was accomplished by creating a view on top of the table with the underlying data in Apache Parquet format, where such columns and data were added at runtime or by creating a new table with additional columns. Although this workflow is well-suited for many use cases, it’s inefficient for large datasets, because data would need to be generated at runtime or datasets would need to be copied and transformed.

Athena has introduced ACID (Atomicity, Consistency, Isolation, Durability) transaction capabilities that add INSERT, UPDATE, DELETE, MERGE, and time travel operations built on Apache Iceberg tables. These capabilities enable data scientists to create new data features and drop existing data features on existing datasets without worrying about copying or transforming the dataset or abstracting it with a view. Data scientists can focus on feature engineering work and avoid copying and transforming the datasets.

The Athena Iceberg UPDATE operation writes Apache Iceberg position delete files and newly updated rows as data files in the same transaction. You can make record corrections via a single UPDATE statement.

With the release of Athena engine version 3, the capabilities for Apache Iceberg tables are enhanced with support for operations such as CREATE TABLE AS SELECT (CTAS) and MERGE commands that streamline the lifecycle management of your Iceberg data. CTAS makes it fast and efficient to create tables from other formats such as Apache Parquet, and MERGE INTO conditionally updates, deletes, or inserts rows into an Iceberg table. A single statement can combine update, delete, and insert actions.

Prerequisites

Set up an Athena workgroup with Athena engine version 3 to use CTAS and MERGE commands with an Apache Iceberg table. To upgrade your existing Athena engine to version 3 in your Athena workgroup, follow the instructions in Upgrade to Athena engine version 3 to increase query performance and access more analytics features or refer to Changing the engine version in the Athena console.

Dataset

For demonstration, we use an Apache Parquet table that contains several million records of randomly distributed fictitious sales data from the last several years stored in an S3 bucket. Download the dataset, unzip it to your local computer, and upload it to your S3 bucket. In this post, we uploaded our dataset to s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/.

The following table shows the layout for the table customer_orders.

Column Name Data Type Description
orderkey string Order number for the order
custkey string Customer identification number
orderstatus string Status of the order
totalprice string Total price of the order
orderdate string Date of the order
orderpriority string Priority of the order
clerk string Name of the clerk who processed the order
shippriority string Priority on the shipping
name string Customer name
address string Customer address
nationkey string Customer nation key
phone string Customer phone number
acctbal string Customer account balance
mktsegment string Customer market segment

Perform feature engineering

As a data scientist, we want to perform feature engineering on the customer orders data by adding calculated one year total purchases and one year average purchases for each customer in the existing dataset. For demonstration purposes, we created the customer_orders table in the sampledb database using Athena as shown in the following DDL command. (You can use any of your existing datasets and follow the steps mentioned in this post.) The customer_orders dataset was generated and stored in the S3 bucket location s3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/ in Parquet format. This table is not an Apache Iceberg table.

CREATE EXTERNAL TABLE sampledb.customer_orders(
  `orderkey` string, 
  `custkey` string, 
  `orderstatus` string, 
  `totalprice` string, 
  `orderdate` string, 
  `orderpriority` string, 
  `clerk` string, 
  `shippriority` string, 
  `name` string, 
  `address` string, 
  `nationkey` string, 
  `phone` string, 
  `acctbal` string, 
  `mktsegment` string)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://sample-iceberg-datasets-xxxxxxxxxxx/sampledb/orders_and_customers/'
TBLPROPERTIES (
  'classification'='parquet');

Validate the data in the table by running a query:

SELECT * 
from sampledb.customer_orders 
limit 10;

We want to add new features to this table to get a deeper understanding of customer sales, which can result in faster model training and more valuable insights. To add new features to the dataset, convert the customer_orders Athena table to an Apache Iceberg table on Athena. Issue a CTAS query statement to create a new table in Apache Iceberg format from the customer_orders table. While doing so, a new feature is added to get the total purchase amount in the past year (max year of the dataset) by each customer.

In the following CTAS query, a new column named one_year_sales_aggregate with the default value as 0.0 of data type double is added and table_type is set to ICEBERG:

CREATE TABLE  sampledb.customers_orders_aggregate
WITH (table_type = 'ICEBERG',
   format = 'PARQUET', 
   location = 's3://sample-iceberg-datasets-xxxxxxxxxxxx/sampledb/customer_orders_aggregate', 
   is_external = false
   ) 
AS 
SELECT 
orderkey,
custkey,
orderstatus,
totalprice,
orderdate, 
orderpriority, 
clerk, 
shippriority, 
name, 
address, 
nationkey, 
phone, 
acctbal, 
mktsegment,
0.0 as one_year_sales_aggregate
from sampledb.customer_orders;

Issue the following query to verify the data in the Apache Iceberg table with the new column one_year_sales_aggregate values as 0.0:

SELECT custkey, totalprice, one_year_sales_aggregate 
from sampledb.customers_orders_aggregate 
limit 10;

We want to populate the values for the new feature one_year_sales_aggregate in the dataset to get the total purchase amount for each customer based on their purchases in the past year (max year of the dataset). Issue a MERGE query statement to the Apache Iceberg table using Athena to populate values for the one_year_sales_aggregate feature:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            sum(CAST(totalprice as double)) as one_year_sales_aggregate
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_agg
    ON (coa.custkey = sales_one_year_agg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_aggregate = sales_one_year_agg.one_year_sales_aggregate;

Issue the following query to validate the updated value for total spend by each customer in the past year:

SELECT custkey, totalprice, one_year_sales_aggregate
from sampledb.customers_orders_aggregate limit 10;

We decide to add another feature onto an existing Apache Iceberg table to compute and store the average purchase amount in the past year by each customer. Issue an ALTER query statement to add a new column to an existing table for feature one_year_sales_average:

ALTER TABLE sampledb.customers_orders_aggregate
ADD COLUMNS (one_year_sales_average double);

Before populating the values to this new feature, you can set the default value for the feature one_year_sales_average to 0.0. Using the same Apache Iceberg table on Athena, issue an UPDATE query statement to populate the value for the new feature as 0.0:

UPDATE sampledb.customers_orders_aggregate
SET one_year_sales_average = 0.0;

Issue the following query to verify the updated value for average spend by each customer in the past year is set to 0.0:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Now we want to populate the values for the new feature one_year_sales_average in the dataset to get the average purchase amount for each customer based on their purchases in the past year (max year of the dataset). Issue a MERGE query statement to the existing Apache Iceberg table on Athena using the Athena engine to populate values for the feature one_year_sales_average:

MERGE INTO sampledb.customers_orders_aggregate coa USING 
    (select custkey, 
            date_format(CAST(orderdate as date), '%Y') as orderdate, 
            avg(CAST(totalprice as double)) as one_year_sales_average
    FROM sampledb.customers_orders_aggregate o
    where date_format(CAST(o.orderdate as date), '%Y') = (select date_format(max(CAST(orderdate as date)), '%Y') from sampledb.customers_orders_aggregate)
    group by custkey, date_format(CAST(orderdate as date), '%Y')) sales_one_year_avg
    ON (coa.custkey = sales_one_year_avg.custkey)
    WHEN MATCHED
        THEN UPDATE SET one_year_sales_average = sales_one_year_avg.one_year_sales_average;

Issue the following query to verify the updated values for average spend by each customer:

SELECT custkey, orderdate, totalprice, one_year_sales_aggregate, one_year_sales_average 
from sampledb.customers_orders_aggregate 
limit 10;

Once additional data features have been added to the dataset, data scientists generally proceed to train ML models and make inferences using Amazon SageMaker or an equivalent toolset.

Conclusion

In this post, we demonstrated how to perform feature engineering using Athena with Apache Iceberg. We also demonstrated using the CTAS query to create an Apache Iceberg table on Athena from an existing dataset in Apache Parquet format, adding new features in an existing Apache Iceberg table on Athena using the ALTER query, and using UPDATE and MERGE query statements to update the feature values of existing columns.

We encourage you to use CTAS queries to create tables quickly and efficiently, and use the MERGE query statement to synchronize tables in one step to simplify data preparations and update tasks when transforming the features using Athena with Apache Iceberg. If you have comments or feedback, please leave them in the comments section.


About the Authors

Vivek Gautam is a Data Architect with specialization in data lakes at AWS Professional Services. He works with enterprise customers building data products, analytics platforms, and solutions on AWS. When not building and designing modern data platforms, Vivek is a food enthusiast who also likes to explore new travel destinations and go on hikes.

Mikhail Vaynshteyn is a Solutions Architect with Amazon Web Services. Mikhail works with healthcare and life sciences customers to build solutions that help improve patients’ outcomes. Mikhail specializes in data analytics services.

Naresh Gautam is a Data Analytics and AI/ML leader at AWS with 20 years of experience, who enjoys helping customers architect highly available, high-performance, and cost-effective data analytics and AI/ML solutions to empower customers with data-driven decision-making. In his free time, he enjoys meditation and cooking.

Harsha Tadiparthi is a specialist Principal Solutions Architect, Analytics at AWS. He enjoys solving complex customer problems in databases and analytics and delivering successful outcomes. Outside of work, he loves to spend time with his family, watch movies, and travel whenever possible.

Enable remote reads from Azure ADLS with SAS tokens using Spark in Amazon EMR

Post Syndicated from Kiran Anand original https://aws.amazon.com/blogs/big-data/enable-remote-reads-from-azure-adls-with-sas-tokens-using-spark-in-amazon-emr/

Organizations use data from many sources to understand, analyze, and grow their business. These data sources are often spread across various public cloud providers. Enterprises may also expand their footprint by mergers and acquisitions, and during such events they often end up with data spread across different public cloud providers. These scenarios can create the need for AWS services to remotely access, in an ad hoc and temporary fashion, data stored in another public cloud provider such as Microsoft Azure to enable business as usual or facilitate a transition.

In such scenarios, data scientists and analysts are presented with a unique challenge when working to complete a quick data analysis because data typically has to be duplicated or migrated to a centralized location. Doing so introduces time delays, increased cost, and higher complexity as pipelines or replication processes are stood up by data engineering teams. In the end, the data may not even be needed, resulting in further loss of resources and time. Having quick, secure, and constrained access to the maximum amount of data is critical for enterprises to improve decision-making. Amazon EMR, with its open-source Hadoop modules and support for Apache Spark and Jupyter and JupyterLab notebooks, is a good choice to solve this multi-cloud data access problem.

Amazon EMR is a top-tier cloud big data solution for petabyte-scale data processing, interactive analytics, and machine learning using open-source frameworks such as Apache Spark, Apache Hive, and Presto. Amazon EMR Notebooks, a managed environment based on Jupyter and JupyterLab notebooks, enables you to interactively analyze and visualize data, collaborate with peers, and build applications using EMR clusters running Apache Spark.

In this post, we demonstrate how to set up quick, constrained, and time-bound authentication and authorization to remote data sources in Azure Data Lake Storage (ADLS) using a shared access signature (SAS) when running Apache Spark jobs via EMR Notebooks attached to an EMR cluster. This access enables data scientists and data analysts to access data directly when operating in multi-cloud environments and join datasets in Amazon Simple Storage Service (Amazon S3) with datasets in ADLS using AWS services.

Overview of solution

Amazon EMR inherently includes Apache Hadoop at its core and integrates other related open-source modules. The hadoop-aws and hadoop-azure modules provide support for AWS and Azure integration, respectively. For ADLS Gen2, the integration is done through the abfs connector, which supports reading and writing data in ADLS. Azure provides various options to authorize and authenticate requests to storage, including SAS. With SAS, you can grant restricted access to ADLS resources over a specified time interval (maximum of 7 days). For more information about SAS, refer to Delegate access by using a shared access signature.

Out of the box, Amazon EMR doesn’t have the required libraries and configurations to connect to ADLS directly. There are different methods to connect Amazon EMR to ADLS, and they all require custom configurations. In this post, we focus specifically on connecting from Apache Spark in Amazon EMR using SAS tokens generated for ADLS. The SAS connectivity is possible in Amazon EMR version 6.9.0 and above, which bundles hadoop-common 3.3.0 where support for HADOOP-16730 has been implemented. However, although the hadoop-azure module provides a SASTokenProvider interface, it doesn’t ship with a ready-to-use implementation. To access ADLS using SAS tokens, this interface must be implemented in a custom class, packaged as a JAR, and referenced in the configuration of the EMR cluster.

You can find a sample implementation of the SASTokenProvider interface on GitHub. In this post, we use this sample implementation of the SASTokenProvider interface and package it as a JAR file that can be added directly to an EMR environment on version 6.9.0 and above. To enable the JAR, a set of custom configurations are required on Amazon EMR that enable the SAS token access to ADLS. The provided JAR needs to be added to the HADOOP_CLASSPATH, and then the HADOOP_CLASSPATH needs to be added to the SPARK_DIST_CLASSPATH. This is all handled in the sample AWS CloudFormation template provided with this post. At a high level, the CloudFormation template deploys the Amazon EMR cluster with the custom configurations and has a bootstrapping script that stages the JAR on the nodes of the EMR cluster. The CloudFormation template also stages a sample Jupyter notebook and datasets into an S3 bucket. When the EMR cluster is ready, the EMR notebook needs to be attached to it and the sample Jupyter notebook loaded. After the SAS token configurations are done in the notebook, we can start reading data remotely from ADLS by running the cells within the notebook. The following diagram provides a high-level view of the solution architecture.

Architecture Overview

We walk through the following high-level steps to implement the solution:

  1. Create resources using AWS CloudFormation.
  2. Set up sample data on ADLS and create a delegated access with an SAS token.
  3. Store the SAS token securely in AWS Secrets Manager.
  4. Deploy an EMR cluster with the required configurations to securely connect and read data from ADLS via the SAS token.
  5. Create an EMR notebook and attach it to the launched EMR cluster.
  6. Read data via Spark from ADLS within the JupyterLab notebook.

For this setup, data is going over the public internet, which is neither a best practice nor an AWS recommendation, but it’s sufficient to showcase the Amazon EMR configurations that enable remote reads from ADLS. Solutions such as AWS Direct Connect or AWS Site-to-Site VPN should be used to secure data traffic in enterprise deployments.

For an AWS Command Line Interface (AWS CLI)-based deployment example, refer to the appendix at the end of this post.

Prerequisites

To get this solution working, we have a set of prerequisites for both AWS and Microsoft Azure:

  • An AWS account that can create AWS Identity and Access Management (IAM) resources with custom names and has access enabled for Amazon EMR, Amazon S3, AWS CloudFormation, and Secrets Manager.
  • The old Amazon EMR console enabled.
  • An Azure account with a storage account and container.
  • Access to blob data in ADLS with Azure AD credentials. The user must have the required role assignments in Azure. Refer to Assign an Azure role for more details.

We are following the best practice of using Azure AD credentials to create a user delegation SAS when applications need access to data storage using shared access signature tokens. In this post, we create and use a user delegation SAS with read and list permissions for ADLS access. For more information about creating SAS tokens using the Azure portal, refer to Use the Azure portal.

Before we generate the user delegation SAS token, we need to ensure the credential that will be used to generate the token has appropriate permissions to access data on the ADLS storage account. Requests submitted to the ADLS account using the user delegation SAS token are authorized with the Azure AD credentials that were used to create the SAS token.
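
If you prefer to script this step instead of using the Azure portal, a user delegation SAS can also be generated with the Azure SDK for Python. The following is a hedged sketch under that assumption (the storage account and container names are placeholders); it is not part of the portal-based walkthrough in this post.

# Hedged sketch: generate a read/list user delegation SAS with the Azure SDK for Python.
# Assumes the azure-identity and azure-storage-blob packages and an Azure AD identity that
# holds the Reader, Storage Blob Data Reader, and Storage Blob Delegator roles.
from datetime import datetime, timedelta, timezone

from azure.identity import DefaultAzureCredential
from azure.storage.blob import BlobServiceClient, ContainerSasPermissions, generate_container_sas

account = "<YOUR_STORAGE_ACCOUNT>"   # placeholder
container = "<YOUR_CONTAINER>"       # placeholder

service = BlobServiceClient(
    account_url=f"https://{account}.blob.core.windows.net",
    credential=DefaultAzureCredential(),
)

start = datetime.now(timezone.utc)
expiry = start + timedelta(days=1)   # a user delegation SAS is limited to a maximum of 7 days

delegation_key = service.get_user_delegation_key(key_start_time=start, key_expiry_time=expiry)

sas_token = generate_container_sas(
    account_name=account,
    container_name=container,
    user_delegation_key=delegation_key,
    permission=ContainerSasPermissions(read=True, list=True),
    start=start,
    expiry=expiry,
)
print(sas_token)  # store this value in Secrets Manager in a later section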

The following minimum Azure role-based access control is required at the storage account level to access the data on ADLS storage:

  • Reader – Allow viewing resources such as listing the Azure storage account and its configuration
  • Storage Blob Data Reader – Allow reading and listing Azure storage containers and blobs
  • Storage Blob Delegator – In addition to the permissions to access the data on the ADLS account, you also need this role to generate a user delegation SAS token

Create an EMR cluster and S3 artifacts with AWS CloudFormation

To create the supported version of an EMR cluster with the required SAS configurations and stage all the required artifacts in Amazon S3, complete the following steps:

  1. Sign in to the AWS Management Console in your Region (for this post, we use us-east-1).
  2. Choose Launch Stack to deploy the CloudFormation template:
  3. Choose Next.

Create Stack

  1. For Stack name, enter an appropriate lowercase name.
  2. For EmrRelease, leave as default.

As of this writing, the stack has been tested against 6.9.0 and 6.10.0.

  1. Choose Next.
    Stack Details
  2. On the next page, choose Next.
  3. Review the details and select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.
  5. Monitor the progress of the stack creation until it’s complete (about 15–20 minutes).
  6. When the stack creation is complete, navigate to the stack detail page.
  7. On the Resources tab, find the logical ID with the name S3ArtifactsBucket.
  8. Choose the link for the physical ID that starts with emr-spark-on-adls-<GUID> to be redirected to the bucket on the Amazon S3 console.
  9. On the Objects tab, open the EMR/ folder.
    S3 Bucket
  10. Open the artifacts/ folder.

There are five artifacts staged by the CloudFormation stack in this path:

  • azsastknprovider-1.0-SNAPSHOT.jar – The custom implementation of the SASTokenProvider interface.
  • EMR-Direct-Read-From-ADLS.ipynb – The Jupyter notebook that we’ll use with the EMR cluster to read data from ADLS.
  • env-staging.sh – A bash script that the Amazon EMR bootstrap process runs to stage azsastknprovider-1.0-SNAPSHOT.jar across cluster nodes.
  • Medallion_Drivers_-_Active.csv – A sample dataset that needs to be staged in the ADLS container from which we will read.
  • self-signed-certs.zip – The OpenSSL self-signed certificates used by AWS CloudFormation to encrypt data in transit. This example is a proof of concept demonstration only. Using self-signed certificates is not recommended and presents a potential security risk. For production systems, use a trusted certificate authority (CA) to issue certificates.
  1. Select Medallion_Drivers_-_Active.csv and choose Download.
  2. Select EMR-Direct-Read-From-ADLS.ipynb and choose Download.

Create the SAS token and stage sample data

To generate the user delegation SAS token from the Azure portal, log in to the Azure portal with your account credentials and complete the following steps:

  1. Navigate to Storage account, Access Control, and choose Add role assignment.
  2. Add the following roles to your user: Reader, Storage Blob Data Reader, and Storage Blob Delegator.
  3. Navigate to Storage account, Containers, and choose the container you want to use.
  4. Under Settings in the navigation pane, choose Shared access tokens.
  5. Select User delegation key for Signing method.
  6. On the Permissions menu, select Read and List.
    ADLS Container
  7. For Start and expiry date/time, define the start and expiry times for the SAS token.
  8. Choose Generate SAS token and URL.
  9. Copy the token under Blob SAS token and save this value.
    SAS Token
  10. Choose Overview in the navigation pane.
  11. Choose Upload and upload the Medallion_Drivers_-_Active.csv file downloaded earlier.

Store the SAS token in Secrets Manager

Next, we secure the SAS token in Secrets Manager so it can be programmatically pulled from the Jupyter notebook.

  1. Open the Secrets Manager console in the same Region you have been working in (in this case, us-east-1).
  2. Choose Store a new secret.
  3. For Secret type, select Other type of secret.
  4. In the Key/value pairs section, enter a name for the key and enter the blob SAS token for the value.
  5. For Encryption key, choose the default AWS managed key.
  6. Choose Next.
    Secret Type
  7. For Secret name, enter a name for your secret.
  8. Leave the optional fields as default and choose Next.
  9. On the next page, leave the settings as default and choose Next.

Setting up a secret rotation is a best practice but out of scope for this post. You can do so via Azure RM PowerShell, which can be integrated with the Lambda rotation function from Secrets Manager.

  1. Choose Store.
  2. Refresh the Secrets section and choose your secret.
  3. In the Secret details section, copy the value for Secret ARN to use in the Jupyter notebook.
    Secret ARN

Configure an EMR notebook with the SAS token and read ADLS data

Finally, we create the EMR notebook environment, integrate the SAS token into the downloaded Jupyter notebook, and perform a read against ADLS.

  1. Open the Amazon EMR console in the same Region you have been working in (in this case, us-east-1).
  2. Under EMR on EC2 in the navigation pane, choose Clusters.
  3. In the cluster table, choose Cluster with ADLS SAS Access.
    EMR Cluster

On the Summary tab, you will find the applications deployed on the cluster.

EMR Summary Tab

On the Configurations tab, you can see the configurations deployed by the CloudFormation stack that load the custom JAR in the appropriate classpaths.

EMR Configurations Tab

  1. Under EMR on EC2 in the navigation pane, choose Notebooks.
  2. Choose Create notebook.
  3. Enter an appropriate name for the notebook for Notebook name.
  4. For Cluster, select Choose an existing cluster, then choose the cluster you created earlier.
  5. Leave all other settings as default and choose Create notebook.
    Create Notebook
  6. When the notebook environment is set up, choose Open in JupyterLab.
  7. On your local machine, navigate to where you saved the EMR-Direct-Read-From-ADLS.ipynb notebook.
  8. Drag and drop it into the left pane of the JupyterLab environment.
  9. Choose EMR-Direct-Read-From-ADLS.ipynb from the left pane and ensure that the interpreter selected for the notebook in the top-right corner is PySpark.
    Open Notebook
  10. In the notebook, under the SAS TOKEN SETUP markup cell, replace <AWS_REGION> with the Region you are using (in this case, us-east-1).
  11. In the same code cell, replace <ADLS_SECRET_MANAGER_SECRET_ARN> with your secret ARN and <SECRET_KEY> with your secret key.

You can get the secret key from Secrets Manager in the Secret value section for the secret you created earlier.

Secret Key

  1. In the code cell below the HADOOP CONFIGURATIONS markup cell, replace <YOUR_STORAGE_ACCOUNT> with your Azure storage account where the SAS token was set up earlier.
  2. In the code cell below the READ TEST DATA markup cell, replace <YOUR_CONTAINER> and <YOUR_STORAGE_ACCOUNT> with your Azure container and storage account name, respectively.
  3. On the Run menu, choose Run All Cells.
    Run All Cells

After all notebook cells run, you should see 10 rows in a tabular format containing the data coming from ADLS, which now can be used directly in the notebook or can be written to Amazon S3 for further use.
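
For reference, the SAS token retrieval in the SAS TOKEN SETUP cell typically boils down to a boto3 call like the following sketch; the exact code in the provided notebook may differ, and the placeholders mirror the values you replaced earlier.

# Hedged sketch of pulling the blob SAS token from AWS Secrets Manager with boto3.
# Placeholders correspond to the values replaced in the notebook cells.
import json
import boto3

secrets = boto3.client("secretsmanager", region_name="<AWS_REGION>")
response = secrets.get_secret_value(SecretId="<ADLS_SECRET_MANAGER_SECRET_ARN>")
sas_token = json.loads(response["SecretString"])["<SECRET_KEY>"]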

Results

Clean up

Deploying a CloudFormation template incurs cost based on the resources deployed. The EMR cluster is configured to stop after an hour of inactivity, but to avoid incurring ongoing charges and to fully clean up the environment, complete the following steps:

  1. On the Amazon EMR console, choose Notebooks in the navigation pane.
  2. Select the notebook you created and choose Delete, and wait for the delete to complete before proceeding to the next step.
  3. On the Amazon EMR console, choose Clusters in the navigation pane.
  4. Select the cluster Cluster With ADLS SAS Access and choose Terminate.
  5. On the Amazon VPC console, choose Security groups in the navigation pane.
  6. Find the ElasticMapReduce-Master-Private, ElasticMapReduce-Slave-Private, ElasticMapReduce-ServiceAccess, ElasticMapReduceEditors-Livy, and ElasticMapReduceEditors-Editor security groups attached to the VPC created by the CloudFormation stack and delete their inbound and outbound rules.
  7. Select these five security groups and on the Actions menu, choose Delete security groups.
  8. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  9. Select the stack and choose Delete.
  10. On the Secrets Manager console, choose Secrets in the navigation pane.
  11. Select the stored SAS secret and on the Actions menu, choose Delete secret.
  12. On the IAM console, choose Roles in the navigation pane.
  13. Select the role EMR_Notebooks_DefaultRole and choose Delete.

Conclusion

In this post, we used AWS CloudFormation to deploy an EMR cluster with the appropriate configurations to connect to Azure Data Lake Storage using SAS tokens over the public internet. We provided an implementation of the SASTokenProvider interface to enable the SAS token-based connectivity to ADLS. We also provided relevant information on the SAS token creation steps on the Azure side. Furthermore, we showed how data scientists and analysts can use EMR notebooks connected to an EMR cluster to read data directly from ADLS with a minimum set of configurations. Finally, we used Secrets Manager to secure the storage of the SAS token and integrated it within the EMR notebook.

We encourage you to review the CloudFormation stack YAML template and test the setup on your own. If you implement the example and run into any issues or just have feedback, please leave a comment.


Appendix

AWS CLI-based deployment model

If you prefer to use command line options, this section provides AWS CLI commands to deploy this solution. Note that this is an alternative deployment model different from the CloudFormation template provided in the previous sections. Sample scripts and commands provided here include placeholders for values that need to be updated to suit your environment. The AWS CLI commands provided in this section should be used as guidance to understand the deployment model. Update the commands as needed to follow all the security procedures required by your organization.

Prerequisites for an AWS CLI-based deployment

The following are the assumptions made while using this AWS CLI-based deployment:

  • You will be deploying this solution in an existing AWS environment that has all the necessary security configurations enabled
  • You already have an Azure environment where you have staged the data that needs to be accessed through AWS services

You must also complete additional requirements on the AWS and Azure sides.

For AWS, complete the following prerequisites:

  1. Install the AWS CLI on your local computer or server. For instructions, see Installing, updating, and uninstalling the AWS CLI.
  2. Create an Amazon Elastic Compute Cloud (Amazon EC2) key pair for SSH access to your Amazon EMR nodes. For instructions, see Create a key pair using Amazon EC2.
  3. Create an S3 bucket to store the EMR configuration files, bootstrap shell script, and custom JAR file. Make sure that you create a bucket in the same Region as where you plan to launch your EMR cluster.
  4. Copy and save the SAS token from ADLS to use in Amazon EMR.

For Azure, complete the following prerequisites:

  1. Generate the user delegation SAS token on the ADLS container where your files are present, with the required levels of access granted. In this post, we use SAS tokens with only read and list access.
  2. Copy and save the generated SAS token to use with Amazon EMR.

Update configurations

We have created a custom class that implements the SASTokenProvider interface and created a JAR file called azsastknprovider-1.0-SNAPSHOT.jar, which is provided as a public artifact for this post. A set of configurations are required on the Amazon EMR side to use the SAS tokens to access ADLS. A sample configuration file in JSON format called EMR-HadoopSpark-ADLS-SASConfig.json is also provided as a public artifact for this post. Download the JAR and sample config files.

While copying the code or commands from this post, make sure to remove any control characters or extra newlines that may get added.

  1. Create a shell script called env-staging-hadoopspark.sh to copy the custom JAR file azsastknprovider-1.0-SNAPSHOT.jar (provided in this post) to the EMR cluster nodes’ local storage during the bootstrap phase. The following code is a sample bootstrap shell script:
    #!/bin/bash
    # Stage the SASTokenProvider interface implementation jar on the local filesystem
    sudo mkdir /lib/customjars
    sudo aws s3 cp s3://<s3 bucket>/<path>/azsastknprovider-1.0-SNAPSHOT.jar /lib/customjars
    sudo chmod 755 /lib/customjars

  2. Update the bootstrap shell script to include your S3 bucket and the proper path where the custom JAR file is uploaded in your AWS environment.
  3. Upload the JAR file, config file, and the bootstrap shell script to your S3 bucket.
  4. Keep a copy of the updated configuration file EMR-HadoopSpark-ADLS-SASConfig.json locally in the same directory from where you plan to run the AWS CLI commands.

Launch the EMR cluster using the AWS CLI

We use the create-cluster command in the AWS CLI to deploy an EMR cluster. We need a bootstrap action at cluster creation to copy the custom JAR file to the EMR cluster nodes’ local storage. We also need to add a few custom configurations on Amazon EMR to connect to ADLS. For this, we need to supply a configuration file in JSON format. The following code launches and configures an EMR cluster that can connect to your Azure account and read objects in ADLS through Hadoop and Spark:

aws emr create-cluster \
--name "Spark Cluster with ADLS Access" \
--release-label emr-6.10.0 \
--applications Name=Hadoop Name=Spark \
--ec2-attributes KeyName=<key pair name> \
--instance-type m5.xlarge \
--instance-count 3 \
--use-default-roles \
--enable-debugging \
--log-uri s3://<s3 bucket>/<logs path>/ \
--configurations file://EMR-HadoopSpark-ADLS-SASConfig.json \
--bootstrap-actions Path="s3://<s3 bucket>/<path>/env-staging-hadoopspark.sh"

Additional configurations for Spark jobs

The following additional properties should be set on the Spark session object within your Spark application code to access data in ADLS through Amazon EMR.

spark.conf.set("fs.azure.account.auth.type.<azure storage account>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<azure storage account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.net", "<Your SAS token from ADLS - remove the first character if it is &>")

These additional configurations can be set in the core-site.xml file for the EMR cluster. However, setting these in the application code is more secure and recommended because it won’t expose the SAS token in the Amazon EMR configurations.
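
If you also want to keep the token out of the application source, one option is to resolve it from Secrets Manager at runtime, combining the Secrets Manager pattern used earlier in this post with the properties above. The following is a hedged sketch; the secret ARN and key are placeholders.

# Hedged sketch: resolve the SAS token from Secrets Manager at runtime and set the
# Spark properties on the session, so the token appears neither in the EMR
# configurations nor in the application source. Placeholders are assumptions.
import json
import boto3
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Read data from ADLS").getOrCreate()

secrets = boto3.client("secretsmanager", region_name="us-east-1")
secret = secrets.get_secret_value(SecretId="<ADLS_SECRET_MANAGER_SECRET_ARN>")
sas_token = json.loads(secret["SecretString"])["<SECRET_KEY>"].lstrip("?&")  # drop a leading ? or & if present

account = "<azure storage account>"
spark.conf.set(f"fs.azure.account.auth.type.{account}.dfs.core.windows.net", "SAS")
spark.conf.set(f"fs.azure.sas.token.provider.type.{account}.dfs.core.windows.net",
               "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set(f"fs.azure.sas.fixed.token.{account}.dfs.core.windows.net", sas_token)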

Submit the Spark application in Amazon EMR using the AWS CLI

You can run a Spark application on Amazon EMR in different ways:

  • Log in to an EMR cluster node through SSH using the EC2 key pair you created earlier and then run the application using spark-submit
  • Submit a step via the console while creating the cluster or after the cluster is running
  • Use the AWS CLI to submit a step to a running cluster:
aws emr add-steps \
--cluster-id <cluster-id> \
--steps 'Type=Spark,Name=Read-ADLS-Data,ActionOnFailure=CONTINUE,Args=[s3://<s3 bucket>/<path>/<spark-application>,--deploy-mode,cluster]'

To read files in ADLS within a Spark application that is running on an EMR cluster, you need to use the abfs driver and refer to the file in the following format, just as you would have done in your Azure environment:

abfs://<azure container name>@<azure storage account>.dfs.core.windows.net/<path>/<file-name>

The following sample PySpark script reads a CSV file in ADLS using SAS tokens and writes it to Amazon S3, and can be run from the EMR cluster you created:

from pyspark import SparkContext
from pyspark.sql import SQLContext, SparkSession
spark = SparkSession.builder.appName('Read data from ADLS').getOrCreate()
spark.conf.set("fs.azure.account.auth.type.<azure storage account>.dfs.core.windows.net", "SAS")
spark.conf.set("fs.azure.sas.token.provider.type.<azure storage account>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.sas.FixedSASTokenProvider")
spark.conf.set("fs.azure.sas.fixed.token.<azure storage account>.dfs.core.windows.net", "<Your SAS token from ADLS>")
adlsDF = spark.read.csv("abfs://<azure container name>@<azure storage account>.dfs.core.windows.net/<path>/<file.csv>")
adlsDF.write.csv("s3://<s3 bucket>/<path>/")

Clean up using the AWS CLI

Delete the EMR cluster created using the delete-cluster command.


About the authors

Kiran Anand is a Principal Solutions Architect with the AWS Data Lab. He is a seasoned professional with more than 20 years of experience in information technology. His areas of expertise are databases and big data solutions for data engineering and analytics. He enjoys music, watching movies, and traveling with his family.

Andre Hass is a Sr. Solutions Architect with the AWS Data Lab. He has more than 20 years of experience in the databases and data analytics field. Andre enjoys camping, hiking, and exploring new places with his family on the weekends, or whenever he gets a chance. He also loves technology and electronic gadgets.

Stefan Marinov is a Sr. Solutions Architecture Manager with the AWS Data Lab. He is passionate about big data solutions and distributed computing. Outside of work, he loves spending active time outdoors with his family.

Hari Thatavarthy is a Senior Solutions Architect on the AWS Data Lab team. He helps customers design and build solutions in the data and analytics space. He believes in data democratization and loves to solve complex data processing-related problems.

Hao Wang is a Senior Big Data Architect at AWS. Hao actively works with customers building large scale data platforms on AWS. He has a background as a software architect on implementing distributed software systems. In his spare time, he enjoys reading and outdoor activities with his family.

AWS Glue streaming application to process Amazon MSK data using AWS Glue Schema Registry

Post Syndicated from Vivekanand Tiwari original https://aws.amazon.com/blogs/big-data/aws-glue-streaming-application-to-process-amazon-msk-data-using-aws-glue-schema-registry/

Organizations across the world are increasingly relying on streaming data, and there is a growing need for real-time data analytics, considering the growing velocity and volume of data being collected. This data can come from a diverse range of sources, including Internet of Things (IoT) devices, user applications, and logging and telemetry information from applications, to name a few. By harnessing the power of streaming data, organizations are able to stay ahead of real-time events and make quick, informed decisions. With the ability to monitor and respond to real-time events, organizations are better equipped to capitalize on opportunities and mitigate risks as they arise.

One notable trend in the streaming solutions market is the widespread use of Apache Kafka for data ingestion and Apache Spark for streaming processing across industries. Amazon Managed Streaming for Apache Kafka (Amazon MSK) is a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data.

However, as data volume and velocity grow, organizations may need to enrich their data with additional information from multiple sources, leading to a constantly evolving schema. The AWS Glue Schema Registry addresses this complexity by providing a centralized platform for discovering, managing, and evolving schemas from diverse streaming data sources. Acting as a bridge between producer and consumer apps, it enforces the schema, reduces the data footprint in transit, and safeguards against malformed data.

To process data effectively, we turn to AWS Glue, a serverless data integration service that provides an Apache Spark-based engine and offers seamless integration with numerous data sources. AWS Glue is an ideal solution for running stream consumer applications, discovering, extracting, transforming, loading, and integrating data from multiple sources.

This post explores how to use a combination of Amazon MSK, the AWS Glue Schema Registry, AWS Glue streaming ETL jobs, and Amazon Simple Storage Service (Amazon S3) to create a robust and reliable real-time data processing platform.

Overview of solution

In this streaming architecture, the initial phase involves registering a schema with the AWS Glue Schema Registry. This schema defines the data being streamed, providing essential details like columns and data types, and a table is created in the AWS Glue Data Catalog based on this schema. The schema serves as a single source of truth for the producer and consumer, and you can use the schema evolution feature of the AWS Glue Schema Registry to keep it consistent as the data changes over time. Refer to the appendix section for more information on this feature. The producer application retrieves the schema from the Schema Registry and uses it to serialize the records into Avro format before ingesting the data into an MSK cluster. This serialization ensures that the records are properly structured and ready for processing.

Next, an AWS Glue streaming ETL (extract, transform, and load) job is set up to process the incoming data. This job extracts data from the Kafka topics, deserializes it using the schema information from the Data Catalog table, and loads it into Amazon S3. It’s important to note that the schema in the Data Catalog table serves as the source of truth for the AWS Glue streaming job. Therefore, it’s crucial to keep the schema definition in the Schema Registry and the Data Catalog table in sync. Failure to do so may result in the AWS Glue job being unable to properly deserialize records, leading to null values. To avoid this, it’s recommended to use a data quality check mechanism to identify such anomalies and take appropriate action in case of unexpected behavior. The ETL job continuously consumes data from the Kafka topics, so it’s always up to date with the latest streaming data. Additionally, the job employs checkpointing, which keeps track of the processed records and allows it to resume processing from where it left off in the event of a restart. For more information about checkpointing, see the appendix at the end of this post.
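
The following is a minimal PySpark sketch of what such a streaming job can look like using the AWS Glue streaming APIs. The database, table, and S3 paths are placeholders, and the actual job deployed by the CloudFormation template in this post may differ.

# Hedged sketch of an AWS Glue streaming job: read the Data Catalog-backed Kafka source
# and write micro-batches to Amazon S3. Names and paths are placeholders.
import sys

from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glue_context = GlueContext(SparkContext.getOrCreate())
job = Job(glue_context)
job.init(args["JOB_NAME"], args)

# The schema attached to this Data Catalog table drives deserialization of the records.
streaming_df = glue_context.create_data_frame.from_catalog(
    database="<glue_database>",
    table_name="<kafka_source_table>",
    additional_options={"startingOffsets": "earliest", "inferSchema": "false"},
)

def process_batch(data_frame, batch_id):
    # Write each non-empty micro-batch to the output location in Parquet format.
    if data_frame.count() > 0:
        data_frame.write.mode("append").parquet("s3://<output-bucket>/output/")

glue_context.forEachBatch(
    frame=streaming_df,
    batch_function=process_batch,
    options={"windowSize": "100 seconds",
             "checkpointLocation": "s3://<output-bucket>/checkpoints/"},
)
job.commit()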

After the processed data is stored in Amazon S3, we create an AWS Glue crawler to create a Data Catalog table that acts as a metadata layer for the data. The table can be queried using Amazon Athena, a serverless, interactive query service that enables running SQL-like queries on data stored in Amazon S3.

The following diagram illustrates our solution architecture.

architecture diagram

For this post, we are creating the solution resources in the us-east-1 region using AWS CloudFormation templates. In the following sections, we will show you how to configure your resources and implement the solution.

Prerequisites

Create and download a valid key to SSH into an Amazon Elastic Compute Cloud (Amazon EC2) instance from your local machine. For instructions, see Create a key pair using Amazon EC2.

Configure resources with AWS CloudFormation

To create your solution resources, complete the following steps:

  1. Launch the stack vpc-subnet-and-mskclient using the CloudFormation template vpc-subnet-and-mskclient.template. This stack creates an Amazon VPC, private and public subnets, security groups, interface endpoints, an S3 bucket, an AWS Secrets Manager secret, and an EC2 instance.
    launch stack 1
  2. Provide parameter values as listed in the following table.

    Parameters Description
    EnvironmentName Environment name that is prefixed to resource names.
    VpcCIDR IP range (CIDR notation) for this VPC.
    PublicSubnet1CIDR IP range (CIDR notation) for the public subnet in the first Availability Zone.
    PublicSubnet2CIDR IP range (CIDR notation) for the public subnet in the second Availability Zone.
    PrivateSubnet1CIDR IP range (CIDR notation) for the private subnet in the first Availability Zone.
    PrivateSubnet2CIDR IP range (CIDR notation) for the private subnet in the second Availability Zone.
    KeyName Key pair name used to log in to the EC2 instance.
    SshAllowedCidr CIDR block for allowing SSH connection to the instance. Check your public IP using http://checkip.amazonaws.com/ and add /32 at the end of the IP address.
    InstanceType Instance type for the EC2 instance.
  3. When stack creation is complete, retrieve the EC2 instance PublicDNS and S3 bucket name (for key BucketNameForScript) from the stack’s Outputs tab.Cloudformation stack 1 - output
  4. Log in to the EC2 instance using the key pair you created as a prerequisite.
  5. Clone the GitHub repository, and upload the ETL script from the glue_job_script folder to the S3 bucket created by the CloudFormation template:
    $ git clone https://github.com/aws-samples/aws-glue-msk-with-schema-registry.git 
    $ cd aws-glue-msk-with-schema-registry 
    $ aws s3 cp glue_job_script/mskprocessing.py s3://{BucketNameForScript}/

  6. Launch another stack amazon-msk-and-glue using template amazon-msk-and-glue.template. This stack creates an MSK cluster, schema registry, schema definition, database, table, AWS Glue crawler, and AWS Glue streaming job.
    launch stack 1
  7. Provide parameter values as listed in the following table.

    Parameters Description Sample value
    EnvironmentName Environment name that is prefixed to resource names. amazon-msk-and-glue
    VpcId ID of the VPC for security group. Use the VPC ID created with the first stack. Refer to the first stack’s output.
    PrivateSubnet1 Subnet used for creating the MSK cluster and AWS Glue connection. Refer to the first stack’s output.
    PrivateSubnet2 Second subnet for the MSK cluster. Refer to the first stack’s output.
    SecretArn Secrets Manager secret ARN for Amazon MSK SASL/SCRAM authentication. Refer to the first stack’s output.
    SecurityGroupForGlueConnection Security group used by the AWS Glue connection. Refer to the first stack’s output.
    AvailabilityZoneOfPrivateSubnet1 Availability Zone for the first private subnet used for the AWS Glue connection.
    SchemaRegistryName Name of the AWS Glue schema registry. test-schema-registry
    MSKSchemaName Name of the schema. test_payload_schema
    GlueDataBaseName Name of the AWS Glue Data Catalog database. test_glue_database
    GlueTableName Name of the AWS Glue Data Catalog table. output
    ScriptPath AWS Glue ETL script absolute S3 path. For example, s3://bucket-name/mskprocessing.py. Use the target S3 path from the previous steps.
    GlueWorkerType Worker type for AWS Glue job. For example, Standard, G.1X, G.2X, G.025X. G.1X
    NumberOfWorkers Number of workers in the AWS Glue job. 5
    S3BucketForOutput Bucket name for writing data from the AWS Glue job. aws-glue-msk-output-{accId}-{region}
    TopicName MSK topic name that needs to be processed. test

    The stack creation process can take around 15–20 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

    Cloudformation stack 2 output

    The following table summarizes the resources that are created as a part of this post.

    Logical ID Type
    VpcEndoint AWS::EC2::VPCEndpoint
    VpcEndoint AWS::EC2::VPCEndpoint
    DefaultPublicRoute AWS::EC2::Route
    EC2InstanceProfile AWS::IAM::InstanceProfile
    EC2Role AWS::IAM::Role
    InternetGateway AWS::EC2::InternetGateway
    InternetGatewayAttachment AWS::EC2::VPCGatewayAttachment
    KafkaClientEC2Instance AWS::EC2::Instance
    KeyAlias AWS::KMS::Alias
    KMSKey AWS::KMS::Key
    KmsVpcEndoint AWS::EC2::VPCEndpoint
    MSKClientMachineSG AWS::EC2::SecurityGroup
    MySecretA AWS::SecretsManager::Secret
    PrivateRouteTable1 AWS::EC2::RouteTable
    PrivateSubnet1 AWS::EC2::Subnet
    PrivateSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PrivateSubnet2 AWS::EC2::Subnet
    PrivateSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicRouteTable AWS::EC2::RouteTable
    PublicSubnet1 AWS::EC2::Subnet
    PublicSubnet1RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    PublicSubnet2 AWS::EC2::Subnet
    PublicSubnet2RouteTableAssociation AWS::EC2::SubnetRouteTableAssociation
    S3Bucket AWS::S3::Bucket
    S3VpcEndoint AWS::EC2::VPCEndpoint
    SecretManagerVpcEndoint AWS::EC2::VPCEndpoint
    SecurityGroup AWS::EC2::SecurityGroup
    SecurityGroupIngress AWS::EC2::SecurityGroupIngress
    VPC AWS::EC2::VPC
    BootstrapBrokersFunctionLogs AWS::Logs::LogGroup
    GlueCrawler AWS::Glue::Crawler
    GlueDataBase AWS::Glue::Database
    GlueIamRole AWS::IAM::Role
    GlueSchemaRegistry AWS::Glue::Registry
    MSKCluster AWS::MSK::Cluster
    MSKConfiguration AWS::MSK::Configuration
    MSKPayloadSchema AWS::Glue::Schema
    MSKSecurityGroup AWS::EC2::SecurityGroup
    S3BucketForOutput AWS::S3::Bucket
    CleanupResourcesOnDeletion AWS::Lambda::Function
    BootstrapBrokersFunction AWS::Lambda::Function

Build and run the producer application

After successfully creating the CloudFormation stack, you can now proceed with building and running the producer application to publish records on MSK topics from the EC2 instance, as shown in the following code. Detailed instructions including supported arguments and their usage are outlined in the README.md page in the GitHub repository.

$ cd amazon_msk_producer 
$ mvn clean package 
$ BROKERS={OUTPUT_VAL_OF_MSKBootstrapServers – Ref. Step 6}
$ REGISTRY_NAME={VAL_OF_GlueSchemaRegistryName - Ref. Step 6}
$ SCHEMA_NAME={VAL_OF_SchemaName– Ref. Step 6}
$ TOPIC_NAME="test"
$ SECRET_ARN={OUTPUT_VAL_OF_SecretArn – Ref. Step 3}
$ java -jar target/amazon_msk_producer-1.0-SNAPSHOT-jar-with-dependencies.jar -brokers $BROKERS -secretArn $SECRET_ARN -region us-east-1 -registryName $REGISTRY_NAME -schema $SCHEMA_NAME -topic $TOPIC_NAME -numRecords 10

If the records are successfully ingested into the Kafka topics, you may see a log similar to the following screenshot.

kafka log

Grant permissions

Confirm if your AWS Glue Data Catalog is being managed by AWS Lake Formation and grant necessary permissions. To check if Lake Formation is managing the permissions for the newly created tables, we can navigate to the Settings page on the Lake Formation console, or we can use the Lake Formation CLI command get-data-lake-settings.

If the check boxes on the Lake Formation Data Catalog settings page are unselected (see the following screenshot), that means the Data Catalog permissions are being managed by AWS Lake Formation.

Lakeformation status

If using the Lake Formation CLI, check if the values of CreateDatabaseDefaultPermissions and CreateTableDefaultPermissions are NULL in the output. If so, this confirms that the Data Catalog permissions are being managed by AWS Lake Formation.

If we can confirm that the Data Catalog permissions are being managed by AWS Lake Formation, we have to grant DESCRIBE and CREATE TABLE permissions for the database, and SELECT, ALTER, DESCRIBE, and INSERT permissions for the table to the AWS Identity and Access Management (IAM) role used by the AWS Glue streaming ETL job before starting the job. Similarly, we have to grant DESCRIBE permissions for the database and DESCRIBE and SELECT permissions for the table to the IAM principals using Amazon Athena to query the data. We can get the AWS Glue service IAM role, database, table, streaming job name, and crawler names from the Outputs tab of the CloudFormation stack amazon-msk-and-glue. For instructions on granting permissions via AWS Lake Formation, refer to Granting Data Catalog permissions using the named resource method.
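
If you prefer to script these grants instead of using the console, the table-level grant for the AWS Glue job role could look like the following boto3 sketch; the role ARN, database, and table names are placeholders taken from the stack outputs, and the database-level grants follow the same pattern.

# Hedged sketch: grant Lake Formation permissions on the Data Catalog table to the
# IAM role used by the AWS Glue streaming job. Placeholders are assumptions.
import boto3

lakeformation = boto3.client("lakeformation", region_name="us-east-1")

lakeformation.grant_permissions(
    Principal={"DataLakePrincipalIdentifier": "<GLUE_JOB_ROLE_ARN>"},
    Resource={
        "Table": {
            "DatabaseName": "<GLUE_DATABASE_NAME>",
            "Name": "<GLUE_TABLE_NAME>",
        }
    },
    Permissions=["SELECT", "ALTER", "DESCRIBE", "INSERT"],
)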

Run the AWS Glue streaming job

To process the data from the MSK topic, complete the following steps:

  1. Retrieve the name of the AWS Glue streaming job from the amazon-msk-and-glue stack output.
  2. On the AWS Glue console, choose Jobs in the navigation pane.
  3. Choose the job name to open its details page.
  4. Choose Run job to start the job.

Because this is a streaming job, it will continue to run indefinitely until manually stopped.

Run the AWS Glue crawler

Once the AWS Glue streaming job starts processing the data, use the following steps to check the processed data and create a table with the AWS Glue crawler so you can query it:

  1. Retrieve the name of the output bucket S3BucketForOutput from the stack output and validate that the output folder has been created and contains data.
  2. Retrieve the name of the crawler from the stack output.
  3. Navigate to the AWS Glue console.
  4. In the left pane, select Crawlers.
  5. Run the crawler.

In this post, we run the crawler one time to create the target table for demo purposes. In a typical scenario, you would run the crawler periodically or create or manage the target table another way. For example, you could use the saveAsTable() method in Spark to create the table as part of the ETL job itself, or you could use enableUpdateCatalog=True in the AWS Glue ETL job to enable Data Catalog updates. For more information about this AWS Glue ETL feature, refer to Creating tables, updating the schema, and adding new partitions in the Data Catalog from AWS Glue ETL jobs.
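
As an illustration of the enableUpdateCatalog approach, a batch-write function like the following sketch could replace the crawler step; the database, table, and path are placeholders, and this is not the code provided with this post.

# Hedged sketch: let the AWS Glue job create or update the Data Catalog table itself,
# using a Glue sink with enableUpdateCatalog instead of running a crawler.
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

def write_batch(data_frame, batch_id):
    # Convert the micro-batch to a DynamicFrame and write it through a catalog-updating sink.
    dynamic_frame = DynamicFrame.fromDF(data_frame, glue_context, "batch")
    sink = glue_context.getSink(
        connection_type="s3",
        path="s3://<output-bucket>/output/",
        enableUpdateCatalog=True,
        updateBehavior="UPDATE_IN_DATABASE",
    )
    sink.setCatalogInfo(catalogDatabase="<glue_database>", catalogTableName="<output_table>")
    sink.setFormat("glueparquet")
    sink.writeFrame(dynamic_frame)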

Validate the data in Athena

After the AWS Glue crawler has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the crawler created.
  4. Enter a SQL query to validate the data.
  5. Run the query.

The following screenshot shows the output of our example query.

Athena output

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack amazon-msk-and-glue.
  2. Delete the CloudFormation stack vpc-subnet-and-mskclient.

Conclusion

This post provided a solution for building a robust streaming data processing platform using a combination of Amazon MSK, the AWS Glue Schema Registry, an AWS Glue streaming job, and Amazon S3. By following the steps outlined in this post, you can create and control your schema in the Schema Registry, integrate it with a data producer to ingest data into an MSK cluster, set up an AWS Glue streaming job to extract and process data from the cluster using the Schema Registry, store processed data in Amazon S3, and query it using Athena.

Let’s start using the AWS Glue Schema Registry to manage schema evolution for streaming data ETL with AWS Glue. If you have any feedback related to this post, please leave it in the comments section below.

Appendix

This appendix section provides more information about Apache Spark Structured Streaming Checkpointing feature and a brief summary on how schema evolution can be handled using AWS Glue Schema Registry.

Checkpointing

Checkpointing is a mechanism in Spark streaming applications to persist enough information in a durable storage to make the application resilient and fault-tolerant. The items stored in checkpoint locations are mainly the metadata for application configurations and the state of processed offsets. Spark uses synchronous checkpointing, meaning it ensures that the checkpoint state is updated after every micro-batch run. It stores the end offset value of each partition under the offsets folder for the corresponding micro-batch run before processing, and logs the record of processed batches under the commits folder. In the event of a restart, the application can recover from the last successful checkpoint, provided the offset hasn’t expired in the source Kafka topic. If the offset has expired, we have to set the property failOnDataLoss to false so that the streaming query doesn’t fail as a result of this.
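
To make this concrete, the following PySpark sketch shows where the checkpoint location and the failOnDataLoss option are set on a Kafka-backed streaming query; the broker string, topic, and S3 paths are placeholders.

# Hedged sketch of Structured Streaming checkpointing against a Kafka source.
# Broker string, topic, and S3 paths are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("checkpointing-example").getOrCreate()

stream = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "<bootstrap-brokers>")
    .option("subscribe", "<topic>")
    .option("failOnDataLoss", "false")  # don't fail the query if expired offsets are no longer available
    .load()
)

query = (
    stream.writeStream.format("parquet")
    .option("path", "s3://<output-bucket>/data/")
    .option("checkpointLocation", "s3://<output-bucket>/checkpoints/")  # offsets and commits are persisted here
    .start()
)
query.awaitTermination()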

Schema evolution

As the schema of data evolves over time, it needs to be incorporated into producer and consumer applications to avert application failure due to data encoding issues. The AWS Glue Schema Registry offers a rich set of options for schema compatibility such as backward, forward, and full to update the schema in the Schema Registry. Refer to Schema versioning and compatibility for the full list.

The default option is backward compatibility, which satisfies the majority of use cases. This option allows you to delete any existing fields and add optional fields. Steps to implement schema evolution using the default compatibility are as follows:

  1. Register the new schema version to update the schema definition in the Schema Registry.
  2. Upon success, update the AWS Glue Data Catalog table using the updated schema.
  3. Restart the AWS Glue streaming job to incorporate the changes in the schema for data processing.
  4. Update the producer application code base to build and publish the records using the new schema, and restart it.

About the Authors

Author Headshot - Vivekanand TiwariVivekanand Tiwari is a Cloud Architect at AWS. He finds joy in assisting customers on their cloud journey, especially in designing and building scalable, secure, and optimized data and analytics workloads on AWS. During his leisure time, he prioritizes spending time with his family.

Author Headshot - Subramanya VajirayaSubramanya Vajiraya is a Sr. Cloud Engineer (ETL) at AWS Sydney specialized in AWS Glue. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. Outside of work, he enjoys going on bike rides and taking long walks with his dog Ollie, a 2-year-old Corgi.

Author Headshot - Akash DeepAkash Deep is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS. In his free time, he prioritizes spending quality time with his family.

Cost monitoring for Amazon EMR on Amazon EKS

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/cost-monitoring-for-amazon-emr-on-amazon-eks/

Amazon EMR is the industry-leading cloud big data solution, providing a collection of open-source frameworks such as Spark, Hive, Hudi, and Presto, fully managed and with per-second billing. Amazon EMR on Amazon EKS is a deployment option that allows you to run Amazon EMR on the same Amazon Elastic Kubernetes Service (Amazon EKS) clusters that are multi-tenant and used by other applications, improving resource utilization, reducing cost, and simplifying infrastructure management. EMR on EKS provides up to 5.37 times better performance than OSS Spark v3.3.1 with 76.8% cost savings. It also provides a wide variety of job submission methods, such as an AWS API called StartJobRun, or a declarative approach with a Kubernetes controller through the AWS Controllers for Kubernetes for Amazon EMR on EKS.

This consolidation comes with a trade-off of increased difficulty measuring fine-grained costs for showback or chargeback by team or application. According to a CNCF and FinOps Foundation survey, 68% of Kubernetes users either rely on monthly estimates or don’t monitor Kubernetes costs at all. And for respondents reporting active Kubernetes cost monitoring, AWS Cost Explorer and Kubecost were ranked as the most popular tools being used.

Currently, you can distribute costs per tenant using a hard multi-tenancy with separate EKS clusters in dedicated AWS accounts or a soft multi-tenancy using separate node groups in a shared EKS cluster. To reduce costs and improve resource utilization, you can use namespace-based segregation, where nodes are shared across different namespaces. However, calculating and attributing costs to teams by workload or namespaces while taking into account compute optimization (like Saving Plans or Spot Instance cost) and the cost of AWS services like EMR on EKS is a challenging and non-trivial task.

In this post, we present a cost chargeback solution for EMR on EKS that combines the AWS-native capabilities of AWS Cost and Usage Reports (AWS CUR) alongside the in-depth Kubernetes cost visibility and insights using Kubecost on Amazon EKS.

Solution overview

A job in EMR on EKS incurs costs mainly across two dimensions: compute resources and a marginal uplift charge for EMR on EKS usage. To track the cost associated with each of these dimensions, we use data from the following sources:

  • AWS CUR – We use this to get the EMR on EKS cost uplift per job and for Kubecost to reconcile the compute cost with any Savings Plans or Reserved Instances used. The supporting infrastructure for CUR is deployed as defined in Setting up Athena using AWS CloudFormation templates.
  • Kubecost – We use this to get the compute cost incurred by the executor and driver pods.

The cost allocation process includes the following components:

  • The compute cost is provided by Kubecost. However, in order to do an in-depth analysis, we define an hourly Kubernetes CronJob that starts a pod to retrieve data from Kubecost and store it in Amazon Simple Storage Service (Amazon S3).
  • CUR files are stored in an S3 bucket.
  • We use Amazon Athena to create a view and provide a consolidated view of the total cost to run an EMR on EKS job.
  • Finally, you can connect your preferred business intelligence tools using the JDBC or ODBC connections to Athena. In this post, we use Amazon QuickSight native integration for visualization purposes.

The following diagram shows the overall architecture as well as how the different components interact with each other.

emr-eks-cost-tracking-architecture

We provide a shell script to deploy the tracking solution. The shell script configures the infrastructure using an AWS CloudFormation template, the AWS Command Line Interface (AWS CLI), and eksctl and kubectl commands. This script runs the following actions:

  1. Start the CloudFormation deployment.
  2. Create and configure an AWS Cost and Usage Report.
  3. Configure and deploy Kubecost backed by Amazon Managed Service for Prometheus.
  4. Deploy a Kubernetes CronJob.

Prerequisites

You need the following prerequisites:

This post assumes you already have an EKS cluster and run EMR on EKS jobs. If you don’t have an EKS cluster ready to test the solution, we suggest starting with a standard EMR on EKS blueprint that configures a cluster to submit EMR on EKS jobs.

Set up the solution

To run the shell script, complete the following steps:

  1. Clone the following GitHub repository.
  2. Go to the folder cost-tracking with the following command:

cd cost-tracking

  1. Run the script with the following command:

sh deploy-emr-eks-cost-tracking.sh REGION KUBECOST-VERSION EKS-CLUSTER-NAME ACCOUNT-ID

After you run the script, you’re ready to use Kubecost and the CUR data to understand the cost associated with your EMR on EKS jobs.

Tracking cost

In this section, we show you how to analyze the compute cost that is retrieved from Kubecost, how to query EMR on EKS uplift data, and how to combine them to have a single consolidated view for the cost.

Compute cost

Kubecost offers various ways to track cost per Kubernetes object. For example, you can track cost by pod, controller, job, label, or deployment. It also allows you to understand the cost of idle resources, like Amazon Elastic Compute Cloud (Amazon EC2) instances that aren’t fully utilized by pods. In this post, we assume that no nodes are provisioned if no EMR on EKS job is running, and we use Karpenter to provision nodes when jobs are submitted. Karpenter also does bin packing, which optimizes EC2 resource utilization and in turn reduces the cost of idle resources.

To track compute cost associated with EMR on EKS pods, we query the Kubecost allocation API by passing pod and labels in the aggregate parameter. We use the emr-containers.amazonaws.com/job.id and emr-containers.amazonaws.com/virtual-cluster-id labels that are always present in executor and driver pods. The labels are used to filter Kubecost data to get only the cost associated with EMR on EKS pods. You can review various levels of granularity at the pod, job, and virtual cluster level to understand the cost of a driver vs. executor, or of using Spot Instances in jobs. You can also use the virtual cluster cost to understand the overall cost of an EMR on EKS virtual cluster when it runs in a namespace that is shared with applications other than EMR on EKS.

We also provide the instance_id, instance size, and capacity type (On-Demand or Spot) that was used to run the pod. This is retrieved through querying the Kubecost assets API. This data can be useful to understand how you run your jobs and which capacity you use more often.

The data about the cost of running the pods as well as the assets is retrieved with a Kubernetes CronJob that submits the request to the Kubecost API, joins the two data sources (allocation and assets data) on the instance_id, cleans the data, and stores it in Amazon S3 in CSV format.
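
In essence, the CronJob’s script issues requests along the lines of the following sketch against the Kubecost allocation API. The in-cluster endpoint, window, and label handling are assumptions and may differ from the script shipped with this solution.

# Hedged sketch: query the Kubecost allocation API for EMR on EKS pods, aggregated by
# pod and the EMR job ID label. Endpoint and window are assumptions; depending on the
# Kubecost version, label keys may need to be sanitized (for example, '/' and '.'
# replaced with '_') in the aggregate parameter.
import requests

KUBECOST_ENDPOINT = "http://kubecost-cost-analyzer.kubecost:9090"  # in-cluster service (assumption)

response = requests.get(
    f"{KUBECOST_ENDPOINT}/model/allocation",
    params={
        "window": "1h",
        "aggregate": "pod,label:emr-containers.amazonaws.com/job.id",
    },
    timeout=30,
)
allocations = response.json()["data"]
# Each allocation entry exposes CPU, RAM, persistent volume, and total cost fields that the
# CronJob joins with the assets API data on instance_id before writing CSV files to Amazon S3.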

The compute cost data has multiple fields that are of interest, including cpucost, ramcost (cost of memory), pvcost (cost of Amazon EBS storage), efficiency of use of CPU and RAM, as well as total cost, which represents the aggregate cost of all the resources used, either at pod, job, or virtual cluster level.

To view this data, complete the following steps:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and compute_cost for the table.
  3. Run the following query:
SELECT job_id,
vc_id,
sum(totalcost) as cost
FROM "athenacurcfn_c_u_r"."compute_cost"
GROUP BY job_id, vc_id

The following screenshot shows the query results.

To query the data about information at the pod level, you can run the following SQL statement:

SELECT
split_part(name, '/', 1) as pod_name,
job_id,
vc_id,
totalcost,
instance_id,
"properties.labels.node_kubernetes_io_instance_type",
capacity_type
FROM "athenacurcfn_c_u_r"."compute_cost";

EMR on EKS uplift

The cost associated with the EMR on EKS uplift is available through AWS CUR and is stored in an S3 bucket. The script you ran in the setup step created an Athena table associated with the data in the S3 bucket. The following steps show how to query the data:

  1. On the Athena console, navigate to the query editor.
  2. Choose athenacurcfn_c_u_r for the database and cur_data for the table.
  3. Run the following query:
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost
FROM athenacurcfn_c_u_r.cur_data
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id

This query provides you with the cost per job. The following screenshot shows the results.

You will have to wait up to 24 hours for the CUR data to be available. As such, you should only run the preceding query after the CUR data is available and you have run the EMR on EKS jobs.

Overall cost

To view the overall cost and perform analysis on it, create a view in Athena as follows:

CREATE VIEW emr_eks_cost AS
SELECT
split_part(line_item_resource_id, '/', 5) as job_id,
split_part(line_item_resource_id, '/', 3) as vc_id,
sum(line_item_blended_cost) as cost,
'emr-uplift' as category
FROM athenacurcfn_c_u_r.cur_data
WHERE product_product_family='EMR Containers'
GROUP BY line_item_resource_id
UNION ALL
SELECT
job_id,
vc_id,
sum(totalcost) as cost,
'compute' as category
FROM "athenacurcfn_c_u_r"."compute_cost"
GROUP BY job_id, vc_id

Now that the view is created, you can query and analyze the cost of running your EMR on EKS jobs:

SELECT sum(cost) as total_cost, job_id, vc_id
FROM "athenacurcfn_c_u_r"."emr_eks_cost"
GROUP BY job_id, vc_id;

The following screenshot shows an example output of the query on the created view.

Lastly, you can use QuickSight for a graphical high-level view on your EMR on EKS spend. The following screenshot shows an example dashboard.


You can now adapt this solution to your specific needs and build your custom analysis.

Clean up

Throughout this post, you deployed and configured the required infrastructure components to track cost for your EMR on EKS workloads. To avoid incurring additional charges for this solution, delete all the resources you created:

  1. Empty the S3 buckets cost-data-REGION-ACCOUNT_ID and aws-athena-query-results-cur-REGION-ACCOUNT_ID.
  2. Delete the Athena workgroup kubecost-cur-workgroup.
  3. Empty and delete the ECR repository emreks-compute-cost-exporter.
  4. Run the script destroy-emr-eks-cost-tracking.sh, which will delete the AWS CloudFormation deployment, uninstall Kubecost, delete the CronJob, and delete the Cost and Usage Reports.

Conclusion

In this post, we showed how you can use Kubecost capabilities alongside Cost and Usage Reports to closely monitor the costs for Amazon EMR on EKS per virtual cluster or per job. This solution allows you to achieve more granular costs for chargebacks using Athena, Amazon Managed Service for Prometheus, and QuickSight.

The solution presented steps to set up Cost and Usage Reports and Kubecost, and configure a CronJob on an hourly basis to get the cost of running pods spun by EMR on EKS. You can modify the presented solution to run at longer intervals or to collect data on different EKS clusters. You can also modify the Python script run by the CronJob to further clean data or reduce the amount of data stored by eliminating fields you don’t need. You can use the insights provided to drive cost optimization efforts over time, detect any increase of costs, and measure the impact of new deployments or particular events on resource usage and cost performance. For more information about integrating EMR on EKS in your existing Amazon EKS deployment, refer to Design considerations for Amazon EMR on EKS in a multi-tenant Amazon EKS environment.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Hamza Mimi is a Principal Solutions Architect in the French public sector team at Amazon Web Services (AWS), with long experience in the telecommunications industry. He currently works as a customer advisor on topics ranging from digital transformation to architectural guidance.

Temporary elevated access management with IAM Identity Center

Post Syndicated from Taiwo Awoyinfa original https://aws.amazon.com/blogs/security/temporary-elevated-access-management-with-iam-identity-center/

AWS recommends using automation where possible to keep people away from systems—yet not every action can be automated in practice, and some operations might require access by human users. Depending on their scope and potential impact, some human operations might require special treatment.

One such treatment is temporary elevated access, also known as just-in-time access. This is a way to request access for a specified time period, validate whether there is a legitimate need, and grant time-bound access. It also allows you to monitor activities performed, and revoke access if conditions change. Temporary elevated access can help you to reduce risks associated with human access without hindering operational capabilities.

In this post, we introduce a temporary elevated access management solution (TEAM) that integrates with AWS IAM Identity Center (successor to AWS Single Sign-On) and allows you to manage temporary elevated access to your multi-account AWS environment. You can download the TEAM solution from AWS Samples, deploy it to your AWS environment, and customize it to meet your needs.

The TEAM solution provides the following features:

  • Workflow and approval — TEAM provides a workflow that allows authorized users to request, review, and approve or reject temporary access. If a request is approved, TEAM activates access for the requester with the scope and duration specified in the request.
  • Invoke access using IAM Identity Center — When temporary elevated access is active, a requester can use the IAM Identity Center AWS access portal to access the AWS Management Console or retrieve temporary credentials. A requester can also invoke access directly from the command line by configuring AWS Command Line Interface (AWS CLI) to integrate with IAM Identity Center.
  • View request details and session activity — Authorized users can view request details and session activity related to current and historical requests from within the application’s web interface.
  • Ability to use managed identities and group memberships — You can either sync your existing managed identities and group memberships from an external identity provider into IAM Identity Center, or manage them directly in IAM Identity Center, in order to control user authorization in TEAM. Similarly, users can authenticate directly in IAM Identity Center, or they can federate from an external identity provider into IAM Identity Center, to access TEAM.
  • A rich authorization model — TEAM uses group memberships to manage eligibility (authorization to request temporary elevated access with a given scope) and approval (authorization to approve temporary elevated access with a given scope). It also uses group memberships to determine whether users can view historical and current requests and session activity, and whether they can administer the solution. You can manage both eligibility and approval policies at different levels of granularity within your organization in AWS Organizations.

TEAM overview

You can download the TEAM solution and deploy it into the same organization where you enable IAM Identity Center. TEAM consists of a web interface that you access from the IAM Identity Center access portal, a workflow component that manages requests and approvals, an orchestration component that activates temporary elevated access, and additional components involved in security and monitoring.

Figure 1 shows an organization with TEAM deployed alongside IAM Identity Center.

Figure 1: An organization using TEAM alongside IAM Identity Center

Figure 1 shows three main components:

  • TEAM — a self-hosted solution that allows users to create, approve, monitor and manage temporary elevated access with a few clicks in a web interface.
  • IAM Identity Center — an AWS service which helps you to securely connect your workforce identities and manage their access centrally across accounts.
  • AWS target environment — the accounts where you run your workloads, and for which you want to securely manage both persistent access and temporary elevated access.

There are four personas who can use TEAM:

  • Requesters — users who request temporary elevated access to perform operational tasks within your AWS target environment.
  • Approvers — users who review and approve or reject requests for temporary elevated access.
  • Auditors — users with read-only access who can view request details and session activity relating to current and historical requests.
  • Admins — users who can manage global settings and define policies for eligibility and approval.

TEAM determines a user’s persona from their group memberships, which can either be managed directly in IAM Identity Center or synced from an external identity provider into IAM Identity Center. This allows you to use your existing access governance processes and tools to manage the groups and thereby control which actions users can perform within TEAM.

The following steps describe how you use TEAM during normal operations to request, approve, and invoke temporary elevated access. The steps correspond to the numbered items in Figure 1:

  1. Access the AWS access portal in IAM Identity Center (all personas)
  2. Access the TEAM application (all personas)
  3. Request elevated access (requester persona)
  4. Approve elevated access (approver persona)
  5. Activate elevated access (automatic)
  6. Invoke elevated access (requester persona)
  7. Log session activity (automatic)
  8. End elevated access (automatic; or requester or approver persona)
  9. View request details and session activity (requester, approver, or auditor persona)

In the TEAM walkthrough section later in this post, we provide details on each of these steps.

Deploy and set up TEAM

Before you can use TEAM, you need to deploy and set up the solution.

Prerequisites

To use TEAM, you first need to have an organization set up in AWS Organizations with IAM Identity Center enabled. If you haven’t done so already, create an organization, and then follow the Getting started steps in the IAM Identity Center User Guide.

Before you deploy TEAM, you need to nominate a member account for delegated administration in IAM Identity Center. This has the additional benefit of reducing the need to use your organization’s management account. We strongly recommend that you use this account only for IAM Identity Center delegated administration, TEAM, and associated services; that you do not deploy any other workloads into this account, and that you carefully manage access to this account using the principle of least privilege.

We recommend that you enforce multi-factor authentication (MFA) for users, either in IAM Identity Center or in your external identity provider. If you want to statically assign access to users or groups (persistent access), you can do that in IAM Identity Center, independently of TEAM, as described in Multi-account permissions.

Deploy TEAM

To deploy TEAM, follow the solution deployment steps in the TEAM documentation. You need to deploy TEAM in the same account that you nominate for IAM Identity Center delegated administration.

Access TEAM

After you deploy TEAM, you can access it through the IAM Identity Center web interface, known as the AWS access portal. You do this using the AWS access portal URL, which is configured when you enable IAM Identity Center. Depending on how you set up IAM Identity Center, you are either prompted to authenticate directly in IAM Identity Center, or you are redirected to an external identity provider to authenticate. After you authenticate, the AWS access portal appears, as shown in Figure 2.

Figure 2: TEAM application icon in the AWS access portal of IAM Identity Center

You configure TEAM as an IAM Identity Center Custom SAML 2.0 application, which means it appears as an icon in the AWS access portal. To access TEAM, choose TEAM IDC APP.

When you first access TEAM, it automatically retrieves your identity and group membership information from IAM Identity Center. It uses this information to determine what actions you can perform and which navigation links are visible.

Set up TEAM

Before users can request temporary elevated access in TEAM, a user with the admin persona needs to set up the application. This includes defining policies for eligibility and approval. A user takes on the admin persona if they are a member of a named IAM Identity Center group that is specified during TEAM deployment.

Manage eligibility policies

Eligibility policies determine who can request temporary elevated access with a given scope. You can define eligibility policies to ensure that people in specific teams can only request the access that you anticipate they’ll need as part of their job function.

  • To manage eligibility policies, in the left navigation pane, under Administration, select Eligibility policy. Figure 3 shows this view with three eligibility policies already defined.
     
Figure 3: Manage eligibility policies

An eligibility policy has four main parts:

  • Name and Type — An IAM Identity Center user or group
  • Accounts or OUs — One or more accounts, organizational units (OUs), or both, which belong to your organization
  • Permissions — One or more IAM Identity Center permission sets (representing IAM roles)
  • Approval required — Whether requests for temporary elevated access require approval

Each eligibility policy allows the specified IAM Identity Center user, or a member of the specified group, to log in to TEAM and request temporary elevated access using the specified permission sets in the specified accounts. When you choose a permission set, you can either use a predefined permission set provided by IAM Identity Center, or you can create your own permission set using custom permissions to provide least-privilege access for particular tasks.

Note: If you specify an OU in an eligibility or approval policy, TEAM includes the accounts directly under that OU, but not those under its child OUs.
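Permission sets themselves are created and maintained by an IAM Identity Center administrator, outside of TEAM. As a rough sketch only (the ARNs, names, and attached policy below are placeholders), a least-privilege permission set with a short session duration could be created from the AWS CLI like this:

# Create a permission set with a 1-hour session duration (ISO 8601 format).
aws sso-admin create-permission-set \
  --instance-arn arn:aws:sso:::instance/ssoins-EXAMPLE \
  --name OpsElevatedAccess \
  --session-duration PT1H

# Attach a policy scoped to the task at hand (placeholder policy shown).
aws sso-admin attach-managed-policy-to-permission-set \
  --instance-arn arn:aws:sso:::instance/ssoins-EXAMPLE \
  --permission-set-arn arn:aws:sso:::permissionSet/ssoins-EXAMPLE/ps-EXAMPLE \
  --managed-policy-arn arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess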

Manage approval policies

Approval policies work in a similar way as eligibility policies, except that they authorize users to approve temporary elevated access requests, rather than create them. If a specific account is referenced in an eligibility policy that is configured to require approval, then you need to create a corresponding approval policy for the same account. If there is no corresponding approval policy—or if one exists but its groups have no members — then TEAM won’t allow users to create temporary elevated access requests for that account, because no one would be able to approve them.

  • To manage approval policies, in the left navigation pane, under Administration, select Approval policy. Figure 4 shows this view with three approval policies already defined.
     
Figure 4: Manage approval policies

An approval policy has two main parts:

  • Id, Name, and Type — Identifiers for an account or organizational unit (OU)
  • Approver groups — One or more IAM Identity Center groups

Each approval policy allows a member of a specified group to log in to TEAM and approve temporary elevated access requests for the specified account, or all accounts under the specified OU, regardless of permission set.

Note: If you specify the same group for both eligibility and approval in the same account, this means approvers can be in the same team as requesters for that account. This is a valid approach, sometimes known as peer approval. Nevertheless, TEAM does not allow an individual to approve their own request. If you prefer requesters and approvers to be in different teams, specify different groups for eligibility and approval.

TEAM walkthrough

Now that the admin persona has defined eligibility and approval policies, you are ready to use TEAM.

To begin this walkthrough, imagine that you are a requester, and you need to perform an operational task that requires temporary elevated access to your AWS target environment. For example, you might need to fix a broken deployment pipeline or make some changes as part of a deployment. As a requester, you must belong to a group specified in at least one eligibility policy that was defined by the admin persona.

Step 1: Access the AWS access portal in IAM Identity Center

To access the AWS access portal in IAM Identity Center, use the AWS access portal URL, as described in the Access TEAM section earlier in this post.

Step 2: Access the TEAM application

To access the TEAM application, select the TEAM IDC APP icon, as described in the Access TEAM section earlier.

Step 3: Request elevated access

The next step is to create a new elevated access request as follows:

  1. Under Requests, in the left navigation pane, choose Create request.
  2. In the Elevated access request section, do the following, as shown in Figure 5:
    1. Select the account where you need to perform your task.
    2. For Role, select a permission set that will give you sufficient permissions to perform the task.
    3. Enter a start date and time, duration, ticket ID (typically representing a change ticket or incident ticket related to your task), and business justification.
    4. Choose Submit.
Figure 5: Create a new request

When creating a request, consider the following:

  • In each request, you can specify exactly one account and one permission set.
  • You can only select an account and permission set combination for which you are eligible based on the eligibility policies defined by the admin persona.
  • As a requester, you should apply the principle of least privilege by selecting a permission set with the least privilege, and a time window with the least duration, that will allow you to complete your task safely.
  • TEAM captures a ticket identifier for audit purposes only; it does not try to validate it.
  • The duration specified in a request determines the time window for which elevated access is active, if your request is approved. During this time window, you can invoke sessions to access the AWS target environment. It doesn’t affect the duration of each session.
  • Session duration is configured independently for each permission set by an IAM Identity Center administrator, and determines the time period for which IAM temporary credentials are valid for sessions using that permission set.
  • Sessions invoked just before elevated access ends might remain valid beyond the end of the approved elevated access period. If this is a concern, consider minimizing the session duration configured in your permission sets, for example by setting them to 1 hour.

Step 4: Approve elevated access

After you submit your request, approvers are notified by email. Approvers are notified when there are new requests that fall within the scope of what they are authorized to approve, based on the approval policies defined earlier.

For this walkthrough, imagine that you are now the approver. You will perform the following steps to approve the request. As an approver, you must belong to a group specified in an approval policy that the admin persona configured.

  1. Access the TEAM application in exactly the same way as for the other personas.
  2. In the left navigation pane, under Approvals, choose Approve requests. TEAM displays requests awaiting your review, as shown in Figure 6.
    • To view the information provided by the requester, select a request and then choose View details.
    Figure 6: Requests awaiting review

  3. Select a pending request, and then do one of the following:
    • To approve the request, select Actions and then choose Approve.
    • To reject the request, select Actions and then choose Reject.

    Figure 7 shows what TEAM displays when you approve a request.

    Figure 7: Approve a request

  4. After you approve or reject a request, the original requester is notified by email.

A requester can view the status of their requests in the TEAM application.

  • To see the status of your open requests in the TEAM application, in the left navigation pane, under Requests, select My requests. Figure 8 shows this view with one approved request.
     
Figure 8: Approved request

Step 5: Automatic activation of elevated access

After a request is approved, the TEAM application waits until the start date and time specified in the request and then automatically activates elevated access. To activate access, a TEAM orchestration workflow creates a temporary account assignment, which links the requester’s user identity in IAM Identity Center with the permission set and account in their request. Then TEAM notifies the requester by email that their request is active.
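TEAM performs this activation for you. Conceptually, the temporary account assignment it creates is equivalent to an IAM Identity Center account assignment such as the following sketch (all identifiers are placeholders):

# Link a user to a permission set in a target account for the elevated-access window.
aws sso-admin create-account-assignment \
  --instance-arn arn:aws:sso:::instance/ssoins-EXAMPLE \
  --target-id 111122223333 \
  --target-type AWS_ACCOUNT \
  --permission-set-arn arn:aws:sso:::permissionSet/ssoins-EXAMPLE/ps-EXAMPLE \
  --principal-type USER \
  --principal-id <identity-store-user-id>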

A requester can now view their active request in the TEAM application.

  1. To see active requests, in the left navigation pane under Elevated access, choose Active access. Figure 9 shows this view with one active request.
     
    Figure 9: Active request

  2. To see further details for an active request, select a request and then choose View details. Figure 10 shows an example of these details.
    Figure 10: Details of an active request

Step 6: Invoke elevated access

During the time period in which elevated access is active, the requester can invoke sessions to access the AWS target environment to complete their task. Each session has the scope (permission set and account) approved in their request. There are three ways to invoke access.

The first two methods involve accessing IAM Identity Center using the AWS access portal URL. Figure 11 shows the AWS access portal while a request is active.

Figure 11: Invoke access from the AWS access portal

From the AWS access portal, you can select an account and permission set that is currently active. You’ll also see the accounts and permission sets that have been statically assigned to you using IAM Identity Center, independently of TEAM. From here, you can do one of the following:

  • Choose Management console to federate to the AWS Management Console.
  • Choose Command line or programmatic access to copy and paste temporary credentials.

The third method is to initiate access directly from the command line using AWS CLI. To use this method, you first need to configure AWS CLI to integrate with IAM Identity Center. This method provides a smooth user experience for AWS CLI users, since you don’t need to copy and paste temporary credentials to your command line.
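If you haven’t configured this integration before, a minimal setup looks like the following sketch; the profile name is a placeholder, and the configure wizard prompts you for your AWS access portal URL and Region:

# One-time: create an IAM Identity Center (SSO) enabled profile (interactive prompts follow).
aws configure sso

# Per working session: sign in and refresh short-lived credentials for the profile.
aws sso login --profile elevated-access

# Verify which account and role the profile resolves to.
aws sts get-caller-identity --profile elevated-access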

Regardless of how you invoke access, IAM Identity Center provides temporary credentials for the IAM role and account specified in your request, which allow you to assume that role in that account. The temporary credentials are valid for the duration specified in the role’s permission set, defined by an IAM Identity Center administrator.

When you invoke access, you can now complete the operational tasks that you need to perform in the AWS target environment. During the period in which your elevated access is active, you can invoke multiple sessions if necessary.

Step 7: Log session activity

When you access the AWS target environment, your activity is logged to AWS CloudTrail. Actions you perform in the AWS control plane are recorded as CloudTrail events.

Note: Each CloudTrail event contains the unique identifier of the user who performed the action, which gives you traceability back to the human individual who requested and invoked temporary elevated access.

Step 8: End elevated access

Elevated access ends when either the requested duration elapses or it is explicitly revoked in the TEAM application. The requester or an approver can revoke elevated access whenever they choose.

When elevated access ends, or is revoked, the TEAM orchestration workflow automatically deletes the temporary account assignment for this request. This unlinks the permission set, the account, and the user in IAM Identity Center. The requester is then notified by email that their elevated access has ended.
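Again, TEAM handles this cleanup automatically; the effect is equivalent to deleting the account assignment that was created at activation time, along the lines of the following sketch (placeholders shown):

# Remove the temporary link between the user, permission set, and account.
aws sso-admin delete-account-assignment \
  --instance-arn arn:aws:sso:::instance/ssoins-EXAMPLE \
  --target-id 111122223333 \
  --target-type AWS_ACCOUNT \
  --permission-set-arn arn:aws:sso:::permissionSet/ssoins-EXAMPLE/ps-EXAMPLE \
  --principal-type USER \
  --principal-id <identity-store-user-id>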

Step 9: View request details and session activity

You can view request details and session activity for current and historical requests from within the TEAM application. Each persona can see the following information:

  • Requesters can inspect elevated access requested by them.
  • Approvers can inspect elevated access that falls within the scope of what they are authorized to approve.
  • Auditors can inspect elevated access for all TEAM requests.

Session activity is recorded based on the log delivery times provided by AWS CloudTrail, and you can view session activity while elevated access is in progress or after it has ended. Figure 12 shows activity logs for a session displayed in the TEAM application.

Figure 12: Session activity logs

Security and resiliency considerations

The TEAM application controls access to your AWS environment, and you must manage it with great care to prevent unauthorized access. This solution is built using AWS Amplify to ease the reference deployment. Before operationalizing this solution, consider how to align it with your existing development and security practices.

Further security and resiliency considerations including setting up emergency break-glass access are available in the TEAM documentation.

Additional resources

AWS Security Partners provide temporary elevated access solutions that integrate with IAM Identity Center, and AWS has validated the integration of these partner offerings and assessed their capabilities against a common set of customer requirements. For further information, see temporary elevated access in the IAM Identity Center User Guide.

The blog post Managing temporary elevated access to your AWS environment describes an alternative self-hosted solution for temporary elevated access which integrates directly with an external identity provider using OpenID Connect, and federates users directly into AWS Identity and Access Management (IAM) roles in your accounts. The TEAM solution described in this blog post, on the other hand, integrates with IAM Identity Center, which provides a way to centrally manage user access to accounts across your organization and optionally integrates with an external identity provider.

Conclusion

In this blog post, you learned that your first priority should be to use automation to avoid the need to give human users persistent access to your accounts. You also learned that in the rare cases in which people need access to your accounts, not all access is equal; there are times when you need a process to verify that access is needed, and to provide temporary elevated access.

We introduced you to a temporary elevated access management solution (TEAM) that you can download from AWS Samples and use alongside IAM Identity Center to give your users temporary elevated access. We showed you the TEAM workflow, described the TEAM architecture, and provided links where you can get started by downloading and deploying TEAM.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, start a new thread on AWS IAM Identity Center re:Post or contact AWS Support.

Want more AWS Security news? Follow us on Twitter.

Taiwo Awoyinfa

Taiwo is a senior cloud architect with AWS Professional Services. At AWS, he helps global customers with cloud transformation, migration, and security initiatives. Taiwo has expertise in cloud architecture, networking, security, and application development. He is passionate about identifying and solving problems that deliver value.

James Greenwood

James is a principal security solutions architect who helps AWS Financial Services customers meet their security and compliance objectives in the AWS Cloud. James has a background in identity and access management, authentication, credential management, and data protection, with more than 20 years of experience in the financial services industry.

Varvara Semenova

Varvara is a cloud infrastructure architect with AWS Professional Services. She specialises in building microservices-based serverless applications to address the needs of AWS enterprise customers. Varvara uses her background in DevOps to help customers innovate faster when developing cost-effective, secure, and reliable solutions.

How to deploy workloads in a multicloud environment with AWS developer tools

Post Syndicated from Brent Van Wynsberge original https://aws.amazon.com/blogs/devops/how-to-deploy-workloads-in-a-multicloud-environment-with-aws-developer-tools/

As organizations embrace cloud computing as part of a “cloud first” strategy and migrate to the cloud, some enterprises end up in a multicloud environment. We see that enterprise customers get the best experience, performance, and cost structure when they choose a primary cloud provider. However, for a variety of reasons, some organizations end up operating in a multicloud environment. For example, in the case of mergers and acquisitions, an organization may acquire an entity that runs on a different cloud platform, resulting in the organization operating in a multicloud environment. Another example is an independent software vendor (ISV) that provides services to customers operating on different cloud providers. A third example is an organization that needs to adhere to data residency and data sovereignty requirements and ends up with workloads deployed across multiple cloud platforms and locations.

In the scenarios described above, one of the challenges of operating such a complex environment is managing the release process (building, testing, and deploying applications at scale) across multiple cloud platforms. If an organization’s primary cloud provider is AWS, it may want to continue using AWS developer tools to deploy workloads on other cloud platforms. Instead of developing a separate release pipeline for each platform, which is complex and not sustainable in the long run, organizations can use AWS services to build a single end-to-end CI/CD and release process.

In this post, we show how organizations can continue using AWS developer tools in a hybrid and multicloud environment. We walk through a scenario where we deploy an application to VMs running on premises and in Azure, showcasing AWS hybrid and multicloud DevOps capabilities.

Solution and scenario overview

In this post we’re demonstrating the following steps:

  • Set up a CI/CD pipeline using AWS CodePipeline and show how it runs when application code is updated and checked into the code repository (GitHub).
  • Check out application code from the code repository, use an IDE (Visual Studio Code) to make changes, and check the code back in to the repository.
  • Check in the modified application code to automatically run the release process built using AWS CodePipeline. The pipeline uses AWS CodeBuild to retrieve the latest version of the code from the repository, compile it, build the deployment package, and test the application.
  • Deploy the updated application to VMs on premises and in Azure using AWS CodeDeploy.

The high-level solution is shown below. This post does not show all of the possible combinations and integrations available to build the CI/CD pipeline. For example, you can integrate the pipeline with your existing build and test tools, such as Selenium, Jenkins, and SonarQube.

This post focuses on deploying an application in a multicloud environment and on how AWS developer tools can support virtually any scenario or use case specific to your organization. We will deploy a sample application from this AWS tutorial to an on-premises server and an Azure virtual machine (VM) running Red Hat Enterprise Linux (RHEL). In future posts in this series, we will cover how you can deploy other types of workloads using AWS tools, including containers and serverless applications.

Architecture Diagram

CI/CD pipeline setup

This section describes instructions for setting up a multicloud CI/CD pipeline.

Note: The CI/CD pipeline setup, and the related subsections in this post, are a one-time activity; you won’t need to perform these steps every time an application is deployed or modified.

Install CodeDeploy agent

The AWS CodeDeploy agent is a software package that runs deployments on an instance. You can install the CodeDeploy agent on an on-premises server and an Azure VM by using either the command line or AWS Systems Manager.
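As a rough example, a command line install on a RHEL-based VM typically looks like the following; the Region in the bucket URL is an assumption, so use the Region closest to your VM and confirm the current steps in the CodeDeploy user guide:

# Install prerequisites and download the agent installer (Region is a placeholder).
sudo yum install -y ruby wget
cd /tmp
wget https://aws-codedeploy-us-west-2.s3.us-west-2.amazonaws.com/latest/install
chmod +x ./install
sudo ./install auto

# Confirm the agent is running.
sudo systemctl status codedeploy-agent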

Setup GitHub code repository

Setup GitHub code repository using the following steps:

  1. Create a new GitHub code repository or use a repository that already exists.
  2. Copy the Sample_App_Linux app (zip) from Amazon S3 as described in Step 3 of Upload a sample application to your GitHub repository tutorial.
  3. Commit the files to the code repository:
    git add .
    git commit -m 'Initial Commit'
    git push

You will use this repository to deploy your code across environments.

Configure AWS CodePipeline

Follow the steps outlined below to setup and configure CodePipeline to orchestrate the CI/CD pipeline of our application.

  1. Navigate to CodePipeline in the AWS console and click on ‘Create pipeline’
  2. Give your pipeline a name (eg: MyWebApp-CICD) and allow CodePipeline to create a service role on your behalf.
  3. For the source stage, select GitHub (v2) as your source provider and click on the Connect to GitHub button to give CodePipeline access to your Git repository.
  4. Create a new GitHub connection and click on the Install a new App button to install the AWS Connector in your GitHub account.
  5. Back in the CodePipeline console select the repository and branch you would like to build and deploy.

Image showing the configured source stage

  6. Now create the build stage: select AWS CodeBuild as the build provider.
  7. Click on the ‘Create project’ button to create the project for your build stage, and give your project a name.
  8. Select Ubuntu as the operating system for your managed image, choose the standard runtime, and select the ‘aws/codebuild/standard’ image with the latest version.

Image showing the configured environment

  9. In the Buildspec section, select “Insert build commands” and click on switch to editor. Enter the following YAML code as your build commands:
version: 0.2
phases:
    build:
        commands:
            - echo "This is a dummy build command"
artifacts:
    files:
        - "*/*"

Note: You can also add the build commands to your Git repository by using a buildspec YAML file. More information can be found in the Build specification reference for CodeBuild.

  10. Leave all other options as default and click on ‘Continue to CodePipeline’

Image showing the configured buildspec

  11. Back in the CodePipeline console, your project name will automatically be filled in. You can now continue to the next step.
  12. Click the “Skip deploy stage” button; we will create this stage in the next section.
  13. Review your changes and click “Create pipeline”. Your newly created pipeline will now build for the first time!

Image showing the first execution of the CI/CD pipeline

Configure AWS CodeDeploy on Azure and on-premises VMs

Now that we have built our application, we want to deploy it to both the environments – Azure, and on-premises. In the “Install CodeDeploy agent” section we’ve already installed the CodeDeploy agent. As a one-time step we now have to give the CodeDeploy agents access to the AWS environment.  You can leverage AWS Identity and Access Management (IAM) Roles Anywhere in combination with the code-deploy-session-helper to give access to the AWS resources needed.
The IAM role should have at least the AWSCodeDeployFullAccess AWS managed policy and read-only access to the CodePipeline S3 bucket in your account (named codepipeline-<region>-<account-id>).

For more information on how to set up IAM Roles Anywhere, refer to How to extend AWS IAM roles to workloads outside of AWS with IAM Roles Anywhere. Alternative ways to configure access can be found in the AWS CodeDeploy user guide. Follow the steps below for each instance you want to configure.

  1. Configure your CodeDeploy agent as described in the user guide. Ensure the AWS Command Line Interface (CLI) is installed on your VM and execute the following command to register the instance with CodeDeploy.
    aws deploy register-on-premises-instance --instance-name <name_for_your_instance> --iam-role-arn <arn_of_your_iam_role>
  2. Tag the instance as follows:
    aws deploy add-tags-to-on-premises-instances --instance-names <name_for_your_instance> --tags Key=Application,Value=MyWebApp
  3. You should now see both instances registered on the CodeDeploy > On-premises instances panel (a command line check is shown after the following image). You can now deploy the application to your Azure VM and on-premises VM!

Image showing the registered instances
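If you prefer the command line over the console for this check, something like the following should list the registered instances; the tag filter mirrors the Application=MyWebApp tag added above:

# List registered on-premises instances that carry the tag we applied.
aws deploy list-on-premises-instances \
  --registration-status Registered \
  --tag-filters Key=Application,Value=MyWebApp,Type=KEY_AND_VALUE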

Configure AWS CodeDeploy to deploy WebApp

Follow the steps mentioned below to modify the CI/CD pipeline to deploy the application to Azure, and on-premises environments.

  1. Create an IAM role named CodeDeployServiceRole and select CodeDeploy > CodeDeploy as your use case. IAM will automatically select the right policy for you. CodeDeploy will use this role to manage the deployments of your application.
  2. In the AWS console navigate to CodeDeploy > Applications. Click on “Create application”.
  3. Give your application a name and choose “EC2/On-premises” as the compute platform.
  4. Configure the instances we want to deploy to. In the detail view of your application click on “Create deployment group”.
  5. Give your deployment group a name and select the CodeDeployServiceRole.
  6. In the environment configuration section choose On-premises Instances.
  7. Specify the Application=MyWebApp key-value pair (the tag you added to the on-premises instances).
  8. Disable load balancing and leave all other options default.
  9. Click on create deployment group. You should now see your newly created deployment group.

Image showing the created CodeDeploy Application and Deployment group

  10. We can now edit our pipeline to deploy to the newly created deployment group.
  11. Navigate to your previously created pipeline in the CodePipeline section and click edit. Add the deploy stage by clicking on Add stage and name it Deploy. Afterwards, click Add action.
  12. Name your action and choose CodeDeploy as your action provider.
  13. Select “BuildArtifact” as your input artifact and select your newly created application and deployment group.
  14. Click on Done and on Save in your pipeline to confirm the changes. You have now added the deploy step to your pipeline!

Image showing the updated pipeline

This completes the one-time DevOps pipeline setup; you won’t need to repeat it for subsequent deployments.

Automated DevOps pipeline in action

This section demonstrates how the DevOps pipeline operates end to end and automatically deploys the application to the Azure VM and on-premises server when the application code changes.

  1. Click on Release Change to deploy your application for the first time. The release change button manually triggers CodePipeline to update your code. In the next section we will make changes to the repository which triggers the pipeline automatically.
  2. During the “Source” stage, your pipeline fetches the latest version of the code from GitHub.
  3. During the “Build” stage your pipeline uses CodeBuild to build your application and generate the deployment artifacts for your pipeline. It uses the buildspec.yml file to determine the build steps.
  4. During the “Deploy” stage your pipeline uses CodeDeploy to deploy the build artifacts to the configured Deployment group – Azure VM and on-premises VM. Navigate to the url of your application to see the results of the deployment process.

Image showing the deployed sample application

 

Update application code in IDE

You can modify the application code using your favorite IDE. In this example we will change the background color and a paragraph of the sample application.

Image showing modifications being made to the file

Once you’ve modified the code, save the updated file followed by pushing the code to the code repository.

git add .
git commit -m "I made changes to the index.html file "
git push

DevOps pipeline (CodePipeline) – compile, build, and test

Once the code is updated, and pushed to GitHub, the DevOps pipeline (CodePipeline) automatically compiles, builds and tests the modified application. You can navigate to your pipeline (CodePipeline) in the AWS Console, and should see the pipeline running (or has recently completed). CodePipeline automatically executes the Build and Deploy steps. In this case we’re not adding any complex logic, but based on your organization’s requirements you can add any build step, or integrate with other tools.

Image showing CodePipeline in action

Deployment process using CodeDeploy

In this section, we describe how the modified application is deployed to the Azure and on-premises VMs.

  1. Open your pipeline in the CodePipeline console, and click on the “AWS CodeDeploy” link in the Deploy step to navigate to your deployment group. Open the “Deployments” tab.

Image showing application deployment history

  2. Click on the first deployment in the Application deployment history section. This will show the details of your latest deployment.

Image showing deployment lifecycle events for the deployment

  3. In the “Deployment lifecycle events” section click on one of the “View events” links. This shows you the lifecycle steps executed by CodeDeploy and will display the error log output if any of the steps have failed.

Image showing deployment events on instance

  4. Navigate back to your application. You should now see your changes in the application. You’ve successfully set up a multicloud DevOps pipeline!

Image showing a new version of the deployed application

Conclusion

In summary, this post demonstrated how AWS DevOps tools and services can help organizations build a single release pipeline to deploy applications and workloads in a hybrid and multicloud environment. The post also showed how to set up a CI/CD pipeline to deploy applications to AWS, on-premises, and Azure VMs.

If you have any questions or feedback, leave them in the comments section.

About the Authors

Picture of Amandeep

Amandeep Bajwa

Amandeep Bajwa is a Senior Solutions Architect at AWS supporting Financial Services enterprises. He helps organizations achieve their business outcomes by identifying the appropriate cloud transformation strategy based on industry trends, and organizational priorities. Some of the areas Amandeep consults on are cloud migration, cloud strategy (including hybrid & multicloud), digital transformation, data & analytics, and technology in general.

Picture of Pawan

Pawan Shrivastava

Pawan Shrivastava is a Partner Solution Architect at AWS in the WWPS team. He focuses on working with partners to provide technical guidance on AWS, collaborating with them to understand their technical requirements, and designing solutions to meet their specific needs. Pawan is passionate about DevOps, automation, and CI/CD pipelines. He enjoys watching MMA, playing cricket, and working out in the gym.

Picture of Brent

Brent Van Wynsberge

Brent Van Wynsberge is a Solutions Architect at AWS supporting enterprise customers. He guides organizations in their digital transformation and innovation journey and accelerates cloud adoption. Brent is an IoT enthusiast, specifically in the application of IoT in manufacturing; he is also interested in DevOps, data analytics, containers, and innovative technologies in general.

Picture of Mike

Mike Strubbe

Mike is a Cloud Solutions Architect Manager at AWS with a strong focus on cloud strategy, digital transformation, business value, leadership, and governance. He helps Enterprise customers achieve their business goals through cloud expertise, coupled with strong business acumen skills. Mike is passionate about implementing cloud strategies that enable cloud transformations, increase operational efficiency and drive business value.

Automate and accelerate your Amazon QuickSight asset deployments using the new APIs

Post Syndicated from Vetri Natarajan original https://aws.amazon.com/blogs/big-data/automate-and-accelerate-your-amazon-quicksight-asset-deployments-using-the-new-apis/

Business intelligence (BI) and IT operations (BIOps) teams often need to automate and accelerate the deployment of BI assets to ensure business continuity. We heard that you wanted an automated and scalable way to deploy, back up, and replicate Amazon QuickSight assets at scale so that BIOps teams within your organization can work in an agile manner.

Today, we are releasing six new QuickSight APIs to allow programmatic access to export and import QuickSight assets—dashboards, analyses, datasets including ingestion schedules, data sources, themes, and VPC configurations—across accounts and environments. These new APIs allow you to interact with a collection of assets in a lift-and-shift manner for deployment across QuickSight accounts, enable backup and restore, and support replication so you can automate workflows and achieve your desired infrastructure setup. These new capabilities bring greater agility to your BIOps teams, allowing you to automate and seamlessly integrate QuickSight assets into existing infrastructure.

Prior to this launch, you needed to have an in-depth understanding of QuickSight asset relationships and couldn’t deploy, back up, or replicate at scale in an automated manner. In this post, we cover the capabilities of the new APIs in detail and go over common use cases.

Export APIs

You can use the following APIs to initiate, track, and describe the export jobs that produce the bundle files from the source account. A bundle file is a zip file (with the .qs extension) that contains assets specified by the caller, and optionally all dependencies of the assets. The APIs are as follows:

  • StartAssetBundleExportJob – Use this asynchronous API to export an asset bundle file.
  • DescribeAssetBundleExportJob – Use this synchronous API to get the status of your export job. When successful, this API call response will have a presigned URL to fetch the asset bundle.
  • ListAssetBundleExportJobs – Use this synchronous API to list past export jobs. The list will contain both finished and running jobs from the past 15 days.

Import APIs

These APIs initiate, track, and describe the import jobs that take the bundle file as input and create or update assets in the destination account:

  • StartAssetBundleImportJob – Use this asynchronous API to import an asset bundle file.
  • DescribeAssetBundleImportJob – Use this synchronous API to get the status of your import job.
  • ListAssetBundleImportJobs – Use this synchronous API to list past import jobs.

Supported assets

You can export and import the following assets with these APIs:

  • Analyses
  • Dashboards
  • Data sources
  • Datasets including scheduled and incremental refreshes
  • Themes
  • VPC connections

Asset bundle output format

The output of the export job is a single zip file with the .qs extension. This zip file contains a separate folder for each asset type. Each folder contains a single JSON file for each asset with the resourceId as the file name. This folder structure makes it easy to commit the contents into a version control system like Git, so you can get the benefits of a complete version history.

The asset bundle APIs can also export QuickSight assets as AWS CloudFormation templates, one of the most popular infrastructure as code (IaC) frameworks. This makes it easy to manage your QuickSight assets at scale and automate your deployments. AWS CloudFormation also has built-in transaction and rollback capabilities, ensuring that all your environments are consistent and your assets are deployed correctly every time. Finally, you can use the CloudFormation templates to recreate your QuickSight resources in case of a disaster.
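For example, you can request a CloudFormation template instead of QuickSight JSON by changing the export format when you start the export job; the job ID and dashboard ID below are placeholders, following the same conventions as the export examples later in this post:

# Export the dashboard and its dependencies as a CloudFormation template.
aws quicksight start-asset-bundle-export-job \
--aws-account-id $AAI \
--asset-bundle-export-job-id cfn-job-1 \
--resource-arns arn:aws:quicksight:$IR:$AAI:dashboard/<<dashboard-id>> \
--include-all-dependencies \
--export-format CLOUDFORMATION_JSON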

Permissions required

These APIs are available to users that have AWS Identity and Access Management (IAM) permissions to call them. The following IAM policy grants an IAM user access to these APIs:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [          
                "quicksight:StartAssetBundleImportJob",
                "quicksight:DescribeAssetBundleImportJob",
                "quicksight:ListAssetBundleImportJobs",
                "quicksight:StartAssetBundleExportJob",
                "quicksight:DescribeAssetBundleExportJob",
                "quicksight:ListAssetBundleExportJobs"
            ],
            "Resource": "*"
        }
    ]
}

Use case overview

Let’s consider a fictional company, AnyCompany, which owns healthcare facilities across the globe. They have set up a development QuickSight account for authors to create and update QuickSight assets, and a separate production account. In some cases, due to data residency regulations, they have to maintain the same assets across multiple Regions. AnyCompany is scaling their business, and they want to automate deployment within and across multiple QuickSight accounts and back up QuickSight assets on a schedule.

AnyCompany has the following key deployment and backup requirements:

  • Deployment – Deploy QuickSight assets across Regions and multiple accounts:
    • Deployment to the production account – AnyCompany wants to automate the deployment of QuickSight assets from their development to their production account.
    • Deployment to different Regions in the same account – AnyCompany’s central IT team needs to deploy dashboards and datasets across various Regions to meet data residency requirements.
    • Deployment to multiple accounts in different Regions – To meet their end customer requirements of separate QuickSight accounts, AnyCompany needs to deploy the dashboards and datasets across multiple accounts.
    • Deployment in the same account and same Region – AnyCompany consolidates all non-production environment into one QuickSight account. However, there has to be different dashboards and datasets for each non-production environment, such as development and testing.
  • Backup and restore – As AnyCompany rolls out critical dashboards for business, it needs to ensure high availability of the dashboards. As part of their strategy, AnyCompany wants to maintain a backup of assets to restore in case of disasters.
  • Deployment history – As part of the governed process, AnyCompany’s central IT team needs to have a history of deployments in each environment.

In the following sections, we discuss how to meet these requirements.

Deploy to a production account

The following figure shows the sequence of steps in the deployment process through the new asset deployment APIs.

For deployments, the import job API provides the capability to pass data source configurations to point to the respective test or production instances of data sources. In the preceding sequence flow, we use the AWS Command Line Interface (AWS CLI) to showcase the capability, but you can invoke the APIs through your automation pipeline using AWS SDKs.

For this use case, AnyCompany used Amazon Simple Storage Service (Amazon S3) to store their asset bundles.

On the development QuickSight account, complete the following steps:

  1. Use the StartAssetBundleExportJob API to export the dashboard and its dependencies.
  2. Place the output asset bundle in an S3 bucket in the production account.

On the production QuickSight account, complete the following steps:

  1. Use the StartAssetBundleImportJob API with the asset bundle in Amazon S3 as the source, overriding the data source details.
  2. Run the import job.

The following code shows their StartAssetBundleExportJob API call to export the dashboard and its dependencies:

aws quicksight start-asset-bundle-export-job \
--aws-account-id $AAI \
--asset-bundle-export-job-id job-1 \
--resource-arns arn:aws:quicksight:$IR:$AAI:dashboard/<<dashboard-id>> \
--include-all-dependencies \
--export-format QUICKSIGHT_JSON

The following code is for the DescribeAssetBundleExportJob API:

aws quicksight describe-asset-bundle-export-job \
--aws-account-id $AAI \
--asset-bundle-export-job-id job-1

The output of the DescribeAssetBundleExportJob API call contains the presigned URL, which you use to download your respective assets and subsequently upload them to a dedicated S3 bucket in the target account.
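For example, a small script can fetch the presigned URL, download the bundle, and copy it to the dedicated bucket in the target account. This sketch assumes the describe response exposes the presigned URL in a DownloadUrl field and uses a hypothetical bucket name:

# Fetch the presigned URL for the completed export job.
URL=$(aws quicksight describe-asset-bundle-export-job \
  --aws-account-id $AAI \
  --asset-bundle-export-job-id job-1 \
  --query 'DownloadUrl' --output text)

# Download the bundle and copy it to the target account's bucket (placeholder name).
curl -s -o assetbundle-extract.qs "$URL"
aws s3 cp assetbundle-extract.qs s3://prod-qs-asset-bundles/assetbundle-extract.qs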

The import job (StartAssetBundleImportJob) is initiated in the target account using S3Uri as one of the input parameters. You can also change the data source configuration while initiating the job. In the following example, the S3 manifest file location for the S3 data source is overridden:

aws quicksight start-asset-bundle-import-job \
--aws-account-id $AAI \
--asset-bundle-import-job-id job-1 \
--asset-bundle-import-source "{\"S3Uri\": \"<<qs file S3 location>>\"}" \
--override-parameters '{"DataSources":
 [{"DataSourceId": "<<DataSourceID>>",
 "DataSourceParameters":
 { "S3Parameters": {"ManifestFileLocation":
 {"Bucket": "<<bucket name>>", "Key": "<<key for manifest file>>"}}}}]}' \
--region us-west-2

DescribeAssetBundleImportJob lets you monitor the status of the import job:

aws quicksight describe-asset-bundle-import-job \
--aws-account-id $AAI \
--asset-bundle-import-job-id job-1 \
--region us-west-2

The following screenshot shows the response.

Deploy to different Regions in different accounts

To comply with data residency regulations, data can’t be moved outside a Region in certain countries. Therefore, the dashboards have to be deployed in each of these Regions. QuickSight provides the option to pass an asset bundle extracted from the source environment as a base64 encoded string for the import job (StartAssetBundleImportJob):

aws quicksight start-asset-bundle-import-job \
 --aws-account-id $AAI  \
--asset-bundle-import-job-id <<job-id>> \ 
--asset-bundle-import-source Body="$(base64 assetbundle-extract.qs)"

Deploy within a single account in the same Region

When using a centralized account approach for all the lower environments, AnyCompany wanted to have the same dashboards within a single Region to be able to connect with different data sources. To achieve this, they used the optional parameter resource-id-prefix in the import job (StartAssetBundleImportJob) to create a unique ID for each environment:

aws quicksight start-asset-bundle-import-job \
--aws-account-id $AAI \
--asset-bundle-import-job-id job-1 \
--asset-bundle-import-source "{\"S3Uri\": \"s3://<<qs file S3 location>>\"}" \
--override-parameters '{"DataSources":
[{"DataSourceId": "<<DataSource ID>>",
"DataSourceParameters":{ "S3Parameters": {"ManifestFileLocation":
{"Bucket": "ee-assets-prod-us-west-2",
"Key": "modules/337d5d05acc64a6fa37bcba6b921071c/v1/SalesDataManifest.json"}}}}]}' \
--region us-west-2 \
--resource-id-prefix "test-"

Backup and version control

AnyCompany deploys business-critical dashboards, and it’s important for them to have proper backup and version control processes. They run scheduled export jobs at regular intervals along with asset deployments. Additionally, they use asset bundle APIs to meet their version control requirements.

The following screenshot shows the content of a sample bundle.

Deployment history

AnyCompany needs to track the deployment history of all the assets in all environments. They achieved this goal by using the ListAssetBundleExportJobs and ListAssetBundleImportJobs APIs to fetch the deployment history in a given account.

The following code is for ListAssetBundleExportJobs:

aws quicksight list-asset-bundle-export-jobs \
 --aws-account-id $AAI

The following code is for ListAssetBundleImportJobs:

aws quicksight list-asset-bundle-import-jobs \
 --aws-account-id $AAI

Conclusion

Asset bundle APIs provide methods for automation and acceleration in the deployment process across multiple environments. This post illustrated various use cases where you can apply these APIs for automation and scale. For more information, refer to Amazon QuickSight and What’s New in the Amazon QuickSight User Guide.

If you have any questions or feedback, please leave a comment. For additional discussions and help getting answers to your questions, check out the QuickSight Community.


About the authors

Vetri Natarajan is a Specialist Solutions Architect for Amazon QuickSight. Vetri has 15 years of experience implementing enterprise business intelligence (BI) solutions and greenfield data products. Vetri specializes in integrating BI solutions with business applications to enable data-driven decisions.

Zhao Pan is a software development manager for Amazon QuickSight. He is working to provide a delightful developer experience to our customers to automate and streamline their BI operations. He has 20 years of software development experience in various tech stacks. Prior to QuickSight he was a people and technical leader at ADP building a next-gen platform for human capital management. When he is not at his desk, he can usually be found in his garage building one contraption or another.

Mayank Agarwal is a product manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service. He focuses on embedded analytics and developer experience. He started his career as an embedded software engineer developing handheld devices. Prior to QuickSight he was leading engineering teams at Credence ID, developing custom mobile embedded device and web solutions using AWS services that make biometric enrollment and identification fast, intuitive, and cost-effective for Government sector, healthcare and transaction security applications.

Introducing Amazon EMR on EKS job submission with Spark Operator and spark-submit

Post Syndicated from Lotfi Mouhib original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-on-eks-job-submission-with-spark-operator-and-spark-submit/

Amazon EMR on EKS provides a deployment option for Amazon EMR that allows organizations to run open-source big data frameworks on Amazon Elastic Kubernetes Service (Amazon EKS). With EMR on EKS, Spark applications run on the Amazon EMR runtime for Apache Spark. This performance-optimized runtime offered by Amazon EMR makes your Spark jobs run fast and cost-effectively. The EMR runtime provides up to 5.37 times better performance and 76.8% cost savings when compared to using open-source Apache Spark on Amazon EKS.

Building on the success of Amazon EMR on EKS, customers have been running and managing jobs using the emr-containers API, creating EMR virtual clusters, and submitting jobs to the EKS cluster, either through the AWS Command Line Interface (AWS CLI) or Apache Airflow scheduler. However, other customers running Spark applications have chosen Spark Operator or native spark-submit to define and run Apache Spark jobs on Amazon EKS, but without taking advantage of the performance gains from running Spark on the optimized EMR runtime. In response to this need, starting from EMR 6.10, we have introduced a new feature that lets you use the optimized EMR runtime while submitting and managing Spark jobs through either Spark Operator or spark-submit. This means that anyone running Spark workloads on EKS can take advantage of EMR’s optimized runtime.

In this post, we walk through the process of setting up and running Spark jobs using both Spark Operator and spark-submit, integrated with the EMR runtime feature. We provide step-by-step instructions to assist you in setting up the infrastructure and submitting a job with both methods. Additionally, you can use the Data on EKS blueprint to deploy the entire infrastructure using Terraform templates.

Infrastructure overview

In this post, we walk through the process of deploying a comprehensive solution using eksctl, Helm, and AWS CLI. Our deployment includes the following resources:

  • A VPC, EKS cluster, and managed node group, set up with the eksctl tool
  • Essential Amazon EKS managed add-ons, such as the VPC CNI, CoreDNS, and KubeProxy set up with the eksctl tool
  • Cluster Autoscaler and Spark Operator add-ons, set up using Helm
  • A Spark job execution AWS Identity and Access Management (IAM) role, IAM policy for Amazon Simple Storage Service (Amazon S3) bucket access, service account, and role-based access control, set up using the AWS CLI and eksctl

Prerequisites

Verify that the following prerequisites are installed on your machine: the AWS CLI, eksctl, kubectl, and Helm.

Set up AWS credentials

Before proceeding to the next step and running the eksctl command, you need to set up your local AWS credentials profile. For instructions, refer to Configuration and credential file settings.
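For example, you can configure a dedicated named profile and export it so that the eksctl, kubectl, and Helm commands in the following steps pick it up (the profile name here is arbitrary):

# Configure a named profile interactively (access key, secret key, Region, output format)
aws configure --profile emr-spark-operator

# Use the profile for the rest of the walkthrough and confirm the credentials resolve
export AWS_PROFILE=emr-spark-operator
aws sts get-caller-identity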

Deploy the VPC, EKS cluster, and managed add-ons

The following configuration uses us-west-1 as the default Region. To run in a different Region, update the region and availabilityZones fields accordingly. Also, verify that the same Region is used in the subsequent steps throughout the post.

Enter the following code snippet into the terminal where your AWS credentials are set up. Make sure to update the publicAccessCIDRs field with your IP before you run the command below. This will create a file named eks-cluster.yaml:

cat <<EOF >eks-cluster.yaml
---
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig
metadata:
  name: emr-spark-operator
  region: us-west-1 # replace with your region
  version: "1.25"
vpc:
  clusterEndpoints:
    publicAccess: true
    privateAccess: true
  publicAccessCIDRs: ["YOUR-IP/32"]
availabilityZones: ["us-west-1a","us-west-1b"] # replace with your region
iam:
  withOIDC: true
  serviceAccounts:
  - metadata:
      name: cluster-autoscaler
      namespace: kube-system
    wellKnownPolicies:
      autoScaler: true
    roleName: eksctl-cluster-autoscaler-role
managedNodeGroups:
  - name: m5x
    instanceType: m5.xlarge
    availabilityZones: ["us-west-1a"]
    volumeSize: 100
    volumeType: gp3
    minSize: 2
    desiredCapacity: 2
    maxSize: 10
    tags:
      k8s.io/cluster-autoscaler/enabled: "true"
      k8s.io/cluster-autoscaler/eks-nvme: "owned" 
addons:
  - name: vpc-cni
    version: latest
  - name: coredns
    version: latest
  - name: kube-proxy
    version: latest
cloudWatch:
  clusterLogging:
    enableTypes: ["*"]
EOF

Use the following command to create the EKS cluster: eksctl create cluster -f eks-cluster.yaml

Deploy Cluster Autoscaler

Cluster Autoscaler is crucial for automatically adjusting the size of your Kubernetes cluster based on the current resource demands, optimizing resource utilization and cost. Create an autoscaler-helm-values.yaml file and install the Cluster Autoscaler using Helm:

cat <<EOF >autoscaler-helm-values.yaml
---
autoDiscovery:
    clusterName: emr-spark-operator
    tags:
      - k8s.io/cluster-autoscaler/enabled
      - k8s.io/cluster-autoscaler/{{ .Values.autoDiscovery.clusterName }}
awsRegion: us-west-1 # Make sure the Region is the same as the EKS cluster's
rbac:
  serviceAccount:
    create: false
    name: cluster-autoscaler
EOF
helm repo add autoscaler https://kubernetes.github.io/autoscaler
helm install nodescaler autoscaler/cluster-autoscaler \
--namespace kube-system \
--values autoscaler-helm-values.yaml --debug

You can also set up Karpenter as a cluster autoscaler to automatically launch the right compute resources to handle your EKS cluster's applications. Follow this blog post to learn how to set up and configure Karpenter.

Deploy Spark Operator

Spark Operator is an open-source Kubernetes operator specifically designed to manage and monitor Spark applications running on Kubernetes. It streamlines the process of deploying and managing Spark jobs by providing a Kubernetes custom resource to define, configure, and run Spark applications, as well as manage the job lifecycle through the Kubernetes API. Some customers prefer using Spark Operator to manage Spark jobs because it enables them to manage Spark applications just like other Kubernetes resources.
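For example, after the operator and its custom resource definition are installed (as shown later in this post), you can inspect and manage Spark jobs with the same kubectl workflow you use for any other resource; the namespace and application name below are the ones used later in this walkthrough:

# List Spark applications submitted through the operator in the job namespace
kubectl get sparkapplications -n data-team-a

# Inspect the status, driver pod, and recent events of a specific application
kubectl describe sparkapplication taxi-example -n data-team-a

# Delete an application; the operator cleans up its driver and executor pods
kubectl delete sparkapplication taxi-example -n data-team-a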

Currently, customers are building their open-source Spark images and using S3a committers as part of job submissions with Spark Operator or spark-submit. However, with the new job submission option, you can now benefit from the EMR runtime in conjunction with EMRFS. Starting with Amazon EMR 6.10 and for each upcoming version of the EMR runtime, we will release the Spark Operator and its Helm chart to use the EMR runtime.

In this section, we show you how to deploy a Spark Operator Helm chart from an Amazon Elastic Container Registry (Amazon ECR) repository and submit jobs using EMR runtime images, benefiting from the performance enhancements provided by the EMR runtime.

Install Spark Operator with Helm from Amazon ECR

The Spark Operator Helm chart is stored in an ECR repository. To install the Spark Operator, you first need to authenticate your Helm client with the ECR repository. The charts are stored under the following path: ECR_URI/spark-operator.

Authenticate your Helm client and install the Spark Operator:

aws ecr get-login-password \
--region us-west-1 | helm registry login \
--username AWS \
--password-stdin 608033475327.dkr.ecr.us-west-1.amazonaws.com

You can authenticate to other EMR on EKS supported Regions by obtaining the AWS account ID for the corresponding Region. For more information, refer to how to select a base image URI.

Install Spark Operator

You can now install Spark Operator using the following command:

helm install spark-operator-demo \
oci://608033475327.dkr.ecr.us-west-1.amazonaws.com/spark-operator \
--set emrContainers.awsRegion=us-west-1 \
--version 1.1.26-amzn-0 \
--set serviceAccounts.spark.create=false \
--namespace spark-operator \
--create-namespace

To verify that the operator has been installed correctly, run the following command:

helm list --namespace spark-operator -o yaml

Set up the Spark job execution role and service account

In this step, we create a Spark job execution IAM role and a service account, which will be used in Spark Operator and spark-submit job submission examples.

First, we create an IAM policy that will be used by the IAM Roles for Service Accounts (IRSA). This policy enables the driver and executor pods to access the AWS services specified in the policy. Complete the following steps:

  1. As a prerequisite, either create an S3 bucket (aws s3api create-bucket --bucket <ENTER-S3-BUCKET> --create-bucket-configuration LocationConstraint=us-west-1 --region us-west-1) or use an existing S3 bucket. Replace <ENTER-S3-BUCKET> in the following code with the bucket name.
  2. Create a policy file that allows read and write access to an S3 bucket:
    cat >job-execution-policy.json <<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ],
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*",
                    "arn:aws:s3:::aws-data-lake-workshop/*",
                    "arn:aws:s3:::nyc-tlc",
                    "arn:aws:s3:::nyc-tlc/*"
                ]
            }
        ]
    }
    EOL

  3. Create the IAM policy with the following command:
    aws iam create-policy --policy-name emr-job-execution-policy --policy-document file://job-execution-policy.json

  4. Next, create the service account named emr-job-execution-sa as well as the IAM role. The following eksctl command creates a service account scoped to the data-team-a namespace, to be used by the driver and executor pods. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    eksctl create iamserviceaccount \
    --cluster=emr-spark-operator \
    --region us-west-1 \
    --name=emr-job-execution-sa \
    --attach-policy-arn=arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:policy/emr-job-execution-policy \
    --role-name=emr-job-execution-irsa \
    --namespace=data-team-a \
    --approve

  5. Create an S3 bucket policy that allows only the execution role created in step 4 to read from and write to the S3 bucket created in step 1. Make sure to replace <ENTER_YOUR_ACCOUNT_ID> with your account ID before running the command:
    cat > bucketpolicy.json<<EOL
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:GetObject",
                    "s3:ListBucket",
                    "s3:PutObject",
                    "s3:DeleteObject",
                    "s3:AbortMultipartUpload",
                    "s3:ListMultipartUploadParts"
                ], "Principal": {
                    "AWS": "arn:aws:iam::<ENTER_YOUR_ACCOUNT_ID>:role/emr-job-execution-irsa"
                },
                "Resource": [
                    "arn:aws:s3:::<ENTER-S3-BUCKET>",
                    "arn:aws:s3:::<ENTER-S3-BUCKET>/*"
                ]
            }
        ]
    }
    EOL
    
    aws s3api put-bucket-policy --bucket <ENTER-S3-BUCKET> --policy file://bucketpolicy.json

  6. Create a Kubernetes role and role binding required for the service account used in the Spark job run:
    cat <<EOF >emr-job-execution-rbac.yaml
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      name: emr-job-execution-sa-role
      namespace: data-team-a
    rules:
      - apiGroups: ["", "batch","extensions"]
        resources: ["configmaps","serviceaccounts","events","pods","pods/exec","pods/log","pods/portforward","secrets","services","persistentvolumeclaims"]
        verbs: ["create","delete","get","list","patch","update","watch"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: emr-job-execution-sa-rb
      namespace: data-team-a
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: Role
      name: emr-job-execution-sa-role
    subjects:
      - kind: ServiceAccount
        name: emr-job-execution-sa
        namespace: data-team-a
    EOF

  7. Apply the Kubernetes role and role binding definition with the following command:
kubectl apply -f emr-job-execution-rbac.yaml

So far, we have completed the infrastructure setup, including the Spark job execution roles. In the following steps, we run sample Spark jobs using both Spark Operator and spark-submit with the EMR runtime.

Configure the Spark Operator job with the EMR runtime

In this section, we present a sample Spark job that reads data from public datasets stored in S3 buckets, processes them, and writes the results to your own S3 bucket. Make sure that you update the S3 bucket in the following configuration by replacing <ENTER_S3_BUCKET> with the URI of your own S3 bucket referred to in step 2 of the "Set up the Spark job execution role and service account" section. Also, note that we are using data-team-a as the namespace and emr-job-execution-sa as the service account, which we created in the previous step. These are necessary to run the Spark job pods in the dedicated namespace, and the IAM role linked to the service account is used to access the S3 bucket for reading and writing data.

Most importantly, notice the image field with the EMR optimized runtime Docker image, which is currently set to emr-6.10.0. You can change this to a newer version when it’s released by the Amazon EMR team. Also, when configuring your jobs, make sure that you include the sparkConf and hadoopConf settings as defined in the following manifest. These configurations enable you to benefit from EMR runtime performance, AWS Glue Data Catalog integration, and the EMRFS optimized connector.

  1. Create the file (emr-spark-operator-example.yaml) locally and update the S3 bucket location so that you can submit the job as part of the next step:
    cat <<EOF >emr-spark-operator-example.yaml
    ---
    apiVersion: "sparkoperator.k8s.io/v1beta2"
    kind: SparkApplication
    metadata:
      name: taxi-example
      namespace: data-team-a
    spec:
      type: Scala
      mode: cluster
      # EMR optimized runtime image
      image: "483788554619.dkr.ecr.eu-west-1.amazonaws.com/spark/emr-6.10.0:latest"
      imagePullPolicy: Always
      mainClass: ValueZones
      mainApplicationFile: s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar
      arguments:
        - s3://nyc-tlc/csv_backup
        - "2017"
        - s3://nyc-tlc/misc/taxi_zone_lookup.csv
        - s3://<ENTER_S3_BUCKET>/emr-eks-results
        - emr_eks_demo
      hadoopConf:
        # EMRFS filesystem config
        fs.s3.customAWSCredentialsProvider: com.amazonaws.auth.WebIdentityTokenCredentialsProvider
        fs.s3.impl: com.amazon.ws.emr.hadoop.fs.EmrFileSystem
        fs.AbstractFileSystem.s3.impl: org.apache.hadoop.fs.s3.EMRFSDelegate
        fs.s3.buffer.dir: /mnt/s3
        fs.s3.getObject.initialSocketTimeoutMilliseconds: "2000"
        mapreduce.fileoutputcommitter.algorithm.version.emr_internal_use_only.EmrFileSystem: "2"
        mapreduce.fileoutputcommitter.cleanup-failures.ignored.emr_internal_use_only.EmrFileSystem: "true"
      sparkConf:
        spark.eventLog.enabled: "true"
        spark.eventLog.dir: "s3://<ENTER_S3_BUCKET>/"
        spark.kubernetes.driver.pod.name: driver-nyc-taxi-etl
        # Required for EMR Runtime and Glue Catalogue
        spark.driver.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.driver.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        spark.executor.extraClassPath: /usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*
        spark.executor.extraLibraryPath: /usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native
        # EMRFS committer
        spark.sql.parquet.output.committer.class: com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter
        spark.sql.parquet.fs.optimized.committer.optimization-enabled: "true"
        spark.sql.emr.internal.extensions: com.amazonaws.emr.spark.EmrSparkSessionExtensions
        spark.executor.defaultJavaOptions: -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70 -XX:OnOutOfMemoryError='kill -9 %p'
        spark.driver.defaultJavaOptions:  -XX:OnOutOfMemoryError='kill -9 %p' -XX:+UseParallelGC -XX:InitiatingHeapOccupancyPercent=70
      sparkVersion: "3.3.1"
      restartPolicy:
        type: Never
      driver:
        cores: 1
        memory: "4g"
        serviceAccount: emr-job-execution-sa
      executor:
        cores: 1
        instances: 4
        memory: "4g"
        serviceAccount: emr-job-execution-sa
    EOF

  2. Run the following command to submit the job to the EKS cluster:
    kubectl apply -f emr-spark-operator-example.yaml

    The job may take 4–5 minutes to complete, and you can verify the success message in the driver pod logs.

  3. Verify the job by running the following command:
kubectl get pods -n data-team-a

Enable access to the Spark UI

The Spark UI is an important tool for data engineers because it allows you to track the progress of tasks, view detailed job and stage information, and analyze resource utilization to identify bottlenecks and optimize your code. For Spark jobs running on Kubernetes, the Spark UI is hosted on the driver pod and its access is restricted to the internal network of Kubernetes. To access it, we need to forward the traffic to the pod with kubectl. The following steps take you through how to set it up.

Run the following command to forward traffic to the driver pod:

kubectl port-forward <driver-pod-name> 4040:4040

You should see text similar to the following:

Forwarding from 127.0.0.1:4040 -> 4040
Forwarding from [::1]:4040 -> 4040

If you didn’t specify the driver pod name at the submission of the SparkApplication, you can get it with the following command:

kubectl get pods -l spark-role=driver,spark-app-name=<your-spark-app-name> -o jsonpath='{.items[0].metadata.name}'

Open a browser and enter http://localhost:4040 in the address bar. You should be able to connect to the Spark UI.


Spark History Server

If you want to explore your job after its run, you can view it through the Spark History Server. The preceding SparkApplication definition has the event log enabled and stores the events in an S3 bucket with the following path: s3://YOUR-S3-BUCKET/. For instructions on setting up the Spark History Server and exploring the logs, refer to Launching the Spark history server and viewing the Spark UI using Docker.
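If you prefer to experiment without Docker, the following minimal sketch points a local Spark distribution's history server at the same event log location; it assumes your local Spark installation can read from Amazon S3 (for example, through the S3A connector) with your AWS credentials:

# Point the history server at the event log directory configured in spark.eventLog.dir
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=s3a://YOUR-S3-BUCKET/"

# Start the history server from the root of the Spark distribution; the UI is served on port 18080 by default
./sbin/start-history-server.sh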

spark-submit

spark-submit is a command line interface for running Apache Spark applications on a cluster or locally. It allows you to submit applications to Spark clusters. The tool enables simple configuration of application properties, resource allocation, and custom libraries, streamlining the deployment and management of Spark jobs.

Beginning with Amazon EMR 6.10, spark-submit is supported as a job submission method. This method currently only supports cluster mode as the submission mechanism. To submit jobs using the spark-submit method, we reuse the IAM role for the service account we set up earlier. We also use the S3 bucket used for the Spark Operator method. The steps in this section take you through how to configure and submit jobs with spark-submit and benefit from EMR runtime improvements.

  1. In order to submit a job, you need to use the Spark version that matches the one available in Amazon EMR. For Amazon EMR 6.10, you need to download the Spark 3.3 version.
  2. You also need to make sure you have Java installed in your environment.
  3. Unzip the file and navigate to the root of the Spark directory.
  4. In the following code, replace the EKS endpoint as well as the S3 bucket then run the script:
./bin/spark-submit \
--class ValueZones \
--master k8s://EKS-ENDPOINT \
--conf spark.kubernetes.namespace=data-team-a \
--conf spark.kubernetes.container.image=608033475327.dkr.ecr.us-west-1.amazonaws.com/spark/emr-6.10.0:latest \
--conf spark.kubernetes.authenticate.driver.serviceAccountName=emr-job-execution-sa \
--conf spark.kubernetes.authenticate.executor.serviceAccountName=emr-job-execution-sa \
--conf spark.driver.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.driver.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.executor.extraClassPath="/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/sagemaker-spark-sdk/lib/sagemaker-spark-sdk.jar:/home/hadoop/extrajars/*" \
--conf spark.executor.extraLibraryPath="/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native" \
--conf spark.hadoop.fs.s3.customAWSCredentialsProvider=com.amazonaws.auth.WebIdentityTokenCredentialsProvider \
--conf spark.hadoop.fs.s3.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--conf spark.hadoop.fs.AbstractFileSystem.s3.impl=org.apache.hadoop.fs.s3.EMRFSDelegate \
--conf spark.hadoop.fs.s3.buffer.dir=/mnt/s3 \
--conf spark.hadoop.fs.s3n.impl=com.amazon.ws.emr.hadoop.fs.EmrFileSystem \
--deploy-mode cluster \
s3://aws-data-lake-workshop/spark-eks/spark-eks-assembly-3.3.0.jar s3://nyc-tlc/csv_backup 2017 s3://nyc-tlc/misc/taxi_zone_lookup.csv s3://S3_BUCKET/emr-eks-results emr_eks_demo

The job takes about 7 minutes to complete with two executors of one core and 1 GB of memory.

Using custom Kubernetes schedulers

Customers running a large volume of jobs concurrently might face challenges related to providing fair access to compute capacity that they aren't able to solve with the standard scheduling and resource utilization management Kubernetes offers. In addition, customers that are migrating from Amazon EMR on Amazon Elastic Compute Cloud (Amazon EC2) and are managing their scheduling with YARN queues will not be able to map them directly to Kubernetes scheduling capabilities.

To overcome this issue, you can use custom schedulers like Apache YuniKorn or Volcano. Spark Operator natively supports these schedulers, so you can schedule Spark applications based on factors such as priority, resource requirements, and fairness policies, while Spark Operator simplifies application deployment and management. To set up YuniKorn with gang scheduling and use it in Spark applications submitted through Spark Operator, refer to Spark Operator with YuniKorn.
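As an illustration of the general shape (not the exact configuration from the referenced post), the operator can hand driver and executor pods to a custom batch scheduler through the batchScheduler fields of the SparkApplication spec. The following fragment assumes Volcano is installed and a queue named spark-queue exists; the equivalent YuniKorn setup is covered in the post referenced above:

# Fragment of a SparkApplication spec (illustrative values)
spec:
  batchScheduler: volcano
  batchSchedulerOptions:
    queue: spark-queue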

Clean up

To avoid unwanted charges to your AWS account, delete all the AWS resources created during this deployment:

eksctl delete cluster -f eks-cluster.yaml

Conclusion

In this post, we introduced the EMR runtime feature for Spark Operator and spark-submit, and explored the advantages of using this feature on an EKS cluster. With the optimized EMR runtime, you can significantly enhance the performance of your Spark applications while optimizing costs. We demonstrated the deployment of the cluster using the eksctl tool; you can also use the Data on EKS blueprints to deploy a production-ready EKS cluster for EMR on EKS and take advantage of these new deployment methods in addition to the EMR on EKS API job submission method. By running your applications on the optimized EMR runtime, you can further enhance your Spark application workflows and drive innovation in your data processing pipelines.


About the Authors

Lotfi Mouhib is a Senior Solutions Architect working for the Public Sector team with Amazon Web Services. He helps public sector customers across EMEA realize their ideas, build new services, and innovate for citizens. In his spare time, Lotfi enjoys cycling and running.

Vara Bonthu is a dedicated technology professional and Worldwide Tech Leader for Data on EKS, specializing in assisting AWS customers ranging from strategic accounts to diverse organizations. He is passionate about open-source technologies, data analytics, AI/ML, and Kubernetes, and boasts an extensive background in development, DevOps, and architecture. Vara’s primary focus is on building highly scalable data and AI/ML solutions on Kubernetes platforms, helping customers harness the full potential of cutting-edge technology for their data-driven pursuits.

Visualize data quality scores and metrics generated by AWS Glue Data Quality

Post Syndicated from Zack Zhou original https://aws.amazon.com/blogs/big-data/visualize-data-quality-scores-and-metrics-generated-by-aws-glue-data-quality/

AWS Glue Data Quality allows you to measure and monitor the quality of data in your data repositories. It’s important for business users to be able to see quality scores and metrics to make confident business decisions and debug data quality issues. AWS Glue Data Quality generates a substantial amount of operational runtime information during the evaluation of rulesets.

An operational scorecard is a mechanism used to evaluate and measure the quality of data processed and validated by AWS Glue Data Quality rulesets. It provides insights and metrics related to the performance and effectiveness of data quality processes.

In this post, we highlight the seamless integration of Amazon Athena and Amazon QuickSight, which enables the visualization of operational metrics for AWS Glue Data Quality rule evaluation in an efficient and effective manner.

This post is Part 5 of a five-post series to explain how to build dashboards to measure and monitor your data quality:

Solution overview

The solution allows you to build your AWS Glue Data Quality score and metrics dashboard using QuickSight in an easy and straightforward manner. The following architecture diagram shows an overview of the complete pipeline.

There are six main steps in the data pipeline:

  1. Amazon EventBridge triggers an AWS Lambda function when the event pattern for AWS Glue Data Quality matches the defined rule. (Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality)
  2. The Lambda function writes the AWS Glue Data Quality result to an Amazon Simple Storage Service (Amazon S3) bucket.
  3. An AWS Glue crawler crawls the results.
  4. The crawler builds a Data Catalog, so the data can be queried using Athena.
  5. We can analyze the data quality score and metrics using Athena SQL queries.
  6. We can query and submit the Athena data to QuickSight to create visuals for the dashboard.

In the following sections, we discuss these steps in more detail.

Prerequisites

To follow along with this post, complete the following prerequisites:

  1. Have an AWS Identity and Access Management (IAM) role with permissions to extract data from an S3 bucket and write to the AWS Glue Data Catalog.
  2. Similarly, have a Lambda function execution role with access to AWS Glue and S3 buckets.
  3. Set up the Athena query result location. For more information, refer to Working with Query Results, Output Files, and Query History.
  4. Set up QuickSight permissions and enable Athena table and S3 bucket access.

Set up and deploy the Lambda pipeline

To test the solution, we can use the following AWS CloudFormation template. The CloudFormation template creates the EventBridge rule, Lambda function, and S3 bucket to store the data quality results.

If you deployed the CloudFormation template in the previous post, you don’t need to deploy it again in this step.

The following screenshot shows a line of code in which the Lambda function writes the results from AWS Glue Data Quality to an S3 bucket. As depicted, the data will be stored in JSON format and organized according to the time horizon, facilitating convenient access and analysis of the data over time.
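For orientation, the resulting layout in the bucket might look similar to the following hypothetical listing (the bucket name and key pattern are illustrative); the year, month, and day prefixes are what the crawler later surfaces as partition columns:

aws s3 ls s3://<DataQualityS3BucketNameOutputs>/gluedataqualitylogs/ --recursive
# 2023-06-01 09:15:42   4215 gluedataqualitylogs/year=2023/month=06/day=01/dq-result-<run-id>.json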

Set up the AWS Glue Data Catalog using a crawler

Complete the following steps to create an AWS Glue crawler and set up the Data Catalog:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. For Name, enter data-quality-result-crawler, then choose Next.
  4. Under Data sources, choose Add a data source.
  5. For Data source, choose S3.
  6. For S3 path, enter the S3 path to your data source. (s3://<AWS CloudFormation outputs key:DataQualityS3BucketNameOutputs>/gluedataqualitylogs/). Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality for details.
  7. Choose Add an S3 data source and choose Next.
  8. For Existing IAM role, choose your IAM role (GlueDataQualityLaunchBlogDemoRole-xxxx). Refer to Set up alerts and orchestrate data quality rules with AWS Glue Data Quality for details. Then choose Next.
  9. For Target database, choose Add database.
  10. For Database name, enter data-quality-result-database, then choose Create.
  11. For Table name prefix, enter dq_, then choose Next.
  12. Choose Create crawler.
  13. On the Crawlers page, select data-quality-result-crawler and choose Run.

When the crawler is complete, you can see the AWS Glue Data Catalog table definition.

After you create the table definition on the AWS Glue Data Catalog, you can use Athena to query the Data Catalog table.

Query the Data Catalog table using Athena

Athena is an interactive query service that makes it easy to analyze data in Amazon S3 and the AWS Glue Data Catalog using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run on datasets at petabyte scale.

The purpose of this step is to understand our data quality statistics at the table level as well as at the ruleset level. Athena provides simple queries to assist you with this task. Use the queries in this section to analyze your data quality metrics and create an Athena view to use to build a QuickSight dashboard in the next step.

Query 1

The following is a simple SELECT query on the Data Catalog table:

SELECT * FROM "data-quality-result-database"."dq_gluedataqualitylogs" limit 10;

The following screenshot shows the output.

Before we run the second query, let’s check the schema for the table dq_gluedataqualitylogs.

The following screenshot shows the output.

The table shows that one of the columns, resultrun, is the array data type. In order to work with this column in QuickSight, we need to perform an additional step to flatten the array into multiple rows. This is necessary because QuickSight doesn't support the array data type.

Query 2

Use the following query to review the data in the resultrun column:

SELECT resultrun FROM "data-quality-result-database"."dq_gluedataqualitylogs" limit 10;

The following screenshot shows the output.

Query 3

The following query flattens an array into multiple rows using CROSS JOIN in conjunction with the unnest operator and creates a view on the selected columns:

CREATE OR REPLACE VIEW data_quality_result_view AS
SELECT "databasename","tablename", 
"ruleset_name","runid", "resultid", 
"state", "numrulessucceeded", 
"numrulesfailed", "numrulesskipped", 
"score", "year","month",
"day",runs.name,runs.result,
runs.evaluationmessage,runs.Description
FROM "dq_gluedataqualitylogs"
CROSS JOIN unnest(resultrun) AS t(runs)

The following screenshot shows the output.

Verify the columns that were created using the unnest operator.

The following screenshot shows the output.

Query 4

Verify the Athena view created in the previous query:

SELECT * FROM data_quality_result_view LIMIT 10

The following screenshot shows the output.

Visualize the data with QuickSight

Now that you can query your data in Athena, you can use QuickSight to visualize the results. Complete the following steps:

  1. Sign in to the QuickSight console.
  2. In the upper right corner of the console, choose Admin/username, then choose Manage QuickSight.
  3. Choose Security and permissions.
  4. Under QuickSight access to AWS services, choose Add or remove.
  5. Choose Amazon Athena, then choose Next.
  6. Give QuickSight access to the S3 bucket where your data quality result is stored.

Create your datasets

Before you can analyze and visualize the data in QuickSight, you must create datasets for your Athena view (data_quality_result_view). Complete the following steps:

  1. On the Datasets page, choose New dataset, then choose Athena.
  2. Choose the AWS Glue database that you created earlier.
  3. Select Import to SPICE (alternatively, you can select Directly query your data).
  4. Choose Visualize.

Build your dashboard

Create your analysis with one donut chart, one pivot table, one vertical stacked bar, and one funnel chart that use the different fields in the dataset. QuickSight offers a wide range of charts and visuals to help you create your dashboard. For more information, refer to Visual types in Amazon QuickSight.

Clean up

To avoid incurring future charges, delete the resources created in this post.

Conclusion

In this post, we provide insights into running Athena queries and building customized dashboards in QuickSight to understand data quality metrics. This gives you a great starting point for using this solution with your datasets and applying business rules to build a complete data quality framework to monitor issues within your datasets.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality API. To learn more about AWS Glue Data Quality, see the AWS Glue Data Quality Developer Guide. To learn more about QuickSight dashboards, refer to the Amazon QuickSight Developer Guide.


About the authors

Zack Zhou is a Software Development Engineer on the AWS Glue team.

Deenbandhu Prasad is a Senior Analytics Specialist at AWS, specializing in big data services. He is passionate about helping customers build modern data architecture on the AWS Cloud. He has helped customers of all sizes implement data management, data warehouse, and data lake solutions.

Avik Bhattacharjee is a Senior Partner Solutions Architect at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible, focusing on big data and analytics and AI/ML.

Amit Kumar Panda is a Data Architect at AWS Professional Services who is passionate about helping customers build scalable data analytics solutions to enable making critical business decisions.

Set up alerts and orchestrate data quality rules with AWS Glue Data Quality

Post Syndicated from Avik Bhattacharjee original https://aws.amazon.com/blogs/big-data/set-up-alerts-and-orchestrate-data-quality-rules-with-aws-glue-data-quality/

Alerts and notifications play a crucial role in maintaining data quality because they facilitate prompt and efficient responses to any data quality issues that may arise within a dataset. By establishing and configuring alerts and notifications, you can actively monitor data quality and receive timely alerts when data quality issues are identified. This proactive approach helps mitigate the risk of making decisions based on inaccurate information. Furthermore, it allows for necessary actions to be taken, such as rectifying errors in the data source, refining data transformation processes, and updating data quality rules.

We are excited to announce that AWS Glue Data Quality is now generally available, offering built-in integration with Amazon EventBridge and AWS Step Functions to streamline event-driven data quality management. You can access this feature today in the available Regions. It simplifies your experience of monitoring and evaluating the quality of your data.

This post is Part 4 of a five-post series to explain how to set up alerts and orchestrate data quality rules with AWS Glue Data Quality:

Solution overview

In this post, we provide a comprehensive guide on enabling alerts and notifications using Amazon Simple Notification Service (Amazon SNS). We walk you through the step-by-step process of using EventBridge to establish rules that activate an AWS Lambda function when the data quality outcome aligns with the designated pattern. The Lambda function is responsible for converting the data quality metrics and dispatching them to the designated email addresses via Amazon SNS.

To expedite the implementation of the solution, we have prepared an AWS CloudFormation template for your convenience. AWS CloudFormation serves as a powerful management tool, enabling you to define and provision all necessary infrastructure resources within AWS using a unified and standardized language.

The solution aims to automate data quality evaluation for AWS Glue Data Catalog tables (data quality at rest) and allows you to configure email notifications when the AWS Glue Data Quality results become available.

The following architecture diagram provides an overview of the complete pipeline.

The data pipeline consists of the following key steps:

  1. The first step involves AWS Glue Data Quality evaluations that are automated using Step Functions. The workflow is designed to start the evaluations based on the rulesets defined on the dataset (or table). The workflow accepts input parameters provided by the user.
  2. An EventBridge rule receives an event notification from the AWS Glue Data Quality evaluations including the results. The rule evaluates the event payload based on the predefined rule and then triggers a Lambda function for notification.
  3. The Lambda function sends an SNS notification containing data quality statistics to the designated email address. Additionally, the function writes the customized result to the specified Amazon Simple Storage Service (Amazon S3) bucket, ensuring its persistence and accessibility for further analysis or processing.

The following sections discuss the setup for these steps in more detail.

Deploy resources with AWS CloudFormation

We create several resources with AWS CloudFormation, including a Lambda function, EventBridge rule, Step Functions state machine, and AWS Identity and Access Management (IAM) role. Complete the following steps:

  1. To launch the CloudFormation stack, choose Launch Stack:
  2. Provide your email address for EmailAddressAlertNotification, which will be registered as the target recipient for data quality notifications.
  3. Leave the other parameters at their default values and create the stack.

The stack takes about 4 minutes to complete.

  1. Record the outputs listed on the Outputs tab on the AWS CloudFormation console.
  2. Navigate to the S3 bucket created by the stack (DataQualityS3BucketNameStaging) and upload the yellow_tripdata_2022-01.parquet file.
  3. Check your email for a message with the subject “AWS Notification – Subscription Confirmation” and confirm your subscription.

Now that the CloudFormation stack is complete, let’s update the Lambda function code before running the AWS Glue Data Quality pipeline using Step Functions.

Update the Lambda function

This section explains the steps to update the Lambda function. We modify the ARN of Amazon SNS and the S3 output bucket name based on the resources created by AWS CloudFormation.

Complete the following steps:

  1. On the Lambda console, choose Functions in the navigation pane.
  2. Choose the function GlueDataQualityBlogAlertLambda-xxxx (created by the CloudFormation template in the previous step).
  3. Modify the values for sns_topic_arn and s3bucket with the corresponding values from the CloudFormation stack outputs for SNSTopicNameAlertNotification and DataQualityS3BucketNameOutputs, respectively.
  4. On the File menu, choose Save.
  5. Choose Deploy.

Now that we’ve updated the Lambda function, let’s check the EventBridge rule created by the CloudFormation template.

Review and analyze the EventBridge rule

This section explains the significance of the EventBridge rule and how rules use event patterns to select events and send them to specific targets. In this section, we create a rule with an event pattern set as Data Quality Evaluations Results Available and configure the target as a Lambda function.

  1. On the EventBridge console, choose Rules in the navigation pane.
  2. Choose the rule GlueDataQualityBlogEventBridge-xxxx.

On the Event pattern tab, we can review the source event pattern. Event patterns are based on the structure and content of the events generated by various AWS services or custom applications.

  1. We set the source as aws-glue-dataquality with the event pattern detail type Data Quality Evaluations Results Available.

On the Targets tab, you can review the specific actions or services that will be triggered when an event matches a specified pattern.

  1. Here, we configure EventBridge to invoke a specific Lambda function when an event matches the defined pattern.

This allows you to run serverless functions in response to events.
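For reference, the event pattern on the rule is conceptually similar to the following sketch, based on the source and detail type described above; verify the exact strings on the rule that the CloudFormation template created in your account:

{
  "source": ["aws-glue-dataquality"],
  "detail-type": ["Data Quality Evaluations Results Available"]
}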

Now that you understand the EventBridge rule, let’s review the AWS Glue Data Quality pipeline created by Step Functions.

Set up and deploy the Step Functions state machine

AWS CloudFormation created the StateMachineGlueDataQualityCustomBlog-xxxx state machine to orchestrate the evaluation of existing AWS Glue Data Quality rules, creation of custom rules if needed, and subsequent evaluation of the ruleset. Complete the following steps to configure and run the state machine:

  1. On the Step Functions console, choose State machines in the navigation pane.
  2. Open the state machine StateMachineGlueDataQualityCustomBlog-xxxx.
  3. Choose Edit.
  4. Modify row 80 with the IAM role ARN starting with GlueDataQualityBlogStepsFunctionRole-xxxx and choose Save.

Step Functions needs certain permissions (least privilege) to run the state machine and evaluate the AWS Glue Data Quality ruleset.
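The exact role is defined by the CloudFormation template; conceptually, it grants a narrow set of AWS Glue Data Quality actions along the lines of the following sketch (the action list is illustrative and resource scoping is omitted for brevity):

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:CreateDataQualityRuleset",
                "glue:GetDataQualityRuleset",
                "glue:ListDataQualityRulesets",
                "glue:StartDataQualityRulesetEvaluationRun",
                "glue:GetDataQualityRulesetEvaluationRun",
                "glue:GetDataQualityResult"
            ],
            "Resource": "*"
        }
    ]
}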

  1. Choose Start execution.
  2. Provide the following input:
    {
        "ruleset_name": "<AWS CloudFormation outputs key:GlueDataQualityCustomRulesetName>",
        "database_name": "<AWS CloudFormation outputs key:DataQualityDatabase>",
        "table_name": "<AWS CloudFormation outputs key:DataQualityTable>",
        "dq_output_location": "s3://<AWS CloudFormation outputs key:DataQualityS3BucketNameOutputs>/defaultlogs"
    }

This step assumes the existence of the ruleset and runs the workflow as depicted in the following screenshot. It runs the data quality ruleset evaluation and writes results to the S3 bucket.

If it doesn't find the ruleset name among the existing data quality rulesets, the state machine creates a custom ruleset for you and then performs the data quality ruleset evaluation; this custom ruleset creation is handled within the state machine definition.


State machine results and run options

The Step Functions state machine has run the AWS Glue Data Quality evaluation. Now EventBridge matches the pattern Data Quality Evaluations Results Available and triggers the Lambda function. The Lambda function writes customized AWS Glue Data Quality metrics results to the S3 bucket and sends an email notification via Amazon SNS.

The following sample email provides operational metrics for the AWS Glue Data Quality ruleset evaluation. It provides details about the ruleset name, the number of rules passed or failed, and the score. This helps you visualize the results of each rule along with the evaluation message if a rule fails.

You have the flexibility to choose between two run modes for the Step Functions workflow:

  • The first option is on-demand mode, where you manually trigger the Step Functions workflow whenever you want to initiate the AWS Glue Data Quality evaluation.
  • Alternatively, you can schedule the entire Step Functions workflow using EventBridge. With EventBridge, you can define a schedule or specific triggers to automatically initiate the workflow at predetermined intervals or in response to specific events. This automated approach reduces the need for manual intervention and streamlines the data quality evaluation process. For more details, refer to Schedule a Serverless Workflow.
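For example, a classic EventBridge rule can start the state machine on a fixed schedule; the names, ARNs, and input below are placeholders, and the target role must allow states:StartExecution on the state machine:

aws events put-rule \
 --name glue-dq-daily-evaluation \
 --schedule-expression "rate(1 day)"

aws events put-targets \
 --rule glue-dq-daily-evaluation \
 --targets '[{"Id": "glue-dq-state-machine", "Arn": "arn:aws:states:<region>:<account-id>:stateMachine:StateMachineGlueDataQualityCustomBlog-xxxx", "RoleArn": "arn:aws:iam::<account-id>:role/<eventbridge-to-stepfunctions-role>", "Input": "{\"ruleset_name\": \"<ruleset>\", \"database_name\": \"<database>\", \"table_name\": \"<table>\", \"dq_output_location\": \"s3://<bucket>/defaultlogs\"}"}]'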

Clean up

To avoid incurring future charges and to clean up unused roles and policies, delete the resources you created:

  1. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  2. Select your stack and delete it.

If you’re continuing to Part 5 in this series, you can skip this step.

Conclusion

In this post, we discussed three key steps that organizations can take to optimize data quality and reliability on AWS:

  • Create a CloudFormation template to ensure consistency and reproducibility in deploying AWS resources.
  • Integrate AWS Glue Data Quality ruleset evaluation and Lambda to automatically evaluate data quality and receive event-driven alerts and email notifications via Amazon SNS. This significantly enhances the accuracy and reliability of your data.
  • Use Step Functions to orchestrate AWS Glue Data Quality ruleset actions. You can create and evaluate custom and recommended rulesets, optimizing data quality and accuracy.

These steps form a comprehensive approach to data quality and reliability on AWS, helping organizations maintain high standards and achieve their goals.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality APIs. To learn more about AWS Glue Data Quality, check out the AWS Glue Data Quality Developer Guide.

If you require any assistance in constructing this pipeline within the AWS Lake Formation environment or if you have any inquiries regarding this post, please inform us in the comments section or initiate a new thread on the Lake Formation forum.


About the authors

Avik Bhattacharjee is a Senior Partner Solution Architect at AWS. He works with customers to build IT strategy, making digital transformation through the cloud more accessible, focusing on big data and analytics and AI/ML.

Amit Kumar Panda is a Data Architect at AWS Professional Services who is passionate about helping customers build scalable data analytics solutions to enable making critical business decisions.

Neel Patel is a software engineer working within GlueML. He has contributed to the AWS Glue Data Quality feature and hopes it will expand the repertoire for all AWS CloudFormation users along with displaying the power and usability of AWS Glue as a whole.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Set up advanced rules to validate quality of multiple datasets with AWS Glue Data Quality

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/set-up-advanced-rules-to-validate-quality-of-multiple-datasets-with-aws-glue-data-quality/

Data is the lifeblood of modern businesses. In today’s data-driven world, companies rely on data to make informed decisions, gain a competitive edge, and provide exceptional customer experiences. However, not all data is created equal. Poor-quality data can lead to incorrect insights, bad decisions, and lost opportunities.

AWS Glue Data Quality measures and monitors the quality of your dataset. It supports both data quality at rest and data quality in AWS Glue extract, transform, and load (ETL) pipelines. Data quality at rest focuses on validating the data stored in data lakes, databases, or data warehouses. It ensures that the data meets specific quality standards before it is consumed. Data quality in ETL pipelines, on the other hand, ensures the quality of data as it moves through the ETL process. It helps identify data quality issues during the ETL pipeline, allowing for early detection and correction of problems and prevents the failure of the data pipeline because of data quality issues.

This is Part 3 of a five-post series on AWS Glue Data Quality. In this post, we demonstrate the advanced data quality checks that you can typically perform when bringing data from a database to an Amazon Simple Storage Service (Amazon S3) data lake. Check out the other posts in this series:

Use case overview

Let’s consider an example use case where we have a database named classicmodels that contains retail data for a car dealership. This example database includes sample data for various entities, such as Customers, Products, ProductLines, Orders, OrderDetails, Payments, Employees, and Offices. You can find more details about this example database in MySQL Sample Database.

In this scenario, we assume the role of a data engineer who is responsible for building a data pipeline. The primary objective is to extract data from a relational database, specifically an Amazon RDS for MySQL database, and store it in Amazon S3, which serves as a data lake. After the data is loaded into the data lake, the data engineer is also responsible for performing data quality checks to ensure that the data in the data lake maintains its quality. To achieve this, the data engineer uses the newly launched AWS Glue Data Quality evaluation feature.

The following diagram illustrates the entity relationship model that describes the relationships between different tables. In this post, we use the employees, customers, and products table.

Solution overview

This solution focuses on transferring data from an RDS for MySQL database to Amazon S3 and performing data quality checks using the AWS Glue ETL pipeline and AWS Glue Data Catalog. The workflow involves the following steps:

  1. Data is extracted from the RDS for MySQL database using AWS Glue ETL.
  2. The extracted data is stored in Amazon S3, which serves as the data lake.
  3. The Data Catalog and AWS Glue ETL pipeline are utilized to validate the successful completion of data ingestion by performing data quality checks on the data stored in Amazon S3.

The following diagram illustrates the solution architecture.

To implement the solution, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Establish a connection to the RDS for MySQL instance from AWS Cloud9.
  3. Run an AWS Glue crawler on the RDS for MySQL database.
  4. Validate the Data Catalog.
  5. Run an AWS Glue ETL job to bring data from Amazon RDS for MySQL to Amazon S3.
  6. Evaluate the advanced data quality rules in the ETL job.
  7. Evaluate the advanced data quality rules in the Data Catalog.

Set up resources with AWS CloudFormation

This post includes a CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An RDS for MySQL database instance (source)
  • An S3 bucket for the data lake (destination)
  • An AWS Glue ETL job to bring data from source to destination
  • An AWS Glue crawler to crawl the RDS for MySQL databases and create a centralized Data Catalog
  • AWS Identity and Access Management (IAM) users and policies
  • An AWS Cloud9 environment to connect to the RDS DB instance and create a sample dataset
  • An Amazon VPC, public subnet, two private subnets, internet gateway, NAT gateway, and route tables

To launch the CloudFormation stack, complete the following steps:

  1. Sign in to the AWS CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseUserPassword, enter your preferred password.
  5. Choose Next.
  6. Scroll to the end and choose Next.
  7. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Establish a connection to the RDS for MySQL instance from AWS Cloud9

To connect to the RDS for MySQL instance, complete the following steps:

  1. On the AWS Cloud9 console, choose Open under Cloud9 IDE for your environment.
  2. Run the following command to the AWS Cloud9 terminal. Provide your values for the MySQL endpoint (located on the CloudFormation stack’s Outputs tab), database user name, and database user password:
    $ mysql --host=<MySQLEndpoint> --user=<DatabaseUserName> --password=<password>

  3. Download the SQL file.
  4. On the File menu, choose Upload from Local Files and upload the file to AWS Cloud9.
  5. Run the following SQL commands within the downloaded file:
    MySQL [(none)]> source mysqlsampledatabase.sql

  6. Retrieve a list of tables using the following SQL statement and make sure that eight tables are loaded successfully:
    use classicmodels;
    show tables;

Run an AWS Glue crawler on the RDS for MySQL database

To run your crawler, complete the following steps:

  1. On the AWS Glue console, choose Crawlers under Data Catalog in the navigation pane.
  2. Locate and run the crawler dq-rds-crawler.

The crawler will take a few minutes to crawl all the tables from the classicmodels database.

Validate the AWS Glue Data Catalog

To validate the Data Catalog when the crawler is complete, complete the following steps:

  1. On the AWS Glue console, choose Databases under Data Catalog in the navigation pane.
  2. Choose the mysql_private_classicmodels database.

You will be able to see all the RDS tables available under mysql_private_classicmodels.
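You can also confirm this from the command line; the database name comes from the crawler created by the CloudFormation template:

aws glue get-tables \
 --database-name mysql_private_classicmodels \
 --query 'TableList[].Name'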

Run an AWS Glue ETL job to bring data from Amazon RDS for MySQL to Amazon S3

To run your ETL job, complete the following steps:

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select dq-rds-to-s3 from the job list and choose Run job.

When the job is complete, you will be able to see three new tables under mysql_s3_db. It may take a few minutes to complete.

Now let’s dive into evaluating the data quality rules.

Evaluate the advanced data quality rules in the ETL job

In this section, we evaluate the results of different data quality rules.

ReferentialIntegrity

Let’s start with referential integrity. The ReferentialIntegrity data quality ruleset is currently supported in ETL jobs. This feature ensures that the relationships between tables in a database are maintained. It checks if the foreign key relationships between tables are valid and consistent, helping to identify any referential integrity violations.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. In AWS Glue Studio, select Visual with a blank canvas.
  3. Provide a name for your job; for example, RDS ReferentialIntegrity.
  4. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  5. For Name, enter a name for your data source; for example, employees.
  6. For Database, choose mysql_private_classicmodels.
  7. For Table, choose mysql_classicmodels_employees.
  8. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  9. For Name, enter a name for your data source; for example, customers.
  10. For Database, choose mysql_private_classicmodels.
  11. For Table, choose mysql_classicmodels_customers (the customers table).
  12. Choose the plus sign in the AWS Glue Studio canvas and on the Transform tab, choose Evaluate Data Quality.
  13. For Node parents, choose employees and customers.
  14. For Aliases for referenced data source, select Primary source for employees and for customers, enter the alias customers.

All other datasets are used as references to ensure that the primary dataset has good-quality data.

  1. Search for ReferentialIntegrity under Rule types and choose the plus sign to add an example ReferentialIntegrity rule.
  2. Replace the rule with the following code and keep the remaining options as default:
    Rules = [
        ReferentialIntegrity "employeenumber" "customers.salesRepEmployeeNumber" between 0.6 and 0.7
    ]

  3. Under Data quality action, select Publish results to Amazon CloudWatch and select Fail job without loading target data.
  4. On the Job details tab, choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  5. Choose Run and wait for the job to complete.

It will take a few minutes to complete.

  1. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

You can confirm if the job completed successfully and which data quality rules it passed. In this example, it indicates that 60–70% of EmployeeNumber from the employees table are present in the customers table.

You can identify which records failed the referential integrity using AWS Glue Studio. To learn more, refer to Getting started with AWS Glue Data Quality for ETL Pipelines.

Similarly, if you are checking if all the EmployeeNumber from the employees table are present in the customers table, you can pass the following rule:

Rules = [
    ReferentialIntegrity "employeenumber" "customers.salesRepEmployeeNumber" = 1
]

DatasetMatch

DatasetMatch compares two datasets to identify differences and similarities. You can use it to detect changes between datasets or to find duplicates, missing values, or inconsistencies across datasets.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. In AWS Glue Studio, select Visual with a blank canvas.
  3. Provide a name for your job; for example, RDS DatasetMatch.
  4. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  5. For Name, enter a name for your data source; for example, rds_employees_primary.
  6. For Database, choose mysql_private_classicmodels.
  7. For Table, choose mysql_classicmodels_employees.
  8. Choose the plus sign in the AWS Glue Studio canvas and on the Data tab, choose AWS Glue Data Catalog.
  9. For Name, enter a name for your data source; for example, s3_employees_reference.
  10. For Database, choose mysql_s3_db.
  11. For Table, choose s3_employees.
  12. Choose the plus sign in the AWS Glue Studio canvas and on the Transform tab, choose Evaluate Data Quality.
  13. For Node parents, choose rds_employees_primary and s3_employees_reference.
  14. For Aliases for referenced data source, select Primary source for rds_employees_primary and for s3_employees_reference, enter the alias reference.
  15. Replace the default example rules with the following code and keep the remaining options as default:
    Rules = [
        DatasetMatch "reference" "employeenumber,employeenumber" = 1
    ]

  16. On the Job details tab, choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  17. Choose Run and wait for the job to complete.
  18. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

In this example, it indicates both datasets are identical.

AggregateMatch

AggregateMatch verifies the accuracy of aggregated data. It compares the aggregated values in a dataset against the expected results to identify any discrepancies, such as incorrect sums, averages, counts, or other aggregate calculations. This is a performant option to evaluate if two datasets match at an aggregate level. For this rule, we clone the previous job we created for DatasetMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS DatasetMatch and on the Actions menu, choose Clone job.
  3. Change the job name to RDS AggregateMatch.
  4. Change the dataset rds_employees_primary to rds_products_primary and the table to mysql_classicmodels_products.
  5. Change the dataset s3_employees_reference to s3_products_reference and the table to s3_products.
  6. Choose Evaluate Data Quality, and under Node parents, choose rds_products_primary and s3_products_reference.
  7. Replace the rules with the following code:
    AggregateMatch "avg(MSRP)" "avg(reference.MSRP)" = 1

  8. Choose Run and wait for the job to complete.
  9. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

The results indicate that the avg(msrp) on both datasets is the same.

RowCountMatch

RowCountMatch checks the number of rows in a dataset and compares it to an expected count. It helps identify missing or extra rows in a dataset, ensuring data completeness. For this rule, we edit the job we created earlier for AggregateMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS AggregateMatch and on the Actions menu, choose Edit job.
  3. Choose Evaluate Data Quality and choose the plus sign next to RowCountMatch.
  4. Keep the default data quality rules and choose Save:
    RowCountMatch "reference" = 1.0

  5. Choose Run and wait for the job to complete.
  6. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

It shows that the DQ RowCountMatch rule failed, indicating a mismatch between the row count of the source RDS table and the target S3 table. Further investigation reveals that the ETL job ran four times for the Products table, and the row counts didn’t match.

SchemaMatch

SchemaMatch validates that the schemas of two datasets match. It checks whether the actual data types match the expected data types and flags any inconsistencies, such as a numeric column containing non-numeric values. For this rule, we edit the job we used for AggregateMatch.

  1. On the AWS Glue console, choose Visual ETL under ETL jobs in the navigation pane.
  2. Select RDS AggregateMatch and on the Actions menu, choose Edit job.
  3. Choose Evaluate Data Quality and choose the plus sign next to SchemaMatch.
  4. Update the default rules with the following code and save the job:
    SchemaMatch "reference" = 1.0

  5. Choose Run and wait for the job to complete.
  6. When the job is complete, navigate to the Data quality tab and locate the Data quality results section.

It should show a successful completion with a Rule passed status, indicating that the schemas of both datasets are identical.

Evaluate the advanced data quality rules in the Data Catalog

The AWS Glue Data Catalog also supports advanced data quality rules. For this post, we show one example of an aggregate match between Amazon S3 and Amazon RDS.

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose the mysql_private_classicmodels database to view the three tables created under it.
  3. Choose the mysql_classicmodels_products table.
  4. On the Data quality tab, choose Create data quality rules.
  5. Search for AggregateMatch and choose the plus sign to view the default example rule.
  6. Add the following rules:
    Rules = [
        AggregateMatch "avg(msrp)" "avg(mysql_s3_db.s3_products.msrp)" >= 0.9,
        ReferentialIntegrity "productname,productcode" "mysql_s3_db.s3_products.{productname,productcode}" = 1
        ]

In the AWS Glue ETL job, reference was the alias given to the secondary dataset. For the Data Catalog, you can use <database_name>.<table_name>.<column_name> to reference secondary datasets.

  1. Choose Save ruleset and provide the name production_catalog_dq_check.
  2. Choose GlueServiceRole-for-gluedq-blog for IAM role and keep the remaining options as default.
  3. Choose Run and wait for the data quality check to complete.

When the job is complete, you can confirm that both data quality checks passed.
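
If you prefer to script this step instead of using the console, you can start the same Data Catalog evaluation with the AWS Glue API. The following boto3 sketch assumes the database, table, ruleset, and IAM role names used in this post; the polling loop and printed fields are illustrative only.

import time
import boto3

glue = boto3.client("glue")

# Start an evaluation run of the catalog ruleset created above
run = glue.start_data_quality_ruleset_evaluation_run(
    DataSource={
        "GlueTable": {
            "DatabaseName": "mysql_private_classicmodels",
            "TableName": "mysql_classicmodels_products",
        }
    },
    Role="GlueServiceRole-for-gluedq-blog",  # role created for this post
    RulesetNames=["production_catalog_dq_check"],
)

# Poll until the run reaches a terminal state, then print each rule outcome
while True:
    status = glue.get_data_quality_ruleset_evaluation_run(RunId=run["RunId"])
    if status["Status"] not in ("STARTING", "RUNNING", "STOPPING"):
        break
    time.sleep(30)

for result_id in status.get("ResultIds", []):
    result = glue.get_data_quality_result(ResultId=result_id)
    for rule in result["RuleResults"]:
        print(rule["Name"], rule["Result"])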

With these advanced data quality features of AWS Glue Data Quality, you can enhance the reliability, accuracy, and consistency of your data, leading to better insights and decision-making.

Clean up

To clean up your resources, complete the following steps:

  1. Delete the AWS Glue job.
  2. Delete the CloudFormation stack.

Conclusion

Data quality refers to the accuracy, completeness, consistency, timeliness, and validity of the information being collected, processed, and analyzed. High-quality data is essential for businesses to make informed decisions, gain valuable insights, and maintain their competitive advantage. As data complexity increases, advanced rules are critical to handle complex data quality challenges. The rules we demonstrated in this post can help you manage the quality of data that lives in disparate data sources, providing you the capabilities to reconcile them. Try them out and provide your feedback on what other use cases you need to solve!


About the authors

Navnit Shukla is AWS Specialist Solutions Architect in Analytics. He is passionate about helping customers uncover insights from their data. He builds solutions to help organizations make data-driven decisions.

Rahul Sharma is a Software Development Engineer at AWS Glue. He focuses on building distributed systems to support features in AWS Glue. He has a passion for helping customers build data management solutions on the AWS Cloud.

Edward Cho is a Software Development Engineer at AWS Glue. He has contributed to the AWS Glue Data Quality feature as well as the underlying open-source project Deequ.

Shriya Vanvari is a Software Developer Engineer in AWS Glue. She is passionate about learning how to build efficient and scalable systems to provide better experience for customers. Outside of work, she enjoys reading and chasing sunsets.

Getting started with AWS Glue Data Quality from the AWS Glue Data Catalog

Post Syndicated from Stuti Deshpande original https://aws.amazon.com/blogs/big-data/getting-started-with-aws-glue-data-quality-from-the-aws-glue-data-catalog/

AWS Glue is a serverless data integration service that makes it simple to discover, prepare, and combine data for analytics, machine learning (ML), and application development. You can use AWS Glue to create, run, and monitor data integration and ETL (extract, transform, and load) pipelines and catalog your assets across multiple data stores.

Hundreds of thousands of customers use data lakes for analytics and ML to make data-driven business decisions. Data consumers lose trust in data if it isn’t accurate and recent, making data quality essential for undertaking optimal and correct decisions.

Evaluation of the accuracy and freshness of data is a common task for engineers. Currently, various tools are available to evaluate data quality. However, these tools often require manual processes of data discovery and expertise in data engineering and coding.

AWS Glue Data Quality is a new feature of AWS Glue that measures and monitors the data quality of Amazon Simple Storage Service (Amazon S3)-based data lakes, data warehouses, and other data repositories. AWS Glue Data Quality can be accessed in the AWS Glue Data Catalog and in AWS Glue ETL jobs.

This is Part 1 of a five-part series of posts to explain how AWS Glue Data Quality works. Check out the next posts in the series:

In this post, we explore using the AWS Glue Data Quality feature by generating data quality recommendations and running data quality evaluations on your table in the Data Catalog. Then we demonstrate how to analyze your AWS Glue Data Quality run results through Amazon Athena.

Solution overview

We guide you through the following steps:

  1. Provision resources with AWS CloudFormation.
  2. Explore the generated recommendation rulesets and define rulesets to evaluate your table in the Data Catalog.
  3. Review the AWS Glue Data Quality recommendations.
  4. Analyze your AWS Glue Data Quality evaluation results with Athena.
  5. Operationalize the solution by setting up alerts and notifications using integration with Amazon EventBridge and Amazon Simple Notification Service (Amazon SNS).

For this post, we use the NYC Taxi dataset yellow_tripdata_2022-01.parquet.

Set up resources with AWS CloudFormation

The provided CloudFormation template creates the following resources for you:

  • The AWS Identity and Access Management (IAM) role required to run AWS Glue Data Quality evaluations
  • An S3 bucket to store the NYC Taxi dataset
  • An S3 bucket to store and analyze the results of AWS Glue Data Quality evaluations
  • An AWS Glue database and table created from the NYC Taxi dataset

Launch your CloudFormation stack

To create your resources for this use case, complete the following steps:

  1. Launch your CloudFormation stack in us-east-1.
  2. Under Parameters:
    • For Stack name, proceed with the default value myDQStack.
    • For DataQualityDatabase, proceed with the default value data_quality_catalog.
    • For DataQualityS3BucketName, provide a bucket name of your choice.
    • For DataQualityTable, proceed with the default value data_quality_tripdata_table

  3. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

After the stack is successfully created, you can see all the resources created on the Resources tab.

  1. Navigate to the S3 bucket created by the stack and upload the yellow_tripdata_2022-01.parquet file.

Explore recommendation rulesets and define rulesets to evaluate your table

In this section, we generate data quality rule recommendations from AWS Glue Data Quality. We use these recommendations to run a data quality task against our dataset to obtain an analysis of our data.

Complete the following steps:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Tables.
  2. Choose the data_quality_tripdata_table table created via the CloudFormation stack.
  3. Choose the Data quality tab.

In this section, you will find a video to get you started with AWS Glue Data Quality. It also lists features, pricing, and documentation.

  1. Choose Create data quality rules.

This is the ruleset console. You will find a Request data quality rule recommendations banner at the top. AWS Glue will scan your data and automatically generate rule recommendations.

  1. Choose Recommend rules.
  2. For IAM role, choose the IAM role created as part of the CloudFormation template (GlueDataQualityBlogRole).
  3. Optionally, you can filter your data on column values before it is read. This feature is available for Amazon S3-based data sources.
  4. For Requested number of workers, allocate the number of workers to run the recommendation task. For this post, we use the default value of 5.
  5. For Task timeout, set the runtime for this task. For this post, we use the default of 120 minutes.
  6. Choose Recommend rules.

The recommendation task starts instantly, and you will observe the status at the top change to Starting.

Next, we add some of these recommended rules into our ruleset.

  1. When you see the recommendation run as Completed, choose Insert rule recommendations to select the rules that are recommended for you.

Make sure to place the cursor inside the brackets Rules = [ ].

  1. Select the following rules:
    • ColumnValues “VendorID” <=6
    • Completeness “passenger_count”>=0.97
  2. Choose Add selected rules.

You can see that these rules were automatically added to the ruleset.

Understanding AWS Glue Data Quality recommendations

AWS Glue Data Quality recommendations are suggestions generated by the AWS Glue Data Quality service and are based on the shape of your data. These recommendations automatically take into account aspects like row counts, mean, standard deviation, and so on, and generate a set of rules for you to use as a starting point.

The dataset used here was the NYC Taxi dataset. Based on this, the columns in this dataset, and the values of those columns, AWS Glue Data Quality recommends a set of rules. In total, the recommendation service automatically took into consideration all the columns of the dataset, and recommended 55 rules.

Some of these rules are:

  • ColumnValues “VendorID” <=6 – The ColumnValues rule type checks whether the values in a column comply with a given expression. This rule resolves to true if all values in the VendorID column are less than or equal to 6.
  • Completeness “passenger_count”>=0.97 – The Completeness rule type checks the percentage of complete (non-null) values in a column against a given expression. In this case, the rule checks if more than 97% of the values in a column are complete.

In addition to adding auto-generated recommendation rules, we manually add some rules to the ruleset. AWS Glue Data Quality provides some out-of-the-box rule types to choose from. For this post, we manually add the IsComplete rule for VendorID.

  1. In the left pane, on the Rule types tab, search IsComplete rule type and choose the plus sign next to IsComplete to add this rule.
  2. For the value within the quotation marks, enter VendorID.

Alternatively, you could navigate to the Schema tab and add IsComplete to VendorID.

  1. Choose Save ruleset.

Next, we add a CustomSql rule, which validates that no fares are charged for a trip with no passengers. This helps identify potentially fraudulent transactions where fare_amount > 0 and passenger_count = 0. The rule is:

CustomSql "select count(*) from primary where passenger_count=0 and fare_amount > 0" = 0

There are two ways to provide the table name:

  • You can use the keyword “primary” for the table under consideration
  • You can use the full path such as database_name.table_name
  1. On the Rule types tab, choose the plus sign next to CustomSQL and enter the SQL statement.

The final ruleset looks like the following screenshot.
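
Because the final ruleset is only shown as a screenshot, the following boto3 sketch illustrates what saving an equivalent ruleset programmatically could look like. The DQDL rules mirror the ones assembled above; the ruleset name is hypothetical, and the thresholds are the recommended values quoted earlier.

import boto3

glue = boto3.client("glue")

# DQDL rules assembled in this walkthrough: two recommended rules,
# the manually added IsComplete rule, and the CustomSql fraud check
dqdl = """
Rules = [
    ColumnValues "VendorID" <= 6,
    Completeness "passenger_count" >= 0.97,
    IsComplete "VendorID",
    CustomSql "select count(*) from primary where passenger_count=0 and fare_amount > 0" = 0
]
"""

glue.create_data_quality_ruleset(
    Name="tripdata-dq-ruleset",  # hypothetical ruleset name
    Description="Ruleset for data_quality_tripdata_table",
    Ruleset=dqdl,
    TargetTable={
        "DatabaseName": "data_quality_catalog",
        "TableName": "data_quality_tripdata_table",
    },
)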

  1. Choose Save ruleset.
  2. For Ruleset name, enter a name.
  3. Choose Save ruleset.
  4. On the ruleset page, choose Run.
  5. For IAM role, choose GlueDataQualityBlogRole.
  6. Select Publish run metrics to Amazon CloudWatch.
  7. For Data quality result location, enter the S3 bucket location for the data quality results, which is already created for you as part of the CloudFormation template (for this post, data-quality-tripdata-results).
  8. For Run frequency, choose On demand.
  9. Expand Additional configurations.
  10. For Requested number of workers, enter 5.
  11. Leave the remaining fields as is and choose Run.

The status changes to Starting.

  1. When it’s complete, choose the ruleset and choose Run history.
  2. Choose the run ID to find more about the run details.

Under Data quality result, you will also observe whether the result shows as DQ passed or DQ failed.

In the Evaluation run details section, you will find all the details about the data quality task run and rules that passed or failed. You can either view these results by navigating to the S3 bucket or downloading the results. Observe that the data quality task failed because one of the rules failed.

In the first section, AWS Glue Data Quality suggested 55 rules, based on the column values and the data within our NYC Taxi dataset. We selected a few of those rules into a ruleset and ran an AWS Glue Data Quality evaluation task using our ruleset against our dataset. In our results, we see the status of each rule within the run details of the data quality task.

You can also utilize the AWS Glue Data Quality APIs to carry out these steps.
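
For example, the recommendation run described above can be scripted with boto3. The sketch below assumes the database, table, and IAM role created by the CloudFormation stack; replace the role value with its full ARN if your setup requires it.

import time
import boto3

glue = boto3.client("glue")

# Kick off a rule recommendation run against the Data Catalog table
run = glue.start_data_quality_rule_recommendation_run(
    DataSource={
        "GlueTable": {
            "DatabaseName": "data_quality_catalog",
            "TableName": "data_quality_tripdata_table",
        }
    },
    Role="GlueDataQualityBlogRole",  # role from the CloudFormation stack
    NumberOfWorkers=5,
)

# Wait for the run to reach a terminal state, then print the recommended DQDL
while True:
    status = glue.get_data_quality_rule_recommendation_run(RunId=run["RunId"])
    if status["Status"] not in ("STARTING", "RUNNING", "STOPPING"):
        break
    time.sleep(30)

print(status.get("RecommendedRuleset", "No ruleset recommended"))

You can then edit the printed DQDL, save it with CreateDataQualityRuleset, and start evaluations with StartDataQualityRulesetEvaluationRun, mirroring the console workflow.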

Analyze your AWS Glue Data Quality evaluation results with Athena

If you have multiple AWS Glue Data Quality evaluation results against a dataset, you might want to track the trends of the dataset’s quality over a period of time. To achieve this, we can export our AWS Glue Data Quality evaluation results to Amazon S3 and use Athena to run analytical queries against the exported results. You could further use the results in Amazon QuickSight to build dashboards that provide a graphical representation of your data quality trends.

In Part 3 of this series, we show the steps needed to start tracking data on your dataset’s quality.

For our data quality runs that we set up in the previous sections, we set the Data quality results location parameter to the bucket location specified by the CloudFormation stack. After each successful run, you should see a JSON Lines (JSONL) results file exported to your selected S3 location, corresponding to that particular run.

Complete the following steps to analyze the data:

  1. Navigate to Amazon Athena and on the console, navigate to Query Editor.
  2. Run the following CREATE TABLE statement (replace <my_table_name> with a relevant value of your choice, and <GlueDataQualityResultsS3Bucket_from_cfn> with the name of the S3 bucket that stores the data quality results; the bucket name has the trailing keyword results, for example <given-name-results>; for this post, it is data-quality-tripdata-results):
    CREATE EXTERNAL TABLE `<my_table_name>`(
    `catalogid` string,
    `databasename` string,
    `tablename` string,
    `dqrunid` string,
    `evaluationstartedon` timestamp,
    `evaluationcompletedon` timestamp,
    `rule` string,
    `outcome` string,
    `failurereason` string,
    `evaluatedmetrics` string)
    PARTITIONED BY (
    `year` string,
    `month` string,
    `day` string)
    ROW FORMAT SERDE
    'org.openx.data.jsonserde.JsonSerDe'
    WITH SERDEPROPERTIES (
    'paths'='catalogId,databaseName,dqRunId,evaluatedMetrics,evaluationCompletedOn,evaluationStartedOn,failureReason,outcome,rule,tableName')
    STORED AS INPUTFORMAT
    'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT
    'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
    's3://<GlueDataQualityResultsS3Bucket_from_cfn>/'
    TBLPROPERTIES (
    'classification'='json',
    'compressionType'='none',
    'typeOfData'='file');
    MSCK REPAIR TABLE `<my_table_name>`;

After you create the table, you should be able to run queries to analyze your data quality results.

For example, consider the following query, which shows the AWS Glue Data Quality evaluations that passed against the table data_quality_tripdata_table within a certain time window. Use the datetime values from the evaluationcompletedon column of the data quality results table you created earlier (<my_table_name>) to specify the values for parse_datetime() in the following query:

SELECT * from `<my_table_name>`
WHERE "outcome" = 'Passed'
AND "tablename" = 'data_quality_tripdata_table'
AND "evaluationcompletedon" between
parse_datetime('2023-05-12 00:00:00:000', 'yyyy-MM-dd HH:mm:ss:SSS') AND parse_datetime('2023-05-12 21:16:21:804', 'yyyy-MM-dd HH:mm:ss:SSS');

The output of the preceding query shows us details about all the runs with “outcome” = ‘Passed’ that ran against the NYC Taxi dataset table (“tablename” = ‘data_quality_tripdata_table’). The output also provides details about the rules passed and evaluated metrics.

As you can see, we are able to get detailed information about our AWS Glue Data Quality evaluations via the results uploaded to Amazon S3 and perform more detailed analysis.
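
You can also drive these queries programmatically, for example to chart pass/fail counts per day. The following boto3 sketch is a minimal example; the results table name, database, and query output location are placeholders you would replace with your own values.

import time
import boto3

athena = boto3.client("athena")

QUERY = """
SELECT date(evaluationcompletedon) AS run_date,
       outcome,
       count(*) AS rule_count
FROM <my_table_name>
GROUP BY date(evaluationcompletedon), outcome
ORDER BY run_date
"""

execution = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": "default"},  # database holding <my_table_name>
    ResultConfiguration={"OutputLocation": "s3://<athena-query-results-bucket>/"},
)

# Poll for completion, then print each result row
query_id = execution["QueryExecutionId"]
while True:
    state = athena.get_query_execution(QueryExecutionId=query_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(2)

rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
for row in rows[1:]:  # skip the header row
    print([col.get("VarCharValue") for col in row["Data"]])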

Set up alerts and notifications using EventBridge and Amazon SNS

Alerts and notifications are important for data quality to enable timely and effective responses to data quality issues that arise in the dataset. By setting up alerts and notifications, you can proactively monitor the data quality and be alerted as soon as any data quality issues are detected. This reduces the risk of making decisions based on incorrect information.

AWS Glue Data Quality also offers integration with EventBridge for alerting and notification by triggering an AWS Lambda function that sends a customized SNS notification when the AWS Glue Data Quality ruleset evaluation is complete. Now you can receive event-driven alerts and email notifications via Amazon SNS. This integration significantly enhances the accuracy and reliability of data.
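
As a rough sketch of that integration, the Lambda function below publishes the event detail it receives from EventBridge to an SNS topic. The event fields referenced here (rulesetNames, state, score) and the TOPIC_ARN environment variable are assumptions; map them to the actual Glue Data Quality event payload and your own topic.

import json
import os
import boto3

sns = boto3.client("sns")
TOPIC_ARN = os.environ["TOPIC_ARN"]  # SNS topic to notify, configured on the function

def handler(event, context):
    # EventBridge invokes this function when a Glue Data Quality evaluation
    # completes; the detail fields below are illustrative placeholders.
    detail = event.get("detail", {})
    message = {
        "rulesetNames": detail.get("rulesetNames"),
        "state": detail.get("state"),
        "score": detail.get("score"),
    }
    sns.publish(
        TopicArn=TOPIC_ARN,
        Subject="AWS Glue Data Quality evaluation completed",
        Message=json.dumps(message, indent=2),
    )
    return {"status": "notified"}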

Clean up

To clean up your resources, complete the following steps:

  1. On the Athena console, delete the table created for data quality analysis.
  2. On the CloudWatch console, delete the alarms created.
  3. If you deployed the sample CloudFormation stack, delete the stack via the AWS CloudFormation console. You will need to empty the S3 bucket before you delete the bucket.
  4. If you enabled your AWS Glue Data Quality runs to output to Amazon S3, empty those buckets as well.

Conclusion

In this post, we talked about the ease and speed of incorporating data quality rules using AWS Glue Data Quality into your Data Catalog tables. We also talked about how to run recommendations and evaluate data quality against your tables. We then discussed analyzing the data quality results via Athena, and discussed integrations with EventBridge and Amazon SNS for alerts and notifications to get notified for data quality issues.

To dive into the AWS Glue Data Quality APIs, refer to Data Quality API documentation. To learn more about AWS Glue Data Quality, check out AWS Glue Data Quality.


About the authors

Stuti Deshpande is an Analytics Specialist Solutions Architect at AWS. She works with customers around the globe, providing them strategic and architectural guidance on implementing analytics solutions using AWS. She has extensive experience in Big Data, ETL, and Analytics. In her free time, Stuti likes to travel, learn new dance forms, and enjoy quality time with family and friends.

Aniket Jiddigoudar is a Big Data Architect on the AWS Glue team. He works with customers to help improve their big data workloads. In his spare time, he enjoys trying out new food, playing video games, and kickboxing.

Joseph Barlan is a Frontend Engineer at AWS Glue. He has over 5 years of experience helping teams build reusable UI components and is passionate about frontend design systems. In his spare time, he enjoys pencil drawing and binge watching tv shows.

Jesus Max Hernandez is a Software Development Engineer at AWS Glue. He joined the team in August after graduating from The University of Texas at El Paso. Outside of work, you can find him practicing guitar or playing softball in Central Park.

Divya Gaitonde is a UX designer at AWS Glue. She has over 8 years of experience driving impact through data-driven products and seamless experiences. Outside of work, you can find her catching up on reading or people watching at a museum.

Enable complex row-level security in embedded dashboards for non-provisioned users in Amazon QuickSight with OR-based tags

Post Syndicated from Srikanth Baheti original https://aws.amazon.com/blogs/big-data/enable-complex-row-level-security-in-embedded-dashboards-for-non-provisioned-users-in-amazon-quicksight-with-or-based-tags/

Amazon QuickSight is a fully managed, cloud-native business intelligence (BI) service that makes it easy to connect to your data, create interactive dashboards, and share these with tens of thousands of users, both within QuickSight and embedded in your software as a service (SaaS) applications.

QuickSight Enterprise edition started supporting nested conditions within row-level security (RLS) tags where you can combine AND and OR conditions to simplify multi-tenant access patterns. Previously, QuickSight only supported the AND operator for all tags. When users are assigned multiple roles, which enables them to view data in multiple dimensions, you need both AND and OR operators to express RLS rules. QuickSight enables authors and developers to use the OR operator in the form of OR of AND, which allows you to satisfy even the most complex data security scenarios. In this post, we look at how this can be implemented.

Feature overview

When you embed QuickSight dashboards in your application for users who aren’t provisioned (registered) in QuickSight, this is called anonymous embedding. In this scenario, even though the user is anonymous to QuickSight, you can still customize the data that user sees in the dashboard using RLS tags.

You can do this in three simple steps:

  1. Add RLS tags to a dataset.
  2. Add the OR condition to RLS tags.
  3. Assign values to those tags at runtime using the GenerateEmbedUrlForAnonymousUser API operation. For more information, see Embedding QuickSight data dashboards for anonymous (unregistered) users.

To see this feature in action, see Using tag-based rules.

Use case overview

AnyHealth Inc. is a fictitious independent software vendor (ISV) in the healthcare space. They have a SaaS application for different hospitals across different regions of the country to manage their revenue. AnyHealth Inc has thousands of healthcare employees accessing their application portal. Part of their application portal has embedded operational insights related to their business within a QuickSight dashboard. AnyHealth doesn’t want to manage their users in QuickSight separately, and wants to secure data based on who the user is and the hospital the user is affiliated to. AnyHealth decided to authorize data access to their users at runtime, enabling row-level security using tags.

AnyHealth has hospitals (North Hospital, South Hospital, and Downtown Hospital) in regions Central, East, South, and West.

In this example, the following users access AnyHealth’s application with the embedded dashboard. Each user has a certain level of data restriction that defines what they can access in the dashboards. PowerUser is a super user that can see the data for all hospitals and regions.

AnyHealth’s application users and their data restrictions are as follows:

Application User | Hospital | Region | Condition | Payor | State
NorthMedicaidUser | North Hospital | Central and East | OR | Medicaid | New York
SouthMedicareUser | South Hospital | South | OR | Medicare | All states
NorthAdmin | North Hospital | All regions | | |
SouthAdmin | South Hospital | All regions | | |
PowerUser | All hospitals | All regions | | |

These users are only application-level users and haven’t been provisioned in QuickSight. AnyHealth wants to continue with user management and their roles at the application level as a single source of truth. This way, when the user accesses the embedded QuickSight dashboard from the application, AnyHealth must secure the data on the dashboard based on the roles and permissions that user has. AnyHealth has different combinations of user permissions; for example, all AnyHealth administrators have access to all the data that can be achieved by PowerUser permissions. A hospital admin, for example NorthAdmin, is a user who is the administrator at North Hospital and can only view all the data related to that hospital. A hospital user, for example SouthUser, is a user who has access to data at South Hospital in a specific region.

Additionally, when there are Medicaid and Medicare claims, there are special users who monitor these programs. For example, there can be a user at North Hospital who has access to all the data in North Hospital in regions Central and East. But this user also manages Medicaid for New York. In this case, to show all the relevant data, RLS rules have to be defined such that the user can see data where (Hospital = North Hospital and Region in (Central, East)) or (payor = Medicaid and State = New York). This can be achieved with the new RLS with OR tags feature in QuickSight.

Solution overview

Setup involves two steps:

  1. Create tag keys.
  2. Set SessionTags for each user.

Create tag keys

AnyHealth creates tag keys on the dataset they’re using to power the dashboard. This can be done in two ways, either through an UpdateDataset API call or through the QuickSight console.

Configuration using the API

In the UpdateDataset API call, the RowLevelPermissionTagConfiguration element is set as follows. Note that the tags within a single item in TagRuleConfigurations are always combined with a logical AND when the rules are evaluated, and if the list contains more than one item, the items are combined with a logical OR. We use the following sample configuration to address our use case (a boto3 sketch of the corresponding API call follows the configuration):

"RowLevelPermissionTagConfiguration": {
            "Status": "ENABLED",
            "TagRules": [
                {
                    "TagKey": "region",
                    "ColumnName": "Region",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "hospital",
                    "ColumnName": "Hospital",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "payor",
                    "ColumnName": "Payor Segment",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                },
                {
                    "TagKey": "state",
                    "ColumnName": "State",
                    "TagMultiValueDelimiter": ",",
                    "MatchAllValue": "*"
                }
            ],
            "TagRuleConfigurations": [
                [
                    "region",
                    "hospital"
                ],
                [
                    "payor",
                    "state"
                ]
            ]
        }
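
As a minimal boto3 sketch of the API route, you can re-send the dataset’s existing definition along with the new tag configuration through UpdateDataSet. The account ID and dataset ID are placeholders, and this assumes a boto3 version recent enough to support TagRuleConfigurations; the console route described next achieves the same result.

import boto3

quicksight = boto3.client("quicksight")
ACCOUNT_ID = "<aws-account-id>"
DATASET_ID = "<patientinfo-dataset-id>"

# Fetch the existing dataset definition so the update call can re-send
# the required fields unchanged alongside the new tag configuration
current = quicksight.describe_data_set(
    AwsAccountId=ACCOUNT_ID, DataSetId=DATASET_ID
)["DataSet"]

quicksight.update_data_set(
    AwsAccountId=ACCOUNT_ID,
    DataSetId=DATASET_ID,
    Name=current["Name"],
    PhysicalTableMap=current["PhysicalTableMap"],
    LogicalTableMap=current["LogicalTableMap"],
    ImportMode=current["ImportMode"],
    RowLevelPermissionTagConfiguration={
        "Status": "ENABLED",
        "TagRules": [
            {"TagKey": "region", "ColumnName": "Region", "TagMultiValueDelimiter": ",", "MatchAllValue": "*"},
            {"TagKey": "hospital", "ColumnName": "Hospital", "TagMultiValueDelimiter": ",", "MatchAllValue": "*"},
            {"TagKey": "payor", "ColumnName": "Payor Segment", "TagMultiValueDelimiter": ",", "MatchAllValue": "*"},
            {"TagKey": "state", "ColumnName": "State", "TagMultiValueDelimiter": ",", "MatchAllValue": "*"},
        ],
        "TagRuleConfigurations": [
            ["region", "hospital"],
            ["payor", "state"],
        ],
    },
)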

Configuration using the QuickSight console

To use the QuickSight console, complete the following steps:

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose the dataset from the list to apply tag-based RLS tags (for this post, we use the patientinfo dataset).
  3. Choose Edit under Row-level security.
  4. On the Set up row-level security page, expand Tag-based rules.
  5. To begin adding rules, choose a column on the Column drop-down menu under Manage tags.
  6. Create rules as per the permissions table.

To grant access to QuickSight provisioned users, you still need to configure user-based rules.

  1. Repeat these steps to add the required tags.
  2. After all the tags are added, choose Add OR Condition under Manage rules.
  3. Choose your tags for the OR condition and choose Update.

Note that you need to explicitly update the first condition, which automatically applies AND across all the fields you added.

  1. Once the rules are created, choose Apply.

Set SessionTags

At runtime, when embedding the dashboards via the GenerateEmbedUrlForAnonymousUser API, set SessionTags for each user (a boto3 sketch follows the tag examples below).

SessionTags for NorthAdmin are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "North Hospital"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for SouthAdmin are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "South Hospital"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for PowerUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "*"
        },
        {
            "Key": "region",
            "Value": "*"
        }
    ]
}

SessionTags for NorthMedicaidUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "North Hospital"
        },
        {
            "Key": "region",
            "Value": "East"
        }, 
        {
            "Key": "payor",
            "Value": "Medicaid"
        },
        {
            "Key": "state",
            "Value": "New York"
        }
    ]
}

SessionTags for SouthMedicareUser are as follows:

{
    "SessionTags": [
        {
            "Key": "hospital",
            "Value": "South Hospital"
        },
        {
            "Key": "region",
            "Value": "South"
        }, 
        {
            "Key": "payor",
            "Value": "Medicare"
        },
        {
            "Key": "state",
            "Value": "*"
        }
    ]
}
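
For reference, the following boto3 sketch shows how one of these tag sets (NorthMedicaidUser) is passed at embed time; the account ID, dashboard ARN, and dashboard ID are placeholders.

import boto3

quicksight = boto3.client("quicksight")

# Session tags for NorthMedicaidUser, matching the JSON above
session_tags = [
    {"Key": "hospital", "Value": "North Hospital"},
    {"Key": "region", "Value": "East"},
    {"Key": "payor", "Value": "Medicaid"},
    {"Key": "state", "Value": "New York"},
]

response = quicksight.generate_embed_url_for_anonymous_user(
    AwsAccountId="<aws-account-id>",
    Namespace="default",
    AuthorizedResourceArns=["<dashboard-arn>"],
    ExperienceConfiguration={"Dashboard": {"InitialDashboardId": "<dashboard-id>"}},
    SessionTags=session_tags,
    SessionLifetimeInMinutes=60,
)

embed_url = response["EmbedUrl"]  # embed this URL in the application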

The following screenshot shows what NorthMedicaidUser sees: data for North Hospital in the East region or Medicaid claims in New York state.

The following screenshot shows what SouthMedicareUser sees: data for South Hospital in the South region or Medicare claims in all states.

Based on session tags with OR of AND’s support, AnyHealth has secured data on the embedded dashboards such that each user only sees specific data based on their access. You can access the dashboard as one of the users (by changing the user on the drop-down menu on the top right) and see how the data changes based on the user selected.

Overall, with row-level security using OR of AND, AnyHealth is able to provide a compelling analytics experience within their SaaS application, while making sure that each user only sees the appropriate data without having to provision and manage users in QuickSight. QuickSight provides a highly scalable, secure analytics option that you can set up and roll out to production in days, instead of weeks or months previously.

Conclusion

The combination of embedding dashboards for users not provisioned in QuickSight and row-level security using tags with OR of AND enables developers and ISVs to quickly set up sophisticated, customized analytics for their application users—all without any infrastructure setup or user management, while scaling to millions of users. For more updates from QuickSight embedded analytics, see What’s New in the Amazon QuickSight User Guide.

If you have any questions or feedback, please leave a comment. For additional discussions and help getting answers to your questions, check out the QuickSight Community.


About the Authors

Srikanth Baheti is a Specialized World Wide Principal Solution Architect for Amazon QuickSight. He started his career as a consultant and worked for multiple private and government organizations. Later he worked for PerkinElmer Health and Sciences & eResearch Technology Inc, where he was responsible for designing and developing high traffic web applications, highly scalable and maintainable data pipelines for reporting platforms using AWS services and Serverless computing.

Raji Sivasubramaniam is a Sr. Solutions Architect at AWS, focusing on Analytics. Raji is specialized in architecting end-to-end Enterprise Data Management, Business Intelligence and Analytics solutions for Fortune 500 and Fortune 100 companies across the globe. She has in-depth experience in integrated healthcare data and analytics with wide variety of healthcare datasets including managed market, physician targeting and patient analytics.

Mayank Agarwal is a product manager for Amazon QuickSight, AWS’ cloud-native, fully managed BI service. He focuses on embedded analytics and developer experience. He started his career as an embedded software engineer developing handheld devices. Prior to QuickSight he was leading engineering teams at Credence ID, developing custom mobile embedded device and web solutions using AWS services that make biometric enrollment and identification fast, intuitive, and cost-effective for Government sector, healthcare and transaction security applications.

Real-time inference using deep learning within Amazon Kinesis Data Analytics for Apache Flink

Post Syndicated from Jeremy Ber original https://aws.amazon.com/blogs/big-data/real-time-inference-using-deep-learning-within-amazon-kinesis-data-analytics-for-apache-flink/

Apache Flink is a framework and distributed processing engine for stateful computations over data streams. Amazon Kinesis Data Analytics for Apache Flink is a fully managed service that enables you to use an Apache Flink application to process streaming data. The Deep Java Library (DJL) is an open-source, high-level, engine-agnostic Java framework for deep learning.

In this blog post, we demonstrate how you can use DJL within Kinesis Data Analytics for Apache Flink for real-time machine learning inference. Real-time inference can be valuable in a variety of applications and industries where it is essential to make predictions or take actions based on new data as quickly as possible with low latencies. We show how to load a pre-trained deep learning model from the DJL model zoo into a Flink job and apply the model to classify data objects in a continuous data stream. The DJL model zoo includes a wide variety of pre-trained models for image classification, semantic segmentation, speech recognition, text embedding generation, question answering, and more. It supports HuggingFace, Pytorch, MXNet, and TensorFlow model frameworks and also helps developers create and publish their own models. We will focus on image classification and use a popular classifier called ResNet-18 to produce predictions in real time. The model has been pre-trained on ImageNet with 1.2 million images belonging to 1,000 class labels.

We provide sample code, architecture diagrams, and an AWS CloudFormation template so you can follow along and employ ResNet-18 as your classifier to make real-time predictions. The solution we provide here is a powerful design pattern for continuously producing ML-based predictions on streaming data within Kinesis Data Analytics for Apache Flink. You can adapt the provided solution for your use case and choose an alternative model from the model zoo or even provide your own model.

Image classification is a classic problem that takes an image and selects the best-fitting class, such as whether the image from an autonomous driving system is that of a bicycle or a pedestrian. Common use cases for real-time inference on streams of images include classifying images from vehicle cameras and license plate recognition systems, and classifying images uploaded to social media and ecommerce websites. The use cases typically need low latency while handling high throughput and potentially bursty streams. For example, in ecommerce websites, real-time classification of uploaded images can help in marking pictures of banned goods or hazardous materials that have been supplied by sellers. Immediate determination through streaming inference is needed to trigger alerts and follow-up actions to prevent these images from being part of the catalog. This enables faster decision-making compared to batch jobs that run on a periodic basis. The data stream pipeline can involve multiple models for different purposes, such as classifying uploaded images into ecommerce categories of electronics, toys, fashion, and so on.

Solution overview

The following diagram illustrates the workflow of our solution.

Architecture diagram: a Kinesis Data Analytics for Apache Flink application reads images from an Amazon S3 bucket, classifies them, and writes the results to another S3 bucket called "classifications".

The application performs the following steps:

  1. Read in images from Amazon Simple Storage Service (Amazon S3) using the Apache Flink Filesystem File Source connector.
  2. Window the images into a collection of records.
  3. Classify the batches of images using the DJL image classification class.
  4. Write inference results to Amazon S3 at the path specified.

Images should be of a reasonable size so that they fit into a Kinesis Processing Unit. Images larger than 50 MB may result in latency in processing and classification.

The main class for this Apache Flink job is located at src/main/java/com.amazon.embeddedmodelinference/EMI.java. Here you can find the main() method and entry point to our Flink job.

Prerequisites

To get started, configure the following prerequisites on your local machine:

Once this is set up, you can clone the code base to access the source code for this solution. The Java application code for this example is available on GitHub. To download the application code, clone the remote repository using the following command:

git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples

Find and navigate to the folder of the image classification example, called image-classification.

An example set of images to stream and test the code is available in the imagenet-sample-images folder.

Let’s walk through the code step by step.

Test on your local machine

If you would like to test this application locally on your machine, ensure you have AWS credentials set up locally on your machine. Additionally, download the Flink S3 Filesystem Hadoop JAR to use with your Apache Flink installation and place it in a folder named plugins/s3 in the root of your project. Then configure the following environment variables either on your IDE or in your machine’s local variable scope:

IS_LOCAL=true;
plugins.dir=<<path-to-flink-s3-fs-hadoop jar>>
s3.access.key=<<aws access key>>
s3.secret.key=<<aws secret key>>

Replace these values with your own.

After configuring the environment variables and downloading the necessary plugin JAR, let’s look at the code.

In the main method, after setting up our StreamExecutionEnvironment, we define our FileSource to read files from Amazon S3. By default, this source operator reads from a sample bucket. You can replace this bucket name with your own by changing the variable called bucket, or setting the application property on Kinesis Data Analytics for Apache Flink once deployed.

final FileSource<StreamedImage> source =
FileSource.forRecordStreamFormat(new ImageReaderFormat(), new Path(s3SourcePath))
               .monitorContinuously(Duration.ofSeconds(10))
               .build();

The FileSource is configured to read in files in the ImageReaderFormat, and will check Amazon S3 for new images every 10 seconds. This can be configured as well.

After we have read in our images, we convert our FileSource into a stream that can be processed:

DataStream<StreamedImage> stream =
env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");

Next, we create a tumbling window of a variable time window duration, specified in the configuration, defaulting to 60 seconds. Every window close creates a batch (list) of images to be classified using a ProcessWindowFunction.

This ProcessWindowFunction calls the classifier predict function on the list of images and returns the best probability of classification from each image. This result is then sent back to the Flink operator, where it’s promptly written out to the S3 bucket path of your configuration.

.process(new ProcessWindowFunction<StreamedImage, String, String, TimeWindow>() {
                    @Override
                    public void process(String s,
                                        ProcessWindowFunction<StreamedImage, String, String, TimeWindow>.Context context,
                                        Iterable<StreamedImage> iterableImages,
                                        Collector<String> out) throws Exception {


                            List<Image> listOfImages = new ArrayList<Image>();
                            iterableImages.forEach(x -> {
                                listOfImages.add(x.getImage());
                            });
                        try
                        {
                            // batch classify images
                            List<Classifications> list = classifier.predict(listOfImages);
                            for (Classifications classifications : list) {
                                Classifications.Classification cl = classifications.best();
                                String ret = cl.getClassName() + ": " + cl.getProbability();
                                out.collect(ret);
                            }
                        } catch (ModelException | IOException | TranslateException e) {
                            logger.error("Failed predict", e);
                        }
                        }
                    });

In Classifier.java, we read the image and apply crop, transpose, reshape, and finally convert to an N-dimensional array that can be processed by the deep learning model. Then we feed the array to the model and apply a forward pass. During the forward pass, the model computes the neural network layer by layer. At last, the output object contains the probabilities for each image object that the model is being trained on. We map the probabilities with the object name and return to the map function.

Deploy the solution with AWS CloudFormation

To run this code base on Kinesis Data Analytics for Apache Flink, we have a helpful CloudFormation template that will spin up the necessary resources. Simply open AWS CloudShell or your local machine’s terminal and enter the following commands. Complete the following steps to deploy the solution:

  1. If you don’t have the AWS Cloud Development Kit (AWS CDK) bootstrapped in your account, run the following command, providing your account number and current Region:
cdk bootstrap aws://ACCOUNT-NUMBER/REGION

The script will clone a GitHub repo of images to classify and upload them to your source S3 bucket. Then it will launch the CloudFormation stack given your input parameters.

  1. Enter the following code and replace the BUCKET variables with your own source bucket and sink bucket, which will contain the source images and the classifications, respectively:
export SOURCE_BUCKET=s3://SAMPLE-BUCKET/PATH;
export SINK_BUCKET=s3://SAMPLE_BUCKET/PATH;
git clone https://github.com/EliSchwartz/imagenet-sample-images; cd imagenet-sample-images;
aws s3 cp . $SOURCE_BUCKET --recursive --exclude "*/";
aws cloudformation create-stack --stack-name KDAImageClassification --template-url https://aws-blogs-artifacts-public.s3.amazonaws.com/artifacts/BDB-3098/BlogStack.template.json --parameters ParameterKey=inputBucketPath,ParameterValue=$SOURCE_BUCKET ParameterKey=outputBucketPath,ParameterValue=$SINK_BUCKET --capabilities CAPABILITY_IAM;

This CloudFormation stack creates the following resources:

    • A Kinesis Data Analytics application with 1 Kinesis Processing Unit (KPU) preconfigured with some application properties
    • An S3 bucket for your output results
  1. When the stack is complete, navigate to the Kinesis Data Analytics for Apache Flink console.
  2. Find the application called blog-DJL-flink-ImageClassification-application and choose Run.
  3. On the Amazon S3 console, navigate to the bucket you specified in the outputBucketPath variable.

If you have readable images in the source bucket listed, you should see classifications of those images within the checkpoint interval of the running application.

Deploy the solution manually

If you prefer to use your own code base, you can follow the manual steps in this section:

  • After you clone the application locally, create your application JAR by navigating to the directory that contains your pom.xml and running the following command:
mvn clean package

This builds your application JAR in the target/ directory called embedded-model-inference-1.0-SNAPSHOT.jar.


  1. Upload this application JAR to an S3 bucket, either the one created from the CloudFormation template, or another one to store code artifacts.
  2. You can then configure your Kinesis Data Analytics application to point to this newly uploaded S3 JAR file.
  3. This is also a great opportunity to configure your runtime properties, as shown in the following screenshot.
  4. Choose Run to start your application.

You can open the Apache Flink Dashboard to check for application exceptions or to see data flowing through the tasks defined in the code.


Validate the results

To validate our results, let’s check the results in Amazon S3 by navigating to the Amazon S3 console and finding our S3 bucket. We can find the output in a folder called output-kda.


When we choose one of the data-partitioned folders, we can see partition files. Ensure that there is no underscore in front of your part file, because this indicates that the results are still being finalized according to the rollover interval defined in Apache Flink’s FileSink connector. After the underscores have disappeared, we can use Amazon S3 Select to view our data.


We now have a solution that continuously performs classification on incoming images in real time using Kinesis Data Analytics for Apache Flink. It extracts a pre-trained classification model (ResNet-18) from the DJL model zoo, applies some preprocessing, loads the model into a Flink operator’s memory, and continuously applies the model for online predictions on streaming images.

Although we used ResNet-18 in this post, you can choose another model by modifying the classifier. The DJL model zoo provides many other models, both for classification and other applications, that can be used out of the box. You can also load your custom model by providing an S3 link or URL to the criteria. DJL supports models in a large number of engines such as PyTorch, ONNX, TensorFlow, and MXNet. Using a model in the solution is relatively simple. All of the preprocessing and postprocessing code is encapsulated in the (built-in) translator, so all we have to do is load the model, create a predictor, and call predict(). This is done within the data source operator, which processes the stream of input data and sends the links to the data to the inference operator where the model you selected produces the prediction. Then the sink operator writes the results.

The CloudFormation template in this example focused on a simple 1 KPU application. You could extend the solution to further scale out to large models and high-throughput streams, and support multiple models within the pipeline.

Clean up

To clean up the CloudFormation script you launched, complete the following steps:

  1. Empty the source bucket you specified in the bash script.
  2. On the AWS CloudFormation console, locate the CloudFormation template called KDAImageClassification.
  3. Delete the stack, which will remove all of the remaining resources created for this post.
  4. You may optionally delete the bootstrapping CloudFormation template, CDKToolkit, which you launched earlier as well.

Conclusion

In this post, we presented a solution for real-time classification using the Deep Java Library within Kinesis Data Analytics for Apache Flink. We shared a working example and discussed how you can adapt the solution for your specific ML use case. If you have any feedback or questions, please leave them in the comments section.


About the Authors

Jeremy Ber has been working in the telemetry data space for the past 9 years as a Software Engineer, Machine Learning Engineer, and most recently a Data Engineer. At AWS, he is a Streaming Specialist Solutions Architect, supporting both Amazon Managed Streaming for Apache Kafka (Amazon MSK) and AWS’s managed offering for Apache Flink.

Deepthi Mohan is a Principal Product Manager for Amazon Kinesis Data Analytics, AWS’s managed offering for Apache Flink.

Gaurav Rele is a Data Scientist at the Amazon ML Solution Lab, where he works with AWS customers across different verticals to accelerate their use of machine learning and AWS Cloud services to solve their business challenges.

Configure Amazon OpenSearch Service for high availability

Post Syndicated from Rohin Bhargava original https://aws.amazon.com/blogs/big-data/configure-amazon-opensearch-service-for-high-availability/

Amazon OpenSearch Service is a fully managed service for OpenSearch, an open-source search and analytics engine that securely unlocks real-time search, monitoring, and analysis of business and operational data for use cases like recommendation engines, ecommerce sites, and catalog search. To be successful in your business, you need your systems to be highly available and performant, minimizing downtime and avoiding failure. When you use OpenSearch Service as your primary means of monitoring your infrastructure, you need to ensure its availability as well. Downtime for OpenSearch Service can have a significant effect on your business outcomes, such as loss of revenue, loss of productivity, loss of brand value, and more.

The industry standard for measuring availability is the number of nines. OpenSearch Service provides three 9s (99.9%) of availability when you follow best practices, which means less than 43.83 minutes of downtime a month. In this post, you will learn how you can configure your OpenSearch Service domain for high availability and performance by following best practices and recommendations while setting up your domain.

There are two essential elements that influence your domain’s availability: the resource utilization of your domain, which is mostly driven by your workload, and external events such as infrastructure failures. Although the former can be controlled through continuous monitoring of the domain’s performance and health and scaling the domain accordingly, the latter cannot. To mitigate the impact of external events such as an Availability Zone outage, instance or disk failure, or networking issues on your domain, you must provision additional capacity, distributed over multiple Availability Zones, and keep multiple copies of data. Failure to do so may result in degraded performance, unavailability, and, in the worst-case situation, data loss.

Let’s look at the options available to you to ensure that your domain is available and performant.

Cluster configuration

In this section, we discuss the configuration options for setting up your cluster properly, including specifying the number of Availability Zones for the deployment, setting up the cluster manager and data nodes, and setting up indexes and shards.

Multi-AZ deployment

Data nodes are responsible for processing indexing and search requests in your domain. Deploying your data nodes across multiple Availability Zones improves the availability of your domain by adding redundant, per-zone data storage and processing. With a Multi-AZ deployment, your domain can remain available even when a full Availability Zone becomes unavailable. For production workloads, AWS recommends using three Availability Zones for your domain; for Regions that support only two Availability Zones, use both. This ensures that your domain remains available in the event of a single-AZ failure.

Dedicated cluster manager (master nodes)

AWS recommends using three dedicated cluster manager (CM) nodes for all production workloads. CM nodes track the cluster's health, the state and location of its indexes and shards, and the mapping for all the indexes; monitor the availability of the data nodes; and maintain a list of cluster-level tasks in process. Without dedicated CM nodes, the cluster uses data nodes for cluster management, which makes the cluster vulnerable to workload demands. You should size CM nodes based on the size of the task—primarily, the data node counts, the index counts, and the shard counts. OpenSearch Service always deploys CM nodes across three Availability Zones when supported by the Region (two in one Availability Zone and one in another if the Region has only two Availability Zones). For a running domain, only one of the three CM nodes works as an elected leader. The other two CM nodes participate in an election if the elected CM node fails.

The following table shows AWS’s recommendations for CM sizing. CM nodes do work based on the number of nodes, indexes, shards, and mapping. The more work, the more compute and memory you need to hold and work with the cluster state.

Instance Count Cluster Manager Node RAM Size Maximum Supported Shard Count Recommended Minimum Dedicated Cluster Manager Instance Type
1–10 8 GiB 10,000 m5.large.search or m6g.large.search
11–30 16 GiB 30,000 c5.2xlarge.search or c6g.2xlarge.search
31–75 32 GiB 40,000 c5.4xlarge.search or c6g.4xlarge.search
76–125 64 GiB 75,000 r5.2xlarge.search or r6g.2xlarge.search
126–200 128 GiB 75,000 r5.4xlarge.search or r6g.4xlarge.search
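
Putting the Multi-AZ and dedicated cluster manager recommendations together, the following is a minimal boto3 sketch of a three-zone domain with three dedicated CM nodes; the domain name, engine version, instance types, and EBS settings are illustrative placeholders you should size for your own workload.

import boto3

opensearch = boto3.client("opensearch")

# Placeholder domain name, engine version, and instance types; size these for your workload
response = opensearch.create_domain(
    DomainName="my-ha-domain",
    EngineVersion="OpenSearch_2.11",
    ClusterConfig={
        "InstanceType": "r6g.large.search",
        "InstanceCount": 6,                      # data nodes spread evenly over three zones
        "ZoneAwarenessEnabled": True,
        "ZoneAwarenessConfig": {"AvailabilityZoneCount": 3},
        "DedicatedMasterEnabled": True,          # three dedicated cluster manager nodes
        "DedicatedMasterType": "m6g.large.search",
        "DedicatedMasterCount": 3,
    },
    EBSOptions={"EBSEnabled": True, "VolumeType": "gp3", "VolumeSize": 100},
)
print(response["DomainStatus"]["ARN"])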

Indexes and shards

An index is a logical construct that houses a collection of documents. You partition your index for parallel processing by specifying a primary shard count, where shards represent a physical unit for storing and processing data. In OpenSearch Service, a shard can be either a primary shard or a replica shard. You use replicas for durability—if the primary shard is lost, OpenSearch Service promotes one of the replicas to primary—and for improving search throughput. OpenSearch Service ensures that the primary and replica shards are placed in different nodes and across different Availability Zones, if deployed in more than one Availability Zone. For high availability, AWS recommends configuring at least two replicas for each index in a three-zone setup to avoid disruption in performance and availability. In a Multi-AZ setup, if a node fails or in the rare worst case an Availability Zone fails, you will still have a copy of the data.
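
For example, you can set two replicas on an existing index through the index settings API. The following is a minimal sketch using the Python requests library; the endpoint, index name, and credentials are placeholders, and in production you would typically sign requests with SigV4 or use fine-grained access control.

import requests

# Placeholder endpoint, index, and credentials for a domain with fine-grained access control
host = "https://my-domain.us-east-1.es.amazonaws.com"
index = "my-index"
auth = ("master-user", "master-password")

# Two replicas means each primary shard has a copy in each of the other two Availability Zones
response = requests.put(
    f"{host}/{index}/_settings",
    auth=auth,
    json={"index": {"number_of_replicas": 2}},
)
print(response.status_code, response.json())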

Cluster monitoring and management

As discussed earlier, selecting your configuration based on best practices is only half the job. We also need to continuously monitor the resource utilization and performance to determine if the domain needs to be scaled. An under-provisioned or over-utilized domain can result in performance degradation and eventually unavailability.

CPU utilization

You use the CPU in your domain to run your workload. As a general rule, you should target 60% average CPU utilization for any data node, with peaks at 80%, and tolerate small spikes to 100%. When you consider availability, and especially considering the unavailability of a full zone, there are two scenarios. If you have two Availability Zones, then each zone handles 50% of the traffic. If a zone becomes unavailable, the other zone will take all of that traffic, doubling CPU utilization. In that case, you need to be at around 30–40% average CPU utilization in each zone to maintain availability. If you are running three Availability Zones, each zone is taking 33% of the traffic. If a zone becomes unavailable, each other zone will gain approximately 17% traffic. In this case, you should target 50–60% average CPU utilization.

Memory utilization

OpenSearch Service supports two types of garbage collection. The first is G1 garbage collection (G1GC), which is used by OpenSearch Service nodes, powered by AWS Graviton 2. The second is Concurrent Mark Sweep (CMS), which is used by all nodes powered by other processors. Out of all the memory allocated to a node, half of the memory (up to 32 GB) is assigned to the Java heap, and the rest of the memory is used by other operating system tasks, the file system cache, and so on. To maintain availability for a domain, we recommend keeping the max JVM utilization at around 80% in CMS and 95% in G1GC. Anything beyond that would impact the availability of your domain and make your cluster unhealthy. We also recommend enabling auto-tune, which actively monitors the memory utilization and triggers the garbage collector.

Storage utilization

OpenSearch Service publishes several guidelines for sizing of domains. We provide an empirical formula so that you can determine the right amount of storage required for your requirements. However, it’s important to keep an eye out for the depletion of storage with time and changes in workload characteristics. To ensure the domain doesn’t run out of storage and can continue to index data, you should configure Amazon CloudWatch alarms and monitor your free storage space.
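
The following is a minimal boto3 sketch of such alarms, covering the CPU and free storage metrics discussed in this section; the domain name, account ID, SNS topic, and thresholds are placeholders to adapt to your own domain.

import boto3

cloudwatch = boto3.client("cloudwatch")

# Placeholder values: replace with your domain name, account ID, and notification topic
domain_name = "my-ha-domain"
account_id = "111122223333"
sns_topic_arn = "arn:aws:sns:us-east-1:111122223333:opensearch-alerts"
dimensions = [
    {"Name": "DomainName", "Value": domain_name},
    {"Name": "ClientId", "Value": account_id},
]

# Alarm when average CPU stays at or above 80% for 15 minutes
cloudwatch.put_metric_alarm(
    AlarmName=f"{domain_name}-high-cpu",
    Namespace="AWS/ES",
    MetricName="CPUUtilization",
    Dimensions=dimensions,
    Statistic="Average",
    Period=300,
    EvaluationPeriods=3,
    Threshold=80,
    ComparisonOperator="GreaterThanOrEqualToThreshold",
    AlarmActions=[sns_topic_arn],
)

# Alarm when the minimum free storage on any node drops below ~20 GB (the metric is in megabytes)
cloudwatch.put_metric_alarm(
    AlarmName=f"{domain_name}-low-free-storage",
    Namespace="AWS/ES",
    MetricName="FreeStorageSpace",
    Dimensions=dimensions,
    Statistic="Minimum",
    Period=300,
    EvaluationPeriods=1,
    Threshold=20480,
    ComparisonOperator="LessThanOrEqualToThreshold",
    AlarmActions=[sns_topic_arn],
)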

AWS also recommends choosing a primary shard count so that each shard is within an optimal size band. You can determine the optimal shard size through proof-of-concept testing with your data and traffic. We use 10–30 GB primary shard sizes for search use cases and 45–50 GB primary shard sizes for log analytics use cases as a guideline. Because shards are the workers in your domain, they are directly responsible for the distribution of the workload across the data nodes. If your shards are too large, you may see stress in your Java heap from large aggregations, worse query performance, and worse performance on cluster-level tasks like shard rebalancing, snapshots, and hot-to-warm migrations. If your shards are too small, they can overwhelm the domain’s Java heap space, worsen query performance through excessive internal networking, and make cluster-level tasks slow. We also recommend keeping the number of shards per node proportional to the heap available (half of the instance’s RAM up to 32 GB)—25 shards per GB of Java heap. This makes a practical limit of 1,000 shards on any data node in your domain.
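
As a quick, illustrative piece of arithmetic (the sizes below are made up), you can derive primary and total shard counts from the expected index size and the target shard size:

# Illustrative shard-count arithmetic using the guideline sizes above
index_size_gb = 600        # expected size of a log analytics index
target_shard_gb = 50       # guideline shard size for log analytics use cases
replica_count = 2          # recommended for a three-zone setup

primary_shards = -(-index_size_gb // target_shard_gb)       # ceiling division -> 12
total_shards = primary_shards * (1 + replica_count)         # 12 primaries + 24 replicas = 36
print(primary_shards, total_shards)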

Conclusion

In this post, you learned various tips and best practices to set up a highly available domain using OpenSearch Service, which help you keep your domain performant and available by running it across three Availability Zones.

Stay tuned for a series of posts focusing on the various features and functionalities with OpenSearch Service. If you have feedback about this post, submit it in the comments section. If you have questions about this post, start a new thread on the OpenSearch Service forum or contact AWS Support.


About the authors

Rohin Bhargava is a Sr. Product Manager with the Amazon OpenSearch Service team. His passion at AWS is to help customers find the correct mix of AWS services to achieve success for their business goals.

Prashant Agrawal is a Sr. Search Specialist Solutions Architect with Amazon OpenSearch Service. He works closely with customers to help them migrate their workloads to the cloud and helps existing customers fine-tune their clusters to achieve better performance and save on cost. Before joining AWS, he helped various customers use OpenSearch and Elasticsearch for their search and log analytics use cases. When not working, you can find him traveling and exploring new places. In short, he likes doing Eat → Travel → Repeat.

Automate alerting and reporting for AWS Glue job resource usage

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/automate-alerting-and-reporting-for-aws-glue-job-resource-usage/

Data transformation plays a pivotal role in providing the necessary data insights for businesses in any organization, small and large. To gain these insights, customers often perform ETL (extract, transform, and load) jobs from their source systems and output an enriched dataset. Many organizations today are using AWS Glue to build ETL pipelines that bring data from disparate sources and store the data in repositories like a data lake, database, or data warehouse for further consumption. These organizations are looking for ways they can reduce cost across their IT environments and still be operationally performant and efficient.

Picture a scenario where you, the VP of Data and Analytics, are in charge of your data and analytics environments and workloads running on AWS where you manage a team of data engineers and analysts. This team is allowed to create AWS Glue for Spark jobs in development, test, and production environments. During testing, one of the jobs wasn’t configured to automatically scale its compute resources, resulting in jobs timing out, costing the organization more than anticipated. The next steps usually include completing an analysis of the jobs, looking at cost reports to see which account generated the spike in usage, going through logs to see what happened with the job, and so on. After the ETL job has been corrected, you may want to implement monitoring and set standard alert thresholds for your AWS Glue environment.

This post will help organizations proactively monitor and cost optimize their AWS Glue environments by providing an easier path for teams to measure the efficiency of their ETL jobs and align configuration details according to organizational requirements. Included is a solution you will be able to deploy that will notify your team via email about any AWS Glue job that has been configured incorrectly. Additionally, a weekly report is generated and sent via email that aggregates resource usage and provides cost estimates per job.

AWS Glue cost considerations

AWS Glue for Apache Spark jobs are provisioned with a number of workers and a worker type. These jobs can use the G.1X, G.2X, G.4X, G.8X, or Z.2X (Ray) worker types, which map to data processing units (DPUs). DPUs include a certain amount of CPU, memory, and disk space. The following table contains more details.

Worker Type DPUs vCPUs Memory (GB) Disk (GB)
G.1X 1 4 16 64
G.2X 2 8 32 128
G.4X 4 16 64 256
G.8X 8 32 128 512
Z.2X 2 8 32 128

For example, if a job is provisioned with 10 workers of the G.1X worker type, the job will have access to 40 vCPUs and 160 GB of RAM to process data, and double that when using G.2X. Over-provisioning workers can lead to increased cost because not all workers are utilized efficiently.
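
As a rough illustration of how worker type and count translate into cost, the following sketch estimates the DPU-hours for a single run; the per-DPU-hour price is an assumption, so check the AWS Glue pricing page for your Region.

# Back-of-the-envelope cost estimate for one AWS Glue job run (prices vary by Region)
workers = 10
dpus_per_worker = 1          # G.1X = 1 DPU per worker; G.2X = 2, G.4X = 4, G.8X = 8
runtime_hours = 0.5          # a 30-minute run
price_per_dpu_hour = 0.44    # assumed price; confirm on the AWS Glue pricing page

dpu_hours = workers * dpus_per_worker * runtime_hours
estimated_cost = dpu_hours * price_per_dpu_hour
print(f"{dpu_hours} DPU-hours, estimated cost ${estimated_cost:.2f}")   # 5.0 DPU-hours, $2.20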

In April 2022, Auto Scaling for AWS Glue was released for AWS Glue version 3.0 and later, which includes AWS Glue for Apache Spark and streaming jobs. Enabling auto scaling on your AWS Glue for Apache Spark jobs allows you to only allocate workers as needed, up to the worker maximum you specify. We recommend enabling auto scaling for your AWS Glue 3.0 and 4.0 jobs because this feature helps reduce cost and optimize your ETL jobs.

Amazon CloudWatch metrics are also a great way to monitor your AWS Glue environment by creating alarms for certain metrics like average CPU or memory usage. To learn more about how to use CloudWatch metrics with AWS Glue, refer to Monitoring AWS Glue using Amazon CloudWatch metrics.

The following solution provides a simple way to set AWS Glue worker and job duration thresholds, configure monitoring, and receive emails for notifications on how your AWS Glue environment is performing. If a Glue job finishes and detects worker or job duration thresholds were exceeded, it will notify you after the job run has completed, failed, or timed out.
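
The following is a simplified sketch of the evaluation logic such a Lambda function can apply to the EventBridge Glue Job State Change event; the environment variable names, SNS topic ARN, and thresholds are illustrative and don't mirror the deployed solution exactly.

import os
import boto3

glue = boto3.client("glue")
sns = boto3.client("sns")

# Illustrative configuration; the deployed solution reads thresholds set at deployment time
WORKER_THRESHOLD = int(os.environ.get("GLUE_JOB_WORKER_THRESHOLD", "10"))
DURATION_THRESHOLD_MIN = int(os.environ.get("GLUE_JOB_DURATION_THRESHOLD", "480"))
TOPIC_ARN = os.environ.get("SNS_TOPIC_ARN", "arn:aws:sns:us-east-1:111122223333:glue-alerts")

def handler(event, context):
    """Evaluate an AWS Glue Job State Change event against the configured thresholds."""
    detail = event["detail"]
    job_name, run_id, state = detail["jobName"], detail["jobRunId"], detail["state"]

    run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
    workers = run.get("NumberOfWorkers", 0)
    duration_min = run.get("ExecutionTime", 0) / 60   # ExecutionTime is reported in seconds

    if state in ("FAILED", "TIMEOUT") or workers > WORKER_THRESHOLD or duration_min > DURATION_THRESHOLD_MIN:
        sns.publish(
            TopicArn=TOPIC_ARN,
            Subject=f"AWS Glue job alert: {job_name}",
            Message=(
                f"Run {run_id} finished in state {state} with {workers} workers "
                f"after {duration_min:.1f} minutes."
            ),
        )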

Solution overview

The following diagram illustrates the solution architecture.

Solution Architecture

When you deploy this application via AWS Serverless Application Model (AWS SAM), it will ask what AWS Glue worker and job duration thresholds you would like to set to monitor the AWS Glue for Apache Spark and AWS Glue for Ray jobs running in that account. The solution will use these values as the decision criteria when invoked. The following is a breakdown of each step in the architecture:

  1. Any AWS Glue for Apache Spark job that succeeds, fails, stops, or times out is sent to Amazon EventBridge.
  2. EventBridge picks up the event from AWS Glue and triggers an AWS Lambda function.
  3. The Lambda function processes the event and determines if the data and analytics team should be notified about the particular job run. The function performs the following tasks:
    1. The function sends an email using Amazon Simple Notification Service (Amazon SNS) if needed.
      • If the AWS Glue job succeeded or was stopped without going over the worker or job duration thresholds, or is tagged to not be monitored, no alerts or notifications are sent.
      • If the job succeeded but exceeded the worker or job duration thresholds, or the job either failed or timed out, Amazon SNS sends a notification to the designated email with information about the AWS Glue job, run ID, and reason for alerting, along with a link to the specific run ID on the AWS Glue console.
    2. The function logs the job run information to Amazon DynamoDB for a weekly aggregated report delivered to email. The Dynamo table has Time to Live enabled for 7 days, which keeps the storage to minimum.
  4. Once a week, the data within DynamoDB is aggregated by a separate Lambda function with meaningful information like longest-running jobs, number of retries, failures, timeouts, cost analysis, and more.
  5. Amazon Simple Email Service (Amazon SES) is used to deliver the report because it can be better formatted than using Amazon SNS. The email is formatted via HTML output that provides tables for the aggregated job run data.
  6. The data and analytics team is notified about the ongoing job runs through Amazon SNS, and they receive the weekly aggregation report through Amazon SES.

Note that AWS Glue Python shell and streaming ETL jobs are not supported because they’re not in scope of this solution.

Prerequisites

You must have the following prerequisites:

  • An AWS account to deploy the solution to
  • Proper AWS Identity and Access Management (IAM) privileges to create the resources
  • The AWS SAM CLI installed, to build and deploy the solution in your AWS environment

Deploy the solution

This AWS SAM application provisions the following resources:

  • Two EventBridge rules
  • Two Lambda functions
  • An SNS topic and subscription
  • A DynamoDB table
  • An SES subscription
  • The required IAM roles and policies

To deploy the AWS SAM application, complete the following steps:

Clone the aws-samples GitHub repository:

git clone https://github.com/aws-samples/aws-glue-job-tracker.git

Deploy the AWS SAM application:

cd aws-glue-job-tracker
sam deploy --guided

sam deploy configuration

Provide the following parameters:

  • GlueJobWorkerThreshold – Enter the maximum number of workers you want an AWS Glue job to be able to run with before sending a threshold alert. The default is 10. An alert will be sent if a Glue job runs with more workers than specified.
  • GlueJobDurationThreshold – Enter the maximum duration in minutes you want an AWS Glue job to run before sending a threshold alert. The default is 480 minutes (8 hours). An alert will be sent if a Glue job runs longer than the specified duration.
  • GlueJobNotifications – Enter an email or distribution list of those who need to be notified through Amazon SNS and Amazon SES. You can go to the SNS topic after the deployment is complete and add emails as needed.

To receive emails from Amazon SNS and Amazon SES, you must confirm your subscriptions. After the stack is deployed, check your email that was specified in the template and confirm by choosing the link in each message. When the application is successfully provisioned, it will begin monitoring your AWS Glue for Apache Spark job environment. The next time a job fails, times out, or exceeds a specified threshold, you will receive an email via Amazon SNS. For example, the following screenshot shows an SNS message about a job that succeeded but had a job duration threshold violation.

You might have jobs that need to run at a higher worker or job duration threshold, and you don’t want the solution to evaluate them. You can simply tag that job with the key remediate and the value false, as shown in the following sketch. The step function will still be invoked, but will use the PASS state when it recognizes the tag. For more information on job tagging, refer to AWS tags in AWS Glue.
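
For example, you could add the tag with the AWS CLI or, as sketched below, with boto3; the job ARN is a placeholder.

import boto3

glue = boto3.client("glue")

# Placeholder ARN; replace the Region, account ID, and job name with your own
job_arn = "arn:aws:glue:us-east-1:111122223333:job/my-nightly-etl"

# Tag the job so the monitoring solution skips threshold evaluation for it
glue.tag_resource(ResourceArn=job_arn, TagsToAdd={"remediate": "false"})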

Adding tags to glue job configuration

Configure weekly reporting

As mentioned previously, when an AWS Glue for Apache Spark job succeeds, fails, times out, or is stopped, EventBridge forwards this event to Lambda, where it logs specific information about each job run. Once a week, a separate Lambda function queries DynamoDB and aggregates your job runs to provide meaningful insights and recommendations about your AWS Glue for Apache Spark environment. This report is sent via email with a tabular structure as shown in the following screenshot. It’s meant for top-level visibility so you’re able to see your longest job runs over time, jobs that have had many retries, failures, and more. It also provides an overall cost calculation as an estimate of what each AWS Glue job will cost for that week. It should not be used as a guaranteed cost. If you would like to see exact cost per job, the AWS Cost and Usage Report is the best resource to use. The following screenshot shows one table (of five total) from the AWS Glue report function.

weekly report

Clean up

If you don’t want to run the solution anymore, delete the AWS SAM application for each account that it was provisioned in. To delete your AWS SAM stack, run the following command from your project directory:

sam delete

Conclusion

In this post, we discussed how you can monitor and cost-optimize your AWS Glue job configurations to comply with organizational standards and policy. This method can provide cost controls over AWS Glue jobs across your organization. Some other ways to help control the costs of your AWS Glue for Apache Spark jobs include the newly released AWS Glue Flex jobs and Auto Scaling. We also provided an AWS SAM application as a solution to deploy into your accounts. We encourage you to review the resources provided in this post to continue learning about AWS Glue. To learn more about monitoring and optimizing for cost using AWS Glue, please visit this recent blog. It goes in depth on all of the cost optimization options and includes a template that builds a CloudWatch dashboard for you with metrics about all of your Glue job runs.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

Improve operational efficiencies of Apache Iceberg tables built on Amazon S3 data lakes

Post Syndicated from Avijit Goswami original https://aws.amazon.com/blogs/big-data/improve-operational-efficiencies-of-apache-iceberg-tables-built-on-amazon-s3-data-lakes/

Apache Iceberg is an open table format for large datasets in Amazon Simple Storage Service (Amazon S3) and provides fast query performance over large tables, atomic commits, concurrent writes, and SQL-compatible table evolution. When you build your transactional data lake using Apache Iceberg to solve your functional use cases, you need to focus on operational use cases for your S3 data lake to optimize the production environment. Some of the important non-functional use cases for an S3 data lake that organizations are focusing on include storage cost optimizations, capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake, and handling increased Amazon S3 request rates.

In this post, we show you how to improve operational efficiencies of your Apache Iceberg tables built on Amazon S3 data lake and Amazon EMR big data platform.

Optimize data lake storage

One of the major advantages of building modern data lakes on Amazon S3 is it offers lower cost without compromising on performance. You can use Amazon S3 Lifecycle configurations and Amazon S3 object tagging with Apache Iceberg tables to optimize the cost of your overall data lake storage. An Amazon S3 Lifecycle configuration is a set of rules that define actions that Amazon S3 applies to a group of objects. There are two types of actions:

  • Transition actions – These actions define when objects transition to another storage class; for example, Amazon S3 Standard to Amazon S3 Glacier.
  • Expiration actions – These actions define when objects expire. Amazon S3 deletes expired objects on your behalf.

Amazon S3 uses object tagging to categorize storage, where each tag is a key-value pair. From an Apache Iceberg perspective, it supports custom Amazon S3 object tags that can be added to S3 objects while writing to and deleting from the table. You can also configure a tag-based object lifecycle policy at the bucket level to transition objects to different Amazon S3 tiers. With the s3.delete.tags config property in Iceberg, objects are tagged with the configured key-value pairs before deletion. When the catalog property s3.delete-enabled is set to false, the objects are not hard-deleted from Amazon S3. This is expected to be used in combination with Amazon S3 delete tagging, so objects are tagged and removed using an Amazon S3 lifecycle policy. The s3.delete-enabled property is set to true by default.

The example notebook in this post shows an example implementation of S3 object tagging and lifecycle rules for Apache Iceberg tables to optimize storage cost.

Implement business continuity

Amazon S3 gives any developer access to the same highly scalable, reliable, fast, inexpensive data storage infrastructure that Amazon uses to run its own global network of web sites. Amazon S3 is designed for 99.999999999% (11 9’s) of durability, S3 Standard is designed for 99.99% availability, and Standard – IA is designed for 99.9% availability. Still, to make your data lake workloads highly available in an unlikely outage situation, you can replicate your S3 data to another AWS Region as a backup. With S3 data residing in multiple Regions, you can use an S3 multi-Region access point as a solution to access the data from the backup Region. With Amazon S3 multi-Region access point failover controls, you can route all S3 data request traffic through a single global endpoint and directly control the shift of S3 data request traffic between Regions at any time. During a planned or unplanned regional traffic disruption, failover controls let you control failover between buckets in different Regions and accounts within minutes. Apache Iceberg supports access points to perform S3 operations by specifying a mapping of bucket to access points. We include an example implementation of an S3 access point with Apache Iceberg later in this post.

Increase Amazon S3 performance and throughput

Amazon S3 supports a request rate of 3,500 PUT/COPY/POST/DELETE or 5,500 GET/HEAD requests per second per prefix in a bucket. The resources for this request rate aren’t automatically assigned when a prefix is created. Instead, as the request rate for a prefix increases gradually, Amazon S3 automatically scales to handle the increased request rate. For certain workloads that need a sudden increase in the request rate for objects in a prefix, Amazon S3 might return 503 Slow Down errors, also known as S3 throttling. It does this while it scales in the background to handle the increased request rate. Also, if supported request rates are exceeded, it’s a best practice to distribute objects and requests across multiple prefixes. Implementing this solution to distribute objects and requests across multiple prefixes involves changes to your data ingress or data egress applications. Using the Apache Iceberg file format for your S3 data lake can significantly reduce the engineering effort by enabling the ObjectStoreLocationProvider feature, which adds a hash prefix (in the range [0, 0x7FFFFFFF]) to your specified S3 object path.

Iceberg by default uses the Hive storage layout, but you can switch it to use the ObjectStoreLocationProvider. This option is not enabled by default to provide flexibility to choose the location where you want to add the hash prefix. With ObjectStoreLocationProvider, a deterministic hash is generated for each stored file and a subfolder is appended right after the S3 folder specified using the parameter write.data.path (write.object-storage.path for Iceberg version 0.12 and below). This ensures that files written to Amazon S3 are equally distributed across multiple prefixes in your S3 bucket, thereby minimizing throttling errors. In the following example, we set the write.data.path value as s3://my-table-data-bucket, and Iceberg-generated S3 hash prefixes will be appended after this location:

CREATE TABLE my_catalog.my_ns.my_table (
    id bigint,
    data string,
    category string)
USING iceberg
OPTIONS (
    'write.object-storage.enabled'=true,
    'write.data.path'='s3://my-table-data-bucket')
PARTITIONED BY (category);

Your S3 files will be arranged under MURMUR3 S3 hash prefixes like the following:

2021-11-01 05:39:24 809.4 KiB 7ffbc860/my_ns/my_table/00328-1642-5ce681a7-dfe3-4751-ab10-37d7e58de08a-00015.parquet
2021-11-01 06:00:10 6.1 MiB 7ffc1730/my_ns/my_table/00460-2631-983d19bf-6c1b-452c-8195-47e450dfad9d-00001.parquet
2021-11-01 04:33:24 6.1 MiB 7ffeeb4e/my_ns/my_table/00156-781-9dbe3f08-0a1d-4733-bd90-9839a7ceda00-00002.parquet

Using Iceberg ObjectStoreLocationProvider is not a foolproof mechanism to avoid S3 503 errors. You still need to set appropriate EMRFS retries to provide additional resiliency. You can adjust your retry strategy by increasing the maximum retry limit for the default exponential backoff retry strategy or enabling and configuring the additive-increase/multiplicative-decrease (AIMD) retry strategy. AIMD is supported for Amazon EMR releases 6.4.0 and later. For more information, refer to Retry Amazon S3 requests with EMRFS.
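
As a sketch, the EMRFS retry settings are applied through the emrfs-site configuration classification when you create the cluster; the property names below reflect our reading of the EMRFS retry documentation, so confirm them there before use.

# Illustrative emrfs-site classification; confirm property names in the EMRFS retry documentation
emrfs_retry_configuration = [
    {
        "Classification": "emrfs-site",
        "Properties": {
            "fs.s3.aimd.enabled": "true",   # assumed flag that switches on the AIMD retry strategy
            "fs.s3.maxRetries": "50",       # raises the retry ceiling for the default strategy
        },
    }
]
# Pass this list as the Configurations parameter of the EMR RunJobFlow API,
# or attach it to the cluster configuration on the Amazon EMR console.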

In the following sections, we provide examples for these use cases.

Storage cost optimizations

In this example, we use Iceberg’s S3 tags feature with the write tag as write-tag-name=created and delete tag as delete-tag-name=deleted. This example is demonstrated on an EMR version emr-6.10.0 cluster with installed applications Hadoop 3.3.3, Jupyter Enterprise Gateway 2.6.0, and Spark 3.3.1. The examples are run on a Jupyter Notebook environment attached to the EMR cluster. To learn more about how to create an EMR cluster with Iceberg and use Amazon EMR Studio, refer to Use an Iceberg cluster with Spark and the Amazon EMR Studio Management Guide, respectively.

The following examples are also available in the sample notebook in the aws-samples GitHub repo for quick experimentation.

Configure Iceberg on a Spark session

Configure your Spark session using the %%configure magic command. You can use either the AWS Glue Data Catalog (recommended) or a Hive catalog for Iceberg tables. In this example, we use a Hive catalog, but we can change to the Data Catalog with the following configuration:

spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog

Before you run this step, create an S3 bucket and an iceberg folder in your AWS account with the naming convention <your-iceberg-storage-blog>/iceberg/.

Update your-iceberg-storage-blog in the following configuration with the bucket that you created to test this example. Note the configuration parameters s3.write.tags.write-tag-name and s3.delete.tags.delete-tag-name, which will tag the new S3 objects and deleted objects with corresponding tag values. We use these tags in later steps to implement S3 lifecycle policies to transition the objects to a lower-cost storage tier or expire them based on the use case.

%%configure -f
{
  "conf": {
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.catalog.dev": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.dev.catalog-impl": "org.apache.iceberg.hive.HiveCatalog",
    "spark.sql.catalog.dev.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
    "spark.sql.catalog.dev.warehouse": "s3://<your-iceberg-storage-blog>/iceberg/",
    "spark.sql.catalog.dev.s3.write.tags.write-tag-name": "created",
    "spark.sql.catalog.dev.s3.delete.tags.delete-tag-name": "deleted",
    "spark.sql.catalog.dev.s3.delete-enabled": "false"
  }
}

Create an Apache Iceberg table using Spark-SQL

Now we create an Iceberg table for the Amazon Product Reviews Dataset:

spark.sql(""" DROP TABLE if exists dev.db.amazon_reviews_iceberg""")
spark.sql(""" CREATE TABLE dev.db.amazon_reviews_iceberg (
marketplace string,
customer_id string,
review_id string,
product_id string,
product_parent string,
product_title string,
star_rating int,
helpful_votes int,
total_votes int,
vine string,
verified_purchase string,
review_headline string,
review_body string,
review_date date,
year int)
USING iceberg
location 's3://<your-iceberg-storage-blog>/iceberg/db/amazon_reviews_iceberg'
PARTITIONED BY (years(review_date))""")

In the next step, we load the table with the dataset using Spark actions.

Load data into the Iceberg table

While inserting the data, we partition the data by review_date as per the table definition. Run the following Spark commands in your PySpark notebook:

df = spark.read.parquet("s3://amazon-reviews-pds/parquet/product_category=Electronics/*.parquet")

df.sortWithinPartitions("review_date").writeTo("dev.db.amazon_reviews_iceberg").append()

Insert a single record into the same Iceberg table so that it creates a partition with the current review_date:

spark.sql("""insert into dev.db.amazon_reviews_iceberg values ("US", "99999999","R2RX7KLOQQ5VBG","B00000JBAT","738692522","Diamond Rio Digital",3,0,0,"N","N","Why just 30 minutes?","RIO is really great",date("2023-04-06"),2023)""")

You can check the new snapshot is created after this append operation by querying the Iceberg snapshot:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

You will see an output similar to the following showing the operations performed on the table.

Check the S3 tag population

You can use the AWS Command Line Interface (AWS CLI) or the AWS Management Console to check the tags populated for the new writes. Let’s check the tag corresponding to the object created by a single row insert.

On the Amazon S3 console, check the S3 folder s3://your-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/data/ and point to the partition review_date_year=2023/. Then check the Parquet file under this folder to check the tags associated with the data file in Parquet format.

From the AWS CLI, run the following command to see that the tag is created based on the Spark configuration spark.sql.catalog.dev.s3.write.tags.write-tag-name":"created":

xxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [{ "Key": "write-tag-name", "Value": "created" } ] }

Delete a record and expire a snapshot

In this step, we delete a record from the Iceberg table and expire the snapshot corresponding to the deleted record. We delete the new single record that we inserted with the current review_date:

spark.sql("""delete from dev.db.amazon_reviews_iceberg where review_date = '2023-04-06'""")

We can now check that a new snapshot was created with the operation flagged as delete:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

This is useful if we want to time travel and check the deleted row in the future. In that case, we have to query the table with the snapshot-id corresponding to the deleted row. However, we don’t discuss time travel as part of this post.

We expire the old snapshots from the table and keep only the last two. You can modify the query based on your specific requirements to retain the snapshots:

spark.sql("""CALL dev.system.expire_snapshots(table => 'dev.db.amazon_reviews_iceberg', older_than => DATE '2024-01-01', retain_last => 2)""")

If we run the same query on the snapshots, we can see that we have only two snapshots available:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.snapshots""").show()

From the AWS CLI, you can run the following command to see that the tag is created based on the Spark configuration spark.sql.catalog.dev.s3.delete.tags.delete-tag-name":"deleted":

xxxxxx@3c22fb1238d8 ~ % aws s3api get-object-tagging --bucket your-iceberg-storage-blog --key iceberg/db/amazon_reviews_iceberg/data/review_date_year=2023/00000-43-2fb892e3-0a3f-4821-a356-83204a69fa74-00001.parquet

You will see output similar to the following, showing the associated tags for the file:

{ "VersionId": "null", "TagSet": [ { "Key": "delete-tag-name", "Value": "deleted" }, { "Key": "write-tag-name", "Value": "created" } ] }

You can view the existing metadata files from the metadata log entries metatable after the expiration of snapshots:

spark.sql("""SELECT * FROM dev.db.amazon_reviews_iceberg.metadata_log_entries""").show()

The snapshots that have expired show the latest snapshot ID as null.

Create S3 lifecycle rules to transition the buckets to a different storage tier

Create a lifecycle configuration for the bucket to transition objects with the delete-tag-name=deleted S3 tag to the Glacier Instant Retrieval class. Amazon S3 runs lifecycle rules one time every day at midnight Universal Coordinated Time (UTC), and new lifecycle rules can take up to 48 hours to complete the first run. Amazon S3 Glacier is well suited to archive data that needs immediate access (with milliseconds retrieval). With S3 Glacier Instant Retrieval, you can save up to 68% on storage costs compared to using the S3 Standard-Infrequent Access (S3 Standard-IA) storage class, when the data is accessed once per quarter.
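
The following is a minimal boto3 sketch of such a lifecycle rule; the bucket name is a placeholder, and the tag key and value match the s3.delete.tags configuration used earlier in this post.

import boto3

s3 = boto3.client("s3")

# Placeholder bucket; the tag filter matches the Iceberg delete-tag configuration above
bucket = "your-iceberg-storage-blog"

s3.put_bucket_lifecycle_configuration(
    Bucket=bucket,
    LifecycleConfiguration={
        "Rules": [
            {
                "ID": "transition-deleted-iceberg-objects",
                "Status": "Enabled",
                "Filter": {"Tag": {"Key": "delete-tag-name", "Value": "deleted"}},
                "Transitions": [
                    {"Days": 1, "StorageClass": "GLACIER_IR"}
                ],
            }
        ]
    },
)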

When you want to access the data back, you can bulk restore the archived objects. After you restore the objects back in S3 Standard class, you can register the metadata and data as an archival table for query purposes. The metadata file location can be fetched from the metadata log entries metatable as illustrated earlier. As mentioned before, the latest snapshot ID with Null values indicates expired snapshots. We can take one of the expired snapshots and do the bulk restore:

spark.sql("""CALL dev.system.register_table(table => 'db.amazon_reviews_iceberg_archive', metadata_file => 's3://avijit-iceberg-storage-blog/iceberg/db/amazon_reviews_iceberg/metadata/00000-a010f15c-7ac8-4cd1-b1bc-bba99fa7acfc.metadata.json')""").show()

Capabilities for disaster recovery and business continuity, cross-account and multi-Region access to the data lake

Because Iceberg doesn’t support relative paths, you can use access points to perform Amazon S3 operations by specifying a mapping of buckets to access points. This is useful for multi-Region access, cross-Region access, disaster recovery, and more.

For cross-Region access points, we need to additionally set the use-arn-region-enabled catalog property to true to enable S3FileIO to make cross-Region calls. If an Amazon S3 resource ARN is passed in as the target of an Amazon S3 operation that has a different Region than the one the client was configured with, this flag must be set to 'true' to permit the client to make a cross-Region call to the Region specified in the ARN; otherwise, an exception will be thrown. However, for same-Region or multi-Region access points, the use-arn-region-enabled flag should be set to 'false'.

For example, to use an S3 access point with multi-Region access in Spark 3.3, you can start the Spark SQL shell with the following code:

spark-sql --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket2/my/key/prefix \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.s3.use-arn-region-enabled=false \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket1=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap \
--conf spark.sql.catalog.my_catalog.s3.access-points.my-bucket2=arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap

In this example, the objects in Amazon S3 on my-bucket1 and my-bucket2 buckets use the arn:aws:s3::123456789012:accesspoint:mfzwi23gnjvgw.mrap access point for all Amazon S3 operations.

For more details on using access points, refer to Using access points with compatible Amazon S3 operations.

Let’s say your table path is under my-bucket1, so both my-bucket1 in Region 1 and my-bucket2 in Region 2 have paths of my-bucket1 inside the metadata files. At the time of the S3 (GET/PUT) call, we replace the my-bucket1 reference with a multi-Region access point.

Handling increased S3 request rates

When using ObjectStoreLocationProvider (for more details, see Object Store File Layout), a deterministic hash is generated for each stored file, with the hash appended directly after the write.data.path. The problem with this is that the default hashing algorithm generates hash values up to Integer MAX_VALUE, which in Java is (2^31)-1. When this is converted to hex, it produces 0x7FFFFFFF, so the first character variance is restricted to only [0-8]. As per Amazon S3 recommendations, we should have the maximum variance here to mitigate this.

Starting from Amazon EMR 6.10, Amazon EMR added an optimized location provider that makes sure the generated prefix hash has uniform distribution in the first two characters using the character set from [0-9][A-Z][a-z].

This location provider has been recently open sourced by Amazon EMR via Core: Improve bit density in object storage layout and should be available starting from Iceberg 1.3.0.

To use it, make sure the iceberg.enabled classification is set to true, and write.location-provider.impl is set to org.apache.iceberg.emr.OptimizedS3LocationProvider.

The following is a sample Spark shell command:

spark-shell --conf spark.driver.memory=4g \
--conf spark.executor.cores=4 \
--conf spark.dynamicAllocation.enabled=true \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://my-bucket/iceberg-V516168123 \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.table-override.write.location-provider.impl=org.apache.iceberg.emr.OptimizedS3LocationProvider

The following example shows that when you enable the object storage in your Iceberg table, it adds the hash prefix in your S3 path directly after the location you provide in your DDL.

  1. Define the table parameter write.object-storage.enabled and provide the S3 path after which you want to add the hash prefix, using the write.data.path parameter (for Iceberg version 0.13 and above) or the write.object-storage.path parameter (for Iceberg version 0.12 and below).
  2. Insert data into the table you created.
  3. Observe that the hash prefix is added right after the /current/ prefix in the S3 path, as defined in the DDL.

Clean up

After you complete the test, clean up your resources to avoid any recurring costs:

  1. Delete the S3 buckets that you created for this test.
  2. Delete the EMR cluster.
  3. Stop and delete the EMR notebook instance.

Conclusion

As companies continue to build newer transactional data lake use cases using Apache Iceberg open table format on very large datasets on S3 data lakes, there will be an increased focus on optimizing those petabyte-scale production environments to reduce cost, improve efficiency, and implement high availability. This post demonstrated mechanisms to implement the operational efficiencies for Apache Iceberg open table formats running on AWS.

To learn more about Apache Iceberg and implement this open table format for your transactional data lake use cases, refer to the following resources:


About the Authors

Avijit Goswami is a Principal Solutions Architect at AWS specialized in data and analytics. He supports AWS strategic customers in building high-performing, secure, and scalable data lake solutions on AWS using AWS managed services and open-source solutions. Outside of his work, Avijit likes to travel, hike in the San Francisco Bay Area trails, watch sports, and listen to music.

Rajarshi Sarkar is a Software Development Engineer at Amazon EMR/Athena. He works on cutting-edge features of Amazon EMR/Athena and is also involved in open-source projects such as Apache Iceberg and Trino. In his spare time, he likes to travel, watch movies, and hang out with friends.

Prashant Singh is a Software Development Engineer at AWS. He is interested in Databases and Data Warehouse engines and has worked on Optimizing Apache Spark performance on EMR. He is an active contributor in open source projects like Apache Spark and Apache Iceberg. During his free time, he enjoys exploring new places, food and hiking.

Dive deep into AWS Glue 4.0 for Apache Spark

Post Syndicated from Gonzalo Herreros original https://aws.amazon.com/blogs/big-data/dive-deep-into-aws-glue-4-0-for-apache-spark/

Deriving insight from data is hard. It’s even harder when your organization is dealing with silos that impede data access across different data stores. Seamless data integration is a key requirement in a modern data architecture to break down data silos. AWS Glue is a serverless data integration service that makes data preparation simpler, faster, and cheaper. You can discover and connect to over 70 diverse data sources, manage your data in a centralized data catalog, and create, run, and monitor data integration pipelines to load data into your data lakes and your data warehouses. AWS Glue for Apache Spark takes advantage of Apache Spark’s powerful engine to process large data integration jobs at scale.

AWS Glue released version 4.0 at AWS re:Invent 2022, which includes many upgrades, such as the new optimized Apache Spark 3.3.0 runtime (3.5 times performance improvement on average over open-source Apache Spark 3.3.0), Python 3.10, and a new enhanced Amazon Redshift connector (10 times performance improvement on average over the previous version).

In this post, we discuss the main benefits that this new AWS Glue version brings and how it can help you build better data integration pipelines.

Spark upgrade highlights

The new version of Spark included in AWS Glue 4.0 brings a number of valuable features, which we highlight in this section. For more details, refer to Spark Release 3.3.0 and Spark Release 3.2.0.

Support for the pandas API

Support for the pandas API allows users familiar with the popular Python library to start writing distributed extract, transform, and load (ETL) jobs without having to learn a new framework API. We discuss this in more detail later in this post.

Python UDF profiling

With Python UDF profiling, you can now profile regular and pandas user-defined functions (UDFs). Calling show_profiles() on the SparkContext to get details about the Python run was added in Spark 1 for RDDs; now it also works for DataFrame Python UDFs and provides valuable information about how many calls to the UDF are made and how much time is spent in them.

For instance, to illustrate how the profiler measures time, the following example is the profile of a pandas UDF that processes over a million rows (but the pandas UDF only needs 112 calls) and sleeps for 1 second. We can use the command spark.sparkContext.show_profiles(), as shown in the following screenshot.

Python UDF Profiling
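
The following is an illustrative sketch (not the exact code behind the preceding screenshot) of how such a profile can be produced: a pandas UDF that sleeps once per batch, run with spark.python.profile enabled so that show_profiles() reports the call count and time. On AWS Glue you would typically pass this configuration as a job parameter rather than building the session yourself.

import time
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

# Profiling must be enabled before the session is created
spark = (SparkSession.builder
         .config("spark.python.profile", "true")
         .getOrCreate())

@pandas_udf("double")
def slow_double(v: pd.Series) -> pd.Series:
    time.sleep(1)              # simulate expensive work once per batch of rows
    return v * 2.0

df = spark.range(1_000_000).withColumn("doubled", slow_double("id"))
df.selectExpr("sum(doubled)").show()        # trigger execution of the UDF

spark.sparkContext.show_profiles()          # prints call counts and cumulative time per UDF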

Pushdown filters

Pushdown filters are used in more scenarios, such as aggregations or limits. The upgrade also offers support for Bloom filters and skew optimization. These improvements allow for handling larger datasets by reading less data and processing it more efficiently. For more information, refer to Spark Release 3.3.0.

ANSI SQL

Now you can ask SparkSQL to follow the ANSI behavior on those points that it traditionally differed from the standard. This helps users bring their existing SQL skills and start producing value on AWS Glue faster. For more information, refer to ANSI Compliance.
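
For example, you can opt in per session and, where you still want permissive behavior, use the new try_* functions explicitly; a minimal sketch follows, assuming an existing spark session.

# Opt in to ANSI SQL semantics for the current session (off by default)
spark.conf.set("spark.sql.ansi.enabled", "true")

# Under ANSI mode, an invalid cast raises an error instead of silently returning NULL:
# spark.sql("SELECT CAST('abc' AS INT)").show()   # would fail

# The try_* functions keep the permissive behavior where you explicitly want it
spark.sql("SELECT try_cast('abc' AS INT) AS value").show()   # prints a NULL value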

Adaptive query execution by default

Adaptive query execution (AQE) by default helps optimize Spark SQL performance. You can turn AQE on and off by using spark.sql.adaptive.enabled as an umbrella configuration. Since AWS Glue 4.0, it’s enabled by default, so you no longer need to enable this explicitly.
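
You can confirm the setting, or tune individual AQE sub-features, from your job script; a small sketch, again assuming an existing spark session:

# AQE is enabled by default on AWS Glue 4.0; verify or tune it per session
print(spark.conf.get("spark.sql.adaptive.enabled"))                    # 'true'
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")          # handle skewed joins
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")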

Improved error messages

Improved error messages provide better context and easy resolution. For instance, if you have a division by zero in a SQL statement on ANSI mode, previously you would get just a Java exception: java.lang.ArithmeticException: divide by zero. Depending on the complexity and number of queries, the cause might not be obvious and might require some reruns with trial and error until it’s identified.

On the new version, you get a much more revealing error:

Caused by: org.apache.spark.SparkArithmeticException: Division by zero. 
Use `try_divide` to tolerate divisor being 0 and return NULL instead. 
If necessary set "spark.sql.ansi.enabled" to "false" (except for ANSI interval type) to bypass this error.
== SQL(line 1, position 8) ==
select sum(cost)/(select count(1) from items where cost > 100) from items
       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Not only does it show the query that caused the issue, but it also indicates the specific operation where the error occurred (the division in this case). In addition, it provides some guidance on resolving the issue.

New pandas API on Spark

The Spark upgrade brings a new, exciting feature, which is the chance to use your existing Python pandas framework knowledge in a distributed and scalable runtime. This lowers the barrier of entry for teams without previous Spark experience, so they can start delivering value quickly and make the most of the AWS Glue for Spark runtime.

The new API provides a pandas DataFrame-compatible API, so you can use existing pandas code and migrate it to AWS Glue for Spark by changing the imports, although it’s not 100% compatible.

If you just want to migrate existing pandas code to run on pandas on Spark, you could replace the import and test:

# Replace pure pandas 
import pandas as pd
# With pandas API on Spark
import pyspark.pandas as pd

In some cases, you might want to use multiple implementations in the same script, for instance because a feature is not yet available in the pandas API for Spark, or because the data is so small that some operations are more efficient locally rather than distributed. In that situation, to avoid confusion, it’s better to use different aliases for the pandas and the pandas-on-Spark module imports, and to follow a naming convention for the different types of DataFrames, because the choice has implications for performance and features: for instance, prefix pandas DataFrame variables with pdf_, pandas-on-Spark DataFrames with psdf_, and standard Spark DataFrames with sdf_ or just df_.

You can also convert to a standard Spark DataFrame calling to_spark(). This allows you to use features not available on pandas such as writing directly to catalog tables or using some Spark connectors.

The following code shows an example of combining the different types of APIs:

# The job has the parameter "--additional-python-modules":"openpyxl", 
#  the Excel library is not provided by default

import pandas as pd
# pdf is a pure pandas DF which resides in the driver memory only
pdf = pd.read_excel('s3://mybucket/mypath/MyExcel.xlsx', index_col=0)

import pyspark.pandas as ps
# psdf behaves like a pandas df but operations on it will be distributed among nodes
psdf = ps.from_pandas(pdf)
means_series = psdf.mean()

# Convert to a dataframe of column names and means
#  pandas on Spark series don't allow iteration so we convert to pure pandas
#  on a big dataset this could cause the driver to be overwhelmed but not here
# We reset to have the index labels as a columns to use them later
pdf_means = pd.DataFrame(means_series.to_pandas()).reset_index(level=0)
# Set meaningful column names
pdf_means.columns = ["excel_column", "excel_average"]

# We want to use this to enrich a Spark DF representing a huge table
#  convert to standard Spark DF so we can use the Spark API
sdf_means = ps.from_pandas(pdf_means).to_spark()

sdf_bigtable = spark.table("somecatalog.sometable")
sdf_bigtable.join(sdf_means, sdf_bigtable.category == sdf_means.excel_column)

Improved Amazon Redshift connector

A new version of the Amazon Redshift connector brings many improvements:

  • Pushdown optimizations
  • Support for reading SUPER columns
  • Writing allowed based on column names instead of position
  • An optimized serializer to increase performance when reading from Amazon Redshift
  • Other minor improvements like trimming pre- and post-actions and handling numeric time zone formats

This new Amazon Redshift connector is built on top of an existing open-source connector project and offers further enhancements for performance and security, helping you gain up to 10 times faster application performance. It accelerates AWS Glue jobs when reading from Amazon Redshift, and also enables you to run data-intensive workloads more reliably. For more details, see Moving data to and from Amazon Redshift. To learn more about how to use it, refer to New – Amazon Redshift Integration with Apache Spark.

When you use the new Amazon Redshift connector on an AWS Glue DynamicFrame, use the existing methods: GlueContext.create_data_frame and GlueContext.write_data_frame.

When you use the new Amazon Redshift connector on a Spark DataFrame, use the format io.github.spark_redshift_community.spark.redshift, as shown in the following code snippet:

df = spark.read.format("io.github.spark_redshift_community.spark.redshift") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("tempdir", redshiftTmpDir) \
    .option("user", user) \
    .option("password", password) \
    .option("aws_iam_role", aws_iam_role) \
    .load()

Other upgrades and improvements

The following are updates and improvements in the dependent libraries:

  • Spark 3.3.0-amzn-0
  • Hadoop 3.2.1-amzn-8
  • Hive 2.3.9-amzn-2
  • Parquet 1.12
  • Log4j 2
  • Python 3.10
  • Arrow 7.0.0
  • Boto3 1.26
  • EMRFS 2.53.0
  • AWS Glue Data Catalog client 3.6.0
  • New versions of the provided JDBC drivers:
    • MySQL 8.0.23
    • PostgreSQL 42.3.6
    • Microsoft SQL Server 9.4.0
    • Oracle 21.7
    • MongoDB 4.7.2
    • Amazon Redshift redshift-jdbc42-2.1.0.9
  • Integrated and upgraded plugins to popular table formats:
    • Iceberg 1.0.0
    • Hudi 0.12.1
    • Delta Lake 2.1.0

To learn more, refer to the appendices in Migrating AWS Glue jobs to AWS Glue version 4.0.

Improved performance

In addition to all the new features, AWS Glue 4.0 brings performance improvements at lower cost. In summary, AWS Glue 4.0 with Amazon Simple Storage Service (Amazon S3) is 2.7 times faster than AWS Glue 3.0, and AWS Glue 4.0 with Amazon Redshift is 7.1 times faster than AWS Glue 3.0. In the following sections, we provide details about AWS Glue 4.0 performance results with Amazon S3 and Amazon Redshift.

Amazon S3

The following chart shows the total job runtime for all queries (in seconds) in the 3 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in an S3 bucket in Parquet format, and we used 30 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon S3 had a total job runtime on AWS Glue 4.0 that was 2.7 times faster than that on AWS Glue 3.0. Detailed instructions are explained in the appendix of this post.

Metric AWS Glue 3.0 AWS Glue 4.0
Total Query Time (seconds) 5084.94274 1896.1904
Geometric Mean (seconds) 14.00217 10.09472

TPC-DS benchmark result with S3

Amazon Redshift

The following chart shows the total job runtime for all queries (in seconds) in the 1 TB query dataset between AWS Glue 3.0 and AWS Glue 4.0. The TPC-DS dataset is located in a five-node ra3.4xlarge Amazon Redshift cluster, and we used 150 G.2X workers in AWS Glue. We observed that our TPC-DS tests on Amazon Redshift had a total job runtime on AWS Glue 4.0 that was 7.1 times faster than that on AWS Glue 3.0.

Metric AWS Glue 3.0 AWS Glue 4.0
Total Query Time (seconds) 22020.58 3075.96633
Geometric Mean (seconds) 142.38525 8.69973

TPC-DS benchmark result with Redshift

Get started with AWS Glue 4.0

You can start using AWS Glue 4.0 via AWS Glue Studio, the AWS Glue console, the latest AWS SDK, and the AWS Command Line Interface (AWS CLI).

To start using AWS Glue 4.0 in AWS Glue Studio, open the AWS Glue job and on the Job details tab, choose the version Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.

Glue version 4.0 in Glue Studio

To migrate your existing AWS Glue jobs from AWS Glue 0.9, 1.0, 2.0, and 3.0 to AWS Glue 4.0, see Migrating AWS Glue jobs to AWS Glue version 4.0.

The AWS Glue 4.0 Docker images are now available on Docker Hub, so you can use them to develop locally for the new version. Refer to Develop and test AWS Glue version 3.0 and 4.0 jobs locally using a Docker container for further details.

Conclusion

In this post, we discussed the main upgrades provided by the new 4.0 version of AWS Glue. You can already start writing new jobs on that version and benefit from all the improvements, as well as migrate your existing jobs.


About the authors

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team. He’s been an Apache Spark enthusiast since version 0.8. In his spare time, he likes playing board games.

Noritaka Sekiyama is a Principal Big Data Architect on the AWS Glue team. He works based in Tokyo, Japan. He is responsible for building software artifacts to help customers. In his spare time, he enjoys cycling with his road bike.

Bo Li is a Senior Software Development Engineer on the AWS Glue team. He is devoted to designing and building end-to-end solutions that address customers' data analytics and processing needs with cloud-based, data-intensive technologies.

Rajendra Gujja is a Senior Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and anything and everything related to data.

Savio Dsouza is a Software Development Manager on the AWS Glue team. His team works on solving challenging distributed systems problems for data integration on the AWS Glue platform for customers using Apache Spark.

Mohit Saxena is a Senior Software Development Manager on the AWS Glue team. His team works on distributed systems for building data lakes on AWS and simplifying integration with data warehouses for customers using Apache Spark.


Appendix: TPC-DS benchmark on AWS Glue against a dataset on Amazon S3

To perform a TPC-DS benchmark on AWS Glue against a dataset in an S3 bucket, you need to copy the TPC-DS dataset into your S3 bucket. These instructions are based on emr-spark-benchmark:

  1. Create a new S3 bucket in your test account if needed. In the following code, replace $YOUR_S3_BUCKET with your S3 bucket name. We suggest you export YOUR_S3_BUCKET as an environment variable:
export YOUR_S3_BUCKET=<Your bucket name>
aws s3 mb s3://$YOUR_S3_BUCKET
  2. Copy the TPC-DS source data as input to your S3 bucket. If it’s not exported as an environment variable, replace $YOUR_S3_BUCKET with your S3 bucket name:
aws s3 sync s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/ s3://$YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned/
  3. Build the benchmark application following the instructions in Steps to build spark-benchmark-assembly application.

For your convenience, we have provided the sample application JAR file spark-benchmark-assembly-3.3.0.jar, which we built for AWS Glue 4.0.

  4. Upload the spark-benchmark-assembly JAR file to your S3 bucket.
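For example (the jars/ prefix is only an illustration; use any S3 path, and reference the same path in the next step):
aws s3 cp spark-benchmark-assembly-3.3.0.jar s3://$YOUR_S3_BUCKET/jars/spark-benchmark-assembly-3.3.0.jar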
  5. In AWS Glue Studio, create a new AWS Glue job through the script editor:
    1. Under Job details, for Type, choose Spark.
    2. For Glue version, choose Glue 4.0 – Supports spark 3.3, Scala 2, Python 3.
    3. For Language, choose Scala.
    4. For Worker type, choose your preferred worker type.
    5. For Requested number of workers, choose your preferred number.
    6. Under Advanced properties, for Dependent JARs path, enter your S3 path for the spark-benchmark-assembly JAR file.
    7. For Script, enter the following code snippet:
import com.amazonaws.eks.tpcds.BenchmarkSQL

object GlueApp {
  def main(sysArgs: Array[String]): Unit = {
    // The arguments below follow the spark-benchmark-assembly convention used in this post.
    BenchmarkSQL.main(
      Array(
        "s3://YOUR_S3_BUCKET/blog/BLOG_TPCDS-TEST-3T-partitioned",  // TPC-DS input data location
        "s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/",  // benchmark result output location
        "/opt/tpcds-kit/tools",  // path to the tpcds-kit tools
        "parquet",  // input data format
        "3000",  // scale factor (3,000 GB = 3 TB)
        "3",  // number of iterations (per the benchmark convention)
        "false",  // additional benchmark flag; keep as shown for this test
        "q1-v2.4,q10-v2.4,q11-v2.4,q12-v2.4,q13-v2.4,q14a-v2.4,q14b-v2.4,q15-v2.4,q16-v2.4,q17-v2.4,q18-v2.4,q19-v2.4,q2-v2.4,q20-v2.4,q21-v2.4,q22-v2.4,q23a-v2.4,q23b-v2.4,q24a-v2.4,q24b-v2.4,q25-v2.4,q26-v2.4,q27-v2.4,q28-v2.4,q29-v2.4,q3-v2.4,q30-v2.4,q31-v2.4,q32-v2.4,q33-v2.4,q34-v2.4,q35-v2.4,q36-v2.4,q37-v2.4,q38-v2.4,q39a-v2.4,q39b-v2.4,q4-v2.4,q40-v2.4,q41-v2.4,q42-v2.4,q43-v2.4,q44-v2.4,q45-v2.4,q46-v2.4,q47-v2.4,q48-v2.4,q49-v2.4,q5-v2.4,q50-v2.4,q51-v2.4,q52-v2.4,q53-v2.4,q54-v2.4,q55-v2.4,q56-v2.4,q57-v2.4,q58-v2.4,q59-v2.4,q6-v2.4,q60-v2.4,q61-v2.4,q62-v2.4,q63-v2.4,q64-v2.4,q65-v2.4,q66-v2.4,q67-v2.4,q68-v2.4,q69-v2.4,q7-v2.4,q70-v2.4,q71-v2.4,q72-v2.4,q73-v2.4,q74-v2.4,q75-v2.4,q76-v2.4,q77-v2.4,q78-v2.4,q79-v2.4,q8-v2.4,q80-v2.4,q81-v2.4,q82-v2.4,q83-v2.4,q84-v2.4,q85-v2.4,q86-v2.4,q87-v2.4,q88-v2.4,q89-v2.4,q9-v2.4,q90-v2.4,q91-v2.4,q92-v2.4,q93-v2.4,q94-v2.4,q95-v2.4,q96-v2.4,q97-v2.4,q98-v2.4,q99-v2.4,ss_max-v2.4", 
        "true"
      )
    )
  }
}
  6. Save and run the job.

The result file will be stored under s3://YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/.
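
To verify the output, you can list the result prefix with the AWS CLI, for example:

aws s3 ls s3://$YOUR_S3_BUCKET/blog/GLUE_TPCDS-TEST-3T-RESULT/ --recursive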