Tag Archives: Amazon Athena

Automate AWS Clean Rooms querying and dashboard publishing using AWS Step Functions and Amazon QuickSight – Part 2

Post Syndicated from Venkata Kampana original https://aws.amazon.com/blogs/big-data/automate-aws-clean-rooms-querying-and-dashboard-publishing-using-aws-step-functions-and-amazon-quicksight-part-2/

Public health organizations need access to data insights that they can quickly act upon, especially in times of health emergencies, when data needs to be updated multiple times daily. For example, during the COVID-19 pandemic, access to timely data insights was critically important for public health agencies worldwide as they coordinated emergency response efforts. Up-to-date information and analysis empowered organizations to monitor the rapidly changing situation and direct resources accordingly.

This is the second post in this series; we recommend that you read the first post before diving deep into this solution. In our first post, Enable data collaboration among public health agencies with AWS Clean Rooms – Part 1, we showed how public health agencies can create AWS Clean Rooms collaborations, invite other stakeholders to join the collaboration, and run queries on their collective data without either party having to share or copy underlying data with each other. As mentioned in the previous post, AWS Clean Rooms enables multiple organizations to analyze their data and unlock insights they can act upon, without having to share sensitive, restricted, or proprietary records.

However, leaders and decision-making officials at public health organizations don’t directly access data collaboration outputs from their Amazon Simple Storage Service (Amazon S3) buckets. Instead, they rely on up-to-date dashboards that help them visualize data insights to make informed decisions quickly.

To ensure these dashboards showcase the most updated insights, the organization builders and data architects need to catalog and update AWS Clean Rooms collaboration outputs on an ongoing basis, which often involves repetitive and manual processes that, if not done well, could delay your organization’s access to the latest data insights.

Manually handling repetitive daily tasks at scale poses risks like delayed insights, miscataloged outputs, or broken dashboards. At a large volume, it would require around-the-clock staffing, straining budgets. This manual approach could expose decision-makers to inaccurate or outdated information.

Automating repetitive workflows, validation checks, and programmatic dashboard refreshes removes human bottlenecks and helps decrease inaccuracies. Automation helps ensure continuous, reliable processes that deliver the most current data insights to leaders without delays, all while streamlining resources.

In this post, we explain an automated workflow using AWS Step Functions and Amazon QuickSight to help organizations access the most current results and analyses, without delays from manual data handling steps. This workflow implementation will empower decision-makers with real-time visibility into the evolving collaborative analysis outputs, ensuring they have up-to-date, relevant insights that they can act upon quickly.

Solution overview

The following reference architecture illustrates some of the foundational components of clean rooms query automation and publishing dashboards using AWS services. We automate running queries using Step Functions with Amazon EventBridge schedules, build an AWS Glue Data Catalog on query outputs, and publish dashboards using QuickSight so they automatically refresh with new data. This allows public health teams to monitor the most recent insights without manual updates.

The architecture consists of the following components, as numbered in the preceding figure:

  1. A scheduled event rule on EventBridge triggers a Step Functions workflow.
  2. The Step Functions workflow initiates the run of a query using the StartProtectedQuery AWS Clean Rooms API. The submitted query runs securely within the AWS Clean Rooms environment, ensuring data privacy and compliance. The results of the query are then stored in a designated S3 bucket, with a unique protected query ID serving as the prefix for the stored data. This unique identifier is generated by AWS Clean Rooms for each query run, maintaining clear segregation of results.
  3. When the AWS Clean Rooms query is successfully complete, the Step Functions workflow calls the AWS Glue API to update the location of the table in the AWS Glue Data Catalog with the Amazon S3 location where the query results were uploaded in Step 2.
  4. Amazon Athena uses the catalog from the Data Catalog to query the information using standard SQL.
  5. QuickSight is used to query, build visualizations, and publish dashboards using the data from the query results.
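The query, wait, and catalog-update steps above can be sketched in Python with injected AWS SDK clients. This is a minimal illustration under stated assumptions, not the state machine definition that the CloudFormation template deploys: the function names, the CSV result format, and the exact results path (bucket plus protected query ID as prefix) are assumptions made for the example.

```python
import time

# Keys that Glue's UpdateTable accepts in TableInput. GetTable also returns
# read-only fields (for example CreateTime and DatabaseName) that must be dropped.
TABLE_INPUT_KEYS = {
    "Name", "Description", "Owner", "Retention", "StorageDescriptor",
    "PartitionKeys", "TableType", "Parameters",
}


def build_table_input(table, new_location):
    """Copy a Glue table definition, pointing its storage at new_location."""
    table_input = {k: v for k, v in table.items() if k in TABLE_INPUT_KEYS}
    storage = dict(table_input.get("StorageDescriptor", {}))
    storage["Location"] = new_location
    table_input["StorageDescriptor"] = storage
    return table_input


def run_query_and_repoint_table(cleanrooms, glue, membership_id, sql,
                                bucket, database, table_name, poll_seconds=30):
    """Start a protected query, wait for it, then update the Glue table location."""
    started = cleanrooms.start_protected_query(
        type="SQL",
        membershipIdentifier=membership_id,
        sqlParameters={"queryString": sql},
        resultConfiguration={
            # Assumption: CSV output; the Glue table's SerDe must match.
            "outputConfiguration": {"s3": {"resultFormat": "CSV", "bucket": bucket}}
        },
    )
    query_id = started["protectedQuery"]["id"]

    # Poll until the query reaches a terminal status.
    while True:
        status = cleanrooms.get_protected_query(
            membershipIdentifier=membership_id,
            protectedQueryIdentifier=query_id,
        )["protectedQuery"]["status"]
        if status == "SUCCESS":
            break
        if status in {"FAILED", "CANCELLED", "TIMED_OUT"}:
            raise RuntimeError(f"Protected query {query_id} ended as {status}")
        time.sleep(poll_seconds)

    # Assumption: results land under a prefix named after the protected query ID.
    location = f"s3://{bucket}/{query_id}/"
    table = glue.get_table(DatabaseName=database, Name=table_name)["Table"]
    glue.update_table(DatabaseName=database,
                      TableInput=build_table_input(table, location))
    return location
```

With real clients, the call would look like `run_query_and_repoint_table(boto3.client("cleanrooms"), boto3.client("glue"), ...)`. Passing the clients in keeps the logic easy to exercise with stand-ins before pointing it at a live collaboration.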

Prerequisites

For this walkthrough, you need the following:

Launch the CloudFormation stack

In this post, we provide a CloudFormation template to create the following resources:

  • An EventBridge rule that triggers the Step Functions state machine on a schedule
  • An AWS Glue database and a catalog table
  • An Athena workgroup
  • Three S3 buckets:
    • For AWS Clean Rooms to upload the results of query runs
    • For Athena to upload the results for the queries
    • For storing access logs of other buckets
  • A Step Functions workflow designed to run the AWS Clean Rooms query, upload the results to an S3 bucket, and update the table location with the S3 path in the AWS Glue Data Catalog
  • An AWS Key Management Service (AWS KMS) customer-managed key to encrypt the data in S3 buckets
  • AWS Identity and Access Management (IAM) roles and policies with the necessary permissions

To create the necessary resources, complete the following steps:

  1. Choose Launch Stack.
  2. Enter cleanrooms-query-automation-blog for Stack name.
  3. Enter the membership ID from the AWS Clean Rooms collaboration you created in Part 1 of this series.
  4. Choose Next.
  5. Choose Next again.
  6. On the Review page, select I acknowledge that AWS CloudFormation might create IAM resources.
  7. Choose Create stack.

After you run the CloudFormation template and create the resources, you can find the following information on the stack Outputs tab on the AWS CloudFormation console:

  • AthenaWorkGroup – The Athena workgroup
  • EventBridgeRule – The EventBridge rule triggering the Step Functions state machine
  • GlueDatabase – The AWS Glue database
  • GlueTable – The AWS Glue table storing metadata for AWS Clean Rooms query results
  • S3Bucket – The S3 bucket where AWS Clean Rooms uploads query results
  • StepFunctionsStateMachine – The Step Functions state machine
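If you prefer to read these outputs programmatically, the response of the CloudFormation DescribeStacks API can be flattened into a dictionary. The following is a small sketch; the helper name `stack_outputs` and the sample values are placeholders for illustration, not values guaranteed by the template.

```python
def stack_outputs(describe_stacks_response):
    """Flatten CloudFormation stack outputs into {OutputKey: OutputValue}."""
    stack = describe_stacks_response["Stacks"][0]
    return {o["OutputKey"]: o["OutputValue"] for o in stack.get("Outputs", [])}


# Hand-written response in the shape DescribeStacks returns; values are placeholders.
sample_response = {
    "Stacks": [{
        "StackName": "cleanrooms-query-automation-blog",
        "Outputs": [
            {"OutputKey": "GlueDatabase", "OutputValue": "cleanrooms_patientdb"},
            {"OutputKey": "GlueTable", "OutputValue": "patientimmunization"},
        ],
    }]
}
outputs = stack_outputs(sample_response)
print(outputs["GlueDatabase"], outputs["GlueTable"])
```

Against a deployed stack, the input would come from `boto3.client("cloudformation").describe_stacks(StackName="cleanrooms-query-automation-blog")`.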

Test the solution

The EventBridge rule named cleanrooms_query_execution_Stepfunctions_trigger is scheduled to trigger every hour. When this rule is triggered, it initiates the run of the CleanRoomsBlogStateMachine-XXXXXXX Step Functions state machine. Complete the following steps to test the end-to-end flow of this solution:

  1. On the Step Functions console, navigate to the state machine you created.
  2. On the state machine details page, locate the latest query run.

The details page lists the completed steps:

  • The state machine submits a query to AWS Clean Rooms using the startProtectedQuery API. The output of the API includes the query run ID and its status.
  • The state machine waits for 30 seconds before checking the status of the query run.
  • After 30 seconds, the state machine checks the query status using the getProtectedQuery API. When the status changes to SUCCESS, it proceeds to the next step to retrieve the AWS Glue table metadata information. The output of this step contains the S3 location to which the query run results are uploaded.
  • The state machine retrieves the metadata of the AWS Glue table named patientimmunization, which was created via the CloudFormation stack.
  • The state machine updates the S3 location (the location to which AWS Clean Rooms uploaded the results) in the metadata of the AWS Glue table.
  • After a successful update of the AWS Glue table metadata, the state machine is complete.
  3. On the Athena console, switch the workgroup to CustomWorkgroup.
  4. Run the following query:
SELECT * FROM "cleanrooms_patientdb"."patientimmunization" limit 10;
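The same check can be scripted instead of run from the console. The following sketch uses the Athena StartQueryExecution, GetQueryExecution, and GetQueryResults APIs through an injected client; the helper name `run_athena_query` is an assumption for illustration, and it reads only the first page of results, which is enough for a `limit 10` query.

```python
import time


def run_athena_query(athena, sql, workgroup, poll_seconds=1):
    """Run a query in the given workgroup and return rows as dictionaries."""
    query_id = athena.start_query_execution(
        QueryString=sql, WorkGroup=workgroup
    )["QueryExecutionId"]

    # Wait for a terminal state.
    while True:
        state = athena.get_query_execution(
            QueryExecutionId=query_id
        )["QueryExecution"]["Status"]["State"]
        if state == "SUCCEEDED":
            break
        if state in {"FAILED", "CANCELLED"}:
            raise RuntimeError(f"Athena query {query_id} ended as {state}")
        time.sleep(poll_seconds)

    rows = athena.get_query_results(QueryExecutionId=query_id)["ResultSet"]["Rows"]
    # For SELECT statements the first row holds the column names;
    # NULL cells come back without a VarCharValue key.
    table = [[cell.get("VarCharValue") for cell in row["Data"]] for row in rows]
    header, data = table[0], table[1:]
    return [dict(zip(header, values)) for values in data]
```

With a real client, this would be called as `run_athena_query(boto3.client("athena"), 'SELECT * FROM "cleanrooms_patientdb"."patientimmunization" limit 10', "CustomWorkgroup")`, assuming the workgroup has a query result location configured.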

Visualize the data with QuickSight

Now that you can query your data in Athena, you can use QuickSight to visualize the results. Let’s start by granting QuickSight access to the S3 bucket where your AWS Clean Rooms query results are stored.

Grant QuickSight access to Athena and your S3 bucket

First, grant QuickSight access to the S3 bucket:

  1. Sign in to the QuickSight console.
  2. Choose your user name, then choose Manage QuickSight.
  3. Choose Security and permissions.
  4. For QuickSight access to AWS services, choose Manage.
  5. For Amazon S3, choose Select S3 buckets, and choose the S3 bucket named cleanrooms-query-execution-results-XX-XXXX-XXXXXXXXXXXX (XXXXX represents the AWS Region and account number where the solution is deployed).
  6. Choose Save.

Create your datasets and publish visuals

Before you can analyze and visualize the data in QuickSight, you must create datasets for your Athena tables.

  1. On the QuickSight console, choose Datasets in the navigation pane.
  2. Choose New dataset.
  3. Select Athena.
  4. Enter a name for your dataset.
  5. Choose Create data source.
  6. Choose the AWS Glue database cleanrooms_patientdb and select the table patientimmunization.
  7. Select Directly query your data.
  8. Choose Visualize.
  9. On the Analysis tab, choose the visual type of your choice and add visuals.

Clean up

Complete the following steps to clean up your resources when you no longer need this solution:

  1. Manually delete the data stored in the S3 buckets, and then delete the buckets.
  2. Delete the CloudFormation stack.
  3. Delete the QuickSight analysis.
  4. Delete the data source.

Conclusion

In this post, we demonstrated how to automate running AWS Clean Rooms queries using an API call from Step Functions. We also showed how to update the query results information on the existing AWS Glue table, query the information using Athena, and create visuals using QuickSight.

The automated workflow solution delivers real-time insights from AWS Clean Rooms collaborations to decision makers through automated checks for new outputs, processing, and Amazon QuickSight dashboard refreshes. This eliminates manual handling tasks, enabling faster data-driven decisions based on the latest analyses. Additionally, automation frees up staff resources to focus on more strategic initiatives rather than repetitive updates.

Contact the public sector team directly to learn more about how to set up this solution, or reach out to your AWS account team to engage on a proof of concept of this solution for your organization.

About AWS Clean Rooms

AWS Clean Rooms helps companies and their partners more easily and securely analyze and collaborate on their collective datasets—without sharing or copying one another’s underlying data. With AWS Clean Rooms, you can create a secure data clean room in minutes, and collaborate with any other company on the AWS Cloud to generate unique insights about advertising campaigns, investment decisions, and research and development.

The AWS Clean Rooms team is continually building new features to help you collaborate. Watch this video to learn more about privacy-enhanced collaboration with AWS Clean Rooms.

Check out more AWS Partners or contact an AWS Representative to know how we can help accelerate your business.


About the Authors

Venkata Kampana is a Senior Solutions Architect in the AWS Health and Human Services team and is based in Sacramento, CA. In that role, he helps public sector customers achieve their mission objectives with well-architected solutions on AWS.

Jim Daniel is the Public Health lead at Amazon Web Services. Previously, he held positions with the United States Department of Health and Human Services for nearly a decade, including Director of Public Health Innovation and Public Health Coordinator. Before his government service, Jim served as the Chief Information Officer for the Massachusetts Department of Public Health.

Mastering market dynamics: Transforming transaction cost analytics with ultra-precise Tick History – PCAP and Amazon Athena for Apache Spark

Post Syndicated from Pramod Nayak original https://aws.amazon.com/blogs/big-data/mastering-market-dynamics-transforming-transaction-cost-analytics-with-ultra-precise-tick-history-pcap-and-amazon-athena-for-apache-spark/

This post is cowritten with Pramod Nayak, LakshmiKanth Mannem and Vivek Aggarwal from the Low Latency Group of LSEG.

Transaction cost analysis (TCA) is widely used by traders, portfolio managers, and brokers for pre-trade and post-trade analysis, and helps them measure and optimize transaction costs and the effectiveness of their trading strategies. In this post, we analyze options bid-ask spreads from the LSEG Tick History – PCAP dataset using Amazon Athena for Apache Spark. We show you how to access data, define custom functions to apply on data, query and filter the dataset, and visualize the results of the analysis, all without having to worry about setting up infrastructure or configuring Spark, even for large datasets.

Background

Options Price Reporting Authority (OPRA) serves as a crucial securities information processor, collecting, consolidating, and disseminating last sale reports, quotes, and pertinent information for US Options. With 18 active US Options exchanges and over 1.5 million eligible contracts, OPRA plays a pivotal role in providing comprehensive market data.

On February 5, 2024, the Securities Industry Automation Corporation (SIAC) is set to upgrade the OPRA feed from 48 to 96 multicast channels. This enhancement aims to optimize symbol distribution and line capacity utilization in response to escalating trading activity and volatility in the US options market. SIAC has recommended that firms prepare for peak data rates of up to 37.3 GBits per second.

Despite the upgrade not immediately altering the total volume of published data, it enables OPRA to disseminate data at a significantly faster rate. This transition is crucial for addressing the demands of the dynamic options market.

OPRA stands out as one of the most voluminous feeds, with a peak of 150.4 billion messages in a single day in Q3 2023 and a capacity headroom requirement of 400 billion messages over a single day. Capturing every single message is critical for transaction cost analytics, market liquidity monitoring, trading strategy evaluation, and market research.

About the data

LSEG Tick History – PCAP is a cloud-based repository, exceeding 30 PB, housing ultra-high-quality global market data. This data is meticulously captured directly within the exchange data centers, employing redundant capture processes strategically positioned in major primary and backup exchange data centers worldwide. LSEG’s capture technology ensures lossless data capture and uses a GPS time-source for nanosecond timestamp precision. Additionally, sophisticated data arbitration techniques are employed to seamlessly fill any data gaps. Subsequent to capture, the data undergoes meticulous processing and arbitration, and is then normalized into Parquet format using LSEG’s Real Time Ultra Direct (RTUD) feed handlers.

The normalization process, which is integral to preparing the data for analysis, generates up to 6 TB of compressed Parquet files per day. The massive volume of data is attributed to the encompassing nature of OPRA, spanning multiple exchanges, and featuring numerous options contracts characterized by diverse attributes. Increased market volatility and market making activity on the options exchanges further contribute to the volume of data published on OPRA.

The attributes of Tick History – PCAP enable firms to conduct various analyses, including the following:

  • Pre-trade analysis – Evaluate potential trade impact and explore different execution strategies based on historical data
  • Post-trade evaluation – Measure actual execution costs against benchmarks to assess the performance of execution strategies
  • Optimized execution – Fine-tune execution strategies based on historical market patterns to minimize market impact and reduce overall trading costs
  • Risk management – Identify slippage patterns, identify outliers, and proactively manage risks associated with trading activities
  • Performance attribution – Separate the impact of trading decisions from investment decisions when analyzing portfolio performance

The LSEG Tick History – PCAP dataset is available in AWS Data Exchange and can be accessed on AWS Marketplace. With AWS Data Exchange for Amazon S3, you can access PCAP data directly from LSEG’s Amazon Simple Storage Service (Amazon S3) buckets, eliminating the need for firms to store their own copy of the data. This approach streamlines data management and storage, providing clients immediate access to high-quality PCAP or normalized data with ease of use, integration, and substantial data storage savings.

Athena for Apache Spark

For analytical endeavors, Athena for Apache Spark offers a simplified notebook experience accessible through the Athena console or Athena APIs, allowing you to build interactive Apache Spark applications. With an optimized Spark runtime, Athena helps you analyze petabytes of data by dynamically scaling the number of Spark engines in less than a second. Moreover, common Python libraries such as pandas and NumPy are seamlessly integrated, allowing for the creation of intricate application logic. The flexibility extends to the importation of custom libraries for use in notebooks. Athena for Spark accommodates most open-data formats and is seamlessly integrated with the AWS Glue Data Catalog.

Dataset

For this analysis, we used the LSEG Tick History – PCAP OPRA dataset from May 17, 2023. This dataset comprises the following components:

  • Best bid and offer (BBO) – Reports the highest bid and lowest ask for a security at a given exchange
  • National best bid and offer (NBBO) – Reports the highest bid and lowest ask for a security across all exchanges
  • Trades – Records completed trades across all exchanges

The dataset involves the following data volumes:

  • Trades – 160 MB distributed across approximately 60 compressed Parquet files
  • BBO – 2.4 TB distributed across approximately 300 compressed Parquet files
  • NBBO – 2.8 TB distributed across approximately 200 compressed Parquet files

Analysis overview

Analyzing OPRA Tick History data for Transaction Cost Analysis (TCA) involves scrutinizing market quotes and trades around a specific trade event. We use the following metrics as part of this study:

  • Quoted spread (QS) – Calculated as the difference between the BBO ask and the BBO bid
  • Effective spread (ES) – Calculated as the difference between the trade price and the midpoint of the BBO (BBO bid + (BBO ask – BBO bid)/2)
  • Effective/quoted spread (EQF) – Calculated as (ES / QS) * 100

We calculate these spreads before the trade and additionally at four intervals after the trade (just after, 1 second, 10 seconds, and 60 seconds after the trade).
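As a worked example of the three metrics, the following sketch applies the definitions above to a single made-up quote. The bid, ask, and trade price are hypothetical numbers chosen for easy arithmetic, and the function name `spread_metrics` is ours.

```python
def spread_metrics(bid, ask, trade_price):
    """Return (quoted spread, effective spread, effective/quoted spread in percent)."""
    quoted_spread = ask - bid
    midpoint = bid + quoted_spread / 2
    # Effective spread: trade price relative to the quote midpoint.
    effective_spread = trade_price - midpoint
    eqf = effective_spread / quoted_spread * 100
    return quoted_spread, effective_spread, eqf


# Hypothetical quote: bid 1.10, ask 1.20, trade executed at 1.18.
qs, es, eqf = spread_metrics(bid=1.10, ask=1.20, trade_price=1.18)
print(f"QS={qs:.2f} ES={es:.2f} EQF={eqf:.1f}%")
```

Here the midpoint is 1.15, so the trade executed about 0.03 above it, which is roughly 30 percent of the 0.10 quoted spread. A locked market, where bid equals ask, has a quoted spread of zero, and the ratio is undefined.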

Configure Athena for Apache Spark

To configure Athena for Apache Spark, complete the following steps:

  1. On the Athena console, under Get started, select Analyze your data using PySpark and Spark SQL.
  2. If this is your first time using Athena Spark, choose Create workgroup.
  3. For Workgroup name, enter a name for the workgroup, such as tca-analysis.
  4. In the Analytics engine section, select Apache Spark.
  5. In the Additional configurations section, you can choose Use defaults or provide a custom AWS Identity and Access Management (IAM) role and Amazon S3 location for calculation results.
  6. Choose Create workgroup.
  7. After you create the workgroup, navigate to the Notebooks tab and choose Create notebook.
  8. Enter a name for your notebook, such as tca-analysis-with-tick-history.
  9. Choose Create to create your notebook.

Launch your notebook

If you have already created a Spark workgroup, select Launch notebook editor under Get started.


After your notebook is created, you will be redirected to the interactive notebook editor.


Now we can add and run the following code to our notebook.

Create an analysis

Complete the following steps to create an analysis:

  • Import common libraries:
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
  • Create our data frames for BBO, NBBO, and trades:
bbo_quote = spark.read.parquet(f"s3://<bucket>/mt=bbo_quote/f=opra/dt=2023-05-17/*")
bbo_quote.createOrReplaceTempView("bbo_quote")
nbbo_quote = spark.read.parquet(f"s3://<bucket>/mt=nbbo_quote/f=opra/dt=2023-05-17/*")
nbbo_quote.createOrReplaceTempView("nbbo_quote")
trades = spark.read.parquet(f"s3://<bucket>/mt=trade/f=opra/dt=2023-05-17/29_1.parquet")
trades.createOrReplaceTempView("trades")
  • Now we can identify a trade to use for transaction cost analysis:
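filtered_trades = spark.sql("select Product, Price, Quantity, ReceiptTimestamp, MarketParticipant from trades")
# Display the first few trades without truncating long values
filtered_trades.show(5, truncate=False)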

We get the following output:

+-------------------+---------------------+----------------------+-------------------+-------------------+
|Product            |Price                |Quantity              |ReceiptTimestamp   |MarketParticipant  |
+-------------------+---------------------+----------------------+-------------------+-------------------+
|QQQ 230518C00329000|1.1700000000000000000|10.0000000000000000000|1684338565538021907|NYSEArca           |
|QQQ 230518C00329000|1.1700000000000000000|20.0000000000000000000|1684338576071397557|NASDAQOMXPHLX      |
|QQQ 230518C00329000|1.1600000000000000000|1.0000000000000000000 |1684338579104713924|ISE                |
|QQQ 230518C00329000|1.1400000000000000000|1.0000000000000000000 |1684338580263307057|NASDAQOMXBX_Options|
|QQQ 230518C00329000|1.1200000000000000000|1.0000000000000000000 |1684338581025332599|ISE                |
+-------------------+---------------------+----------------------+-------------------+-------------------+

We use the third trade in this output (price 1.16, received at 1684338579104713924 on ISE) going forward for the trade product (tp), trade price (tpr), and trade time (tt).

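  • Here we create a number of helper functions for our analysis:
def calculate_es_qs_eqf(df, trade_price):
    # Quote prices arrive as decimals; convert them to floats for arithmetic
    df['BidPrice'] = df['BidPrice'].astype('double')
    df['AskPrice'] = df['AskPrice'].astype('double')
    # Quoted spread: ask minus bid
    df["QS"] = df["AskPrice"]-df["BidPrice"]
    # Effective spread: trade price minus the quote midpoint (bid + QS/2)
    df["ES"] = trade_price - (df["BidPrice"] + df["QS"]/2)
    # Effective spread as a percentage of the quoted spread
    df["EQF"] = (df["ES"]/df["QS"])*100
    return df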

def get_trade_before_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] < nseconds].groupby(groupby_col).last()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_trade_after_n_seconds(trade_time, df, seconds=0, groupby_col = None):
    nseconds=seconds*1000000000
    nseconds += trade_time
    ret_df = df[df['ReceiptTimestamp'] > nseconds].groupby(groupby_col).first()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    ret_df = ret_df.reset_index()
    return ret_df

def get_nbbo_trade_before_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    # Last NBBO quote before the cutoff; copy() so the casts below don't write to a view of df
    ret_df = df[df['ReceiptTimestamp'] < nseconds].iloc[-1:].copy()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df

def get_nbbo_trade_after_n_seconds(trade_time, df, seconds=0):
    nseconds=seconds*1000000000
    nseconds += trade_time
    # First NBBO quote after the cutoff; copy() so the casts below don't write to a view of df
    ret_df = df[df['ReceiptTimestamp'] > nseconds].iloc[:1].copy()
    ret_df['BidPrice'] = ret_df['BidPrice'].astype('double')
    ret_df['AskPrice'] = ret_df['AskPrice'].astype('double')
    return ret_df
  • In the following function, we create the dataset that contains all the quotes before and after the trade. Athena Spark automatically determines how many DPUs to launch for processing our dataset.
def get_tca_analysis_via_df_single_query(trade_product, trade_price, trade_time):
    # BBO quotes for the traded product, ordered by time because the helper functions rely on row order
    bbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, MarketParticipant FROM bbo_quote where Product = '{trade_product}' ORDER BY ReceiptTimestamp")
    bbos = bbos.toPandas()

    bbo_just_before = get_trade_before_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_just_after = get_trade_after_n_seconds(trade_time, bbos, seconds=0, groupby_col='MarketParticipant')
    bbo_1s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=1, groupby_col='MarketParticipant')
    bbo_10s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=10, groupby_col='MarketParticipant')
    bbo_60s_after = get_trade_after_n_seconds(trade_time, bbos, seconds=60, groupby_col='MarketParticipant')
    
    all_bbos = pd.concat([bbo_just_before, bbo_just_after, bbo_1s_after, bbo_10s_after, bbo_60s_after], ignore_index=True, sort=False)
    bbos_calculated = calculate_es_qs_eqf(all_bbos, trade_price)

    # NBBO quotes for the traded product, also ordered by time
    nbbos = spark.sql(f"SELECT Product, ReceiptTimestamp, AskPrice, BidPrice, BestBidParticipant, BestAskParticipant FROM nbbo_quote where Product = '{trade_product}' ORDER BY ReceiptTimestamp")
    nbbos = nbbos.toPandas()

    nbbo_just_before = get_nbbo_trade_before_n_seconds(trade_time,nbbos, seconds=0)
    nbbo_just_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=0)
    nbbo_1s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=1)
    nbbo_10s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=10)
    nbbo_60s_after = get_nbbo_trade_after_n_seconds(trade_time, nbbos, seconds=60)

    all_nbbos = pd.concat([nbbo_just_before, nbbo_just_after, nbbo_1s_after, nbbo_10s_after, nbbo_60s_after], ignore_index=True, sort=False)
    nbbos_calculated = calculate_es_qs_eqf(all_nbbos, trade_price)

    calc = pd.concat([bbos_calculated, nbbos_calculated], ignore_index=True, sort=False)
    
    return calc
  • Now let’s call the TCA analysis function with the information from our selected trade:
tp = "QQQ 230518C00329000"
tpr = 1.16
tt = 1684338579104713924
c = get_tca_analysis_via_df_single_query(tp, tpr, tt)
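To make the lookup performed by the helper functions concrete, the following sketch reproduces it on a handful of synthetic rows: per venue, it takes the last quote before a cutoff or the first quote after it. The venues A and B, the timestamps, and the prices are made up, and the function names are ours. The sketch sorts by ReceiptTimestamp explicitly, because last() and first() follow row order.

```python
import pandas as pd

# Synthetic quotes for two hypothetical venues; timestamps are in nanoseconds.
quotes = pd.DataFrame({
    "MarketParticipant": ["A", "B", "A", "B", "A"],
    "ReceiptTimestamp": [100, 150, 200, 400, 1_500_000_300],
    "BidPrice": [1.10, 1.11, 1.12, 1.13, 1.14],
    "AskPrice": [1.20, 1.21, 1.22, 1.23, 1.24],
})
trade_time = 250


def quote_before(df, cutoff, by):
    """Last quote per group strictly before the cutoff."""
    ordered = df.sort_values("ReceiptTimestamp")
    return ordered[ordered["ReceiptTimestamp"] < cutoff].groupby(by).last().reset_index()


def quote_after(df, cutoff, by):
    """First quote per group strictly after the cutoff."""
    ordered = df.sort_values("ReceiptTimestamp")
    return ordered[ordered["ReceiptTimestamp"] > cutoff].groupby(by).first().reset_index()


before = quote_before(quotes, trade_time, "MarketParticipant")
after_1s = quote_after(quotes, trade_time + 1_000_000_000, "MarketParticipant")
print(before)
print(after_1s)
```

Note that venue B has no quote more than one second after the trade in this sample, so it is simply absent from `after_1s`. The same can happen with real data for venues that quote infrequently.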

Visualize the analysis results

Now let’s create the data frames we use for our visualization. Each data frame contains quotes for one of the five time intervals for each data feed (BBO, NBBO):

# BBO rows carry the exchange name in MarketParticipant
bbo = c[c['MarketParticipant'].notna()]
bbo_bef = bbo[bbo['ReceiptTimestamp'] < tt]
bbo_aft_0 = bbo[bbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
bbo_aft_1 = bbo[bbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
bbo_aft_10 = bbo[bbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
bbo_aft_60 = bbo[bbo['ReceiptTimestamp'] > (tt+60000000000)]

# NBBO rows have no MarketParticipant column, so it is NaN after the concat
nbbo = c[c['MarketParticipant'].isna()]
nbbo_bef = nbbo[nbbo['ReceiptTimestamp'] < tt]
nbbo_aft_0 = nbbo[nbbo['ReceiptTimestamp'].between(tt,tt+1000000000)]
nbbo_aft_1 = nbbo[nbbo['ReceiptTimestamp'].between(tt+1000000000,tt+10000000000)]
nbbo_aft_10 = nbbo[nbbo['ReceiptTimestamp'].between(tt+10000000000,tt+60000000000)]
nbbo_aft_60 = nbbo[nbbo['ReceiptTimestamp'] > (tt+60000000000)]
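One caveat with splitting by timestamp window: a snapshot taken as "first quote after 1 second" can arrive much later on a quiet venue and land in a different window. As an alternative sketch (not part of the original analysis), each snapshot can be tagged with its interval when it is created and selected by that label afterward. The `Interval` column, the `tag` helper, and the data are all hypothetical.

```python
import pandas as pd


def tag(df, label):
    """Return a copy of df with an 'Interval' column set to label."""
    return df.assign(Interval=label)


# Synthetic case for one hypothetical venue "A": the first quote after the
# trade arrives 15 seconds later, so it is also the first quote after the
# 1-second and 10-second marks.
tt = 1_000
late_quote = pd.DataFrame({
    "MarketParticipant": ["A"],
    "ReceiptTimestamp": [tt + 15_000_000_000],
    "QS": [0.02],
})
snapshots = pd.concat(
    [tag(late_quote, "0s"), tag(late_quote, "1s"), tag(late_quote, "10s")],
    ignore_index=True,
)

# Selecting by label keeps exactly one row per interval.
by_label = snapshots[snapshots["Interval"] == "1s"]
# A timestamp window instead places all three copies in the 10s-60s bucket.
by_window = snapshots[
    snapshots["ReceiptTimestamp"].between(tt + 10_000_000_000, tt + 60_000_000_000)
]
print(len(by_label), len(by_window))
```

Whether this matters depends on how actively the venues quote the product. For a liquid contract, the windows and the labels will usually agree.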

In the following sections, we provide example code to create different visualizations.

Plot QS and NBBO before the trade

Use the following code to plot the quoted spread and NBBO before the trade:

fig = px.bar(title="Quoted Spread Before The Trade",
    x=bbo_bef.MarketParticipant,
    y=bbo_bef['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(y=nbbo_bef.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

Plot QS for each market and NBBO after the trade

Use the following code to plot the quoted spread for each market and NBBO immediately after the trade:

fig = px.bar(title="Quoted Spread After The Trade",
    x=bbo_aft_0.MarketParticipant,
    y=bbo_aft_0['QS'],
    labels={'x': 'Market', 'y':'Quoted Spread'})
fig.add_hline(
    y=nbbo_aft_0.iloc[0]['QS'],
    line_width=1, line_dash="dash", line_color="red",
    annotation_text="NBBO", annotation_font_color="red")
%plotly fig

Plot QS for each time interval and each market for BBO

Use the following code to plot the quoted spread for each time interval and each market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['QS']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['QS']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['QS']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['QS']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['QS'])])
fig.update_layout(barmode='group',title="BBO Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'},
    yaxis={'title':'Quoted Spread'})
%plotly fig

Plot ES for each time interval and market for BBO

Use the following code to plot the effective spread for each time interval and market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['ES']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['ES']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['ES']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['ES']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['ES'])])
fig.update_layout(barmode='group',title="BBO Effective Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective Spread'})
%plotly fig

Plot EQF for each time interval and market for BBO

Use the following code to plot the effective/quoted spread for each time interval and market for BBO:

fig = go.Figure(data=[
    go.Bar(name="before trade", x=bbo_bef.MarketParticipant.unique(), y=bbo_bef['EQF']),
    go.Bar(name="0s after trade", x=bbo_aft_0.MarketParticipant.unique(), y=bbo_aft_0['EQF']),
    go.Bar(name="1s after trade", x=bbo_aft_1.MarketParticipant.unique(), y=bbo_aft_1['EQF']),
    go.Bar(name="10s after trade", x=bbo_aft_10.MarketParticipant.unique(), y=bbo_aft_10['EQF']),
    go.Bar(name="60s after trade", x=bbo_aft_60.MarketParticipant.unique(), y=bbo_aft_60['EQF'])])
fig.update_layout(barmode='group',title="BBO Effective/Quoted Spread Per Market/TimeFrame",
    xaxis={'title':'Market'}, 
    yaxis={'title':'Effective/Quoted Spread'})
%plotly fig

Athena Spark calculation performance

When you run a code block, Athena Spark automatically determines how many DPUs it requires to complete the calculation. In the code block where we call the get_tca_analysis_via_df_single_query function, we are actually instructing Spark to process the data, and we then convert the resulting Spark dataframes into Pandas dataframes. This constitutes the most intensive processing part of the analysis, and when Athena Spark runs this block, it shows the progress bar, elapsed time, and how many DPUs are processing data currently. For example, in the following calculation, Athena Spark is utilizing 18 DPUs.

When you configure your Athena Spark notebook, you have the option of setting the maximum number of DPUs that it can use. The default is 20 DPUs, but we tested this calculation with 10, 20, and 40 DPUs to demonstrate how Athena Spark automatically scales to run our analysis. We observed that Athena Spark scales close to linearly: the calculation took 15 minutes and 21 seconds when the notebook was configured with a maximum of 10 DPUs, 8 minutes and 23 seconds with 20 DPUs, and 4 minutes and 44 seconds with 40 DPUs. Because Athena Spark charges based on DPU usage, at a per-second granularity, the cost of these calculations is similar, but if you set a higher maximum DPU value, Athena Spark can return the result of the analysis much faster. For more details, see the Amazon Athena pricing page.
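A quick back-of-the-envelope check of these timings is shown below. It assumes, as an upper bound, that each run used its maximum DPU allocation for the entire duration. Athena Spark chooses the DPU count itself, so actual usage may be lower.

```python
# Elapsed times reported above, in seconds, keyed by the configured maximum DPUs.
runs = {10: 15 * 60 + 21, 20: 8 * 60 + 23, 40: 4 * 60 + 44}

baseline_dpus = 10
baseline_seconds = runs[baseline_dpus]

summary = {}
for max_dpus, seconds in runs.items():
    summary[max_dpus] = {
        "seconds": seconds,
        # Upper bound on billed usage: assumes all DPUs were busy throughout.
        "dpu_seconds": max_dpus * seconds,
        "speedup": baseline_seconds / seconds,
    }

for max_dpus, row in summary.items():
    print(f"{max_dpus:>2} DPUs: {row['seconds']:>4} s, "
          f"<= {row['dpu_seconds']:>5} DPU-seconds, "
          f"{row['speedup']:.2f}x vs {baseline_dpus} DPUs")
```

Under that assumption, raising the ceiling from 10 to 40 DPUs shortens the run by about 3.2 times while the DPU-seconds bound grows by roughly 23 percent, which is consistent with near-linear scaling at similar cost.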

Conclusion

In this post, we demonstrated how you can use high-fidelity OPRA data from LSEG’s Tick History-PCAP to perform transaction cost analytics using Athena Spark. The availability of OPRA data in a timely manner, complemented with accessibility innovations of AWS Data Exchange for Amazon S3, strategically reduces the time to analytics for firms looking to create actionable insights for critical trading decisions. OPRA generates about 7 TB of normalized Parquet data each day, and managing the infrastructure to provide analytics based on OPRA data is challenging.

Athena’s scalability in handling large-scale data processing for Tick History – PCAP for OPRA data makes it a compelling choice for organizations seeking swift and scalable analytics solutions in AWS. This post shows the seamless interaction between the AWS ecosystem and Tick History-PCAP data and how financial institutions can take advantage of this synergy to drive data-driven decision-making for critical trading and investment strategies.


About the Authors

Pramod Nayak is the Director of Product Management of the Low Latency Group at LSEG. Pramod has over 10 years of experience in the financial technology industry, focusing on software development, analytics, and data management. Pramod is a former software engineer and passionate about market data and quantitative trading.

LakshmiKanth Mannem is a Product Manager in the Low Latency Group of LSEG. He focuses on data and platform products for the low-latency market data industry. LakshmiKanth helps customers build the most optimal solutions for their market data needs.

Vivek Aggarwal is a Senior Data Engineer in the Low Latency Group of LSEG. Vivek works on developing and maintaining data pipelines for processing and delivery of captured market data feeds and reference data feeds.

Alket Memushaj is a Principal Architect in the Financial Services Market Development team at AWS. Alket is responsible for technical strategy, working with partners and customers to deploy even the most demanding capital markets workloads to the AWS Cloud.

Use Amazon Athena with Spark SQL for your open-source transactional table formats

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/use-amazon-athena-with-spark-sql-for-your-open-source-transactional-table-formats/

AWS-powered data lakes, supported by the unmatched availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, and advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.

In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.

In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.

Prerequisites

Complete the following prerequisites:

Download and import example notebooks from Amazon S3

To follow along, download the notebooks discussed in this post from the following locations:

After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.

Navigate to specific Open Table Format section

If you are interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.

If you are interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.

If you are interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.

Working with Apache Iceberg tables

When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell’s behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena.

In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.

Set up a notebook session

In order to use Apache Iceberg in Athena, while creating or editing a session, select the Apache Iceberg option by expanding the Apache Spark properties section. It will pre-populate the properties as shown in the following screenshot.

This image shows the Apache Iceberg properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.

Create a database and Iceberg table

First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:

%%sql
CREATE DATABASE icebergdb

Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING iceberg
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'
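Because the same NOAA column list is repeated for each table format in this post, you might prefer to generate the DDL instead of typing it by hand. The following is a minimal sketch, not part of the example notebooks: NOAA_COLUMNS and build_create_table are hypothetical names, and the statement declares year in the column list and partitions on it, as the Hudi DDL later in this post does. It does not cover Hudi's TBLPROPERTIES clause.

```python
# Column names taken from the CREATE TABLE statements in this post, plus the
# partition column. Every column is typed as string, as in the post.
NOAA_COLUMNS = [
    "station", "date", "latitude", "longitude", "elevation", "name",
    "temp", "temp_attributes", "dewp", "dewp_attributes",
    "slp", "slp_attributes", "stp", "stp_attributes",
    "visib", "visib_attributes", "wdsp", "wdsp_attributes",
    "mxspd", "gust", "max", "max_attributes", "min", "min_attributes",
    "prcp", "prcp_attributes", "sndp", "frshtt", "year",
]


def build_create_table(table, table_format, location, partition_col="year"):
    """Return a Spark SQL CREATE TABLE statement with every column typed as string."""
    columns = ",\n".join(f"{col} string" for col in NOAA_COLUMNS)
    return (
        f"CREATE TABLE {table}(\n{columns})\n"
        f"USING {table_format}\n"
        f"PARTITIONED BY ({partition_col})\n"
        f"LOCATION '{location}'"
    )


ddl = build_create_table(
    "icebergdb.noaa_iceberg",
    "iceberg",
    "s3://<your-S3-bucket>/<prefix>/noaaiceberg/",
)
print(ddl)
# Assumption: in an Athena notebook cell, the predefined Spark session could
# then run the statement with spark.sql(ddl).
```

Passing "delta" and the noaadelta location would produce the equivalent Delta Lake statement.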

Insert data into the table

To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Query the Iceberg table

Now that the data is inserted in the Iceberg table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

We get the following output.

Image shows output of first select query
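One caveat when reading this output: every column in noaa_iceberg, including min, is declared as string, so min() compares the values as text rather than as numbers. The following minimal Python sketch uses made-up readings (not values from the dataset) to show how the two orderings can disagree. If your results look off, casting inside the query, for example min(CAST(min AS double)), is one option to consider; we have not verified it against this dataset.

```python
# Made-up readings stored as text, the way the table's string columns hold them.
readings = ["37.9", "9.0", "41.0", "10.5"]

text_min = min(readings)                       # compared character by character
numeric_min = min(float(r) for r in readings)  # compared by numeric value

print(f"text comparison picks:    {text_min}")
print(f"numeric comparison picks: {numeric_min}")
```

The text comparison picks "10.5" because the character "1" sorts before "9", whereas the numeric comparison picks 9.0.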

Update data in the Iceberg table

Let’s look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

We get the following output.

Image shows output of second select query

Compact data files

Open table formats like Iceberg work by creating delta changes in file storage and tracking the versions of rows through manifest files. More data files lead to more metadata stored in manifest files, and small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg’s rewrite_data_files procedure in Spark for Athena compacts data files, combining many small delta change files into a smaller set of read-optimized Parquet files, which speeds up reads. To run compaction on our table, run the following Spark SQL:

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy=>'sort', sort_order => 'zorder(name)')

rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
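To build intuition for what compaction achieves, here is a toy sketch that groups small files into batches no larger than a target size. It is not Iceberg's actual planner: plan_compaction, the 512 MB target, and the file sizes are illustrative assumptions, and the grouping is a simple first-fit-decreasing heuristic.

```python
def plan_compaction(file_sizes_mb, target_mb=512):
    """Greedily group files into batches of at most target_mb (first-fit decreasing)."""
    groups = []  # each group lists the input files that would be rewritten as one file
    for size in sorted(file_sizes_mb, reverse=True):
        for group in groups:
            if sum(group) + size <= target_mb:
                group.append(size)
                break
        else:
            # No existing group has room, so start a new output file.
            groups.append([size])
    return groups


small_files = [8, 12, 5, 300, 64, 20, 150, 3, 40, 90]  # made-up sizes in MB
plan = plan_compaction(small_files)

print(f"{len(small_files)} files -> {len(plan)} files after compaction")
for group in plan:
    print(f"  {sum(group):>3} MB from {len(group)} input files")
```

Fewer, larger files mean fewer manifest entries to track and fewer S3 objects to open per query.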

List table snapshots

Each write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of a table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots

Expire old snapshots

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Expiring snapshots never removes files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots for the table icebergdb.noaa_iceberg that are older than a specific timestamp:

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

Note that the timestamp value is specified as a string in format yyyy-MM-dd HH:mm:ss.fff. The output will give a count of the number of data and metadata files deleted.
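If you expire snapshots on a schedule, you might compute the cutoff timestamp rather than hard-code it. The following minimal sketch builds the same CALL statement for a retention window in days; expire_snapshots_sql and keep_days are hypothetical names introduced here, and the fixed date only keeps the example reproducible.

```python
from datetime import datetime, timedelta


def expire_snapshots_sql(table, now, keep_days):
    """Build a CALL statement that expires snapshots older than keep_days before now."""
    cutoff = now - timedelta(days=keep_days)
    # strftime's %f yields microseconds (6 digits); trim to milliseconds (3 digits)
    # to match the yyyy-MM-dd HH:mm:ss.fff format.
    stamp = cutoff.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    return (
        "CALL spark_catalog.system.expire_snapshots\n"
        f"('{table}', TIMESTAMP '{stamp}')"
    )


sql = expire_snapshots_sql("icebergdb.noaa_iceberg", datetime(2023, 12, 7), keep_days=7)
print(sql)
```

With the inputs shown, the cutoff is 2023-11-30 00:00:00.000, matching the statement above.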

Drop the table and database

You can run the following Spark SQL to clean up the Iceberg tables and associated data in Amazon S3 from this exercise:

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

Run the following Spark SQL to remove the database icebergdb:

%%sql
DROP DATABASE icebergdb

To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.

Working with Apache Hudi tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.

Set up a notebook session

In order to use Apache Hudi in Athena, while creating or editing a session, select the Apache Hudi option by expanding the Apache Spark properties section.

This image shows the Apache Hudi properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.

Create a database and Hudi table

First, we create a database called hudidb that will be stored in the AWS Glue Data Catalog followed by Hudi table creation:

%%sql
CREATE DATABASE hudidb

We create a Hudi table pointing to a location in Amazon S3 where we will load the data. Note that the table is of copy-on-write type, which is defined by type = 'cow' in the table DDL. We have defined station and date as the composite primary key and year as the preCombineField. Also, the table is partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'

Insert data into the table

Like with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Query the Hudi table

Now that the table is created, let’s run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Hudi table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

Run time travel queries

We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

This query checks the Seattle Airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in format yyyy-MM-dd HH:mm:ss.fff.

Optimize query speed with clustering

To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

Compact tables

Compaction is a table service employed by Hudi specifically in Merge On Read (MOR) tables to merge updates from row-based log files to the corresponding columnar-based base file periodically to produce a new version of the base file. Compaction is not applicable to Copy On Write (COW) tables and only applies to MOR tables. You can run the following query in Spark for Athena to perform compaction on MOR tables:

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');
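As a rough mental model of what compaction does on a MOR table, the following toy sketch applies row-based log records on top of base-file rows, keyed by the primary key, to produce a new version of the base file. It is a simplified illustration, not Hudi's implementation: the compact function and the station IDs are made up, and real compaction also involves file groups, scheduling, and record payload merge logic.

```python
def compact(base_rows, log_rows, key_fields=("station", "date")):
    """Return a new base 'file' with log updates applied on top of the old base rows."""
    merged = {tuple(row[k] for k in key_fields): row for row in base_rows}
    for row in log_rows:  # log records are applied in the order they were written
        merged[tuple(row[k] for k in key_fields)] = row
    return list(merged.values())


base = [
    {"station": "S1", "date": "2023-01-01", "name": "SEATTLE TACOMA AIRPORT, WA US"},
    {"station": "S2", "date": "2023-01-01", "name": "OTHER STATION"},
]
log = [
    # Update to a key that already exists in the base file.
    {"station": "S1", "date": "2023-01-01", "name": "Sea-Tac"},
    # Insert of a key the base file has not seen yet.
    {"station": "S3", "date": "2023-01-02", "name": "NEW STATION"},
]

new_base = compact(base, log)
for row in new_base:
    print(row)
```

Until compaction runs, readers of a MOR table do this merge at query time, which is why compacting periodically helps read performance.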

Drop the table and database

Run the following Spark SQL to remove the Hudi table you created and associated data from the Amazon S3 location:

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

Run the following Spark SQL to remove the database hudidb:

%%sql
DROP DATABASE hudidb

To learn about all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.

Working with Linux Foundation Delta Lake tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.

Set up a notebook session

In order to use Delta Lake in Spark for Athena, while creating or editing a session, select Linux Foundation Delta Lake by expanding the Apache Spark properties section.

This image shows the Delta Lake properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_delta.ipynb file to follow along.

Create a database and Delta Lake table

In this section, we create a database in the AWS Glue Data Catalog. Using the following SQL, we can create a database called deltalakedb:

%%sql
CREATE DATABASE deltalakedb

Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING delta
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

Insert data into the table

We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.

Query the Delta Lake table

Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let’s run a Spark SQL to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Delta Lake table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as earlier:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

Compact data files

In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which will compact the small files into larger files, so the queries are not burdened by the small file overhead. To perform the compaction operation, run the following query:

%%sql
OPTIMIZE deltalakedb.noaa_delta

Refer to Optimizations in the Delta Lake documentation for different options available while running OPTIMIZE.

Remove files no longer referenced by a Delta Lake table

You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:

%%sql
VACUUM deltalakedb.noaa_delta

Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for options available with VACUUM.

Drop the table and database

Run the following Spark SQL to remove the Delta Lake table you created:

%%sql
DROP TABLE deltalakedb.noaa_delta

Run the following Spark SQL to remove the database deltalakedb:

%%sql
DROP DATABASE deltalakedb

Running DROP TABLE DDL on the Delta Lake table and database deletes the metadata for these objects, but doesn’t automatically delete the data files in Amazon S3. You can run the following Python code in the notebook’s cell to delete the data from the S3 location:

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('<your-S3-bucket>')
bucket.objects.filter(Prefix="<prefix>/noaadelta/").delete()

To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.

Conclusion

This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena’s built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Design a data mesh on AWS that reflects the envisioned organization

Post Syndicated from Claudia Chitu original https://aws.amazon.com/blogs/big-data/design-a-data-mesh-on-aws-that-reflects-the-envisioned-organization/

This post is written in collaboration with Claudia Chitu and Spyridon Dosis from Acast.

Founded in 2014, Acast is the world’s leading independent podcast company, elevating podcast creators and podcast advertisers for the ultimate listening experience. By championing an independent and open ecosystem for podcasting, Acast aims to fuel podcasting with the tools and monetization needed to thrive.

The company uses AWS Cloud services to build data-driven products and scale engineering best practices. To ensure a sustainable data platform amid growth and profitability phases, their tech teams adopted a decentralized data mesh architecture.

In this post, we discuss how Acast overcame the challenge of coupled dependencies between teams working with data at scale by employing the concept of a data mesh.

The problem

With accelerated growth and expansion, Acast encountered a challenge that resonates globally. Acast found itself with diverse business units and a vast amount of data generated across the organization. The existing monolithic, centralized architecture was struggling to meet the growing demands of data consumers. Data engineers were finding it increasingly challenging to maintain and scale the data infrastructure, resulting in data access bottlenecks, data silos, and inefficiencies in data management. A key objective was to enhance the end-to-end user experience, starting from the business needs.

Acast needed to address these challenges in order to get to an operational scale, meaning a global maximum of the number of people that can independently operate and deliver value. In this case, Acast tried to tackle the challenge of this monolithic structure and the high time to value for product teams, tech teams, and end consumers. It’s worth mentioning that they also have other product and tech teams, including operational or business teams, without AWS accounts.

Acast has a variable number of product teams, continuously evolving by merging existing ones, splitting them, adding new people, or simply creating new teams. In the last 2 years, they have had between 10 and 20 teams, each consisting of 4–10 people. Each team owns between 2 and 10 AWS accounts, depending on the ownership. The majority of data produced by these accounts is used downstream for business intelligence (BI) purposes and in Amazon Athena, by hundreds of business users every day.

The solution Acast implemented is a data mesh, architected on AWS. The architecture mirrors the organizational structure rather than following from a standalone architectural decision. In line with the Inverse Conway Maneuver, Acast’s technology architecture is isomorphic with its business architecture. In this case, the data mesh gives business users faster time to insights and tells them directly who the domain-specific owners are, which speeds up collaboration. We return to this when we discuss the AWS Identity and Access Management (IAM) roles used, because one of the roles is dedicated to the business group.

Parameters of success

Acast succeeded in bootstrapping and scaling a new team- and domain-oriented data product and its corresponding infrastructure and setup, resulting in less friction in gathering insights and happier users and consumers.

The success of the implementation meant assessing various aspects of the data infrastructure, data management, and business outcomes. They classified the metrics and indicators in the following categories:

  • Data usage – A clear understanding of who is consuming what data source, materialized with a mapping of consumers and producers. Discussions with users showed they were happier to have faster access to data in a simpler way, a more structured data organization, and a clear mapping of who the producer is. A lot of progress has been made to advance their data-driven culture (data literacy, data sharing, and collaboration across business units).
  • Data governance – With their service-level objective stating when the data sources are available (among other details), teams know whom to notify and can do so in a shorter time when there is late data coming in or other issues with the data. With a data steward role in place, the ownership has been strengthened.
  • Data team productivity – Through engineering retrospectives, Acast found that their teams appreciate autonomy to make decisions regarding their data domains.
  • Cost and resource efficiency – Acast observed a reduction in data duplication, and therefore in cost (in some accounts, duplicate copies of data were eliminated entirely), by reading data across accounts while still enabling scaling.

Data mesh overview

A data mesh is a sociotechnical approach to building a decentralized data architecture by using a domain-oriented, self-serve design (from a software development perspective). It borrows from Eric Evans’ theory of domain-driven design and Manuel Pais’ and Matthew Skelton’s theory of team topologies. Establishing this context matters because it sets the stage for the technical details that follow and can help you understand how the concepts discussed in this post fit into the broader framework of a data mesh.

To recap before diving deeper into Acast’s implementation, the data mesh concept is based on the following principles:

  • It’s domain driven, as opposed to pipelines as a first-class concern
  • It serves data as a product
  • It’s a good product that delights users (data is trustworthy, documentation is available, and it’s easily consumable)
  • It offers federated computational governance and decentralized ownership—a self-serve data platform

Domain-driven architecture

In Acast’s approach of owning the operational and analytical datasets, teams are structured with ownership based on domain, reading directly from the producer of the data, via an API or programmatically from Amazon S3 storage or using Athena as a SQL query engine. Some examples of Acast’s domains are presented in the following figure.

As illustrated in the preceding figure, some domains are loosely coupled to other domains’ operational or analytical endpoints, with a different ownership. Others might have stronger dependency, which is expected, for business (some podcasters can be also advertisers, creating sponsorship creatives and running campaigns for their own shows, or transacting ads using Acast’s software as a service).

Data as a product

Treating data as a product entails three key components: the data itself, the metadata, and the associated code and infrastructure. In this approach, teams responsible for generating data are referred to as producers. These producer teams possess in-depth knowledge about their consumers, understanding how their data product is utilized. Any changes planned by the data producers are communicated in advance to all consumers. This proactive notification ensures that downstream processes are not disrupted. By providing consumers with advance notice, they have sufficient time to prepare for and adapt to the upcoming changes, maintaining a smooth and uninterrupted workflow. The producers run a new version of the initial dataset in parallel, notify the consumers individually, and discuss with them their necessary timeframe to start consuming the new version. When all consumers are using the new version, the producers make the initial version unavailable.

Data schemas are inferred from the common agreed-upon format to share files between teams, which is Parquet in the case of Acast. Data can be shared in files, batched or stream events, and more. Each team has its own AWS account acting as an independent and autonomous entity with its own infrastructure. For orchestration, they use the AWS Cloud Development Kit (AWS CDK) for infrastructure as code (IaC) and AWS Glue Data Catalogs for metadata management. Users can also raise requests to producers to improve the way the data is presented or to enrich the data with new data points for generating a higher business value.

With each team owning an AWS account and a data catalog ID from Athena, it’s straightforward to see this through the lens of a distributed data lake on top of Amazon S3, with a common catalog mapping all the catalogs from all the accounts.

At the same time, each team can also map other catalogs to their own account and use their own data, which they produce along with the data from other accounts. Unless it is sensitive data, the data can be accessed programmatically or from the AWS Management Console in a self-service manner without being dependent on the data infrastructure engineers. This is a domain-agnostic, shared way to self-serve data. The product discovery happens through the catalog registration. Using only a few standards commonly agreed upon and adopted across the company, for the purpose of interoperability, Acast addressed the fragmented silos and friction to exchange data or consume domain-agnostic data.

With this principle, teams get assurance that the data is secure, trustworthy, and accurate, and appropriate access controls are managed at each domain level. Moreover, on the central account, roles are defined for different types of permissions and access, using AWS IAM Identity Center permissions. All datasets are discoverable from a single central account. The following figure illustrates how it’s instrumented, where two IAM roles are assumed by two types of user (consumer) groups: one that has access to a limited dataset, which is restricted data, and one that has access to non-restricted data. There is also a way to assume any of these roles, for service accounts, such as those used by data processing jobs in Amazon Managed Workflows for Apache Airflow (Amazon MWAA), for example.

How Acast solved for high alignment and a loosely coupled architecture

The following diagram shows a conceptual architecture of how Acast’s teams are organizing data and collaborating with each other.

Acast used the Well-Architected Framework for the central account to improve its practice running analytical workloads in the cloud. Through the lenses of the tool, Acast was able to address better monitoring, cost optimization, performance, and security. It helped them understand the areas where they could improve their workloads, how to address common issues with automated solutions, and how to measure success by defining KPIs. It also saved them time by surfacing lessons that would otherwise have taken longer to find. Spyridon Dosis, Acast’s Information Security Officer, shares, “We are happy AWS is always ahead with releasing tools that enable the configuration, assessment, and review of multi-account setup. This is a big plus for us, working in a decentralized organization.” Spyridon also adds, “A very important concept we value is the AWS security defaults (e.g. default encryption for S3 buckets).”

In the architecture diagram, we can see that each team can be a data producer, except the team owning the central account, which serves as the central data platform, modeling the logic from multiple domains to paint the full business picture. All other teams can be data producers or data consumers. They can connect to the central account and discover datasets via the cross-account AWS Glue Data Catalog, analyze them in the Athena query editor or with Athena notebooks, or map the catalog to their own AWS account. Access to the central Athena catalog is implemented with IAM Identity Center, with roles for open data and restricted data access.

For non-sensitive data (open data), Acast uses a template where the datasets are by default open to the entire organization to read from, using a condition to provide the organization-assigned ID parameter, as shown in the following code snippet:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": "*",
            "Action": [
                "s3:GetObject*",
                "s3:GetBucket*",
                "s3:List*"
            ],
            "Resource": [
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET",
                "arn:aws:s3:::DOC-EXAMPLE-BUCKET/*"
            ],
            "Condition": {
                "StringEquals": {
                    "aws:PrincipalOrgID": "ORG-ID-NUMBER"
                }
            }
        }
    ]
}
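
As a minimal sketch of how a template like this might be generated per bucket, the following Python builds the same policy document. The function name build_open_data_policy and the sample organization ID are hypothetical and not part of Acast's setup; attaching the policy with the boto3 put_bucket_policy call is shown only as a comment.

```python
import json


def build_open_data_policy(bucket_name: str, org_id: str) -> dict:
    """Return a bucket policy that lets any principal in the organization read."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": "*",
                "Action": ["s3:GetObject*", "s3:GetBucket*", "s3:List*"],
                "Resource": [
                    f"arn:aws:s3:::{bucket_name}",
                    f"arn:aws:s3:::{bucket_name}/*",
                ],
                # Only principals that belong to this AWS Organizations ID match.
                "Condition": {"StringEquals": {"aws:PrincipalOrgID": org_id}},
            }
        ],
    }


# "o-exampleorgid" is a placeholder organization ID (assumption).
policy = build_open_data_policy("DOC-EXAMPLE-BUCKET", "o-exampleorgid")
policy_json = json.dumps(policy, indent=4)
print(policy_json)

# To attach it (requires boto3 and credentials):
# boto3.client("s3").put_bucket_policy(Bucket="DOC-EXAMPLE-BUCKET", Policy=policy_json)
```

Because the principal is a wildcard, the aws:PrincipalOrgID condition is what keeps the bucket from being readable outside the organization.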

When handling sensitive data like financials, the teams use a collaborative data steward model. The data steward works with the requester to evaluate access justification for the intended use case. Together, they determine appropriate access methods to meet the need while maintaining security. This could include IAM roles, service accounts, or specific AWS services. This approach enables business users outside the tech organization (which means they don’t have an AWS account) to independently access and analyze the information they need. By granting access through IAM policies on AWS Glue resources and S3 buckets, Acast provides self-serve capabilities while still governing delicate data through human review. The data steward role has been valuable for understanding use cases, assessing security risks, and ultimately facilitating access that accelerates the business through analytical insights.

For Acast’s use case, granular row- or column-level access controls weren’t needed, so the approach sufficed. However, other organizations may require more fine-grained governance over sensitive data fields. In those cases, solutions like AWS Lake Formation could implement permissions needed, while still providing a self-serve data access model. For more information, refer to Design a data mesh architecture using AWS Lake Formation and AWS Glue.

At the same time, teams can read from other producers directly, from Amazon S3 or via an API, keeping the dependency at minimum, which enhances the velocity of development and delivery. Therefore, an account can be a producer and a consumer in parallel. Each team is autonomous, and is accountable for their own tech stack.

Additional learnings

What did Acast learn? So far, we’ve discussed how the architectural design is an effect of the organizational structure. The tech organization consists of multiple cross-functional teams, and bootstrapping a new team that follows the common principles of data mesh is meant to be straightforward, but Acast learned that this doesn’t go seamlessly every time. To set up a new account in AWS, each team goes through the same journey with slight differences that reflect its own particularities.

This can create friction, and it’s difficult to get all data-producing teams to a high level of maturity as data producers. The reason is that data competencies vary across these cross-functional teams, which aren’t dedicated data teams.

By implementing the decentralized solution, Acast effectively tackled the scalability challenge by adapting their teams to align with evolving business needs. This approach ensures high decoupling and alignment. Furthermore, they strengthened ownership, significantly reducing the time needed to identify and resolve issues because the upstream source is readily known and easily accessible with specified SLAs. The volume of data support inquiries has seen a reduction of over 50%, because business users are empowered to gain faster insights. Notably, they successfully eliminated tens of terabytes of redundant storage that were previously copied solely to fulfill downstream requests. This achievement was made possible through the implementation of cross-account reading, leading to the removal of associated development and maintenance costs for these pipelines.

Conclusion

Acast used the Inverse Conway Maneuver and employed AWS services, with each cross-functional product team owning its own AWS account, to build a data mesh architecture that allows scalability, high ownership, and self-service data consumption. This approach to data ownership and operations has worked well for the company and meets its engineering principles, with the data mesh emerging as an effect rather than a deliberate intent. For other organizations, the desired data mesh might look different and the approach might yield other learnings.

To conclude, a modern data architecture on AWS allows you to efficiently construct data products and data mesh infrastructure at a low cost without compromising on performance.

The following are some examples of AWS services you can use to design your desired data mesh on AWS:


About the Authors

Claudia Chitu is a Data strategist and an influential leader in the Analytics space. Focused on aligning data initiatives with the overall strategic goals of the organization, she employs data as a guiding force for long-term planning and sustainable growth.

Spyridon Dosis is an Information Security Professional in Acast. Spyridon supports the organization in designing, implementing and operating its services in a secure manner protecting the company and users’ data.

Srikant Das is an Acceleration Lab Solutions Architect at Amazon Web Services. He has over 13 years of experience in Big Data analytics and Data Engineering, where he enjoys building reliable, scalable, and efficient solutions. Outside of work, he enjoys traveling and blogging his experiences in social media.

Orchestrate Amazon EMR Serverless Spark jobs with Amazon MWAA, and data validation using Amazon Athena

Post Syndicated from Gaurav Parekh original https://aws.amazon.com/blogs/big-data/orchestrate-amazon-emr-serverless-spark-jobs-with-amazon-mwaa-and-data-validation-using-amazon-athena/

As data engineering becomes increasingly complex, organizations are looking for new ways to streamline their data processing workflows. Many data engineers today use Apache Airflow to build, schedule, and monitor their data pipelines.

However, as the volume of data grows, managing and scaling these pipelines can become a daunting task. Amazon Managed Workflows for Apache Airflow (Amazon MWAA) can help simplify the process of building, running, and managing data pipelines. By providing Apache Airflow as a fully managed platform, Amazon MWAA allows data engineers to focus on building data workflows instead of worrying about infrastructure.

Today, businesses and organizations require cost-effective and efficient ways to process large amounts of data. Amazon EMR Serverless is a cost-effective and scalable solution for big data processing that can handle large volumes of data. The Amazon Provider in Apache Airflow comes with EMR Serverless operators and is already included in Amazon MWAA, making it easy for data engineers to build scalable and reliable data processing pipelines. You can use EMR Serverless to run Spark jobs on the data, and use Amazon MWAA to manage the workflows and dependencies between these jobs. This integration can also help reduce costs by automatically scaling the resources needed to process data.

Amazon Athena is a serverless, interactive analytics service built on open-source frameworks, supporting open-table and file formats. You can use standard SQL to interact with your data without the need to manage complex infrastructure.

In this post, we use Amazon MWAA, EMR Serverless, and Athena to build a complete end-to-end data processing pipeline.

Solution overview

The following diagram illustrates the solution architecture.

The workflow includes the following steps:

  1. Create an Amazon MWAA workflow that retrieves data from your input Amazon Simple Storage Service (Amazon S3) bucket.
  2. Use EMR Serverless to process the data stored in Amazon S3. EMR Serverless automatically scales up or down based on the workload, so you don’t need to worry about provisioning or managing any infrastructure.
  3. Use EMR Serverless to transform the data using PySpark code and then store the transformed data back in your S3 bucket.
  4. Use Athena to create an external table based on the S3 dataset and run queries to analyze the transformed data. Athena uses the AWS Glue Data Catalog to store the table metadata.

Prerequisites

You should have the following prerequisites:

Data preparation

To illustrate using EMR Serverless jobs with Apache Spark via Amazon MWAA and data validation using Athena, we use the publicly available NYC taxi dataset. Download the following datasets to your local machine:

  • Green taxi and Yellow taxi trip records – Trip records for yellow and green taxis, which include information such as pick-up and drop-off dates and times, locations, trip distances, and payment types. In our example, we use the latest Parquet files for 2022.
  • Dataset for Taxi zone lookup – A dataset that provides location IDs and corresponding zone details for taxis.

In later steps, we upload these datasets to Amazon S3.

Create solution resources

This section outlines the steps for setting up data processing and transformation.

Create an EMR Serverless application

You can create one or more EMR Serverless applications that use open source analytics frameworks like Apache Spark or Apache Hive. Unlike Amazon EMR on EC2, you don’t need to delete or terminate EMR Serverless applications. An EMR Serverless application is only a definition and, once created, can be reused as long as needed. This makes the Amazon MWAA pipeline simpler, because you only have to submit jobs to a pre-created EMR Serverless application.

By default, an EMR Serverless application auto-starts on job submission and auto-stops after it has been idle for 15 minutes, to ensure cost efficiency. You can modify the amount of idle time or choose to turn the feature off.

To create an application using the EMR Serverless console, follow the instructions in “Create an EMR Serverless application”. Note the application ID, because we use it in the following steps.
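
If you prefer to script this step, the following is a minimal sketch that builds the request for the boto3 EMR Serverless create_application call with the auto-start and auto-stop settings described above. The helper names, the application name, and the release label emr-6.9.0 are assumptions for illustration; choose the release label you actually want to run.

```python
def build_application_request(name: str, idle_timeout_minutes: int = 15) -> dict:
    """Build keyword arguments for the EMR Serverless create_application call."""
    return {
        "name": name,
        "releaseLabel": "emr-6.9.0",  # assumption: pick the release you need
        "type": "SPARK",
        # Start automatically when a job is submitted.
        "autoStartConfiguration": {"enabled": True},
        # Stop after the application has been idle for this many minutes.
        "autoStopConfiguration": {
            "enabled": True,
            "idleTimeoutMinutes": idle_timeout_minutes,
        },
    }


def create_application(emr_client, name: str, idle_timeout_minutes: int = 15) -> str:
    """Create the application and return its ID for use in the DAG."""
    response = emr_client.create_application(
        **build_application_request(name, idle_timeout_minutes)
    )
    return response["applicationId"]


# Usage (requires boto3 and credentials):
# import boto3
# app_id = create_application(boto3.client("emr-serverless"), "mwaa-emrs-demo")
```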

Create an S3 bucket and folders

Complete the following steps to set up your S3 bucket and folders:

  1. On the Amazon S3 console, create an S3 bucket to store the dataset.
  2. Note the name of the S3 bucket to use in later steps.
  3. Create an input_data folder for storing input data.
  4. Within that folder, create three separate folders, one for each dataset: green, yellow, and zone_lookup.

You can download and work with the latest datasets available. For our testing, we use the following files:

  • The green/ folder has the file green_tripdata_2022-06.parquet
  • The yellow/ folder has the file yellow_tripdata_2022-06.parquet
  • The zone_lookup/ folder has the file taxi_zone_lookup.csv
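
The folder layout above can also be populated from a script. The following is a minimal sketch, assuming the three files have been downloaded locally; the helper names and the prefix-to-folder mapping are hypothetical, and upload_file is the standard boto3 S3 client method.

```python
import os

# Hypothetical mapping from file-name prefix to the folder created above.
DATASET_FOLDERS = {
    "green_tripdata": "green",
    "yellow_tripdata": "yellow",
    "taxi_zone_lookup": "zone_lookup",
}


def input_key_for(filename: str) -> str:
    """Return the S3 key under input_data/ for a downloaded dataset file."""
    for prefix, folder in DATASET_FOLDERS.items():
        if filename.startswith(prefix):
            return f"input_data/{folder}/{filename}"
    raise ValueError(f"Unrecognized dataset file: {filename}")


def upload_datasets(s3_client, bucket: str, local_paths: list) -> list:
    """Upload each local file to its dataset folder and return the keys used."""
    keys = []
    for path in local_paths:
        key = input_key_for(os.path.basename(path))
        s3_client.upload_file(path, bucket, key)
        keys.append(key)
    return keys


# Usage (requires boto3 and credentials):
# import boto3
# upload_datasets(boto3.client("s3"), "<<bucket_name_here>>",
#                 ["green_tripdata_2022-06.parquet", "taxi_zone_lookup.csv"])
```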

Set up the Amazon MWAA DAG scripts

Complete the following steps to set up your DAG scripts:

  1. Download the following scripts to your local machine:
    1. requirements.txt – This file lists the Python dependencies for the environment. A Python dependency is any package or distribution that is not included in the Apache Airflow base install for your Apache Airflow version on your Amazon MWAA environment. For this post, we use Boto3 version >=1.23.9.
    2. blog_dag_mwaa_emrs_ny_taxi.py – This script is a part of the Amazon MWAA DAG and consists of the following tasks: yellow_taxi_zone_lookup, green_taxi_zone_lookup, and ny_taxi_summary. These tasks run Spark jobs to look up taxi zones and generate a data summary.
    3. green_zone.py – This PySpark script reads data files for green taxi rides and zone lookup, performs a join operation to combine them, and generates an output file containing green taxi rides with zone information. It utilizes temporary views for the df_green and df_zone data frames, performs column-based joins, and aggregates data like passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_green_zone, as Parquet files.
    4. yellow_zone.py – This PySpark script processes yellow taxi ride and zone lookup data files by joining them to generate an output file containing yellow taxi rides with zone information. The script accepts a user-provided S3 bucket name and initiates a Spark session with the application name yellow_zone. It reads the yellow taxi files and zone lookup file from the specified S3 bucket, creates temporary views, performs a join based on location ID, and calculates statistics such as passenger count, trip distance, and fare amount. Lastly, it creates the output_data folder in the specified S3 bucket to write the resulting data frame, df_yellow_zone, as Parquet files.
    5. ny_taxi_summary.py – This PySpark script processes the green_zone and yellow_zone files to aggregate statistics on taxi rides, grouping data by service zones and location IDs. It requires an S3 bucket name as a command line argument, creates a SparkSession named ny_taxi_summary, reads the files from S3, performs a join, and generates a new data frame named ny_taxi_summary. It creates an output_data folder in the specified S3 bucket to write the resulting data frame to new Parquet files.
  2. On your local machine, update the blog_dag_mwaa_emrs_ny_taxi.py script with the following information:
    • Update your S3 bucket name in the following two lines:
      S3_LOGS_BUCKET = "<<bucket_name_here>>"
      S3_BASE_BUCKET = "<<bucket_name_here>>"

    • Update your role name ARN:
      JOB_ROLE_ARN = "<<emr_serverless_execution_role ARN here>>"
      For example: arn:aws:iam::<<ACCOUNT_ID>>:role/<<ROLE_NAME>>

    • Update the EMR Serverless application ID. Use the application ID you noted earlier.
      EMR_SERVERLESS_APPLICATION_ID = "<<emr serverless application ID here>>"

  3. Upload the requirements.txt file to the S3 bucket you created earlier.
  4. In the S3 bucket, create a folder named dags and upload the updated blog_dag_mwaa_emrs_ny_taxi.py file from your local machine.
  5. On the Amazon S3 console, create a new folder named scripts inside the S3 bucket and upload the three PySpark scripts (green_zone.py, yellow_zone.py, and ny_taxi_summary.py) to this folder from your local machine.
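
To show how the values set in the DAG fit together, the following is a minimal sketch of the request that a job submission to EMR Serverless carries. It uses the boto3 start_job_run call directly rather than the Airflow operators, so it is not the code in the downloaded DAG. The helper name, the logs/ prefix, and passing the bucket name as the only script argument are assumptions based on the script descriptions above.

```python
def build_job_run_request(application_id: str, job_role_arn: str,
                          base_bucket: str, logs_bucket: str,
                          script_name: str) -> dict:
    """Build keyword arguments for the EMR Serverless start_job_run call."""
    return {
        "applicationId": application_id,
        "executionRoleArn": job_role_arn,
        "jobDriver": {
            "sparkSubmit": {
                # Scripts were uploaded to the scripts/ folder of the bucket.
                "entryPoint": f"s3://{base_bucket}/scripts/{script_name}",
                # Assumption: each script takes the bucket name as its argument.
                "entryPointArguments": [base_bucket],
            }
        },
        "configurationOverrides": {
            "monitoringConfiguration": {
                # Assumption: deliver job logs under a logs/ prefix.
                "s3MonitoringConfiguration": {"logUri": f"s3://{logs_bucket}/logs/"}
            }
        },
    }


# Usage (requires boto3 and credentials):
# import boto3
# request = build_job_run_request(EMR_SERVERLESS_APPLICATION_ID, JOB_ROLE_ARN,
#                                 S3_BASE_BUCKET, S3_LOGS_BUCKET, "green_zone.py")
# run_id = boto3.client("emr-serverless").start_job_run(**request)["jobRunId"]
```

The Airflow EMR Serverless operators in the Amazon provider accept the same job driver and configuration override structures, so a request like this maps closely onto a DAG task.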

Create an Amazon MWAA environment

To create an Airflow environment, complete the following steps:

  1. On the Amazon MWAA console, choose Create environment.
  2. For Name, enter mwaa_emrs_athena_pipeline.
  3. For Airflow version, choose the latest version (for this post, 2.5.1).
  4. For S3 Bucket, enter the path to your S3 bucket.
  5. For DAGs folder, enter the path to your dags folder.
  6. For Requirements file, enter the path to the requirements.txt file.
  7. Choose Next.
  8. For Virtual private cloud (VPC), choose a VPC that has a minimum of two private subnets.

This will populate two of the private subnets in your VPC.

  9. Under Web server access, select Public network.

This allows the Apache Airflow UI to be accessed over the internet by users granted access in the IAM policy for your environment.

  10. For Security group(s), select Create new security group.
  11. For Environment class, select mw1.small.
  12. For Execution role, choose Create a new role.
  13. For Role name, enter a name.
  14. Leave the other configurations as default and choose Next.
  15. On the next page, choose Create environment.

It may take about 20–30 minutes to create your Amazon MWAA environment.

  16. When the Amazon MWAA environment status changes to Available, navigate to the IAM console and update the Amazon MWAA execution role to add iam:PassRole permissions for emr_serverless_execution_role.
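
The pass role permission added in the last step could look like the policy built in the following minimal sketch. The helper name and the account ID in the sample ARN are placeholders, and restricting the permission to the EMR Serverless service through the iam:PassedToService condition is a suggestion rather than something the post prescribes.

```python
import json


def build_pass_role_policy(job_role_arn: str) -> dict:
    """Return an IAM policy that lets the holder pass the job role to EMR Serverless."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "iam:PassRole",
                "Resource": job_role_arn,
                # Limit passing of the role to the EMR Serverless service only.
                "Condition": {
                    "StringLike": {
                        "iam:PassedToService": "emr-serverless.amazonaws.com"
                    }
                },
            }
        ],
    }


# Placeholder account ID and the role name used in this post.
pass_role_policy = build_pass_role_policy(
    "arn:aws:iam::111122223333:role/emr_serverless_execution_role"
)
print(json.dumps(pass_role_policy, indent=4))
```

You can paste the printed JSON as an inline policy on the Amazon MWAA execution role in the IAM console.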

Trigger the Amazon MWAA DAG

To trigger the DAG, complete the following steps:

  1. On the Amazon MWAA console, choose Environments in the navigation pane.
  2. Open your environment and choose Open Airflow UI.
  3. Select blog_dag_mwaa_emrs_ny_taxi, choose the play icon, and choose Trigger DAG.
  4. When the DAG is running, choose the DAG blog_dag_mwaa_emrs_ny_taxi and choose Graph to locate your DAG run workflow.

The DAG takes approximately 4–6 minutes to run all the scripts. When it finishes, all tasks show as complete and the overall status of the DAG shows as success.

To rerun the DAG, remove s3://<<your_s3_bucket here >>/output_data/.
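
Removing the output folder can be scripted before a rerun. The following is a minimal sketch using the boto3 S3 paginator for list_objects_v2 and the delete_objects call; the function name delete_prefix is hypothetical. Double-check the prefix before running it, because the deletion is not reversible on an unversioned bucket.

```python
def delete_prefix(s3_client, bucket: str, prefix: str) -> int:
    """Delete every object under the prefix and return how many were removed."""
    deleted = 0
    paginator = s3_client.get_paginator("list_objects_v2")
    # Each page holds at most 1,000 keys, which is also the delete_objects limit.
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys = [{"Key": obj["Key"]} for obj in page.get("Contents", [])]
        if keys:
            s3_client.delete_objects(Bucket=bucket, Delete={"Objects": keys})
            deleted += len(keys)
    return deleted


# Usage (requires boto3 and credentials):
# import boto3
# delete_prefix(boto3.client("s3"), "<<your_s3_bucket here >>", "output_data/")
```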

Optionally, to understand how Amazon MWAA runs these tasks, choose the task you want to inspect.

Choose Run to view the task run details.

The following screenshot shows an example of the task logs.

If you’d like to dive deeper into the execution logs, navigate to “Applications” on the EMR Serverless console. The Apache Spark driver logs indicate the initiation of your job, along with the details for the executors, stages, and tasks that were created by EMR Serverless. These logs can help you monitor your job progress and troubleshoot failures.

By default, EMR Serverless will store application logs securely in Amazon EMR managed storage for a period of 30 days. However, you can also specify Amazon S3 or Amazon CloudWatch as your log delivery options during job submission.

Validate the final result set with Athena

Let’s validate the data loaded by the process using Athena SQL queries.

  1. On the Athena console, choose Query editor in the navigation pane.
  2. If you’re using Athena for the first time, under Settings, choose Manage and enter the S3 bucket location that you created earlier (<S3_BUCKET_NAME>/athena), then choose Save.
  3. In the query editor, enter the following query to create an external table:
CREATE EXTERNAL TABLE default.ny_taxi_summary(
  pu_service_zone string, 
  pulocationid bigint, 
  do_service_zone string, 
  dolocationid bigint, 
  passenger_count bigint, 
  trip_distance double, 
  fare_amount double, 
  extra double, 
  mta_tax double, 
  tip_amount double, 
  tolls_amount double, 
  improvement_surcharge double, 
  total_amount double, 
  congestion_surcharge double, 
  airport_fee double)
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION
  's3://<<YOUR-S3-BUCKET Here>>/output_data/ny_taxi_summary/' -- *** Change bucket name to your bucket***
TBLPROPERTIES (
  'classification'='parquet', 
  'compressionType'='none');


Run the following query on the recently created ny_taxi_summary table to retrieve the first 10 rows to validate the data:

select * from default.ny_taxi_summary limit 10;
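
The same validation can be run outside the console. The following is a minimal sketch that submits a query through the boto3 Athena client, polls until it finishes, and returns the first page of results. The function name run_athena_query and the polling interval are assumptions; the output location should be the query result location you configured earlier.

```python
import time


def run_athena_query(athena_client, sql: str, output_location: str,
                     database: str = "default", poll_seconds: float = 1.0) -> list:
    """Run a query and return the first page of rows as lists of strings."""
    query_id = athena_client.start_query_execution(
        QueryString=sql,
        QueryExecutionContext={"Database": database},
        ResultConfiguration={"OutputLocation": output_location},
    )["QueryExecutionId"]

    # Poll until Athena reports a terminal state.
    while True:
        execution = athena_client.get_query_execution(QueryExecutionId=query_id)
        state = execution["QueryExecution"]["Status"]["State"]
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            break
        time.sleep(poll_seconds)
    if state != "SUCCEEDED":
        raise RuntimeError(f"Query {query_id} ended in state {state}")

    # For SELECT queries, the first returned row holds the column names.
    result = athena_client.get_query_results(QueryExecutionId=query_id)
    return [
        [column.get("VarCharValue") for column in row["Data"]]
        for row in result["ResultSet"]["Rows"]
    ]


# Usage (requires boto3 and credentials):
# import boto3
# rows = run_athena_query(boto3.client("athena"),
#                         "select * from default.ny_taxi_summary limit 10",
#                         "s3://<S3_BUCKET_NAME>/athena/")
```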

Clean up

To prevent future charges, complete the following steps:

  1. On the Amazon S3 console, delete the S3 bucket you created to store the Amazon MWAA DAG, scripts, and logs.
  2. On the Athena console, drop the table you created:
    drop table default.ny_taxi_summary;

  3. On the Amazon MWAA console, navigate to the environment that you created and choose Delete.
  4. On the EMR Studio console, delete the application.

To delete the application, navigate to the List applications page. Select the application that you created and choose Actions → Stop to stop the application. After the application is in the STOPPED state, select the same application and choose Actions → Delete.

Conclusion

Data engineering is a critical component of many organizations, and as data volumes continue to grow, it’s essential to find ways to streamline data processing workflows. The combination of Amazon MWAA, EMR Serverless, and Athena provides a powerful solution to build, run, and manage data pipelines efficiently. With this end-to-end data processing pipeline, data engineers can easily process and analyze large amounts of data quickly and cost-effectively without the need to manage complex infrastructure. The integration of these AWS services provides a robust and scalable solution for data processing, helping organizations make informed decisions based on their data insights.

Now that you’ve seen how to submit Spark jobs on EMR Serverless via Amazon MWAA, we encourage you to use Amazon MWAA to create a workflow that will run PySpark jobs via EMR Serverless.

We welcome your feedback and inquiries. Please feel free to reach out to us if you have any questions or comments.


About the authors

Rahul Sonawane is a Principal Analytics Solutions Architect at AWS with AI/ML and Analytics as his area of specialty.

Gaurav Parekh is a Solutions Architect helping AWS customers build large scale modern architecture. He specializes in data analytics and networking. Outside of work, Gaurav enjoys playing cricket, soccer and volleyball.


Audit History

December 2023: This post was reviewed for technical accuracy by Santosh Gantaram, Sr. Technical Account Manager.

Enhance query performance using AWS Glue Data Catalog column-level statistics

Post Syndicated from Sandeep Adwankar original https://aws.amazon.com/blogs/big-data/enhance-query-performance-using-aws-glue-data-catalog-column-level-statistics/

Today, we’re making available a new capability of AWS Glue Data Catalog that allows generating column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers (CBO) of Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings.

Data lakes are designed for storing vast amounts of raw, unstructured, or semi-structured data at a low cost, and organizations share those datasets across multiple departments and teams. The queries on these large datasets read vast amounts of data and can perform complex join operations on multiple datasets. When talking with our customers, we learned that one of the challenging aspects of data lake performance is how to optimize these analytics queries to execute faster.

Data lake performance optimization is especially important for queries with multiple joins, and that is where cost-based optimizers help the most. For a CBO to work, column statistics need to be collected and updated based on changes in the data. We’re launching the capability to generate column-level statistics such as number of distinct values, number of nulls, max, and min on file formats such as Parquet, ORC, JSON, Amazon ION, CSV, and XML for AWS Glue tables. With this launch, customers now have an integrated end-to-end experience where statistics on AWS Glue tables are collected and stored in the AWS Glue Data Catalog, and made available to analytics services for improved query planning and execution.

Using these statistics, cost-based optimizers improve query run plans and boost the performance of queries run in Amazon Athena and Amazon Redshift Spectrum. For example, a CBO can use column statistics such as number of distinct values and number of nulls to improve row prediction. Row prediction is the estimate, made during the query planning stage, of the number of rows that a given step will return from a table. The more accurate the row predictions are, the more efficient the query execution steps are. This leads to faster query execution and potentially reduced cost. Some of the specific optimizations that a CBO can employ include join reordering and push-down of aggregations based on the statistics available for each table and column.
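
To make row prediction concrete, the following is a simplified sketch of textbook cardinality estimates that assume values are uniformly distributed. These are illustrative formulas only, not the ones Athena or Amazon Redshift Spectrum actually use, and the function names and sample numbers are hypothetical.

```python
def estimate_equality_rows(row_count: int, null_count: int, distinct_count: int) -> float:
    """Estimate rows matching `column = value`, assuming uniform distribution."""
    if distinct_count <= 0:
        return 0.0
    # Nulls never match an equality predicate, so exclude them first.
    return (row_count - null_count) / distinct_count


def estimate_join_rows(left_rows: int, right_rows: int,
                       left_distinct: int, right_distinct: int) -> float:
    """Estimate rows produced by an equi-join on one column."""
    return left_rows * right_rows / max(left_distinct, right_distinct)


# A table with 1,000,000 rows, 100,000 nulls, and 300 distinct values.
print(estimate_equality_rows(1_000_000, 100_000, 300))  # 3000.0
# Joining a 900,000-row fact table to a 300-row dimension on a shared key.
print(estimate_join_rows(900_000, 300, 300, 300))  # 900000.0
```

With estimates like these for every candidate step, an optimizer can compare join orders and start with the one that produces the fewest intermediate rows. Without statistics, it has to guess.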

For customers using data mesh with AWS Lake Formation permissions, tables from different data producers are cataloged in the centralized governance accounts. As they generate statistics on tables on centralized catalog and share those tables with consumers, queries on those tables in consumer accounts will see query performance improvements automatically. In this post, we’ll demonstrate the capability of AWS Glue Data Catalog to generate column statistics for our sample tables.

Solution overview

To demonstrate the effectiveness of this capability, we employ the industry-standard TPC-DS 3 TB dataset stored in an Amazon Simple Storage Service (Amazon S3) public bucket. We’ll compare the query performance before and after generating column statistics for the tables, by running queries in Amazon Athena and Amazon Redshift Spectrum. We provide the queries that we used in this post, and we encourage you to try out your own queries by following the workflow described in the following sections.

The workflow consists of the following high level steps:

  1. Cataloging the Amazon S3 Bucket: Utilize AWS Glue Crawler to crawl the designated Amazon S3 bucket, extracting metadata, and seamlessly storing it in the AWS Glue data catalog. We’ll query these tables using Amazon Athena and Amazon Redshift Spectrum.
  2. Generating column statistics: Employ the enhanced capabilities of AWS Glue Data Catalog to generate comprehensive column statistics for the crawled data, thereby providing valuable insights into the dataset.
  3. Querying with Amazon Athena and Amazon Redshift Spectrum: Evaluate the impact of column statistics on query performance by utilizing Amazon Athena and Amazon Redshift Spectrum to execute queries on the dataset.

The following diagram illustrates the solution architecture.

Walkthrough

To implement the solution, we complete the following steps:

  1. Set up resources with AWS CloudFormation.
  2. Run the AWS Glue crawler on the public Amazon S3 bucket to list the 3 TB TPC-DS dataset.
  3. Run queries on Amazon Athena and Amazon Redshift and note the query duration.
  4. Generate statistics for AWS Glue Data Catalog tables.
  5. Run queries on Amazon Athena and Amazon Redshift and compare the query duration with the previous run.
  6. Optional: Schedule AWS Glue column statistics jobs using AWS Lambda and the Amazon EventBridge Scheduler.

Set up resources with AWS CloudFormation

This post includes an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. The template generates the following resources:

  • An Amazon Virtual Private Cloud (Amazon VPC), public subnet, private subnets and route tables.
  • An Amazon Redshift Serverless workgroup and namespace.
  • An AWS Glue crawler to crawl the public Amazon S3 bucket and create tables in the AWS Glue Data Catalog for the TPC-DS dataset.
  • AWS Glue catalog databases and tables.
  • An Amazon S3 bucket to store Athena results.
  • AWS Identity and Access Management (AWS IAM) users and policies.
  • An AWS Lambda function and an Amazon EventBridge scheduler to schedule the AWS Glue column statistics runs.

To launch the AWS CloudFormation stack, complete the following steps:

Note: The AWS Glue data catalog tables are generated using the public bucket s3://blogpost-sparkoneks-us-east-1/blog/BLOG_TPCDS-TEST-3T-partitioned/, hosted in the us-east-1 region. If you intend to deploy this AWS CloudFormation template in a different region, it is necessary to either copy the data to the corresponding region or share the data within your deployed region for it to be accessible from Amazon Redshift.

  1. Log in to the AWS Management Console as AWS Identity and Access Management (AWS IAM) administrator.
  2. Choose Launch Stack to deploy an AWS CloudFormation template.
  3. Choose Next.
  4. On the next page, keep all the options as default or make appropriate changes based on your requirements, then choose Next.
  5. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  6. Choose Create.

This stack can take around 10 minutes to complete, after which you can view the deployed stack on the AWS CloudFormation console.

Run the AWS Glue Crawlers created by the AWS CloudFormation stack

To run your crawlers, complete the following steps:

  1. On the AWS Glue console, choose Crawlers under Data Catalog in the navigation pane.
  2. Locate and run the two crawlers, tpcdsdb-without-stats and tpcdsdb-with-stats. It may take a few minutes to complete.

After the crawlers complete successfully, they create two identical databases, tpcdsdbnostats and tpcdsdbwithstats. The tables in tpcdsdbnostats have no statistics, and we use them as a reference. We generate statistics on tables in tpcdsdbwithstats. Verify on the AWS Glue console that you have both databases and their underlying tables. The tpcdsdbnostats database will look like the following. At this time, no statistics have been generated on these tables.

Run provided query using Amazon Athena on no-stats tables

To run your query in Amazon Athena on tables without statistics, complete the following steps:

  1. Download the Athena queries from here.
  2. On the Amazon Athena console, select the provided queries one at a time for the tables in the tpcdsdbnostats database.
  3. Run each query and note its run time.

Run provided query using Amazon Redshift Spectrum on no-stats tables

To run your query in Amazon Redshift, complete the following steps:

  1. Download the Amazon Redshift queries from here.
  2. In the Redshift query editor v2, locate the Redshift Query for tables without stats section of the downloaded queries.
  3. Run each query and note its execution time.

Generate statistics on AWS Glue Catalog tables

To generate statistics on AWS Glue Catalog tables, complete the following steps:

  1. Navigate to the AWS Glue console and choose Databases under Data Catalog.
  2. Choose the tpcdsdbwithstats database to list all the available tables.
  3. Select any of these tables (for example, call_center).
  4. Go to the Column statistics – new tab and choose Generate statistics.
  5. Keep the default options: under Choose columns, keep Table (All columns); under Row sampling options, keep All rows; and under IAM role, choose AWSGluestats-blog. Then choose Generate statistics.

You’ll be able to see status of the statistics generation run as shown in the following illustration:

After generating statistics on AWS Glue Catalog tables, you should be able to see detailed column statistics for that table:

Repeat steps 2–5 to generate statistics for all necessary tables, such as catalog_sales, catalog_returns, warehouse, item, date_dim, store_sales, customer, customer_address, web_sales, time_dim, ship_mode, web_site, and web_returns. Alternatively, you can follow the “Schedule AWS Glue statistics Runs” section near the end of this post to generate statistics for all tables. After the statistics are generated, assess query performance for each query.

Run provided query using Athena Console on stats tables

  1. On the Amazon Athena console, locate the Athena Query for tables with stats section of the downloaded queries.
  2. Run each query and note its execution time.

In our sample run of the queries on the tables, we observed the query execution times shown in the following table. We saw a clear improvement in query performance, ranging from 13% to 55%.

Athena query time improvement

TPC-DS 3T queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 2           | 33.62                    | 15.17                 | 55%
Query 4           | 132.11                   | 72.94                 | 45%
Query 14          | 134.77                   | 91.48                 | 32%
Query 28          | 55.99                    | 39.36                 | 30%
Query 38          | 29.32                    | 25.58                 | 13%
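
The improvement column is the reduction in run time relative to the run without statistics. The following short sketch reproduces it from the Athena timings reported above; the function name improvement_pct is our own.

```python
def improvement_pct(before_sec: float, after_sec: float) -> float:
    """Percentage reduction in run time relative to the run without statistics."""
    return (before_sec - after_sec) / before_sec * 100


# (without stats, with stats) in seconds, taken from the Athena table above.
athena_runs = {
    "Query 2": (33.62, 15.17),
    "Query 4": (132.11, 72.94),
    "Query 14": (134.77, 91.48),
    "Query 28": (55.99, 39.36),
    "Query 38": (29.32, 25.58),
}

for name, (before, after) in athena_runs.items():
    print(f"{name}: {improvement_pct(before, after):.0f}%")
```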

Run the provided query using Amazon Redshift Spectrum on statistics tables

  1. In the Amazon Redshift query editor v2, locate the Redshift Query for tables with stats section of the downloaded queries.
  2. Run each query and note its execution time.

In our sample run of the queries on the tables, we observed the query execution times shown in the following table. We saw a clear improvement in query performance, ranging from 13% to 89%.

Amazon Redshift Spectrum query time improvement

TPC-DS 3T queries | Without Glue stats (sec) | With Glue stats (sec) | Performance improvement (%)
Query 40 | 124.156 | 13.12 | 89%
Query 60 | 29.52 | 16.97 | 42%
Query 66 | 18.914 | 16.39 | 13%
Query 95 | 308.806 | 200 | 35%
Query 99 | 20.064 | 16 | 20%

Schedule AWS Glue statistics Runs

In this section, we guide you through scheduling AWS Glue column statistics runs using AWS Lambda and the Amazon EventBridge Scheduler. To streamline this process, an AWS Lambda function and an Amazon EventBridge schedule were created as part of the CloudFormation stack deployment.

  1. AWS Lambda function setup:

To begin, we utilize an AWS Lambda function to trigger the execution of the AWS Glue column statistics job. The AWS Lambda function invokes the start_column_statistics_task_run API through the boto3 (AWS SDK for Python) library. This sets the groundwork for automating the column statistics update.

Let’s explore the AWS Lambda function:

    • Go to the AWS Lambda console.
    • Select Functions and locate the GlueTableStatisticsFunctionv1.
    • For a clearer understanding of the AWS Lambda function, we recommend reviewing the code in the Code section and examining the environment variables under Configuration.
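The deployed function's code is not reproduced in this post. As a rough idea of what such a handler might look like, here is a minimal sketch built around the boto3 start_column_statistics_task_run call; the environment variable names (GLUE_DATABASE, GLUE_TABLES, GLUE_STATS_ROLE) and the helper name start_statistics_runs are assumptions, not the names used by the CloudFormation stack.

```python
import os


def start_statistics_runs(glue_client, database, tables, role):
    """Start one column statistics task run per table; return {table: run_id}."""
    run_ids = {}
    for table in tables:
        # Only the required parameters are passed here; optional ones such as
        # ColumnNameList and SampleSize are left at their service defaults.
        response = glue_client.start_column_statistics_task_run(
            DatabaseName=database,
            TableName=table,
            Role=role,
        )
        run_ids[table] = response["ColumnStatisticsTaskRunId"]
    return run_ids


def lambda_handler(event, context):
    import boto3  # imported here so the helper above can be tested without AWS

    return start_statistics_runs(
        boto3.client("glue"),
        database=os.environ["GLUE_DATABASE"],        # hypothetical variable
        tables=os.environ["GLUE_TABLES"].split(","),  # hypothetical, comma-separated
        role=os.environ["GLUE_STATS_ROLE"],          # hypothetical variable
    )
```

Passing the Glue client into the helper keeps the AWS call in one place and makes the loop easy to exercise with a stub client.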
  2. Amazon EventBridge scheduler configuration

The next step involves scheduling the AWS Lambda function invocation using the Amazon EventBridge Scheduler. The scheduler is configured to trigger the AWS Lambda function daily at a specific time – in this case, 08:00 PM. This ensures that the AWS Glue column statistics job runs on a regular and predictable basis.

Now, let’s explore how you can update the schedule:
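One programmatic option is sketched below, assuming boto3 and the EventBridge Scheduler get_schedule and update_schedule APIs. The schedule name is whatever the stack created (not shown here), and the helper names daily_cron and move_daily_schedule are hypothetical. Because update_schedule replaces the whole schedule definition, the sketch reads the existing schedule first and carries its target over; optional fields not listed in the code are not preserved.

```python
def daily_cron(hour, minute=0):
    """Build a daily cron expression; 20:00 (08:00 PM) gives cron(0 20 * * ? *)."""
    if not (0 <= hour <= 23 and 0 <= minute <= 59):
        raise ValueError("hour must be 0-23 and minute must be 0-59")
    return f"cron({minute} {hour} * * ? *)"


def move_daily_schedule(scheduler_client, name, hour, minute=0, group="default"):
    """Change the time of an existing daily schedule, keeping its target."""
    current = scheduler_client.get_schedule(Name=name, GroupName=group)
    request = {
        "Name": name,
        "GroupName": group,
        "ScheduleExpression": daily_cron(hour, minute),
        "FlexibleTimeWindow": current["FlexibleTimeWindow"],
        "Target": current["Target"],
    }
    # Carry over a few optional settings when the existing schedule has them.
    for key in ("ScheduleExpressionTimezone", "State", "Description"):
        if key in current:
            request[key] = current[key]
    scheduler_client.update_schedule(**request)
    return request["ScheduleExpression"]


# Usage (requires AWS credentials and boto3; the name is a placeholder):
#   import boto3
#   move_daily_schedule(boto3.client("scheduler"), "your-schedule-name", hour=6)
```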

Cleaning up

To avoid unwanted charges to your AWS account, delete the AWS resources:

  1. Sign in to the AWS CloudFormation console as the AWS IAM administrator used for creating the AWS CloudFormation stack.
  2. Delete the AWS CloudFormation stack you created.

Conclusion

In this post, we showed you how you can use the AWS Glue Data Catalog to generate column-level statistics for AWS Glue tables. These statistics are now integrated with the cost-based optimizers in Amazon Athena and Amazon Redshift Spectrum, resulting in improved query performance and potential cost savings. Refer to Docs for support for Glue Catalog Statistics across various AWS analytical services.

If you have questions or suggestions, submit them in the comments section.


About the Authors

Sandeep Adwankar is a Senior Technical Product Manager at AWS. Based in the California Bay Area, he works with customers around the globe to translate business and technical requirements into products that enable customers to improve how they manage, secure, and access data.

Navnit Shukla serves as an AWS Specialist Solution Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled Data Wrangling on AWS. He can be reached via LinkedIn.

AWS Weekly Roundup – EC2 DL2q instances, PartyRock, Amplify’s 6th birthday, and more – November 20, 2023

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/aws-weekly-roundup-ec2-dl2q-instances-partyrock-amplifys-6th-birthday-and-more-november-20-2023/

Last week I saw an astonishing 160+ new service launches. There were so many updates that we decided to publish a weekly roundup again. This continues the same innovative pace of the previous week as we are getting closer to AWS re:Invent 2023.

Our News Blog team is also finalizing new blog posts for re:Invent to introduce awesome launches with service teams for your reading pleasure. Jeff Barr shared The Road to AWS re:Invent 2023 to explain our blogging journey and process. Please stay tuned in the next week!

Last week’s launches
Here are some of the launches that caught my attention last week:

Amazon EC2 DL2q instances – New DL2q instances are powered by Qualcomm AI 100 Standard accelerators and are the first to feature Qualcomm’s AI technology in the public cloud. With eight Qualcomm AI 100 Standard accelerators and 128 GiB of total accelerator memory, you can run popular generative artificial intelligence (AI) applications and extend to edge devices across smartphones, autonomous driving, personal compute, and extended reality headsets to develop and validate these AI workloads before deploying.

PartyRock for Amazon Bedrock – We introduced PartyRock, a fun and intuitive hands-on, generative AI app-building playground powered by Amazon Bedrock. You can experiment, learn all about prompt engineering, build mini-apps, and share them with your friends—all without writing any code or creating an AWS account.

You also can now access the Meta Llama 2 Chat 13B foundation model and Cohere Command Light, Embed English, and multilingual models for Amazon Bedrock.

AWS Amplify celebrates its sixth birthday – We announced six new launches: a new documentation site, support for Next.js 14 with our hosting and JavaScript library, custom token providers and an automatic React Native social sign-in update for Amplify Auth, new ChangePassword and DeleteUser account settings components, and updates to all Amplify UI packages to use the new Amplify JavaScript v6. You can also use wildcard subdomains when using a custom domain with your Amplify application deployed to AWS Amplify Hosting.

Also check out other News Blog posts about major launches published in the past week:

Other AWS service launches
Here are some other bundled feature launches per AWS service:

Amazon Athena – You can use a new cost-based optimizer (CBO) to enhance query performance based on table and column statistics collected by the AWS Glue Data Catalog. You can also use the Athena JDBC 3.x driver, a new alternative that supports almost all authentication plugins, and Amazon EMR Studio to develop and run interactive queries on Amazon Athena.

Amazon CloudWatch – You can use a new CloudWatch metric called EBS Stalled I/O Check to monitor the health of your Amazon EBS volumes, the regular expression for Amazon CloudWatch Logs Live Tail filter pattern syntax to search and match relevant log events, observability of SAP Sybase ASE database in CloudWatch Application Insights, and up to two stats commands in a Log Insights query to perform aggregations on the results.

Amazon CodeCatalyst – You can connect to an Amazon Virtual Private Cloud (Amazon VPC) from CodeCatalyst Workflows, provision infrastructure using Terraform within CodeCatalyst Workflows, access CodeCatalyst with your workforce identities configured in IAM Identity Center, and create teams made up of members of the CodeCatalyst space.

Amazon Connect – You can use a pre-built queue performance dashboard and Contact Lens conversational analytics dashboard to view and compare real-time and historical aggregated queue performance. You can use quick responses for chats, previously written formats such as typing in ‘/#greet’ to insert a personalized response, and scanning attachments to detect malware or other unwanted content.

AWS Glue – AWS Glue for Apache Spark added six new database connectors: Teradata, SAP HANA, Azure SQL, Azure Cosmos DB, Vertica, and MongoDB, as well as native connectivity to Amazon OpenSearch Service.

AWS Lambda – You can see a single pane view of metrics, logs, and traces in the AWS Lambda console and use advanced logging controls to natively capture logs in JSON structured format. You can view the SAM template on the Lambda console and export the function’s configuration to AWS Application Composer. AWS Lambda also supports Java 21 and NodeJS 20 versions built on the new Amazon Linux 2023 runtime.

AWS Local Zones in Dallas – You can enable the new Local Zone in Dallas, Texas, us-east-1-dfw-2a, with Amazon EC2 C6i, M6i, R6i, C6gn, and M6g instances and Amazon EBS volume types gp2, gp3, io1, sc1, and st1. You can also access Amazon ECS, Amazon EKS, Application Load Balancer, and AWS Direct Connect in this new Local Zone to support a broad set of workloads at the edge.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) – You can standardize access control to Kafka resources using AWS Identity and Access Management (IAM) and build Kafka clients for Amazon MSK Serverless written in all programming languages. These are open source client helper libraries and code samples for popular languages, including Java, Python, Go, and JavaScript. Also, Amazon MSK now supports an enhanced version of Apache Kafka 3.6.0 that offers generally available Tiered Storage and automatically sends you storage capacity alerts when you are at risk of exhausting your storage.

Amazon OpenSearch Service Ingestion – You can migrate your data from Elasticsearch version 7.x clusters to the latest versions of Amazon OpenSearch Service and use persistent buffering to protect the durability of incoming data.

Amazon RDS – Amazon RDS for MySQL now supports creating active-active clusters using the Group Replication plugin, upgrading MySQL 5.7 snapshots to MySQL 8.0, and the Innovation Release version of MySQL 8.1.

Amazon RDS Custom for SQL Server extends point-in-time recovery support for up to 1,000 databases, supports Service Master Key Retention to use transparent data encryption (TDE), table- and column-level encryption, DBMail, and linked servers, and lets you use SQL Server Developer edition with bring your own media (BYOM).

Additionally, Amazon RDS Multi-AZ deployments with two readable standbys now support minor version upgrades and system maintenance updates with typically less than one second of downtime when using Amazon RDS Proxy.

AWS Partner Central – You can use an improved user experience in AWS Partner Central to build and promote your offerings and the new Investments tab in the Partner Analytics Dashboard to gain actionable insights. You can now link accounts and associated users between Partner Central and AWS Marketplace and use an enhanced co-sell experience with APN Customer Engagements (ACE) manager.

Amazon QuickSight – You can programmatically manage user access and custom permissions support for roles to restrict QuickSight functionality to the QuickSight account for IAM Identity Center and Active Directory using APIs. You can also use shared restricted folders, a Contributor role and support for data source asset types in folders and the Custom Week Start feature, an addition designed to enhance the data analysis experience for customers across diverse industries and social contexts.

AWS Trusted Advisor – You can use new APIs to programmatically access Trusted Advisor best practices checks, recommendations, and prioritized recommendations and 37 new Amazon RDS checks that provide best practices guidance by analyzing DB instance configuration, usage, and performance data.

There’s a lot more launch news that I haven’t covered. See AWS What’s New for more details.

See you virtually in AWS re:Invent
Next week we’ll hear the latest from AWS, learn from experts, and connect with the global cloud community in Las Vegas. If you come, check out the agenda, session catalog, and attendee guides before your departure.

If you’re not able to attend re:Invent in person this year, we’re offering the option to livestream our Keynotes and Innovation Talks. With the registration for online pass, you will have access to on-demand keynote, Innovation Talks, and selected breakout sessions after the event.

Channy

Speed up queries with the cost-based optimizer in Amazon Athena

Post Syndicated from Darshit Thakkar original https://aws.amazon.com/blogs/big-data/speed-up-queries-with-cost-based-optimizer-in-amazon-athena/

Amazon Athena is a serverless, interactive analytics service built on open source frameworks, supporting open table file formats. Athena provides a simplified, flexible way to analyze petabytes of data where it lives. You can analyze data or build applications from an Amazon Simple Storage Service (Amazon S3) data lake and 30 data sources, including on-premises data sources or other cloud systems using SQL or Python. Athena is built on open source Trino and Presto engines and Apache Spark frameworks, with no provisioning or configuration effort required.

Starting today, the Athena SQL engine uses a cost-based optimizer (CBO), a new feature that uses table and column statistics stored in the AWS Glue Data Catalog as part of the table’s metadata. By using these statistics, CBO improves query run plans and boosts the performance of queries run in Athena. Some of the specific optimizations CBO can employ include join reordering and pushing aggregations down based on the statistics available for each table and column.

TPC-DS benchmarks demonstrate the power of the cost-based optimizer: queries run up to 2x faster with CBO enabled compared to running the same TPC-DS queries without CBO.

Performance and cost comparison on TPC-DS benchmarks

We used the industry-standard TPC-DS 3 TB to represent different customer use cases. These are representative of workloads with 10 times the stated benchmark size. This means a 3 TB benchmark dataset accurately represents customer workloads on 30–50 TB datasets.

In our testing, the dataset was stored in Amazon S3 in non-compressed Parquet format and the AWS Glue Data Catalog was used to store metadata for databases and tables. Fact tables were partitioned on the date column used for join operations, and each fact table consisted of 2,000 partitions. To help illustrate the performance of CBO, we compare the behavior of various queries and highlight the performance differences between running with CBO enabled vs. disabled.

The following graph illustrates the runtime of queries on the engine with and without CBO.

The following graph presents the top 10 queries from the TPC-DS benchmark with the greatest performance improvement.

Let’s discuss some of the cost-based optimization techniques that contributed to improved query performance.

Cost-based join reordering

Join reordering, an optimization technique used by cost-based SQL optimizers, analyzes different join sequences to select the order that minimizes query runtime by reducing intermediate data processed at each step, lowering memory and CPU requirements.

Let’s talk about query 82 of the TPC-DS 3TB dataset. The query performs inner joins on four tables: item, inventory, date_dim, and store_sales. The store_sales table has 8.6 billion rows and is partitioned by date. The inventory table has 1 billion rows and is also partitioned by date. The item table contains 360,000 rows, and the date_dim table holds 73,000 rows.

Query 82

select  i_item_id ,i_item_desc ,i_current_price
from item, inventory, date_dim, store_sales
where i_current_price between 30 and 30+30
and inv_item_sk = i_item_sk
and d_date_sk=inv_date_sk
and cast(d_date as date) between cast('2002-05-30' as date) and (cast('2002-05-30' as date) +  interval '60' day)
and i_manufact_id in (437,129,727,663)
and inv_quantity_on_hand between 100 and 500
and ss_item_sk = i_item_sk
group by i_item_id,i_item_desc,i_current_price
order by i_item_id
limit 100

Without CBO

Without using CBO, the engine will determine the join order based on the sequence of tables defined in the input query with internal heuristics. The FROM clause of the input query is "from item, inventory, date_dim, store_sales" (all inner joins). After passing through internal heuristics, Athena chose the join order as ((item ⋈ (inventory ⋈ date_dim)) ⋈ store_sales). Despite store_sales being the largest fact table, it’s defined last in the FROM clause and therefore gets joined last. This plan fails to reduce the intermediate join sizes as early as possible, resulting in an increased query runtime. The following diagram shows the join order without CBO and the number of rows flowing through different stages.

With CBO

When using CBO, the optimizer determines the best join order using a variety of data, including statistics as well as join size estimation, join build side, and join type. In this instance, Athena’s selected join order is ((store_sales ⋈ item) ⋈ (inventory ⋈ date_dim)). The largest fact table, store_sales, without being shuffled, is first joined with the item dimension table. The other partitioned table, inventory, is also first joined in-place with the date_dim dimension table. The join with the dimension table acts as a filter on the fact table, which dramatically reduces the input data size of the join that follows. Note that the side on which a table sits in a join is significant in Athena, because it’s the table on the right that will be built into memory for the join operation. Therefore, we always want to keep the larger table on the left and the smaller table on the right. With the plan that CBO chose, the left side shrank from 8.6 billion rows to 13.6 million rows.

With CBO, the query runtime improved by 25% (from 15 seconds down to 11 seconds) by choosing the optimal join order.
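To make the idea of cost-based join ordering concrete, here is a toy sketch. It is not Athena's algorithm: it only enumerates left-deep join orders (the plan described above is bushy), and it uses a simplified cost model in which the cost of a plan is the sum of its estimated intermediate result sizes. The table names, row counts, and selectivities are made up for illustration.

```python
from itertools import permutations


def plan_cost(order, rows, selectivity):
    """Sum of estimated intermediate result sizes for a left-deep join order."""
    joined = [order[0]]
    size = rows[order[0]]
    cost = 0.0
    for table in order[1:]:
        size *= rows[table]
        for other in joined:
            # Pairs without a join predicate default to 1.0 (a cross join).
            size *= selectivity.get(frozenset((table, other)), 1.0)
        joined.append(table)
        cost += size
    return cost


def best_join_order(rows, selectivity):
    """Pick the cheapest left-deep order by exhaustive search."""
    return min(
        permutations(sorted(rows)),
        key=lambda order: plan_cost(order, rows, selectivity),
    )


# Hypothetical statistics: row counts per table and selectivity per join predicate.
rows = {"a": 10, "b": 1000, "c": 100}
selectivity = {
    frozenset(("a", "b")): 0.001,
    frozenset(("b", "c")): 0.01,
}
print(best_join_order(rows, selectivity))
```

In this toy input, orders that start by pairing a with c form a cross join and produce a large intermediate result, so the search prefers joining a and b first. Exhaustive search is only practical for a handful of tables; real optimizers prune the search space.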

Next, let’s discuss another CBO technique.

Cost-based aggregation pushdown

Aggregation pushdown is an optimization technique used by query optimizers to improve performance. It involves pushing aggregation operations like SUM, COUNT, and AVG into an earlier stage in the query plan, while maintaining the same query semantics. This reduces the amount of data transferred between the stages. By minimizing data processing, aggregation pushdown decreases memory usage, I/O costs, and network traffic.

However, pushing down aggregation is not always beneficial. It depends on the data distribution. For example, grouping on a column with many rows but few distinct values (like gender) works better before joins. Grouping first means aggregating a large number of records into fewer records (just male and female, for example). Grouping after joining means a large number of records have to participate in the join before being aggregated. On the other hand, grouping on a high-cardinality column is better done after joins. Doing it before risks unnecessary aggregation overhead, because each value is likely unique anyway and that step will not result in an earlier reduction in the amount of data transferred between intermediate stages.

Therefore, whether to push down aggregation should be a cost-based decision. Let’s take the example of query 2 run on the 3 TB TPC-DS dataset, which shows how the value of aggregation pushdown depends on data distribution. The web_sales table has 2.1 billion rows and the catalog_sales table has 4.23 billion rows. Both tables are partitioned on the date column.

Query 2

with wscs as
 (select sold_date_sk
        ,sales_price
  from (select ws_sold_date_sk sold_date_sk
              ,ws_ext_sales_price sales_price
        from web_sales 
        union all
        select cs_sold_date_sk sold_date_sk
              ,cs_ext_sales_price sales_price
        from catalog_sales)),
 wswscs as 
 (select d_week_seq,
        sum(case when (d_day_name='Sunday') then sales_price else null end) sun_sales,
        sum(case when (d_day_name='Monday') then sales_price else null end) mon_sales,
        sum(case when (d_day_name='Tuesday') then sales_price else  null end) tue_sales,
        sum(case when (d_day_name='Wednesday') then sales_price else null end) wed_sales,
        sum(case when (d_day_name='Thursday') then sales_price else null end) thu_sales,
        sum(case when (d_day_name='Friday') then sales_price else null end) fri_sales,
        sum(case when (d_day_name='Saturday') then sales_price else null end) sat_sales
 from wscs
     ,date_dim
 where d_date_sk = sold_date_sk
 group by d_week_seq)
 select d_week_seq1
       ,round(sun_sales1/sun_sales2,2)
       ,round(mon_sales1/mon_sales2,2)
       ,round(tue_sales1/tue_sales2,2)
       ,round(wed_sales1/wed_sales2,2)
       ,round(thu_sales1/thu_sales2,2)
       ,round(fri_sales1/fri_sales2,2)
       ,round(sat_sales1/sat_sales2,2)
 from
 (select wswscs.d_week_seq d_week_seq1
        ,sun_sales sun_sales1
        ,mon_sales mon_sales1
        ,tue_sales tue_sales1
        ,wed_sales wed_sales1
        ,thu_sales thu_sales1
        ,fri_sales fri_sales1
        ,sat_sales sat_sales1
  from wswscs,date_dim 
  where date_dim.d_week_seq = wswscs.d_week_seq and
        d_year = 2001) y,
 (select wswscs.d_week_seq d_week_seq2
        ,sun_sales sun_sales2
        ,mon_sales mon_sales2
        ,tue_sales tue_sales2
        ,wed_sales wed_sales2
        ,thu_sales thu_sales2
        ,fri_sales fri_sales2
        ,sat_sales sat_sales2
  from wswscs
      ,date_dim 
  where date_dim.d_week_seq = wswscs.d_week_seq and
        d_year = 2001+1) z
 where d_week_seq1=d_week_seq2-53
 order by d_week_seq1

Without CBO

Athena first joins the result of the union all operation on the web_sales table and the catalog_sales table with another table. Only then does it perform aggregation on the joined results. In this example, the amount of data that needed to be joined was huge, resulting in a longer query runtime.

With CBO

Athena uses one of the statistics values, the distinct value count, to evaluate the cost implications of pushing down the aggregation vs. not doing so. When a column has many rows but few distinct values, CBO is more likely to push aggregation down. This shrank the qualified rows from the web_sales and catalog_sales tables to 2,590 and 3,590 rows, respectively. These aggregated records were then unioned and used in the join with the other tables. Compared to the plan without CBO, the records participating in the join from the two large tables dropped from 6.33 billion rows (2.1 billion + 4.23 billion) to just 6,180 rows (2,590 + 3,590). This significantly decreased query runtime.

With CBO, the query runtime improved by 50% (from 37 seconds down to 18 seconds). In summary, CBO helped Athena choose an optimal aggregation pushdown plan, cutting the query time in half compared to not using cost-based optimization.
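The effect of aggregation pushdown can be illustrated with a small, self-contained sketch. It is a toy model rather than Athena's implementation: sales rows are (date key, price) pairs, the dimension maps a date key to a week number, and both functions return the per-week totals together with the number of rows that took part in the join. All names and values are made up.

```python
from collections import defaultdict


def aggregate_after_join(sales, date_to_week):
    """Join every sales row with the date dimension, then sum per week."""
    totals = defaultdict(int)
    joined_rows = 0
    for date_sk, price in sales:
        if date_sk in date_to_week:  # inner join on the date key
            joined_rows += 1
            totals[date_to_week[date_sk]] += price
    return dict(totals), joined_rows


def aggregate_before_join(sales, date_to_week):
    """Pre-aggregate sales per date key, then join the subtotals and sum per week."""
    per_date = defaultdict(int)
    for date_sk, price in sales:  # partial aggregation pushed below the join
        per_date[date_sk] += price
    totals = defaultdict(int)
    joined_rows = 0
    for date_sk, subtotal in per_date.items():
        if date_sk in date_to_week:
            joined_rows += 1
            totals[date_to_week[date_sk]] += subtotal
    return dict(totals), joined_rows


# Hypothetical data: five sales rows spread over three dates in two weeks.
sales = [(1, 10), (1, 20), (2, 5), (8, 7), (8, 3)]
date_to_week = {1: 100, 2: 100, 8: 101}

print(aggregate_after_join(sales, date_to_week))
print(aggregate_before_join(sales, date_to_week))
```

Both functions produce the same weekly totals, but the pushed-down version joins one row per distinct date key instead of one row per sale. The saving grows with the ratio of rows to distinct values, which is why the distinct value count matters for this decision.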

Conclusion

In this post, we discussed how Athena uses a cost-based optimizer (CBO) in its engine v3 to use table statistics for generating more efficient query run plans. Testing on the TPC-DS benchmark showed an 11% improvement in overall query performance when using CBO compared to without it.

Two key optimizations employed by CBO are join reordering and aggregate pushdown. Join reordering reduces intermediate data by intelligently picking the order to join tables based on statistics. Aggregate pushdown decreases intermediate data by pushing aggregations earlier in the plan when beneficial.

In summary, Athena’s new cost-based optimizer significantly speeds up queries by choosing superior run plans. CBO optimizes based on table statistics stored in the AWS Glue Data Catalog. This automatic optimization improves productivity for Athena users through more responsive query performance. To take advantage of the optimization techniques of CBO, refer to working with column statistics to generate statistics on the tables and columns in the AWS Glue Data Catalog.


About the Authors

Darshit Thakkar is a Technical Product Manager with AWS and works with the Amazon Athena team based out of Boston, Massachusetts.

Wei Zheng is a Sr. Software Development Engineer with Amazon Athena. He joined AWS in 2021 and has been working on multiple performance improvements on Athena.

Chuho Chang is a Software Development Engineer with Amazon Athena. He has been working on query optimizers for over a decade.

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Visualize Amazon DynamoDB insights in Amazon QuickSight using the Amazon Athena DynamoDB connector and AWS Glue

Post Syndicated from Antonio Samaniego Jurado original https://aws.amazon.com/blogs/big-data/visualize-amazon-dynamodb-insights-in-amazon-quicksight-using-the-amazon-athena-dynamodb-connector-and-aws-glue/

Amazon DynamoDB is a fully managed, serverless, key-value NoSQL database designed to run high-performance applications at any scale. DynamoDB offers built-in security, continuous backups, automated multi-Region replication, in-memory caching, and data import and export tools. The scalability and flexible data schema of DynamoDB make it well-suited for a variety of use cases. These include internet-scale web and mobile applications, low-latency metadata stores, high-traffic retail websites, Internet of Things (IoT) and time series data, online gaming, and more.

Data stored in DynamoDB is the basis for valuable business intelligence (BI) insights. To make this data accessible to data analysts and other consumers, you can use Amazon Athena. Athena is a serverless, interactive service that allows you to query data from a variety of sources in heterogeneous formats, with no provisioning effort. Athena accesses data stored in DynamoDB via the open source Amazon Athena DynamoDB connector. Table metadata, such as column names and data types, is stored using the AWS Glue Data Catalog.

Finally, to visualize BI insights, you can use Amazon QuickSight, a cloud-powered business analytics service. QuickSight makes it straightforward for organizations to build visualizations, perform ad hoc analysis, and quickly get business insights from their data, anytime, on any device. Its generative BI capabilities enable you to ask questions about your data using natural language, without having to write SQL queries or learn a BI tool.

This post shows how you can use the Athena DynamoDB connector to easily query data in DynamoDB with SQL and visualize insights in QuickSight.

Solution overview

The following diagram illustrates the solution architecture.

  1. The Athena DynamoDB connector runs in a pre-built, serverless AWS Lambda function. You don’t need to write any code.
  2. AWS Glue provides supplemental metadata from the DynamoDB table. In particular, an AWS Glue crawler is run to infer and store the DynamoDB table format, schema, and associated properties in the Glue Data Catalog.
  3. The Athena editor is used to test the connector and perform analysis via SQL queries.
  4. QuickSight uses the Athena connector to visualize BI insights from DynamoDB.

This walkthrough uses data from the ProductCatalog table, part of the DynamoDB developer guide sample data files.

Prerequisites

Before you get started, you should meet the following prerequisites:

Set up the Athena DynamoDB connector

The Athena DynamoDB connector comprises a pre-built, serverless Lambda function provided by AWS that communicates with DynamoDB so you can query your tables with SQL using Athena. The connector is available in the AWS Serverless Application Repository, and is used to create the Athena data source for later use in data analysis and visualization. To set up the connector, complete the following steps:

  1. On the Athena console, choose Data sources in the navigation pane.
  2. Choose Create data source.
  3. In the search bar, search for and choose Amazon DynamoDB.
  4. Choose Next.
  5. Under Data source details, enter a name. Note that this name should be unique and will be referenced in your SQL statements when you query your Athena data source.
  6. Under Connection details, choose Create Lambda function.

This will take you to the Lambda applications page on the Lambda console. Do not close the Athena data source creation tab; you will return to it in a later step.

  1. Scroll down to Application settings and enter a value for the following parameters (leave the other parameters as default):
    • SpillBucket – Specifies the Amazon Simple Storage Service (Amazon S3) bucket name for storing data that exceeds Lambda function response size limits. To create an S3 bucket, refer to Creating a bucket.
    • AthenaCatalogName – A lowercase name for the Lambda function to be created.
  2. Select the acknowledgement check box and choose Deploy.

Wait for deployment to complete before moving to the next step.

  1. Return to the Athena data source creation tab.
  2. Under Connection details, choose the refresh icon and choose the Lambda function you created.
  3. Choose Next.
  4. Review and choose Create data source.

Provide supplemental metadata via AWS Glue

The Athena connector already comes with a built-in inference capability to discover the schema and table properties of your data source. However, this capability is limited. To accurately discover the metadata of your DynamoDB table and centralize schema management as your data evolves over time, the connector integrates with AWS Glue.

To achieve this, an AWS Glue crawler is run to automatically determine the format, schema, and associated properties of the raw data stored in your DynamoDB table, writing the resulting metadata to a Glue database. Glue databases contain tables, which hold metadata from different data stores, independent from the actual location of the data. The Athena connector then references the Glue table and retrieves the corresponding DynamoDB metadata to enable queries.

Create the AWS Glue database

Complete the following steps to create the Glue database:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Databases.
  2. Choose Add database (you can also edit an existing database if you already have one).
  3. For Name, enter a database name.
  4. For Location, enter the string literal dynamo-db-flag. This keyword indicates that the database contains tables that the connector can use for supplemental metadata.
  5. Choose Create database.

Following security best practices, it is also recommended that you enable encryption at rest for your Data Catalog. For details, refer to Encrypting your Data Catalog.
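The same database can also be created with the AWS SDK. The following is a minimal sketch assuming boto3 and the Glue create_database API; the helper name and the database name in the usage comment are hypothetical. The only detail that matters for the connector is the dynamo-db-flag location string described in the steps above.

```python
def create_connector_database(glue_client, name):
    """Create a Glue database flagged for use by the Athena DynamoDB connector."""
    database_input = {
        "Name": name,
        # The location keyword the connector looks for (see the console steps).
        "LocationUri": "dynamo-db-flag",
    }
    glue_client.create_database(DatabaseInput=database_input)
    return database_input


# Usage (requires AWS credentials and boto3; the name is a placeholder):
#   import boto3
#   create_connector_database(boto3.client("glue"), "ddb_metadata_db")
```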

Create the AWS Glue crawler

Complete the following steps to create and run the Glue crawler:

  1. On the AWS Glue console, under Data Catalog in the navigation pane, choose Crawlers.
  2. Choose Create crawler.
  3. Enter a crawler name and choose Next.
  4. For Data sources, choose Add a data source.
  5. On the Data source drop-down menu, choose DynamoDB. For Table name, enter the name of your DynamoDB table (string literal).
  6. Choose Add a DynamoDB data source.
  7. Choose Next.
  8. For IAM Role, choose Create new IAM role.
  9. Enter a role name and choose Create. This will automatically create an IAM role that trusts AWS Glue and has permissions to access the crawler targets.
  10. Choose Next.
  11. For Target database, choose the database previously created.
  12. Choose Next.
  13. Review and choose Create crawler.
  14. On the newly created crawler page, choose Run crawler.

Crawler runtimes depend on your DynamoDB table size and properties. You can find crawler run details under Crawler runs.
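For repeatable setups, the crawler can be created and started through the SDK instead of the console. This is a minimal sketch assuming boto3 and the Glue create_crawler and start_crawler APIs. Unlike the console flow above, it does not create an IAM role, so role_arn must refer to an existing role that AWS Glue can assume; all names in the usage comment are placeholders.

```python
def create_and_run_crawler(glue_client, crawler_name, role_arn, database, dynamodb_table):
    """Create a crawler that targets one DynamoDB table, then start it."""
    glue_client.create_crawler(
        Name=crawler_name,
        Role=role_arn,
        DatabaseName=database,
        # For DynamoDB targets, Path is the name of the table to crawl.
        Targets={"DynamoDBTargets": [{"Path": dynamodb_table}]},
    )
    glue_client.start_crawler(Name=crawler_name)


# Usage (requires AWS credentials and boto3; all values are placeholders):
#   import boto3
#   create_and_run_crawler(boto3.client("glue"), "ddb-crawler",
#                          "arn:aws:iam::111122223333:role/your-glue-role",
#                          "ddb_metadata_db", "ProductCatalog")
```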

Validate the output metadata

When your crawler run status shows as Completed, complete the following steps to validate the output metadata:

  1. On the AWS Glue console, choose Tables in the navigation pane. Here, you can confirm a new table has been added to the database as a result of the crawler run.
  2. Navigate to the newly created table and take a look at the Schema tab. This tab shows the column names, data types, and other parameters inferred from your DynamoDB table.
  3. If needed, edit the schema by choosing Edit schema.
  4. Choose Advanced properties.
  5. Under Table properties, verify the crawler automatically created and set the classification key to dynamodb. This indicates to the Athena connector that the table can be used for supplemental metadata.
  6. Optionally, add the following properties to correctly catalog and reference DynamoDB data in AWS Glue and Athena queries. This is due to capital letters not being permitted in AWS Glue table and column names, but being permitted in DynamoDB table and attribute names.
    1. If your DynamoDB table name contains any capital letters, choose Actions and Edit Table and add an extra table property as follows:
      • Key: sourceTable
      • Value: YourDynamoDBTableName
    2. If your DynamoDB table has attributes that contain any capital letters, add an extra table property as follows:
      • Key: columnMapping
      • Value: yourcolumn1=YourColumn1, yourcolumn2=YourColumn2, …
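
As an illustration of the two optional properties above, the following sketch derives the sourceTable and columnMapping values from a DynamoDB table name and its attribute names. The attribute names and the helper itself are hypothetical, and the separator mirrors the format shown in this post; check the connector documentation for the exact format it expects.

```python
def build_table_properties(table_name, attribute_names, separator=", "):
    """Derive the extra AWS Glue table properties needed for mixed-case names."""
    properties = {}
    # sourceTable is only needed when the DynamoDB table name has capital letters.
    if table_name != table_name.lower():
        properties["sourceTable"] = table_name
    # Map each lowercase Glue column name to its original DynamoDB attribute name.
    mappings = [
        f"{name.lower()}={name}"
        for name in attribute_names
        if name != name.lower()
    ]
    if mappings:
        properties["columnMapping"] = separator.join(mappings)
    return properties


# Illustrative attribute names; substitute the attributes of your own table.
properties = build_table_properties("ProductCatalog", ["Id", "Title", "price"])
for key, value in properties.items():
    print(f"Key: {key}")
    print(f"Value: {value}")
```

Attributes that are already lowercase, such as price in this example, need no mapping and are left out.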

Test the connector with the Athena SQL editor

After the Athena DynamoDB connector is deployed and the AWS Glue table is populated with supplemental metadata, the DynamoDB table is ready for analysis. The example in this post uses the Athena editor to make SQL queries to the ProductCatalog table. For further options to interact with Athena, see Accessing Athena.

Complete the following steps to test the connector:

  1. Open the Athena query editor.
  2. If this is your first time visiting the Athena console in your current AWS Region, complete the following steps. This is a prerequisite before you can run Athena queries. See Getting Started for more details.
    1. Choose Query editor in the navigation pane to open the editor.
    2. Navigate to Settings and choose Manage to set up a query result location in Amazon S3.
  3. Under Data, select the data source and database you created (you may need to choose the refresh icon for them to sync up with Athena).
  4. Tables belonging to the selected database appear under Tables. You can choose a table name for Athena to show the table column list and data types.
  5. Test the connector by pulling data from your table via a SELECT statement. When you run Athena queries, you can reference Athena data sources, databases, and tables as <datasource_name>.<database>.<table_name>. Retrieved records are shown under Results.

For increased security, refer to Encrypting Athena query results stored in Amazon S3 to encrypt query results at rest.

Athena Query Results

For this post, we run a SELECT statement to validate the process. You can refer to the SQL reference for Athena to build more complex queries and analyses.
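
The following sketch shows one way to assemble such a validation query using the <datasource_name>.<database>.<table_name> form. The data source, database, and table names are hypothetical placeholders, and the helper functions are ours, not part of Athena.

```python
def qualified_table(datasource, database, table):
    """Return a double-quoted datasource.database.table reference."""
    parts = (datasource, database, table)
    for part in parts:
        # Reject embedded quotes rather than trying to escape them.
        if '"' in part:
            raise ValueError(f"unexpected double quote in identifier: {part!r}")
    return ".".join(f'"{part}"' for part in parts)


def build_select(datasource, database, table, limit=10):
    """Build a simple SELECT statement for validating the connector."""
    return f"SELECT * FROM {qualified_table(datasource, database, table)} LIMIT {int(limit)}"


query = build_select(
    datasource="ddb_datasource",      # hypothetical Athena data source name
    database="example_ddb_database",  # hypothetical AWS Glue database name
    table="productcatalog",           # Glue table names are lowercase
)
print(query)
```

You can paste the printed statement into the Athena query editor, or pass it to whichever Athena client you use.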

Visualize in QuickSight

QuickSight allows for building modern interactive dashboards, paginated reports, embedded analytics, and natural language queries through a unified BI solution. In this step, we use QuickSight to generate visual insights from the DynamoDB table by connecting to the Athena data source previously created.

Allow QuickSight to access resources

Complete the following steps to grant QuickSight access to resources:

  1. On the QuickSight console, choose the profile icon and choose Manage QuickSight.
  2. In the navigation pane, choose Security & Permissions.
  3. Under QuickSight access to AWS services, choose Manage.
  4. QuickSight may ask you to switch to the Region in which users and groups in your account are managed. To change the current Region, navigate to the profile icon on the QuickSight console and choose the Region you want to switch to.
  5. For IAM Role, choose Use QuickSight-managed role (default).

Subsequent instructions assume that the default QuickSight-managed role is being used. If this is not the case, make sure to update the existing role to the same effect.

  1. Under Allow access and autodiscovery for these resources, select IAM and Amazon S3.
  2. For Amazon S3, choose Select S3 buckets.
  3. Choose the spill bucket you specified earlier when deploying the Lambda function for the connector and the bucket you specified as the Athena query result location in Amazon S3.
  4. For both buckets, select Write permission for Athena Workgroup.
  5. Choose Amazon Athena.
  6. In the pop-up window, choose Next.
  7. Choose Lambda and choose the Amazon Resource Name (ARN) of the Lambda function previously used for the Athena data source connector.
  8. Choose Finish.
  9. Choose Save.

Create the Athena dataset

To create the Athena dataset, complete the following steps:

  1. On the QuickSight console, choose the user profile and switch to the Region you deployed the Athena data source to.
  2. Return to the QuickSight home page.
  3. In the navigation pane, choose Datasets.
  4. Choose New dataset.
  5. For Create a Dataset, select Athena.
  6. For Data source name, enter a name and choose Validate connection.
  7. When the connection shows as Validated, choose Create data source.
  8. Under Catalog, Database, and Tables, select the Athena data source, AWS Glue database, and AWS Glue table previously created.
  9. Choose Select.
  10. On the Finish dataset creation page, select Import to SPICE for quicker analytics.
  11. Choose Visualize.

For additional information on QuickSight query modes, see Importing data into SPICE and Using SQL to customize data.

Build QuickSight visualizations

After the DynamoDB data is available in QuickSight via the Athena DynamoDB connector, it is ready to be visualized. The QuickSight analysis in the following example shows a vertical stacked bar chart with the average price per product category for the ProductCatalog sample dataset. It also shows a donut chart with the proportion of products by product category, and a tree map containing the count of bicycles per bicycle type.

If you use data imported to SPICE in a QuickSight analysis, the dataset will only be available after the import is complete. For further details, see Using SPICE data in an analysis.

Quicksight Analysis

For comprehensive information on how to create and share visualizations in QuickSight, refer to Visualizing data in Amazon QuickSight and Sharing and subscribing to data in Amazon QuickSight.

Clean up

To avoid incurring continued AWS usage charges, make sure you delete all resources created as part of this walkthrough.

  • Delete the Athena data source:
    1. On the Athena console, switch to the Region you deployed your resources in.
    2. Choose Data sources in the navigation pane.
    3. Select the data source you created and on the Actions menu, choose Delete.
  • Delete the Lambda application:
    1. On the AWS CloudFormation console, switch to the Region you deployed your resources in.
    2. Choose Stacks in the navigation pane.
    3. Select serverlessrepo-AthenaDynamoDBConnector and choose Delete.
  • Delete the AWS Glue resources:
    1. On the AWS Glue console, switch to the Region you deployed your resources in.
    2. Choose Databases in the navigation pane.
    3. Select the database you created and choose Delete.
    4. Choose Crawlers in the navigation pane.
    5. Select the crawler you created and on the Action menu, choose Delete crawler.
  • Delete the QuickSight resources:
    1. On the QuickSight console, switch to the Region you deployed your resources in.
    2. Delete the analysis created for this walkthrough.
    3. Delete the Athena dataset created for this walkthrough.
    4. If you no longer need the Athena data source to create other datasets, delete the data source.

Summary

This post demonstrated how you can use the Athena DynamoDB connector to query data in DynamoDB with SQL and build visualizations in QuickSight.

Learn more about the Athena DynamoDB connector in the Amazon Athena User Guide. Discover more available data source connectors to query and visualize a variety of data sources without setting up or managing any infrastructure while only paying for the queries you run.

For advanced QuickSight capabilities powered by AI, see Gaining insights with machine learning (ML) in Amazon QuickSight and Answering business questions with Amazon QuickSight Q.


About the Authors

Antonio Samaniego Jurado is a Solutions Architect at Amazon Web Services. With a strong passion for modern technology, Antonio helps customers build state-of-the-art applications on AWS. A creator at heart, he loves community-driven learning and sharing of best practices across the AWS service portfolio to make the best of customers’ cloud journey.

Pascal Vogel is a Solutions Architect at Amazon Web Services. Pascal helps startups and enterprises build cloud-native solutions. As a cloud enthusiast, Pascal loves learning new technologies and connecting with like-minded customers who want to make a difference in their cloud journey.

BMW Cloud Efficiency Analytics powered by Amazon QuickSight and Amazon Athena

Post Syndicated from Phillip Karg original https://aws.amazon.com/blogs/big-data/bmw-cloud-efficiency-analytics-powered-by-amazon-quicksight-and-amazon-athena/

This post is written in collaboration with Philipp Karg and Alex Gutfreund from BMW Group.

Bayerische Motoren Werke AG (BMW) is a motor vehicle manufacturer headquartered in Germany with 149,475 employees worldwide. In the financial year 2022, its profit before tax was € 23.5 billion on revenues of € 142.6 billion. BMW Group is one of the world’s leading premium manufacturers of automobiles and motorcycles, also providing premium financial and mobility services.

BMW Group uses 4,500 AWS Cloud accounts across the entire organization but is faced with the challenge of reducing unnecessary costs, optimizing spend, and having a central place to monitor costs. BMW Cloud Efficiency Analytics (CLEA) is a homegrown tool developed within the BMW FinOps CoE (Center of Excellence) aiming to optimize and reduce costs across all these accounts.

In this post, we explore how the BMW Group FinOps CoE implemented their Cloud Efficiency Analytics tool (CLEA), powered by Amazon QuickSight and Amazon Athena. With this tool, they effectively reduced costs and optimized spend across all their AWS Cloud accounts, utilizing a centralized cost monitoring system and using key AWS services. The CLEA dashboards were built on the foundation of the Well-Architected Lab. For more information on this foundation, refer to A Detailed Overview of the Cost Intelligence Dashboard.

CLEA gives full transparency into cloud costs, usage, and efficiency from a high-level overview to granular service, resource, and operational levels. It seamlessly consolidates data from various data sources within AWS, including AWS Cost Explorer (and forecasting with Cost Explorer), AWS Trusted Advisor, and AWS Compute Optimizer. Additionally, it incorporates BMW Group’s internal system to integrate essential metadata, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications.

The ultimate goal is to raise awareness of cloud efficiency and optimize cloud utilization in a cost-effective and sustainable manner. The dashboards, which offer a holistic view together with a variety of cost and BMW Group-related dimensions, were successfully launched in May 2023 and became accessible to users within the BMW Group.

Overview of the BMW Cloud Data Hub

At the BMW Group, Cloud Data Hub (CDH) is the central platform for managing company-wide data and data solutions. It works as a bundle for resources that are bound to a specific staging environment and Region to store data on Amazon Simple Storage Service (Amazon S3), which is renowned for its industry-leading scalability, data availability, security, and performance. Additionally, it manages table definitions in the AWS Glue Data Catalog, containing references to data sources and targets of extract, transform, and load (ETL) jobs in AWS Glue.

Data providers and consumers are the two fundamental users of a CDH dataset. Providers create datasets within their assigned domain and, as the owners of a dataset, are responsible for the actual content and for providing appropriate metadata. They can use their own toolsets or rely on provided blueprints to ingest the data from source systems. Once released, consumers use datasets from different providers for analysis, machine learning (ML) workloads, and visualization.

Each CDH dataset has three processing layers: source (raw data), prepared (transformed data in Parquet), and semantic (combined datasets). It is possible to define stages (DEV, INT, PROD) in each layer to allow structured release and test without affecting PROD. Within each stage, it’s possible to create resources for storing actual data. Two resource types are associated with each database in a layer:

  • File store – S3 buckets for data storage
  • Database – AWS Glue databases for metadata sharing

Overview of the CLEA Landscape

The following diagram is a high-level overview of some of the technologies used for the extract, load, and transform (ELT) stages, as well as the final visualization and analysis layer. You might notice that this differs slightly from traditional ETL. The difference lies in when and where data transformation takes place. In ETL, data is transformed before it’s loaded into the data warehouse. In ELT, raw data is loaded into the data warehouse first, then it’s transformed directly within the warehouse. The ELT process has gained popularity with the rise of cloud-based, high-performance data warehouses, where transformation can be done more efficiently after loading.

Regardless of the method used, the goal is to provide high-quality, reliable data that can be used to drive business decisions.

CLEA Architecture

In this section, we take a closer look at the three essential stages mentioned previously: extract, load and transform.

Extract

The extract stage plays a pivotal role in CLEA, serving as the initial step where cost, usage, and optimization data is collected from a diverse range of sources within AWS. These sources encompass the AWS Cost and Usage Reports, Cost Explorer (and forecasting with Cost Explorer), Trusted Advisor, and Compute Optimizer. Furthermore, it fetches essential metadata from BMW Group’s internal system, offering a comprehensive view of the data across various dimensions, such as group, department, product, and applications in the later stages of data transformation.

The following diagram illustrates one of the data collection architectures that we use to collect Trusted Advisor data from nearly 4,500 AWS accounts and subsequently load that into Cloud Data Hub.

Let’s go through each numbered step as outlined in the architecture:

  1. A time-based rule in Amazon EventBridge triggers the CLEA Shared Workflow AWS Step Functions state machine.
  2. Based on the inputs, the Shared Workflow state machine invokes the Account Collector AWS Lambda function to retrieve AWS account details from AWS Organizations.
  3. The Account Collector Lambda function assumes an AWS Identity and Access Management (IAM) role to access linked account details via the Organizations API and writes them to Amazon Simple Queue Service (Amazon SQS) queues.
  4. The SQS queues trigger the Data Collector Lambda function using SQS Lambda triggers.
  5. The Data Collector Lambda function assumes an IAM role in each linked account to retrieve the relevant data and load it into the CDH source S3 bucket.
  6. When data from all linked accounts is collected, the Shared Workflow state machine triggers an AWS Glue job for further data transformation.
  7. The AWS Glue job reads raw data from the CDH source bucket and transforms it into a compact Parquet format.
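
To make the fan-out in steps 3 and 4 more concrete, the following is a simplified sketch of how account IDs could be grouped into batches before being written to a queue. It is not the CLEA implementation: the message body shape and the helper name are hypothetical, and the batch size reflects the limit of 10 entries per SQS SendMessageBatch call.

```python
import json

SQS_BATCH_LIMIT = 10  # SendMessageBatch accepts at most 10 entries per call


def build_sqs_batches(account_ids, batch_size=SQS_BATCH_LIMIT):
    """Group account IDs into lists of SendMessageBatch-style entries."""
    batches = []
    for start in range(0, len(account_ids), batch_size):
        chunk = account_ids[start:start + batch_size]
        batches.append([
            {
                "Id": account_id,  # must be unique within a batch
                # Hypothetical message body; the real payload is not described in this post.
                "MessageBody": json.dumps({"account_id": account_id}),
            }
            for account_id in chunk
        ])
    return batches


# 25 made-up, 12-digit account IDs for illustration.
accounts = [f"{number:012d}" for number in range(1, 26)]
batches = build_sqs_batches(accounts)
print(len(batches), [len(batch) for batch in batches])
```

Each message can then trigger one Data Collector invocation, so collection across thousands of accounts proceeds in parallel instead of in a single long-running loop.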

Load and transform

For the data transformations, we used an open-source data transformation tool called dbt (Data Build Tool), modifying and preprocessing the data through a number of abstract data layers:

  • Source – This layer contains the raw data the data source provides. The preferred data format is Parquet, but JSON, CSV, or plain text files are also allowed.
  • Prepared – The source layer is transformed and stored as the prepared layer in Parquet format for optimized columnar access. Initial cleaning, filtering, and basic transformations are performed in this layer.
  • Semantic – A semantic layer combines several prepared layer datasets to a single dataset that contains transformations, calculations, and business logic to deliver business-friendly insights.
  • QuickSight – QuickSight is the final presentation layer, which is directly ingested into QuickSight SPICE from Athena via incremental daily ingestion queries. These ingested datasets are used as a source in CLEA dashboards.

Overall, using dbt’s data modeling and the pay-as-you-go pricing of Athena, BMW Group can control costs by running efficient queries on demand. Furthermore, with the serverless architecture of Athena and dbt’s structured transformations, you can scale data processing without worrying about infrastructure management. In CLEA there are currently more than 120 dbt models implemented with complex transformations. The semantic layer is incrementally materialized and partially ingested into QuickSight with up to 4 TB of SPICE capacity. For dbt deployment and scheduling, we use GitHub Actions which allows us to introduce new dbt models and changes easily with automatic deployments and tests.

CLEA Access control

In this section, we explain how we implemented access control using row-level security in QuickSight and QuickSight embedding for authentication and authorization.

RLS for QuickSight

Row-level security (RLS) is a key feature that governs data access and privacy, which we implemented for CLEA. RLS is a mechanism that allows us to control the visibility of data at the row level based on user attributes. In essence, it ensures that users can only access the data that they are authorized to view, adding an additional layer of data protection within the QuickSight environment.

Understanding the importance of RLS requires a broader view of the data landscape. In organizations where multiple users interact with the same datasets but require different access levels due to their roles, RLS becomes a pivotal tool. It ensures data security and compliance with privacy regulations, preventing unauthorized access to sensitive data. Additionally, it offers a tailored user experience by displaying only relevant data to the user, thereby enhancing the effectiveness of data analysis.

For CLEA, we collected BMW Group metadata such as department, application, and organization, which are quite important to allow users to only see the accounts within their department, application, organization, and so on. This is achieved using both a user name and group name for access control. We use the user name for user-specific access control and the group name for adding some users to a specific group to extend their permissions for different use cases.

Lastly, because CLEA contains many dashboards, we control which dashboards a user can see in addition to the data shown within each dashboard. This is done at the group level. By default, all users are assigned to CLEA-READER, which is granted access to the core dashboards that we want to share with users. Other groups allow users to see additional dashboards after they’re assigned to that group.

The RLS dataset is refreshed daily to catch recent changes regarding new user additions, group changes, or any other user access changes. This dataset is also ingested to SPICE daily, which automatically updates all datasets restricted via this RLS dataset.
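
As a rough illustration of what such an RLS dataset can look like, the following sketch generates permission rules as CSV with UserName and GroupName columns plus one filtered field. The department column, the user and group names other than CLEA-READER, and the helper functions are hypothetical; the actual CLEA rules dataset is not shown in this post.

```python
import csv
import io

FIELDNAMES = ["UserName", "GroupName", "department"]


def build_rls_rules(user_departments, group_departments):
    """Return one rule row per (user or group, department) pair."""
    rows = []
    for user, departments in sorted(user_departments.items()):
        for department in sorted(departments):
            rows.append({"UserName": user, "GroupName": "", "department": department})
    for group, departments in sorted(group_departments.items()):
        for department in sorted(departments):
            rows.append({"UserName": "", "GroupName": group, "department": department})
    return rows


def rules_to_csv(rows):
    """Serialize rule rows to CSV text with a header line."""
    buffer = io.StringIO()
    writer = csv.DictWriter(buffer, fieldnames=FIELDNAMES, lineterminator="\n")
    writer.writeheader()
    writer.writerows(rows)
    return buffer.getvalue()


rules = build_rls_rules(
    user_departments={"alice": {"dept-a"}, "bob": {"dept-a", "dept-b"}},  # hypothetical users
    group_departments={"CLEA-FINOPS": {"dept-c"}},  # hypothetical group
)
print(rules_to_csv(rules))
```

Regenerating this file on a schedule and re-ingesting it is one way to pick up user and group changes daily, as described above.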

QuickSight embedding

CLEA is a cross-platform application that provides secure access to QuickSight embedded content with custom-built authentication and authorization logic that sits on top of BMW Group identity and role management services (referred to as BMW IAM).

CLEA provides access to sensitive data to multiple users with different permissions, which is why it is designed with fine-grained access control rules. It enforces access control using role-based access control (RBAC) and attribute-based access control (ABAC) models at two different levels:

  • At the dashboard level via QuickSight user groups (RBAC)
  • At the dashboard data level via QuickSight RLS (RBAC and ABAC)

Dashboard-level permissions define the list of dashboards users are able to visualize.

Dashboard data-level permissions define the subsets of dashboard data shown to the user and are applied using RLS with the user attributes mentioned earlier. Although the majority of roles defined in CLEA are used for dashboard-level permissions, some specific roles are strategically defined to grant permissions at the dashboard data level, taking priority over the ABAC model.

BMW has a defined set of guidelines suggesting the usage of their IAM services as the single source of truth for identity and access control, which the team took into careful consideration when designing the authentication and authorization processes for CLEA.

Upon their first login, users are automatically registered in CLEA and assigned a base role that grants them access to a basic set of dashboards.

The process of registering users in CLEA consists of mapping a user’s identity as retrieved from BMW’s identity provider (IdP) to a QuickSight user, then assigning the newly created user to the respective QuickSight user group.

For users that require more extensive permissions (at one of the levels mentioned before), it is possible to order additional role assignments via BMW’s self-service portal for role management. Authorized reviewers will then review it and either accept or reject the role assignments.

Role assignments will take effect the next time the user logs in, at which time the user’s assigned roles in BMW Group IAM are synced to the user’s QuickSight groups—internally referred to as the identity and permissions sync. As shown in the following diagram, the sync groups step calculates which users’ group memberships should be kept, created, and deleted following the logic.
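
The keep, create, and delete calculation can be sketched with plain set operations. This is a simplified illustration rather than the CLEA code; apart from CLEA-READER, the group names are hypothetical.

```python
def plan_group_sync(desired_groups, current_groups):
    """Compare the groups a user should have with the groups they currently have."""
    desired = set(desired_groups)
    current = set(current_groups)
    return {
        "keep": sorted(desired & current),    # already correct, nothing to do
        "create": sorted(desired - current),  # memberships to add
        "delete": sorted(current - desired),  # memberships to remove
    }


plan = plan_group_sync(
    desired_groups={"CLEA-READER", "CLEA-FINOPS"},  # derived from the user's assigned roles
    current_groups={"CLEA-READER", "CLEA-LEGACY"},  # the user's existing group memberships
)
print(plan)
```

Computing the plan first and applying it afterward keeps the sync idempotent: running it again for an unchanged user yields empty create and delete lists.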

Usage Insights

Amazon CloudWatch plays an indispensable role in enhancing the efficiency and usability of CLEA dashboards. Not only does CloudWatch offer real-time monitoring of AWS resources, but it also allows us to track user activity and dashboard utilization. By analyzing usage metrics and logs, we can see who has logged in to the CLEA dashboards, what features are most frequently accessed, and how long users interact with various elements. These insights are invaluable for making data-driven decisions on how to improve the dashboards for a better user experience. Through the intuitive interface of CloudWatch, it’s possible to set up alarms for alerting about abnormal activities or performance issues. Ultimately, employing CloudWatch for monitoring offers a comprehensive view of both system health and user engagement, helping us refine and enhance our dashboards continually.

Conclusion

BMW Group’s CLEA platform offers a comprehensive and effective solution to manage and optimize cloud resources. By providing full transparency into cloud costs, usage, and efficiency, CLEA offers insights from high-level overviews to granular details at the service, resource, and operational level.

CLEA aggregates data from various sources, enabling a detailed roadmap of the cloud operations, tracking footprints across primes, departments, products, applications, resources, and tags. This dynamic vision helps identify trends, anticipate future needs, and make strategic decisions.

Future plans for CLEA include enhancing capabilities with data consistency and accuracy, integrating additional sources like Amazon S3 Storage Lens for deeper insights, and introducing Amazon QuickSight Q for intelligent recommendations powered by machine learning, further streamlining cloud operations.

By following the practices here, you can unlock the potential of efficient cloud resource management by implementing Cloud Intelligence Dashboards, providing you with precise insights into costs, savings, and operational effectiveness.


About the Authors

Philipp Karg is Lead FinOps Engineer at BMW Group and founder of the CLEA platform. He focuses on boosting cloud efficiency initiatives and establishing a cost-aware culture within the company to ultimately leverage the cloud in a sustainable way.

Alex Gutfreund is Head of Product and Technology Integration at the BMW Group. He spearheads the digital transformation with a particular focus on platform ecosystems and efficiencies. With extensive experience at the interface of business and IT, he drives change and makes an impact in various organizations. His industry knowledge spans automotive, semiconductor, public transportation, and renewable energies.

Cizer Pereira is a Senior DevOps Architect at AWS Professional Services. He works closely with AWS customers to accelerate their journey to the cloud. He has a deep passion for Cloud Native and DevOps, and in his free time, he also enjoys contributing to open-source projects.

Selman Ay is a Data Architect in the AWS Professional Services team. He has worked with customers from various industries such as e-commerce, pharma, automotive and finance to build scalable data architectures and generate insights from the data. Outside of work, he enjoys playing tennis and engaging in outdoor activities.

Nick McCarthy is a Senior Machine Learning Engineer in the AWS Professional Services team. He has worked with AWS clients across various industries including healthcare, finance, sports, telecoms and energy to accelerate their business outcomes through the use of AI/ML. Outside of work Nick loves to travel, exploring new cuisines and cultures in the process.

Miguel Henriques is a Cloud Application Architect in the AWS Professional Services team with 4 years of experience in the automotive industry delivering cloud native solutions. In his free time, he is constantly looking for advancements in the web development space and searching for the next great pastel de nata.

Managing AWS Lambda runtime upgrades

Post Syndicated from Julian Wood original https://aws.amazon.com/blogs/compute/managing-aws-lambda-runtime-upgrades/

This post is written by Julian Wood, Principal Developer Advocate, and Dan Fox, Principal Specialist Serverless Solutions Architect.

AWS Lambda supports multiple programming languages through the use of runtimes. A Lambda runtime provides a language-specific execution environment, which provides the OS, language support, and additional settings, such as environment variables and certificates that you can access from your function code.

You can use managed runtimes that Lambda provides or build your own. Each major programming language release has a separate managed runtime, with a unique runtime identifier, such as python3.11 or nodejs20.x.

Lambda automatically applies patches and security updates to all managed runtimes and their corresponding container base images. Automatic runtime patching is one of the features customers love most about Lambda. When these patches are no longer available, Lambda ends support for the runtime. Over the next few months, Lambda is deprecating a number of popular runtimes, triggered by end of life of upstream language versions and of Amazon Linux 1.

Runtime | Deprecation
Node.js 14 | Nov 27, 2023
Node.js 16 | Mar 11, 2024
Python 3.7 | Nov 27, 2023
Java 8 (Amazon Linux 1) | Dec 31, 2023
Go 1.x | Dec 31, 2023
Ruby 2.7 | Dec 07, 2023
Custom Runtime (provided) | Dec 31, 2023

Runtime deprecation is not unique to Lambda. You must upgrade code using Python 3.7 or Node.js 14 when those language versions reach end of life, regardless of which compute service your code is running on. Lambda can help make this easier by tracking which runtimes you are using and providing deprecation notifications.

This post contains considerations and best practices for managing runtime deprecations and upgrades when using Lambda. Adopting these techniques makes managing runtime upgrades easier, especially when working with a large number of functions.

Specifying Lambda runtimes

When you deploy your function as a .zip file archive, you choose a runtime when you create the function. To change the runtime, you can update your function’s configuration.

Lambda keeps each managed runtime up to date by taking on the operational burden of patching the runtimes with security updates, bug fixes, new features, performance enhancements, and support for minor version releases. These runtime updates are published as runtime versions. Lambda applies runtime updates to functions by migrating the function from an earlier runtime version to a new runtime version.

You can control how your functions receive these updates using runtime management controls. Runtime versions and runtime updates apply to patch updates for a given Lambda runtime. Lambda does not automatically upgrade functions between major language runtime versions, for example, from nodejs14.x to nodejs18.x.

For a function defined as a container image, you choose a runtime and the Linux distribution when you create the container image. Most customers start with one of the Lambda base container images, although you can also build your own images from scratch. To change the runtime, you create a new container image from a different base container image.

Why does Lambda deprecate runtimes?

Lambda deprecates a runtime when upstream runtime language maintainers mark their language end-of-life or security updates are no longer available.

In almost all cases, the end-of-life date of a language version or operating system is published well in advance. The Lambda runtime deprecation policy gives end-of-life schedules for each language that Lambda supports. Lambda notifies you by email and via your Personal Health Dashboard if you are using a runtime that is scheduled for deprecation.

Lambda runtime deprecation happens in several stages. Lambda first blocks creating new functions that use a given runtime. Lambda later also blocks updating existing functions using the unsupported runtime, except to update to a supported runtime. Lambda does not block invocations of functions that use a deprecated runtime. Function invocations continue indefinitely after the runtime reaches end of support.

Lambda is extending the deprecation notification period from 60 days before deprecation to 180 days. Previously, blocking new function creation happened at deprecation and blocking updates to existing functions 30 days later. Blocking creation of new functions now happens 30 days after deprecation, and blocking updates to existing functions 60 days after.
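
To plan around these stages, you can derive the milestone dates from a deprecation date. The following sketch applies the offsets described above (180 days' notice, creation blocked 30 days after deprecation, updates blocked 60 days after) to a deprecation date taken from the earlier table. Treat the results as an illustration of the policy; the authoritative dates are the ones published in the Lambda runtime deprecation policy.

```python
from datetime import date, timedelta


def deprecation_timeline(deprecation_date):
    """Derive milestone dates from a runtime deprecation date."""
    return {
        "notification_starts": deprecation_date - timedelta(days=180),
        "deprecation": deprecation_date,
        "block_function_create": deprecation_date + timedelta(days=30),
        "block_function_update": deprecation_date + timedelta(days=60),
    }


# Deprecation date listed for Python 3.7 in the table earlier in this post.
timeline = deprecation_timeline(date(2023, 11, 27))
for milestone, day in timeline.items():
    print(f"{milestone}: {day.isoformat()}")
```

Feeding these dates into your team's backlog or calendar tooling gives function owners a concrete deadline for each stage.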

Lambda occasionally delays deprecation of a Lambda runtime for a limited period beyond the end of support date of the language version that the runtime supports. During this period, Lambda only applies security patches to the runtime OS. Lambda doesn’t apply security patches to programming language runtimes after they reach their end of support date.

Can Lambda automatically upgrade my runtime?

Moving from one major version of the language runtime to another has a significant risk of being a breaking change. Some libraries and dependencies within a language have deprecation schedules and do not support versions of a language past a certain point. Moving functions to new runtimes could potentially impact large-scale production workloads that customers depend on.

Since Lambda cannot guarantee backward compatibility between major language versions, upgrading the Lambda runtime used by a function is a customer-driven operation.

Lambda function versions

You can use function versions to manage the deployment of your functions. In Lambda, you make code and configuration changes to the default function version, which is called $LATEST. When you publish a function version, Lambda takes a snapshot of the code, runtime, and function configuration to maintain a consistent experience for users of that function version. When you invoke a function, you can specify the version to use or invoke the $LATEST version. Lambda function versions are required when using Provisioned Concurrency or SnapStart.

Some developers use an auto-versioning process by creating a new function version each time they deploy a change. This results in many versions of a function, with only a single version actually in use.

While Lambda applies runtime updates to published function versions, you cannot update the runtime major version for a published function version, for example from Node.js 16 to Node.js 20. To update the runtime for a function, you must update the $LATEST version, then create a new published function version if necessary. This means that different versions of a function can use different runtimes. The following shows the same function with version 1 using Node.js 14.x and version 2 using Node.js 18.x.

Version 1 using Node.js 14.x

Version 2 using Node.js 18.x

Ensure you create a maintenance process for deleting unused function versions, which also impact your Lambda storage quota.
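One possible shape for such a maintenance process is sketched below. It assumes that a version is "unused" when no alias references it and it is not among the newest few versions; that is a simplification, because callers can also invoke a version directly by its qualified ARN, so check invocation data before deleting anything. The helper names, the keep_newest default, and the dry_run flag are hypothetical. The AWS SDK for Python (Boto3) is imported lazily, so the selection logic can be tried without AWS credentials.

```python
def versions_safe_to_delete(published, referenced, keep_newest=2):
    """Pick published versions that no alias references, keeping the newest few."""
    numbered = sorted((v for v in published if v != "$LATEST"), key=int)
    newest = numbered[-keep_newest:] if keep_newest else []
    protected = set(referenced) | set(newest)
    return [v for v in numbered if v not in protected]


def delete_unused_versions(function_name, keep_newest=2, dry_run=True):
    """List versions and aliases for one function and delete unreferenced versions."""
    import boto3  # imported lazily so the helper above works without the AWS SDK

    client = boto3.client("lambda")
    published = [
        version["Version"]
        for page in client.get_paginator("list_versions_by_function").paginate(
            FunctionName=function_name
        )
        for version in page["Versions"]
    ]
    referenced = set()
    for page in client.get_paginator("list_aliases").paginate(FunctionName=function_name):
        for alias in page["Aliases"]:
            referenced.add(alias["FunctionVersion"])
            # Weighted aliases reference a second version in their routing config.
            weights = alias.get("RoutingConfig", {}).get("AdditionalVersionWeights", {})
            referenced.update(weights)
    candidates = versions_safe_to_delete(published, referenced, keep_newest)
    for version in candidates:
        if not dry_run:
            client.delete_function(FunctionName=function_name, Qualifier=version)
    return candidates


# Selection logic only, with made-up version numbers.
example = versions_safe_to_delete(["$LATEST", "1", "2", "3", "10"], {"2"}, keep_newest=1)
print(example)
```

Running with dry_run=True first lets you review the candidate list before any version is removed.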

Managing function runtime upgrades

Managing function runtime upgrades should be part of your software delivery lifecycle, in a similar way to how you treat dependencies and security updates. You need to understand which functions are being actively used in your organization. Organizations can create prioritization based on security profiles and/or function usage. You can use the same communication mechanisms you may already be using for handling security vulnerabilities.

Implement preventative guardrails to ensure that developers can only create functions using supported runtimes. Using infrastructure as code, CI/CD pipelines, and robust testing practices makes updating runtimes easier.

Identifying impacted functions

There are tools available to check Lambda runtime configuration and to identify which functions and what published function versions are actually in use. Deleting a function or function version that is no longer in use is the simplest way to avoid runtime deprecations.

You can identify functions using deprecated or soon-to-be-deprecated runtimes with AWS Trusted Advisor. Use the AWS Lambda Functions Using Deprecated Runtimes check in the Security category, which provides 120 days’ notice.

AWS Trusted Advisor Lambda functions using deprecated runtimes

Trusted Advisor scans all versions of your functions, including $LATEST and published versions.

The AWS Command Line Interface (AWS CLI) can list all functions in a specific Region that are using a specific runtime. To find all functions in your account, repeat the following command for each AWS Region and account. Replace the <REGION> and <RUNTIME> parameters with your values. The --function-version ALL parameter causes all function versions to be returned; omit this parameter to return only the $LATEST version.

aws lambda list-functions --function-version ALL --region <REGION> --output text --query "Functions[?Runtime=='<RUNTIME>'].FunctionArn"
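If you prefer to script the per-Region repetition, the same lookup can be sketched with the AWS SDK for Python (Boto3). The function names and the sample data are hypothetical; the list_functions paginator and its FunctionVersion="ALL" argument mirror the CLI parameters above. The sketch covers a single account, so you would still repeat it for each account.

```python
def arns_using_runtime(functions, runtime):
    """Filter list_functions results down to the ARNs that use one runtime."""
    # Container image functions have no Runtime key, hence .get().
    return [fn["FunctionArn"] for fn in functions if fn.get("Runtime") == runtime]


def find_functions(runtime, regions):
    """Return {region: [function ARNs]} for every version using the runtime."""
    import boto3  # imported lazily so the filter above works without the AWS SDK

    found = {}
    for region in regions:
        client = boto3.client("lambda", region_name=region)
        paginator = client.get_paginator("list_functions")
        functions = [
            fn
            for page in paginator.paginate(FunctionVersion="ALL")
            for fn in page["Functions"]
        ]
        found[region] = arns_using_runtime(functions, runtime)
    return found


# Made-up records shaped like list_functions output, for illustration only.
sample = [
    {"FunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:a:1", "Runtime": "nodejs14.x"},
    {"FunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:b:$LATEST", "Runtime": "python3.12"},
    {"FunctionArn": "arn:aws:lambda:us-east-1:111122223333:function:c:$LATEST"},
]
matches = arns_using_runtime(sample, "nodejs14.x")
print(matches)
```

For example, find_functions("nodejs14.x", ["us-east-1", "eu-west-1"]) would return the matching ARNs keyed by Region.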

You can use AWS Config to create a view of the configuration of resources in your account and also store configuration snapshot data in Amazon S3. AWS Config queries do not support published function versions; they can only query the $LATEST version.

You can then use Amazon Athena and Amazon QuickSight to make dashboards to visualize AWS Config data. For more information, see the Implementing governance in depth for serverless applications learning guide.

Dashboard showing AWS Config data

There are a number of ways that you can track Lambda function usage.

You can use Amazon CloudWatch metrics explorer to view Lambda by runtime and track the Invocations metric within the default CloudWatch metrics retention period of 15 months.

Track invocations in Amazon CloudWatch metrics

You can turn on AWS CloudTrail data event logging to log an event every time Lambda functions are invoked. This helps you understand what identities are invoking functions and the frequency of their invocations.

AWS Cost and Usage Reports can show which functions are incurring cost and in use.

Limiting runtime usage

AWS CloudFormation Guard is an open-source evaluation tool to validate infrastructure as code templates. Create policy rules to ensure that developers only choose approved runtimes. For more information, see Preventative Controls with AWS CloudFormation Guard.
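To illustrate the kind of rule you might enforce, the following sketch checks a parsed CloudFormation template for Lambda functions whose runtime is not on an allow list. This is plain Python and not CloudFormation Guard syntax; the allow list, the function name, and the sample template are all assumptions. It only inspects AWS::Lambda::Function resources, so templates using other resource types or transforms would need additional handling.

```python
APPROVED_RUNTIMES = {"nodejs20.x", "python3.12"}  # hypothetical allow list


def unapproved_functions(template, approved=APPROVED_RUNTIMES):
    """Return {logical ID: runtime} for Lambda functions outside the allow list."""
    findings = {}
    for logical_id, resource in template.get("Resources", {}).items():
        if resource.get("Type") != "AWS::Lambda::Function":
            continue
        runtime = resource.get("Properties", {}).get("Runtime")
        if runtime is None:
            continue  # container image functions do not set Runtime
        # A non-string value (for example a Ref to a parameter) cannot be
        # verified statically, so it is flagged for manual review.
        if not isinstance(runtime, str) or runtime not in approved:
            findings[logical_id] = runtime
    return findings


# Made-up template fragment, already parsed into a dictionary.
sample_template = {
    "Resources": {
        "OldFunction": {"Type": "AWS::Lambda::Function", "Properties": {"Runtime": "nodejs14.x"}},
        "NewFunction": {"Type": "AWS::Lambda::Function", "Properties": {"Runtime": "python3.12"}},
        "Bucket": {"Type": "AWS::S3::Bucket"},
    }
}
violations = unapproved_functions(sample_template)
print(violations)
```

A check like this could run as an early CI/CD step, failing the build when the returned dictionary is not empty.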

AWS Config rules allow you to check that Lambda function settings for the runtime match expected values. For more information on running these rules before deployment, see Preventative Controls with AWS Config. You can also reactively flag functions as non-compliant as your governance policies evolve. For more information, see Detective Controls with AWS Config.

Lambda does not currently have service control policies (SCP) to block function creation based on the runtime.

Upgrade best practices

Use infrastructure as code tools to build and manage your Lambda functions, which can make it easier to manage upgrades.

Ensure you run tests against your functions when developing locally. Include automated tests as part of your CI/CD pipelines to provide confidence in your runtime upgrades. When rolling out function upgrades, you can use weighted aliases to shift traffic between two function versions as you monitor for errors and failures.
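As a rough sketch of the weighted alias approach, the following helper builds the arguments for the Lambda UpdateAlias operation so that a small share of traffic goes to the version running on the new runtime. The helper name, the alias name, and the version numbers are hypothetical; the RoutingConfig and AdditionalVersionWeights keys are the ones the API uses.

```python
def canary_alias_update(function_name, alias, stable_version, new_version, new_weight):
    """Build UpdateAlias arguments that send new_weight of traffic to new_version."""
    if not 0.0 <= new_weight <= 1.0:
        raise ValueError("new_weight must be between 0.0 and 1.0")
    return {
        "FunctionName": function_name,
        "Name": alias,
        # The alias keeps pointing at the stable version...
        "FunctionVersion": stable_version,
        # ...while the stated fraction of invocations is routed to the new one.
        "RoutingConfig": {"AdditionalVersionWeights": {new_version: new_weight}},
    }


# Send 10 percent of traffic to version 2 (for example, the upgraded runtime).
request = canary_alias_update("my-function", "live", "1", "2", 0.10)
print(request)
```

You would pass the result to the SDK, for example boto3.client("lambda").update_alias(**request), then raise the weight in steps while watching error metrics before pointing the alias fully at the new version.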

Using runtimes after deprecation

AWS strongly advises you to upgrade your functions to a supported runtime before deprecation to continue to benefit from security patches, bug-fixes, and the latest runtime features. While deprecation does not affect function invocations, you will be using an unsupported runtime, which may have unpatched security vulnerabilities. Your function may eventually stop working, for example, due to a certificate expiry.

Lambda blocks function creation and updates for functions using deprecated runtimes. To create or update functions after these operations are blocked, contact AWS Support.

Conclusion

Lambda is deprecating a number of popular runtimes over the next few months, reflecting the end-of-life of upstream language versions and Amazon Linux 1. This post covers considerations for managing Lambda function runtime upgrades.

For more serverless learning resources, visit Serverless Land.

Deploy Amazon QuickSight dashboards to monitor AWS Glue ETL job metrics and set alarms

Post Syndicated from Michael Hamilton original https://aws.amazon.com/blogs/big-data/deploy-amazon-quicksight-dashboards-to-monitor-aws-glue-etl-job-metrics-and-set-alarms/

No matter the industry or level of maturity within AWS, our customers require better visibility into their AWS Glue usage. Better visibility can lend itself to gains in operational efficiency, informed business decisions, and further transparency into your return on investment (ROI) when using the various features available through AWS Glue.

As your company grows, you should be able to answer simple questions about your AWS Glue usage, such as the following:

  • Where am I spending the most with AWS Glue?
  • Where can I save the most by taking advantage of new AWS Glue features?
  • What does my overall usage look like using AWS Glue?

AWS offers services such as Amazon QuickSight, a serverless business intelligence (BI) service that lets you centralize this view and even ask natural language questions of your data, using Amazon QuickSight Q. QuickSight can give business leaders and their technology counterparts a common landscape for reporting important details of their usage, providing automated narratives to bridge communication gaps.

In this post, we explore how to combine AWS Glue usage information and metrics with centralized reporting and visualization using QuickSight. This can provide you with a more comprehensive view of your usage and tools to help you dive deep into your AWS Glue job run environment. You have metrics available per job run within the AWS Glue console, but they don’t cover all available AWS Glue job metrics, and the visuals aren’t as interactive compared to the QuickSight dashboard.

Although we don’t cover optimizing your jobs for costs in this post, you can refer to Monitor and optimize cost on AWS Glue for Apache Spark to learn how to fine-tune your AWS Glue jobs for performance, efficiency, and cost optimization.

Let’s dive in!

Solution overview

The following diagram illustrates the architecture for the given solution. At a high level, a scheduled event triggers an orchestration flow consisting of multiple data, compute, and analytics resources—the output of which culminates as a set of visuals in a BI dashboard.

solution architecture

Now let’s dig into the technical details involved in this solution.

An AWS Step Functions workflow is scheduled to run once per hour through Amazon EventBridge, which triggers an AWS Lambda function that calls the AWS Glue GetJob and GetJobRun APIs. We parse this data to check for jobs that have succeeded, stopped, or failed in the past hour, as well as any streaming jobs. The metadata is extracted from each job run, including information like runtime, start time, end time, auto scaling, number of workers, and worker type, and is written to an Amazon DynamoDB table with TTL (time to live) enabled to ensure the table doesn’t grow too large.
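The filtering and TTL logic described above might look roughly like the following sketch. It is not the code from the solution's repository: the item attribute names, the seven-day retention, and the sample data are assumptions, and the sketch leaves out streaming jobs for brevity. The input keys (Id, JobName, JobRunState, StartedOn, CompletedOn, ExecutionTime, WorkerType, NumberOfWorkers) follow the AWS Glue JobRun structure.

```python
from datetime import datetime, timedelta, timezone

FINISHED_STATES = {"SUCCEEDED", "STOPPED", "FAILED"}
TTL_DAYS = 7  # hypothetical retention period for the DynamoDB items


def finished_in_last_hour(job_runs, now):
    """Keep job runs that reached a finished state within the past hour."""
    cutoff = now - timedelta(hours=1)
    return [
        run
        for run in job_runs
        if run.get("JobRunState") in FINISHED_STATES
        and run.get("CompletedOn") is not None
        and run["CompletedOn"] >= cutoff
    ]


def to_item(run, now):
    """Flatten one job run into an item with an epoch-seconds TTL attribute."""
    return {
        "job_run_id": run["Id"],
        "job_name": run["JobName"],
        "state": run["JobRunState"],
        "started_on": run["StartedOn"].isoformat(),
        "completed_on": run["CompletedOn"].isoformat(),
        "execution_time_seconds": run.get("ExecutionTime"),
        "worker_type": run.get("WorkerType"),
        "number_of_workers": run.get("NumberOfWorkers"),
        # DynamoDB TTL expects a Unix timestamp in seconds.
        "ttl": int((now + timedelta(days=TTL_DAYS)).timestamp()),
    }


# Made-up job runs for illustration.
now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
sample_runs = [
    {"Id": "jr_1", "JobName": "etl", "JobRunState": "SUCCEEDED",
     "StartedOn": now - timedelta(minutes=50), "CompletedOn": now - timedelta(minutes=10),
     "ExecutionTime": 2400, "WorkerType": "G.1X", "NumberOfWorkers": 10},
    {"Id": "jr_2", "JobName": "etl", "JobRunState": "RUNNING",
     "StartedOn": now - timedelta(minutes=5)},
    {"Id": "jr_3", "JobName": "etl", "JobRunState": "FAILED",
     "StartedOn": now - timedelta(hours=4), "CompletedOn": now - timedelta(hours=3)},
]
items = [to_item(run, now) for run in finished_in_last_hour(sample_runs, now)]
print(items)
```

Each resulting item could then be written to the table, with the ttl attribute configured as the table's TTL attribute so that old records expire on their own.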

We move into a parallel state to check two tables that Amazon Athena writes the output of the federated queries to. Athena first checks to make sure the tables exist in Amazon Simple Storage Service (Amazon S3), where the data will be stored. If the tables don’t exist, Athena creates them. One federated query gathers AWS Glue metric data from Amazon CloudWatch metrics; the other gathers data from the DynamoDB table where Lambda writes the AWS Glue job metadata it’s collecting. Both federated queries utilize appropriate filtering in order to only scan the necessary data from each source.

There is a choice state for each branch. If there is no new data to be added to a table in Amazon S3, the state ends and waits for the other to complete. For example, there could be an AWS Glue job that is running while the step is evaluating. In this case, the metrics for the job would be inserted in the table on Amazon S3, but the metadata from DynamoDB wouldn’t arrive until the following hour after the job has succeeded, stopped, or failed.

When new metrics or metadata are found, Athena inserts this data to the metrics or metadata tables in Amazon S3, which are both partitioned by the hour. After the data is inserted, the final steps call the QuickSight CreateIngestion API, which triggers data ingestion into QuickSight SPICE to power interactive analysis. At this point, the workflow has finished running and will run again the following hour.
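For reference, a call to the QuickSight CreateIngestion API needs an account ID, a dataset ID, and a unique ingestion ID. The following sketch builds those arguments; the helper name, the account ID, and the dataset ID are placeholders, and the choice of a full refresh is an assumption rather than something this solution is confirmed to use.

```python
import uuid


def ingestion_request(account_id, dataset_id, ingestion_type="FULL_REFRESH"):
    """Build CreateIngestion arguments with a fresh, unique ingestion ID."""
    return {
        "AwsAccountId": account_id,
        "DataSetId": dataset_id,
        # Each ingestion needs its own ID; a UUID avoids collisions between runs.
        "IngestionId": f"refresh-{uuid.uuid4()}",
        "IngestionType": ingestion_type,
    }


# Placeholder identifiers, for illustration only.
request = ingestion_request("111122223333", "glue-metrics-dataset")
print(request)
```

With Boto3, you would pass the result as boto3.client("quicksight").create_ingestion(**request). In the workflow described here, Step Functions makes the equivalent call once the Athena inserts finish.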

In the following sections, we show you how to set up the solution, explore the dashboards, and configure alarms.

The code for this solution can be found at the AWS samples GitHub repository.

Prerequisites

You should have the following prerequisites:

Deploy solution resources with the AWS CDK

To provision the resources that build the dashboard and keep it up to date, we provide steps to download and deploy the solution via the AWS CDK. The solution was developed with cost-optimization as a priority, but some resources in the stack will incur costs once deployed.

This solution generates the following resources:

  • IAM role
  • EventBridge rule
  • Step Functions state machine
  • Lambda function
  • S3 bucket
  • Two AWS Glue tables and one AWS Glue database
  • DynamoDB table
  • Athena queries invoked by Step Functions
  • QuickSight data source, dataset, analysis, and dashboard

To deploy the solution, complete the following steps:

  1. Clone the source code from the AWS Samples GitHub repository to your client:
    git clone https://github.com/aws-samples/glue-metrics-in-quicksight

  2. Bootstrap your AWS CDK app:
    cd glue-metrics-in-quicksight
    npm i aws-cdk-lib
    cdk bootstrap

  3. Deploy the solution with the required parameters:
    1. The first parameter is for a new S3 bucket to be created, which holds the AWS Glue metrics and metadata.
    2. The second parameter is required in order for QuickSight to assign permissions to the user who will manage the assets. Refer to Managing user access inside Amazon QuickSight to find your existing QuickSight users.
      cdk deploy --parameters BucketName=New-Unique-Bucket-Name --parameters QuicksightUsername=QuickSight-Existing-User

If your deployment fails, make sure you installed the AWS CDK library and rerun cdk deploy after installing:

npm i aws-cdk-lib

The deployment may take up to 10 minutes.

After the solution is deployed, the Step Functions state machine will evaluate once per hour if it should ingest data into QuickSight. You can run some AWS Glue jobs after the stack is deployed and check the QuickSight dashboard in the next hour or two, where the job metadata and metrics will be populated for your analysis.

Explore the dashboard

The dashboard contains two sheets: Glue Jobs and Glue Metrics.

The Glue Jobs sheet includes all of the metadata about your AWS Glue job runs, including AWS Glue for Apache Spark, AWS Glue for Ray, and AWS Glue streaming ETL. Most of the visuals also have a hierarchy that you can drill down into with QuickSight, going as low as each specific job run ID. You can use controls to filter by date, job name, and job run ID.

In the following demonstration, you will see the pivot table, which is a simple view of all our job metadata, including estimated cost per job and job run. We open up a job name and see the different job runs. There is one individual job run that we would like to inspect the metrics on, so we choose the job name and choose View metrics for job run id: <my job run id>. This will take us to the Glue Metrics sheet and automatically filter for the job run ID we want to view.

glue information sheet

The Glue Metrics sheet is built to reflect the documentation we provide in AWS Glue resource monitoring. This documentation helps explain each visual in the dashboard. You can use the Glue Metrics sheet to view aggregated metrics across all jobs, a single job, or down to the job run ID.

To populate the Glue Metrics sheet, your AWS Glue jobs must be enabled to capture metrics in CloudWatch.

glue metrics sheet

Set up alerts

Setting up alerts on measures is also straightforward to do in QuickSight. To do so, choose (right-click) one of the tracked measures on either worksheet and choose Create Alarm. This will bring you to the configuration page to set up the metric you’d like to be alerted on.

quicksight alarm

The dashboard is designed to give you the freedom to alter it and make your own visualizations with the metadata and metrics that are provided to you. If you want even more insight into cost, consider deploying the CUDOS dashboard as well!

Clean up

If you no longer need the dashboard, delete the CDK app:

cdk destroy

Conclusion

In this post, we talked about the importance of having observability of your AWS Glue jobs and provided an AWS CDK app that deploys a QuickSight dashboard for you. We hope this helps you optimize your AWS Glue environment using the insights the dashboard provides. To learn about event-based alerting for your AWS Glue for Apache Spark and Ray jobs, refer to Automate alerting and reporting for AWS Glue job resource usage.


About the authors

Michael Hamilton is a Sr Analytics Solutions Architect focusing on helping enterprise customers in the south east modernize and simplify their analytics workloads on AWS. He enjoys mountain biking and spending time with his wife and three children when not working.

Cody Penta is a Solutions Architect at Amazon Web Services and is based out of Charlotte, NC. He has a focus in security and CDK, and enjoys solving the really difficult problems in the technology world. Off the clock, he loves relaxing in the mountains, coding personal projects, and gaming.

Angus Ferguson is a Solutions Architect at AWS who is passionate about meeting customers across the world, helping them solve their technical challenges. Angus specializes in Data & Analytics with a focus on customers in the financial services industry.

Aggregating, searching, and visualizing log data from distributed sources with Amazon Athena and Amazon QuickSight

Post Syndicated from Pratima Singh original https://aws.amazon.com/blogs/security/aggregating-searching-and-visualizing-log-data-from-distributed-sources-with-amazon-athena-and-amazon-quicksight/

Customers using Amazon Web Services (AWS) can use a range of native and third-party tools to build workloads based on their specific use cases. Logs and metrics are foundational components in building effective insights into the health of your IT environment. In a distributed and agile AWS environment, customers need a centralized and holistic solution to visualize the health and security posture of their infrastructure.

You can effectively categorize the members of the teams involved using the following roles:

  1. Executive stakeholder: Owns and operates with their support staff and has total financial and risk accountability.
  2. Data custodian: Aggregates related data sources while managing cost, access, and compliance.
  3. Operator or analyst: Uses security tooling to monitor, assess, and respond to related events such as service disruptions.

In this blog post, we focus on the data custodian role. We show you how you can visualize metrics and logs centrally with Amazon QuickSight irrespective of the service or tool generating them. We use Amazon Simple Storage Service (Amazon S3) for storage, AWS Glue for cataloguing, and Amazon Athena for querying the data and creating structured query language (SQL) views for QuickSight to consume.

Target architecture

This post guides you towards building a target architecture in line with the AWS Well-Architected Framework. The tiered and multi-account target architecture, shown in Figure 1, uses account-level isolation to separate responsibilities across the various roles identified above and makes access management more defined and specific to those roles. The workload accounts generate the telemetry around the applications and infrastructure. The data custodian account is where the data lake is deployed and collects the telemetry. The operator account is where the queries and visualizations are created.

Throughout the post, I mention AWS services that reduce the operational overhead in one or more stages of the architecture.

Figure 1: Data visualization architecture

Ingestion

Irrespective of the technology choices, applications and infrastructure configurations should generate metrics and logs that report on resource health and security. The format of the logs depends on which tool and which part of the stack is generating the logs. For example, the format of log data generated by application code can capture bespoke and additional metadata deemed useful from a workload perspective as compared to access logs generated by proxies or load balancers. For more information on types of logs and effective logging strategies, see Logging strategies for security incident response.

Amazon S3 is a scalable, highly available, durable, and secure object storage that you will use as the storage layer. To build a solution that captures events agnostic of the source, you must forward data as a stream to the S3 bucket. Based on the architecture, there are multiple tools you can use to capture and stream data into S3 buckets. Some tools support integration with S3 and directly stream data to S3. Resources like servers and virtual machines need forwarding agents such as Amazon Kinesis Agent, Amazon CloudWatch agent, or Fluent Bit.

Amazon Kinesis Data Streams provides a scalable data streaming environment. Using on-demand capacity mode eliminates the need for capacity provisioning and capacity management for streaming workloads. For log data and metric collection, you should use on-demand capacity mode, because log data generation can be unpredictable depending on the requests that are being handled by the environment. Amazon Kinesis Data Firehose can convert the format of your input data from JSON to Apache Parquet before storing the data in Amazon S3. Parquet is naturally compressed, and using Parquet native partitioning and compression allows for faster queries compared to JSON formatted objects.

Scalable data lake

Use AWS Lake Formation to build, secure, and manage the data lake to store log and metric data in S3 buckets. We recommend using tag-based access control and named resources to share the data in your data store across accounts for building visualizations. Data custodians should configure access for relevant datasets to the operators who can use Athena to perform complex queries and build compelling data visualizations with QuickSight, as shown in Figure 2. For cross-account permissions, see Use Amazon Athena and Amazon QuickSight in a cross-account environment. You can also use Amazon DataZone to build additional governance and share data at scale within your organization. Note that the data lake is different to and separate from the Log Archive bucket and account described in Organizing Your AWS Environment Using Multiple Accounts.

Figure 2: Account structure

Amazon Security Lake

Amazon Security Lake is a fully managed security data lake service. You can use Security Lake to automatically centralize security data from AWS environments, SaaS providers, on-premises, and third-party sources into a purpose-built data lake that’s stored in your AWS account. Using Security Lake reduces the operational effort involved in building a scalable data lake, as the service automates the configuration and orchestration for the data lake with Lake Formation. Security Lake automatically transforms logs into a standard schema, the Open Cybersecurity Schema Framework (OCSF), and parses them into a standard directory structure, which allows for faster queries. For more information, see How to visualize Amazon Security Lake findings with Amazon QuickSight.

Querying and visualization

Figure 3: Data sharing overview

After you’ve configured cross-account permissions, you can use Athena as the data source to create a dataset in QuickSight, as shown in Figure 3. You start by signing up for a QuickSight subscription. There are multiple ways to sign in to QuickSight; this post uses AWS Identity and Access Management (IAM) for access. To use QuickSight with Athena and Lake Formation, you first must authorize connections through Lake Formation. After permissions are in place, you can add datasets. You should verify that you’re using QuickSight in the same AWS Region as the Region where Lake Formation is sharing the data. You can do this by checking the Region in the QuickSight URL.

You can start with basic queries and visualizations as described in Query logs in S3 with Athena and Create a QuickSight visualization. Depending on the nature and origin of the logs and metrics that you want to query, you can use the examples published in Running SQL queries using Amazon Athena. To build custom analytics, you can create views with Athena. Views in Athena are logical tables that you can use to query a subset of data. Views help you to hide complexity and minimize maintenance when querying large tables. Use views as a source for new datasets to build specific health analytics and dashboards.
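As a small illustration of the view approach, the following sketch assembles a CREATE OR REPLACE VIEW statement that exposes a subset of a larger log table. The view name, table name, columns, and filter are all hypothetical, and the helper interpolates its arguments directly into the SQL text, so it should only be given trusted, hard-coded values.

```python
def create_view_sql(view, table, columns, predicate):
    """Assemble an Athena view definition over a subset of a table."""
    column_list = ", ".join(columns)
    return (
        f"CREATE OR REPLACE VIEW {view} AS "
        f"SELECT {column_list} FROM {table} WHERE {predicate}"
    )


# Hypothetical names: a view of server errors over a load balancer log table.
statement = create_view_sql(
    view="server_errors",
    table="access_logs",
    columns=["request_time", "target_status_code", "request_url"],
    predicate="target_status_code >= 500",
)
print(statement)
```

You could run the statement in the Athena query editor, or submit it through the SDK with start_query_execution, and then select the resulting view as the source when you create a QuickSight dataset.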

You can also use Amazon QuickSight Q to get started on your analytics journey. Powered by machine learning, Q uses natural language processing to provide insights into the datasets. After the dataset is configured, you can use Q to give you suggestions for questions to ask about the data. Q understands business language and generates results based on relevant phrases detected in the questions. For more information, see Working with Amazon QuickSight Q topics.

Conclusion

Logs and metrics offer insights into the health of your applications and infrastructure. It’s essential to build visibility into the health of your IT environment so that you can understand what good health looks like and identify outliers in your data. These outliers can be used to identify thresholds and feed into your incident response workflow to help identify security issues. This post helps you build out a scalable centralized visualization environment irrespective of the source of log and metric data.

This post is part 1 of a series that helps you dive deeper into the security analytics use case. In part 2, How to visualize Amazon Security Lake findings with Amazon QuickSight, you will learn how you can use Security Lake to reduce the operational overhead involved in building a scalable data lake and centralizing log data from SaaS providers, on-premises, AWS, and third-party sources into a purpose-built data lake. You will also learn how you can integrate Athena with Security Lake and create visualizations with QuickSight of the data and events captured by Security Lake.

Part 3, How to share security telemetry per Organizational Unit using Amazon Security Lake and AWS Lake Formation, dives deeper into how you can query security posture using AWS Security Hub findings integrated with Security Lake. You will also use the capabilities of Athena and QuickSight to visualize security posture in a distributed environment.

 
If you have feedback about this post, submit comments in the Comments section below. If you have questions about this post, contact AWS Support.


Pratima Singh

Pratima is a Security Specialist Solutions Architect with Amazon Web Services based out of Sydney, Australia. She is a security enthusiast who enjoys helping customers find innovative solutions to complex business challenges. Outside of work, Pratima enjoys going on long drives and spending time with her family at the beach.

Create, train, and deploy Amazon Redshift ML model integrating features from Amazon SageMaker Feature Store

Post Syndicated from Anirban Sinha original https://aws.amazon.com/blogs/big-data/create-train-and-deploy-amazon-redshift-ml-model-integrating-features-from-amazon-sagemaker-feature-store/

Amazon Redshift is a fast, petabyte-scale, cloud data warehouse that tens of thousands of customers rely on to power their analytics workloads. Data analysts and database developers want to use this data to train machine learning (ML) models, which can then be used to generate insights on new data for use cases such as forecasting revenue, predicting customer churn, and detecting anomalies. Amazon Redshift ML makes it easy for SQL users to create, train, and deploy ML models using SQL commands familiar to many roles such as executives, business analysts, and data analysts. We covered in a previous post how you can use data in Amazon Redshift to train models in Amazon SageMaker, a fully managed ML service, and then make predictions within your Redshift data warehouse.

Redshift ML currently supports ML algorithms such as XGBoost, multilayer perceptron (MLP), KMEANS, and Linear Learner. Additionally, you can import existing SageMaker models into Amazon Redshift for in-database inference or remotely invoke a SageMaker endpoint.

Amazon SageMaker Feature Store is a fully managed, purpose-built repository to store, share, and manage features for ML models. However, one challenge in training a production-ready ML model using SageMaker Feature Store is access to a diverse set of features that aren’t always owned and maintained by the team that is building the model. For example, an ML model to identify fraudulent financial transactions needs access to both identifying (device type, browser) and transaction (amount, credit or debit, and so on) related features. As a data scientist building an ML model, you may have access to the identifying information but not the transaction information, and having access to a feature store solves this.

In this post, we discuss the combined feature store pattern, which allows teams to maintain their own local feature stores using a local Redshift table while still being able to access shared features from the centralized feature store. In a local feature store, you can store sensitive data that can’t be shared across the organization for regulatory and compliance reasons.

We also show you how to use familiar SQL statements to create and train ML models by combining shared features from the centralized store with local features and use these models to make in-database predictions on new data for use cases such as fraud risk scoring.

Overview of solution

For this post, we create an ML model to predict if a transaction is fraudulent or not, given the transaction record. To build this, we need to engineer features that describe an individual credit card’s spending pattern, such as the number of transactions or the average transaction amount, and also information about the merchant, the cardholder, the device used to make the payment, and any other data that may be relevant to detecting fraud.

To get started, we need an Amazon Redshift Serverless data warehouse with the Redshift ML feature enabled and an Amazon SageMaker Studio environment with access to SageMaker Feature Store. For an introduction to Redshift ML and instructions on setting it up, see Create, train, and deploy machine learning models in Amazon Redshift using SQL with Amazon Redshift ML.

We also need an offline feature store to store features in feature groups. The offline store uses an Amazon Simple Storage Service (Amazon S3) bucket for storage and can also fetch data using Amazon Athena queries. For an introduction to SageMaker Feature Store and instructions on setting it up, see Getting started with Amazon SageMaker Feature Store.

The following diagram illustrates the solution architecture.

The workflow contains the following steps:

  1. Create the offline feature group in SageMaker Feature Store and ingest data into the feature group.
  2. Create a Redshift table and load local feature data into the table.
  3. Create an external schema for Amazon Redshift Spectrum to access the offline store data stored in Amazon S3 using the AWS Glue Data Catalog.
  4. Train and validate a fraud risk scoring ML model using local feature data and external offline feature store data.
  5. Use the offline feature store and local store for inference.

Dataset

To demonstrate this use case, we use a synthetic dataset with two tables: identity and transactions. They can both be joined by the TransactionID column. The transaction table contains information about a particular transaction, such as amount, credit or debit card, and so on, and the identity table contains information about the user, such as device type and browser. The transaction must exist in the transaction table, but might not always be available in the identity table.
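Because a transaction may have no matching identity record, combining the two tables calls for a left join from transactions to identity. The following dependency-free sketch shows that behavior on made-up rows; the column names other than TransactionID are assumptions, and the helper is illustrative rather than taken from the sample notebook.

```python
def left_join(transactions, identity, key="TransactionID"):
    """Left join: keep every transaction, fill identity columns with None if absent."""
    identity_by_key = {row[key]: row for row in identity}
    identity_columns = sorted({col for row in identity for col in row if col != key})
    joined = []
    for txn in transactions:
        match = identity_by_key.get(txn[key], {})
        merged = dict(txn)
        for column in identity_columns:
            merged[column] = match.get(column)
        joined.append(merged)
    return joined


# Made-up rows; only TransactionID is a column name taken from the dataset description.
transactions = [
    {"TransactionID": 1, "amount": 25.0},
    {"TransactionID": 2, "amount": 310.5},
]
identity = [
    {"TransactionID": 1, "device_type": "mobile", "browser": "chrome"},
]
joined = left_join(transactions, identity)
print(joined)
```

Transaction 2 stays in the result with empty identity columns, which is the behavior you want when building a training set from both sources. With pandas, transactions.merge(identity, on="TransactionID", how="left") gives a roughly equivalent result.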

The following is an example of the transactions dataset.

The following is an example of the identity dataset.

Let’s assume that across the organization, data science teams centrally manage the identity data and process it to extract features in a centralized offline feature store. The data warehouse team ingests and analyzes transaction data in a Redshift table, owned by them.

We work through this use case to understand how the data warehouse team can securely retrieve the latest features from the identity feature group and join them with transaction data in Amazon Redshift to create a feature set for training and inferencing a fraud detection model.

Create the offline feature group and ingest data

To start, we set up SageMaker Feature Store, create a feature group for the identity dataset, inspect and process the dataset, and ingest some sample data. We then prepare the transaction features from the transaction data and store it in Amazon S3 for further loading into the Redshift table.

Alternatively, you can author features using Amazon SageMaker Data Wrangler, create feature groups in SageMaker Feature Store, and ingest features in batches using an Amazon SageMaker Processing job with a notebook exported from SageMaker Data Wrangler. This mode allows for batch ingestion into the offline store.

Let’s explore some of the key steps in this section.

  1. Download the sample notebook.
  2. On the SageMaker console, under Notebook in the navigation pane, choose Notebook instances.
  3. Locate your notebook instance and choose Open Jupyter.
  4. Choose Upload and upload the notebook you just downloaded.
  5. Open the notebook sagemaker_featurestore_fraud_redshiftml_python_sdk.ipynb.
  6. Follow the instructions and run all the cells up to the Cleanup Resources section.

The following are key steps from the notebook:

  1. We create a Pandas DataFrame with the initial CSV data. We apply feature transformations for this dataset.
    identity_data = pd.read_csv(io.BytesIO(identity_data_object["Body"].read()))
    transaction_data = pd.read_csv(io.BytesIO(transaction_data_object["Body"].read()))
    
    identity_data = identity_data.round(5)
    transaction_data = transaction_data.round(5)
    
    identity_data = identity_data.fillna(0)
    transaction_data = transaction_data.fillna(0)
    
    # Feature transformations for this dataset are applied 
    # One hot encode card4, card6
    encoded_card_bank = pd.get_dummies(transaction_data["card4"], prefix="card_bank")
    encoded_card_type = pd.get_dummies(transaction_data["card6"], prefix="card_type")
    
    transformed_transaction_data = pd.concat(
        [transaction_data, encoded_card_type, encoded_card_bank], axis=1
    )

  2. We store the processed and transformed transaction dataset in an S3 bucket. This transaction data will be loaded later in the Redshift table for building the local feature store.
    transformed_transaction_data.to_csv("transformed_transaction_data.csv", header=False, index=False)
    s3_client.upload_file("transformed_transaction_data.csv", default_s3_bucket_name, prefix + "/training_input/transformed_transaction_data.csv")

  3. Next, we need a record identifier name and an event time feature name. In our fraud detection example, the column of interest is TransactionID. EventTime can be appended to your data when no timestamp is available. In the following code, you can see how these variables are set, and then EventTime is appended to the feature data.
    # record identifier and event time feature names
    record_identifier_feature_name = "TransactionID"
    event_time_feature_name = "EventTime"
    
    # append EventTime feature
    identity_data[event_time_feature_name] = pd.Series(
        [current_time_sec] * len(identity_data), dtype="float64"
    )

  4. We then create and ingest the data into the feature group using the SageMaker SDK FeatureGroup.ingest API. This is a small dataset and therefore can be loaded into a Pandas DataFrame. When we work with large amounts of data and millions of rows, there are other scalable mechanisms to ingest data into SageMaker Feature Store, such as batch ingestion with Apache Spark.
    identity_feature_group_name = "identity-feature-group"
    
    # identity_feature_group is assumed to be the FeatureGroup object created earlier
    # in the notebook with the name above.
    # Load the feature definitions before calling create(); the SageMaker Feature Store
    # Python SDK auto-detects the data schema based on the input data.
    identity_feature_group.load_feature_definitions(data_frame=identity_data)
    
    identity_feature_group.create(
        s3_uri=<S3_Path_Feature_Store>,
        record_identifier_name=record_identifier_feature_name,
        event_time_feature_name=event_time_feature_name,
        role_arn=<role_arn>,
        enable_online_store=False,
    )
    
    # Creation is asynchronous; ingest only after the feature group status is Created.
    identity_feature_group.ingest(data_frame=identity_data, max_workers=3, wait=True)
    

  5. We can verify that data has been ingested into the feature group by running Athena queries in the notebook or running queries on the Athena console.

At this point, the identity feature group is created in an offline feature store with historical data persisted in Amazon S3. SageMaker Feature Store automatically creates an AWS Glue Data Catalog table for the offline store, which enables us to run SQL queries against the offline data using Athena or Redshift Spectrum.
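
Because the offline store keeps historical records, retrieving the "latest" features usually means keeping, for each record identifier, the row with the most recent event time. The following is a minimal pure-Python sketch of that selection logic, not code from the notebook; the `latest_records` helper and the sample rows are hypothetical and only reuse the TransactionID and EventTime names from this post.

```python
from typing import Dict, Iterable, List


def latest_records(rows: Iterable[dict],
                   id_key: str = "TransactionID",
                   time_key: str = "EventTime") -> List[dict]:
    """Return one row per identifier, keeping the row with the largest event time."""
    latest: Dict[object, dict] = {}
    for row in rows:
        current = latest.get(row[id_key])
        if current is None or row[time_key] > current[time_key]:
            latest[row[id_key]] = row
    # Sort by identifier so the output order is deterministic.
    return [latest[key] for key in sorted(latest)]


# Hypothetical offline-store rows: TransactionID 1001 was ingested twice.
history = [
    {"TransactionID": 1001, "EventTime": 1680208535.0, "id_01": -5.0},
    {"TransactionID": 1002, "EventTime": 1680208535.0, "id_01": 0.0},
    {"TransactionID": 1001, "EventTime": 1680294935.0, "id_01": -10.0},
]

for row in latest_records(history):
    print(row["TransactionID"], row["id_01"])
```

In SQL, the same idea is commonly expressed with a window function that ranks rows by event time within each identifier.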

Create a Redshift table and load local feature data

To build a Redshift ML model, we build a training dataset joining the identity data and transaction data using SQL queries. The identity data is in a centralized feature store where the historical set of records is persisted in Amazon S3. The transaction data is a local feature for training data that needs to be made available in the Redshift table.

Let’s explore how to create the schema and load the processed transaction data from Amazon S3 into a Redshift table.

  1. Create the customer_transaction table and load daily transaction data into the table, which you’ll use to train the ML model:
    DROP TABLE IF EXISTS customer_transaction;
    CREATE TABLE customer_transaction (
      TransactionID INT,    
      isFraud INT,  
      TransactionDT INT,    
      TransactionAmt decimal(10,2), 
      card1 INT,    
      card2 decimal(10,2),card3 decimal(10,2),  
      card4 VARCHAR(20),card5 decimal(10,2),    
      card6 VARCHAR(20),    
      B1 INT,B2 INT,B3 INT,B4 INT,B5 INT,B6 INT,
      B7 INT,B8 INT,B9 INT,B10 INT,B11 INT,B12 INT,
      F1 INT,F2 INT,F3 INT,F4 INT,F5 INT,F6 INT,
      F7 INT,F8 INT,F9 INT,F10 INT,F11 INT,F12 INT,
      F13 INT,F14 INT,F15 INT,F16 INT,F17 INT,  
      N1 VARCHAR(20),N2 VARCHAR(20),N3 VARCHAR(20), 
      N4 VARCHAR(20),N5 VARCHAR(20),N6 VARCHAR(20), 
      N7 VARCHAR(20),N8 VARCHAR(20),N9 VARCHAR(20), 
      card_type_0  boolean,
      card_type_credit boolean,
      card_type_debit  boolean,
      card_bank_0  boolean,
      card_bank_american_express boolean,
      card_bank_discover  boolean,
      card_bank_mastercard  boolean,
      card_bank_visa boolean  
    );

  2. Load the sample data by using the following command. Replace your Region and S3 path as appropriate. You will find the S3 path in the S3 Bucket Setup For The OfflineStore section in the notebook or by checking the dataset_uri_prefix in the notebook.
    COPY customer_transaction
    FROM '<s3path>/transformed_transaction_data.csv' 
    IAM_ROLE default delimiter ',' 
    region 'your-region';

Now that we have created a local feature store for the transaction data, we focus on integrating a centralized feature store with Amazon Redshift to access the identity data.

Create an external schema for Redshift Spectrum to access the offline store data

We have created a centralized feature store for identity features, and we can access this offline feature store using services such as Redshift Spectrum. When the identity data is available through the Redshift Spectrum table, we can create a training dataset with feature values from the identity feature group and customer_transaction, joining on the TransactionID column.

This section provides an overview of how to enable Redshift Spectrum to query data directly from files on Amazon S3 through an external database in an AWS Glue Data Catalog.

  1. First, check that the identity-feature-group table is present in the Data Catalog under the sagemaker_featurestore database.
  2. Using Redshift Query Editor v2, create an external schema using the following command:
    CREATE EXTERNAL SCHEMA sagemaker_featurestore
    FROM DATA CATALOG
    DATABASE 'sagemaker_featurestore'
    IAM_ROLE default
    create external database if not exists;

All the tables, including identity-feature-group external tables, are visible under the sagemaker_featurestore external schema. In Redshift Query Editor v2, you can check the contents of the external schema.

  3. Run the following query to sample a few records (note that your table name may be different):
    Select * from sagemaker_featurestore.identity_feature_group_1680208535 limit 10;

  4. Create a view to join the latest data from identity-feature-group and customer_transaction on the TransactionID column. Be sure to change the external table name to match your external table name:
    create or replace view public.credit_fraud_detection_v
    AS select  "isfraud",
            "transactiondt",
            "transactionamt",
            "card1","card2","card3","card5",
             case when "card_type_credit" = 'False' then 0 else 1 end as card_type_credit,
             case when "card_type_debit" = 'False' then 0 else 1 end as card_type_debit,
             case when "card_bank_american_express" = 'False' then 0 else 1 end as card_bank_american_express,
             case when "card_bank_discover" = 'False' then 0 else 1 end as card_bank_discover,
             case when "card_bank_mastercard" = 'False' then 0 else 1 end as card_bank_mastercard,
             case when "card_bank_visa" = 'False' then 0 else 1 end as card_bank_visa,
            "id_01","id_02","id_03","id_04","id_05"
    from public.customer_transaction ct left join sagemaker_featurestore.identity_feature_group_1680208535 id
    on id.transactionid = ct.transactionid with no schema binding;
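
To make the view's behavior concrete, here is a small pure-Python sketch of the two things it does: a left join that keeps every transaction even when no identity row exists, and the CASE expression that turns boolean flags into 0 or 1. The `left_join` and `flag_to_int` helpers and the sample rows are hypothetical illustrations, not part of the original solution.

```python
def left_join(transactions, identities, key="transactionid"):
    """Left join: keep every transaction; identity columns are None when unmatched."""
    by_id = {row[key]: row for row in identities}
    identity_cols = sorted({col for row in identities for col in row if col != key})
    joined = []
    for txn in transactions:
        match = by_id.get(txn[key], {})
        merged = dict(txn)
        for col in identity_cols:
            merged[col] = match.get(col)  # None plays the role of SQL NULL
        joined.append(merged)
    return joined


def flag_to_int(value):
    """Mirror the view's CASE expression: False maps to 0, anything else to 1."""
    return 0 if value is False else 1


# Hypothetical sample rows; transaction 2 has no identity record.
transactions = [
    {"transactionid": 1, "card_type_credit": True},
    {"transactionid": 2, "card_type_credit": False},
]
identities = [{"transactionid": 1, "id_01": -5.0}]

for row in left_join(transactions, identities):
    print(row["transactionid"], flag_to_int(row["card_type_credit"]), row["id_01"])
```

Note that, as in the SQL CASE expression, a missing (NULL) flag falls into the ELSE branch and is encoded as 1.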

Train and validate the fraud risk scoring ML model

Redshift ML gives you the flexibility to specify your own algorithms and model types and also to provide your own advanced parameters, which can include preprocessors, problem type, and hyperparameters. In this post, we create a custom model by specifying AUTO OFF and the model type of XGBOOST. By turning AUTO OFF and using XGBoost, we are providing the necessary inputs for SageMaker to train the model. A benefit of this can be faster training times. XGBoost is an open-source implementation of the gradient boosted trees algorithm. For more details on XGBoost, refer to Build XGBoost models with Amazon Redshift ML.

We train the model using 80% of the dataset by filtering on transactiondt < 12517618. The other 20% will be used for inference. A centralized feature store is useful in providing the latest supplementing data for training requests. Note that you will need to provide an S3 bucket name in the create model statement. It will take approximately 10 minutes to create the model.

CREATE MODEL frauddetection_xgboost
FROM (select  "isfraud",
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05"
from credit_fraud_detection_v where transactiondt < 12517618
)
TARGET isfraud
FUNCTION ml_fn_frauddetection_xgboost
IAM_ROLE default
AUTO OFF
MODEL_TYPE XGBOOST
OBJECTIVE 'binary:logistic'
PREPROCESSORS 'none'
HYPERPARAMETERS DEFAULT EXCEPT(NUM_ROUND '100')
SETTINGS (S3_BUCKET '<s3_bucket>');
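
The training query above separates training and test rows with a fixed transactiondt cutoff. The post does not say how that value was chosen; one possible approach, sketched below in plain Python under that assumption, is to take the timestamp at the 80th percentile position so that earlier rows train the model and later rows test it. The `time_cutoff` helper and the sample timestamps are hypothetical.

```python
def time_cutoff(timestamps, train_fraction=0.8):
    """Return the timestamp at which the later (1 - train_fraction) share of rows begins."""
    if not timestamps:
        raise ValueError("timestamps must not be empty")
    ordered = sorted(timestamps)
    index = min(int(len(ordered) * train_fraction), len(ordered) - 1)
    return ordered[index]


# Hypothetical transactiondt values.
transactiondt = [100, 250, 300, 420, 510, 640, 700, 810, 905, 990]

cutoff = time_cutoff(transactiondt)
train = [t for t in transactiondt if t < cutoff]    # rows used for CREATE MODEL
test = [t for t in transactiondt if t >= cutoff]    # rows held out for inference

print(cutoff, len(train), len(test))
```

Splitting on time rather than at random keeps the test set strictly later than the training set, which resembles how a fraud model would be used in practice.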

When you run the create model command, it will complete quickly in Amazon Redshift while the model training is happening in the background using SageMaker. You can check the status of the model by running a show model command:

show model frauddetection_xgboost;

The output of the show model command shows that the model state is TRAINING. It also shows other information such as the model type and the training job name that SageMaker assigned.

After a few minutes, we run the show model command again:

show model frauddetection_xgboost;

Now the output shows the model state is READY. We can also see the train:error score here; a value of 0 indicates that the model fits the training data well. Now that the model is trained, we can use it for running inference queries.

Use the offline feature store and local store for inference

We can use the SQL function to apply the ML model to data in queries, reports, and dashboards. Let’s use the function ml_fn_frauddetection_xgboost created by our model against our test dataset, filtering on transactiondt >= 12517618, to predict whether a transaction is fraudulent or not. SageMaker Feature Store can be useful in supplementing data for inference requests.

Run the following query to predict whether transactions are fraudulent or not:

select  "isfraud" as "Actual",
        ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") as "Predicted"
from credit_fraud_detection_v where transactiondt >= 12517618;

For binary and multi-class classification problems, we compute the accuracy as the model metric. Accuracy can be calculated based on the following:

accuracy = (sum (actual == predicted)/total) *100
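
As a quick illustration of this formula, the following plain-Python sketch computes accuracy as a percentage from two equally long lists of labels. The `accuracy` function name and the sample labels are hypothetical and are not taken from the post's dataset.

```python
def accuracy(actual, predicted):
    """Percentage of positions where the predicted label equals the actual label."""
    if len(actual) != len(predicted):
        raise ValueError("actual and predicted must have the same length")
    if not actual:
        raise ValueError("at least one label is required")
    correct = sum(1 for a, p in zip(actual, predicted) if a == p)
    return 100 * correct / len(actual)


# Hypothetical labels: 1 = fraudulent, 0 = legitimate.
actual = [0, 0, 1, 0, 1]
predicted = [0, 0, 1, 1, 1]

print(accuracy(actual, predicted))  # 4 of 5 labels match
```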

Let’s apply the preceding formula to our use case to find the accuracy of the model. We use the test data (transactiondt >= 12517618) to test the accuracy, and use the newly created function ml_fn_frauddetection_xgboost to predict, passing every column other than the target (isfraud) as input:

-- check accuracy 
WITH infer_data AS (
SELECT "isfraud" AS label,
ml_fn_frauddetection_xgboost(
        "transactiondt",
        "transactionamt",
        "card1","card2","card3","card5",
        "card_type_credit",
        "card_type_debit",
        "card_bank_american_express",
        "card_bank_discover",
        "card_bank_mastercard",
        "card_bank_visa",
        "id_01","id_02","id_03","id_04","id_05") AS predicted,
CASE 
   WHEN label IS NULL
       THEN 0
   ELSE label
   END AS actual,
CASE 
   WHEN actual = predicted
       THEN 1::INT
   ELSE 0::INT
   END AS correct
FROM credit_fraud_detection_v where transactiondt >= 12517618),
aggr_data AS (
SELECT SUM(correct) AS num_correct,
COUNT(*) AS total
FROM infer_data) 

SELECT (num_correct::FLOAT / total::FLOAT) AS accuracy FROM aggr_data;

Clean up

As a final step, clean up the resources:

  1. Delete the Redshift cluster.
  2. Run the Cleanup Resources section of your notebook.

Conclusion

Redshift ML enables you to bring machine learning to your data, powering fast and informed decision-making. SageMaker Feature Store provides a purpose-built feature management solution to help organizations scale ML development across business units and data science teams.

In this post, we showed how you can train an XGBoost model using Redshift ML with data spread across SageMaker Feature Store and a Redshift table. Additionally, we showed how you can make inferences on a trained model to detect fraud using Amazon Redshift SQL commands.


About the authors

Anirban Sinha is a Senior Technical Account Manager at AWS. He is passionate about building scalable data warehouses and big data solutions, working closely with customers. He works with large ISV customers, helping them build and operate secure, resilient, scalable, and high-performance SaaS applications in the cloud.

Phil Bates is a Senior Analytics Specialist Solutions Architect at AWS. He has more than 25 years of experience implementing large-scale data warehouse solutions. He is passionate about helping customers through their cloud journey and using the power of ML within their data warehouse.

Gaurav Singh is a Senior Solutions Architect at AWS, specializing in AI/ML and Generative AI. Based in Pune, India, he focuses on helping customers build, deploy, and migrate ML production workloads to SageMaker at scale. In his spare time, Gaurav loves to explore nature, read, and run.

Unstructured data management and governance using AWS AI/ML and analytics services

Post Syndicated from Sakti Mishra original https://aws.amazon.com/blogs/big-data/unstructured-data-management-and-governance-using-aws-ai-ml-and-analytics-services/

Unstructured data is information that doesn’t conform to a predefined schema or isn’t organized according to a preset data model. Unstructured information may have a little or a lot of structure but in ways that are unexpected or inconsistent. Text, images, audio, and videos are common examples of unstructured data. Most companies produce and consume unstructured data such as documents, emails, web pages, engagement center phone calls, and social media. By some estimates, unstructured data can make up to 80–90% of all new enterprise data and is growing many times faster than structured data. After decades of digitizing everything in your enterprise, you may have an enormous amount of data, but with dormant value. However, with the help of AI and machine learning (ML), new software tools are now available to unearth the value of unstructured data.

In this post, we discuss how AWS can help you successfully address the challenges of extracting insights from unstructured data. We discuss various design patterns and architectures for extracting and cataloging valuable insights from unstructured data using AWS. Additionally, we show how to use AWS AI/ML services for analyzing unstructured data.

Why it’s challenging to process and manage unstructured data

Unstructured data makes up a large proportion of the data in the enterprise that can’t be stored in a traditional relational database management system (RDBMS). Understanding the data, categorizing it, storing it, and extracting insights from it can be challenging. In addition, identifying incremental changes requires specialized patterns, and detecting sensitive data and meeting compliance requirements calls for sophisticated functions. It can be difficult to integrate unstructured data with structured data from existing information systems. Some view structured and unstructured data as apples and oranges, instead of being complementary. But most important of all, the assumed dormant value in the unstructured data is a question mark, which can only be answered after these sophisticated techniques have been applied. Therefore, there is a need to analyze and extract value from the data economically and flexibly.

Solution overview

Data and metadata discovery is one of the primary requirements in data analytics, where data consumers explore what data is available and in what format, and then consume or query it for analysis. If you can apply a schema on top of the dataset, then it’s straightforward to query because you can load the data into a database or impose a virtual table schema for querying. But in the case of unstructured data, metadata discovery is challenging because the raw data isn’t easily readable.

You can integrate different technologies or tools to build a solution. In this post, we explain how to integrate different AWS services to provide an end-to-end solution that includes data extraction, management, and governance.

The solution integrates data in three tiers. The first is the raw input data that gets ingested by source systems, the second is the output data that gets extracted from input data using AI, and the third is the metadata layer that maintains a relationship between them for data discovery.

The following is a high-level architecture of the solution we can build to process the unstructured data, assuming the input data is being ingested to the raw input object store.

Unstructured Data Management - Block Level Architecture Diagram

The steps of the workflow are as follows:

  1. Integrated AI services extract data from the unstructured data.
  2. These services write the output to a data lake.
  3. A metadata layer helps build the relationship between the raw data and AI extracted output. When the data and metadata are available for end-users, we can break the user access pattern into additional steps.
  4. In the metadata catalog discovery step, we can use query engines to access the metadata for discovery and apply filters as per our analytics needs. Then we move to the next stage of accessing the actual data extracted from the raw unstructured data.
  5. The end-user accesses the output of the AI services and uses the query engines to query the structured data available in the data lake. We can optionally integrate additional tools that help control access and provide governance.
  6. There might be scenarios where, after accessing the AI extracted output, the end-user wants to access the original raw object (such as media files) for further analysis. Additionally, we need to make sure we have access control policies so the end-user has access only to the respective raw data they want to access.

Now that we understand the high-level architecture, let’s discuss what AWS services we can integrate in each step of the architecture to provide an end-to-end solution.

The following diagram is the enhanced version of our solution architecture, where we have integrated AWS services.

Unstructured Data Management - AWS Native Architecture

Let’s understand how these AWS services are integrated in detail. We have divided the steps into two broad user flows: data processing and metadata enrichment (Steps 1–3) and end-users accessing the data and metadata with fine-grained access control (Steps 4–6).

  1. Various AI services (which we discuss in the next section) extract data from the unstructured datasets.
  2. The output is written to an Amazon Simple Storage Service (Amazon S3) bucket (labeled Extracted JSON in the preceding diagram). Optionally, we can restructure the input raw objects for better partitioning, which can help while implementing fine-grained access control on the raw input data (labeled as the Partitioned bucket in the diagram).
  3. After the initial data extraction phase, we can apply additional transformations to enrich the datasets using AWS Glue. We also build an additional metadata layer, which maintains a relationship between the raw S3 object path, the AI extracted output path, the optional enriched version S3 path, and any other metadata that will help the end-user discover the data.
  4. In the metadata catalog discovery step, we use the AWS Glue Data Catalog as the technical catalog, Amazon Athena and Amazon Redshift Spectrum as query engines, AWS Lake Formation for fine-grained access control, and Amazon DataZone for additional governance.
  5. The AI extracted output is expected to be available as a delimited file or in JSON format. We can create an AWS Glue Data Catalog table for querying using Athena or Redshift Spectrum. Like the previous step, we can use Lake Formation policies for fine-grained access control.
  6. Lastly, the end-user accesses the raw unstructured data available in Amazon S3 for further analysis. We have proposed integrating Amazon S3 Access Points for access control at this layer. We explain this in detail later in this post.
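
The metadata layer described in Step 3 can be pictured as one record per raw object that links the raw path, the AI extracted output path, and the optional enriched path. The following is a minimal sketch of such a record in plain Python; the `MetadataRecord` class, its field names, the bucket names, and the `find_by_attribute` helper are all hypothetical and only illustrate the relationship, not a schema prescribed by this post.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MetadataRecord:
    """Links a raw object to the outputs derived from it."""
    raw_object_path: str
    extracted_output_path: str
    enriched_output_path: Optional[str] = None
    attributes: Dict[str, str] = field(default_factory=dict)


def find_by_attribute(records: List[MetadataRecord], key: str, value: str) -> List[MetadataRecord]:
    """Discovery step: filter the metadata layer before touching any data."""
    return [record for record in records if record.attributes.get(key) == value]


# Hypothetical catalog entries with placeholder bucket names.
catalog = [
    MetadataRecord(
        raw_object_path="s3://example-raw-bucket/calls/call-001.wav",
        extracted_output_path="s3://example-extracted-bucket/calls/call-001.json",
        attributes={"file_type": "audio", "service": "transcribe"},
    ),
    MetadataRecord(
        raw_object_path="s3://example-raw-bucket/invoices/inv-042.pdf",
        extracted_output_path="s3://example-extracted-bucket/invoices/inv-042.json",
        enriched_output_path="s3://example-enriched-bucket/invoices/inv-042.parquet",
        attributes={"file_type": "document", "service": "textract"},
    ),
]

for record in find_by_attribute(catalog, "file_type", "audio"):
    print(record.raw_object_path, "->", record.extracted_output_path)
```

In the architecture above, records like these would live in a table registered in the AWS Glue Data Catalog so that Athena or Redshift Spectrum can query them.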

Now let’s expand the following parts of the architecture to understand the implementation better:

  • Using AWS AI services to process unstructured data
  • Using S3 Access Points to integrate access control on raw S3 unstructured data

Process unstructured data with AWS AI services

As we discussed earlier, unstructured data can come in a variety of formats, such as text, audio, video, and images, and each type of data requires a different approach for extracting metadata. AWS AI services are designed to extract metadata from different types of unstructured data. The following are the most commonly used services for unstructured data processing:

  • Amazon Comprehend – This natural language processing (NLP) service uses ML to extract metadata from text data. It can analyze text in multiple languages, detect entities, extract key phrases, determine sentiment, and more. With Amazon Comprehend, you can easily gain insights from large volumes of text data such as extracting product entity, customer name, and sentiment from social media posts.
  • Amazon Transcribe – This speech-to-text service uses ML to convert speech to text and extract metadata from audio data. It can recognize multiple speakers, transcribe conversations, identify keywords, and more. With Amazon Transcribe, you can convert unstructured data such as customer support recordings into text and further derive insights from it.
  • Amazon Rekognition – This image and video analysis service uses ML to extract metadata from visual data. It can recognize objects, people, faces, and text, detect inappropriate content, and more. With Amazon Rekognition, you can easily analyze images and videos to gain insights such as identifying entity type (human or other) and identifying if the person is a known celebrity in an image.
  • Amazon Textract – You can use this ML service to extract metadata from scanned documents and images. It can extract text, tables, and forms from images, PDFs, and scanned documents. With Amazon Textract, you can digitize documents and extract data such as customer name, product name, product price, and date from an invoice.
  • Amazon SageMaker – This service enables you to build and deploy custom ML models for a wide range of use cases, including extracting metadata from unstructured data. With SageMaker, you can build custom models that are tailored to your specific needs, which can be particularly useful for extracting metadata from unstructured data that requires a high degree of accuracy or domain-specific knowledge.
  • Amazon Bedrock – This fully managed service offers a choice of high-performing foundation models (FMs) from leading AI companies like AI21 Labs, Anthropic, Cohere, Meta, Stability AI, and Amazon with a single API. It also offers a broad set of capabilities to build generative AI applications, simplifying development while maintaining privacy and security.

With these specialized AI services, you can efficiently extract metadata from unstructured data and use it for further analysis and insights. It’s important to note that each service has its own strengths and limitations, and choosing the right service for your specific use case is critical for achieving accurate and reliable results.

AWS AI services are available via various APIs, which enables you to integrate AI capabilities into your applications and workflows. AWS Step Functions is a serverless workflow service that allows you to coordinate and orchestrate multiple AWS services, including AI services, into a single workflow. This can be particularly useful when you need to process large amounts of unstructured data and perform multiple AI-related tasks, such as text analysis, image recognition, and NLP.

With Step Functions and AWS Lambda functions, you can create sophisticated workflows that include AI services and other AWS services. For instance, you can use Amazon S3 to store input data, invoke a Lambda function to trigger an Amazon Transcribe job to transcribe an audio file, and use the output to trigger an Amazon Comprehend analysis job to generate sentiment metadata for the transcribed text. This enables you to create complex, multi-step workflows that are straightforward to manage, scalable, and cost-effective.
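
As a rough illustration of the transcribe-then-analyze example, the sketch below builds a two-step state machine definition in Amazon States Language as a Python dictionary. It assumes two Lambda functions already exist, one that runs the Amazon Transcribe job and one that calls Amazon Comprehend; the function ARNs, state names, and the `build_state_machine` helper are hypothetical placeholders, and a production workflow would also need job-status polling and error handling.

```python
import json


def build_state_machine(transcribe_fn_arn: str, comprehend_fn_arn: str) -> dict:
    """Two Lambda-backed tasks chained in sequence: transcribe, then analyze sentiment."""
    def lambda_task(function_arn: str) -> dict:
        return {
            "Type": "Task",
            "Resource": "arn:aws:states:::lambda:invoke",
            "Parameters": {"FunctionName": function_arn, "Payload.$": "$"},
            "OutputPath": "$.Payload",  # pass only the Lambda result to the next state
        }

    transcribe = lambda_task(transcribe_fn_arn)
    transcribe["Next"] = "AnalyzeSentiment"
    sentiment = lambda_task(comprehend_fn_arn)
    sentiment["End"] = True

    return {
        "Comment": "Sketch: transcribe an audio file, then derive sentiment metadata",
        "StartAt": "TranscribeAudio",
        "States": {"TranscribeAudio": transcribe, "AnalyzeSentiment": sentiment},
    }


# Placeholder ARNs; replace with your own Lambda functions.
definition = build_state_machine(
    "arn:aws:lambda:us-east-1:111122223333:function:example-transcribe",
    "arn:aws:lambda:us-east-1:111122223333:function:example-comprehend",
)
print(json.dumps(definition, indent=2))
```

The resulting JSON string is the kind of document you would supply as the state machine definition when creating the workflow.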

The following is an example architecture that shows how Step Functions can help invoke AWS AI services using Lambda functions.

AWS AI Services - Lambda Event Workflow -Unstructured Data

The workflow steps are as follows:

  1. Unstructured data, such as text files, audio files, and video files, are ingested into the S3 raw bucket.
  2. A Lambda function is triggered to read the data from the S3 bucket and call Step Functions to orchestrate the workflow required to extract the metadata.
  3. The Step Functions workflow checks the type of file, calls the corresponding AWS AI service APIs, checks the job status, and performs any postprocessing required on the output.
  4. AWS AI services can be accessed via APIs and invoked as batch jobs. To extract metadata from different types of unstructured data, you can use multiple AI services in sequence, with each service processing the corresponding file type.
  5. After the Step Functions workflow completes the metadata extraction process and performs any required postprocessing, the resulting output is stored in an S3 bucket for cataloging.
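
Step 3 of this workflow, checking the file type and choosing the matching AI service, can be sketched as a simple lookup. The extension-to-service mapping and the `route_object` helper below are illustrative assumptions rather than a complete or authoritative list of what each service accepts.

```python
import os

# Hypothetical routing table: file extension -> AI service that handles it.
SERVICE_BY_EXTENSION = {
    ".txt": "comprehend",
    ".wav": "transcribe",
    ".mp3": "transcribe",
    ".jpg": "rekognition",
    ".png": "rekognition",
    ".pdf": "textract",
}


def route_object(key: str) -> str:
    """Pick the service for an S3 object key based on its file extension."""
    _, extension = os.path.splitext(key.lower())
    return SERVICE_BY_EXTENSION.get(extension, "unsupported")


for key in ["calls/call-001.WAV", "invoices/inv-042.pdf", "notes/readme.md"]:
    print(key, "->", route_object(key))
```

Returning an explicit "unsupported" value lets the workflow send unknown file types to a separate branch instead of failing.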

Next, let’s understand how we can implement security or access control on both the extracted output and the raw input objects.

Implement access control on raw and processed data in Amazon S3

We need to consider access controls for three types of data when managing unstructured data: the AI-extracted semi-structured output, the metadata, and the raw unstructured original files. The AI-extracted output is in JSON format, and access to it can be restricted via Lake Formation and Amazon DataZone. We recommend keeping the metadata (information that captures which unstructured datasets are already processed by the pipeline and available for analysis) open to your organization, which will enable metadata discovery across the organization.

To control access of raw unstructured data, you can integrate S3 Access Points and explore additional support in the future as AWS services evolve. S3 Access Points simplify data access for any AWS service or customer application that stores data in Amazon S3. Access points are named network endpoints that are attached to buckets that you can use to perform S3 object operations. Each access point has distinct permissions and network controls that Amazon S3 applies for any request that is made through that access point. Each access point enforces a customized access point policy that works in conjunction with the bucket policy that is attached to the underlying bucket. With S3 Access Points, you can create unique access control policies for each access point to easily control access to specific datasets within an S3 bucket. This works well in multi-tenant or shared bucket scenarios where users or teams are assigned to unique prefixes within one S3 bucket.

An access point can support a single user or application, or groups of users or applications within and across accounts, allowing separate management of each access point. Every access point is associated with a single bucket and contains a network origin control and a Block Public Access control. For example, you can create an access point with a network origin control that only permits storage access from your virtual private cloud (VPC), a logically isolated section of the AWS Cloud. You can also create an access point with the access point policy configured to only allow access to objects with a defined prefix or to objects with specific tags. You can also configure custom Block Public Access settings for each access point.
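
To illustrate a prefix-scoped access point policy, the sketch below assembles a policy document as a Python dictionary that allows one IAM role to read objects under a single prefix through one access point. The Region, account ID, access point name, role name, prefix, and the `prefix_policy` helper are hypothetical placeholders; treat it as a starting point and check it against your own security requirements.

```python
import json


def prefix_policy(region: str, account_id: str, access_point: str,
                  role_name: str, prefix: str) -> dict:
    """Allow one IAM role to read objects under one prefix through one access point."""
    access_point_arn = f"arn:aws:s3:{region}:{account_id}:accesspoint/{access_point}"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": f"arn:aws:iam::{account_id}:role/{role_name}"},
                "Action": ["s3:GetObject"],
                # Objects reached through an access point are addressed as <arn>/object/<key>.
                "Resource": f"{access_point_arn}/object/{prefix.strip('/')}/*",
            }
        ],
    }


# Placeholder values for illustration only.
policy = prefix_policy("us-east-1", "111122223333", "example-raw-media-ap",
                       "example-analyst-role", "team-a/")
print(json.dumps(policy, indent=2))
```

Creating one access point per team or prefix keeps each policy short and lets you manage them independently of the bucket policy.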

The following architecture provides an overview of how an end-user can get access to specific S3 objects by assuming a specific AWS Identity and Access Management (IAM) role. If you have a large number of S3 objects to control access, consider grouping the S3 objects, assigning them tags, and then defining access control by tags.

S3 Access Points - Unstructured Data Management - Access Control

If you are implementing a solution that integrates S3 data available in multiple AWS accounts, you can take advantage of cross-account support for S3 Access Points.

Conclusion

This post explained how you can use AWS AI services to extract readable data from unstructured datasets, build a metadata layer on top of them to allow data discovery, and build an access control mechanism on top of the raw S3 objects and extracted data using Lake Formation, Amazon DataZone, and S3 Access Points.

In addition to AWS AI services, you can also integrate large language models with vector databases to enable semantic or similarity search on top of unstructured datasets. To learn more about how to enable semantic search on unstructured data by integrating Amazon OpenSearch Service as a vector database, refer to Try semantic search with the Amazon OpenSearch Service vector engine.

As of writing this post, S3 Access Points is one of the best solutions to implement access control on raw S3 objects using tagging, but as AWS service features evolve in the future, you can explore alternative options as well.


About the Authors

Sakti Mishra is a Principal Solutions Architect at AWS, where he helps customers modernize their data architecture and define their end-to-end data strategy, including data security, accessibility, governance, and more. He is also the author of the book Simplify Big Data Analytics with Amazon EMR. Outside of work, Sakti enjoys learning new technologies, watching movies, and visiting places with family.

Bhavana Chirumamilla is a Senior Resident Architect at AWS with a strong passion for data and machine learning operations. She brings a wealth of experience and enthusiasm to help enterprises build effective data and ML strategies. In her spare time, Bhavana enjoys spending time with her family and engaging in various activities such as traveling, hiking, gardening, and watching documentaries.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and trade-offs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family—usually on tennis courts.

Daniel Bruno is a Principal Resident Architect at AWS. He has been building analytics and machine learning solutions for over 20 years and splits his time between helping customers build data science programs and designing impactful ML products.

Run Spark SQL on Amazon Athena Spark

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/run-spark-sql-on-amazon-athena-spark/

At AWS re:Invent 2022, Amazon Athena launched support for Apache Spark. With this launch, Amazon Athena supports two open-source query engines: Apache Spark and Trino. Athena Spark allows you to build Apache Spark applications using a simplified notebook experience on the Athena console or through Athena APIs. Athena Spark notebooks support PySpark and notebook magics to allow you to work with Spark SQL. For interactive applications, Athena Spark allows you to spend less time waiting and be more productive, with application startup time in under a second. And because Athena is serverless and fully managed, you can run your workloads without worrying about the underlying infrastructure.

Modern applications store massive amounts of data on Amazon Simple Storage Service (Amazon S3) data lakes, providing cost-effective and highly durable storage, and allowing you to run analytics and machine learning (ML) from your data lake to generate insights on your data. Before you run these workloads, most customers run SQL queries to interactively extract, filter, join, and aggregate data into a shape that can be used for decision-making, model training, or inference. Running SQL on data lakes is fast, and Athena provides an optimized, Trino- and Presto-compatible API that includes a powerful optimizer. In addition, organizations across multiple industries such as financial services, healthcare, and retail are adopting Apache Spark, a popular open-source, distributed processing system that is optimized for fast analytics and advanced transformations against data of any size. With support in Athena for Apache Spark, you can use both Spark SQL and PySpark in a single notebook to generate application insights or build models. Start with Spark SQL to extract, filter, and project attributes that you want to work with. Then, to perform more complex data analysis such as regression tests and time series forecasting, you can use Apache Spark with Python, which allows you to take advantage of a rich ecosystem of libraries, including data visualization in Matplotlib, Seaborn, and Plotly.

In this first post of a three-part series, we show you how to get started using Spark SQL in Athena notebooks. We demonstrate querying databases and tables in Amazon S3 and the AWS Glue Data Catalog using Spark SQL in Athena. We cover some common and advanced SQL commands used in Spark SQL, and show you how to use Python to extend your functionality with user-defined functions (UDFs) as well as to visualize queried data. In the next post, we’ll show you how to use Athena Spark with open-source transactional table formats. In the third post, we’ll cover analyzing data sources other than Amazon S3 using Athena Spark.

Prerequisites

To get started, you will need the following:

Provide Athena Spark access to your data through an IAM role

As you proceed through this walkthrough, we create new databases and tables. By default, Athena Spark doesn’t have permission to do this. To provide this access, you can add the following inline policy to the AWS Identity and Access Management (IAM) role attached to the workgroup, providing the region and your account number. For more information, refer to the section To embed an inline policy for a user or role (console) in Adding IAM identity permissions (console).

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Sid": "ReadfromPublicS3",
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::athena-examples-us-east-1/*",
              "arn:aws:s3:::athena-examples-us-east-1"
          ]
      },
      {
            "Sid": "GlueReadDatabases",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabases"
            ],
            "Resource": "arn:aws:glue:<region>:<account-id>:*"
        },
        {
            "Sid": "GlueReadDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*",
                "arn:aws:glue:<region>:<account-id>:database/default"
            ]
        },
        {
            "Sid": "GlueCreateDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb"
            ]
        },
        {
            "Sid": "GlueDeleteDatabase",
            "Effect": "Allow",
            "Action": "glue:DeleteDatabase",
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"
            ]
        },
        {
            "Sid": "GlueCreateDeleteTablePartitions",
            "Effect": "Allow",
            "Action": [
                "glue:CreateTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"
            ]
        }
  ]
}
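
The policy above contains <region> and <account-id> placeholders. As a minimal sketch, assuming you keep the policy as a text template, the following Python fills in the placeholders and checks that the result is still valid JSON before you paste it into the IAM console. The function name render_policy, the shortened template, and the example Region and account ID are illustrative assumptions, not part of the original walkthrough.

```python
import json

# Shortened stand-in for the policy above; only one statement is shown.
POLICY_TEMPLATE = """
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "GlueReadDatabases",
      "Effect": "Allow",
      "Action": ["glue:GetDatabases"],
      "Resource": "arn:aws:glue:<region>:<account-id>:*"
    }
  ]
}
"""


def render_policy(template: str, region: str, account_id: str) -> dict:
    """Substitute the placeholders and parse the result as JSON."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("AWS account IDs are 12-digit numbers")
    rendered = template.replace("<region>", region).replace("<account-id>", account_id)
    # json.loads raises an error if the edited policy is no longer valid JSON.
    return json.loads(rendered)


# Example values only; use your own Region and account ID.
policy = render_policy(POLICY_TEMPLATE, "us-east-1", "111122223333")
print(json.dumps(policy, indent=2))
```

Parsing the rendered text catches a stray comma or bracket early, which is easy to introduce when editing a long policy by hand.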

Run SQL queries directly in notebook without using Python

When using Athena Spark notebooks, we can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook that change the cells’ behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena Spark.

Now that we have our workgroup and notebook created, let’s start exploring the NOAA Global Surface Summary of Day dataset, which provides environmental measures from various locations all over the earth. The datasets used in this post are public datasets hosted in the following Amazon S3 locations:

  • Parquet data for year 2020: s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/
  • Parquet data for year 2021: s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2021/
  • Parquet data for year 2022: s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2022/

To use this data, we need an AWS Glue Data Catalog database that acts as the metastore for Athena, allowing us to create external tables that point to the location of datasets in Amazon S3. First, we create a database in the Data Catalog using Athena and Spark.

Create a database

Run the following SQL in your notebook using the %%sql magic:

%%sql 
CREATE DATABASE sparkblogdb

You get the following output:
Output of CREATE DATABASE SQL

Create a table

Now that we have created a database in the Data Catalog, we can create a partitioned table that points to our dataset stored in Amazon S3:

%%sql
CREATE EXTERNAL TABLE sparkblogdb.noaa_pq(
  station string, 
  date string, 
  latitude string, 
  longitude string, 
  elevation string, 
  name string, 
  temp string, 
  temp_attributes string, 
  dewp string, 
  dewp_attributes string, 
  slp string, 
  slp_attributes string, 
  stp string, 
  stp_attributes string, 
  visib string, 
  visib_attributes string, 
  wdsp string, 
  wdsp_attributes string, 
  mxspd string, 
  gust string, 
  max string, 
  max_attributes string, 
  min string, 
  min_attributes string, 
  prcp string, 
  prcp_attributes string, 
  sndp string, 
  frshtt string)
  PARTITIONED BY (year string)
STORED AS PARQUET
LOCATION 's3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/'

This dataset is partitioned by year, meaning that we store data files for each year separately, which simplifies management and improves performance because we can target the specific S3 locations in a query. The Data Catalog knows about the table, and now we’ll let it work out how many partitions we have automatically by using the MSCK utility:

%%sql
MSCK REPAIR TABLE sparkblogdb.noaa_pq

When the preceding statement is complete, you can run the following command to list the yearly partitions that were found in the table:

%%sql
SHOW PARTITIONS sparkblogdb.noaa_pq

Output of SHOW PARTITIONS SQL
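
MSCK REPAIR TABLE discovers partitions by scanning the table location for Hive-style key=value folder names such as year=2021. The following sketch, in plain Python with hypothetical object keys (the file names are made up), illustrates the idea of deriving partition values from such paths. It is an illustration of the naming convention only, not how Athena or Spark implements the command.

```python
def partition_values(key: str, table_prefix: str) -> dict:
    """Return Hive-style partition columns found in an object key."""
    relative = key[len(table_prefix):] if key.startswith(table_prefix) else key
    values = {}
    # Every folder segment except the final file name may be a key=value pair.
    for segment in relative.split("/")[:-1]:
        if "=" in segment:
            column, value = segment.split("=", 1)
            values[column] = value
    return values


prefix = "athenasparksqlblog/noaa_pq/"
keys = [
    prefix + "year=2021/part-0000.parquet",  # hypothetical file names
    prefix + "year=2022/part-0000.parquet",
]
# Deduplicate so each partition is listed once, however many files it holds.
partitions = sorted({tuple(partition_values(k, prefix).items()) for k in keys})
print(partitions)
```

This also shows why the 2020 data at .../parquet/2020/ is not picked up as a partition of this table: it sits under a different prefix, and its folder name is not in key=value form.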

Now that we have the table created and partitions added, let’s run a query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select year, min(MIN) as minimum_temperature 
from sparkblogdb.noaa_pq 
where name = 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1

You get the following output:

The image shows output of previous SQL statement.
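
One caveat worth checking: the table definition above declares every column, including min, as string. Assuming Spark SQL compares string values character by character, min() over a string column may not match the numeric minimum. The Python sketch below reproduces the effect with made-up readings. If you see it in your own results, casting in the query, for example min(cast(min as double)), is one way to compare numerically.

```python
readings = ["9.5", "10.2", "41.0"]  # made-up temperature strings

# Strings compare character by character, so "10.2" sorts before "9.5".
lexicographic_min = min(readings)

# Converting to numbers first gives the numeric minimum.
numeric_min = min(float(r) for r in readings)

print(lexicographic_min, numeric_min)
```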

Query a cross-account Data Catalog from Athena Spark

Athena supports accessing cross-account AWS Glue Data Catalogs, which enables you to use Spark SQL in Athena Spark to query a Data Catalog in an authorized AWS account.

The cross-account Data Catalog access pattern is often used in a data mesh architecture, when a data producer wants to share a catalog and data with consumer accounts. The consumer accounts can then perform data analysis and explorations on the shared data. This is a simplified model where we don’t need to use AWS Lake Formation data sharing. The following diagram gives an overview of how the setup works between one producer and one consumer account, which can be extended to multiple producer and consumer accounts.

The image gives an overview of how the setup works between one producer and one consumer account, which can be extended to multiple producer and consumer accounts.

You need to set up the right access policies on the Data Catalog of the producer account to enable cross-account access. Specifically, you must make sure the consumer account’s IAM role used to run Spark calculations on Athena has access to the cross-account Data Catalog and data in Amazon S3. For setup instructions, refer to Configuring cross-account AWS Glue access in Athena for Spark.

There are two ways the consumer account can access the cross-account Data Catalog from Athena Spark, depending on whether you are querying from one producer account or multiple.

Query a single producer table

If you are just querying data from a single producer’s AWS account, you can tell Athena Spark to only use that account’s catalog to resolve database objects. When using this option, you don’t have to modify the SQL because you’re configuring the AWS account ID at session level. To enable this method, edit the session and set the property "spark.hadoop.hive.metastore.glue.catalogid": "999999999999" using the following steps:

  1. In the notebook editor, on the Session menu, choose Edit session.
    Image shows where to click to edit session
  2. Choose Edit in JSON.
  3. Add the following property and choose Save:
    {"spark.hadoop.hive.metastore.glue.catalogid": "999999999999"}
    The image shows where to put JSON config property to query single producer
    This will start a new session with the updated parameters.
  4. Run the following SQL statement in Spark to query tables from the producer account’s catalog:
    %%sql
    SELECT * 
    FROM <central-catalog-db>.<table> 
    LIMIT 10

Query multiple producer tables

Alternatively, you can add the producer AWS account ID in each database name, which is helpful if you’re going to query Data Catalogs from different owners. To enable this method, set the property {"spark.hadoop.aws.glue.catalog.separator": "/"} when invoking or editing the session (using the same steps as the previous section). Then, you add the AWS account ID for the source Data Catalog as part of the database name:

%%sql
SELECT * 
FROM `<producer-account1-id>/database1`.table1 t1 
join `<producer-account2-id>/database2`.table2 t2 
ON t1.id = t2.id
limit 10
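
When many queries reference several producer accounts, typing the backtick-quoted account-ID prefix by hand gets error-prone. A small helper along these lines can assemble the name. The function name, account IDs, and database and table names are hypothetical, and the sketch assumes the "/" separator configured above.

```python
def qualified_table(account_id: str, database: str, table: str, separator: str = "/") -> str:
    """Build `account<sep>database`.table for a cross-account catalog query."""
    if not (account_id.isdigit() and len(account_id) == 12):
        raise ValueError("expected a 12-digit AWS account ID")
    return f"`{account_id}{separator}{database}`.{table}"


# Example account IDs only.
t1 = qualified_table("111122223333", "database1", "table1")
t2 = qualified_table("444455556666", "database2", "table2")
query = f"SELECT * FROM {t1} t1 JOIN {t2} t2 ON t1.id = t2.id LIMIT 10"
print(query)
```

The resulting string can be passed to spark.sql in a PySpark cell instead of being typed into a %%sql cell.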

If the S3 bucket belonging to the producer AWS account is configured with Requester Pays enabled, the consumer is charged instead of the bucket owner for requests and downloads. In this case, you can add the following property when invoking or editing an Athena Spark session to read data from these buckets:

{"spark.hadoop.fs.s3.useRequesterPaysHeader": "true"}
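
The session properties described in this section can be collected into one JSON document for the Edit in JSON dialog. The sketch below assembles it in Python. The helper name and its parameters are illustrative, and only the property keys and values come from the text above.

```python
import json
from typing import Optional


def session_properties(producer_account_id: Optional[str] = None,
                       multi_producer: bool = False,
                       requester_pays: bool = False) -> dict:
    """Collect the Spark properties discussed above into one dictionary."""
    props = {}
    if producer_account_id is not None:
        # Resolve all database objects against a single producer catalog.
        props["spark.hadoop.hive.metastore.glue.catalogid"] = producer_account_id
    if multi_producer:
        # Allow <account-id>/<database> names in SQL.
        props["spark.hadoop.aws.glue.catalog.separator"] = "/"
    if requester_pays:
        # Read from buckets that have Requester Pays enabled.
        props["spark.hadoop.fs.s3.useRequesterPaysHeader"] = "true"
    return props


print(json.dumps(session_properties(multi_producer=True, requester_pays=True), indent=2))
```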

Infer the schema of your data in Amazon S3 and join with tables crawled in the Data Catalog

Rather than only being able to go through the Data Catalog to understand the table structure, Spark can infer schema and read data directly from storage. This feature allows data analysts and data scientists to explore data quickly without needing to create a database or table, and the data can still be combined with existing tables stored in the Data Catalog in the same account or in different accounts. To do this, we use a Spark temp view, which is an in-memory data structure that stores the schema of data stored in a data frame.

Using the NOAA dataset partition for 2020, we create a temporary view by reading S3 data into a data frame:

year_20_pq = spark.read.parquet("s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/")
year_20_pq.createOrReplaceTempView("y20view")

Now you can query y20view using Spark SQL as if it were a Data Catalog table:

%%sql
select count(*) 
from y20view

Output of previous SQL query showing count value

You can query data from both temporary views and Data Catalog tables in the same query in Spark. For example, now that we have a table containing data for years 2021 and 2022, and a temporary view with 2020’s data, we can find the dates in each year when the maximum temperature was recorded for 'SEATTLE TACOMA AIRPORT, WA US'.

To do this, we can use a window function and UNION ALL:

%%sql
SELECT date,
       max as maximum_temperature
FROM (
        SELECT date,
            max,
            RANK() OVER (
                PARTITION BY year
                ORDER BY max DESC
            ) rnk
        FROM sparkblogdb.noaa_pq
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
          AND year IN ('2021', '2022')
        UNION ALL
        SELECT date,
            max,
            RANK() OVER (
                ORDER BY max DESC
            ) rnk
        FROM y20view
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
    ) t
WHERE rnk = 1
ORDER by 1

You get the following output:

Output of previous SQL
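
RANK() assigns the same rank to tied values, so filtering on rnk = 1 returns every date that shares a year's maximum, not just one row per year. The plain-Python sketch below mimics RANK() OVER (PARTITION BY year ORDER BY max DESC) on a few made-up rows to show that behavior. It illustrates the semantics only and is not how Spark evaluates the query.

```python
from collections import defaultdict

rows = [  # (year, date, max) with made-up values
    ("2021", "2021-06-27", 102.0),
    ("2021", "2021-06-28", 107.1),
    ("2022", "2022-07-30", 95.0),
    ("2022", "2022-07-31", 95.0),  # tie with the previous row
]


def rank_desc(rows):
    """Yield (year, date, max, rank) using SQL RANK() semantics per year."""
    by_year = defaultdict(list)
    for row in rows:
        by_year[row[0]].append(row)
    for year in sorted(by_year):
        ordered = sorted(by_year[year], key=lambda r: r[2], reverse=True)
        for position, row in enumerate(ordered, start=1):
            # Tied values share the rank of the first row in the tie.
            if position > 1 and row[2] == ordered[position - 2][2]:
                rank = previous_rank
            else:
                rank = position
            previous_rank = rank
            yield (*row, rank)


top = [(date, value) for _, date, value, rank in rank_desc(rows) if rank == 1]
print(top)
```

With these rows, 2021 contributes one date and 2022 contributes two, because both 2022 readings tie for the maximum.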

Extend your SQL with a UDF in Spark SQL

You can extend your SQL functionality by registering and using a custom user-defined function in Athena Spark. These UDFs are used in addition to the common predefined functions available in Spark SQL, and once created, can be reused many times within a given session.

In this section, we walk through a straightforward UDF that converts a numeric month value into the full month name. You have the option to write the UDF in either Java or Python.

Java-based UDF

The Java code for the UDF can be found in the GitHub repository. For this post, we have uploaded a prebuilt JAR of the UDF to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar.

To register the UDF, we use Spark SQL to create a temporary function:

%%sql
CREATE OR REPLACE TEMPORARY FUNCTION 
month_number_to_name as 'com.example.MonthNumbertoNameUDF'
using jar "s3a://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar";

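Now that the UDF is registered, we can call it in a query to find the minimum recorded temperature for each month. As written, the query has no year filter, so it covers every year in the table; add a predicate such as year = '2022' to restrict it to 2022: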

%%sql
select month_number_to_name(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

You get the following output:

Output of SQL using UDF

Python-based UDF

Now let’s see how to add a Python UDF to the existing Spark session. The Python code for the UDF can be found in the GitHub repository. For this post, the code has been uploaded to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py.

Python UDFs can’t be registered in Spark SQL, so instead we use a small bit of PySpark code to add the Python file, import the function, and then register it as a UDF:

sc.addPyFile('s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py')

from month_number_to_name import month_number_to_name
spark.udf.register("month_number_to_name_py",month_number_to_name)
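
The contents of month_number_to_name.py are not reproduced here. As a rough sketch of what such a function might look like (an assumption, not the published file), the following maps a month number to its English name and returns None for missing or out-of-range input:

```python
from typing import Optional

MONTH_NAMES = (
    "January", "February", "March", "April", "May", "June",
    "July", "August", "September", "October", "November", "December",
)


def month_number_to_name(month_number: Optional[int]) -> Optional[str]:
    """Hypothetical UDF body: 1 -> 'January', ..., 12 -> 'December'."""
    if month_number is None or not 1 <= month_number <= 12:
        return None  # becomes SQL NULL for missing or invalid months
    return MONTH_NAMES[month_number - 1]


print(month_number_to_name(3))
```

Handling None explicitly matters because to_date returns NULL for dates it can't parse, and that NULL is passed through month() into the UDF.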

Now that the Python-based UDF is registered, we can use the same query from earlier to find the minimum recorded temperature for each month. The fact that it’s Python rather than Java doesn’t matter now:

%%sql
select month_number_to_name_py(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

The output should be similar to that in the preceding section.

Plot visuals from the SQL queries

It’s straightforward to use Spark SQL, including across AWS accounts for data exploration, and not complicated to extend Athena Spark with UDFs. Now let’s see how we can go beyond SQL using Python to visualize data within the same Spark session to look for patterns in the data. We use the table and temporary views created previously to generate a pie chart that shows percentage of readings taken in each year for the station 'SEATTLE TACOMA AIRPORT, WA US'.

Let’s start by creating a Spark data frame from a SQL query and converting it to a pandas data frame:

#we will use spark.sql instead of %%sql magic to enclose the query string
#this will allow us to read the results of the query into a dataframe to use with our plot command
sqlDF = spark.sql("select year, count(*) as cnt from sparkblogdb.noaa_pq where name = 'SEATTLE TACOMA AIRPORT, WA US' group by 1 \
                  union all \
                  select 2020 as year, count(*) as cnt from y20view where name = 'SEATTLE TACOMA AIRPORT, WA US'")

#convert to pandas data frame
seatac_year_counts=sqlDF.toPandas()

Next, the following code uses the pandas data frame and the Matplotlib library to plot a pie chart:

import matplotlib.pyplot as plt

# clear the state of the visualization figure
plt.clf()

# create a pie chart with values from the 'cnt' field, and yearly labels
plt.pie(seatac_year_counts.cnt, labels=seatac_year_counts.year, autopct='%1.1f%%')
%matplot plt

The following figure shows our output.

Output of code showing pie chart
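
The autopct='%1.1f%%' argument tells Matplotlib to label each wedge with its share of the total, formatted to one decimal place. The sketch below reproduces that arithmetic in plain Python with made-up counts, which can be handy for checking the chart against the underlying numbers.

```python
counts = {"2020": 366, "2021": 365, "2022": 365}  # made-up yearly reading counts

total = sum(counts.values())
# Same format string that autopct applies to each wedge's percentage.
labels = {year: "%1.1f%%" % (100 * cnt / total) for year, cnt in counts.items()}
print(labels)
```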

Clean up

To clean up the resources created for this post, complete the following steps:

  1. Run the following SQL statements in the notebook’s cell to delete the database and tables from the Data Catalog:
    %%sql
    DROP TABLE sparkblogdb.noaa_pq
    
    %%sql
    DROP DATABASE sparkblogdb

  2. Delete the workgroup created for this post. This will also delete saved notebooks that are part of the workgroup.
  3. Delete the S3 bucket that you created as part of the workgroup.

Conclusion

Athena Spark makes it easier than ever to query databases and tables in the AWS Glue Data Catalog directly through Spark SQL in Athena, and to query data directly from Amazon S3 without needing a metastore for quick data exploration. It also makes it straightforward to use common and advanced SQL commands used in Spark SQL, including registering UDFs for custom functionality. Additionally, Athena Spark makes it effortless to use Python in a fast start notebook environment to visualize and analyze data queried via Spark SQL.

Overall, Spark SQL unlocks the ability to go beyond standard SQL in Athena, providing advanced users more flexibility and power through both SQL and Python in a single integrated notebook, and providing fast, complex analysis of data in Amazon S3 without infrastructure setup. To learn more about Athena Spark, refer to Amazon Athena for Apache Spark.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing on the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Simplify data transfer: Google BigQuery to Amazon S3 using Amazon AppFlow

Post Syndicated from Kartikay Khator original https://aws.amazon.com/blogs/big-data/simplify-data-transfer-google-bigquery-to-amazon-s3-using-amazon-appflow/

In today’s data-driven world, the ability to effortlessly move and analyze data across diverse platforms is essential. Amazon AppFlow, a fully managed data integration service, has been at the forefront of streamlining data transfer between AWS services, software as a service (SaaS) applications, and now Google BigQuery. In this blog post, you explore the new Google BigQuery connector in Amazon AppFlow and discover how it simplifies the process of transferring data from Google’s data warehouse to Amazon Simple Storage Service (Amazon S3), providing significant benefits for data professionals and organizations, including the democratization of multi-cloud data access.

Overview of Amazon AppFlow

Amazon AppFlow is a fully managed integration service that you can use to securely transfer data between SaaS applications such as Google BigQuery, Salesforce, SAP, Hubspot, and ServiceNow, and AWS services such as Amazon S3 and Amazon Redshift, in just a few clicks. With Amazon AppFlow, you can run data flows at nearly any scale at the frequency you choose—on a schedule, in response to a business event, or on demand. You can configure data transformation capabilities such as filtering and validation to generate rich, ready-to-use data as part of the flow itself, without additional steps. Amazon AppFlow automatically encrypts data in motion, and allows you to restrict data from flowing over the public internet for SaaS applications that are integrated with AWS PrivateLink, reducing exposure to security threats.

Introducing the Google BigQuery connector

The new Google BigQuery connector in Amazon AppFlow unveils possibilities for organizations seeking to use the analytical capability of Google’s data warehouse, and to effortlessly integrate, analyze, store, or further process data from BigQuery, transforming it into actionable insights.

Architecture

Let’s review the architecture to transfer data from Google BigQuery to Amazon S3 using Amazon AppFlow.

architecture

  1. Select a data source: In Amazon AppFlow, select Google BigQuery as your data source. Specify the tables or datasets you want to extract data from.
  2. Field mapping and transformation: Configure the data transfer using the intuitive visual interface of Amazon AppFlow. You can map data fields and apply transformations as needed to align the data with your requirements.
  3. Transfer frequency: Decide how frequently you want to transfer data—such as daily, weekly, or monthly—supporting flexibility and automation.
  4. Destination: Specify an S3 bucket as the destination for your data. Amazon AppFlow will efficiently move the data, making it accessible in your Amazon S3 storage.
  5. Consumption: Use Amazon Athena to analyze the data in Amazon S3.

Prerequisites

The dataset used in this solution is generated by Synthea, a synthetic patient population simulator and open-source project under the Apache License 2.0. Load this data into Google BigQuery or use your existing dataset.

Connect Amazon AppFlow to your Google BigQuery account

For this post, you use a Google account, OAuth client with appropriate permissions, and Google BigQuery data. To enable Google BigQuery access from Amazon AppFlow, you must set up a new OAuth client in advance. For instructions, see Google BigQuery connector for Amazon AppFlow.

Set up Amazon S3

Every object in Amazon S3 is stored in a bucket. Before you can store data in Amazon S3, you must create an S3 bucket to store the results.

Create a new S3 bucket for Amazon AppFlow results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, appflow-bq-sample.
  3. Choose Create bucket.

Create a new S3 bucket for Amazon Athena results

To create an S3 bucket, complete the following steps:

  1. On the AWS Management Console for Amazon S3, choose Create bucket.
  2. Enter a globally unique name for your bucket; for example, athena-results.
  3. Choose Create bucket.

User role (IAM role) for AWS Glue Data Catalog

To catalog the data that you transfer with your flow, you must have the appropriate user role in AWS Identity and Access Management (IAM). You provide this role to Amazon AppFlow to grant the permissions it needs to create an AWS Glue Data Catalog, tables, databases, and partitions.

For an example IAM policy that has the required permissions, see Identity-based policy examples for Amazon AppFlow.

Walkthrough of the design

Now, let’s walk through a practical use case to see how the Amazon AppFlow Google BigQuery to Amazon S3 connector works. For the use case, you will use Amazon AppFlow to archive historical data from Google BigQuery to Amazon S3 for long-term storage and analysis.

Set up Amazon AppFlow

Create a new Amazon AppFlow flow to transfer data from Google BigQuery to Amazon S3.

  1. On the Amazon AppFlow console, choose Create flow.
  2. Enter a name for your flow; for example, my-bq-flow.
  3. Add necessary Tags; for example, for Key enter env and for Value enter dev.

appflow-flow-setup

  1. Choose Next.
  2. For Source name, choose Google BigQuery.
  3. Choose Create new connection.
  4. Enter your OAuth Client ID and Client Secret, then name your connection; for example, bq-connection.

bq-connection

  1. In the pop-up window, choose to allow amazon.com access to the Google BigQuery API.

bq-authentication

  1. For Choose Google BigQuery object, choose Table.
  2. For Choose Google BigQuery subobject, choose BigQueryProjectName.
  3. For Choose Google BigQuery subobject, choose DatabaseName.
  4. For Choose Google BigQuery subobject, choose TableName.
  5. For Destination name, choose Amazon S3.
  6. For Bucket details, choose the Amazon S3 bucket you created for storing Amazon AppFlow results in the prerequisites.
  7. Enter raw as a prefix.

appflow-source-destination

  1. Next, provide AWS Glue Data Catalog settings to create a table for further analysis.
    1. Select the User role (IAM role) created in the prerequisites.
    2. Create a new database; for example, healthcare.
    3. Provide a table prefix; for example, bq.

glue-crawler-config

  1. Select Run on demand.

appflow-trigger-setup

  1. Choose Next.
  2. Select Manually map fields.
  3. Select the following six fields for Source field name from the table Allergies:
    1. Start
    2. Patient
    3. Code
    4. Description
    5. Type
    6. Category
  4. Choose Map fields directly.

appflow-field-mapping

  1. Choose Next.
  2. In the Add filters section, choose Next.
  3. Choose Create flow.

Run the flow

After creating your new flow, you can run it on demand.

  1. On the Amazon AppFlow console, choose my-bq-flow.
  2. Choose Run flow.

appflow-run-status

For this walkthrough, you run the flow on demand for ease of understanding. In practice, you can choose a scheduled flow and periodically extract only newly added data.

Query through Amazon Athena

When you select the optional AWS Glue Data Catalog settings, Amazon AppFlow catalogs the transferred data in the Data Catalog, allowing Amazon Athena to query it.

If you’re prompted to configure a query results location, navigate to the Settings tab and choose Manage. Under Manage settings, choose the Athena results bucket created in prerequisites and choose Save.

  1. On the Amazon Athena console, select the Data Source as AWSDataCatalog.
  2. Next, select Database as healthcare.
  3. Now you can select the table created by the AWS Glue crawler and preview it.

athena-results

  1. You can also run a custom query to find the top 10 allergies as shown in the following query.

Note: In the below query, replace the table name, in this case bq_appflow_mybqflow_1693588670_latest, with the name of the table generated in your AWS account.

SELECT type,
category,
"description",
count(*) as number_of_cases
FROM "healthcare"."bq_appflow_mybqflow_1693588670_latest"
GROUP BY type,
category,
"description"
ORDER BY number_of_cases DESC
LIMIT 10;

  1. Choose Run query.

athena-custom-query-results

This result shows the top 10 allergies by number of cases.
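
For readers less familiar with SQL, the query groups rows by type, category, and description, counts each group, and keeps the ten largest groups. The same logic in plain Python, using a handful of made-up rows in place of the Synthea allergies table, looks like this:

```python
from collections import Counter

rows = [  # (type, category, description) with made-up values
    ("allergy", "environment", "Mold"),
    ("allergy", "food", "Peanut"),
    ("allergy", "environment", "Mold"),
    ("intolerance", "food", "Lactose"),
    ("allergy", "environment", "Mold"),
    ("allergy", "food", "Peanut"),
]

# GROUP BY type, category, description with COUNT(*),
# then ORDER BY number_of_cases DESC LIMIT 10.
top_10 = Counter(rows).most_common(10)
for (kind, category, description), number_of_cases in top_10:
    print(kind, category, description, number_of_cases)
```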

Clean up

To avoid incurring charges, clean up the resources in your AWS account by completing the following steps:

  1. On the Amazon AppFlow console, choose Flows in the navigation pane.
  2. From the list of flows, select the flow my-bq-flow, and delete it.
  3. Enter delete to delete the flow.
  4. Choose Connections in the navigation pane.
  5. Choose Google BigQuery from the list of connectors, select bq-connection, and delete it.
  6. Enter delete to delete the connection.
  7. On the IAM console, choose Roles in the navigation pane, then select the role you created for AWS Glue crawler and delete it.
  8. On the Amazon Athena console:
    1. Delete the tables created under the database healthcare using AWS Glue crawler.
    2. Drop the database healthcare.
  9. On the Amazon S3 console, search for the Amazon AppFlow results bucket you created, choose Empty to delete the objects, then delete the bucket.
  10. On the Amazon S3 console, search for the Amazon Athena results bucket you created, choose Empty to delete the objects, then delete the bucket.
  11. Clean up resources in your Google account by deleting the project that contains the Google BigQuery resources. Follow the documentation to clean up the Google resources.

Conclusion

The Google BigQuery connector in Amazon AppFlow streamlines the process of transferring data from Google’s data warehouse to Amazon S3. This integration simplifies analytics and machine learning, archiving, and long-term storage, providing significant benefits for data professionals and organizations seeking to harness the analytical capabilities of both platforms.

With Amazon AppFlow, the complexities of data integration are eliminated, enabling you to focus on deriving actionable insights from your data. Whether you’re archiving historical data, performing complex analytics, or preparing data for machine learning, this connector simplifies the process, making it accessible to a broader range of data professionals.

To see the data transfer from Google BigQuery to Amazon S3 using Amazon AppFlow in action, take a look at the step-by-step video tutorial. In this tutorial, we walk through the entire process, from setting up the connection to running the data transfer flow. For more information on Amazon AppFlow, visit Amazon AppFlow.


About the authors

Kartikay Khator is a Solutions Architect in Global Life Sciences at Amazon Web Services. He is passionate about helping customers on their cloud journey, with a focus on AWS analytics services. He is an avid runner and enjoys hiking.

Kamen Sharlandjiev is a Sr. Big Data and ETL Solutions Architect and Amazon AppFlow expert. He’s on a mission to make life easier for customers who are facing complex data integration challenges. His secret weapon? Fully managed, low-code AWS services that can get the job done with minimal effort and no coding.

Process and analyze highly nested and large XML files using AWS Glue and Amazon Athena

Post Syndicated from Navnit Shukla original https://aws.amazon.com/blogs/big-data/process-and-analyze-highly-nested-and-large-xml-files-using-aws-glue-and-amazon-athena/

In today’s digital age, data is at the heart of every organization’s success. One of the most commonly used formats for exchanging data is XML. Analyzing XML files is crucial for several reasons. First, XML files are used in many industries, including finance, healthcare, and government, and analyzing them can help organizations gain insights into their data, make better decisions, and improve their operations. Second, many applications and systems use XML as a standard data format, so analyzing XML files helps organizations integrate data from different sources and ensure consistency across their systems. However, XML files contain semi-structured, highly nested data, which makes it difficult to access and analyze information, especially if the file is large and has a complex, highly nested schema.

XML files are well-suited for applications, but they may not be optimal for analytics engines. In order to enhance query performance and enable easy access in downstream analytics engines such as Amazon Athena, it’s crucial to preprocess XML files into a columnar format like Parquet. This transformation allows for improved efficiency and usability in analytics workflows. In this post, we show how to process XML data using AWS Glue and Athena.

Solution overview

We explore two distinct techniques that can streamline your XML file processing workflow:

  • Technique 1: Use an AWS Glue crawler and the AWS Glue visual editor – You can use the AWS Glue user interface in conjunction with a crawler to define the table structure for your XML files. This approach provides a user-friendly interface and is particularly suitable for individuals who prefer a graphical approach to managing their data.
  • Technique 2: Use AWS Glue DynamicFrames with inferred and fixed schemas – The crawler has a limitation when it comes to processing a single row in XML files larger than 1 MB. To overcome this restriction, we use an AWS Glue notebook to construct AWS Glue DynamicFrames, utilizing both inferred and fixed schemas. This method ensures efficient handling of XML files with rows exceeding 1 MB in size.

In both approaches, our ultimate goal is to convert XML files into Apache Parquet format, making them readily available for querying using Athena. With these techniques, you can enhance the processing speed and accessibility of your XML data, enabling you to derive valuable insights with ease.

Prerequisites

Before you begin this tutorial, complete the following prerequisites (these apply to both techniques):

  1. Download the XML files technique1.xml and technique2.xml.
  2. Upload the files to an Amazon Simple Storage Service (Amazon S3) bucket. You can upload them to the same S3 bucket in different folders or to different S3 buckets.
  3. Create an AWS Identity and Access Management (IAM) role for your ETL job or notebook as instructed in Set up IAM permissions for AWS Glue Studio.
  4. Add an inline policy to your role with the iam:PassRole action:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Action": ["iam:PassRole"],
      "Effect": "Allow",
      "Resource": "arn:aws:iam::*:role/AWSGlueServiceRole*",
      "Condition": {
        "StringLike": {
          "iam:PassedToService": ["glue.amazonaws.com"]
        }
      }
    }
  ]
}
  5. Add a permissions policy to the role with access to your S3 bucket.
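If you prefer to script the inline policy rather than paste it into the console, the following is a minimal sketch of one way to do it. It builds the same iam:PassRole document as a Python dictionary; the function names and the policy name glue-pass-role are illustrative assumptions, and the attach step assumes boto3 is installed and AWS credentials are configured.

```python
import json


def build_pass_role_policy(role_pattern="arn:aws:iam::*:role/AWSGlueServiceRole*"):
    """Return the iam:PassRole inline policy from the prerequisites as a dict."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Action": ["iam:PassRole"],
                "Effect": "Allow",
                "Resource": role_pattern,
                "Condition": {
                    "StringLike": {"iam:PassedToService": ["glue.amazonaws.com"]}
                },
            }
        ],
    }


def attach_inline_policy(role_name, policy_name="glue-pass-role"):
    """Attach the policy to a role (policy_name is a hypothetical example)."""
    import boto3  # imported lazily so the builder above works without boto3

    boto3.client("iam").put_role_policy(
        RoleName=role_name,
        PolicyName=policy_name,
        PolicyDocument=json.dumps(build_pass_role_policy()),
    )


# Serialize the document to check that it is valid JSON before attaching it.
policy_json = json.dumps(build_pass_role_policy(), indent=2)
print(policy_json)
```

Building the document as a dictionary and serializing it with json.dumps avoids the missing-brace mistakes that are easy to make when editing policy JSON by hand.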

Now that we’re done with the prerequisites, let’s move on to implementing the first technique.

Technique 1: Use an AWS Glue crawler and the visual editor

The following diagram illustrates the simple architecture that you can use to implement the solution.

Processing and Analyzing XML file using AWS Glue and Amazon Athena

To analyze XML files stored in Amazon S3 using AWS Glue and Athena, we complete the following high-level steps:

  1. Create an AWS Glue crawler to extract XML metadata and create a table in the AWS Glue Data Catalog.
  2. Process and transform XML data into a format (like Parquet) suitable for Athena using an AWS Glue extract, transform, and load (ETL) job.
  3. Set up and run an AWS Glue job via the AWS Glue console or the AWS Command Line Interface (AWS CLI).
  4. Use the processed data (in Parquet format) with Athena tables, enabling SQL queries.
  5. Use the user-friendly interface in Athena to analyze the XML data with SQL queries on your data stored in Amazon S3.

This architecture is a scalable, cost-effective solution for analyzing XML data on Amazon S3 using AWS Glue and Athena. You can analyze large datasets without complex infrastructure management.

We use the AWS Glue crawler to extract XML file metadata. You can choose the default AWS Glue classifier for general-purpose XML classification. It automatically detects XML data structure and schema, which is useful for common formats.

We also use a custom XML classifier in this solution. It’s designed for specific XML schemas or formats, allowing precise metadata extraction. This is ideal for non-standard XML formats or when you need detailed control over classification. A custom classifier ensures only necessary metadata is extracted, simplifying downstream processing and analysis tasks. This approach optimizes the use of your XML files.

The following screenshot shows an example of an XML file with tags.

Create a custom classifier

In this step, you create a custom AWS Glue classifier to extract metadata from an XML file. Complete the following steps:

  1. On the AWS Glue console, under Crawlers in the navigation pane, choose Classifiers.
  2. Choose Add classifier.
  3. Select XML as the classifier type.
  4. Enter a name for the classifier, such as blog-glue-xml-contact.
  5. For Row tag, enter the name of the root tag that contains the metadata (for example, metadata).
  6. Choose Create.
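The console steps above can also be expressed as an API request. The following sketch, assuming boto3 and configured credentials for the actual call, builds the arguments for the AWS Glue CreateClassifier operation with the classifier name and row tag used in this post. The helper function names are our own and not part of any AWS library.

```python
def build_xml_classifier_request(name, row_tag):
    """Build keyword arguments for the AWS Glue CreateClassifier API."""
    return {
        "XMLClassifier": {
            "Classification": "xml",  # classification label written to the table
            "Name": name,
            "RowTag": row_tag,        # the XML tag that marks one row
        }
    }


def create_xml_classifier(name="blog-glue-xml-contact", row_tag="metadata"):
    """Create the classifier; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder can be used without boto3

    glue = boto3.client("glue")
    glue.create_classifier(**build_xml_classifier_request(name, row_tag))


# Inspect the request without calling AWS.
request = build_xml_classifier_request("blog-glue-xml-contact", "metadata")
print(request)
```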

Create an AWS Glue crawler to crawl the XML file

In this section, we create an AWS Glue crawler to extract the metadata from the XML file using the custom classifier created in the previous step.

Create a database

  1. On the AWS Glue console, choose Databases in the navigation pane.
  2. Choose Add database.
  3. Provide a name such as blog_glue_xml.
  4. Choose Create database.

Create a Crawler

Complete the following steps to create your first crawler:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. On the Set crawler properties page, provide a name for the new crawler (such as blog-glue-parquet), then choose Next.
  4. On the Choose data sources and classifiers page, select Not Yet under Data source configuration.
  5. Choose Add a data store.
  6. For S3 path, browse to s3://${BUCKET_NAME}/input/geologicalsurvey/.

Make sure you pick the XML folder rather than the file inside the folder.

  7. Leave the rest of the options as default and choose Add an S3 data source.
  8. Expand Custom classifiers – optional, choose blog-glue-xml-contact, then choose Next and keep the rest of the options as default.
  9. Choose your IAM role or choose Create new IAM role, add the suffix glue-xml-contact (for example, AWSGlueServiceNotebookRoleBlog), and choose Next.
  10. On the Set output and scheduling page, under Output configuration, choose blog_glue_xml for Target database.
  11. Enter console_ as the prefix added to tables (optional) and under Crawler schedule, keep the frequency set to On demand.
  12. Choose Next.
  13. Review all the parameters and choose Create crawler.

Run the Crawler

After you create the crawler, complete the following steps to run it:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Open the crawler you created and choose Run.

The crawler will take 1–2 minutes to complete.

  3. When the crawler is complete, choose Databases in the navigation pane.
  4. Choose the database you created and choose the table name to see the schema extracted by the crawler.
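For readers who automate their setup, the crawler configured and run in the preceding steps can be sketched as a request to the AWS Glue CreateCrawler and StartCrawler operations. This is an illustration under assumptions, not the method used in this post: the helper names are ours, YOUR_BUCKET_NAME is a placeholder, and the role name assumes you reuse AWSGlueServiceNotebookRoleBlog. Leaving out a schedule corresponds to the On demand setting.

```python
def build_crawler_request(bucket_name, role="AWSGlueServiceNotebookRoleBlog"):
    """Build keyword arguments for the AWS Glue CreateCrawler API."""
    return {
        "Name": "blog-glue-parquet",
        "Role": role,
        "DatabaseName": "blog_glue_xml",
        "Targets": {
            # Point at the folder, not at the XML file inside it.
            "S3Targets": [{"Path": f"s3://{bucket_name}/input/geologicalsurvey/"}]
        },
        "Classifiers": ["blog-glue-xml-contact"],  # custom XML classifier
        "TablePrefix": "console_",
    }


def create_and_run_crawler(bucket_name):
    """Create the crawler and start it; requires boto3 and AWS credentials."""
    import boto3  # imported lazily so the builder can be used without boto3

    glue = boto3.client("glue")
    request = build_crawler_request(bucket_name)
    glue.create_crawler(**request)
    glue.start_crawler(Name=request["Name"])


# Inspect the request without calling AWS.
crawler_request = build_crawler_request("YOUR_BUCKET_NAME")
print(crawler_request)
```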

Create an AWS Glue job to convert the XML to Parquet format

In this step, you create an AWS Glue Studio job to convert the XML file into a Parquet file. Complete the following steps:

  1. On the AWS Glue console, choose Jobs in the navigation pane.
  2. Under Create job, select Visual with a blank canvas.
  3. Choose Create.
  4. Rename the job to blog_glue_xml_job.

Now you have a blank AWS Glue Studio visual job editor. On the top of the editor are the tabs for different views.

  5. Choose the Script tab to see an empty shell of the AWS Glue ETL script.

As we add new steps in the visual editor, the script will be updated automatically.

  6. Choose the Job details tab to see all the job configurations.
  7. For IAM role, choose AWSGlueServiceNotebookRoleBlog.
  8. For Glue version, choose Glue 4.0 – Support Spark 3.3, Scala 2, Python 3.
  9. Set Requested number of workers to 2.
  10. Set Number of retries to 0.
  11. Choose the Visual tab to go back to the visual editor.
  12. On the Source drop-down menu, choose AWS Glue Data Catalog.
  13. On the Data source properties – Data Catalog tab, provide the following information:
    1. For Database, choose blog_glue_xml.
    2. For Table, choose the table that starts with the name console_ that the crawler created (for example, console_geologicalsurvey).
  14. On the Node properties tab, provide the following information:
    1. Change Name to geologicalsurvey dataset.
    2. Choose Action and the transformation Change Schema (Apply Mapping).
    3. Choose Node properties and change the name of the transform from Change Schema (Apply Mapping) to ApplyMapping.
    4. On the Target menu, choose S3.
  15. On the Data source properties – S3 tab, provide the following information:
    1. For Format, select Parquet.
    2. For Compression Type, select Uncompressed.
    3. For S3 source type, select S3 location.
    4. For S3 URL, enter s3://${BUCKET_NAME}/output/parquet/.
    5. Choose Node Properties and change the name to Output.
  16. Choose Save to save the job.
  17. Choose Run to run the job.

The following screenshot shows the job in the visual editor.

Create an AWS Glue crawler to crawl the Parquet file

In this step, you create an AWS Glue crawler to extract metadata from the Parquet file you created using an AWS Glue Studio job. This time, you use the default classifier. Complete the following steps:

  1. On the AWS Glue console, choose Crawlers in the navigation pane.
  2. Choose Create crawler.
  3. On the Set crawler properties page, provide a name for the new crawler, such as blog-glue-parquet-contact, then choose Next.
  4. On the Choose data sources and classifiers page, select Not Yet for Data source configuration.
  5. Choose Add a data store.
  6. For S3 path, browse to s3://${BUCKET_NAME}/output/parquet/.

Make sure you pick the parquet folder rather than the file inside the folder.

  7. Choose your IAM role created during the prerequisite section or choose Create new IAM role (for example, AWSGlueServiceNotebookRoleBlog), and choose Next.
  8. On the Set output and scheduling page, under Output configuration, choose blog_glue_xml for Database.
  9. Enter parquet_ as the prefix added to tables (optional) and under Crawler schedule, keep the frequency set to On demand.
  10. Choose Next.
  11. Review all the parameters and choose Create crawler.

Now you can run the crawler, which takes 1–2 minutes to complete.

You can preview the newly created schema for the Parquet file in the AWS Glue Data Catalog, which is similar to the schema of the XML file.

We now possess data that is suitable for use with Athena. In the next section, we perform data queries using Athena.

Query the Parquet file using Athena

Athena doesn’t support querying the XML file format directly, which is why you converted the XML file into Parquet. Parquet also makes queries more efficient, and you can use dot notation to query complex types and nested structures.

The following example code uses dot notation to query nested data:

SELECT 
    idinfo.citation.citeinfo.origin,
    idinfo.citation.citeinfo.pubdate,
    idinfo.citation.citeinfo.title,
    idinfo.citation.citeinfo.geoform,
    idinfo.citation.citeinfo.pubinfo.pubplace,
    idinfo.citation.citeinfo.pubinfo.publish,
    idinfo.citation.citeinfo.onlink,
    idinfo.descript.abstract,
    idinfo.descript.purpose,
    idinfo.descript.supplinf,
    dataqual.attracc.attraccr, 
    dataqual.logic,
    dataqual.complete,
    dataqual.posacc.horizpa.horizpar,
    dataqual.posacc.vertacc.vertaccr,
    dataqual.lineage.procstep.procdate,
    dataqual.lineage.procstep.procdesc
FROM "blog_glue_xml"."parquet_parquet" limit 10;
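To make the idea behind dot notation concrete, here is a small pure-Python analogy. It is not how Athena is implemented: it only shows that each dot steps one level deeper into a nested structure. The sample record and its values are invented for illustration, and returning None for a missing field is a choice made for this sketch.

```python
def resolve(record, dotted_path):
    """Follow a dotted path such as 'idinfo.descript.purpose' through nested dicts."""
    value = record
    for key in dotted_path.split("."):
        if not isinstance(value, dict) or key not in value:
            return None  # this sketch treats a missing field as None
        value = value[key]
    return value


# Invented sample record that mirrors part of the nesting used in the query.
record = {
    "idinfo": {
        "citation": {
            "citeinfo": {
                "title": "Sample survey",
                "pubinfo": {"pubplace": "Sample City"},
            }
        },
        "descript": {"purpose": "Illustration only"},
    }
}

pubplace = resolve(record, "idinfo.citation.citeinfo.pubinfo.pubplace")
purpose = resolve(record, "idinfo.descript.purpose")
print(pubplace, "|", purpose)
```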

Now that we’ve completed technique 1, let’s move on to learn about technique 2.

Technique 2: Use AWS Glue DynamicFrames with inferred and fixed schemas

In the previous section, we covered the process of handling a small XML file using an AWS Glue crawler to generate a table, an AWS Glue job to convert the file into Parquet format, and Athena to access the Parquet data. However, as noted in the solution overview, the crawler can’t process XML files in which a single row exceeds 1 MB. In this section, we turn to batch processing of larger XML files, which need additional parsing to extract individual events before you can analyze them using Athena.

Our approach involves reading the XML files through AWS Glue DynamicFrames, employing both inferred and fixed schemas. Then we extract the individual events in Parquet format using the relationalize transformation, enabling us to query and analyze them seamlessly using Athena.

To implement this solution, you complete the following high-level steps:

  1. Create an AWS Glue notebook to read and analyze the XML file.
  2. Use DynamicFrames with InferSchema to read the XML file.
  3. Use the relationalize function to unnest any arrays.
  4. Convert the data to Parquet format.
  5. Query the Parquet data using Athena.
  6. Repeat the previous steps, but this time pass a schema to DynamicFrames instead of using InferSchema.

The electric vehicle population data XML file has a response tag at its root level. This tag contains an outer row tag, which in turn contains an array of nested row tags. Each nested row tag provides information about a vehicle, including its make, model, and other relevant details. The following screenshot shows an example.
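In text form, the nesting can be sketched with a tiny made-up document. The SAMPLE_XML string in the following example is invented for illustration and is far smaller than the real dataset; only the response, row, row nesting and a few field names are taken from this post. The function uses Python's standard library XML parser, not AWS Glue, to pull out one record per vehicle.

```python
import xml.etree.ElementTree as ET

# Invented sample with the same nesting as the dataset: response > row > row.
SAMPLE_XML = """
<response>
  <row>
    <row _id="row-1">
      <make>TESLA</make>
      <model>MODEL 3</model>
      <model_year>2020</model_year>
    </row>
    <row _id="row-2">
      <make>NISSAN</make>
      <model>LEAF</model>
      <model_year>2018</model_year>
    </row>
  </row>
</response>
"""


def extract_vehicles(xml_text):
    """Return one dict per inner <row> element (one vehicle each)."""
    root = ET.fromstring(xml_text.strip())  # the <response> element
    vehicles = []
    for vehicle in root.findall("./row/row"):
        record = dict(vehicle.attrib)       # XML attributes such as _id
        for child in vehicle:
            record[child.tag] = child.text  # child tags become fields
        vehicles.append(record)
    return vehicles


vehicles = extract_vehicles(SAMPLE_XML)
print(vehicles)
```

Every value comes back as a string here, which is a reminder that XML itself carries no column types.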

Create an AWS Glue Notebook

To create an AWS Glue notebook, complete the following steps:

  1. On the AWS Glue Studio console, choose Jobs in the navigation pane.
  2. Select Jupyter Notebook and choose Create.
  3. Enter a name for your AWS Glue job, such as blog_glue_xml_job_Jupyter.
  4. Choose the role that you created in the prerequisites (AWSGlueServiceNotebookRoleBlog).

The AWS Glue notebook comes with a preexisting example that demonstrates how to query a database and write the output to Amazon S3.

  5. Adjust the timeout (in minutes) as shown in the following screenshot and run the cell to create the AWS Glue interactive session.

Create basic Variables

After you create the interactive session, at the end of the notebook, create a new cell with the following variables (provide your own bucket name):

BUCKET_NAME='YOUR_BUCKET_NAME'
S3_SOURCE_XML_FILE = f's3://{BUCKET_NAME}/xml_dataset/'
S3_TEMP_FOLDER = f's3://{BUCKET_NAME}/temp/'
S3_OUTPUT_INFER_SCHEMA = f's3://{BUCKET_NAME}/infer_schema/'
INFER_SCHEMA_TABLE_NAME = 'infer_schema'
S3_OUTPUT_NO_INFER_SCHEMA = f's3://{BUCKET_NAME}/no_infer_schema/'
NO_INFER_SCHEMA_TABLE_NAME = 'no_infer_schema'
DATABASE_NAME = 'blog_xml'

Read the XML file inferring the schema

If you don’t pass a schema to the DynamicFrame, it will infer the schema of the files. To read the data using a dynamic frame, you can use the following command:

df = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={"paths": [S3_SOURCE_XML_FILE]},
    format="xml",
    format_options={"rowTag": "response"},
)

Print the DynamicFrame Schema

Print the schema with the following code:

df.printSchema()

The schema shows a nested structure with a row array containing multiple elements. To unnest this structure into lines, you can use the AWS Glue relationalize transformation:

df_relationalized = df.relationalize(
    "root", S3_TEMP_FOLDER
)
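The idea behind unnesting can be illustrated with a simplified pure-Python sketch. This is not the AWS Glue implementation: the real relationalize transformation works on DynamicFrames, stages intermediate data in the temporary S3 folder, and applies its own table and column naming. The functions and sample data below are assumptions made for illustration only.

```python
def flatten(record, prefix=""):
    """Flatten nested dicts into one dict with dotted keys (lists are kept as-is)."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, name))
        else:
            flat[name] = value
    return flat


def explode(record, array_path):
    """Emit one flattened row per element of the array found at array_path."""
    flat = flatten(record)
    rows = []
    for index, element in enumerate(flat[array_path]):
        row = {"index": index}                    # position in the original array
        row.update(flatten(element, array_path))  # columns prefixed with the path
        rows.append(row)
    return rows


# Invented sample with the same row > row nesting as the dataset.
nested = {
    "row": {
        "row": [
            {"make": "TESLA", "model": "MODEL 3"},
            {"make": "NISSAN", "model": "LEAF"},
        ]
    }
}

rows = explode(nested, "row.row")
for row in rows:
    print(row)
```

Each array element becomes its own row, and the column names keep the path that led to them, which is why names containing row.row show up after unnesting.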

We are only interested in the information contained within the row array, and we can view the schema by using the following command:

df_relationalized.select("root_row.row").printSchema()

The column names contain row.row, which correspond to the array structure and array column in the dataset. We don’t rename the columns in this post; for instructions to do so, refer to Automate dynamic mapping and renaming of column names in data files using AWS Glue: Part 1. Then you can convert the data to Parquet format and create the AWS Glue table using the following command:


# Create an S3 sink that also creates or updates the table in the Data Catalog.
s3output = glueContext.getSink(
  path=S3_OUTPUT_INFER_SCHEMA,
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="blog_xml", catalogTableName="jupyter_notebook_with_infer_schema"
)
# Write the output as Parquet.
s3output.setFormat("glueparquet")
# Keep only the unnested row data.
s3output.writeFrame(df_relationalized.select("root_row.row"))

AWS Glue DynamicFrame provides features that you can use in your ETL script to create and update a schema in the Data Catalog. We use the updateBehavior parameter to create the table directly in the Data Catalog. With this approach, we don’t need to run an AWS Glue crawler after the AWS Glue job is complete.

Read the XML file by setting a schema

An alternative way to read the file is by predefining a schema. To do this, complete the following steps:

  1. Import the AWS Glue data types and the json module (used later to serialize the schema):
    import json
    from awsglue.gluetypes import *

  2. Create a schema for the XML file:
    schema = StructType([ 
      Field("row", StructType([
        Field("row", ArrayType(StructType([
                Field("_2020_census_tract", LongType()),
                Field("__address", StringType()),
                Field("__id", StringType()),
                Field("__position", IntegerType()),
                Field("__uuid", StringType()),
                Field("base_msrp", IntegerType()),
                Field("cafv_type", StringType()),
                Field("city", StringType()),
                Field("county", StringType()),
                Field("dol_vehicle_id", IntegerType()),
                Field("electric_range", IntegerType()),
                Field("electric_utility", StringType()),
                Field("ev_type", StringType()),
                Field("geocoded_column", StringType()),
                Field("legislative_district", IntegerType()),
                Field("make", StringType()),
                Field("model", StringType()),
                Field("model_year", IntegerType()),
                Field("state", StringType()),
                Field("vin_1_10", StringType()),
                Field("zip_code", IntegerType())
        ])))
      ]))
    ])

  3. Pass the schema when reading the XML file:
    df = glueContext.create_dynamic_frame.from_options(
        connection_type="s3",
        connection_options={"paths": [S3_SOURCE_XML_FILE]},
        format="xml",
        format_options={"rowTag": "response", "withSchema": json.dumps(schema.jsonValue())},
    )

  4. Unnest the dataset like before:
    df_relationalized = df.relationalize(
        "root", S3_TEMP_FOLDER
    )

  5. Convert the dataset to Parquet and create the AWS Glue table:
    s3output = glueContext.getSink(
      path=S3_OUTPUT_NO_INFER_SCHEMA,
      connection_type="s3",
      updateBehavior="UPDATE_IN_DATABASE",
      partitionKeys=[],
      compression="snappy",
      enableUpdateCatalog=True,
      transformation_ctx="s3output",
    )
    s3output.setCatalogInfo(
      catalogDatabase="blog_xml", catalogTableName="jupyter_notebook_no_infer_schema"
    )
    s3output.setFormat("glueparquet")
    s3output.writeFrame(df_relationalized.select("root_row.row"))

Query the tables using Athena

Now that we have created both tables, we can query the tables using Athena. For example, we can use the following query:

SELECT * FROM "blog_xml"."jupyter_notebook_no_infer_schema" limit 10;

The following screenshot shows the results.
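If you prefer to run the same query from code rather than the Athena console, the following sketch builds the arguments for the Athena StartQueryExecution API. The helper names and the athena-results/ output prefix are assumptions for illustration; the actual call assumes boto3 is installed and AWS credentials are configured.

```python
def build_query_request(database, table, output_location, limit=10):
    """Build keyword arguments for the Athena StartQueryExecution API."""
    query = f'SELECT * FROM "{database}"."{table}" LIMIT {int(limit)};'
    return {
        "QueryString": query,
        "QueryExecutionContext": {"Database": database},
        # Athena writes query results to this S3 location.
        "ResultConfiguration": {"OutputLocation": output_location},
    }


def run_query(request):
    """Submit the query and return its execution ID; requires boto3."""
    import boto3  # imported lazily so the builder can be used without boto3

    athena = boto3.client("athena")
    response = athena.start_query_execution(**request)
    return response["QueryExecutionId"]


# Inspect the request without calling AWS (bucket and prefix are placeholders).
athena_request = build_query_request(
    database="blog_xml",
    table="jupyter_notebook_no_infer_schema",
    output_location="s3://YOUR_BUCKET_NAME/athena-results/",
)
print(athena_request["QueryString"])
```

Building the table reference from variables also avoids stray whitespace inside the quoted identifier, which would make Athena look for a table that doesn't exist.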

Clean Up

In this post, we created an IAM role, an AWS Glue Jupyter notebook, and two tables in the AWS Glue Data Catalog. We also uploaded some files to an S3 bucket. To clean up these objects, complete the following steps:

  1. On the IAM console, delete the role you created.
  2. On the AWS Glue Studio console, delete the custom classifier, crawler, ETL jobs, and Jupyter notebook.
  3. Navigate to the AWS Glue Data Catalog and delete the tables you created.
  4. On the Amazon S3 console, navigate to the bucket you created and delete the folders named temp, infer_schema, and no_infer_schema.

Key Takeaways

AWS Glue DynamicFrames offer a feature called InferSchema, which automatically works out the structure of a data frame from the data it contains. In contrast, defining a schema means explicitly stating what the data frame’s structure should be before loading the data.

XML, being a text-based format, doesn’t restrict the data types of its columns. This can cause issues with the InferSchema function. For example, in the first run, a file with column A having a value of 2 results in a Parquet file with column A as an integer. In the second run, a new file has column A with the value C, leading to a Parquet file with column A as a string. Now there are two files on S3, each with a column A of different data types, which can create problems downstream.

The same happens with complex data types like nested structures or arrays. For example, if a file has a single tag entry called transaction, it’s inferred as a struct. But if another file has the same tag more than once, it’s inferred as an array.
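The two situations described above can be reproduced with a toy inference routine. This sketch is a deliberately naive stand-in, not the logic AWS Glue uses: the functions and type names are assumptions chosen to show how per-file inference can disagree between runs.

```python
def infer_scalar_type(text):
    """Guess a column type from a single text value."""
    try:
        int(text)
        return "int"
    except ValueError:
        return "string"


def infer_tag_type(occurrences):
    """A tag seen once looks like a struct; a repeated tag looks like an array."""
    return "array" if occurrences > 1 else "struct"


def find_conflicts(runs):
    """Return the columns whose inferred type differs between runs."""
    seen = {}
    for schema in runs:
        for column, type_name in schema.items():
            seen.setdefault(column, set()).add(type_name)
    return {column: types for column, types in seen.items() if len(types) > 1}


# First run: column A holds "2" and transaction appears once.
run_1 = {"A": infer_scalar_type("2"), "transaction": infer_tag_type(1)}
# Second run: column A holds "C" and transaction appears three times.
run_2 = {"A": infer_scalar_type("C"), "transaction": infer_tag_type(3)}

conflicts = find_conflicts([run_1, run_2])
print(conflicts)
```

A check like find_conflicts, run over the schemas of each batch, is one way to notice drift before mismatched files accumulate in Amazon S3.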

Despite these data type issues, InferSchema is useful when you don’t know the schema or defining one manually is impractical. However, it’s not ideal for large or constantly changing datasets. Defining a schema is more precise, especially with complex data types, but has its own issues, like requiring manual effort and being inflexible to data changes.

InferSchema has limitations, like incorrect data type inference and issues with handling null values. Defining a schema also has limitations, like manual effort and potential errors.

Choosing between inferring and defining a schema depends on the project’s needs. InferSchema is great for quick exploration of small datasets, whereas defining a schema is better for larger, complex datasets requiring accuracy and consistency. Consider the trade-offs and constraints of each method to pick what suits your project best.

Conclusion

In this post, we explored two techniques for managing XML data using AWS Glue, each tailored to address specific needs and challenges you may encounter.

Technique 1 offers a user-friendly path for those who prefer a graphical interface. You can use an AWS Glue crawler and the visual editor to effortlessly define the table structure for your XML files. This approach simplifies the data management process and is particularly appealing to those looking for a straightforward way to handle their data.

However, we recognize that the crawler has its limitations, specifically when dealing with XML files having rows larger than 1 MB. This is where technique 2 comes to the rescue. By harnessing AWS Glue DynamicFrames with both inferred and fixed schemas, and employing an AWS Glue notebook, you can efficiently handle XML files of any size. This method provides a robust solution that ensures seamless processing even for XML files with rows exceeding the 1 MB constraint.

As you navigate the world of data management, having these techniques in your toolkit empowers you to make informed decisions based on the specific requirements of your project. Whether you prefer the simplicity of technique 1 or the scalability of technique 2, AWS Glue provides the flexibility you need to handle XML data effectively.


About the Authors

Navnit Shukla serves as an AWS Specialist Solution Architect with a focus on Analytics. He possesses a strong enthusiasm for assisting clients in discovering valuable insights from their data. Through his expertise, he constructs innovative solutions that empower businesses to arrive at informed, data-driven choices. Notably, Navnit Shukla is the accomplished author of the book titled “Data Wrangling on AWS.”

Patrick Muller works as a Senior Data Lab Architect at AWS. His main responsibility is to assist customers in turning their ideas into a production-ready data product. In his free time, Patrick enjoys playing soccer, watching movies, and traveling.

Amogh Gaikwad is a Senior Solutions Developer at Amazon Web Services. He helps global customers build and deploy AI/ML solutions on AWS. His work is mainly focused on computer vision and natural language processing, and on helping customers optimize their AI/ML workloads for sustainability. Amogh received his master’s in Computer Science, specializing in Machine Learning.

Sheela Sonone is a Senior Resident Architect at AWS. She helps AWS customers make informed choices and tradeoffs about accelerating their data, analytics, and AI/ML workloads and implementations. In her spare time, she enjoys spending time with her family – usually on tennis courts.

Simplify operational data processing in data lakes using AWS Glue and Apache Hudi

Post Syndicated from Ravi Itha original https://aws.amazon.com/blogs/big-data/simplify-operational-data-processing-in-data-lakes-using-aws-glue-and-apache-hudi/

The Analytics specialty practice of AWS Professional Services (AWS ProServe) helps customers across the globe with modern data architecture implementations on the AWS Cloud. A modern data architecture is an evolutionary architecture pattern designed to integrate a data lake, data warehouse, and purpose-built stores with a unified governance model. It focuses on defining standards and patterns to integrate data producers and consumers and move data between data lakes and purpose-built data stores securely and efficiently. Out of the many data producer systems that feed data to a data lake, operational databases are most prevalent, where operational data is stored, transformed, analyzed, and finally used to enhance business operations of an organization. With the emergence of open storage formats such as Apache Hudi and its native support from AWS Glue for Apache Spark, many AWS customers have started adding transactional and incremental data processing capabilities to their data lakes.

AWS has invested in native service integration with Apache Hudi and published technical contents to enable you to use Apache Hudi with AWS Glue (for example, refer to Introducing native support for Apache Hudi, Delta Lake, and Apache Iceberg on AWS Glue for Apache Spark, Part 1: Getting Started). In AWS ProServe-led customer engagements, the use cases we work on usually come with technical complexity and scalability requirements. In this post, we discuss a common use case in relation to operational data processing and the solution we built using Apache Hudi and AWS Glue.

Use case overview

AnyCompany Travel and Hospitality wanted to build a data processing framework to seamlessly ingest and process data coming from operational databases (used by reservation and booking systems) in a data lake before applying machine learning (ML) techniques to provide a personalized experience to its users. Due to the sheer volume of direct and indirect sales channels the company has, its booking and promotions data are organized in hundreds of operational databases with thousands of tables. Of those tables, some are larger (for example, in terms of record volume) than others, and some are updated more frequently than others. In the data lake, the data is to be organized in the following storage zones:

  1. Source-aligned datasets – These have an identical structure to their counterparts at the source
  2. Aggregated datasets – These datasets are created based on one or more source-aligned datasets
  3. Consumer-aligned datasets – These are derived from a combination of source-aligned, aggregated, and reference datasets enriched with relevant business and transformation logics, usually fed as inputs to ML pipelines or any consumer applications

The following are the data ingestion and processing requirements:

  1. Replicate data from operational databases to the data lake, including insert, update, and delete operations.
  2. Keep the source-aligned datasets up to date (typically within the range of 10 minutes to a day) in relation to their counterparts in the operational databases, ensuring analytics pipelines refresh consumer-aligned datasets for downstream ML pipelines in a timely fashion. Moreover, the framework should consume compute resources as optimally as possible per the size of the operational tables.
  3. To minimize DevOps and operational overhead, the company wanted to templatize the source code wherever possible. For example, to create source-aligned datasets in the data lake for 3,000 operational tables, the company didn’t want to deploy 3,000 separate data processing jobs. The smaller the number of jobs and scripts, the better.
  4. The company wanted the ability to continue processing operational data in the secondary Region in the rare event of primary Region failure.

As you can guess, the Apache Hudi framework can solve the first requirement. Therefore, we put our emphasis on the other requirements. We begin with a data lake reference architecture, followed by an overview of the operational data processing framework. Using our open-source solution on GitHub, we delve into the framework components and walk through their design and implementation. Finally, by testing the framework, we summarize how it meets the aforementioned requirements.

Data lake reference architecture

Let’s begin with the big picture: a data lake solves a variety of analytics and ML use cases dealing with internal and external data producers and consumers. The following diagram represents a generic data lake architecture. To ingest data from operational databases to an Amazon Simple Storage Service (Amazon S3) staging bucket of the data lake, either AWS Database Migration Service (AWS DMS) or any AWS partner solution from AWS Marketplace that has support for change data capture (CDC) can fulfill the requirement. AWS Glue is used to create source-aligned and consumer-aligned datasets, and separate AWS Glue jobs handle the feature engineering part of ML engineering and operations. Amazon Athena is used for interactive querying and AWS Lake Formation is used for access controls.

Data Lake Reference Architecture

Operational data processing framework

The operational data processing (ODP) framework contains three components: File Manager, File Processor, and Configuration Manager. Each component runs independently to solve a portion of the operational data processing use case. We have open-sourced this framework on GitHub—you can clone the code repo and inspect it while we walk you through the design and implementation of the framework components. The source code is organized in three folders, one for each component, and if you customize and adopt this framework for your use case, we recommend promoting these folders as separate code repositories in your version control system. Consider using the following repository names:

  1. aws-glue-hudi-odp-framework-file-manager
  2. aws-glue-hudi-odp-framework-file-processor
  3. aws-glue-hudi-odp-framework-config-manager

With this modular approach, you can independently deploy the components to your data lake environment by following your preferred CI/CD processes. As illustrated in the preceding diagram, these components are deployed in conjunction with a CDC solution.

Component 1: File Manager

File Manager detects files emitted by a CDC process such as AWS DMS and tracks them in an Amazon DynamoDB table. As shown in the following diagram, it consists of an Amazon EventBridge event rule, an Amazon Simple Queue Service (Amazon SQS) queue, an AWS Lambda function, and a DynamoDB table. The EventBridge rule uses Amazon S3 Event Notifications to detect the arrival of CDC files in the S3 bucket. The event rule forwards the object event notifications to the SQS queue as messages. The File Manager Lambda function consumes those messages, parses the metadata, and inserts it into the DynamoDB table odpf_file_tracker. These records are then processed by File Processor, which we discuss in the next section.

ODPF Component: File Manager
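
The parsing step of the File Manager Lambda function can be sketched as follows. This is a minimal illustration, not the code from the repo: it assumes the SQS message body carries the EventBridge "Object Created" event as JSON, that CDC files land under a `<schema>/<table>/<file>` key layout, and that the item attribute names (other than file_ingestion_status and its raw_file_landed value, which appear in this post) are hypothetical. Writing the items to odpf_file_tracker is left out so the sketch runs without AWS access.

```python
import json
from datetime import datetime, timezone


def to_tracker_items(sqs_event):
    """Turn SQS-wrapped S3 'Object Created' events into file tracker items."""
    items = []
    for record in sqs_event.get("Records", []):
        # Assumption: each SQS message body is the EventBridge event as JSON text.
        body = json.loads(record["body"])
        detail = body["detail"]
        bucket = detail["bucket"]["name"]
        key = detail["object"]["key"]

        parts = key.split("/")
        if len(parts) < 3:
            # Assumption: CDC files use a <schema>/<table>/<file> layout;
            # anything else is ignored in this sketch.
            continue
        schema_name, table_name, file_name = parts[-3], parts[-2], parts[-1]

        items.append({
            # Attribute names below are hypothetical, chosen for illustration.
            "source_file_name": f"s3://{bucket}/{key}",
            "schema_name": schema_name,
            "table_name": table_name,
            "file_name": file_name,
            "file_size": detail["object"].get("size", 0),
            # Status value taken from the post.
            "file_ingestion_status": "raw_file_landed",
            "tracked_at": datetime.now(timezone.utc).isoformat(),
        })
    return items
```

In a real Lambda handler, each returned item would then be written to the DynamoDB table, for example with a batch writer.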

Component 2: File Processor

File Processor is the workhorse of the ODP framework. It processes files from the S3 staging bucket, creates source-aligned datasets in the raw S3 bucket, and adds or updates metadata for the datasets (AWS Glue tables) in the AWS Glue Data Catalog.

We use the following terminology when discussing File Processor:

  1. Refresh cadence – This represents the data ingestion frequency (for example, 10 minutes). It usually goes with AWS Glue worker type (one of G.1X, G.2X, G.4X, G.8X, G.025X, and so on) and batch size.
  2. Table configuration – This includes the Hudi configuration (primary key, partition key, pre-combined key, and table type (Copy on Write or Merge on Read)), table data storage mode (historical or current snapshot), S3 bucket used to store source-aligned datasets, AWS Glue database name, AWS Glue table name, and refresh cadence.
  3. Batch size – This numeric value is used to split tables into smaller batches and process their respective CDC files in parallel. For example, a configuration of 50 tables with a 10-minute refresh cadence and a batch size of 5 results in a total of 10 AWS Glue job runs, each processing CDC files for 5 tables.
  4. Table data storage mode – There are two options:
    • Historical – This table in the data lake stores historical updates to records (always append).
    • Current snapshot – This table in the data lake stores latest versioned records (upserts) with the ability to use Hudi time travel for historical updates.
  5. File processing state machine – It processes CDC files that belong to tables that share a common refresh cadence.
  6. EventBridge rule association with the file processing state machine – We use a dedicated EventBridge rule for each refresh cadence with the file processing state machine as target.
  7. File processing AWS Glue job – This is a configuration-driven AWS Glue extract, transform, and load (ETL) job that processes CDC files for one or more tables.
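
To make the table configuration and table data storage mode terms more concrete, the following sketch maps one table's configuration to Apache Hudi write options. The option keys are standard Hudi configuration names, but the input attribute names (glue_table_name, primary_key, and so on) are hypothetical, and mapping the historical mode to an insert operation is our assumption about one way to achieve "always append"; the framework in the repo may do this differently.

```python
def build_hudi_options(table_config):
    """Build Hudi write options from one table's configuration (sketch)."""
    mode = table_config["storage_mode"]
    if mode not in ("historical", "current_snapshot"):
        raise ValueError(f"Unknown table data storage mode: {mode}")

    # Assumption: historical tables append every change (insert), while
    # current snapshot tables keep the latest version of a record (upsert).
    operation = "insert" if mode == "historical" else "upsert"

    return {
        "hoodie.table.name": table_config["glue_table_name"],
        "hoodie.datasource.write.recordkey.field": table_config["primary_key"],
        "hoodie.datasource.write.partitionpath.field": table_config["partition_key"],
        "hoodie.datasource.write.precombine.field": table_config["precombine_key"],
        # COPY_ON_WRITE or MERGE_ON_READ
        "hoodie.datasource.write.table.type": table_config["table_type"],
        "hoodie.datasource.write.operation": operation,
        # Register the table in the catalog database named in the configuration.
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": table_config["glue_database_name"],
        "hoodie.datasource.hive_sync.table": table_config["glue_table_name"],
    }
```

Because the options are derived from configuration rather than hardcoded, the same job script can serve every table that shares a refresh cadence.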

File Processor is implemented as a state machine using AWS Step Functions. Let’s use an example to understand this. The following diagram illustrates running File Processor state machine with a configuration that includes 18 operational tables, a refresh cadence of 10 minutes, a batch size of 5, and an AWS Glue worker type of G.1X.

ODP framework component: File Processor

The workflow includes the following steps:

  1. The EventBridge rule triggers the File Processor state machine every 10 minutes.
  2. Being the first state in the state machine, the Batch Manager Lambda function reads configurations from DynamoDB tables.
  3. The Lambda function creates four batches: three of them will be mapped to five operational tables each, and the fourth one is mapped to three operational tables. Then it feeds the batches to the Step Functions Map state.
  4. For each item in the Map state, the File Processor Trigger Lambda function will be invoked, which in turn runs the File Processor AWS Glue job.
  5. Each AWS Glue job performs the following actions:
    • Checks the status of an operational table and acquires a lock when it is not processed by any other job. The odpf_file_processing_tracker DynamoDB table is used for this purpose. When a lock is acquired, it inserts a record in the DynamoDB table with the status updating_table for the first time; otherwise, it updates the record.
    • Processes the CDC files for the given operational table from the S3 staging bucket and creates a source-aligned dataset in the S3 raw bucket. It also updates technical metadata in the AWS Glue Data Catalog.
    • Updates the status of the operational table to completed in the odpf_file_processing_tracker table. In case of processing errors, it updates the status to refresh_error and logs the stack trace.
    • It also inserts this record into the odpf_file_processing_tracker_history DynamoDB table along with additional details such as insert, update, and delete row counts.
    • Moves the records that belong to successfully processed CDC files from odpf_file_tracker to the odpf_file_tracker_history table with file_ingestion_status set to raw_file_processed.
    • Moves to the next operational table in the given batch.
    • Note: a failure to process CDC files for one of the operational tables of a given batch does not impact the processing of other operational tables.
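
The batching in step 3 and the per-table status handling in step 5 can be sketched in plain Python. This is an illustration under simplifying assumptions: an in-memory dictionary stands in for the odpf_file_processing_tracker DynamoDB table (so the lock here is not atomic across jobs), and process_table is a hypothetical callable standing in for the CDC file processing logic.

```python
def make_batches(table_names, batch_size):
    """Split the list of operational tables into batches of at most batch_size."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return [
        table_names[i:i + batch_size]
        for i in range(0, len(table_names), batch_size)
    ]


def process_batch(batch, process_table, tracker):
    """Process each table in a batch; one failure does not stop the others."""
    for table in batch:
        if tracker.get(table) == "updating_table":
            # Another job holds the lock on this table, so skip it.
            continue
        tracker[table] = "updating_table"  # acquire the lock
        try:
            process_table(table)
            tracker[table] = "completed"
        except Exception:
            # The real job also logs the stack trace.
            tracker[table] = "refresh_error"
    return tracker
```

With 18 tables and a batch size of 5, make_batches returns four batches of sizes 5, 5, 5, and 3, which matches the example in this section.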

Component 3: Configuration Manager

Configuration Manager is used to insert configuration details into the odpf_batch_config and odpf_raw_table_config tables. To keep this post concise, we provide two architecture patterns in the code repo and leave the implementation details to you.

Solution overview

Let’s test the ODP framework by replicating data from 18 operational tables to a data lake and creating source-aligned datasets with 10-minute refresh cadence. We use Amazon Relational Database Service (Amazon RDS) for MySQL to set up an operational database with 18 tables, upload the New York City Taxi – Yellow Trip Data dataset, set up AWS DMS to replicate data to Amazon S3, process the files using the framework, and finally validate the data using Amazon Athena.

Create S3 buckets

For instructions on creating an S3 bucket, refer to Creating a bucket. For this post, we create the following buckets:

  1. odpf-demo-staging-EXAMPLE-BUCKET – You will use this to migrate operational data using AWS DMS
  2. odpf-demo-raw-EXAMPLE-BUCKET – You will use this to store source-aligned datasets
  3. odpf-demo-code-artifacts-EXAMPLE-BUCKET – You will use this to store code artifacts

Deploy File Manager and File Processor

Deploy File Manager and File Processor by following instructions from this README and this README, respectively.

Set up Amazon RDS for MySQL

Complete the following steps to set up Amazon RDS for MySQL as the operational data source:

  1. Provision Amazon RDS for MySQL. For instructions, refer to Create and Connect to a MySQL Database with Amazon RDS.
  2. Connect to the database instance using MySQL Workbench or DBeaver.
  3. Create a database (schema) by running the SQL command CREATE DATABASE taxi_trips;.
  4. Create 18 tables by running the SQL commands in the ops_table_sample_ddl.sql script.

Populate data to the operational data source

Complete the following steps to populate data to the operational data source:

  1. To download the New York City Taxi – Yellow Trip Data dataset for January 2021 (Parquet file), navigate to NYC TLC Trip Record Data, expand 2021, and choose Yellow Taxi Trip records. A file called yellow_tripdata_2021-01.parquet will be downloaded to your computer.
  2. On the Amazon S3 console, open the bucket odpf-demo-staging-EXAMPLE-BUCKET and create a folder called nyc_yellow_trip_data.
  3. Upload the yellow_tripdata_2021-01.parquet file to the folder.
  4. Navigate to the bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET and create a folder called glue_scripts.
  5. Download the file load_nyc_taxi_data_to_rds_mysql.py from the GitHub repo and upload it to the folder.
  6. Create an AWS Identity and Access Management (IAM) policy called load_nyc_taxi_data_to_rds_mysql_s3_policy. For instructions, refer to Creating policies using the JSON editor. Use the odpf_setup_test_data_glue_job_s3_policy.json policy definition.
  7. Create an IAM role called load_nyc_taxi_data_to_rds_mysql_glue_role. Attach the policy created in the previous step.
  8. On the AWS Glue console, create a connection for Amazon RDS for MySQL. For instructions, refer to Adding a JDBC connection using your own JDBC drivers and Setting up a VPC to connect to Amazon RDS data stores over JDBC for AWS Glue. Name the connection as odpf_demo_rds_connection.
  9. In the navigation pane of the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  10. Choose the file load_nyc_taxi_data_to_rds_mysql.py and choose Create.
  11. Complete the following steps to create your job:
    • Provide a name for the job, such as load_nyc_taxi_data_to_rds_mysql.
    • For IAM role, choose load_nyc_taxi_data_to_rds_mysql_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Advanced properties, Connections, select the connection you created earlier.
    • Under Job parameters, add the following parameters:
      • input_sample_data_path = s3://odpf-demo-staging-EXAMPLE-BUCKET/nyc_yellow_trip_data/yellow_tripdata_2021-01.parquet
      • schema_name = taxi_trips
      • table_name = table_1
      • rds_connection_name = odpf_demo_rds_connection
    • Choose Save.
  12. On the Actions menu, run the job.
  13. Go back to your MySQL Workbench or DBeaver and validate the record count by running the SQL command select count(1) row_count from taxi_trips.table_1. You will get an output of 1369769.
  14. Populate the remaining 17 tables by running the SQL commands from the populate_17_ops_tables_rds_mysql.sql script.
  15. Get the row count from the 18 tables by running the SQL commands from the ops_data_validation_query_rds_mysql.sql script. The following screenshot shows the output.
    Record volumes (for 18 Tables) in Operational Database

Configure DynamoDB tables

Complete the following steps to configure the DynamoDB tables:

  1. Download file load_ops_table_configs_to_ddb.py from the GitHub repo and upload it to the folder glue_scripts in the S3 bucket odpf-demo-code-artifacts-EXAMPLE-BUCKET.
  2. Create an IAM policy called load_ops_table_configs_to_ddb_ddb_policy. Use the odpf_setup_test_data_glue_job_ddb_policy.json policy definition.
  3. Create an IAM role called load_ops_table_configs_to_ddb_glue_role. Attach the policy created in the previous step.
  4. On the AWS Glue console, choose Glue ETL jobs, Python Shell script editor, and Upload and edit an existing script under Options.
  5. Choose the file load_ops_table_configs_to_ddb.py and choose Create.
  6. Complete the following steps to create a job:
    • Provide a name, such as load_ops_table_configs_to_ddb.
    • For IAM role, choose load_ops_table_configs_to_ddb_glue_role.
    • Set Data processing units to 1/16 DPU.
    • Under Job parameters, add the following parameters
      • batch_config_ddb_table_name = odpf_batch_config
      • raw_table_config_ddb_table_name = odpf_demo_taxi_trips_raw
      • aws_region = e.g., us-west-1
    • Choose Save.
  7. On the Actions menu, run the job.
  8. On the DynamoDB console, get the item count from the tables. You will find 1 item in the odpf_batch_config table and 18 items in the odpf_demo_taxi_trips_raw table.
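
If you prefer to check the item counts from a script instead of the console, the following sketch counts items with a paginated Scan that uses Select="COUNT". The function takes the DynamoDB client as a parameter so it can be exercised without AWS access; the function name is ours and is not part of the framework.

```python
def count_items(dynamodb_client, table_name):
    """Count the items in a DynamoDB table using a paginated COUNT scan."""
    total = 0
    kwargs = {"TableName": table_name, "Select": "COUNT"}
    while True:
        page = dynamodb_client.scan(**kwargs)
        total += page["Count"]
        last_key = page.get("LastEvaluatedKey")
        if not last_key:
            return total
        # More pages remain: continue from where the previous page stopped.
        kwargs["ExclusiveStartKey"] = last_key
```

With boto3 installed and credentials configured, you could pass boto3.client("dynamodb", region_name=...) as the client and expect 1 for odpf_batch_config and 18 for odpf_demo_taxi_trips_raw. Note that a scan reads the whole table and consumes read capacity, so this approach suits small configuration tables like these.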

Set up a database in AWS Glue

Complete the following steps to create a database:

  1. On the AWS Glue console, under Data catalog in the navigation pane, choose Databases.
  2. Create a database called odpf_demo_taxi_trips_raw.

Set up AWS DMS for CDC

Complete the following steps to set up AWS DMS for CDC:

  1. Create an AWS DMS replication instance. For Instance class, choose dms.t3.medium.
  2. Create a source endpoint for Amazon RDS for MySQL.
  3. Create target endpoint for Amazon S3. To configure the S3 endpoint settings, use the JSON definition from dms_s3_endpoint_setting.json.
  4. Create an AWS DMS task.
    • Use the source and target endpoints created in the previous steps.
    • To create AWS DMS task mapping rules, use the JSON definition from dms_task_mapping_rules.json.
    • Under Migration task startup configuration, select Automatically on create.
  5. When the AWS DMS task starts running, you will see a task summary similar to the following screenshot.
    DMS Task Summary
  6. In the Table statistics section, you will see an output similar to the following screenshot. Here, the Full load rows and Total rows columns are important metrics whose counts should match with the record volumes of the 18 tables in the operational data source.
    DMS Task Statistics
  7. After the full load completes successfully, you will find Parquet files in the S3 staging bucket: one Parquet file per table in a dedicated folder, similar to the following screenshot. You will find 17 other such folders in the bucket, one for each remaining table.
    DMS Output in S3 Staging Bucket for Table 1

File Manager output

The File Manager Lambda function consumes messages from the SQS queue, extracts metadata for the CDC files, and inserts one item per file to the odpf_file_tracker DynamoDB table. When you check the items, you will find 18 items with file_ingestion_status set to raw_file_landed, as shown in the following screenshot.

CDC Files in File Tracker DynamoDB Table

File Processor output

  1. On the subsequent tenth minute (since the activation of the EventBridge rule), the event rule triggers the File Processor state machine. On the Step Functions console, you will notice that the state machine is invoked, as shown in the following screenshot.
    File Processor State Machine Run Summary
  2. As shown in the following screenshot, the Batch Generator Lambda function creates four batches and constructs a Map state for parallel running of the File Processor Trigger Lambda function.
    File Processor State Machine Run Details
  3. Then, the File Processor Trigger Lambda function runs the File Processor Glue Job, as shown in the following screenshot.
    File Processor Glue Job Parallel Runs
  4. The File Processor Glue Job runs then create source-aligned datasets in Hudi format in the S3 raw bucket. For Table 1, you will see an output similar to the following screenshot. There will be 17 other such folders in the S3 raw bucket, one for each remaining table.
    Data in S3 raw bucket
  5. Finally, in AWS Glue Data Catalog, you will notice 18 tables created in the odpf_demo_taxi_trips_raw database, similar to the following screenshot.
    Tables in Glue Database

Data validation

Complete the following steps to validate the data:

  1. On the Amazon Athena console, open the query editor, and select a workgroup or create a new workgroup.
  2. Choose AwsDataCatalog for Data source and odpf_demo_taxi_trips_raw for Database.
  3. Run the raw_data_validation_query_athena.sql SQL query. You will get an output similar to the following screenshot.
    Raw Data Validation via Amazon Athena

Validation summary: The counts in Amazon Athena match the counts of the operational tables, which confirms that the ODP framework has processed all the files and records successfully. This concludes the demo. To test additional scenarios, refer to Extended Testing in the code repo.

Outcomes

Let’s review how the ODP framework addressed the aforementioned requirements.

  1. As discussed earlier in this post, by logically grouping tables by refresh cadence and associating them to EventBridge rules, we ensured that the source-aligned tables are refreshed by the File Processor AWS Glue jobs. With the AWS Glue worker type configuration setting, we selected the appropriate compute resources while running the AWS Glue jobs (the instances of the AWS Glue job).
  2. By applying table-specific configurations (from odpf_batch_config and odpf_raw_table_config) dynamically, we were able to use one AWS Glue job to process CDC files for 18 tables.
  3. You can use this framework to support a variety of data migration use cases that require quicker data migration from on-premises storage systems to data lakes or analytics platforms on AWS. You can reuse File Manager as is and customize File Processor to work with other storage frameworks such as Apache Iceberg, Delta Lake, and purpose-built data stores such as Amazon Aurora and Amazon Redshift.
  4. To understand how the ODP framework met the company’s disaster recovery (DR) design criterion, we first need to understand the DR architecture strategy at a high level. The DR architecture strategy has the following aspects:
    • One AWS account and two AWS Regions are used for primary and secondary environments.
    • The data lake infrastructure in the secondary Region is kept in sync with the one in the primary Region.
    • Data stored in S3 buckets, metadata stored in the AWS Glue Data Catalog, and access controls in Lake Formation are replicated from the primary to the secondary Region.
    • The data lake source and target systems have their respective DR environments.
    • CI/CD tooling (version control, CI server, and so on) is to be made highly available.
    • The DevOps team needs to be able to deploy CI/CD pipelines of analytics frameworks (such as this ODP framework) to either the primary or secondary Region.
    • As you can imagine, disaster recovery on AWS is a vast subject, so we keep our discussion to the last design aspect.

By designing the ODP framework with three components and externalizing operational table configurations to DynamoDB global tables, the company was able to deploy the framework components to the secondary Region (in the rare event of a single-Region failure) and continue to process CDC files from the point it last processed in the primary Region. Because the CDC file tracking and processing audit data is replicated to the DynamoDB replica tables in the secondary Region, the File Manager microservice and File Processor can seamlessly run.

Clean up

When you’re finished testing this framework, you can delete the provisioned AWS resources to avoid any further charges.

Conclusion

In this post, we took a real-world operational data processing use case and presented the framework we developed at AWS ProServe. We hope this post and the operational data processing framework using AWS Glue and Apache Hudi will expedite your journey in integrating operational databases into your modern data platforms built on AWS.


About the authors

Ravi Itha is a Principal Consultant at AWS Professional Services with specialization in data and analytics and generalist background in application development. Ravi helps customers with enterprise data strategy initiatives across insurance, airlines, pharmaceutical, and financial services industries. In his 6-year tenure at Amazon, Ravi has helped the AWS builder community by publishing approximately 15 open-source solutions (accessible via GitHub handle), four blogs, and reference architectures. Outside of work, he is passionate about reading India Knowledge Systems and practicing Yoga Asanas.

Srinivas Kandi is a Data Architect at AWS Professional Services. He leads customer engagements related to data lakes, analytics, and data warehouse modernizations. He enjoys reading history and civilizations.

Securely process near-real-time data from Amazon MSK Serverless using an AWS Glue streaming ETL job with IAM authentication

Post Syndicated from Shubham Purwar original https://aws.amazon.com/blogs/big-data/securely-process-near-real-time-data-from-amazon-msk-serverless-using-an-aws-glue-streaming-etl-job-with-iam-authentication/

Streaming data has become an indispensable resource for organizations worldwide because it offers real-time insights that are crucial for data analytics. The escalating velocity and magnitude of collected data has created a demand for real-time analytics. This data originates from diverse sources, including social media, sensors, logs, and clickstreams, among others. With streaming data, organizations gain a competitive edge by promptly responding to real-time events and making well-informed decisions.

In streaming applications, a prevalent approach involves ingesting data through Apache Kafka and processing it with Apache Spark Structured Streaming. However, managing, integrating, and authenticating the processing framework (Apache Spark Structured Streaming) with the ingesting framework (Kafka) poses significant challenges, necessitating a managed and serverless framework. For example, integrating and authenticating a client like Spark streaming with Kafka brokers and zookeepers using a manual TLS method requires certificate and keystore management, which is not an easy task and requires a good knowledge of TLS setup.

To address these issues effectively, we propose using Amazon Managed Streaming for Apache Kafka (Amazon MSK), a fully managed Apache Kafka service that offers a seamless way to ingest and process streaming data. In this post, we use Amazon MSK Serverless, a cluster type for Amazon MSK that makes it possible for you to run Apache Kafka without having to manage and scale cluster capacity. To further enhance security and streamline authentication and authorization processes, MSK Serverless enables you to handle both authentication and authorization using AWS Identity and Access Management (IAM) in your cluster. This integration eliminates the need for separate mechanisms for authentication and authorization, simplifying and strengthening data protection. For example, when a client tries to write to your cluster, MSK Serverless uses IAM to check whether that client is an authenticated identity and also whether it is authorized to produce to your cluster.

To process data effectively, we use AWS Glue, a serverless data integration service that uses the Spark Structured Streaming framework and enables near-real-time data processing. An AWS Glue streaming job can handle large volumes of incoming data from MSK Serverless with IAM authentication. This powerful combination ensures that data is processed securely and swiftly.

This post demonstrates how to build an end-to-end implementation that processes data from MSK Serverless using an AWS Glue streaming extract, transform, and load (ETL) job, connects the AWS Glue job to MSK Serverless with IAM authentication, and queries the data using Amazon Athena.

Solution overview

The following diagram illustrates the architecture that you implement in this post.

The workflow consists of the following steps:

  1. Create an MSK Serverless cluster with IAM authentication and an EC2 Kafka client as the producer to ingest sample data into a Kafka topic. For this post, we use the kafka-console-producer.sh Kafka console producer client.
  2. Set up an AWS Glue streaming ETL job to process the incoming data. This job extracts data from the Kafka topic, loads it into Amazon Simple Storage Service (Amazon S3), and creates a table in the AWS Glue Data Catalog. By continuously consuming data from the Kafka topic, the ETL job ensures it remains synchronized with the latest streaming data. Moreover, the job incorporates the checkpointing functionality, which tracks the processed records, enabling it to resume processing seamlessly from the point of interruption in the event of a job run failure.
  3. After processing, the streaming job stores the data in Amazon S3 and generates a Data Catalog table, which acts as a metadata layer for the data. To interact with the data stored in Amazon S3, you can use Athena, a serverless interactive query service. Athena lets you run SQL queries on the data for exploration and analysis.

For this post, we create the solution resources in the us-east-1 Region using AWS CloudFormation templates. In the following sections, we show you how to configure your resources and implement the solution.

Configure resources with AWS CloudFormation

In this post, you use the following two CloudFormation templates. Using two templates decouples the creation of ingestion resources from the creation of processing resources, so you can create only the resources your use case requires.

  • vpc-mskserverless-client.yaml – This template sets up the data ingestion service resources such as a VPC, MSK Serverless cluster, and S3 bucket
  • gluejob-setup.yaml – This template sets up the data processing resources such as the AWS Glue table, database, connection, and streaming job

Create data ingestion resources

The vpc-mskserverless-client.yaml stack creates a VPC, private and public subnets, security groups, S3 VPC Endpoint, MSK Serverless cluster, EC2 instance with Kafka client, and S3 bucket. To create the solution resources for data ingestion, complete the following steps:

  1. Launch the stack vpc-mskserverless-client using the CloudFormation template:
  2. Provide the parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | Environment name that is prefixed to resource names | . |
| PrivateSubnet1CIDR | IP range (CIDR notation) for the private subnet in the first Availability Zone | . |
| PrivateSubnet2CIDR | IP range (CIDR notation) for the private subnet in the second Availability Zone | . |
| PublicSubnet1CIDR | IP range (CIDR notation) for the public subnet in the first Availability Zone | . |
| PublicSubnet2CIDR | IP range (CIDR notation) for the public subnet in the second Availability Zone | . |
| VpcCIDR | IP range (CIDR notation) for this VPC | . |
| InstanceType | Instance type for the EC2 instance | t2.micro |
| LatestAmiId | AMI used for the EC2 instance | /aws/service/ami-amazon-linux-latest/amzn2-ami-hvm-x86_64-gp2 |

  3. When the stack creation is complete, retrieve the EC2 instance PublicDNS from the vpc-mskserverless-client stack’s Outputs tab.

The stack creation process can take around 15 minutes to complete.

  1. On the Amazon EC2 console, access the EC2 instance that you created using the CloudFormation template.
  2. Choose the EC2 instance whose InstanceId is shown on the stack’s Outputs tab.

Next, you log in to the EC2 instance using Session Manager, a capability of AWS Systems Manager.

  1. On the Amazon EC2 console, select the instanceid and on the Session Manager tab, choose Connect.


After you log in to the EC2 instance, you create a Kafka topic in the MSK Serverless cluster from the EC2 instance.

  1. In the following export command, provide the MSKBootstrapServers value from the vpc-mskserverless-client stack output for your endpoint:
    $ sudo su - ec2-user
    $ BS=<your-msk-serverless-endpoint (e.g.) boot-xxxxxx.yy.kafka-serverless.us-east-1.a>

  2. Run the following command on the EC2 instance to create a topic called msk-serverless-blog. The Kafka client is already installed in the ec2-user home directory (/home/ec2-user).
    $ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-topics.sh \
    --bootstrap-server $BS \
    --command-config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
    --create --topic msk-serverless-blog \
    --partitions 1
    
    Created topic msk-serverless-blog
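
The client.properties file passed with --command-config is what tells the Kafka client to authenticate with IAM. The CloudFormation template creates this file on the EC2 instance, so you don't need to write it yourself; the following sketch only shows what such a file typically contains, based on the standard settings of the Amazon MSK IAM authentication library. The exact contents on your instance may differ, and the helper name render_properties is ours.

```python
# Typical Kafka client settings for IAM authentication with Amazon MSK.
# They require the aws-msk-iam-auth JAR to be on the Kafka client's classpath.
IAM_CLIENT_PROPERTIES = {
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "AWS_MSK_IAM",
    "sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;",
    "sasl.client.callback.handler.class":
        "software.amazon.msk.auth.iam.IAMClientCallbackHandler",
}


def render_properties(props):
    """Render a dict as Java-style key=value properties text."""
    return "\n".join(f"{key}={value}" for key, value in props.items()) + "\n"
```

Calling render_properties(IAM_CLIENT_PROPERTIES) returns the text you would expect to see if you inspect the file on the instance.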

After you confirm the topic creation, you can push the data to the MSK Serverless cluster.

  1. Run the following command on the EC2 instance to create a console producer to produce records to the Kafka topic. (For source data, we use nycflights.csv, which is already downloaded to the ec2-user home directory /home/ec2-user.)
$ /home/ec2-user/kafka_2.12-2.8.1/bin/kafka-console-producer.sh \
--broker-list $BS \
--producer.config /home/ec2-user/kafka_2.12-2.8.1/bin/client.properties \
--topic msk-serverless-blog < nycflights.csv

Next, you set up the data processing service resources, specifically AWS Glue components like the database, table, and streaming job to process the data.

Create data processing resources

The gluejob-setup.yaml CloudFormation template creates a database, table, AWS Glue connection, and AWS Glue streaming job. Retrieve the values for VpcId, GluePrivateSubnet, GlueconnectionSubnetAZ, SecurityGroup, S3BucketForOutput, and S3BucketForGlueScript from the vpc-mskserverless-client stack’s Outputs tab to use in this template. Complete the following steps:

  1. Launch the stack gluejob-setup:

  2. Provide parameter values as listed in the following table.

| Parameter | Description | Sample value |
| --- | --- | --- |
| EnvironmentName | Environment name that is prefixed to resource names. | Gluejob-setup |
| VpcId | ID of the VPC for security group. Use the VPC ID created with the first stack. | Refer to the first stack’s output. |
| GluePrivateSubnet | Private subnet used for creating the AWS Glue connection. | Refer to the first stack’s output. |
| SecurityGroupForGlueConnection | Security group used by the AWS Glue connection. | Refer to the first stack’s output. |
| GlueconnectionSubnetAZ | Availability Zone for the first private subnet used for the AWS Glue connection. | . |
| GlueDataBaseName | Name of the AWS Glue Data Catalog database. | glue_kafka_blog_db |
| GlueTableName | Name of the AWS Glue Data Catalog table. | blog_kafka_tbl |
| S3BucketNameForScript | Bucket name for the AWS Glue ETL script. | Use the S3 bucket name from the previous stack. For example, aws-gluescript-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| GlueWorkerType | Worker type for AWS Glue job. For example, G.1X. | G.1X |
| NumberOfWorkers | Number of workers in the AWS Glue job. | 3 |
| S3BucketNameForOutput | Bucket name for writing data from the AWS Glue job. | aws-glueoutput-${AWS::AccountId}-${AWS::Region}-${EnvironmentName} |
| TopicName | MSK topic name that needs to be processed. | msk-serverless-blog |
| MSKBootstrapServers | Kafka bootstrap server. | boot-30vvr5lg.c1.kafka-serverless.us-east-1.amazonaws.com:9098 |

The stack creation process can take around 1–2 minutes to complete. You can check the Outputs tab for the stack after the stack is created.

In the gluejob-setup stack, we created a Kafka type AWS Glue connection, which consists of broker information like the MSK bootstrap server, topic name, and VPC in which the MSK Serverless cluster is created. Most importantly, it specifies the IAM authentication option, which helps AWS Glue authenticate and authorize using IAM authentication while consuming the data from the MSK topic. For further clarity, you can examine the AWS Glue connection and the associated AWS Glue table generated through AWS CloudFormation.

After successfully creating the CloudFormation stack, you can now proceed with processing data using the AWS Glue streaming job with IAM authentication.

Run the AWS Glue streaming job

To process the data from the MSK topic using the AWS Glue streaming job that you set up in the previous section, complete the following steps:

  1. On the CloudFormation console, choose the stack gluejob-setup.
  2. On the Outputs tab, retrieve the name of the AWS Glue streaming job from the GlueJobName row. In the following screenshot, the name is GlueStreamingJob-glue-streaming-job.

  3. On the AWS Glue console, choose ETL jobs in the navigation pane.
  4. Search for the AWS Glue streaming job named GlueStreamingJob-glue-streaming-job.
  5. Choose the job name to open its details page.
  6. Choose Run to start the job.
  7. On the Runs tab, confirm that the job is running without errors.

  8. Retrieve the OutputBucketName from the gluejob-setup template outputs.
  9. On the Amazon S3 console, navigate to the S3 bucket to verify the data.

  10. On the AWS Glue console, choose the AWS Glue streaming job you ran, then choose Stop job run.

Because this is a streaming job, it will continue to run indefinitely until manually stopped. After you verify the data is present in the S3 output bucket, you can stop the job to save cost.
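
The console steps above can also be scripted. The following is a minimal sketch, assuming you pass in boto3 clients created with boto3.client("glue") and boto3.client("s3"). The function names are our own, and the polling interval and retry count are arbitrary choices rather than recommended values.

```python
import time


def run_until_running(glue, job_name, poll_seconds=15, max_polls=40):
    """Start the streaming job and poll until it reports RUNNING; return the run ID."""
    run_id = glue.start_job_run(JobName=job_name)["JobRunId"]
    for _ in range(max_polls):
        run = glue.get_job_run(JobName=job_name, RunId=run_id)["JobRun"]
        state = run["JobRunState"]
        if state == "RUNNING":
            return run_id
        if state in ("FAILED", "ERROR", "TIMEOUT", "STOPPED"):
            raise RuntimeError(f"Job run {run_id} ended in state {state}")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Job run {run_id} did not reach RUNNING")


def bucket_has_data(s3, bucket_name):
    """Return True if the output bucket contains at least one object."""
    response = s3.list_objects_v2(Bucket=bucket_name, MaxKeys=1)
    return response.get("KeyCount", 0) > 0


def stop_run(glue, job_name, run_id):
    """Stop the streaming job run so that it no longer accrues cost."""
    glue.batch_stop_job_run(JobName=job_name, JobRunIds=[run_id])
```

A typical sequence would be to call run_until_running with the job name from the stack outputs, wait until bucket_has_data returns True for the OutputBucketName bucket, and then call stop_run.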

Validate the data in Athena

After the AWS Glue streaming job has successfully created the table for the processed data in the Data Catalog, follow these steps to validate the data using Athena:

  1. On the Athena console, navigate to the query editor.
  2. Choose the Data Catalog as the data source.
  3. Choose the database and table that the AWS Glue streaming job created.
  4. To validate the data, run the following query to find the flight number, origin, and destination that covered the highest distance in a year:
SELECT DISTINCT flight, distance, origin, dest, year
FROM "glue_kafka_blog_db"."output"
WHERE distance = (SELECT MAX(distance) FROM "glue_kafka_blog_db"."output")

The following screenshot shows the output of our example query.
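
To see what the validation query does without running it in Athena, you can reproduce its logic locally. The following sketch uses Python's built-in sqlite3 module and a handful of made-up rows. The column types and all sample values are hypothetical and are not taken from the dataset used in this post.

```python
import sqlite3

# In-memory database standing in for the Athena table (illustration only).
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE output ("
    "flight INTEGER, distance INTEGER, origin TEXT, dest TEXT, year INTEGER)"
)

# Hypothetical rows; the longest flight appears twice to show what DISTINCT does.
rows = [
    (101, 500, "AAA", "BBB", 2013),
    (202, 2400, "AAA", "CCC", 2013),
    (202, 2400, "AAA", "CCC", 2013),
    (303, 1200, "BBB", "CCC", 2013),
]
conn.executemany("INSERT INTO output VALUES (?, ?, ?, ?, ?)", rows)

# Same shape as the Athena query: keep only rows whose distance equals the
# maximum distance in the table, and collapse exact duplicates.
query = """
SELECT DISTINCT flight, distance, origin, dest, year
FROM output
WHERE distance = (SELECT MAX(distance) FROM output)
"""
result = conn.execute(query).fetchall()
print(result)  # [(202, 2400, 'AAA', 'CCC', 2013)]
```

The subquery computes the single largest distance, and DISTINCT removes the repeated records that a streaming source can produce for the same flight.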

Clean up

To clean up your resources, complete the following steps:

  1. Delete the CloudFormation stack gluejob-setup.
  2. Delete the CloudFormation stack vpc-mskserverless-client.
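
The same cleanup can be scripted. The following is a minimal sketch, assuming a boto3 CloudFormation client created with boto3.client("cloudformation"). The function name is our own. It deletes the stacks in the order listed above and waits for each deletion to finish before starting the next.

```python
# Stack names from this post, in the order they should be deleted.
STACKS = ["gluejob-setup", "vpc-mskserverless-client"]


def delete_stacks_in_order(cfn, stack_names):
    """Delete each stack and block until CloudFormation reports it is gone."""
    waiter = cfn.get_waiter("stack_delete_complete")
    for name in stack_names:
        cfn.delete_stack(StackName=name)
        # Wait before moving on, because the second stack holds the network
        # resources that the first stack depends on.
        waiter.wait(StackName=name)
```

If a stack deletion fails, for example because an S3 bucket created by the stack still contains objects, you may need to empty the bucket and retry.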

Conclusion

In this post, we demonstrated how to build a serverless streaming ETL pipeline with IAM authentication, so that you can focus on the outcomes of your analytics rather than on managing infrastructure. You can also extend the AWS Glue streaming ETL code in this post with transformations and mappings so that only valid data is loaded to Amazon S3. This solution lets you use AWS Glue streaming together with MSK Serverless through the IAM authentication method.

Appendix

This section provides more information about how to create the AWS Glue connection on the AWS Glue console. The connection establishes access to the MSK Serverless cluster and allows the AWS Glue streaming job to authenticate and authorize using IAM authentication while consuming data from the MSK topic.

  1. On the AWS Glue console, in the navigation pane, under Data catalog, choose Connections.
  2. Choose Create connection.
  3. For Connection name, enter a unique name for your connection.
  4. For Connection type, choose Kafka.
  5. For Connection access, select Amazon managed streaming for Apache Kafka (MSK).
  6. For Kafka bootstrap server URLs, enter a comma-separated list of bootstrap server URLs. Include the port number. For example, boot-xxxxxxxx.c2.kafka-serverless.us-east-1.amazonaws.com:9098.

  7. For Authentication, choose IAM Authentication.
  8. Select Require SSL connection.
  9. For VPC, choose the VPC that contains your data source.
  10. For Subnet, choose the private subnet within your VPC.
  11. For Security groups, choose a security group to allow access to the data store in your VPC subnet.

Security groups are associated with the elastic network interface (ENI) attached to your subnet. You must choose at least one security group with a self-referencing inbound rule for all TCP ports.

  12. Choose Save changes.

After you create the AWS Glue connection, you can use the AWS Glue streaming job to consume data from the MSK topic using IAM authentication.
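
The console choices above correspond to fields in the ConnectionInput structure of the AWS Glue CreateConnection API. The following is a minimal sketch of that structure, not the definition used by the CloudFormation template in this post. The function name is our own, and the subnet and security group IDs are placeholders. To the best of our knowledge, the property keys shown are the ones AWS Glue uses for Kafka connections with IAM authentication, but verify them against the AWS Glue API reference before relying on them.

```python
def build_msk_iam_connection_input(name, bootstrap_servers, subnet_id,
                                   security_group_ids, availability_zone=None):
    """Build a ConnectionInput dict for a Kafka connection that uses IAM auth."""
    requirements = {
        "SubnetId": subnet_id,
        "SecurityGroupIdList": list(security_group_ids),
    }
    if availability_zone:
        requirements["AvailabilityZone"] = availability_zone
    return {
        "Name": name,
        "ConnectionType": "KAFKA",
        "ConnectionProperties": {
            # Comma-separated list, each entry including the port number.
            "KAFKA_BOOTSTRAP_SERVERS": ",".join(bootstrap_servers),
            # Corresponds to "Require SSL connection" on the console.
            "KAFKA_SSL_ENABLED": "true",
            # Corresponds to "IAM Authentication" on the console.
            "KAFKA_SASL_MECHANISM": "AWS_MSK_IAM",
        },
        "PhysicalConnectionRequirements": requirements,
    }


# Placeholder values for illustration; substitute your own resources.
connection_input = build_msk_iam_connection_input(
    name="my-msk-iam-connection",
    bootstrap_servers=["boot-xxxxxxxx.c2.kafka-serverless.us-east-1.amazonaws.com:9098"],
    subnet_id="subnet-EXAMPLE",
    security_group_ids=["sg-EXAMPLE"],
)

# With boto3 installed and credentials configured:
#   boto3.client("glue").create_connection(ConnectionInput=connection_input)
```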


About the authors

Shubham Purwar is a Cloud Engineer (ETL) at AWS Bengaluru specialized in AWS Glue and Amazon Athena. He is passionate about helping customers solve issues related to their ETL workload and implement scalable data processing and analytics pipelines on AWS. In his free time, Shubham loves to spend time with his family and travel around the world.

Nitin Kumar is a Cloud Engineer (ETL) at AWS with a specialization in AWS Glue. He is dedicated to assisting customers in resolving issues related to their ETL workloads and creating scalable data processing and analytics pipelines on AWS.