All posts by Dipankar Kushari

Automate notifications on Slack for Amazon Redshift query monitoring rule violations

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/automate-notifications-on-slack-for-amazon-redshift-query-monitoring-rule-violation/

In this post, we walk you through how to set up automatic notifications of query monitoring rule (QMR) violations in Amazon Redshift to a Slack channel, so that Amazon Redshift users can take timely action.

Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud. With Amazon Redshift, you can analyze your data to derive holistic insights about your business and your customers. One of the challenges is to protect the data warehouse workload from poorly written queries that can consume significant resources. Amazon Redshift query monitoring rules are a feature of workload management (WLM) that allow automatic handling of poorly written queries. Rules that are applied to a WLM queue allow queries to be logged, canceled, hopped (only available with manual WLM), or to change priority (only available with automatic WLM). The reason to use QMRs is to protect against wasteful use of the cluster. You can also use these rules to log resource-intensive queries, which provides the opportunity to establish governance for ad hoc workloads.

The Amazon Redshift cluster automatically collects query monitoring rules metrics. This convenient mechanism lets you view attributes like the following:

  • Query runtime, in seconds
  • Query return row count
  • The CPU time for a SQL statement

It also makes Amazon Redshift Spectrum metrics available, such as the number of Redshift Spectrum rows and MBs scanned by a query.

When a query violates a QMR, Amazon Redshift logs the violation in the STL_WLM_RULE_ACTION system table. If the rule's action is abort, end-users whose queries violate the QMR see an error indicating that the query failed due to a QMR violation. We recommend that members of the administrative team periodically examine the violations listed in the STL_WLM_RULE_ACTION table and coach the end-users involved on how to avoid future rule violations.
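
If you want to spot-check recent violations yourself before automating anything, one option is to run that query through the Amazon Redshift Data API. The following is a minimal sketch; the cluster identifier, database, and user are placeholders for your own environment, and the rest of this post automates the same check in a Lambda function.

import boto3

client = boto3.client('redshift-data')

# Placeholders: point these at your own cluster, database, and a user that can read system tables
res = client.execute_statement(
    ClusterIdentifier='<your-cluster-identifier>',
    Database='<your-database>',
    DbUser='<your-admin-user>',
    Sql="select userid, query, service_class, trim(rule) as rule, trim(action) as action, recordtime "
        "from stl_wlm_rule_action where userid > 1 order by recordtime desc limit 50;"
)
print(res['Id'])  # fetch the rows later with describe_statement/get_statement_result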

Alternatively, a centralized team that uses a Slack channel for collaboration and monitoring can have Amazon Redshift QMR notifications delivered to that channel so that they can take timely action. In the following sections, we walk you through how to set up automatic notifications of QMR violations to a Slack channel by using AWS Lambda, Amazon SNS, and a Slack webhook. This allows Amazon Redshift users to be notified and take timely action without the need to query the system table.

Solution overview

To demonstrate how you can receive automatic notifications of QMR violations in a Slack channel, we designed the following architecture. As shown in the following diagram, we have a mixed workload of extract, transform, and load (ETL) jobs, business intelligence (BI) dashboards, and analytics applications powered by an Amazon Redshift cluster. The solution relies on AWS Lambda and Amazon Simple Notification Service (Amazon SNS) to send notifications of Amazon Redshift QMR violations to Slack.

To implement this solution, you create an Amazon Redshift cluster and attach a custom defined parameter group.

Amazon Redshift provides one default parameter group for each parameter group family. The default parameter group has preset values for each of its parameters, and it can’t be modified. If you want to use different parameter values than the default parameter group, you must create a custom parameter group and then associate your cluster with it.
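
If you prefer to script this step rather than use the console, the following is a minimal boto3 sketch. The parameter group name csblog comes from this post; the cluster identifier is a placeholder, and the console-based association is shown later in the walkthrough.

import boto3

redshift = boto3.client('redshift')

# Create the custom parameter group (redshift-1.0 is the parameter group family)
redshift.create_cluster_parameter_group(
    ParameterGroupName='csblog',
    ParameterGroupFamily='redshift-1.0',
    Description='Custom parameter group for QMR demo'
)

# Associate the custom parameter group with an existing provisioned cluster (placeholder identifier)
redshift.modify_cluster(
    ClusterIdentifier='<your-cluster-identifier>',
    ClusterParameterGroupName='csblog'
)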

In the parameter group, you can use automatic WLM and define a few workload queues, such as a queue for processing ETL workloads and a reporting queue for user queries. You can name the default queue adhoc. With automatic WLM, Amazon Redshift determines the optimal concurrency and memory allocation for each query that is running in each queue.

For each workload queue, you can define one or more QMRs. For example, you can create a rule to abort a user query if it runs for more than 300 seconds or returns more than 1 billion rows. Similarly, you can create a rule to log a Redshift Spectrum query that scans more than 100 MB.
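
Queues and their QMRs are stored in the parameter group's wlm_json_configuration parameter. The following is an illustrative sketch of setting one such rule programmatically; the JSON keys follow the WLM configuration format documented for Amazon Redshift, but verify the exact schema and add your remaining queues and rules before applying it to the csblog parameter group.

import json
import boto3

redshift = boto3.client('redshift')

# Illustrative automatic WLM configuration: a reporting queue with one QMR plus the default queue
wlm_config = [
    {
        "name": "reporting",
        "user_group": ["reporting_group"],
        "query_group": [],
        "auto_wlm": True,
        "rules": [
            {
                "rule_name": "long_running",
                "predicate": [
                    {"metric_name": "query_execution_time", "operator": ">", "value": 300}
                ],
                "action": "abort"
            }
        ]
    },
    {
        # Default (adhoc) queue; define its QMRs the same way
        "auto_wlm": True
    }
]

redshift.modify_cluster_parameter_group(
    ParameterGroupName='csblog',
    Parameters=[{
        'ParameterName': 'wlm_json_configuration',
        'ParameterValue': json.dumps(wlm_config)
    }]
)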

The Amazon Redshift WLM evaluates metrics every 10 seconds. It records details about actions that result from QMR violations associated with user-defined queues in the STL_WLM_RULE_ACTION system table. In this solution, a Lambda function is scheduled to check the STL_WLM_RULE_ACTION system table every few minutes. When the function is invoked, if it finds a new entry, it publishes a detailed message to an SNS topic. A second Lambda function, created as the target subscriber to the SNS topic, is invoked whenever any message is published to the topic. This second function invokes a pre-created Slack webhook, which sends the message that was received through the SNS topic to the Slack channel of your choice. (For more information on publishing messages by using Slack webhooks, see Sending messages using incoming webhooks.)

To summarize, the solution involves the following steps:

  1. Create an Amazon Redshift custom parameter group and add workload queues.
  2. Configure query monitoring rules.
  3. Attach the custom parameter group to the cluster.
  4. Create an SNS topic.
  5. Create a Lambda function and schedule it to run every 5 minutes by using an Amazon EventBridge rule.
  6. Create the Slack resources.
  7. Add an incoming webhook and authorize the Slack app to post messages to a Slack channel.
  8. Create the second Lambda function and subscribe to the SNS topic.
  9. Test the solution.

Create an Amazon Redshift custom parameter group and add workload queues

In this step, you create an Amazon Redshift custom parameter group with automatic WLM enabled. You also create the following queues to separate the workloads in the parameter group:

  • reporting – The reporting queue runs BI reporting queries that are performed by any user who belongs to the Amazon Redshift database group named reporting_group
  • adhoc – The default queue, renamed adhoc, performs any query that is not sent to any other queue

Complete the following steps to create your parameter group and add workload queues:

  1. Create a parameter group, named csblog, with automatic WLM enabled.
  2. On the Amazon Redshift console, select the custom parameter group you created.
  3. Choose Edit workload queues.
  4. On the Modify workload queues page, choose Add queue.
  5. Fill in the Concurrency scaling mode and Query priority fields as needed to create the reporting queue.
  6. Repeat these steps to add the adhoc queue.

For more information about WLM queues, refer to Configuring workload management.

Configure query monitoring rules

In this step, you add QMRs to each workload queue. For instructions, refer to Creating or modifying a query monitoring rule using the console.

For the reporting queue, add the following QMRs:

  • nested_loop – Logs any query involved in a nested loop join that results in a row count of more than 10,000,000 rows.
  • long_running – Stops queries that run for more than 300 seconds (5 minutes).

For the adhoc queue, add the following QMRs:

  • returned_rows – Stops any query that returns more than 1,000,000 rows to the calling client application (returning this many rows isn’t practical and can degrade the end-to-end performance of the application).
  • spectrum_scan – Stops any query that scans more than 1000 MB of data from an Amazon Simple Storage Service (Amazon S3) data lake by using Redshift Spectrum.

Attach the custom parameter group to the cluster

To attach the custom parameter group to your provisioned Redshift cluster, follow the instructions in Associating a parameter group with a cluster. If you don’t already have a provisioned Redshift cluster, refer to Create a cluster.

For this post, we attached our custom parameter group csblog to an already created provisioned Amazon Redshift cluster.

Create an SNS topic

In this step, you create an SNS topic that receives a detailed message of QMR violation from the Lambda function that checks the Amazon Redshift system table for QMR violation entries. For instructions, refer to Creating an Amazon SNS topic.

For this post, we created an SNS topic named redshiftqmrrulenotification.
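
If you script this step instead of using the console, a single boto3 call is enough; the topic name matches the one used in this post.

import boto3

sns_client = boto3.client('sns')

# Create the topic that receives the QMR violation messages
response = sns_client.create_topic(Name='redshiftqmrrulenotification')
print(response['TopicArn'])  # use this ARN for the sns_arn environment variable later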

Create a Lambda function to monitor the system table

In this step, you create a Lambda function that monitors the STL_WLM_RULE_ACTION system table. Whenever any record is found in the table since the last time the function ran, the function publishes a detailed message to the SNS topic that you created earlier. You also create an EventBridge rule to invoke the function every 5 minutes.

For this post, we create a Lambda function named redshiftqmrrule that is scheduled to run every 5 minutes via an EventBridge rule named Redshift-qmr-rule-Lambda-schedule. For instructions, refer to Building Lambda functions with Python.

The following screenshot shows the function that checks the pg_catalog.stl_wlm_rule_action table.

To create an EventBridge rule and associate it with the Lambda function, refer to Create a Rule.

The following screenshot shows the EventBridge rule Redshift-qmr-rule-Lambda-schedule, which calls the function every 5 minutes.
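
As a rough equivalent to the console setup, the following sketch creates the 5-minute schedule with boto3 and allows EventBridge to invoke the function; the Lambda function ARN is a placeholder for the redshiftqmrrule function you create.

import boto3

events = boto3.client('events')
lambda_client = boto3.client('lambda')

# Placeholder ARN of the redshiftqmrrule function
function_arn = 'arn:aws:lambda:<region>:<account-id>:function:redshiftqmrrule'

# Schedule rule that fires every 5 minutes
rule = events.put_rule(
    Name='Redshift-qmr-rule-Lambda-schedule',
    ScheduleExpression='rate(5 minutes)',
    State='ENABLED'
)

# Point the rule at the Lambda function
events.put_targets(
    Rule='Redshift-qmr-rule-Lambda-schedule',
    Targets=[{'Id': 'redshiftqmrrule', 'Arn': function_arn}]
)

# Allow EventBridge to invoke the function
lambda_client.add_permission(
    FunctionName='redshiftqmrrule',
    StatementId='AllowEventBridgeInvoke',
    Action='lambda:InvokeFunction',
    Principal='events.amazonaws.com',
    SourceArn=rule['RuleArn']
)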

We use the following Python 3.9 code for this Lambda function. The function uses the Amazon Redshift Data API, which calls GetClusterCredentials to obtain temporary credentials.

import time
import sys
import os
import boto3
from datetime import datetime

# Initiate the Redshift Data API client in boto3
client = boto3.client("redshift-data")

# Fetch QMR violations recorded during the last 5 minutes
query = "select userid,query,service_class,trim(rule) as rule,trim(action) as action,recordtime from stl_wlm_rule_action WHERE userid > 1 AND recordtime >= current_timestamp AT TIME ZONE 'UTC' - INTERVAL '5 minute' order by recordtime desc;"

# SNS topic that receives the QMR violation report
sns = boto3.resource('sns')
sns_arn = os.environ['sns_arn']
sns_topic = sns.Topic(sns_arn)

def status_check(client, query_id):
    desc = client.describe_statement(Id=query_id)
    status = desc["Status"]
    if status == "FAILED":
        raise Exception('SQL query failed:' + query_id + ": " + desc["Error"])
    return status.strip('"')

def execute_sql(sql_text, redshift_database, redshift_user, redshift_cluster_id):
    print("Executing: {}".format(sql_text))
    res = client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text,
                                   ClusterIdentifier=redshift_cluster_id)
    
    query_id = res["Id"]
    print("query id")
    print(query_id)
    done = False
    while not done:
        time.sleep(1)
        status = status_check(client, query_id)
        if status in ("FAILED", "FINISHED"):
            print("status is: {}".format(status))
            break
    return query_id

def publish_to_sns(message):
    try:
        # Publish the QMR violation report to the SNS topic
        response = sns_topic.publish(
            Subject='Redshift Query Monitoring Rule Notifications',
            Message=message
        )
        return response

    except Exception:
        print(' Failed to publish messages to SNS topic: exception %s' % sys.exc_info()[1])
        return 'Failed'

def lambda_handler(event, context):
    
    rsdb = os.environ['rsdb']
    rsuser = os.environ['rsuser']
    rscluster = os.environ['rscluster']
    #print(query)
    res = execute_sql(query, rsdb, rsuser, rscluster)
    print("res")
    print(res)
    response = client.get_statement_result(
        Id = res
    )
    # Current UTC timestamp used in the notification header
    now = datetime.utcnow()
    dt_string = now.strftime("%d-%b-%Y %H:%M:%S")
    print(response) 
    if response['TotalNumRows'] > 0:
        messageText = '################## Reporting Begin' + ' [' + str(dt_string) + ' UTC] ##################\n\n'
        messageText = messageText + 'Total number of queries affected by QMR Rule violation for Redshift cluster "' + rscluster + '" is ' + str(len(response['Records'])) + '.' + '\n' + '\n'
        for i in range(len(response['Records'])):
            messageText = messageText + 'It was reported at ' + str(response['Records'][i][5]['stringValue'])[11:19] + ' UTC on ' + str(response['Records'][i][5]['stringValue'])[0:10] + ' that a query with Query ID - ' + str(response['Records'][i][1]['longValue']) + ' had to ' +  str(response['Records'][i][4]['stringValue']) + ' due to violation of QMR Rule "' + str(response['Records'][i][3]['stringValue']) + '".\n'
        messageText = messageText + '\n########################### Reporting End ############################\n\n'
        query_result_json = messageText
        response = publish_to_sns(query_result_json)
    else:
        print('No rows to publish to SNS')

We use four environment variables for this Lambda function:

  • rscluster – The Amazon Redshift provisioned cluster identifier
  • rsdb – The Amazon Redshift database where you’re running these tests
  • rsuser – The Amazon Redshift user who has the privilege to run queries on pg_catalog.stl_wlm_rule_action
  • sns_arn – The Amazon Resource Name (ARN) of the SNS topic that we created earlier
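
If you manage the function with boto3, you can set these variables in a single call; all values shown are placeholders for your environment.

import boto3

lambda_client = boto3.client('lambda')

# Placeholder values; replace with your cluster identifier, database, user, and SNS topic ARN
lambda_client.update_function_configuration(
    FunctionName='redshiftqmrrule',
    Environment={
        'Variables': {
            'rscluster': '<your-cluster-identifier>',
            'rsdb': '<your-database>',
            'rsuser': '<your-user>',
            'sns_arn': '<your-sns-topic-arn>'
        }
    }
)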

Create Slack resources

In this step, you create a new Slack workspace (if you don’t have one already), a new private Slack channel (only if you don’t have one or don’t want to use an existing one), and a new Slack app in the Slack workspace. For instructions, refer to Create a Slack workspace, Create a channel, and Creating an app.

For this post, we created the following resources in the Slack website and Slack desktop app:

  • A Slack workspace named RedshiftQMR*****
  • A private channel, named redshift-qmr-notification-*****-*******, in the newly created Slack workspace
  • A new Slack app in the Slack workspace, named RedshiftQMRRuleNotification (using the From Scratch option)

Add an incoming webhook and authorize Slack app

In this step, you enable and add an incoming webhook to the Slack workspace that you created. For full instructions, refer to Enable Incoming Webhooks and Create an Incoming Webhook. You also authorize your Slack app so that it can post messages to the private Slack channel.

  1. In the Slack app, under Settings in the navigation pane, choose Basic Information.
  2. Choose Incoming Webhooks.
  3. Turn on Activate Incoming Webhooks.
  4. Choose Add New Webhook to Workspace.
  5. Authorize the Slack app RedshiftQMRRuleNotification so that it can post messages to the private Slack channel redshift-qmr-notification-*****-*******.

The following screenshot shows the details of the newly added incoming webhook.

Create a second Lambda function and subscribe to the SNS topic

In this step, you create a second Lambda function that is subscribed to the SNS topic that you created earlier. For full instructions, refer to Building Lambda functions with Python and Subscribing a function to a topic.

For this post, we create a second function named redshiftqmrrulesnsinvoke, which is subscribed to the SNS topic redshiftqmrrulenotification. The second function sends a detailed QMR violation message (received from the SNS topic) to the designated Slack channel named redshift-qmr-notification-*. This function uses the incoming Slack webhook that we created earlier.

We also create an SNS subscription of the second Lambda function to the SNS topic that we created previously.
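
The subscription itself can also be created with boto3. The following is a minimal sketch; both ARNs are placeholders for the topic and function created earlier, and the add_permission call lets Amazon SNS invoke the function.

import boto3

sns_client = boto3.client('sns')
lambda_client = boto3.client('lambda')

# Placeholder ARNs; replace with the topic and function created earlier
topic_arn = 'arn:aws:sns:<region>:<account-id>:redshiftqmrrulenotification'
function_arn = 'arn:aws:lambda:<region>:<account-id>:function:redshiftqmrrulesnsinvoke'

# Allow SNS to invoke the second Lambda function
lambda_client.add_permission(
    FunctionName='redshiftqmrrulesnsinvoke',
    StatementId='AllowSNSInvoke',
    Action='lambda:InvokeFunction',
    Principal='sns.amazonaws.com',
    SourceArn=topic_arn
)

# Subscribe the function to the topic
sns_client.subscribe(TopicArn=topic_arn, Protocol='lambda', Endpoint=function_arn)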

The following is the Python 3.9 code used for the second Lambda function:

import urllib3
import json
import os

http = urllib3.PoolManager()
def lambda_handler(event, context):
    # Slack webhook URL and target channel are supplied through environment variables
    url = os.environ['webhook']
    channel = os.environ['channel']

    # Build the Slack message payload from the SNS notification
    msg = {
        "channel": channel,
        "username": "WEBHOOK_USERNAME",
        "text": event['Records'][0]['Sns']['Message'],
        "icon_emoji": ""
    }

    # Post the message to the Slack incoming webhook and log the result
    encoded_msg = json.dumps(msg).encode('utf-8')
    resp = http.request('POST', url, body=encoded_msg)
    print({
        "message": event['Records'][0]['Sns']['Message'],
        "status_code": resp.status,
        "response": resp.data
    })

We use two environment variables for the second Lambda function:

  • channel – The Slack channel that we created
  • webhook – The Slack webhook that we created

Test the solution

To show the effect of the QMRs, we ran queries that violate the QMRs we set up.

Test 1: Returned rows

Test 1 looks for violations of the returned_rows QMR, in which the return row count is over 1,000,000 for a query that ran in the adhoc queue.

We created and loaded a table named lineitem in a schema named aquademo, which has more than 18 billion records. You can refer to the GitHub repo to create and load the table.

We ran the following query, which violated the returned_rows QMR, and the query was stopped as specified in the action set in the QMR.

select * from aquademo.lineitem limit 1000001;

The following screenshot shows the view from the Amazon Redshift client after running the query.

The following screenshot shows the view on the Amazon Redshift console.

The following screenshot shows the notification we received in our Slack channel.

Test 2: Long-running queries

Test 2 looks for violations of the long_running QMR, in which query runtime is over 300 seconds for a user who belongs to reporting_group.

In the following code, we created a new Amazon Redshift group named reporting_group and added a new user, named reporting_user, to the group. reporting_group is assigned USAGE and SELECT privileges on all tables in the retail and aquademo schemas.

create group reporting_group;
create user reporting_user in group reporting_group password 'Test12345';
grant usage on schema retail,aquademo to group reporting_group;
grant select on all tables in schema retail,aquademo to group reporting_group;

We set the session authorization to reporting_user so the query runs in the reporting queue. We ran the following query, which violated the long_running QMR, and the query was stopped as specified in the action set in the QMR:

set session authorization reporting_user;
set enable_result_cache_for_session  to off;
select * from aquademo.lineitem;

The following screenshot shows the view from the Amazon Redshift client.

The following screenshot shows the view on the Amazon Redshift console.

The following screenshot shows the notification we received in our Slack channel.

Test 3: Nested loops

Test 3 looks for violations of the nested_loop QMR, in which the nested loop join row count is over 10,000,000 for a user who belongs to reporting_group.

We set the session authorization to reporting_user so the query runs in the reporting queue. We ran the following query, which violated the nested_loop QMR, and the query logged the violation as specified in the action set in the QMR:

set session authorization reporting_user;
set enable_result_cache_for_session  to off;
select ss.*,cd.* 
from retail.store_sales ss
, retail.customer_demographics cd;

Before we ran the original query, we also checked the explain plan and noted that this nested loop would return more than 10,000,000 rows. The following screenshot shows the query explain plan.
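
If you want to check the plan programmatically rather than in a SQL client, the following sketch runs EXPLAIN through the Amazon Redshift Data API and prints the plan rows. The cluster identifier, database, and user are placeholders, and the sketch assumes the EXPLAIN output is returned as a result set.

import time
import boto3

client = boto3.client('redshift-data')

# Placeholders for the cluster, database, and user used elsewhere in this post
res = client.execute_statement(
    ClusterIdentifier='<your-cluster-identifier>',
    Database='<your-database>',
    DbUser='<your-user>',
    Sql='explain select ss.*, cd.* from retail.store_sales ss, retail.customer_demographics cd;'
)

# Wait for the statement to finish, then print each line of the plan
while client.describe_statement(Id=res['Id'])['Status'] not in ('FINISHED', 'FAILED', 'ABORTED'):
    time.sleep(1)

for row in client.get_statement_result(Id=res['Id'])['Records']:
    print(row[0]['stringValue'])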

The following screenshot shows the notification we received in our Slack channel.

Test 4: Redshift Spectrum scans

Test 4 looks for violations of the spectrum_scan QMR, in which Redshift Spectrum scans exceed 1000 MB for a query that ran in the adhoc queue.

For this example, we used store_sales data (unloaded from an Amazon Redshift table that was created by using the TPC-DS benchmark data) stored in an Amazon S3 location. The data in Amazon S3 is not partitioned, resides under a single prefix, and is around 3.9 GB in size. We created an external schema (qmr_spectrum_rule_test) and external table (qmr_rule_store_sales) in Redshift Spectrum.

We used the following steps to run this test with the sample data:

  1. Run an unload SQL command:
    unload ('select * from store_sales')
    to 's3://<<Your Amazon S3 Location>>/store_sales/' 
    iam_role default;

  2. Create an external schema from Redshift Spectrum:
    CREATE EXTERNAL SCHEMA if not exists qmr_spectrum_rule_test
    FROM DATA CATALOG DATABASE 'qmr_spectrum_rule_test' region 'us-east-1' 
    IAM_ROLE default
    CREATE EXTERNAL DATABASE IF NOT exists;

  3. Create an external table in Redshift Spectrum:
    create external table qmr_spectrum_rule_test.qmr_rule_store_sales
    (
    ss_sold_date_sk int4 ,            
      ss_sold_time_sk int4 ,     
      ss_item_sk int4  ,      
      ss_customer_sk int4 ,           
      ss_cdemo_sk int4 ,              
      ss_hdemo_sk int4 ,         
      ss_addr_sk int4 ,               
      ss_store_sk int4 ,           
      ss_promo_sk int4 ,           
      ss_ticket_number int8 ,        
      ss_quantity int4 ,           
      ss_wholesale_cost numeric(7,2) ,          
      ss_list_price numeric(7,2) ,              
      ss_sales_price numeric(7,2) ,
      ss_ext_discount_amt numeric(7,2) ,             
      ss_ext_sales_price numeric(7,2) ,              
      ss_ext_wholesale_cost numeric(7,2) ,           
      ss_ext_list_price numeric(7,2) ,               
      ss_ext_tax numeric(7,2) ,                 
      ss_coupon_amt numeric(7,2) , 
      ss_net_paid numeric(7,2) ,   
      ss_net_paid_inc_tax numeric(7,2) ,             
      ss_net_profit numeric(7,2)                     
    ) ROW FORMAT DELIMITED 
      FIELDS TERMINATED BY '|' 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://<<Your Amazon S3 Location>>/store_sales/'
    TABLE PROPERTIES (
      'averageRecordSize'='130', 
      'classification'='csv', 
      'columnsOrdered'='true', 
      'compressionType'='none', 
      'delimiter'='|', 
      'recordCount'='11083990573', 
      'sizeKey'='1650877678933', 
      'typeOfData'='file');

  4. Run the following query:
    select * 
    FROM qmr_spectrum_rule_test.qmr_rule_store_sales 
    where ss_sold_date_sk = 2451074;

The query violated the spectrum_scan QMR, and the query was stopped as specified in the action set in the QMR.

The following screenshot shows the view from the Amazon Redshift client.

The following screenshot shows the view on the Amazon Redshift console.

The following screenshot shows the notification we received in our Slack channel.

Clean up

When you’re finished with this solution, we recommend deleting the resources you created to avoid incurring any further charges.

Conclusion

Amazon Redshift is a powerful, fully managed data warehouse that can offer significantly increased performance and lower cost in the cloud. In this post, we discussed how you can automate notification of misbehaving queries on Slack by using query monitoring rules. QMRs can help you maximize cluster performance and throughput when supporting mixed workloads. Use these instructions to set up your Slack channel to receive automatic notifications from your Amazon Redshift cluster for any violation of QMRs.


About the Authors

Dipankar Kushari is a Senior Specialist Solutions Architect in the Analytics team at AWS.

Harshida Patel is a Specialist Senior Solutions Architect in the Analytics team at AWS.

Export JSON data to Amazon S3 using Amazon Redshift UNLOAD

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/export-json-data-to-amazon-s3-using-amazon-redshift-unload/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL. Amazon Redshift offers up to three times better price performance than any other cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as high-performance business intelligence (BI) reporting, dashboarding applications, data exploration, and real-time analytics.

As the amount of data generated by IoT devices, social media, and cloud applications continues to grow, organizations are looking to easily and cost-effectively analyze this data with minimal time-to-insight. A vast amount of this data is available in semi-structured format and needs additional extract, transform, and load (ETL) processes to make it accessible or to integrate it with structured data for analysis. Amazon Redshift powers the modern data architecture, which enables you to query data across your data warehouse, data lake, and operational databases to gain faster and deeper insights not possible otherwise. With a modern data architecture, you can store data in semi-structured format in your Amazon Simple Storage Service (Amazon S3) data lake and integrate it with structured data on Amazon Redshift. This allows you to make this data available to other analytics and machine learning applications rather than locking it in a silo.

In this post, we discuss the UNLOAD feature in Amazon Redshift and how to export data from an Amazon Redshift cluster to JSON files on an Amazon S3 data lake.

JSON support features in Amazon Redshift

Amazon Redshift features such as COPY, UNLOAD, and Amazon Redshift Spectrum enable you to move and query data between your data warehouse and data lake.

With the UNLOAD command, you can export a query result set in text, JSON, or Apache Parquet file format to Amazon S3. The UNLOAD command is also recommended when you need to retrieve large result sets from your data warehouse. Because UNLOAD processes and exports data in parallel from Amazon Redshift’s compute nodes to Amazon S3, it reduces network overhead and therefore the time required to read a large number of rows. When you use the JSON option with UNLOAD, Amazon Redshift unloads to a JSON file in which each line contains a JSON object representing a full record in the query result. In the JSON file, Amazon Redshift types are unloaded as their closest JSON representation. For example, Boolean values are unloaded as true or false, NULL values are unloaded as null, and timestamp values are unloaded as strings. If a default JSON representation doesn’t suit a particular use case, you can modify it by casting to the desired type in the SELECT query of the UNLOAD statement.

Additionally, to create a valid JSON object, the name of each column in the query result must be unique. If the column names in the query result aren’t unique, the JSON UNLOAD process fails. To avoid this, we recommend using proper column aliases so that each column in the query result remains unique while getting unloaded. We illustrate this behavior later in this post.

With the Amazon Redshift SUPER data type, you can store data in JSON format in local Amazon Redshift tables. This way, you can process the data without any network overhead and use Amazon Redshift schema properties to optimally store and query semi-structured data locally. In addition to achieving low latency, you can use the SUPER data type when your query requires strong consistency, predictable query performance, complex query support, and ease of use with evolving schemas and schemaless data. Amazon Redshift supports writing nested JSON when the query result contains SUPER columns.

Updating and maintaining data with constantly evolving schemas can be challenging and adds extra ETL steps to the analytics pipeline. The JSON file format provides support for schema definition, is lightweight, and is widely used as a data transfer mechanism by different services, tools, and technologies.

Amazon OpenSearch Service (successor to Amazon Elasticsearch Service) is a distributed, open-source search and analytics suite used for a broad set of use cases like real-time application monitoring, log analytics, and website search. It uses JSON as the supported file format for data ingestion. The ability to unload data natively in JSON format from Amazon Redshift into the Amazon S3 data lake reduces complexity and additional data processing steps if that data needs to be ingested into Amazon OpenSearch Service for further analysis.

This is one example of how seamless data movement can help you build an integrated data platform with a data lake on Amazon S3, a data warehouse on Amazon Redshift, and search and log analytics using Amazon OpenSearch Service or any other JSON-oriented downstream analytics solution. For more information about the Lake House approach, see Build a Lake House Architecture on AWS.

Examples of Amazon Redshift JSON UNLOAD

In this post, we show you the following different scenarios:

  • Example 1 – Unload customer data in JSON format into Amazon S3, partitioning output files into partition folders, following the Apache Hive convention, with customer birth month as the partition key. We make a few changes to the columns in the SELECT statement of the UNLOAD command:
    • Convert the c_preferred_cust_flag column from character to Boolean
    • Remove leading and trailing spaces from the c_first_name, c_last_name, and c_email_address columns using the Amazon Redshift built-in function btrim
  • Example 2 – Unload line item data (with SUPER column) in JSON format into Amazon S3 with data not partitioned
  • Example 3 – Unload line item data (With SUPER column) in JSON format into Amazon S3, partitioning output files into partition folders, following the Apache Hive convention, with customer key as the partition key

For the first example, we used the customer table and data from the TPC-DS dataset. For the examples involving a table with a SUPER column, we used the customer_orders_lineitem table and data from the following tutorial.

Example 1: Export customer data

For this example, we used the customer table and data from the TPC-DS dataset. We created the database schema and customer table, and copied data into it. See the following code:

-- Created a new schema
create schema json_unload_demo; 

-- Created and populated the customer table in the new schema

create table json_unload_demo.customer
(
  c_customer_sk int4 not null ,                 
  c_customer_id char(16) not null ,             
  c_current_cdemo_sk int4 ,   
  c_current_hdemo_sk int4 ,   
  c_current_addr_sk int4 ,    
  c_first_shipto_date_sk int4 ,                 
  c_first_sales_date_sk int4 ,
  c_salutation char(10) ,     
  c_first_name char(20) ,     
  c_last_name char(30) ,      
  c_preferred_cust_flag char(1) ,               
  c_birth_day int4 ,          
  c_birth_month int4 ,        
  c_birth_year int4 ,         
  c_birth_country varchar(20) ,                 
  c_login char(13) ,          
  c_email_address char(50) ,  
  c_last_review_date_sk int4 ,
  primary key (c_customer_sk)
) distkey(c_customer_sk);

copy json_unload_demo.customer from 's3://redshift-downloads/TPC-DS/2.13/3TB/customer/' 
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>' 
gzip delimiter '|' EMPTYASNULL;

You can create a default AWS Identity and Access Management (IAM) role for your Amazon Redshift cluster to copy from and unload to your Amazon S3 location. For more information, see Use the default IAM role in Amazon Redshift to simplify accessing other AWS services.

In this example, we unloaded customer data for all customers with birth year 1992 in JSON format into Amazon S3, partitioning the output files by customer birth month. We make the following changes in the UNLOAD statement:

  • Convert the c_preferred_cust_flag column from character to Boolean
  • Remove leading and trailing spaces from the c_first_name, c_last_name, and c_email_address columns using the btrim function
  • Set the maximum size of exported files in Amazon S3 to 64 MB

See the following code:

unload ('SELECT c_customer_sk,
    c_customer_id ,
    c_current_cdemo_sk ,
    c_current_hdemo_sk ,
    c_current_addr_sk ,
    c_first_shipto_date_sk ,
    c_first_sales_date_sk ,
    c_salutation ,
    btrim(c_first_name),
    btrim(c_last_name),
    c_birth_day ,
    c_birth_month ,
    c_birth_year ,
    c_birth_country ,
    c_last_review_date_sk,
    DECODE(c_preferred_cust_flag, ''Y'', TRUE, ''N'', FALSE)::boolean as c_preferred_cust_flag_bool,
    c_login, 
    btrim(c_email_address) 
    from customer where c_birth_year = 1992;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/non-super/customer/' 
FORMAT JSON 
partition by (c_birth_month)  include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>'
MAXFILESIZE 64 MB;

When we ran the UNLOAD command, we encountered an error because the columns that use the btrim function would all be exported with the same column name, btrim (the default behavior of Amazon Redshift when the same function is applied to multiple columns that are selected together). To avoid this error, we need to use a unique column alias for each column that uses the btrim function.

To illustrate this behavior, we select the c_first_name, c_last_name, and c_email_address columns with the btrim function applied, and convert c_preferred_cust_flag from character to Boolean.

We ran the following query in Amazon Redshift Query Editor v2:

SELECT btrim(c_first_name) ,
    btrim(c_last_name),
    btrim(c_email_address) , 
    DECODE(c_preferred_cust_flag, 'Y', TRUE, 'N', FALSE)::boolean c_preferred_cust_flag_bool  
    from customer where c_birth_year = 1992 limit 10; 

All three columns that use the btrim function appear as btrim in the output result instead of their respective column names.

An error occurred in the UNLOAD command because we didn’t use column aliases.

We added column aliases in the following code:

unload ('SELECT c_customer_sk,
    c_customer_id ,
    c_current_cdemo_sk ,
    c_current_hdemo_sk ,
    c_current_addr_sk ,
    c_first_shipto_date_sk ,
    c_first_sales_date_sk ,
    c_salutation ,
    btrim(c_first_name) as c_first_name,
    btrim(c_last_name) as c_last_name,
    c_birth_day ,
    c_birth_month ,
    c_birth_year ,
    c_birth_country ,
    c_last_review_date_sk,
    DECODE(c_preferred_cust_flag, ''Y'', TRUE, ''N'', FALSE)::boolean as c_preferred_cust_flag_bool,
    c_login, 
    btrim(c_email_address) as c_email_addr_trimmed 
    from customer where c_birth_year = 1992;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/non-super/customer/' 
FORMAT JSON 
partition by (c_birth_month)  include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>'
MAXFILESIZE 64 MB;

After we added column aliases, the UNLOAD command completed successfully and files were exported to the desired location in Amazon S3.

The following screenshot shows that data is unloaded from the Amazon Redshift customer table into Amazon S3 in JSON format, with output files organized into partition folders that follow the Apache Hive convention and use customer birth month as the partition key.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that was unloaded.

The column aliases c_first_name, c_last_name, and c_email_addr_trimmed were written into the JSON record as per the SELECT query. Boolean values were saved in c_preferred_cust_flag_bool as well.
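
If you want to peek at the unloaded JSON without downloading the files, an Amazon S3 Select call like the following sketch works; the bucket and object key are placeholders for one of the files written by the UNLOAD.

import boto3

s3 = boto3.client('s3')

# Placeholders: bucket, partition folder, and file name from the UNLOAD output location
resp = s3.select_object_content(
    Bucket='<your-bucket>',
    Key='non-partitioned/non-super/customer/c_birth_month=1/<unloaded-file-name>',
    ExpressionType='SQL',
    Expression="select s.c_first_name, s.c_last_name, s.c_preferred_cust_flag_bool from s3object s limit 5",
    InputSerialization={'JSON': {'Type': 'LINES'}},
    OutputSerialization={'JSON': {}}
)

# Stream and print the matching records
for event in resp['Payload']:
    if 'Records' in event:
        print(event['Records']['Payload'].decode('utf-8'))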

Examples 2 and 3: Using the SUPER column

For the next two examples, we used the customer_orders_lineitem table and data. We created the customer_orders_lineitem table and copied data into it with the following code:

-- Created a new table with SUPER column

CREATE TABLE JSON_unload_demo.customer_orders_lineitem
(c_custkey bigint
,c_name varchar
,c_address varchar
,c_nationkey smallint
,c_phone varchar
,c_acctbal decimal(12,2)
,c_mktsegment varchar
,c_comment varchar
,c_orders super
);

-- Loaded data into the new table
COPY json_unload_demo.customer_orders_lineitem 
FROM 's3://redshift-downloads/semistructured/tpch-nested/data/json/customer_orders_lineitem'
IAM_ROLE '<<AWS IAM role attached to your amazon redshift cluster>>'
FORMAT JSON 'auto';

Next, we ran a few queries to explore the customer_orders_lineitem table’s data:

select * from json_unload_demo.customer_orders_lineitem;

select c_orders from json_unload_demo.customer_orders_lineitem;

SELECT attr as attribute_name, val as object_value FROM json_unload_demo.customer_orders_lineitem c, c.c_orders o, UNPIVOT o AS val AT attr;

Example 2: Without partitions

In this example, we unloaded all the rows of the customer_orders_lineitem table in JSON format into Amazon S3 without any partitions:

unload ('select * from json_unload_demo.customer_orders_lineitem;')
to 's3://<<Your Amazon S3 Bucket>>/non-partitioned/super/customer-order-lineitem/'
FORMAT JSON
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>';

After we run the UNLOAD command, the data is available in the desired Amazon S3 location. The following screenshot shows that data is unloaded from the Amazon Redshift customer_orders_lineitem table into Amazon S3 in JSON format without any partitions.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that was unloaded.

Example 3: With partitions

In this example, we unloaded all the rows of the customer_orders_lineitem table into Amazon S3 in JSON format, partitioning the output files into folders that follow the Apache Hive convention, with customer key as the partition key:

unload ('select * from json_unload_demo.customer_orders_lineitem;')
to 's3://<<Your Amazon S3 Bucket>>/partitioned/super/customer-order-lineitem-1/'
FORMAT JSON
partition by (c_custkey) include
iam_role '<<AWS IAM role attached to your amazon redshift cluster>>';

After we run the UNLOAD command, the data is available in the desired Amazon S3 location. The following screenshot shows that data is unloaded from the Amazon Redshift customer_orders_lineitem table into Amazon S3 in JSON format, with output files organized into partition folders that follow the Apache Hive convention and use customer key as the partition key.

A query with Amazon S3 Select shows a snippet of data in the JSON file on Amazon S3 that was unloaded.

Conclusion

In this post, we showed how you can use the Amazon Redshift UNLOAD command to unload the result of a query to one or more JSON files into your Amazon S3 location. We also showed how you can partition the data using your choice of partition key while you unload the data. You can use this feature to export data to JSON files into Amazon S3 from your Amazon Redshift cluster or your Amazon Redshift Serverless endpoint to make your data processing simpler and build an integrated data analytics platform.


About the Authors

Dipankar Kushari is a Senior Analytics Solutions Architect with AWS.

Sayali Jojan is a Senior Analytics Solutions Architect with AWS. She has 7 years of experience working with customers to design and build solutions on the AWS Cloud, with a focus on data and analytics.

Cody Cunningham is a Software Development Engineer with AWS, working on data ingestion for Amazon Redshift.

Introducing new features for Amazon Redshift COPY: Part 1

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/part-1-introducing-new-features-for-amazon-redshift-copy/

Amazon Redshift is a fast, scalable, secure, and fully managed cloud data warehouse that makes it simple and cost-effective to analyze all your data using standard SQL. Amazon Redshift offers up to three times better price performance than any other cloud data warehouse. Tens of thousands of customers use Amazon Redshift to process exabytes of data per day and power analytics workloads such as high-performance business intelligence (BI) reporting, dashboarding applications, data exploration, and real-time analytics.

Loading data is a key process for any analytical system, including Amazon Redshift. Loading very large datasets can take a long time and consume a lot of computing resources. How your data is loaded can also affect query performance. You can use many different methods to load data into Amazon Redshift. One of the fastest and most scalable methods is to use the COPY command. This post dives into some of the recent enhancements made to the COPY command and how to use them effectively.

Overview of the COPY command

A best practice for loading data into Amazon Redshift is to use the COPY command. The COPY command loads data in parallel from Amazon Simple Storage Service (Amazon S3), Amazon EMR, Amazon DynamoDB, or multiple data sources on any remote hosts accessible through a Secure Shell (SSH) connection.

The COPY command reads and loads data in parallel from a file or multiple files in an S3 bucket. You can take maximum advantage of parallel processing by splitting your data into multiple files, in cases where the files are compressed. The COPY command appends the new input data to any existing rows in the target table. The COPY command can load data from Amazon S3 for the file formats AVRO, CSV, JSON, and TXT, and for columnar format files such as ORC and Parquet.

Use COPY with FILLRECORD

In situations where contiguous fields are missing at the end of some records in the data files being loaded, COPY reports an error indicating that there is a mismatch between the number of fields in the file being loaded and the number of columns in the target table. In some situations, columnar files (such as Parquet) that are produced by applications and ingested into Amazon Redshift via COPY may have additional fields added to the files (and new columns added to the target Amazon Redshift table) over time. In such cases, these files may have values absent for certain newly added fields. To load these files, you previously had to either preprocess the files to fill in values for the missing fields before loading them with the COPY command, or use Amazon Redshift Spectrum to read the files from Amazon S3 and then use INSERT INTO to load the data into the Amazon Redshift table.

With the FILLRECORD parameter, you can now load data files with a varying number of fields successfully in the same COPY command, as long as the target table has all columns defined. The FILLRECORD parameter addresses ease of use because you can now directly use the COPY command to load columnar files with varying fields into Amazon Redshift instead of achieving the same result with multiple steps.

With the FILLRECORD parameter, missing columns are loaded as NULLs. For text and CSV formats, if the missing column is a VARCHAR column, zero-length strings are loaded instead of NULLs. To load NULLs to VARCHAR columns from text and CSV, specify the EMPTYASNULL keyword. NULL substitution only works if the column definition allows NULLs.

Use FILLRECORD while loading Parquet data from Amazon S3

In this section, we demonstrate the utility of FILLRECORD by using a Parquet file that has a smaller number of fields populated than the number of columns in the target Amazon Redshift table. First we try to load the file into the table without the FILLRECORD parameter in the COPY command, then we use the FILLRECORD parameter in the COPY command.

For the purpose of this demonstration, we have created the following components:

  • An Amazon Redshift cluster with a database, public schema, awsuser as admin user, and an AWS Identity and Access Management (IAM) role, used to perform the COPY command to load the file from Amazon S3, attached to the Amazon Redshift cluster. For details on authorizing Amazon Redshift to access other AWS services, refer to Authorizing Amazon Redshift to access other AWS services on your behalf.
  • An Amazon Redshift table named call_center_parquet.
  • A Parquet file already uploaded to an S3 bucket from where the file is copied into the Amazon Redshift cluster.

The following code is the definition of the call_center_parquet table:

DROP TABLE IF EXISTS public.call_center_parquet;
CREATE TABLE IF NOT EXISTS public.call_center_parquet
(
	cc_call_center_sk INTEGER NOT NULL ENCODE az64
	,cc_call_center_id varchar(100) NOT NULL  ENCODE lzo
	,cc_rec_start_date VARCHAR(50)   ENCODE lzo
	,cc_rec_end_date VARCHAR(50)   ENCODE lzo
	,cc_closed_date_sk varchar (100)   ENCODE lzo
	,cc_open_date_sk INTEGER   ENCODE az64
	,cc_name VARCHAR(50)   ENCODE lzo
	,cc_class VARCHAR(50)   ENCODE lzo
	,cc_employees INTEGER   ENCODE az64
	,cc_sq_ft INTEGER   ENCODE az64
	,cc_hours VARCHAR(20)   ENCODE lzo
	,cc_manager VARCHAR(40)   ENCODE lzo
	,cc_mkt_id INTEGER   ENCODE az64
	,cc_mkt_class VARCHAR(50)   ENCODE lzo
	,cc_mkt_desc VARCHAR(100)   ENCODE lzo
	,cc_market_manager VARCHAR(40)   ENCODE lzo
	,cc_division INTEGER   ENCODE az64
	,cc_division_name VARCHAR(50)   ENCODE lzo
	,cc_company INTEGER   ENCODE az64
	,cc_company_name VARCHAR(50)   ENCODE lzo
	,cc_street_number INTEGER   ENCODE az64
	,cc_street_name VARCHAR(60)   ENCODE lzo
	,cc_street_type VARCHAR(15)   ENCODE lzo
	,cc_suite_number VARCHAR(10)   ENCODE lzo
	,cc_city VARCHAR(60)   ENCODE lzo
	,cc_county VARCHAR(30)   ENCODE lzo
	,cc_state CHAR(2)   ENCODE lzo
	,cc_zip INTEGER   ENCODE az64
	,cc_country VARCHAR(20)   ENCODE lzo
	,cc_gmt_offset NUMERIC(5,2)   ENCODE az64
	,cc_tax_percentage NUMERIC(5,2)   ENCODE az64
)
DISTSTYLE ALL
;
ALTER TABLE public.call_center_parquet OWNER TO awsuser;

The table has 31 columns.

The Parquet file doesn’t contain any value for the cc_gmt_offset and cc_tax_percentage fields. It has 29 columns. The following screenshot shows the schema definition for the Parquet file located in Amazon S3, which we load into Amazon Redshift.

We ran the COPY command two different ways: with or without the FILLRECORD parameter.

We first tried to load the Parquet file into the call_center_parquet table without the FILLRECORD parameter:

COPY call_center_parquet
FROM 's3://*****************/parquet/part-00000-d9a3ab22-9d7d-439a-b607-2ddc2d39c5b0-c000.snappy.parquet'
iam_role 'arn:aws:iam::**********:role/RedshiftAttachedRole'
FORMAT PARQUET;

It generated an error while performing the copy.

Next, we tried to load the Parquet file into the call_center_parquet table and used the FILLRECORD parameter:

COPY call_center_parquet
FROM 's3://*****************/parquet/part-00000-d9a3ab22-9d7d-439a-b607-2ddc2d39c5b0-c000.snappy.parquet'
iam_role 'arn:aws:iam::**********:role/RedshiftAttachedRole'
FORMAT PARQUET FILLRECORD;

The Parquet data was loaded successfully in the call_center_parquet table, and NULL was entered into the cc_gmt_offset and cc_tax_percentage columns.

Split large text files while copying

The second new feature we discuss in this post is the automatic splitting of large files to take advantage of the massive parallelism of the Amazon Redshift cluster. A best practice when using the COPY command in Amazon Redshift is to load data using a single COPY command from multiple data files. This loads data in parallel by dividing the workload among the nodes and slices in the Amazon Redshift cluster. When all the data comes from a single file or a small number of large files, Amazon Redshift is forced to perform a much slower serialized load, because the COPY command can’t utilize the parallelism of the Amazon Redshift cluster. You previously had to write additional preprocessing steps to split the large files into smaller files so that the COPY command could load data in parallel into the Amazon Redshift cluster.

The COPY command now supports automatically splitting a single file into multiple smaller scan ranges. This feature is currently supported only for large uncompressed delimited text files. More file formats and options, such as COPY with the CSV keyword, will be added in the near future.

This helps improve performance for COPY queries when loading a small number of large uncompressed delimited text files into your Amazon Redshift cluster. Scan ranges are implemented by splitting the files into 64 MB chunks, which get assigned to each Amazon Redshift slice. This change addresses ease of use because you don’t need to split large uncompressed text files as an additional preprocessing step.

With Amazon Redshift’s ability to split large uncompressed text files, you can see performance improvements for the COPY command with a single large file or a few files of significantly varying sizes (for example, one 5 GB file and 20 files of a few KBs each). The performance improvement grows as the file size increases, even with the same Amazon Redshift cluster configuration. Based on our tests, we observed a more than 1,500% performance improvement for the COPY command when loading a 6 GB uncompressed text file after the auto splitting feature became available.

There are no changes in the COPY query or keywords to enable this change, and splitting of files is automatically applied for the eligible COPY commands. Splitting isn’t applicable for the COPY query with the keywords CSV, REMOVEQUOTES, ESCAPE, and FIXEDWIDTH.

For the test, we used a single 6 GB uncompressed text file and the following COPY command:

COPY store_sales
FROM 's3://*****************/text/store_sales.txt'
iam_role 'arn:aws:iam::**********:role/RedshiftAttachedRole';

The Amazon Redshift cluster without the auto split option took 102 seconds to copy the file from Amazon S3 to the Amazon Redshift store_sales table. When the auto split option was enabled in the Amazon Redshift cluster (without any other configuration changes), the same 6 GB uncompressed text file took just 6.19 seconds to copy the file from Amazon S3 to the store_sales table.

Summary

In this post, we showed two enhancements to the Amazon Redshift COPY command. First, we showed how you can add the FILLRECORD parameter to the COPY command to successfully load data files even when contiguous fields are missing at the end of some of the records, as long as the target table has all the columns defined. Second, we described how Amazon Redshift automatically splits large uncompressed text files into 64 MB chunks before copying them into the Amazon Redshift cluster to enhance COPY performance. This automatic splitting allows you to use the COPY command on large uncompressed text files without adding a preprocessing step to split the files yourself. Try these features to make your data loading to Amazon Redshift simpler by removing custom preprocessing steps.

In Part 2 of this series, we will discuss additional new features of the Amazon Redshift COPY command and demonstrate how you can take advantage of them to optimize your data loading process.


About the Authors

Dipankar Kushari is a Senior Analytics Solutions Architect with AWS.

Cody Cunningham is a Software Development Engineer with AWS, working on data ingestion for Amazon Redshift.

Joe Yong is a Senior Technical Product Manager on the Amazon Redshift team and a relentless pursuer of making complex database technologies easy and intuitive for the masses. He has worked on database and data management systems for SMP, MPP, and distributed systems. Joe has shipped dozens of features for on-premises and cloud-native databases that serve IoT devices through petabyte-sized cloud data warehouses. Off keyboard, Joe tries to onsight 5.11s, hunt for good eats, and seek a cure for his Australian Labradoodle’s obsession with squeaky tennis balls.

Anshul Purohit is a Software Development Engineer with AWS, working on data ingestion and query processing for Amazon Redshift.

Run and debug Apache Spark applications on AWS with Amazon EMR on Amazon EKS

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/run-and-debug-apache-spark-applications-on-aws-with-amazon-emr-on-amazon-eks/

Customers today want to focus more on their core business model and less on the underlying infrastructure and operational burden. As customers migrate to the AWS Cloud, they’re realizing the benefits of being able to innovate faster on their own applications by relying on AWS to handle big data platforms, operations, and automation.

Many of AWS’s customers have migrated their big data workloads from on premises to Amazon Elastic Compute Cloud (Amazon EC2) and Amazon EMR, and process large amounts of data to get insights from it in a secure and cost-effective manner.

If you’re using open-source Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS) clusters to run your big data workloads, you may want to use Amazon EMR to eliminate the heavy lifting of installing and managing your frameworks and integrations with other AWS services.

In this post, we discuss how to run and debug Apache Spark applications with Amazon EMR on Amazon EKS.

Benefits of using Amazon EMR on EKS

Amazon EMR on EKS is primarily beneficial for two key audiences:

  • Users that are self-managing open-source applications on Amazon EKS – You can get the benefits of Amazon EMR by having the ability to use the latest fully managed versions of open-source big data analytics frameworks and optimized EMR runtime for Spark with two times faster performance than open-source Apache Spark. You can take advantage of the integrated developer experience for data scientists and developers with Amazon EMR Studio, and a fully managed persistent application user interface (Spark History Server) for simplified logging, monitoring, and debugging. Amazon EMR also provides native integrations with AWS services including Amazon CloudWatch, Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, AWS Step Functions, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA).
  • Existing Amazon EMR users – You can use Amazon EMR on EKS to improve resource utilization by simplifying infrastructure management and consolidating your Amazon EMR applications to run alongside other container-based applications on shared Amazon EKS clusters. You can also centrally manage infrastructure using familiar Kubernetes tools. Additionally, this provides the advantage of running different versions and configurations of the same runtime on a single Amazon EKS cluster with separation of compute, which is no longer tied to a specific analytics framework, version, or configuration.

With Amazon EMR on EKS, you can now let your teams focus on developing big data applications on Spark as rapidly as possible in a highly reliable, available, secure, and cost-efficient manner.

The following diagram shows a high-level representation of Amazon EMR on EKS. The architecture loosely couples applications to the infrastructure that they run on. When you submit a job to Amazon EMR, your job definition contains all of its application-specific parameters. Amazon EMR uses these parameters to instruct Amazon EKS about which pods and containers to deploy. Amazon EKS then brings online the computing resources from Amazon EC2 and AWS Fargate required to run the job. With this loose coupling of services, you can run multiple, securely isolated jobs simultaneously.
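
To make that concrete, a job submission looks roughly like the following boto3 sketch; the virtual cluster ID, IAM role ARN, S3 path, and release label are illustrative placeholders rather than values from this walkthrough, which builds them step by step below.

import boto3

emr = boto3.client('emr-containers')

# Placeholders: virtual cluster ID, job execution role, script location, and release label
response = emr.start_job_run(
    name='spark-ghcn-demo',
    virtualClusterId='<your-virtual-cluster-id>',
    executionRoleArn='arn:aws:iam::<account-id>:role/eksjobexecutionrole',
    releaseLabel='emr-6.2.0-latest',
    jobDriver={
        'sparkSubmitJobDriver': {
            'entryPoint': 's3://<your-bucket>/scripts/spark_app.py',
            'sparkSubmitParameters': '--conf spark.executor.instances=2 '
                                     '--conf spark.executor.memory=2G '
                                     '--conf spark.driver.memory=2G'
        }
    }
)
print(response['id'])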

Solution overview

In this post, we guide you through a step-by-step process of deploying an Amazon EMR on EKS cluster and then walk you through various options and techniques for troubleshooting your Apache Spark jobs.

We then show you how to run a Spark application on that cluster using NOAA Global Historical Climatology Network Daily (GHCN-D). This job reads weather data, joins it with weather station data, and produces an output dataset in Apache Parquet format that contains the details of precipitation readings for the US for 2011.

We also look at various options to monitor the Spark jobs and view the logs.

The following diagram illustrates our high-level architecture.

The solution contains the following deployment steps:

  1. Install and configure the prerequisites, including the AWS Command Line Interface (AWS CLI), kubectl, and eksctl.
  2. Provision the Amazon EKS cluster using an AWS CloudFormation stack.
  3. Configure the AWS CLI tools and create credentials and permissions.
  4. Provision compute and set up an EMR virtual cluster on Amazon EKS.
  5. Create the Amazon EMR Spark application.
  6. Run the Spark application.

Prerequisites

Before you get started, complete the following prerequisites:

  1. Install the AWS CLI v2.
  2. Install kubectl.
  3. Install eksctl.

Provision the Amazon EKS cluster using AWS CloudFormation

This post uses two CloudFormation stacks. You can download the CloudFormation templates we reference in this post from a public S3 bucket, or you can launch them directly from this post. AWS Identity and Access Management (IAM) roles are also provisioned as part of this step. For more information about the IAM permissions required to provision and manage an Amazon EKS cluster, see Using service-linked roles for Amazon EKS.

The CloudFormation template eks_cluster.yaml creates the following resources in your preferred AWS account and Region:

  • Network resources (one VPC, three public and three private subnets, and two security groups)
  • One S3 bucket required to store data and artifacts to run the Spark job
  • An Amazon EKS cluster with managed node groups with m5.2xlarge EC2 instances (configurable in the provided CloudFormation template)

For instructions on creating a stack, see Creating a stack on the AWS CloudFormation console.

  1. Choose Launch Stack to launch the stack via the AWS CloudFormation console.

The default parameter values are already populated for your convenience. Proceed with CloudFormation stack creation after verifying these values.

  2. For Stack name, enter emr-on-eks-stack.
  3. For ClusterName, enter eks-cluster.
  4. For EKSVersion, enter 1.19.
  5. For JobexecutionRoleName, enter eksjobexecutionrole.

CloudFormation stack creation should take 10–15 minutes. Make sure the stack is complete by verifying the status as CREATE_COMPLETE.

Additionally, you can verify that the Amazon EKS cluster was created using the following command, which displays the details of the cluster and shows the status as ACTIVE:

aws eks describe-cluster --name eks-cluster

Note the S3 bucket name (oS3BucketName) and the job execution role (rJobExecutionServiceRole) from the stack.
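
If you prefer the AWS CLI to the console for this step, a query along the following lines (assuming the stack name emr-on-eks-stack entered earlier) returns the same output values:

aws cloudformation describe-stacks \
  --stack-name emr-on-eks-stack \
  --query "Stacks[0].Outputs" \
  --output table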

In a later step, we upload our artifacts (PySpark script) and data into this S3 bucket.

Configure the AWS CLI tools and create credentials and permissions

To configure the AWS CLI tools, credentials, and permissions, complete the following steps:

  1. Configure kubectl to use the Amazon EKS cluster (the kubectl and eksctl commands need to run with the same AWS profile used when deploying the CloudFormation templates):
    aws eks --region <<Your AWS Region>> update-kubeconfig --name eks-cluster

  2. Create a dedicated namespace for running Apache Spark jobs using Amazon EMR on EKS:
    kubectl create namespace emroneks

  3. To enable Amazon EMR on EKS to access the namespace we created, we create a Kubernetes role and role binding and map the Amazon EMR on EKS service-linked role to a Kubernetes user. The following eksctl command automates these steps:
    eksctl create iamidentitymapping --cluster eks-cluster --namespace emroneks --service-name "emr-containers"

To use IAM roles for service accounts, an IAM OIDC provider must exist for your cluster.

  4. Create an IAM OIDC identity provider for the Amazon EKS cluster:
eksctl utils associate-iam-oidc-provider --cluster eks-cluster --approve
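
As an optional check, you can confirm that the OIDC provider is in place by inspecting the cluster's OIDC issuer URL and the IAM OIDC providers in your account:

aws eks describe-cluster --name eks-cluster \
  --query "cluster.identity.oidc.issuer" --output text
aws iam list-open-id-connect-providers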

When you use IAM roles for service accounts to run jobs on a Kubernetes namespace, an administrator must create a trust relationship between the job execution role and the identity of the Amazon EMR managed service account.

  5. The following command updates the trust relationship of the job execution role created by the CloudFormation stack (eksjobexecutionrole):
aws emr-containers update-role-trust-policy \
  --cluster-name eks-cluster \
  --namespace emroneks \
  --role-name eksjobexecutionrole
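
If you want to verify the result, the updated trust policy is visible on the role itself; for example:

aws iam get-role --role-name eksjobexecutionrole \
  --query "Role.AssumeRolePolicyDocument"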

Provision compute and set up an EMR virtual cluster on Amazon EKS

For the minimum IAM permissions required to manage and submit jobs on the Amazon EMR on EKS cluster, see Grant users access to Amazon EMR on EKS. The roles are provisioned as part of this step.

Use the second CloudFormation template (emr_virtual_cluster.yaml) to create the following resources in the same preferred AWS account and Region:

  • Amazon EMR virtual cluster
  • Amazon EKS managed node groups
  1. Choose Launch Stack to launch the stack via the AWS CloudFormation console.

The default values are already populated for your convenience. Proceed with stack creation after verifying these values.

  2. For Stack name, enter EMRvirtualcluster.
  3. For ClusterStackName, enter emr-on-eks-stack.
  4. For Namespace, enter emroneks.
  5. For NodeAutoscalinggroupDesiredCapacity, enter 1.
  6. For NodeAutoScalingGroupMaxSize, enter 1.
  7. For NodeInstanceType, enter m5.2xlarge.

Stack creation should take 10–15 minutes. Make sure the stack is complete by verifying the status as CREATE_COMPLETE.

Note the oEMRvirtualclusterID value as the output of the stack. We use this virtualclusterID to submit our Spark application.
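
You can also confirm from the AWS CLI that the virtual cluster is registered and in the RUNNING state (the filter below assumes the EKS cluster name eks-cluster used in this post):

aws emr-containers list-virtual-clusters \
  --state RUNNING \
  --query 'virtualClusters[?containerProvider.id==`eks-cluster`].[id,name,state]' \
  --output table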

Additionally, you can verify that the node groups are set up correctly using the following command:

aws eks list-nodegroups --cluster-name eks-cluster

You receive the following result:

{
    "nodegroups": [
        "emr-virtual-cluster-NodeGroup"
    ]
}

You can verify the details of the nodes with the following command (use the node group name from the preceding command):

aws eks describe-nodegroup --cluster-name eks-cluster --nodegroup-name emr-virtual-cluster-NodeGroup

This lists the details of all the nodes provisioned, the instance type, and subnet associations, among other details.
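
From the Kubernetes side, a similar sanity check confirms that the worker nodes have joined the cluster and are in the Ready state:

kubectl get nodes -o wide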

You’re now ready to create and run a Spark application on the cluster.

Create an Amazon EMR Spark application

To create the PySpark job, perform the following steps:

  1. Copy the NOAA Open Data Registry 2011 weather station data and the weather station lookup data, and save the files under the s3://<<Your S3 Bucket>>/noaa/ prefix.
    1. To copy the 2011 Weather Station data, use the following AWS CLI command:
      aws s3 cp s3://noaa-ghcn-pds/csv.gz/2011.csv.gz s3://<<Your S3 Bucket>>/noaa/csv.gz/2011.csv.gz

    2. To copy the Weather Station Lookup data, use the following AWS CLI command:
      aws s3 cp s3://noaa-ghcn-pds/ghcnd-stations.txt s3://<<Your S3 Bucket>>/noaa/ghcnd-stations.txt

You can find the value for <<Your S3 Bucket>> in the oS3Bucketname key on the Outputs tab for the emr-on-eks-stack CloudFormation stack.
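
To confirm that both copies succeeded, you can list the prefix and verify that the two objects are present:

aws s3 ls s3://<<Your S3 Bucket>>/noaa/ --recursive --human-readable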

  2. Download the PySpark script and upload it under s3://<<Your S3 Bucket>>/scripts/.

Run the Spark application

We run the Spark job using the AWS CLI. The parameters for the job (virtual cluster ID, script location, and Spark parameters) are specified in a JSON file.

  1. Save the following JSON template as jobparameters.json in a local folder (for example, /path/to/jobparameters.json):
{
  "name": "emr-on-eks-spark-job",
  "virtualClusterId": "<virtualclusterid>",
  "executionRoleArn": "arn:aws:iam::<<Your AWS Account Number>>:role/eksjobexecutionrole",
  "releaseLabel": "emr-6.2.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<<Your S3 Bucket>>/scripts/etl.py",
      "entryPointArguments": ["s3://<<Your S3 Bucket>>/"],
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.scheduler.minRegisteredResourcesRatio": "0.8",
          "spark.scheduler.maxRegisteredResourcesWaitingTime": "300s"
        }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emreksblog"
      },
      "s3MonitoringConfiguration": {
        "logUri": "s3://<<Your S3 Bucket>>/joblogs"
      }
    }
  }
}

The configurationOverrides section is optional and can be used to backport any Spark configurations that are set for jobs running in Amazon EMR on EC2. The Spark job runs successfully without any additional configuration changes.

  2. Modify the following keys in your JSON file (/path/to/jobparameters.json):
    1. virtualClusterId – The ID of the EMR cluster on Amazon EKS. You can get this by looking at the oEMRvirtualclusterID output from the CloudFormation template or by running the following code:
      aws emr-containers list-virtual-clusters \
      --state RUNNING \
      --query 'virtualClusters[?containerProvider.info.eksInfo.namespace==`emroneks`]'

    2. executionRoleArn – The ARN of the role created in the CloudFormation template. Replace <<Your AWS Account Number>> with the AWS account number you deploy this stack in.
    3. entryPoint – The value of the path to the ETL script S3 bucket provisioned in the CloudFormation stack (for example, s3://<<Your S3 Bucket>>/scripts/etl.py).
    4. entryPointArguments – The Spark job accepts one argument—the S3 bucket name where the data files are stored (s3://<<Your S3 Bucket>>/).
    5. logUri – The path where the controller, Spark driver, and executor logs are written. Enter it as s3://<<Your S3 Bucket>>/joblogs.
    6. cloudWatchMonitoringConfiguration – The CloudWatch log group details where logs are published. Enter the value for logGroupName as /emr-containers/jobs and logStreamNamePrefix as emreksblog.

You can change the sparkSubmitParameters parameter in the preceding JSON as per your needs, but your node groups must have the right capacity to accommodate the combination of Spark executors, memory, and cores that you define in sparkSubmitParameters. The preceding configuration works for the cluster we provisioned through the CloudFormation template without any changes.
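
One way to sanity-check this sizing is to compare the requested driver and executor resources against what each node can allocate. The following kubectl query (an optional check) lists allocatable CPU and memory per node; the combined Spark requests, plus Kubernetes and Amazon EMR overhead, need to fit within these values:

kubectl get nodes -o custom-columns=NAME:.metadata.name,CPU:.status.allocatable.cpu,MEMORY:.status.allocatable.memory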

  3. Submit the job with the following AWS CLI command:
    aws emr-containers start-job-run --cli-input-json file:///path/to/jobparameters.json

This returns a response with the job ID, which we can use to track the status of the job:

{
    "id": "00000002ucgkgj546u1",
    "name": "emr-on-eks-spark-job",
    "arn": "arn:aws:emr-containers:region:accountID:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkgj546u1",
    "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1"
}

You can get the status of a job by running the following command:

aws emr-containers describe-job-run --id <your job run id> --virtual-cluster-id <your virtual cluster id>

You can observe the status change from SUBMITTED to RUNNING to COMPLETED or FAILED.

{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>:<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "SUBMITTED",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",


{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>: :<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "RUNNING",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",

{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>: :<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "COMPLETED",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",

When the job state changes to COMPLETED, you can see a prefix in your S3 bucket called noaaparquet with a dataset created within the prefix.
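
You can verify the output from the AWS CLI as well. The exact object names depend on how Spark partitions the write, but the Parquet files appear under the noaaparquet prefix:

aws s3 ls s3://<<Your S3 Bucket>>/noaaparquet/ --recursive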

If the job status reaches the FAILED state, you can troubleshoot by going through the details found in the CloudWatch logs or the logs written into Amazon S3. For details on how to access and use those logs, refer to the following debugging section.

Occasionally, you may notice that the job is stuck in SUBMITTED status for a long time. This can happen when the Amazon EKS cluster is running other jobs and doesn't have capacity available. When the existing job is complete, your job changes to the RUNNING state.

Another scenario could be that the driver and executor memory requirements you set in your Spark configuration (jobparameters.json) exceed what is available. Consider adjusting the spark.executor.memory and spark.driver.memory values based on the instance type in your node group. See the following code:

{
  "name": "emr-on-eks-spark-job",
  "virtualClusterId": "<virtualclusterid>",
  "executionRoleArn": "arn:aws:iam::<<Your AWS Account Number>>:role/eksjobexecutionrole",
  "releaseLabel": "emr-6.2.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<<Your S3 Bucket>>/scripts/etl.py",
      "entryPointArguments": ["s3://<<Your S3 Bucket>>/"],
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4"
    }
  },
  ...
}
If your job is stuck or failing due to insufficient capacity, consider increasing the number of nodes in your node group or setting up the Amazon EKS Cluster Autoscaler. Refer to the debugging section for additional details from the Kubernetes dashboard.
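
For example, if you want to add a node to the managed node group created earlier, one option is to adjust its scaling configuration (use the node group name returned by the list-nodegroups command; the values here are illustrative, and changing them outside of CloudFormation causes stack drift):

aws eks update-nodegroup-config \
  --cluster-name eks-cluster \
  --nodegroup-name emr-virtual-cluster-NodeGroup \
  --scaling-config minSize=1,maxSize=2,desiredSize=2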

For additional information on Amazon EMR on EKS fundamentals, refer to the appendix at the end of this post.

Debug your Spark application

Amazon EMR on EKS provides multiple options to debug and view the logs of the Spark application.

For issues specific to Spark applications, use Spark History Server, CloudWatch logs, or logs on Amazon S3, which we discuss in this section.

For troubleshooting issues such as jobs that aren't starting (job status stuck in the SUBMITTED state) or issues with Spark drivers, start with the Kubernetes dashboard or kubectl CLI commands, discussed in detail later in this section.

Spark History Server

Spark History Server provides an elaborate web UI that allows us to inspect various components of our applications. It offers details on memory usage, jobs, stages, and tasks, as well as event timelines, logs, and various metrics and statistics both at the Spark driver level and for individual executors. It shows collected metrics and the state of the program, revealing clues about possible performance bottlenecks that you can utilize for tuning and optimizing the application. You can look at the Spark History Server (in the Spark UI) from the Amazon EMR console to see the driver and executor logs, as long as you have Amazon S3 logging enabled (which we enabled as part of the job submission JSON payload). The Spark UI is available even after the job is complete and the cluster is stopped. For more information on troubleshooting, see How do I troubleshoot a failed Spark step in Amazon EMR?

The following screenshots show the Spark UI of the job submitted on the cluster.

Choose a specific app ID to see the details of the Spark SQL and stages that ran. This helps you see the explain plan of the query and rows processed by each stage to narrow down any bottlenecks in your process.

If you don’t see the Spark UI link enabled or you see an error message “Unable to launch application UI,” verify the parameter s3MonitoringConfiguration in the jobparameters.json to ensure that a valid S3 path is provided. Additionally, ensure that the job execution role has appropriate permissions to access the S3 bucket. This was defined in the CloudFormation template that you deployed earlier. See the following code:

"monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emreksblog"
      },
      "s3MonitoringConfiguration": {
        "logUri": "s3://<<Your S3 Bucket>>/joblogs"
      }
    }

To increase the logging level of the Spark application to DEBUG, update the spark-log4j configuration. For instructions, see Change Log level for Spark application on EMR on EKS.

CloudWatch logs

In the preceding jobparameters.json, the log group name was /emr-containers/jobs and the log stream prefix was emreksblog. You can access the logs for this log group and prefix via the CloudWatch console.

The path for various types of logs available in CloudWatch are as follows:

  • Controller logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/pod-name/(stderr/stdout)
  • Driver logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/spark-application-id/spark-job-id-driver/(stderr/stdout)
  • Executor logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/spark-application-id/executor-pod-name/(stderr/stdout)

In the jobparameters.json configuration, logGroup is set as /emr-containers/jobs and logStreamPrefix is set as emreksblog.

Here you can retrieve the Spark driver and executor logs to view additional details and the stack trace of any error messages when your Spark job fails.

You can filter the CloudWatch log stream by driver/stdout to see the output and driver/stderr to see details of errors from your Spark job.
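
You can also work with these logs from the AWS CLI. For example, the following commands (using the log group and prefix configured in jobparameters.json) list the available log streams and tail recent events:

aws logs describe-log-streams \
  --log-group-name /emr-containers/jobs \
  --log-stream-name-prefix emreksblog \
  --query 'logStreams[].logStreamName'
aws logs tail /emr-containers/jobs --since 1h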

The following are some common scenarios to verify in case the logs aren’t available in CloudWatch:

  • Ensure that the log group parameter is defined in jobparameters.json under monitoringConfiguration (refer to the JSON file for the details of parameters):
        "monitoringConfiguration": {
          "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-containers/jobs",
            "logStreamNamePrefix": "emreksblog"
          },

  • Ensure that the service role associated with the Amazon EMR on EKS cluster has access to write to the CloudWatch log group. The CloudFormation template you deployed has the policy associated with the IAM role to grant appropriate permissions to allow access to write to the log groups. For additional IAM policy examples, see Using identity-based policies (IAM policies) for CloudWatch Logs.

Amazon S3 logs

In the configuration, the log path is listed as s3://<Your S3 bucket>/joblogs under the corresponding job ID.

You can go to the S3 bucket you specified to check the logs. Your log data is sent to the following Amazon S3 locations depending on the types of logs:

  • Controller logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/pod-name/(stderr.gz/stdout.gz)
  • Driver logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/spark-application-id/spark-job-id-driver/(stderr.gz/stdout.gz)
  • Executor logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/spark-application-id/executor-pod-name/(stderr.gz/stdout.gz)

Here you can retrieve the Spark driver and executor logs to view additional details and the stack trace of any error messages when your Spark job fails.
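
You can browse or download these logs with the AWS CLI as well; for example (substituting the virtual cluster ID and job ID from your run):

aws s3 ls s3://<Your S3 bucket>/joblogs/ --recursive
aws s3 cp s3://<Your S3 bucket>/joblogs/<virtual-cluster-id>/jobs/<job-id>/ ./joblogs/ --recursive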

Kubernetes dashboard

You can view and monitor the online logs of a running Spark job in a Kubernetes dashboard. The dashboard provides information on the state of Kubernetes resources in your cluster and any errors that may have occurred while the job is running. The logs are only accessible through the Kubernetes dashboard while the cluster is running the job. The dashboard is a useful way to quickly identify any issue with the job while it’s running and the logs are getting written.

For details on how to deploy, set up, and view the dashboard, see Tutorial: Deploy the Kubernetes Dashboard (web UI). After you deploy the Kubernetes dashboard and launch it, complete the following steps to see the details of your job run:

  1. Choose the namespace that was registered with the EMR virtual cluster (emroneks in this post).
  2. Choose Pods in the navigation pane to see all the running pods.
  3. Choose the additional options icon (three vertical dots) to open logs for each pod.

The following screenshot shows the Spark driver that was spawned when the Spark job was submitted to the EMR virtual cluster.

  4. Choose the spark-kubernetes-executor container log to see the running online logs of your Spark job.

The following screenshots show the running log of the Spark application while it’s running on the EMR virtual cluster.

  5. Choose Pods to see the CPU and memory consumption of the individual pod running the application.

The following screenshots show the CPU and memory usage of the Spark application for the duration of the job. This helps you determine whether you have provisioned adequate capacity for your jobs.

In case of insufficient memory or CPU capacity, you see the following error. You can choose the pod to see additional details.

Kubernetes CLI

You can view Spark driver and Spark executer logs using the Kubernetes CLI (kubectl). Logs are accessible through the Kubernetes CLI while the cluster is running the job.

  1. Get the name of the Spark driver and Spark executor pods in the emroneks namespace:
kubectl get pods -n emroneks

You see multiple pods for the Spark driver and executors that are currently running.

  2. Use the pod name for the driver to see the driver logs:
    kubectl logs <Spark driver pod name> -n emroneks -c spark-kubernetes-driver

  3. Use the pod name for the executors to see the executor logs:
    kubectl logs <Spark executor pod name> -n emroneks -c spark-kubernetes-executor
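
If a pod appears to be stuck (for example, in the Pending state while waiting for capacity), describing the pod and checking recent events in the namespace usually surfaces the scheduling reason:

kubectl describe pod <Spark driver pod name> -n emroneks
kubectl get events -n emroneks --sort-by='.lastTimestamp'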

For more issues and resolutions when running jobs on Amazon EMR on EKS, see Common errors when running jobs.

Clean up

When you’re done using this solution, you should delete the following CloudFormation stacks, via the CloudFormation console, to avoid incurring any further charges:

  • EMRvirtualcluster
  • emr-on-eks-stack

Conclusion

This post describes how you can run your existing Apache Spark workloads on Amazon EMR on EKS. The use case demonstrates setting up the infrastructure, and running and monitoring your Spark job. We also showed you various options and techniques to debug and troubleshoot your jobs.

Amazon EMR also provides the capability to perform data analysis and data engineering tasks in a web-based integrated development environment (IDE), using fully managed Jupyter notebooks. Refer to this post to set up EMR Studio with EMR on EKS.


Appendix: Explaining the solution

In this solution, we first built an Amazon EKS cluster using a CloudFormation template and registered it with Amazon EMR. Then we submitted a Spark job using the AWS CLI on the EMR virtual cluster on Amazon EKS. Let’s look at some of the important concepts related to running a Spark job on Amazon EMR on EKS.

Kubernetes namespaces

Amazon EKS uses Kubernetes namespaces to divide cluster resources between multiple users and applications. These namespaces are the foundation for multi-tenant environments. A Kubernetes namespace can have both Amazon EC2 and Fargate as the compute provider. Fargate selection for pods can be done using user-defined Fargate profiles. This flexibility provides different performance and cost options for running Spark jobs. In this post, we provisioned an Amazon EKS cluster with a managed node group of m5.2xlarge EC2 instances.

Virtual cluster

A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster, and each virtual cluster maps to one namespace on an Amazon EKS cluster.
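
The CloudFormation template in this post registers the virtual cluster for you. If you were registering one manually, it is a single API call that binds an Amazon EKS cluster and namespace to a new virtual cluster, similar to the following (the virtual cluster name is illustrative; the cluster name and namespace match the ones used in this post):

aws emr-containers create-virtual-cluster \
  --name emroneks-virtual-cluster \
  --container-provider '{"id": "eks-cluster", "type": "EKS", "info": {"eksInfo": {"namespace": "emroneks"}}}'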

Job run

A job run is a unit of work, such as a Spark JAR (Scala or Java application), PySpark script, or SparkSQL query, that you submit to Amazon EMR on EKS. One job can have multiple job runs. When you submit a job run, it should include the following information:

  • A virtual cluster where the job should run
  • A job name to identify the job
  • The execution role, a scoped IAM role (used through a Kubernetes service account) that the job runs with, which determines the AWS resources the job can access
  • The Amazon EMR release label that specifies the version of Amazon EMR Spark to use
  • The artifacts to use when submitting your job, such as spark-submit parameters

Amazon EMR containers

Amazon EMR containers is the API name for Amazon EMR on EKS. The emr-containers prefix is used in the following scenarios:

  • In the AWS CLI commands for Amazon EMR on EKS. For example, aws emr-containers start-job-run.
  • Before IAM policy actions for Amazon EMR on EKS. For example, "Action": [ "emr-containers:StartJobRun"]. For more information, see Policy actions for Amazon EMR on EKS.
  • In Amazon EMR on EKS service endpoints. For example, emr-containers.us-east-1.amazonaws.com.

In the solution overview, we went step by step through how we used the above resources to create the Amazon EMR on EKS cluster and run a Spark job. For further details on these concepts, see Concepts.


About the Authors

Dipankar Kushari is a Senior Analytics Solutions Architect with AWS, helping customers build analytics platform and solutions. He has a keen interest in distributed computing. Dipankar enjoys spending time playing chess and watching old Hollywood movies.

Ashok Padmanabhan is a big data consultant with AWS Professional Services, helping customers build big data and analytics platform and solutions. When not building and designing data lakes, Ashok enjoys spending time at beaches near his home in Florida.

Gaurav Gundal is a DevOps consultant with AWS Professional Services, helping customers build solutions on the customer platform. When not building, designing, or developing solutions, Gaurav spends time with his family, plays guitar, and enjoys traveling to different places.

Naveen Madhire is a Big Data Architect with AWS Professional Services, helping customers create data lake solutions on AWS. Outside of work, he loves playing video games and watching crime series on TV.