Query big data with resilience using Trino in Amazon EMR with Amazon EC2 Spot Instances for less cost

Post Syndicated from Ashwini Kumar original https://aws.amazon.com/blogs/big-data/query-big-data-with-resilience-using-trino-in-amazon-emr-with-amazon-ec2-spot-instances-for-less-cost/

Amazon Elastic Compute Cloud (Amazon EC2) Spot Instances offer spare compute capacity available in the AWS Cloud at steep discounts compared to On-Demand prices. Amazon EMR provides a managed Hadoop framework that makes it straightforward, fast, and cost-effective to process vast amounts of data using EC2 instances. Amazon EMR with Spot Instances allows you to reduce the cost of running your big data workloads on AWS. Amazon EC2 can interrupt Spot Instances with a 2-minute notification whenever it needs to reclaim capacity for On-Demand customers. Spot Instances are therefore best suited for running stateless, fault-tolerant big data applications, such as Apache Spark with Amazon EMR, that are resilient against Spot node interruptions.

Trino (formerly PrestoSQL) is an open-source, highly parallel, distributed SQL query engine to run interactive queries as well as batch processing on petabytes of data. It can perform in-place, federated queries on data stored in a multitude of data sources, including relational databases (MySQL, PostgreSQL, and others), distributed data stores (Cassandra, MongoDB, Elasticsearch, and others), and Amazon Simple Storage Service (Amazon S3), without the need for complex and expensive processes of copying the data to a single location.
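For example, assuming a MySQL catalog named mysql and a Hive catalog named hive pointing at data in Amazon S3 have been configured (the catalog, schema, and table names here are hypothetical), a single Trino query can join both sources in place:

-- Hypothetical federated join across a MySQL catalog and an S3-backed Hive catalog
SELECT c.customer_name, sum(o.order_total) AS total_spend
FROM mysql.sales.orders o
JOIN hive.datalake.customers c ON o.customer_id = c.customer_id
GROUP BY c.customer_name;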

Before Project Tardigrade, Trino queries failed whenever any node in a Trino cluster failed: there was no automatic retry mechanism, and failed queries had to be restarted from scratch. Due to this limitation, the cost of failures of long-running extract, transform, and load (ETL) and batch queries on Trino was high in terms of completion time, compute wastage, and spend. Spot Instances were therefore suited only to short-lived Trino queries, not to long-running ones.

In October 2022, Amazon EMR announced a new capability in the Trino engine to detect the 2-minute Spot interruption notification and determine whether the queries running on those nodes can complete within 2 minutes. If they can’t, Trino fails them quickly and retries them on different nodes. Trino also stops scheduling new queries on Spot nodes that are about to be reclaimed. In November 2022, Amazon EMR added support for Project Tardigrade’s fault-tolerant option in the Trino engine with Amazon EMR 6.8 and above. Enabling this feature mitigates Trino task failures caused by worker node failures due to Spot interruptions or On-Demand node stops. Trino now retries failed tasks using intermediate exchange data checkpointed on Amazon S3 or HDFS.

These new enhancements in Trino with Amazon EMR provide improved resiliency for running ETL and batch workloads on Spot Instances with reduced costs. This post showcases the resilience of Amazon EMR with Trino using fault-tolerant configuration to run long-running queries on Spot Instances to save costs. We simulate Spot interruptions on Trino worker nodes by using AWS Fault Injection Simulator (AWS FIS).

Trino architecture overview

Trino runs a query by breaking up the run into a hierarchy of stages, which are implemented as a series of tasks distributed over a network of Trino workers. This pipelined execution model runs multiple stages in parallel and streams data from one stage to another as the data becomes available. This parallel architecture reduces end-to-end latency and makes Trino a fast tool for ad hoc data exploration and ETL jobs over very large datasets. The following diagram illustrates this architecture.

In a Trino cluster, the coordinator is the server responsible for parsing statements, planning queries, and managing workers. The coordinator is also the node to which a client connects and submits statements to run. Every Trino cluster must have at least one coordinator. The coordinator creates a logical model of a query involving a series of stages, which is then translated into a series of connected tasks running on Trino workers. In Amazon EMR, the Trino coordinator runs on the EMR primary node and workers run on core and task nodes.
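You can inspect this stage hierarchy for any query with Trino’s EXPLAIN statement. For example, the following query (written against the TPC-DS connector we configure later in this post) prints the distributed plan as a series of fragments, each fragment corresponding to a stage run by one or more tasks:

EXPLAIN (TYPE DISTRIBUTED)
SELECT d_year, count(*)
FROM tpcds.sf100.store_sales, tpcds.sf100.date_dim
WHERE ss_sold_date_sk = d_date_sk
GROUP BY d_year;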

Faster insights at lower cost with EC2 Spot

You can save significant costs for your ETL and batch workloads running on EMR Trino clusters with a blend of Spot and On-Demand Instances. You can also reduce time to insight by running more worker nodes on Spot Instances: thanks to Trino’s parallel architecture, queries finish faster while costing less.

For example, a long-running query on EMR Trino that takes an hour can be finished faster by provisioning more worker nodes on Spot Instances, as shown in the following figure.

Fault-tolerant Trino configuration in Amazon EMR

Fault-tolerant execution in Trino is disabled by default; you can enable it by setting a retry policy in the Amazon EMR configuration. Trino supports two types of retry policies:

  • QUERY – The QUERY retry policy instructs Trino to retry the whole query automatically when an error occurs on a worker node. This policy is only suitable for short-running queries because the whole query is retried from scratch.
  • TASK – The TASK retry policy instructs Trino to retry individual query tasks in the event of failure. This policy is recommended for long-running ETL and batch queries.

With fault-tolerant execution enabled, intermediate exchange data is spooled on an exchange manager so that another worker node can reuse it to complete the query run in the event of a node failure. The exchange manager uses a storage location on Amazon S3 or Hadoop Distributed File System (HDFS) to store and manage spooled data that spills beyond the in-memory buffer size of the worker nodes. By default, Amazon EMR release 6.9.0 and later uses HDFS as the exchange manager storage.
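If you’d rather spool to Amazon S3 than to local HDFS, you can point the exchange manager at an S3 bucket instead. The following classification is a sketch with a placeholder bucket name; the cluster’s EC2 instance profile needs read/write access to that location:

[
  {
    "Classification": "trino-exchange-manager",
    "Properties": {
      "exchange.base-directories": "s3://<your-exchange-bucket>/trino-exchange/"
    }
  }
]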

Solution overview

In this post, we create an EMR cluster with the following architecture.

We provision the following resources using Amazon EMR and AWS FIS:

  • An EMR 6.9.0 cluster with the following configuration:
    • Apache Hadoop, Hue, and Trino applications
    • EMR instance fleets with the following:
      • One primary node (On-Demand) as the Trino coordinator
      • Two core nodes (On-Demand) as the Trino workers and exchange manager
      • Four task nodes (Spot Instances) as Trino workers
    • Trino’s fault-tolerant configuration with the following:
      • The TPC-DS connector
      • The TASK retry policy
      • Exchange manager directory on HDFS
      • Optional recommended settings for query performance optimization
  • An FIS experiment template to target the Spot worker nodes in the Trino cluster with interruptions, demonstrating the fault tolerance of EMR Trino with Spot Instances

We use the new Amazon EMR console to create an EMR 6.9.0 cluster. For more information about the new console, refer to Summary of differences.

Create an EMR 6.9.0 cluster

Complete the following steps to create your EMR cluster:

  1. On the Amazon EMR console, create an EMR 6.9.0 cluster named emr-trino-cluster with Hadoop, Hue, and Trino applications using the Custom application bundle.

We need Hue’s web-based interface to submit SQL queries to the Trino engine, and HDFS on the core nodes to store intermediate exchange data for Trino’s fault-tolerant runs.

Using multiple Spot capacity pools (each instance type in each Availability Zone is a separate pool) is a best practice: it increases your chances of getting large-scale Spot capacity and minimizes the impact of a specific instance type being reclaimed in EMR clusters. The Amazon EMR console allows you to configure up to 5 instance types for your core fleet and 15 instance types for your task fleet with the Spot allocation strategy; the AWS Command Line Interface (AWS CLI) and Amazon EMR API allow up to 30 instance types for each fleet.

  2. Configure the primary, core, and task fleets, with primary and core nodes on On-Demand Instances (m5.xlarge) and task nodes on Spot Instances using multiple instance types.

When you use the Amazon EMR console, the number of vCPUs of the EC2 instance type is used by default as the count towards the total target capacity of a core or task fleet. For example, an m5.xlarge instance type with 4 vCPUs is considered 4 units of capacity by default.

  3. On the Actions menu under Core or Task fleet, choose Edit weighted capacity.

  4. Because each xlarge instance type with 4 vCPUs counts as 4 units of capacity, set the cluster size to 8 core units (2 nodes) on On-Demand and 16 task units (4 nodes) on Spot.

Unlike core and task fleets, the primary fleet is always one instance, so no sizing configuration is needed or available for the primary node on the Amazon EMR console.

  5. Select Price-capacity optimized as your Spot allocation strategy, which launches the lowest-priced Spot Instances from your most available pools.

  6. Configure Trino’s fault-tolerant settings in the Software settings section:
[
  {
    "Classification": "trino-connector-tpcds",
    "Properties": {
      "connector.name": "tpcds"
    }
  },
  {
    "Classification": "trino-config",
    "Properties": {
      "exchange.compression-enabled": "true",
      "query.low-memory-killer.delay": "0s",
      "query.remote-task.max-error-duration": "1m",
      "retry-policy": "TASK"
    }
  },
  {
    "Classification": "trino-exchange-manager",
    "Properties": {
      "exchange.base-directories": "/exchange",
      "exchange.use-local-hdfs": "true"
    }
  }
]

Alternatively, you can create a JSON config file with the configuration, store it in an S3 bucket, and load it by choosing Load JSON from Amazon S3 and selecting its S3 location.

Let’s look at the optional settings for query performance optimization that we configured:

  • "exchange.compression-enabled": "true" – Enables compression to reduce the amount of data spooled on the exchange manager.
  • "query.low-memory-killer.delay": "0s" – Reduces the low memory killer delay so that the Trino engine can unblock nodes running short on memory faster.
  • "query.remote-task.max-error-duration": "1m" – By default, Trino waits for up to 5 minutes for a task to recover before considering it lost and rescheduling it. Reducing this timeout allows failed tasks to be retried faster.

For more details of Trino’s fault-tolerant configuration parameters, refer to Fault-tolerant execution.

  7. Add a tag with the key Name and the value MyTrinoCluster so that the cluster’s EC2 instances launch with this tag.

We’ll use this tag to target Spot Instances in the cluster with AWS FIS.

The EMR cluster takes a few minutes to be ready in the Waiting state.
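If you prefer to script cluster creation instead of using the console, the following AWS CLI sketch creates a similar cluster. It assumes the default EMR roles already exist; the key pair and subnet values are placeholders, and trino-config.json holds the same classifications shown above:

# Sketch only: assumes default EMR roles; key pair and subnet are placeholders
aws emr create-cluster \
  --name emr-trino-cluster \
  --release-label emr-6.9.0 \
  --applications Name=Hadoop Name=Hue Name=Trino \
  --use-default-roles \
  --ec2-attributes KeyName=<your-key-pair>,SubnetId=<your-subnet-id> \
  --tags Name=MyTrinoCluster \
  --configurations file://trino-config.json \
  --instance-fleets file://instance-fleets.json

The referenced instance-fleets.json mirrors the weighted capacities we configured in the console; the instance types beyond m5.xlarge are illustrative choices for diversification:

[
  {
    "InstanceFleetType": "MASTER",
    "TargetOnDemandCapacity": 1,
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge"}]
  },
  {
    "InstanceFleetType": "CORE",
    "TargetOnDemandCapacity": 8,
    "InstanceTypeConfigs": [{"InstanceType": "m5.xlarge", "WeightedCapacity": 4}]
  },
  {
    "InstanceFleetType": "TASK",
    "TargetSpotCapacity": 16,
    "InstanceTypeConfigs": [
      {"InstanceType": "m5.xlarge", "WeightedCapacity": 4},
      {"InstanceType": "m5a.xlarge", "WeightedCapacity": 4},
      {"InstanceType": "m6i.xlarge", "WeightedCapacity": 4}
    ],
    "LaunchSpecifications": {
      "SpotSpecification": {
        "TimeoutDurationMinutes": 60,
        "TimeoutAction": "SWITCH_TO_ON_DEMAND",
        "AllocationStrategy": "price-capacity-optimized"
      }
    }
  }
]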

Configure an FIS experiment template to target Spot Instances with interruptions in the EMR Trino cluster

We now use the AWS FIS console to simulate interruptions of Spot Instances in the EMR Trino cluster and showcase the fault-tolerance of the Trino engine. Complete the following steps:

  1. On the AWS FIS console, create an experiment template.

  2. Under Actions, choose Add action.
  3. Create an AWS FIS action with Action type as aws:ec2:send-spot-instance-interruptions and Duration Before Interruption as 2 minutes.
  4. Choose Save.

This means AWS FIS will interrupt the targeted Spot Instances 2 minutes after the experiment starts.

  5. Under Targets, choose Edit to target all Spot Instances running in the EMR cluster.
  6. For Resource tags, use Name=MyTrinoCluster.
  7. For Resource filters, use State.Name=running.
  8. For Selection mode, choose ALL.
  9. Choose Save.

  10. Create a new AWS Identity and Access Management (IAM) role automatically to provide permissions to AWS FIS.

  11. Choose Create experiment template.
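Alternatively, you could create the same experiment template from the AWS CLI. The following is a sketch; the role ARN is a placeholder, and the role needs the AWS FIS permissions described above:

# Sketch only: the role ARN is a placeholder
aws fis create-experiment-template --cli-input-json '{
  "description": "Interrupt Spot Instances in the EMR Trino cluster",
  "targets": {
    "SpotInstances": {
      "resourceType": "aws:ec2:spot-instance",
      "resourceTags": {"Name": "MyTrinoCluster"},
      "filters": [{"path": "State.Name", "values": ["running"]}],
      "selectionMode": "ALL"
    }
  },
  "actions": {
    "interruptSpot": {
      "actionId": "aws:ec2:send-spot-instance-interruptions",
      "parameters": {"durationBeforeInterruption": "PT2M"},
      "targets": {"SpotInstances": "SpotInstances"}
    }
  },
  "stopConditions": [{"source": "none"}],
  "roleArn": "arn:aws:iam::<account-id>:role/<your-fis-role>",
  "tags": {"Name": "EMR_Trino_Interrupter"}
}'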

Launch Hue and Trino web interfaces

When your EMR cluster is in the Waiting state, connect to the Hue web interface to run Trino queries and to the Trino web interface for monitoring. Alternatively, you can submit your Trino queries using trino-cli after connecting via SSH to your EMR cluster’s primary node, as shown in the sketch that follows. In this post, we use the Hue web interface for running queries on the EMR Trino engine.
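For reference, a trino-cli invocation on the primary node could look like the following sketch; the catalog and schema names match the TPC-DS connector configured earlier:

# Run an ad hoc query from the EMR primary node after connecting via SSH
trino-cli --catalog tpcds --schema sf100 --execute "select count(*) from store_sales"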

  1. To connect to the Hue interface on the primary node from your local computer, navigate to your EMR cluster’s Properties tab, then Network and security, and EC2 security groups (firewall).
  2. Edit the primary node security group’s inbound rules to allow SSH (port 22) from your IP address.
  3. Retrieve your EMR cluster’s primary node public DNS from the cluster’s Summary tab.

Refer to View web interfaces hosted on Amazon EMR clusters for details on connecting to web interfaces on the primary node from your local computer. You can set up an SSH tunnel with dynamic port forwarding between your local computer and the EMR primary node, then configure proxy settings for your browser using an add-on such as FoxyProxy for Firefox or SwitchyOmega for Chrome to manage your SOCKS proxy settings.
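For example, a dynamic port forwarding tunnel could look like the following; the key pair file and DNS name are placeholders, and local port 8157 is an arbitrary choice commonly used in the EMR documentation:

# Open a SOCKS proxy on local port 8157 through the EMR primary node
ssh -i ~/mykeypair.pem -N -D 8157 hadoop@<youremrcluster-primary-node-public-dns>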

  4. Connect to Hue by entering the URL http://<youremrcluster-primary-node-public-dns>:8888/ in your web browser.
  5. Create an account with your choice of user name and password.

After you log in to your account, you can see the query editor on Hue’s web interface.

By default, Amazon EMR configures the Trino web interface on the Trino coordinator (EMR primary node) to use port 8889.

  6. To connect to the Trino web interface, enter the URL http://<youremrcluster-primary-node-public-dns>:8889/ in your web browser, where you can monitor the Trino cluster and query performance.

In the following screenshot, we can see six active Trino workers (two core and four task nodes of the EMR cluster) and no running queries.

  7. Run the Trino query select * from system.runtime.nodes from the Hue query editor to see the status and details of the coordinator and worker nodes.

We can see all cluster nodes are in the active state.

Test fault tolerance on Spot interruptions

To test the fault tolerance on Spot interruptions, complete the following steps:

  1. Run the following Trino query using Hue’s query editor:
with inv as (
  select w_warehouse_name, w_warehouse_sk, i_item_sk, d_moy, stdev, mean,
         case mean when 0 then null else stdev/mean end cov
  from (
    select w_warehouse_name, w_warehouse_sk, i_item_sk, d_moy,
           stddev_samp(inv_quantity_on_hand) stdev,
           avg(inv_quantity_on_hand) mean
    from tpcds.sf100.inventory,
         tpcds.sf100.item,
         tpcds.sf100.warehouse,
         tpcds.sf100.date_dim
    where inv_item_sk = i_item_sk
      and inv_warehouse_sk = w_warehouse_sk
      and inv_date_sk = d_date_sk
      and d_year = 1999
    group by w_warehouse_name, w_warehouse_sk, i_item_sk, d_moy
  ) foo
  where case mean when 0 then 0 else stdev/mean end > 1
)
select inv1.w_warehouse_sk, inv1.i_item_sk, inv1.d_moy, inv1.mean, inv1.cov,
       inv2.w_warehouse_sk, inv2.i_item_sk, inv2.d_moy, inv2.mean, inv2.cov
from inv inv1, inv inv2
where inv1.i_item_sk = inv2.i_item_sk
  and inv1.w_warehouse_sk = inv2.w_warehouse_sk
  and inv1.d_moy = 4
  and inv2.d_moy = 4 + 1
  and inv1.cov > 1.5
order by inv1.w_warehouse_sk, inv1.i_item_sk, inv1.d_moy, inv1.mean, inv1.cov,
         inv2.d_moy, inv2.mean, inv2.cov

When you go to the Trino web interface, you can see the query running on six active worker nodes (two On-Demand core nodes and four Spot task nodes).

  2. On the AWS FIS console, choose Experiment templates in the navigation pane.
  3. Select the experiment template EMR_Trino_Interrupter and choose Start experiment.

After a few seconds, the experiment reaches the Completed state, and 2 minutes later it triggers the interruption of all four Spot Instances (four Trino workers).

After some time, we can observe in the Trino web UI that we have lost four Trino workers (task nodes running on Spot Instances) but the query is still running with the two remaining On-Demand worker nodes (core nodes). Without the fault-tolerant configuration in EMR Trino, the whole query would fail with even a single worker node failure.

  4. Run the select * from system.runtime.nodes query again in Hue to check the Trino cluster nodes’ status.

We can see four Spot worker nodes with the status shutting_down.

Trino starts shutting down the four Spot worker nodes as soon as they receive the 2-minute Spot interruption notification sent by the AWS FIS experiment. It starts retrying any failed tasks of these four Spot workers on the remaining active workers (the two core nodes) of the cluster. The Trino engine also doesn’t schedule tasks of any new queries on Spot worker nodes in the shutting_down state.

The Trino query keeps running on the remaining two worker nodes and succeeds despite the interruption of the four Spot worker nodes. Soon after the Spot nodes stop, Amazon EMR replenishes the stopped capacity (four task nodes) by launching four replacement Spot nodes.

Achieve faster query performance for lower cost with more Trino workers on Spot

Now let’s increase the Trino worker capacity from six to 10 nodes by manually resizing the EMR task fleet on Spot Instances (from four to eight nodes).
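You can resize the task fleet on the Amazon EMR console, or with an AWS CLI call like the following sketch; the cluster ID and task fleet ID are placeholders:

# Double the task fleet's Spot capacity from 16 units (4 nodes) to 32 units (8 nodes)
aws emr modify-instance-fleet \
  --cluster-id <your-cluster-id> \
  --instance-fleet InstanceFleetId=<your-task-fleet-id>,TargetSpotCapacity=32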

We run the same query on the larger cluster with 10 Trino workers and compare the query completion time (wall time in the Trino web UI) with that of the earlier, smaller cluster with six workers. The query runs 32% faster (1.57 minutes vs. 2.33 minutes).

You can run more Trino workers on Spot Instances to run queries faster to meet your SLAs or to process a larger number of queries. With Spot Instances available at discounts of up to 90% off On-Demand prices, your cluster costs won’t increase significantly compared to running the whole compute capacity on On-Demand Instances.

Clean up

To avoid ongoing charges for resources, navigate to the Amazon EMR console and delete the cluster emr-trino-cluster.

Conclusion

In this post, we showed how you can configure and launch EMR clusters with the Trino engine using its fault-tolerant configuration. With the fault-tolerant feature, Trino worker nodes can run resiliently as EMR task nodes on Spot Instances. You can configure a well-diversified task fleet with multiple instance types using the price-capacity optimized allocation strategy. This makes Amazon EMR request and launch task nodes from the most available, lower-priced Spot capacity pools to minimize costs, interruptions, and capacity challenges. We also demonstrated the resilience of EMR Trino against Spot interruptions using an AWS FIS Spot interruption experiment: EMR Trino continues to run queries by retrying failed tasks on the remaining available worker nodes when Spot nodes are interrupted. With fault-tolerant EMR Trino and Spot Instances, you can run big data queries with resilience while saving costs. For your SLA-driven workloads, you can also add more compute on Spot to meet or exceed your SLAs, achieving faster query performance at lower cost compared to On-Demand Instances.


About the Authors

Ashwini Kumar is a Senior Specialist Solutions Architect at AWS based in Delhi, India. Ashwini has more than 18 years of industry experience in systems integration, architecture, and software design, with more recent experience in cloud architecture, DevOps, containers, and big data engineering. He helps customers optimize their cloud spend, minimize compute waste, and improve performance at scale on AWS. He focuses on architectural best practices for various workloads with services including EC2 Spot, AWS Graviton, EC2 Auto Scaling, Amazon EKS, Amazon ECS, and AWS Fargate.

Dipayan Sarkar is a Specialist Solutions Architect for Analytics at AWS, where he helps customers modernize their data platform using AWS Analytics services. He works with customers to design and build analytics solutions, enabling businesses to make data-driven decisions.