All posts by Pathik Shah

Use Amazon Athena with Spark SQL for your open-source transactional table formats

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/use-amazon-athena-with-spark-sql-for-your-open-source-transactional-table-formats/

AWS-powered data lakes, supported by the unmatched availability of Amazon Simple Storage Service (Amazon S3), can handle the scale, agility, and flexibility required to combine different data and analytics approaches. As data lakes have grown in size and matured in usage, a significant amount of effort can be spent keeping the data consistent with business events. To ensure files are updated in a transactionally consistent manner, a growing number of customers are using open-source transactional table formats such as Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake that help you store data with high compression rates, natively interface with your applications and frameworks, and simplify incremental data processing in data lakes built on Amazon S3. These formats enable ACID (atomicity, consistency, isolation, durability) transactions, upserts, and deletes, and advanced features such as time travel and snapshots that were previously only available in data warehouses. Each storage format implements this functionality in slightly different ways; for a comparison, refer to Choosing an open table format for your transactional data lake on AWS.

In 2023, AWS announced general availability for Apache Iceberg, Apache Hudi, and Linux Foundation Delta Lake in Amazon Athena for Apache Spark, which removes the need to install a separate connector or associated dependencies and manage versions, and simplifies the configuration steps required to use these frameworks.

In this post, we show you how to use Spark SQL in Amazon Athena notebooks and work with Iceberg, Hudi, and Delta Lake table formats. We demonstrate common operations such as creating databases and tables, inserting data into the tables, querying data, and looking at snapshots of the tables in Amazon S3 using Spark SQL in Athena.

Prerequisites

Complete the following prerequisites:

Download and import example notebooks from Amazon S3

To follow along, download the notebooks discussed in this post from the following locations:

After you download the notebooks, import them into your Athena Spark environment by following the To import a notebook section in Managing notebook files.

Navigate to specific Open Table Format section

If you are interested in the Iceberg table format, navigate to the Working with Apache Iceberg tables section.

If you are interested in the Hudi table format, navigate to the Working with Apache Hudi tables section.

If you are interested in the Delta Lake table format, navigate to the Working with Linux Foundation Delta Lake tables section.

Working with Apache Iceberg tables

When using Spark notebooks in Athena, you can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell’s behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena.

In this section, we show how you can use SQL on Apache Spark for Athena to create, analyze, and manage Apache Iceberg tables.

Set up a notebook session

To use Apache Iceberg in Athena, expand the Apache Spark properties section while creating or editing a session and select the Apache Iceberg option. This pre-populates the properties, as shown in the following screenshot.

This image shows the Apache Iceberg properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_iceberg.ipynb file to follow along.

Create a database and Iceberg table

First, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called icebergdb:

%%sql
CREATE DATABASE icebergdb

Next, in the database icebergdb, we create an Iceberg table called noaa_iceberg pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE icebergdb.noaa_iceberg(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING iceberg
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaaiceberg/'

Insert data into the table

To populate the noaa_iceberg Iceberg table, we insert data from the Parquet table sparkblogdb.noaa_pq that was created as part of the prerequisites. You can do this using an INSERT INTO statement in Spark:

%%sql
INSERT INTO icebergdb.noaa_iceberg select * from sparkblogdb.noaa_pq

Alternatively, you can use CREATE TABLE AS SELECT with the USING iceberg clause to create an Iceberg table and insert data from a source table in one step:

%%sql
CREATE TABLE icebergdb.noaa_iceberg
USING iceberg
PARTITIONED BY (year)
AS SELECT * FROM sparkblogdb.noaa_pq

Query the Iceberg table

Now that the data is inserted in the Iceberg table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature by year for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

We get the following output.


Update data in the Iceberg table

Let’s look at how to update data in our table. We want to update the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. Using Spark SQL, we can run an UPDATE statement against the Iceberg table:

%%sql
UPDATE icebergdb.noaa_iceberg
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can then run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from icebergdb.noaa_iceberg
where name = 'Sea-Tac'
group by 1,2

We get the following output.


Compact data files

Open table formats like Iceberg work by creating delta changes in file storage and tracking table versions through manifest files. More data files lead to more metadata stored in manifest files, and many small data files often cause an unnecessary amount of metadata, resulting in less efficient queries and higher Amazon S3 access costs. Running Iceberg’s rewrite_data_files procedure in Spark for Athena compacts data files, combining many small delta change files into a smaller set of read-optimized Parquet files. Compacting files speeds up the read operation when the table is queried. To run compaction on our table, run the following Spark SQL:

%%sql
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy=>'sort', sort_order => 'zorder(name)')

rewrite_data_files offers options to specify your sort strategy, which can help reorganize and compact data.
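For example, the following is a minimal sketch that uses the default binpack strategy with a target file size option; the option value is illustrative and can be tuned for your table:

%%sql
-- binpack (the default strategy) combines small files without sorting;
-- the 128 MB target-file-size-bytes value below is illustrative
CALL spark_catalog.system.rewrite_data_files
(table => 'icebergdb.noaa_iceberg', strategy => 'binpack', options => map('target-file-size-bytes','134217728'))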

List table snapshots

Each write, update, delete, upsert, and compaction operation on an Iceberg table creates a new snapshot of a table while keeping the old data and metadata around for snapshot isolation and time travel. To list the snapshots of an Iceberg table, run the following Spark SQL statement:

%%sql
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg.snapshots
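If your Athena Spark session runs Spark 3.3 or later, you can also time travel to one of these snapshots directly in Spark SQL. The following is a sketch; <snapshot-id> is a placeholder that you replace with a snapshot_id value returned by the previous query:

%%sql
-- <snapshot-id> is a placeholder; use a snapshot_id from the snapshots query above
SELECT *
FROM spark_catalog.icebergdb.noaa_iceberg VERSION AS OF <snapshot-id>
LIMIT 10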

Expire old snapshots

Regularly expiring snapshots is recommended to delete data files that are no longer needed and to keep the size of table metadata small. Expiring snapshots never removes files that are still required by a non-expired snapshot. In Spark for Athena, run the following SQL to expire snapshots for the table icebergdb.noaa_iceberg that are older than a specific timestamp:

%%sql
CALL spark_catalog.system.expire_snapshots
('icebergdb.noaa_iceberg', TIMESTAMP '2023-11-30 00:00:00.000')

Note that the timestamp value is specified as a string in format yyyy-MM-dd HH:mm:ss.fff. The output will give a count of the number of data and metadata files deleted.

Drop the table and database

You can run the following Spark SQL to clean up the Iceberg tables and associated data in Amazon S3 from this exercise:

%%sql
DROP TABLE icebergdb.noaa_iceberg PURGE

Run the following Spark SQL to remove the database icebergdb:

%%sql
DROP DATABASE icebergdb

To learn more about all the operations you can perform on Iceberg tables using Spark for Athena, refer to Spark Queries and Spark Procedures in the Iceberg documentation.

Working with Apache Hudi tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Apache Hudi tables.

Set up a notebook session

To use Apache Hudi in Athena, expand the Apache Spark properties section while creating or editing a session and select the Apache Hudi option.

This image shows the Apache Hudi properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_hudi.ipynb file to follow along.

Create a database and Hudi table

First, we create a database called hudidb in the AWS Glue Data Catalog, followed by the Hudi table creation:

%%sql
CREATE DATABASE hudidb

We create a Hudi table pointing to a location in Amazon S3 where we will load the data. Note that the table is a copy-on-write table, defined by type = 'cow' in the table DDL. We define station and date as the composite primary key (primaryKey) and year as the preCombineField. The table is also partitioned on year. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE hudidb.noaa_hudi(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string,
year string)
USING HUDI
PARTITIONED BY (year)
TBLPROPERTIES(
primaryKey = 'station, date',
preCombineField = 'year',
type = 'cow'
)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaahudi/'

Insert data into the table

Like with Iceberg, we use the INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO hudidb.noaa_hudi select * from sparkblogdb.noaa_pq

Query the Hudi table

Now that the table is created and populated, let’s run a query to find the maximum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Hudi table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_hudi table:

%%sql
UPDATE hudidb.noaa_hudi
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We run the previous SELECT query to find the maximum recorded temperature for the 'Sea-Tac' location:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi
where name = 'Sea-Tac'
group by 1,2

Run time travel queries

We can use time travel queries in SQL on Athena to analyze past data snapshots. For example:

%%sql
select name, year, max(MAX) as maximum_temperature
from hudidb.noaa_hudi timestamp as of '2023-12-01 23:53:43.100'
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

This query checks the Seattle-Tacoma Airport temperature data as of a specific time in the past. The timestamp clause lets us travel back without altering current data. Note that the timestamp value is specified as a string in the format yyyy-MM-dd HH:mm:ss.fff.
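To find commit timestamps that you can use as time travel targets, you can list the table’s recent commits. The following is a sketch that assumes the show_commits procedure is available in your Hudi version:

%%sql
-- assumes the show_commits procedure is available in your Hudi version
CALL show_commits(table => 'hudidb.noaa_hudi', limit => 5)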

Optimize query speed with clustering

To improve query performance, you can perform clustering on Hudi tables using SQL in Spark for Athena:

%%sql
CALL run_clustering(table => 'hudidb.noaa_hudi', order => 'name')

Compact tables

Compaction is a table service employed by Hudi on Merge On Read (MOR) tables to periodically merge updates from row-based log files into the corresponding columnar base file, producing a new version of the base file. Compaction doesn’t apply to Copy On Write (COW) tables like the one we created earlier; it’s only relevant for MOR tables. You can run the following query in Spark for Athena to perform compaction on a MOR table:

%%sql
CALL run_compaction(op => 'run', table => 'hudi_table_mor');

Drop the table and database

Run the following Spark SQL to remove the Hudi table you created and associated data from the Amazon S3 location:

%%sql
DROP TABLE hudidb.noaa_hudi PURGE

Run the following Spark SQL to remove the database hudidb:

%%sql
DROP DATABASE hudidb

To learn about all the operations you can perform on Hudi tables using Spark for Athena, refer to SQL DDL and Procedures in the Hudi documentation.

Working with Linux Foundation Delta Lake tables

Next, we show how you can use SQL on Spark for Athena to create, analyze, and manage Delta Lake tables.

Set up a notebook session

To use Delta Lake in Spark for Athena, expand the Apache Spark properties section while creating or editing a session and select Linux Foundation Delta Lake.

This image shows the Delta Lake properties set while creating a Spark session in Athena.

For steps, see Editing session details or Creating your own notebook.

The code used in this section is available in the SparkSQL_delta.ipynb file to follow along.

Create a database and Delta Lake table

In this section, we create a database in the AWS Glue Data Catalog. With the following SQL, we can create a database called deltalakedb:

%%sql
CREATE DATABASE deltalakedb

Next, in the database deltalakedb, we create a Delta Lake table called noaa_delta pointing to a location in Amazon S3 where we will load the data. Run the following statement and replace the location s3://<your-S3-bucket>/<prefix>/ with your S3 bucket and prefix:

%%sql
CREATE TABLE deltalakedb.noaa_delta(
station string,
date string,
latitude string,
longitude string,
elevation string,
name string,
temp string,
temp_attributes string,
dewp string,
dewp_attributes string,
slp string,
slp_attributes string,
stp string,
stp_attributes string,
visib string,
visib_attributes string,
wdsp string,
wdsp_attributes string,
mxspd string,
gust string,
max string,
max_attributes string,
min string,
min_attributes string,
prcp string,
prcp_attributes string,
sndp string,
frshtt string)
USING delta
PARTITIONED BY (year string)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadelta/'

Insert data into the table

We use an INSERT INTO statement to populate the table by reading data from the sparkblogdb.noaa_pq table created in the previous post:

%%sql
INSERT INTO deltalakedb.noaa_delta select * from sparkblogdb.noaa_pq

You can also use CREATE TABLE AS SELECT to create a Delta Lake table and insert data from a source table in one query.
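For example, the following sketch creates a new table partitioned by year and populates it from the source Parquet table in a single statement; the table name noaa_delta_ctas and the S3 prefix are illustrative:

%%sql
-- the table name and S3 prefix below are illustrative; adjust them for your environment
CREATE TABLE deltalakedb.noaa_delta_ctas
USING delta
PARTITIONED BY (year)
LOCATION 's3://<your-S3-bucket>/<prefix>/noaadeltactas/'
AS SELECT * FROM sparkblogdb.noaa_pq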

Query the Delta Lake table

Now that the data is inserted in the Delta Lake table, we can start analyzing it. Let’s run a Spark SQL query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'SEATTLE TACOMA AIRPORT, WA US'
group by 1,2

Update data in the Delta Lake table

Let’s change the station name 'SEATTLE TACOMA AIRPORT, WA US' to 'Sea-Tac'. We can run an UPDATE statement on Spark for Athena to update the records of the noaa_delta table:

%%sql
UPDATE deltalakedb.noaa_delta
SET name = 'Sea-Tac'
WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'

We can run the previous SELECT query to find the minimum recorded temperature for the 'Sea-Tac' location, and the result should be the same as earlier:

%%sql
select name, year, min(MIN) as minimum_temperature
from deltalakedb.noaa_delta
where name = 'Sea-Tac'
group by 1,2

Compact data files

In Spark for Athena, you can run OPTIMIZE on the Delta Lake table, which will compact the small files into larger files, so the queries are not burdened by the small file overhead. To perform the compaction operation, run the following query:

%%sql
OPTIMIZE deltalakedb.noaa_delta

Refer to Optimizations in the Delta Lake documentation for different options available while running OPTIMIZE.
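For example, if your session’s Delta Lake version supports Z-ordering (Delta Lake 2.0 and later), the following sketch compacts only the 2022 partition and co-locates rows with the same station name, which can speed up queries that filter on name:

%%sql
-- requires a Delta Lake version with Z-ordering support; year is the table's partition column
OPTIMIZE deltalakedb.noaa_delta WHERE year = '2022' ZORDER BY (name)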

Remove files no longer referenced by a Delta Lake table

You can remove files stored in Amazon S3 that are no longer referenced by a Delta Lake table and are older than the retention threshold by running the VACUUM command on the table using Spark for Athena:

%%sql
VACUUM deltalakedb.noaa_delta

Refer to Remove files no longer referenced by a Delta table in the Delta Lake documentation for options available with VACUUM.
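For example, the following sketch retains the files needed for the last 7 days (168 hours) of table history, which matches Delta Lake’s default retention threshold:

%%sql
-- 168 hours equals the default 7-day retention threshold
VACUUM deltalakedb.noaa_delta RETAIN 168 HOURS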

Drop the table and database

Run the following Spark SQL to remove the Delta Lake table you created:

%%sql
DROP TABLE deltalakedb.noaa_delta

Run the following Spark SQL to remove the database deltalakedb:

%%sql
DROP DATABASE deltalakedb

Running the DROP TABLE and DROP DATABASE DDL statements deletes the metadata for the Delta Lake table and database, but doesn’t automatically delete the data files in Amazon S3. You can run the following Python code in the notebook’s cell to delete the data from the S3 location:

import boto3

s3 = boto3.resource('s3')
bucket = s3.Bucket('<your-S3-bucket>')
bucket.objects.filter(Prefix="<prefix>/noaadelta/").delete()

To learn more about the SQL statements that you can run on a Delta Lake table using Spark for Athena, refer to the quickstart in the Delta Lake documentation.

Conclusion

This post demonstrated how to use Spark SQL in Athena notebooks to create databases and tables, insert and query data, and perform common operations like updates, compactions, and time travel on Hudi, Delta Lake, and Iceberg tables. Open table formats add ACID transactions, upserts, and deletes to data lakes, overcoming limitations of raw object storage. By removing the need to install separate connectors, Spark on Athena’s built-in integration reduces configuration steps and management overhead when using these popular frameworks for building reliable data lakes on Amazon S3. To learn more about selecting an open table format for your data lake workloads, refer to Choosing an open table format for your transactional data lake on AWS.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Run Spark SQL on Amazon Athena Spark

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/run-spark-sql-on-amazon-athena-spark/

At AWS re:Invent 2022, Amazon Athena launched support for Apache Spark. With this launch, Amazon Athena supports two open-source query engines: Apache Spark and Trino. Athena Spark allows you to build Apache Spark applications using a simplified notebook experience on the Athena console or through Athena APIs. Athena Spark notebooks support PySpark and notebook magics to allow you to work with Spark SQL. For interactive applications, Athena Spark allows you to spend less time waiting and be more productive, with application startup time in under a second. And because Athena is serverless and fully managed, you can run your workloads without worrying about the underlying infrastructure.

Modern applications store massive amounts of data on Amazon Simple Storage Service (Amazon S3) data lakes, providing cost-effective and highly durable storage, and allowing you to run analytics and machine learning (ML) from your data lake to generate insights on your data. Before you run these workloads, most customers run SQL queries to interactively extract, filter, join, and aggregate data into a shape that can be used for decision-making, model training, or inference. Running SQL on data lakes is fast, and Athena provides an optimized, Trino- and Presto-compatible API that includes a powerful optimizer. In addition, organizations across multiple industries such as financial services, healthcare, and retail are adopting Apache Spark, a popular open-source, distributed processing system that is optimized for fast analytics and advanced transformations against data of any size. With support in Athena for Apache Spark, you can use both Spark SQL and PySpark in a single notebook to generate application insights or build models. Start with Spark SQL to extract, filter, and project attributes that you want to work with. Then to perform more complex data analysis such as regression tests and time series forecasting, you can use Apache Spark with Python, which allows you to take advantage of a rich ecosystem of libraries, including data visualization in Matplotlib, Seaborn, and Plotly.

In this first post of a three-part series, we show you how to get started using Spark SQL in Athena notebooks. We demonstrate querying databases and tables in Amazon S3 and the AWS Glue Data Catalog using Spark SQL in Athena. We cover some common and advanced SQL commands used in Spark SQL, and show you how to use Python to extend your functionality with user-defined functions (UDFs) as well as to visualize queried data. In the next post, we’ll show you how to use Athena Spark with open-source transactional table formats. In the third post, we’ll cover analyzing data sources other than Amazon S3 using Athena Spark.

Prerequisites

To get started, you will need the following:

Provide Athena Spark access to your data through an IAM role

As you proceed through this walkthrough, we create new databases and tables. By default, Athena Spark doesn’t have permission to do this. To provide this access, you can add the following inline policy to the AWS Identity and Access Management (IAM) role attached to the workgroup, providing the region and your account number. For more information, refer to the section To embed an inline policy for a user or role (console) in Adding IAM identity permissions (console).

{
  "Version": "2012-10-17",
  "Statement": [
      {
          "Sid": "ReadfromPublicS3",
          "Effect": "Allow",
          "Action": [
              "s3:GetObject",
              "s3:ListBucket"
          ],
          "Resource": [
              "arn:aws:s3:::athena-examples-us-east-1/*",
              "arn:aws:s3:::athena-examples-us-east-1"
          ]
      },
      {
            "Sid": "GlueReadDatabases",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabases"
            ],
            "Resource": "arn:aws:glue:<region>:<account-id>:*"
        },
        {
            "Sid": "GlueReadDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:GetDatabase",
                "glue:GetTable",
                "glue:GetTables",
                "glue:GetPartition",
                "glue:GetPartitions"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*",
                "arn:aws:glue:<region>:<account-id>:database/default"
            ]
        },
        {
            "Sid": "GlueCreateDatabase",
            "Effect": "Allow",
            "Action": [
                "glue:CreateDatabase"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb"
            ]
        },
        {
            "Sid": "GlueDeleteDatabase",
            "Effect": "Allow",
            "Action": "glue:DeleteDatabase",
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"            ]
        },
        {
            "Sid": "GlueCreateDeleteTablePartitions",
            "Effect": "Allow",
            "Action": [
                "glue:CreateTable",
                "glue:UpdateTable",
                "glue:DeleteTable",
                "glue:BatchCreatePartition",
                "glue:CreatePartition",
                "glue:DeletePartition",
                "glue:BatchDeletePartition",
                "glue:UpdatePartition",
                "glue:GetPartition",
                "glue:GetPartitions",
                "glue:BatchGetPartition"
            ],
            "Resource": [
                "arn:aws:glue:<region>:<account-id>:catalog",
                "arn:aws:glue:<region>:<account-id>:database/sparkblogdb",
                "arn:aws:glue:<region>:<account-id>:table/sparkblogdb/*"
            ]
        }
  ]
}

Run SQL queries directly in notebook without using Python

When using Athena Spark notebooks, we can run SQL queries directly without having to use PySpark. We do this by using cell magics, which are special headers in a notebook cell that change the cell’s behavior. For SQL, we can add the %%sql magic, which will interpret the entire cell contents as a SQL statement to be run on Athena Spark.

Now that we have our workgroup and notebook created, let’s start exploring the NOAA Global Surface Summary of Day dataset, which provides environmental measures from various locations all over the earth. The datasets used in this post are public datasets hosted in the following Amazon S3 locations:

  • Parquet data for year 2020 – s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/
  • Parquet data for year 2021 – s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2021/
  • Parquet data for year 2022 – s3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/year=2022/

To use this data, we need an AWS Glue Data Catalog database that acts as the metastore for Athena, allowing us to create external tables that point to the location of datasets in Amazon S3. First, we create a database in the Data Catalog using Athena and Spark.

Create a database

Run the following SQL in your notebook using the %%sql magic:

%%sql 
CREATE DATABASE sparkblogdb

You get the following output:

Create a table

Now that we have created a database in the Data Catalog, we can create a partitioned table that points to our dataset stored in Amazon S3:

%%sql
CREATE EXTERNAL TABLE sparkblogdb.noaa_pq(
  station string, 
  date string, 
  latitude string, 
  longitude string, 
  elevation string, 
  name string, 
  temp string, 
  temp_attributes string, 
  dewp string, 
  dewp_attributes string, 
  slp string, 
  slp_attributes string, 
  stp string, 
  stp_attributes string, 
  visib string, 
  visib_attributes string, 
  wdsp string, 
  wdsp_attributes string, 
  mxspd string, 
  gust string, 
  max string, 
  max_attributes string, 
  min string, 
  min_attributes string, 
  prcp string, 
  prcp_attributes string, 
  sndp string, 
  frshtt string)
  PARTITIONED BY (year string)
STORED AS PARQUET
LOCATION 's3://athena-examples-us-east-1/athenasparksqlblog/noaa_pq/'

This dataset is partitioned by year, meaning that we store data files for each year separately, which simplifies management and improves performance because we can target the specific S3 locations in a query. The Data Catalog now knows about the table, and next we let it discover the partitions automatically by using the MSCK REPAIR TABLE utility:

%%sql
MSCK REPAIR TABLE sparkblogdb.noaa_pq

When the preceding statement is complete, you can run the following command to list the yearly partitions that were found in the table:

%%sql
SHOW PARTITIONS sparkblogdb.noaa_pq

You get a list of the yearly partitions as output.

Now that we have the table created and partitions added, let’s run a query to find the minimum recorded temperature for the 'SEATTLE TACOMA AIRPORT, WA US' location:

%%sql
select year, min(MIN) as minimum_temperature 
from sparkblogdb.noaa_pq 
where name = 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1

You get the following output:


Query a cross-account Data Catalog from Athena Spark

Athena supports accessing cross-account AWS Glue Data Catalogs, which enables you to use Spark SQL in Athena Spark to query a Data Catalog in an authorized AWS account.

The cross-account Data Catalog access pattern is often used in a data mesh architecture, when a data producer wants to share a catalog and data with consumer accounts. The consumer accounts can then perform data analysis and explorations on the shared data. This is a simplified model where we don’t need to use AWS Lake Formation data sharing. The following diagram gives an overview of how the setup works between one producer and one consumer account, which can be extended to multiple producer and consumer accounts.


You need to set up the right access policies on the Data Catalog of the producer account to enable cross-account access. Specifically, you must make sure the consumer account’s IAM role used to run Spark calculations on Athena has access to the cross-account Data Catalog and data in Amazon S3. For setup instructions, refer to Configuring cross-account AWS Glue access in Athena for Spark.

There are two ways the consumer account can access the cross-account Data Catalog from Athena Spark, depending on whether you are querying from one producer account or multiple.

Query a single producer table

If you are just querying data from a single producer’s AWS account, you can tell Athena Spark to only use that account’s catalog to resolve database objects. When using this option, you don’t have to modify the SQL because you’re configuring the AWS account ID at session level. To enable this method, edit the session and set the property "spark.hadoop.hive.metastore.glue.catalogid": "999999999999" using the following steps:

  1. In the notebook editor, on the Session menu, choose Edit session.
  2. Choose Edit in JSON.
  3. Add the following property and choose Save:
    {"spark.hadoop.hive.metastore.glue.catalogid": "999999999999"}The image shows where to put JSON config property to query single producerThis will start a new session with the updated parameters.
  4. Run the following SQL statement in Spark to query tables from the producer account’s catalog:
    %%sql
    SELECT * 
    FROM <central-catalog-db>.<table> 
    LIMIT 10

Query multiple producer tables

Alternatively, you can add the producer AWS account ID in each database name, which is helpful if you’re going to query Data Catalogs from different owners. To enable this method, set the property {"spark.hadoop.aws.glue.catalog.separator": "/"} when invoking or editing the session (using the same steps as the previous section). Then, you add the AWS account ID for the source Data Catalog as part of the database name:

%%sql
SELECT * 
FROM `<producer-account1-id>/database1`.table1 t1 
join `<producer-account2-id>/database2`.table2 t2 
ON t1.id = t2.id
limit 10

If the S3 bucket belonging to the producer AWS account is configured with Requester Pays enabled, the consumer is charged instead of the bucket owner for requests and downloads. In this case, you can add the following property when invoking or editing an Athena Spark session to read data from these buckets:

{"spark.hadoop.fs.s3.useRequesterPaysHeader": "true"}

Infer the schema of your data in Amazon S3 and join with tables crawled in the Data Catalog

Rather than only being able to go through the Data Catalog to understand the table structure, Spark can infer the schema and read data directly from storage. This feature allows data analysts and data scientists to perform a quick exploration of the data without needing to create a database or table first, and it can also be combined with existing tables stored in the Data Catalog in the same account or across different accounts. To do this, we use a Spark temporary view, an in-memory structure that exposes the schema and contents of a data frame to SQL.

Using the NOAA dataset partition for 2020, we create a temporary view by reading S3 data into a data frame:

year_20_pq = spark.read.parquet(f"s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/")
year_20_pq.createOrReplaceTempView("y20view")

Now you can query the y20view using Spark SQL as if it were a Data Catalog database:

%%sql
select count(*) 
from y20view

The output shows the total row count for the 2020 data.

You can query data from both temporary views and Data Catalog tables in the same query in Spark. For example, now that we have a table containing data for years 2021 and 2022, and a temporary view with 2020’s data, we can find the dates in each year when the maximum temperature was recorded for 'SEATTLE TACOMA AIRPORT, WA US'.

To do this, we can use the RANK window function and UNION ALL:

%%sql
SELECT date,
       max as maximum_temperature
FROM (
        SELECT date,
            max,
            RANK() OVER (
                PARTITION BY year
                ORDER BY max DESC
            ) rnk
        FROM sparkblogdb.noaa_pq
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
          AND year IN ('2021', '2022')
        UNION ALL
        SELECT date,
            max,
            RANK() OVER (
                ORDER BY max DESC
            ) rnk
        FROM y20view
        WHERE name = 'SEATTLE TACOMA AIRPORT, WA US'
    ) t
WHERE rnk = 1
ORDER by 1

You get the following output:


Extend your SQL with a UDF in Spark SQL

You can extend your SQL functionality by registering and using a custom user-defined function in Athena Spark. These UDFs are used in addition to the common predefined functions available in Spark SQL, and once created, can be reused many times within a given session.

In this section, we walk through a straightforward UDF that converts a numeric month value into the full month name. You have the option to write the UDF in either Java or Python.

Java-based UDF

The Java code for the UDF can be found in the GitHub repository. For this post, we have uploaded a prebuilt JAR of the UDF to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar.

To register the UDF, we use Spark SQL to create a temporary function:

%%sql
CREATE OR REPLACE TEMPORARY FUNCTION 
month_number_to_name as 'com.example.MonthNumbertoNameUDF'
using jar "s3a://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.jar";

Now that the UDF is registered, we can call it in a query to find the minimum recorded temperature for each month of 2022:

%%sql
select month_number_to_name(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

You get the following output:


Python-based UDF

Now let’s see how to add a Python UDF to the existing Spark session. The Python code for the UDF can be found in the GitHub repository. For this post, the code has been uploaded to s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py.

Python UDFs can’t be registered in Spark SQL, so instead we use a small bit of PySpark code to add the Python file, import the function, and then register it as a UDF:

sc.addPyFile('s3://athena-examples-us-east-1/athenasparksqlblog/udf/month_number_to_name.py')

from month_number_to_name import month_number_to_name
spark.udf.register("month_number_to_name_py",month_number_to_name)

Now that the Python-based UDF is registered, we can use the same query from earlier to find the minimum recorded temperature for each month of 2022. The fact that it’s Python rather than Java doesn’t matter now:

%%sql
select month_number_to_name_py(month(to_date(date,'yyyy-MM-dd'))) as month_yr_21,
min(min) as min_temp
from sparkblogdb.noaa_pq 
where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 
order by 2

The output should be similar to that in the preceding section.

Plot visuals from the SQL queries

As we’ve seen, it’s straightforward to use Spark SQL for data exploration, including across AWS accounts, and to extend Athena Spark with UDFs. Now let’s see how we can go beyond SQL and use Python to visualize data within the same Spark session to look for patterns in the data. We use the table and temporary views created previously to generate a pie chart that shows the percentage of readings taken in each year for the station 'SEATTLE TACOMA AIRPORT, WA US'.

Let’s start by creating a Spark data frame from a SQL query and converting it to a pandas data frame:

#we will use spark.sql instead of %%sql magic to enclose the query string
#this will allow us to read the results of the query into a dataframe to use with our plot command
sqlDF = spark.sql("select year, count(*) as cnt from sparkblogdb.noaa_pq where name = 'SEATTLE TACOMA AIRPORT, WA US' group by 1 \
                  union all \
                  select 2020 as year, count(*) as cnt from y20view where name = 'SEATTLE TACOMA AIRPORT, WA US'")

#convert to pandas data frame
seatac_year_counts=sqlDF.toPandas()

Next, the following code uses the pandas data frame and the Matplotlib library to plot a pie chart:

import matplotlib.pyplot as plt

# clear the state of the visualization figure
plt.clf()

# create a pie chart with values from the 'cnt' field, and yearly labels
plt.pie(seatac_year_counts.cnt, labels=seatac_year_counts.year, autopct='%1.1f%%')
%matplot plt

The following figure shows our output.


Clean up

To clean up the resources created for this post, complete the following steps:

  1. Run the following SQL statements in the notebook’s cell to delete the database and tables from the Data Catalog:
    %%sql
    DROP TABLE sparkblogdb.noaa_pq
    
    %%sql
    DROP DATABASE sparkblogdb

  2. Delete the workgroup created for this post. This will also delete saved notebooks that are part of the workgroup.
  3. Delete the S3 bucket that you created as part of the workgroup.

Conclusion

Athena Spark makes it easier than ever to query databases and tables in the AWS Glue Data Catalog directly through Spark SQL in Athena, and to query data directly from Amazon S3 without needing a metastore for quick data exploration. It also makes it straightforward to use common and advanced SQL commands used in Spark SQL, including registering UDFs for custom functionality. Additionally, Athena Spark makes it effortless to use Python in a fast start notebook environment to visualize and analyze data queried via Spark SQL.

Overall, Spark SQL unlocks the ability to go beyond standard SQL in Athena, providing advanced users more flexibility and power through both SQL and Python in a single integrated notebook, and providing fast, complex analysis of data in Amazon S3 without infrastructure setup. To learn more about Athena Spark, refer to Amazon Athena for Apache Spark.


About the Authors

Pathik Shah is a Sr. Analytics Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Product Manager at AWS on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Explore your data lake using Amazon Athena for Apache Spark

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/explore-your-data-lake-using-amazon-athena-for-apache-spark/

Amazon Athena now enables data analysts and data engineers to enjoy the easy-to-use, interactive, serverless experience of Athena with Apache Spark in addition to SQL. You can now use the expressive power of Python and build interactive Apache Spark applications using a simplified notebook experience on the Athena console or through Athena APIs. For interactive Spark applications, you can spend less time waiting and be more productive because Athena instantly starts running applications in less than a second. And because Athena is serverless and fully managed, analysts can run their workloads without worrying about the underlying infrastructure.

Data lakes are a common mechanism to store and analyze data because they allow companies to manage multiple data types from a wide variety of sources, and store this data, structured and unstructured, in a centralized repository. Apache Spark is a popular open-source, distributed processing system optimized for fast analytics workloads against data of any size. It’s often used to explore data lakes to derive insights. For performing interactive data explorations on the data lake, you can now use the instant-on, interactive, and fully managed Apache Spark engine in Athena. It enables you to be more productive and get started quickly, spending almost no time setting up infrastructure and Spark configurations.

In this post, we show how you can use Athena for Apache Spark to explore and derive insights from your data lake hosted on Amazon Simple Storage Service (Amazon S3).

Solution overview

We showcase reading and exploring CSV and Parquet datasets to perform interactive analysis using Athena for Apache Spark and the expressive power of Python. We also perform visual analysis using the pre-installed Python libraries. For running this analysis, we use the built-in notebook editor in Athena.

For the purpose of this post, we use the NOAA Global Surface Summary of Day public dataset from the Registry of Open Data on AWS, which consists of daily weather summaries from various NOAA weather stations. The dataset is primarily in plain text CSV format. We have transformed the entire CSV dataset, as well as subsets of it, into Parquet format for our demo.

Before running the demo, we want to introduce the following concepts related to Athena for Spark:

  • Sessions – When you open a notebook in Athena, a new session is started for it automatically. Sessions keep track of the variables and state of notebooks.
  • Calculations – Running a cell in a notebook means running a calculation in the current session. As long as a session is running, calculations use and modify the state that is maintained for the notebook.

For more details, refer to Session and Calculations.

Prerequisites

For this demo, you need the following prerequisites:

  • An AWS account with access to the AWS Management Console
  • Athena permissions on the workgroup DemoAthenaSparkWorkgroup, which you create as part of this demo
  • AWS Identity and Access Management (IAM) permissions to create, read, and update the IAM role and policies created as part of the demo
  • Amazon S3 permissions to create an S3 bucket and read the bucket location

The following policy grants these permissions. Attach it to the IAM role or user you use to sign in to the console. Make sure to provide your AWS account ID and the Region in which you’re running the demo.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "athena:*",
            "Resource": "arn:aws:athena:<REGION>:<ACCOUNT_ID>:workgroup/DemoAthenaSparkWorkgroup"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:CreatePolicy",
                "iam:GetRole",
                "iam:ListAttachedRolePolicies",
                "iam:CreateRole",
                "iam:AttachRolePolicy",
                "iam:PutRolePolicy",
                "iam:ListRolePolicies",
                "iam:GetRolePolicy",
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::<ACCOUNT_ID>:role/service-role/AWSAthenaSparkExecutionRole-*",
                "arn:aws:iam::<ACCOUNT_ID>:policy/service-role/AWSAthenaSparkRolePolicy-*"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:CreateBucket",
                "s3:GetBucketLocation"
            ],
            "Resource": "arn:aws:s3:::<ACCOUNT_ID>-<REGION>-athena-results-bucket-*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "iam:ListPolicies",
                "iam:ListRoles",
                "athena:ListWorkGroups",
                "athena:ListEngineVersions"
            ],
            "Resource": "*"
        }
    ]
}

Create your Athena workgroup

We create a new Athena workgroup with Spark as the engine. Complete the following steps:

  1. On the Athena console, choose Workgroups in the navigation pane.
  2. Choose Create workgroup.
  3. For Workgroup name, enter DemoAthenaSparkWorkgroup.
    Make sure to enter the exact name because the preceding IAM permissions are scoped down for the workgroup with this name.
  4. For Analytics engine, choose Apache Spark.
  5. For Additional configurations, select Use defaults.
    The defaults include the creation of an IAM role with the required permissions to run Spark calculations on Athena and an S3 bucket to store calculation results. It also sets the notebook (which we create later) encryption key management to an AWS Key Management Service (AWS KMS) key owned by Athena.
  6. Optionally, add tags to your workgroup.
  7. Choose Create workgroup.

Modify the IAM role

Creating the workgroup creates a new IAM role. Choose the newly created workgroup, then the value under Role ARN to be redirected to the IAM console.

Add the following permission as an inline policy to the IAM role created earlier. This allows the role to read the S3 datasets. For instructions, refer to the section To embed an inline policy for a user or role (console) in Adding IAM identity permissions (console).

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/*",
                "arn:aws:s3:::noaa-gsod-pds/2022/*",
                "arn:aws:s3:::noaa-gsod-pds",
                "arn:aws:s3:::athena-examples-us-east-1"
            ]
        }
    ]
}

Set up your notebook

To run the analysis on Spark on Athena, we need a notebook. Complete the following steps to create one:

  1. On the Athena console, choose Notebook Editor.
  2. Choose the newly created workgroup DemoAthenaSparkWorkgroup on the drop-down menu.
  3. Choose Create Notebook.
  4. Provide a notebook name, for example AthenaSparkBlog.
  5. Keep the default session parameters.
  6. Choose Create.

Your notebook should now be loaded, which means you can start running Spark code. You should see the following screenshot.

Explore the dataset

Now that we have the workgroup and notebook created, let’s start exploring the NOAA Global Surface Summary of Day dataset. The datasets used in this post are stored in the following locations:

  • CSV data for year 2022 – s3://noaa-gsod-pds/2022/
  • Parquet data for year 2021 – s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2021/
  • Parquet data for year 2020 – s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/
  • Entire dataset in Parquet format (until October 2022) – s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/historical/

In the rest of this post, we show PySpark code snippets. Copy the code and enter it in the notebook’s cell. Press Shift+Enter to run the code as a calculation. Alternatively, you can choose Run. Add more cells to run subsequent code snippets.

We start by reading the CSV dataset for the year 2022 and print its schema to understand the columns contained in the dataset. Run the following code in the notebook cell:

year_22_csv = spark.read.option("header","true").csv(f"s3://noaa-gsod-pds/2022/")
year_22_csv.printSchema()

We get the following output.

We were able to submit the preceding code as a calculation instantly using the notebook.

Let’s continue exploring the dataset. Looking at the columns in the schema, we’re interested in previewing the data for the following attributes in 2022:

  • TEMP – Mean temperature
  • WDSP – Mean wind speed
  • GUST – Maximum wind gust
  • MAX – Maximum temperature
  • MIN – Minimum temperature
  • NAME – Station name

Run the following code:

year_22_csv.select('NAME','DATE','TEMP','WDSP','GUST','MAX','MIN').show()

We get the following output.

Now we have an idea of what the dataset looks like. Next, let’s perform some analysis to find the maximum recorded temperature for the Seattle-Tacoma Airport in 2022. Run the following code:

from pyspark.sql.functions import max

year_22_csv.filter("NAME == 'SEATTLE TACOMA AIRPORT, WA US'").agg(max("MAX").alias("max_temp_yr_2022")).show()

We get the following output.

Next, we want to find the maximum recorded temperature for each month of 2022. For this, we use the Spark SQL feature of Athena. First, we need to create a temporary view on the year_22_csv data frame. Run the following code:

year_22_csv.createOrReplaceTempView("y22view")

To run our Spark SQL query, we use %%sql magic:

%%sql
select month(to_date(date,'yyyy-MM-dd')) as month_yr_22,max(MAX) as max_temp 
from y22view where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1

We get the following output.

The preceding query produces the month in numeric form. To make it more readable, let’s convert the month numbers into month names. For this, we use a user-defined function (UDF) and register it to use in the Spark SQL queries for the rest of the notebook session. Run the following code to create and register the UDF:

import calendar

# UDF to convert month number to month name
spark.udf.register("month_name_udf",lambda x: calendar.month_name[int(x)])

We rerun the query to find the maximum recorded temperature for each month of 2022 but with the month_name_udf UDF we just created. Also, this time we sort the results based on the maximum temperature value. See the following code:

%%sql
select month_name_udf(month(to_date(date,'yyyy-MM-dd'))) as month_yr_22,
max(MAX) as max_temp
from y22view where NAME == 'SEATTLE TACOMA AIRPORT, WA US' 
group by 1 order by 2 desc

The following output shows the month names.

Until now, we have run interactive explorations for the year 2022 of the NOAA Global Surface Summary of Day dataset. Let’s say we want to compare the temperature values with the previous 2 years. We compare the maximum temperature across 2020, 2021, and 2022. As a reminder, the dataset for 2022 is in CSV format and for 2020 and 2021, the datasets are in Parquet format.

To continue with the analysis, we read the 2020 and 2021 Parquet datasets into the data frame and create temporary views on the respective data frames. Run the following code:

#Read the dataset
year_20_pq = spark.read.parquet(f"s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2020/")
year_21_pq = spark.read.parquet(f"s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/2021/")

#Create temporary views
year_20_pq.createOrReplaceTempView("y20view")
year_21_pq.createOrReplaceTempView("y21view")

#Preview the datasets
print('Preview for year 2020:')
year_20_pq.select('NAME','DATE','TEMP','WDSP','GUST','MAX','MIN').show(1)
print('Preview for year 2021:')
year_21_pq.select('NAME','DATE','TEMP','WDSP','GUST','MAX','MIN').show(1)

We get the following output.

To compare the recorded maximum temperature for each month in 2020, 2021, and 2022, we perform a join operation on the three views created so far from their respective data frames. Also, we reuse the month_name_udf UDF to convert month number to month name. Run the following code:

%%sql
select month_name_udf(month(to_date(y21.DATE,'yyyy-MM-dd'))) as month, max(y20.max) as max_temp_2020, 
max(y21.max) as max_temp_2021, max(y22.max) as max_temp_2022
from y20view y20 inner join y21view y21 inner join y22view y22
on month(to_date(y20.DATE,'yyyy-MM-dd')) = month(to_date(y21.DATE,'yyyy-MM-dd'))
and month(to_date(y21.DATE,'yyyy-MM-dd')) = month(to_date(y22.DATE,'yyyy-MM-dd'))
where y20.NAME == 'SEATTLE TACOMA AIRPORT, WA US' and y21.NAME == 'SEATTLE TACOMA AIRPORT, WA US' and y22.NAME == 'SEATTLE TACOMA AIRPORT, WA US'
group by 1

We get the following output.

So far, we’ve read CSV and Parquet datasets, run analysis on the individual datasets, and performed join and aggregation operations on them to derive insights instantly in an interactive mode. Next, we show how you can use the pre-installed libraries like Seaborn, Matplotlib, and Pandas for Spark on Athena to generate a visual analysis. For the full list of preinstalled Python libraries, refer to List of preinstalled Python libraries.

We plot a visual analysis to compare the recorded maximum temperature values for each month in 2020, 2021, and 2022. Run the following code, which creates a Spark data frame from the SQL query, converts it into a Pandas data frame, and uses Seaborn and Matplotlib for plotting:

import seaborn as sns
import matplotlib.pyplot as plt

y20_21_22=spark.sql("select month(to_date(y21.DATE,'yyyy-MM-dd')) as month, max(y20.max) as max_temp_yr_2020, \
max(y21.max) as max_temp_yr_2021, max(y22.max) as max_temp_yr_2022 \
from y20view y20 inner join y21view y21 inner join y22view y22 \
on month(to_date(y20.DATE,'yyyy-MM-dd')) = month(to_date(y21.DATE,'yyyy-MM-dd')) \
and month(to_date(y21.DATE,'yyyy-MM-dd')) = month(to_date(y22.DATE,'yyyy-MM-dd')) \
where y20.NAME == 'SEATTLE TACOMA AIRPORT, WA US' and y21.NAME == 'SEATTLE TACOMA AIRPORT, WA US' and y22.NAME == 'SEATTLE TACOMA AIRPORT, WA US' \
group by 1 order by 1")

#convert to pandas dataframe
y20_21_22=y20_21_22.toPandas()

#change datatypes to float for plotting
y20_21_22['max_temp_yr_2020']= y20_21_22['max_temp_yr_2020'].astype(float)
y20_21_22['max_temp_yr_2021']= y20_21_22['max_temp_yr_2021'].astype(float)
y20_21_22['max_temp_yr_2022']= y20_21_22['max_temp_yr_2022'].astype(float)

# Unpivot dataframe from wide to long format for plotting
y20_21_22=y20_21_22.melt('month',var_name='max_temperature', \
             value_name='temperature')

plt.clf()

sns.catplot(data=y20_21_22,x='month',y='temperature', hue='max_temperature', \
            sort=False, kind='point', height=4, aspect=1.5)
%matplot plt

The following graph shows our output.

Next, we plot a heatmap showing the maximum temperature trend for each month across all the years in the dataset. For this, we have converted the entire CSV dataset (until October 2022) into Parquet format and stored it in s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/historical/.

Run the following code to plot the heatmap:

noaa = spark.read.parquet(f"s3://athena-examples-us-east-1/athenasparkblog/noaa-gsod-pds/parquet/historical/")
noaa.createOrReplaceTempView("noaaview")

#query to find maximum temperature for each month from year 1973 to 2022
year_hist=spark.sql("select month(to_date(date,'yyyy-MM-dd')) as month, \
year(to_date(date,'yyyy-MM-dd')) as year,  cast(max(temp) as float) as temp \
from noaaview where NAME == 'SEATTLE TACOMA AIRPORT, WA US' group by 1,2") 

# convert spark dataframe to pandas
year_hist=year_hist.toPandas()
year_hist=year_hist.pivot(index="month", columns="year", values="temp")

plt.clf()
grid_kws = {"height_ratios": (0.9, .05), "hspace": .5}
f, (ax, cbar_ax) = plt.subplots(2, gridspec_kw=grid_kws)

sns.heatmap(year_hist, ax=ax, cbar_ax=cbar_ax, cmap="RdYlBu_r", \
            cbar_kws={"orientation": "horizontal"})
%matplot plt

We get the following output.

From the heatmap, we can see the trend has been broadly similar across the years: the temperature rises during the summer months and falls as winter approaches in the Seattle-Tacoma Airport area. You can continue exploring the datasets further, running more analyses and plotting more visuals to get a feel for the interactive, instant-on experience Athena for Apache Spark offers.
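For example, the following Spark SQL query is a minimal sketch that reuses the noaaview temporary view registered earlier (the column names match those used above) and returns the lowest recorded temperature per year at the station:

select year(to_date(date,'yyyy-MM-dd')) as year, min(cast(temp as float)) as min_temp
from noaaview
where NAME == 'SEATTLE TACOMA AIRPORT, WA US'
group by 1
order by 1

You can run it through spark.sql(...) and feed the result into the same Pandas, Seaborn, and Matplotlib flow shown earlier.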

Clean up resources

When you’re done with the demo, make sure to delete the S3 bucket you created to store the workgroup calculations to avoid storage costs. Also, you can delete the workgroup, which deletes the notebook as well.

Conclusion

In this post, we saw how you can use the interactive and serverless experience of Athena for Spark as the engine to run calculations instantly. You just need to create a workgroup and notebook to start running the Spark code. We explored datasets stored in different formats in an S3 data lake and ran interactive analyses to derive various insights. Also, we ran visual analyses by plotting charts using the preinstalled libraries. To learn more about Spark on Athena, refer to Using Apache Spark in Amazon Athena.


About the Authors

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Raj Devnath is a Sr. Product Manager at AWS working on Amazon Athena. He is passionate about building products customers love and helping customers extract value from their data. His background is in delivering solutions for multiple end markets, such as finance, retail, smart buildings, home automation, and data communication systems.

Analyze Amazon Ion datasets using Amazon Athena

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/analyze-amazon-ion-datasets-using-amazon-athena/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

Amazon Ion is a richly typed, self-describing, hierarchical data serialization format offering interchangeable binary and text representations. The text format extends JSON (meaning all JSON files are valid Ion files), and is easy to read and author, supporting rapid prototyping. The binary representation is efficient to store, transmit, and skip-scan parse. The rich type system provides unambiguous semantics for long-term preservation of data that can survive multiple generations of software evolution.

Athena now supports querying and writing data in Ion format. The Ion format is currently used by internal Amazon teams, by external services such as Amazon Quantum Ledger Database (Amazon QLDB) and Amazon DynamoDB (which can be exported into Ion), and in the open-source SQL query language PartiQL.

In this post, we discuss use cases and the unique features Ion offers, followed by examples of querying Ion with Athena. For demonstration purposes, we use the transformed version of the City Lots San Francisco dataset.

Features of Ion

In this section, we discuss some of the unique features that Ion offers:

  • Type system
  • Dual format
  • Efficiency gains
  • Skip-scanning

Type system

Ion extends JSON, adding support for more precise data types to improve interpretability, simplify processing, and avoid rounding errors. These high-precision numeric types are essential for financial services, where fractions of a cent on every transaction add up. The added data types are arbitrary-size integers, binary floating-point numbers, infinite-precision decimals, timestamps, CLOBs, and BLOBs.
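As an illustration (the field names and values here are hypothetical, not taken from the citylots dataset), Ion text can express these types directly:

// Illustrative Ion text values for types that plain JSON cannot represent precisely
{
  balance: 1234567.89,                 // arbitrary-precision decimal (no floating-point rounding)
  trade_count: 98765432109876543210,   // arbitrary-size integer
  updated_at: 2022-10-31T09:15:00Z,    // native timestamp
  rate: 2.5e-3                         // binary floating-point number
}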

Dual format

Users can be presented with a familiar text-based representation while benefiting from the performance efficiencies of a binary format. The interoperability between the two formats enables you to rapidly discover, digest, and interpret data in a familiar JSON-like representation, while underlying applications benefit from a reduction in storage, memory, network bandwidth, and latency from the binary format. This means you can write plain text queries that run against both text-based and binary-based Ion. You can rewrite parts of your data in text-based Ion when you need human readable data during development and switch to binary in production.

When debugging a process, the ability for systems engineers to locate data and understand it as quickly as possible is vital. Ion provides mechanisms to move between binary and a text-based representation, optimizing for both the human and the machine. Athena supports querying and writing data in both of these Ion formats. The following is an example Ion text document taken from the transformed version of the citylots dataset:

{ "type": "Feature"
, "properties": { "MAPBLKLOT": "0004002"
                 ,"BLKLOT": "0004002"
                 ,"BLOCK_NUM": "0004"
                 , "LOT_NUM": "002"
                 , "FROM_ST": "0"
                 , "TO_ST": "0"
                 , "STREET": "UNKNOWN"
                 , "ST_TYPE": null
                 , "ODD_EVEN": "E" }
, "geometry": { "type": "Polygon"
               , "coordinates": [ [ [ -122.415701204606876, 37.808327252671461, 0.0 ],
                                    [ -122.415760743593196, 37.808630700240904, 0.0 ],
                                    [ -122.413787891332404, 37.808566801319841, 0.0 ],
                                    [ -122.415701204606876, 37.808327252671461, 0.0 ] ] ] } }

Efficiency gains

Binary-encoded Ion reduces file size by moving repeated values, such as field names, into a symbol table. Symbol tables reduce CPU usage and read latency by limiting character-encoding validation to the single instance of each value stored in the symbol table.

For example, a company that operates at Amazon’s scale can produce large volumes of application logs. When compressing Ion and JSON logs, we observed approximately 35% less CPU time to compress the Ion logs, and the resulting files were roughly 26% smaller on average. Log files are critical when needed but costly to retain, so the reduction in file sizes combined with the read performance gains from symbol tables helps when handling these logs. The following is an example of the file size reduction with the citylots JSON dataset when converted to Ion binary with GZIP and ZSTD compression:

77MB    citylots.ion
 17MB    citylots.ion.gz
 15MB    citylots.ion.zst
181MB    citylots.json
 22MB    citylots.json.gz
 18MB    citylots.json.zst

Skip-scanning

In a textual format, every byte must be read and interpreted, but because Ion’s binary format is a TLV (type-length-value) encoding, an application can skip over elements that aren’t needed. This reduces query and application processing costs in proportion to the share of fields that go unexamined.

For example, forensic analysis of application log data involves reading large volumes of data where only a fraction of the data is needed for diagnosis. In these scenarios, skip-scanning allows the binary Ion reader to move past irrelevant fields without the cost of reading the element stored within a field. This results in users experiencing lower resource usage and quicker response times.

Query Ion datasets using Athena

Athena now supports querying and creating Ion-formatted datasets via an Ion-specific SerDe, which, in conjunction with IonInputFormat and IonOutputFormat, allows you to read and write valid Ion data. Deserialization lets you run SELECT queries against Ion data to gain insights. Serialization through CTAS or INSERT INTO queries lets you copy datasets from existing tables’ values or generate new data in the Ion format.

The interchangeable nature of Ion text and Ion binary means that Athena can read datasets that contain both types of files. Because Ion is a superset of JSON, a table using the Ion SerDe can also include JSON files. Unlike the JSON SerDe, where every new line character indicates a new row, the Ion SerDe uses a combination of closing brackets and new line characters to determine new rows. This means that if each JSON record in your source documents isn’t on a single line, these files can now be read in Athena via the Ion SerDe.
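For example, a pretty-printed JSON record like the following spans several lines, so a strictly line-oriented JSON SerDe wouldn’t parse it as a single row, but the Ion SerDe reads it without any reformatting (the values are a trimmed-down sample in the style of the citylots data):

{
  "type": "Feature",
  "properties": {
    "block_num": "0579",
    "lot_num": "024"
  }
}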

Create external tables

Athena supports querying Ion-based datasets by defining AWS Glue tables with the user-defined metadata. Let’s start with an example of creating an external table for a dataset stored in Ion text. The following is a sample row from the citylots dataset:

{
    type:"Feature",
    properties:{
        mapblklot:"0579021",
        blklot:"0579024",
        block_num:"0579",
        lot_num:"024",
        from_st:"2160",
        to_st:"2160",
        street:"PACIFIC",
        st_type:"AVE",
        odd_even:"E"
    },
    geometry:{
        type:"Polygon",coordinates:[[[-122.4308798855922, ...]]]
    }
}

To create an external table that has its data stored in Ion, you have two syntactic options.

First, you can specify STORED AS ION. This is a more concise method, and is best used for simple cases, when no additional properties are required. See the following code:

CREATE EXTERNAL TABLE city_lots_ion1 (
  type STRING, 
  properties struct<
    mapblklot:string,
    blklot:string,
    block_num:string,
    lot_num:string,
    from_st:string,
    to_st:string,
    street:string,
    st_type:string,
    odd_even:string>, 
  geometry struct<
    type:string,
    coordinates:array<array<array<decimal(18,15)>>>,
    multi_coordinates:array<array<array<array<decimal(18,15)>>>>>
)
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Alternatively, you can explicitly specify the Ion classpaths in ROW FORMAT SERDE, INPUTFORMAT, and OUTPUTFORMAT. Unlike the first method, you can specify a SERDEPROPERTIES clause here. In our example DDL, we added a SerDe property that allows values that are outside of the Hive data type ranges to overflow rather than fail the query:

CREATE EXTERNAL TABLE city_lots_ion2(
  type STRING, 
  properties struct<
    mapblklot:string,
    blklot:string,
    block_num:string,
    lot_num:string,
    from_st:string,
    to_st:string,
    street:string,
    st_type:string,
    odd_even:string>, 
  geometry struct<
    type:string,
    coordinates:array<array<array<decimal(18,15)>>>,
    multi_coordinates:array<array<array<array<decimal(18,15)>>>>>
)
ROW FORMAT SERDE 
  'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.fail_on_overflow'='false'
 )
STORED AS INPUTFORMAT 
  'com.amazon.ionhiveserde.formats.IonInputFormat' 
OUTPUTFORMAT 
  'com.amazon.ionhiveserde.formats.IonOutputFormat'
LOCATION
  's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Athena converts STORED AS ION into the explicit classpaths, so both tables look similar in the metastore. If we look in AWS Glue, we see both tables we just created have the same input format, output format, and SerDe serialization library.
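If you prefer to confirm this from Athena itself rather than the AWS Glue console, the following statement (shown here against the first table) prints the stored DDL, including the SerDe and input/output format classpaths:

SHOW CREATE TABLE city_lots_ion1;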

Now that our table is created, we can run standard SELECT queries on the city_lots_ion1 table. Let’s run a query that specifies the block_num from our example row of Ion data to verify that we can read from the table:

-- QUERY
SELECT * FROM city_lots_ion1 WHERE properties.block_num='0579';

The following screenshot shows our results.

Use path extraction to read from specific fields

Athena supports further customization of how data is interpreted via SerDe properties. To specify these, you can add a WITH SERDEPROPERTIES clause, which is a subfield of the ROW FORMAT SERDE field.

In some situations, we may only care about some parts of the information. Let’s suppose we don’t want any of the geometry info from the citylots dataset, and only need a few of the fields in properties. One solution is to specify a search path using the path extractor SerDe property:

-- Path Extractor property
ion.<column>.path_extractor = <search path>

Path extractors are search paths that Athena uses to map the table columns to locations in the individual document. Full information on what can be done with path extractors is available on GitHub, but for our example, we focus on creating simple paths that use the names of each field as an index. In this case, the search path is a space-delimited set of indexes, wrapped in parentheses, that indicates the location of each desired piece of information. We map the search paths to table columns by using the path extractor property.

By default, Athena builds path extractors dynamically based on column names unless overridden. This means that when we run our SELECT query on our city_lots_ion1 table, Athena builds the following search paths:

-- Default extractors generated by Athena for city_lots_ion1
-- Extracts the 'type' field to the 'type' column
    'ion.type.path_extractor' = '(type)'

-- Extracts the 'properties' field to the 'properties' column
    'ion.properties.path_extractor' = '(properties)'

-- Extracts the 'geometry' field to the 'geometry' column
    'ion.geometry.path_extractor' = '(geometry)'

Assuming we only care about the block and lot information from the properties struct, and the geometry type from the geometry struct, we can build search paths that map the desired fields from the row of data to table columns. First let’s build the search paths:

(properties mapblklot) - Search path for the mapblklot field in the properties struct
(properties blklot) - Search path for the blklot field in the properties struct
(properties block_num) - Search path for the block_num field in the properties struct
(properties lot_num) - Search path for the lot_num field in the properties struct
(geometry type) - Search path for the type field in the geometry struct

Now let’s map these search paths to table columns using the path extractor SerDe property. Because the search paths specify where to look for data, we are able to flatten and rename our datasets to better serve our purpose. For this example, let’s rename the mapblklot field to map_block_lot, blklot to block_lot, and the geometry type to shape:

 'ion.map_block_lot.path_extractor' = '(properties mapblklot)'
 'ion.block_lot.path_extractor' = '(properties blklot)'
 'ion.block_num.path_extractor' = '(properties block_num)'
 'ion.lot_num.path_extractor' = '(properties lot_num)'
 'ion.shape.path_extractor' = '(geometry type)'

Let’s put all of this together and create the city_blocks table:

CREATE EXTERNAL TABLE city_blocks (
    map_block_lot STRING,
    block_lot STRING,
    block_num STRING,
    lot_num STRING,
    shape STRING
)
ROW FORMAT SERDE
 'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
 'ion.map_block_lot.path_extractor' = '(properties mapblklot)',
 'ion.block_lot.path_extractor' = '(properties blklot)', 
 'ion.block_num.path_extractor' = '(properties block_num)',
 'ion.lot_num.path_extractor' = '(properties lot_num)',
 'ion.shape.path_extractor' = '(geometry type)'
 )
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_ion_binary/'

Now we can run a select query on the city_blocks table, and see the results:

-- Select Query
SELECT * FROM city_blocks WHERE block_num='0579';

Utilizing search paths in this way enables skip-scan parsing when reading from Ion binary files, which allows Athena to skip over the unneeded fields and reduces the overall time it takes to run the query.

Use CTAS and UNLOAD for data transformation

Athena supports CREATE TABLE AS SELECT (CTAS), which creates a new table in Athena from the results of a SELECT statement from another query. Athena also supports UNLOAD, which writes query results to Amazon S3 from a SELECT statement to the specified data format.

Both CTAS and UNLOAD have a property to specify a format and a compression type. This allows you to easily convert Ion datasets to other data formats, such as Parquet or ORC, and vice versa, without needing to set up a complex extract, transform, and load (ETL) job. This is beneficial for situations when you want to transform your data, or know you will run repeated queries on a subset of your data and want to use some of the benefits inherent to columnar formats. Combining it with path extractors is especially helpful, because we’re only storing the data that we need in the new format.

Let’s use CTAS to convert our city_blocks table from Ion to Parquet, and compress it via GZIP. Because we have path extractors set up for the city_blocks table, we only need to convert a small portion of the original dataset:

CREATE TABLE city_blocks_parquet_gzip
WITH (format = 'PARQUET', write_compression='GZIP')
AS SELECT * FROM city_blocks;

We can now run queries against the city_blocks_parquet_gzip table, and should see the same result. To test this out, let’s run the same SELECT query we ran before on the Parquet table:

SELECT * FROM city_blocks_parquet_gzip WHERE block_num='0579';

When converting tables from another format to Ion, Athena supports the following compression codecs: ZSTD, BZIP2, GZIP, SNAPPY, and NONE. In addition to adding Ion as a new format for CTAS, we added the ion_encoding property, which allows you to choose whether the output files are created in Ion text or Ion binary. This allows for serialization of data from other formats back into Ion.
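For instance, a minimal sketch of a CTAS statement that writes Ion text instead of binary might look like the following (city_blocks_ion_text_gzip is a hypothetical table name, and ion_encoding='TEXT' selects the text encoding):

CREATE TABLE city_blocks_ion_text_gzip
WITH (format = 'ION', write_compression = 'GZIP', ion_encoding = 'TEXT')
AS SELECT * FROM city_blocks;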

Let’s convert the original city_lots JSON file back to Ion, but this time we specify that we want to use ZSTD compression and a binary encoding.

The JSON file can be found at the following location: s3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_json/

Because Ion is a superset of JSON, we can use the Ion SerDe to read this file:

CREATE EXTERNAL TABLE city_blocks_json_ion_serde (
    map_block_lot STRING,
    block_lot STRING,
    block_num STRING,
    lot_num STRING,
    shape STRING
)
ROW FORMAT SERDE
'com.amazon.ionhiveserde.IonHiveSerDe'
WITH SERDEPROPERTIES (
'ion.map_block_lot.path_extractor' = '(properties mapblklot)',
'ion.block_lot.path_extractor' = '(properties blklot)',
'ion.block_num.path_extractor' = '(properties block_num)',
'ion.lot_num.path_extractor' = '(properties lot_num)',
'ion.shape.path_extractor' = '(geometry type)'
)
STORED AS ION
LOCATION 's3://aws-bigdata-blog/artifacts/athena-ion-blog/city_lots_json/'

Now let’s copy this table into our desired Ion binary form:

CREATE TABLE city_blocks_ion_zstd
WITH (format = 'ION', write_compression='ZSTD', ion_encoding='BINARY')
AS SELECT * FROM city_blocks_parquet_gzip

Finally, let’s run a SELECT statement to verify everything was created properly:

SELECT * FROM city_blocks_ion_zstd WHERE block_num='0579'; 

Use UNLOAD to store Ion data in Amazon S3

Sometimes we just want to reformat the data and don’t need to store the additional metadata to query the table. In this case, we can use UNLOAD, which stores the results of the query in the specified format in an S3 bucket.

Let’s test it out, using UNLOAD to convert the city_blocks_ion_zstd table from Ion to ORC, compress it via ZLIB, and store it in an S3 bucket:

UNLOAD (SELECT * FROM city_blocks_ion_zstd WHERE block_num='0579') 
TO 's3://<your-s3-bucket>/athena-ion-blog/unload/orc_zlib/'
WITH (format = 'ORC', compression='ZLIB')

When you check in Amazon S3, you can find a new file in the ORC format.

Conclusion

This post talked about the new feature in Athena that allows you to query and create Ion datasets using standard SQL. We discussed use cases and unique features of the Ion format, such as its type system, dual formats (Ion text and Ion binary), efficiency gains, and skip-scanning. You can get started by creating a table in Athena over an Ion dataset stored in Amazon S3, and you can convert existing datasets to and from the Ion format using CTAS and UNLOAD statements.

To learn more about querying Ion using Athena, refer to Amazon Ion Hive SerDe.


About the Authors

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Jacob Stein works on the Amazon Athena team as a Software Development Engineer. He led the project to add support for Ion in Athena. He loves working on technical problems unique to internet scale data, and is passionate about developing scalable solutions for distributed systems.

Giovanni Matteo Fumarola is the Engineering Manager of the Athena Data Lake and Storage team. He is an Apache Hadoop Committer and PMC member. He has been focusing in the big data analytics space since 2013.

Pete Ford is a Sr. Technical Program Manager at Amazon.

Run queries 3x faster with up to 70% cost savings on the latest Amazon Athena engine

Post Syndicated from Pathik Shah original https://aws.amazon.com/blogs/big-data/run-queries-3x-faster-with-up-to-70-cost-savings-on-the-latest-amazon-athena-engine/

Amazon Athena is an interactive query service that makes it easy to analyze data in Amazon Simple Storage Service (Amazon S3) using standard SQL. Athena is serverless, so there is no infrastructure to manage, and you pay only for the queries that you run.

In November 2020, Athena announced the General Availability of the V2 version of its core engine in addition to performance enhancements and new feature capabilities. Today, V2 is the default engine version for every Athena workgroup and it includes additional enhancements made since its first release. Engine enhancements are released frequently and are automatically available to you without the need for manual intervention.

In this post, we discuss the performance benchmarks and performance enhancements of Athena’s latest engine.

We performed benchmark testing on our latest engine using TPC-DS benchmark queries at 3 TB scale and observed that query performance improved by 3x and cost decreased by 70% as a result of reduced scanned data size when compared to our earlier engine.

Performance and cost comparison on TPC-DS benchmarks

We used the industry-standard TPC-DS benchmark at 3 TB scale to represent different customer use cases. These benchmark tests have a range of accuracy within +/- 10% and are representative of workloads with 10 times the stated benchmark size. This means a 3 TB benchmark dataset accurately represents customer workloads on 30–50 TB datasets.

In our testing, the dataset was stored in Amazon S3 in non-compressed Parquet format with no additional optimizations and the AWS Glue Data Catalog was used to store metadata for databases and tables. Fact tables were partitioned on the date column used for join operations and each fact table consisted of 2,000 partitions. We selected 71 of the 99 queries from the TPC-DS benchmark that best illustrated the differences between engines V1 and V2. We ran the queries with a concurrency of 3. This means up to 3 queries were in a running state at any given time and the next query was submitted as soon as one of the 3 running queries completed.

The following graph illustrates the total runtime of queries on engines V1 and V2 and shows runtime was 3x faster on engine V2.

We also compared the amount of data scanned by queries in this benchmark. As shown in the following graph, we found that the data scanned – and the resulting per-query costs – were 70% lower with engine V2.

Queries in our benchmark were consistently more performant with engine V2. The following graph shows the 10 TPC-DS queries with the largest improvement in runtime. For this set of queries, runtime improved by 6.9 times.

Now, let’s look at some of the enhancements in engine V2 that contributed towards these astounding results.

Performance enhancements in engine V2

The Athena engine is built upon Presto, an open-source distributed SQL query engine optimized for low latency. We’re continuously improving Athena’s engine with enhancements developed by Athena and AWS engineering teams as well as incorporating contributions from the PrestoDB and Trino community. The result is an engine with continuously increasing performance and cost-effectiveness benefits that are automatically available to you. A few such enhancements are highlighted in the following sections.

More efficient joins via dynamic filtering and dynamic partition pruning

Dynamic filtering and dynamic partition pruning improve query runtime and reduce the data scanned for queries with joins and a very selective filter clause on the table on the right side of the join, as shown in the following example.

In the following query, Table_B is a small table with a very selective filter (column_A = 'value'). After the selective filter is applied to Table_B, a value list for the join column Table_B.date is extracted first and pushed down to the joined table Table_A as a filter. This filter is used to skip unnecessary rows and partitions of Table_A, so fewer rows and partitions are read from the source, which reduces query runtime and data scan size and, in turn, the cost of running the query in Athena.

SELECT count(*)
FROM Table_A
    JOIN Table_B ON Table_A.date = Table_B.date
WHERE Table_B.column_A = 'value'

More intelligent join ordering and distribution selections

Choosing a better join order and join algorithm is critical to better query performance. They can easily affect how much data is read from a particular table, how much data is transferred to the intermediate stages through networks, and how much memory is needed to build up a hash table to facilitate a join. Join order and join algorithm decisions are part of the cost-based optimizer that uses statistics to improve query plans by deciding how tables and subqueries are joined.

For cases where statistics aren’t available, we introduced a similar concept that enumerates and analyzes the metadata of the S3 files to optimize query plans. The logic for those rules takes into account both small tables and small subqueries while making these decisions. For example, consider the following query:

SELECT *
FROM A, B, C
WHERE A.key = B.key AND C.key = B.key

The syntactical join order is A join B join C. With those optimization rules, if A is considered a small table after retrieving its approximate size through fast file enumeration on Amazon S3, the rules place table A on the build side (the side that is built into a hash table for a join) and make the join a broadcast join to speed up the query and reduce memory consumption. Additionally, the rules reorder the joins to minimize the intermediate result size, which helps further speed up the overall query runtime.

Nested field pruning for complex data types

In this improvement for Parquet and ORC datasets, when a nested field is queried in a complex data type like struct, array of structs, or map, only the specific subfield or nested field is read from the source instead of the entire row. If there is a filter on the nested field, Athena can now push down the predicate to the Parquet or ORC file to prune the content at the source level. This has led to significant savings in data scanned and a reduction in query runtime. With this feature, you don’t need to flatten your nested data to improve query performance.
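As an illustration (orders_parquet and its nested customer struct are hypothetical tables and columns, not part of the benchmark schema), a query like the following reads only the referenced subfields from the Parquet files and pushes the filter on the nested field down to the source:

SELECT order_id, customer.city
FROM orders_parquet
WHERE customer.country = 'US';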

Optimized top N rank() functions

Previously, all input data for rank() window functions was sent to the window operator for processing and the LIMIT and filter clauses were applied at a later stage.

With this optimization, the engine can replace the window operator with a top N rank operator, push down the filters, and only keep top N (where N is the LIMIT number) entries for each window group to save memory and I/O throughput.
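The general shape of a query that benefits from this optimization is a rank() window function whose output is immediately filtered and limited, as in the following hypothetical sketch (sales_by_item is not a real benchmark table):

SELECT *
FROM (
  SELECT category, item, total_sales,
         rank() OVER (PARTITION BY category ORDER BY total_sales DESC) AS rk
  FROM sales_by_item
) ranked
WHERE rk <= 10
ORDER BY category, rk
LIMIT 100;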

A good example of a query that benefited from this optimization is query 67 (shown in the following code) of the TPC-DS benchmark. It contains a subquery with a memory- and CPU-heavy window function rank() that is applied to the output of another subquery, which generates a huge amount of intermediate data after scanning the large fact table store_sales. The output of this subquery is further filtered with LIMIT and comparison operators before returning the final results. Because of the LIMIT and comparison operator, only the top 100 records by total sales in each item category window group are meaningful; the rest are discarded. Processing records that are later discarded through window functions is memory and network intensive.

With this enhancement, only a small amount of data is kept in memory and sent across the network because the filters and limits are pushed down. This makes the entire workflow more efficient and enables the engine to process a larger amount of data with the same resources.

Query 67 failed on engine V1 due to resource exhaustion, despite the considerable effort and time spent scanning approximately 75 GB of data and processing data that was eventually thrown away. On engine V2, this query completes in approximately 165 seconds and scans only 17 GB of data.

In the following query, filter clause rk <=100 and limit 100 are pushed to the rank() function as described earlier:

select * from
(select i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy, s_store_id,
            sumsales, 
            rank() over (partition by i_category order by sumsales desc) rk
     from
     (select i_category, i_class, i_brand, i_product_name, d_year, d_qoy, d_moy,
                s_store_id, sum(coalesce(ss_sales_price*ss_quantity,0)) sumsales
         from store_sales, date_dim, store, item
         where  ss_sold_date_sk=d_date_sk
         and ss_item_sk=i_item_sk
         and ss_store_sk = s_store_sk
         and d_month_seq between 1200 and 1200+11
         group by rollup(i_category, i_class, i_brand, i_product_name, d_year, d_qoy,
                       d_moy,s_store_id))dw1) dw2
where rk <= 100 
order by  i_category, i_class, i_brand, i_product_name, d_year, 
d_qoy, d_moy, s_store_id, sumsales, rk
limit 100  

Other optimizations

In addition to these optimizations, the following contributed towards faster queries and reduced data scan for the queries in our benchmark:

  • Further pushdown of LIMIT and filter operators to reduce the intermediate results size and data scanned from the sources
  • Enhancement of aggregation and window functions to consume much less memory and provide better performance
  • Addition of a distributed sort for the ORDER BY operator to utilize resources effectively, which helps sort more data reliably
  • Improvement in task-scheduling mechanisms for more efficient processing across resources

Conclusion

With the performance optimizations in the latest engine V2 of Athena, you can run queries faster and at lower cost than before. The TPC-DS benchmark queries on engine V2 showed a 3x improvement in query runtime and cost reduction of 70%.

In our mission to innovate on behalf of customers, Athena routinely releases performance and reliability enhancements on its latest engine version. To stay up to date with the latest engine release, ensure your Athena workgroups have selected Automatic query engine upgrade in your workgroup settings.

For more information, see the performance enhancements for engine V2 and check our release notes to learn about new features and enhancements.


About the Authors

Pathik Shah is a Sr. Big Data Architect on Amazon Athena. He joined AWS in 2015 and has been focusing in the big data analytics space since then, helping customers build scalable and robust solutions using AWS analytics services.

Xuanyu Zhan is a Software Development Engineer on Amazon Athena. He joined Athena in 2019 and has been working on different areas of Athena engine V2, including engine upgrade, engine reliability, and engine performance.

Sungheun Wi is a Sr. Software Development Engineer on Amazon Athena. He joined AWS in 2019 and has been working on multiple database engines such as Athena and Aurora, focusing on analytic query processing enhancements.