Tag Archives: Amazon EMR

How Viasat scaled their big data applications by migrating to Amazon EMR

Post Syndicated from Manoj Gundawar original https://aws.amazon.com/blogs/big-data/how-viasat-scaled-their-big-data-applications-by-migrating-to-amazon-emr/

This post is co-written with Manoj Gundawar from Viasat.

Viasat is a satellite internet service provider based in Carlsbad, CA, with operations across the United States and worldwide. Viasat’s ambition is to be the first truly global, scalable, broadband service provider with a mission to deliver connections that can change the world. Viasat operates across three main business segments: satellite services, commercial networks, and government systems, providing high-speed satellite broadband services and secure networking systems.

In this post, we discuss how migrating our on-premises big data workloads to Amazon EMR helped us achieve a fully managed cloud-native solution and freed us from the constraints of our legacy on-premises solution, so we can focus on business innovation with a lower TCO.

Challenge with the legacy big data environment

Viasat’s big data application, Usage Data Mart, is a high-volume, low-latency Hadoop-based solution that ingests internet usage data from a multitude of source systems. It curates that data, aggregates it with data from other sources, and serves it from an optimized system that supports high-volume access from reporting and web interfaces. This critical application processes over 1.3 billion internet usage records per day, sourced from various upstream systems. Viasat customer support representatives use this data through APIs and reports to assist end-users with questions about their internet usage. Various internal teams and customers also use the data for usage accounting, tracking, compliance, and billing purposes.

The Usage Data Mart was previously implemented in an on-premises footprint of over 40 nodes with three independent clusters, all running a commercial distribution of Hadoop to process our big data workload. The legacy Hadoop environment required three times the data replication to achieve high availability, which resulted in a large infrastructure footprint. In this architecture, we had a data ingestion cluster to ingest data from a Kafka streaming service, MySQL, and Oracle databases. We had extract, transform, and load (ETL) jobs for each data source and data domain or model; most of the ETL jobs filter, curate, canonicalize, and aggregate data (by specific keys) using the MapReduce framework, then load the results into HBase as well as into HDFS or Amazon Simple Storage Service (Amazon S3) in Apache Parquet format. We used HBase to provide on-demand query and aggregation of data via REST APIs that used HBase coprocessors to apply aggregation and other business logic. Our independent reporting cluster queried Parquet files on HDFS and Amazon S3 with Apache Drill to generate a few dozen periodic reports. We separated the HBase and ETL clusters from the reporting cluster to avoid resource contention.

Our legacy environment had challenges at various levels:

  • Hardware – As the hardware aged, we encountered hardware failures on a regular basis. Expiring warranties and faulty component replacements increased operational risk. The burden of managing hardware replacements, along with servers failing to restart after maintenance, posed a serious risk to the business and made maintenance unpredictable and time-consuming.
  • Software – Yearly software license renewal costs, and the engineering effort involved in version upgrades to stay on a supported release, added operational complexity. Moreover, the commercial Hadoop distribution that we were running reached end of life in 2020.
  • Scaling – We needed to scale on-premises hardware to meet growing business needs during additional satellite launches that required forecasting and capacity planning. Delays in procurement and shipment of hardware affected project timelines. Finally, increasing data usage and customer adoption of this portal presented serious scaling challenges for Viasat.

How migration to Amazon EMR helped solve this challenge

Viasat evaluated a few alternatives to modernize big data applications and determined that Amazon EMR would be the right platform for our requirements. The separate compute and storage architecture of Amazon EMR helped us address our challenges. The following are some of the key benefits we realized with migration to AWS:

  • Storage – Amazon EMR supports HBase on Amazon S3, where Amazon S3 is used as persistent storage for the HBase cluster and allows us to scale our compute needs independently of storage. It also allows us to easily decommission HBase clusters, test upgrades, and optimize our total cost of ownership. For guidelines and best practices, see Migrating to Apache HBase on Amazon S3 on Amazon EMR.
  • DNS records – Because it’s so easy for us to provision new HBase clusters, we needed a way to maintain DNS records to make the cluster easily accessible. We use a simple AWS Lambda function to update DNS records during EMR cluster boot up. The DNS records are stored in our custom DNS service and we use a custom API to update the records.
  • Customization – Amazon EMR allows you to customize existing software on the cluster as well as install additional software. We use Apache Drill for reporting and, with EMR bootstrap actions, we were able to install Apache Drill on all the nodes of the reporting cluster to provide us with distributed reporting using Parquet files generated with a MapReduce job. Because our reporting uses different query patterns than the data in the HBase cluster, the Parquet files are written with a different partition optimized for reporting. We increased the default bucket size from approximately 8 MB to 16 MB and pointed Amazon EMR to private Amazon S3 endpoints to avoid traffic going through an external firewall, which increased performance.
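The DNS update described above can be sketched as a small Lambda function. This is a minimal sketch, assuming a hypothetical REST endpoint for the custom DNS service; the `DNS_API_URL`, payload shape, and event fields are all placeholders, and authentication is omitted:

```python
import json
import urllib.request

# Hypothetical endpoint for the custom DNS service (placeholder, not from the post)
DNS_API_URL = "https://dns.example.internal/records"

def build_dns_record(cluster_alias: str, master_public_dns: str) -> dict:
    """Build a CNAME-style record pointing a stable alias at the master
    node of a freshly provisioned EMR cluster."""
    return {
        "name": f"{cluster_alias}.example.internal",
        "type": "CNAME",
        "value": master_public_dns,
        "ttl": 60,  # short TTL so clients pick up replacement clusters quickly
    }

def lambda_handler(event, context):
    # Assumes the EMR bootstrap (or an EventBridge rule on cluster state
    # change) passes the cluster alias and master DNS name in the event.
    record = build_dns_record(event["clusterAlias"], event["masterPublicDns"])
    req = urllib.request.Request(
        DNS_API_URL,
        data=json.dumps(record).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(req) as resp:  # call to the custom DNS API
        return {"status": resp.status}
```

Because the record format lives behind a custom API, the interesting part is simply keeping a stable alias that clients can resolve while clusters come and go.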

The following diagram depicts the process we followed for migration to Amazon EMR.

Viasat successfully migrated our on-premises big data applications to Amazon EMR in May 2021, following a lift-and-shift approach to move the big data workload to the cloud with minimal changes. Although we have an experienced big data team, we used AWS Infrastructure Event Management (IEM) to support queries on fine-tuning the Amazon EMR infrastructure within the migration timeline.

The following diagram outlines our new architecture. With this new architecture, we can process the same workloads with 50% of the compute footprint and approximately 50% of the costs compared to our on-premises clusters and still meet our SLAs.

Conclusion

With the new data platform, we can ingest additional data sources so that our analysts and customer service teams can gain insights and improve customer experience. As Viasat is launching new satellites and growing business in multiple new countries, we’re looking to stand up this solution in new AWS Regions (closest to the host country) and scale it as needed.

Questions or feedback? Send an email to [email protected].


About the Authors

Manoj Gundawar is a product owner at Viasat. He builds product roadmaps, provides architecture guidance, and manages the full software development lifecycle to build high-quality products with minimal TCO. He is passionate about delighting customers by providing innovative solutions, leveraging technology, agile methodology, and a continuous improvement mindset.

Archana Srinivasan is a Technical Account Manager within Enterprise Support at Amazon Web Services. Archana helps AWS customers leverage Enterprise Support entitlements to solve complex operational challenges and accelerate their cloud adoption.

Kiran Guduguntla is a WW Go-to-Market Specialist for Amazon EMR at AWS. He works with AWS customers across the globe to strategize, build, develop and deploy modern Big Data solutions. He is passionate about working with customers and helping them in their cloud journey. Kiran loves music, travel, food and watching football.

Authorize SparkSQL data manipulation on Amazon EMR using Apache Ranger

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/authorize-sparksql-data-manipulation-on-amazon-emr-using-apache-ranger/

With Amazon EMR 5.32, Amazon EMR introduced Apache Ranger 2.0 support, which allows you to enable authorization and audit capabilities for Apache Spark, Amazon Simple Storage Service (Amazon S3), and Apache Hive. It also enabled authorization audits to be logged in Amazon CloudWatch. However, although you could control Apache Spark writes to Amazon S3 with these authorization capabilities, SparkSQL support was limited to only read authorization.

We’re happy to announce that with Amazon EMR 6.4, Apache Ranger SparkSQL integration supports authorizing capabilities for data manipulation statements (DML). You can now authorize INSERT INTO, INSERT OVERWRITE, and ALTER statements for SparkSQL using Apache Ranger policies.

Architecture overview

Amazon EMR support for Apache SparkSQL is implemented using the Amazon EMR record server, which reads Apache Ranger policy definitions, evaluates access, and filters data before passing the data back to the individual Spark executors.

The following image shows the high-level architecture.

Implementation details

Before you begin, set up your Apache Ranger and EMR cluster. For instructions, see Introducing Amazon EMR integration with Apache Ranger. If you have an existing installation of the Apache Ranger server with Apache Spark service definitions deployed, use the following code to redeploy the service definitions:

# Get the existing Spark service definition ID by calling the Ranger REST API
# and extracting the id field with the jq JSON processor
curl --silent -f -u <admin_user_login>:<password_for_ranger_admin_user> \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef/name/amazon-emr-spark' | jq .id

# Download the latest service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json

# Update the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X PUT -d @ranger-servicedef-amazon-emr-spark.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://<RANGER SERVER ADDRESS>:6182/service/public/v2/api/servicedef/<id-from-first-step>'

Now that the service definition has been updated, let’s test the policies.

For our use case, assume you have an external Amazon S3 backed partitioned Hive table. You want to use a SparkSQL DML statement to insert data into the table.

Use the following code for a table definition:

CREATE EXTERNAL TABLE IF NOT EXISTS students_s3 (name VARCHAR(64), address VARCHAR(64)) 
PARTITIONED BY (student_id INT) 
STORED AS PARQUET
LOCATION 's3://xxxxxx/students_s3/'

You can now set up the authorization policies on Apache Ranger. The following screenshots illustrate this process.

Because the table is externally backed by Amazon S3, we first need to enable read and write access to the Amazon S3 location of the table. If the location is on HDFS, the URL should have the HDFS path—for example, hdfs://xxxx.

Next, we add SELECT, UPDATE, and ALTER permissions, allowing users to use the DML commands. Any update to the table metadata like statistics or partition information requires the ALTER permission.

After we set up these Apache Ranger policies, we can start testing the DML statements. The following code is an example of an INSERT INTO statement:

spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
spark.sql("INSERT INTO students_s3 VALUES ('Amy Smith', '123 Park Ave, San Jose', 231111)")
studentsSQL = spark.sql("select * from default.students_s3 where student_id=231111")
studentsSQL.show()

The following screenshot shows our results.

We can audit this action on CloudWatch, similar to other actions.

Limitations

Inserting data into a partition where the partition location is different from the table location is not currently supported. The partition location must always be a child directory of the main table location.
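The child-directory rule can be stated precisely as a path-prefix check. The helper below is an illustrative sketch of the constraint, not EMR's actual validation code:

```python
def is_child_location(partition_location: str, table_location: str) -> bool:
    """Return True if the partition path lives under the table's base path,
    which is the only layout the DML integration currently supports."""
    base = table_location.rstrip("/") + "/"
    return partition_location.startswith(base) and partition_location != base
```

For example, `s3://bucket/students_s3/student_id=1/` is a valid partition location for a table at `s3://bucket/students_s3/`, but a location under a different prefix is not.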

SQL statement/Ranger action    Status          Supported EMR release
SELECT                         Supported       As of 5.32
SHOW DATABASES                 Supported       As of 5.32
SHOW TABLES                    Supported       As of 5.32
SHOW COLUMNS                   Supported       As of 5.32
SHOW TABLE PROPERTIES          Supported       As of 5.32
DESCRIBE TABLE                 Supported       As of 5.32
CREATE TABLE                   Not supported   N/A
UPDATE TABLE                   Not supported   N/A
INSERT OVERWRITE               Supported       As of 6.4
INSERT INTO                    Supported       As of 6.4
ALTER TABLE                    Supported       As of 6.4
DELETE FROM                    Not supported   N/A

Available now

Amazon EMR support for SparkSQL statements INSERT INTO, INSERT OVERWRITE, and ALTER TABLE with Apache Ranger is available on Amazon EMR 6.4 in the following AWS Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (N. California)
  • US West (Oregon)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Canada (Central)
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Paris)
  • Europe (Milan)
  • Europe (Stockholm)
  • South America (São Paulo)
  • Middle East (Bahrain)

For the latest Region availability, see the Amazon EMR Management Guide.

Conclusion

Amazon EMR 6.4 has introduced additional authorizing capabilities for data manipulation statements with Apache Ranger 2.0. You can use statements like INSERT INTO, INSERT OVERWRITE, and ALTER in SparkSQL and control authorization using Apache Ranger policies.

Related resources

For additional information, see the following resources:


About the Authors

Varun Rao Bhamidimarri is a Sr. Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers with the adoption of cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, meditating, and gardening, which he recently picked up during the lockdown.

Jalpan Randeri is a Senior Software Engineer at AWS. He likes working on performance optimization and data access controls for big data systems. Outside work, he likes watching anime & playing video games.

How NortonLifelock built a serverless architecture for real-time analysis of their VPN usage metrics

Post Syndicated from Madhu Nunna original https://aws.amazon.com/blogs/big-data/how-nortonlifelock-built-a-serverless-architecture-for-real-time-analysis-of-their-vpn-usage-metrics/

This post presents a reference architecture and optimization strategies for building serverless data analytics solutions on AWS using Amazon Kinesis Data Analytics. In addition, this post shows the design approach that the engineering team at NortonLifeLock took to build out an operational analytics platform that processes usage data for their VPN services, consuming petabytes of data across the globe on a daily basis.

NortonLifeLock is a global cybersecurity and internet privacy company that offers services to millions of customers for device security, and identity and online privacy for home and family. NortonLifeLock believes the digital world is only truly empowering when people are confident in their online security. NortonLifeLock has been an AWS customer since 2014.

For any organization, the value of operational data and metrics decreases with time. This lost value can equate to lost revenue and wasted resources. Real-time streaming analytics helps capture this value and provide new insights that can create new business opportunities.

AWS offers a rich set of services that you can use to provide real-time insights and historical trends. These services include managed Hadoop infrastructure services on Amazon EMR as well as serverless options such as Kinesis Data Analytics and AWS Glue.

Amazon EMR also supports multiple programming options for capturing business logic, such as Spark Streaming, Apache Flink, and SQL.

As a customer, it’s important to understand organizational capabilities, project timelines, business requirements, and AWS service best practices in order to define an optimal architecture from performance, cost, security, reliability, and operational excellence perspectives (the five pillars of the AWS Well-Architected Framework).

NortonLifeLock is taking a methodical approach to real-time analytics on AWS while using serverless technology to deliver on key business drivers such as time to market and total cost of ownership. In addition to NortonLifeLock’s implementation, this post provides key lessons learned and best practices for rapid development of real-time analytics workloads.

Business problem

NortonLifeLock offers a VPN product as a freemium service to users. Therefore, they need to enforce usage limits in real time to stop freemium users from using the service when their usage is over the limit. The challenge for NortonLifeLock is to do this in a reliable and affordable fashion.

NortonLifeLock runs its VPN infrastructure in almost all AWS Regions. Migrating to AWS from smaller hosting vendors has greatly improved user experience and VPN edge server performance, including reduced connection latency, connect times, and connection errors; faster upload and download speeds; and better stability and uptime for VPN edge servers.

VPN usage data is collected by VPN edge servers and uploaded to backend stats servers every minute and persisted in backend databases. The usage information serves multiple purposes:

  • Displaying how much data a device has consumed for the past 30 days.
  • Enforcing usage limits on freemium accounts. When a user exhausts their free quota, that user is unable to connect through VPN until the next free cycle.
  • Analyzing usage data by the internal business intelligence (BI) team based on time, marketing campaigns, and account types, and using this data to predict future growth, ability to retain users, and more.

Design challenge

NortonLifeLock had the following design challenges:

  • The solution must be able to simultaneously satisfy both real-time and batch analysis.
  • The solution must be economical. NortonLifeLock VPN has hundreds of thousands of concurrent users, and if a user’s usage information is persisted as it comes in, it results in tens of thousands of reads and writes per second and tens of thousands of dollars a month in database costs.

Solution overview

NortonLifeLock decided to split storage into two parts by storing usage data in Amazon DynamoDB for real-time access and in Amazon Simple Storage Service (Amazon S3) for analysis, which addresses real-time enforcement and BI needs. Kinesis Data Analytics aggregates and loads data to Amazon S3 and DynamoDB. With Amazon Kinesis Data Streams and AWS Lambda as consumers of Kinesis Data Analytics, the implementation of user and device-level aggregations was simplified.

To keep costs down, user usage data was aggregated by the hour and persisted in DynamoDB. This spread hundreds of thousands of writes over an hour and reduced DynamoDB cost by 30 times.
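As a back-of-the-envelope check (the user count below is an illustrative assumption, not a figure from the post), spreading per-user writes over an hour instead of a minute cuts the sustained DynamoDB write rate by a factor of 60; actual cost savings also depend on item sizes and capacity mode:

```python
users = 200_000  # hypothetical number of concurrent VPN users

# Persisting each user's usage as it arrives, once per minute:
per_minute_rate = users / 60      # sustained writes per second (~3,333)

# Aggregating hourly in Kinesis Data Analytics before persisting:
hourly_rate = users / 3600        # sustained writes per second (~56)

reduction = per_minute_rate / hourly_rate  # factor of 60 fewer writes
```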

Although increasing aggregation might not be an option for other problem domains, it’s acceptable in this case because it’s not necessary to be precise to the minute for user usage, and it’s acceptable to calculate and enforce the usage limit every hour.

The following diagram illustrates the high-level architecture. The solution is broken into three logical parts:

  • End-users – Real-time queries from devices to display current usage information (how much data is used daily)
  • Business analysts – Query historical usage information through Amazon Athena to extract business insights
  • Usage limit enforcement – Usage data ingestion and aggregation in real time

The solution has the following workflow:

  1. A VPN edge server collects usage data and sends it to the backend service through an Application Load Balancer.
  2. A single usage data record sent by the VPN edge server contains usage data for many users. A stats splitter splits the message into individual usage stats per user and forwards the message to Kinesis Data Streams.
  3. Usage data is consumed by both the legacy stats processor and the new Apache Flink application developed and deployed on Kinesis Data Analytics.
  4. The Apache Flink application carries out the following tasks:
    1. Aggregate device usage data hourly and send the aggregated result to Amazon S3 and the outgoing Kinesis data stream, which is picked up by a Lambda function that persists the usage data in DynamoDB.
    2. Aggregate device usage data daily and send the aggregated result to Amazon S3.
    3. Aggregate account usage data hourly and forward the aggregated results to the outgoing data stream, which is picked up by a Lambda function that checks if account usage is over the limit for that account. If account usage is over the limit, the function forwards the account information to another Lambda function, via Amazon Simple Queue Service (Amazon SQS), to cut off access on that account.
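The limit check in step 4c can be sketched as follows. Only the flow (aggregated account usage, an over-limit check, and an SQS message to trigger cutoff) comes from the post; the record fields, quota handling, and queue URL are hypothetical:

```python
import json

def is_over_limit(hourly_usage_bytes: int, used_so_far_bytes: int,
                  quota_bytes: int) -> bool:
    """Decide whether this hour's aggregate pushes the account past its quota."""
    return used_so_far_bytes + hourly_usage_bytes > quota_bytes

def handle_aggregated_record(record, used_so_far, quota, sqs_client=None,
                             queue_url="https://sqs.example.com/cutoff-queue"):
    """Process one aggregated account-usage record from the outgoing stream.
    In production, sqs_client would be an injected boto3 SQS client."""
    if is_over_limit(record["usageBytes"], used_so_far, quota):
        message = json.dumps({"accountId": record["accountId"], "action": "cutoff"})
        if sqs_client is not None:
            sqs_client.send_message(QueueUrl=queue_url, MessageBody=message)
        return message
    return None
```

Keeping the decision in a pure function like `is_over_limit` makes the Lambda handler easy to test without AWS dependencies.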

Design journey

NortonLifeLock needed a solution that was capable of real-time streaming and batch analytics. Kinesis Data Analytics fits this requirement because of the following key features:

  • Real-time streaming and batch analytics for data aggregation
  • Fully managed with a pay-as-you-go model
  • Auto scaling

NortonLifeLock needed Kinesis Data Analytics to do the following:

  • Aggregate customer usage data per device hourly and send results to Kinesis Data Streams (ultimately to DynamoDB) and the data lake (Amazon S3)
  • Aggregate customer usage data per account hourly and send results to Kinesis Data Streams (ultimately to DynamoDB and Lambda, which enforces usage limit)
  • Aggregate customer usage data per device daily and send results to the data lake (Amazon S3)

The legacy system processes usage data from an incoming Kinesis data stream, and NortonLifeLock planned to use Kinesis Data Analytics to consume and process production data from the same stream. As such, NortonLifeLock started with SQL applications on Kinesis Data Analytics.

First attempt: Kinesis Data Analytics for SQL

Kinesis Data Analytics with SQL provides a high-level SQL-based abstraction for real-time stream processing and analytics. It’s configuration-driven and very simple to get started with. NortonLifeLock was able to create a prototype from scratch, get to production, and process the production load in less than 2 weeks. The solution met 90% of the requirements, and there were alternatives for the remaining 10%.

However, they started to receive “read limit exceeded” alerts from the source data stream, and the legacy application was read throttled. With AWS Support’s help, they traced the issue to a drastic reversal of the Kinesis Data Analytics MillisBehindLatest metric during Kinesis record processing. This correlated with Kinesis Data Analytics auto scaling events and application restarts, as illustrated by the following diagram. The highlighted areas show the correlation between spikes due to auto scaling and the reversal of the MillisBehindLatest metric.

Here’s what happened:

  • Kinesis Data Analytics for SQL scaled up KPU due to load automatically, and the Kinesis Data Analytics application was restarted (part of scaling up).
  • Kinesis Data Analytics for SQL supports the at least once delivery model and uses checkpoints to ensure no data loss. But it doesn’t support taking a snapshot and restoring from the snapshot after a restart. For more details, see Delivery Model for Persisting Application Output to an External Destination.
  • When the Kinesis Data Analytics for SQL application was restarted, it needed to reprocess data from the beginning of the aggregation window, resulting in a very large number of duplicate records, which led to a dramatic increase in the Kinesis Data Analytics MillisBehindLatest metric.
  • To catch up with incoming data, Kinesis Data Analytics started re-reading from the Kinesis data stream, which led to over-consumption of read throughput and the legacy application being throttled.

In summary, duplicate record processing on restarts in Kinesis Data Analytics for SQL, the lack of any other means to eliminate duplicates, and the limited ability to control auto scaling led to this issue.

Although they found Kinesis Data Analytics for SQL easy to get started, these limitations demanded other alternatives. NortonLifeLock reached out to the Kinesis Data Analytics team and discussed the following options:

  • Option 1 – AWS was planning to release a new service, Kinesis Data Analytics Studio for SQL, Python, and Scala, which addresses these limitations. But this service was still a few months away (this service is now available, launched May 27, 2021).
  • Option 2 – The alternative was to switch to Kinesis Data Analytics for Apache Flink, which also provides the necessary tools to address all their requirements.

Second attempt: Kinesis Data Analytics for Apache Flink

Apache Flink has a comparatively steep learning curve (we used Java for streaming analytics instead of SQL), and it took about 4 weeks to build the same prototype, deploy it to Kinesis Data Analytics, and test the application in production. NortonLifeLock had to overcome a few hurdles, which we document in this section along with the lessons learned.

Challenge 1: Too many writes to outgoing Kinesis data stream

The first thing they noticed was that the write threshold on the outgoing Kinesis data stream was greatly exceeded. Kinesis Data Analytics was attempting to write 10 times the amount of expected data to the data stream, with 95% of data throttled.

After a lengthy investigation, it turned out that too much parallelism in the Kinesis Data Analytics application led to this issue. They had followed the default recommendations and set parallelism to 12, which auto scaled up to 16. This means that every hour, 16 separate threads were attempting to write to the destination data stream simultaneously, leading to massive contention and throttled writes. These threads retried continuously until all records were written to the data stream. This resulted in 10 times the amount of data processing attempted, even though only one tenth of the writes eventually succeeded.

The solution was to reduce parallelism to 4 and disable auto scaling. In the preceding diagram, the percentage of throttled records dropped to 0 from 95% after they reduced parallelism to 4 in the Kinesis Data Analytics application. This also greatly improved KPU utilization and reduced Kinesis Data Analytics cost from $50 a day to $8 a day.

Challenge 2: Use Kinesis Data Analytics sink aggregation

After tuning parallelism, they still noticed occasional throttling by Kinesis Data Streams because of the number of records being written, not record size. To overcome this, they turned on Kinesis Data Analytics sink aggregation to reduce the number of records being written to the data stream, and the result was dramatic. They were able to reduce the number of writes by 1,000 times.

Challenge 3: Handle Kinesis Data Analytics Flink restarts and the resulting duplicate records

Kinesis Data Analytics applications restart because of auto scaling or recovery from application or task manager crashes. When this happens, Kinesis Data Analytics saves a snapshot before shutdown, then automatically reloads the latest snapshot and picks up where it left off. Kinesis Data Analytics also saves a checkpoint every minute so no data is lost, guaranteeing exactly-once processing within the application.

However, when the Kinesis Data Analytics application shuts down in the middle of sending results to Kinesis Data Streams, exactly-once data delivery isn’t guaranteed. In fact, Flink only guarantees at-least-once delivery to the Kinesis sink, meaning each record is sent at least once, so duplicate records are sent when Kinesis Data Analytics restarts.

How were duplicate records handled in the outgoing data stream?

Because duplicate records aren’t handled by Kinesis Data Analytics when sinks do not have exactly-once semantics, the downstream application must deal with the duplicate records. The first question you should ask is whether it’s necessary to deal with the duplicate records. Maybe it’s acceptable to tolerate duplicate records in your application? This, however, is not an option for NortonLifeLock, because no user wants to have their available usage taken twice within the same hour. So, logic had to be built in the application to handle duplicate usage records.

To deal with duplicate records, you can employ a strategy in which the application saves an update timestamp along with the user’s latest usage. When a record comes in, the application reads existing daily usage and compares the update timestamp against the current time. If the difference is less than a configured window (50 minutes if the aggregation window is 60 minutes), the application ignores the new record because it’s a duplicate. It’s acceptable for the application to potentially undercount vs. overcount user usage.
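A minimal sketch of that timestamp-window check, following the 50-minute threshold described above (the read and write of the stored usage record are elided):

```python
from datetime import datetime, timedelta

# Ignore a new record if the stored usage was updated within this window
# (50 minutes for a 60-minute aggregation window, per the strategy above).
DEDUP_WINDOW = timedelta(minutes=50)

def is_duplicate(last_update: datetime, now: datetime,
                 window: timedelta = DEDUP_WINDOW) -> bool:
    """Treat a replayed hourly usage record as a duplicate if the stored
    usage was updated less than `window` ago."""
    return (now - last_update) < window
```

A record arriving 30 minutes after the last update is dropped as a replay, while one arriving a full hour later is applied as a genuine new aggregate.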

How were duplicate records handled in the outgoing S3 bucket?

Kinesis Data Analytics writes temporary files in Amazon S3 before finalizing and removing them. When Kinesis Data Analytics restarts, it attempts to write new S3 files, and can leave behind temporary S3 files from before the restart. Because Athena ignores all temporary S3 files, no further action is needed. If your BI tools take temporary S3 files into consideration, you have to configure an Amazon S3 lifecycle policy to clean up temporary S3 files after a certain time.
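Athena's behavior here follows its documented rule of ignoring hidden objects, whose names (or path components) begin with an underscore or a dot, which is what Flink-style in-progress files look like. An illustrative filter:

```python
def is_hidden(key: str) -> bool:
    """Mimic the hidden-file convention Athena applies: any path component
    starting with '.' or '_' makes the object invisible to queries."""
    return any(part.startswith((".", "_")) for part in key.split("/") if part)
```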

Conclusion

NortonLifeLock has been successfully running a Kinesis Data Analytics application in production since May 2021. It provides several key benefits. VPN users can now keep track of their usage in near-real time. BI analysts can get timely insights that are used for targeted sales and marketing campaigns, and upselling features and services. VPN usage limits are enforced in near-real time, thereby optimizing the network resources. NortonLifeLock is saving tens of thousands of dollars each month with this real-time streaming analytics solution. And this telemetry solution is able to keep up with petabytes of data flowing through their global VPN service, which is seeing double-digit monthly growth.

To learn more about Kinesis Data Analytics and getting started with serverless streaming solutions on AWS, see the Developer Guide for Studio, the easiest way to build Apache Flink applications in SQL, Python, or Scala in a notebook interface.


About the Authors

Lei Gu has 25 years of software development experience and the architect for three key Norton products, Norton Secure Backup, VPN and Norton Family. He is passionate about cloud transformation and most recently spoke about moving from Cassandra to Amazon DynamoDB at AWS re:Invent 2019. Check out his Linkedin profile at https://www.linkedin.com/in/leigu/.

Madhu Nunna is a Sr. Solutions Architect at AWS, with over 20 years of experience in networks and cloud, with the last two years focused on AWS Cloud. He is passionate about Analytics and AI/ML. Outside of work, he enjoys hiking and reading books on philosophy, economics, history, astronomy and biology.

Configure Amazon EMR Studio and Amazon EKS to run notebooks with Amazon EMR on EKS

Post Syndicated from Randy DeFauw original https://aws.amazon.com/blogs/big-data/configure-amazon-emr-studio-and-amazon-eks-to-run-notebooks-with-amazon-emr-on-eks/

Amazon EMR on Amazon EKS provides a deployment option for Amazon EMR that allows you to run analytics workloads on Amazon Elastic Kubernetes Service (Amazon EKS). This is an attractive option because it allows you to run applications on a common pool of resources without having to provision infrastructure. In addition, you can use Amazon EMR Studio to build analytics code running on Amazon EKS clusters. EMR Studio is a web-based, integrated development environment (IDE) using fully managed Jupyter notebooks that can be attached to any EMR cluster, including EMR on EKS. You can use AWS Single Sign-On (SSO) or a compatible identity provider (IdP) to log in directly to EMR Studio through a secure URL using corporate credentials.

Deploying EMR Studio to attach to EMR on EKS requires integrating several AWS services:

In addition, you need to install the following EMR on EKS components:

This post helps you build all the necessary components and stitch them together by running a single script. We also describe the architecture of this setup and how the components work together.

Architecture overview

With EMR on EKS, you can run Spark applications alongside other types of applications on the same Amazon EKS cluster, which improves resource allocation and simplifies infrastructure management. For more information about how Amazon EMR operates inside an Amazon EKS cluster, see New – Amazon EMR on Amazon Elastic Kubernetes Service (EKS). EMR Studio provides a web-based IDE that makes it easy to develop, visualize, and debug applications that run in EMR. For more information, see Amazon EMR Studio (Preview): A new notebook-first IDE experience with Amazon EMR.

Spark kernels are scheduled pods in a namespace in an Amazon EKS cluster. EMR Studio uses Jupyter Enterprise Gateway (JEG) to launch Spark kernels on Amazon EKS. A managed endpoint of type JEG is provisioned as a Kubernetes deployment in the EMR virtual cluster’s associated namespace and exposed as a Kubernetes service. Each EMR virtual cluster maps to a Kubernetes namespace registered with the Amazon EKS cluster; virtual clusters don’t manage physical compute or storage, but point to the Kubernetes namespace where the workload is scheduled. Each virtual cluster can have several managed endpoints, each with their own configured kernels for different use cases and needs. JEG managed endpoints provide HTTPS endpoints, serviced by an Application Load Balancer (ALB), that are reachable only from EMR Studio and self-hosted notebooks that are created within a private subnet of the Amazon EKS VPC.

The following diagram illustrates the solution architecture.

The managed endpoint is created in the virtual cluster’s Amazon EKS namespace (in this case, sparkns) and the HTTPS endpoints are serviced from private subnets. The kernel pods run with the job-execution IAM role defined in the managed endpoint. During managed endpoint creation, EMR on EKS uses the AWS Load Balancer Controller in the kube-system namespace to create an ALB with a target group that connects with the JEG managed endpoint in the virtual cluster’s Kubernetes namespace.

You can configure each managed endpoint’s kernel differently. For example, to permit a Spark kernel to use AWS Glue as its catalog, you can apply the following configuration JSON in the --configuration-overrides flag when creating a managed endpoint:

aws emr-containers create-managed-endpoint \
--type JUPYTER_ENTERPRISE_GATEWAY \
--virtual-cluster-id ${virtclusterid} \
--name ${virtendpointname} \
--execution-role-arn ${role_arn} \
--release-label ${emr_release_label} \
--certificate-arn ${certarn} \
--region ${region} \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
          "spark.hadoop.hive.metastore.client.factory.class": "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
          "spark.sql.catalogImplementation": "hive"
        }
      }
    ]
  }'
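The same override can be expressed programmatically. The following is a sketch using boto3's `emr-containers` client; the commented-out call mirrors the shell variables above, which are placeholders, and boto3 with valid AWS credentials is assumed:

```python
import json

# The spark-defaults override that points Spark at the AWS Glue Data Catalog,
# expressed as a Python dict suitable for boto3's "emr-containers" client.
glue_catalog_overrides = {
    "applicationConfiguration": [
        {
            "classification": "spark-defaults",
            "properties": {
                "spark.hadoop.hive.metastore.client.factory.class":
                    "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory",
                "spark.sql.catalogImplementation": "hive",
            },
        }
    ]
}

# Sketch of the call (requires boto3 and AWS credentials; values are placeholders):
# client = boto3.client("emr-containers", region_name=region)
# client.create_managed_endpoint(
#     name=virtendpointname,
#     virtualClusterId=virtclusterid,
#     type="JUPYTER_ENTERPRISE_GATEWAY",
#     releaseLabel=emr_release_label,
#     executionRoleArn=role_arn,
#     certificateArn=certarn,
#     configurationOverrides=glue_catalog_overrides,
# )

print(json.dumps(glue_catalog_overrides, indent=2))
```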

The managed endpoint is a Kubernetes deployment fronted by a service inside the configured namespace (in this case, sparkns). When we trace the endpoint information, we can see how the Jupyter Enterprise Gateway deployment connects with the ALB and the target group:

# Get the endpoint ID
aws emr-containers list-managed-endpoints --region us-east-1 --virtual-cluster-id idzdhw2qltdr0dxkgx2oh4bp1
{
    "endpoints": [
        {
            "id": "5vbuwntrbzil1",
            "name": "virtual-emr-endpoint-demo",
            ...
            "serverUrl": "https://internal-k8s-default-ingress5-4f482e2d41-2097665209.us-east-1.elb.amazonaws.com:18888",

# List the deployment
kubectl get deployments -n sparkns -l "emr-containers.amazonaws.com/managed-endpoint-id=5vbuwntrbzil1"

NAME                READY   UP-TO-DATE   AVAILABLE   AGE
jeg-5vbuwntrbzil1   1/1     1            1           4h54m


# List the service
kubectl get svc -n sparkns -l "emr-containers.amazonaws.com/managed-endpoint-id=5vbuwntrbzil1"

NAME                    TYPE       CLUSTER-IP       EXTERNAL-IP   PORT(S)           AGE
service-5vbuwntrbzil1   NodePort   10.100.172.157   <none>        18888:30091/TCP   4h58m

# List the TargetGroups to get the TargetGroup ARN

kubectl get targetgroupbinding -n sparkns -o json | jq .items | jq .[].spec.targetGroupARN

"arn:aws:elasticloadbalancing:us-east-1:< account id >:targetgroup/k8s-sparkns-servicey-a37caa5e1e/02d10652a64cebd8"

# Get the TargetGroup Port number

aws elbv2 describe-target-groups --target-group-arns arn:aws:elasticloadbalancing:us-east-1:< account id >:targetgroup/k8s-sparkns-servicey-a37caa5e1e/02d10652a64cebd8 | jq .TargetGroups | jq .[].Port

30091


# Get Load Balancer ARN

aws elbv2 describe-target-groups --target-group-arns arn:aws:elasticloadbalancing:us-east-1:< account id >:targetgroup/k8s-sparkns-servicey-a37caa5e1e/02d10652a64cebd8 | jq .TargetGroups | jq .[].LoadBalancerArns | jq .[]

"arn:aws:elasticloadbalancing:us-east-1:< account id >:loadbalancer/app/k8s-sparkns-ingressy-830efa48aa/12199b1a7baee273"

# Get Listener Port number

aws elbv2 describe-listeners --load-balancer-arn arn:aws:elasticloadbalancing:us-east-1:< account id >:loadbalancer/app/k8s-sparkns-ingressy-830efa48aa/12199b1a7baee273 | jq .Listeners | jq .[].Port

18888

To look at how this connects, consider two EMR Studio sessions. The ALB exposes port 18888 to the EMR Studio sessions and forwards that traffic to the JEG service’s dynamic NodePort (in this case, 30091). The JEG service then forwards the traffic to TargetPort 9547, which routes it to the appropriate Spark driver pod. Each notebook session has its own kernel, with its own respective Spark driver and executor pods, as the following diagram illustrates.
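Pulling the trace above together, the JEG NodePort service ties the three ports into one chain. The following is an illustrative sketch of that service spec, not output from a cluster; EMR on EKS creates this object for you, and the name simply mirrors the generated `service-5vbuwntrbzil1` seen earlier:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: service-5vbuwntrbzil1   # generated by EMR on EKS for the managed endpoint
  namespace: sparkns
spec:
  type: NodePort
  ports:
  - port: 18888        # the port the ALB listener exposes to EMR Studio
    nodePort: 30091    # the port the ALB target group forwards to
    targetPort: 9547   # the port that routes to the JEG/Spark driver side
```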

Attach EMR Studio to a virtual cluster and managed endpoint

Each time a user attaches a virtual cluster and a managed endpoint to their Studio Workspace and launches a Spark session, Spark drivers and Spark executors are scheduled. You can see this by running kubectl to check which pods were launched:

$ kubectl get all -l app=enterprise-gateway
NAME                                  READY   STATUS      RESTARTS   AGE
pod/kb1a317e8-b77b-448c-9b7d-exec-1   1/1     Running     0          2m30s
pod/kb1a317e8-b77b-448c-9b7d-exec-2   1/1     Running     0          2m30s
pod/kb1a317e8-b77b-448c-9b7d-driver   2/2     Running     0          2m38s

$ kubectl get pods -n sparkns
NAME                                  READY   STATUS      RESTARTS   AGE
jeg-5vbuwntrbzil1-5fc8469d5f-pfdv9    1/1     Running     0          3d7h
kb1a317e8-b77b-448c-9b7d-exec-1       1/1     Running     0          2m38s
kb1a317e8-b77b-448c-9b7d-exec-2       1/1     Running     0          2m38s
kb1a317e8-b77b-448c-9b7d-driver       2/2     Running     0          2m46s

Each notebook Spark kernel session deploys a driver pod and executor pods that continue running until the kernel session is shut down.

The Spark work triggered by the notebook cells runs in the driver and executor pods deployed in the Amazon EKS cluster.

Set up EMR on EKS and EMR Studio

Several steps and pieces are required to set up both EMR on EKS and EMR Studio. Enabling AWS SSO is a prerequisite. You can use the two launch scripts provided in this section, or manually deploy the components using the steps provided later in this post.

We provide two launch scripts in this post. One is a bash script that uses AWS CloudFormation, eksctl, and AWS Command Line Interface (AWS CLI) commands to provide an end-to-end deployment of a complete solution. The other uses the AWS Cloud Development Kit (AWS CDK) to do so.

The following diagram shows the architecture and components that we deploy.

Prerequisites

Make sure to complete the following prerequisites:

For information about the supported IdPs, see Enable AWS Single Sign-On for Amazon EMR Studio.

Bash script

The script is available on GitHub.

Prerequisites

The script requires you to use AWS Cloud9. Follow the instructions in the Amazon EKS Workshop. Make sure to follow these instructions carefully:

After you deploy the AWS Cloud9 desktop, proceed to the next steps.

Preparation

Use the following code to clone the GitHub repo and prepare the AWS Cloud9 prerequisites:

# Download script from the repository
$ git clone https://github.com/aws-samples/amazon-emr-on-eks-emr-studio.git

# Prepare the Cloud9 Desktop pre-requisites
$ cd amazon-emr-on-eks-emr-studio
$ bash ./prepare_cloud9.sh

Deploy the stack

Before running the script, provide the following information:

  • The AWS account ID and Region, if your AWS Cloud9 desktop isn’t in the same account ID or Region where you want to deploy EMR on EKS
  • The name of the Amazon Simple Storage Service (Amazon S3) bucket to create
  • The AWS SSO user to be associated with the EMR Studio session

After the script deploys the stack, the URL to the deployed EMR Studio is displayed:

# Launch the script and follow the instructions to provide user parameters
$ bash ./deploy_eks_cluster_bash.sh

...
Go to https://***.emrstudio-prod.us-east-1.amazonaws.com and log in using < SSO user > ...

AWS CDK script

The AWS CDK scripts are available on GitHub. You need to check out the main branch. The stacks deploy an Amazon EKS cluster and an EMR on EKS virtual cluster in a new VPC with private subnets, and optionally an Amazon Managed Workflows for Apache Airflow (Amazon MWAA) environment and EMR Studio.

Prerequisites

You need the AWS CDK version 1.90.1 or higher. For more information, see Getting started with the AWS CDK.

We use a prefix list to restrict access to some resources to network IP ranges that you approve. Create a prefix list if you don’t already have one.

If you plan to use EMR Studio, you need AWS SSO configured in your account.

Preparation

After you clone the repository and checkout the main branch, create and activate a new Python virtual environment:

# Clone the repository
$ git clone https://github.com/aws-samples/aws-cdk-for-emr-on-eks.git
$ cd aws-cdk-for-emr-on-eks/
$ git checkout main

# Create and activate a new Python virtual environment
$ python3 -m venv .venv
$ source .venv/bin/activate

Now install the Python dependencies:

$ pip install -r requirements.txt

Lastly, bootstrap the AWS CDK:

$ cdk bootstrap aws://<account>/<region> \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

Deploy the stacks

Synthesize the AWS CDK stacks with the following code:

$ cdk synth \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

This command generates four stacks:

  • emr-eks-cdk – The main stack
  • mwaa-cdk – Adds Amazon MWAA
  • studio-cdk – Adds EMR Studio prerequisites
  • studio-cdk-live – Adds EMR Studio

The following diagram illustrates the resources deployed by the AWS CDK stacks.

Start by deploying the first stack:

$ cdk deploy emr-eks-cdk \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

If you want to use Apache Airflow as your orchestrator, deploy that stack:

$ cdk deploy mwaa-cdk \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

Deploy the first EMR Studio stack:

$ cdk deploy studio-cdk \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

Wait for the managed endpoint to become active. You can check the status by running the following code:

$ aws emr-containers list-managed-endpoints --virtual-cluster-id <cluster ID> | jq '.endpoints[].state'

The virtual cluster ID is available in the AWS CDK output from the emr-eks-cdk stack.
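While waiting, a small polling loop saves rerunning the command by hand. This is an illustrative sketch, assuming the AWS CLI and jq are installed and credentials are configured; endpoint creation typically takes several minutes:

```shell
#!/bin/bash
# Poll the first managed endpoint's state until it reports ACTIVE.
wait_for_endpoint() {
  local cluster_id="$1"
  local state=""
  until [ "$state" = "ACTIVE" ]; do
    state=$(aws emr-containers list-managed-endpoints \
      --virtual-cluster-id "$cluster_id" \
      | jq -r '.endpoints[0].state')
    echo "Managed endpoint state: $state"
    [ "$state" = "ACTIVE" ] || sleep 30
  done
}

# Usage: wait_for_endpoint <virtual cluster ID from the emr-eks-cdk output>
```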

When the endpoint is active, deploy the second EMR Studio stack:

$ cdk deploy studio-cdk-live \
  --context prefix=<prefix list> \
  --context instance=m5.xlarge \
  --context username=<SSO user name>

Manual deployment

If you prefer to manually deploy EMR on EKS and EMR Studio, use the steps in this section.

Set up a VPC

If you’re using Amazon EKS version 1.18, set up a VPC that has private subnets and is appropriately tagged for external load balancers. For tagging details, see Application load balancing on Amazon EKS and Create an EMR Studio service role.

Create an Amazon EKS cluster

Launch an Amazon EKS cluster with at least one managed node group. For instructions, see Setting up and Getting Started with Amazon EKS.

Create relevant IAM policies, roles, IdP, and SSL/TLS certificate

To create your IAM policies, roles, IdP, and SSL/TLS certificate, complete the following steps:

  1. Enable cluster access for EMR on EKS.
  2. Create an IdP in IAM based on the EKS OIDC provider URL.
  3. Create an SSL/TLS certificate and place it in AWS Certificate Manager.
  4. Create the relevant IAM policies and roles:
    1. Job execution role
    2. Update the trust policy for the job execution role
    3. Deploy and create the IAM policy for the AWS Load Balancer Controller
    4. EMR Studio service role
    5. EMR Studio user role
    6. EMR Studio user policies associated with AWS SSO users and groups
  5. Register the Amazon EKS cluster with Amazon EMR to create the virtual EMR cluster
  6. Create the appropriate security groups to be attached to each EMR Studio created:
    1. Workspace security group
    2. Engine security group
  7. Tag the security groups with the appropriate tags. For instructions, see Create an EMR Studio service role.

Required installs in Amazon EKS

Deploy the AWS Load Balancer Controller in the Amazon EKS cluster if you haven’t already done so.

Create EMR on EKS relevant pieces and map the user to EMR Studio

Complete the following steps:

  1. Create at least one EMR virtual cluster associated with the Amazon EKS cluster. For instructions, see Step 1 of Set up Amazon EMR on EKS for EMR Studio.
  2. Create at least one managed endpoint. For instructions, see Step 2 of Set up Amazon EMR on EKS for EMR Studio.
  3. Create at least one EMR Studio; associate the EMR Studio with the private subnets configured with the Amazon EKS cluster. For instructions, see Create an EMR Studio.
  4. When the EMR Studio is available, map an AWS SSO user or group to the EMR Studio and apply an appropriate IAM policy to that user.

Use EMR Studio

To start using EMR Studio, complete the following steps:

  1. Find the URL for EMR Studio by listing the studios in a Region:
$ aws emr list-studios --region us-east-1
{
    "Studios": [
        {
            "StudioId": "es-XXXXXXXXXXXXXXXXXXXXXX",
            "Name": "emr_studio_1",
            "VpcId": "vpc-XXXXXXXXXXXXXXXXXXXX",
            "Url": "https://es-XXXXXXXXXXXXXXXXXXXXXX.emrstudio-prod.us-east-1.amazonaws.com",
            "CreationTime": "2021-02-10T14:04:13.672000+00:00"
        }
    ]
}
  2. With the listed URL, log in using the AWS SSO username you used earlier.

After authentication, the user is routed to the EMR Studio dashboard.

  3. Choose Create Workspace.
  4. For Workspace name, enter a name.
  5. For Subnet, choose the subnet that corresponds to one of the subnets associated with the managed node group.
  6. For S3 location, enter an S3 bucket where you can store the notebook content.

  7. After you create the Workspace, choose one that is in the Ready status.

  8. In the sidebar, choose the EMR cluster icon.
  9. Under Cluster type, choose EMR Cluster on EKS.
  10. Choose the available virtual cluster and available managed endpoint.
  11. Choose Attach.

After it’s attached, EMR Studio displays the kernels available in the Notebook and Console section.

  12. Choose PySpark (Kubernetes) to launch a notebook kernel and start a Spark session.

Because the endpoint configuration here uses AWS Glue for its metastore, you can list the databases and tables connected to the AWS Glue Data Catalog. You can use the following example script to test the setup. Modify the script as necessary for the appropriate database and table that you have in your Data Catalog:

words='Welcome to Amazon EMR Studio'.split(' ')
wordRDD = sc.parallelize(words)
wc = wordRDD.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a+b)
print(wc.collect())

# Connect to Glue Catalog
spark.sql("""show databases like '< Database Name >'""").show(truncate=False)
spark.sql("""show tables in < Database Name >""").show(truncate=False)
# Run a simple select
spark.sql("""select * from < Database Name >.< Table Name > limit 10""").show(truncate=False)


Clean up

To avoid incurring future charges, delete the resources launched here by running remove_setup.sh:

# Launch the script
$ bash ./remove_setup.sh

Conclusion

EMR on EKS allows you to run applications on a common pool of resources inside an Amazon EKS cluster without having to provision infrastructure. EMR Studio is a fully managed Jupyter notebook and tool that provisions kernels that run on EMR clusters, including virtual clusters on Amazon EKS. In this post, we described the architecture of how EMR Studio connects with EMR on EKS and provided scripts to automatically deploy all the components to connect the two services.

If you have questions or suggestions, please leave a comment.


About the Authors

Randy DeFauw is a Principal Solutions Architect at Amazon Web Services. He works with AWS customers to provide guidance and technical assistance on database projects, helping them improve the value of their solutions when using AWS.

Matthew Tan is a Senior Analytics Solutions Architect at Amazon Web Services and provides guidance to customers developing solutions with AWS Analytics services on their analytics workloads.

Reduce costs and increase resource utilization of Apache Spark jobs on Kubernetes with Amazon EMR on Amazon EKS

Post Syndicated from Saurabh Bhutyani original https://aws.amazon.com/blogs/big-data/reduce-costs-and-increase-resource-utilization-of-apache-spark-jobs-on-kubernetes-with-amazon-emr-on-amazon-eks/

Amazon EMR on Amazon EKS is a deployment option for Amazon EMR that allows you to run Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS). If you run open-source Apache Spark on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and run Apache Spark up to three times faster. If you already use Amazon EMR, you can run Amazon EMR-based Apache Spark applications with other types of applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management.

Earlier this year, we launched support for pod templates in Amazon EMR on Amazon EKS to make it simpler to run Spark jobs on shared Amazon EKS clusters. A pod is a group of one or more containers, with shared storage and network resources, and a specification for how to run the containers. Pod templates are specifications that determine how each pod runs.

When you submit analytics jobs to a virtual cluster on Amazon EMR on EKS, Amazon EKS schedules the pods to execute the jobs. Your Amazon EKS cluster may have multiple node groups and instance types attached to it, and these pods could get scheduled on any of those Amazon Elastic Compute Cloud (Amazon EC2) instances. Organizations today need better resource utilization, such as running jobs on specific instances based on instance type, disk capacity, and disk IOPS, and they also need to control costs when multiple teams submit jobs to a virtual cluster on Amazon EMR on EKS.

In this post, we look at support in Amazon EMR on EKS for Spark’s pod template feature and how to use that for resource isolation and controlling costs.

Pod templates have many use cases:

  • Cost reduction – To reduce costs, you can schedule Spark driver pods to run on EC2 On-Demand Instances while scheduling Spark executor pods to run on EC2 Spot Instances.
  • Resource utilization – To increase resource utilization, you can support multiple teams running their workloads on the same Amazon EKS cluster. Each team gets a designated Amazon EC2 node group to run their workloads on. You can use pod templates to enforce scheduling on the relevant node groups.
  • Logging and monitoring – To improve monitoring, you can run a separate sidecar container to forward logs to your existing monitoring application.
  • Initialization – To run initialization steps, you can run a separate init container that is run before the Spark main container starts. You can have your init container run initialization steps, such as downloading dependencies or generating input data. Then the Spark main container consumes the data.
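For instance, the initialization use case could be sketched as a pod template like the following; the busybox image and the echo command are placeholders, and the main container name follows the spark-kubernetes-driver convention shown later in this post:

```yaml
apiVersion: v1
kind: Pod
spec:
  initContainers:
  - name: fetch-dependencies      # runs to completion before the Spark main container starts
    image: busybox
    command: ["sh", "-c", "echo downloading input data..."]
  containers:
  - name: spark-kubernetes-driver # Spark main container consumes the prepared data
```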

Prerequisites

To follow along with the walkthrough, ensure that you have the following resources created:

Solution overview

We look at a common use case in organizations where multiple teams want to submit jobs and need resource isolation and cost reduction. In this post, we simulate two teams submitting jobs to the Amazon EMR on EKS cluster and show how to isolate resources between them when running jobs. We also reduce costs by running the Spark drivers on EC2 On-Demand Instances and the Spark executors on EC2 Spot Instances. The following diagram illustrates this architecture.

To implement the solution, we complete the following high-level steps:

  1. Create an Amazon EKS cluster.
  2. Create an Amazon EMR virtual cluster.
  3. Set up IAM roles.
  4. Create pod templates.
  5. Submit Spark jobs.

Create an Amazon EKS cluster

To create your Amazon EKS cluster, complete the following steps:

  1. Create a new file (create-cluster.yaml) with the following contents:
apiVersion: eksctl.io/v1alpha5
kind: ClusterConfig

metadata:
  name: blog-eks-cluster
  region: us-east-1

managedNodeGroups:
  # On-Demand nodegroup for job submitter/controller
 - name: controller
   instanceTypes: ["m5.xlarge", "m5a.xlarge","m5d.xlarge"]
   desiredCapacity: 1

  # On-Demand nodegroup for team-1 Spark driver
 - name: team-1-driver
   labels: { team: team-1-spark-driver }
   taints: [{ key: team-1, value: general-purpose, effect: NoSchedule }]
   instanceTypes: ["m5.xlarge", "m5a.xlarge","m5d.xlarge"]
   desiredCapacity: 3

  # Spot nodegroup for team-1 Spark executors
 - name: team-1-executor
   labels: { team: team-1-spark-executor }
   taints: [{ key: team-1, value: general-purpose, effect: NoSchedule }]
   instanceTypes: ["m5.2xlarge", "m5a.2xlarge", "m5d.2xlarge"]
   desiredCapacity: 5
   spot: true

  # On-Demand nodegroup for team-2 Spark driver
 - name: team-2-driver
   labels: { team: team-2-spark-driver }
   taints: [{ key: team-2, value: compute-intensive , effect: NoSchedule }]
   instanceTypes: ["c5.xlarge", "c5a.xlarge", "c5d.xlarge"]
   desiredCapacity: 3

  # Spot nodegroup for team-2 Spark executors
 - name: team-2-executor
   labels: { team: team-2-spark-executor }
   taints: [{ key: team-2, value: compute-intensive , effect: NoSchedule }]
   instanceTypes: ["c5.2xlarge","c5a.2xlarge","c5d.2xlarge"]
   spot: true
   desiredCapacity: 5
  2. Install the AWS CLI.

You can use version 1.18.157 or later, or version 2.0.56 or later. The following command is for Linux OS:

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

For other operating systems, see Installing, updating, and uninstalling the AWS CLI version.

  3. Install eksctl (you must have eksctl version 0.34.0 or later):
curl --silent --location "https://github.com/weaveworks/eksctl/releases/latest/download/eksctl_$(uname -s)_amd64.tar.gz" | tar xz -C /tmp
sudo mv /tmp/eksctl /usr/local/bin
eksctl version
  4. Install kubectl:
curl -o kubectl https://amazon-eks.s3.us-west-2.amazonaws.com/1.18.8/2020-09-18/bin/linux/amd64/kubectl
chmod +x ./kubectl
sudo mv ./kubectl /usr/local/bin
  5. Create the Amazon EKS cluster using the create-cluster.yaml config file created in earlier steps:
eksctl create cluster -f create-cluster.yaml

This step launches an Amazon EKS cluster with five managed node groups: one controller node group, and two node groups each for team-1 and team-2, with Spark drivers on EC2 On-Demand capacity and Spark executors on EC2 Spot capacity.

After the Amazon EKS cluster is created, run the following command to check the node groups:

eksctl get nodegroups --cluster blog-eks-cluster

You should see a response similar to the following screenshot.
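In case the screenshot doesn’t render here, the response lists the five node groups defined in create-cluster.yaml; abbreviated, it looks like this (column values illustrative):

```
CLUSTER           NODEGROUP        STATUS  DESIRED CAPACITY  INSTANCE TYPE
blog-eks-cluster  controller       ...     1                 m5.xlarge
blog-eks-cluster  team-1-driver    ...     3                 m5.xlarge
blog-eks-cluster  team-1-executor  ...     5                 m5.2xlarge
blog-eks-cluster  team-2-driver    ...     3                 c5.xlarge
blog-eks-cluster  team-2-executor  ...     5                 c5.2xlarge
```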

Create an Amazon EMR virtual cluster

First, we grant Amazon EMR access to the default namespace, and then launch the EMR virtual cluster in that namespace:

eksctl create iamidentitymapping \
    --cluster blog-eks-cluster \
    --namespace default \
    --service-name "emr-containers"
aws emr-containers create-virtual-cluster \
--name blog-emr-on-eks-cluster \
--container-provider '{"id": "blog-eks-cluster","type": "EKS","info": {"eksInfo": {"namespace": "default"}} }'

The command creates an EMR virtual cluster in the Amazon EKS default namespace and outputs the virtual cluster ID:

{
    "id": "me9zfn2lbt241wxhxg81gjlxb",
    "name": "blog-emr-on-eks-cluster",
    "arn": "arn:aws:emr-containers:us-east-1:xxxx:/virtualclusters/me9zfn2lbt241wxhxg81gjlxb"
}

Note the ID of the EMR virtual cluster to use to run the jobs.

Set up an IAM role

In this step, we create an Amazon EMR Spark job execution role with the following IAM policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:ListBucket"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "logs:PutLogEvents",
                "logs:CreateLogStream",
                "logs:DescribeLogGroups",
                "logs:DescribeLogStreams",
               "logs:CreateLogGroup"
            ],
            "Resource": [
                "arn:aws:logs:*:*:*"
            ]
        }
    ]
} 

Navigate to the IAM console to create the role. Let’s call the role EMR_EKS_Job_Execution_Role. For more information, see Creating IAM roles and Creating IAM Policies.
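If you prefer the CLI to the console, the role can be created along these lines. This is a sketch, not the documented steps: it assumes the policy above is saved as job-execution-policy.json and that trust-policy.json contains a placeholder trust document (the update-role-trust-policy command that follows rewrites the trust policy anyway):

```shell
#!/bin/bash
# Sketch: create the job execution role and attach the inline policy.
create_job_execution_role() {
  aws iam create-role \
    --role-name EMR_EKS_Job_Execution_Role \
    --assume-role-policy-document file://trust-policy.json

  aws iam put-role-policy \
    --role-name EMR_EKS_Job_Execution_Role \
    --policy-name EMR_EKS_Job_Execution_Policy \
    --policy-document file://job-execution-policy.json
}
```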

Set up the trust policy for the role with the following command:

aws emr-containers update-role-trust-policy \
       --cluster-name blog-eks-cluster \
       --namespace default \
       --role-name EMR_EKS_Job_Execution_Role

Enable IAM roles for service accounts (IRSA) on the Amazon EKS cluster:

eksctl utils associate-iam-oidc-provider --cluster blog-eks-cluster --approve
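To confirm that IRSA is wired up, you can check that the cluster reports an OIDC issuer. A small sketch, assuming the AWS CLI is configured and the cluster name matches the one created earlier:

```shell
#!/bin/bash
# Sketch: print the cluster's OIDC issuer URL, which IRSA relies on.
get_oidc_issuer() {
  aws eks describe-cluster \
    --name blog-eks-cluster \
    --query "cluster.identity.oidc.issuer" \
    --output text
}
```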

Create pod templates with node selectors and taints

In this step, we create pod templates for the Team-1 Spark driver pods and Spark executor pods, and templates for the Team-2 Spark driver pods and Spark executor pods.

Run the following command to view the nodes corresponding to team-1 with the label team=team-1-spark-driver:

$ kubectl get nodes --selector team=team-1-spark-driver
NAME                              STATUS   ROLES    AGE    VERSION
ip-192-168-123-197.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-25-114.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-91-201.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464

Similarly, you can view the nodes corresponding to team-1 for the label team=team-1-spark-executor. You can repeat the same commands to view the nodes corresponding to team-2 by changing the team labels:

$ kubectl get nodes --selector team=team-1-spark-executor
NAME                             STATUS   ROLES    AGE    VERSION
ip-192-168-100-76.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-5-196.ec2.internal    Ready    <none>   67m    v1.20.4-eks-6b7464
ip-192-168-52-131.ec2.internal   Ready    <none>   78m    v1.20.4-eks-6b7464
ip-192-168-58-137.ec2.internal   Ready    <none>   107m   v1.20.4-eks-6b7464
ip-192-168-70-68.ec2.internal    Ready    <none>   107m   v1.20.4-eks-6b7464

You can constrain a pod so that it can only run on a particular set of nodes. There are several ways to do this, and the recommended approaches all use label selectors to facilitate the selection. In some circumstances, you may want to control which node the pod deploys to; for example, to ensure that a pod ends up on a machine with an SSD attached to it, or to co-locate pods from two different services that communicate frequently in the same Availability Zone.

nodeSelector is the simplest recommended form of node selection constraint. nodeSelector is a field of PodSpec. It specifies a map of key-value pairs. For the pod to be eligible to run on a node, the node must have each of the indicated key-value pairs as labels.

Taints and tolerations work together to ensure that pods aren’t scheduled onto inappropriate nodes. One or more taints are applied to a node; this marks that the node shouldn’t accept any pods that don’t tolerate those taints. Amazon EKS supports configuring Kubernetes taints through managed node groups. Taints and tolerations are a flexible way to steer pods away from nodes or evict pods that shouldn’t be running. A common use case is dedicated nodes: if you want to dedicate a set of nodes, such as GPU instances, for exclusive use by a particular group of users, you can add a taint to those nodes, and then add a corresponding toleration to their pods.

nodeSelector provides a simple way to attract pods to nodes with particular labels, whereas taints repel pods from specific nodes. You can apply taints to a team’s node group and use pod templates to apply a corresponding toleration to their workload, which ensures that only the designated team can schedule jobs onto their node group. The nodeSelector label directs the pods to the team’s designated node group, and the toleration enables them to schedule over the taint. During the Amazon EKS cluster creation, we provided taints for each of the managed node groups. We now create pod templates that specify both nodeSelector and tolerations to schedule work to each team’s node group.
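The matching rule is easy to model. The following self-contained sketch is not Kubernetes code and only models the Equal operator used in this post; it shows why a team-1 pod template tolerates the team-1 node group’s taint while a team-2 template doesn’t:

```python
def tolerates(taint, tolerations):
    """Return True if any toleration matches the taint.

    A simplified model of Kubernetes taint/toleration matching for
    illustration only; the real scheduler handles more operators and effects.
    """
    for t in tolerations:
        if (t.get("operator") == "Equal"
                and t.get("key") == taint["key"]
                and t.get("value") == taint["value"]
                and t.get("effect") == taint["effect"]):
            return True
    return False

# Taint applied to the team-1 node groups in create-cluster.yaml
team1_taint = {"key": "team-1", "value": "general-purpose", "effect": "NoSchedule"}

# Tolerations from the team-1 and team-2 pod templates
team1_tolerations = [{"key": "team-1", "operator": "Equal",
                      "value": "general-purpose", "effect": "NoSchedule"}]
team2_tolerations = [{"key": "team-2", "operator": "Equal",
                      "value": "compute-intensive", "effect": "NoSchedule"}]

print(tolerates(team1_taint, team1_tolerations))  # True: team-1 pods land on team-1 nodes
print(tolerates(team1_taint, team2_tolerations))  # False: team-2 pods are repelled
```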

  1. Create a new file team-1-driver-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-driver
  tolerations:
  - key: "team-1"
    operator: "Equal"
    value: "general-purpose"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Here, we specify nodeSelector as team: team-1-spark-driver. This makes sure that Spark driver pods are running on nodes created as part of node group team-1-spark-driver, which we created for Team-1. At the same time, we have a toleration for nodes tainted as team-1.

  2. Create a new file team-1-executor-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-executor
  tolerations:
  - key: "team-1"
    operator: "Equal"
    value: "general-purpose"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-executor

Here, we specify nodeSelector as team: team-1-spark-executor. This makes sure that Spark executor pods are running on nodes created as part of node group team-1-spark-executor, which we created for Team-1. At the same time, we have a toleration for nodes tainted as team-1.

  3. Create a new file team-2-driver-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-2-spark-driver
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Here, we specify nodeSelector as team: team-2-spark-driver. This makes sure that Spark driver pods are running on nodes created as part of node group team-2-spark-driver, which we created for Team-2. At the same time, we have a toleration for nodes tainted as team-2.

  4. Create a new file team-2-executor-pod-template.yaml with the following contents:
apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-2-spark-executor
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-executor

Here, we specify nodeSelector as team: team-2-spark-executor. This makes sure that Spark executor pods are running on nodes created as part of node group team-2-spark-executor, which we created for Team-2. At the same time, we have a toleration for nodes tainted as team-2.

Save the preceding pod template files to your S3 bucket or refer to them using the following links:

  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-driver-template.yaml
  • s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-executor-template.yaml
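Before uploading, it's easy to sanity-check that each template pairs the right nodeSelector with the right toleration. The helper below is a hypothetical local check (not part of the workshop), operating on a template expressed as a Python dict:

```python
def template_matches_team(template: dict, team: str) -> bool:
    """Check that a pod template targets the team's node group and
    tolerates the team's NoSchedule taint."""
    spec = template.get("spec", {})
    selector_ok = spec.get("nodeSelector", {}).get("team", "").startswith(team)
    toleration_ok = any(
        t.get("key") == team and t.get("effect") == "NoSchedule"
        for t in spec.get("tolerations", [])
    )
    return selector_ok and toleration_ok

# team-1-driver-pod-template.yaml from above, as a dict
team_1_driver = {
    "apiVersion": "v1",
    "kind": "Pod",
    "spec": {
        "nodeSelector": {"team": "team-1-spark-driver"},
        "tolerations": [{"key": "team-1", "operator": "Equal",
                         "value": "general-purpose", "effect": "NoSchedule"}],
    },
}

print(template_matches_team(team_1_driver, "team-1"))  # True
print(template_matches_team(team_1_driver, "team-2"))  # False
```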

Submit Spark jobs

In this step, we submit the Spark jobs and observe the output.

Substitute the values of the EMR virtual cluster ID, EMR_EKS_Job_Execution_Role ARN, and S3_Bucket:

export EMR_EKS_CLUSTER_ID=<<EMR virtual cluster id>>
export EMR_EKS_EXECUTION_ARN=<<EMR_EKS_Job_Execution_Role ARN>>
export S3_BUCKET=<<S3_Bucket>>

Submit the Spark job:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

After submitting the job, run the following command to check if the Spark driver and executor pods are created and running:

kubectl get pods

You should see output similar to the following:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uf8fut3g6o6-m4nfz 3/3 Running 0 25s
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 14s

Let’s check the pods deployed on team-1’s On-Demand Instances:

$ for n in $(kubectl get nodes -l team=team-1-spark-driver --no-headers | cut -d " " -f1); do echo "Pods on instance ${n}:";kubectl get pods -n default --no-headers --field-selector spec.nodeName=${n} ; echo ; done

Pods on instance ip-192-168-27-110.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-59-167.ec2.internal:
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 29s

Pods on instance ip-192-168-73-223.ec2.internal:
No resources found in default namespace.

Let’s check the pods deployed on team-1’s Spot Instances:

$ for n in $(kubectl get nodes -l team=team-1-spark-executor --no-headers | cut -d " " -f1); do echo "Pods on instance ${n}:";kubectl get pods -n default --no-headers --field-selector spec.nodeName=${n} ; echo ; done

Pods on instance ip-192-168-108-90.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-39-31.ec2.internal:
No resources found in default namespace.

Pods on instance ip-192-168-87-75.ec2.internal:
pythonpi-1623711779860-exec-3 0/2 Running 0 27s
pythonpi-1623711779937-exec-4 0/2 Running 0 26s

Pods on instance ip-192-168-88-145.ec2.internal:
pythonpi-1623711779748-exec-2 0/2 Running 0 27s

Pods on instance ip-192-168-92-149.ec2.internal:
pythonpi-1623711779097-exec-1 0/2 Running 0 28s
pythonpi-1623711780071-exec-5 0/2 Running 0 27s

When the executor pods are running, you should see output similar to the following:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uf8fut3g6o6-m4nfz 3/3 Running 0 56s
pythonpi-1623712009087-exec-1 0/2 Running 0 2s
pythonpi-1623712009603-exec-2 0/2 Running 0 2s
pythonpi-1623712009735-exec-3 0/2 Running 0 2s
pythonpi-1623712009833-exec-4 0/2 Running 0 2s
pythonpi-1623712009945-exec-5 0/2 Running 0 1s
spark-00000002uf8fut3g6o6-driver 2/2 Running 0 45s

To check the status of the jobs on Amazon EMR console, choose the cluster on the Virtual Clusters page. You can also check the Spark History Server by choosing View logs.

When the job is complete, go to Amazon CloudWatch Logs and check the output by choosing the log (/emr-containers/jobs/<<xxx-driver>>/stdout) on the Log groups page. You should see output similar to the following screenshot.

Now submit the Spark job as team-2 and specify the pod template files pointing to team-2’s driver and executor pod specifications and observe where the pods are created:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-driver-template.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-2-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

We can check the status of the job on the Amazon EMR console and also by checking the CloudWatch logs.

Now, let’s run a use case where Team-1 doesn’t specify the correct toleration in the Spark driver’s pod template. We use the following pod template. As per the toleration specification, Team-1 is trying to schedule a Spark driver pod on nodes with label team-1-spark-driver and also wants it to get scheduled over nodes tainted as team-2. Because team-1 doesn’t have any nodes with that specification, we should see an error.

apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    team: team-1-spark-driver
  tolerations:
  - key: "team-2"
    operator: "Equal"
    value: "compute-intensive"
    effect: "NoSchedule"
  containers:
  - name: spark-kubernetes-driver

Submit the Spark job using this new pod template:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_EXECUTION_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/pi.py",
        "sparkSubmitParameters": "--conf spark.executor.instances=6 --conf spark.executor.memory=2G --conf spark.executor.cores=2 --conf spark.driver.cores=1"
        }
    }' \
--configuration-overrides '{
    "applicationConfiguration": [
      {
        "classification": "spark-defaults", 
        "properties": {
          "spark.driver.memory":"2G",
          "spark.kubernetes.driver.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-driver-template-negative.yaml",
          "spark.kubernetes.executor.podTemplateFile":"s3://aws-data-analytics-workshops/emr-eks-workshop/scripts/team-1-executor-template.yaml"
         }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs", 
        "logStreamNamePrefix": "blog-emr-eks"
      }, 
      "s3MonitoringConfiguration": {
        "logUri": "'"$S3_BUCKET"'/logs/"
      }
    }   
}'

Run the following command to check the status of the Spark driver pod:

$ kubectl get pods
NAME READY STATUS RESTARTS AGE
00000002uott6onmu8p-7t64m 3/3 Running 0 36s
spark-00000002uott6onmu8p-driver 0/2 Pending 0 25s

Let’s describe the driver pod to check the details. You should notice a failed event similar to the following: Warning FailedScheduling 28s (x3 over 31s) default-scheduler 0/17 nodes are available: 8 node(s) had taint {team-1: general-purpose}, that the pod didn't tolerate, 9 node(s) didn't match Pod's node affinity.

$ kubectl describe pod <<driver-pod-name>>
Name:           spark-00000002uott6onmu8p-driver
Namespace:      default
......
QoS Class:       Burstable
Node-Selectors:  team=team-1-spark-driver
Tolerations:     node.kubernetes.io/not-ready:NoExecute op=Exists for 300s
                 node.kubernetes.io/unreachable:NoExecute op=Exists for 300s
                 team-2=compute-intensive:NoSchedule
Events:
  Type     Reason            Age                From               Message
  ----     ------            ----               ----               -------
  Warning  FailedScheduling  28s (x3 over 31s)  default-scheduler  0/17 nodes are available: 8 node(s) had taint {team-1: general-purpose}, that the pod didn't tolerate, 9 node(s) didn't match Pod's node affinity..

This shows that if the pod template doesn’t have the right toleration, the tainted nodes repel the pod and it is never scheduled on them.
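Why the pod stays Pending follows directly from the matching rule: a NoSchedule taint repels a pod unless some toleration matches it. A simplified sketch of that rule (ignoring the Exists operator and wildcard keys of the full Kubernetes behavior):

```python
def tolerates(taint: dict, tolerations: list) -> bool:
    """Simplified Kubernetes rule: a pod tolerates a NoSchedule taint
    only if some toleration matches its key, value, and effect."""
    return any(
        t.get("operator") == "Equal"
        and t.get("key") == taint["key"]
        and t.get("value") == taint["value"]
        and t.get("effect") == taint["effect"]
        for t in tolerations
    )

team_1_taint = {"key": "team-1", "value": "general-purpose", "effect": "NoSchedule"}

# Correct template: toleration for team-1's own taint
good = [{"key": "team-1", "operator": "Equal",
         "value": "general-purpose", "effect": "NoSchedule"}]
# Misconfigured template from the negative test: tolerates team-2's taint instead
bad = [{"key": "team-2", "operator": "Equal",
        "value": "compute-intensive", "effect": "NoSchedule"}]

print(tolerates(team_1_taint, good))  # True  -> pod can be scheduled
print(tolerates(team_1_taint, bad))   # False -> FailedScheduling, pod stays Pending
```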

Clean up

Don’t forget to clean up the resources you created to avoid any unnecessary charges.

  1. Delete all the virtual clusters that you created:
#List all the virtual cluster ids
aws emr-containers list-virtual-clusters
#Delete virtual cluster by passing virtual cluster id
aws emr-containers delete-virtual-cluster --id <virtual-cluster-id>
  2. Delete the Amazon EKS cluster:
eksctl delete cluster blog-eks-cluster
  3. Delete the EMR_EKS_Job_Execution_Role role and policies.

Summary

In this post, we saw how to create an Amazon EKS cluster, configure Amazon EKS managed node groups, create an EMR virtual cluster on Amazon EKS, and submit Spark jobs. With pod templates, we saw how to manage resource isolation between various teams when submitting jobs and also learned how to reduce cost by running Spark driver pods on EC2 On-Demand Instances and Spark executor pods on EC2 Spot Instances.

To get started with pod templates, try out the Amazon EMR on EKS workshop or see the following resources:


About the Author

Saurabh Bhutyani is a Senior Big Data Specialist Solutions Architect at Amazon Web Services. He is an early adopter of open-source big data technologies. At AWS, he works with customers to provide architectural guidance for running analytics solutions on Amazon EMR, Amazon Athena, AWS Glue, and AWS Lake Formation.

Run and debug Apache Spark applications on AWS with Amazon EMR on Amazon EKS

Post Syndicated from Dipankar Kushari original https://aws.amazon.com/blogs/big-data/run-and-debug-apache-spark-applications-on-aws-with-amazon-emr-on-amazon-eks/

Customers today want to focus more on their core business model and less on the underlying infrastructure and operational burden. As customers migrate to the AWS Cloud, they’re realizing the benefits of being able to innovate faster on their own applications by relying on AWS to handle big data platforms, operations, and automation.

Many of AWS’s customers have migrated their big data workloads from on premises to Amazon Elastic Compute Cloud (Amazon EC2) and Amazon EMR, and process large amounts of data to get insights from it in a secure and cost-effective manner.

If you’re using open-source Apache Spark on Amazon Elastic Kubernetes Service (Amazon EKS) clusters to run your big data workloads, you may want to use Amazon EMR to eliminate the heavy lifting of installing and managing your frameworks and integrations with other AWS services.

In this post, we discuss how to run and debug Apache Spark applications with Amazon EMR on Amazon EKS.

Benefits of using Amazon EMR on EKS

Amazon EMR on EKS is primarily beneficial for two key audiences:

  • Users that are self-managing open-source applications on Amazon EKS – You can get the benefits of Amazon EMR by having the ability to use the latest fully managed versions of open-source big data analytics frameworks and optimized EMR runtime for Spark with two times faster performance than open-source Apache Spark. You can take advantage of the integrated developer experience for data scientists and developers with Amazon EMR Studio, and a fully managed persistent application user interface (Spark History Server) for simplified logging, monitoring, and debugging. Amazon EMR also provides native integrations with AWS services including Amazon CloudWatch, Amazon Simple Storage Service (Amazon S3), the AWS Glue Data Catalog, AWS Step Functions, and Amazon Managed Workflows for Apache Airflow (Amazon MWAA).
  • Existing Amazon EMR users – You can use Amazon EMR on EKS to improve resource utilization by simplifying infrastructure management and consolidating your Amazon EMR applications to run alongside other container-based applications on shared Amazon EKS clusters. You can also centrally manage infrastructure using familiar Kubernetes tools. Additionally, this provides the advantage of running different versions and configurations of the same runtime on a single Amazon EKS cluster with separation of compute, which is no longer tied to a specific analytics framework, version, or configuration.

With Amazon EMR on EKS, you can now let your teams focus on developing big data applications on Spark as rapidly as possible in a highly reliable, available, secure, and cost-efficient manner.

The following diagram shows a high-level representation of Amazon EMR on EKS. The architecture loosely couples applications to the infrastructure that they run on. When you submit a job to Amazon EMR, your job definition contains all of its application-specific parameters. Amazon EMR uses these parameters to instruct Amazon EKS about which pods and containers to deploy. Amazon EKS then brings online the computing resources from Amazon EC2 and AWS Fargate required to run the job. With this loose coupling of services, you can run multiple, securely isolated jobs simultaneously.

Solution overview

In this post, we guide you through a step-by-step process of deploying an Amazon EMR on EKS cluster and then walk you through various options and techniques for troubleshooting your Apache Spark jobs.

We then show you how to run a Spark application on that cluster using NOAA Global Historical Climatology Network Daily (GHCN-D). This job reads weather data, joins it with weather station data, and produces an output dataset in Apache Parquet format that contains the details of precipitation readings for the US for 2011.

We also look at various options to monitor the Spark jobs and view the logs.

The following diagram illustrates our high-level architecture.

The solution contains the following deployment steps:

  1. Install and configure the prerequisites, including the AWS Command Line Interface (AWS CLI), kubectl, and eksctl.
  2. Provision the Amazon EKS cluster using an AWS CloudFormation stack.
  3. Configure the AWS CLI tools and create credentials and permissions.
  4. Provision compute and set up an EMR virtual cluster on Amazon EKS.
  5. Create the Amazon EMR Spark application.
  6. Run the Spark application.

Prerequisites

Before you get started, complete the following prerequisites:

  1. Install the AWS CLI v2.
  2. Install kubectl.
  3. Install eksctl.

Provision the Amazon EKS cluster using AWS CloudFormation

This post uses two CloudFormation stacks. You can download the CloudFormation templates we reference in this post from a public S3 bucket, or you can launch them directly from this post. AWS Identity and Access Management (IAM) roles are also provisioned as part of this step. For more information about the IAM permissions required to provision and manage an Amazon EKS cluster, see Using service-linked roles for Amazon EKS.

The CloudFormation template eks_cluster.yaml creates the following resources in your preferred AWS account and Region:

  • Network resources (one VPC, three public and three private subnets, and two security groups)
  • One S3 bucket required to store data and artifacts to run the Spark job
  • An Amazon EKS cluster with managed node groups with m5.2xlarge EC2 instances (configurable in the provided CloudFormation template)

For instructions on creating a stack, see Creating a stack on the AWS CloudFormation console.

  1. Choose Launch Stack to launch the stack via the AWS CloudFormation console.

The default parameter values are already populated for your convenience. Proceed with CloudFormation stack creation after verifying these values.

  2. For Stack name, enter emr-on-eks-stack.
  3. For ClusterName, enter eks-cluster.
  4. For EKSVersion, enter 1.19.
  5. For JobexecutionRoleName, enter eksjobexecutionrole.

CloudFormation stack creation should take 10–15 minutes. Make sure the stack is complete by verifying the status as CREATE_COMPLETE.

Additionally, you can verify that the Amazon EKS cluster was created using the following command, which displays the details of the cluster and shows the status as ACTIVE:

aws eks describe-cluster --name eks-cluster

Note the S3 bucket name (oS3BucketName) and the job execution role (rJobExecutionServiceRole) from the stack.

Later, we upload our artifacts (PySpark script) and data into this S3 bucket.

Configure the AWS CLI tools and create credentials and permissions

To configure the AWS CLI tools, credentials, and permissions, complete the following steps:

  1. Configure kubectl to use the Amazon EKS cluster (the kubectl and eksctl commands need to run with the same AWS profile used when deploying the CloudFormation templates):
    aws eks --region <<Your AWS Region>> update-kubeconfig --name eks-cluster

  2. Create a dedicated namespace for running Apache Spark jobs using Amazon EMR on EKS:
    kubectl create namespace emroneks

  3. To enable Amazon EMR on EKS to access the namespace we created, we have to create a Kubernetes role and Kubernetes user, and map the Kubernetes user to the Amazon EMR on EKS service-linked role:
    eksctl create iamidentitymapping --cluster eks-cluster --namespace emroneks --service-name "emr-containers"

To use IAM roles for service accounts, an IAM OIDC provider must exist for your cluster.

  4. Create an IAM OIDC identity provider for the Amazon EKS cluster:
eksctl utils associate-iam-oidc-provider --cluster eks-cluster --approve

When you use IAM roles for service accounts to run jobs on a Kubernetes namespace, an administrator must create a trust relationship between the job execution role and the identity of the Amazon EMR managed service account.

  5. The following command updates the trust relationship of the job execution role (refer to the preceding screenshot of the CloudFormation stack):
aws emr-containers update-role-trust-policy \
  --cluster-name eks-cluster \
  --namespace emroneks \
  --role-name eksjobexecutionrole

Provision compute and set up an EMR virtual cluster on Amazon EKS

For the minimum IAM permissions required to manage and submit jobs on the Amazon EMR on EKS cluster, see Grant users access to Amazon EMR on EKS. The roles are provisioned as part of this step.

Use the second CloudFormation template (emr_virtual_cluster.yaml) to create the following resources in the same preferred AWS account and Region:

  • Amazon EMR virtual cluster
  • Amazon EKS managed node groups
  1. Choose Launch Stack to launch the stack via the AWS CloudFormation console.

The default values are already populated for your convenience. Proceed with stack creation after verifying these values.

  2. For Stack name, enter EMRvirtualcluster.
  3. For ClusterStackName, enter emr-on-eks-stack.
  4. For Namespace, enter emroneks.
  5. For NodeAutoscalinggroupDesiredCapacity, enter 1.
  6. For NodeAutoScalingGroupMaxSize, enter 1.
  7. For NodeInstanceType, enter m5.2xlarge.

Stack creation should take 10–15 minutes. Make sure the stack is complete by verifying the status as CREATE_COMPLETE.

Note the oEMRvirtualclusterID value as the output of the stack. We use this virtualclusterID to submit our Spark application.

Additionally, you can verify that the node groups are set up correctly using the following command:

aws eks list-nodegroups --cluster-name eks-cluster

You receive the following result:

{
    "nodegroups": [
        "emr-virtual-cluster-NodeGroup"
    ]
}

You can verify the details of the nodes with the following command (use the node group name from the preceding command):

aws eks describe-nodegroup --cluster-name eks-cluster --nodegroup-name emr-virtual-cluster-NodeGroup

This lists the details of all the nodes provisioned, the instance type, and subnet associations, among other details.

You’re now ready to create and run a Spark application on the cluster.

Create an Amazon EMR Spark application

To create the PySpark job, perform the following steps:

  1. Copy the NOAA Open data registry 2011 Weather Station data and the Weather Station Lookup data and save the files under the s3://<<Your S3 Bucket>>/noaa/csv.gz/ prefix.
    1. To copy the 2011 Weather Station data, use the following AWS CLI command:
      aws s3 cp s3://noaa-ghcn-pds/csv.gz/2011.csv.gz s3://<<Your S3 Bucket>>/noaa/csv.gz/2011.csv.gz

    2. To copy the Weather Station Lookup data, use the following AWS CLI command:
      aws s3 cp s3://noaa-ghcn-pds/ghcnd-stations.txt s3://<<Your S3 Bucket>>/noaa/ghcnd-stations.txt

You can find the value for <<Your S3 Bucket>> in the oS3BucketName key on the Outputs tab for the emr-on-eks-stack CloudFormation stack.

  2. Download the PySpark script and upload it under s3://<<Your S3 Bucket>>/scripts/.

Run the Spark application

We run the Spark job using the AWS CLI. The parameters for the job (virtual cluster ID, script location, parameters) are mentioned in the JSON file.

  1. Save the following JSON template as jobparameters.json in a local folder (for example, /path/to/jobparameters.json):
{
  "name": "emr-on-eks-spark-job",
  "virtualClusterId": "<virtualclusterid>",
  "executionRoleArn": "arn:aws:iam::<<Your AWS Account Number>>:role/eksjobexecutionrole",
  "releaseLabel": "emr-6.2.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<<Your S3 Bucket>>/scripts/etl.py",
          "entryPointArguments": ["s3://<<Your S3 Bucket>>/"],
       "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4"
    }
  },
  "configurationOverrides": {
    "applicationConfiguration": [
      {
        "classification": "spark-defaults",
        "properties": {
"spark.scheduler.minRegisteredResourcesRatio": "0.8",
          "spark.scheduler.maxRegisteredResourcesWaitingTime": "300s" }
      }
    ],
    "monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emreksblog"
      },
      "s3MonitoringConfiguration": {
        "logUri": "s3://<<Your S3 Bucket>>/joblogs"
      }
    }
  }
}

The configurationOverrides section is optional and can be used to backport any Spark configurations that are set for jobs running in Amazon EMR on EC2. The Spark job runs successfully without any additional configuration changes.

  2. Modify the following keys in your JSON file (/path/to/jobparameters.json):
    1. virtualClusterId – The ID of the EMR cluster on Amazon EKS. You can get this by looking at the oEMRvirtualclusterID output from the CloudFormation template or by running the following code:
      aws emr-containers list-virtual-clusters \
      --state RUNNING \
      --query 'virtualClusters[?containerProvider.info.eksInfo.namespace==`emroneks`]'

    2. executionRoleArn – The ARN of the role created in the CloudFormation template. Replace <<Your AWS Account Number>> with the AWS account number you deploy this stack in.
    3. entryPoint – The value of the path to the ETL script S3 bucket provisioned in the CloudFormation stack (for example, s3://<<Your S3 Bucket>>/scripts/etl.py).
    4. entryPointArguments – The Spark job accepts one argument—the S3 bucket name where the data files are stored (s3://<<Your S3 Bucket>>/).
    5. logUri – The path where the controller logs, Spark driver logs, and executor logs are written. Enter it as s3://<<Your S3 Bucket>>/joblogs.
    6. cloudWatchMonitoringConfiguration – The CloudWatch log group details where logs are published. Enter the value for logGroupName as /emr-containers/jobs and logStreamNamePrefix as emreksblog.
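The --query expression in the virtualClusterId lookup filters virtual clusters by their EKS namespace. The same selection logic, reproduced locally in Python over a made-up sample response (IDs are illustrative):

```python
# Sample shape of `aws emr-containers list-virtual-clusters` output
# (fake IDs); we keep only clusters attached to the emroneks namespace.
clusters = [
    {"id": "abc123", "state": "RUNNING",
     "containerProvider": {"info": {"eksInfo": {"namespace": "other"}}}},
    {"id": "mdeljhfj0ejq5iprtzchljuh1", "state": "RUNNING",
     "containerProvider": {"info": {"eksInfo": {"namespace": "emroneks"}}}},
]

match = [c for c in clusters
         if c["containerProvider"]["info"]["eksInfo"]["namespace"] == "emroneks"]
print(match[0]["id"])  # mdeljhfj0ejq5iprtzchljuh1
```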

You can change the sparkSubmitParameters parameter in the preceding JSON as per your needs, but your node groups must have the right capacity to accommodate the combination of Spark executors, memory, and cores that you define in sparkSubmitParameters. The preceding configuration works for the cluster we provisioned through the CloudFormation template without any changes.

  3. Submit the job with the following AWS CLI command:
    aws emr-containers start-job-run --cli-input-json file:///path/to/jobparameters.json

This returns a response with the job ID, which we can use to track the status of the job:

{
    "id": "00000002ucgkgj546u1",
    "name": "emr-on-eks-spark-job",
    "arn": "arn:aws:emr-containers:region:accountID:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkgj546u1",
    "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1"
}

You can get the status of a job by running the following command:

aws emr-containers describe-job-run --id <your job run id> --virtual-cluster-id <your virtual cluster id>

You can observe the status change from SUBMITTED to RUNNING to COMPLETED or FAILED.

{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>:<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "SUBMITTED",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",


{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>: :<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "RUNNING",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",

{
    "jobRun": {
        "id": "00000002ucgkstiadcs",
        "name": "emr-on-eks-spark-job",
        "virtualClusterId": "mdeljhfj0ejq5iprtzchljuh1",
        "arn": "arn:aws:emr-containers:<<region>>: :<<Your AWS Account Number>>:/virtualclusters/mdeljhfj0ejq5iprtzchljuh1/jobruns/00000002ucgkstiadcs",
        "state": "COMPLETED",
        "clientToken": "52203410-3e55-4294-a548-dc9212d10b37",

When the job state changes to COMPLETED, you can see a prefix in your S3 bucket called noaaparquet with a dataset created within the prefix.

If the job status reaches the FAILED state, you can troubleshoot by going through the details found in the CloudWatch logs or the logs written into Amazon S3. For details on how to access and use those logs, refer to the following debugging section.

Occasionally, you may notice that the job is stuck in the SUBMITTED state for a long time. This could be because the Amazon EKS cluster is running other jobs and doesn’t have capacity available. When an existing job completes, your job transitions to the RUNNING state.

Another scenario could be that you set the driver and executor memory requirements in your Spark configuration (jobparameters.json) to more than what is available. Consider adjusting the spark.executor.memory and spark.driver.memory values based on the instance type in your node group. See the following code:

{
  "name": "emr-on-eks-spark-job",
  "virtualClusterId": "<virtualclusterid>",
  "executionRoleArn": "arn:aws:iam::<<Your AWS Account Number>>:role/eksjobexecutionrole",
  "releaseLabel": "emr-6.2.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<<Your S3 Bucket>>/scripts/etl.py",
          "entryPointArguments": ["s3://<<Your S3 Bucket>>/"],
       "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.driver.memory=2G --conf spark.executor.cores=4"

If your job is stuck or failing due to insufficient capacity, consider increasing the number of nodes in your node group or setting up the Amazon EKS Cluster Autoscaler. Refer to the debugging section for additional details from the Kubernetes dashboard.
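As a rough feasibility check before resizing, you can compare the requested executor topology against the node group's capacity. The numbers below assume m5.2xlarge nodes (8 vCPU, 32 GiB) and a ~10% executor memory overhead; real allocatable capacity is lower once the driver pod and system daemons are accounted for, so treat this as an optimistic upper bound:

```python
def fits(num_executors, exec_mem_gib, exec_cores,
         nodes, node_cores=8, node_mem_gib=32, overhead=1.1):
    """Back-of-the-envelope check: do the executors' aggregate CPU and
    memory requests (with overhead) fit the node group's raw capacity?"""
    need_cores = num_executors * exec_cores
    need_mem = num_executors * exec_mem_gib * overhead
    return need_cores <= nodes * node_cores and need_mem <= nodes * node_mem_gib

# The job's settings: 2 executors x 2 GiB x 4 cores on one m5.2xlarge
print(fits(num_executors=2, exec_mem_gib=2, exec_cores=4, nodes=1))   # True
# An oversized request that would leave executors pending
print(fits(num_executors=20, exec_mem_gib=8, exec_cores=4, nodes=1))  # False
```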

For additional information on Amazon EMR on EKS fundamentals, refer to the appendix at the end of this post.

Debug your Spark application

Amazon EMR on EKS provides multiple options to debug and view the logs of the Spark application.

For issues specific to Spark applications, use Spark History Server, CloudWatch logs, or logs on Amazon S3, which we discuss in this section.

For troubleshooting issues, such as your jobs aren’t starting (job status in SUBMITTED state) or issues with Spark drivers, start with Kubernetes dashboard or kubectl CLI commands, discussed in detail in this section.

Spark History Server

Spark History Server provides an elaborate web UI that allows us to inspect various components of our applications. It offers details on memory usage, jobs, stages, and tasks, as well as event timelines, logs, and various metrics and statistics both at the Spark driver level and for individual executors. It shows collected metrics and the state of the program, revealing clues about possible performance bottlenecks that you can utilize for tuning and optimizing the application. You can look at the Spark History Server (in the Spark UI) from the Amazon EMR console to see the driver and executor logs, as long as you have Amazon S3 logging enabled (which we enabled as part of the job submission JSON payload). The Spark UI is available even after the job is complete and the cluster is stopped. For more information on troubleshooting, see How do I troubleshoot a failed Spark step in Amazon EMR?

The following screenshots show the Spark UI of the job submitted on the cluster.

Choose a specific app ID to see the details of the Spark SQL and stages that ran. This helps you see the explain plan of the query and rows processed by each stage to narrow down any bottlenecks in your process.

If you don’t see the Spark UI link enabled or you see an error message “Unable to launch application UI,” verify the parameter s3MonitoringConfiguration in the jobparameters.json to ensure that a valid S3 path is provided. Additionally, ensure that the job execution role has appropriate permissions to access the S3 bucket. This was defined in the CloudFormation template that you deployed earlier. See the following code:

"monitoringConfiguration": {
      "cloudWatchMonitoringConfiguration": {
        "logGroupName": "/emr-containers/jobs",
        "logStreamNamePrefix": "emreksblog"
      },
      "s3MonitoringConfiguration": {
        "logUri": "s3://<<Your S3 Bucket>>/joblogs"
      }
    }

To increase the logging level of the Spark application to DEBUG, update the spark-log4j configuration. For instructions, see Change Log level for Spark application on EMR on EKS.

CloudWatch logs

In the preceding jobparameters.json, the log group name was /emr-containers/jobs and the prefix was emreksblog. You can access logs via the CloudWatch console for this prefix.

The path for various types of logs available in CloudWatch are as follows:

  • Controller logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/pod-name/(stderr/stdout)
  • Driver logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/spark-application-id/spark-job-id-driver/(stderr/stdout)
  • Executor logs – logGroup/logStreamPrefix/virtual-cluster-id/jobs/job-id/containers/spark-application-id/executor-pod-name/(stderr/stdout)

In the jobparameters.json configuration, logGroup is set as /emr-containers/jobs and logStreamPrefix is set as emreksblog.
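As a sketch, the full log stream name inside the /emr-containers/jobs log group can be assembled from these parts. The virtual cluster ID, job ID, and Spark application ID below are hypothetical placeholders; substitute the values from your own job run:

```shell
#!/bin/sh
# Assemble the CloudWatch log stream name for the Spark driver's stderr,
# following the path layout described above. All IDs are placeholders.
LOG_STREAM_PREFIX="emreksblog"
VIRTUAL_CLUSTER_ID="abc123virtualcluster"
JOB_ID="0000000job1"
SPARK_APP_ID="spark-000000000000"

DRIVER_STDERR_STREAM="${LOG_STREAM_PREFIX}/${VIRTUAL_CLUSTER_ID}/jobs/${JOB_ID}/containers/${SPARK_APP_ID}/spark-${JOB_ID}-driver/stderr"
echo "${DRIVER_STDERR_STREAM}"
```

You can paste the resulting stream name into the CloudWatch console's log stream filter to jump straight to the driver output.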

Here you can retrieve the Spark driver and executor logs to view additional details and stack trace of any error messages when your Spark job has failed.

You can filter the CloudWatch log stream by driver/stdout to see the output and driver/stderr to see details of errors from your Spark job.

The following are some common scenarios to verify in case the logs aren’t available in CloudWatch:

  • Ensure that the log group parameter is defined in jobparameters.json under monitoringConfiguration (refer to the JSON file for the details of parameters):
        "monitoringConfiguration": {
          "cloudWatchMonitoringConfiguration": {
            "logGroupName": "/emr-containers/jobs",
            "logStreamNamePrefix": "emreksblog"
          },

  • Ensure that the service role associated with the Amazon EMR on EKS cluster has access to write to the CloudWatch log group. The CloudFormation template you deployed has the policy associated with the IAM role to grant appropriate permissions to allow access to write to the log groups. For additional IAM policy examples, see Using identity-based policies (IAM policies) for CloudWatch Logs.

Amazon S3 logs

In the configuration, the log path is listed as s3://<Your S3 bucket>/joblogs, and logs are written under the corresponding job ID.

You can go to the S3 bucket you specified to check the logs. Your log data is sent to the following Amazon S3 locations, depending on the type of log:

  • Controller logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/pod-name/(stderr.gz/stdout.gz)
  • Driver logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/spark-application-id/spark-job-id-driver/(stderr.gz/stdout.gz)
  • Executor logs – s3://<Your S3 bucket>/joblogs/virtual-cluster-id/jobs/job-id/containers/spark-application-id/executor-pod-name/(stderr.gz/stdout.gz)

Here you can retrieve the Spark driver and executor logs to view additional details and stack trace of any error messages when your Spark job has failed.
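As a sketch, after copying a job run's logs down from Amazon S3 (for example with aws s3 cp ... --recursive against the joblogs prefix above), you can scan the compressed driver logs for errors. The snippet below fabricates a sample stderr.gz locally so it is self-contained; with real logs, point the find command at your downloaded directory:

```shell
#!/bin/sh
# Sketch: the log files delivered to Amazon S3 are gzip-compressed.
# Fabricate a sample driver stderr.gz so the scan below is self-contained;
# in practice this directory would be populated by an "aws s3 cp --recursive".
mkdir -p joblogs/sample-driver
printf 'INFO starting job\nERROR: stage 3 failed\n' | gzip > joblogs/sample-driver/stderr.gz

# Decompress every stderr.gz and count error lines.
ERROR_LINES=$(find joblogs -name 'stderr.gz' -exec gunzip -c {} \; | grep -c '^ERROR')
echo "found ${ERROR_LINES} error line(s)"
```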

Kubernetes dashboard

You can view and monitor the online logs of a running Spark job in a Kubernetes dashboard. The dashboard provides information on the state of Kubernetes resources in your cluster and any errors that may have occurred while the job is running. The logs are only accessible through the Kubernetes dashboard while the cluster is running the job. The dashboard is a useful way to quickly identify any issue with the job while it’s running and the logs are getting written.

For details on how to deploy, set up, and view the dashboard, see Tutorial: Deploy the Kubernetes Dashboard (web UI). After you deploy the Kubernetes dashboard and launch it, complete the following steps to see the details of your job run:

  1. Choose the right namespace that was registered with the EMR virtual cluster for the Amazon EKS cluster.
  2. Choose Pods in the navigation pane to see all the running pods.
  3. Choose the additional options icon (three vertical dots) to open logs for each pod.

The following screenshot shows the Spark driver that was spawned when the Spark job was submitted to the EMR virtual cluster.

  4. Choose the spark-kubernetes-executor container log to see the running online logs of your Spark job.

The following screenshots show the running log of the Spark application while it's running on the EMR virtual cluster.

  5. Choose Pods to see the CPU and memory consumption of the individual pod running the application.

The following screenshots show the CPU and memory usage of the Spark application for the duration of the job. This helps you determine whether you provisioned adequate capacity for your jobs.

In case of insufficient capacity with memory or CPU, you see the following error. You can choose the pod to see additional details.

Kubernetes CLI

You can view Spark driver and Spark executor logs using the Kubernetes CLI (kubectl). Logs are accessible through the Kubernetes CLI only while the cluster is running the job.

  1. Get the name of the Spark driver and Spark executor pods in the emroneks namespace:
kubectl get pods -n emroneks

You see multiple pods for the Spark driver and executors that are currently running.

  2. Use the pod name for the driver to see the driver logs:
    kubectl logs <Spark driver pod name> -n emroneks -c spark-kubernetes-driver

  3. Use the pod name for the executors to see the executor logs:
    kubectl logs <Spark executor pod name> -n emroneks -c spark-kubernetes-executor
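These lookups can be scripted. The sketch below extracts the driver pod name from `kubectl get pods`-shaped output; because pod names vary per job run, the sample output here is simulated with hypothetical names:

```shell
#!/bin/sh
# Sample text in the shape "kubectl get pods -n emroneks" returns;
# the pod names are hypothetical placeholders.
PODS='NAME                                READY   STATUS    RESTARTS   AGE
spark-00000002uvtnsmvuo0c-driver    1/1     Running   0          2m
pythonpi-a1b2c3-exec-1              1/1     Running   0          1m'

# Pick out the driver pod name; you would then pass it to:
#   kubectl logs "$DRIVER_POD" -n emroneks -c spark-kubernetes-driver
DRIVER_POD=$(printf '%s\n' "$PODS" | awk '/-driver/ {print $1}')
echo "$DRIVER_POD"
```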

For more issues and resolutions when running jobs on Amazon EMR on EKS, see Common errors when running jobs.

Clean up

When you’re done using this solution, you should delete the following CloudFormation stacks, via the CloudFormation console, to avoid incurring any further charges:

  • EMRvirtualcluster
  • emr-on-eks-stack

Conclusion

This post describes how you can run your existing Apache Spark workloads on Amazon EMR on EKS. The use case demonstrates setting up the infrastructure, and running and monitoring your Spark job. We also showed you various options and techniques to debug and troubleshoot your jobs.

Amazon EMR also provides the capability to perform data analysis and data engineering tasks in a web-based integrated development environment (IDE), using fully managed Jupyter notebooks. Refer to this post to set up EMR Studio with EMR on EKS.


Appendix: Explaining the solution

In this solution, we first built an Amazon EKS cluster using a CloudFormation template and registered it with Amazon EMR. Then we submitted a Spark job using the AWS CLI on the EMR virtual cluster on Amazon EKS. Let’s look at some of the important concepts related to running a Spark job on Amazon EMR on EKS.

Kubernetes namespaces

Amazon EKS uses Kubernetes namespaces to divide cluster resources between multiple users and applications. These namespaces are the foundation for multi-tenant environments. A Kubernetes namespace can have both Amazon EC2 and Fargate as the compute provider, and Fargate selection for pods can be done using user-defined Fargate profiles. This flexibility provides different performance and cost options for running Spark jobs. In this post, we provisioned an Amazon EKS cluster with node groups containing m5.2xlarge EC2 instances.

Virtual cluster

A virtual cluster is a Kubernetes namespace that Amazon EMR is registered with. Amazon EMR uses virtual clusters to run jobs and host endpoints. Multiple virtual clusters can be backed by the same physical cluster, and each virtual cluster maps to one namespace on an Amazon EKS cluster.
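For illustration, registering a namespace as a virtual cluster with the AWS CLI uses a container provider definition along these lines (the cluster name and namespace are placeholders; the provisioning scripts in this post handle this step for you):

```json
{
  "name": "sample-virtual-cluster",
  "containerProvider": {
    "id": "<your-eks-cluster-name>",
    "type": "EKS",
    "info": {
      "eksInfo": {
        "namespace": "emr"
      }
    }
  }
}
```

You would pass a file like this to aws emr-containers create-virtual-cluster --cli-input-json file://vc.json.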

Job run

A job run is a unit of work, such as a Spark JAR (Scala or Java application), PySpark script, or SparkSQL query, that you submit to Amazon EMR on EKS. One job can have multiple job runs. When you submit a job run, it should include the following information:

  • A virtual cluster where the job should run
  • A job name to identify the job
  • The execution role, which is a scoped IAM role (mapped to a Kubernetes service account) that runs the job's pods and specifies which resources the job can access
  • The Amazon EMR release label that specifies the version of Amazon EMR Spark to use
  • The artifacts to use when submitting your job, such as spark-submit parameters
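Putting these pieces together, a job run submission payload takes roughly the following shape. This is a sketch with placeholder values: the virtual cluster ID, role ARN, bucket, and script path are hypothetical, and the release label is illustrative:

```json
{
  "name": "sample-job-run",
  "virtualClusterId": "<virtual-cluster-id>",
  "executionRoleArn": "arn:aws:iam::111122223333:role/EMRContainers-JobExecutionRole",
  "releaseLabel": "emr-6.2.0-latest",
  "jobDriver": {
    "sparkSubmitJobDriver": {
      "entryPoint": "s3://<your-bucket>/scripts/sample_job.py",
      "sparkSubmitParameters": "--conf spark.executor.instances=2 --conf spark.executor.memory=2G"
    }
  }
}
```

You submit it with aws emr-containers start-job-run --cli-input-json file://job.json.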

Amazon EMR containers

An Amazon EMR container is the API name for Amazon EMR on EKS. The emr-containers prefix is used in the following scenarios:

  • In the AWS CLI commands for Amazon EMR on EKS. For example, aws emr-containers start-job-run.
  • Before IAM policy actions for Amazon EMR on EKS. For example, "Action": [ "emr-containers:StartJobRun"]. For more information, see Policy actions for Amazon EMR on EKS.
  • In Amazon EMR on EKS service endpoints. For example, emr-containers.us-east-1.amazonaws.com.

In the solution overview, we went step by step through how we used the above resources to create the Amazon EMR on EKS cluster and run a Spark job. For further details on these concepts, see Concepts.


About the Authors

Dipankar Kushari is a Senior Analytics Solutions Architect with AWS, helping customers build analytics platform and solutions. He has a keen interest in distributed computing. Dipankar enjoys spending time playing chess and watching old Hollywood movies.


Ashok Padmanabhan is a big data consultant with AWS Professional Services, helping customers build big data and analytics platform and solutions. When not building and designing data lakes, Ashok enjoys spending time at beaches near his home in Florida.

Gaurav Gundal is a DevOps consultant with AWS Professional Services, helping customers build solutions on the customer platform. When not building, designing, or developing solutions, Gaurav spends time with his family, plays guitar, and enjoys traveling to different places.

Naveen Madhire is a Big Data Architect with AWS Professional Services, helping customers create data lake solutions on AWS. Outside of work, he loves playing video games and watching crime series on TV.

Run a Spark SQL-based ETL pipeline with Amazon EMR on Amazon EKS

Post Syndicated from Melody Yang original https://aws.amazon.com/blogs/big-data/run-a-spark-sql-based-etl-pipeline-with-amazon-emr-on-amazon-eks/

Increasingly, a business’s success depends on its agility in transforming data into actionable insights, which requires efficient and automated data processes. In the previous post – Build a SQL-based ETL pipeline with Apache Spark on Amazon EKS, we described a common productivity issue in a modern data architecture. To address the challenge, we demonstrated how to utilize a declarative approach as the key enabler to improve efficiency, which resulted in a faster time to value for businesses.

Generally speaking, managing applications declaratively in Kubernetes is a widely adopted best practice. You can use the same approach to build and deploy Spark applications with open-source or in-house build frameworks to achieve the same productivity goal.

For this post, we use the open-source data processing framework Arc, which abstracts away Apache Spark, to transform a regular data pipeline into an “extract, transform, and load (ETL) as definition” job. The steps in the data pipeline are simply expressed in a declarative definition (JSON) file with embedded declarative language SQL scripts.

The job definition in an Arc Jupyter notebook looks like the following screenshot.

This representation makes ETL much easier for a wider range of personas: analysts, data scientists, and any SQL authors who can fully express their data workflows without the need to write code in a programming language like Python.

In this post, we explore some key advantages of the latest Amazon EMR deployment option Amazon EMR on Amazon EKS to run Spark applications. We also explain its major difference from the commonly used Spark resource manager YARN, and demonstrate how to schedule a declarative ETL job with EMR on EKS. Building and testing the job on a custom Arc Jupyter kernel is out of scope for this post. You can find more tutorials on the Arc website.

Why choose Amazon EMR on Amazon EKS?

The following are some of the benefits of EMR on EKS:

  • Simplified architecture by unifying workloads – EMR on EKS enables us to run Apache Spark workloads on Amazon Elastic Kubernetes Service (Amazon EKS) without provisioning dedicated EMR clusters. If you have an existing Amazon EKS landscape in your organization, it makes sense to unify analytical workloads with other Kubernetes-based applications on the same Amazon EKS cluster. It improves resource utilization and significantly simplifies your infrastructure management.
  • More resources to share with a smaller JVM footprint – A major difference in this deployment option is the resource manager shift from YARN to Kubernetes and from a Hadoop cluster manager to a generic containerized application orchestrator. As shown in the following diagram, each Spark executor runs as a YARN container (compute and memory unit) in Hadoop. Broadly, YARN creates a JVM in each container requested by Hadoop applications, such as Apache Hive. When you run Spark on Kubernetes, it keeps your JVM footprint minimal, so that the Amazon EKS cluster can accommodate more applications, resulting in more spare resources for your analytical workloads.

  • Efficient resource sharing and cost savings – With the YARN cluster manager, if you want to reuse the same EMR cluster for concurrent Spark jobs to reduce cost, you have to compromise on resource isolation. Additionally, you have to pay for compute resources that aren’t fully utilized, such as a master node, because only Amazon EMR can use these unused compute resources. With EMR on EKS, you can enjoy the optimized resource allocation feature by sharing them across all your applications, which reduces cost.
  • Faster EMR runtime for Apache Spark – One of the key benefits of running Spark with EMR on EKS is the faster EMR runtime for Apache Spark. The runtime is a performance-optimized environment, which is available and turned on by default on Amazon EMR release 5.28.0 and later. In our performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. It means you can run your Apache Spark applications faster and cheaper without requiring any changes to your applications.
  • Minimum setup to support multi-tenancy – While taking advantage of Spark’s Dynamic Resource Allocation, auto scaling in Amazon EKS, and high availability across multiple Availability Zones, you can isolate your workloads for multi-tenancy use cases with minimal configuration. Additionally, without any infrastructure setup, you can use a single Amazon EKS cluster to run applications that require different Apache Spark versions and configurations, for example for development vs. test environments.

Cost effectiveness

EMR on EKS pricing is calculated based on the vCPU and memory resources used from the time you start to download your Amazon EMR application image until the Spark pod on Amazon EKS stops, rounded up to the nearest second. The following screenshot is an example of the cost in the us-east-1 Region.

The Amazon EMR uplift price is in addition to the Amazon EKS pricing and any other services used by Amazon EKS, such as EC2 instances and EBS volumes. You pay $0.10 per hour for each Amazon EKS cluster that you use. However, you can use a single Amazon EKS cluster to run multiple applications by taking advantage of Kubernetes namespaces and AWS Identity and Access Management (IAM) security policies.
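As a sketch of the uplift math, the following estimates the charge for a hypothetical job that holds 4 vCPUs and 16 GB of memory for 30 minutes. The rates are illustrative us-east-1 values ($0.01012 per vCPU-hour and $0.00111125 per GB-hour) as published at the time of writing; verify current rates on the EMR pricing page:

```shell
#!/bin/sh
# Estimate the EMR on EKS uplift for a job using 4 vCPUs and 16 GB of
# memory for 0.5 hours. Rates are illustrative us-east-1 values.
VCPU_RATE=0.01012      # USD per vCPU-hour
MEM_RATE=0.00111125    # USD per GB-hour
COST=$(awk -v v=4 -v m=16 -v h=0.5 -v vr="$VCPU_RATE" -v mr="$MEM_RATE" \
  'BEGIN { printf "%.5f", h * (v * vr + m * mr) }')
echo "estimated uplift: \$${COST}"
```

This is only the Amazon EMR uplift; EC2, EBS, Fargate, and the EKS cluster fee are billed separately.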

While the application runs, your resources are allocated and removed automatically by the Amazon EKS auto scaling feature, in order to eliminate over-provisioning or under-utilization of these resources. It enables you to lower costs because you only pay for the resources you use.

To further reduce the running cost for jobs that aren’t time-critical, you can schedule Spark executors on Spot Instances to save up to 90% over On-Demand prices. To maintain the resiliency of your Spark cluster, it’s recommended to run the driver on a reliable On-Demand Instance, because the driver is responsible for requesting new executors to replace failed ones when an unexpected event happens.

Kubernetes comes with a YAML specification called a pod template that can help you to assign Spark driver and executor pods to Spot or On-Demand EC2 instances. You define nodeSelector rules in pod templates, then upload to Amazon Simple Storage Service (Amazon S3). Finally, at the job submission, specify the Spark properties spark.kubernetes.driver.podTemplateFile and spark.kubernetes.executor.podTemplateFile to point to the pod templates in Amazon S3.

For example, the following is the code for executor_pod_template.yaml:

apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: SPOT

The following is the code for driver_pod_template.yaml:

apiVersion: v1
kind: Pod
spec:
  nodeSelector:
    eks.amazonaws.com/capacityType: ON_DEMAND

The following is the code for the Spark job submission:

aws emr-containers start-job-run \
--virtual-cluster-id ${EMR_EKS_CLUSTER_ID} \
--name spark-pi-pod-template \
--execution-role-arn ${EMR_EKS_ROLE_ARN} \
--release-label emr-5.33.0-latest \
--job-driver '{
    "sparkSubmitJobDriver": {
        "entryPoint": "s3://'${s3DemoBucket}'/someAppCode.py",
        "sparkSubmitParameters": "--conf spark.kubernetes.driver.podTemplateFile=\"s3://'${s3DemoBucket}'/pod_templates/driver_pod_template.yaml\" --conf spark.kubernetes.executor.podTemplateFile=\"s3://'${s3DemoBucket}'/pod_templates/executor_pod_template.yaml\" --conf spark.executor.instances=2 --conf spark.executor.memory=2G --conf spark.executor.cores=2"
	}
    }'

Beginning with Amazon EMR versions 5.33.0 or 6.3.0, Amazon EMR on EKS supports the Amazon S3-based pod template feature. If you’re using an unsupported Amazon EMR version, such as EMR 6.1.0, you can use the pod template feature without Amazon S3 support. Make sure your Spark version is 3.0 or later, and copy the template files into your custom Docker image. The job submit script is changed to the following:

"--conf spark.kubernetes.driver.podTemplateFile=/local/path/to/driver_pod_template.yaml" 
"--conf spark.kubernetes.executor.podTemplateFile=/local/path/to/executor_pod_template.yaml"

Serverless compute option: AWS Fargate

The sample solution runs on an Amazon EKS cluster with AWS Fargate. Fargate is a serverless compute engine for Amazon EKS and Amazon ECS. It makes it easy for you to focus on building applications because it removes the need to provision and manage EC2 instances or managed node groups in EKS. Fargate runs each task or pod in its own kernel, providing its own isolated compute environment. This enables your application to have resource isolation and enhanced security by design.

With Fargate, you don’t need to be an expert in Kubernetes operations. It automatically allocates the right amount of compute, eliminating the need to choose instances and scale cluster capacity, so the Kubernetes Cluster Autoscaler is no longer required to increase your cluster’s compute capacity.

In our instance, each Spark executor or driver is provisioned by a separate Fargate pod, to form a Spark cluster dedicated to an ETL pipeline. You only need to specify and pay for resources required to run the application—no more concerns about complex cluster management, queues, and isolation trade-offs.

Other Amazon EC2 options

In addition to the serverless choice, EMR on EKS can operate on all types of EKS clusters, such as Amazon EKS managed node groups with Amazon Elastic Compute Cloud (Amazon EC2) On-Demand and Spot Instances.

Previously, we mentioned placing a Spark driver pod on an On-Demand Instance to reduce interruption risk. To further improve your cluster stability, it’s important to understand the Kubernetes high availability and restart policy features. These allow for exciting new possibilities, not only in computational multi-tenancy, but also in the ability of application self-recovery, for example relaunching a Spark driver on Spot or On-Demand instances. For more information and an example, see the GitHub repo.

As of this writing, our test result shows that a Spark driver can’t restart on failure with the EMR on EKS deployment type. So be mindful when designing a Spark application with the minimum downtime need, especially for a time-critical job. We recommend the following:

  • Diversify your Spot requests – Similar to Amazon EMR’s instance fleet, EMR on EKS allows you to deploy a single application across multiple instance types to further enhance availability. With Amazon EC2 Spot best practices, such as capacity rebalancing, you can diversify the Spot request across multiple instance types within each Availability Zone. It limits the impact of Spot interruptions on your workload, if a Spot Instance is reclaimed by Amazon EC2. For more details, see Running cost optimized Spark workloads on Kubernetes using EC2 Spot Instances.
  • Increase resilience – Repeatedly restarting a Spark application compromises your application performance or the length of your jobs, especially for time-sensitive data processes. We recommend the following best practice to increase your application resilience:
    • Ensure your job is stateless so that it can self-recover without waiting for a dependency.
    • If a checkpoint is required, for example in a stateful Spark streaming ETL, make sure your checkpoint storage is decoupled from the Amazon EKS compute resource, which can be detached and attached to your Kubernetes cluster via the persistent volume claims (PVCs), or simply use S3 Cloud Storage.
    • Run the Spark driver on the On-Demand Instance defined by a pod template. It adds an extra layer of resiliency to your Spark application with EMR on EKS.

Security

EMR on EKS inherits the fine-grained security feature, IAM roles for service accounts (IRSA), offered by Amazon EKS. This means your data access control is no longer at the compute instance level; instead, it’s granular at the container or pod level and controlled by an IAM role. The token-based authentication approach allows us to use one of the AWS default credential providers, WebIdentityTokenCredentialsProvider, to exchange the Kubernetes-issued token for IAM role credentials. It makes sure our applications deployed with EMR on EKS can communicate with other AWS services over a secure and private channel, without the need to store a long-lived AWS credential pair as a Kubernetes secret.
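For reference, an IRSA-style execution role carries a trust policy of roughly the following shape, scoping the role to tokens issued by the cluster's OIDC provider. This is a sketch: the account ID, OIDC provider ID, Region, and service account pattern are placeholders:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Federated": "arn:aws:iam::111122223333:oidc-provider/oidc.eks.us-east-1.amazonaws.com/id/EXAMPLED539D4633E53DE1B71EXAMPLE"
      },
      "Action": "sts:AssumeRoleWithWebIdentity",
      "Condition": {
        "StringLike": {
          "oidc.eks.us-east-1.amazonaws.com/id/EXAMPLED539D4633E53DE1B71EXAMPLE:sub": "system:serviceaccount:emr:emr-containers-sa-*"
        }
      }
    }
  ]
}
```

The condition on the sub claim is what restricts the role to pods running under the EMR-managed service accounts in the registered namespace.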

To learn more about the implementation details, see the GitHub repo.

Solution overview

In this example, we introduce a quality-aware design with the open-source declarative data processing Arc framework to abstract Spark technology away from you. It makes it easy for you to focus on business outcomes, not technologies.

We walk through the steps to run a predefined Arc ETL job with the EMR on EKS approach. For more information, see the GitHub repo.

The sample solution launches a serverless Amazon EKS cluster, loads TLC green taxi trip records from a public S3 bucket, applies the dataset schema, aggregates the data, and writes the output to an S3 bucket in Apache Parquet format. The sample Spark application is defined as a Jupyter notebook green_taxi_load.ipynb powered by Arc, and the metadata information is defined in green_taxi_schema.json.

The following diagram illustrates our solution architecture.

Launch Amazon EKS

Provisioning takes approximately 20 minutes.

To get started, open AWS CloudShell in the us-east-1 Region. Run the following command to provision the new cluster eks-cluster, backed by Fargate. Then build the EMR virtual cluster emr-on-eks-cluster:

curl https://raw.githubusercontent.com/aws-samples/sql-based-etl-on-amazon-eks/main/emr-on-eks/provision.sh | bash

At the end of the provisioning, the shell script automatically runs a command that creates an EMR virtual cluster, registering Amazon EMR with the newly created Amazon EKS cluster. The dedicated namespace on the EKS cluster is called emr.

Deploy the sample ETL job

  1. When provisioning is complete, submit the sample ETL job to EMR on EKS with a serverless virtual cluster called emr-on-eks-cluster:
curl https://raw.githubusercontent.com/aws-samples/sql-based-etl-on-amazon-eks/main/emr-on-eks/submit_arc_job.sh | bash

It runs the following job submit command:

The declarative ETL job can be found in the blog post’s GitHub repository. The following is a screenshot of the job specification:

  2. Check your job progress and auto scaling status:
kubectl get pod -n emr
kubectl get node --label-columns=topology.kubernetes.io/zone
  3. Because the job requested 10 executors, it automatically scales the Spark application from 0 to 10 pods (executors) on the EKS cluster. The Spark application removes itself from the EKS cluster when the job is done.

  4. Navigate to your Amazon EMR console to view application logs on the Spark History Server.

  5. You can also check the logs in CloudShell, as long as your Spark driver is running:
driver_name=$(kubectl get pod -n emr | grep "driver" | awk '{print $1}')
kubectl logs ${driver_name} -n emr -c spark-kubernetes-driver | grep 'event'

Clean up

To clean up the AWS resources you created, run the following code:

curl https://raw.githubusercontent.com/aws-samples/sql-based-etl-on-amazon-eks/main/emr-on-eks/deprovision.sh | bash

Region support

At the time of this writing, Amazon EMR on EKS is available in US East (N. Virginia), US West (Oregon), US West (N. California), US East (Ohio), Canada (Central), Europe (Ireland, Frankfurt, and London), and Asia Pacific (Mumbai, Seoul, Singapore, Sydney, and Tokyo) Regions. If you want to use EMR on EKS in a Region that isn’t available yet, check out the open-source Apache Spark on Amazon EKS alternative on aws-samples GitHub. You can deploy the sample solution to your Region as long as Amazon EKS is available. Migrating a Spark workload on Amazon EKS to the fully managed EMR on EKS is easy and straightforward, with minimum changes required. Because the self-contained Spark application remains the same, only the deployment implementation differs.

Conclusion

This post introduces Amazon EMR on Amazon EKS and provides a walkthrough of a sample solution to demonstrate the “ETL as definition” concept. A declarative data processing framework enables you to build and deploy your Spark workloads with enhanced efficiency. With EMR on EKS, running applications built upon a declarative framework maximizes data process productivity, performance, reliability, and availability at scale. This pattern abstracts Spark technology away from you, and helps you to focus on deliverables that optimize business outcomes.

The built-in optimizations provided by the managed EMR on EKS can help not only data engineers with analytical skills, but also analysts, data scientists, and any SQL authors who can fully express their data workflows declaratively in Spark SQL. You can use this architectural pattern to drive your data ownership shift in your organization, from IT to non-IT stakeholders who have a better understanding of business operations and needs.


About the Authors

Melody Yang is a Senior Analytics Specialist Solution Architect at AWS with expertise in Big Data technologies. She is an experienced analytics leader working with AWS customers to provide best practice guidance and technical advice in order to assist their success in data transformation. Her areas of interests are open-source frameworks and automation, data engineering and DataOps.


Shiva Achari is a Senior Data Lab Architect at AWS. He helps AWS customers to design and build data and analytics prototypes via the AWS Data Lab engagement. He has over 14 years of experience working with enterprise customers and startups primarily in the Data and Big Data Analytics space.


Daniel Maldonado is an AWS Solutions Architect, specialist in Microsoft workloads and Big Data technologies, focused on helping customers to migrate their applications and data to AWS. Daniel has over 12 years of experience working with Information Technologies and enjoys helping clients reap the benefits of running their workloads in the cloud.


Igor Izotov is an AWS enterprise solutions architect, and he works closely with Australia’s largest financial services organizations. Prior to AWS, Igor held solution architecture and engineering roles with tier-1 consultancies and software vendors. Igor is passionate about all things data and modern software engineering. Outside of work, he enjoys writing and performing music, a good audiobook, or a jog, often combining the latter two.

How to Accelerate Building a Lake House Architecture with AWS Glue

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/how-to-accelerate-building-a-lake-house-architecture-with-aws-glue/

Customers are building databases, data warehouses, and data lake solutions in isolation from each other, each having its own separate data ingestion, storage, management, and governance layers. Often these disjointed efforts to build separate data stores end up creating data silos, data integration complexities, excessive data movement, and data consistency issues. These issues are preventing customers from getting deeper insights. To overcome these issues and easily move data around, a Lake House approach on AWS was introduced.

In this blog post, we illustrate the AWS Glue integration components that you can use to accelerate building a Lake House architecture on AWS. We will also discuss how to derive persona-centric insights from your Lake House using AWS Glue.

Components of the AWS Glue integration system

AWS Glue is a serverless data integration service that facilitates the discovery, preparation, and combination of data. It can be used for analytics, machine learning, and application development. AWS Glue provides all of the capabilities needed for data integration. So you can start analyzing your data and putting it to use in minutes, rather than months.

The following diagram illustrates the various components of the AWS Glue integration system.

Figure 1. AWS Glue integration components

Connect – AWS Glue allows you to connect to various data sources anywhere

Glue connector: AWS Glue provides built-in support for the most commonly used data stores. You can use Amazon Redshift, Amazon RDS, Amazon Aurora, Microsoft SQL Server, MySQL, MongoDB, or PostgreSQL using JDBC connections. AWS Glue also allows you to use custom JDBC drivers in your extract, transform, and load (ETL) jobs. For data stores that are not natively supported such as SaaS applications, you can use connectors. You can also subscribe to several connectors offered in the AWS Marketplace.

Glue crawlers: You can use a crawler to populate the AWS Glue Data Catalog with tables. A crawler can crawl multiple data stores in a single pass. Upon completion, the crawler creates or updates one or more tables in your Data Catalog. Extract, transform, and load (ETL) jobs that you define in AWS Glue use these Data Catalog tables as sources and targets.

Catalog – AWS Glue simplifies data discovery and governance

Glue Data Catalog: The Data Catalog serves as the central metadata catalog for the entire data landscape.

Glue Schema Registry: The AWS Glue Schema Registry allows you to centrally discover, control, and evolve data stream schemas. With AWS Glue Schema Registry, you can manage and enforce schemas on your data streaming applications.

Data quality – AWS Glue helps you author and monitor data quality rules

Glue DataBrew: AWS Glue DataBrew allows data scientists and data analysts to clean and normalize data. You can use a visual interface, reducing the time it takes to prepare data by up to 80%. With Glue DataBrew, you can visualize, clean, and normalize data directly from your data lake, data warehouses, and databases.

Curate data: You can use either Glue development endpoint or AWS Glue Studio to curate your data.

AWS Glue development endpoint is an environment that you can use to develop and test your AWS Glue scripts. You can choose either Amazon SageMaker notebook or Apache Zeppelin notebook as an environment.

AWS Glue Studio is a new visual interface for AWS Glue that supports extract-transform-and-load (ETL) developers. You can author, run, and monitor AWS Glue ETL jobs. You can now use a visual interface to compose jobs that move and transform data, and run them on AWS Glue.

AWS Data Exchange makes it easy for AWS customers to securely exchange and use third-party data in AWS. This is for data providers who want to structure their data across multiple datasets or enrich their products with additional data. You can publish additional datasets to your products using the AWS Data Exchange.

Deequ is an open-source data quality library developed internally at Amazon. It provides features such as automatic constraint suggestion and verification, metrics computation, and data profiling.

Build a Lake House architecture faster, using AWS Glue

Figure 2 illustrates how you can build a Lake House using AWS Glue components.

Figure 2. Building Lake House architectures with AWS Glue

The architecture flow follows these general steps:

  1. Glue crawlers scan the data from various data sources and populate the Data Catalog for your Lake House.
  2. The Data Catalog serves as the central metadata catalog for the entire data landscape.
  3. Once data is cataloged, fine-grained access control is applied to the tables through AWS Lake Formation.
  4. Curate your data with business and data quality rules by using Glue Studio, Glue development endpoints, or Glue DataBrew. Place transformed data in a curated Amazon S3 layer for purpose-built analytics downstream.
  5. Facilitate data movement with AWS Glue to and from your data lake, databases, and data warehouse by using Glue connections. Use AWS Glue Elastic Views to replicate the data across the Lake House.

Derive persona-centric insights from your Lake House using AWS Glue

Many organizations want to gather observations from increasingly larger volumes of acquired data. These insights help them make data-driven decisions with speed and agility. To do this, they can use a central data lake, a ring of purpose-built data services, and data warehouses based on persona or job function.

Figure 3 illustrates the Lake House inside-out data movement with AWS Glue DataBrew, Amazon Athena, Amazon Redshift, and Amazon QuickSight to perform persona-centric data analytics.

Figure 3. Lake House persona-centric data analytics using AWS Glue

This shows how Lake House components serve various personas in an organization:

  1. Data ingestion: Data is ingested into Amazon Simple Storage Service (Amazon S3) from different sources.
  2. Data processing: Data curators and data scientists use DataBrew to validate, clean, and enrich the data. Amazon Athena is also used to run ad hoc queries to analyze the data in the lake. The transformation is shared with data engineers to set up batch processing.
  3. Batch data processing: Data engineers or developers set up batch jobs in AWS Glue and AWS Glue DataBrew. Jobs can be initiated by an event, or can be scheduled to run periodically.
  4. Data analytics: Data and business analysts can now analyze the prepared dataset in Amazon Redshift or in Amazon S3 using Athena.
  5. Data visualizations: Business analysts can create visuals in QuickSight. Data curators can enrich data from multiple sources. Admins can enforce security and data governance. Developers can embed QuickSight dashboards in applications.

Conclusion

Using a Lake House architecture will help you get persona-centric insights quickly from all of your data based on user role or job function. In this blog post, we describe several AWS Glue components and AWS purpose-built services that you can use to build Lake House architectures on AWS. We have also presented persona-centric Lake House analytics architecture using AWS Glue, to help you derive insights from your Lake House.

Read more and get started on building Lake House Architectures on AWS.

Field Notes: Building an automated scene detection pipeline for Autonomous Driving – ADAS Workflow

Post Syndicated from Kevin Soucy original https://aws.amazon.com/blogs/architecture/field-notes-building-an-automated-scene-detection-pipeline-for-autonomous-driving/

A Field Notes blog post in 2020 explained how to build an Autonomous Driving Data Lake using this Reference Architecture. Many organizations face the challenge of ingesting, transforming, labeling, and cataloging massive amounts of data to develop automated driving systems. In this re:Invent session, we explored an architecture to solve this problem using Amazon EMR, Amazon S3, Amazon SageMaker Ground Truth, and more. You will learn how BMW Group collects over 1 billion km of anonymized perception data from its worldwide connected fleet of customer vehicles to develop safe and performant automated driving systems.

Architecture Overview

The objective of this post is to describe how to design and build an end-to-end Scene Detection pipeline.

This architecture integrates an event-driven ROS bag ingestion pipeline running Docker containers on Amazon Elastic Container Service (Amazon ECS), and a scalable batch processing pipeline based on Amazon EMR and Spark. The solution also leverages AWS Fargate, Spot Instances, Amazon Elastic File System, AWS Glue, Amazon S3, and Amazon Athena.


Figure 1 – Architecture Showing how to build an automated scene detection pipeline for Autonomous Driving

The data included in this demo was produced by one vehicle across four different drives in the United States. Because the ROS bag files produced by the vehicle's on-board software contain very complex data, such as Lidar point clouds, the files are usually very large (1+ TB files are not uncommon).

These files usually need to be split into smaller chunks before being processed, as is the case in this demo. These files also may need to have post-processing algorithms applied to them, like lane detection or object detection.

In our case, the ROS bag files are split into approximately 10GB chunks and include topics for post-processed lane detections before they land in our S3 bucket. Our scene detection algorithm assumes the post processing has already been completed. The bag files include object detections with bounding boxes, and lane points representing the detected outline of the lanes.
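The post leaves the splitting itself to upstream ROS tooling; purely to illustrate the size-bounding idea, here is a minimal Python sketch that groups (name, size) records into chunks under a byte budget. The function name and policy are illustrative assumptions, not the actual splitting tool:

```python
def chunk_by_size(items, max_bytes):
    """Group (name, size_in_bytes) records into chunks whose total size
    stays at or under max_bytes. An item larger than max_bytes on its own
    still gets its own chunk rather than being dropped."""
    chunks, current, current_size = [], [], 0
    for name, size in items:
        # Close the current chunk if adding this item would exceed the budget.
        if current and current_size + size > max_bytes:
            chunks.append(current)
            current, current_size = [], 0
        current.append(name)
        current_size += size
    if current:
        chunks.append(current)
    return chunks
```

For example, with a 10-byte budget, items of sizes 6, 5, 4, and 9 split into three chunks: the first item alone, the next two together, and the last alone.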

Prerequisites

This post uses an AWS Cloud Development Kit (CDK) stack written in Python. You should follow the instructions in the AWS CDK Getting Started guide to set up your environment so you are ready to begin.

You can also use the config.json to customize the names of your infrastructure items, to set the sizing of your EMR cluster, and to customize the ROS bag topics to be extracted.

You will also need to be authenticated into an AWS account with permissions to deploy resources before executing the deploy script.

Deployment

The full pipeline can be deployed with one command: `bash deploy.sh deploy true`. The progress of the deployment can be followed on the command line, and also in the CloudFormation section of the AWS console. Once deployed, the user must upload two or more bag files to the rosbag-ingest bucket to initiate the pipeline.

The default configuration requires two bag files to be processed before an EMR pipeline is initiated. You also have to manually initiate the AWS Glue crawler to be able to explore the Parquet data with tools like Athena or QuickSight.

ROS bag ingestion with ECS Tasks, Fargate, and EFS

This solution provides an end-to-end scene detection pipeline for ROS bag files, ingesting the ROS bag files from S3, and transforming the topic data to perform scene detection in PySpark on EMR. This then exposes scene descriptions via DynamoDB to downstream consumers.

The pipeline starts with an S3 bucket (Figure 1 – #1) where incoming ROS bag files can be uploaded from local copy stations as needed. We recommend using AWS Direct Connect for a private, high-throughput connection to the cloud.

This ingestion bucket is configured to initiate S3 notifications each time an object with the suffix “.bag” is created. An AWS Lambda function then initiates a Step Function for orchestrating the ECS Task, passing the bucket and bag file prefix to the ECS task as environment variables in the container.

The ECS Task (Figure 1 – #2) runs serverless, leveraging Fargate as the capacity provider. This avoids the need to provision and autoscale EC2 instances in the ECS cluster. Each ECS Task processes exactly one bag file. We use Amazon Elastic File System (Amazon EFS) to provide virtually unlimited file storage to the container, in order to easily work with larger bag files. The container uses the open-source bagpy Python library to extract structured topic data (for example, GPS, detections, and inertial measurement data). The topic data is uploaded as Parquet files to S3, partitioned by topic and source bag file. The application writes metadata about each file, such as the topic names found in the file and the number of messages per topic, to a DynamoDB table (Figure 1 – #4).
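The real task uses bagpy to parse the file and DynamoDB to persist the result; as a rough sketch of only the metadata shape (field names here are hypothetical), the summarization step looks like:

```python
from collections import Counter

def topic_metadata(bag_file, messages):
    """Summarize one parsed bag file: which topics it contains and how
    many messages each topic has, mirroring the per-file DynamoDB item
    described above. `messages` is an iterable of (topic, payload) pairs."""
    counts = Counter(topic for topic, _payload in messages)
    return {
        "bag_file": bag_file,
        "topics": sorted(counts),
        "messages_per_topic": dict(counts),
    }

# Example with made-up topic names:
# topic_metadata("drive1.bag", [("/gps", {}), ("/gps", {}), ("/detections", {})])
```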

This module deploys an AWS Glue crawler configured to crawl this bucket of topic Parquet files. These files populate the AWS Glue Data Catalog with the schemas of each topic table and make this data accessible in Athena, AWS Glue jobs, QuickSight, and Spark on EMR. We use the AWS Glue Data Catalog (Figure 1 – #5) as a permanent Hive metastore.


Figure 2 – Glue Data Catalog of parquet datasets on S3


Figure 3 – Run ad-hoc queries against the Glue tables using Amazon Athena

The topic Parquet bucket also has an S3 Notification configured for all newly created objects, which is consumed by an EMR-Trigger Lambda function (Figure 1 – #5). This Lambda function is responsible for keeping track of bag files and their respective Parquet files in DynamoDB (Figure 1 – #6). Once in DynamoDB, bag files are assigned to batches, initiating the EMR batch processing step function. Metadata about each batch, including the step function execution ARN, is stored in DynamoDB.
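As a rough sketch of that batching logic (the deployment section notes a default of two bag files per batch; the function name is hypothetical, and the real implementation tracks state in DynamoDB rather than in memory):

```python
def ready_batches(completed_bags, batch_size=2):
    """Group fully extracted bag files into processing batches.
    A batch is only emitted once batch_size files are available;
    leftover files wait for the next arrivals."""
    batch = []
    for bag in completed_bags:
        batch.append(bag)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Any partial batch is intentionally not yielded.
```

With the default of two, three completed files produce one batch and leave one file waiting.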


Figure 4 – EMR pipeline orchestration with AWS Step Functions

The EMR batch processing step function (Figure 1 – #7) orchestrates the entire EMR pipeline, from provisioning an EMR cluster using the open-source EMR-Launch CDK library to submitting Pyspark steps to the cluster, to terminating the cluster and handling failures.

Batch Scene Analytics with Spark on EMR

There are two PySpark applications running on our cluster. The first performs synchronization of ROS bag topics for each bagfile. As the various sensors in the vehicle have different frequencies, we synchronize the various frequencies to a uniform frequency of 1 signal per 100 ms per sensor. This makes it easier to work with the data.

We compute the minimum and maximum timestamp in each bag file, and construct a unified timeline. For each 100 ms we take the most recent signal per sensor and assign it to the 100 ms timestamp. After this is performed, the data looks more like a normal relational table and is easier to query and analyze.
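The actual synchronization runs as a PySpark job; the resampling rule itself (for each 100 ms slot, keep the most recent value per sensor at or before that time) can be sketched in plain Python. The data layout below is an assumption for illustration:

```python
def synchronize(signals, period_ms=100):
    """Resample per-sensor signals onto a shared timeline.
    `signals` maps sensor name -> list of (timestamp_ms, value),
    each list sorted by timestamp. Returns one row per period_ms slot,
    holding for each sensor the most recent value at or before the slot
    time (None if that sensor has not reported yet)."""
    all_ts = [t for series in signals.values() for t, _ in series]
    first = (min(all_ts) // period_ms) * period_ms
    last = (max(all_ts) // period_ms) * period_ms
    rows = []
    for slot in range(first, last + period_ms, period_ms):
        row = {"t": slot}
        for sensor, series in signals.items():
            # Most recent reading at or before this slot's timestamp.
            values = [v for t, v in series if t <= slot]
            row[sensor] = values[-1] if values else None
        rows.append(row)
    return rows
```

After this step the data looks like a normal relational table, which is exactly why the pipeline performs it before scene analytics.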


Figure 5 – Batch Scene Analytics with Spark on EMR

Scene Detection and Labeling in PySpark

The second spark application enriches the synchronized topic dataset (Figure 1 – #8), analyzing the detected lane points and the object detections. The goal is to perform a simple lane assignment algorithm for objects detected by the on-board ML models and to save this enriched dataset (Figure 1 – #9) back to S3 for easy-access by analysts and data scientists.
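The post does not spell out the lane assignment algorithm, so the following is a deliberately simplified one-dimensional sketch: lanes are reduced to sorted boundary offsets and each object to a single lateral center coordinate, whereas the real pipeline works with lane-point polylines and bounding boxes:

```python
def assign_lane(object_center_x, lane_boundaries):
    """Assign an object to a lane by its lateral position.
    `lane_boundaries` is a sorted list of lane-line offsets in meters,
    e.g. [-3.5, 0.0, 3.5] describes two lanes. Returns the index of the
    lane containing the object, or None if it is outside all lanes."""
    for i, (left, right) in enumerate(zip(lane_boundaries, lane_boundaries[1:])):
        if left <= object_center_x < right:
            return i
    return None
```

In the enriched dataset, a row would then carry both the detection and its lane index, ready for the person-in-lane scene query described next.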


Figure 9 – Object Lane Assignment example

 


Figure 9 – Synchronized topics enriched with object lane assignments

Finally, the last step takes this enriched dataset (Figure 1 – #9) to summarize specific scenes or sequences where a person was identified as being in a lane. The output of this pipeline includes two new tables as parquet files on S3 – the synchronized topic dataset (Figure 1 – #8) and the synchronized topic dataset enriched with object lane assignments (Figure 1 – #9), as well as a DynamoDB table with scene metadata for all person-in-lane scenarios (Figure 1 – #10).
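As a minimal illustration of that summarization step (the real job runs in PySpark and writes scene metadata to DynamoDB; the row format here is an assumption), collapsing flagged rows into scene intervals looks like:

```python
def person_in_lane_scenes(rows):
    """Collapse synchronized rows into scenes: maximal runs of consecutive
    rows where a person was detected in a lane. `rows` is a timestamp-sorted
    list of (timestamp_ms, person_in_lane: bool). Returns a list of
    (start_ts, end_ts) intervals, one per scene."""
    scenes = []
    start = last = None
    for t, flag in rows:
        if flag:
            if start is None:
                start = t  # A new scene begins.
            last = t
        elif start is not None:
            scenes.append((start, last))  # The scene just ended.
            start = last = None
    if start is not None:
        scenes.append((start, last))  # Scene still open at end of data.
    return scenes
```

Each interval would become one item in the Scene Metadata DynamoDB table.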

Scene Metadata

The Scene Metadata DynamoDB table (Figure 1 – #10) can be queried directly to find sequences of events, as will be covered in a follow up post for visually debugging scene detection algorithms using WebViz/RViz. Using WebViz, we were able to detect that the on-board object detection model labels Crosswalks and Walking Signs as “person” even when a person is not crossing the street, for example:


Figure 10 – Example DynamoDB item from the Scene Metadata table

These scene descriptions can also be converted to Open Scenario format and pushed to an Elasticsearch cluster to support more complex scenario-based searches, for example, for downstream simulation use cases or for visualization in QuickSight. An example of syncing DynamoDB tables to Elasticsearch using DynamoDB Streams and Lambda can be found here (https://aws.amazon.com/blogs/compute/indexing-amazon-dynamodb-content-with-amazon-elasticsearch-service-using-aws-lambda/). Because DynamoDB is a NoSQL data store, we can enrich the Scene Metadata table with scene parameters. For example, we can identify the maximum or minimum speed of the car during the identified event sequence, without worrying about breaking schema changes. It is also straightforward to save a DataFrame from PySpark to DynamoDB using open-source libraries.

As a final note, the modules are built to be exactly that, modular. The three modules that are easily isolated are:

  1. the ECS Task pipeline for extracting ROS bag topic data to parquet files
  2. the EMR Trigger Lambda for tracking incoming files, creating batches, and initiating a batch processing step function
  3. the EMR Pipeline for running PySpark applications leveraging Step Functions and EMR Launch

Clean Up

To clean up the deployment, you can run `bash deploy.sh destroy false`. Some resources like S3 buckets and DynamoDB tables may have to be manually emptied and deleted via the console to be fully removed.

Limitations

The bagpy library used in this pipeline does not yet support complex or non-structured data types like images or LIDAR data. Therefore, its usage is limited to data that can be stored in a tabular CSV format before being converted to Parquet.

Conclusion

In this post, we showed how to build an end-to-end Scene Detection pipeline at scale on AWS to perform scene analytics and scenario detection with Spark on EMR from raw vehicle sensor data. In a subsequent blog post, we will cover how to extract and catalog images from ROS bag files, create a labeling job with SageMaker Ground Truth, and then train a machine learning model to detect cars.

Recommended Reading: Field Notes: Building an Autonomous Driving and ADAS Data Lake on AWS

Visualize data using Apache Spark running on Amazon EMR with Amazon QuickSight

Post Syndicated from Tom McMeekin original https://aws.amazon.com/blogs/big-data/visualize-data-using-apache-spark-running-on-amazon-emr-with-amazon-quicksight/

Organizations often need to process large volumes of data before serving it to business stakeholders. In this blog, we learn how to leverage Amazon EMR to process data using Apache Spark, the go-to platform for in-memory analytics of large data volumes, and connect the business intelligence (BI) tool Amazon QuickSight to serve data to end users.

QuickSight is a fast, cloud-powered BI service that makes it easy to build visualizations, perform ad hoc analysis, and quickly get business insights from your data. With our cloud-based service, you can easily connect to your data, perform advanced analysis, and create stunning visualizations and rich dashboards that can be accessed from any browser or mobile device.

QuickSight supports connectors for big data analytics using Spark. With the SparkSQL connector in QuickSight, you can easily create interactive visualizations over large datasets using Amazon EMR. Amazon EMR provides a simple and cost-effective way to run highly distributed processing frameworks such as Spark.

In this post, we use the public data set, New York City Taxi and Limousine Commission (TLC) Trip Record Data, which contains data of trips taken by taxis and for-hire vehicles in New York City. We use an optimized Parquet version of the CSV public dataset available from the Registry of Open Data on AWS.

This post also explores how to use AWS Glue to create the Data Catalog by crawling the NYC taxi data in an Amazon Simple Storage Service (Amazon S3) bucket, making it immediately queryable for analysis. AWS Glue is a fully managed extract, transform, and load (ETL) service that makes it easy to prepare and load your data for analytics. To learn more about how to use AWS Glue to transform a dataset from CSV to Parquet, see Harmonize, Query, and Visualize Data from Various Providers using AWS Glue, Amazon Athena, and Amazon QuickSight.

Prerequisites

The following steps assume that you have a VPC with public and private subnets, with NAT configured for private subnets and an available S3 bucket for Amazon EMR logging.

If you create the EMR cluster in a private subnet, you can use AWS Systems Manager Session Manager, a bastion host, or a VPN connection to access the EMR cluster. For this post, we use Session Manager to access our EMR cluster.

In QuickSight Enterprise edition, you can create connections to your VPCs from your QuickSight account. Each connection creates an elastic network interface in your VPC for QuickSight to send traffic to instances in your VPC. For more information, see Connecting to a VPC with Amazon QuickSight. If you haven’t already signed up for QuickSight, you can sign up before getting started. QuickSight offers a free trial so you can try out this solution at no cost.

The following AWS CloudFormation template offers single-click deployment.

We use the US East (N. Virginia) Region as the default; we highly recommend launching the stack there to optimize querying the public dataset in Amazon S3. You can change to any Region that supports Amazon EMR, AWS Glue, and QuickSight, but it may impact the time it takes to query the data.

If deploying into production, we recommend you secure communication by using the Configure SSL with a QuickSight supported authority step after deployment of the CloudFormation template to enable SSL.

Solution overview

We walk you through the following steps:

  1. Deploy and configure Amazon EMR with a CloudFormation template.
  2. Run AWS Glue crawlers to crawl and populate the Hive-compatible metastore.
  3. Test JDBC connectivity using Beeline.
  4. Visualize the data with QuickSight.

The CloudFormation template provided in the prerequisites provisions a configured Amazon EMR cluster for you to start querying your data with Spark. After deploying the CloudFormation stack, you can skip the first step and start running the AWS Glue crawlers.

Deploy and Configure Amazon EMR

Those looking to dive deep to understand what the CloudFormation template is deploying can use the following steps to manually deploy Amazon EMR running Spark and connect it to QuickSight:

  1. Create an EMR cluster with 5.30.0 or later release.
  2. Connect to the cluster using Session Manager.
  3. Install and configure OpenLDAP.
  4. Create a user in LDAP.
  5. Start the Thrift server.
  6. Configure SSL using a QuickSight supported authority.

Create an EMR cluster

For this post, we use an EMR cluster with 5.30.0 or later release.

  1. On the Amazon EMR console, choose Create cluster.
  2. For Cluster name, enter a name (for example, visualisedatablog).
  3. For Release, choose your release version.
  4. For Applications, select Spark.
  5. Select Use AWS Glue Data Catalog for table metadata.
  6. For Instance type, choose your instance.
  7. For EC2 key pair, choose Proceed without an EC2 key pair.
  8. Choose Create cluster.

Make sure you enabled run as support for Session Manager.

Connect to the EMR cluster using Session Manager

Session Manager is a fully managed AWS Systems Manager capability that lets you manage your Amazon Elastic Compute Cloud (Amazon EC2) instances, on-premises instances, and virtual machines through an interactive, one-click, browser-based shell or through the AWS Command Line Interface (AWS CLI). Session Manager provides secure and auditable instance management without the need to open inbound ports, maintain bastion hosts, or manage SSH keys.

By default, sessions are launched using the credentials of a system-generated ssm-user account that is created on a managed instance. For Amazon EMR, you can instead launch sessions using the Hadoop user account. Session Manager provides two methods for specifying the Hadoop user operating system account to use. For more information, see Enable run as support for Linux and macOS instances. For those configuring Systems Manager for the first time, review Why is my EC2 instance not appearing under Managed Instances in the Systems Manager console? for helpful tips on getting started with adding managed instances.

After you log in to the primary node of your cluster, run the following commands to install and configure OpenLDAP.

Install and configure OpenLDAP

To install and configure OpenLDAP, complete the following steps (alternatively, you can download the script used by the CloudFormation template and run it):

  1. Run the following commands:
# Install LDAP Server
sudo yum -y install openldap compat-openldap openldap-clients openldap-servers openldap-servers-sql openldap-devel
# Restart LDAP 
sudo service slapd restart

For more about configuring OpenLDAP, see the OpenLDAP documentation.

  2. Run the following command to set a new password for the root account and store the resulting hash:
slappasswd

This command outputs a hash that looks like the following sample:

{SSHA}DmD616c3yZyKndsccebZK/vmWiaQde83
  3. Copy the hash output to a text editor to use in subsequent steps.

Next, we prepare the commands to set the password for the LDAP root.

  4. Run the following code (replace the hash with the one you generated in the previous step, and make sure carriage returns are preserved):
cat > /tmp/config.ldif <<EOF
dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcSuffix
olcSuffix: dc=example,dc=com

dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcRootDN
olcRootDN: cn=dev,dc=example,dc=com

dn: olcDatabase={2}hdb,cn=config
changetype: modify
replace: olcRootPW
olcRootPW: <<REPLACE_WITH_PASSWORD_HASH>>
EOF
  5. Run the following command to run the preceding commands against LDAP:
sudo ldapmodify -Y EXTERNAL -H ldapi:/// -f /tmp/config.ldif
  6. Copy the sample database configuration file to /var/lib/ldap and add relevant schemas:
sudo cp /usr/share/openldap-servers/DB_CONFIG.example /var/lib/ldap/DB_CONFIG

sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/cosine.ldif
sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/nis.ldif 
sudo ldapadd -Y EXTERNAL -H ldapi:/// -f /etc/openldap/schema/inetorgperson.ldif

Create a user in LDAP

Next, create a user account with a password in the LDAP directory with the following commands. When prompted for a password, use the LDAP root password that you created in the previous step (for this post, we use sparky as the username). Make sure carriage returns are preserved when copying and entering the code.

# Set the username referenced below (the post uses sparky)
username=sparky

cat > /tmp/accounts.ldif <<EOF 
dn: dc=example,dc=com
objectclass: domain
objectclass: top
dc: example

dn: ou=dev,dc=example,dc=com
objectclass: organizationalUnit
ou: dev
description: Container for developer entries

dn: uid=$username,ou=dev,dc=example,dc=com
uid: $username
objectClass: inetOrgPerson
userPassword: <<REPLACE_WITH_STRONG_PASSWORD>>
sn: sparky
cn: dev
EOF

Run the following command to run the preceding commands against LDAP (you must enter the root LDAP password specified in the previous section):

sudo ldapadd -x -w <<LDAP_ROOT_PASSWORD>> -D "cn=dev,dc=example,dc=com" -f /tmp/accounts.ldif

We have now configured OpenLDAP on the EMR cluster running Spark and created the user sparky that we use to connect to QuickSight.

Start the Thrift server

Start the Thrift server by running the following command. By default, the Thrift server runs on port 10001. Amazon EMR by default places limits on executor sizes to avoid having the executor consume too much memory and interfere with the operating system and other processes running on the instance. To optimize the use of R family instances with the flexibility of also using the smallest supported instance types, we use --executor-memory=18GB --executor-cores=4 for our Thrift server configuration. See the following code:

sudo /usr/lib/spark/sbin/start-thriftserver.sh --master yarn --executor-memory=18GB --executor-cores=4

Now that we have configured the EMR cluster to accept connections, let’s get our public dataset ready.

Configure SSL using a QuickSight supported authority

If deploying into production, we recommend securing communication between QuickSight and Spark. QuickSight doesn’t accept certificates that are self-signed or issued from a non-public CA. For more information, see Amazon QuickSight SSL and CA Certificates. To secure the Thrift connection, you can enable SSL encryption and restart the hive-server2 and Thrift service on the primary EMR instance.

After you have your certificate, you can enable SSL.

In your preferred editor, open and edit /etc/hive/conf/hive-site.xml:

    <property>
        <name>hive.server2.thrift.bind.host</name>
        <value>HOSTNAME</value>
    </property>
    <property>
        <name>hive.server2.use.SSL</name>
        <value>true</value>
    </property>
    <property>
        <name>hive.server2.keystore.path</name>
        <value>PATH_TO_KEYSTORE/KEYSTORE/KEYSTORE.jks</value>
    </property>
    <property>
        <name>hive.server2.keystore.password</name>
        <value>KEYSTORE_PASSWORD</value>
    </property>

Restart the Thrift server by running the following command:

sudo /usr/lib/spark/sbin/stop-thriftserver.sh --master yarn && sudo /usr/lib/spark/sbin/start-thriftserver.sh --master yarn

Run AWS Glue crawlers

Now let’s use AWS Glue crawlers to detect the schema. If you used the CloudFormation template, you already have a crawler ready to start via the AWS Glue console. When the crawler is complete, you should have a table listed in the database.

If you’re configuring the crawler manually on the AWS Glue console, the following screenshot summarizes the crawler configuration.

After the crawler has run, you can go to the Tables page to view the taxi_ny_pub table with the table properties and schema. The following screenshot shows the table details page; here you can find the partitions and various versions of the schema.

The Data Catalog is shared between Amazon EMR, Amazon Athena, and Amazon Redshift Spectrum. You can use Athena to preview the data that is stored in this table.

Test JDBC connectivity using Beeline

Now that the EMR cluster is deployed and the data is copied, we can quickly test the JDBC connectivity on the EMR cluster using Beeline. Beeline is an open-source JDBC client, based on the SQLLine CLI, used to connect to your cluster via the command line.

Log in to your EMR cluster using Session Manager. You can use Beeline to connect to the Thrift server and test the connection:

/usr/lib/spark/bin/beeline -u 'jdbc:hive2://<REPLACE_MASTER_PUBLIC_DNS>:10001/default' -n <<USERNAME>> -p <<PASSWORD>> -e "show databases;" 

The preceding command connects to the Spark cluster and shows you the list of databases, as in the following example code:

Connected to: Spark SQL (version 2.3.0) 
Driver: Hive JDBC (version 1.2.1-spark2-amzn-0) 
Transaction isolation: TRANSACTION_REPEATABLE_READ 
+---------------+--+ 
| databaseName | 
+---------------+--+ 
| default | 
| nyc_taxi |
| sampledb | 
+---------------+--+ 
3 rows selected (0.171 seconds) 
Beeline version 1.2.1-spark2-amzn-0 by Apache Hive 
Closing: 0: 
jdbc:hive2://<REPLACE_MASTER_PUBLIC_DNS>:10001/default

Visualize data with QuickSight

Now let’s connect Amazon EMR to QuickSight and do a quick visualization of this data.

  1. On the QuickSight console, on the Datasets page, choose New dataset.
  2. Choose Spark as your connector.

  3. For Data source name, enter a name (for example, SPARKY).
  4. For Database server, enter your public primary DNS.

To allow QuickSight to connect to your EMR cluster, you must create a security group containing an inbound rule authorizing access from the appropriate IP address range for the QuickSight servers in that Region. For further details on how to create appropriate security group rules, see Authorizing Connections from Amazon QuickSight to Amazon EC2 Instances.

For this post, we use security groups to control network connectivity.

  5. For Port, add TCP port 10001 as an inbound rule to allow for inbound connectivity from QuickSight to Amazon EMR.

If deploying into production, we recommend securing communication between QuickSight and Spark, which we covered in a previous step.

QuickSight Enterprise edition provides full integration with Amazon Virtual Private Cloud (Amazon VPC), which enables you to secure and isolate traffic between resources. For more information, see Connecting to a VPC with Amazon QuickSight. This allows you to deploy your EMR cluster in a private VPC Subnet.

  6. Enter a username and password.
  7. If you configured SSL, select Enable SSL.
  8. Choose Create data source.

The Spark cluster reads the Data Catalog and provides information about the schema and the tables in the schema. You can also choose the table created by the AWS Glue crawler and load the data into SPICE for faster analytics. SPICE is the in-memory calculation engine in QuickSight that provides blazing fast performance at scale. SPICE automatically replicates data for high availability, allowing thousands of users to simultaneously perform fast, interactive analysis, while shielding your underlying data infrastructure, which saves you time and resources. QuickSight supports uploading 250 million rows (and 500 GB) per SPICE dataset. If you have larger datasets than this, you can use the direct query option. In this post, we use SPICE.

Also make sure that you defined the correct permissions to access the S3 bucket for the EMR cluster. For instructions, see Reading and Writing Data to Amazon S3 Using EMRFS.

Let’s create a custom SQL query to perform some aggregations prior to loading into SPICE (see the following screenshot).

  9. Enter the following code:
SELECT 
SUM (cast (fare_amount as double)) as TotalFare 
,AVG(cast (fare_amount as double)) as AvgFare 
,AVG (cast (trip_distance as double)) as AvgTripDistance 
,AVG(passenger_count) as AvgPassengerCount 
,year 
,month
FROM nyc_taxi.taxi_ny_pub
WHERE year BETWEEN 2011 AND 2016
GROUP BY year, month;

The database and table names may vary in your deployment.

  1. For this post, select Import to SPICE for quicker analytics.

Alternatively, because the NYC taxi dataset is larger than 250 million rows, you can choose to directly query your data.

  1. To create a visualization, select the fields in the left panel.

For this post, we review the Average Fare Amount and Passenger Count between 2013 and 2019, using ML Insights to automatically generate natural language narratives when analyzing the 229.12 GB dataset.

Summary

In less than an hour, we created an EMR cluster, enabled OpenLDAP, and started the Thrift server. We also used AWS Glue to crawl a public dataset and visualize the data. Now you have what you need to get started creating powerful dashboards and reports using QuickSight on your Amazon S3 data using Apache Spark. Feel free to reach out if you have any questions or suggestions.

To learn more about these capabilities and start using them in your dashboards, check out the QuickSight User Guide.

If you have questions and suggestions, you can post them on the QuickSight forum.

Go to the QuickSight website to get started now for free.


About the Author

Tom McMeekin is an Enterprise Solutions Architect with a career in technology spanning over 20 years. Tom has worked across a number of industry verticals including Telecommunications, Manufacturing, Infrastructure and Development, Utilities, Energy, and Retail. Throughout his career, he has focused on solving complex business problems through innovative technologies that deliver the right business outcomes for his customers.

 

ERGO Breaks New Frontiers for Insurance with AI Factory on AWS

Post Syndicated from Piotr Klesta original https://aws.amazon.com/blogs/architecture/ergo-breaks-new-frontiers-for-insurance-with-ai-factory-on-aws/

This post is co-authored with Piotr Klesta, Robert Meisner and Lukasz Luszczynski of ERGO

Artificial intelligence (AI) and related technologies are already finding applications in our homes, cars, industries, and offices. The insurance business is no exception to this. When AI is implemented correctly, it adds a major competitive advantage. It enhances the decision-making process, improves efficiency in operations, and provides hassle-free customer assistance.

At ERGO Group, we realized early on that innovation using AI required more flexibility in data integration than most of our legacy data architectures allowed. Our internal governance, data privacy processes, and IT security requirements posed additional challenges towards integration. We had to resolve these issues in order to use AI at the enterprise level, and allow for sensitive data to be used in a cloud environment.

We aimed for a central system that introduces ‘intelligence’ into other core application systems, and thus into ERGO’s business processes. This platform would support the process of development, training, and testing of complex AI models, in addition to creating more operational efficiency. The goal of the platform is to take the undifferentiated heavy lifting away from our data teams so that they focus on what they do best – harness data insights.

Building ERGO AI Factory to power AI use cases

Our quest for this central system led to the creation of AI Factory built on AWS Cloud. ERGO AI Factory is a compliant platform for running production-ready AI use cases. It also provides a flexible model development and testing environment. Let’s look at some of the capabilities and services we offer to our advanced analytics teams.

Figure 1: AI Factory imperatives

Figure 1: AI Factory imperatives

  • Compliance: Enforcing security measures (for example, authentication, encryption, and least privilege) was one of our top priorities for the platform. We worked closely with the security teams to meet strict domain and geo-specific compliance requirements.
  • Data governance: Data lineage and deep metadata extraction are important because they support proper data governance and auditability. They also allow our users to navigate a complex data landscape. Our data ingestion frameworks include a mixture of third party and AWS services to capture and catalog both technical and business metadata.
  • Data storage and access: AI Factory stores data in Amazon Simple Storage Service (S3) in a secure and compliant manner. Access rights are only granted to individuals working on the corresponding projects. Roles are defined in Active Directory.
  • Automated data pipelines: We sought to provide a flexible and robust data integration solution. An ETL pipeline using Apache Spark, Apache Airflow, and Kubernetes pods is central to our data ingestion. We use this for AI model development and subsequent data preparation for operationalization and model integration.
  • Monitoring and security: AI Factory relies on open-source cloud monitoring solutions like Grafana to detect security threats and anomalies. It does this by collecting service and application logs, tracking metrics, and generating alarms.
  • Feedback loop: We store model inputs/outputs and use BI tools, such as Amazon QuickSight, to track the behavior and performance of productive AI models. It’s important to share such information with our business partners so we can build their trust and confidence with AI.
  • Developer-friendly environment: Creating AI models is possible in a notebook-style or integrated development environment. Because our data teams use a variety of machine learning (ML) frameworks and libraries, we keep our platform extensible and our framework agnostic. We support Python/R, Apache Spark, PyTorch and TensorFlow, and more. All this is bolstered by CI/CD processes that accelerate delivery and reduce errors.
  • Business process integration: AI Factory offers services to integrate ML models into existing business processes. We focus on standardizing processes and close collaboration with business and technical stakeholders. Our overarching goal is to operationalize the AI model in the shortest possible timeframe, while preserving high quality and security standards.

AI Factory architecture

So far, we have looked at the functional building blocks of the AI Factory. Let’s take an architectural view of the platform using a five-step workflow:

Figure 2: AI Factory high-level architecture

Figure 2: AI Factory high-level architecture

    1. Data ingestion environment: We use this environment to ingest data from the prominent on-premises ERGO data sources. We can schedule batch or delta data transfers to various cloud destinations using multiple Kubernetes-hosted microservices. Once ingested, data is persisted and cataloged as ERGO’s data lake on Amazon S3. It is prepared for processing by the downstream environments.
  2. Model development environment: This environment is used primarily by data scientists and data engineers. We use Amazon EMR and Amazon SageMaker extensively for data preparation, data wrangling, experimentation with predictive models, and development through rapid iterations.
  3. Model operationalization environment: Trained models with satisfactory KPIs are promoted from the model development to the operationalization environment. This is where we integrate AI models in business processes. The team focuses on launching and optimizing the operation of services and algorithms.
    • Integration with ERGO business processes is achieved using Kubernetes-hosted ‘Model Service.’ This allows us to infuse AI models provided by data scientists in existing business processes.
    • An essential part of model operationalization is to continuously monitor the quality of the deployed ML models using the ‘feedback loop service.’
    4. Model insights environment: This environment is used for displaying information about platform performance, processes, and analytical data. Data scientists use its services to check for unexpected bias or performance drifts that the model could exhibit. Feedback coming from the business through the ‘feedback loop service’ allows them to identify problems fast and retrain the model.
  5. Shared services: Though shown as the fifth step of the workflow, the shared services environment supports almost every step in the process. It provides common, shared components between different parts of the platform managing CI/CD and orchestration processes within the AI factory. Additional services like platform logging and monitoring, authentication, and metadata management are also delivered from the shared services environment.

A binding theme across the various subplatforms is that all provisioning and deployment activities are automated using Infrastructure as Code (IaC) practices. This reduces the potential for human error, provides architectural flexibility, and greatly speeds up software development and our infrastructure-related operations.

All components of the AI factory are run in the AWS Cloud and can be scaled and adapted as needed. The connection between model development and operationalization happens at well-defined interfaces to prevent unnecessary coupling of components.

Lessons learned

Security first

  • Align with security early and often
  • Understand all the regulatory obligations and document them as critical, non-functional requirements

Modular approach

  • Combine modern data science technology and professional IT with a cross-functional, agile way of working
  • Apply loosely coupled services with an API-first approach

Data governance

  • Tracking technical metadata is important but not sufficient; you need business attributes too
  • Determine data ownership in operational systems to map upstream data governance workflows
  • Establish solutions to data masking as the data moves across sub-platforms
  • Define access rights and permissions boundaries among various personas

FinOps strategy

  • Carefully track platform cost
  • Assign owners responsible for monitoring and cost improvements
  • Provide regular feedback to platform stakeholders on usage patterns and associated expenses

Working with our AWS team

  • Establish cadence for architecture review and new feature updates
  • Plan cloud training and enablement

The future for the AI factory

The creation of the AI Factory was an essential building block of ERGO’s strategy. Now we are ready to embrace the next chapter in our advanced analytics journey.

We plan to focus on important use cases that will deliver the highest business value. We want to make the AI Factory available to ERGO’s international subsidiaries. We are also enhancing and scaling its capabilities. We are creating an ‘analytical content hub’ based on automated text extraction, improving speech to text, and developing translation processes for all unstructured and semistructured data using AWS AI services.

Customize and Package Dependencies With Your Apache Spark Applications on Amazon EMR on Amazon EKS

Post Syndicated from Channy Yun original https://aws.amazon.com/blogs/aws/customize-and-package-dependencies-with-your-apache-spark-applications-on-amazon-emr-on-amazon-eks/

Last AWS re:Invent, we announced the general availability of Amazon EMR on Amazon Elastic Kubernetes Service (Amazon EKS), a new deployment option for Amazon EMR that allows customers to automate the provisioning and management of Apache Spark on Amazon EKS.

With Amazon EMR on EKS, customers can deploy EMR applications on the same Amazon EKS cluster as other types of applications, which allows them to share resources and standardize on a single solution for operating and managing all their applications. Customers running Apache Spark on Kubernetes can migrate to EMR on EKS and take advantage of the performance-optimized runtime, integration with Amazon EMR Studio for interactive jobs, integration with Apache Airflow and AWS Step Functions for running pipelines, and Spark UI for debugging.

When customers submit jobs, EMR automatically packages the application into a container with the big data framework and provides prebuilt connectors for integrating with other AWS services. EMR then deploys the application on the EKS cluster and manages running the jobs, logging, and monitoring. If you currently run Apache Spark workloads and use Amazon EKS for other Kubernetes-based applications, you can use EMR on EKS to consolidate these on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management.

Developers who run containerized, big data analytical workloads told us they just want to point to an image and run it. Currently, EMR on EKS dynamically adds externally stored application dependencies during job submission.

Today, I am happy to announce customizable image support for Amazon EMR on EKS, which allows customers to modify the Docker runtime image that runs their analytics applications using Apache Spark on their EKS clusters.

With customizable images, you can create a container that contains both your application and its dependencies, based on the performance-optimized EMR Spark runtime, using your own continuous integration (CI) pipeline. This reduces the time to build the image and makes container launches more predictable for local development and testing.

Now, data engineers and platform teams can create a base image, add their corporate standard libraries, and then store it in Amazon Elastic Container Registry (Amazon ECR). Data scientists can customize the image to include their application-specific dependencies. The resulting immutable image can be scanned for vulnerabilities and deployed to test and production environments. Developers can now simply point to the customized image and run it on EMR on EKS.

Customizable Runtime Images – Getting Started
To get started with customizable images, use the AWS Command Line Interface (AWS CLI) to perform these steps:

  1. Register your EKS cluster with Amazon EMR.
  2. Download the EMR-provided base images from Amazon ECR and modify the image with your application and libraries.
  3. Publish your customized image to a Docker registry such as Amazon ECR and then submit your job while referencing your image.

You can download one of the following base images. These images contain the Spark runtime that can be used to run batch workloads using the EMR Jobs API. See the documentation for the latest full image list.

Release Label     | Spark and Hadoop Versions                 | Base Image Tag
emr-5.32.0-latest | Spark 2.4.7 + Hadoop 2.10.1               | emr-5.32.0-20210129
emr-5.33-latest   | Spark 2.4.7-amzn-1 + Hadoop 2.10.1-amzn-1 | emr-5.33.0-20210323
emr-6.2.0-latest  | Spark 3.0.1 + Hadoop 3.2.1                | emr-6.2.0-20210129
emr-6.3-latest    | Spark 3.1.1-amzn-0 + Hadoop 3.2.1-amzn-3  | emr-6.3.0:latest

These base images are located in an Amazon ECR repository in each AWS Region, with an image URI that combines the ECR registry account, the AWS Region code, and the base image tag. For example, in the US East (N. Virginia) Region:

755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-5.32.0-20210129
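The URI pattern can also be composed programmatically. The following is a hedged sketch: the helper function is ours, only the two registry accounts quoted in this post are included, and the full per-Region mapping lives in the EMR on EKS documentation.

```python
# Registry account per Region, from the examples in this post only;
# see the EMR on EKS documentation for the complete per-Region mapping.
EMR_ON_EKS_REGISTRY = {
    "us-east-1": "755674844232",
    "us-west-2": "895885662937",
}

def base_image_uri(region: str, tag: str) -> str:
    """Compose the ECR URI for an EMR-provided Spark base image."""
    account = EMR_ON_EKS_REGISTRY[region]
    return f"{account}.dkr.ecr.{region}.amazonaws.com/spark/{tag}"

print(base_image_uri("us-east-1", "emr-5.32.0-20210129"))
# prints 755674844232.dkr.ecr.us-east-1.amazonaws.com/spark/emr-5.32.0-20210129
```

A lookup like this is handy in CI pipelines that build the same custom image in several Regions.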

Now, sign in to the Amazon ECR repository and pull the image into your local workspace. If you want to reduce network latency, pull the image from the ECR repository in the Region closest to you. For example, the following commands pull from the US West (Oregon) Region:

$ aws ecr get-login-password --region us-west-2 | docker login --username AWS --password-stdin 895885662937.dkr.ecr.us-west-2.amazonaws.com
$ docker pull 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129

Create a Dockerfile on your local workspace with the EMR-provided base image and add commands to customize the image. If the application requires custom Java SDK, Python, or R libraries, you can add them to the image directly, just as with other containerized applications.

The following example Dockerfile installs Python libraries for natural language processing (NLP) that are useful with Spark and pandas.

FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129
USER root
### Add customizations here ####
# Install Python NLP libraries
RUN pip3 install pyspark pandas spark-nlp
USER hadoop:hadoop

In another use case, as I mentioned, you can install a different version of Java (for example, Java 11):

FROM 895885662937.dkr.ecr.us-west-2.amazonaws.com/spark/emr-5.32.0-20210129
USER root
### Add customizations here ####
# Install Java 11 and set JAVA_HOME
RUN yum install -y java-11-amazon-corretto
ENV JAVA_HOME /usr/lib/jvm/java-11-amazon-corretto.x86_64
USER hadoop:hadoop

If you change the Java version to 11, you also need to change the Java Virtual Machine (JVM) options for Spark. Provide the following options in applicationConfiguration when you submit jobs. You need these options because Java 11 does not support some Java 8 JVM parameters.

"applicationConfiguration": [
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.driver.defaultJavaOptions": "-XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70",
      "spark.executor.defaultJavaOptions": "-verbose:gc -Xlog:gc*::time -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:OnOutOfMemoryError='kill -9 %p' -XX:MaxHeapFreeRatio=70 -XX:+IgnoreUnrecognizedVMOptions"
    }
  }
]

To use custom images with EMR on EKS, publish your customized image and submit a Spark workload in Amazon EMR on EKS using the available Spark parameters.

You can submit batch workloads using your customized Spark image. To submit batch workloads using the StartJobRun API or CLI, use the spark.kubernetes.container.image parameter.

# Use the base EMR release label for the custom image as <base-release-label>
$ aws emr-containers start-job-run \
    --virtual-cluster-id <enter-virtual-cluster-id> \
    --name sample-job-name \
    --execution-role-arn <enter-execution-role-arn> \
    --release-label <base-release-label> \
    --job-driver '{
        "sparkSubmitJobDriver": {
          "entryPoint": "local:///usr/lib/spark/examples/jars/spark-examples.jar",
          "entryPointArguments": ["1000"],
          "sparkSubmitParameters": "--class org.apache.spark.examples.SparkPi --conf spark.kubernetes.container.image=123456789012.dkr.ecr.us-west-2.amazonaws.com/emr5.32_custom"
        }
    }'

Use the kubectl command to confirm the job is running your custom image.

$ kubectl get pod -n <namespace> | grep "driver" | awk '{print $1}'
Example output: k8dfb78cb-a2cc-4101-8837-f28befbadc92-1618856977200-driver

Get the image for the main container in the driver pod (this uses jq):

$ kubectl get pod/<driver-pod-name> -n <namespace> -o json | \
    jq '.spec.containers | .[] | select(.name=="spark-kubernetes-driver") | .image'
Example output: 123456789012.dkr.ecr.us-west-2.amazonaws.com/emr5.32_custom

To view jobs in the Amazon EMR console, under EMR on EKS, choose Virtual clusters. From the list of virtual clusters, select the virtual cluster for which you want to view logs. On the Job runs table, select View logs to view the details of a job run.

Automating Your CI Process and Workflows
You can now customize an EMR-provided base image to include an application to simplify application development and management. With custom images, you can add the dependencies using your existing CI process, which allows you to create a single immutable image that contains the Spark application and all of its dependencies.

You can apply your existing development processes, such as vulnerability scans, against your Amazon EMR image. You can also validate the file structure and runtime versions using the EMR validation tool, which can be run locally or integrated into your CI workflow.

The APIs for Amazon EMR on EKS are integrated with orchestration services like AWS Step Functions and Amazon Managed Workflows for Apache Airflow (Amazon MWAA), allowing you to include EMR custom images in your automated workflows.

Now Available
You can now set up customizable images in all AWS Regions where Amazon EMR on EKS is available. There is no additional charge for custom images. To learn more, see the Amazon EMR on EKS Development Guide and the demo video on how to build your own images for running Spark jobs on Amazon EMR on EKS.

You can send feedback to the AWS forum for Amazon EMR or through your usual AWS support contacts.

Channy

Architecting Persona-centric Data Platform with On-premises Data Sources

Post Syndicated from Raghavarao Sodabathina original https://aws.amazon.com/blogs/architecture/architecting-persona-centric-data-platform-with-on-premises-data-sources/

Many organizations are moving their data from silos and aggregating it in one location. Collecting this data in a data lake enables you to perform analytics and machine learning on that data. You can store your data in purpose-built data stores, like a data warehouse, to get quick results for complex queries on structured data.

In this post, we show how to architect a persona-centric data platform with on-premises data sources by using AWS purpose-built analytics services and Apache NiFi. We will also discuss Lake House architecture on AWS, which is the next evolution from data warehouse and data lake-based solutions.

Data movement services

AWS provides a wide variety of services to bring data into a data lake.

You may want to bring on-premises data into the AWS Cloud to take advantage of AWS purpose-built analytics services, derive insights, and make timely business decisions. Apache NiFi is an open source tool that enables you to move and process data using a graphical user interface.

For this use case and solution architecture, we use Apache NiFi to ingest data into Amazon S3 and AWS purpose-built analytics services, based on user personas.

Building persona-centric data platform on AWS

When you are building a persona-centric data platform for analytics and machine learning, you must first identify your user personas. Who will be using your platform? Then choose the appropriate purpose-built analytics services. Envision a data platform analytics architecture as a stack of seven layers:

  1. User personas: Identify your user personas for data engineering, analytics, and machine learning
  2. Data ingestion layer: Bring the data into your data platform and data lineage lifecycle view, while ingesting data into your storage layer
  3. Storage layer: Store your structured and unstructured data
  4. Cataloging layer: Store your business and technical metadata about datasets from the storage layer
  5. Processing layer: Create data processing pipelines
  6. Consumption layer: Enable your user personas for purpose-built analytics
  7. Security and Governance: Protect your data across the layers

Reference architecture

The following diagram illustrates how to architect a persona-centric data platform with on-premises data sources by using AWS purpose-built analytics services and Apache NiFi.

Figure 1. Example architecture for persona-centric data platform with on-premises data sources

Figure 1. Example architecture for persona-centric data platform with on-premises data sources

Architecture flow:

    1. Identify user personas: You must first identify user personas to derive insights from your data platform. Let’s start with identifying your users:
      • Enterprise data service users who would like to consume data from your data lake into their respective applications.
      • Business users who would like to create business intelligence dashboards by using your data lake datasets.
      • IT users who would like to query data from your data lake by using traditional SQL queries.
      • Data scientists who would like to run machine learning algorithms to derive recommendations.
      • Enterprise data warehouse users who would like to run complex SQL queries on your data warehouse datasets.
    2. Data ingestion layer: Apache NiFi scans the on-premises data stores and ingests the data into your data lake (Amazon S3). Apache NiFi can also transform the data in transit. It supports both Extract, Transform, Load (ETL) and Extract, Load, Transform (ELT) data transformations. Apache NiFi also supports the data lineage lifecycle while ingesting data into Amazon S3.
    3. Storage layer: For your data lake storage, we recommend using Amazon S3 to build a data lake. It offers 11 nines of durability and 99.99% availability. You can also create raw, transformed, and enriched storage layers depending upon your use case.
    4. Cataloging layer: AWS Lake Formation provides the central catalog to store and manage metadata for all datasets hosted in the data lake by AWS Glue Data Catalog. AWS services such as AWS Glue, Amazon EMR, and Amazon Athena natively integrate with Lake Formation. They automate discovering and registering dataset metadata into the Lake Formation catalog.
    5. Processing layer: Amazon EMR processes your raw data and places them into a new S3 bucket. Use AWS Glue DataBrew and AWS Glue to process the data as needed.
    6. Consumption layer or persona-centric analytics: Once data is transformed:
      • AWS Lambda and Amazon API Gateway will allow you to develop data services for enterprise data service users
      • You can develop user-friendly dashboards for your business users using Amazon QuickSight
      • Use Amazon Athena to query transformed data for your IT users
      • Your data scientists can utilize AWS Glue DataBrew to clean and normalize the data and Amazon SageMaker for machine learning models
      • Your enterprise data warehouse users can use Amazon Redshift to derive business intelligence
    7. Security and governance layer: AWS IAM provides user-, group-, and role-level identity, in addition to the ability to configure coarse-grained access control for resources managed by AWS services in all layers. AWS Lake Formation provides fine-grained access controls, and you can grant or revoke permissions at the database, table, or column level.

Lake House architecture on AWS

The vast majority of data lakes are built on Amazon S3. At the same time, customers are leveraging purpose-built analytics stores that are optimized for specific use cases. Customers want the freedom to move data between their centralized data lakes and the surrounding purpose-built analytics stores. And they want to get insights with speed and agility in a seamless, secure, and compliant manner. We call this modern approach to analytics the Lake House architecture.

Figure 2. Lake House architecture on AWS

Figure 2. Lake House architecture on AWS

Refer to the whitepaper Derive Insights from AWS Lake House for various design patterns to derive persona-centric analytics by using the AWS Lake House approach. Check out the blog post Build a Lake House Architecture on AWS for a Lake House reference architecture on AWS.

Conclusion

In this post, we show you how to build a persona-centric data platform on AWS with a seven-layered approach. This uses Apache NiFi as a data ingestion tool and AWS purpose-built analytics services for persona-centric analytics and machine learning. We have also shown how to build persona-centric analytics by using the AWS Lake House approach.

With the information in this post, you can now build your own data platform on AWS to gain faster and deeper insights from your data. AWS provides you the broadest and deepest portfolio of purpose-built analytics and machine learning services to support your business needs.

Read more and get started on building a data platform on AWS:

Improve query performance using AWS Glue partition indexes

Post Syndicated from Noritaka Sekiyama original https://aws.amazon.com/blogs/big-data/improve-query-performance-using-aws-glue-partition-indexes/

While creating data lakes on the cloud, the data catalog is crucial to centralize metadata and make the data visible, searchable, and queryable for users. With the recent exponential growth of data volume, it becomes much more important to optimize data layout and maintain the metadata on cloud storage to keep the value of data lakes.

Partitioning has emerged as an important technique for optimizing data layout so that the data can be queried efficiently by a variety of analytic engines. Data is organized in a hierarchical directory structure based on the distinct values of one or more columns. Over time, hundreds of thousands of partitions get added to a table, resulting in slow queries. To speed up query processing of highly partitioned tables cataloged in AWS Glue Data Catalog, you can take advantage of AWS Glue partition indexes.

Partition indexes are available for queries in Amazon EMR, Amazon Redshift Spectrum, and AWS Glue extract, transform, and load (ETL) jobs (Spark DataFrame). When partition indexes are enabled on heavily partitioned AWS Glue Data Catalog tables, all these query engines are accelerated. You can add partition indexes to both new tables and existing tables. This post demonstrates how to utilize partition indexes, and discusses the benefits you can get with partition indexes when working with highly partitioned data.

Partition indexes

AWS Glue partition indexes are an important configuration to reduce overall data transfers and processing, and reduce query processing time. In the AWS Glue Data Catalog, the GetPartitions API is used to fetch the partitions in the table. The API returns partitions that match the expression provided in the request. If no partition indexes are present on the table, all the partitions of the table are loaded, and then filtered using the query expression provided by the user in the GetPartitions request. The query takes more time to run as the number of partitions increases on a table with no indexes. With an index, the GetPartitions request tries to fetch a subset of the partitions instead of loading all the partitions in the table.
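Conceptually, the two code paths differ as in the following toy sketch (pure Python, not the actual Data Catalog implementation): without an index, every partition is loaded and then filtered; with an index, only a matching subset is fetched before filtering.

```python
# Toy model: partitions keyed by (year, month, day), mapped to S3 locations.
partitions = {
    (year, month, day): f"s3://bucket/year={year}/month={month:02d}/day={day:02d}/"
    for year in ("2020", "2021")
    for month in range(1, 13)
    for day in range(1, 29)
}

def matches(key):
    # Equivalent of the expression "year='2021' and month='04'"
    return key[0] == "2021" and key[1] == 4

def get_partitions_no_index():
    # Without an index: load *every* partition, then filter client-side.
    loaded = list(partitions.items())  # cost grows with total table size
    return sorted(loc for key, loc in loaded if matches(key))

def get_partitions_with_index(index_keys):
    # With an index: fetch only the keys the index points at, then filter.
    subset = ((key, partitions[key]) for key in index_keys)
    return sorted(loc for key, loc in subset if matches(key))

# What a year/month index would return for this expression.
index_keys = [k for k in partitions if k[0] == "2021" and k[1] == 4]

assert get_partitions_no_index() == get_partitions_with_index(index_keys)
print(len(partitions), "partitions scanned without an index;",
      len(index_keys), "fetched with one")
```

Both paths return the same result; the index only changes how many partitions are touched along the way, which is where the speedup comes from.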

The following are key benefits of partition indexes:

  • Increased query performance
  • Increased concurrency as a result of fewer GetPartitions API calls
  • Cost savings:
    • Analytic engine cost (query performance is related to the charges in Amazon EMR and AWS Glue ETL)
    • AWS Glue Data Catalog API request cost

Setting up resources with AWS CloudFormation

This post provides an AWS CloudFormation template for a quick setup. You can review and customize it to suit your needs. Some of the resources that this stack deploys incur costs when in use.

The CloudFormation template generates the following resources:

If you’re using AWS Lake Formation permissions, you need to ensure that the IAM user or role running AWS CloudFormation has the required permissions (to create a database on the Data Catalog).

The tables use sample data located in an Amazon Simple Storage Service (Amazon S3) public bucket. Initially, no partition indexes are configured in these AWS Glue Data Catalog tables.

To create your resources, complete the following steps:

  1. Sign in to the CloudFormation console.
  2. Choose Launch Stack:
  3. Choose Next.
  4. For DatabaseName, leave as the default.
  5. Choose Next.
  6. On the next page, choose Next.
  7. Review the details on the final page and select I acknowledge that AWS CloudFormation might create IAM resources.
  8. Choose Create.

Stack creation can take up to 5 minutes. When the stack is complete, you have two Data Catalog tables: table_with_index and table_without_index. Both tables point to the same S3 bucket, and the data is highly partitioned based on year, month, day, and hour columns, spanning 42 years (1980–2021). In total, there are 367,920 partitions, and each partition has one JSON file, data.json. In the following sections, you see how the partition indexes work with these sample tables.
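As a sanity check, the partition count follows directly from that layout; the sample data evidently uses 365 days per year (no leap-day partitions):

```python
years = 42           # 1980 through 2021 inclusive
days_per_year = 365  # the sample data has no leap-day partitions
hours_per_day = 24

total_partitions = years * days_per_year * hours_per_day
print(total_partitions)  # prints 367920, matching the count quoted above
```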

Setting up a partition index on the AWS Glue console

You can create partition indexes at any time. If you want to create a new table with partition indexes, you can make the CreateTable API call with a list of PartitionIndex objects. If you want to add a partition index to an existing table, make the CreatePartitionIndex API call. You can also perform these actions on the AWS Glue console. You can create up to three partition indexes on a table.
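For the API route, the request body can be assembled as a plain dictionary and handed to the SDK. The following sketch uses the database and table names from this post; the helper function is ours, and the boto3 call is commented out because it requires AWS credentials and an existing table.

```python
def build_partition_index_request(database, table, index_name, keys):
    """Build the kwargs for a Glue CreatePartitionIndex API call."""
    return {
        "DatabaseName": database,
        "TableName": table,
        "PartitionIndex": {"IndexName": index_name, "Keys": list(keys)},
    }

request = build_partition_index_request(
    "partition_index", "table_with_index",
    "year-month-day-hour", ["year", "month", "day", "hour"],
)
print(request)

# With credentials configured, the request could be sent with boto3:
# import boto3
# boto3.client("glue").create_partition_index(**request)
```

The index name and key list mirror the console steps that follow.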

Let’s configure a new partition index for the table table_with_index we created with the CloudFormation template.

  1. On the AWS Glue console, choose Tables.
  2. Choose the table table_with_index.
  3. Choose Partitions and indices.
  4. Choose Add new index.
  5. For Index name, enter year-month-day-hour.
  6. For Selected keys from schema, select year, month, day, and hour.
  7. Choose Add index.

The Status column of the newly created partition index shows the status as Creating. Wait for the partition index to become Active. The process takes about 1 hour; index creation time grows with the number of partitions, and this table has 367,920 of them.

Now the partition index is ready for the table table_with_index. You can use this index from various analytic engines when you query against the table. You see default behavior in the table table_without_index because no partition indexes are configured for this table.

You can follow (or skip) any of the following sections based on your interest.

Making a GetPartitions API call with an expression

Before we use the partition index from various query engines, let’s try making the GetPartitions API call using AWS Command Line Interface (AWS CLI) to see the difference. The AWS CLI get-partitions command makes multiple GetPartitions API calls if needed. In this section, we simply use the time command to compare the duration for each table, and use the debug logging to compare the number of API calls for each table.

  1. Run the get-partitions command against the table table_without_index with the expression year='2021' and month='04' and day='01':
    $ time aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    3m57.438s
    user    0m2.872s
    sys    0m0.248s
    

The command took about 4 minutes. Note that you used only three partition columns out of four.

  2. Run the same command with debug logging to get the number of GetPartitions API calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_without_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-without-index.log
    $ cat get-partitions-without-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
         737

There were 737 GetPartitions API calls when the partition indexes aren’t used.

  3. Next, run the get-partitions command against table_with_index with the same expression:
    $ time aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'"
    ...
    real    0m2.697s
    user    0m0.442s
    sys    0m0.163s

The command took just 2.7 seconds. You can see how quickly the required partitions were returned.

  4. Run the same command with debug logging to get the number of GetPartitions API calls:
    $ aws glue get-partitions --database-name partition_index --table-name table_with_index --expression "year='2021' and month='04' and day='01'" --debug 2>get-partitions-with-index.log
    $ cat get-partitions-with-index.log | grep x-amz-target:AWSGlue.GetPartitions | wc -l
           4
    

There were only four GetPartitions API calls when the partition indexes are used.
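You can reproduce this call counting programmatically. The following sketch (an illustration, not the post's code; it assumes boto3 credentials and the table names above) uses the boto3 `get_partitions` paginator, where each page corresponds to one GetPartitions API call, and a small helper that is testable without AWS access:

```python
def summarize_partition_pages(pages):
    """Given an iterable of GetPartitions response pages, return a tuple of
    (api_call_count, partition_count). Each page maps 'Partitions' to a list."""
    calls = 0
    partitions = 0
    for page in pages:
        calls += 1
        partitions += len(page.get("Partitions", []))
    return calls, partitions


# With boto3 (not run here), each paginated page is one GetPartitions call,
# mirroring the debug-log counting above:
# import boto3
# paginator = boto3.client("glue").get_paginator("get_partitions")
# pages = paginator.paginate(
#     DatabaseName="partition_index",
#     TableName="table_with_index",
#     Expression="year='2021' and month='04' and day='01'",
# )
# print(summarize_partition_pages(pages))
```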

Querying a table using Apache Spark on Amazon EMR

In this section, we explore querying a table using Apache Spark on Amazon EMR.

  1. Launch a new EMR cluster with Apache Spark.

For instructions, see Setting Up Amazon EMR. You need to specify the AWS Glue Data Catalog as the metastore. In this example, we use the default EMR cluster (release: emr-6.2.0, three m5.xlarge nodes).

  2. Connect to the EMR node using SSH.
  3. Run the spark-sql command on the EMR node to start an interactive shell for Spark SQL:
    $ spark-sql

  4. Run the following SQL against partition_index.table_without_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 35.518 seconds, Fetched 1 row(s)

The query took 35 seconds. Even though the query aggregates records from only a single day, it took this long because the table has many partitions and the GetPartitions API calls take time.

Now let’s run the same query against table_with_index to see how much benefit the partition index introduces.

  5. Run the following SQL against partition_index.table_with_index:
    spark-sql> SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01';
    24    13840.894731640636
    Time taken: 2.247 seconds, Fetched 1 row(s)

The query took just 2 seconds. The difference in query duration comes from the far smaller number of GetPartitions calls, thanks to the partition index.

The following chart shows the granular metrics for query planning time without and with the partition index. The query planning time with the index is far less than that without the index.

For more information about comparing metrics in Apache Spark, see Appendix 2 at the end of this post.

Querying a table using Redshift Spectrum

To query with Redshift Spectrum, complete the following steps:

  1. Launch a new Redshift cluster.

You need to configure an IAM role for the cluster to use Redshift Spectrum and the Amazon Redshift query editor. In this example, choose one dc2.large node. Launch the cluster in the us-east-1 Region, because the cluster must be in the same Region as the S3 bucket holding the sample data.

  2. Connect with the Redshift query editor. For instructions, see Querying a database using the query editor.
  3. Create an external schema for the partition_index database to use it in Redshift Spectrum (replace <your IAM role ARN> with your IAM role ARN):
    create external schema spectrum from data catalog 
    database 'partition_index' 
    iam_role '<your IAM role ARN>'
    create external database if not exists;

  4. Run the following SQL against spectrum.table_without_index:
    SELECT count(*), sum(value) FROM spectrum.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took more than 3 minutes.

  5. Run the following SQL against spectrum.table_with_index:
    SELECT count(*), sum(value) FROM spectrum.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query for the table using indexes took just 8 seconds, which is much faster than the table without indexes.

Querying a table using AWS Glue ETL

Let’s launch an AWS Glue development endpoint and an Amazon SageMaker notebook.

  1. Open the AWS Glue console, choose Dev endpoints.
  2. Choose Add endpoint.
  3. For Development endpoint name, enter partition-index.
  4. For IAM role, choose your IAM role.

For more information about roles, see Managing Access Permissions for AWS Glue Resources.

  5. For Worker type under Security configuration, script libraries, and job parameters (optional), choose G.1X.
  6. For Number of workers, enter 4.
  7. For Dependent jar path, enter s3://crawler-public/json/serde/json-serde.jar.
  8. Select Use Glue data catalog as the Hive metastore under Catalog options (optional).
  9. Choose Next.
  10. For Networking, leave as is (by default, Skip networking configuration is selected), and choose Next.
  11. For Add an SSH public key (Optional), leave it blank, and choose Next.
  12. Choose Finish.
  13. Wait for the development endpoint partition-index to show as READY.

The endpoint may take up to 10 minutes to be ready.

  14. Select the development endpoint partition-index, and choose Create SageMaker notebook on the Actions menu.
  15. For Notebook name, enter partition-index.
  16. Select Create an IAM role.
  17. For IAM role, enter partition-index.
  18. Choose Create notebook.
  19. Wait for the notebook aws-glue-partition-index to show the status as Ready.

The notebook may take up to 3 minutes to be ready.

  20. Select the notebook aws-glue-partition-index, and choose Open notebook.
  21. Choose Sparkmagic (PySpark) on the New menu.
  22. Enter the following code snippet against table_without_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The query took 3 minutes.

  23. Enter the following code snippet against partition_index.table_with_index, and run the cell:
    %%time
    %%sql
    SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'

The following screenshot shows our output.

The cell took just 7 seconds. The query for the table using indexes is faster than the table without indexes.

Cleaning up

Now to the final step, cleaning up the resources:

  1. Delete the CloudFormation stack. 
  2. Delete the EMR cluster.
  3. Delete the Amazon Redshift cluster.
  4. Delete the AWS Glue development endpoint and SageMaker notebook.

Conclusion

In this post, we explained how to use partition indexes and how they accelerate queries in various query engines. If you have millions of partitions, the performance benefit is even greater. To learn more about partition indexes, see Working with Partition Indexes.


Appendix 1: Setting up a partition index using AWS CLI

If you prefer using the AWS CLI, run the following create-partition-index command to set up a partition index:

$ aws glue create-partition-index --database-name partition_index --table-name table_with_index --partition-index Keys=year,month,day,hour,IndexName=year-month-day-hour

To get the status of the partition index, run the following get-partition-indexes command:

$ aws glue get-partition-indexes --database-name partition_index --table-name table_with_index
{
    "PartitionIndexDescriptorList": [
        {
            "IndexName": "year-month-day-hour",
            "Keys": [
                {
                    "Name": "year",
                    "Type": "string"
                },
                {
                    "Name": "month",
                    "Type": "string"
                },
                {
                    "Name": "day",
                    "Type": "string"
                },
                {
                    "Name": "hour",
                    "Type": "string"
                }
            ],
            "IndexStatus": "CREATING"
        }
    ]
}

Appendix 2: Comparing breakdown metrics in Apache Spark

If you’re interested in comparing the breakdown metrics for query planning time, you can register a SQL listener with the following Scala code snippet:

spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
  override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
    val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
    println(metricMap.toSeq)
  }
  override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
})

If you use spark-shell, you can register the listener as follows:

$ spark-shell
...
scala> spark.listenerManager.register(new org.apache.spark.sql.util.QueryExecutionListener {
     |   override def onSuccess(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, durationNs: Long): Unit = {
     |     val metricMap = qe.tracker.phases.mapValues { ps => ps.endTimeMs - ps.startTimeMs }
     |     println(metricMap.toSeq)
     |   }
     |   override def onFailure(funcName: String, qe: org.apache.spark.sql.execution.QueryExecution, exception: Exception): Unit = {}
     | })

Then run the same query without using the index to get the breakdown metrics:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_without_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,208), (optimization,29002), (analysis,4))
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640632|
+--------+------------------+

In this example, we use the same setup for the EMR cluster (release: emr-6.2.0, three m5.xlarge nodes). The console output includes an additional line:

Vector((planning,208), (optimization,29002), (analysis,4)) 

Apache Spark’s query planning mechanism has three phases: analysis, optimization, and physical planning (shown as just planning). This line means that the query planning took 4 milliseconds in analysis, 29,002 milliseconds in optimization, and 208 milliseconds in physical planning.

Let’s try running the same query using the index:

scala> spark.sql("SELECT count(*), sum(value) FROM partition_index.table_with_index WHERE year='2021' AND month='04' AND day='01'").show()
Vector((planning,7), (optimization,608), (analysis,2))                          
+--------+------------------+
|count(1)|        sum(value)|
+--------+------------------+
|      24|13840.894731640634|
+--------+------------------+

The query planning took 2 milliseconds in analysis, 608 milliseconds in optimization, and 7 milliseconds in physical planning.


About the Authors

Noritaka Sekiyama is a Senior Big Data Architect at AWS Glue and AWS Lake Formation. He is passionate about big data technology and open source software, and enjoys building and experimenting in the analytics area.

Sachet Saurabh is a Senior Software Development Engineer at AWS Glue and AWS Lake Formation. He is passionate about building fault tolerant and reliable distributed systems at scale.

Vikas Malik is a Software Development Manager at AWS Glue. He enjoys building solutions that solve business problems at scale. In his free time, he likes playing and gardening with his kids and exploring local areas with family.

Amazon EMR 2020 year in review

Post Syndicated from Abhishek Sinha original https://aws.amazon.com/blogs/big-data/amazon-emr-2020-year-in-review/

Tens of thousands of customers use Amazon EMR to run big data analytics applications on Apache Spark, Apache Hive, Apache HBase, Apache Flink, Apache Hudi, and Presto at scale. Amazon EMR automates the provisioning and scaling of these frameworks, and delivers high performance at low cost with optimized runtimes and support for a wide range of Amazon Elastic Compute Cloud (Amazon EC2) instance types and Amazon Elastic Kubernetes Service (Amazon EKS) clusters. Amazon EMR makes it easy for data engineers and data scientists to develop, visualize, and debug data science applications with Amazon EMR Studio (preview) and Amazon EMR Notebooks.

You can hear customers describe how they use Amazon EMR in the following 2020 AWS re:Invent sessions:

You can also find more information in the following posts:

Throughout 2020, we worked to deliver better Amazon EMR performance at a lower price, and to make Amazon EMR easier to manage and use for big data analytics within your Lake House Architecture. This post summarizes the key improvements during the year and provides links to additional information.

Differentiated engine performance

Amazon EMR simplifies building and operating big data environments and applications. You can launch an EMR cluster in minutes. You don’t need to worry about infrastructure provisioning, cluster setup, configuration, or tuning. Amazon EMR takes care of these tasks, allowing you to focus your teams on developing differentiated big data applications. In addition to eliminating the need for you to build and manage your own infrastructure to run big data applications, Amazon EMR gives you better performance than simply using open-source distributions, and provides 100% API compatibility. This means you can run your workloads faster without changing any code.

Amazon EMR runtime for Apache Spark is a performance-optimized runtime environment for Spark that is active by default. We first introduced the EMR runtime for Apache Spark in Amazon EMR release 5.28.0 in November 2019, and used queries based on the TPC-DS benchmark to measure the performance improvement over open-source Spark 2.4. Those results showed considerable improvement: the geometric mean in query execution time was 2.4 times faster and the total query runtime was 3.2 times faster. As discussed in Turbocharging Query Execution on Amazon EMR at AWS re:Invent 2020, we’ve continued to improve the runtime, and our latest results show that Amazon EMR 5.30 is three times faster than without the runtime, which means you can run petabyte-scale analysis at less than half the cost of traditional on-premises solutions. For more information, see How Drop used the EMR runtime for Apache Spark to halve costs and get results 5.4 times faster.

We’ve also improved Hive and PrestoDB performance. In April 2020, we announced support for Hive Low Latency Analytical Processing (LLAP) as a YARN service starting with Amazon EMR 6.0. Our tests show that Apache Hive is two times faster with Hive LLAP on Amazon EMR 6.0. You can choose to use Hive LLAP or dynamically allocated containers. In May 2020, we introduced the Amazon EMR runtime for PrestoDB in Amazon EMR 5.30. Our most recent tests based on TPC-DS benchmark queries compare Amazon EMR 5.31, which uses the runtime, to Amazon EMR 5.29, which does not. The geometric mean in query execution time is 2.6 times faster with Amazon EMR 5.31 and the runtime for PrestoDB.

Simpler incremental data processing

Apache Hudi (Hadoop Upserts, Deletes, and Incrementals) is an open-source data management framework used for simplifying incremental data processing and data pipeline development. You can use it to perform record-level inserts, updates, and deletes in Amazon Simple Storage Service (Amazon S3) data lakes, thereby simplifying building change data capture (CDC) pipelines. With this capability, you can comply with data privacy regulations and simplify data ingestion pipelines that deal with late-arriving or updated records from sources like streaming inputs and CDC from transactional systems. Apache Hudi integrates with open-source big data analytics frameworks like Apache Spark, Apache Hive, and Presto, and allows you to maintain data in Amazon S3 or HDFS in open formats like Apache Parquet and Apache Avro.
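To make the upsert workflow concrete, the following is a minimal sketch of the Hudi datasource options a PySpark job on Amazon EMR would pass when writing record-level changes to Amazon S3. The option keys are standard Hudi write options; the table name, field names, and S3 path ("usage_events", "event_id", "updated_at", "year", the bucket) are illustrative placeholders, not from this post:

```python
def hudi_upsert_options(table_name, record_key, precombine_key, partition_field):
    """Build the Hudi datasource options for a record-level upsert.

    The precombine field decides which record wins when two records share
    the same key (for example, a last-updated timestamp)."""
    return {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.operation": "upsert",
        "hoodie.datasource.write.recordkey.field": record_key,
        "hoodie.datasource.write.precombine.field": precombine_key,
        "hoodie.datasource.write.partitionpath.field": partition_field,
    }


# On an EMR cluster with Hudi available, you would apply these options to a
# DataFrame write (not run here):
# (df.write.format("hudi")
#    .options(**hudi_upsert_options("usage_events", "event_id",
#                                   "updated_at", "year"))
#    .mode("append")
#    .save("s3://my-bucket/hudi/usage_events/"))
```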

We first supported Apache Hudi starting with Amazon EMR release 5.28 in November 2019. In June 2020, Apache Hudi graduated from incubator with release 0.6.0, which we support with Amazon EMR releases 5.31.0, 6.2.0, and higher. The Amazon EMR team collaborated with the Apache Hudi community to create a new bootstrap operation, which allows you to use Hudi with your existing Parquet datasets without needing to rewrite the dataset. This bootstrap operation accelerates the process of creating a new Apache Hudi dataset from existing datasets—in our tests using a 1 TB Parquet dataset on Amazon S3, the bootstrap performed five times faster than bulk insert.

Also in June 2020, starting with Amazon EMR release 5.30.0, we added support for the HoodieDeltaStreamer utility, which provides an easy way to ingest data from many sources, including AWS Data Migration Services (AWS DMS). With this integration, you can now ingest data from upstream relational databases to your S3 data lakes in a seamless, efficient, and continuous manner. For more information, see Apply record level changes from relational databases to Amazon S3 data lake using Apache Hudi on Amazon EMR and AWS Database Migration Service.

Amazon Athena and Amazon Redshift Spectrum added support for querying Apache Hudi datasets in S3-based data lakes—Athena announcing in July 2020 and Redshift Spectrum announcing in September. Now, you can query the latest snapshot of Apache Hudi Copy-on-Write (CoW) datasets from both Athena and Redshift Spectrum, even while you continue to use Apache Hudi support in Amazon EMR to make changes to the dataset.

Differentiated instance performance

In addition to providing better software performance with Amazon EMR runtimes, we offer more instance options than any other cloud provider, allowing you to choose the instance that gives you the best performance and cost for your workload. You choose what types of EC2 instances to provision in your cluster (standard, high memory, high CPU, high I/O) based on your application’s requirements, and fully customize your cluster to suit your requirements.

In December 2020, we announced that Amazon EMR now supports M6g, C6g, and R6g instances with versions 6.1.0, 5.31.0 and later, which enables you to use instances powered by AWS Graviton2 processors. Graviton2 processors are custom designed by AWS using 64-bit Arm Neoverse cores to deliver the best price performance for cloud workloads running in Amazon EC2. Although your performance benefit will vary based on the unique characteristics of your workloads, our tests based on the TPC-DS 3 TB benchmark showed that the EMR runtime for Apache Spark provides up to 15% improved performance and up to 30% lower costs on Graviton2 instances relative to equivalent previous generation instances.

Easier cluster optimization

We’ve also made it easier to optimize your EMR clusters. In July 2020, we introduced Amazon EMR Managed Scaling, a new feature that automatically resizes your EMR clusters for best performance at the lowest possible cost. EMR Managed Scaling eliminates the need to predict workload patterns in advance or write custom automatic scaling rules that depend on an in-depth understanding of the application framework (for example, Apache Spark or Apache Hive). Instead, you specify the minimum and maximum compute resource limits for your clusters, and Amazon EMR constantly monitors key metrics based on the workload and optimizes the cluster size for best resource utilization. Amazon EMR can scale the cluster up during peaks and scale it down gracefully during idle periods, reducing your costs by 20–60% and optimizing cluster capacity for best performance.
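Because you specify only the minimum and maximum compute limits, enabling EMR Managed Scaling is a small API payload. The following is a hedged sketch (the cluster ID is a placeholder; it assumes boto3 credentials) of the PutManagedScalingPolicy request:

```python
def managed_scaling_policy(min_units, max_units, unit_type="Instances"):
    """Build the ManagedScalingPolicy payload for PutManagedScalingPolicy.

    EMR monitors workload metrics and resizes the cluster between
    min_units and max_units; you don't write scaling rules yourself."""
    if min_units > max_units:
        raise ValueError("minimum capacity cannot exceed maximum capacity")
    return {
        "ComputeLimits": {
            "UnitType": unit_type,
            "MinimumCapacityUnits": min_units,
            "MaximumCapacityUnits": max_units,
        }
    }


# Applying it to a running cluster (cluster ID is a placeholder, not run here):
# import boto3
# boto3.client("emr").put_managed_scaling_policy(
#     ClusterId="j-XXXXXXXXXXXXX",
#     ManagedScalingPolicy=managed_scaling_policy(2, 20),
# )
```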

EMR Managed Scaling is supported for Apache Spark, Apache Hive, and YARN-based workloads on Amazon EMR versions 5.30.1 and above. EMR Managed Scaling supports EMR instance fleets, enabling you to seamlessly scale Spot Instances, On-Demand Instances, and instances that are part of a Savings Plan, all within the same cluster. You can take advantage of Managed Scaling and instance fleets to provision the cluster capacity that has the lowest chance of getting interrupted, for the lowest cost.

In October 2020, we announced Amazon EMR support for the capacity-optimized allocation strategy for provisioning EC2 Spot Instances. The capacity-optimized allocation strategy automatically makes the most efficient use of available spare capacity while still taking advantage of the steep discounts offered by Spot Instances. You can now specify up to 15 instance types in your EMR task instance fleet configuration. This provides Amazon EMR with more options in choosing the optimal pools to launch Spot Instances from in order to decrease chances of Spot interruptions, and increases the ability to relaunch capacity using other instance types in case Spot Instances are interrupted when Amazon EC2 needs the capacity back.
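The configuration above can be sketched as a task instance fleet definition. This is an illustrative fragment (the instance types and timeout values are example choices, not recommendations from this post) showing where the capacity-optimized allocation strategy and the 15-instance-type limit fit:

```python
def spot_task_fleet(instance_types, target_spot_capacity):
    """Build a TASK instance fleet using the capacity-optimized Spot
    allocation strategy. EMR picks the Spot pools least likely to be
    interrupted from the listed instance types."""
    if len(instance_types) > 15:
        raise ValueError("a task instance fleet allows at most 15 instance types")
    return {
        "InstanceFleetType": "TASK",
        "TargetSpotCapacity": target_spot_capacity,
        "InstanceTypeConfigs": [{"InstanceType": t} for t in instance_types],
        "LaunchSpecifications": {
            "SpotSpecification": {
                "AllocationStrategy": "capacity-optimized",
                # Fall back to On-Demand if Spot capacity isn't found in time.
                "TimeoutDurationMinutes": 10,
                "TimeoutAction": "SWITCH_TO_ON_DEMAND",
            }
        },
    }


# Example: three interchangeable instance types for a target of 10 units.
fleet = spot_task_fleet(["m5.xlarge", "m5a.xlarge", "r5.xlarge"], 10)
```

Listing several similarly sized instance types gives EMR more pools to choose from, which lowers the chance of Spot interruption.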

For more information, see How Nielsen built a multi-petabyte data platform using Amazon EMR and Contextual targeting and ad tech migration best practices.

Workload consolidation

Previously, you had to choose between using fully managed Amazon EMR on Amazon EC2 or self-managing Apache Spark on Amazon EKS. When you use Amazon EMR on Amazon EC2, you can choose from a wide range of EC2 instance types to meet price and performance requirements, but you can’t run multiple versions of Apache Spark or other applications on a cluster, and you can’t use unused capacity for non-Amazon EMR applications. When you self-manage Apache Spark on Amazon EKS, you have to do the heavy lifting of installing, managing, and optimizing Apache Spark to run on Kubernetes, and you don’t get the benefit of optimized runtimes in Amazon EMR.

You no longer have to choose. In December 2020, we announced the general availability of Amazon EMR on Amazon EKS, a new deployment option for Amazon EMR that allows you to run fully managed open-source big data frameworks on Amazon EKS. If you already use Amazon EMR, you can now consolidate Amazon EMR-based applications with other Kubernetes-based applications on the same Amazon EKS cluster to improve resource utilization and simplify infrastructure management using common Amazon EKS tools. If you currently self-manage big data frameworks on Amazon EKS, you can now use Amazon EMR to automate provisioning and management, and take advantage of the optimized Amazon EMR runtimes to deliver better performance at lower cost.

Amazon EMR on EKS enables your team to collaborate more efficiently. You can run applications on a common pool of resources without having to provision infrastructure, and co-locate multiple Amazon EMR versions on a single Amazon EKS cluster to rapidly test and verify new Amazon EMR versions and the included open-source frameworks. You can improve developer productivity with faster cluster startup times because Amazon EMR application containers on existing Amazon EKS cluster instances start within 15 seconds, whereas creating new clusters of EC2 instances can take several minutes. You can use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to programmatically author, schedule, and monitor workflows, and use EMR Studio (preview) to develop, visualize, and debug applications. We discuss Amazon MWAA and EMR Studio more in the next section.

For more information, see Run Spark on Kubernetes with Amazon EMR on Amazon EKS and Amazon EMR on EKS Development Guide.

Higher developer productivity

Of course, your goal with Amazon EMR is not only to achieve the best price performance for your big data analytics workloads, but also to deliver new insights that help you run your business.

In November 2020, we announced Amazon MWAA, a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines. Airflow workflows retrieve input from sources like Amazon S3 using Athena queries, perform transformations on EMR clusters, and can use the resulting data to train machine learning (ML) models on Amazon SageMaker. Workflows in Airflow are authored as Directed Acyclic Graphs (DAGs) using the Python programming language.

At AWS re:Invent 2020, we introduced the preview of EMR Studio, a new notebook-first integrated development environment (IDE) experience with Amazon EMR. EMR Studio makes it easy for data scientists to develop, visualize, and debug applications written in R, Python, Scala, and PySpark. It provides fully managed Jupyter notebooks and tools like Spark UI and YARN Timeline Service to simplify debugging. You can install custom Python libraries or Jupyter kernels required for your applications directly to your EMR clusters, and can connect to code repositories such as AWS CodeCommit, GitHub, and Bitbucket to collaborate with peers. EMR Studio uses AWS Single Sign-On (AWS SSO), enabling you to log in directly with your corporate credentials without signing in to the AWS Management Console.

EMR Studio kernels and applications run on EMR clusters, so you get the benefit of distributed data processing using the performance-optimized EMR runtime for Apache Spark. You can create cluster templates in AWS Service Catalog to simplify running jobs for your data scientists and data engineers, and can take advantage of EMR clusters running on Amazon EC2, Amazon EKS, or both. For example, you might reuse existing EC2 instances in your shared Kubernetes cluster to enable fast startup time for development work and ad hoc analysis, and use EMR clusters on Amazon EC2 to ensure the best performance for frequently run, long-running workloads.

To learn more, see Introducing a new notebook-first IDE experience with Amazon EMR and Amazon EMR Studio.

Unified governance

At AWS, we recommend you use a Lake House Architecture to modernize your data and analytics infrastructure in the cloud. A Lake House Architecture acknowledges the idea that taking a one-size-fits-all approach to analytics eventually leads to compromises. It’s not simply about integrating a data lake with a data warehouse, but rather about integrating a data lake, data warehouse, and purpose-built analytics services, and enabling unified governance and easy data movement. For more information about this approach, see Harness the power of your data with AWS Analytics by Rahul Pathak, and his AWS re:Invent 2020 analytics leadership session.

As shown in the following diagram, Amazon EMR is one element in a Lake House Architecture on AWS, along with Amazon S3, Amazon Redshift, and more.

One of the most important pieces of a modern analytics architecture is the ability for you to authorize, manage, and audit access to data. AWS gives you the fine-grained access control and governance you need to manage access to data across a data lake and purpose-built data stores and analytics services from a single point of control.

In October 2020, we announced the general availability of Amazon EMR integration with AWS Lake Formation. By integrating Amazon EMR with AWS Lake Formation, you can enhance data access control on multi-tenant EMR clusters by managing Amazon S3 data access at the level of databases, tables, and columns. This feature also enables SAML-based single sign-on to EMR Notebooks and Apache Zeppelin, and simplifies the authentication for organizations using Active Directory Federation Services (ADFS). With this integration, you have a single place to manage data access for Amazon EMR, along with the other AWS analytics services shown in the preceding diagram. At AWS re:Invent 2020, we announced the preview of row-level security for Lake Formation, which makes it even easier to control access for all the people and applications that need to share data.

In January 2021, we introduced Amazon EMR integration with Apache Ranger. Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka. Starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects.

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.

To learn more, see Integrate Amazon EMR with AWS Lake Formation and Integrate Amazon EMR with Apache Ranger.

Jumpstart your migration to Amazon EMR

Building a modern data platform using the Lake House Architecture enables you to collect data of all types, store it in a central, secure repository, and analyze it with purpose-built tools like Amazon EMR. Migrating your big data and ML to AWS and Amazon EMR offers many advantages over on-premises deployments. These include separation of compute and storage, increased agility, resilient and persistent storage, and managed services that provide up-to-date, familiar environments to develop and operate big data applications. We can help you design, deploy, and architect your analytics application workloads in AWS and help you migrate your big data and applications.

The AWS Well-Architected Framework helps you understand the pros and cons of decisions you make while building systems on AWS. By using the framework, you learn architectural best practices for designing and operating reliable, secure, efficient, and cost-effective systems in the cloud, and ways to consistently measure your architectures against best practices and identify areas for improvement. In May 2020, we announced the Analytics Lens for the AWS Well-Architected Framework, which offers comprehensive guidance to make sure that your analytics applications are designed in accordance with AWS best practices. We believe that having well-architected systems greatly increases the likelihood of business success.

To move to Amazon EMR, you can download the Amazon EMR migration guide to follow step-by-step instructions, get guidance on key design decisions, and learn best practices. You can also request an Amazon EMR Migration Workshop, a virtual workshop to jumpstart your Apache Hadoop/Spark migration to Amazon EMR. You can also learn how AWS partners have helped customers migrate to Amazon EMR in Mactores’s Seagate case study, Cloudwick’s on-premises to AWS Cloud migration to drive cost efficiency, and DNM’s global analytics platform for the cinema industry.


About the Authors

Abhishek Sinha is a Principal Product Manager at Amazon Web Services.

Al MS is a product manager for Amazon EMR at Amazon Web Services.

BJ Haberkorn is principal product marketing manager for analytics at Amazon Web Services. BJ has worked previously on voice technology including Amazon Alexa, real time communications systems, and processor design. He holds BS and MS degrees in electrical engineering from the University of Virginia.

Amazon MSK backup for Archival, Replay, or Analytics

Post Syndicated from Rohit Yadav original https://aws.amazon.com/blogs/architecture/amazon-msk-backup-for-archival-replay-or-analytics/

Amazon MSK is a fully managed service that helps you build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes. You can also stream changes to and from databases, and power machine learning and analytics applications.

Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. MSK manages the provisioning, configuration, and maintenance of resources for highly available Kafka clusters. It is fully compatible with Apache Kafka and supports familiar community-built tools such as MirrorMaker 2.0, Kafka Connect, and Kafka Streams.

Introduction

In the past few years, the volume of data that companies must ingest has increased significantly. Information comes from various sources, like transactional databases, system logs, SaaS platforms, and mobile and IoT devices. Businesses want to act as soon as the data arrives. This has resulted in increased adoption of scalable real-time streaming solutions, which scale horizontally to provide the needed throughput to process data in real time, with milliseconds of latency. Customers have adopted Amazon MSK as a top choice of streaming platform. Amazon MSK gives you the flexibility to retain topic data longer term (the default is 7 days), which supports replay, analytics, and machine learning use cases. When IT and business systems are producing and processing terabytes of data per hour, it can become expensive to store, manage, and retrieve data. This has led legacy data archival processes to move toward cheaper, reliable, long-term storage solutions like Amazon Simple Storage Service (Amazon S3).

Following are some of the benefits of archiving Amazon MSK topic data to Amazon S3:

  1. Reduced Cost – You only need to retain the data in the cluster based on your Recovery Point Objective (RPO). Any historical data can be archived in Amazon S3 and replayed if necessary.
  2. Integration with Enterprise Data Lake – Because your data is available in S3, you can now integrate with other data analytics services like Amazon EMR, AWS Glue, and Amazon Athena to run data aggregation and analytics. For example, you can build reports to visualize month-over-month changes.
  3. Optimize Machine Learning Workloads – Machine learning applications can train new models and improve predictions using historical streams of data available in Amazon S3. This also enables better integration with AWS machine learning services.
  4. Compliance – Long-term data archival for regulatory and security compliance.
  5. Backloading data to other systems – Ability to rebuild data in other application environments, such as pre-production, testing, and more.

There are many benefits to using Amazon S3 as long-term storage for Amazon MSK topics. Let’s dive deeper into the recommended architecture for this pattern. We will present an architecture to back up Amazon MSK topics to Amazon S3 in real time. In addition, we’ll demonstrate some of the use cases previously mentioned.

Architecture

The following diagram illustrates the architecture for building a real-time archival pipeline to archive Amazon MSK topics to Amazon S3. This architecture uses an AWS Lambda function to process records from your Amazon MSK cluster when the cluster is configured as an event source. As a consumer, you don’t need to worry about infrastructure management or scaling with Lambda. You only pay for what you consume, so you don’t pay for over-provisioned infrastructure.

To create an event source mapping, you can add your Amazon MSK cluster in a Lambda function trigger. The Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches from one or more partitions and provides these to your function as an event payload. The function then processes records, and sends the payload to an Amazon Kinesis Data Firehose delivery stream. We use Kinesis Data Firehose delivery stream because it can natively batch, compress, transform, and encrypt your events before loading to S3.
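The Lambda consumer could be sketched as follows; this is a hypothetical handler, and the delivery stream name is a placeholder. It decodes the base64-encoded record values that Lambda delivers in the MSK event payload and forwards them to Kinesis Data Firehose in batches:

```python
import base64

def decode_records(event):
    """Flatten an Amazon MSK event payload; record values arrive base64-encoded."""
    payloads = []
    for partition_records in event.get("records", {}).values():
        for record in partition_records:
            payloads.append(base64.b64decode(record["value"]))
    return payloads

def handler(event, context, firehose=None, stream_name="msk-archive-stream"):
    # In a real Lambda, create the client once at module load instead:
    #   firehose = boto3.client("firehose")
    if firehose is None:
        import boto3
        firehose = boto3.client("firehose")
    payloads = decode_records(event)
    # PutRecordBatch accepts at most 500 records per call
    for i in range(0, len(payloads), 500):
        firehose.put_record_batch(
            DeliveryStreamName=stream_name,
            Records=[{"Data": p + b"\n"} for p in payloads[i:i + 500]],
        )
    return {"delivered": len(payloads)}
```

Appending a newline per record keeps the delivered objects in JSON-lines form, which downstream query engines handle well.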

In this architecture, Kinesis Data Firehose delivers the records received from Lambda to Amazon S3 as GZIP-compressed files. These files are partitioned in Hive-style format by Kinesis Data Firehose:

data/year=yyyy/month=MM/day=dd/hour=HH

Figure 1. Archival Architecture


Let’s review some of the possible solutions that can be built on this archived data.

Integration with Enterprise Data Lake

The following architecture diagram shows how you can integrate the archived data in Amazon S3 with your Enterprise Data Lake. Because the data files are prefixed in Hive-style format, you can register the partitions and store the Data Catalog in AWS Glue. With partitioning in place, you can perform optimizations like partition pruning, which enables predicate pushdown for improved performance of your analytics queries. You can also use AWS analytics services like Amazon EMR and AWS Glue for batch analytics. Amazon Athena can be used to run serverless, interactive SQL queries directly on the data for ad hoc analysis and visualization.
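For example, once the archive is cataloged in AWS Glue, a partition-pruned count could be submitted to Athena with boto3 along these lines (the database, table, and output location are hypothetical):

```python
def build_usage_query(database, table, year, month):
    # Filtering on the Hive-style partition columns lets Athena prune
    # (skip) every S3 prefix outside the requested month.
    return (
        f"SELECT count(*) AS events FROM {database}.{table} "
        f"WHERE year = '{year}' AND month = '{month}'"
    )

def run_query(athena_client, sql, output_location):
    # start_query_execution is asynchronous; poll get_query_execution
    # with the returned ID to check for completion.
    response = athena_client.start_query_execution(
        QueryString=sql,
        ResultConfiguration={"OutputLocation": output_location},
    )
    return response["QueryExecutionId"]
```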

Data currently gets stored in JSON files. Following are some of the services/tools that can be integrated with your archive for reporting, analytics, visualization, and machine learning requirements.

Figure 2. Analytics Architecture


Cloning data into other application environments

There are use cases where you may want to clone other application environments from this archive.

These clusters could be used for testing or debugging purposes. You could decide to use only a subset of your data from the archive. Let’s say you want to debug an issue beyond the configured retention period, but not replicate all the data to your testing environment. With archived data in S3, you can build downstream jobs to filter data that can be loaded into a new Amazon MSK cluster. The following diagram highlights this pattern:

Figure 3. Replay Architecture

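A downstream filter job of this kind might be sketched as follows, assuming the objects are GZIP-compressed JSON lines as delivered by Kinesis Data Firehose and that a Kafka producer (for example, kafka-python's KafkaProducer) points at the new cluster; the bucket, key, and topic names are placeholders:

```python
import gzip
import json

def filter_events(lines, predicate):
    """Yield only the archived JSON events that match the predicate."""
    for line in lines:
        if not line.strip():
            continue  # skip blank lines in the JSON-lines object
        event = json.loads(line)
        if predicate(event):
            yield event

def replay_object(s3_client, producer, bucket, key, topic, predicate):
    # Firehose wrote the objects GZIP-compressed, one JSON event per line
    body = s3_client.get_object(Bucket=bucket, Key=key)["Body"].read()
    lines = gzip.decompress(body).decode("utf-8").splitlines()
    count = 0
    for event in filter_events(lines, predicate):
        producer.send(topic, json.dumps(event).encode("utf-8"))
        count += 1
    return count
```

Running this over the subset of S3 prefixes you care about (for example, one day's partition) loads only the filtered events into the test cluster.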

Ready for a Test Drive

To help you get started, we would like to introduce an AWS Solution: AWS Streaming Data Solution for Amazon MSK (scroll down and see Option 3 tab). There is a single-click AWS CloudFormation template, which can assist you in quickly provisioning resources. This will get your real-time archival pipeline for Amazon MSK up and running quickly. This solution shortens your development time by removing or reducing the need for you to:

  • Model and provision resources using AWS CloudFormation
  • Set up Amazon CloudWatch alarms, dashboards, and logging
  • Manually implement streaming data best practices in AWS

This solution is data and logic agnostic, enabling you to begin with boilerplate code and customize quickly. After deployment, use this solution’s monitoring capabilities to transition easily to production.

Conclusion

In this post, we explained the architecture to build a scalable, highly available real-time archival of Amazon MSK topics to long term storage in Amazon S3. The architecture was built using Amazon MSK, AWS Lambda, Amazon Kinesis Data Firehose, and Amazon S3. The architecture also illustrates how you can integrate your Amazon MSK streaming data in S3 with your Enterprise Data Lake.

Orchestrating analytics jobs on Amazon EMR Notebooks using Amazon MWAA

Post Syndicated from Fei Lang original https://aws.amazon.com/blogs/big-data/orchestrating-analytics-jobs-on-amazon-emr-notebooks-using-amazon-mwaa/

In a previous post, we introduced the Amazon EMR notebook APIs, which allow you to programmatically run a notebook on both Amazon EMR Notebooks and Amazon EMR Studio (preview) without accessing the AWS web console. With the APIs, you can schedule EMR notebook runs with cron scripts, chain multiple EMR notebooks, and use orchestration services such as AWS Step Functions triggered by Amazon CloudWatch Events.
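As a minimal boto3 sketch of those APIs (the editor ID, notebook path, and cluster ID are placeholders):

```python
import time

# Terminal states for an EMR notebook execution
TERMINAL_STATES = {"FINISHED", "FAILED", "STOPPED"}

def is_terminal(state):
    return state in TERMINAL_STATES

def run_notebook(emr_client, editor_id, relative_path, cluster_id,
                 poll_seconds=30):
    response = emr_client.start_notebook_execution(
        EditorId=editor_id,                # the "e-..." notebook ID
        RelativePath=relative_path,        # e.g. "find_best_sellers.ipynb"
        ExecutionEngine={"Id": cluster_id, "Type": "EMR"},
        ServiceRole="EMR_Notebooks_DefaultRole",
    )
    execution_id = response["NotebookExecutionId"]
    # Poll until the execution reaches a terminal state
    while True:
        status = emr_client.describe_notebook_execution(
            NotebookExecutionId=execution_id
        )["NotebookExecution"]["Status"]
        if is_terminal(status):
            return execution_id, status
        time.sleep(poll_seconds)
```

The DAG shown later in this post wraps the same start/describe calls in Airflow operators and sensors instead of a polling loop.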

In this post, we show how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) to orchestrate analytics jobs on EMR Notebooks. We start by walking you through the process of using AWS CloudFormation to set up an Amazon MWAA environment, which allows you to programmatically author, schedule, and monitor different sorts of workflows on Amazon EMR. We then use this environment to run an EMR notebook example that performs data analysis with Hive.

The data source for the example in this post is from the public Amazon Customer Reviews Dataset. We use the Parquet formatted dataset as the input dataset for our EMR notebook.

Apache Airflow and Amazon MWAA

Apache Airflow is an open-source platform for authoring, scheduling, and monitoring workflows. With Apache Airflow, we can define directed acyclic graphs (DAGs). DAGs describe how to run a workflow and are written in Python. For additional details on Apache Airflow, see Concepts. Many organizations build, manage, and maintain Apache Airflow on AWS using services such as Amazon Elastic Compute Cloud (Amazon EC2) or Amazon Elastic Kubernetes Service (Amazon EKS). Amazon MWAA is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS, and to build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

Prerequisites

Before getting started, you must have the following prerequisites:

  • An AWS account that provides access to AWS services.
  • AWS Command Line Interface (AWS CLI) version 1.18.128 or later installed on your workstation.
  • An Amazon Simple Storage Service (Amazon S3) bucket that meets the following Amazon MWAA requirements:
    • The bucket must be in the same AWS Region where you create the MWAA environment.
    • The bucket name must start with airflow- and should be globally unique.
    • Bucket versioning is enabled.
    • A folder named dags must be created in the same bucket to store DAGs and associated support files.
  • An AWS Identity and Access Management (IAM) user with an access key and secret access key to configure the AWS CLI.
    • The IAM user has permissions to create an IAM role and policies, launch an EMR cluster, create an Amazon MWAA environment, and create stacks in AWS CloudFormation.
  • A possible limit increase for your account. (Usually a limit increase isn’t necessary. See AWS service quotas if you encounter a limit error while building the solution.)
  • An EMR notebook created through the Amazon EMR console, using the notebook file find_best_sellers.ipynb. See Creating a Notebook for instructions on creating an EMR notebook. Record the ID of the EMR notebook (for example, <e-*************************>); you will use this later in this post.

Architecture overview

At a high level, this solution uses Amazon MWAA with Amazon EMR to build pipelines for ETL workflow orchestration. The following diagram illustrates the solution architecture.


We use the following services and configurations in this solution:

  • Amazon S3
  • VPC network configurations
  • VPC endpoints

Amazon S3

Amazon MWAA uses an S3 bucket to store DAGs and associated support files. You must create an S3 bucket before you can create the environment, with requirements as mentioned in the Prerequisites section. To use a bucket with an Amazon MWAA environment, you must create the bucket in the same Region where you create the environment. Refer to Create an Amazon S3 bucket for Amazon MWAA for further details.

VPC network configurations

Amazon MWAA requires a VPC network that meets the following requirements:

  • Includes two private subnets that are in two different Availability Zones within the same Region
  • Includes public subnets that are configured to route the private subnet data to the internet (via NAT gateways)

For more information, see Create the VPC network using an AWS CloudFormation template.

The Airflow UI in the Amazon MWAA environment is accessible over the internet by users granted access in the IAM policy. Amazon MWAA attaches an Application Load Balancer with an HTTPS endpoint for your web server as part of the Amazon MWAA managed service. For more information, see How it works.

VPC endpoints

VPC endpoints are highly available VPC components that enable private connections between your VPC and supported AWS services. Traffic between your VPC and the other services remains in your AWS network. For our example, we use the following VPC endpoints to ensure extra security, availability, and Amazon S3 data transfer performance:

  • An Amazon S3 gateway VPC endpoint to establish a private connection between the Amazon MWAA VPC and Amazon S3
  • An EMR interface VPC endpoint to securely route traffic directly to Amazon EMR from Amazon MWAA, instead of connecting over the internet
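For illustration, the two endpoints could be created with boto3 as follows; the VPC, route table, subnet, and security group IDs are placeholders (the CloudFormation template in this post creates them for you):

```python
def endpoint_service_name(region, service):
    # AWS VPC endpoint service names follow com.amazonaws.<region>.<service>
    return f"com.amazonaws.{region}.{service}"

def create_private_endpoints(ec2_client, region, vpc_id, route_table_ids,
                             subnet_ids, security_group_id):
    # Gateway endpoint: routes S3 traffic privately via the route tables
    s3_endpoint = ec2_client.create_vpc_endpoint(
        VpcEndpointType="Gateway",
        VpcId=vpc_id,
        ServiceName=endpoint_service_name(region, "s3"),
        RouteTableIds=route_table_ids,
    )
    # Interface endpoint: private ENIs in each subnet for the EMR API
    emr_endpoint = ec2_client.create_vpc_endpoint(
        VpcEndpointType="Interface",
        VpcId=vpc_id,
        ServiceName=endpoint_service_name(region, "elasticmapreduce"),
        SubnetIds=subnet_ids,
        SecurityGroupIds=[security_group_id],
        PrivateDnsEnabled=True,
    )
    return s3_endpoint, emr_endpoint
```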

Setting up an Amazon MWAA environment

To make it easier to get started, we created a CloudFormation template that automatically configures and deploys the Amazon MWAA environment. The template takes care of the following tasks for you:

  • Create an Amazon MWAA execution IAM role.
  • Set up the VPC network for the Amazon MWAA environment, deploying the following resources:
    • A VPC with a pair of public and private subnets spread across two Availability Zones.
    • An internet gateway, with a default route on the public subnets.
    • A pair of NAT gateways (one in each Availability Zone), and default routes for them in the private subnets.
    • Amazon S3 gateway VPC endpoints and EMR interface VPC endpoints in the private subnets in two Availability Zones.
    • A security group to be used by the Amazon MWAA environment that only allows local inbound traffic and all outbound traffic.
  • Create an Amazon MWAA environment. For this post, we select mw1.small for the environment class and choose maximum worker count as 1. For monitoring, we choose to publish environment performance to CloudWatch Metrics. For Airflow logging configuration, we choose to send only the task logs and use log level INFO.

If you want to manually create, configure, and deploy the Amazon MWAA environment without using AWS CloudFormation, see Get started with Amazon Managed Workflows for Apache Airflow (MWAA).

Launching the CloudFormation template

To launch your stack and provision your resources, complete the following steps:

  1. Choose Launch Stack:

This automatically launches AWS CloudFormation in your AWS account with a template. It prompts you to sign in as needed. You can view the template on the AWS CloudFormation console as required. The Amazon MWAA environment is created in the same Region as you launched the CloudFormation stack. Make sure that you create the stack in your intended Region.

The CloudFormation stack requires a few parameters, as shown in the following screenshot.


The following table describes the parameters.

Parameter Description Default Value
Stack name Enter a meaningful name for the stack. We use MWAAEmrNBDemo for this example. Replace it with your own value. None
AirflowBucketName Name of the S3 bucket to store DAGs and support files. The S3 bucket must be in the same Region where you create the environment. The name must start with airflow-. Enter the S3 bucket created as a prerequisite. We use the S3 bucket airflow-emr-demo-us-west-2 for this post. You must replace it with your own value for this field. None
EnvironmentName An MWAA environment name that is prefixed to resource names. All the resources created by this template are named after the value saved for this field. We name our environment mwaa-emr-blog-demo for this post. Replace it with your own value for this field. mwaa-
PrivateSubnet1CIDR The IP range (CIDR notation) for the private subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.20.0/24
PrivateSubnet2CIDR The IP range (CIDR notation) for the private subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.21.0/24
PublicSubnet1CIDR The IP range (CIDR notation) for the public subnet in the first Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.10.0/24
PublicSubnet2CIDR The IP range (CIDR notation) for the public subnet in the second Availability Zone. For more information, see AWS CloudFormation VPC stack specifications. 10.192.11.0/24
VpcCIDR The IP range (CIDR notation) for this VPC being created. For more information, see AWS CloudFormation VPC stack specifications. 10.192.0.0/16

The default values for the IP range (CIDR notation) fields refer to the AWS CloudFormation VPC stack specifications. You can make changes based on the requirements of your own network settings.

  2. Enter the parameter values from the preceding table.
  3. Review the details in the Capabilities section and select the check boxes acknowledging that AWS CloudFormation might create IAM resources with custom names.
  4. Choose Create stack.

Stack creation takes a few minutes. After the CloudFormation stack is complete, on the Resources tab, you can find the resources being created in this CloudFormation stack. Now, we’re ready to run our example.

Orchestrating Hive analytics jobs on EMR Notebooks using Apache Airflow

As a user, you first create a DAG file that describes how to run the analytics jobs and upload it to the dags folder under the S3 bucket specified. You then trigger the DAG in the Apache Airflow UI to orchestrate the job workflow, which includes creating an EMR cluster, waiting for the cluster to be ready, running Hive analytics jobs on EMR Notebooks, uploading the results to Amazon S3, and cleaning up the cluster after the job is complete.

The following diagram illustrates the workflow.

Input notebook file

Let’s take a look at the following input notebook file find_best_sellers.ipynb, which we use for our example.


find_best_sellers.ipynb is a Python notebook that analyzes the public Amazon Customer Reviews Dataset. It generates the top 20 best sellers in a given list of categories over a given period of time and saves the results to the given S3 output location. For demonstration purposes only, we rank sellers simply by the sum of review star ratings from verified purchases.

The explanations of the default parameters in the first cell and each code block are included in the notebook itself.

In the last line of the first cell, OUTPUT_LOCATION = "s3://airflow-emr-demo-us-west-2/query_output/" is the default value for the output location parameter. Replace it with your own value. You can also supply a different value for this parameter in the Airflow Variables later.
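The ranking logic described above can be sketched as a Spark SQL query; the table name amazon_reviews_parquet and the exact query text are illustrative assumptions, not code copied from the notebook:

```python
def best_sellers_sql(category, from_date, to_date, limit=20):
    # Rank products by the sum of star ratings from verified purchases only
    return (
        "SELECT product_title, SUM(star_rating) AS total_stars "
        "FROM amazon_reviews_parquet "
        f"WHERE product_category = '{category}' "
        "AND verified_purchase = 'Y' "
        f"AND review_date BETWEEN '{from_date}' AND '{to_date}' "
        "GROUP BY product_title "
        f"ORDER BY total_stars DESC LIMIT {limit}"
    )

# Inside the notebook, a query like this would run on the cluster, e.g.:
#   spark.sql(best_sellers_sql("Books", FROM_DATE, TO_DATE)).show(20)
```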

DAG file

The DAG file test_dag.py is used to orchestrate our job flow via Apache Airflow. It performs the following tasks:

  1. Create an EMR cluster with one m5.xlarge primary node and two m5.xlarge core nodes on release version 6.2.0, with Spark, Hive, Livy, and JupyterEnterpriseGateway installed as applications.
  2. Wait until the cluster is up and ready.
  3. Run the notebook find_best_sellers.ipynb on the EMR cluster created in Step 1.
  4. Wait until the notebook run is complete.
  5. Clean up the EMR cluster.

Here is the full source code of the DAG:

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
import boto3
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator
from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor
from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor
from airflow.models import Variable
from airflow.utils import apply_defaults

# Available categories:
#
# Apparel,Automotive,Baby,Beauty,Books,Camera,Digital_Ebook_Purchase,Digital_Music_Purchase,
# Digital_Software,Digital_Video_Download,Digital_Video_Games,Electronics,Furniture,Gift_Card,
# Grocery,Health_&_Personal_Care,Home,Home_Entertainment,Home_Improvement,Jewelry,Kitchen,
# Lawn_and_Garden,Luggage,Major_Appliances,Mobile_Apps,Mobile_Electronics,Music,Musical_Instruments,
# Office_Products,Outdoors,PC,Personal_Care_Appliances,Pet_Products,Shoes,Software,Sports,Tools,
# Toys,Video,Video_DVD,Video_Games,Watches,Wireless

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')
# =========================================

JOB_FLOW_OVERRIDES = {
    'Name': 'Test-Cluster',
    'ReleaseLabel': 'emr-6.2.0',
    'Applications': [{'Name':'Spark'}, {'Name':'Hive'}, {'Name':'Livy'}, {'Name':'JupyterEnterpriseGateway'}],
    'Configurations': [
          {
            "Classification": "hive-site",
            "Properties": {
                "hive.execution.engine": "spark"
            }
        }
    ],
    'Instances': {
        'Ec2SubnetId': SUBNET_ID,
        'InstanceGroups': [
            {
                'Name': 'Master node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'MASTER',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 1,
            },
            {
                'Name': 'Core node',
                'Market': 'ON_DEMAND',
                'InstanceRole': 'CORE',
                'InstanceType': 'm5.xlarge',
                'InstanceCount': 2,
            }
        ],
        'KeepJobFlowAliveWhenNoSteps': True,
        'TerminationProtected': False,
    },
    'JobFlowRole': 'EMR_EC2_DefaultRole',
    'ServiceRole': 'EMR_DefaultRole',
    'LogUri': EMR_LOG_URI
}


class CustomEmrJobFlowSensor(EmrJobFlowSensor):
    NON_TERMINAL_STATES = ['STARTING', 'BOOTSTRAPPING', 'TERMINATING']

class NotebookExecutionSensor(EmrBaseSensor):
    NON_TERMINAL_STATES = ['START_PENDING', 'STARTING', 'RUNNING', 'FINISHING', 'STOP_PENDING', 'STOPPING']
    FAILED_STATE = ['FAILING', 'FAILED']
    template_fields = ['notebook_execution_id']
    template_ext = ()
    @apply_defaults
    def __init__(self, notebook_execution_id, *args, **kwargs):
        super(NotebookExecutionSensor, self).__init__(*args, **kwargs)
        self.notebook_execution_id = notebook_execution_id
    def get_emr_response(self):
        emr = EmrHook(aws_conn_id=self.aws_conn_id).get_conn()
        self.log.info('Poking notebook execution %s', self.notebook_execution_id)
        return emr.describe_notebook_execution(NotebookExecutionId=self.notebook_execution_id)
    @staticmethod
    def state_from_response(response):
        return response['NotebookExecution']['Status']
    @staticmethod
    def failure_message_from_response(response):
        state_change_reason = response['NotebookExecution']['LastStateChangeReason']
        if state_change_reason:
            return 'Execution failed with reason: ' + state_change_reason
        return None

def start_execution(**context):
    ti = context['task_instance']
    cluster_id = ti.xcom_pull(key='return_value', task_ids='create_cluster_task')
    print("Starting an execution using cluster: " + cluster_id)
    # generate a JSON key-pair of <String : String Array>, e.g. 
    # "\"CATEGORIES\": [\"Apparel\", \"Automotive\", \"Baby\", \"Books\"]"
    categories_escaped_quotes = ""
    for category in CATEGORIES_CSV.split(','):
        categories_escaped_quotes = categories_escaped_quotes + "\"" + category + "\","
    categories_escaped_quotes = categories_escaped_quotes[:-1]
    categories_parameter = "\"CATEGORIES\" : [" + categories_escaped_quotes + "]"

    output_location_parameter = "\"OUTPUT_LOCATION\": \"" + OUTPUT_LOCATION + "\""
    from_date_parameter = "\"FROM_DATE\": \"" + FROM_DATE + "\""
    to_date_parameter = "\"TO_DATE\": \"" + TO_DATE + "\""
    parameters = f"{{ {categories_parameter}, {output_location_parameter}, {from_date_parameter}, {to_date_parameter} }}"
    emr = boto3.client('emr', region_name=REGION)
    start_resp = emr.start_notebook_execution(
        EditorId=NOTEBOOK_ID,
        RelativePath=NOTEBOOK_FILE_NAME,
        ExecutionEngine={'Id': cluster_id, 'Type': 'EMR'},
        NotebookParams=parameters,
        ServiceRole='EMR_Notebooks_DefaultRole'
    )
    execution_id = start_resp['NotebookExecutionId']
    print("Started an execution: " + execution_id)
    return execution_id



with DAG('test_dag', description='test dag', schedule_interval='0 * * * *', start_date=datetime(2020,3,30), catchup=False) as dag:
    create_cluster = EmrCreateJobFlowOperator(
        task_id='create_cluster_task',
        job_flow_overrides=JOB_FLOW_OVERRIDES,
        aws_conn_id='aws_default',
        emr_conn_id='emr_default',
    )
    cluster_sensor = CustomEmrJobFlowSensor(
        task_id='check_cluster_task',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    start_execution = PythonOperator(
        task_id='start_execution_task', 
        python_callable=start_execution,
        provide_context=True
    )
    execution_sensor = NotebookExecutionSensor(
        task_id='check_execution_task',
        notebook_execution_id="{{ task_instance.xcom_pull(task_ids='start_execution_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )

    cluster_remover = EmrTerminateJobFlowOperator(
        task_id='terminate_cluster',
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_cluster_task', key='return_value') }}",
        aws_conn_id='aws_default',
    )
    
    create_cluster >> cluster_sensor >> start_execution >> execution_sensor >> cluster_remover

The very last line of the DAG code defines how the tasks are linked in the orchestration workflow. Airflow overloads the right shift operator (>>) to create a dependency, meaning that the task on the left must complete successfully before the task on the right starts.
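The mechanism is ordinary Python operator overloading. The following toy stand-in (not Airflow code) makes the chaining behavior concrete:

```python
class Task:
    """Minimal stand-in for how Airflow's BaseOperator overloads >>."""
    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        self.downstream.append(other)
        return other  # returning the right-hand task enables chains like a >> b >> c

create = Task("create_cluster_task")
check = Task("check_cluster_task")
terminate = Task("terminate_cluster")
create >> check >> terminate  # check depends on create; terminate depends on check
```

In real Airflow, `a >> b` is equivalent to `a.set_downstream(b)`.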

Instead of hard-coding the variables in the DAG code, we supply them by importing a JSON file in the Airflow UI before running the DAG. This way, we can update the variables without having to change the DAG code, which would require updating the DAG file in Amazon S3. We walk you through how to do so in later steps. The VARIABLES lines from the DAG are repeated here:

# =============== VARIABLES ===============
NOTEBOOK_ID = Variable.get('NOTEBOOK_ID')
NOTEBOOK_FILE_NAME = Variable.get('NOTEBOOK_FILE_NAME')
CATEGORIES_CSV = Variable.get('CATEGORIES_CSV')
REGION = Variable.get('REGION')
SUBNET_ID = Variable.get('SUBNET_ID')
EMR_LOG_URI = Variable.get('EMR_LOG_URI')
OUTPUT_LOCATION = Variable.get('OUTPUT_LOCATION')
FROM_DATE = Variable.get('FROM_DATE')
TO_DATE = Variable.get('TO_DATE')

We create a JSON formatted file named variables.json for our example. See the following code:

{
    "REGION": "us-west-2",
    "SUBNET_ID": "<subnet-********>",
    "EMR_LOG_URI": "s3://<S3 path for EMR logs>/",
    "NOTEBOOK_ID": "<e-*************************>",
    "NOTEBOOK_FILE_NAME": "find_best_sellers.ipynb",
    "CATEGORIES_CSV": "Apparel,Automotive,Baby,Beauty,Books",
    "FROM_DATE": "2015-08-25",
    "TO_DATE": "2015-08-31",
    "OUTPUT_LOCATION": "s3://<S3 path for query output>/"
}

To use this JSON code, replace all the placeholder values (the subnet ID, S3 paths, and notebook ID) with your actual values.

Accessing Apache Airflow UI and running the workflow

To run the workflow, complete the following steps:

  1. On the Amazon MWAA console, find the new environment mwaa-emr-blog-demo we created earlier with the CloudFormation template.


  2. Choose Open Airflow UI.
  3. Log in as an authenticated user.


Next, we import the JSON file for the variables into Airflow UI.

As we mentioned earlier, we supply the variable values for our DAG definition through the Airflow UI instead of hard-coding them in the DAG code.

  4. On the Admin menu, choose Variables.
  5. Choose Browse.
  6. Choose the variables.json file.
  7. Choose Import Variables.

For more information about importing variables, see Variables.

  8. Run the following command from the directory that contains test_dag.py to upload the DAG file to the dags folder under the S3 bucket specified for the Airflow environment. Replace <your_airflow_bucket_name> with the S3 bucket name that you created as a prerequisite:
    aws s3 cp test_dag.py s3://<your_airflow_bucket_name>/dags/

test_dag.py should automatically appear in the Airflow UI.

  9. Enable the DAG by switching its toggle to On.


  10. Choose test_dag to go to the detail page for the DAG.

On the Graph View tab, we can see the whole workflow of our pipeline and each individual task as defined in our DAG code.


  1. Optionally, to trigger the DAG, choose Trigger DAG and add the following JSON formatted configuration before activating the DAG.


You now get an email when any of the tasks fails. You can also configure email notifications for retries.

  1. On the Amazon EMR console, find the EMR cluster created by the create_cluster_task definition.


  1. On the Airflow UI, you can switch tabs to check the status of the workflow tasks.

After a few minutes, we can see on the Tree View tab that the workflow is complete and all the tasks are successful.


On the Gantt tab, we can see the time distribution of all the tasks of our workflow.


As specified in our DAG definition, the EMR cluster is stopped when the workflow is complete.

Because we use the cron expression 0 * * * * as the scheduled running interval for our workflow, if the triggered status of the DAG is ON, it runs every hour. You need to switch the status to OFF if you don’t want it to run again.
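For reference, the five cron fields are minute, hour, day of month, month, and day of week, so 0 * * * * fires at minute 0 of every hour. The following is a minimal stdlib sketch (illustrative only, not Airflow's actual scheduler logic) that checks a timestamp against such an expression:

```python
from datetime import datetime

def matches_cron(expr: str, when: datetime) -> bool:
    """Check a datetime against a 5-field cron expression.
    Supports only '*' and plain numbers -- enough for '0 * * * *'."""
    minute, hour, dom, month, dow = expr.split()
    # cron day-of-week: 0 = Sunday; Python weekday(): 0 = Monday
    values = [when.minute, when.hour, when.day, when.month, (when.weekday() + 1) % 7]
    for field, value in zip([minute, hour, dom, month, dow], values):
        if field != "*" and int(field) != value:
            return False
    return True

print(matches_cron("0 * * * *", datetime(2021, 3, 1, 14, 0)))   # top of the hour
print(matches_cron("0 * * * *", datetime(2021, 3, 1, 14, 30)))  # mid-hour
```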

  1. On the Amazon S3 console, view the result of our notebook job in the S3 folder.


For example, the following screenshot is the output for the Books category that we provided as a value in the CATEGORIES parameter. As we can see, Go Set a Watchman: A Novel is the best Books seller from the week of 8-25-2015 to 8-31-2015.


Cleaning up

To avoid ongoing charges, delete the CloudFormation stack and any files in Amazon S3 that were created by running the examples in this post.

Conclusion

This post showed how to use the Amazon EMR Notebooks API with orchestration services such as Amazon MWAA to build ETL pipelines. It demonstrated how to set up a secured Amazon MWAA environment using a CloudFormation template and run a sample workflow with Apache Airflow.

If you want to learn how to run Amazon EMR applications such as PySpark with Amazon MWAA, see Running Spark Jobs on Amazon EMR with Apache Airflow.


About the Authors

Fei Lang is a senior big data architect at Amazon Web Services. She is passionate about building the right big data solution for customers. In her spare time, she enjoys the scenery of the Pacific Northwest, going for a swim, and spending time with her family.

Ray Liu is a software development engineer at AWS. Besides work, he enjoys traveling and spending time with family.

Run Apache Spark 3.0 workloads 1.7 times faster with Amazon EMR runtime for Apache Spark

Post Syndicated from Al MS original https://aws.amazon.com/blogs/big-data/run-apache-spark-3-0-workloads-1-7-times-faster-with-amazon-emr-runtime-for-apache-spark/

With Amazon EMR release 6.1.0, Amazon EMR runtime for Apache Spark is now available for Spark 3.0.0. EMR runtime for Apache Spark is a performance-optimized runtime for Apache Spark that is 100% API compatible with open-source Apache Spark.

In our benchmark performance tests using TPC-DS benchmark queries at 3 TB scale, we found EMR runtime for Apache Spark 3.0 provides a 1.7 times performance improvement on average, and up to 8 times improved performance for individual queries over open-source Apache Spark 3.0.0. With Amazon EMR 6.1.0, you can now run your Apache Spark 3.0 applications faster and cheaper without requiring any changes to your applications.

Results observed using TPC-DS benchmarks

To evaluate the performance improvements, we used TPC-DS benchmark queries with 3 TB scale and ran them on a 6-node c4.8xlarge EMR cluster with data in Amazon Simple Storage Service (Amazon S3). We ran the tests with and without the EMR runtime for Apache Spark. The following two graphs compare the total aggregate runtime and geometric mean for all queries in the TPC-DS 3 TB query dataset between the Amazon EMR releases.

The following table shows the total runtime in seconds.


The following table shows the geometric mean of the runtime in seconds.


In our tests, all queries ran successfully on EMR clusters that used the EMR runtime for Apache Spark. However, when using Spark 3.0 without the EMR runtime, 34 out of the 104 benchmark queries failed due to SPARK-32663. To work around these issues, we disabled the spark.shuffle.readHostLocalDisk configuration. Even after this change, queries 14a and 14b continued to fail, so we chose to exclude them from our benchmark comparison.
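The workaround property can be disabled at submission time, for example (standard spark-submit syntax, with the rest of the job arguments elided):

```
spark-submit \
  --conf spark.shuffle.readHostLocalDisk=false \
  <other job arguments>
```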

The per-query speedup on Amazon EMR 6.1 with and without EMR runtime is illustrated in the following chart. The horizontal axis shows each query in the TPC-DS 3 TB benchmark. The vertical axis shows the speedup of each query due to the EMR runtime. We found a 1.7 times performance improvement as measured by the geometric mean of the per-query speedups, with all queries showing a performance improvement with the EMR Runtime.

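The geometric mean is used so that a few extreme per-query speedups don't dominate the summary statistic. For example (with made-up speedup values, not the benchmark's actual numbers):

```python
import math

def geometric_mean(values):
    """nth root of the product, computed in log space for numerical stability."""
    return math.exp(sum(math.log(v) for v in values) / len(values))

# Hypothetical per-query speedups
# (runtime without EMR runtime / runtime with EMR runtime).
speedups = [1.2, 1.5, 1.8, 2.0, 8.0]
print(round(geometric_mean(speedups), 2))
```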

Conclusion

You can run your Apache Spark 3.0 workloads faster and cheaper without making any changes to your applications by using Amazon EMR 6.1. To keep up to date, subscribe to the Big Data blog’s RSS feed to learn about more great Apache Spark optimizations, configuration best practices, and tuning advice.


About the Authors

Al MS is a product manager for Amazon EMR at Amazon Web Services.

Peter Gvozdjak is a senior engineering manager for EMR at Amazon Web Services.

Building complex workflows with Amazon MWAA, AWS Step Functions, AWS Glue, and Amazon EMR

Post Syndicated from Dipankar Ghosal original https://aws.amazon.com/blogs/big-data/building-complex-workflows-with-amazon-mwaa-aws-step-functions-aws-glue-and-amazon-emr/

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a fully managed service that makes it easy to run open-source versions of Apache Airflow on AWS and build workflows to run your extract, transform, and load (ETL) jobs and data pipelines.

You can use AWS Step Functions as a serverless function orchestrator to build scalable big data pipelines, using services such as Amazon EMR to run Apache Spark and other open-source applications on AWS in a cost-effective manner. You can also use AWS Glue, a serverless environment, to prepare (extract and transform) and load large datasets from a variety of sources for analytics and data processing with Apache Spark ETL jobs.

For production pipelines, a common use case is to read data originating from a variety of sources. This data requires transformation to extract business value and generate insights before sending to downstream applications, such as machine learning algorithms, analytics dashboards, and business reports.

This post demonstrates how to use Amazon MWAA as a primary workflow management service to create and run complex workflows and extend the directed acyclic graph (DAG) to start and monitor a state machine created using Step Functions. In Airflow, a DAG is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

Architectural overview

The following diagram illustrates the architectural overview of the components involved in the orchestration of the workflow. This workflow uses Amazon EMR to preprocess data and starts a Step Functions state machine. The state machine transforms data using AWS Glue.


The workflow includes the following core components:

  1. Airflow Scheduler triggers the DAG based on a schedule or manually.
  2. DAG uses PythonOperator to create an EMR cluster and waits for the cluster creation process to complete.
  3. DAG uses a custom operator EmrSubmitAndMonitorStepOperator to submit and monitor the Amazon EMR step.
  4. DAG uses PythonOperator to stop the EMR cluster when the preprocessing tasks are complete.
  5. DAG starts a Step Functions state machine and monitors it for completion using PythonOperator.

You can build complex ETL pipelines with Step Functions separately and trigger them from an Airflow DAG.

Prerequisites

Before starting, create an Amazon MWAA environment. If this is your first time using Amazon MWAA, see Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Take a note of the Amazon Simple Storage Service (Amazon S3) bucket that stores the DAGs. It’s located on the environment details page on the Amazon MWAA console.


Also note the AWS Identity and Access Management (IAM) execution role. This role should be modified to allow MWAA to read and write from your S3 bucket, submit an Amazon EMR step, start a Step Functions state machine, and read from the AWS Systems Manager Parameter Store. The IAM role is available in the Permissions section of the environment details.


The solution references Systems Manager parameters in an AWS CloudFormation template and scripts. For information on adding and removing IAM identity permissions, see Adding and removing IAM identity permissions. A sample IAM policy is also provided in the GitHub repository amazon-mwaa-complex-workflow-using-step-functions.

For this post, we use the MovieLens dataset. We concurrently convert the MovieLens CSV files to Parquet format and save them to Amazon S3 as part of preprocessing.

Setting up the state machine using Step Functions

Our solution extends the ETL pipeline to run a Step Functions state machine from the Airflow DAG. Step Functions lets you build visual workflows that enable fast translation of business requirements into technical requirements. With Step Functions, you can set up dependency management and failure handling using a JSON-based template. A workflow is a series of steps, such as tasks, choices, parallel runs, and timeouts with the output of one step acting as input into the next. For more information about other use cases, see AWS Step Functions Use Cases.

The following diagram shows the ETL process set up through a Step Functions state machine.


In the workflow, the Process Data step runs an AWS Glue job, and the Get Job Status step periodically checks for the job completion. The AWS Glue job reads the input datasets and creates output data for the most popular movies and top-rated movies. After the job is complete, the Run Glue Crawler step runs an AWS Glue crawler to catalog the data. The workflow also allows you to monitor and respond to failures at any stage.
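Expressed in Amazon States Language, a simplified version of this state machine might look like the following sketch (not the template from the repository; the Lambda function ARNs and state names are placeholders):

```json
{
  "Comment": "Run a Glue job, poll for completion, then run a Glue crawler",
  "StartAt": "Process Data",
  "States": {
    "Process Data": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:StartGlueJob",
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 30,
      "Next": "Get Job Status"
    },
    "Get Job Status": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:GetGlueJobStatus",
      "Next": "Job Complete?"
    },
    "Job Complete?": {
      "Type": "Choice",
      "Choices": [
        {"Variable": "$.JobRunState", "StringEquals": "SUCCEEDED", "Next": "Run Glue Crawler"},
        {"Variable": "$.JobRunState", "StringEquals": "FAILED", "Next": "Job Failed"}
      ],
      "Default": "Wait"
    },
    "Run Glue Crawler": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:us-east-1:123456789012:function:StartGlueCrawler",
      "End": true
    },
    "Job Failed": {"Type": "Fail"}
  }
}
```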

Creating resources

Create your resources by following the installation instructions provided in the amazon-mwaa-complex-workflow-using-step-functions README.md.

Running the ETL workflow

To run your ETL workflow, complete the following steps:

  1. On the Amazon MWAA console, choose Open Airflow UI.
  2. Locate the mwaa_movielens_demo DAG.
  3. Turn on the DAG.


  1. Select the mwaa_movielens_demo DAG and choose Graph View.

This displays the overall ETL pipeline managed by Airflow.


  1. To view the DAG code, choose Code.


The code for the custom operator can be found in the amazon-mwaa-complex-workflow-using-step-functions GitHub repo. 

  1. From the Airflow UI, select the mwaa_movielens_demo DAG and choose Trigger DAG.
  2. Leave the Optional Configuration JSON box blank.


When the Airflow DAG runs, the first task calls the PythonOperator to create an EMR cluster using Boto3. Boto is the AWS SDK for Python. It enables Python developers to create, configure, and manage AWS services, such as Amazon Elastic Compute Cloud (Amazon EC2) and Amazon S3. Boto provides an object-oriented API, as well as low-level access to AWS services.
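As an illustration, the request that such a task sends through Boto3 might be shaped as follows — a sketch with placeholder names, paths, and instance sizes, not the DAG's actual code:

```python
# Request parameters for Boto3's EMR run_job_flow call.
# All names, paths, and instance settings here are placeholders.
cluster_params = {
    "Name": "mwaa-demo-cluster",
    "ReleaseLabel": "emr-5.32.0",
    "Applications": [{"Name": "Spark"}],
    "LogUri": "s3://my-emr-logs/",
    "Instances": {
        "InstanceGroups": [
            {"InstanceRole": "MASTER", "InstanceType": "m5.xlarge", "InstanceCount": 1},
            {"InstanceRole": "CORE", "InstanceType": "m5.xlarge", "InstanceCount": 2},
        ],
        # Keep the cluster in the Waiting state so steps can be submitted later.
        "KeepJobFlowAliveWhenNoSteps": True,
    },
    "JobFlowRole": "EMR_EC2_DefaultRole",
    "ServiceRole": "EMR_DefaultRole",
}

# With AWS credentials configured, the DAG task would submit this request:
#   import boto3
#   emr = boto3.client("emr")
#   response = emr.run_job_flow(**cluster_params)
#   cluster_id = response["JobFlowId"]
print(cluster_params["Name"])
```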

The second task waits until the EMR cluster is ready and in the Waiting state. As soon as the cluster is ready, the data load task runs, followed by the data preprocessing tasks, which are started in parallel using EmrSubmitAndMonitorStepOperator. Concurrency in the current Airflow DAG is set to 3, which runs three tasks in parallel. You can change the concurrency of Amazon EMR to run multiple Amazon EMR steps in parallel.

When the data preprocessing tasks are complete, the EMR cluster is stopped and the DAG starts the Step Functions state machine to initiate data transformation.

The final task in the DAG monitors the completion of the Step Functions state machine.

The DAG run should complete in approximately 10 minutes.

Verifying the DAG run

While the DAG is running, you can view the task logs.

  1. From Graph View, select any task and choose View Log.


  1. When the DAG starts the Step Functions state machine, verify the status on the Step Functions console.


  1. You can also monitor ETL process completion from the Airflow UI.


  1. On the Airflow UI, verify the completion from the log entries.


Querying the data

After the successful completion of the Airflow DAG, two tables are created in the AWS Glue Data Catalog. To query the data with Amazon Athena, complete the following steps:

  1. On the Athena console, choose Databases.
  2. Select the mwaa-movielens-demo-db database.

You should see the two tables. If the tables aren’t listed, verify that the AWS Glue crawler run is complete and that the console is showing the correct Region.

  1. Run the following query:
    SELECT * FROM "mwaa-movielens-demo-db"."most_popular_movies" limit 10;

The following screenshot shows the output.


Cleaning up

To clean up the resources created as part of our CloudFormation template, delete the mwaa-demo-foundations stack. You can either use the AWS CloudFormation console or the AWS Command Line Interface (AWS CLI).

Conclusion

In this post, we used Amazon MWAA to orchestrate an ETL pipeline on Amazon EMR and AWS Glue with Step Functions. We created an Airflow DAG to demonstrate how to run data processing jobs concurrently and extended the DAG to start a Step Functions state machine to build a complex ETL pipeline. A custom Airflow operator submitted and then monitored the Amazon EMR steps synchronously.

If you have comments or feedback, please leave them in the comments section.


About the Author

Dipankar Ghosal is a Sr Data Architect at Amazon Web Services and is based out of Minneapolis, MN. He focuses on analytics and enjoys helping customers solve their unique use cases. When he’s not working, he loves going hiking with his wife and daughter.

Introducing Amazon EMR integration with Apache Ranger

Post Syndicated from Varun Rao Bhamidimarri original https://aws.amazon.com/blogs/big-data/introducing-amazon-emr-integration-with-apache-ranger/

Data security is an important pillar in data governance. It includes authentication, authorization, encryption, and audit.

Amazon EMR enables you to set up and run clusters of Amazon Elastic Compute Cloud (Amazon EC2) instances with open-source big data applications like Apache Spark, Apache Hive, Apache Flink, and Presto. You may also want to set up multi-tenant EMR clusters where different users (or teams) can use a shared EMR cluster to run big data analytics workloads. In a multi-tenant cluster, it becomes important to set up mechanisms for authentication (determine who is invoking the application and authenticate the user), authorization (set up who has access to what data), and audit (maintain a log of who accessed what data).

Apache Ranger is an open-source project that provides authorization and audit capabilities for Hadoop and related big data applications like Apache Hive, Apache HBase, and Apache Kafka.

We’re happy to share that starting with Amazon EMR 5.32, we’re including plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon Simple Storage Service (Amazon S3), and Apache Hive.

You can set up a multi-tenant EMR cluster, use Kerberos for user authentication, use Apache Ranger 2.0 (managed separately outside the EMR cluster) for authorization, and configure fine-grained data access policies for databases, tables, columns, and S3 objects. In this post, we explain how you can set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. We show how you can set up multiple short-running and long-running EMR clusters with a single, centralized Apache Ranger server that maintains data access control policies.

Managed Apache Ranger plugins for PrestoSQL and PrestoDB will soon follow.

You should consider this solution if one or all of these apply:

  • Have experience setting up and managing an Apache Ranger admin server (needs to be self-managed)
  • Want to port existing Apache Ranger Hive policies over to Amazon EMR
  • Need to use the database-backed Hive Metastore and can’t use the AWS Glue Data Catalog due to limitations
  • Require authorization support for Apache Spark (SQL and storage and file access) and Amazon S3
  • Store Apache Ranger authorization audits in Amazon CloudWatch, avoiding the need to maintain an Apache Solr infrastructure

With this native integration, you use the Amazon EMR security configuration to specify Apache Ranger details, without the need for custom bootstrap scripts. You can reuse existing Apache Hive Ranger policies, including support for row-level filters and column masking.


The following image shows table and column-level access set up for Apache SparkSQL.

Additionally, SSH users are blocked from getting AWS Identity and Access Management (IAM) permissions tied to the Amazon EMR instance profiles. This disables access to Amazon S3 using tools like the AWS Command Line Interface (AWS CLI).

The following screenshot shows access to Amazon S3 blocked when using the AWS CLI.

The following screenshots show how access to the same Amazon S3 location is set up and used through EMRFS (the default EMR file system implementation for reading and writing files from Amazon S3).

Prerequisites

Before getting started, you must have the following prerequisites:

  • Self-managed Apache Ranger server (2.x only) outside of an EMR cluster
  • TLS mutual authentication enabled between Apache Ranger server and Apache Ranger plugins running on the EMR cluster
  • Additional IAM roles:
    • IAM role for Apache Ranger – Defines privileges that trusted processes have when submitting Spark and Hive jobs
    • IAM role for other AWS services – Defines privileges that end-users have when accessing services that aren’t protected by Apache Ranger plugins
  • Updates to the Amazon EC2 EMR role:
  • New Apache Ranger service definitions installed for Apache Spark and Amazon S3
  • Apache Ranger server certificate and private key for plugins uploaded into Secrets Manager
  • A CloudWatch log group for Apache Ranger audits

Architecture overview

The following diagram illustrates the architecture for this solution.

The following diagram illustrates the architecture for this solution.

In the architecture, the Amazon EMR secret agent intercepts user requests and vends credentials based on user and resources. The Amazon EMR record server receives requests to access data from Spark, reads data from Amazon S3, and returns filtered data based on Apache Ranger policies.

See Amazon EMR Components to learn more about Amazon EMR Secret Agent and Record Server.

Setting up your resources

In this section, we walk you through setting up your resources manually.

If you want to use CloudFormation scripts to automate the setup, see the section Setting up your architecture with CloudFormation later in this post.

Uploading SSL private keys and certificates to Secrets Manager

Upload the private keys for the Apache Ranger plugins and the SSL certificate of the Apache Ranger server to Secrets Manager. When the EMR cluster starts up, it uses these files to configure the plugins. For reference, see the script create-tls-certs.sh.
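For testing, you can generate a self-signed certificate and private key with OpenSSL before uploading them — a sketch with a placeholder common name, separate from the create-tls-certs.sh script referenced above:

```shell
# Generate a self-signed certificate and private key (placeholder CN).
openssl req -x509 -newkey rsa:2048 -nodes \
  -keyout ranger-plugin-key.pem -out ranger-plugin-cert.pem \
  -days 365 -subj "/CN=ranger-plugin"

# Upload both to Secrets Manager (requires AWS credentials); for example:
#   aws secretsmanager create-secret --name emr/rangerPluginCert \
#       --secret-string file://ranger-plugin-cert.pem
```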

Setting up an Apache Ranger server

You need to set up a two-way SSL-enabled Apache Ranger server. To set up the server manually, refer to the script install-ranger-admin-server.sh.

Installing Apache Ranger service definitions

In this section, we review installing the Apache Ranger service definitions for Apache Spark and Amazon S3.

Apache Spark

To add a new Apache Ranger service definition, see the following script:

mkdir /tmp/emr-spark-plugin/
cd /tmp/emr-spark-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-spark.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-spark-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin's home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark
mv ranger-spark-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-spark

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-spark.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

This script is included in the Apache Ranger server setup script, if you’re deploying resources with the CloudFormation template.

The policy definition is similar to Apache Hive, except that the actions are limited to select only. The following screenshot shows the definition settings.


To change permissions, for the user, choose select.


Amazon S3 (via Amazon EMR File System)

Similar to Apache Spark, we have a new Apache Ranger service definition for Amazon S3. See the following script:

mkdir /tmp/emr-emrfs-s3-plugin/
cd /tmp/emr-emrfs-s3-plugin/

# Download the Service definition
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-servicedef-amazon-emr-emrfs.json

# Download Service implementation jar/class
wget https://s3.amazonaws.com/elasticmapreduce/ranger/service-definitions/version-2.0/ranger-emr-emrfs-plugin-2.x.jar

# Copy Service implementation jar to Ranger server
export RANGER_HOME=.. # Replace with the Ranger Admin's home directory, e.g. /usr/lib/ranger/ranger-2.0.0-admin
mkdir $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs
mv ranger-emr-emrfs-plugin-2.x.jar $RANGER_HOME/ews/webapp/WEB-INF/classes/ranger-plugins/amazon-emr-emrfs

# Add the service definition using the Ranger REST API
curl -u <admin_user_login>:<password_for_ranger_admin_user> -X POST -d @ranger-servicedef-amazon-emr-emrfs.json \
-H "Accept: application/json" \
-H "Content-Type: application/json" \
-k 'https://*<RANGER SERVER ADDRESS>*:6182/service/public/v2/api/servicedef'

If you’re using the CloudFormation template, this script is included in the Apache Ranger server setup script.

The following screenshot shows the policy details.


You can enable standard Amazon S3 access permissions in this policy.


Importing your existing Apache Hive policies

You can import your existing Apache Hive policies into the Apache Ranger server tied to the EMR cluster. For more information, see User Guide for Import-Export.

The following image shows how to use Apache Ranger’s export and import option.

CloudWatch for Apache Ranger audits

Apache Ranger audits are sent to CloudWatch. You should create a new CloudWatch log group and specify it in the security configuration. See the following code:

aws logs create-log-group --log-group-name /aws/emr/rangeraudit/

You can search audit information using CloudWatch Insights. The following screenshot shows a query.

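For example, a Logs Insights query along these lines can surface recent denied requests — a sketch; the exact field names depend on the audit record format delivered to your log group:

```
fields @timestamp, reqUser, resource, accessType, result
| filter result = 0
| sort @timestamp desc
| limit 20
```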

New Amazon EMR security configuration

The new Amazon EMR security configuration requires the following inputs:

  • IP address of the Apache Ranger server
  • IAM role for the Apache Ranger service (see the GitHub repo) running on the EMR cluster and accessing other AWS services (see the GitHub repo)
  • Secrets Manager name with the Apache Ranger admin server certificate
  • Secrets Manager name with the private key used by the plugins
  • CloudWatch log group name

The following code is an example of using the AWS CLI to create this security configuration:

aws emr create-security-configuration --name MyEMRRangerSecurityConfig --security-configuration
'{
   "EncryptionConfiguration":{
      "EnableInTransitEncryption":false,
      "EnableAtRestEncryption":false
   },
   "AuthenticationConfiguration":{
      "KerberosConfiguration":{
         "Provider":"ClusterDedicatedKdc",
         "ClusterDedicatedKdcConfiguration":{
            "TicketLifetimeInHours":24
         }
      }
   },
   "AuthorizationConfiguration":{
      "RangerConfiguration":{
         "AdminServerURL":"https://<RANGER ADMIN SERVER IP>:8080",
         "RoleForRangerPluginsARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<RANGER PLUGIN DATA ACCESS ROLE NAME>",
         "RoleForOtherAWSServicesARN":"arn:aws:iam::<AWS ACCOUNT ID>:role/<USER ACCESS ROLE NAME>",
         "AdminServerSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES ADMIN SERVERS PUBLIC TLS CERTIFICATE>",
         "RangerPluginConfigurations":[
            {
               "App":"Spark",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES SPARK PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"spark-policy-repository"
            },
            {
               "App":"Hive",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES HIVE PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"hive-policy-repository"
            },
            {
               "App":"EMRFS-S3",
               "ClientSecretARN":"arn:aws:secretsmanager:us-east-1:<AWS ACCOUNT ID>:secret:<SECRET NAME THAT PROVIDES EMRFS S3 PLUGIN PRIVATE TLS CERTIFICATE>",
               "PolicyRepositoryName":"emrfs-policy-repository"
            }
         ],
         "AuditConfiguration":{
            "Destinations":{
               "AmazonCloudWatchLogs":{
                  "CloudWatchLogGroup":"arn:aws:logs:us-east-1:<AWS ACCOUNT ID>:log-group:<LOG GROUP NAME FOR AUDIT EVENTS>"
               }
            }
         }
      }
   }
}'

Installing an Amazon EMR cluster with Kerberos

Start the cluster by choosing Amazon EMR version 5.32 and this newly created security configuration.

Setting up your architecture with CloudFormation

To help you get started, we added a new GitHub repo with setup instructions. The following diagram shows the logical architecture after the CloudFormation stack is fully deployed. Review the roadmap for future enhancements.


To set up this architecture using CloudFormation, complete the following steps:

  1. Use the create-tls-certs.sh script to upload the SSL key and certifications to Secrets Manager.
  2. Set up the VPC or Active Directory server by launching the following CloudFormation template.
  3. Verify DHCP options to make sure the domain name servers for the VPC are listed in the right order (LDAP/AD server first, followed by AmazonProvidedDNS).
  4. Set up the Apache Ranger server, Amazon Relational Database Service (Amazon RDS) instance, and EMR cluster by launching the following CloudFormation template.

Limitations

When using this solution, keep in mind the following limitations:

  • As of this writing, Amazon EMR 6.x isn’t supported (only Amazon EMR 5.32+ is supported)
  • Non-Kerberos clusters aren’t supported
  • Jobs must be submitted through Apache Zeppelin, Hue, Livy, or SSH
  • Only selected applications can be installed on the Apache Ranger-enabled EMR cluster, such as Hadoop, Tez, and Ganglia. For a full list, see Supported Applications. The cluster creation request is rejected if you choose applications outside this supported list.
  • As of this writing, the SparkSQL plugin doesn’t support column masking and row-level filters
  • The SparkSQL INSERT INTO and INSERT OVERWRITE overrides aren’t supported
  • You can’t view audits on the Apache Ranger UI because they’re sent to CloudWatch
  • The AWS Glue Data Catalog isn’t supported as the Apache Hive Metastore

Available now

Native support for Apache Ranger 2.0 with Apache Hive, Apache Spark, and Amazon S3 is available today in the following AWS Regions:

  • US East (Ohio)
  • US East (N. Virginia)
  • US West (N. California)
  • US West (Oregon)
  • Africa (Cape Town)
  • Asia Pacific (Hong Kong)
  • Asia Pacific (Mumbai)
  • Asia Pacific (Seoul)
  • Asia Pacific (Singapore)
  • Asia Pacific (Sydney)
  • Canada (Central)
  • Europe (Frankfurt)
  • Europe (Ireland)
  • Europe (London)
  • Europe (Paris)
  • Europe (Milan)
  • Europe (Stockholm)
  • South America (São Paulo)
  • Middle East (Bahrain)

For the latest Region availability, see Amazon EMR Management Guide.

Conclusion

Amazon EMR 5.32 includes plugins to integrate with Apache Ranger 2.0 that enable authorization and audit capabilities for Apache SparkSQL, Amazon S3, and Apache Hive. This post demonstrated how to set up Amazon EMR to use Apache Ranger for data access controls for Apache Spark and Apache Hive workloads on Amazon EMR. If you have any thoughts or questions, please leave them in the comments.


About the Author

Varun Rao Bhamidimarri is a Sr Manager on the AWS Analytics Specialist Solutions Architect team. His focus is helping customers adopt cloud-enabled analytics solutions to meet their business requirements. Outside of work, he loves spending time with his wife and two kids, staying healthy, meditating, and gardening, a hobby he recently picked up during the lockdown.