All posts by Anubhav Awasthi

Monitor Apache HBase on Amazon EMR using Amazon Managed Service for Prometheus and Amazon Managed Grafana

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/monitor-apache-hbase-on-amazon-emr-using-amazon-managed-service-for-prometheus-and-amazon-managed-grafana/

Amazon EMR provides a managed Apache Hadoop framework that makes it straightforward, fast, and cost-effective to run Apache HBase. Apache HBase is a massively scalable, distributed big data store in the Apache Hadoop ecosystem. It is an open-source, non-relational, versioned database that runs on top of the Apache Hadoop Distributed File System (HDFS). It’s built for random, strictly consistent, real-time access for tables with billions of rows and millions of columns. Monitoring HBase clusters is critical in order to identify stability and performance bottlenecks and proactively preempt them. In this post, we discuss how you can use Amazon Managed Service for Prometheus and Amazon Managed Grafana to monitor, alert, and visualize HBase metrics.

HBase has built-in support for exporting metrics via the Hadoop metrics subsystem to files or Ganglia, or via JMX. You can use either AWS Distro for OpenTelemetry or Prometheus JMX exporters to collect the metrics exposed by HBase. In this post, we show how to use Prometheus exporters. These exporters behave like small web servers that convert internal application metrics to the Prometheus format and serve them at the /metrics path. A Prometheus server running on an Amazon Elastic Compute Cloud (Amazon EC2) instance collects these metrics and remote writes them to an Amazon Managed Service for Prometheus workspace. We then use Amazon Managed Grafana to create dashboards and view these metrics, using the Amazon Managed Service for Prometheus workspace as its data source.
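For reference, a minimal scrape configuration for these exporters on the Prometheus server might look like the following sketch. The job name, node addresses, and exporter port are placeholders; the CloudFormation template in this post sets up the actual configuration for you.

# /etc/prometheus/conf/prometheus.yml (excerpt, illustrative values only)
scrape_configs:
  - job_name: 'hbase'                                   # placeholder job name
    metrics_path: /metrics                              # JMX exporters serve metrics at this path
    static_configs:
      - targets:
          - '<emr-primary-node-ip>:<exporter-port>'     # placeholder host:port
          - '<emr-core-node-ip>:<exporter-port>'        # placeholder host:port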

This solution can be extended to other big data platforms such as Apache Spark and Apache Presto that also use JMX to expose their metrics.

Solution overview

The following diagram illustrates our solution architecture.

Solution Architecture

This post uses an AWS CloudFormation template to perform the following actions:

  1. Install an open-source Prometheus server on an EC2 instance.
  2. Create appropriate AWS Identity and Access Management (IAM) roles and security group for the EC2 instance running the Prometheus server.
  3. Create an EMR cluster with an HBase on Amazon S3 configuration.
  4. Install JMX exporters on all EMR nodes.
  5. Create additional security groups for the EMR master and worker nodes to connect with the Prometheus server running on the EC2 instance.
  6. Create a workspace in Amazon Managed Service for Prometheus.

Prerequisites

To implement this solution, make sure the default Amazon EMR IAM roles exist in your account. If they don't, you can create them by running the following command:

aws emr create-default-roles

Deploy the CloudFormation template

Deploy the CloudFormation template in the us-east-1 Region:

Launch Stack

It will take 15–20 minutes for the template to complete. The template requires the following fields:

  • Stack Name – Enter a name for the stack
  • VPC – Choose an existing VPC
  • Subnet – Choose an existing subnet
  • EMRClusterName – Use EMRHBase
  • HBaseRootDir – Provide a new HBase root directory (for example, s3://hbase-root-dir/)
  • MasterInstanceType – Use m5.xlarge
  • CoreInstanceType – Use m5.xlarge
  • CoreInstanceCount – Enter 2
  • SSHIPRange – Use <your ip address>/32 (you can go to https://checkip.amazonaws.com/ to check your IP address)
  • EMRKeyName – Choose a key pair for the EMR cluster
  • EMRReleaseLabel – Use emr-6.9.0
  • InstanceType – Choose the EC2 instance type for the Prometheus server

CloudFormation parameters

Enable remote writes on the Prometheus server

The Prometheus server is running on an EC2 instance. You can find the instance hostname in the CloudFormation stack’s Outputs tab for key PrometheusServerPublicDNSName.

  1. SSH into the EC2 instance using the key pair:
    ssh -i <sshKey.pem> ec2-user@<Public IPv4 DNS of EC2 instance running Prometheus server>

  2. Copy the value for Endpoint – remote write URL from the Amazon Managed Service for Prometheus workspace console.

  3. Edit the remote_write URL in /etc/prometheus/conf/prometheus.yml:
sudo vi /etc/prometheus/conf/prometheus.yml

After the edit, the remote_write section should point to your workspace's remote write endpoint.
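The following is a minimal sketch, assuming SigV4 authentication through the EC2 instance's IAM role; substitute your own workspace ID and Region:

remote_write:
  - url: https://aps-workspaces.us-east-1.amazonaws.com/workspaces/<workspace-id>/api/v1/remote_write
    sigv4:
      region: us-east-1          # Region of your Amazon Managed Service for Prometheus workspace
    queue_config:                # illustrative queue tuning; adjust for your workload
      max_samples_per_send: 1000
      max_shards: 200
      capacity: 2500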

  4. Restart the Prometheus server to pick up the changes:
sudo systemctl restart prometheus

Enable Amazon Managed Grafana to read from an Amazon Managed Service for Prometheus workspace

We need to add the Amazon Managed Service for Prometheus workspace as a data source in Amazon Managed Grafana. You can skip directly to step 3 if you already have an existing Amazon Managed Grafana workspace and want to use it for HBase metrics.

  1. First, let’s create a workspace on Amazon Managed Grafana. You can follow the appendix to create a workspace using the Amazon Managed Grafana console or run the following API from your terminal (provide your role ARN):
aws grafana create-workspace \
--account-access-type CURRENT_ACCOUNT \
--authentication-providers AWS_SSO \
--permission-type CUSTOMER_MANAGED \
--workspace-data-sources PROMETHEUS \
--workspace-name emr-metrics \
--workspace-role-arn <role-ARN> \
--workspace-notification-destinations SNS
  2. On the Amazon Managed Grafana console, choose Configure users and select the user you want to allow to log in to Grafana dashboards.

Make sure your IAM Identity Center user type is admin. We need this to create dashboards. You can assign the viewer role to all the other users.

  3. Log in to the Amazon Managed Grafana workspace URL using your admin credentials.
  4. Choose AWS Data Sources in the navigation pane.

  5. For Service, choose Amazon Managed Service for Prometheus.

  6. For Regions, choose US East (N. Virginia).

Create an HBase dashboard

Grafana Labs has an open-source HBase dashboard that you can use as a starting point. Start creating your dashboard and choose the import option. Provide the URL of the dashboard or enter the dashboard ID 12722 and choose Load. Make sure your Prometheus workspace is selected as the data source on the next page. You should see HBase metrics showing up on the dashboard.

Key HBase metrics to monitor

HBase has a wide range of metrics for HMaster and RegionServer. The following are a few important metrics to keep in mind.

HMaster metrics:

  • hadoop_HBase_numregionservers – Number of live region servers
  • hadoop_HBase_numdeadregionservers – Number of dead region servers
  • hadoop_HBase_ritcount – Number of regions in transition
  • hadoop_HBase_ritcountoverthreshold – Number of regions that have been in transition longer than a threshold time (default: 60 seconds)
  • hadoop_HBase_ritduration_99th_percentile – Maximum time taken by 99% of the regions to remain in transition state

RegionServer metrics:

  • hadoop_HBase_regioncount – Number of regions hosted by the region server
  • hadoop_HBase_storefilecount – Number of store files currently managed by the region server
  • hadoop_HBase_storefilesize – Aggregate size of the store files
  • hadoop_HBase_hlogfilecount – Number of write-ahead logs not yet archived
  • hadoop_HBase_hlogfilesize – Size of all write-ahead log files
  • hadoop_HBase_totalrequestcount – Total number of requests received
  • hadoop_HBase_readrequestcount – Number of read requests received
  • hadoop_HBase_writerequestcount – Number of write requests received
  • hadoop_HBase_numopenconnections – Number of open connections at the RPC layer
  • hadoop_HBase_numactivehandler – Number of RPC handlers actively servicing requests

Memstore:

  • hadoop_HBase_memstoresize – Total memstore memory size of the region server
  • hadoop_HBase_flushqueuelength – Current depth of the memstore flush queue (if increasing, we are falling behind with clearing memstores out to Amazon S3)
  • hadoop_HBase_flushtime_99th_percentile – 99th percentile latency for flush operations
  • hadoop_HBase_updatesblockedtime – Number of milliseconds updates have been blocked so the memstore can be flushed

Block cache:

  • hadoop_HBase_blockcachesize – Block cache size
  • hadoop_HBase_blockcachefreesize – Block cache free size
  • hadoop_HBase_blockcachehitcount – Number of block cache hits
  • hadoop_HBase_blockcachemisscount – Number of block cache misses
  • hadoop_HBase_blockcacheexpresshitpercent – Percentage of the time that requests with the cache turned on hit the cache
  • hadoop_HBase_blockcachecounthitpercent – Percentage of block cache hits
  • hadoop_HBase_blockcacheevictioncount – Number of block cache evictions in the region server
  • hadoop_HBase_l2cachehitratio – Local disk-based bucket cache hit ratio
  • hadoop_HBase_l2cachemissratio – Bucket cache miss ratio

Compaction:

  • hadoop_HBase_majorcompactiontime_99th_percentile – 99th percentile time in milliseconds taken for major compaction
  • hadoop_HBase_compactiontime_99th_percentile – 99th percentile time in milliseconds taken for minor compaction
  • hadoop_HBase_compactionqueuelength – Current depth of the compaction request queue (if increasing, we are falling behind with storefile compaction)
  • Flush queue length – Number of flush operations waiting to be processed in the region server (a higher number indicates flush operations are slow)

IPC queues:

  • hadoop_HBase_queuesize – Total data size of all RPC calls in the RPC queues in the region server
  • hadoop_HBase_numcallsingeneralqueue – Number of RPC calls in the general processing queue in the region server
  • hadoop_HBase_processcalltime_99th_percentile – 99th percentile latency for RPC calls to be processed in the region server
  • hadoop_HBase_queuecalltime_99th_percentile – 99th percentile latency for RPC calls to stay in the RPC queue in the region server

JVM and GC:

  • hadoop_HBase_memheapusedm – Heap used
  • hadoop_HBase_memheapmaxm – Total heap
  • hadoop_HBase_pausetimewithgc_99th_percentile – 99th percentile GC pause time in milliseconds
  • hadoop_HBase_gccount – Garbage collection count
  • hadoop_HBase_gctimemillis – Time spent in garbage collection, in milliseconds

Latencies:

  • HBase.regionserver.<op>_<measure> – Operation latencies, where <op> is Append, Delete, Mutate, Get, Replay, or Increment, and <measure> is min, max, mean, median, 75th_percentile, 95th_percentile, or 99th_percentile
  • HBase.regionserver.slow<op>Count – Number of operations we thought were slow, where <op> is one of the preceding operations

Bulk load:

  • hadoop_HBase_bulkload_99th_percentile – 99th percentile latency for bulk load operations

I/O:

  • hadoop_HBase_fswritetime_99th_percentile – 99th percentile latency for file system writes
  • hadoop_HBase_fsreadtime_99th_percentile – 99th percentile latency for file system reads

Exceptions:

  • exceptions.RegionTooBusyException
  • exceptions.callQueueTooBig
  • exceptions.NotServingRegionException
Considerations and limitations

Note the following when using this solution:

  • You can set up alerts on Amazon Managed Service for Prometheus and visualize them in Amazon Managed Grafana.
  • This architecture can be easily extended to include other open-source frameworks such as Apache Spark, Apache Presto, and Apache Hive.
  • Refer to the pricing details for Amazon Managed Service for Prometheus and Amazon Managed Grafana.
  • These scripts are for guidance purposes only and aren’t ready for production deployments. Make sure to perform thorough testing.

Clean up

To avoid ongoing charges, delete the CloudFormation stack and workspaces created in Amazon Managed Grafana and Amazon Managed Service for Prometheus.

Conclusion

In this post, you learned how to monitor EMR HBase clusters and set up dashboards to visualize key metrics. This solution can serve as a unified monitoring platform for multiple EMR clusters and other applications. For more information on EMR HBase, see the Amazon EMR Release Guide and the HBase Migration whitepaper.


Appendix

Complete the following steps to create a workspace on Amazon Managed Grafana:

  1. Log in to the Amazon Managed Grafana console and choose Create workspace.

  2. For Authentication access, select AWS IAM Identity Center.

If you don’t have IAM Identity Center enabled, refer to Enable IAM Identity Center.

  3. Optionally, to view Prometheus alerts in your Grafana workspace, select Turn Grafana alerting on.

  4. On the next page, select Amazon Managed Service for Prometheus as the data source.

  5. After the workspace is created, assign users to access Amazon Managed Grafana.

  6. For a first-time setup, assign admin privileges to the user.

You can add other users with only viewer access.

Make sure you are able to log in to the Grafana workspace URL using your IAM Identity Center user credentials.


About the Author

Anubhav Awasthi is a Sr. Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Introducing Amazon S3 shuffle in AWS Glue

Post Syndicated from Anubhav Awasthi original https://aws.amazon.com/blogs/big-data/introducing-amazon-s3-shuffle-in-aws-glue/

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, and combine data for analytics, machine learning (ML), and application development. In AWS Glue, you can use Apache Spark, which is an open-source, distributed processing system for your data integration tasks and big data workloads. Apache Spark utilizes in-memory caching and optimized query execution for fast analytic queries against your datasets, which are split into multiple partitions so that you can execute different transformations in parallel.

Shuffling is an important step in a Spark job whenever data is rearranged between partitions. groupByKey(), reduceByKey(), join(), and distinct() are some examples of wide transformations that can cause a shuffle. During a shuffle, data is written to disk and transferred across the network. As a result, the shuffle operation is often constrained by the available local disk capacity or by data skew, which can cause straggling executors. Spark often throws a No space left on device or MetadataFetchFailedException error when there is not enough disk space left on the executor, and there is no recovery from that condition.

This post introduces a new Spark shuffle manager available in AWS Glue that disaggregates Spark compute and shuffle storage by utilizing Amazon Simple Storage Service (Amazon S3) to store Spark shuffle and spill files. Using Amazon S3 for Spark shuffle storage lets you run data-intensive workloads much more reliably.

Understanding the shuffle operation in AWS Glue

Spark creates physical plans for running your workflow, called Directed Acyclic Graphs (DAGs). The DAG represents a series of transformations on your dataset, each resulting in a new immutable RDD. All of the transformations in Spark are lazy, in that they are not computed until an action is called to generate results. There are two types of transformations:

  • Narrow transformation – Such as map, filter, union, and mapPartitions, where each input partition contributes to only one output partition.
  • Wide transformation – Such as join, groupByKey, reduceByKey, and repartition, where each input partition contributes to many output partitions (see the sketch after this list).
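
The following is a minimal PySpark sketch (all paths and column names are hypothetical) that applies a narrow transformation followed by a wide one; only the wide transformation triggers a shuffle:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("shuffle-demo").getOrCreate()

# Hypothetical input; the path and column names are placeholders.
sales = spark.read.parquet("s3://my-bucket/sales/")

# Narrow transformation: filter works on each partition independently,
# so no data moves between partitions.
recent = sales.filter(F.col("sale_date") >= "2000-08-23")

# Wide transformation: groupBy needs all rows for a given key in one place,
# so Spark shuffles data across partitions and over the network.
per_store = recent.groupBy("store_id").agg(F.sum("net_profit").alias("profit"))

per_store.write.mode("overwrite").parquet("s3://my-bucket/per-store-profit/")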

In Spark, a shuffle occurs whenever data is rearranged between partitions. This is required because the wide transformation needs information from other partitions in order to complete its processing. Spark gathers the required data from each partition and combines it into a new partition. During a shuffle phase, all Spark map tasks write shuffle data to a local disk that is then transferred across the network and fetched by Spark reduce tasks. With AWS Glue, workers write shuffle data on local disk volumes attached to the AWS Glue workers.

In addition to shuffle writes, Spark uses local disk to spill data from memory that exceeds the heap space defined by the spark.memory.fraction configuration parameter. Shuffle spill (memory) is the size of the deserialized form of the data in memory at the time the worker spills it, whereas shuffle spill (disk) is the size of the serialized form of the data on disk after the worker has spilled it.

Challenges

Spark uses local disk for storing intermediate shuffle and shuffle spills. This introduces the following key challenges:

  • Hitting local storage limits – If you have a Spark job that computes transformations over a large amount of data and produces too much spill, shuffle, or both, the job can fail with a java.io.IOException: No space left on device exception when the underlying storage fills up.
  • Co-location of storage with executors – If an executor is lost, then its shuffle files are lost as well. This leads to several task and stage retries as Spark tries to recompute stages in order to recover lost shuffle data. Spark natively provides an external shuffle service that lets it store shuffle data independently of the lifetime of executors. But the shuffle service itself becomes a point of failure and must always be up in order to serve shuffle data. Additionally, shuffles are still stored on local disk, which might run out of space for a large job.

To illustrate one of the preceding scenarios, let's use the query q80.sql from the standard TPC-DS 3 TB dataset as an example. This query attempts to calculate the total sales, returns, and eventual profit realized during a specific time frame. It involves multiple wide transformations (shuffles) caused by left outer join, group by, and union all. Let's run the following query with 10 G.1X AWS Glue DPUs (data processing units). For the G.1X worker type, each worker maps to 1 DPU and 1 executor, so 10 G.1X workers account for a total of 640 GB of disk space. See the following SQL query:

with ssr as
 (select  s_store_id as store_id,
          sum(ss_ext_sales_price) as sales,
          sum(coalesce(sr_return_amt, 0)) as returns,
          sum(ss_net_profit - coalesce(sr_net_loss, 0)) as profit
  from store_sales left outer join store_returns on
         (ss_item_sk = sr_item_sk and ss_ticket_number = sr_ticket_number),
     date_dim, store, item, promotion
 where ss_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
       and ss_store_sk = s_store_sk
       and ss_item_sk = i_item_sk
       and i_current_price > 50
       and ss_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by s_store_id),
 csr as
 (select  cp_catalog_page_id as catalog_page_id,
          sum(cs_ext_sales_price) as sales,
          sum(coalesce(cr_return_amount, 0)) as returns,
          sum(cs_net_profit - coalesce(cr_net_loss, 0)) as profit
  from catalog_sales left outer join catalog_returns on
         (cs_item_sk = cr_item_sk and cs_order_number = cr_order_number),
     date_dim, catalog_page, item, promotion
 where cs_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and cs_catalog_page_sk = cp_catalog_page_sk
       and cs_item_sk = i_item_sk
       and i_current_price > 50
       and cs_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by cp_catalog_page_id),
 wsr as
 (select  web_site_id,
          sum(ws_ext_sales_price) as sales,
          sum(coalesce(wr_return_amt, 0)) as returns,
          sum(ws_net_profit - coalesce(wr_net_loss, 0)) as profit
  from web_sales left outer join web_returns on
         (ws_item_sk = wr_item_sk and ws_order_number = wr_order_number),
     date_dim, web_site, item, promotion
 where ws_sold_date_sk = d_date_sk
       and d_date between cast('2000-08-23' as date)
                  and (cast('2000-08-23' as date) + interval '30' day)
        and ws_web_site_sk = web_site_sk
       and ws_item_sk = i_item_sk
       and i_current_price > 50
       and ws_promo_sk = p_promo_sk
       and p_channel_tv = 'N'
 group by web_site_id)
 select channel, id, sum(sales) as sales, sum(returns) as returns, sum(profit) as profit
 from (select
        'store channel' as channel, concat('store', store_id) as id, sales, returns, profit
      from ssr
      union all
      select
        'catalog channel' as channel, concat('catalog_page', catalog_page_id) as id,
        sales, returns, profit
      from csr
      union all
      select
        'web channel' as channel, concat('web_site', web_site_id) as id, sales, returns, profit
      from  wsr) x
 group by rollup (channel, id)
 order by channel, id

The following screenshot shows the AWS Glue job run details from the Apache Spark web UI:

The job runs for about 1 hour and 25 minutes, then we start observing task failures. Spark ends up stopping the stage and canceling the job when the task retries also fail.

The following screenshots show the aggregated metrics for the failed stage, as well as how much data is spilled to disk by individual executors:

As seen in the Shuffle Write metric from the above Spark UI screenshot, all 10 workers shuffle over 50 GB of data. Further writes aren’t allowed, and tasks start failing with a “No space left on device” error.

The remaining storage is occupied by data that is spilled to disk, as seen in the Shuffle Spill (Disk) metric from the above Spark UI screenshot. This failed job is a classic example of a data-intensive transformation where Spark is both shuffling and spilling to disk when executor memory is filled.

Solution overview

We have various methods for overcoming the disk space error:

  • Scale out – Increase the number of workers. This incurs an increase in cost. However, scaling out might not always work, especially if your data is heavily skewed on a few keys. Fixing skewness will require considerable modifications to your Spark application logic.
  • Increase shuffle partitions – Increasing the shuffle partitions can sometimes help overcome space errors. However, this might not always work, and therefore is unreliable.
  • Disaggregate compute and storage – This approach has the advantage of not only scaling storage for large shuffles, but also adding reliability in the event of node failures, because shuffle data is stored independently. The following are a few implementations of this disaggregated approach:
    • Dedicated intermediate storage cluster – In this approach, you use an additional fleet of shuffle services to serve intermediate shuffle. It has several advantages, such as merging shuffle files and sequential I/O, but it introduces an overhead of fleet maintenance from both operations, as well as a cost standpoint. For examples of this approach, see Cosco: An Efficient Facebook-Scale Shuffle Service and Zeus: Uber’s Highly Scalable and Distributed Shuffle as a Service.
    • Serverless storage – AWS Glue implements a different approach in which you utilize Amazon S3, a cost-effective managed and serverless storage service, to store intermediate shuffle data. This design does not depend on a dedicated daemon, such as a shuffle service, to preserve shuffle files. This lets you elastically scale your Spark job without the overhead of running, operating, and maintaining additional storage or compute nodes.

With AWS Glue 2.0, you can now use Amazon S3 to store Spark shuffle and spill data. Amazon S3 is an object storage service that offers industry-leading scalability, data availability, security, and performance. This gives complete elasticity to Spark jobs, thereby allowing you to run your most data-intensive workloads reliably.

The following diagram illustrates how Spark map tasks write the shuffle and spill files to the given Amazon S3 shuffle bucket. Reducer tasks consider the shuffle blocks as remote blocks and read them from the same shuffle bucket.

Use Amazon S3 to store shuffle and spill data

The following job parameters enable and tune Spark to use S3 buckets for storing shuffle and spill data. You can also enable at-rest encryption when writing shuffle data to Amazon S3 by using security configuration settings.

  • --write-shuffle-files-to-s3 = TRUE – This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
  • --write-shuffle-spills-to-s3 = TRUE – This is an optional flag that lets you offload spill files to S3 buckets, which provides additional resiliency to your Spark job. It is only required for large workloads that spill a lot of data to disk, and it is disabled by default.
  • --conf = spark.shuffle.glue.s3ShuffleBucket=s3://<shuffle-bucket> – This is also optional, and it specifies the S3 bucket where we write the shuffle files. By default, --TempDir/shuffle-data is used.
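
For example, a sketch of passing these parameters when starting a job run from the AWS CLI might look like the following; the job name and shuffle bucket are placeholders:

aws glue start-job-run \
  --job-name my-glue-job \
  --arguments '{
    "--write-shuffle-files-to-s3": "true",
    "--write-shuffle-spills-to-s3": "true",
    "--conf": "spark.shuffle.glue.s3ShuffleBucket=s3://my-shuffle-bucket/"
  }'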

You can also use the AWS Glue Studio console to enable Amazon S3 based shuffle or spill. You can choose the preceding properties from pre-populated options in the Job parameters section.

Results

Let’s run the same q80.sql query with Amazon S3 shuffle enabled. We can view the shuffle files stored in the S3 bucket in the following format:

shuffle_<jobid>_<mapperid>_<reducerid>.data/index

Two kinds of files are created:

  • Data – Stores the shuffle output of the current task
  • Index – Stores the classification information of the data in the data file by storing partition offsets
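
For example, you can list the shuffle files directly from the AWS CLI; the bucket name and prefix below are placeholders (by default, the files land under the job's temporary directory):

aws s3 ls s3://my-shuffle-bucket/shuffle-data/ --recursive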

The following screenshots show example shuffle directories and shuffle files:

The following screenshot shows the aggregated metrics from the Spark UI:

The following are a few key highlights:

  • q80.sql, which had previously failed after 1 hour and 25 minutes with only 13 of 18 stages complete, finished successfully in about 2 hours and 53 minutes, completing all 18 stages.
  • We were able to shuffle 479.7 GB of data without worrying about storage limits.
  • Additional workers aren’t required to scale storage, which provides substantial cost savings.

Considerations and best practices

Keep in mind the following best practices when considering this solution:

  • This feature is recommended when you want to ensure the reliable execution of data-intensive workloads that create a large amount of shuffle or spill data. In our experiments with TPC-DS queries, writing and reading shuffle files from Amazon S3 was marginally slower than using local disk. S3 shuffle performance is impacted by the number and size of shuffle files; for example, S3 can be slower for reads than local storage if you have a large number of small shuffle files or partitions in your Spark application.
  • You can use this feature if your job frequently suffers from No space left on device issues.
  • You can use this feature if your job frequently suffers fetch failure issues (org.apache.spark.shuffle.MetadataFetchFailedException).
  • You can use this feature if your data is skewed.
  • We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.glue.s3ShuffleBucket) in order to clean up old shuffle data; see the sketch after this list.
  • At the time of writing, this feature is available on AWS Glue 2.0 and Spark 2.4.3.
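
A sketch of such a lifecycle rule, assuming a dedicated shuffle bucket with a shuffle-data/ prefix and a 7-day retention period (both placeholders), might look like the following:

aws s3api put-bucket-lifecycle-configuration \
  --bucket my-shuffle-bucket \
  --lifecycle-configuration '{
    "Rules": [
      {
        "ID": "expire-old-shuffle-data",
        "Filter": { "Prefix": "shuffle-data/" },
        "Status": "Enabled",
        "Expiration": { "Days": 7 }
      }
    ]
  }'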

Conclusion

This post discussed how we can independently scale storage in AWS Glue without adding additional workers. With this feature, you can expect jobs that are processing terabytes of data to run much more reliably. Happy shuffling!


About the Authors

Anubhav Awasthi is a Big Data Specialist Solutions Architect at AWS. He works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Rajendra Gujja is a Software Development Engineer on the AWS Glue team. He is passionate about distributed computing and anything and everything to do with data.

Mohit Saxena is a Software Engineering Manager on the AWS Glue team. His team works on distributed systems for efficiently managing data lakes on AWS and optimizes Apache Spark for performance and reliability.